Files
gps-denied-onboard/tests/unit/c1_vio/test_protocol_conformance.py
Oleksandr Bezdieniezhnykh ceb24b5a62 [AZ-334] C1 KLT/RANSAC strategy — engine-rule simple-baseline VIO
Implement KltRansacStrategy, the ADR-002 engine-rule mandatory
simple-baseline VioStrategy for E-C1. Pure-Python facade over
OpenCV's cv2.goodFeaturesToTrack / calcOpticalFlowPyrLK /
findEssentialMat / recoverPose pipeline — no C++/pybind11 binding
by design so a Tier-0 workstation runs the strategy with
`pip install opencv-python` and the BUILD_KLT_RANSAC=ON gate alone.
Constructor + state machine + FDR transition spine mirror
Okvis2Strategy + VinsMonoStrategy so the AZ-331 factory + IT-12
comparative harness treat all three as drop-in substitutable; the
duplication is the consolidation target now formally in scope for
the next cumulative review (batches 52-54).

AC coverage: AC-1..AC-11 + NFR-perf mapped to passing tests
(25 tests, 23 pass + 2 tier-2 skipped on dev/CI runners; all 25
pass under GPS_DENIED_TIER=2). Honest-covariance invariant (AC-9)
implemented as residual-scatter / (N_inliers - 5) with an inlier-
count penalty — no client-side floor or smoother; cov Frobenius
grows monotonically across DEGRADED. Camera-agnostic source
(AC-11) enforced by CI-grep gate that excludes docstring text.

Test-Run Cadence: focused suite tests/unit/c1_vio/ green (95 passed,
6 skipped); config-loader + compose-root suites green; full-suite
gate deferred to Step 16 per implement skill.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-14 02:40:01 +03:00

476 lines
16 KiB
Python

