mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 15:51:13 +00:00
29ac16cfcb
AZ-409 (3pt) — FT-P-01 still-image frame-center accuracy: - accuracy_evaluator.py: GT loader + Vincenty error + AC-2/AC-3 pass-counts - test_ft_p_01_still_image_accuracy.py: scenario gated on frame_source_replay + sitl_observer NotImplementedError; AC-4 timeout discipline AZ-412 (3pt) — FT-P-04 Derkachi f2f registration >=95% on normal segments: - registration_classifier.py: accel-derived attitude + overlap heuristic + success ratio with AC-3 sharp-turn exclusion - test_ft_p_04_derkachi_f2f_registration.py: scenario gated on frame_source_replay + imu_replay + fdr_reader AZ-413 (3pt) — FT-P-05 + FT-P-06 cross-domain MRE budgets: - mre_evaluator.py: per-image budget (strict <2.5px) + 95th-percentile via numpy linear interp + combined report - test_ft_p_05_sat_anchor.py: cross-domain scenario, reuses accuracy_evaluator for geodesic join - test_ft_p_06_mre_budgets.py: pure piggyback on FT-P-04 + FT-P-05 CSV evidence; skips when either upstream CSV missing Tests: 325 unit tests pass (+77 vs batch 69). Reports: batch_70_report.md, batch_70_review.md (PASS). Co-authored-by: Cursor <cursoragent@cursor.com>
361 lines
12 KiB
Python
361 lines
12 KiB
Python
"""Unit tests for ``runner.helpers.accuracy_evaluator`` (FT-P-01 / AZ-409).
|
|
|
|
Covers AC-1 (per-image evaluation), AC-2 (50 m pass-count threshold ≥48),
|
|
AC-3 (20 m pass-count threshold ≥30), AC-4 (timeout discipline) and the
|
|
CSV evidence shape.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import math
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from runner.helpers.accuracy_evaluator import (
|
|
PASS_COUNT_20M_REQUIRED,
|
|
PASS_COUNT_50M_REQUIRED,
|
|
TOTAL_IMAGES_REQUIRED,
|
|
AggregateReport,
|
|
EstimateInput,
|
|
GtCoordinate,
|
|
PerImageResult,
|
|
compute_per_image,
|
|
evaluate,
|
|
load_gt_coordinates,
|
|
write_csv_evidence,
|
|
)
|
|
from runner.helpers.geo import distance_m, offset
|
|
|
|
# Repository root, resolved as the fourth ancestor of this test file's absolute path.
REPO_ROOT = Path(__file__).resolve().parents[3]
# Ground-truth coordinates CSV consumed by the loader tests below.
GT_CSV = REPO_ROOT.joinpath("_docs", "00_problem", "input_data", "coordinates.csv")
|
|
|
|
|
|
def test_load_gt_coordinates_parses_repo_csv() -> None:
    """The shipped ``coordinates.csv`` parses into ``TOTAL_IMAGES_REQUIRED`` rows.

    Also spot-checks the first row's id and coordinates and the last row's id.
    """
    # Act
    gt_rows = load_gt_coordinates(GT_CSV)

    # Assert — size first, so an empty result fails on the count, not on indexing.
    assert len(gt_rows) == TOTAL_IMAGES_REQUIRED
    first_row = gt_rows[0]
    assert first_row.image_id == "AD000001.jpg"
    assert first_row.lat_deg == pytest.approx(48.275292, abs=1e-6)
    assert first_row.lon_deg == pytest.approx(37.385220, abs=1e-6)
    last_row = gt_rows[-1]
    assert last_row.image_id == "AD000060.jpg"
|
|
|
|
|
|
def test_load_gt_coordinates_rejects_missing_file(tmp_path: Path) -> None:
    """A path that does not exist must raise ``FileNotFoundError``."""
    # Arrange — nothing is ever written to this location.
    absent_csv = tmp_path / "missing.csv"

    # Act / Assert
    with pytest.raises(FileNotFoundError):
        load_gt_coordinates(absent_csv)
|
|
|
|
|
|
def test_load_gt_coordinates_rejects_wrong_header(tmp_path: Path) -> None:
    """A CSV whose header row is not the expected one raises ``ValueError``."""
    # Arrange — a one-row CSV with unexpected column names.
    wrong_header_csv = tmp_path / "bad.csv"
    wrong_header_csv.write_text("img_name,latitude,longitude\nx,1,2\n")

    # Act / Assert
    with pytest.raises(ValueError, match="header mismatch"):
        load_gt_coordinates(wrong_header_csv)
|
|
|
|
|
|
def test_compute_per_image_zero_error_for_exact_match() -> None:
    """An estimate identical to GT gives ``error_m`` ≈ 0 and passes both budgets."""
    # Arrange — the estimate reuses the GT coordinates verbatim.
    image_id = "AD000001.jpg"
    lat_deg, lon_deg = 48.275292, 37.385220
    gt = GtCoordinate(image_id, lat_deg, lon_deg)
    est = EstimateInput(image_id, lat_deg, lon_deg)

    # Act
    outcome = compute_per_image(gt, est)

    # Assert
    assert outcome.error_m == pytest.approx(0.0, abs=1e-6)
    assert outcome.pass_50m is True
    assert outcome.pass_20m is True
|
|
|
|
|
|
def test_compute_per_image_15m_north_passes_both() -> None:
    """An estimate 15 m due north of GT is inside both the 50 m and 20 m budgets."""
    # Arrange — displace the GT point 15 m along bearing 0°.
    gt = GtCoordinate("AD000001.jpg", 48.275292, 37.385220)
    shifted = offset(gt.lat_deg, gt.lon_deg, bearing_deg=0.0, distance_m=15.0)
    est = EstimateInput("AD000001.jpg", shifted[0], shifted[1])

    # Act
    outcome = compute_per_image(gt, est)

    # Assert
    assert outcome.error_m == pytest.approx(15.0, abs=0.5)
    assert outcome.pass_50m is True
    assert outcome.pass_20m is True
|
|
|
|
|
|
def test_compute_per_image_35m_east_passes_50_only() -> None:
    """An estimate 35 m due east of GT passes the 50 m budget but not the 20 m one."""
    # Arrange — displace the GT point 35 m along bearing 90°.
    gt = GtCoordinate("AD000001.jpg", 48.275292, 37.385220)
    shifted = offset(gt.lat_deg, gt.lon_deg, bearing_deg=90.0, distance_m=35.0)
    est = EstimateInput("AD000001.jpg", shifted[0], shifted[1])

    # Act
    outcome = compute_per_image(gt, est)

    # Assert
    assert outcome.error_m == pytest.approx(35.0, abs=0.5)
    assert outcome.pass_50m is True
    assert outcome.pass_20m is False
|
|
|
|
|
|
def test_compute_per_image_120m_south_fails_both() -> None:
    """An estimate 120 m due south of GT is outside both budgets."""
    # Arrange — displace the GT point 120 m along bearing 180°.
    gt = GtCoordinate("AD000001.jpg", 48.275292, 37.385220)
    shifted = offset(gt.lat_deg, gt.lon_deg, bearing_deg=180.0, distance_m=120.0)
    est = EstimateInput("AD000001.jpg", shifted[0], shifted[1])

    # Act
    outcome = compute_per_image(gt, est)

    # Assert
    assert outcome.error_m == pytest.approx(120.0, abs=0.5)
    assert outcome.pass_50m is False
    assert outcome.pass_20m is False
|
|
|
|
|
|
def test_compute_per_image_timeout_sets_inf_and_false_flags() -> None:
    """AC-4: an infinite estimate gives infinite ``error_m`` and fails both budgets."""
    # Arrange — infinite coordinates stand in for an estimate that never arrived.
    no_fix = math.inf
    gt = GtCoordinate("AD000001.jpg", 48.275292, 37.385220)
    est = EstimateInput("AD000001.jpg", no_fix, no_fix)

    # Act — must return a result rather than raise.
    outcome = compute_per_image(gt, est)

    # Assert
    assert math.isinf(outcome.error_m)
    assert outcome.pass_50m is False
    assert outcome.pass_20m is False
|
|
|
|
|
|
def test_compute_per_image_rejects_image_id_mismatch() -> None:
    """Pairing a GT row with an estimate for a different image raises ``ValueError``."""
    # Arrange — identical coordinates, so only the ids disagree.
    lat_deg, lon_deg = 48.0, 37.0
    gt = GtCoordinate("AD000001.jpg", lat_deg, lon_deg)
    est = EstimateInput("AD000002.jpg", lat_deg, lon_deg)

    # Act / Assert
    with pytest.raises(ValueError, match="image_id mismatch"):
        compute_per_image(gt, est)
|
|
|
|
|
|
def _make_gt_with_offsets(offsets_m: list[float]) -> tuple[list[GtCoordinate], list[EstimateInput]]:
    """Return synthetic GT rows and matching estimates for the given offsets.

    Image ``i`` (1-based) is named ``AD{i:06d}.jpg`` and its GT latitude is
    ``base_lat + i * 1e-4`` at a fixed longitude. Its estimate is the GT point
    displaced ``offsets_m[i - 1]`` metres along bearing 0° via ``offset``.
    """
    base_lat, base_lon = 48.275, 37.385

    # First pass: one GT row per requested offset, numbered from 1.
    gt_rows: list[GtCoordinate] = [
        GtCoordinate(f"AD{index:06d}.jpg", base_lat + index * 1e-4, base_lon)
        for index in range(1, len(offsets_m) + 1)
    ]

    # Second pass: displace each GT point by its paired offset.
    estimates: list[EstimateInput] = [
        EstimateInput(
            gt.image_id,
            *offset(gt.lat_deg, gt.lon_deg, bearing_deg=0.0, distance_m=shift_m),
        )
        for gt, shift_m in zip(gt_rows, offsets_m)
    ]
    return gt_rows, estimates
|
|
|
|
|
|
def test_evaluate_all_pass_yields_overall_pass() -> None:
    """With every image 5 m off, both pass-counts are 60 and the run passes."""
    # Arrange — a uniform 5 m error for each required image.
    gt_rows, estimates = _make_gt_with_offsets([5.0] * TOTAL_IMAGES_REQUIRED)

    # Act
    per_image, summary = evaluate(gt_rows, estimates)

    # Assert
    assert len(per_image) == TOTAL_IMAGES_REQUIRED
    assert summary.pass_count_50m == 60
    assert summary.pass_count_20m == 60
    assert summary.timeout_count == 0
    assert summary.overall_pass is True
|
|
|
|
|
|
def test_evaluate_boundary_threshold_holds() -> None:
    """Exactly 48 within 50 m and 30 within 20 m is enough for ``overall_pass``."""
    # Arrange — 30 images at 10 m (pass both), 18 at 35 m (pass 50 m only),
    # 12 at 120 m (fail both).
    within_20m = [10.0] * 30
    within_50m_only = [35.0] * 18
    outside_both = [120.0] * 12
    gt_rows, estimates = _make_gt_with_offsets(within_20m + within_50m_only + outside_both)

    # Act
    _, summary = evaluate(gt_rows, estimates)

    # Assert
    assert summary.pass_count_50m == 48
    assert summary.pass_count_20m == 30
    assert summary.pass_ac2 is True
    assert summary.pass_ac3 is True
    assert summary.overall_pass is True
|
|
|
|
|
|
def test_evaluate_below_50m_threshold_fails_overall() -> None:
    """Only 47 of 60 within 50 m: AC-2 fails, so ``overall_pass`` is False."""
    # Arrange — 30 at 10 m plus 17 at 35 m gives 47 within 50 m; 13 at 120 m miss.
    within_20m = [10.0] * 30
    within_50m_only = [35.0] * 17
    outside_both = [120.0] * 13
    gt_rows, estimates = _make_gt_with_offsets(within_20m + within_50m_only + outside_both)

    # Act
    _, summary = evaluate(gt_rows, estimates)

    # Assert
    assert summary.pass_count_50m == 47
    assert summary.pass_ac2 is False
    assert summary.overall_pass is False
|
|
|
|
|
|
def test_evaluate_below_20m_threshold_fails_overall() -> None:
    """All 60 within 50 m but only 29 within 20 m: AC-3 fails the run."""
    # Arrange — 29 images at 10 m, the remaining 31 at 35 m.
    within_20m = [10.0] * 29
    within_50m_only = [35.0] * 31
    gt_rows, estimates = _make_gt_with_offsets(within_20m + within_50m_only)

    # Act
    _, summary = evaluate(gt_rows, estimates)

    # Assert
    assert summary.pass_count_50m == 60
    assert summary.pass_count_20m == 29
    assert summary.pass_ac3 is False
    assert summary.overall_pass is False
|
|
|
|
|
|
def test_evaluate_missing_estimate_recorded_as_timeout() -> None:
    """A GT row with no estimate is reported as a timeout (inf error, failing)."""
    # Arrange
    gt_rows, estimates = _make_gt_with_offsets([5.0] * TOTAL_IMAGES_REQUIRED)
    # Remove the 7th estimate to simulate a SITL timeout for AD000007.jpg.
    dropped_index = 6
    estimates_with_gap = estimates[:dropped_index] + estimates[dropped_index + 1:]

    # Act
    per_image, summary = evaluate(gt_rows, estimates_with_gap)

    # Assert — the result list still has a row for the image with no estimate.
    assert len(per_image) == TOTAL_IMAGES_REQUIRED
    assert summary.timeout_count == 1
    timed_out = per_image[dropped_index]
    assert timed_out.image_id == "AD000007.jpg"
    assert math.isinf(timed_out.error_m)
    assert timed_out.pass_50m is False
|
|
|
|
|
|
def test_evaluate_rejects_duplicate_estimate_image_id() -> None:
    """Supplying two estimates with the same ``image_id`` raises ``ValueError``."""
    # Arrange — append a copy of the first estimate to a two-image set.
    gt_rows, estimates = _make_gt_with_offsets([5.0] * 2)
    original = estimates[0]
    estimates.append(
        EstimateInput(original.image_id, original.est_lat_deg, original.est_lon_deg)
    )

    # Act / Assert
    with pytest.raises(ValueError, match="duplicate estimate image_ids"):
        evaluate(gt_rows, estimates)
|
|
|
|
|
|
def test_evaluate_rejects_stranger_estimate_image_id() -> None:
    """An estimate whose ``image_id`` has no GT row raises ``ValueError``."""
    # Arrange — add an estimate for an id that the two-image GT set lacks.
    gt_rows, estimates = _make_gt_with_offsets([5.0] * 2)
    unknown_estimate = EstimateInput("AD999999.jpg", 48.0, 37.0)
    estimates.append(unknown_estimate)

    # Act / Assert
    with pytest.raises(ValueError, match="not in GT"):
        evaluate(gt_rows, estimates)
|
|
|
|
|
|
def test_evaluate_full_timeout_run_produces_zero_pass_counts() -> None:
    """With no estimates for 60 GT rows, all time out and the run fails."""
    # Arrange — 60 GT rows and an empty estimate list.
    gt_rows: list[GtCoordinate] = []
    for index in range(1, 61):
        gt_rows.append(GtCoordinate(f"AD{index:06d}.jpg", 48.275 + index * 1e-4, 37.385))
    estimates: list[EstimateInput] = []

    # Act
    per_image, summary = evaluate(gt_rows, estimates)

    # Assert
    assert summary.timeout_count == 60
    assert summary.pass_count_50m == 0
    assert summary.pass_count_20m == 0
    assert summary.overall_pass is False
    assert all(math.isinf(row.error_m) for row in per_image)
|
|
|
|
|
|
def test_aggregate_report_thresholds_match_results_report() -> None:
    """Pin the module thresholds to the results_report.md values (48 / 30 / 60)."""
    # Assert — compared as one tuple so a failure shows all three values together.
    actual = (PASS_COUNT_50M_REQUIRED, PASS_COUNT_20M_REQUIRED, TOTAL_IMAGES_REQUIRED)
    assert actual == (48, 30, 60)
|
|
|
|
|
|
def test_write_csv_evidence_round_trip(tmp_path: Path) -> None:
    """The evidence CSV has the expected header, row count and pass-flag text.

    Three images (5 m, 35 m and 120 m off) are evaluated and written with
    ``write_csv_evidence``. The file is then read back with ``csv.reader``.
    """
    # Arrange
    offsets = [5.0, 35.0, 120.0]
    gt_rows, estimates = _make_gt_with_offsets(offsets)
    results, _ = evaluate(gt_rows, estimates)
    out_path = tmp_path / "ft-p-01.csv"

    # Act
    written = write_csv_evidence(out_path, results)

    # Assert
    assert written == out_path
    # Read inside a context manager so the handle is closed deterministically;
    # newline="" is what the csv module documents for files handed to csv.reader.
    with out_path.open(newline="") as handle:
        rows = list(csv.reader(handle))
    assert rows[0] == [
        "image_id",
        "gt_lat",
        "gt_lon",
        "est_lat",
        "est_lon",
        "error_m",
        "pass_50m",
        "pass_20m",
    ]
    assert len(rows) == 1 + len(offsets)
    # AD000003 had a 120 m offset → pass_50m=false, pass_20m=false
    far_row = rows[3]
    assert far_row[0] == "AD000003.jpg"
    assert far_row[6] == "false"
    assert far_row[7] == "false"
|
|
|
|
|
|
def test_write_csv_evidence_serializes_timeout_as_inf(tmp_path: Path) -> None:
    """A timeout row is written with the literal ``inf`` in est_lat/est_lon/error_m."""
    # Arrange — hand-build the timeout result so evaluate() is not involved.
    gt = GtCoordinate("AD000001.jpg", 48.275, 37.385)
    timeout = PerImageResult(
        image_id="AD000001.jpg",
        gt_lat=gt.lat_deg,
        gt_lon=gt.lon_deg,
        est_lat=math.inf,
        est_lon=math.inf,
        error_m=math.inf,
        pass_50m=False,
        pass_20m=False,
    )
    out_path = tmp_path / "ft-p-01.csv"

    # Act
    write_csv_evidence(out_path, [timeout])

    # Assert
    # Read inside a context manager so the handle is closed deterministically;
    # newline="" is what the csv module documents for files handed to csv.reader.
    with out_path.open(newline="") as handle:
        rows = list(csv.reader(handle))
    # rows[0] is the header; columns 3-5 are est_lat, est_lon, error_m.
    assert rows[1][3] == "inf"
    assert rows[1][4] == "inf"
    assert rows[1][5] == "inf"
|