mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 18:01:14 +00:00
4eac24f37a
Implement the single production-default C4 PoseEstimator strategy. AZ-358 — Marginals path: OpenCV solvePnPRansac (SOLVEPNP_IPPE) on best-candidate inliers, PriorFactorPose3 with Jacobian-derived initial covariance, flushed into C5's iSAM2 graph via the widened ISam2GraphHandle.update(graph, values, None) (Option B). Posterior covariance from compute_marginals().marginalCovariance(pose_key) with SPD-defensive Cholesky check. Tile pixel -> ENU world conversion via the shared WgsConverter + a configurable tile_size_px. Two spec deviations now documented in the AZ-358 task file: PriorFactorPose3 over GenericProjectionFactorCal3DS2 (avoids unbounded landmark variables; same Fisher information on the pose marginal) and explicit (graph, values, timestamps) update args (aligns with C5's impl). AZ-361 — Jacobian + thermal hybrid: per-frame dispatch on thermal_state.thermal_throttle_active selects the cv2.projectPoints-derived 6x6 information matrix (with ridge regularisation) as the emitted covariance. Skips the iSAM2 factor add under throttle (Invariant 12). Emits CovarianceDegradedWarning via warnings.warn (never raised); paired WARN log + FDR record rate-limited per covariance_degraded_warn_window_ns (default 60 s) via an injected monotonic Clock. Supersedes the AZ-358 NotImplementedError stub. Widens ISam2GraphHandle from get_pose_key only to all five C4-facing methods (get_pose_key plus add_factor, update, compute_marginals, last_anchor_age_ms); C5's existing ISam2GraphHandleImpl already satisfies the superset, so no C5 source change this batch. Threads fdr_client + clock through pose_factory composition. Registers two new FDR payload kinds: pose.frame_done (per-call telemetry; both success and PnpFailureError paths) and pose.covariance_degraded (per-window throttle exposure). Tests: 21 new (AZ-358 AC-1..11 + AZ-361 AC-1..10/12/13; AZ-361 AC-11 RMSE-ratio informational per spec, not asserted). Updates 2 existing test files for Protocol widening and the FDR-schema round trip. 
Code review verdict: PASS_WITH_WARNINGS (5 findings: Medium x2, Low x3; none blocking). Full suite: 1958 passed, 1 unrelated host-dependent perf failure (c12 CLI cold-start, pre-existing). Co-authored-by: Cursor <cursoragent@cursor.com>
493 lines
16 KiB
Python
493 lines
16 KiB
Python
"""AZ-355 — C4 PoseEstimator Protocol + Factory + DTOs + Composition.
|
|
|
|
Tests cover AC-1..AC-10 from
|
|
``_docs/02_tasks/todo/AZ-355_c4_pose_protocol.md``:
|
|
|
|
- AC-1 Protocol conformance — ``runtime_checkable``.
|
|
- AC-2 DTOs are frozen + slots.
|
|
- AC-3 Enums have the documented values.
|
|
- AC-4 ``CovarianceDegradedWarning`` IS-A ``Warning`` NOT ``Exception``.
|
|
- AC-5 ``PnpFailureError`` IS-A ``Exception``.
|
|
- AC-6 Factory rejects unknown strategy.
|
|
- AC-7 Factory accepts ``"opencv_gtsam"`` and emits INFO log.
|
|
- AC-8 Public API surface — ``__init__.py`` re-exports.
|
|
- AC-9 Strategy bound to single ingest thread (same thread as C5).
|
|
- AC-10 ``ISam2GraphHandle`` Protocol stub conforms to
|
|
``runtime_checkable``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import logging
|
|
import threading
|
|
import warnings
|
|
from dataclasses import FrozenInstanceError
|
|
from typing import Any
|
|
from unittest import mock
|
|
from uuid import UUID, uuid4
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.geo import LatLonAlt
|
|
from gps_denied_onboard._types.pose import (
|
|
CovarianceMode,
|
|
PoseEstimate,
|
|
PoseSourceLabel,
|
|
Quat,
|
|
)
|
|
from gps_denied_onboard.components.c4_pose import (
|
|
C4PoseConfig,
|
|
CovarianceDegradedWarning,
|
|
PnpFailureError,
|
|
PoseEstimator,
|
|
PoseEstimatorConfigError,
|
|
PoseEstimatorError,
|
|
)
|
|
from gps_denied_onboard.components.c4_pose._isam2_handle import ISam2GraphHandle
|
|
from gps_denied_onboard.config import load_config
|
|
from gps_denied_onboard.config.schema import Config
|
|
from gps_denied_onboard.runtime_root.pose_factory import (
|
|
build_pose_estimator,
|
|
clear_pose_registry,
|
|
register_pose_estimator,
|
|
)
|
|
from gps_denied_onboard.runtime_root.state_factory import (
|
|
StateIngestThreadAlreadyBoundError,
|
|
bind_state_ingest_thread,
|
|
clear_state_ingest_binding,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _isolation():
    """Reset the pose registry and the ingest-thread binding around each test.

    Both module-level registries are cleared before the test body runs and
    again after it finishes, so no test observes state left by another.
    """

    def _reset() -> None:
        clear_pose_registry()
        clear_state_ingest_binding()

    _reset()
    yield
    _reset()
|
|
|
|
|
|
def _build_config(**overrides: Any) -> Config:
    """Return a loaded ``Config`` whose ``c4_pose`` component block is replaced.

    Args:
        **overrides: Field overrides applied on top of a default
            ``C4PoseConfig`` via ``dataclasses.replace``.

    Returns:
        A copy of the loaded configuration with ``components["c4_pose"]``
        set to the overridden block; other components are carried over.
    """
    base = load_config(env={}, paths=(), require_env=False)
    pose_block = dataclasses.replace(C4PoseConfig(), **overrides)
    # Copy the mapping so the loaded config's own components stay untouched.
    component_map = dict(base.components or {})
    component_map.update(c4_pose=pose_block)
    return dataclasses.replace(base, components=component_map)
|
|
|
|
|
|
class _FakeISam2GraphHandle:
|
|
"""Minimal handle stub for factory / Protocol tests.
|
|
|
|
Implements the AZ-358-extended 5-method surface — the AZ-355
|
|
AC-10 ``isinstance(handle, ISam2GraphHandle)`` runtime-checkable
|
|
test now expects all five methods.
|
|
"""
|
|
|
|
def get_pose_key(self, frame_id: int) -> int:
|
|
return int(frame_id)
|
|
|
|
def add_factor(self, factor: Any) -> None:
|
|
return None
|
|
|
|
def update(self, graph: Any, values: Any, timestamps: Any | None = None) -> None:
|
|
return None
|
|
|
|
def compute_marginals(self) -> Any:
|
|
return None
|
|
|
|
def last_anchor_age_ms(self) -> int:
|
|
return 0
|
|
|
|
|
|
class _FakePoseEstimator:
    """Stand-in estimator exposing both methods the PoseEstimator Protocol tests look for."""

    def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> PoseEstimate:
        """Return a fixed ``PoseEstimate``; all three arguments are ignored.

        Only ``frame_id`` varies between calls (a fresh ``uuid4``); every
        other field is a constant placeholder.
        """
        frame_id = uuid4()
        origin = LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0)
        identity_rotation = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
        identity_covariance = np.eye(6, dtype=np.float64)
        return PoseEstimate(
            frame_id=frame_id,
            position_wgs84=origin,
            orientation_world_T_body=identity_rotation,
            covariance_6x6=identity_covariance,
            covariance_mode=CovarianceMode.MARGINALS,
            source_label=PoseSourceLabel.SATELLITE_ANCHORED,
            last_satellite_anchor_age_ms=0,
            emitted_at=0,
        )

    def current_covariance_mode(self) -> CovarianceMode:
        """Always report ``CovarianceMode.MARGINALS``."""
        return CovarianceMode.MARGINALS
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-1: Protocol conformance — runtime_checkable
|
|
|
|
|
|
def test_ac1_full_fake_satisfies_protocol() -> None:
    """AC-1: the complete test double passes ``isinstance`` against ``PoseEstimator``."""
    assert isinstance(_FakePoseEstimator(), PoseEstimator)
|
|
|
|
|
|
def test_ac1_partial_fake_fails_protocol() -> None:
    """AC-1: a class offering only ``estimate`` is not a ``PoseEstimator``."""

    class _OnlyEstimate:
        # Deliberately lacks current_covariance_mode, so isinstance must fail.
        def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> Any:
            return None

    partial = _OnlyEstimate()
    assert not isinstance(partial, PoseEstimator)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-2: DTOs are frozen + slots
|
|
|
|
|
|
def test_ac2_lat_lon_alt_frozen_and_slotted() -> None:
    """AC-2: ``LatLonAlt`` is a frozen dataclass with exactly three slots."""
    point = LatLonAlt(lat_deg=1.0, lon_deg=2.0, alt_m=3.0)

    # Class-level checks: frozen flag and the declared slot names, in order.
    assert LatLonAlt.__dataclass_params__.frozen is True
    assert hasattr(LatLonAlt, "__slots__")
    slot_names = tuple(LatLonAlt.__slots__)
    assert slot_names == ("lat_deg", "lon_deg", "alt_m")

    # Instance-level check: attribute assignment is refused.
    with pytest.raises(FrozenInstanceError):
        point.lat_deg = 99.0  # type: ignore[misc]
|
|
|
|
|
|
def test_ac2_quat_frozen_and_slotted() -> None:
    """AC-2: ``Quat`` is a frozen dataclass with slots ``w, x, y, z``."""
    identity = Quat(w=1.0, x=0.0, y=0.0, z=0.0)

    # Class-level checks: frozen flag and the declared slot names, in order.
    assert Quat.__dataclass_params__.frozen is True
    assert hasattr(Quat, "__slots__")
    slot_names = tuple(Quat.__slots__)
    assert slot_names == ("w", "x", "y", "z")

    # Instance-level check: attribute assignment is refused.
    with pytest.raises(FrozenInstanceError):
        identity.w = 0.0  # type: ignore[misc]
|
|
|
|
|
|
def test_ac2_pose_estimate_frozen_and_slotted() -> None:
    """AC-2: ``PoseEstimate`` is a frozen dataclass with eight slots."""
    frame_id = uuid4()
    position = LatLonAlt(0.0, 0.0, 0.0)
    orientation = Quat(1.0, 0.0, 0.0, 0.0)
    estimate = PoseEstimate(
        frame_id=frame_id,
        position_wgs84=position,
        orientation_world_T_body=orientation,
        covariance_6x6=np.eye(6),
        covariance_mode=CovarianceMode.MARGINALS,
        source_label=PoseSourceLabel.SATELLITE_ANCHORED,
        last_satellite_anchor_age_ms=0,
        emitted_at=0,
    )

    assert PoseEstimate.__dataclass_params__.frozen is True
    assert hasattr(PoseEstimate, "__slots__")
    # One slot per constructor field used above.
    slot_count = len(PoseEstimate.__slots__)
    assert slot_count == 8

    with pytest.raises(FrozenInstanceError):
        estimate.last_satellite_anchor_age_ms = 1  # type: ignore[misc]
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-3: Enums have the documented values
|
|
|
|
|
|
def test_ac3_covariance_mode_has_exactly_marginals_and_jacobian() -> None:
    """AC-3: ``CovarianceMode`` has exactly two members with the documented values."""
    member_names = {member.name for member in CovarianceMode}
    assert member_names == {"MARGINALS", "JACOBIAN"}

    # Each member's value is the lowercase wire string.
    expected_values = (
        (CovarianceMode.MARGINALS, "marginals"),
        (CovarianceMode.JACOBIAN, "jacobian"),
    )
    for member, wire_value in expected_values:
        assert member.value == wire_value
|
|
|
|
|
|
def test_ac3_pose_source_label_has_three_documented_values() -> None:
    """AC-3: ``PoseSourceLabel`` has exactly the three documented member names."""
    documented = {"SATELLITE_ANCHORED", "VISUAL_PROPAGATED", "DEAD_RECKONED"}
    actual = {label.name for label in PoseSourceLabel}
    assert actual == documented
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-4: CovarianceDegradedWarning IS-A Warning NOT Exception
|
|
|
|
|
|
def test_ac4_covariance_degraded_warning_subclasses_warning() -> None:
    """AC-4: ``CovarianceDegradedWarning`` sits on the ``Warning`` branch.

    In Python, ``Warning`` itself derives from ``Exception`` (and so from
    ``BaseException``), so "not an Exception" cannot be checked by class
    hierarchy. This test only pins the ``Warning`` ancestry that lets the
    ``warnings`` machinery handle the class; the behavioural half of the
    contract (``warnings.warn`` does not raise, so ``except Exception`` never
    sees it) is covered by a separate test.
    """
    is_warning = issubclass(CovarianceDegradedWarning, Warning)
    assert is_warning
|
|
|
|
|
|
def test_ac4_try_except_exception_does_not_catch_the_warning() -> None:
    """AC-4: emitting the warning via ``warnings.warn`` does not raise.

    ``warnings.warn`` routes the warning through the filter chain instead of
    raising it, so the surrounding ``except Exception`` must never trigger,
    even though ``Warning`` subclasses ``Exception`` in the class hierarchy.
    Exactly one ``CovarianceDegradedWarning`` must be recorded.
    """
    raised = False
    with warnings.catch_warnings(record=True) as recorded:
        # "always" makes sure the warning is recorded rather than suppressed.
        warnings.simplefilter("always")
        try:
            warnings.warn("throttle engaged", CovarianceDegradedWarning, stacklevel=1)
        except Exception:
            raised = True
        degraded: list[Warning] = [
            entry.message
            for entry in recorded
            if isinstance(entry.message, CovarianceDegradedWarning)
        ]

    assert raised is False
    assert len(degraded) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-5: PnpFailureError IS-A Exception
|
|
|
|
|
|
def test_ac5_pnp_failure_error_subclasses_pose_estimator_error_and_exception() -> None:
    """AC-5: ``PnpFailureError`` derives from ``PoseEstimatorError`` and ``Exception``."""
    for ancestor in (PoseEstimatorError, Exception):
        assert issubclass(PnpFailureError, ancestor)
|
|
|
|
|
|
def test_ac5_pnp_failure_error_caught_by_except_exception() -> None:
    """AC-5: a raised ``PnpFailureError`` is caught by a plain ``except Exception``."""
    try:
        raise PnpFailureError("RANSAC failed")
    except Exception:
        was_caught = True
    else:
        was_caught = False
    assert was_caught
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-6: Factory rejects unknown strategy
|
|
|
|
|
|
def test_ac6_factory_rejects_unknown_strategy(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-6: the factory refuses an unknown strategy name and logs it once.

    Expects ``PoseEstimatorConfigError`` whose message matches ``"garbage"``
    and exactly one captured log record whose ``kind`` attribute is
    ``"c4.pose.unknown_strategy"``.
    """
    # Build the block without running C4PoseConfig's constructor, then set
    # the fields directly so the unknown strategy name can be planted.
    rogue_block = C4PoseConfig.__new__(C4PoseConfig)
    planted_fields = (
        ("strategy", "garbage"),
        ("ransac_iterations", 200),
        ("ransac_reprojection_threshold_px", 4.0),
        ("thermal_throttle_threshold_celsius", 75.0),
    )
    for field_name, field_value in planted_fields:
        object.__setattr__(rogue_block, field_name, field_value)

    base_cfg = load_config(env={}, paths=(), require_env=False)
    component_map = dict(base_cfg.components or {})
    component_map["c4_pose"] = rogue_block
    rogue_cfg = dataclasses.replace(base_cfg, components=component_map)

    with caplog.at_level(logging.ERROR), pytest.raises(
        PoseEstimatorConfigError, match="garbage"
    ):
        build_pose_estimator(
            rogue_cfg,
            ransac_filter=mock.MagicMock(),
            wgs_converter=mock.MagicMock(),
            se3_utils=mock.MagicMock(),
            isam2_graph_handle=_FakeISam2GraphHandle(),
        )

    unknown_strategy_logs = [
        rec for rec in caplog.records if getattr(rec, "kind", None) == "c4.pose.unknown_strategy"
    ]
    assert len(unknown_strategy_logs) == 1
