Files
Oleksandr Bezdieniezhnykh 7fb3cb3f34 [AZ-600] Batch 80: refactor sitl_replay_builder to strategy pattern
Replace per-scenario fixture builders with a parameterized strategy
framework so future Derkachi-based scenarios compose existing pieces
instead of duplicating ~200 lines of orchestration per scenario.

New e2e/fixtures/sitl_replay_builder/builder.py:
- VideoSource ABC + StillImagesSource, Mp4PassthroughSource
- TlogSource ABC + SyntheticStationaryTlog, ImuCsvTlog
- FdrProjection ABC + RawFdrPassthrough, OutboundMessagesProjection
- FixtureBuilderConfig + build_fixtures(cfg) orchestrator
- Consolidated MAVLink pack_raw_imu / pack_attitude helpers
- Consolidated run_gps_denied_replay + write_observer_fixture

build_p01_fixtures.py: 423 -> 107 lines (75% reduction).
build_p02_fixtures.py: 292 -> 98 lines (66% reduction).
_common.py: deleted (folded into builder.py).

Tests reorganized:
- test_sitl_replay_builder_builder.py (new, 33 strategy-level tests)
- test_sitl_replay_builder.py (slimmed, 6 FT-P-01 integration)
- test_sitl_replay_builder_p02.py (slimmed, 7 FT-P-02 integration)

README documents the strategy framework + a worked example for
adding FT-P-04 in ~30 lines (no new strategy code required).

Regression gate: 700 passing (was 686; +14 from finer-grained
coverage of new strategy classes and the build_fixtures orchestrator).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 14:19:08 +03:00

107 lines
3.8 KiB
Python

"""FT-P-02 Derkachi fixture builder (AZ-599; refactored to strategy pattern in AZ-600).
Composes the parameterized fixture-builder framework
(``e2e.fixtures.sitl_replay_builder.builder``) into the FT-P-02 scenario:
* Video source: pass-through of the recorded ``flight_derkachi.mp4``.
* Tlog source: real-motion tlog converted from ``data_imu.csv`` rows
(10 Hz ``SCALED_IMU2`` accel/gyro + ``GLOBAL_POSITION_INT.hdg`` yaw;
roll/pitch=0 fixed-wing-cruise approximation).
* FDR projection: raw passthrough + assert ≥1 ``record_type=="estimate"``
record (the FT-P-02 scenario walks the FDR via ``fdr_reader.iter_records``).
This module is intentionally thin — strategy implementations + the
orchestrator live in ``builder.py``.
"""
from __future__ import annotations
import argparse
import logging
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Sequence
from e2e.fixtures.sitl_replay_builder.builder import (
DEFAULT_CLI_BIN,
FixtureBuilderConfig,
ImuCsvTlog,
Mp4PassthroughSource,
RawFdrPassthrough,
build_fixtures,
)
@dataclass(frozen=True)
class P02BuilderConfig:
"""Per-invocation Derkachi builder configuration."""
derkachi_dir: Path
output_dir: Path
fc_kind: str = "ardupilot"
host: str = "sitl-host"
cli_bin: str = DEFAULT_CLI_BIN
def resolve_derkachi_inputs(derkachi_dir: Path) -> tuple[Path, Path]:
"""Return ``(mp4_path, imu_csv_path)`` under ``derkachi_dir`` or raise."""
mp4 = derkachi_dir / "flight_derkachi.mp4"
csv_path = derkachi_dir / "data_imu.csv"
if not mp4.is_file():
raise FileNotFoundError(f"Derkachi MP4 not found: {mp4}")
if not csv_path.is_file():
raise FileNotFoundError(f"Derkachi IMU CSV not found: {csv_path}")
return mp4, csv_path
def build_p02_fixtures(
cfg: P02BuilderConfig,
*,
_runner: Callable[[Sequence[str]], subprocess.CompletedProcess] | None = None,
_mavlink_writer_factory: Callable | None = None,
) -> Path:
"""End-to-end FT-P-02 fixture build. Returns the output directory."""
mp4, csv_path = resolve_derkachi_inputs(cfg.derkachi_dir)
builder_cfg = FixtureBuilderConfig(
video_source=Mp4PassthroughSource(mp4_path=mp4),
tlog_source=ImuCsvTlog(csv_path=csv_path),
fdr_projection=RawFdrPassthrough(verify_estimates=True),
output_dir=cfg.output_dir,
fc_kind=cfg.fc_kind, host=cfg.host, cli_bin=cfg.cli_bin,
video_filename="video_unused.mp4", # Mp4PassthroughSource returns mp4 directly
tlog_filename="derkachi.tlog",
fdr_subdir="fdr", fdr_filename="fdr.jsonl",
)
return build_fixtures(
builder_cfg, _runner=_runner, _mavlink_writer_factory=_mavlink_writer_factory,
)
def _main(argv: Sequence[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="build_p02_fixtures",
description="Build FT-P-02 Derkachi replay fixtures via gps-denied-replay.",
)
parser.add_argument("--derkachi-dir", type=Path, required=True,
help="Directory containing flight_derkachi.mp4 + data_imu.csv")
parser.add_argument("--output-dir", type=Path, required=True,
help="Output dir for derkachi.tlog + fdr/ archive + observer fixture")
parser.add_argument("--fc-kind", choices=("ardupilot", "inav"), default="ardupilot")
parser.add_argument("--host", default="sitl-host")
parser.add_argument("--cli-bin", default=DEFAULT_CLI_BIN)
args = parser.parse_args(argv)
logging.basicConfig(level=logging.INFO)
cfg = P02BuilderConfig(
derkachi_dir=args.derkachi_dir, output_dir=args.output_dir,
fc_kind=args.fc_kind, host=args.host, cli_bin=args.cli_bin,
)
build_p02_fixtures(cfg)
return 0
if __name__ == "__main__": # pragma: no cover
sys.exit(_main())