"""Pytest fixtures for the AZ-404 E2E replay tests. The fixtures are import-clean on dev macOS — the heavy work (synthesizing the tlog, invoking the airborne CLI in a subprocess) runs only when ``RUN_REPLAY_E2E=1`` is set in the environment. Without the env var, the test module's collection-time skip marker prevents the fixtures from being requested. """ from __future__ import annotations import json import os import shutil import subprocess import sys from collections.abc import Iterator from dataclasses import dataclass from pathlib import Path from typing import Any import pytest from gps_denied_onboard.replay_input import load_tlog_ground_truth from tests.e2e.replay._helpers import GroundTruthRow, load_ground_truth_csv from tests.e2e.replay._tlog_synth import synthesize_tlog # Duration cap used exclusively for the realtime-pacing test. The full # Derkachi flight is ~490 s; running it at realtime pace in CI would take # ~8 minutes. The realtime test passes --max-duration-s to the CLI so # only this short clip is paced at wall-clock speed. _REALTIME_TEST_CLIP_S: float = 60.0 # ---------------------------------------------------------------------- # Path helpers def _repo_root() -> Path: return Path(__file__).resolve().parents[3] def _derkachi_dir() -> Path: return _repo_root() / "_docs" / "00_problem" / "input_data" / "flight_derkachi" def _calibration_path() -> Path: # AZ-702 ships a factory-sheet approximation for the Topotek # KHP20S30 nadir camera at # `_docs/00_problem/input_data/flight_derkachi/khp20s30_factory.json`. # When present we use it; otherwise we fall back to the # `adti26.json` placeholder so the AC-1/2/5/6 path stays # exercisable on dev macOS without the AZ-702 deliverable. factory_path = _derkachi_dir() / "khp20s30_factory.json" if factory_path.is_file(): return factory_path return _repo_root() / "tests" / "fixtures" / "calibration" / "adti26.json" # ---------------------------------------------------------------------- # Fixtures @dataclass(frozen=True) class DerkachiReplayInputs: """Bundle of paths the AZ-402 CLI consumes for a Derkachi replay run.""" video_path: Path tlog_path: Path calibration_path: Path config_path: Path signing_key_path: Path output_path: Path ground_truth: list[GroundTruthRow] @pytest.fixture(scope="session") def derkachi_replay_inputs(tmp_path_factory: pytest.TempPathFactory) -> DerkachiReplayInputs: """Materialise Derkachi inputs + a synthesized tlog for the e2e run. Session-scoped so the tlog synthesizer runs once across the whole e2e collection. The tlog is cached at ``tmp_path_factory.mktemp("derkachi") / "synth.tlog"`` so each pytest invocation gets a fresh copy; the synthesizer is fast enough (~1 s for 60 s of data) that disk caching across invocations is unnecessary. """ derkachi = _derkachi_dir() csv_path = derkachi / "data_imu.csv" video_path = derkachi / "flight_derkachi.mp4" real_tlog_path = derkachi / "derkachi.tlog" if not video_path.is_file(): pytest.fail(f"Derkachi fixture missing: {video_path}") work_dir = tmp_path_factory.mktemp("derkachi") # AZ-697: prefer the real binary tlog when present; fall back to # synthesizing one from the CSV so dev environments without the # 5.8 MB binary blob still exercise the e2e path. if real_tlog_path.is_file(): tlog_path = real_tlog_path gt_series = load_tlog_ground_truth(real_tlog_path).records ground_truth_full = [ GroundTruthRow( t_s=fix.ts_ns / 1e9, lat_deg=fix.lat_deg, lon_deg=fix.lon_deg, alt_m=fix.alt_m, ) for fix in gt_series ] else: if not csv_path.is_file(): pytest.fail( f"Derkachi fixture missing: {csv_path} — see " "_docs/00_problem/input_data/flight_derkachi/README.md" ) tlog_path = work_dir / "synth.tlog" synthesize_tlog(csv_path, tlog_path) ground_truth_full = load_ground_truth_csv(csv_path) # Empty signing key — the airborne replay path runs the signing # handshake against `NoopMavlinkTransport`, so the key contents do # not affect any wire output. We still need a real file because # the CLI's path-validation gate requires it. signing_key_path = work_dir / "signing_key.bin" signing_key_path.write_bytes(b"\x00" * 32) config_path = work_dir / "config.yaml" config_path.write_text( # Replay-specific overrides; the rest comes from the env vars # the airborne binary's `load_config` honours by default. # # Per-component blocks at the TOP LEVEL — the YAML loader # in `gps_denied_onboard.config.loader._load_yaml_files` # treats each top-level mapping as a block whose key is a # registry slug; nesting the slugs under a `components:` # wrapper makes the loader silently drop them (the wrapper # is not a registered slug). # # Open-loop ESKF composition profile (AZ-776 / ADR-012): # `c4_pose.enabled = false` strips C4 from the composition # graph so the airborne binary can run the mandatory simple # baseline (KLT/RANSAC VIO + ESKF state estimator) end-to-end # without a C4 anchor. ESKF has no iSAM2 graph for C4 to # anchor against; the `compose_root` validation gate rejects # the off-diagonal pairings (`enabled=False` + `gtsam_isam2` # or `enabled=True` + `eskf`) with a `CompositionError`. # Position drifts open-loop without C2/C3/C4 satellite # re-anchoring — AZ-777 (Derkachi C6 reference tile cache) # is the follow-up that closes the satellite-anchoring half # of the per-frame loop. "mode: replay\n" "replay:\n" " pace: asap\n" " target_fc_dialect: ardupilot_plane\n" "c1_vio:\n" " strategy: klt_ransac\n" "c4_pose:\n" " enabled: false\n" "c5_state:\n" " strategy: eskf\n" ) output_path = work_dir / "estimator_output.jsonl" ground_truth = ground_truth_full return DerkachiReplayInputs( video_path=video_path, tlog_path=tlog_path, calibration_path=_calibration_path(), config_path=config_path, signing_key_path=signing_key_path, output_path=output_path, ground_truth=ground_truth, ) @dataclass(frozen=True) class ReplayRunResult: """Outcome of a single ``gps-denied-replay`` subprocess run.""" returncode: int stdout: str stderr: str output_path: Path wall_clock_s: float @pytest.fixture def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any: """Return a callable that invokes the ``gps-denied-replay`` console-script. The callable accepts keyword overrides for ``pace``, ``time_offset_ms``, and ``skip_auto_sync`` (AZ-611); everything else is taken from ``derkachi_replay_inputs``. Output is written to a fresh path per invocation so determinism comparisons (AC-5) get two independent files. Derkachi is a mid-flight fixture (no take-off spike) and the only motion the video detector sees in the first 60 s is camera shake and scenery change — neither tlog nor video can produce a reliable auto-sync signal. The synth tlog and the video share the same ``t=0`` anchor by construction (see ``_tlog_synth.py``), so the correct offset is exactly ``0``. The fixture defaults reflect that — heavy ACs pass ``time_offset_ms=0`` + ``skip_auto_sync=True`` so the run never touches the AC-9 validator that would otherwise reject the fixture's false-positive video motion onset. """ binary = shutil.which("gps-denied-replay") if binary is None: venv_bin = Path(sys.executable).parent / "gps-denied-replay" if venv_bin.exists(): binary = str(venv_bin) if binary is None: pytest.skip( "gps-denied-replay console-script not on PATH; " "install the package in the test venv" ) invocation_count = {"n": 0} def _run( *, pace: str = "asap", time_offset_ms: int | None = 0, skip_auto_sync: bool = True, max_duration_s: float | None = None, ) -> ReplayRunResult: import time invocation_count["n"] += 1 out_path = derkachi_replay_inputs.output_path.with_name( f"estimator_output_{invocation_count['n']}.jsonl" ) argv = [ binary, "--video", str(derkachi_replay_inputs.video_path), "--tlog", str(derkachi_replay_inputs.tlog_path), "--output", str(out_path), "--camera-calibration", str(derkachi_replay_inputs.calibration_path), "--config", str(derkachi_replay_inputs.config_path), "--mavlink-signing-key", str(derkachi_replay_inputs.signing_key_path), "--pace", pace, ] if time_offset_ms is not None: argv.extend(["--time-offset-ms", str(time_offset_ms)]) if skip_auto_sync: argv.append("--skip-auto-sync") if max_duration_s is not None: argv.extend(["--max-duration-s", str(max_duration_s)]) # Build-flag env vars required by the airborne factories for # the strategies the replay config selects (klt_ransac VIO + # ESKF state estimator). Both default OFF in the factory # gates — opt them in explicitly so the eager # `_build_c5_state_estimator_pair` and the lazy c1_vio # factory find their gating flags ON. run_env = { **os.environ, "BUILD_KLT_RANSAC": "ON", "BUILD_STATE_ESKF": "ON", } t0 = time.monotonic() completed = subprocess.run( argv, capture_output=True, text=True, timeout=180, env=run_env, ) wall_s = time.monotonic() - t0 return ReplayRunResult( returncode=completed.returncode, stdout=completed.stdout, stderr=completed.stderr, output_path=out_path, wall_clock_s=wall_s, ) return _run @pytest.fixture def operator_pre_flight_setup(tmp_path: Path) -> Iterator[Path]: """Operator C12 pre-flight rehearsal stub. Per AZ-404's spec this fixture should run the operator's full C10/C11/C12 pre-flight against a ``mock-suite-sat-service`` fixture and yield the populated cache directory. The current ``tests/fixtures/mock-suite-sat-service`` is a bootstrap stub (only ``GET /healthz`` per its README) — the full D-PROJ-2 contract is not implemented. Until that ships, AC-8 (operator workflow rehearsal) is skipped at the test level; this fixture yields a placeholder cache directory so test bodies that request it can fail-fast with a documented reason rather than a surprise ImportError. """ cache_dir = tmp_path / "operator_cache" cache_dir.mkdir() yield cache_dir