Files
Oleksandr Bezdieniezhnykh 7fb3cb3f34 [AZ-600] Batch 80: refactor sitl_replay_builder to strategy pattern
Replace per-scenario fixture builders with a parameterized strategy
framework so future Derkachi-based scenarios compose existing pieces
instead of duplicating ~200 lines of orchestration per scenario.

New e2e/fixtures/sitl_replay_builder/builder.py:
- VideoSource ABC + StillImagesSource, Mp4PassthroughSource
- TlogSource ABC + SyntheticStationaryTlog, ImuCsvTlog
- FdrProjection ABC + RawFdrPassthrough, OutboundMessagesProjection
- FixtureBuilderConfig + build_fixtures(cfg) orchestrator
- Consolidated MAVLink pack_raw_imu / pack_attitude helpers
- Consolidated run_gps_denied_replay + write_observer_fixture

build_p01_fixtures.py: 423 -> 107 lines (75% reduction).
build_p02_fixtures.py: 292 -> 98 lines (66% reduction).
_common.py: deleted (folded into builder.py).

Tests reorganized:
- test_sitl_replay_builder_builder.py (new, 33 strategy-level tests)
- test_sitl_replay_builder.py (slimmed, 6 FT-P-01 integration)
- test_sitl_replay_builder_p02.py (slimmed, 7 FT-P-02 integration)

README documents the strategy framework + a worked example for
adding FT-P-04 in ~30 lines (no new strategy code required).

Regression gate: 700 passing (was 686; +14 from finer-grained
coverage of new strategy classes and the build_fixtures orchestrator).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 14:19:08 +03:00

115 lines
4.1 KiB
Python

"""FT-P-01 fixture builder (AZ-598; refactored to strategy pattern in AZ-600).
Composes the parameterized fixture-builder framework
(``e2e.fixtures.sitl_replay_builder.builder``) into the FT-P-01 scenario:
* Video source: 60 ``AD000NN.jpg`` still images encoded at ``fps``.
* Tlog source: synthetic stationary RAW_IMU + ATTITUDE pairs.
* FDR projection: parse ``outbound_position_estimate`` records and write
``outbound_messages_<fc_kind>_<host>.json`` (the FT-P-01 fixture shape).
This module is intentionally thin — strategy implementations + the
orchestrator live in ``builder.py``. Adding a new scenario typically only
requires writing a similar ~60-line config factory + CLI module.
"""
from __future__ import annotations
import argparse
import logging
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Sequence
from e2e.fixtures.sitl_replay_builder.builder import (
DEFAULT_CLI_BIN,
DEFAULT_FPS,
DEFAULT_TLOG_DURATION_S,
DEFAULT_TLOG_HZ,
FixtureBuilderConfig,
OutboundMessagesProjection,
StillImagesSource,
SyntheticStationaryTlog,
build_fixtures,
)
@dataclass(frozen=True)
class BuilderConfig:
"""Per-invocation FT-P-01 builder configuration."""
input_dir: Path
output_dir: Path
fc_kind: str
host: str
fps: float = DEFAULT_FPS
tlog_duration_s: int = DEFAULT_TLOG_DURATION_S
tlog_hz: int = DEFAULT_TLOG_HZ
cli_bin: str = DEFAULT_CLI_BIN
def resolve_p01_image_paths(input_dir: Path) -> list[Path]:
"""Return the ``AD000NN.jpg`` images under ``input_dir``, sorted by name."""
if not input_dir.is_dir():
raise FileNotFoundError(f"input dir not found: {input_dir}")
return sorted(input_dir.glob("AD??????.jpg"))
def build_p01_fixtures(
cfg: BuilderConfig,
*,
_runner: Callable[[Sequence[str]], subprocess.CompletedProcess] | None = None,
_video_writer_factory: Callable | None = None,
_imread: Callable | None = None,
_mavlink_writer_factory: Callable | None = None,
) -> Path:
"""End-to-end FT-P-01 fixture build. Returns the output directory."""
image_paths = resolve_p01_image_paths(cfg.input_dir)
if not image_paths:
raise FileNotFoundError(f"no AD??????.jpg images found under {cfg.input_dir}")
builder_cfg = FixtureBuilderConfig(
video_source=StillImagesSource(image_paths=image_paths, fps=cfg.fps),
tlog_source=SyntheticStationaryTlog(duration_s=cfg.tlog_duration_s, hz=cfg.tlog_hz),
fdr_projection=OutboundMessagesProjection(image_ids=[p.name for p in image_paths]),
output_dir=cfg.output_dir,
fc_kind=cfg.fc_kind, host=cfg.host, cli_bin=cfg.cli_bin,
video_filename="stills.mp4", tlog_filename="stationary.tlog",
fdr_subdir=".", fdr_filename="fdr.jsonl",
)
return build_fixtures(
builder_cfg,
_runner=_runner, _video_writer_factory=_video_writer_factory,
_imread=_imread, _mavlink_writer_factory=_mavlink_writer_factory,
)
def _main(argv: Sequence[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="build_p01_fixtures",
description="Build FT-P-01 SITL replay fixtures via gps-denied-replay.",
)
parser.add_argument("--input-dir", type=Path, required=True,
help="Directory containing AD000001..AD000060.jpg")
parser.add_argument("--output-dir", type=Path, required=True,
help="Output dir for stills.mp4 + stationary.tlog + fixtures")
parser.add_argument("--fc-kind", choices=("ardupilot", "inav"), default="ardupilot")
parser.add_argument("--host", default="sitl-host")
parser.add_argument("--fps", type=float, default=DEFAULT_FPS)
parser.add_argument("--cli-bin", default=DEFAULT_CLI_BIN)
args = parser.parse_args(argv)
logging.basicConfig(level=logging.INFO)
cfg = BuilderConfig(
input_dir=args.input_dir, output_dir=args.output_dir,
fc_kind=args.fc_kind, host=args.host, fps=args.fps, cli_bin=args.cli_bin,
)
build_p01_fixtures(cfg)
return 0
if __name__ == "__main__": # pragma: no cover
sys.exit(_main())