Files
Oleksandr Bezdieniezhnykh 6a5954bdae [AZ-333] C1 VINS-Mono strategy — research-only comparative VIO
VinsMonoStrategy: Python facade conforming to AZ-331 Protocol; mirrors
the AZ-332 OKVIS2 facade so the AZ-331 factory + IT-12 comparative
harness can treat both as drop-in substitutable. Native binding is a
pybind11 skeleton compiled behind BUILD_VINS_MONO=ON (default OFF for
airborne / operator-tooling / replay-cli per module-layout.md
Build-Time Exclusion Map). Real vins_estimator wiring is the Tier-2
follow-up.

VinsMonoConfig added to c1_vio/config.py with sliding-window /
feature-tracker / marginalisation / opt-iteration knobs plus
__post_init__ validation; exported through the package __init__.

cpp/vins_mono/CMakeLists.txt replaces the AZ-263 placeholder with full
pybind11 wiring: Risk-1 mitigation forces VINS_MONO_USE_ROS=OFF;
Risk-2 mitigation links Eigen from the same cpp/_third_party/eigen pin
as OKVIS2; Risk-3 mitigation enforces BUILD_VINS_MONO=OFF in
deployment binaries via the gate at the top of the file.

Tests: 17 new in test_vins_mono_strategy.py (15 pass + 2 tier2 skip);
fake_vins_mono_binding fixture added to conftest.py mirroring the
fake_okvis2_binding pattern; test_protocol_conformance updated to drop
vins_mono from _STRATEGIES_WITHOUT_PY_MODULE so the existing
parametrised factory tests route through the new strategy.

Focused c1_vio suite: 72 passed, 4 skipped. Full suite: 1788 passed,
1 unrelated pre-existing flake (c12 cold-start perf, env-bound).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-14 01:11:09 +03:00

316 lines
10 KiB
Python

"""Shared fixtures for ``tests/unit/c1_vio/`` (AZ-332 + AZ-333).

Provides scriptable fake binding modules installed at the
``sys.modules`` boundary BEFORE each strategy's lazy import inside the
constructor runs. Each fake mirrors its real binding's surface
(``Okvis2Backend`` / ``VinsMonoBackend`` class + 3 exception types)
so the Python facades can be exercised on macOS dev + GitHub Actions
Linux runner without the real OKVIS2 / VINS-Mono / pybind11 native
libs.

Each task spec explicitly permits this for AC-3, AC-6, AC-7 backend-
exception injection (and by extension the rest of the AC suite that
exercises the Python facade only). The :class:`FakeOkvis2Backend` and
:class:`FakeVinsMonoBackend` classes share the same scripted-output
shape (:class:`ScriptedOutput`) because the AZ-331 Protocol forces
both strategies to surface the same payload contract — keeping the
fakes shape-compatible cuts duplication and makes the IT-12
comparative harness trivially substitutable.
"""
from __future__ import annotations
import sys
from collections import deque
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import Any, Final
import numpy as np
import pytest
# Dotted module paths used as ``sys.modules`` keys by the fixtures below.
# The "binding" entries are the native modules replaced by fakes; the
# "strategy" entries are the Python facades that get evicted so their lazy
# import of the binding runs again for every test.
_BINDING_MODULE_NAME: Final[str] = (
    "gps_denied_onboard.components.c1_vio._native.okvis2_binding"
)
_STRATEGY_MODULE_NAME: Final[str] = (
    "gps_denied_onboard.components.c1_vio.okvis2"
)
_VINS_BINDING_MODULE_NAME: Final[str] = (
    "gps_denied_onboard.components.c1_vio._native.vins_mono_binding"
)
_VINS_STRATEGY_MODULE_NAME: Final[str] = (
    "gps_denied_onboard.components.c1_vio.vins_mono"
)
# ---------------------------------------------------------------------------
# Fake exception types — Python classes mirroring the C++ side.
class FakeOkvisInitException(Exception):
    """Python stand-in exposed as ``OkvisInitException`` on the fake binding module."""
class FakeOkvisFatalException(Exception):
    """Python stand-in exposed as ``OkvisFatalException`` on the fake binding module."""
