"""Unit tests for `e2e/fixtures/sitl_replay_builder/build_p01_fixtures.py` (AZ-598). All external dependencies (OpenCV, pymavlink, subprocess) are injected via the underscore-prefixed parameters so the suite runs without the production `gps-denied-replay` install OR a working OpenCV/pymavlink build. The actual end-to-end run is a manual operator step (see README). """ from __future__ import annotations import json import subprocess import types from pathlib import Path from typing import Sequence from unittest.mock import MagicMock import pytest import e2e.fixtures.sitl_replay_builder.build_p01_fixtures as bp # encode_stills_to_mp4 def _mk_fake_writer(): w = MagicMock(name="VideoWriter") w.write = MagicMock() w.release = MagicMock() return w def test_encode_stills_to_mp4_empty_paths_raises(tmp_path: Path): # Assert with pytest.raises(FileNotFoundError, match="image_paths is empty"): bp.encode_stills_to_mp4( [], tmp_path / "out.mp4", _video_writer_factory=lambda *a, **kw: _mk_fake_writer(), _imread=lambda p: None, ) def test_encode_stills_to_mp4_writes_each_frame(tmp_path: Path): # Arrange writer = _mk_fake_writer() # Simulate (640, 480, 3) BGR frame via a stand-in object with .shape frame = types.SimpleNamespace(shape=(480, 640, 3)) paths = [tmp_path / f"img-{i}.jpg" for i in range(3)] # Act count = bp.encode_stills_to_mp4( paths, tmp_path / "out.mp4", _video_writer_factory=lambda out, w, h: writer, _imread=lambda p: frame, ) # Assert assert count == 3 assert writer.write.call_count == 3 assert writer.release.call_count == 1 def test_encode_stills_to_mp4_failed_read_raises(tmp_path: Path): # Arrange writer = _mk_fake_writer() frame_ok = types.SimpleNamespace(shape=(480, 640, 3)) seen: list[Path] = [] def imread(path: Path): seen.append(path) return None if str(path).endswith("img-1.jpg") else frame_ok # Assert with pytest.raises(FileNotFoundError, match="failed to read .*img-1.jpg"): bp.encode_stills_to_mp4( [tmp_path / f"img-{i}.jpg" for i in range(3)], tmp_path / "out.mp4", _video_writer_factory=lambda out, w, h: writer, _imread=imread, ) # generate_stationary_tlog def test_generate_stationary_tlog_writes_pairs(tmp_path: Path): # Arrange — fake mavlink writer that records every write() call. writer = MagicMock(name="MavlinkWriter") writer.write = MagicMock() writer.close = MagicMock() # Act pairs = bp.generate_stationary_tlog( tmp_path / "out.tlog", duration_s=2, hz=10, _mavlink_writer_factory=lambda out: writer, ) # Assert — 20 pairs (2s * 10Hz), each pair = 2 messages (RAW_IMU + ATTITUDE) assert pairs == 20 assert writer.write.call_count == 40 assert writer.close.call_count == 1 def test_generate_stationary_tlog_rejects_nonpositive_duration(tmp_path: Path): # Assert with pytest.raises(ValueError, match="duration_s must be positive"): bp.generate_stationary_tlog( tmp_path / "out.tlog", duration_s=0, _mavlink_writer_factory=lambda out: MagicMock(), ) def test_generate_stationary_tlog_rejects_nonpositive_hz(tmp_path: Path): # Assert with pytest.raises(ValueError, match="hz must be positive"): bp.generate_stationary_tlog( tmp_path / "out.tlog", hz=0, _mavlink_writer_factory=lambda out: MagicMock(), ) def test_generate_stationary_tlog_real_pymavlink_round_trip(tmp_path: Path): """Sanity-check the real packers; tlog file is well-formed.""" # Act — use real pymavlink (it's in pyproject.toml deps) pairs = bp.generate_stationary_tlog( tmp_path / "out.tlog", duration_s=1, hz=10, ) # Assert assert pairs == 10 assert (tmp_path / "out.tlog").is_file() assert (tmp_path / "out.tlog").stat().st_size > 0 # run_gps_denied_replay def test_run_gps_denied_replay_builds_correct_cmd(tmp_path: Path): # Arrange captured: list[Sequence[str]] = [] def fake_runner(cmd): captured.append(list(cmd)) return subprocess.CompletedProcess(args=cmd, returncode=0) # Act bp.run_gps_denied_replay( tmp_path / "stills.mp4", tmp_path / "stationary.tlog", tmp_path / "fdr.jsonl", _runner=fake_runner, ) # Assert assert len(captured) == 1 cmd = captured[0] assert cmd[0] == "gps-denied-replay" assert "--video" in cmd and str(tmp_path / "stills.mp4") in cmd assert "--tlog" in cmd and str(tmp_path / "stationary.tlog") in cmd assert "--time-offset-ms" in cmd and "0" in cmd assert "--fdr-out" in cmd and str(tmp_path / "fdr.jsonl") in cmd def test_run_gps_denied_replay_creates_fdr_parent_dir(tmp_path: Path): # Arrange nested = tmp_path / "deep" / "nested" / "fdr.jsonl" # Act bp.run_gps_denied_replay( tmp_path / "video.mp4", tmp_path / "tlog.tlog", nested, _runner=lambda c: subprocess.CompletedProcess(c, 0), ) # Assert assert nested.parent.is_dir() def test_run_gps_denied_replay_passes_extra_args(tmp_path: Path): # Arrange captured: list[Sequence[str]] = [] fake_runner = lambda c: (captured.append(list(c)) or subprocess.CompletedProcess(c, 0)) # Act bp.run_gps_denied_replay( tmp_path / "v.mp4", tmp_path / "t.tlog", tmp_path / "fdr.jsonl", extra_args=["--pace=ASAP", "--log-level=INFO"], _runner=fake_runner, ) # Assert cmd = captured[0] assert "--pace=ASAP" in cmd and "--log-level=INFO" in cmd # parse_fdr_for_outbound_estimates def _write_jsonl(path: Path, records: list[dict]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text("\n".join(json.dumps(r) for r in records)) def test_parse_fdr_missing_file_raises(tmp_path: Path): # Assert with pytest.raises(FileNotFoundError, match="FDR JSONL not found"): bp.parse_fdr_for_outbound_estimates(tmp_path / "missing.jsonl") def test_parse_fdr_filters_by_kind(tmp_path: Path): # Arrange fdr = tmp_path / "fdr.jsonl" _write_jsonl(fdr, [ {"kind": "other", "payload": {"lat_deg": 99.0, "lon_deg": 99.0}}, {"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}}, {"kind": "another", "payload": {"x": 0}}, {"kind": "outbound_position_estimate", "payload": {"lat_deg": 3.0, "lon_deg": 4.0}}, ]) # Act estimates = bp.parse_fdr_for_outbound_estimates(fdr) # Assert assert estimates == [ {"lat_deg": 1.0, "lon_deg": 2.0}, {"lat_deg": 3.0, "lon_deg": 4.0}, ] def test_parse_fdr_skips_missing_coords(tmp_path: Path): # Arrange fdr = tmp_path / "fdr.jsonl" _write_jsonl(fdr, [ {"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0}}, # missing lon {"kind": "outbound_position_estimate", "payload": {"lon_deg": 2.0}}, # missing lat {"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}}, ]) # Act estimates = bp.parse_fdr_for_outbound_estimates(fdr) # Assert assert estimates == [{"lat_deg": 1.0, "lon_deg": 2.0}] def test_parse_fdr_custom_kind_and_keys(tmp_path: Path): # Arrange fdr = tmp_path / "fdr.jsonl" _write_jsonl(fdr, [ {"kind": "geo_estimate", "payload": {"latitude": 10.0, "longitude": 20.0}}, ]) # Act estimates = bp.parse_fdr_for_outbound_estimates( fdr, fdr_kind="geo_estimate", lat_key="latitude", lon_key="longitude" ) # Assert assert estimates == [{"lat_deg": 10.0, "lon_deg": 20.0}] def test_parse_fdr_skips_blank_lines(tmp_path: Path): # Arrange fdr = tmp_path / "fdr.jsonl" fdr.write_text( '\n' + json.dumps({"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}}) + '\n\n' ) # Act estimates = bp.parse_fdr_for_outbound_estimates(fdr) # Assert assert len(estimates) == 1 def test_parse_fdr_malformed_json_raises(tmp_path: Path): # Arrange fdr = tmp_path / "fdr.jsonl" fdr.write_text( json.dumps({"kind": "x", "payload": {}}) + "\n" + "{not valid json\n" ) # Assert with pytest.raises(ValueError, match="malformed FDR JSON at .*:2"): bp.parse_fdr_for_outbound_estimates(fdr) # write_outbound_messages_fixture def test_write_outbound_messages_length_mismatch_raises(tmp_path: Path): # Assert with pytest.raises(ValueError, match="length mismatch"): bp.write_outbound_messages_fixture( tmp_path / "out.json", image_ids=["a.jpg", "b.jpg"], estimates=[{"lat_deg": 1.0, "lon_deg": 2.0}], ) def test_write_outbound_messages_preserves_nulls(tmp_path: Path): # Arrange out = tmp_path / "outbound.json" # Act bp.write_outbound_messages_fixture( out, image_ids=["a.jpg", "b.jpg", "c.jpg"], estimates=[{"lat_deg": 1.0, "lon_deg": 2.0}, None, {"lat_deg": 3.0, "lon_deg": 4.0}], ) # Assert payload = json.loads(out.read_text()) assert payload == { "messages": [ {"image_id": "a.jpg", "lat_deg": 1.0, "lon_deg": 2.0}, None, {"image_id": "c.jpg", "lat_deg": 3.0, "lon_deg": 4.0}, ] } def test_write_outbound_messages_creates_parent(tmp_path: Path): # Arrange out = tmp_path / "deeply" / "nested" / "outbound.json" # Act bp.write_outbound_messages_fixture( out, image_ids=["a.jpg"], estimates=[{"lat_deg": 1.0, "lon_deg": 2.0}], ) # Assert assert out.is_file() # write_observer_fixture def test_write_observer_fixture_schema(tmp_path: Path): # Arrange out = tmp_path / "observer.json" # Act bp.write_observer_fixture(out) # Assert — round-trips into the same dict consumed by sitl_observer.get_observer. payload = json.loads(out.read_text()) assert "gps_state" in payload assert payload["gps_state"]["primary_source"] == "MAV" assert "parameters" in payload # build_p01_fixtures end-to-end (mocked) def test_build_p01_fixtures_no_images_raises(tmp_path: Path): # Arrange cfg = bp.BuilderConfig( input_dir=tmp_path / "empty", output_dir=tmp_path / "out", fc_kind="ardupilot", host="sitl-host", ) (tmp_path / "empty").mkdir() # Assert with pytest.raises(FileNotFoundError, match="no AD\\?\\?\\?\\?\\?\\?.jpg images"): bp.build_p01_fixtures(cfg) def test_build_p01_fixtures_end_to_end_with_mocks(tmp_path: Path): # Arrange — synthesize 3 fake AD000NN.jpg files (one per "image"), # mock OpenCV / pymavlink / subprocess, and pre-stage a fake FDR JSONL. input_dir = tmp_path / "in" output_dir = tmp_path / "out" input_dir.mkdir() for n in range(1, 4): (input_dir / f"AD{n:06d}.jpg").touch() writer = _mk_fake_writer() frame = types.SimpleNamespace(shape=(480, 640, 3)) mav_writer = MagicMock(write=MagicMock(), close=MagicMock()) def fake_runner(cmd): # Find the --fdr-out path and pre-populate it with 3 records. fdr_path = Path(cmd[cmd.index("--fdr-out") + 1]) _write_jsonl(fdr_path, [ {"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}}, {"kind": "outbound_position_estimate", "payload": {"lat_deg": 3.0, "lon_deg": 4.0}}, {"kind": "outbound_position_estimate", "payload": {"lat_deg": 5.0, "lon_deg": 6.0}}, ]) return subprocess.CompletedProcess(cmd, 0) cfg = bp.BuilderConfig( input_dir=input_dir, output_dir=output_dir, fc_kind="ardupilot", host="sitl-host", ) # Act result_dir = bp.build_p01_fixtures( cfg, _runner=fake_runner, _video_writer_factory=lambda out, w, h: writer, _imread=lambda p: frame, _mavlink_writer_factory=lambda out: mav_writer, ) # Assert assert result_dir == output_dir outbound_payload = json.loads((output_dir / "outbound_messages_ardupilot_sitl-host.json").read_text()) assert outbound_payload == { "messages": [ {"image_id": "AD000001.jpg", "lat_deg": 1.0, "lon_deg": 2.0}, {"image_id": "AD000002.jpg", "lat_deg": 3.0, "lon_deg": 4.0}, {"image_id": "AD000003.jpg", "lat_deg": 5.0, "lon_deg": 6.0}, ] } assert (output_dir / "observer_ardupilot_sitl-host.json").is_file() def test_build_p01_fixtures_fewer_estimates_than_frames_pads_nulls(tmp_path: Path): # Arrange — 3 frames, FDR yields 1 estimate; expect 2 null entries. input_dir = tmp_path / "in" output_dir = tmp_path / "out" input_dir.mkdir() for n in range(1, 4): (input_dir / f"AD{n:06d}.jpg").touch() def fake_runner(cmd): fdr_path = Path(cmd[cmd.index("--fdr-out") + 1]) _write_jsonl(fdr_path, [ {"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}}, ]) return subprocess.CompletedProcess(cmd, 0) cfg = bp.BuilderConfig( input_dir=input_dir, output_dir=output_dir, fc_kind="ardupilot", host="sitl-host", ) # Act bp.build_p01_fixtures( cfg, _runner=fake_runner, _video_writer_factory=lambda out, w, h: _mk_fake_writer(), _imread=lambda p: types.SimpleNamespace(shape=(480, 640, 3)), _mavlink_writer_factory=lambda out: MagicMock(write=MagicMock(), close=MagicMock()), ) # Assert payload = json.loads((output_dir / "outbound_messages_ardupilot_sitl-host.json").read_text()) assert payload["messages"][0]["lat_deg"] == 1.0 assert payload["messages"][1] is None assert payload["messages"][2] is None def test_build_p01_fixtures_more_estimates_than_frames_truncates(tmp_path: Path, caplog): # Arrange — 2 frames, FDR yields 4 estimates; expect 2 retained + warn. input_dir = tmp_path / "in" output_dir = tmp_path / "out" input_dir.mkdir() for n in range(1, 3): (input_dir / f"AD{n:06d}.jpg").touch() def fake_runner(cmd): fdr_path = Path(cmd[cmd.index("--fdr-out") + 1]) _write_jsonl(fdr_path, [ {"kind": "outbound_position_estimate", "payload": {"lat_deg": float(i), "lon_deg": float(i)}} for i in range(4) ]) return subprocess.CompletedProcess(cmd, 0) cfg = bp.BuilderConfig( input_dir=input_dir, output_dir=output_dir, fc_kind="ardupilot", host="sitl-host", ) # Act with caplog.at_level("WARNING"): bp.build_p01_fixtures( cfg, _runner=fake_runner, _video_writer_factory=lambda out, w, h: _mk_fake_writer(), _imread=lambda p: types.SimpleNamespace(shape=(480, 640, 3)), _mavlink_writer_factory=lambda out: MagicMock(write=MagicMock(), close=MagicMock()), ) # Assert payload = json.loads((output_dir / "outbound_messages_ardupilot_sitl-host.json").read_text()) assert len(payload["messages"]) == 2 assert any("truncating" in rec.message for rec in caplog.records)