Files
gps-denied-onboard/tests/unit/c4_pose/test_az355_pose_protocol.py
T
Oleksandr Bezdieniezhnykh db27e25630 [AZ-355] C4 PoseEstimator Protocol + factory + DTOs + composition
Land the foundational C4 surface AZ-358 (Marginals) and AZ-361
(Hybrid) build on top of:

- PoseEstimator Protocol (@runtime_checkable): estimate(...) +
  current_covariance_mode().
- Error hierarchy: PoseEstimatorError, PnpFailureError,
  PoseEstimatorConfigError; CovarianceDegradedWarning as a Warning
  subclass (warnings.warn path, not raised).
- ISam2GraphHandle Protocol stub (READ-ONLY view, get_pose_key only)
  decoupled from C5's concrete ISam2GraphHandleImpl.
- C4PoseConfig (frozen dataclass) + register on c4_pose import.
- runtime_root/pose_factory.build_pose_estimator with lazy-import
  fallback; INFO log c4.pose.strategy_loaded; shares ingest-thread
  binding with C5 per ADR-003.

DTO restructuring (cross-cutting): retire the legacy raw-4x4
PoseEstimate(int frame_id, datetime timestamp, pose_se3, ...) and
ship the contract shape PoseEstimate(UUID, LatLonAlt, Quat,
np.ndarray, CovarianceMode, PoseSourceLabel,
last_satellite_anchor_age_ms, emitted_at). C5 add_pose_anchor in
both gtsam_isam2 + eskf_baseline migrated in lockstep via
WGS84->ENU + Quat->R helpers; test fixtures updated. VIO output
stays on the raw shape until AZ-331 (C1 protocol) lands.

LatLonAlt upgraded to slots=True per AC-2. ThermalState stub added
to _types/thermal.py so the Protocol typechecks pre-AZ-302.

Tests: 25 new in tests/unit/c4_pose/test_az355_pose_protocol.py
covering AC-1..AC-10 + factory wiring + config validation; full
repo: 685 passed, 2 pre-existing CI-only skips.

Jira transition deferred: MCP "Not connected"; leftover entry in
_docs/_process_leftovers/2026-05-11_jira_transition_az355_deferred.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 10:32:14 +03:00

460 lines
15 KiB
Python

"""AZ-355 — C4 PoseEstimator Protocol + Factory + DTOs + Composition.
Tests cover AC-1..AC-10 from
``_docs/02_tasks/todo/AZ-355_c4_pose_protocol.md``:
- AC-1 Protocol conformance — ``runtime_checkable``.
- AC-2 DTOs are frozen + slots.
- AC-3 Enums have the documented values.
- AC-4 ``CovarianceDegradedWarning`` IS-A ``Warning`` — emitted via
``warnings.warn``, not raised (``Warning`` itself subclasses ``Exception``).
- AC-5 ``PnpFailureError`` IS-A ``Exception``.
- AC-6 Factory rejects unknown strategy.
- AC-7 Factory accepts ``"opencv_gtsam"`` and emits INFO log.
- AC-8 Public API surface — ``__init__.py`` re-exports.
- AC-9 Strategy bound to single ingest thread (same thread as C5).
- AC-10 ``ISam2GraphHandle`` Protocol stub conforms to
``runtime_checkable``.
"""
from __future__ import annotations
import dataclasses
import logging
import threading
import warnings
from dataclasses import FrozenInstanceError
from typing import Any
from unittest import mock
from uuid import UUID, uuid4
import numpy as np
import pytest
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.pose import (
CovarianceMode,
PoseEstimate,
PoseSourceLabel,
Quat,
)
from gps_denied_onboard.components.c4_pose import (
C4PoseConfig,
CovarianceDegradedWarning,
PnpFailureError,
PoseEstimator,
PoseEstimatorConfigError,
PoseEstimatorError,
)
from gps_denied_onboard.components.c4_pose._isam2_handle import ISam2GraphHandle
from gps_denied_onboard.config import load_config
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.runtime_root.pose_factory import (
build_pose_estimator,
clear_pose_registry,
register_pose_estimator,
)
from gps_denied_onboard.runtime_root.state_factory import (
StateIngestThreadAlreadyBoundError,
bind_state_ingest_thread,
clear_state_ingest_binding,
)
@pytest.fixture(autouse=True)
def _isolation():
    """Reset the pose registry and the ingest-thread binding around each test.

    The same two clear calls run before the test body and again after it,
    so no registration or binding made by one test is visible to the next.
    """

    def _reset() -> None:
        clear_pose_registry()
        clear_state_ingest_binding()

    _reset()
    yield
    _reset()
