Files
gps-denied-onboard/tests/unit/c5_state/test_az383_factor_adds.py
T
Oleksandr Bezdieniezhnykh 6c7d24f7e0 [AZ-331] C1 VioStrategy: Protocol + DTOs + factory + C5 migration
Freezes the c1_vio Public API per
_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md v1.0.0:

- VioStrategy Protocol (4 methods: process_frame, reset_to_warm_start,
  health_snapshot, current_strategy_label) in
  components/c1_vio/interface.py.
- DTOs (VioOutput, VioHealth, FeatureQuality, WarmStartPose) + VioState
  enum in _types/nav.py — L1 placement so C5 + C13 consume them without
  crossing the components.* boundary (AZ-270 AC-6). The new VioOutput
  shape (frame_id: str, relative_pose_T: gtsam.Pose3,
  pose_covariance_6x6, imu_bias, feature_quality, emitted_at_ns)
  replaces the AZ-263 scaffolding in _types/vio.py, which is now
  deleted.
- VioError family (VioInitializingError / VioDegradedError /
  VioFatalError) in components/c1_vio/errors.py. Documented
  rationale: the degraded-operation path returns a VioOutput with
  inflated covariance + VioHealth.state=DEGRADED rather than raising
  VioDegradedError — the error type exists only for the rare
  degraded->fatal transition.
- C1VioConfig per-component config block (strategy enum,
  lost_frame_threshold default 9, warm_start_max_frames default 5)
  with constructor-time validation rejecting unknown strategy labels.
- StrategyNotAvailableError added to runtime_root/errors.py;
  composition-time error distinct from the VioError family.
- Composition-root factory build_vio_strategy in
  runtime_root/vio_factory.py with three BUILD_* gates (BUILD_OKVIS2,
  BUILD_VINS_MONO, BUILD_KLT_RANSAC). Concrete strategy modules are
  imported lazily via __import__ AFTER the flag check — Tier-0
  workstation builds with the flag OFF MUST NOT load the strategy
  module (Risk-2 / I-5; verifiable via sys.modules).
- 36 conformance tests cover all 9 ACs + NFR-perf-factory
  (p99 build under 200 ms x 1000 calls) + NFR-reliability-error-family.
  AC-8 introspects the contract file's Shape table and asserts method
  parity against the runtime Protocol; AC-9 asserts the frame_id
  annotation is 'str' (PEP-563 stringified).

C5 migration (consumers of the new VioOutput shape):
- gtsam_isam2_estimator.py + eskf_baseline.py: replaced
  vio.timestamp -> vio.emitted_at_ns (drops _datetime_to_ns on the
  VIO path), vio.pose_se3 -> vio.relative_pose_T (gtsam.Pose3 direct;
  drops _pose_se3_to_gtsam / _pose_se3_to_array), vio.covariance_6x6
  -> vio.pose_covariance_6x6 (rename).
- key_for_frame signature widened to UUID | int | str to accept the
  new str frame_id.
- 4 C5 test files migrated to the new VioOutput shape with helper
  fixtures producing ImuBias + FeatureQuality + str frame_id.
- c5_state/interface.py TYPE_CHECKING import path updated.

Bootstrap healthcheck + test_types_importable updated to drop the
deleted _types/vio module and pick up _types/inference (AZ-297) in
the same sweep.

Full unit-test sweep: 884 passed, 2 pre-existing environment skips
(cmake, actionlint).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 04:44:31 +03:00

436 lines
15 KiB
Python

