Files
gps-denied-onboard/tests/unit/c5_state/test_az382_isam2_smoother_wiring.py
T
Oleksandr Bezdieniezhnykh 8b394a98c6 [AZ-382] C5 GtsamIsam2StateEstimator skeleton + real iSAM2 handle bodies
- Add GtsamIsam2StateEstimator owning the GTSAM substrate:
  gtsam.ISAM2(ISAM2Params()) + gtsam_unstable.IncrementalFixedLagSmoother
  (K * 1/3 s window per D-C5-3) + NonlinearFactorGraph + Values.
- Module-level create(...) factory + register() helper for
  register_state_estimator("gtsam_isam2", create). Opt-in registration
  per ADR-002 — no auto-import.
- Key-management policy: key_for_frame(UUID) -> int via
  gtsam.symbol('x', counter); idempotent re-lookup.
- Replace all four NotImplementedError bodies in _isam2_handle.py with
  real GTSAM calls:
  * add_factor → estimator._graph.add(factor); R05 defensive logging
    on success/failure; EstimatorDegradedError on failure.
  * update → _isam2.update + _smoother.update; empty
    FixedLagSmootherKeyTimestampMap substituted for timestamps=None;
    EstimatorFatalError on either failure.
  * compute_marginals → gtsam.Marginals(getFactorsUnsafe(),
    calculateEstimate()).
  * last_anchor_age_ms → (monotonic_ns - _last_anchor_ns) // 1e6.
- StateEstimator Protocol methods on the estimator still raise
  NotImplementedError naming AZ-383 (factor adds) / AZ-384
  (marginals + outputs).
- AZ-382 AC tests: 27 cases covering 10/10 ACs + factory integration.
- AZ-381 test_ac8_handle_methods_raise_named_task removed (obsolete:
  bodies are real now); test_ac8_handle_is_isam2_graph_handle retained.
- Full suite: 547 passed (+26 vs B12), 2 skipped.
- Impl report: _docs/03_implementation/batch_13_cycle1_report.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 05:51:23 +03:00

444 lines
15 KiB
Python

"""AZ-382 — GtsamIsam2StateEstimator + ISam2GraphHandleImpl real-body tests.
Ten ACs from ``_docs/02_tasks/done/AZ-382_c5_isam2_smoother_wiring.md``
(authored todo, archived here once the task transitions to Done):
- AC-1 Construction succeeds; ``_isam2`` / ``_smoother`` / ``_graph``
/ ``_values`` / ``_key_for_frame`` initialised.
- AC-2 Key-management policy assigns unique GTSAM keys via
``gtsam.symbol('x', counter)``; repeat lookups return the same
key.
- AC-3 ``IncrementalFixedLagSmoother`` instantiated with
``K * frame_period_s``.
- AC-4 ``keyframe_window_size = 5`` rejected at config-load
(delegates to the AZ-381 ``C5StateConfig.__post_init__``).
- AC-5 ``ISam2GraphHandleImpl.add_factor`` real body — appends to
``estimator._graph``; failure raises
:class:`EstimatorDegradedError` AND logs the failure record.
- AC-6 ``ISam2GraphHandleImpl.update`` real body — iSAM2 +
smoother advance; failure raises
:class:`EstimatorFatalError`.
- AC-7 ``ISam2GraphHandleImpl.compute_marginals`` returns a real
``gtsam.Marginals`` instance.
- AC-8 ``ISam2GraphHandleImpl.last_anchor_age_ms`` returns a very
large number until an anchor lands.
- AC-9 Defensive logging emitted on every mutation (success + failure).
- AC-10 ``StateEstimator`` Protocol methods on the estimator still
raise ``NotImplementedError`` naming AZ-383 / AZ-384.
"""
from __future__ import annotations
import logging
from unittest import mock
from uuid import uuid4
import gtsam
import gtsam_unstable
import pytest
from gps_denied_onboard.components.c5_state._isam2_handle import (
ISam2GraphHandle,
ISam2GraphHandleImpl,
)
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import (
EstimatorDegradedError,
EstimatorFatalError,
)
from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import (
_FRAME_PERIOD_S,
GtsamIsam2StateEstimator,
create,
register,
)
from gps_denied_onboard.components.c5_state.interface import StateEstimator
from gps_denied_onboard.config.schema import ConfigError
from gps_denied_onboard.runtime_root.state_factory import (
build_state_estimator,
clear_state_registry,
)
@pytest.fixture(autouse=True)
def _registry_isolation():
    """Wrap every test in this module with a state-registry reset.

    The registry is cleared once before the test body runs and once more
    after it finishes.
    """
    # Setup: start from an empty registry.
    clear_state_registry()
    yield
    # Teardown: drop whatever the test registered.
    clear_state_registry()
