mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:31:13 +00:00
c6e6cba237
- AZ-414 (FT-P-07 + FT-N-02): sharp_turn_detector helper covering AC-1 (gyro_z run detection + synthetic-overlay fallback), AC-2/AC-3 (FT-N-02 during-turn label + monotonic covariance), AC-4/AC-5/AC-6 (FT-P-07 recovery lag/drift/heading); twin scenario files under positive/ and negative/. - AZ-415 (FT-P-08): multi_segment_evaluator helper + scenario. - AZ-418 (FT-P-10): smoothing_evaluator helper covering AC-1 (raw + smoothed pose pairing), AC-2 (improvement rate >= 0.80), AC-3 (mean improvement >= 5 m); scenario file. - All scenarios skip-gated on upstream frame_source_replay / imu_replay / fdr_reader stubs (auto-activate when AZ-441 + AZ-407 leftovers land). - +68 unit tests; full e2e unit suite: 393 passed. See _docs/03_implementation/batch_71_report.md and _docs/03_implementation/reviews/batch_71_review.md. Co-authored-by: Cursor <cursoragent@cursor.com>
285 lines
8.5 KiB
Python
285 lines
8.5 KiB
Python
"""Unit tests for ``runner.helpers.smoothing_evaluator`` (FT-P-10 / AZ-418).
|
|
|
|
Covers AC-2 (improvement rate ≥0.80), AC-3 (mean improvement ≥5 m), and
|
|
the FDR pairing discipline (raw + smoothed per keyframe, no dupes).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from runner.helpers.geo import offset
|
|
from runner.helpers.smoothing_evaluator import (
|
|
IMPROVEMENT_RATE_REQUIRED,
|
|
MEAN_IMPROVEMENT_M_REQUIRED,
|
|
GtPose,
|
|
KeyframePair,
|
|
KeyframePoseRecord,
|
|
SmoothingReport,
|
|
evaluate,
|
|
pair_records,
|
|
resolve_gt_at,
|
|
write_csv_evidence,
|
|
)
|
|
|
|
|
|
def _gt_track(n: int = 60, dt_ms: int = 100) -> list[GtPose]:
    """Build ``n`` GT poses spaced ``dt_ms`` apart along a constant-longitude line.

    With the defaults this yields 60 samples at 100 ms spacing (10 Hz for 6 s).
    Latitude starts at 48.275 and grows by 1e-4 degrees per sample; longitude
    is fixed at 37.385 (Derkachi-ish base point).
    """
    poses: list[GtPose] = []
    for idx in range(n):
        poses.append(
            GtPose(
                monotonic_ms=idx * dt_ms,
                lat_deg=48.275 + idx * 1e-4,
                lon_deg=37.385,
            )
        )
    return poses
|
|
|
|
|
|
def _raw_smoothed_pair(
    keyframe_id: int,
    gt: GtPose,
    raw_offset_m: float,
    smoothed_offset_m: float,
) -> tuple[KeyframePoseRecord, KeyframePoseRecord]:
    """Return a ``(raw, smoothed)`` record pair for a single keyframe.

    Both positions are the GT pose displaced along bearing 0.0 (north) by the
    requested distance. The raw record reuses the GT timestamp; the smoothed
    record is stamped 500 ms later to mimic the later window-exit emission.
    """
    raw_position_lat, raw_position_lon = offset(
        gt.lat_deg,
        gt.lon_deg,
        bearing_deg=0.0,
        distance_m=raw_offset_m,
    )
    smoothed_position_lat, smoothed_position_lon = offset(
        gt.lat_deg,
        gt.lon_deg,
        bearing_deg=0.0,
        distance_m=smoothed_offset_m,
    )

    raw_record = KeyframePoseRecord(
        keyframe_id=keyframe_id,
        pose_kind="raw",
        monotonic_ms=gt.monotonic_ms,
        lat_deg=raw_position_lat,
        lon_deg=raw_position_lon,
    )
    # The smoothed pose leaves the smoothing window later than the raw one.
    smoothed_record = KeyframePoseRecord(
        keyframe_id=keyframe_id,
        pose_kind="smoothed",
        monotonic_ms=gt.monotonic_ms + 500,
        lat_deg=smoothed_position_lat,
        lon_deg=smoothed_position_lon,
    )
    return raw_record, smoothed_record
|
|
|
|
|
|
def test_constants_match_spec() -> None:
    """Pin the exported AC-2 / AC-3 thresholds to the values in the spec text."""
    # Assert
    thresholds = (IMPROVEMENT_RATE_REQUIRED, MEAN_IMPROVEMENT_M_REQUIRED)
    assert thresholds == (0.80, 5.0)
|
|
|
|
|
|
def test_resolve_gt_at_picks_nearest() -> None:
    """A query timestamp between two GT samples resolves to the closer sample."""
    # Arrange
    gt_samples = _gt_track()
    query_ms = 523

    # Act
    closest = resolve_gt_at(monotonic_ms=query_ms, gt_track=gt_samples)

    # Assert — the track is sampled every 100 ms, so 500 ms is closer than 600 ms.
    assert closest.monotonic_ms == 500
|
|
|
|
|
|
def test_resolve_gt_at_rejects_empty_track() -> None:
    """An empty GT track is rejected with a ``ValueError``."""
    # Act / Assert
    empty_track: list[GtPose] = []
    with pytest.raises(ValueError, match="gt_track is empty"):
        resolve_gt_at(monotonic_ms=0, gt_track=empty_track)
|
|
|
|
|
|
def test_pair_records_groups_by_keyframe() -> None:
    """A raw and a smoothed record sharing a keyframe id land in one tuple."""
    # Arrange
    first_gt = _gt_track()[0]
    raw_rec, smoothed_rec = _raw_smoothed_pair(
        7,
        first_gt,
        raw_offset_m=10.0,
        smoothed_offset_m=3.0,
    )

    # Act
    grouped = pair_records([raw_rec, smoothed_rec])

    # Assert
    expected = {7: (raw_rec, smoothed_rec)}
    assert grouped == expected
|
|
|
|
|
|
def test_pair_records_keeps_orphans_partial() -> None:
    """A smoothed record with no raw counterpart maps to ``(None, smoothed)``."""
    # Arrange
    first_gt = _gt_track()[0]
    _, orphan_smoothed = _raw_smoothed_pair(
        7,
        first_gt,
        raw_offset_m=10.0,
        smoothed_offset_m=3.0,
    )

    # Act
    grouped = pair_records([orphan_smoothed])

    # Assert
    expected = {7: (None, orphan_smoothed)}
    assert grouped == expected
|
|
|
|
|
|
def test_pair_records_rejects_duplicate_pose_kind() -> None:
    """Two raw records for the same keyframe id raise ``ValueError``."""
    # Arrange — two raw records for keyframe 7, differing only in their offset.
    first_gt = _gt_track()[0]
    duplicate_raws = [
        _raw_smoothed_pair(7, first_gt, raw_offset_m=raw_m, smoothed_offset_m=3.0)[0]
        for raw_m in (10.0, 8.0)
    ]

    # Act / Assert
    with pytest.raises(ValueError, match="duplicate raw pose"):
        pair_records(duplicate_raws)
|
|
|
|
|
|
def test_pair_records_rejects_unknown_pose_kind() -> None:
    """A record whose ``pose_kind`` is neither raw nor smoothed raises ``ValueError``."""
    # Arrange
    unknown_kind_record = KeyframePoseRecord(
        keyframe_id=1,
        pose_kind="filtered",
        monotonic_ms=0,
        lat_deg=0.0,
        lon_deg=0.0,
    )

    # Act / Assert
    with pytest.raises(ValueError, match="unknown pose_kind 'filtered'"):
        pair_records([unknown_kind_record])
|
|
|
|
|
|
def test_evaluate_all_smoothed_wins_passes() -> None:
    """When smoothed beats raw on every keyframe the report passes with rate 1.0."""
    # Arrange — 20 keyframes, each raw 15 m off and smoothed 2 m off (13 m gain).
    gt_samples = _gt_track()
    pose_records: list[KeyframePoseRecord] = [
        record
        for kf_id, gt_pose in enumerate(gt_samples[:20])
        for record in _raw_smoothed_pair(
            kf_id, gt_pose, raw_offset_m=15.0, smoothed_offset_m=2.0
        )
    ]

    # Act
    report = evaluate(pose_records, gt_samples)

    # Assert
    assert report.pair_count == 20
    assert report.improvement_rate == 1.0
    assert report.mean_improvement_m == pytest.approx(13.0, abs=1.0)
    assert report.passes is True
|
|
|
|
|
|
def test_evaluate_at_80_pct_improvement_rate_passes() -> None:
    """Exactly 80% wins with a mean gain above 5 m satisfies both AC-2 and AC-3."""
    # Arrange — 10 keyframes: the first 8 gain 10 m, the last 2 lose 1 m.
    gt_samples = _gt_track()
    pose_records: list[KeyframePoseRecord] = []
    for kf_id, gt_pose in enumerate(gt_samples[:10]):
        raw_m, smoothed_m = (12.0, 2.0) if kf_id < 8 else (2.0, 3.0)
        pose_records.extend(
            _raw_smoothed_pair(
                kf_id, gt_pose, raw_offset_m=raw_m, smoothed_offset_m=smoothed_m
            )
        )

    # Act
    report = evaluate(pose_records, gt_samples)

    # Assert
    assert report.improvement_rate == pytest.approx(0.80, abs=1e-6)
    assert report.passes_rate is True
    # Expected mean: (8 * 10 m + 2 * -1 m) / 10 = 7.8 m.
    assert report.mean_improvement_m == pytest.approx(7.8, abs=1.0)
    assert report.passes_mean is True
    assert report.passes is True
|
|
|
|
|
|
def test_evaluate_below_80_pct_fails_overall() -> None:
    """A 79% win rate misses the AC-2 threshold, so the overall verdict fails."""
    # Arrange — 100 keyframes: the first 79 win, the remaining 21 lose.
    gt_samples = _gt_track(n=100)
    pose_records: list[KeyframePoseRecord] = []
    for kf_id, gt_pose in enumerate(gt_samples):
        raw_m, smoothed_m = (15.0, 2.0) if kf_id < 79 else (2.0, 3.0)
        pose_records.extend(
            _raw_smoothed_pair(
                kf_id, gt_pose, raw_offset_m=raw_m, smoothed_offset_m=smoothed_m
            )
        )

    # Act
    report = evaluate(pose_records, gt_samples)

    # Assert
    assert report.improvement_rate == pytest.approx(0.79)
    assert report.passes_rate is False
    assert report.passes is False
|
|
|
|
|
|
def test_evaluate_mean_improvement_below_5m_fails() -> None:
    """A perfect win rate still fails when the mean gain is only about 3 m (AC-3)."""
    # Arrange — 20 keyframes, each raw 8 m off and smoothed 5 m off (3 m gain).
    gt_samples = _gt_track()
    pose_records: list[KeyframePoseRecord] = [
        record
        for kf_id, gt_pose in enumerate(gt_samples[:20])
        for record in _raw_smoothed_pair(
            kf_id, gt_pose, raw_offset_m=8.0, smoothed_offset_m=5.0
        )
    ]

    # Act
    report = evaluate(pose_records, gt_samples)

    # Assert
    assert report.improvement_rate == 1.0
    assert report.mean_improvement_m == pytest.approx(3.0, abs=0.5)
    assert report.passes_mean is False
    assert report.passes is False
|
|
|
|
|
|
def test_evaluate_excludes_unpaired_keyframes() -> None:
    """A keyframe that has only a raw record contributes no pair to the report."""
    # Arrange — keyframe 0 has both records; keyframe 1 has a raw record only.
    gt_samples = _gt_track()
    complete_pair = _raw_smoothed_pair(
        0, gt_samples[0], raw_offset_m=10.0, smoothed_offset_m=2.0
    )
    lone_raw = _raw_smoothed_pair(
        1, gt_samples[1], raw_offset_m=10.0, smoothed_offset_m=2.0
    )[0]

    # Act
    report = evaluate([*complete_pair, lone_raw], gt_samples)

    # Assert
    assert report.pair_count == 1
    assert report.pairs[0].keyframe_id == 0
|
|
|
|
|
|
def test_evaluate_empty_records_does_not_pass() -> None:
    """With no records there are zero pairs and none of the verdict flags pass."""
    # Arrange
    gt_samples = _gt_track()
    no_records: list[KeyframePoseRecord] = []

    # Act
    report = evaluate(no_records, gt_samples)

    # Assert
    assert report.pair_count == 0
    assert report.passes_rate is False
    assert report.passes_mean is False
    assert report.passes is False
|
|
|
|
|
|
def test_evaluate_rejects_empty_gt_track() -> None:
    """``evaluate`` raises ``ValueError`` when the GT track is empty."""
    # Act / Assert
    no_records: list[KeyframePoseRecord] = []
    empty_track: list[GtPose] = []
    with pytest.raises(ValueError, match="gt_track must not be empty"):
        evaluate(no_records, empty_track)
|
|
|
|
|
|
def test_write_csv_evidence_round_trip(tmp_path: Path) -> None:
    """The evidence CSV starts with the expected header, followed by pair rows.

    Builds a one-pair report, writes it with ``write_csv_evidence`` and reads
    the file back, checking the exact header and that the first data row marks
    the smoothed pose as the winner.
    """
    # Arrange
    track = _gt_track()
    raw, sm = _raw_smoothed_pair(0, track[0], raw_offset_m=15.0, smoothed_offset_m=2.0)
    report = evaluate([raw, sm], track)
    out = tmp_path / "ft-p-10.csv"

    # Act
    write_csv_evidence(out, report)

    # Assert
    # Read inside a context manager so the handle is closed deterministically
    # (no ResourceWarning); newline="" lets the csv module handle line endings.
    with out.open(newline="") as handle:
        rows = list(csv.reader(handle))
    assert rows[0] == [
        "keyframe_id",
        "raw_lat",
        "raw_lon",
        "smoothed_lat",
        "smoothed_lon",
        "gt_lat",
        "gt_lon",
        "raw_error_m",
        "smoothed_error_m",
        "improvement_m",
        "smoothed_wins",
    ]
    assert rows[1][-1] == "true"
|