Files
gps-denied-onboard/e2e/runner/helpers/imu_fallback_drift_evaluator.py
Oleksandr Bezdieniezhnykh 330893be5c [AZ-432] [AZ-433] [AZ-434] [AZ-435] Add NFT-RES-01..04 resilience scenarios
Batch 86: 4 NFT-RES blackbox scenarios + 4 helper evaluators + 74 unit
tests + directory-layout registration.

* AZ-432 NFT-RES-01: 30 s IMU-only fallback drift bound (AC-3.5 + AC-NEW-7);
  two sub-cases (no_imu ≤100m, good_imu_combined_factor ≤50m).
* AZ-433 NFT-RES-02: companion mid-flight reboot (AC-5.2 + AC-5.3); resume
  ≤30s + first-emission accuracy ≤100m.
* AZ-434 NFT-RES-03: 100-iteration Monte Carlo envelope (AC-NEW-4);
  iteration-count + master-seed determinism + envelope ratio ≥0.95.
  Canonical-param by default; E2E_NFT_RES_03_FULL_MATRIX=1 unlocks matrix.
* AZ-435 NFT-RES-04: 35s blackout+spoof escalation ladder (AC-NEW-8);
  AC-1 (cov-2d→fix-degrade ≤500ms) + AC-2 (failsafe→999+STATUSTEXT
  ≤500ms) + AC-ORDER (strict ordering).

Verdict: PASS_WITH_WARNINGS (0 Critical, 0 High, 0 Medium, 5 Low).
F5 documents intentional threshold duplication with blackout_spoof
evaluator (prevents contract drift between FT-N-04 and NFT-RES-04).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:09:04 +03:00

228 lines
7.5 KiB
Python

