"""AZ-381 — StateEstimator Protocol + DTOs + factory + concrete handle. Covers all 10 ACs of AZ-381 (see ``_docs/02_tasks/done/AZ-381...``). """ from __future__ import annotations import dataclasses import logging import threading from typing import Any from unittest import mock from uuid import uuid4 import numpy as np import pytest from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.pose import PoseEstimate from gps_denied_onboard._types.state import ( EstimatorHealth, EstimatorOutput, IsamState, PoseSourceLabel, Quat, ) from gps_denied_onboard._types.nav import VioOutput from gps_denied_onboard.components.c5_state import ( C5StateConfig, EstimatorDegradedError, EstimatorFatalError, StateEstimator, StateEstimatorConfigError, StateEstimatorError, ) from gps_denied_onboard.components.c5_state._isam2_handle import ( ISam2GraphHandle, ISam2GraphHandleImpl, ) from gps_denied_onboard.config import load_config from gps_denied_onboard.config.schema import Config, ConfigError from gps_denied_onboard.runtime_root.state_factory import ( StateIngestThreadAlreadyBoundError, bind_state_ingest_thread, build_state_estimator, clear_state_ingest_binding, clear_state_registry, register_state_estimator, ) @pytest.fixture(autouse=True) def _isolation(): clear_state_registry() clear_state_ingest_binding() yield clear_state_registry() clear_state_ingest_binding() def _make_estimator_output() -> EstimatorOutput: return EstimatorOutput( frame_id=uuid4(), position_wgs84=LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0), orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0), velocity_world_mps=(0.0, 0.0, 0.0), covariance_6x6=np.eye(6), source_label=PoseSourceLabel.VISUAL_PROPAGATED, last_satellite_anchor_age_ms=0, smoothed=False, emitted_at=0, ) def _make_estimator_health() -> EstimatorHealth: return EstimatorHealth( isam2_state=IsamState.TRACKING, keyframe_count=15, cov_norm_growing_for_s=0.0, spoof_promotion_blocked=False, ) def _build_config(**state_overrides: Any) -> Config: cfg = load_config(env={}, paths=(), require_env=False) # Replace c5_state block with overrides. new_state = dataclasses.replace(C5StateConfig(), **state_overrides) components = dict(cfg.components or {}) components["c5_state"] = new_state return dataclasses.replace(cfg, components=components) class _FakeEstimator: """Test fake satisfying every StateEstimator method (AC-1).""" def set_takeoff_origin( self, origin: LatLonAlt, sigma_horiz_m: float, sigma_vert_m: float, ) -> None: pass def add_vio(self, vio: VioOutput) -> None: pass def add_pose_anchor(self, pose: PoseEstimate) -> None: pass def add_fc_imu(self, imu_window: Any) -> None: pass def current_estimate(self) -> EstimatorOutput: return _make_estimator_output() def smoothed_history(self, n_keyframes: int) -> list[EstimatorOutput]: return [_make_estimator_output() for _ in range(min(n_keyframes, 5))] def health_snapshot(self) -> EstimatorHealth: return _make_estimator_health() def _fake_handle(estimator: Any) -> ISam2GraphHandle: return ISam2GraphHandleImpl(estimator) # ---------------------------------------------------------------------- # AC-1: Protocol conformance via @runtime_checkable def test_ac1_protocol_runtime_checkable() -> None: # Arrange fake = _FakeEstimator() # Assert assert isinstance(fake, StateEstimator) def test_ac1_missing_method_fails_isinstance() -> None: class _Incomplete: def add_vio(self, vio: VioOutput) -> None: pass # Assert — missing 6 methods → not a StateEstimator assert not isinstance(_Incomplete(), StateEstimator) # ---------------------------------------------------------------------- # AC-2: DTOs frozen + slots def test_ac2_estimator_output_frozen_and_slotted() -> None: out = _make_estimator_output() # Assert — __slots__ exists and is non-empty assert hasattr(EstimatorOutput, "__slots__") assert len(EstimatorOutput.__slots__) > 0 # Assert — mutation raises with pytest.raises(dataclasses.FrozenInstanceError): out.smoothed = True # type: ignore[misc] def test_ac2_estimator_health_frozen_and_slotted() -> None: h = _make_estimator_health() assert hasattr(EstimatorHealth, "__slots__") assert len(EstimatorHealth.__slots__) > 0 with pytest.raises(dataclasses.FrozenInstanceError): h.keyframe_count = 99 # type: ignore[misc] # ---------------------------------------------------------------------- # AC-3: IsamState enum has 4 values def test_ac3_isam_state_has_four_values() -> None: members = {m.name for m in IsamState} assert members == {"INIT", "TRACKING", "DEGRADED", "LOST"} # ---------------------------------------------------------------------- # AC-4: Factory rejects build-flag OFF def test_ac4_build_flag_off_rejected(monkeypatch: pytest.MonkeyPatch) -> None: register_state_estimator("gtsam_isam2", lambda **_: (_FakeEstimator(), _fake_handle(None))) monkeypatch.setenv("BUILD_STATE_GTSAM_ISAM2", "OFF") cfg = _build_config(strategy="gtsam_isam2") with pytest.raises(StateEstimatorConfigError, match="BUILD_STATE_GTSAM_ISAM2 is OFF"): build_state_estimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) # ---------------------------------------------------------------------- # AC-5: Factory rejects unknown strategy at config-load def test_ac5_unknown_strategy_rejected_at_config() -> None: # Direct construction surfaces ConfigError (validation in __post_init__). with pytest.raises(ConfigError, match="garbage"): C5StateConfig(strategy="garbage") def test_ac5_unknown_strategy_via_factory() -> None: # If the block escapes (e.g. test sets strategy = "nonexistent" via a # post-validation bypass), the factory still rejects with a clear # StateEstimatorConfigError naming the disabled flag. fake_block = mock.MagicMock(spec=C5StateConfig) fake_block.strategy = "nonexistent" fake_block.keyframe_window_size = 15 cfg = mock.MagicMock(spec=Config) cfg.components = {"c5_state": fake_block} with pytest.raises(StateEstimatorConfigError, match="not in"): build_state_estimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) # ---------------------------------------------------------------------- # AC-6: Factory returns the tuple + INFO log def test_ac6_factory_returns_tuple_and_logs( caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch ) -> None: fake_estimator = _FakeEstimator() fake_handle = _fake_handle(fake_estimator) register_state_estimator( "gtsam_isam2", lambda **_: (fake_estimator, fake_handle), ) monkeypatch.delenv("BUILD_STATE_GTSAM_ISAM2", raising=False) cfg = _build_config(strategy="gtsam_isam2", keyframe_window_size=15) with caplog.at_level(logging.INFO, logger="runtime_root.state_factory"): result = build_state_estimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) estimator, handle = result assert estimator is fake_estimator assert handle is fake_handle loaded_records = [ r for r in caplog.records if getattr(r, "kind", None) == "c5.state.strategy_loaded" ] assert len(loaded_records) == 1 assert loaded_records[0].kv["strategy"] == "gtsam_isam2" assert loaded_records[0].kv["keyframe_window_size"] == 15 # ---------------------------------------------------------------------- # AC-7: Thread binding def test_ac7_second_thread_binding_rejected() -> None: main_ident = bind_state_ingest_thread() err: list[BaseException] = [] def run() -> None: try: bind_state_ingest_thread() except StateIngestThreadAlreadyBoundError as e: err.append(e) t = threading.Thread(target=run) t.start() t.join(timeout=2.0) assert len(err) == 1 assert main_ident != threading.get_ident() + 1 # placeholder no-op assertion def test_ac7_same_thread_rebinding_idempotent() -> None: first = bind_state_ingest_thread() second = bind_state_ingest_thread() assert first == second # ---------------------------------------------------------------------- # AC-8: ISam2GraphHandleImpl skeleton def test_ac8_handle_is_isam2_graph_handle() -> None: handle = ISam2GraphHandleImpl(estimator=mock.MagicMock()) assert isinstance(handle, ISam2GraphHandle) def test_handle_satisfies_c4_isam2_graph_handle_protocol() -> None: """Cross-component conformance: ``ISam2GraphHandleImpl`` MUST satisfy the C4-side Protocol stub (``c4_pose._isam2_handle.ISam2GraphHandle``) so the same instance can be passed to ``pose_factory.build_pose_estimator`` without an adapter. The c4 stub requires only ``get_pose_key(frame_id) -> int``; the c5 impl delegates to ``estimator.key_for_frame`` for that lookup. """ # Arrange from gps_denied_onboard.components.c4_pose._isam2_handle import ( ISam2GraphHandle as C4ISam2GraphHandle, ) estimator_mock = mock.MagicMock() estimator_mock.key_for_frame.return_value = 0x7800000000000007 handle = ISam2GraphHandleImpl(estimator=estimator_mock) # Act key = handle.get_pose_key(42) # Assert assert isinstance(handle, C4ISam2GraphHandle) estimator_mock.key_for_frame.assert_called_once_with(42) assert key == 0x7800000000000007 # Note: the AZ-381 skeleton's ``NotImplementedError`` bodies were # replaced with real GTSAM calls by AZ-382. The "methods raise" test # that lived here has moved to # ``tests/unit/c5_state/test_az382_isam2_smoother_wiring.py``, which # now asserts the real behaviour (``add_factor`` grows the graph, # ``update`` advances iSAM2 + the smoother, etc.). # ---------------------------------------------------------------------- # AC-9: Public API re-exports def test_ac9_public_api_resolves() -> None: # Re-importing here makes the test self-documenting: every symbol below # is part of the C5 public surface AC-9 requires. from gps_denied_onboard.components.c5_state import ( # noqa: F401 C5StateConfig, EstimatorHealth, EstimatorOutput, IsamState, PoseSourceLabel, StateEstimator, ) def test_ac9_internals_not_in_all() -> None: from gps_denied_onboard.components import c5_state # _isam2_handle is an internal module; it must not be re-exported. assert "ISam2GraphHandle" not in c5_state.__all__ assert "ISam2GraphHandleImpl" not in c5_state.__all__ # ---------------------------------------------------------------------- # AC-10: Error hierarchy catchability def test_ac10_every_error_is_state_estimator_error() -> None: for exc_cls in ( EstimatorDegradedError, EstimatorFatalError, StateEstimatorConfigError, ): with pytest.raises(StateEstimatorError): raise exc_cls("test") # ---------------------------------------------------------------------- # Config block validation def test_config_keyframe_window_must_be_in_range() -> None: with pytest.raises(ConfigError, match=r"\[10, 20\]"): C5StateConfig(keyframe_window_size=5) with pytest.raises(ConfigError, match=r"\[10, 20\]"): C5StateConfig(keyframe_window_size=25) # Boundaries OK. C5StateConfig(keyframe_window_size=10) C5StateConfig(keyframe_window_size=20) def test_config_spoof_min_stable_must_be_positive() -> None: with pytest.raises(ConfigError, match="spoof_promotion_min_stable_s"): C5StateConfig(spoof_promotion_min_stable_s=0.0) def test_config_no_estimate_fallback_must_be_positive() -> None: with pytest.raises(ConfigError, match="no_estimate_fallback_s"): C5StateConfig(no_estimate_fallback_s=-1.0) # ---------------------------------------------------------------------- # Performance budget def test_nfr_perf_build_under_50ms(monkeypatch: pytest.MonkeyPatch) -> None: import time as _t register_state_estimator( "gtsam_isam2", lambda **_: (_FakeEstimator(), _fake_handle(None)), ) monkeypatch.delenv("BUILD_STATE_GTSAM_ISAM2", raising=False) cfg = _build_config(strategy="gtsam_isam2") iters = 100 start = _t.perf_counter() for _ in range(iters): build_state_estimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) avg_ms = (_t.perf_counter() - start) / iters * 1000.0 assert avg_ms < 50.0, f"avg build {avg_ms:.2f}ms exceeds 50ms p99 budget"