Files
Oleksandr Bezdieniezhnykh 1d260f7e41 [AZ-594] Implement core-three harness stubs (fdr_reader, frame_source_replay, imu_replay)
Replaces the NotImplementedError stubs AZ-406 reserved on three runner-
side helpers; these were stranded from any tracker ticket since
AZ-407/408 never came back to fill them. Concrete bodies:

* fdr_reader.iter_records: JSONL parser + wire-envelope validator;
  recursive *.jsonl walk; projects {schema_version, ts, producer_id,
  kind, payload} to runner-side FdrRecord with record_type/monotonic_ms
  renames; yields oldest-first.
* frame_source_replay.replay_video: OpenCV VideoCapture decode + JPEG
  re-encode; auto-detects file vs directory; injectable sleep_fn for
  unit-test pacing.
* imu_replay.ImuReplayer.replay: csv.DictReader parse; degrees->radians
  attitude conversion; tolerates scientific notation; same sleep_fn
  injection pattern.

Adds 34 unit tests (14 + 10 + 10). Full e2e unit suite: 558 passed (+31).
Existing scenario _harness_helpers_implemented probes still return False
because they also depend on sitl_observer / fc_proxy_runtime stubs that
remain pending; scenario probe cleanup is out of AZ-594 scope.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 08:42:12 +03:00

217 lines
6.2 KiB
Python

"""Unit tests for `e2e/runner/helpers/frame_source_replay.py` (AZ-594 AC-2)."""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
import cv2
import numpy as np
import pytest
from e2e.runner.helpers.frame_source_replay import (
FrameSourceReplayer,
ReplayCadence,
)
@dataclass
class _RecordingSink:
    """Frame sink test double that keeps every written frame in memory.

    Each ``write_frame`` call is stored in ``frames`` as a
    ``(jpeg_bytes, timestamp_ms)`` tuple, in call order, so tests can
    assert on exactly what was emitted.
    """

    # Recorded (jpeg_bytes, timestamp_ms) pairs, oldest first.
    frames: list[tuple[bytes, int]] = field(default_factory=list)

    def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None:
        """Record one emitted frame together with its timestamp."""
        emission = (jpeg_bytes, timestamp_ms)
        self.frames.append(emission)
@dataclass
class _RecordingSleep:
    """Callable stand-in for a sleep function that only logs requests.

    Calling the instance never blocks; the requested duration is stored in
    ``sleeps`` so tests can assert on the pacing the replayer asked for.
    """

    # Requested sleep durations, in call order.
    sleeps: list[float] = field(default_factory=list)

    def __call__(self, duration_s: float) -> None:
        """Record ``duration_s`` instead of sleeping."""
        self.sleeps.extend((duration_s,))
def _write_jpg(path: Path, w: int = 64, h: int = 48, fill: int = 128) -> None:
    """Write a solid-colour, 3-channel ``h`` x ``w`` test image to ``path``.

    Missing parent directories are created first. Every pixel of every
    channel is set to ``fill``.

    Raises:
        OSError: if ``cv2.imwrite`` reports that the image was not written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    img = np.full((h, w, 3), fill, dtype=np.uint8)
    # cv2.imwrite signals failure through its boolean result; surface it here
    # so a broken fixture does not masquerade as a replayer bug.
    if not cv2.imwrite(str(path), img):
        raise OSError(f"cv2.imwrite failed to write test image: {path}")
def _write_video(path: Path, n_frames: int, fps: float, w: int = 64, h: int = 48) -> None:
    """Encode ``n_frames`` solid-colour frames into an mp4v video at ``path``.

    Missing parent directories are created first. Frame ``i`` is filled with
    ``(i * 5) % 255`` so consecutive frames differ. When OpenCV cannot open
    an mp4v writer, the calling test is skipped via ``pytest.skip``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    codec = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(str(path), codec, fps, (w, h))
    if not out.isOpened():
        pytest.skip(f"OpenCV cannot write mp4v on this platform: {path}")
    frame_shape = (h, w, 3)
    try:
        for idx in range(n_frames):
            shade = (idx * 5) % 255
            out.write(np.full(frame_shape, shade, dtype=np.uint8))
    finally:
        # Always release so the container is finalised even if a write fails.
        out.release()
