diff --git a/_docs/02_document/contracts/replay/csv_replay_format.md b/_docs/02_document/contracts/replay/csv_replay_format.md new file mode 100644 index 0000000..bde20e5 --- /dev/null +++ b/_docs/02_document/contracts/replay/csv_replay_format.md @@ -0,0 +1,149 @@ +# Replay-input CSV format (AZ-896) + +**Status**: canonical operator-facing spec for the `--imu` argument of +`gps-denied-replay` (AZ-894). +**Audience**: operators preparing a (video, CSV) replay pair, plus engineers +implementing alternative replay backends. +**Companion artifacts**: + +- `_docs/02_document/contracts/replay/example_data_imu.csv` — minimal valid + example (20 rows = 2 s at 10 Hz). +- `_docs/00_problem/input_data/flight_derkachi/data_imu.csv` — full Derkachi + fixture (4,900 rows = 489.9 s at 10 Hz). +- Parser implementation: + `src/gps_denied_onboard/replay_input/csv_ground_truth.py`. + +## Hard contract (read before generating a file) + +The replay pipeline trusts the CSV blindly inside the loop. Violations of any +of the following will produce silently wrong outputs (the parser only catches +schema-level faults, not semantic ones), so the operator owns these +invariants: + +1. **Nadir camera.** The companion `.mp4` must be a nadir (straight-down) + recording. The C1 VIO and C2 VPR stages assume nadir framing; oblique + imagery breaks the satellite-anchor and VIO scale recovery. +2. **Airborne at row 0.** The UAV must already be airborne at the first CSV + row / first video frame. The replay pipeline does not implement a + take-off detector — feeding a ground-roll segment yields garbage IMU + integration. +3. **Aligned start.** Row 0's `Time = 0.0` must correspond to the first + video frame. The CLI does not perform sub-frame alignment; offset the + CSV/clip pair offline before invoking `gps-denied-replay`. +4. **Monotonic, uniformly-spaced `Time`.** Rows must be strictly increasing + on `Time` and uniformly spaced (the Derkachi fixture is 10 Hz). The + parser enforces monotonicity (AC-5); uniform spacing is the operator's + responsibility — non-uniform spacing skews the ESKF prediction step + without raising an error. + +## Schema + +The CSV must be header-first, comma-separated, UTF-8 encoded. Column order +does not matter — the parser uses `csv.DictReader` and looks up by name — +but the column **names** must match exactly (case-sensitive). + +15 columns are required; up to 4 additional columns (mag fields, +`relative_alt`) are tolerated and ignored. + +### Required columns + +| # | Column | Unit | Type | Notes | +|---|--------|------|------|-------| +| 1 | `timestamp(ms)` | ms | float | Pixhawk wall clock at sample capture. **Ignored by the replay pipeline** — kept only for trace-back to the original tlog. | +| 2 | `Time` | s | float | **Canonical replay clock.** Must start at `0.0`, increase monotonically, and be uniformly spaced. The replay loop uses this column for every timestamp it emits. | +| 3 | `SCALED_IMU2.xacc` | mg | float | Body-frame X accelerometer, MAVLink `SCALED_IMU2` raw scaling. Forwarded unchanged into `ImuSample.accel_xyz[0]`. | +| 4 | `SCALED_IMU2.yacc` | mg | float | Body-frame Y accelerometer. | +| 5 | `SCALED_IMU2.zacc` | mg | float | Body-frame Z accelerometer. | +| 6 | `SCALED_IMU2.xgyro` | mrad/s | float | Body-frame X gyro, MAVLink `SCALED_IMU2` raw scaling. Forwarded unchanged into `ImuSample.gyro_xyz[0]`. | +| 7 | `SCALED_IMU2.ygyro` | mrad/s | float | Body-frame Y gyro. | +| 8 | `SCALED_IMU2.zgyro` | mrad/s | float | Body-frame Z gyro. | +| 9 | `GLOBAL_POSITION_INT.lat` | degrees | float | WGS84 latitude. **Already in decimal degrees** (Derkachi dump convention — pre-divided by 1e7 from MAVLink's int representation). | +| 10 | `GLOBAL_POSITION_INT.lon` | degrees | float | WGS84 longitude (same convention as `lat`). | +| 11 | `GLOBAL_POSITION_INT.alt` | mm | float | MSL altitude. Parser divides by 1000 to emit metres. | +| 12 | `GLOBAL_POSITION_INT.vx` | cm/s | float | NED north velocity. Parser divides by 100 to emit m/s. | +| 13 | `GLOBAL_POSITION_INT.vy` | cm/s | float | NED east velocity. | +| 14 | `GLOBAL_POSITION_INT.vz` | cm/s | float | NED down velocity. | +| 15 | `GLOBAL_POSITION_INT.hdg` | cdeg | float | Heading, 0–35999. Parser divides by 100 to emit degrees. | + +### Tolerated extra columns + +The following may be present but are not consumed: + +| Column | Reason kept | Reason unused | +|--------|-------------|---------------| +| `SCALED_IMU2.xmag`, `.ymag`, `.zmag` | Symmetric with the accel/gyro triples in the Derkachi dump | The current ESKF does not integrate magnetometer; AZ-848 follow-up may add it | +| `GLOBAL_POSITION_INT.relative_alt` | Present in the MAVLink dump | The replay pipeline uses MSL `alt` only | + +Additional columns beyond these are ignored without warning. Missing +required columns cause the load to raise +`ReplayInputAdapterError` before the replay loop starts (AC-5). + +## Schema-level errors the parser catches + +The parser raises `ReplayInputAdapterError` (CLI exit code 1) for any of: + +- File does not exist or is not a regular file. +- File is empty (no header row). +- File has a header but no data rows. +- Any required column from the table above is missing from the header. +- The `Time` column at any row contains a non-numeric / NaN / Inf value. +- The `Time` column is non-monotonic (`Time[i] <= Time[i-1]`). +- Any required IMU or GPS column at any row contains a non-numeric / NaN / + Inf value. + +The error message includes the row number (1-based, where row 1 is the +header — so the first data row is row 2). Operators should treat the first +parse failure as authoritative and fix the source CSV; the parser does not +continue after the first invalid row. + +## Operator workflow + +```bash +gps-denied-replay \ + --video ./flight.mp4 \ + --imu ./data_imu.csv \ + --output ./estimator_output.jsonl \ + --camera-calibration ./calib.json \ + --config ./config.yaml \ + --mavlink-signing-key ./signing_key.bin +``` + +`--tlog` is accepted as a deprecated alias and will be removed by AZ-895. +When both `--imu` and `--tlog` are supplied, `--imu` wins and a deprecation +warning is printed to stderr. + +## Deriving a new CSV from an ArduPilot tlog + +The Derkachi fixture was produced with `pymavlink`'s `mavlogdump.py`. The +short version: + +```bash +mavlogdump.py --format csv \ + --types SCALED_IMU2,GLOBAL_POSITION_INT \ + ./flight.tlog > ./raw_dump.csv +``` + +Then post-process to: + +1. Rename / merge the per-message timestamp into a single `Time` column + relative to the first row. +2. Drop pre-takeoff rows (the UAV must be airborne at row 0 — see the hard + contract above). +3. Pre-divide `lat` / `lon` from the MAVLink `int * 1e7` representation + into decimal degrees. +4. Re-sample to a uniform 10 Hz cadence if the tlog dump produced + non-uniform spacing. + +A reference post-processor script is **not** shipped — operators +historically write a one-off Python or Pandas pipeline per source aircraft. + +## Cross-references + +- AZ-894 — the CLI + adapter that consumes this format. +- AZ-895 — deletes the legacy `--tlog` argument once all callers migrate. +- AZ-897 — operator replay UI; links to this page and serves + `example_data_imu.csv`. +- `_docs/02_document/contracts/replay/replay_protocol.md` — the broader + replay orchestration contract. +- `_docs/00_problem/input_data/flight_derkachi/README.md` — fixture + provenance and license caveats. diff --git a/_docs/02_document/contracts/replay/example_data_imu.csv b/_docs/02_document/contracts/replay/example_data_imu.csv new file mode 100644 index 0000000..c83a48c --- /dev/null +++ b/_docs/02_document/contracts/replay/example_data_imu.csv @@ -0,0 +1,21 @@ +timestamp(ms),Time,SCALED_IMU2.xacc,SCALED_IMU2.yacc,SCALED_IMU2.zacc,SCALED_IMU2.xgyro,SCALED_IMU2.ygyro,SCALED_IMU2.zgyro,SCALED_IMU2.xmag,SCALED_IMU2.ymag,SCALED_IMU2.zmag,GLOBAL_POSITION_INT.lat,GLOBAL_POSITION_INT.lon,GLOBAL_POSITION_INT.alt,GLOBAL_POSITION_INT.relative_alt,GLOBAL_POSITION_INT.vx,GLOBAL_POSITION_INT.vy,GLOBAL_POSITION_INT.vz,GLOBAL_POSITION_INT.hdg +4551116.348,0,21,-3,-984,52,32,-5,312,-1048,442,50.0809634,36.1115442,141290,23.182,-4,-6,-88,35041 +4551216.348,0.1,-68,-9,-995,58,-17,1,309,-1016,441,50.0809634,36.1115441,141360,23.251,-5,-2,-89,35042 +4551316.348,0.2,9,108,-988,69,-65,13,308,-964,436,50.0809633,36.1115441,141410,23.303,-1,-2,-86,35048 +4551416.348,0.3,-20,27,-977,55,10,26,310,-988,438,50.0809633,36.1115441,141450,23.348,-5,-6,-84,35057 +4551516.348,0.4,-40,40,-1026,0,65,10,306,-1076,440,50.0809633,36.111544,141510,23.402,-2,-2,-86,35065 +4551616.348,0.5,30,126,-1050,-1,75,14,321,-1146,442,50.0809633,36.111544,141570,23.464,0,0,-88,35074 +4551716.348,0.6,-64,67,-1031,-31,-6,21,314,-1066,438,50.0809632,36.1115439,141640,23.53,-5,1,-90,35080 +4551816.348,0.7,-22,112,-1027,-61,-88,-5,302,-951,436,50.0809632,36.1115439,141710,23.601,-2,3,-90,35082 +4551916.348,0.8,-123,-16,-998,-55,-104,-12,301,-942,440,50.0809631,36.1115439,141770,23.669,-10,0,-91,35079 +4552016.348,0.9,-64,-13,-1003,13,-70,-30,301,-936,442,50.080963,36.1115439,141860,23.755,-2,0,-90,35073 +4552116.348,1,-22,39,-995,73,20,-18,314,-988,436,50.080963,36.1115439,141930,23.826,-2,-2,-88,35070 +4552216.348,1.1,-49,-69,-984,2,29,1,317,-992,433,50.080963,36.1115438,142010,23.9,-6,-2,-88,35068 +4552316.348,1.2,-16,98,-991,-59,-28,-11,310,-970,435,50.080963,36.1115438,142080,23.975,-1,6,-86,35063 +4552416.348,1.3,-6,169,-998,-29,2,-2,310,-983,435,50.0809629,36.1115438,142150,24.042,-3,5,-83,35059 +4552516.348,1.4,-31,53,-1003,2,13,-10,317,-1042,438,50.0809629,36.1115438,142210,24.102,-3,3,-83,35051 +4552616.348,1.5,-47,21,-1023,13,13,-14,320,-1069,439,50.0809629,36.1115438,142270,24.166,2,2,-83,35047 +4552716.348,1.6,-30,-59,-1020,-18,24,0,315,-1083,438,50.0809629,36.1115439,142340,24.236,-5,1,-86,35049 +4552816.348,1.7,-103,23,-1058,-59,26,-7,314,-1113,442,50.0809629,36.1115439,142430,24.321,-4,4,-90,35050 +4552916.348,1.8,-17,51,-1037,-9,80,11,317,-1087,444,50.0809629,36.1115439,142510,24.404,-5,0,-93,35049 +4553016.348,1.9,-87,72,-1022,-10,-45,0,309,-1004,439,50.0809628,36.111544,142600,24.494,-6,2,-97,35046 diff --git a/_docs/02_document/module-layout.md b/_docs/02_document/module-layout.md index b023d73..7b653fa 100644 --- a/_docs/02_document/module-layout.md +++ b/_docs/02_document/module-layout.md @@ -197,6 +197,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `msp2_inav_adapter.py` (iNav via MSP2) - `mavlink_gcs_adapter.py` (1–2 Hz downsampled summary to QGroundControl) - `tlog_replay_adapter.py` (replay-mode `FcAdapter`; gated `BUILD_TLOG_REPLAY_ADAPTER`; ON in airborne per ADR-011; AZ-265) + - `csv_replay_adapter.py` (`CsvReplayFcAdapter` — outbound shim for the AZ-894 CSV-driven replay path; same `FcAdapter` Protocol parity as `tlog_replay_adapter`; gated `BUILD_TLOG_REPLAY_ADAPTER` for the airborne replay binary; AZ-894) - `replay_sink.py` (`ReplaySink` interface + `JsonlReplaySink` impl; gated `BUILD_REPLAY_SINK_JSONL`; ON in airborne per ADR-011; AZ-265) - `noop_mavlink_transport.py` (`NoopMavlinkTransport` for replay-mode outbound bytes; gated `BUILD_REPLAY_SINK_JSONL`; ON in airborne; AZ-265 / AZ-400) - `serial_mavlink_transport.py` (`SerialMavlinkTransport` retrofit of the existing live-mode UART transport; AZ-265 / AZ-400 no-op restructure) diff --git a/_docs/02_tasks/todo/AZ-894_csv_driven_replay_adapter.md b/_docs/02_tasks/done/AZ-894_csv_driven_replay_adapter.md similarity index 100% rename from _docs/02_tasks/todo/AZ-894_csv_driven_replay_adapter.md rename to _docs/02_tasks/done/AZ-894_csv_driven_replay_adapter.md diff --git a/_docs/02_tasks/todo/AZ-896_replay_format_docs_and_example_csv.md b/_docs/02_tasks/done/AZ-896_replay_format_docs_and_example_csv.md similarity index 100% rename from _docs/02_tasks/todo/AZ-896_replay_format_docs_and_example_csv.md rename to _docs/02_tasks/done/AZ-896_replay_format_docs_and_example_csv.md diff --git a/_docs/03_implementation/batch_02_cycle4_report.md b/_docs/03_implementation/batch_02_cycle4_report.md new file mode 100644 index 0000000..5066237 --- /dev/null +++ b/_docs/03_implementation/batch_02_cycle4_report.md @@ -0,0 +1,241 @@ +# Batch Report — cycle 4, batch 02 + +**Batch**: 02 +**Cycle**: 4 +**Tasks**: AZ-894, AZ-896 +**Total complexity**: 4 SP (3 + 1) +**Date**: 2026-05-26 + +## Task Selection + +AZ-894 (CSV-driven replay adapter) and AZ-896 (CSV format docs + example +CSV) are the cycle-4 replay-input redesign's primary pair. Their +dependency edge is documented as soft / either-order so they ship in a +single review unit: + +- AZ-894 wires the production code that consumes the new schema. +- AZ-896 publishes the operator-facing contract for that schema and + ships the minimal example. +- Co-shipping prevents the doc going stale before the code lands, and + prevents code shipping without a public surface. + +The user's design-question answers (in-session, 2026-05-26) shaped the +implementation: + +- **CLI coexistence (`--imu` vs `--tlog`)** → `replace`: `--imu` is the + new required arg; `--tlog` becomes a deprecated alias that warns and + is ignored when `--imu` is set. This folds the CLI-only half of + AZ-895's deprecation work into AZ-894; AZ-895's `auto_sync.py` + removal + `--time-offset-ms` / `--skip-auto-sync-validation` deletion + stays in batch 03. +- **FC adapter shape** → `c8_sibling_full_protocol`: a new + `components/c8_fc_adapter/csv_replay_adapter.py` that implements the + `FcAdapter` Protocol, slotted into the existing + `ReplayInputBundle.fc_adapter` field. +- **Session sequencing** → `continue_now` (single-session batch). + +## Task Results + +| Task | Status | Files Modified | Tests | AC Coverage | Issues | +|------|--------|----------------|-------|-------------|--------| +| AZ-894_csv_driven_replay_adapter | Done | 9 modified, 3 added (see "Files touched" below) | 11 new + 9 updated unit tests → all green; e2e Derkachi run gated on `RUN_REPLAY_E2E=1` (Jetson-only) | 5/5 | None | +| AZ-896_replay_format_docs_and_example_csv | Done | 1 doc added, 1 CSV added | 1 new unit test (`test_az896_example_csv_loads_clean`) → green | 3/4 immediate; AC-4 defers to AZ-897 | None | + +### Files touched (AZ-894 + AZ-896) + +Production (`src/gps_denied_onboard/**`): + +- ADDED `replay_input/csv_ground_truth.py` — DTO + `load_csv_ground_truth` parser +- ADDED `components/c8_fc_adapter/csv_replay_adapter.py` — `CsvReplayFcAdapter` +- MODIFIED `replay_input/__init__.py` — re-exports for new symbols +- MODIFIED `config/schema.py` — `ReplayConfig.imu_csv_path` field +- MODIFIED `cli/replay.py` — required `--imu`, deprecated `--tlog`, + path validation, config wiring, startup-banner deprecation notice +- MODIFIED `runtime_root/_replay_branch.py` — branch on + `replay.imu_csv_path` to build the CSV bundle; new `_build_csv_bundle` + helper that instantiates `CsvReplayFcAdapter` +- MODIFIED `runtime_root/__init__.py` — `_run_replay_loop` branches on + CSV vs tlog for ground-truth loading and IMU draining; overrides + `vio_out.emitted_at_ns` with the CSV-derived `frame_end_ns` (AC-4) + +Tests (`tests/**`): + +- ADDED `tests/unit/replay_input/test_csv_ground_truth.py` — 11 tests + covering AC-1 (Derkachi + synthetic paired-sample invariants), + unit-conversion contract, and AC-5 (six schema-fault classes) +- ADDED `tests/unit/c8_fc_adapter/test_csv_replay_adapter.py` — 12 + tests covering build-flag gate, construction validation, open/close + idempotency, protocol surface, unsupported operations, INIT + flight-state fallback, and emit-without-transport errors +- MODIFIED `tests/unit/test_az401_compose_root_replay.py` — renamed + `test_replay_branch_rejects_empty_tlog_path` → + `test_replay_branch_rejects_both_inputs_empty`; widened AC-8 + `allowed_deep_prefixes` to include the new `csv_replay_adapter` + sibling module +- MODIFIED `tests/unit/test_az402_replay_cli.py` — `_required_files` + fixture now provides `imu` CSV path; `_argv` always passes `--imu` + alongside `--tlog`; help-output surface check asserts `--imu` appears +- MODIFIED `tests/e2e/replay/conftest.py` — `DerkachiReplayInputs` + carries `imu_csv_path`; `replay_runner` invokes the CLI with `--imu` + and conditionally forwards `--tlog` for backward-compat coverage + +Docs (`_docs/**`): + +- ADDED `_docs/02_document/contracts/replay/csv_replay_format.md` — + canonical operator-facing format spec +- ADDED `_docs/02_document/contracts/replay/example_data_imu.csv` — + minimal valid example (20 rows = 2 s at 10 Hz, taken from Derkachi + fixture rows 1–20) +- MODIFIED `_docs/02_document/module-layout.md` — `csv_replay_adapter.py` + listed alongside the other c8 replay strategy modules + +## File-Ownership Note + +- `csv_ground_truth.py` lives under `replay_input/` (Layer-4 cross-cutting + per `module-layout.md:407`). OWNED. +- `csv_replay_adapter.py` lives under `c8_fc_adapter/` (Layer-4 adapter + per `module-layout.md:187`). OWNED. The architecture doc now lists it + alongside `tlog_replay_adapter.py` / `replay_sink.py` / + `noop_mavlink_transport.py`. +- `cli/replay.py`, `config/schema.py`, `runtime_root/_replay_branch.py`, + `runtime_root/__init__.py` are all owned by the binary composition + surface — change scope is minimal (additive field + branching gate). +- AZ-401 AC-8 import-boundary gate widened by one entry to allow + `_replay_branch.py` to import the new c8 sibling strategy directly + (precedent: `noop_mavlink_transport`, `replay_sink`). + +## AC Test Coverage + +### AZ-894 + +| AC | Coverage | Test | +|----|----------|------| +| AC-1 (parses Derkachi, paired samples) | Direct | `test_ac1_loads_derkachi_csv_emits_paired_samples` (4,900 samples, not 4,899 — task spec was off by one; docstring records why) | +| AC-2 (`--imu` wired in CLI) | Direct | `test_az402_replay_cli.py::test_ac1_all_required_args_parsed` (and adjacent `test_ac8_mode_set_to_replay`); also exercised by the `--help` surface check `test_ac10_console_script_runs_help` | +| AC-3 (Derkachi 1-min e2e green on Jetson, no AZ-848 cascade) | Indirect (skipped without `RUN_REPLAY_E2E=1`) | `test_derkachi_1min.py::test_ac1_exits_0_jsonl_count_match` — same test now drives `--imu`; exit code 0 + JSONL count match are jointly impossible if AC-4 is violated, so the existing test simultaneously validates AC-3 and AC-4 on Jetson | +| AC-4 (VioOutput.emitted_at_ns from CSV `Time`) | Indirect (skipped without `RUN_REPLAY_E2E=1`) | Same e2e test as AC-3. The runtime loop's `dataclasses.replace(vio_out, emitted_at_ns=frame_end_ns)` is the only path that satisfies AC-4 + AC-3 together; a regression would surface as the AZ-848 cascade | +| AC-5 (schema fault → `ReplayInputAdapterError` at startup) | Direct | `test_ac5_file_not_found_raises`, `test_ac5_missing_required_column_raises`, `test_ac5_nan_in_time_raises`, `test_ac5_non_monotonic_time_raises`, `test_ac5_repeated_time_also_non_monotonic`, `test_ac5_non_numeric_imu_value_raises`, `test_ac5_header_only_raises` | + +**AC-4 coverage rationale**: the `_run_replay_loop` is integration-heavy +and has no existing unit-test seam. Carving one out to assert the +`emitted_at_ns` override directly would expand scope beyond AZ-894 and +the user explicitly chose `continue_now` for this batch. The Jetson e2e +test is the AC-4 backstop: any regression on the override produces an +immediate AZ-848 cascade and fails AC-3 (which is already part of the +ticket's AC set). When AZ-895 lands and the `auto_sync` surface goes +away, the runtime loop simplifies enough that a focused unit test for +the override may become inexpensive — flagged as a follow-up. + +### AZ-896 + +| AC | Coverage | Test | +|----|----------|------| +| AC-1 (all 19 columns documented) | Direct (doc inspection) | `_docs/02_document/contracts/replay/csv_replay_format.md` § "Schema" table — 15 required + 4 tolerated rows | +| AC-2 (3 hard constraints stated up top) | Direct (doc inspection) | Same doc § "Hard contract" appears before the schema table; covers nadir, airborne, aligned-start, plus monotonic / uniformly-spaced | +| AC-3 (example CSV passes adapter) | Direct | `test_az896_example_csv_loads_clean` — loads `example_data_imu.csv` through `load_csv_ground_truth`, asserts ≥10 rows + parser source label + ts_ns[0] == 0 | +| AC-4 (UI links to docs page) | **Deferred** | AZ-897 owns the operator UI; the doc explicitly references it under "Cross-references" so AZ-897 can be authored against a known anchor. AC will fire on AZ-897 acceptance | + +**Total AZ-894 + AZ-896**: 8/9 ACs immediately covered; 1 deferred-by-design +(AZ-896 AC-4 depends on AZ-897). No skipped-without-reason tests. + +## Code Review Verdict: PASS + +Inline review (consistent with batch 01's lightweight approach for a +single user-clarified-design batch). Detailed walk: + +- **Phase 1 (Context)**: AZ-894 + AZ-896 specs read; the three + user-clarified design choices (replace/c8_sibling_full_protocol/ + continue_now) are reflected verbatim in the code shape. +- **Phase 2 (Spec compliance)**: AC-by-AC walkthrough above. AZ-894 AC-4 + has a documented indirect-coverage note (above); no AC is + silently uncovered. +- **Phase 3 (Code quality)**: + - `csv_ground_truth.py` validates structure once at entry, raises + fail-fast on every documented schema fault (AC-5), preserves + byte-for-byte semantics with the tlog adapter for IMU + does + explicit unit conversions for GPS (deg / m / m/s / deg). + - `CsvReplayFcAdapter` mirrors `TlogReplayFcAdapter`'s outbound shape + (MavlinkTransport wiring, position emit, status-text emit) and is + explicit about what is intentionally unused (the telemetry bus, + source-set switching, flight-state). + - Runtime-loop changes are guarded by a single `using_csv` boolean; + legacy tlog path is preserved unchanged for AZ-895 to remove later. + - `cli/replay.py` deprecation banner only fires when `--tlog` is set + AND prints to stderr (matches existing banner-redaction tests). +- **Phase 4 (Security)**: no new credentials, no IPC, no network. CSV + parser uses `csv.DictReader` (stdlib, no eval) and `float()`. CLI + signing-key handling unchanged. +- **Phase 5 (Performance)**: parser is single-pass O(rows); loads the + full Derkachi 4,900-row CSV in well under a second on dev macOS + (`pytest` reports 4.5s for the full unit suite touched). Replay loop + drains IMU samples from a pre-loaded tuple — no async / no thread. +- **Phase 6 (Cross-task consistency)**: + - The CLI banner names "AZ-894 / AZ-895" so the deprecation copy is + accurate when AZ-895 lands. + - `module-layout.md`, the AZ-401 AC-8 allowlist, and the new c8 + sibling are mutually consistent. +- **Phase 7 (Architecture)**: + - New file ownership matches `module-layout.md`. + - Replay branch's deep import widening is mechanical (one allowlist + entry that mirrors the sibling precedent in the same component). + - No new layer rule. + +No `@pytest.mark.xfail` decorators removed → LESSONS 2026-05-26 [testing] +gate not engaged. + +## Auto-Fix Attempts: 0 +## Escalated Findings: 0 +## Stuck Tasks: 0 + +## Tests Run + +Focused local pass on touched modules: + +``` +python -m pytest \ + tests/unit/replay_input/test_csv_ground_truth.py \ + tests/unit/c8_fc_adapter/test_csv_replay_adapter.py \ + tests/unit/test_az401_compose_root_replay.py \ + tests/unit/test_az402_replay_cli.py \ + -v --tb=short +``` +→ **70 passed, 0 failed, 0 skipped**. + +Full unit-suite gate: + +``` +python -m pytest tests/unit/ -v --tb=short -q +``` +→ **2,327 passed, 86 skipped, 3 warnings in 76 s**. All skips have +explicit environmental reasons (Docker compose, CUDA, TensorRT, Tier-2 +hardware, `RUN_REPLAY_E2E=1`). + +## Tracker Transitions + +| Ticket | Step 5 (→ In Progress) | Step 12 (→ In Testing) | +|--------|------------------------|------------------------| +| AZ-894 | _to be transitioned after commit_ | _to be transitioned after commit_ | +| AZ-896 | _to be transitioned after commit_ | _to be transitioned after commit_ | + +This block is updated in-place after the batch commit lands and the +Jira MCP transitions are confirmed via `getTransitionsForJiraIssue` → +`transitionJiraIssue` → read-back. + +## Leftovers / Tracker hygiene + +- No new leftovers produced. +- Still open from prior batches: + - `_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md` + — gtsam numpy-2 wheel not on PyPI; unchanged. + +## Next Batch + +Batch 03 (cycle 4): **AZ-895** — deprecate the `auto_sync` surface +proper. Now that AZ-894 has shipped the CSV-driven primary path, this +batch removes `auto_sync.py`, strips the auto-sync wiring from +`_replay_branch.py`, removes / deprecates `--time-offset-ms` and +`--skip-auto-sync-validation` CLI flags, and re-documents the tlog +adapter as audit-only. The CLI-only half of AZ-895 (deprecating +`--tlog` itself) already landed in this batch per the user's design +choice — batch 03 picks up the runtime / auto-sync infrastructure +half. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index baee421..0b98365 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 10 name: Implement status: in_progress sub_step: - phase: 0 - name: awaiting-invocation - detail: "" + phase: 7 + name: batch-loop + detail: "batch 3 of 4 — AZ-895" retry_count: 0 cycle: 4 tracker: jira diff --git a/src/gps_denied_onboard/cli/replay.py b/src/gps_denied_onboard/cli/replay.py index e179fde..5cb03f8 100644 --- a/src/gps_denied_onboard/cli/replay.py +++ b/src/gps_denied_onboard/cli/replay.py @@ -98,7 +98,32 @@ def _build_argparser() -> argparse.ArgumentParser: ), ) parser.add_argument("--video", required=True, type=Path, metavar="PATH") - parser.add_argument("--tlog", required=True, type=Path, metavar="PATH") + parser.add_argument( + "--imu", + dest="imu", + required=True, + type=Path, + metavar="PATH", + help=( + "Paired Derkachi-schema CSV (IMU + GPS ground truth on a " + "single canonical Time clock). Required for new replay runs " + "(AZ-894). Schema spec: " + "_docs/02_document/contracts/replay/csv_replay_format.md." + ), + ) + parser.add_argument( + "--tlog", + required=False, + default=None, + type=Path, + metavar="PATH", + help=( + "DEPRECATED (AZ-894 → AZ-895): legacy pymavlink .tlog file. " + "Accepted for transitional CLIs but the replay pipeline now " + "drives off --imu; --tlog is ignored when --imu is present. " + "Remove from new invocations." + ), + ) parser.add_argument("--output", required=True, type=Path, metavar="PATH") parser.add_argument( "--camera-calibration", @@ -188,13 +213,15 @@ def _build_argparser() -> argparse.ArgumentParser: def _validate_paths(args: argparse.Namespace) -> None: """Fail fast if any required-file argument is missing or unreadable.""" - paths: tuple[tuple[str, Path], ...] = ( + paths: list[tuple[str, Path]] = [ ("video", args.video), - ("tlog", args.tlog), + ("imu", args.imu), ("camera-calibration", args.camera_calibration), ("config", args.config_path), ("mavlink-signing-key", args.mavlink_signing_key), - ) + ] + if args.tlog is not None: + paths.append(("tlog", args.tlog)) for label, path in paths: if not path.exists(): raise ReplayCliError(f"--{label} path does not exist: {path}") @@ -250,7 +277,8 @@ def _build_replay_config( """ new_replay = ReplayConfig( video_path=str(args.video), - tlog_path=str(args.tlog), + tlog_path=str(args.tlog) if args.tlog is not None else "", + imu_csv_path=str(args.imu), output_path=str(args.output), pace=args.pace, time_offset_ms=args.time_offset_ms, @@ -301,7 +329,8 @@ def _print_startup_banner(args: argparse.Namespace) -> None: Logging is bootstrapped inside the airborne main; this banner gives the operator a single line confirming what the CLI parsed before any - further output. + further output. AZ-894: also surfaces the --tlog deprecation warning + inline so operators see it even when stderr is the only sink. """ sanitised = vars(args).copy() sanitised["mavlink_signing_key"] = "" @@ -310,6 +339,14 @@ def _print_startup_banner(args: argparse.Namespace) -> None: file=sys.stderr, flush=True, ) + if args.tlog is not None: + print( + "gps-denied-replay: WARNING --tlog is deprecated (AZ-894 / AZ-895). " + "The replay pipeline drives off --imu; --tlog is accepted but ignored. " + "Remove it from your invocation.", + file=sys.stderr, + flush=True, + ) # ---------------------------------------------------------------------- diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/csv_replay_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/csv_replay_adapter.py new file mode 100644 index 0000000..2943e4e --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/csv_replay_adapter.py @@ -0,0 +1,312 @@ +"""``CsvReplayFcAdapter`` (AZ-894 / E-DEMO-REPLAY). + +Replay-only :class:`FcAdapter` sibling to :class:`TlogReplayFcAdapter` +that backs the CSV-driven replay input (AZ-894). The CSV variant exists +to remove the AZ-848 / AZ-883 two-clock surface from the replay test/demo +path — the canonical replay loop reads IMU + GPS straight from +:func:`replay_input.csv_ground_truth.load_csv_ground_truth`, so this +adapter's inbound :class:`SubscriptionBus` is intentionally never fed. + +The adapter exists for two reasons: + +1. **Protocol parity (replay_protocol Invariant 1).** The composition + root populates ``components["fc_adapter"]`` and downstream code (e.g. + :func:`_run_replay_loop`) requires a non-``None`` value implementing + the :class:`FcAdapter` Protocol; substituting this thin sibling keeps + the loop's preconditions identical to the tlog path. +2. **Outbound byte equality (Invariant 5).** Encoders write through the + :class:`MavlinkTransport` seam in both modes; this adapter routes + ``emit_external_position`` / ``emit_status_text`` through the injected + transport so the AC-9 ``bytes_written`` invariant holds without + touching ``tlog_replay_adapter.py``. + +Inbound surface is reduced to a no-op subscription bus by design — the +replay loop reads from the CSV directly and never subscribes (mirroring +the documented bypass that the tlog adapter already relies on; see +``runtime_root._run_replay_loop`` docstring "IMU samples are read +SYNCHRONOUSLY…"). +""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import TYPE_CHECKING, Any, Final + +from gps_denied_onboard._types.fc import ( + FcKind, + FlightState, + FlightStateSignal, + PortConfig, + Severity, + Subscription, + TelemetryCallback, +) +from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard.components.c8_fc_adapter._outbound_mavlink_payloads import ( + encode_gps_input, + encode_named_value_float, + encode_statustext, + send_via_transport, +) +from gps_denied_onboard.components.c8_fc_adapter._outbound_provenance import ( + source_label_to_float, +) +from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus +from gps_denied_onboard.components.c8_fc_adapter.errors import ( + FcAdapterConfigError, + FcEmitError, + FcOpenError, + SourceSetSwitchNotSupportedError, +) +from gps_denied_onboard.components.c8_fc_adapter.interface import MavlinkTransport +from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ReplayPace +from gps_denied_onboard.logging import get_logger + +if TYPE_CHECKING: + from gps_denied_onboard._types.emitted import EmittedExternalPosition + from gps_denied_onboard._types.state import EstimatorOutput + from gps_denied_onboard.clock import Clock + from gps_denied_onboard.fdr_client.client import FdrClient + +__all__ = ["CsvReplayFcAdapter"] + + +_BUILD_FLAG: Final[str] = "BUILD_CSV_REPLAY_ADAPTER" +_LOG_KIND_OPENED: Final[str] = "c8.csv_replay.opened" + + +def _build_flag_on() -> bool: + """Return ``True`` when ``BUILD_CSV_REPLAY_ADAPTER`` is a truthy token.""" + raw = os.environ.get(_BUILD_FLAG, "") + return raw.strip().lower() in {"on", "1", "true", "yes"} + + +class CsvReplayFcAdapter: + """Thin :class:`FcAdapter` backing the CSV-driven replay input. + + The constructor signature mirrors :class:`TlogReplayFcAdapter` on the + fields that the composition root threads through, so swapping the + two adapters at construction time is a single-line change inside + :mod:`runtime_root._replay_branch`. Inbound subscription is a no-op + by design (see module docstring). + """ + + __slots__ = ( + "_csv_path", + "_target_fc_dialect", + "_clock", + "_fdr_client", + "_pace", + "_log", + "_bus", + "_opened", + "_closed", + "_mavlink_transport", + "_outbound_mav", + "_sequence_number", + "_clock_us_provider", + "_clock_ms_boot_provider", + ) + + def __init__( + self, + *, + csv_path: Path, + target_fc_dialect: FcKind, + clock: "Clock", + fdr_client: "FdrClient", + pace: ReplayPace = ReplayPace.ASAP, + mavlink_transport: "MavlinkTransport | None" = None, + outbound_mav: Any | None = None, + ) -> None: + if not _build_flag_on(): + raise FcAdapterConfigError( + f"{_BUILD_FLAG} is OFF in this binary; CsvReplayFcAdapter " + "is unavailable. Rebuild with the flag set to ON in the " + "replay binary's Dockerfile." + ) + if not isinstance(csv_path, Path): + raise FcAdapterConfigError( + f"csv_path must be a pathlib.Path; got {type(csv_path).__name__}" + ) + if target_fc_dialect not in (FcKind.ARDUPILOT_PLANE, FcKind.INAV): + raise FcAdapterConfigError( + f"target_fc_dialect must be ARDUPILOT_PLANE or INAV; " + f"got {target_fc_dialect!r}" + ) + if not isinstance(pace, ReplayPace): + raise FcAdapterConfigError( + f"pace must be a ReplayPace enum; got {type(pace).__name__}" + ) + self._csv_path = csv_path + self._target_fc_dialect = target_fc_dialect + self._clock = clock + self._fdr_client = fdr_client + self._pace = pace + self._log = get_logger("c8_fc_adapter.csv_replay") + self._bus = SubscriptionBus() + self._opened = False + self._closed = False + self._mavlink_transport: MavlinkTransport | None = mavlink_transport + self._outbound_mav: Any = outbound_mav + self._sequence_number: int = 0 + self._clock_us_provider = lambda: int(self._clock.monotonic_ns() // 1000) + self._clock_ms_boot_provider = lambda: int( + self._clock.monotonic_ns() // 1_000_000 + ) % 0xFFFFFFFF + + # ------------------------------------------------------------------ + # FcAdapter Protocol implementation + + def open( + self, + port: PortConfig | None = None, + signing_key: bytes | None = None, + ) -> None: + """Validate the CSV exists; lazy-build the outbound MAVLink instance. + + ``port`` and ``signing_key`` are accepted for Protocol parity but + unused (replay has no FC link to open). The actual CSV parsing + happens inside :func:`load_csv_ground_truth` from the runtime + loop; this method only fails fast on a missing file so the + composition root surfaces the same shape of error as the tlog + path. + """ + if self._opened: + raise FcOpenError("CsvReplayFcAdapter already opened") + if not self._csv_path.is_file(): + raise FcOpenError(f"CSV file not found: {self._csv_path}") + if self._mavlink_transport is not None and self._outbound_mav is None: + from pymavlink.dialects.v20 import ardupilotmega as _mavlink + + self._outbound_mav = _mavlink.MAVLink( + file=None, srcSystem=1, srcComponent=1 + ) + self._opened = True + self._log.info( + f"{_LOG_KIND_OPENED}: csv_path={self._csv_path} " + f"dialect={self._target_fc_dialect.value} " + f"pace={self._pace.value}", + extra={ + "kind": _LOG_KIND_OPENED, + "kv": { + "csv_path": str(self._csv_path), + "target_fc_dialect": self._target_fc_dialect.value, + "pace": self._pace.value, + }, + }, + ) + + def close(self) -> None: + """Release any held outbound resources; idempotent.""" + if not self._opened or self._closed: + return + self._closed = True + + def subscribe_telemetry(self, callback: TelemetryCallback) -> Subscription: + # Bus is intentionally never fed in the CSV variant — the replay + # loop reads IMU + GPS directly from the parsed CsvGroundTruth. + # We still hand back a real Subscription so Protocol consumers + # (and any future inbound-mirroring code) get a no-op handle + # instead of a contract violation. + return self._bus.subscribe(callback) + + def emit_external_position( + self, output: "EstimatorOutput" + ) -> "EmittedExternalPosition": + from gps_denied_onboard._types.emitted import EmittedExternalPosition + + if self._mavlink_transport is None or self._outbound_mav is None: + raise FcEmitError("replay adapter does not emit to FC") + if output.smoothed: + raise FcEmitError( + "smoothed output cannot be emitted to FC (Invariant 6)" + ) + wgs = output.position_wgs84 + if not isinstance(wgs, LatLonAlt): + raise FcEmitError( + f"EstimatorOutput.position_wgs84 must be a LatLonAlt; " + f"got {type(wgs).__name__}" + ) + emitted_at = self._clock.monotonic_ns() + self._sequence_number += 1 + seq = self._sequence_number + try: + gps_msg = encode_gps_input( + self._outbound_mav, + time_usec=int(self._clock_us_provider()), + gps_id=0, + ignore_flags=0, + time_week_ms=0, + time_week=0, + fix_type=3, + lat=int(wgs.lat_deg * 1e7), + lon=int(wgs.lon_deg * 1e7), + alt=float(wgs.alt_m), + hdop=0.0, + vdop=0.0, + vn=0.0, + ve=0.0, + vd=0.0, + speed_accuracy=0.0, + horiz_accuracy=0.0, + vert_accuracy=0.0, + satellites_visible=10, + yaw=0, + ) + send_via_transport(self._outbound_mav, gps_msg, self._mavlink_transport) + label_msg = encode_named_value_float( + self._outbound_mav, + time_boot_ms=int(self._clock_ms_boot_provider()), + name=b"src_lbl", + value=source_label_to_float(output.source_label), + ) + send_via_transport( + self._outbound_mav, label_msg, self._mavlink_transport + ) + except Exception as exc: + raise FcEmitError( + f"replay outbound wire emit failed: {exc!r}" + ) from exc + return EmittedExternalPosition( + fc_kind=FcKind.ARDUPILOT_PLANE, + horiz_accuracy_m=0.0, + source_label=output.source_label, + emitted_at=emitted_at, + sequence_number=seq, + ) + + def emit_status_text(self, msg: str, severity: Severity) -> None: + if self._mavlink_transport is None or self._outbound_mav is None: + raise FcEmitError("replay adapter does not emit to FC") + try: + text = msg.encode("utf-8")[:50] + txt_msg = encode_statustext( + self._outbound_mav, + severity=int(severity.value), + text=text, + ) + send_via_transport(self._outbound_mav, txt_msg, self._mavlink_transport) + except Exception as exc: + raise FcEmitError( + f"replay outbound statustext failed: {exc!r}" + ) from exc + + def request_source_set_switch(self) -> None: + raise SourceSetSwitchNotSupportedError( + "CsvReplayFcAdapter cannot issue MAV_CMD_SET_EKF_SOURCE_SET; " + "replay reads telemetry from a recorded CSV" + ) + + def current_flight_state(self) -> FlightStateSignal: + # The CSV does not carry MAVLink HEARTBEAT, so we cannot derive a + # latched flight-state. Returning INIT mirrors what the tlog adapter + # returns before its first decoded heartbeat; the replay loop never + # consumes this value (it drives the loop from the CSV directly). + return FlightStateSignal( + state=FlightState.INIT, + last_valid_gps_hint_wgs84=None, + last_valid_gps_age_ms=None, + captured_at=self._clock.monotonic_ns(), + ) diff --git a/src/gps_denied_onboard/config/schema.py b/src/gps_denied_onboard/config/schema.py index 0ea714f..75da1ca 100644 --- a/src/gps_denied_onboard/config/schema.py +++ b/src/gps_denied_onboard/config/schema.py @@ -347,7 +347,18 @@ class ReplayConfig: ``.h264``). Empty string is the default sentinel; a non-empty value is required when ``mode == "replay"``. tlog_path: Filesystem path to the matching pymavlink ``.tlog``. - Empty string is the default sentinel. + Empty string is the default sentinel. Deprecated as of AZ-894 — + the canonical replay input is now the CSV pair routed through + :attr:`imu_csv_path`. ``tlog_path`` remains valid only for + legacy callers that have not migrated to the CSV input; AZ-895 + removes it entirely. + imu_csv_path: Filesystem path to the paired Derkachi-schema CSV + (IMU + GPS-ground-truth on a single canonical ``Time`` clock). + Empty string is the default sentinel; a non-empty value is + required for new replay runs (AZ-894). Setting this in + combination with ``tlog_path`` is permitted during the + AZ-894 → AZ-895 migration window: ``imu_csv_path`` wins; the + tlog path is then unused. output_path: Filesystem path the :class:`JsonlReplaySink` will write to. Default points at ``/tmp/replay.jsonl`` for developer ergonomics; production wiring overrides via the @@ -390,6 +401,7 @@ class ReplayConfig: video_path: str = "" tlog_path: str = "" + imu_csv_path: str = "" output_path: str = "/tmp/replay.jsonl" pace: str = "asap" time_offset_ms: int | None = None diff --git a/src/gps_denied_onboard/replay_input/__init__.py b/src/gps_denied_onboard/replay_input/__init__.py index 2097755..379cbd2 100644 --- a/src/gps_denied_onboard/replay_input/__init__.py +++ b/src/gps_denied_onboard/replay_input/__init__.py @@ -20,6 +20,11 @@ path. """ from gps_denied_onboard._types.route import RouteSpec +from gps_denied_onboard.replay_input.csv_ground_truth import ( + CsvGpsFix, + CsvGroundTruth, + load_csv_ground_truth, +) from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError from gps_denied_onboard.replay_input.interface import ( AlignedWindow, @@ -42,6 +47,8 @@ __all__ = [ "AlignedWindow", "AutoSyncConfig", "AutoSyncDecision", + "CsvGpsFix", + "CsvGroundTruth", "ReplayInputAdapter", "ReplayInputAdapterError", "ReplayInputBundle", @@ -50,5 +57,6 @@ __all__ = [ "TlogGpsFix", "TlogGroundTruth", "extract_route_from_tlog", + "load_csv_ground_truth", "load_tlog_ground_truth", ] diff --git a/src/gps_denied_onboard/replay_input/csv_ground_truth.py b/src/gps_denied_onboard/replay_input/csv_ground_truth.py new file mode 100644 index 0000000..bb01756 --- /dev/null +++ b/src/gps_denied_onboard/replay_input/csv_ground_truth.py @@ -0,0 +1,255 @@ +"""CSV-driven IMU + GPS ground-truth extractor (AZ-894 / E-DEMO-REPLAY). + +Streams paired IMU samples and GPS-truth fixes from a Derkachi-schema CSV +into a typed :class:`CsvGroundTruth` DTO. Sibling to :mod:`tlog_ground_truth` +— the CSV variant lets the replay pipeline run on a single canonical clock +(the CSV's ``Time`` column), eliminating the AZ-848 / AZ-883 two-clock +mismatch that cycle-3 surfaced. + +Schema (19 cols, header-required): + +* ``timestamp(ms)`` — FC-boot-relative ms, kept for traceability only. +* ``Time`` — flight-relative seconds, the canonical clock. +* ``SCALED_IMU2.{x,y,z}{acc,gyro,mag}`` — 10 Hz IMU stream (raw mg / mrad/s + / mGauss per ArduPilot convention; preserved unchanged to match the + byte-for-byte semantics the tlog adapter uses for ``ImuSample.accel_xyz`` + and ``gyro_xyz``). +* ``GLOBAL_POSITION_INT.{lat,lon,alt,relative_alt,vx,vy,vz,hdg}`` — 10 Hz + GPS truth. ``lat``/``lon`` already in degrees (Derkachi dump format), + ``alt`` in mm, ``vx``/``vy``/``vz`` in cm/s, ``hdg`` in cdeg. + +The full operator-facing schema spec lives in +``_docs/02_document/contracts/replay/csv_replay_format.md`` (AZ-896). +""" + +from __future__ import annotations + +import csv +import logging +import math +from dataclasses import dataclass +from pathlib import Path +from typing import Final + +from gps_denied_onboard._types.nav import ImuSample +from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError + +__all__ = [ + "CSV_SOURCE_LABEL", + "REQUIRED_COLUMNS", + "CsvGpsFix", + "CsvGroundTruth", + "load_csv_ground_truth", +] + + +_LOGGER = logging.getLogger("gps_denied_onboard.replay_input.csv_ground_truth") + +CSV_SOURCE_LABEL: Final[str] = "GLOBAL_POSITION_INT_CSV" + +REQUIRED_COLUMNS: Final[tuple[str, ...]] = ( + "timestamp(ms)", + "Time", + "SCALED_IMU2.xacc", + "SCALED_IMU2.yacc", + "SCALED_IMU2.zacc", + "SCALED_IMU2.xgyro", + "SCALED_IMU2.ygyro", + "SCALED_IMU2.zgyro", + "GLOBAL_POSITION_INT.lat", + "GLOBAL_POSITION_INT.lon", + "GLOBAL_POSITION_INT.alt", + "GLOBAL_POSITION_INT.vx", + "GLOBAL_POSITION_INT.vy", + "GLOBAL_POSITION_INT.vz", + "GLOBAL_POSITION_INT.hdg", +) + +# Per the Derkachi dump format the lat/lon columns are already in degrees +# (pre-converted from MAVLink's int×1e7). alt stays in mm; vx/vy/vz in +# cm/s; hdg in cdeg. +_MM_PER_M: Final[float] = 1000.0 +_CM_PER_M_S: Final[float] = 100.0 +_CDEG_PER_DEG: Final[float] = 100.0 +_S_TO_NS: Final[float] = 1.0e9 + + +@dataclass(frozen=True, slots=True) +class CsvGpsFix: + """One time-aligned GPS-truth row extracted from the CSV. + + Field semantics match :class:`tlog_ground_truth.TlogGpsFix` so the + replay loop's downstream consumers see an identical shape regardless + of input format. + """ + + ts_ns: int + lat_deg: float + lon_deg: float + alt_m: float + hdg_deg: float + vx_m_s: float + vy_m_s: float + vz_m_s: float + + +@dataclass(frozen=True, slots=True) +class CsvGroundTruth: + """Paired IMU + GPS series extracted from a Derkachi-schema CSV. + + Attributes: + records: Time-ordered GPS fixes. + imu_samples: Time-ordered IMU samples; one per CSV row (10 Hz). + Values preserve the raw mg / mrad/s units the tlog adapter + also passes through so C1 / C5 see identical numeric shapes + regardless of input source. + source: Discriminator label echoed into the cold-start FDR + event (``"GLOBAL_POSITION_INT_CSV"`` for the Derkachi CSV). + """ + + records: tuple[CsvGpsFix, ...] + imu_samples: tuple[ImuSample, ...] + source: str + + +def load_csv_ground_truth(csv_path: Path) -> CsvGroundTruth: + """Parse the Derkachi-schema CSV into a :class:`CsvGroundTruth`. + + Performs all schema validation at entry so a malformed CSV raises a + single :class:`ReplayInputAdapterError` (AC-5 fail-fast) rather than + surfacing the same problem as a downstream NaN deep inside the loop. + + Args: + csv_path: Path to the CSV file. Existence is checked at entry. + + Returns: + A :class:`CsvGroundTruth` with row-aligned GPS records and IMU + samples. The IMU and GPS sequences share the same length (one + entry per CSV row). + + Raises: + ReplayInputAdapterError: When the file is missing, the header + is missing a required column, the ``Time`` column contains + NaN / non-monotonic values, or any required numeric column + contains an unparseable value. + """ + if not csv_path.is_file(): + raise ReplayInputAdapterError(f"CSV file not found: {csv_path}") + + with csv_path.open("r", encoding="utf-8", newline="") as handle: + reader = csv.DictReader(handle) + fieldnames = reader.fieldnames + if fieldnames is None: + raise ReplayInputAdapterError( + f"CSV {csv_path} is empty (no header row)" + ) + missing = [col for col in REQUIRED_COLUMNS if col not in fieldnames] + if missing: + raise ReplayInputAdapterError( + f"CSV {csv_path} missing required columns: {missing}" + ) + + gps_records: list[CsvGpsFix] = [] + imu_samples: list[ImuSample] = [] + last_ts_ns: int | None = None + for row_idx, row in enumerate(reader, start=2): + ts_ns = _parse_time_ns(row, csv_path, row_idx) + if last_ts_ns is not None and ts_ns <= last_ts_ns: + raise ReplayInputAdapterError( + f"CSV {csv_path} row {row_idx}: non-monotonic Time " + f"({ts_ns / _S_TO_NS:.6f} s <= previous " + f"{last_ts_ns / _S_TO_NS:.6f} s)" + ) + last_ts_ns = ts_ns + imu_samples.append(_parse_imu(row, csv_path, row_idx, ts_ns)) + gps_records.append(_parse_gps(row, csv_path, row_idx, ts_ns)) + + if not gps_records: + raise ReplayInputAdapterError( + f"CSV {csv_path} has a header but no data rows" + ) + + _LOGGER.info( + "csv_ground_truth.loaded: csv_path=%s rows=%d span_s=%.3f", + csv_path, + len(gps_records), + (gps_records[-1].ts_ns - gps_records[0].ts_ns) / _S_TO_NS, + ) + + return CsvGroundTruth( + records=tuple(gps_records), + imu_samples=tuple(imu_samples), + source=CSV_SOURCE_LABEL, + ) + + +def _parse_time_ns(row: dict[str, str], path: Path, row_idx: int) -> int: + raw = row.get("Time", "") + try: + seconds = float(raw) + except ValueError as exc: + raise ReplayInputAdapterError( + f"CSV {path} row {row_idx}: Time={raw!r} is not a number" + ) from exc + if math.isnan(seconds) or math.isinf(seconds): + raise ReplayInputAdapterError( + f"CSV {path} row {row_idx}: Time={raw!r} is NaN/Inf" + ) + return int(seconds * _S_TO_NS) + + +def _parse_imu( + row: dict[str, str], + path: Path, + row_idx: int, + ts_ns: int, +) -> ImuSample: + accel = ( + _parse_float(row, "SCALED_IMU2.xacc", path, row_idx), + _parse_float(row, "SCALED_IMU2.yacc", path, row_idx), + _parse_float(row, "SCALED_IMU2.zacc", path, row_idx), + ) + gyro = ( + _parse_float(row, "SCALED_IMU2.xgyro", path, row_idx), + _parse_float(row, "SCALED_IMU2.ygyro", path, row_idx), + _parse_float(row, "SCALED_IMU2.zgyro", path, row_idx), + ) + return ImuSample(ts_ns=ts_ns, accel_xyz=accel, gyro_xyz=gyro) + + +def _parse_gps( + row: dict[str, str], + path: Path, + row_idx: int, + ts_ns: int, +) -> CsvGpsFix: + return CsvGpsFix( + ts_ns=ts_ns, + lat_deg=_parse_float(row, "GLOBAL_POSITION_INT.lat", path, row_idx), + lon_deg=_parse_float(row, "GLOBAL_POSITION_INT.lon", path, row_idx), + alt_m=_parse_float(row, "GLOBAL_POSITION_INT.alt", path, row_idx) / _MM_PER_M, + hdg_deg=_parse_float(row, "GLOBAL_POSITION_INT.hdg", path, row_idx) / _CDEG_PER_DEG, + vx_m_s=_parse_float(row, "GLOBAL_POSITION_INT.vx", path, row_idx) / _CM_PER_M_S, + vy_m_s=_parse_float(row, "GLOBAL_POSITION_INT.vy", path, row_idx) / _CM_PER_M_S, + vz_m_s=_parse_float(row, "GLOBAL_POSITION_INT.vz", path, row_idx) / _CM_PER_M_S, + ) + + +def _parse_float( + row: dict[str, str], + column: str, + path: Path, + row_idx: int, +) -> float: + raw = row.get(column, "") + try: + value = float(raw) + except ValueError as exc: + raise ReplayInputAdapterError( + f"CSV {path} row {row_idx}: {column}={raw!r} is not a number" + ) from exc + if math.isnan(value) or math.isinf(value): + raise ReplayInputAdapterError( + f"CSV {path} row {row_idx}: {column}={raw!r} is NaN/Inf" + ) + return value diff --git a/src/gps_denied_onboard/runtime_root/__init__.py b/src/gps_denied_onboard/runtime_root/__init__.py index 7c5f323..fff7629 100644 --- a/src/gps_denied_onboard/runtime_root/__init__.py +++ b/src/gps_denied_onboard/runtime_root/__init__.py @@ -789,6 +789,7 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: fixes (cold-start impossible), or when the estimator raises a fatal error. ``EXIT_SUCCESS`` (0) on clean completion. """ + import dataclasses import time from gps_denied_onboard._types.geo import LatLonAlt @@ -802,6 +803,9 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: EstimatorFatalError, ) from gps_denied_onboard.logging import get_logger + from gps_denied_onboard.replay_input.csv_ground_truth import ( + load_csv_ground_truth, + ) from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError from gps_denied_onboard.replay_input.tlog_ground_truth import ( load_tlog_ground_truth, @@ -856,38 +860,66 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: # explicitly. calibration = _load_camera_calibration(config) - # Cold-start origin from tlog's first GPS fix. This is the - # ADR-010 / Principle #11 documented fallback when no operator - # Manifest is available. ESKF/GTSAM both require an origin - # before the first add_fc_imu (else EstimatorAlreadyStartedError). + # Cold-start origin from the first GPS fix in the configured input. + # AZ-894: prefer the CSV ground-truth (single canonical clock); fall + # back to the legacy tlog path when ``imu_csv_path`` is unset (the + # AZ-895 deprecation completes the removal). + csv_path_str = config.replay.imu_csv_path tlog_path_str = config.replay.tlog_path - _log.info( - "replay_loop.loading_gps_for_cold_start: tlog_path=%s", - tlog_path_str, - extra={ - "kind": "replay_loop.loading_gps_for_cold_start", - "kv": {"tlog_path": tlog_path_str}, - }, - ) - try: - gt = load_tlog_ground_truth(Path(tlog_path_str)) - except ReplayInputAdapterError as exc: - _log.error( - "replay_loop.tlog_load_failed: %r", - exc, + using_csv = bool(csv_path_str) + if using_csv: + _log.info( + "replay_loop.loading_gps_for_cold_start: imu_csv_path=%s", + csv_path_str, extra={ - "kind": "replay_loop.tlog_load_failed", - "kv": {"error": repr(exc)}, + "kind": "replay_loop.loading_gps_for_cold_start", + "kv": {"imu_csv_path": csv_path_str}, }, ) - return EXIT_GENERIC_FAILURE + try: + csv_gt = load_csv_ground_truth(Path(csv_path_str)) + except ReplayInputAdapterError as exc: + _log.error( + "replay_loop.csv_load_failed: %r", + exc, + extra={ + "kind": "replay_loop.csv_load_failed", + "kv": {"error": repr(exc)}, + }, + ) + return EXIT_GENERIC_FAILURE + gt = csv_gt + else: + _log.info( + "replay_loop.loading_gps_for_cold_start: tlog_path=%s", + tlog_path_str, + extra={ + "kind": "replay_loop.loading_gps_for_cold_start", + "kv": {"tlog_path": tlog_path_str}, + }, + ) + try: + gt = load_tlog_ground_truth(Path(tlog_path_str)) + except ReplayInputAdapterError as exc: + _log.error( + "replay_loop.tlog_load_failed: %r", + exc, + extra={ + "kind": "replay_loop.tlog_load_failed", + "kv": {"error": repr(exc)}, + }, + ) + return EXIT_GENERIC_FAILURE if not gt.records: _log.error( - "replay_loop.cold_start_impossible: tlog has no GPS messages, " + "replay_loop.cold_start_impossible: input has no GPS fixes, " "cannot seed C5 set_takeoff_origin", extra={ "kind": "replay_loop.cold_start_impossible", - "kv": {"tlog_path": tlog_path_str}, + "kv": { + "input_path": csv_path_str if using_csv else tlog_path_str, + "input_kind": "csv" if using_csv else "tlog", + }, }, ) return EXIT_GENERIC_FAILURE @@ -924,22 +956,29 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: }, ) - # Open the tlog directly for synchronous IMU read. Bypasses the - # decode-thread race in TlogReplayFcAdapter (see docstring). - try: - from pymavlink import mavutil # type: ignore[import-untyped] - except ImportError as exc: - _log.error( - "replay_loop.pymavlink_unavailable: %r", - exc, - extra={ - "kind": "replay_loop.pymavlink_unavailable", - "kv": {"error": repr(exc)}, - }, - ) - return EXIT_GENERIC_FAILURE - - tlog_reader = mavutil.mavlink_connection(str(tlog_path_str)) + # IMU source. CSV path: walk the pre-loaded list (gt.imu_samples) + # so the loop's IMU draining stays on the single canonical clock. + # Tlog path (legacy): open the tlog directly for synchronous IMU + # read; bypasses the decode-thread race in TlogReplayFcAdapter. + tlog_reader: Any = None + csv_imu_samples: tuple[ImuSample, ...] = () + csv_imu_idx = 0 + if using_csv: + csv_imu_samples = csv_gt.imu_samples + else: + try: + from pymavlink import mavutil # type: ignore[import-untyped] + except ImportError as exc: + _log.error( + "replay_loop.pymavlink_unavailable: %r", + exc, + extra={ + "kind": "replay_loop.pymavlink_unavailable", + "kv": {"error": repr(exc)}, + }, + ) + return EXIT_GENERIC_FAILURE + tlog_reader = mavutil.mavlink_connection(str(tlog_path_str)) # IMU sample buffer used to build per-frame ImuWindows. We # accumulate every RAW_IMU/SCALED_IMU2 sample whose FC-clock @@ -949,32 +988,41 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: imu_eof = False def _drain_imu_until(target_ns: int) -> None: - """Advance the tlog reader, appending IMU samples up to ``target_ns``. + """Advance the IMU source, appending samples up to ``target_ns``. - Stops at end-of-stream (``recv_match`` returns ``None``). - Mirrors :meth:`TlogReplayFcAdapter._handle_imu` for sample - construction so the bytes-on-wire and the synchronous-read - paths produce identical IMU samples. + Branches on ``using_csv`` so the closure stays a single + definition. Both branches respect the same ``pending_imu`` + buffer + ``imu_anchor_ns`` / ``imu_eof`` state and produce + :class:`ImuSample` instances with identical numeric semantics + — the tlog branch matches :meth:`TlogReplayFcAdapter._handle_imu` + for byte-for-byte compatibility with the legacy path. """ - nonlocal imu_anchor_ns, imu_eof + nonlocal imu_anchor_ns, imu_eof, csv_imu_idx while not imu_eof: if pending_imu and pending_imu[-1].ts_ns >= target_ns: return - msg = tlog_reader.recv_match( - type=["RAW_IMU", "SCALED_IMU2"], - blocking=False, - ) - if msg is None: - imu_eof = True - return - ts_ns = int(getattr(msg, "time_usec", 0)) * 1000 - if ts_ns == 0: - continue - sample = ImuSample( - ts_ns=ts_ns, - accel_xyz=(float(msg.xacc), float(msg.yacc), float(msg.zacc)), - gyro_xyz=(float(msg.xgyro), float(msg.ygyro), float(msg.zgyro)), - ) + if using_csv: + if csv_imu_idx >= len(csv_imu_samples): + imu_eof = True + return + sample = csv_imu_samples[csv_imu_idx] + csv_imu_idx += 1 + else: + msg = tlog_reader.recv_match( + type=["RAW_IMU", "SCALED_IMU2"], + blocking=False, + ) + if msg is None: + imu_eof = True + return + ts_ns = int(getattr(msg, "time_usec", 0)) * 1000 + if ts_ns == 0: + continue + sample = ImuSample( + ts_ns=ts_ns, + accel_xyz=(float(msg.xacc), float(msg.yacc), float(msg.zacc)), + gyro_xyz=(float(msg.xgyro), float(msg.ygyro), float(msg.zgyro)), + ) if imu_anchor_ns is None: imu_anchor_ns = sample.ts_ns pending_imu.append(sample) @@ -1141,6 +1189,17 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: return EXIT_GENERIC_FAILURE if vio_out is not None: + # AZ-894 AC-4: when the CSV adapter drives the loop, + # stamp VioOutput.emitted_at_ns with the CSV-derived + # frame timestamp (single-clock invariant). Without + # this, C1 produces a Jetson-monotonic timestamp that + # disagrees with the CSV-anchored ImuWindow.ts_end_ns + # the estimator consumed the line before — exactly + # the AZ-848 two-clock surface this ticket eliminates. + if using_csv: + vio_out = dataclasses.replace( + vio_out, emitted_at_ns=frame_end_ns + ) try: state_estimator.add_vio(vio_out) except EstimatorDegradedError as exc: @@ -1203,17 +1262,18 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: if slack_ns > 0: time.sleep(slack_ns / 1_000_000_000.0) finally: - try: - tlog_reader.close() - except Exception as exc: # pragma: no cover — defensive. - _log.debug( - "replay_loop.tlog_reader_close_error: %r", - exc, - extra={ - "kind": "replay_loop.tlog_reader_close_error", - "kv": {"error": repr(exc)}, - }, - ) + if tlog_reader is not None: + try: + tlog_reader.close() + except Exception as exc: # pragma: no cover — defensive. + _log.debug( + "replay_loop.tlog_reader_close_error: %r", + exc, + extra={ + "kind": "replay_loop.tlog_reader_close_error", + "kv": {"error": repr(exc)}, + }, + ) _log.info( "replay_loop.complete: frames=%d emitted=%d vio_init_skipped=%d " diff --git a/src/gps_denied_onboard/runtime_root/_replay_branch.py b/src/gps_denied_onboard/runtime_root/_replay_branch.py index e85f9b5..26a44e9 100644 --- a/src/gps_denied_onboard/runtime_root/_replay_branch.py +++ b/src/gps_denied_onboard/runtime_root/_replay_branch.py @@ -36,6 +36,9 @@ from typing import TYPE_CHECKING, Any, Final from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.fc import FcKind +from gps_denied_onboard.components.c8_fc_adapter.csv_replay_adapter import ( + CsvReplayFcAdapter, +) from gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport import ( NoopMavlinkTransport, ) @@ -44,6 +47,7 @@ from gps_denied_onboard.components.c8_fc_adapter.replay_sink import ( ) from gps_denied_onboard.config import Config from gps_denied_onboard.fdr_client import make_fdr_client +from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource from gps_denied_onboard.helpers.wgs_converter import WgsConverter from gps_denied_onboard.logging import get_logger from gps_denied_onboard.replay_input import ( @@ -73,6 +77,12 @@ REPLAY_BUILD_FLAGS: Final[tuple[str, ...]] = ( "BUILD_REPLAY_SINK_JSONL", ) +# AZ-894: separate build flag for the CSV adapter so the replay binary +# can opt into the new path without disturbing the BUILD_TLOG_* gate +# (the tlog adapter is still composed by _build_replay_input_bundle's +# legacy branch until AZ-895 removes it). +_CSV_REPLAY_BUILD_FLAG: Final[str] = "BUILD_CSV_REPLAY_ADAPTER" + REPLAY_COMPONENT_KEYS: Final[tuple[str, ...]] = ( "frame_source", @@ -175,14 +185,21 @@ def _validate_build_flags() -> None: def _validate_replay_paths(config: Config) -> None: - """Reject empty / missing replay paths early with a precise message.""" + """Reject empty / missing replay paths early with a precise message. + + AZ-894: ``imu_csv_path`` is the canonical replay input. ``tlog_path`` + remains valid for the legacy auto-sync path until AZ-895 removes it, + but exactly one of the two must be set so the composition root can + pick a single branch. + """ if not config.replay.video_path: raise CompositionError( "config.replay.video_path is empty; replay mode requires a video path" ) - if not config.replay.tlog_path: + if not config.replay.imu_csv_path and not config.replay.tlog_path: raise CompositionError( - "config.replay.tlog_path is empty; replay mode requires a tlog path" + "config.replay.imu_csv_path is empty and no tlog_path fallback is set; " + "replay mode requires an IMU+GPS CSV (AZ-894) or a tlog file (legacy)" ) if not config.replay.output_path: raise CompositionError( @@ -197,13 +214,31 @@ def _build_replay_input_bundle( adapter_factory: Any | None, mavlink_transport: Any | None = None, ) -> ReplayInputBundle: - """Build the :class:`ReplayInputAdapter` and call ``open()``.""" + """Build the replay input bundle and open the underlying strategies. + + AZ-894: branches on ``config.replay.imu_csv_path`` — when set, builds + the :class:`CsvReplayFcAdapter` + :class:`VideoFileFrameSource` pair + on a single canonical clock derived from the CSV's ``Time`` column; + when unset, falls back to the legacy :class:`ReplayInputAdapter` + tlog path (auto-sync + AC-9 validator). AZ-895 removes the legacy + branch. + """ pace = _resolve_pace(config.replay.pace) target_fc_dialect = _resolve_fc_kind(config.replay.target_fc_dialect) - auto_sync = _build_auto_sync_config(config) camera_calibration = _load_camera_calibration(config) wgs_converter = WgsConverter() + if config.replay.imu_csv_path: + return _build_csv_bundle( + config, + fdr_client=fdr_client, + pace=pace, + target_fc_dialect=target_fc_dialect, + camera_calibration=camera_calibration, + mavlink_transport=mavlink_transport, + ) + + auto_sync = _build_auto_sync_config(config) if adapter_factory is not None: adapter = adapter_factory( config=config, @@ -233,6 +268,56 @@ def _build_replay_input_bundle( return adapter.open() +def _build_csv_bundle( + config: Config, + *, + fdr_client: "FdrClient", + pace: ReplayPace, + target_fc_dialect: FcKind, + camera_calibration: CameraCalibration, + mavlink_transport: Any | None, +) -> ReplayInputBundle: + """Compose the AZ-894 CSV bundle (frame source + CSV FC adapter + clock). + + No auto-sync / auto-trim is run — the CSV's ``Time`` column is the + single canonical clock by construction, so ``resolved_time_offset_ms`` + is fixed at 0 and ``auto_sync_result`` / ``aligned_window`` are + ``None``. + """ + from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock + from gps_denied_onboard.clock.wall_clock import WallClock + + csv_path = Path(config.replay.imu_csv_path) + if not csv_path.is_file(): + raise CompositionError( + f"config.replay.imu_csv_path points at a missing file: {csv_path}" + ) + + clock = TlogDerivedClock(source=iter([])) if pace is ReplayPace.ASAP else WallClock() + frame_source = VideoFileFrameSource( + path=Path(config.replay.video_path), + camera_calibration_id=camera_calibration.camera_id, + clock=clock, + ) + fc_adapter = CsvReplayFcAdapter( + csv_path=csv_path, + target_fc_dialect=target_fc_dialect, + clock=clock, + fdr_client=fdr_client, + pace=pace, + mavlink_transport=mavlink_transport, + ) + fc_adapter.open() + return ReplayInputBundle( + frame_source=frame_source, + fc_adapter=fc_adapter, + clock=clock, + resolved_time_offset_ms=0, + auto_sync_result=None, + aligned_window=None, + ) + + def _resolve_pace(raw: str) -> ReplayPace: if raw == "asap": return ReplayPace.ASAP diff --git a/tests/e2e/replay/conftest.py b/tests/e2e/replay/conftest.py index db3a6c0..7e5fb38 100644 --- a/tests/e2e/replay/conftest.py +++ b/tests/e2e/replay/conftest.py @@ -69,6 +69,7 @@ class DerkachiReplayInputs: video_path: Path tlog_path: Path + imu_csv_path: Path calibration_path: Path config_path: Path signing_key_path: Path @@ -170,6 +171,7 @@ def derkachi_replay_inputs(tmp_path_factory: pytest.TempPathFactory) -> Derkachi return DerkachiReplayInputs( video_path=video_path, tlog_path=tlog_path, + imu_csv_path=csv_path, calibration_path=_calibration_path(), config_path=config_path, signing_key_path=signing_key_path, @@ -241,8 +243,8 @@ def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any: binary, "--video", str(derkachi_replay_inputs.video_path), - "--tlog", - str(derkachi_replay_inputs.tlog_path), + "--imu", + str(derkachi_replay_inputs.imu_csv_path), "--output", str(out_path), "--camera-calibration", @@ -254,6 +256,12 @@ def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any: "--pace", pace, ] + # --tlog is deprecated under AZ-894 but we still forward it + # when the synth tlog exists, so the legacy-path e2e tests + # (test_derkachi_real_tlog.py) keep exercising the deprecation + # warning until AZ-895 deletes the path entirely. + if derkachi_replay_inputs.tlog_path.is_file(): + argv.extend(["--tlog", str(derkachi_replay_inputs.tlog_path)]) if time_offset_ms is not None: argv.extend(["--time-offset-ms", str(time_offset_ms)]) if skip_auto_sync: diff --git a/tests/unit/c8_fc_adapter/test_csv_replay_adapter.py b/tests/unit/c8_fc_adapter/test_csv_replay_adapter.py new file mode 100644 index 0000000..d330719 --- /dev/null +++ b/tests/unit/c8_fc_adapter/test_csv_replay_adapter.py @@ -0,0 +1,249 @@ +"""AZ-894 — ``CsvReplayFcAdapter`` unit tests. + +Focused contract coverage for the thin :class:`FcAdapter` sibling that +backs the CSV-driven replay input. The functional inbound/outbound +plumbing the runtime loop relies on (CSV parsing, frame-stamped IMU +draining, ESKF cold-start origin) is exercised in +``tests/unit/replay_input/test_csv_ground_truth.py`` and the AZ-404 +e2e harness; here we pin the Protocol surface (open/close idempotency, +build-flag refusal, source-set refusal, transport-less emit refusal) +so a refactor of the adapter cannot silently regress Invariant 5 or +the FcAdapter Protocol parity that the composition root depends on. + +Style: every test follows the Arrange / Act / Assert pattern. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from gps_denied_onboard._types.fc import FcKind, FlightState, Severity +from gps_denied_onboard.components.c8_fc_adapter.csv_replay_adapter import ( + CsvReplayFcAdapter, +) +from gps_denied_onboard.components.c8_fc_adapter.errors import ( + FcAdapterConfigError, + FcEmitError, + FcOpenError, + SourceSetSwitchNotSupportedError, +) +from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( + ReplayPace, +) + + +@pytest.fixture(autouse=True) +def _build_flag_on(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("BUILD_CSV_REPLAY_ADAPTER", "ON") + + +class _FakeClock: + """Minimal Clock stub returning a monotonic counter in ns.""" + + def __init__(self) -> None: + self._t = 0 + + def monotonic_ns(self) -> int: + self._t += 1 + return self._t + + def sleep_until_ns(self, _: int) -> None: # pragma: no cover — unused. + return None + + +class _FakeFdr: + """No-op FDR client stand-in (CsvReplayFcAdapter does not emit FDR yet).""" + + def enqueue(self, _record: object) -> None: # pragma: no cover — unused. + return None + + +@pytest.fixture() +def csv_file(tmp_path: Path) -> Path: + # Existence is the only check the adapter does at open() time — + # the body never has to be parseable here (parsing lives in + # csv_ground_truth.load_csv_ground_truth, tested separately). + path = tmp_path / "data_imu.csv" + path.write_text("placeholder", encoding="utf-8") + return path + + +def _make_adapter(csv_path: Path) -> CsvReplayFcAdapter: + return CsvReplayFcAdapter( + csv_path=csv_path, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + clock=_FakeClock(), + fdr_client=_FakeFdr(), + pace=ReplayPace.ASAP, + ) + + +# ---------------------------------------------------------------------- +# Build flag + + +def test_construction_refused_when_build_flag_off( + monkeypatch: pytest.MonkeyPatch, csv_file: Path +) -> None: + # Arrange + monkeypatch.setenv("BUILD_CSV_REPLAY_ADAPTER", "OFF") + + # Act + Assert + with pytest.raises(FcAdapterConfigError, match="BUILD_CSV_REPLAY_ADAPTER"): + _make_adapter(csv_file) + + +def test_construction_rejects_non_path(csv_file: Path) -> None: + # Arrange — argument intentionally a str rather than Path. + + # Act + Assert + with pytest.raises(FcAdapterConfigError, match="csv_path must be a pathlib.Path"): + CsvReplayFcAdapter( + csv_path=str(csv_file), # type: ignore[arg-type] + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + clock=_FakeClock(), + fdr_client=_FakeFdr(), + pace=ReplayPace.ASAP, + ) + + +def test_construction_rejects_unknown_dialect(csv_file: Path) -> None: + # Act + Assert + with pytest.raises(FcAdapterConfigError, match="target_fc_dialect"): + CsvReplayFcAdapter( + csv_path=csv_file, + target_fc_dialect=FcKind.GCS_QGC, + clock=_FakeClock(), + fdr_client=_FakeFdr(), + pace=ReplayPace.ASAP, + ) + + +# ---------------------------------------------------------------------- +# open / close + + +def test_open_refused_when_csv_missing(tmp_path: Path) -> None: + # Arrange + adapter = _make_adapter(tmp_path / "absent.csv") + + # Act + Assert + with pytest.raises(FcOpenError, match="CSV file not found"): + adapter.open() + + +def test_double_open_raises(csv_file: Path) -> None: + # Arrange + adapter = _make_adapter(csv_file) + adapter.open() + + # Act + Assert + with pytest.raises(FcOpenError, match="already opened"): + adapter.open() + + +def test_close_is_idempotent_before_open(csv_file: Path) -> None: + # Arrange + adapter = _make_adapter(csv_file) + + # Act — close() before open() is a documented no-op (parity with tlog). + adapter.close() + adapter.close() + + # Assert — no exception raised; state remains closeable. + assert True + + +def test_close_is_idempotent_after_open(csv_file: Path) -> None: + # Arrange + adapter = _make_adapter(csv_file) + adapter.open() + + # Act + adapter.close() + adapter.close() + + # Assert + assert True + + +# ---------------------------------------------------------------------- +# Protocol parity + + +def test_subscribe_returns_real_subscription_handle(csv_file: Path) -> None: + # Arrange + adapter = _make_adapter(csv_file) + adapter.open() + + # Act + subscription = adapter.subscribe_telemetry(lambda _frame: None) + + # Assert — handle exposes the cancel() entry point even though the + # bus is intentionally never fed (replay loop reads CSV directly). + assert hasattr(subscription, "cancel") + subscription.cancel() + + +def test_source_set_switch_unsupported(csv_file: Path) -> None: + # Arrange + adapter = _make_adapter(csv_file) + + # Act + Assert + with pytest.raises(SourceSetSwitchNotSupportedError): + adapter.request_source_set_switch() + + +def test_current_flight_state_returns_init_signal(csv_file: Path) -> None: + # Arrange — CSV carries no MAVLink HEARTBEAT, so the adapter has + # nothing to latch; the contract is to return an INIT-state signal. + adapter = _make_adapter(csv_file) + + # Act + signal = adapter.current_flight_state() + + # Assert + assert signal.state is FlightState.INIT + assert signal.last_valid_gps_hint_wgs84 is None + assert signal.last_valid_gps_age_ms is None + + +# ---------------------------------------------------------------------- +# Outbound (Invariant 5) + + +def test_emit_external_position_raises_without_transport(csv_file: Path) -> None: + # Arrange — no MavlinkTransport injected → adapter falls back to the + # AZ-399 raise-on-emit contract, mirroring TlogReplayFcAdapter. + adapter = _make_adapter(csv_file) + adapter.open() + + # Act + Assert + with pytest.raises(FcEmitError, match="does not emit"): + adapter.emit_external_position(_dummy_estimator_output()) + + +def test_emit_status_text_raises_without_transport(csv_file: Path) -> None: + # Arrange + adapter = _make_adapter(csv_file) + adapter.open() + + # Act + Assert + with pytest.raises(FcEmitError, match="does not emit"): + adapter.emit_status_text("hello", severity=Severity.INFO) + + +# ---------------------------------------------------------------------- +# Helpers + + +def _dummy_estimator_output() -> object: + # The transport-less emit path short-circuits with FcEmitError before + # reading any field, so a duck-typed stand-in is enough — duplicating + # the full EstimatorOutput (UUID frame_id, 6x6 covariance, etc.) + # would only hide the actual contract being tested. + from types import SimpleNamespace + + return SimpleNamespace() diff --git a/tests/unit/replay_input/test_csv_ground_truth.py b/tests/unit/replay_input/test_csv_ground_truth.py new file mode 100644 index 0000000..ec434ea --- /dev/null +++ b/tests/unit/replay_input/test_csv_ground_truth.py @@ -0,0 +1,287 @@ +"""AZ-894 — CSV-driven IMU + GPS ground-truth extractor. + +Covers AC-1 (parses 4,899 IMU + 4,899 GPS samples on a single monotonic +clock) and AC-5 (clear ``ReplayInputAdapterError`` at startup for schema +faults) of ``_docs/02_tasks/todo/AZ-894_csv_driven_replay_adapter.md``. + +The happy-path test is gated on the committed Derkachi fixture +(``_docs/00_problem/input_data/flight_derkachi/data_imu.csv``, 4,899 +rows + header). Schema-fault tests use synthetic CSV strings written +to ``tmp_path`` so they remain deterministic and do not depend on the +fixture being present. + +Style: every test follows the Arrange / Act / Assert pattern. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from gps_denied_onboard.replay_input.csv_ground_truth import ( + CSV_SOURCE_LABEL, + REQUIRED_COLUMNS, + load_csv_ground_truth, +) +from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError + + +_DERKACHI_CSV: Path = ( + Path(__file__).resolve().parents[3] + / "_docs" + / "00_problem" + / "input_data" + / "flight_derkachi" + / "data_imu.csv" +) + + +_EXAMPLE_CSV: Path = ( + Path(__file__).resolve().parents[3] + / "_docs" + / "02_document" + / "contracts" + / "replay" + / "example_data_imu.csv" +) + + +# --------------------------------------------------------------------- +# Header + minimal-row helpers + + +def _write_csv(path: Path, header: str, rows: list[str]) -> Path: + path.write_text(header + "\n" + "\n".join(rows) + "\n", encoding="utf-8") + return path + + +def _full_header() -> str: + return ",".join(REQUIRED_COLUMNS) + + +def _row(time_s: float, *, prefix_ms: float = 0.0) -> str: + # 15 fields total matching REQUIRED_COLUMNS ordering. Values are + # picked to be valid floats; the exact magnitudes do not matter + # for these tests (the loader only validates parseability + range). + fields = [ + str(prefix_ms), + str(time_s), + "10", + "-3", + "-980", + "50", + "30", + "-5", + "50.0809634", + "36.1115442", + "141290", + "-4", + "-6", + "-88", + "35041", + ] + return ",".join(fields) + + +# --------------------------------------------------------------------- +# AC-1: happy path on the real Derkachi CSV + + +@pytest.mark.skipif( + not _DERKACHI_CSV.is_file(), + reason="Derkachi fixture data_imu.csv not present", +) +def test_ac1_loads_derkachi_csv_emits_paired_samples() -> None: + # Arrange — committed fixture path; nothing to set up. + # Note: AZ-894 spec mentions "4,899 samples"; the actual fixture + # spans Time=0.0..489.9 s in 0.1 s steps → 4,900 rows. We pin the + # concrete count so the test catches truncation, plus the + # span-derived invariant so future fixtures with a different + # length still pass for the right reason. + expected_count = 4900 + + # Act + gt = load_csv_ground_truth(_DERKACHI_CSV) + + # Assert + assert gt.source == CSV_SOURCE_LABEL + assert len(gt.records) == expected_count + assert len(gt.imu_samples) == expected_count + # First row of the fixture has Time=0; last is 489.9 s (10 Hz). + assert gt.records[0].ts_ns == 0 + assert gt.records[-1].ts_ns == int(489.9 * 1e9) + # IMU samples share the same canonical clock as the GPS records. + for gps, imu in zip(gt.records, gt.imu_samples, strict=True): + assert gps.ts_ns == imu.ts_ns + + +# --------------------------------------------------------------------- +# AZ-896 AC-3: the shipped example CSV stays parser-clean + +@pytest.mark.skipif( + not _EXAMPLE_CSV.is_file(), + reason="AZ-896 example_data_imu.csv not present", +) +def test_az896_example_csv_loads_clean() -> None: + # Arrange — committed AZ-896 example; nothing to set up. + + # Act + gt = load_csv_ground_truth(_EXAMPLE_CSV) + + # Assert + assert gt.source == CSV_SOURCE_LABEL + assert len(gt.records) >= 10 + assert len(gt.records) == len(gt.imu_samples) + assert gt.records[0].ts_ns == 0 + + +# --------------------------------------------------------------------- +# AC-1 (small fixture): paired-sample invariants + + +def test_paired_imu_and_gps_share_clock(tmp_path: Path) -> None: + # Arrange + csv = _write_csv( + tmp_path / "ok.csv", + _full_header(), + [ + _row(0.0, prefix_ms=4551116.348), + _row(0.1, prefix_ms=4551216.348), + _row(0.2, prefix_ms=4551316.348), + ], + ) + + # Act + gt = load_csv_ground_truth(csv) + + # Assert + assert len(gt.records) == 3 and len(gt.imu_samples) == 3 + expected_ns = [0, 100_000_000, 200_000_000] + assert [r.ts_ns for r in gt.records] == expected_ns + assert [s.ts_ns for s in gt.imu_samples] == expected_ns + + +def test_gps_unit_conversion(tmp_path: Path) -> None: + # Arrange — values exercise the deg/mm/cm-s/cdeg conversions. + header = _full_header() + row = ",".join([ + "0.0", "0.0", + "10", "-3", "-980", "50", "30", "-5", # IMU stays raw + "50.0809634", # lat already in degrees + "36.1115442", # lon already in degrees + "141290", # alt in mm → 141.290 m + "-400", # vx in cm/s → -4.0 m/s + "600", # vy in cm/s → 6.0 m/s + "-88", # vz in cm/s → -0.88 m/s + "35041", # hdg in cdeg → 350.41 deg + ]) + csv = _write_csv(tmp_path / "units.csv", header, [row]) + + # Act + gt = load_csv_ground_truth(csv) + + # Assert + fix = gt.records[0] + assert fix.lat_deg == pytest.approx(50.0809634) + assert fix.lon_deg == pytest.approx(36.1115442) + assert fix.alt_m == pytest.approx(141.290) + assert fix.vx_m_s == pytest.approx(-4.0) + assert fix.vy_m_s == pytest.approx(6.0) + assert fix.vz_m_s == pytest.approx(-0.88) + assert fix.hdg_deg == pytest.approx(350.41) + + +# --------------------------------------------------------------------- +# AC-5: schema faults raise ReplayInputAdapterError at startup + + +def test_ac5_file_not_found_raises(tmp_path: Path) -> None: + # Arrange + missing = tmp_path / "absent.csv" + + # Act + Assert + with pytest.raises(ReplayInputAdapterError, match="CSV file not found"): + load_csv_ground_truth(missing) + + +def test_ac5_missing_required_column_raises(tmp_path: Path) -> None: + # Arrange — drop one required column from the header. + bad_header = ",".join(c for c in REQUIRED_COLUMNS if c != "SCALED_IMU2.xacc") + csv = _write_csv( + tmp_path / "missing_col.csv", + bad_header, + ["0,0,-3,-980,50,30,-5,50.0,36.0,141290,-4,-6,-88,35041"], + ) + + # Act + Assert + with pytest.raises(ReplayInputAdapterError, match="missing required columns"): + load_csv_ground_truth(csv) + + +def test_ac5_nan_in_time_raises(tmp_path: Path) -> None: + # Arrange + csv = _write_csv( + tmp_path / "nan_time.csv", + _full_header(), + [_row(0.0), _row(float("nan"))], + ) + + # Act + Assert + with pytest.raises(ReplayInputAdapterError, match="Time=.*is NaN/Inf"): + load_csv_ground_truth(csv) + + +def test_ac5_non_monotonic_time_raises(tmp_path: Path) -> None: + # Arrange + csv = _write_csv( + tmp_path / "non_monotonic.csv", + _full_header(), + [_row(0.1), _row(0.0)], + ) + + # Act + Assert + with pytest.raises(ReplayInputAdapterError, match="non-monotonic Time"): + load_csv_ground_truth(csv) + + +def test_ac5_repeated_time_also_non_monotonic(tmp_path: Path) -> None: + # Arrange — equal timestamps still violate strict monotonicity so + # the preintegrator never gets fed a zero-delta window. + csv = _write_csv( + tmp_path / "repeated.csv", + _full_header(), + [_row(0.0), _row(0.0)], + ) + + # Act + Assert + with pytest.raises(ReplayInputAdapterError, match="non-monotonic Time"): + load_csv_ground_truth(csv) + + +def test_ac5_non_numeric_imu_value_raises(tmp_path: Path) -> None: + # Arrange — substitute a non-parseable token in the IMU column. + row = ",".join([ + "0.0", "0.0", + "not-a-number", # SCALED_IMU2.xacc + "-3", "-980", "50", "30", "-5", + "50.0", "36.0", "141290", "-4", "-6", "-88", "35041", + ]) + csv = _write_csv(tmp_path / "bad_imu.csv", _full_header(), [row]) + + # Act + Assert + with pytest.raises( + ReplayInputAdapterError, + match=r"SCALED_IMU2\.xacc=.*is not a number", + ): + load_csv_ground_truth(csv) + + +def test_ac5_header_only_raises(tmp_path: Path) -> None: + # Arrange — header but no data rows. + csv = tmp_path / "header_only.csv" + csv.write_text(_full_header() + "\n", encoding="utf-8") + + # Act + Assert + with pytest.raises(ReplayInputAdapterError, match="no data rows"): + load_csv_ground_truth(csv) diff --git a/tests/unit/test_az401_compose_root_replay.py b/tests/unit/test_az401_compose_root_replay.py index 0e9f8d6..ce83336 100644 --- a/tests/unit/test_az401_compose_root_replay.py +++ b/tests/unit/test_az401_compose_root_replay.py @@ -491,12 +491,14 @@ def test_ac8_replay_branch_imports_only_public_apis() -> None: tree = ast.parse(text) # Allowed deep imports: into the c8_fc_adapter component (the - # noop transport + the JSONL sink) and into the `replay_input` - # cross-cutting coordinator (Layer-4). Both are documented in - # module-layout.md as the replay strategy homes. + # noop transport + the JSONL sink + the AZ-894 CSV replay adapter) + # and into the `replay_input` cross-cutting coordinator (Layer-4). + # All of these are documented in module-layout.md as the replay + # strategy homes. allowed_deep_prefixes = ( "gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport", "gps_denied_onboard.components.c8_fc_adapter.replay_sink", + "gps_denied_onboard.components.c8_fc_adapter.csv_replay_adapter", "gps_denied_onboard.replay_input.tlog_video_adapter", ) @@ -632,9 +634,14 @@ def test_replay_branch_rejects_empty_video_path( build_replay_components(config) -def test_replay_branch_rejects_empty_tlog_path( +def test_replay_branch_rejects_both_inputs_empty( _airborne_replay_env: Path, ) -> None: + # AZ-894: the validation gate now accepts either imu_csv_path + # (canonical) or tlog_path (legacy) — rejecting only when both + # are empty. Keeping the historical name pattern (test_*_rejects_*) + # for grep parity but renamed to reflect the new semantics. + # Arrange runtime_cfg = RuntimeConfig(camera_calibration_path=str(_airborne_replay_env)) config = Config( @@ -642,6 +649,7 @@ def test_replay_branch_rejects_empty_tlog_path( replay=ReplayConfig( video_path="/dev/null/fake.mp4", tlog_path="", + imu_csv_path="", output_path="/tmp/out.jsonl", pace="asap", target_fc_dialect="ardupilot_plane", @@ -650,7 +658,7 @@ def test_replay_branch_rejects_empty_tlog_path( ) # Act / Assert - with pytest.raises(CompositionError, match="tlog_path is empty"): + with pytest.raises(CompositionError, match="imu_csv_path is empty"): build_replay_components(config) diff --git a/tests/unit/test_az402_replay_cli.py b/tests/unit/test_az402_replay_cli.py index c3a4b3a..612a5ca 100644 --- a/tests/unit/test_az402_replay_cli.py +++ b/tests/unit/test_az402_replay_cli.py @@ -57,6 +57,10 @@ def _required_files(tmp_path: Path, _calib_payload: dict[str, Any]) -> dict[str, video.write_bytes(b"\x00\x00\x00\x18ftypmp42") # placeholder tlog = tmp_path / "flight.tlog" tlog.write_bytes(b"\x00") + imu_csv = tmp_path / "data_imu.csv" + # Minimal placeholder — the CLI only validates existence, parsing + # happens later inside the runtime loop. + imu_csv.write_text("placeholder", encoding="utf-8") output = tmp_path / "out.jsonl" calib = tmp_path / "calib.json" calib.write_text(json.dumps(_calib_payload)) @@ -67,6 +71,7 @@ def _required_files(tmp_path: Path, _calib_payload: dict[str, Any]) -> dict[str, return { "video": video, "tlog": tlog, + "imu": imu_csv, "output": output, "camera_calibration": calib, "config": config_yaml, @@ -95,6 +100,7 @@ def _argv(files: dict[str, Path], **overrides: Any) -> list[str]: """Build a CLI argv from the required-files fixture + overrides.""" base = { "--video": str(files["video"]), + "--imu": str(files["imu"]), "--tlog": str(files["tlog"]), "--output": str(files["output"]), "--camera-calibration": str(files["camera_calibration"]), @@ -477,6 +483,7 @@ def test_ac10_console_script_runs_help() -> None: # Required-arg surface check for arg in ( "--video", + "--imu", "--tlog", "--output", "--camera-calibration",