"""FT-N-04 — Visual blackout + spoofed GPS combined failsafe (AZ-426 / AC-3.5, AC-NEW-8). Three sub-cases (5 s / 15 s / 35 s) at the ladder of windows prescribed by AC-3.5 + AC-NEW-8, replayed via the AZ-408 ``blackout_spoof`` injector + the FC-inbound spoof proxy, and validated by ``runner.helpers.blackout_spoof_evaluator``. Gated on the same upstream replay helpers as the other negative scenarios (``frame_source_replay``, ``fdr_reader``, ``mavproxy_tlog_reader``, ``sitl_observer``, ``fc_proxy`` runtime binding). When those helpers are still stubbed the scenario test skips while ``e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py`` covers the AC-1..AC-8 evaluator logic. """ from __future__ import annotations from pathlib import Path import pytest from fixtures.injectors.blackout_spoof import BlackoutSpoofReport from runner.helpers import blackout_spoof_evaluator as bse _WINDOW_LADDER_S = (5.0, 15.0, 35.0) @pytest.mark.parametrize( "blackout_spoof_derkachi", [{"window_seconds": s, "seed": 0} for s in _WINDOW_LADDER_S], indirect=True, ids=[f"{int(s)}s" for s in _WINDOW_LADDER_S], ) @pytest.mark.traces_to( "AC-3.5,AC-NEW-8,AC-1,AC-2,AC-3,AC-4,AC-5,AC-6,AC-7,AC-8,AC-9" ) def test_ft_n_04_blackout_spoof( fc_adapter: str, vio_strategy: str, blackout_spoof_derkachi: BlackoutSpoofReport, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: if not sitl_replay_ready: pytest.skip( "FT-N-04 full replay requires `E2E_SITL_REPLAY_DIR` to point at a " "prepared SITL replay fixture (AZ-595) AND a runtime fc_proxy " "driver. AC-1..AC-8 evaluator logic covered by " "e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py." ) from runner.helpers import fdr_reader, mavproxy_tlog_reader, sitl_observer from runner.helpers.frame_source_replay import FrameSourceReplayer schedule = blackout_spoof_derkachi.schedule window = bse.BlackoutWindow( onset_monotonic_ms=schedule.window_start_ms, end_monotonic_ms=schedule.window_end_ms, ) is_35s = abs(window.duration_s - 35.0) < 0.5 # 1. Drive replay (frames + paired fc-proxy spoof injection). FrameSourceReplayer(_resolve_frame_sink()).replay_video( blackout_spoof_derkachi.out_root / "frames" ) _drive_fc_proxy(blackout_spoof_derkachi.out_root / "schedule.json") # 2. Collect FDR estimates + spoof-rejected events. fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr" estimates: list[bse.OutboundEstimateSample] = [] spoof_events: list[bse.SpoofRejectedEvent] = [] for rec in fdr_reader.iter_records(fdr_root): if rec.record_type == "outbound_estimate": p = rec.payload estimates.append( bse.OutboundEstimateSample( monotonic_ms=int(rec.monotonic_ms), source_label=str(p["source_label"]), # type: ignore[arg-type] cov_semi_major_m=float(p["cov_semi_major_m"]), # type: ignore[arg-type] horiz_accuracy=float(p.get("horiz_accuracy", p["cov_semi_major_m"])), # type: ignore[arg-type] fix_type=int(p.get("fix_type", -1)), # type: ignore[arg-type] ) ) elif rec.record_type == "spoof_rejected": spoof_events.append( bse.SpoofRejectedEvent( monotonic_ms=int(rec.monotonic_ms), reason=str(rec.payload.get("reason", "")), # type: ignore[arg-type] ) ) # 3. Collect STATUSTEXTs from mavproxy tlog. tlog_path = Path(evidence_dir).parent / f"run-{run_id}" / "mavproxy.tlog" statustexts = [ bse.StatustextSample( monotonic_ms=int(m.timestamp_us // 1000), text=str(m.fields.get("text", "")), ) for m in mavproxy_tlog_reader.iter_messages(tlog_path) if m.msg_type == "STATUSTEXT" ] # 4. Collect FC-side GPS health + consistency-check events (recovery gate). gps_health = [ bse.GpsHealthSample( monotonic_ms=int(s.monotonic_ms), healthy=bool(s.healthy), spoofed=bool(s.spoofed), ) for s in sitl_observer.read_gps_health_samples() # type: ignore[attr-defined] ] consistency = [ bse.ConsistencyCheckEvent( monotonic_ms=int(c.monotonic_ms), passed=bool(c.passed) ) for c in sitl_observer.read_consistency_check_events() # type: ignore[attr-defined] ] # 5. Evaluate. report = bse.evaluate( window, estimates=estimates, statustexts=statustexts, spoof_events=spoof_events, gps_health=gps_health, consistency_checks=consistency, frame_period_ms=_resolve_frame_period_ms(), is_35s_window=is_35s, ) out_csv = ( evidence_dir / f"ft-n-04-{int(window.duration_s)}s-{fc_adapter}-{vio_strategy}.csv" ) bse.write_csv_evidence(out_csv, report) # 6. NFR metrics + AC assertions. nfr_recorder.record_metric( f"ft_n_04.{int(window.duration_s)}s.switch_latency_ms", float(report.switch_latency.first_dead_reckoned_offset_ms or 0), ac_id="AC-1", ) nfr_recorder.record_metric( f"ft_n_04.{int(window.duration_s)}s.spoof_rejected_count", float(report.spoof_rejection.spoof_rejected_count), ac_id="AC-2", ) nfr_recorder.record_metric( f"ft_n_04.{int(window.duration_s)}s.honest_accuracy_violation_count", float(report.honest_accuracy.violation_count), ac_id="AC-4", ) if report.statustext_rate.observed_hz is not None: nfr_recorder.record_metric( f"ft_n_04.{int(window.duration_s)}s.statustext_imu_only_hz", report.statustext_rate.observed_hz, ac_id="AC-5", ) if is_35s: nfr_recorder.record_metric( "ft_n_04.35s.cov2d_at_ms", float(report.escalation.cov2d_crossed_at_ms or 0), ac_id="AC-6", ) nfr_recorder.record_metric( "ft_n_04.35s.failsafe_trigger_at_ms", float(report.escalation.cov500_or_30s_crossed_at_ms or 0), ac_id="AC-7", ) assert report.switch_latency.passes, ( f"AC-1: dead_reckoned label not within ≤{bse.SWITCH_LATENCY_MS} ms / " f"1 frame of blackout onset; " f"offset={report.switch_latency.first_dead_reckoned_offset_ms} ms, " f"frame_period={report.switch_latency.frame_period_ms} ms" ) assert report.spoof_rejection.passes, ( f"AC-2: spoof rejection failed; " f"rejected_count={report.spoof_rejection.spoof_rejected_count}, " f"re_anchored_count={report.spoof_rejection.satellite_anchored_inside_window}" ) assert report.covariance_monotonic.passes, ( f"AC-3: cov_semi_major_m decreased at " f"{report.covariance_monotonic.first_decreasing_at_ms} ms" ) assert report.honest_accuracy.passes, ( f"AC-4: horiz_accuracy under-reporting " f"({report.honest_accuracy.violation_count} violations of " f"{report.honest_accuracy.sample_count} samples)" ) assert report.statustext_rate.passes, ( f"AC-5: VISUAL_BLACKOUT_IMU_ONLY rate " f"{report.statustext_rate.observed_hz} Hz outside " f"[{bse.STATUSTEXT_RATE_MIN_HZ}, {bse.STATUSTEXT_RATE_MAX_HZ}] Hz" ) if is_35s: assert report.escalation.passes_ac6, ( f"AC-6: fix_type not degraded after cov crossed " f"{bse.ESCALATION_COV_2D_M} m at " f"{report.escalation.cov2d_crossed_at_ms} ms" ) assert report.escalation.passes_ac7, ( f"AC-7: failsafe escalation incomplete; " f"horiz_999={report.escalation.horiz_accuracy_999}, " f"failsafe_statustext_offset_ms=" f"{report.escalation.failsafe_statustext_offset_ms}" ) assert report.recovery_gate.passes, ( f"AC-8: recovery gate failed; " f"recovery_at_ms={report.recovery_gate.recovery_at_ms}, " f"stable_period_s={report.recovery_gate.stable_period_s}, " f"consistency_check_passed={report.recovery_gate.consistency_check_passed}" ) def _resolve_frame_sink(): # type: ignore[no-untyped-def] """Return a replay-mode `FrameSink` (counter-only; AZ-597).""" from runner.helpers.replay_mode import NullFrameSink return NullFrameSink() def _drive_fc_proxy(schedule_path: Path) -> None: from runner.helpers.fc_proxy_runtime import drive_fc_proxy drive_fc_proxy(schedule_path) def _resolve_frame_period_ms() -> int: """Return the default 30 fps per-frame period (AZ-597).""" from runner.helpers.replay_mode import default_frame_period_ms return default_frame_period_ms()