"""AZ-334 — :class:`KltRansacStrategy` acceptance criteria coverage. Covers AC-1 through AC-11 (with AC-9 + NFR-perf tagged ``@pytest.mark.tier2``; the AZ-334 task spec exempts those bounds from the standard dev/Linux-CI matrix). Unlike the OKVIS2 / VINS-Mono test modules, this file does NOT use a fake binding fixture — KLT/RANSAC is pure-Python over OpenCV's Python bindings, so we exercise the real :func:`cv2.calcOpticalFlowPyrLK` / :func:`cv2.findEssentialMat` / :func:`cv2.recoverPose` path against controlled synthetic correspondences. For the error-injection ACs (AC-4) and the failure-counter ACs (AC-7) we monkeypatch specific cv2 symbols inside the strategy module's namespace. Mirrors the AZ-333 ``test_vins_mono_strategy.py`` layout deliberately: the AZ-331 factory produces all three via the same ``(config, *, fdr_client)`` shape and the IT-12 comparative-study harness expects them to behave identically through the Python facade. """ from __future__ import annotations import re from datetime import datetime, timezone from pathlib import Path from typing import Any import cv2 import gtsam import numpy as np import pytest from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.nav import ( ImuBias, ImuSample, ImuWindow, NavCameraFrame, VioOutput, VioState, WarmStartPose, ) from gps_denied_onboard.components.c1_vio import ( C1VioConfig, KltRansacConfig, VioError, VioFatalError, VioInitializingError, ) from gps_denied_onboard.components.c1_vio import klt_ransac as klt_ransac_module from gps_denied_onboard.components.c1_vio.klt_ransac import KltRansacStrategy from gps_denied_onboard.config.schema import Config, RuntimeConfig from gps_denied_onboard.fdr_client.client import FdrClient from gps_denied_onboard.fdr_client.records import FdrRecord # ---------------------------------------------------------------------- # Helpers — keep boilerplate out of the AC test bodies. _KLT_RANSAC_SOURCE_PATH = ( Path(__file__).resolve().parents[3] / "src/gps_denied_onboard/components/c1_vio/klt_ransac.py" ) def _zero_bias() -> ImuBias: return ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)) def _make_calibration( *, camera_id: str = "test-cam", focal_length: float = 500.0, cx: float = 320.0, cy: float = 240.0, ) -> CameraCalibration: return CameraCalibration( camera_id=camera_id, intrinsics_3x3=np.array( [[focal_length, 0.0, cx], [0.0, focal_length, cy], [0.0, 0.0, 1.0]], dtype=np.float64, ), distortion=np.zeros(5, dtype=np.float64), body_to_camera_se3=np.eye(4, dtype=np.float64), acquisition_method="unit-test-static", metadata={}, ) def _alternate_calibration() -> CameraCalibration: """Second calibration for the AC-11 camera-agnostic test.""" return _make_calibration(camera_id="alt-cam", focal_length=720.0, cx=400.0, cy=300.0) def _frame(idx: int = 1, *, image: np.ndarray | None = None) -> NavCameraFrame: if image is None: image = _synthetic_frame_image(seed=idx) return NavCameraFrame( frame_id=idx, timestamp=datetime.fromtimestamp(idx * 0.1, tz=timezone.utc), image=image, camera_calibration_id="test-cam", ) def _synthetic_frame_image(*, seed: int, size: int = 240, shift_x: int = 0) -> np.ndarray: """Build a single-channel ``uint8`` image with a deterministic blob pattern. Each frame is a 240x240 white canvas with N random corner-friendly bright pixels offset by ``shift_x`` columns from frame to frame. OpenCV's ``cv2.goodFeaturesToTrack`` finds these as Harris corners so the KLT track succeeds on consecutive frames. """ rng = np.random.default_rng(seed) img = np.zeros((size, size), dtype=np.uint8) # Lay down a regular grid of 5x5 white blocks shifted by ``shift_x``. for row in range(20, size - 20, 30): for col in range(20, size - 20, 30): cc = (col + shift_x) % (size - 5) img[row : row + 5, cc : cc + 5] = 255 # Add a small amount of structured texture so feature detection has # more than one corner to choose from. noise = rng.integers(0, 80, size=(size, size), dtype=np.int16) img = np.clip(img.astype(np.int16) + noise, 0, 255).astype(np.uint8) return img def _imu_window(*, ts_ns_start: int = 999_000_000, n: int = 3) -> ImuWindow: """Build an IMU window with strictly monotonic timestamps. The AZ-276 preintegrator enforces strict monotonicity across successive ``integrate_window`` calls; callers spacing consecutive windows MUST advance ``ts_ns_start`` by more than ``n * 5ms`` (the inter-sample gap below) — typically by ``100ms`` increments per frame, matching the VINS-Mono test pattern. """ samples = tuple( ImuSample( ts_ns=ts_ns_start + i * 5_000_000, accel_xyz=(0.0, 0.0, 9.81), gyro_xyz=(0.0, 0.0, 0.0), ) for i in range(n) ) return ImuWindow( samples=samples, ts_start_ns=samples[0].ts_ns, ts_end_ns=samples[-1].ts_ns, ) def _imu_for_frame(idx: int) -> ImuWindow: """Convenience: monotonic window per frame index. Frame N's samples live in ``[N*100ms, N*100ms + 10ms]`` so the preintegrator's strict-monotonic guard sees a clean stream when frames are walked in order 1, 2, 3, .... """ return _imu_window(ts_ns_start=1_000_000_000 + idx * 100_000_000) def _warm_start_hint(*, accel_bias: tuple[float, float, float] = (0.05, 0.0, 0.0)) -> WarmStartPose: return WarmStartPose( body_T_world=gtsam.Pose3(np.eye(4)), velocity_b=(0.5, 0.0, 0.0), bias=ImuBias(accel_bias=accel_bias, gyro_bias=(0.0, 0.0, 0.0)), captured_at_ns=1_000_000_000, ) def _config(**overrides: Any) -> Config: klt_cfg = KltRansacConfig(**overrides) if overrides else KltRansacConfig() c1 = C1VioConfig(strategy="klt_ransac", klt_ransac=klt_cfg, lost_frame_threshold=3) return Config.with_blocks(c1_vio=c1, runtime=RuntimeConfig()) def _fdr_client_capturing() -> tuple[FdrClient, list[FdrRecord]]: """Build an FdrClient whose recorded events we can inspect. Uses the production client with an in-memory drain so the strategy sees a real producer surface. Tests inspect the drained list. """ captured: list[FdrRecord] = [] client = FdrClient(producer_id="test.klt_ransac", capacity=64, _emit_diag_log=False) return client, captured def _drain(client: FdrClient, sink: list[FdrRecord]) -> list[FdrRecord]: sink.extend(client.drain(max_records=256)) return sink def _new_strategy( *, config: Config | None = None, fdr_client: FdrClient | None = None, ) -> tuple[KltRansacStrategy, list[FdrRecord]]: cfg = config if config is not None else _config() if fdr_client is None: fdr_client, captured = _fdr_client_capturing() else: captured = [] return KltRansacStrategy(cfg, fdr_client=fdr_client), captured def _patch_pose_recovery( monkeypatch: pytest.MonkeyPatch, *, inlier_count: int = 40 ) -> None: """Force the cv2 + RansacFilter geometry stack to a deterministic success path. The unit suite tests the FACADE behaviour; real geometry validation lives in the C1-IT-12 Jetson Tier-2 fixture. """ from gps_denied_onboard.helpers.ransac_filter import RansacResult rng = np.random.default_rng(seed=271828) fake_inliers = np.column_stack([ rng.uniform(50.0, 250.0, size=inlier_count), rng.uniform(50.0, 250.0, size=inlier_count), rng.uniform(50.0, 250.0, size=inlier_count), rng.uniform(50.0, 250.0, size=inlier_count), ]) mask = np.ones((inlier_count, 1), dtype=np.uint8) def _fake_filter(_corr: np.ndarray, _thresh: float, _min: int) -> RansacResult: return RansacResult( inlier_correspondences=fake_inliers, inlier_count=inlier_count, outlier_count=0, median_residual_px=0.5, ) def _fake_find_essential(*_a: Any, **_k: Any) -> tuple[np.ndarray, np.ndarray]: return np.eye(3, dtype=np.float64), mask def _fake_recover_pose(*_a: Any, **_k: Any) -> tuple[int, np.ndarray, np.ndarray, np.ndarray]: R = np.eye(3, dtype=np.float64) t = np.array([[0.01], [0.0], [0.0]], dtype=np.float64) return inlier_count, R, t, mask monkeypatch.setattr( klt_ransac_module.RansacFilter, "filter_correspondences", staticmethod(_fake_filter), ) monkeypatch.setattr(klt_ransac_module.cv2, "findEssentialMat", _fake_find_essential) monkeypatch.setattr(klt_ransac_module.cv2, "recoverPose", _fake_recover_pose) # ---------------------------------------------------------------------- # AC-1: current_strategy_label() returns "klt_ransac". def test_ac1_current_strategy_label_returns_klt_ransac() -> None: # Arrange strategy, _captured = _new_strategy() # Act label = strategy.current_strategy_label() # Assert assert label == "klt_ransac" def test_ac1_constructor_rejects_mismatched_strategy_label() -> None: # Arrange config = Config.with_blocks( c1_vio=C1VioConfig(strategy="okvis2"), runtime=RuntimeConfig() ) fdr_client = FdrClient(producer_id="test.klt_ransac", capacity=4) # Act / Assert with pytest.raises(VioFatalError) as exc_info: KltRansacStrategy(config, fdr_client=fdr_client) assert "klt_ransac" in str(exc_info.value) assert "okvis2" in str(exc_info.value) # ---------------------------------------------------------------------- # AC-2: First frame emits VioOutput with state == INIT and identity pose. def test_ac2_first_frame_emits_init_state_with_identity_pose() -> None: # Arrange strategy, captured = _new_strategy() calibration = _make_calibration() # Act output = strategy.process_frame(_frame(idx=1), _imu_for_frame(1), calibration) _drain(strategy._fdr, captured) # Assert assert isinstance(output, VioOutput) pose_matrix = np.asarray(output.relative_pose_T.matrix()) assert np.allclose(pose_matrix, np.eye(4), atol=1e-9) assert strategy.health_snapshot().state == VioState.INIT assert output.frame_id == "1" assert output.pose_covariance_6x6.shape == (6, 6) # AC-2 spec: "conservative covariance" — strictly larger than # zero, symmetric, positive-definite. We assert SPD by eigenvalue. eigenvalues = np.linalg.eigvalsh(output.pose_covariance_6x6) assert np.all(eigenvalues > 0.0) # ---------------------------------------------------------------------- # AC-3: Steady-state frame emits non-identity pose + SPD covariance. def test_ac3_steady_state_frame_emits_pose_and_spd_covariance( monkeypatch: pytest.MonkeyPatch, ) -> None: """AC-3: facade returns a well-formed :class:`VioOutput` on the steady-state success path. cv2 internals are patched so the test is deterministic on dev/CI runners (real geometry is exercised by the C1-IT-12 Jetson Tier-2 fixture, not this unit). """ # Arrange strategy, _captured = _new_strategy( config=_config(min_features_for_pose=5, max_corners=200, ransac_inlier_ratio=0.5) ) calibration = _make_calibration() _patch_pose_recovery(monkeypatch, inlier_count=40) # Act — drive two consecutive frames; the first seeds features and # emits an INIT-state VioOutput; the second exercises the steady- # state success path. out1 = strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=42, shift_x=0)), _imu_for_frame(1), calibration, ) out2 = strategy.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=42, shift_x=5)), _imu_for_frame(2), calibration, ) # Assert assert out1.frame_id == "1" assert out2.frame_id == "2" assert out2.pose_covariance_6x6.shape == (6, 6) eigenvalues = np.linalg.eigvalsh(out2.pose_covariance_6x6) assert np.all(eigenvalues > 0.0) assert np.allclose(out2.pose_covariance_6x6, out2.pose_covariance_6x6.T, atol=1e-12) assert out2.feature_quality.mre_px >= 0.0 assert out2.feature_quality.tracked == 40 # ---------------------------------------------------------------------- # AC-4: cv2.error rewrapped into VioFatalError with __cause__ chain. def test_ac4_cv2_error_in_find_essential_mat_rewrapped_to_vio_fatal_error( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange strategy, _captured = _new_strategy( config=_config(min_features_for_pose=5, ransac_inlier_ratio=0.5) ) calibration = _make_calibration() strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=11)), _imu_for_frame(1), calibration, ) def _raise(*_args: Any, **_kwargs: Any) -> tuple[Any, Any]: raise cv2.error("synthetic findEssentialMat failure") monkeypatch.setattr(klt_ransac_module.cv2, "findEssentialMat", _raise) # Act / Assert with pytest.raises(VioFatalError) as exc_info: strategy.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=11, shift_x=4)), _imu_for_frame(2), calibration, ) assert isinstance(exc_info.value.__cause__, cv2.error) def test_ac4_cv2_error_in_recover_pose_rewrapped_to_vio_fatal_error( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange strategy, _captured = _new_strategy( config=_config(min_features_for_pose=5, ransac_inlier_ratio=0.5) ) calibration = _make_calibration() strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=12)), _imu_for_frame(1), calibration, ) def _raise(*_args: Any, **_kwargs: Any) -> tuple[Any, Any, Any, Any]: raise cv2.error("synthetic recoverPose failure") monkeypatch.setattr(klt_ransac_module.cv2, "recoverPose", _raise) # Act / Assert with pytest.raises(VioFatalError) as exc_info: strategy.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=12, shift_x=4)), _imu_for_frame(2), calibration, ) assert isinstance(exc_info.value.__cause__, cv2.error) # ---------------------------------------------------------------------- # AC-5: reset_to_warm_start clears feature buffer + re-seeds bias. def test_ac5_reset_to_warm_start_clears_feature_buffer_and_seeds_bias( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange strategy, _captured = _new_strategy() calibration = _make_calibration() # One frame is enough — the first-frame path calls # ``_seed_features`` which populates the prior-feature buffer. strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=21)), _imu_for_frame(1), calibration, ) assert strategy._prev_features is not None assert strategy._prev_features.size > 0 seen_calls: list[str] = [] real_good = klt_ransac_module.cv2.goodFeaturesToTrack real_klt = klt_ransac_module.cv2.calcOpticalFlowPyrLK def _spy_good(*args: Any, **kwargs: Any) -> Any: seen_calls.append("goodFeaturesToTrack") return real_good(*args, **kwargs) def _spy_klt(*args: Any, **kwargs: Any) -> Any: seen_calls.append("calcOpticalFlowPyrLK") return real_klt(*args, **kwargs) monkeypatch.setattr(klt_ransac_module.cv2, "goodFeaturesToTrack", _spy_good) monkeypatch.setattr(klt_ransac_module.cv2, "calcOpticalFlowPyrLK", _spy_klt) # Act hint = _warm_start_hint(accel_bias=(0.07, 0.0, 0.0)) strategy.reset_to_warm_start(hint) # Reset clears the preintegrator's monotonic baseline; jump forward # to frame index 10 so the timestamps stay deterministic and well # past the prior frame-2 window. output = strategy.process_frame( _frame(idx=10, image=_synthetic_frame_image(seed=99)), _imu_for_frame(10), calibration, ) # Assert — first OpenCV call AFTER reset is goodFeaturesToTrack # (NOT calcOpticalFlowPyrLK). Buffer was cleared by reset. assert seen_calls[0] == "goodFeaturesToTrack", seen_calls assert "calcOpticalFlowPyrLK" not in seen_calls # imu_bias reflects the hint assert output.imu_bias == hint.bias # State machine reset to INIT assert strategy.health_snapshot().state == VioState.INIT def test_ac5_reset_to_warm_start_idempotent_across_consecutive_calls() -> None: # Arrange strategy, _captured = _new_strategy() hint = _warm_start_hint() # Act / Assert — second consecutive call must not raise strategy.reset_to_warm_start(hint) strategy.reset_to_warm_start(hint) def test_ac5_reset_to_warm_start_rejects_non_pose3_hint() -> None: # Arrange strategy, _captured = _new_strategy() class _NotAPose3: pass bad_hint = WarmStartPose( body_T_world=_NotAPose3(), # type: ignore[arg-type] velocity_b=(0.0, 0.0, 0.0), bias=_zero_bias(), captured_at_ns=1_000_000_000, ) # Act / Assert with pytest.raises(VioFatalError): strategy.reset_to_warm_start(bad_hint) # ---------------------------------------------------------------------- # AC-6: Inlier loss -> DEGRADED + monotonic covariance + VioOutput emitted. def test_ac6_low_inlier_count_emits_degraded_with_monotonic_covariance( monkeypatch: pytest.MonkeyPatch, ) -> None: """AC-6: inlier count below `min_features_for_pose` (but >=5) → state == DEGRADED + covariance Frobenius strictly greater than prior.""" # Arrange — drive the strategy out of INIT into TRACKING by setting # warm_start_max_frames=1 (so frame 2 already classifies normally). config = Config.with_blocks( c1_vio=C1VioConfig( strategy="klt_ransac", klt_ransac=KltRansacConfig( min_features_for_pose=30, ransac_inlier_ratio=0.5, max_corners=120, ), warm_start_max_frames=1, lost_frame_threshold=10, ), runtime=RuntimeConfig(), ) strategy, captured = _new_strategy(config=config) calibration = _make_calibration() # Frame 1 — INIT path; seeds features. strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=31)), _imu_for_frame(1), calibration, ) # Frame 2 — first successful pose recovery (TRACKING). out_tracking = strategy.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=31, shift_x=3)), _imu_for_frame(2), calibration, ) cov_norm_tracking = float(np.linalg.norm(out_tracking.pose_covariance_6x6, ord="fro")) assert strategy.health_snapshot().state in (VioState.TRACKING, VioState.DEGRADED) # Frame 3 — inject a RansacFilter result with low inlier count # (below min_features_for_pose) so DEGRADED is reported. Also # monkeypatch findEssentialMat / recoverPose so the test is # independent of OpenCV's behaviour on synthetic inputs (Risk-3 # mitigation; the AC-6 assertion is about the FACADE'S state # classification, not about OpenCV's geometry solver). from gps_denied_onboard.helpers.ransac_filter import RansacResult rng = np.random.default_rng(seed=1234) fake_inliers = np.column_stack([ rng.uniform(50.0, 250.0, size=10), rng.uniform(50.0, 250.0, size=10), rng.uniform(50.0, 250.0, size=10), rng.uniform(50.0, 250.0, size=10), ]) def _fake_filter(_corr: np.ndarray, _thresh: float, _min: int) -> RansacResult: return RansacResult( inlier_correspondences=fake_inliers, inlier_count=10, outlier_count=0, median_residual_px=0.5, ) def _fake_find_essential(*_a: Any, **_k: Any) -> tuple[np.ndarray, np.ndarray]: return np.eye(3, dtype=np.float64), np.ones((10, 1), dtype=np.uint8) def _fake_recover_pose(*_a: Any, **_k: Any) -> tuple[int, np.ndarray, np.ndarray, np.ndarray]: R = np.eye(3, dtype=np.float64) t = np.array([[0.01], [0.0], [0.0]], dtype=np.float64) return 10, R, t, np.ones((10, 1), dtype=np.uint8) monkeypatch.setattr( klt_ransac_module.RansacFilter, "filter_correspondences", staticmethod(_fake_filter) ) monkeypatch.setattr(klt_ransac_module.cv2, "findEssentialMat", _fake_find_essential) monkeypatch.setattr(klt_ransac_module.cv2, "recoverPose", _fake_recover_pose) # Act out_degraded = strategy.process_frame( _frame(idx=3, image=_synthetic_frame_image(seed=31, shift_x=6)), _imu_for_frame(3), calibration, ) cov_norm_degraded = float(np.linalg.norm(out_degraded.pose_covariance_6x6, ord="fro")) _drain(strategy._fdr, captured) # Assert — DEGRADED state + cov Frobenius strictly greater than prior assert isinstance(out_degraded, VioOutput) assert strategy.health_snapshot().state == VioState.DEGRADED assert cov_norm_degraded > cov_norm_tracking, ( f"AC-6: covariance Frobenius must strictly grow on inlier loss " f"(was {cov_norm_tracking:.6f}, now {cov_norm_degraded:.6f})" ) # ---------------------------------------------------------------------- # AC-7: Sustained pose-recovery failure raises VioFatalError. def test_ac7_sustained_pose_recovery_failure_raises_vio_fatal_error( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange config = Config.with_blocks( c1_vio=C1VioConfig( strategy="klt_ransac", klt_ransac=KltRansacConfig(min_features_for_pose=10, ransac_inlier_ratio=0.5), warm_start_max_frames=1, lost_frame_threshold=3, ), runtime=RuntimeConfig(), ) strategy, _captured = _new_strategy(config=config) calibration = _make_calibration() # Seed two TRACKING frames first. strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=51)), _imu_for_frame(1), calibration, ) strategy.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=51, shift_x=3)), _imu_for_frame(2), calibration, ) # Force RansacFilter to always return zero inliers — pose recovery # path takes the failed branch every time. from gps_denied_onboard.helpers.ransac_filter import RansacResult def _no_inliers(_corr: np.ndarray, _thresh: float, _min: int) -> RansacResult: return RansacResult( inlier_correspondences=np.empty((0, 4), dtype=np.float64), inlier_count=0, outlier_count=int(_corr.shape[0]), median_residual_px=float("nan"), ) monkeypatch.setattr( klt_ransac_module.RansacFilter, "filter_correspondences", staticmethod(_no_inliers) ) # Act — feed 3 consecutive failed-pose frames (== lost_frame_threshold) raised: list[type] = [] for idx in range(3, 6): try: strategy.process_frame( _frame(idx=idx, image=_synthetic_frame_image(seed=51, shift_x=idx * 2)), _imu_for_frame(idx), calibration, ) except (VioInitializingError, VioFatalError) as exc: raised.append(type(exc)) # Assert — last raise is VioFatalError + state == LOST assert raised, "AC-7: at least one VioError must have been raised" assert raised[-1] is VioFatalError assert strategy.health_snapshot().state == VioState.LOST # ---------------------------------------------------------------------- # AC-8: BUILD_KLT_RANSAC=OFF does not import the strategy module. def test_ac8_strategy_module_not_imported_at_package_load() -> None: """The package ``__init__.py`` MUST NOT eagerly import the ``klt_ransac`` concrete module; AZ-331's factory does the lazy import. ``BUILD_KLT_RANSAC=OFF`` gating + the :func:`build_vio_strategy` factory test already cover the end-to-end behaviour (`test_protocol_conformance.py`); here we just assert the import side-effect property. """ # Read the package __init__ source verbatim and assert that the # only string referencing klt_ransac is the config + factory # boundary (NOT a concrete-strategy class import). init_source = ( _KLT_RANSAC_SOURCE_PATH.parent / "__init__.py" ).read_text(encoding="utf-8") # The init MUST NOT import the strategy class directly. We grep # only the executable portion of the file (lines beginning with # ``from `` or ``import ``, plus the ``__all__`` literal) so the # docstring's free-text mention of ``KltRansacStrategy`` does not # trip the assertion. executable_lines = [ line for line in init_source.splitlines() if line.lstrip().startswith(("from ", "import ")) ] forbidden = "gps_denied_onboard.components.c1_vio.klt_ransac" assert all(forbidden not in line for line in executable_lines), ( "AC-8: c1_vio/__init__.py must NOT eagerly import KltRansacStrategy " "(violates the lazy-import boundary the AZ-331 factory relies on)" ) # __all__ also must not re-export it. import ast module_ast = ast.parse(init_source) all_names: list[str] = [] for node in ast.walk(module_ast): if ( isinstance(node, ast.Assign) and len(node.targets) == 1 and isinstance(node.targets[0], ast.Name) and node.targets[0].id == "__all__" and isinstance(node.value, (ast.List, ast.Tuple)) ): all_names = [elt.value for elt in node.value.elts if isinstance(elt, ast.Constant)] break assert "KltRansacStrategy" not in all_names, ( "AC-8: c1_vio/__init__.py.__all__ must NOT contain KltRansacStrategy" ) # ---------------------------------------------------------------------- # AC-9: Honest covariance — no shrinkage during DEGRADED (tier2). @pytest.mark.tier2 def test_ac9_honest_covariance_monotonic_during_degraded( monkeypatch: pytest.MonkeyPatch, ) -> None: """60-frame controlled-degradation: covariance Frobenius is monotonically non-decreasing from the first DEGRADED transition until either TRACKING is restored or LOST is reached. Tier-2 because it walks 60 synthetic frames + injects a step inlier-count drop — not a per-batch full-suite blocker. Real Derkachi fixtures bind this on Jetson via C1-IT-12. """ config = Config.with_blocks( c1_vio=C1VioConfig( strategy="klt_ransac", klt_ransac=KltRansacConfig(min_features_for_pose=50, ransac_inlier_ratio=0.5), warm_start_max_frames=1, lost_frame_threshold=100, ), runtime=RuntimeConfig(), ) strategy, _captured = _new_strategy(config=config) calibration = _make_calibration() strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=70)), _imu_for_frame(1), calibration, ) from gps_denied_onboard.helpers.ransac_filter import RansacResult # Step-function: inlier count drops from 80 → 25 at frame 30, # then climbs back to 80 at frame 50 (recovery NOT covered by AC-9; # the AC asserts non-decreasing during DEGRADED). inlier_sequence = [80] * 28 + [40, 35, 30, 28, 26, 24, 22, 20, 18, 16, 15, 14, 13, 12] pad = [10] * max(0, 60 - len(inlier_sequence) - 1) inlier_sequence.extend(pad) counter = {"i": 0} rng = np.random.default_rng(seed=12321) def _scripted_filter(_corr: np.ndarray, _thresh: float, _min: int) -> RansacResult: n = inlier_sequence[counter["i"] % len(inlier_sequence)] counter["i"] += 1 inliers = np.column_stack([ rng.uniform(50.0, 250.0, size=n), rng.uniform(50.0, 250.0, size=n), rng.uniform(50.0, 250.0, size=n), rng.uniform(50.0, 250.0, size=n), ]) return RansacResult( inlier_correspondences=inliers, inlier_count=n, outlier_count=0, median_residual_px=0.4, ) def _fake_find_essential(*_a: Any, **_k: Any) -> tuple[np.ndarray, None]: # Returning None for em_mask routes the strategy to use # ransac_result.inlier_count for the final classification — # exactly what we want so the inlier_sequence drives state. return np.eye(3, dtype=np.float64), None def _fake_recover_pose(*_a: Any, **_k: Any) -> tuple[int, np.ndarray, np.ndarray, np.ndarray]: R = np.eye(3, dtype=np.float64) t = np.array([[0.01], [0.0], [0.0]], dtype=np.float64) return 1, R, t, np.empty((0, 1), dtype=np.uint8) monkeypatch.setattr( klt_ransac_module.RansacFilter, "filter_correspondences", staticmethod(_scripted_filter), ) monkeypatch.setattr(klt_ransac_module.cv2, "findEssentialMat", _fake_find_essential) monkeypatch.setattr(klt_ransac_module.cv2, "recoverPose", _fake_recover_pose) # Walk the frames and record (state, cov_frobenius) per frame. cov_history: list[tuple[VioState, float]] = [] for idx in range(2, 50): try: out = strategy.process_frame( _frame(idx=idx, image=_synthetic_frame_image(seed=70, shift_x=(idx % 7))), _imu_for_frame(idx), calibration, ) except VioError: break cov_history.append( (strategy.health_snapshot().state, float(np.linalg.norm(out.pose_covariance_6x6, ord="fro"))) ) # Find the first DEGRADED entry; monotonicity holds from there # until either TRACKING is restored OR the run ends. first_degraded = next( (idx for idx, (s, _f) in enumerate(cov_history) if s == VioState.DEGRADED), None ) assert first_degraded is not None, "AC-9: never reached DEGRADED state" prior_norm = cov_history[first_degraded][1] for state, norm in cov_history[first_degraded + 1 :]: if state != VioState.DEGRADED: break assert norm + 1e-12 >= prior_norm, ( f"AC-9 honest-covariance violation: cov Frobenius shrank during " f"DEGRADED (was {prior_norm}, now {norm})" ) prior_norm = norm # ---------------------------------------------------------------------- # AC-10: One FDR vio.health record per state transition. def test_ac10_fdr_vio_health_emitted_per_transition( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange strategy, captured = _new_strategy( config=_config(min_features_for_pose=5, max_corners=200, ransac_inlier_ratio=0.5) ) calibration = _make_calibration() _patch_pose_recovery(monkeypatch, inlier_count=40) # Act — first frame triggers INIT transition strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=81)), _imu_for_frame(1), calibration, ) _drain(strategy._fdr, captured) init_records = [r for r in captured if r.kind == "vio.health"] # Second frame — possibly transitions to TRACKING (or stays INIT # depending on warm_start_max_frames). strategy.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=81, shift_x=3)), _imu_for_frame(2), calibration, ) _drain(strategy._fdr, captured) # Assert all_records = [r for r in captured if r.kind == "vio.health"] # Exactly one INIT record (no spam) init_states = [r.payload["state"] for r in init_records] assert init_states.count("init") == 1 # Every record carries the strategy label. for rec in all_records: assert rec.payload["strategy_label"] == "klt_ransac" assert rec.producer_id == "c1_vio.klt_ransac" # ---------------------------------------------------------------------- # AC-11: Camera-agnostic source + run with two calibrations. _ADTI_LITERAL_RE = re.compile(r"\badti(?:20|26)\b", re.IGNORECASE) def _strip_docstrings(source: str) -> str: """Return ``source`` with module/class/function docstrings replaced by empty lines. The AC-11 grep gate should match executable code only — mentioning the deployed-camera ID in a docstring (e.g. ``"AC-11: no adti20 literals"``) is documentation, not a hard-coded branch. """ import ast tree = ast.parse(source) docstring_lines: set[int] = set() for node in ast.walk(tree): if ( isinstance(node, (ast.Module, ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)) and node.body ): first = node.body[0] if ( isinstance(first, ast.Expr) and isinstance(first.value, ast.Constant) and isinstance(first.value.value, str) ): end_lineno = first.end_lineno or first.lineno for ln in range(first.lineno, end_lineno + 1): docstring_lines.add(ln) return "\n".join( "" if (idx + 1) in docstring_lines else line for idx, line in enumerate(source.splitlines()) ) def test_ac11_source_has_no_camera_id_literals() -> None: """AC-11 CI-grep gate: ``klt_ransac.py`` must not embed any deployed-camera ID literal in executable code (docstring mentions are documentation and excluded from the grep). The calibration arrives via the per-call :class:`CameraCalibration` argument. """ source = _KLT_RANSAC_SOURCE_PATH.read_text(encoding="utf-8") code_only = _strip_docstrings(source) matches = _ADTI_LITERAL_RE.findall(code_only) assert not matches, ( f"AC-11: klt_ransac.py must be camera-agnostic; found literals " f"in executable code: {matches}" ) def test_ac11_strategy_handles_two_distinct_calibrations( monkeypatch: pytest.MonkeyPatch, ) -> None: """The same code path produces a sensible :class:`VioOutput` for two distinct ``CameraCalibration`` instances; no calibration- specific branch exists in the source. """ # Arrange — strategy 1 uses the default test camera; strategy 2 # uses an alternate (different f, cx, cy). s1, _ = _new_strategy( config=_config(min_features_for_pose=5, max_corners=200, ransac_inlier_ratio=0.5) ) s2, _ = _new_strategy( config=_config(min_features_for_pose=5, max_corners=200, ransac_inlier_ratio=0.5) ) cal_a = _make_calibration() cal_b = _alternate_calibration() _patch_pose_recovery(monkeypatch, inlier_count=40) # Act — drive the first two frames through each strategy with its # respective calibration. out_a1 = s1.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=91)), _imu_for_frame(1), cal_a, ) out_a2 = s1.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=91, shift_x=4)), _imu_for_frame(2), cal_a, ) out_b1 = s2.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=92)), _imu_for_frame(1), cal_b, ) out_b2 = s2.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=92, shift_x=4)), _imu_for_frame(2), cal_b, ) # Assert — both calibrations produce VioOutput with SPD cov. for out in (out_a1, out_a2, out_b1, out_b2): assert isinstance(out, VioOutput) eigenvalues = np.linalg.eigvalsh(out.pose_covariance_6x6) assert np.all(eigenvalues > 0.0) # ---------------------------------------------------------------------- # NFR-perf — process_frame p95 budget (tier2). @pytest.mark.tier2 def test_nfr_perf_process_frame_records_p95(monkeypatch: pytest.MonkeyPatch) -> None: """Tier-2: process_frame p95 must complete the per-frame loop within the AZ-334 budget (≤ 80 ms shared with OKVIS2 per description.md). We record p95 and assert against a loose macOS- dev sanity ceiling — the real C1-PT-01 Tier-2 fixture binds the strict bound on the Jetson AGX Orin runner. """ import time as _time strategy, _captured = _new_strategy( config=_config(min_features_for_pose=5, max_corners=200, ransac_inlier_ratio=0.5) ) calibration = _make_calibration() _patch_pose_recovery(monkeypatch, inlier_count=40) strategy.process_frame( _frame(idx=1, image=_synthetic_frame_image(seed=101)), _imu_for_frame(1), calibration, ) durations_ms: list[float] = [] for idx in range(2, 22): t0 = _time.perf_counter() strategy.process_frame( _frame(idx=idx, image=_synthetic_frame_image(seed=101, shift_x=idx % 6)), _imu_for_frame(idx), calibration, ) durations_ms.append((_time.perf_counter() - t0) * 1000.0) durations_ms.sort() p95 = durations_ms[int(0.95 * len(durations_ms))] # Loose 5-second sanity ceiling (the real budget lives on Jetson). assert p95 < 5000.0, f"process_frame p95={p95:.2f} ms exceeds sanity ceiling" # ---------------------------------------------------------------------- # Extra surface coverage — KltRansacConfig validation. @pytest.mark.parametrize( "kwargs, fragment", [ ({"max_corners": 3}, "max_corners"), ({"klt_window_size_px": 4}, "klt_window_size_px"), ({"klt_pyramid_levels": 0}, "klt_pyramid_levels"), ({"min_features_for_pose": 3}, "min_features_for_pose"), ({"ransac_inlier_ratio": 0.0}, "ransac_inlier_ratio"), ({"ransac_inlier_ratio": 1.5}, "ransac_inlier_ratio"), ({"essential_matrix_ransac_threshold_px": 0.0}, "essential_matrix_ransac_threshold_px"), ], ) def test_klt_ransac_config_validation(kwargs: dict[str, Any], fragment: str) -> None: from gps_denied_onboard.config.schema import ConfigError with pytest.raises(ConfigError) as exc_info: KltRansacConfig(**kwargs) assert fragment in str(exc_info.value) def test_klt_ransac_config_defaults_round_trip() -> None: cfg = KltRansacConfig() assert cfg.max_corners >= 5 assert cfg.klt_window_size_px % 2 == 1 assert cfg.klt_pyramid_levels >= 1 assert cfg.min_features_for_pose >= 5 assert 0.0 < cfg.ransac_inlier_ratio <= 1.0 assert cfg.essential_matrix_ransac_threshold_px > 0.0 assert cfg.per_frame_debug_log is False