"""AZ-382 — GtsamIsam2StateEstimator + ISam2GraphHandleImpl real-body tests. Ten ACs from ``_docs/02_tasks/done/AZ-382_c5_isam2_smoother_wiring.md`` (authored todo, archived here once the task transitions to Done): - AC-1 Construction succeeds; ``_isam2`` / ``_smoother`` / ``_graph`` / ``_values`` / ``_key_for_frame`` initialised. - AC-2 Key-management policy assigns unique GTSAM keys via ``gtsam.symbol('x', counter)``; repeat lookups return the same key. - AC-3 ``IncrementalFixedLagSmoother`` instantiated with ``K * frame_period_s``. - AC-4 ``keyframe_window_size = 5`` rejected at config-load (delegates to the AZ-381 ``C5StateConfig.__post_init__``). - AC-5 ``ISam2GraphHandleImpl.add_factor`` real body — appends to ``estimator._graph``; failure raises :class:`EstimatorDegradedError` AND logs the failure record. - AC-6 ``ISam2GraphHandleImpl.update`` real body — iSAM2 + smoother advance; failure raises :class:`EstimatorFatalError`. - AC-7 ``ISam2GraphHandleImpl.compute_marginals`` returns a real ``gtsam.Marginals`` instance. - AC-8 ``ISam2GraphHandleImpl.last_anchor_age_ms`` returns a very large number until an anchor lands. - AC-9 Defensive logging emitted on every mutation (success + failure). - AC-10 ``StateEstimator`` Protocol methods on the estimator still raise ``NotImplementedError`` naming AZ-383 / AZ-384. """ from __future__ import annotations import logging from unittest import mock from uuid import uuid4 import gtsam import gtsam_unstable import pytest from gps_denied_onboard.components.c5_state._isam2_handle import ( ISam2GraphHandle, ISam2GraphHandleImpl, ) from gps_denied_onboard.components.c5_state.config import C5StateConfig from gps_denied_onboard.components.c5_state.errors import ( EstimatorDegradedError, EstimatorFatalError, ) from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import ( _FRAME_PERIOD_S, GtsamIsam2StateEstimator, create, register, ) from gps_denied_onboard.components.c5_state.interface import StateEstimator from gps_denied_onboard.config.schema import ConfigError from gps_denied_onboard.runtime_root.state_factory import ( build_state_estimator, clear_state_registry, ) @pytest.fixture(autouse=True) def _registry_isolation(): # Arrange clear_state_registry() yield clear_state_registry() def _build_config(*, strategy: str = "gtsam_isam2", keyframe_window_size: int = 15): block = C5StateConfig(strategy=strategy, keyframe_window_size=keyframe_window_size) cfg = mock.MagicMock() cfg.components = {"c5_state": block} return cfg def _build_estimator(*, keyframe_window_size: int = 15) -> GtsamIsam2StateEstimator: cfg = _build_config(keyframe_window_size=keyframe_window_size) return GtsamIsam2StateEstimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) # --------------------------------------------------------------------- # AC-1: construction def test_ac1_construction_initialises_substrate() -> None: estimator = _build_estimator(keyframe_window_size=15) assert isinstance(estimator._isam2, gtsam.ISAM2) assert isinstance(estimator._smoother, gtsam_unstable.IncrementalFixedLagSmoother) assert isinstance(estimator._graph, gtsam.NonlinearFactorGraph) assert isinstance(estimator._values, gtsam.Values) assert estimator._key_for_frame == {} assert estimator._next_key_counter == 0 assert estimator._last_anchor_ns == 0 assert estimator._graph.size() == 0 assert isinstance(estimator, StateEstimator) def test_ac1_construction_emits_debug_log(caplog: pytest.LogCaptureFixture) -> None: # ``get_logger`` resets the named logger to NOTSET, which masks # ``caplog.at_level(DEBUG, logger=...)``. Bump the root level # instead so the DEBUG record propagates to caplog's handler. with caplog.at_level(logging.DEBUG): _build_estimator(keyframe_window_size=12) records = [r for r in caplog.records if r.kind == "c5.state.isam2_initialised"] assert len(records) == 1 assert records[0].kv["keyframe_window_size"] == 12 assert records[0].kv["total_factors_initial"] == 0 # --------------------------------------------------------------------- # AC-2: key-management policy def test_ac2_key_for_frame_assigns_unique_keys() -> None: estimator = _build_estimator() u1, u2, u3 = uuid4(), uuid4(), uuid4() k1 = estimator.key_for_frame(u1) k2 = estimator.key_for_frame(u2) k3 = estimator.key_for_frame(u3) assert k1 != k2 != k3 assert estimator._next_key_counter == 3 assert estimator._key_for_frame == {u1: k1, u2: k2, u3: k3} def test_ac2_key_for_frame_is_idempotent() -> None: estimator = _build_estimator() u1 = uuid4() first = estimator.key_for_frame(u1) second = estimator.key_for_frame(u1) assert first == second assert estimator._next_key_counter == 1 def test_ac2_keys_use_x_namespace() -> None: estimator = _build_estimator() u1 = uuid4() key = estimator.key_for_frame(u1) expected_first_x_key = gtsam.symbol("x", 0) assert key == expected_first_x_key # --------------------------------------------------------------------- # AC-3: window size respected def test_ac3_smoother_window_is_k_times_frame_period() -> None: # Arrange / Act estimator = _build_estimator(keyframe_window_size=15) expected_window_seconds = 15 * _FRAME_PERIOD_S # Hit the smoother through a controlled smoke path to be sure the # window is honored — direct introspection is C++-private. We use # the smoother's ``smootherLag()`` getter (FixedLagSmoother base). assert estimator._smoother.smootherLag() == pytest.approx(expected_window_seconds, rel=1e-6) # --------------------------------------------------------------------- # AC-4: window size validation def test_ac4_window_below_min_rejected_by_config() -> None: with pytest.raises(ConfigError, match=r"\[10, 20\]"): C5StateConfig(keyframe_window_size=5) def test_ac4_window_above_max_rejected_by_config() -> None: with pytest.raises(ConfigError, match=r"\[10, 20\]"): C5StateConfig(keyframe_window_size=21) # --------------------------------------------------------------------- # AC-5: ISam2GraphHandleImpl.add_factor real body def test_ac5_add_factor_appends_to_graph(caplog: pytest.LogCaptureFixture) -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) key = estimator.key_for_frame(uuid4()) factor = gtsam.PriorFactorPose3(key, gtsam.Pose3(), gtsam.noiseModel.Isotropic.Sigma(6, 0.1)) with caplog.at_level(logging.DEBUG, logger="c5_state.isam2_handle"): handle.add_factor(factor) assert estimator._graph.size() == 1 ok_records = [r for r in caplog.records if r.kind == "c5.state.add_factor_ok"] assert len(ok_records) == 1 assert ok_records[0].kv["factor_type"] == "PriorFactorPose3" assert ok_records[0].kv["graph_size"] == 1 def test_ac5_add_factor_failure_raises_degraded_and_logs( caplog: pytest.LogCaptureFixture, ) -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) # Replace the graph with a stub whose ``add`` raises so we can test # the failure path without depending on a specific GTSAM exception. failing_graph = mock.MagicMock() failing_graph.add.side_effect = RuntimeError("synthetic graph failure") estimator._graph = failing_graph with caplog.at_level(logging.ERROR, logger="c5_state.isam2_handle"): with pytest.raises(EstimatorDegradedError, match="synthetic graph failure"): handle.add_factor(mock.MagicMock()) err_records = [r for r in caplog.records if r.kind == "c5.state.add_factor_failed"] assert len(err_records) == 1 assert "synthetic graph failure" in err_records[0].kv["error"] # --------------------------------------------------------------------- # AC-6: ISam2GraphHandleImpl.update real body def _build_unit_update_payload() -> tuple[ gtsam.NonlinearFactorGraph, gtsam.Values, gtsam_unstable.FixedLagSmootherKeyTimestampMap ]: key = gtsam.symbol("x", 0) graph = gtsam.NonlinearFactorGraph() graph.add(gtsam.PriorFactorPose3(key, gtsam.Pose3(), gtsam.noiseModel.Isotropic.Sigma(6, 0.1))) values = gtsam.Values() values.insert(key, gtsam.Pose3()) timestamps = gtsam_unstable.FixedLagSmootherKeyTimestampMap() timestamps.insert((key, 0.0)) return graph, values, timestamps def test_ac6_update_advances_isam2_and_smoother(caplog: pytest.LogCaptureFixture) -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) graph, values, timestamps = _build_unit_update_payload() with caplog.at_level(logging.DEBUG, logger="c5_state.isam2_handle"): handle.update(graph, values, timestamps) # iSAM2 should now hold the prior factor; calculateEstimate() ⇒ non-empty Values. assert estimator._isam2.calculateEstimate().size() == 1 ok_records = [r for r in caplog.records if r.kind == "c5.state.update_ok"] assert len(ok_records) == 1 def test_ac6_update_with_none_timestamps_substitutes_empty_map() -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) graph, values, _ = _build_unit_update_payload() handle.update(graph, values, None) assert estimator._isam2.calculateEstimate().size() == 1 def test_ac6_isam2_failure_raises_fatal(caplog: pytest.LogCaptureFixture) -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) failing_isam2 = mock.MagicMock() failing_isam2.update.side_effect = RuntimeError("synthetic isam2 failure") estimator._isam2 = failing_isam2 with caplog.at_level(logging.ERROR, logger="c5_state.isam2_handle"): with pytest.raises(EstimatorFatalError, match="synthetic isam2 failure"): handle.update(mock.MagicMock(), mock.MagicMock()) err_records = [r for r in caplog.records if r.kind == "c5.state.isam2_update_failed"] assert len(err_records) == 1 def test_ac6_smoother_failure_raises_fatal(caplog: pytest.LogCaptureFixture) -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) failing_smoother = mock.MagicMock() failing_smoother.update.side_effect = RuntimeError("synthetic smoother failure") estimator._smoother = failing_smoother graph, values, timestamps = _build_unit_update_payload() with caplog.at_level(logging.ERROR, logger="c5_state.isam2_handle"): with pytest.raises(EstimatorFatalError, match="synthetic smoother failure"): handle.update(graph, values, timestamps) err_records = [r for r in caplog.records if r.kind == "c5.state.smoother_update_failed"] assert len(err_records) == 1 # --------------------------------------------------------------------- # AC-7: compute_marginals real body def test_ac7_compute_marginals_returns_real_instance() -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) graph, values, timestamps = _build_unit_update_payload() handle.update(graph, values, timestamps) marginals = handle.compute_marginals() assert isinstance(marginals, gtsam.Marginals) # --------------------------------------------------------------------- # AC-8: last_anchor_age_ms real body def test_ac8_last_anchor_age_ms_huge_when_no_anchor() -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) age_ms = handle.last_anchor_age_ms() # _last_anchor_ns=0 ⇒ age = monotonic_ns()/1e6 (typically 1e9 ms+) assert age_ms > 1_000_000 def test_ac8_last_anchor_age_ms_small_after_anchor_set() -> None: import time as _time estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) estimator._last_anchor_ns = _time.monotonic_ns() age_ms = handle.last_anchor_age_ms() assert 0 <= age_ms < 100 # --------------------------------------------------------------------- # AC-9: defensive logging on every mutation (success and failure) def test_ac9_add_factor_emits_success_log(caplog: pytest.LogCaptureFixture) -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) key = estimator.key_for_frame(uuid4()) factor = gtsam.PriorFactorPose3(key, gtsam.Pose3(), gtsam.noiseModel.Isotropic.Sigma(6, 0.1)) with caplog.at_level(logging.DEBUG, logger="c5_state.isam2_handle"): handle.add_factor(factor) success_records = [r for r in caplog.records if r.kind == "c5.state.add_factor_ok"] assert len(success_records) == 1 def test_ac9_update_emits_success_log(caplog: pytest.LogCaptureFixture) -> None: estimator = _build_estimator() handle = ISam2GraphHandleImpl(estimator) graph, values, timestamps = _build_unit_update_payload() with caplog.at_level(logging.DEBUG, logger="c5_state.isam2_handle"): handle.update(graph, values, timestamps) success_records = [r for r in caplog.records if r.kind == "c5.state.update_ok"] assert len(success_records) == 1 # --------------------------------------------------------------------- # AC-10 NOTE: All six ``StateEstimator`` Protocol methods originally # raised ``NotImplementedError`` in AZ-382's scope. They have all # landed: # # - ``add_vio`` / ``add_pose_anchor`` / ``add_fc_imu`` → tested in # ``tests/unit/c5_state/test_az383_factor_adds.py``. # - ``current_estimate`` / ``smoothed_history`` / ``health_snapshot`` # → tested in ``tests/unit/c5_state/test_az384_marginals_outputs.py``. # --------------------------------------------------------------------- # Factory integration: register() + build_state_estimator returns the tuple def test_register_makes_strategy_available_to_factory() -> None: register() cfg = _build_config(strategy="gtsam_isam2") estimator, handle = build_state_estimator( cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) assert isinstance(estimator, GtsamIsam2StateEstimator) assert isinstance(handle, ISam2GraphHandle) assert isinstance(handle, ISam2GraphHandleImpl) def test_create_returns_handle_bound_to_returned_estimator() -> None: cfg = _build_config(strategy="gtsam_isam2") estimator, handle = create( config=cfg, imu_preintegrator=mock.MagicMock(), se3_utils=mock.MagicMock(), wgs_converter=mock.MagicMock(), fdr_client=mock.MagicMock(), ) assert isinstance(handle, ISam2GraphHandleImpl) assert handle._estimator is estimator