"""Top-K retrieval evaluator + scene-change PARTIAL recorder (AZ-423 / FT-P-19).

Two pure-logic helpers feeding AC-8.6 (AZ-423):

* **AC-1 — scale-ratio retrievability**: for each of the 60
  ``still-image-set-60`` images the SUT runs top-K=10 retrieval against its
  tile cache. The AC passes iff EVERY image has at least one retrieved tile
  whose centre lies within 100 m of the image's true centre. This is a
  "set_contains" check — we do NOT care which rank the matching tile
  occupies, only that it appears in the top-K.
* **AC-2 — scene-change subset PARTIAL**: for the 2 paired ``_gmaps.png``
  reference images the cross-domain matcher runs; the helper records the
  boolean outcome AND tags the subset's overall result as ``PARTIAL``
  unconditionally — because N=2 is too small to yield a meaningful
  pass/fail statistic, and the traceability matrix documents this AC as
  PARTIAL irrespective of count.

The scenario test pulls per-frame retrieval candidates from the FDR
``retrieval-topk`` record stream and per-image scene-change outcomes from
the cross-domain matcher's FDR record stream; this module only decides
whether the parsed inputs satisfy the AC.

Public-boundary discipline: NO imports from ``src/gps_denied_onboard``.
"""

from __future__ import annotations

import csv
import math
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Iterator, Sequence

from .geo import distance_m

# ─────────────────────── FDR record kinds & schema ───────────────────────

RETRIEVAL_TOPK_FDR_KIND = "retrieval-topk"
SCENE_CHANGE_MATCH_FDR_KIND = "scene-change-match"

# NOTE(review): TOP_K_REQUIRED is not referenced by any helper in this
# module — the evaluator considers every candidate it is handed. Confirm
# that the caller truncates to the top-K before building ``TopKQuery``.
TOP_K_REQUIRED = 10
TOP_K_DISTANCE_TOLERANCE_M = 100.0

# Scene-change-pair convention: image ``<id>.jpg`` (e.g. ``AD000001.jpg``)
# paired with ``<id>_gmaps.png``. Only 2 pairs exist in the project's static
# fixture set (`still-image-sat-refs-2`).
SCENE_CHANGE_PAIRED_IMAGE_IDS: tuple[str, ...] = ("AD000001", "AD000002")
SCENE_CHANGE_SUBSET_PARTIAL_LABEL = "PARTIAL"


@dataclass(frozen=True)
class CandidateTile:
    """One top-K candidate the SUT retrieved from its tile cache.

    ``centre_lat_deg`` / ``centre_lon_deg`` is the WGS84 centre of the
    tile's footprint per ``TileMetadata.centre_wgs84``.
    """

    tile_id: str
    centre_lat_deg: float
    centre_lon_deg: float


@dataclass(frozen=True)
class TopKQuery:
    """One image's top-K=10 retrieval result, plus the GT centre.

    The scenario test produces one of these per image; the helper decides
    AC-1 per-image and overall.
    """

    image_id: str
    true_centre_lat_deg: float
    true_centre_lon_deg: float
    candidates: tuple[CandidateTile, ...]


@dataclass(frozen=True)
class TopKImageReport:
    """Per-image AC-1 outcome.

    ``min_distance_m`` is the nearest candidate's distance (m) to the true
    centre; ``None`` when ``candidates`` is empty or when no candidate
    produced a comparable (non-NaN) distance. The helper treats both cases
    as failure.
    """

    image_id: str
    candidate_count: int
    min_distance_m: float | None
    pass_distance: bool


@dataclass(frozen=True)
class TopKAggregateReport:
    """AC-1 of FT-P-19: every image's top-K covers the true centre."""

    entries: tuple[TopKImageReport, ...]
    max_distance_m: float
    expected_image_count: int

    @property
    def pass_count(self) -> int:
        """Number of entries whose ``pass_distance`` is true."""
        return sum(1 for e in self.entries if e.pass_distance)

    @property
    def failing_entries(self) -> tuple[TopKImageReport, ...]:
        """Entries whose ``pass_distance`` is false, in input order."""
        return tuple(e for e in self.entries if not e.pass_distance)

    @property
    def passes(self) -> bool:
        """True iff the entry count matches and every entry passes.

        NOTE(review): this compares counts only; duplicate ``image_id``
        values are not detected here — confirm the caller de-duplicates.
        """
        if len(self.entries) != self.expected_image_count:
            return False
        return self.pass_count == self.expected_image_count


def _evaluate_query(query: TopKQuery, max_distance_m: float) -> TopKImageReport:
    """Build the per-image AC-1 report for one query."""
    distances = [
        distance_m(
            query.true_centre_lat_deg,
            query.true_centre_lon_deg,
            tile.centre_lat_deg,
            tile.centre_lon_deg,
        )
        for tile in query.candidates
    ]
    # ``min()`` is order-dependent in the presence of NaN (``min([nan, 5.0])``
    # is ``nan``), which would mask a genuinely close candidate. Drop NaN
    # distances so one bad candidate cannot fail the whole image.
    comparable = [d for d in distances if not math.isnan(d)]
    nearest = min(comparable) if comparable else None
    return TopKImageReport(
        image_id=query.image_id,
        candidate_count=len(query.candidates),
        min_distance_m=nearest,
        pass_distance=nearest is not None and nearest <= max_distance_m,
    )


