Files
Oleksandr Bezdieniezhnykh 1d260f7e41 [AZ-594] Implement core-three harness stubs (fdr_reader, frame_source_replay, imu_replay)
Replaces the NotImplementedError stubs AZ-406 reserved on three runner-
side helpers; these were stranded from any tracker ticket since
AZ-407/408 never came back to fill them. Concrete bodies:

* fdr_reader.iter_records: JSONL parser + wire-envelope validator;
  recursive *.jsonl walk; projects {schema_version, ts, producer_id,
  kind, payload} to runner-side FdrRecord with record_type/monotonic_ms
  renames; yields oldest-first.
* frame_source_replay.replay_video: OpenCV VideoCapture decode + JPEG
  re-encode; auto-detects file vs directory; injectable sleep_fn for
  unit-test pacing.
* imu_replay.ImuReplayer.replay: csv.DictReader parse; degrees->radians
  attitude conversion; tolerates scientific notation; same sleep_fn
  injection pattern.

Adds 34 unit tests (14 + 10 + 10). Full e2e unit suite: 558 passed (+31).
Existing scenario _harness_helpers_implemented probes still return False
because they also depend on sitl_observer / fc_proxy_runtime stubs that
remain pending; scenario probe cleanup is out of AZ-594 scope.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 08:42:12 +03:00

111 lines
3.6 KiB
Python

"""Replay `data_imu.csv` to the FC inbound at 10 Hz.
CSV schema (from `_docs/00_problem/input_data/flight_derkachi/data_imu.csv`):
timestamp_ms,ax,ay,az,gx,gy,gz,roll_deg,pitch_deg,yaw_deg,baro_m
Numeric fields are accepted in any float-parseable form, including
scientific notation (``-4.44E-16``) — the AZ-408 source CSV uses that
form for near-zero values.
"""
from __future__ import annotations
import csv
import math
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Protocol
_REQUIRED_COLUMNS = (
"timestamp_ms",
"ax", "ay", "az",
"gx", "gy", "gz",
"roll_deg", "pitch_deg", "yaw_deg",
"baro_m",
)
@dataclass(frozen=True)
class ImuSample:
"""One row of `data_imu.csv` after parsing into native units."""
timestamp_ms: int
accel_mss: tuple[float, float, float]
gyro_rps: tuple[float, float, float]
attitude_rad: tuple[float, float, float] # roll, pitch, yaw (radians)
baro_alt_m: float
class FcInboundEmitter(Protocol):
"""Abstract emitter — concrete impls are MAVLink (AP) or MSP2 (iNav)."""
def emit(self, sample: ImuSample) -> None:
...
def _parse_row(row: dict[str, str], source: Path, line_no: int) -> ImuSample:
try:
return ImuSample(
timestamp_ms=int(round(float(row["timestamp_ms"]))),
accel_mss=(float(row["ax"]), float(row["ay"]), float(row["az"])),
gyro_rps=(float(row["gx"]), float(row["gy"]), float(row["gz"])),
attitude_rad=(
math.radians(float(row["roll_deg"])),
math.radians(float(row["pitch_deg"])),
math.radians(float(row["yaw_deg"])),
),
baro_alt_m=float(row["baro_m"]),
)
except (KeyError, ValueError) as exc:
raise ValueError(
f"IMU CSV row malformed at {source}:{line_no}: {exc}"
) from exc
class ImuReplayer:
"""Drives an `FcInboundEmitter` from a CSV file at the recorded cadence."""
def __init__(
self,
emitter: FcInboundEmitter,
rate_hz: float = 10.0,
*,
sleep_fn: Callable[[float], None] = time.sleep,
realtime: bool = True,
) -> None:
if rate_hz <= 0:
raise ValueError(f"rate_hz must be positive; got {rate_hz}")
self._emitter = emitter
self._rate_hz = rate_hz
self._sleep = sleep_fn
self._realtime = realtime
def replay(self, csv_path: Path) -> int:
"""Replay the CSV file. Returns the number of samples emitted.
Raises ``FileNotFoundError`` on missing CSV. Raises ``ValueError``
on missing columns or a row that does not parse. When ``realtime``
is True (default), sleeps ``1 / rate_hz`` seconds between
emissions; tests should pass ``realtime=False`` or inject a
no-op ``sleep_fn`` to keep the unit suite fast.
"""
if not csv_path.exists():
raise FileNotFoundError(f"IMU CSV not found: {csv_path}")
emitted = 0
period_s = 1.0 / self._rate_hz
with csv_path.open() as fh:
reader = csv.DictReader(fh)
missing = [c for c in _REQUIRED_COLUMNS if c not in (reader.fieldnames or [])]
if missing:
raise ValueError(
f"IMU CSV {csv_path} missing required columns: {missing}"
)
for line_no, row in enumerate(reader, start=2): # +1 for header line
sample = _parse_row(row, csv_path, line_no)
self._emitter.emit(sample)
emitted += 1
if self._realtime:
self._sleep(period_s)
return emitted