From 2c31cc094fddc9dace0f14774e159cc8429782e1 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Thu, 14 May 2026 20:04:37 +0300 Subject: [PATCH] =?UTF-8?q?[AZ-402]=20Replay=20=E2=80=94=20gps-denied-repl?= =?UTF-8?q?ay=20console-script=20+=20shared=20main(config)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the replay-mode CLI dispatcher per ADR-011 (replay-as- configuration): - src/gps_denied_onboard/cli/replay.py: argparse with all 6 required args (--video, --tlog, --output, --camera-calibration, --config, --mavlink-signing-key) plus --pace and --time-offset-ms; path validation, calibration JSON schema-validation, config mutation (mode='replay' + replay sub-block + signing-key hex on dev_static field), dispatch into runtime_root.main(config). - runtime_root.main() now accepts an optional Config (additive, backward-compat). Adds dedicated catch for ReplayInputAdapterError mapping to EXIT_FDR_OPEN_FAILURE (2) so the CLI's exit-code matrix holds end-to-end (AC-9 + epic AZ-265 AC-8). - Signing-key contents stored as hex; redacted in startup banner. - Top-level except logs full traceback via logger.exception + stderr print and exits 1. The CLI does NOT call compose_root directly — it builds a Config and hands it to the shared airborne main, which calls compose_root, which branches on config.mode (AZ-401 / replay protocol Invariant 11). Tests: 22 unit tests covering AC-1..AC-10 + extras (signing-key redaction, file-not-dir validation, dev_static propagation, unhandled exception traceback). Full regression: 2085 passed (+22) green; no new flaky tests. Co-authored-by: Cursor --- .../{todo => done}/AZ-402_replay_cli.md | 0 .../reviews/batch_62_review.md | 98 ++++ _docs/_autodev_state.md | 4 +- src/gps_denied_onboard/cli/replay.py | 312 +++++++++- .../runtime_root/__init__.py | 33 +- tests/unit/test_az402_replay_cli.py | 554 ++++++++++++++++++ 6 files changed, 990 insertions(+), 11 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-402_replay_cli.md (100%) create mode 100644 _docs/03_implementation/reviews/batch_62_review.md create mode 100644 tests/unit/test_az402_replay_cli.py diff --git a/_docs/02_tasks/todo/AZ-402_replay_cli.md b/_docs/02_tasks/done/AZ-402_replay_cli.md similarity index 100% rename from _docs/02_tasks/todo/AZ-402_replay_cli.md rename to _docs/02_tasks/done/AZ-402_replay_cli.md diff --git a/_docs/03_implementation/reviews/batch_62_review.md b/_docs/03_implementation/reviews/batch_62_review.md new file mode 100644 index 0000000..6b34377 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_62_review.md @@ -0,0 +1,98 @@ +# Code Review Report + +**Batch**: 62 (AZ-402) +**Date**: 2026-05-14 +**Verdict**: PASS + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| 1 | Low | Maintainability | src/gps_denied_onboard/runtime_root/__init__.py:621-660 | `runtime_root.main` now accepts an optional `Config` (additive refactor) — additional surface for callers, but backward-compat | +| 2 | Low | Style | src/gps_denied_onboard/cli/replay.py:235-256 | New `shared_main` test-injection kwarg added to `cli/replay:main` (third coordinator with this pattern; pre-flagged in batch 61 review F3) | + +### Finding Details + +**F1: `runtime_root.main` accepts optional `Config`** (Low / Maintainability) + +- Location: `src/gps_denied_onboard/runtime_root/__init__.py:621-660` (`def main(config: Config | None = None) -> int`) +- Description: AZ-402's task spec calls for the CLI to "dispatch into the same `main()` function the live `gps-denied-onboard` binary calls". Before this batch, `runtime_root.main()` was parameterless and loaded the `Config` itself from `os.environ`; the CLI couldn't pass a mutated config without either calling `compose_root` directly (FORBIDDEN per replay protocol Invariant 11) or rewriting `os.environ` (a fragile workaround). The smallest additive refactor is to accept `Config | None`: when `None` the live binary's behaviour is preserved (load from env), when supplied the CLI can hand in its mutated config. The function also gains a dedicated catch for `ReplayInputAdapterError` mapping to `EXIT_FDR_OPEN_FAILURE` (2) so the CLI's exit-code matrix (AC-9) holds end-to-end. +- Suggestion: keep — matches the spec's Excluded section ("This task assumes the shared main exists and is callable with `(config, ...)`"). The `RuntimeError` catch downstream still handles `ReplayInputAdapterError` if any caller bypasses the new branch — no regression for live mode. +- Task: AZ-402 + +**F2: `shared_main` test-injection kwarg in `cli/replay:main`** (Low / Style) + +- Location: `src/gps_denied_onboard/cli/replay.py:235-256` (`def main(argv, *, shared_main=None)`) +- Description: A second optional kwarg defaulting to `None` (resolved lazily to `runtime_root.main` to avoid a circular import + cheap module-load). When provided (tests), the fake replaces the dispatch target. This is the same precedent as `replay_input.tlog_video_adapter.ReplayInputAdapter.__init__`'s test factories (batch 60) and AZ-401's `compose_root(replay_components_factory=...)` (batch 61). Production callers (the console-script entry point, `if __name__ == "__main__"` block) pass none of them. +- Suggestion: keep. Three coordinators now share this shape; if a fourth adopts it, factor into a shared `_TestFactories` helper. +- Task: AZ-402 + +## Phase Summary + +### Phase 1 — Context Loading + +Read inputs: + +- `_docs/02_tasks/todo/AZ-402_replay_cli.md` +- `_docs/02_document/contracts/replay/replay_protocol.md` (v2.0.0 — CLI surface + Invariant 11) +- `_docs/02_document/architecture.md` (ADR-011) +- `_docs/02_document/module-layout.md` (Layer 5 — `cli/replay`) +- `_docs/02_document/epics.md` (E-DEMO-REPLAY ACs 1, 8, 11) + +### Phase 2 — Spec Compliance + +| AC | Verdict | Test | +|----|---------|------| +| AC-1 (all 6 required args parsed) | PASS | `test_ac1_all_required_args_parsed` | +| AC-2 (`--pace` default `asap`) | PASS | `test_ac2_pace_default_asap` | +| AC-3 (`--pace realtime`) | PASS | `test_ac3_pace_realtime` | +| AC-4 (`--time-offset-ms` forwarded) | PASS | `test_ac4_time_offset_forwarded` + `_none_when_absent` | +| AC-5 (`--mavlink-signing-key` required, argparse exit 2) | PASS | `test_ac5_missing_signing_key_exits_2` (+ `_missing_video_exits_2`) | +| AC-6 (malformed JSON → exit 1) | PASS | `test_ac6_malformed_calibration_exits_1` | +| AC-7 (missing intrinsics key → schema error) | PASS | `test_ac7_missing_intrinsics_key_rejected` (+ `_top_level_not_object`) | +| AC-8 (`config.mode == "replay"`) | PASS | `test_ac8_mode_set_to_replay` | +| AC-9 (exit-code pass-through 0 / 1 / 2; `ReplayInputAdapterError` → 2) | PASS | `test_ac9_exit_code_pass_through` (parametrized × 3) + `test_ac9_replay_input_adapter_error_maps_to_2` + `test_unhandled_exception_exits_1_with_traceback` | +| AC-10 (console script registered + `--help` works) | PASS | `test_ac10_console_script_registered_in_pyproject` + `test_ac10_console_script_runs_help` | + +22 unit tests in `tests/unit/test_az402_replay_cli.py`, all green. Plus extra coverage: signing-key redaction in banner, file-not-dir validation, signing-key propagation to `Config.fc.dev_static_signing_key`. + +Contract verification: `_docs/02_document/contracts/replay/replay_protocol.md` v2.0.0 §CLI Surface + Invariant 11 (signing key mandatory) match the implementation. `module-layout.md` §`shared/cli/replay` description matches the new file's purpose verbatim. + +### Phase 3 — Code Quality + +- **SOLID**: `cli/replay.py` is a single-responsibility CLI dispatcher. Each helper has one job: `_build_argparser`, `_validate_paths`, `_load_calibration_json`, `_build_replay_config`, `_print_startup_banner`. `main()` orchestrates. +- **Error handling**: explicit, layered. `ReplayCliError` for operator-input failures (chains `__cause__`); `ReplayInputAdapterError` caught and mapped to exit 2; `SystemExit` re-raised so argparse's `--help` / `--version` propagate; everything else logged with full traceback. No bare `except:` and no `except: pass`. +- **Naming**: clear (`_build_replay_config`, `_print_startup_banner`, `EXIT_SYNC_IMPOSSIBLE`). +- **Complexity**: longest function is `main()` at ~35 LOC (linear flow with explicit guards). No cyclomatic-complexity > 10. +- **Test quality**: every test asserts a meaningful behaviour. Parametrised exit-code test exercises 0 / 1 / 2 in one place. The signing-key redaction test asserts the path string itself is NOT in stderr (positive AND negative assertion). +- **Dead code**: none introduced. The previous `cli/replay.py` stub (5-line placeholder returning exit 2) is fully replaced. + +### Phase 4 — Security Quick-Scan + +- **Signing key redaction**: the startup banner replaces the `mavlink_signing_key` value with `""` before printing. Test enforces (`test_signing_key_redacted_in_startup_banner`). The path is sanitised; the file contents are also stored as hex in `Config.fc.dev_static_signing_key` and never logged. +- **No SQL / shell / `eval` / `exec` / `pickle`**: argparse, json.loads, Path operations only. +- **Calibration JSON**: parsed with `json.loads` (safe; no schema-injection vector). Schema validation rejects unexpected shapes at top level. +- **No hardcoded secrets**: the signing key is operator-supplied at runtime via a file path. + +### Phase 5 — Performance Scan + +- argparse setup + calibration JSON load + config-mutation are all constant-time on small inputs (calib.json is < 4 KB). The CLI's contribution to cold-start is measured in milliseconds, well within the AZ-402 NFR (`argparse + calibration loading p99 ≤ 100 ms`). +- The CLI calls `runtime_root.main` exactly once. No retry loop, no polling. + +### Phase 6 — Cross-Task Consistency + +- Only AZ-402 in this batch. The `runtime_root.main` refactor is **additive**: `main()` (no args) still works identically — proven by the 2085-test regression sweep with no failures introduced. +- The CLI's `Config` mutation uses `dataclasses.replace` with the existing `Config`, `RuntimeConfig`, `ReplayConfig`, `FcConfig` shapes added in batch 61 (AZ-401). No schema drift. +- The exit-code semantic on `ReplayInputAdapterError` (2) is consistent with `EXIT_FDR_OPEN_FAILURE` (2) — both mean "fatal startup hard-fail; operator action required". The shared code makes the airborne binary's exit surface predictable. + +### Phase 7 — Architecture Compliance + +- **Layer direction**: `cli/replay.py` is Layer 5 per `module-layout.md`. It imports from Layer 1 (`config`, `logging`), Layer 4 (`replay_input.errors`), and Layer 5 (`runtime_root.main`). All Layer-5 → Layer-1/4/5 — correct direction. +- **Public API respect**: the CLI imports `Config`, `ReplayConfig`, `load_config` from `gps_denied_onboard.config` (the package public surface), not deep submodules. It imports `ReplayInputAdapterError` from `gps_denied_onboard.replay_input` (also a package re-export). It imports `runtime_root.main` lazily inside `main()` to avoid circular imports. +- **No new cyclic deps**: `cli/replay.py` is leaf-imported only by the console-script entry point; the lazy `runtime_root.main` import inside `main()` further insulates it. +- **Duplicate symbols**: `EXIT_SUCCESS`, `EXIT_GENERIC_FAILURE`, `EXIT_SYNC_IMPOSSIBLE` are the CLI's own constants. They mirror `runtime_root`'s `EXIT_GENERIC_FAILURE` / `EXIT_FDR_OPEN_FAILURE` by value (1 / 2). The mirror is intentional: each layer documents its own exit semantics. If the values ever drift, the AC-9 parametrised test catches the regression. +- **Cross-cutting concerns**: calibration loading is duplicated in three places (live composition root via env, replay branch in `_replay_branch._load_camera_calibration`, and now CLI in `_load_calibration_json`). The CLI loader is a fail-fast SCHEMA gate, not a parsing layer (the actual `CameraCalibration` build happens inside `_replay_branch`). The duplication is small and intentional. Already pre-flagged in batch 61 review F4 as "factor when a third call site appears" — this is the third call site, but with a different responsibility (validation vs. construction); leaving as-is and re-evaluating after AZ-326 / live-CLI work touches the calibration loading path. + +## Verdict Reasoning + +Two **Low** findings, both deliberate design choices documented in the spec's Excluded / Constraints sections. No Critical, no High. Verdict: **PASS**. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index c53dce9..0e843e3 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -12,7 +12,7 @@ sub_step: retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 61 +last_completed_batch: 62 last_cumulative_review: batches_58-60 -current_batch: 62 +current_batch: 63 current_batch_tasks: "" diff --git a/src/gps_denied_onboard/cli/replay.py b/src/gps_denied_onboard/cli/replay.py index 840a852..09938b5 100644 --- a/src/gps_denied_onboard/cli/replay.py +++ b/src/gps_denied_onboard/cli/replay.py @@ -1,18 +1,316 @@ -"""`gps-denied-replay` CLI entrypoint — STUB. +"""``gps-denied-replay`` console-script — replay-mode dispatcher (AZ-402). -Owned by AZ-402. Bootstrap exposes a callable so `[project.scripts]` in -pyproject.toml resolves. +Per ADR-011 the replay CLI is **not** a standalone composition root. It +parses operator arguments, validates their files, mutates a loaded +:class:`~gps_denied_onboard.config.Config` to ``mode == "replay"``, and +dispatches into the same airborne ``main(config)`` entry point that the +live binary uses. The single composition root in +:mod:`gps_denied_onboard.runtime_root` branches on ``config.mode`` per +AZ-401. + +Exit codes (AC-9): + +* ``0`` — success. +* ``2`` — replay auto-sync impossible (epic AZ-265 AC-8) OR argparse + reported missing required argument (stdlib default). +* ``1`` — any other error (calibration malformed, file missing, + configuration invalid, unhandled exception). + +Implements ``_docs/02_document/contracts/replay/replay_protocol.md`` +v2.0.0 — CLI surface + Invariant 11 (signing key mandatory). """ from __future__ import annotations +import argparse +import json +import logging +import os import sys +import traceback +from collections.abc import Callable, Sequence +from dataclasses import replace +from pathlib import Path +from typing import Any, Final + +from gps_denied_onboard.config import ( + Config, + ReplayConfig, + load_config, +) -def main(argv: list[str] | None = None) -> int: - """Replay-CLI entrypoint.""" - print("gps-denied-replay is not yet implemented (AZ-402 / E-DEMO-REPLAY)", file=sys.stderr) - return 2 +__all__ = [ + "EXIT_GENERIC_FAILURE", + "EXIT_SUCCESS", + "EXIT_SYNC_IMPOSSIBLE", + "ReplayCliError", + "main", +] + + +EXIT_SUCCESS: Final[int] = 0 +EXIT_GENERIC_FAILURE: Final[int] = 1 +EXIT_SYNC_IMPOSSIBLE: Final[int] = 2 + +_REQUIRED_CALIBRATION_KEYS: Final[tuple[tuple[str, str], ...]] = ( + # (json key, error label per AC-7 phrasing) + ("intrinsics_3x3", "intrinsics"), + ("distortion", "distortion"), + ("body_to_camera_se3", "body_to_camera_se3"), +) + +_LOGGER = logging.getLogger("gps_denied_onboard.cli.replay") + + +class ReplayCliError(RuntimeError): + """Operator-facing CLI error (file missing, calibration malformed, etc.). + + Surfaces as exit code :data:`EXIT_GENERIC_FAILURE` with a + human-readable stderr message; the underlying cause (if any) is + chained via ``__cause__`` for debug logs. + """ + + +# ---------------------------------------------------------------------- +# Argument parsing + + +def _build_argparser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="gps-denied-replay", + description=( + "Replay-mode dispatcher for the airborne binary. Loads a " + "config, sets mode='replay', and runs the same composition " + "root the live binary uses (ADR-011)." + ), + ) + parser.add_argument("--video", required=True, type=Path, metavar="PATH") + parser.add_argument("--tlog", required=True, type=Path, metavar="PATH") + parser.add_argument("--output", required=True, type=Path, metavar="PATH") + parser.add_argument( + "--camera-calibration", + dest="camera_calibration", + required=True, + type=Path, + metavar="PATH", + ) + parser.add_argument( + "--config", dest="config_path", required=True, type=Path, metavar="PATH" + ) + parser.add_argument( + "--mavlink-signing-key", + dest="mavlink_signing_key", + required=True, + type=Path, + metavar="PATH", + help=( + "MAVLink signing key file (binary). Required even in replay " + "mode per replay protocol Invariant 11; the signing handshake " + "still runs on the encoder path." + ), + ) + parser.add_argument( + "--pace", + choices=("realtime", "asap"), + default="asap", + ) + parser.add_argument( + "--time-offset-ms", + dest="time_offset_ms", + type=int, + default=None, + help=( + "Manual offset between video and tlog clocks. When omitted, " + "ReplayInputAdapter (AZ-405) auto-detects via IMU take-off." + ), + ) + return parser + + +# ---------------------------------------------------------------------- +# File validation + + +def _validate_paths(args: argparse.Namespace) -> None: + """Fail fast if any required-file argument is missing or unreadable.""" + paths: tuple[tuple[str, Path], ...] = ( + ("video", args.video), + ("tlog", args.tlog), + ("camera-calibration", args.camera_calibration), + ("config", args.config_path), + ("mavlink-signing-key", args.mavlink_signing_key), + ) + for label, path in paths: + if not path.exists(): + raise ReplayCliError(f"--{label} path does not exist: {path}") + if not path.is_file(): + raise ReplayCliError(f"--{label} path is not a file: {path}") + + +def _load_calibration_json(path: Path) -> dict[str, Any]: + """Load + schema-validate the camera calibration JSON. + + The CLI validates here so a corrupt or schema-incomplete calibration + surfaces with a single clean error before the airborne main runs. + The calibration file is re-read inside ``compose_root``'s replay + branch from ``config.runtime.camera_calibration_path``; this loader + is only an early-fail gate. + """ + try: + text = path.read_text(encoding="utf-8") + except OSError as exc: + raise ReplayCliError( + f"camera-calibration file unreadable: {exc!r}" + ) from exc + try: + data = json.loads(text) + except json.JSONDecodeError as exc: + raise ReplayCliError( + f"camera-calibration JSON malformed: {exc.msg} at line {exc.lineno}" + ) from exc + if not isinstance(data, dict): + raise ReplayCliError( + "camera-calibration schema invalid: expected JSON object at top level" + ) + for key, label in _REQUIRED_CALIBRATION_KEYS: + if key not in data: + raise ReplayCliError( + f"camera-calibration schema invalid: missing {label!r}" + ) + return data + + +# ---------------------------------------------------------------------- +# Config mutation + + +def _build_replay_config( + args: argparse.Namespace, base_config: Config +) -> Config: + """Return a new :class:`Config` mutated to replay mode. + + Per ADR-011 the CLI's only job after loading is to set + ``config.mode = "replay"`` and populate ``config.replay`` from the + operator's CLI args. Composition logic stays in ``compose_root``. + """ + new_replay = ReplayConfig( + video_path=str(args.video), + tlog_path=str(args.tlog), + output_path=str(args.output), + pace=args.pace, + time_offset_ms=args.time_offset_ms, + target_fc_dialect=base_config.replay.target_fc_dialect, + auto_sync=base_config.replay.auto_sync, + ) + new_runtime = replace( + base_config.runtime, + camera_calibration_path=str(args.camera_calibration), + ) + # MAVLink signing key contents are stored as hex on the dev-static + # field. In replay the NoopMavlinkTransport never actually transmits, + # but `compose_root` still wires the signing-handshake path so the + # code path is symmetric with live (replay protocol Invariant 5). + try: + signing_key_bytes = args.mavlink_signing_key.read_bytes() + except OSError as exc: + raise ReplayCliError( + f"--mavlink-signing-key file unreadable: {exc!r}" + ) from exc + new_fc = replace( + base_config.fc, + signing_key_source="dev_static", + dev_static_signing_key=signing_key_bytes.hex(), + ) + return replace( + base_config, + mode="replay", + replay=new_replay, + runtime=new_runtime, + fc=new_fc, + ) + + +# ---------------------------------------------------------------------- +# Startup banner + + +def _print_startup_banner(args: argparse.Namespace) -> None: + """Print a sanitised one-line banner to stderr before logging boots. + + Logging is bootstrapped inside the airborne main; this banner gives + the operator a single line confirming what the CLI parsed before any + further output. + """ + sanitised = vars(args).copy() + sanitised["mavlink_signing_key"] = "" + print( + f"gps-denied-replay starting with args: {sanitised}", + file=sys.stderr, + flush=True, + ) + + +# ---------------------------------------------------------------------- +# Entrypoint + + +def main( + argv: Sequence[str] | None = None, + *, + shared_main: Callable[[Config], int] | None = None, +) -> int: + """``gps-denied-replay`` entrypoint. + + Parameters + ---------- + argv: + Argument vector to parse. ``None`` (default) means + ``sys.argv[1:]`` per stdlib argparse convention. + shared_main: + Test-injection seam. ``None`` resolves to + ``runtime_root.main`` lazily (avoids a circular import at module + load) so unit tests can swap in a fake without monkeypatching. + """ + parser = _build_argparser() + args = parser.parse_args(argv) + _print_startup_banner(args) + + if shared_main is None: + from gps_denied_onboard.runtime_root import main as shared_main + + # Local import to keep module-load cheap and avoid cycles with the + # replay_input package while also letting tests trigger AC-9 paths. + from gps_denied_onboard.replay_input import ReplayInputAdapterError + + try: + _validate_paths(args) + _load_calibration_json(args.camera_calibration) + base_config = load_config(env=os.environ, paths=(args.config_path,)) + config = _build_replay_config(args, base_config) + return int(shared_main(config)) + except ReplayCliError as exc: + print(f"gps-denied-replay: {exc}", file=sys.stderr, flush=True) + return EXIT_GENERIC_FAILURE + except ReplayInputAdapterError as exc: + # AC-8 hard-fail: auto-sync detected an offset that violates the + # match-window threshold, or the tlog is missing required fields. + # Operator must fix the inputs. + print( + f"gps-denied-replay: replay sync impossible: {exc}", + file=sys.stderr, + flush=True, + ) + return EXIT_SYNC_IMPOSSIBLE + except SystemExit: + # argparse / shared_main may raise SystemExit for clean shutdown + # paths (--help, --version, fatal abort). Re-raise so the + # process exit code is preserved verbatim. + raise + except Exception: + _LOGGER.exception("gps-denied-replay: unhandled exception") + traceback.print_exc(file=sys.stderr) + return EXIT_GENERIC_FAILURE if __name__ == "__main__": diff --git a/src/gps_denied_onboard/runtime_root/__init__.py b/src/gps_denied_onboard/runtime_root/__init__.py index 43ac501..a396511 100644 --- a/src/gps_denied_onboard/runtime_root/__init__.py +++ b/src/gps_denied_onboard/runtime_root/__init__.py @@ -618,10 +618,39 @@ def _read_flight_root(config: Config) -> str: return str(path) if path is not None else "" -def main() -> int: # pragma: no cover — guarded entrypoint +def main(config: Config | None = None) -> int: + """Shared airborne-binary entrypoint. + + Both the live ``gps-denied-onboard`` console-script and the replay + ``gps-denied-replay`` console-script (AZ-402) dispatch here. When + ``config`` is ``None`` the live binary's behaviour is preserved: load + from environment + default paths and compose. When a pre-built + ``Config`` is supplied (replay CLI), it is used directly so the CLI + can mutate ``config.mode = "replay"`` + populate the replay sub-block + before the airborne main runs. + + Per ADR-011 there is one composition root, ``compose_root``, which + branches on ``config.mode``. The CLI MUST NOT call ``compose_root`` + directly (replay protocol Invariant 11). + + Exit codes: + + * ``0`` — success. + * ``EXIT_FDR_OPEN_FAILURE`` (``2``) — operator-visible startup hard-fail: + FDR cannot open OR replay auto-sync impossible (AZ-405 AC-8 / epic + AZ-265 AC-8). Both share the code because both demand operator + action before the binary can run. + * ``EXIT_GENERIC_FAILURE`` (``1``) — any other error. + """ + from gps_denied_onboard.replay_input import ReplayInputAdapterError + try: - config = load_config(env=os.environ, paths=()) + if config is None: + config = load_config(env=os.environ, paths=()) compose_root(config) + except ReplayInputAdapterError as exc: + print(f"runtime_root: replay sync impossible: {exc}", file=sys.stderr) + return EXIT_FDR_OPEN_FAILURE except (ConfigurationError, StrategyNotLinkedError, RuntimeError) as exc: print(f"runtime_root: {exc}", file=sys.stderr) return EXIT_GENERIC_FAILURE diff --git a/tests/unit/test_az402_replay_cli.py b/tests/unit/test_az402_replay_cli.py new file mode 100644 index 0000000..c3a4b3a --- /dev/null +++ b/tests/unit/test_az402_replay_cli.py @@ -0,0 +1,554 @@ +"""AZ-402 — `gps-denied-replay` console-script unit tests. + +Covers AC-1..AC-10 of the AZ-402 task spec. AC-10 (console-script +registered) ships as both a static pyproject.toml assertion and a +subprocess smoke test gated on the package being installed. + +Implements ``_docs/02_document/contracts/replay/replay_protocol.md`` +v2.0.0 — CLI surface + Invariant 11. +""" + +from __future__ import annotations + +import json +import os +import subprocess +import sys +from collections.abc import Iterator +from pathlib import Path +from typing import Any +from unittest import mock + +import pytest + +import numpy as np + +from gps_denied_onboard.cli import replay as replay_cli +from gps_denied_onboard.cli.replay import ( + EXIT_GENERIC_FAILURE, + EXIT_SUCCESS, + EXIT_SYNC_IMPOSSIBLE, + ReplayCliError, +) +from gps_denied_onboard.config import Config +from gps_denied_onboard.replay_input import ReplayInputAdapterError + + +# ---------------------------------------------------------------------- +# Fixtures + + +@pytest.fixture +def _calib_payload() -> dict[str, Any]: + return { + "camera_id": "test-cam", + "intrinsics_3x3": np.eye(3).tolist(), + "distortion": [0.0, 0.0, 0.0, 0.0], + "body_to_camera_se3": np.eye(4).tolist(), + "acquisition_method": "operator", + "metadata": {}, + } + + +@pytest.fixture +def _required_files(tmp_path: Path, _calib_payload: dict[str, Any]) -> dict[str, Path]: + """Create real on-disk files for every required CLI arg.""" + video = tmp_path / "video.mp4" + video.write_bytes(b"\x00\x00\x00\x18ftypmp42") # placeholder + tlog = tmp_path / "flight.tlog" + tlog.write_bytes(b"\x00") + output = tmp_path / "out.jsonl" + calib = tmp_path / "calib.json" + calib.write_text(json.dumps(_calib_payload)) + config_yaml = tmp_path / "config.yaml" + config_yaml.write_text("# minimal — env supplies the rest\n") + signing_key = tmp_path / "key.bin" + signing_key.write_bytes(b"X" * 32) + return { + "video": video, + "tlog": tlog, + "output": output, + "camera_calibration": calib, + "config": config_yaml, + "mavlink_signing_key": signing_key, + } + + +@pytest.fixture +def _airborne_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Set the env vars `load_config` needs to validate successfully.""" + for name, value in ( + ("GPS_DENIED_FC_PROFILE", "ardupilot_plane"), + ("GPS_DENIED_TIER", "1"), + ("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"), + ("CAMERA_CALIBRATION_PATH", "/will/be/overridden/by/cli.json"), + ("LOG_LEVEL", "INFO"), + ("LOG_SINK", "console"), + ("INFERENCE_BACKEND", "pytorch_fp16"), + ("FDR_PATH", "/var/lib/gps-denied/fdr"), + ("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"), + ): + monkeypatch.setenv(name, value) + + +def _argv(files: dict[str, Path], **overrides: Any) -> list[str]: + """Build a CLI argv from the required-files fixture + overrides.""" + base = { + "--video": str(files["video"]), + "--tlog": str(files["tlog"]), + "--output": str(files["output"]), + "--camera-calibration": str(files["camera_calibration"]), + "--config": str(files["config"]), + "--mavlink-signing-key": str(files["mavlink_signing_key"]), + } + if "pace" in overrides: + base["--pace"] = overrides["pace"] + if "time_offset_ms" in overrides and overrides["time_offset_ms"] is not None: + base["--time-offset-ms"] = str(overrides["time_offset_ms"]) + argv: list[str] = [] + for k, v in base.items(): + argv.extend([k, v]) + return argv + + +# ---------------------------------------------------------------------- +# AC-1: All required args parsed + + +def test_ac1_all_required_args_parsed( + _required_files: dict[str, Path], _airborne_env: None +) -> None: + # Arrange + captured: dict[str, Config] = {} + + def fake_main(config: Config) -> int: + captured["config"] = config + return 0 + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=fake_main) + + # Assert + assert rc == EXIT_SUCCESS + cfg = captured["config"] + assert cfg.replay.video_path == str(_required_files["video"]) + assert cfg.replay.tlog_path == str(_required_files["tlog"]) + assert cfg.replay.output_path == str(_required_files["output"]) + assert cfg.runtime.camera_calibration_path == str( + _required_files["camera_calibration"] + ) + + +# ---------------------------------------------------------------------- +# AC-2: --pace default ASAP + + +def test_ac2_pace_default_asap( + _required_files: dict[str, Path], _airborne_env: None +) -> None: + # Arrange + captured: dict[str, Config] = {} + + def fake_main(config: Config) -> int: + captured["config"] = config + return 0 + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=fake_main) + + # Assert + assert rc == EXIT_SUCCESS + assert captured["config"].replay.pace == "asap" + + +# ---------------------------------------------------------------------- +# AC-3: --pace realtime + + +def test_ac3_pace_realtime( + _required_files: dict[str, Path], _airborne_env: None +) -> None: + # Arrange + captured: dict[str, Config] = {} + + def fake_main(config: Config) -> int: + captured["config"] = config + return 0 + + # Act + rc = replay_cli.main( + _argv(_required_files, pace="realtime"), shared_main=fake_main + ) + + # Assert + assert rc == EXIT_SUCCESS + assert captured["config"].replay.pace == "realtime" + + +# ---------------------------------------------------------------------- +# AC-4: --time-offset-ms forwarded (None when absent) + + +def test_ac4_time_offset_forwarded( + _required_files: dict[str, Path], _airborne_env: None +) -> None: + # Arrange + captured: dict[str, Config] = {} + + def fake_main(config: Config) -> int: + captured["config"] = config + return 0 + + # Act + rc = replay_cli.main( + _argv(_required_files, time_offset_ms=5000), shared_main=fake_main + ) + + # Assert + assert rc == EXIT_SUCCESS + assert captured["config"].replay.time_offset_ms == 5000 + + +def test_ac4_time_offset_none_when_absent( + _required_files: dict[str, Path], _airborne_env: None +) -> None: + # Arrange + captured: dict[str, Config] = {} + + def fake_main(config: Config) -> int: + captured["config"] = config + return 0 + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=fake_main) + + # Assert + assert rc == EXIT_SUCCESS + assert captured["config"].replay.time_offset_ms is None + + +# ---------------------------------------------------------------------- +# AC-5: --mavlink-signing-key required (argparse exit 2) + + +def test_ac5_missing_signing_key_exits_2( + _required_files: dict[str, Path], + capsys: pytest.CaptureFixture[str], +) -> None: + # Arrange — drop the signing-key arg pair from argv + argv = _argv(_required_files) + idx = argv.index("--mavlink-signing-key") + del argv[idx : idx + 2] + + # Act / Assert — argparse raises SystemExit(2) on missing required + with pytest.raises(SystemExit) as excinfo: + replay_cli.main(argv, shared_main=lambda _c: 0) + assert excinfo.value.code == 2 + err = capsys.readouterr().err + assert "--mavlink-signing-key" in err + + +def test_ac5_missing_video_exits_2(_required_files: dict[str, Path]) -> None: + # Arrange + argv = _argv(_required_files) + idx = argv.index("--video") + del argv[idx : idx + 2] + + # Act / Assert + with pytest.raises(SystemExit) as excinfo: + replay_cli.main(argv, shared_main=lambda _c: 0) + assert excinfo.value.code == 2 + + +# ---------------------------------------------------------------------- +# AC-6: Calibration loader rejects malformed JSON + + +def test_ac6_malformed_calibration_exits_1( + _required_files: dict[str, Path], + _airborne_env: None, + capsys: pytest.CaptureFixture[str], +) -> None: + # Arrange — corrupt the calib.json + _required_files["camera_calibration"].write_text("{ this is not json") + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=lambda _c: 0) + + # Assert + assert rc == EXIT_GENERIC_FAILURE + err = capsys.readouterr().err + assert "camera-calibration JSON malformed" in err + + +# ---------------------------------------------------------------------- +# AC-7: Calibration schema validation + + +def test_ac7_missing_intrinsics_key_rejected( + _required_files: dict[str, Path], + _airborne_env: None, + capsys: pytest.CaptureFixture[str], +) -> None: + # Arrange — write a calib.json missing the intrinsics key + _required_files["camera_calibration"].write_text( + json.dumps( + { + "distortion": [0.0, 0.0, 0.0, 0.0], + "body_to_camera_se3": np.eye(4).tolist(), + } + ) + ) + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=lambda _c: 0) + + # Assert + assert rc == EXIT_GENERIC_FAILURE + err = capsys.readouterr().err + assert "missing 'intrinsics'" in err + + +def test_ac7_top_level_not_object_rejected( + _required_files: dict[str, Path], + _airborne_env: None, + capsys: pytest.CaptureFixture[str], +) -> None: + # Arrange — JSON parses but top level is a list + _required_files["camera_calibration"].write_text(json.dumps([1, 2, 3])) + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=lambda _c: 0) + + # Assert + assert rc == EXIT_GENERIC_FAILURE + err = capsys.readouterr().err + assert "expected JSON object" in err + + +# ---------------------------------------------------------------------- +# AC-8: Mode set to replay + + +def test_ac8_mode_set_to_replay( + _required_files: dict[str, Path], _airborne_env: None +) -> None: + # Arrange + captured: dict[str, Config] = {} + + def fake_main(config: Config) -> int: + captured["config"] = config + return 0 + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=fake_main) + + # Assert + assert rc == EXIT_SUCCESS + assert captured["config"].mode == "replay" + # The CLI MUST NOT call compose_root directly (replay protocol + # Invariant 11). The shared_main fake here proves the dispatch + # boundary: if compose_root were called inside the CLI we would + # not reach the fake at all. + + +# ---------------------------------------------------------------------- +# AC-9: Exit-code pass-through + + +@pytest.mark.parametrize("rc", [0, 1, 2]) +def test_ac9_exit_code_pass_through( + _required_files: dict[str, Path], + _airborne_env: None, + rc: int, +) -> None: + # Arrange + def fake_main(_config: Config) -> int: + return rc + + # Act + actual = replay_cli.main(_argv(_required_files), shared_main=fake_main) + + # Assert + assert actual == rc + + +def test_ac9_replay_input_adapter_error_maps_to_2( + _required_files: dict[str, Path], + _airborne_env: None, + capsys: pytest.CaptureFixture[str], +) -> None: + """A `ReplayInputAdapterError` raised by shared_main → exit 2.""" + # Arrange + def fake_main(_config: Config) -> int: + raise ReplayInputAdapterError("auto-sync hard-fail: 42% match") + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=fake_main) + + # Assert + assert rc == EXIT_SYNC_IMPOSSIBLE + err = capsys.readouterr().err + assert "replay sync impossible" in err + assert "42% match" in err + + +def test_unhandled_exception_exits_1_with_traceback( + _required_files: dict[str, Path], + _airborne_env: None, + capsys: pytest.CaptureFixture[str], +) -> None: + # Arrange + def fake_main(_config: Config) -> int: + raise ValueError("boom: contrived crash inside compose_root") + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=fake_main) + + # Assert + assert rc == EXIT_GENERIC_FAILURE + err = capsys.readouterr().err + assert "Traceback" in err + assert "boom: contrived crash" in err + + +# ---------------------------------------------------------------------- +# Sanitised banner + + +def test_signing_key_redacted_in_startup_banner( + _required_files: dict[str, Path], + _airborne_env: None, + capsys: pytest.CaptureFixture[str], +) -> None: + # Act + replay_cli.main(_argv(_required_files), shared_main=lambda _c: 0) + + # Assert + err = capsys.readouterr().err + assert "" in err + assert str(_required_files["mavlink_signing_key"]) not in err + + +# ---------------------------------------------------------------------- +# AC-10: Console script registered + + +def test_ac10_console_script_registered_in_pyproject() -> None: + """Static check: pyproject.toml registers the console-script.""" + # Arrange + repo_root = Path(__file__).resolve().parents[2] + pyproject = repo_root / "pyproject.toml" + + # Act + text = pyproject.read_text(encoding="utf-8") + + # Assert + assert ( + 'gps-denied-replay = "gps_denied_onboard.cli.replay:main"' in text + ), "console script not registered under [project.scripts]" + + +def test_ac10_console_script_runs_help() -> None: + """Subprocess: the `gps-denied-replay` script runs `--help` cleanly. + + Skipped if the package is not installed (or the script is not on + PATH); the static assertion in the previous test suffices in that + environment. + """ + # Arrange + import shutil + + binary = shutil.which("gps-denied-replay") + if binary is None: + venv_bin = Path(sys.executable).parent / "gps-denied-replay" + if not venv_bin.exists(): + pytest.skip("gps-denied-replay console script not on PATH or in venv bin") + binary = str(venv_bin) + + # Act + result = subprocess.run( + [binary, "--help"], capture_output=True, text=True, timeout=15 + ) + + # Assert + assert result.returncode == 0, result.stderr + assert "gps-denied-replay" in result.stdout + # Required-arg surface check + for arg in ( + "--video", + "--tlog", + "--output", + "--camera-calibration", + "--config", + "--mavlink-signing-key", + ): + assert arg in result.stdout, f"{arg} missing from --help output" + + +# ---------------------------------------------------------------------- +# File validation + + +def test_missing_video_file_exits_1( + _required_files: dict[str, Path], + _airborne_env: None, + capsys: pytest.CaptureFixture[str], +) -> None: + # Arrange + _required_files["video"].unlink() + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=lambda _c: 0) + + # Assert + assert rc == EXIT_GENERIC_FAILURE + err = capsys.readouterr().err + assert "video" in err + assert "does not exist" in err + + +def test_signing_key_path_must_be_file_not_dir( + _required_files: dict[str, Path], + tmp_path: Path, + _airborne_env: None, + capsys: pytest.CaptureFixture[str], +) -> None: + # Arrange — pass a directory where a key file is expected + fake_dir = tmp_path / "not_a_key_file" + fake_dir.mkdir() + argv = _argv(_required_files) + idx = argv.index("--mavlink-signing-key") + argv[idx + 1] = str(fake_dir) + + # Act + rc = replay_cli.main(argv, shared_main=lambda _c: 0) + + # Assert + assert rc == EXIT_GENERIC_FAILURE + err = capsys.readouterr().err + assert "is not a file" in err + + +# ---------------------------------------------------------------------- +# Signing key plumbing + + +def test_signing_key_propagates_to_dev_static_field( + _required_files: dict[str, Path], _airborne_env: None +) -> None: + # Arrange + captured: dict[str, Config] = {} + + def fake_main(config: Config) -> int: + captured["config"] = config + return 0 + + # Act + rc = replay_cli.main(_argv(_required_files), shared_main=fake_main) + + # Assert + assert rc == EXIT_SUCCESS + expected_hex = (b"X" * 32).hex() + assert captured["config"].fc.dev_static_signing_key == expected_hex + assert captured["config"].fc.signing_key_source == "dev_static"