Files
gps-denied-onboard/e2e/runner/helpers/fc_proxy_runtime.py
T
Oleksandr Bezdieniezhnykh 6554d568f1 [AZ-596] Batch 76: fc_proxy_runtime driver (FDR-replay mode)
Add `runner/helpers/fc_proxy_runtime.py` wrapping the existing
`BlackoutSpoofProxy` (AZ-406) with a scenario-facing `drive_fc_proxy`
entry point. FDR-replay mode only: loads `schedule.json`, optionally
activates the proxy against a caller clock for alignment verification,
and writes a `proxy_drive_report.json` audit record into
`${E2E_SITL_REPLAY_DIR}` for downstream evaluators.

Replaces the local `_drive_fc_proxy` stub in FT-N-04. Adds 3
@property accessors on `BlackoutSpoofProxy` so the wrapper does not
reach into private attributes. +11 unit tests (608 total, up from
596). Live-mode router wiring remains out of scope (future ticket).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 09:08:48 +03:00

120 lines
4.3 KiB
Python

"""FC-inbound proxy runtime driver — wraps `BlackoutSpoofProxy` for scenarios.
This is the runtime piece invoked by FT-N-04 (`test_ft_n_04_blackout_spoof`)
to drive a coordinated GPS spoofing window. The schedule itself is owned
by `fixtures/injectors/blackout_spoof.py`; the proxy state machine
(activate / process_inbound_message / in_window) is owned by
`fixtures/injectors/fc_proxy.BlackoutSpoofProxy`. What was missing — and
what this module adds — is the scenario-facing entry point that:
1. Loads the schedule from disk via `BlackoutSpoofProxy.from_schedule_file`.
2. Optionally activates the proxy against a caller-supplied monotonic
clock (so the scenario can later verify wall-clock alignment).
3. Writes a small `proxy_drive_report.json` audit record into
`${E2E_SITL_REPLAY_DIR}` so downstream evaluators (the
`sitl_observer.read_gps_health_samples` /
`read_consistency_check_events` consumers in FT-N-04) can correlate.
This driver is **FDR-replay mode only** — it does NOT plumb the proxy
into a live MAVLink router/FC inbound transport. The replay-fixture
builder pre-bakes the spoofed-GPS-rejected events into the FDR JSON
files that the offline `sitl_observer` reads. Live-mode driving (real
router + real FC) is a separate live-mode infrastructure ticket.
Public-boundary discipline: imports only stdlib +
`fixtures.injectors.fc_proxy` (an existing test-side module). Zero
`from gps_denied_onboard ...` imports.
"""
from __future__ import annotations
import json
import os
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Callable
from fixtures.injectors.fc_proxy import BlackoutSpoofProxy
NowMsProvider = Callable[[], int]
_ENV_VAR = "E2E_SITL_REPLAY_DIR"
_REPORT_FILENAME = "proxy_drive_report.json"
@dataclass(frozen=True)
class ProxyDriveReport:
"""Audit record of one `drive_fc_proxy` invocation.
Persisted as `proxy_drive_report.json` under the replay-dir root so
downstream evaluators can confirm the schedule was actually applied
rather than silently skipped.
"""
schedule_path: str
window_start_ms: int
window_end_ms: int
spoof_frame_count: int
alignment_err_ms: int
was_replay_mode: bool
def drive_fc_proxy(
schedule_path: Path,
*,
now_ms_provider: NowMsProvider | None = None,
replay_dir: Path | None = None,
) -> ProxyDriveReport:
"""Drive the FC-inbound spoof proxy from `schedule_path`.
`schedule_path` — JSON written by `blackout_spoof.materialize`.
`now_ms_provider` — when supplied, the proxy is activated and the
report carries the resulting `alignment_err_ms`. When None,
the driver runs in replay mode and reports `alignment_err_ms=0`.
`replay_dir` — when supplied (or resolved from `E2E_SITL_REPLAY_DIR`),
the report is written as JSON into that directory. When both
are absent, no file is written.
Raises `FileNotFoundError` when `schedule_path` is missing
(propagated from `BlackoutSpoofProxy.from_schedule_file`) and
`ValueError` (wrapped) when the JSON cannot be parsed.
"""
try:
proxy = BlackoutSpoofProxy.from_schedule_file(schedule_path)
except json.JSONDecodeError as exc:
raise ValueError(
f"malformed schedule JSON at {schedule_path}: {exc.msg}"
) from exc
if now_ms_provider is not None:
activation = proxy.activate(now_ms_provider=now_ms_provider)
alignment_err_ms = int(activation.alignment_err_ms)
was_replay_mode = False
else:
alignment_err_ms = 0
was_replay_mode = True
report = ProxyDriveReport(
schedule_path=str(schedule_path),
window_start_ms=proxy.window_start_ms,
window_end_ms=proxy.window_end_ms,
spoof_frame_count=proxy.spoof_frame_count,
alignment_err_ms=alignment_err_ms,
was_replay_mode=was_replay_mode,
)
target_dir = replay_dir if replay_dir is not None else _resolve_replay_dir()
if target_dir is not None:
target_dir.mkdir(parents=True, exist_ok=True)
(target_dir / _REPORT_FILENAME).write_text(json.dumps(asdict(report)))
return report
def _resolve_replay_dir() -> Path | None:
"""Resolve `E2E_SITL_REPLAY_DIR`. Returns None when unset or empty."""
raw = os.environ.get(_ENV_VAR, "").strip()
if not raw:
return None
return Path(raw)