mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 16:31:14 +00:00
[AZ-600] Batch 80: refactor sitl_replay_builder to strategy pattern
Replace per-scenario fixture builders with a parameterized strategy framework so future Derkachi-based scenarios compose existing pieces instead of duplicating ~200 lines of orchestration per scenario. New e2e/fixtures/sitl_replay_builder/builder.py: - VideoSource ABC + StillImagesSource, Mp4PassthroughSource - TlogSource ABC + SyntheticStationaryTlog, ImuCsvTlog - FdrProjection ABC + RawFdrPassthrough, OutboundMessagesProjection - FixtureBuilderConfig + build_fixtures(cfg) orchestrator - Consolidated MAVLink pack_raw_imu / pack_attitude helpers - Consolidated run_gps_denied_replay + write_observer_fixture build_p01_fixtures.py: 423 -> 107 lines (75% reduction). build_p02_fixtures.py: 292 -> 98 lines (66% reduction). _common.py: deleted (folded into builder.py). Tests reorganized: - test_sitl_replay_builder_builder.py (new, 33 strategy-level tests) - test_sitl_replay_builder.py (slimmed, 6 FT-P-01 integration) - test_sitl_replay_builder_p02.py (slimmed, 7 FT-P-02 integration) README documents the strategy framework + a worked example for adding FT-P-04 in ~30 lines (no new strategy code required). Regression gate: 700 passing (was 686; +14 from finer-grained coverage of new strategy classes and the build_fixtures orchestrator). Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -1,9 +1,8 @@
|
||||
"""Unit tests for `e2e/fixtures/sitl_replay_builder/build_p01_fixtures.py` (AZ-598).
|
||||
"""Integration tests for `e2e/fixtures/sitl_replay_builder/build_p01_fixtures.py` (FT-P-01).
|
||||
|
||||
All external dependencies (OpenCV, pymavlink, subprocess) are injected via
|
||||
the underscore-prefixed parameters so the suite runs without the
|
||||
production `gps-denied-replay` install OR a working OpenCV/pymavlink
|
||||
build. The actual end-to-end run is a manual operator step (see README).
|
||||
Strategy-level unit tests for the underlying ``builder.py`` machinery live
|
||||
in ``test_sitl_replay_builder_builder.py``. This file exercises the
|
||||
FT-P-01 scenario composition end-to-end (with all external deps mocked).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -12,378 +11,76 @@ import json
|
||||
import subprocess
|
||||
import types
|
||||
from pathlib import Path
|
||||
from typing import Sequence
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
import e2e.fixtures.sitl_replay_builder.build_p01_fixtures as bp
|
||||
import e2e.fixtures.sitl_replay_builder.build_p01_fixtures as bp01
|
||||
|
||||
|
||||
# encode_stills_to_mp4
|
||||
|
||||
|
||||
def _mk_fake_writer():
|
||||
def _mk_fake_video_writer() -> MagicMock:
|
||||
w = MagicMock(name="VideoWriter")
|
||||
w.write = MagicMock()
|
||||
w.release = MagicMock()
|
||||
return w
|
||||
|
||||
|
||||
def test_encode_stills_to_mp4_empty_paths_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="image_paths is empty"):
|
||||
bp.encode_stills_to_mp4(
|
||||
[], tmp_path / "out.mp4",
|
||||
_video_writer_factory=lambda *a, **kw: _mk_fake_writer(),
|
||||
_imread=lambda p: None,
|
||||
)
|
||||
|
||||
|
||||
def test_encode_stills_to_mp4_writes_each_frame(tmp_path: Path):
|
||||
# Arrange
|
||||
writer = _mk_fake_writer()
|
||||
# Simulate (640, 480, 3) BGR frame via a stand-in object with .shape
|
||||
frame = types.SimpleNamespace(shape=(480, 640, 3))
|
||||
paths = [tmp_path / f"img-{i}.jpg" for i in range(3)]
|
||||
|
||||
# Act
|
||||
count = bp.encode_stills_to_mp4(
|
||||
paths, tmp_path / "out.mp4",
|
||||
_video_writer_factory=lambda out, w, h: writer,
|
||||
_imread=lambda p: frame,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert count == 3
|
||||
assert writer.write.call_count == 3
|
||||
assert writer.release.call_count == 1
|
||||
|
||||
|
||||
def test_encode_stills_to_mp4_failed_read_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
writer = _mk_fake_writer()
|
||||
frame_ok = types.SimpleNamespace(shape=(480, 640, 3))
|
||||
seen: list[Path] = []
|
||||
|
||||
def imread(path: Path):
|
||||
seen.append(path)
|
||||
return None if str(path).endswith("img-1.jpg") else frame_ok
|
||||
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="failed to read .*img-1.jpg"):
|
||||
bp.encode_stills_to_mp4(
|
||||
[tmp_path / f"img-{i}.jpg" for i in range(3)],
|
||||
tmp_path / "out.mp4",
|
||||
_video_writer_factory=lambda out, w, h: writer,
|
||||
_imread=imread,
|
||||
)
|
||||
|
||||
|
||||
# generate_stationary_tlog
|
||||
|
||||
|
||||
def test_generate_stationary_tlog_writes_pairs(tmp_path: Path):
|
||||
# Arrange — fake mavlink writer that records every write() call.
|
||||
writer = MagicMock(name="MavlinkWriter")
|
||||
writer.write = MagicMock()
|
||||
writer.close = MagicMock()
|
||||
|
||||
# Act
|
||||
pairs = bp.generate_stationary_tlog(
|
||||
tmp_path / "out.tlog",
|
||||
duration_s=2, hz=10,
|
||||
_mavlink_writer_factory=lambda out: writer,
|
||||
)
|
||||
|
||||
# Assert — 20 pairs (2s * 10Hz), each pair = 2 messages (RAW_IMU + ATTITUDE)
|
||||
assert pairs == 20
|
||||
assert writer.write.call_count == 40
|
||||
assert writer.close.call_count == 1
|
||||
|
||||
|
||||
def test_generate_stationary_tlog_rejects_nonpositive_duration(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="duration_s must be positive"):
|
||||
bp.generate_stationary_tlog(
|
||||
tmp_path / "out.tlog", duration_s=0,
|
||||
_mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_generate_stationary_tlog_rejects_nonpositive_hz(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="hz must be positive"):
|
||||
bp.generate_stationary_tlog(
|
||||
tmp_path / "out.tlog", hz=0,
|
||||
_mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_generate_stationary_tlog_real_pymavlink_round_trip(tmp_path: Path):
|
||||
"""Sanity-check the real packers; tlog file is well-formed."""
|
||||
# Act — use real pymavlink (it's in pyproject.toml deps)
|
||||
pairs = bp.generate_stationary_tlog(
|
||||
tmp_path / "out.tlog", duration_s=1, hz=10,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert pairs == 10
|
||||
assert (tmp_path / "out.tlog").is_file()
|
||||
assert (tmp_path / "out.tlog").stat().st_size > 0
|
||||
|
||||
|
||||
# run_gps_denied_replay
|
||||
|
||||
|
||||
def test_run_gps_denied_replay_builds_correct_cmd(tmp_path: Path):
|
||||
# Arrange
|
||||
captured: list[Sequence[str]] = []
|
||||
|
||||
def fake_runner(cmd):
|
||||
captured.append(list(cmd))
|
||||
return subprocess.CompletedProcess(args=cmd, returncode=0)
|
||||
|
||||
# Act
|
||||
bp.run_gps_denied_replay(
|
||||
tmp_path / "stills.mp4", tmp_path / "stationary.tlog",
|
||||
tmp_path / "fdr.jsonl",
|
||||
_runner=fake_runner,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(captured) == 1
|
||||
cmd = captured[0]
|
||||
assert cmd[0] == "gps-denied-replay"
|
||||
assert "--video" in cmd and str(tmp_path / "stills.mp4") in cmd
|
||||
assert "--tlog" in cmd and str(tmp_path / "stationary.tlog") in cmd
|
||||
assert "--time-offset-ms" in cmd and "0" in cmd
|
||||
assert "--fdr-out" in cmd and str(tmp_path / "fdr.jsonl") in cmd
|
||||
|
||||
|
||||
def test_run_gps_denied_replay_creates_fdr_parent_dir(tmp_path: Path):
|
||||
# Arrange
|
||||
nested = tmp_path / "deep" / "nested" / "fdr.jsonl"
|
||||
|
||||
# Act
|
||||
bp.run_gps_denied_replay(
|
||||
tmp_path / "video.mp4", tmp_path / "tlog.tlog", nested,
|
||||
_runner=lambda c: subprocess.CompletedProcess(c, 0),
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert nested.parent.is_dir()
|
||||
|
||||
|
||||
def test_run_gps_denied_replay_passes_extra_args(tmp_path: Path):
|
||||
# Arrange
|
||||
captured: list[Sequence[str]] = []
|
||||
fake_runner = lambda c: (captured.append(list(c)) or subprocess.CompletedProcess(c, 0))
|
||||
|
||||
# Act
|
||||
bp.run_gps_denied_replay(
|
||||
tmp_path / "v.mp4", tmp_path / "t.tlog", tmp_path / "fdr.jsonl",
|
||||
extra_args=["--pace=ASAP", "--log-level=INFO"],
|
||||
_runner=fake_runner,
|
||||
)
|
||||
|
||||
# Assert
|
||||
cmd = captured[0]
|
||||
assert "--pace=ASAP" in cmd and "--log-level=INFO" in cmd
|
||||
|
||||
|
||||
# parse_fdr_for_outbound_estimates
|
||||
|
||||
|
||||
def _write_jsonl(path: Path, records: list[dict]) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("\n".join(json.dumps(r) for r in records))
|
||||
|
||||
|
||||
def test_parse_fdr_missing_file_raises(tmp_path: Path):
|
||||
# resolve_p01_image_paths
|
||||
|
||||
|
||||
def test_resolve_p01_image_paths_missing_dir_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="FDR JSONL not found"):
|
||||
bp.parse_fdr_for_outbound_estimates(tmp_path / "missing.jsonl")
|
||||
with pytest.raises(FileNotFoundError, match="input dir not found"):
|
||||
bp01.resolve_p01_image_paths(tmp_path / "missing")
|
||||
|
||||
|
||||
def test_parse_fdr_filters_by_kind(tmp_path: Path):
|
||||
def test_resolve_p01_image_paths_sorted(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "other", "payload": {"lat_deg": 99.0, "lon_deg": 99.0}},
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
{"kind": "another", "payload": {"x": 0}},
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 3.0, "lon_deg": 4.0}},
|
||||
])
|
||||
for n in (3, 1, 2):
|
||||
(tmp_path / f"AD{n:06d}.jpg").touch()
|
||||
(tmp_path / "ignored.txt").touch()
|
||||
|
||||
# Act
|
||||
estimates = bp.parse_fdr_for_outbound_estimates(fdr)
|
||||
paths = bp01.resolve_p01_image_paths(tmp_path)
|
||||
|
||||
# Assert
|
||||
assert estimates == [
|
||||
{"lat_deg": 1.0, "lon_deg": 2.0},
|
||||
{"lat_deg": 3.0, "lon_deg": 4.0},
|
||||
]
|
||||
# Assert — only AD*.jpg, sorted by name
|
||||
assert [p.name for p in paths] == ["AD000001.jpg", "AD000002.jpg", "AD000003.jpg"]
|
||||
|
||||
|
||||
def test_parse_fdr_skips_missing_coords(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0}}, # missing lon
|
||||
{"kind": "outbound_position_estimate", "payload": {"lon_deg": 2.0}}, # missing lat
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
])
|
||||
|
||||
# Act
|
||||
estimates = bp.parse_fdr_for_outbound_estimates(fdr)
|
||||
|
||||
# Assert
|
||||
assert estimates == [{"lat_deg": 1.0, "lon_deg": 2.0}]
|
||||
|
||||
|
||||
def test_parse_fdr_custom_kind_and_keys(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "geo_estimate", "payload": {"latitude": 10.0, "longitude": 20.0}},
|
||||
])
|
||||
|
||||
# Act
|
||||
estimates = bp.parse_fdr_for_outbound_estimates(
|
||||
fdr, fdr_kind="geo_estimate", lat_key="latitude", lon_key="longitude"
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert estimates == [{"lat_deg": 10.0, "lon_deg": 20.0}]
|
||||
|
||||
|
||||
def test_parse_fdr_skips_blank_lines(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
fdr.write_text(
|
||||
'\n'
|
||||
+ json.dumps({"kind": "outbound_position_estimate",
|
||||
"payload": {"lat_deg": 1.0, "lon_deg": 2.0}})
|
||||
+ '\n\n'
|
||||
)
|
||||
|
||||
# Act
|
||||
estimates = bp.parse_fdr_for_outbound_estimates(fdr)
|
||||
|
||||
# Assert
|
||||
assert len(estimates) == 1
|
||||
|
||||
|
||||
def test_parse_fdr_malformed_json_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
fdr.write_text(
|
||||
json.dumps({"kind": "x", "payload": {}}) + "\n"
|
||||
+ "{not valid json\n"
|
||||
)
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="malformed FDR JSON at .*:2"):
|
||||
bp.parse_fdr_for_outbound_estimates(fdr)
|
||||
|
||||
|
||||
# write_outbound_messages_fixture
|
||||
|
||||
|
||||
def test_write_outbound_messages_length_mismatch_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="length mismatch"):
|
||||
bp.write_outbound_messages_fixture(
|
||||
tmp_path / "out.json",
|
||||
image_ids=["a.jpg", "b.jpg"],
|
||||
estimates=[{"lat_deg": 1.0, "lon_deg": 2.0}],
|
||||
)
|
||||
|
||||
|
||||
def test_write_outbound_messages_preserves_nulls(tmp_path: Path):
|
||||
# Arrange
|
||||
out = tmp_path / "outbound.json"
|
||||
|
||||
# Act
|
||||
bp.write_outbound_messages_fixture(
|
||||
out,
|
||||
image_ids=["a.jpg", "b.jpg", "c.jpg"],
|
||||
estimates=[{"lat_deg": 1.0, "lon_deg": 2.0}, None, {"lat_deg": 3.0, "lon_deg": 4.0}],
|
||||
)
|
||||
|
||||
# Assert
|
||||
payload = json.loads(out.read_text())
|
||||
assert payload == {
|
||||
"messages": [
|
||||
{"image_id": "a.jpg", "lat_deg": 1.0, "lon_deg": 2.0},
|
||||
None,
|
||||
{"image_id": "c.jpg", "lat_deg": 3.0, "lon_deg": 4.0},
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def test_write_outbound_messages_creates_parent(tmp_path: Path):
|
||||
# Arrange
|
||||
out = tmp_path / "deeply" / "nested" / "outbound.json"
|
||||
|
||||
# Act
|
||||
bp.write_outbound_messages_fixture(
|
||||
out, image_ids=["a.jpg"], estimates=[{"lat_deg": 1.0, "lon_deg": 2.0}],
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert out.is_file()
|
||||
|
||||
|
||||
# write_observer_fixture
|
||||
|
||||
|
||||
def test_write_observer_fixture_schema(tmp_path: Path):
|
||||
# Arrange
|
||||
out = tmp_path / "observer.json"
|
||||
|
||||
# Act
|
||||
bp.write_observer_fixture(out)
|
||||
|
||||
# Assert — round-trips into the same dict consumed by sitl_observer.get_observer.
|
||||
payload = json.loads(out.read_text())
|
||||
assert "gps_state" in payload
|
||||
assert payload["gps_state"]["primary_source"] == "MAV"
|
||||
assert "parameters" in payload
|
||||
|
||||
|
||||
# build_p01_fixtures end-to-end (mocked)
|
||||
# build_p01_fixtures end-to-end
|
||||
|
||||
|
||||
def test_build_p01_fixtures_no_images_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
cfg = bp.BuilderConfig(
|
||||
(tmp_path / "empty").mkdir()
|
||||
cfg = bp01.BuilderConfig(
|
||||
input_dir=tmp_path / "empty", output_dir=tmp_path / "out",
|
||||
fc_kind="ardupilot", host="sitl-host",
|
||||
)
|
||||
(tmp_path / "empty").mkdir()
|
||||
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="no AD\\?\\?\\?\\?\\?\\?.jpg images"):
|
||||
bp.build_p01_fixtures(cfg)
|
||||
bp01.build_p01_fixtures(cfg)
|
||||
|
||||
|
||||
def test_build_p01_fixtures_end_to_end_with_mocks(tmp_path: Path):
|
||||
# Arrange — synthesize 3 fake AD000NN.jpg files (one per "image"),
|
||||
# mock OpenCV / pymavlink / subprocess, and pre-stage a fake FDR JSONL.
|
||||
# Arrange — 3 fake AD000NN.jpg files, mocked OpenCV / pymavlink / subprocess
|
||||
input_dir = tmp_path / "in"
|
||||
output_dir = tmp_path / "out"
|
||||
input_dir.mkdir()
|
||||
for n in range(1, 4):
|
||||
(input_dir / f"AD{n:06d}.jpg").touch()
|
||||
|
||||
writer = _mk_fake_writer()
|
||||
writer = _mk_fake_video_writer()
|
||||
frame = types.SimpleNamespace(shape=(480, 640, 3))
|
||||
mav_writer = MagicMock(write=MagicMock(), close=MagicMock())
|
||||
|
||||
def fake_runner(cmd):
|
||||
# Find the --fdr-out path and pre-populate it with 3 records.
|
||||
fdr_path = Path(cmd[cmd.index("--fdr-out") + 1])
|
||||
_write_jsonl(fdr_path, [
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
@@ -392,13 +89,13 @@ def test_build_p01_fixtures_end_to_end_with_mocks(tmp_path: Path):
|
||||
])
|
||||
return subprocess.CompletedProcess(cmd, 0)
|
||||
|
||||
cfg = bp.BuilderConfig(
|
||||
cfg = bp01.BuilderConfig(
|
||||
input_dir=input_dir, output_dir=output_dir,
|
||||
fc_kind="ardupilot", host="sitl-host",
|
||||
)
|
||||
|
||||
# Act
|
||||
result_dir = bp.build_p01_fixtures(
|
||||
result_dir = bp01.build_p01_fixtures(
|
||||
cfg,
|
||||
_runner=fake_runner,
|
||||
_video_writer_factory=lambda out, w, h: writer,
|
||||
@@ -420,7 +117,7 @@ def test_build_p01_fixtures_end_to_end_with_mocks(tmp_path: Path):
|
||||
|
||||
|
||||
def test_build_p01_fixtures_fewer_estimates_than_frames_pads_nulls(tmp_path: Path):
|
||||
# Arrange — 3 frames, FDR yields 1 estimate; expect 2 null entries.
|
||||
# Arrange — 3 frames, FDR yields 1 estimate; expect 2 null entries
|
||||
input_dir = tmp_path / "in"
|
||||
output_dir = tmp_path / "out"
|
||||
input_dir.mkdir()
|
||||
@@ -434,16 +131,16 @@ def test_build_p01_fixtures_fewer_estimates_than_frames_pads_nulls(tmp_path: Pat
|
||||
])
|
||||
return subprocess.CompletedProcess(cmd, 0)
|
||||
|
||||
cfg = bp.BuilderConfig(
|
||||
cfg = bp01.BuilderConfig(
|
||||
input_dir=input_dir, output_dir=output_dir,
|
||||
fc_kind="ardupilot", host="sitl-host",
|
||||
)
|
||||
|
||||
# Act
|
||||
bp.build_p01_fixtures(
|
||||
bp01.build_p01_fixtures(
|
||||
cfg,
|
||||
_runner=fake_runner,
|
||||
_video_writer_factory=lambda out, w, h: _mk_fake_writer(),
|
||||
_video_writer_factory=lambda out, w, h: _mk_fake_video_writer(),
|
||||
_imread=lambda p: types.SimpleNamespace(shape=(480, 640, 3)),
|
||||
_mavlink_writer_factory=lambda out: MagicMock(write=MagicMock(), close=MagicMock()),
|
||||
)
|
||||
@@ -456,7 +153,7 @@ def test_build_p01_fixtures_fewer_estimates_than_frames_pads_nulls(tmp_path: Pat
|
||||
|
||||
|
||||
def test_build_p01_fixtures_more_estimates_than_frames_truncates(tmp_path: Path, caplog):
|
||||
# Arrange — 2 frames, FDR yields 4 estimates; expect 2 retained + warn.
|
||||
# Arrange — 2 frames, FDR yields 4 estimates; expect 2 retained + WARN
|
||||
input_dir = tmp_path / "in"
|
||||
output_dir = tmp_path / "out"
|
||||
input_dir.mkdir()
|
||||
@@ -471,17 +168,17 @@ def test_build_p01_fixtures_more_estimates_than_frames_truncates(tmp_path: Path,
|
||||
])
|
||||
return subprocess.CompletedProcess(cmd, 0)
|
||||
|
||||
cfg = bp.BuilderConfig(
|
||||
cfg = bp01.BuilderConfig(
|
||||
input_dir=input_dir, output_dir=output_dir,
|
||||
fc_kind="ardupilot", host="sitl-host",
|
||||
)
|
||||
|
||||
# Act
|
||||
with caplog.at_level("WARNING"):
|
||||
bp.build_p01_fixtures(
|
||||
bp01.build_p01_fixtures(
|
||||
cfg,
|
||||
_runner=fake_runner,
|
||||
_video_writer_factory=lambda out, w, h: _mk_fake_writer(),
|
||||
_video_writer_factory=lambda out, w, h: _mk_fake_video_writer(),
|
||||
_imread=lambda p: types.SimpleNamespace(shape=(480, 640, 3)),
|
||||
_mavlink_writer_factory=lambda out: MagicMock(write=MagicMock(), close=MagicMock()),
|
||||
)
|
||||
|
||||
@@ -0,0 +1,749 @@
|
||||
"""Strategy-level unit tests for `e2e/fixtures/sitl_replay_builder/builder.py` (AZ-600).
|
||||
|
||||
These tests exercise the parameterized strategies + helpers in isolation.
|
||||
Per-scenario integration tests live next to each scenario builder
|
||||
(`test_sitl_replay_builder.py` for FT-P-01, `test_sitl_replay_builder_p02.py`
|
||||
for FT-P-02).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import json
|
||||
import math
|
||||
import subprocess
|
||||
import types
|
||||
from pathlib import Path
|
||||
from typing import Sequence
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from e2e.fixtures.sitl_replay_builder import builder as bd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _mk_fake_video_writer() -> MagicMock:
|
||||
w = MagicMock(name="VideoWriter")
|
||||
w.write = MagicMock()
|
||||
w.release = MagicMock()
|
||||
return w
|
||||
|
||||
|
||||
def _write_jsonl(path: Path, records: list[dict]) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("\n".join(json.dumps(r) for r in records))
|
||||
|
||||
|
||||
_IMU_HEADER_ROW = (
|
||||
"timestamp(ms),Time,SCALED_IMU2.xacc,SCALED_IMU2.yacc,SCALED_IMU2.zacc,"
|
||||
"SCALED_IMU2.xgyro,SCALED_IMU2.ygyro,SCALED_IMU2.zgyro,"
|
||||
"SCALED_IMU2.xmag,SCALED_IMU2.ymag,SCALED_IMU2.zmag,"
|
||||
"GLOBAL_POSITION_INT.lat,GLOBAL_POSITION_INT.lon,GLOBAL_POSITION_INT.alt,"
|
||||
"GLOBAL_POSITION_INT.relative_alt,GLOBAL_POSITION_INT.vx,GLOBAL_POSITION_INT.vy,"
|
||||
"GLOBAL_POSITION_INT.vz,GLOBAL_POSITION_INT.hdg"
|
||||
)
|
||||
|
||||
|
||||
def _good_imu_row(ts_ms: float, hdg: int = 35041) -> list:
|
||||
return [
|
||||
ts_ms, 0.0,
|
||||
21, -3, -984,
|
||||
52, 32, -5,
|
||||
312, -1048, 442,
|
||||
50_080_963_4, 36_111_544_2, 141_290, 23_182,
|
||||
-4, -6, -88,
|
||||
hdg,
|
||||
]
|
||||
|
||||
|
||||
def _write_imu_csv(path: Path, rows: list[list]) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with path.open("w", newline="", encoding="utf-8") as fp:
|
||||
fp.write(_IMU_HEADER_ROW + "\n")
|
||||
writer = csv.writer(fp)
|
||||
for row in rows:
|
||||
writer.writerow(row)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# StillImagesSource
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_still_images_source_empty_paths_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="image_paths is empty"):
|
||||
bd.StillImagesSource(image_paths=[]).materialize(
|
||||
tmp_path / "out.mp4",
|
||||
_video_writer_factory=lambda *a, **kw: _mk_fake_video_writer(),
|
||||
_imread=lambda p: None,
|
||||
)
|
||||
|
||||
|
||||
def test_still_images_source_writes_each_frame(tmp_path: Path):
|
||||
# Arrange
|
||||
writer = _mk_fake_video_writer()
|
||||
frame = types.SimpleNamespace(shape=(480, 640, 3))
|
||||
paths = [tmp_path / f"img-{i}.jpg" for i in range(3)]
|
||||
|
||||
# Act
|
||||
result = bd.StillImagesSource(image_paths=paths).materialize(
|
||||
tmp_path / "out.mp4",
|
||||
_video_writer_factory=lambda out, w, h: writer,
|
||||
_imread=lambda p: frame,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == tmp_path / "out.mp4"
|
||||
assert writer.write.call_count == 3
|
||||
assert writer.release.call_count == 1
|
||||
|
||||
|
||||
def test_still_images_source_failed_read_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
writer = _mk_fake_video_writer()
|
||||
frame_ok = types.SimpleNamespace(shape=(480, 640, 3))
|
||||
|
||||
def imread(path: Path):
|
||||
return None if str(path).endswith("img-1.jpg") else frame_ok
|
||||
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="failed to read .*img-1.jpg"):
|
||||
bd.StillImagesSource(
|
||||
image_paths=[tmp_path / f"img-{i}.jpg" for i in range(3)],
|
||||
).materialize(
|
||||
tmp_path / "out.mp4",
|
||||
_video_writer_factory=lambda out, w, h: writer,
|
||||
_imread=imread,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mp4PassthroughSource
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_mp4_passthrough_returns_real_path(tmp_path: Path):
|
||||
# Arrange
|
||||
mp4 = tmp_path / "flight.mp4"
|
||||
mp4.touch()
|
||||
|
||||
# Act
|
||||
result = bd.Mp4PassthroughSource(mp4_path=mp4).materialize(tmp_path / "ignored.mp4")
|
||||
|
||||
# Assert — pass-through returns the real path, not output_path
|
||||
assert result == mp4
|
||||
|
||||
|
||||
def test_mp4_passthrough_missing_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="MP4 not found"):
|
||||
bd.Mp4PassthroughSource(mp4_path=tmp_path / "missing.mp4").materialize(
|
||||
tmp_path / "ignored.mp4"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SyntheticStationaryTlog
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_synthetic_stationary_writes_pairs(tmp_path: Path):
|
||||
# Arrange
|
||||
writer = MagicMock(write=MagicMock(), close=MagicMock())
|
||||
|
||||
# Act
|
||||
result = bd.SyntheticStationaryTlog(duration_s=2, hz=10).materialize(
|
||||
tmp_path / "out.tlog", _mavlink_writer_factory=lambda out: writer,
|
||||
)
|
||||
|
||||
# Assert — 20 pairs × 2 messages = 40 writes
|
||||
assert result == tmp_path / "out.tlog"
|
||||
assert writer.write.call_count == 40
|
||||
assert writer.close.call_count == 1
|
||||
|
||||
|
||||
def test_synthetic_stationary_rejects_nonpositive_duration(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="duration_s must be positive"):
|
||||
bd.SyntheticStationaryTlog(duration_s=0).materialize(
|
||||
tmp_path / "out.tlog", _mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_synthetic_stationary_rejects_nonpositive_hz(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="hz must be positive"):
|
||||
bd.SyntheticStationaryTlog(hz=0).materialize(
|
||||
tmp_path / "out.tlog", _mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_synthetic_stationary_real_pymavlink_round_trip(tmp_path: Path):
|
||||
# Act — use the real pymavlink packers
|
||||
result = bd.SyntheticStationaryTlog(duration_s=1, hz=10).materialize(tmp_path / "out.tlog")
|
||||
|
||||
# Assert
|
||||
assert result.is_file()
|
||||
assert result.stat().st_size > 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ImuCsvTlog
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_imu_csv_tlog_missing_file_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="IMU CSV not found"):
|
||||
bd.ImuCsvTlog(csv_path=tmp_path / "missing.csv").materialize(
|
||||
tmp_path / "out.tlog", _mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_imu_csv_tlog_empty_raises(tmp_path: Path):
|
||||
# Arrange — header only, no data rows
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
_write_imu_csv(csv_path, [])
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="IMU CSV is empty"):
|
||||
bd.ImuCsvTlog(csv_path=csv_path).materialize(
|
||||
tmp_path / "out.tlog", _mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_imu_csv_tlog_missing_required_column_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
csv_path.write_text("timestamp(ms),Time\n0,0\n")
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="missing required columns"):
|
||||
bd.ImuCsvTlog(csv_path=csv_path).materialize(
|
||||
tmp_path / "out.tlog", _mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_imu_csv_tlog_malformed_numeric_raises(tmp_path: Path):
|
||||
# Arrange — column 2 (xacc) is non-numeric
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
row = _good_imu_row(0.0)
|
||||
row[2] = "not-a-number"
|
||||
_write_imu_csv(csv_path, [row])
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="malformed IMU CSV row"):
|
||||
bd.ImuCsvTlog(csv_path=csv_path).materialize(
|
||||
tmp_path / "out.tlog", _mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_imu_csv_tlog_writes_pair_per_row(tmp_path: Path):
|
||||
# Arrange
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
_write_imu_csv(csv_path, [_good_imu_row(0.0), _good_imu_row(100.0), _good_imu_row(200.0)])
|
||||
writer = MagicMock(write=MagicMock(), close=MagicMock())
|
||||
|
||||
# Act
|
||||
result = bd.ImuCsvTlog(csv_path=csv_path).materialize(
|
||||
tmp_path / "out.tlog", _mavlink_writer_factory=lambda out: writer,
|
||||
)
|
||||
|
||||
# Assert — 3 rows → 3 pairs → 6 writes
|
||||
assert result == tmp_path / "out.tlog"
|
||||
assert writer.write.call_count == 6
|
||||
assert writer.close.call_count == 1
|
||||
|
||||
|
||||
def test_imu_csv_tlog_real_pymavlink_round_trip(tmp_path: Path):
|
||||
# Arrange
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
_write_imu_csv(csv_path, [_good_imu_row(0.0), _good_imu_row(100.0)])
|
||||
|
||||
# Act
|
||||
result = bd.ImuCsvTlog(csv_path=csv_path).materialize(tmp_path / "out.tlog")
|
||||
|
||||
# Assert
|
||||
assert result.is_file()
|
||||
assert result.stat().st_size > 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# hdg_centideg_to_rad
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"centideg,expected_rad",
|
||||
[
|
||||
(0, 0.0),
|
||||
(9000, math.pi / 2),
|
||||
(18000, math.pi),
|
||||
(27000, 3 * math.pi / 2),
|
||||
(35990, 35990 * math.pi / 18000),
|
||||
],
|
||||
)
|
||||
def test_hdg_centideg_to_rad(centideg: int, expected_rad: float):
|
||||
# Assert
|
||||
assert bd.hdg_centideg_to_rad(centideg) == pytest.approx(expected_rad)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# pack_raw_imu / pack_attitude
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_pack_raw_imu_returns_nonempty_bytes():
|
||||
# Act
|
||||
stationary = bd.pack_raw_imu(0, zacc=bd.STATIONARY_Z_ACCEL_MG)
|
||||
real_motion = bd.pack_raw_imu(100, xacc=21, yacc=-3, zacc=-984)
|
||||
|
||||
# Assert
|
||||
assert isinstance(stationary, (bytes, bytearray)) and len(stationary) > 0
|
||||
assert isinstance(real_motion, (bytes, bytearray)) and len(real_motion) > 0
|
||||
assert stationary != real_motion
|
||||
|
||||
|
||||
def test_pack_attitude_returns_nonempty_bytes():
|
||||
# Act
|
||||
zero_yaw = bd.pack_attitude(0)
|
||||
real_yaw = bd.pack_attitude(100, yaw=math.pi / 2)
|
||||
|
||||
# Assert
|
||||
assert isinstance(zero_yaw, (bytes, bytearray)) and len(zero_yaw) > 0
|
||||
assert isinstance(real_yaw, (bytes, bytearray)) and len(real_yaw) > 0
|
||||
assert zero_yaw != real_yaw
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_gps_denied_replay
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_run_gps_denied_replay_builds_correct_cmd(tmp_path: Path):
|
||||
# Arrange
|
||||
captured: list[Sequence[str]] = []
|
||||
|
||||
def fake_runner(cmd):
|
||||
captured.append(list(cmd))
|
||||
return subprocess.CompletedProcess(args=cmd, returncode=0)
|
||||
|
||||
# Act
|
||||
bd.run_gps_denied_replay(
|
||||
tmp_path / "stills.mp4", tmp_path / "stationary.tlog",
|
||||
tmp_path / "fdr.jsonl", _runner=fake_runner,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(captured) == 1
|
||||
cmd = captured[0]
|
||||
assert cmd[0] == "gps-denied-replay"
|
||||
assert "--video" in cmd and str(tmp_path / "stills.mp4") in cmd
|
||||
assert "--tlog" in cmd and str(tmp_path / "stationary.tlog") in cmd
|
||||
assert "--time-offset-ms" in cmd and "0" in cmd
|
||||
assert "--fdr-out" in cmd and str(tmp_path / "fdr.jsonl") in cmd
|
||||
|
||||
|
||||
def test_run_gps_denied_replay_creates_fdr_parent_dir(tmp_path: Path):
|
||||
# Arrange
|
||||
nested = tmp_path / "deep" / "nested" / "fdr.jsonl"
|
||||
|
||||
# Act
|
||||
bd.run_gps_denied_replay(
|
||||
tmp_path / "video.mp4", tmp_path / "tlog.tlog", nested,
|
||||
_runner=lambda c: subprocess.CompletedProcess(c, 0),
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert nested.parent.is_dir()
|
||||
|
||||
|
||||
def test_run_gps_denied_replay_passes_extra_args(tmp_path: Path):
|
||||
# Arrange
|
||||
captured: list[Sequence[str]] = []
|
||||
fake_runner = lambda c: (captured.append(list(c)) or subprocess.CompletedProcess(c, 0))
|
||||
|
||||
# Act
|
||||
bd.run_gps_denied_replay(
|
||||
tmp_path / "v.mp4", tmp_path / "t.tlog", tmp_path / "fdr.jsonl",
|
||||
extra_args=["--pace=ASAP", "--log-level=INFO"], _runner=fake_runner,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert "--pace=ASAP" in captured[0] and "--log-level=INFO" in captured[0]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_fdr_for_outbound_estimates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_parse_fdr_missing_file_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="FDR JSONL not found"):
|
||||
bd.parse_fdr_for_outbound_estimates(tmp_path / "missing.jsonl")
|
||||
|
||||
|
||||
def test_parse_fdr_filters_by_kind(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "other", "payload": {"lat_deg": 99.0, "lon_deg": 99.0}},
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
{"kind": "another", "payload": {"x": 0}},
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 3.0, "lon_deg": 4.0}},
|
||||
])
|
||||
|
||||
# Act
|
||||
estimates = bd.parse_fdr_for_outbound_estimates(fdr)
|
||||
|
||||
# Assert
|
||||
assert estimates == [
|
||||
{"lat_deg": 1.0, "lon_deg": 2.0},
|
||||
{"lat_deg": 3.0, "lon_deg": 4.0},
|
||||
]
|
||||
|
||||
|
||||
def test_parse_fdr_skips_missing_coords(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0}},
|
||||
{"kind": "outbound_position_estimate", "payload": {"lon_deg": 2.0}},
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
])
|
||||
|
||||
# Act
|
||||
estimates = bd.parse_fdr_for_outbound_estimates(fdr)
|
||||
|
||||
# Assert
|
||||
assert estimates == [{"lat_deg": 1.0, "lon_deg": 2.0}]
|
||||
|
||||
|
||||
def test_parse_fdr_custom_kind_and_keys(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "geo_estimate", "payload": {"latitude": 10.0, "longitude": 20.0}},
|
||||
])
|
||||
|
||||
# Act
|
||||
estimates = bd.parse_fdr_for_outbound_estimates(
|
||||
fdr, fdr_kind="geo_estimate", lat_key="latitude", lon_key="longitude",
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert estimates == [{"lat_deg": 10.0, "lon_deg": 20.0}]
|
||||
|
||||
|
||||
def test_parse_fdr_skips_blank_lines(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
fdr.write_text(
|
||||
'\n'
|
||||
+ json.dumps({"kind": "outbound_position_estimate",
|
||||
"payload": {"lat_deg": 1.0, "lon_deg": 2.0}})
|
||||
+ '\n\n'
|
||||
)
|
||||
|
||||
# Act
|
||||
estimates = bd.parse_fdr_for_outbound_estimates(fdr)
|
||||
|
||||
# Assert
|
||||
assert len(estimates) == 1
|
||||
|
||||
|
||||
def test_parse_fdr_malformed_json_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
fdr.write_text(
|
||||
json.dumps({"kind": "x", "payload": {}}) + "\n"
|
||||
+ "{not valid json\n"
|
||||
)
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="malformed FDR JSON at .*:2"):
|
||||
bd.parse_fdr_for_outbound_estimates(fdr)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# verify_fdr_has_estimates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_verify_fdr_missing_file_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="FDR JSONL not found"):
|
||||
bd.verify_fdr_has_estimates(tmp_path / "missing.jsonl")
|
||||
|
||||
|
||||
def test_verify_fdr_no_estimates_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"record_type": "other", "payload": {}},
|
||||
{"record_type": "imu_tick", "payload": {}},
|
||||
])
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="zero estimate records"):
|
||||
bd.verify_fdr_has_estimates(fdr)
|
||||
|
||||
|
||||
def test_verify_fdr_counts_estimates(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"record_type": "estimate", "payload": {}},
|
||||
{"record_type": "other", "payload": {}},
|
||||
{"record_type": "estimate", "payload": {}},
|
||||
{"record_type": "estimate", "payload": {}},
|
||||
])
|
||||
|
||||
# Act
|
||||
count = bd.verify_fdr_has_estimates(fdr)
|
||||
|
||||
# Assert
|
||||
assert count == 3
|
||||
|
||||
|
||||
def test_verify_fdr_tolerates_malformed_lines(tmp_path: Path):
|
||||
# Arrange — one bad JSON line interleaved with good estimate records
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
fdr.write_text(
|
||||
json.dumps({"record_type": "estimate"}) + "\n"
|
||||
+ "{not valid json\n"
|
||||
+ json.dumps({"record_type": "estimate"}) + "\n"
|
||||
)
|
||||
|
||||
# Act
|
||||
count = bd.verify_fdr_has_estimates(fdr)
|
||||
|
||||
# Assert
|
||||
assert count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# RawFdrPassthrough
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_raw_fdr_passthrough_verifies_by_default(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [{"record_type": "imu_tick"}])
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="zero estimate records"):
|
||||
bd.RawFdrPassthrough().materialize(fdr, tmp_path, "ardupilot", "sitl-host")
|
||||
|
||||
|
||||
def test_raw_fdr_passthrough_skips_verify_when_disabled(tmp_path: Path):
|
||||
# Arrange — file with no estimates, verify disabled
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [{"record_type": "imu_tick"}])
|
||||
|
||||
# Act — should not raise
|
||||
bd.RawFdrPassthrough(verify_estimates=False).materialize(
|
||||
fdr, tmp_path, "ardupilot", "sitl-host",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# OutboundMessagesProjection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_outbound_projection_writes_full_messages(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 3.0, "lon_deg": 4.0}},
|
||||
])
|
||||
|
||||
# Act
|
||||
bd.OutboundMessagesProjection(image_ids=["a.jpg", "b.jpg"]).materialize(
|
||||
fdr, tmp_path, "ardupilot", "sitl-host",
|
||||
)
|
||||
|
||||
# Assert
|
||||
payload = json.loads((tmp_path / "outbound_messages_ardupilot_sitl-host.json").read_text())
|
||||
assert payload == {
|
||||
"messages": [
|
||||
{"image_id": "a.jpg", "lat_deg": 1.0, "lon_deg": 2.0},
|
||||
{"image_id": "b.jpg", "lat_deg": 3.0, "lon_deg": 4.0},
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def test_outbound_projection_pads_with_null(tmp_path: Path):
|
||||
# Arrange — 3 image_ids, FDR has only 1 estimate
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
])
|
||||
|
||||
# Act
|
||||
bd.OutboundMessagesProjection(image_ids=["a.jpg", "b.jpg", "c.jpg"]).materialize(
|
||||
fdr, tmp_path, "ardupilot", "sitl-host",
|
||||
)
|
||||
|
||||
# Assert
|
||||
payload = json.loads((tmp_path / "outbound_messages_ardupilot_sitl-host.json").read_text())
|
||||
assert payload["messages"][0]["lat_deg"] == 1.0
|
||||
assert payload["messages"][1] is None
|
||||
assert payload["messages"][2] is None
|
||||
|
||||
|
||||
def test_outbound_projection_truncates_and_warns(tmp_path: Path, caplog):
|
||||
# Arrange — 2 image_ids, FDR has 4 estimates
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": float(i), "lon_deg": float(i)}}
|
||||
for i in range(4)
|
||||
])
|
||||
|
||||
# Act
|
||||
with caplog.at_level("WARNING"):
|
||||
bd.OutboundMessagesProjection(image_ids=["a.jpg", "b.jpg"]).materialize(
|
||||
fdr, tmp_path, "ardupilot", "sitl-host",
|
||||
)
|
||||
|
||||
# Assert
|
||||
payload = json.loads((tmp_path / "outbound_messages_ardupilot_sitl-host.json").read_text())
|
||||
assert len(payload["messages"]) == 2
|
||||
assert any("truncating" in rec.message for rec in caplog.records)
|
||||
|
||||
|
||||
def test_outbound_projection_length_mismatch_safe(tmp_path: Path):
|
||||
# Arrange — projection always reconciles to image_ids count (no length mismatch raises)
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
])
|
||||
|
||||
# Act — single image, single estimate
|
||||
bd.OutboundMessagesProjection(image_ids=["only.jpg"]).materialize(
|
||||
fdr, tmp_path, "ardupilot", "sitl-host",
|
||||
)
|
||||
|
||||
# Assert
|
||||
payload = json.loads((tmp_path / "outbound_messages_ardupilot_sitl-host.json").read_text())
|
||||
assert payload["messages"] == [{"image_id": "only.jpg", "lat_deg": 1.0, "lon_deg": 2.0}]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# write_observer_fixture
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_write_observer_fixture_schema(tmp_path: Path):
|
||||
# Arrange
|
||||
out = tmp_path / "observer.json"
|
||||
|
||||
# Act
|
||||
bd.write_observer_fixture(out)
|
||||
|
||||
# Assert
|
||||
payload = json.loads(out.read_text())
|
||||
assert "gps_state" in payload
|
||||
assert payload["gps_state"]["primary_source"] == "MAV"
|
||||
assert "parameters" in payload
|
||||
|
||||
|
||||
def test_write_observer_fixture_creates_parent(tmp_path: Path):
|
||||
# Arrange
|
||||
out = tmp_path / "deep" / "nested" / "observer.json"
|
||||
|
||||
# Act
|
||||
bd.write_observer_fixture(out)
|
||||
|
||||
# Assert
|
||||
assert out.is_file()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build_fixtures orchestrator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_build_fixtures_orchestrator_composes_strategies(tmp_path: Path):
|
||||
# Arrange — synthetic stationary tlog + still images + outbound projection
|
||||
input_dir = tmp_path / "in"
|
||||
output_dir = tmp_path / "out"
|
||||
input_dir.mkdir()
|
||||
image_paths = []
|
||||
for n in range(1, 3):
|
||||
p = input_dir / f"AD{n:06d}.jpg"
|
||||
p.touch()
|
||||
image_paths.append(p)
|
||||
|
||||
def fake_runner(cmd):
|
||||
fdr_path = Path(cmd[cmd.index("--fdr-out") + 1])
|
||||
_write_jsonl(fdr_path, [
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 1.0, "lon_deg": 2.0}},
|
||||
{"kind": "outbound_position_estimate", "payload": {"lat_deg": 3.0, "lon_deg": 4.0}},
|
||||
])
|
||||
return subprocess.CompletedProcess(cmd, 0)
|
||||
|
||||
cfg = bd.FixtureBuilderConfig(
|
||||
video_source=bd.StillImagesSource(image_paths=image_paths),
|
||||
tlog_source=bd.SyntheticStationaryTlog(duration_s=1, hz=10),
|
||||
fdr_projection=bd.OutboundMessagesProjection(
|
||||
image_ids=[p.name for p in image_paths],
|
||||
),
|
||||
output_dir=output_dir,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = bd.build_fixtures(
|
||||
cfg,
|
||||
_runner=fake_runner,
|
||||
_video_writer_factory=lambda out, w, h: _mk_fake_video_writer(),
|
||||
_imread=lambda p: types.SimpleNamespace(shape=(480, 640, 3)),
|
||||
_mavlink_writer_factory=lambda out: MagicMock(write=MagicMock(), close=MagicMock()),
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == output_dir
|
||||
assert (output_dir / "observer_ardupilot_sitl-host.json").is_file()
|
||||
payload = json.loads((output_dir / "outbound_messages_ardupilot_sitl-host.json").read_text())
|
||||
assert len(payload["messages"]) == 2
|
||||
|
||||
|
||||
def test_build_fixtures_uses_passthrough_video(tmp_path: Path):
|
||||
# Arrange — Mp4PassthroughSource returns the real path; verify the CLI got it
|
||||
mp4 = tmp_path / "flight.mp4"
|
||||
mp4.touch()
|
||||
output_dir = tmp_path / "out"
|
||||
captured_cmd: list = []
|
||||
|
||||
def fake_runner(cmd):
|
||||
captured_cmd.append(list(cmd))
|
||||
fdr_path = Path(cmd[cmd.index("--fdr-out") + 1])
|
||||
_write_jsonl(fdr_path, [{"record_type": "estimate"}])
|
||||
return subprocess.CompletedProcess(cmd, 0)
|
||||
|
||||
cfg = bd.FixtureBuilderConfig(
|
||||
video_source=bd.Mp4PassthroughSource(mp4_path=mp4),
|
||||
tlog_source=bd.SyntheticStationaryTlog(duration_s=1, hz=10),
|
||||
fdr_projection=bd.RawFdrPassthrough(verify_estimates=True),
|
||||
output_dir=output_dir,
|
||||
fdr_subdir="fdr", fdr_filename="fdr.jsonl",
|
||||
)
|
||||
|
||||
# Act
|
||||
bd.build_fixtures(
|
||||
cfg, _runner=fake_runner,
|
||||
_mavlink_writer_factory=lambda out: MagicMock(write=MagicMock(), close=MagicMock()),
|
||||
)
|
||||
|
||||
# Assert — the CLI received the real MP4 path, not output_dir/video.mp4
|
||||
assert str(mp4) in captured_cmd[0]
|
||||
@@ -1,19 +1,17 @@
|
||||
"""Unit tests for `e2e/fixtures/sitl_replay_builder/build_p02_fixtures.py` (AZ-599).
|
||||
"""Integration tests for `e2e/fixtures/sitl_replay_builder/build_p02_fixtures.py` (FT-P-02).
|
||||
|
||||
All external dependencies (pymavlink, subprocess) are injected via the
|
||||
underscore-prefixed parameters. The IMU CSV is small enough that we
|
||||
hand-author it inline for each test rather than depending on the real
|
||||
Derkachi data file.
|
||||
Strategy-level unit tests for the underlying ``builder.py`` machinery
|
||||
(including ``ImuCsvTlog``, ``Mp4PassthroughSource``, ``RawFdrPassthrough``,
|
||||
``verify_fdr_has_estimates``) live in ``test_sitl_replay_builder_builder.py``.
|
||||
This file exercises the FT-P-02 scenario composition end-to-end.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import json
|
||||
import math
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Sequence
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
@@ -21,7 +19,7 @@ import pytest
|
||||
import e2e.fixtures.sitl_replay_builder.build_p02_fixtures as bp02
|
||||
|
||||
|
||||
_HEADER_ROW = (
|
||||
_IMU_HEADER_ROW = (
|
||||
"timestamp(ms),Time,SCALED_IMU2.xacc,SCALED_IMU2.yacc,SCALED_IMU2.zacc,"
|
||||
"SCALED_IMU2.xgyro,SCALED_IMU2.ygyro,SCALED_IMU2.zgyro,"
|
||||
"SCALED_IMU2.xmag,SCALED_IMU2.ymag,SCALED_IMU2.zmag,"
|
||||
@@ -31,18 +29,7 @@ _HEADER_ROW = (
|
||||
)
|
||||
|
||||
|
||||
def _write_imu_csv(path: Path, rows: list[list]) -> None:
|
||||
"""Write a CSV with the full Derkachi header + the supplied data rows."""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with path.open("w", newline="", encoding="utf-8") as fp:
|
||||
fp.write(_HEADER_ROW + "\n")
|
||||
writer = csv.writer(fp)
|
||||
for row in rows:
|
||||
writer.writerow(row)
|
||||
|
||||
|
||||
def _good_row(ts_ms: float, hdg: int = 35041) -> list:
|
||||
"""One well-formed Derkachi row at `ts_ms` and heading `hdg` cdeg."""
|
||||
return [
|
||||
ts_ms, 0.0,
|
||||
21, -3, -984,
|
||||
@@ -54,110 +41,13 @@ def _good_row(ts_ms: float, hdg: int = 35041) -> list:
|
||||
]
|
||||
|
||||
|
||||
# convert_imu_csv_to_tlog
|
||||
|
||||
|
||||
def test_convert_imu_csv_missing_file_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="IMU CSV not found"):
|
||||
bp02.convert_imu_csv_to_tlog(
|
||||
tmp_path / "missing.csv", tmp_path / "out.tlog",
|
||||
_mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_convert_imu_csv_empty_raises(tmp_path: Path):
|
||||
# Arrange — header only, no data rows
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
_write_imu_csv(csv_path, [])
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="IMU CSV is empty"):
|
||||
bp02.convert_imu_csv_to_tlog(
|
||||
csv_path, tmp_path / "out.tlog",
|
||||
_mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_convert_imu_csv_missing_required_column_raises(tmp_path: Path):
|
||||
# Arrange — header missing `GLOBAL_POSITION_INT.hdg`
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
csv_path.write_text("timestamp(ms),Time\n0,0\n")
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="missing required columns"):
|
||||
bp02.convert_imu_csv_to_tlog(
|
||||
csv_path, tmp_path / "out.tlog",
|
||||
_mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_convert_imu_csv_malformed_numeric_raises(tmp_path: Path):
|
||||
# Arrange — second-to-last value (xacc) is non-numeric
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
row = _good_row(0.0)
|
||||
row[2] = "not-a-number"
|
||||
_write_imu_csv(csv_path, [row])
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="malformed IMU CSV row"):
|
||||
bp02.convert_imu_csv_to_tlog(
|
||||
csv_path, tmp_path / "out.tlog",
|
||||
_mavlink_writer_factory=lambda out: MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
def test_convert_imu_csv_writes_pair_per_row(tmp_path: Path):
|
||||
# Arrange
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
_write_imu_csv(csv_path, [_good_row(0.0), _good_row(100.0), _good_row(200.0)])
|
||||
writer = MagicMock(write=MagicMock(), close=MagicMock())
|
||||
|
||||
# Act
|
||||
pairs = bp02.convert_imu_csv_to_tlog(
|
||||
csv_path, tmp_path / "out.tlog",
|
||||
_mavlink_writer_factory=lambda out: writer,
|
||||
)
|
||||
|
||||
# Assert — 3 rows → 3 pairs → 6 message writes
|
||||
assert pairs == 3
|
||||
assert writer.write.call_count == 6
|
||||
assert writer.close.call_count == 1
|
||||
|
||||
|
||||
def test_convert_imu_csv_real_pymavlink_round_trip(tmp_path: Path):
|
||||
"""Sanity-check the real packers; tlog file is well-formed."""
|
||||
# Arrange
|
||||
csv_path = tmp_path / "imu.csv"
|
||||
_write_imu_csv(csv_path, [_good_row(0.0), _good_row(100.0)])
|
||||
|
||||
# Act — use real pymavlink (it's in pyproject.toml deps)
|
||||
pairs = bp02.convert_imu_csv_to_tlog(csv_path, tmp_path / "out.tlog")
|
||||
|
||||
# Assert
|
||||
assert pairs == 2
|
||||
assert (tmp_path / "out.tlog").stat().st_size > 0
|
||||
|
||||
|
||||
# _hdg_centideg_to_rad
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"centideg,expected_rad",
|
||||
[
|
||||
(0, 0.0),
|
||||
(9000, math.pi / 2),
|
||||
(18000, math.pi),
|
||||
(27000, 3 * math.pi / 2),
|
||||
(35990, 35990 * math.pi / 18000),
|
||||
],
|
||||
)
|
||||
def test_hdg_centideg_to_rad(centideg: int, expected_rad: float):
|
||||
# Assert
|
||||
assert bp02._hdg_centideg_to_rad(centideg) == pytest.approx(expected_rad)
|
||||
|
||||
|
||||
# verify_fdr_has_estimates
|
||||
def _write_imu_csv(path: Path, rows: list[list]) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with path.open("w", newline="", encoding="utf-8") as fp:
|
||||
fp.write(_IMU_HEADER_ROW + "\n")
|
||||
writer = csv.writer(fp)
|
||||
for row in rows:
|
||||
writer.writerow(row)
|
||||
|
||||
|
||||
def _write_jsonl(path: Path, records: list[dict]) -> None:
|
||||
@@ -165,66 +55,54 @@ def _write_jsonl(path: Path, records: list[dict]) -> None:
|
||||
path.write_text("\n".join(json.dumps(r) for r in records))
|
||||
|
||||
|
||||
def test_verify_fdr_missing_file_raises(tmp_path: Path):
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="FDR JSONL not found"):
|
||||
bp02.verify_fdr_has_estimates(tmp_path / "missing.jsonl")
|
||||
# resolve_derkachi_inputs
|
||||
|
||||
|
||||
def test_verify_fdr_no_estimates_raises(tmp_path: Path):
|
||||
def test_resolve_derkachi_inputs_missing_video_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"record_type": "other", "payload": {}},
|
||||
{"record_type": "imu_tick", "payload": {}},
|
||||
])
|
||||
derkachi_dir = tmp_path / "derkachi"
|
||||
derkachi_dir.mkdir()
|
||||
(derkachi_dir / "data_imu.csv").write_text(_IMU_HEADER_ROW + "\n")
|
||||
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="zero estimate records"):
|
||||
bp02.verify_fdr_has_estimates(fdr)
|
||||
with pytest.raises(FileNotFoundError, match="Derkachi MP4 not found"):
|
||||
bp02.resolve_derkachi_inputs(derkachi_dir)
|
||||
|
||||
|
||||
def test_verify_fdr_counts_estimates(tmp_path: Path):
|
||||
def test_resolve_derkachi_inputs_missing_csv_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
_write_jsonl(fdr, [
|
||||
{"record_type": "estimate", "payload": {}},
|
||||
{"record_type": "other", "payload": {}},
|
||||
{"record_type": "estimate", "payload": {}},
|
||||
{"record_type": "estimate", "payload": {}},
|
||||
])
|
||||
derkachi_dir = tmp_path / "derkachi"
|
||||
derkachi_dir.mkdir()
|
||||
(derkachi_dir / "flight_derkachi.mp4").touch()
|
||||
|
||||
# Assert
|
||||
with pytest.raises(FileNotFoundError, match="Derkachi IMU CSV not found"):
|
||||
bp02.resolve_derkachi_inputs(derkachi_dir)
|
||||
|
||||
|
||||
def test_resolve_derkachi_inputs_returns_both(tmp_path: Path):
|
||||
# Arrange
|
||||
derkachi_dir = tmp_path / "derkachi"
|
||||
derkachi_dir.mkdir()
|
||||
(derkachi_dir / "flight_derkachi.mp4").touch()
|
||||
_write_imu_csv(derkachi_dir / "data_imu.csv", [])
|
||||
|
||||
# Act
|
||||
count = bp02.verify_fdr_has_estimates(fdr)
|
||||
mp4, csv_path = bp02.resolve_derkachi_inputs(derkachi_dir)
|
||||
|
||||
# Assert
|
||||
assert count == 3
|
||||
assert mp4 == derkachi_dir / "flight_derkachi.mp4"
|
||||
assert csv_path == derkachi_dir / "data_imu.csv"
|
||||
|
||||
|
||||
def test_verify_fdr_tolerates_malformed_lines(tmp_path: Path):
|
||||
# Arrange — one bad JSON line interleaved with good estimate records
|
||||
fdr = tmp_path / "fdr.jsonl"
|
||||
fdr.write_text(
|
||||
json.dumps({"record_type": "estimate"}) + "\n"
|
||||
+ "{not valid json\n"
|
||||
+ json.dumps({"record_type": "estimate"}) + "\n"
|
||||
)
|
||||
|
||||
# Act
|
||||
count = bp02.verify_fdr_has_estimates(fdr)
|
||||
|
||||
# Assert
|
||||
assert count == 2
|
||||
|
||||
|
||||
# build_p02_fixtures end-to-end (mocked)
|
||||
# build_p02_fixtures end-to-end
|
||||
|
||||
|
||||
def test_build_p02_missing_video_raises(tmp_path: Path):
|
||||
# Arrange
|
||||
derkachi_dir = tmp_path / "derkachi"
|
||||
derkachi_dir.mkdir()
|
||||
(derkachi_dir / "data_imu.csv").write_text(_HEADER_ROW + "\n")
|
||||
(derkachi_dir / "data_imu.csv").write_text(_IMU_HEADER_ROW + "\n")
|
||||
|
||||
cfg = bp02.P02BuilderConfig(
|
||||
derkachi_dir=derkachi_dir, output_dir=tmp_path / "out",
|
||||
@@ -286,7 +164,7 @@ def test_build_p02_end_to_end_with_mocks(tmp_path: Path):
|
||||
|
||||
|
||||
def test_build_p02_propagates_verify_failure(tmp_path: Path):
|
||||
# Arrange — fake runner writes an FDR with no estimates; default verifier raises.
|
||||
# Arrange — runner writes an FDR with no estimates; default RawFdrPassthrough raises
|
||||
derkachi_dir = tmp_path / "derkachi"
|
||||
derkachi_dir.mkdir()
|
||||
(derkachi_dir / "flight_derkachi.mp4").touch()
|
||||
@@ -308,17 +186,3 @@ def test_build_p02_propagates_verify_failure(tmp_path: Path):
|
||||
_runner=fake_runner,
|
||||
_mavlink_writer_factory=lambda out: MagicMock(write=MagicMock(), close=MagicMock()),
|
||||
)
|
||||
|
||||
|
||||
# `_common.py` is shared with b78
|
||||
|
||||
|
||||
def test_common_module_exports_used_by_b01():
|
||||
"""AC-5: b78 builder still imports from _common.py after refactor."""
|
||||
# Arrange
|
||||
import e2e.fixtures.sitl_replay_builder.build_p01_fixtures as bp01
|
||||
import e2e.fixtures.sitl_replay_builder._common as common
|
||||
|
||||
# Assert
|
||||
assert bp01.run_gps_denied_replay is common.run_gps_denied_replay
|
||||
assert bp01.write_observer_fixture is common.write_observer_fixture
|
||||
|
||||
@@ -58,7 +58,7 @@ E2E_ROOT = Path(__file__).resolve().parents[1]
|
||||
"runner/helpers/fc_proxy_runtime.py",
|
||||
"runner/helpers/replay_mode.py",
|
||||
"fixtures/sitl_replay_builder/__init__.py",
|
||||
"fixtures/sitl_replay_builder/_common.py",
|
||||
"fixtures/sitl_replay_builder/builder.py",
|
||||
"fixtures/sitl_replay_builder/build_p01_fixtures.py",
|
||||
"fixtures/sitl_replay_builder/build_p02_fixtures.py",
|
||||
"fixtures/sitl_replay_builder/README.md",
|
||||
|
||||
Reference in New Issue
Block a user