"""Shared FDR-replay helpers consumed by 13 scenario files (AZ-597). Closes the last gap in the offline FDR-replay path opened by AZ-595 (`sitl_observer`) and AZ-596 (`fc_proxy_runtime`): a small grab-bag of per-scenario stubs that all reduce to "I'm in replay mode, so the frame / IMU / single-image push is a no-op or a JSON read". In replay mode, frames decoded by `FrameSourceReplayer` aren't actually driving anything — the SUT's FDR archive already encodes what happened when the fixture was built. The same applies to IMU samples emitted to the FC inbound. So scenarios just need: * a `FrameSink` that counts but discards bytes, * an `FcInboundEmitter` that counts but discards samples, * a default frame-period for window-arithmetic helpers that ask for it, * generic `${E2E_SITL_REPLAY_DIR}` JSON / sub-directory loaders for the scenario-specific fixtures (per-frame GT, single-image observation, outage frames directory). The 13 scenarios that previously carried local `_resolve_*` / `_drive_*` / `_push_*` `NotImplementedError` stubs now import these helpers directly. Public-boundary discipline: stdlib only. """ from __future__ import annotations import json import os from pathlib import Path _ENV_VAR = "E2E_SITL_REPLAY_DIR" DEFAULT_FRAME_PERIOD_MS = 33 """30 fps default frame period — matches Derkachi MP4 + outlier injector cadence.""" class NullFrameSink: """`FrameSink`-compatible sink that counts but discards bytes. In FDR-replay mode the SUT's FDR archive already encodes the per-frame result; the sink only needs to drain the replayer's `write_frame` calls without storing anything. The counter is surfaced for diagnostic asserts (e.g. "did we see 60 frames?"). """ def __init__(self) -> None: self.frames_written: int = 0 def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None: self.frames_written += 1 class NullFcInboundEmitter: """`FcInboundEmitter`-compatible emitter that counts but discards samples. Same rationale as `NullFrameSink` — the FDR archive already encodes the IMU-driven FC state; the emitter only needs to drain `ImuReplayer.replay`'s `emit` calls. """ def __init__(self) -> None: self.samples_emitted: int = 0 def emit(self, sample: object) -> None: self.samples_emitted += 1 def default_frame_period_ms() -> int: """Default per-frame period in ms (30 fps). Scenarios that need a frame-period for window arithmetic (FT-N-03, FT-N-04) call this when the fixture builder hasn't supplied an explicit override. The constant matches the Derkachi MP4 native cadence and the AZ-408 outlier injector's frame stride. """ return DEFAULT_FRAME_PERIOD_MS def imu_replay_noop(csv_path: Path) -> None: """No-op IMU-replay driver for FDR-replay mode. The IMU samples are pre-baked into the FDR archive by the fixture builder, so the runtime driver has nothing to do. `csv_path` is accepted (and ignored) so the call-site signature matches the live-mode `imu_replay.ImuReplayer.replay(csv_path)` for the day a live-mode driver lands. """ return None def load_replay_json(filename: str) -> dict | list: """Load `${E2E_SITL_REPLAY_DIR}/` and return parsed JSON. Raises `FileNotFoundError` when the env var is unset OR the file is missing; `ValueError` (with the file path) when the JSON is malformed. """ root = _resolve_replay_root_or_raise(reason=f"load_replay_json({filename!r})") path = root / filename if not path.is_file(): raise FileNotFoundError( f"replay fixture {filename!r} not found at {path}" ) try: return json.loads(path.read_text()) except json.JSONDecodeError as exc: raise ValueError( f"malformed replay fixture JSON at {path}: {exc.msg}" ) from exc def resolve_replay_subdir(name: str) -> Path: """Resolve `${E2E_SITL_REPLAY_DIR}//` and verify it exists. Raises `FileNotFoundError` when the env var is unset OR the subdirectory is missing. """ root = _resolve_replay_root_or_raise(reason=f"resolve_replay_subdir({name!r})") path = root / name if not path.is_dir(): raise FileNotFoundError( f"replay fixture subdir {name!r} not found at {path}" ) return path def _resolve_replay_root_or_raise(*, reason: str) -> Path: raw = os.environ.get(_ENV_VAR, "").strip() if not raw: raise FileNotFoundError( f"{reason}: ${_ENV_VAR} not set or empty — scenario should " "have skipped via `sitl_replay_ready` (AZ-595) before reaching here" ) return Path(raw)