mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:51:12 +00:00
1d260f7e41
Replaces the NotImplementedError stubs AZ-406 reserved on three runner-
side helpers; these were stranded from any tracker ticket since
AZ-407/408 never came back to fill them. Concrete bodies:
* fdr_reader.iter_records: JSONL parser + wire-envelope validator;
recursive *.jsonl walk; projects {schema_version, ts, producer_id,
kind, payload} to runner-side FdrRecord with record_type/monotonic_ms
renames; yields oldest-first.
* frame_source_replay.replay_video: OpenCV VideoCapture decode + JPEG
re-encode; auto-detects file vs directory; injectable sleep_fn for
unit-test pacing.
* imu_replay.ImuReplayer.replay: csv.DictReader parse; degrees->radians
attitude conversion; tolerates scientific notation; same sleep_fn
injection pattern.
Adds 34 unit tests (14 + 10 + 10). Full e2e unit suite: 558 passed (+31).
Existing scenario _harness_helpers_implemented probes still return False
because they also depend on sitl_observer / fc_proxy_runtime stubs that
remain pending; scenario probe cleanup is out of AZ-594 scope.
Co-authored-by: Cursor <cursoragent@cursor.com>
215 lines
5.7 KiB
Python
215 lines
5.7 KiB
Python
"""Unit tests for `e2e/runner/helpers/fdr_reader.py` (AZ-594 AC-1)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from e2e.runner.helpers.fdr_reader import FdrRecord, archive_size_bytes, iter_records
|
|
|
|
|
|
def _write_jsonl(path: Path, records: list[dict]) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
with path.open("w") as fh:
|
|
for r in records:
|
|
fh.write(json.dumps(r) + "\n")
|
|
|
|
|
|
def _env(ts: str, *, kind: str = "vio.tick", producer_id: str = "c1.vio", payload: dict | None = None) -> dict:
|
|
return {
|
|
"schema_version": 1,
|
|
"ts": ts,
|
|
"producer_id": producer_id,
|
|
"kind": kind,
|
|
"payload": payload if payload is not None else {"frame_id": "f0"},
|
|
}
|
|
|
|
|
|
def test_missing_root_raises_file_not_found(tmp_path: Path):
|
|
# Assert
|
|
with pytest.raises(FileNotFoundError, match="FDR archive root not found"):
|
|
list(iter_records(tmp_path / "nope"))
|
|
|
|
|
|
def test_empty_root_yields_nothing(tmp_path: Path):
|
|
# Arrange
|
|
(tmp_path / "fdr").mkdir()
|
|
|
|
# Act
|
|
records = list(iter_records(tmp_path / "fdr"))
|
|
|
|
# Assert
|
|
assert records == []
|
|
|
|
|
|
def test_single_file_round_trip(tmp_path: Path):
|
|
# Arrange
|
|
root = tmp_path / "fdr"
|
|
_write_jsonl(
|
|
root / "segment_001.jsonl",
|
|
[_env("2026-05-17T08:00:00.100Z"), _env("2026-05-17T08:00:00.200Z")],
|
|
)
|
|
|
|
# Act
|
|
records = list(iter_records(root))
|
|
|
|
# Assert
|
|
assert len(records) == 2
|
|
assert all(isinstance(r, FdrRecord) for r in records)
|
|
assert records[0].record_type == "vio.tick"
|
|
assert records[0].producer_id == "c1.vio"
|
|
assert records[1].monotonic_ms - records[0].monotonic_ms == 100
|
|
|
|
|
|
def test_multiple_files_are_merged_and_sorted(tmp_path: Path):
|
|
# Arrange — file B has older records than file A.
|
|
root = tmp_path / "fdr"
|
|
_write_jsonl(
|
|
root / "b_segment.jsonl",
|
|
[_env("2026-05-17T08:00:01.000Z")],
|
|
)
|
|
_write_jsonl(
|
|
root / "a_segment.jsonl",
|
|
[_env("2026-05-17T08:00:00.500Z")],
|
|
)
|
|
|
|
# Act
|
|
records = list(iter_records(root))
|
|
|
|
# Assert — global oldest-first regardless of filename order.
|
|
assert len(records) == 2
|
|
assert records[0].monotonic_ms < records[1].monotonic_ms
|
|
|
|
|
|
def test_blank_lines_are_skipped(tmp_path: Path):
|
|
# Arrange
|
|
root = tmp_path / "fdr"
|
|
root.mkdir()
|
|
with (root / "segment_001.jsonl").open("w") as fh:
|
|
fh.write(json.dumps(_env("2026-05-17T08:00:00.100Z")) + "\n")
|
|
fh.write("\n")
|
|
fh.write(" \n")
|
|
fh.write(json.dumps(_env("2026-05-17T08:00:00.200Z")) + "\n")
|
|
|
|
# Act
|
|
records = list(iter_records(root))
|
|
|
|
# Assert
|
|
assert len(records) == 2
|
|
|
|
|
|
def test_missing_envelope_key_raises(tmp_path: Path):
|
|
# Arrange — missing `kind`.
|
|
root = tmp_path / "fdr"
|
|
bad = {
|
|
"schema_version": 1,
|
|
"ts": "2026-05-17T08:00:00.100Z",
|
|
"producer_id": "c1.vio",
|
|
"payload": {},
|
|
}
|
|
_write_jsonl(root / "bad.jsonl", [bad])
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ValueError, match="missing required keys \\['kind'\\]"):
|
|
list(iter_records(root))
|
|
|
|
|
|
def test_non_object_line_raises(tmp_path: Path):
|
|
# Arrange — array at top level.
|
|
root = tmp_path / "fdr"
|
|
root.mkdir()
|
|
with (root / "bad.jsonl").open("w") as fh:
|
|
fh.write("[1, 2, 3]\n")
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ValueError, match="not a JSON object"):
|
|
list(iter_records(root))
|
|
|
|
|
|
def test_malformed_json_raises(tmp_path: Path):
|
|
# Arrange
|
|
root = tmp_path / "fdr"
|
|
root.mkdir()
|
|
with (root / "bad.jsonl").open("w") as fh:
|
|
fh.write("{not-json\n")
|
|
|
|
# Act / Assert
|
|
with pytest.raises(json.JSONDecodeError):
|
|
list(iter_records(root))
|
|
|
|
|
|
def test_empty_producer_id_raises(tmp_path: Path):
|
|
# Arrange
|
|
root = tmp_path / "fdr"
|
|
bad = _env("2026-05-17T08:00:00.100Z", producer_id="")
|
|
_write_jsonl(root / "bad.jsonl", [bad])
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ValueError, match="producer_id` must be a non-empty"):
|
|
list(iter_records(root))
|
|
|
|
|
|
def test_ts_iso_without_z_parses(tmp_path: Path):
|
|
# Arrange — already in +00:00 form.
|
|
root = tmp_path / "fdr"
|
|
_write_jsonl(root / "a.jsonl", [_env("2026-05-17T08:00:00.250+00:00")])
|
|
|
|
# Act
|
|
records = list(iter_records(root))
|
|
|
|
# Assert
|
|
assert len(records) == 1
|
|
|
|
|
|
def test_payload_passed_through(tmp_path: Path):
|
|
# Arrange
|
|
root = tmp_path / "fdr"
|
|
payload = {"frame_idx": 42, "lat_deg": 50.0, "lon_deg": 30.0, "cov_semi_major_m": 5.5}
|
|
_write_jsonl(
|
|
root / "a.jsonl",
|
|
[_env("2026-05-17T08:00:00.100Z", kind="outbound_estimate", payload=payload)],
|
|
)
|
|
|
|
# Act
|
|
[record] = list(iter_records(root))
|
|
|
|
# Assert
|
|
assert record.payload == payload
|
|
assert record.record_type == "outbound_estimate"
|
|
|
|
|
|
def test_archive_size_bytes_sums_all_files(tmp_path: Path):
|
|
# Arrange
|
|
root = tmp_path / "fdr"
|
|
_write_jsonl(root / "a.jsonl", [_env("2026-05-17T08:00:00.100Z")])
|
|
_write_jsonl(root / "sub/b.jsonl", [_env("2026-05-17T08:00:00.200Z")])
|
|
|
|
# Act
|
|
total = archive_size_bytes(root)
|
|
|
|
# Assert
|
|
assert total > 0
|
|
a_size = (root / "a.jsonl").stat().st_size
|
|
b_size = (root / "sub/b.jsonl").stat().st_size
|
|
assert total == a_size + b_size
|
|
|
|
|
|
def test_archive_size_bytes_missing_root_returns_zero(tmp_path: Path):
|
|
# Assert
|
|
assert archive_size_bytes(tmp_path / "nope") == 0
|
|
|
|
|
|
def test_subdirectory_files_included(tmp_path: Path):
|
|
# Arrange
|
|
root = tmp_path / "fdr"
|
|
_write_jsonl(root / "seg1.jsonl", [_env("2026-05-17T08:00:00.100Z")])
|
|
_write_jsonl(root / "sub/seg2.jsonl", [_env("2026-05-17T08:00:00.200Z")])
|
|
|
|
# Act
|
|
records = list(iter_records(root))
|
|
|
|
# Assert
|
|
assert len(records) == 2
|