mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 09:41:13 +00:00
f49d803252
Add `runner/helpers/replay_mode.py` (NullFrameSink, NullFcInboundEmitter, default_frame_period_ms, load_replay_json, resolve_replay_subdir, imu_replay_noop) and rewire all 13 scenarios off their local `_resolve_*` / `_drive_*` / `_push_*` NotImplementedError stubs. Closes the offline FDR-replay execution path. `grep raise NotImplementedError` under `e2e/tests/` now returns zero matches. +17 unit tests (626 total, up from 608). Unit-test behaviour unchanged (scenarios still skip via b75 sitl_replay_ready gate when E2E_SITL_REPLAY_DIR is unset). Co-authored-by: Cursor <cursoragent@cursor.com>
236 lines
8.8 KiB
Python
236 lines
8.8 KiB
Python
"""FT-N-04 — Visual blackout + spoofed GPS combined failsafe (AZ-426 / AC-3.5, AC-NEW-8).
|
|
|
|
Three sub-cases (5 s / 15 s / 35 s) at the ladder of windows
|
|
prescribed by AC-3.5 + AC-NEW-8, replayed via the AZ-408
|
|
``blackout_spoof`` injector + the FC-inbound spoof proxy, and
|
|
validated by ``runner.helpers.blackout_spoof_evaluator``.
|
|
|
|
Gated on the same upstream replay helpers as the other negative
|
|
scenarios (``frame_source_replay``, ``fdr_reader``,
|
|
``mavproxy_tlog_reader``, ``sitl_observer``, ``fc_proxy`` runtime
|
|
binding). When those helpers are still stubbed the scenario test
|
|
skips while
|
|
``e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py`` covers
|
|
the AC-1..AC-8 evaluator logic.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from fixtures.injectors.blackout_spoof import BlackoutSpoofReport
|
|
from runner.helpers import blackout_spoof_evaluator as bse
|
|
|
|
_WINDOW_LADDER_S = (5.0, 15.0, 35.0)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"blackout_spoof_derkachi",
|
|
[{"window_seconds": s, "seed": 0} for s in _WINDOW_LADDER_S],
|
|
indirect=True,
|
|
ids=[f"{int(s)}s" for s in _WINDOW_LADDER_S],
|
|
)
|
|
@pytest.mark.traces_to(
|
|
"AC-3.5,AC-NEW-8,AC-1,AC-2,AC-3,AC-4,AC-5,AC-6,AC-7,AC-8,AC-9"
|
|
)
|
|
def test_ft_n_04_blackout_spoof(
|
|
fc_adapter: str,
|
|
vio_strategy: str,
|
|
blackout_spoof_derkachi: BlackoutSpoofReport,
|
|
evidence_dir, # type: ignore[no-untyped-def]
|
|
run_id: str,
|
|
nfr_recorder, # type: ignore[no-untyped-def]
|
|
sitl_replay_ready: bool,
|
|
) -> None:
|
|
if not sitl_replay_ready:
|
|
pytest.skip(
|
|
"FT-N-04 full replay requires `E2E_SITL_REPLAY_DIR` to point at a "
|
|
"prepared SITL replay fixture (AZ-595) AND a runtime fc_proxy "
|
|
"driver. AC-1..AC-8 evaluator logic covered by "
|
|
"e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py."
|
|
)
|
|
|
|
from runner.helpers import fdr_reader, mavproxy_tlog_reader, sitl_observer
|
|
from runner.helpers.frame_source_replay import FrameSourceReplayer
|
|
|
|
schedule = blackout_spoof_derkachi.schedule
|
|
window = bse.BlackoutWindow(
|
|
onset_monotonic_ms=schedule.window_start_ms,
|
|
end_monotonic_ms=schedule.window_end_ms,
|
|
)
|
|
is_35s = abs(window.duration_s - 35.0) < 0.5
|
|
|
|
# 1. Drive replay (frames + paired fc-proxy spoof injection).
|
|
FrameSourceReplayer(_resolve_frame_sink()).replay_video(
|
|
blackout_spoof_derkachi.out_root / "frames"
|
|
)
|
|
_drive_fc_proxy(blackout_spoof_derkachi.out_root / "schedule.json")
|
|
|
|
# 2. Collect FDR estimates + spoof-rejected events.
|
|
fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr"
|
|
estimates: list[bse.OutboundEstimateSample] = []
|
|
spoof_events: list[bse.SpoofRejectedEvent] = []
|
|
for rec in fdr_reader.iter_records(fdr_root):
|
|
if rec.record_type == "outbound_estimate":
|
|
p = rec.payload
|
|
estimates.append(
|
|
bse.OutboundEstimateSample(
|
|
monotonic_ms=int(rec.monotonic_ms),
|
|
source_label=str(p["source_label"]), # type: ignore[arg-type]
|
|
cov_semi_major_m=float(p["cov_semi_major_m"]), # type: ignore[arg-type]
|
|
horiz_accuracy=float(p.get("horiz_accuracy", p["cov_semi_major_m"])), # type: ignore[arg-type]
|
|
fix_type=int(p.get("fix_type", -1)), # type: ignore[arg-type]
|
|
)
|
|
)
|
|
elif rec.record_type == "spoof_rejected":
|
|
spoof_events.append(
|
|
bse.SpoofRejectedEvent(
|
|
monotonic_ms=int(rec.monotonic_ms),
|
|
reason=str(rec.payload.get("reason", "")), # type: ignore[arg-type]
|
|
)
|
|
)
|
|
|
|
# 3. Collect STATUSTEXTs from mavproxy tlog.
|
|
tlog_path = Path(evidence_dir).parent / f"run-{run_id}" / "mavproxy.tlog"
|
|
statustexts = [
|
|
bse.StatustextSample(
|
|
monotonic_ms=int(m.timestamp_us // 1000),
|
|
text=str(m.fields.get("text", "")),
|
|
)
|
|
for m in mavproxy_tlog_reader.iter_messages(tlog_path)
|
|
if m.msg_type == "STATUSTEXT"
|
|
]
|
|
|
|
# 4. Collect FC-side GPS health + consistency-check events (recovery gate).
|
|
gps_health = [
|
|
bse.GpsHealthSample(
|
|
monotonic_ms=int(s.monotonic_ms),
|
|
healthy=bool(s.healthy),
|
|
spoofed=bool(s.spoofed),
|
|
)
|
|
for s in sitl_observer.read_gps_health_samples() # type: ignore[attr-defined]
|
|
]
|
|
consistency = [
|
|
bse.ConsistencyCheckEvent(
|
|
monotonic_ms=int(c.monotonic_ms), passed=bool(c.passed)
|
|
)
|
|
for c in sitl_observer.read_consistency_check_events() # type: ignore[attr-defined]
|
|
]
|
|
|
|
# 5. Evaluate.
|
|
report = bse.evaluate(
|
|
window,
|
|
estimates=estimates,
|
|
statustexts=statustexts,
|
|
spoof_events=spoof_events,
|
|
gps_health=gps_health,
|
|
consistency_checks=consistency,
|
|
frame_period_ms=_resolve_frame_period_ms(),
|
|
is_35s_window=is_35s,
|
|
)
|
|
out_csv = (
|
|
evidence_dir
|
|
/ f"ft-n-04-{int(window.duration_s)}s-{fc_adapter}-{vio_strategy}.csv"
|
|
)
|
|
bse.write_csv_evidence(out_csv, report)
|
|
|
|
# 6. NFR metrics + AC assertions.
|
|
nfr_recorder.record_metric(
|
|
f"ft_n_04.{int(window.duration_s)}s.switch_latency_ms",
|
|
float(report.switch_latency.first_dead_reckoned_offset_ms or 0),
|
|
ac_id="AC-1",
|
|
)
|
|
nfr_recorder.record_metric(
|
|
f"ft_n_04.{int(window.duration_s)}s.spoof_rejected_count",
|
|
float(report.spoof_rejection.spoof_rejected_count),
|
|
ac_id="AC-2",
|
|
)
|
|
nfr_recorder.record_metric(
|
|
f"ft_n_04.{int(window.duration_s)}s.honest_accuracy_violation_count",
|
|
float(report.honest_accuracy.violation_count),
|
|
ac_id="AC-4",
|
|
)
|
|
if report.statustext_rate.observed_hz is not None:
|
|
nfr_recorder.record_metric(
|
|
f"ft_n_04.{int(window.duration_s)}s.statustext_imu_only_hz",
|
|
report.statustext_rate.observed_hz,
|
|
ac_id="AC-5",
|
|
)
|
|
if is_35s:
|
|
nfr_recorder.record_metric(
|
|
"ft_n_04.35s.cov2d_at_ms",
|
|
float(report.escalation.cov2d_crossed_at_ms or 0),
|
|
ac_id="AC-6",
|
|
)
|
|
nfr_recorder.record_metric(
|
|
"ft_n_04.35s.failsafe_trigger_at_ms",
|
|
float(report.escalation.cov500_or_30s_crossed_at_ms or 0),
|
|
ac_id="AC-7",
|
|
)
|
|
|
|
assert report.switch_latency.passes, (
|
|
f"AC-1: dead_reckoned label not within ≤{bse.SWITCH_LATENCY_MS} ms / "
|
|
f"1 frame of blackout onset; "
|
|
f"offset={report.switch_latency.first_dead_reckoned_offset_ms} ms, "
|
|
f"frame_period={report.switch_latency.frame_period_ms} ms"
|
|
)
|
|
assert report.spoof_rejection.passes, (
|
|
f"AC-2: spoof rejection failed; "
|
|
f"rejected_count={report.spoof_rejection.spoof_rejected_count}, "
|
|
f"re_anchored_count={report.spoof_rejection.satellite_anchored_inside_window}"
|
|
)
|
|
assert report.covariance_monotonic.passes, (
|
|
f"AC-3: cov_semi_major_m decreased at "
|
|
f"{report.covariance_monotonic.first_decreasing_at_ms} ms"
|
|
)
|
|
assert report.honest_accuracy.passes, (
|
|
f"AC-4: horiz_accuracy under-reporting "
|
|
f"({report.honest_accuracy.violation_count} violations of "
|
|
f"{report.honest_accuracy.sample_count} samples)"
|
|
)
|
|
assert report.statustext_rate.passes, (
|
|
f"AC-5: VISUAL_BLACKOUT_IMU_ONLY rate "
|
|
f"{report.statustext_rate.observed_hz} Hz outside "
|
|
f"[{bse.STATUSTEXT_RATE_MIN_HZ}, {bse.STATUSTEXT_RATE_MAX_HZ}] Hz"
|
|
)
|
|
if is_35s:
|
|
assert report.escalation.passes_ac6, (
|
|
f"AC-6: fix_type not degraded after cov crossed "
|
|
f"{bse.ESCALATION_COV_2D_M} m at "
|
|
f"{report.escalation.cov2d_crossed_at_ms} ms"
|
|
)
|
|
assert report.escalation.passes_ac7, (
|
|
f"AC-7: failsafe escalation incomplete; "
|
|
f"horiz_999={report.escalation.horiz_accuracy_999}, "
|
|
f"failsafe_statustext_offset_ms="
|
|
f"{report.escalation.failsafe_statustext_offset_ms}"
|
|
)
|
|
assert report.recovery_gate.passes, (
|
|
f"AC-8: recovery gate failed; "
|
|
f"recovery_at_ms={report.recovery_gate.recovery_at_ms}, "
|
|
f"stable_period_s={report.recovery_gate.stable_period_s}, "
|
|
f"consistency_check_passed={report.recovery_gate.consistency_check_passed}"
|
|
)
|
|
|
|
|
|
def _resolve_frame_sink(): # type: ignore[no-untyped-def]
|
|
"""Return a replay-mode `FrameSink` (counter-only; AZ-597)."""
|
|
from runner.helpers.replay_mode import NullFrameSink
|
|
|
|
return NullFrameSink()
|
|
|
|
|
|
def _drive_fc_proxy(schedule_path: Path) -> None:
|
|
from runner.helpers.fc_proxy_runtime import drive_fc_proxy
|
|
|
|
drive_fc_proxy(schedule_path)
|
|
|
|
|
|
def _resolve_frame_period_ms() -> int:
|
|
"""Return the default 30 fps per-frame period (AZ-597)."""
|
|
from runner.helpers.replay_mode import default_frame_period_ms
|
|
|
|
return default_frame_period_ms()
|