class FakeOkvisOptimizationException(Exception):
    """Python stand-in exposed as ``OkvisOptimizationException`` on the fake binding module."""
# ---------------------------------------------------------------------------
# Scripted output payload — what the fake backend pops on each add_frame.
@dataclass
class ScriptedOutput:
    """One entry in a fake backend's scripted response queue.

    The fake backends consume one entry per ``add_frame`` call:

    * ``produced`` is the value ``add_frame`` returns. It mirrors the real
      binding's return, where True means a new estimator output can be read
      through ``get_latest_output``.
    * ``raise_with``, when not None, is raised from ``add_frame`` instead
      of producing an output.
    * ``payload`` holds per-key overrides that the fake backends merge on
      top of the default payload when ``produced`` is True.
    """

    produced: bool = field(default=True)
    raise_with: Exception | None = field(default=None)
    # default_factory gives each instance its own dict (no shared mutable default).
    payload: dict[str, Any] = field(default_factory=dict)
def _make_default_payload(frame_id: str = "frame-0001") -> dict[str, Any]:
    """Build a fresh default 'tracking' output payload for ``frame_id``.

    Every call allocates new arrays, so callers may mutate the result
    freely. The pose is the 4x4 identity, the covariance is 0.01 times the
    6x6 identity, and both bias vectors are zero.

    NOTE(review): the original comment describes ``tracked_features`` (80)
    as being above "the threshold"; that threshold is defined outside this
    file — confirm against the strategy under test.
    """
    payload: dict[str, Any] = {"frame_id": frame_id}

    # Pose estimate and its covariance.
    payload["pose_T_world_body"] = np.identity(4, dtype=np.float64)
    payload["pose_covariance_6x6"] = 0.01 * np.identity(6, dtype=np.float64)

    # IMU biases — two distinct arrays, never one shared buffer.
    for bias_key in ("accel_bias", "gyro_bias"):
        payload[bias_key] = np.zeros(3, dtype=np.float64)

    # Feature-tracking statistics and emission timestamp.
    payload["tracked_features"] = 80
    payload["new_features"] = 3
    payload["lost_features"] = 1
    payload["mean_parallax"] = 5.0
    payload["mre_px"] = 0.8
    payload["emitted_at_ns"] = 1_000_000_000
    return payload
