Files
gps-denied-onboard/tests/unit/c5_state/test_az388_fallback_watcher.py
Oleksandr Bezdieniezhnykh 823c0f1b2e [AZ-398] Replay: FrameSource + Clock Protocols + Clock injection
Ship the two Layer-1 cross-cutting Protocols replay mode needs to leave
production C1-C5 components mode-agnostic (Invariant 1) and replay-
deterministic (Invariant 2). Live + replay binaries see the same
interfaces; only the strategy differs.

* Clock Protocol (monotonic_ns / time_ns / sleep_until_ns) +
  WallClock (live + REALTIME replay) + TlogDerivedClock (ASAP replay;
  advance-on-call; non-monotonic source → ClockOrderingError).
* FrameSource Protocol (next_frame -> NavCameraFrame | None / close)
  + LiveCameraFrameSource (cv2.VideoCapture device index) +
  VideoFileFrameSource (cv2.VideoCapture file).
* Build-flag gating: BUILD_VIDEO_FILE_FRAME_SOURCE,
  BUILD_LIVE_CAMERA_FRAME_SOURCE (constructor-time check; Tier-0 OFF
  refuses construction with FrameSourceConfigError).
* Composition-root factories: build_clock + build_frame_source.
* Injected Clock across every component that previously called
  time.monotonic_ns() / time.sleep() directly: c5_state (estimator,
  ESKF, fallback watcher, source-label SM, isam2 handle), c8_fc_adapter
  (inbound MAVLink + MSP2, AP outbound, iNav outbound, QGC GCS),
  c13_fdr writer, c12_operator_tooling httpx flights client. All
  constructors default to WallClock() so existing call sites keep
  live-binary behaviour without a wiring change.
* AC-4 CI guard (tests/_meta/test_no_direct_time_in_components.py)
  AST-scans components/**/*.py for direct time.monotonic_ns /
  time.time_ns / time.sleep references and fails loudly with file:line.
* Conformance + factory tests: tests/unit/clock + tests/unit/frame_source.
* Test fixture updates: FallbackWatcher / SourceLabelStateMachine
  clock_ns is now required (removed time.monotonic_ns default);
  test_az388 patches estimator._clock instead of a module-level time;
  test_az393 ardupilot adapter uses a _FixedClock test double.

Excluded per the task spec: TlogReplayFcAdapter (AZ-399), ReplaySink
(AZ-400), compose_replay (AZ-401), CLI (AZ-402), Docker/CI (AZ-403),
E2E fixture (AZ-404), IMU auto-sync (AZ-405).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 05:10:01 +03:00

392 lines
13 KiB
Python

"""AZ-388 — AC-5.2 fallback watcher + GtsamIsam2StateEstimator hookup.
Eight ACs from ``_docs/02_tasks/done/AZ-388_c5_ac52_fallback.md``:
- AC-1 Engagement after ``threshold_s`` of no successful estimate.
- AC-2 Engagement is one-shot (rate-limited across the episode).
- AC-3 Recovery signal fires once after a successful estimate.
- AC-4 ``check_fallback_state`` watchdog engages from an external
caller even without ``current_estimate`` being invoked.
- AC-5 Engagement callback carries :data:`Severity.CRITICAL`;
recovery callback carries :data:`Severity.NOTICE`.
- AC-6 Configurable threshold (``no_estimate_fallback_s = 5.0``
engages at 5 s, not 3 s).
- AC-7 iSAM2 estimator participates — entry hook engages,
success hook recovers.
- AC-8 FDR record shapes — engagement carries
``{reason, elapsed_s, threshold_s}``; recovery carries
``{recovered_after_s}``.
The ``EskfStateEstimator`` half of AC-7 will be exercised once
AZ-386 lands; the watcher is shared between both estimators so the
AZ-386 wire-up cost is one constructor line + two hook calls.
"""
from __future__ import annotations
from unittest import mock
import gtsam
import pytest
from gps_denied_onboard._types.fc import Severity
from gps_denied_onboard.components.c5_state._fallback_watcher import FallbackWatcher
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import (
GtsamIsam2StateEstimator,
create,
)
from gps_denied_onboard.runtime_root.state_factory import clear_state_registry
@pytest.fixture(autouse=True)
def _registry_isolation():
    """Wrap every test in this module with a state-registry reset.

    The registry is cleared once before the test body runs and once more
    after it finishes, so no test observes registrations left by another.
    """
    # Setup: start from an empty registry.
    clear_state_registry()

    yield

    # Teardown: drop whatever the test registered.
    clear_state_registry()
