Files
gps-denied-onboard/tests/unit/c5_state/test_az384_marginals_outputs.py
T
Oleksandr Bezdieniezhnykh b3ad94c155 [AZ-384] C5 marginals + current_estimate/smoothed_history/health_snapshot
Replaces the last three NotImplementedError placeholders on
GtsamIsam2StateEstimator with real Marginals + output methods:

- current_estimate(): recovers the 6x6 Marginals covariance for the
  most-recently committed pose key, enforces the SPD invariant via
  np.linalg.cholesky (Invariant 10), converts the local-ENU pose
  translation to WGS84 via the shared WgsConverter, derives a
  body->world quaternion, and emits a fresh EstimatorOutput
  (smoothed=False, Invariant 4). On SPD failure transitions
  isam2_state -> LOST and raises EstimatorFatalError (AC-5.2 path).
- smoothed_history(n): iterates the smoother's active POSE keys via
  _smoother.calculateEstimate().keys() (filtered by GTSAM symbol
  char) and the smoother timestamps via ts_map.at(key) - workaround
  for the pinned gtsam_unstable build's non-iterable
  FixedLagSmootherKeyTimestampMap. Bounded by K (Invariant 6); every
  entry has smoothed=True (Invariant 7).
- health_snapshot(): cheap O(1) accumulator read; reports
  IsamState lifecycle, pose-key count, AC-NEW-8
  cov_norm_growing_for_s rolling 60s deque-backed counter, and
  spoof_promotion_blocked via the AZ-385 state machine injection
  point.

Adds two public injection points for AZ-385/composition root:
set_enu_origin(LatLonAlt) and attach_source_label_state_machine(machine).
Defaults: (0, 0, 0) ENU origin, VISUAL_PROPAGATED source label,
spoof_promotion_blocked=False.

Wires _record_committed_pose_key into the three add_* success paths
so current_estimate only reads keys that have real values in iSAM2.
The JACOBIAN path in add_pose_anchor deliberately skips this call -
Invariant 3 keeps the JACOBIAN pose out of the iSAM2 graph.

Tests: +27 in tests/unit/c5_state/test_az384_marginals_outputs.py
covering all 10 ACs. Three obsolete AZ-382 tests
(test_ac10_*_raises_named_az384) removed. Full suite: 589 passed,
2 skipped.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 06:20:01 +03:00

467 lines
15 KiB
Python

"""AZ-384 — GtsamIsam2StateEstimator current_estimate / smoothed_history / health_snapshot.
Ten ACs from ``_docs/02_tasks/done/AZ-384_c5_marginals_outputs.md``:
- AC-1 ``current_estimate`` returns a fresh ``EstimatorOutput`` per
call with ``smoothed=False``.
- AC-2 SPD covariance — Cholesky must succeed; non-SPD raises
:class:`EstimatorFatalError`.
- AC-3 WGS84 conversion — uses the shared ``WgsConverter``;
ENU origin defaults to (0, 0, 0) when none injected.
- AC-4 ``smoothed_history(n)`` is bounded by K=15; every entry has
``smoothed=True``.
- AC-5 ``current_estimate`` has ``smoothed=False`` (distinguishes
from history).
- AC-6 ``health_snapshot.isam2_state`` reflects convergence:
``INIT`` before first estimate, ``TRACKING`` after, ``LOST``
on a fatal SPD failure.
- AC-7 ``keyframe_count`` matches the smoother's pose-key count.
- AC-8 ``cov_norm_growing_for_s`` increments under monotone-rising
covariance norms; resets to 0 on a non-rising frame.
- AC-9 ``spoof_promotion_blocked`` queries the injected
``source_label_state_machine`` (default ``False`` when none).
- AC-10 ``last_satellite_anchor_age_ms`` is a pass-through from
``handle.last_anchor_age_ms()``.
The seeded-prior approach in the helpers below mirrors what AZ-388
(AC-5.2 fallback) will do at startup — a single prior factor + an
initial value for x0, so iSAM2 has something to read in
``current_estimate``. Marginal-quality tests use mock handles so we
can synthesise specific covariance matrices without driving the
optimiser into a particular numerical state.
"""
from __future__ import annotations
from unittest import mock
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.state import (
EstimatorHealth,
EstimatorOutput,
IsamState,
PoseSourceLabel,
)
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import EstimatorFatalError
from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import (
GtsamIsam2StateEstimator,
create,
)
from gps_denied_onboard.runtime_root.state_factory import clear_state_registry
@pytest.fixture(autouse=True)
def _registry_isolation():
    """Call ``clear_state_registry`` before and after every test in this module."""
    # Setup: run once before the test body executes.
    clear_state_registry()
    yield
    # Teardown: run again once the test has finished.
    clear_state_registry()
