Files
Oleksandr Bezdieniezhnykh 7fb3cb3f34 [AZ-600] Batch 80: refactor sitl_replay_builder to strategy pattern
Replace per-scenario fixture builders with a parameterized strategy
framework so future Derkachi-based scenarios compose existing pieces
instead of duplicating ~200 lines of orchestration per scenario.

New e2e/fixtures/sitl_replay_builder/builder.py:
- VideoSource ABC + StillImagesSource, Mp4PassthroughSource
- TlogSource ABC + SyntheticStationaryTlog, ImuCsvTlog
- FdrProjection ABC + RawFdrPassthrough, OutboundMessagesProjection
- FixtureBuilderConfig + build_fixtures(cfg) orchestrator
- Consolidated MAVLink pack_raw_imu / pack_attitude helpers
- Consolidated run_gps_denied_replay + write_observer_fixture

build_p01_fixtures.py: 423 -> 107 lines (75% reduction).
build_p02_fixtures.py: 292 -> 98 lines (66% reduction).
_common.py: deleted (folded into builder.py).

Tests reorganized:
- test_sitl_replay_builder_builder.py (new, 33 strategy-level tests)
- test_sitl_replay_builder.py (slimmed, 6 FT-P-01 integration)
- test_sitl_replay_builder_p02.py (slimmed, 7 FT-P-02 integration)

README documents the strategy framework + a worked example for
adding FT-P-04 in ~30 lines (no new strategy code required).

Regression gate: 700 passing (was 686; +14 from finer-grained
coverage of new strategy classes and the build_fixtures orchestrator).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 14:19:08 +03:00

619 lines
21 KiB
Python

"""Parameterized fixture-builder framework for SITL replay scenarios (AZ-600).
The per-scenario fixture builders (`build_p01_fixtures.py`,
`build_p02_fixtures.py`, and future FT-P-04/05/07/08/10/11 builders) all
share the same shape:
1. Materialize a video file (MP4) from some source.
2. Materialize a tlog file from some source.
3. Run the production ``gps-denied-replay`` CLI against the pair.
4. Project the resulting FDR JSONL into the scenario's fixture shape.
5. Write the companion ``observer_<fc_kind>_<host>.json``.
Only steps 1, 2, and 4 vary across scenarios; the rest is shared. This
module exposes three strategy ABCs (``VideoSource``, ``TlogSource``,
``FdrProjection``) plus the six concrete implementations (two per strategy)
used by FT-P-01 and FT-P-02, and a single ``build_fixtures(cfg)`` orchestrator
that composes them.
Adding a new scenario typically means writing a ~30-line config factory in
a thin per-scenario module (see ``build_p01_fixtures.py`` /
``build_p02_fixtures.py`` for working examples); no new strategy code is
required unless the scenario has a genuinely new video / tlog / FDR shape.
"""
from __future__ import annotations
import abc
import csv
import json
import logging
import math
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterator, Sequence
# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)
# Executable name launched by ``run_gps_denied_replay`` unless overridden.
DEFAULT_CLI_BIN = "gps-denied-replay"
# Frame rate ``StillImagesSource`` passes to its default video writer.
DEFAULT_FPS = 1.0
# Length (seconds) and sample rate (pairs per second) of ``SyntheticStationaryTlog``.
DEFAULT_TLOG_DURATION_S = 120
DEFAULT_TLOG_HZ = 200
# FDR record ``kind`` selected by ``parse_fdr_for_outbound_estimates``.
DEFAULT_FDR_KIND = "outbound_position_estimate"
# Gravity in mg, used as the stationary z-accel sample (RAW_IMU is in mg).
# NOTE(review): if "mg" means milli-g, 1 g would be 1000; -9810 reads like
# 9.81 m/s^2 expressed in mm/s^2 — confirm the unit the replay consumer expects.
STATIONARY_Z_ACCEL_MG = -9810
# ---------------------------------------------------------------------------
# Subprocess driver + observer-fixture writer (shared by every scenario)
# ---------------------------------------------------------------------------
def run_gps_denied_replay(
    video: Path,
    tlog: Path,
    fdr_out: Path,
    *,
    cli_bin: str = DEFAULT_CLI_BIN,
    time_offset_ms: int | None = 0,
    extra_args: Sequence[str] = (),
    _runner: Callable[[Sequence[str]], subprocess.CompletedProcess] | None = None,
) -> subprocess.CompletedProcess:
    """Run ``gps-denied-replay`` as a subprocess and return the completed process.

    The parent directory of ``fdr_out`` is created before the CLI starts.

    ``time_offset_ms`` defaults to 0 because most synthetic / aligned-input
    scenarios intentionally bypass auto-sync. Pass ``None`` to leave
    ``--time-offset-ms`` off the command line entirely, which is what
    operators running against truly independent tlog+video pairs should do
    so that the production auto-sync runs.

    ``extra_args`` are appended verbatim after ``--fdr-out``.

    Raises ``subprocess.CalledProcessError`` on non-zero exit code; the
    captured stderr (if any) is logged first so the failure is diagnosable.
    The default subprocess runner can be swapped via ``_runner`` for unit
    tests.
    """
    fdr_out.parent.mkdir(parents=True, exist_ok=True)
    cmd: list[str] = [cli_bin, "--video", str(video), "--tlog", str(tlog)]
    if time_offset_ms is not None:
        # Flag position is unchanged relative to the always-emitted form.
        cmd += ["--time-offset-ms", str(time_offset_ms)]
    cmd += ["--fdr-out", str(fdr_out), *extra_args]
    _LOG.info("running: %s", " ".join(cmd))
    runner = _runner or (lambda c: subprocess.run(c, check=True, capture_output=True, text=True))
    try:
        return runner(cmd)
    except subprocess.CalledProcessError as exc:
        # Output is captured by the default runner, so without this the CLI's
        # own error text would never reach the operator.
        _LOG.error(
            "%s exited with code %s; stderr:\n%s",
            cli_bin, exc.returncode, exc.stderr or "<none captured>",
        )
        raise
