"""FT-N-04 — Visual blackout + spoofed GPS combined failsafe (AZ-426 / AC-3.5, AC-NEW-8).

Three sub-cases (5 s / 15 s / 35 s) at the ladder of windows prescribed by
AC-3.5 + AC-NEW-8, replayed via the AZ-408 ``blackout_spoof`` injector + the
FC-inbound spoof proxy, and validated by
``runner.helpers.blackout_spoof_evaluator``.

Gated on the same upstream replay helpers as the other negative scenarios
(``frame_source_replay``, ``fdr_reader``, ``mavproxy_tlog_reader``,
``sitl_observer``, ``fc_proxy`` runtime binding). When those helpers are still
stubbed the scenario test skips while
``e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py`` covers the
AC-1..AC-8 evaluator logic.
"""

from __future__ import annotations

from pathlib import Path

import pytest

from fixtures.injectors.blackout_spoof import BlackoutSpoofReport
from runner.helpers import blackout_spoof_evaluator as bse

# Blackout window durations (seconds) exercised by the parametrized scenario.
_WINDOW_LADDER_S = (5.0, 15.0, 35.0)


@pytest.fixture(scope="module")
def _harness_helpers_implemented() -> bool:
    """Probe the replay helpers and report whether all of them ran cleanly.

    Each helper is invoked once with a dummy (non-existent) path. The fixture
    returns ``True`` only when every probe completes without raising, and
    ``False`` as soon as any probe raises.
    """
    from runner.helpers import fdr_reader, mavproxy_tlog_reader, sitl_observer
    from runner.helpers.frame_source_replay import FrameSourceReplayer

    # NOTE(review): any exception counts as "not implemented" here, not just
    # NotImplementedError / AttributeError from a stub. A finished helper that
    # rejects the dummy paths below would therefore also cause a skip — confirm
    # that this is the intended gate.
    try:
        replayer = FrameSourceReplayer(sink=_NullSink())  # type: ignore[arg-type]
        replayer.replay_video(Path("/tmp/non-existent.mp4"))
        # list() drains the iterators so that lazily-raising stubs are detected.
        list(fdr_reader.iter_records(Path("/tmp/non-existent")))
        list(mavproxy_tlog_reader.iter_messages(Path("/tmp/non-existent.tlog")))
        sitl_observer.read_gps_health_samples()  # type: ignore[attr-defined]
        sitl_observer.read_consistency_check_events()  # type: ignore[attr-defined]
    except Exception:
        return False
    return True


class _NullSink:
    """Frame sink that discards every frame; used only by the helper probe."""

    def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None:
        """Accept a frame and do nothing with it."""
        return None


def _collect_fdr_samples(
    fdr_root: Path,
) -> tuple[list[bse.OutboundEstimateSample], list[bse.SpoofRejectedEvent]]:
    """Read FDR records under *fdr_root* and split them into evaluator inputs.

    Only ``outbound_estimate`` and ``spoof_rejected`` records are used; every
    other record type is ignored.

    Returns:
        ``(estimates, spoof_events)`` in the order the reader yielded them.
    """
    from runner.helpers import fdr_reader

    estimates: list[bse.OutboundEstimateSample] = []
    spoof_events: list[bse.SpoofRejectedEvent] = []
    for rec in fdr_reader.iter_records(fdr_root):
        if rec.record_type == "outbound_estimate":
            p = rec.payload
            estimates.append(
                bse.OutboundEstimateSample(
                    monotonic_ms=int(rec.monotonic_ms),
                    source_label=str(p["source_label"]),  # type: ignore[arg-type]
                    cov_semi_major_m=float(p["cov_semi_major_m"]),  # type: ignore[arg-type]
                    # A record without horiz_accuracy falls back to the
                    # covariance semi-major axis.
                    horiz_accuracy=float(p.get("horiz_accuracy", p["cov_semi_major_m"])),  # type: ignore[arg-type]
                    # -1 stands in for a fix_type missing from the record.
                    fix_type=int(p.get("fix_type", -1)),  # type: ignore[arg-type]
                )
            )
        elif rec.record_type == "spoof_rejected":
            spoof_events.append(
                bse.SpoofRejectedEvent(
                    monotonic_ms=int(rec.monotonic_ms),
                    reason=str(rec.payload.get("reason", "")),  # type: ignore[arg-type]
                )
            )
    return estimates, spoof_events


