"""Blackout-spoof evaluation for FT-N-04 (AZ-426 / AC-3.5 + AC-NEW-8). Three-window ladder (5 s / 15 s / 35 s) with the ``blackout_spoof.py`` injector + FC-inbound spoof proxy. The evaluator validates per AZ-426: * AC-1: switch latency — within ≤1 frame OR ≤``SWITCH_LATENCY_MS`` (whichever is shorter), the first outbound estimate after blackout onset carries ``source_label = dead_reckoned``. * AC-2: spoof rejection — at least one FDR ``spoof-rejected`` event is observed during the blackout window AND zero spoofed GPS records are consumed into the estimator (label never returns to ``satellite_anchored`` during the window). * AC-3: monotonic covariance — ``cov_semi_major_m`` is non-decreasing across consecutive emissions inside the window. * AC-4: honest horiz_accuracy — ``horiz_accuracy ≥ HONEST_ACCURACY_RATIO × cov_semi_major_m`` for every emission. * AC-5: STATUSTEXT 1-2 Hz — ``VISUAL_BLACKOUT_IMU_ONLY`` STATUSTEXT rate is in ``[STATUSTEXT_RATE_MIN_HZ, STATUSTEXT_RATE_MAX_HZ]`` throughout the window. * AC-6 (35 s only): when 95 % covariance crosses ``ESCALATION_COV_2D_M``, fix_type degrades to ≤``ESCALATION_FIX_TYPE_2D``. * AC-7 (35 s only): when 95 % covariance crosses ``ESCALATION_COV_FAILSAFE_M`` OR window duration exceeds ``ESCALATION_DURATION_FAILSAFE_S``, ``horiz_accuracy == HORIZ_ACCURACY_FAILSAFE`` AND ``VISUAL_BLACKOUT_FAILSAFE`` STATUSTEXT is emitted within ≤``ESCALATION_LATENCY_MS`` of the crossing. * AC-8: recovery gate — after blackout end, label only returns to ``satellite_anchored`` once both (a) FC GPS-health is stable + non-spoofed for ≥``RECOVERY_STABLE_S`` AND (b) a visual/satellite consistency check succeeds. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import csv from dataclasses import dataclass from pathlib import Path from typing import Iterable, Sequence # AC-1 SWITCH_LATENCY_MS = 400 # AC-2 DEAD_RECKONED_LABEL = "dead_reckoned" SATELLITE_ANCHORED_LABEL = "satellite_anchored" # AC-4 HONEST_ACCURACY_RATIO = 0.95 # AC-5 STATUSTEXT_IMU_ONLY = "VISUAL_BLACKOUT_IMU_ONLY" STATUSTEXT_RATE_MIN_HZ = 1.0 STATUSTEXT_RATE_MAX_HZ = 2.0 # AC-6 / AC-7 STATUSTEXT_FAILSAFE = "VISUAL_BLACKOUT_FAILSAFE" ESCALATION_COV_2D_M = 100.0 ESCALATION_COV_FAILSAFE_M = 500.0 ESCALATION_DURATION_FAILSAFE_S = 30.0 ESCALATION_FIX_TYPE_2D = 2 # MAVLink GPS_FIX_TYPE_2D HORIZ_ACCURACY_FAILSAFE = 999.0 ESCALATION_LATENCY_MS = 500 # AC-8 RECOVERY_STABLE_S = 10.0 @dataclass(frozen=True) class BlackoutWindow: """The injector-emitted window the evaluator is bound to.""" onset_monotonic_ms: int end_monotonic_ms: int @property def duration_s(self) -> float: return (self.end_monotonic_ms - self.onset_monotonic_ms) / 1000.0 @dataclass(frozen=True) class OutboundEstimateSample: """One outbound estimate with fields used by FT-N-04 ACs.""" monotonic_ms: int source_label: str cov_semi_major_m: float horiz_accuracy: float # AP GPS_INPUT.horiz_accuracy (m) fix_type: int # MAVLink GPS fix type (0..6); -1 if unavailable @dataclass(frozen=True) class StatustextSample: monotonic_ms: int text: str @dataclass(frozen=True) class SpoofRejectedEvent: """One FDR `spoof-rejected` event.""" monotonic_ms: int reason: str @dataclass(frozen=True) class GpsHealthSample: """FC-side GPS health sample (post-blackout, for recovery gate).""" monotonic_ms: int healthy: bool spoofed: bool @dataclass(frozen=True) class ConsistencyCheckEvent: """Visual/satellite consistency check outcome (post-blackout).""" monotonic_ms: int passed: bool @dataclass(frozen=True) class SwitchLatencyReport: """AC-1 result.""" first_dead_reckoned_offset_ms: int | None # ms after window onset frame_period_ms: int passes: bool @dataclass(frozen=True) class SpoofRejectionReport: """AC-2 result.""" spoof_rejected_count: int satellite_anchored_inside_window: int passes: bool @dataclass(frozen=True) class CovarianceMonotonicReport: """AC-3 result.""" first_decreasing_at_ms: int | None sample_count: int passes: bool @dataclass(frozen=True) class HonestAccuracyReport: """AC-4 result.""" violation_count: int sample_count: int passes: bool @dataclass(frozen=True) class StatustextRateReport: """AC-5 result for VISUAL_BLACKOUT_IMU_ONLY.""" observed_hz: float | None count: int passes: bool @dataclass(frozen=True) class EscalationReport: """AC-6 + AC-7 result (35 s window only — other windows return passes=True).""" cov2d_crossed: bool cov2d_crossed_at_ms: int | None fix_type_degraded: bool # AC-6 satisfied cov500_or_30s_crossed: bool cov500_or_30s_crossed_at_ms: int | None horiz_accuracy_999: bool # AC-7 part 1 failsafe_statustext_offset_ms: int | None failsafe_statustext_in_time: bool # AC-7 part 2 passes_ac6: bool passes_ac7: bool @property def passes(self) -> bool: return self.passes_ac6 and self.passes_ac7 @dataclass(frozen=True) class RecoveryGateReport: """AC-8 result.""" recovery_at_ms: int | None stable_period_s: float | None consistency_check_passed: bool passes: bool @dataclass(frozen=True) class BlackoutSpoofReport: """Aggregate FT-N-04 result for one window.""" window: BlackoutWindow switch_latency: SwitchLatencyReport spoof_rejection: SpoofRejectionReport covariance_monotonic: CovarianceMonotonicReport honest_accuracy: HonestAccuracyReport statustext_rate: StatustextRateReport escalation: EscalationReport recovery_gate: RecoveryGateReport @property def passes(self) -> bool: return all( ( self.switch_latency.passes, self.spoof_rejection.passes, self.covariance_monotonic.passes, self.honest_accuracy.passes, self.statustext_rate.passes, self.escalation.passes, self.recovery_gate.passes, ) ) def _inside_window(window: BlackoutWindow, t_ms: int) -> bool: return window.onset_monotonic_ms <= t_ms <= window.end_monotonic_ms def _samples_inside_window( window: BlackoutWindow, samples: Iterable[OutboundEstimateSample] ) -> list[OutboundEstimateSample]: return [s for s in samples if _inside_window(window, s.monotonic_ms)] def evaluate_switch_latency( window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample], frame_period_ms: int, ) -> SwitchLatencyReport: """AC-1: dead_reckoned label within ≤1 frame OR ≤SWITCH_LATENCY_MS.""" budget_ms = min(SWITCH_LATENCY_MS, frame_period_ms) offset: int | None = None for s in estimates: if s.monotonic_ms < window.onset_monotonic_ms: continue if s.source_label == DEAD_RECKONED_LABEL: offset = s.monotonic_ms - window.onset_monotonic_ms break return SwitchLatencyReport( first_dead_reckoned_offset_ms=offset, frame_period_ms=frame_period_ms, passes=offset is not None and offset <= budget_ms, ) def evaluate_spoof_rejection( window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample], spoof_events: Sequence[SpoofRejectedEvent], ) -> SpoofRejectionReport: """AC-2: spoof-rejected events present AND no satellite_anchored re-entry.""" rejected = sum( 1 for ev in spoof_events if _inside_window(window, ev.monotonic_ms) ) inside = _samples_inside_window(window, estimates) re_anchored = sum(1 for s in inside if s.source_label == SATELLITE_ANCHORED_LABEL) return SpoofRejectionReport( spoof_rejected_count=rejected, satellite_anchored_inside_window=re_anchored, passes=rejected >= 1 and re_anchored == 0, ) def evaluate_covariance_monotonic( window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample] ) -> CovarianceMonotonicReport: """AC-3: cov_semi_major_m non-decreasing across consecutive emissions.""" inside = _samples_inside_window(window, estimates) first_dec: int | None = None for i in range(1, len(inside)): if inside[i].cov_semi_major_m < inside[i - 1].cov_semi_major_m: first_dec = inside[i].monotonic_ms break return CovarianceMonotonicReport( first_decreasing_at_ms=first_dec, sample_count=len(inside), passes=first_dec is None and len(inside) >= 1, ) def evaluate_honest_accuracy( window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample] ) -> HonestAccuracyReport: """AC-4: horiz_accuracy ≥ HONEST_ACCURACY_RATIO × cov_semi_major_m.""" inside = _samples_inside_window(window, estimates) violations = sum( 1 for s in inside if s.horiz_accuracy < HONEST_ACCURACY_RATIO * s.cov_semi_major_m ) return HonestAccuracyReport( violation_count=violations, sample_count=len(inside), passes=violations == 0 and len(inside) >= 1, ) def evaluate_statustext_rate( window: BlackoutWindow, statustexts: Sequence[StatustextSample] ) -> StatustextRateReport: """AC-5: VISUAL_BLACKOUT_IMU_ONLY rate ∈ [1, 2] Hz.""" inside = [ st for st in statustexts if STATUSTEXT_IMU_ONLY in st.text and _inside_window(window, st.monotonic_ms) ] duration_s = window.duration_s if duration_s <= 0 or not inside: return StatustextRateReport(observed_hz=None, count=len(inside), passes=False) rate = len(inside) / duration_s return StatustextRateReport( observed_hz=rate, count=len(inside), passes=STATUSTEXT_RATE_MIN_HZ <= rate <= STATUSTEXT_RATE_MAX_HZ, ) def _first_cov_crossing_ms( window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample], threshold_m: float, ) -> int | None: for s in _samples_inside_window(window, estimates): if s.cov_semi_major_m >= threshold_m: return s.monotonic_ms return None def evaluate_escalation( window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample], statustexts: Sequence[StatustextSample], *, is_35s_window: bool, ) -> EscalationReport: """AC-6 + AC-7: applies only to the 35 s sub-case. For non-35 s windows the report is vacuously passing — those windows are not expected to cross either escalation threshold and any incidental crossing is treated as informational only. """ cov2d_at = _first_cov_crossing_ms(window, estimates, ESCALATION_COV_2D_M) cov500_at = _first_cov_crossing_ms(window, estimates, ESCALATION_COV_FAILSAFE_M) duration_breach_at: int | None = None if window.duration_s >= ESCALATION_DURATION_FAILSAFE_S: duration_breach_at = ( window.onset_monotonic_ms + int(ESCALATION_DURATION_FAILSAFE_S * 1000) ) failsafe_trigger_at: int | None = None if cov500_at is not None and duration_breach_at is not None: failsafe_trigger_at = min(cov500_at, duration_breach_at) else: failsafe_trigger_at = cov500_at if cov500_at is not None else duration_breach_at if not is_35s_window: return EscalationReport( cov2d_crossed=cov2d_at is not None, cov2d_crossed_at_ms=cov2d_at, fix_type_degraded=True, cov500_or_30s_crossed=failsafe_trigger_at is not None, cov500_or_30s_crossed_at_ms=failsafe_trigger_at, horiz_accuracy_999=True, failsafe_statustext_offset_ms=None, failsafe_statustext_in_time=True, passes_ac6=True, passes_ac7=True, ) # AC-6: any sample at/after cov2d_at must have fix_type ≤ ESCALATION_FIX_TYPE_2D. fix_degraded = True if cov2d_at is not None: post = [s for s in _samples_inside_window(window, estimates) if s.monotonic_ms >= cov2d_at] if post and any(s.fix_type > ESCALATION_FIX_TYPE_2D for s in post): fix_degraded = False passes_ac6 = cov2d_at is None or fix_degraded # AC-7: post-trigger samples must have horiz_accuracy == 999 AND # VISUAL_BLACKOUT_FAILSAFE STATUSTEXT must arrive within ≤500 ms of trigger. horiz_999 = True failsafe_offset: int | None = None failsafe_in_time = True if failsafe_trigger_at is not None: post = [s for s in _samples_inside_window(window, estimates) if s.monotonic_ms >= failsafe_trigger_at] if post and any(s.horiz_accuracy != HORIZ_ACCURACY_FAILSAFE for s in post): horiz_999 = False for st in statustexts: if STATUSTEXT_FAILSAFE not in st.text: continue if st.monotonic_ms < failsafe_trigger_at: continue offset = st.monotonic_ms - failsafe_trigger_at if failsafe_offset is None or offset < failsafe_offset: failsafe_offset = offset failsafe_in_time = ( failsafe_offset is not None and failsafe_offset <= ESCALATION_LATENCY_MS ) passes_ac7 = failsafe_trigger_at is None or (horiz_999 and failsafe_in_time) return EscalationReport( cov2d_crossed=cov2d_at is not None, cov2d_crossed_at_ms=cov2d_at, fix_type_degraded=fix_degraded, cov500_or_30s_crossed=failsafe_trigger_at is not None, cov500_or_30s_crossed_at_ms=failsafe_trigger_at, horiz_accuracy_999=horiz_999, failsafe_statustext_offset_ms=failsafe_offset, failsafe_statustext_in_time=failsafe_in_time, passes_ac6=passes_ac6, passes_ac7=passes_ac7, ) def evaluate_recovery_gate( window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample], gps_health: Sequence[GpsHealthSample], consistency_checks: Sequence[ConsistencyCheckEvent], ) -> RecoveryGateReport: """AC-8: recovery only after ≥10 s healthy/non-spoofed FC GPS AND a consistency check pass.""" # First post-window satellite_anchored sample marks the (claimed) recovery moment. recovery_at: int | None = None for s in estimates: if ( s.monotonic_ms > window.end_monotonic_ms and s.source_label == SATELLITE_ANCHORED_LABEL ): recovery_at = s.monotonic_ms break if recovery_at is None: # No recovery attempted — vacuously passing for this gate; the # caller can still flag it via window-level coverage. return RecoveryGateReport( recovery_at_ms=None, stable_period_s=None, consistency_check_passed=False, passes=True, ) # (a) Continuous healthy/non-spoofed FC GPS for ≥RECOVERY_STABLE_S BEFORE recovery_at. cutoff_ms = recovery_at - int(RECOVERY_STABLE_S * 1000) relevant = [ h for h in gps_health if window.end_monotonic_ms <= h.monotonic_ms <= recovery_at ] stable = all(h.healthy and not h.spoofed for h in relevant) and len(relevant) >= 1 earliest_relevant = relevant[0].monotonic_ms if relevant else recovery_at stable_period_s = (recovery_at - earliest_relevant) / 1000.0 has_enough_window = earliest_relevant <= cutoff_ms # (b) Consistency check pass occurred between window-end and recovery_at. consistency_passed = any( c.passed and window.end_monotonic_ms <= c.monotonic_ms <= recovery_at for c in consistency_checks ) return RecoveryGateReport( recovery_at_ms=recovery_at, stable_period_s=stable_period_s, consistency_check_passed=consistency_passed, passes=stable and has_enough_window and consistency_passed, ) def evaluate( window: BlackoutWindow, *, estimates: Sequence[OutboundEstimateSample], statustexts: Sequence[StatustextSample], spoof_events: Sequence[SpoofRejectedEvent], gps_health: Sequence[GpsHealthSample], consistency_checks: Sequence[ConsistencyCheckEvent], frame_period_ms: int, is_35s_window: bool, ) -> BlackoutSpoofReport: """Run every AC-1..AC-8 check for a single window.""" return BlackoutSpoofReport( window=window, switch_latency=evaluate_switch_latency(window, estimates, frame_period_ms), spoof_rejection=evaluate_spoof_rejection(window, estimates, spoof_events), covariance_monotonic=evaluate_covariance_monotonic(window, estimates), honest_accuracy=evaluate_honest_accuracy(window, estimates), statustext_rate=evaluate_statustext_rate(window, statustexts), escalation=evaluate_escalation( window, estimates, statustexts, is_35s_window=is_35s_window ), recovery_gate=evaluate_recovery_gate( window, estimates, gps_health, consistency_checks ), ) def write_csv_evidence(out_path: Path, report: BlackoutSpoofReport) -> Path: """Write FT-N-04 aggregate evidence — one row of per-AC summary.""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "window_duration_s", "ac1_switch_latency_ms", "ac1_passes", "ac2_spoof_rejected_count", "ac2_re_anchored_count", "ac2_passes", "ac3_first_decreasing_at_ms", "ac3_passes", "ac4_violation_count", "ac4_passes", "ac5_observed_hz", "ac5_passes", "ac6_cov2d_at_ms", "ac6_passes", "ac7_failsafe_trigger_at_ms", "ac7_passes", "ac8_recovery_at_ms", "ac8_passes", "passes", ] ) r = report writer.writerow( [ f"{r.window.duration_s:.3f}", "" if r.switch_latency.first_dead_reckoned_offset_ms is None else r.switch_latency.first_dead_reckoned_offset_ms, "true" if r.switch_latency.passes else "false", r.spoof_rejection.spoof_rejected_count, r.spoof_rejection.satellite_anchored_inside_window, "true" if r.spoof_rejection.passes else "false", "" if r.covariance_monotonic.first_decreasing_at_ms is None else r.covariance_monotonic.first_decreasing_at_ms, "true" if r.covariance_monotonic.passes else "false", r.honest_accuracy.violation_count, "true" if r.honest_accuracy.passes else "false", "" if r.statustext_rate.observed_hz is None else f"{r.statustext_rate.observed_hz:.3f}", "true" if r.statustext_rate.passes else "false", "" if r.escalation.cov2d_crossed_at_ms is None else r.escalation.cov2d_crossed_at_ms, "true" if r.escalation.passes_ac6 else "false", "" if r.escalation.cov500_or_30s_crossed_at_ms is None else r.escalation.cov500_or_30s_crossed_at_ms, "true" if r.escalation.passes_ac7 else "false", "" if r.recovery_gate.recovery_at_ms is None else r.recovery_gate.recovery_at_ms, "true" if r.recovery_gate.passes else "false", "true" if r.passes else "false", ] ) return out_path