"""FT-P-10 — GTSAM smoothing-loop look-back accuracy (AC-4.5 revised). The full scenario: 1. Replay Derkachi end-to-end through the SUT. 2. Read the post-run FDR archive for per-past-keyframe pose records; per AC-NEW-3 the SUT emits two records per past keyframe: ``pose_kind = "raw"`` (single-shot at first emission) and ``pose_kind = "smoothed"`` (iSAM2-converged at smoother window exit). 3. Load Derkachi ``data_imu.csv`` and extract ``GLOBAL_POSITION_INT`` GT poses (10 Hz). 4. Pair raw + smoothed per keyframe; compute distance(raw, GT) and distance(smoothed, GT); aggregate. 5. Assert AC-2 (improvement_rate ≥ 0.80) AND AC-3 (mean_improvement ≥ 5 m). What this file owns: * AC-1 / AC-2 / AC-3 / AC-4 wiring above. * Per-strategy reporting (the spec calls out that vins_mono ≥ okvis2 ≥ klt_ransac is expected even if all pass). What this file does NOT own: * The MP4 replay → ``runner.helpers.frame_source_replay`` (stub) — gated. * The IMU CSV replay → ``runner.helpers.imu_replay`` (stub) — gated. * The FDR-archive iteration → ``runner.helpers.fdr_reader`` (stub) — gated. When the upstream helpers land, this file's runtime path activates automatically. """ from __future__ import annotations import csv from pathlib import Path import pytest from runner.helpers import smoothing_evaluator as se DERKACHI_DIR = ( Path(__file__).resolve().parents[3] / "_docs" / "00_problem" / "input_data" / "flight_derkachi" ) DERKACHI_IMU_CSV = DERKACHI_DIR / "data_imu.csv" DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4" def _load_derkachi_gt_track() -> list[se.GtPose]: """Read GLOBAL_POSITION_INT poses from data_imu.csv. lat / lon are stored as decimal degrees (e.g. 50.0809634), NOT 1e-7 int32. The column names confirm this: ``GLOBAL_POSITION_INT.lat`` / ``GLOBAL_POSITION_INT.lon`` per the CSV header. """ track: list[se.GtPose] = [] with DERKACHI_IMU_CSV.open() as fh: reader = csv.DictReader(fh) for row in reader: ts = row.get("timestamp(ms)", "").strip() if not ts: continue track.append( se.GtPose( monotonic_ms=int(float(ts)), lat_deg=float(row["GLOBAL_POSITION_INT.lat"]), lon_deg=float(row["GLOBAL_POSITION_INT.lon"]), ) ) return track @pytest.mark.traces_to("AC-4.5,AC-1,AC-2,AC-3,AC-4") def test_ft_p_10_smoothing_lookback( fc_adapter: str, vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: """Full FT-P-10 scenario. AC-1: FDR contains raw + smoothed per past keyframe — verified at the pairing step (orphan = excluded). AC-2: improvement_rate ≥ 0.80. AC-3: mean_improvement_m ≥ 5 m. AC-4: parameterised across ``(fc_adapter, vio_strategy)``. """ if not sitl_replay_ready: pytest.skip( "FT-P-10 full replay requires `E2E_SITL_REPLAY_DIR` to point at a " "prepared SITL replay fixture (AZ-595). Pure-logic ACs covered by " "e2e/_unit_tests/helpers/test_smoothing_evaluator.py." ) from runner.helpers import fdr_reader, imu_replay from runner.helpers.frame_source_replay import FrameSourceReplayer # 1. Drive replay. sink = _resolve_frame_sink() emitter = _resolve_fc_inbound_emitter(fc_adapter) FrameSourceReplayer(sink).replay_video(DERKACHI_MP4) imu_replay.ImuReplayer(emitter).replay(DERKACHI_IMU_CSV) # 2. Collect raw + smoothed pose records from the FDR archive. fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr" records: list[se.KeyframePoseRecord] = [] for rec in fdr_reader.iter_records(fdr_root): if rec.record_type == "keyframe_pose": payload = rec.payload records.append( se.KeyframePoseRecord( keyframe_id=int(payload["keyframe_id"]), # type: ignore[arg-type] pose_kind=str(payload["pose_kind"]), # type: ignore[arg-type] monotonic_ms=int(rec.monotonic_ms), lat_deg=float(payload["lat_deg"]), # type: ignore[arg-type] lon_deg=float(payload["lon_deg"]), # type: ignore[arg-type] ) ) if not records: pytest.fail( "FT-P-10: SUT did not emit any keyframe_pose FDR records " "(AC-1 / AC-NEW-3 requires raw + smoothed per past keyframe)." ) # 3. Load Derkachi GT track. gt_track = _load_derkachi_gt_track() if not gt_track: pytest.fail(f"FT-P-10: empty GT track loaded from {DERKACHI_IMU_CSV}") # 4. Evaluate + emit evidence. report = se.evaluate(records, gt_track) out_csv = evidence_dir / f"ft-p-10-{fc_adapter}-{vio_strategy}.csv" se.write_csv_evidence(out_csv, report) # 5. NFR metrics (per-strategy comparability per AC-4). nfr_recorder.record_metric( "ft_p_10.improvement_rate", report.improvement_rate, ac_id="AC-2" ) nfr_recorder.record_metric( "ft_p_10.mean_improvement_m", report.mean_improvement_m, ac_id="AC-3" ) nfr_recorder.record_metric( "ft_p_10.median_improvement_m", report.median_improvement_m, ac_id="AC-3" ) nfr_recorder.record_metric( "ft_p_10.pair_count", float(report.pair_count), ac_id="AC-1" ) # 6. AC assertions. assert report.passes_rate, ( f"AC-2 (improvement rate ≥{se.IMPROVEMENT_RATE_REQUIRED:.0%}) failed: " f"{report.improvement_rate:.4f} over {report.pair_count} keyframes" ) assert report.passes_mean, ( f"AC-3 (mean improvement ≥{se.MEAN_IMPROVEMENT_M_REQUIRED} m) failed: " f"mean={report.mean_improvement_m:.3f} m, median={report.median_improvement_m:.3f} m" ) def _resolve_frame_sink(): # type: ignore[no-untyped-def] """Return a replay-mode `FrameSink` (counter-only; AZ-597).""" from runner.helpers.replay_mode import NullFrameSink return NullFrameSink() def _resolve_fc_inbound_emitter(fc_adapter: str): # type: ignore[no-untyped-def] """Return a replay-mode `FcInboundEmitter` (counter-only; AZ-597).""" from runner.helpers.replay_mode import NullFcInboundEmitter return NullFcInboundEmitter()