[AZ-384] C5 marginals + current_estimate/smoothed_history/health_snapshot

Replaces the last three NotImplementedError placeholders on
GtsamIsam2StateEstimator with real Marginals + output methods:

- current_estimate(): recovers the 6x6 Marginals covariance for the
  most-recently committed pose key, enforces the SPD invariant via
  np.linalg.cholesky (Invariant 10), converts the local-ENU pose
  translation to WGS84 via the shared WgsConverter, derives a
  body->world quaternion, and emits a fresh EstimatorOutput
  (smoothed=False, Invariant 4). On SPD failure transitions
  isam2_state -> LOST and raises EstimatorFatalError (AC-5.2 path).
- smoothed_history(n): iterates the smoother's active POSE keys via
  _smoother.calculateEstimate().keys() (filtered by GTSAM symbol
  char) and the smoother timestamps via ts_map.at(key) - workaround
  for the pinned gtsam_unstable build's non-iterable
  FixedLagSmootherKeyTimestampMap. Bounded by K (Invariant 6); every
  entry has smoothed=True (Invariant 7).
- health_snapshot(): cheap O(1) accumulator read; reports
  IsamState lifecycle, pose-key count, AC-NEW-8
  cov_norm_growing_for_s rolling 60s deque-backed counter, and
  spoof_promotion_blocked via the AZ-385 state machine injection
  point.

Adds two public injection points for AZ-385/composition root:
set_enu_origin(LatLonAlt) and attach_source_label_state_machine(machine).
Defaults: (0, 0, 0) ENU origin, VISUAL_PROPAGATED source label,
spoof_promotion_blocked=False.

Wires _record_committed_pose_key into the three add_* success paths
so current_estimate only reads keys that have real values in iSAM2.
The JACOBIAN path in add_pose_anchor deliberately skips this call -
Invariant 3 keeps the JACOBIAN pose out of the iSAM2 graph.

