Files
gps-denied-onboard/tests/unit/c4_pose/test_az355_pose_protocol.py
T
Oleksandr Bezdieniezhnykh 4eac24f37a [AZ-358] [AZ-361] C4 OpenCVGtsamPoseEstimator + Jacobian thermal hybrid
Implement the single production-default C4 PoseEstimator strategy.

AZ-358 — Marginals path: OpenCV solvePnPRansac (SOLVEPNP_IPPE) on
best-candidate inliers, PriorFactorPose3 with Jacobian-derived initial
covariance, flushed into C5's iSAM2 graph via the widened
ISam2GraphHandle.update(graph, values, None) (Option B). Posterior
covariance from compute_marginals().marginalCovariance(pose_key) with
SPD-defensive Cholesky check. Tile pixel -> ENU world conversion via
the shared WgsConverter + a configurable tile_size_px. Two spec
deviations now documented in the AZ-358 task file: PriorFactorPose3
over GenericProjectionFactorCal3DS2 (avoids unbounded landmark
variables; same Fisher information on the pose marginal) and explicit
(graph, values, timestamps) update args (aligns with C5's impl).

AZ-361 — Jacobian + thermal hybrid: per-frame dispatch on
thermal_state.thermal_throttle_active selects the cv2.projectPoints-
derived 6x6 information matrix (with ridge regularisation) as the
emitted covariance. Skips the iSAM2 factor add under throttle
(Invariant 12). Emits CovarianceDegradedWarning via warnings.warn
(never raised); paired WARN log + FDR record rate-limited per
covariance_degraded_warn_window_ns (default 60 s) via an injected
monotonic Clock. Supersedes the AZ-358 NotImplementedError stub.

Widens ISam2GraphHandle from get_pose_key only to all five C4-facing
methods (add_factor, update, compute_marginals, last_anchor_age_ms);
C5's existing ISam2GraphHandleImpl already satisfies the superset, so
no C5 source change this batch. Threads fdr_client + clock through
pose_factory composition.

Registers two new FDR payload kinds: pose.frame_done (per-call
telemetry; both success and PnpFailureError paths) and
pose.covariance_degraded (per-window throttle exposure).

Tests: 21 new (AZ-358 AC-1..11 + AZ-361 AC-1..10/12/13; AZ-361 AC-11
RMSE-ratio informational per spec, not asserted). Updates 2 existing
test files for Protocol widening and the FDR-schema round trip.

Code review verdict: PASS_WITH_WARNINGS (5 findings: Medium x2,
Low x3; none blocking). Full suite: 1958 passed, 1 unrelated
host-dependent perf failure (c12 CLI cold-start, pre-existing).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-14 05:01:14 +03:00

493 lines
16 KiB
Python

"""AZ-355 — C4 PoseEstimator Protocol + Factory + DTOs + Composition.
Tests cover AC-1..AC-10 from
``_docs/02_tasks/todo/AZ-355_c4_pose_protocol.md``:
- AC-1 Protocol conformance — ``runtime_checkable``.
- AC-2 DTOs are frozen + slots.
- AC-3 Enums have the documented values.
- AC-4 ``CovarianceDegradedWarning`` IS-A ``Warning`` NOT ``Exception``.
- AC-5 ``PnpFailureError`` IS-A ``Exception``.
- AC-6 Factory rejects unknown strategy.
- AC-7 Factory accepts ``"opencv_gtsam"`` and emits INFO log.
- AC-8 Public API surface — ``__init__.py`` re-exports.
- AC-9 Strategy bound to single ingest thread (same thread as C5).
- AC-10 ``ISam2GraphHandle`` Protocol stub conforms to
``runtime_checkable``.
"""
from __future__ import annotations
import dataclasses
import logging
import threading
import warnings
from dataclasses import FrozenInstanceError
from typing import Any
from unittest import mock
from uuid import UUID, uuid4
import numpy as np
import pytest
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.pose import (
CovarianceMode,
PoseEstimate,
PoseSourceLabel,
Quat,
)
from gps_denied_onboard.components.c4_pose import (
C4PoseConfig,
CovarianceDegradedWarning,
PnpFailureError,
PoseEstimator,
PoseEstimatorConfigError,
PoseEstimatorError,
)
from gps_denied_onboard.components.c4_pose._isam2_handle import ISam2GraphHandle
from gps_denied_onboard.config import load_config
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.runtime_root.pose_factory import (
build_pose_estimator,
clear_pose_registry,
register_pose_estimator,
)
from gps_denied_onboard.runtime_root.state_factory import (
StateIngestThreadAlreadyBoundError,
bind_state_ingest_thread,
clear_state_ingest_binding,
)
@pytest.fixture(autouse=True)
def _isolation():
clear_pose_registry()
clear_state_ingest_binding()
yield
clear_pose_registry()
clear_state_ingest_binding()
def _build_config(**overrides: Any) -> Config:
cfg = load_config(env={}, paths=(), require_env=False)
new_block = dataclasses.replace(C4PoseConfig(), **overrides)
components = dict(cfg.components or {})
components["c4_pose"] = new_block
return dataclasses.replace(cfg, components=components)
class _FakeISam2GraphHandle:
"""Minimal handle stub for factory / Protocol tests.
Implements the AZ-358-extended 5-method surface — the AZ-355
AC-10 ``isinstance(handle, ISam2GraphHandle)`` runtime-checkable
test now expects all five methods.
"""
def get_pose_key(self, frame_id: int) -> int:
return int(frame_id)
def add_factor(self, factor: Any) -> None:
return None
def update(self, graph: Any, values: Any, timestamps: Any | None = None) -> None:
return None
def compute_marginals(self) -> Any:
return None
def last_anchor_age_ms(self) -> int:
return 0
class _FakePoseEstimator:
"""Test double satisfying the full PoseEstimator Protocol."""
def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> PoseEstimate:
return PoseEstimate(
frame_id=uuid4(),
position_wgs84=LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0),
orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0),
covariance_6x6=np.eye(6, dtype=np.float64),
covariance_mode=CovarianceMode.MARGINALS,
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
last_satellite_anchor_age_ms=0,
emitted_at=0,
)
def current_covariance_mode(self) -> CovarianceMode:
return CovarianceMode.MARGINALS
# ---------------------------------------------------------------------
# AC-1: Protocol conformance — runtime_checkable
def test_ac1_full_fake_satisfies_protocol() -> None:
fake = _FakePoseEstimator()
assert isinstance(fake, PoseEstimator)
def test_ac1_partial_fake_fails_protocol() -> None:
class _OnlyEstimate:
def estimate(self, match_result: Any, calibration: Any, thermal_state: Any) -> Any:
return None
# Missing current_covariance_mode → fails isinstance
assert not isinstance(_OnlyEstimate(), PoseEstimator)
# ---------------------------------------------------------------------
# AC-2: DTOs are frozen + slots
def test_ac2_lat_lon_alt_frozen_and_slotted() -> None:
p = LatLonAlt(lat_deg=1.0, lon_deg=2.0, alt_m=3.0)
assert LatLonAlt.__dataclass_params__.frozen is True
assert hasattr(LatLonAlt, "__slots__")
assert tuple(LatLonAlt.__slots__) == ("lat_deg", "lon_deg", "alt_m")
with pytest.raises(FrozenInstanceError):
p.lat_deg = 99.0 # type: ignore[misc]
def test_ac2_quat_frozen_and_slotted() -> None:
q = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
assert Quat.__dataclass_params__.frozen is True
assert hasattr(Quat, "__slots__")
assert tuple(Quat.__slots__) == ("w", "x", "y", "z")
with pytest.raises(FrozenInstanceError):
q.w = 0.0 # type: ignore[misc]
def test_ac2_pose_estimate_frozen_and_slotted() -> None:
pe = PoseEstimate(
frame_id=uuid4(),
position_wgs84=LatLonAlt(0.0, 0.0, 0.0),
orientation_world_T_body=Quat(1.0, 0.0, 0.0, 0.0),
covariance_6x6=np.eye(6),
covariance_mode=CovarianceMode.MARGINALS,
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
last_satellite_anchor_age_ms=0,
emitted_at=0,
)
assert PoseEstimate.__dataclass_params__.frozen is True
assert hasattr(PoseEstimate, "__slots__")
assert len(PoseEstimate.__slots__) == 8
with pytest.raises(FrozenInstanceError):
pe.last_satellite_anchor_age_ms = 1 # type: ignore[misc]
# ---------------------------------------------------------------------
# AC-3: Enums have the documented values
def test_ac3_covariance_mode_has_exactly_marginals_and_jacobian() -> None:
members = {m.name for m in CovarianceMode}
assert members == {"MARGINALS", "JACOBIAN"}
assert CovarianceMode.MARGINALS.value == "marginals"
assert CovarianceMode.JACOBIAN.value == "jacobian"
def test_ac3_pose_source_label_has_three_documented_values() -> None:
members = {m.name for m in PoseSourceLabel}
assert members == {"SATELLITE_ANCHORED", "VISUAL_PROPAGATED", "DEAD_RECKONED"}
# ---------------------------------------------------------------------
# AC-4: CovarianceDegradedWarning IS-A Warning NOT Exception
def test_ac4_covariance_degraded_warning_subclasses_warning() -> None:
# AC-4 intent: CovarianceDegradedWarning must be on the Warning branch
# of the exception tree so the warnings machinery handles it. Python's
# actual hierarchy has Warning < Exception < BaseException, so the
# behavioural test (warnings.warn does not raise; except Exception does
# not catch) below is what the contract really pins.
assert issubclass(CovarianceDegradedWarning, Warning)
def test_ac4_try_except_exception_does_not_catch_the_warning() -> None:
caught_via_exception = False
captured: list[Warning] = []
with warnings.catch_warnings(record=True) as records:
warnings.simplefilter("always")
try:
warnings.warn("throttle engaged", CovarianceDegradedWarning, stacklevel=1)
except Exception:
caught_via_exception = True
for r in records:
if isinstance(r.message, CovarianceDegradedWarning):
captured.append(r.message)
# warnings.warn(...) does NOT raise — it emits through the filter chain.
# The try/except Exception block above MUST NOT see the warning even
# though Warning subclasses Exception in the class hierarchy.
assert caught_via_exception is False
assert len(captured) == 1
# ---------------------------------------------------------------------
# AC-5: PnpFailureError IS-A Exception
def test_ac5_pnp_failure_error_subclasses_pose_estimator_error_and_exception() -> None:
assert issubclass(PnpFailureError, PoseEstimatorError)
assert issubclass(PnpFailureError, Exception)
def test_ac5_pnp_failure_error_caught_by_except_exception() -> None:
caught = False
try:
raise PnpFailureError("RANSAC failed")
except Exception:
caught = True
assert caught
# ---------------------------------------------------------------------
# AC-6: Factory rejects unknown strategy
def test_ac6_factory_rejects_unknown_strategy(
caplog: pytest.LogCaptureFixture,
) -> None:
bad_block = C4PoseConfig.__new__(C4PoseConfig)
object.__setattr__(bad_block, "strategy", "garbage")
object.__setattr__(bad_block, "ransac_iterations", 200)
object.__setattr__(bad_block, "ransac_reprojection_threshold_px", 4.0)
object.__setattr__(bad_block, "thermal_throttle_threshold_celsius", 75.0)
cfg = load_config(env={}, paths=(), require_env=False)
components = dict(cfg.components or {})
components["c4_pose"] = bad_block
bad_cfg = dataclasses.replace(cfg, components=components)
with caplog.at_level(logging.ERROR):
with pytest.raises(PoseEstimatorConfigError, match="garbage"):
build_pose_estimator(
bad_cfg,
ransac_filter=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
se3_utils=mock.MagicMock(),
isam2_graph_handle=_FakeISam2GraphHandle(),
)
error_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.unknown_strategy"
]
assert len(error_records) == 1
def test_ac6_factory_rejects_non_conforming_graph_handle() -> None:
cfg = _build_config()
class _BrokenHandle:
pass
with pytest.raises(PoseEstimatorConfigError, match="ISam2GraphHandle"):
build_pose_estimator(
cfg,
ransac_filter=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
se3_utils=mock.MagicMock(),
isam2_graph_handle=_BrokenHandle(), # type: ignore[arg-type]
)
# ---------------------------------------------------------------------
# AC-7: Factory accepts "opencv_gtsam" and emits INFO log
def test_ac7_factory_accepts_opencv_gtsam_and_emits_info_log(
caplog: pytest.LogCaptureFixture,
) -> None:
cfg = _build_config()
def _factory(**kwargs: Any) -> PoseEstimator:
return _FakePoseEstimator()
register_pose_estimator("opencv_gtsam", _factory)
with caplog.at_level(logging.INFO):
estimator = build_pose_estimator(
cfg,
ransac_filter=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
se3_utils=mock.MagicMock(),
isam2_graph_handle=_FakeISam2GraphHandle(),
)
assert isinstance(estimator, PoseEstimator)
info_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.strategy_loaded"
]
assert len(info_records) == 1
record = info_records[0]
assert record.kv["strategy"] == "opencv_gtsam"
assert record.kv["ransac_iterations"] == 200
assert record.kv["ransac_reprojection_threshold_px"] == 4.0
assert record.kv["thermal_throttle_threshold_celsius"] == 75.0
# ---------------------------------------------------------------------
# AC-8: Public API surface
def test_ac8_public_api_re_exports() -> None:
from gps_denied_onboard.components import c4_pose
expected_public = {
"C4PoseConfig",
"CovarianceDegradedWarning",
"CovarianceMode",
"LatLonAlt",
"PnpFailureError",
"PoseEstimate",
"PoseEstimator",
"PoseEstimatorConfigError",
"PoseEstimatorError",
"PoseSourceLabel",
"Quat",
}
assert expected_public.issubset(set(c4_pose.__all__))
def test_ac8_internals_not_in_public_all() -> None:
from gps_denied_onboard.components import c4_pose
assert "ISam2GraphHandle" not in c4_pose.__all__
assert "_isam2_handle" not in c4_pose.__all__
# ---------------------------------------------------------------------
# AC-9: Strategy bound to single ingest thread (same thread as C5)
def test_ac9_bind_state_ingest_thread_rejects_second_thread() -> None:
primary = bind_state_ingest_thread()
other_ident = primary + 1
with pytest.raises(StateIngestThreadAlreadyBoundError):
bind_state_ingest_thread(other_ident)
def test_ac9_bind_state_ingest_thread_idempotent_for_same_thread() -> None:
primary = bind_state_ingest_thread()
again = bind_state_ingest_thread()
assert primary == again
assert primary == threading.get_ident()
# ---------------------------------------------------------------------
# AC-10: ISam2GraphHandle Protocol stub conforms to runtime_checkable
def test_ac10_isam2_graph_handle_runtime_checkable() -> None:
handle = _FakeISam2GraphHandle()
assert isinstance(handle, ISam2GraphHandle)
def test_ac10_isam2_graph_handle_rejects_missing_method() -> None:
class _NoMethod:
pass
assert not isinstance(_NoMethod(), ISam2GraphHandle)
def test_ac10_isam2_graph_handle_rejects_partial_surface() -> None:
"""AZ-358 widened the Protocol to 5 methods; a handle that only
implements the original ``get_pose_key`` no longer satisfies
runtime_checkable conformance."""
class _OnlyGetPoseKey:
def get_pose_key(self, frame_id: int) -> int:
return int(frame_id)
assert not isinstance(_OnlyGetPoseKey(), ISam2GraphHandle)
# ---------------------------------------------------------------------
# Bonus: factory wires constructor dependencies through to the strategy
def test_factory_passes_dependencies_to_strategy() -> None:
captured: dict[str, Any] = {}
def _factory(**kwargs: Any) -> PoseEstimator:
captured.update(kwargs)
return _FakePoseEstimator()
register_pose_estimator("opencv_gtsam", _factory)
handle = _FakeISam2GraphHandle()
ransac = mock.MagicMock()
wgs = mock.MagicMock()
se3 = mock.MagicMock()
cfg = _build_config()
build_pose_estimator(
cfg,
ransac_filter=ransac,
wgs_converter=wgs,
se3_utils=se3,
isam2_graph_handle=handle,
)
assert captured["ransac_filter"] is ransac
assert captured["wgs_converter"] is wgs
assert captured["se3_utils"] is se3
assert captured["isam2_graph_handle"] is handle
def test_factory_lazy_imports_when_registry_empty() -> None:
# Arrange — registry is empty (fixture cleared it); the
# lazy-import fallback should pick up the AZ-358 concrete
# ``opencv_gtsam_estimator`` module and resolve its ``create``
# callable.
cfg = _build_config()
# Act — call should succeed (lazy import resolves to AZ-358).
estimator = build_pose_estimator(
cfg,
ransac_filter=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
se3_utils=mock.MagicMock(),
isam2_graph_handle=_FakeISam2GraphHandle(),
)
# Assert — the returned object satisfies the PoseEstimator Protocol.
assert isinstance(estimator, PoseEstimator)
# ---------------------------------------------------------------------
# Bonus: config validation
def test_config_rejects_unknown_strategy_at_post_init() -> None:
from gps_denied_onboard.config.schema import ConfigError
with pytest.raises(ConfigError, match="garbage"):
C4PoseConfig(strategy="garbage")
def test_config_rejects_zero_ransac_iterations() -> None:
from gps_denied_onboard.config.schema import ConfigError
with pytest.raises(ConfigError, match="ransac_iterations"):
C4PoseConfig(ransac_iterations=0)
def test_pose_estimate_uuid_frame_id() -> None:
pe = PoseEstimate(
frame_id=UUID(int=42),
position_wgs84=LatLonAlt(0.0, 0.0, 0.0),
orientation_world_T_body=Quat(1.0, 0.0, 0.0, 0.0),
covariance_6x6=np.eye(6),
covariance_mode=CovarianceMode.MARGINALS,
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
last_satellite_anchor_age_ms=42,
emitted_at=1_000_000_000,
)
assert isinstance(pe.frame_id, UUID)
assert pe.emitted_at == 1_000_000_000