def _build_config(**overrides: Any) -> Config:
    """Return a loaded ``Config`` whose ``c4_pose`` block carries *overrides*.

    Args:
        **overrides: Field values applied on top of a default
            ``C4PoseConfig`` via ``dataclasses.replace``.

    Returns:
        A copy of the loaded config with ``components["c4_pose"]`` replaced.
    """
    base_cfg = load_config(env={}, paths=(), require_env=False)
    pose_block = dataclasses.replace(C4PoseConfig(), **overrides)
    # Copy the mapping so the loaded config's own components stay untouched.
    merged = dict(base_cfg.components or {})
    merged.update(c4_pose=pose_block)
    return dataclasses.replace(base_cfg, components=merged)
class _FakeISam2GraphHandle:
"""Minimal handle stub for factory / Protocol tests."""
def get_pose_key(self, frame_id: int) -> int:
return int(frame_id)
class _FakePoseEstimator:
    """Stand-in estimator exposing both methods the PoseEstimator Protocol needs."""

    def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> PoseEstimate:
        """Return a fixed pose; all three inputs are ignored.

        The pose has a fresh random frame id, zero position, the identity
        quaternion and a 6x6 identity covariance.
        """
        origin = LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0)
        identity_rotation = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
        return PoseEstimate(
            frame_id=uuid4(),
            position_wgs84=origin,
            orientation_world_T_body=identity_rotation,
            covariance_6x6=np.eye(6, dtype=np.float64),
            covariance_mode=CovarianceMode.MARGINALS,
            source_label=PoseSourceLabel.SATELLITE_ANCHORED,
            last_satellite_anchor_age_ms=0,
            emitted_at=0,
        )

    def current_covariance_mode(self) -> CovarianceMode:
        """Always report ``CovarianceMode.MARGINALS``."""
        return CovarianceMode.MARGINALS
# ---------------------------------------------------------------------
# AC-1: Protocol conformance — runtime_checkable
def test_ac1_full_fake_satisfies_protocol() -> None:
    """AC-1: a double with both Protocol methods passes ``isinstance``."""
    assert isinstance(_FakePoseEstimator(), PoseEstimator)
def test_ac1_partial_fake_fails_protocol() -> None:
    """AC-1: a double lacking ``current_covariance_mode`` fails ``isinstance``."""

    class _EstimateOnly:
        # Deliberately omits current_covariance_mode.
        def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> Any:
            return None

    incomplete = _EstimateOnly()
    assert not isinstance(incomplete, PoseEstimator)
# ---------------------------------------------------------------------
# AC-2: DTOs are frozen + slots
def test_ac2_lat_lon_alt_frozen_and_slotted() -> None:
    """AC-2: ``LatLonAlt`` is frozen, slotted, and rejects attribute writes."""
    point = LatLonAlt(lat_deg=1.0, lon_deg=2.0, alt_m=3.0)
    params = LatLonAlt.__dataclass_params__
    assert params.frozen is True
    assert hasattr(LatLonAlt, "__slots__")
    slot_names = tuple(LatLonAlt.__slots__)
    assert slot_names == ("lat_deg", "lon_deg", "alt_m")
    with pytest.raises(FrozenInstanceError):
        point.lat_deg = 99.0  # type: ignore[misc]
def test_ac2_quat_frozen_and_slotted() -> None:
    """AC-2: ``Quat`` is frozen, slotted, and rejects attribute writes."""
    identity = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
    params = Quat.__dataclass_params__
    assert params.frozen is True
    assert hasattr(Quat, "__slots__")
    slot_names = tuple(Quat.__slots__)
    assert slot_names == ("w", "x", "y", "z")
    with pytest.raises(FrozenInstanceError):
        identity.w = 0.0  # type: ignore[misc]
def test_ac2_pose_estimate_frozen_and_slotted() -> None:
    """AC-2: ``PoseEstimate`` is frozen, has eight slots, and rejects writes."""
    origin = LatLonAlt(0.0, 0.0, 0.0)
    identity_rotation = Quat(1.0, 0.0, 0.0, 0.0)
    pose = PoseEstimate(
        frame_id=uuid4(),
        position_wgs84=origin,
        orientation_world_T_body=identity_rotation,
        covariance_6x6=np.eye(6),
        covariance_mode=CovarianceMode.MARGINALS,
        source_label=PoseSourceLabel.SATELLITE_ANCHORED,
        last_satellite_anchor_age_ms=0,
        emitted_at=0,
    )
    params = PoseEstimate.__dataclass_params__
    assert params.frozen is True
    assert hasattr(PoseEstimate, "__slots__")
    slot_count = len(PoseEstimate.__slots__)
    assert slot_count == 8
    with pytest.raises(FrozenInstanceError):
        pose.last_satellite_anchor_age_ms = 1  # type: ignore[misc]
