"""AZ-331 — C1 VioStrategy Protocol + DTO + error + factory conformance.

Covers all 9 ACs of AZ-331 plus NFR-perf-factory and
NFR-reliability-error-family. The factory ACs (AC-4 / AC-5) substitute
fake strategy modules at ``sys.modules`` boundaries so the test never
touches OKVIS2 / VINS-Mono / OpenCV native libraries.
"""

from __future__ import annotations

import dataclasses
import re
import sys
import time
import types
from pathlib import Path

import gtsam
import numpy as np
import pytest

from gps_denied_onboard._types.nav import (
    FeatureQuality,
    ImuBias,
    VioHealth,
    VioOutput,
    VioState,
    WarmStartPose,
)
from gps_denied_onboard.components.c1_vio import (
    C1VioConfig,
    VioDegradedError,
    VioError,
    VioFatalError,
    VioInitializingError,
    VioStrategy,
)
from gps_denied_onboard.components.c1_vio.config import KNOWN_STRATEGIES
from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy

# Markdown contract checked by the AC-8 tests. The path is anchored three
# directory levels above the directory that contains this test file.
_CONTRACT_PATH = (
    Path(__file__).resolve().parents[3]
    / "_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md"
)

# Strategy label -> (dotted module path, strategy class name, BUILD flag
# environment variable). The tests use the module path as the
# ``sys.modules`` key for fakes, the class name as the attribute installed
# on the fake module, and the flag name with ``monkeypatch.setenv`` /
# ``monkeypatch.delenv``.
_STRATEGY_MODULES: dict[str, tuple[str, str, str]] = {
    "okvis2": (
        "gps_denied_onboard.components.c1_vio.okvis2",
        "Okvis2Strategy",
        "BUILD_OKVIS2",
    ),
    "vins_mono": (
        "gps_denied_onboard.components.c1_vio.vins_mono",
        "VinsMonoStrategy",
        "BUILD_VINS_MONO",
    ),
    "klt_ransac": (
        "gps_denied_onboard.components.c1_vio.klt_ransac",
        "KltRansacStrategy",
        "BUILD_KLT_RANSAC",
    ),
}


# ----------------------------------------------------------------------
# Fakes that structurally satisfy the VioStrategy Protocol.
class _FullVioStrategy:
    """Test double that carries the complete strategy method set.

    Stores the constructor arguments and remembers the ``c1_vio``
    strategy label from the config so ``current_strategy_label`` can
    return it unchanged.
    """

    def __init__(self, config: Config, *, fdr_client) -> None:
        self._label = config.components["c1_vio"].strategy
        self.config = config
        self.fdr_client = fdr_client

    def process_frame(self, frame, imu, calibration):
        """Not exercised by these tests; always raises."""
        raise NotImplementedError

    def reset_to_warm_start(self, hint):
        """Accept any hint and do nothing."""
        return None

    def health_snapshot(self):
        """Return a fixed INIT-state health record with zeroed counters."""
        return VioHealth(
            state=VioState.INIT,
            consecutive_lost=0,
            bias_norm=0.0,
        )

    def current_strategy_label(self):
        """Return the label captured from the config at construction."""
        return self._label


class _PartialVioStrategy:
    """Test double lacking ``health_snapshot`` and ``current_strategy_label``."""

    def process_frame(self, frame, imu, calibration):
        """Not exercised by these tests; always raises."""
        raise NotImplementedError

    def reset_to_warm_start(self, hint):
        """Accept any hint and do nothing."""
        return None


def _config_with_strategy(strategy: str) -> Config:
    """Build a ``Config`` whose ``c1_vio`` block selects *strategy*."""
    c1_block = C1VioConfig(strategy=strategy)
    return Config.with_blocks(c1_vio=c1_block)


