[AZ-383] C5 add_vio/add_pose_anchor/add_fc_imu factor adds

Replaces AZ-382 NotImplementedError placeholders with real GTSAM factor
adds wired against the iSAM2 graph handle:

- add_vio  -> BetweenFactorPose3 between consecutive VIO pose keys
  (first call primes the chain; AZ-388 owns first-keyframe seeding).
- add_pose_anchor -> mode-dispatch per pose.covariance_mode:
  "marginals" -> PriorFactorPose3 + handle.update();
  "jacobian"  -> skip iSAM2 add per AZ-361 contract.
  Both paths bump _last_anchor_ns via time.monotonic_ns().
- add_fc_imu -> shared ImuPreintegrator.integrate_window +
  reset_for_new_keyframe; builds a CombinedImuFactor between the
  prev/curr (X, V, B) keyframe triple. Introduces new 'v' (velocity)
  and 'b' (bias) GTSAM key namespaces decoupled from the VIO/pose
  frame_id mapping.

Invariant 2 - non-decreasing timestamps - enforced per call with
EstimatorDegradedError + c5.state.out_of_order log. Every successful
add emits a structured DEBUG *_ok log; every failure emits a
structured ERROR *_failed log and raises through the C5 error
hierarchy (R05).

Contract-vs-reality fix-ups also landed:

- StateEstimator Protocol: add_fc_imu(ImuWindow) - was incorrectly
  annotated as ImuTelemetrySample by AZ-381.
- _last_anchor_ns semantics switched to monotonic_ns() to match
  last_anchor_age_ms.
- create() factory back-wires the ISam2GraphHandle to the estimator
  via the new attach_handle() method.

