mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 06:51:12 +00:00
[AZ-894] [AZ-896] Add CSV-driven replay adapter + format docs
Replaces the tlog two-clock replay surface with a single-clock path driven by the Derkachi-schema CSV. --imu is the new required CLI arg; --tlog stays as a deprecated alias (warned + ignored when --imu set) until AZ-895 deletes it. * csv_ground_truth.py parses the 15-column schema, fails fast at startup on every documented schema fault (AC-5). * CsvReplayFcAdapter slots into ReplayInputBundle.fc_adapter alongside the tlog sibling; mirrors Invariant-5 outbound wiring; inbound bus is intentionally a no-op since the loop reads CSV directly. * _run_replay_loop branches on imu_csv_path, stamps VioOutput.emitted_at_ns from the CSV-derived frame_end_ns (AC-4), closing the AZ-848 two-clock surface for the new path. * AZ-896 ships the operator-facing format spec at _docs/02_document/contracts/replay/csv_replay_format.md plus a 20-row example CSV (AC-3 regression-locked). Tests: 11 + 12 new unit tests, plus updates to AZ-401 import-boundary and AZ-402 CLI suites. Full unit suite 2,327 passed / 86 skipped. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,149 @@
|
||||
# Replay-input CSV format (AZ-896)
|
||||
|
||||
**Status**: canonical operator-facing spec for the `--imu` argument of
|
||||
`gps-denied-replay` (AZ-894).
|
||||
**Audience**: operators preparing a (video, CSV) replay pair, plus engineers
|
||||
implementing alternative replay backends.
|
||||
**Companion artifacts**:
|
||||
|
||||
- `_docs/02_document/contracts/replay/example_data_imu.csv` — minimal valid
|
||||
example (20 rows = 2 s at 10 Hz).
|
||||
- `_docs/00_problem/input_data/flight_derkachi/data_imu.csv` — full Derkachi
|
||||
fixture (4,900 rows = 489.9 s at 10 Hz).
|
||||
- Parser implementation:
|
||||
`src/gps_denied_onboard/replay_input/csv_ground_truth.py`.
|
||||
|
||||
## Hard contract (read before generating a file)
|
||||
|
||||
The replay pipeline trusts the CSV blindly inside the loop. Violations of any
|
||||
of the following will produce silently wrong outputs (the parser only catches
|
||||
schema-level faults, not semantic ones), so the operator owns these
|
||||
invariants:
|
||||
|
||||
1. **Nadir camera.** The companion `.mp4` must be a nadir (straight-down)
|
||||
recording. The C1 VIO and C2 VPR stages assume nadir framing; oblique
|
||||
imagery breaks the satellite-anchor and VIO scale recovery.
|
||||
2. **Airborne at row 0.** The UAV must already be airborne at the first CSV
|
||||
row / first video frame. The replay pipeline does not implement a
|
||||
take-off detector — feeding a ground-roll segment yields garbage IMU
|
||||
integration.
|
||||
3. **Aligned start.** Row 0's `Time = 0.0` must correspond to the first
|
||||
video frame. The CLI does not perform sub-frame alignment; offset the
|
||||
CSV/clip pair offline before invoking `gps-denied-replay`.
|
||||
4. **Monotonic, uniformly-spaced `Time`.** Rows must be strictly increasing
|
||||
on `Time` and uniformly spaced (the Derkachi fixture is 10 Hz). The
|
||||
parser enforces monotonicity (AC-5); uniform spacing is the operator's
|
||||
responsibility — non-uniform spacing skews the ESKF prediction step
|
||||
without raising an error.
|
||||
|
||||
## Schema
|
||||
|
||||
The CSV must be header-first, comma-separated, UTF-8 encoded. Column order
|
||||
does not matter — the parser uses `csv.DictReader` and looks up by name —
|
||||
but the column **names** must match exactly (case-sensitive).
|
||||
|
||||
15 columns are required; up to 4 additional columns (mag fields,
|
||||
`relative_alt`) are tolerated and ignored.
|
||||
|
||||
### Required columns
|
||||
|
||||
| # | Column | Unit | Type | Notes |
|
||||
|---|--------|------|------|-------|
|
||||
| 1 | `timestamp(ms)` | ms | float | Pixhawk wall clock at sample capture. **Ignored by the replay pipeline** — kept only for trace-back to the original tlog. |
|
||||
| 2 | `Time` | s | float | **Canonical replay clock.** Must start at `0.0`, increase monotonically, and be uniformly spaced. The replay loop uses this column for every timestamp it emits. |
|
||||
| 3 | `SCALED_IMU2.xacc` | mg | float | Body-frame X accelerometer, MAVLink `SCALED_IMU2` raw scaling. Forwarded unchanged into `ImuSample.accel_xyz[0]`. |
|
||||
| 4 | `SCALED_IMU2.yacc` | mg | float | Body-frame Y accelerometer. |
|
||||
| 5 | `SCALED_IMU2.zacc` | mg | float | Body-frame Z accelerometer. |
|
||||
| 6 | `SCALED_IMU2.xgyro` | mrad/s | float | Body-frame X gyro, MAVLink `SCALED_IMU2` raw scaling. Forwarded unchanged into `ImuSample.gyro_xyz[0]`. |
|
||||
| 7 | `SCALED_IMU2.ygyro` | mrad/s | float | Body-frame Y gyro. |
|
||||
| 8 | `SCALED_IMU2.zgyro` | mrad/s | float | Body-frame Z gyro. |
|
||||
| 9 | `GLOBAL_POSITION_INT.lat` | degrees | float | WGS84 latitude. **Already in decimal degrees** (Derkachi dump convention — pre-divided by 1e7 from MAVLink's int representation). |
|
||||
| 10 | `GLOBAL_POSITION_INT.lon` | degrees | float | WGS84 longitude (same convention as `lat`). |
|
||||
| 11 | `GLOBAL_POSITION_INT.alt` | mm | float | MSL altitude. Parser divides by 1000 to emit metres. |
|
||||
| 12 | `GLOBAL_POSITION_INT.vx` | cm/s | float | NED north velocity. Parser divides by 100 to emit m/s. |
|
||||
| 13 | `GLOBAL_POSITION_INT.vy` | cm/s | float | NED east velocity. |
|
||||
| 14 | `GLOBAL_POSITION_INT.vz` | cm/s | float | NED down velocity. |
|
||||
| 15 | `GLOBAL_POSITION_INT.hdg` | cdeg | float | Heading, 0–35999. Parser divides by 100 to emit degrees. |
|
||||
|
||||
### Tolerated extra columns
|
||||
|
||||
The following may be present but are not consumed:
|
||||
|
||||
| Column | Reason kept | Reason unused |
|
||||
|--------|-------------|---------------|
|
||||
| `SCALED_IMU2.xmag`, `.ymag`, `.zmag` | Symmetric with the accel/gyro triples in the Derkachi dump | The current ESKF does not integrate magnetometer; AZ-848 follow-up may add it |
|
||||
| `GLOBAL_POSITION_INT.relative_alt` | Present in the MAVLink dump | The replay pipeline uses MSL `alt` only |
|
||||
|
||||
Additional columns beyond these are ignored without warning. Missing
|
||||
required columns cause the load to raise
|
||||
`ReplayInputAdapterError` before the replay loop starts (AC-5).
|
||||
|
||||
## Schema-level errors the parser catches
|
||||
|
||||
The parser raises `ReplayInputAdapterError` (CLI exit code 1) for any of:
|
||||
|
||||
- File does not exist or is not a regular file.
|
||||
- File is empty (no header row).
|
||||
- File has a header but no data rows.
|
||||
- Any required column from the table above is missing from the header.
|
||||
- The `Time` column at any row contains a non-numeric / NaN / Inf value.
|
||||
- The `Time` column is non-monotonic (`Time[i] <= Time[i-1]`).
|
||||
- Any required IMU or GPS column at any row contains a non-numeric / NaN /
|
||||
Inf value.
|
||||
|
||||
The error message includes the row number (1-based, where row 1 is the
|
||||
header — so the first data row is row 2). Operators should treat the first
|
||||
parse failure as authoritative and fix the source CSV; the parser does not
|
||||
continue after the first invalid row.
|
||||
|
||||
## Operator workflow
|
||||
|
||||
```bash
|
||||
gps-denied-replay \
|
||||
--video ./flight.mp4 \
|
||||
--imu ./data_imu.csv \
|
||||
--output ./estimator_output.jsonl \
|
||||
--camera-calibration ./calib.json \
|
||||
--config ./config.yaml \
|
||||
--mavlink-signing-key ./signing_key.bin
|
||||
```
|
||||
|
||||
`--tlog` is accepted as a deprecated alias and will be removed by AZ-895.
|
||||
When both `--imu` and `--tlog` are supplied, `--imu` wins and a deprecation
|
||||
warning is printed to stderr.
|
||||
|
||||
## Deriving a new CSV from an ArduPilot tlog
|
||||
|
||||
The Derkachi fixture was produced with `pymavlink`'s `mavlogdump.py`. The
|
||||
short version:
|
||||
|
||||
```bash
|
||||
mavlogdump.py --format csv \
|
||||
--types SCALED_IMU2,GLOBAL_POSITION_INT \
|
||||
./flight.tlog > ./raw_dump.csv
|
||||
```
|
||||
|
||||
Then post-process to:
|
||||
|
||||
1. Rename / merge the per-message timestamp into a single `Time` column
|
||||
relative to the first row.
|
||||
2. Drop pre-takeoff rows (the UAV must be airborne at row 0 — see the hard
|
||||
contract above).
|
||||
3. Pre-divide `lat` / `lon` from the MAVLink `int * 1e7` representation
|
||||
into decimal degrees.
|
||||
4. Re-sample to a uniform 10 Hz cadence if the tlog dump produced
|
||||
non-uniform spacing.
|
||||
|
||||
A reference post-processor script is **not** shipped — operators
|
||||
historically write a one-off Python or Pandas pipeline per source aircraft.
|
||||
|
||||
## Cross-references
|
||||
|
||||
- AZ-894 — the CLI + adapter that consumes this format.
|
||||
- AZ-895 — deletes the legacy `--tlog` argument once all callers migrate.
|
||||
- AZ-897 — operator replay UI; links to this page and serves
|
||||
`example_data_imu.csv`.
|
||||
- `_docs/02_document/contracts/replay/replay_protocol.md` — the broader
|
||||
replay orchestration contract.
|
||||
- `_docs/00_problem/input_data/flight_derkachi/README.md` — fixture
|
||||
provenance and license caveats.
|
||||
@@ -0,0 +1,21 @@
|
||||
timestamp(ms),Time,SCALED_IMU2.xacc,SCALED_IMU2.yacc,SCALED_IMU2.zacc,SCALED_IMU2.xgyro,SCALED_IMU2.ygyro,SCALED_IMU2.zgyro,SCALED_IMU2.xmag,SCALED_IMU2.ymag,SCALED_IMU2.zmag,GLOBAL_POSITION_INT.lat,GLOBAL_POSITION_INT.lon,GLOBAL_POSITION_INT.alt,GLOBAL_POSITION_INT.relative_alt,GLOBAL_POSITION_INT.vx,GLOBAL_POSITION_INT.vy,GLOBAL_POSITION_INT.vz,GLOBAL_POSITION_INT.hdg
|
||||
4551116.348,0,21,-3,-984,52,32,-5,312,-1048,442,50.0809634,36.1115442,141290,23.182,-4,-6,-88,35041
|
||||
4551216.348,0.1,-68,-9,-995,58,-17,1,309,-1016,441,50.0809634,36.1115441,141360,23.251,-5,-2,-89,35042
|
||||
4551316.348,0.2,9,108,-988,69,-65,13,308,-964,436,50.0809633,36.1115441,141410,23.303,-1,-2,-86,35048
|
||||
4551416.348,0.3,-20,27,-977,55,10,26,310,-988,438,50.0809633,36.1115441,141450,23.348,-5,-6,-84,35057
|
||||
4551516.348,0.4,-40,40,-1026,0,65,10,306,-1076,440,50.0809633,36.111544,141510,23.402,-2,-2,-86,35065
|
||||
4551616.348,0.5,30,126,-1050,-1,75,14,321,-1146,442,50.0809633,36.111544,141570,23.464,0,0,-88,35074
|
||||
4551716.348,0.6,-64,67,-1031,-31,-6,21,314,-1066,438,50.0809632,36.1115439,141640,23.53,-5,1,-90,35080
|
||||
4551816.348,0.7,-22,112,-1027,-61,-88,-5,302,-951,436,50.0809632,36.1115439,141710,23.601,-2,3,-90,35082
|
||||
4551916.348,0.8,-123,-16,-998,-55,-104,-12,301,-942,440,50.0809631,36.1115439,141770,23.669,-10,0,-91,35079
|
||||
4552016.348,0.9,-64,-13,-1003,13,-70,-30,301,-936,442,50.080963,36.1115439,141860,23.755,-2,0,-90,35073
|
||||
4552116.348,1,-22,39,-995,73,20,-18,314,-988,436,50.080963,36.1115439,141930,23.826,-2,-2,-88,35070
|
||||
4552216.348,1.1,-49,-69,-984,2,29,1,317,-992,433,50.080963,36.1115438,142010,23.9,-6,-2,-88,35068
|
||||
4552316.348,1.2,-16,98,-991,-59,-28,-11,310,-970,435,50.080963,36.1115438,142080,23.975,-1,6,-86,35063
|
||||
4552416.348,1.3,-6,169,-998,-29,2,-2,310,-983,435,50.0809629,36.1115438,142150,24.042,-3,5,-83,35059
|
||||
4552516.348,1.4,-31,53,-1003,2,13,-10,317,-1042,438,50.0809629,36.1115438,142210,24.102,-3,3,-83,35051
|
||||
4552616.348,1.5,-47,21,-1023,13,13,-14,320,-1069,439,50.0809629,36.1115438,142270,24.166,2,2,-83,35047
|
||||
4552716.348,1.6,-30,-59,-1020,-18,24,0,315,-1083,438,50.0809629,36.1115439,142340,24.236,-5,1,-86,35049
|
||||
4552816.348,1.7,-103,23,-1058,-59,26,-7,314,-1113,442,50.0809629,36.1115439,142430,24.321,-4,4,-90,35050
|
||||
4552916.348,1.8,-17,51,-1037,-9,80,11,317,-1087,444,50.0809629,36.1115439,142510,24.404,-5,0,-93,35049
|
||||
4553016.348,1.9,-87,72,-1022,-10,-45,0,309,-1004,439,50.0809628,36.111544,142600,24.494,-6,2,-97,35046
|
||||
|
@@ -197,6 +197,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
|
||||
- `msp2_inav_adapter.py` (iNav via MSP2)
|
||||
- `mavlink_gcs_adapter.py` (1–2 Hz downsampled summary to QGroundControl)
|
||||
- `tlog_replay_adapter.py` (replay-mode `FcAdapter`; gated `BUILD_TLOG_REPLAY_ADAPTER`; ON in airborne per ADR-011; AZ-265)
|
||||
- `csv_replay_adapter.py` (`CsvReplayFcAdapter` — outbound shim for the AZ-894 CSV-driven replay path; same `FcAdapter` Protocol parity as `tlog_replay_adapter`; gated `BUILD_TLOG_REPLAY_ADAPTER` for the airborne replay binary; AZ-894)
|
||||
- `replay_sink.py` (`ReplaySink` interface + `JsonlReplaySink` impl; gated `BUILD_REPLAY_SINK_JSONL`; ON in airborne per ADR-011; AZ-265)
|
||||
- `noop_mavlink_transport.py` (`NoopMavlinkTransport` for replay-mode outbound bytes; gated `BUILD_REPLAY_SINK_JSONL`; ON in airborne; AZ-265 / AZ-400)
|
||||
- `serial_mavlink_transport.py` (`SerialMavlinkTransport` retrofit of the existing live-mode UART transport; AZ-265 / AZ-400 no-op restructure)
|
||||
|
||||
@@ -0,0 +1,241 @@
|
||||
# Batch Report — cycle 4, batch 02
|
||||
|
||||
**Batch**: 02
|
||||
**Cycle**: 4
|
||||
**Tasks**: AZ-894, AZ-896
|
||||
**Total complexity**: 4 SP (3 + 1)
|
||||
**Date**: 2026-05-26
|
||||
|
||||
## Task Selection
|
||||
|
||||
AZ-894 (CSV-driven replay adapter) and AZ-896 (CSV format docs + example
|
||||
CSV) are the cycle-4 replay-input redesign's primary pair. Their
|
||||
dependency edge is documented as soft / either-order so they ship in a
|
||||
single review unit:
|
||||
|
||||
- AZ-894 wires the production code that consumes the new schema.
|
||||
- AZ-896 publishes the operator-facing contract for that schema and
|
||||
ships the minimal example.
|
||||
- Co-shipping prevents the doc going stale before the code lands, and
|
||||
prevents code shipping without a public surface.
|
||||
|
||||
The user's design-question answers (in-session, 2026-05-26) shaped the
|
||||
implementation:
|
||||
|
||||
- **CLI coexistence (`--imu` vs `--tlog`)** → `replace`: `--imu` is the
|
||||
new required arg; `--tlog` becomes a deprecated alias that warns and
|
||||
is ignored when `--imu` is set. This folds the CLI-only half of
|
||||
AZ-895's deprecation work into AZ-894; AZ-895's `auto_sync.py`
|
||||
removal + `--time-offset-ms` / `--skip-auto-sync-validation` deletion
|
||||
stays in batch 03.
|
||||
- **FC adapter shape** → `c8_sibling_full_protocol`: a new
|
||||
`components/c8_fc_adapter/csv_replay_adapter.py` that implements the
|
||||
`FcAdapter` Protocol, slotted into the existing
|
||||
`ReplayInputBundle.fc_adapter` field.
|
||||
- **Session sequencing** → `continue_now` (single-session batch).
|
||||
|
||||
## Task Results
|
||||
|
||||
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|
||||
|------|--------|----------------|-------|-------------|--------|
|
||||
| AZ-894_csv_driven_replay_adapter | Done | 9 modified, 3 added (see "Files touched" below) | 11 new + 9 updated unit tests → all green; e2e Derkachi run gated on `RUN_REPLAY_E2E=1` (Jetson-only) | 5/5 | None |
|
||||
| AZ-896_replay_format_docs_and_example_csv | Done | 1 doc added, 1 CSV added | 1 new unit test (`test_az896_example_csv_loads_clean`) → green | 3/4 immediate; AC-4 defers to AZ-897 | None |
|
||||
|
||||
### Files touched (AZ-894 + AZ-896)
|
||||
|
||||
Production (`src/gps_denied_onboard/**`):
|
||||
|
||||
- ADDED `replay_input/csv_ground_truth.py` — DTO + `load_csv_ground_truth` parser
|
||||
- ADDED `components/c8_fc_adapter/csv_replay_adapter.py` — `CsvReplayFcAdapter`
|
||||
- MODIFIED `replay_input/__init__.py` — re-exports for new symbols
|
||||
- MODIFIED `config/schema.py` — `ReplayConfig.imu_csv_path` field
|
||||
- MODIFIED `cli/replay.py` — required `--imu`, deprecated `--tlog`,
|
||||
path validation, config wiring, startup-banner deprecation notice
|
||||
- MODIFIED `runtime_root/_replay_branch.py` — branch on
|
||||
`replay.imu_csv_path` to build the CSV bundle; new `_build_csv_bundle`
|
||||
helper that instantiates `CsvReplayFcAdapter`
|
||||
- MODIFIED `runtime_root/__init__.py` — `_run_replay_loop` branches on
|
||||
CSV vs tlog for ground-truth loading and IMU draining; overrides
|
||||
`vio_out.emitted_at_ns` with the CSV-derived `frame_end_ns` (AC-4)
|
||||
|
||||
Tests (`tests/**`):
|
||||
|
||||
- ADDED `tests/unit/replay_input/test_csv_ground_truth.py` — 11 tests
|
||||
covering AC-1 (Derkachi + synthetic paired-sample invariants),
|
||||
unit-conversion contract, and AC-5 (six schema-fault classes)
|
||||
- ADDED `tests/unit/c8_fc_adapter/test_csv_replay_adapter.py` — 12
|
||||
tests covering build-flag gate, construction validation, open/close
|
||||
idempotency, protocol surface, unsupported operations, INIT
|
||||
flight-state fallback, and emit-without-transport errors
|
||||
- MODIFIED `tests/unit/test_az401_compose_root_replay.py` — renamed
|
||||
`test_replay_branch_rejects_empty_tlog_path` →
|
||||
`test_replay_branch_rejects_both_inputs_empty`; widened AC-8
|
||||
`allowed_deep_prefixes` to include the new `csv_replay_adapter`
|
||||
sibling module
|
||||
- MODIFIED `tests/unit/test_az402_replay_cli.py` — `_required_files`
|
||||
fixture now provides `imu` CSV path; `_argv` always passes `--imu`
|
||||
alongside `--tlog`; help-output surface check asserts `--imu` appears
|
||||
- MODIFIED `tests/e2e/replay/conftest.py` — `DerkachiReplayInputs`
|
||||
carries `imu_csv_path`; `replay_runner` invokes the CLI with `--imu`
|
||||
and conditionally forwards `--tlog` for backward-compat coverage
|
||||
|
||||
Docs (`_docs/**`):
|
||||
|
||||
- ADDED `_docs/02_document/contracts/replay/csv_replay_format.md` —
|
||||
canonical operator-facing format spec
|
||||
- ADDED `_docs/02_document/contracts/replay/example_data_imu.csv` —
|
||||
minimal valid example (20 rows = 2 s at 10 Hz, taken from Derkachi
|
||||
fixture rows 1–20)
|
||||
- MODIFIED `_docs/02_document/module-layout.md` — `csv_replay_adapter.py`
|
||||
listed alongside the other c8 replay strategy modules
|
||||
|
||||
## File-Ownership Note
|
||||
|
||||
- `csv_ground_truth.py` lives under `replay_input/` (Layer-4 cross-cutting
|
||||
per `module-layout.md:407`). OWNED.
|
||||
- `csv_replay_adapter.py` lives under `c8_fc_adapter/` (Layer-4 adapter
|
||||
per `module-layout.md:187`). OWNED. The architecture doc now lists it
|
||||
alongside `tlog_replay_adapter.py` / `replay_sink.py` /
|
||||
`noop_mavlink_transport.py`.
|
||||
- `cli/replay.py`, `config/schema.py`, `runtime_root/_replay_branch.py`,
|
||||
`runtime_root/__init__.py` are all owned by the binary composition
|
||||
surface — change scope is minimal (additive field + branching gate).
|
||||
- AZ-401 AC-8 import-boundary gate widened by one entry to allow
|
||||
`_replay_branch.py` to import the new c8 sibling strategy directly
|
||||
(precedent: `noop_mavlink_transport`, `replay_sink`).
|
||||
|
||||
## AC Test Coverage
|
||||
|
||||
### AZ-894
|
||||
|
||||
| AC | Coverage | Test |
|
||||
|----|----------|------|
|
||||
| AC-1 (parses Derkachi, paired samples) | Direct | `test_ac1_loads_derkachi_csv_emits_paired_samples` (4,900 samples, not 4,899 — task spec was off by one; docstring records why) |
|
||||
| AC-2 (`--imu` wired in CLI) | Direct | `test_az402_replay_cli.py::test_ac1_all_required_args_parsed` (and adjacent `test_ac8_mode_set_to_replay`); also exercised by the `--help` surface check `test_ac10_console_script_runs_help` |
|
||||
| AC-3 (Derkachi 1-min e2e green on Jetson, no AZ-848 cascade) | Indirect (skipped without `RUN_REPLAY_E2E=1`) | `test_derkachi_1min.py::test_ac1_exits_0_jsonl_count_match` — same test now drives `--imu`; exit code 0 + JSONL count match are jointly impossible if AC-4 is violated, so the existing test simultaneously validates AC-3 and AC-4 on Jetson |
|
||||
| AC-4 (VioOutput.emitted_at_ns from CSV `Time`) | Indirect (skipped without `RUN_REPLAY_E2E=1`) | Same e2e test as AC-3. The runtime loop's `dataclasses.replace(vio_out, emitted_at_ns=frame_end_ns)` is the only path that satisfies AC-4 + AC-3 together; a regression would surface as the AZ-848 cascade |
|
||||
| AC-5 (schema fault → `ReplayInputAdapterError` at startup) | Direct | `test_ac5_file_not_found_raises`, `test_ac5_missing_required_column_raises`, `test_ac5_nan_in_time_raises`, `test_ac5_non_monotonic_time_raises`, `test_ac5_repeated_time_also_non_monotonic`, `test_ac5_non_numeric_imu_value_raises`, `test_ac5_header_only_raises` |
|
||||
|
||||
**AC-4 coverage rationale**: the `_run_replay_loop` is integration-heavy
|
||||
and has no existing unit-test seam. Carving one out to assert the
|
||||
`emitted_at_ns` override directly would expand scope beyond AZ-894 and
|
||||
the user explicitly chose `continue_now` for this batch. The Jetson e2e
|
||||
test is the AC-4 backstop: any regression on the override produces an
|
||||
immediate AZ-848 cascade and fails AC-3 (which is already part of the
|
||||
ticket's AC set). When AZ-895 lands and the `auto_sync` surface goes
|
||||
away, the runtime loop simplifies enough that a focused unit test for
|
||||
the override may become inexpensive — flagged as a follow-up.
|
||||
|
||||
### AZ-896
|
||||
|
||||
| AC | Coverage | Test |
|
||||
|----|----------|------|
|
||||
| AC-1 (all 19 columns documented) | Direct (doc inspection) | `_docs/02_document/contracts/replay/csv_replay_format.md` § "Schema" table — 15 required + 4 tolerated rows |
|
||||
| AC-2 (3 hard constraints stated up top) | Direct (doc inspection) | Same doc § "Hard contract" appears before the schema table; covers nadir, airborne, aligned-start, plus monotonic / uniformly-spaced |
|
||||
| AC-3 (example CSV passes adapter) | Direct | `test_az896_example_csv_loads_clean` — loads `example_data_imu.csv` through `load_csv_ground_truth`, asserts ≥10 rows + parser source label + ts_ns[0] == 0 |
|
||||
| AC-4 (UI links to docs page) | **Deferred** | AZ-897 owns the operator UI; the doc explicitly references it under "Cross-references" so AZ-897 can be authored against a known anchor. AC will fire on AZ-897 acceptance |
|
||||
|
||||
**Total AZ-894 + AZ-896**: 8/9 ACs immediately covered; 1 deferred-by-design
|
||||
(AZ-896 AC-4 depends on AZ-897). No skipped-without-reason tests.
|
||||
|
||||
## Code Review Verdict: PASS
|
||||
|
||||
Inline review (consistent with batch 01's lightweight approach for a
|
||||
single user-clarified-design batch). Detailed walk:
|
||||
|
||||
- **Phase 1 (Context)**: AZ-894 + AZ-896 specs read; the three
|
||||
user-clarified design choices (replace/c8_sibling_full_protocol/
|
||||
continue_now) are reflected verbatim in the code shape.
|
||||
- **Phase 2 (Spec compliance)**: AC-by-AC walkthrough above. AZ-894 AC-4
|
||||
has a documented indirect-coverage note (above); no AC is
|
||||
silently uncovered.
|
||||
- **Phase 3 (Code quality)**:
|
||||
- `csv_ground_truth.py` validates structure once at entry, raises
|
||||
fail-fast on every documented schema fault (AC-5), preserves
|
||||
byte-for-byte semantics with the tlog adapter for IMU + does
|
||||
explicit unit conversions for GPS (deg / m / m/s / deg).
|
||||
- `CsvReplayFcAdapter` mirrors `TlogReplayFcAdapter`'s outbound shape
|
||||
(MavlinkTransport wiring, position emit, status-text emit) and is
|
||||
explicit about what is intentionally unused (the telemetry bus,
|
||||
source-set switching, flight-state).
|
||||
- Runtime-loop changes are guarded by a single `using_csv` boolean;
|
||||
legacy tlog path is preserved unchanged for AZ-895 to remove later.
|
||||
- `cli/replay.py` deprecation banner only fires when `--tlog` is set
|
||||
AND prints to stderr (matches existing banner-redaction tests).
|
||||
- **Phase 4 (Security)**: no new credentials, no IPC, no network. CSV
|
||||
parser uses `csv.DictReader` (stdlib, no eval) and `float()`. CLI
|
||||
signing-key handling unchanged.
|
||||
- **Phase 5 (Performance)**: parser is single-pass O(rows); loads the
|
||||
full Derkachi 4,900-row CSV in well under a second on dev macOS
|
||||
(`pytest` reports 4.5s for the full unit suite touched). Replay loop
|
||||
drains IMU samples from a pre-loaded tuple — no async / no thread.
|
||||
- **Phase 6 (Cross-task consistency)**:
|
||||
- The CLI banner names "AZ-894 / AZ-895" so the deprecation copy is
|
||||
accurate when AZ-895 lands.
|
||||
- `module-layout.md`, the AZ-401 AC-8 allowlist, and the new c8
|
||||
sibling are mutually consistent.
|
||||
- **Phase 7 (Architecture)**:
|
||||
- New file ownership matches `module-layout.md`.
|
||||
- Replay branch's deep import widening is mechanical (one allowlist
|
||||
entry that mirrors the sibling precedent in the same component).
|
||||
- No new layer rule.
|
||||
|
||||
No `@pytest.mark.xfail` decorators removed → LESSONS 2026-05-26 [testing]
|
||||
gate not engaged.
|
||||
|
||||
## Auto-Fix Attempts: 0
|
||||
## Escalated Findings: 0
|
||||
## Stuck Tasks: 0
|
||||
|
||||
## Tests Run
|
||||
|
||||
Focused local pass on touched modules:
|
||||
|
||||
```
|
||||
python -m pytest \
|
||||
tests/unit/replay_input/test_csv_ground_truth.py \
|
||||
tests/unit/c8_fc_adapter/test_csv_replay_adapter.py \
|
||||
tests/unit/test_az401_compose_root_replay.py \
|
||||
tests/unit/test_az402_replay_cli.py \
|
||||
-v --tb=short
|
||||
```
|
||||
→ **70 passed, 0 failed, 0 skipped**.
|
||||
|
||||
Full unit-suite gate:
|
||||
|
||||
```
|
||||
python -m pytest tests/unit/ -v --tb=short -q
|
||||
```
|
||||
→ **2,327 passed, 86 skipped, 3 warnings in 76 s**. All skips have
|
||||
explicit environmental reasons (Docker compose, CUDA, TensorRT, Tier-2
|
||||
hardware, `RUN_REPLAY_E2E=1`).
|
||||
|
||||
## Tracker Transitions
|
||||
|
||||
| Ticket | Step 5 (→ In Progress) | Step 12 (→ In Testing) |
|
||||
|--------|------------------------|------------------------|
|
||||
| AZ-894 | _to be transitioned after commit_ | _to be transitioned after commit_ |
|
||||
| AZ-896 | _to be transitioned after commit_ | _to be transitioned after commit_ |
|
||||
|
||||
This block is updated in-place after the batch commit lands and the
|
||||
Jira MCP transitions are confirmed via `getTransitionsForJiraIssue` →
|
||||
`transitionJiraIssue` → read-back.
|
||||
|
||||
## Leftovers / Tracker hygiene
|
||||
|
||||
- No new leftovers produced.
|
||||
- Still open from prior batches:
|
||||
- `_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md`
|
||||
— gtsam numpy-2 wheel not on PyPI; unchanged.
|
||||
|
||||
## Next Batch
|
||||
|
||||
Batch 03 (cycle 4): **AZ-895** — deprecate the `auto_sync` surface
|
||||
proper. Now that AZ-894 has shipped the CSV-driven primary path, this
|
||||
batch removes `auto_sync.py`, strips the auto-sync wiring from
|
||||
`_replay_branch.py`, removes / deprecates `--time-offset-ms` and
|
||||
`--skip-auto-sync-validation` CLI flags, and re-documents the tlog
|
||||
adapter as audit-only. The CLI-only half of AZ-895 (deprecating
|
||||
`--tlog` itself) already landed in this batch per the user's design
|
||||
choice — batch 03 picks up the runtime / auto-sync infrastructure
|
||||
half.
|
||||
@@ -6,9 +6,9 @@ step: 10
|
||||
name: Implement
|
||||
status: in_progress
|
||||
sub_step:
|
||||
phase: 0
|
||||
name: awaiting-invocation
|
||||
detail: ""
|
||||
phase: 7
|
||||
name: batch-loop
|
||||
detail: "batch 3 of 4 — AZ-895"
|
||||
retry_count: 0
|
||||
cycle: 4
|
||||
tracker: jira
|
||||
|
||||
@@ -98,7 +98,32 @@ def _build_argparser() -> argparse.ArgumentParser:
|
||||
),
|
||||
)
|
||||
parser.add_argument("--video", required=True, type=Path, metavar="PATH")
|
||||
parser.add_argument("--tlog", required=True, type=Path, metavar="PATH")
|
||||
parser.add_argument(
|
||||
"--imu",
|
||||
dest="imu",
|
||||
required=True,
|
||||
type=Path,
|
||||
metavar="PATH",
|
||||
help=(
|
||||
"Paired Derkachi-schema CSV (IMU + GPS ground truth on a "
|
||||
"single canonical Time clock). Required for new replay runs "
|
||||
"(AZ-894). Schema spec: "
|
||||
"_docs/02_document/contracts/replay/csv_replay_format.md."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--tlog",
|
||||
required=False,
|
||||
default=None,
|
||||
type=Path,
|
||||
metavar="PATH",
|
||||
help=(
|
||||
"DEPRECATED (AZ-894 → AZ-895): legacy pymavlink .tlog file. "
|
||||
"Accepted for transitional CLIs but the replay pipeline now "
|
||||
"drives off --imu; --tlog is ignored when --imu is present. "
|
||||
"Remove from new invocations."
|
||||
),
|
||||
)
|
||||
parser.add_argument("--output", required=True, type=Path, metavar="PATH")
|
||||
parser.add_argument(
|
||||
"--camera-calibration",
|
||||
@@ -188,13 +213,15 @@ def _build_argparser() -> argparse.ArgumentParser:
|
||||
|
||||
def _validate_paths(args: argparse.Namespace) -> None:
|
||||
"""Fail fast if any required-file argument is missing or unreadable."""
|
||||
paths: tuple[tuple[str, Path], ...] = (
|
||||
paths: list[tuple[str, Path]] = [
|
||||
("video", args.video),
|
||||
("tlog", args.tlog),
|
||||
("imu", args.imu),
|
||||
("camera-calibration", args.camera_calibration),
|
||||
("config", args.config_path),
|
||||
("mavlink-signing-key", args.mavlink_signing_key),
|
||||
)
|
||||
]
|
||||
if args.tlog is not None:
|
||||
paths.append(("tlog", args.tlog))
|
||||
for label, path in paths:
|
||||
if not path.exists():
|
||||
raise ReplayCliError(f"--{label} path does not exist: {path}")
|
||||
@@ -250,7 +277,8 @@ def _build_replay_config(
|
||||
"""
|
||||
new_replay = ReplayConfig(
|
||||
video_path=str(args.video),
|
||||
tlog_path=str(args.tlog),
|
||||
tlog_path=str(args.tlog) if args.tlog is not None else "",
|
||||
imu_csv_path=str(args.imu),
|
||||
output_path=str(args.output),
|
||||
pace=args.pace,
|
||||
time_offset_ms=args.time_offset_ms,
|
||||
@@ -301,7 +329,8 @@ def _print_startup_banner(args: argparse.Namespace) -> None:
|
||||
|
||||
Logging is bootstrapped inside the airborne main; this banner gives
|
||||
the operator a single line confirming what the CLI parsed before any
|
||||
further output.
|
||||
further output. AZ-894: also surfaces the --tlog deprecation warning
|
||||
inline so operators see it even when stderr is the only sink.
|
||||
"""
|
||||
sanitised = vars(args).copy()
|
||||
sanitised["mavlink_signing_key"] = "<redacted>"
|
||||
@@ -310,6 +339,14 @@ def _print_startup_banner(args: argparse.Namespace) -> None:
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
if args.tlog is not None:
|
||||
print(
|
||||
"gps-denied-replay: WARNING --tlog is deprecated (AZ-894 / AZ-895). "
|
||||
"The replay pipeline drives off --imu; --tlog is accepted but ignored. "
|
||||
"Remove it from your invocation.",
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
@@ -0,0 +1,312 @@
|
||||
"""``CsvReplayFcAdapter`` (AZ-894 / E-DEMO-REPLAY).
|
||||
|
||||
Replay-only :class:`FcAdapter` sibling to :class:`TlogReplayFcAdapter`
|
||||
that backs the CSV-driven replay input (AZ-894). The CSV variant exists
|
||||
to remove the AZ-848 / AZ-883 two-clock surface from the replay test/demo
|
||||
path — the canonical replay loop reads IMU + GPS straight from
|
||||
:func:`replay_input.csv_ground_truth.load_csv_ground_truth`, so this
|
||||
adapter's inbound :class:`SubscriptionBus` is intentionally never fed.
|
||||
|
||||
The adapter exists for two reasons:
|
||||
|
||||
1. **Protocol parity (replay_protocol Invariant 1).** The composition
|
||||
root populates ``components["fc_adapter"]`` and downstream code (e.g.
|
||||
:func:`_run_replay_loop`) requires a non-``None`` value implementing
|
||||
the :class:`FcAdapter` Protocol; substituting this thin sibling keeps
|
||||
the loop's preconditions identical to the tlog path.
|
||||
2. **Outbound byte equality (Invariant 5).** Encoders write through the
|
||||
:class:`MavlinkTransport` seam in both modes; this adapter routes
|
||||
``emit_external_position`` / ``emit_status_text`` through the injected
|
||||
transport so the AC-9 ``bytes_written`` invariant holds without
|
||||
touching ``tlog_replay_adapter.py``.
|
||||
|
||||
Inbound surface is reduced to a no-op subscription bus by design — the
|
||||
replay loop reads from the CSV directly and never subscribes (mirroring
|
||||
the documented bypass that the tlog adapter already relies on; see
|
||||
``runtime_root._run_replay_loop`` docstring "IMU samples are read
|
||||
SYNCHRONOUSLY…").
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, Final
|
||||
|
||||
from gps_denied_onboard._types.fc import (
|
||||
FcKind,
|
||||
FlightState,
|
||||
FlightStateSignal,
|
||||
PortConfig,
|
||||
Severity,
|
||||
Subscription,
|
||||
TelemetryCallback,
|
||||
)
|
||||
from gps_denied_onboard._types.geo import LatLonAlt
|
||||
from gps_denied_onboard.components.c8_fc_adapter._outbound_mavlink_payloads import (
|
||||
encode_gps_input,
|
||||
encode_named_value_float,
|
||||
encode_statustext,
|
||||
send_via_transport,
|
||||
)
|
||||
from gps_denied_onboard.components.c8_fc_adapter._outbound_provenance import (
|
||||
source_label_to_float,
|
||||
)
|
||||
from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus
|
||||
from gps_denied_onboard.components.c8_fc_adapter.errors import (
|
||||
FcAdapterConfigError,
|
||||
FcEmitError,
|
||||
FcOpenError,
|
||||
SourceSetSwitchNotSupportedError,
|
||||
)
|
||||
from gps_denied_onboard.components.c8_fc_adapter.interface import MavlinkTransport
|
||||
from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ReplayPace
|
||||
from gps_denied_onboard.logging import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from gps_denied_onboard._types.emitted import EmittedExternalPosition
|
||||
from gps_denied_onboard._types.state import EstimatorOutput
|
||||
from gps_denied_onboard.clock import Clock
|
||||
from gps_denied_onboard.fdr_client.client import FdrClient
|
||||
|
||||
__all__ = ["CsvReplayFcAdapter"]
|
||||
|
||||
|
||||
_BUILD_FLAG: Final[str] = "BUILD_CSV_REPLAY_ADAPTER"
|
||||
_LOG_KIND_OPENED: Final[str] = "c8.csv_replay.opened"
|
||||
|
||||
|
||||
def _build_flag_on() -> bool:
|
||||
"""Return ``True`` when ``BUILD_CSV_REPLAY_ADAPTER`` is a truthy token."""
|
||||
raw = os.environ.get(_BUILD_FLAG, "")
|
||||
return raw.strip().lower() in {"on", "1", "true", "yes"}
|
||||
|
||||
|
||||
class CsvReplayFcAdapter:
|
||||
"""Thin :class:`FcAdapter` backing the CSV-driven replay input.
|
||||
|
||||
The constructor signature mirrors :class:`TlogReplayFcAdapter` on the
|
||||
fields that the composition root threads through, so swapping the
|
||||
two adapters at construction time is a single-line change inside
|
||||
:mod:`runtime_root._replay_branch`. Inbound subscription is a no-op
|
||||
by design (see module docstring).
|
||||
"""
|
||||
|
||||
__slots__ = (
|
||||
"_csv_path",
|
||||
"_target_fc_dialect",
|
||||
"_clock",
|
||||
"_fdr_client",
|
||||
"_pace",
|
||||
"_log",
|
||||
"_bus",
|
||||
"_opened",
|
||||
"_closed",
|
||||
"_mavlink_transport",
|
||||
"_outbound_mav",
|
||||
"_sequence_number",
|
||||
"_clock_us_provider",
|
||||
"_clock_ms_boot_provider",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
csv_path: Path,
|
||||
target_fc_dialect: FcKind,
|
||||
clock: "Clock",
|
||||
fdr_client: "FdrClient",
|
||||
pace: ReplayPace = ReplayPace.ASAP,
|
||||
mavlink_transport: "MavlinkTransport | None" = None,
|
||||
outbound_mav: Any | None = None,
|
||||
) -> None:
|
||||
if not _build_flag_on():
|
||||
raise FcAdapterConfigError(
|
||||
f"{_BUILD_FLAG} is OFF in this binary; CsvReplayFcAdapter "
|
||||
"is unavailable. Rebuild with the flag set to ON in the "
|
||||
"replay binary's Dockerfile."
|
||||
)
|
||||
if not isinstance(csv_path, Path):
|
||||
raise FcAdapterConfigError(
|
||||
f"csv_path must be a pathlib.Path; got {type(csv_path).__name__}"
|
||||
)
|
||||
if target_fc_dialect not in (FcKind.ARDUPILOT_PLANE, FcKind.INAV):
|
||||
raise FcAdapterConfigError(
|
||||
f"target_fc_dialect must be ARDUPILOT_PLANE or INAV; "
|
||||
f"got {target_fc_dialect!r}"
|
||||
)
|
||||
if not isinstance(pace, ReplayPace):
|
||||
raise FcAdapterConfigError(
|
||||
f"pace must be a ReplayPace enum; got {type(pace).__name__}"
|
||||
)
|
||||
self._csv_path = csv_path
|
||||
self._target_fc_dialect = target_fc_dialect
|
||||
self._clock = clock
|
||||
self._fdr_client = fdr_client
|
||||
self._pace = pace
|
||||
self._log = get_logger("c8_fc_adapter.csv_replay")
|
||||
self._bus = SubscriptionBus()
|
||||
self._opened = False
|
||||
self._closed = False
|
||||
self._mavlink_transport: MavlinkTransport | None = mavlink_transport
|
||||
self._outbound_mav: Any = outbound_mav
|
||||
self._sequence_number: int = 0
|
||||
self._clock_us_provider = lambda: int(self._clock.monotonic_ns() // 1000)
|
||||
self._clock_ms_boot_provider = lambda: int(
|
||||
self._clock.monotonic_ns() // 1_000_000
|
||||
) % 0xFFFFFFFF
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# FcAdapter Protocol implementation
|
||||
|
||||
def open(
|
||||
self,
|
||||
port: PortConfig | None = None,
|
||||
signing_key: bytes | None = None,
|
||||
) -> None:
|
||||
"""Validate the CSV exists; lazy-build the outbound MAVLink instance.
|
||||
|
||||
``port`` and ``signing_key`` are accepted for Protocol parity but
|
||||
unused (replay has no FC link to open). The actual CSV parsing
|
||||
happens inside :func:`load_csv_ground_truth` from the runtime
|
||||
loop; this method only fails fast on a missing file so the
|
||||
composition root surfaces the same shape of error as the tlog
|
||||
path.
|
||||
"""
|
||||
if self._opened:
|
||||
raise FcOpenError("CsvReplayFcAdapter already opened")
|
||||
if not self._csv_path.is_file():
|
||||
raise FcOpenError(f"CSV file not found: {self._csv_path}")
|
||||
if self._mavlink_transport is not None and self._outbound_mav is None:
|
||||
from pymavlink.dialects.v20 import ardupilotmega as _mavlink
|
||||
|
||||
self._outbound_mav = _mavlink.MAVLink(
|
||||
file=None, srcSystem=1, srcComponent=1
|
||||
)
|
||||
self._opened = True
|
||||
self._log.info(
|
||||
f"{_LOG_KIND_OPENED}: csv_path={self._csv_path} "
|
||||
f"dialect={self._target_fc_dialect.value} "
|
||||
f"pace={self._pace.value}",
|
||||
extra={
|
||||
"kind": _LOG_KIND_OPENED,
|
||||
"kv": {
|
||||
"csv_path": str(self._csv_path),
|
||||
"target_fc_dialect": self._target_fc_dialect.value,
|
||||
"pace": self._pace.value,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
def close(self) -> None:
|
||||
"""Release any held outbound resources; idempotent."""
|
||||
if not self._opened or self._closed:
|
||||
return
|
||||
self._closed = True
|
||||
|
||||
def subscribe_telemetry(self, callback: TelemetryCallback) -> Subscription:
|
||||
# Bus is intentionally never fed in the CSV variant — the replay
|
||||
# loop reads IMU + GPS directly from the parsed CsvGroundTruth.
|
||||
# We still hand back a real Subscription so Protocol consumers
|
||||
# (and any future inbound-mirroring code) get a no-op handle
|
||||
# instead of a contract violation.
|
||||
return self._bus.subscribe(callback)
|
||||
|
||||
def emit_external_position(
|
||||
self, output: "EstimatorOutput"
|
||||
) -> "EmittedExternalPosition":
|
||||
from gps_denied_onboard._types.emitted import EmittedExternalPosition
|
||||
|
||||
if self._mavlink_transport is None or self._outbound_mav is None:
|
||||
raise FcEmitError("replay adapter does not emit to FC")
|
||||
if output.smoothed:
|
||||
raise FcEmitError(
|
||||
"smoothed output cannot be emitted to FC (Invariant 6)"
|
||||
)
|
||||
wgs = output.position_wgs84
|
||||
if not isinstance(wgs, LatLonAlt):
|
||||
raise FcEmitError(
|
||||
f"EstimatorOutput.position_wgs84 must be a LatLonAlt; "
|
||||
f"got {type(wgs).__name__}"
|
||||
)
|
||||
emitted_at = self._clock.monotonic_ns()
|
||||
self._sequence_number += 1
|
||||
seq = self._sequence_number
|
||||
try:
|
||||
gps_msg = encode_gps_input(
|
||||
self._outbound_mav,
|
||||
time_usec=int(self._clock_us_provider()),
|
||||
gps_id=0,
|
||||
ignore_flags=0,
|
||||
time_week_ms=0,
|
||||
time_week=0,
|
||||
fix_type=3,
|
||||
lat=int(wgs.lat_deg * 1e7),
|
||||
lon=int(wgs.lon_deg * 1e7),
|
||||
alt=float(wgs.alt_m),
|
||||
hdop=0.0,
|
||||
vdop=0.0,
|
||||
vn=0.0,
|
||||
ve=0.0,
|
||||
vd=0.0,
|
||||
speed_accuracy=0.0,
|
||||
horiz_accuracy=0.0,
|
||||
vert_accuracy=0.0,
|
||||
satellites_visible=10,
|
||||
yaw=0,
|
||||
)
|
||||
send_via_transport(self._outbound_mav, gps_msg, self._mavlink_transport)
|
||||
label_msg = encode_named_value_float(
|
||||
self._outbound_mav,
|
||||
time_boot_ms=int(self._clock_ms_boot_provider()),
|
||||
name=b"src_lbl",
|
||||
value=source_label_to_float(output.source_label),
|
||||
)
|
||||
send_via_transport(
|
||||
self._outbound_mav, label_msg, self._mavlink_transport
|
||||
)
|
||||
except Exception as exc:
|
||||
raise FcEmitError(
|
||||
f"replay outbound wire emit failed: {exc!r}"
|
||||
) from exc
|
||||
return EmittedExternalPosition(
|
||||
fc_kind=FcKind.ARDUPILOT_PLANE,
|
||||
horiz_accuracy_m=0.0,
|
||||
source_label=output.source_label,
|
||||
emitted_at=emitted_at,
|
||||
sequence_number=seq,
|
||||
)
|
||||
|
||||
def emit_status_text(self, msg: str, severity: Severity) -> None:
|
||||
if self._mavlink_transport is None or self._outbound_mav is None:
|
||||
raise FcEmitError("replay adapter does not emit to FC")
|
||||
try:
|
||||
text = msg.encode("utf-8")[:50]
|
||||
txt_msg = encode_statustext(
|
||||
self._outbound_mav,
|
||||
severity=int(severity.value),
|
||||
text=text,
|
||||
)
|
||||
send_via_transport(self._outbound_mav, txt_msg, self._mavlink_transport)
|
||||
except Exception as exc:
|
||||
raise FcEmitError(
|
||||
f"replay outbound statustext failed: {exc!r}"
|
||||
) from exc
|
||||
|
||||
def request_source_set_switch(self) -> None:
|
||||
raise SourceSetSwitchNotSupportedError(
|
||||
"CsvReplayFcAdapter cannot issue MAV_CMD_SET_EKF_SOURCE_SET; "
|
||||
"replay reads telemetry from a recorded CSV"
|
||||
)
|
||||
|
||||
def current_flight_state(self) -> FlightStateSignal:
|
||||
# The CSV does not carry MAVLink HEARTBEAT, so we cannot derive a
|
||||
# latched flight-state. Returning INIT mirrors what the tlog adapter
|
||||
# returns before its first decoded heartbeat; the replay loop never
|
||||
# consumes this value (it drives the loop from the CSV directly).
|
||||
return FlightStateSignal(
|
||||
state=FlightState.INIT,
|
||||
last_valid_gps_hint_wgs84=None,
|
||||
last_valid_gps_age_ms=None,
|
||||
captured_at=self._clock.monotonic_ns(),
|
||||
)
|
||||
@@ -347,7 +347,18 @@ class ReplayConfig:
|
||||
``.h264``). Empty string is the default sentinel; a
|
||||
non-empty value is required when ``mode == "replay"``.
|
||||
tlog_path: Filesystem path to the matching pymavlink ``.tlog``.
|
||||
Empty string is the default sentinel.
|
||||
Empty string is the default sentinel. Deprecated as of AZ-894 —
|
||||
the canonical replay input is now the CSV pair routed through
|
||||
:attr:`imu_csv_path`. ``tlog_path`` remains valid only for
|
||||
legacy callers that have not migrated to the CSV input; AZ-895
|
||||
removes it entirely.
|
||||
imu_csv_path: Filesystem path to the paired Derkachi-schema CSV
|
||||
(IMU + GPS-ground-truth on a single canonical ``Time`` clock).
|
||||
Empty string is the default sentinel; a non-empty value is
|
||||
required for new replay runs (AZ-894). Setting this in
|
||||
combination with ``tlog_path`` is permitted during the
|
||||
AZ-894 → AZ-895 migration window: ``imu_csv_path`` wins; the
|
||||
tlog path is then unused.
|
||||
output_path: Filesystem path the :class:`JsonlReplaySink` will
|
||||
write to. Default points at ``/tmp/replay.jsonl`` for
|
||||
developer ergonomics; production wiring overrides via the
|
||||
@@ -390,6 +401,7 @@ class ReplayConfig:
|
||||
|
||||
video_path: str = ""
|
||||
tlog_path: str = ""
|
||||
imu_csv_path: str = ""
|
||||
output_path: str = "/tmp/replay.jsonl"
|
||||
pace: str = "asap"
|
||||
time_offset_ms: int | None = None
|
||||
|
||||
@@ -20,6 +20,11 @@ path.
|
||||
"""
|
||||
|
||||
from gps_denied_onboard._types.route import RouteSpec
|
||||
from gps_denied_onboard.replay_input.csv_ground_truth import (
|
||||
CsvGpsFix,
|
||||
CsvGroundTruth,
|
||||
load_csv_ground_truth,
|
||||
)
|
||||
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
|
||||
from gps_denied_onboard.replay_input.interface import (
|
||||
AlignedWindow,
|
||||
@@ -42,6 +47,8 @@ __all__ = [
|
||||
"AlignedWindow",
|
||||
"AutoSyncConfig",
|
||||
"AutoSyncDecision",
|
||||
"CsvGpsFix",
|
||||
"CsvGroundTruth",
|
||||
"ReplayInputAdapter",
|
||||
"ReplayInputAdapterError",
|
||||
"ReplayInputBundle",
|
||||
@@ -50,5 +57,6 @@ __all__ = [
|
||||
"TlogGpsFix",
|
||||
"TlogGroundTruth",
|
||||
"extract_route_from_tlog",
|
||||
"load_csv_ground_truth",
|
||||
"load_tlog_ground_truth",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,255 @@
|
||||
"""CSV-driven IMU + GPS ground-truth extractor (AZ-894 / E-DEMO-REPLAY).
|
||||
|
||||
Streams paired IMU samples and GPS-truth fixes from a Derkachi-schema CSV
|
||||
into a typed :class:`CsvGroundTruth` DTO. Sibling to :mod:`tlog_ground_truth`
|
||||
— the CSV variant lets the replay pipeline run on a single canonical clock
|
||||
(the CSV's ``Time`` column), eliminating the AZ-848 / AZ-883 two-clock
|
||||
mismatch that cycle-3 surfaced.
|
||||
|
||||
Schema (19 cols, header-required):
|
||||
|
||||
* ``timestamp(ms)`` — FC-boot-relative ms, kept for traceability only.
|
||||
* ``Time`` — flight-relative seconds, the canonical clock.
|
||||
* ``SCALED_IMU2.{x,y,z}{acc,gyro,mag}`` — 10 Hz IMU stream (raw mg / mrad/s
|
||||
/ mGauss per ArduPilot convention; preserved unchanged to match the
|
||||
byte-for-byte semantics the tlog adapter uses for ``ImuSample.accel_xyz``
|
||||
and ``gyro_xyz``).
|
||||
* ``GLOBAL_POSITION_INT.{lat,lon,alt,relative_alt,vx,vy,vz,hdg}`` — 10 Hz
|
||||
GPS truth. ``lat``/``lon`` already in degrees (Derkachi dump format),
|
||||
``alt`` in mm, ``vx``/``vy``/``vz`` in cm/s, ``hdg`` in cdeg.
|
||||
|
||||
The full operator-facing schema spec lives in
|
||||
``_docs/02_document/contracts/replay/csv_replay_format.md`` (AZ-896).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import logging
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Final
|
||||
|
||||
from gps_denied_onboard._types.nav import ImuSample
|
||||
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
|
||||
|
||||
__all__ = [
|
||||
"CSV_SOURCE_LABEL",
|
||||
"REQUIRED_COLUMNS",
|
||||
"CsvGpsFix",
|
||||
"CsvGroundTruth",
|
||||
"load_csv_ground_truth",
|
||||
]
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger("gps_denied_onboard.replay_input.csv_ground_truth")
|
||||
|
||||
CSV_SOURCE_LABEL: Final[str] = "GLOBAL_POSITION_INT_CSV"
|
||||
|
||||
REQUIRED_COLUMNS: Final[tuple[str, ...]] = (
|
||||
"timestamp(ms)",
|
||||
"Time",
|
||||
"SCALED_IMU2.xacc",
|
||||
"SCALED_IMU2.yacc",
|
||||
"SCALED_IMU2.zacc",
|
||||
"SCALED_IMU2.xgyro",
|
||||
"SCALED_IMU2.ygyro",
|
||||
"SCALED_IMU2.zgyro",
|
||||
"GLOBAL_POSITION_INT.lat",
|
||||
"GLOBAL_POSITION_INT.lon",
|
||||
"GLOBAL_POSITION_INT.alt",
|
||||
"GLOBAL_POSITION_INT.vx",
|
||||
"GLOBAL_POSITION_INT.vy",
|
||||
"GLOBAL_POSITION_INT.vz",
|
||||
"GLOBAL_POSITION_INT.hdg",
|
||||
)
|
||||
|
||||
# Per the Derkachi dump format the lat/lon columns are already in degrees
|
||||
# (pre-converted from MAVLink's int×1e7). alt stays in mm; vx/vy/vz in
|
||||
# cm/s; hdg in cdeg.
|
||||
_MM_PER_M: Final[float] = 1000.0
|
||||
_CM_PER_M_S: Final[float] = 100.0
|
||||
_CDEG_PER_DEG: Final[float] = 100.0
|
||||
_S_TO_NS: Final[float] = 1.0e9
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class CsvGpsFix:
|
||||
"""One time-aligned GPS-truth row extracted from the CSV.
|
||||
|
||||
Field semantics match :class:`tlog_ground_truth.TlogGpsFix` so the
|
||||
replay loop's downstream consumers see an identical shape regardless
|
||||
of input format.
|
||||
"""
|
||||
|
||||
ts_ns: int
|
||||
lat_deg: float
|
||||
lon_deg: float
|
||||
alt_m: float
|
||||
hdg_deg: float
|
||||
vx_m_s: float
|
||||
vy_m_s: float
|
||||
vz_m_s: float
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class CsvGroundTruth:
|
||||
"""Paired IMU + GPS series extracted from a Derkachi-schema CSV.
|
||||
|
||||
Attributes:
|
||||
records: Time-ordered GPS fixes.
|
||||
imu_samples: Time-ordered IMU samples; one per CSV row (10 Hz).
|
||||
Values preserve the raw mg / mrad/s units the tlog adapter
|
||||
also passes through so C1 / C5 see identical numeric shapes
|
||||
regardless of input source.
|
||||
source: Discriminator label echoed into the cold-start FDR
|
||||
event (``"GLOBAL_POSITION_INT_CSV"`` for the Derkachi CSV).
|
||||
"""
|
||||
|
||||
records: tuple[CsvGpsFix, ...]
|
||||
imu_samples: tuple[ImuSample, ...]
|
||||
source: str
|
||||
|
||||
|
||||
def load_csv_ground_truth(csv_path: Path) -> CsvGroundTruth:
|
||||
"""Parse the Derkachi-schema CSV into a :class:`CsvGroundTruth`.
|
||||
|
||||
Performs all schema validation at entry so a malformed CSV raises a
|
||||
single :class:`ReplayInputAdapterError` (AC-5 fail-fast) rather than
|
||||
surfacing the same problem as a downstream NaN deep inside the loop.
|
||||
|
||||
Args:
|
||||
csv_path: Path to the CSV file. Existence is checked at entry.
|
||||
|
||||
Returns:
|
||||
A :class:`CsvGroundTruth` with row-aligned GPS records and IMU
|
||||
samples. The IMU and GPS sequences share the same length (one
|
||||
entry per CSV row).
|
||||
|
||||
Raises:
|
||||
ReplayInputAdapterError: When the file is missing, the header
|
||||
is missing a required column, the ``Time`` column contains
|
||||
NaN / non-monotonic values, or any required numeric column
|
||||
contains an unparseable value.
|
||||
"""
|
||||
if not csv_path.is_file():
|
||||
raise ReplayInputAdapterError(f"CSV file not found: {csv_path}")
|
||||
|
||||
with csv_path.open("r", encoding="utf-8", newline="") as handle:
|
||||
reader = csv.DictReader(handle)
|
||||
fieldnames = reader.fieldnames
|
||||
if fieldnames is None:
|
||||
raise ReplayInputAdapterError(
|
||||
f"CSV {csv_path} is empty (no header row)"
|
||||
)
|
||||
missing = [col for col in REQUIRED_COLUMNS if col not in fieldnames]
|
||||
if missing:
|
||||
raise ReplayInputAdapterError(
|
||||
f"CSV {csv_path} missing required columns: {missing}"
|
||||
)
|
||||
|
||||
gps_records: list[CsvGpsFix] = []
|
||||
imu_samples: list[ImuSample] = []
|
||||
last_ts_ns: int | None = None
|
||||
for row_idx, row in enumerate(reader, start=2):
|
||||
ts_ns = _parse_time_ns(row, csv_path, row_idx)
|
||||
if last_ts_ns is not None and ts_ns <= last_ts_ns:
|
||||
raise ReplayInputAdapterError(
|
||||
f"CSV {csv_path} row {row_idx}: non-monotonic Time "
|
||||
f"({ts_ns / _S_TO_NS:.6f} s <= previous "
|
||||
f"{last_ts_ns / _S_TO_NS:.6f} s)"
|
||||
)
|
||||
last_ts_ns = ts_ns
|
||||
imu_samples.append(_parse_imu(row, csv_path, row_idx, ts_ns))
|
||||
gps_records.append(_parse_gps(row, csv_path, row_idx, ts_ns))
|
||||
|
||||
if not gps_records:
|
||||
raise ReplayInputAdapterError(
|
||||
f"CSV {csv_path} has a header but no data rows"
|
||||
)
|
||||
|
||||
_LOGGER.info(
|
||||
"csv_ground_truth.loaded: csv_path=%s rows=%d span_s=%.3f",
|
||||
csv_path,
|
||||
len(gps_records),
|
||||
(gps_records[-1].ts_ns - gps_records[0].ts_ns) / _S_TO_NS,
|
||||
)
|
||||
|
||||
return CsvGroundTruth(
|
||||
records=tuple(gps_records),
|
||||
imu_samples=tuple(imu_samples),
|
||||
source=CSV_SOURCE_LABEL,
|
||||
)
|
||||
|
||||
|
||||
def _parse_time_ns(row: dict[str, str], path: Path, row_idx: int) -> int:
|
||||
raw = row.get("Time", "")
|
||||
try:
|
||||
seconds = float(raw)
|
||||
except ValueError as exc:
|
||||
raise ReplayInputAdapterError(
|
||||
f"CSV {path} row {row_idx}: Time={raw!r} is not a number"
|
||||
) from exc
|
||||
if math.isnan(seconds) or math.isinf(seconds):
|
||||
raise ReplayInputAdapterError(
|
||||
f"CSV {path} row {row_idx}: Time={raw!r} is NaN/Inf"
|
||||
)
|
||||
return int(seconds * _S_TO_NS)
|
||||
|
||||
|
||||
def _parse_imu(
|
||||
row: dict[str, str],
|
||||
path: Path,
|
||||
row_idx: int,
|
||||
ts_ns: int,
|
||||
) -> ImuSample:
|
||||
accel = (
|
||||
_parse_float(row, "SCALED_IMU2.xacc", path, row_idx),
|
||||
_parse_float(row, "SCALED_IMU2.yacc", path, row_idx),
|
||||
_parse_float(row, "SCALED_IMU2.zacc", path, row_idx),
|
||||
)
|
||||
gyro = (
|
||||
_parse_float(row, "SCALED_IMU2.xgyro", path, row_idx),
|
||||
_parse_float(row, "SCALED_IMU2.ygyro", path, row_idx),
|
||||
_parse_float(row, "SCALED_IMU2.zgyro", path, row_idx),
|
||||
)
|
||||
return ImuSample(ts_ns=ts_ns, accel_xyz=accel, gyro_xyz=gyro)
|
||||
|
||||
|
||||
def _parse_gps(
|
||||
row: dict[str, str],
|
||||
path: Path,
|
||||
row_idx: int,
|
||||
ts_ns: int,
|
||||
) -> CsvGpsFix:
|
||||
return CsvGpsFix(
|
||||
ts_ns=ts_ns,
|
||||
lat_deg=_parse_float(row, "GLOBAL_POSITION_INT.lat", path, row_idx),
|
||||
lon_deg=_parse_float(row, "GLOBAL_POSITION_INT.lon", path, row_idx),
|
||||
alt_m=_parse_float(row, "GLOBAL_POSITION_INT.alt", path, row_idx) / _MM_PER_M,
|
||||
hdg_deg=_parse_float(row, "GLOBAL_POSITION_INT.hdg", path, row_idx) / _CDEG_PER_DEG,
|
||||
vx_m_s=_parse_float(row, "GLOBAL_POSITION_INT.vx", path, row_idx) / _CM_PER_M_S,
|
||||
vy_m_s=_parse_float(row, "GLOBAL_POSITION_INT.vy", path, row_idx) / _CM_PER_M_S,
|
||||
vz_m_s=_parse_float(row, "GLOBAL_POSITION_INT.vz", path, row_idx) / _CM_PER_M_S,
|
||||
)
|
||||
|
||||
|
||||
def _parse_float(
|
||||
row: dict[str, str],
|
||||
column: str,
|
||||
path: Path,
|
||||
row_idx: int,
|
||||
) -> float:
|
||||
raw = row.get(column, "")
|
||||
try:
|
||||
value = float(raw)
|
||||
except ValueError as exc:
|
||||
raise ReplayInputAdapterError(
|
||||
f"CSV {path} row {row_idx}: {column}={raw!r} is not a number"
|
||||
) from exc
|
||||
if math.isnan(value) or math.isinf(value):
|
||||
raise ReplayInputAdapterError(
|
||||
f"CSV {path} row {row_idx}: {column}={raw!r} is NaN/Inf"
|
||||
)
|
||||
return value
|
||||
@@ -789,6 +789,7 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int:
|
||||
fixes (cold-start impossible), or when the estimator raises a
|
||||
fatal error. ``EXIT_SUCCESS`` (0) on clean completion.
|
||||
"""
|
||||
import dataclasses
|
||||
import time
|
||||
|
||||
from gps_denied_onboard._types.geo import LatLonAlt
|
||||
@@ -802,6 +803,9 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int:
|
||||
EstimatorFatalError,
|
||||
)
|
||||
from gps_denied_onboard.logging import get_logger
|
||||
from gps_denied_onboard.replay_input.csv_ground_truth import (
|
||||
load_csv_ground_truth,
|
||||
)
|
||||
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
|
||||
from gps_denied_onboard.replay_input.tlog_ground_truth import (
|
||||
load_tlog_ground_truth,
|
||||
@@ -856,38 +860,66 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int:
|
||||
# explicitly.
|
||||
calibration = _load_camera_calibration(config)
|
||||
|
||||
# Cold-start origin from tlog's first GPS fix. This is the
|
||||
# ADR-010 / Principle #11 documented fallback when no operator
|
||||
# Manifest is available. ESKF/GTSAM both require an origin
|
||||
# before the first add_fc_imu (else EstimatorAlreadyStartedError).
|
||||
# Cold-start origin from the first GPS fix in the configured input.
|
||||
# AZ-894: prefer the CSV ground-truth (single canonical clock); fall
|
||||
# back to the legacy tlog path when ``imu_csv_path`` is unset (the
|
||||
# AZ-895 deprecation completes the removal).
|
||||
csv_path_str = config.replay.imu_csv_path
|
||||
tlog_path_str = config.replay.tlog_path
|
||||
_log.info(
|
||||
"replay_loop.loading_gps_for_cold_start: tlog_path=%s",
|
||||
tlog_path_str,
|
||||
extra={
|
||||
"kind": "replay_loop.loading_gps_for_cold_start",
|
||||
"kv": {"tlog_path": tlog_path_str},
|
||||
},
|
||||
)
|
||||
try:
|
||||
gt = load_tlog_ground_truth(Path(tlog_path_str))
|
||||
except ReplayInputAdapterError as exc:
|
||||
_log.error(
|
||||
"replay_loop.tlog_load_failed: %r",
|
||||
exc,
|
||||
using_csv = bool(csv_path_str)
|
||||
if using_csv:
|
||||
_log.info(
|
||||
"replay_loop.loading_gps_for_cold_start: imu_csv_path=%s",
|
||||
csv_path_str,
|
||||
extra={
|
||||
"kind": "replay_loop.tlog_load_failed",
|
||||
"kv": {"error": repr(exc)},
|
||||
"kind": "replay_loop.loading_gps_for_cold_start",
|
||||
"kv": {"imu_csv_path": csv_path_str},
|
||||
},
|
||||
)
|
||||
return EXIT_GENERIC_FAILURE
|
||||
try:
|
||||
csv_gt = load_csv_ground_truth(Path(csv_path_str))
|
||||
except ReplayInputAdapterError as exc:
|
||||
_log.error(
|
||||
"replay_loop.csv_load_failed: %r",
|
||||
exc,
|
||||
extra={
|
||||
"kind": "replay_loop.csv_load_failed",
|
||||
"kv": {"error": repr(exc)},
|
||||
},
|
||||
)
|
||||
return EXIT_GENERIC_FAILURE
|
||||
gt = csv_gt
|
||||
else:
|
||||
_log.info(
|
||||
"replay_loop.loading_gps_for_cold_start: tlog_path=%s",
|
||||
tlog_path_str,
|
||||
extra={
|
||||
"kind": "replay_loop.loading_gps_for_cold_start",
|
||||
"kv": {"tlog_path": tlog_path_str},
|
||||
},
|
||||
)
|
||||
try:
|
||||
gt = load_tlog_ground_truth(Path(tlog_path_str))
|
||||
except ReplayInputAdapterError as exc:
|
||||
_log.error(
|
||||
"replay_loop.tlog_load_failed: %r",
|
||||
exc,
|
||||
extra={
|
||||
"kind": "replay_loop.tlog_load_failed",
|
||||
"kv": {"error": repr(exc)},
|
||||
},
|
||||
)
|
||||
return EXIT_GENERIC_FAILURE
|
||||
if not gt.records:
|
||||
_log.error(
|
||||
"replay_loop.cold_start_impossible: tlog has no GPS messages, "
|
||||
"replay_loop.cold_start_impossible: input has no GPS fixes, "
|
||||
"cannot seed C5 set_takeoff_origin",
|
||||
extra={
|
||||
"kind": "replay_loop.cold_start_impossible",
|
||||
"kv": {"tlog_path": tlog_path_str},
|
||||
"kv": {
|
||||
"input_path": csv_path_str if using_csv else tlog_path_str,
|
||||
"input_kind": "csv" if using_csv else "tlog",
|
||||
},
|
||||
},
|
||||
)
|
||||
return EXIT_GENERIC_FAILURE
|
||||
@@ -924,22 +956,29 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int:
|
||||
},
|
||||
)
|
||||
|
||||
# Open the tlog directly for synchronous IMU read. Bypasses the
|
||||
# decode-thread race in TlogReplayFcAdapter (see docstring).
|
||||
try:
|
||||
from pymavlink import mavutil # type: ignore[import-untyped]
|
||||
except ImportError as exc:
|
||||
_log.error(
|
||||
"replay_loop.pymavlink_unavailable: %r",
|
||||
exc,
|
||||
extra={
|
||||
"kind": "replay_loop.pymavlink_unavailable",
|
||||
"kv": {"error": repr(exc)},
|
||||
},
|
||||
)
|
||||
return EXIT_GENERIC_FAILURE
|
||||
|
||||
tlog_reader = mavutil.mavlink_connection(str(tlog_path_str))
|
||||
# IMU source. CSV path: walk the pre-loaded list (gt.imu_samples)
|
||||
# so the loop's IMU draining stays on the single canonical clock.
|
||||
# Tlog path (legacy): open the tlog directly for synchronous IMU
|
||||
# read; bypasses the decode-thread race in TlogReplayFcAdapter.
|
||||
tlog_reader: Any = None
|
||||
csv_imu_samples: tuple[ImuSample, ...] = ()
|
||||
csv_imu_idx = 0
|
||||
if using_csv:
|
||||
csv_imu_samples = csv_gt.imu_samples
|
||||
else:
|
||||
try:
|
||||
from pymavlink import mavutil # type: ignore[import-untyped]
|
||||
except ImportError as exc:
|
||||
_log.error(
|
||||
"replay_loop.pymavlink_unavailable: %r",
|
||||
exc,
|
||||
extra={
|
||||
"kind": "replay_loop.pymavlink_unavailable",
|
||||
"kv": {"error": repr(exc)},
|
||||
},
|
||||
)
|
||||
return EXIT_GENERIC_FAILURE
|
||||
tlog_reader = mavutil.mavlink_connection(str(tlog_path_str))
|
||||
|
||||
# IMU sample buffer used to build per-frame ImuWindows. We
|
||||
# accumulate every RAW_IMU/SCALED_IMU2 sample whose FC-clock
|
||||
@@ -949,32 +988,41 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int:
|
||||
imu_eof = False
|
||||
|
||||
def _drain_imu_until(target_ns: int) -> None:
|
||||
"""Advance the tlog reader, appending IMU samples up to ``target_ns``.
|
||||
"""Advance the IMU source, appending samples up to ``target_ns``.
|
||||
|
||||
Stops at end-of-stream (``recv_match`` returns ``None``).
|
||||
Mirrors :meth:`TlogReplayFcAdapter._handle_imu` for sample
|
||||
construction so the bytes-on-wire and the synchronous-read
|
||||
paths produce identical IMU samples.
|
||||
Branches on ``using_csv`` so the closure stays a single
|
||||
definition. Both branches respect the same ``pending_imu``
|
||||
buffer + ``imu_anchor_ns`` / ``imu_eof`` state and produce
|
||||
:class:`ImuSample` instances with identical numeric semantics
|
||||
— the tlog branch matches :meth:`TlogReplayFcAdapter._handle_imu`
|
||||
for byte-for-byte compatibility with the legacy path.
|
||||
"""
|
||||
nonlocal imu_anchor_ns, imu_eof
|
||||
nonlocal imu_anchor_ns, imu_eof, csv_imu_idx
|
||||
while not imu_eof:
|
||||
if pending_imu and pending_imu[-1].ts_ns >= target_ns:
|
||||
return
|
||||
msg = tlog_reader.recv_match(
|
||||
type=["RAW_IMU", "SCALED_IMU2"],
|
||||
blocking=False,
|
||||
)
|
||||
if msg is None:
|
||||
imu_eof = True
|
||||
return
|
||||
ts_ns = int(getattr(msg, "time_usec", 0)) * 1000
|
||||
if ts_ns == 0:
|
||||
continue
|
||||
sample = ImuSample(
|
||||
ts_ns=ts_ns,
|
||||
accel_xyz=(float(msg.xacc), float(msg.yacc), float(msg.zacc)),
|
||||
gyro_xyz=(float(msg.xgyro), float(msg.ygyro), float(msg.zgyro)),
|
||||
)
|
||||
if using_csv:
|
||||
if csv_imu_idx >= len(csv_imu_samples):
|
||||
imu_eof = True
|
||||
return
|
||||
sample = csv_imu_samples[csv_imu_idx]
|
||||
csv_imu_idx += 1
|
||||
else:
|
||||
msg = tlog_reader.recv_match(
|
||||
type=["RAW_IMU", "SCALED_IMU2"],
|
||||
blocking=False,
|
||||
)
|
||||
if msg is None:
|
||||
imu_eof = True
|
||||
return
|
||||
ts_ns = int(getattr(msg, "time_usec", 0)) * 1000
|
||||
if ts_ns == 0:
|
||||
continue
|
||||
sample = ImuSample(
|
||||
ts_ns=ts_ns,
|
||||
accel_xyz=(float(msg.xacc), float(msg.yacc), float(msg.zacc)),
|
||||
gyro_xyz=(float(msg.xgyro), float(msg.ygyro), float(msg.zgyro)),
|
||||
)
|
||||
if imu_anchor_ns is None:
|
||||
imu_anchor_ns = sample.ts_ns
|
||||
pending_imu.append(sample)
|
||||
@@ -1141,6 +1189,17 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int:
|
||||
return EXIT_GENERIC_FAILURE
|
||||
|
||||
if vio_out is not None:
|
||||
# AZ-894 AC-4: when the CSV adapter drives the loop,
|
||||
# stamp VioOutput.emitted_at_ns with the CSV-derived
|
||||
# frame timestamp (single-clock invariant). Without
|
||||
# this, C1 produces a Jetson-monotonic timestamp that
|
||||
# disagrees with the CSV-anchored ImuWindow.ts_end_ns
|
||||
# the estimator consumed the line before — exactly
|
||||
# the AZ-848 two-clock surface this ticket eliminates.
|
||||
if using_csv:
|
||||
vio_out = dataclasses.replace(
|
||||
vio_out, emitted_at_ns=frame_end_ns
|
||||
)
|
||||
try:
|
||||
state_estimator.add_vio(vio_out)
|
||||
except EstimatorDegradedError as exc:
|
||||
@@ -1203,17 +1262,18 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int:
|
||||
if slack_ns > 0:
|
||||
time.sleep(slack_ns / 1_000_000_000.0)
|
||||
finally:
|
||||
try:
|
||||
tlog_reader.close()
|
||||
except Exception as exc: # pragma: no cover — defensive.
|
||||
_log.debug(
|
||||
"replay_loop.tlog_reader_close_error: %r",
|
||||
exc,
|
||||
extra={
|
||||
"kind": "replay_loop.tlog_reader_close_error",
|
||||
"kv": {"error": repr(exc)},
|
||||
},
|
||||
)
|
||||
if tlog_reader is not None:
|
||||
try:
|
||||
tlog_reader.close()
|
||||
except Exception as exc: # pragma: no cover — defensive.
|
||||
_log.debug(
|
||||
"replay_loop.tlog_reader_close_error: %r",
|
||||
exc,
|
||||
extra={
|
||||
"kind": "replay_loop.tlog_reader_close_error",
|
||||
"kv": {"error": repr(exc)},
|
||||
},
|
||||
)
|
||||
|
||||
_log.info(
|
||||
"replay_loop.complete: frames=%d emitted=%d vio_init_skipped=%d "
|
||||
|
||||
@@ -36,6 +36,9 @@ from typing import TYPE_CHECKING, Any, Final
|
||||
|
||||
from gps_denied_onboard._types.calibration import CameraCalibration
|
||||
from gps_denied_onboard._types.fc import FcKind
|
||||
from gps_denied_onboard.components.c8_fc_adapter.csv_replay_adapter import (
|
||||
CsvReplayFcAdapter,
|
||||
)
|
||||
from gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport import (
|
||||
NoopMavlinkTransport,
|
||||
)
|
||||
@@ -44,6 +47,7 @@ from gps_denied_onboard.components.c8_fc_adapter.replay_sink import (
|
||||
)
|
||||
from gps_denied_onboard.config import Config
|
||||
from gps_denied_onboard.fdr_client import make_fdr_client
|
||||
from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource
|
||||
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
|
||||
from gps_denied_onboard.logging import get_logger
|
||||
from gps_denied_onboard.replay_input import (
|
||||
@@ -73,6 +77,12 @@ REPLAY_BUILD_FLAGS: Final[tuple[str, ...]] = (
|
||||
"BUILD_REPLAY_SINK_JSONL",
|
||||
)
|
||||
|
||||
# AZ-894: separate build flag for the CSV adapter so the replay binary
|
||||
# can opt into the new path without disturbing the BUILD_TLOG_* gate
|
||||
# (the tlog adapter is still composed by _build_replay_input_bundle's
|
||||
# legacy branch until AZ-895 removes it).
|
||||
_CSV_REPLAY_BUILD_FLAG: Final[str] = "BUILD_CSV_REPLAY_ADAPTER"
|
||||
|
||||
|
||||
REPLAY_COMPONENT_KEYS: Final[tuple[str, ...]] = (
|
||||
"frame_source",
|
||||
@@ -175,14 +185,21 @@ def _validate_build_flags() -> None:
|
||||
|
||||
|
||||
def _validate_replay_paths(config: Config) -> None:
|
||||
"""Reject empty / missing replay paths early with a precise message."""
|
||||
"""Reject empty / missing replay paths early with a precise message.
|
||||
|
||||
AZ-894: ``imu_csv_path`` is the canonical replay input. ``tlog_path``
|
||||
remains valid for the legacy auto-sync path until AZ-895 removes it,
|
||||
but exactly one of the two must be set so the composition root can
|
||||
pick a single branch.
|
||||
"""
|
||||
if not config.replay.video_path:
|
||||
raise CompositionError(
|
||||
"config.replay.video_path is empty; replay mode requires a video path"
|
||||
)
|
||||
if not config.replay.tlog_path:
|
||||
if not config.replay.imu_csv_path and not config.replay.tlog_path:
|
||||
raise CompositionError(
|
||||
"config.replay.tlog_path is empty; replay mode requires a tlog path"
|
||||
"config.replay.imu_csv_path is empty and no tlog_path fallback is set; "
|
||||
"replay mode requires an IMU+GPS CSV (AZ-894) or a tlog file (legacy)"
|
||||
)
|
||||
if not config.replay.output_path:
|
||||
raise CompositionError(
|
||||
@@ -197,13 +214,31 @@ def _build_replay_input_bundle(
|
||||
adapter_factory: Any | None,
|
||||
mavlink_transport: Any | None = None,
|
||||
) -> ReplayInputBundle:
|
||||
"""Build the :class:`ReplayInputAdapter` and call ``open()``."""
|
||||
"""Build the replay input bundle and open the underlying strategies.
|
||||
|
||||
AZ-894: branches on ``config.replay.imu_csv_path`` — when set, builds
|
||||
the :class:`CsvReplayFcAdapter` + :class:`VideoFileFrameSource` pair
|
||||
on a single canonical clock derived from the CSV's ``Time`` column;
|
||||
when unset, falls back to the legacy :class:`ReplayInputAdapter`
|
||||
tlog path (auto-sync + AC-9 validator). AZ-895 removes the legacy
|
||||
branch.
|
||||
"""
|
||||
pace = _resolve_pace(config.replay.pace)
|
||||
target_fc_dialect = _resolve_fc_kind(config.replay.target_fc_dialect)
|
||||
auto_sync = _build_auto_sync_config(config)
|
||||
camera_calibration = _load_camera_calibration(config)
|
||||
wgs_converter = WgsConverter()
|
||||
|
||||
if config.replay.imu_csv_path:
|
||||
return _build_csv_bundle(
|
||||
config,
|
||||
fdr_client=fdr_client,
|
||||
pace=pace,
|
||||
target_fc_dialect=target_fc_dialect,
|
||||
camera_calibration=camera_calibration,
|
||||
mavlink_transport=mavlink_transport,
|
||||
)
|
||||
|
||||
auto_sync = _build_auto_sync_config(config)
|
||||
if adapter_factory is not None:
|
||||
adapter = adapter_factory(
|
||||
config=config,
|
||||
@@ -233,6 +268,56 @@ def _build_replay_input_bundle(
|
||||
return adapter.open()
|
||||
|
||||
|
||||
def _build_csv_bundle(
|
||||
config: Config,
|
||||
*,
|
||||
fdr_client: "FdrClient",
|
||||
pace: ReplayPace,
|
||||
target_fc_dialect: FcKind,
|
||||
camera_calibration: CameraCalibration,
|
||||
mavlink_transport: Any | None,
|
||||
) -> ReplayInputBundle:
|
||||
"""Compose the AZ-894 CSV bundle (frame source + CSV FC adapter + clock).
|
||||
|
||||
No auto-sync / auto-trim is run — the CSV's ``Time`` column is the
|
||||
single canonical clock by construction, so ``resolved_time_offset_ms``
|
||||
is fixed at 0 and ``auto_sync_result`` / ``aligned_window`` are
|
||||
``None``.
|
||||
"""
|
||||
from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock
|
||||
from gps_denied_onboard.clock.wall_clock import WallClock
|
||||
|
||||
csv_path = Path(config.replay.imu_csv_path)
|
||||
if not csv_path.is_file():
|
||||
raise CompositionError(
|
||||
f"config.replay.imu_csv_path points at a missing file: {csv_path}"
|
||||
)
|
||||
|
||||
clock = TlogDerivedClock(source=iter([])) if pace is ReplayPace.ASAP else WallClock()
|
||||
frame_source = VideoFileFrameSource(
|
||||
path=Path(config.replay.video_path),
|
||||
camera_calibration_id=camera_calibration.camera_id,
|
||||
clock=clock,
|
||||
)
|
||||
fc_adapter = CsvReplayFcAdapter(
|
||||
csv_path=csv_path,
|
||||
target_fc_dialect=target_fc_dialect,
|
||||
clock=clock,
|
||||
fdr_client=fdr_client,
|
||||
pace=pace,
|
||||
mavlink_transport=mavlink_transport,
|
||||
)
|
||||
fc_adapter.open()
|
||||
return ReplayInputBundle(
|
||||
frame_source=frame_source,
|
||||
fc_adapter=fc_adapter,
|
||||
clock=clock,
|
||||
resolved_time_offset_ms=0,
|
||||
auto_sync_result=None,
|
||||
aligned_window=None,
|
||||
)
|
||||
|
||||
|
||||
def _resolve_pace(raw: str) -> ReplayPace:
|
||||
if raw == "asap":
|
||||
return ReplayPace.ASAP
|
||||
|
||||
@@ -69,6 +69,7 @@ class DerkachiReplayInputs:
|
||||
|
||||
video_path: Path
|
||||
tlog_path: Path
|
||||
imu_csv_path: Path
|
||||
calibration_path: Path
|
||||
config_path: Path
|
||||
signing_key_path: Path
|
||||
@@ -170,6 +171,7 @@ def derkachi_replay_inputs(tmp_path_factory: pytest.TempPathFactory) -> Derkachi
|
||||
return DerkachiReplayInputs(
|
||||
video_path=video_path,
|
||||
tlog_path=tlog_path,
|
||||
imu_csv_path=csv_path,
|
||||
calibration_path=_calibration_path(),
|
||||
config_path=config_path,
|
||||
signing_key_path=signing_key_path,
|
||||
@@ -241,8 +243,8 @@ def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any:
|
||||
binary,
|
||||
"--video",
|
||||
str(derkachi_replay_inputs.video_path),
|
||||
"--tlog",
|
||||
str(derkachi_replay_inputs.tlog_path),
|
||||
"--imu",
|
||||
str(derkachi_replay_inputs.imu_csv_path),
|
||||
"--output",
|
||||
str(out_path),
|
||||
"--camera-calibration",
|
||||
@@ -254,6 +256,12 @@ def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any:
|
||||
"--pace",
|
||||
pace,
|
||||
]
|
||||
# --tlog is deprecated under AZ-894 but we still forward it
|
||||
# when the synth tlog exists, so the legacy-path e2e tests
|
||||
# (test_derkachi_real_tlog.py) keep exercising the deprecation
|
||||
# warning until AZ-895 deletes the path entirely.
|
||||
if derkachi_replay_inputs.tlog_path.is_file():
|
||||
argv.extend(["--tlog", str(derkachi_replay_inputs.tlog_path)])
|
||||
if time_offset_ms is not None:
|
||||
argv.extend(["--time-offset-ms", str(time_offset_ms)])
|
||||
if skip_auto_sync:
|
||||
|
||||
@@ -0,0 +1,249 @@
|
||||
"""AZ-894 — ``CsvReplayFcAdapter`` unit tests.
|
||||
|
||||
Focused contract coverage for the thin :class:`FcAdapter` sibling that
|
||||
backs the CSV-driven replay input. The functional inbound/outbound
|
||||
plumbing the runtime loop relies on (CSV parsing, frame-stamped IMU
|
||||
draining, ESKF cold-start origin) is exercised in
|
||||
``tests/unit/replay_input/test_csv_ground_truth.py`` and the AZ-404
|
||||
e2e harness; here we pin the Protocol surface (open/close idempotency,
|
||||
build-flag refusal, source-set refusal, transport-less emit refusal)
|
||||
so a refactor of the adapter cannot silently regress Invariant 5 or
|
||||
the FcAdapter Protocol parity that the composition root depends on.
|
||||
|
||||
Style: every test follows the Arrange / Act / Assert pattern.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from gps_denied_onboard._types.fc import FcKind, FlightState, Severity
|
||||
from gps_denied_onboard.components.c8_fc_adapter.csv_replay_adapter import (
|
||||
CsvReplayFcAdapter,
|
||||
)
|
||||
from gps_denied_onboard.components.c8_fc_adapter.errors import (
|
||||
FcAdapterConfigError,
|
||||
FcEmitError,
|
||||
FcOpenError,
|
||||
SourceSetSwitchNotSupportedError,
|
||||
)
|
||||
from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import (
|
||||
ReplayPace,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _build_flag_on(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.setenv("BUILD_CSV_REPLAY_ADAPTER", "ON")
|
||||
|
||||
|
||||
class _FakeClock:
|
||||
"""Minimal Clock stub returning a monotonic counter in ns."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._t = 0
|
||||
|
||||
def monotonic_ns(self) -> int:
|
||||
self._t += 1
|
||||
return self._t
|
||||
|
||||
def sleep_until_ns(self, _: int) -> None: # pragma: no cover — unused.
|
||||
return None
|
||||
|
||||
|
||||
class _FakeFdr:
|
||||
"""No-op FDR client stand-in (CsvReplayFcAdapter does not emit FDR yet)."""
|
||||
|
||||
def enqueue(self, _record: object) -> None: # pragma: no cover — unused.
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def csv_file(tmp_path: Path) -> Path:
|
||||
# Existence is the only check the adapter does at open() time —
|
||||
# the body never has to be parseable here (parsing lives in
|
||||
# csv_ground_truth.load_csv_ground_truth, tested separately).
|
||||
path = tmp_path / "data_imu.csv"
|
||||
path.write_text("placeholder", encoding="utf-8")
|
||||
return path
|
||||
|
||||
|
||||
def _make_adapter(csv_path: Path) -> CsvReplayFcAdapter:
|
||||
return CsvReplayFcAdapter(
|
||||
csv_path=csv_path,
|
||||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||||
clock=_FakeClock(),
|
||||
fdr_client=_FakeFdr(),
|
||||
pace=ReplayPace.ASAP,
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Build flag
|
||||
|
||||
|
||||
def test_construction_refused_when_build_flag_off(
|
||||
monkeypatch: pytest.MonkeyPatch, csv_file: Path
|
||||
) -> None:
|
||||
# Arrange
|
||||
monkeypatch.setenv("BUILD_CSV_REPLAY_ADAPTER", "OFF")
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(FcAdapterConfigError, match="BUILD_CSV_REPLAY_ADAPTER"):
|
||||
_make_adapter(csv_file)
|
||||
|
||||
|
||||
def test_construction_rejects_non_path(csv_file: Path) -> None:
|
||||
# Arrange — argument intentionally a str rather than Path.
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(FcAdapterConfigError, match="csv_path must be a pathlib.Path"):
|
||||
CsvReplayFcAdapter(
|
||||
csv_path=str(csv_file), # type: ignore[arg-type]
|
||||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||||
clock=_FakeClock(),
|
||||
fdr_client=_FakeFdr(),
|
||||
pace=ReplayPace.ASAP,
|
||||
)
|
||||
|
||||
|
||||
def test_construction_rejects_unknown_dialect(csv_file: Path) -> None:
|
||||
# Act + Assert
|
||||
with pytest.raises(FcAdapterConfigError, match="target_fc_dialect"):
|
||||
CsvReplayFcAdapter(
|
||||
csv_path=csv_file,
|
||||
target_fc_dialect=FcKind.GCS_QGC,
|
||||
clock=_FakeClock(),
|
||||
fdr_client=_FakeFdr(),
|
||||
pace=ReplayPace.ASAP,
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# open / close
|
||||
|
||||
|
||||
def test_open_refused_when_csv_missing(tmp_path: Path) -> None:
|
||||
# Arrange
|
||||
adapter = _make_adapter(tmp_path / "absent.csv")
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(FcOpenError, match="CSV file not found"):
|
||||
adapter.open()
|
||||
|
||||
|
||||
def test_double_open_raises(csv_file: Path) -> None:
|
||||
# Arrange
|
||||
adapter = _make_adapter(csv_file)
|
||||
adapter.open()
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(FcOpenError, match="already opened"):
|
||||
adapter.open()
|
||||
|
||||
|
||||
def test_close_is_idempotent_before_open(csv_file: Path) -> None:
|
||||
# Arrange
|
||||
adapter = _make_adapter(csv_file)
|
||||
|
||||
# Act — close() before open() is a documented no-op (parity with tlog).
|
||||
adapter.close()
|
||||
adapter.close()
|
||||
|
||||
# Assert — no exception raised; state remains closeable.
|
||||
assert True
|
||||
|
||||
|
||||
def test_close_is_idempotent_after_open(csv_file: Path) -> None:
|
||||
# Arrange
|
||||
adapter = _make_adapter(csv_file)
|
||||
adapter.open()
|
||||
|
||||
# Act
|
||||
adapter.close()
|
||||
adapter.close()
|
||||
|
||||
# Assert
|
||||
assert True
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Protocol parity
|
||||
|
||||
|
||||
def test_subscribe_returns_real_subscription_handle(csv_file: Path) -> None:
|
||||
# Arrange
|
||||
adapter = _make_adapter(csv_file)
|
||||
adapter.open()
|
||||
|
||||
# Act
|
||||
subscription = adapter.subscribe_telemetry(lambda _frame: None)
|
||||
|
||||
# Assert — handle exposes the cancel() entry point even though the
|
||||
# bus is intentionally never fed (replay loop reads CSV directly).
|
||||
assert hasattr(subscription, "cancel")
|
||||
subscription.cancel()
|
||||
|
||||
|
||||
def test_source_set_switch_unsupported(csv_file: Path) -> None:
|
||||
# Arrange
|
||||
adapter = _make_adapter(csv_file)
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(SourceSetSwitchNotSupportedError):
|
||||
adapter.request_source_set_switch()
|
||||
|
||||
|
||||
def test_current_flight_state_returns_init_signal(csv_file: Path) -> None:
|
||||
# Arrange — CSV carries no MAVLink HEARTBEAT, so the adapter has
|
||||
# nothing to latch; the contract is to return an INIT-state signal.
|
||||
adapter = _make_adapter(csv_file)
|
||||
|
||||
# Act
|
||||
signal = adapter.current_flight_state()
|
||||
|
||||
# Assert
|
||||
assert signal.state is FlightState.INIT
|
||||
assert signal.last_valid_gps_hint_wgs84 is None
|
||||
assert signal.last_valid_gps_age_ms is None
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Outbound (Invariant 5)
|
||||
|
||||
|
||||
def test_emit_external_position_raises_without_transport(csv_file: Path) -> None:
|
||||
# Arrange — no MavlinkTransport injected → adapter falls back to the
|
||||
# AZ-399 raise-on-emit contract, mirroring TlogReplayFcAdapter.
|
||||
adapter = _make_adapter(csv_file)
|
||||
adapter.open()
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(FcEmitError, match="does not emit"):
|
||||
adapter.emit_external_position(_dummy_estimator_output())
|
||||
|
||||
|
||||
def test_emit_status_text_raises_without_transport(csv_file: Path) -> None:
|
||||
# Arrange
|
||||
adapter = _make_adapter(csv_file)
|
||||
adapter.open()
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(FcEmitError, match="does not emit"):
|
||||
adapter.emit_status_text("hello", severity=Severity.INFO)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Helpers
|
||||
|
||||
|
||||
def _dummy_estimator_output() -> object:
|
||||
# The transport-less emit path short-circuits with FcEmitError before
|
||||
# reading any field, so a duck-typed stand-in is enough — duplicating
|
||||
# the full EstimatorOutput (UUID frame_id, 6x6 covariance, etc.)
|
||||
# would only hide the actual contract being tested.
|
||||
from types import SimpleNamespace
|
||||
|
||||
return SimpleNamespace()
|
||||
@@ -0,0 +1,287 @@
|
||||
"""AZ-894 — CSV-driven IMU + GPS ground-truth extractor.
|
||||
|
||||
Covers AC-1 (parses 4,899 IMU + 4,899 GPS samples on a single monotonic
|
||||
clock) and AC-5 (clear ``ReplayInputAdapterError`` at startup for schema
|
||||
faults) of ``_docs/02_tasks/todo/AZ-894_csv_driven_replay_adapter.md``.
|
||||
|
||||
The happy-path test is gated on the committed Derkachi fixture
|
||||
(``_docs/00_problem/input_data/flight_derkachi/data_imu.csv``, 4,899
|
||||
rows + header). Schema-fault tests use synthetic CSV strings written
|
||||
to ``tmp_path`` so they remain deterministic and do not depend on the
|
||||
fixture being present.
|
||||
|
||||
Style: every test follows the Arrange / Act / Assert pattern.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from gps_denied_onboard.replay_input.csv_ground_truth import (
|
||||
CSV_SOURCE_LABEL,
|
||||
REQUIRED_COLUMNS,
|
||||
load_csv_ground_truth,
|
||||
)
|
||||
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
|
||||
|
||||
|
||||
_DERKACHI_CSV: Path = (
|
||||
Path(__file__).resolve().parents[3]
|
||||
/ "_docs"
|
||||
/ "00_problem"
|
||||
/ "input_data"
|
||||
/ "flight_derkachi"
|
||||
/ "data_imu.csv"
|
||||
)
|
||||
|
||||
|
||||
_EXAMPLE_CSV: Path = (
|
||||
Path(__file__).resolve().parents[3]
|
||||
/ "_docs"
|
||||
/ "02_document"
|
||||
/ "contracts"
|
||||
/ "replay"
|
||||
/ "example_data_imu.csv"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# Header + minimal-row helpers
|
||||
|
||||
|
||||
def _write_csv(path: Path, header: str, rows: list[str]) -> Path:
|
||||
path.write_text(header + "\n" + "\n".join(rows) + "\n", encoding="utf-8")
|
||||
return path
|
||||
|
||||
|
||||
def _full_header() -> str:
|
||||
return ",".join(REQUIRED_COLUMNS)
|
||||
|
||||
|
||||
def _row(time_s: float, *, prefix_ms: float = 0.0) -> str:
|
||||
# 15 fields total matching REQUIRED_COLUMNS ordering. Values are
|
||||
# picked to be valid floats; the exact magnitudes do not matter
|
||||
# for these tests (the loader only validates parseability + range).
|
||||
fields = [
|
||||
str(prefix_ms),
|
||||
str(time_s),
|
||||
"10",
|
||||
"-3",
|
||||
"-980",
|
||||
"50",
|
||||
"30",
|
||||
"-5",
|
||||
"50.0809634",
|
||||
"36.1115442",
|
||||
"141290",
|
||||
"-4",
|
||||
"-6",
|
||||
"-88",
|
||||
"35041",
|
||||
]
|
||||
return ",".join(fields)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-1: happy path on the real Derkachi CSV
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not _DERKACHI_CSV.is_file(),
|
||||
reason="Derkachi fixture data_imu.csv not present",
|
||||
)
|
||||
def test_ac1_loads_derkachi_csv_emits_paired_samples() -> None:
|
||||
# Arrange — committed fixture path; nothing to set up.
|
||||
# Note: AZ-894 spec mentions "4,899 samples"; the actual fixture
|
||||
# spans Time=0.0..489.9 s in 0.1 s steps → 4,900 rows. We pin the
|
||||
# concrete count so the test catches truncation, plus the
|
||||
# span-derived invariant so future fixtures with a different
|
||||
# length still pass for the right reason.
|
||||
expected_count = 4900
|
||||
|
||||
# Act
|
||||
gt = load_csv_ground_truth(_DERKACHI_CSV)
|
||||
|
||||
# Assert
|
||||
assert gt.source == CSV_SOURCE_LABEL
|
||||
assert len(gt.records) == expected_count
|
||||
assert len(gt.imu_samples) == expected_count
|
||||
# First row of the fixture has Time=0; last is 489.9 s (10 Hz).
|
||||
assert gt.records[0].ts_ns == 0
|
||||
assert gt.records[-1].ts_ns == int(489.9 * 1e9)
|
||||
# IMU samples share the same canonical clock as the GPS records.
|
||||
for gps, imu in zip(gt.records, gt.imu_samples, strict=True):
|
||||
assert gps.ts_ns == imu.ts_ns
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AZ-896 AC-3: the shipped example CSV stays parser-clean
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not _EXAMPLE_CSV.is_file(),
|
||||
reason="AZ-896 example_data_imu.csv not present",
|
||||
)
|
||||
def test_az896_example_csv_loads_clean() -> None:
|
||||
# Arrange — committed AZ-896 example; nothing to set up.
|
||||
|
||||
# Act
|
||||
gt = load_csv_ground_truth(_EXAMPLE_CSV)
|
||||
|
||||
# Assert
|
||||
assert gt.source == CSV_SOURCE_LABEL
|
||||
assert len(gt.records) >= 10
|
||||
assert len(gt.records) == len(gt.imu_samples)
|
||||
assert gt.records[0].ts_ns == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-1 (small fixture): paired-sample invariants
|
||||
|
||||
|
||||
def test_paired_imu_and_gps_share_clock(tmp_path: Path) -> None:
|
||||
# Arrange
|
||||
csv = _write_csv(
|
||||
tmp_path / "ok.csv",
|
||||
_full_header(),
|
||||
[
|
||||
_row(0.0, prefix_ms=4551116.348),
|
||||
_row(0.1, prefix_ms=4551216.348),
|
||||
_row(0.2, prefix_ms=4551316.348),
|
||||
],
|
||||
)
|
||||
|
||||
# Act
|
||||
gt = load_csv_ground_truth(csv)
|
||||
|
||||
# Assert
|
||||
assert len(gt.records) == 3 and len(gt.imu_samples) == 3
|
||||
expected_ns = [0, 100_000_000, 200_000_000]
|
||||
assert [r.ts_ns for r in gt.records] == expected_ns
|
||||
assert [s.ts_ns for s in gt.imu_samples] == expected_ns
|
||||
|
||||
|
||||
def test_gps_unit_conversion(tmp_path: Path) -> None:
|
||||
# Arrange — values exercise the deg/mm/cm-s/cdeg conversions.
|
||||
header = _full_header()
|
||||
row = ",".join([
|
||||
"0.0", "0.0",
|
||||
"10", "-3", "-980", "50", "30", "-5", # IMU stays raw
|
||||
"50.0809634", # lat already in degrees
|
||||
"36.1115442", # lon already in degrees
|
||||
"141290", # alt in mm → 141.290 m
|
||||
"-400", # vx in cm/s → -4.0 m/s
|
||||
"600", # vy in cm/s → 6.0 m/s
|
||||
"-88", # vz in cm/s → -0.88 m/s
|
||||
"35041", # hdg in cdeg → 350.41 deg
|
||||
])
|
||||
csv = _write_csv(tmp_path / "units.csv", header, [row])
|
||||
|
||||
# Act
|
||||
gt = load_csv_ground_truth(csv)
|
||||
|
||||
# Assert
|
||||
fix = gt.records[0]
|
||||
assert fix.lat_deg == pytest.approx(50.0809634)
|
||||
assert fix.lon_deg == pytest.approx(36.1115442)
|
||||
assert fix.alt_m == pytest.approx(141.290)
|
||||
assert fix.vx_m_s == pytest.approx(-4.0)
|
||||
assert fix.vy_m_s == pytest.approx(6.0)
|
||||
assert fix.vz_m_s == pytest.approx(-0.88)
|
||||
assert fix.hdg_deg == pytest.approx(350.41)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# AC-5: schema faults raise ReplayInputAdapterError at startup
|
||||
|
||||
|
||||
def test_ac5_file_not_found_raises(tmp_path: Path) -> None:
|
||||
# Arrange
|
||||
missing = tmp_path / "absent.csv"
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(ReplayInputAdapterError, match="CSV file not found"):
|
||||
load_csv_ground_truth(missing)
|
||||
|
||||
|
||||
def test_ac5_missing_required_column_raises(tmp_path: Path) -> None:
|
||||
# Arrange — drop one required column from the header.
|
||||
bad_header = ",".join(c for c in REQUIRED_COLUMNS if c != "SCALED_IMU2.xacc")
|
||||
csv = _write_csv(
|
||||
tmp_path / "missing_col.csv",
|
||||
bad_header,
|
||||
["0,0,-3,-980,50,30,-5,50.0,36.0,141290,-4,-6,-88,35041"],
|
||||
)
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(ReplayInputAdapterError, match="missing required columns"):
|
||||
load_csv_ground_truth(csv)
|
||||
|
||||
|
||||
def test_ac5_nan_in_time_raises(tmp_path: Path) -> None:
|
||||
# Arrange
|
||||
csv = _write_csv(
|
||||
tmp_path / "nan_time.csv",
|
||||
_full_header(),
|
||||
[_row(0.0), _row(float("nan"))],
|
||||
)
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(ReplayInputAdapterError, match="Time=.*is NaN/Inf"):
|
||||
load_csv_ground_truth(csv)
|
||||
|
||||
|
||||
def test_ac5_non_monotonic_time_raises(tmp_path: Path) -> None:
|
||||
# Arrange
|
||||
csv = _write_csv(
|
||||
tmp_path / "non_monotonic.csv",
|
||||
_full_header(),
|
||||
[_row(0.1), _row(0.0)],
|
||||
)
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(ReplayInputAdapterError, match="non-monotonic Time"):
|
||||
load_csv_ground_truth(csv)
|
||||
|
||||
|
||||
def test_ac5_repeated_time_also_non_monotonic(tmp_path: Path) -> None:
|
||||
# Arrange — equal timestamps still violate strict monotonicity so
|
||||
# the preintegrator never gets fed a zero-delta window.
|
||||
csv = _write_csv(
|
||||
tmp_path / "repeated.csv",
|
||||
_full_header(),
|
||||
[_row(0.0), _row(0.0)],
|
||||
)
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(ReplayInputAdapterError, match="non-monotonic Time"):
|
||||
load_csv_ground_truth(csv)
|
||||
|
||||
|
||||
def test_ac5_non_numeric_imu_value_raises(tmp_path: Path) -> None:
|
||||
# Arrange — substitute a non-parseable token in the IMU column.
|
||||
row = ",".join([
|
||||
"0.0", "0.0",
|
||||
"not-a-number", # SCALED_IMU2.xacc
|
||||
"-3", "-980", "50", "30", "-5",
|
||||
"50.0", "36.0", "141290", "-4", "-6", "-88", "35041",
|
||||
])
|
||||
csv = _write_csv(tmp_path / "bad_imu.csv", _full_header(), [row])
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(
|
||||
ReplayInputAdapterError,
|
||||
match=r"SCALED_IMU2\.xacc=.*is not a number",
|
||||
):
|
||||
load_csv_ground_truth(csv)
|
||||
|
||||
|
||||
def test_ac5_header_only_raises(tmp_path: Path) -> None:
|
||||
# Arrange — header but no data rows.
|
||||
csv = tmp_path / "header_only.csv"
|
||||
csv.write_text(_full_header() + "\n", encoding="utf-8")
|
||||
|
||||
# Act + Assert
|
||||
with pytest.raises(ReplayInputAdapterError, match="no data rows"):
|
||||
load_csv_ground_truth(csv)
|
||||
@@ -491,12 +491,14 @@ def test_ac8_replay_branch_imports_only_public_apis() -> None:
|
||||
tree = ast.parse(text)
|
||||
|
||||
# Allowed deep imports: into the c8_fc_adapter component (the
|
||||
# noop transport + the JSONL sink) and into the `replay_input`
|
||||
# cross-cutting coordinator (Layer-4). Both are documented in
|
||||
# module-layout.md as the replay strategy homes.
|
||||
# noop transport + the JSONL sink + the AZ-894 CSV replay adapter)
|
||||
# and into the `replay_input` cross-cutting coordinator (Layer-4).
|
||||
# All of these are documented in module-layout.md as the replay
|
||||
# strategy homes.
|
||||
allowed_deep_prefixes = (
|
||||
"gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport",
|
||||
"gps_denied_onboard.components.c8_fc_adapter.replay_sink",
|
||||
"gps_denied_onboard.components.c8_fc_adapter.csv_replay_adapter",
|
||||
"gps_denied_onboard.replay_input.tlog_video_adapter",
|
||||
)
|
||||
|
||||
@@ -632,9 +634,14 @@ def test_replay_branch_rejects_empty_video_path(
|
||||
build_replay_components(config)
|
||||
|
||||
|
||||
def test_replay_branch_rejects_empty_tlog_path(
|
||||
def test_replay_branch_rejects_both_inputs_empty(
|
||||
_airborne_replay_env: Path,
|
||||
) -> None:
|
||||
# AZ-894: the validation gate now accepts either imu_csv_path
|
||||
# (canonical) or tlog_path (legacy) — rejecting only when both
|
||||
# are empty. Keeping the historical name pattern (test_*_rejects_*)
|
||||
# for grep parity but renamed to reflect the new semantics.
|
||||
|
||||
# Arrange
|
||||
runtime_cfg = RuntimeConfig(camera_calibration_path=str(_airborne_replay_env))
|
||||
config = Config(
|
||||
@@ -642,6 +649,7 @@ def test_replay_branch_rejects_empty_tlog_path(
|
||||
replay=ReplayConfig(
|
||||
video_path="/dev/null/fake.mp4",
|
||||
tlog_path="",
|
||||
imu_csv_path="",
|
||||
output_path="/tmp/out.jsonl",
|
||||
pace="asap",
|
||||
target_fc_dialect="ardupilot_plane",
|
||||
@@ -650,7 +658,7 @@ def test_replay_branch_rejects_empty_tlog_path(
|
||||
)
|
||||
|
||||
# Act / Assert
|
||||
with pytest.raises(CompositionError, match="tlog_path is empty"):
|
||||
with pytest.raises(CompositionError, match="imu_csv_path is empty"):
|
||||
build_replay_components(config)
|
||||
|
||||
|
||||
|
||||
@@ -57,6 +57,10 @@ def _required_files(tmp_path: Path, _calib_payload: dict[str, Any]) -> dict[str,
|
||||
video.write_bytes(b"\x00\x00\x00\x18ftypmp42") # placeholder
|
||||
tlog = tmp_path / "flight.tlog"
|
||||
tlog.write_bytes(b"\x00")
|
||||
imu_csv = tmp_path / "data_imu.csv"
|
||||
# Minimal placeholder — the CLI only validates existence, parsing
|
||||
# happens later inside the runtime loop.
|
||||
imu_csv.write_text("placeholder", encoding="utf-8")
|
||||
output = tmp_path / "out.jsonl"
|
||||
calib = tmp_path / "calib.json"
|
||||
calib.write_text(json.dumps(_calib_payload))
|
||||
@@ -67,6 +71,7 @@ def _required_files(tmp_path: Path, _calib_payload: dict[str, Any]) -> dict[str,
|
||||
return {
|
||||
"video": video,
|
||||
"tlog": tlog,
|
||||
"imu": imu_csv,
|
||||
"output": output,
|
||||
"camera_calibration": calib,
|
||||
"config": config_yaml,
|
||||
@@ -95,6 +100,7 @@ def _argv(files: dict[str, Path], **overrides: Any) -> list[str]:
|
||||
"""Build a CLI argv from the required-files fixture + overrides."""
|
||||
base = {
|
||||
"--video": str(files["video"]),
|
||||
"--imu": str(files["imu"]),
|
||||
"--tlog": str(files["tlog"]),
|
||||
"--output": str(files["output"]),
|
||||
"--camera-calibration": str(files["camera_calibration"]),
|
||||
@@ -477,6 +483,7 @@ def test_ac10_console_script_runs_help() -> None:
|
||||
# Required-arg surface check
|
||||
for arg in (
|
||||
"--video",
|
||||
"--imu",
|
||||
"--tlog",
|
||||
"--output",
|
||||
"--camera-calibration",
|
||||
|
||||
Reference in New Issue
Block a user