"""Shared fixtures for ``tests/unit/c1_vio/`` (AZ-332 + AZ-333).

Provides scriptable fake binding modules installed at the ``sys.modules``
boundary BEFORE each strategy's lazy import inside the constructor runs.
Each fake mirrors its real binding's surface (``Okvis2Backend`` /
``VinsMonoBackend`` class + 3 exception types) so the Python facades can be
exercised on macOS dev + GitHub Actions Linux runner without the real
OKVIS2 / VINS-Mono / pybind11 native libs. Each task spec explicitly permits
this for AC-3, AC-6, AC-7 backend-exception injection (and by extension the
rest of the AC suite that exercises the Python facade only).

The :class:`FakeOkvis2Backend` and :class:`FakeVinsMonoBackend` classes share
the same scripted-output shape (:class:`ScriptedOutput`) because the AZ-331
Protocol forces both strategies to surface the same payload contract —
keeping the fakes shape-compatible cuts duplication and makes the IT-12
comparative harness trivially substitutable. Both fakes inherit their whole
behaviour from one private base class so they cannot drift apart.
"""

from __future__ import annotations

import sys
import types
from collections import deque
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import Any, Final

import numpy as np
import pytest

_BINDING_MODULE_NAME: Final[str] = "gps_denied_onboard.components.c1_vio._native.okvis2_binding"
_STRATEGY_MODULE_NAME: Final[str] = "gps_denied_onboard.components.c1_vio.okvis2"

_VINS_BINDING_MODULE_NAME: Final[str] = (
    "gps_denied_onboard.components.c1_vio._native.vins_mono_binding"
)
_VINS_STRATEGY_MODULE_NAME: Final[str] = (
    "gps_denied_onboard.components.c1_vio.vins_mono"
)


# ---------------------------------------------------------------------------
# Fake exception types — Python classes mirroring the C++ side.


class FakeOkvisInitException(Exception):
    """Fake exported as ``OkvisInitException`` on the fake OKVIS2 binding."""


class FakeOkvisFatalException(Exception):
    """Fake exported as ``OkvisFatalException`` on the fake OKVIS2 binding."""


class FakeOkvisOptimizationException(Exception):
    """Fake exported as ``OkvisOptimizationException`` on the fake OKVIS2 binding."""


# ---------------------------------------------------------------------------
# Scripted output payload — what the fake backend pops on each add_frame.


@dataclass
class ScriptedOutput:
    """A single scripted backend response.

    ``produced`` mirrors the real binding's ``add_frame`` return: True means
    a new estimator output is available via
    :meth:`Okvis2Backend.get_latest_output`. ``raise_with`` (if not None) is
    raised from ``add_frame`` instead of producing an output.
    """

    # Value returned from ``add_frame`` for this scripted step.
    produced: bool = True
    # Exception instance raised by ``add_frame``; takes priority over
    # ``produced`` and ``payload``.
    raise_with: Exception | None = None
    # Overrides merged on top of the default payload when ``produced`` is True.
    payload: dict[str, Any] = field(default_factory=dict)


def _make_default_payload(frame_id: str = "frame-0001") -> dict[str, Any]:
    """A 'tracking' payload — SPD covariance, tracked > threshold.

    Every call builds a new dict with newly allocated arrays, so callers may
    mutate the result without affecting later payloads.
    """
    return {
        "frame_id": frame_id,
        "pose_T_world_body": np.eye(4, dtype=np.float64),
        "pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.01,
        "accel_bias": np.zeros(3, dtype=np.float64),
        "gyro_bias": np.zeros(3, dtype=np.float64),
        "tracked_features": 80,
        "new_features": 3,
        "lost_features": 1,
        "mean_parallax": 5.0,
        "mre_px": 0.8,
        "emitted_at_ns": 1_000_000_000,
    }


# ---------------------------------------------------------------------------
# Scriptable fake backends.