def _install_fake_strategy(strategy_label: str) -> type:
    """Register a fake strategy module in ``sys.modules`` and return its class.

    The module path and class name come from ``_STRATEGY_MODULES`` for
    *strategy_label*. The returned class is a fresh subclass of
    ``_FullVioStrategy`` renamed to the expected class name.
    """
    target_module, target_class, _ = _STRATEGY_MODULES[strategy_label]

    # A new subclass per call keeps ``isinstance`` checks specific to the
    # strategy installed by the current test.
    class _FakeStrategy(_FullVioStrategy):
        pass

    _FakeStrategy.__name__ = target_class

    stub = types.ModuleType(target_module)
    setattr(stub, target_class, _FakeStrategy)
    sys.modules[target_module] = stub
    return _FakeStrategy


@pytest.fixture
def strategy_module_cleanup():
    """Evict every strategy module from ``sys.modules`` around a factory test."""

    def _evict_all() -> None:
        for entry in _STRATEGY_MODULES.values():
            sys.modules.pop(entry[0], None)

    _evict_all()
    yield
    _evict_all()


def _zero_bias() -> ImuBias:
    """Return an ``ImuBias`` with all-zero accelerometer and gyro terms."""
    zeros = (0.0, 0.0, 0.0)
    return ImuBias(accel_bias=zeros, gyro_bias=zeros)


def _neutral_feature_quality() -> FeatureQuality:
    """Return a fixed ``FeatureQuality`` sample shared by several tests."""
    return FeatureQuality(
        tracked=20,
        new=2,
        lost=1,
        mean_parallax=5.0,
        mre_px=1.0,
    )


def _make_vio_output(frame_id: str = "frame-0001") -> VioOutput:
    """Return a ``VioOutput`` with an identity pose and fixed sample values."""
    identity_pose = gtsam.Pose3(np.eye(4))
    covariance = np.eye(6) * 0.01
    return VioOutput(
        frame_id=frame_id,
        relative_pose_T=identity_pose,
        pose_covariance_6x6=covariance,
        imu_bias=_zero_bias(),
        feature_quality=_neutral_feature_quality(),
        emitted_at_ns=1_000_000_000,
    )


# ----------------------------------------------------------------------
# AC-1: Protocol is conformance-checkable.
def test_ac1_vio_strategy_conformance_full() -> None:
    """A fake with the full method set passes the ``VioStrategy`` isinstance check."""
    config = _config_with_strategy("klt_ransac")
    candidate = _FullVioStrategy(config, fdr_client=None)
    assert isinstance(candidate, VioStrategy)


def test_ac1_vio_strategy_conformance_partial_missing_methods() -> None:
    """A fake missing two methods fails the ``VioStrategy`` isinstance check."""
    incomplete = _PartialVioStrategy()
    assert not isinstance(incomplete, VioStrategy)


# ----------------------------------------------------------------------
# AC-2: frozen DTOs reject mutation.


@pytest.mark.parametrize(
    "dto, field_name, new_value",
    [
        (_make_vio_output(), "frame_id", "renamed"),
        (
            VioHealth(state=VioState.TRACKING, consecutive_lost=0, bias_norm=0.0),
            "state",
            VioState.LOST,
        ),
        (
            WarmStartPose(
                body_T_world=gtsam.Pose3(np.eye(4)),
                velocity_b=(0.0, 0.0, 0.0),
                bias=_zero_bias(),
                captured_at_ns=1_000_000_000,
            ),
            "captured_at_ns",
            0,
        ),
        (_neutral_feature_quality(), "tracked", 0),
    ],
)
def test_ac2_frozen_dtos_reject_mutation(dto, field_name: str, new_value) -> None:
    """Assigning to a DTO field raises and leaves the stored value untouched."""
    before = getattr(dto, field_name)

    with pytest.raises(dataclasses.FrozenInstanceError):
        setattr(dto, field_name, new_value)

    after = getattr(dto, field_name)
    assert after == before


# ----------------------------------------------------------------------
# AC-3: error hierarchy catchable as a single family.
@pytest.mark.parametrize(
    "exc_factory",
    [VioInitializingError, VioDegradedError, VioFatalError],
)
def test_ac3_all_vio_errors_caught_as_family(exc_factory) -> None:
    """Each concrete VIO error is catchable through the ``VioError`` base."""
    with pytest.raises(VioError):
        raise exc_factory("boom")


