"""Scenario-facing runtime driver for the FC-inbound `BlackoutSpoofProxy`.

FT-N-04 (`test_ft_n_04_blackout_spoof`) calls into this module to drive a
coordinated GPS spoofing window. Ownership is split as follows:

* the schedule itself lives in `fixtures/injectors/blackout_spoof.py`;
* the proxy state machine (activate / process_inbound_message / in_window)
  lives in `fixtures/injectors/fc_proxy.BlackoutSpoofProxy`;
* this module supplies the missing scenario entry point, which

  1. loads the schedule from disk via
     `BlackoutSpoofProxy.from_schedule_file`;
  2. optionally activates the proxy against a caller-supplied monotonic
     clock, so the scenario can later verify wall-clock alignment;
  3. writes a small `proxy_drive_report.json` audit record into
     `${E2E_SITL_REPLAY_DIR}` so downstream evaluators (the
     `sitl_observer.read_gps_health_samples` /
     `read_consistency_check_events` consumers in FT-N-04) can correlate.

The driver is **FDR-replay mode only**: it does NOT plumb the proxy into a
live MAVLink router/FC inbound transport. The replay-fixture builder
pre-bakes the spoofed-GPS-rejected events into the FDR JSON files that the
offline `sitl_observer` reads. Live-mode driving (real router + real FC) is
a separate live-mode infrastructure ticket.

Public-boundary discipline: only stdlib and `fixtures.injectors.fc_proxy`
(an existing test-side module) are imported. Zero
`from gps_denied_onboard ...` imports.
"""

from __future__ import annotations

import json
import os
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Callable

from fixtures.injectors.fc_proxy import BlackoutSpoofProxy

# Zero-argument callable returning the current time in milliseconds.
NowMsProvider = Callable[[], int]

# Environment variable consulted when the caller passes no `replay_dir`.
_ENV_VAR = "E2E_SITL_REPLAY_DIR"
# File name of the audit record written into the replay directory.
_REPORT_FILENAME = "proxy_drive_report.json"


@dataclass(frozen=True)
class ProxyDriveReport:
    """Immutable audit record describing a single `drive_fc_proxy` call.

    The record is serialised to `proxy_drive_report.json` at the root of the
    replay directory so that downstream evaluators can confirm the schedule
    was actually applied rather than silently skipped.

    Attributes:
        schedule_path: `str()` of the schedule path handed to the driver.
        window_start_ms: Copied from the proxy's `window_start_ms`.
        window_end_ms: Copied from the proxy's `window_end_ms`.
        spoof_frame_count: Copied from the proxy's `spoof_frame_count`.
        alignment_err_ms: Alignment error reported by activation, or 0 when
            the proxy was not activated.
        was_replay_mode: True when no clock provider was supplied.
    """

    schedule_path: str
    window_start_ms: int
    window_end_ms: int
    spoof_frame_count: int
    alignment_err_ms: int
    was_replay_mode: bool


def drive_fc_proxy(
    schedule_path: Path,
    *,
    now_ms_provider: NowMsProvider | None = None,
    replay_dir: Path | None = None,
) -> ProxyDriveReport:
    """Load the spoof schedule, optionally activate the proxy, and report.

    Args:
        schedule_path: JSON written by `blackout_spoof.materialize`.
        now_ms_provider: When given, the proxy is activated against this
            clock and the report carries the resulting `alignment_err_ms`.
            When None, the driver runs in replay mode and reports
            `alignment_err_ms=0`.
        replay_dir: Directory that receives the JSON report. When None, the
            directory is resolved from `E2E_SITL_REPLAY_DIR`; when that is
            absent as well, no file is written.

    Returns:
        The `ProxyDriveReport` describing this invocation.

    Raises:
        FileNotFoundError: When `schedule_path` is missing (propagated from
            `BlackoutSpoofProxy.from_schedule_file`).
        ValueError: When the schedule JSON cannot be parsed; the original
            `json.JSONDecodeError` is chained as the cause.
    """
    try:
        spoof_proxy = BlackoutSpoofProxy.from_schedule_file(schedule_path)
    except json.JSONDecodeError as decode_err:
        message = f"malformed schedule JSON at {schedule_path}: {decode_err.msg}"
        raise ValueError(message) from decode_err

    # Without a clock there is nothing to align against: stay in replay mode.
    replay_mode = now_ms_provider is None
    if replay_mode:
        drift_ms = 0
    else:
        activation_result = spoof_proxy.activate(now_ms_provider=now_ms_provider)
        drift_ms = int(activation_result.alignment_err_ms)

    # Snapshot the proxy attributes only after the (optional) activation.
    report = ProxyDriveReport(
        schedule_path=str(schedule_path),
        window_start_ms=spoof_proxy.window_start_ms,
        window_end_ms=spoof_proxy.window_end_ms,
        spoof_frame_count=spoof_proxy.spoof_frame_count,
        alignment_err_ms=drift_ms,
        was_replay_mode=replay_mode,
    )

    # An explicit argument wins over the environment variable.
    output_dir = _resolve_replay_dir() if replay_dir is None else replay_dir
    if output_dir is None:
        return report

    output_dir.mkdir(parents=True, exist_ok=True)
    report_file = output_dir / _REPORT_FILENAME
    report_file.write_text(json.dumps(asdict(report)))
    return report


def _resolve_replay_dir() -> Path | None:
    """Return the `E2E_SITL_REPLAY_DIR` path, or None when unset or blank."""
    configured = os.environ.get(_ENV_VAR, "").strip()
    return Path(configured) if configured else None