"""Per-image accuracy evaluation for FT-P-01 (AZ-409 — AC-1.1, AC-1.2).

Consumes a list of ``(image_id, est_lat, est_lon)`` estimates produced by the
SUT during a 60-image still-image push, joins against the ground-truth
``coordinates.csv`` shipped with the project, computes Vincenty geodesic
distance per image, and reports the AC-2 / AC-3 pass-counts.

The helper is **transport-agnostic**: the scenario test reads the per-image
estimates from the SITL observer (or post-run FDR archive) and hands a typed
list to ``evaluate()`` — no SUT import.

The pass-count thresholds come from the spec's
``expected_results/results_report.md`` Pass/Fail Rules:

* AC-2 (50 m budget): ≥48 / 60 images pass (80 %).
* AC-3 (20 m budget): ≥30 / 60 images pass (50 %).

Timeout discipline (AC-4): when the SITL listener times out for an image, the
scenario passes ``est_lat = est_lon = float('inf')``; ``evaluate()`` records
``error_m = inf``, ``pass_50m = False``, ``pass_20m = False`` for that image.
The aggregate may still pass if other images carry the count.

Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""

from __future__ import annotations

import csv
import math
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence

from .geo import distance_m

PASS_COUNT_50M_REQUIRED = 48
PASS_COUNT_20M_REQUIRED = 30
TOTAL_IMAGES_REQUIRED = 60


@dataclass(frozen=True)
class GtCoordinate:
    """Ground-truth WGS84 frame-center coordinate for one still image."""

    image_id: str
    lat_deg: float
    lon_deg: float


@dataclass(frozen=True)
class EstimateInput:
    """One outbound estimate observed at the SITL listener.

    For a timed-out image (no message received within the scenario's 5 s
    budget) the scenario passes ``est_lat = est_lon = float('inf')``;
    ``evaluate()`` records ``error_m = inf`` and both pass flags False.
    """

    image_id: str
    est_lat_deg: float
    est_lon_deg: float


@dataclass(frozen=True)
class PerImageResult:
    """Per-image evaluation row written to ``ft-p-01.csv``."""

    image_id: str
    gt_lat: float
    gt_lon: float
    est_lat: float
    est_lon: float
    error_m: float
    pass_50m: bool
    pass_20m: bool


@dataclass(frozen=True)
class AggregateReport:
    """Aggregate pass-count over a 60-image run; drives the scenario assertion."""

    total_images: int
    pass_count_50m: int
    pass_count_20m: int
    timeout_count: int
    pass_50m_required: int = PASS_COUNT_50M_REQUIRED
    pass_20m_required: int = PASS_COUNT_20M_REQUIRED

    @property
    def pass_ac2(self) -> bool:
        """AC-2: ≥48 / 60 pass the 50 m budget."""
        return self.pass_count_50m >= self.pass_50m_required

    @property
    def pass_ac3(self) -> bool:
        """AC-3: ≥30 / 60 pass the 20 m budget."""
        return self.pass_count_20m >= self.pass_20m_required

    @property
    def overall_pass(self) -> bool:
        """Scenario passes iff both AC-2 and AC-3 hold."""
        return self.pass_ac2 and self.pass_ac3


def load_gt_coordinates(csv_path: Path) -> list[GtCoordinate]:
    """Parse the project's ``coordinates.csv``.

    Header format: ``image, lat, lon`` (with the project's whitespace around
    commas — tolerated). Rows whose cells are all blank are skipped.

    Args:
        csv_path: Location of the ground-truth CSV.

    Returns:
        One ``GtCoordinate`` per data row, in file order.

    Raises:
        FileNotFoundError: ``csv_path`` does not exist.
        ValueError: The file is empty, the header is not ``image, lat, lon``,
            a data row does not have exactly three columns, or a lat/lon cell
            is not parseable as a float.
    """
    if not csv_path.exists():
        raise FileNotFoundError(
            f"coordinates.csv not found at {csv_path} — check the bind-mount or repo path"
        )

    expected = ["image", "lat", "lon"]
    rows: list[GtCoordinate] = []
    # newline="" is what the csv module requires of file objects it reads;
    # utf-8-sig decodes plain UTF-8 and also drops a leading BOM, which would
    # otherwise end up glued to the first header cell.
    with csv_path.open(newline="", encoding="utf-8-sig") as fh:
        reader = csv.reader(fh)
        header = next(reader, None)
        if header is None:
            # Without the default, an empty file would leak a bare
            # StopIteration to the caller.
            raise ValueError(
                f"coordinates.csv at {csv_path} is empty: expected header {expected}"
            )
        normalised_header = [c.strip() for c in header]
        if normalised_header != expected:
            raise ValueError(
                f"coordinates.csv header mismatch: expected {expected}, got {normalised_header}"
            )
        for raw in reader:
            cells = [c.strip() for c in raw]
            if not any(cells):
                # Blank or whitespace-only line.
                continue
            if len(cells) != len(expected):
                raise ValueError(
                    f"coordinates.csv line {reader.line_num}: expected "
                    f"{len(expected)} columns, got {len(cells)}: {raw}"
                )
            image_id, lat_str, lon_str = cells
            try:
                lat_deg = float(lat_str)
                lon_deg = float(lon_str)
            except ValueError as err:
                raise ValueError(
                    f"coordinates.csv line {reader.line_num}: non-numeric lat/lon in {raw}"
                ) from err
            rows.append(
                GtCoordinate(image_id=image_id, lat_deg=lat_deg, lon_deg=lon_deg)
            )
    return rows


def _is_timeout(value: float) -> bool:
    """An est_lat or est_lon of inf marks an AC-4 timeout."""
    return math.isinf(value)


def _estimate_timed_out(estimate: EstimateInput) -> bool:
    """Return True when either coordinate of ``estimate`` is infinite."""
    return _is_timeout(estimate.est_lat_deg) or _is_timeout(estimate.est_lon_deg)


def compute_per_image(
    gt: GtCoordinate, estimate: EstimateInput
) -> PerImageResult:
    """Compute error_m + AC-2/AC-3 pass flags for one image.

    Args:
        gt: Ground-truth coordinate for the image.
        estimate: Estimate for the same image; an infinite latitude or
            longitude marks a timeout.

    Returns:
        The per-image row. For a timeout ``error_m`` is ``inf`` and both pass
        flags are False; otherwise ``error_m`` is whatever ``distance_m``
        returns for the two coordinate pairs.

    Raises:
        ValueError: ``gt`` and ``estimate`` carry different ``image_id``s.
    """
    if gt.image_id != estimate.image_id:
        raise ValueError(
            f"image_id mismatch: gt='{gt.image_id}' estimate='{estimate.image_id}'"
        )

    if _estimate_timed_out(estimate):
        # Skip the distance computation entirely; inf compares greater than
        # both budgets, so the pass flags below come out False on their own.
        err = math.inf
    else:
        err = distance_m(
            gt.lat_deg, gt.lon_deg, estimate.est_lat_deg, estimate.est_lon_deg
        )

    return PerImageResult(
        image_id=gt.image_id,
        gt_lat=gt.lat_deg,
        gt_lon=gt.lon_deg,
        est_lat=estimate.est_lat_deg,
        est_lon=estimate.est_lon_deg,
        error_m=err,
        pass_50m=err <= 50.0,
        pass_20m=err <= 20.0,
    )


def evaluate(
    gt_rows: Sequence[GtCoordinate],
    estimates: Sequence[EstimateInput],
) -> tuple[list[PerImageResult], AggregateReport]:
    """Join GT + estimates by image_id, compute per-image + aggregate.

    The GT order is authoritative — the resulting list is in GT order so the
    CSV column is stable across runs. An estimate without a matching GT row is
    an error (the scenario should not push a stranger image); a GT row without
    a matching estimate is a timeout (recorded with inf).

    Args:
        gt_rows: Ground-truth rows; their order fixes the output order.
        estimates: Observed estimates, at most one per ``image_id``.

    Returns:
        ``(results, aggregate)``: one ``PerImageResult`` per GT row, plus the
        pass/timeout counts over those rows.

    Raises:
        ValueError: Two estimates share an ``image_id``, or an estimate names
            an ``image_id`` that is absent from ``gt_rows``.
    """
    id_counts = Counter(e.image_id for e in estimates)
    dupes = sorted(image_id for image_id, n in id_counts.items() if n > 1)
    if dupes:
        raise ValueError(f"duplicate estimate image_ids: {dupes}")

    by_id = {e.image_id: e for e in estimates}
    stranger_ids = sorted(set(by_id) - {g.image_id for g in gt_rows})
    if stranger_ids:
        raise ValueError(
            f"estimate(s) for image_id(s) not in GT: {stranger_ids}"
        )

    results: list[PerImageResult] = []
    timeout_count = 0
    for gt in gt_rows:
        est = by_id.get(gt.image_id)
        if est is None:
            # No estimate observed for this image: synthesise the same inf
            # marker the scenario uses for an explicit timeout.
            est = EstimateInput(
                image_id=gt.image_id, est_lat_deg=math.inf, est_lon_deg=math.inf
            )
        if _estimate_timed_out(est):
            timeout_count += 1
        results.append(compute_per_image(gt, est))

    aggregate = AggregateReport(
        total_images=len(results),
        pass_count_50m=sum(1 for r in results if r.pass_50m),
        pass_count_20m=sum(1 for r in results if r.pass_20m),
        timeout_count=timeout_count,
    )
    return results, aggregate


def _format_metric(value: float, decimals: int) -> str:
    """Render ``value`` with fixed decimals; any infinity is written as ``inf``."""
    if math.isinf(value):
        return "inf"
    return f"{value:.{decimals}f}"


def write_csv_evidence(out_path: Path, results: Iterable[PerImageResult]) -> Path:
    """Write the FT-P-01 per-image evidence CSV.

    Header: ``image_id, gt_lat, gt_lon, est_lat, est_lon, error_m, pass_50m,
    pass_20m``. Coordinates are written with 6 decimals and ``error_m`` with
    3; infinite values are written as ``inf`` and the pass flags as
    ``true`` / ``false``.

    Args:
        out_path: Destination file; missing parent directories are created.
        results: Rows to write, in iteration order.

    Returns:
        ``out_path``, unchanged.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Pin the encoding so the evidence file does not depend on the host locale.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            [
                "image_id",
                "gt_lat",
                "gt_lon",
                "est_lat",
                "est_lon",
                "error_m",
                "pass_50m",
                "pass_20m",
            ]
        )
        for r in results:
            writer.writerow(
                [
                    r.image_id,
                    f"{r.gt_lat:.6f}",
                    f"{r.gt_lon:.6f}",
                    _format_metric(r.est_lat, 6),
                    _format_metric(r.est_lon, 6),
                    _format_metric(r.error_m, 3),
                    "true" if r.pass_50m else "false",
                    "true" if r.pass_20m else "false",
                ]
            )
    return out_path