class _Clock:
"""Synthetic ``monotonic_ns()`` source for deterministic timelines."""
def __init__(self, t_ns: int = 0) -> None:
self.t_ns = t_ns
def __call__(self) -> int:
return self.t_ns
def _make_watcher(
    *, threshold_s: float = 3.0, fdr_client: mock.MagicMock | None = None
) -> tuple[FallbackWatcher, _Clock, mock.MagicMock]:
    """Assemble a ``FallbackWatcher`` driven by a synthetic clock.

    Args:
        threshold_s: Forwarded unchanged as the watcher's ``threshold_s``.
        fdr_client: FDR test double to inject. A fresh ``MagicMock`` is
            created when the caller does not supply one.

    Returns:
        ``(watcher, clock, fdr)`` where ``clock`` is the ``_Clock`` handed to
        the watcher (starting at 0 ns) and ``fdr`` is the FDR double in use.
    """
    if fdr_client is None:
        fdr_client = mock.MagicMock()

    synthetic_clock = _Clock(0)
    return (
        FallbackWatcher(
            threshold_s=threshold_s,
            fdr_client=fdr_client,
            producer_id="c5_state",
            clock_ns=synthetic_clock,
        ),
        synthetic_clock,
        fdr_client,
    )
# ---------------------------------------------------------------------
# AC-1: engagement after threshold elapses
def test_ac1_engagement_after_threshold_elapses() -> None:
    """AC-1: a check at 3.5 s with a 3 s threshold engages fallback once."""
    watcher, clock, _fdr = _make_watcher(threshold_s=3.0)
    notifications: list[tuple[float, Severity]] = []

    def on_engaged(elapsed: float, sev: Severity) -> None:
        notifications.append((elapsed, sev))

    watcher.subscribe_engaged(on_engaged)

    # Move the synthetic timeline to 3.5 s and poll the watcher.
    clock.t_ns = 3_500_000_000
    engaged = watcher.check_and_engage(clock.t_ns)

    assert engaged is True
    assert len(notifications) == 1
    reported_elapsed_s, reported_severity = notifications[0]
    assert reported_elapsed_s == pytest.approx(3.5, abs=1e-3)
    assert reported_severity == Severity.CRITICAL
def test_ac1_engagement_does_not_fire_before_threshold() -> None:
    """AC-1 (negative): a check at 2.99 s stays below the 3 s threshold."""
    watcher, clock, _ = _make_watcher(threshold_s=3.0)
    notifications: list[tuple[float, Severity]] = []

    def on_engaged(elapsed: float, sev: Severity) -> None:
        notifications.append((elapsed, sev))

    watcher.subscribe_engaged(on_engaged)

    # 2.99 s on the synthetic timeline: just short of the threshold.
    clock.t_ns = 2_990_000_000
    engaged = watcher.check_and_engage(clock.t_ns)

    assert engaged is False
    assert notifications == []