def _build_config(
    *, strategy: str = "gtsam_isam2", keyframe_window_size: int = 15
) -> mock.MagicMock:
    """Return a mock root config carrying a real ``C5StateConfig`` block.

    The block is stored under ``components["c5_state"]``; every other
    attribute of the returned object is an auto-generated mock.
    """
    c5_block = C5StateConfig(strategy=strategy, keyframe_window_size=keyframe_window_size)
    return mock.MagicMock(components={"c5_state": c5_block})
def _build_estimator(*, keyframe_window_size: int = 15) -> GtsamIsam2StateEstimator:
    """Construct a ``GtsamIsam2StateEstimator`` with mocked collaborators.

    Only the config block is real; the four injected dependencies are
    ``MagicMock`` instances.
    """
    collaborator_names = ("imu_preintegrator", "se3_utils", "wgs_converter", "fdr_client")
    collaborators = {name: mock.MagicMock() for name in collaborator_names}
    return GtsamIsam2StateEstimator(
        _build_config(keyframe_window_size=keyframe_window_size),
        **collaborators,
    )
# ---------------------------------------------------------------------
# AC-1: construction
def test_ac1_construction_initialises_substrate() -> None:
    """AC-1: a fresh estimator owns the GTSAM objects and zeroed bookkeeping."""
    fresh = _build_estimator(keyframe_window_size=15)

    # The GTSAM substrate attributes have the expected concrete types.
    substrate_types = (
        ("_isam2", gtsam.ISAM2),
        ("_smoother", gtsam_unstable.IncrementalFixedLagSmoother),
        ("_graph", gtsam.NonlinearFactorGraph),
        ("_values", gtsam.Values),
    )
    for attr_name, expected_type in substrate_types:
        assert isinstance(getattr(fresh, attr_name), expected_type)

    # Bookkeeping starts out empty / zeroed.
    assert fresh._key_for_frame == {}
    assert fresh._next_key_counter == 0
    assert fresh._last_anchor_ns == 0
    assert fresh._graph.size() == 0

    # The estimator passes the ``StateEstimator`` isinstance check.
    assert isinstance(fresh, StateEstimator)
def test_ac1_construction_emits_debug_log(caplog: pytest.LogCaptureFixture) -> None:
    """AC-1: construction emits exactly one ``c5.state.isam2_initialised`` record.

    The record's ``kv`` payload must carry the configured window size and an
    initial factor count of zero.
    """
    # ``get_logger`` resets the named logger to NOTSET, which masks
    # ``caplog.at_level(DEBUG, logger=...)``. Bump the root level
    # instead so the DEBUG record propagates to caplog's handler.
    with caplog.at_level(logging.DEBUG):
        _build_estimator(keyframe_window_size=12)

    # Capturing at root DEBUG level also collects records from unrelated
    # loggers, which need not carry the structured ``kind`` attribute.
    # ``getattr`` skips those instead of raising AttributeError.
    records = [
        r for r in caplog.records if getattr(r, "kind", None) == "c5.state.isam2_initialised"
    ]
    assert len(records) == 1
    assert records[0].kv["keyframe_window_size"] == 12
    assert records[0].kv["total_factors_initial"] == 0