def write_observer_fixture(output_path: Path) -> None:
    """Write a default ``observer_<fc_kind>_<host>.json`` at ``output_path``.

    Per the scenarios' needs, ``sitl_observer.get_observer(...)`` is
    constructed even when only ``wait_for_outbound`` / ``iter_records`` are
    consumed, so a placeholder document has to exist. Scenarios that depend
    on ``read_gps_state`` are expected to ship their own observer fixture.

    Missing parent directories are created; an existing file is overwritten.
    """
    # Key order matters only for byte-stable output; keep it as declared.
    gps_state = {
        "primary_source": "MAV",
        "last_position_lat_deg": 0.0,
        "last_position_lon_deg": 0.0,
        "last_position_alt_m": 0.0,
        "fix_quality": 3,
        "horizontal_accuracy_m": 1.0,
        "last_update_age_ms": 0,
    }
    document = {"gps_state": gps_state, "parameters": {}}
    serialized = json.dumps(document, indent=2)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(serialized)
# ---------------------------------------------------------------------------
# Parameterized MAVLink packers (shared by every TlogSource)
# ---------------------------------------------------------------------------
def pack_raw_imu(
    time_usec: int,
    *,
    xacc: int = 0,
    yacc: int = 0,
    zacc: int = 0,
    xgyro: int = 0,
    ygyro: int = 0,
    zgyro: int = 0,
) -> bytes:
    """Serialize one RAW_IMU MAVLink frame (msg id 27) and return its bytes.

    Accelerometer and gyro values are forwarded to the message unchanged;
    the magnetometer fields, ``id`` and ``temperature`` are always zero.
    Stationary callers pass ``zacc=STATIONARY_Z_ACCEL_MG`` to encode gravity.

    A fresh encoder (system 1, component 1) is created on every call.
    """
    # Imported lazily so the module loads without pymavlink installed.
    from pymavlink.dialects.v20 import ardupilotmega as dialect

    encoder = dialect.MAVLink(file=None, srcSystem=1, srcComponent=1)
    imu_fields = dict(
        time_usec=time_usec,
        xacc=xacc,
        yacc=yacc,
        zacc=zacc,
        xgyro=xgyro,
        ygyro=ygyro,
        zgyro=zgyro,
        xmag=0,
        ymag=0,
        zmag=0,
        id=0,
        temperature=0,
    )
    frame = dialect.MAVLink_raw_imu_message(**imu_fields)
    return frame.pack(encoder)
