Files
gps-denied-onboard/e2e/runner/helpers/escalation_ladder_evaluator.py
Oleksandr Bezdieniezhnykh 330893be5c [AZ-432] [AZ-433] [AZ-434] [AZ-435] Add NFT-RES-01..04 resilience scenarios
Batch 86: 4 NFT-RES blackbox scenarios + 4 helper evaluators + 74 unit
tests + directory-layout registration.

* AZ-432 NFT-RES-01: 30 s IMU-only fallback drift bound (AC-3.5 + AC-NEW-7);
  two sub-cases (no_imu ≤100m, good_imu_combined_factor ≤50m).
* AZ-433 NFT-RES-02: companion mid-flight reboot (AC-5.2 + AC-5.3); resume
  ≤30s + first-emission accuracy ≤100m.
* AZ-434 NFT-RES-03: 100-iteration Monte Carlo envelope (AC-NEW-4);
  iteration-count + master-seed determinism + envelope ratio ≥0.95.
  Canonical-param by default; E2E_NFT_RES_03_FULL_MATRIX=1 unlocks matrix.
* AZ-435 NFT-RES-04: 35s blackout+spoof escalation ladder (AC-NEW-8);
  AC-1 (cov-2d→fix-degrade ≤500ms) + AC-2 (failsafe→999+STATUSTEXT
  ≤500ms) + AC-ORDER (strict ordering).

Verdict: PASS_WITH_WARNINGS (0 Critical, 0 High, 0 Medium, 5 Low).
F5 documents intentional threshold duplication with blackout_spoof
evaluator (prevents contract drift between FT-N-04 and NFT-RES-04).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:09:04 +03:00

325 lines
10 KiB
Python