# ---------------------------------------------------------------------
# AC-2: key-management policy
def test_ac2_key_for_frame_assigns_unique_keys() -> None:
    """AC-2: three distinct frame UUIDs receive three pairwise-distinct keys.

    Also checks that the key counter advanced once per new frame and that
    the UUID-to-key map records every assignment.
    """
    estimator = _build_estimator()
    u1, u2, u3 = uuid4(), uuid4(), uuid4()
    k1 = estimator.key_for_frame(u1)
    k2 = estimator.key_for_frame(u2)
    k3 = estimator.key_for_frame(u3)
    # ``k1 != k2 != k3`` is a chained comparison: it checks k1 != k2 and
    # k2 != k3 but never k1 != k3. Set cardinality checks all pairs.
    assert len({k1, k2, k3}) == 3
    assert estimator._next_key_counter == 3
    assert estimator._key_for_frame == {u1: k1, u2: k2, u3: k3}
def test_ac2_key_for_frame_is_idempotent() -> None:
    """AC-2: looking up the same frame twice yields the same key, allocated once."""
    estimator = _build_estimator()
    frame_id = uuid4()

    initial_key = estimator.key_for_frame(frame_id)
    repeated_key = estimator.key_for_frame(frame_id)

    assert initial_key == repeated_key
    # The repeat lookup must not have advanced the key counter.
    assert estimator._next_key_counter == 1
def test_ac2_keys_use_x_namespace() -> None:
    """AC-2: the first allocated key equals ``gtsam.symbol("x", 0)``."""
    estimator = _build_estimator()
    frame_id = uuid4()

    allocated = estimator.key_for_frame(frame_id)

    assert allocated == gtsam.symbol("x", 0)
# ---------------------------------------------------------------------
# AC-3: window size respected
def test_ac3_smoother_window_is_k_times_frame_period() -> None:
    """AC-3: the smoother lag equals ``keyframe_window_size * _FRAME_PERIOD_S``."""
    window_size = 15
    estimator = _build_estimator(keyframe_window_size=window_size)

    # The window itself is C++-private, so read it back through the
    # smoother's ``smootherLag()`` getter.
    actual_lag_s = estimator._smoother.smootherLag()

    assert actual_lag_s == pytest.approx(window_size * _FRAME_PERIOD_S, rel=1e-6)
# ---------------------------------------------------------------------
# AC-4: window size validation
def test_ac4_window_below_min_rejected_by_config() -> None:
    """AC-4: a window size of 5 raises ``ConfigError`` quoting the ``[10, 20]`` range."""
    too_small = 5
    with pytest.raises(ConfigError, match=r"\[10, 20\]"):
        C5StateConfig(keyframe_window_size=too_small)
def test_ac4_window_above_max_rejected_by_config() -> None:
    """AC-4: a window size of 21 raises ``ConfigError`` quoting the ``[10, 20]`` range."""
    too_large = 21
    with pytest.raises(ConfigError, match=r"\[10, 20\]"):
        C5StateConfig(keyframe_window_size=too_large)
