[AZ-333] C1 VINS-Mono strategy — research-only comparative VIO

VinsMonoStrategy: Python facade conforming to AZ-331 Protocol; mirrors
the AZ-332 OKVIS2 facade so the AZ-331 factory + IT-12 comparative
harness can treat both as drop-in substitutable. Native binding is a
pybind11 skeleton compiled behind BUILD_VINS_MONO=ON (default OFF for
airborne / operator-tooling / replay-cli per module-layout.md
Build-Time Exclusion Map). Real vins_estimator wiring is the Tier-2
follow-up.

VinsMonoConfig added to c1_vio/config.py with sliding-window /
feature-tracker / marginalisation / opt-iteration knobs plus
__post_init__ validation; exported through the package __init__.

cpp/vins_mono/CMakeLists.txt replaces the AZ-263 placeholder with full
pybind11 wiring: Risk-1 mitigation forces VINS_MONO_USE_ROS=OFF;
Risk-2 mitigation links Eigen from the same cpp/_third_party/eigen pin
as OKVIS2; Risk-3 mitigation enforces BUILD_VINS_MONO=OFF in
deployment binaries via the gate at the top of the file.

Tests: 17 new in test_vins_mono_strategy.py (15 pass + 2 tier2 skip);
fake_vins_mono_binding fixture added to conftest.py mirroring the
fake_okvis2_binding pattern; test_protocol_conformance updated to drop
vins_mono from _STRATEGIES_WITHOUT_PY_MODULE so the existing
parametrised factory tests route through the new strategy.

