Files
Oleksandr Bezdieniezhnykh 1d260f7e41 [AZ-594] Implement core-three harness stubs (fdr_reader, frame_source_replay, imu_replay)
Replaces the NotImplementedError stubs AZ-406 reserved on three runner-
side helpers; these were stranded from any tracker ticket since
AZ-407/408 never came back to fill them. Concrete bodies:

* fdr_reader.iter_records: JSONL parser + wire-envelope validator;
  recursive *.jsonl walk; projects {schema_version, ts, producer_id,
  kind, payload} to runner-side FdrRecord with record_type/monotonic_ms
  renames; yields oldest-first.
* frame_source_replay.replay_video: OpenCV VideoCapture decode + JPEG
  re-encode; auto-detects file vs directory; injectable sleep_fn for
  unit-test pacing.
* imu_replay.ImuReplayer.replay: csv.DictReader parse; degrees->radians
  attitude conversion; tolerates scientific notation; same sleep_fn
  injection pattern.

Adds 34 unit tests (14 + 10 + 10). Full e2e unit suite: 558 passed (+31).
Existing scenario _harness_helpers_implemented probes still return False
because they also depend on sitl_observer / fc_proxy_runtime stubs that
remain pending; scenario probe cleanup is out of AZ-594 scope.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 08:42:12 +03:00

193 lines
5.0 KiB
Python

"""Unit tests for `e2e/runner/helpers/imu_replay.py` (AZ-594 AC-3)."""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from pathlib import Path
import pytest
from e2e.runner.helpers.imu_replay import FcInboundEmitter, ImuReplayer, ImuSample
@dataclass
class _RecordingEmitter:
"""In-memory FcInboundEmitter that captures every sample."""
samples: list[ImuSample] = field(default_factory=list)
def emit(self, sample: ImuSample) -> None:
self.samples.append(sample)
@dataclass
class _RecordingSleep:
sleeps: list[float] = field(default_factory=list)
def __call__(self, duration_s: float) -> None:
self.sleeps.append(duration_s)
_HEADER = "timestamp_ms,ax,ay,az,gx,gy,gz,roll_deg,pitch_deg,yaw_deg,baro_m\n"
def _write_csv(path: Path, rows: list[str], *, header: str = _HEADER) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as fh:
fh.write(header)
for r in rows:
fh.write(r + "\n")
def test_missing_csv_raises_file_not_found(tmp_path: Path):
# Arrange
emitter = _RecordingEmitter()
replayer = ImuReplayer(emitter, realtime=False)
# Assert
with pytest.raises(FileNotFoundError, match="IMU CSV not found"):
replayer.replay(tmp_path / "nope.csv")
def test_rate_hz_must_be_positive():
# Assert
with pytest.raises(ValueError, match="rate_hz must be positive"):
ImuReplayer(_RecordingEmitter(), rate_hz=0)
def test_missing_required_columns_raises(tmp_path: Path):
# Arrange — missing `baro_m`.
p = tmp_path / "imu.csv"
_write_csv(
p,
["100,0,0,9.8,0,0,0,0,0,0"],
header="timestamp_ms,ax,ay,az,gx,gy,gz,roll_deg,pitch_deg,yaw_deg\n",
)
# Assert
with pytest.raises(ValueError, match="missing required columns"):
ImuReplayer(_RecordingEmitter(), realtime=False).replay(p)
def test_row_with_unparseable_value_raises(tmp_path: Path):
# Arrange
p = tmp_path / "imu.csv"
_write_csv(p, ["100,0,0,not-a-float,0,0,0,0,0,0,300"])
# Assert
with pytest.raises(ValueError, match="IMU CSV row malformed"):
ImuReplayer(_RecordingEmitter(), realtime=False).replay(p)
def test_happy_path_emits_all_rows(tmp_path: Path):
# Arrange
p = tmp_path / "imu.csv"
_write_csv(
p,
[
"100,0.0,0.0,9.8,0.0,0.0,0.0,0.0,0.0,0.0,300.0",
"200,0.1,0.0,9.7,0.0,0.0,0.0,0.0,0.0,0.0,300.5",
"300,0.0,0.2,9.6,0.0,0.0,0.0,0.0,0.0,0.0,301.0",
],
)
emitter = _RecordingEmitter()
# Act
emitted = ImuReplayer(emitter, realtime=False).replay(p)
# Assert
assert emitted == 3
assert len(emitter.samples) == 3
assert emitter.samples[0].timestamp_ms == 100
assert emitter.samples[1].accel_mss == (0.1, 0.0, 9.7)
def test_scientific_notation_parses(tmp_path: Path):
# Arrange — AZ-408 fixture style float fields.
p = tmp_path / "imu.csv"
_write_csv(
p,
[
"100,-4.44E-16,1.23e-3,9.81,-1e-5,2e-7,0.0,1.0,2.0,3.0,300.0",
],
)
emitter = _RecordingEmitter()
# Act
ImuReplayer(emitter, realtime=False).replay(p)
# Assert
s = emitter.samples[0]
assert s.accel_mss[0] == pytest.approx(-4.44e-16)
assert s.accel_mss[1] == pytest.approx(1.23e-3)
assert s.accel_mss[2] == pytest.approx(9.81)
assert s.gyro_rps == (-1e-5, 2e-7, 0.0)
def test_attitude_radians_converted(tmp_path: Path):
# Arrange
p = tmp_path / "imu.csv"
_write_csv(
p,
["100,0,0,9.8,0,0,0,90,180,270,300"],
)
emitter = _RecordingEmitter()
# Act
ImuReplayer(emitter, realtime=False).replay(p)
# Assert
roll, pitch, yaw = emitter.samples[0].attitude_rad
assert roll == pytest.approx(math.pi / 2)
assert pitch == pytest.approx(math.pi)
assert yaw == pytest.approx(3 * math.pi / 2)
def test_realtime_sleeps_per_sample(tmp_path: Path):
# Arrange — 3 rows at 10 Hz → 3 sleeps of 0.1 s.
p = tmp_path / "imu.csv"
_write_csv(
p,
[
"100,0,0,9.8,0,0,0,0,0,0,300",
"200,0,0,9.8,0,0,0,0,0,0,300",
"300,0,0,9.8,0,0,0,0,0,0,300",
],
)
emitter = _RecordingEmitter()
sleep = _RecordingSleep()
# Act
ImuReplayer(emitter, rate_hz=10.0, realtime=True, sleep_fn=sleep).replay(p)
# Assert
assert sleep.sleeps == pytest.approx([0.1, 0.1, 0.1])
def test_non_realtime_does_not_sleep(tmp_path: Path):
# Arrange
p = tmp_path / "imu.csv"
_write_csv(p, ["100,0,0,9.8,0,0,0,0,0,0,300"])
sleep = _RecordingSleep()
# Act
ImuReplayer(_RecordingEmitter(), realtime=False, sleep_fn=sleep).replay(p)
# Assert
assert sleep.sleeps == []
def test_empty_csv_emits_nothing(tmp_path: Path):
# Arrange — header only.
p = tmp_path / "imu.csv"
_write_csv(p, [])
emitter = _RecordingEmitter()
# Act
emitted = ImuReplayer(emitter, realtime=False).replay(p)
# Assert
assert emitted == 0
assert emitter.samples == []