# ---------------------------------------------------------------------
# AC-5: ISam2GraphHandleImpl.add_factor real body
def test_ac5_add_factor_appends_to_graph(caplog: pytest.LogCaptureFixture) -> None:
    """AC-5: ``add_factor`` grows the estimator graph and logs one success record."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    pose_key = estimator.key_for_frame(uuid4())
    prior = gtsam.PriorFactorPose3(
        pose_key,
        gtsam.Pose3(),
        gtsam.noiseModel.Isotropic.Sigma(6, 0.1),
    )

    with caplog.at_level(logging.DEBUG, logger="c5_state.isam2_handle"):
        graph_handle.add_factor(prior)

    # The factor landed in the estimator-owned graph.
    assert estimator._graph.size() == 1
    # Exactly one success record, describing the factor and the new graph size.
    success = [rec for rec in caplog.records if rec.kind == "c5.state.add_factor_ok"]
    assert len(success) == 1
    payload = success[0].kv
    assert payload["factor_type"] == "PriorFactorPose3"
    assert payload["graph_size"] == 1
def test_ac5_add_factor_failure_raises_degraded_and_logs(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-5: a failing graph ``add`` raises ``EstimatorDegradedError`` and is logged."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    # Swap in a stub graph whose ``add`` always raises, so the failure path
    # is exercised without relying on any particular GTSAM exception.
    broken_graph = mock.MagicMock()
    broken_graph.add.side_effect = RuntimeError("synthetic graph failure")
    estimator._graph = broken_graph

    with caplog.at_level(logging.ERROR, logger="c5_state.isam2_handle"), pytest.raises(
        EstimatorDegradedError, match="synthetic graph failure"
    ):
        graph_handle.add_factor(mock.MagicMock())

    failures = [rec for rec in caplog.records if rec.kind == "c5.state.add_factor_failed"]
    assert len(failures) == 1
    assert "synthetic graph failure" in failures[0].kv["error"]
# ---------------------------------------------------------------------
# AC-6: ISam2GraphHandleImpl.update real body
def _build_unit_update_payload() -> tuple[
    gtsam.NonlinearFactorGraph, gtsam.Values, gtsam_unstable.FixedLagSmootherKeyTimestampMap
]:
    """Build a minimal one-pose update payload.

    Returns a ``(graph, values, timestamps)`` triple, all keyed on
    ``gtsam.symbol("x", 0)``: a prior factor on an identity pose, the
    matching initial value, and a timestamp entry of ``0.0``.
    """
    pose_key = gtsam.symbol("x", 0)

    new_factors = gtsam.NonlinearFactorGraph()
    prior_noise = gtsam.noiseModel.Isotropic.Sigma(6, 0.1)
    new_factors.add(gtsam.PriorFactorPose3(pose_key, gtsam.Pose3(), prior_noise))

    initial_values = gtsam.Values()
    initial_values.insert(pose_key, gtsam.Pose3())

    key_timestamps = gtsam_unstable.FixedLagSmootherKeyTimestampMap()
    key_timestamps.insert((pose_key, 0.0))

    return new_factors, initial_values, key_timestamps
def test_ac6_update_advances_isam2_and_smoother(caplog: pytest.LogCaptureFixture) -> None:
    """AC-6: a successful ``update`` populates iSAM2 and logs one success record."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    payload = _build_unit_update_payload()

    with caplog.at_level(logging.DEBUG, logger="c5_state.isam2_handle"):
        graph_handle.update(*payload)

    # After the update the iSAM2 estimate holds the single pose value.
    estimate = estimator._isam2.calculateEstimate()
    assert estimate.size() == 1
    success = [rec for rec in caplog.records if rec.kind == "c5.state.update_ok"]
    assert len(success) == 1
def test_ac6_update_with_none_timestamps_substitutes_empty_map() -> None:
    """AC-6: ``update`` accepts ``timestamps=None`` and still advances iSAM2."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    new_factors, initial_values, _unused_timestamps = _build_unit_update_payload()

    graph_handle.update(new_factors, initial_values, None)

    estimate = estimator._isam2.calculateEstimate()
    assert estimate.size() == 1
def test_ac6_isam2_failure_raises_fatal(caplog: pytest.LogCaptureFixture) -> None:
    """AC-6: a failing iSAM2 ``update`` raises ``EstimatorFatalError`` and is logged."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    # Stub iSAM2 so that its ``update`` always raises.
    broken_isam2 = mock.MagicMock()
    broken_isam2.update.side_effect = RuntimeError("synthetic isam2 failure")
    estimator._isam2 = broken_isam2

    with caplog.at_level(logging.ERROR, logger="c5_state.isam2_handle"), pytest.raises(
        EstimatorFatalError, match="synthetic isam2 failure"
    ):
        graph_handle.update(mock.MagicMock(), mock.MagicMock())

    failures = [rec for rec in caplog.records if rec.kind == "c5.state.isam2_update_failed"]
    assert len(failures) == 1
