"""AZ-528 — c1_vio facade orchestration-spine consolidation tests. Covers AC-1..AC-8 of ``_docs/02_tasks/todo/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md``: - AC-1: helper module exposes the documented surface. - AC-2: ``now_iso()`` returns an aware UTC ISO-8601 timestamp with ``+00:00`` offset (NOT the ``Z``-suffix variant — that is ``iso_ts_from_clock`` in AZ-526). - AC-3: ``bias_norm`` matches the L2 formula on a hand-checked vector. - AC-4: ``se3_from_4x4`` builds a ``gtsam.Pose3`` with the expected identity rotation + zero translation when fed ``np.eye(4)``. - AC-5: ``FacadeSpine._classify_state`` returns INIT during warm-up, TRACKING above the threshold, DEGRADED below it. - AC-6: ``FacadeSpine._tick_lost`` demotes TRACKING → DEGRADED on the first lost frame and escalates to LOST at the threshold. - AC-7: ``FacadeSpine._emit_transition`` emits exactly one ``vio.health`` FDR record per state change (no record on steady-state). - AC-8: AST-walk regression guard — zero module-level definitions of the consolidated free functions remain in any of the three strategy modules. Mirrors the AZ-508 / AZ-526 / AZ-527 pattern. AC-9 (existing AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified) and AC-10 (AZ-270 layer lint) ride the existing test files; this module does not re-stage them. """ from __future__ import annotations import ast from datetime import datetime from pathlib import Path from typing import Final import gtsam import numpy as np import pytest from gps_denied_onboard._types.nav import FeatureQuality, ImuBias, VioState from gps_denied_onboard.components.c1_vio._facade_spine import ( FacadeSpine, bias_norm, frame_image, frame_ts_ns, now_iso, se3_from_4x4, ) from gps_denied_onboard.fdr_client.fakes import FakeFdrSink _C1_VIO_SRC: Final[Path] = ( Path(__file__).resolve().parents[3] / "src" / "gps_denied_onboard" / "components" / "c1_vio" ) _STRATEGY_MODULES: Final[tuple[str, ...]] = ( "okvis2.py", "vins_mono.py", "klt_ransac.py", ) _FORBIDDEN_FREE_FUNCS: Final[frozenset[str]] = frozenset( {"_now_iso", "_bias_norm", "_se3_from_4x4", "_frame_ts_ns", "_frame_image"} ) # --------------------------------------------------------------------------- # AC-1 — surface. def test_ac1_helper_module_exposes_documented_surface() -> None: # Assert assert callable(now_iso) assert callable(bias_norm) assert callable(se3_from_4x4) assert callable(frame_ts_ns) assert callable(frame_image) assert isinstance(FacadeSpine, type) for method_name in ("_classify_state", "_tick_lost", "_emit_transition"): assert callable(getattr(FacadeSpine, method_name)), method_name # --------------------------------------------------------------------------- # AC-2 — now_iso ISO-8601 UTC with +00:00 offset. def test_ac2_now_iso_returns_aware_utc_with_plus_offset() -> None: # Act stamp = now_iso() # Assert parsed = datetime.fromisoformat(stamp) assert parsed.utcoffset() is not None, "expected aware datetime" assert parsed.utcoffset().total_seconds() == 0.0 # type: ignore[union-attr] assert stamp.endswith("+00:00"), ( f"expected '+00:00' offset suffix, not 'Z'; got {stamp!r}" ) # --------------------------------------------------------------------------- # AC-3 — bias_norm matches the L2 formula. def test_ac3_bias_norm_matches_l2_formula() -> None: # Arrange bias = ImuBias(accel_bias=(1.0, 2.0, 2.0), gyro_bias=(0.0, 0.0, 0.0)) # Act result = bias_norm(bias) # Assert assert result == pytest.approx(3.0) def test_ac3_bias_norm_includes_gyro_component() -> None: # Arrange bias = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 4.0, 3.0)) # Act result = bias_norm(bias) # Assert assert result == pytest.approx(5.0) # --------------------------------------------------------------------------- # AC-4 — se3_from_4x4 returns gtsam.Pose3. def test_ac4_se3_from_4x4_builds_identity_pose() -> None: # Act pose = se3_from_4x4(np.eye(4, dtype=np.float64)) # Assert assert isinstance(pose, gtsam.Pose3) translation = np.asarray(pose.translation()) assert translation.shape == (3,) assert np.allclose(translation, np.zeros(3)) rotation = np.asarray(pose.rotation().matrix()) assert np.allclose(rotation, np.eye(3)) # --------------------------------------------------------------------------- # Helpers for AC-5..AC-7 — minimal test-only mixin subclass that # only sets the attributes the mixin reads. No native binding, no # config DTO, no real fdr client wiring. class _SpineHarness(FacadeSpine): def __init__( self, *, fdr: FakeFdrSink, producer_id: str = "c1_vio.test", strategy_label: str = "test_strategy", warm_start_max_frames: int = 5, feature_threshold: int = 50, lost_frame_threshold: int = 3, reported_state: VioState = VioState.INIT, last_emitted_state: VioState | None = None, frames_since_warmup: int = 0, consecutive_lost: int = 0, latest_bias: ImuBias | None = None, ) -> None: self._fdr = fdr # type: ignore[assignment] self._producer_id = producer_id self._strategy_label = strategy_label self._warm_start_max_frames = warm_start_max_frames self._feature_threshold = feature_threshold self._lost_frame_threshold = lost_frame_threshold self._reported_state = reported_state self._last_emitted_state = last_emitted_state self._frames_since_warmup = frames_since_warmup self._consecutive_lost = consecutive_lost self._latest_bias = latest_bias or ImuBias( accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0) ) def _fq(tracked: int) -> FeatureQuality: return FeatureQuality( tracked=tracked, new=0, lost=0, mean_parallax=1.0, mre_px=0.5 ) def _fdr() -> FakeFdrSink: return FakeFdrSink(producer_id="c1_vio.test", capacity=64) # --------------------------------------------------------------------------- # AC-5 — _classify_state mirrors the existing logic across all strategies. def test_ac5_classify_state_returns_init_during_warmup() -> None: # Arrange spine = _SpineHarness( fdr=_fdr(), warm_start_max_frames=5, feature_threshold=50, frames_since_warmup=0, reported_state=VioState.INIT, ) # Act state = spine._classify_state(_fq(tracked=80)) # Assert assert state == VioState.INIT def test_ac5_classify_state_returns_tracking_after_warmup() -> None: # Arrange spine = _SpineHarness( fdr=_fdr(), warm_start_max_frames=5, feature_threshold=50, frames_since_warmup=5, reported_state=VioState.INIT, ) # Act state = spine._classify_state(_fq(tracked=80)) # Assert assert state == VioState.TRACKING def test_ac5_classify_state_returns_degraded_below_threshold() -> None: # Arrange spine = _SpineHarness( fdr=_fdr(), warm_start_max_frames=5, feature_threshold=50, frames_since_warmup=10, reported_state=VioState.TRACKING, ) # Act state = spine._classify_state(_fq(tracked=10)) # Assert assert state == VioState.DEGRADED # --------------------------------------------------------------------------- # AC-6 — _tick_lost transitions correctly. def test_ac6_tick_lost_demotes_tracking_to_degraded_first_call() -> None: # Arrange spine = _SpineHarness( fdr=_fdr(), lost_frame_threshold=3, reported_state=VioState.TRACKING, consecutive_lost=0, ) # Act spine._tick_lost("frame_42") # Assert assert spine._reported_state == VioState.DEGRADED assert spine._consecutive_lost == 1 def test_ac6_tick_lost_escalates_to_lost_at_threshold() -> None: # Arrange spine = _SpineHarness( fdr=_fdr(), lost_frame_threshold=3, reported_state=VioState.TRACKING, consecutive_lost=0, ) # Act spine._tick_lost("frame_42") spine._tick_lost("frame_43") spine._tick_lost("frame_44") # Assert assert spine._reported_state == VioState.LOST assert spine._consecutive_lost == 3 # --------------------------------------------------------------------------- # AC-7 — _emit_transition emits exactly one FDR record per state change. def test_ac7_emit_transition_no_record_on_steady_state() -> None: # Arrange fdr = _fdr() spine = _SpineHarness( fdr=fdr, reported_state=VioState.TRACKING, last_emitted_state=VioState.TRACKING, ) # Act spine._emit_transition(VioState.TRACKING, "frame_42") # Assert assert fdr.records == [] assert spine._last_emitted_state == VioState.TRACKING def test_ac7_emit_transition_one_record_per_state_change() -> None: # Arrange fdr = _fdr() spine = _SpineHarness( fdr=fdr, producer_id="c1_vio.test", strategy_label="test_strategy", reported_state=VioState.TRACKING, last_emitted_state=VioState.TRACKING, consecutive_lost=2, latest_bias=ImuBias(accel_bias=(1.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)), ) # Act spine._emit_transition(VioState.DEGRADED, "frame_42") # Assert assert len(fdr.records) == 1 record = fdr.records[0] assert record.kind == "vio.health" assert record.producer_id == "c1_vio.test" assert set(record.payload.keys()) == { "state", "consecutive_lost", "bias_norm", "strategy_label", "frame_id", } assert record.payload["state"] == VioState.DEGRADED.value assert record.payload["consecutive_lost"] == 2 assert record.payload["bias_norm"] == pytest.approx(1.0) assert record.payload["strategy_label"] == "test_strategy" assert record.payload["frame_id"] == "frame_42" assert spine._last_emitted_state == VioState.DEGRADED def test_ac7_emit_transition_idempotent_for_repeated_state() -> None: # Arrange fdr = _fdr() spine = _SpineHarness( fdr=fdr, reported_state=VioState.TRACKING, last_emitted_state=VioState.TRACKING, ) # Act spine._emit_transition(VioState.DEGRADED, "frame_42") spine._emit_transition(VioState.DEGRADED, "frame_43") spine._emit_transition(VioState.DEGRADED, "frame_44") # Assert assert len(fdr.records) == 1 # --------------------------------------------------------------------------- # AC-8 — AST regression guard: no duplicated free-function definitions # remain in any strategy module. Mirrors the AZ-508 / AZ-526 / AZ-527 # precedent of AST-based source-asserts so a future strategy author # cannot silently re-introduce a 4th local copy. @pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES) def test_ac8_no_duplicated_free_functions_remain_in_strategy_module( strategy_module: str, ) -> None: # Arrange src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8") tree = ast.parse(src) # Act offenders = sorted( { node.name for node in tree.body if isinstance(node, ast.FunctionDef) and node.name in _FORBIDDEN_FREE_FUNCS } ) # Assert assert offenders == [], ( f"{strategy_module} re-introduced consolidated free functions: " f"{offenders}. They live in _facade_spine.py — import them from " f"there instead of re-declaring." ) # --------------------------------------------------------------------------- # Risk-1 mitigation — strategies set every attribute the mixin reads. # AST-based check: each strategy's __init__ writes every required attr # before any mixin method could be called externally. This is the # spec's "verify all required attributes set after construction" # pattern, executed statically so it does not require booting a fake # native binding for the assertion. _REQUIRED_SPINE_ATTRS: Final[frozenset[str]] = frozenset( { "_reported_state", "_frames_since_warmup", "_warm_start_max_frames", "_feature_threshold", "_consecutive_lost", "_lost_frame_threshold", "_last_emitted_state", "_producer_id", "_strategy_label", "_latest_bias", "_fdr", } ) @pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES) def test_strategy_init_sets_all_required_spine_attributes( strategy_module: str, ) -> None: # Arrange src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8") tree = ast.parse(src) strategy_class = next( node for node in ast.walk(tree) if isinstance(node, ast.ClassDef) and node.name.endswith("Strategy") ) init_method = next( node for node in strategy_class.body if isinstance(node, ast.FunctionDef) and node.name == "__init__" ) # Act assigned_attrs: set[str] = set() for stmt in ast.walk(init_method): if not isinstance(stmt, (ast.Assign, ast.AnnAssign)): continue targets = stmt.targets if isinstance(stmt, ast.Assign) else [stmt.target] for target in targets: if ( isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == "self" ): assigned_attrs.add(target.attr) # Assert missing = _REQUIRED_SPINE_ATTRS - assigned_attrs assert not missing, ( f"{strategy_module}.__init__ does not set {sorted(missing)}; " f"the FacadeSpine mixin needs every one before any state-machine " f"method runs." )