# ---------------------------------------------------------------------
# AC-2: engagement is one-shot (rate-limited)
def test_ac2_sustained_no_estimate_emits_one_engagement() -> None:
    """AC-2: repeated checks during one episode notify subscribers only once."""
    watcher, clock, _ = _make_watcher(threshold_s=3.0)
    elapsed_reports: list[float] = []

    def on_engaged(elapsed: float, _sev: Severity) -> None:
        elapsed_reports.append(elapsed)

    watcher.subscribe_engaged(on_engaged)

    # Poll four times, all past the threshold, with no successful estimate
    # marked in between.
    checkpoints_ns = [int(seconds * 1e9) for seconds in (3.5, 10.0, 20.0, 30.0)]
    for now_ns in checkpoints_ns:
        clock.t_ns = now_ns
        watcher.check_and_engage(clock.t_ns)

    assert len(elapsed_reports) == 1
# ---------------------------------------------------------------------
# AC-3: recovery signal after engagement
def test_ac3_recovery_after_engagement_fires_once() -> None:
    """AC-3: a successful estimate after engagement emits one recovery signal.

    Engagement happens at 3.5 s and the successful estimate at 7.5 s; the
    recovery callback is expected to report roughly 4.0 s.
    """
    watcher, clock, _ = _make_watcher(threshold_s=3.0)
    recoveries: list[tuple[float, Severity]] = []

    def on_recovered(elapsed: float, sev: Severity) -> None:
        recoveries.append((elapsed, sev))

    watcher.subscribe_recovered(on_recovered)

    # Engage first ...
    clock.t_ns = 3_500_000_000
    watcher.check_and_engage(clock.t_ns)

    # ... then report a successful estimate four seconds later.
    clock.t_ns = 7_500_000_000
    watcher.mark_successful_estimate(clock.t_ns)

    assert len(recoveries) == 1
    recovered_after_s, severity = recoveries[0]
    assert recovered_after_s == pytest.approx(4.0, abs=1e-3)
    assert severity == Severity.NOTICE
def test_ac3_recovery_does_not_fire_without_engagement() -> None:
    """AC-3 (negative): a successful estimate with no prior engagement is silent."""
    watcher, clock, _ = _make_watcher(threshold_s=3.0)
    recovery_reports: list[float] = []

    def on_recovered(elapsed: float, _sev: Severity) -> None:
        recovery_reports.append(elapsed)

    watcher.subscribe_recovered(on_recovered)

    # A successful estimate at 1 s, with no check_and_engage call beforehand.
    clock.t_ns = 1_000_000_000
    watcher.mark_successful_estimate(clock.t_ns)

    assert recovery_reports == []
# ---------------------------------------------------------------------
# AC-4: external watchdog engages without current_estimate calls
def test_ac4_watchdog_engages_without_mark_calls() -> None:
    """AC-4: polling ``check_and_engage`` alone is enough to enter fallback.

    No ``mark_successful_estimate`` call is made; both the return value and
    the ``in_fallback`` attribute must report the engaged state.
    """
    watcher, clock, _ = _make_watcher(threshold_s=3.0)

    clock.t_ns = 3_500_000_000  # 3.5 s, past the 3 s threshold
    poll_result = watcher.check_and_engage(clock.t_ns)

    assert poll_result is True
    assert watcher.in_fallback is True
# ---------------------------------------------------------------------
# AC-5: severity hints carried in callbacks
def test_ac5_engagement_severity_is_critical() -> None:
    """AC-5: the engagement callback receives ``Severity.CRITICAL``."""
    watcher, clock, _ = _make_watcher(threshold_s=3.0)
    severities: list[Severity] = []

    def record_severity(_e: float, sev: Severity) -> None:
        severities.append(sev)

    watcher.subscribe_engaged(record_severity)

    clock.t_ns = 3_500_000_000  # 3.5 s, past the 3 s threshold
    watcher.check_and_engage(clock.t_ns)

    assert severities == [Severity.CRITICAL]
