"""Replay images / video to the SUT's V4L2 file frame source. Two replay modes: 1. Image-set replay (FT-P-01, FT-P-05) — emit a sequence of JPEG / PNG still images at a configurable rate to the file frame source path the SUT polls. 2. Video replay (FT-P-02, FT-P-04, FT-N-01..04, NFT-PERF-*) — decode an MP4 with OpenCV and emit frames at the encoded FPS (or a user-supplied rate for fast-forward). ``replay_video`` also accepts a directory of extracted frames (`AD000001.jpg`-style) so the AZ-408 injectors that emit frame directories rather than MP4s can use the same surface. The actual frame-source path inside the SUT container is configured via the ``ONBOARD_FRAME_SOURCE_PATH`` environment variable on the SUT — the runner writes to a shared tmpfs volume mounted at the same path inside both containers. """ from __future__ import annotations import time from dataclasses import dataclass from pathlib import Path from typing import Callable, Protocol import cv2 # Image extensions handled by ``replay_image_directory`` / ``replay_video`` # when given a directory rather than a file. Sorted-by-name ordering implies # zero-padded filenames (``AD000001.jpg``) which the AZ-407 / AZ-408 # fixture builders already produce. _IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".bmp") @dataclass(frozen=True) class ReplayCadence: """Frame-rate / pace configuration for a replay session.""" fps: float = 10.0 realtime: bool = True class FrameSink(Protocol): """Abstract destination for replayed frames (file path or memory queue).""" def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None: ... class FrameSourceReplayer: """Public surface for replaying frames into the SUT's frame-source path.""" def __init__( self, sink: FrameSink, cadence: ReplayCadence | None = None, *, sleep_fn: Callable[[float], None] = time.sleep, ) -> None: self._sink = sink self._cadence = cadence or ReplayCadence() # Injected so unit tests can drop wall-clock pacing entirely. self._sleep = sleep_fn def replay_image_directory(self, directory: Path) -> int: """Replay every image in ``directory`` (sorted by name) at the configured cadence. Returns the number of frames emitted to the sink. Raises ``FileNotFoundError`` if ``directory`` does not exist or is not a directory. The sink is invoked with raw JPEG-encoded bytes and a monotonic-ms timestamp that starts at 0 and advances by the period derived from ``self._cadence.fps``. """ if not directory.exists() or not directory.is_dir(): raise FileNotFoundError( f"frame directory not found: {directory}" ) files = sorted( p for p in directory.iterdir() if p.is_file() and p.suffix.lower() in _IMAGE_EXTENSIONS ) return self._emit_files(files) def replay_video(self, video_path: Path) -> int: """Replay frames from ``video_path``. Returns count emitted. Auto-detects: * a regular file (``.mp4`` / ``.avi`` / ``.h264``) — decoded with OpenCV ``VideoCapture`` and re-encoded as JPEG before emission. * a directory — delegates to ``replay_image_directory`` so the AZ-408 injectors that emit frame directories can use this entry point without the caller knowing the difference. Raises ``FileNotFoundError`` on a missing path. """ if not video_path.exists(): raise FileNotFoundError(f"video path not found: {video_path}") if video_path.is_dir(): return self.replay_image_directory(video_path) return self._decode_and_emit_video(video_path) def _decode_and_emit_video(self, video_path: Path) -> int: cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): raise ValueError(f"OpenCV failed to open video: {video_path}") try: encoded_fps = cap.get(cv2.CAP_PROP_FPS) or 0.0 fps = encoded_fps if (self._cadence.realtime and encoded_fps > 0) else self._cadence.fps period_ms = self._period_ms(fps) emitted = 0 t_ms = 0 while True: ok, frame = cap.read() if not ok: break success, encoded = cv2.imencode(".jpg", frame) if not success: raise ValueError( f"OpenCV failed to JPEG-encode frame {emitted} of {video_path}" ) self._sink.write_frame(encoded.tobytes(), t_ms) emitted += 1 t_ms += period_ms if self._cadence.realtime and period_ms > 0: self._sleep(period_ms / 1000.0) return emitted finally: cap.release() def _emit_files(self, files: list[Path]) -> int: period_ms = self._period_ms(self._cadence.fps) emitted = 0 t_ms = 0 for path in files: jpeg_bytes = path.read_bytes() if path.suffix.lower() != ".jpg" and path.suffix.lower() != ".jpeg": # Re-encode non-JPEG sources so the sink always gets JPEG bytes. img = cv2.imread(str(path), cv2.IMREAD_UNCHANGED) if img is None: raise ValueError(f"OpenCV failed to read image: {path}") success, encoded = cv2.imencode(".jpg", img) if not success: raise ValueError(f"OpenCV failed to JPEG-encode image: {path}") jpeg_bytes = encoded.tobytes() self._sink.write_frame(jpeg_bytes, t_ms) emitted += 1 t_ms += period_ms if self._cadence.realtime and period_ms > 0: self._sleep(period_ms / 1000.0) return emitted @staticmethod def _period_ms(fps: float) -> int: if fps <= 0: return 0 return int(round(1000.0 / fps))