def test_ac6_smoother_failure_raises_fatal(caplog: pytest.LogCaptureFixture) -> None:
    """AC-6: a failing smoother ``update`` raises ``EstimatorFatalError`` and is logged."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    # Stub only the smoother; iSAM2 stays real and receives a valid payload.
    broken_smoother = mock.MagicMock()
    broken_smoother.update.side_effect = RuntimeError("synthetic smoother failure")
    estimator._smoother = broken_smoother
    new_factors, initial_values, key_timestamps = _build_unit_update_payload()

    with caplog.at_level(logging.ERROR, logger="c5_state.isam2_handle"), pytest.raises(
        EstimatorFatalError, match="synthetic smoother failure"
    ):
        graph_handle.update(new_factors, initial_values, key_timestamps)

    failures = [rec for rec in caplog.records if rec.kind == "c5.state.smoother_update_failed"]
    assert len(failures) == 1
# ---------------------------------------------------------------------
# AC-7: compute_marginals real body
def test_ac7_compute_marginals_returns_real_instance() -> None:
    """AC-7: after one update, ``compute_marginals`` returns a ``gtsam.Marginals``."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    # Feed one update first so there is an estimate to compute marginals from.
    graph_handle.update(*_build_unit_update_payload())

    result = graph_handle.compute_marginals()

    assert isinstance(result, gtsam.Marginals)
# ---------------------------------------------------------------------
# AC-8: last_anchor_age_ms real body
def test_ac8_last_anchor_age_ms_huge_when_no_anchor() -> None:
    """AC-8: with no anchor recorded, the age spans the whole monotonic clock.

    A fresh estimator has ``_last_anchor_ns == 0``, so the reported age is
    the monotonic clock reading converted to milliseconds. The check compares
    against that reading rather than a fixed threshold, because the
    monotonic clock's reference point is undefined (commonly boot time) and
    a fixed ``> 1_000_000`` ms bound fails on a host up for under ~17 minutes.
    """
    import time as _time

    estimator = _build_estimator()
    handle = ISam2GraphHandleImpl(estimator)
    clock_before_ms = _time.monotonic_ns() // 1_000_000
    age_ms = handle.last_anchor_age_ms()
    # The clock never goes backwards, so the age is at least the reading
    # taken just before the call. Allow 1 ms of slack in case the
    # implementation floor-divides in floating point.
    assert age_ms >= clock_before_ms - 1
def test_ac8_last_anchor_age_ms_small_after_anchor_set() -> None:
    """AC-8: right after an anchor timestamp is stored, the age is under 100 ms."""
    import time as _time

    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    # Simulate an anchor landing "now" on the monotonic clock.
    estimator._last_anchor_ns = _time.monotonic_ns()

    elapsed_ms = graph_handle.last_anchor_age_ms()

    assert 0 <= elapsed_ms < 100
