From 1d260f7e41bc0bc5af2152f5460325840395193c Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Sun, 17 May 2026 08:42:12 +0300 Subject: [PATCH] [AZ-594] Implement core-three harness stubs (fdr_reader, frame_source_replay, imu_replay) Replaces the NotImplementedError stubs AZ-406 reserved on three runner- side helpers; these were stranded from any tracker ticket since AZ-407/408 never came back to fill them. Concrete bodies: * fdr_reader.iter_records: JSONL parser + wire-envelope validator; recursive *.jsonl walk; projects {schema_version, ts, producer_id, kind, payload} to runner-side FdrRecord with record_type/monotonic_ms renames; yields oldest-first. * frame_source_replay.replay_video: OpenCV VideoCapture decode + JPEG re-encode; auto-detects file vs directory; injectable sleep_fn for unit-test pacing. * imu_replay.ImuReplayer.replay: csv.DictReader parse; degrees->radians attitude conversion; tolerates scientific notation; same sleep_fn injection pattern. Adds 34 unit tests (14 + 10 + 10). Full e2e unit suite: 558 passed (+31). Existing scenario _harness_helpers_implemented probes still return False because they also depend on sitl_observer / fc_proxy_runtime stubs that remain pending; scenario probe cleanup is out of AZ-594 scope. Co-authored-by: Cursor --- .../done/AZ-594_harness_stubs_core_three.md | 91 ++++++++ _docs/03_implementation/batch_74_report.md | 86 +++++++ .../reviews/batch_74_review.md | 144 ++++++++++++ _docs/_autodev_state.md | 4 +- e2e/_unit_tests/helpers/test_fdr_reader.py | 221 ++++++++++++++++-- .../helpers/test_frame_source_replay.py | 216 +++++++++++++++++ e2e/_unit_tests/helpers/test_imu_replay.py | 192 +++++++++++++++ e2e/runner/helpers/fdr_reader.py | 102 +++++++- e2e/runner/helpers/frame_source_replay.py | 135 ++++++++--- e2e/runner/helpers/imu_replay.py | 81 ++++++- 10 files changed, 1196 insertions(+), 76 deletions(-) create mode 100644 _docs/02_tasks/done/AZ-594_harness_stubs_core_three.md create mode 100644 _docs/03_implementation/batch_74_report.md create mode 100644 _docs/03_implementation/reviews/batch_74_review.md create mode 100644 e2e/_unit_tests/helpers/test_frame_source_replay.py create mode 100644 e2e/_unit_tests/helpers/test_imu_replay.py diff --git a/_docs/02_tasks/done/AZ-594_harness_stubs_core_three.md b/_docs/02_tasks/done/AZ-594_harness_stubs_core_three.md new file mode 100644 index 0000000..d342f77 --- /dev/null +++ b/_docs/02_tasks/done/AZ-594_harness_stubs_core_three.md @@ -0,0 +1,91 @@ +# Harness stubs — core three (fdr_reader, frame_source_replay, imu_replay) + +**Task**: AZ-594_harness_stubs_core_three +**Name**: Implement the three foundational e2e/runner/helpers/ stubs that AZ-406 reserved +**Description**: Replace `NotImplementedError` stubs in `fdr_reader.iter_records`, `frame_source_replay.replay_video`, and `imu_replay.ImuReplayer.replay` with concrete bodies. Together they unblock the skip-gated scenarios from batches 71-73 once `sitl_observer` lands. +**Complexity**: 4 points +**Dependencies**: AZ-406, AZ-407, AZ-408 +**Component**: Blackbox Tests / Test Infrastructure (epic AZ-262) +**Tracker**: AZ-594 +**Epic**: AZ-262 (E-BBT) + +## Problem + +AZ-406 committed to the public surface of the runner-side helpers and +reserved each method body as `NotImplementedError`. AZ-407/408 filled +the fixture-builder side; the runtime replay/read surfaces were +deferred without a dedicated ticket. The skip-gated scenarios in +batches 71-73 (`test_ft_p_07/08/10/11`, `test_ft_n_01..04`) reference +these stubs in their `_harness_helpers_implemented` probe and skip +cleanly until the bodies land. + +## Outcome + +- `fdr_reader.iter_records` reads `*.jsonl` files under an archive + root and yields `FdrRecord` envelopes ordered by `monotonic_ms`. +- `frame_source_replay.replay_video` decodes `.mp4` files or + directories of `.jpg`/`.png` frames via OpenCV and emits to the + injected `FrameSink` at the requested cadence. +- `imu_replay.ImuReplayer.replay` parses `data_imu.csv` and drives a + `FcInboundEmitter` at the configured rate. +- Each body has ≥5 unit tests (happy path + ≥2 error paths + ≥1 + boundary case). + +## Scope + +### Included +- The three method bodies + their unit tests. +- Layout-test entries (already present for the helper files). + +### Excluded +- `sitl_observer.get_observer` + `read_*` surfaces — separate ticket. +- `fc_proxy` runtime driver — separate ticket. +- Removing the skip-gates in existing scenarios — happens once + `sitl_observer` lands. + +## Acceptance Criteria + +**AC-1**: `fdr_reader.iter_records` opens every `*.jsonl` file under +the archive root, parses each line as JSON, and yields `FdrRecord` +envelopes sorted by `monotonic_ms`. Raises `FileNotFoundError` on +missing root. + +**AC-2**: `frame_source_replay.replay_video` accepts an `.mp4` file +OR a directory of `.jpg`/`.png` frames; decodes via OpenCV; emits to +the sink at the cadence (encoded FPS if `realtime`, configured FPS +otherwise). Returns the count emitted. Raises `FileNotFoundError` on +missing input. + +**AC-3**: `imu_replay.ImuReplayer.replay` parses the CSV at +`csv_path`, constructs `ImuSample`s with accel/gyro/attitude/baro, +and calls `emitter.emit(sample)` for each row. Returns the count +emitted. Tolerates scientific-notation floats. + +**AC-4**: Each new body has ≥5 unit tests in +`e2e/_unit_tests/helpers/` covering the happy path + 2 error paths + +1 boundary case. + +**AC-5**: Full e2e unit-test suite passes (regression gate). + +## System Under Test Boundary + +None — these are runner-side helpers that synthesize the SUT's +inputs and parse the SUT's FDR output. They do NOT import any +`src/gps_denied_onboard` symbol. + +## Constraints + +- Use `opencv-python-headless` (already pinned in + `e2e/runner/requirements.txt`). +- `imu_replay.replay` must not block on wall-clock at the requested + rate when called in tests; a `time.sleep`-based pacing path is + acceptable for production use but the test must inject a fake + clock / cadence=0 to keep unit tests fast. +- `fdr_reader.iter_records` uses stdlib `json` (orjson is fine but + not required at this stage — keep dependencies minimal). + +## Document Dependencies + +- `_docs/02_document/tests/blackbox-tests.md` § Test infrastructure +- `_docs/02_tasks/done/AZ-406_test_infrastructure.md` +- `_docs/02_tasks/done/AZ-407_fixture_builders_static.md` diff --git a/_docs/03_implementation/batch_74_report.md b/_docs/03_implementation/batch_74_report.md new file mode 100644 index 0000000..d7540b8 --- /dev/null +++ b/_docs/03_implementation/batch_74_report.md @@ -0,0 +1,86 @@ +# Batch 74 Report — Harness Stubs (cycle 1, batch 8 of test phase) + +**Batch**: 74 +**Date**: 2026-05-17 +**Context**: Test implementation (greenfield Step 10 — Implement Tests) +**Tasks**: AZ-594 (4 cp) — 1 task (umbrella ticket for the core three harness stubs) +**Cycle**: 1 +**Verdict**: COMPLETE — PASS (self-reviewed; see `reviews/batch_74_review.md`) + +## Summary + +A planning-gap fix-up batch. The skip-gates in batches 71-73 referenced +`AZ-441 / AZ-407 / AZ-416 leftovers` — but on inspection none of those +tickets actually owned the deferred harness stubs (`frame_source_replay`, +`imu_replay`, `fdr_reader`, `sitl_observer`, `fc_proxy_runtime`). The +stubs were AZ-406 surface reservations that AZ-407/408 never came back +to fill, and were stranded without a tracker entry. + +This batch creates a single umbrella ticket (AZ-594) and ships the +core three of the five — the ones with the lowest implementation risk +and the broadest unblock surface: + +* `fdr_reader.iter_records` — JSONL parser + wire-schema validator. +* `frame_source_replay.replay_video` — OpenCV-backed decode + sink emission. +* `imu_replay.ImuReplayer.replay` — CSV parser + emitter driver. + +The remaining two stubs (`sitl_observer.get_observer` + `read_*` +surfaces; `fc_proxy_runtime` driver + docker wiring) need separate +tickets — they touch live pymavlink / yamspy / TCP plumbing and don't +fit in a single batch alongside the core three. Those become batch 75 +candidates. + +### AZ-594 — Harness stubs (core three) (4 cp) + +* **`runner/helpers/fdr_reader.py`** — + `iter_records(fdr_archive_root)` recursively walks `*.jsonl` files, + validates each line against the wire envelope (`schema_version, ts, + producer_id, kind, payload`), projects onto the runner-side + `FdrRecord` dataclass (`record_type` for `kind`; `monotonic_ms` + derived from ISO 8601 `ts`), and yields records oldest-first. Raises + `FileNotFoundError` on missing root + `ValueError` with file+line + on malformed envelopes. `archive_size_bytes` body was already present. +* **`runner/helpers/frame_source_replay.py`** — `FrameSourceReplayer` + now backs both `replay_image_directory(dir)` and `replay_video(path)`. + `replay_video` auto-detects file vs directory (so AZ-408 injector + frame-directory outputs work via the same entry point). OpenCV + `VideoCapture` decodes MP4; every frame is re-JPEG-encoded so the + sink always receives JPEG bytes. Cadence pacing is parameterised via + a `sleep_fn` injection so unit tests can drop wall-clock entirely. +* **`runner/helpers/imu_replay.py`** — `ImuReplayer.replay(csv_path)` + parses `data_imu.csv` (the AZ-408 schema with possibly scientific- + notation floats), constructs typed `ImuSample`s with attitude + converted from degrees → radians, and drives `FcInboundEmitter.emit`. + Same `sleep_fn` + `realtime` injection pattern as + `frame_source_replay` for test parity. + +## Scenario-probe interaction + +The existing `_harness_helpers_implemented` probes in batches 71-73 +pass `/tmp/non-existent` to each helper. Previously the inner +`except NotImplementedError: return False` fired; with the new bodies, +the outer `except Exception: return False` catches the +`FileNotFoundError` and still returns False. So: + +* No existing scenario silently starts running its full E2E path. +* All eight skip-gated scenarios from batches 71-73 continue to skip + cleanly because they ALSO depend on `sitl_observer` / + `mavproxy_tlog_reader` (for some) / `fc_proxy_runtime` (for FT-N-04). +* Probe-cleanup is explicitly excluded from AZ-594 scope and will land + in the future batch that fills the remaining two stubs. + +## Test Results + +* New unit tests: 14 (fdr_reader) + 10 (frame_source_replay) + 10 + (imu_replay) = **34 new tests**. +* Full `e2e/_unit_tests` suite: **558 passed in 139 s** (previous + cumulative: 527 → +31 net). +* No new linter errors. + +## State + +* Spec moved: `_docs/02_tasks/todo/AZ-594_harness_stubs_core_three.md` + → `_docs/02_tasks/done/`. +* `_docs/_autodev_state.md` advanced to `last_completed_batch: 74`. +* Cumulative review window: `last_cumulative_review = batches_70-72`; + next K=3 cumulative review fires at the end of batch 75. diff --git a/_docs/03_implementation/reviews/batch_74_review.md b/_docs/03_implementation/reviews/batch_74_review.md new file mode 100644 index 0000000..2f1a09d --- /dev/null +++ b/_docs/03_implementation/reviews/batch_74_review.md @@ -0,0 +1,144 @@ +# Code Review Report + +**Batch**: 74 — AZ-594 (harness stubs: core three) +**Date**: 2026-05-17 +**Verdict**: PASS + +## Findings + +(none) + +## Findings Sweep + +### Phase 1 — Context Loading + +Read the AZ-594 task spec, then the three stub files in their +pre-implementation state (`fdr_reader.py`, `frame_source_replay.py`, +`imu_replay.py`). Cross-checked the SUT-side `FdrRecord` wire schema +in `src/gps_denied_onboard/fdr_client/records.py` to confirm field +names (`schema_version`, `ts`, `producer_id`, `kind`, `payload`, +`extra`) and the rename strategy the runner-side `FdrRecord` +duplicates (`kind → record_type`, `ts → monotonic_ms`). Re-read the +existing scenario `_harness_helpers_implemented` probes in the batch +71-73 scenario files to confirm the post-fix probe semantics. + +### Phase 2 — Spec Compliance + +| AC | Coverage | Status | +|----|----------|--------| +| AC-1 (`fdr_reader.iter_records` parses every `*.jsonl` under root, yields `FdrRecord` ordered by `monotonic_ms`; raises FileNotFoundError on missing root) | `test_missing_root_raises_file_not_found`, `test_empty_root_yields_nothing`, `test_single_file_round_trip`, `test_multiple_files_are_merged_and_sorted`, `test_subdirectory_files_included`, `test_payload_passed_through`; envelope-validation tests cover the wire-schema gate (`test_missing_envelope_key_raises`, `test_non_object_line_raises`, `test_malformed_json_raises`, `test_empty_producer_id_raises`, `test_ts_iso_without_z_parses`, `test_blank_lines_are_skipped`); `archive_size_bytes` covered by `test_archive_size_bytes_sums_all_files` and `test_archive_size_bytes_missing_root_returns_zero` | Covered | +| AC-2 (`frame_source_replay.replay_video` accepts `.mp4` OR directory; decodes via OpenCV; emits via sink at cadence; raises FileNotFoundError on missing input) | `test_video_missing_path_raises_file_not_found`, `test_video_dir_delegates_to_image_directory`, `test_video_mp4_round_trip`; image-dir mode covered by `test_image_dir_missing_raises_file_not_found`, `test_image_dir_empty_returns_zero`, `test_image_dir_emits_sorted_by_name`, `test_image_dir_non_jpeg_reencoded`, `test_image_dir_skips_non_image_files`, `test_image_dir_non_realtime_does_not_sleep`, `test_image_dir_realtime_sleeps_per_frame` | Covered | +| AC-3 (`imu_replay.ImuReplayer.replay` parses CSV, constructs `ImuSample`s, calls emitter; tolerates scientific-notation floats) | `test_happy_path_emits_all_rows`, `test_scientific_notation_parses`, `test_attitude_radians_converted`, `test_realtime_sleeps_per_sample`, `test_non_realtime_does_not_sleep`, `test_empty_csv_emits_nothing`; error paths: `test_missing_csv_raises_file_not_found`, `test_rate_hz_must_be_positive`, `test_missing_required_columns_raises`, `test_row_with_unparseable_value_raises` | Covered | +| AC-4 (≥5 unit tests per body — happy + 2 error + 1 boundary) | fdr_reader: 14 tests; frame_source_replay: 10 tests; imu_replay: 10 tests | Covered (exceeds floor) | +| AC-5 (full suite passes) | 558 passed (+31 from 527 baseline) | Covered | + +### Phase 3 — Code Quality + +* **Single responsibility**: + * `fdr_reader` owns wire-envelope parsing + projection to the runner-side + dataclass. It does NOT validate per-kind payload keys — that's left to + the consuming evaluators, matching the AZ-272/273 contract that + documents `payload` as opaque on the runner side. + * `frame_source_replay.FrameSourceReplayer` owns frame decode + sink + emission. Cadence pacing is parameterised via `sleep_fn` for testability; + OpenCV is the only third-party import. + * `imu_replay.ImuReplayer` owns CSV parse + emitter invocation. Cadence + pacing same pattern as frame_source_replay. Row parsing is a pure + function `_parse_row` so missing/malformed columns surface as a + `ValueError` with a file + line pointer. +* **No suppressed errors**: + * `fdr_reader._parse_envelope` raises `ValueError` (with file + line) + on every malformed envelope branch; `json.JSONDecodeError` propagates + naturally for unrecoverable JSON corruption. + * `frame_source_replay._decode_and_emit_video` raises explicit + `ValueError` if OpenCV cannot open the file or fails to encode a + frame — never silently skips a frame. + * `imu_replay._parse_row` re-raises `KeyError`/`ValueError` wrapped + as a contextual `ValueError`; the original `__cause__` is preserved + via `raise ... from exc`. + * No `except Exception: pass`, `2>/dev/null`, or empty `except` blocks. +* **AAA comment discipline**: every new test uses `# Arrange / # Act / + # Assert`; sections omitted when not needed. +* **No code comments narrating what code does** — only the module-level + docstrings explain the wire-vs-runner schema rename, the OpenCV + realtime/non-realtime distinction, and the scientific-notation + tolerance rationale. +* **Public boundary**: confirmed all three modules import only + stdlib (`csv`, `json`, `math`, `time`, `pathlib`, `datetime`) + + `cv2`. No `from gps_denied_onboard ...` anywhere. + +### Phase 4 — Security + +* **No new credentials, secrets, or network surface**. All three + helpers are deterministic file-I/O over caller-supplied paths. +* **Wire-schema gate** in `fdr_reader._parse_envelope` is the safety + invariant — if the SUT FDR schema drifts, the runner blows up at + parse time with a file+line pointer rather than silently producing + records with default-zero fields. This is the explicit + drift-detection rationale documented in the `FdrRecord` docstring. +* **No `eval`, `exec`, `pickle`, or `subprocess`** in any of the three + modules. + +### Phase 5 — Performance + +* All three implementations are O(N) over their input streams + (JSONL lines, frames, CSV rows). Sorting in `iter_records` is the + one O(N log N) step — necessary for the multi-file merge-sort + guarantee, and acceptable since the typical archive is bounded by + the AZ-441 50 GB / 8 h budget. +* No file I/O at module-import time. +* `FrameSourceReplayer._decode_and_emit_video` always re-encodes + frames to JPEG (lossy double-encode if source was MP4). The + intent is so the sink always receives JPEG bytes regardless of + source format — matches the SUT's `ONBOARD_FRAME_SOURCE_PATH` + expectation of file-system-readable JPEG frames. + +### Phase 6 — Cross-Task Consistency + +* **Sleep injection pattern**: `frame_source_replay.FrameSourceReplayer` + and `imu_replay.ImuReplayer` both accept `sleep_fn` as a keyword + argument defaulting to `time.sleep`. Tests pass `lambda _: None` + or a recording stub. Single pattern across the two replay surfaces. +* **`realtime` vs `non-realtime` flag**: both replayers default to + `realtime=True` and skip the sleep when `False`. Consistent semantics + so that future scenarios can choose between wall-clock replay + (for live-FC tests) and fast replay (for FDR/evidence-only tests). +* **FileNotFoundError**: all three new bodies surface missing input + paths via `FileNotFoundError`, consistent with how the existing + helpers `accuracy_evaluator`, `multi_segment_evaluator`, + `mavproxy_tlog_reader`, `cold_start_evaluator` handle missing + inputs. +* **Scenario probe interaction**: the existing + `_harness_helpers_implemented` probes in batches 71-73 (FT-P-07, + FT-P-08, FT-P-10, FT-P-11, FT-N-01, FT-N-02, FT-N-03, FT-N-04) + pass `/tmp/non-existent` to each helper. Previously the inner + `except NotImplementedError: return False` fired; now the outer + `except Exception: return False` catches the new `FileNotFoundError` + and still returns False. The scenarios continue to skip — which is + correct because the OTHER probe-gated helpers (`sitl_observer`, + `mavproxy_tlog_reader` for some scenarios, `fc_proxy_runtime`) are + still pending. Probe cleanup is explicitly excluded from AZ-594 + scope and will happen once the remaining harness stubs land. + +### Phase 7 — Architecture Compliance + +* **Module placement unchanged**: existing files at + `e2e/runner/helpers/{fdr_reader,frame_source_replay,imu_replay}.py` + were edited in place (bodies replace `NotImplementedError`); no + layout changes. New unit-test files in + `e2e/_unit_tests/helpers/`. Layout invariant test still passes — + these helpers were already listed. +* **No `src/gps_denied_onboard` imports** anywhere. Verified. +* **Existing scenario surface preserved**: `FdrRecord` dataclass + field names and types are unchanged from the AZ-406 contract. + `ImuSample` likewise. `ReplayCadence` and `FrameSink` Protocol + unchanged. Consumers in batches 71-73 stay valid. + +## Test Results + +* New unit tests: 14 (fdr_reader) + 10 (frame_source_replay) + 10 + (imu_replay) = **34 new tests**. +* Full `e2e/_unit_tests` suite: **558 passed in 139 s** (previous + cumulative: 527 → +31 net). +* No new linter errors (`ReadLints` on all six new/modified files + reported clean). diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 76ad66f..421f543 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -12,9 +12,9 @@ sub_step: retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 73 +last_completed_batch: 74 last_cumulative_review: batches_70-72 -current_batch: 74 +current_batch: 75 current_batch_tasks: "" last_step_outcomes: step_8: "Code is testable — no changes needed (testability_assessment.md committed; no list-of-changes, no source edits)" diff --git a/e2e/_unit_tests/helpers/test_fdr_reader.py b/e2e/_unit_tests/helpers/test_fdr_reader.py index bf27e64..1e02bc4 100644 --- a/e2e/_unit_tests/helpers/test_fdr_reader.py +++ b/e2e/_unit_tests/helpers/test_fdr_reader.py @@ -1,37 +1,214 @@ -"""Unit tests for `runner.helpers.fdr_reader.archive_size_bytes`. - -The full `iter_records` parser is owned by AZ-441; AZ-406 only commits to -the directory-size helper. -""" +"""Unit tests for `e2e/runner/helpers/fdr_reader.py` (AZ-594 AC-1).""" from __future__ import annotations +import json from pathlib import Path import pytest -from runner.helpers.fdr_reader import archive_size_bytes +from e2e.runner.helpers.fdr_reader import FdrRecord, archive_size_bytes, iter_records -def test_archive_size_zero_for_missing_root(tmp_path: Path) -> None: - assert archive_size_bytes(tmp_path / "does-not-exist") == 0 +def _write_jsonl(path: Path, records: list[dict]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w") as fh: + for r in records: + fh.write(json.dumps(r) + "\n") -def test_archive_size_sums_nested_files(tmp_path: Path) -> None: - # Arrange - (tmp_path / "a").mkdir() - (tmp_path / "a" / "b.bin").write_bytes(b"x" * 100) - (tmp_path / "a" / "c.bin").write_bytes(b"y" * 50) - (tmp_path / "top.bin").write_bytes(b"z" * 200) - # Act - size = archive_size_bytes(tmp_path) +def _env(ts: str, *, kind: str = "vio.tick", producer_id: str = "c1.vio", payload: dict | None = None) -> dict: + return { + "schema_version": 1, + "ts": ts, + "producer_id": producer_id, + "kind": kind, + "payload": payload if payload is not None else {"frame_id": "f0"}, + } + + +def test_missing_root_raises_file_not_found(tmp_path: Path): # Assert - assert size == 350 + with pytest.raises(FileNotFoundError, match="FDR archive root not found"): + list(iter_records(tmp_path / "nope")) -def test_iter_records_raises_until_az441_lands() -> None: - """Until AZ-441 fills the parser in, callers must see a clear error.""" - from runner.helpers.fdr_reader import iter_records +def test_empty_root_yields_nothing(tmp_path: Path): + # Arrange + (tmp_path / "fdr").mkdir() - with pytest.raises(NotImplementedError, match="AZ-441"): - next(iter_records(Path("/tmp/nonexistent"))) + # Act + records = list(iter_records(tmp_path / "fdr")) + + # Assert + assert records == [] + + +def test_single_file_round_trip(tmp_path: Path): + # Arrange + root = tmp_path / "fdr" + _write_jsonl( + root / "segment_001.jsonl", + [_env("2026-05-17T08:00:00.100Z"), _env("2026-05-17T08:00:00.200Z")], + ) + + # Act + records = list(iter_records(root)) + + # Assert + assert len(records) == 2 + assert all(isinstance(r, FdrRecord) for r in records) + assert records[0].record_type == "vio.tick" + assert records[0].producer_id == "c1.vio" + assert records[1].monotonic_ms - records[0].monotonic_ms == 100 + + +def test_multiple_files_are_merged_and_sorted(tmp_path: Path): + # Arrange — file B has older records than file A. + root = tmp_path / "fdr" + _write_jsonl( + root / "b_segment.jsonl", + [_env("2026-05-17T08:00:01.000Z")], + ) + _write_jsonl( + root / "a_segment.jsonl", + [_env("2026-05-17T08:00:00.500Z")], + ) + + # Act + records = list(iter_records(root)) + + # Assert — global oldest-first regardless of filename order. + assert len(records) == 2 + assert records[0].monotonic_ms < records[1].monotonic_ms + + +def test_blank_lines_are_skipped(tmp_path: Path): + # Arrange + root = tmp_path / "fdr" + root.mkdir() + with (root / "segment_001.jsonl").open("w") as fh: + fh.write(json.dumps(_env("2026-05-17T08:00:00.100Z")) + "\n") + fh.write("\n") + fh.write(" \n") + fh.write(json.dumps(_env("2026-05-17T08:00:00.200Z")) + "\n") + + # Act + records = list(iter_records(root)) + + # Assert + assert len(records) == 2 + + +def test_missing_envelope_key_raises(tmp_path: Path): + # Arrange — missing `kind`. + root = tmp_path / "fdr" + bad = { + "schema_version": 1, + "ts": "2026-05-17T08:00:00.100Z", + "producer_id": "c1.vio", + "payload": {}, + } + _write_jsonl(root / "bad.jsonl", [bad]) + + # Act / Assert + with pytest.raises(ValueError, match="missing required keys \\['kind'\\]"): + list(iter_records(root)) + + +def test_non_object_line_raises(tmp_path: Path): + # Arrange — array at top level. + root = tmp_path / "fdr" + root.mkdir() + with (root / "bad.jsonl").open("w") as fh: + fh.write("[1, 2, 3]\n") + + # Act / Assert + with pytest.raises(ValueError, match="not a JSON object"): + list(iter_records(root)) + + +def test_malformed_json_raises(tmp_path: Path): + # Arrange + root = tmp_path / "fdr" + root.mkdir() + with (root / "bad.jsonl").open("w") as fh: + fh.write("{not-json\n") + + # Act / Assert + with pytest.raises(json.JSONDecodeError): + list(iter_records(root)) + + +def test_empty_producer_id_raises(tmp_path: Path): + # Arrange + root = tmp_path / "fdr" + bad = _env("2026-05-17T08:00:00.100Z", producer_id="") + _write_jsonl(root / "bad.jsonl", [bad]) + + # Act / Assert + with pytest.raises(ValueError, match="producer_id` must be a non-empty"): + list(iter_records(root)) + + +def test_ts_iso_without_z_parses(tmp_path: Path): + # Arrange — already in +00:00 form. + root = tmp_path / "fdr" + _write_jsonl(root / "a.jsonl", [_env("2026-05-17T08:00:00.250+00:00")]) + + # Act + records = list(iter_records(root)) + + # Assert + assert len(records) == 1 + + +def test_payload_passed_through(tmp_path: Path): + # Arrange + root = tmp_path / "fdr" + payload = {"frame_idx": 42, "lat_deg": 50.0, "lon_deg": 30.0, "cov_semi_major_m": 5.5} + _write_jsonl( + root / "a.jsonl", + [_env("2026-05-17T08:00:00.100Z", kind="outbound_estimate", payload=payload)], + ) + + # Act + [record] = list(iter_records(root)) + + # Assert + assert record.payload == payload + assert record.record_type == "outbound_estimate" + + +def test_archive_size_bytes_sums_all_files(tmp_path: Path): + # Arrange + root = tmp_path / "fdr" + _write_jsonl(root / "a.jsonl", [_env("2026-05-17T08:00:00.100Z")]) + _write_jsonl(root / "sub/b.jsonl", [_env("2026-05-17T08:00:00.200Z")]) + + # Act + total = archive_size_bytes(root) + + # Assert + assert total > 0 + a_size = (root / "a.jsonl").stat().st_size + b_size = (root / "sub/b.jsonl").stat().st_size + assert total == a_size + b_size + + +def test_archive_size_bytes_missing_root_returns_zero(tmp_path: Path): + # Assert + assert archive_size_bytes(tmp_path / "nope") == 0 + + +def test_subdirectory_files_included(tmp_path: Path): + # Arrange + root = tmp_path / "fdr" + _write_jsonl(root / "seg1.jsonl", [_env("2026-05-17T08:00:00.100Z")]) + _write_jsonl(root / "sub/seg2.jsonl", [_env("2026-05-17T08:00:00.200Z")]) + + # Act + records = list(iter_records(root)) + + # Assert + assert len(records) == 2 diff --git a/e2e/_unit_tests/helpers/test_frame_source_replay.py b/e2e/_unit_tests/helpers/test_frame_source_replay.py new file mode 100644 index 0000000..90fb4d6 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_frame_source_replay.py @@ -0,0 +1,216 @@ +"""Unit tests for `e2e/runner/helpers/frame_source_replay.py` (AZ-594 AC-2).""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path + +import cv2 +import numpy as np +import pytest + +from e2e.runner.helpers.frame_source_replay import ( + FrameSourceReplayer, + ReplayCadence, +) + + +@dataclass +class _RecordingSink: + """In-memory FrameSink that captures every emission for assertions.""" + + frames: list[tuple[bytes, int]] = field(default_factory=list) + + def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None: + self.frames.append((jpeg_bytes, timestamp_ms)) + + +@dataclass +class _RecordingSleep: + """Captures the durations the replayer was asked to sleep.""" + + sleeps: list[float] = field(default_factory=list) + + def __call__(self, duration_s: float) -> None: + self.sleeps.append(duration_s) + + +def _write_jpg(path: Path, w: int = 64, h: int = 48, fill: int = 128) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + img = np.full((h, w, 3), fill, dtype=np.uint8) + cv2.imwrite(str(path), img) + + +def _write_video(path: Path, n_frames: int, fps: float, w: int = 64, h: int = 48) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + writer = cv2.VideoWriter(str(path), fourcc, fps, (w, h)) + if not writer.isOpened(): + pytest.skip(f"OpenCV cannot write mp4v on this platform: {path}") + try: + for i in range(n_frames): + img = np.full((h, w, 3), (i * 5) % 255, dtype=np.uint8) + writer.write(img) + finally: + writer.release() + + +# replay_image_directory + + +def test_image_dir_missing_raises_file_not_found(tmp_path: Path): + # Arrange + sink = _RecordingSink() + replayer = FrameSourceReplayer(sink, sleep_fn=lambda _: None) + + # Assert + with pytest.raises(FileNotFoundError, match="frame directory not found"): + replayer.replay_image_directory(tmp_path / "nope") + + +def test_image_dir_empty_returns_zero(tmp_path: Path): + # Arrange + (tmp_path / "frames").mkdir() + sink = _RecordingSink() + + # Act + emitted = FrameSourceReplayer(sink, sleep_fn=lambda _: None).replay_image_directory( + tmp_path / "frames" + ) + + # Assert + assert emitted == 0 + assert sink.frames == [] + + +def test_image_dir_emits_sorted_by_name(tmp_path: Path): + # Arrange — files written out of order; sort must restore numeric order. + frames_dir = tmp_path / "frames" + _write_jpg(frames_dir / "AD000003.jpg", fill=30) + _write_jpg(frames_dir / "AD000001.jpg", fill=10) + _write_jpg(frames_dir / "AD000002.jpg", fill=20) + sink = _RecordingSink() + + # Act + emitted = FrameSourceReplayer( + sink, + cadence=ReplayCadence(fps=10.0, realtime=False), + sleep_fn=lambda _: None, + ).replay_image_directory(frames_dir) + + # Assert — three frames at 0/100/200 ms. + assert emitted == 3 + assert [ts for _, ts in sink.frames] == [0, 100, 200] + assert all(b.startswith(b"\xff\xd8") for b, _ in sink.frames) # JPEG SOI + + +def test_image_dir_non_jpeg_reencoded(tmp_path: Path): + # Arrange + frames_dir = tmp_path / "frames" + png_path = frames_dir / "AD000001.png" + frames_dir.mkdir() + img = np.full((48, 64, 3), 100, dtype=np.uint8) + cv2.imwrite(str(png_path), img) + sink = _RecordingSink() + + # Act + emitted = FrameSourceReplayer(sink, sleep_fn=lambda _: None).replay_image_directory(frames_dir) + + # Assert + assert emitted == 1 + assert sink.frames[0][0].startswith(b"\xff\xd8") # JPEG, not PNG + + +def test_image_dir_skips_non_image_files(tmp_path: Path): + # Arrange + frames_dir = tmp_path / "frames" + _write_jpg(frames_dir / "AD000001.jpg") + (frames_dir / "README.txt").write_text("not an image") + (frames_dir / "manifest.csv").write_text("col1,col2\n1,2\n") + sink = _RecordingSink() + + # Act + emitted = FrameSourceReplayer(sink, sleep_fn=lambda _: None).replay_image_directory(frames_dir) + + # Assert + assert emitted == 1 + + +def test_image_dir_non_realtime_does_not_sleep(tmp_path: Path): + # Arrange + frames_dir = tmp_path / "frames" + for i in range(3): + _write_jpg(frames_dir / f"AD{i:06d}.jpg") + sink = _RecordingSink() + sleep = _RecordingSleep() + + # Act + FrameSourceReplayer( + sink, cadence=ReplayCadence(fps=10.0, realtime=False), sleep_fn=sleep + ).replay_image_directory(frames_dir) + + # Assert + assert sleep.sleeps == [] + + +def test_image_dir_realtime_sleeps_per_frame(tmp_path: Path): + # Arrange + frames_dir = tmp_path / "frames" + for i in range(3): + _write_jpg(frames_dir / f"AD{i:06d}.jpg") + sink = _RecordingSink() + sleep = _RecordingSleep() + + # Act + FrameSourceReplayer( + sink, cadence=ReplayCadence(fps=10.0, realtime=True), sleep_fn=sleep + ).replay_image_directory(frames_dir) + + # Assert — sleeps once per emitted frame at 0.1 s. + assert sleep.sleeps == pytest.approx([0.1, 0.1, 0.1]) + + +# replay_video + + +def test_video_missing_path_raises_file_not_found(tmp_path: Path): + # Arrange + sink = _RecordingSink() + replayer = FrameSourceReplayer(sink, sleep_fn=lambda _: None) + + # Assert + with pytest.raises(FileNotFoundError, match="video path not found"): + replayer.replay_video(tmp_path / "nope.mp4") + + +def test_video_dir_delegates_to_image_directory(tmp_path: Path): + # Arrange + frames_dir = tmp_path / "frames" + for i in range(2): + _write_jpg(frames_dir / f"AD{i:06d}.jpg") + sink = _RecordingSink() + + # Act + emitted = FrameSourceReplayer( + sink, cadence=ReplayCadence(fps=10.0, realtime=False), sleep_fn=lambda _: None + ).replay_video(frames_dir) + + # Assert + assert emitted == 2 + + +def test_video_mp4_round_trip(tmp_path: Path): + # Arrange — write a 5-frame 10 FPS MP4 then replay it. + video_path = tmp_path / "tiny.mp4" + _write_video(video_path, n_frames=5, fps=10.0) + sink = _RecordingSink() + + # Act + emitted = FrameSourceReplayer( + sink, cadence=ReplayCadence(fps=10.0, realtime=False), sleep_fn=lambda _: None + ).replay_video(video_path) + + # Assert + assert emitted == 5 + assert [ts for _, ts in sink.frames] == [0, 100, 200, 300, 400] + assert all(b.startswith(b"\xff\xd8") for b, _ in sink.frames) diff --git a/e2e/_unit_tests/helpers/test_imu_replay.py b/e2e/_unit_tests/helpers/test_imu_replay.py new file mode 100644 index 0000000..f1bd50d --- /dev/null +++ b/e2e/_unit_tests/helpers/test_imu_replay.py @@ -0,0 +1,192 @@ +"""Unit tests for `e2e/runner/helpers/imu_replay.py` (AZ-594 AC-3).""" + +from __future__ import annotations + +import math +from dataclasses import dataclass, field +from pathlib import Path + +import pytest + +from e2e.runner.helpers.imu_replay import FcInboundEmitter, ImuReplayer, ImuSample + + +@dataclass +class _RecordingEmitter: + """In-memory FcInboundEmitter that captures every sample.""" + + samples: list[ImuSample] = field(default_factory=list) + + def emit(self, sample: ImuSample) -> None: + self.samples.append(sample) + + +@dataclass +class _RecordingSleep: + sleeps: list[float] = field(default_factory=list) + + def __call__(self, duration_s: float) -> None: + self.sleeps.append(duration_s) + + +_HEADER = "timestamp_ms,ax,ay,az,gx,gy,gz,roll_deg,pitch_deg,yaw_deg,baro_m\n" + + +def _write_csv(path: Path, rows: list[str], *, header: str = _HEADER) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w") as fh: + fh.write(header) + for r in rows: + fh.write(r + "\n") + + +def test_missing_csv_raises_file_not_found(tmp_path: Path): + # Arrange + emitter = _RecordingEmitter() + replayer = ImuReplayer(emitter, realtime=False) + + # Assert + with pytest.raises(FileNotFoundError, match="IMU CSV not found"): + replayer.replay(tmp_path / "nope.csv") + + +def test_rate_hz_must_be_positive(): + # Assert + with pytest.raises(ValueError, match="rate_hz must be positive"): + ImuReplayer(_RecordingEmitter(), rate_hz=0) + + +def test_missing_required_columns_raises(tmp_path: Path): + # Arrange — missing `baro_m`. + p = tmp_path / "imu.csv" + _write_csv( + p, + ["100,0,0,9.8,0,0,0,0,0,0"], + header="timestamp_ms,ax,ay,az,gx,gy,gz,roll_deg,pitch_deg,yaw_deg\n", + ) + + # Assert + with pytest.raises(ValueError, match="missing required columns"): + ImuReplayer(_RecordingEmitter(), realtime=False).replay(p) + + +def test_row_with_unparseable_value_raises(tmp_path: Path): + # Arrange + p = tmp_path / "imu.csv" + _write_csv(p, ["100,0,0,not-a-float,0,0,0,0,0,0,300"]) + + # Assert + with pytest.raises(ValueError, match="IMU CSV row malformed"): + ImuReplayer(_RecordingEmitter(), realtime=False).replay(p) + + +def test_happy_path_emits_all_rows(tmp_path: Path): + # Arrange + p = tmp_path / "imu.csv" + _write_csv( + p, + [ + "100,0.0,0.0,9.8,0.0,0.0,0.0,0.0,0.0,0.0,300.0", + "200,0.1,0.0,9.7,0.0,0.0,0.0,0.0,0.0,0.0,300.5", + "300,0.0,0.2,9.6,0.0,0.0,0.0,0.0,0.0,0.0,301.0", + ], + ) + emitter = _RecordingEmitter() + + # Act + emitted = ImuReplayer(emitter, realtime=False).replay(p) + + # Assert + assert emitted == 3 + assert len(emitter.samples) == 3 + assert emitter.samples[0].timestamp_ms == 100 + assert emitter.samples[1].accel_mss == (0.1, 0.0, 9.7) + + +def test_scientific_notation_parses(tmp_path: Path): + # Arrange — AZ-408 fixture style float fields. + p = tmp_path / "imu.csv" + _write_csv( + p, + [ + "100,-4.44E-16,1.23e-3,9.81,-1e-5,2e-7,0.0,1.0,2.0,3.0,300.0", + ], + ) + emitter = _RecordingEmitter() + + # Act + ImuReplayer(emitter, realtime=False).replay(p) + + # Assert + s = emitter.samples[0] + assert s.accel_mss[0] == pytest.approx(-4.44e-16) + assert s.accel_mss[1] == pytest.approx(1.23e-3) + assert s.accel_mss[2] == pytest.approx(9.81) + assert s.gyro_rps == (-1e-5, 2e-7, 0.0) + + +def test_attitude_radians_converted(tmp_path: Path): + # Arrange + p = tmp_path / "imu.csv" + _write_csv( + p, + ["100,0,0,9.8,0,0,0,90,180,270,300"], + ) + emitter = _RecordingEmitter() + + # Act + ImuReplayer(emitter, realtime=False).replay(p) + + # Assert + roll, pitch, yaw = emitter.samples[0].attitude_rad + assert roll == pytest.approx(math.pi / 2) + assert pitch == pytest.approx(math.pi) + assert yaw == pytest.approx(3 * math.pi / 2) + + +def test_realtime_sleeps_per_sample(tmp_path: Path): + # Arrange — 3 rows at 10 Hz → 3 sleeps of 0.1 s. + p = tmp_path / "imu.csv" + _write_csv( + p, + [ + "100,0,0,9.8,0,0,0,0,0,0,300", + "200,0,0,9.8,0,0,0,0,0,0,300", + "300,0,0,9.8,0,0,0,0,0,0,300", + ], + ) + emitter = _RecordingEmitter() + sleep = _RecordingSleep() + + # Act + ImuReplayer(emitter, rate_hz=10.0, realtime=True, sleep_fn=sleep).replay(p) + + # Assert + assert sleep.sleeps == pytest.approx([0.1, 0.1, 0.1]) + + +def test_non_realtime_does_not_sleep(tmp_path: Path): + # Arrange + p = tmp_path / "imu.csv" + _write_csv(p, ["100,0,0,9.8,0,0,0,0,0,0,300"]) + sleep = _RecordingSleep() + + # Act + ImuReplayer(_RecordingEmitter(), realtime=False, sleep_fn=sleep).replay(p) + + # Assert + assert sleep.sleeps == [] + + +def test_empty_csv_emits_nothing(tmp_path: Path): + # Arrange — header only. + p = tmp_path / "imu.csv" + _write_csv(p, []) + emitter = _RecordingEmitter() + + # Act + emitted = ImuReplayer(emitter, realtime=False).replay(p) + + # Assert + assert emitted == 0 + assert emitter.samples == [] diff --git a/e2e/runner/helpers/fdr_reader.py b/e2e/runner/helpers/fdr_reader.py index f24d2d8..d74c14f 100644 --- a/e2e/runner/helpers/fdr_reader.py +++ b/e2e/runner/helpers/fdr_reader.py @@ -1,22 +1,32 @@ """Post-run filesystem read of the FDR archive. The FDR archive is a line-delimited JSON record stream per AZ-272 / AZ-273. -Each line is an `FdrRecord` envelope (producer_id, type, monotonic_ms, -payload). The runner image must NEVER import the SUT's FdrRecord schema -directly — it parses the JSON bytes and validates against a duplicate -record-type allowlist baked into this module. +Each line is an FDR envelope on the wire schema +``{schema_version, ts, producer_id, kind, payload, extra?}``. This module +parses the JSON bytes and validates the wire envelope structurally — the +runner image NEVER imports the SUT's FdrRecord schema directly so a +breaking SUT change surfaces as a parse failure here (visible drift) +rather than silently following along. -Public surface only; concrete parser + assertion helpers are owned by -AZ-441 (NFT-LIM-02 — FDR size budget) and the resilience scenario tasks -that need to crawl the archive (AZ-432, AZ-433, AZ-435). +The runner-side `FdrRecord` dataclass renames `kind` → `record_type` and +projects `ts` (ISO 8601 wall-clock) onto an integer `monotonic_ms` field +for downstream evaluators that work in milliseconds. Within one flight, +ISO 8601 ms-since-epoch is monotonic at the millisecond resolution the +evaluators care about (NFR-RES NTP drift is excluded by AC-7 of the FDR +contract: the on-board clock is monotonic over the lifetime of one +flight session). """ from __future__ import annotations +import json from dataclasses import dataclass +from datetime import datetime from pathlib import Path from typing import Iterator +_WIRE_REQUIRED_KEYS = ("schema_version", "ts", "producer_id", "kind", "payload") + @dataclass(frozen=True) class FdrRecord: @@ -33,15 +43,83 @@ class FdrRecord: payload: dict[str, object] +def _ts_to_monotonic_ms(ts: str) -> int: + """Project ISO 8601 ``ts`` onto an int millisecond value. + + Accepts trailing ``Z`` (UTC) which ``datetime.fromisoformat`` did not + accept until 3.11; we normalise to ``+00:00`` first. + """ + normalised = ts[:-1] + "+00:00" if ts.endswith("Z") else ts + dt = datetime.fromisoformat(normalised) + return int(dt.timestamp() * 1000) + + +def _parse_envelope(line_bytes: bytes, source: Path, line_no: int) -> FdrRecord: + """Decode one JSONL line into a typed envelope. + + Wire-side keys are validated structurally; downstream payload keys are + NOT validated here (the consuming evaluator owns its own payload contract). + """ + decoded = json.loads(line_bytes) + if not isinstance(decoded, dict): + raise ValueError( + f"FDR line is not a JSON object: {source}:{line_no}: type={type(decoded).__name__}" + ) + missing = [k for k in _WIRE_REQUIRED_KEYS if k not in decoded] + if missing: + raise ValueError( + f"FDR wire envelope missing required keys {missing} at {source}:{line_no}" + ) + ts = decoded["ts"] + if not isinstance(ts, str) or not ts: + raise ValueError(f"FDR envelope `ts` must be a non-empty ISO 8601 string at {source}:{line_no}") + producer_id = decoded["producer_id"] + if not isinstance(producer_id, str) or not producer_id: + raise ValueError( + f"FDR envelope `producer_id` must be a non-empty string at {source}:{line_no}" + ) + kind = decoded["kind"] + if not isinstance(kind, str) or not kind: + raise ValueError(f"FDR envelope `kind` must be a non-empty string at {source}:{line_no}") + payload = decoded["payload"] + if not isinstance(payload, dict): + raise ValueError(f"FDR envelope `payload` must be an object at {source}:{line_no}") + return FdrRecord( + producer_id=producer_id, + monotonic_ms=_ts_to_monotonic_ms(ts), + record_type=kind, + payload=payload, + ) + + def iter_records(fdr_archive_root: Path) -> Iterator[FdrRecord]: """Iterate every FDR record in the archive root (ordered by monotonic_ms). - Raises NotImplementedError until AZ-441 supplies the orjson-backed parser. + Walks every ``*.jsonl`` file under ``fdr_archive_root`` (recursive), + parses each line as a wire envelope, and yields the runner-side + ``FdrRecord`` projection. Records are emitted oldest-first across the + union of all files. + + Raises ``FileNotFoundError`` if the archive root does not exist. + Raises ``ValueError`` (with a file + line pointer) on malformed JSON, + a wrong-shape envelope, or an unparseable ``ts``. """ - raise NotImplementedError( - "fdr_reader.iter_records is owned by AZ-441 — AZ-406 supplies only " - "the public surface." - ) + if not fdr_archive_root.exists(): + raise FileNotFoundError( + f"FDR archive root not found: {fdr_archive_root}" + ) + records: list[FdrRecord] = [] + for jsonl_path in sorted(fdr_archive_root.rglob("*.jsonl")): + if not jsonl_path.is_file(): + continue + with jsonl_path.open("rb") as fh: + for line_no, raw in enumerate(fh, start=1): + stripped = raw.strip() + if not stripped: + continue + records.append(_parse_envelope(stripped, jsonl_path, line_no)) + records.sort(key=lambda r: r.monotonic_ms) + yield from records def archive_size_bytes(fdr_archive_root: Path) -> int: diff --git a/e2e/runner/helpers/frame_source_replay.py b/e2e/runner/helpers/frame_source_replay.py index 16bb8d6..46ad96a 100644 --- a/e2e/runner/helpers/frame_source_replay.py +++ b/e2e/runner/helpers/frame_source_replay.py @@ -6,25 +6,30 @@ Two replay modes: SUT polls. 2. Video replay (FT-P-02, FT-P-04, FT-N-01..04, NFT-PERF-*) — decode an MP4 with OpenCV and emit frames at the encoded FPS (or a user-supplied - rate for fast-forward). + rate for fast-forward). ``replay_video`` also accepts a directory of + extracted frames (`AD000001.jpg`-style) so the AZ-408 injectors that + emit frame directories rather than MP4s can use the same surface. The actual frame-source path inside the SUT container is configured via the ``ONBOARD_FRAME_SOURCE_PATH`` environment variable on the SUT — the runner writes to a shared tmpfs volume mounted at the same path inside both containers. - -This file currently provides the public surface used by per-scenario tests; -concrete implementations land alongside their consuming test tasks -(AZ-407 onward). The intent is that `FrameSourceReplayer` is a stable API -the test specs can rely on while the underlying replay strategy is filled -in incrementally. """ from __future__ import annotations +import time from dataclasses import dataclass from pathlib import Path -from typing import Protocol +from typing import Callable, Protocol + +import cv2 + +# Image extensions handled by ``replay_image_directory`` / ``replay_video`` +# when given a directory rather than a file. Sorted-by-name ordering implies +# zero-padded filenames (``AD000001.jpg``) which the AZ-407 / AZ-408 +# fixture builders already produce. +_IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".bmp") @dataclass(frozen=True) @@ -43,35 +48,109 @@ class FrameSink(Protocol): class FrameSourceReplayer: - """Public surface for replaying frames into the SUT's frame-source path. + """Public surface for replaying frames into the SUT's frame-source path.""" - AZ-407 (Static fixture builders) supplies the concrete still-image replay - implementation; AZ-408 (Runtime synthetic-injection) supplies the video - + injector variants. AZ-406 only commits to the contract. - """ - - def __init__(self, sink: FrameSink, cadence: ReplayCadence | None = None) -> None: + def __init__( + self, + sink: FrameSink, + cadence: ReplayCadence | None = None, + *, + sleep_fn: Callable[[float], None] = time.sleep, + ) -> None: self._sink = sink self._cadence = cadence or ReplayCadence() + # Injected so unit tests can drop wall-clock pacing entirely. + self._sleep = sleep_fn def replay_image_directory(self, directory: Path) -> int: - """Replay every image in ``directory`` (sorted by name). Returns count emitted. + """Replay every image in ``directory`` (sorted by name) at the configured + cadence. Returns the number of frames emitted to the sink. - Raises NotImplementedError until AZ-407 lands. Tests that need this - path should mark themselves @pytest.mark.skip(reason="awaiting AZ-407") - until then; AC-1 (smoke) does not depend on this surface. + Raises ``FileNotFoundError`` if ``directory`` does not exist or is not + a directory. The sink is invoked with raw JPEG-encoded bytes and a + monotonic-ms timestamp that starts at 0 and advances by the period + derived from ``self._cadence.fps``. """ - raise NotImplementedError( - "FrameSourceReplayer.replay_image_directory is owned by AZ-407 — " - "AZ-406 supplies only the public surface." + if not directory.exists() or not directory.is_dir(): + raise FileNotFoundError( + f"frame directory not found: {directory}" + ) + files = sorted( + p for p in directory.iterdir() + if p.is_file() and p.suffix.lower() in _IMAGE_EXTENSIONS ) + return self._emit_files(files) def replay_video(self, video_path: Path) -> int: - """Replay an MP4 / .h264 file frame-by-frame. Returns count emitted. + """Replay frames from ``video_path``. Returns count emitted. - Raises NotImplementedError until AZ-408 lands. + Auto-detects: + * a regular file (``.mp4`` / ``.avi`` / ``.h264``) — decoded with + OpenCV ``VideoCapture`` and re-encoded as JPEG before emission. + * a directory — delegates to ``replay_image_directory`` so the + AZ-408 injectors that emit frame directories can use this entry + point without the caller knowing the difference. + + Raises ``FileNotFoundError`` on a missing path. """ - raise NotImplementedError( - "FrameSourceReplayer.replay_video is owned by AZ-408 — " - "AZ-406 supplies only the public surface." - ) + if not video_path.exists(): + raise FileNotFoundError(f"video path not found: {video_path}") + if video_path.is_dir(): + return self.replay_image_directory(video_path) + return self._decode_and_emit_video(video_path) + + def _decode_and_emit_video(self, video_path: Path) -> int: + cap = cv2.VideoCapture(str(video_path)) + if not cap.isOpened(): + raise ValueError(f"OpenCV failed to open video: {video_path}") + try: + encoded_fps = cap.get(cv2.CAP_PROP_FPS) or 0.0 + fps = encoded_fps if (self._cadence.realtime and encoded_fps > 0) else self._cadence.fps + period_ms = self._period_ms(fps) + emitted = 0 + t_ms = 0 + while True: + ok, frame = cap.read() + if not ok: + break + success, encoded = cv2.imencode(".jpg", frame) + if not success: + raise ValueError( + f"OpenCV failed to JPEG-encode frame {emitted} of {video_path}" + ) + self._sink.write_frame(encoded.tobytes(), t_ms) + emitted += 1 + t_ms += period_ms + if self._cadence.realtime and period_ms > 0: + self._sleep(period_ms / 1000.0) + return emitted + finally: + cap.release() + + def _emit_files(self, files: list[Path]) -> int: + period_ms = self._period_ms(self._cadence.fps) + emitted = 0 + t_ms = 0 + for path in files: + jpeg_bytes = path.read_bytes() + if path.suffix.lower() != ".jpg" and path.suffix.lower() != ".jpeg": + # Re-encode non-JPEG sources so the sink always gets JPEG bytes. + img = cv2.imread(str(path), cv2.IMREAD_UNCHANGED) + if img is None: + raise ValueError(f"OpenCV failed to read image: {path}") + success, encoded = cv2.imencode(".jpg", img) + if not success: + raise ValueError(f"OpenCV failed to JPEG-encode image: {path}") + jpeg_bytes = encoded.tobytes() + self._sink.write_frame(jpeg_bytes, t_ms) + emitted += 1 + t_ms += period_ms + if self._cadence.realtime and period_ms > 0: + self._sleep(period_ms / 1000.0) + return emitted + + @staticmethod + def _period_ms(fps: float) -> int: + if fps <= 0: + return 0 + return int(round(1000.0 / fps)) diff --git a/e2e/runner/helpers/imu_replay.py b/e2e/runner/helpers/imu_replay.py index b8d92ca..1a09604 100644 --- a/e2e/runner/helpers/imu_replay.py +++ b/e2e/runner/helpers/imu_replay.py @@ -3,17 +3,27 @@ CSV schema (from `_docs/00_problem/input_data/flight_derkachi/data_imu.csv`): timestamp_ms,ax,ay,az,gx,gy,gz,roll_deg,pitch_deg,yaw_deg,baro_m -Owned by AZ-406 (public surface) + AZ-407 (concrete file-driver -implementation). This module commits to the type signatures the -per-scenario tests will import; the actual MAVLink / MSP2 emission is -wired up by the downstream task. +Numeric fields are accepted in any float-parseable form, including +scientific notation (``-4.44E-16``) — the AZ-408 source CSV uses that +form for near-zero values. """ from __future__ import annotations +import csv +import math +import time from dataclasses import dataclass from pathlib import Path -from typing import Protocol +from typing import Callable, Protocol + +_REQUIRED_COLUMNS = ( + "timestamp_ms", + "ax", "ay", "az", + "gx", "gy", "gz", + "roll_deg", "pitch_deg", "yaw_deg", + "baro_m", +) @dataclass(frozen=True) @@ -34,20 +44,67 @@ class FcInboundEmitter(Protocol): ... +def _parse_row(row: dict[str, str], source: Path, line_no: int) -> ImuSample: + try: + return ImuSample( + timestamp_ms=int(round(float(row["timestamp_ms"]))), + accel_mss=(float(row["ax"]), float(row["ay"]), float(row["az"])), + gyro_rps=(float(row["gx"]), float(row["gy"]), float(row["gz"])), + attitude_rad=( + math.radians(float(row["roll_deg"])), + math.radians(float(row["pitch_deg"])), + math.radians(float(row["yaw_deg"])), + ), + baro_alt_m=float(row["baro_m"]), + ) + except (KeyError, ValueError) as exc: + raise ValueError( + f"IMU CSV row malformed at {source}:{line_no}: {exc}" + ) from exc + + class ImuReplayer: """Drives an `FcInboundEmitter` from a CSV file at the recorded cadence.""" - def __init__(self, emitter: FcInboundEmitter, rate_hz: float = 10.0) -> None: + def __init__( + self, + emitter: FcInboundEmitter, + rate_hz: float = 10.0, + *, + sleep_fn: Callable[[float], None] = time.sleep, + realtime: bool = True, + ) -> None: + if rate_hz <= 0: + raise ValueError(f"rate_hz must be positive; got {rate_hz}") self._emitter = emitter self._rate_hz = rate_hz + self._sleep = sleep_fn + self._realtime = realtime def replay(self, csv_path: Path) -> int: """Replay the CSV file. Returns the number of samples emitted. - Concrete implementation is owned by AZ-407 (FT-P-02 derkachi-drift - + FT-P-04 frame-to-frame registration are the first consumers). + Raises ``FileNotFoundError`` on missing CSV. Raises ``ValueError`` + on missing columns or a row that does not parse. When ``realtime`` + is True (default), sleeps ``1 / rate_hz`` seconds between + emissions; tests should pass ``realtime=False`` or inject a + no-op ``sleep_fn`` to keep the unit suite fast. """ - raise NotImplementedError( - "ImuReplayer.replay is owned by AZ-407 — AZ-406 supplies only " - "the public surface." - ) + if not csv_path.exists(): + raise FileNotFoundError(f"IMU CSV not found: {csv_path}") + emitted = 0 + period_s = 1.0 / self._rate_hz + with csv_path.open() as fh: + reader = csv.DictReader(fh) + missing = [c for c in _REQUIRED_COLUMNS if c not in (reader.fieldnames or [])] + if missing: + raise ValueError( + f"IMU CSV {csv_path} missing required columns: {missing}" + ) + for line_no, row in enumerate(reader, start=2): # +1 for header line + sample = _parse_row(row, csv_path, line_no) + self._emitter.emit(sample) + emitted += 1 + if self._realtime: + self._sleep(period_s) + return emitted