# replay_image_directory
def test_image_dir_missing_raises_file_not_found(tmp_path: Path):
    """A non-existent frame directory is reported as ``FileNotFoundError``."""
    # Arrange
    missing_dir = tmp_path / "nope"
    recorder = _RecordingSink()
    replayer = FrameSourceReplayer(recorder, sleep_fn=lambda _: None)

    # Act / Assert
    with pytest.raises(FileNotFoundError, match="frame directory not found"):
        replayer.replay_image_directory(missing_dir)
def test_image_dir_empty_returns_zero(tmp_path: Path):
    """An existing but empty directory emits nothing and returns 0."""
    # Arrange
    empty_dir = tmp_path / "frames"
    empty_dir.mkdir()
    recorder = _RecordingSink()
    replayer = FrameSourceReplayer(recorder, sleep_fn=lambda _: None)

    # Act
    count = replayer.replay_image_directory(empty_dir)

    # Assert
    assert count == 0
    assert recorder.frames == []
def test_image_dir_emits_sorted_by_name(tmp_path: Path):
    """Frames are emitted in filename order, not in the order they were written."""
    # Arrange — files written out of order; each carries a distinct grey
    # level so the emitted order can be recovered from the pixels.
    frames_dir = tmp_path / "frames"
    _write_jpg(frames_dir / "AD000003.jpg", fill=30)
    _write_jpg(frames_dir / "AD000001.jpg", fill=10)
    _write_jpg(frames_dir / "AD000002.jpg", fill=20)
    sink = _RecordingSink()

    # Act
    emitted = FrameSourceReplayer(
        sink,
        cadence=ReplayCadence(fps=10.0, realtime=False),
        sleep_fn=lambda _: None,
    ).replay_image_directory(frames_dir)

    # Assert — three frames at 0/100/200 ms.
    assert emitted == 3
    assert [ts for _, ts in sink.frames] == [0, 100, 200]
    assert all(b.startswith(b"\xff\xd8") for b, _ in sink.frames)  # JPEG SOI

    # Assert — the timestamps alone would not reveal a wrong emission order,
    # so decode each payload and check the grey levels arrive as 10, 20, 30.
    # The tolerance absorbs JPEG rounding while keeping the levels distinct.
    levels = []
    for jpeg_bytes, _ in sink.frames:
        decoded = cv2.imdecode(np.frombuffer(jpeg_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
        assert decoded is not None
        levels.append(float(decoded.mean()))
    assert levels == pytest.approx([10, 20, 30], abs=5)
def test_image_dir_non_jpeg_reencoded(tmp_path: Path):
    """A PNG source frame reaches the sink with a JPEG SOI marker."""
    # Arrange
    frames_dir = tmp_path / "frames"
    frames_dir.mkdir()
    png_path = frames_dir / "AD000001.png"
    grey = np.full((48, 64, 3), 100, dtype=np.uint8)
    cv2.imwrite(str(png_path), grey)
    recorder = _RecordingSink()
    replayer = FrameSourceReplayer(recorder, sleep_fn=lambda _: None)

    # Act
    count = replayer.replay_image_directory(frames_dir)

    # Assert
    assert count == 1
    first_payload, _ = recorder.frames[0]
    assert first_payload.startswith(b"\xff\xd8")  # JPEG, not PNG
def test_image_dir_skips_non_image_files(tmp_path: Path):
    """Text and CSV files next to the frames do not count as emitted frames."""
    # Arrange — one real frame plus two files that are not images.
    frames_dir = tmp_path / "frames"
    _write_jpg(frames_dir / "AD000001.jpg")
    clutter = (
        ("README.txt", "not an image"),
        ("manifest.csv", "col1,col2\n1,2\n"),
    )
    for name, text in clutter:
        (frames_dir / name).write_text(text)
    recorder = _RecordingSink()
    replayer = FrameSourceReplayer(recorder, sleep_fn=lambda _: None)

    # Act
    count = replayer.replay_image_directory(frames_dir)

    # Assert
    assert count == 1
def test_image_dir_non_realtime_does_not_sleep(tmp_path: Path):
    """With ``realtime=False`` the injected sleep function is never called."""
    # Arrange
    frames_dir = tmp_path / "frames"
    for idx in range(3):
        _write_jpg(frames_dir / f"AD{idx:06d}.jpg")
    recorder = _RecordingSink()
    naps = _RecordingSleep()
    pacing = ReplayCadence(fps=10.0, realtime=False)
    replayer = FrameSourceReplayer(recorder, cadence=pacing, sleep_fn=naps)

    # Act
    replayer.replay_image_directory(frames_dir)

    # Assert
    assert naps.sleeps == []
def test_image_dir_realtime_sleeps_per_frame(tmp_path: Path):
    """With ``realtime=True`` at 10 FPS, three frames request three 0.1 s sleeps."""
    # Arrange
    frames_dir = tmp_path / "frames"
    for idx in range(3):
        _write_jpg(frames_dir / f"AD{idx:06d}.jpg")
    recorder = _RecordingSink()
    naps = _RecordingSleep()
    pacing = ReplayCadence(fps=10.0, realtime=True)
    replayer = FrameSourceReplayer(recorder, cadence=pacing, sleep_fn=naps)

    # Act
    replayer.replay_image_directory(frames_dir)

    # Assert — sleeps once per emitted frame at 0.1 s.
    expected_naps = [0.1, 0.1, 0.1]
    assert naps.sleeps == pytest.approx(expected_naps)
# replay_video
def test_video_missing_path_raises_file_not_found(tmp_path: Path):
    """A non-existent video path is reported as ``FileNotFoundError``."""
    # Arrange
    missing_video = tmp_path / "nope.mp4"
    recorder = _RecordingSink()
    replayer = FrameSourceReplayer(recorder, sleep_fn=lambda _: None)

    # Act / Assert
    with pytest.raises(FileNotFoundError, match="video path not found"):
        replayer.replay_video(missing_video)
def test_video_dir_delegates_to_image_directory(tmp_path: Path):
    """``replay_video`` given a directory of two images emits two frames."""
    # Arrange
    frames_dir = tmp_path / "frames"
    for idx in range(2):
        _write_jpg(frames_dir / f"AD{idx:06d}.jpg")
    recorder = _RecordingSink()
    pacing = ReplayCadence(fps=10.0, realtime=False)
    replayer = FrameSourceReplayer(recorder, cadence=pacing, sleep_fn=lambda _: None)

    # Act
    count = replayer.replay_video(frames_dir)

    # Assert
    assert count == 2
def test_video_mp4_round_trip(tmp_path: Path):
    """A generated 5-frame MP4 replays as five JPEG frames 100 ms apart."""
    # Arrange — write a 5-frame 10 FPS MP4 then replay it.
    clip = tmp_path / "tiny.mp4"
    _write_video(clip, n_frames=5, fps=10.0)
    recorder = _RecordingSink()
    pacing = ReplayCadence(fps=10.0, realtime=False)
    replayer = FrameSourceReplayer(recorder, cadence=pacing, sleep_fn=lambda _: None)

    # Act
    count = replayer.replay_video(clip)

    # Assert
    assert count == 5
    timestamps = [ts for _, ts in recorder.frames]
    assert timestamps == [0, 100, 200, 300, 400]
    for payload, _ in recorder.frames:
        assert payload.startswith(b"\xff\xd8")