"""``gps-denied-replay`` console-script — replay-mode dispatcher (AZ-402).

Per ADR-011 the replay CLI is **not** a standalone composition root. It
parses operator arguments, validates their files, mutates a loaded
:class:`~gps_denied_onboard.config.Config` to ``mode == "replay"``, and
dispatches into the same airborne ``main(config)`` entry point that the
live binary uses. The single composition root in
:mod:`gps_denied_onboard.runtime_root` branches on ``config.mode`` per
AZ-401.

Exit codes (AC-9):

* ``0`` — success.
* ``2`` — replay auto-sync impossible (epic AZ-265 AC-8) OR argparse
  reported missing required argument (stdlib default).
* ``1`` — any other error (calibration malformed, file missing,
  configuration invalid, unhandled exception).

Implements ``_docs/02_document/contracts/replay/replay_protocol.md`` v2.0.0
— CLI surface + Invariant 11 (signing key mandatory).
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import traceback
from collections.abc import Callable, Sequence
from dataclasses import replace
from pathlib import Path
from typing import Any, Final

from gps_denied_onboard.config import (
    Config,
    ReplayConfig,
    load_config,
)

__all__ = [
    "EXIT_GENERIC_FAILURE",
    "EXIT_SUCCESS",
    "EXIT_SYNC_IMPOSSIBLE",
    "ReplayCliError",
    "main",
]

EXIT_SUCCESS: Final[int] = 0
EXIT_GENERIC_FAILURE: Final[int] = 1
EXIT_SYNC_IMPOSSIBLE: Final[int] = 2

_REQUIRED_CALIBRATION_KEYS: Final[tuple[tuple[str, str], ...]] = (
    # (json key, error label per AC-7 phrasing)
    ("intrinsics_3x3", "intrinsics"),
    ("distortion", "distortion"),
    ("body_to_camera_se3", "body_to_camera_se3"),
)

_LOGGER = logging.getLogger("gps_denied_onboard.cli.replay")


class ReplayCliError(RuntimeError):
    """Operator-facing CLI error (file missing, calibration malformed, etc.).

    Surfaces as exit code :data:`EXIT_GENERIC_FAILURE` with a human-readable
    stderr message; the underlying cause (if any) is chained via
    ``__cause__`` for debug logs.
    """


# ----------------------------------------------------------------------
# Argument parsing


def _build_argparser() -> argparse.ArgumentParser:
    """Build the ``gps-denied-replay`` argument parser.

    All file arguments are parsed to :class:`~pathlib.Path`. Every option
    except ``--pace`` (default ``"asap"``) and ``--time-offset-ms``
    (default ``None``) is required, so argparse itself rejects an
    incomplete command line.
    """
    parser = argparse.ArgumentParser(
        prog="gps-denied-replay",
        description=(
            "Replay-mode dispatcher for the airborne binary. Loads a "
            "config, sets mode='replay', and runs the same composition "
            "root the live binary uses (ADR-011)."
        ),
    )
    parser.add_argument("--video", required=True, type=Path, metavar="PATH")
    parser.add_argument("--tlog", required=True, type=Path, metavar="PATH")
    parser.add_argument("--output", required=True, type=Path, metavar="PATH")
    parser.add_argument(
        "--camera-calibration",
        dest="camera_calibration",
        required=True,
        type=Path,
        metavar="PATH",
    )
    parser.add_argument(
        "--config", dest="config_path", required=True, type=Path, metavar="PATH"
    )
    parser.add_argument(
        "--mavlink-signing-key",
        dest="mavlink_signing_key",
        required=True,
        type=Path,
        metavar="PATH",
        help=(
            "MAVLink signing key file (binary). Required even in replay "
            "mode per replay protocol Invariant 11; the signing handshake "
            "still runs on the encoder path."
        ),
    )
    parser.add_argument(
        "--pace",
        choices=("realtime", "asap"),
        default="asap",
    )
    parser.add_argument(
        "--time-offset-ms",
        dest="time_offset_ms",
        type=int,
        default=None,
        help=(
            "Manual offset between video and tlog clocks. When omitted, "
            "ReplayInputAdapter (AZ-405) auto-detects via IMU take-off."
        ),
    )
    return parser


# ----------------------------------------------------------------------
# File validation


def _validate_paths(args: argparse.Namespace) -> None:
    """Fail fast if any input-file argument is missing or not a file.

    Checks ``--video``, ``--tlog``, ``--camera-calibration``, ``--config``
    and ``--mavlink-signing-key`` in that order; ``--output`` is not
    checked here.

    Raises
    ------
    ReplayCliError
        If a path does not exist, is not a regular file, or cannot be
        inspected at all (e.g. permission denied on a parent directory).
    """
    paths: tuple[tuple[str, Path], ...] = (
        ("video", args.video),
        ("tlog", args.tlog),
        ("camera-calibration", args.camera_calibration),
        ("config", args.config_path),
        ("mavlink-signing-key", args.mavlink_signing_key),
    )
    for label, path in paths:
        try:
            exists = path.exists()
            is_file = exists and path.is_file()
        except OSError as exc:
            # Path.exists()/is_file() only swallow "not found"-style errors;
            # anything else (e.g. EACCES) would otherwise escape as a raw
            # traceback instead of a clean operator-facing message.
            raise ReplayCliError(
                f"--{label} path is not accessible: {path} ({exc!r})"
            ) from exc
        if not exists:
            raise ReplayCliError(f"--{label} path does not exist: {path}")
        if not is_file:
            raise ReplayCliError(f"--{label} path is not a file: {path}")


def _load_calibration_json(path: Path) -> dict[str, Any]:
    """Load + schema-validate the camera calibration JSON.

    The CLI validates here so a corrupt or schema-incomplete calibration
    surfaces with a single clean error before the airborne main runs.
    The calibration file is re-read inside ``compose_root``'s replay
    branch from ``config.runtime.camera_calibration_path``; this loader
    is only an early-fail gate.

    Returns
    -------
    dict[str, Any]
        The parsed top-level JSON object.

    Raises
    ------
    ReplayCliError
        If the file cannot be read, is not valid UTF-8, is not valid
        JSON, is not a JSON object at top level, or lacks one of the
        keys in ``_REQUIRED_CALIBRATION_KEYS``.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except OSError as exc:
        raise ReplayCliError(
            f"camera-calibration file unreadable: {exc!r}"
        ) from exc
    except UnicodeDecodeError as exc:
        # UnicodeDecodeError is a ValueError, not an OSError: without this
        # clause a binary/mis-encoded file bypasses the clean-error path.
        raise ReplayCliError(
            "camera-calibration JSON malformed: not valid UTF-8 "
            f"({exc.reason} at byte {exc.start})"
        ) from exc
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ReplayCliError(
            f"camera-calibration JSON malformed: {exc.msg} at line {exc.lineno}"
        ) from exc
    if not isinstance(data, dict):
        raise ReplayCliError(
            "camera-calibration schema invalid: expected JSON object at top level"
        )
    for key, label in _REQUIRED_CALIBRATION_KEYS:
        if key not in data:
            raise ReplayCliError(
                f"camera-calibration schema invalid: missing {label!r}"
            )
    return data