|
|
|
|
|
|
def test_ac6_factory_rejects_non_conforming_graph_handle() -> None:
    """AC-6: the factory rejects a graph handle that has none of the handle methods.

    Expects ``PoseEstimatorConfigError`` whose message matches
    ``"ISam2GraphHandle"``.
    """

    class _BrokenHandle:
        """Empty class: implements nothing of the handle surface."""

    valid_cfg = _build_config()
    broken_handle = _BrokenHandle()

    with pytest.raises(PoseEstimatorConfigError, match="ISam2GraphHandle"):
        build_pose_estimator(
            valid_cfg,
            ransac_filter=mock.MagicMock(),
            wgs_converter=mock.MagicMock(),
            se3_utils=mock.MagicMock(),
            isam2_graph_handle=broken_handle,  # type: ignore[arg-type]
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-7: Factory accepts "opencv_gtsam" and emits INFO log
|
|
|
|
|
|
def test_ac7_factory_accepts_opencv_gtsam_and_emits_info_log(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-7: the factory builds the ``"opencv_gtsam"`` strategy and logs it once.

    A stub factory is registered under ``"opencv_gtsam"``. The test expects a
    ``PoseEstimator`` back and exactly one captured record whose ``kind`` is
    ``"c4.pose.strategy_loaded"``, with the strategy name and the three
    default tuning values in its ``kv`` mapping.
    """

    def _stub_factory(**kwargs: Any) -> PoseEstimator:
        return _FakePoseEstimator()

    default_cfg = _build_config()
    register_pose_estimator("opencv_gtsam", _stub_factory)

    with caplog.at_level(logging.INFO):
        built = build_pose_estimator(
            default_cfg,
            ransac_filter=mock.MagicMock(),
            wgs_converter=mock.MagicMock(),
            se3_utils=mock.MagicMock(),
            isam2_graph_handle=_FakeISam2GraphHandle(),
        )

    assert isinstance(built, PoseEstimator)

    loaded_logs = [
        rec for rec in caplog.records if getattr(rec, "kind", None) == "c4.pose.strategy_loaded"
    ]
    assert len(loaded_logs) == 1

    logged_kv = loaded_logs[0].kv
    expected_kv = (
        ("strategy", "opencv_gtsam"),
        ("ransac_iterations", 200),
        ("ransac_reprojection_threshold_px", 4.0),
        ("thermal_throttle_threshold_celsius", 75.0),
    )
    for key, expected in expected_kv:
        assert logged_kv[key] == expected
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-8: Public API surface
|
|
|
|
|
|
def test_ac8_public_api_re_exports() -> None:
    """AC-8: every documented public name is listed in ``c4_pose.__all__``."""
    from gps_denied_onboard.components import c4_pose

    required_names = {
        "C4PoseConfig",
        "CovarianceDegradedWarning",
        "CovarianceMode",
        "LatLonAlt",
        "PnpFailureError",
        "PoseEstimate",
        "PoseEstimator",
        "PoseEstimatorConfigError",
        "PoseEstimatorError",
        "PoseSourceLabel",
        "Quat",
    }
    # Subset, not equality: the package may export additional names.
    exported = set(c4_pose.__all__)
    assert required_names <= exported
|
|
|
|
|
|
def test_ac8_internals_not_in_public_all() -> None:
    """AC-8: the graph-handle Protocol and its module stay out of ``c4_pose.__all__``."""
    from gps_denied_onboard.components import c4_pose

    for internal_name in ("ISam2GraphHandle", "_isam2_handle"):
        assert internal_name not in c4_pose.__all__
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-9: Strategy bound to single ingest thread (same thread as C5)
|
|
|
|
|
|
def test_ac9_bind_state_ingest_thread_rejects_second_thread() -> None:
    """AC-9: once bound, binding a different thread ident raises.

    The first call binds with no argument; the second passes an ident that
    differs from the returned one and must raise
    ``StateIngestThreadAlreadyBoundError``.
    """
    bound_ident = bind_state_ingest_thread()
    # Any value other than the bound ident stands in for "another thread".
    foreign_ident = bound_ident + 1

    with pytest.raises(StateIngestThreadAlreadyBoundError):
        bind_state_ingest_thread(foreign_ident)
|
|
|
|
|
|
def test_ac9_bind_state_ingest_thread_idempotent_for_same_thread() -> None:
    """AC-9: binding twice from the same thread returns the same ident.

    The returned ident must also equal ``threading.get_ident()`` of the
    calling thread.
    """
    first_bind = bind_state_ingest_thread()
    second_bind = bind_state_ingest_thread()

    assert first_bind == second_bind
    assert first_bind == threading.get_ident()
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# AC-10: ISam2GraphHandle Protocol stub conforms to runtime_checkable
|
|
|
|
|
|
def test_ac10_isam2_graph_handle_runtime_checkable() -> None:
    """AC-10: the full fake handle passes ``isinstance`` against ``ISam2GraphHandle``."""
    assert isinstance(_FakeISam2GraphHandle(), ISam2GraphHandle)
|
|
|
|
|
|
def test_ac10_isam2_graph_handle_rejects_missing_method() -> None:
    """AC-10: a class with no methods at all is not an ``ISam2GraphHandle``."""

    class _NoMethod:
        """Empty class: implements nothing of the handle surface."""

    empty = _NoMethod()
    assert not isinstance(empty, ISam2GraphHandle)
|
|
|
|
|
|
def test_ac10_isam2_graph_handle_rejects_partial_surface() -> None:
    """AC-10: a handle exposing only ``get_pose_key`` fails the Protocol check.

    AZ-358 widened the Protocol to five methods, so implementing just the
    original ``get_pose_key`` no longer satisfies runtime_checkable
    conformance.
    """

    class _OnlyGetPoseKey:
        def get_pose_key(self, frame_id: int) -> int:
            return int(frame_id)

    partial_handle = _OnlyGetPoseKey()
    assert not isinstance(partial_handle, ISam2GraphHandle)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Bonus: factory wires constructor dependencies through to the strategy
|
|
|
|
|
|
def test_factory_passes_dependencies_to_strategy() -> None:
    """The factory forwards each injected collaborator to the strategy factory.

    A recording stub is registered under ``"opencv_gtsam"``; after the build
    call, the keyword arguments it received must be the very same objects
    (identity, not equality) that were passed to ``build_pose_estimator``.
    """
    received: dict[str, Any] = {}

    def _recording_factory(**kwargs: Any) -> PoseEstimator:
        received.update(kwargs)
        return _FakePoseEstimator()

    register_pose_estimator("opencv_gtsam", _recording_factory)

    injected = {
        "isam2_graph_handle": _FakeISam2GraphHandle(),
        "ransac_filter": mock.MagicMock(),
        "wgs_converter": mock.MagicMock(),
        "se3_utils": mock.MagicMock(),
    }
    default_cfg = _build_config()

    build_pose_estimator(
        default_cfg,
        ransac_filter=injected["ransac_filter"],
        wgs_converter=injected["wgs_converter"],
        se3_utils=injected["se3_utils"],
        isam2_graph_handle=injected["isam2_graph_handle"],
    )

    for dependency_name in ("ransac_filter", "wgs_converter", "se3_utils", "isam2_graph_handle"):
        assert received[dependency_name] is injected[dependency_name]
|
|
|
|
|
|
def test_factory_lazy_imports_when_registry_empty() -> None:
    """With nothing registered, the factory still returns a ``PoseEstimator``.

    The autouse fixture leaves the registry empty, so the build is expected
    to succeed through the factory's lazy-import fallback, which should pick
    up the AZ-358 concrete ``opencv_gtsam_estimator`` module and resolve its
    ``create`` callable.
    """
    # Arrange: default configuration, no strategy registered by this test.
    default_cfg = _build_config()
    collaborators = {
        "ransac_filter": mock.MagicMock(),
        "wgs_converter": mock.MagicMock(),
        "se3_utils": mock.MagicMock(),
        "isam2_graph_handle": _FakeISam2GraphHandle(),
    }

    # Act: the call must succeed without a prior register_pose_estimator.
    built = build_pose_estimator(default_cfg, **collaborators)

    # Assert: the result satisfies the PoseEstimator Protocol.
    assert isinstance(built, PoseEstimator)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Bonus: config validation
|
|
|
|
|
|
def test_config_rejects_unknown_strategy_at_post_init() -> None:
    """Constructing ``C4PoseConfig`` with an unknown strategy raises ``ConfigError``.

    The error message must match ``"garbage"``, the rejected strategy name.
    """
    from gps_denied_onboard.config.schema import ConfigError

    rejected_strategy = "garbage"
    with pytest.raises(ConfigError, match="garbage"):
        C4PoseConfig(strategy=rejected_strategy)
|
|
|
|
|
|
def test_config_rejects_zero_ransac_iterations() -> None:
    """Constructing ``C4PoseConfig`` with zero RANSAC iterations raises ``ConfigError``.

    The error message must match ``"ransac_iterations"``, the offending field.
    """
    from gps_denied_onboard.config.schema import ConfigError

    rejected_iterations = 0
    with pytest.raises(ConfigError, match="ransac_iterations"):
        C4PoseConfig(ransac_iterations=rejected_iterations)
|
|
|
|
|
|
def test_pose_estimate_uuid_frame_id() -> None:
    """``PoseEstimate`` keeps a ``UUID`` frame id and the given ``emitted_at`` value."""
    frame_id = UUID(int=42)
    position = LatLonAlt(0.0, 0.0, 0.0)
    orientation = Quat(1.0, 0.0, 0.0, 0.0)
    estimate = PoseEstimate(
        frame_id=frame_id,
        position_wgs84=position,
        orientation_world_T_body=orientation,
        covariance_6x6=np.eye(6),
        covariance_mode=CovarianceMode.MARGINALS,
        source_label=PoseSourceLabel.SATELLITE_ANCHORED,
        last_satellite_anchor_age_ms=42,
        emitted_at=1_000_000_000,
    )

    assert isinstance(estimate.frame_id, UUID)
    assert estimate.emitted_at == 1_000_000_000
|