def test_ac5_recovery_severity_is_notice() -> None:
    """AC-5: the recovery callback receives ``Severity.NOTICE``."""
    watcher, clock, _ = _make_watcher(threshold_s=3.0)
    severities: list[Severity] = []

    def record_severity(_e: float, sev: Severity) -> None:
        severities.append(sev)

    watcher.subscribe_recovered(record_severity)

    # Engage at 3.5 s, then recover at 7.0 s.
    clock.t_ns = 3_500_000_000
    watcher.check_and_engage(clock.t_ns)
    clock.t_ns = 7_000_000_000
    watcher.mark_successful_estimate(clock.t_ns)

    assert severities == [Severity.NOTICE]
# ---------------------------------------------------------------------
# AC-6: configurable threshold
def test_ac6_custom_threshold_5s_engages_at_5s() -> None:
    """AC-6: with ``threshold_s=5.0`` a 3.5 s gap is ignored and 5.5 s engages."""
    watcher, clock, _ = _make_watcher(threshold_s=5.0)
    elapsed_reports: list[float] = []

    def on_engaged(elapsed: float, _sev: Severity) -> None:
        elapsed_reports.append(elapsed)

    watcher.subscribe_engaged(on_engaged)

    # 3.5 s would trip a 3 s threshold but must not trip the 5 s one.
    clock.t_ns = 3_500_000_000
    watcher.check_and_engage(clock.t_ns)
    assert elapsed_reports == []

    # 5.5 s is past the configured threshold.
    clock.t_ns = 5_500_000_000
    watcher.check_and_engage(clock.t_ns)
    assert len(elapsed_reports) == 1
    assert elapsed_reports[0] == pytest.approx(5.5, abs=1e-3)
def test_ac6_zero_threshold_rejected() -> None:
    """AC-6: constructing a watcher with ``threshold_s=0.0`` raises ``ValueError``."""

    def frozen_clock() -> int:
        return 0

    rejection = pytest.raises(ValueError, match="threshold_s must be > 0")
    with rejection:
        FallbackWatcher(threshold_s=0.0, fdr_client=None, clock_ns=frozen_clock)
# ---------------------------------------------------------------------
# AC-8: FDR record payload shapes
def test_ac8_engagement_fdr_record_shape() -> None:
    """AC-8: engagement enqueues one FDR record with the expected fields.

    The record's payload must carry ``reason``, ``elapsed_s`` and
    ``threshold_s``.
    """
    fdr = mock.MagicMock()
    watcher, clock, _ = _make_watcher(threshold_s=3.0, fdr_client=fdr)

    clock.t_ns = 3_500_000_000  # 3.5 s, past the 3 s threshold
    watcher.check_and_engage(clock.t_ns)

    fdr.enqueue.assert_called_once()
    engaged_record = fdr.enqueue.call_args.args[0]
    assert engaged_record.kind == "c5.state.no_estimate_fallback_engaged"
    assert engaged_record.producer_id == "c5_state"

    payload = engaged_record.payload
    assert payload["reason"] == "no_successful_estimate_for_s"
    assert payload["elapsed_s"] == pytest.approx(3.5, abs=1e-3)
    assert payload["threshold_s"] == pytest.approx(3.0, abs=1e-3)
def test_ac8_recovery_fdr_record_shape() -> None:
    """AC-8: recovery enqueues a second FDR record with ``recovered_after_s`` only."""
    fdr = mock.MagicMock()
    watcher, clock, _ = _make_watcher(threshold_s=3.0, fdr_client=fdr)

    # Engage at 3.5 s, recover at 7.5 s.
    clock.t_ns = 3_500_000_000
    watcher.check_and_engage(clock.t_ns)
    clock.t_ns = 7_500_000_000
    watcher.mark_successful_estimate(clock.t_ns)

    # One record for the engagement plus one for the recovery.
    assert fdr.enqueue.call_count == 2

    # ``call_args`` refers to the most recent call, i.e. the recovery record.
    latest_record = fdr.enqueue.call_args.args[0]
    assert latest_record.kind == "c5.state.no_estimate_fallback_recovered"
    expected_payload = {"recovered_after_s": pytest.approx(4.0, abs=1e-3)}
    assert latest_record.payload == expected_payload