"""IMU-only fallback drift evaluator for NFT-RES-01 (AZ-432 / AC-3.5 + AC-NEW-7).
A pure-vision-blackout (no spoof) lasting 30 s is injected by
``fixtures/injectors/blackout_spoof.py --no-spoof``. The SUT must
fall back to IMU-only dead reckoning. AC-3.5 + AC-NEW-7 prescribe two
drift budgets at the end of the blackout, depending on whether the
CombinedImuFactor (PreintegratedCombinedMeasurements) is active:
* sub-case (a) — no good IMU → ``drift ≤ NO_IMU_BUDGET_M`` (100 m).
* sub-case (b) — CombinedImuFactor active (SUT default config) →
``drift ≤ GOOD_IMU_BUDGET_M`` (50 m).
Drift is the Vincenty distance between the SUT's last estimate at
blackout end and the ground-truth position at the same timestamp.
The scenario test owns the orchestration (window injection,
sub-case selection, fixture loading). This module owns the pure
arithmetic + CSV evidence.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol; consumes only typed samples that
the scenario adapter projects out of the boundary observers.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence
from .geo import distance_m
# AC-2 / AC-3 — end-of-blackout drift budgets in metres, one per sub-case:
# 100 m for the ``no_imu`` sub-case, 50 m for ``good_imu_combined_factor``.
NO_IMU_BUDGET_M = 100.0
GOOD_IMU_BUDGET_M = 50.0
# AC-1 — accept windows within ±2 s of the nominal 30 s.
WINDOW_NOMINAL_S = 30.0
WINDOW_TOLERANCE_S = 2.0
# Sub-case identifiers. ``evaluate_subcase`` raises ``ValueError`` for any
# name that is not listed in ``ALLOWED_SUBCASES``.
SUBCASE_NO_IMU = "no_imu"
SUBCASE_GOOD_IMU = "good_imu_combined_factor"
ALLOWED_SUBCASES = (SUBCASE_NO_IMU, SUBCASE_GOOD_IMU)
@dataclass(frozen=True)
class PositionSample:
    """A single WGS84 position fix stamped with a monotonic-ms timestamp.

    The same record type carries both the SUT's outbound estimate stream
    and the ground-truth track. The two streams must be stamped against
    one shared monotonic clock; otherwise a timestamp taken from the
    blackout window cannot be used to look up the matching sample in
    either stream.
    """

    monotonic_ms: int  # timestamp on the shared monotonic clock, milliseconds
    lat_deg: float  # latitude, degrees
    lon_deg: float  # longitude, degrees
@dataclass(frozen=True)
class BlackoutWindow:
    """Blackout interval emitted by the injector, on the monotonic-ms clock.

    The evaluator measures drift at ``end_monotonic_ms`` and checks the
    interval length against the nominal window (AC-1).
    """

    onset_monotonic_ms: int  # start of the blackout, milliseconds
    end_monotonic_ms: int  # end of the blackout, milliseconds

    @property
    def duration_s(self) -> float:
        """Length of the window in seconds (end minus onset)."""
        elapsed_ms = self.end_monotonic_ms - self.onset_monotonic_ms
        return elapsed_ms / 1000.0

    @property
    def window_in_spec(self) -> bool:
        """AC-1: True when the duration deviates from the nominal length by
        no more than the allowed tolerance (±2 s around 30 s)."""
        deviation_s = abs(self.duration_s - WINDOW_NOMINAL_S)
        return deviation_s <= WINDOW_TOLERANCE_S
@dataclass(frozen=True)
class SubCaseReport:
    """Outcome of the drift check for one sub-case (no_imu / good_imu).

    ``drift_m`` is ``None`` when no drift could be computed, in which case
    the sub-case does not pass.
    """

    subcase: str  # sub-case identifier
    drift_m: float | None  # measured drift in metres, or None if unavailable
    budget_m: float  # maximum allowed drift in metres
    estimate_at_end: PositionSample | None  # SUT sample used for the drift
    gt_at_end: PositionSample | None  # ground-truth sample used for the drift

    @property
    def passes(self) -> bool:
        """True only when a drift exists and it is within the budget (inclusive)."""
        if self.drift_m is None:
            return False
        return self.drift_m <= self.budget_m
@dataclass(frozen=True)
class ImuFallbackReport:
    """Aggregate NFT-RES-01 result for one parameterization.

    Bundles the blackout window with the per-sub-case drift reports that
    were evaluated against it.
    """

    window: BlackoutWindow  # the blackout window every sub-case was measured on
    sub_cases: tuple[SubCaseReport, ...]  # one entry per evaluated sub-case

    @property
    def passes_window(self) -> bool:
        """True when the blackout window duration is within spec (AC-1)."""
        return self.window.window_in_spec

    @property
    def passes(self) -> bool:
        """True when the window is in spec and every evaluated sub-case passes.

        A report that holds no sub-cases never passes: ``all()`` over an
        empty tuple is vacuously True, which would otherwise let a run that
        evaluated nothing show up as green.
        """
        if not self.passes_window:
            return False
        if not self.sub_cases:
            # No evidence at all must not count as a pass.
            return False
        return all(s.passes for s in self.sub_cases)

    def by_subcase(self, subcase: str) -> SubCaseReport:
        """Return the first report whose ``subcase`` equals the given name.

        Raises:
            KeyError: if no report with that sub-case name is present.
        """
        for s in self.sub_cases:
            if s.subcase == subcase:
                return s
        raise KeyError(f"sub-case {subcase!r} not present in report")
def _pick_at_or_before(
    samples: Sequence[PositionSample], t_ms: int
) -> PositionSample | None:
    """Return the latest sample with ``monotonic_ms ≤ t_ms`` (None if none qualify).

    Only samples on the "left" of the boundary are considered, so the
    drift evaluation never uses a sample from past the captured window.
    The input does not need to be sorted. When several qualifying samples
    share the greatest timestamp, the one that appears first wins.
    """
    eligible = (fix for fix in samples if fix.monotonic_ms <= t_ms)
    # max() keeps the first of several equal maxima, matching a scan that
    # only replaces its candidate on a strictly greater timestamp.
    return max(eligible, key=lambda fix: fix.monotonic_ms, default=None)
def evaluate_subcase(
    window: BlackoutWindow,
    estimates: Sequence[PositionSample],
    ground_truth: Sequence[PositionSample],
    *,
    subcase: str,
    budget_m: float | None = None,
) -> SubCaseReport:
    """Measure end-of-blackout drift for a single sub-case.

    From each stream the latest sample at or before
    ``window.end_monotonic_ms`` is selected, and the drift is the distance
    between the two selected positions. If either stream has no such
    sample, the drift is ``None``.

    When ``budget_m`` is omitted the budget follows from ``subcase``:
    100 m for ``no_imu``, 50 m for ``good_imu_combined_factor``.

    Raises:
        ValueError: if ``subcase`` is not one of ``ALLOWED_SUBCASES``, so
            a typo at the call site fails loudly instead of silently
            picking a budget.
    """
    if subcase not in ALLOWED_SUBCASES:
        raise ValueError(
            f"subcase must be one of {ALLOWED_SUBCASES}; got {subcase!r}"
        )

    if budget_m is None:
        if subcase == SUBCASE_NO_IMU:
            budget_m = NO_IMU_BUDGET_M
        else:
            budget_m = GOOD_IMU_BUDGET_M

    boundary_ms = window.end_monotonic_ms
    sut_fix = _pick_at_or_before(estimates, boundary_ms)
    truth_fix = _pick_at_or_before(ground_truth, boundary_ms)

    # Drift stays None unless both streams produced a usable sample.
    measured_m: float | None = None
    if sut_fix is not None and truth_fix is not None:
        measured_m = distance_m(
            sut_fix.lat_deg,
            sut_fix.lon_deg,
            truth_fix.lat_deg,
            truth_fix.lon_deg,
        )

    return SubCaseReport(
        subcase=subcase,
        drift_m=measured_m,
        budget_m=budget_m,
        estimate_at_end=sut_fix,
        gt_at_end=truth_fix,
    )
def evaluate(
    window: BlackoutWindow,
    *,
    sub_cases: Iterable[tuple[str, Sequence[PositionSample], Sequence[PositionSample]]],
) -> ImuFallbackReport:
    """Build the aggregate report for every supplied sub-case.

    Each tuple is ``(subcase_name, estimates, ground_truth)`` and is
    evaluated against the same ``window`` with the default budget for its
    sub-case. The resulting reports keep the order of ``sub_cases``.

    Both sub-cases do not have to be supplied. A sub-case that is supplied
    but has no sample at or before the window end gets a ``None`` drift
    and therefore does not pass. A sub-case that is not supplied at all is
    simply absent from the report; no placeholder entry is created for it.

    NOTE(review): if a scenario must fail when one of the two sub-cases
    was never exercised, the caller has to check for its presence (e.g.
    via ``by_subcase``) — this function does not enforce it.
    """
    per_subcase = tuple(
        evaluate_subcase(window, sut_track, truth_track, subcase=name)
        for name, sut_track, truth_track in sub_cases
    )
    return ImuFallbackReport(window=window, sub_cases=per_subcase)
def write_csv_evidence(out_path: Path, report: ImuFallbackReport) -> Path:
    """Write the aggregate-summary CSV (header plus one row per sub-case).

    Missing parent directories of ``out_path`` are created, and an
    existing file at ``out_path`` is overwritten. The window duration and
    window verdict are repeated on every row. Drift and position columns
    are left empty when the corresponding value is ``None``. Metres and
    seconds are written with 3 decimals, coordinates with 7, and booleans
    as ``true`` / ``false``.

    Returns:
        ``out_path``, unchanged.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8 keeps the evidence file independent of the host's
    # locale encoding; newline="" lets the csv module control line endings.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            [
                "subcase",
                "window_duration_s",
                "window_in_spec",
                "drift_m",
                "budget_m",
                "estimate_end_lat",
                "estimate_end_lon",
                "gt_end_lat",
                "gt_end_lon",
                "passes",
            ]
        )
        for sub in report.sub_cases:
            writer.writerow(
                [
                    sub.subcase,
                    f"{report.window.duration_s:.3f}",
                    "true" if report.passes_window else "false",
                    "" if sub.drift_m is None else f"{sub.drift_m:.3f}",
                    f"{sub.budget_m:.3f}",
                    "" if sub.estimate_at_end is None else f"{sub.estimate_at_end.lat_deg:.7f}",
                    "" if sub.estimate_at_end is None else f"{sub.estimate_at_end.lon_deg:.7f}",
                    "" if sub.gt_at_end is None else f"{sub.gt_at_end.lat_deg:.7f}",
                    "" if sub.gt_at_end is None else f"{sub.gt_at_end.lon_deg:.7f}",
                    "true" if sub.passes else "false",
                ]
            )
    return out_path