mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 22:41:12 +00:00
2d6d44af5d
Adds three pure-logic evaluators + scenarios + unit tests covering the project's failure-mode robustness ladder (AC-3.1, AC-3.4, AC-3.5, AC-NEW-8): * outlier_tolerance_evaluator (AZ-424 / FT-N-01): per-event 50 m drift bound + 3-frame covariance-monotonic window over the AZ-408 outlier injector's medium-density manifest. * outage_request_evaluator (AZ-425 / FT-N-03): detects 3+ consecutive missing-frame windows; validates OPERATOR_RELOC_REQUEST STATUSTEXT arrives at 2 s ±500 ms, dead_reckoned label during outage, and no FC EKF divergence. * blackout_spoof_evaluator (AZ-426 / FT-N-04): eight-AC ladder across the 5 s / 15 s / 35 s sub-windows — switch latency, spoof rejection, monotonic covariance, honest horiz_accuracy, STATUSTEXT 1-2 Hz, 35 s escalation thresholds, and recovery gate. Each scenario is skip-gated on the AZ-441 / AZ-407 / AZ-416 replay / SITL / mavproxy helpers; unit tests (14 + 18 + 29 = 61) cover the AC logic today. Full e2e unit-test suite: 527 passed (+67). Co-authored-by: Cursor <cursoragent@cursor.com>
262 lines
8.3 KiB
Python
262 lines
8.3 KiB
Python
"""Outlier-tolerance evaluation for FT-N-01 (AZ-424 / AC-3.1).
|
|
|
|
Consumes the AZ-408 ``outlier`` injector's ``manifest.csv`` (which
|
|
frames were replaced + the geodesic offset) and the SUT's outbound
|
|
estimate stream, and validates:
|
|
|
|
* AC-1: at least ``MIN_OUTLIER_COUNT`` outlier frames were injected
|
|
over the replay.
|
|
* AC-2: for every outlier event,
|
|
``error_after_outlier ≤ error_before_outlier + DRIFT_BUDGET_M``.
|
|
* AC-3: ``cov_semi_major_m`` is non-decreasing across the 3-frame
|
|
window centred on the outlier (frame before, outlier, frame after).
|
|
|
|
The injector's ``geodesic_offset_m`` column verifies the
|
|
RESTRICT-CAM-1 / AC-3.1 threshold (>350 m) per-row — the AC-1 count
|
|
check here is a coarser invariant that does not duplicate the
|
|
per-row geodesic gate.
|
|
|
|
Public-boundary discipline: does NOT import any
|
|
``src/gps_denied_onboard`` symbol.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Sequence
|
|
|
|
from .geo import distance_m
|
|
|
|
DRIFT_BUDGET_M = 50.0 # AC-2
|
|
COVARIANCE_WINDOW_FRAMES = 3 # AC-3: 1 before + 1 outlier + 1 after
|
|
MIN_OUTLIER_COUNT = 10 # AC-1: ~10 over Derkachi 8-min replay
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class GtPose:
|
|
"""One ground-truth pose for a video frame, keyed by frame index."""
|
|
|
|
frame_idx: int
|
|
lat_deg: float
|
|
lon_deg: float
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class OutboundEstimate:
|
|
"""One outbound estimate with covariance + label, keyed by frame index."""
|
|
|
|
frame_idx: int
|
|
monotonic_ms: int
|
|
lat_deg: float
|
|
lon_deg: float
|
|
cov_semi_major_m: float
|
|
source_label: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class OutlierEvent:
|
|
"""One row from the injector's manifest.csv."""
|
|
|
|
frame_idx: int
|
|
geodesic_offset_m: float
|
|
src_jpeg_path: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class OutlierEventReport:
|
|
"""AC-2 + AC-3 evaluation for one outlier event."""
|
|
|
|
frame_idx: int
|
|
error_before_m: float | None
|
|
error_outlier_m: float | None
|
|
error_after_m: float | None
|
|
drift_m: float | None # error_after - error_before; AC-2 budget
|
|
cov_before: float | None
|
|
cov_outlier: float | None
|
|
cov_after: float | None
|
|
cov_non_decreasing: bool
|
|
|
|
@property
|
|
def passes_drift(self) -> bool:
|
|
return (
|
|
self.drift_m is not None
|
|
and self.drift_m <= DRIFT_BUDGET_M
|
|
)
|
|
|
|
@property
|
|
def passes_covariance(self) -> bool:
|
|
return self.cov_non_decreasing
|
|
|
|
@property
|
|
def passes(self) -> bool:
|
|
return self.passes_drift and self.passes_covariance
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class OutlierToleranceReport:
|
|
"""Aggregate report for all outlier events in the replay."""
|
|
|
|
events: tuple[OutlierEventReport, ...]
|
|
total_outliers: int
|
|
|
|
@property
|
|
def passes_count(self) -> bool:
|
|
return self.total_outliers >= MIN_OUTLIER_COUNT
|
|
|
|
@property
|
|
def failed_event_count(self) -> int:
|
|
return sum(1 for e in self.events if not e.passes)
|
|
|
|
@property
|
|
def passes(self) -> bool:
|
|
return self.passes_count and self.failed_event_count == 0
|
|
|
|
|
|
def load_outlier_manifest(manifest_path: Path) -> list[OutlierEvent]:
|
|
"""Read ``outlier/manifest.csv`` into typed events.
|
|
|
|
Schema (AZ-408): ``frame_idx, src_jpeg_path, replacement_tile_x,
|
|
replacement_tile_y, geodesic_offset_m, seed``.
|
|
"""
|
|
if not manifest_path.exists():
|
|
raise FileNotFoundError(
|
|
f"outlier manifest not found: {manifest_path} — run the "
|
|
"outlier injector first (AZ-408 / runner/helpers/injector_fixtures)"
|
|
)
|
|
events: list[OutlierEvent] = []
|
|
with manifest_path.open() as fh:
|
|
reader = csv.DictReader(fh)
|
|
required = {"frame_idx", "src_jpeg_path", "geodesic_offset_m"}
|
|
missing = required - set(reader.fieldnames or [])
|
|
if missing:
|
|
raise ValueError(
|
|
f"outlier manifest {manifest_path} missing required columns: "
|
|
f"{sorted(missing)}"
|
|
)
|
|
for row in reader:
|
|
events.append(
|
|
OutlierEvent(
|
|
frame_idx=int(row["frame_idx"]),
|
|
geodesic_offset_m=float(row["geodesic_offset_m"]),
|
|
src_jpeg_path=row["src_jpeg_path"],
|
|
)
|
|
)
|
|
return events
|
|
|
|
|
|
def _index_by_frame(estimates: Sequence[OutboundEstimate]) -> dict[int, OutboundEstimate]:
|
|
by_frame: dict[int, OutboundEstimate] = {}
|
|
for e in estimates:
|
|
by_frame[e.frame_idx] = e
|
|
return by_frame
|
|
|
|
|
|
def _index_gt(gt: Sequence[GtPose]) -> dict[int, GtPose]:
|
|
by_frame: dict[int, GtPose] = {}
|
|
for g in gt:
|
|
by_frame[g.frame_idx] = g
|
|
return by_frame
|
|
|
|
|
|
def _error_m(est: OutboundEstimate | None, gt: GtPose | None) -> float | None:
|
|
if est is None or gt is None:
|
|
return None
|
|
return distance_m(gt.lat_deg, gt.lon_deg, est.lat_deg, est.lon_deg)
|
|
|
|
|
|
def evaluate_event(
|
|
event: OutlierEvent,
|
|
estimates_by_frame: dict[int, OutboundEstimate],
|
|
gt_by_frame: dict[int, GtPose],
|
|
) -> OutlierEventReport:
|
|
"""Compute the AC-2 + AC-3 report for one outlier event."""
|
|
before = estimates_by_frame.get(event.frame_idx - 1)
|
|
outlier = estimates_by_frame.get(event.frame_idx)
|
|
after = estimates_by_frame.get(event.frame_idx + 1)
|
|
|
|
gt_before = gt_by_frame.get(event.frame_idx - 1)
|
|
gt_outlier = gt_by_frame.get(event.frame_idx)
|
|
gt_after = gt_by_frame.get(event.frame_idx + 1)
|
|
|
|
err_before = _error_m(before, gt_before)
|
|
err_outlier = _error_m(outlier, gt_outlier)
|
|
err_after = _error_m(after, gt_after)
|
|
|
|
drift: float | None = None
|
|
if err_before is not None and err_after is not None:
|
|
drift = err_after - err_before
|
|
|
|
cov_before = before.cov_semi_major_m if before is not None else None
|
|
cov_outlier = outlier.cov_semi_major_m if outlier is not None else None
|
|
cov_after = after.cov_semi_major_m if after is not None else None
|
|
|
|
covs = [c for c in (cov_before, cov_outlier, cov_after) if c is not None]
|
|
cov_non_decreasing = all(covs[i + 1] >= covs[i] for i in range(len(covs) - 1))
|
|
|
|
return OutlierEventReport(
|
|
frame_idx=event.frame_idx,
|
|
error_before_m=err_before,
|
|
error_outlier_m=err_outlier,
|
|
error_after_m=err_after,
|
|
drift_m=drift,
|
|
cov_before=cov_before,
|
|
cov_outlier=cov_outlier,
|
|
cov_after=cov_after,
|
|
cov_non_decreasing=cov_non_decreasing,
|
|
)
|
|
|
|
|
|
def evaluate(
|
|
events: Sequence[OutlierEvent],
|
|
estimates: Sequence[OutboundEstimate],
|
|
gt: Sequence[GtPose],
|
|
) -> OutlierToleranceReport:
|
|
"""Aggregate report across all outlier events."""
|
|
by_frame = _index_by_frame(estimates)
|
|
gt_idx = _index_gt(gt)
|
|
reports = tuple(evaluate_event(ev, by_frame, gt_idx) for ev in events)
|
|
return OutlierToleranceReport(events=reports, total_outliers=len(events))
|
|
|
|
|
|
def write_csv_evidence(out_path: Path, report: OutlierToleranceReport) -> Path:
|
|
"""Write per-event FT-N-01 evidence CSV."""
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with out_path.open("w", newline="") as fh:
|
|
writer = csv.writer(fh)
|
|
writer.writerow(
|
|
[
|
|
"frame_idx",
|
|
"error_before_m",
|
|
"error_outlier_m",
|
|
"error_after_m",
|
|
"drift_m",
|
|
"cov_before",
|
|
"cov_outlier",
|
|
"cov_after",
|
|
"cov_non_decreasing",
|
|
"passes_drift",
|
|
"passes_covariance",
|
|
"passes",
|
|
]
|
|
)
|
|
for e in report.events:
|
|
writer.writerow(
|
|
[
|
|
e.frame_idx,
|
|
"" if e.error_before_m is None else f"{e.error_before_m:.3f}",
|
|
"" if e.error_outlier_m is None else f"{e.error_outlier_m:.3f}",
|
|
"" if e.error_after_m is None else f"{e.error_after_m:.3f}",
|
|
"" if e.drift_m is None else f"{e.drift_m:.3f}",
|
|
"" if e.cov_before is None else f"{e.cov_before:.3f}",
|
|
"" if e.cov_outlier is None else f"{e.cov_outlier:.3f}",
|
|
"" if e.cov_after is None else f"{e.cov_after:.3f}",
|
|
"true" if e.cov_non_decreasing else "false",
|
|
"true" if e.passes_drift else "false",
|
|
"true" if e.passes_covariance else "false",
|
|
"true" if e.passes else "false",
|
|
]
|
|
)
|
|
return out_path
|