Files
gps-denied-onboard/e2e/runner/helpers/outlier_tolerance_evaluator.py
Oleksandr Bezdieniezhnykh 2d6d44af5d [AZ-424] [AZ-425] [AZ-426] Implement negatives set (FT-N-01/03/04)
Adds three pure-logic evaluators + scenarios + unit tests covering the
project's failure-mode robustness ladder (AC-3.1, AC-3.4, AC-3.5,
AC-NEW-8):

* outlier_tolerance_evaluator (AZ-424 / FT-N-01): per-event 50 m drift
  bound + 3-frame covariance-monotonic window over the AZ-408 outlier
  injector's medium-density manifest.
* outage_request_evaluator (AZ-425 / FT-N-03): detects 3+ consecutive
  missing-frame windows; validates OPERATOR_RELOC_REQUEST STATUSTEXT
  arrives at 2 s ±500 ms, dead_reckoned label during outage, and no
  FC EKF divergence.
* blackout_spoof_evaluator (AZ-426 / FT-N-04): eight-AC ladder across
  the 5 s / 15 s / 35 s sub-windows — switch latency, spoof rejection,
  monotonic covariance, honest horiz_accuracy, STATUSTEXT 1-2 Hz,
  35 s escalation thresholds, and recovery gate.

Each scenario is skip-gated on the AZ-441 / AZ-407 / AZ-416 replay /
SITL / mavproxy helpers; unit tests (14 + 18 + 29 = 61) cover the
AC logic today. Full e2e unit-test suite: 527 passed (+67).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 08:26:16 +03:00

262 lines
8.3 KiB
Python

