"""AZ-699 — Real-flight validation runner + accuracy report.

Runs the ``gps-denied-replay`` console-script against the **real**
``derkachi.tlog`` (binary, not the AZ-265 synth) plus the real flight
video and the AZ-702 factory-sheet camera calibration, computes the
full horizontal-error distribution, and writes a structured Markdown
accuracy report to
``_docs/06_metrics/real_flight_validation_{YYYY-MM-DD}.md``.

Unlike the AZ-404 1-min test (``test_derkachi_1min.py``), this one
computes an **honest** PASS/FAIL verdict from the measured
distribution. The threshold gate is ≥ 80 % of emissions within 100 m
(AZ-696 epic AC-3). When the run fails, the failure message references
the calibration acquisition method (factory-sheet for AZ-702 or the
``adti26.json`` placeholder) so the root cause is attributable without
re-reading the source.

NOTE: the test currently carries a non-strict ``@pytest.mark.xfail``
marker for the AZ-848 / AZ-883 clock-mismatch blocker (see the marker's
``reason``), so a FAIL verdict is reported as XFAIL until that marker
is removed.

Missing prerequisites SKIP the test (they are not folded into the
xfail):

* ``RUN_REPLAY_E2E`` env var not set (matches AZ-404 gating).
* Real ``derkachi.tlog`` binary missing.
* Real ``flight_derkachi.mp4`` missing OR placeholder-sized (the repo
  currently ships a 134-byte placeholder for the AZ-404 fixture; this
  AZ-699 path requires a decodable video).
* ``khp20s30_factory.json`` camera calibration missing.
* ``gps-denied-replay`` console-script not installed.

Style: every test follows the Arrange / Act / Assert pattern.
"""

from __future__ import annotations

import datetime
import json
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
from typing import Any

import pytest

from gps_denied_onboard.helpers.gps_compare import (
    GroundTruthRow,
    horizontal_error_distribution,
)
from gps_denied_onboard.replay_input import load_tlog_ground_truth
from gps_denied_onboard.helpers.accuracy_report import (
    AC3_GATE_PCT,
    AC3_GATE_THRESHOLD_M,
    ReportContext,
    format_failure_message,
    render_report,
    verdict_passes_ac3,
)


# ----------------------------------------------------------------------
# Skip gates


def _repo_root() -> Path:
    """Return ``parents[3]`` of this file's resolved path (the repo root)."""
    return Path(__file__).resolve().parents[3]


def _derkachi_dir() -> Path:
    """Return the directory that holds the Derkachi flight fixtures."""
    return _repo_root() / "_docs" / "00_problem" / "input_data" / "flight_derkachi"


# A real video must be larger than the placeholder to actually decode.
# The current placeholder is 134 bytes; the floor is set to 1 MB so any
# real recording clears the gate but the placeholder never does.
_MIN_REAL_VIDEO_BYTES: int = 1_000_000

# Wall-clock budget for the replay subprocess: the 15-min NFR from the
# AZ-699 spec. Used both as the subprocess timeout and as the asserted
# ceiling so the two can never drift apart.
_RUN_BUDGET_S: float = 900.0


def _real_inputs_present() -> tuple[bool, str]:
    """Check that the real tlog, video and calibration fixtures exist.

    Returns:
        ``(True, "")`` when every input is usable, otherwise
        ``(False, reason)`` where ``reason`` is the skip message.
    """
    derkachi = _derkachi_dir()
    tlog = derkachi / "derkachi.tlog"
    video = derkachi / "flight_derkachi.mp4"
    calibration = derkachi / "khp20s30_factory.json"

    if not tlog.is_file():
        return False, f"real tlog missing: {tlog}"
    if not video.is_file():
        return False, f"real video missing: {video}"

    # Stat once so the size that is tested is the size that is reported.
    video_size = video.stat().st_size
    if video_size < _MIN_REAL_VIDEO_BYTES:
        return (
            False,
            f"video at {video} is only {video_size} bytes — "
            "placeholder, not a real recording. Drop the real "
            "flight_derkachi.mp4 into the fixture dir to enable AZ-699.",
        )
    if not calibration.is_file():
        return False, (
            f"calibration missing: {calibration}; ship the AZ-702 "
            "factory-sheet calibration to enable AZ-699."
        )
    return True, ""


