Files
Oleksandr Bezdieniezhnykh 29ac16cfcb [AZ-409] [AZ-412] [AZ-413] Batch 70: FT-P-01/04/05/06 scenarios
AZ-409 (3pt) — FT-P-01 still-image frame-center accuracy:
- accuracy_evaluator.py: GT loader + Vincenty error + AC-2/AC-3 pass-counts
- test_ft_p_01_still_image_accuracy.py: scenario gated on frame_source_replay
  + sitl_observer NotImplementedError; AC-4 timeout discipline

AZ-412 (3pt) — FT-P-04 Derkachi f2f registration >=95% on normal segments:
- registration_classifier.py: accel-derived attitude + overlap heuristic
  + success ratio with AC-3 sharp-turn exclusion
- test_ft_p_04_derkachi_f2f_registration.py: scenario gated on
  frame_source_replay + imu_replay + fdr_reader

AZ-413 (3pt) — FT-P-05 + FT-P-06 cross-domain MRE budgets:
- mre_evaluator.py: per-image budget (strict <2.5px) + 95th-percentile
  via numpy linear interp + combined report
- test_ft_p_05_sat_anchor.py: cross-domain scenario, reuses
  accuracy_evaluator for geodesic join
- test_ft_p_06_mre_budgets.py: pure piggyback on FT-P-04 + FT-P-05 CSV
  evidence; skips when either upstream CSV missing

Tests: 325 unit tests pass (+77 vs batch 69).
Reports: batch_70_report.md, batch_70_review.md (PASS).
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 18:10:46 +03:00