def pack_attitude(
    time_boot_ms: int,
    *,
    roll: float = 0.0,
    pitch: float = 0.0,
    yaw: float = 0.0,
) -> bytes:
    """Serialize one ATTITUDE MAVLink frame (msg id 30) and return its bytes.

    ``roll`` / ``pitch`` / ``yaw`` are coerced to ``float``; the three
    angular-rate fields are always zero. A fresh encoder (system 1,
    component 1) is created on every call.
    """
    # Imported lazily so the module loads without pymavlink installed.
    from pymavlink.dialects.v20 import ardupilotmega as dialect

    encoder = dialect.MAVLink(file=None, srcSystem=1, srcComponent=1)
    angles = dict(roll=float(roll), pitch=float(pitch), yaw=float(yaw))
    rates = dict(rollspeed=0.0, pitchspeed=0.0, yawspeed=0.0)
    frame = dialect.MAVLink_attitude_message(time_boot_ms=time_boot_ms, **angles, **rates)
    return frame.pack(encoder)
def _default_mavlink_writer_factory(out: Path):
    """Open ``out`` for writing as a pymavlink ``mavlogfile`` and return it.

    This is the production writer used by the ``TlogSource`` strategies when
    no ``_mavlink_writer_factory`` is injected.
    """
    # Imported lazily so the module loads without pymavlink installed.
    from pymavlink.mavutil import mavlogfile

    target = str(out)
    return mavlogfile(target, write=True)
def hdg_centideg_to_rad(hdg_cdeg: float) -> float:
    """Convert a heading in centidegrees to radians.

    Inputs in [0, 36000) map to [0, 2pi). The value is only scaled, never
    wrapped, so out-of-range inputs yield out-of-range angles.
    """
    scaled = hdg_cdeg * math.pi
    return scaled / 18000.0
# ---------------------------------------------------------------------------
# VideoSource strategy
# ---------------------------------------------------------------------------
class VideoSource(abc.ABC):
    """Strategy interface for producing the MP4 handed to the replay CLI."""

    @abc.abstractmethod
    def materialize(
        self,
        output_path: Path,
        *,
        _video_writer_factory: Callable | None = None,
        _imread: Callable | None = None,
    ) -> Path:
        """Produce the MP4 and return the path the CLI should read.

        An implementation is free to encode a new file at ``output_path``
        and return that path, or to hand back the location of an MP4 that
        already exists and leave ``output_path`` untouched.

        ``_video_writer_factory`` and ``_imread`` are injection points for
        the video writer and image reader (used by unit tests).
        """
@dataclass(frozen=True)
class StillImagesSource(VideoSource):
    """Video strategy that encodes still images, in order, into one MP4.

    ``image_paths`` lists the stills (one written frame per entry) and
    ``fps`` is the frame rate given to the default OpenCV writer.
    """

    image_paths: Sequence[Path]
    fps: float = DEFAULT_FPS

    def materialize(
        self,
        output_path: Path,
        *,
        _video_writer_factory: Callable | None = None,
        _imread: Callable | None = None,
    ) -> Path:
        """Encode every still into ``output_path`` and return ``output_path``.

        The frame size is taken from the first image. ``FileNotFoundError``
        is raised when ``image_paths`` is empty or when the reader returns
        ``None`` for any image. The writer is always released, even when a
        later frame fails to load.
        """
        stills = self.image_paths
        if not stills:
            raise FileNotFoundError(
                "StillImagesSource: image_paths is empty; nothing to encode"
            )

        read_image = _imread
        open_writer = _video_writer_factory
        # OpenCV is imported only when at least one default has to be built.
        if open_writer is None or read_image is None:
            import cv2

            if not read_image:
                read_image = lambda path: cv2.imread(str(path), cv2.IMREAD_COLOR)
            if open_writer is None:
                codec = cv2.VideoWriter_fourcc(*"mp4v")
                frame_rate = self.fps

                def open_writer(out: Path, width: int, height: int):
                    return cv2.VideoWriter(str(out), codec, frame_rate, (width, height))

        def load(path: Path):
            # The reader signals an unreadable image by returning None.
            frame = read_image(path)
            if frame is None:
                raise FileNotFoundError(
                    f"StillImagesSource: failed to read {path}"
                )
            return frame

        # Read the first still before creating anything on disk.
        opening_frame = load(stills[0])
        frame_height, frame_width = opening_frame.shape[:2]
        output_path.parent.mkdir(parents=True, exist_ok=True)
        sink = open_writer(output_path, frame_width, frame_height)
        try:
            sink.write(opening_frame)
            for still in stills[1:]:
                sink.write(load(still))
        finally:
            sink.release()
        return output_path