def _build_estimator() -> GtsamIsam2StateEstimator:
    """Create an estimator through ``create`` with mocked collaborators.

    The config is a ``MagicMock`` whose ``components`` mapping carries a real
    ``C5StateConfig`` (strategy ``gtsam_isam2``, keyframe window size 15);
    every other dependency is a bare ``MagicMock``. ``create`` returns a
    pair, of which only the first element (the estimator) is handed back.
    """
    root_config = mock.MagicMock()
    root_config.components = {
        "c5_state": C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15),
    }
    collaborators = {
        name: mock.MagicMock()
        for name in ("imu_preintegrator", "se3_utils", "wgs_converter", "fdr_client")
    }
    built, _unused = create(config=root_config, **collaborators)
    return built
def _seed_prior(
    estimator: GtsamIsam2StateEstimator,
    pose: gtsam.Pose3 | None = None,
    *,
    ts_seconds: float = 0.0,
) -> int:
    """Commit one prior-constrained pose to the estimator's real iSAM2 handle.

    A fresh ``x``-symbol key is allocated from
    ``estimator._next_key_counter`` (which is then advanced). A
    ``PriorFactorPose3`` with isotropic sigma 0.1 and a matching initial
    value are pushed through ``estimator._isam2_handle.update`` and the key
    is registered via ``estimator._record_committed_pose_key``. This is the
    minimal scaffolding that lets ``current_estimate`` succeed; AZ-388 is
    expected to own this seeding at startup once the AC-5.2 fallback lands.

    A real ``FixedLagSmootherKeyTimestampMap`` is supplied so the smoother
    tracks the key; otherwise ``keyframe_count`` / ``smoothed_history``
    would see an empty window.

    Args:
        estimator: Estimator whose private handle and counter are mutated.
        pose: Pose used for both the prior and the initial value;
            ``gtsam.Pose3()`` when omitted.
        ts_seconds: Timestamp stored for the key in the smoother map.

    Returns:
        The GTSAM key of the seeded pose.
    """
    import gtsam_unstable

    if pose is None:
        pose = gtsam.Pose3()

    pose_key = gtsam.symbol("x", estimator._next_key_counter)
    estimator._next_key_counter += 1

    initial_values = gtsam.Values()
    initial_values.insert(pose_key, pose)

    factors = gtsam.NonlinearFactorGraph()
    factors.add(
        gtsam.PriorFactorPose3(pose_key, pose, gtsam.noiseModel.Isotropic.Sigma(6, 0.1))
    )

    key_timestamps = gtsam_unstable.FixedLagSmootherKeyTimestampMap()
    key_timestamps.insert((pose_key, ts_seconds))

    estimator._isam2_handle.update(factors, initial_values, timestamps=key_timestamps)
    estimator._record_committed_pose_key(pose_key)
    return pose_key
# ---------------------------------------------------------------------
# AC-1 + AC-5: current_estimate fresh EstimatorOutput; smoothed=False
def test_ac1_current_estimate_returns_fresh_estimator_output() -> None:
    """Back-to-back calls yield distinct ``EstimatorOutput`` objects and frame ids."""
    estimator = _build_estimator()
    _seed_prior(estimator)

    first, second = estimator.current_estimate(), estimator.current_estimate()

    for output in (first, second):
        assert isinstance(output, EstimatorOutput)
    assert first is not second
    assert first.frame_id != second.frame_id
def test_ac1_no_committed_pose_key_raises_fatal() -> None:
    """With nothing seeded, ``current_estimate`` raises ``EstimatorFatalError``."""
    unseeded = _build_estimator()
    expected_message = "no committed pose key"
    with pytest.raises(EstimatorFatalError, match=expected_message):
        unseeded.current_estimate()