# ---------------------------------------------------------------------------
# Scriptable fake Okvis2Backend.
class FakeOkvis2Backend:
    """Scriptable in-memory stand-in for the binding's ``Okvis2Backend``.

    The fake records every call it receives and answers ``add_frame`` from
    a FIFO queue of :class:`ScriptedOutput` entries. While the queue is
    empty, every frame yields the default payload built by
    ``_make_default_payload``.

    Array arguments that are recorded (camera intrinsics, IMU samples) are
    copied on the way in, so a caller that reuses or mutates its buffers
    afterwards cannot change what the fake has recorded.
    """

    def __init__(
        self,
        yaml_config: str,
        camera_intrinsics_3x3: np.ndarray,
    ) -> None:
        """Store the construction arguments and start with empty recordings.

        Args:
            yaml_config: Configuration text; stored verbatim, never parsed.
            camera_intrinsics_3x3: Intrinsics array; stored as an
                independent float64 copy. The shape is not validated.
        """
        self.yaml_config = yaml_config
        # np.array (unlike np.asarray) always copies: snapshot, don't alias.
        self.camera_intrinsics_3x3 = np.array(camera_intrinsics_3x3, dtype=np.float64)
        self._scripted: deque[ScriptedOutput] = deque()
        self._latest: dict[str, Any] | None = None
        self._frames_seen: list[tuple[str, int]] = []
        self._imu_samples: list[tuple[int, np.ndarray, np.ndarray]] = []
        self._reset_calls: int = 0
        self._health: dict[str, Any] = {
            "state": "init",
            "consecutive_lost": 0,
            "bias_norm": 0.0,
        }

    # Test-only API — caller scripts the queue of responses.
    def script(self, *outputs: ScriptedOutput) -> None:
        """Append ``outputs`` to the response queue, in call order."""
        self._scripted.extend(outputs)

    # ---- Real surface mirrored 1:1 with the C++ binding. ----
    def add_frame(self, frame_id: str, ts_ns: int, image: np.ndarray) -> bool:
        """Record the frame and answer from the scripted queue.

        Args:
            frame_id: Identifier recorded and stamped onto the payload.
            ts_ns: Timestamp recorded alongside ``frame_id``.
            image: Accepted for signature parity; not inspected or stored.

        Returns:
            True with a default payload when nothing is scripted; otherwise
            the scripted entry's ``produced`` flag.

        Raises:
            Exception: The scripted entry's ``raise_with``, if set. The
                frame is still recorded in ``frames_seen`` in that case.
        """
        self._frames_seen.append((frame_id, ts_ns))
        if not self._scripted:
            self._latest = _make_default_payload(frame_id)
            return True
        head = self._scripted.popleft()
        if head.raise_with is not None:
            raise head.raise_with
        if head.produced:
            # _make_default_payload already returns a fresh dict, so the
            # scripted overrides can be merged into it directly.
            payload = _make_default_payload(frame_id)
            payload.update(head.payload)
            # The real frame id always wins over a scripted "frame_id".
            payload["frame_id"] = frame_id
            self._latest = payload
        return head.produced

    def add_imu(self, ts_ns: int, accel: np.ndarray, gyro: np.ndarray) -> None:
        """Record one IMU sample as ``(ts_ns, accel, gyro)``.

        Both vectors are copied so that a caller reusing a preallocated
        buffer does not retroactively rewrite earlier recorded samples.
        """
        self._imu_samples.append((ts_ns, np.array(accel), np.array(gyro)))

    def get_latest_output(self) -> dict[str, Any] | None:
        """Return the most recent payload, or None before any output / after reset.

        The stored dict itself is returned (not a copy).
        """
        return self._latest

    def reset(
        self,
        body_T_world: np.ndarray,
        velocity: np.ndarray,
        accel_bias: np.ndarray,
        gyro_bias: np.ndarray,
    ) -> None:
        """Count the reset, drop the latest payload and re-initialise health.

        The four state arguments are accepted for signature parity and
        ignored. Recorded frames, IMU samples and the scripted queue are
        left untouched, as is the ``bias_norm`` health entry.
        """
        self._reset_calls += 1
        self._latest = None
        self._health["state"] = "init"
        self._health["consecutive_lost"] = 0

    def health(self) -> dict[str, Any]:
        """Return a shallow copy of the health dict (safe for callers to mutate)."""
        return dict(self._health)

    # ---- Test introspection helpers (NOT part of the real binding). ----
    @property
    def frames_seen(self) -> list[tuple[str, int]]:
        """Copy of the ``(frame_id, ts_ns)`` pairs passed to ``add_frame``."""
        return list(self._frames_seen)

    @property
    def imu_samples(self) -> list[tuple[int, np.ndarray, np.ndarray]]:
        """Copy of the list of ``(ts_ns, accel, gyro)`` samples passed to ``add_imu``."""
        return list(self._imu_samples)

    @property
    def reset_call_count(self) -> int:
        """Number of times ``reset`` has been called."""
        return self._reset_calls
# ---------------------------------------------------------------------------
# Module fixture — installs fake `_native.okvis2_binding` at sys.modules.
@pytest.fixture
def fake_okvis2_binding(
    monkeypatch: pytest.MonkeyPatch,
) -> Iterator[type[FakeOkvis2Backend]]:
    """Install a fake ``okvis2_binding`` module at the import boundary.

    Before the test runs, both the binding module and the strategy module
    are evicted from ``sys.modules`` and the fake binding is registered via
    ``monkeypatch``. After the test, the strategy module is evicted again so
    the next test starts with a fresh lazy-import state.

    Yields:
        The :class:`FakeOkvis2Backend` class exposed on the fake module as
        ``Okvis2Backend``.
    """
    from types import ModuleType

    stub = ModuleType(_BINDING_MODULE_NAME)
    # Public names the fake module exposes, mapped to their stand-ins.
    exported = {
        "Okvis2Backend": FakeOkvis2Backend,
        "OkvisInitException": FakeOkvisInitException,
        "OkvisFatalException": FakeOkvisFatalException,
        "OkvisOptimizationException": FakeOkvisOptimizationException,
    }
    for attr_name, stand_in in exported.items():
        setattr(stub, attr_name, stand_in)

    # Evict any previously imported modules first, then register the fake.
    for stale_name in (_BINDING_MODULE_NAME, _STRATEGY_MODULE_NAME):
        sys.modules.pop(stale_name, None)
    monkeypatch.setitem(sys.modules, _BINDING_MODULE_NAME, stub)

    yield FakeOkvis2Backend

    # Drop the strategy module imported during the test.
    sys.modules.pop(_STRATEGY_MODULE_NAME, None)
