mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 20:41:13 +00:00
330893be5c
Batch 86: 4 NFT-RES blackbox scenarios + 4 helper evaluators + 74 unit tests + directory-layout registration. * AZ-432 NFT-RES-01: 30 s IMU-only fallback drift bound (AC-3.5 + AC-NEW-7); two sub-cases (no_imu ≤100m, good_imu_combined_factor ≤50m). * AZ-433 NFT-RES-02: companion mid-flight reboot (AC-5.2 + AC-5.3); resume ≤30s + first-emission accuracy ≤100m. * AZ-434 NFT-RES-03: 100-iteration Monte Carlo envelope (AC-NEW-4); iteration-count + master-seed determinism + envelope ratio ≥0.95. Canonical-param by default; E2E_NFT_RES_03_FULL_MATRIX=1 unlocks matrix. * AZ-435 NFT-RES-04: 35s blackout+spoof escalation ladder (AC-NEW-8); AC-1 (cov-2d→fix-degrade ≤500ms) + AC-2 (failsafe→999+STATUSTEXT ≤500ms) + AC-ORDER (strict ordering). Verdict: PASS_WITH_WARNINGS (0 Critical, 0 High, 0 Medium, 5 Low). F5 documents intentional threshold duplication with blackout_spoof evaluator (prevents contract drift between FT-N-04 and NFT-RES-04). Co-authored-by: Cursor <cursoragent@cursor.com>
228 lines
7.5 KiB
Python
228 lines
7.5 KiB
Python
"""IMU-only fallback drift evaluator for NFT-RES-01 (AZ-432 / AC-3.5 + AC-NEW-7).
|
|
|
|
A pure-vision-blackout (no spoof) lasting 30 s is injected by
|
|
``fixtures/injectors/blackout_spoof.py --no-spoof``. The SUT must
|
|
fall back to IMU-only dead reckoning. AC-3.5 + AC-NEW-7 prescribe two
|
|
drift budgets at the end of the blackout, depending on whether the
|
|
CombinedImuFactor (PreintegratedCombinedMeasurements) is active:
|
|
|
|
* sub-case (a) — no good IMU → ``drift ≤ NO_IMU_BUDGET_M`` (100 m).
|
|
* sub-case (b) — CombinedImuFactor active (SUT default config) →
|
|
``drift ≤ GOOD_IMU_BUDGET_M`` (50 m).
|
|
|
|
Drift is the Vincenty distance between the SUT's estimate and the
ground-truth position at blackout end; for each stream this is the latest
sample at or before the window's end timestamp (no extrapolation past it).
|
|
|
|
The scenario test owns the orchestration (window injection,
|
|
sub-case selection, fixture loading). This module owns the pure
|
|
arithmetic + CSV evidence.
|
|
|
|
Public-boundary discipline: does NOT import any
|
|
``src/gps_denied_onboard`` symbol; consumes only typed samples that
|
|
the scenario adapter projects out of the boundary observers.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable, Sequence
|
|
|
|
from .geo import distance_m
|
|
|
|
# AC-2 / AC-3 — drift budgets in metres at blackout end, one per sub-case.
NO_IMU_BUDGET_M = 100.0
GOOD_IMU_BUDGET_M = 50.0

# AC-1 — the injected window must last the nominal 30 s, give or take 2 s.
WINDOW_NOMINAL_S = 30.0
WINDOW_TOLERANCE_S = 2.0

# Sub-case identifiers; anything outside ALLOWED_SUBCASES is rejected by
# evaluate_subcase().
SUBCASE_NO_IMU = "no_imu"
SUBCASE_GOOD_IMU = "good_imu_combined_factor"
ALLOWED_SUBCASES = (SUBCASE_NO_IMU, SUBCASE_GOOD_IMU)
|
|
|
|
|
|
@dataclass(frozen=True)
class PositionSample:
    """Immutable WGS84 position stamped with a monotonic-clock time in ms.

    The same record type carries both the SUT's outbound estimates and the
    ground-truth track. The two streams are expected to share one monotonic
    clock, so that the sample "at blackout end" can be selected from each by
    timestamp.
    """

    # Timestamp in milliseconds on the shared monotonic clock.
    monotonic_ms: int
    # WGS84 latitude in degrees.
    lat_deg: float
    # WGS84 longitude in degrees.
    lon_deg: float
|
|
|
|
|
|
@dataclass(frozen=True)
class BlackoutWindow:
    """Blackout interval emitted by the injector, as onset/end timestamps."""

    # Both bounds are milliseconds on the monotonic clock.
    onset_monotonic_ms: int
    end_monotonic_ms: int

    @property
    def duration_s(self) -> float:
        """Length of the window in seconds (end minus onset)."""
        elapsed_ms = self.end_monotonic_ms - self.onset_monotonic_ms
        return elapsed_ms / 1000.0

    @property
    def window_in_spec(self) -> bool:
        """AC-1: duration lies within WINDOW_TOLERANCE_S of WINDOW_NOMINAL_S."""
        deviation_s = abs(self.duration_s - WINDOW_NOMINAL_S)
        return deviation_s <= WINDOW_TOLERANCE_S
|
|
|
|
|
|
@dataclass(frozen=True)
class SubCaseReport:
    """Outcome of the drift check for one sub-case (no_imu / good_imu)."""

    # Sub-case identifier this row belongs to.
    subcase: str
    # Measured drift in metres; None when no drift could be computed.
    drift_m: float | None
    # Budget the drift is compared against, in metres.
    budget_m: float
    # Samples the drift was computed from (None when none was available).
    estimate_at_end: PositionSample | None
    gt_at_end: PositionSample | None

    @property
    def passes(self) -> bool:
        """True only when a drift exists and does not exceed the budget."""
        if self.drift_m is None:
            return False
        return self.drift_m <= self.budget_m
|
|
|
|
|
|
@dataclass(frozen=True)
class ImuFallbackReport:
    """Aggregate NFT-RES-01 result for one parameterization."""

    # Blackout window all sub-cases were evaluated against.
    window: BlackoutWindow
    # Per-sub-case results, in the order they were evaluated.
    sub_cases: tuple[SubCaseReport, ...]

    @property
    def passes_window(self) -> bool:
        """Whether the blackout window itself satisfies its duration check."""
        return self.window.window_in_spec

    @property
    def passes(self) -> bool:
        """Overall verdict: window in spec AND every sub-case within budget.

        A report that carries no sub-cases never passes. Without this guard
        ``all(())`` would be vacuously True and a run that produced no drift
        evidence at all would be reported as a success.
        """
        if not self.sub_cases:
            return False
        return self.passes_window and all(s.passes for s in self.sub_cases)

    def by_subcase(self, subcase: str) -> SubCaseReport:
        """Return the first sub-case report whose name equals ``subcase``.

        Raises:
            KeyError: if no sub-case with that name is in the report.
        """
        for candidate in self.sub_cases:
            if candidate.subcase == subcase:
                return candidate
        raise KeyError(f"sub-case {subcase!r} not present in report")
|
|
|
|
|
|
def _pick_at_or_before(
|
|
samples: Sequence[PositionSample], t_ms: int
|
|
) -> PositionSample | None:
|
|
"""Return the latest sample with ``monotonic_ms ≤ t_ms`` (None if none qualify).
|
|
|
|
Tests against the closest sample on the "left" of the boundary —
|
|
drift evaluation must NOT extrapolate past the captured window.
|
|
"""
|
|
chosen: PositionSample | None = None
|
|
for s in samples:
|
|
if s.monotonic_ms <= t_ms:
|
|
if chosen is None or s.monotonic_ms > chosen.monotonic_ms:
|
|
chosen = s
|
|
return chosen
|
|
|
|
|
|
def evaluate_subcase(
    window: BlackoutWindow,
    estimates: Sequence[PositionSample],
    ground_truth: Sequence[PositionSample],
    *,
    subcase: str,
    budget_m: float | None = None,
) -> SubCaseReport:
    """Measure end-of-blackout drift for a single sub-case.

    When ``budget_m`` is not given it is derived from ``subcase``:
    ``NO_IMU_BUDGET_M`` for ``no_imu`` and ``GOOD_IMU_BUDGET_M`` for
    ``good_imu_combined_factor``. The estimate and the ground truth are each
    taken as the latest sample at or before ``window.end_monotonic_ms``; if
    either stream has no such sample the reported drift is ``None``.

    Raises:
        ValueError: if ``subcase`` is not in ``ALLOWED_SUBCASES``, so that a
            typo at the call site fails loudly instead of silently picking a
            budget.
    """
    if subcase not in ALLOWED_SUBCASES:
        raise ValueError(
            f"subcase must be one of {ALLOWED_SUBCASES}; got {subcase!r}"
        )

    if budget_m is None:
        if subcase == SUBCASE_NO_IMU:
            budget_m = NO_IMU_BUDGET_M
        else:
            budget_m = GOOD_IMU_BUDGET_M

    end_ms = window.end_monotonic_ms
    estimate_end = _pick_at_or_before(estimates, end_ms)
    gt_end = _pick_at_or_before(ground_truth, end_ms)

    # Drift is only defined when both streams supplied a sample.
    drift: float | None = None
    if estimate_end is not None and gt_end is not None:
        drift = distance_m(
            estimate_end.lat_deg,
            estimate_end.lon_deg,
            gt_end.lat_deg,
            gt_end.lon_deg,
        )

    return SubCaseReport(
        subcase=subcase,
        drift_m=drift,
        budget_m=budget_m,
        estimate_at_end=estimate_end,
        gt_at_end=gt_end,
    )
|
|
|
|
|
|
def evaluate(
    window: BlackoutWindow,
    *,
    sub_cases: Iterable[tuple[str, Sequence[PositionSample], Sequence[PositionSample]]],
) -> ImuFallbackReport:
    """Build the aggregate report by evaluating each supplied sub-case.

    ``sub_cases`` yields ``(subcase_name, estimates, ground_truth)`` tuples.
    They are evaluated in input order against the same ``window`` using the
    default budget for each name. Both sub-cases need not be supplied: the
    report contains exactly the sub-cases that were passed in. A supplied
    sub-case for which no drift can be computed (no sample at or before the
    window end) is reported with ``drift_m=None`` and does not pass.
    """
    reports = tuple(
        evaluate_subcase(window, estimates, ground_truth, subcase=name)
        for name, estimates, ground_truth in sub_cases
    )
    return ImuFallbackReport(window=window, sub_cases=reports)
|
|
|
|
|
|
def write_csv_evidence(out_path: Path, report: ImuFallbackReport) -> Path:
    """Write the aggregate-summary CSV: a header plus one row per sub-case.

    Parent directories of ``out_path`` are created if missing and an existing
    file is overwritten. Durations, drift and budget are written with three
    decimals, coordinates with seven; values that are ``None`` become empty
    cells and booleans are written as ``true`` / ``false``.

    The file is always encoded as UTF-8 so the evidence does not depend on
    the locale of the machine that ran the scenario.

    Returns:
        ``out_path``, unchanged, for convenient chaining.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Window-level columns are the same on every row; format them once.
    window_duration = f"{report.window.duration_s:.3f}"
    window_ok = "true" if report.passes_window else "false"

    # newline="" lets the csv module control line endings itself.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            [
                "subcase",
                "window_duration_s",
                "window_in_spec",
                "drift_m",
                "budget_m",
                "estimate_end_lat",
                "estimate_end_lon",
                "gt_end_lat",
                "gt_end_lon",
                "passes",
            ]
        )
        for sub in report.sub_cases:
            est = sub.estimate_at_end
            gt = sub.gt_at_end
            writer.writerow(
                [
                    sub.subcase,
                    window_duration,
                    window_ok,
                    "" if sub.drift_m is None else f"{sub.drift_m:.3f}",
                    f"{sub.budget_m:.3f}",
                    "" if est is None else f"{est.lat_deg:.7f}",
                    "" if est is None else f"{est.lon_deg:.7f}",
                    "" if gt is None else f"{gt.lat_deg:.7f}",
                    "" if gt is None else f"{gt.lon_deg:.7f}",
                    "true" if sub.passes else "false",
                ]
            )
    return out_path
|