"""AZ-355 — C4 PoseEstimator Protocol + Factory + DTOs + Composition. Tests cover AC-1..AC-10 from ``_docs/02_tasks/todo/AZ-355_c4_pose_protocol.md``: - AC-1 Protocol conformance — ``runtime_checkable``. - AC-2 DTOs are frozen + slots. - AC-3 Enums have the documented values. - AC-4 ``CovarianceDegradedWarning`` IS-A ``Warning`` NOT ``Exception``. - AC-5 ``PnpFailureError`` IS-A ``Exception``. - AC-6 Factory rejects unknown strategy. - AC-7 Factory accepts ``"opencv_gtsam"`` and emits INFO log. - AC-8 Public API surface — ``__init__.py`` re-exports. - AC-9 Strategy bound to single ingest thread (same thread as C5). - AC-10 ``ISam2GraphHandle`` Protocol stub conforms to ``runtime_checkable``. """ from __future__ import annotations import dataclasses import logging import threading import warnings from dataclasses import FrozenInstanceError from typing import Any from unittest import mock from uuid import UUID, uuid4 import numpy as np import pytest from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.pose import ( CovarianceMode, PoseEstimate, PoseSourceLabel, Quat, ) from gps_denied_onboard.components.c4_pose import ( C4PoseConfig, CovarianceDegradedWarning, PnpFailureError, PoseEstimator, PoseEstimatorConfigError, PoseEstimatorError, ) from gps_denied_onboard.components.c4_pose._isam2_handle import ISam2GraphHandle from gps_denied_onboard.config import load_config from gps_denied_onboard.config.schema import Config from gps_denied_onboard.runtime_root.pose_factory import ( build_pose_estimator, clear_pose_registry, register_pose_estimator, ) from gps_denied_onboard.runtime_root.state_factory import ( StateIngestThreadAlreadyBoundError, bind_state_ingest_thread, clear_state_ingest_binding, ) @pytest.fixture(autouse=True) def _isolation(): clear_pose_registry() clear_state_ingest_binding() yield clear_pose_registry() clear_state_ingest_binding() def _build_config(**overrides: Any) -> Config: cfg = load_config(env={}, paths=(), require_env=False) new_block = dataclasses.replace(C4PoseConfig(), **overrides) components = dict(cfg.components or {}) components["c4_pose"] = new_block return dataclasses.replace(cfg, components=components) class _FakeISam2GraphHandle: """Minimal handle stub for factory / Protocol tests.""" def get_pose_key(self, frame_id: int) -> int: return int(frame_id) class _FakePoseEstimator: """Test double satisfying the full PoseEstimator Protocol.""" def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> PoseEstimate: return PoseEstimate( frame_id=uuid4(), position_wgs84=LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0), orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0), covariance_6x6=np.eye(6, dtype=np.float64), covariance_mode=CovarianceMode.MARGINALS, source_label=PoseSourceLabel.SATELLITE_ANCHORED, last_satellite_anchor_age_ms=0, emitted_at=0, ) def current_covariance_mode(self) -> CovarianceMode: return CovarianceMode.MARGINALS # --------------------------------------------------------------------- # AC-1: Protocol conformance — runtime_checkable def test_ac1_full_fake_satisfies_protocol() -> None: fake = _FakePoseEstimator() assert isinstance(fake, PoseEstimator) def test_ac1_partial_fake_fails_protocol() -> None: class _OnlyEstimate: def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> Any: return None # Missing current_covariance_mode → fails isinstance assert not isinstance(_OnlyEstimate(), PoseEstimator) # --------------------------------------------------------------------- # AC-2: DTOs are frozen + slots def test_ac2_lat_lon_alt_frozen_and_slotted() -> None: p = LatLonAlt(lat_deg=1.0, lon_deg=2.0, alt_m=3.0) assert LatLonAlt.__dataclass_params__.frozen is True assert hasattr(LatLonAlt, "__slots__") assert tuple(LatLonAlt.__slots__) == ("lat_deg", "lon_deg", "alt_m") with pytest.raises(FrozenInstanceError): p.lat_deg = 99.0 # type: ignore[misc] def test_ac2_quat_frozen_and_slotted() -> None: q = Quat(w=1.0, x=0.0, y=0.0, z=0.0) assert Quat.__dataclass_params__.frozen is True assert hasattr(Quat, "__slots__") assert tuple(Quat.__slots__) == ("w", "x", "y", "z") with pytest.raises(FrozenInstanceError): q.w = 0.0 # type: ignore[misc] def test_ac2_pose_estimate_frozen_and_slotted() -> None: pe = PoseEstimate( frame_id=uuid4(), position_wgs84=LatLonAlt(0.0, 0.0, 0.0), orientation_world_T_body=Quat(1.0, 0.0, 0.0, 0.0), covariance_6x6=np.eye(6), covariance_mode=CovarianceMode.MARGINALS, source_label=PoseSourceLabel.SATELLITE_ANCHORED, last_satellite_anchor_age_ms=0, emitted_at=0, ) assert PoseEstimate.__dataclass_params__.frozen is True assert hasattr(PoseEstimate, "__slots__") assert len(PoseEstimate.__slots__) == 8 with pytest.raises(FrozenInstanceError): pe.last_satellite_anchor_age_ms = 1 # type: ignore[misc] # --------------------------------------------------------------------- # AC-3: Enums have the documented values def test_ac3_covariance_mode_has_exactly_marginals_and_jacobian() -> None: members = {m.name for m in CovarianceMode} assert members == {"MARGINALS", "JACOBIAN"} assert CovarianceMode.MARGINALS.value == "marginals" assert CovarianceMode.JACOBIAN.value == "jacobian" def test_ac3_pose_source_label_has_three_documented_values() -> None: members = {m.name for m in PoseSourceLabel} assert members == {"SATELLITE_ANCHORED", "VISUAL_PROPAGATED", "DEAD_RECKONED"} # --------------------------------------------------------------------- # AC-4: CovarianceDegradedWarning IS-A Warning NOT Exception def test_ac4_covariance_degraded_warning_subclasses_warning() -> None: # AC-4 intent: CovarianceDegradedWarning must be on the Warning branch # of the exception tree so the warnings machinery handles it. Python's # actual hierarchy has Warning < Exception < BaseException, so the # behavioural test (warnings.warn does not raise; except Exception does # not catch) below is what the contract really pins. assert issubclass(CovarianceDegradedWarning, Warning) def test_ac4_try_except_exception_does_not_catch_the_warning() -> None: caught_via_exception = False captured: list[Warning] = [] with warnings.catch_warnings(record=True) as records: warnings.simplefilter("always") try: warnings.warn("throttle engaged", CovarianceDegradedWarning, stacklevel=1) except Exception: caught_via_exception = True for r in records: if isinstance(r.message, CovarianceDegradedWarning): captured.append(r.message) # warnings.warn(...) does NOT raise — it emits through the filter chain. # The try/except Exception block above MUST NOT see the warning even # though Warning subclasses Exception in the class hierarchy. assert caught_via_exception is False assert len(captured) == 1 # --------------------------------------------------------------------- # AC-5: PnpFailureError IS-A Exception def test_ac5_pnp_failure_error_subclasses_pose_estimator_error_and_exception() -> None: assert issubclass(PnpFailureError, PoseEstimatorError) assert issubclass(PnpFailureError, Exception) def test_ac5_pnp_failure_error_caught_by_except_exception() -> None: caught = False try: raise PnpFailureError("RANSAC failed") except Exception: caught = True assert caught # --------------------------------------------------------------------- # AC-6: Factory rejects unknown strategy def test_ac6_factory_rejects_unknown_strategy( caplog: pytest.LogCaptureFixture, ) -> None: bad_block = C4PoseConfig.__new__(C4PoseConfig) object.__setattr__(bad_block, "strategy", "garbage") object.__setattr__(bad_block, "ransac_iterations", 200) object.__setattr__(bad_block, "ransac_reprojection_threshold_px", 4.0) object.__setattr__(bad_block, "thermal_throttle_threshold_celsius", 75.0) cfg = load_config(env={}, paths=(), require_env=False) components = dict(cfg.components or {}) components["c4_pose"] = bad_block bad_cfg = dataclasses.replace(cfg, components=components) with caplog.at_level(logging.ERROR): with pytest.raises(PoseEstimatorConfigError, match="garbage"): build_pose_estimator( bad_cfg, ransac_filter=mock.MagicMock(), wgs_converter=mock.MagicMock(), se3_utils=mock.MagicMock(), isam2_graph_handle=_FakeISam2GraphHandle(), ) error_records = [ r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.unknown_strategy" ] assert len(error_records) == 1 def test_ac6_factory_rejects_non_conforming_graph_handle() -> None: cfg = _build_config() class _BrokenHandle: pass with pytest.raises(PoseEstimatorConfigError, match="ISam2GraphHandle"): build_pose_estimator( cfg, ransac_filter=mock.MagicMock(), wgs_converter=mock.MagicMock(), se3_utils=mock.MagicMock(), isam2_graph_handle=_BrokenHandle(), # type: ignore[arg-type] ) # --------------------------------------------------------------------- # AC-7: Factory accepts "opencv_gtsam" and emits INFO log def test_ac7_factory_accepts_opencv_gtsam_and_emits_info_log( caplog: pytest.LogCaptureFixture, ) -> None: cfg = _build_config() def _factory(**kwargs: Any) -> PoseEstimator: return _FakePoseEstimator() register_pose_estimator("opencv_gtsam", _factory) with caplog.at_level(logging.INFO): estimator = build_pose_estimator( cfg, ransac_filter=mock.MagicMock(), wgs_converter=mock.MagicMock(), se3_utils=mock.MagicMock(), isam2_graph_handle=_FakeISam2GraphHandle(), ) assert isinstance(estimator, PoseEstimator) info_records = [ r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.strategy_loaded" ] assert len(info_records) == 1 record = info_records[0] assert record.kv["strategy"] == "opencv_gtsam" assert record.kv["ransac_iterations"] == 200 assert record.kv["ransac_reprojection_threshold_px"] == 4.0 assert record.kv["thermal_throttle_threshold_celsius"] == 75.0 # --------------------------------------------------------------------- # AC-8: Public API surface def test_ac8_public_api_re_exports() -> None: from gps_denied_onboard.components import c4_pose expected_public = { "C4PoseConfig", "CovarianceDegradedWarning", "CovarianceMode", "LatLonAlt", "PnpFailureError", "PoseEstimate", "PoseEstimator", "PoseEstimatorConfigError", "PoseEstimatorError", "PoseSourceLabel", "Quat", } assert expected_public.issubset(set(c4_pose.__all__)) def test_ac8_internals_not_in_public_all() -> None: from gps_denied_onboard.components import c4_pose assert "ISam2GraphHandle" not in c4_pose.__all__ assert "_isam2_handle" not in c4_pose.__all__ # --------------------------------------------------------------------- # AC-9: Strategy bound to single ingest thread (same thread as C5) def test_ac9_bind_state_ingest_thread_rejects_second_thread() -> None: primary = bind_state_ingest_thread() other_ident = primary + 1 with pytest.raises(StateIngestThreadAlreadyBoundError): bind_state_ingest_thread(other_ident) def test_ac9_bind_state_ingest_thread_idempotent_for_same_thread() -> None: primary = bind_state_ingest_thread() again = bind_state_ingest_thread() assert primary == again assert primary == threading.get_ident() # --------------------------------------------------------------------- # AC-10: ISam2GraphHandle Protocol stub conforms to runtime_checkable def test_ac10_isam2_graph_handle_runtime_checkable() -> None: handle = _FakeISam2GraphHandle() assert isinstance(handle, ISam2GraphHandle) def test_ac10_isam2_graph_handle_rejects_missing_method() -> None: class _NoMethod: pass assert not isinstance(_NoMethod(), ISam2GraphHandle) # --------------------------------------------------------------------- # Bonus: factory wires constructor dependencies through to the strategy def test_factory_passes_dependencies_to_strategy() -> None: captured: dict[str, Any] = {} def _factory(**kwargs: Any) -> PoseEstimator: captured.update(kwargs) return _FakePoseEstimator() register_pose_estimator("opencv_gtsam", _factory) handle = _FakeISam2GraphHandle() ransac = mock.MagicMock() wgs = mock.MagicMock() se3 = mock.MagicMock() cfg = _build_config() build_pose_estimator( cfg, ransac_filter=ransac, wgs_converter=wgs, se3_utils=se3, isam2_graph_handle=handle, ) assert captured["ransac_filter"] is ransac assert captured["wgs_converter"] is wgs assert captured["se3_utils"] is se3 assert captured["isam2_graph_handle"] is handle def test_factory_lazy_imports_when_registry_empty() -> None: cfg = _build_config() # Registry is cleared by the fixture; the lazy-import fallback # should attempt to import the concrete module. We have not # shipped opencv_gtsam_estimator yet (AZ-358), so the import # raises and gets wrapped in PoseEstimatorConfigError. with pytest.raises(PoseEstimatorConfigError): build_pose_estimator( cfg, ransac_filter=mock.MagicMock(), wgs_converter=mock.MagicMock(), se3_utils=mock.MagicMock(), isam2_graph_handle=_FakeISam2GraphHandle(), ) # --------------------------------------------------------------------- # Bonus: config validation def test_config_rejects_unknown_strategy_at_post_init() -> None: from gps_denied_onboard.config.schema import ConfigError with pytest.raises(ConfigError, match="garbage"): C4PoseConfig(strategy="garbage") def test_config_rejects_zero_ransac_iterations() -> None: from gps_denied_onboard.config.schema import ConfigError with pytest.raises(ConfigError, match="ransac_iterations"): C4PoseConfig(ransac_iterations=0) def test_pose_estimate_uuid_frame_id() -> None: pe = PoseEstimate( frame_id=UUID(int=42), position_wgs84=LatLonAlt(0.0, 0.0, 0.0), orientation_world_T_body=Quat(1.0, 0.0, 0.0, 0.0), covariance_6x6=np.eye(6), covariance_mode=CovarianceMode.MARGINALS, source_label=PoseSourceLabel.SATELLITE_ANCHORED, last_satellite_anchor_age_ms=42, emitted_at=1_000_000_000, ) assert isinstance(pe.frame_id, UUID) assert pe.emitted_at == 1_000_000_000