def test_ac5_current_estimate_smoothed_false() -> None:
    """The live estimate carries ``smoothed=False``."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    assert estimator.current_estimate().smoothed is False
# ---------------------------------------------------------------------
# AC-2: SPD covariance enforcement
def test_ac2_spd_invariant_holds_for_real_marginals() -> None:
    """Covariance produced via the real handle survives a Cholesky factorisation."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    covariance = estimator.current_estimate().covariance_6x6
    # np.linalg.cholesky raises LinAlgError for a matrix that is not positive
    # definite, so completing the call without an exception is the check.
    np.linalg.cholesky(covariance)
def test_ac2_non_spd_marginals_raises_fatal() -> None:
    """An all-zero marginal covariance raises and leaves ``_isam2_state`` at ``LOST``."""
    estimator = _build_estimator()
    _seed_prior(estimator)

    # Swap the real handle for a mock whose marginal covariance is all zeros.
    degenerate_marginals = mock.MagicMock()
    degenerate_marginals.marginalCovariance.return_value = np.zeros((6, 6))
    stub_handle = mock.MagicMock()
    stub_handle.last_anchor_age_ms.return_value = 0
    stub_handle.compute_marginals.return_value = degenerate_marginals
    estimator._isam2_handle = stub_handle

    with pytest.raises(EstimatorFatalError, match="SPD"):
        estimator.current_estimate()

    # Checked after the context manager so the assertion actually runs.
    assert estimator._isam2_state == IsamState.LOST
# ---------------------------------------------------------------------
# AC-3: WGS84 conversion uses WgsConverter
def test_ac3_default_origin_is_equator() -> None:
    """With no origin injected, a default ``Pose3`` reports lat/lon/alt of zero."""
    estimator = _build_estimator()
    _seed_prior(estimator, pose=gtsam.Pose3())
    position = estimator.current_estimate().position_wgs84
    # Identity pose at the default (0, 0, 0) origin should land on the origin.
    assert position.lat_deg == pytest.approx(0.0, abs=1e-6)
    assert position.lon_deg == pytest.approx(0.0, abs=1e-6)
    assert position.alt_m == pytest.approx(0.0, abs=1e-3)
def test_ac3_explicit_origin_round_trips() -> None:
    """A default ``Pose3`` at an injected origin reports that origin back."""
    estimator = _build_estimator()
    estimator.set_enu_origin(LatLonAlt(lat_deg=50.0, lon_deg=36.0, alt_m=120.0))
    _seed_prior(estimator, pose=gtsam.Pose3())
    position = estimator.current_estimate().position_wgs84
    assert position.lat_deg == pytest.approx(50.0, abs=1e-6)
    assert position.lon_deg == pytest.approx(36.0, abs=1e-6)
    assert position.alt_m == pytest.approx(120.0, abs=1e-3)
def test_ac3_translated_pose_offsets_from_origin() -> None:
    """A pose translated 100 m along +x keeps latitude and raises longitude slightly."""
    estimator = _build_estimator()
    estimator.set_enu_origin(LatLonAlt(lat_deg=50.0, lon_deg=36.0, alt_m=120.0))
    east_offset = gtsam.Point3(100.0, 0.0, 0.0)
    _seed_prior(estimator, pose=gtsam.Pose3(gtsam.Rot3(), east_offset))
    position = estimator.current_estimate().position_wgs84
    # ~100 m east at 50 deg latitude is ~0.0014 deg of longitude; only the
    # direction of the shift and an unchanged latitude are verified.
    assert position.lat_deg == pytest.approx(50.0, abs=1e-4)
    assert 36.0 < position.lon_deg < 36.01
# ---------------------------------------------------------------------
# AC-4: smoothed_history bounded by K
def test_ac4_smoothed_history_bounded_by_k() -> None:
    """Requesting 100 history entries returns no more than 15."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    window_limit = 15  # same value as keyframe_window_size in _build_estimator
    assert len(estimator.smoothed_history(100)) <= window_limit
def test_ac4_smoothed_history_entries_have_smoothed_true() -> None:
    """History is non-empty after seeding and every entry has ``smoothed=True``."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    smoothed_entries = estimator.smoothed_history(100)
    assert len(smoothed_entries) >= 1
    assert all(item.smoothed is True for item in smoothed_entries)