def _heavy_skip_reason() -> str | None:
    """Return a skip message unless ``RUN_REPLAY_E2E`` is set to a truthy value.

    Accepted values (case-insensitive) are ``1``, ``true``, ``yes`` and
    ``on``; anything else, including an unset variable, yields the skip
    message.
    """
    if os.environ.get("RUN_REPLAY_E2E", "").lower() not in {
        "1",
        "true",
        "yes",
        "on",
    }:
        return "AZ-699 e2e test gated by RUN_REPLAY_E2E=1"
    return None


def _binary_present() -> tuple[bool, str | None]:
    """Locate the ``gps-denied-replay`` console-script.

    Looks on ``PATH`` first, then next to the running interpreter (covers
    a virtualenv whose ``bin`` directory is not on ``PATH``).

    Returns:
        ``(True, path)`` when found, otherwise ``(False, None)``.
    """
    binary = shutil.which("gps-denied-replay")
    if binary is not None:
        return True, binary
    venv_bin = Path(sys.executable).parent / "gps-denied-replay"
    if venv_bin.exists():
        return True, str(venv_bin)
    return False, None


# ----------------------------------------------------------------------
# Helpers


def _captured_text(stream: str | bytes | None) -> str:
    """Render a captured subprocess stream as text for a failure message.

    ``subprocess.TimeoutExpired`` may carry its partial output as
    ``bytes`` (or ``None``) even when the run was started in text mode,
    so all three shapes are normalised here.
    """
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        return stream.decode("utf-8", errors="replace")
    return stream


def _read_calibration_acquisition_method(calibration_path: Path) -> str:
    """Extract the ``acquisition_method`` field set by AZ-702.

    AZ-702 documents that the camera calibration JSON carries an
    ``acquisition_method`` string ("factory-sheet" for the KHP20S30
    factory deliverable). When the field is absent we label the run as
    ``"unknown"`` so the failure message stays honest — pretending the
    calibration is factory-sheet when it is actually a placeholder
    would mis-attribute the residual.

    The same ``"unknown"`` label is returned when the file cannot be
    read, is not decodable, is not valid JSON, or its top level is not
    a JSON object; this helper never raises for a malformed file.
    """
    try:
        # Passing bytes lets ``json`` pick the UTF-8/16/32 decoding
        # itself instead of depending on the locale's default encoding.
        data = json.loads(calibration_path.read_bytes())
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return "unknown"
    if not isinstance(data, dict):
        # Valid JSON, but e.g. a list or a bare string: no field to read.
        return "unknown"
    method = data.get("acquisition_method")
    if isinstance(method, str) and method:
        return method
    return "unknown"