def evaluate_top_k_within_distance(
    queries: Sequence[TopKQuery],
    *,
    max_distance_m: float = TOP_K_DISTANCE_TOLERANCE_M,
    expected_image_count: int = 60,
) -> TopKAggregateReport:
    """AC-1: every image's top-K must include a tile within ``max_distance_m`` m.

    For each query the helper computes every candidate's distance to the
    true centre (via ``geo.distance_m``) and keeps the nearest one. The
    image passes iff at least one candidate is within ``max_distance_m``;
    the aggregate passes iff every image passes AND the total query count
    matches ``expected_image_count``. A query with no candidates is
    reported with ``min_distance_m=None`` and fails.

    Raises ``ValueError`` when ``max_distance_m`` is not strictly positive
    (this includes NaN).
    """
    # Written as ``not (x > 0)`` so that NaN — for which ``x <= 0`` is
    # False — is rejected too instead of silently failing every image.
    if not (max_distance_m > 0):
        raise ValueError(f"max_distance_m must be > 0, got {max_distance_m}")
    return TopKAggregateReport(
        entries=tuple(_evaluate_query(q, max_distance_m) for q in queries),
        max_distance_m=max_distance_m,
        expected_image_count=expected_image_count,
    )


# ─────────────────────── AC-2 scene-change subset ───────────────────────


@dataclass(frozen=True)
class SceneChangeMatch:
    """One paired-image cross-domain matcher outcome."""

    image_id: str  # e.g. "AD000001"; pairs implicitly with `_gmaps.png`
    matched: bool
    inlier_count: int | None  # informational; ``None`` when the matcher didn't report it


@dataclass(frozen=True)
class SceneChangeSubsetReport:
    """AC-2 of FT-P-19: scene-change subset is structurally PARTIAL.

    The subset's overall_label is ALWAYS ``PARTIAL`` — even when both
    images match successfully — because N=2 is too small for a meaningful
    pass/fail statistic and the traceability matrix documents AC-8.6 as
    PARTIAL irrespective of outcome count.
    """

    entries: tuple[SceneChangeMatch, ...]
    expected_image_ids: tuple[str, ...] = SCENE_CHANGE_PAIRED_IMAGE_IDS
    overall_label: str = SCENE_CHANGE_SUBSET_PARTIAL_LABEL

    @property
    def matched_count(self) -> int:
        """Number of entries whose ``matched`` flag is true."""
        return sum(1 for e in self.entries if e.matched)

    @property
    def coverage_complete(self) -> bool:
        """True iff the observed image ids are exactly the expected ids.

        Coverage is a structural completeness check (did we collect
        results for both AD000001 and AD000002?); it is independent of
        the matcher pass/fail outcome. Because this is a set-equality
        test, a missing expected id AND an unexpected extra id both make
        it false; repeated entries for the same id do not.
        """
        observed = {e.image_id for e in self.entries}
        return observed == set(self.expected_image_ids)


def evaluate_scene_change_subset(
    matches: Sequence[SceneChangeMatch],
    *,
    expected_image_ids: Sequence[str] = SCENE_CHANGE_PAIRED_IMAGE_IDS,
) -> SceneChangeSubsetReport:
    """AC-2: record the paired-image matcher outcomes and emit PARTIAL.

    The result is intentionally lenient on pass/fail count — the PARTIAL
    annotation comes from the spec, not from the data.
    """
    return SceneChangeSubsetReport(
        entries=tuple(matches),
        expected_image_ids=tuple(expected_image_ids),
    )


# ─────────────────────── CSV evidence emission ───────────────────────

TOP_K_CSV_HEADER: tuple[str, ...] = (
    "image_id",
    "candidate_count",
    "min_distance_m",
    "pass_distance",
)

SCENE_CHANGE_CSV_HEADER: tuple[str, ...] = (
    "image_id",
    "matched",
    "inlier_count",
    "subset_label",
)


def write_top_k_csv(path: Path, report: TopKAggregateReport) -> None:
    """Write per-image AC-1 results to ``path`` (CSV, UTF-8, LF newlines).

    ``min_distance_m`` is written with 4 decimal places, or as an empty
    cell when it is ``None``; ``pass_distance`` is written as
    ``true`` / ``false``.

    Idempotent: overwrites ``path`` if it exists. Raises ``OSError`` if
    the parent directory does not exist.
    """
    # ``newline=""`` hands line-ending control to the csv writer, which is
    # pinned to LF via ``lineterminator``.
    with path.open("w", encoding="utf-8", newline="") as fh:
        writer = csv.writer(fh, lineterminator="\n")
        writer.writerow(TOP_K_CSV_HEADER)
        for e in report.entries:
            writer.writerow(
                [
                    e.image_id,
                    e.candidate_count,
                    "" if e.min_distance_m is None else f"{e.min_distance_m:.4f}",
                    "true" if e.pass_distance else "false",
                ]
            )