def _collect_statustexts(tlog_path: Path) -> list[bse.StatustextSample]:
    """Return one sample per ``STATUSTEXT`` message found in *tlog_path*."""
    from runner.helpers import mavproxy_tlog_reader

    return [
        bse.StatustextSample(
            # The tlog timestamp is in microseconds; the evaluator takes ms.
            monotonic_ms=int(m.timestamp_us // 1000),
            text=str(m.fields.get("text", "")),
        )
        for m in mavproxy_tlog_reader.iter_messages(tlog_path)
        if m.msg_type == "STATUSTEXT"
    ]


def _collect_fc_observations() -> (
    tuple[list[bse.GpsHealthSample], list[bse.ConsistencyCheckEvent]]
):
    """Read FC-side GPS health samples and consistency-check events.

    Returns:
        ``(gps_health, consistency_checks)`` as evaluator input types.
    """
    from runner.helpers import sitl_observer

    gps_health = [
        bse.GpsHealthSample(
            monotonic_ms=int(s.monotonic_ms),
            healthy=bool(s.healthy),
            spoofed=bool(s.spoofed),
        )
        for s in sitl_observer.read_gps_health_samples()  # type: ignore[attr-defined]
    ]
    consistency = [
        bse.ConsistencyCheckEvent(
            monotonic_ms=int(c.monotonic_ms), passed=bool(c.passed)
        )
        for c in sitl_observer.read_consistency_check_events()  # type: ignore[attr-defined]
    ]
    return gps_health, consistency


def _record_nfr_metrics(nfr_recorder, report, window_tag: str, is_35s: bool) -> None:  # type: ignore[no-untyped-def]
    """Record the per-window NFR metrics taken from *report*.

    Args:
        nfr_recorder: Recorder exposing ``record_metric(name, value, ac_id=...)``.
        report: Result of ``bse.evaluate``.
        window_tag: Window label used inside metric names (e.g. ``"5s"``).
        is_35s: When true, the AC-6 / AC-7 escalation metrics are recorded too.
    """
    prefix = f"ft_n_04.{window_tag}"
    # ``or 0`` records a missing (None) value as 0.0 so the metric is always
    # emitted, here and for the escalation metrics below.
    nfr_recorder.record_metric(
        f"{prefix}.switch_latency_ms",
        float(report.switch_latency.first_dead_reckoned_offset_ms or 0),
        ac_id="AC-1",
    )
    nfr_recorder.record_metric(
        f"{prefix}.spoof_rejected_count",
        float(report.spoof_rejection.spoof_rejected_count),
        ac_id="AC-2",
    )
    nfr_recorder.record_metric(
        f"{prefix}.honest_accuracy_violation_count",
        float(report.honest_accuracy.violation_count),
        ac_id="AC-4",
    )
    if report.statustext_rate.observed_hz is not None:
        nfr_recorder.record_metric(
            f"{prefix}.statustext_imu_only_hz",
            report.statustext_rate.observed_hz,
            ac_id="AC-5",
        )
    if is_35s:
        nfr_recorder.record_metric(
            "ft_n_04.35s.cov2d_at_ms",
            float(report.escalation.cov2d_crossed_at_ms or 0),
            ac_id="AC-6",
        )
        nfr_recorder.record_metric(
            "ft_n_04.35s.failsafe_trigger_at_ms",
            float(report.escalation.cov500_or_30s_crossed_at_ms or 0),
            ac_id="AC-7",
        )


@pytest.mark.parametrize(
    "blackout_spoof_derkachi",
    [{"window_seconds": s, "seed": 0} for s in _WINDOW_LADDER_S],
    indirect=True,
    ids=[f"{int(s)}s" for s in _WINDOW_LADDER_S],
)
@pytest.mark.traces_to(
    "AC-3.5,AC-NEW-8,AC-1,AC-2,AC-3,AC-4,AC-5,AC-6,AC-7,AC-8,AC-9"
)
def test_ft_n_04_blackout_spoof(
    fc_adapter: str,
    vio_strategy: str,
    blackout_spoof_derkachi: BlackoutSpoofReport,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    _harness_helpers_implemented: bool,
) -> None:
    """Replay one blackout + spoof window and assert AC-1..AC-8 on the result.

    The test skips when the helper probe fixture reports ``False``. Otherwise
    it drives the replay, gathers FDR / tlog / SITL evidence, runs
    ``bse.evaluate``, writes the CSV evidence, records NFR metrics and finally
    asserts each acceptance criterion (AC-6 / AC-7 only for the 35 s window).
    """
    if not _harness_helpers_implemented:
        pytest.skip(
            "FT-N-04 full replay requires runner.helpers.{frame_source_replay,"
            "fdr_reader,mavproxy_tlog_reader,sitl_observer,fc_proxy} — currently "
            "AZ-441 / AZ-407 / AZ-416 leftovers. AC-1..AC-8 evaluator logic "
            "covered by e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py."
        )

    from runner.helpers.frame_source_replay import FrameSourceReplayer

    schedule = blackout_spoof_derkachi.schedule
    window = bse.BlackoutWindow(
        onset_monotonic_ms=schedule.window_start_ms,
        end_monotonic_ms=schedule.window_end_ms,
    )
    # The escalation checks (AC-6 / AC-7) apply only to the 35 s window; the
    # 0.5 s tolerance absorbs small deviations in the scheduled duration.
    is_35s = abs(window.duration_s - 35.0) < 0.5
    window_tag = f"{int(window.duration_s)}s"

    # Normalise once so every derived path is built the same way.
    evidence_root = Path(evidence_dir)
    run_root = evidence_root.parent / f"run-{run_id}"

    # 1. Drive replay (frames + paired fc-proxy spoof injection).
    FrameSourceReplayer(_resolve_frame_sink()).replay_video(
        blackout_spoof_derkachi.out_root / "frames"
    )
    _drive_fc_proxy(blackout_spoof_derkachi.out_root / "schedule.json")

    # 2. Collect FDR estimates + spoof-rejected events.
    estimates, spoof_events = _collect_fdr_samples(run_root / "fdr")

    # 3. Collect STATUSTEXTs from mavproxy tlog.
    statustexts = _collect_statustexts(run_root / "mavproxy.tlog")

    # 4. Collect FC-side GPS health + consistency-check events (recovery gate).
    gps_health, consistency = _collect_fc_observations()

    # 5. Evaluate.
    report = bse.evaluate(
        window,
        estimates=estimates,
        statustexts=statustexts,
        spoof_events=spoof_events,
        gps_health=gps_health,
        consistency_checks=consistency,
        frame_period_ms=_resolve_frame_period_ms(),
        is_35s_window=is_35s,
    )
    out_csv = evidence_root / f"ft-n-04-{window_tag}-{fc_adapter}-{vio_strategy}.csv"
    bse.write_csv_evidence(out_csv, report)

    # 6. NFR metrics + AC assertions. Metrics are recorded first so they are
    # captured even when one of the assertions below fails.
    _record_nfr_metrics(nfr_recorder, report, window_tag, is_35s)

    assert report.switch_latency.passes, (
        f"AC-1: dead_reckoned label not within ≤{bse.SWITCH_LATENCY_MS} ms / "
        "1 frame of blackout onset; "
        f"offset={report.switch_latency.first_dead_reckoned_offset_ms} ms, "
        f"frame_period={report.switch_latency.frame_period_ms} ms"
    )
    assert report.spoof_rejection.passes, (
        "AC-2: spoof rejection failed; "
        f"rejected_count={report.spoof_rejection.spoof_rejected_count}, "
        f"re_anchored_count={report.spoof_rejection.satellite_anchored_inside_window}"
    )
    assert report.covariance_monotonic.passes, (
        "AC-3: cov_semi_major_m decreased at "
        f"{report.covariance_monotonic.first_decreasing_at_ms} ms"
    )
    assert report.honest_accuracy.passes, (
        "AC-4: horiz_accuracy under-reporting "
        f"({report.honest_accuracy.violation_count} violations of "
        f"{report.honest_accuracy.sample_count} samples)"
    )
    assert report.statustext_rate.passes, (
        "AC-5: VISUAL_BLACKOUT_IMU_ONLY rate "
        f"{report.statustext_rate.observed_hz} Hz outside "
        f"[{bse.STATUSTEXT_RATE_MIN_HZ}, {bse.STATUSTEXT_RATE_MAX_HZ}] Hz"
    )
    if is_35s:
        assert report.escalation.passes_ac6, (
            "AC-6: fix_type not degraded after cov crossed "
            f"{bse.ESCALATION_COV_2D_M} m at "
            f"{report.escalation.cov2d_crossed_at_ms} ms"
        )
        assert report.escalation.passes_ac7, (
            "AC-7: failsafe escalation incomplete; "
            f"horiz_999={report.escalation.horiz_accuracy_999}, "
            "failsafe_statustext_offset_ms="
            f"{report.escalation.failsafe_statustext_offset_ms}"
        )
    assert report.recovery_gate.passes, (
        "AC-8: recovery gate failed; "
        f"recovery_at_ms={report.recovery_gate.recovery_at_ms}, "
        f"stable_period_s={report.recovery_gate.stable_period_s}, "
        f"consistency_check_passed={report.recovery_gate.consistency_check_passed}"
    )


def _resolve_frame_sink():  # type: ignore[no-untyped-def]
    """Placeholder for frame sink resolution; always raises NotImplementedError."""
    raise NotImplementedError(
        "frame sink resolution is owned by AZ-441 / runner.helpers.frame_source_replay"
    )


def _drive_fc_proxy(schedule_path: Path) -> None:
    """Placeholder for the FC-inbound spoof proxy driver; always raises NotImplementedError."""
    raise NotImplementedError(
        "FC-inbound spoof proxy driver is owned by AZ-441 / runner.helpers.fc_proxy_runtime"
    )


def _resolve_frame_period_ms() -> int:
    """Placeholder for frame period resolution; always raises NotImplementedError."""
    raise NotImplementedError(
        "Frame period resolution is owned by AZ-441 / runner.helpers.frame_source_replay"
    )