mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 19:01:14 +00:00
db27e25630
Land the foundational C4 surface AZ-358 (Marginals) and AZ-361 (Hybrid) build on top of: - PoseEstimator Protocol (@runtime_checkable): estimate(...) + current_covariance_mode(). - Error hierarchy: PoseEstimatorError, PnpFailureError, PoseEstimatorConfigError; CovarianceDegradedWarning as a Warning subclass (warnings.warn path, not raised). - ISam2GraphHandle Protocol stub (READ-ONLY view, get_pose_key only) decoupled from C5's concrete ISam2GraphHandleImpl. - C4PoseConfig (frozen dataclass) + register on c4_pose import. - runtime_root/pose_factory.build_pose_estimator with lazy-import fallback; INFO log c4.pose.strategy_loaded; shares ingest-thread binding with C5 per ADR-003. DTO restructuring (cross-cutting): retire the legacy raw-4x4 PoseEstimate(int frame_id, datetime timestamp, pose_se3, ...) and ship the contract shape PoseEstimate(UUID, LatLonAlt, Quat, np.ndarray, CovarianceMode, PoseSourceLabel, last_satellite_anchor_age_ms, emitted_at). C5 add_pose_anchor in both gtsam_isam2 + eskf_baseline migrated in lockstep via WGS84->ENU + Quat->R helpers; test fixtures updated. VIO output stays on the raw shape until AZ-331 (C1 protocol) lands. LatLonAlt upgraded to slots=True per AC-2. ThermalState stub added to _types/thermal.py so the Protocol typechecks pre-AZ-302. Tests: 25 new in tests/unit/c4_pose/test_az355_pose_protocol.py covering AC-1..AC-10 + factory wiring + config validation; full repo: 685 passed, 2 pre-existing CI-only skips. Jira transition deferred: MCP "Not connected"; leftover entry in _docs/_process_leftovers/2026-05-11_jira_transition_az355_deferred.md. Co-authored-by: Cursor <cursoragent@cursor.com>
460 lines
15 KiB
Python
460 lines
15 KiB
Python
"""AZ-355 — C4 PoseEstimator Protocol + Factory + DTOs + Composition.
|
|
|
|
Tests cover AC-1..AC-10 from
|
|
``_docs/02_tasks/todo/AZ-355_c4_pose_protocol.md``:
|
|
|
|
- AC-1 Protocol conformance — ``runtime_checkable``.
|
|
- AC-2 DTOs are frozen + slots.
|
|
- AC-3 Enums have the documented values.
|
|
- AC-4 ``CovarianceDegradedWarning`` IS-A ``Warning`` NOT ``Exception``.
|
|
- AC-5 ``PnpFailureError`` IS-A ``Exception``.
|
|
- AC-6 Factory rejects unknown strategy.
|
|
- AC-7 Factory accepts ``"opencv_gtsam"`` and emits INFO log.
|
|
- AC-8 Public API surface — ``__init__.py`` re-exports.
|
|
- AC-9 Strategy bound to single ingest thread (same thread as C5).
|
|
- AC-10 ``ISam2GraphHandle`` Protocol stub conforms to
|
|
``runtime_checkable``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import logging
|
|
import threading
|
|
import warnings
|
|
from dataclasses import FrozenInstanceError
|
|
from typing import Any
|
|
from unittest import mock
|
|
from uuid import UUID, uuid4
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.geo import LatLonAlt
|
|
from gps_denied_onboard._types.pose import (
|
|
CovarianceMode,
|
|
PoseEstimate,
|
|
PoseSourceLabel,
|
|
Quat,
|
|
)
|
|
from gps_denied_onboard.components.c4_pose import (
|
|
C4PoseConfig,
|
|
CovarianceDegradedWarning,
|
|
PnpFailureError,
|
|
PoseEstimator,
|
|
PoseEstimatorConfigError,
|
|
PoseEstimatorError,
|
|
)
|
|
from gps_denied_onboard.components.c4_pose._isam2_handle import ISam2GraphHandle
|
|
from gps_denied_onboard.config import load_config
|
|
from gps_denied_onboard.config.schema import Config
|
|
from gps_denied_onboard.runtime_root.pose_factory import (
|
|
build_pose_estimator,
|
|
clear_pose_registry,
|
|
register_pose_estimator,
|
|
)
|
|
from gps_denied_onboard.runtime_root.state_factory import (
|
|
StateIngestThreadAlreadyBoundError,
|
|
bind_state_ingest_thread,
|
|
clear_state_ingest_binding,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _isolation():
|
|
clear_pose_registry()
|
|
clear_state_ingest_binding()
|
|
yield
|
|
clear_pose_registry()
|
|
clear_state_ingest_binding()
|
|
|
|
|
|
def _build_config(**overrides: Any) -> Config:
|
|
cfg = load_config(env={}, paths=(), require_env=False)
|
|
new_block = dataclasses.replace(C4PoseConfig(), **overrides)
|
|
components = dict(cfg.components or {})
|
|
components["c4_pose"] = new_block
|
|
return dataclasses.replace(cfg, components=components)
|
|
|
|
|
|
class _FakeISam2GraphHandle:
|
|
"""Minimal handle stub for factory / Protocol tests."""
|
|
|
|
def get_pose_key(self, frame_id: int) -> int:
|
|
return int(frame_id)
|
|
|
|
|
|
class _FakePoseEstimator:
|
|
"""Test double satisfying the full PoseEstimator Protocol."""
|
|
|
|
def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> PoseEstimate:
|
|
return PoseEstimate(
|
|
frame_id=uuid4(),
|
|
position_wgs84=LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0),
|
|
orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0),
|
|
covariance_6x6=np.eye(6, dtype=np.float64),
|
|
covariance_mode=CovarianceMode.MARGINALS,
|
|
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
|
|
last_satellite_anchor_age_ms=0,
|
|
emitted_at=0,
|
|
)
|
|
|
|
def current_covariance_mode(self) -> CovarianceMode:
|
|
return CovarianceMode.MARGINALS
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-1: Protocol conformance — runtime_checkable
|
|
|
|
|
|
def test_ac1_full_fake_satisfies_protocol() -> None:
|
|
fake = _FakePoseEstimator()
|
|
assert isinstance(fake, PoseEstimator)
|
|
|
|
|
|
def test_ac1_partial_fake_fails_protocol() -> None:
|
|
class _OnlyEstimate:
|
|
def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> Any:
|
|
return None
|
|
|
|
# Missing current_covariance_mode → fails isinstance
|
|
|
|
assert not isinstance(_OnlyEstimate(), PoseEstimator)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-2: DTOs are frozen + slots
|
|
|
|
|
|
def test_ac2_lat_lon_alt_frozen_and_slotted() -> None:
|
|
p = LatLonAlt(lat_deg=1.0, lon_deg=2.0, alt_m=3.0)
|
|
assert LatLonAlt.__dataclass_params__.frozen is True
|
|
assert hasattr(LatLonAlt, "__slots__")
|
|
assert tuple(LatLonAlt.__slots__) == ("lat_deg", "lon_deg", "alt_m")
|
|
with pytest.raises(FrozenInstanceError):
|
|
p.lat_deg = 99.0 # type: ignore[misc]
|
|
|
|
|
|
def test_ac2_quat_frozen_and_slotted() -> None:
|
|
q = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
|
|
assert Quat.__dataclass_params__.frozen is True
|
|
assert hasattr(Quat, "__slots__")
|
|
assert tuple(Quat.__slots__) == ("w", "x", "y", "z")
|
|
with pytest.raises(FrozenInstanceError):
|
|
q.w = 0.0 # type: ignore[misc]
|
|
|
|
|
|
def test_ac2_pose_estimate_frozen_and_slotted() -> None:
|
|
pe = PoseEstimate(
|
|
frame_id=uuid4(),
|
|
position_wgs84=LatLonAlt(0.0, 0.0, 0.0),
|
|
orientation_world_T_body=Quat(1.0, 0.0, 0.0, 0.0),
|
|
covariance_6x6=np.eye(6),
|
|
covariance_mode=CovarianceMode.MARGINALS,
|
|
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
|
|
last_satellite_anchor_age_ms=0,
|
|
emitted_at=0,
|
|
)
|
|
assert PoseEstimate.__dataclass_params__.frozen is True
|
|
assert hasattr(PoseEstimate, "__slots__")
|
|
assert len(PoseEstimate.__slots__) == 8
|
|
with pytest.raises(FrozenInstanceError):
|
|
pe.last_satellite_anchor_age_ms = 1 # type: ignore[misc]
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-3: Enums have the documented values
|
|
|
|
|
|
def test_ac3_covariance_mode_has_exactly_marginals_and_jacobian() -> None:
|
|
members = {m.name for m in CovarianceMode}
|
|
assert members == {"MARGINALS", "JACOBIAN"}
|
|
assert CovarianceMode.MARGINALS.value == "marginals"
|
|
assert CovarianceMode.JACOBIAN.value == "jacobian"
|
|
|
|
|
|
def test_ac3_pose_source_label_has_three_documented_values() -> None:
|
|
members = {m.name for m in PoseSourceLabel}
|
|
assert members == {"SATELLITE_ANCHORED", "VISUAL_PROPAGATED", "DEAD_RECKONED"}
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-4: CovarianceDegradedWarning IS-A Warning NOT Exception
|
|
|
|
|
|
def test_ac4_covariance_degraded_warning_subclasses_warning() -> None:
|
|
# AC-4 intent: CovarianceDegradedWarning must be on the Warning branch
|
|
# of the exception tree so the warnings machinery handles it. Python's
|
|
# actual hierarchy has Warning < Exception < BaseException, so the
|
|
# behavioural test (warnings.warn does not raise; except Exception does
|
|
# not catch) below is what the contract really pins.
|
|
assert issubclass(CovarianceDegradedWarning, Warning)
|
|
|
|
|
|
def test_ac4_try_except_exception_does_not_catch_the_warning() -> None:
|
|
caught_via_exception = False
|
|
captured: list[Warning] = []
|
|
with warnings.catch_warnings(record=True) as records:
|
|
warnings.simplefilter("always")
|
|
try:
|
|
warnings.warn("throttle engaged", CovarianceDegradedWarning, stacklevel=1)
|
|
except Exception:
|
|
caught_via_exception = True
|
|
for r in records:
|
|
if isinstance(r.message, CovarianceDegradedWarning):
|
|
captured.append(r.message)
|
|
# warnings.warn(...) does NOT raise — it emits through the filter chain.
|
|
# The try/except Exception block above MUST NOT see the warning even
|
|
# though Warning subclasses Exception in the class hierarchy.
|
|
assert caught_via_exception is False
|
|
assert len(captured) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-5: PnpFailureError IS-A Exception
|
|
|
|
|
|
def test_ac5_pnp_failure_error_subclasses_pose_estimator_error_and_exception() -> None:
|
|
assert issubclass(PnpFailureError, PoseEstimatorError)
|
|
assert issubclass(PnpFailureError, Exception)
|
|
|
|
|
|
def test_ac5_pnp_failure_error_caught_by_except_exception() -> None:
|
|
caught = False
|
|
try:
|
|
raise PnpFailureError("RANSAC failed")
|
|
except Exception:
|
|
caught = True
|
|
assert caught
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-6: Factory rejects unknown strategy
|
|
|
|
|
|
def test_ac6_factory_rejects_unknown_strategy(
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
bad_block = C4PoseConfig.__new__(C4PoseConfig)
|
|
object.__setattr__(bad_block, "strategy", "garbage")
|
|
object.__setattr__(bad_block, "ransac_iterations", 200)
|
|
object.__setattr__(bad_block, "ransac_reprojection_threshold_px", 4.0)
|
|
object.__setattr__(bad_block, "thermal_throttle_threshold_celsius", 75.0)
|
|
|
|
cfg = load_config(env={}, paths=(), require_env=False)
|
|
components = dict(cfg.components or {})
|
|
components["c4_pose"] = bad_block
|
|
bad_cfg = dataclasses.replace(cfg, components=components)
|
|
|
|
with caplog.at_level(logging.ERROR):
|
|
with pytest.raises(PoseEstimatorConfigError, match="garbage"):
|
|
build_pose_estimator(
|
|
bad_cfg,
|
|
ransac_filter=mock.MagicMock(),
|
|
wgs_converter=mock.MagicMock(),
|
|
se3_utils=mock.MagicMock(),
|
|
isam2_graph_handle=_FakeISam2GraphHandle(),
|
|
)
|
|
|
|
error_records = [
|
|
r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.unknown_strategy"
|
|
]
|
|
assert len(error_records) == 1
|
|
|
|
|
|
def test_ac6_factory_rejects_non_conforming_graph_handle() -> None:
|
|
cfg = _build_config()
|
|
|
|
class _BrokenHandle:
|
|
pass
|
|
|
|
with pytest.raises(PoseEstimatorConfigError, match="ISam2GraphHandle"):
|
|
build_pose_estimator(
|
|
cfg,
|
|
ransac_filter=mock.MagicMock(),
|
|
wgs_converter=mock.MagicMock(),
|
|
se3_utils=mock.MagicMock(),
|
|
isam2_graph_handle=_BrokenHandle(), # type: ignore[arg-type]
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-7: Factory accepts "opencv_gtsam" and emits INFO log
|
|
|
|
|
|
def test_ac7_factory_accepts_opencv_gtsam_and_emits_info_log(
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
cfg = _build_config()
|
|
|
|
def _factory(**kwargs: Any) -> PoseEstimator:
|
|
return _FakePoseEstimator()
|
|
|
|
register_pose_estimator("opencv_gtsam", _factory)
|
|
|
|
with caplog.at_level(logging.INFO):
|
|
estimator = build_pose_estimator(
|
|
cfg,
|
|
ransac_filter=mock.MagicMock(),
|
|
wgs_converter=mock.MagicMock(),
|
|
se3_utils=mock.MagicMock(),
|
|
isam2_graph_handle=_FakeISam2GraphHandle(),
|
|
)
|
|
|
|
assert isinstance(estimator, PoseEstimator)
|
|
info_records = [
|
|
r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.strategy_loaded"
|
|
]
|
|
assert len(info_records) == 1
|
|
record = info_records[0]
|
|
assert record.kv["strategy"] == "opencv_gtsam"
|
|
assert record.kv["ransac_iterations"] == 200
|
|
assert record.kv["ransac_reprojection_threshold_px"] == 4.0
|
|
assert record.kv["thermal_throttle_threshold_celsius"] == 75.0
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-8: Public API surface
|
|
|
|
|
|
def test_ac8_public_api_re_exports() -> None:
|
|
from gps_denied_onboard.components import c4_pose
|
|
|
|
expected_public = {
|
|
"C4PoseConfig",
|
|
"CovarianceDegradedWarning",
|
|
"CovarianceMode",
|
|
"LatLonAlt",
|
|
"PnpFailureError",
|
|
"PoseEstimate",
|
|
"PoseEstimator",
|
|
"PoseEstimatorConfigError",
|
|
"PoseEstimatorError",
|
|
"PoseSourceLabel",
|
|
"Quat",
|
|
}
|
|
assert expected_public.issubset(set(c4_pose.__all__))
|
|
|
|
|
|
def test_ac8_internals_not_in_public_all() -> None:
|
|
from gps_denied_onboard.components import c4_pose
|
|
|
|
assert "ISam2GraphHandle" not in c4_pose.__all__
|
|
assert "_isam2_handle" not in c4_pose.__all__
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-9: Strategy bound to single ingest thread (same thread as C5)
|
|
|
|
|
|
def test_ac9_bind_state_ingest_thread_rejects_second_thread() -> None:
|
|
primary = bind_state_ingest_thread()
|
|
other_ident = primary + 1
|
|
|
|
with pytest.raises(StateIngestThreadAlreadyBoundError):
|
|
bind_state_ingest_thread(other_ident)
|
|
|
|
|
|
def test_ac9_bind_state_ingest_thread_idempotent_for_same_thread() -> None:
|
|
primary = bind_state_ingest_thread()
|
|
again = bind_state_ingest_thread()
|
|
assert primary == again
|
|
assert primary == threading.get_ident()
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-10: ISam2GraphHandle Protocol stub conforms to runtime_checkable
|
|
|
|
|
|
def test_ac10_isam2_graph_handle_runtime_checkable() -> None:
|
|
handle = _FakeISam2GraphHandle()
|
|
assert isinstance(handle, ISam2GraphHandle)
|
|
|
|
|
|
def test_ac10_isam2_graph_handle_rejects_missing_method() -> None:
|
|
class _NoMethod:
|
|
pass
|
|
|
|
assert not isinstance(_NoMethod(), ISam2GraphHandle)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Bonus: factory wires constructor dependencies through to the strategy
|
|
|
|
|
|
def test_factory_passes_dependencies_to_strategy() -> None:
|
|
captured: dict[str, Any] = {}
|
|
|
|
def _factory(**kwargs: Any) -> PoseEstimator:
|
|
captured.update(kwargs)
|
|
return _FakePoseEstimator()
|
|
|
|
register_pose_estimator("opencv_gtsam", _factory)
|
|
handle = _FakeISam2GraphHandle()
|
|
ransac = mock.MagicMock()
|
|
wgs = mock.MagicMock()
|
|
se3 = mock.MagicMock()
|
|
cfg = _build_config()
|
|
|
|
build_pose_estimator(
|
|
cfg,
|
|
ransac_filter=ransac,
|
|
wgs_converter=wgs,
|
|
se3_utils=se3,
|
|
isam2_graph_handle=handle,
|
|
)
|
|
|
|
assert captured["ransac_filter"] is ransac
|
|
assert captured["wgs_converter"] is wgs
|
|
assert captured["se3_utils"] is se3
|
|
assert captured["isam2_graph_handle"] is handle
|
|
|
|
|
|
def test_factory_lazy_imports_when_registry_empty() -> None:
|
|
cfg = _build_config()
|
|
# Registry is cleared by the fixture; the lazy-import fallback
|
|
# should attempt to import the concrete module. We have not
|
|
# shipped opencv_gtsam_estimator yet (AZ-358), so the import
|
|
# raises and gets wrapped in PoseEstimatorConfigError.
|
|
with pytest.raises(PoseEstimatorConfigError):
|
|
build_pose_estimator(
|
|
cfg,
|
|
ransac_filter=mock.MagicMock(),
|
|
wgs_converter=mock.MagicMock(),
|
|
se3_utils=mock.MagicMock(),
|
|
isam2_graph_handle=_FakeISam2GraphHandle(),
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Bonus: config validation
|
|
|
|
|
|
def test_config_rejects_unknown_strategy_at_post_init() -> None:
|
|
from gps_denied_onboard.config.schema import ConfigError
|
|
|
|
with pytest.raises(ConfigError, match="garbage"):
|
|
C4PoseConfig(strategy="garbage")
|
|
|
|
|
|
def test_config_rejects_zero_ransac_iterations() -> None:
|
|
from gps_denied_onboard.config.schema import ConfigError
|
|
|
|
with pytest.raises(ConfigError, match="ransac_iterations"):
|
|
C4PoseConfig(ransac_iterations=0)
|
|
|
|
|
|
def test_pose_estimate_uuid_frame_id() -> None:
|
|
pe = PoseEstimate(
|
|
frame_id=UUID(int=42),
|
|
position_wgs84=LatLonAlt(0.0, 0.0, 0.0),
|
|
orientation_world_T_body=Quat(1.0, 0.0, 0.0, 0.0),
|
|
covariance_6x6=np.eye(6),
|
|
covariance_mode=CovarianceMode.MARGINALS,
|
|
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
|
|
last_satellite_anchor_age_ms=42,
|
|
emitted_at=1_000_000_000,
|
|
)
|
|
assert isinstance(pe.frame_id, UUID)
|
|
assert pe.emitted_at == 1_000_000_000
|