Focused c1_vio suite: 72 passed, 4 skipped. Full suite: 1788 passed,
1 unrelated pre-existing flake (c12 cold-start perf, env-bound).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-14 01:11:09 +03:00
parent 2ce300ddb1
commit 6a5954bdae
13 changed files with 2056 additions and 15 deletions
+137 -9
View File
@@ -1,15 +1,21 @@
"""Shared fixtures for ``tests/unit/c1_vio/`` (AZ-332).
"""Shared fixtures for ``tests/unit/c1_vio/`` (AZ-332 + AZ-333).
Provides a scriptable fake ``okvis2_binding`` module installed at the
``sys.modules`` boundary BEFORE the strategy's lazy import inside the
constructor runs. The fake mirrors the real binding's surface
(``Okvis2Backend`` class + 3 exception types) so :class:`Okvis2Strategy`
can be exercised on macOS dev + GitHub Actions Linux runner without
the real OKVIS2 / pybind11 native lib.
Provides scriptable fake binding modules installed at the
``sys.modules`` boundary BEFORE each strategy's lazy import inside the
constructor runs. Each fake mirrors its real binding's surface
(``Okvis2Backend`` / ``VinsMonoBackend`` class + 3 exception types)
so the Python facades can be exercised on macOS dev + GitHub Actions
Linux runner without the real OKVIS2 / VINS-Mono / pybind11 native
libs.
The task spec explicitly permits this for AC-3, AC-6, AC-7 backend-
Each task spec explicitly permits this for AC-3, AC-6, AC-7 backend-
exception injection (and by extension the rest of the AC suite that
exercises the Python facade only).
exercises the Python facade only). The :class:`FakeOkvis2Backend` and
:class:`FakeVinsMonoBackend` classes share the same scripted-output
shape (:class:`ScriptedOutput`) because the AZ-331 Protocol forces
both strategies to surface the same payload contract — keeping the
fakes shape-compatible cuts duplication and makes the IT-12
comparative harness trivially substitutable.
"""
from __future__ import annotations
@@ -25,6 +31,12 @@ import pytest
_BINDING_MODULE_NAME: Final[str] = "gps_denied_onboard.components.c1_vio._native.okvis2_binding"
_STRATEGY_MODULE_NAME: Final[str] = "gps_denied_onboard.components.c1_vio.okvis2"
_VINS_BINDING_MODULE_NAME: Final[str] = (
"gps_denied_onboard.components.c1_vio._native.vins_mono_binding"
)
_VINS_STRATEGY_MODULE_NAME: Final[str] = (
"gps_denied_onboard.components.c1_vio.vins_mono"
)
# ---------------------------------------------------------------------------
@@ -185,3 +197,119 @@ def fake_okvis2_binding(
yield FakeOkvis2Backend
sys.modules.pop(_STRATEGY_MODULE_NAME, None)
# ===========================================================================
# AZ-333 — VINS-Mono fake binding + fixture (mirrors the OKVIS2 pattern).
# Shape-compatible with FakeOkvis2Backend so the IT-12 comparative
# harness can drive both strategies through the same ScriptedOutput
# pipeline.
class FakeVinsMonoInitException(Exception):
pass
class FakeVinsMonoFatalException(Exception):
pass
class FakeVinsMonoOptimizationException(Exception):
pass
class FakeVinsMonoBackend:
def __init__(
self,
yaml_config: str,
camera_intrinsics_3x3: np.ndarray,
) -> None:
self.yaml_config = yaml_config
self.camera_intrinsics_3x3 = np.asarray(camera_intrinsics_3x3, dtype=np.float64)
self._scripted: deque[ScriptedOutput] = deque()
self._latest: dict[str, Any] | None = None
self._frames_seen: list[tuple[str, int]] = []
self._imu_samples: list[tuple[int, np.ndarray, np.ndarray]] = []
self._reset_calls: int = 0
self._health: dict[str, Any] = {
"state": "init",
"consecutive_lost": 0,
"bias_norm": 0.0,
}
def script(self, *outputs: ScriptedOutput) -> None:
self._scripted.extend(outputs)
def add_frame(self, frame_id: str, ts_ns: int, image: np.ndarray) -> bool:
self._frames_seen.append((frame_id, ts_ns))
if not self._scripted:
self._latest = _make_default_payload(frame_id)
return True
head = self._scripted.popleft()
if head.raise_with is not None:
raise head.raise_with
if head.produced:
payload = dict(_make_default_payload(frame_id))
payload.update(head.payload)
payload["frame_id"] = frame_id
self._latest = payload
return head.produced
def add_imu(self, ts_ns: int, accel: np.ndarray, gyro: np.ndarray) -> None:
self._imu_samples.append((ts_ns, np.asarray(accel), np.asarray(gyro)))
def get_latest_output(self) -> dict[str, Any] | None:
return self._latest
def reset(
self,
body_T_world: np.ndarray,
velocity: np.ndarray,
accel_bias: np.ndarray,
gyro_bias: np.ndarray,
) -> None:
self._reset_calls += 1
self._latest = None
self._health["state"] = "init"
self._health["consecutive_lost"] = 0
def health(self) -> dict[str, Any]:
return dict(self._health)
@property
def frames_seen(self) -> list[tuple[str, int]]:
return list(self._frames_seen)
@property
def reset_call_count(self) -> int:
return self._reset_calls
@pytest.fixture
def fake_vins_mono_binding(
monkeypatch: pytest.MonkeyPatch,
) -> Iterator[type[FakeVinsMonoBackend]]:
"""Install a fake ``vins_mono_binding`` module at the import boundary.
Cleans up both the binding module and the strategy module so each
test starts with a fresh lazy-import state. Mirrors
:func:`fake_okvis2_binding` exactly because the two strategies are
drop-in substitutable via the AZ-331 factory.
"""
import types
fake_module = types.ModuleType(_VINS_BINDING_MODULE_NAME)
fake_module.VinsMonoBackend = FakeVinsMonoBackend # type: ignore[attr-defined]
fake_module.VinsMonoInitException = FakeVinsMonoInitException # type: ignore[attr-defined]
fake_module.VinsMonoFatalException = FakeVinsMonoFatalException # type: ignore[attr-defined]
fake_module.VinsMonoOptimizationException = ( # type: ignore[attr-defined]
FakeVinsMonoOptimizationException
)
sys.modules.pop(_VINS_BINDING_MODULE_NAME, None)
sys.modules.pop(_VINS_STRATEGY_MODULE_NAME, None)
monkeypatch.setitem(sys.modules, _VINS_BINDING_MODULE_NAME, fake_module)
yield FakeVinsMonoBackend
sys.modules.pop(_VINS_STRATEGY_MODULE_NAME, None)
@@ -256,7 +256,7 @@ def test_ac5_build_vio_strategy_flag_off_no_import(
# prerequisite. We assert the meaningful-error-before-first-frame
# property holds for BOTH cases — the exception class differs by
# strategy.
_STRATEGIES_WITHOUT_PY_MODULE: tuple[str, ...] = ("vins_mono", "klt_ransac")
_STRATEGIES_WITHOUT_PY_MODULE: tuple[str, ...] = ("klt_ransac",)
@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
@@ -0,0 +1,568 @@
"""AZ-333 — :class:`VinsMonoStrategy` acceptance criteria coverage.
Covers AC-1 through AC-10 (with AC-9 + NFR-perf tagged
``@pytest.mark.tier2``; the AZ-333 task spec exempts this strategy
from the C1-PT-01 ≤ 80 ms p95 hard threshold but still asserts the
honest-covariance monotonicity invariant on tier2 with the real
binding).
Uses the ``fake_vins_mono_binding`` fixture from ``conftest.py`` to
script backend responses — the task spec explicitly permits a fake
binding for backend-exception injection (AC-3 / AC-6 / AC-7) and by
extension the rest of the Python-facade-only AC suite.
Mirrors the AZ-332 ``test_okvis2_strategy.py`` layout deliberately:
the AZ-331 factory produces both via the same `(config, *,
fdr_client)` shape and the IT-12 comparative-study harness expects the
two to behave identically through the Python facade.
"""
from __future__ import annotations
from datetime import datetime, timezone
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import (
ImuBias,
ImuSample,
ImuWindow,
NavCameraFrame,
VioOutput,
VioState,
WarmStartPose,
)
from gps_denied_onboard.components.c1_vio import (
C1VioConfig,
VinsMonoConfig,
VioError,
VioFatalError,
VioInitializingError,
)
from gps_denied_onboard.config.schema import Config, RuntimeConfig
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.fdr_client.records import FdrRecord
from tests.unit.c1_vio.conftest import (
FakeVinsMonoBackend,
FakeVinsMonoFatalException,
FakeVinsMonoInitException,
FakeVinsMonoOptimizationException,
ScriptedOutput,
)
def _zero_bias() -> ImuBias:
return ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
def _calibration() -> CameraCalibration:
return CameraCalibration(
camera_id="test-cam",
intrinsics_3x3=np.eye(3, dtype=np.float64),
distortion=np.zeros(4, dtype=np.float64),
body_to_camera_se3=np.eye(4, dtype=np.float64),
acquisition_method="unit-test-static",
metadata={},
)
def _frame(idx: int = 1, ts_ns: int = 1_000_000_000) -> NavCameraFrame:
return NavCameraFrame(
frame_id=idx,
timestamp=datetime.fromtimestamp(ts_ns * 1e-9, tz=timezone.utc),
image=np.zeros((4, 4, 3), dtype=np.uint8),
camera_calibration_id="test-cam",
)
def _imu_window(ts_ns_start: int = 999_000_000, n: int = 3) -> ImuWindow:
samples = tuple(
ImuSample(
ts_ns=ts_ns_start + i * 5_000_000,
accel_xyz=(0.0, 0.0, 9.81),
gyro_xyz=(0.0, 0.0, 0.0),
)
for i in range(n)
)
return ImuWindow(
samples=samples,
ts_start_ns=samples[0].ts_ns,
ts_end_ns=samples[-1].ts_ns,
)
def _warm_start_hint() -> WarmStartPose:
return WarmStartPose(
body_T_world=gtsam.Pose3(np.eye(4)),
velocity_b=(0.5, 0.0, 0.0),
bias=ImuBias(
accel_bias=(0.01, -0.02, 0.0),
gyro_bias=(0.003, 0.0, -0.001),
),
captured_at_ns=1_000_000_000,
)
def _config(
vins_cfg: VinsMonoConfig | None = None,
lost_frame_threshold: int = 9,
warm_start_max_frames: int = 5,
) -> Config:
return Config.with_blocks(
c1_vio=C1VioConfig(
strategy="vins_mono",
lost_frame_threshold=lost_frame_threshold,
warm_start_max_frames=warm_start_max_frames,
vins_mono=vins_cfg or VinsMonoConfig(),
),
runtime=RuntimeConfig(camera_calibration_path=""),
)
@pytest.fixture
def fdr_client() -> FdrClient:
return FdrClient(producer_id="c1_vio.vins_mono", capacity=256, _emit_diag_log=False)
def _build_strategy(
fdr_client: FdrClient,
config: Config | None = None,
):
"""Lazy import after the fake binding is installed in sys.modules."""
from gps_denied_onboard.components.c1_vio.vins_mono import VinsMonoStrategy
return VinsMonoStrategy(config or _config(), fdr_client=fdr_client)
def _drain(fdr_client: FdrClient) -> list[FdrRecord]:
return fdr_client.drain(max_records=1024)
# ===========================================================================
# AC-1: current_strategy_label returns "vins_mono".
def test_ac1_current_strategy_label_returns_vins_mono(
fake_vins_mono_binding, fdr_client
) -> None:
strategy = _build_strategy(fdr_client)
assert strategy.current_strategy_label() == "vins_mono"
# ===========================================================================
# AC-2: process_frame returns VioOutput with echoed frame_id, SPD cov, bias.
def test_ac2_process_frame_returns_vio_output_with_frame_id(
fake_vins_mono_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(produced=True))
out = strategy.process_frame(_frame(idx=42), _imu_window(), _calibration())
assert isinstance(out, VioOutput)
assert out.frame_id == "42"
assert out.pose_covariance_6x6.shape == (6, 6)
assert np.allclose(out.pose_covariance_6x6, out.pose_covariance_6x6.T)
eigvals = np.linalg.eigvalsh(out.pose_covariance_6x6)
assert np.all(eigvals > 0), "covariance must be SPD"
assert out.imu_bias is not None
assert out.feature_quality.tracked > 0
# ===========================================================================
# AC-3: backend exceptions rewrap into VioError with __cause__ chain.
@pytest.mark.parametrize(
"fake_exc_cls, expected_facade_exc",
[
(FakeVinsMonoInitException, VioInitializingError),
(FakeVinsMonoFatalException, VioFatalError),
],
)
def test_ac3_backend_exceptions_rewrap_to_vio_error_family(
fake_vins_mono_binding, fdr_client, fake_exc_cls, expected_facade_exc
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(raise_with=fake_exc_cls("boom from backend")))
with pytest.raises(expected_facade_exc) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value, VioError)
assert isinstance(exc_info.value.__cause__, fake_exc_cls)
def test_ac3_optimization_exception_during_init_rewraps_to_initializing(
fake_vins_mono_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=5, lost_frame_threshold=9)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
backend.script(
ScriptedOutput(raise_with=FakeVinsMonoOptimizationException("opt fail"))
)
with pytest.raises(VioInitializingError) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value.__cause__, FakeVinsMonoOptimizationException)
def test_ac3_unmapped_runtime_error_rewraps_to_vio_fatal(
fake_vins_mono_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(raise_with=RuntimeError("library leaked this")))
with pytest.raises(VioFatalError) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value.__cause__, RuntimeError)
# ===========================================================================
# AC-4: reset_to_warm_start clears state and seeds the hint; idempotent.
def test_ac4_reset_to_warm_start_clears_and_seeds(
fake_vins_mono_binding, fdr_client
) -> None:
strategy = _build_strategy(fdr_client)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
hint = _warm_start_hint()
strategy.reset_to_warm_start(hint)
assert backend.reset_call_count == 1
health = strategy.health_snapshot()
assert health.state == VioState.INIT
assert health.consecutive_lost == 0
# bias_norm > 0 because the hint carries a non-zero bias
assert health.bias_norm > 0.0
def test_ac4_reset_to_warm_start_is_idempotent(
fake_vins_mono_binding, fdr_client
) -> None:
strategy = _build_strategy(fdr_client)
hint = _warm_start_hint()
strategy.reset_to_warm_start(hint)
strategy.reset_to_warm_start(hint)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
assert backend.reset_call_count == 2
# ===========================================================================
# AC-5: INIT initially -> TRACKING after warm_start_max_frames frames.
def test_ac5_health_snapshot_init_then_tracking(
fake_vins_mono_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=3)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
assert strategy.health_snapshot().state == VioState.INIT
backend.script(
ScriptedOutput(produced=True),
ScriptedOutput(produced=True),
ScriptedOutput(produced=True),
)
for i in range(3):
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
assert strategy.health_snapshot().state == VioState.TRACKING
# ===========================================================================
# AC-6: DEGRADED on feature loss; VioOutput STILL emitted (not raised);
# covariance Frobenius norm strictly increases on the degraded frame.
def test_ac6_degraded_on_feature_loss_emits_vio_output(
fake_vins_mono_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
healthy_payload = {
"tracked_features": 80,
"pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.01,
}
degraded_payload = {
"tracked_features": 5,
"pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.5,
}
backend.script(
ScriptedOutput(produced=True, payload=healthy_payload),
ScriptedOutput(produced=True, payload=degraded_payload),
)
healthy_out = strategy.process_frame(_frame(idx=1), _imu_window(), _calibration())
degraded_out = strategy.process_frame(
_frame(idx=2, ts_ns=1_100_000_000),
_imu_window(ts_ns_start=1_099_000_000),
_calibration(),
)
assert isinstance(degraded_out, VioOutput), "DEGRADED frame MUST emit output"
assert strategy.health_snapshot().state == VioState.DEGRADED
healthy_norm = np.linalg.norm(healthy_out.pose_covariance_6x6, ord="fro")
degraded_norm = np.linalg.norm(degraded_out.pose_covariance_6x6, ord="fro")
assert degraded_norm > healthy_norm, (
f"Frobenius norm must increase on DEGRADED frame "
f"(healthy={healthy_norm}, degraded={degraded_norm})"
)
# ===========================================================================
# AC-7: After lost_frame_threshold consecutive failures, raise VioFatalError;
# state == LOST.
def test_ac7_sustained_loss_raises_vio_fatal_error(
fake_vins_mono_binding, fdr_client
) -> None:
config = _config(lost_frame_threshold=3, warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
backend.script(
ScriptedOutput(raise_with=FakeVinsMonoOptimizationException("loss-1")),
ScriptedOutput(raise_with=FakeVinsMonoOptimizationException("loss-2")),
ScriptedOutput(raise_with=FakeVinsMonoOptimizationException("loss-3")),
)
with pytest.raises(VioInitializingError):
strategy.process_frame(_frame(idx=1), _imu_window(), _calibration())
with pytest.raises(VioInitializingError):
strategy.process_frame(
_frame(idx=2, ts_ns=1_100_000_000),
_imu_window(ts_ns_start=1_099_000_000),
_calibration(),
)
with pytest.raises(VioFatalError):
strategy.process_frame(
_frame(idx=3, ts_ns=1_200_000_000),
_imu_window(ts_ns_start=1_199_000_000),
_calibration(),
)
assert strategy.health_snapshot().state == VioState.LOST
# ===========================================================================
# AC-8: BUILD_VINS_MONO=OFF lazy-import guarantee — complementary check.
# (Primary AC-8 coverage lives in test_protocol_conformance.py via the
# AZ-331 factory which gates BEFORE constructor.)
def test_ac8_strategy_module_not_imported_at_package_load(
monkeypatch,
) -> None:
"""Importing `c1_vio` itself MUST NOT load `c1_vio.vins_mono`.
Risk-2 / Risk-3 guard — the factory respects the BUILD_VINS_MONO
flag and only triggers the import on demand. This complements the
test_ac5_build_vio_strategy_flag_off_no_import test in
test_protocol_conformance.py.
"""
import sys
sys.modules.pop("gps_denied_onboard.components.c1_vio.vins_mono", None)
sys.modules.pop("gps_denied_onboard.components.c1_vio", None)
import importlib
importlib.import_module("gps_denied_onboard.components.c1_vio")
assert "gps_denied_onboard.components.c1_vio.vins_mono" not in sys.modules
# ===========================================================================
# AC-9: tier2 — honest covariance Frobenius monotonically non-decreasing
# across a controlled-degradation window.
@pytest.mark.tier2
def test_ac9_honest_covariance_monotonic_during_degraded(
fake_vins_mono_binding, fdr_client
) -> None:
"""Tier-2: 60 s controlled-degradation fixture; covariance MUST not
shrink during the DEGRADED window.
The fake binding here exercises the facade's enforcement contract —
real validation against VINS-Mono's marginalised information matrix
is the Jetson-side follow-up that wires
:class:`vins_estimator::Estimator` (skeleton today).
"""
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
base_cov = np.eye(6, dtype=np.float64) * 0.01
backend.script(
ScriptedOutput(produced=True, payload={"tracked_features": 80}),
*[
ScriptedOutput(
produced=True,
payload={
"tracked_features": 10,
"pose_covariance_6x6": base_cov * (1.0 + i),
},
)
for i in range(5)
],
)
outputs = []
for i in range(6):
outputs.append(
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
)
import itertools
degraded_outputs = outputs[1:] # 5 DEGRADED frames
norms = [np.linalg.norm(o.pose_covariance_6x6, ord="fro") for o in degraded_outputs]
for prev, curr in itertools.pairwise(norms):
assert curr >= prev, (
f"covariance Frobenius norm must be monotonically non-decreasing "
f"during DEGRADED; got prev={prev}, curr={curr}"
)
# ===========================================================================
# AC-10: Exactly one vio.health record per state transition; no spam on
# steady-state.
def test_ac10_fdr_vio_health_emitted_per_transition(
fake_vins_mono_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
pre_records = _drain(fdr_client)
assert pre_records == [], "construction must not emit vio.health"
backend.script(
ScriptedOutput(produced=True, payload={"tracked_features": 80}),
ScriptedOutput(produced=True, payload={"tracked_features": 80}),
ScriptedOutput(produced=True, payload={"tracked_features": 10}),
ScriptedOutput(produced=True, payload={"tracked_features": 80}),
)
for i in range(4):
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
records = _drain(fdr_client)
assert all(r.kind == "vio.health" for r in records)
states = [r.payload["state"] for r in records]
assert states == ["tracking", "degraded", "tracking"], (
f"unexpected transition sequence: {states}"
)
# ===========================================================================
# NFR-perf-document (tier2): VINS-Mono p95 is *recorded*, not bounded.
# Per AZ-333 task spec NFR-perf, no hard threshold — Step 9 / E-BBT
# comparative report consumes the p50/p95 numbers.
@pytest.mark.tier2
def test_nfr_perf_process_frame_records_p95(fake_vins_mono_binding, fdr_client) -> None:
"""Tier-2: Real VINS-Mono binding + Derkachi-class fixture.
Unlike :class:`Okvis2Strategy`, VINS-Mono is research-only and not
bound by C1-PT-01's ≤ 80 ms p95. We record p95 here and assert
only that it can be measured (i.e. process_frame completes 200x
without deadlock or unbounded growth). The Step 9 / E-BBT
comparative-study report ingests the produced number.
"""
import time
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeVinsMonoBackend = strategy._backend # type: ignore[attr-defined]
n = 200
backend.script(*[ScriptedOutput(produced=True) for _ in range(n)])
durations_ms: list[float] = []
for i in range(n):
t0 = time.perf_counter()
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
durations_ms.append((time.perf_counter() - t0) * 1000.0)
durations_ms.sort()
p95 = durations_ms[int(0.95 * len(durations_ms))]
assert p95 >= 0.0, f"VinsMono p95 must be measurable (got {p95})"
# Loose sanity ceiling so a regression to seconds-per-frame fails the
# tier2 run; VINS-Mono is best-effort but not pathologically slow.
assert p95 <= 5_000.0, (
f"VinsMono process_frame p95={p95:.3f} ms grew pathologically "
"(>5 s); investigate before publishing comparative report"
)
# ===========================================================================
# Construction guards.
def test_construct_with_wrong_strategy_label_raises(
fake_vins_mono_binding, fdr_client
) -> None:
"""Constructing directly with a non-vins_mono strategy is a developer bug."""
bad_config = Config.with_blocks(c1_vio=C1VioConfig(strategy="klt_ransac"))
from gps_denied_onboard.components.c1_vio.vins_mono import VinsMonoStrategy
with pytest.raises(VioFatalError):
VinsMonoStrategy(bad_config, fdr_client=fdr_client)
def test_build_via_factory_returns_vins_mono_strategy(
fake_vins_mono_binding, fdr_client, monkeypatch
) -> None:
"""End-to-end factory wiring smoke — exercises the BUILD flag gate +
lazy import path the conformance tests don't touch for the real
`VinsMonoStrategy` class.
"""
monkeypatch.setenv("BUILD_VINS_MONO", "ON")
from gps_denied_onboard.components.c1_vio.vins_mono import VinsMonoStrategy
from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy
instance = build_vio_strategy(_config(), fdr_client=fdr_client)
assert isinstance(instance, VinsMonoStrategy)
assert instance.current_strategy_label() == "vins_mono"