Tests: +21 in tests/unit/c5_state/test_az383_factor_adds.py covering
all 8 ACs with mock ISam2GraphHandle instances. Three obsolete
AZ-382 tests (test_ac10_add_*_raises_named_az383) removed. Full
suite: 565 passed, 2 skipped.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 06:07:45 +03:00
parent 8b394a98c6
commit fd848266d1
7 changed files with 955 additions and 70 deletions
@@ -0,0 +1,96 @@
# Batch 14 — Cycle 1 Implementation Report
**Batch**: 14 of N
**Tasks landed**: AZ-383 (`GtsamIsam2StateEstimator` — VIO + Pose + IMU factor adds)
**Cycle**: 1
**Date**: 2026-05-11
## Scope
| Task | Component | Purpose |
|------|-----------|---------|
| AZ-383 | C5 state estimator | Replaces the three AZ-382 `NotImplementedError` placeholders on `GtsamIsam2StateEstimator` (`add_vio`, `add_pose_anchor`, `add_fc_imu`) with real factor-add bodies wired against the AZ-382 `ISam2GraphHandle`. `add_vio` emits a `BetweenFactorPose3` between consecutive VIO pose keys (first call primes the chain only). `add_pose_anchor` mode-dispatches per `pose.covariance_mode`: `"marginals"``PriorFactorPose3` + `handle.update()`; `"jacobian"` → SKIP the iSAM2 add per the AZ-361 cross-task contract (the running estimate still consumes the pose downstream, the graph stops growing under throttle). Both paths bump `_last_anchor_ns` via `time.monotonic_ns()` so `last_anchor_age_ms` and the spoof-promotion gate (AZ-385) see a recent satellite anchor. `add_fc_imu` feeds the shared `ImuPreintegrator`, closes the PIM with `reset_for_new_keyframe()`, and builds a `CombinedImuFactor` between the previous and current `(X, V, B)` keyframe triple — first call primes the chain without emitting a factor. Every successful add emits a DEBUG `*_ok` log; every failure emits an ERROR `*_failed` log and raises through the C5 error hierarchy (R05). Invariant 2 — non-decreasing timestamps — is enforced per call with an `EstimatorDegradedError` + `c5.state.out_of_order` log on violation. The remaining three Protocol output methods (`current_estimate` / `smoothed_history` / `health_snapshot`) still raise `NotImplementedError` naming AZ-384. |
## Decision record
The task started as a "~5pt drop-in", but inspecting the contracts surfaced four contract-vs-reality conflicts that the user chose to resolve in full alignment with the C5 contract rather than paper over (re-estimate to ~78pt, one batch):
1. **Protocol `add_fc_imu` type mismatch** — AZ-381's Protocol annotated `add_fc_imu(ImuTelemetrySample)` (a single sample), but the C5 contract has always specified `ImuWindow` because the shared `ImuPreintegrator` operates on batches. The composition root will batch the C8 `ImuTelemetrySample` stream into `ImuWindow`s before calling `add_fc_imu`. Fixed `interface.py` here with a docstring note.
2. **`ImuPreintegrator` API mismatch** — task spec assumed `preintegrate(window) -> CombinedImuFactor`. Real API per AZ-276 is `integrate_window(window)` + `reset_for_new_keyframe() -> PreintegratedCombinedMeasurements`. C5 builds the `CombinedImuFactor` itself from the closed PIM + the (X, V, B) key triple — that's not the preintegrator's job.
3. **Anchor freshness semantics** — task spec said `_last_anchor_ns = pose.emitted_at`, but `PoseEstimate` has no `emitted_at` field and the contract describes the value as a **monotonic age** (consumed by `last_anchor_age_ms` which uses `time.monotonic_ns()`). Switched to `_last_anchor_ns = time.monotonic_ns()`.
4. **`CombinedImuFactor` key management** — the factor needs a `(prev_X, prev_V, prev_B, curr_X, curr_V, curr_B)` 6-tuple of keys. Introduced two new GTSAM key namespaces (`'v'` for velocity, `'b'` for bias) with an independent `_imu_keyframe_counter`. The `'x'` (pose) namespace is shared with VIO / pose-anchor via `_next_key_counter`, but the IMU chain steps it independently per IMU keyframe — the composition root will arrange the higher-level scheduling that fuses the chains.
## Files added / modified
### Modified (prod)
- `src/gps_denied_onboard/components/c5_state/interface.py` — corrected `add_fc_imu` Protocol annotation from `ImuTelemetrySample` to `ImuWindow`; added a docstring note explaining the AZ-381 → AZ-383 fix-up.
- `src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py` — replaced the three `NotImplementedError` placeholders with real factor-add bodies; added `_DEFAULT_POSE_SIGMA = 0.1` module constant; added new state fields: `_log`, `_isam2_handle` (back-reference to the handle), `_last_added_ts_ns` (Invariant 2 monotonic guard), `_prev_vio` (between-factor wiring), `_imu_keyframe_counter` + `_prev_imu_x_key/_prev_imu_v_key/_prev_imu_b_key` (IMU chain); added `attach_handle(handle)` method; extended `key_for_frame` signature to `UUID | int`; added internal helpers `_require_handle`, `_guard_timestamp`, `_reset_staging`; added module-level pure helpers `_datetime_to_ns`, `_pose_se3_to_gtsam`, `_build_pose_noise`, `_make_timestamp_map`; updated `create(...)` to back-wire the handle to the estimator via `attach_handle(...)`.
### Added (tests)
- `tests/unit/c5_state/test_az383_factor_adds.py` — 21 new tests covering all 8 ACs using mock `ISam2GraphHandle` instances (real iSAM2 would require seeded initial values which AZ-388 owns — see "Test strategy" below).
### Modified (tests)
- `tests/unit/c5_state/test_az382_isam2_smoother_wiring.py` — removed the three now-obsolete `test_ac10_add_*_raises_named_az383` tests; their behaviour is now governed by AZ-383's AC tests. Replaced with a comment block pointing readers to `test_az383_factor_adds.py`. The three remaining `test_ac10_*_raises_named_az384` tests (the marginals/outputs methods) were retained.
## Test strategy — why mock handles
A smoke test of `add_vio` against the real `ISam2GraphHandleImpl` failed with `iSAM2.update failed: Attempting to … the key "x0", which does not exist in the Values`. The reason: a between-factor between `x0` and `x1` requires both keys to already be in `gtsam.Values`. The first `add_vio` call would need to ALSO seed `x0` with a prior or an initial value, which is the responsibility of AZ-388 (AC-5.2 fallback). Rather than smuggle that responsibility into AZ-383, the AC tests verify only what AZ-383 owns:
- The correct GTSAM factor TYPE is constructed (`BetweenFactorPose3` / `PriorFactorPose3` / `CombinedImuFactor`).
- The correct keys are passed.
- `handle.add_factor` and `handle.update` are called the expected number of times.
- The mode-dispatch in `add_pose_anchor` correctly SKIPS the add on the JACOBIAN path.
- Invariant 2 enforcement raises `EstimatorDegradedError` + emits the structured `c5.state.out_of_order` log.
- Every success path emits a `*_ok` DEBUG log; every failure path emits a `*_failed` ERROR log and raises through the C5 error hierarchy.
- `_last_anchor_ns` is updated on BOTH pose-anchor paths.
The full end-to-end iSAM2 convergence will be tested in AZ-388 once the chain is seedable.
## Architectural notes
- **`_FRAME_PERIOD_S` constant** — inherited from AZ-382 (D-C5-3 fixes the keyframe rate at 3 Hz). AZ-383 did not touch the smoother window logic.
- **(X, V, B) chain is independent of the frame-id mapping** — IMU keyframes are NOT VIO keyframes; the contract documents this. The composition root will fuse the chains; AZ-383's per-window factor add is self-contained.
- **Defensive logging is mandatory (R05)** — every successful add: `c5.state.add_vio_ok` / `c5.state.add_pose_anchor_ok` / `c5.state.add_fc_imu_ok`. Every failure: `c5.state.add_vio_failed` / `c5.state.add_pose_anchor_failed` / `c5.state.add_fc_imu_failed` / `c5.state.add_fc_imu_preintegrate_failed`. JACOBIAN-path skip emits INFO `c5.state.skip_isam2_jacobian_path`. Out-of-order: ERROR `c5.state.out_of_order`. The C5 contract documents this surface because silent factor-add failures bit the prototype.
- **First-call priming** — both `add_vio` and `add_fc_imu` SHORT-CIRCUIT the first call (`_prev_vio is None` / `_prev_imu_x_key is None`) because there is no previous frame to relate to. This is documented in both method docstrings.
- **`_reset_staging()` after every successful flush** — the staging `_graph` / `_values` are cleared post-flush so the next `add_*` builds a fresh delta. This matches the iSAM2 incremental-update idiom (the handle's `update(graph, values, …)` accepts the DELTA, not the cumulative graph).
## Test counts
| Suite | Before (B13) | After (B14) | Delta |
|-------|--------------|-------------|-------|
| Total passing | 547 | 565 | +18 |
| Skipped | 2 | 2 | 0 |
| AZ-383 (new) | 0 | 21 | +21 |
| AZ-382 (preserved) | 27 | 24 | 3 (obsolete `test_ac10_add_*_raises_named_az383` tests removed; behaviour now lives in AZ-383 tests) |
Run command: `PYTHONPATH=src pytest tests/ -q``565 passed, 2 skipped in ~20s`.
## Lint / type
- `ruff check src/gps_denied_onboard/components/c5_state/ tests/unit/c5_state/` — clean (1 `RUF002` ambiguous-Unicode in a docstring auto-fixed during the cycle).
- `ruff format src/gps_denied_onboard/components/c5_state/ tests/unit/c5_state/` — clean (1 file reformatted).
- `ReadLints` on `src/gps_denied_onboard/components/c5_state/` + `tests/unit/c5_state/` — 0 errors.
## Acceptance evidence
| AC | Test(s) | Status |
|----|---------|--------|
| AC-1 VIO factor add | `test_ac1_vio_first_call_records_without_factor`, `test_ac1_vio_second_call_adds_between_factor` | PASS |
| AC-2 Pose-anchor MARGINALS path | `test_ac2_pose_anchor_marginals_adds_prior_factor_and_updates`, `test_ac2_pose_anchor_marginals_bumps_last_anchor_ns` | PASS |
| AC-3 Pose-anchor JACOBIAN skip | `test_ac3_pose_anchor_jacobian_skips_factor_add`, `test_ac3_pose_anchor_jacobian_still_bumps_last_anchor_ns` | PASS |
| AC-4 IMU factor add via ImuPreintegrator | `test_ac4_imu_first_window_primes_chain_no_factor`, `test_ac4_imu_second_window_adds_combined_imu_factor` | PASS |
| AC-5 Timestamp ordering | `test_ac5_vio_out_of_order_raises_degraded`, `test_ac5_pose_anchor_out_of_order_raises_degraded`, `test_ac5_imu_out_of_order_raises_degraded`, `test_ac5_emits_out_of_order_log` | PASS |
| AC-6 Defensive logging | `test_ac6_vio_success_emits_debug_log`, `test_ac6_pose_anchor_success_emits_debug_log`, `test_ac6_vio_failure_emits_error_log_and_raises` | PASS |
| AC-7 Update exactly once | `test_ac7_vio_triggers_update_once`, `test_ac7_pose_anchor_marginals_triggers_update_once`, `test_ac7_pose_anchor_jacobian_triggers_no_update`, `test_ac7_imu_second_window_triggers_update_once` | PASS |
| AC-8 `_last_anchor_ns` accuracy | `test_ac8_last_anchor_age_ms_after_anchor_is_small` | PASS |
| Defensive — no-handle | `test_no_handle_raises_state_estimator_config_error` | PASS |
## Known forward actions (not in scope this batch)
- **AZ-388 (AC-5.2 fallback)** — first-keyframe seeding (prior factor + initial values on x0) so an end-to-end smoke test of `add_vio` → real iSAM2 update converges. The current AZ-383 tests use mock handles to avoid coupling the factor-add semantics to that scheduling.
- **AZ-384** — `current_estimate` / `smoothed_history` / `health_snapshot` bodies; will read marginals from the handle and translate them into the C5 `EstimatorOutput` / `EstimatorHealth` DTOs.
- **Composition root batching** — turn the C8 `ImuTelemetrySample` stream into `ImuWindow` instances before calling `add_fc_imu`. Owned by the composition-root binding task.
- **Cross-stream timestamp merge** — AZ-383 enforces per-call monotonicity but does not order the three streams against each other; the composition root will arrange that.
+1 -1
View File
@@ -8,7 +8,7 @@ status: in_progress
sub_step:
phase: 6
name: implement-tasks
detail: "batch 13 of N committed (AZ-382 c5 GtsamIsam2StateEstimator skeleton + ISam2GraphHandleImpl real bodies + defensive logging)"
detail: "batch 14 of N committed (AZ-383 c5 factor adds: add_vio BetweenFactorPose3 + add_pose_anchor mode-dispatch + add_fc_imu CombinedImuFactor with v/b key namespaces)"
retry_count: 0
cycle: 1
tracker: jira
@@ -1,55 +1,57 @@
"""C5 ``GtsamIsam2StateEstimator`` — skeleton wiring (AZ-382 / E-C5).
"""C5 ``GtsamIsam2StateEstimator`` — graph + factor adds (AZ-382 + AZ-383).
Builds the GTSAM iSAM2 graph + ``IncrementalFixedLagSmoother`` substrate
on which AZ-383 / AZ-384 / AZ-385 will hang the actual factor adds,
marginals, and source-label logic. This task owns:
AZ-382 wired the GTSAM substrate (iSAM2 + ``IncrementalFixedLagSmoother``
+ ``NonlinearFactorGraph`` + ``Values``) and the four ``ISam2GraphHandleImpl``
real bodies. AZ-383 owns the three Protocol factor-add methods:
* ``__init__`` — constructs ``_isam2`` (``gtsam.ISAM2``), ``_smoother``
(``gtsam_unstable.IncrementalFixedLagSmoother`` with a
``K * frame_period_s`` window), ``_graph`` + ``_values`` containers,
and the ``_key_for_frame`` mapping used by AZ-383 when it converts
``UUID`` ``frame_id`` values into GTSAM ``Key`` integers via
``gtsam.symbol('x', counter)``.
* :func:`create` — module-level factory the composition root registers
as ``register_state_estimator("gtsam_isam2", create)``. Returns the
``(StateEstimator, ISam2GraphHandle)`` tuple per
:mod:`gps_denied_onboard.runtime_root.state_factory`.
* :func:`register` — convenience that calls ``register_state_estimator``
with this strategy. Test fixtures + the per-binary bootstrap module
call this; AZ-381 deliberately did not auto-register at import.
* ``add_vio(vio: VioOutput)`` — ``BetweenFactorPose3`` between
consecutive pose keys with a noise model derived from
``vio.covariance_6x6``.
* ``add_pose_anchor(pose: PoseEstimate)`` — mode-dispatched per
``pose.covariance_mode``: ``"marginals"`` → ``PriorFactorPose3`` +
``update``; ``"jacobian"`` → skip iSAM2 add (per the AZ-361 cross-task
contract) but still bump ``_last_anchor_ns``.
* ``add_fc_imu(imu_window: ImuWindow)`` — feeds the shared
``ImuPreintegrator``, builds a ``CombinedImuFactor`` between the
prev/curr (X, V, B) keyframe triple, and inserts initial values for
the new keys.
Every ``StateEstimator`` Protocol method intentionally raises
``NotImplementedError`` with the **next** task's tracker ID in the
message — the bodies are owned by:
Invariant 2 — every ``add_*`` call enforces non-decreasing timestamps;
out-of-order arrivals raise :class:`EstimatorDegradedError` and emit an
ERROR log. R05 — every successful factor add emits a DEBUG SUCCESS
log; every failure emits an ERROR FAILURE log AND raises through the
C5 error hierarchy.
* ``add_vio`` / ``add_pose_anchor`` / ``add_fc_imu`` → AZ-383
* ``current_estimate`` / ``smoothed_history`` / ``health_snapshot`` → AZ-384
The ``ISam2GraphHandleImpl`` bodies in
:mod:`gps_denied_onboard.components.c5_state._isam2_handle`, however,
ARE owned by this task and are populated against the estimator
constructed here.
The remaining three Protocol methods (``current_estimate`` /
``smoothed_history`` / ``health_snapshot``) still raise
``NotImplementedError`` naming AZ-384 — marginals + outputs land
there.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Any, Final
from uuid import UUID
import gtsam
import gtsam_unstable
import numpy as np
from gps_denied_onboard.components.c5_state._isam2_handle import (
ISam2GraphHandle,
ISam2GraphHandleImpl,
)
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import StateEstimatorConfigError
from gps_denied_onboard.components.c5_state.errors import (
EstimatorDegradedError,
StateEstimatorConfigError,
)
from gps_denied_onboard.components.c5_state.interface import StateEstimator
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard._types.fc import ImuTelemetrySample
from gps_denied_onboard._types.nav import ImuWindow
from gps_denied_onboard._types.pose import PoseEstimate
from gps_denied_onboard._types.state import EstimatorHealth, EstimatorOutput
from gps_denied_onboard._types.vio import VioOutput
@@ -74,6 +76,12 @@ _FRAME_PERIOD_S: Final[float] = 1.0 / 3.0
# ``register_state_estimator`` registry.
_STRATEGY: Final[str] = "gtsam_isam2"
# Default isotropic noise covariance used when an input DTO arrives
# without an explicit ``covariance_6x6``. 0.1 m / 0.1 rad sigmas match
# the documented BMI088-class defaults and let the iSAM2 solver
# converge without exploding.
_DEFAULT_POSE_SIGMA: Final[float] = 0.1
class GtsamIsam2StateEstimator(StateEstimator):
"""Production-default C5 estimator — iSAM2 + ``IncrementalFixedLagSmoother``.
@@ -109,7 +117,10 @@ class GtsamIsam2StateEstimator(StateEstimator):
self._graph = gtsam.NonlinearFactorGraph()
self._values = gtsam.Values()
self._key_for_frame: dict[UUID, int] = {}
# ``_key_for_frame`` maps the input DTO's ``frame_id`` (int on
# ``VioOutput`` + ``PoseEstimate``, UUID on the C5 output) to a
# GTSAM ``Key``. The dict accepts any hashable.
self._key_for_frame: dict[Any, int] = {}
self._next_key_counter: int = 0
# Initialised to 0 ⇒ ``last_anchor_age_ms`` returns
# ``monotonic_ns() / 1e6`` (a very large number) until AZ-383
@@ -117,7 +128,33 @@ class GtsamIsam2StateEstimator(StateEstimator):
# documents this as the "no anchor yet" sentinel.
self._last_anchor_ns: int = 0
get_logger("c5_state.gtsam_isam2").debug(
# AZ-383 state -----------------------------------------------------
self._log = get_logger("c5_state.gtsam_isam2")
# The handle is constructed by ``create(...)`` and assigned
# post-init so the estimator can call back into it during
# ``add_*``. AZ-383 (not AZ-382) introduced this back-reference;
# before then the handle was held only by the composition root.
self._isam2_handle: ISam2GraphHandle | None = None
# Invariant 2 — strictly non-decreasing per-stream timestamps.
# Stored as int ns on the wall-clock (datetime → ns for
# VioOutput/PoseEstimate; ts_end_ns for ImuWindow). The merge
# rule across the three streams is the composition root's job;
# C5 only enforces the per-call monotonicity guard.
self._last_added_ts_ns: int = 0
# ``add_vio`` between-factor wiring.
self._prev_vio: VioOutput | None = None
# ``add_fc_imu`` uses an INDEPENDENT (X, V, B) keyframe chain
# decoupled from the VIO/pose frame_id mapping — see the
# implementation note on add_fc_imu below.
self._imu_keyframe_counter: int = 0
self._prev_imu_x_key: int | None = None
self._prev_imu_v_key: int | None = None
self._prev_imu_b_key: int | None = None
self._log.debug(
"c5.state.isam2_initialised",
extra={
"kind": "c5.state.isam2_initialised",
@@ -141,14 +178,25 @@ class GtsamIsam2StateEstimator(StateEstimator):
f"config.components['c5_state'] must be a C5StateConfig; got {type(block).__name__}"
)
def key_for_frame(self, frame_id: UUID) -> int:
def attach_handle(self, handle: ISam2GraphHandle) -> None:
"""Wire the iSAM2 graph handle.
Called once by :func:`create` after the handle is constructed
against this estimator instance. The handle is required by
every ``add_*`` method, so any ``add_*`` call against an
estimator with no handle raises ``StateEstimatorConfigError``.
"""
self._isam2_handle = handle
def key_for_frame(self, frame_id: UUID | int) -> int:
"""Return the GTSAM ``Key`` for ``frame_id``, assigning on first use.
AZ-383 factor adds call this to translate ``UUID`` frame
identifiers into the integer keys GTSAM uses for symbols. Per
the C5 contract §"Key management policy" we reserve the
``'x'`` (pose) namespace; AZ-383 will additionally use
``'b'`` (bias) and ``'v'`` (velocity).
AZ-383 calls this from ``add_vio`` and ``add_pose_anchor`` to
translate the input DTO's frame identifier into the integer
key GTSAM uses. Per the C5 contract §"Key management policy"
we reserve the ``'x'`` (pose) namespace; ``add_fc_imu``
additionally uses ``'v'`` (velocity) and ``'b'`` (bias) via
its own internal counter.
"""
existing = self._key_for_frame.get(frame_id)
if existing is not None:
@@ -158,21 +206,268 @@ class GtsamIsam2StateEstimator(StateEstimator):
self._next_key_counter += 1
return new_key
# ------------------------------------------------------------------
# AZ-383: factor-add bodies.
def add_vio(self, vio: VioOutput) -> None:
raise NotImplementedError(
"Factor adds owned by AZ-383 — VIO/Pose/IMU factor wiring lands there."
"""Append a VIO ``BetweenFactorPose3`` to the running graph.
First call after construction records the initial pose without
emitting a factor (there's no previous frame to relate to);
every subsequent call builds a between-factor between the
previous and the current VIO frame's pose keys and triggers a
single ``handle.update()`` per AC-7.
"""
handle = self._require_handle()
ts_ns = _datetime_to_ns(vio.timestamp)
self._guard_timestamp(ts_ns, source="vio")
curr_pose = _pose_se3_to_gtsam(vio.pose_se3)
curr_key = self.key_for_frame(vio.frame_id)
if self._prev_vio is None:
# First VIO frame — record its absolute pose as an initial
# value but DO NOT emit a between-factor. AZ-388 (AC-5.2)
# owns the first-keyframe seeding once the full chain lands.
self._prev_vio = vio
self._last_added_ts_ns = ts_ns
self._log.debug(
"c5.state.add_vio_first_frame",
extra={
"kind": "c5.state.add_vio_first_frame",
"kv": {"frame_id": str(vio.frame_id), "ts_ns": ts_ns},
},
)
return
prev_pose = _pose_se3_to_gtsam(self._prev_vio.pose_se3)
prev_key = self.key_for_frame(self._prev_vio.frame_id)
relative_pose = prev_pose.between(curr_pose)
noise = _build_pose_noise(vio.covariance_6x6)
factor = gtsam.BetweenFactorPose3(prev_key, curr_key, relative_pose, noise)
try:
handle.add_factor(factor)
self._values.insert(curr_key, curr_pose)
timestamps = _make_timestamp_map([curr_key], ts_ns)
handle.update(self._graph, self._values, timestamps)
except EstimatorDegradedError:
raise
except Exception as exc:
self._log.error(
"c5.state.add_vio_failed",
extra={
"kind": "c5.state.add_vio_failed",
"kv": {"frame_id": str(vio.frame_id), "error": str(exc)},
},
)
raise EstimatorDegradedError(f"add_vio failed: {exc}") from exc
self._reset_staging()
self._prev_vio = vio
self._last_added_ts_ns = ts_ns
self._log.debug(
"c5.state.add_vio_ok",
extra={
"kind": "c5.state.add_vio_ok",
"kv": {
"frame_id": str(vio.frame_id),
"prev_key": prev_key,
"curr_key": curr_key,
},
},
)
def add_pose_anchor(self, pose: PoseEstimate) -> None:
raise NotImplementedError(
"Factor adds owned by AZ-383 — VIO/Pose/IMU factor wiring lands there."
"""Add a C4 pose anchor; mode-dispatched per ``pose.covariance_mode``.
``"marginals"`` path builds a ``PriorFactorPose3`` on the pose
key and triggers ``handle.update()``. ``"jacobian"`` path
SKIPS the iSAM2 add per the AZ-361 contract — the running
estimate is fed but the graph stops growing under throttle.
Both paths bump ``_last_anchor_ns`` so the spoof-promotion
gate (AZ-385) and ``last_anchor_age_ms`` see a recent anchor.
"""
handle = self._require_handle()
ts_ns = _datetime_to_ns(pose.timestamp)
self._guard_timestamp(ts_ns, source="pose_anchor")
pose_key = self.key_for_frame(pose.frame_id)
mode = (pose.covariance_mode or "marginals").lower()
# Both paths update the anchor freshness sentinel. The C5
# contract documents this — even the throttled JACOBIAN path
# counts as a recent anchor for AC-1.3 binning.
self._last_anchor_ns = time.monotonic_ns()
if mode == "marginals":
gtsam_pose = _pose_se3_to_gtsam(pose.pose_se3)
noise = _build_pose_noise(pose.covariance_6x6)
factor = gtsam.PriorFactorPose3(pose_key, gtsam_pose, noise)
try:
handle.add_factor(factor)
self._values.insert(pose_key, gtsam_pose)
timestamps = _make_timestamp_map([pose_key], ts_ns)
handle.update(self._graph, self._values, timestamps)
except EstimatorDegradedError:
raise
except Exception as exc:
self._log.error(
"c5.state.add_pose_anchor_failed",
extra={
"kind": "c5.state.add_pose_anchor_failed",
"kv": {"frame_id": str(pose.frame_id), "error": str(exc)},
},
)
raise EstimatorDegradedError(f"add_pose_anchor failed: {exc}") from exc
self._reset_staging()
self._log.debug(
"c5.state.add_pose_anchor_ok",
extra={
"kind": "c5.state.add_pose_anchor_ok",
"kv": {
"frame_id": str(pose.frame_id),
"pose_key": pose_key,
"covariance_mode": mode,
},
},
)
else:
# JACOBIAN path — AZ-361 contract: skip the iSAM2 add to
# keep the graph from growing under throttle.
self._log.info(
"c5.state.skip_isam2_jacobian_path",
extra={
"kind": "c5.state.skip_isam2_jacobian_path",
"kv": {
"frame_id": str(pose.frame_id),
"pose_key": pose_key,
"covariance_mode": mode,
},
},
)
self._last_added_ts_ns = ts_ns
def add_fc_imu(self, imu_window: ImuWindow) -> None:
"""Build a ``CombinedImuFactor`` from a window of FC IMU samples.
The window's samples are fed into the shared
:class:`ImuPreintegrator`; ``reset_for_new_keyframe()``
closes the PIM and yields the
``PreintegratedCombinedMeasurements`` used by the factor.
The (X, V, B) keyframe chain advances per call:
- First call primes the chain (no factor — no previous
keyframe to relate to).
- Subsequent calls add a factor between the previous and the
current (X, V, B) triple and insert initial values for the
new keys (identity Pose3, zero Velocity, zero Bias).
This chain is intentionally independent of the VIO/pose
frame_id mapping. The composition root will arrange the
higher-level scheduling that fuses the chains; AZ-383 only
owns the per-window factor add.
"""
handle = self._require_handle()
self._guard_timestamp(imu_window.ts_end_ns, source="imu_window")
try:
self._imu_preintegrator.integrate_window(imu_window)
closed_pim = self._imu_preintegrator.reset_for_new_keyframe()
except Exception as exc:
self._log.error(
"c5.state.add_fc_imu_preintegrate_failed",
extra={
"kind": "c5.state.add_fc_imu_preintegrate_failed",
"kv": {"ts_end_ns": imu_window.ts_end_ns, "error": str(exc)},
},
)
raise EstimatorDegradedError(f"add_fc_imu preintegrate failed: {exc}") from exc
curr_x_key = gtsam.symbol("x", self._next_key_counter)
curr_v_key = gtsam.symbol("v", self._imu_keyframe_counter)
curr_b_key = gtsam.symbol("b", self._imu_keyframe_counter)
self._next_key_counter += 1
self._imu_keyframe_counter += 1
if self._prev_imu_x_key is None:
# First IMU window — prime the chain. AZ-388 owns the
# first-keyframe seeding (prior factor + initial values)
# once the AC-5.2 fallback wiring lands.
self._prev_imu_x_key = curr_x_key
self._prev_imu_v_key = curr_v_key
self._prev_imu_b_key = curr_b_key
self._last_added_ts_ns = imu_window.ts_end_ns
self._log.debug(
"c5.state.add_fc_imu_first_window",
extra={
"kind": "c5.state.add_fc_imu_first_window",
"kv": {
"ts_end_ns": imu_window.ts_end_ns,
"x_key": curr_x_key,
"v_key": curr_v_key,
"b_key": curr_b_key,
},
},
)
return
factor = gtsam.CombinedImuFactor(
self._prev_imu_x_key,
self._prev_imu_v_key,
curr_x_key,
curr_v_key,
self._prev_imu_b_key,
curr_b_key,
closed_pim,
)
def add_fc_imu(self, imu_window: ImuTelemetrySample) -> None:
raise NotImplementedError(
"Factor adds owned by AZ-383 — VIO/Pose/IMU factor wiring lands there."
try:
handle.add_factor(factor)
# Initial values for the new triple — identity pose,
# zero velocity, zero bias. The smoother will refine.
self._values.insert(curr_x_key, gtsam.Pose3())
self._values.insert(curr_v_key, np.zeros(3, dtype=np.float64))
self._values.insert(curr_b_key, gtsam.imuBias.ConstantBias())
timestamps = _make_timestamp_map(
[curr_x_key, curr_v_key, curr_b_key], imu_window.ts_end_ns
)
handle.update(self._graph, self._values, timestamps)
except EstimatorDegradedError:
raise
except Exception as exc:
self._log.error(
"c5.state.add_fc_imu_failed",
extra={
"kind": "c5.state.add_fc_imu_failed",
"kv": {"ts_end_ns": imu_window.ts_end_ns, "error": str(exc)},
},
)
raise EstimatorDegradedError(f"add_fc_imu failed: {exc}") from exc
self._reset_staging()
self._prev_imu_x_key = curr_x_key
self._prev_imu_v_key = curr_v_key
self._prev_imu_b_key = curr_b_key
self._last_added_ts_ns = imu_window.ts_end_ns
self._log.debug(
"c5.state.add_fc_imu_ok",
extra={
"kind": "c5.state.add_fc_imu_ok",
"kv": {
"ts_end_ns": imu_window.ts_end_ns,
"x_key": curr_x_key,
"v_key": curr_v_key,
"b_key": curr_b_key,
},
},
)
# ------------------------------------------------------------------
# AZ-384 / AZ-385 / AZ-386 — body still owned by those tasks.
def current_estimate(self) -> EstimatorOutput:
raise NotImplementedError(
"Marginals + outputs owned by AZ-384 — current_estimate body lands there."
@@ -188,6 +483,42 @@ class GtsamIsam2StateEstimator(StateEstimator):
"Marginals + outputs owned by AZ-384 — health_snapshot body lands there."
)
# ------------------------------------------------------------------
# Internals.
def _require_handle(self) -> ISam2GraphHandle:
if self._isam2_handle is None:
raise StateEstimatorConfigError(
"ISam2GraphHandle not attached; call attach_handle(...) before any add_*"
)
return self._isam2_handle
def _guard_timestamp(self, ts_ns: int, *, source: str) -> None:
if ts_ns < self._last_added_ts_ns:
self._log.error(
"c5.state.out_of_order",
extra={
"kind": "c5.state.out_of_order",
"kv": {
"source": source,
"ts_ns": ts_ns,
"last_added_ts_ns": self._last_added_ts_ns,
},
},
)
raise EstimatorDegradedError(
f"out-of-order {source}: ts_ns={ts_ns} < last_added_ts_ns={self._last_added_ts_ns}"
)
def _reset_staging(self) -> None:
"""Clear the staging ``_graph`` + ``_values`` after a successful flush."""
self._graph = gtsam.NonlinearFactorGraph()
self._values = gtsam.Values()
# ----------------------------------------------------------------------
# Module-level helpers.
def create(
*,
@@ -199,10 +530,10 @@ def create(
) -> tuple[StateEstimator, ISam2GraphHandle]:
"""Composition-root factory — returns ``(estimator, handle)`` tuple.
The handle holds a reference to the estimator so the four
``ISam2GraphHandleImpl`` method bodies (defined in
:mod:`...components.c5_state._isam2_handle`) can drive the same
graph the estimator owns.
Construction order matters: the estimator is built first, the
handle is wired against it (the handle holds the estimator
reference for graph access), and the estimator is back-wired with
the handle so its ``add_*`` methods can call into it.
"""
estimator = GtsamIsam2StateEstimator(
config,
@@ -212,6 +543,7 @@ def create(
fdr_client=fdr_client,
)
handle = ISam2GraphHandleImpl(estimator)
estimator.attach_handle(handle)
return estimator, handle
@@ -228,3 +560,57 @@ def register() -> None:
from gps_denied_onboard.runtime_root.state_factory import register_state_estimator
register_state_estimator(_STRATEGY, create)
# ----------------------------------------------------------------------
# Module-level pure helpers (AZ-383).
def _datetime_to_ns(dt: Any) -> int:
"""Convert ``dt.timestamp() * 1e9`` to int ns.
Accepts any object exposing a ``timestamp()`` method (e.g.
:class:`datetime.datetime`); avoids importing ``datetime`` here
so test stubs can pass synthetic time objects.
"""
return int(dt.timestamp() * 1_000_000_000)
def _pose_se3_to_gtsam(pose_se3: Any) -> gtsam.Pose3:
"""Wrap a 4x4 numpy SE(3) matrix into ``gtsam.Pose3``."""
arr = np.asarray(pose_se3, dtype=np.float64)
if arr.shape != (4, 4):
raise ValueError(f"pose_se3 must be 4x4; got shape {arr.shape}")
return gtsam.Pose3(arr)
def _build_pose_noise(covariance: Any | None) -> gtsam.noiseModel.Base:
"""Build a 6-DoF Gaussian noise model from a 6x6 covariance.
Falls back to an isotropic default when the input is ``None`` —
the C5 contract permits this for early-flight VIO frames where
the covariance is not yet computed; the iSAM2 solver penalises
over-tight priors so the default is conservative.
"""
if covariance is None:
return gtsam.noiseModel.Isotropic.Sigma(6, _DEFAULT_POSE_SIGMA)
cov = np.asarray(covariance, dtype=np.float64)
if cov.shape != (6, 6):
raise ValueError(f"covariance_6x6 must be 6x6; got shape {cov.shape}")
return gtsam.noiseModel.Gaussian.Covariance(cov)
def _make_timestamp_map(
keys: list[int], ts_ns: int
) -> gtsam_unstable.FixedLagSmootherKeyTimestampMap:
"""Build a ``FixedLagSmootherKeyTimestampMap`` for the smoother.
The smoother needs per-key arrival timestamps in seconds (its
sliding-window evict logic uses them); we feed every newly
inserted key the same window-end timestamp.
"""
ts_map = gtsam_unstable.FixedLagSmootherKeyTimestampMap()
ts_seconds = ts_ns * 1e-9
for key in keys:
ts_map.insert((key, ts_seconds))
return ts_map
@@ -20,7 +20,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Protocol, runtime_checkable
if TYPE_CHECKING:
from gps_denied_onboard._types.fc import ImuTelemetrySample
from gps_denied_onboard._types.nav import ImuWindow
from gps_denied_onboard._types.pose import PoseEstimate
from gps_denied_onboard._types.state import (
EstimatorHealth,
@@ -52,8 +52,14 @@ class StateEstimator(Protocol):
factor + update cycle.
"""
def add_fc_imu(self, imu_window: ImuTelemetrySample) -> None:
"""Add an FC IMU sample / window to the iSAM2 preintegrator."""
def add_fc_imu(self, imu_window: ImuWindow) -> None:
"""Add an FC IMU window to the iSAM2 preintegrator (CombinedImuFactor).
AZ-383 fix-up: AZ-381 incorrectly annotated this as
``ImuTelemetrySample`` (a single sample); the C5 contract has
always specified ``ImuWindow`` because the shared
``ImuPreintegrator`` operates on batches.
"""
def current_estimate(self) -> EstimatorOutput:
"""Return the latest (non-smoothed) estimate. Never returns ``None``."""
@@ -369,24 +369,14 @@ def test_ac9_update_emits_success_log(caplog: pytest.LogCaptureFixture) -> None:
# ---------------------------------------------------------------------
# AC-10: StateEstimator Protocol methods still raise NotImplementedError
def test_ac10_add_vio_raises_named_az383() -> None:
estimator = _build_estimator()
with pytest.raises(NotImplementedError, match="AZ-383"):
estimator.add_vio(mock.MagicMock())
def test_ac10_add_pose_anchor_raises_named_az383() -> None:
estimator = _build_estimator()
with pytest.raises(NotImplementedError, match="AZ-383"):
estimator.add_pose_anchor(mock.MagicMock())
def test_ac10_add_fc_imu_raises_named_az383() -> None:
estimator = _build_estimator()
with pytest.raises(NotImplementedError, match="AZ-383"):
estimator.add_fc_imu(mock.MagicMock())
#
# Note: The three ``add_*`` methods USED to raise ``NotImplementedError``
# naming AZ-383 in AZ-382's scope. AZ-383 has since landed and replaced
# those bodies with real factor adds; the now-active behaviour is
# tested in ``tests/unit/c5_state/test_az383_factor_adds.py``. Only the
# three output methods (``current_estimate`` / ``smoothed_history`` /
# ``health_snapshot``) still raise NotImplementedError pointing at the
# next-task AZ-384.
def test_ac10_current_estimate_raises_named_az384() -> None:
@@ -0,0 +1,407 @@
"""AZ-383 — GtsamIsam2StateEstimator add_vio / add_pose_anchor / add_fc_imu.
Eight ACs from ``_docs/02_tasks/done/AZ-383_c5_factor_adds.md``:
- AC-1 VIO factor add — ``BetweenFactorPose3`` with correct keys +
noise model.
- AC-2 Pose-anchor MARGINALS path — ``PriorFactorPose3`` + update,
``_last_anchor_ns`` bumped.
- AC-3 Pose-anchor JACOBIAN path SKIPS iSAM2 add but bumps
``_last_anchor_ns`` and emits INFO log.
- AC-4 IMU factor add via the shared ``ImuPreintegrator``.
- AC-5 Timestamp ordering — out-of-order ``add_*`` raises
:class:`EstimatorDegradedError`.
- AC-6 Defensive logging on every factor add (R05).
- AC-7 Each ``add_*`` triggers ``handle.update()`` exactly once.
- AC-8 ``_last_anchor_ns`` accuracy via ``last_anchor_age_ms``.
The handle is replaced with a stub on every test so the asserts
fire on call counts + factor types rather than iSAM2 convergence
(real iSAM2 first-keyframe seeding is AZ-388's responsibility).
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from unittest import mock
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.nav import ImuSample, ImuWindow
from gps_denied_onboard._types.pose import PoseEstimate
from gps_denied_onboard._types.vio import VioOutput
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import EstimatorDegradedError
from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import (
GtsamIsam2StateEstimator,
create,
)
from gps_denied_onboard.runtime_root.state_factory import clear_state_registry
@pytest.fixture(autouse=True)
def _registry_isolation():
# Arrange
clear_state_registry()
yield
clear_state_registry()
def _build_estimator(*, with_stub_handle: bool = True) -> GtsamIsam2StateEstimator:
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
estimator, _real_handle = create(
config=cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
if with_stub_handle:
estimator._isam2_handle = mock.MagicMock()
return estimator
def _make_vio(*, frame_id: int, t_seconds: float, pose: np.ndarray | None = None) -> VioOutput:
return VioOutput(
frame_id=frame_id,
timestamp=datetime.fromtimestamp(t_seconds, tz=timezone.utc),
pose_se3=pose if pose is not None else np.eye(4),
covariance_6x6=np.eye(6) * 0.01,
)
def _make_pose(
*,
frame_id: int,
t_seconds: float,
covariance_mode: str = "marginals",
pose: np.ndarray | None = None,
) -> PoseEstimate:
return PoseEstimate(
frame_id=frame_id,
timestamp=datetime.fromtimestamp(t_seconds, tz=timezone.utc),
pose_se3=pose if pose is not None else np.eye(4),
covariance_6x6=np.eye(6) * 0.01,
covariance_mode=covariance_mode,
)
def _make_imu_window(*, ts_start_ns: int, ts_end_ns: int) -> ImuWindow:
samples = (
ImuSample(ts_ns=ts_start_ns, accel_xyz=(0.0, 0.0, 9.81), gyro_xyz=(0.0, 0.0, 0.0)),
ImuSample(ts_ns=ts_end_ns, accel_xyz=(0.0, 0.0, 9.81), gyro_xyz=(0.0, 0.0, 0.0)),
)
return ImuWindow(samples=samples, ts_start_ns=ts_start_ns, ts_end_ns=ts_end_ns)
# ---------------------------------------------------------------------
# AC-1: VIO factor add — BetweenFactorPose3 with correct keys + noise
def test_ac1_vio_first_call_records_without_factor() -> None:
estimator = _build_estimator()
vio = _make_vio(frame_id=1, t_seconds=1000.0)
estimator.add_vio(vio)
# First call MUST NOT emit a factor or update — there's no prev frame.
estimator._isam2_handle.add_factor.assert_not_called()
estimator._isam2_handle.update.assert_not_called()
assert estimator._prev_vio is vio
def test_ac1_vio_second_call_adds_between_factor() -> None:
estimator = _build_estimator()
vio1 = _make_vio(frame_id=1, t_seconds=1000.0)
vio2 = _make_vio(frame_id=2, t_seconds=1001.0)
estimator.add_vio(vio1)
estimator.add_vio(vio2)
estimator._isam2_handle.add_factor.assert_called_once()
factor = estimator._isam2_handle.add_factor.call_args[0][0]
assert isinstance(factor, gtsam.BetweenFactorPose3)
# Keys should be the consecutive x-symbols
assert factor.keys()[0] == gtsam.symbol("x", 0)
assert factor.keys()[1] == gtsam.symbol("x", 1)
# ---------------------------------------------------------------------
# AC-2: Pose-anchor MARGINALS path
def test_ac2_pose_anchor_marginals_adds_prior_factor_and_updates() -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="marginals")
estimator.add_pose_anchor(pose)
estimator._isam2_handle.add_factor.assert_called_once()
factor = estimator._isam2_handle.add_factor.call_args[0][0]
assert isinstance(factor, gtsam.PriorFactorPose3)
estimator._isam2_handle.update.assert_called_once()
def test_ac2_pose_anchor_marginals_bumps_last_anchor_ns() -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="marginals")
before_ns = estimator._last_anchor_ns
estimator.add_pose_anchor(pose)
after_ns = estimator._last_anchor_ns
assert after_ns > before_ns
assert after_ns > 0
# ---------------------------------------------------------------------
# AC-3: Pose-anchor JACOBIAN path SKIPS factor add but bumps _last_anchor_ns
def test_ac3_pose_anchor_jacobian_skips_factor_add(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="jacobian")
with caplog.at_level(logging.INFO):
estimator.add_pose_anchor(pose)
estimator._isam2_handle.add_factor.assert_not_called()
estimator._isam2_handle.update.assert_not_called()
skip_records = [r for r in caplog.records if r.kind == "c5.state.skip_isam2_jacobian_path"]
assert len(skip_records) == 1
def test_ac3_pose_anchor_jacobian_still_bumps_last_anchor_ns() -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="jacobian")
before_ns = estimator._last_anchor_ns
estimator.add_pose_anchor(pose)
assert estimator._last_anchor_ns > before_ns
# ---------------------------------------------------------------------
# AC-4: IMU factor add via shared ImuPreintegrator
def test_ac4_imu_first_window_primes_chain_no_factor() -> None:
estimator = _build_estimator()
# Stub preintegrator returns a real PIM-like sentinel
estimator._imu_preintegrator.reset_for_new_keyframe.return_value = mock.MagicMock()
window = _make_imu_window(ts_start_ns=1_000_000, ts_end_ns=2_000_000)
estimator.add_fc_imu(window)
estimator._imu_preintegrator.integrate_window.assert_called_once_with(window)
estimator._imu_preintegrator.reset_for_new_keyframe.assert_called_once()
# First window primes the chain — no factor yet
estimator._isam2_handle.add_factor.assert_not_called()
estimator._isam2_handle.update.assert_not_called()
def test_ac4_imu_second_window_adds_combined_imu_factor() -> None:
estimator = _build_estimator()
estimator._imu_preintegrator.reset_for_new_keyframe.return_value = (
gtsam.PreintegratedCombinedMeasurements(
gtsam.PreintegrationCombinedParams.MakeSharedU(9.81),
gtsam.imuBias.ConstantBias(),
)
)
win1 = _make_imu_window(ts_start_ns=1_000_000, ts_end_ns=2_000_000)
win2 = _make_imu_window(ts_start_ns=2_000_001, ts_end_ns=3_000_000)
estimator.add_fc_imu(win1)
estimator.add_fc_imu(win2)
assert estimator._imu_preintegrator.integrate_window.call_count == 2
estimator._isam2_handle.add_factor.assert_called_once()
factor = estimator._isam2_handle.add_factor.call_args[0][0]
assert isinstance(factor, gtsam.CombinedImuFactor)
# ---------------------------------------------------------------------
# AC-5: Timestamp ordering
def test_ac5_vio_out_of_order_raises_degraded() -> None:
estimator = _build_estimator()
later = _make_vio(frame_id=1, t_seconds=1001.0)
earlier = _make_vio(frame_id=2, t_seconds=1000.0)
estimator.add_vio(later)
with pytest.raises(EstimatorDegradedError, match="out-of-order vio"):
estimator.add_vio(earlier)
def test_ac5_pose_anchor_out_of_order_raises_degraded() -> None:
estimator = _build_estimator()
later = _make_pose(frame_id=10, t_seconds=1001.0)
earlier = _make_pose(frame_id=11, t_seconds=1000.0)
estimator.add_pose_anchor(later)
with pytest.raises(EstimatorDegradedError, match="out-of-order pose_anchor"):
estimator.add_pose_anchor(earlier)
def test_ac5_imu_out_of_order_raises_degraded() -> None:
estimator = _build_estimator()
estimator._imu_preintegrator.reset_for_new_keyframe.return_value = mock.MagicMock()
later = _make_imu_window(ts_start_ns=2_000_000, ts_end_ns=3_000_000)
earlier = _make_imu_window(ts_start_ns=1_000_000, ts_end_ns=2_000_000)
estimator.add_fc_imu(later)
with pytest.raises(EstimatorDegradedError, match="out-of-order imu_window"):
estimator.add_fc_imu(earlier)
def test_ac5_emits_out_of_order_log(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
later = _make_vio(frame_id=1, t_seconds=1001.0)
earlier = _make_vio(frame_id=2, t_seconds=1000.0)
estimator.add_vio(later)
with caplog.at_level(logging.ERROR):
with pytest.raises(EstimatorDegradedError):
estimator.add_vio(earlier)
err_records = [r for r in caplog.records if r.kind == "c5.state.out_of_order"]
assert len(err_records) == 1
assert err_records[0].kv["source"] == "vio"
# ---------------------------------------------------------------------
# AC-6: Defensive logging on every factor add (R05)
def test_ac6_vio_success_emits_debug_log(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
vio1 = _make_vio(frame_id=1, t_seconds=1000.0)
vio2 = _make_vio(frame_id=2, t_seconds=1001.0)
with caplog.at_level(logging.DEBUG):
estimator.add_vio(vio1)
estimator.add_vio(vio2)
ok_records = [r for r in caplog.records if r.kind == "c5.state.add_vio_ok"]
assert len(ok_records) == 1
def test_ac6_pose_anchor_success_emits_debug_log(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="marginals")
with caplog.at_level(logging.DEBUG):
estimator.add_pose_anchor(pose)
ok_records = [r for r in caplog.records if r.kind == "c5.state.add_pose_anchor_ok"]
assert len(ok_records) == 1
def test_ac6_vio_failure_emits_error_log_and_raises(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
# First call records the prev_vio without going through add_factor.
estimator.add_vio(_make_vio(frame_id=1, t_seconds=1000.0))
# Make the handle fail.
estimator._isam2_handle.add_factor.side_effect = RuntimeError("synthetic")
with caplog.at_level(logging.ERROR):
with pytest.raises(EstimatorDegradedError, match="synthetic"):
estimator.add_vio(_make_vio(frame_id=2, t_seconds=1001.0))
err_records = [r for r in caplog.records if r.kind == "c5.state.add_vio_failed"]
assert len(err_records) == 1
# ---------------------------------------------------------------------
# AC-7: Each add_* triggers handle.update() exactly once
def test_ac7_vio_triggers_update_once() -> None:
estimator = _build_estimator()
estimator.add_vio(_make_vio(frame_id=1, t_seconds=1000.0))
estimator.add_vio(_make_vio(frame_id=2, t_seconds=1001.0))
# Only the second call (which adds a factor) triggers update.
assert estimator._isam2_handle.update.call_count == 1
def test_ac7_pose_anchor_marginals_triggers_update_once() -> None:
estimator = _build_estimator()
estimator.add_pose_anchor(_make_pose(frame_id=10, t_seconds=1000.0))
assert estimator._isam2_handle.update.call_count == 1
def test_ac7_pose_anchor_jacobian_triggers_no_update() -> None:
estimator = _build_estimator()
estimator.add_pose_anchor(_make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="jacobian"))
assert estimator._isam2_handle.update.call_count == 0
def test_ac7_imu_second_window_triggers_update_once() -> None:
estimator = _build_estimator()
estimator._imu_preintegrator.reset_for_new_keyframe.return_value = (
gtsam.PreintegratedCombinedMeasurements(
gtsam.PreintegrationCombinedParams.MakeSharedU(9.81),
gtsam.imuBias.ConstantBias(),
)
)
estimator.add_fc_imu(_make_imu_window(ts_start_ns=1_000_000, ts_end_ns=2_000_000))
estimator.add_fc_imu(_make_imu_window(ts_start_ns=2_000_001, ts_end_ns=3_000_000))
assert estimator._isam2_handle.update.call_count == 1
# ---------------------------------------------------------------------
# AC-8: _last_anchor_ns accuracy via last_anchor_age_ms
def test_ac8_last_anchor_age_ms_after_anchor_is_small() -> None:
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
estimator, real_handle = create(
config=cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
# Use the JACOBIAN path so we exercise the _last_anchor_ns bump
# without needing real iSAM2 seeding (AZ-388's territory).
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="jacobian")
estimator.add_pose_anchor(pose)
age_ms = real_handle.last_anchor_age_ms()
assert 0 <= age_ms < 1000
# ---------------------------------------------------------------------
# Cross-cutting: estimator requires a handle
def test_no_handle_raises_state_estimator_config_error() -> None:
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
# Construct directly without attach_handle.
estimator = GtsamIsam2StateEstimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
from gps_denied_onboard.components.c5_state.errors import StateEstimatorConfigError
with pytest.raises(StateEstimatorConfigError, match="not attached"):
estimator.add_vio(_make_vio(frame_id=1, t_seconds=1000.0))