"""E2E orchestrator for the AZ-835 7-step pipeline (AZ-840 / Epic AZ-835 C4). Wraps the AZ-699 verdict-report writing path with the AZ-839 C3 fixture's `PopulatedC6Cache` so a single Tier-2 test can run from ``(tlog, video, calibration)`` to a horizontal-error report without operator hand-curation between steps. The 7-step Epic narrative (``_docs/02_tasks/todo/AZ-840_e2e_orchestrator_test.md``): 1. Active flight cut + tlog/video sync — handled by ``gps-denied-replay`` ``--auto-trim`` (AZ-405 / AZ-698) inside the airborne binary. 2. On-fly frame + IMU extraction — same binary's per-frame loop. 3. Auto-create route — done by the C3 fixture (``operator_pre_flight_setup`` calls ``extract_route_from_tlog``). 4. POST route to satellite-provider — C3 fixture (AZ-838 ``SatelliteProviderRouteClient.seed_route``). 5. Build FAISS index — C3 fixture (AZ-322 ``DescriptorBatcher``). 6. Run gps-denied airborne pipeline — this module's ``_run_replay_subprocess`` invokes ``gps-denied-replay`` against the populated cache. 7. Get GPS fixes, check vs tlog GPS — this module's ``_load_ground_truth`` + ``horizontal_error_distribution`` + ``render_report`` writes the verdict markdown. The C3 fixture mutates ``c6_tile_cache.root_dir`` to point at a ``tmp_path_factory.mktemp`` value (AZ-839 batch 108b). The static operator YAML at ``GPS_DENIED_OPERATOR_CONFIG_PATH`` cannot know that path. ``write_effective_replay_config`` reads the static YAML, overlays the ``c6_tile_cache.root_dir`` override, writes the merged result to a tmp file, and returns the path the airborne binary will load via ``--config``. This keeps a single source of truth for the cache_root override across the in-memory C3 fixture path and the subprocess airborne path. Public surface — re-exported from this module: * :class:`OrchestratorStep` — failure-step labels per AC-5 ("fails LOUD with a clear error pointing at the failing step"). * :class:`OrchestrationFailure` — wraps the underlying exception with the step that produced it. * :class:`OrchestrationReport` — return value of :func:`run_e2e_orchestration` (verdict, distribution, paths, wall-clock measurements per AC-4). * :func:`write_effective_replay_config` — small helper for the config merge step. * :func:`run_e2e_orchestration` — the AC-1 entry point. """ from __future__ import annotations import datetime import json import logging import subprocess import time from collections.abc import Callable, Mapping from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Any import yaml from gps_denied_onboard.helpers.accuracy_report import ( AC3_GATE_PCT, AC3_GATE_THRESHOLD_M, ReportContext, render_report, verdict_passes_ac3, ) from gps_denied_onboard.helpers.gps_compare import ( GroundTruthRow, HorizontalErrorDistribution, horizontal_error_distribution, ) from gps_denied_onboard.replay_input import load_tlog_ground_truth from tests.e2e.replay._operator_pre_flight import PopulatedC6Cache __all__ = [ "OrchestrationFailure", "OrchestrationReport", "OrchestratorStep", "read_calibration_acquisition_method", "run_e2e_orchestration", "write_effective_replay_config", ] # Replay-subprocess wall-clock cap for the Derkachi clip per AZ-840 # AC-4 (15 min soft target). Exposed as a default that the integration # test can override; the unit tests rely on the contract that the # runner argument is a free callable. _DEFAULT_MAX_SECONDS: float = 900.0 _LOGGER = logging.getLogger("tests.e2e.replay.e2e_orchestrator") class OrchestratorStep(str, Enum): """Labels for the 7-step pipeline used by :class:`OrchestrationFailure`. AC-5: every failure that reaches the test surface must name the step that produced it. The string values are stable so test assertions and log readers can match on them. """ VALIDATE_INPUTS = "validate_inputs" WRITE_EFFECTIVE_CONFIG = "write_effective_config" AIRBORNE_PIPELINE = "airborne_pipeline" PARSE_EMISSIONS = "parse_emissions" LOAD_GROUND_TRUTH = "load_ground_truth" COMPUTE_DISTRIBUTION = "compute_distribution" RENDER_REPORT = "render_report" class OrchestrationFailure(RuntimeError): """Failure inside one of the 7 orchestration steps (AC-5). The :attr:`step` attribute names the failing step; the message embeds it as the prefix so plain log-readers see the failure location without inspecting the exception object. """ def __init__(self, step: OrchestratorStep, message: str) -> None: super().__init__(f"[{step.value}] {message}") self.step = step @dataclass(frozen=True, slots=True) class OrchestrationReport: """Return value of :func:`run_e2e_orchestration`. Attributes: verdict_passed: ``True`` iff the run met the AZ-696 epic AC-3 gate (>= AC3_GATE_PCT% within AC3_GATE_THRESHOLD_M m). distribution: Computed horizontal-error distribution. report_path: Markdown report written under ``report_dir``. emissions_count: Total estimator-output records consumed. wall_clock_s: Wall-clock seconds for the orchestration run (excludes the C3 fixture setup; covers steps 1-2-6-7). replay_subprocess_seconds: Wall-clock seconds the airborne replay subprocess took. Always <= ``wall_clock_s``. """ verdict_passed: bool distribution: HorizontalErrorDistribution report_path: Path emissions_count: int wall_clock_s: float replay_subprocess_seconds: float def read_calibration_acquisition_method(calibration_path: Path) -> str: """Return the AZ-702 ``acquisition_method`` field, or ``"unknown"``. Mirrors ``test_derkachi_real_tlog._read_calibration_acquisition_method`` so the AZ-840 verdict report can name the calibration provenance in its failure message (AZ-699 AC-3). Pure helper; the report writer needs the string, not the JSON. """ try: data = json.loads(calibration_path.read_text()) except (OSError, json.JSONDecodeError): return "unknown" method = data.get("acquisition_method") if isinstance(method, str) and method: return method return "unknown" def write_effective_replay_config( *, base_config_path: Path, cache_root: Path, output_path: Path, ) -> Path: """Merge cache_root override into the static operator YAML. Reads ``base_config_path`` as YAML, sets the ``c6_tile_cache.root_dir`` to ``cache_root`` (forcing the FAISS index path to fall back to ``/descriptor.index``), and writes the merged document to ``output_path`` as YAML. The merge is field-level: every other block in the base YAML is preserved verbatim. This keeps a single source of truth for the operator config — the test harness only contributes the dynamic cache_root. Returns: The ``output_path`` argument, for ergonomic chaining. Raises: OrchestrationFailure (step=WRITE_EFFECTIVE_CONFIG): Base YAML unreadable, malformed, or not a top-level mapping. """ try: base_text = base_config_path.read_text() except OSError as exc: raise OrchestrationFailure( OrchestratorStep.WRITE_EFFECTIVE_CONFIG, f"cannot read base config at {base_config_path}: {exc!r}", ) from exc try: base_data = yaml.safe_load(base_text) or {} except yaml.YAMLError as exc: raise OrchestrationFailure( OrchestratorStep.WRITE_EFFECTIVE_CONFIG, f"base config YAML at {base_config_path} is malformed: {exc!r}", ) from exc if not isinstance(base_data, dict): raise OrchestrationFailure( OrchestratorStep.WRITE_EFFECTIVE_CONFIG, f"base config YAML at {base_config_path} must be a mapping; " f"got {type(base_data).__name__}", ) c6_block_raw = base_data.get("c6_tile_cache") c6_block = dict(c6_block_raw) if isinstance(c6_block_raw, dict) else {} c6_block["root_dir"] = str(cache_root) c6_block["faiss_index_path"] = "" base_data["c6_tile_cache"] = c6_block try: output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text( yaml.safe_dump(base_data, sort_keys=True, default_flow_style=False) ) except OSError as exc: raise OrchestrationFailure( OrchestratorStep.WRITE_EFFECTIVE_CONFIG, f"cannot write effective config at {output_path}: {exc!r}", ) from exc return output_path def run_e2e_orchestration( *, populated_cache: PopulatedC6Cache, base_config_path: Path, tlog_path: Path, video_path: Path, calibration_path: Path, signing_key_path: Path, replay_binary: Path, output_path: Path, report_dir: Path, effective_config_path: Path, run_date_utc: str | None = None, runner: Callable[..., subprocess.CompletedProcess[str]] = subprocess.run, subprocess_env: Mapping[str, str] | None = None, max_seconds: float = _DEFAULT_MAX_SECONDS, logger: logging.Logger | None = None, ) -> OrchestrationReport: """Run AZ-835 steps 1-7 against the AZ-839 populated cache. Steps 3-5 are the responsibility of ``populated_cache`` (the AZ-839 C3 fixture); this function covers 1-2-6 (the airborne replay subprocess) and 7 (verdict report). The C3 fixture and this function share the cache_root via :func:`write_effective_replay_config` so the airborne binary reads the same FAISS index the fixture wrote (AC-3). Args: populated_cache: C3 fixture output (AZ-839). Carries ``cache_root``, ``faiss_index_path``, and the route spec the test pipeline produced. base_config_path: Static operator config YAML (``GPS_DENIED_OPERATOR_CONFIG_PATH``). Must register ``c6_tile_cache``, ``c10_provisioning``, ``c2_vpr``, ``c4_pose``, and ``c5_state`` blocks for the airborne binary to compose the replay graph. tlog_path: ArduPilot binary tlog the test consumes. video_path: Flight video file the test consumes. calibration_path: Camera calibration JSON (AZ-702 factory-sheet for Derkachi). signing_key_path: MAVLink signing-key file. Replay protocol Invariant 11 — required even for the noop transport. replay_binary: ``gps-denied-replay`` console-script path. output_path: Where the airborne binary writes JSONL estimator emissions. report_dir: Directory the verdict markdown is written to. effective_config_path: Where the cache_root-merged YAML is written. The path is passed to the airborne binary via ``--config``. run_date_utc: ISO-8601 date for the report filename and header. Defaults to today UTC. runner: ``subprocess.run`` by default; tests inject a fake that emits a synthetic JSONL output. subprocess_env: Optional environment overlay for the replay subprocess. ``None`` means ``os.environ``. max_seconds: Hard wall-clock cap for the airborne replay subprocess. The orchestrator times out the runner via its ``timeout`` kwarg; an exceeded budget surfaces as ``OrchestrationFailure(step=AIRBORNE_PIPELINE)``. logger: Optional logger. Defaults to the module logger. Returns: :class:`OrchestrationReport` on success. The verdict can be PASS or FAIL — AC-2 mandates the report exists either way. Raises: OrchestrationFailure: Any of the 7 steps failed. The ``step`` attribute names the failing step. """ log = logger or _LOGGER started = time.monotonic() effective_run_date = run_date_utc or ( datetime.datetime.now(datetime.timezone.utc).date().isoformat() ) _validate_inputs( base_config_path=base_config_path, tlog_path=tlog_path, video_path=video_path, calibration_path=calibration_path, signing_key_path=signing_key_path, replay_binary=replay_binary, report_dir=report_dir, ) write_effective_replay_config( base_config_path=base_config_path, cache_root=populated_cache.cache_root, output_path=effective_config_path, ) replay_subprocess_seconds = _run_replay_subprocess( replay_binary=replay_binary, video_path=video_path, tlog_path=tlog_path, output_path=output_path, calibration_path=calibration_path, config_path=effective_config_path, signing_key_path=signing_key_path, max_seconds=max_seconds, runner=runner, env=subprocess_env, logger=log, ) emissions = _parse_jsonl(output_path) ground_truth = _load_ground_truth(tlog_path) distribution = _compute_distribution(emissions, ground_truth) context = ReportContext( run_date_utc=effective_run_date, tlog_path=tlog_path, video_path=video_path, calibration_acquisition_method=read_calibration_acquisition_method( calibration_path ), clip_duration_s=( ground_truth[-1].t_s - ground_truth[0].t_s if ground_truth else 0.0 ), emissions_count=len(emissions), ) verdict_passed = verdict_passes_ac3(distribution) report_path = _render_and_write_report( distribution=distribution, context=context, passed=verdict_passed, report_dir=report_dir, ) log.info( "e2e_orchestrator: report written", extra={ "kind": "e2e_orchestrator.report_written", "kv": { "report_path": str(report_path), "verdict_passed": verdict_passed, "share_within_threshold_pct": ( distribution.threshold_hit_share.get( AC3_GATE_THRESHOLD_M, 0.0 ) * 100.0 ), "ac3_gate_pct": AC3_GATE_PCT, "emissions_count": len(emissions), "ground_truth_pairings": distribution.count, }, }, ) wall_clock_s = max(0.0, time.monotonic() - started) return OrchestrationReport( verdict_passed=verdict_passed, distribution=distribution, report_path=report_path, emissions_count=len(emissions), wall_clock_s=wall_clock_s, replay_subprocess_seconds=replay_subprocess_seconds, ) def _validate_inputs( *, base_config_path: Path, tlog_path: Path, video_path: Path, calibration_path: Path, signing_key_path: Path, replay_binary: Path, report_dir: Path, ) -> None: """Fail fast on missing inputs (AC-5 — surface the failing step early).""" file_inputs: tuple[tuple[str, Path], ...] = ( ("base_config_path", base_config_path), ("tlog_path", tlog_path), ("video_path", video_path), ("calibration_path", calibration_path), ("signing_key_path", signing_key_path), ("replay_binary", replay_binary), ) for label, path in file_inputs: if not path.is_file(): raise OrchestrationFailure( OrchestratorStep.VALIDATE_INPUTS, f"{label} is not a file: {path}", ) try: report_dir.mkdir(parents=True, exist_ok=True) except OSError as exc: raise OrchestrationFailure( OrchestratorStep.VALIDATE_INPUTS, f"report_dir {report_dir} cannot be created: {exc!r}", ) from exc def _run_replay_subprocess( *, replay_binary: Path, video_path: Path, tlog_path: Path, output_path: Path, calibration_path: Path, config_path: Path, signing_key_path: Path, max_seconds: float, runner: Callable[..., subprocess.CompletedProcess[str]], env: Mapping[str, str] | None, logger: logging.Logger, ) -> float: """Invoke gps-denied-replay with --auto-trim; return wall-clock seconds. Wraps :class:`subprocess.run` so unit tests can inject a fake runner. ``--auto-trim`` is always enabled here — the orchestrator owns the AZ-405 / AZ-698 sync path (AZ-840 step 1). Raises: OrchestrationFailure (step=AIRBORNE_PIPELINE): Non-zero exit, timeout, or runner-level OSError. """ argv = [ str(replay_binary), "--video", str(video_path), "--tlog", str(tlog_path), "--output", str(output_path), "--camera-calibration", str(calibration_path), "--config", str(config_path), "--mavlink-signing-key", str(signing_key_path), "--pace", "asap", "--auto-trim", ] started = time.monotonic() try: completed = runner( argv, capture_output=True, text=True, timeout=max_seconds, env=dict(env) if env is not None else None, ) except subprocess.TimeoutExpired as exc: raise OrchestrationFailure( OrchestratorStep.AIRBORNE_PIPELINE, f"gps-denied-replay timed out after {max_seconds:.0f} s", ) from exc except OSError as exc: raise OrchestrationFailure( OrchestratorStep.AIRBORNE_PIPELINE, f"cannot launch gps-denied-replay at {replay_binary}: {exc!r}", ) from exc elapsed_s = max(0.0, time.monotonic() - started) if completed.returncode != 0: raise OrchestrationFailure( OrchestratorStep.AIRBORNE_PIPELINE, f"gps-denied-replay exited {completed.returncode}\n" f"stdout:\n{completed.stdout}\nstderr:\n{completed.stderr}", ) logger.info( "e2e_orchestrator: replay subprocess complete", extra={ "kind": "e2e_orchestrator.replay_subprocess", "kv": { "elapsed_s": elapsed_s, "max_seconds": max_seconds, }, }, ) return elapsed_s def _parse_jsonl(path: Path) -> list[dict[str, Any]]: """Read one JSON record per non-blank line. Raises: OrchestrationFailure (step=PARSE_EMISSIONS): Output file missing, unreadable, has zero records, or contains a malformed line. """ if not path.is_file(): raise OrchestrationFailure( OrchestratorStep.PARSE_EMISSIONS, f"replay output JSONL not found: {path}", ) try: text = path.read_text() except OSError as exc: raise OrchestrationFailure( OrchestratorStep.PARSE_EMISSIONS, f"replay output JSONL unreadable at {path}: {exc!r}", ) from exc rows: list[dict[str, Any]] = [] for line_idx, line in enumerate(text.splitlines(), start=1): if not line.strip(): continue try: row = json.loads(line) except json.JSONDecodeError as exc: raise OrchestrationFailure( OrchestratorStep.PARSE_EMISSIONS, f"malformed JSON at line {line_idx} of {path}: {exc.msg}", ) from exc if not isinstance(row, dict): raise OrchestrationFailure( OrchestratorStep.PARSE_EMISSIONS, f"line {line_idx} of {path} is not a JSON object: {row!r}", ) rows.append(row) if not rows: raise OrchestrationFailure( OrchestratorStep.PARSE_EMISSIONS, f"replay output JSONL at {path} has zero records — pipeline " "produced no estimator emissions", ) return rows def _load_ground_truth(tlog_path: Path) -> list[GroundTruthRow]: """Extract WGS84 ground truth from the binary tlog. Raises: OrchestrationFailure (step=LOAD_GROUND_TRUTH): Loader error or empty record list. """ try: series = load_tlog_ground_truth(tlog_path).records except Exception as exc: raise OrchestrationFailure( OrchestratorStep.LOAD_GROUND_TRUTH, f"load_tlog_ground_truth({tlog_path}) failed: {exc!r}", ) from exc rows: list[GroundTruthRow] = [ GroundTruthRow( t_s=fix.ts_ns / 1e9, lat_deg=fix.lat_deg, lon_deg=fix.lon_deg, alt_m=fix.alt_m, ) for fix in series ] if not rows: raise OrchestrationFailure( OrchestratorStep.LOAD_GROUND_TRUTH, f"tlog ground truth at {tlog_path} has zero rows", ) return rows def _compute_distribution( emissions: list[dict[str, Any]], ground_truth: list[GroundTruthRow], ) -> HorizontalErrorDistribution: """Compute the horizontal-error distribution. Raises: OrchestrationFailure (step=COMPUTE_DISTRIBUTION): Helper error or zero ground-truth pairings (every emission fell outside the GT time window). """ try: distribution = horizontal_error_distribution(emissions, ground_truth) except Exception as exc: raise OrchestrationFailure( OrchestratorStep.COMPUTE_DISTRIBUTION, f"horizontal_error_distribution failed: {exc!r}", ) from exc if distribution.count == 0: raise OrchestrationFailure( OrchestratorStep.COMPUTE_DISTRIBUTION, "no emissions paired with ground truth — JSONL timestamps " "outside the tlog GPS window?", ) return distribution def _render_and_write_report( *, distribution: HorizontalErrorDistribution, context: ReportContext, passed: bool, report_dir: Path, ) -> Path: """Render the verdict markdown and write it to ``report_dir``. Raises: OrchestrationFailure (step=RENDER_REPORT): Render or write failure; ``report_dir`` was already created by :func:`_validate_inputs`. """ try: report_text = render_report(distribution, context, passed=passed) except Exception as exc: raise OrchestrationFailure( OrchestratorStep.RENDER_REPORT, f"render_report failed: {exc!r}", ) from exc report_path = ( report_dir / f"real_flight_validation_{context.run_date_utc}.md" ) try: report_path.write_text(report_text) except OSError as exc: raise OrchestrationFailure( OrchestratorStep.RENDER_REPORT, f"cannot write report at {report_path}: {exc!r}", ) from exc return report_path