"""AZ-386 — `EskfStateEstimator` mandatory simple-baseline. Covers AC-1..AC-10 from ``_docs/02_tasks/todo/AZ-386_c5_eskf_baseline.md``. The tests construct an estimator directly (no GTSAM required) and exercise: - AC-1: Protocol conformance (``isinstance`` against ``StateEstimator``). - AC-2: Prediction step accuracy on a stationary synthetic IMU sequence (drift < 1 m over 5 s of zero specific force). - AC-3: ``add_vio`` updates state + shrinks covariance over consistent measurements. - AC-4: ``add_pose_anchor`` integrates BOTH ``marginals`` and ``jacobian`` modes (no skip — ESKF has no graph). - AC-5: Every emitted ``covariance_6x6`` is SPD; injected non-SPD raises ``EstimatorFatalError``. - AC-6: ``smoothed_history`` returns ``smoothed=False`` entries. - AC-7: ``BUILD_STATE_ESKF=OFF`` rejected by the factory. - AC-8: ``SourceLabelStateMachine`` is auto-wired (label updates on satellite anchor). - AC-9: Filter divergence handling raises ``EstimatorFatalError``. - AC-10: ``config.strategy="eskf"`` + ``BUILD_STATE_ESKF=ON`` resolves to ``EskfStateEstimator`` via the runtime root factory. """ from __future__ import annotations import dataclasses from datetime import datetime, timezone from typing import Any from unittest import mock from uuid import uuid4 import numpy as np import pytest from gps_denied_onboard._types.fc import GpsHealth, GpsStatus from gps_denied_onboard._types.nav import ImuSample, ImuWindow from gps_denied_onboard._types.pose import PoseEstimate from gps_denied_onboard._types.state import IsamState, PoseSourceLabel from gps_denied_onboard._types.vio import VioOutput from gps_denied_onboard.components.c5_state import ( C5StateConfig, EstimatorFatalError, StateEstimator, StateEstimatorConfigError, ) from gps_denied_onboard.components.c5_state.eskf_baseline import ( EskfStateEstimator, create, register, ) from gps_denied_onboard.config import load_config from gps_denied_onboard.config.schema import Config from gps_denied_onboard.runtime_root.state_factory import ( build_state_estimator, clear_state_ingest_binding, clear_state_registry, ) # ---------------------------------------------------------------------- # Fixtures + helpers. @pytest.fixture(autouse=True) def _isolation(): clear_state_registry() clear_state_ingest_binding() yield clear_state_registry() clear_state_ingest_binding() def _build_config(**state_overrides: Any) -> Config: cfg = load_config(env={}, paths=(), require_env=False) new_state = dataclasses.replace(C5StateConfig(strategy="eskf"), **state_overrides) components = dict(cfg.components or {}) components["c5_state"] = new_state return dataclasses.replace(cfg, components=components) def _make_estimator(**kwargs: Any) -> EskfStateEstimator: cfg = _build_config(**kwargs) return EskfStateEstimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) def _identity_pose() -> np.ndarray: return np.eye(4, dtype=np.float64) def _pose_translated(x: float, y: float, z: float) -> np.ndarray: p = np.eye(4, dtype=np.float64) p[0, 3] = x p[1, 3] = y p[2, 3] = z return p def _vio( frame_id: int, ts_ns: int, pose: np.ndarray, cov: np.ndarray | None = None, ) -> VioOutput: # Default sigma ≈ 0.1 m / 0.1 rad — realistic VIO uncertainty. return VioOutput( frame_id=frame_id, timestamp=datetime.fromtimestamp(ts_ns / 1_000_000_000, tz=timezone.utc), pose_se3=pose, covariance_6x6=cov if cov is not None else np.eye(6) * 0.01, ) def _pose_anchor( frame_id: int, ts_ns: int, pose: np.ndarray, cov: np.ndarray | None = None, mode: str = "marginals", ) -> PoseEstimate: return PoseEstimate( frame_id=frame_id, timestamp=datetime.fromtimestamp(ts_ns / 1_000_000_000, tz=timezone.utc), pose_se3=pose, covariance_6x6=cov if cov is not None else np.eye(6) * 0.01, covariance_mode=mode, mre_px=0.5, ) def _imu_window( *, ts_start_ns: int, dt_s: float, n_samples: int, accel: tuple, gyro: tuple ) -> ImuWindow: samples = tuple( ImuSample( ts_ns=ts_start_ns + int(i * dt_s * 1_000_000_000), accel_xyz=accel, gyro_xyz=gyro, ) for i in range(n_samples) ) return ImuWindow( samples=samples, ts_start_ns=samples[0].ts_ns, ts_end_ns=samples[-1].ts_ns, ) # ---------------------------------------------------------------------- # AC-1: Protocol conformance. def test_ac1_eskf_satisfies_state_estimator_protocol() -> None: estimator = _make_estimator() assert isinstance(estimator, StateEstimator) # ---------------------------------------------------------------------- # AC-2: Prediction step accuracy. def test_ac2_prediction_drift_under_one_metre_over_five_seconds() -> None: # Arrange — stationary FC: specific force = -gravity in body frame # (so world-frame accel is 0). Zero gyro. 100 Hz over 5 s. estimator = _make_estimator() accel_body_stationary = (0.0, 0.0, 9.80665) window = _imu_window( ts_start_ns=1_000_000_000, dt_s=0.01, n_samples=501, accel=accel_body_stationary, gyro=(0.0, 0.0, 0.0), ) # Act estimator.add_fc_imu(window) # Assert — under perfect zero specific force the nominal # position must remain near the origin. Forward Euler accrues a # small residual from quaternion-renormalisation noise. drift = float(np.linalg.norm(estimator._nominal_pos)) assert drift < 1.0 # ---------------------------------------------------------------------- # AC-3: Relative-pose update via add_vio. def test_ac3_add_vio_updates_state_and_covariance_shrinks() -> None: # Arrange — two consistent VIO frames; first frame seeds, second # frame measures a 1 m forward delta. estimator = _make_estimator() cov_before_first = estimator._pose_covariance_6x6().copy() estimator.add_vio(_vio(1, 1_000_000_000, _identity_pose())) estimator.add_vio(_vio(2, 1_100_000_000, _pose_translated(1.0, 0.0, 0.0))) # Assert cov_after = estimator._pose_covariance_6x6() assert np.linalg.norm(cov_after, ord="fro") <= np.linalg.norm(cov_before_first, ord="fro") # ---------------------------------------------------------------------- # AC-4: Absolute-pose update; mode doesn't skip ESKF. def test_ac4_add_pose_anchor_runs_for_marginals_and_jacobian() -> None: # Arrange — two anchors, one in each mode; both must shift the # nominal position toward the measurement. estimator_m = _make_estimator() estimator_j = _make_estimator() pose_m = _pose_anchor(1, 1_000_000_000, _pose_translated(5.0, 0.0, 0.0), mode="marginals") pose_j = _pose_anchor(2, 1_000_000_000, _pose_translated(5.0, 0.0, 0.0), mode="jacobian") # Act estimator_m.add_pose_anchor(pose_m) estimator_j.add_pose_anchor(pose_j) # Assert — both modes move the nominal toward the measurement. assert estimator_m._nominal_pos[0] > 0.1 assert estimator_j._nominal_pos[0] > 0.1 assert estimator_m._last_anchor_ns > 0 assert estimator_j._last_anchor_ns > 0 def test_ac4_pose_anchor_sets_last_anchor_ns() -> None: estimator = _make_estimator() pose = _pose_anchor(1, 1_000_000_000, _pose_translated(1.0, 0.0, 0.0)) estimator.add_pose_anchor(pose) assert estimator._last_anchor_ns > 0 # ---------------------------------------------------------------------- # AC-5: SPD covariance enforcement. def test_ac5_current_estimate_emits_spd_covariance() -> None: estimator = _make_estimator() estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose())) out = estimator.current_estimate() assert out.covariance_6x6.shape == (6, 6) np.linalg.cholesky(out.covariance_6x6) def test_ac5_non_spd_covariance_raises_fatal() -> None: # Arrange — force the internal covariance to be indefinite. estimator = _make_estimator() P = np.eye(16, dtype=np.float64) P[0, 0] = -1.0 # negative variance on x-position estimator._P = P # Act / Assert with pytest.raises(EstimatorFatalError, match="not SPD"): estimator.current_estimate() assert estimator._isam2_state == IsamState.LOST # ---------------------------------------------------------------------- # AC-6: smoothed_history honesty. def test_ac6_smoothed_history_returns_smoothed_false_entries() -> None: estimator = _make_estimator() estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose())) estimator.current_estimate() estimator.add_pose_anchor(_pose_anchor(2, 1_100_000_000, _pose_translated(0.5, 0.0, 0.0))) estimator.current_estimate() history = estimator.smoothed_history(n_keyframes=10) assert len(history) == 2 assert all(entry.smoothed is False for entry in history) def test_ac6_smoothed_history_empty_buffer_returns_empty() -> None: estimator = _make_estimator() assert estimator.smoothed_history(10) == [] def test_ac6_smoothed_history_n_zero_returns_empty() -> None: estimator = _make_estimator() estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose())) estimator.current_estimate() assert estimator.smoothed_history(0) == [] # ---------------------------------------------------------------------- # AC-7: BUILD_STATE_ESKF=OFF rejection. def test_ac7_build_state_eskf_off_rejected(monkeypatch: pytest.MonkeyPatch) -> None: register() monkeypatch.setenv("BUILD_STATE_ESKF", "OFF") cfg = _build_config() with pytest.raises(StateEstimatorConfigError, match="BUILD_STATE_ESKF is OFF"): build_state_estimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) # ---------------------------------------------------------------------- # AC-8: SourceLabelStateMachine integration. def test_ac8_source_label_machine_is_auto_wired() -> None: # Arrange — gate is open by default; the first satellite anchor # transitions the label DEAD_RECKONED → SATELLITE_ANCHORED. estimator = _make_estimator() assert estimator.current_estimate().source_label == PoseSourceLabel.DEAD_RECKONED # Act estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose())) out_after = estimator.current_estimate() # Assert assert out_after.source_label == PoseSourceLabel.SATELLITE_ANCHORED def test_ac8_spoof_status_closes_gate_and_rejects_anchor() -> None: # Arrange — SPOOFED GPS latches the gate closed; next anchor is # a promotion attempt that fails (no consistency check possible # because the estimator passes ``gps_consistency_delta_m=None``). estimator = _make_estimator() received: list[Any] = [] estimator.subscribe_spoof_rejection( lambda reason, severity, text: received.append((reason, severity, text)) ) estimator.notify_gps_health(_gps_status(GpsStatus.SPOOFED), now_ns=0) # Act estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose())) # Assert assert len(received) == 1 assert estimator.current_estimate().source_label == PoseSourceLabel.VISUAL_PROPAGATED # ---------------------------------------------------------------------- # AC-9: Filter divergence handling. def test_ac9_filter_divergence_raises_fatal() -> None: # Arrange — feed a pose anchor whose measured position is wildly # inconsistent with the nominal state, with a tight measurement # covariance. innov_mag = 1000 m; sqrt(||R||_F) ≈ 1e-2 → ratio ≫ 10. estimator = _make_estimator() far_pose = _pose_translated(1000.0, 0.0, 0.0) tight_cov = np.eye(6) * 1e-6 # Act / Assert with pytest.raises(EstimatorFatalError, match="filter divergence"): estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, far_pose, cov=tight_cov)) assert estimator._isam2_state == IsamState.LOST # ---------------------------------------------------------------------- # AC-10: Composition wiring through the runtime root. def test_ac10_factory_resolves_eskf_strategy(monkeypatch: pytest.MonkeyPatch) -> None: register() monkeypatch.setenv("BUILD_STATE_ESKF", "ON") cfg = _build_config(strategy="eskf") estimator, handle = build_state_estimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) assert isinstance(estimator, EskfStateEstimator) assert handle is None def test_ac10_module_level_create_returns_protocol_and_none_handle() -> None: cfg = _build_config() estimator, handle = create( config=cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) assert isinstance(estimator, StateEstimator) assert handle is None # ---------------------------------------------------------------------- # Other robustness checks. def test_health_snapshot_reports_init_then_tracking() -> None: estimator = _make_estimator() assert estimator.health_snapshot().isam2_state == IsamState.INIT estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose())) assert estimator.health_snapshot().isam2_state == IsamState.TRACKING def test_current_estimate_returns_uuid_frame_id_and_increments_keyframes() -> None: estimator = _make_estimator() estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose())) out = estimator.current_estimate() assert isinstance(out.frame_id, type(uuid4())) assert estimator.health_snapshot().keyframe_count == 1 def test_out_of_order_vio_raises_degraded() -> None: from gps_denied_onboard.components.c5_state.errors import EstimatorDegradedError estimator = _make_estimator() estimator.add_vio(_vio(1, 2_000_000_000, _identity_pose())) with pytest.raises(EstimatorDegradedError, match="out-of-order"): estimator.add_vio(_vio(2, 1_000_000_000, _identity_pose())) def test_pose_anchor_sets_velocity_to_zero_initially() -> None: estimator = _make_estimator() estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose())) out = estimator.current_estimate() assert out.velocity_world_mps == (0.0, 0.0, 0.0) # ---------------------------------------------------------------------- # Helpers for AC-8 GPS health construction. def _gps_status(status: GpsStatus) -> GpsHealth: return GpsHealth(status=status, fix_age_ms=0, captured_at=0)