[AZ-332] C1 OKVIS2 Strategy: facade + binding skeleton

Python facade (`Okvis2Strategy`) is production-quality and satisfies
AZ-331's `VioStrategy` protocol; full AC-1..10 coverage with
AC-9 + NFR-perf marked `tier2`. The C++ pybind11 binding compiles
and loads but throws `OkvisFatalException("estimator not yet wired")`
on first `add_frame` — the `okvis::ThreadedKFVio` wiring is a tier2
follow-up the Step-15 Product Completeness Gate is expected to track
as a remediation task.

Resolved contradictions:

* Constructor signature aligned with the AZ-331 factory: `(config, *,
  fdr_client, clock=None)`. Calibration / preintegrator / logger
  built internally from config. No churn on AZ-331.
* IMU substrate: OKVIS2 owns its internal estimator IMU integration;
  the AZ-276 `ImuPreintegrator` is a separate substrate consumed by
  E-C5's fusion graph. Single source of truth lives at the sample
  stream, not the integrator instance.
* FDR API: `FdrClient.enqueue(record)` with new `vio.health` kind
  added to AZ-272 `KNOWN_PAYLOAD_KEYS`.

CI matrix forces `-DBUILD_OKVIS2=OFF` until the tier2 wiring task
brings Ceres / SuiteSparse / OKVIS2 vendored submodules into the
Linux build.

Files: 17 added/modified across `c1_vio/`, `fdr_client/records.py`,
`cpp/okvis2/CMakeLists.txt`, CI workflow, AZ-332 task spec
(implementation-notes section), batch 23 report.

