Files
Oleksandr Bezdieniezhnykh 1ebab29a4f [AZ-332] C1 OKVIS2 Strategy: facade + binding skeleton
Python facade (`Okvis2Strategy`) is production-quality and satisfies
AZ-331's `VioStrategy` protocol; full AC-1..10 coverage with
AC-9 + NFR-perf marked `tier2`. The C++ pybind11 binding compiles
and loads but throws `OkvisFatalException("estimator not yet wired")`
on first `add_frame` — the `okvis::ThreadedKFVio` wiring is a tier2
follow-up the Step-15 Product Completeness Gate is expected to track
as a remediation task.

Resolved contradictions:

* Constructor signature aligned with the AZ-331 factory: `(config, *,
  fdr_client, clock=None)`. Calibration / preintegrator / logger
  built internally from config. No churn on AZ-331.
* IMU substrate: OKVIS2 owns its internal estimator IMU integration;
  the AZ-276 `ImuPreintegrator` is a separate substrate consumed by
  E-C5's fusion graph. Single source of truth lives at the sample
  stream, not the integrator instance.
* FDR API: `FdrClient.enqueue(record)` with new `vio.health` kind
  added to AZ-272 `KNOWN_PAYLOAD_KEYS`.

CI matrix forces `-DBUILD_OKVIS2=OFF` until the tier2 wiring task
brings Ceres / SuiteSparse / OKVIS2 vendored submodules into the
Linux build.

Files: 17 added/modified across `c1_vio/`, `fdr_client/records.py`,
`cpp/okvis2/CMakeLists.txt`, CI workflow, AZ-332 task spec
(implementation-notes section), batch 23 report.

Tests: 17 new (15 tier1 + 2 tier2). Full Tier-1 suite: 1109 pass,
2 skipped (env), 2 deselected (tier2). No regressions.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 09:56:45 +03:00

546 lines
20 KiB
Python