# ===========================================================================
# AZ-333 — VINS-Mono fake binding + fixture (mirrors the OKVIS2 pattern).
# Shape-compatible with FakeOkvis2Backend so the IT-12 comparative
# harness can drive both strategies through the same ScriptedOutput
# pipeline.
class FakeVinsMonoInitException(Exception):
    """Python stand-in exposed as ``VinsMonoInitException`` on the fake binding module."""
class FakeVinsMonoFatalException(Exception):
    """Python stand-in exposed as ``VinsMonoFatalException`` on the fake binding module."""
class FakeVinsMonoOptimizationException(Exception):
    """Python stand-in exposed as ``VinsMonoOptimizationException`` on the fake binding module."""
class FakeVinsMonoBackend:
    """Scriptable in-memory stand-in for the binding's ``VinsMonoBackend``.

    The fake records every call it receives and answers ``add_frame`` from
    a FIFO queue of :class:`ScriptedOutput` entries. While the queue is
    empty, every frame yields the default payload built by
    ``_make_default_payload``.

    Array arguments that are recorded (camera intrinsics, IMU samples) are
    copied on the way in, so a caller that reuses or mutates its buffers
    afterwards cannot change what the fake has recorded.
    """

    def __init__(
        self,
        yaml_config: str,
        camera_intrinsics_3x3: np.ndarray,
    ) -> None:
        """Store the construction arguments and start with empty recordings.

        Args:
            yaml_config: Configuration text; stored verbatim, never parsed.
            camera_intrinsics_3x3: Intrinsics array; stored as an
                independent float64 copy. The shape is not validated.
        """
        self.yaml_config = yaml_config
        # np.array (unlike np.asarray) always copies: snapshot, don't alias.
        self.camera_intrinsics_3x3 = np.array(camera_intrinsics_3x3, dtype=np.float64)
        self._scripted: deque[ScriptedOutput] = deque()
        self._latest: dict[str, Any] | None = None
        self._frames_seen: list[tuple[str, int]] = []
        self._imu_samples: list[tuple[int, np.ndarray, np.ndarray]] = []
        self._reset_calls: int = 0
        self._health: dict[str, Any] = {
            "state": "init",
            "consecutive_lost": 0,
            "bias_norm": 0.0,
        }

    # Test-only API — caller scripts the queue of responses.
    def script(self, *outputs: ScriptedOutput) -> None:
        """Append ``outputs`` to the response queue, in call order."""
        self._scripted.extend(outputs)

    # ---- Surface mirroring the binding's backend class. ----
    def add_frame(self, frame_id: str, ts_ns: int, image: np.ndarray) -> bool:
        """Record the frame and answer from the scripted queue.

        Args:
            frame_id: Identifier recorded and stamped onto the payload.
            ts_ns: Timestamp recorded alongside ``frame_id``.
            image: Accepted for signature parity; not inspected or stored.

        Returns:
            True with a default payload when nothing is scripted; otherwise
            the scripted entry's ``produced`` flag.

        Raises:
            Exception: The scripted entry's ``raise_with``, if set. The
                frame is still recorded in ``frames_seen`` in that case.
        """
        self._frames_seen.append((frame_id, ts_ns))
        if not self._scripted:
            self._latest = _make_default_payload(frame_id)
            return True
        head = self._scripted.popleft()
        if head.raise_with is not None:
            raise head.raise_with
        if head.produced:
            # _make_default_payload already returns a fresh dict, so the
            # scripted overrides can be merged into it directly.
            payload = _make_default_payload(frame_id)
            payload.update(head.payload)
            # The real frame id always wins over a scripted "frame_id".
            payload["frame_id"] = frame_id
            self._latest = payload
        return head.produced

    def add_imu(self, ts_ns: int, accel: np.ndarray, gyro: np.ndarray) -> None:
        """Record one IMU sample as ``(ts_ns, accel, gyro)``.

        Both vectors are copied so that a caller reusing a preallocated
        buffer does not retroactively rewrite earlier recorded samples.
        """
        self._imu_samples.append((ts_ns, np.array(accel), np.array(gyro)))

    def get_latest_output(self) -> dict[str, Any] | None:
        """Return the most recent payload, or None before any output / after reset.

        The stored dict itself is returned (not a copy).
        """
        return self._latest

    def reset(
        self,
        body_T_world: np.ndarray,
        velocity: np.ndarray,
        accel_bias: np.ndarray,
        gyro_bias: np.ndarray,
    ) -> None:
        """Count the reset, drop the latest payload and re-initialise health.

        The four state arguments are accepted for signature parity and
        ignored. Recorded frames, IMU samples and the scripted queue are
        left untouched, as is the ``bias_norm`` health entry.
        """
        self._reset_calls += 1
        self._latest = None
        self._health["state"] = "init"
        self._health["consecutive_lost"] = 0

    def health(self) -> dict[str, Any]:
        """Return a shallow copy of the health dict (safe for callers to mutate)."""
        return dict(self._health)

    # ---- Test introspection helpers (NOT part of the real binding). ----
    @property
    def frames_seen(self) -> list[tuple[str, int]]:
        """Copy of the ``(frame_id, ts_ns)`` pairs passed to ``add_frame``."""
        return list(self._frames_seen)

    @property
    def imu_samples(self) -> list[tuple[int, np.ndarray, np.ndarray]]:
        """Copy of the list of ``(ts_ns, accel, gyro)`` samples passed to ``add_imu``."""
        return list(self._imu_samples)

    @property
    def reset_call_count(self) -> int:
        """Number of times ``reset`` has been called."""
        return self._reset_calls