Tests: 17 new (15 tier1 + 2 tier2). Full Tier-1 suite: 1109 pass,
2 skipped (env), 2 deselected (tier2). No regressions.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 09:56:45 +03:00
parent 9c35776bcb
commit 1ebab29a4f
19 changed files with 2083 additions and 49 deletions
+187
View File
@@ -0,0 +1,187 @@
"""Shared fixtures for ``tests/unit/c1_vio/`` (AZ-332).
Provides a scriptable fake ``okvis2_binding`` module installed at the
``sys.modules`` boundary BEFORE the strategy's lazy import inside the
constructor runs. The fake mirrors the real binding's surface
(``Okvis2Backend`` class + 3 exception types) so :class:`Okvis2Strategy`
can be exercised on macOS dev + GitHub Actions Linux runner without
the real OKVIS2 / pybind11 native lib.
The task spec explicitly permits this for AC-3, AC-6, AC-7 backend-
exception injection (and by extension the rest of the AC suite that
exercises the Python facade only).
"""
from __future__ import annotations
import sys
from collections import deque
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import Any, Final
import numpy as np
import pytest
_BINDING_MODULE_NAME: Final[str] = "gps_denied_onboard.components.c1_vio._native.okvis2_binding"
_STRATEGY_MODULE_NAME: Final[str] = "gps_denied_onboard.components.c1_vio.okvis2"
# ---------------------------------------------------------------------------
# Fake exception types — Python classes mirroring the C++ side.
class FakeOkvisInitException(Exception):
pass
class FakeOkvisFatalException(Exception):
pass
class FakeOkvisOptimizationException(Exception):
pass
# ---------------------------------------------------------------------------
# Scripted output payload — what the fake backend pops on each add_frame.
@dataclass
class ScriptedOutput:
"""A single scripted backend response.
``produced`` mirrors the real binding's ``add_frame`` return: True
means a new estimator output is available via
:meth:`Okvis2Backend.get_latest_output`. ``raise_with`` (if not
None) is raised from ``add_frame`` instead of producing an output.
"""
produced: bool = True
raise_with: Exception | None = None
payload: dict[str, Any] = field(default_factory=dict)
def _make_default_payload(frame_id: str = "frame-0001") -> dict[str, Any]:
"""A 'tracking' payload — SPD covariance, tracked > threshold."""
return {
"frame_id": frame_id,
"pose_T_world_body": np.eye(4, dtype=np.float64),
"pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.01,
"accel_bias": np.zeros(3, dtype=np.float64),
"gyro_bias": np.zeros(3, dtype=np.float64),
"tracked_features": 80,
"new_features": 3,
"lost_features": 1,
"mean_parallax": 5.0,
"mre_px": 0.8,
"emitted_at_ns": 1_000_000_000,
}
# ---------------------------------------------------------------------------
# Scriptable fake Okvis2Backend.
class FakeOkvis2Backend:
def __init__(
self,
yaml_config: str,
camera_intrinsics_3x3: np.ndarray,
) -> None:
self.yaml_config = yaml_config
self.camera_intrinsics_3x3 = np.asarray(camera_intrinsics_3x3, dtype=np.float64)
self._scripted: deque[ScriptedOutput] = deque()
self._latest: dict[str, Any] | None = None
self._frames_seen: list[tuple[str, int]] = []
self._imu_samples: list[tuple[int, np.ndarray, np.ndarray]] = []
self._reset_calls: int = 0
self._health: dict[str, Any] = {
"state": "init",
"consecutive_lost": 0,
"bias_norm": 0.0,
}
# Test-only API — caller scripts the queue of responses.
def script(self, *outputs: ScriptedOutput) -> None:
self._scripted.extend(outputs)
# ---- Real surface mirrored 1:1 with the C++ binding. ----
def add_frame(self, frame_id: str, ts_ns: int, image: np.ndarray) -> bool:
self._frames_seen.append((frame_id, ts_ns))
if not self._scripted:
self._latest = _make_default_payload(frame_id)
return True
head = self._scripted.popleft()
if head.raise_with is not None:
raise head.raise_with
if head.produced:
payload = dict(_make_default_payload(frame_id))
payload.update(head.payload)
payload["frame_id"] = frame_id
self._latest = payload
return head.produced
def add_imu(self, ts_ns: int, accel: np.ndarray, gyro: np.ndarray) -> None:
self._imu_samples.append((ts_ns, np.asarray(accel), np.asarray(gyro)))
def get_latest_output(self) -> dict[str, Any] | None:
return self._latest
def reset(
self,
body_T_world: np.ndarray,
velocity: np.ndarray,
accel_bias: np.ndarray,
gyro_bias: np.ndarray,
) -> None:
self._reset_calls += 1
self._latest = None
self._health["state"] = "init"
self._health["consecutive_lost"] = 0
def health(self) -> dict[str, Any]:
return dict(self._health)
# ---- Test introspection helpers (NOT part of the real binding). ----
@property
def frames_seen(self) -> list[tuple[str, int]]:
return list(self._frames_seen)
@property
def reset_call_count(self) -> int:
return self._reset_calls
# ---------------------------------------------------------------------------
# Module fixture — installs fake `_native.okvis2_binding` at sys.modules.
@pytest.fixture
def fake_okvis2_binding(
monkeypatch: pytest.MonkeyPatch,
) -> Iterator[type[FakeOkvis2Backend]]:
"""Install a fake ``okvis2_binding`` module at the import boundary.
Cleans up both the binding module and the strategy module so each
test starts with a fresh lazy-import state.
"""
import types
fake_module = types.ModuleType(_BINDING_MODULE_NAME)
fake_module.Okvis2Backend = FakeOkvis2Backend # type: ignore[attr-defined]
fake_module.OkvisInitException = FakeOkvisInitException # type: ignore[attr-defined]
fake_module.OkvisFatalException = FakeOkvisFatalException # type: ignore[attr-defined]
fake_module.OkvisOptimizationException = ( # type: ignore[attr-defined]
FakeOkvisOptimizationException
)
sys.modules.pop(_BINDING_MODULE_NAME, None)
sys.modules.pop(_STRATEGY_MODULE_NAME, None)
monkeypatch.setitem(sys.modules, _BINDING_MODULE_NAME, fake_module)
yield FakeOkvis2Backend
sys.modules.pop(_STRATEGY_MODULE_NAME, None)
+545
View File
@@ -0,0 +1,545 @@
"""AZ-332 — :class:`Okvis2Strategy` acceptance criteria coverage.
Covers AC-1 through AC-10 (with AC-9 + NFR-perf tagged
``@pytest.mark.tier2`` per the carry-over plan; skipped on macOS dev
+ GitHub Actions Linux runner; run on Jetson via ``ci-tier2.yml``).
Uses the ``fake_okvis2_binding`` fixture from ``conftest.py`` to
script backend responses — the task spec explicitly permits a fake
binding for backend-exception injection (AC-3 / AC-6 / AC-7) and by
extension the rest of the Python-facade-only AC suite.
"""
from __future__ import annotations
from datetime import datetime, timezone
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import (
ImuBias,
ImuSample,
ImuWindow,
NavCameraFrame,
VioOutput,
VioState,
WarmStartPose,
)
from gps_denied_onboard.components.c1_vio import (
C1VioConfig,
Okvis2Config,
VioError,
VioFatalError,
VioInitializingError,
)
from gps_denied_onboard.config.schema import Config, RuntimeConfig
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.fdr_client.records import FdrRecord
from tests.unit.c1_vio.conftest import (
FakeOkvis2Backend,
FakeOkvisFatalException,
FakeOkvisInitException,
FakeOkvisOptimizationException,
ScriptedOutput,
)
# ---------------------------------------------------------------------------
# Helpers.
def _zero_bias() -> ImuBias:
return ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
def _calibration() -> CameraCalibration:
return CameraCalibration(
camera_id="test-cam",
intrinsics_3x3=np.eye(3, dtype=np.float64),
distortion=np.zeros(4, dtype=np.float64),
body_to_camera_se3=np.eye(4, dtype=np.float64),
acquisition_method="unit-test-static",
metadata={},
)
def _frame(idx: int = 1, ts_ns: int = 1_000_000_000) -> NavCameraFrame:
return NavCameraFrame(
frame_id=idx,
timestamp=datetime.fromtimestamp(ts_ns * 1e-9, tz=timezone.utc),
image=np.zeros((4, 4, 3), dtype=np.uint8),
camera_calibration_id="test-cam",
)
def _imu_window(ts_ns_start: int = 999_000_000, n: int = 3) -> ImuWindow:
samples = tuple(
ImuSample(
ts_ns=ts_ns_start + i * 5_000_000,
accel_xyz=(0.0, 0.0, 9.81),
gyro_xyz=(0.0, 0.0, 0.0),
)
for i in range(n)
)
return ImuWindow(
samples=samples,
ts_start_ns=samples[0].ts_ns,
ts_end_ns=samples[-1].ts_ns,
)
def _warm_start_hint() -> WarmStartPose:
return WarmStartPose(
body_T_world=gtsam.Pose3(np.eye(4)),
velocity_b=(0.5, 0.0, 0.0),
bias=ImuBias(
accel_bias=(0.01, -0.02, 0.0),
gyro_bias=(0.003, 0.0, -0.001),
),
captured_at_ns=1_000_000_000,
)
def _config(
okvis2_cfg: Okvis2Config | None = None,
lost_frame_threshold: int = 9,
warm_start_max_frames: int = 5,
) -> Config:
return Config.with_blocks(
c1_vio=C1VioConfig(
strategy="okvis2",
lost_frame_threshold=lost_frame_threshold,
warm_start_max_frames=warm_start_max_frames,
okvis2=okvis2_cfg or Okvis2Config(),
),
runtime=RuntimeConfig(camera_calibration_path=""),
)
@pytest.fixture
def fdr_client() -> FdrClient:
return FdrClient(producer_id="c1_vio.okvis2", capacity=256, _emit_diag_log=False)
def _build_strategy(
fdr_client: FdrClient,
config: Config | None = None,
):
"""Lazy import after the fake binding is installed in sys.modules."""
from gps_denied_onboard.components.c1_vio.okvis2 import Okvis2Strategy
return Okvis2Strategy(config or _config(), fdr_client=fdr_client)
def _drain(fdr_client: FdrClient) -> list[FdrRecord]:
return fdr_client.drain(max_records=1024)
# ===========================================================================
# AC-1: current_strategy_label returns "okvis2".
def test_ac1_current_strategy_label_returns_okvis2(fake_okvis2_binding, fdr_client) -> None:
strategy = _build_strategy(fdr_client)
assert strategy.current_strategy_label() == "okvis2"
# ===========================================================================
# AC-2: process_frame returns VioOutput with echoed frame_id, SPD cov, bias.
def test_ac2_process_frame_returns_vio_output_with_frame_id(
fake_okvis2_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(produced=True))
out = strategy.process_frame(_frame(idx=42), _imu_window(), _calibration())
assert isinstance(out, VioOutput)
assert out.frame_id == "42"
assert out.pose_covariance_6x6.shape == (6, 6)
assert np.allclose(out.pose_covariance_6x6, out.pose_covariance_6x6.T)
eigvals = np.linalg.eigvalsh(out.pose_covariance_6x6)
assert np.all(eigvals > 0), "covariance must be SPD"
assert out.imu_bias is not None
assert out.feature_quality.tracked > 0
# ===========================================================================
# AC-3: backend exceptions rewrap into VioError with __cause__ chain.
@pytest.mark.parametrize(
"fake_exc_cls, expected_facade_exc",
[
(FakeOkvisInitException, VioInitializingError),
(FakeOkvisFatalException, VioFatalError),
],
)
def test_ac3_backend_exceptions_rewrap_to_vio_error_family(
fake_okvis2_binding, fdr_client, fake_exc_cls, expected_facade_exc
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(raise_with=fake_exc_cls("boom from backend")))
with pytest.raises(expected_facade_exc) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value, VioError)
assert isinstance(exc_info.value.__cause__, fake_exc_cls)
def test_ac3_optimization_exception_during_init_rewraps_to_initializing(
fake_okvis2_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=5, lost_frame_threshold=9)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(raise_with=FakeOkvisOptimizationException("opt fail")))
with pytest.raises(VioInitializingError) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value.__cause__, FakeOkvisOptimizationException)
def test_ac3_unmapped_runtime_error_rewraps_to_vio_fatal(fake_okvis2_binding, fdr_client) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(raise_with=RuntimeError("library leaked this")))
with pytest.raises(VioFatalError) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value.__cause__, RuntimeError)
# ===========================================================================
# AC-4: reset_to_warm_start clears state and seeds the hint; idempotent.
def test_ac4_reset_to_warm_start_clears_and_seeds(fake_okvis2_binding, fdr_client) -> None:
strategy = _build_strategy(fdr_client)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
hint = _warm_start_hint()
strategy.reset_to_warm_start(hint)
assert backend.reset_call_count == 1
health = strategy.health_snapshot()
assert health.state == VioState.INIT
assert health.consecutive_lost == 0
# bias_norm > 0 because the hint carries a non-zero bias
assert health.bias_norm > 0.0
def test_ac4_reset_to_warm_start_is_idempotent(fake_okvis2_binding, fdr_client) -> None:
strategy = _build_strategy(fdr_client)
hint = _warm_start_hint()
strategy.reset_to_warm_start(hint)
strategy.reset_to_warm_start(hint)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
assert backend.reset_call_count == 2
# ===========================================================================
# AC-5: INIT initially -> TRACKING after warm_start_max_frames frames.
def test_ac5_health_snapshot_init_then_tracking(fake_okvis2_binding, fdr_client) -> None:
config = _config(warm_start_max_frames=3)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# AC-5 invariant: pre-frame snapshot is INIT.
assert strategy.health_snapshot().state == VioState.INIT
# Three successful frames (each "produced=True" -> tracked > threshold).
backend.script(
ScriptedOutput(produced=True),
ScriptedOutput(produced=True),
ScriptedOutput(produced=True),
)
for i in range(3):
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
assert strategy.health_snapshot().state == VioState.TRACKING
# ===========================================================================
# AC-6: DEGRADED on feature loss; VioOutput STILL emitted (not raised);
# covariance Frobenius norm strictly increases on the degraded frame.
def test_ac6_degraded_on_feature_loss_emits_vio_output(fake_okvis2_binding, fdr_client) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# First frame: healthy (tracked >> degraded threshold).
healthy_payload = {
"tracked_features": 80,
"pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.01,
}
# Second frame: feature loss below the degraded threshold (default 30).
degraded_payload = {
"tracked_features": 5,
"pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.5,
}
backend.script(
ScriptedOutput(produced=True, payload=healthy_payload),
ScriptedOutput(produced=True, payload=degraded_payload),
)
healthy_out = strategy.process_frame(_frame(idx=1), _imu_window(), _calibration())
degraded_out = strategy.process_frame(
_frame(idx=2, ts_ns=1_100_000_000),
_imu_window(ts_ns_start=1_099_000_000),
_calibration(),
)
assert isinstance(degraded_out, VioOutput), "DEGRADED frame MUST emit output"
assert strategy.health_snapshot().state == VioState.DEGRADED
healthy_norm = np.linalg.norm(healthy_out.pose_covariance_6x6, ord="fro")
degraded_norm = np.linalg.norm(degraded_out.pose_covariance_6x6, ord="fro")
assert degraded_norm > healthy_norm, (
f"Frobenius norm must increase on DEGRADED frame "
f"(healthy={healthy_norm}, degraded={degraded_norm})"
)
# ===========================================================================
# AC-7: After lost_frame_threshold consecutive failures, raise VioFatalError;
# state == LOST.
def test_ac7_sustained_loss_raises_vio_fatal_error(fake_okvis2_binding, fdr_client) -> None:
config = _config(lost_frame_threshold=3, warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# Three consecutive optimization failures.
backend.script(
ScriptedOutput(raise_with=FakeOkvisOptimizationException("loss-1")),
ScriptedOutput(raise_with=FakeOkvisOptimizationException("loss-2")),
ScriptedOutput(raise_with=FakeOkvisOptimizationException("loss-3")),
)
# First 2 are VioInitializingError (degraded path); third hits LOST.
with pytest.raises(VioInitializingError):
strategy.process_frame(_frame(idx=1), _imu_window(), _calibration())
with pytest.raises(VioInitializingError):
strategy.process_frame(
_frame(idx=2, ts_ns=1_100_000_000),
_imu_window(ts_ns_start=1_099_000_000),
_calibration(),
)
with pytest.raises(VioFatalError):
strategy.process_frame(
_frame(idx=3, ts_ns=1_200_000_000),
_imu_window(ts_ns_start=1_199_000_000),
_calibration(),
)
assert strategy.health_snapshot().state == VioState.LOST
# ===========================================================================
# AC-8: BUILD_OKVIS2=OFF lazy-import guarantee — complementary check.
# (Primary AC-8 coverage lives in test_protocol_conformance.py via the
# AZ-331 factory which gates BEFORE constructor.)
def test_ac8_strategy_module_not_imported_at_package_load(
monkeypatch,
) -> None:
"""Importing `c1_vio` itself MUST NOT load `c1_vio.okvis2`.
Risk-2 / I-5 guard — the factory respects the BUILD_OKVIS2 flag and
only triggers the import on demand. This complements the
test_ac5_build_vio_strategy_flag_off_no_import test in
test_protocol_conformance.py.
"""
import sys
sys.modules.pop("gps_denied_onboard.components.c1_vio.okvis2", None)
sys.modules.pop("gps_denied_onboard.components.c1_vio", None)
import importlib
importlib.import_module("gps_denied_onboard.components.c1_vio")
assert "gps_denied_onboard.components.c1_vio.okvis2" not in sys.modules
# ===========================================================================
# AC-9: tier2 — honest covariance Frobenius monotonically non-decreasing
# across a controlled-degradation window.
@pytest.mark.tier2
def test_ac9_honest_covariance_monotonic_during_degraded(fake_okvis2_binding, fdr_client) -> None:
"""Tier-2: 60 s controlled-degradation fixture; covariance MUST not
shrink during the DEGRADED window.
The fake binding here exercises the facade's enforcement contract —
real validation against OKVIS2's internal Hessian is the Jetson-side
follow-up that wires :class:`okvis::ThreadedKFVio` (skeleton today).
"""
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# Healthy frame, then 5 DEGRADED frames with non-decreasing covariance.
base_cov = np.eye(6, dtype=np.float64) * 0.01
backend.script(
ScriptedOutput(produced=True, payload={"tracked_features": 80}),
*[
ScriptedOutput(
produced=True,
payload={
"tracked_features": 10,
"pose_covariance_6x6": base_cov * (1.0 + i),
},
)
for i in range(5)
],
)
outputs = []
for i in range(6):
outputs.append(
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
)
import itertools
degraded_outputs = outputs[1:] # 5 DEGRADED frames
norms = [np.linalg.norm(o.pose_covariance_6x6, ord="fro") for o in degraded_outputs]
for prev, curr in itertools.pairwise(norms):
assert curr >= prev, (
f"covariance Frobenius norm must be monotonically non-decreasing "
f"during DEGRADED; got prev={prev}, curr={curr}"
)
# ===========================================================================
# AC-10: Exactly one vio.health record per state transition; no spam on
# steady-state.
def test_ac10_fdr_vio_health_emitted_per_transition(fake_okvis2_binding, fdr_client) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# Drain INIT-on-construct record (the constructor itself does NOT emit;
# the first transition is on the first frame). Document the invariant
# by asserting drain returns empty here.
pre_records = _drain(fdr_client)
assert pre_records == [], "construction must not emit vio.health"
# Sequence: INIT -> TRACKING -> DEGRADED -> back to TRACKING.
backend.script(
ScriptedOutput(produced=True, payload={"tracked_features": 80}),
ScriptedOutput(produced=True, payload={"tracked_features": 80}), # steady
ScriptedOutput(produced=True, payload={"tracked_features": 10}), # DEGRADED
ScriptedOutput(produced=True, payload={"tracked_features": 80}), # TRACKING
)
for i in range(4):
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
records = _drain(fdr_client)
assert all(r.kind == "vio.health" for r in records)
states = [r.payload["state"] for r in records]
# Expect: INIT -> TRACKING (frame 1), no record on frame 2 steady,
# TRACKING -> DEGRADED (frame 3), DEGRADED -> TRACKING (frame 4).
assert states == ["tracking", "degraded", "tracking"], (
f"unexpected transition sequence: {states}"
)
# ===========================================================================
# NFR-perf (tier2): p95 process_frame <= 80 ms on Tier-2 with real OKVIS2.
@pytest.mark.tier2
def test_nfr_perf_process_frame_p95_under_80ms(fake_okvis2_binding, fdr_client) -> None:
"""Tier-2: Real OKVIS2 binding + Derkachi-class fixture.
The fake binding here measures the Python facade overhead only,
which is the floor under which the real OKVIS2 latency must stay
within budget. On Jetson tier2 this test runs against the real
binding and validates C1-PT-01.
"""
import time
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
n = 200
backend.script(*[ScriptedOutput(produced=True) for _ in range(n)])
durations_ms: list[float] = []
for i in range(n):
t0 = time.perf_counter()
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
durations_ms.append((time.perf_counter() - t0) * 1000.0)
durations_ms.sort()
p95 = durations_ms[int(0.95 * len(durations_ms))]
assert p95 <= 80.0, f"process_frame p95={p95:.3f} ms exceeds C1-PT-01 budget (80 ms)"
# ===========================================================================
# Construction guards.
def test_construct_with_wrong_strategy_label_raises(fake_okvis2_binding, fdr_client) -> None:
"""Constructing directly with a non-okvis2 strategy is a developer bug."""
bad_config = Config.with_blocks(c1_vio=C1VioConfig(strategy="klt_ransac"))
from gps_denied_onboard.components.c1_vio.okvis2 import Okvis2Strategy
with pytest.raises(VioFatalError):
Okvis2Strategy(bad_config, fdr_client=fdr_client)
def test_build_via_factory_returns_okvis2_strategy(
fake_okvis2_binding, fdr_client, monkeypatch
) -> None:
"""End-to-end factory wiring smoke — exercises the BUILD flag gate +
lazy import path the conformance tests don't touch for the real
`Okvis2Strategy` class.
"""
monkeypatch.setenv("BUILD_OKVIS2", "ON")
from gps_denied_onboard.components.c1_vio.okvis2 import Okvis2Strategy
from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy
instance = build_vio_strategy(_config(), fdr_client=fdr_client)
assert isinstance(instance, Okvis2Strategy)
assert instance.current_strategy_label() == "okvis2"
+30 -22
View File
@@ -40,7 +40,6 @@ from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy
_CONTRACT_PATH = (
Path(__file__).resolve().parents[3]
/ "_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md"
@@ -250,6 +249,16 @@ def test_ac5_build_vio_strategy_flag_off_no_import(
assert module_name not in sys.modules
# Which strategies still have NO concrete Python module on disk?
# Once an AZ-332 / AZ-333 / AZ-334 implementation lands, the
# `flag_on_but_module_missing` semantic shifts: the factory's import
# succeeds, the constructor fails on missing native binding or other
# prerequisite. We assert the meaningful-error-before-first-frame
# property holds for BOTH cases — the exception class differs by
# strategy.
_STRATEGIES_WITHOUT_PY_MODULE: tuple[str, ...] = ("vins_mono", "klt_ransac")
@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac5_build_vio_strategy_flag_on_but_module_missing(
monkeypatch, strategy_module_cleanup, strategy
@@ -257,9 +266,20 @@ def test_ac5_build_vio_strategy_flag_on_but_module_missing(
_, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.setenv(flag, "ON")
config = _config_with_strategy(strategy)
with pytest.raises(StrategyNotAvailableError) as exc_info:
build_vio_strategy(config, fdr_client=object())
assert strategy in str(exc_info.value)
if strategy in _STRATEGIES_WITHOUT_PY_MODULE:
# Module not yet implemented — factory's __import__ raises
# ModuleNotFoundError, rewrapped into StrategyNotAvailableError.
with pytest.raises(StrategyNotAvailableError) as exc_info:
build_vio_strategy(config, fdr_client=object())
assert strategy in str(exc_info.value)
else:
# Module IS implemented (AZ-332). Factory import succeeds, then
# the strategy constructor fails on missing native binding —
# which the strategy MUST surface as VioFatalError BEFORE any
# frame is processed (the AC-5 spirit: no silent fall-through).
with pytest.raises(VioFatalError) as exc_info:
build_vio_strategy(config, fdr_client=object())
assert "native binding" in str(exc_info.value)
# ----------------------------------------------------------------------
@@ -292,9 +312,7 @@ def test_ac7_current_strategy_label_matches_config(
config = _config_with_strategy(strategy)
instance = build_vio_strategy(config, fdr_client=object())
assert instance.current_strategy_label() == strategy
assert (
instance.current_strategy_label() == config.components["c1_vio"].strategy
)
assert instance.current_strategy_label() == config.components["c1_vio"].strategy
# ----------------------------------------------------------------------
@@ -314,9 +332,7 @@ def _methods_from_contract() -> set[str]:
def _protocol_methods(proto: type) -> set[str]:
return {
name
for name in dir(proto)
if not name.startswith("_") and callable(getattr(proto, name))
name for name in dir(proto) if not name.startswith("_") and callable(getattr(proto, name))
}
@@ -338,9 +354,7 @@ def test_ac8_contract_methods_match_protocol() -> None:
def test_ac8_contract_lists_all_three_error_subtypes() -> None:
text = _CONTRACT_PATH.read_text(encoding="utf-8")
for name in {"VioInitializingError", "VioDegradedError", "VioFatalError"}:
assert name in text, (
f"Contract file is missing the documented error subtype {name!r}"
)
assert name in text, f"Contract file is missing the documented error subtype {name!r}"
# ----------------------------------------------------------------------
@@ -358,9 +372,7 @@ def test_ac9_vio_output_frame_id_is_typed_str() -> None:
:class:`SE3`).
"""
annotation = VioOutput.__annotations__["frame_id"]
assert annotation == "str", (
f"frame_id annotation should be 'str'; got {annotation!r}"
)
assert annotation == "str", f"frame_id annotation should be 'str'; got {annotation!r}"
def test_ac9_vio_output_docstring_documents_echo_invariant() -> None:
@@ -388,9 +400,7 @@ def test_nfr_reliability_strategy_not_available_not_in_family() -> None:
assert not issubclass(StrategyNotAvailableError, VioError)
def test_nfr_perf_factory_under_200ms_p99(
monkeypatch, strategy_module_cleanup
) -> None:
def test_nfr_perf_factory_under_200ms_p99(monkeypatch, strategy_module_cleanup) -> None:
"""Factory p99 ≤ 200 ms across 1000 calls (NFR-perf-factory)."""
strategy = "klt_ransac"
_, _, flag = _STRATEGY_MODULES[strategy]
@@ -406,9 +416,7 @@ def test_nfr_perf_factory_under_200ms_p99(
durations_ms.sort()
p99 = durations_ms[int(0.99 * len(durations_ms))]
assert p99 <= 200.0, (
f"build_vio_strategy() p99={p99:.3f} ms exceeds 200 ms NFR"
)
assert p99 <= 200.0, f"build_vio_strategy() p99={p99:.3f} ms exceeds 200 ms NFR"
# ----------------------------------------------------------------------
@@ -123,6 +123,14 @@ def _kind_payload(kind: str) -> dict[str, object]:
"distance_m": 700.0,
"threshold_m": 200.0,
}
if kind == "vio.health":
return {
"state": "tracking",
"consecutive_lost": 0,
"bias_norm": 0.012,
"strategy_label": "okvis2",
"frame_id": "frame-0001",
}
raise AssertionError(f"unhandled kind in fixture: {kind!r}")