def test_ac3_unrelated_exception_not_caught_as_family() -> None:
    """A plain ``ValueError`` passes straight through an ``except VioError``."""
    with pytest.raises(ValueError):
        try:
            raise ValueError("not us")
        except VioError:
            # Reaching this handler would mean ValueError is in the family.
            pytest.fail("ValueError must not be caught as VioError")


def test_ac3_strategy_not_available_outside_family() -> None:
    """``StrategyNotAvailableError`` passes through an ``except VioError``."""
    with pytest.raises(StrategyNotAvailableError):
        try:
            raise StrategyNotAvailableError("composition-time")
        except VioError:
            pytest.fail(
                "StrategyNotAvailableError is a composition-root error "
                "and MUST NOT be in the c1 VioError family"
            )


# ----------------------------------------------------------------------
# AC-4 + AC-5: factory honours config + BUILD flag gate.


@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac4_build_vio_strategy_returns_protocol_impl(
    monkeypatch, strategy_module_cleanup, strategy
) -> None:
    """With the flag ON and a fake installed, the factory builds that fake."""
    build_flag = _STRATEGY_MODULES[strategy][2]
    monkeypatch.setenv(build_flag, "ON")
    expected_cls = _install_fake_strategy(strategy)

    built = build_vio_strategy(_config_with_strategy(strategy), fdr_client=object())

    assert isinstance(built, expected_cls)
    assert isinstance(built, VioStrategy)


@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac5_build_vio_strategy_flag_off_no_import(
    monkeypatch, strategy_module_cleanup, strategy
) -> None:
    """With the flag unset the factory raises and leaves the module unimported."""
    dotted_module, _class_name, build_flag = _STRATEGY_MODULES[strategy]
    monkeypatch.delenv(build_flag, raising=False)
    cfg = _config_with_strategy(strategy)

    with pytest.raises(StrategyNotAvailableError) as caught:
        build_vio_strategy(cfg, fdr_client=object())

    # The error text must name both the strategy and the gating flag.
    message = str(caught.value)
    assert strategy in message
    assert build_flag in message
    assert dotted_module not in sys.modules


# Which strategies still have NO concrete Python module on disk?
# Once an AZ-332 / AZ-333 / AZ-334 implementation lands, the
# `flag_on_but_module_missing` semantic shifts: the factory's import
# succeeds, the constructor fails on missing native binding or other
# prerequisite. We assert the meaningful-error-before-first-frame
# property holds for BOTH cases — the exception class differs by
# strategy.
_STRATEGIES_WITHOUT_PY_MODULE: tuple[str, ...] = ("vins_mono", "klt_ransac")


@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac5_build_vio_strategy_flag_on_but_module_missing(
    monkeypatch, strategy_module_cleanup, strategy
) -> None:
    """Flag ON without a usable strategy still fails loudly at build time."""
    build_flag = _STRATEGY_MODULES[strategy][2]
    monkeypatch.setenv(build_flag, "ON")
    cfg = _config_with_strategy(strategy)

    if strategy in _STRATEGIES_WITHOUT_PY_MODULE:
        # Module not yet implemented — factory's __import__ raises
        # ModuleNotFoundError, rewrapped into StrategyNotAvailableError.
        expected_exc, expected_fragment = StrategyNotAvailableError, strategy
    else:
        # Module IS implemented (AZ-332). Factory import succeeds, then
        # the strategy constructor fails on missing native binding —
        # which the strategy MUST surface as VioFatalError BEFORE any
        # frame is processed (the AC-5 spirit: no silent fall-through).
        expected_exc, expected_fragment = VioFatalError, "native binding"

    with pytest.raises(expected_exc) as caught:
        build_vio_strategy(cfg, fdr_client=object())
    assert expected_fragment in str(caught.value)


# ----------------------------------------------------------------------
# AC-6: unknown strategy label rejected at config load.
@pytest.mark.parametrize(
    "bad_label",
    ["openvslam", "orbslam3", "OKVIS2", "okvis", ""],
)
def test_ac6_unknown_strategy_rejected_at_config_load(bad_label: str) -> None:
    """An unknown label raises ``ConfigError`` whose text lists every valid label."""
    with pytest.raises(ConfigError) as caught:
        C1VioConfig(strategy=bad_label)

    message = str(caught.value)
    unlisted = [label for label in KNOWN_STRATEGIES if label not in message]
    assert not unlisted