"""AZ-332 — :class:`Okvis2Strategy` acceptance criteria coverage.
Covers AC-1 through AC-10 (with AC-9 + NFR-perf tagged
``@pytest.mark.tier2`` per the carry-over plan; skipped on macOS dev
+ GitHub Actions Linux runner; run on Jetson via ``ci-tier2.yml``).
Uses the ``fake_okvis2_binding`` fixture from ``conftest.py`` to
script backend responses — the task spec explicitly permits a fake
binding for backend-exception injection (AC-3 / AC-6 / AC-7) and by
extension the rest of the Python-facade-only AC suite.
"""
from __future__ import annotations
from datetime import datetime, timezone
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import (
ImuBias,
ImuSample,
ImuWindow,
NavCameraFrame,
VioOutput,
VioState,
WarmStartPose,
)
from gps_denied_onboard.components.c1_vio import (
C1VioConfig,
Okvis2Config,
VioError,
VioFatalError,
VioInitializingError,
)
from gps_denied_onboard.config.schema import Config, RuntimeConfig
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.fdr_client.records import FdrRecord
from tests.unit.c1_vio.conftest import (
FakeOkvis2Backend,
FakeOkvisFatalException,
FakeOkvisInitException,
FakeOkvisOptimizationException,
ScriptedOutput,
)
# ---------------------------------------------------------------------------
# Helpers.
def _zero_bias() -> ImuBias:
return ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
def _calibration() -> CameraCalibration:
return CameraCalibration(
camera_id="test-cam",
intrinsics_3x3=np.eye(3, dtype=np.float64),
distortion=np.zeros(4, dtype=np.float64),
body_to_camera_se3=np.eye(4, dtype=np.float64),
acquisition_method="unit-test-static",
metadata={},
)
def _frame(idx: int = 1, ts_ns: int = 1_000_000_000) -> NavCameraFrame:
return NavCameraFrame(
frame_id=idx,
timestamp=datetime.fromtimestamp(ts_ns * 1e-9, tz=timezone.utc),
image=np.zeros((4, 4, 3), dtype=np.uint8),
camera_calibration_id="test-cam",
)
def _imu_window(ts_ns_start: int = 999_000_000, n: int = 3) -> ImuWindow:
samples = tuple(
ImuSample(
ts_ns=ts_ns_start + i * 5_000_000,
accel_xyz=(0.0, 0.0, 9.81),
gyro_xyz=(0.0, 0.0, 0.0),
)
for i in range(n)
)
return ImuWindow(
samples=samples,
ts_start_ns=samples[0].ts_ns,
ts_end_ns=samples[-1].ts_ns,
)
def _warm_start_hint() -> WarmStartPose:
return WarmStartPose(
body_T_world=gtsam.Pose3(np.eye(4)),
velocity_b=(0.5, 0.0, 0.0),
bias=ImuBias(
accel_bias=(0.01, -0.02, 0.0),
gyro_bias=(0.003, 0.0, -0.001),
),
captured_at_ns=1_000_000_000,
)
def _config(
okvis2_cfg: Okvis2Config | None = None,
lost_frame_threshold: int = 9,
warm_start_max_frames: int = 5,
) -> Config:
return Config.with_blocks(
c1_vio=C1VioConfig(
strategy="okvis2",
lost_frame_threshold=lost_frame_threshold,
warm_start_max_frames=warm_start_max_frames,
okvis2=okvis2_cfg or Okvis2Config(),
),
runtime=RuntimeConfig(camera_calibration_path=""),
)
@pytest.fixture
def fdr_client() -> FdrClient:
return FdrClient(producer_id="c1_vio.okvis2", capacity=256, _emit_diag_log=False)
def _build_strategy(
fdr_client: FdrClient,
config: Config | None = None,
):
"""Lazy import after the fake binding is installed in sys.modules."""
from gps_denied_onboard.components.c1_vio.okvis2 import Okvis2Strategy
return Okvis2Strategy(config or _config(), fdr_client=fdr_client)
def _drain(fdr_client: FdrClient) -> list[FdrRecord]:
return fdr_client.drain(max_records=1024)
# ===========================================================================
# AC-1: current_strategy_label returns "okvis2".
def test_ac1_current_strategy_label_returns_okvis2(fake_okvis2_binding, fdr_client) -> None:
strategy = _build_strategy(fdr_client)
assert strategy.current_strategy_label() == "okvis2"
# ===========================================================================
# AC-2: process_frame returns VioOutput with echoed frame_id, SPD cov, bias.
def test_ac2_process_frame_returns_vio_output_with_frame_id(
fake_okvis2_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(produced=True))
out = strategy.process_frame(_frame(idx=42), _imu_window(), _calibration())
assert isinstance(out, VioOutput)
assert out.frame_id == "42"
assert out.pose_covariance_6x6.shape == (6, 6)
assert np.allclose(out.pose_covariance_6x6, out.pose_covariance_6x6.T)
eigvals = np.linalg.eigvalsh(out.pose_covariance_6x6)
assert np.all(eigvals > 0), "covariance must be SPD"
assert out.imu_bias is not None
assert out.feature_quality.tracked > 0
# ===========================================================================
# AC-3: backend exceptions rewrap into VioError with __cause__ chain.
@pytest.mark.parametrize(
"fake_exc_cls, expected_facade_exc",
[
(FakeOkvisInitException, VioInitializingError),
(FakeOkvisFatalException, VioFatalError),
],
)
def test_ac3_backend_exceptions_rewrap_to_vio_error_family(
fake_okvis2_binding, fdr_client, fake_exc_cls, expected_facade_exc
) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(raise_with=fake_exc_cls("boom from backend")))
with pytest.raises(expected_facade_exc) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value, VioError)
assert isinstance(exc_info.value.__cause__, fake_exc_cls)
def test_ac3_optimization_exception_during_init_rewraps_to_initializing(
fake_okvis2_binding, fdr_client
) -> None:
config = _config(warm_start_max_frames=5, lost_frame_threshold=9)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(raise_with=FakeOkvisOptimizationException("opt fail")))
with pytest.raises(VioInitializingError) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value.__cause__, FakeOkvisOptimizationException)
def test_ac3_unmapped_runtime_error_rewraps_to_vio_fatal(fake_okvis2_binding, fdr_client) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
backend.script(ScriptedOutput(raise_with=RuntimeError("library leaked this")))
with pytest.raises(VioFatalError) as exc_info:
strategy.process_frame(_frame(), _imu_window(), _calibration())
assert isinstance(exc_info.value.__cause__, RuntimeError)
# ===========================================================================
# AC-4: reset_to_warm_start clears state and seeds the hint; idempotent.
def test_ac4_reset_to_warm_start_clears_and_seeds(fake_okvis2_binding, fdr_client) -> None:
strategy = _build_strategy(fdr_client)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
hint = _warm_start_hint()
strategy.reset_to_warm_start(hint)
assert backend.reset_call_count == 1
health = strategy.health_snapshot()
assert health.state == VioState.INIT
assert health.consecutive_lost == 0
# bias_norm > 0 because the hint carries a non-zero bias
assert health.bias_norm > 0.0
def test_ac4_reset_to_warm_start_is_idempotent(fake_okvis2_binding, fdr_client) -> None:
strategy = _build_strategy(fdr_client)
hint = _warm_start_hint()
strategy.reset_to_warm_start(hint)
strategy.reset_to_warm_start(hint)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
assert backend.reset_call_count == 2
# ===========================================================================
# AC-5: INIT initially -> TRACKING after warm_start_max_frames frames.
def test_ac5_health_snapshot_init_then_tracking(fake_okvis2_binding, fdr_client) -> None:
config = _config(warm_start_max_frames=3)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# AC-5 invariant: pre-frame snapshot is INIT.
assert strategy.health_snapshot().state == VioState.INIT
# Three successful frames (each "produced=True" -> tracked > threshold).
backend.script(
ScriptedOutput(produced=True),
ScriptedOutput(produced=True),
ScriptedOutput(produced=True),
)
for i in range(3):
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
assert strategy.health_snapshot().state == VioState.TRACKING
# ===========================================================================
# AC-6: DEGRADED on feature loss; VioOutput STILL emitted (not raised);
# covariance Frobenius norm strictly increases on the degraded frame.
def test_ac6_degraded_on_feature_loss_emits_vio_output(fake_okvis2_binding, fdr_client) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# First frame: healthy (tracked >> degraded threshold).
healthy_payload = {
"tracked_features": 80,
"pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.01,
}
# Second frame: feature loss below the degraded threshold (default 30).
degraded_payload = {
"tracked_features": 5,
"pose_covariance_6x6": np.eye(6, dtype=np.float64) * 0.5,
}
backend.script(
ScriptedOutput(produced=True, payload=healthy_payload),
ScriptedOutput(produced=True, payload=degraded_payload),
)
healthy_out = strategy.process_frame(_frame(idx=1), _imu_window(), _calibration())
degraded_out = strategy.process_frame(
_frame(idx=2, ts_ns=1_100_000_000),
_imu_window(ts_ns_start=1_099_000_000),
_calibration(),
)
assert isinstance(degraded_out, VioOutput), "DEGRADED frame MUST emit output"
assert strategy.health_snapshot().state == VioState.DEGRADED
healthy_norm = np.linalg.norm(healthy_out.pose_covariance_6x6, ord="fro")
degraded_norm = np.linalg.norm(degraded_out.pose_covariance_6x6, ord="fro")
assert degraded_norm > healthy_norm, (
f"Frobenius norm must increase on DEGRADED frame "
f"(healthy={healthy_norm}, degraded={degraded_norm})"
)
# ===========================================================================
# AC-7: After lost_frame_threshold consecutive failures, raise VioFatalError;
# state == LOST.
def test_ac7_sustained_loss_raises_vio_fatal_error(fake_okvis2_binding, fdr_client) -> None:
config = _config(lost_frame_threshold=3, warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# Three consecutive optimization failures.
backend.script(
ScriptedOutput(raise_with=FakeOkvisOptimizationException("loss-1")),
ScriptedOutput(raise_with=FakeOkvisOptimizationException("loss-2")),
ScriptedOutput(raise_with=FakeOkvisOptimizationException("loss-3")),
)
# First 2 are VioInitializingError (degraded path); third hits LOST.
with pytest.raises(VioInitializingError):
strategy.process_frame(_frame(idx=1), _imu_window(), _calibration())
with pytest.raises(VioInitializingError):
strategy.process_frame(
_frame(idx=2, ts_ns=1_100_000_000),
_imu_window(ts_ns_start=1_099_000_000),
_calibration(),
)
with pytest.raises(VioFatalError):
strategy.process_frame(
_frame(idx=3, ts_ns=1_200_000_000),
_imu_window(ts_ns_start=1_199_000_000),
_calibration(),
)
assert strategy.health_snapshot().state == VioState.LOST
# ===========================================================================
# AC-8: BUILD_OKVIS2=OFF lazy-import guarantee — complementary check.
# (Primary AC-8 coverage lives in test_protocol_conformance.py via the
# AZ-331 factory which gates BEFORE constructor.)
def test_ac8_strategy_module_not_imported_at_package_load(
monkeypatch,
) -> None:
"""Importing `c1_vio` itself MUST NOT load `c1_vio.okvis2`.
Risk-2 / I-5 guard — the factory respects the BUILD_OKVIS2 flag and
only triggers the import on demand. This complements the
test_ac5_build_vio_strategy_flag_off_no_import test in
test_protocol_conformance.py.
"""
import sys
sys.modules.pop("gps_denied_onboard.components.c1_vio.okvis2", None)
sys.modules.pop("gps_denied_onboard.components.c1_vio", None)
import importlib
importlib.import_module("gps_denied_onboard.components.c1_vio")
assert "gps_denied_onboard.components.c1_vio.okvis2" not in sys.modules
# ===========================================================================
# AC-9: tier2 — honest covariance Frobenius monotonically non-decreasing
# across a controlled-degradation window.
@pytest.mark.tier2
def test_ac9_honest_covariance_monotonic_during_degraded(fake_okvis2_binding, fdr_client) -> None:
"""Tier-2: 60 s controlled-degradation fixture; covariance MUST not
shrink during the DEGRADED window.
The fake binding here exercises the facade's enforcement contract —
real validation against OKVIS2's internal Hessian is the Jetson-side
follow-up that wires :class:`okvis::ThreadedKFVio` (skeleton today).
"""
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# Healthy frame, then 5 DEGRADED frames with non-decreasing covariance.
base_cov = np.eye(6, dtype=np.float64) * 0.01
backend.script(
ScriptedOutput(produced=True, payload={"tracked_features": 80}),
*[
ScriptedOutput(
produced=True,
payload={
"tracked_features": 10,
"pose_covariance_6x6": base_cov * (1.0 + i),
},
)
for i in range(5)
],
)
outputs = []
for i in range(6):
outputs.append(
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
)
import itertools
degraded_outputs = outputs[1:] # 5 DEGRADED frames
norms = [np.linalg.norm(o.pose_covariance_6x6, ord="fro") for o in degraded_outputs]
for prev, curr in itertools.pairwise(norms):
assert curr >= prev, (
f"covariance Frobenius norm must be monotonically non-decreasing "
f"during DEGRADED; got prev={prev}, curr={curr}"
)
# ===========================================================================
# AC-10: Exactly one vio.health record per state transition; no spam on
# steady-state.
def test_ac10_fdr_vio_health_emitted_per_transition(fake_okvis2_binding, fdr_client) -> None:
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
# Drain INIT-on-construct record (the constructor itself does NOT emit;
# the first transition is on the first frame). Document the invariant
# by asserting drain returns empty here.
pre_records = _drain(fdr_client)
assert pre_records == [], "construction must not emit vio.health"
# Sequence: INIT -> TRACKING -> DEGRADED -> back to TRACKING.
backend.script(
ScriptedOutput(produced=True, payload={"tracked_features": 80}),
ScriptedOutput(produced=True, payload={"tracked_features": 80}), # steady
ScriptedOutput(produced=True, payload={"tracked_features": 10}), # DEGRADED
ScriptedOutput(produced=True, payload={"tracked_features": 80}), # TRACKING
)
for i in range(4):
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
records = _drain(fdr_client)
assert all(r.kind == "vio.health" for r in records)
states = [r.payload["state"] for r in records]
# Expect: INIT -> TRACKING (frame 1), no record on frame 2 steady,
# TRACKING -> DEGRADED (frame 3), DEGRADED -> TRACKING (frame 4).
assert states == ["tracking", "degraded", "tracking"], (
f"unexpected transition sequence: {states}"
)
# ===========================================================================
# NFR-perf (tier2): p95 process_frame <= 80 ms on Tier-2 with real OKVIS2.
@pytest.mark.tier2
def test_nfr_perf_process_frame_p95_under_80ms(fake_okvis2_binding, fdr_client) -> None:
"""Tier-2: Real OKVIS2 binding + Derkachi-class fixture.
The fake binding here measures the Python facade overhead only,
which is the floor under which the real OKVIS2 latency must stay
within budget. On Jetson tier2 this test runs against the real
binding and validates C1-PT-01.
"""
import time
config = _config(warm_start_max_frames=1)
strategy = _build_strategy(fdr_client, config)
backend: FakeOkvis2Backend = strategy._backend # type: ignore[attr-defined]
n = 200
backend.script(*[ScriptedOutput(produced=True) for _ in range(n)])
durations_ms: list[float] = []
for i in range(n):
t0 = time.perf_counter()
strategy.process_frame(
_frame(idx=i + 1, ts_ns=1_000_000_000 + i * 1_000_000),
_imu_window(ts_ns_start=999_000_000 + i * 100_000_000),
_calibration(),
)
durations_ms.append((time.perf_counter() - t0) * 1000.0)
durations_ms.sort()
p95 = durations_ms[int(0.95 * len(durations_ms))]
assert p95 <= 80.0, f"process_frame p95={p95:.3f} ms exceeds C1-PT-01 budget (80 ms)"
# ===========================================================================
# Construction guards.
def test_construct_with_wrong_strategy_label_raises(fake_okvis2_binding, fdr_client) -> None:
"""Constructing directly with a non-okvis2 strategy is a developer bug."""
bad_config = Config.with_blocks(c1_vio=C1VioConfig(strategy="klt_ransac"))
from gps_denied_onboard.components.c1_vio.okvis2 import Okvis2Strategy
with pytest.raises(VioFatalError):
Okvis2Strategy(bad_config, fdr_client=fdr_client)
def test_build_via_factory_returns_okvis2_strategy(
fake_okvis2_binding, fdr_client, monkeypatch
) -> None:
"""End-to-end factory wiring smoke — exercises the BUILD flag gate +
lazy import path the conformance tests don't touch for the real
`Okvis2Strategy` class.
"""
monkeypatch.setenv("BUILD_OKVIS2", "ON")
from gps_denied_onboard.components.c1_vio.okvis2 import Okvis2Strategy
from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy
instance = build_vio_strategy(_config(), fdr_client=fdr_client)
assert isinstance(instance, Okvis2Strategy)
assert instance.current_strategy_label() == "okvis2"