class _ScriptedFakeBackend:
    """Behaviour shared by :class:`FakeOkvis2Backend` and :class:`FakeVinsMonoBackend`.

    The fake records every call it receives and answers ``add_frame`` from a
    FIFO queue of :class:`ScriptedOutput` items. When the queue is empty it
    reports a default 'tracking' payload.
    """

    def __init__(
        self,
        yaml_config: str,
        camera_intrinsics_3x3: np.ndarray,
    ) -> None:
        """Store the constructor arguments and start with empty recordings."""
        self.yaml_config = yaml_config
        self.camera_intrinsics_3x3 = np.asarray(camera_intrinsics_3x3, dtype=np.float64)
        self._scripted: deque[ScriptedOutput] = deque()
        self._latest: dict[str, Any] | None = None
        self._frames_seen: list[tuple[str, int]] = []
        self._imu_samples: list[tuple[int, np.ndarray, np.ndarray]] = []
        self._reset_calls: int = 0
        # Nothing in this fake advances the state; only ``reset`` rewrites
        # ``state`` and ``consecutive_lost``.
        self._health: dict[str, Any] = {
            "state": "init",
            "consecutive_lost": 0,
            "bias_norm": 0.0,
        }

    # Test-only API — caller scripts the queue of responses.
    def script(self, *outputs: ScriptedOutput) -> None:
        """Append ``outputs`` to the response queue, in call order."""
        self._scripted.extend(outputs)

    # ---- Real surface mirrored 1:1 with the C++ binding. ----

    def add_frame(self, frame_id: str, ts_ns: int, image: np.ndarray) -> bool:
        """Record the frame and answer from the scripted queue.

        ``image`` is accepted for signature parity and never inspected.

        Returns:
            True when a new output was stored for ``get_latest_output``;
            otherwise the scripted step's ``produced`` value (the previous
            output is left in place when it is False).

        Raises:
            Exception: the scripted step's ``raise_with`` instance, if set.
                The frame is still recorded in ``frames_seen`` first.
        """
        self._frames_seen.append((frame_id, ts_ns))
        if not self._scripted:
            # Unscripted frames behave like a healthy tracker.
            self._latest = _make_default_payload(frame_id)
            return True

        step = self._scripted.popleft()
        if step.raise_with is not None:
            raise step.raise_with
        if step.produced:
            # The default payload is already a fresh dict, so it can be
            # updated in place. ``frame_id`` is re-applied last so a scripted
            # override can never desynchronise it from the frame just fed.
            payload = _make_default_payload(frame_id)
            payload.update(step.payload)
            payload["frame_id"] = frame_id
            self._latest = payload
        return step.produced

    def add_imu(self, ts_ns: int, accel: np.ndarray, gyro: np.ndarray) -> None:
        """Record one IMU sample as ``(ts_ns, accel, gyro)``."""
        self._imu_samples.append((ts_ns, np.asarray(accel), np.asarray(gyro)))

    def get_latest_output(self) -> dict[str, Any] | None:
        """Return the most recently produced payload, or None if there is none."""
        return self._latest

    def reset(
        self,
        body_T_world: np.ndarray,
        velocity: np.ndarray,
        accel_bias: np.ndarray,
        gyro_bias: np.ndarray,
    ) -> None:
        """Count the reset, drop the latest output and return health to 'init'.

        All four arguments are accepted for signature parity and ignored.
        The scripted queue and the recorded frames / IMU samples are kept.
        """
        self._reset_calls += 1
        self._latest = None
        self._health["state"] = "init"
        self._health["consecutive_lost"] = 0

    def health(self) -> dict[str, Any]:
        """Return a shallow copy of the health dict."""
        return dict(self._health)

    # ---- Test introspection helpers (NOT part of the real binding). ----

    @property
    def frames_seen(self) -> list[tuple[str, int]]:
        """Copy of the ``(frame_id, ts_ns)`` pairs passed to ``add_frame``."""
        return list(self._frames_seen)

    @property
    def reset_call_count(self) -> int:
        """Number of times ``reset`` has been called."""
        return self._reset_calls


class FakeOkvis2Backend(_ScriptedFakeBackend):
    """Scriptable fake exported as ``Okvis2Backend`` on the fake OKVIS2 binding."""


# ---------------------------------------------------------------------------
# Module fixtures — install a fake binding module at sys.modules.


