Files
gps-denied-onboard/tests/unit/c1_vio/conftest.py
T
Oleksandr Bezdieniezhnykh 1ebab29a4f [AZ-332] C1 OKVIS2 Strategy: facade + binding skeleton
Python facade (`Okvis2Strategy`) is production-quality and satisfies
AZ-331's `VioStrategy` protocol; full AC-1..10 coverage with
AC-9 + NFR-perf marked `tier2`. The C++ pybind11 binding compiles
and loads but throws `OkvisFatalException("estimator not yet wired")`
on first `add_frame` — the `okvis::ThreadedKFVio` wiring is a tier2
follow-up the Step-15 Product Completeness Gate is expected to track
as a remediation task.

Resolved contradictions:

* Constructor signature aligned with the AZ-331 factory: `(config, *,
  fdr_client, clock=None)`. Calibration / preintegrator / logger
  built internally from config. No churn on AZ-331.
* IMU substrate: OKVIS2 owns its internal estimator IMU integration;
  the AZ-276 `ImuPreintegrator` is a separate substrate consumed by
  E-C5's fusion graph. Single source of truth lives at the sample
  stream, not the integrator instance.
* FDR API: `FdrClient.enqueue(record)` with new `vio.health` kind
  added to AZ-272 `KNOWN_PAYLOAD_KEYS`.

CI matrix forces `-DBUILD_OKVIS2=OFF` until the tier2 wiring task
brings Ceres / SuiteSparse / OKVIS2 vendored submodules into the
Linux build.

Files: 17 added/modified across `c1_vio/`, `fdr_client/records.py`,
`cpp/okvis2/CMakeLists.txt`, CI workflow, AZ-332 task spec
(implementation-notes section), batch 23 report.

Tests: 17 new (15 tier1 + 2 tier2). Full Tier-1 suite: 1109 pass,
2 skipped (env), 2 deselected (tier2). No regressions.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 09:56:45 +03:00

188 lines
6.1 KiB
Python

"""Shared fixtures for ``tests/unit/c1_vio/`` (AZ-332).
Provides a scriptable fake ``okvis2_binding`` module that is installed at
the ``sys.modules`` boundary BEFORE the strategy's lazy import inside the
constructor runs. The fake mirrors the real binding's surface
(``Okvis2Backend`` class + 3 exception types) so :class:`Okvis2Strategy`
can be exercised on macOS dev machines and on the GitHub Actions Linux
runner without the real OKVIS2 / pybind11 native library.

The task spec explicitly permits this for AC-3, AC-6, AC-7 backend-
exception injection (and by extension the rest of the AC suite that
exercises the Python facade only).
"""
from __future__ import annotations
import sys
from collections import deque
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import Any, Final
import numpy as np
import pytest
_BINDING_MODULE_NAME: Final[str] = "gps_denied_onboard.components.c1_vio._native.okvis2_binding"
_STRATEGY_MODULE_NAME: Final[str] = "gps_denied_onboard.components.c1_vio.okvis2"
# ---------------------------------------------------------------------------
# Fake exception types — Python classes mirroring the C++ side.
class FakeOkvisInitException(Exception):
    """Pure-Python stand-in exposed by the fake module as ``OkvisInitException``."""
class FakeOkvisFatalException(Exception):
    """Pure-Python stand-in exposed by the fake module as ``OkvisFatalException``."""
class FakeOkvisOptimizationException(Exception):
    """Pure-Python stand-in exposed by the fake module as ``OkvisOptimizationException``."""
# ---------------------------------------------------------------------------
# Scripted output payload — what the fake backend pops on each add_frame.
@dataclass
class ScriptedOutput:
    """One pre-programmed reply for a single ``add_frame`` call on the fake backend.

    The fake backend consumes these in FIFO order, one per frame:

    * ``produced`` is what ``add_frame`` returns. When true, a fresh output
      becomes readable through ``get_latest_output``.
    * ``raise_with``, when not ``None``, is raised from ``add_frame`` and no
      output is produced for that frame.
    * ``payload`` holds per-key overrides that are layered on top of the
      default payload when an output is produced.
    """

    # Return value of ``add_frame`` for this step.
    produced: bool = True
    # Exception instance to raise instead of producing an output.
    raise_with: Exception | None = None
    # Overrides merged over the default payload; a fresh dict per instance.
    payload: dict[str, Any] = field(default_factory=dict)
def _make_default_payload(frame_id: str = "frame-0001") -> dict[str, Any]:
    """Build a fresh 'tracking' payload: SPD covariance, tracked count above threshold.

    Every call returns a new dict holding newly allocated arrays, so callers
    may mutate the result freely.
    """
    identity_pose = np.eye(4, dtype=np.float64)
    # Scaled identity keeps the 6x6 covariance symmetric positive-definite.
    small_covariance = 0.01 * np.eye(6, dtype=np.float64)
    return dict(
        frame_id=frame_id,
        pose_T_world_body=identity_pose,
        pose_covariance_6x6=small_covariance,
        accel_bias=np.zeros(3, dtype=np.float64),
        gyro_bias=np.zeros(3, dtype=np.float64),
        tracked_features=80,
        new_features=3,
        lost_features=1,
        mean_parallax=5.0,
        mre_px=0.8,
        emitted_at_ns=1_000_000_000,
    )