"""Escalation-ladder evaluator for NFT-RES-04 (AZ-435 / AC-NEW-8 escalation order).

FT-N-04 already proves the 35 s blackout-with-spoof window's per-AC
thresholds (see ``blackout_spoof_evaluator``). NFT-RES-04 is the
*resilience-tier* scenario: it asserts the **full ladder fires in
observable order** within tight latency budgets:

* AC-1 — when the SUT's reported 95 % covariance crosses
  ``COV_2D_THRESHOLD_M`` (100 m), MAVLink fix-quality degrades to
  ≤``FIX_TYPE_2D`` within ≤``ESCALATION_LATENCY_MS`` (500 ms) of the
  crossing.
* AC-2 — when covariance crosses ``COV_FAILSAFE_THRESHOLD_M`` (500 m)
  OR blackout duration exceeds ``DURATION_FAILSAFE_S`` (30 s), the
  outbound ``horiz_accuracy`` becomes ``HORIZ_ACCURACY_FAILSAFE``
  (999.0) AND a ``VISUAL_BLACKOUT_FAILSAFE`` STATUSTEXT is emitted
  within ≤``ESCALATION_LATENCY_MS`` of the trigger.
* AC-ORDER — the AC-1 fix-degrade crossing must precede the AC-2
  failsafe trigger in observed time. A later-than-AC-2 cov-2d crossing
  is a strict-monotonicity bug because the SUT cannot un-cross 100 m
  on its way past 500 m.

This evaluator deliberately re-defines the thresholds locally rather
than importing them from ``blackout_spoof_evaluator`` so a future
contract drift in either evaluator does not silently propagate.

Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence
# Covariance semi-major axis (metres) at or above which a sample counts as the
# AC-1 "cov-2d" crossing.
COV_2D_THRESHOLD_M = 100.0
# Covariance semi-major axis (metres) at or above which a sample counts as an
# AC-2 failsafe trigger.
COV_FAILSAFE_THRESHOLD_M = 500.0
# Blackout duration (seconds) that, once reached, also counts as an AC-2
# failsafe trigger.
DURATION_FAILSAFE_S = 30.0
FIX_TYPE_2D = 2  # MAVLink GPS_FIX_TYPE_2D
# ``horiz_accuracy`` value the evaluator looks for (by equality) after the
# failsafe trigger.
HORIZ_ACCURACY_FAILSAFE = 999.0
# Substring searched for inside STATUSTEXT payloads to detect the failsafe
# announcement.
STATUSTEXT_FAILSAFE = "VISUAL_BLACKOUT_FAILSAFE"
# Default trigger-to-reaction latency budget (milliseconds) used by
# ``evaluate`` for both AC-1 and AC-2.
ESCALATION_LATENCY_MS = 500
# Nominal blackout window length (seconds) and the absolute tolerance applied
# by ``BlackoutWindow.is_35s``.
EXPECTED_WINDOW_S = 35.0
WINDOW_TOLERANCE_S = 2.0
@dataclass(frozen=True)
class BlackoutWindow:
    """Injector-emitted blackout interval that the ladder is scored against.

    Both bounds are monotonic-clock timestamps in milliseconds.
    """

    onset_monotonic_ms: int
    end_monotonic_ms: int

    @property
    def duration_s(self) -> float:
        """Window length in seconds (negative if the bounds are reversed)."""
        elapsed_ms = self.end_monotonic_ms - self.onset_monotonic_ms
        return elapsed_ms / 1000.0

    @property
    def is_35s(self) -> bool:
        """True when the length is within tolerance of the expected window."""
        deviation_s = abs(self.duration_s - EXPECTED_WINDOW_S)
        return deviation_s <= WINDOW_TOLERANCE_S
@dataclass(frozen=True)
class EstimateSample:
    """One outbound estimate at a monotonic-ms timestamp.

    Immutable record; the evaluator in this module only reads these fields.
    """

    # Sample timestamp in milliseconds; compared directly against the
    # ``BlackoutWindow`` bounds and against trigger timestamps.
    monotonic_ms: int
    # Covariance semi-major axis in metres; compared (``>=``) against the
    # covariance thresholds to detect a crossing.
    cov_semi_major_m: float
    # Reported horizontal accuracy; tested for equality with
    # ``HORIZ_ACCURACY_FAILSAFE``.
    horiz_accuracy: float
    # Fix-type code; values in ``0..FIX_TYPE_2D`` inclusive count as degraded.
    fix_type: int
@dataclass(frozen=True)
class StatustextSample:
    """One STATUSTEXT message at a monotonic-ms timestamp."""

    # Message timestamp in milliseconds; compared against the failsafe
    # trigger timestamp.
    monotonic_ms: int
    # Message payload; searched for the ``STATUSTEXT_FAILSAFE`` substring.
    text: str
@dataclass(frozen=True)
class FixDegradeReport:
    """AC-1 verdict: fix type must degrade within budget of the cov-2d crossing.

    Any timestamp or latency left as ``None`` means the corresponding event
    was not observed, which fails the check.
    """

    cov2d_crossed_at_ms: int | None
    fix_degraded_at_ms: int | None
    latency_ms: int | None
    budget_ms: int

    @property
    def passes(self) -> bool:
        """True when both events were seen and the latency is within budget."""
        observed = (
            self.cov2d_crossed_at_ms,
            self.fix_degraded_at_ms,
            self.latency_ms,
        )
        if any(value is None for value in observed):
            return False
        return self.latency_ms <= self.budget_ms
@dataclass(frozen=True)
class FailsafeReport:
    """AC-2 verdict: 999.0 accuracy AND failsafe STATUSTEXT within budget.

    The trigger is the covariance-500 crossing or the 30 s elapsed mark.
    ``None`` in a timestamp/latency field means the event was not observed.
    """

    failsafe_trigger_at_ms: int | None
    horiz_999_at_ms: int | None
    horiz_999_latency_ms: int | None
    statustext_at_ms: int | None
    statustext_latency_ms: int | None
    budget_ms: int

    def _within_budget(self, latency_ms: int | None) -> bool:
        """Shared check: a trigger exists and ``latency_ms`` fits the budget."""
        if self.failsafe_trigger_at_ms is None or latency_ms is None:
            return False
        return latency_ms <= self.budget_ms

    @property
    def horiz_999_passes(self) -> bool:
        """True when the 999.0 accuracy appeared within budget of the trigger."""
        return self._within_budget(self.horiz_999_latency_ms)

    @property
    def statustext_passes(self) -> bool:
        """True when the failsafe STATUSTEXT appeared within budget."""
        return self._within_budget(self.statustext_latency_ms)

    @property
    def passes(self) -> bool:
        """True only when both AC-2 reactions met the budget."""
        if not self.horiz_999_passes:
            return False
        return self.statustext_passes
@dataclass(frozen=True)
class OrderingReport:
    """AC-ORDER verdict: the cov-2d crossing must come strictly before the failsafe trigger."""

    cov2d_at_ms: int | None
    failsafe_trigger_at_ms: int | None

    @property
    def passes(self) -> bool:
        """True when both events exist and cov-2d is strictly earlier."""
        earlier, later = self.cov2d_at_ms, self.failsafe_trigger_at_ms
        # Ordering cannot be verified when either pole is missing; that
        # situation is reported as a failure here and is also covered by the
        # per-AC pass/fail.
        both_observed = earlier is not None and later is not None
        return both_observed and earlier < later
@dataclass(frozen=True)
class EscalationLadderReport:
    """Combined NFT-RES-04 verdict for a single blackout window.

    Bundles the window itself with the AC-1, AC-2 and AC-ORDER sub-reports.
    """

    window: BlackoutWindow
    fix_degrade: FixDegradeReport
    failsafe: FailsafeReport
    ordering: OrderingReport

    @property
    def passes_window(self) -> bool:
        """True when the window length matches the expected 35 s envelope."""
        window_ok = self.window.is_35s
        return window_ok

    @property
    def passes(self) -> bool:
        """True when the window check and every sub-report pass."""
        if not self.passes_window:
            return False
        rungs = (self.fix_degrade, self.failsafe, self.ordering)
        return all(rung.passes for rung in rungs)
def _samples_in_window(
    window: BlackoutWindow, samples: Sequence[EstimateSample]
) -> list[EstimateSample]:
    """Return the samples timestamped inside ``window``, bounds inclusive.

    Input order is preserved; nothing is sorted or de-duplicated.
    """
    onset_ms = window.onset_monotonic_ms
    end_ms = window.end_monotonic_ms
    return [
        sample for sample in samples if onset_ms <= sample.monotonic_ms <= end_ms
    ]
def _first_cov_crossing(
    window: BlackoutWindow,
    samples: Sequence[EstimateSample],
    threshold_m: float,
) -> int | None:
    """Timestamp of the first in-window sample whose covariance reaches ``threshold_m``.

    "First" means first in iteration order of ``samples``. Returns ``None``
    when no in-window sample is at or above the threshold.
    """
    crossings = (
        sample.monotonic_ms
        for sample in _samples_in_window(window, samples)
        if sample.cov_semi_major_m >= threshold_m
    )
    return next(crossings, None)
def _first_fix_degrade(
    samples: Sequence[EstimateSample], from_ms: int
) -> int | None:
    """Timestamp of the first sample at/after ``from_ms`` with a degraded fix type.

    A fix type is degraded when it lies in ``0..FIX_TYPE_2D`` inclusive;
    negative codes are ignored. "First" means first in iteration order.
    Returns ``None`` when no such sample exists.
    """
    degraded = (
        sample.monotonic_ms
        for sample in samples
        if sample.monotonic_ms >= from_ms and 0 <= sample.fix_type <= FIX_TYPE_2D
    )
    return next(degraded, None)
def _first_horiz_999(samples: Sequence[EstimateSample], from_ms: int) -> int | None:
    """Timestamp of the first sample at/after ``from_ms`` reporting the failsafe accuracy.

    The match is an exact equality against ``HORIZ_ACCURACY_FAILSAFE``.
    "First" means first in iteration order; ``None`` when nothing matches.
    """
    failsafe_hits = (
        sample.monotonic_ms
        for sample in samples
        if sample.monotonic_ms >= from_ms
        and sample.horiz_accuracy == HORIZ_ACCURACY_FAILSAFE
    )
    return next(failsafe_hits, None)
def _first_failsafe_statustext(
    statustexts: Sequence[StatustextSample], from_ms: int
) -> int | None:
    """Timestamp of the first STATUSTEXT at/after ``from_ms`` announcing the failsafe.

    A message matches when its text contains ``STATUSTEXT_FAILSAFE`` as a
    substring. "First" means first in iteration order; ``None`` when nothing
    matches.
    """
    announcements = (
        message.monotonic_ms
        for message in statustexts
        if message.monotonic_ms >= from_ms and STATUSTEXT_FAILSAFE in message.text
    )
    return next(announcements, None)
def evaluate(
    window: BlackoutWindow,
    *,
    estimates: Sequence[EstimateSample],
    statustexts: Sequence[StatustextSample],
    budget_ms: int = ESCALATION_LATENCY_MS,
) -> EscalationLadderReport:
    """Compute the AC-1, AC-2 and AC-ORDER verdicts for one blackout window.

    Args:
        window: Blackout window the covariance crossings are searched in.
        estimates: Outbound estimate samples. Any order is accepted; they
            are sorted by timestamp before evaluation.
        statustexts: STATUSTEXT samples. Any order is accepted; they are
            sorted by timestamp before evaluation.
        budget_ms: Maximum allowed trigger-to-reaction latency, applied to
            both AC-1 and AC-2.

    Returns:
        An ``EscalationLadderReport`` holding the window and the three
        sub-reports. Events that were never observed are reported as
        ``None`` timestamps/latencies, which the sub-reports treat as failures.
    """
    # The ``_first_*`` scans return the first match in *iteration* order,
    # which is the earliest event only when the input is time-ordered.
    # ``sorted`` is stable, so already-ordered captures are unaffected.
    ordered_estimates = sorted(estimates, key=lambda sample: sample.monotonic_ms)
    ordered_statustexts = sorted(statustexts, key=lambda msg: msg.monotonic_ms)

    # --- AC-1: cov-2d crossing -> fix-type degrade ---------------------
    cov2d_at = _first_cov_crossing(window, ordered_estimates, COV_2D_THRESHOLD_M)
    fix_degraded_at: int | None = None
    fix_latency: int | None = None
    if cov2d_at is not None:
        fix_degraded_at = _first_fix_degrade(ordered_estimates, cov2d_at)
        if fix_degraded_at is not None:
            fix_latency = fix_degraded_at - cov2d_at
    fix_report = FixDegradeReport(
        cov2d_crossed_at_ms=cov2d_at,
        fix_degraded_at_ms=fix_degraded_at,
        latency_ms=fix_latency,
        budget_ms=budget_ms,
    )

    # --- AC-2: failsafe trigger -> 999.0 accuracy + STATUSTEXT ---------
    cov500_at = _first_cov_crossing(
        window, ordered_estimates, COV_FAILSAFE_THRESHOLD_M
    )
    duration_trip_at: int | None = None
    if window.duration_s >= DURATION_FAILSAFE_S:
        # The duration trigger is synthetic: onset plus the failsafe duration.
        duration_trip_at = window.onset_monotonic_ms + int(DURATION_FAILSAFE_S * 1000)
    # Whichever trigger condition is met first starts the AC-2 clock.
    candidates = [t for t in (cov500_at, duration_trip_at) if t is not None]
    failsafe_trigger_at = min(candidates, default=None)

    horiz_999_at: int | None = None
    horiz_latency: int | None = None
    statustext_at: int | None = None
    statustext_latency: int | None = None
    if failsafe_trigger_at is not None:
        horiz_999_at = _first_horiz_999(ordered_estimates, failsafe_trigger_at)
        if horiz_999_at is not None:
            horiz_latency = horiz_999_at - failsafe_trigger_at
        statustext_at = _first_failsafe_statustext(
            ordered_statustexts, failsafe_trigger_at
        )
        if statustext_at is not None:
            statustext_latency = statustext_at - failsafe_trigger_at
    failsafe_report = FailsafeReport(
        failsafe_trigger_at_ms=failsafe_trigger_at,
        horiz_999_at_ms=horiz_999_at,
        horiz_999_latency_ms=horiz_latency,
        statustext_at_ms=statustext_at,
        statustext_latency_ms=statustext_latency,
        budget_ms=budget_ms,
    )

    # --- AC-ORDER: cov-2d crossing must precede the failsafe trigger ---
    ordering = OrderingReport(
        cov2d_at_ms=cov2d_at, failsafe_trigger_at_ms=failsafe_trigger_at
    )
    return EscalationLadderReport(
        window=window,
        fix_degrade=fix_report,
        failsafe=failsafe_report,
        ordering=ordering,
    )
def write_csv_evidence(out_path: Path, report: EscalationLadderReport) -> Path:
    """Write the aggregate-summary CSV (header plus one row) for ``report``.

    Missing timestamps/latencies (``None``) are written as empty cells and
    booleans as the literal strings ``true`` / ``false``. Parent directories
    are created as needed and an existing file is overwritten.

    Args:
        out_path: Destination CSV file.
        report: Verdict to serialise.

    Returns:
        ``out_path``, unchanged, for call chaining.
    """

    def flag(ok: bool) -> str:
        return "true" if ok else "false"

    def cell(value: int | None) -> int | str:
        return "" if value is None else value

    fix = report.fix_degrade
    failsafe = report.failsafe
    # One ordered mapping keeps each header next to its value so the two
    # rows cannot drift out of alignment.
    columns = {
        "window_duration_s": f"{report.window.duration_s:.3f}",
        "window_is_35s": flag(report.window.is_35s),
        "ac1_cov2d_at_ms": cell(fix.cov2d_crossed_at_ms),
        "ac1_fix_degraded_at_ms": cell(fix.fix_degraded_at_ms),
        "ac1_latency_ms": cell(fix.latency_ms),
        "ac1_passes": flag(fix.passes),
        "ac2_failsafe_trigger_at_ms": cell(failsafe.failsafe_trigger_at_ms),
        "ac2_horiz_999_latency_ms": cell(failsafe.horiz_999_latency_ms),
        "ac2_statustext_latency_ms": cell(failsafe.statustext_latency_ms),
        "ac2_passes": flag(failsafe.passes),
        "ac_order_passes": flag(report.ordering.passes),
        "passes": flag(report.passes),
    }

    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the default is locale-dependent. ``newline=""`` is
    # required by the csv module so it controls line endings itself.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(list(columns))
        writer.writerow(list(columns.values()))
    return out_path