def test_ac4_smoothed_history_empty_when_n_zero() -> None:
    """``smoothed_history(0)`` is an empty list even when a pose is seeded."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    assert estimator.smoothed_history(0) == []
def test_ac4_smoothed_history_empty_before_seed() -> None:
    """An estimator with nothing seeded reports an empty history."""
    unseeded = _build_estimator()
    assert unseeded.smoothed_history(10) == []
# ---------------------------------------------------------------------
# AC-6: IsamState lifecycle
def test_ac6_isam2_state_init_before_first_estimate() -> None:
    """A freshly built estimator reports ``IsamState.INIT``."""
    health = _build_estimator().health_snapshot()
    assert health.isam2_state == IsamState.INIT
def test_ac6_isam2_state_tracking_after_estimate() -> None:
    """After one successful ``current_estimate`` the state is ``TRACKING``."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    estimator.current_estimate()  # return value is irrelevant here
    assert estimator.health_snapshot().isam2_state == IsamState.TRACKING
def test_ac6_isam2_state_lost_after_fatal() -> None:
    """A fatal failure on an all-zero covariance is surfaced as ``LOST`` in health."""
    estimator = _build_estimator()
    _seed_prior(estimator)

    # Mock handle whose marginal covariance is all zeros.
    degenerate_marginals = mock.MagicMock()
    degenerate_marginals.marginalCovariance.return_value = np.zeros((6, 6))
    stub_handle = mock.MagicMock()
    stub_handle.last_anchor_age_ms.return_value = 0
    stub_handle.compute_marginals.return_value = degenerate_marginals
    estimator._isam2_handle = stub_handle

    with pytest.raises(EstimatorFatalError):
        estimator.current_estimate()

    # Read the snapshot only after the expected exception has been consumed.
    assert estimator.health_snapshot().isam2_state == IsamState.LOST
# ---------------------------------------------------------------------
# AC-7: keyframe_count accuracy
def test_ac7_keyframe_count_initially_zero() -> None:
    """No keyframes are reported before anything is seeded."""
    health = _build_estimator().health_snapshot()
    assert health.keyframe_count == 0