# ---------------------------------------------------------------------
# Subscription cancellation
def test_subscription_cancel_silences_callback() -> None:
    """A cancelled engagement subscription no longer receives notifications."""
    watcher, clock, _ = _make_watcher(threshold_s=3.0)
    elapsed_reports: list[float] = []

    def on_engaged(elapsed: float, _sev: Severity) -> None:
        elapsed_reports.append(elapsed)

    # Subscribe and immediately cancel through the returned handle.
    subscription = watcher.subscribe_engaged(on_engaged)
    subscription.cancel()

    clock.t_ns = 3_500_000_000  # 3.5 s, past the 3 s threshold
    watcher.check_and_engage(clock.t_ns)

    assert elapsed_reports == []
def test_callback_exception_does_not_break_watcher() -> None:
    """A raising subscriber must not prevent a later subscriber from being notified."""
    watcher, clock, _ = _make_watcher(threshold_s=3.0)
    healthy_reports: list[float] = []

    def boom(elapsed: float, _sev: Severity) -> None:
        raise RuntimeError("synthetic")

    def healthy(elapsed: float, _sev: Severity) -> None:
        healthy_reports.append(elapsed)

    # The failing subscriber is registered first, the healthy one second.
    watcher.subscribe_engaged(boom)
    watcher.subscribe_engaged(healthy)

    clock.t_ns = 3_500_000_000  # 3.5 s, past the 3 s threshold
    watcher.check_and_engage(clock.t_ns)

    assert len(healthy_reports) == 1
# ---------------------------------------------------------------------
# Optional FDR client: the watcher still engages and notifies when fdr_client is None
def test_watcher_without_fdr_client_does_not_crash() -> None:
    """With ``fdr_client=None`` the watcher still engages and notifies subscribers."""
    watcher = FallbackWatcher(threshold_s=3.0, fdr_client=None, clock_ns=_Clock(0))
    elapsed_reports: list[float] = []

    def on_engaged(elapsed: float, _sev: Severity) -> None:
        elapsed_reports.append(elapsed)

    watcher.subscribe_engaged(on_engaged)

    # Poll at 3.5 s; the timestamp is passed in directly.
    watcher.check_and_engage(3_500_000_000)

    expected = [pytest.approx(3.5, abs=1e-3)]
    assert elapsed_reports == expected
# =====================================================================
# AC-7 — iSAM2 estimator participates
def _build_estimator() -> GtsamIsam2StateEstimator:
    """Create a ``GtsamIsam2StateEstimator`` via ``create`` with mocked collaborators.

    The ``c5_state`` config block uses the ``gtsam_isam2`` strategy, a
    keyframe window of 15 and a 3.0 s no-estimate fallback threshold. Every
    injected dependency, including the FDR client, is a ``MagicMock``.

    Returns:
        The first element of the pair returned by ``create``.
    """
    c5_block = C5StateConfig(
        strategy="gtsam_isam2",
        keyframe_window_size=15,
        no_estimate_fallback_s=3.0,
    )

    # Only ``components`` is given a real value; the rest of the config is a mock.
    config = mock.MagicMock()
    config.components = {"c5_state": c5_block}

    collaborators = {
        "imu_preintegrator": mock.MagicMock(),
        "se3_utils": mock.MagicMock(),
        "wgs_converter": mock.MagicMock(),
        "fdr_client": mock.MagicMock(),
    }
    estimator, _unused = create(config=config, **collaborators)
    return estimator
