mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 20:11:14 +00:00
[AZ-385] C5 SourceLabelStateMachine + spoof-promotion gate
Implements Invariants 5 + 8 + AC-NEW-2 / AC-NEW-8: the EstimatorOutput.source_label now reflects a real state machine (DEAD_RECKONED → SATELLITE_ANCHORED ↔ VISUAL_PROPAGATED) governed by a spoof-promotion gate that latches closed on FC SPOOFED GPS health and re-opens only when BOTH conditions hold — ≥10 s STABLE_NON_SPOOFED AND next anchor within spoof_promotion_visual_consistency_tol_m. Every reject emits a c5.state.spoof_rejected FDR record plus a subscriber-fan-out STATUSTEXT (severity WARNING, 50-char cap per MAVLink). FDR and subscriber paths bypass the standard logger so silencing logs cannot suppress the spoof trail (R07 / AC-6). GtsamIsam2StateEstimator now eagerly builds the SM from C5StateConfig in __init__; new public methods notify_gps_health() (delegates to SM, called by composition root from C8 inbound) and subscribe_spoof_rejection() (composition root attaches C8's QgcTelemetryAdapter here). health_snapshot.spoof_promotion_blocked + current_estimate.source_label now flow from the live SM. 25 new unit tests across all 12 ACs plus cancellation, subscriber exception isolation, and estimator wire-up integration cases. One AZ-384 test renamed + updated to expect DEAD_RECKONED before any anchor (was VISUAL_PROPAGATED placeholder pre-AZ-385). Full suite: 632 passed, 2 skipped. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -412,13 +412,16 @@ def test_ac9_state_machine_drives_source_label() -> None:
|
||||
assert out.source_label == PoseSourceLabel.SATELLITE_ANCHORED
|
||||
|
||||
|
||||
def test_ac9_default_source_label_is_visual_propagated() -> None:
|
||||
def test_ac9_default_source_label_is_dead_reckoned_before_any_anchor() -> None:
|
||||
# AZ-385 superseded the AZ-384 default: the auto-constructed
|
||||
# SourceLabelStateMachine returns DEAD_RECKONED until the first
|
||||
# satellite anchor is observed (AC-1 of AZ-385 + Invariant 5).
|
||||
estimator = _build_estimator()
|
||||
_seed_prior(estimator)
|
||||
|
||||
out = estimator.current_estimate()
|
||||
|
||||
assert out.source_label == PoseSourceLabel.VISUAL_PROPAGATED
|
||||
assert out.source_label == PoseSourceLabel.DEAD_RECKONED
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
|
||||
@@ -0,0 +1,520 @@
|
||||
"""AZ-385 — SourceLabelStateMachine + spoof-promotion gate.
|
||||
|
||||
Twelve ACs from ``_docs/02_tasks/todo/AZ-385_c5_source_label_spoof_gate.md``:
|
||||
|
||||
- AC-1 Initial label is ``DEAD_RECKONED``.
|
||||
- AC-2 First successful anchor → ``SATELLITE_ANCHORED``.
|
||||
- AC-3 Stale anchor → ``VISUAL_PROPAGATED`` (anchor age > 1 s).
|
||||
- AC-4 Spoof detection latches the gate + every reject emits FDR +
|
||||
STATUSTEXT subscriber callback.
|
||||
- AC-5 Recovery requires BOTH ≥10 s ``STABLE_NON_SPOOFED`` AND a
|
||||
next anchor within ``tol_m``.
|
||||
- AC-6 Logging cannot be silenced — mocking the logger to drop
|
||||
everything still fires FDR + subscriber paths.
|
||||
- AC-7 ``is_spoof_promotion_blocked()`` mirrors the gate latch.
|
||||
- AC-8 ``health_snapshot.spoof_promotion_blocked`` wired through.
|
||||
- AC-9 Configurable thresholds (``min_stable_s = 30`` shifts the
|
||||
dwell test).
|
||||
- AC-10 Every label change emits one ``c5.state.source_label_changed``
|
||||
INFO log.
|
||||
- AC-11 Reject FDR record has the documented payload shape.
|
||||
- AC-12 STATUSTEXT severity is ``WARNING`` + message fits 50 chars.
|
||||
|
||||
The 13th block covers the wire-up into ``GtsamIsam2StateEstimator``:
|
||||
the estimator constructs the SM eagerly, ``notify_gps_health`` is
|
||||
forwarded, ``subscribe_spoof_rejection`` returns a cancellable
|
||||
handle, and ``health_snapshot.spoof_promotion_blocked`` reflects the
|
||||
gate state.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from unittest import mock
|
||||
|
||||
import gtsam
|
||||
import pytest
|
||||
|
||||
from gps_denied_onboard._types.fc import GpsHealth, GpsStatus, Severity
|
||||
from gps_denied_onboard._types.state import PoseSourceLabel
|
||||
from gps_denied_onboard.components.c5_state._source_label_sm import (
|
||||
SourceLabelStateMachine,
|
||||
)
|
||||
from gps_denied_onboard.components.c5_state.config import C5StateConfig
|
||||
from gps_denied_onboard.components.c5_state.errors import StateEstimatorConfigError
|
||||
from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import (
|
||||
GtsamIsam2StateEstimator,
|
||||
create,
|
||||
)
|
||||
from gps_denied_onboard.runtime_root.state_factory import clear_state_registry
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _registry_isolation():
|
||||
# Arrange
|
||||
clear_state_registry()
|
||||
yield
|
||||
clear_state_registry()
|
||||
|
||||
|
||||
class _Clock:
|
||||
"""Synthetic ``monotonic_ns()`` source for deterministic timelines."""
|
||||
|
||||
def __init__(self, t_ns: int = 0) -> None:
|
||||
self.t_ns = t_ns
|
||||
|
||||
def __call__(self) -> int:
|
||||
return self.t_ns
|
||||
|
||||
|
||||
def _make_sm(
|
||||
*,
|
||||
min_stable_s: float = 10.0,
|
||||
tol_m: float = 30.0,
|
||||
clock: _Clock | None = None,
|
||||
fdr_client: mock.MagicMock | None = None,
|
||||
) -> tuple[SourceLabelStateMachine, _Clock, mock.MagicMock]:
|
||||
if clock is None:
|
||||
clock = _Clock(0)
|
||||
fdr = fdr_client if fdr_client is not None else mock.MagicMock()
|
||||
sm = SourceLabelStateMachine(
|
||||
spoof_promotion_min_stable_s=min_stable_s,
|
||||
spoof_promotion_visual_consistency_tol_m=tol_m,
|
||||
fdr_client=fdr,
|
||||
producer_id="c5_state",
|
||||
clock_ns=clock,
|
||||
)
|
||||
return sm, clock, fdr
|
||||
|
||||
|
||||
def _gps(
|
||||
status: GpsStatus,
|
||||
fix_age_ms: int = 100,
|
||||
captured_at: int = 0,
|
||||
) -> GpsHealth:
|
||||
return GpsHealth(status=status, fix_age_ms=fix_age_ms, captured_at=captured_at)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-1: initial label is DEAD_RECKONED
|
||||
|
||||
|
||||
def test_ac1_initial_label_is_dead_reckoned() -> None:
|
||||
sm, _clock, _fdr = _make_sm()
|
||||
|
||||
assert sm.current_label() == PoseSourceLabel.DEAD_RECKONED
|
||||
|
||||
|
||||
def test_ac1_initial_label_remains_dead_reckoned_after_gps_only() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.STABLE_NON_SPOOFED))
|
||||
|
||||
assert sm.current_label() == PoseSourceLabel.DEAD_RECKONED
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-2: first successful anchor → SATELLITE_ANCHORED
|
||||
|
||||
|
||||
def test_ac2_first_anchor_promotes_to_satellite_anchored() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
assert sm.current_label() == PoseSourceLabel.SATELLITE_ANCHORED
|
||||
|
||||
|
||||
def test_ac2_first_anchor_when_blocked_emits_reject() -> None:
|
||||
sm, clock, fdr = _make_sm()
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
# The block latched on the SPOOFED notify → the anchor that
|
||||
# comes in next is a promotion attempt against a closed gate →
|
||||
# one reject + label stays VISUAL_PROPAGATED.
|
||||
assert sm.current_label() == PoseSourceLabel.VISUAL_PROPAGATED
|
||||
fdr.enqueue.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-3: stale anchor → VISUAL_PROPAGATED
|
||||
|
||||
|
||||
def test_ac3_stale_anchor_falls_back_to_visual_propagated() -> None:
|
||||
# Default anchor staleness threshold is 1 s; here we anchor
|
||||
# at t=1s and check at t=2.5s — the anchor is 1.5s old → VP.
|
||||
sm, clock, _fdr = _make_sm()
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
assert sm.current_label() == PoseSourceLabel.SATELLITE_ANCHORED
|
||||
|
||||
clock.t_ns = int(2.5 * 1e9)
|
||||
assert sm.current_label() == PoseSourceLabel.VISUAL_PROPAGATED
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-4: spoof detection → block promotion + reject emits FDR + STATUSTEXT
|
||||
|
||||
|
||||
def test_ac4_spoofed_status_latches_gate_closed() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
|
||||
assert sm.is_spoof_promotion_blocked() is True
|
||||
|
||||
|
||||
def test_ac4_reject_fires_subscriber_callback() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
seen: list[tuple[str, Severity, str]] = []
|
||||
sm.subscribe_rejection(lambda reason, sev, text: seen.append((reason, sev, text)))
|
||||
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
assert len(seen) == 1
|
||||
reason, sev, text = seen[0]
|
||||
assert reason == "gps_spoofed"
|
||||
assert sev == Severity.WARNING
|
||||
assert text.startswith("GPS spoof rejected: ")
|
||||
|
||||
|
||||
def test_ac4_reject_fires_fdr_record() -> None:
|
||||
sm, clock, fdr = _make_sm()
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
fdr.enqueue.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-5: recovery requires BOTH ≥10s stable + visual consistency
|
||||
|
||||
|
||||
def test_ac5_only_stable_dwell_does_not_lift_block() -> None:
|
||||
sm, clock, _fdr = _make_sm(min_stable_s=10.0, tol_m=30.0)
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
|
||||
clock.t_ns = int(1.0 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.STABLE_NON_SPOOFED))
|
||||
# 11 s after stable started, but consistency_delta = None.
|
||||
clock.t_ns = int(12.0 * 1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
assert sm.is_spoof_promotion_blocked() is True
|
||||
|
||||
|
||||
def test_ac5_only_consistency_does_not_lift_block() -> None:
|
||||
sm, clock, _fdr = _make_sm(min_stable_s=10.0, tol_m=30.0)
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
|
||||
clock.t_ns = int(1.0 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.STABLE_NON_SPOOFED))
|
||||
# Only 3 s of stable — dwell insufficient — even though
|
||||
# consistency_delta = 5 m (well within tol).
|
||||
clock.t_ns = int(4.0 * 1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=5.0)
|
||||
|
||||
assert sm.is_spoof_promotion_blocked() is True
|
||||
|
||||
|
||||
def test_ac5_both_conditions_lift_block() -> None:
|
||||
sm, clock, _fdr = _make_sm(min_stable_s=10.0, tol_m=30.0)
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
|
||||
clock.t_ns = int(1.0 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.STABLE_NON_SPOOFED))
|
||||
clock.t_ns = int(12.0 * 1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=5.0)
|
||||
|
||||
assert sm.is_spoof_promotion_blocked() is False
|
||||
assert sm.current_label() == PoseSourceLabel.SATELLITE_ANCHORED
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-6: logging cannot be silenced
|
||||
|
||||
|
||||
def test_ac6_silenced_logger_does_not_suppress_fdr_or_subscriber() -> None:
|
||||
sm, clock, fdr = _make_sm()
|
||||
seen: list[str] = []
|
||||
sm.subscribe_rejection(lambda reason, _sev, _text: seen.append(reason))
|
||||
|
||||
# Mock the SM's logger so any call to it is a no-op — does NOT
|
||||
# affect the FDR enqueue or the subscriber dispatch path.
|
||||
with mock.patch.object(sm, "_log", mock.MagicMock()):
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
fdr.enqueue.assert_called_once()
|
||||
assert seen == ["gps_spoofed"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-7: is_spoof_promotion_blocked()
|
||||
|
||||
|
||||
def test_ac7_block_query_reflects_latch() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
assert sm.is_spoof_promotion_blocked() is False
|
||||
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
assert sm.is_spoof_promotion_blocked() is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-8: health_snapshot.spoof_promotion_blocked wired through
|
||||
|
||||
|
||||
def _build_estimator() -> GtsamIsam2StateEstimator:
|
||||
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15)
|
||||
cfg = mock.MagicMock()
|
||||
cfg.components = {"c5_state": block}
|
||||
estimator, _ = create(
|
||||
config=cfg,
|
||||
imu_preintegrator=mock.MagicMock(),
|
||||
se3_utils=mock.MagicMock(),
|
||||
wgs_converter=mock.MagicMock(),
|
||||
fdr_client=mock.MagicMock(),
|
||||
)
|
||||
return estimator
|
||||
|
||||
|
||||
def test_ac8_health_snapshot_reflects_spoof_block() -> None:
|
||||
estimator = _build_estimator()
|
||||
health_before = estimator.health_snapshot()
|
||||
assert health_before.spoof_promotion_blocked is False
|
||||
|
||||
estimator.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
health_after = estimator.health_snapshot()
|
||||
assert health_after.spoof_promotion_blocked is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-9: configurable thresholds
|
||||
|
||||
|
||||
def test_ac9_min_stable_30s_shifts_dwell_window() -> None:
|
||||
sm, clock, _fdr = _make_sm(min_stable_s=30.0, tol_m=30.0)
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1.0 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.STABLE_NON_SPOOFED))
|
||||
|
||||
# 15 s — would unblock at default 10 s, but min_stable_s = 30.
|
||||
clock.t_ns = int(16.0 * 1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=5.0)
|
||||
assert sm.is_spoof_promotion_blocked() is True
|
||||
|
||||
# 32 s — now dwell satisfied.
|
||||
clock.t_ns = int(33.0 * 1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=5.0)
|
||||
assert sm.is_spoof_promotion_blocked() is False
|
||||
|
||||
|
||||
def test_ac9_consistency_tol_5m_rejects_15m_delta() -> None:
|
||||
sm, clock, _fdr = _make_sm(min_stable_s=10.0, tol_m=5.0)
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1.0 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.STABLE_NON_SPOOFED))
|
||||
|
||||
# 12 s stable + 15 m delta — dwell OK, consistency NOT OK.
|
||||
clock.t_ns = int(13.0 * 1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=15.0)
|
||||
assert sm.is_spoof_promotion_blocked() is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-10: every label change emits ONE state-transition INFO log
|
||||
|
||||
|
||||
def test_ac10_label_change_emits_info_log() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
with mock.patch.object(sm, "_log") as log:
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
log.info.assert_called()
|
||||
kinds = {call.args[0] for call in log.info.call_args_list}
|
||||
assert "c5.state.source_label_changed" in kinds
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-11: reject FDR record shape
|
||||
|
||||
|
||||
def test_ac11_reject_fdr_record_shape() -> None:
|
||||
sm, clock, fdr = _make_sm()
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
fdr.enqueue.assert_called_once()
|
||||
record = fdr.enqueue.call_args.args[0]
|
||||
assert record.kind == "c5.state.spoof_rejected"
|
||||
assert record.producer_id == "c5_state"
|
||||
assert set(record.payload.keys()) == {
|
||||
"reason",
|
||||
"gps_health",
|
||||
"time_since_stable_s",
|
||||
"visual_consistency_delta_m",
|
||||
}
|
||||
assert record.payload["gps_health"] == "spoofed"
|
||||
assert record.payload["time_since_stable_s"] == pytest.approx(0.0)
|
||||
# The ts field is a non-empty ISO-8601 string at the decode boundary.
|
||||
datetime.fromisoformat(record.ts.replace("Z", "+00:00")) if record.ts.endswith(
|
||||
"Z"
|
||||
) else datetime.fromisoformat(record.ts).astimezone(timezone.utc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-12: STATUSTEXT severity + 50-char cap
|
||||
|
||||
|
||||
def test_ac12_statustext_severity_is_warning() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
seen: list[Severity] = []
|
||||
sm.subscribe_rejection(lambda _reason, sev, _text: seen.append(sev))
|
||||
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
assert seen == [Severity.WARNING]
|
||||
|
||||
|
||||
def test_ac12_statustext_max_50_chars() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
seen: list[str] = []
|
||||
sm.subscribe_rejection(lambda _reason, _sev, text: seen.append(text))
|
||||
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
assert len(seen) == 1
|
||||
assert len(seen[0]) <= 50
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# Subscription cancellation
|
||||
|
||||
|
||||
def test_subscription_cancel_stops_callbacks() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
seen: list[str] = []
|
||||
sub = sm.subscribe_rejection(lambda reason, _sev, _text: seen.append(reason))
|
||||
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
assert seen == ["gps_spoofed"]
|
||||
|
||||
sub.cancel()
|
||||
clock.t_ns = int(2e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
assert seen == ["gps_spoofed"] # unchanged
|
||||
|
||||
|
||||
def test_subscriber_exception_does_not_break_state_machine() -> None:
|
||||
sm, clock, _fdr = _make_sm()
|
||||
|
||||
def _broken(_reason: str, _sev: Severity, _text: str) -> None:
|
||||
raise RuntimeError("subscriber crash")
|
||||
|
||||
sm.subscribe_rejection(_broken)
|
||||
clock.t_ns = int(0.5 * 1e9)
|
||||
sm.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
clock.t_ns = int(1e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
|
||||
# A subsequent attempt — should still emit (state machine intact).
|
||||
seen: list[str] = []
|
||||
sm.subscribe_rejection(lambda reason, _sev, _text: seen.append(reason))
|
||||
clock.t_ns = int(2e9)
|
||||
sm.notify_satellite_anchor(now_ns=clock.t_ns, gps_consistency_delta_m=None)
|
||||
assert seen == ["gps_spoofed"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# Estimator wire-up — current_estimate carries the SM's label
|
||||
|
||||
|
||||
def _seed_prior(estimator: GtsamIsam2StateEstimator) -> int:
|
||||
import gtsam_unstable
|
||||
|
||||
pose = gtsam.Pose3()
|
||||
key = gtsam.symbol("x", estimator._next_key_counter)
|
||||
estimator._next_key_counter += 1
|
||||
noise = gtsam.noiseModel.Isotropic.Sigma(6, 0.1)
|
||||
graph = gtsam.NonlinearFactorGraph()
|
||||
graph.add(gtsam.PriorFactorPose3(key, pose, noise))
|
||||
values = gtsam.Values()
|
||||
values.insert(key, pose)
|
||||
ts_map = gtsam_unstable.FixedLagSmootherKeyTimestampMap()
|
||||
ts_map.insert((key, 0.0))
|
||||
estimator._isam2_handle.update(graph, values, timestamps=ts_map)
|
||||
estimator._record_committed_pose_key(key)
|
||||
return key
|
||||
|
||||
|
||||
def test_estimator_current_estimate_label_reflects_sm() -> None:
|
||||
estimator = _build_estimator()
|
||||
_seed_prior(estimator)
|
||||
|
||||
out_before = estimator.current_estimate()
|
||||
assert out_before.source_label == PoseSourceLabel.DEAD_RECKONED
|
||||
|
||||
# Drive a satellite anchor — the SM should transition.
|
||||
estimator._source_label_machine.notify_satellite_anchor(
|
||||
now_ns=estimator._source_label_machine._clock_ns(),
|
||||
gps_consistency_delta_m=None,
|
||||
)
|
||||
|
||||
out_after = estimator.current_estimate()
|
||||
assert out_after.source_label == PoseSourceLabel.SATELLITE_ANCHORED
|
||||
|
||||
|
||||
def test_estimator_subscribe_spoof_rejection_returns_handle() -> None:
|
||||
estimator = _build_estimator()
|
||||
seen: list[str] = []
|
||||
sub = estimator.subscribe_spoof_rejection(lambda reason, _sev, _text: seen.append(reason))
|
||||
|
||||
estimator.notify_gps_health(_gps(GpsStatus.SPOOFED))
|
||||
estimator._source_label_machine.notify_satellite_anchor(
|
||||
now_ns=estimator._source_label_machine._clock_ns(),
|
||||
gps_consistency_delta_m=None,
|
||||
)
|
||||
|
||||
assert seen == ["gps_spoofed"]
|
||||
sub.cancel()
|
||||
|
||||
|
||||
def test_estimator_subscribe_after_attach_with_stub_raises() -> None:
|
||||
estimator = _build_estimator()
|
||||
stub = mock.MagicMock()
|
||||
stub.current_label = mock.MagicMock(return_value=PoseSourceLabel.SATELLITE_ANCHORED)
|
||||
stub.is_spoof_promotion_blocked = mock.MagicMock(return_value=False)
|
||||
|
||||
estimator.attach_source_label_state_machine(stub)
|
||||
|
||||
with pytest.raises(StateEstimatorConfigError):
|
||||
estimator.subscribe_spoof_rejection(lambda *_args: None)
|
||||
Reference in New Issue
Block a user