# ----------------------------------------------------------------------
# AC-7: current_strategy_label() matches config exactly.


@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac7_current_strategy_label_matches_config(
    monkeypatch, strategy_module_cleanup, strategy
) -> None:
    """The built strategy reports exactly the label the config selected."""
    build_flag = _STRATEGY_MODULES[strategy][2]
    monkeypatch.setenv(build_flag, "ON")
    _install_fake_strategy(strategy)
    cfg = _config_with_strategy(strategy)

    built = build_vio_strategy(cfg, fdr_client=object())

    assert built.current_strategy_label() == strategy
    configured_label = cfg.components["c1_vio"].strategy
    assert built.current_strategy_label() == configured_label


# ----------------------------------------------------------------------
# AC-8: contract file matches Protocol shape.
# Matches a markdown table row whose first cell is a back-ticked
# lower-case identifier and captures that identifier as group ``name``.
# The group must be named: ``_methods_from_contract`` reads
# ``m.group("name")``, and a bare ``(?P[...])`` is not valid ``re`` syntax.
_METHOD_TABLE_RE = re.compile(
    r"^\|\s*`(?P<name>[a-z_][a-z0-9_]*)`\s*\|", re.MULTILINE
)


def _methods_from_contract() -> set[str]:
    """Return the method names listed in the contract's Protocol surface table.

    Reads ``_CONTRACT_PATH``, isolates the text from the
    ``### Protocol surface`` heading up to the next ``### `` heading (or
    end of file), and collects the first-cell identifier of every table
    row matched by ``_METHOD_TABLE_RE``.

    Raises:
        ValueError: if the ``### Protocol surface`` heading is absent.
    """
    heading = "### Protocol surface"
    text = _CONTRACT_PATH.read_text(encoding="utf-8")
    surface_start = text.index(heading)
    next_section = text.find("\n### ", surface_start + len(heading))
    if next_section != -1:
        section = text[surface_start:next_section]
    else:
        section = text[surface_start:]
    return {m.group("name") for m in _METHOD_TABLE_RE.finditer(section)}


def _protocol_methods(proto: type) -> set[str]:
    """Return the names of public callable attributes found on *proto*."""
    return {
        name
        for name in dir(proto)
        if not name.startswith("_") and callable(getattr(proto, name))
    }


def test_ac8_contract_methods_match_protocol() -> None:
    """The contract table and the Protocol declare the same method names."""
    contract_methods = _methods_from_contract()
    protocol_methods = _protocol_methods(VioStrategy)

    missing_in_protocol = contract_methods - protocol_methods
    missing_in_contract = protocol_methods - contract_methods

    assert not missing_in_protocol, (
        "Methods declared in vio_strategy_protocol.md Shape section but "
        f"missing from the Protocol: {sorted(missing_in_protocol)}"
    )
    assert not missing_in_contract, (
        "Methods present on the Protocol but missing from the contract "
        f"Shape section: {sorted(missing_in_contract)}"
    )


def test_ac8_contract_lists_all_three_error_subtypes() -> None:
    """The contract text mentions each concrete VIO error subtype by name."""
    text = _CONTRACT_PATH.read_text(encoding="utf-8")
    # A tuple (not a set) keeps the check order, and therefore the first
    # reported failure, deterministic across runs.
    for name in ("VioInitializingError", "VioDegradedError", "VioFatalError"):
        assert name in text, f"Contract file is missing the documented error subtype {name!r}"


# ----------------------------------------------------------------------
# AC-9: VioOutput.frame_id echo invariant is typed.