def test_ac7_keyframe_count_grows_with_seeded_keys() -> None:
    """Seeding a single prior makes ``keyframe_count`` equal to one."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    assert estimator.health_snapshot().keyframe_count == 1
# ---------------------------------------------------------------------
# AC-8: cov_norm_growing_for_s monotonicity
def test_ac8_cov_norm_growing_zero_with_constant_norm() -> None:
    """Five estimates over an unchanging covariance leave the growth counter at 0."""
    estimator = _build_estimator()
    _seed_prior(estimator)

    # The mock handle returns the same 0.1 * I covariance on every call.
    steady_marginals = mock.MagicMock()
    steady_marginals.marginalCovariance.return_value = np.eye(6, dtype=np.float64) * 0.1
    stub_handle = mock.MagicMock()
    stub_handle.last_anchor_age_ms.return_value = 0
    stub_handle.compute_marginals.return_value = steady_marginals
    estimator._isam2_handle = stub_handle

    repetitions = 5
    for _ in range(repetitions):
        estimator.current_estimate()

    assert estimator.health_snapshot().cov_norm_growing_for_s == 0.0
def test_ac8_cov_norm_growing_increments_under_rising_norm() -> None:
    """A strictly increasing covariance scale yields a positive growth duration."""
    estimator = _build_estimator()
    _seed_prior(estimator)

    rising_marginals = mock.MagicMock()
    stub_handle = mock.MagicMock()
    stub_handle.last_anchor_age_ms.return_value = 0
    stub_handle.compute_marginals.return_value = rising_marginals
    estimator._isam2_handle = stub_handle

    # Each estimate sees a larger scaled-identity covariance than the previous one.
    for scale in (0.05, 0.1, 0.2, 0.3, 0.5):
        rising_marginals.marginalCovariance.return_value = (
            np.eye(6, dtype=np.float64) * scale
        )
        estimator.current_estimate()

    assert estimator._cov_norm_growing_for_s() > 0.0
def test_ac8_cov_norm_growing_resets_on_drop() -> None:
    """A covariance drop after a rising run resets the growth duration to 0."""
    estimator = _build_estimator()
    _seed_prior(estimator)

    scripted_marginals = mock.MagicMock()
    stub_handle = mock.MagicMock()
    stub_handle.last_anchor_age_ms.return_value = 0
    stub_handle.compute_marginals.return_value = scripted_marginals
    estimator._isam2_handle = stub_handle

    identity = np.eye(6, dtype=np.float64)

    # Phase 1: monotonically rising covariance scale.
    for scale in (0.05, 0.1, 0.2, 0.3, 0.5):
        scripted_marginals.marginalCovariance.return_value = identity * scale
        estimator.current_estimate()

    # Phase 2: drop back to the smallest scale, which breaks the rising chain.
    scripted_marginals.marginalCovariance.return_value = identity * 0.05
    estimator.current_estimate()

    assert estimator._cov_norm_growing_for_s() == 0.0
# ---------------------------------------------------------------------
# AC-9: spoof_promotion_blocked via injected state machine
def test_ac9_default_spoof_promotion_blocked_false() -> None:
    """With no state machine attached, ``spoof_promotion_blocked`` is ``False``."""
    health = _build_estimator().health_snapshot()
    assert health.spoof_promotion_blocked is False
def test_ac9_spoof_promotion_blocked_from_state_machine() -> None:
    """An attached machine reporting ``True`` makes the snapshot flag ``True``."""
    estimator = _build_estimator()

    blocking_machine = mock.MagicMock()
    blocking_machine.current_label.return_value = PoseSourceLabel.DEAD_RECKONED
    blocking_machine.is_spoof_promotion_blocked.return_value = True
    estimator.attach_source_label_state_machine(blocking_machine)

    assert estimator.health_snapshot().spoof_promotion_blocked is True
def test_ac9_state_machine_drives_source_label() -> None:
    """The attached machine's ``current_label`` appears as the output's source label."""
    estimator = _build_estimator()
    _seed_prior(estimator)

    anchored_machine = mock.MagicMock()
    anchored_machine.current_label.return_value = PoseSourceLabel.SATELLITE_ANCHORED
    anchored_machine.is_spoof_promotion_blocked.return_value = False
    estimator.attach_source_label_state_machine(anchored_machine)

    estimate = estimator.current_estimate()
    assert estimate.source_label == PoseSourceLabel.SATELLITE_ANCHORED
def test_ac9_default_source_label_is_visual_propagated() -> None:
    """With no state machine attached, the label is ``VISUAL_PROPAGATED``."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    estimate = estimator.current_estimate()
    assert estimate.source_label == PoseSourceLabel.VISUAL_PROPAGATED
# ---------------------------------------------------------------------
# AC-10: last_satellite_anchor_age_ms pass-through
def test_ac10_last_satellite_anchor_age_ms_passthrough() -> None:
    """The handle's ``last_anchor_age_ms()`` value is copied onto the output."""
    estimator = _build_estimator()
    _seed_prior(estimator)

    anchor_age_ms = 1234
    healthy_marginals = mock.MagicMock()
    healthy_marginals.marginalCovariance.return_value = np.eye(6, dtype=np.float64) * 0.1
    stub_handle = mock.MagicMock()
    stub_handle.last_anchor_age_ms.return_value = anchor_age_ms
    stub_handle.compute_marginals.return_value = healthy_marginals
    estimator._isam2_handle = stub_handle

    estimate = estimator.current_estimate()
    assert estimate.last_satellite_anchor_age_ms == anchor_age_ms
def test_ac10_emitted_at_is_monotonic_ns() -> None:
    """``emitted_at`` is an ``int`` and does not decrease between consecutive calls."""
    estimator = _build_estimator()
    _seed_prior(estimator)
    earlier = estimator.current_estimate()
    later = estimator.current_estimate()
    assert isinstance(earlier.emitted_at, int)
    assert earlier.emitted_at <= later.emitted_at
# ---------------------------------------------------------------------
# Defensive — health_snapshot is cheap (no marginals call)
def test_health_snapshot_does_not_call_marginals() -> None:
    """``health_snapshot`` returns ``EstimatorHealth`` without calling ``compute_marginals``."""
    estimator = _build_estimator()
    # A fully mocked handle records every call made against it.
    spy_handle = mock.MagicMock()
    estimator._isam2_handle = spy_handle

    health = estimator.health_snapshot()

    spy_handle.compute_marginals.assert_not_called()
    assert isinstance(health, EstimatorHealth)