mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:01:13 +00:00
1d260f7e41
Replaces the NotImplementedError stubs AZ-406 reserved on three runner-
side helpers; these were stranded from any tracker ticket since
AZ-407/408 never came back to fill them. Concrete bodies:
* fdr_reader.iter_records: JSONL parser + wire-envelope validator;
recursive *.jsonl walk; projects {schema_version, ts, producer_id,
kind, payload} to runner-side FdrRecord with record_type/monotonic_ms
renames; yields oldest-first.
* frame_source_replay.replay_video: OpenCV VideoCapture decode + JPEG
re-encode; auto-detects file vs directory; injectable sleep_fn for
unit-test pacing.
* imu_replay.ImuReplayer.replay: csv.DictReader parse; degrees->radians
attitude conversion; tolerates scientific notation; same sleep_fn
injection pattern.
Adds 34 unit tests (14 + 10 + 10). Full e2e unit suite: 558 passed (+31).
Existing scenario _harness_helpers_implemented probes still return False
because they also depend on sitl_observer / fc_proxy_runtime stubs that
remain pending; scenario probe cleanup is out of AZ-594 scope.
Co-authored-by: Cursor <cursoragent@cursor.com>
111 lines
3.6 KiB
Python
111 lines
3.6 KiB
Python
"""Replay `data_imu.csv` to the FC inbound at 10 Hz.
|
|
|
|
CSV schema (from `_docs/00_problem/input_data/flight_derkachi/data_imu.csv`):
|
|
timestamp_ms,ax,ay,az,gx,gy,gz,roll_deg,pitch_deg,yaw_deg,baro_m
|
|
|
|
Numeric fields are accepted in any float-parseable form, including
|
|
scientific notation (``-4.44E-16``) — the AZ-408 source CSV uses that
|
|
form for near-zero values.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import math
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Callable, Protocol
|
|
|
|
_REQUIRED_COLUMNS = (
|
|
"timestamp_ms",
|
|
"ax", "ay", "az",
|
|
"gx", "gy", "gz",
|
|
"roll_deg", "pitch_deg", "yaw_deg",
|
|
"baro_m",
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
class ImuSample:
    """One row of `data_imu.csv` after parsing into native Python values.

    Instances are immutable (``frozen=True``). Field order is part of the
    interface because the dataclass can be constructed positionally.
    """

    # Source column `timestamp_ms`, stored as an integer.
    timestamp_ms: int
    # Source columns (ax, ay, az), in that order.
    # NOTE(review): the field name suggests m/s^2 — confirm against the CSV producer.
    accel_mss: tuple[float, float, float]
    # Source columns (gx, gy, gz), in that order.
    # NOTE(review): the field name suggests rad/s — confirm against the CSV producer.
    gyro_rps: tuple[float, float, float]
    # (roll, pitch, yaw) in radians; the CSV stores these as *_deg columns.
    attitude_rad: tuple[float, float, float]
    # Source column `baro_m`.
    baro_alt_m: float
|
|
|
|
|
|
class FcInboundEmitter(Protocol):
    """Abstract emitter — concrete impls are MAVLink (AP) or MSP2 (iNav).

    Structural (duck-typed) interface: any object exposing a compatible
    ``emit`` method satisfies it; no inheritance is required.
    """

    def emit(self, sample: ImuSample) -> None:
        """Forward one parsed IMU sample to the flight-controller inbound."""
        ...
|
|
|
|
|
|
def _parse_row(row: dict[str, str], source: Path, line_no: int) -> ImuSample:
|
|
try:
|
|
return ImuSample(
|
|
timestamp_ms=int(round(float(row["timestamp_ms"]))),
|
|
accel_mss=(float(row["ax"]), float(row["ay"]), float(row["az"])),
|
|
gyro_rps=(float(row["gx"]), float(row["gy"]), float(row["gz"])),
|
|
attitude_rad=(
|
|
math.radians(float(row["roll_deg"])),
|
|
math.radians(float(row["pitch_deg"])),
|
|
math.radians(float(row["yaw_deg"])),
|
|
),
|
|
baro_alt_m=float(row["baro_m"]),
|
|
)
|
|
except (KeyError, ValueError) as exc:
|
|
raise ValueError(
|
|
f"IMU CSV row malformed at {source}:{line_no}: {exc}"
|
|
) from exc
|
|
|
|
|
|
class ImuReplayer:
    """Drives an `FcInboundEmitter` from a CSV file at a fixed cadence.

    Samples are emitted in file order, paced at ``rate_hz``; the row's
    ``timestamp_ms`` value is passed through to the emitter but is not used
    for pacing.
    """

    def __init__(
        self,
        emitter: FcInboundEmitter,
        rate_hz: float = 10.0,
        *,
        sleep_fn: Callable[[float], None] = time.sleep,
        realtime: bool = True,
    ) -> None:
        """Configure the replayer.

        Args:
            emitter: Receives every parsed sample via ``emit``.
            rate_hz: Emission rate; the pause between samples is ``1 / rate_hz``.
            sleep_fn: Called with the pause in seconds; injectable for tests.
            realtime: When False, ``sleep_fn`` is never called.

        Raises:
            ValueError: If ``rate_hz`` is not a positive number.
        """
        # `not (x > 0)` instead of `x <= 0` so that NaN is rejected as well.
        if not rate_hz > 0:
            raise ValueError(f"rate_hz must be positive; got {rate_hz}")
        self._emitter = emitter
        self._rate_hz = rate_hz
        self._sleep = sleep_fn
        self._realtime = realtime

    def replay(self, csv_path: Path) -> int:
        """Replay the CSV file. Returns the number of samples emitted.

        Raises ``FileNotFoundError`` on missing CSV. Raises ``ValueError``
        on missing columns or a row that does not parse. When ``realtime``
        is True (default), sleeps ``1 / rate_hz`` seconds after every
        emission (including the last one); tests should pass
        ``realtime=False`` or inject a no-op ``sleep_fn`` to keep the unit
        suite fast.
        """
        csv_path = Path(csv_path)  # also accept a plain string path
        if not csv_path.exists():
            raise FileNotFoundError(f"IMU CSV not found: {csv_path}")

        period_s = 1.0 / self._rate_hz
        emitted = 0
        # newline="" is what the csv module requires for correct newline
        # handling; utf-8-sig strips a leading BOM that would otherwise be
        # glued onto the first header name.
        with csv_path.open(newline="", encoding="utf-8-sig") as fh:
            reader = csv.DictReader(fh)
            header = reader.fieldnames or ()
            missing = [c for c in _REQUIRED_COLUMNS if c not in header]
            if missing:
                raise ValueError(
                    f"IMU CSV {csv_path} missing required columns: {missing}"
                )
            for row in reader:
                # DictReader silently skips blank lines, so a running counter
                # would drift; reader.line_num is the real physical line.
                sample = _parse_row(row, csv_path, reader.line_num)
                self._emitter.emit(sample)
                emitted += 1
                if self._realtime:
                    self._sleep(period_s)
        return emitted
|