412 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Unit tests for ``runner.helpers.registration_classifier`` (FT-P-04 / AZ-412).
Covers AC-1 (normal-segment classification reproducibility), AC-2
(success ratio ≥0.95), AC-3 (sharp-turn exclusion from denominator),
and the CSV evidence shape.
"""
from __future__ import annotations
import csv
import math
from pathlib import Path
import pytest
from runner.helpers.registration_classifier import (
ATTITUDE_LIMIT_DEG,
DEFAULT_GROUND_FOOTPRINT_M,
IMU_HZ,
SUCCESS_RATIO_REQUIRED,
TARGET_OVERLAP_FRACTION,
VIDEO_FPS,
VIDEO_FRAMES_PER_IMU_ROW,
FrameAttitude,
FrameClassification,
ImuTelemetryRow,
SuccessReport,
classify_frames,
compute_attitude,
compute_overlap_fraction,
compute_success_ratio,
compute_translation_m,
load_imu_telemetry,
write_csv_evidence,
)
# Fourth ancestor of this file's resolved path, i.e. three levels above the
# directory that contains this file. Presumably the repository checkout
# root — confirm if this test module is ever moved.
REPO_ROOT = Path(__file__).resolve().parents[3]
# Derkachi flight IMU telemetry CSV under the ``_docs`` tree; the loader and
# AC-1 reproducibility tests in this module read it directly.
DERKACHI_IMU_CSV = REPO_ROOT / "_docs" / "00_problem" / "input_data" / "flight_derkachi" / "data_imu.csv"
def _level_row(time_s: float = 0.0) -> ImuTelemetryRow:
    """Build a synthetic level-cruise IMU sample stamped at ``time_s`` seconds.

    The accelerometer reads only gravity on z (-1000 mg) and the velocity is
    10 m/s (1000 cm/s) along +x, i.e. east; ``timestamp_ms`` is ``time_s``
    scaled to milliseconds.
    """
    sample = {
        "timestamp_ms": time_s * 1000.0,
        "time_s": time_s,
        "xacc": 0,
        "yacc": 0,
        "zacc": -1000,
        "vx_cms": 1000.0,
        "vy_cms": 0.0,
        "vz_cms": 0.0,
    }
    return ImuTelemetryRow(**sample)
def _rolled_row(time_s: float, roll_deg: float) -> ImuTelemetryRow:
    """Build a synthetic IMU sample rolled by ``roll_deg`` about the +x axis.

    The 1000 mg gravity vector is split between the y and z accelerometer
    axes using the sine and cosine of the roll angle; velocity stays at
    1000 cm/s along +x.
    """
    roll_rad = math.radians(roll_deg)
    lateral_mg = int(round(-1000.0 * math.sin(roll_rad)))
    vertical_mg = int(round(-1000.0 * math.cos(roll_rad)))
    return ImuTelemetryRow(
        timestamp_ms=time_s * 1000.0,
        time_s=time_s,
        xacc=0,
        yacc=lateral_mg,
        zacc=vertical_mg,
        vx_cms=1000.0,
        vy_cms=0.0,
        vz_cms=0.0,
    )
def _pitched_row(time_s: float, pitch_deg: float) -> ImuTelemetryRow:
    """Build a synthetic IMU sample pitched by ``pitch_deg`` (positive = nose down).

    The 1000 mg gravity vector is split between the x and z accelerometer
    axes using the sine and cosine of the pitch angle; velocity stays at
    1000 cm/s along +x.
    """
    pitch_rad = math.radians(pitch_deg)
    longitudinal_mg = int(round(-1000.0 * math.sin(pitch_rad)))
    vertical_mg = int(round(-1000.0 * math.cos(pitch_rad)))
    return ImuTelemetryRow(
        timestamp_ms=time_s * 1000.0,
        time_s=time_s,
        xacc=longitudinal_mg,
        yacc=0,
        zacc=vertical_mg,
        vx_cms=1000.0,
        vy_cms=0.0,
        vz_cms=0.0,
    )
def test_load_imu_telemetry_parses_repo_csv() -> None:
    """The shipped ``data_imu.csv`` loads as exactly 4900 rows starting at t=0."""
    # Act
    telemetry = load_imu_telemetry(DERKACHI_IMU_CSV)
    # Assert — results_report.md says "4,900 nonblank rows".
    assert len(telemetry) == 4900
    first = telemetry[0]
    assert first.time_s == pytest.approx(0.0, abs=1e-9)
    # Accelerometer triple of the first data row, as inspected in the file.
    assert first.xacc == 21
    assert first.yacc == -3
    assert first.zacc == -984
def test_load_imu_telemetry_rejects_missing_file(tmp_path: Path) -> None:
    """Loading a path that does not exist raises ``FileNotFoundError``."""
    # Arrange
    absent_csv = tmp_path / "missing.csv"
    # Act / Assert
    with pytest.raises(FileNotFoundError):
        load_imu_telemetry(absent_csv)
def test_load_imu_telemetry_rejects_missing_columns(tmp_path: Path) -> None:
    """A CSV carrying only the two time columns is rejected with ``ValueError``."""
    # Arrange — header has timestamp/time only; accel and velocity are absent.
    truncated_csv = tmp_path / "bad.csv"
    truncated_csv.write_text("timestamp(ms),Time\n100,0.1\n")
    # Act / Assert
    with pytest.raises(ValueError, match="missing columns"):
        load_imu_telemetry(truncated_csv)
def test_compute_attitude_level_row_within_one_degree() -> None:
    """A synthetic level-cruise row → bank and pitch both within ±1°."""
    # Arrange
    level = _level_row()
    # Act
    result = compute_attitude(level)
    # Assert
    assert -1.0 < result.bank_deg < 1.0
    assert -1.0 < result.pitch_deg < 1.0
def test_compute_attitude_right_roll_30_deg_round_trip() -> None:
    """A row built with +30° roll → bank ≈ +30° and negligible pitch."""
    # Arrange
    banked_right = _rolled_row(time_s=0.1, roll_deg=30.0)
    # Act
    result = compute_attitude(banked_right)
    # Assert
    assert result.bank_deg == pytest.approx(30.0, abs=0.5)
    assert -0.5 < result.pitch_deg < 0.5
def test_compute_attitude_left_roll_30_deg_round_trip() -> None:
    """A row built with -30° roll → bank ≈ -30°."""
    # Arrange
    banked_left = _rolled_row(time_s=0.1, roll_deg=-30.0)
    # Act
    result = compute_attitude(banked_left)
    # Assert
    assert result.bank_deg == pytest.approx(-30.0, abs=0.5)
def test_compute_attitude_pitch_down_15_deg_round_trip() -> None:
    """A row built 15° nose-down → pitch ≈ +15°."""
    # Arrange
    nose_down = _pitched_row(time_s=0.1, pitch_deg=15.0)
    # Act
    result = compute_attitude(nose_down)
    # Assert
    assert result.pitch_deg == pytest.approx(15.0, abs=0.5)
def test_compute_translation_m_uses_per_frame_dt() -> None:
    """Per-frame translation equals horizontal speed times one 30 fps frame period."""
    # Arrange — 10 m/s east cruise (1000 cm/s on vx only).
    cruise = ImuTelemetryRow(0.0, 0.0, 0, 0, -1000, vx_cms=1000.0, vy_cms=0.0, vz_cms=0.0)
    expected_m = 10.0 / 30.0
    # Act
    moved_m = compute_translation_m(cruise, prev_row=None)
    # Assert — 10 m/s × (1/30 s) ≈ 0.333 m
    assert moved_m == pytest.approx(expected_m, rel=1e-6)
def test_compute_overlap_fraction_full_overlap_when_translation_zero() -> None:
    """No translation between frames → overlap fraction of 1.0."""
    # Arrange
    footprint_m = 147.0
    # Act
    fraction = compute_overlap_fraction(translation_m=0.0, ground_footprint_m=footprint_m)
    # Assert
    assert fraction == pytest.approx(1.0)
def test_compute_overlap_fraction_half_overlap_at_half_footprint() -> None:
    """A shift of half the ground footprint leaves 50% overlap."""
    # Arrange — 73.5 m is exactly half of the 147 m footprint.
    footprint_m = 147.0
    shift_m = 73.5
    # Act
    fraction = compute_overlap_fraction(translation_m=shift_m, ground_footprint_m=footprint_m)
    # Assert
    assert fraction == pytest.approx(0.5, abs=1e-6)
def test_compute_overlap_fraction_clamped_at_zero() -> None:
    """A shift larger than the footprint yields exactly 0.0, never a negative value."""
    # Arrange — 300 m is more than twice the 147 m footprint.
    footprint_m = 147.0
    shift_m = 300.0
    # Act
    fraction = compute_overlap_fraction(translation_m=shift_m, ground_footprint_m=footprint_m)
    # Assert
    assert fraction == 0.0
def test_compute_overlap_fraction_rejects_zero_footprint() -> None:
    """A zero ground footprint is rejected with a ``ValueError``."""
    # Arrange
    expected_message = "ground_footprint_m must be > 0"
    # Act / Assert
    with pytest.raises(ValueError, match=expected_message):
        compute_overlap_fraction(translation_m=1.0, ground_footprint_m=0.0)
def test_classify_frames_expands_each_imu_row_to_three_video_frames() -> None:
    """Each IMU row yields ``VIDEO_FRAMES_PER_IMU_ROW`` (= 3) consecutive video frames."""
    # Arrange — two level rows, 0.1 s apart.
    imu_rows = [_level_row(time_s=0.0), _level_row(time_s=0.1)]
    # Act
    frames = classify_frames(imu_rows)
    # Assert
    assert len(frames) == 2 * VIDEO_FRAMES_PER_IMU_ROW == 6
    frame_indices = [frame.frame_index for frame in frames]
    source_rows = [frame.imu_row_index for frame in frames]
    assert frame_indices == list(range(6))
    assert source_rows == [0] * 3 + [1] * 3
def test_classify_frames_marks_level_cruise_as_normal() -> None:
    """Ten level-cruise rows produce only normal frames with no exclusion reason."""
    # Arrange — 10 rows of level cruise at 0.1 s spacing.
    imu_rows = [_level_row(time_s=0.1 * step) for step in range(10)]
    # Act
    frames = classify_frames(imu_rows)
    # Assert
    abnormal = [frame for frame in frames if not frame.is_normal]
    with_reason = [frame for frame in frames if frame.excluded_reason != ""]
    assert not abnormal
    assert not with_reason
def test_classify_frames_excludes_sharp_roll() -> None:
    """Only the frames of a single 25° roll row are excluded; its neighbours stay normal."""
    # Arrange — rows 0-2 level, row 3 rolled 25°, rows 4-6 level.
    sharp_row_index = 3
    imu_rows = [_level_row(time_s=0.1 * step) for step in range(3)]
    imu_rows.append(_rolled_row(time_s=0.3, roll_deg=25.0))
    imu_rows.extend(_level_row(time_s=0.1 * step) for step in range(4, 7))
    # Act
    frames = classify_frames(imu_rows)
    # Assert
    sharp_frames = []
    other_frames = []
    for frame in frames:
        if frame.imu_row_index == sharp_row_index:
            sharp_frames.append(frame)
        else:
            other_frames.append(frame)
    assert len(sharp_frames) == VIDEO_FRAMES_PER_IMU_ROW
    assert all(not frame.is_normal for frame in sharp_frames)
    assert all(frame.excluded_reason == "attitude_exceeds_limit" for frame in sharp_frames)
    assert all(frame.is_normal for frame in other_frames)
def test_classify_frames_is_reproducible_ac1() -> None:
    """AC-1: classifying the same telemetry twice gives equal results."""
    # Arrange — first 100 rows of the real Derkachi telemetry.
    telemetry = load_imu_telemetry(DERKACHI_IMU_CSV)
    sample = telemetry[:100]
    # Act
    first_pass = classify_frames(sample)
    second_pass = classify_frames(sample)
    # Assert
    assert first_pass == second_pass
def test_classify_frames_rejects_invalid_overlap_threshold() -> None:
    """A ``min_overlap_fraction`` of 1.5 is rejected with a ``ValueError``."""
    # Arrange
    imu_rows = [_level_row()]
    # Act / Assert
    with pytest.raises(ValueError, match="min_overlap_fraction"):
        classify_frames(imu_rows, min_overlap_fraction=1.5)
def test_classify_frames_rejects_invalid_attitude_limit() -> None:
    """An ``attitude_limit_deg`` of 0.0 is rejected with a ``ValueError``."""
    # Arrange
    imu_rows = [_level_row()]
    # Act / Assert
    with pytest.raises(ValueError, match="attitude_limit_deg"):
        classify_frames(imu_rows, attitude_limit_deg=0.0)
def test_compute_success_ratio_perfect_run_passes() -> None:
    """102 normal frames, all registered successfully → ratio 1.0; passes."""
    # Arrange — 34 level rows × 3 frames per row = 102 frames.
    imu_rows = [_level_row(time_s=0.1 * step) for step in range(34)]
    frames = classify_frames(imu_rows)
    all_succeeded = dict.fromkeys((frame.frame_index for frame in frames), True)
    # Act
    report = compute_success_ratio(frames, all_succeeded)
    # Assert
    total = len(frames)
    assert report.denominator == total
    assert report.success_count == total
    assert report.ratio == 1.0
    assert report.passes is True
    assert report.excluded_count == 0
def test_compute_success_ratio_at_95_pct_passes() -> None:
    """A success ratio of exactly 0.95 still satisfies AC-2."""
    # Arrange — 7 rows × 3 = 21 frames, trimmed to 20; only the first one fails.
    imu_rows = [_level_row(time_s=0.1 * step) for step in range(7)]
    frames = classify_frames(imu_rows)[:20]
    outcomes = {
        frame.frame_index: position > 0
        for position, frame in enumerate(frames)
    }
    # Act
    report = compute_success_ratio(frames, outcomes)
    # Assert — 19/20 = 0.95.
    assert report.denominator == 20
    assert report.success_count == 19
    assert report.ratio == pytest.approx(0.95)
    assert report.passes is True
def test_compute_success_ratio_below_95_pct_fails() -> None:
    """A success ratio of 0.94 falls short of AC-2."""
    # Arrange — 34 rows × 3 = 102 frames, trimmed to 100; the first 6 fail.
    failed_frames = 6
    imu_rows = [_level_row(time_s=0.1 * step) for step in range(34)]
    frames = classify_frames(imu_rows)[:100]
    outcomes = {
        frame.frame_index: position >= failed_frames
        for position, frame in enumerate(frames)
    }
    # Act
    report = compute_success_ratio(frames, outcomes)
    # Assert — 94/100 = 0.94.
    assert report.denominator == 100
    assert report.ratio == pytest.approx(0.94)
    assert report.passes is False
def test_compute_success_ratio_excludes_sharp_turn_from_denominator_ac3() -> None:
    """AC-3: frames from sharp-turn rows are left out of the denominator."""
    # Arrange — 15 IMU rows (45 frames): rows 5-9 are rolled 30°, the rest level.
    imu_rows = []
    for step in range(15):
        stamp_s = 0.1 * step
        if 5 <= step < 10:
            imu_rows.append(_rolled_row(time_s=stamp_s, roll_deg=30.0))
        else:
            imu_rows.append(_level_row(time_s=stamp_s))
    frames = classify_frames(imu_rows)
    all_succeeded = dict.fromkeys((frame.frame_index for frame in frames), True)
    # Act
    report = compute_success_ratio(frames, all_succeeded)
    # Assert — 30 normal video frames; 15 excluded by attitude.
    assert report.denominator == 30
    assert report.excluded_by_attitude == 15
    assert report.excluded_by_overlap == 0
    assert report.excluded_by_missing_metric == 0
def test_compute_success_ratio_handles_missing_metric_separately() -> None:
    """Normal frames absent from the success map are counted as 'missing metric'."""
    # Arrange — 5 level rows; the first three frames get no success entry.
    missing = 3
    imu_rows = [_level_row(time_s=0.1 * step) for step in range(5)]
    frames = classify_frames(imu_rows)
    partial_outcomes = dict.fromkeys(
        (frame.frame_index for frame in frames[missing:]), True
    )
    # Act
    report = compute_success_ratio(frames, partial_outcomes)
    # Assert
    assert report.excluded_by_missing_metric == missing
    assert report.denominator == len(frames) - missing
def test_constants_match_spec() -> None:
    """Module-level constants carry the values written in the AC text."""
    # Arrange — (actual, expected) pairs, in the same order as the AC text.
    pinned = (
        (ATTITUDE_LIMIT_DEG, 10.0),
        (TARGET_OVERLAP_FRACTION, 0.40),
        (SUCCESS_RATIO_REQUIRED, 0.95),
        (VIDEO_FPS, 30),
        (IMU_HZ, 10),
        (VIDEO_FRAMES_PER_IMU_ROW, 3),
    )
    # Assert
    for actual, expected in pinned:
        assert actual == expected
    # The footprint has no pinned value; it only has to be positive.
    assert DEFAULT_GROUND_FOOTPRINT_M > 0
def test_write_csv_evidence_round_trip(tmp_path: Path) -> None:
    """CSV header + per-frame row written exactly as specified.

    Writes evidence for two level IMU rows (six video frames) and reads the
    file back, checking the header, the row count, and that a failed
    registration is serialised as ``"false"``.
    """
    # Arrange
    rows = [_level_row(time_s=0.1 * i) for i in range(2)]
    classifications = classify_frames(rows)
    success_map = {0: True, 1: False, 2: True, 3: True, 4: True, 5: True}
    out_path = tmp_path / "ft-p-04.csv"
    # Act
    write_csv_evidence(out_path, classifications, success_map)
    # Assert — read back inside a context manager so the handle is closed
    # deterministically; ``newline=""`` is what the csv module expects.
    with out_path.open(newline="") as handle:
        written = list(csv.reader(handle))
    assert written[0] == [
        "frame_index",
        "imu_row_index",
        "bank_deg",
        "pitch_deg",
        "translation_m",
        "overlap_fraction",
        "is_normal",
        "excluded_reason",
        "registration_success",
    ]
    assert len(written) == 1 + len(classifications)
    # frame 1 (second data row, column 8) must have registration_success=false.
    assert written[2][8] == "false"
def test_write_csv_evidence_omits_metric_when_missing(tmp_path: Path) -> None:
    """Frames without a success-map entry emit an empty registration_success cell.

    Writes evidence for one level IMU row with an empty success map and
    checks that column 8 of every data row is the empty string.
    """
    # Arrange
    rows = [_level_row(time_s=0.0)]
    classifications = classify_frames(rows)
    out_path = tmp_path / "ft-p-04-empty.csv"
    # Act
    write_csv_evidence(out_path, classifications, {})
    # Assert — read back inside a context manager so the handle is closed
    # deterministically; ``newline=""`` is what the csv module expects.
    with out_path.open(newline="") as handle:
        written = list(csv.reader(handle))
    assert all(row[8] == "" for row in written[1:])