def write_scene_change_csv(path: Path, report: SceneChangeSubsetReport) -> None:
    """Write paired-image AC-2 results to ``path`` (CSV) with PARTIAL tag.

    Every row carries the subset's ``overall_label`` so a downstream
    consumer can group / filter by it without joining tables. The file is
    opened for writing (UTF-8, LF newlines), so an existing ``path`` is
    overwritten.
    """
    with path.open("w", encoding="utf-8", newline="") as fh:
        writer = csv.writer(fh, lineterminator="\n")
        writer.writerow(SCENE_CHANGE_CSV_HEADER)
        for e in report.entries:
            writer.writerow(
                [
                    e.image_id,
                    "true" if e.matched else "false",
                    "" if e.inlier_count is None else e.inlier_count,
                    report.overall_label,
                ]
            )


# ─────────────────────── FDR record projection ───────────────────────


def _finite_coordinate(value: object) -> float | None:
    """Return ``value`` as a finite float, or ``None`` if it is not usable.

    ``bool`` is rejected explicitly (it is an ``int`` subclass, so ``True``
    would otherwise be read as 1.0 degrees), as are NaN / infinity and
    integers too large to convert to ``float``.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    try:
        as_float = float(value)
    except OverflowError:
        return None
    return as_float if math.isfinite(as_float) else None


def project_topk_record_to_query(
    payload: object, true_centre_lat_deg: float, true_centre_lon_deg: float
) -> TopKQuery | None:
    """Project one FDR ``retrieval-topk`` payload onto a ``TopKQuery``.

    The payload is expected to carry:

    * ``image_id`` (str)
    * ``candidates`` — list[dict] of {tile_id, centre_lat_deg, centre_lon_deg}

    Returns ``None`` when the payload is malformed (missing fields, wrong
    shape) — the scenario logs the skip and treats it as failure at
    aggregate time.

    Individual candidates that are malformed (not a dict, non-string
    ``tile_id``, or a coordinate that is not a finite real number) are
    skipped rather than invalidating the whole payload.
    """
    if not isinstance(payload, dict):
        return None
    image_id = payload.get("image_id")
    raw_candidates = payload.get("candidates")
    if not isinstance(image_id, str) or not isinstance(raw_candidates, (list, tuple)):
        return None

    candidates: list[CandidateTile] = []
    for raw in raw_candidates:
        if not isinstance(raw, dict):
            continue
        tile_id = raw.get("tile_id")
        lat = _finite_coordinate(raw.get("centre_lat_deg"))
        lon = _finite_coordinate(raw.get("centre_lon_deg"))
        if not isinstance(tile_id, str) or lat is None or lon is None:
            continue
        candidates.append(
            CandidateTile(tile_id=tile_id, centre_lat_deg=lat, centre_lon_deg=lon)
        )

    return TopKQuery(
        image_id=image_id,
        true_centre_lat_deg=true_centre_lat_deg,
        true_centre_lon_deg=true_centre_lon_deg,
        candidates=tuple(candidates),
    )


def project_scene_change_record(payload: object) -> SceneChangeMatch | None:
    """Project one FDR ``scene-change-match`` payload onto a ``SceneChangeMatch``.

    Expected payload shape:

    * ``image_id`` (str)
    * ``matched`` (bool)
    * ``inlier_count`` (int | None) — optional

    Returns ``None`` on malformed input. An ``inlier_count`` that is
    absent, ``None``, a ``bool`` or any non-``int`` value is recorded as
    ``None`` rather than rejecting the record.
    """
    if not isinstance(payload, dict):
        return None
    image_id = payload.get("image_id")
    matched = payload.get("matched")
    if not isinstance(image_id, str) or not isinstance(matched, bool):
        return None

    inlier_raw = payload.get("inlier_count")
    # ``bool`` is an ``int`` subclass; exclude it so ``True`` is not
    # recorded as an inlier count of 1.
    is_real_int = isinstance(inlier_raw, int) and not isinstance(inlier_raw, bool)
    inlier_count: int | None = inlier_raw if is_real_int else None
    return SceneChangeMatch(
        image_id=image_id, matched=matched, inlier_count=inlier_count
    )


def _iter_payloads_of_kind(records: Iterable[object], kind: str) -> Iterator[object]:
    """Yield the ``payload`` attribute of each record whose ``record_type`` is ``kind``.

    Records lacking a ``record_type`` attribute are skipped; a matching
    record lacking ``payload`` yields ``None``.
    """
    for rec in records:
        if getattr(rec, "record_type", None) == kind:
            yield getattr(rec, "payload", None)


def iter_topk_payloads(records: Iterable[object]) -> Iterator[object]:
    """Filter an iterable of FDR records, yielding ``retrieval-topk`` payloads."""
    return _iter_payloads_of_kind(records, RETRIEVAL_TOPK_FDR_KIND)


def iter_scene_change_payloads(records: Iterable[object]) -> Iterator[object]:
    """Filter an iterable of FDR records, yielding ``scene-change-match`` payloads."""
    return _iter_payloads_of_kind(records, SCENE_CHANGE_MATCH_FDR_KIND)