"""Escalation-ladder evaluator for NFT-RES-04 (AZ-435 / AC-NEW-8 escalation order). FT-N-04 already proves the 35 s blackout-with-spoof window's per-AC thresholds (see ``blackout_spoof_evaluator``). NFT-RES-04 is the *resilience-tier* scenario: it asserts the **full ladder fires in observable order** within tight latency budgets: * AC-1 — when the SUT's reported 95 % covariance crosses ``COV_2D_THRESHOLD_M`` (100 m), MAVLink fix-quality degrades to ≤``FIX_TYPE_2D`` within ≤``ESCALATION_LATENCY_MS`` (500 ms) of the crossing. * AC-2 — when covariance crosses ``COV_FAILSAFE_THRESHOLD_M`` (500 m) OR blackout duration exceeds ``DURATION_FAILSAFE_S`` (30 s), the outbound ``horiz_accuracy`` becomes ``HORIZ_ACCURACY_FAILSAFE`` (999.0) AND a ``VISUAL_BLACKOUT_FAILSAFE`` STATUSTEXT is emitted within ≤``ESCALATION_LATENCY_MS`` of the trigger. * AC-ORDER — the AC-1 fix-degrade crossing must precede the AC-2 failsafe trigger in observed time. A later-than-AC-2 cov-2d crossing is a strict-monotonicity bug because the SUT cannot un-cross 100 m on its way past 500 m. This evaluator deliberately re-defines the thresholds locally rather than importing them from ``blackout_spoof_evaluator`` so a future contract drift in either evaluator does not silently propagate. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import csv from dataclasses import dataclass from pathlib import Path from typing import Sequence COV_2D_THRESHOLD_M = 100.0 COV_FAILSAFE_THRESHOLD_M = 500.0 DURATION_FAILSAFE_S = 30.0 FIX_TYPE_2D = 2 # MAVLink GPS_FIX_TYPE_2D HORIZ_ACCURACY_FAILSAFE = 999.0 STATUSTEXT_FAILSAFE = "VISUAL_BLACKOUT_FAILSAFE" ESCALATION_LATENCY_MS = 500 EXPECTED_WINDOW_S = 35.0 WINDOW_TOLERANCE_S = 2.0 @dataclass(frozen=True) class BlackoutWindow: """The injector-emitted window the ladder is evaluated over.""" onset_monotonic_ms: int end_monotonic_ms: int @property def duration_s(self) -> float: return (self.end_monotonic_ms - self.onset_monotonic_ms) / 1000.0 @property def is_35s(self) -> bool: return abs(self.duration_s - EXPECTED_WINDOW_S) <= WINDOW_TOLERANCE_S @dataclass(frozen=True) class EstimateSample: """One outbound estimate at a monotonic-ms timestamp.""" monotonic_ms: int cov_semi_major_m: float horiz_accuracy: float fix_type: int @dataclass(frozen=True) class StatustextSample: monotonic_ms: int text: str @dataclass(frozen=True) class FixDegradeReport: """AC-1: cov-2d crossing → fix-type degrade within ≤500 ms.""" cov2d_crossed_at_ms: int | None fix_degraded_at_ms: int | None latency_ms: int | None budget_ms: int @property def passes(self) -> bool: return ( self.cov2d_crossed_at_ms is not None and self.fix_degraded_at_ms is not None and self.latency_ms is not None and self.latency_ms <= self.budget_ms ) @dataclass(frozen=True) class FailsafeReport: """AC-2: cov-500 OR 30 s elapsed → 999.0 AND STATUSTEXT within ≤500 ms.""" failsafe_trigger_at_ms: int | None horiz_999_at_ms: int | None horiz_999_latency_ms: int | None statustext_at_ms: int | None statustext_latency_ms: int | None budget_ms: int @property def horiz_999_passes(self) -> bool: return ( self.failsafe_trigger_at_ms is not None and self.horiz_999_latency_ms is not None and self.horiz_999_latency_ms <= self.budget_ms ) @property def statustext_passes(self) -> bool: return ( self.failsafe_trigger_at_ms is not None and self.statustext_latency_ms is not None and self.statustext_latency_ms <= self.budget_ms ) @property def passes(self) -> bool: return self.horiz_999_passes and self.statustext_passes @dataclass(frozen=True) class OrderingReport: """AC-ORDER: cov-2d crossing must precede the failsafe trigger.""" cov2d_at_ms: int | None failsafe_trigger_at_ms: int | None @property def passes(self) -> bool: if self.cov2d_at_ms is None or self.failsafe_trigger_at_ms is None: # Cannot verify ordering when either pole is missing — # the per-AC pass/fail covers that case. return False return self.cov2d_at_ms < self.failsafe_trigger_at_ms @dataclass(frozen=True) class EscalationLadderReport: """Aggregate NFT-RES-04 verdict for one 35 s window.""" window: BlackoutWindow fix_degrade: FixDegradeReport failsafe: FailsafeReport ordering: OrderingReport @property def passes_window(self) -> bool: return self.window.is_35s @property def passes(self) -> bool: return ( self.passes_window and self.fix_degrade.passes and self.failsafe.passes and self.ordering.passes ) def _samples_in_window( window: BlackoutWindow, samples: Sequence[EstimateSample] ) -> list[EstimateSample]: return [ s for s in samples if window.onset_monotonic_ms <= s.monotonic_ms <= window.end_monotonic_ms ] def _first_cov_crossing( window: BlackoutWindow, samples: Sequence[EstimateSample], threshold_m: float, ) -> int | None: for s in _samples_in_window(window, samples): if s.cov_semi_major_m >= threshold_m: return s.monotonic_ms return None def _first_fix_degrade( samples: Sequence[EstimateSample], from_ms: int ) -> int | None: for s in samples: if s.monotonic_ms < from_ms: continue if s.fix_type <= FIX_TYPE_2D and s.fix_type >= 0: return s.monotonic_ms return None def _first_horiz_999(samples: Sequence[EstimateSample], from_ms: int) -> int | None: for s in samples: if s.monotonic_ms < from_ms: continue if s.horiz_accuracy == HORIZ_ACCURACY_FAILSAFE: return s.monotonic_ms return None def _first_failsafe_statustext( statustexts: Sequence[StatustextSample], from_ms: int ) -> int | None: for st in statustexts: if st.monotonic_ms < from_ms: continue if STATUSTEXT_FAILSAFE in st.text: return st.monotonic_ms return None def evaluate( window: BlackoutWindow, *, estimates: Sequence[EstimateSample], statustexts: Sequence[StatustextSample], budget_ms: int = ESCALATION_LATENCY_MS, ) -> EscalationLadderReport: """Compute AC-1 + AC-2 + AC-ORDER verdicts for one 35 s window.""" cov2d_at = _first_cov_crossing(window, estimates, COV_2D_THRESHOLD_M) fix_degraded_at = ( _first_fix_degrade(estimates, cov2d_at) if cov2d_at is not None else None ) fix_latency: int | None = None if cov2d_at is not None and fix_degraded_at is not None: fix_latency = fix_degraded_at - cov2d_at fix_report = FixDegradeReport( cov2d_crossed_at_ms=cov2d_at, fix_degraded_at_ms=fix_degraded_at, latency_ms=fix_latency, budget_ms=budget_ms, ) cov500_at = _first_cov_crossing(window, estimates, COV_FAILSAFE_THRESHOLD_M) duration_trip_at: int | None = None if window.duration_s >= DURATION_FAILSAFE_S: duration_trip_at = ( window.onset_monotonic_ms + int(DURATION_FAILSAFE_S * 1000) ) failsafe_trigger_at: int | None candidates = [t for t in (cov500_at, duration_trip_at) if t is not None] failsafe_trigger_at = min(candidates) if candidates else None horiz_999_at: int | None = None horiz_latency: int | None = None statustext_at: int | None = None statustext_latency: int | None = None if failsafe_trigger_at is not None: horiz_999_at = _first_horiz_999(estimates, failsafe_trigger_at) if horiz_999_at is not None: horiz_latency = horiz_999_at - failsafe_trigger_at statustext_at = _first_failsafe_statustext(statustexts, failsafe_trigger_at) if statustext_at is not None: statustext_latency = statustext_at - failsafe_trigger_at failsafe_report = FailsafeReport( failsafe_trigger_at_ms=failsafe_trigger_at, horiz_999_at_ms=horiz_999_at, horiz_999_latency_ms=horiz_latency, statustext_at_ms=statustext_at, statustext_latency_ms=statustext_latency, budget_ms=budget_ms, ) ordering = OrderingReport( cov2d_at_ms=cov2d_at, failsafe_trigger_at_ms=failsafe_trigger_at ) return EscalationLadderReport( window=window, fix_degrade=fix_report, failsafe=failsafe_report, ordering=ordering, ) def write_csv_evidence(out_path: Path, report: EscalationLadderReport) -> Path: """Aggregate-summary CSV (one row per window).""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "window_duration_s", "window_is_35s", "ac1_cov2d_at_ms", "ac1_fix_degraded_at_ms", "ac1_latency_ms", "ac1_passes", "ac2_failsafe_trigger_at_ms", "ac2_horiz_999_latency_ms", "ac2_statustext_latency_ms", "ac2_passes", "ac_order_passes", "passes", ] ) writer.writerow( [ f"{report.window.duration_s:.3f}", "true" if report.window.is_35s else "false", "" if report.fix_degrade.cov2d_crossed_at_ms is None else report.fix_degrade.cov2d_crossed_at_ms, "" if report.fix_degrade.fix_degraded_at_ms is None else report.fix_degrade.fix_degraded_at_ms, "" if report.fix_degrade.latency_ms is None else report.fix_degrade.latency_ms, "true" if report.fix_degrade.passes else "false", "" if report.failsafe.failsafe_trigger_at_ms is None else report.failsafe.failsafe_trigger_at_ms, "" if report.failsafe.horiz_999_latency_ms is None else report.failsafe.horiz_999_latency_ms, "" if report.failsafe.statustext_latency_ms is None else report.failsafe.statustext_latency_ms, "true" if report.failsafe.passes else "false", "true" if report.ordering.passes else "false", "true" if report.passes else "false", ] ) return out_path