"""AZ-383 — GtsamIsam2StateEstimator add_vio / add_pose_anchor / add_fc_imu.
Eight ACs from ``_docs/02_tasks/done/AZ-383_c5_factor_adds.md``:
- AC-1 VIO factor add — ``BetweenFactorPose3`` with correct keys +
noise model.
- AC-2 Pose-anchor MARGINALS path — ``PriorFactorPose3`` + update,
``_last_anchor_ns`` bumped.
- AC-3 Pose-anchor JACOBIAN path SKIPS iSAM2 add but bumps
``_last_anchor_ns`` and emits INFO log.
- AC-4 IMU factor add via the shared ``ImuPreintegrator``.
- AC-5 Timestamp ordering — out-of-order ``add_*`` raises
:class:`EstimatorDegradedError`.
- AC-6 Defensive logging on every factor add (R05).
- AC-7 Each ``add_*`` triggers ``handle.update()`` exactly once.
- AC-8 ``_last_anchor_ns`` accuracy via ``last_anchor_age_ms``.
The handle is replaced with a stub on every test so the asserts
fire on call counts + factor types rather than iSAM2 convergence
(real iSAM2 first-keyframe seeding is AZ-388's responsibility).
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from unittest import mock
from uuid import UUID
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.nav import (
FeatureQuality,
ImuBias,
ImuSample,
ImuWindow,
VioOutput,
)
from gps_denied_onboard._types.pose import (
CovarianceMode,
PoseEstimate,
PoseSourceLabel,
Quat,
)
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import EstimatorDegradedError
from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import (
GtsamIsam2StateEstimator,
create,
)
from gps_denied_onboard.runtime_root.state_factory import clear_state_registry
@pytest.fixture(autouse=True)
def _registry_isolation():
# Arrange
clear_state_registry()
yield
clear_state_registry()
def _build_estimator(*, with_stub_handle: bool = True) -> GtsamIsam2StateEstimator:
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
estimator, _real_handle = create(
config=cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
if with_stub_handle:
estimator._isam2_handle = mock.MagicMock()
return estimator
_ZERO_BIAS = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
_NEUTRAL_FQ = FeatureQuality(
tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0,
)
def _make_vio(*, frame_id: int, t_seconds: float, pose: np.ndarray | None = None) -> VioOutput:
matrix = pose if pose is not None else np.eye(4)
return VioOutput(
frame_id=str(frame_id),
relative_pose_T=gtsam.Pose3(matrix),
pose_covariance_6x6=np.eye(6) * 0.01,
imu_bias=_ZERO_BIAS,
feature_quality=_NEUTRAL_FQ,
emitted_at_ns=int(t_seconds * 1_000_000_000),
)
def _make_pose(
*,
frame_id: int,
t_seconds: float,
covariance_mode: CovarianceMode | str = CovarianceMode.MARGINALS,
) -> PoseEstimate:
mode = (
covariance_mode
if isinstance(covariance_mode, CovarianceMode)
else CovarianceMode(covariance_mode.lower())
)
return PoseEstimate(
frame_id=UUID(int=frame_id),
position_wgs84=LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0),
orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0),
covariance_6x6=np.eye(6) * 0.01,
covariance_mode=mode,
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
last_satellite_anchor_age_ms=0,
emitted_at=int(t_seconds * 1_000_000_000),
)
def _make_imu_window(*, ts_start_ns: int, ts_end_ns: int) -> ImuWindow:
samples = (
ImuSample(ts_ns=ts_start_ns, accel_xyz=(0.0, 0.0, 9.81), gyro_xyz=(0.0, 0.0, 0.0)),
ImuSample(ts_ns=ts_end_ns, accel_xyz=(0.0, 0.0, 9.81), gyro_xyz=(0.0, 0.0, 0.0)),
)
return ImuWindow(samples=samples, ts_start_ns=ts_start_ns, ts_end_ns=ts_end_ns)
# ---------------------------------------------------------------------
# AC-1: VIO factor add — BetweenFactorPose3 with correct keys + noise
def test_ac1_vio_first_call_records_without_factor() -> None:
estimator = _build_estimator()
vio = _make_vio(frame_id=1, t_seconds=1000.0)
estimator.add_vio(vio)
# First call MUST NOT emit a factor or update — there's no prev frame.
estimator._isam2_handle.add_factor.assert_not_called()
estimator._isam2_handle.update.assert_not_called()
assert estimator._prev_vio is vio
def test_ac1_vio_second_call_adds_between_factor() -> None:
estimator = _build_estimator()
vio1 = _make_vio(frame_id=1, t_seconds=1000.0)
vio2 = _make_vio(frame_id=2, t_seconds=1001.0)
estimator.add_vio(vio1)
estimator.add_vio(vio2)
estimator._isam2_handle.add_factor.assert_called_once()
factor = estimator._isam2_handle.add_factor.call_args[0][0]
assert isinstance(factor, gtsam.BetweenFactorPose3)
# Keys should be the consecutive x-symbols
assert factor.keys()[0] == gtsam.symbol("x", 0)
assert factor.keys()[1] == gtsam.symbol("x", 1)
# ---------------------------------------------------------------------
# AC-2: Pose-anchor MARGINALS path
def test_ac2_pose_anchor_marginals_adds_prior_factor_and_updates() -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="marginals")
estimator.add_pose_anchor(pose)
estimator._isam2_handle.add_factor.assert_called_once()
factor = estimator._isam2_handle.add_factor.call_args[0][0]
assert isinstance(factor, gtsam.PriorFactorPose3)
estimator._isam2_handle.update.assert_called_once()
def test_ac2_pose_anchor_marginals_bumps_last_anchor_ns() -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="marginals")
before_ns = estimator._last_anchor_ns
estimator.add_pose_anchor(pose)
after_ns = estimator._last_anchor_ns
assert after_ns > before_ns
assert after_ns > 0
# ---------------------------------------------------------------------
# AC-3: Pose-anchor JACOBIAN path SKIPS factor add but bumps _last_anchor_ns
def test_ac3_pose_anchor_jacobian_skips_factor_add(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="jacobian")
with caplog.at_level(logging.INFO):
estimator.add_pose_anchor(pose)
estimator._isam2_handle.add_factor.assert_not_called()
estimator._isam2_handle.update.assert_not_called()
skip_records = [r for r in caplog.records if r.kind == "c5.state.skip_isam2_jacobian_path"]
assert len(skip_records) == 1
def test_ac3_pose_anchor_jacobian_still_bumps_last_anchor_ns() -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="jacobian")
before_ns = estimator._last_anchor_ns
estimator.add_pose_anchor(pose)
assert estimator._last_anchor_ns > before_ns
# ---------------------------------------------------------------------
# AC-4: IMU factor add via shared ImuPreintegrator
def test_ac4_imu_first_window_primes_chain_no_factor() -> None:
estimator = _build_estimator()
# Stub preintegrator returns a real PIM-like sentinel
estimator._imu_preintegrator.reset_for_new_keyframe.return_value = mock.MagicMock()
window = _make_imu_window(ts_start_ns=1_000_000, ts_end_ns=2_000_000)
estimator.add_fc_imu(window)
estimator._imu_preintegrator.integrate_window.assert_called_once_with(window)
estimator._imu_preintegrator.reset_for_new_keyframe.assert_called_once()
# First window primes the chain — no factor yet
estimator._isam2_handle.add_factor.assert_not_called()
estimator._isam2_handle.update.assert_not_called()
def test_ac4_imu_second_window_adds_combined_imu_factor() -> None:
estimator = _build_estimator()
estimator._imu_preintegrator.reset_for_new_keyframe.return_value = (
gtsam.PreintegratedCombinedMeasurements(
gtsam.PreintegrationCombinedParams.MakeSharedU(9.81),
gtsam.imuBias.ConstantBias(),
)
)
win1 = _make_imu_window(ts_start_ns=1_000_000, ts_end_ns=2_000_000)
win2 = _make_imu_window(ts_start_ns=2_000_001, ts_end_ns=3_000_000)
estimator.add_fc_imu(win1)
estimator.add_fc_imu(win2)
assert estimator._imu_preintegrator.integrate_window.call_count == 2
estimator._isam2_handle.add_factor.assert_called_once()
factor = estimator._isam2_handle.add_factor.call_args[0][0]
assert isinstance(factor, gtsam.CombinedImuFactor)
# ---------------------------------------------------------------------
# AC-5: Timestamp ordering
def test_ac5_vio_out_of_order_raises_degraded() -> None:
estimator = _build_estimator()
later = _make_vio(frame_id=1, t_seconds=1001.0)
earlier = _make_vio(frame_id=2, t_seconds=1000.0)
estimator.add_vio(later)
with pytest.raises(EstimatorDegradedError, match="out-of-order vio"):
estimator.add_vio(earlier)
def test_ac5_pose_anchor_out_of_order_raises_degraded() -> None:
estimator = _build_estimator()
later = _make_pose(frame_id=10, t_seconds=1001.0)
earlier = _make_pose(frame_id=11, t_seconds=1000.0)
estimator.add_pose_anchor(later)
with pytest.raises(EstimatorDegradedError, match="out-of-order pose_anchor"):
estimator.add_pose_anchor(earlier)
def test_ac5_imu_out_of_order_raises_degraded() -> None:
estimator = _build_estimator()
estimator._imu_preintegrator.reset_for_new_keyframe.return_value = mock.MagicMock()
later = _make_imu_window(ts_start_ns=2_000_000, ts_end_ns=3_000_000)
earlier = _make_imu_window(ts_start_ns=1_000_000, ts_end_ns=2_000_000)
estimator.add_fc_imu(later)
with pytest.raises(EstimatorDegradedError, match="out-of-order imu_window"):
estimator.add_fc_imu(earlier)
def test_ac5_emits_out_of_order_log(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
later = _make_vio(frame_id=1, t_seconds=1001.0)
earlier = _make_vio(frame_id=2, t_seconds=1000.0)
estimator.add_vio(later)
with caplog.at_level(logging.ERROR):
with pytest.raises(EstimatorDegradedError):
estimator.add_vio(earlier)
err_records = [r for r in caplog.records if r.kind == "c5.state.out_of_order"]
assert len(err_records) == 1
assert err_records[0].kv["source"] == "vio"
# ---------------------------------------------------------------------
# AC-6: Defensive logging on every factor add (R05)
def test_ac6_vio_success_emits_debug_log(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
vio1 = _make_vio(frame_id=1, t_seconds=1000.0)
vio2 = _make_vio(frame_id=2, t_seconds=1001.0)
with caplog.at_level(logging.DEBUG):
estimator.add_vio(vio1)
estimator.add_vio(vio2)
ok_records = [r for r in caplog.records if r.kind == "c5.state.add_vio_ok"]
assert len(ok_records) == 1
def test_ac6_pose_anchor_success_emits_debug_log(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="marginals")
with caplog.at_level(logging.DEBUG):
estimator.add_pose_anchor(pose)
ok_records = [r for r in caplog.records if r.kind == "c5.state.add_pose_anchor_ok"]
assert len(ok_records) == 1
def test_ac6_vio_failure_emits_error_log_and_raises(caplog: pytest.LogCaptureFixture) -> None:
estimator = _build_estimator()
# First call records the prev_vio without going through add_factor.
estimator.add_vio(_make_vio(frame_id=1, t_seconds=1000.0))
# Make the handle fail.
estimator._isam2_handle.add_factor.side_effect = RuntimeError("synthetic")
with caplog.at_level(logging.ERROR):
with pytest.raises(EstimatorDegradedError, match="synthetic"):
estimator.add_vio(_make_vio(frame_id=2, t_seconds=1001.0))
err_records = [r for r in caplog.records if r.kind == "c5.state.add_vio_failed"]
assert len(err_records) == 1
# ---------------------------------------------------------------------
# AC-7: Each add_* triggers handle.update() exactly once
def test_ac7_vio_triggers_update_once() -> None:
estimator = _build_estimator()
estimator.add_vio(_make_vio(frame_id=1, t_seconds=1000.0))
estimator.add_vio(_make_vio(frame_id=2, t_seconds=1001.0))
# Only the second call (which adds a factor) triggers update.
assert estimator._isam2_handle.update.call_count == 1
def test_ac7_pose_anchor_marginals_triggers_update_once() -> None:
estimator = _build_estimator()
estimator.add_pose_anchor(_make_pose(frame_id=10, t_seconds=1000.0))
assert estimator._isam2_handle.update.call_count == 1
def test_ac7_pose_anchor_jacobian_triggers_no_update() -> None:
estimator = _build_estimator()
estimator.add_pose_anchor(_make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="jacobian"))
assert estimator._isam2_handle.update.call_count == 0
def test_ac7_imu_second_window_triggers_update_once() -> None:
estimator = _build_estimator()
estimator._imu_preintegrator.reset_for_new_keyframe.return_value = (
gtsam.PreintegratedCombinedMeasurements(
gtsam.PreintegrationCombinedParams.MakeSharedU(9.81),
gtsam.imuBias.ConstantBias(),
)
)
estimator.add_fc_imu(_make_imu_window(ts_start_ns=1_000_000, ts_end_ns=2_000_000))
estimator.add_fc_imu(_make_imu_window(ts_start_ns=2_000_001, ts_end_ns=3_000_000))
assert estimator._isam2_handle.update.call_count == 1
# ---------------------------------------------------------------------
# AC-8: _last_anchor_ns accuracy via last_anchor_age_ms
def test_ac8_last_anchor_age_ms_after_anchor_is_small() -> None:
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
estimator, real_handle = create(
config=cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
# Use the JACOBIAN path so we exercise the _last_anchor_ns bump
# without needing real iSAM2 seeding (AZ-388's territory).
pose = _make_pose(frame_id=10, t_seconds=1000.0, covariance_mode="jacobian")
estimator.add_pose_anchor(pose)
age_ms = real_handle.last_anchor_age_ms()
assert 0 <= age_ms < 1000
# ---------------------------------------------------------------------
# Cross-cutting: estimator requires a handle
def test_no_handle_raises_state_estimator_config_error() -> None:
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
# Construct directly without attach_handle.
estimator = GtsamIsam2StateEstimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
from gps_denied_onboard.components.c5_state.errors import StateEstimatorConfigError
with pytest.raises(StateEstimatorConfigError, match="not attached"):
estimator.add_vio(_make_vio(frame_id=1, t_seconds=1000.0))