def test_ac9_vio_output_frame_id_is_typed_str() -> None:
    """``VioOutput.frame_id`` annotation is ``str`` per AZ-331 AC-9.

    With ``from __future__ import annotations`` PEP-563 stringifies every
    annotation at module load, so ``__annotations__`` returns the literal
    ``'str'`` (this presumes the module defining ``VioOutput`` uses that
    future import). Compare against the string to avoid the full
    ``get_type_hints`` forward-ref resolution path (which would try to
    resolve neighbouring TYPE_CHECKING-only names like :class:`SE3`).
    """
    annotation = VioOutput.__annotations__["frame_id"]
    assert annotation == "str", f"frame_id annotation should be 'str'; got {annotation!r}"


def test_ac9_vio_output_docstring_documents_echo_invariant() -> None:
    """The ``VioOutput`` docstring mentions both "echo" and "frame_id"."""
    docstring = VioOutput.__doc__ or ""
    assert "echo" in docstring.lower(), (
        "VioOutput docstring must document the frame_id echo invariant "
        "(MUST equal NavCameraFrame.frame_id from the input frame)"
    )
    assert "frame_id" in docstring.lower()


# ----------------------------------------------------------------------
# NFRs.


@pytest.mark.parametrize(
    "exc_type",
    [VioInitializingError, VioDegradedError, VioFatalError],
)
def test_nfr_reliability_all_vio_errors_subclass_family(exc_type) -> None:
    """Each concrete VIO error class derives from ``VioError``."""
    assert issubclass(exc_type, VioError)


def test_nfr_reliability_strategy_not_available_not_in_family() -> None:
    """``StrategyNotAvailableError`` does not derive from ``VioError``."""
    assert not issubclass(StrategyNotAvailableError, VioError)


def test_nfr_perf_factory_under_200ms_p99(monkeypatch, strategy_module_cleanup) -> None:
    """Factory p99 ≤ 200 ms across 1000 calls (NFR-perf-factory)."""
    strategy = "klt_ransac"
    _, _, flag = _STRATEGY_MODULES[strategy]
    monkeypatch.setenv(flag, "ON")
    _install_fake_strategy(strategy)
    config = _config_with_strategy(strategy)

    durations_ms: list[float] = []
    for _ in range(1000):
        t0 = time.perf_counter()
        build_vio_strategy(config, fdr_client=object())
        durations_ms.append((time.perf_counter() - t0) * 1000.0)

    # Index 990 of the 1000 sorted samples is taken as the p99 value.
    durations_ms.sort()
    p99 = durations_ms[int(0.99 * len(durations_ms))]
    assert p99 <= 200.0, f"build_vio_strategy() p99={p99:.3f} ms exceeds 200 ms NFR"


# ----------------------------------------------------------------------
# Surface coverage.
def test_vio_state_enum_surface() -> None:
    """``VioState`` exposes exactly the four expected member values."""
    observed = {member.value for member in VioState}
    assert observed == {"init", "tracking", "degraded", "lost"}


def test_c1_config_lost_frame_threshold_validation() -> None:
    """Zero and negative ``lost_frame_threshold`` values raise ``ConfigError``."""
    for rejected in (0, -1):
        with pytest.raises(ConfigError):
            C1VioConfig(lost_frame_threshold=rejected)


def test_c1_config_warm_start_max_frames_validation() -> None:
    """A zero ``warm_start_max_frames`` raises ``ConfigError``."""
    with pytest.raises(ConfigError):
        C1VioConfig(warm_start_max_frames=0)


def test_feature_quality_dto_constructs_and_freezes() -> None:
    """``FeatureQuality`` can be built and then refuses attribute assignment."""
    sample = _neutral_feature_quality()
    with pytest.raises(dataclasses.FrozenInstanceError):
        setattr(sample, "mre_px", 99.0)


def test_warm_start_pose_constructs_with_zero_bias() -> None:
    """``WarmStartPose`` keeps the timestamp and zero bias it was built with."""
    capture_ns = 1_000_000_000
    zero_vec = (0.0, 0.0, 0.0)

    pose_hint = WarmStartPose(
        body_T_world=gtsam.Pose3(np.eye(4)),
        velocity_b=zero_vec,
        bias=_zero_bias(),
        captured_at_ns=capture_ns,
    )

    assert pose_hint.captured_at_ns == capture_ns
    assert pose_hint.bias.accel_bias == zero_vec