mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 09:41:13 +00:00
a3dc8e2636
ReportContext.tlog_path was widened in-place by AZ-959 to mean "ground-truth source path" without renaming, leaving the rendered report's "- Tlog: <csv_path>" line cosmetically wrong for CSV runs. This rename + label fix completes the cleanup. - helpers/accuracy_report.py: field rename + docstring update + rendered line now reads "- Ground truth: <path>" for both inputs. - replay_api/app.py: kwarg updated, AZ-959 inline comment about the overload removed (field name now carries the intent). - tests/unit/test_az699_report_writer.py: fixture updated, two new symmetric tests assert the canonical label for tlog AND csv inputs (AC-2). - tests/e2e/replay/_e2e_orchestrator.py + test_derkachi_real_tlog.py: kwarg updated. Tests: 62/62 green across test_az699_report_writer.py, test_az700_render_map.py, test_az701_replay_api.py. CSV-replay-input chain (AZ-959 + AZ-960 + AZ-961) is now coherent: - API accepts (video, csv) with XOR validation - /static/example-csv serves the AZ-896 reference doc - Runner dispatches --imu vs --tlog argv - Report renders with source-agnostic "Ground truth:" label - Map renders from CSV truth via gps-denied-render-map dispatch Bookkeeping: AZ-961 spec moved todo/ → done/, dep-table preamble eighth bump documents the rename + summarises the cycle-4 CSV chain, state.md records batch 7 complete. Co-authored-by: Cursor <cursoragent@cursor.com>
656 lines
22 KiB
Python
656 lines
22 KiB
Python
"""E2E orchestrator for the AZ-835 7-step pipeline (AZ-840 / Epic AZ-835 C4).
|
|
|
|
Wraps the AZ-699 verdict-report writing path with the AZ-839 C3
|
|
fixture's `PopulatedC6Cache` so a single Tier-2 test can run from
|
|
``(tlog, video, calibration)`` to a horizontal-error report without
|
|
operator hand-curation between steps. The 7-step Epic narrative
|
|
(``_docs/02_tasks/todo/AZ-840_e2e_orchestrator_test.md``):
|
|
|
|
1. Active flight cut + tlog/video sync — handled by ``gps-denied-replay``
|
|
``--auto-trim`` (AZ-405 / AZ-698) inside the airborne binary.
|
|
2. On-fly frame + IMU extraction — same binary's per-frame loop.
|
|
3. Auto-create route — done by the C3 fixture
|
|
(``operator_pre_flight_setup`` calls ``extract_route_from_tlog``).
|
|
4. POST route to satellite-provider — C3 fixture (AZ-838
|
|
``SatelliteProviderRouteClient.seed_route``).
|
|
5. Build FAISS index — C3 fixture (AZ-322 ``DescriptorBatcher``).
|
|
6. Run gps-denied airborne pipeline — this module's
|
|
``_run_replay_subprocess`` invokes ``gps-denied-replay`` against
|
|
the populated cache.
|
|
7. Get GPS fixes, check vs tlog GPS — this module's
|
|
``_load_ground_truth`` + ``horizontal_error_distribution`` +
|
|
``render_report`` writes the verdict markdown.
|
|
|
|
The C3 fixture mutates ``c6_tile_cache.root_dir`` to point at a
|
|
``tmp_path_factory.mktemp`` value (AZ-839 batch 108b). The static
|
|
operator YAML at ``GPS_DENIED_OPERATOR_CONFIG_PATH`` cannot know
|
|
that path. ``write_effective_replay_config`` reads the static YAML,
|
|
overlays the ``c6_tile_cache.root_dir`` override, writes the merged
|
|
result to a tmp file, and returns the path the airborne binary
|
|
will load via ``--config``. This keeps a single source of truth
|
|
for the cache_root override across the in-memory C3 fixture path
|
|
and the subprocess airborne path.
|
|
|
|
Public surface — re-exported from this module:
|
|
|
|
* :class:`OrchestratorStep` — failure-step labels per AC-5 ("fails
|
|
LOUD with a clear error pointing at the failing step").
|
|
* :class:`OrchestrationFailure` — wraps the underlying exception
|
|
with the step that produced it.
|
|
* :class:`OrchestrationReport` — return value of
|
|
:func:`run_e2e_orchestration` (verdict, distribution, paths,
|
|
wall-clock measurements per AC-4).
|
|
* :func:`write_effective_replay_config` — small helper for the
|
|
config merge step.
|
|
* :func:`run_e2e_orchestration` — the AC-1 entry point.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
import time
|
|
from collections.abc import Callable, Mapping
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
from gps_denied_onboard.helpers.accuracy_report import (
|
|
AC3_GATE_PCT,
|
|
AC3_GATE_THRESHOLD_M,
|
|
ReportContext,
|
|
render_report,
|
|
verdict_passes_ac3,
|
|
)
|
|
from gps_denied_onboard.helpers.gps_compare import (
|
|
GroundTruthRow,
|
|
HorizontalErrorDistribution,
|
|
horizontal_error_distribution,
|
|
)
|
|
from gps_denied_onboard.replay_input import load_tlog_ground_truth
|
|
|
|
from tests.e2e.replay._operator_pre_flight import PopulatedC6Cache
|
|
|
|
__all__ = [
|
|
"OrchestrationFailure",
|
|
"OrchestrationReport",
|
|
"OrchestratorStep",
|
|
"read_calibration_acquisition_method",
|
|
"run_e2e_orchestration",
|
|
"write_effective_replay_config",
|
|
]
|
|
|
|
|
|
# Fallback wall-clock budget for the airborne replay subprocess, in
# seconds. Fifteen minutes is the AZ-840 AC-4 soft target for the Derkachi
# clip. It is only a default: the integration test may pass its own
# ``max_seconds``, and unit tests bypass the real binary altogether by
# injecting a fake ``runner`` callable.
_DEFAULT_MAX_SECONDS: float = 15 * 60.0

# Logger used whenever the caller does not inject one.
_LOGGER = logging.getLogger("tests.e2e.replay.e2e_orchestrator")
|
|
|
|
|
|
class OrchestratorStep(str, Enum):
    """Stable identifiers for the orchestration stages that can fail.

    Each :class:`OrchestrationFailure` carries one of these members so
    that a failure reaching the test surface always names the stage that
    produced it (AC-5). Members are ``str`` subclasses and their values
    are part of the contract: test assertions and log readers match on
    the literal strings, so they must not be renamed.
    """

    # Members are listed in the order the orchestrator executes the stages.
    VALIDATE_INPUTS = "validate_inputs"
    WRITE_EFFECTIVE_CONFIG = "write_effective_config"
    AIRBORNE_PIPELINE = "airborne_pipeline"
    PARSE_EMISSIONS = "parse_emissions"
    LOAD_GROUND_TRUTH = "load_ground_truth"
    COMPUTE_DISTRIBUTION = "compute_distribution"
    RENDER_REPORT = "render_report"
|
|
|
|
|
|
class OrchestrationFailure(RuntimeError):
    """Error raised when one orchestration stage fails (AC-5).

    The failing stage is available programmatically as :attr:`step`.
    It is also written into the exception text as a ``[<step value>]``
    prefix, so anyone reading a plain log line can see where the run
    broke without having the exception object at hand.
    """

    def __init__(self, step: OrchestratorStep, message: str) -> None:
        """Build the failure for ``step`` with a human-readable ``message``."""
        prefixed_message = f"[{step.value}] {message}"
        super().__init__(prefixed_message)
        self.step = step
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
class OrchestrationReport:
    """Immutable result handed back by :func:`run_e2e_orchestration`.

    Attributes:
        verdict_passed: Whether the run cleared the AZ-696 epic AC-3
            gate, i.e. at least ``AC3_GATE_PCT`` percent of pairings
            fall within ``AC3_GATE_THRESHOLD_M`` metres.
        distribution: The horizontal-error distribution the verdict
            was derived from.
        report_path: Location of the markdown report written inside
            ``report_dir``.
        emissions_count: Number of estimator-output records read from
            the replay JSONL.
        wall_clock_s: Seconds spent inside the orchestration call.
            The C3 fixture setup is not included; the figure spans
            steps 1-2-6-7.
        replay_subprocess_seconds: Seconds spent in the airborne
            replay subprocess alone. It is measured inside the
            orchestration window, so it never exceeds
            ``wall_clock_s``.
    """

    verdict_passed: bool
    distribution: HorizontalErrorDistribution
    report_path: Path
    emissions_count: int
    wall_clock_s: float
    replay_subprocess_seconds: float
|
|
|
|
|
|
def read_calibration_acquisition_method(calibration_path: Path) -> str:
    """Return the AZ-702 ``acquisition_method`` field, or ``"unknown"``.

    Mirrors ``test_derkachi_real_tlog._read_calibration_acquisition_method``
    so the AZ-840 verdict report can name the calibration provenance
    (AZ-699 AC-3). This is a best-effort lookup used only to label the
    report, so it never raises on bad calibration content.

    Args:
        calibration_path: Path to the camera calibration JSON file.

    Returns:
        The non-empty string stored under ``"acquisition_method"``.
        ``"unknown"`` is returned when the file cannot be read or
        decoded, is not valid JSON, does not hold a JSON object at the
        top level, or the field is missing, empty, or not a string.
    """
    try:
        data = json.loads(calibration_path.read_text())
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for bytes that are not valid text.
        return "unknown"
    if not isinstance(data, dict):
        # Valid JSON that is a list/number/string has no ``.get``; treat
        # it like any other calibration file without provenance.
        return "unknown"
    method = data.get("acquisition_method")
    if isinstance(method, str) and method:
        return method
    return "unknown"
|
|
|
|
|
|
def write_effective_replay_config(
    *,
    base_config_path: Path,
    cache_root: Path,
    output_path: Path,
) -> Path:
    """Merge the cache_root override into the static operator YAML.

    Reads ``base_config_path`` as YAML, points
    ``c6_tile_cache.root_dir`` at ``cache_root``, blanks
    ``c6_tile_cache.faiss_index_path`` (so the consumer falls back to
    ``<cache_root>/descriptor.index``), and writes the merged document
    to ``output_path`` as YAML, creating parent directories as needed.

    The merge is field-level: other keys inside ``c6_tile_cache`` and
    every other top-level block are carried over unchanged. A
    ``c6_tile_cache`` value that is missing or not a mapping is replaced
    by a fresh block holding only the two overridden keys. An empty base
    document is treated as an empty mapping.

    Args:
        base_config_path: Static operator config YAML to start from.
        cache_root: Directory to store as ``c6_tile_cache.root_dir``.
        output_path: Destination of the merged YAML.

    Returns:
        The ``output_path`` argument, for ergonomic chaining.

    Raises:
        OrchestrationFailure (step=WRITE_EFFECTIVE_CONFIG): Base YAML
            unreadable, undecodable, malformed, or not a top-level
            mapping; or the merged file cannot be written.
    """
    try:
        base_text = base_config_path.read_text()
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is not an OSError; without it a binary or
        # mis-encoded config would escape without a step label.
        raise OrchestrationFailure(
            OrchestratorStep.WRITE_EFFECTIVE_CONFIG,
            f"cannot read base config at {base_config_path}: {exc!r}",
        ) from exc

    try:
        base_data = yaml.safe_load(base_text)
    except yaml.YAMLError as exc:
        raise OrchestrationFailure(
            OrchestratorStep.WRITE_EFFECTIVE_CONFIG,
            f"base config YAML at {base_config_path} is malformed: {exc!r}",
        ) from exc
    if base_data is None:
        # Only an empty document defaults to ``{}``. Other falsy values
        # (``[]``, ``0``, ``false``, ``""``) are real non-mapping
        # documents and must hit the type check below.
        base_data = {}
    if not isinstance(base_data, dict):
        raise OrchestrationFailure(
            OrchestratorStep.WRITE_EFFECTIVE_CONFIG,
            f"base config YAML at {base_config_path} must be a mapping; "
            f"got {type(base_data).__name__}",
        )

    c6_block_raw = base_data.get("c6_tile_cache")
    # Copy so the override never mutates an aliased/anchored YAML node.
    c6_block = dict(c6_block_raw) if isinstance(c6_block_raw, dict) else {}
    c6_block["root_dir"] = str(cache_root)
    c6_block["faiss_index_path"] = ""
    base_data["c6_tile_cache"] = c6_block

    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(
            yaml.safe_dump(base_data, sort_keys=True, default_flow_style=False)
        )
    except OSError as exc:
        raise OrchestrationFailure(
            OrchestratorStep.WRITE_EFFECTIVE_CONFIG,
            f"cannot write effective config at {output_path}: {exc!r}",
        ) from exc
    return output_path
|
|
|
|
|
|
def run_e2e_orchestration(
    *,
    populated_cache: PopulatedC6Cache,
    base_config_path: Path,
    tlog_path: Path,
    video_path: Path,
    calibration_path: Path,
    signing_key_path: Path,
    replay_binary: Path,
    output_path: Path,
    report_dir: Path,
    effective_config_path: Path,
    run_date_utc: str | None = None,
    runner: Callable[..., subprocess.CompletedProcess[str]] = subprocess.run,
    subprocess_env: Mapping[str, str] | None = None,
    max_seconds: float = _DEFAULT_MAX_SECONDS,
    logger: logging.Logger | None = None,
) -> OrchestrationReport:
    """Drive AZ-835 steps 1-7 on top of the AZ-839 populated cache.

    ``populated_cache`` (the AZ-839 C3 fixture) has already done steps
    3-5. This function performs the rest: it validates the inputs,
    writes the effective config, runs the airborne replay subprocess
    (steps 1-2-6), then parses the emissions, loads the tlog ground
    truth, computes the horizontal-error distribution and writes the
    verdict report (step 7). The cache root reaches the airborne binary
    through :func:`write_effective_replay_config`, so the binary reads
    the same FAISS index the fixture wrote (AC-3).

    Args:
        populated_cache: C3 fixture output (AZ-839). Only its
            ``cache_root`` is read here.
        base_config_path: Static operator config YAML
            (``GPS_DENIED_OPERATOR_CONFIG_PATH``). Must register the
            ``c6_tile_cache``, ``c10_provisioning``, ``c2_vpr``,
            ``c4_pose`` and ``c5_state`` blocks for the airborne binary
            to compose the replay graph.
        tlog_path: ArduPilot binary tlog; fed to the replay binary and
            used as the ground-truth source.
        video_path: Flight video file.
        calibration_path: Camera calibration JSON (AZ-702 factory
            sheet for Derkachi).
        signing_key_path: MAVLink signing-key file. Replay protocol
            Invariant 11 requires it even for the noop transport.
        replay_binary: ``gps-denied-replay`` console-script path.
        output_path: JSONL file the airborne binary writes estimator
            emissions to.
        report_dir: Directory that receives the verdict markdown.
        effective_config_path: Destination of the cache_root-merged
            YAML, passed to the airborne binary via ``--config``.
        run_date_utc: ISO-8601 date for the report filename and
            header. A missing or empty value means today in UTC.
        runner: ``subprocess.run`` by default; tests inject a fake
            that emits a synthetic JSONL output.
        subprocess_env: Optional environment mapping forwarded as the
            runner's ``env`` keyword. It is passed as-is rather than
            merged with the current environment. ``None`` is forwarded
            as ``None``, which for ``subprocess.run`` means the child
            inherits the parent environment.
        max_seconds: Hard wall-clock cap for the replay subprocess,
            forwarded as the runner's ``timeout``. Exceeding it
            surfaces as ``OrchestrationFailure(step=AIRBORNE_PIPELINE)``.
        logger: Optional logger. Defaults to the module logger.

    Returns:
        :class:`OrchestrationReport` on success. The verdict may be
        PASS or FAIL — AC-2 mandates that the report exists either way.

    Raises:
        OrchestrationFailure: One of the steps failed. The ``step``
            attribute names the failing step.
    """
    active_logger = logger if logger else _LOGGER
    run_started = time.monotonic()
    if run_date_utc:
        report_date = run_date_utc
    else:
        report_date = (
            datetime.datetime.now(datetime.timezone.utc).date().isoformat()
        )

    # Fail fast before any expensive work.
    _validate_inputs(
        base_config_path=base_config_path,
        tlog_path=tlog_path,
        video_path=video_path,
        calibration_path=calibration_path,
        signing_key_path=signing_key_path,
        replay_binary=replay_binary,
        report_dir=report_dir,
    )

    # Point the airborne binary at the cache the C3 fixture populated.
    write_effective_replay_config(
        base_config_path=base_config_path,
        cache_root=populated_cache.cache_root,
        output_path=effective_config_path,
    )

    subprocess_elapsed_s = _run_replay_subprocess(
        replay_binary=replay_binary,
        video_path=video_path,
        tlog_path=tlog_path,
        output_path=output_path,
        calibration_path=calibration_path,
        config_path=effective_config_path,
        signing_key_path=signing_key_path,
        max_seconds=max_seconds,
        runner=runner,
        env=subprocess_env,
        logger=active_logger,
    )

    emissions = _parse_jsonl(output_path)
    ground_truth = _load_ground_truth(tlog_path)
    distribution = _compute_distribution(emissions, ground_truth)

    acquisition_method = read_calibration_acquisition_method(calibration_path)
    if ground_truth:
        # Span between the first and last ground-truth samples.
        clip_duration_s = ground_truth[-1].t_s - ground_truth[0].t_s
    else:
        clip_duration_s = 0.0
    total_emissions = len(emissions)

    context = ReportContext(
        run_date_utc=report_date,
        ground_truth_path=tlog_path,
        video_path=video_path,
        calibration_acquisition_method=acquisition_method,
        clip_duration_s=clip_duration_s,
        emissions_count=total_emissions,
    )
    verdict_passed = verdict_passes_ac3(distribution)
    report_path = _render_and_write_report(
        distribution=distribution,
        context=context,
        passed=verdict_passed,
        report_dir=report_dir,
    )

    # Share of pairings inside the AC-3 threshold, as a percentage.
    share_fraction = distribution.threshold_hit_share.get(
        AC3_GATE_THRESHOLD_M, 0.0
    )
    log_fields = {
        "report_path": str(report_path),
        "verdict_passed": verdict_passed,
        "share_within_threshold_pct": share_fraction * 100.0,
        "ac3_gate_pct": AC3_GATE_PCT,
        "emissions_count": total_emissions,
        "ground_truth_pairings": distribution.count,
    }
    active_logger.info(
        "e2e_orchestrator: report written",
        extra={"kind": "e2e_orchestrator.report_written", "kv": log_fields},
    )

    total_elapsed_s = max(0.0, time.monotonic() - run_started)
    return OrchestrationReport(
        verdict_passed=verdict_passed,
        distribution=distribution,
        report_path=report_path,
        emissions_count=total_emissions,
        wall_clock_s=total_elapsed_s,
        replay_subprocess_seconds=subprocess_elapsed_s,
    )
|
|
|
|
|
|
def _validate_inputs(
    *,
    base_config_path: Path,
    tlog_path: Path,
    video_path: Path,
    calibration_path: Path,
    signing_key_path: Path,
    replay_binary: Path,
    report_dir: Path,
) -> None:
    """Check every input up front so failures name their step early (AC-5).

    Each of the six path arguments other than ``report_dir`` must refer
    to an existing regular file; they are checked in signature order and
    the first offender is reported. ``report_dir`` is then created
    (including parents) if it does not exist yet.

    Raises:
        OrchestrationFailure (step=VALIDATE_INPUTS): An input is not a
            file, or ``report_dir`` cannot be created.
    """
    # Insertion order of the dict fixes which problem is reported first.
    required_files: dict[str, Path] = {
        "base_config_path": base_config_path,
        "tlog_path": tlog_path,
        "video_path": video_path,
        "calibration_path": calibration_path,
        "signing_key_path": signing_key_path,
        "replay_binary": replay_binary,
    }
    for argument_name, candidate in required_files.items():
        if candidate.is_file():
            continue
        raise OrchestrationFailure(
            OrchestratorStep.VALIDATE_INPUTS,
            f"{argument_name} is not a file: {candidate}",
        )

    try:
        report_dir.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        raise OrchestrationFailure(
            OrchestratorStep.VALIDATE_INPUTS,
            f"report_dir {report_dir} cannot be created: {exc!r}",
        ) from exc
|
|
|
|
|
|
def _run_replay_subprocess(
|
|
*,
|
|
replay_binary: Path,
|
|
video_path: Path,
|
|
tlog_path: Path,
|
|
output_path: Path,
|
|
calibration_path: Path,
|
|
config_path: Path,
|
|
signing_key_path: Path,
|
|
max_seconds: float,
|
|
runner: Callable[..., subprocess.CompletedProcess[str]],
|
|
env: Mapping[str, str] | None,
|
|
logger: logging.Logger,
|
|
) -> float:
|
|
"""Invoke gps-denied-replay with --auto-trim; return wall-clock seconds.
|
|
|
|
Wraps :class:`subprocess.run` so unit tests can inject a fake
|
|
runner. ``--auto-trim`` is always enabled here — the
|
|
orchestrator owns the AZ-405 / AZ-698 sync path (AZ-840 step 1).
|
|
|
|
Raises:
|
|
OrchestrationFailure (step=AIRBORNE_PIPELINE): Non-zero exit,
|
|
timeout, or runner-level OSError.
|
|
"""
|
|
|
|
argv = [
|
|
str(replay_binary),
|
|
"--video",
|
|
str(video_path),
|
|
"--tlog",
|
|
str(tlog_path),
|
|
"--output",
|
|
str(output_path),
|
|
"--camera-calibration",
|
|
str(calibration_path),
|
|
"--config",
|
|
str(config_path),
|
|
"--mavlink-signing-key",
|
|
str(signing_key_path),
|
|
"--pace",
|
|
"asap",
|
|
"--auto-trim",
|
|
]
|
|
started = time.monotonic()
|
|
try:
|
|
completed = runner(
|
|
argv,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=max_seconds,
|
|
env=dict(env) if env is not None else None,
|
|
)
|
|
except subprocess.TimeoutExpired as exc:
|
|
raise OrchestrationFailure(
|
|
OrchestratorStep.AIRBORNE_PIPELINE,
|
|
f"gps-denied-replay timed out after {max_seconds:.0f} s",
|
|
) from exc
|
|
except OSError as exc:
|
|
raise OrchestrationFailure(
|
|
OrchestratorStep.AIRBORNE_PIPELINE,
|
|
f"cannot launch gps-denied-replay at {replay_binary}: {exc!r}",
|
|
) from exc
|
|
|
|
elapsed_s = max(0.0, time.monotonic() - started)
|
|
if completed.returncode != 0:
|
|
raise OrchestrationFailure(
|
|
OrchestratorStep.AIRBORNE_PIPELINE,
|
|
f"gps-denied-replay exited {completed.returncode}\n"
|
|
f"stdout:\n{completed.stdout}\nstderr:\n{completed.stderr}",
|
|
)
|
|
logger.info(
|
|
"e2e_orchestrator: replay subprocess complete",
|
|
extra={
|
|
"kind": "e2e_orchestrator.replay_subprocess",
|
|
"kv": {
|
|
"elapsed_s": elapsed_s,
|
|
"max_seconds": max_seconds,
|
|
},
|
|
},
|
|
)
|
|
return elapsed_s
|
|
|
|
|
|
def _parse_jsonl(path: Path) -> list[dict[str, Any]]:
    """Read one JSON object per non-blank line of ``path``.

    Records are separated by ``"\\n"`` only (text-mode reading already
    normalises ``\\r\\n``). Lines that are empty or whitespace-only are
    skipped; reported line numbers are 1-based and count skipped lines.

    Args:
        path: JSONL file written by the replay binary.

    Returns:
        The parsed records, in file order. Never empty.

    Raises:
        OrchestrationFailure (step=PARSE_EMISSIONS): Output file
            missing, unreadable or undecodable, has zero records, or
            contains a line that is malformed JSON or not a JSON object.
    """
    if not path.is_file():
        raise OrchestrationFailure(
            OrchestratorStep.PARSE_EMISSIONS,
            f"replay output JSONL not found: {path}",
        )
    try:
        text = path.read_text()
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is not an OSError; catch it so corrupt
        # output still fails with the PARSE_EMISSIONS step label.
        raise OrchestrationFailure(
            OrchestratorStep.PARSE_EMISSIONS,
            f"replay output JSONL unreadable at {path}: {exc!r}",
        ) from exc

    rows: list[dict[str, Any]] = []
    # Do not use str.splitlines(): it also breaks on U+2028, U+2029, NEL
    # and similar characters, which JSON permits unescaped inside string
    # values, so a valid record would be cut in half.
    for line_idx, raw_line in enumerate(text.split("\n"), start=1):
        line = raw_line.strip()
        if not line:
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError as exc:
            raise OrchestrationFailure(
                OrchestratorStep.PARSE_EMISSIONS,
                f"malformed JSON at line {line_idx} of {path}: {exc.msg}",
            ) from exc
        if not isinstance(row, dict):
            raise OrchestrationFailure(
                OrchestratorStep.PARSE_EMISSIONS,
                f"line {line_idx} of {path} is not a JSON object: {row!r}",
            )
        rows.append(row)

    if not rows:
        raise OrchestrationFailure(
            OrchestratorStep.PARSE_EMISSIONS,
            f"replay output JSONL at {path} has zero records — pipeline "
            "produced no estimator emissions",
        )
    return rows
|
|
|
|
|
|
def _load_ground_truth(tlog_path: Path) -> list[GroundTruthRow]:
    """Load the tlog's ground-truth fixes as :class:`GroundTruthRow` items.

    Each record returned by ``load_tlog_ground_truth`` is converted
    one-to-one; its ``ts_ns`` timestamp is divided by 1e9 to fill the
    row's ``t_s`` field (presumably nanoseconds to seconds, going by the
    field names).

    Raises:
        OrchestrationFailure (step=LOAD_GROUND_TRUTH): Loader
            error or empty record list.
    """
    # Any loader exception is relabelled with this step, hence the broad catch.
    try:
        loaded = load_tlog_ground_truth(tlog_path)
        fixes = loaded.records
    except Exception as exc:
        raise OrchestrationFailure(
            OrchestratorStep.LOAD_GROUND_TRUTH,
            f"load_tlog_ground_truth({tlog_path}) failed: {exc!r}",
        ) from exc

    truth_rows: list[GroundTruthRow] = [
        GroundTruthRow(
            t_s=sample.ts_ns / 1e9,
            lat_deg=sample.lat_deg,
            lon_deg=sample.lon_deg,
            alt_m=sample.alt_m,
        )
        for sample in fixes
    ]
    if truth_rows:
        return truth_rows
    raise OrchestrationFailure(
        OrchestratorStep.LOAD_GROUND_TRUTH,
        f"tlog ground truth at {tlog_path} has zero rows",
    )
|
|
|
|
|
|
def _compute_distribution(
    emissions: list[dict[str, Any]],
    ground_truth: list[GroundTruthRow],
) -> HorizontalErrorDistribution:
    """Pair emissions with ground truth and return the error distribution.

    Delegates to ``horizontal_error_distribution`` and rejects a result
    whose ``count`` is zero, since a report built on no pairings would
    be meaningless.

    Raises:
        OrchestrationFailure (step=COMPUTE_DISTRIBUTION): Helper
            error or zero ground-truth pairings (every emission
            fell outside the GT time window).
    """
    # Any helper exception is relabelled with this step, hence the broad catch.
    try:
        result = horizontal_error_distribution(emissions, ground_truth)
    except Exception as exc:
        raise OrchestrationFailure(
            OrchestratorStep.COMPUTE_DISTRIBUTION,
            f"horizontal_error_distribution failed: {exc!r}",
        ) from exc

    if result.count != 0:
        return result
    raise OrchestrationFailure(
        OrchestratorStep.COMPUTE_DISTRIBUTION,
        "no emissions paired with ground truth — JSONL timestamps "
        "outside the tlog GPS window?",
    )
|
|
|
|
|
|
def _render_and_write_report(
    *,
    distribution: HorizontalErrorDistribution,
    context: ReportContext,
    passed: bool,
    report_dir: Path,
) -> Path:
    """Render the verdict markdown and save it inside ``report_dir``.

    The file is named ``real_flight_validation_<run_date_utc>.md``,
    using the date carried by ``context``. ``report_dir`` is expected to
    exist already; :func:`_validate_inputs` creates it.

    Returns:
        Path of the report file that was written.

    Raises:
        OrchestrationFailure (step=RENDER_REPORT): Render or write
            failure.
    """
    # Any renderer exception is relabelled with this step, hence the broad catch.
    try:
        markdown = render_report(distribution, context, passed=passed)
    except Exception as exc:
        raise OrchestrationFailure(
            OrchestratorStep.RENDER_REPORT,
            f"render_report failed: {exc!r}",
        ) from exc

    file_name = f"real_flight_validation_{context.run_date_utc}.md"
    destination = report_dir / file_name
    try:
        destination.write_text(markdown)
    except OSError as exc:
        raise OrchestrationFailure(
            OrchestratorStep.RENDER_REPORT,
            f"cannot write report at {destination}: {exc!r}",
        ) from exc
    return destination
|