def _install_fake_binding(
    monkeypatch: pytest.MonkeyPatch,
    binding_module_name: str,
    strategy_module_name: str,
    exports: dict[str, Any],
) -> types.ModuleType:
    """Build a fake binding module and register it in ``sys.modules``.

    Any cached binding and strategy modules are popped first so the
    strategy's next import starts from scratch and resolves the fake. The
    fake is registered through ``monkeypatch.setitem``, so pytest removes it
    again on teardown.

    NOTE: because the binding entry is popped before ``setitem`` runs, a
    binding module that was already imported is not put back afterwards.

    Args:
        monkeypatch: the requesting test's ``monkeypatch`` fixture.
        binding_module_name: dotted name the fake module is registered under.
        strategy_module_name: dotted name of the strategy module to evict.
        exports: attribute name -> object to expose on the fake module.

    Returns:
        The fake module that was installed.
    """
    fake_module = types.ModuleType(binding_module_name)
    for attr_name, attr_value in exports.items():
        setattr(fake_module, attr_name, attr_value)

    sys.modules.pop(binding_module_name, None)
    sys.modules.pop(strategy_module_name, None)
    monkeypatch.setitem(sys.modules, binding_module_name, fake_module)
    return fake_module


@pytest.fixture
def fake_okvis2_binding(
    monkeypatch: pytest.MonkeyPatch,
) -> Iterator[type[FakeOkvis2Backend]]:
    """Install a fake ``okvis2_binding`` module at the import boundary.

    Cleans up both the binding module and the strategy module so each test
    starts with a fresh lazy-import state. Yields the
    :class:`FakeOkvis2Backend` class itself.
    """
    _install_fake_binding(
        monkeypatch,
        _BINDING_MODULE_NAME,
        _STRATEGY_MODULE_NAME,
        {
            "Okvis2Backend": FakeOkvis2Backend,
            "OkvisInitException": FakeOkvisInitException,
            "OkvisFatalException": FakeOkvisFatalException,
            "OkvisOptimizationException": FakeOkvisOptimizationException,
        },
    )
    yield FakeOkvis2Backend
    # Evict the strategy module imported during the test.
    sys.modules.pop(_STRATEGY_MODULE_NAME, None)


# ===========================================================================
# AZ-333 — VINS-Mono fake binding + fixture (mirrors the OKVIS2 pattern).
# Shape-compatible with FakeOkvis2Backend so the IT-12 comparative
# harness can drive both strategies through the same ScriptedOutput
# pipeline.


class FakeVinsMonoInitException(Exception):
    """Fake exported as ``VinsMonoInitException`` on the fake VINS-Mono binding."""


class FakeVinsMonoFatalException(Exception):
    """Fake exported as ``VinsMonoFatalException`` on the fake VINS-Mono binding."""


class FakeVinsMonoOptimizationException(Exception):
    """Fake exported as ``VinsMonoOptimizationException`` on the fake VINS-Mono binding."""


class FakeVinsMonoBackend(_ScriptedFakeBackend):
    """Scriptable fake exported as ``VinsMonoBackend`` on the fake VINS-Mono binding."""


@pytest.fixture
def fake_vins_mono_binding(
    monkeypatch: pytest.MonkeyPatch,
) -> Iterator[type[FakeVinsMonoBackend]]:
    """Install a fake ``vins_mono_binding`` module at the import boundary.

    Cleans up both the binding module and the strategy module so each test
    starts with a fresh lazy-import state. Mirrors
    :func:`fake_okvis2_binding` exactly because the two strategies are
    drop-in substitutable via the AZ-331 factory.
    """
    _install_fake_binding(
        monkeypatch,
        _VINS_BINDING_MODULE_NAME,
        _VINS_STRATEGY_MODULE_NAME,
        {
            "VinsMonoBackend": FakeVinsMonoBackend,
            "VinsMonoInitException": FakeVinsMonoInitException,
            "VinsMonoFatalException": FakeVinsMonoFatalException,
            "VinsMonoOptimizationException": FakeVinsMonoOptimizationException,
        },
    )
    yield FakeVinsMonoBackend
    # Evict the strategy module imported during the test.
    sys.modules.pop(_VINS_STRATEGY_MODULE_NAME, None)