@pytest.fixture
def fake_vins_mono_binding(
    monkeypatch: pytest.MonkeyPatch,
) -> Iterator[type[FakeVinsMonoBackend]]:
    """Install a fake ``vins_mono_binding`` module at the import boundary.

    Before the test runs, both the binding module and the strategy module
    are evicted from ``sys.modules`` and the fake binding is registered via
    ``monkeypatch``. After the test, the strategy module is evicted again so
    the next test starts with a fresh lazy-import state. The sequence
    mirrors :func:`fake_okvis2_binding` because the two strategies are
    drop-in substitutable via the AZ-331 factory.

    Yields:
        The :class:`FakeVinsMonoBackend` class exposed on the fake module
        as ``VinsMonoBackend``.
    """
    from types import ModuleType

    stub = ModuleType(_VINS_BINDING_MODULE_NAME)
    # Public names the fake module exposes, mapped to their stand-ins.
    exported = {
        "VinsMonoBackend": FakeVinsMonoBackend,
        "VinsMonoInitException": FakeVinsMonoInitException,
        "VinsMonoFatalException": FakeVinsMonoFatalException,
        "VinsMonoOptimizationException": FakeVinsMonoOptimizationException,
    }
    for attr_name, stand_in in exported.items():
        setattr(stub, attr_name, stand_in)

    # Evict any previously imported modules first, then register the fake.
    for stale_name in (_VINS_BINDING_MODULE_NAME, _VINS_STRATEGY_MODULE_NAME):
        sys.modules.pop(stale_name, None)
    monkeypatch.setitem(sys.modules, _VINS_BINDING_MODULE_NAME, stub)

    yield FakeVinsMonoBackend

    # Drop the strategy module imported during the test.
    sys.modules.pop(_VINS_STRATEGY_MODULE_NAME, None)