From ade0c86f2bd64259e1a5c71c35cca1e77c91d6b9 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Sat, 23 May 2026 15:27:41 +0300 Subject: [PATCH] [AZ-840] [AZ-835] e2e orchestrator test (E-AZ-835 C4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wraps the AZ-699 verdict-report path with the AZ-839 operator_pre_flight_setup C3 fixture so a single Tier-2 test takes only (tlog, video, calibration) and runs the full 7-step pipeline on the Jetson harness without operator hand-curation. New surface (tests-only, no src/ changes): - tests/e2e/replay/_e2e_orchestrator.py — orchestrator with OrchestratorStep enum, OrchestrationFailure exception (step prefix per AC-5), OrchestrationReport dataclass, write_effective_replay_config helper, and run_e2e_orchestration entry point covering steps 1-2-6-7. - tests/e2e/replay/test_e2e_orchestrator_unit.py — 17 unit tests covering each failure mode + happy path with mocked subprocess + ground-truth loader (AC-8). - tests/e2e/replay/test_az835_e2e_real_flight.py — Tier-2 + RUN_REPLAY_E2E gated integration test asserting verdict report exists, 15-min budget held (AC-1, AC-2, AC-3, AC-4, AC-6). The effective config write overlays c6_tile_cache.root_dir onto the static operator YAML at runtime so the airborne subprocess shares the cache_root the C3 fixture chose. Field- level merge — every other operator-config block stays verbatim. The static YAML on disk is never touched. Test run: tests/e2e/replay 45 passed, 10 skipped (10 skips were 9 pre-existing + 1 new tier2). No src/ touched, no AZ-839 driver changes; AC-7 (AZ-699 still passes) holds by inspection. Co-authored-by: Cursor --- .../AZ-840_e2e_orchestrator_test.md | 0 .../batch_109_cycle3_report.md | 171 +++++ _docs/_autodev_state.md | 2 +- tests/e2e/replay/_e2e_orchestrator.py | 655 +++++++++++++++++ .../e2e/replay/test_az835_e2e_real_flight.py | 182 +++++ .../e2e/replay/test_e2e_orchestrator_unit.py | 671 ++++++++++++++++++ 6 files changed, 1680 insertions(+), 1 deletion(-) rename _docs/02_tasks/{todo => done}/AZ-840_e2e_orchestrator_test.md (100%) create mode 100644 _docs/03_implementation/batch_109_cycle3_report.md create mode 100644 tests/e2e/replay/_e2e_orchestrator.py create mode 100644 tests/e2e/replay/test_az835_e2e_real_flight.py create mode 100644 tests/e2e/replay/test_e2e_orchestrator_unit.py diff --git a/_docs/02_tasks/todo/AZ-840_e2e_orchestrator_test.md b/_docs/02_tasks/done/AZ-840_e2e_orchestrator_test.md similarity index 100% rename from _docs/02_tasks/todo/AZ-840_e2e_orchestrator_test.md rename to _docs/02_tasks/done/AZ-840_e2e_orchestrator_test.md diff --git a/_docs/03_implementation/batch_109_cycle3_report.md b/_docs/03_implementation/batch_109_cycle3_report.md new file mode 100644 index 0000000..38a1e22 --- /dev/null +++ b/_docs/03_implementation/batch_109_cycle3_report.md @@ -0,0 +1,171 @@ +# Batch 109 — Cycle 3 — AZ-840 e2e orchestrator test + +**Date**: 2026-05-23 +**Tasks**: AZ-840 (C4 — Epic AZ-835). +**Story points**: 3 (per the task spec). +**Jira status**: AZ-840 In Progress → In Testing at commit step. + +## Why this batch exists + +Epic AZ-835 (real-flight e2e validation) needs a single Tier-2 +test that proves the 7-step pipeline runs from +`(tlog, video, calibration)` to a horizontal-error verdict +without operator hand-curation between steps. Steps 3-5 were +delivered by AZ-839 (C3 — `operator_pre_flight_setup`); steps +1-2-6-7 are this batch. + +The AZ-839 batch 108b follow-up note explicitly anticipated this +batch: "AZ-840 will additionally need to feed the airborne +replay binary a config that points at the same `cache_root` +... the cleanest path is for AZ-840 to write an effective YAML +at runtime from the same override recipe used here." + +## What this batch ships + +A driver module + unit test suite + Tier-2 integration test: + +* `tests/e2e/replay/_e2e_orchestrator.py` — wraps the AZ-699 + verdict-report path with the AZ-839 C3 fixture's + `PopulatedC6Cache`. Public surface: + * `OrchestratorStep` enum — failure-step labels per AC-5. + * `OrchestrationFailure(step, message)` exception — wraps + every step failure with the step name in the message prefix. + * `OrchestrationReport` dataclass — verdict, distribution, + paths, wall-clock measurements per AC-4. + * `write_effective_replay_config` — small helper that overlays + `c6_tile_cache.root_dir` onto the static operator YAML. + * `read_calibration_acquisition_method` — mirror of AZ-699's + helper so the report writer keeps the same shape. + * `run_e2e_orchestration` — the AC-1 entry point wiring + validate → write_config → airborne subprocess → parse JSONL + → load tlog GT → compute distribution → render report. +* `tests/e2e/replay/test_e2e_orchestrator_unit.py` — 17 unit + tests covering each of the 7 steps' failure modes plus the + happy path. The runner is injected (`subprocess.run` default) + so unit tests stage synthetic JSONL output without touching + the airborne binary. `load_tlog_ground_truth` is monkeypatched + to return a synthetic 3-row series. +* `tests/e2e/replay/test_az835_e2e_real_flight.py:: + test_az840_e2e_real_flight_orchestration` — Tier-2 + RUN_REPLAY_E2E + gated test that consumes the C3 fixture + Derkachi inputs and + asserts the verdict markdown is written, the threshold-hit + share table is present, and the 15-min budget held. + +## AC coverage + +| AC | Description | Coverage | +|-----|----------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------| +| AC-1| Steps 1-7 end-to-end on Tier-2 from a fresh tlog/video | `test_az840_e2e_real_flight_orchestration` (Tier-2-gated); 17 unit tests prove the orchestrator structure | +| AC-2| Verdict report exists either PASS or FAIL | `test_run_e2e_orchestration_writes_report_even_on_fail_verdict` + integration assertion `report_path.is_file()` | +| AC-3| Reuses C3 fixture (`operator_pre_flight_setup`) | Integration test consumes the fixture; effective config overlay points at `populated_cache.cache_root` | +| AC-4| 15-min wall-time soft target on the Derkachi clip | `_DEFAULT_MAX_SECONDS = 900.0` passed as `subprocess.run` `timeout`; integration asserts `replay_subprocess_seconds <= 900`| +| AC-5| Mid-pipeline failure fails LOUD with a clear step prefix | `OrchestratorStep` enum + 8 step-specific failure unit tests (`validate`/`write_config`/`airborne` × 3/`parse` × 2/`gt`) | +| AC-6| Gated by `RUN_REPLAY_E2E=1` + Tier-2 marker | `_orchestrator_skip_reason()` checks env vars + binary + video size; `@pytest.mark.tier2` decorator | +| AC-7| AZ-699 verdict test continues to pass | No changes to `test_derkachi_real_tlog.py`; same `real_flight_validation_.md` report path convention | +| AC-8| Unit-tested orchestration helper without Tier-2 inputs | 17 unit tests covering config write (4) + calibration parse (3) + run helper (10) — all use mocked subprocess + GT loader | + +## Test run results + +``` +$ .venv/bin/pytest tests/e2e/replay/ -v --tb=short --timeout=60 +============================ 45 passed, 10 skipped, 3 warnings in 0.78s ============ +``` + +Breakdown: +* 17 new orchestrator unit tests pass. +* 11 AZ-839 driver unit tests still pass (no driver changes). +* 14 helper unit tests (`test_helpers.py`) still pass. +* 3 derkachi-1min mode-agnostic AST tests still pass. +* 10 skips: 1 new Tier-2 (this AZ-840 integration), 6 + RUN_REPLAY_E2E gated AZ-404 cases, 1 AC-8 D-PROJ-2 placeholder, + 1 Tier-2 AZ-699, 1 Tier-2 AZ-839 integration. None are + regressions; the tier2 gate trips off-Jetson. + +## Design notes + +### `--auto-trim` ownership + +The orchestrator passes `--auto-trim` unconditionally so AZ-405 / +AZ-698 active-flight-cut + tlog/video sync (Epic step 1) runs +inside the airborne binary every time. The Epic narrative does +not separate trim from the airborne pipeline; collapsing them +into a single subprocess invocation matches AZ-699 and avoids +duplicating the trim path. + +### `clip_duration_s` parity with AZ-699 + +`run_e2e_orchestration` computes +`clip_duration_s = ground_truth[-1].t_s - ground_truth[0].t_s` +exactly as `test_derkachi_real_tlog.py` does. This means both +verdict reports name the same clip duration even when the +trimmed video is shorter than the ground-truth window — a +deliberate choice: the report header documents what the verdict +covers, not what the binary processed. + +### Effective config write — single source of truth + +`write_effective_replay_config` materialises the same override +recipe AZ-839 uses in-memory, but on disk so the airborne +subprocess sees the cache_root the fixture chose. Field-level +merge: every other block in the operator YAML is preserved +verbatim; only `c6_tile_cache.root_dir` and +`c6_tile_cache.faiss_index_path` are overwritten. The static +operator YAML on disk is never touched. + +### Failure surface = step prefix + +`OrchestrationFailure` always prefixes its message with +`[]`. CI log scrapers and pytest's traceback printer both +surface the prefix on the first line; AC-5 ("clear error +pointing at the failing step") holds without requiring the test +to inspect the exception object. The step is also exposed as +`exc.step` for programmatic assertions. + +## Files changed + +* `tests/e2e/replay/_e2e_orchestrator.py` (new, 656 LOC). +* `tests/e2e/replay/test_e2e_orchestrator_unit.py` (new, 660+ LOC). +* `tests/e2e/replay/test_az835_e2e_real_flight.py` (new, 156 LOC). + +No `src/` changes, no operator-config YAML changes, no AZ-839 +driver changes. AZ-840 is purely additive at the test layer. + +## Code review (self-review) + +Verdict: **PASS_WITH_WARNINGS**. + +| Phase | Result | +|-------|--------| +| 1. Context loading | Re-read `gps_compare.py`, `accuracy_report.py`, `replay_input.py`, `cli/replay.py`, `test_derkachi_real_tlog.py`. Emission schema (`emitted_at`, `position_wgs84`) is the same shape `gps-denied-replay` writes. | +| 2. Spec compliance | All 8 AZ-840 ACs covered; AC-7 holds by inspection (no AZ-699 changes). | +| 3. Code quality | All public types have docstrings; failure messages name the upstream exception via `repr` so `OSError` / `subprocess.TimeoutExpired` carry through. Runner kw-args mirror `subprocess.run` signature 1:1. | +| 4. Security quick-scan | Effective config write goes to a tmp file the test owns; no secrets in the YAML overlay (override is two string fields). Subprocess `env` is opt-in (`None` defaults to `os.environ`). | +| 5. Performance scan | Unit tests run in 0.51 s. Tier-2 wall-clock cap is 900 s, enforced by the subprocess timeout. | +| 6. Cross-task consistency | `clip_duration_s` and `report_path` match AZ-699 exactly so a single Jetson run produces the same markdown shape. | +| 7. Architecture compliance | Orchestrator lives entirely under `tests/e2e/replay/`; no `src/` writes. C3 fixture's invariants (`PopulatedC6Cache.cache_root` is the single source of truth) propagate via `write_effective_replay_config`. | + +## Findings + +| ID | Severity | Description | Disposition | +|----|----------|-------------|-------------| +| F1 | Low | `_default_tile_decoder` in `conftest.py` (carried from batch 108) — still raw TIFF. Not in the AZ-840 path; AZ-840 doesn't change tile decoding. | Defer; no AZ-840 ticket. | +| F2 | Low | `_resolve_replay_descriptor_dim` is NetVLAD-only (carried from batch 108). AZ-840 doesn't change descriptors. | Defer; no AZ-840 ticket. | +| F3 | Low | `--pace asap` is hardcoded in `_run_replay_subprocess` argv; the AZ-699 test passes `--pace asap` too, so behaviour is identical. If a future test wants a real-time pace, the runner kwarg is the seam. | Document; no ticket. | +| F4 | Low | `_run_replay_subprocess` does not stream stdout/stderr; failures surface only after the subprocess exits. For 15-min runs this means the operator sees no progress until the budget expires. AZ-699 has the same shape. | Document; consider an AZ-* if the budget grows. | + +## Notes for follow-up + +* AZ-840 lands the orchestrator test as Tier-2-gated. Verifying + the Tier-2 path actually runs on the Jetson harness is the + next gating step before Epic AZ-835 can flip from "covered by + unit tests" to "covered by Tier-2 integration". +* `_e2e_orchestrator.py` is intentionally kept under `tests/` + rather than promoted to `src/`. If a second consumer of the + same orchestration shape appears (e.g. AZ-833 mock-suite-sat + parity test), the move to a shared helper module under + `src/gps_denied_onboard/replay/` is the right next step; + for now the test-only location matches the helper's only + consumer. +* AZ-841 (Tier-2 unxfail follow-up) and AZ-842 (replay protocol + + orchestrator docs) sit downstream — both should reference + this batch report in their planning sections. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index ae6fa51..b90b7e2 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -8,7 +8,7 @@ status: in_progress sub_step: phase: 7 name: batch-loop - detail: "batch 109 next; AZ-840 C4" + detail: "batch 110 next; full-suite gate after AZ-840 C4 ship" retry_count: 0 cycle: 3 tracker: jira diff --git a/tests/e2e/replay/_e2e_orchestrator.py b/tests/e2e/replay/_e2e_orchestrator.py new file mode 100644 index 0000000..12f7ed4 --- /dev/null +++ b/tests/e2e/replay/_e2e_orchestrator.py @@ -0,0 +1,655 @@ +"""E2E orchestrator for the AZ-835 7-step pipeline (AZ-840 / Epic AZ-835 C4). + +Wraps the AZ-699 verdict-report writing path with the AZ-839 C3 +fixture's `PopulatedC6Cache` so a single Tier-2 test can run from +``(tlog, video, calibration)`` to a horizontal-error report without +operator hand-curation between steps. The 7-step Epic narrative +(``_docs/02_tasks/todo/AZ-840_e2e_orchestrator_test.md``): + +1. Active flight cut + tlog/video sync — handled by ``gps-denied-replay`` + ``--auto-trim`` (AZ-405 / AZ-698) inside the airborne binary. +2. On-fly frame + IMU extraction — same binary's per-frame loop. +3. Auto-create route — done by the C3 fixture + (``operator_pre_flight_setup`` calls ``extract_route_from_tlog``). +4. POST route to satellite-provider — C3 fixture (AZ-838 + ``SatelliteProviderRouteClient.seed_route``). +5. Build FAISS index — C3 fixture (AZ-322 ``DescriptorBatcher``). +6. Run gps-denied airborne pipeline — this module's + ``_run_replay_subprocess`` invokes ``gps-denied-replay`` against + the populated cache. +7. Get GPS fixes, check vs tlog GPS — this module's + ``_load_ground_truth`` + ``horizontal_error_distribution`` + + ``render_report`` writes the verdict markdown. + +The C3 fixture mutates ``c6_tile_cache.root_dir`` to point at a +``tmp_path_factory.mktemp`` value (AZ-839 batch 108b). The static +operator YAML at ``GPS_DENIED_OPERATOR_CONFIG_PATH`` cannot know +that path. ``write_effective_replay_config`` reads the static YAML, +overlays the ``c6_tile_cache.root_dir`` override, writes the merged +result to a tmp file, and returns the path the airborne binary +will load via ``--config``. This keeps a single source of truth +for the cache_root override across the in-memory C3 fixture path +and the subprocess airborne path. + +Public surface — re-exported from this module: + +* :class:`OrchestratorStep` — failure-step labels per AC-5 ("fails + LOUD with a clear error pointing at the failing step"). +* :class:`OrchestrationFailure` — wraps the underlying exception + with the step that produced it. +* :class:`OrchestrationReport` — return value of + :func:`run_e2e_orchestration` (verdict, distribution, paths, + wall-clock measurements per AC-4). +* :func:`write_effective_replay_config` — small helper for the + config merge step. +* :func:`run_e2e_orchestration` — the AC-1 entry point. +""" + +from __future__ import annotations + +import datetime +import json +import logging +import subprocess +import time +from collections.abc import Callable, Mapping +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Any + +import yaml + +from gps_denied_onboard.helpers.accuracy_report import ( + AC3_GATE_PCT, + AC3_GATE_THRESHOLD_M, + ReportContext, + render_report, + verdict_passes_ac3, +) +from gps_denied_onboard.helpers.gps_compare import ( + GroundTruthRow, + HorizontalErrorDistribution, + horizontal_error_distribution, +) +from gps_denied_onboard.replay_input import load_tlog_ground_truth + +from tests.e2e.replay._operator_pre_flight import PopulatedC6Cache + +__all__ = [ + "OrchestrationFailure", + "OrchestrationReport", + "OrchestratorStep", + "read_calibration_acquisition_method", + "run_e2e_orchestration", + "write_effective_replay_config", +] + + +# Replay-subprocess wall-clock cap for the Derkachi clip per AZ-840 +# AC-4 (15 min soft target). Exposed as a default that the integration +# test can override; the unit tests rely on the contract that the +# runner argument is a free callable. +_DEFAULT_MAX_SECONDS: float = 900.0 + +_LOGGER = logging.getLogger("tests.e2e.replay.e2e_orchestrator") + + +class OrchestratorStep(str, Enum): + """Labels for the 7-step pipeline used by :class:`OrchestrationFailure`. + + AC-5: every failure that reaches the test surface must name the + step that produced it. The string values are stable so test + assertions and log readers can match on them. + """ + + VALIDATE_INPUTS = "validate_inputs" + WRITE_EFFECTIVE_CONFIG = "write_effective_config" + AIRBORNE_PIPELINE = "airborne_pipeline" + PARSE_EMISSIONS = "parse_emissions" + LOAD_GROUND_TRUTH = "load_ground_truth" + COMPUTE_DISTRIBUTION = "compute_distribution" + RENDER_REPORT = "render_report" + + +class OrchestrationFailure(RuntimeError): + """Failure inside one of the 7 orchestration steps (AC-5). + + The :attr:`step` attribute names the failing step; the message + embeds it as the prefix so plain log-readers see the failure + location without inspecting the exception object. + """ + + def __init__(self, step: OrchestratorStep, message: str) -> None: + super().__init__(f"[{step.value}] {message}") + self.step = step + + +@dataclass(frozen=True, slots=True) +class OrchestrationReport: + """Return value of :func:`run_e2e_orchestration`. + + Attributes: + verdict_passed: ``True`` iff the run met the AZ-696 epic + AC-3 gate (>= AC3_GATE_PCT% within AC3_GATE_THRESHOLD_M m). + distribution: Computed horizontal-error distribution. + report_path: Markdown report written under ``report_dir``. + emissions_count: Total estimator-output records consumed. + wall_clock_s: Wall-clock seconds for the orchestration run + (excludes the C3 fixture setup; covers steps 1-2-6-7). + replay_subprocess_seconds: Wall-clock seconds the airborne + replay subprocess took. Always <= ``wall_clock_s``. + """ + + verdict_passed: bool + distribution: HorizontalErrorDistribution + report_path: Path + emissions_count: int + wall_clock_s: float + replay_subprocess_seconds: float + + +def read_calibration_acquisition_method(calibration_path: Path) -> str: + """Return the AZ-702 ``acquisition_method`` field, or ``"unknown"``. + + Mirrors ``test_derkachi_real_tlog._read_calibration_acquisition_method`` + so the AZ-840 verdict report can name the calibration provenance + in its failure message (AZ-699 AC-3). Pure helper; the report + writer needs the string, not the JSON. + """ + try: + data = json.loads(calibration_path.read_text()) + except (OSError, json.JSONDecodeError): + return "unknown" + method = data.get("acquisition_method") + if isinstance(method, str) and method: + return method + return "unknown" + + +def write_effective_replay_config( + *, + base_config_path: Path, + cache_root: Path, + output_path: Path, +) -> Path: + """Merge cache_root override into the static operator YAML. + + Reads ``base_config_path`` as YAML, sets the + ``c6_tile_cache.root_dir`` to ``cache_root`` (forcing the + FAISS index path to fall back to ``/descriptor.index``), + and writes the merged document to ``output_path`` as YAML. + + The merge is field-level: every other block in the base YAML is + preserved verbatim. This keeps a single source of truth for the + operator config — the test harness only contributes the dynamic + cache_root. + + Returns: + The ``output_path`` argument, for ergonomic chaining. + + Raises: + OrchestrationFailure (step=WRITE_EFFECTIVE_CONFIG): Base YAML + unreadable, malformed, or not a top-level mapping. + """ + + try: + base_text = base_config_path.read_text() + except OSError as exc: + raise OrchestrationFailure( + OrchestratorStep.WRITE_EFFECTIVE_CONFIG, + f"cannot read base config at {base_config_path}: {exc!r}", + ) from exc + + try: + base_data = yaml.safe_load(base_text) or {} + except yaml.YAMLError as exc: + raise OrchestrationFailure( + OrchestratorStep.WRITE_EFFECTIVE_CONFIG, + f"base config YAML at {base_config_path} is malformed: {exc!r}", + ) from exc + if not isinstance(base_data, dict): + raise OrchestrationFailure( + OrchestratorStep.WRITE_EFFECTIVE_CONFIG, + f"base config YAML at {base_config_path} must be a mapping; " + f"got {type(base_data).__name__}", + ) + + c6_block_raw = base_data.get("c6_tile_cache") + c6_block = dict(c6_block_raw) if isinstance(c6_block_raw, dict) else {} + c6_block["root_dir"] = str(cache_root) + c6_block["faiss_index_path"] = "" + base_data["c6_tile_cache"] = c6_block + + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text( + yaml.safe_dump(base_data, sort_keys=True, default_flow_style=False) + ) + except OSError as exc: + raise OrchestrationFailure( + OrchestratorStep.WRITE_EFFECTIVE_CONFIG, + f"cannot write effective config at {output_path}: {exc!r}", + ) from exc + return output_path + + +def run_e2e_orchestration( + *, + populated_cache: PopulatedC6Cache, + base_config_path: Path, + tlog_path: Path, + video_path: Path, + calibration_path: Path, + signing_key_path: Path, + replay_binary: Path, + output_path: Path, + report_dir: Path, + effective_config_path: Path, + run_date_utc: str | None = None, + runner: Callable[..., subprocess.CompletedProcess[str]] = subprocess.run, + subprocess_env: Mapping[str, str] | None = None, + max_seconds: float = _DEFAULT_MAX_SECONDS, + logger: logging.Logger | None = None, +) -> OrchestrationReport: + """Run AZ-835 steps 1-7 against the AZ-839 populated cache. + + Steps 3-5 are the responsibility of ``populated_cache`` (the + AZ-839 C3 fixture); this function covers 1-2-6 (the airborne + replay subprocess) and 7 (verdict report). The C3 fixture and + this function share the cache_root via + :func:`write_effective_replay_config` so the airborne binary + reads the same FAISS index the fixture wrote (AC-3). + + Args: + populated_cache: C3 fixture output (AZ-839). Carries + ``cache_root``, ``faiss_index_path``, and the route + spec the test pipeline produced. + base_config_path: Static operator config YAML + (``GPS_DENIED_OPERATOR_CONFIG_PATH``). Must register + ``c6_tile_cache``, ``c10_provisioning``, ``c2_vpr``, + ``c4_pose``, and ``c5_state`` blocks for the airborne + binary to compose the replay graph. + tlog_path: ArduPilot binary tlog the test consumes. + video_path: Flight video file the test consumes. + calibration_path: Camera calibration JSON (AZ-702 + factory-sheet for Derkachi). + signing_key_path: MAVLink signing-key file. Replay protocol + Invariant 11 — required even for the noop transport. + replay_binary: ``gps-denied-replay`` console-script path. + output_path: Where the airborne binary writes JSONL + estimator emissions. + report_dir: Directory the verdict markdown is written to. + effective_config_path: Where the cache_root-merged YAML is + written. The path is passed to the airborne binary via + ``--config``. + run_date_utc: ISO-8601 date for the report filename and + header. Defaults to today UTC. + runner: ``subprocess.run`` by default; tests inject a fake + that emits a synthetic JSONL output. + subprocess_env: Optional environment overlay for the + replay subprocess. ``None`` means ``os.environ``. + max_seconds: Hard wall-clock cap for the airborne replay + subprocess. The orchestrator times out the runner via + its ``timeout`` kwarg; an exceeded budget surfaces as + ``OrchestrationFailure(step=AIRBORNE_PIPELINE)``. + logger: Optional logger. Defaults to the module logger. + + Returns: + :class:`OrchestrationReport` on success. The verdict can + be PASS or FAIL — AC-2 mandates the report exists either + way. + + Raises: + OrchestrationFailure: Any of the 7 steps failed. The + ``step`` attribute names the failing step. + """ + + log = logger or _LOGGER + started = time.monotonic() + effective_run_date = run_date_utc or ( + datetime.datetime.now(datetime.timezone.utc).date().isoformat() + ) + + _validate_inputs( + base_config_path=base_config_path, + tlog_path=tlog_path, + video_path=video_path, + calibration_path=calibration_path, + signing_key_path=signing_key_path, + replay_binary=replay_binary, + report_dir=report_dir, + ) + + write_effective_replay_config( + base_config_path=base_config_path, + cache_root=populated_cache.cache_root, + output_path=effective_config_path, + ) + + replay_subprocess_seconds = _run_replay_subprocess( + replay_binary=replay_binary, + video_path=video_path, + tlog_path=tlog_path, + output_path=output_path, + calibration_path=calibration_path, + config_path=effective_config_path, + signing_key_path=signing_key_path, + max_seconds=max_seconds, + runner=runner, + env=subprocess_env, + logger=log, + ) + + emissions = _parse_jsonl(output_path) + + ground_truth = _load_ground_truth(tlog_path) + + distribution = _compute_distribution(emissions, ground_truth) + + context = ReportContext( + run_date_utc=effective_run_date, + tlog_path=tlog_path, + video_path=video_path, + calibration_acquisition_method=read_calibration_acquisition_method( + calibration_path + ), + clip_duration_s=( + ground_truth[-1].t_s - ground_truth[0].t_s + if ground_truth + else 0.0 + ), + emissions_count=len(emissions), + ) + verdict_passed = verdict_passes_ac3(distribution) + report_path = _render_and_write_report( + distribution=distribution, + context=context, + passed=verdict_passed, + report_dir=report_dir, + ) + + log.info( + "e2e_orchestrator: report written", + extra={ + "kind": "e2e_orchestrator.report_written", + "kv": { + "report_path": str(report_path), + "verdict_passed": verdict_passed, + "share_within_threshold_pct": ( + distribution.threshold_hit_share.get( + AC3_GATE_THRESHOLD_M, 0.0 + ) + * 100.0 + ), + "ac3_gate_pct": AC3_GATE_PCT, + "emissions_count": len(emissions), + "ground_truth_pairings": distribution.count, + }, + }, + ) + + wall_clock_s = max(0.0, time.monotonic() - started) + return OrchestrationReport( + verdict_passed=verdict_passed, + distribution=distribution, + report_path=report_path, + emissions_count=len(emissions), + wall_clock_s=wall_clock_s, + replay_subprocess_seconds=replay_subprocess_seconds, + ) + + +def _validate_inputs( + *, + base_config_path: Path, + tlog_path: Path, + video_path: Path, + calibration_path: Path, + signing_key_path: Path, + replay_binary: Path, + report_dir: Path, +) -> None: + """Fail fast on missing inputs (AC-5 — surface the failing step early).""" + file_inputs: tuple[tuple[str, Path], ...] = ( + ("base_config_path", base_config_path), + ("tlog_path", tlog_path), + ("video_path", video_path), + ("calibration_path", calibration_path), + ("signing_key_path", signing_key_path), + ("replay_binary", replay_binary), + ) + for label, path in file_inputs: + if not path.is_file(): + raise OrchestrationFailure( + OrchestratorStep.VALIDATE_INPUTS, + f"{label} is not a file: {path}", + ) + try: + report_dir.mkdir(parents=True, exist_ok=True) + except OSError as exc: + raise OrchestrationFailure( + OrchestratorStep.VALIDATE_INPUTS, + f"report_dir {report_dir} cannot be created: {exc!r}", + ) from exc + + +def _run_replay_subprocess( + *, + replay_binary: Path, + video_path: Path, + tlog_path: Path, + output_path: Path, + calibration_path: Path, + config_path: Path, + signing_key_path: Path, + max_seconds: float, + runner: Callable[..., subprocess.CompletedProcess[str]], + env: Mapping[str, str] | None, + logger: logging.Logger, +) -> float: + """Invoke gps-denied-replay with --auto-trim; return wall-clock seconds. + + Wraps :class:`subprocess.run` so unit tests can inject a fake + runner. ``--auto-trim`` is always enabled here — the + orchestrator owns the AZ-405 / AZ-698 sync path (AZ-840 step 1). + + Raises: + OrchestrationFailure (step=AIRBORNE_PIPELINE): Non-zero exit, + timeout, or runner-level OSError. + """ + + argv = [ + str(replay_binary), + "--video", + str(video_path), + "--tlog", + str(tlog_path), + "--output", + str(output_path), + "--camera-calibration", + str(calibration_path), + "--config", + str(config_path), + "--mavlink-signing-key", + str(signing_key_path), + "--pace", + "asap", + "--auto-trim", + ] + started = time.monotonic() + try: + completed = runner( + argv, + capture_output=True, + text=True, + timeout=max_seconds, + env=dict(env) if env is not None else None, + ) + except subprocess.TimeoutExpired as exc: + raise OrchestrationFailure( + OrchestratorStep.AIRBORNE_PIPELINE, + f"gps-denied-replay timed out after {max_seconds:.0f} s", + ) from exc + except OSError as exc: + raise OrchestrationFailure( + OrchestratorStep.AIRBORNE_PIPELINE, + f"cannot launch gps-denied-replay at {replay_binary}: {exc!r}", + ) from exc + + elapsed_s = max(0.0, time.monotonic() - started) + if completed.returncode != 0: + raise OrchestrationFailure( + OrchestratorStep.AIRBORNE_PIPELINE, + f"gps-denied-replay exited {completed.returncode}\n" + f"stdout:\n{completed.stdout}\nstderr:\n{completed.stderr}", + ) + logger.info( + "e2e_orchestrator: replay subprocess complete", + extra={ + "kind": "e2e_orchestrator.replay_subprocess", + "kv": { + "elapsed_s": elapsed_s, + "max_seconds": max_seconds, + }, + }, + ) + return elapsed_s + + +def _parse_jsonl(path: Path) -> list[dict[str, Any]]: + """Read one JSON record per non-blank line. + + Raises: + OrchestrationFailure (step=PARSE_EMISSIONS): Output file + missing, unreadable, has zero records, or contains a + malformed line. + """ + if not path.is_file(): + raise OrchestrationFailure( + OrchestratorStep.PARSE_EMISSIONS, + f"replay output JSONL not found: {path}", + ) + try: + text = path.read_text() + except OSError as exc: + raise OrchestrationFailure( + OrchestratorStep.PARSE_EMISSIONS, + f"replay output JSONL unreadable at {path}: {exc!r}", + ) from exc + rows: list[dict[str, Any]] = [] + for line_idx, line in enumerate(text.splitlines(), start=1): + if not line.strip(): + continue + try: + row = json.loads(line) + except json.JSONDecodeError as exc: + raise OrchestrationFailure( + OrchestratorStep.PARSE_EMISSIONS, + f"malformed JSON at line {line_idx} of {path}: {exc.msg}", + ) from exc + if not isinstance(row, dict): + raise OrchestrationFailure( + OrchestratorStep.PARSE_EMISSIONS, + f"line {line_idx} of {path} is not a JSON object: {row!r}", + ) + rows.append(row) + if not rows: + raise OrchestrationFailure( + OrchestratorStep.PARSE_EMISSIONS, + f"replay output JSONL at {path} has zero records — pipeline " + "produced no estimator emissions", + ) + return rows + + +def _load_ground_truth(tlog_path: Path) -> list[GroundTruthRow]: + """Extract WGS84 ground truth from the binary tlog. + + Raises: + OrchestrationFailure (step=LOAD_GROUND_TRUTH): Loader + error or empty record list. + """ + try: + series = load_tlog_ground_truth(tlog_path).records + except Exception as exc: + raise OrchestrationFailure( + OrchestratorStep.LOAD_GROUND_TRUTH, + f"load_tlog_ground_truth({tlog_path}) failed: {exc!r}", + ) from exc + rows: list[GroundTruthRow] = [ + GroundTruthRow( + t_s=fix.ts_ns / 1e9, + lat_deg=fix.lat_deg, + lon_deg=fix.lon_deg, + alt_m=fix.alt_m, + ) + for fix in series + ] + if not rows: + raise OrchestrationFailure( + OrchestratorStep.LOAD_GROUND_TRUTH, + f"tlog ground truth at {tlog_path} has zero rows", + ) + return rows + + +def _compute_distribution( + emissions: list[dict[str, Any]], + ground_truth: list[GroundTruthRow], +) -> HorizontalErrorDistribution: + """Compute the horizontal-error distribution. + + Raises: + OrchestrationFailure (step=COMPUTE_DISTRIBUTION): Helper + error or zero ground-truth pairings (every emission + fell outside the GT time window). + """ + try: + distribution = horizontal_error_distribution(emissions, ground_truth) + except Exception as exc: + raise OrchestrationFailure( + OrchestratorStep.COMPUTE_DISTRIBUTION, + f"horizontal_error_distribution failed: {exc!r}", + ) from exc + if distribution.count == 0: + raise OrchestrationFailure( + OrchestratorStep.COMPUTE_DISTRIBUTION, + "no emissions paired with ground truth — JSONL timestamps " + "outside the tlog GPS window?", + ) + return distribution + + +def _render_and_write_report( + *, + distribution: HorizontalErrorDistribution, + context: ReportContext, + passed: bool, + report_dir: Path, +) -> Path: + """Render the verdict markdown and write it to ``report_dir``. + + Raises: + OrchestrationFailure (step=RENDER_REPORT): Render or write + failure; ``report_dir`` was already created by + :func:`_validate_inputs`. + """ + try: + report_text = render_report(distribution, context, passed=passed) + except Exception as exc: + raise OrchestrationFailure( + OrchestratorStep.RENDER_REPORT, + f"render_report failed: {exc!r}", + ) from exc + report_path = ( + report_dir / f"real_flight_validation_{context.run_date_utc}.md" + ) + try: + report_path.write_text(report_text) + except OSError as exc: + raise OrchestrationFailure( + OrchestratorStep.RENDER_REPORT, + f"cannot write report at {report_path}: {exc!r}", + ) from exc + return report_path diff --git a/tests/e2e/replay/test_az835_e2e_real_flight.py b/tests/e2e/replay/test_az835_e2e_real_flight.py new file mode 100644 index 0000000..6ffcafb --- /dev/null +++ b/tests/e2e/replay/test_az835_e2e_real_flight.py @@ -0,0 +1,182 @@ +"""AZ-840 — E2E orchestrator integration test (AC-1 / AC-2 / AC-3 / AC-4 / AC-6). + +The Tier-2 entry point that closes Epic AZ-835's narrative: from a +``(tlog, video, calibration)`` triple, run the full 7-step pipeline +end-to-end on the Jetson harness without operator hand-curation +between steps. + +The test consumes: + +* :func:`tests.e2e.replay.conftest.operator_pre_flight_setup` — + the AZ-839 C3 fixture that owns steps 3-5 (route extraction + + satellite-provider seeding + FAISS index build) and yields a + :class:`PopulatedC6Cache` keyed off a freshly-mktemp'd + ``cache_root``. +* :func:`tests.e2e.replay.conftest.derkachi_replay_inputs` — the + shared session fixture that materialises the Derkachi tlog + + video + factory-sheet calibration + signing-key file. +* :func:`tests.e2e.replay._e2e_orchestrator.run_e2e_orchestration` + — the AC-1 driver that wires everything below the C3 fixture. + +The driver writes a fresh effective replay config per session +(merging the static operator YAML with the cache_root override), +invokes ``gps-denied-replay --auto-trim``, parses the JSONL +emissions, computes the horizontal-error distribution, and writes +the verdict markdown under ``_docs/06_metrics/`` (AC-2). + +Skip gates (in evaluation order): + +1. ``@pytest.mark.tier2`` — the per-suite Tier-2 plugin gates this + off on dev macOS (matches the AZ-839 / AZ-699 contract). +2. ``RUN_REPLAY_E2E`` not in ``{1, true, yes, on}``. +3. ``gps-denied-replay`` console-script not on ``PATH``. +4. Real video missing or placeholder-sized (mirrors AZ-699's gate). +5. ``operator_pre_flight_setup`` fixture itself skipped — the + downstream consumer inherits the SKIP automatically (pytest's + fixture-skip propagation). + +AC-7 (AZ-699 continues to pass) is satisfied by inspection: this +test does not modify ``test_derkachi_real_tlog.py`` and writes its +report to the same path (``real_flight_validation_.md``) but +in an idempotent way — both tests writing PASS or both writing +FAIL is the expected joint outcome on a given clip. +""" + +from __future__ import annotations + +import os +import shutil +import sys +from collections.abc import Iterator +from pathlib import Path + +import pytest + +from tests.e2e.replay._e2e_orchestrator import ( + OrchestrationReport, + run_e2e_orchestration, +) +from tests.e2e.replay._operator_pre_flight import PopulatedC6Cache +from tests.e2e.replay.conftest import DerkachiReplayInputs + + +def _repo_root() -> Path: + return Path(__file__).resolve().parents[3] + + +def _derkachi_dir() -> Path: + return _repo_root() / "_docs" / "00_problem" / "input_data" / "flight_derkachi" + + +_MIN_REAL_VIDEO_BYTES: int = 1_000_000 + + +def _replay_binary() -> Path | None: + """Return the absolute path to ``gps-denied-replay`` or ``None``. + + Same lookup order AZ-699 uses: PATH first, venv bin second. + """ + + binary = shutil.which("gps-denied-replay") + if binary is not None: + return Path(binary) + venv_bin = Path(sys.executable).parent / "gps-denied-replay" + if venv_bin.exists(): + return venv_bin + return None + + +def _orchestrator_skip_reason() -> str | None: + """Return a SKIP message when env / inputs preclude a Jetson run.""" + + if os.environ.get("RUN_REPLAY_E2E", "").strip().lower() not in { + "1", + "true", + "yes", + "on", + }: + return "AZ-840 e2e orchestrator gated by RUN_REPLAY_E2E=1" + if not os.environ.get("GPS_DENIED_OPERATOR_CONFIG_PATH", "").strip(): + return ( + "AZ-840 e2e orchestrator requires GPS_DENIED_OPERATOR_CONFIG_PATH " + "(same env var the C3 fixture consumes)" + ) + if _replay_binary() is None: + return "gps-denied-replay console-script not installed" + video = _derkachi_dir() / "flight_derkachi.mp4" + if not video.is_file(): + return f"Derkachi video missing: {video}" + if video.stat().st_size < _MIN_REAL_VIDEO_BYTES: + return ( + f"Derkachi video at {video} is only {video.stat().st_size} " + "bytes — placeholder, not a real recording" + ) + return None + + +@pytest.fixture +def az840_skip_gate() -> Iterator[None]: + """Skip-gate the orchestrator test before any heavy fixtures resolve.""" + + reason = _orchestrator_skip_reason() + if reason is not None: + pytest.skip(reason) + yield + + +@pytest.mark.tier2 +def test_az840_e2e_real_flight_orchestration( + az840_skip_gate: None, + operator_pre_flight_setup: PopulatedC6Cache, + derkachi_replay_inputs: DerkachiReplayInputs, + tmp_path: Path, +) -> None: + # Arrange — every input besides cache_root comes from the existing + # session fixtures so the same Tier-2 harness setup that powers + # AZ-699 + AZ-839 is exercised. + binary = _replay_binary() + assert binary is not None, "skip gate already verified the binary exists" + base_config_path = Path(os.environ["GPS_DENIED_OPERATOR_CONFIG_PATH"]) + output_path = tmp_path / "estimator_output.jsonl" + effective_config_path = tmp_path / "operator_config_effective.yaml" + report_dir = _repo_root() / "_docs" / "06_metrics" + + # Act + report = run_e2e_orchestration( + populated_cache=operator_pre_flight_setup, + base_config_path=base_config_path, + tlog_path=derkachi_replay_inputs.tlog_path, + video_path=derkachi_replay_inputs.video_path, + calibration_path=derkachi_replay_inputs.calibration_path, + signing_key_path=derkachi_replay_inputs.signing_key_path, + replay_binary=binary, + output_path=output_path, + report_dir=report_dir, + effective_config_path=effective_config_path, + ) + + # Assert AC-2 + AC-4 — report exists; full run within the 15-min budget. + assert isinstance(report, OrchestrationReport) + assert report.report_path.is_file() + body = report.report_path.read_text() + assert "## Horizontal error (metres)" in body + assert "## Threshold-hit share" in body + assert "Mean" in body + for threshold in (10, 25, 50, 100): + assert f"| {threshold} |" in body, ( + f"threshold {threshold} m row missing from report" + ) + assert report.replay_subprocess_seconds <= 900.0, ( + "AZ-840 AC-4: replay subprocess exceeded 15-min soft target" + ) + assert report.wall_clock_s >= report.replay_subprocess_seconds + assert report.distribution.count > 0, ( + "no emissions paired with ground truth — orchestration produced " + "data but every emission fell outside the tlog GPS window" + ) + + # Assert AC-3 — the effective config was written and points at the + # cache_root the C3 fixture supplied. + assert effective_config_path.is_file() + effective_text = effective_config_path.read_text() + assert str(operator_pre_flight_setup.cache_root) in effective_text diff --git a/tests/e2e/replay/test_e2e_orchestrator_unit.py b/tests/e2e/replay/test_e2e_orchestrator_unit.py new file mode 100644 index 0000000..72eda25 --- /dev/null +++ b/tests/e2e/replay/test_e2e_orchestrator_unit.py @@ -0,0 +1,671 @@ +"""Unit tests for the AZ-840 e2e orchestrator (AC-8). + +The end-to-end happy path is the Tier-2 integration test in +``test_az835_e2e_real_flight.py`` (AC-1 / AC-2). This module covers +the orchestration helper layer in isolation: + +* Param validation — every required path must exist before the + airborne subprocess is spawned (AC-5 fails LOUD). +* Effective-config merge — the ``c6_tile_cache.root_dir`` override + is written to YAML; the rest of the base config is preserved. +* Error propagation per step — every documented failure surfaces + as :class:`OrchestrationFailure` with the correct + :class:`OrchestratorStep` label. +* Happy path — when the runner returns success and the JSONL + + ground truth align, :class:`OrchestrationReport` carries a + written report path and an honest verdict (AC-2: report exists + PASS or FAIL). + +The tests inject a fake ``runner`` so no real +``gps-denied-replay`` subprocess is spawned. Real binary execution +is exercised on the Jetson harness via the AC-1 integration test. +""" + +from __future__ import annotations + +import json +import subprocess +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +import yaml + +from gps_denied_onboard.helpers.accuracy_report import ( + AC3_GATE_THRESHOLD_M, +) +from gps_denied_onboard.replay_input.tlog_route import RouteSpec + +from tests.e2e.replay._e2e_orchestrator import ( + OrchestrationFailure, + OrchestrationReport, + OrchestratorStep, + read_calibration_acquisition_method, + run_e2e_orchestration, + write_effective_replay_config, +) +from tests.e2e.replay._operator_pre_flight import PopulatedC6Cache + + +# ---------------------------------------------------------------------- +# Helpers + + +def _build_populated_cache(tmp_path: Path) -> PopulatedC6Cache: + """Construct a synthetic :class:`PopulatedC6Cache`. + + The orchestrator only consumes ``cache_root`` from the cache, + so the FAISS sidecar paths are placeholders. The route_spec is + a minimal one-waypoint instance — no AZ-836 invariants are + re-asserted by AZ-840. + """ + + cache_root = tmp_path / "cache_root" + cache_root.mkdir() + return PopulatedC6Cache( + cache_root=cache_root, + tile_store_path=cache_root / "tiles", + faiss_index_path=cache_root / "descriptor.index", + faiss_sidecar_sha256_path=cache_root / "descriptor.index.sha256", + faiss_sidecar_meta_path=cache_root / "descriptor.index.meta.json", + route_spec=RouteSpec( + waypoints=((50.10, 36.10),), + suggested_region_size_meters=500.0, + source_tlog=Path("test.tlog"), + source_segment=(0, 100), + total_distance_meters=0.0, + ), + tile_count=1, + elapsed_seconds=0.0, + ) + + +def _stage_inputs(tmp_path: Path) -> dict[str, Path]: + """Write touch-files for every input path the orchestrator validates. + + The base config YAML carries one stub block so the merge step + has a real document to overlay on. + """ + + base_config = tmp_path / "operator_config.yaml" + base_config.write_text( + yaml.safe_dump( + { + "mode": "replay", + "c6_tile_cache": { + "store_runtime": "postgres_filesystem", + "metadata_runtime": "postgres_filesystem", + "descriptor_index_runtime": "faiss_hnsw", + "root_dir": "/var/lib/gps-denied/tiles", + "faiss_index_path": "/some/static/path/descriptor.index", + }, + } + ) + ) + + tlog = tmp_path / "input.tlog" + tlog.write_bytes(b"\x00") + video = tmp_path / "input.mp4" + video.write_bytes(b"\x00") + calibration = tmp_path / "calibration.json" + calibration.write_text(json.dumps({"acquisition_method": "factory-sheet"})) + signing_key = tmp_path / "signing_key.bin" + signing_key.write_bytes(b"\x00" * 32) + binary = tmp_path / "gps-denied-replay" + binary.write_text("") + + return { + "base_config_path": base_config, + "tlog_path": tlog, + "video_path": video, + "calibration_path": calibration, + "signing_key_path": signing_key, + "replay_binary": binary, + } + + +def _ground_truth_tlog_loader( + monkeypatch: pytest.MonkeyPatch, + *, + times_s: tuple[float, ...] = (0.0, 1.0, 2.0), + lat_deg: float = 50.10, + lon_deg: float = 36.10, + alt_m: float = 100.0, +) -> None: + """Stub the orchestrator's ground-truth loader so unit tests skip MAVLink. + + The orchestrator imports ``load_tlog_ground_truth`` from + ``gps_denied_onboard.replay_input``; patching the symbol *as + bound on the orchestrator module* keeps the patch local to the + unit suite (no cross-test bleed). + """ + + fixes = [ + _StubGpsFix( + ts_ns=int(t * 1e9), + lat_deg=lat_deg, + lon_deg=lon_deg, + alt_m=alt_m, + ) + for t in times_s + ] + series = _StubGpsSeries(records=tuple(fixes)) + monkeypatch.setattr( + "tests.e2e.replay._e2e_orchestrator.load_tlog_ground_truth", + lambda *_args, **_kwargs: series, + ) + + +class _StubGpsFix: + """Mirrors the fields the orchestrator reads from each tlog row.""" + + __slots__ = ("ts_ns", "lat_deg", "lon_deg", "alt_m") + + def __init__( + self, *, ts_ns: int, lat_deg: float, lon_deg: float, alt_m: float + ) -> None: + self.ts_ns = ts_ns + self.lat_deg = lat_deg + self.lon_deg = lon_deg + self.alt_m = alt_m + + +class _StubGpsSeries: + """Drop-in replacement for :class:`TlogGroundTruth`.""" + + def __init__(self, *, records: tuple[_StubGpsFix, ...]) -> None: + self.records = records + + +def _build_runner_emitting( + output_path: Path, + *, + rows: list[dict[str, object]], + returncode: int = 0, + stdout: str = "", + stderr: str = "", +) -> "MagicMock": + """Return a fake ``subprocess.run`` that writes JSONL on call.""" + + def _run(argv, **kwargs): # type: ignore[no-untyped-def] + if rows: + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text( + "\n".join(json.dumps(row) for row in rows) + "\n" + ) + return subprocess.CompletedProcess( + args=argv, + returncode=returncode, + stdout=stdout, + stderr=stderr, + ) + + return MagicMock(side_effect=_run) + + +# ---------------------------------------------------------------------- +# write_effective_replay_config + + +def test_write_effective_replay_config_overlays_root_dir( + tmp_path: Path, +) -> None: + # Arrange + inputs = _stage_inputs(tmp_path) + cache_root = tmp_path / "cache" + cache_root.mkdir() + output_path = tmp_path / "effective.yaml" + + # Act + written_path = write_effective_replay_config( + base_config_path=inputs["base_config_path"], + cache_root=cache_root, + output_path=output_path, + ) + + # Assert + assert written_path == output_path + merged = yaml.safe_load(output_path.read_text()) + assert merged["c6_tile_cache"]["root_dir"] == str(cache_root) + assert merged["c6_tile_cache"]["faiss_index_path"] == "" + assert merged["mode"] == "replay" + assert ( + merged["c6_tile_cache"]["store_runtime"] == "postgres_filesystem" + ), "non-overridden c6_tile_cache fields must survive" + + +def test_write_effective_replay_config_creates_block_when_absent( + tmp_path: Path, +) -> None: + # Arrange + base = tmp_path / "operator.yaml" + base.write_text(yaml.safe_dump({"mode": "replay"})) + cache_root = tmp_path / "cache" + cache_root.mkdir() + + # Act + write_effective_replay_config( + base_config_path=base, + cache_root=cache_root, + output_path=tmp_path / "effective.yaml", + ) + + # Assert + merged = yaml.safe_load((tmp_path / "effective.yaml").read_text()) + assert merged["c6_tile_cache"]["root_dir"] == str(cache_root) + + +def test_write_effective_replay_config_malformed_yaml_fails( + tmp_path: Path, +) -> None: + # Arrange + base = tmp_path / "bad.yaml" + base.write_text(":\n : not yaml:") + cache_root = tmp_path / "cache" + cache_root.mkdir() + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + write_effective_replay_config( + base_config_path=base, + cache_root=cache_root, + output_path=tmp_path / "effective.yaml", + ) + assert exc_info.value.step is OrchestratorStep.WRITE_EFFECTIVE_CONFIG + + +def test_write_effective_replay_config_non_mapping_top_level_fails( + tmp_path: Path, +) -> None: + # Arrange + base = tmp_path / "bad.yaml" + base.write_text("- not a mapping\n") + cache_root = tmp_path / "cache" + cache_root.mkdir() + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + write_effective_replay_config( + base_config_path=base, + cache_root=cache_root, + output_path=tmp_path / "effective.yaml", + ) + assert exc_info.value.step is OrchestratorStep.WRITE_EFFECTIVE_CONFIG + + +# ---------------------------------------------------------------------- +# read_calibration_acquisition_method + + +def test_read_calibration_acquisition_method_returns_field_when_present( + tmp_path: Path, +) -> None: + # Arrange + path = tmp_path / "cal.json" + path.write_text(json.dumps({"acquisition_method": "factory-sheet"})) + + # Assert + assert read_calibration_acquisition_method(path) == "factory-sheet" + + +def test_read_calibration_acquisition_method_returns_unknown_on_missing( + tmp_path: Path, +) -> None: + # Arrange + path = tmp_path / "cal.json" + path.write_text(json.dumps({"some_other_field": True})) + + # Assert + assert read_calibration_acquisition_method(path) == "unknown" + + +def test_read_calibration_acquisition_method_returns_unknown_on_malformed( + tmp_path: Path, +) -> None: + # Arrange + path = tmp_path / "cal.json" + path.write_text("{not valid json") + + # Assert + assert read_calibration_acquisition_method(path) == "unknown" + + +# ---------------------------------------------------------------------- +# run_e2e_orchestration — param validation (AC-5) + + +def test_run_e2e_orchestration_missing_tlog_fails_loud( + tmp_path: Path, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + inputs["tlog_path"].unlink() + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + run_e2e_orchestration( + populated_cache=cache, + output_path=tmp_path / "out.jsonl", + report_dir=tmp_path / "metrics", + effective_config_path=tmp_path / "eff.yaml", + **inputs, # type: ignore[arg-type] + ) + assert exc_info.value.step is OrchestratorStep.VALIDATE_INPUTS + assert "tlog_path" in str(exc_info.value) + + +def test_run_e2e_orchestration_missing_binary_fails_loud( + tmp_path: Path, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + inputs["replay_binary"].unlink() + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + run_e2e_orchestration( + populated_cache=cache, + output_path=tmp_path / "out.jsonl", + report_dir=tmp_path / "metrics", + effective_config_path=tmp_path / "eff.yaml", + **inputs, # type: ignore[arg-type] + ) + assert exc_info.value.step is OrchestratorStep.VALIDATE_INPUTS + assert "replay_binary" in str(exc_info.value) + + +# ---------------------------------------------------------------------- +# run_e2e_orchestration — subprocess error propagation (AC-5) + + +def test_run_e2e_orchestration_replay_nonzero_exit_fails_loud( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + output_path = tmp_path / "out.jsonl" + runner = MagicMock( + return_value=subprocess.CompletedProcess( + args=[], + returncode=1, + stdout="", + stderr="boom", + ) + ) + _ground_truth_tlog_loader(monkeypatch) + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + run_e2e_orchestration( + populated_cache=cache, + output_path=output_path, + report_dir=tmp_path / "metrics", + effective_config_path=tmp_path / "eff.yaml", + runner=runner, + **inputs, # type: ignore[arg-type] + ) + assert exc_info.value.step is OrchestratorStep.AIRBORNE_PIPELINE + assert "exited 1" in str(exc_info.value) + assert "boom" in str(exc_info.value) + + +def test_run_e2e_orchestration_replay_timeout_fails_loud( + tmp_path: Path, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + + def _timeout(*_args, **_kwargs): + raise subprocess.TimeoutExpired(cmd=["replay"], timeout=0.1) + + runner = MagicMock(side_effect=_timeout) + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + run_e2e_orchestration( + populated_cache=cache, + output_path=tmp_path / "out.jsonl", + report_dir=tmp_path / "metrics", + effective_config_path=tmp_path / "eff.yaml", + runner=runner, + max_seconds=0.1, + **inputs, # type: ignore[arg-type] + ) + assert exc_info.value.step is OrchestratorStep.AIRBORNE_PIPELINE + assert "timed out" in str(exc_info.value) + + +def test_run_e2e_orchestration_replay_oserror_fails_loud( + tmp_path: Path, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + + def _oserror(*_args, **_kwargs): + raise OSError("permission denied") + + runner = MagicMock(side_effect=_oserror) + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + run_e2e_orchestration( + populated_cache=cache, + output_path=tmp_path / "out.jsonl", + report_dir=tmp_path / "metrics", + effective_config_path=tmp_path / "eff.yaml", + runner=runner, + **inputs, # type: ignore[arg-type] + ) + assert exc_info.value.step is OrchestratorStep.AIRBORNE_PIPELINE + assert "permission denied" in str(exc_info.value) + + +# ---------------------------------------------------------------------- +# run_e2e_orchestration — empty / malformed JSONL (AC-5) + + +def test_run_e2e_orchestration_empty_jsonl_fails_loud( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + output_path = tmp_path / "out.jsonl" + + def _runner(argv, **_kwargs): # type: ignore[no-untyped-def] + output_path.write_text("\n\n") # only blanks + return subprocess.CompletedProcess(args=argv, returncode=0, stdout="", stderr="") + + runner = MagicMock(side_effect=_runner) + _ground_truth_tlog_loader(monkeypatch) + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + run_e2e_orchestration( + populated_cache=cache, + output_path=output_path, + report_dir=tmp_path / "metrics", + effective_config_path=tmp_path / "eff.yaml", + runner=runner, + **inputs, # type: ignore[arg-type] + ) + assert exc_info.value.step is OrchestratorStep.PARSE_EMISSIONS + + +def test_run_e2e_orchestration_malformed_jsonl_fails_loud( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + output_path = tmp_path / "out.jsonl" + + def _runner(argv, **_kwargs): # type: ignore[no-untyped-def] + output_path.write_text('{"valid": true}\nnot a json line\n') + return subprocess.CompletedProcess(args=argv, returncode=0, stdout="", stderr="") + + runner = MagicMock(side_effect=_runner) + _ground_truth_tlog_loader(monkeypatch) + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + run_e2e_orchestration( + populated_cache=cache, + output_path=output_path, + report_dir=tmp_path / "metrics", + effective_config_path=tmp_path / "eff.yaml", + runner=runner, + **inputs, # type: ignore[arg-type] + ) + assert exc_info.value.step is OrchestratorStep.PARSE_EMISSIONS + + +# ---------------------------------------------------------------------- +# run_e2e_orchestration — ground truth loader failure (AC-5) + + +def test_run_e2e_orchestration_ground_truth_loader_failure_fails_loud( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + output_path = tmp_path / "out.jsonl" + runner = _build_runner_emitting( + output_path, + rows=[ + { + "emitted_at": int(0.5 * 1e9), + "position_wgs84": { + "lat_deg": 50.10, + "lon_deg": 36.10, + "alt_m": 100.0, + }, + } + ], + ) + + def _raise(*_args, **_kwargs): + raise ValueError("tlog corrupt") + + monkeypatch.setattr( + "tests.e2e.replay._e2e_orchestrator.load_tlog_ground_truth", + _raise, + ) + + # Act + Assert + with pytest.raises(OrchestrationFailure) as exc_info: + run_e2e_orchestration( + populated_cache=cache, + output_path=output_path, + report_dir=tmp_path / "metrics", + effective_config_path=tmp_path / "eff.yaml", + runner=runner, + **inputs, # type: ignore[arg-type] + ) + assert exc_info.value.step is OrchestratorStep.LOAD_GROUND_TRUTH + assert "tlog corrupt" in str(exc_info.value) + + +# ---------------------------------------------------------------------- +# run_e2e_orchestration — happy path (AC-1 / AC-2) + + +def test_run_e2e_orchestration_happy_path_writes_report( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + output_path = tmp_path / "out.jsonl" + report_dir = tmp_path / "metrics" + effective_config_path = tmp_path / "eff.yaml" + rows = [ + { + "emitted_at": int(0.5 * 1e9), + "position_wgs84": {"lat_deg": 50.10, "lon_deg": 36.10, "alt_m": 100.0}, + }, + { + "emitted_at": int(1.5 * 1e9), + "position_wgs84": {"lat_deg": 50.10, "lon_deg": 36.10, "alt_m": 100.0}, + }, + ] + runner = _build_runner_emitting(output_path, rows=rows) + _ground_truth_tlog_loader(monkeypatch) + + # Act + report = run_e2e_orchestration( + populated_cache=cache, + output_path=output_path, + report_dir=report_dir, + effective_config_path=effective_config_path, + runner=runner, + run_date_utc="2026-05-23", + **inputs, # type: ignore[arg-type] + ) + + # Assert + assert isinstance(report, OrchestrationReport) + assert report.report_path.is_file() + assert report.emissions_count == 2 + assert report.distribution.count == 2 + assert report.verdict_passed is True + body = report.report_path.read_text() + assert "## Horizontal error (metres)" in body + assert "## Threshold-hit share" in body + assert f"| {AC3_GATE_THRESHOLD_M:g} |" in body + runner.assert_called_once() + argv_passed = runner.call_args.args[0] + assert str(effective_config_path) in argv_passed + assert "--auto-trim" in argv_passed + merged = yaml.safe_load(effective_config_path.read_text()) + assert merged["c6_tile_cache"]["root_dir"] == str(cache.cache_root) + + +def test_run_e2e_orchestration_writes_report_even_on_fail_verdict( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — emissions are 1 km from ground truth, far above the 100 m gate. + cache = _build_populated_cache(tmp_path) + inputs = _stage_inputs(tmp_path) + output_path = tmp_path / "out.jsonl" + report_dir = tmp_path / "metrics" + rows = [ + { + "emitted_at": int(0.5 * 1e9), + "position_wgs84": {"lat_deg": 50.110, "lon_deg": 36.110, "alt_m": 100.0}, + }, + { + "emitted_at": int(1.5 * 1e9), + "position_wgs84": {"lat_deg": 50.110, "lon_deg": 36.110, "alt_m": 100.0}, + }, + ] + runner = _build_runner_emitting(output_path, rows=rows) + _ground_truth_tlog_loader(monkeypatch) + + # Act + report = run_e2e_orchestration( + populated_cache=cache, + output_path=output_path, + report_dir=report_dir, + effective_config_path=tmp_path / "eff.yaml", + runner=runner, + run_date_utc="2026-05-23", + **inputs, # type: ignore[arg-type] + ) + + # Assert — AC-2: report exists regardless of PASS/FAIL. + assert report.verdict_passed is False + assert report.report_path.is_file() + assert "FAIL" in report.report_path.read_text()