def _parse_jsonl(path: Path) -> list[dict[str, Any]]:
    """Parse a JSON Lines file into a list of decoded records.

    Blank / whitespace-only lines are skipped. The file is read as
    UTF-8 and iterated line by line, so records are split on real
    newlines only (``str.splitlines`` would also split on characters
    such as U+2028 that may legally appear inside a JSON string).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    with path.open(encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


def _load_full_ground_truth(tlog_path: Path) -> list[GroundTruthRow]:
    """Load every ground-truth record from the tlog as ``GroundTruthRow``s.

    Each record's ``ts_ns`` is divided by 1e9 to fill ``t_s``; latitude,
    longitude and altitude are copied across unchanged.
    """
    series = load_tlog_ground_truth(tlog_path).records
    return [
        GroundTruthRow(
            t_s=fix.ts_ns / 1e9,
            lat_deg=fix.lat_deg,
            lon_deg=fix.lon_deg,
            alt_m=fix.alt_m,
        )
        for fix in series
    ]


# ----------------------------------------------------------------------
# AC-1 + AC-2 + AC-3: full real-flight run + honest verdict + report


@pytest.mark.tier2
@pytest.mark.xfail(
    reason=(
        "Blocked by AZ-848 (+ AZ-883). The tlog adapter path is "
        "structurally broken at the clock layer: VioOutput.emitted_at_ns "
        "is sourced from process monotonic_ns (klt_ransac.py:274) while "
        "ImuWindow.ts_end_ns comes from the FC IMU timebase, so the C5 "
        "ESKF immediately diverges (mahalanobis² > 100) at frame 3 "
        "with c5.state.eskf_out_of_order. AZ-883 is the related "
        "SCALED_IMU2.ts_ns=0 default that exacerbates the mismatch on "
        "some tlogs. AZ-895 made the (video, CSV) path the primary "
        "replay surface, so AZ-848 is no longer bench-blocking — but "
        "this test still exercises the legacy tlog path and will "
        "remain xfail until AZ-848 / AZ-883 ship. The AZ-776 / AZ-777 "
        "gaps that originally surfaced via this test were closed in "
        "cycle 3, but the clock-mismatch root cause then surfaced "
        "underneath."
    ),
    strict=False,
)
def test_az699_real_flight_validation_emits_verdict_and_report(
    tmp_path: Path,
) -> None:
    """Run the real-flight replay, write the accuracy report, gate on AC-3.

    Skips when a prerequisite is missing; otherwise runs the replay
    binary, pairs its JSONL emissions with the tlog ground truth, writes
    the Markdown report under ``_docs/06_metrics`` and fails with an
    attributable message when the AC-3 gate is not met.
    """
    # Arrange: gate cleanly on missing prerequisites (no xfail).
    heavy_reason = _heavy_skip_reason()
    if heavy_reason is not None:
        pytest.skip(heavy_reason)
    inputs_ok, inputs_reason = _real_inputs_present()
    if not inputs_ok:
        pytest.skip(inputs_reason)
    binary_ok, binary = _binary_present()
    if not binary_ok or binary is None:
        pytest.skip("gps-denied-replay console-script not installed")

    derkachi = _derkachi_dir()
    tlog_path = derkachi / "derkachi.tlog"
    video_path = derkachi / "flight_derkachi.mp4"
    calibration_path = derkachi / "khp20s30_factory.json"
    output_path = tmp_path / "estimator_output.jsonl"

    # Signing-key stub (NoopMavlinkTransport ignores bytes; path
    # validation just needs the file to exist).
    signing_key_path = tmp_path / "signing_key.bin"
    signing_key_path.write_bytes(b"\x00" * 32)

    config_path = tmp_path / "config.yaml"
    config_path.write_text(
        "mode: replay\n"
        "replay:\n"
        "  pace: asap\n"
        "  target_fc_dialect: ardupilot_plane\n",
        encoding="utf-8",
    )

    argv = [
        binary,
        "--video",
        str(video_path),
        "--tlog",
        str(tlog_path),
        "--output",
        str(output_path),
        "--camera-calibration",
        str(calibration_path),
        "--config",
        str(config_path),
        "--mavlink-signing-key",
        str(signing_key_path),
        "--pace",
        "asap",
        # AZ-698: let the segmenter + NCC find the last flight
        # window in the multi-flight tlog. No manual offset.
        "--auto-trim",
    ]

    # Act
    t0 = time.monotonic()
    try:
        completed = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            # Decode deterministically; a stray byte in the tool's log
            # output must not mask the real result with a decode error.
            encoding="utf-8",
            errors="replace",
            timeout=_RUN_BUDGET_S,  # 15-min NFR budget per AZ-699 spec
        )
    except subprocess.TimeoutExpired as exc:
        # Without this the overrun surfaces as a bare TimeoutExpired and
        # the NFR assertion below is never reached.
        pytest.fail(
            "AZ-699 NFR: real-flight run exceeded the "
            f"{_RUN_BUDGET_S:.0f} s (15 min) budget and was killed\n"
            f"stdout:\n{_captured_text(exc.stdout)}\n"
            f"stderr:\n{_captured_text(exc.stderr)}"
        )
    wall_clock_s = time.monotonic() - t0

    assert completed.returncode == 0, (
        f"gps-denied-replay exited {completed.returncode}\n"
        f"stdout:\n{completed.stdout}\nstderr:\n{completed.stderr}"
    )
    assert wall_clock_s <= _RUN_BUDGET_S, (
        f"AZ-699 NFR: real-flight run took {wall_clock_s:.1f} s; "
        "budget is 15 min"
    )

    emissions = _parse_jsonl(output_path)
    assert emissions, "no JSONL emissions produced — pipeline failure"

    ground_truth = _load_full_ground_truth(tlog_path)
    assert ground_truth, "ground-truth extraction yielded zero rows"

    distribution = horizontal_error_distribution(emissions, ground_truth)
    assert distribution.count > 0, (
        "no emissions paired with ground truth — JSONL timestamps "
        "outside the tlog GPS window?"
    )

    run_date_utc = (
        datetime.datetime.now(datetime.timezone.utc).date().isoformat()
    )
    context = ReportContext(
        run_date_utc=run_date_utc,
        tlog_path=tlog_path,
        video_path=video_path,
        calibration_acquisition_method=_read_calibration_acquisition_method(
            calibration_path
        ),
        clip_duration_s=(
            (ground_truth[-1].t_s - ground_truth[0].t_s)
            if ground_truth
            else 0.0
        ),
        emissions_count=len(emissions),
    )
    passed = verdict_passes_ac3(distribution)
    report_text = render_report(distribution, context, passed=passed)

    report_dir = _repo_root() / "_docs" / "06_metrics"
    report_dir.mkdir(parents=True, exist_ok=True)
    report_path = (
        report_dir / f"real_flight_validation_{context.run_date_utc}.md"
    )
    # Explicit UTF-8: the report must be writable (and read back the
    # same) regardless of the host locale.
    report_path.write_text(report_text, encoding="utf-8")

    # Assert AC-2: the report exists and contains all required rows.
    assert report_path.is_file()
    written = report_path.read_text(encoding="utf-8")
    assert "## Horizontal error (metres)" in written
    assert "## Threshold-hit share" in written
    assert "Mean" in written
    assert "p50" in written
    assert "p95" in written
    assert "p99" in written
    for threshold in (10, 25, 50, 100):
        assert f"| {threshold} |" in written, (
            f"threshold {threshold} m row missing from report"
        )

    # Assert AC-1 + AC-3: honest verdict. On FAIL, the message must
    # reference the calibration method (AZ-699 AC-3).
    if not passed:
        msg = format_failure_message(distribution, context)
        pytest.fail(
            f"{msg}\n\nReport: {report_path}\n"
            f"Run wall-clock: {wall_clock_s:.1f} s"
        )

    # PASS path — emit a concise summary so the CI log carries the
    # numbers without forcing the operator to open the report.
    share_100m = distribution.threshold_hit_share[AC3_GATE_THRESHOLD_M] * 100.0
    assert share_100m >= AC3_GATE_PCT  # guarded by verdict_passes_ac3
    print(
        f"AZ-699 PASS — {share_100m:.1f} % within "
        f"{AC3_GATE_THRESHOLD_M:.0f} m, mean="
        f"{distribution.horizontal_error_mean_m:.1f} m, p95="
        f"{distribution.horizontal_error_p95_m:.1f} m. "
        f"Report at {report_path}",
        file=sys.stderr,
    )