# ---------------------------------------------------------------------
# AC-9: defensive logging on every mutation (success and failure)
def test_ac9_add_factor_emits_success_log(caplog: pytest.LogCaptureFixture) -> None:
    """AC-9: a successful ``add_factor`` emits one ``c5.state.add_factor_ok`` record."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    pose_key = estimator.key_for_frame(uuid4())
    prior = gtsam.PriorFactorPose3(
        pose_key,
        gtsam.Pose3(),
        gtsam.noiseModel.Isotropic.Sigma(6, 0.1),
    )

    with caplog.at_level(logging.DEBUG, logger="c5_state.isam2_handle"):
        graph_handle.add_factor(prior)

    emitted = [rec for rec in caplog.records if rec.kind == "c5.state.add_factor_ok"]
    assert len(emitted) == 1
def test_ac9_update_emits_success_log(caplog: pytest.LogCaptureFixture) -> None:
    """AC-9: a successful ``update`` emits one ``c5.state.update_ok`` record."""
    estimator = _build_estimator()
    graph_handle = ISam2GraphHandleImpl(estimator)
    payload = _build_unit_update_payload()

    with caplog.at_level(logging.DEBUG, logger="c5_state.isam2_handle"):
        graph_handle.update(*payload)

    emitted = [rec for rec in caplog.records if rec.kind == "c5.state.update_ok"]
    assert len(emitted) == 1
# ---------------------------------------------------------------------
# AC-10: StateEstimator Protocol methods still raise NotImplementedError
def test_ac10_add_vio_raises_named_az383() -> None:
    """AC-10: ``add_vio`` raises ``NotImplementedError`` mentioning AZ-383."""
    estimator = _build_estimator()
    vio_sample = mock.MagicMock()
    with pytest.raises(NotImplementedError, match="AZ-383"):
        estimator.add_vio(vio_sample)
def test_ac10_add_pose_anchor_raises_named_az383() -> None:
    """AC-10: ``add_pose_anchor`` raises ``NotImplementedError`` mentioning AZ-383."""
    estimator = _build_estimator()
    anchor_sample = mock.MagicMock()
    with pytest.raises(NotImplementedError, match="AZ-383"):
        estimator.add_pose_anchor(anchor_sample)
def test_ac10_add_fc_imu_raises_named_az383() -> None:
    """AC-10: ``add_fc_imu`` raises ``NotImplementedError`` mentioning AZ-383."""
    estimator = _build_estimator()
    imu_sample = mock.MagicMock()
    with pytest.raises(NotImplementedError, match="AZ-383"):
        estimator.add_fc_imu(imu_sample)
def test_ac10_current_estimate_raises_named_az384() -> None:
    """AC-10: ``current_estimate`` raises ``NotImplementedError`` mentioning AZ-384."""
    unwired = _build_estimator()
    with pytest.raises(NotImplementedError, match="AZ-384"):
        unwired.current_estimate()
def test_ac10_smoothed_history_raises_named_az384() -> None:
    """AC-10: ``smoothed_history`` raises ``NotImplementedError`` mentioning AZ-384."""
    unwired = _build_estimator()
    with pytest.raises(NotImplementedError, match="AZ-384"):
        unwired.smoothed_history(n_keyframes=5)
def test_ac10_health_snapshot_raises_named_az384() -> None:
    """AC-10: ``health_snapshot`` raises ``NotImplementedError`` mentioning AZ-384."""
    unwired = _build_estimator()
    with pytest.raises(NotImplementedError, match="AZ-384"):
        unwired.health_snapshot()
# ---------------------------------------------------------------------
# Factory integration: register() + build_state_estimator returns the tuple
def test_register_makes_strategy_available_to_factory() -> None:
    """After ``register()``, the factory builds the GTSAM estimator and its handle."""
    register()
    root_config = _build_config(strategy="gtsam_isam2")
    collaborator_names = ("imu_preintegrator", "se3_utils", "wgs_converter", "fdr_client")
    collaborators = {name: mock.MagicMock() for name in collaborator_names}

    built_estimator, built_handle = build_state_estimator(root_config, **collaborators)

    assert isinstance(built_estimator, GtsamIsam2StateEstimator)
    # The handle must pass both the interface and the concrete-type check.
    assert isinstance(built_handle, ISam2GraphHandle)
    assert isinstance(built_handle, ISam2GraphHandleImpl)
def test_create_returns_handle_bound_to_returned_estimator() -> None:
    """``create`` returns a handle whose ``_estimator`` is the estimator it returned."""
    root_config = _build_config(strategy="gtsam_isam2")
    collaborator_names = ("imu_preintegrator", "se3_utils", "wgs_converter", "fdr_client")
    collaborators = {name: mock.MagicMock() for name in collaborator_names}

    built_estimator, built_handle = create(config=root_config, **collaborators)

    assert isinstance(built_handle, ISam2GraphHandleImpl)
    # Identity, not equality: the handle wraps this exact estimator object.
    assert built_handle._estimator is built_estimator