"""AZ-922 — cheirality / rotation-plausibility gate in KLT/RANSAC. Covers AC-1 (axis-angle helper math), AC-2/AC-3 (gate threshold + routing through `_pose_recovery_failed`), and AC-4 (boundary + AC-7 lost-frame escalation). AC-5 (Jetson e2e Derkachi past frame 5) is gated separately via the Tier-2 harness — not in unit scope. """ from __future__ import annotations import math from datetime import datetime, timezone from typing import Any import numpy as np import pytest from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.nav import ( ImuSample, ImuWindow, NavCameraFrame, ) from gps_denied_onboard.components.c1_vio import C1VioConfig, KltRansacConfig from gps_denied_onboard.components.c1_vio import klt_ransac as klt_ransac_module from gps_denied_onboard.components.c1_vio.errors import ( VioFatalError, VioInitializingError, ) from gps_denied_onboard.components.c1_vio.klt_ransac import ( KltRansacStrategy, _rotation_angle_rad, ) from gps_denied_onboard.config.schema import Config, RuntimeConfig from gps_denied_onboard.fdr_client.client import FdrClient from gps_denied_onboard.helpers.ransac_filter import RansacResult # ---------------------------------------------------------------------- # AC-1 — pure helper math # ---------------------------------------------------------------------- def test_rotation_angle_identity_is_zero() -> None: # Assert assert _rotation_angle_rad(np.eye(3, dtype=np.float64)) == pytest.approx(0.0, abs=1e-12) def test_rotation_angle_180_about_x_is_pi() -> None: # Arrange — R rotates 180° about the body-X axis: trace = 1 + (-1) + (-1) = -1. R = np.array( [ [1.0, 0.0, 0.0], [0.0, -1.0, 0.0], [0.0, 0.0, -1.0], ], dtype=np.float64, ) # Assert assert _rotation_angle_rad(R) == pytest.approx(math.pi, abs=1e-9) def test_rotation_angle_general_diagonal_recovers_axis_angle_magnitude() -> None: # Arrange — 30° rotation about body-Z: trace = cos(30)+cos(30)+1. theta = math.radians(30.0) R = np.array( [ [math.cos(theta), -math.sin(theta), 0.0], [math.sin(theta), math.cos(theta), 0.0], [0.0, 0.0, 1.0], ], dtype=np.float64, ) # Assert assert _rotation_angle_rad(R) == pytest.approx(theta, abs=1e-9) def test_rotation_angle_arccos_argument_clipped_against_float_drift() -> None: # Arrange — synthesise an R whose trace is *just* above 3.0 (impossible # in exact arithmetic but achievable via float drift). Without the # arccos clip this would yield NaN; with the clip it must return 0. R = np.eye(3, dtype=np.float64) + np.full((3, 3), 1e-15, dtype=np.float64) # Assert assert _rotation_angle_rad(R) == pytest.approx(0.0, abs=1e-6) assert math.isfinite(_rotation_angle_rad(R)) def test_rotation_angle_uses_jetson_frame_5_diagonal_signature() -> None: # Arrange — exact R diagonal observed at Derkachi frame 5 (the # divergence point). Trace = -0.848 - 0.639 + 0.487 = -1.0. R = np.diag([-0.848, -0.639, 0.487]).astype(np.float64) # Assert — recover the 180° rotation the ESKF traceback implied. assert _rotation_angle_rad(R) == pytest.approx(math.pi, abs=1e-9) # ---------------------------------------------------------------------- # Strategy-level scaffolding (mirrors test_az920_klt_ransac_scale_integration.py) # ---------------------------------------------------------------------- def _calibration() -> CameraCalibration: return CameraCalibration( camera_id="khp20s30-test", intrinsics_3x3=np.array( [ [1680.0, 0.0, 960.0], [0.0, 1680.0, 540.0], [0.0, 0.0, 1.0], ], dtype=np.float64, ), distortion=np.zeros(5, dtype=np.float64), body_to_camera_se3=np.eye(4, dtype=np.float64), acquisition_method="test_az922", ) def _frame(idx: int) -> NavCameraFrame: rng = np.random.default_rng(seed=idx) image = (rng.integers(0, 255, size=(240, 240), dtype=np.int16)).astype(np.uint8) return NavCameraFrame( frame_id=idx, timestamp=datetime.fromtimestamp(idx * 0.1, tz=timezone.utc), image=image, camera_calibration_id="khp20s30-test", ) def _imu_window(frame_idx: int) -> ImuWindow: ts_start_ns = 1_000_000_000 + frame_idx * 100_000_000 samples = tuple( ImuSample( ts_ns=ts_start_ns + i * 5_000_000, accel_xyz=(0.0, 0.0, 9.81), gyro_xyz=(0.0, 0.0, 0.0), ) for i in range(3) ) return ImuWindow( samples=samples, ts_start_ns=samples[0].ts_ns, ts_end_ns=samples[-1].ts_ns, ) def _config(lost_frame_threshold: int = 3) -> Config: return Config.with_blocks( c1_vio=C1VioConfig( strategy="klt_ransac", klt_ransac=KltRansacConfig(), lost_frame_threshold=lost_frame_threshold, ), runtime=RuntimeConfig(), ) def _patch_pose_recovery_with_rotation( monkeypatch: pytest.MonkeyPatch, *, R_3x3: np.ndarray, inlier_count: int = 40, ) -> None: """Force the geometry stack to a controlled path with a chosen R. Mirrors the AZ-920 test helper but takes the rotation matrix as a parameter so AZ-922 can exercise the cheirality gate directly. """ rng = np.random.default_rng(seed=271828) base = rng.uniform(50.0, 1800.0, size=(inlier_count, 2)) inliers = np.column_stack([base, base + np.array([5.0, 0.0])]) mask = np.ones((inlier_count, 1), dtype=np.uint8) def _fake_filter(_corr: np.ndarray, _thresh: float, _min: int) -> RansacResult: return RansacResult( inlier_correspondences=inliers, inlier_count=inlier_count, outlier_count=0, median_residual_px=0.5, ) def _fake_find_essential(*_a: Any, **_k: Any) -> tuple[np.ndarray, np.ndarray]: return np.eye(3, dtype=np.float64), mask def _fake_recover_pose(*_a: Any, **_k: Any) -> tuple[int, np.ndarray, np.ndarray, np.ndarray]: t_col = np.array([[1.0], [0.0], [0.0]], dtype=np.float64) return inlier_count, R_3x3.astype(np.float64), t_col, mask monkeypatch.setattr( klt_ransac_module.RansacFilter, "filter_correspondences", staticmethod(_fake_filter), ) monkeypatch.setattr(klt_ransac_module.cv2, "findEssentialMat", _fake_find_essential) monkeypatch.setattr(klt_ransac_module.cv2, "recoverPose", _fake_recover_pose) def _fdr_client() -> FdrClient: return FdrClient(producer_id="test.az922", capacity=16, _emit_diag_log=False) def _r_about_z(theta_rad: float) -> np.ndarray: return np.array( [ [math.cos(theta_rad), -math.sin(theta_rad), 0.0], [math.sin(theta_rad), math.cos(theta_rad), 0.0], [0.0, 0.0, 1.0], ], dtype=np.float64, ) # ---------------------------------------------------------------------- # AC-2/AC-3 — gate threshold + routing through _pose_recovery_failed # ---------------------------------------------------------------------- def test_identity_rotation_passes_gate(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=np.eye(3)) strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) calibration = _calibration() # Act strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) # Assert assert second is not None assert second.frame_id == "2" def test_five_degree_rotation_passes_gate(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange — 5° rotation, well below the 30° default threshold. _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=_r_about_z(math.radians(5.0))) strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) calibration = _calibration() # Act strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) # Assert assert second is not None def test_sixty_degree_rotation_rejected_by_gate(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange — 60° rotation exceeds the 30° default threshold. _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=_r_about_z(math.radians(60.0))) strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) calibration = _calibration() # Act + Assert strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) with pytest.raises(VioInitializingError, match="implausible_rotation_angle"): strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) def test_180_degree_rotation_rejected_by_gate(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange — synthesise the Jetson-observed frame-5 R signature. R_180 = np.diag([-0.848, -0.639, 0.487]).astype(np.float64) _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=R_180) strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) calibration = _calibration() # Act + Assert strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) with pytest.raises(VioInitializingError, match="implausible_rotation_angle"): strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) def test_threshold_boundary_just_above_rejects(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange — 30.1° vs 30° threshold. _patch_pose_recovery_with_rotation( monkeypatch, R_3x3=_r_about_z(math.radians(30.1)) ) strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) calibration = _calibration() # Act + Assert strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) with pytest.raises(VioInitializingError, match="implausible_rotation_angle"): strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) def test_threshold_boundary_just_below_passes(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange — 29.9° just below 30° threshold. _patch_pose_recovery_with_rotation( monkeypatch, R_3x3=_r_about_z(math.radians(29.9)) ) strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) calibration = _calibration() # Act strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) # Assert assert second is not None # ---------------------------------------------------------------------- # AC-4 (AC-7 path) — consecutive rejections escalate to VioFatalError # ---------------------------------------------------------------------- def test_consecutive_rejections_eventually_raise_vio_fatal( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange — every frame yields a 180° flip; lost_frame_threshold=3 # means the 4th frame in this stream (the 3rd consecutive failure) # crosses the LOST gate. R_180 = np.diag([-1.0, -1.0, 1.0]).astype(np.float64) # 180° about Z _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=R_180) strategy = KltRansacStrategy( _config(lost_frame_threshold=3), fdr_client=_fdr_client() ) calibration = _calibration() # Act — frame 1 seeds INIT; frames 2-4 hit the gate and tick lost. strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) for idx in range(2, 4): with pytest.raises(VioInitializingError, match="implausible_rotation_angle"): strategy.process_frame(_frame(idx=idx), _imu_window(idx), calibration) # The 4th consecutive failed frame trips the LOST/VioFatalError gate. with pytest.raises(VioFatalError, match="exhausted lost-frame budget"): strategy.process_frame(_frame(idx=4), _imu_window(4), calibration) # ---------------------------------------------------------------------- # Config validation — defends the (0, π] range # ---------------------------------------------------------------------- def test_config_rejects_zero_or_negative_threshold() -> None: # Act + Assert with pytest.raises(Exception, match="max_frame_rotation_rad must be in"): KltRansacConfig(max_frame_rotation_rad=0.0) def test_config_rejects_threshold_above_pi() -> None: # Act + Assert with pytest.raises(Exception, match="max_frame_rotation_rad must be in"): KltRansacConfig(max_frame_rotation_rad=math.pi + 0.01) def test_config_accepts_pi_exactly() -> None: # Act — the inclusive upper bound. No rotation can exceed π anyway. cfg = KltRansacConfig(max_frame_rotation_rad=math.pi) # Assert assert cfg.max_frame_rotation_rad == pytest.approx(math.pi)