# ---------------------------------------------------------------------------
# Scriptable fake Okvis2Backend.
class FakeOkvis2Backend:
    """Scriptable in-memory stand-in for the binding's ``Okvis2Backend``.

    Tests queue :class:`ScriptedOutput` steps with :meth:`script`; each
    :meth:`add_frame` call consumes one step. With nothing queued,
    ``add_frame`` produces a default 'tracking' payload. Every call is
    recorded so tests can assert on what the strategy fed the backend.
    """

    def __init__(
        self,
        yaml_config: str,
        camera_intrinsics_3x3: np.ndarray,
    ) -> None:
        """Store the constructor arguments and start from an empty state."""
        self.yaml_config = yaml_config
        self.camera_intrinsics_3x3 = np.asarray(camera_intrinsics_3x3, dtype=np.float64)
        # FIFO of scripted replies and the most recent produced output.
        self._scripted: deque[ScriptedOutput] = deque()
        self._latest: dict[str, Any] | None = None
        # Call records for test assertions.
        self._frames_seen: list[tuple[str, int]] = []
        self._imu_samples: list[tuple[int, np.ndarray, np.ndarray]] = []
        self._reset_calls: int = 0
        self._health: dict[str, Any] = dict(
            state="init",
            consecutive_lost=0,
            bias_norm=0.0,
        )

    # Test-only API — caller scripts the queue of responses.
    def script(self, *outputs: ScriptedOutput) -> None:
        """Append ``outputs`` to the reply queue, preserving their order."""
        for step in outputs:
            self._scripted.append(step)

    # ---- Real surface mirrored 1:1 with the C++ binding. ----
    def add_frame(self, frame_id: str, ts_ns: int, image: np.ndarray) -> bool:
        """Record the frame and answer with the next scripted step.

        Returns the step's ``produced`` flag, or ``True`` with a default
        payload when nothing is scripted. Raises the step's ``raise_with``
        exception when one is set. ``image`` is accepted but not inspected.
        """
        self._frames_seen.append((frame_id, ts_ns))
        if self._scripted:
            return self._play_scripted_step(frame_id)
        # Unscripted frames always succeed with the default payload.
        self._latest = _make_default_payload(frame_id)
        return True

    def _play_scripted_step(self, frame_id: str) -> bool:
        """Pop the oldest scripted step and apply it for ``frame_id``."""
        step = self._scripted.popleft()
        if step.raise_with is not None:
            raise step.raise_with
        if not step.produced:
            # No new output: the previous ``_latest`` is left untouched.
            return step.produced
        merged = _make_default_payload(frame_id)
        merged.update(step.payload)
        # The frame id of the call always wins over a scripted override.
        merged["frame_id"] = frame_id
        self._latest = merged
        return step.produced

    def add_imu(self, ts_ns: int, accel: np.ndarray, gyro: np.ndarray) -> None:
        """Record one IMU sample as ``(ts_ns, accel, gyro)`` arrays."""
        sample = (ts_ns, np.asarray(accel), np.asarray(gyro))
        self._imu_samples.append(sample)

    def get_latest_output(self) -> dict[str, Any] | None:
        """Return the most recent produced payload, or ``None`` if there is none."""
        return self._latest

    def reset(
        self,
        body_T_world: np.ndarray,
        velocity: np.ndarray,
        accel_bias: np.ndarray,
        gyro_bias: np.ndarray,
    ) -> None:
        """Count the call, clear the latest output and return health to 'init'.

        The arguments are accepted for signature parity but are not used.
        """
        self._latest = None
        self._health.update(state="init", consecutive_lost=0)
        self._reset_calls += 1

    def health(self) -> dict[str, Any]:
        """Return a shallow copy of the health dict."""
        return self._health.copy()

    # ---- Test introspection helpers (NOT part of the real binding). ----
    @property
    def frames_seen(self) -> list[tuple[str, int]]:
        """Copy of the ``(frame_id, ts_ns)`` pairs passed to ``add_frame``."""
        return self._frames_seen[:]

    @property
    def reset_call_count(self) -> int:
        """Number of times ``reset`` has been called."""
        return self._reset_calls
# ---------------------------------------------------------------------------
# Module fixture — installs fake `_native.okvis2_binding` at sys.modules.
@pytest.fixture
def fake_okvis2_binding(
    monkeypatch: pytest.MonkeyPatch,
) -> Iterator[type[FakeOkvis2Backend]]:
    """Install a fake ``okvis2_binding`` module at the import boundary.

    For the duration of the test, ``sys.modules`` maps the binding's dotted
    name to a synthetic module exposing :class:`FakeOkvis2Backend` and the
    three fake exception types, and the strategy module is evicted so the
    test starts with a fresh lazy-import state.

    Both ``sys.modules`` mutations go through ``monkeypatch`` so that
    whatever was registered under those names before the test is put back
    at teardown instead of being dropped.

    Yields:
        The :class:`FakeOkvis2Backend` class, i.e. the object the fake
        module exposes as ``Okvis2Backend``.
    """
    import types

    fake_module = types.ModuleType(_BINDING_MODULE_NAME)
    fake_module.Okvis2Backend = FakeOkvis2Backend  # type: ignore[attr-defined]
    fake_module.OkvisInitException = FakeOkvisInitException  # type: ignore[attr-defined]
    fake_module.OkvisFatalException = FakeOkvisFatalException  # type: ignore[attr-defined]
    fake_module.OkvisOptimizationException = (  # type: ignore[attr-defined]
        FakeOkvisOptimizationException
    )
    # Evict the strategy module via monkeypatch (not a bare ``pop``) so a
    # previously imported instance is restored once the test finishes.
    monkeypatch.delitem(sys.modules, _STRATEGY_MODULE_NAME, raising=False)
    # No manual ``pop`` of the binding first: ``setitem`` must see the
    # pre-test entry (if any) to be able to restore it at teardown.
    monkeypatch.setitem(sys.modules, _BINDING_MODULE_NAME, fake_module)
    yield FakeOkvis2Backend
    # Drop any strategy module imported while the fake was installed. This
    # runs before monkeypatch's own undo, which then reinstates the
    # pre-test entries.
    sys.modules.pop(_STRATEGY_MODULE_NAME, None)