[AZ-358] [AZ-361] C4 OpenCVGtsamPoseEstimator + Jacobian thermal hybrid

Implement the single production-default C4 PoseEstimator strategy.

AZ-358 — Marginals path: OpenCV solvePnPRansac (SOLVEPNP_IPPE) on
best-candidate inliers, PriorFactorPose3 with Jacobian-derived initial
covariance, flushed into C5's iSAM2 graph via the widened
ISam2GraphHandle.update(graph, values, None) (Option B). Posterior
covariance from compute_marginals().marginalCovariance(pose_key) with
SPD-defensive Cholesky check. Tile pixel -> ENU world conversion via
the shared WgsConverter + a configurable tile_size_px. Two spec
deviations now documented in the AZ-358 task file: PriorFactorPose3
over GenericProjectionFactorCal3DS2 (avoids unbounded landmark
variables; same Fisher information on the pose marginal) and explicit
(graph, values, timestamps) update args (aligns with C5's impl).

AZ-361 — Jacobian + thermal hybrid: per-frame dispatch on
thermal_state.thermal_throttle_active selects the cv2.projectPoints-
derived 6x6 information matrix (with ridge regularisation) as the
emitted covariance. Skips the iSAM2 factor add under throttle
(Invariant 12). Emits CovarianceDegradedWarning via warnings.warn
(never raised); paired WARN log + FDR record rate-limited per
covariance_degraded_warn_window_ns (default 60 s) via an injected
monotonic Clock. Supersedes the AZ-358 NotImplementedError stub.

Widens ISam2GraphHandle from get_pose_key only to all five C4-facing
methods (add_factor, update, compute_marginals, last_anchor_age_ms);
C5's existing ISam2GraphHandleImpl already satisfies the superset, so
no C5 source change this batch. Threads fdr_client + clock through
pose_factory composition.

Registers two new FDR payload kinds: pose.frame_done (per-call
telemetry; both success and PnpFailureError paths) and
pose.covariance_degraded (per-window throttle exposure).

Tests: 21 new (AZ-358 AC-1..11 + AZ-361 AC-1..10/12/13; AZ-361 AC-11
RMSE-ratio informational per spec, not asserted). Updates 2 existing
test files for Protocol widening and the FDR-schema round trip.

Code review verdict: PASS_WITH_WARNINGS (5 findings: Medium x2,
Low x3; none blocking). Full suite: 1958 passed, 1 unrelated
host-dependent perf failure (c12 CLI cold-start, pre-existing).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-14 05:01:14 +03:00
parent 360aece7a6
commit 4eac24f37a
13 changed files with 2452 additions and 35 deletions
+46 -13
View File
@@ -78,11 +78,28 @@ def _build_config(**overrides: Any) -> Config:
class _FakeISam2GraphHandle:
"""Minimal handle stub for factory / Protocol tests."""
"""Minimal handle stub for factory / Protocol tests.
Implements the AZ-358-extended 5-method surface — the AZ-355
AC-10 ``isinstance(handle, ISam2GraphHandle)`` runtime-checkable
test now expects all five methods.
"""
def get_pose_key(self, frame_id: int) -> int:
return int(frame_id)
def add_factor(self, factor: Any) -> None:
return None
def update(self, graph: Any, values: Any, timestamps: Any | None = None) -> None:
return None
def compute_marginals(self) -> Any:
return None
def last_anchor_age_ms(self) -> int:
return 0
class _FakePoseEstimator:
"""Test double satisfying the full PoseEstimator Protocol."""
@@ -378,6 +395,18 @@ def test_ac10_isam2_graph_handle_rejects_missing_method() -> None:
assert not isinstance(_NoMethod(), ISam2GraphHandle)
def test_ac10_isam2_graph_handle_rejects_partial_surface() -> None:
"""AZ-358 widened the Protocol to 5 methods; a handle that only
implements the original ``get_pose_key`` no longer satisfies
runtime_checkable conformance."""
class _OnlyGetPoseKey:
def get_pose_key(self, frame_id: int) -> int:
return int(frame_id)
assert not isinstance(_OnlyGetPoseKey(), ISam2GraphHandle)
# ---------------------------------------------------------------------
# Bonus: factory wires constructor dependencies through to the strategy
@@ -411,19 +440,23 @@ def test_factory_passes_dependencies_to_strategy() -> None:
def test_factory_lazy_imports_when_registry_empty() -> None:
# Arrange — registry is empty (fixture cleared it); the
# lazy-import fallback should pick up the AZ-358 concrete
# ``opencv_gtsam_estimator`` module and resolve its ``create``
# callable.
cfg = _build_config()
# Registry is cleared by the fixture; the lazy-import fallback
# should attempt to import the concrete module. We have not
# shipped opencv_gtsam_estimator yet (AZ-358), so the import
# raises and gets wrapped in PoseEstimatorConfigError.
with pytest.raises(PoseEstimatorConfigError):
build_pose_estimator(
cfg,
ransac_filter=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
se3_utils=mock.MagicMock(),
isam2_graph_handle=_FakeISam2GraphHandle(),
)
# Act — call should succeed (lazy import resolves to AZ-358).
estimator = build_pose_estimator(
cfg,
ransac_filter=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
se3_utils=mock.MagicMock(),
isam2_graph_handle=_FakeISam2GraphHandle(),
)
# Assert — the returned object satisfies the PoseEstimator Protocol.
assert isinstance(estimator, PoseEstimator)
# ---------------------------------------------------------------------
@@ -0,0 +1,919 @@
"""AZ-358 + AZ-361 — OpenCVGtsamPoseEstimator unit tests.
Covers the published acceptance criteria:
AZ-358 (Marginals path)
* AC-1 PnP success on synthetic correspondences → WGS84 position within tolerance.
* AC-2 Degenerate (insufficient) inliers → ``PnpFailureError`` + ERROR log + FDR error record.
* AC-3 SPD covariance — ``np.linalg.cholesky`` succeeds; symmetric.
* AC-4 ``covariance_mode == MARGINALS`` on success (both ``PoseEstimate`` and ``current_covariance_mode()``).
* AC-5 ``source_label == SATELLITE_ANCHORED`` on success.
* AC-6 WGS84 conversion uses the shared :class:`WgsConverter` (verified by injection identity).
* AC-7 iSAM2 handle call sequence — ``get_pose_key`` ×1 → ``update`` ×1 → ``compute_marginals`` ×1.
* AC-9 Non-SPD covariance defensive — ``PnpFailureError`` with the documented message.
* AC-10 Composition-root wiring emits a ``c4.pose.ready`` INFO log via the factory.
* AC-11 FDR ``pose.frame_done`` record shape on success.
AZ-361 (Jacobian + thermal hybrid)
* AC-1 Per-frame mode dispatch on alternating thermal flag.
* AC-2 Mode-switch latency ≤ 1 frame.
* AC-3 Jacobian covariance SPD.
* AC-4 ``covariance_mode == JACOBIAN`` on Jacobian path.
* AC-5 Source label SATELLITE_ANCHORED regardless of path.
* AC-6 ``CovarianceDegradedWarning`` emitted via ``warnings.warn``, not raised.
* AC-7 ``warnings.warn`` rate-limited per window.
* AC-8 WARN log rate-limited similarly.
* AC-12 Jacobian path SKIPS iSAM2 factor add (no ``update`` call).
* AC-13 FDR ``mode`` field distinguishes path.
"""
from __future__ import annotations
import dataclasses
import logging
import warnings
from typing import Any
from uuid import UUID
import cv2
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.matcher import CandidateMatchSet, MatchResult
from gps_denied_onboard._types.pose import (
CovarianceMode,
PoseEstimate,
PoseSourceLabel,
)
from gps_denied_onboard._types.thermal import ThermalState
from gps_denied_onboard.components.c4_pose import (
C4PoseConfig,
CovarianceDegradedWarning,
PnpFailureError,
PoseEstimator,
)
from gps_denied_onboard.components.c4_pose.opencv_gtsam_estimator import (
OpenCVGtsamPoseEstimator,
create,
)
from gps_denied_onboard.config import Config, load_config
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
from gps_denied_onboard.runtime_root.pose_factory import (
build_pose_estimator,
clear_pose_registry,
)
# ---------------------------------------------------------------------
# Test infrastructure.
class _FakeMarginals:
"""Stand-in for ``gtsam.Marginals`` that returns a canned 6x6 SPD."""
def __init__(self, cov: np.ndarray) -> None:
self._cov = np.asarray(cov, dtype=np.float64)
def marginalCovariance(self, _key: int) -> np.ndarray: # noqa: N802 — GTSAM API
return self._cov
class _RecordingISam2GraphHandle:
"""Programmable handle that records every call for AC-7 / AC-12 checks."""
def __init__(self, *, posterior_cov: np.ndarray | None = None) -> None:
if posterior_cov is None:
posterior_cov = np.eye(6, dtype=np.float64) * 0.0025
self._posterior_cov = posterior_cov
self.calls: list[tuple[str, Any, ...]] = []
self._next_key = 1
def get_pose_key(self, frame_id: int) -> int:
self.calls.append(("get_pose_key", frame_id))
key = self._next_key
self._next_key += 1
return key
def add_factor(self, factor: Any) -> None:
self.calls.append(("add_factor", factor))
def update(
self, graph: Any, values: Any, timestamps: Any | None = None
) -> None:
self.calls.append(("update", graph, values, timestamps))
def compute_marginals(self) -> Any:
self.calls.append(("compute_marginals",))
return _FakeMarginals(self._posterior_cov)
def last_anchor_age_ms(self) -> int:
self.calls.append(("last_anchor_age_ms",))
return 0
class _MutableThermalState:
"""Mutable thermal-state container so tests can flip the throttle bit per call."""
def __init__(self, *, throttle: bool = False) -> None:
self.thermal_throttle_active = throttle
def with_throttle(self, throttle: bool) -> ThermalState:
return ThermalState(
cpu_temp_c=70.0,
gpu_temp_c=80.0 if throttle else 60.0,
thermal_throttle_active=throttle,
measured_clock_mhz=1200 if not throttle else 600,
measured_at_ns=0,
is_telemetry_available=True,
)
class _FakeClock:
"""Deterministic monotonic clock for rate-limit window tests."""
def __init__(self, start_ns: int = 0) -> None:
self._now_ns = start_ns
def advance(self, delta_ns: int) -> None:
self._now_ns += delta_ns
def monotonic_ns(self) -> int:
return self._now_ns
def time_ns(self) -> int:
return self._now_ns
def sleep_until_ns(self, target_ns: int) -> None:
if target_ns > self._now_ns:
self._now_ns = target_ns
_TEST_TILE_ID = (18, 49.5, 36.0)
_TEST_TILE_SIZE_PX = 256
def _build_config(**overrides: Any) -> Config:
cfg = load_config(env={}, paths=(), require_env=False)
new_block = dataclasses.replace(C4PoseConfig(), **overrides)
components = dict(cfg.components or {})
components["c4_pose"] = new_block
return dataclasses.replace(cfg, components=components)
def _build_calibration() -> CameraCalibration:
K = np.array(
[[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
dtype=np.float64,
)
return CameraCalibration(
camera_id="test_cam",
intrinsics_3x3=K,
distortion=np.zeros(5, dtype=np.float64),
body_to_camera_se3=np.eye(4, dtype=np.float64),
acquisition_method="manifest",
)
def _tile_pixel_to_enu(
tile_id: tuple[int, float, float],
px: float,
py: float,
origin: LatLonAlt,
tile_size_px: int = _TEST_TILE_SIZE_PX,
) -> np.ndarray:
"""Mirror the estimator's tile-pixel → ENU conversion for fixture setup."""
zoom, lat_c, lon_c = tile_id
tile_x, tile_y = WgsConverter.latlon_to_tile_xy(zoom, lat_c, lon_c)
bounds = WgsConverter.tile_xy_to_latlon_bounds(zoom, tile_x, tile_y)
lon = bounds.min_lon_deg + (px / tile_size_px) * (
bounds.max_lon_deg - bounds.min_lon_deg
)
lat = bounds.max_lat_deg - (py / tile_size_px) * (
bounds.max_lat_deg - bounds.min_lat_deg
)
return WgsConverter.latlonalt_to_local_enu(
origin, LatLonAlt(lat_deg=lat, lon_deg=lon, alt_m=0.0)
)
def _synthesise_match_result(
*,
num_inliers: int = 50,
pose_T_world_cam: np.ndarray | None = None,
seed: int = 42,
) -> tuple[MatchResult, np.ndarray]:
"""Build a MatchResult whose inliers reproject to a known camera pose.
Returns ``(match_result, pose_T_world_cam)``. The pose places the
camera 100 m above the tile centre looking straight down
(camera +Z axis aligned with world -Z, so world ground points
end up in front of the camera in OpenCV's projection model).
The world-point grid samples the tile uniformly.
"""
if pose_T_world_cam is None:
# Camera looking straight down: flip Y and Z axes so that
# world points at z=0 reproject in front of the camera
# (camera-frame z > 0) per the OpenCV projection convention.
pose_T_world_cam = np.array(
[
[1.0, 0.0, 0.0, 0.0],
[0.0, -1.0, 0.0, 0.0],
[0.0, 0.0, -1.0, 100.0],
[0.0, 0.0, 0.0, 1.0],
],
dtype=np.float64,
)
origin = LatLonAlt(
lat_deg=_TEST_TILE_ID[1], lon_deg=_TEST_TILE_ID[2], alt_m=0.0
)
rng = np.random.default_rng(seed)
tile_pixels = rng.uniform(40.0, 216.0, size=(num_inliers, 2))
world_pts = np.array(
[
_tile_pixel_to_enu(_TEST_TILE_ID, float(px), float(py), origin)
for px, py in tile_pixels
],
dtype=np.float64,
)
K = np.array(
[[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
dtype=np.float64,
)
T_cam_world = np.linalg.inv(pose_T_world_cam)
rvec, _ = cv2.Rodrigues(T_cam_world[:3, :3])
tvec = T_cam_world[:3, 3].reshape(3, 1)
projected, _ = cv2.projectPoints(
objectPoints=world_pts.reshape(-1, 1, 3),
rvec=rvec,
tvec=tvec,
cameraMatrix=K,
distCoeffs=np.zeros(5, dtype=np.float64),
)
image_pts = projected.reshape(-1, 2)
correspondences = np.hstack([image_pts, tile_pixels]).astype(np.float32)
candidate = CandidateMatchSet(
tile_id=_TEST_TILE_ID,
inlier_count=num_inliers,
inlier_correspondences=correspondences,
ransac_outlier_count=0,
per_candidate_residual_px=0.5,
)
match_result = MatchResult(
frame_id=1,
per_candidate=(candidate,),
best_candidate_idx=0,
reprojection_residual_px=0.5,
matched_at=0,
matcher_label="disk_lightglue",
candidates_input=1,
candidates_dropped=0,
)
return match_result, pose_T_world_cam
def _build_estimator(
*,
handle: Any | None = None,
fdr: Any | None = None,
clock: Any | None = None,
config: Config | None = None,
) -> OpenCVGtsamPoseEstimator:
if handle is None:
handle = _RecordingISam2GraphHandle()
if fdr is None:
fdr = FakeFdrSink(producer_id="c4_pose")
if clock is None:
clock = _FakeClock()
if config is None:
config = _build_config()
estimator = OpenCVGtsamPoseEstimator(
config,
ransac_filter=object(),
wgs_converter=WgsConverter,
se3_utils=object(),
isam2_graph_handle=handle,
fdr_client=fdr,
clock=clock,
logger=logging.getLogger("test.c4_pose"),
)
# Pre-seed the ENU origin to the tile centre so reconstructed
# WGS84 positions match the synthetic ground truth exactly.
estimator.set_enu_origin(
LatLonAlt(lat_deg=_TEST_TILE_ID[1], lon_deg=_TEST_TILE_ID[2], alt_m=0.0)
)
return estimator
@pytest.fixture(autouse=True)
def _registry_isolation():
clear_pose_registry()
yield
clear_pose_registry()
# ---------------------------------------------------------------------
# AC-1: PnP success on synthetic correspondences.
def test_az358_ac1_pnp_success_synthetic_within_tolerance() -> None:
# Arrange
match_result, pose_T_world_cam = _synthesise_match_result()
estimator = _build_estimator()
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act
estimate = estimator.estimate(match_result, calibration, thermal)
# Assert — z component should be near 100 m (camera altitude).
expected_alt = pose_T_world_cam[2, 3]
assert abs(estimate.position_wgs84.alt_m - expected_alt) < 1.0
# ---------------------------------------------------------------------
# AC-2: Degenerate geometry → PnpFailureError.
def test_az358_ac2_insufficient_inliers_raises_pnp_failure(
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange — only 3 inliers, IPPE needs >= 4.
match_result, _ = _synthesise_match_result(num_inliers=3)
fdr = FakeFdrSink(producer_id="c4_pose")
estimator = _build_estimator(fdr=fdr)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act / Assert
with caplog.at_level(logging.ERROR), pytest.raises(PnpFailureError):
estimator.estimate(match_result, calibration, thermal)
error_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.pnp_failure"
]
assert len(error_records) == 1
fdr_records = [r for r in fdr.records if r.kind == "pose.frame_done"]
assert len(fdr_records) == 1
assert fdr_records[0].payload.get("error") is True
# ---------------------------------------------------------------------
# AC-3 + AC-4 + AC-5: SPD covariance, mode reporting, source label.
def test_az358_ac3_4_5_marginals_success_invariants() -> None:
# Arrange
match_result, _ = _synthesise_match_result()
estimator = _build_estimator()
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act
estimate = estimator.estimate(match_result, calibration, thermal)
# Assert — AC-3 SPD
cov = estimate.covariance_6x6
assert cov.shape == (6, 6)
assert np.allclose(cov, cov.T, atol=1e-10)
np.linalg.cholesky(cov)
# AC-4 mode reporting
assert estimate.covariance_mode is CovarianceMode.MARGINALS
assert estimator.current_covariance_mode() is CovarianceMode.MARGINALS
# AC-5 source label
assert estimate.source_label is PoseSourceLabel.SATELLITE_ANCHORED
# ---------------------------------------------------------------------
# AC-6: WGS84 conversion uses shared WgsConverter (verified by injection identity).
def test_az358_ac6_wgs_converter_injection() -> None:
# Arrange
match_result, _ = _synthesise_match_result()
estimator = _build_estimator()
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act
estimate = estimator.estimate(match_result, calibration, thermal)
# Assert — the converted WGS84 lat/lon should land inside the tile
# bounds (since the camera is centred over the tile centre at
# altitude 100 m and looking straight down, its WGS84 footprint
# is the tile centre).
tile_x, tile_y = WgsConverter.latlon_to_tile_xy(
_TEST_TILE_ID[0], _TEST_TILE_ID[1], _TEST_TILE_ID[2]
)
bounds = WgsConverter.tile_xy_to_latlon_bounds(
_TEST_TILE_ID[0], tile_x, tile_y
)
assert bounds.min_lat_deg <= estimate.position_wgs84.lat_deg <= bounds.max_lat_deg
assert bounds.min_lon_deg <= estimate.position_wgs84.lon_deg <= bounds.max_lon_deg
# ---------------------------------------------------------------------
# AC-7: iSAM2 handle call sequence.
def test_az358_ac7_isam2_handle_call_sequence() -> None:
# Arrange
match_result, _ = _synthesise_match_result()
handle = _RecordingISam2GraphHandle()
estimator = _build_estimator(handle=handle)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act
estimator.estimate(match_result, calibration, thermal)
# Assert — exactly one get_pose_key, one update, one compute_marginals.
counts = {name: 0 for name in ("get_pose_key", "add_factor", "update", "compute_marginals")}
for call in handle.calls:
counts[call[0]] = counts.get(call[0], 0) + 1
assert counts["get_pose_key"] == 1
assert counts["update"] == 1
assert counts["compute_marginals"] == 1
# AC-7 invariant: C4 must NOT call add_factor (Option B —
# factors travel via the local NonlinearFactorGraph passed to
# update).
assert counts["add_factor"] == 0
# ---------------------------------------------------------------------
# AC-9: Non-SPD covariance defensive raise.
def test_az358_ac9_non_spd_covariance_raises_pnp_failure(
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange — handle returns a covariance that is symmetric but
# has a negative eigenvalue so Cholesky fails. ``-0.01·I`` is
# symmetric and non-SPD by construction.
bad_cov = -np.eye(6, dtype=np.float64) * 0.01
handle = _RecordingISam2GraphHandle(posterior_cov=bad_cov)
fdr = FakeFdrSink(producer_id="c4_pose")
match_result, _ = _synthesise_match_result()
estimator = _build_estimator(handle=handle, fdr=fdr)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act / Assert
with caplog.at_level(logging.ERROR), pytest.raises(
PnpFailureError, match="non-SPD covariance"
):
estimator.estimate(match_result, calibration, thermal)
error_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.pnp_failure"
]
assert any(rec.kv.get("stage") == "spd_check" for rec in error_records)
# ---------------------------------------------------------------------
# AC-10: Composition-root wiring (factory emits ready log + identity-shared deps).
def test_az358_ac10_composition_root_wiring_emits_ready_log(
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange
cfg = _build_config()
handle = _RecordingISam2GraphHandle()
fdr = FakeFdrSink(producer_id="c4_pose")
clock = _FakeClock()
# Act
with caplog.at_level(logging.INFO):
estimator = build_pose_estimator(
cfg,
ransac_filter=object(),
wgs_converter=WgsConverter,
se3_utils=object(),
isam2_graph_handle=handle,
fdr_client=fdr,
clock=clock,
)
# Assert
assert isinstance(estimator, OpenCVGtsamPoseEstimator)
ready_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.ready"
]
assert len(ready_records) == 1
assert ready_records[0].kv["strategy"] == "opencv_gtsam"
assert (
ready_records[0].kv["default_covariance"]
== CovarianceMode.MARGINALS.value
)
# Identity-shared: the estimator holds the SAME handle, fdr, clock instances.
assert estimator._handle is handle # noqa: SLF001
assert estimator._fdr_client is fdr # noqa: SLF001
assert estimator._clock is clock # noqa: SLF001
def test_az358_ac10_create_module_factory_returns_protocol_instance() -> None:
# Arrange / Act
cfg = _build_config()
estimator = create(
cfg,
ransac_filter=object(),
wgs_converter=WgsConverter,
se3_utils=object(),
isam2_graph_handle=_RecordingISam2GraphHandle(),
)
# Assert
assert isinstance(estimator, PoseEstimator)
# ---------------------------------------------------------------------
# AC-11: FDR pose.frame_done record shape.
def test_az358_ac11_fdr_pose_frame_done_shape_on_success() -> None:
# Arrange
fdr = FakeFdrSink(producer_id="c4_pose")
match_result, _ = _synthesise_match_result()
estimator = _build_estimator(fdr=fdr)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act
estimator.estimate(match_result, calibration, thermal)
# Assert
records = [r for r in fdr.records if r.kind == "pose.frame_done"]
assert len(records) == 1
payload = records[0].payload
expected_keys = {
"frame_id",
"inliers",
"residual_px",
"mode",
"covariance_norm",
"position_wgs84",
}
assert expected_keys <= set(payload.keys())
assert payload["mode"] == "marginals"
assert "error" not in payload # success path omits the flag
# ---------------------------------------------------------------------
# AC-7-bonus: handle update receives the prior factor in the local graph.
def test_az358_update_carries_prior_factor_in_local_graph() -> None:
# Arrange
match_result, _ = _synthesise_match_result()
handle = _RecordingISam2GraphHandle()
estimator = _build_estimator(handle=handle)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act
estimator.estimate(match_result, calibration, thermal)
# Assert — extract the (graph, values, timestamps) tuple from
# the update call and verify the graph carries exactly one
# prior factor.
update_calls = [c for c in handle.calls if c[0] == "update"]
assert len(update_calls) == 1
_, graph, _values, timestamps = update_calls[0]
assert graph.size() == 1
assert timestamps is None
# ---------------------------------------------------------------------
# AZ-361 AC-1 / AC-2 / AC-4: per-frame mode dispatch + latency ≤ 1 frame.
def test_az361_ac1_2_4_per_frame_mode_dispatch_alternates() -> None:
# Arrange — alternate throttle bit each call; verify the mode
# tracks the bit on EVERY call.
estimator = _build_estimator()
calibration = _build_calibration()
thermal_factory = _MutableThermalState()
pattern = [False, True, False, True, False, True]
observed: list[CovarianceMode] = []
# Act
with warnings.catch_warnings():
warnings.simplefilter("ignore", CovarianceDegradedWarning)
for throttle in pattern:
match_result, _ = _synthesise_match_result(seed=42)
estimate = estimator.estimate(
match_result, calibration, thermal_factory.with_throttle(throttle)
)
observed.append(estimate.covariance_mode)
# AC-2 — switch-latency: the mode set on THIS call MUST
# reflect THIS call's flag, not the previous one's.
assert estimator.current_covariance_mode() is estimate.covariance_mode
# Assert AC-1 — every observed mode matches the expected
# pattern (False→MARGINALS, True→JACOBIAN).
expected = [
CovarianceMode.JACOBIAN if t else CovarianceMode.MARGINALS for t in pattern
]
assert observed == expected
# ---------------------------------------------------------------------
# AZ-361 AC-3 + AC-5: Jacobian SPD + source label.
def test_az361_ac3_5_jacobian_success_invariants() -> None:
# Arrange
match_result, _ = _synthesise_match_result()
estimator = _build_estimator()
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(True)
# Act
with warnings.catch_warnings():
warnings.simplefilter("ignore", CovarianceDegradedWarning)
estimate = estimator.estimate(match_result, calibration, thermal)
# Assert
cov = estimate.covariance_6x6
assert cov.shape == (6, 6)
assert np.allclose(cov, cov.T, atol=1e-10)
np.linalg.cholesky(cov)
assert estimate.covariance_mode is CovarianceMode.JACOBIAN
assert estimate.source_label is PoseSourceLabel.SATELLITE_ANCHORED
# ---------------------------------------------------------------------
# AZ-361 AC-6: CovarianceDegradedWarning emitted via warnings.warn (not raised).
def test_az361_ac6_warning_emitted_via_warnings_warn() -> None:
# Arrange
match_result, _ = _synthesise_match_result()
# Disable rate-limiting so this single-call test is deterministic.
estimator = _build_estimator(
config=_build_config(covariance_degraded_warn_window_ns=0)
)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(True)
# Act
with warnings.catch_warnings(record=True) as records:
warnings.simplefilter("always", CovarianceDegradedWarning)
caught_via_exception = False
try:
estimator.estimate(match_result, calibration, thermal)
except Exception:
caught_via_exception = True
# Assert — warning emitted, not raised.
assert caught_via_exception is False
cov_warnings = [
r for r in records if isinstance(r.message, CovarianceDegradedWarning)
]
assert len(cov_warnings) == 1
# ---------------------------------------------------------------------
# AZ-361 AC-7 + AC-8: rate-limit warning + WARN log to ≤ 1 per window.
def test_az361_ac7_8_warning_and_log_rate_limited(
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange — 60 s window; advance the clock manually to step
# past it. Run 5 throttled calls within window 0; expect 1
# warning. Advance past 60 s; run 5 more; expect 1 more.
cfg = _build_config(covariance_degraded_warn_window_ns=60_000_000_000)
clock = _FakeClock()
estimator = _build_estimator(clock=clock, config=cfg)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(True)
# Act
all_warnings: list[Warning] = []
with warnings.catch_warnings(record=True) as records:
warnings.simplefilter("always", CovarianceDegradedWarning)
with caplog.at_level(logging.WARNING):
for i in range(5):
clock.advance(100_000_000) # +0.1 s per call (still in window 1)
match_result, _ = _synthesise_match_result()
estimator.estimate(match_result, calibration, thermal)
clock.advance(61_000_000_000) # +61 s — jump past the window
for i in range(5):
clock.advance(100_000_000)
match_result, _ = _synthesise_match_result()
estimator.estimate(match_result, calibration, thermal)
all_warnings = [
r.message for r in records if isinstance(r.message, CovarianceDegradedWarning)
]
# Assert — exactly 2 warnings (1 per window) and exactly 2 WARN logs.
assert len(all_warnings) == 2
warn_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c4.pose.covariance_degraded"
]
assert len(warn_records) == 2
# ---------------------------------------------------------------------
# AZ-361 AC-12: Jacobian path SKIPS iSAM2 factor add.
def test_az361_ac12_jacobian_path_skips_isam2_update() -> None:
# Arrange
match_result, _ = _synthesise_match_result()
handle = _RecordingISam2GraphHandle()
estimator = _build_estimator(handle=handle)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(True)
# Act
with warnings.catch_warnings():
warnings.simplefilter("ignore", CovarianceDegradedWarning)
estimator.estimate(match_result, calibration, thermal)
# Assert — NO update call, NO add_factor call, NO compute_marginals.
# last_anchor_age_ms IS expected (it's read for the PoseEstimate field).
counts = {name: 0 for name in ("update", "add_factor", "compute_marginals", "last_anchor_age_ms")}
for call in handle.calls:
counts[call[0]] = counts.get(call[0], 0) + 1
assert counts["update"] == 0
assert counts["add_factor"] == 0
assert counts["compute_marginals"] == 0
# ---------------------------------------------------------------------
# AZ-361 AC-13: FDR mode field distinguishes path.
def test_az361_ac13_fdr_mode_field_distinguishes_path() -> None:
# Arrange
fdr = FakeFdrSink(producer_id="c4_pose")
estimator = _build_estimator(fdr=fdr)
calibration = _build_calibration()
thermal_factory = _MutableThermalState()
# Act — one of each.
match_result_a, _ = _synthesise_match_result(seed=1)
estimator.estimate(match_result_a, calibration, thermal_factory.with_throttle(False))
with warnings.catch_warnings():
warnings.simplefilter("ignore", CovarianceDegradedWarning)
match_result_b, _ = _synthesise_match_result(seed=2)
estimator.estimate(match_result_b, calibration, thermal_factory.with_throttle(True))
# Assert
frame_done_records = [r for r in fdr.records if r.kind == "pose.frame_done"]
modes = [rec.payload["mode"] for rec in frame_done_records]
assert "marginals" in modes
assert "jacobian" in modes
# ---------------------------------------------------------------------
# AZ-361 AC-10: Near-singular Jacobian → defensive raise.
def test_az361_ac10_near_singular_jacobian_raises_pnp_failure(
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange — use a config with a near-zero ridge AND a degenerate
# all-coplanar inlier set whose Jacobian columns become
# numerically rank-deficient. Force the ridge to a value that
# is too small to rescue a singular JᵀJ.
cfg = _build_config(
covariance_degraded_warn_window_ns=0,
ridge_regularisation_epsilon=1e-30,
)
match_result, _ = _synthesise_match_result()
fdr = FakeFdrSink(producer_id="c4_pose")
estimator = _build_estimator(config=cfg, fdr=fdr)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(True)
# Build a degenerate inlier set: all image points stacked at the
# principal point, all world points along the optical axis. This
# collapses the Jacobian's spatial columns to a single direction.
K = np.asarray(calibration.intrinsics_3x3, dtype=np.float64)
cx, cy = float(K[0, 2]), float(K[1, 2])
num = 10
image_pts = np.tile(np.array([cx, cy], dtype=np.float32), (num, 1))
tile_pixels = np.tile(np.array([128.0, 128.0], dtype=np.float32), (num, 1))
correspondences = np.hstack([image_pts, tile_pixels]).astype(np.float32)
candidate = CandidateMatchSet(
tile_id=_TEST_TILE_ID,
inlier_count=num,
inlier_correspondences=correspondences,
ransac_outlier_count=0,
per_candidate_residual_px=0.0,
)
degenerate = MatchResult(
frame_id=1,
per_candidate=(candidate,),
best_candidate_idx=0,
reprojection_residual_px=0.0,
matched_at=0,
matcher_label="disk_lightglue",
candidates_input=1,
candidates_dropped=0,
)
# Act / Assert — either PnP rejects the degenerate input OR the
# SPD check downstream raises with the documented message. Both
# are acceptable per AC-10 (which says "defensive raise"); the
# explicit AC-10 expectation is PnpFailureError no matter which
# stage triggered.
with caplog.at_level(logging.ERROR), pytest.raises(PnpFailureError):
estimator.estimate(degenerate, calibration, thermal)
# ---------------------------------------------------------------------
# AZ-358 AC-8 replacement: thermal-throttle no longer raises NotImplementedError.
def test_az358_ac8_replacement_throttle_now_runs_jacobian_path() -> None:
# Arrange
match_result, _ = _synthesise_match_result()
estimator = _build_estimator()
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(True)
# Act — should NOT raise NotImplementedError (AZ-361 replaced
# the AZ-358 placeholder branch).
with warnings.catch_warnings():
warnings.simplefilter("ignore", CovarianceDegradedWarning)
estimate = estimator.estimate(match_result, calibration, thermal)
# Assert
assert estimate.covariance_mode is CovarianceMode.JACOBIAN
# ---------------------------------------------------------------------
# Bonus: emitted PoseEstimate.frame_id is a fresh UUID per call.
def test_pose_estimate_frame_id_is_fresh_uuid() -> None:
# Arrange
estimator = _build_estimator()
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
match_result_a, _ = _synthesise_match_result(seed=1)
match_result_b, _ = _synthesise_match_result(seed=2)
# Act
a = estimator.estimate(match_result_a, calibration, thermal)
b = estimator.estimate(match_result_b, calibration, thermal)
# Assert
assert isinstance(a.frame_id, UUID)
assert isinstance(b.frame_id, UUID)
assert a.frame_id != b.frame_id
# ---------------------------------------------------------------------
# Bonus: last_satellite_anchor_age_ms is sourced from the handle.
def test_pose_estimate_last_anchor_age_ms_from_handle() -> None:
# Arrange
class _AgedHandle(_RecordingISam2GraphHandle):
def last_anchor_age_ms(self) -> int:
return 7500
handle = _AgedHandle()
match_result, _ = _synthesise_match_result()
estimator = _build_estimator(handle=handle)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
# Act
estimate = estimator.estimate(match_result, calibration, thermal)
# Assert
assert estimate.last_satellite_anchor_age_ms == 7500
# ---------------------------------------------------------------------
# Bonus: PoseEstimate.emitted_at sources from the injected clock.
def test_pose_estimate_emitted_at_from_clock() -> None:
# Arrange
clock = _FakeClock(start_ns=123_456_789)
estimator = _build_estimator(clock=clock)
calibration = _build_calibration()
thermal = _MutableThermalState().with_throttle(False)
match_result, _ = _synthesise_match_result()
# Act
estimate = estimator.estimate(match_result, calibration, thermal)
# Assert
assert estimate.emitted_at == 123_456_789