"""AZ-333 — :class:`VinsMonoStrategy` acceptance criteria coverage. Covers AC-1 through AC-10 (with AC-9 + NFR-perf tagged ``@pytest.mark.tier2``; the AZ-333 task spec exempts this strategy from the C1-PT-01 ≤ 80 ms p95 hard threshold but still asserts the honest-covariance monotonicity invariant on tier2 with the real binding). Uses the ``fake_vins_mono_binding`` fixture from ``conftest.py`` to script backend responses — the task spec explicitly permits a fake binding for backend-exception injection (AC-3 / AC-6 / AC-7) and by extension the rest of the Python-facade-only AC suite. Mirrors the AZ-332 ``test_okvis2_strategy.py`` layout deliberately: the AZ-331 factory produces both via the same `(config, *, fdr_client)` shape and the IT-12 comparative-study harness expects the two to behave identically through the Python facade. """ from __future__ import annotations from datetime import datetime, timezone import gtsam import numpy as np import pytest from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.nav import ( ImuBias, ImuSample, ImuWindow, NavCameraFrame, VioOutput, VioState, WarmStartPose, ) from gps_denied_onboard.components.c1_vio import ( C1VioConfig, VinsMonoConfig, VioError, VioFatalError, VioInitializingError, ) from gps_denied_onboard.config.schema import Config, RuntimeConfig from gps_denied_onboard.fdr_client.client import FdrClient from gps_denied_onboard.fdr_client.records import FdrRecord from tests.unit.c1_vio.conftest import ( FakeVinsMonoBackend, FakeVinsMonoFatalException, FakeVinsMonoInitException, FakeVinsMonoOptimizationException, ScriptedOutput, ) def _zero_bias() -> ImuBias: return ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)) def _calibration() -> CameraCalibration: return CameraCalibration( camera_id="test-cam", intrinsics_3x3=np.eye(3, dtype=np.float64), distortion=np.zeros(4, dtype=np.float64), body_to_camera_se3=np.eye(4, dtype=np.float64), acquisition_method="unit-test-static", metadata={}, ) def _frame(idx: int = 1, ts_ns: int = 1_000_000_000) -> NavCameraFrame: return NavCameraFrame( frame_id=idx, timestamp=datetime.fromtimestamp(ts_ns * 1e-9, tz=timezone.utc), image=np.zeros((4, 4, 3), dtype=np.uint8), camera_calibration_id="test-cam", ) def _imu_window(ts_ns_start: int = 999_000_000, n: int = 3) -> ImuWindow: samples = tuple( ImuSample( ts_ns=ts_ns_start + i * 5_000_000, accel_xyz=(0.0, 0.0, 9.81), gyro_xyz=(0.0, 0.0, 0.0), ) for i in range(n) ) return ImuWindow( samples=samples, ts_start_ns=samples[0].ts_ns, ts_end_ns=samples[-1].ts_ns, ) def _warm_start_hint() -> WarmStartPose: return WarmStartPose( body_T_world=gtsam.Pose3(np.eye(4)), velocity_b=(0.5, 0.0, 0.0), bias=ImuBias( accel_bias=(0.01, -0.02, 0.0), gyro_bias=(0.003, 0.0, -0.001), ), captured_at_ns=1_000_000_000, ) def _config( vins_cfg: VinsMonoConfig | None = None, lost_frame_threshold: int = 9, warm_start_max_frames: int = 5, ) -> Config: return Config.with_blocks( c1_vio=C1VioConfig( strategy="vins_mono", lost_frame_threshold=lost_frame_threshold, warm_start_max_frames=warm_start_max_frames, vins_mono=vins_cfg or VinsMonoConfig(), ), runtime=RuntimeConfig(camera_calibration_path=""), ) @pytest.fixture def fdr_client() -> FdrClient: return FdrClient(producer_id="c1_vio.vins_mono", capacity=256, _emit_diag_log=False) def _build_strategy( fdr_client: FdrClient, config: Config | None = None, ): """Lazy import after the fake binding is installed in sys.modules.""" from gps_denied_onboard.components.c1_vio.vins_mono import VinsMonoStrategy return VinsMonoStrategy(config or _config(), fdr_client=fdr_client) def _drain(fdr_client: FdrClient) -> list[FdrRecord]: return fdr_client.drain(max_records=1024) # =========================================================================== # AC-1: current_strategy_label returns "vins_mono". def test_ac1_current_strategy_label_returns_vins_mono( fake_vins_mono_binding, fdr_client ) -> None: strategy = _build_strategy(fdr_client) assert strategy.current_strategy_label() == "vins_mono" # =========================================================================== # AC-2: process_frame returns VioOutput with echoed frame_id, SPD cov, bias. def test_ac2_process_frame_returns_vio_output_with_frame_id( fake_vins_mono_binding, fdr_client ) -> None: config = _config(warm_start_max_frames=1) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] backend.script(ScriptedOutput(produced=True)) out = strategy.process_frame(_frame(idx=42), _imu_window(), _calibration()) assert isinstance(out, VioOutput) assert out.frame_id == "42" assert out.pose_covariance_6x6.shape == (6, 6) assert np.allclose(out.pose_covariance_6x6, out.pose_covariance_6x6.T) eigvals = np.linalg.eigvalsh(out.pose_covariance_6x6) assert np.all(eigvals > 0), "covariance must be SPD" assert out.imu_bias is not None assert out.feature_quality.tracked > 0 # =========================================================================== # AC-3: backend exceptions rewrap into VioError with __cause__ chain. @pytest.mark.parametrize( "fake_exc_cls, expected_facade_exc", [ (FakeVinsMonoInitException, VioInitializingError), (FakeVinsMonoFatalException, VioFatalError), ], ) def test_ac3_backend_exceptions_rewrap_to_vio_error_family( fake_vins_mono_binding, fdr_client, fake_exc_cls, expected_facade_exc ) -> None: config = _config(warm_start_max_frames=1) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] backend.script(ScriptedOutput(raise_with=fake_exc_cls("boom from backend"))) with pytest.raises(expected_facade_exc) as exc_info: strategy.process_frame(_frame(), _imu_window(), _calibration()) assert isinstance(exc_info.value, VioError) assert isinstance(exc_info.value.__cause__, fake_exc_cls) def test_ac3_optimization_exception_during_init_rewraps_to_initializing( fake_vins_mono_binding, fdr_client ) -> None: config = _config(warm_start_max_frames=5, lost_frame_threshold=9) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] backend.script( ScriptedOutput(raise_with=FakeVinsMonoOptimizationException("opt fail")) ) with pytest.raises(VioInitializingError) as exc_info: strategy.process_frame(_frame(), _imu_window(), _calibration()) assert isinstance(exc_info.value.__cause__, FakeVinsMonoOptimizationException) def test_ac3_unmapped_runtime_error_rewraps_to_vio_fatal( fake_vins_mono_binding, fdr_client ) -> None: config = _config(warm_start_max_frames=1) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] backend.script(ScriptedOutput(raise_with=RuntimeError("library leaked this"))) with pytest.raises(VioFatalError) as exc_info: strategy.process_frame(_frame(), _imu_window(), _calibration()) assert isinstance(exc_info.value.__cause__, RuntimeError) # =========================================================================== # AC-4: reset_to_warm_start clears state and seeds the hint; idempotent. def test_ac4_reset_to_warm_start_clears_and_seeds( fake_vins_mono_binding, fdr_client ) -> None: strategy = _build_strategy(fdr_client) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] hint = _warm_start_hint() strategy.reset_to_warm_start(hint) assert backend.reset_call_count == 1 health = strategy.health_snapshot() assert health.state == VioState.INIT assert health.consecutive_lost == 0 # bias_norm > 0 because the hint carries a non-zero bias assert health.bias_norm > 0.0 def test_ac4_reset_to_warm_start_is_idempotent( fake_vins_mono_binding, fdr_client ) -> None: strategy = _build_strategy(fdr_client) hint = _warm_start_hint() strategy.reset_to_warm_start(hint) strategy.reset_to_warm_start(hint) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] assert backend.reset_call_count == 2 # =========================================================================== # AC-5: INIT initially -> TRACKING after warm_start_max_frames frames. def test_ac5_health_snapshot_init_then_tracking( fake_vins_mono_binding, fdr_client ) -> None: config = _config(warm_start_max_frames=3) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] assert strategy.health_snapshot().state == VioState.INIT backend.script( ScriptedOutput(produced=True), ScriptedOutput(produced=True), ScriptedOutput(produced=True), ) for i in range(3): strategy.process_frame( _frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000), _imu_window(ts_ns_start=999_000_000 + i * 100_000_000), _calibration(), ) assert strategy.health_snapshot().state == VioState.TRACKING # =========================================================================== # AC-6: DEGRADED on feature loss; VioOutput STILL emitted (not raised); # covariance Frobenius norm strictly increases on the degraded frame. def test_ac6_degraded_on_feature_loss_emits_vio_output( fake_vins_mono_binding, fdr_client ) -> None: config = _config(warm_start_max_frames=1) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] healthy_payload = { "tracked_features": 80, "pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.01, } degraded_payload = { "tracked_features": 5, "pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.5, } backend.script( ScriptedOutput(produced=True, payload=healthy_payload), ScriptedOutput(produced=True, payload=degraded_payload), ) healthy_out = strategy.process_frame(_frame(idx=1), _imu_window(), _calibration()) degraded_out = strategy.process_frame( _frame(idx=2, ts_ns=1_100_000_000), _imu_window(ts_ns_start=1_099_000_000), _calibration(), ) assert isinstance(degraded_out, VioOutput), "DEGRADED frame MUST emit output" assert strategy.health_snapshot().state == VioState.DEGRADED healthy_norm = np.linalg.norm(healthy_out.pose_covariance_6x6, ord="fro") degraded_norm = np.linalg.norm(degraded_out.pose_covariance_6x6, ord="fro") assert degraded_norm > healthy_norm, ( f"Frobenius norm must increase on DEGRADED frame " f"(healthy={healthy_norm}, degraded={degraded_norm})" ) # =========================================================================== # AC-7: After lost_frame_threshold consecutive failures, raise VioFatalError; # state == LOST. def test_ac7_sustained_loss_raises_vio_fatal_error( fake_vins_mono_binding, fdr_client ) -> None: config = _config(lost_frame_threshold=3, warm_start_max_frames=1) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] backend.script( ScriptedOutput(raise_with=FakeVinsMonoOptimizationException("loss-1")), ScriptedOutput(raise_with=FakeVinsMonoOptimizationException("loss-2")), ScriptedOutput(raise_with=FakeVinsMonoOptimizationException("loss-3")), ) with pytest.raises(VioInitializingError): strategy.process_frame(_frame(idx=1), _imu_window(), _calibration()) with pytest.raises(VioInitializingError): strategy.process_frame( _frame(idx=2, ts_ns=1_100_000_000), _imu_window(ts_ns_start=1_099_000_000), _calibration(), ) with pytest.raises(VioFatalError): strategy.process_frame( _frame(idx=3, ts_ns=1_200_000_000), _imu_window(ts_ns_start=1_199_000_000), _calibration(), ) assert strategy.health_snapshot().state == VioState.LOST # =========================================================================== # AC-8: BUILD_VINS_MONO=OFF lazy-import guarantee — complementary check. # (Primary AC-8 coverage lives in test_protocol_conformance.py via the # AZ-331 factory which gates BEFORE constructor.) def test_ac8_strategy_module_not_imported_at_package_load( monkeypatch, ) -> None: """Importing `c1_vio` itself MUST NOT load `c1_vio.vins_mono`. Risk-2 / Risk-3 guard — the factory respects the BUILD_VINS_MONO flag and only triggers the import on demand. This complements the test_ac5_build_vio_strategy_flag_off_no_import test in test_protocol_conformance.py. """ import sys sys.modules.pop("gps_denied_onboard.components.c1_vio.vins_mono", None) sys.modules.pop("gps_denied_onboard.components.c1_vio", None) import importlib importlib.import_module("gps_denied_onboard.components.c1_vio") assert "gps_denied_onboard.components.c1_vio.vins_mono" not in sys.modules # =========================================================================== # AC-9: tier2 — honest covariance Frobenius monotonically non-decreasing # across a controlled-degradation window. @pytest.mark.tier2 def test_ac9_honest_covariance_monotonic_during_degraded( fake_vins_mono_binding, fdr_client ) -> None: """Tier-2: 60 s controlled-degradation fixture; covariance MUST not shrink during the DEGRADED window. The fake binding here exercises the facade's enforcement contract — real validation against VINS-Mono's marginalised information matrix is the Jetson-side follow-up that wires :class:`vins_estimator::Estimator` (skeleton today). """ config = _config(warm_start_max_frames=1) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] base_cov = np.eye(6, dtype=np.float64) * 0.01 backend.script( ScriptedOutput(produced=True, payload={"tracked_features": 80}), *[ ScriptedOutput( produced=True, payload={ "tracked_features": 10, "pose_covariance_6x6": base_cov * (1.0 + i), }, ) for i in range(5) ], ) outputs = [] for i in range(6): outputs.append( strategy.process_frame( _frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000), _imu_window(ts_ns_start=999_000_000 + i * 100_000_000), _calibration(), ) ) import itertools degraded_outputs = outputs[1:] # 5 DEGRADED frames norms = [np.linalg.norm(o.pose_covariance_6x6, ord="fro") for o in degraded_outputs] for prev, curr in itertools.pairwise(norms): assert curr >= prev, ( f"covariance Frobenius norm must be monotonically non-decreasing " f"during DEGRADED; got prev={prev}, curr={curr}" ) # =========================================================================== # AC-10: Exactly one vio.health record per state transition; no spam on # steady-state. def test_ac10_fdr_vio_health_emitted_per_transition( fake_vins_mono_binding, fdr_client ) -> None: config = _config(warm_start_max_frames=1) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] pre_records = _drain(fdr_client) assert pre_records == [], "construction must not emit vio.health" backend.script( ScriptedOutput(produced=True, payload={"tracked_features": 80}), ScriptedOutput(produced=True, payload={"tracked_features": 80}), ScriptedOutput(produced=True, payload={"tracked_features": 10}), ScriptedOutput(produced=True, payload={"tracked_features": 80}), ) for i in range(4): strategy.process_frame( _frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000), _imu_window(ts_ns_start=999_000_000 + i * 100_000_000), _calibration(), ) records = _drain(fdr_client) assert all(r.kind == "vio.health" for r in records) states = [r.payload["state"] for r in records] assert states == ["tracking", "degraded", "tracking"], ( f"unexpected transition sequence: {states}" ) # =========================================================================== # NFR-perf-document (tier2): VINS-Mono p95 is *recorded*, not bounded. # Per AZ-333 task spec NFR-perf, no hard threshold — Step 9 / E-BBT # comparative report consumes the p50/p95 numbers. @pytest.mark.tier2 def test_nfr_perf_process_frame_records_p95(fake_vins_mono_binding, fdr_client) -> None: """Tier-2: Real VINS-Mono binding + Derkachi-class fixture. Unlike :class:`Okvis2Strategy`, VINS-Mono is research-only and not bound by C1-PT-01's ≤ 80 ms p95. We record p95 here and assert only that it can be measured (i.e. process_frame completes 200x without deadlock or unbounded growth). The Step 9 / E-BBT comparative-study report ingests the produced number. """ import time config = _config(warm_start_max_frames=1) strategy = _build_strategy(fdr_client, config) backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined] n = 200 backend.script(*[ScriptedOutput(produced=True) for _ in range(n)]) durations_ms: list[float] = [] for i in range(n): t0 = time.perf_counter() strategy.process_frame( _frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000), _imu_window(ts_ns_start=999_000_000 + i * 100_000_000), _calibration(), ) durations_ms.append((time.perf_counter() - t0) * 1000.0) durations_ms.sort() p95 = durations_ms[int(0.95 * len(durations_ms))] assert p95 >= 0.0, f"VinsMono p95 must be measurable (got {p95})" # Loose sanity ceiling so a regression to seconds-per-frame fails the # tier2 run; VINS-Mono is best-effort but not pathologically slow. assert p95 <= 5_000.0, ( f"VinsMono process_frame p95={p95:.3f} ms grew pathologically " "(>5 s); investigate before publishing comparative report" ) # =========================================================================== # Construction guards. def test_construct_with_wrong_strategy_label_raises( fake_vins_mono_binding, fdr_client ) -> None: """Constructing directly with a non-vins_mono strategy is a developer bug.""" bad_config = Config.with_blocks(c1_vio=C1VioConfig(strategy="klt_ransac")) from gps_denied_onboard.components.c1_vio.vins_mono import VinsMonoStrategy with pytest.raises(VioFatalError): VinsMonoStrategy(bad_config, fdr_client=fdr_client) def test_build_via_factory_returns_vins_mono_strategy( fake_vins_mono_binding, fdr_client, monkeypatch ) -> None: """End-to-end factory wiring smoke — exercises the BUILD flag gate + lazy import path the conformance tests don't touch for the real `VinsMonoStrategy` class. """ monkeypatch.setenv("BUILD_VINS_MONO", "ON") from gps_denied_onboard.components.c1_vio.vins_mono import VinsMonoStrategy from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy instance = build_vio_strategy(_config(), fdr_client=fdr_client) assert isinstance(instance, VinsMonoStrategy) assert instance.current_strategy_label() == "vins_mono"