Files
gps-denied-onboard/e2e/tests/negative/test_ft_n_04_blackout_spoof.py
Oleksandr Bezdieniezhnykh f49d803252 [AZ-597] Batch 77: replay_mode helpers + 13 scenario stub rewires
Add `runner/helpers/replay_mode.py` (NullFrameSink, NullFcInboundEmitter,
default_frame_period_ms, load_replay_json, resolve_replay_subdir,
imu_replay_noop) and rewire all 13 scenarios off their local
`_resolve_*` / `_drive_*` / `_push_*` NotImplementedError stubs.

Closes the offline FDR-replay execution path. `grep raise
NotImplementedError` under `e2e/tests/` now returns zero matches. +17
unit tests (626 total, up from 608). Unit-test behaviour unchanged
(scenarios still skip via b75 sitl_replay_ready gate when
E2E_SITL_REPLAY_DIR is unset).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 09:52:05 +03:00

236 lines
8.8 KiB
Python

"""FT-N-04 — Visual blackout + spoofed GPS combined failsafe (AZ-426 / AC-3.5, AC-NEW-8).
Three sub-cases (5 s / 15 s / 35 s) at the ladder of windows
prescribed by AC-3.5 + AC-NEW-8, replayed via the AZ-408
``blackout_spoof`` injector + the FC-inbound spoof proxy, and
validated by ``runner.helpers.blackout_spoof_evaluator``.
Gated on the same upstream replay helpers as the other negative
scenarios (``frame_source_replay``, ``fdr_reader``,
``mavproxy_tlog_reader``, ``sitl_observer``, ``fc_proxy`` runtime
binding). The test itself skips whenever the ``sitl_replay_ready``
fixture is false; in that case
``e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py`` still covers
the AC-1..AC-8 evaluator logic.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from fixtures.injectors.blackout_spoof import BlackoutSpoofReport
from runner.helpers import blackout_spoof_evaluator as bse
# Blackout window durations, in seconds, exercised by FT-N-04: one
# parametrized sub-case per entry (5 s / 15 s / 35 s).
_WINDOW_LADDER_S = (
    5.0,
    15.0,
    35.0,
)
@pytest.mark.parametrize(
    "blackout_spoof_derkachi",
    [{"window_seconds": s, "seed": 0} for s in _WINDOW_LADDER_S],
    indirect=True,
    ids=[f"{int(s)}s" for s in _WINDOW_LADDER_S],
)
@pytest.mark.traces_to(
    "AC-3.5,AC-NEW-8,AC-1,AC-2,AC-3,AC-4,AC-5,AC-6,AC-7,AC-8,AC-9"
)
def test_ft_n_04_blackout_spoof(
    fc_adapter: str,
    vio_strategy: str,
    blackout_spoof_derkachi: BlackoutSpoofReport,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Replay one blackout + spoof window and check it against the evaluator.

    Runs once per entry of ``_WINDOW_LADDER_S`` (indirect parametrization of
    the ``blackout_spoof_derkachi`` fixture). The steps, in order:

    1. Skip unless ``sitl_replay_ready`` is true.
    2. Replay the injected frames and drive the fc-proxy from the schedule.
    3. Collect outbound estimates and spoof-rejected events from the FDR,
       STATUSTEXT messages from the mavproxy tlog, and GPS-health /
       consistency-check samples from ``sitl_observer``.
    4. Feed everything to ``bse.evaluate`` and write the CSV evidence.
    5. Record the NFR metrics, then assert AC-1..AC-8 (AC-6 / AC-7 only for
       the 35 s window).

    Args:
        fc_adapter: Adapter name; only used in the evidence CSV file name.
        vio_strategy: Strategy name; only used in the evidence CSV file name.
        blackout_spoof_derkachi: Injector report providing ``schedule`` and
            ``out_root``.
        evidence_dir: Directory receiving the CSV; its parent is expected to
            hold ``run-<run_id>/``.
        run_id: Identifier used to locate ``run-<run_id>/fdr`` and
            ``run-<run_id>/mavproxy.tlog``.
        nfr_recorder: Object exposing ``record_metric(name, value, ac_id=...)``.
        sitl_replay_ready: Gate fixture; the test skips when it is false.
    """
    if not sitl_replay_ready:
        pytest.skip(
            "FT-N-04 full replay requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595) AND a runtime fc_proxy "
            "driver. AC-1..AC-8 evaluator logic covered by "
            "e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py."
        )

    # Imported after the skip gate, so a skipped run never imports these helpers.
    from runner.helpers import fdr_reader, mavproxy_tlog_reader, sitl_observer
    from runner.helpers.frame_source_replay import FrameSourceReplayer

    schedule = blackout_spoof_derkachi.schedule
    window = bse.BlackoutWindow(
        onset_monotonic_ms=schedule.window_start_ms,
        end_monotonic_ms=schedule.window_end_ms,
    )
    is_35s = abs(window.duration_s - 35.0) < 0.5
    # round() rather than int(): is_35s tolerates +/-0.5 s, so e.g. a 34.9 s
    # window must be labelled "35s" to agree with the hard-coded
    # ``ft_n_04.35s.*`` metric names instead of being truncated to "34s".
    label = f"{round(window.duration_s)}s"

    # 1. Drive replay (frames + paired fc-proxy spoof injection).
    out_root = blackout_spoof_derkachi.out_root
    FrameSourceReplayer(_resolve_frame_sink()).replay_video(out_root / "frames")
    _drive_fc_proxy(out_root / "schedule.json")

    # Normalise once so every path below is built from a real ``Path``.
    evidence_root = Path(evidence_dir)
    run_dir = evidence_root.parent / f"run-{run_id}"

    # 2. Collect FDR estimates + spoof-rejected events.
    estimates: list[bse.OutboundEstimateSample] = []
    spoof_events: list[bse.SpoofRejectedEvent] = []
    for rec in fdr_reader.iter_records(run_dir / "fdr"):
        if rec.record_type == "outbound_estimate":
            p = rec.payload
            estimates.append(
                bse.OutboundEstimateSample(
                    monotonic_ms=int(rec.monotonic_ms),
                    source_label=str(p["source_label"]),  # type: ignore[arg-type]
                    cov_semi_major_m=float(p["cov_semi_major_m"]),  # type: ignore[arg-type]
                    # Falls back to the covariance semi-major axis when the
                    # record carries no explicit horiz_accuracy.
                    horiz_accuracy=float(p.get("horiz_accuracy", p["cov_semi_major_m"])),  # type: ignore[arg-type]
                    fix_type=int(p.get("fix_type", -1)),  # type: ignore[arg-type]
                )
            )
        elif rec.record_type == "spoof_rejected":
            spoof_events.append(
                bse.SpoofRejectedEvent(
                    monotonic_ms=int(rec.monotonic_ms),
                    reason=str(rec.payload.get("reason", "")),  # type: ignore[arg-type]
                )
            )

    # 3. Collect STATUSTEXTs from mavproxy tlog.
    statustexts = [
        bse.StatustextSample(
            monotonic_ms=int(m.timestamp_us // 1000),
            text=str(m.fields.get("text", "")),
        )
        for m in mavproxy_tlog_reader.iter_messages(run_dir / "mavproxy.tlog")
        if m.msg_type == "STATUSTEXT"
    ]

    # 4. Collect FC-side GPS health + consistency-check events (recovery gate).
    gps_health = [
        bse.GpsHealthSample(
            monotonic_ms=int(s.monotonic_ms),
            healthy=bool(s.healthy),
            spoofed=bool(s.spoofed),
        )
        for s in sitl_observer.read_gps_health_samples()  # type: ignore[attr-defined]
    ]
    consistency = [
        bse.ConsistencyCheckEvent(
            monotonic_ms=int(c.monotonic_ms), passed=bool(c.passed)
        )
        for c in sitl_observer.read_consistency_check_events()  # type: ignore[attr-defined]
    ]

    # 5. Evaluate.
    report = bse.evaluate(
        window,
        estimates=estimates,
        statustexts=statustexts,
        spoof_events=spoof_events,
        gps_health=gps_health,
        consistency_checks=consistency,
        frame_period_ms=_resolve_frame_period_ms(),
        is_35s_window=is_35s,
    )
    out_csv = evidence_root / f"ft-n-04-{label}-{fc_adapter}-{vio_strategy}.csv"
    bse.write_csv_evidence(out_csv, report)

    # 6. NFR metrics + AC assertions. Metrics go first so they are recorded
    # even when one of the assertions below fails.
    _record_nfr_metrics(nfr_recorder, report, label, is_35s)
    _assert_acceptance_criteria(report, is_35s)


def _record_nfr_metrics(nfr_recorder, report, label: str, is_35s: bool) -> None:  # type: ignore[no-untyped-def]
    """Push the FT-N-04 metrics of *report* into *nfr_recorder*.

    *label* is the window tag (e.g. ``"5s"``) embedded in each metric name;
    the two escalation metrics are recorded only when *is_35s* is true.
    """
    prefix = f"ft_n_04.{label}"
    # NOTE(review): ``or 0`` records a missing (None) offset as 0 ms - confirm
    # that is what the NFR report should show when the label never appeared.
    nfr_recorder.record_metric(
        f"{prefix}.switch_latency_ms",
        float(report.switch_latency.first_dead_reckoned_offset_ms or 0),
        ac_id="AC-1",
    )
    nfr_recorder.record_metric(
        f"{prefix}.spoof_rejected_count",
        float(report.spoof_rejection.spoof_rejected_count),
        ac_id="AC-2",
    )
    nfr_recorder.record_metric(
        f"{prefix}.honest_accuracy_violation_count",
        float(report.honest_accuracy.violation_count),
        ac_id="AC-4",
    )
    if report.statustext_rate.observed_hz is not None:
        nfr_recorder.record_metric(
            f"{prefix}.statustext_imu_only_hz",
            report.statustext_rate.observed_hz,
            ac_id="AC-5",
        )
    if is_35s:
        nfr_recorder.record_metric(
            "ft_n_04.35s.cov2d_at_ms",
            float(report.escalation.cov2d_crossed_at_ms or 0),
            ac_id="AC-6",
        )
        nfr_recorder.record_metric(
            "ft_n_04.35s.failsafe_trigger_at_ms",
            float(report.escalation.cov500_or_30s_crossed_at_ms or 0),
            ac_id="AC-7",
        )


def _assert_acceptance_criteria(report, is_35s: bool) -> None:  # type: ignore[no-untyped-def]
    """Assert AC-1..AC-5 and AC-8 on *report*, plus AC-6 / AC-7 when *is_35s*."""
    assert report.switch_latency.passes, (
        f"AC-1: dead_reckoned label not within ≤{bse.SWITCH_LATENCY_MS} ms / "
        f"1 frame of blackout onset; "
        f"offset={report.switch_latency.first_dead_reckoned_offset_ms} ms, "
        f"frame_period={report.switch_latency.frame_period_ms} ms"
    )
    assert report.spoof_rejection.passes, (
        f"AC-2: spoof rejection failed; "
        f"rejected_count={report.spoof_rejection.spoof_rejected_count}, "
        f"re_anchored_count={report.spoof_rejection.satellite_anchored_inside_window}"
    )
    assert report.covariance_monotonic.passes, (
        f"AC-3: cov_semi_major_m decreased at "
        f"{report.covariance_monotonic.first_decreasing_at_ms} ms"
    )
    assert report.honest_accuracy.passes, (
        f"AC-4: horiz_accuracy under-reporting "
        f"({report.honest_accuracy.violation_count} violations of "
        f"{report.honest_accuracy.sample_count} samples)"
    )
    assert report.statustext_rate.passes, (
        f"AC-5: VISUAL_BLACKOUT_IMU_ONLY rate "
        f"{report.statustext_rate.observed_hz} Hz outside "
        f"[{bse.STATUSTEXT_RATE_MIN_HZ}, {bse.STATUSTEXT_RATE_MAX_HZ}] Hz"
    )
    if is_35s:
        assert report.escalation.passes_ac6, (
            f"AC-6: fix_type not degraded after cov crossed "
            f"{bse.ESCALATION_COV_2D_M} m at "
            f"{report.escalation.cov2d_crossed_at_ms} ms"
        )
        assert report.escalation.passes_ac7, (
            f"AC-7: failsafe escalation incomplete; "
            f"horiz_999={report.escalation.horiz_accuracy_999}, "
            f"failsafe_statustext_offset_ms="
            f"{report.escalation.failsafe_statustext_offset_ms}"
        )
    assert report.recovery_gate.passes, (
        f"AC-8: recovery gate failed; "
        f"recovery_at_ms={report.recovery_gate.recovery_at_ms}, "
        f"stable_period_s={report.recovery_gate.stable_period_s}, "
        f"consistency_check_passed={report.recovery_gate.consistency_check_passed}"
    )
def _resolve_frame_sink():  # type: ignore[no-untyped-def]
    """Build the frame sink for replay mode: a fresh `NullFrameSink`.

    The sink comes from `runner.helpers.replay_mode` (AZ-597), where it is
    described as counter-only.
    """
    # Resolved at call time, not at module import time.
    from runner.helpers import replay_mode

    sink = replay_mode.NullFrameSink()
    return sink
def _drive_fc_proxy(schedule_path: Path) -> None:
    """Hand *schedule_path* to `fc_proxy_runtime.drive_fc_proxy`.

    Thin indirection so the runtime driver is only imported when the
    scenario actually runs.
    """
    from runner.helpers import fc_proxy_runtime

    fc_proxy_runtime.drive_fc_proxy(schedule_path)
def _resolve_frame_period_ms() -> int:
    """Return the per-frame period, in ms, used for evaluation.

    Delegates to `replay_mode.default_frame_period_ms()` (AZ-597), which is
    documented as the default 30 fps period.
    """
    from runner.helpers import replay_mode

    period_ms = replay_mode.default_frame_period_ms()
    return period_ms