Files
Oleksandr Bezdieniezhnykh 29ac16cfcb [AZ-409] [AZ-412] [AZ-413] Batch 70: FT-P-01/04/05/06 scenarios
AZ-409 (3pt) — FT-P-01 still-image frame-center accuracy:
- accuracy_evaluator.py: GT loader + Vincenty error + AC-2/AC-3 pass-counts
- test_ft_p_01_still_image_accuracy.py: scenario gated on frame_source_replay
  + sitl_observer NotImplementedError; AC-4 timeout discipline

AZ-412 (3pt) — FT-P-04 Derkachi f2f registration >=95% on normal segments:
- registration_classifier.py: accel-derived attitude + overlap heuristic
  + success ratio with AC-3 sharp-turn exclusion
- test_ft_p_04_derkachi_f2f_registration.py: scenario gated on
  frame_source_replay + imu_replay + fdr_reader

AZ-413 (3pt) — FT-P-05 + FT-P-06 cross-domain MRE budgets:
- mre_evaluator.py: per-image budget (strict <2.5px) + 95th-percentile
  via numpy linear interp + combined report
- test_ft_p_05_sat_anchor.py: cross-domain scenario, reuses
  accuracy_evaluator for geodesic join
- test_ft_p_06_mre_budgets.py: pure piggyback on FT-P-04 + FT-P-05 CSV
  evidence; skips when either upstream CSV missing

Tests: 325 unit tests pass (+77 vs batch 69).
Reports: batch_70_report.md, batch_70_review.md (PASS).
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 18:10:46 +03:00

285 lines
9.1 KiB
Python