@dataclass(frozen=True)
class Mp4PassthroughSource(VideoSource):
    """Video strategy that reuses an existing MP4 as-is (no copy, no encode)."""

    mp4_path: Path

    def materialize(self, output_path: Path, **_deps) -> Path:
        """Return ``mp4_path`` after checking it is a regular file.

        ``output_path`` and any injected dependencies are accepted for
        interface compatibility and ignored. Raises ``FileNotFoundError``
        when ``mp4_path`` does not point at a file.
        """
        source = self.mp4_path
        if source.is_file():
            return source
        raise FileNotFoundError(f"Mp4PassthroughSource: MP4 not found: {self.mp4_path}")
# ---------------------------------------------------------------------------
# TlogSource strategy
# ---------------------------------------------------------------------------
class TlogSource(abc.ABC):
    """Strategy interface for producing the tlog handed to the replay CLI."""

    @abc.abstractmethod
    def materialize(
        self,
        output_path: Path,
        *,
        _mavlink_writer_factory: Callable | None = None,
    ) -> Path:
        """Produce the tlog and return the path the CLI should read.

        ``_mavlink_writer_factory`` is the injection point for the tlog
        writer (used by unit tests).
        """
@dataclass(frozen=True)
class SyntheticStationaryTlog(TlogSource):
    """Write a tlog of zero-motion RAW_IMU + ATTITUDE pairs (z-accel = gravity).

    ``duration_s`` is the covered time span in seconds and ``hz`` the number
    of RAW_IMU/ATTITUDE pairs per second; ``duration_s * hz`` pairs are
    written in total.
    """

    duration_s: int = DEFAULT_TLOG_DURATION_S
    hz: int = DEFAULT_TLOG_HZ

    def materialize(
        self,
        output_path: Path,
        *,
        _mavlink_writer_factory: Callable | None = None,
    ) -> Path:
        """Write the synthetic tlog to ``output_path`` and return ``output_path``.

        Each pair is a RAW_IMU frame with ``zacc=STATIONARY_Z_ACCEL_MG`` (all
        other axes zero) followed by an all-zero ATTITUDE frame carrying the
        same instant in milliseconds.

        Raises ``ValueError`` when ``duration_s`` or ``hz`` is not positive.
        The writer's ``close`` (when it has one) is always called.
        """
        if self.duration_s <= 0:
            raise ValueError(f"duration_s must be positive; got {self.duration_s}")
        if self.hz <= 0:
            raise ValueError(f"hz must be positive; got {self.hz}")
        factory = _mavlink_writer_factory or _default_mavlink_writer_factory
        output_path.parent.mkdir(parents=True, exist_ok=True)
        writer = factory(output_path)
        try:
            total_pairs = self.duration_s * self.hz
            for i in range(total_pairs):
                # Derive each timestamp from the sample index instead of a
                # pre-truncated period: int(1e6 / hz) loses the fractional
                # microseconds, so for rates that do not divide 1e6 the error
                # would grow with every sample (and the period would be 0
                # above 1 MHz). Identical to i * period for the default 200 Hz.
                time_us = (i * 1_000_000) // self.hz
                writer.write(pack_raw_imu(time_us, zacc=STATIONARY_Z_ACCEL_MG))
                writer.write(pack_attitude(time_us // 1000))
        finally:
            # Injected test writers may not expose close(); tolerate that.
            close = getattr(writer, "close", None)
            if callable(close):
                close()
        return output_path
@dataclass(frozen=True)
class ImuCsvSchema:
    """Names of the CSV columns an IMU export must provide.

    The defaults describe the Derkachi flight recording; override individual
    fields for CSVs that use different header names.
    """

    # Sample timestamp column.
    timestamp_ms_col: str = "timestamp(ms)"
    # Accelerometer axes.
    xacc_col: str = "SCALED_IMU2.xacc"
    yacc_col: str = "SCALED_IMU2.yacc"
    zacc_col: str = "SCALED_IMU2.zacc"
    # Gyro axes.
    xgyro_col: str = "SCALED_IMU2.xgyro"
    ygyro_col: str = "SCALED_IMU2.ygyro"
    zgyro_col: str = "SCALED_IMU2.zgyro"
    # Heading column.
    hdg_centideg_col: str = "GLOBAL_POSITION_INT.hdg"

    @property
    def required_columns(self) -> tuple[str, ...]:
        """Return every configured column name: timestamp, accel, gyro, heading."""
        attribute_order = (
            "timestamp_ms_col",
            "xacc_col",
            "yacc_col",
            "zacc_col",
            "xgyro_col",
            "ygyro_col",
            "zgyro_col",
            "hdg_centideg_col",
        )
        return tuple(getattr(self, attribute) for attribute in attribute_order)


# Shared default instance (Derkachi column names).
DEFAULT_DERKACHI_IMU_SCHEMA = ImuCsvSchema()
@dataclass(frozen=True)
class ImuCsvTlog(TlogSource):
    """Convert a recorded IMU CSV to a tlog with real RAW_IMU + ATTITUDE values.

    ``csv_path`` is the recording; ``schema`` names the columns to read.
    """

    csv_path: Path
    schema: ImuCsvSchema = DEFAULT_DERKACHI_IMU_SCHEMA

    def materialize(
        self,
        output_path: Path,
        *,
        _mavlink_writer_factory: Callable | None = None,
    ) -> Path:
        """Write one RAW_IMU + ATTITUDE pair per CSV row; return ``output_path``.

        RAW_IMU carries the row's accel/gyro values with the timestamp
        converted from milliseconds to microseconds. ATTITUDE carries the
        timestamp in whole milliseconds and a yaw derived from the heading
        column; roll and pitch keep their zero defaults.

        Raises ``FileNotFoundError`` when the CSV is missing and ``ValueError``
        when it has no header, lacks a required column, has no data rows, or
        contains a row that cannot be converted. The writer's ``close`` (when
        it has one) is always called.
        """
        if not self.csv_path.is_file():
            raise FileNotFoundError(f"IMU CSV not found: {self.csv_path}")
        rows = list(self._iter_rows())
        if not rows:
            raise ValueError(f"IMU CSV is empty: {self.csv_path}")
        factory = _mavlink_writer_factory or _default_mavlink_writer_factory
        output_path.parent.mkdir(parents=True, exist_ok=True)
        writer = factory(output_path)
        try:
            for index, row in enumerate(rows, start=1):
                time_usec, time_boot_ms, imu_values, hdg_cdeg = self._parse_row(row, index)
                writer.write(pack_raw_imu(time_usec, **imu_values))
                writer.write(pack_attitude(time_boot_ms, yaw=hdg_centideg_to_rad(hdg_cdeg)))
        finally:
            # Injected test writers may not expose close(); tolerate that.
            close = getattr(writer, "close", None)
            if callable(close):
                close()
        return output_path

    def _parse_row(
        self, row: dict[str, str], index: int
    ) -> tuple[int, int, dict[str, int], float]:
        """Convert one CSV row to ``(time_usec, time_boot_ms, imu kwargs, heading)``.

        Every conversion failure is re-raised as ``ValueError`` naming the
        file and the 1-based data-row number.
        """
        schema = self.schema
        try:
            ts_ms = float(row[schema.timestamp_ms_col])
            imu_values = {
                "xacc": int(float(row[schema.xacc_col])),
                "yacc": int(float(row[schema.yacc_col])),
                "zacc": int(float(row[schema.zacc_col])),
                "xgyro": int(float(row[schema.xgyro_col])),
                "ygyro": int(float(row[schema.ygyro_col])),
                "zgyro": int(float(row[schema.zgyro_col])),
            }
            hdg_cdeg = float(row[schema.hdg_centideg_col])
            # Converted inside the try so a nan/inf timestamp is reported
            # with row context as well.
            time_usec = int(ts_ms * 1000)
            time_boot_ms = int(ts_ms)
        except (KeyError, TypeError, ValueError, OverflowError) as exc:
            # TypeError: csv.DictReader fills the cells of a short row with
            # None, and float(None) raises TypeError rather than ValueError.
            # OverflowError: int(float("inf")).
            raise ValueError(
                f"malformed IMU CSV row at {self.csv_path} row#{index}: {exc}"
            ) from exc
        return time_usec, time_boot_ms, imu_values, hdg_cdeg

    def _iter_rows(self) -> Iterator[dict[str, str]]:
        """Yield the CSV's data rows after validating the header.

        Raises ``ValueError`` when the header is absent or a required column
        is missing.
        """
        # utf-8-sig decodes plain UTF-8 unchanged but drops a leading BOM,
        # which would otherwise become part of the first header name.
        with self.csv_path.open("r", newline="", encoding="utf-8-sig") as fp:
            reader = csv.DictReader(fp)
            if reader.fieldnames is None:
                raise ValueError(f"IMU CSV missing header: {self.csv_path}")
            missing = [c for c in self.schema.required_columns if c not in reader.fieldnames]
            if missing:
                raise ValueError(
                    f"IMU CSV {self.csv_path} missing required columns: {missing}"
                )
            yield from reader
# ---------------------------------------------------------------------------
# FdrProjection strategy
# ---------------------------------------------------------------------------
class FdrProjection(abc.ABC):
    """Strategy interface for turning the FDR JSONL into scenario fixtures."""

    @abc.abstractmethod
    def materialize(
        self,
        fdr_jsonl: Path,
        output_dir: Path,
        fc_kind: str,
        host: str,
    ) -> None:
        """Consume ``fdr_jsonl`` and emit whatever artifacts the scenario needs.

        ``output_dir`` is where artifacts go; ``fc_kind`` and ``host`` are
        available for building fixture file names.
        """
@dataclass(frozen=True)
class RawFdrPassthrough(FdrProjection):
    """Projection that keeps the FDR archive untouched.

    With ``verify_estimates`` enabled (the default) the archive must hold at
    least one estimate record.
    """

    verify_estimates: bool = True

    def materialize(self, fdr_jsonl: Path, output_dir: Path, fc_kind: str, host: str) -> None:
        """Optionally check ``fdr_jsonl`` for estimate records; write nothing.

        ``output_dir``, ``fc_kind`` and ``host`` are unused. Errors raised by
        ``verify_fdr_has_estimates`` propagate to the caller.
        """
        if self.verify_estimates:
            n_estimates = verify_fdr_has_estimates(fdr_jsonl)
            _LOG.info("FDR archive %s contains %d estimate records", fdr_jsonl, n_estimates)
@dataclass(frozen=True)
class OutboundMessagesProjection(FdrProjection):
    """Projection that turns FDR outbound estimates into ``outbound_messages_*.json``.

    ``image_ids`` fixes the number and order of fixture entries.
    ``fdr_kind`` selects which FDR records count; ``lat_key`` / ``lon_key``
    name the coordinate fields inside each record's payload.
    """

    image_ids: Sequence[str] = field(default_factory=tuple)
    fdr_kind: str = DEFAULT_FDR_KIND
    lat_key: str = "lat_deg"
    lon_key: str = "lon_deg"

    def materialize(self, fdr_jsonl: Path, output_dir: Path, fc_kind: str, host: str) -> None:
        """Write ``outbound_messages_<fc_kind>_<host>.json`` into ``output_dir``.

        Estimates are paired with ``image_ids`` by position. Surplus
        estimates are dropped with a warning; missing ones are filled with
        ``None``.
        """
        parsed = parse_fdr_for_outbound_estimates(
            fdr_jsonl,
            fdr_kind=self.fdr_kind,
            lat_key=self.lat_key,
            lon_key=self.lon_key,
        )
        frame_count = len(self.image_ids)
        if len(parsed) > frame_count:
            _LOG.warning(
                "FDR carried %d outbound estimates but only %d images were pushed; "
                "truncating to the per-frame count",
                len(parsed), frame_count,
            )
        aligned: list[dict | None] = list(parsed[:frame_count])
        # Pad so every image id has an entry (None marks "no estimate").
        aligned.extend([None] * (frame_count - len(aligned)))
        destination = output_dir / f"outbound_messages_{fc_kind}_{host}.json"
        _write_outbound_messages_fixture(destination, self.image_ids, aligned)
def parse_fdr_for_outbound_estimates(
    fdr_path: Path,
    *,
    fdr_kind: str = DEFAULT_FDR_KIND,
    lat_key: str = "lat_deg",
    lon_key: str = "lon_deg",
) -> list[dict]:
    """Walk ``fdr_path`` (JSONL) and return outbound-estimate payloads in order.

    A line contributes an entry when it is a JSON object whose ``kind``
    equals ``fdr_kind`` and whose ``payload`` is an object containing both
    ``lat_key`` and ``lon_key``. Each entry is
    ``{"lat_deg": float, "lon_deg": float}`` regardless of the source key
    names. Blank lines and non-matching records are skipped.

    Raises ``FileNotFoundError`` when ``fdr_path`` is not a file and
    ``ValueError`` (with file and line number) when a line is not valid JSON
    or a selected record's coordinates are not numeric.
    """
    if not fdr_path.is_file():
        raise FileNotFoundError(f"FDR JSONL not found: {fdr_path}")
    out: list[dict] = []
    with fdr_path.open("r", encoding="utf-8") as fp:
        for line_no, line in enumerate(fp, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(
                    f"malformed FDR JSON at {fdr_path}:{line_no}: {exc.msg}"
                ) from exc
            # A valid JSON line need not be an object (e.g. ``[]`` or ``42``);
            # such a line has no ``kind`` and is skipped like any other
            # non-matching record instead of crashing on ``.get``.
            if not isinstance(record, dict) or record.get("kind") != fdr_kind:
                continue
            payload = record.get("payload", {})
            if not isinstance(payload, dict):
                continue
            if lat_key not in payload or lon_key not in payload:
                continue
            try:
                estimate = {
                    "lat_deg": float(payload[lat_key]),
                    "lon_deg": float(payload[lon_key]),
                }
            except (TypeError, ValueError) as exc:
                # null or non-numeric coordinates: report where they came from.
                raise ValueError(
                    f"malformed FDR estimate at {fdr_path}:{line_no}: {exc}"
                ) from exc
            out.append(estimate)
    return out
def verify_fdr_has_estimates(fdr_path: Path) -> int:
    """Return the count of ``record_type == "estimate"`` records in ``fdr_path``.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped without error.

    Raises ``FileNotFoundError`` when ``fdr_path`` is not a file, and
    ``ValueError`` if the file has zero such records — that means the replay
    produced nothing useful for the scenario to analyze.
    """
    if not fdr_path.is_file():
        raise FileNotFoundError(f"FDR JSONL not found: {fdr_path}")
    count = 0
    with fdr_path.open("r", encoding="utf-8") as fp:
        for line in fp:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # Best-effort count: an unparseable line is simply not an estimate.
                continue
            # json.loads can return a list/number/string; only objects can
            # carry ``record_type``, and calling ``.get`` on anything else
            # would raise AttributeError.
            if isinstance(record, dict) and record.get("record_type") == "estimate":
                count += 1
    if count == 0:
        raise ValueError(
            f"FDR archive {fdr_path} contains zero estimate records; "
            f"the replay did not produce any outbound estimates for the scenario to analyze"
        )
    return count
def _write_outbound_messages_fixture(
output_path: Path,
image_ids: Sequence[str],
estimates: Sequence[dict | None],
) -> None:
"""Write ``outbound_messages_<fc_kind>_<host>.json``.
``image_ids`` and ``estimates`` must have the same length. ``None``
entries in ``estimates`` are persisted as JSON ``null`` (timeout
markers); other entries must carry ``lat_deg``/``lon_deg``.
"""
if len(image_ids) != len(estimates):
raise ValueError(
f"length mismatch: {len(image_ids)} image_ids vs {len(estimates)} estimates"
)
messages: list[dict | None] = []
for image_id, estimate in zip(image_ids, estimates):
if estimate is None:
messages.append(None)
continue
messages.append({
"image_id": image_id,
"lat_deg": float(estimate["lat_deg"]),
"lon_deg": float(estimate["lon_deg"]),
})
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps({"messages": messages}, indent=2))
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class FixtureBuilderConfig:
    """Immutable description of one ``build_fixtures`` run."""

    # --- strategies (required) -------------------------------------------
    video_source: VideoSource  # materializes the MP4
    tlog_source: TlogSource  # materializes the tlog
    fdr_projection: FdrProjection  # post-processes the FDR JSONL
    # --- destination (required) ------------------------------------------
    output_dir: Path  # directory all artifacts are placed under
    # --- naming ----------------------------------------------------------
    fc_kind: str = "ardupilot"  # part of the fixture file names
    host: str = "sitl-host"  # part of the fixture file names
    # --- replay CLI ------------------------------------------------------
    cli_bin: str = DEFAULT_CLI_BIN  # executable to launch
    # --- artifact locations, relative to ``output_dir`` -------------------
    video_filename: str = "video.mp4"
    tlog_filename: str = "telemetry.tlog"
    fdr_subdir: str = "fdr"
    fdr_filename: str = "fdr.jsonl"
    # --- sync ------------------------------------------------------------
    time_offset_ms: int = 0  # forwarded to the CLI's --time-offset-ms
def build_fixtures(
    cfg: FixtureBuilderConfig,
    *,
    _runner: Callable[[Sequence[str]], subprocess.CompletedProcess] | None = None,
    _video_writer_factory: Callable | None = None,
    _imread: Callable | None = None,
    _mavlink_writer_factory: Callable | None = None,
) -> Path:
    """Build every fixture for one scenario and return ``cfg.output_dir``.

    The pipeline, in order:

    1. ``cfg.video_source`` materializes the MP4.
    2. ``cfg.tlog_source`` materializes the tlog.
    3. The ``gps-denied-replay`` CLI runs against that pair and writes the
       FDR JSONL under ``cfg.fdr_subdir``.
    4. ``cfg.fdr_projection`` translates the FDR JSONL.
    5. The companion observer fixture is written.

    The underscore-prefixed keyword arguments are forwarded to the matching
    step so unit tests can replace the subprocess runner, video writer,
    image reader and tlog writer. Exceptions from any step propagate and
    stop the pipeline.
    """
    root = cfg.output_dir
    root.mkdir(parents=True, exist_ok=True)

    # Destinations under the output directory. A source may return a
    # different path from ``materialize`` (e.g. a passthrough), so the CLI is
    # pointed at the returned paths rather than at these requested ones.
    requested_video = root / cfg.video_filename
    requested_tlog = root / cfg.tlog_filename
    fdr_jsonl = root / cfg.fdr_subdir / cfg.fdr_filename
    observer_json = root / f"observer_{cfg.fc_kind}_{cfg.host}.json"

    video = cfg.video_source.materialize(
        requested_video,
        _video_writer_factory=_video_writer_factory,
        _imread=_imread,
    )
    tlog = cfg.tlog_source.materialize(
        requested_tlog,
        _mavlink_writer_factory=_mavlink_writer_factory,
    )
    run_gps_denied_replay(
        video,
        tlog,
        fdr_jsonl,
        cli_bin=cfg.cli_bin,
        time_offset_ms=cfg.time_offset_ms,
        _runner=_runner,
    )
    cfg.fdr_projection.materialize(fdr_jsonl, root, cfg.fc_kind, cfg.host)
    write_observer_fixture(observer_json)
    return root