"""Outlier-tolerance evaluation for FT-N-01 (AZ-424 / AC-3.1).
Consumes the AZ-408 ``outlier`` injector's ``manifest.csv`` (which
frames were replaced + the geodesic offset) and the SUT's outbound
estimate stream, and validates:
* AC-1: at least ``MIN_OUTLIER_COUNT`` outlier frames were injected
over the replay.
* AC-2: for every outlier event,
``error_after_outlier ≤ error_before_outlier + DRIFT_BUDGET_M``.
* AC-3: ``cov_semi_major_m`` is non-decreasing across the 3-frame
window centred on the outlier (frame before, outlier, frame after).
The injector's ``geodesic_offset_m`` column verifies the
RESTRICT-CAM-1 / AC-3.1 threshold (>350 m) per-row — the AC-1 count
check here is a coarser invariant that does not duplicate the
per-row geodesic gate.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence
from .geo import distance_m
# AC-2 drift budget: an event passes the drift check when
# (error after the outlier - error before the outlier) <= this value.
DRIFT_BUDGET_M = 50.0 # AC-2
# AC-3 window size: 1 frame before + the outlier frame + 1 frame after.
# NOTE(review): not referenced by the code visible in this module;
# evaluate_event hard-codes frame_idx - 1 .. frame_idx + 1 — confirm whether
# this constant is consumed elsewhere.
COVARIANCE_WINDOW_FRAMES = 3 # AC-3: 1 before + 1 outlier + 1 after
# AC-1 floor on the number of injected outliers in one replay.
MIN_OUTLIER_COUNT = 10 # AC-1: ~10 over Derkachi 8-min replay
@dataclass(frozen=True)
class GtPose:
    """Ground-truth position for a single video frame.

    Instances are immutable. ``frame_idx`` is the key under which the pose
    is matched against the estimate emitted for the same frame.
    """

    # Index of the video frame this pose describes.
    frame_idx: int
    # Ground-truth latitude (degrees, going by the field name).
    lat_deg: float
    # Ground-truth longitude (degrees, going by the field name).
    lon_deg: float
@dataclass(frozen=True)
class OutboundEstimate:
    """A single position estimate emitted by the SUT for one frame.

    Immutable record carrying the estimated position together with its
    covariance and a source label; ``frame_idx`` is the lookup key.
    """

    # Index of the video frame the estimate was produced for.
    frame_idx: int
    # Timestamp of the estimate.
    # NOTE(review): presumably a monotonic clock in milliseconds — not read
    # by this module, confirm against the producer.
    monotonic_ms: int
    # Estimated latitude (degrees, going by the field name).
    lat_deg: float
    # Estimated longitude (degrees, going by the field name).
    lon_deg: float
    # Covariance semi-major axis; the quantity compared by the AC-3 check.
    cov_semi_major_m: float
    # Label of the estimate's source; carried through, not interpreted here.
    source_label: str
@dataclass(frozen=True)
class OutlierEvent:
    """An injected outlier, i.e. one row of the injector's manifest.csv.

    Immutable; holds the three manifest columns this module consumes.
    """

    # Index of the frame that was replaced by the injector.
    frame_idx: int
    # Value of the manifest's ``geodesic_offset_m`` column for this row.
    geodesic_offset_m: float
    # Value of the manifest's ``src_jpeg_path`` column for this row.
    src_jpeg_path: str
@dataclass(frozen=True)
class OutlierEventReport:
    """Verdict for one outlier event against AC-2 (drift) and AC-3 (covariance).

    Every error / covariance field is ``None`` when the corresponding frame
    had no estimate (or, for errors, no ground truth) available.
    """

    # Frame index of the injected outlier.
    frame_idx: int
    # Position error on the frame before / on / after the outlier.
    error_before_m: float | None
    error_outlier_m: float | None
    error_after_m: float | None
    # error_after - error_before; the quantity held against the AC-2 budget.
    drift_m: float | None
    # Covariance semi-major axis on the frame before / on / after the outlier.
    cov_before: float | None
    cov_outlier: float | None
    cov_after: float | None
    # Precomputed AC-3 result for the covariance window.
    cov_non_decreasing: bool

    @property
    def passes_drift(self) -> bool:
        """AC-2: the drift is known and does not exceed ``DRIFT_BUDGET_M``."""
        if self.drift_m is None:
            # Unknown drift can never satisfy the budget.
            return False
        return self.drift_m <= DRIFT_BUDGET_M

    @property
    def passes_covariance(self) -> bool:
        """AC-3: mirrors the stored ``cov_non_decreasing`` flag."""
        return self.cov_non_decreasing

    @property
    def passes(self) -> bool:
        """True only when both the drift and the covariance checks pass."""
        if not self.passes_drift:
            return False
        return self.passes_covariance
@dataclass(frozen=True)
class OutlierToleranceReport:
    """Replay-wide roll-up of every per-event report."""

    # One report per outlier event.
    events: tuple[OutlierEventReport, ...]
    # Number of outlier events that were injected.
    total_outliers: int

    @property
    def passes_count(self) -> bool:
        """AC-1: at least ``MIN_OUTLIER_COUNT`` outliers were injected."""
        return MIN_OUTLIER_COUNT <= self.total_outliers

    @property
    def failed_event_count(self) -> int:
        """Number of events whose ``passes`` flag is false."""
        return len([ev for ev in self.events if not ev.passes])

    @property
    def passes(self) -> bool:
        """True when the count check holds and no event failed."""
        if not self.passes_count:
            return False
        return self.failed_event_count == 0
def load_outlier_manifest(manifest_path: Path) -> list[OutlierEvent]:
    """Read ``outlier/manifest.csv`` into typed events.

    Schema (AZ-408): ``frame_idx, src_jpeg_path, replacement_tile_x,
    replacement_tile_y, geodesic_offset_m, seed``. Only ``frame_idx``,
    ``src_jpeg_path`` and ``geodesic_offset_m`` are required and read; any
    other column is ignored.

    Args:
        manifest_path: Location of the injector's manifest CSV.

    Returns:
        One ``OutlierEvent`` per data row, in file order.

    Raises:
        FileNotFoundError: ``manifest_path`` does not exist.
        ValueError: A required column is missing from the header, or a
            row's ``frame_idx`` / ``geodesic_offset_m`` cell is absent or
            not numeric (the message names the file and line).
    """
    if not manifest_path.exists():
        raise FileNotFoundError(
            f"outlier manifest not found: {manifest_path} — run the "
            "outlier injector first (AZ-408 / runner/helpers/injector_fixtures)"
        )
    events: list[OutlierEvent] = []
    # newline="" lets the csv module do its own newline handling (needed for
    # quoted fields that contain line breaks); the explicit encoding keeps
    # parsing independent of the host locale.
    with manifest_path.open(newline="", encoding="utf-8") as fh:
        reader = csv.DictReader(fh)
        required = {"frame_idx", "src_jpeg_path", "geodesic_offset_m"}
        missing = required - set(reader.fieldnames or [])
        if missing:
            raise ValueError(
                f"outlier manifest {manifest_path} missing required columns: "
                f"{sorted(missing)}"
            )
        for row in reader:
            try:
                frame_idx = int(row["frame_idx"])
                geodesic_offset_m = float(row["geodesic_offset_m"])
            except (TypeError, ValueError) as err:
                # TypeError covers short rows, where DictReader fills the
                # missing cell with None.
                raise ValueError(
                    f"outlier manifest {manifest_path} line {reader.line_num}: "
                    f"cannot parse frame_idx={row['frame_idx']!r}, "
                    f"geodesic_offset_m={row['geodesic_offset_m']!r}"
                ) from err
            events.append(
                OutlierEvent(
                    frame_idx=frame_idx,
                    geodesic_offset_m=geodesic_offset_m,
                    src_jpeg_path=row["src_jpeg_path"],
                )
            )
    return events
def _index_by_frame(estimates: Sequence[OutboundEstimate]) -> dict[int, OutboundEstimate]:
    """Map each estimate's ``frame_idx`` to the estimate itself.

    When several estimates share a frame index, the one that appears last
    in ``estimates`` is the one kept.
    """
    return {est.frame_idx: est for est in estimates}
def _index_gt(gt: Sequence[GtPose]) -> dict[int, GtPose]:
    """Map each ground-truth pose's ``frame_idx`` to the pose itself.

    When several poses share a frame index, the one that appears last in
    ``gt`` is the one kept.
    """
    return {pose.frame_idx: pose for pose in gt}
def _error_m(est: OutboundEstimate | None, gt: GtPose | None) -> float | None:
    """Return the distance between an estimate and its ground-truth pose.

    Yields ``None`` when either side is missing; otherwise delegates to
    ``distance_m`` with the ground-truth coordinates first.
    """
    if est is not None and gt is not None:
        return distance_m(gt.lat_deg, gt.lon_deg, est.lat_deg, est.lon_deg)
    return None
def evaluate_event(
    event: OutlierEvent,
    estimates_by_frame: dict[int, OutboundEstimate],
    gt_by_frame: dict[int, GtPose],
) -> OutlierEventReport:
    """Build the AC-2 + AC-3 report for a single outlier event.

    The window is the frame before the outlier, the outlier frame, and the
    frame after it. Frames absent from either lookup contribute ``None``.

    * Drift is ``error_after - error_before``, or ``None`` when either
      error is unavailable.
    * The covariance check compares only the covariances that are present,
      in window order; with fewer than two of them it is vacuously true.
    """
    centre = event.frame_idx
    window = (centre - 1, centre, centre + 1)

    est_prev, est_hit, est_next = (estimates_by_frame.get(i) for i in window)
    gt_prev, gt_hit, gt_next = (gt_by_frame.get(i) for i in window)

    err_prev = _error_m(est_prev, gt_prev)
    err_hit = _error_m(est_hit, gt_hit)
    err_next = _error_m(est_next, gt_next)

    if err_prev is None or err_next is None:
        drift: float | None = None
    else:
        drift = err_next - err_prev

    cov_prev, cov_hit, cov_next = (
        None if est is None else est.cov_semi_major_m
        for est in (est_prev, est_hit, est_next)
    )

    # Only the covariances that exist take part in the monotonicity check.
    known = [c for c in (cov_prev, cov_hit, cov_next) if c is not None]
    monotonic = all(later >= earlier for earlier, later in zip(known, known[1:]))

    return OutlierEventReport(
        frame_idx=centre,
        error_before_m=err_prev,
        error_outlier_m=err_hit,
        error_after_m=err_next,
        drift_m=drift,
        cov_before=cov_prev,
        cov_outlier=cov_hit,
        cov_after=cov_next,
        cov_non_decreasing=monotonic,
    )
def evaluate(
    events: Sequence[OutlierEvent],
    estimates: Sequence[OutboundEstimate],
    gt: Sequence[GtPose],
) -> OutlierToleranceReport:
    """Evaluate every outlier event and roll the results into one report.

    Estimates and ground truth are indexed by frame once, then each event
    is evaluated against those lookups in the order given.
    ``total_outliers`` is the number of events passed in.
    """
    estimate_lookup = _index_by_frame(estimates)
    gt_lookup = _index_gt(gt)

    per_event = []
    for outlier in events:
        per_event.append(evaluate_event(outlier, estimate_lookup, gt_lookup))

    return OutlierToleranceReport(
        events=tuple(per_event),
        total_outliers=len(events),
    )
def write_csv_evidence(out_path: Path, report: OutlierToleranceReport) -> Path:
    """Dump the per-event FT-N-01 evidence as a CSV file.

    Missing parent directories are created. The file gets one header row
    followed by one row per event: numeric cells use three decimals (empty
    when the value is ``None``) and boolean cells are ``true`` / ``false``.

    Returns:
        ``out_path``, unchanged.
    """

    def _num(value):
        # None becomes an empty cell; anything else is fixed to 3 decimals.
        return "" if value is None else f"{value:.3f}"

    def _flag(value):
        return "true" if value else "false"

    def _row(ev):
        return [
            ev.frame_idx,
            _num(ev.error_before_m),
            _num(ev.error_outlier_m),
            _num(ev.error_after_m),
            _num(ev.drift_m),
            _num(ev.cov_before),
            _num(ev.cov_outlier),
            _num(ev.cov_after),
            _flag(ev.cov_non_decreasing),
            _flag(ev.passes_drift),
            _flag(ev.passes_covariance),
            _flag(ev.passes),
        ]

    header = [
        "frame_idx",
        "error_before_m",
        "error_outlier_m",
        "error_after_m",
        "drift_m",
        "cov_before",
        "cov_outlier",
        "cov_after",
        "cov_non_decreasing",
        "passes_drift",
        "passes_covariance",
        "passes",
    ]

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        writer.writerows(_row(ev) for ev in report.events)
    return out_path