"""MRE budget evaluation for FT-P-05 / FT-P-06 (AZ-413 / AC-2.1b, AC-2.2).

The SUT exposes per-frame **MRE** (Mean Reprojection Error, in pixels) for both:

* **Frame-to-frame** registrations — produced during the Derkachi replay
  (FT-P-04 scope; the MRE per frame is recorded in the FDR archive alongside
  the boolean success metric).
* **Cross-domain** registrations — produced when the satellite-anchor
  pipeline matches a UAV frame against a satellite tile (FT-P-05 scope; one
  MRE per still-image push).

FT-P-05 binds:

* AC-2 (per-image cross-domain): every image's MRE < 2.5 px.
* AC-3 (accuracy alongside MRE): inherits FT-P-01 thresholds (≥80 % at 50 m,
  ≥50 % at 20 m) but on the same image set; the helper reuses
  ``accuracy_evaluator`` for the geodesic part.

FT-P-06 binds AC-4: the 95th percentile MRE bound — < 1.0 px frame-to-frame
AND < 2.5 px cross-domain. The 95th percentile is computed with numpy's
default linear-interpolation algorithm (which the spec explicitly names).

Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""

from __future__ import annotations

import csv
from dataclasses import dataclass
from pathlib import Path
from statistics import median
from typing import Iterable, Sequence

import numpy as np

# Budgets, in pixels. All comparisons against them are strict (<).
MRE_PER_IMAGE_BUDGET_PX = 2.5
MRE_P95_FRAME_TO_FRAME_BUDGET_PX = 1.0
MRE_P95_CROSS_DOMAIN_BUDGET_PX = 2.5


@dataclass(frozen=True)
class CrossDomainRecord:
    """One observation per still-image push (FT-P-05)."""

    image_id: str
    mre_px: float
    error_m: float


@dataclass(frozen=True)
class FrameToFrameRecord:
    """One observation per video frame (FT-P-04 evidence reused by FT-P-06)."""

    frame_index: int
    mre_px: float


@dataclass(frozen=True)
class PerImageBudgetReport:
    """FT-P-05 AC-2: every image MRE < 2.5 px."""

    total_images: int
    pass_count: int
    fail_image_ids: tuple[str, ...]
    max_mre_px: float
    budget_px: float = MRE_PER_IMAGE_BUDGET_PX

    @property
    def passes(self) -> bool:
        """True when at least one image was evaluated and none failed."""
        # An empty image set never passes: no evidence is not a pass.
        return self.total_images > 0 and self.pass_count == self.total_images


@dataclass(frozen=True)
class P95Report:
    """FT-P-06 AC-4: 95th-percentile budget."""

    sample_count: int
    p95_px: float
    budget_px: float

    @property
    def passes(self) -> bool:
        """True when there are samples and the p95 is strictly under budget."""
        # A NaN p95 (empty input) compares False, so it can never pass.
        return self.sample_count > 0 and self.p95_px < self.budget_px


@dataclass(frozen=True)
class CombinedP95Report:
    """FT-P-06 combined assertion across both domains."""

    frame_to_frame: P95Report
    cross_domain: P95Report

    @property
    def passes(self) -> bool:
        """True only when both per-domain reports pass."""
        return self.frame_to_frame.passes and self.cross_domain.passes


def evaluate_per_image_budget(
    records: Sequence[CrossDomainRecord],
    *,
    budget_px: float = MRE_PER_IMAGE_BUDGET_PX,
) -> PerImageBudgetReport:
    """AC-2 of FT-P-05: every cross-domain MRE strictly below ``budget_px``.

    Strictness: the spec text "MRE < 2.5 px for all images" reads as a
    strict less-than. A record at exactly 2.5 px FAILS (the matcher must be
    inside the budget, not on the boundary).

    Args:
        records: Cross-domain observations to check.
        budget_px: Per-image MRE budget in pixels; must be positive.

    Returns:
        A report with the pass count, the ids of failing images (in input
        order) and the largest MRE seen (``0.0`` when ``records`` is empty).

    Raises:
        ValueError: If ``budget_px`` is not greater than zero.
    """
    if budget_px <= 0:
        raise ValueError(f"budget_px must be > 0, got {budget_px}")

    fail_ids: list[str] = []
    pass_count = 0
    max_mre = 0.0
    for record in records:
        max_mre = max(max_mre, record.mre_px)
        if record.mre_px < budget_px:
            pass_count += 1
        else:
            fail_ids.append(record.image_id)

    return PerImageBudgetReport(
        total_images=len(records),
        pass_count=pass_count,
        fail_image_ids=tuple(fail_ids),
        max_mre_px=max_mre,
        budget_px=budget_px,
    )


def evaluate_p95(
    mre_samples: Sequence[float],
    *,
    budget_px: float,
) -> P95Report:
    """AC-4 of FT-P-06: 95th-percentile MRE strictly below ``budget_px``.

    Percentile computed via ``numpy.percentile`` with the default
    ``method='linear'`` (linear interpolation between adjacent ranks). The
    spec explicitly names that method.

    Args:
        mre_samples: MRE values in pixels.
        budget_px: 95th-percentile budget in pixels; must be positive.

    Returns:
        A report carrying the sample count, the p95 value and the budget.
        With no samples the p95 is NaN, so the report does not pass.

    Raises:
        ValueError: If ``budget_px`` is not greater than zero.
    """
    if budget_px <= 0:
        raise ValueError(f"budget_px must be > 0, got {budget_px}")

    sample_count = len(mre_samples)
    if sample_count == 0:
        return P95Report(sample_count=0, p95_px=float("nan"), budget_px=budget_px)

    p95 = float(np.percentile(np.asarray(mre_samples, dtype=float), 95))
    return P95Report(sample_count=sample_count, p95_px=p95, budget_px=budget_px)


def evaluate_combined_p95(
    frame_to_frame: Sequence[FrameToFrameRecord],
    cross_domain: Sequence[CrossDomainRecord],
) -> CombinedP95Report:
    """FT-P-06 combined assertion using per-domain budgets.

    Each domain is evaluated independently against its own module-level
    budget; the combined report passes only when both do.
    """
    f2f = evaluate_p95(
        [r.mre_px for r in frame_to_frame],
        budget_px=MRE_P95_FRAME_TO_FRAME_BUDGET_PX,
    )
    xd = evaluate_p95(
        [r.mre_px for r in cross_domain],
        budget_px=MRE_P95_CROSS_DOMAIN_BUDGET_PX,
    )
    return CombinedP95Report(frame_to_frame=f2f, cross_domain=xd)


def load_cross_domain_csv(csv_path: Path) -> list[CrossDomainRecord]:
    """Read ``ft-p-05.csv`` back into typed records (used by FT-P-06).

    Only the ``image_id``, ``mre_px`` and ``error_m`` columns are read;
    any other columns are ignored.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If a required column is missing from the header, or a
            numeric cell cannot be parsed by ``float``.
    """
    if not csv_path.exists():
        raise FileNotFoundError(
            f"FT-P-05 evidence not found at {csv_path} — run FT-P-05 first."
        )

    # newline="" lets the csv module do its own line-ending handling.
    with csv_path.open(newline="") as fh:
        reader = csv.DictReader(fh)
        needed = {"image_id", "mre_px", "error_m"}
        missing = needed - set(reader.fieldnames or [])
        if missing:
            raise ValueError(f"FT-P-05 CSV missing columns: {sorted(missing)}")
        # float() already understands the "inf" token the writer emits.
        return [
            CrossDomainRecord(
                image_id=row["image_id"],
                mre_px=float(row["mre_px"]),
                error_m=float(row["error_m"]),
            )
            for row in reader
        ]


def load_frame_to_frame_csv(csv_path: Path) -> list[FrameToFrameRecord]:
    """Read frame-to-frame MRE from the FT-P-04 evidence CSV.

    The FT-P-04 CSV currently includes ``registration_success`` per frame
    but NOT MRE; that column will be added when the SUT exposes it
    (AC-NEW-3 FDR schema). This loader expects ``mre_px`` and
    ``frame_index`` columns — raises ValueError if either is absent so the
    FT-P-06 scenario fails loudly.

    Rows whose ``mre_px`` cell is blank or absent (a short row) are skipped.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If a required column is missing from the header, or a
            non-blank cell cannot be parsed as a number.
    """
    if not csv_path.exists():
        raise FileNotFoundError(
            f"FT-P-04 evidence not found at {csv_path} — run FT-P-04 first."
        )

    records: list[FrameToFrameRecord] = []
    # newline="" lets the csv module do its own line-ending handling.
    with csv_path.open(newline="") as fh:
        reader = csv.DictReader(fh)
        header = reader.fieldnames or []
        if "mre_px" not in header:
            raise ValueError(
                "FT-P-04 evidence is missing the 'mre_px' column required by FT-P-06. "
                "The SUT must emit per-frame MRE in the FDR archive (AC-NEW-3)."
            )
        # Validate up front so a schema problem surfaces as ValueError,
        # not as a KeyError in the middle of the row loop.
        if "frame_index" not in header:
            raise ValueError(
                "FT-P-04 evidence is missing the 'frame_index' column "
                "required by FT-P-06."
            )
        for row in reader:
            # DictReader fills cells missing from a short row with None.
            mre_str = (row["mre_px"] or "").strip()
            if not mre_str:
                continue
            records.append(
                FrameToFrameRecord(
                    frame_index=int(row["frame_index"]),
                    mre_px=float(mre_str),
                )
            )
    return records


def write_cross_domain_csv(
    out_path: Path,
    records: Iterable[CrossDomainRecord],
    *,
    pass_50m: dict[str, bool] | None = None,
    pass_20m: dict[str, bool] | None = None,
) -> Path:
    """Write the FT-P-05 per-image evidence CSV.

    Header: ``image_id, est_lat, est_lon, error_m, mre_px, pass_50m,
    pass_20m, pass_mre``. The lat/lon columns are emitted as blanks here
    (the scenario file fills them via ``write_csv_evidence`` from
    ``accuracy_evaluator`` — this writer is for the FT-P-06-relevant
    columns only).

    Args:
        out_path: Destination file; parent directories are created.
        records: Cross-domain observations, written in iteration order.
        pass_50m: Optional ``image_id -> bool`` map for the ``pass_50m``
            column; ids not present are written as ``false``.
        pass_20m: Same as ``pass_50m`` for the ``pass_20m`` column.

    Returns:
        ``out_path``, for convenient chaining.

    Note:
        ``error_m`` is written with 3 decimals and ``mre_px`` with 4, so a
        value read back from the file is the rounded one. ``pass_mre`` is
        decided on the unrounded value against ``MRE_PER_IMAGE_BUDGET_PX``.
    """
    pass_50m = pass_50m or {}
    pass_20m = pass_20m or {}
    out_path.parent.mkdir(parents=True, exist_ok=True)

    with out_path.open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            [
                "image_id",
                "est_lat",
                "est_lon",
                "error_m",
                "mre_px",
                "pass_50m",
                "pass_20m",
                "pass_mre",
            ]
        )
        for r in records:
            writer.writerow(
                [
                    r.image_id,
                    "",
                    "",
                    "inf" if r.error_m == float("inf") else f"{r.error_m:.3f}",
                    f"{r.mre_px:.4f}",
                    "true" if pass_50m.get(r.image_id, False) else "false",
                    "true" if pass_20m.get(r.image_id, False) else "false",
                    "true" if r.mre_px < MRE_PER_IMAGE_BUDGET_PX else "false",
                ]
            )
    return out_path


def summarize_mre_distribution(
    records: Sequence[FrameToFrameRecord | CrossDomainRecord],
) -> dict[str, float]:
    """Summary stats for diagnostic logging (median, p95, max).

    Convenience helper; not used by the AC assertions themselves.

    Returns:
        A dict with keys ``count``, ``median``, ``p95`` and ``max``. With no
        records, ``count`` is ``0.0`` and the other three are NaN.
    """
    if not records:
        return {
            "count": 0.0,
            "median": float("nan"),
            "p95": float("nan"),
            "max": float("nan"),
        }

    samples = [r.mre_px for r in records]
    return {
        "count": float(len(samples)),
        "median": float(median(samples)),
        "p95": float(np.percentile(np.asarray(samples, dtype=float), 95)),
        "max": float(max(samples)),
    }