mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:01:13 +00:00
330893be5c
Batch 86: 4 NFT-RES blackbox scenarios + 4 helper evaluators + 74 unit tests + directory-layout registration. * AZ-432 NFT-RES-01: 30 s IMU-only fallback drift bound (AC-3.5 + AC-NEW-7); two sub-cases (no_imu ≤100m, good_imu_combined_factor ≤50m). * AZ-433 NFT-RES-02: companion mid-flight reboot (AC-5.2 + AC-5.3); resume ≤30s + first-emission accuracy ≤100m. * AZ-434 NFT-RES-03: 100-iteration Monte Carlo envelope (AC-NEW-4); iteration-count + master-seed determinism + envelope ratio ≥0.95. Canonical-param by default; E2E_NFT_RES_03_FULL_MATRIX=1 unlocks matrix. * AZ-435 NFT-RES-04: 35s blackout+spoof escalation ladder (AC-NEW-8); AC-1 (cov-2d→fix-degrade ≤500ms) + AC-2 (failsafe→999+STATUSTEXT ≤500ms) + AC-ORDER (strict ordering). Verdict: PASS_WITH_WARNINGS (0 Critical, 0 High, 0 Medium, 5 Low). F5 documents intentional threshold duplication with blackout_spoof evaluator (prevents contract drift between FT-N-04 and NFT-RES-04). Co-authored-by: Cursor <cursoragent@cursor.com>
325 lines
10 KiB
Python
325 lines
10 KiB
Python
"""Escalation-ladder evaluator for NFT-RES-04 (AZ-435 / AC-NEW-8 escalation order).

FT-N-04 already proves the 35 s blackout-with-spoof window's per-AC
thresholds (see ``blackout_spoof_evaluator``). NFT-RES-04 is the
*resilience-tier* scenario: it asserts the **full ladder fires in
observable order** within tight latency budgets:

* AC-1 — when the SUT's reported 95 % covariance crosses
  ``COV_2D_THRESHOLD_M`` (100 m), MAVLink fix-quality degrades to
  ≤``FIX_TYPE_2D`` within ≤``ESCALATION_LATENCY_MS`` (500 ms) of the
  crossing.
* AC-2 — when covariance crosses ``COV_FAILSAFE_THRESHOLD_M`` (500 m)
  OR blackout duration exceeds ``DURATION_FAILSAFE_S`` (30 s), the
  outbound ``horiz_accuracy`` becomes ``HORIZ_ACCURACY_FAILSAFE``
  (999.0) AND a ``VISUAL_BLACKOUT_FAILSAFE`` STATUSTEXT is emitted
  within ≤``ESCALATION_LATENCY_MS`` of the trigger.
* AC-ORDER — the AC-1 fix-degrade crossing must precede the AC-2
  failsafe trigger in observed time. A later-than-AC-2 cov-2d crossing
  is a strict-monotonicity bug because the SUT cannot un-cross 100 m
  on its way past 500 m.

This evaluator deliberately re-defines the thresholds locally rather
than importing them from ``blackout_spoof_evaluator`` so a future
contract drift in either evaluator does not silently propagate.

Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Sequence
|
|
|
|
# Shape of the injected window: nominal length and the slack allowed
# around it when deciding whether a window counts as "the 35 s window".
EXPECTED_WINDOW_S = 35.0
WINDOW_TOLERANCE_S = 2.0

# AC-1: covariance level whose crossing must be followed by a fix-type
# degrade to at most FIX_TYPE_2D.
COV_2D_THRESHOLD_M = 100.0
FIX_TYPE_2D = 2  # MAVLink GPS_FIX_TYPE_2D

# AC-2: either trigger (covariance level or elapsed blackout time) must be
# followed by the failsafe horiz_accuracy value and the failsafe STATUSTEXT.
COV_FAILSAFE_THRESHOLD_M = 500.0
DURATION_FAILSAFE_S = 30.0
HORIZ_ACCURACY_FAILSAFE = 999.0
STATUSTEXT_FAILSAFE = "VISUAL_BLACKOUT_FAILSAFE"

# Latency budget shared by AC-1 and AC-2, in milliseconds.
ESCALATION_LATENCY_MS = 500
|
|
|
|
|
|
@dataclass(frozen=True)
class BlackoutWindow:
    """Injector-emitted blackout interval the ladder is evaluated over.

    Both bounds are monotonic-clock timestamps expressed in milliseconds.
    """

    onset_monotonic_ms: int
    end_monotonic_ms: int

    @property
    def duration_s(self) -> float:
        """Window length in seconds (end minus onset)."""
        elapsed_ms = self.end_monotonic_ms - self.onset_monotonic_ms
        return elapsed_ms / 1000.0

    @property
    def is_35s(self) -> bool:
        """True when the length is within ``WINDOW_TOLERANCE_S`` of ``EXPECTED_WINDOW_S``."""
        deviation_s = abs(self.duration_s - EXPECTED_WINDOW_S)
        return deviation_s <= WINDOW_TOLERANCE_S
|
|
|
|
|
|
@dataclass(frozen=True)
class EstimateSample:
    """A single outbound estimate, stamped with a monotonic-ms timestamp."""

    # Capture time on the monotonic clock, in milliseconds.
    monotonic_ms: int
    # Covariance magnitude that the evaluator compares against the
    # COV_* thresholds.
    cov_semi_major_m: float
    # Outbound accuracy value; compared for equality with the failsafe value.
    horiz_accuracy: float
    # Fix-quality code; compared against FIX_TYPE_2D.
    fix_type: int
|
|
|
|
|
|
@dataclass(frozen=True)
class StatustextSample:
    """A single observed STATUSTEXT message, stamped with monotonic ms."""

    # Capture time on the monotonic clock, in milliseconds.
    monotonic_ms: int
    # Message text; the evaluator searches it for the failsafe marker.
    text: str
|
|
|
|
|
|
@dataclass(frozen=True)
class FixDegradeReport:
    """AC-1 outcome: cov-2d crossing followed by a fix-type degrade in budget.

    Each timestamp is ``None`` when the corresponding event was never
    observed; ``latency_ms`` is ``None`` unless both events were seen.
    """

    cov2d_crossed_at_ms: int | None
    fix_degraded_at_ms: int | None
    latency_ms: int | None
    budget_ms: int

    @property
    def passes(self) -> bool:
        """True only when both events were seen and the latency fits the budget."""
        # A missing event cannot satisfy AC-1, whatever the budget is.
        if self.cov2d_crossed_at_ms is None or self.fix_degraded_at_ms is None:
            return False
        if self.latency_ms is None:
            return False
        # The budget is inclusive.
        return self.latency_ms <= self.budget_ms
|
|
|
|
|
|
@dataclass(frozen=True)
class FailsafeReport:
    """AC-2 outcome: failsafe trigger followed by 999.0 AND STATUSTEXT in budget.

    ``failsafe_trigger_at_ms`` is ``None`` when no trigger occurred. The
    ``*_at_ms`` / ``*_latency_ms`` pairs are ``None`` when the matching
    reaction was never observed.
    """

    failsafe_trigger_at_ms: int | None
    horiz_999_at_ms: int | None
    horiz_999_latency_ms: int | None
    statustext_at_ms: int | None
    statustext_latency_ms: int | None
    budget_ms: int

    @property
    def horiz_999_passes(self) -> bool:
        """True when the failsafe horiz_accuracy arrived within the budget."""
        if self.failsafe_trigger_at_ms is None:
            return False
        if self.horiz_999_latency_ms is None:
            return False
        return self.horiz_999_latency_ms <= self.budget_ms

    @property
    def statustext_passes(self) -> bool:
        """True when the failsafe STATUSTEXT arrived within the budget."""
        if self.failsafe_trigger_at_ms is None:
            return False
        if self.statustext_latency_ms is None:
            return False
        return self.statustext_latency_ms <= self.budget_ms

    @property
    def passes(self) -> bool:
        """True only when both the 999.0 and the STATUSTEXT checks pass."""
        if not self.horiz_999_passes:
            return False
        return self.statustext_passes
|
|
|
|
|
|
@dataclass(frozen=True)
class OrderingReport:
    """AC-ORDER outcome: the cov-2d crossing must come before the failsafe trigger."""

    cov2d_at_ms: int | None
    failsafe_trigger_at_ms: int | None

    @property
    def passes(self) -> bool:
        """True when both events exist and cov-2d is strictly earlier."""
        earlier, later = self.cov2d_at_ms, self.failsafe_trigger_at_ms
        # Ordering cannot be verified when either pole is missing; the
        # per-AC verdicts report the missing event itself.
        if earlier is None or later is None:
            return False
        # Strict inequality: simultaneous events do not count as ordered.
        return earlier < later
|
|
|
|
|
|
@dataclass(frozen=True)
class EscalationLadderReport:
    """Combined NFT-RES-04 verdict for a single blackout window."""

    window: BlackoutWindow
    fix_degrade: FixDegradeReport
    failsafe: FailsafeReport
    ordering: OrderingReport

    @property
    def passes_window(self) -> bool:
        """True when the evaluated window has the expected ~35 s length."""
        return self.window.is_35s

    @property
    def passes(self) -> bool:
        """True only when the window shape, AC-1, AC-2 and AC-ORDER all pass."""
        verdicts = (
            self.passes_window,
            self.fix_degrade.passes,
            self.failsafe.passes,
            self.ordering.passes,
        )
        return all(verdicts)
|
|
|
|
|
|
def _samples_in_window(
|
|
window: BlackoutWindow, samples: Sequence[EstimateSample]
|
|
) -> list[EstimateSample]:
|
|
return [
|
|
s
|
|
for s in samples
|
|
if window.onset_monotonic_ms <= s.monotonic_ms <= window.end_monotonic_ms
|
|
]
|
|
|
|
|
|
def _first_cov_crossing(
    window: BlackoutWindow,
    samples: Sequence[EstimateSample],
    threshold_m: float,
) -> int | None:
    """Timestamp of the first in-window sample at or above ``threshold_m``.

    Samples are scanned in the order given; only those inside ``window``
    are considered. Returns ``None`` when no in-window sample reaches the
    threshold.
    """
    crossings = (
        sample.monotonic_ms
        for sample in _samples_in_window(window, samples)
        if sample.cov_semi_major_m >= threshold_m
    )
    return next(crossings, None)
|
|
|
|
|
|
def _first_fix_degrade(
    samples: Sequence[EstimateSample], from_ms: int
) -> int | None:
    """Timestamp of the first sample at/after ``from_ms`` with a degraded fix.

    A fix counts as degraded when ``0 <= fix_type <= FIX_TYPE_2D``; negative
    codes are ignored. Samples are scanned in the order given. Returns
    ``None`` when no such sample exists.
    """
    degraded = (
        sample.monotonic_ms
        for sample in samples
        if sample.monotonic_ms >= from_ms and 0 <= sample.fix_type <= FIX_TYPE_2D
    )
    return next(degraded, None)
|
|
|
|
|
|
def _first_horiz_999(samples: Sequence[EstimateSample], from_ms: int) -> int | None:
    """Timestamp of the first sample at/after ``from_ms`` reporting the failsafe accuracy.

    The match is an exact equality with ``HORIZ_ACCURACY_FAILSAFE``. Samples
    are scanned in the order given. Returns ``None`` when nothing matches.
    """
    hits = (
        sample.monotonic_ms
        for sample in samples
        if sample.monotonic_ms >= from_ms
        and sample.horiz_accuracy == HORIZ_ACCURACY_FAILSAFE
    )
    return next(hits, None)
|
|
|
|
|
|
def _first_failsafe_statustext(
    statustexts: Sequence[StatustextSample], from_ms: int
) -> int | None:
    """Timestamp of the first STATUSTEXT at/after ``from_ms`` carrying the failsafe marker.

    A message matches when ``STATUSTEXT_FAILSAFE`` appears anywhere in its
    text (substring match). Messages are scanned in the order given.
    Returns ``None`` when nothing matches.
    """
    hits = (
        message.monotonic_ms
        for message in statustexts
        if message.monotonic_ms >= from_ms and STATUSTEXT_FAILSAFE in message.text
    )
    return next(hits, None)
|
|
|
|
|
|
def evaluate(
    window: BlackoutWindow,
    *,
    estimates: Sequence[EstimateSample],
    statustexts: Sequence[StatustextSample],
    budget_ms: int = ESCALATION_LATENCY_MS,
) -> EscalationLadderReport:
    """Score one blackout window against AC-1, AC-2 and AC-ORDER.

    Args:
        window: The blackout interval under evaluation.
        estimates: Outbound estimate samples; scanned in the order given.
        statustexts: Observed STATUSTEXT samples; scanned in the order given.
        budget_ms: Latency budget applied to both AC-1 and AC-2.

    Returns:
        An ``EscalationLadderReport`` bundling the window with the three
        per-criterion reports. Events that were never observed are recorded
        as ``None`` rather than raising.
    """
    # AC-1: the first 100 m crossing inside the window, then the first
    # degraded fix at or after that crossing.
    cov2d_at = _first_cov_crossing(window, estimates, COV_2D_THRESHOLD_M)
    fix_degraded_at: int | None = None
    fix_latency: int | None = None
    if cov2d_at is not None:
        fix_degraded_at = _first_fix_degrade(estimates, cov2d_at)
        if fix_degraded_at is not None:
            fix_latency = fix_degraded_at - cov2d_at

    # AC-2 trigger: whichever comes first of the 500 m crossing and the
    # 30 s elapsed mark. The latter only exists if the window lasts that long.
    trigger_candidates: list[int] = []
    cov500_at = _first_cov_crossing(window, estimates, COV_FAILSAFE_THRESHOLD_M)
    if cov500_at is not None:
        trigger_candidates.append(cov500_at)
    if window.duration_s >= DURATION_FAILSAFE_S:
        trigger_candidates.append(
            window.onset_monotonic_ms + int(DURATION_FAILSAFE_S * 1000)
        )
    failsafe_trigger_at: int | None = min(trigger_candidates, default=None)

    # AC-2 reactions are only searched for once a trigger exists.
    horiz_999_at: int | None = None
    horiz_latency: int | None = None
    statustext_at: int | None = None
    statustext_latency: int | None = None
    if failsafe_trigger_at is not None:
        horiz_999_at = _first_horiz_999(estimates, failsafe_trigger_at)
        statustext_at = _first_failsafe_statustext(statustexts, failsafe_trigger_at)
        if horiz_999_at is not None:
            horiz_latency = horiz_999_at - failsafe_trigger_at
        if statustext_at is not None:
            statustext_latency = statustext_at - failsafe_trigger_at

    return EscalationLadderReport(
        window=window,
        fix_degrade=FixDegradeReport(
            cov2d_crossed_at_ms=cov2d_at,
            fix_degraded_at_ms=fix_degraded_at,
            latency_ms=fix_latency,
            budget_ms=budget_ms,
        ),
        failsafe=FailsafeReport(
            failsafe_trigger_at_ms=failsafe_trigger_at,
            horiz_999_at_ms=horiz_999_at,
            horiz_999_latency_ms=horiz_latency,
            statustext_at_ms=statustext_at,
            statustext_latency_ms=statustext_latency,
            budget_ms=budget_ms,
        ),
        ordering=OrderingReport(
            cov2d_at_ms=cov2d_at,
            failsafe_trigger_at_ms=failsafe_trigger_at,
        ),
    )
|
|
|
|
|
|
def write_csv_evidence(out_path: Path, report: EscalationLadderReport) -> Path:
    """Write the aggregate-summary CSV for one window: a header row and one data row.

    Parent directories of ``out_path`` are created when missing, and an
    existing file is overwritten. Unobserved (``None``) timestamps and
    latencies are written as empty cells; booleans as ``true``/``false``.

    Returns:
        ``out_path``, unchanged.
    """

    def flag(value: bool) -> str:
        # Lower-case literals, not Python's ``True``/``False`` repr.
        return "true" if value else "false"

    def cell(value: int | None) -> int | str:
        # Only ``None`` becomes blank; a legitimate 0 must survive.
        return "" if value is None else value

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="") as fh:
        ac1 = report.fix_degrade
        ac2 = report.failsafe
        # Column name and value are kept side by side so they cannot drift apart.
        columns = (
            ("window_duration_s", f"{report.window.duration_s:.3f}"),
            ("window_is_35s", flag(report.window.is_35s)),
            ("ac1_cov2d_at_ms", cell(ac1.cov2d_crossed_at_ms)),
            ("ac1_fix_degraded_at_ms", cell(ac1.fix_degraded_at_ms)),
            ("ac1_latency_ms", cell(ac1.latency_ms)),
            ("ac1_passes", flag(ac1.passes)),
            ("ac2_failsafe_trigger_at_ms", cell(ac2.failsafe_trigger_at_ms)),
            ("ac2_horiz_999_latency_ms", cell(ac2.horiz_999_latency_ms)),
            ("ac2_statustext_latency_ms", cell(ac2.statustext_latency_ms)),
            ("ac2_passes", flag(ac2.passes)),
            ("ac_order_passes", flag(report.ordering.passes)),
            ("passes", flag(report.passes)),
        )
        # zip(*columns) yields the header tuple followed by the value tuple.
        csv.writer(fh).writerows(zip(*columns))
    return out_path
|