# ---------------------------------------------------------------------
# AC-3: Enums have the documented values
def test_ac3_covariance_mode_has_exactly_marginals_and_jacobian() -> None:
    """AC-3: ``CovarianceMode`` has exactly two members with lowercase values."""
    member_names = {member.name for member in CovarianceMode}
    assert member_names == {"MARGINALS", "JACOBIAN"}
    marginals_value = CovarianceMode.MARGINALS.value
    assert marginals_value == "marginals"
    jacobian_value = CovarianceMode.JACOBIAN.value
    assert jacobian_value == "jacobian"
def test_ac3_pose_source_label_has_three_documented_values() -> None:
    """AC-3: ``PoseSourceLabel`` exposes exactly the three documented members."""
    documented = {"SATELLITE_ANCHORED", "VISUAL_PROPAGATED", "DEAD_RECKONED"}
    member_names = {label.name for label in PoseSourceLabel}
    assert member_names == documented
# ---------------------------------------------------------------------
# AC-4: CovarianceDegradedWarning IS-A Warning NOT Exception
def test_ac4_covariance_degraded_warning_subclasses_warning() -> None:
    """AC-4: ``CovarianceDegradedWarning`` sits on the ``Warning`` branch.

    In Python's hierarchy ``Warning`` derives from ``Exception``, which
    derives from ``BaseException``, so a "not an Exception" class check
    cannot hold. This test only pins the ``Warning`` ancestry that lets the
    warnings machinery handle the class.
    """
    is_warning_subclass = issubclass(CovarianceDegradedWarning, Warning)
    assert is_warning_subclass
def test_ac4_try_except_exception_does_not_catch_the_warning() -> None:
    """AC-4: emitting the warning does not trip an ``except Exception`` handler.

    With the filter set to ``"always"``, ``warnings.warn`` routes the warning
    through the filter chain into the recorded list instead of raising it,
    even though ``Warning`` subclasses ``Exception``.
    """
    raised_as_exception = False
    with warnings.catch_warnings(record=True) as recorded:
        warnings.simplefilter("always")
        try:
            warnings.warn("throttle engaged", CovarianceDegradedWarning, stacklevel=1)
        except Exception:
            raised_as_exception = True
    # Keep only the messages of the warning class under test.
    degraded: list[Warning] = [
        entry.message
        for entry in recorded
        if isinstance(entry.message, CovarianceDegradedWarning)
    ]
    assert raised_as_exception is False
    assert len(degraded) == 1
# ---------------------------------------------------------------------
# AC-5: PnpFailureError IS-A Exception
def test_ac5_pnp_failure_error_subclasses_pose_estimator_error_and_exception() -> None:
    """AC-5: ``PnpFailureError`` derives from the C4 base error and ``Exception``."""
    for ancestor in (PoseEstimatorError, Exception):
        assert issubclass(PnpFailureError, ancestor)
def test_ac5_pnp_failure_error_caught_by_except_exception() -> None:
    """AC-5: a raised ``PnpFailureError`` is handled by ``except Exception``."""
    was_handled = False
    try:
        raise PnpFailureError("RANSAC failed")
    except Exception:
        was_handled = True
    assert was_handled