"""AZ-331 — C1 VioStrategy Protocol + DTO + error + factory conformance.
Covers all 9 ACs of AZ-331 plus NFR-perf-factory and
NFR-reliability-error-family. The factory ACs (AC-4 / AC-5) substitute
fake strategy modules at ``sys.modules`` boundaries so the test never
touches OKVIS2 / VINS-Mono / OpenCV native libraries.
"""
from __future__ import annotations
import dataclasses
import re
import sys
import time
import types
from pathlib import Path
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.nav import (
FeatureQuality,
ImuBias,
VioHealth,
VioOutput,
VioState,
WarmStartPose,
)
from gps_denied_onboard.components.c1_vio import (
C1VioConfig,
VioDegradedError,
VioError,
VioFatalError,
VioInitializingError,
VioStrategy,
)
from gps_denied_onboard.components.c1_vio.config import KNOWN_STRATEGIES
from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy
_CONTRACT_PATH = (
Path(__file__).resolve().parents[3]
/ "_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md"
)
_STRATEGY_MODULES: dict[str, tuple[str, str, str]] = {
"okvis2": (
"gps_denied_onboard.components.c1_vio.okvis2",
"Okvis2Strategy",
"BUILD_OKVIS2",
),
"vins_mono": (
"gps_denied_onboard.components.c1_vio.vins_mono",
"VinsMonoStrategy",
"BUILD_VINS_MONO",
),
"klt_ransac": (
"gps_denied_onboard.components.c1_vio.klt_ransac",
"KltRansacStrategy",
"BUILD_KLT_RANSAC",
),
}
# ----------------------------------------------------------------------
# Fakes that structurally satisfy the VioStrategy Protocol.
class _FullVioStrategy:
def __init__(self, config: Config, *, fdr_client) -> None:
self.config = config
self.fdr_client = fdr_client
self._label = config.components["c1_vio"].strategy
def process_frame(self, frame, imu, calibration):
raise NotImplementedError
def reset_to_warm_start(self, hint):
return None
def health_snapshot(self):
return VioHealth(state=VioState.INIT, consecutive_lost=0, bias_norm=0.0)
def current_strategy_label(self):
return self._label
class _PartialVioStrategy:
def process_frame(self, frame, imu, calibration):
raise NotImplementedError
def reset_to_warm_start(self, hint):
return None
def _config_with_strategy(strategy: str) -> Config:
return Config.with_blocks(c1_vio=C1VioConfig(strategy=strategy))
def _install_fake_strategy(strategy_label: str) -> type:
module_name, class_name, _flag = _STRATEGY_MODULES[strategy_label]
class _FakeStrategy(_FullVioStrategy):
pass
_FakeStrategy.__name__ = class_name
module = types.ModuleType(module_name)
setattr(module, class_name, _FakeStrategy)
sys.modules[module_name] = module
return _FakeStrategy
@pytest.fixture
def strategy_module_cleanup():
"""Pop every fake strategy module before/after each factory test."""
for module_name, _, _ in _STRATEGY_MODULES.values():
sys.modules.pop(module_name, None)
yield
for module_name, _, _ in _STRATEGY_MODULES.values():
sys.modules.pop(module_name, None)
def _zero_bias() -> ImuBias:
return ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
def _neutral_feature_quality() -> FeatureQuality:
return FeatureQuality(tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0)
def _make_vio_output(frame_id: str = "frame-0001") -> VioOutput:
return VioOutput(
frame_id=frame_id,
relative_pose_T=gtsam.Pose3(np.eye(4)),
pose_covariance_6x6=np.eye(6) * 0.01,
imu_bias=_zero_bias(),
feature_quality=_neutral_feature_quality(),
emitted_at_ns=1_000_000_000,
)
# ----------------------------------------------------------------------
# AC-1: Protocol is conformance-checkable.
def test_ac1_vio_strategy_conformance_full() -> None:
instance = _FullVioStrategy(_config_with_strategy("klt_ransac"), fdr_client=None)
assert isinstance(instance, VioStrategy)
def test_ac1_vio_strategy_conformance_partial_missing_methods() -> None:
assert not isinstance(_PartialVioStrategy(), VioStrategy)
# ----------------------------------------------------------------------
# AC-2: frozen DTOs reject mutation.
@pytest.mark.parametrize(
"dto, field_name, new_value",
[
(_make_vio_output(), "frame_id", "renamed"),
(
VioHealth(state=VioState.TRACKING, consecutive_lost=0, bias_norm=0.0),
"state",
VioState.LOST,
),
(
WarmStartPose(
body_T_world=gtsam.Pose3(np.eye(4)),
velocity_b=(0.0, 0.0, 0.0),
bias=_zero_bias(),
captured_at_ns=1_000_000_000,
),
"captured_at_ns",
0,
),
(_neutral_feature_quality(), "tracked", 0),
],
)
def test_ac2_frozen_dtos_reject_mutation(dto, field_name: str, new_value) -> None:
original_value = getattr(dto, field_name)
with pytest.raises(dataclasses.FrozenInstanceError):
setattr(dto, field_name, new_value)
assert getattr(dto, field_name) == original_value
# ----------------------------------------------------------------------
# AC-3: error hierarchy catchable as a single family.
@pytest.mark.parametrize(
"exc_factory",
[VioInitializingError, VioDegradedError, VioFatalError],
)
def test_ac3_all_vio_errors_caught_as_family(exc_factory) -> None:
with pytest.raises(VioError):
raise exc_factory("boom")
def test_ac3_unrelated_exception_not_caught_as_family() -> None:
with pytest.raises(ValueError):
try:
raise ValueError("not us")
except VioError:
pytest.fail("ValueError must not be caught as VioError")
def test_ac3_strategy_not_available_outside_family() -> None:
with pytest.raises(StrategyNotAvailableError):
try:
raise StrategyNotAvailableError("composition-time")
except VioError:
pytest.fail(
"StrategyNotAvailableError is a composition-root error "
"and MUST NOT be in the c1 VioError family"
)
# ----------------------------------------------------------------------
# AC-4 + AC-5: factory honours config + BUILD flag gate.
@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac4_build_vio_strategy_returns_protocol_impl(
monkeypatch, strategy_module_cleanup, strategy
) -> None:
_, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.setenv(flag, "ON")
fake_cls = _install_fake_strategy(strategy)
config = _config_with_strategy(strategy)
instance = build_vio_strategy(config, fdr_client=object())
assert isinstance(instance, fake_cls)
assert isinstance(instance, VioStrategy)
@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac5_build_vio_strategy_flag_off_no_import(
monkeypatch, strategy_module_cleanup, strategy
) -> None:
module_name, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.delenv(flag, raising=False)
config = _config_with_strategy(strategy)
with pytest.raises(StrategyNotAvailableError) as exc_info:
build_vio_strategy(config, fdr_client=object())
assert strategy in str(exc_info.value)
assert flag in str(exc_info.value)
assert module_name not in sys.modules
# Which strategies still have NO concrete Python module on disk?
# All three strategies (AZ-332 / AZ-333 / AZ-334) have landed; tuple
# remains as a tombstone for git-blame archaeology of the build-time
# gating evolution. Once removed, the parametrisation below collapses
# to two branches (native-binding vs pure-Python).
_STRATEGIES_WITHOUT_PY_MODULE: tuple[str, ...] = ()
# Strategies whose concrete implementation has NO native binding —
# the AZ-334 KLT/RANSAC simple-baseline is pure-Python over OpenCV.
# When ``BUILD_KLT_RANSAC=ON`` and the module is on disk, the
# constructor succeeds end-to-end (no native ``.so`` to be missing).
# The AC-5 spirit (meaningful error before first frame) is still
# satisfied: the only way construction fails for klt_ransac is the
# ``BUILD_*=OFF`` path which is already covered by
# :func:`test_ac5_build_vio_strategy_flag_off_no_import`.
_STRATEGIES_WITHOUT_NATIVE_BINDING: tuple[str, ...] = ("klt_ransac",)
@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac5_build_vio_strategy_flag_on_but_module_missing(
monkeypatch, strategy_module_cleanup, strategy
) -> None:
_, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.setenv(flag, "ON")
config = _config_with_strategy(strategy)
if strategy in _STRATEGIES_WITHOUT_PY_MODULE:
# Module not yet implemented — factory's __import__ raises
# ModuleNotFoundError, rewrapped into StrategyNotAvailableError.
with pytest.raises(StrategyNotAvailableError) as exc_info:
build_vio_strategy(config, fdr_client=object())
assert strategy in str(exc_info.value)
elif strategy in _STRATEGIES_WITHOUT_NATIVE_BINDING:
# Module IS implemented AND has no native binding (AZ-334
# KLT/RANSAC). Constructor succeeds without raising; the
# only failure mode the AC-5 test guards against does not
# apply to a pure-Python strategy. We assert the construction
# produces a real :class:`VioStrategy` instance to keep the
# test branch non-trivial.
instance = build_vio_strategy(config, fdr_client=object())
assert isinstance(instance, VioStrategy)
assert instance.current_strategy_label() == strategy
else:
# Module IS implemented (AZ-332 / AZ-333). Factory import
# succeeds, then the strategy constructor fails on missing
# native binding — which the strategy MUST surface as
# VioFatalError BEFORE any frame is processed (the AC-5
# spirit: no silent fall-through).
with pytest.raises(VioFatalError) as exc_info:
build_vio_strategy(config, fdr_client=object())
assert "native binding" in str(exc_info.value)
# ----------------------------------------------------------------------
# AC-6: unknown strategy label rejected at config load.
@pytest.mark.parametrize(
"bad_label",
["openvslam", "orbslam3", "OKVIS2", "okvis", ""],
)
def test_ac6_unknown_strategy_rejected_at_config_load(bad_label: str) -> None:
with pytest.raises(ConfigError) as exc_info:
C1VioConfig(strategy=bad_label)
msg = str(exc_info.value)
for valid in KNOWN_STRATEGIES:
assert valid in msg
# ----------------------------------------------------------------------
# AC-7: current_strategy_label() matches config exactly.
@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
def test_ac7_current_strategy_label_matches_config(
monkeypatch, strategy_module_cleanup, strategy
) -> None:
_, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.setenv(flag, "ON")
_install_fake_strategy(strategy)
config = _config_with_strategy(strategy)
instance = build_vio_strategy(config, fdr_client=object())
assert instance.current_strategy_label() == strategy
assert instance.current_strategy_label() == config.components["c1_vio"].strategy
# ----------------------------------------------------------------------
# AC-8: contract file matches Protocol shape.
_METHOD_TABLE_RE = re.compile(r"^\|\s*`(?P<name>[a-z_][a-z0-9_]*)`\s*\|", re.MULTILINE)
def _methods_from_contract() -> set[str]:
text = _CONTRACT_PATH.read_text(encoding="utf-8")
surface_start = text.index("### Protocol surface")
next_section = text.find("\n### ", surface_start + len("### Protocol surface"))
section = text[surface_start:next_section] if next_section != -1 else text[surface_start:]
return {m.group("name") for m in _METHOD_TABLE_RE.finditer(section)}
def _protocol_methods(proto: type) -> set[str]:
return {
name for name in dir(proto) if not name.startswith("_") and callable(getattr(proto, name))
}
def test_ac8_contract_methods_match_protocol() -> None:
contract_methods = _methods_from_contract()
protocol_methods = _protocol_methods(VioStrategy)
missing_in_protocol = contract_methods - protocol_methods
missing_in_contract = protocol_methods - contract_methods
assert not missing_in_protocol, (
"Methods declared in vio_strategy_protocol.md Shape section but "
f"missing from the Protocol: {sorted(missing_in_protocol)}"
)
assert not missing_in_contract, (
"Methods present on the Protocol but missing from the contract "
f"Shape section: {sorted(missing_in_contract)}"
)
def test_ac8_contract_lists_all_three_error_subtypes() -> None:
text = _CONTRACT_PATH.read_text(encoding="utf-8")
for name in {"VioInitializingError", "VioDegradedError", "VioFatalError"}:
assert name in text, f"Contract file is missing the documented error subtype {name!r}"
# ----------------------------------------------------------------------
# AC-9: VioOutput.frame_id echo invariant is typed.
def test_ac9_vio_output_frame_id_is_typed_str() -> None:
"""``VioOutput.frame_id`` annotation is ``str`` per AZ-331 AC-9.
With ``from __future__ import annotations`` PEP-563 stringifies
every annotation at module load, so ``__annotations__`` returns
the literal ``'str'``. Compare against the string to avoid the
full ``get_type_hints`` forward-ref resolution path (which would
try to resolve neighbouring TYPE_CHECKING-only names like
:class:`SE3`).
"""
annotation = VioOutput.__annotations__["frame_id"]
assert annotation == "str", f"frame_id annotation should be 'str'; got {annotation!r}"
def test_ac9_vio_output_docstring_documents_echo_invariant() -> None:
docstring = VioOutput.__doc__ or ""
assert "echo" in docstring.lower(), (
"VioOutput docstring must document the frame_id echo invariant "
"(MUST equal NavCameraFrame.frame_id from the input frame)"
)
assert "frame_id" in docstring.lower()
# ----------------------------------------------------------------------
# NFRs.
@pytest.mark.parametrize(
"exc_type",
[VioInitializingError, VioDegradedError, VioFatalError],
)
def test_nfr_reliability_all_vio_errors_subclass_family(exc_type) -> None:
assert issubclass(exc_type, VioError)
def test_nfr_reliability_strategy_not_available_not_in_family() -> None:
assert not issubclass(StrategyNotAvailableError, VioError)
def test_nfr_perf_factory_under_200ms_p99(monkeypatch, strategy_module_cleanup) -> None:
"""Factory p99 ≤ 200 ms across 1000 calls (NFR-perf-factory)."""
strategy = "klt_ransac"
_, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.setenv(flag, "ON")
_install_fake_strategy(strategy)
config = _config_with_strategy(strategy)
durations_ms: list[float] = []
for _ in range(1000):
t0 = time.perf_counter()
build_vio_strategy(config, fdr_client=object())
durations_ms.append((time.perf_counter() - t0) * 1000.0)
durations_ms.sort()
p99 = durations_ms[int(0.99 * len(durations_ms))]
assert p99 <= 200.0, f"build_vio_strategy() p99={p99:.3f} ms exceeds 200 ms NFR"
# ----------------------------------------------------------------------
# Surface coverage.
def test_vio_state_enum_surface() -> None:
assert {v.value for v in VioState} == {"init", "tracking", "degraded", "lost"}
def test_c1_config_lost_frame_threshold_validation() -> None:
with pytest.raises(ConfigError):
C1VioConfig(lost_frame_threshold=0)
with pytest.raises(ConfigError):
C1VioConfig(lost_frame_threshold=-1)
def test_c1_config_warm_start_max_frames_validation() -> None:
with pytest.raises(ConfigError):
C1VioConfig(warm_start_max_frames=0)
def test_feature_quality_dto_constructs_and_freezes() -> None:
fq = _neutral_feature_quality()
with pytest.raises(dataclasses.FrozenInstanceError):
fq.mre_px = 99.0 # type: ignore[misc]
def test_warm_start_pose_constructs_with_zero_bias() -> None:
hint = WarmStartPose(
body_T_world=gtsam.Pose3(np.eye(4)),
velocity_b=(0.0, 0.0, 0.0),
bias=_zero_bias(),
captured_at_ns=1_000_000_000,
)
assert hint.captured_at_ns == 1_000_000_000
assert hint.bias.accel_bias == (0.0, 0.0, 0.0)