Tests: +27 in tests/unit/c5_state/test_az384_marginals_outputs.py
covering all 10 ACs. Three obsolete AZ-382 tests
(test_ac10_*_raises_named_az384) removed. Full suite: 589 passed,
2 skipped.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 06:20:01 +03:00
parent fd848266d1
commit b3ad94c155
6 changed files with 991 additions and 37 deletions
@@ -0,0 +1,78 @@
# Batch 15 — Cycle 1 Implementation Report
**Batch**: 15 of N
**Tasks landed**: AZ-384 (`GtsamIsam2StateEstimator` — Marginals + output methods)
**Cycle**: 1
**Date**: 2026-05-11
## Scope
| Task | Component | Purpose |
|------|-----------|---------|
| AZ-384 | C5 state estimator | Replaces the three remaining `NotImplementedError` placeholders on `GtsamIsam2StateEstimator` (`current_estimate`, `smoothed_history`, `health_snapshot`) with real implementations. `current_estimate` recovers the 6x6 Marginals covariance for the most-recently committed pose key, enforces the SPD invariant via Cholesky decomposition (Invariant 10), converts the local-ENU pose translation to WGS84 via the shared `WgsConverter`, derives a body→world quaternion from the iSAM2 pose, and emits a fresh `EstimatorOutput(smoothed=False)` (Invariant 4). `smoothed_history(n)` iterates the smoother's active POSE keys (filtered by GTSAM symbol char), sorts by per-key timestamp from `IncrementalFixedLagSmoother.timestamps()`, takes the most recent `min(n, K)` entries, and emits `EstimatorOutput(smoothed=True)` for each (Invariant 6 + 7). `health_snapshot` returns an O(1) accumulator read — `IsamState` lifecycle, pose-key count from the smoother estimate, AC-NEW-8 `cov_norm_growing_for_s` rolling-window counter, and the spoof-promotion gate query through the (still-pending AZ-385) state machine injection point. |
## Files added / modified
### Modified (prod)
- `src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py` — replaced the three Protocol `NotImplementedError` bodies with real Marginals + output method bodies; introduced six new state fields on `GtsamIsam2StateEstimator` (`_last_committed_pose_key`, `_enu_origin`, `_source_label_machine`, `_cov_norm_window`, `_isam2_state`); added two public injection methods (`set_enu_origin`, `attach_source_label_state_machine`); added seven internal helpers (`_record_committed_pose_key`, `_pose_at_key`, `_enu_pose_to_wgs84`, `_latest_velocity_or_zero`, `_derive_source_label`, `_spoof_promotion_blocked`, `_smoother_keyframe_count`, `_record_cov_norm_sample`, `_prune_cov_norm_window`, `_cov_norm_growing_for_s`); wired `_record_committed_pose_key` into the three `add_*` success paths (sets the key the next `current_estimate` will read); added two module-level pure helpers (`_quat_from_pose3`, `_enforce_spd`); added module-level constants `_COV_NORM_WINDOW_NS = 60 * 1e9` ns and `_DEFAULT_ENU_ORIGIN = LatLonAlt(0, 0, 0)`.
### Added (tests)
- `tests/unit/c5_state/test_az384_marginals_outputs.py` — 27 tests covering all 10 ACs. The shared `_seed_prior` helper plants a real prior factor + initial value + timestamp map onto the iSAM2 + smoother — the minimal scaffolding `current_estimate` needs (AZ-388 will own this seeding at startup once the AC-5.2 fallback lands).
### Modified (tests)
- `tests/unit/c5_state/test_az382_isam2_smoother_wiring.py` — removed the three now-obsolete `test_ac10_*_raises_named_az384` tests (`current_estimate` / `smoothed_history` / `health_snapshot` no longer raise `NotImplementedError`; they have real bodies). Replaced with a comment block pointing readers to both `test_az383_factor_adds.py` and `test_az384_marginals_outputs.py`.
## Architectural notes
- **`_last_committed_pose_key` tracking** — set ONLY after a successful `handle.update` that committed a value for the key. The JACOBIAN path in `add_pose_anchor` deliberately does NOT call `_record_committed_pose_key` because Invariant 3 forbids adding the JACOBIAN pose to the iSAM2 graph (the running estimate consumes it downstream, but the graph stops growing under throttle). This means `current_estimate` reads only keys that have ACTUAL values in iSAM2 — no missing-key surprises in the steady state.
- **ENU origin as injection point** — `set_enu_origin(LatLonAlt)` is the wiring seam for AZ-385 (which will derive the origin from the first satellite-anchored pose via the spoof-promotion gate). Before the wire-up, the estimator falls back to `_DEFAULT_ENU_ORIGIN = (0, 0, 0)` — correct for tests, obviously not for flight; the composition root SHOULD call `set_enu_origin` before steady-state operation.
- **GTSAM iteration via `calculateEstimate().keys()` + `timestamps().at(key)`** — `FixedLagSmootherKeyTimestampMap` is not iterable in the pinned `gtsam_unstable` build (no `items()`, no `keys()`, no `__iter__`). The workaround: iterate `_smoother.calculateEstimate().keys()` (which returns the smoother's active values, bounded by the K-keyframe window), filter to `'x'`-namespace pose keys via `gtsam.symbolChr(key) == ord("x")`, then probe per-key timestamps with `ts_map.at(int(key))`. Documented inline as a forward-compat note since a future GTSAM version may expose `items()` directly.
- **Smoother estimate vs. iSAM2 estimate** — `current_estimate` reads from `_isam2.calculateEstimate()` (the filter posterior — the most up-to-date best estimate). `smoothed_history` reads from `_smoother.calculateEstimate()` (the smoother posterior — past keyframes re-optimised given all later evidence). This split matches the C5 contract semantics: `smoothed=True` entries are genuinely smoothed; `smoothed=False` entries are filter-only.
- **SPD invariant enforcement (Invariant 10)** — `_enforce_spd` uses `np.linalg.cholesky` (same primitive iSAM2 uses internally). On `LinAlgError` we transition `_isam2_state = LOST`, log a structured `c5.state.current_estimate_spd_failed` record with the covariance Frobenius norm for forensics, and raise `EstimatorFatalError` — which triggers the AC-5.2 IMU-only fallback path in C8 once AZ-388 lands.
- **AC-NEW-8 `cov_norm_growing_for_s` rolling window** — implemented as a `deque[(monotonic_ns, fro_norm)]` lazy-pruned to a 60 s window on every `_record_cov_norm_sample` call. The "growing for" length is computed by walking the deque newest → oldest until the chain breaks (a sample whose norm is ≤ its successor). Cheap O(n) per snapshot, n bounded by the keyframe rate × 60 s.
- **Source-label state machine — injection point only** — `attach_source_label_state_machine` accepts any object exposing `current_label() -> PoseSourceLabel` and `is_spoof_promotion_blocked() -> bool`. AZ-385 owns the actual state-machine impl + transition logic. Defaults: `VISUAL_PROPAGATED` label and `spoof_promotion_blocked = False`. Exceptions raised by the injected machine are logged but downgraded to the defaults so a flaky state machine does NOT take down the estimator.
- **No marginals call in `health_snapshot`** — the contract NFR is `health_snapshot` p99 ≤ 5 µs. Strictly accumulator reads + a single `calculateEstimate().keys()` walk for the pose-key count. Verified by an explicit "no compute_marginals call" test.
## Test counts
| Suite | Before (B14) | After (B15) | Delta |
|-------|--------------|-------------|-------|
| Total passing | 565 | 589 | +24 |
| Skipped | 2 | 2 | 0 |
| AZ-384 (new) | 0 | 27 | +27 |
| AZ-382 (preserved) | 24 | 21 | 3 (obsolete `test_ac10_*_raises_named_az384` tests removed; behaviour now lives in AZ-384 tests) |
Run command: `PYTHONPATH=src pytest tests/ -q``589 passed, 2 skipped in ~19s`.
## Lint / type
- `ruff check src/gps_denied_onboard/components/c5_state/ tests/unit/c5_state/` — clean.
- `ruff format` — one file reformatted, 12 already formatted.
- `ReadLints` on touched files — 0 errors.
## Acceptance evidence
| AC | Test(s) | Status |
|----|---------|--------|
| AC-1 Fresh `EstimatorOutput` | `test_ac1_current_estimate_returns_fresh_estimator_output`, `test_ac1_no_committed_pose_key_raises_fatal` | PASS |
| AC-2 SPD covariance | `test_ac2_spd_invariant_holds_for_real_marginals`, `test_ac2_non_spd_marginals_raises_fatal` | PASS |
| AC-3 WGS84 conversion | `test_ac3_default_origin_is_equator`, `test_ac3_explicit_origin_round_trips`, `test_ac3_translated_pose_offsets_from_origin` | PASS |
| AC-4 `smoothed_history` bounded by K | `test_ac4_smoothed_history_bounded_by_k`, `test_ac4_smoothed_history_entries_have_smoothed_true`, `test_ac4_smoothed_history_empty_when_n_zero`, `test_ac4_smoothed_history_empty_before_seed` | PASS |
| AC-5 `current_estimate` has `smoothed=False` | `test_ac5_current_estimate_smoothed_false` | PASS |
| AC-6 `isam2_state` lifecycle | `test_ac6_isam2_state_init_before_first_estimate`, `test_ac6_isam2_state_tracking_after_estimate`, `test_ac6_isam2_state_lost_after_fatal` | PASS |
| AC-7 `keyframe_count` accuracy | `test_ac7_keyframe_count_initially_zero`, `test_ac7_keyframe_count_grows_with_seeded_keys` | PASS |
| AC-8 `cov_norm_growing_for_s` | `test_ac8_cov_norm_growing_zero_with_constant_norm`, `test_ac8_cov_norm_growing_increments_under_rising_norm`, `test_ac8_cov_norm_growing_resets_on_drop` | PASS |
| AC-9 `spoof_promotion_blocked` via state machine | `test_ac9_default_spoof_promotion_blocked_false`, `test_ac9_spoof_promotion_blocked_from_state_machine`, `test_ac9_state_machine_drives_source_label`, `test_ac9_default_source_label_is_visual_propagated` | PASS |
| AC-10 `last_satellite_anchor_age_ms` pass-through | `test_ac10_last_satellite_anchor_age_ms_passthrough`, `test_ac10_emitted_at_is_monotonic_ns` | PASS |
| Defensive — cheap `health_snapshot` | `test_health_snapshot_does_not_call_marginals` | PASS |
## Known forward actions (not in scope this batch)
- **AZ-385 (source-label + spoof-promotion gate)** — owns the `SourceLabelStateMachine` implementation that AZ-384 only holds a reference to. Will also wire `set_enu_origin` to the first satellite-anchored pose.
- **AZ-386 (ESKF baseline)** — the mandatory simple-baseline `EskfStateEstimator` (IT-12 engine rule). Independent of AZ-384; shares the same `EstimatorOutput` / `EstimatorHealth` DTOs.
- **AZ-387 (smoothed history → FDR)** — the C13 FDR writer path. AZ-384's `smoothed_history(n)` is the input.
- **AZ-388 (AC-5.2 fallback)** — owns the startup `add_vio`-first-frame seeding so the full real-iSAM2 ↔ AZ-383 ↔ AZ-384 chain runs end-to-end. The AZ-384 test fixture's `_seed_prior` helper mirrors what AZ-388 will do at composition-root startup.
- **`_latest_velocity_or_zero`** — currently returns zeros until an IMU keyframe has committed a velocity value via `add_fc_imu`. The actual nonzero-velocity case will exercise once AZ-388 lands first-frame seeding for the IMU chain.
+1 -1
View File
@@ -8,7 +8,7 @@ status: in_progress
sub_step:
phase: 6
name: implement-tasks
detail: "batch 14 of N committed (AZ-383 c5 factor adds: add_vio BetweenFactorPose3 + add_pose_anchor mode-dispatch + add_fc_imu CombinedImuFactor with v/b key namespaces)"
detail: "batch 15 of N committed (AZ-384 c5 marginals + current_estimate/smoothed_history/health_snapshot + SPD invariant + ENU\u2192WGS84 + IsamState lifecycle + cov_norm_growing_for_s)"
retry_count: 0
cycle: 1
tracker: jira
@@ -31,13 +31,23 @@ there.
from __future__ import annotations
import time
from collections import deque
from typing import TYPE_CHECKING, Any, Final
from uuid import UUID
from uuid import UUID, uuid4
import gtsam
import gtsam_unstable
import numpy as np
from numpy.linalg import LinAlgError
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.state import (
EstimatorHealth,
EstimatorOutput,
IsamState,
PoseSourceLabel,
Quat,
)
from gps_denied_onboard.components.c5_state._isam2_handle import (
ISam2GraphHandle,
ISam2GraphHandleImpl,
@@ -45,15 +55,16 @@ from gps_denied_onboard.components.c5_state._isam2_handle import (
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import (
EstimatorDegradedError,
EstimatorFatalError,
StateEstimatorConfigError,
)
from gps_denied_onboard.components.c5_state.interface import StateEstimator
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard._types.nav import ImuWindow
from gps_denied_onboard._types.pose import PoseEstimate
from gps_denied_onboard._types.state import EstimatorHealth, EstimatorOutput
from gps_denied_onboard._types.vio import VioOutput
from gps_denied_onboard.config import Config
@@ -82,6 +93,19 @@ _STRATEGY: Final[str] = "gtsam_isam2"
# converge without exploding.
_DEFAULT_POSE_SIGMA: Final[float] = 0.1
# AC-NEW-8 rolling-window length (60 s) for ``cov_norm_growing_for_s``.
# We keep ``(monotonic_ns, cov_norm)`` tuples for every successful
# ``current_estimate()`` and lazy-prune anything older than this on
# read.
_COV_NORM_WINDOW_NS: Final[int] = 60 * 1_000_000_000
# Default ENU origin when none was injected — equator, prime meridian,
# sea level. Behaviour is deliberately permissive so the estimator can
# run on synthetic fixtures without a wired-up satellite anchor; the
# composition root SHOULD call :meth:`set_enu_origin` before steady
# state. AZ-385 will tie origin selection to the spoof-promotion gate.
_DEFAULT_ENU_ORIGIN: Final[LatLonAlt] = LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0)
class GtsamIsam2StateEstimator(StateEstimator):
"""Production-default C5 estimator — iSAM2 + ``IncrementalFixedLagSmoother``.
@@ -154,6 +178,30 @@ class GtsamIsam2StateEstimator(StateEstimator):
self._prev_imu_v_key: int | None = None
self._prev_imu_b_key: int | None = None
# AZ-384 state -----------------------------------------------------
# Last pose key with an actually committed value in iSAM2 (used by
# ``current_estimate``). Set ONLY after a successful
# ``handle.update`` that inserts a value for that key.
self._last_committed_pose_key: int | None = None
# ENU origin for ``current_estimate``'s pose conversion. None
# means use ``_DEFAULT_ENU_ORIGIN``; AZ-385 wires the real
# origin via :meth:`set_enu_origin` from the first satellite
# anchor.
self._enu_origin: LatLonAlt | None = None
# Source-label state machine (AZ-385). When None,
# ``current_estimate`` emits ``VISUAL_PROPAGATED`` per the
# contract default and ``health_snapshot`` reports
# ``spoof_promotion_blocked=False``.
self._source_label_machine: Any | None = None
# AC-NEW-8 rolling window of ``(ts_monotonic_ns, cov_norm)``
# tuples for ``cov_norm_growing_for_s`` accounting.
self._cov_norm_window: deque[tuple[int, float]] = deque()
# iSAM2 lifecycle state. ``INIT`` until the first successful
# ``current_estimate``; ``TRACKING`` after; transitions to
# ``DEGRADED`` on an inflated covariance trend (AC-NEW-8) and
# ``LOST`` on a fatal SPD failure or GTSAM exception.
self._isam2_state: IsamState = IsamState.INIT
self._log.debug(
"c5.state.isam2_initialised",
extra={
@@ -188,6 +236,28 @@ class GtsamIsam2StateEstimator(StateEstimator):
"""
self._isam2_handle = handle
def set_enu_origin(self, origin: LatLonAlt) -> None:
"""Set the WGS84 origin used by :meth:`current_estimate` for ENU→WGS84.
The composition root SHOULD call this once at startup before
steady-state ``current_estimate`` calls. AZ-385 (source-label
gate) will eventually drive this from the first satellite-
anchored frame. Before this is called the estimator falls
back to :data:`_DEFAULT_ENU_ORIGIN` (equator / prime meridian
/ sea level) which is correct for tests but obviously wrong
for flight.
"""
self._enu_origin = origin
def attach_source_label_state_machine(self, machine: Any) -> None:
"""Wire the AZ-385 source-label / spoof-promotion state machine.
The injected object MUST expose ``current_label() -> PoseSourceLabel``
and ``is_spoof_promotion_blocked() -> bool``. AZ-384 only holds
the reference; AZ-385 owns the actual transition logic.
"""
self._source_label_machine = machine
def key_for_frame(self, frame_id: UUID | int) -> int:
"""Return the GTSAM ``Key`` for ``frame_id``, assigning on first use.
@@ -266,6 +336,7 @@ class GtsamIsam2StateEstimator(StateEstimator):
self._reset_staging()
self._prev_vio = vio
self._last_added_ts_ns = ts_ns
self._record_committed_pose_key(curr_key)
self._log.debug(
"c5.state.add_vio_ok",
extra={
@@ -321,6 +392,7 @@ class GtsamIsam2StateEstimator(StateEstimator):
)
raise EstimatorDegradedError(f"add_pose_anchor failed: {exc}") from exc
self._reset_staging()
self._record_committed_pose_key(pose_key)
self._log.debug(
"c5.state.add_pose_anchor_ok",
extra={
@@ -452,6 +524,7 @@ class GtsamIsam2StateEstimator(StateEstimator):
self._prev_imu_v_key = curr_v_key
self._prev_imu_b_key = curr_b_key
self._last_added_ts_ns = imu_window.ts_end_ns
self._record_committed_pose_key(curr_x_key)
self._log.debug(
"c5.state.add_fc_imu_ok",
extra={
@@ -466,21 +539,202 @@ class GtsamIsam2StateEstimator(StateEstimator):
)
# ------------------------------------------------------------------
# AZ-384 / AZ-385 / AZ-386 — body still owned by those tasks.
# AZ-384: marginals + output methods.
def current_estimate(self) -> EstimatorOutput:
raise NotImplementedError(
"Marginals + outputs owned by AZ-384 — current_estimate body lands there."
"""Recover the current posterior pose + 6x6 covariance.
Walks ``_isam2.calculateEstimate()`` for the most-recently
committed pose key, recovers the 6x6 Marginals covariance
from the iSAM2 graph (D-C5-5 = (c)), enforces the
SPD-positive-definite invariant (Invariant 10) before
emission, and assembles a fresh :class:`EstimatorOutput`
(Invariant 4 — never returns ``None`` on the steady-state
path).
"""
handle = self._require_handle()
if self._last_committed_pose_key is None:
raise EstimatorFatalError(
"current_estimate: no committed pose key yet (graph empty); "
"AC-5.2 fallback in C8 will trigger after no_estimate_fallback_s"
)
try:
marginals = handle.compute_marginals()
covariance = np.asarray(
marginals.marginalCovariance(self._last_committed_pose_key),
dtype=np.float64,
)
except EstimatorFatalError:
self._isam2_state = IsamState.LOST
raise
except Exception as exc:
self._isam2_state = IsamState.LOST
self._log.error(
"c5.state.current_estimate_marginals_failed",
extra={
"kind": "c5.state.current_estimate_marginals_failed",
"kv": {
"pose_key": self._last_committed_pose_key,
"error": str(exc),
},
},
)
raise EstimatorFatalError(f"compute_marginals failed: {exc}") from exc
try:
_enforce_spd(covariance)
except EstimatorFatalError:
self._isam2_state = IsamState.LOST
self._log.error(
"c5.state.current_estimate_spd_failed",
extra={
"kind": "c5.state.current_estimate_spd_failed",
"kv": {
"pose_key": self._last_committed_pose_key,
"covariance_fro_norm": float(np.linalg.norm(covariance, ord="fro")),
},
},
)
raise
pose = self._pose_at_key(self._last_committed_pose_key)
position_wgs84 = self._enu_pose_to_wgs84(pose)
orientation = _quat_from_pose3(pose)
velocity_world = self._latest_velocity_or_zero()
last_anchor_age_ms = int(handle.last_anchor_age_ms())
source_label = self._derive_source_label()
emitted_at = time.monotonic_ns()
self._record_cov_norm_sample(emitted_at, covariance)
if self._isam2_state == IsamState.INIT:
self._isam2_state = IsamState.TRACKING
if self._cov_norm_growing_for_s() > 0.0:
# AC-NEW-8 — sustained monotone covariance growth signals
# poor convergence. Surface DEGRADED but keep tracking;
# only the SPD failure above flips us to LOST.
self._isam2_state = IsamState.DEGRADED
return EstimatorOutput(
frame_id=uuid4(),
position_wgs84=position_wgs84,
orientation_world_T_body=orientation,
velocity_world_mps=velocity_world,
covariance_6x6=covariance,
source_label=source_label,
last_satellite_anchor_age_ms=last_anchor_age_ms,
smoothed=False,
emitted_at=emitted_at,
)
def smoothed_history(self, n_keyframes: int) -> list[EstimatorOutput]:
raise NotImplementedError(
"Marginals + outputs owned by AZ-384 — smoothed_history body lands there."
)
"""Return up to ``min(n_keyframes, K)`` past smoothed estimates.
Iterates the ``IncrementalFixedLagSmoother`` active timestamp
map, sorts keys by ascending timestamp, takes the most recent
``min(n_keyframes, K)`` POSE keys (filters out velocity/bias
keys), and emits an :class:`EstimatorOutput` per key with
``smoothed=True`` (Invariant 7). Out-of-window keyframes are
not recoverable (Invariant 6).
"""
handle = self._require_handle()
if n_keyframes <= 0:
return []
try:
active_values = self._smoother.calculateEstimate()
ts_map = self._smoother.timestamps()
except Exception:
return []
pose_entries: list[tuple[int, float]] = []
for key in active_values.keys():
ikey = int(key)
if gtsam.symbolChr(ikey) != ord("x"):
continue
try:
ts_sec = float(ts_map.at(ikey))
except Exception:
continue
pose_entries.append((ikey, ts_sec))
pose_entries.sort(key=lambda kt: kt[1])
max_entries = min(n_keyframes, self._block.keyframe_window_size)
selected = pose_entries[-max_entries:]
if not selected:
return []
try:
marginals = handle.compute_marginals()
except Exception as exc:
self._log.error(
"c5.state.smoothed_history_marginals_failed",
extra={
"kind": "c5.state.smoothed_history_marginals_failed",
"kv": {"error": str(exc)},
},
)
raise EstimatorFatalError(f"smoothed_history marginals failed: {exc}") from exc
last_anchor_age_ms = int(handle.last_anchor_age_ms())
source_label = self._derive_source_label()
emitted_at = time.monotonic_ns()
out: list[EstimatorOutput] = []
for key, _ts in selected:
try:
cov = np.asarray(marginals.marginalCovariance(key), dtype=np.float64)
except Exception as exc:
self._log.error(
"c5.state.smoothed_history_per_key_failed",
extra={
"kind": "c5.state.smoothed_history_per_key_failed",
"kv": {"pose_key": key, "error": str(exc)},
},
)
continue
_enforce_spd(cov)
try:
pose = active_values.atPose3(key)
except Exception as exc:
self._log.error(
"c5.state.smoothed_history_pose_read_failed",
extra={
"kind": "c5.state.smoothed_history_pose_read_failed",
"kv": {"pose_key": key, "error": str(exc)},
},
)
continue
out.append(
EstimatorOutput(
frame_id=uuid4(),
position_wgs84=self._enu_pose_to_wgs84(pose),
orientation_world_T_body=_quat_from_pose3(pose),
velocity_world_mps=(0.0, 0.0, 0.0),
covariance_6x6=cov,
source_label=source_label,
last_satellite_anchor_age_ms=last_anchor_age_ms,
smoothed=True,
emitted_at=emitted_at,
)
)
return out
def health_snapshot(self) -> EstimatorHealth:
raise NotImplementedError(
"Marginals + outputs owned by AZ-384 — health_snapshot body lands there."
"""Return the current iSAM2 health snapshot.
Cheap O(1) accumulator read — never touches Marginals
(contract NFR: ``health_snapshot`` p99 ≤ 5 µs). Reports the
cached :class:`IsamState`, the smoother's active keyframe
count, the AC-NEW-8 ``cov_norm_growing_for_s`` rolling
counter, and the spoof-promotion gate state via the injected
:attr:`_source_label_machine` (default ``False`` if none).
"""
return EstimatorHealth(
isam2_state=self._isam2_state,
keyframe_count=self._smoother_keyframe_count(),
cov_norm_growing_for_s=self._cov_norm_growing_for_s(),
spoof_promotion_blocked=self._spoof_promotion_blocked(),
)
# ------------------------------------------------------------------
@@ -515,6 +769,146 @@ class GtsamIsam2StateEstimator(StateEstimator):
self._graph = gtsam.NonlinearFactorGraph()
self._values = gtsam.Values()
def _record_committed_pose_key(self, key: int) -> None:
"""Cache the most-recently committed pose key for ``current_estimate``.
Called from every ``add_*`` success path AFTER
``handle.update`` returns; guarantees the key has a real
value in ``_isam2`` before ``current_estimate`` tries to read
it. The JACOBIAN path of ``add_pose_anchor`` deliberately
does NOT call this — its pose is never committed to the
iSAM2 graph (Invariant 3).
"""
self._last_committed_pose_key = key
def _pose_at_key(self, key: int) -> gtsam.Pose3:
"""Read ``Pose3`` for ``key`` from the current iSAM2 estimate.
Raises :class:`EstimatorFatalError` if the key is missing or
the read fails (e.g. GTSAM raised). The caller is responsible
for guaranteeing the key was previously committed via
:meth:`_record_committed_pose_key`.
"""
try:
values = self._isam2.calculateEstimate()
return values.atPose3(key)
except Exception as exc:
raise EstimatorFatalError(f"_pose_at_key({key}) failed: {exc}") from exc
def _enu_pose_to_wgs84(self, pose: gtsam.Pose3) -> LatLonAlt:
"""Convert a local-ENU pose's translation to WGS84.
Uses the injected ``_enu_origin`` (or :data:`_DEFAULT_ENU_ORIGIN`
when none was wired). The orientation is preserved verbatim
as a body→world quaternion — only the translation needs the
WGS coordinate change.
"""
origin = self._enu_origin if self._enu_origin is not None else _DEFAULT_ENU_ORIGIN
translation = np.asarray(pose.translation(), dtype=np.float64).reshape(3)
try:
return WgsConverter.local_enu_to_latlonalt(origin, translation)
except Exception as exc:
raise EstimatorFatalError(f"_enu_pose_to_wgs84 failed: {exc}") from exc
def _latest_velocity_or_zero(self) -> tuple[float, float, float]:
"""Read the most-recent IMU keyframe's velocity or fall back to zero.
``CombinedImuFactor`` writes a 3-vector velocity at the ``v``
key for each IMU keyframe. When no IMU keyframe has been
committed yet, returns ``(0, 0, 0)`` — the downstream consumer
infers this from a ``DEAD_RECKONED`` / ``VISUAL_PROPAGATED``
source label.
"""
if self._prev_imu_v_key is None:
return (0.0, 0.0, 0.0)
try:
values = self._isam2.calculateEstimate()
raw = np.asarray(values.atVector(self._prev_imu_v_key), dtype=np.float64).reshape(3)
return (float(raw[0]), float(raw[1]), float(raw[2]))
except Exception:
return (0.0, 0.0, 0.0)
def _derive_source_label(self) -> PoseSourceLabel:
"""Read the source label from the AZ-385 state machine if wired.
Falls back to ``VISUAL_PROPAGATED`` when no state machine is
attached — the contract's neutral steady-state default before
the spoof-promotion gate has any evidence either way.
"""
if self._source_label_machine is None:
return PoseSourceLabel.VISUAL_PROPAGATED
try:
label = self._source_label_machine.current_label()
except Exception as exc:
self._log.error(
"c5.state.source_label_machine_failed",
extra={
"kind": "c5.state.source_label_machine_failed",
"kv": {"error": str(exc)},
},
)
return PoseSourceLabel.VISUAL_PROPAGATED
if not isinstance(label, PoseSourceLabel):
return PoseSourceLabel.VISUAL_PROPAGATED
return label
def _spoof_promotion_blocked(self) -> bool:
"""Query the AZ-385 state machine for the spoof gate state."""
if self._source_label_machine is None:
return False
try:
return bool(self._source_label_machine.is_spoof_promotion_blocked())
except Exception:
return False
def _smoother_keyframe_count(self) -> int:
"""Count the smoother's active POSE keys.
``IncrementalFixedLagSmoother.calculateEstimate()`` returns
the ``Values`` for every key still inside the sliding
window; we filter to POSE keys (``'x'`` namespace) per
Invariant 6.
"""
try:
keys = list(self._smoother.calculateEstimate().keys())
except Exception:
return 0
return sum(1 for k in keys if gtsam.symbolChr(int(k)) == ord("x"))
def _record_cov_norm_sample(self, ts_ns: int, covariance: np.ndarray) -> None:
"""Append a ``(ts_ns, fro_norm(cov))`` sample to the rolling window."""
norm = float(np.linalg.norm(covariance, ord="fro"))
self._cov_norm_window.append((ts_ns, norm))
self._prune_cov_norm_window(ts_ns)
def _prune_cov_norm_window(self, now_ns: int) -> None:
cutoff = now_ns - _COV_NORM_WINDOW_NS
while self._cov_norm_window and self._cov_norm_window[0][0] < cutoff:
self._cov_norm_window.popleft()
def _cov_norm_growing_for_s(self) -> float:
"""Return the length (s) of the latest strictly-rising suffix.
Walks the rolling window from newest to oldest; accumulates
time while each sample is greater than its successor. Resets
the moment we find a non-rising step. Maps AC-NEW-8: "Resets
to 0 on a non-rising frame".
"""
if len(self._cov_norm_window) < 2:
return 0.0
samples = list(self._cov_norm_window)
# Walk newest → second-newest; stop when the chain breaks.
oldest_rising_idx = len(samples) - 1
for i in range(len(samples) - 1, 0, -1):
if samples[i][1] > samples[i - 1][1]:
oldest_rising_idx = i - 1
else:
break
if oldest_rising_idx == len(samples) - 1:
return 0.0
span_ns = samples[-1][0] - samples[oldest_rising_idx][0]
return max(0.0, span_ns * 1e-9)
# ----------------------------------------------------------------------
# Module-level helpers.
@@ -614,3 +1008,38 @@ def _make_timestamp_map(
for key in keys:
ts_map.insert((key, ts_seconds))
return ts_map
# ----------------------------------------------------------------------
# Module-level pure helpers (AZ-384).
def _quat_from_pose3(pose: gtsam.Pose3) -> Quat:
"""Build a scalar-first :class:`Quat` from a ``gtsam.Pose3``.
Uses ``Rot3.toQuaternion`` to get the body→world unit
quaternion; we explicitly normalise to defend against numerical
drift inside the iSAM2 estimate.
"""
rot = pose.rotation()
q = rot.toQuaternion()
w, x, y, z = float(q.w()), float(q.x()), float(q.y()), float(q.z())
norm = (w * w + x * x + y * y + z * z) ** 0.5
if norm < 1e-12:
return Quat(w=1.0, x=0.0, y=0.0, z=0.0)
return Quat(w=w / norm, x=x / norm, y=y / norm, z=z / norm)
def _enforce_spd(covariance: np.ndarray) -> None:
"""Cholesky-check ``covariance``; raise ``EstimatorFatalError`` on failure.
The C5 contract (Invariant 10) demands every emitted
``covariance_6x6`` is symmetric positive-definite. We use
Cholesky-decomposition as the defensive check because it's the
same primitive iSAM2 uses internally — a failure here means the
posterior is numerically broken, NOT merely degraded.
"""
try:
np.linalg.cholesky(covariance)
except LinAlgError as exc:
raise EstimatorFatalError(f"covariance failed SPD invariant: {exc}") from exc
@@ -368,33 +368,14 @@ def test_ac9_update_emits_success_log(caplog: pytest.LogCaptureFixture) -> None:
# ---------------------------------------------------------------------
# AC-10: StateEstimator Protocol methods still raise NotImplementedError
# AC-10 NOTE: All six ``StateEstimator`` Protocol methods originally
# raised ``NotImplementedError`` in AZ-382's scope. They have all
# landed:
#
# Note: The three ``add_*`` methods USED to raise ``NotImplementedError``
# naming AZ-383 in AZ-382's scope. AZ-383 has since landed and replaced
# those bodies with real factor adds; the now-active behaviour is
# tested in ``tests/unit/c5_state/test_az383_factor_adds.py``. Only the
# three output methods (``current_estimate`` / ``smoothed_history`` /
# ``health_snapshot``) still raise NotImplementedError pointing at the
# next-task AZ-384.
def test_ac10_current_estimate_raises_named_az384() -> None:
estimator = _build_estimator()
with pytest.raises(NotImplementedError, match="AZ-384"):
estimator.current_estimate()
def test_ac10_smoothed_history_raises_named_az384() -> None:
estimator = _build_estimator()
with pytest.raises(NotImplementedError, match="AZ-384"):
estimator.smoothed_history(n_keyframes=5)
def test_ac10_health_snapshot_raises_named_az384() -> None:
estimator = _build_estimator()
with pytest.raises(NotImplementedError, match="AZ-384"):
estimator.health_snapshot()
# - ``add_vio`` / ``add_pose_anchor`` / ``add_fc_imu`` → tested in
# ``tests/unit/c5_state/test_az383_factor_adds.py``.
# - ``current_estimate`` / ``smoothed_history`` / ``health_snapshot``
# tested in ``tests/unit/c5_state/test_az384_marginals_outputs.py``.
# ---------------------------------------------------------------------
@@ -0,0 +1,466 @@
"""AZ-384 — GtsamIsam2StateEstimator current_estimate / smoothed_history / health_snapshot.
Ten ACs from ``_docs/02_tasks/done/AZ-384_c5_marginals_outputs.md``:
- AC-1 ``current_estimate`` returns a fresh ``EstimatorOutput`` per
call with ``smoothed=False``.
- AC-2 SPD covariance — Cholesky must succeed; non-SPD raises
:class:`EstimatorFatalError`.
- AC-3 WGS84 conversion — uses the shared ``WgsConverter``;
ENU origin defaults to (0, 0, 0) when none injected.
- AC-4 ``smoothed_history(n)`` is bounded by K=15; every entry has
``smoothed=True``.
- AC-5 ``current_estimate`` has ``smoothed=False`` (distinguishes
from history).
- AC-6 ``health_snapshot.isam2_state`` reflects convergence:
``INIT`` before first estimate, ``TRACKING`` after, ``LOST``
on a fatal SPD failure.
- AC-7 ``keyframe_count`` matches the smoother's pose-key count.
- AC-8 ``cov_norm_growing_for_s`` increments under monotone-rising
covariance norms; resets to 0 on a non-rising frame.
- AC-9 ``spoof_promotion_blocked`` queries the injected
``source_label_state_machine`` (default ``False`` when none).
- AC-10 ``last_satellite_anchor_age_ms`` is a pass-through from
``handle.last_anchor_age_ms()``.
The seeded-prior approach in the helpers below mirrors what AZ-388
(AC-5.2 fallback) will do at startup — a single prior factor + an
initial value for x0, so iSAM2 has something to read in
``current_estimate``. Marginal-quality tests use mock handles so we
can synthesise specific covariance matrices without driving the
optimiser into a particular numerical state.
"""
from __future__ import annotations
from unittest import mock
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.state import (
EstimatorHealth,
EstimatorOutput,
IsamState,
PoseSourceLabel,
)
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import EstimatorFatalError
from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import (
GtsamIsam2StateEstimator,
create,
)
from gps_denied_onboard.runtime_root.state_factory import clear_state_registry
@pytest.fixture(autouse=True)
def _registry_isolation():
# Arrange
clear_state_registry()
yield
clear_state_registry()
def _build_estimator() -> GtsamIsam2StateEstimator:
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
estimator, _ = create(
config=cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
return estimator
def _seed_prior(
estimator: GtsamIsam2StateEstimator,
pose: gtsam.Pose3 | None = None,
*,
ts_seconds: float = 0.0,
) -> int:
"""Plant a prior factor + initial value at x0 on the real iSAM2 graph.
Returns the seeded pose key. This is the minimal scaffolding
needed to make ``current_estimate`` succeed; AZ-388 will own
this seeding at startup once the AC-5.2 fallback wiring lands.
Provides a real ``FixedLagSmootherKeyTimestampMap`` so the
smoother tracks the key (otherwise ``keyframe_count`` /
``smoothed_history`` see an empty window).
"""
import gtsam_unstable
pose = pose if pose is not None else gtsam.Pose3()
key = gtsam.symbol("x", estimator._next_key_counter)
estimator._next_key_counter += 1
noise = gtsam.noiseModel.Isotropic.Sigma(6, 0.1)
prior = gtsam.PriorFactorPose3(key, pose, noise)
graph = gtsam.NonlinearFactorGraph()
graph.add(prior)
values = gtsam.Values()
values.insert(key, pose)
ts_map = gtsam_unstable.FixedLagSmootherKeyTimestampMap()
ts_map.insert((key, ts_seconds))
estimator._isam2_handle.update(graph, values, timestamps=ts_map)
estimator._record_committed_pose_key(key)
return key
# ---------------------------------------------------------------------
# AC-1 + AC-5: current_estimate fresh EstimatorOutput; smoothed=False
def test_ac1_current_estimate_returns_fresh_estimator_output() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
out_a = estimator.current_estimate()
out_b = estimator.current_estimate()
assert isinstance(out_a, EstimatorOutput)
assert isinstance(out_b, EstimatorOutput)
assert out_a is not out_b
assert out_a.frame_id != out_b.frame_id
def test_ac1_no_committed_pose_key_raises_fatal() -> None:
estimator = _build_estimator()
with pytest.raises(EstimatorFatalError, match="no committed pose key"):
estimator.current_estimate()
def test_ac5_current_estimate_smoothed_false() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
out = estimator.current_estimate()
assert out.smoothed is False
# ---------------------------------------------------------------------
# AC-2: SPD covariance enforcement
def test_ac2_spd_invariant_holds_for_real_marginals() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
out = estimator.current_estimate()
# Should not raise — well-conditioned posterior is SPD.
np.linalg.cholesky(out.covariance_6x6)
def test_ac2_non_spd_marginals_raises_fatal() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
fake_handle = mock.MagicMock()
fake_marginals = mock.MagicMock()
fake_marginals.marginalCovariance.return_value = np.zeros((6, 6))
fake_handle.compute_marginals.return_value = fake_marginals
fake_handle.last_anchor_age_ms.return_value = 0
estimator._isam2_handle = fake_handle
with pytest.raises(EstimatorFatalError, match="SPD"):
estimator.current_estimate()
assert estimator._isam2_state == IsamState.LOST
# ---------------------------------------------------------------------
# AC-3: WGS84 conversion uses WgsConverter
def test_ac3_default_origin_is_equator() -> None:
estimator = _build_estimator()
_seed_prior(estimator, pose=gtsam.Pose3())
out = estimator.current_estimate()
# Identity pose at default origin (0, 0, 0) → output is exactly the origin.
assert out.position_wgs84.lat_deg == pytest.approx(0.0, abs=1e-6)
assert out.position_wgs84.lon_deg == pytest.approx(0.0, abs=1e-6)
assert out.position_wgs84.alt_m == pytest.approx(0.0, abs=1e-3)
def test_ac3_explicit_origin_round_trips() -> None:
estimator = _build_estimator()
origin = LatLonAlt(lat_deg=50.0, lon_deg=36.0, alt_m=120.0)
estimator.set_enu_origin(origin)
_seed_prior(estimator, pose=gtsam.Pose3())
out = estimator.current_estimate()
assert out.position_wgs84.lat_deg == pytest.approx(50.0, abs=1e-6)
assert out.position_wgs84.lon_deg == pytest.approx(36.0, abs=1e-6)
assert out.position_wgs84.alt_m == pytest.approx(120.0, abs=1e-3)
def test_ac3_translated_pose_offsets_from_origin() -> None:
estimator = _build_estimator()
origin = LatLonAlt(lat_deg=50.0, lon_deg=36.0, alt_m=120.0)
estimator.set_enu_origin(origin)
pose_100m_east = gtsam.Pose3(gtsam.Rot3(), gtsam.Point3(100.0, 0.0, 0.0))
_seed_prior(estimator, pose=pose_100m_east)
out = estimator.current_estimate()
# ~100m east at 50° lat is ~0.0014° lon; just confirm the
# direction + same lat.
assert out.position_wgs84.lat_deg == pytest.approx(50.0, abs=1e-4)
assert out.position_wgs84.lon_deg > 36.0
assert out.position_wgs84.lon_deg < 36.01
# ---------------------------------------------------------------------
# AC-4: smoothed_history bounded by K
def test_ac4_smoothed_history_bounded_by_k() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
history = estimator.smoothed_history(100)
assert len(history) <= 15
def test_ac4_smoothed_history_entries_have_smoothed_true() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
history = estimator.smoothed_history(100)
assert len(history) >= 1
for entry in history:
assert entry.smoothed is True
def test_ac4_smoothed_history_empty_when_n_zero() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
history = estimator.smoothed_history(0)
assert history == []
def test_ac4_smoothed_history_empty_before_seed() -> None:
estimator = _build_estimator()
history = estimator.smoothed_history(10)
assert history == []
# ---------------------------------------------------------------------
# AC-6: IsamState lifecycle
def test_ac6_isam2_state_init_before_first_estimate() -> None:
estimator = _build_estimator()
snap = estimator.health_snapshot()
assert snap.isam2_state == IsamState.INIT
def test_ac6_isam2_state_tracking_after_estimate() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
estimator.current_estimate()
snap = estimator.health_snapshot()
assert snap.isam2_state == IsamState.TRACKING
def test_ac6_isam2_state_lost_after_fatal() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
fake_handle = mock.MagicMock()
fake_marginals = mock.MagicMock()
fake_marginals.marginalCovariance.return_value = np.zeros((6, 6))
fake_handle.compute_marginals.return_value = fake_marginals
fake_handle.last_anchor_age_ms.return_value = 0
estimator._isam2_handle = fake_handle
with pytest.raises(EstimatorFatalError):
estimator.current_estimate()
snap = estimator.health_snapshot()
assert snap.isam2_state == IsamState.LOST
# ---------------------------------------------------------------------
# AC-7: keyframe_count accuracy
def test_ac7_keyframe_count_initially_zero() -> None:
estimator = _build_estimator()
snap = estimator.health_snapshot()
assert snap.keyframe_count == 0
def test_ac7_keyframe_count_grows_with_seeded_keys() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
snap = estimator.health_snapshot()
assert snap.keyframe_count == 1
# ---------------------------------------------------------------------
# AC-8: cov_norm_growing_for_s monotonicity
def test_ac8_cov_norm_growing_zero_with_constant_norm() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
fake_handle = mock.MagicMock()
fake_marginals = mock.MagicMock()
fake_marginals.marginalCovariance.return_value = np.eye(6, dtype=np.float64) * 0.1
fake_handle.compute_marginals.return_value = fake_marginals
fake_handle.last_anchor_age_ms.return_value = 0
estimator._isam2_handle = fake_handle
for _ in range(5):
estimator.current_estimate()
snap = estimator.health_snapshot()
assert snap.cov_norm_growing_for_s == 0.0
def test_ac8_cov_norm_growing_increments_under_rising_norm() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
fake_handle = mock.MagicMock()
fake_marginals = mock.MagicMock()
fake_handle.compute_marginals.return_value = fake_marginals
fake_handle.last_anchor_age_ms.return_value = 0
estimator._isam2_handle = fake_handle
for sigma in [0.05, 0.1, 0.2, 0.3, 0.5]:
fake_marginals.marginalCovariance.return_value = np.eye(6, dtype=np.float64) * sigma
estimator.current_estimate()
growing_s = estimator._cov_norm_growing_for_s()
assert growing_s > 0.0
def test_ac8_cov_norm_growing_resets_on_drop() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
fake_handle = mock.MagicMock()
fake_marginals = mock.MagicMock()
fake_handle.compute_marginals.return_value = fake_marginals
fake_handle.last_anchor_age_ms.return_value = 0
estimator._isam2_handle = fake_handle
for sigma in [0.05, 0.1, 0.2, 0.3, 0.5]:
fake_marginals.marginalCovariance.return_value = np.eye(6, dtype=np.float64) * sigma
estimator.current_estimate()
# Now drop the norm — the rising chain breaks.
fake_marginals.marginalCovariance.return_value = np.eye(6, dtype=np.float64) * 0.05
estimator.current_estimate()
assert estimator._cov_norm_growing_for_s() == 0.0
# ---------------------------------------------------------------------
# AC-9: spoof_promotion_blocked via injected state machine
def test_ac9_default_spoof_promotion_blocked_false() -> None:
estimator = _build_estimator()
snap = estimator.health_snapshot()
assert snap.spoof_promotion_blocked is False
def test_ac9_spoof_promotion_blocked_from_state_machine() -> None:
estimator = _build_estimator()
machine = mock.MagicMock()
machine.is_spoof_promotion_blocked.return_value = True
machine.current_label.return_value = PoseSourceLabel.DEAD_RECKONED
estimator.attach_source_label_state_machine(machine)
snap = estimator.health_snapshot()
assert snap.spoof_promotion_blocked is True
def test_ac9_state_machine_drives_source_label() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
machine = mock.MagicMock()
machine.is_spoof_promotion_blocked.return_value = False
machine.current_label.return_value = PoseSourceLabel.SATELLITE_ANCHORED
estimator.attach_source_label_state_machine(machine)
out = estimator.current_estimate()
assert out.source_label == PoseSourceLabel.SATELLITE_ANCHORED
def test_ac9_default_source_label_is_visual_propagated() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
out = estimator.current_estimate()
assert out.source_label == PoseSourceLabel.VISUAL_PROPAGATED
# ---------------------------------------------------------------------
# AC-10: last_satellite_anchor_age_ms pass-through
def test_ac10_last_satellite_anchor_age_ms_passthrough() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
fake_handle = mock.MagicMock()
fake_marginals = mock.MagicMock()
fake_marginals.marginalCovariance.return_value = np.eye(6, dtype=np.float64) * 0.1
fake_handle.compute_marginals.return_value = fake_marginals
fake_handle.last_anchor_age_ms.return_value = 1234
estimator._isam2_handle = fake_handle
out = estimator.current_estimate()
assert out.last_satellite_anchor_age_ms == 1234
def test_ac10_emitted_at_is_monotonic_ns() -> None:
estimator = _build_estimator()
_seed_prior(estimator)
out_a = estimator.current_estimate()
out_b = estimator.current_estimate()
assert isinstance(out_a.emitted_at, int)
assert out_b.emitted_at >= out_a.emitted_at
# ---------------------------------------------------------------------
# Defensive — health_snapshot is cheap (no marginals call)
def test_health_snapshot_does_not_call_marginals() -> None:
estimator = _build_estimator()
fake_handle = mock.MagicMock()
estimator._isam2_handle = fake_handle
snap = estimator.health_snapshot()
fake_handle.compute_marginals.assert_not_called()
assert isinstance(snap, EstimatorHealth)