"""IMU-only fallback drift evaluator for NFT-RES-01 (AZ-432 / AC-3.5 + AC-NEW-7). A pure-vision-blackout (no spoof) lasting 30 s is injected by ``fixtures/injectors/blackout_spoof.py --no-spoof``. The SUT must fall back to IMU-only dead reckoning. AC-3.5 + AC-NEW-7 prescribe two drift budgets at the end of the blackout, depending on whether the CombinedImuFactor (PreintegratedCombinedMeasurements) is active: * sub-case (a) — no good IMU → ``drift ≤ NO_IMU_BUDGET_M`` (100 m). * sub-case (b) — CombinedImuFactor active (SUT default config) → ``drift ≤ GOOD_IMU_BUDGET_M`` (50 m). Drift is the Vincenty distance between the SUT's last estimate at blackout end and the ground-truth position at the same timestamp. The scenario test owns the orchestration (window injection, sub-case selection, fixture loading). This module owns the pure arithmetic + CSV evidence. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol; consumes only typed samples that the scenario adapter projects out of the boundary observers. """ from __future__ import annotations import csv from dataclasses import dataclass from pathlib import Path from typing import Iterable, Sequence from .geo import distance_m # AC-2 / AC-3 NO_IMU_BUDGET_M = 100.0 GOOD_IMU_BUDGET_M = 50.0 # AC-1 — accept windows within ±2 s of the nominal 30 s. WINDOW_NOMINAL_S = 30.0 WINDOW_TOLERANCE_S = 2.0 SUBCASE_NO_IMU = "no_imu" SUBCASE_GOOD_IMU = "good_imu_combined_factor" ALLOWED_SUBCASES = (SUBCASE_NO_IMU, SUBCASE_GOOD_IMU) @dataclass(frozen=True) class PositionSample: """One WGS84 sample tagged with a monotonic-ms timestamp. Used for both the SUT's outbound estimate stream and the ground-truth track. Both streams must share the same monotonic clock so the scenario can pick the "at blackout end" sample by interpolation / nearest-neighbour lookup. """ monotonic_ms: int lat_deg: float lon_deg: float @dataclass(frozen=True) class BlackoutWindow: """The injector-emitted window the evaluator is bound to.""" onset_monotonic_ms: int end_monotonic_ms: int @property def duration_s(self) -> float: return (self.end_monotonic_ms - self.onset_monotonic_ms) / 1000.0 @property def window_in_spec(self) -> bool: """AC-1: window duration must be within ±2 s of nominal 30 s.""" return abs(self.duration_s - WINDOW_NOMINAL_S) <= WINDOW_TOLERANCE_S @dataclass(frozen=True) class SubCaseReport: """Drift result for a single sub-case (no_imu / good_imu).""" subcase: str drift_m: float | None budget_m: float estimate_at_end: PositionSample | None gt_at_end: PositionSample | None @property def passes(self) -> bool: return self.drift_m is not None and self.drift_m <= self.budget_m @dataclass(frozen=True) class ImuFallbackReport: """Aggregate NFT-RES-01 result for one parameterization.""" window: BlackoutWindow sub_cases: tuple[SubCaseReport, ...] @property def passes_window(self) -> bool: return self.window.window_in_spec @property def passes(self) -> bool: return self.passes_window and all(s.passes for s in self.sub_cases) def by_subcase(self, subcase: str) -> SubCaseReport: for s in self.sub_cases: if s.subcase == subcase: return s raise KeyError(f"sub-case {subcase!r} not present in report") def _pick_at_or_before( samples: Sequence[PositionSample], t_ms: int ) -> PositionSample | None: """Return the latest sample with ``monotonic_ms ≤ t_ms`` (None if none qualify). Tests against the closest sample on the "left" of the boundary — drift evaluation must NOT extrapolate past the captured window. """ chosen: PositionSample | None = None for s in samples: if s.monotonic_ms <= t_ms: if chosen is None or s.monotonic_ms > chosen.monotonic_ms: chosen = s return chosen def evaluate_subcase( window: BlackoutWindow, estimates: Sequence[PositionSample], ground_truth: Sequence[PositionSample], *, subcase: str, budget_m: float | None = None, ) -> SubCaseReport: """Compute drift for one sub-case. `subcase` selects the budget when `budget_m` is omitted: 100 m for ``no_imu``, 50 m for ``good_imu_combined_factor``. Unknown sub-case names raise ``ValueError`` so a typo at the call site fails loud instead of silently relaxing the budget. """ if subcase not in ALLOWED_SUBCASES: raise ValueError( f"subcase must be one of {ALLOWED_SUBCASES}; got {subcase!r}" ) if budget_m is None: budget_m = ( NO_IMU_BUDGET_M if subcase == SUBCASE_NO_IMU else GOOD_IMU_BUDGET_M ) estimate_end = _pick_at_or_before(estimates, window.end_monotonic_ms) gt_end = _pick_at_or_before(ground_truth, window.end_monotonic_ms) drift: float | None if estimate_end is None or gt_end is None: drift = None else: drift = distance_m( estimate_end.lat_deg, estimate_end.lon_deg, gt_end.lat_deg, gt_end.lon_deg, ) return SubCaseReport( subcase=subcase, drift_m=drift, budget_m=budget_m, estimate_at_end=estimate_end, gt_at_end=gt_end, ) def evaluate( window: BlackoutWindow, *, sub_cases: Iterable[tuple[str, Sequence[PositionSample], Sequence[PositionSample]]], ) -> ImuFallbackReport: """Compute the aggregate report across multiple sub-cases. Each tuple is ``(subcase_name, estimates, ground_truth)``. The evaluator does not require both sub-cases to be present — a scenario that can only exercise one path still gets a partial report whose ``passes`` is False (because the missing sub-case has no drift). """ reports: list[SubCaseReport] = [] for subcase, estimates, ground_truth in sub_cases: reports.append(evaluate_subcase(window, estimates, ground_truth, subcase=subcase)) return ImuFallbackReport(window=window, sub_cases=tuple(reports)) def write_csv_evidence(out_path: Path, report: ImuFallbackReport) -> Path: """Aggregate-summary CSV (one row per sub-case).""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "subcase", "window_duration_s", "window_in_spec", "drift_m", "budget_m", "estimate_end_lat", "estimate_end_lon", "gt_end_lat", "gt_end_lon", "passes", ] ) for sub in report.sub_cases: writer.writerow( [ sub.subcase, f"{report.window.duration_s:.3f}", "true" if report.passes_window else "false", "" if sub.drift_m is None else f"{sub.drift_m:.3f}", f"{sub.budget_m:.3f}", "" if sub.estimate_at_end is None else f"{sub.estimate_at_end.lat_deg:.7f}", "" if sub.estimate_at_end is None else f"{sub.estimate_at_end.lon_deg:.7f}", "" if sub.gt_at_end is None else f"{sub.gt_at_end.lat_deg:.7f}", "" if sub.gt_at_end is None else f"{sub.gt_at_end.lon_deg:.7f}", "true" if sub.passes else "false", ] ) return out_path