mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 13:31:14 +00:00
f12789ebf0
Replace 3-way byte-equivalent orchestration-spine duplication across okvis2.py / vins_mono.py / klt_ransac.py with a single c1-internal helper at components/c1_vio/_facade_spine.py. Closes cumulative review batches 52-54 Finding F1. No behaviour change — all existing AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified (114 c1_vio tests green, 237 with adjacent regression suite). The helper exposes 5 stateless free functions (now_iso, bias_norm, se3_from_4x4, frame_ts_ns, frame_image) and a FacadeSpine mixin class providing _classify_state / _tick_lost / _emit_transition. Concrete strategies inherit the mixin and set spine-required instance attributes in __init__. Mirrors the AZ-527 precedent for c2_vpr-side _assert_engine_output_dim consolidation. New test file test_az528_facade_spine.py covers AC-1..AC-8 with 19 tests, including an AST regression guard that prevents future re-introduction of the consolidated free functions in any strategy module, plus a Risk-1 static check that every strategy's __init__ assigns every spine-required attribute. Archive AZ-528 task spec to done/, bump autodev state to batch 56. Co-authored-by: Cursor <cursoragent@cursor.com>
458 lines
14 KiB
Python
458 lines
14 KiB
Python
"""AZ-528 — c1_vio facade orchestration-spine consolidation tests.
|
|
|
|
Covers AC-1..AC-8 of
|
|
``_docs/02_tasks/todo/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md``:
|
|
|
|
- AC-1: helper module exposes the documented surface.
|
|
- AC-2: ``now_iso()`` returns an aware UTC ISO-8601 timestamp with
|
|
``+00:00`` offset (NOT the ``Z``-suffix variant — that is
|
|
``iso_ts_from_clock`` in AZ-526).
|
|
- AC-3: ``bias_norm`` matches the L2 formula on a hand-checked vector.
|
|
- AC-4: ``se3_from_4x4`` builds a ``gtsam.Pose3`` with the expected
|
|
identity rotation + zero translation when fed ``np.eye(4)``.
|
|
- AC-5: ``FacadeSpine._classify_state`` returns INIT during warm-up,
|
|
TRACKING above the threshold, DEGRADED below it.
|
|
- AC-6: ``FacadeSpine._tick_lost`` demotes TRACKING → DEGRADED on the
|
|
first lost frame and escalates to LOST at the threshold.
|
|
- AC-7: ``FacadeSpine._emit_transition`` emits exactly one
|
|
``vio.health`` FDR record per state change (no record on
|
|
steady-state).
|
|
- AC-8: AST-walk regression guard — zero module-level definitions of
|
|
the consolidated free functions remain in any of the three strategy
|
|
modules. Mirrors the AZ-508 / AZ-526 / AZ-527 pattern.
|
|
|
|
AC-9 (existing AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified) and
|
|
AC-10 (AZ-270 layer lint) ride the existing test files; this module
|
|
does not re-stage them.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Final
|
|
|
|
import gtsam
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.nav import FeatureQuality, ImuBias, VioState
|
|
from gps_denied_onboard.components.c1_vio._facade_spine import (
|
|
FacadeSpine,
|
|
bias_norm,
|
|
frame_image,
|
|
frame_ts_ns,
|
|
now_iso,
|
|
se3_from_4x4,
|
|
)
|
|
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
|
|
|
|
_C1_VIO_SRC: Final[Path] = (
|
|
Path(__file__).resolve().parents[3]
|
|
/ "src"
|
|
/ "gps_denied_onboard"
|
|
/ "components"
|
|
/ "c1_vio"
|
|
)
|
|
_STRATEGY_MODULES: Final[tuple[str, ...]] = (
|
|
"okvis2.py",
|
|
"vins_mono.py",
|
|
"klt_ransac.py",
|
|
)
|
|
_FORBIDDEN_FREE_FUNCS: Final[frozenset[str]] = frozenset(
|
|
{"_now_iso", "_bias_norm", "_se3_from_4x4", "_frame_ts_ns", "_frame_image"}
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-1 — surface.
|
|
|
|
|
|
def test_ac1_helper_module_exposes_documented_surface() -> None:
|
|
# Assert
|
|
assert callable(now_iso)
|
|
assert callable(bias_norm)
|
|
assert callable(se3_from_4x4)
|
|
assert callable(frame_ts_ns)
|
|
assert callable(frame_image)
|
|
assert isinstance(FacadeSpine, type)
|
|
for method_name in ("_classify_state", "_tick_lost", "_emit_transition"):
|
|
assert callable(getattr(FacadeSpine, method_name)), method_name
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-2 — now_iso ISO-8601 UTC with +00:00 offset.
|
|
|
|
|
|
def test_ac2_now_iso_returns_aware_utc_with_plus_offset() -> None:
|
|
# Act
|
|
stamp = now_iso()
|
|
|
|
# Assert
|
|
parsed = datetime.fromisoformat(stamp)
|
|
assert parsed.utcoffset() is not None, "expected aware datetime"
|
|
assert parsed.utcoffset().total_seconds() == 0.0 # type: ignore[union-attr]
|
|
assert stamp.endswith("+00:00"), (
|
|
f"expected '+00:00' offset suffix, not 'Z'; got {stamp!r}"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-3 — bias_norm matches the L2 formula.
|
|
|
|
|
|
def test_ac3_bias_norm_matches_l2_formula() -> None:
|
|
# Arrange
|
|
bias = ImuBias(accel_bias=(1.0, 2.0, 2.0), gyro_bias=(0.0, 0.0, 0.0))
|
|
|
|
# Act
|
|
result = bias_norm(bias)
|
|
|
|
# Assert
|
|
assert result == pytest.approx(3.0)
|
|
|
|
|
|
def test_ac3_bias_norm_includes_gyro_component() -> None:
|
|
# Arrange
|
|
bias = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 4.0, 3.0))
|
|
|
|
# Act
|
|
result = bias_norm(bias)
|
|
|
|
# Assert
|
|
assert result == pytest.approx(5.0)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-4 — se3_from_4x4 returns gtsam.Pose3.
|
|
|
|
|
|
def test_ac4_se3_from_4x4_builds_identity_pose() -> None:
|
|
# Act
|
|
pose = se3_from_4x4(np.eye(4, dtype=np.float64))
|
|
|
|
# Assert
|
|
assert isinstance(pose, gtsam.Pose3)
|
|
translation = np.asarray(pose.translation())
|
|
assert translation.shape == (3,)
|
|
assert np.allclose(translation, np.zeros(3))
|
|
rotation = np.asarray(pose.rotation().matrix())
|
|
assert np.allclose(rotation, np.eye(3))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers for AC-5..AC-7 — minimal test-only mixin subclass that
|
|
# only sets the attributes the mixin reads. No native binding, no
|
|
# config DTO, no real fdr client wiring.
|
|
|
|
|
|
class _SpineHarness(FacadeSpine):
|
|
def __init__(
|
|
self,
|
|
*,
|
|
fdr: FakeFdrSink,
|
|
producer_id: str = "c1_vio.test",
|
|
strategy_label: str = "test_strategy",
|
|
warm_start_max_frames: int = 5,
|
|
feature_threshold: int = 50,
|
|
lost_frame_threshold: int = 3,
|
|
reported_state: VioState = VioState.INIT,
|
|
last_emitted_state: VioState | None = None,
|
|
frames_since_warmup: int = 0,
|
|
consecutive_lost: int = 0,
|
|
latest_bias: ImuBias | None = None,
|
|
) -> None:
|
|
self._fdr = fdr # type: ignore[assignment]
|
|
self._producer_id = producer_id
|
|
self._strategy_label = strategy_label
|
|
self._warm_start_max_frames = warm_start_max_frames
|
|
self._feature_threshold = feature_threshold
|
|
self._lost_frame_threshold = lost_frame_threshold
|
|
self._reported_state = reported_state
|
|
self._last_emitted_state = last_emitted_state
|
|
self._frames_since_warmup = frames_since_warmup
|
|
self._consecutive_lost = consecutive_lost
|
|
self._latest_bias = latest_bias or ImuBias(
|
|
accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)
|
|
)
|
|
|
|
|
|
def _fq(tracked: int) -> FeatureQuality:
|
|
return FeatureQuality(
|
|
tracked=tracked, new=0, lost=0, mean_parallax=1.0, mre_px=0.5
|
|
)
|
|
|
|
|
|
def _fdr() -> FakeFdrSink:
|
|
return FakeFdrSink(producer_id="c1_vio.test", capacity=64)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-5 — _classify_state mirrors the existing logic across all strategies.
|
|
|
|
|
|
def test_ac5_classify_state_returns_init_during_warmup() -> None:
|
|
# Arrange
|
|
spine = _SpineHarness(
|
|
fdr=_fdr(),
|
|
warm_start_max_frames=5,
|
|
feature_threshold=50,
|
|
frames_since_warmup=0,
|
|
reported_state=VioState.INIT,
|
|
)
|
|
|
|
# Act
|
|
state = spine._classify_state(_fq(tracked=80))
|
|
|
|
# Assert
|
|
assert state == VioState.INIT
|
|
|
|
|
|
def test_ac5_classify_state_returns_tracking_after_warmup() -> None:
|
|
# Arrange
|
|
spine = _SpineHarness(
|
|
fdr=_fdr(),
|
|
warm_start_max_frames=5,
|
|
feature_threshold=50,
|
|
frames_since_warmup=5,
|
|
reported_state=VioState.INIT,
|
|
)
|
|
|
|
# Act
|
|
state = spine._classify_state(_fq(tracked=80))
|
|
|
|
# Assert
|
|
assert state == VioState.TRACKING
|
|
|
|
|
|
def test_ac5_classify_state_returns_degraded_below_threshold() -> None:
|
|
# Arrange
|
|
spine = _SpineHarness(
|
|
fdr=_fdr(),
|
|
warm_start_max_frames=5,
|
|
feature_threshold=50,
|
|
frames_since_warmup=10,
|
|
reported_state=VioState.TRACKING,
|
|
)
|
|
|
|
# Act
|
|
state = spine._classify_state(_fq(tracked=10))
|
|
|
|
# Assert
|
|
assert state == VioState.DEGRADED
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-6 — _tick_lost transitions correctly.
|
|
|
|
|
|
def test_ac6_tick_lost_demotes_tracking_to_degraded_first_call() -> None:
|
|
# Arrange
|
|
spine = _SpineHarness(
|
|
fdr=_fdr(),
|
|
lost_frame_threshold=3,
|
|
reported_state=VioState.TRACKING,
|
|
consecutive_lost=0,
|
|
)
|
|
|
|
# Act
|
|
spine._tick_lost("frame_42")
|
|
|
|
# Assert
|
|
assert spine._reported_state == VioState.DEGRADED
|
|
assert spine._consecutive_lost == 1
|
|
|
|
|
|
def test_ac6_tick_lost_escalates_to_lost_at_threshold() -> None:
|
|
# Arrange
|
|
spine = _SpineHarness(
|
|
fdr=_fdr(),
|
|
lost_frame_threshold=3,
|
|
reported_state=VioState.TRACKING,
|
|
consecutive_lost=0,
|
|
)
|
|
|
|
# Act
|
|
spine._tick_lost("frame_42")
|
|
spine._tick_lost("frame_43")
|
|
spine._tick_lost("frame_44")
|
|
|
|
# Assert
|
|
assert spine._reported_state == VioState.LOST
|
|
assert spine._consecutive_lost == 3
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-7 — _emit_transition emits exactly one FDR record per state change.
|
|
|
|
|
|
def test_ac7_emit_transition_no_record_on_steady_state() -> None:
|
|
# Arrange
|
|
fdr = _fdr()
|
|
spine = _SpineHarness(
|
|
fdr=fdr,
|
|
reported_state=VioState.TRACKING,
|
|
last_emitted_state=VioState.TRACKING,
|
|
)
|
|
|
|
# Act
|
|
spine._emit_transition(VioState.TRACKING, "frame_42")
|
|
|
|
# Assert
|
|
assert fdr.records == []
|
|
assert spine._last_emitted_state == VioState.TRACKING
|
|
|
|
|
|
def test_ac7_emit_transition_one_record_per_state_change() -> None:
|
|
# Arrange
|
|
fdr = _fdr()
|
|
spine = _SpineHarness(
|
|
fdr=fdr,
|
|
producer_id="c1_vio.test",
|
|
strategy_label="test_strategy",
|
|
reported_state=VioState.TRACKING,
|
|
last_emitted_state=VioState.TRACKING,
|
|
consecutive_lost=2,
|
|
latest_bias=ImuBias(accel_bias=(1.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)),
|
|
)
|
|
|
|
# Act
|
|
spine._emit_transition(VioState.DEGRADED, "frame_42")
|
|
|
|
# Assert
|
|
assert len(fdr.records) == 1
|
|
record = fdr.records[0]
|
|
assert record.kind == "vio.health"
|
|
assert record.producer_id == "c1_vio.test"
|
|
assert set(record.payload.keys()) == {
|
|
"state",
|
|
"consecutive_lost",
|
|
"bias_norm",
|
|
"strategy_label",
|
|
"frame_id",
|
|
}
|
|
assert record.payload["state"] == VioState.DEGRADED.value
|
|
assert record.payload["consecutive_lost"] == 2
|
|
assert record.payload["bias_norm"] == pytest.approx(1.0)
|
|
assert record.payload["strategy_label"] == "test_strategy"
|
|
assert record.payload["frame_id"] == "frame_42"
|
|
assert spine._last_emitted_state == VioState.DEGRADED
|
|
|
|
|
|
def test_ac7_emit_transition_idempotent_for_repeated_state() -> None:
|
|
# Arrange
|
|
fdr = _fdr()
|
|
spine = _SpineHarness(
|
|
fdr=fdr,
|
|
reported_state=VioState.TRACKING,
|
|
last_emitted_state=VioState.TRACKING,
|
|
)
|
|
|
|
# Act
|
|
spine._emit_transition(VioState.DEGRADED, "frame_42")
|
|
spine._emit_transition(VioState.DEGRADED, "frame_43")
|
|
spine._emit_transition(VioState.DEGRADED, "frame_44")
|
|
|
|
# Assert
|
|
assert len(fdr.records) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-8 — AST regression guard: no duplicated free-function definitions
|
|
# remain in any strategy module. Mirrors the AZ-508 / AZ-526 / AZ-527
|
|
# precedent of AST-based source-asserts so a future strategy author
|
|
# cannot silently re-introduce a 4th local copy.
|
|
|
|
|
|
@pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES)
|
|
def test_ac8_no_duplicated_free_functions_remain_in_strategy_module(
|
|
strategy_module: str,
|
|
) -> None:
|
|
# Arrange
|
|
src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8")
|
|
tree = ast.parse(src)
|
|
|
|
# Act
|
|
offenders = sorted(
|
|
{
|
|
node.name
|
|
for node in tree.body
|
|
if isinstance(node, ast.FunctionDef) and node.name in _FORBIDDEN_FREE_FUNCS
|
|
}
|
|
)
|
|
|
|
# Assert
|
|
assert offenders == [], (
|
|
f"{strategy_module} re-introduced consolidated free functions: "
|
|
f"{offenders}. They live in _facade_spine.py — import them from "
|
|
f"there instead of re-declaring."
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Risk-1 mitigation — strategies set every attribute the mixin reads.
|
|
# AST-based check: each strategy's __init__ writes every required attr
|
|
# before any mixin method could be called externally. This is the
|
|
# spec's "verify all required attributes set after construction"
|
|
# pattern, executed statically so it does not require booting a fake
|
|
# native binding for the assertion.
|
|
|
|
|
|
_REQUIRED_SPINE_ATTRS: Final[frozenset[str]] = frozenset(
|
|
{
|
|
"_reported_state",
|
|
"_frames_since_warmup",
|
|
"_warm_start_max_frames",
|
|
"_feature_threshold",
|
|
"_consecutive_lost",
|
|
"_lost_frame_threshold",
|
|
"_last_emitted_state",
|
|
"_producer_id",
|
|
"_strategy_label",
|
|
"_latest_bias",
|
|
"_fdr",
|
|
}
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES)
|
|
def test_strategy_init_sets_all_required_spine_attributes(
|
|
strategy_module: str,
|
|
) -> None:
|
|
# Arrange
|
|
src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8")
|
|
tree = ast.parse(src)
|
|
strategy_class = next(
|
|
node
|
|
for node in ast.walk(tree)
|
|
if isinstance(node, ast.ClassDef) and node.name.endswith("Strategy")
|
|
)
|
|
init_method = next(
|
|
node
|
|
for node in strategy_class.body
|
|
if isinstance(node, ast.FunctionDef) and node.name == "__init__"
|
|
)
|
|
|
|
# Act
|
|
assigned_attrs: set[str] = set()
|
|
for stmt in ast.walk(init_method):
|
|
if not isinstance(stmt, (ast.Assign, ast.AnnAssign)):
|
|
continue
|
|
targets = stmt.targets if isinstance(stmt, ast.Assign) else [stmt.target]
|
|
for target in targets:
|
|
if (
|
|
isinstance(target, ast.Attribute)
|
|
and isinstance(target.value, ast.Name)
|
|
and target.value.id == "self"
|
|
):
|
|
assigned_attrs.add(target.attr)
|
|
|
|
# Assert
|
|
missing = _REQUIRED_SPINE_ATTRS - assigned_attrs
|
|
assert not missing, (
|
|
f"{strategy_module}.__init__ does not set {sorted(missing)}; "
|
|
f"the FacadeSpine mixin needs every one before any state-machine "
|
|
f"method runs."
|
|
)
|