# ----------------------------------------------------------------------
# Config mutation


def _build_replay_config(
    args: argparse.Namespace, base_config: Config
) -> Config:
    """Return a new :class:`Config` mutated to replay mode.

    Per ADR-011 the CLI's only job after loading is to set
    ``config.mode = "replay"`` and populate ``config.replay`` from the
    operator's CLI args. Composition logic stays in ``compose_root``.

    ``target_fc_dialect`` and ``auto_sync`` are carried over from
    ``base_config.replay``; every other replay field comes from ``args``.
    ``base_config`` itself is not modified — all changes go through
    :func:`dataclasses.replace`.

    Raises
    ------
    ReplayCliError
        If the MAVLink signing key file cannot be read.
    """
    new_replay = ReplayConfig(
        video_path=str(args.video),
        tlog_path=str(args.tlog),
        output_path=str(args.output),
        pace=args.pace,
        time_offset_ms=args.time_offset_ms,
        target_fc_dialect=base_config.replay.target_fc_dialect,
        auto_sync=base_config.replay.auto_sync,
    )
    new_runtime = replace(
        base_config.runtime,
        camera_calibration_path=str(args.camera_calibration),
    )
    # MAVLink signing key contents are stored as hex on the dev-static
    # field. In replay the NoopMavlinkTransport never actually transmits,
    # but `compose_root` still wires the signing-handshake path so the
    # code path is symmetric with live (replay protocol Invariant 5).
    try:
        signing_key_bytes = args.mavlink_signing_key.read_bytes()
    except OSError as exc:
        raise ReplayCliError(
            f"--mavlink-signing-key file unreadable: {exc!r}"
        ) from exc
    new_fc = replace(
        base_config.fc,
        signing_key_source="dev_static",
        dev_static_signing_key=signing_key_bytes.hex(),
    )
    return replace(
        base_config,
        mode="replay",
        replay=new_replay,
        runtime=new_runtime,
        fc=new_fc,
    )