# ---------------------------------------------------------------------
# AC-6: Factory rejects unknown strategy
def test_ac6_factory_rejects_unknown_strategy(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-6: an unknown strategy raises and logs one ``unknown_strategy`` record.

    The config block is built with ``__new__`` and ``object.__setattr__``
    rather than the normal constructor, so the invalid strategy name can
    reach the factory.
    """
    rogue_block = C4PoseConfig.__new__(C4PoseConfig)
    rogue_fields = (
        ("strategy", "garbage"),
        ("ransac_iterations", 200),
        ("ransac_reprojection_threshold_px", 4.0),
        ("thermal_throttle_threshold_celsius", 75.0),
    )
    for field_name, field_value in rogue_fields:
        object.__setattr__(rogue_block, field_name, field_value)

    base_cfg = load_config(env={}, paths=(), require_env=False)
    merged = dict(base_cfg.components or {})
    merged["c4_pose"] = rogue_block
    rogue_cfg = dataclasses.replace(base_cfg, components=merged)

    expect_config_error = pytest.raises(PoseEstimatorConfigError, match="garbage")
    with caplog.at_level(logging.ERROR), expect_config_error:
        build_pose_estimator(
            rogue_cfg,
            ransac_filter=mock.MagicMock(),
            wgs_converter=mock.MagicMock(),
            se3_utils=mock.MagicMock(),
            isam2_graph_handle=_FakeISam2GraphHandle(),
        )

    unknown_strategy_logs = [
        rec
        for rec in caplog.records
        if getattr(rec, "kind", None) == "c4.pose.unknown_strategy"
    ]
    assert len(unknown_strategy_logs) == 1
def test_ac6_factory_rejects_non_conforming_graph_handle() -> None:
    """AC-6: a graph handle without the Protocol's method is refused."""

    class _MethodlessHandle:
        pass

    default_cfg = _build_config()
    collaborators = {
        "ransac_filter": mock.MagicMock(),
        "wgs_converter": mock.MagicMock(),
        "se3_utils": mock.MagicMock(),
        "isam2_graph_handle": _MethodlessHandle(),
    }
    with pytest.raises(PoseEstimatorConfigError, match="ISam2GraphHandle"):
        build_pose_estimator(default_cfg, **collaborators)  # type: ignore[arg-type]
# ---------------------------------------------------------------------
# AC-7: Factory accepts "opencv_gtsam" and emits INFO log
def test_ac7_factory_accepts_opencv_gtsam_and_emits_info_log(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-7: a registered ``opencv_gtsam`` strategy builds and logs once.

    The single ``c4.pose.strategy_loaded`` record must carry the strategy
    name plus the RANSAC and thermal values from the default config.
    """

    def _make_fake(**kwargs: Any) -> PoseEstimator:
        return _FakePoseEstimator()

    default_cfg = _build_config()
    register_pose_estimator("opencv_gtsam", _make_fake)
    with caplog.at_level(logging.INFO):
        built = build_pose_estimator(
            default_cfg,
            ransac_filter=mock.MagicMock(),
            wgs_converter=mock.MagicMock(),
            se3_utils=mock.MagicMock(),
            isam2_graph_handle=_FakeISam2GraphHandle(),
        )
    assert isinstance(built, PoseEstimator)

    loaded_logs = [
        rec
        for rec in caplog.records
        if getattr(rec, "kind", None) == "c4.pose.strategy_loaded"
    ]
    assert len(loaded_logs) == 1
    logged_kv = loaded_logs[0].kv
    assert logged_kv["strategy"] == "opencv_gtsam"
    assert logged_kv["ransac_iterations"] == 200
    assert logged_kv["ransac_reprojection_threshold_px"] == 4.0
    assert logged_kv["thermal_throttle_threshold_celsius"] == 75.0
# ---------------------------------------------------------------------
# AC-8: Public API surface
def test_ac8_public_api_re_exports() -> None:
    """AC-8: every documented public name is listed in ``c4_pose.__all__``."""
    from gps_denied_onboard.components import c4_pose

    required_names = {
        "C4PoseConfig",
        "CovarianceDegradedWarning",
        "CovarianceMode",
        "LatLonAlt",
        "PnpFailureError",
        "PoseEstimate",
        "PoseEstimator",
        "PoseEstimatorConfigError",
        "PoseEstimatorError",
        "PoseSourceLabel",
        "Quat",
    }
    exported = set(c4_pose.__all__)
    assert required_names <= exported
def test_ac8_internals_not_in_public_all() -> None:
    """AC-8: the graph-handle Protocol and its module stay out of ``__all__``."""
    from gps_denied_onboard.components import c4_pose

    for internal_name in ("ISam2GraphHandle", "_isam2_handle"):
        assert internal_name not in c4_pose.__all__
# ---------------------------------------------------------------------
# AC-9: Strategy bound to single ingest thread (same thread as C5)
def test_ac9_bind_state_ingest_thread_rejects_second_thread() -> None:
    """AC-9: binding a different thread ident after the first bind raises."""
    bound_ident = bind_state_ingest_thread()
    # Any ident that differs from the bound one stands in for a second thread.
    foreign_ident = bound_ident + 1
    with pytest.raises(StateIngestThreadAlreadyBoundError):
        bind_state_ingest_thread(foreign_ident)
def test_ac9_bind_state_ingest_thread_idempotent_for_same_thread() -> None:
    """AC-9: re-binding from the same thread returns the same ident."""
    first_bind = bind_state_ingest_thread()
    second_bind = bind_state_ingest_thread()
    assert first_bind == second_bind
    assert first_bind == threading.get_ident()
# ---------------------------------------------------------------------
# AC-10: ISam2GraphHandle Protocol stub conforms to runtime_checkable
def test_ac10_isam2_graph_handle_runtime_checkable() -> None:
    """AC-10: a double with ``get_pose_key`` passes the Protocol ``isinstance``."""
    assert isinstance(_FakeISam2GraphHandle(), ISam2GraphHandle)
def test_ac10_isam2_graph_handle_rejects_missing_method() -> None:
    """AC-10: an empty class fails the Protocol ``isinstance`` check."""

    class _Methodless:
        pass

    candidate = _Methodless()
    assert not isinstance(candidate, ISam2GraphHandle)
# ---------------------------------------------------------------------
# Bonus: factory wires constructor dependencies through to the strategy
def test_factory_passes_dependencies_to_strategy() -> None:
    """The factory hands the exact collaborator objects to the strategy.

    The registered strategy records its keyword arguments; each one is then
    compared by identity against the object given to the factory.
    """
    seen_kwargs: dict[str, Any] = {}

    def _recording_factory(**kwargs: Any) -> PoseEstimator:
        seen_kwargs.update(kwargs)
        return _FakePoseEstimator()

    register_pose_estimator("opencv_gtsam", _recording_factory)
    graph_handle = _FakeISam2GraphHandle()
    ransac_double = mock.MagicMock()
    wgs_double = mock.MagicMock()
    se3_double = mock.MagicMock()
    default_cfg = _build_config()

    build_pose_estimator(
        default_cfg,
        ransac_filter=ransac_double,
        wgs_converter=wgs_double,
        se3_utils=se3_double,
        isam2_graph_handle=graph_handle,
    )

    assert seen_kwargs["ransac_filter"] is ransac_double
    assert seen_kwargs["wgs_converter"] is wgs_double
    assert seen_kwargs["se3_utils"] is se3_double
    assert seen_kwargs["isam2_graph_handle"] is graph_handle
def test_factory_lazy_imports_when_registry_empty() -> None:
    """An empty registry makes the factory raise ``PoseEstimatorConfigError``.

    The autouse fixture clears the registry, so the factory should fall
    back to lazily importing the concrete module. That module
    (opencv_gtsam_estimator, AZ-358) has not shipped yet, so the import
    raises and is wrapped in ``PoseEstimatorConfigError``.
    """
    default_cfg = _build_config()
    collaborators = {
        "ransac_filter": mock.MagicMock(),
        "wgs_converter": mock.MagicMock(),
        "se3_utils": mock.MagicMock(),
        "isam2_graph_handle": _FakeISam2GraphHandle(),
    }
    with pytest.raises(PoseEstimatorConfigError):
        build_pose_estimator(default_cfg, **collaborators)
# ---------------------------------------------------------------------
# Bonus: config validation
def test_config_rejects_unknown_strategy_at_post_init() -> None:
    """Constructing ``C4PoseConfig`` with an unknown strategy raises ``ConfigError``."""
    from gps_denied_onboard.config.schema import ConfigError

    expect_config_error = pytest.raises(ConfigError, match="garbage")
    with expect_config_error:
        C4PoseConfig(strategy="garbage")
def test_config_rejects_zero_ransac_iterations() -> None:
    """Constructing ``C4PoseConfig`` with zero RANSAC iterations raises ``ConfigError``."""
    from gps_denied_onboard.config.schema import ConfigError

    expect_config_error = pytest.raises(ConfigError, match="ransac_iterations")
    with expect_config_error:
        C4PoseConfig(ransac_iterations=0)
def test_pose_estimate_uuid_frame_id() -> None:
    """``PoseEstimate`` keeps a ``UUID`` frame id and the given ``emitted_at``."""
    fixed_frame_id = UUID(int=42)
    emitted = 1_000_000_000
    pose = PoseEstimate(
        frame_id=fixed_frame_id,
        position_wgs84=LatLonAlt(0.0, 0.0, 0.0),
        orientation_world_T_body=Quat(1.0, 0.0, 0.0, 0.0),
        covariance_6x6=np.eye(6),
        covariance_mode=CovarianceMode.MARGINALS,
        source_label=PoseSourceLabel.SATELLITE_ANCHORED,
        last_satellite_anchor_age_ms=42,
        emitted_at=emitted,
    )
    assert isinstance(pose.frame_id, UUID)
    assert pose.emitted_at == 1_000_000_000