def _seed_prior(estimator: GtsamIsam2StateEstimator) -> int:
    """Commit a single ``Pose3`` prior to the estimator's iSAM2 handle.

    Allocates the next ``x`` key from the estimator's key counter, adds a
    prior factor on a default-constructed pose at timestamp 0.0, pushes it
    through ``_isam2_handle.update`` and records the key as committed.

    Args:
        estimator: Estimator whose private state is seeded.

    Returns:
        The gtsam key of the newly inserted pose.
    """
    import gtsam_unstable

    # Reserve the next pose key and advance the estimator's counter.
    pose_key = gtsam.symbol("x", estimator._next_key_counter)
    estimator._next_key_counter += 1

    prior_pose = gtsam.Pose3()
    prior_noise = gtsam.noiseModel.Isotropic.Sigma(6, 0.1)

    factors = gtsam.NonlinearFactorGraph()
    factors.add(gtsam.PriorFactorPose3(pose_key, prior_pose, prior_noise))

    initial_values = gtsam.Values()
    initial_values.insert(pose_key, prior_pose)

    # The update call takes a per-key timestamp map; the prior sits at t = 0.
    key_timestamps = gtsam_unstable.FixedLagSmootherKeyTimestampMap()
    key_timestamps.insert((pose_key, 0.0))

    estimator._isam2_handle.update(factors, initial_values, timestamps=key_timestamps)
    estimator._record_committed_pose_key(pose_key)
    return pose_key
def test_ac7_isam2_check_fallback_state_engages_via_public_api() -> None:
    """AC-7: ``check_fallback_state`` on the iSAM2 estimator engages fallback."""
    estimator = _build_estimator()
    notifications: list[tuple[float, Severity]] = []

    def on_engaged(elapsed: float, sev: Severity) -> None:
        notifications.append((elapsed, sev))

    estimator.subscribe_fallback_engaged(on_engaged)

    # Pin the watcher's "last successful estimate" to t = 0 so that a check
    # at 4 s looks like a timeline with no successful estimate for 4 s.
    estimator._fallback._last_successful_estimate_ns = 0
    engaged = estimator.check_fallback_state(4_000_000_000)

    assert engaged is True
    assert len(notifications) == 1
def test_ac7_isam2_successful_current_estimate_clears_fallback() -> None:
    """AC-7: a successful ``current_estimate`` call recovers from fallback."""
    estimator = _build_estimator()
    recovery_reports: list[float] = []

    def on_recovered(elapsed: float, _sev: Severity) -> None:
        recovery_reports.append(elapsed)

    estimator.subscribe_fallback_recovered(on_recovered)
    _seed_prior(estimator)

    # Step 1: force engagement through a synthesised 4 s gap.
    estimator._fallback._last_successful_estimate_ns = 0
    estimator.check_fallback_state(4_000_000_000)
    assert estimator._fallback.in_fallback is True

    # Step 2: with a prior seeded, current_estimate should trigger recovery.
    estimator.current_estimate()
    assert estimator._fallback.in_fallback is False
    assert len(recovery_reports) == 1
def test_ac7_isam2_current_estimate_entry_engages_after_threshold() -> None:
    """AC-7: the entry hook of ``current_estimate`` engages fallback when stale.

    No prior is seeded, so ``current_estimate`` is expected to raise
    ``EstimatorFatalError``; the engagement callback must still have fired
    once by then.
    """
    from gps_denied_onboard.components.c5_state.errors import EstimatorFatalError

    estimator = _build_estimator()
    elapsed_reports: list[float] = []

    def on_engaged(elapsed: float, _sev: Severity) -> None:
        elapsed_reports.append(elapsed)

    estimator.subscribe_fallback_engaged(on_engaged)

    # Make the watcher look stale: last successful estimate pinned at t = 0.
    estimator._fallback._last_successful_estimate_ns = 0

    # AZ-398: components read time from an injected Clock rather than
    # time.monotonic_ns, so the synthesised "now" (4 s) is supplied by
    # replacing the estimator's clock with a mock.
    fake_clock = mock.MagicMock()
    fake_clock.monotonic_ns.return_value = 4_000_000_000
    estimator._clock = fake_clock

    with pytest.raises(EstimatorFatalError):
        estimator.current_estimate()

    assert len(elapsed_reports) == 1