"""MRE budget evaluation for FT-P-05 / FT-P-06 (AZ-413 / AC-2.1b, AC-2.2).
The SUT exposes per-frame **MRE** (Mean Reprojection Error, in pixels)
for both:
* **Frame-to-frame** registrations — produced during the Derkachi replay
(FT-P-04 scope; the MRE per frame is recorded in the FDR archive
alongside the boolean success metric).
* **Cross-domain** registrations — produced when the satellite-anchor
pipeline matches a UAV frame against a satellite tile (FT-P-05 scope;
one MRE per still-image push).
FT-P-05 binds:
* AC-2 (per-image cross-domain): every image's MRE < 2.5 px.
* AC-3 (accuracy alongside MRE): inherits FT-P-01 thresholds (≥80 % at
50 m, ≥50 % at 20 m) but on the same image set; the helper reuses
``accuracy_evaluator`` for the geodesic part.
FT-P-06 binds AC-4: the 95th percentile MRE bound — < 1.0 px frame-to-frame
AND < 2.5 px cross-domain. The 95th percentile is computed with numpy's
default linear-interpolation algorithm (which the spec explicitly names).
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from statistics import median
from typing import Iterable, Sequence
import numpy as np
# FT-P-05 AC-2: every cross-domain image's MRE must be strictly below this
# value (pixels).
MRE_PER_IMAGE_BUDGET_PX = 2.5
# FT-P-06 AC-4: bound on the 95th-percentile MRE of frame-to-frame
# registrations (pixels).
MRE_P95_FRAME_TO_FRAME_BUDGET_PX = 1.0
# FT-P-06 AC-4: bound on the 95th-percentile MRE of cross-domain
# registrations (pixels).
MRE_P95_CROSS_DOMAIN_BUDGET_PX = 2.5
@dataclass(frozen=True)
class CrossDomainRecord:
    """A single cross-domain observation: one per still-image push (FT-P-05).

    Instances are immutable (``frozen=True``).
    """

    # Identifier of the pushed image; reported back for images that fail
    # the per-image budget.
    image_id: str
    # Mean reprojection error of the registration, in pixels.
    mre_px: float
    # Presumably the geodesic position error in metres (TODO confirm against
    # accuracy_evaluator); the CSV round-trip in this module allows ``inf``.
    error_m: float
@dataclass(frozen=True)
class FrameToFrameRecord:
    """A single frame-to-frame observation: one per video frame.

    This is FT-P-04 evidence that FT-P-06 reuses. Instances are immutable
    (``frozen=True``).
    """

    # Index of the video frame the measurement belongs to.
    frame_index: int
    # Mean reprojection error of the registration, in pixels.
    mre_px: float
@dataclass(frozen=True)
class PerImageBudgetReport:
    """Outcome of the FT-P-05 AC-2 check (every image MRE < 2.5 px)."""

    # Number of images that were evaluated.
    total_images: int
    # Number of images whose MRE was inside the budget.
    pass_count: int
    # Identifiers of the images that were not inside the budget.
    fail_image_ids: tuple[str, ...]
    # Largest MRE observed across the evaluated images, in pixels.
    max_mre_px: float
    # Budget the images were compared against, in pixels.
    budget_px: float = MRE_PER_IMAGE_BUDGET_PX

    @property
    def passes(self) -> bool:
        """True only when at least one image was evaluated and all passed."""
        # An empty image set must not pass vacuously.
        return self.total_images > 0 and self.pass_count == self.total_images
@dataclass(frozen=True)
class P95Report:
    """Outcome of an FT-P-06 AC-4 check against a 95th-percentile budget."""

    # Number of MRE samples the percentile was computed from.
    sample_count: int
    # 95th-percentile MRE in pixels.
    p95_px: float
    # Budget the percentile is compared against, in pixels.
    budget_px: float

    @property
    def passes(self) -> bool:
        """True when there are samples and the p95 is strictly below budget."""
        # No samples means no evidence, which is never a pass.
        if not self.sample_count > 0:
            return False
        # Strict comparison; a NaN percentile compares False and so fails.
        return self.p95_px < self.budget_px
@dataclass(frozen=True)
class CombinedP95Report:
    """FT-P-06 combined verdict covering both registration domains."""

    # 95th-percentile report for frame-to-frame registrations.
    frame_to_frame: P95Report
    # 95th-percentile report for cross-domain registrations.
    cross_domain: P95Report

    @property
    def passes(self) -> bool:
        """True only when both per-domain reports pass."""
        f2f_ok = self.frame_to_frame.passes
        return f2f_ok and self.cross_domain.passes
def evaluate_per_image_budget(
    records: Sequence[CrossDomainRecord],
    *,
    budget_px: float = MRE_PER_IMAGE_BUDGET_PX,
) -> PerImageBudgetReport:
    """Check FT-P-05 AC-2: each cross-domain MRE is strictly below ``budget_px``.

    The spec wording "MRE < 2.5 px for all images" is read as a strict
    less-than, so a record sitting exactly on the budget counts as a failure.

    Args:
        records: Cross-domain observations to evaluate.
        budget_px: Per-image MRE budget in pixels; must be positive.

    Returns:
        A report with the total and passing counts, the ids of the failing
        images (in input order) and the largest MRE seen.

    Raises:
        ValueError: If ``budget_px`` is zero or negative.
    """
    if budget_px <= 0:
        raise ValueError(f"budget_px must be > 0, got {budget_px}")

    # Anything that is not strictly inside the budget is a failure.
    over_budget = tuple(
        rec.image_id for rec in records if not rec.mre_px < budget_px
    )
    # The maximum is seeded with 0.0, so an empty input reports 0.0.
    worst_mre = max([0.0, *(rec.mre_px for rec in records)])
    total = len(records)
    return PerImageBudgetReport(
        total_images=total,
        pass_count=total - len(over_budget),
        fail_image_ids=over_budget,
        max_mre_px=worst_mre,
        budget_px=budget_px,
    )
def evaluate_p95(
    mre_samples: Sequence[float],
    *,
    budget_px: float,
) -> P95Report:
    """Check FT-P-06 AC-4: the 95th-percentile MRE against ``budget_px``.

    The percentile comes from ``numpy.percentile`` with its default
    ``method='linear'`` (linear interpolation between adjacent ranks), which
    is the method the spec names.

    Args:
        mre_samples: MRE values in pixels.
        budget_px: 95th-percentile budget in pixels; must be positive.

    Returns:
        A report carrying the sample count, the percentile and the budget.
        With no samples the percentile is NaN.

    Raises:
        ValueError: If ``budget_px`` is zero or negative.
    """
    if budget_px <= 0:
        raise ValueError(f"budget_px must be > 0, got {budget_px}")

    sample_count = len(mre_samples)
    if sample_count:
        values = np.asarray(mre_samples, dtype=float)
        p95_px = float(np.percentile(values, 95))
    else:
        # Nothing to measure: report NaN rather than a made-up percentile.
        p95_px = float("nan")
    return P95Report(sample_count=sample_count, p95_px=p95_px, budget_px=budget_px)
def evaluate_combined_p95(
    frame_to_frame: Sequence[FrameToFrameRecord],
    cross_domain: Sequence[CrossDomainRecord],
) -> CombinedP95Report:
    """Build the FT-P-06 combined report, one p95 check per domain.

    Each domain is evaluated against its own module-level budget:
    ``MRE_P95_FRAME_TO_FRAME_BUDGET_PX`` for frame-to-frame records and
    ``MRE_P95_CROSS_DOMAIN_BUDGET_PX`` for cross-domain records.

    Args:
        frame_to_frame: Per-frame observations.
        cross_domain: Per-image observations.

    Returns:
        A report holding both per-domain p95 reports.
    """
    return CombinedP95Report(
        frame_to_frame=evaluate_p95(
            [rec.mre_px for rec in frame_to_frame],
            budget_px=MRE_P95_FRAME_TO_FRAME_BUDGET_PX,
        ),
        cross_domain=evaluate_p95(
            [rec.mre_px for rec in cross_domain],
            budget_px=MRE_P95_CROSS_DOMAIN_BUDGET_PX,
        ),
    )
def load_cross_domain_csv(csv_path: Path) -> list[CrossDomainRecord]:
    """Read ``ft-p-05.csv`` back into typed records (used by FT-P-06).

    Only the ``image_id``, ``mre_px`` and ``error_m`` columns are consumed;
    any other column in the file is ignored.

    Args:
        csv_path: Location of the FT-P-05 evidence CSV.

    Returns:
        One record per data row, in file order.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If a required column is missing from the header, or a
            row is truncated or holds a non-numeric ``mre_px`` / ``error_m``.
    """
    if not csv_path.exists():
        raise FileNotFoundError(
            f"FT-P-05 evidence not found at {csv_path} — run FT-P-05 first."
        )
    records: list[CrossDomainRecord] = []
    # The csv module wants files opened with newline="" so that it can do
    # its own line-ending handling (matters for quoted embedded newlines).
    with csv_path.open(newline="") as fh:
        reader = csv.DictReader(fh)
        needed = {"image_id", "mre_px", "error_m"}
        missing = needed - set(reader.fieldnames or [])
        if missing:
            raise ValueError(f"FT-P-05 CSV missing columns: {sorted(missing)}")
        for row in reader:
            image_id = row["image_id"]
            try:
                # float() parses "inf" natively, so no special case is needed.
                # A truncated row yields None cells, which raise TypeError.
                mre_px = float(row["mre_px"])
                error_m = float(row["error_m"])
            except (TypeError, ValueError) as exc:
                raise ValueError(
                    f"FT-P-05 CSV {csv_path} line {reader.line_num}: "
                    f"malformed row {row!r}"
                ) from exc
            records.append(
                CrossDomainRecord(image_id=image_id, mre_px=mre_px, error_m=error_m)
            )
    return records
def load_frame_to_frame_csv(csv_path: Path) -> list[FrameToFrameRecord]:
    """Read frame-to-frame MRE from the FT-P-04 evidence CSV.

    The FT-P-04 CSV currently includes ``registration_success`` per frame
    but NOT MRE; that column will be added when the SUT exposes it
    (AC-NEW-3 FDR schema). This loader expects ``mre_px`` and
    ``frame_index`` columns and raises ValueError if either is absent, so
    the FT-P-06 scenario fails loudly.

    Rows whose ``mre_px`` cell is blank are skipped.

    Args:
        csv_path: Location of the FT-P-04 evidence CSV.

    Returns:
        One record per row that carries an MRE value, in file order.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If a required column is missing from the header, or a
            row is truncated or holds a non-numeric value.
    """
    if not csv_path.exists():
        raise FileNotFoundError(
            f"FT-P-04 evidence not found at {csv_path} — run FT-P-04 first."
        )
    records: list[FrameToFrameRecord] = []
    # The csv module wants files opened with newline="" so that it can do
    # its own line-ending handling.
    with csv_path.open(newline="") as fh:
        reader = csv.DictReader(fh)
        columns = reader.fieldnames or []
        if "mre_px" not in columns:
            raise ValueError(
                "FT-P-04 evidence is missing the 'mre_px' column required by FT-P-06. "
                "The SUT must emit per-frame MRE in the FDR archive (AC-NEW-3)."
            )
        # Check up front so a missing column is a ValueError on the header,
        # not a KeyError on the first data row.
        if "frame_index" not in columns:
            raise ValueError(
                "FT-P-04 evidence is missing the 'frame_index' column required by FT-P-06."
            )
        for row in reader:
            raw_mre = row["mre_px"]
            if raw_mre is None:
                # DictReader fills the cells of a short row with None.
                raise ValueError(
                    f"FT-P-04 CSV {csv_path} line {reader.line_num}: "
                    f"truncated row {row!r}"
                )
            mre_str = raw_mre.strip()
            if not mre_str:
                # Blank MRE: the row carries no measurement, so skip it.
                continue
            try:
                frame_index = int(row["frame_index"])
                mre_px = float(mre_str)
            except (TypeError, ValueError) as exc:
                raise ValueError(
                    f"FT-P-04 CSV {csv_path} line {reader.line_num}: "
                    f"malformed row {row!r}"
                ) from exc
            records.append(FrameToFrameRecord(frame_index=frame_index, mre_px=mre_px))
    return records
def write_cross_domain_csv(
    out_path: Path,
    records: Iterable[CrossDomainRecord],
    *,
    pass_50m: dict[str, bool] | None = None,
    pass_20m: dict[str, bool] | None = None,
) -> Path:
    """Write the FT-P-05 per-image evidence CSV and return its path.

    Header: ``image_id, est_lat, est_lon, error_m, mre_px, pass_50m,
    pass_20m, pass_mre``. The lat/lon columns are written blank here (the
    scenario file fills them via ``write_csv_evidence`` from
    ``accuracy_evaluator``); this writer covers the FT-P-06-relevant
    columns only.

    Args:
        out_path: Destination file; missing parent directories are created.
        records: Cross-domain observations, written one row each.
        pass_50m: Optional per-image verdict keyed by ``image_id``; an image
            without an entry is written as ``false``.
        pass_20m: Same as ``pass_50m``, for the second threshold column.

    Returns:
        ``out_path``, unchanged.
    """
    verdicts_50m = pass_50m if pass_50m else {}
    verdicts_20m = pass_20m if pass_20m else {}

    def _flag(value: bool) -> str:
        # Booleans are serialised as lowercase words.
        return "true" if value else "false"

    def _rows():
        for rec in records:
            if rec.error_m == float("inf"):
                error_cell = "inf"
            else:
                error_cell = f"{rec.error_m:.3f}"
            yield [
                rec.image_id,
                "",  # est_lat: left blank by this writer
                "",  # est_lon: left blank by this writer
                error_cell,
                f"{rec.mre_px:.4f}",
                _flag(verdicts_50m.get(rec.image_id, False)),
                _flag(verdicts_20m.get(rec.image_id, False)),
                # Strict comparison: an MRE exactly on the budget is a fail.
                _flag(rec.mre_px < MRE_PER_IMAGE_BUDGET_PX),
            ]

    header = [
        "image_id",
        "est_lat",
        "est_lon",
        "error_m",
        "mre_px",
        "pass_50m",
        "pass_20m",
        "pass_mre",
    ]
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        writer.writerows(_rows())
    return out_path
def summarize_mre_distribution(
    records: Sequence[FrameToFrameRecord | CrossDomainRecord],
) -> dict[str, float]:
    """Return count, median, p95 and max of the records' MRE values.

    A diagnostic-logging convenience; the AC assertions do not use it.

    Args:
        records: Observations exposing an ``mre_px`` attribute.

    Returns:
        A dict with the keys ``count``, ``median``, ``p95`` and ``max``.
        For empty input ``count`` is 0.0 and the other values are NaN.
    """
    mre_values = [rec.mre_px for rec in records] if records else []
    if not mre_values:
        nan = float("nan")
        return {"count": 0.0, "median": nan, "p95": nan, "max": nan}

    p95_px = np.percentile(np.asarray(mre_values, dtype=float), 95)
    summary: dict[str, float] = {}
    summary["count"] = float(len(mre_values))
    summary["median"] = float(median(mre_values))
    summary["p95"] = float(p95_px)
    summary["max"] = float(max(mre_values))
    return summary