"""FT-N-01 — 350 m outlier injection tolerance (AZ-424 / AC-3.1). Replays the Derkachi flight with the AZ-408 ``outlier`` injector at ``--density medium`` and verifies AC-1 / AC-2 / AC-3 via ``runner.helpers.outlier_tolerance_evaluator``. Gated on the same upstream replay helpers as FT-N-02 / FT-P-07 (``frame_source_replay``, ``fdr_reader``, ``imu_replay``). When those helpers are still stubbed (current state under AZ-441 / AZ-407 leftovers), the scenario test skips while ``e2e/_unit_tests/helpers/test_outlier_tolerance_evaluator.py`` covers the pure-logic AC-2 / AC-3 invariants. """ from __future__ import annotations from pathlib import Path import pytest from fixtures.injectors.outlier import OutlierInjectionReport from runner.helpers import outlier_tolerance_evaluator as ote @pytest.mark.parametrize( "outlier_injection_derkachi", [{"density": "medium", "seed": 0}], indirect=True, ) @pytest.mark.traces_to("AC-3.1,AC-1,AC-2,AC-3,AC-4") def test_ft_n_01_outlier_tolerance( fc_adapter: str, vio_strategy: str, outlier_injection_derkachi: OutlierInjectionReport, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: if not sitl_replay_ready: pytest.skip( "FT-N-01 full replay requires `E2E_SITL_REPLAY_DIR` to point at a " "prepared SITL replay fixture (AZ-595). AC-1/AC-2/AC-3 helper logic " "covered by e2e/_unit_tests/helpers/test_outlier_tolerance_evaluator.py." ) from runner.helpers import fdr_reader from runner.helpers.frame_source_replay import FrameSourceReplayer # 1. AC-1 — load injection plan (outlier event frames + offsets). manifest_path = outlier_injection_derkachi.out_root / "manifest.csv" events = ote.load_outlier_manifest(manifest_path) assert len(events) >= ote.MIN_OUTLIER_COUNT, ( f"AC-1: medium-density injection must produce ≥{ote.MIN_OUTLIER_COUNT} " f"outliers (got {len(events)} from {manifest_path})" ) # 2. Drive replay against the injected frames directory. FrameSourceReplayer(_resolve_frame_sink()).replay_video( outlier_injection_derkachi.out_root / "frames" ) # 3. Collect outbound estimates + GT from FDR + tile cache. fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr" estimates: list[ote.OutboundEstimate] = [] for rec in fdr_reader.iter_records(fdr_root): if rec.record_type != "outbound_estimate": continue payload = rec.payload estimates.append( ote.OutboundEstimate( frame_idx=int(payload["frame_idx"]), # type: ignore[arg-type] monotonic_ms=int(rec.monotonic_ms), lat_deg=float(payload["lat_deg"]), # type: ignore[arg-type] lon_deg=float(payload["lon_deg"]), # type: ignore[arg-type] cov_semi_major_m=float(payload["cov_semi_major_m"]), # type: ignore[arg-type] source_label=str(payload["source_label"]), # type: ignore[arg-type] ) ) gt: list[ote.GtPose] = _resolve_gt_per_frame(outlier_injection_derkachi) if not estimates: pytest.fail("FT-N-01: no outbound_estimate records produced") # 4. Evaluate per outlier event. report = ote.evaluate(events, estimates, gt) out_csv = evidence_dir / f"ft-n-01-{fc_adapter}-{vio_strategy}.csv" ote.write_csv_evidence(out_csv, report) # 5. NFR + AC assertions. nfr_recorder.record_metric( "ft_n_01.total_outliers", float(report.total_outliers), ac_id="AC-1" ) nfr_recorder.record_metric( "ft_n_01.failed_event_count", float(report.failed_event_count), ac_id="AC-2" ) for e in report.events: if e.drift_m is not None: nfr_recorder.record_metric( f"ft_n_01.event_{e.frame_idx}.drift_m", e.drift_m, ac_id="AC-2" ) nfr_recorder.record_metric( f"ft_n_01.event_{e.frame_idx}.cov_non_decreasing", 1.0 if e.cov_non_decreasing else 0.0, ac_id="AC-3", ) assert report.passes_count, ( f"AC-1: ≥{ote.MIN_OUTLIER_COUNT} outliers required; " f"got {report.total_outliers}" ) for e in report.events: assert e.passes_drift, ( f"AC-2 (drift ≤ {ote.DRIFT_BUDGET_M} m) failed at frame " f"{e.frame_idx}: drift_m={e.drift_m}, " f"error_before={e.error_before_m}, error_after={e.error_after_m}" ) assert e.passes_covariance, ( f"AC-3 (cov_semi_major_m non-decreasing across window) failed at " f"frame {e.frame_idx}: " f"cov_before={e.cov_before}, cov_outlier={e.cov_outlier}, " f"cov_after={e.cov_after}" ) def _resolve_frame_sink(): # type: ignore[no-untyped-def] raise NotImplementedError( "frame sink resolution is owned by AZ-441 / runner.helpers.frame_source_replay" ) def _resolve_gt_per_frame(report: OutlierInjectionReport) -> list[ote.GtPose]: raise NotImplementedError( "Per-frame GT resolution is owned by AZ-407 / runner.helpers.tile_cache_gt" )