Files
gps-denied-onboard/e2e/fixtures/sitl_replay_builder/_common.py
T
Oleksandr Bezdieniezhnykh 4e0717e543 [AZ-599] Batch 79: FT-P-02 Derkachi builder + _common.py extraction
- Add build_p02_fixtures.py: IMU CSV → tlog conversion (RAW_IMU +
  ATTITUDE pairs, centidegrees→radians yaw) and orchestrator that
  runs gps-denied replay against Derkachi MP4 + generated tlog,
  verifying ≥1 record_type="estimate" in the FDR archive.
- Extract run_gps_denied_replay + FDR-parent-dir helpers into
  sitl_replay_builder/_common.py; refactor build_p01_fixtures.py
  to import from _common (b78 tests preserved).
- Add 20 unit tests under e2e/_unit_tests/fixtures/test_sitl_
  replay_builder_p02.py covering AC-1..AC-5; total unit suite
  686/686 passing (regression gate AC-6).
- README updated to document FT-P-01 + FT-P-02 builders.
- Advance autodev state: last_completed_batch=79, current_batch=80;
  prune verbose detail blob.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 13:40:07 +03:00

86 lines
2.8 KiB
Python

"""Shared helpers for the per-scenario fixture builders (AZ-599).
Both `build_p01_fixtures.py` (still-image FT-P-01) and
`build_p02_fixtures.py` (Derkachi FT-P-02) shell out to the production
`gps-denied-replay` CLI and write the same minimal `observer_*.json`
config; the helpers below live here so there's one canonical
implementation.
Future per-scenario builders (FT-P-04 / FT-P-05 / FT-P-10 / …) should
also import from this module.
"""
from __future__ import annotations
import json
import logging
import subprocess
from pathlib import Path
from typing import Callable, Sequence
_LOG = logging.getLogger(__name__)
DEFAULT_CLI_BIN = "gps-denied-replay"
def run_gps_denied_replay(
video: Path,
tlog: Path,
fdr_out: Path,
*,
cli_bin: str = DEFAULT_CLI_BIN,
time_offset_ms: int = 0,
extra_args: Sequence[str] = (),
_runner: Callable[[Sequence[str]], subprocess.CompletedProcess] | None = None,
) -> subprocess.CompletedProcess:
"""Run ``gps-denied-replay`` as a subprocess.
The `time_offset_ms` defaults to 0 because both b78 (synthetic
stationary tlog) and b79 (Derkachi real-motion tlog) intentionally
bypass auto-sync — b78 because there's no take-off signal, b79
because the IMU CSV is already aligned with the video. Operators
running this against truly independent tlog+video pairs SHOULD
omit ``time_offset_ms`` and let the production auto-sync run.
Raises ``subprocess.CalledProcessError`` on non-zero exit code.
The default subprocess runner can be swapped via ``_runner`` for
unit tests.
"""
fdr_out.parent.mkdir(parents=True, exist_ok=True)
cmd: list[str] = [
cli_bin,
"--video", str(video),
"--tlog", str(tlog),
"--time-offset-ms", str(time_offset_ms),
"--fdr-out", str(fdr_out),
*extra_args,
]
_LOG.info("running: %s", " ".join(cmd))
runner = _runner or (lambda c: subprocess.run(c, check=True, capture_output=True, text=True))
return runner(cmd)
def write_observer_fixture(output_path: Path) -> None:
    """Emit a minimal `observer_<fc_kind>_<host>.json` at ``output_path``.

    Even scenarios that only use `wait_for_outbound` or `iter_records`
    go through `sitl_observer.get_observer(...)` during construction, so
    a fixture file has to exist. The file written here holds safe
    placeholder values only; scenarios that depend on `read_gps_state`
    are expected to supply their own observer fixtures.

    Missing parent directories of ``output_path`` are created. The file
    content is the JSON document serialized with a two-space indent.
    """
    gps_defaults = {
        "primary_source": "MAV",
        "last_position_lat_deg": 0.0,
        "last_position_lon_deg": 0.0,
        "last_position_alt_m": 0.0,
        "fix_quality": 3,
        "horizontal_accuracy_m": 1.0,
        "last_update_age_ms": 0,
    }
    document = json.dumps({"gps_state": gps_defaults, "parameters": {}}, indent=2)

    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    output_path.write_text(document)