# ----------------------------------------------------------------------
# Startup banner


def _print_startup_banner(args: argparse.Namespace) -> None:
    """Print a sanitised one-line banner to stderr before logging boots.

    Logging is bootstrapped inside the airborne main; this banner gives
    the operator a single line confirming what the CLI parsed before any
    further output.
    """
    # Work on a copy so the caller's namespace keeps the real key path.
    sanitised = vars(args).copy()
    # Blank the signing-key entry so it never appears in the banner.
    sanitised["mavlink_signing_key"] = ""
    print(
        f"gps-denied-replay starting with args: {sanitised}",
        file=sys.stderr,
        flush=True,
    )


# ----------------------------------------------------------------------
# Entrypoint


def main(
    argv: Sequence[str] | None = None,
    *,
    shared_main: Callable[[Config], int] | None = None,
) -> int:
    """``gps-denied-replay`` entrypoint.

    Parameters
    ----------
    argv:
        Argument vector to parse. ``None`` (default) means
        ``sys.argv[1:]`` per stdlib argparse convention.
    shared_main:
        Test-injection seam. ``None`` resolves to ``runtime_root.main``
        lazily (avoids a circular import at module load) so unit tests
        can swap in a fake without monkeypatching.

    Returns
    -------
    int
        The value returned by ``shared_main`` on success,
        :data:`EXIT_SYNC_IMPOSSIBLE` when a ``ReplayInputAdapterError``
        is raised, or :data:`EXIT_GENERIC_FAILURE` for a
        :class:`ReplayCliError` or any other ``Exception``.

    Raises
    ------
    SystemExit
        Propagated unchanged from argparse or ``shared_main``.
    """
    parser = _build_argparser()
    args = parser.parse_args(argv)
    _print_startup_banner(args)

    if shared_main is None:
        from gps_denied_onboard.runtime_root import main as shared_main

    # Local import to keep module-load cheap and avoid cycles with the
    # replay_input package while also letting tests trigger AC-9 paths.
    from gps_denied_onboard.replay_input import ReplayInputAdapterError

    try:
        _validate_paths(args)
        _load_calibration_json(args.camera_calibration)
        base_config = load_config(env=os.environ, paths=(args.config_path,))
        config = _build_replay_config(args, base_config)
        return int(shared_main(config))
    except ReplayCliError as exc:
        print(f"gps-denied-replay: {exc}", file=sys.stderr, flush=True)
        return EXIT_GENERIC_FAILURE
    except ReplayInputAdapterError as exc:
        # AC-8 hard-fail: auto-sync detected an offset that violates the
        # match-window threshold, or the tlog is missing required fields.
        # Operator must fix the inputs.
        print(
            f"gps-denied-replay: replay sync impossible: {exc}",
            file=sys.stderr,
            flush=True,
        )
        return EXIT_SYNC_IMPOSSIBLE
    except SystemExit:
        # argparse / shared_main may raise SystemExit for clean shutdown
        # paths (--help, --version, fatal abort). Re-raise so the
        # process exit code is preserved verbatim. (SystemExit is not an
        # Exception subclass, so this clause is explicit documentation
        # rather than a guard against the handler below.)
        raise
    except Exception:
        _LOGGER.exception("gps-denied-replay: unhandled exception")
        traceback.print_exc(file=sys.stderr)
        return EXIT_GENERIC_FAILURE


if __name__ == "__main__":
    raise SystemExit(main())