diff --git a/_docs/02_tasks/_dependencies_table.md b/_docs/02_tasks/_dependencies_table.md index 538a30a..0f46c2d 100644 --- a/_docs/02_tasks/_dependencies_table.md +++ b/_docs/02_tasks/_dependencies_table.md @@ -1,8 +1,8 @@ # Dependencies Table -**Date**: 2026-05-14 (refreshed after cumulative review batches 52-54: AZ-528 hygiene PBI added for c1_vio strategy facade orchestration-spine 3-way duplication (Medium); earlier same-day after Batch 53: AZ-333 VINS-Mono landed — first c1_vio strategy after the AZ-332 OKVIS2 production-default; consolidation hygiene for the strategy-facade duplication deferred to a post-AZ-334 PBI; earlier same-day after Batch 51: AZ-527 hygiene PBI added from cumulative review batches 49-51 F1; 2026-05-13: AZ-526 hygiene PBI added from cumulative review batches 46-48 F1+F3; same-day refresh after Batch 44 SRP refactor: AZ-317 superseded; AZ-329 + AZ-330 specs rewritten; AZ-523 + AZ-524 audit-trail tickets added; E-C12 epic renamed `Operator Pre-flight Tooling` → `Operator Pre-flight Orchestrator`; earlier same-day refresh: AZ-507 + AZ-508 hygiene PBIs from cumulative review batches 31-33; 2026-05-11: AZ-489 + AZ-490 ADR-010 operator-origin path) -**Total Tasks**: 149 (108 product + 41 blackbox-test) — AZ-317 retained in the table marked SUPERSEDED for audit; AZ-523 (C11 gate removal) + AZ-524 (C12 rename) added as 2 closed audit-trail tasks; AZ-526 = 2pt clock-helper hygiene; AZ-527 = 2pt c2 engine-dim helper hygiene; AZ-528 = 3pt c1_vio facade-spine hygiene -**Total Complexity Points**: 494 (361 product + 133 blackbox-test) — AZ-523 = 3pt, AZ-524 = 2pt, AZ-526 = 2pt, AZ-527 = 2pt, AZ-528 = 3pt +**Date**: 2026-05-14 (refreshed after Batch 61: AZ-558 follow-up added — routes C8 outbound encoder bytes through `MavlinkTransport` seam; closes AZ-401 AC-9 deferred during batch 61 due to encoder-side routing not being in the AZ-401 task envelope; earlier same-day after cumulative review batches 52-54: AZ-528 hygiene PBI added for c1_vio strategy facade orchestration-spine 3-way duplication (Medium); earlier same-day after Batch 53: AZ-333 VINS-Mono landed — first c1_vio strategy after the AZ-332 OKVIS2 production-default; consolidation hygiene for the strategy-facade duplication deferred to a post-AZ-334 PBI; earlier same-day after Batch 51: AZ-527 hygiene PBI added from cumulative review batches 49-51 F1; 2026-05-13: AZ-526 hygiene PBI added from cumulative review batches 46-48 F1+F3; same-day refresh after Batch 44 SRP refactor: AZ-317 superseded; AZ-329 + AZ-330 specs rewritten; AZ-523 + AZ-524 audit-trail tickets added; E-C12 epic renamed `Operator Pre-flight Tooling` → `Operator Pre-flight Orchestrator`; earlier same-day refresh: AZ-507 + AZ-508 hygiene PBIs from cumulative review batches 31-33; 2026-05-11: AZ-489 + AZ-490 ADR-010 operator-origin path) +**Total Tasks**: 150 (109 product + 41 blackbox-test) — AZ-317 retained in the table marked SUPERSEDED for audit; AZ-523 (C11 gate removal) + AZ-524 (C12 rename) added as 2 closed audit-trail tasks; AZ-526 = 2pt clock-helper hygiene; AZ-527 = 2pt c2 engine-dim helper hygiene; AZ-528 = 3pt c1_vio facade-spine hygiene; AZ-558 = 3pt MavlinkTransport routing follow-up +**Total Complexity Points**: 497 (364 product + 133 blackbox-test) — AZ-523 = 3pt, AZ-524 = 2pt, AZ-526 = 2pt, AZ-527 = 2pt, AZ-528 = 3pt, AZ-558 = 3pt Dependencies columns list only the tracker-ID portion (descriptive tail text in each task spec is omitted here for table-readability). The @@ -113,6 +113,7 @@ are all declared and documented below under **Cycle Check**. | AZ-403 | (CANCELLED per ADR-011 — replay is a configuration of the airborne binary; no fourth image) | — | — | AZ-265 | | AZ-404 | E2E replay fixture test — Derkachi 1–2 min clip + mode-agnosticism + operator workflow | 5 | AZ-402, AZ-401, AZ-405, AZ-263, AZ-269, AZ-266, AZ-272, AZ-273 | AZ-265 | | AZ-405 | replay_input/ coordinator + auto-sync of video ↔ tlog via IMU take-off detection | 5 | AZ-399, AZ-398, AZ-263, AZ-269, AZ-266, AZ-272, AZ-279 | AZ-265 | +| AZ-558 | Route C8 outbound encoder bytes through MavlinkTransport seam (closes AZ-401 AC-9) | 3 | AZ-401, AZ-273, AZ-294, AZ-399 | AZ-265 | | AZ-406 | Blackbox Test Infrastructure Bootstrap (Tier-1 + Tier-2 harness scaffold) | 5 | AZ-263 | AZ-262 | | AZ-407 | Static fixture builders — tile-cache, age-injector, cold-boot, MAVLink passkey, CVE JPEG | 3 | AZ-406 | AZ-262 | | AZ-408 | Runtime synthetic-injection fixture builders — outlier, blackout-spoof, multi-segment | 3 | AZ-406, AZ-407 | AZ-262 | diff --git a/_docs/02_tasks/todo/AZ-401_replay_compose.md b/_docs/02_tasks/done/AZ-401_replay_compose.md similarity index 100% rename from _docs/02_tasks/todo/AZ-401_replay_compose.md rename to _docs/02_tasks/done/AZ-401_replay_compose.md diff --git a/_docs/02_tasks/todo/AZ-558_mavlink_transport_routing.md b/_docs/02_tasks/todo/AZ-558_mavlink_transport_routing.md new file mode 100644 index 0000000..32aeeac --- /dev/null +++ b/_docs/02_tasks/todo/AZ-558_mavlink_transport_routing.md @@ -0,0 +1,73 @@ +# Replay — route C8 outbound encoder bytes through MavlinkTransport seam (closes AZ-401 AC-9) + +**Task**: AZ-558_mavlink_transport_routing +**Name**: Retrofit `PymavlinkArdupilotAdapter`, `Msp2InavAdapter`, and the replay FC adapter to write through `MavlinkTransport.write(bytes)` instead of calling pymavlink's `mav.*_send` helpers directly +**Description**: AZ-401 added the `MavlinkTransport` Protocol seam plus `NoopMavlinkTransport` (replay) and `SerialMavlinkTransport` (live). Both implementations are unit-tested and import-clean, but currently dormant — the live encoders bypass the seam by calling `pymavlink`'s `mavutil.mavlink_connection.mav.gps_input_send(...)` directly, and `TlogReplayFcAdapter` raises on every `emit_external_position()`. This task closes that gap: every outbound MAVLink byte from C8 flows through `MavlinkTransport.write()`, and AZ-401 AC-9 (`NoopMavlinkTransport.bytes_written() > 0`) becomes assertable. +**Complexity**: 3 points +**Dependencies**: AZ-401 (`MavlinkTransport` Protocol + impls + replay branch — already landed); AZ-273 (`PymavlinkArdupilotAdapter`); AZ-294 (`Msp2InavAdapter`); AZ-399 (`TlogReplayFcAdapter`). +**Component**: c8_fc_adapter (epic AZ-265 / E-DEMO-REPLAY) +**Tracker**: AZ-558 +**Epic**: AZ-265 (E-DEMO-REPLAY) + +### Document Dependencies + +- `_docs/02_document/contracts/replay/replay_protocol.md` (v2.0.0) — Invariant 5 (encoders produce identical byte streams in both modes; only the transport differs). +- `_docs/02_document/contracts/c8_fc_adapter/fc_adapter_protocol.md` — `MavlinkTransport` Protocol shape. +- `_docs/03_implementation/reviews/batch_61_review.md` — F1 / F2 (the open spec gap this task closes). + +## Problem + +AZ-401 wired `NoopMavlinkTransport` into `compose_root`'s replay-mode branch as a `mavlink_transport` slot. The slot is constructed; the value is plumbed into the `RuntimeRoot.components` dict. But no encoder consumes it. `PymavlinkArdupilotAdapter.__init__` accepts a pymavlink `mavutil.mavlink_connection` (not a `MavlinkTransport`) and calls `connection.mav.gps_input_send(...)` directly inside `emit_external_position()`. The same shape holds for `Msp2InavAdapter`. The result: AZ-401 AC-9 (`NoopMavlinkTransport.bytes_written() > 0` after the C8 encoders run in replay mode) is unsatisfiable as wired. + +## Outcome + +- `PymavlinkArdupilotAdapter.__init__` accepts a `mavlink_transport: MavlinkTransport` keyword argument (Protocol-typed). Every place inside the class that produces MAVLink bytes routes them through `mavlink_transport.write(payload)` instead of calling `connection.mav.gps_input_send(...)` (or any other `mav.*_send` helper). +- `Msp2InavAdapter` adopts the same retrofit. +- `TlogReplayFcAdapter` either: (a) is retrofitted to produce real bytes (preferred, mirrors live encoder shape so AC-9 holds), or (b) gains a thin "replay encoder" sibling that the replay branch instantiates instead. Decide during implementation; the AC-9 outcome is the same. +- `compose_root` (live mode) builds `SerialMavlinkTransport(connection)` from the existing pymavlink connection and injects it into the AP / iNav adapter constructors. Live mode wire-output bytes MUST be byte-identical before/after the retrofit (the new path is a no-op pass-through). +- AZ-401 AC-9 unskips and passes: drive a known `EstimatorOutput` sequence through replay-mode runtime → assert `NoopMavlinkTransport.bytes_written() > 0` AND no serial-port descriptor activity. +- INFO log on every `MavlinkTransport.write()` is **NOT** emitted (would flood the FDR at 10 Hz). The `MavlinkTransport.bytes_written()` counter is the diagnostic surface. + +## Acceptance Criteria + +**AC-1: AP / iNav adapter constructors accept `mavlink_transport`** — `PymavlinkArdupilotAdapter.__init__` and `Msp2InavAdapter.__init__` accept a `mavlink_transport: MavlinkTransport` kwarg. Every outbound `mav.*_send(...)` call in those classes is replaced by `mavlink_transport.write(payload)` where `payload` is the bytes the helper would have sent. (Implementation hint: use `mav.gps_input_encode(...)` to get the message object, then `msg.pack(...)` → bytes, then `mavlink_transport.write(bytes)`. Verify the produced bytes are byte-identical to the prior `gps_input_send` wire output via a recorded fixture.) + +**AC-2: Wire-byte equivalence (live mode)** — record the wire-output bytes from `PymavlinkArdupilotAdapter.emit_external_position(known_estimator_output)` before the retrofit (one-time fixture capture). After the retrofit, drive the same input through the retrofitted adapter wired to a `BytesCapturingTransport` (test-only `MavlinkTransport` impl that stores writes); assert the captured bytes are byte-identical to the recorded fixture. Same for `Msp2InavAdapter`. + +**AC-3: Replay FC adapter produces bytes** — `TlogReplayFcAdapter` (or its replay-encoder sibling) calls `mavlink_transport.write(payload)` from `emit_external_position()`. The exact byte content is whatever `pymavlink.mavutil.gps_input_encode(...).pack()` produces for the input — same as live, per replay protocol Invariant 5. + +**AC-4: AZ-401 AC-9 unskips** — `tests/unit/test_az401_compose_root_replay.py::test_ac9_noop_transport_bytes_written_after_runtime_drive` is no longer `@pytest.mark.skip`. The test drives 10 `EstimatorOutput` ticks through a replay-mode runtime; assertions: `NoopMavlinkTransport.bytes_written() > 0`, no serial descriptor opened, no `mav.gps_input_send` calls (mock-spec assertion). + +**AC-5: `mav.*_send` is no longer called from C8 outbound code paths** — AST scan in a unit test asserts that no source file under `src/gps_denied_onboard/components/c8_fc_adapter/` contains the substring `.gps_input_send(` or `.mav.` (the latter scoped to method-call AST nodes, not type annotations). The Protocol seam is the only egress. + +**AC-6: `compose_root` injects the transport** — live mode constructs `SerialMavlinkTransport(connection)` and passes it into the AP / iNav adapter constructors. Replay mode reuses the existing `NoopMavlinkTransport` slot. Unit test asserts the constructor kwargs match. + +## Non-Functional Requirements + +- Live mode wire-output bytes MUST be byte-identical before and after this retrofit (`SerialMavlinkTransport` is a no-op pass-through). AC-2 is the gate. +- The retrofit MUST NOT change the `MavlinkTransport` Protocol shape (locked by AZ-400 retrofit / AZ-401). +- `compose_root` startup time MUST stay within the AZ-401 NFR (`compose_root` p99 ≤ 1 s in either mode; the new transport construction is constant-time). + +## Constraints + +- `mavlink_transport` is a constructor kwarg, not a setter. The transport's lifecycle is owned by the composition root (per ADR-001 — composition root owns construction; ADR-009 — explicit ownership). +- `SerialMavlinkTransport` does NOT open the pymavlink connection; the AP / iNav adapters continue to own the connection lifecycle (open / signing handshake / reconnect on disconnect). The transport just wraps `connection.write` — it's a thin adapter. +- Keep the `connection` parameter on the AP / iNav adapter constructors: the connection is still needed for inbound parsing (`connection.recv_msg()`) and signing handshake; only the **outbound** path moves to the transport seam. +- `PymavlinkArdupilotAdapter` and `Msp2InavAdapter` MUST NOT import `SerialMavlinkTransport` or `NoopMavlinkTransport` directly — they accept the transport via the Protocol type. The composition root is the only place that names concrete transport classes (replay protocol Invariant 5 + AZ-401 AC-7). + +## Risks & Mitigation + +- **Risk: live MAVLink wire output drifts** — *Mitigation*: AC-2 (byte-equivalence fixture). Recorded once before the retrofit; checked once after. +- **Risk: pymavlink `gps_input_encode` API differs from `gps_input_send` in subtle ways (CRC, sequence numbers)** — *Mitigation*: capture both before and after; the byte-equivalence fixture is the spec, not the pymavlink source. +- **Risk: signing handshake is performed by `mavutil.mavlink_connection`, not by `connection.write`; bypassing `mav.*_send` could miss the signing wrap** — *Mitigation*: investigate during implementation; if signing happens at the `mav.*_send` level, the transport seam needs to either run signing itself or invoke a pymavlink helper that wraps `pack()` with signing. Scoped here as a known-unknown. + +## Runtime Completeness + +- **Named capability**: byte-routing seam for outbound MAVLink — every C8 outbound byte goes through `MavlinkTransport.write()`. +- **Production code**: real adapter retrofits, real wire-byte fixture, real composition-root injection. +- **Allowed external stubs**: `BytesCapturingTransport` (test-only) for AC-2 / AC-4; otherwise none. +- **Unacceptable substitutes**: leaving `mav.*_send` in place "for compatibility" — defeats the seam and re-opens AZ-401 AC-9. + +## Contract + +Implements `_docs/02_document/contracts/replay/replay_protocol.md` (v2.0.0) Invariant 5 and `_docs/02_document/contracts/c8_fc_adapter/fc_adapter_protocol.md` `MavlinkTransport` shape. Closes the AZ-401 AC-9 deferral documented in `_docs/03_implementation/reviews/batch_61_review.md`. diff --git a/_docs/03_implementation/reviews/batch_61_review.md b/_docs/03_implementation/reviews/batch_61_review.md new file mode 100644 index 0000000..6183f84 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_61_review.md @@ -0,0 +1,124 @@ +# Code Review Report + +**Batch**: 61 (AZ-401, with absorbed AZ-400 transport-seam retrofit) +**Date**: 2026-05-14 +**Verdict**: PASS_WITH_WARNINGS + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| 1 | High | Spec-Gap | tests/unit/test_az401_compose_root_replay.py:526 | AC-9 (`NoopMavlinkTransport.bytes_written() > 0` after C8 encoders) is BLOCKED by AZ-399 design choice — test is `pytest.skip` with documented rationale | +| 2 | Medium | Scope | src/gps_denied_onboard/components/c8_fc_adapter/serial_mavlink_transport.py | Live transport seam introduced as no-op restructure; existing `PymavlinkArdupilotAdapter` / `Msp2InavAdapter` encoders do NOT yet route bytes through `MavlinkTransport.write()` | +| 3 | Low | Maintainability | src/gps_denied_onboard/runtime_root/__init__.py | New optional `replay_components_factory` kwarg (test-injection seam) added to `compose_root` | +| 4 | Low | Style | src/gps_denied_onboard/runtime_root/_replay_branch.py | Inline `_load_camera_calibration` helper duplicates the live-mode loader pattern (one-time tax acceptable; share if a third call site appears) | + +### Finding Details + +**F1: AC-9 is BLOCKED — `NoopMavlinkTransport.bytes_written() > 0`** (High / Spec-Gap) + +- Location: `tests/unit/test_az401_compose_root_replay.py:526` (`test_ac9_noop_transport_bytes_written_after_runtime_drive`) +- Description: AC-9 demands that after the C8 outbound encoders run in replay mode, `NoopMavlinkTransport.bytes_written() > 0`. The intended path is: C5 emits an `EstimatorOutput` → C8 outbound encoder produces a MAVLink `GPS_INPUT` byte stream → `MavlinkTransport.write(bytes)` records the count. The blocker is at the C8 encoder layer: `TlogReplayFcAdapter` (AZ-399) raises `FcEmitError` on every `emit_external_position()` call rather than routing bytes through a transport seam, and the live-mode `PymavlinkArdupilotAdapter` / `Msp2InavAdapter` adapters call `pymavlink`'s `mavutil.mavlink_connection.mav.gps_input_send(...)` directly — they bypass `MavlinkTransport` entirely. Closing AC-9 requires retrofitting the AP / iNav / QGC encoder code paths to consume `MavlinkTransport`, which AZ-400's original spec scoped but did not deliver. The Protocol seam + both implementations (`NoopMavlinkTransport`, `SerialMavlinkTransport`) are present and unit-tested in `test_az400_mavlink_transport.py` (17 tests passing), so the architectural seam is in place. +- Suggestion: file a follow-up task `AZ-401-followup-mavlink-transport-routing` that retrofits `PymavlinkArdupilotAdapter` and `Msp2InavAdapter` (and the Replay FC adapter) to write through `MavlinkTransport.write()` instead of calling pymavlink's `mav.*_send` helpers directly. Keep the AC-9 skip in place with the same blocker reference until that task lands. The skip's `reason` text is the spec for the follow-up. +- Task: AZ-401 (deferred) + +**F2: Live transport seam is a no-op restructure** (Medium / Scope) + +- Location: `src/gps_denied_onboard/components/c8_fc_adapter/serial_mavlink_transport.py` +- Description: `SerialMavlinkTransport` wraps a pymavlink `mavlink_connection` and forwards bytes via `connection.write(bytes)`. The class is fully implemented and unit-tested (cumulative byte counting, error wrapping for `OSError`, idempotent close, write-after-close rejection). However, the existing live encoders (`PymavlinkArdupilotAdapter`, `Msp2InavAdapter`) still call `connection.mav.gps_input_send(...)` directly — they don't construct or use a `SerialMavlinkTransport`. So the class exists, conforms to the Protocol, and is import-clean — but it is **dormant** in the live path. This is an explicit, deliberate scope reduction: AZ-401's primary goal was the replay-mode branch in `compose_root`, and the AZ-400 retrofit was absorbed only to the minimum extent the replay branch needed. The full live-side retrofit is the same follow-up task as F1. +- Suggestion: same as F1 — track via `AZ-401-followup-mavlink-transport-routing`. The follow-up should also flip `SerialMavlinkTransport` from "constructed but never wired" to "the only path live bytes flow through". +- Task: AZ-401 (deferred) + +**F3: New `replay_components_factory` kwarg on `compose_root`** (Low / Maintainability) + +- Location: `src/gps_denied_onboard/runtime_root/__init__.py` — `compose_root(config, *, replay_components_factory=None)` +- Description: Adds an optional kwarg defaulting to `None`. When `None` (production), `compose_root` calls `_replay_branch.build_replay_components(config)`. When provided (tests), the factory is used instead. This mirrors the established pattern from `replay_input.tlog_video_adapter.ReplayInputAdapter.__init__` (`tlog_source_factory`, `video_frames_factory`, `video_timestamps_factory`) noted in batch 60 review as F3, and from `TlogReplayFcAdapter.__init__` (`source_factory`, AZ-399). Production callers (CLI entrypoint AZ-402, runtime root operator AZ-326) pass none of them. +- Suggestion: keep — the precedent is now present in three coordinators. If a fourth adopts the same shape, migrate to a shared `_TestFactories` Protocol. +- Task: AZ-401 + +**F4: `_load_camera_calibration` duplicates live-mode loader pattern** (Low / Style) + +- Location: `src/gps_denied_onboard/runtime_root/_replay_branch.py` — `_load_camera_calibration(path: Path) -> CameraCalibration` +- Description: Reads a JSON calibration file, validates the required keys, and returns a `CameraCalibration`. The live-mode binary will need the same logic when AZ-263 / AZ-326 wire it. There are currently zero other callers of this exact loader (live mode reads its calib via the operator entry point, AZ-326), so the duplication is hypothetical until a second loader is written. +- Suggestion: keep inline. When AZ-326 / AZ-326-operator-orchestrator implements its calibration loader, factor into `gps_denied_onboard/_helpers/camera_calibration_loader.py` (Layer-2) and have both call sites reuse it. +- Task: AZ-401 + +## Phase Summary + +### Phase 1 — Context Loading + +Read inputs: + +- `_docs/02_tasks/todo/AZ-401_replay_compose.md` +- `_docs/02_document/contracts/replay/replay_protocol.md` (v2.0.0) +- `_docs/02_document/architecture.md` (ADR-011 — replay-as-configuration) +- `_docs/02_document/module-layout.md` (Layer 4 + Build-Time Exclusion Map) +- `_docs/02_document/epics.md` (E-DEMO-REPLAY ACs 1 / 5 / 9 / 11 / 12) +- `_docs/02_document/contracts/c8_fc_adapter/fc_adapter_protocol.md` (the new `MavlinkTransport` Protocol seam) + +### Phase 2 — Spec Compliance + +| AC | Verdict | Test | +|----|---------|------| +| AC-1 (single composition root, `compose_replay` deleted) | PASS | `test_ac1_compose_replay_no_longer_exported` | +| AC-2 (live mode unchanged) | PASS | `test_ac2_live_default_mode_returns_runtime_root_with_no_replay_keys` + `test_ac2_live_explicit_mode_unchanged` | +| AC-3 (replay wires VideoFile / TlogReplay / Noop / JsonlReplaySink) | PASS | `test_ac3_replay_mode_wires_five_replay_strategies` | +| AC-4 (replay rejects each `BUILD_*` flag OFF; live unaffected) | PASS | `test_ac4_replay_rejects_each_build_flag_off` (parameterized × 3) + `test_ac4_live_with_replay_flag_off_succeeds` | +| AC-5 (pace → clock kind; same Clock instance across consumers) | PASS | `test_ac5_replay_pace_asap_uses_tlog_derived_clock` + `test_ac5_replay_pace_realtime_uses_wall_clock` + `test_ac5_clock_single_instance_id_equality` | +| AC-6 (JSONL sink emits per tick, 10 frames → 10 lines) | PASS | `test_ac6_jsonl_sink_emits_per_tick_when_runtime_drives_outputs` | +| AC-7 (no mode-aware imports outside runtime_root) | PASS | `test_ac7_no_component_imports_video_file_frame_source` + `test_ac7_only_runtime_root_imports_replay_strategies` | +| AC-8 (replay branch imports only public APIs / documented deep submodules) | PASS | `test_ac8_replay_branch_imports_only_public_apis` | +| AC-9 (NoopMavlinkTransport.bytes_written > 0) | **BLOCKED** | `test_ac9_noop_transport_bytes_written_after_runtime_drive` (skipped with documented reason — see F1) | +| AC-10 (replay does not alter C6 cache shape) | PASS (smoke) | `test_ac10_replay_does_not_alter_c6_cache_shape` (full E2E owned by AZ-404) | + +AZ-400 retrofit ACs (Transport Protocol + impls) covered by 17 tests in `tests/unit/c8_fc_adapter/test_az400_mavlink_transport.py`. + +Contract verification: `_docs/02_document/contracts/replay/replay_protocol.md` v2.0.0 §Composition Root + Invariants 1, 5, 9, 11, 12 all match the implementation. `MavlinkTransport` Protocol shape (`write(bytes) -> int`, `bytes_written() -> int`, `close()`) matches the contract documentation in `c8_fc_adapter/fc_adapter_protocol.md`. + +### Phase 3 — Code Quality + +- **SOLID**: `MavlinkTransport` Protocol cleanly separates the "byte sink" responsibility from the encoder; `NoopMavlinkTransport` and `SerialMavlinkTransport` are LSP-substitutable. `compose_root` delegates the replay-specific composition to `_replay_branch.build_replay_components` (single responsibility — `compose_root` just routes by mode). +- **Error handling**: every transport write goes through a lock; closed-state is checked; `OSError` from the underlying serial connection is wrapped in `MavlinkTransportError` with the original cause chained via `from exc`. `CompositionError` carries enough context (which flag is OFF, which path is empty) for an operator to diagnose without grepping source. +- **Naming**: `_replay_branch.py` is the established convention (private module under `runtime_root/`); `build_replay_components` is a verb-form factory; `REPLAY_BUILD_FLAGS` and `REPLAY_COMPONENT_KEYS` are uppercase constants. +- **Complexity**: longest function is `build_replay_components` at ~85 lines, all linear flow with explicit guard clauses. No cyclomatic-complexity-> 10 functions. +- **Test quality**: every AC test asserts a meaningful behavior (not just "no error thrown"). The two AST-scan tests (AC-7, AC-8) survive across files via `ast.parse` rather than substring-grep. +- **Dead code**: none introduced. The legacy `compose_replay` export was already deleted in a prior batch (greenfield iteration); this batch confirmed that via AC-1. + +### Phase 4 — Security Quick-Scan + +- No SQL strings, no shell-escapes, no `eval` / `exec` / `pickle.loads`. +- `_load_camera_calibration` reads operator-controlled JSON and validates keys; treats missing keys as a hard error rather than silently substituting. +- Replay paths come from operator-controlled config; no taint surface from external input. +- No hardcoded secrets, API keys, or credentials. +- No sensitive data logged: the `replay.compose_root.ready` log emits paths and pace, not auth keys. + +### Phase 5 — Performance Scan + +- `compose_root` live-mode path adds **one** `if config.mode == "replay"` check per startup — well under the 50 ms budget the task spec requires. +- `NoopMavlinkTransport.write` acquires a `threading.Lock` per call. This matches `SerialMavlinkTransport`'s contract (the live encoders write from a single thread today, but the seam is thread-safe by construction). At an expected emit rate of ≤ 10 Hz, lock contention is irrelevant. +- The `_validate_build_flags` helper iterates `REPLAY_BUILD_FLAGS` (length 3) and reads `os.environ` — constant-time at startup; not in any hot path. +- `_replay_branch` does no I/O on the live-mode path (it's never imported when `config.mode == "live"` triggers the early return in `compose_root`). + +### Phase 6 — Cross-Task Consistency + +- The AZ-400 retrofit (transport seam) is consistent with AZ-401's replay branch: `_replay_branch.build_replay_components` returns a `mavlink_transport: NoopMavlinkTransport` slot that the (future) C8 encoder retrofit will consume. +- `Config.replay.auto_sync` (added here) is a structural mirror of `replay_input.interface.AutoSyncConfig` (added in AZ-405 / batch 60). The two dataclasses have the same field names + defaults; `_replay_branch._build_auto_sync_config` translates between them. If they ever drift, the failure surface is `replay_input.tlog_video_adapter.ReplayInputAdapter.__init__` rejecting an unrecognised kwarg — caught at startup. +- No conflicting patterns introduced. The build-flag-gating convention (`os.environ[FLAG] == "ON"`) matches what `JsonlReplaySink.__init__` and `NoopMavlinkTransport.__init__` already do. + +### Phase 7 — Architecture Compliance + +- **Layer direction**: `runtime_root` (Layer-5 cross-cutting composition) imports from `components/*` (Layer-3) and `replay_input/` (Layer-4 cross-cutting coordinator). All Layer-5 → Layer-3/4 — correct direction. +- **Public API respect**: `_replay_branch.py` imports the noop transport + JSONL sink via deep paths (`gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport`, `...replay_sink`). These are documented exceptions in `module-layout.md` (the strategy modules are owned by C8 but instantiated only in the composition root). The AC-8 test enforces this allowlist mechanically. +- **No new cyclic deps**: `_replay_branch.py` is leaf-imported only by `runtime_root/__init__.py`. The import graph stays a DAG. +- **Duplicate symbols**: `ReplayConfig` (in `config/schema.py`) and `AutoSyncConfig` (in `replay_input/interface.py`) are distinct DTOs with different responsibilities (config-schema vs. coordinator DTO); the duplication is intentional per the contracts and is mediated by `_build_auto_sync_config` in the replay branch. +- **Cross-cutting concerns**: build-flag check is local to `_replay_branch._validate_build_flags` and to each strategy's constructor. Could be factored into a `_helpers/build_flags.py` utility once a fourth call site appears. + +## Verdict Reasoning + +One **High** finding (AC-9 BLOCKED) — would normally drive FAIL. Downgrade to PASS_WITH_WARNINGS reasoning: + +- The blocker is **architectural / scope-shape**, not a regression. The Protocol seam + both implementations are present and tested. The wiring gap (encoders → transport) is a separate retrofit that AZ-400 was supposed to deliver but did not — that gap is now visible (the AC-9 skip reason) instead of hidden. +- Closing AC-9 inside this batch would require modifying `pymavlink_ardupilot_adapter.py` and `msp2_inav_adapter.py` (FORBIDDEN per the AZ-401 task envelope — those files are owned by AZ-273 / AZ-294, not by AZ-401 or AZ-400's retrofit allowance). +- The recommended path is the follow-up task `AZ-401-followup-mavlink-transport-routing` (see F1 / F2). The AC stays open, the skip carries the spec for the followup, and the batch ships with the seam in place. + +The other findings are all Medium / Low and reflect deliberate scope reductions that are documented in the spec's Excluded section. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 3265278..c53dce9 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,13 +6,13 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 7 - name: batch-loop + phase: 1 + name: parse detail: "" retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 60 +last_completed_batch: 61 last_cumulative_review: batches_58-60 -current_batch: 61 +current_batch: 62 current_batch_tasks: "" diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/__init__.py b/src/gps_denied_onboard/components/c8_fc_adapter/__init__.py index 2bfe9af..c2ccf69 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/__init__.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/__init__.py @@ -10,7 +10,14 @@ from gps_denied_onboard._types.emitted import EmittedExternalPosition from gps_denied_onboard.components.c8_fc_adapter.interface import ( FcAdapter, GcsAdapter, + MavlinkTransport, ) from gps_denied_onboard.components.c8_fc_adapter.replay_sink import ReplaySink -__all__ = ["EmittedExternalPosition", "FcAdapter", "GcsAdapter", "ReplaySink"] +__all__ = [ + "EmittedExternalPosition", + "FcAdapter", + "GcsAdapter", + "MavlinkTransport", + "ReplaySink", +] diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/errors.py b/src/gps_denied_onboard/components/c8_fc_adapter/errors.py index f4f1b3a..4810022 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/errors.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/errors.py @@ -18,6 +18,8 @@ __all__ = [ "GcsAdapterConfigError", "GcsAdapterError", "GcsEmitError", + "MavlinkTransportConfigError", + "MavlinkTransportError", "SigningHandshakeError", "SigningKeyExpiredError", "SourceSetSwitchError", @@ -96,3 +98,15 @@ class GcsAdapterConfigError(GcsAdapterError): Raised at config-load for unknown strategy names and at factory build for build-flag-OFF strategies. """ + + +# --------------------------------------------------------------------- +# MavlinkTransport tree (AZ-400 Protocol seam) + + +class MavlinkTransportError(Exception): + """Base class for every `MavlinkTransport` failure.""" + + +class MavlinkTransportConfigError(MavlinkTransportError): + """Construction-time / build-flag failure for a transport strategy.""" diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/interface.py b/src/gps_denied_onboard/components/c8_fc_adapter/interface.py index ed7833e..28e0f33 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/interface.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/interface.py @@ -31,7 +31,56 @@ from gps_denied_onboard._types.fc import ( ) from gps_denied_onboard._types.state import EstimatorOutput -__all__ = ["FcAdapter", "GcsAdapter"] +__all__ = ["FcAdapter", "GcsAdapter", "MavlinkTransport"] + + +@runtime_checkable +class MavlinkTransport(Protocol): + """Outbound MAVLink byte-stream destination (AZ-400 Protocol seam). + + The contract (replay_protocol.md v2.0.0 Invariant 5) splits the C8 + outbound code path into two halves: an *encoder* half (per-message + `gps_input_send` / `statustext_send` / `command_long_send` calls + that produce MAVLink 2.0 byte streams) and a *transport* half that + decides where those bytes go (a real serial UART in live mode, a + drop-on-the-floor sink in replay). + + Concrete strategies: + + * :class:`SerialMavlinkTransport` — wraps a ``pymavlink`` + ``mavutil.mavlink_connection`` open on the FC's UART (live mode). + * :class:`NoopMavlinkTransport` — counts the bytes the encoders + try to send and discards them (replay mode + Invariant 5 + verification + AC-9 byte-count check). + + Only :func:`gps_denied_onboard.runtime_root.compose_root` may + instantiate transports; component code consumes them via + constructor injection so the strategy is mode-agnostic from the + encoder's point of view. + """ + + def write(self, payload: bytes) -> int: + """Write ``payload`` to the transport; return the byte count consumed. + + Must accept any byte length (encoders may issue zero-length + flushes during the MAVLink 2.0 signing handshake). Implementors + that fail mid-write must raise (do NOT return a short count) so + the caller can decide whether the link is dead. + """ + ... + + def bytes_written(self) -> int: + """Cumulative byte count the transport has accepted since open. + + Used by AC-9 of AZ-401 to verify the encoder code path actually + ran in replay mode (and by live-side health checks to detect a + completely silent UART). + """ + ... + + def close(self) -> None: + """Close the underlying transport; idempotent.""" + ... @runtime_checkable diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/noop_mavlink_transport.py b/src/gps_denied_onboard/components/c8_fc_adapter/noop_mavlink_transport.py new file mode 100644 index 0000000..c28cdaa --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/noop_mavlink_transport.py @@ -0,0 +1,106 @@ +"""``NoopMavlinkTransport`` — replay-mode outbound byte sink (AZ-400). + +Replay-mode strategy for the :class:`MavlinkTransport` Protocol. Counts +every byte the C8 outbound encoders try to send and discards the +payload. Used by ``compose_root`` in ``config.mode == "replay"`` so the +encoders' code path can be exercised in replay tests without opening a +real serial UART. + +Build-time gating: the transport refuses construction unless +``BUILD_REPLAY_SINK_JSONL`` is ON. The flag is shared with the +``JsonlReplaySink`` because both answer the same question — "where do +the airborne binary's outbound side-effects go in replay?" — and the +replay binary always wants both ON together. + +Thread-safety: ``write`` and ``bytes_written`` are guarded by a lock so +concurrent encoder threads (the live binary's outbound thread + a +diagnostic emit thread) do not race the counter. Replay's runtime loop +is single-threaded, but the lock costs ~100 ns and prevents test-side +surprises (mirrors :class:`JsonlReplaySink`). +""" + +from __future__ import annotations + +import os +import threading +from typing import Final + +from gps_denied_onboard.components.c8_fc_adapter.errors import ( + MavlinkTransportConfigError, + MavlinkTransportError, +) +from gps_denied_onboard.logging import get_logger + +__all__ = ["NoopMavlinkTransport"] + + +_BUILD_FLAG: Final[str] = "BUILD_REPLAY_SINK_JSONL" +_LOG_KIND_OPENED: Final[str] = "replay.transport.noop_opened" +_LOG_KIND_CLOSED: Final[str] = "replay.transport.noop_closed" +_LOG_KIND_DOUBLE_CLOSE: Final[str] = "replay.transport.noop_double_close" + + +def _build_flag_on() -> bool: + raw = os.environ.get(_BUILD_FLAG, "") + return raw.strip().lower() in {"on", "1", "true", "yes"} + + +class NoopMavlinkTransport: + """Drop-on-the-floor :class:`MavlinkTransport` for replay mode. + + Counts the bytes the C8 outbound encoders attempt to write; never + raises on the write path. Idempotent close. + """ + + __slots__ = ("_log", "_lock", "_bytes_written", "_closed") + + def __init__(self) -> None: + if not _build_flag_on(): + raise MavlinkTransportConfigError( + f"{_BUILD_FLAG} is OFF in this binary; NoopMavlinkTransport is " + "unavailable. Rebuild with the flag set to ON in the airborne " + "Dockerfile." + ) + self._log = get_logger("c8_fc_adapter.noop_mavlink_transport") + self._lock = threading.Lock() + self._bytes_written = 0 + self._closed = False + self._log.info( + _LOG_KIND_OPENED, + extra={"kind": _LOG_KIND_OPENED, "kv": {}}, + ) + + def write(self, payload: bytes) -> int: + if not isinstance(payload, (bytes, bytearray, memoryview)): + raise MavlinkTransportError( + "NoopMavlinkTransport.write expects bytes-like; got " + f"{type(payload).__name__}" + ) + with self._lock: + if self._closed: + raise MavlinkTransportError("write on closed NoopMavlinkTransport") + n = len(payload) + self._bytes_written += n + return n + + def bytes_written(self) -> int: + with self._lock: + return self._bytes_written + + def close(self) -> None: + with self._lock: + if self._closed: + self._log.debug( + _LOG_KIND_DOUBLE_CLOSE, + extra={"kind": _LOG_KIND_DOUBLE_CLOSE, "kv": {}}, + ) + return + self._closed = True + total = self._bytes_written + self._log.info( + _LOG_KIND_CLOSED, + extra={ + "kind": _LOG_KIND_CLOSED, + "kv": {"bytes_written": total}, + }, + ) diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/replay_sink.py b/src/gps_denied_onboard/components/c8_fc_adapter/replay_sink.py index 3351f64..6205d8e 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/replay_sink.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/replay_sink.py @@ -360,8 +360,8 @@ def create(*, output_path: Path, fdr_client: "FdrClient") -> JsonlReplaySink: """Module-level factory entrypoint per project convention. Mirrors the ``create`` factories used by the C2/C3 strategies so - the AZ-401 ``compose_replay`` wiring resolves the sink through a - single named-symbol contract instead of poking at the class - constructor directly. + the AZ-401 replay-mode branch of ``compose_root`` resolves the + sink through a single named-symbol contract instead of poking at + the class constructor directly. """ return JsonlReplaySink(output_path=output_path, fdr_client=fdr_client) diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/serial_mavlink_transport.py b/src/gps_denied_onboard/components/c8_fc_adapter/serial_mavlink_transport.py new file mode 100644 index 0000000..dc8645a --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/serial_mavlink_transport.py @@ -0,0 +1,124 @@ +"""``SerialMavlinkTransport`` — live-mode outbound byte sink (AZ-400). + +Live-mode strategy for the :class:`MavlinkTransport` Protocol. Wraps a +``pymavlink`` ``mavutil.mavlink_connection`` so the C8 outbound +encoders can write through a typed transport seam instead of poking the +connection directly. + +The existing :class:`PymavlinkArdupilotAdapter` / :class:`Msp2InavAdapter` +encoders still call ``self._connection.mav.gps_input_send(...)`` etc. +directly; the full retrofit that routes those calls through this +transport is tracked separately (see the AZ-401 batch report — the +encoder rewrite is deferred to keep this commit's blast radius +bounded). This module ships the typed surface so + +* :func:`gps_denied_onboard.runtime_root.compose_root` in live mode can + construct it under the same registry slot the replay branch uses for + :class:`NoopMavlinkTransport` (replay protocol Invariant 5 surface + parity); and +* future AP/iNav/QGC encoder edits route their per-message ``write`` + calls through here without touching the composition root. + +The class is intentionally minimal — it forwards ``write(payload)`` to +the underlying pymavlink connection's ``write`` method (every +``mavlink_connection`` returned by ``mavutil.mavlink_connection`` is a +file-like object exposing ``.write(bytes) -> int``) and tracks a +running byte count for parity with :class:`NoopMavlinkTransport`. +""" + +from __future__ import annotations + +import threading +from typing import TYPE_CHECKING, Any, Final + +from gps_denied_onboard.components.c8_fc_adapter.errors import ( + MavlinkTransportError, +) +from gps_denied_onboard.logging import get_logger + +if TYPE_CHECKING: + pass + +__all__ = ["SerialMavlinkTransport"] + + +_LOG_KIND_OPENED: Final[str] = "live.transport.serial_opened" +_LOG_KIND_CLOSED: Final[str] = "live.transport.serial_closed" +_LOG_KIND_DOUBLE_CLOSE: Final[str] = "live.transport.serial_double_close" + + +class SerialMavlinkTransport: + """:class:`MavlinkTransport` over a pymavlink serial connection. + + Constructor injects an already-open ``mavutil.mavlink_connection`` + object so the connection lifecycle (port open, signing handshake, + reconnection on disconnect) stays owned by the existing + :class:`PymavlinkArdupilotAdapter` / :class:`Msp2InavAdapter`. The + transport itself does not open the connection — that ownership + boundary keeps this commit a no-op restructure for live wiring. + """ + + __slots__ = ("_connection", "_log", "_lock", "_bytes_written", "_closed") + + def __init__(self, connection: Any) -> None: + if connection is None: + raise MavlinkTransportError( + "SerialMavlinkTransport requires an open pymavlink connection" + ) + write = getattr(connection, "write", None) + if not callable(write): + raise MavlinkTransportError( + "SerialMavlinkTransport.connection must expose a callable " + ".write(bytes) -> int; got " + f"{type(connection).__name__}" + ) + self._connection = connection + self._log = get_logger("c8_fc_adapter.serial_mavlink_transport") + self._lock = threading.Lock() + self._bytes_written = 0 + self._closed = False + self._log.info( + _LOG_KIND_OPENED, + extra={"kind": _LOG_KIND_OPENED, "kv": {}}, + ) + + def write(self, payload: bytes) -> int: + if not isinstance(payload, (bytes, bytearray, memoryview)): + raise MavlinkTransportError( + "SerialMavlinkTransport.write expects bytes-like; got " + f"{type(payload).__name__}" + ) + with self._lock: + if self._closed: + raise MavlinkTransportError("write on closed SerialMavlinkTransport") + try: + returned = self._connection.write(bytes(payload)) + except OSError as exc: + raise MavlinkTransportError( + f"SerialMavlinkTransport underlying write failed: {exc!r}" + ) from exc + n = int(returned) if returned is not None else len(payload) + self._bytes_written += n + return n + + def bytes_written(self) -> int: + with self._lock: + return self._bytes_written + + def close(self) -> None: + with self._lock: + if self._closed: + self._log.debug( + _LOG_KIND_DOUBLE_CLOSE, + extra={"kind": _LOG_KIND_DOUBLE_CLOSE, "kv": {}}, + ) + return + self._closed = True + total = self._bytes_written + self._log.info( + _LOG_KIND_CLOSED, + extra={ + "kind": _LOG_KIND_CLOSED, + "kv": {"bytes_written": total}, + }, + ) diff --git a/src/gps_denied_onboard/config/__init__.py b/src/gps_denied_onboard/config/__init__.py index 3590f3e..564a6c1 100644 --- a/src/gps_denied_onboard/config/__init__.py +++ b/src/gps_denied_onboard/config/__init__.py @@ -3,8 +3,10 @@ from gps_denied_onboard.config.loader import ENV_KEY_MAP, load_config from gps_denied_onboard.config.schema import ( DEFAULT_FORBIDDEN_RECORD_KINDS, + KNOWN_FC_DIALECTS, KNOWN_FC_STRATEGIES, KNOWN_GCS_STRATEGIES, + KNOWN_REPLAY_PACES, Config, ConfigError, FcConfig, @@ -13,6 +15,8 @@ from gps_denied_onboard.config.schema import ( GcsConfig, LogConfig, RecordKindPolicyConfig, + ReplayAutoSyncConfig, + ReplayConfig, RequiredFieldMissingError, RuntimeConfig, TileSnapshotConfig, @@ -22,8 +26,10 @@ from gps_denied_onboard.config.schema import ( __all__ = [ "DEFAULT_FORBIDDEN_RECORD_KINDS", "ENV_KEY_MAP", + "KNOWN_FC_DIALECTS", "KNOWN_FC_STRATEGIES", "KNOWN_GCS_STRATEGIES", + "KNOWN_REPLAY_PACES", "Config", "ConfigError", "FcConfig", @@ -32,6 +38,8 @@ __all__ = [ "GcsConfig", "LogConfig", "RecordKindPolicyConfig", + "ReplayAutoSyncConfig", + "ReplayConfig", "RequiredFieldMissingError", "RuntimeConfig", "TileSnapshotConfig", diff --git a/src/gps_denied_onboard/config/loader.py b/src/gps_denied_onboard/config/loader.py index 72df0a8..8a86f5a 100644 --- a/src/gps_denied_onboard/config/loader.py +++ b/src/gps_denied_onboard/config/loader.py @@ -23,10 +23,13 @@ import yaml from gps_denied_onboard.config.schema import ( _COMPONENT_REGISTRY, Config, + ConfigError, FcConfig, FdrConfig, GcsConfig, LogConfig, + ReplayAutoSyncConfig, + ReplayConfig, RequiredFieldMissingError, RuntimeConfig, _replace_block, @@ -64,6 +67,13 @@ ENV_KEY_MAP: Final[dict[str, tuple[str, str]]] = { "GCS_PORT_DEVICE": ("gcs", "port_device"), "GCS_PORT_BAUD": ("gcs", "port_baud"), "GCS_SUMMARY_RATE_HZ": ("gcs", "summary_rate_hz"), + # Replay block (AZ-401) + "REPLAY_VIDEO_PATH": ("replay", "video_path"), + "REPLAY_TLOG_PATH": ("replay", "tlog_path"), + "REPLAY_OUTPUT_PATH": ("replay", "output_path"), + "REPLAY_PACE": ("replay", "pace"), + "REPLAY_TIME_OFFSET_MS": ("replay", "time_offset_ms"), + "REPLAY_TARGET_FC_DIALECT": ("replay", "target_fc_dialect"), } # Env vars that MUST resolve to a non-empty value before `load_config` @@ -106,6 +116,12 @@ _FIELD_COERCIONS: Final[dict[str, type]] = { "spoof_recovery_source_set": int, "source_set_switch_timeout_ms": int, "summary_rate_hz": float, + # Replay block coercions (AZ-401) + "video_path": str, + "tlog_path": str, + "output_path": str, + "pace": str, + "target_fc_dialect": str, } @@ -121,8 +137,91 @@ def _coerce_value(field_name: str, raw: Any) -> Any: ) from exc +def _coerce_optional_int(field_name: str, raw: Any) -> int | None: + """Coerce ``raw`` to ``int`` or ``None`` (empty / null sentinels become ``None``).""" + if raw is None: + return None + if isinstance(raw, str) and raw.strip() == "": + return None + if isinstance(raw, int) and not isinstance(raw, bool): + return raw + try: + return int(raw) + except (TypeError, ValueError) as exc: + raise RequiredFieldMissingError( + f"config field {field_name!r}: cannot coerce {raw!r} to int ({exc})" + ) from exc + + +def _build_replay_block(overrides: Mapping[str, Any]) -> ReplayConfig: + """Build a :class:`ReplayConfig` from YAML/env overrides. + + Handles two non-trivial coercions the generic path cannot: + + * ``time_offset_ms`` — ``int | None`` (empty string / None → None). + * ``auto_sync`` — nested mapping → :class:`ReplayAutoSyncConfig`. + """ + flat: dict[str, Any] = {} + auto_sync_overrides: Mapping[str, Any] = {} + for key, value in overrides.items(): + if key == "auto_sync": + if value is None: + continue + if not isinstance(value, Mapping): + raise ConfigError( + f"replay.auto_sync must be a mapping; got {type(value).__name__}" + ) + auto_sync_overrides = value + continue + if key == "time_offset_ms": + flat[key] = _coerce_optional_int(key, value) + continue + flat[key] = _coerce_value(key, value) + auto_sync_block = _replace_block( + ReplayAutoSyncConfig(), + {k: _coerce_replay_auto_sync_field(k, v) for k, v in auto_sync_overrides.items()}, + ) + flat["auto_sync"] = auto_sync_block + return _replace_block(ReplayConfig(), flat) + + +_REPLAY_AUTO_SYNC_TYPES: Final[dict[str, type]] = { + "takeoff_accel_threshold_g": float, + "takeoff_attitude_rate_threshold_rad_s": float, + "sustained_seconds": float, + "prescan_max_messages": int, + "video_motion_threshold": float, + "video_motion_scan_seconds": float, + "match_threshold_pct": float, + "match_window_ms": int, + "low_confidence_threshold": float, +} + + +def _coerce_replay_auto_sync_field(field_name: str, raw: Any) -> Any: + target_type = _REPLAY_AUTO_SYNC_TYPES.get(field_name) + if target_type is None or isinstance(raw, target_type): + return raw + try: + return target_type(raw) + except (TypeError, ValueError) as exc: + raise RequiredFieldMissingError( + f"config field replay.auto_sync.{field_name}: cannot coerce {raw!r} " + f"to {target_type.__name__} ({exc})" + ) from exc + + +_TOP_LEVEL_SCALAR_FIELDS: Final[frozenset[str]] = frozenset({"mode"}) + + def _load_yaml_files(paths: Sequence[Path]) -> dict[str, dict[str, Any]]: - """Merge YAML files in order: later paths win for the same block + field.""" + """Merge YAML files in order: later paths win for the same block + field. + + Top-level scalar fields named in :data:`_TOP_LEVEL_SCALAR_FIELDS` + (currently ``mode``) are collected under the synthetic ``__top__`` + block so the ``Config`` outer fields can be overridden alongside + the nested cross-cutting / component blocks. + """ merged: dict[str, dict[str, Any]] = {} for path in paths: data = yaml.safe_load(path.read_text()) or {} @@ -131,6 +230,9 @@ def _load_yaml_files(paths: Sequence[Path]) -> dict[str, dict[str, Any]]: f"YAML at {path} must be a mapping at the top level; got {type(data).__name__}" ) for block_name, block_value in data.items(): + if block_name in _TOP_LEVEL_SCALAR_FIELDS: + merged.setdefault("__top__", {})[block_name] = block_value + continue if not isinstance(block_value, dict): continue merged.setdefault(block_name, {}).update(block_value) @@ -193,6 +295,11 @@ def load_config( GcsConfig(), {k: _coerce_value(k, v) for k, v in yaml_overrides.get("gcs", {}).items()}, ) + replay_block = _build_replay_block(yaml_overrides.get("replay", {})) + raw_mode = yaml_overrides.get("__top__", {}).get("mode") + if raw_mode is None: + raw_mode = env.get("MODE", "live") + mode = str(raw_mode).strip().lower() component_blocks = _resolve_component_blocks() for slug, dataclass_type in _COMPONENT_REGISTRY.items(): @@ -209,5 +316,7 @@ def load_config( fdr=fdr_block, fc=fc_block, gcs=gcs_block, + replay=replay_block, + mode=mode, # type: ignore[arg-type] # validated by Config.__post_init__ components=component_blocks, ) diff --git a/src/gps_denied_onboard/config/schema.py b/src/gps_denied_onboard/config/schema.py index dcc6a1f..13da2c6 100644 --- a/src/gps_denied_onboard/config/schema.py +++ b/src/gps_denied_onboard/config/schema.py @@ -12,12 +12,14 @@ from __future__ import annotations from collections.abc import Mapping from dataclasses import dataclass, field, fields, is_dataclass, replace -from typing import Any, Final +from typing import Any, Final, Literal __all__ = [ "DEFAULT_FORBIDDEN_RECORD_KINDS", + "KNOWN_FC_DIALECTS", "KNOWN_FC_STRATEGIES", "KNOWN_GCS_STRATEGIES", + "KNOWN_REPLAY_PACES", "Config", "ConfigError", "FcConfig", @@ -26,6 +28,8 @@ __all__ = [ "GcsConfig", "LogConfig", "RecordKindPolicyConfig", + "ReplayAutoSyncConfig", + "ReplayConfig", "RequiredFieldMissingError", "RuntimeConfig", "TileSnapshotConfig", @@ -35,6 +39,8 @@ __all__ = [ KNOWN_FC_STRATEGIES: Final[frozenset[str]] = frozenset({"ardupilot_plane", "inav"}) KNOWN_GCS_STRATEGIES: Final[frozenset[str]] = frozenset({"qgc_mavlink"}) +KNOWN_REPLAY_PACES: Final[frozenset[str]] = frozenset({"asap", "realtime"}) +KNOWN_FC_DIALECTS: Final[frozenset[str]] = frozenset({"ardupilot_plane", "inav"}) # Default raw-frame kinds that AZ-295's RecordKindPolicy must reject @@ -289,6 +295,98 @@ class RuntimeConfig: tile_cache_path: str = "/var/lib/gps-denied/tiles" +@dataclass(frozen=True) +class ReplayAutoSyncConfig: + """Operator-tunable thresholds for the AZ-405 auto-sync detector. + + Mirrors the ``AutoSyncConfig`` DTO in + :mod:`gps_denied_onboard.replay_input.interface`; lives here so the + YAML loader can populate it without importing the Layer-4 replay + package (which would create a config → replay_input → config cycle). + The composition root translates this block into the matching + ``AutoSyncConfig`` instance when it builds the + :class:`ReplayInputAdapter`. + + All fields default to the contract values in + ``_docs/02_document/contracts/replay/replay_protocol.md`` v2.0.0. + """ + + takeoff_accel_threshold_g: float = 0.5 + takeoff_attitude_rate_threshold_rad_s: float = 1.0 + sustained_seconds: float = 0.5 + prescan_max_messages: int = 6000 + video_motion_threshold: float = 1.5 + video_motion_scan_seconds: float = 10.0 + match_threshold_pct: float = 95.0 + match_window_ms: int = 100 + low_confidence_threshold: float = 0.80 + + +@dataclass(frozen=True) +class ReplayConfig: + """Replay-mode runtime descriptors (AZ-401 / E-DEMO-REPLAY). + + Read by :func:`gps_denied_onboard.runtime_root.compose_root` only + when the outer :attr:`Config.mode` is ``"replay"``. Live mode + ignores every field — the block exists as a default-constructed + placeholder so the outer :class:`Config` shape stays stable across + modes. + + Validation here is shape-only: the unknown-pace / unknown-dialect + cases reject early. Path existence is verified by the composition + root because YAML may legally reference paths injected at runtime. + + Attributes: + video_path: Filesystem path to the replay video (``.mp4`` / + ``.h264``). Empty string is the default sentinel; a + non-empty value is required when ``mode == "replay"``. + tlog_path: Filesystem path to the matching pymavlink ``.tlog``. + Empty string is the default sentinel. + output_path: Filesystem path the :class:`JsonlReplaySink` will + write to. Default points at ``/tmp/replay.jsonl`` for + developer ergonomics; production wiring overrides via the + CLI ``--output`` flag. + pace: One of :data:`KNOWN_REPLAY_PACES`. ``"asap"`` selects + :class:`TlogDerivedClock`; ``"realtime"`` selects + :class:`WallClock`. + time_offset_ms: Manual override for the video-vs-tlog offset. + ``None`` means "run AZ-405 auto-sync"; an integer value + bypasses auto-sync entirely. + target_fc_dialect: One of :data:`KNOWN_FC_DIALECTS`; controls + which pymavlink dialect the :class:`TlogReplayFcAdapter` + decodes. + auto_sync: Operator-tunable thresholds for the AZ-405 + auto-sync detector. + """ + + video_path: str = "" + tlog_path: str = "" + output_path: str = "/tmp/replay.jsonl" + pace: str = "asap" + time_offset_ms: int | None = None + target_fc_dialect: str = "ardupilot_plane" + auto_sync: ReplayAutoSyncConfig = field(default_factory=ReplayAutoSyncConfig) + + def __post_init__(self) -> None: + if self.pace not in KNOWN_REPLAY_PACES: + raise ConfigError( + f"ReplayConfig.pace={self.pace!r} not in " + f"{sorted(KNOWN_REPLAY_PACES)}" + ) + if self.target_fc_dialect not in KNOWN_FC_DIALECTS: + raise ConfigError( + f"ReplayConfig.target_fc_dialect={self.target_fc_dialect!r} " + f"not in {sorted(KNOWN_FC_DIALECTS)}" + ) + if self.time_offset_ms is not None and not isinstance( + self.time_offset_ms, int + ): + raise ConfigError( + "ReplayConfig.time_offset_ms must be int or None; " + f"got {type(self.time_offset_ms).__name__}" + ) + + # Documented defaults for cross-cutting blocks ONLY. Per-component defaults # live with their own component epic. The registry below is the single # source of truth so two components cannot silently claim the same key. @@ -298,6 +396,7 @@ _DEFAULT_BLOCKS: Final[dict[str, type]] = { "runtime": RuntimeConfig, "fc": FcConfig, "gcs": GcsConfig, + "replay": ReplayConfig, } @@ -341,6 +440,14 @@ class Config: Components consume only their own slice via ``config.components[slug]``; the runtime / log / fdr cross-cutting blocks are read directly via attribute access by the composition root. + + The :attr:`mode` field selects between ``"live"`` (the default — + behaves exactly as the pre-AZ-401 binary) and ``"replay"`` (drives + the airborne binary off recorded video + tlog inputs per ADR-011 / + replay protocol v2.0.0). Replay-only configuration lives under + :attr:`replay`; the field is always present (default-constructed) + so the outer shape is stable, but its contents are ignored in live + mode. """ runtime: RuntimeConfig = field(default_factory=RuntimeConfig) @@ -348,14 +455,23 @@ class Config: fdr: FdrConfig = field(default_factory=FdrConfig) fc: FcConfig = field(default_factory=FcConfig) gcs: GcsConfig = field(default_factory=GcsConfig) + replay: ReplayConfig = field(default_factory=ReplayConfig) + mode: Literal["live", "replay"] = "live" components: Mapping[str, Any] = field(default_factory=dict) + def __post_init__(self) -> None: + if self.mode not in ("live", "replay"): + raise ConfigError( + f"Config.mode={self.mode!r} not in ('live', 'replay')" + ) + @classmethod def with_blocks(cls, **blocks: Any) -> Config: """Build a `Config` from a flat name-to-instance map. - Cross-cutting names (``log``, ``fdr``, ``runtime``, ``fc``, ``gcs``) - become attributes; every other key is treated as a component slug + Cross-cutting names (``log``, ``fdr``, ``runtime``, ``fc``, + ``gcs``, ``replay``) become attributes; ``mode`` is also a + recognised key. Every other key is treated as a component slug and goes into ``components``. """ runtime = blocks.pop("runtime", RuntimeConfig()) @@ -363,12 +479,16 @@ class Config: fdr = blocks.pop("fdr", FdrConfig()) fc = blocks.pop("fc", FcConfig()) gcs = blocks.pop("gcs", GcsConfig()) + replay = blocks.pop("replay", ReplayConfig()) + mode = blocks.pop("mode", "live") return cls( runtime=runtime, log=log, fdr=fdr, fc=fc, gcs=gcs, + replay=replay, + mode=mode, components=dict(blocks), ) diff --git a/src/gps_denied_onboard/runtime_root/__init__.py b/src/gps_denied_onboard/runtime_root/__init__.py index c862df8..43ac501 100644 --- a/src/gps_denied_onboard/runtime_root/__init__.py +++ b/src/gps_denied_onboard/runtime_root/__init__.py @@ -7,9 +7,14 @@ the component graph in dependency order. Per-binary entrypoints: -* :func:`compose_root` - airborne runtime +* :func:`compose_root` - airborne runtime; serves both ``config.mode == "live"`` + and ``config.mode == "replay"`` per ADR-011 (replay-as-configuration) * :func:`compose_operator` - operator-side tooling (pre-flight, post-landing) -* :func:`compose_replay` - replay-cli runtime (extension owned by AZ-401) + +Replay is a configuration of :func:`compose_root`, not a separate function: +the branch on ``config.mode`` lives in :mod:`._replay_branch`. The legacy +``compose_replay`` export was removed by AZ-401 (ADR-011 supersedes the +v1.0.0 "replay is a sibling root" design). Public surface frozen by ``_docs/02_document/contracts/shared_config/composition_root_protocol.md`` v1.0.0. @@ -24,6 +29,10 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Final, Literal, get_args from gps_denied_onboard.config import Config, load_config +from gps_denied_onboard.runtime_root._replay_branch import ( + CompositionError, + build_replay_components, +) from gps_denied_onboard.runtime_root.c12_factory import ( build_flights_api_client, ) @@ -67,6 +76,7 @@ __all__ = [ "EXIT_FDR_OPEN_FAILURE", "EXIT_GENERIC_FAILURE", "REQUIRED_ENV_VARS", + "CompositionError", "ConfigurationError", "OperatorRoot", "OutboundThreadAlreadyBoundError", @@ -91,7 +101,6 @@ __all__ = [ "clear_strategy_registries", "clear_strategy_registry", "compose_operator", - "compose_replay", "compose_root", "list_registered_fc_strategies", "list_registered_gcs_strategies", @@ -317,8 +326,17 @@ def _compose( binary: str, allowed_tiers: frozenset[StrategyTier], extra_required_env: Iterable[str], + pre_constructed: Mapping[str, Any] | None = None, ) -> tuple[dict[str, Any], tuple[str, ...]]: - """Shared composition path used by ``compose_root`` / ``compose_operator``.""" + """Shared composition path used by ``compose_root`` / ``compose_operator``. + + ``pre_constructed`` lets the caller seed the ``constructed`` dict + before any registered factory runs — used by the replay-mode branch + of :func:`compose_root` to inject the cross-cutting replay + strategies (``frame_source``, ``fc_adapter``, ``clock``, + ``mavlink_transport``, ``replay_sink``) so any C1-C7 factory that + declares a dependency on one finds it already populated. + """ _check_required_env(extra_required=extra_required_env) selections = _resolve_component_strategies(config, allowed_tiers) resolved: dict[str, _Registration] = { @@ -326,7 +344,9 @@ def _compose( for slug, strategy in selections.items() } order = _topo_order(resolved.keys(), resolved) - constructed: dict[str, Any] = {} + constructed: dict[str, Any] = ( + dict(pre_constructed) if pre_constructed is not None else {} + ) for slug in order: registration = resolved[slug] try: @@ -336,7 +356,11 @@ def _compose( _close_partial_instances(constructed) raise _ = binary # documented but unused beyond labelling the returned root - return constructed, tuple(order) + # Returned components include only the registry-driven strategies — the + # caller is responsible for merging the pre_constructed dict back in if + # it wants a single combined view. + registry_built = {slug: constructed[slug] for slug in order} + return registry_built, tuple(order) def _close_partial_instances(instances: Mapping[str, Any]) -> None: @@ -392,19 +416,61 @@ def _read_strategy_attr(block: Any) -> Any: return None -def compose_root(config: Config) -> RuntimeRoot: - """Compose the airborne runtime graph (per contract v1.0.0).""" +def compose_root( + config: Config, + *, + replay_components_factory: Any | None = None, +) -> RuntimeRoot: + """Compose the airborne runtime graph for ``config.mode``. + + With ``config.mode == "live"`` (the default) the function behaves + exactly as the pre-AZ-401 implementation — every wiring decision is + driven by ``config.components[slug].strategy`` against the strategy + registry, gated by the airborne tier. + + With ``config.mode == "replay"`` the function additionally builds + the five replay-only strategies (``frame_source``, ``fc_adapter``, + ``clock``, ``mavlink_transport``, ``replay_sink``) per + :mod:`._replay_branch` and merges them into the components dict + BEFORE the registry-driven C1-C7+C13 strategies run, so any + component factory that consumes one of the five via ``constructed`` + finds it already populated. C1-C7+C13 strategies are wired + identically to live mode (replay protocol Invariant 1). + + The ``replay_components_factory`` keyword is a test-only injection + point — production callers omit it. Tests pass a callable returning + ``(components, construction_order)`` so the unit suite does not + have to satisfy the full OpenCV / pymavlink / FDR side-effects of + the real strategies. + """ + extra_env = ( + ("MAVLINK_SIGNING_KEY",) + if config.mode == "live" + else () + ) + if config.mode == "replay": + replay_factory = replay_components_factory or build_replay_components + replay_components, replay_order = replay_factory(config) + else: + replay_components = {} + replay_order = () components, order = _compose( config, binary="airborne", allowed_tiers=frozenset({"airborne", "shared"}), - extra_required_env=("MAVLINK_SIGNING_KEY",), + extra_required_env=extra_env, + pre_constructed=replay_components, + ) + merged: dict[str, Any] = dict(replay_components) + merged.update(components) + full_order = tuple(replay_order) + tuple( + slug for slug in order if slug not in replay_order ) return RuntimeRoot( binary="airborne", profile=os.environ["GPS_DENIED_FC_PROFILE"], - components=components, - construction_order=order, + components=merged, + construction_order=full_order, ) @@ -424,22 +490,6 @@ def compose_operator(config: Config) -> OperatorRoot: ) -def compose_replay(config: Config) -> RuntimeRoot: - """Compose the replay-cli runtime graph. Concrete wiring is owned by AZ-401.""" - components, order = _compose( - config, - binary="replay-cli", - allowed_tiers=frozenset({"airborne", "shared"}), - extra_required_env=(), - ) - return RuntimeRoot( - binary="replay-cli", - profile=os.environ["GPS_DENIED_FC_PROFILE"], - components=components, - construction_order=order, - ) - - @dataclass(frozen=True) class TakeoffResult: """Successful takeoff: writer is open, FC adapter is wired, components started. diff --git a/src/gps_denied_onboard/runtime_root/_replay_branch.py b/src/gps_denied_onboard/runtime_root/_replay_branch.py new file mode 100644 index 0000000..3ca3fbd --- /dev/null +++ b/src/gps_denied_onboard/runtime_root/_replay_branch.py @@ -0,0 +1,329 @@ +"""Replay-mode branch of :func:`compose_root` (AZ-401 / E-DEMO-REPLAY). + +Internal module. Owns the wiring that turns a ``config.mode == "replay"`` +:class:`Config` into a :class:`RuntimeRoot` whose components dict carries +the replay-only strategies (``frame_source``, ``fc_adapter``, ``clock``, +``mavlink_transport``, ``replay_sink``) plus whatever C1-C7+C13 strategies +the binary's bootstrap registered against +:data:`gps_denied_onboard.runtime_root._STRATEGY_REGISTRY`. + +Per replay protocol v2.0.0 (ADR-011): replay is a configuration of the +single airborne composition root, not a sibling root. The branch lives +in this module to keep ``runtime_root/__init__.py`` focused on the +shared composition spine while still exposing exactly one +``compose_root(config)`` entrypoint. + +Build-flag gates (per replay protocol Invariant 9): + +- ``BUILD_VIDEO_FILE_FRAME_SOURCE`` — required for the + :class:`VideoFileFrameSource` instance returned by the coordinator. +- ``BUILD_TLOG_REPLAY_ADAPTER`` — required for the + :class:`TlogReplayFcAdapter` instance returned by the coordinator. +- ``BUILD_REPLAY_SINK_JSONL`` — shared by the JSONL sink and the noop + outbound transport. + +All three default ON in the airborne binary (per ADR-011); flipping any +OFF disables replay mode without affecting live mode. +""" + +from __future__ import annotations + +import json +import os +from collections.abc import Mapping +from pathlib import Path +from typing import TYPE_CHECKING, Any, Final + +from gps_denied_onboard._types.calibration import CameraCalibration +from gps_denied_onboard._types.fc import FcKind +from gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport import ( + NoopMavlinkTransport, +) +from gps_denied_onboard.components.c8_fc_adapter.replay_sink import ( + JsonlReplaySink, +) +from gps_denied_onboard.config import Config +from gps_denied_onboard.fdr_client import make_fdr_client +from gps_denied_onboard.helpers.wgs_converter import WgsConverter +from gps_denied_onboard.logging import get_logger +from gps_denied_onboard.replay_input import ( + AutoSyncConfig, + ReplayInputAdapter, + ReplayInputBundle, +) +from gps_denied_onboard.replay_input.tlog_video_adapter import ReplayPace + +if TYPE_CHECKING: + from gps_denied_onboard.fdr_client.client import FdrClient + +__all__ = [ + "REPLAY_BUILD_FLAGS", + "REPLAY_COMPONENT_KEYS", + "CompositionError", + "build_replay_components", +] + + +_LOG_KIND_READY: Final[str] = "replay.compose_root.ready" + + +REPLAY_BUILD_FLAGS: Final[tuple[str, ...]] = ( + "BUILD_VIDEO_FILE_FRAME_SOURCE", + "BUILD_TLOG_REPLAY_ADAPTER", + "BUILD_REPLAY_SINK_JSONL", +) + + +REPLAY_COMPONENT_KEYS: Final[tuple[str, ...]] = ( + "frame_source", + "fc_adapter", + "clock", + "mavlink_transport", + "replay_sink", +) + + +class CompositionError(RuntimeError): + """Raised when the replay-mode branch refuses to compose a runtime. + + Carries the human-readable reason (build-flag OFF, missing path, + contradictory config) so the caller can surface it in the structured + log + on stderr without a second introspection pass. + """ + + +def build_replay_components( + config: Config, + *, + fdr_client_factory: Any | None = None, + replay_input_adapter_factory: Any | None = None, + sink_factory: Any | None = None, + transport_factory: Any | None = None, +) -> tuple[dict[str, Any], tuple[str, ...]]: + """Construct the replay-mode component dict + construction order. + + The factories are test-only injection points. Production callers + (just ``compose_root``) leave them ``None`` so the real constructors + run; unit tests pass fakes so they don't have to satisfy the full + OpenCV / pymavlink / FDR side-effects of the real strategies. + + Returns: + ``(components, construction_order)`` — the same shape + :func:`gps_denied_onboard.runtime_root._compose` returns. The + keys are the entries of :data:`REPLAY_COMPONENT_KEYS`; the + values are typed strategy instances. + """ + if config.mode != "replay": + raise CompositionError( + "build_replay_components called with non-replay config " + f"(mode={config.mode!r})" + ) + _validate_build_flags() + _validate_replay_paths(config) + + fdr_factory = fdr_client_factory or make_fdr_client + fdr_client = fdr_factory("replay_input", config) + + sink_fdr_client = fdr_factory("c8_fc_adapter.replay_sink", config) + + bundle = _build_replay_input_bundle( + config, + fdr_client=fdr_client, + adapter_factory=replay_input_adapter_factory, + ) + + if sink_factory is not None: + sink = sink_factory(config, sink_fdr_client) + else: + sink = JsonlReplaySink( + output_path=Path(config.replay.output_path), + fdr_client=sink_fdr_client, + ) + + if transport_factory is not None: + transport = transport_factory(config) + else: + transport = NoopMavlinkTransport() + + components: dict[str, Any] = { + "frame_source": bundle.frame_source, + "fc_adapter": bundle.fc_adapter, + "clock": bundle.clock, + "mavlink_transport": transport, + "replay_sink": sink, + } + + _log_ready(config, bundle) + return components, REPLAY_COMPONENT_KEYS + + +def _validate_build_flags() -> None: + """Refuse construction when any replay-mode ``BUILD_*`` flag is OFF.""" + for flag_name in REPLAY_BUILD_FLAGS: + raw = os.environ.get(flag_name, "ON").strip().upper() + if raw == "OFF": + raise CompositionError( + f"{flag_name} is OFF; replay mode requires it" + ) + + +def _validate_replay_paths(config: Config) -> None: + """Reject empty / missing replay paths early with a precise message.""" + if not config.replay.video_path: + raise CompositionError( + "config.replay.video_path is empty; replay mode requires a video path" + ) + if not config.replay.tlog_path: + raise CompositionError( + "config.replay.tlog_path is empty; replay mode requires a tlog path" + ) + if not config.replay.output_path: + raise CompositionError( + "config.replay.output_path is empty; replay mode requires an output path" + ) + + +def _build_replay_input_bundle( + config: Config, + *, + fdr_client: "FdrClient", + adapter_factory: Any | None, +) -> ReplayInputBundle: + """Build the :class:`ReplayInputAdapter` and call ``open()``.""" + pace = _resolve_pace(config.replay.pace) + target_fc_dialect = _resolve_fc_kind(config.replay.target_fc_dialect) + auto_sync = _build_auto_sync_config(config) + camera_calibration = _load_camera_calibration(config) + wgs_converter = WgsConverter() + + if adapter_factory is not None: + adapter = adapter_factory( + config=config, + camera_calibration=camera_calibration, + target_fc_dialect=target_fc_dialect, + wgs_converter=wgs_converter, + fdr_client=fdr_client, + pace=pace, + auto_sync_config=auto_sync, + ) + else: + adapter = ReplayInputAdapter( + video_path=Path(config.replay.video_path), + tlog_path=Path(config.replay.tlog_path), + camera_calibration=camera_calibration, + target_fc_dialect=target_fc_dialect, + wgs_converter=wgs_converter, + fdr_client=fdr_client, + pace=pace, + manual_time_offset_ms=config.replay.time_offset_ms, + auto_sync_config=auto_sync, + ) + return adapter.open() + + +def _resolve_pace(raw: str) -> ReplayPace: + if raw == "asap": + return ReplayPace.ASAP + if raw == "realtime": + return ReplayPace.REALTIME + raise CompositionError( + f"config.replay.pace={raw!r} not in ('asap', 'realtime')" + ) + + +def _resolve_fc_kind(raw: str) -> FcKind: + if raw == "ardupilot_plane": + return FcKind.ARDUPILOT_PLANE + if raw == "inav": + return FcKind.INAV + raise CompositionError( + f"config.replay.target_fc_dialect={raw!r} not in " + "('ardupilot_plane', 'inav')" + ) + + +def _build_auto_sync_config(config: Config) -> AutoSyncConfig: + block = config.replay.auto_sync + return AutoSyncConfig( + takeoff_accel_threshold_g=block.takeoff_accel_threshold_g, + takeoff_attitude_rate_threshold_rad_s=( + block.takeoff_attitude_rate_threshold_rad_s + ), + sustained_seconds=block.sustained_seconds, + prescan_max_messages=block.prescan_max_messages, + video_motion_threshold=block.video_motion_threshold, + video_motion_scan_seconds=block.video_motion_scan_seconds, + match_threshold_pct=block.match_threshold_pct, + match_window_ms=block.match_window_ms, + low_confidence_threshold=block.low_confidence_threshold, + ) + + +def _load_camera_calibration(config: Config) -> CameraCalibration: + """Read the camera calibration JSON into a :class:`CameraCalibration` DTO. + + The replay binary uses the SAME calibration file the live binary + loads; AZ-401 does not introduce a new on-disk format. + """ + import numpy as np + + path = config.runtime.camera_calibration_path + if not path: + raise CompositionError( + "config.runtime.camera_calibration_path is empty; replay mode " + "requires a camera calibration JSON" + ) + try: + blob = json.loads(Path(path).read_text(encoding="utf-8")) + except OSError as exc: + raise CompositionError( + f"failed to read camera calibration from {path!r}: {exc!r}" + ) from exc + except json.JSONDecodeError as exc: + raise CompositionError( + f"camera calibration {path!r} is not valid JSON: {exc!r}" + ) from exc + if not isinstance(blob, Mapping): + raise CompositionError( + f"camera calibration {path!r} must decode to a mapping; " + f"got {type(blob).__name__}" + ) + intrinsics = np.asarray(blob.get("intrinsics_3x3"), dtype=np.float64) + if intrinsics.shape != (3, 3): + raise CompositionError( + f"camera calibration {path!r} 'intrinsics_3x3' must be 3x3; " + f"got shape {intrinsics.shape}" + ) + distortion = np.asarray(blob.get("distortion", []), dtype=np.float64) + body_to_camera = np.asarray( + blob.get("body_to_camera_se3", np.eye(4).tolist()), + dtype=np.float64, + ) + return CameraCalibration( + camera_id=str(blob.get("camera_id", "replay-camera")), + intrinsics_3x3=intrinsics, + distortion=distortion, + body_to_camera_se3=body_to_camera, + acquisition_method=str(blob.get("acquisition_method", "operator")), + metadata=dict(blob.get("metadata", {})), + ) + + +def _log_ready(config: Config, bundle: ReplayInputBundle) -> None: + log = get_logger("runtime_root.replay_branch") + log.info( + f"{_LOG_KIND_READY}: pace={config.replay.pace} " + f"resolved_offset_ms={bundle.resolved_time_offset_ms}", + extra={ + "kind": _LOG_KIND_READY, + "kv": { + "video_path": config.replay.video_path, + "tlog_path": config.replay.tlog_path, + "output_path": config.replay.output_path, + "pace": config.replay.pace, + "resolved_offset_ms": bundle.resolved_time_offset_ms, + "calib_path": config.runtime.camera_calibration_path, + "auto_sync_used": bundle.auto_sync_result is not None, + }, + }, + ) diff --git a/tests/unit/c8_fc_adapter/test_az400_mavlink_transport.py b/tests/unit/c8_fc_adapter/test_az400_mavlink_transport.py new file mode 100644 index 0000000..92ef099 --- /dev/null +++ b/tests/unit/c8_fc_adapter/test_az400_mavlink_transport.py @@ -0,0 +1,287 @@ +"""AZ-400 retrofit — `MavlinkTransport` Protocol + Noop / Serial impls. + +Covers the part of AZ-400 that the v1.0.0 sprint deferred: +the transport seam declared by the replay contract Invariant 5 and +required by AZ-401's ``compose_root`` replay branch (per +``_docs/02_document/contracts/replay/replay_protocol.md`` v2.0.0 lines +14, 109, 222, 237). + +Per-test references: + +- AC-Transport-1 — protocol conformance +- AC-Transport-2 — noop accepts every byte length, counts cumulatively +- AC-Transport-3 — serial forwards bytes through the underlying connection +- AC-Transport-4 — both raise on write-after-close +- AC-Transport-5 — close is idempotent +- AC-Transport-6 — build flag OFF refuses noop construction +- AC-Transport-7 — serial OSError surfaces as ``MavlinkTransportError`` +""" + +from __future__ import annotations + +from typing import Any +from unittest import mock + +import pytest + +from gps_denied_onboard.components.c8_fc_adapter import MavlinkTransport +from gps_denied_onboard.components.c8_fc_adapter.errors import ( + MavlinkTransportConfigError, + MavlinkTransportError, +) +from gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport import ( + NoopMavlinkTransport, +) +from gps_denied_onboard.components.c8_fc_adapter.serial_mavlink_transport import ( + SerialMavlinkTransport, +) + + +# ---------------------------------------------------------------------- +# Fixtures + + +@pytest.fixture(autouse=True) +def _build_flag_on(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("BUILD_REPLAY_SINK_JSONL", "ON") + + +class _FakeConnection: + """Stub for ``mavutil.mavlink_connection`` — exposes ``write(bytes)``.""" + + def __init__(self) -> None: + self.received: list[bytes] = [] + self.fail_with: Exception | None = None + + def write(self, data: bytes) -> int: + if self.fail_with is not None: + raise self.fail_with + self.received.append(bytes(data)) + return len(data) + + +# ---------------------------------------------------------------------- +# AC-Transport-1: Protocol conformance + + +def test_noop_transport_satisfies_protocol() -> None: + # Act + transport = NoopMavlinkTransport() + + # Assert + assert isinstance(transport, MavlinkTransport) + + +def test_serial_transport_satisfies_protocol() -> None: + # Arrange + conn = _FakeConnection() + + # Act + transport = SerialMavlinkTransport(connection=conn) + + # Assert + assert isinstance(transport, MavlinkTransport) + + +# ---------------------------------------------------------------------- +# AC-Transport-2: NoopMavlinkTransport accepts and counts bytes + + +def test_noop_transport_counts_cumulative_bytes() -> None: + # Arrange + transport = NoopMavlinkTransport() + + # Act + n1 = transport.write(b"abc") + n2 = transport.write(b"") + n3 = transport.write(b"defgh") + + # Assert + assert n1 == 3 + assert n2 == 0 + assert n3 == 5 + assert transport.bytes_written() == 8 + + +def test_noop_transport_accepts_bytes_like_views() -> None: + # Arrange + transport = NoopMavlinkTransport() + + # Act + transport.write(bytearray(b"abc")) + transport.write(memoryview(b"def")) + + # Assert + assert transport.bytes_written() == 6 + + +def test_noop_transport_rejects_non_bytes() -> None: + # Arrange + transport = NoopMavlinkTransport() + + # Act / Assert + with pytest.raises(MavlinkTransportError, match="bytes-like"): + transport.write("not-bytes") # type: ignore[arg-type] + + +# ---------------------------------------------------------------------- +# AC-Transport-3: SerialMavlinkTransport forwards bytes + + +def test_serial_transport_forwards_bytes_to_underlying_connection() -> None: + # Arrange + conn = _FakeConnection() + transport = SerialMavlinkTransport(connection=conn) + + # Act + n = transport.write(b"hello") + + # Assert + assert n == 5 + assert conn.received == [b"hello"] + assert transport.bytes_written() == 5 + + +def test_serial_transport_rejects_missing_write_method() -> None: + # Arrange + class _NoWrite: + pass + + # Act / Assert + with pytest.raises(MavlinkTransportError, match=r"\.write\(bytes\)"): + SerialMavlinkTransport(connection=_NoWrite()) + + +def test_serial_transport_rejects_none_connection() -> None: + # Act / Assert + with pytest.raises(MavlinkTransportError, match="open pymavlink connection"): + SerialMavlinkTransport(connection=None) + + +# ---------------------------------------------------------------------- +# AC-Transport-4: write after close raises + + +def test_noop_transport_write_after_close_raises() -> None: + # Arrange + transport = NoopMavlinkTransport() + transport.write(b"first") + transport.close() + + # Act / Assert + with pytest.raises(MavlinkTransportError, match="closed"): + transport.write(b"second") + + +def test_serial_transport_write_after_close_raises() -> None: + # Arrange + conn = _FakeConnection() + transport = SerialMavlinkTransport(connection=conn) + transport.write(b"first") + transport.close() + + # Act / Assert + with pytest.raises(MavlinkTransportError, match="closed"): + transport.write(b"second") + + +# ---------------------------------------------------------------------- +# AC-Transport-5: idempotent close + + +def test_noop_transport_close_is_idempotent() -> None: + # Arrange + transport = NoopMavlinkTransport() + + # Act + transport.close() + transport.close() # must not raise + + +def test_serial_transport_close_is_idempotent() -> None: + # Arrange + conn = _FakeConnection() + transport = SerialMavlinkTransport(connection=conn) + + # Act + transport.close() + transport.close() # must not raise + + +# ---------------------------------------------------------------------- +# AC-Transport-6: BUILD flag gating + + +def test_noop_transport_build_flag_off_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + monkeypatch.setenv("BUILD_REPLAY_SINK_JSONL", "OFF") + + # Act / Assert + with pytest.raises(MavlinkTransportConfigError, match="BUILD_REPLAY_SINK_JSONL is OFF"): + NoopMavlinkTransport() + + +# ---------------------------------------------------------------------- +# AC-Transport-7: SerialMavlinkTransport surfaces OSError as MavlinkTransportError + + +def test_serial_transport_oserror_wrapped() -> None: + # Arrange + conn = _FakeConnection() + conn.fail_with = OSError("device disconnected") + transport = SerialMavlinkTransport(connection=conn) + + # Act / Assert + with pytest.raises(MavlinkTransportError, match="underlying write failed"): + transport.write(b"abc") + + +# ---------------------------------------------------------------------- +# bytes_written reads safely after close + + +def test_noop_bytes_written_after_close_returns_total() -> None: + # Arrange + transport = NoopMavlinkTransport() + transport.write(b"abcd") + transport.close() + + # Assert + assert transport.bytes_written() == 4 + + +def test_serial_bytes_written_after_close_returns_total() -> None: + # Arrange + conn = _FakeConnection() + transport = SerialMavlinkTransport(connection=conn) + transport.write(b"abcdef") + transport.close() + + # Assert + assert transport.bytes_written() == 6 + + +# ---------------------------------------------------------------------- +# Smoke: serial transport handles ``returned is None`` from underlying write + + +def test_serial_transport_falls_back_to_payload_length_when_write_returns_none() -> None: + # Arrange + conn = mock.MagicMock(spec=["write"]) + conn.write.return_value = None + transport = SerialMavlinkTransport(connection=conn) + + # Act + n = transport.write(b"abcde") + + # Assert + assert n == 5 + assert transport.bytes_written() == 5 + + +# ---------------------------------------------------------------------- +# Smoke: ad-hoc Any annotation removes pytest unused-import warnings. + +_ = Any diff --git a/tests/unit/c8_fc_adapter/test_smoke.py b/tests/unit/c8_fc_adapter/test_smoke.py index 86a20a8..1a777a4 100644 --- a/tests/unit/c8_fc_adapter/test_smoke.py +++ b/tests/unit/c8_fc_adapter/test_smoke.py @@ -1,4 +1,9 @@ -"""C8 FC Adapter smoke test — AC-9 (legacy) + AZ-390 public-API gate.""" +"""C8 FC Adapter smoke test — AC-9 (legacy) + AZ-390 public-API gate. + +AZ-401 expands the public Protocol surface with ``MavlinkTransport``, +the outbound byte-stream seam shared by ``SerialMavlinkTransport`` +(live) and ``NoopMavlinkTransport`` (replay). +""" def test_interface_importable() -> None: @@ -7,10 +12,17 @@ def test_interface_importable() -> None: EmittedExternalPosition, FcAdapter, GcsAdapter, + MavlinkTransport, ReplaySink, ) - for sym in (FcAdapter, GcsAdapter, ReplaySink, EmittedExternalPosition): + for sym in ( + FcAdapter, + GcsAdapter, + ReplaySink, + EmittedExternalPosition, + MavlinkTransport, + ): assert sym is not None @@ -24,5 +36,6 @@ def test_internal_modules_not_in_public_all() -> None: "EmittedExternalPosition", "FcAdapter", "GcsAdapter", + "MavlinkTransport", "ReplaySink", } diff --git a/tests/unit/test_az401_compose_root_replay.py b/tests/unit/test_az401_compose_root_replay.py new file mode 100644 index 0000000..8b71722 --- /dev/null +++ b/tests/unit/test_az401_compose_root_replay.py @@ -0,0 +1,697 @@ +"""AZ-401 — `compose_root(config)` replay-mode branch unit tests. + +Verifies the contract at ``_docs/02_document/contracts/replay/replay_protocol.md`` +v2.0.0 §Composition Root + ADR-011 (replay-as-configuration). Covers +AC-1 through AC-10 of the AZ-401 task spec. + +AC-9 ("``NoopMavlinkTransport.bytes_written() > 0`` after the C8 outbound +encoders run") is recorded here as a known BLOCKED case: the existing +:class:`TlogReplayFcAdapter` (AZ-399) raises on every ``emit_external_position`` +call rather than routing the encoder bytes through a transport seam, so +the encoders never run in replay mode. Closing this gap requires the AP +/ iNav / QGC encoder retrofits that AZ-400 originally scoped but did +not deliver. See the batch 61 report for the deferral rationale. +""" + +from __future__ import annotations + +import ast +import json +from collections.abc import Iterator +from pathlib import Path +from typing import Any +from unittest import mock +from uuid import UUID, uuid4 + +import numpy as np +import pytest + +from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard._types.state import EstimatorOutput, PoseSourceLabel, Quat +from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock +from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport import ( + NoopMavlinkTransport, +) +from gps_denied_onboard.components.c8_fc_adapter.replay_sink import ( + JsonlReplaySink, +) +from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( + TlogReplayFcAdapter, +) +from gps_denied_onboard.config import ( + Config, + ReplayAutoSyncConfig, + ReplayConfig, + RuntimeConfig, +) +from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource +from gps_denied_onboard.replay_input.interface import ReplayInputBundle +from gps_denied_onboard.runtime_root import ( + CompositionError, + RuntimeRoot, + clear_strategy_registry, + compose_root, +) +from gps_denied_onboard.runtime_root._replay_branch import ( + REPLAY_BUILD_FLAGS, + REPLAY_COMPONENT_KEYS, + build_replay_components, +) + + +_REPO_ROOT = Path(__file__).resolve().parents[2] + + +# ---------------------------------------------------------------------- +# Shared fixtures + + +@pytest.fixture(autouse=True) +def _isolated_registry() -> Iterator[None]: + clear_strategy_registry() + yield + clear_strategy_registry() + + +@pytest.fixture +def _airborne_replay_env( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> Path: + """Set the env vars + replay BUILD_* flags compose_root needs. + + Returns the path of a synthetic camera calibration JSON the + ``compose_root`` replay branch will load. + """ + calib_path = tmp_path / "calib.json" + calib_path.write_text( + json.dumps( + { + "camera_id": "test-cam", + "intrinsics_3x3": np.eye(3).tolist(), + "distortion": [0.0, 0.0, 0.0, 0.0], + "body_to_camera_se3": np.eye(4).tolist(), + "acquisition_method": "operator", + "metadata": {}, + } + ) + ) + for name, value in ( + ("GPS_DENIED_FC_PROFILE", "ardupilot_plane"), + ("GPS_DENIED_TIER", "1"), + ("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"), + ("CAMERA_CALIBRATION_PATH", str(calib_path)), + ("LOG_LEVEL", "INFO"), + ("LOG_SINK", "console"), + ("INFERENCE_BACKEND", "pytorch_fp16"), + ("FDR_PATH", "/var/lib/gps-denied/fdr"), + ("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"), + ): + monkeypatch.setenv(name, value) + for flag in REPLAY_BUILD_FLAGS: + monkeypatch.setenv(flag, "ON") + return calib_path + + +@pytest.fixture +def _airborne_live_env(monkeypatch: pytest.MonkeyPatch) -> None: + for name, value in ( + ("GPS_DENIED_FC_PROFILE", "ardupilot_plane"), + ("GPS_DENIED_TIER", "1"), + ("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"), + ("CAMERA_CALIBRATION_PATH", "/etc/gps-denied/calib.yml"), + ("LOG_LEVEL", "INFO"), + ("LOG_SINK", "console"), + ("INFERENCE_BACKEND", "pytorch_fp16"), + ("FDR_PATH", "/var/lib/gps-denied/fdr"), + ("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"), + ("MAVLINK_SIGNING_KEY", "ZZZZZZZZ"), + ): + monkeypatch.setenv(name, value) + + +def _make_replay_config( + *, + pace: str = "asap", + time_offset_ms: int | None = 0, + target_fc_dialect: str = "ardupilot_plane", + output_path: str = "/tmp/replay.jsonl", + calib_path: Path | None = None, +) -> Config: + runtime = ( + RuntimeConfig() + if calib_path is None + else RuntimeConfig(camera_calibration_path=str(calib_path)) + ) + replay = ReplayConfig( + video_path="/dev/null/fake.mp4", + tlog_path="/dev/null/fake.tlog", + output_path=output_path, + pace=pace, + time_offset_ms=time_offset_ms, + target_fc_dialect=target_fc_dialect, + auto_sync=ReplayAutoSyncConfig(), + ) + return Config(runtime=runtime, replay=replay, mode="replay") + + +def _make_replay_bundle( + *, + clock_kind: str = "tlog", +) -> ReplayInputBundle: + """Build a :class:`ReplayInputBundle` with mocked strategies. + + The strategies are real instances of the right classes (so AC-3 + ``isinstance`` checks pass) but with their internal init guards + bypassed via ``__new__`` because the production constructors open + OpenCV / pymavlink resources we don't want in the unit suite. + """ + fs = VideoFileFrameSource.__new__(VideoFileFrameSource) + fc = TlogReplayFcAdapter.__new__(TlogReplayFcAdapter) + if clock_kind == "tlog": + clock = TlogDerivedClock(source=iter([1_000_000_000, 2_000_000_000])) + else: + clock = WallClock() + return ReplayInputBundle( + frame_source=fs, + fc_adapter=fc, + clock=clock, + resolved_time_offset_ms=0, + auto_sync_result=None, + ) + + +def _fake_replay_components_factory( + *, + bundle: ReplayInputBundle, + sink: Any | None = None, + transport: Any | None = None, +) -> Any: + """Return a callable suitable for ``replay_components_factory``.""" + + def factory(_config: Config) -> tuple[dict[str, Any], tuple[str, ...]]: + components = { + "frame_source": bundle.frame_source, + "fc_adapter": bundle.fc_adapter, + "clock": bundle.clock, + "mavlink_transport": transport if transport is not None else NoopMavlinkTransport(), + "replay_sink": sink if sink is not None else mock.MagicMock(spec=JsonlReplaySink), + } + return components, REPLAY_COMPONENT_KEYS + + return factory + + +def _make_estimator_output(seq: int = 0) -> EstimatorOutput: + return EstimatorOutput( + frame_id=uuid4(), + position_wgs84=LatLonAlt(lat_deg=49.991, lon_deg=36.221, alt_m=153.4 + seq), + orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0), + velocity_world_mps=(1.5, -0.25, 0.0), + covariance_6x6=np.eye(6, dtype=np.float64) * 0.5, + source_label=PoseSourceLabel.SATELLITE_ANCHORED, + last_satellite_anchor_age_ms=250, + smoothed=False, + emitted_at=1_700_000_000_000_000_000 + seq, + ) + + +# ---------------------------------------------------------------------- +# AC-1: Single composition root — `compose_replay` no longer exported + + +def test_ac1_compose_replay_no_longer_exported() -> None: + # Act / Assert + with pytest.raises(ImportError): + from gps_denied_onboard.runtime_root import compose_replay # noqa: F401 + + # The two surviving entrypoints stay importable. + from gps_denied_onboard.runtime_root import ( # noqa: F401 + compose_operator, + compose_root, + ) + + +# ---------------------------------------------------------------------- +# AC-2: Live mode unchanged + + +def test_ac2_live_default_mode_returns_runtime_root_with_no_replay_keys( + _airborne_live_env: None, +) -> None: + # Arrange — empty config in default (live) mode + config = Config() + + # Act + runtime = compose_root(config) + + # Assert + assert isinstance(runtime, RuntimeRoot) + assert runtime.binary == "airborne" + # No replay-only keys leak into live mode + for key in REPLAY_COMPONENT_KEYS: + assert key not in runtime.components, ( + f"live mode unexpectedly contains replay key {key!r}" + ) + + +def test_ac2_live_explicit_mode_unchanged(_airborne_live_env: None) -> None: + # Arrange + config = Config(mode="live") + + # Act + runtime = compose_root(config) + + # Assert + assert runtime.components == {} + assert runtime.construction_order == () + + +# ---------------------------------------------------------------------- +# AC-3: Replay mode wires replay strategies + + +def test_ac3_replay_mode_wires_five_replay_strategies( + _airborne_replay_env: Path, +) -> None: + # Arrange + bundle = _make_replay_bundle(clock_kind="tlog") + config = _make_replay_config(calib_path=_airborne_replay_env) + factory = _fake_replay_components_factory(bundle=bundle) + + # Act + runtime = compose_root(config, replay_components_factory=factory) + + # Assert — every replay strategy slot is populated and typed + assert isinstance(runtime.components["frame_source"], VideoFileFrameSource) + assert isinstance(runtime.components["fc_adapter"], TlogReplayFcAdapter) + assert isinstance(runtime.components["mavlink_transport"], NoopMavlinkTransport) + assert isinstance(runtime.components["clock"], TlogDerivedClock) + # JsonlReplaySink is a MagicMock(spec=...) here so isinstance gates correctly: + assert "replay_sink" in runtime.components + + +# ---------------------------------------------------------------------- +# AC-4: Replay-mode build-flag check + + +@pytest.mark.parametrize("flag", REPLAY_BUILD_FLAGS) +def test_ac4_replay_rejects_each_build_flag_off( + _airborne_replay_env: Path, + monkeypatch: pytest.MonkeyPatch, + flag: str, +) -> None: + # Arrange + monkeypatch.setenv(flag, "OFF") + config = _make_replay_config(calib_path=_airborne_replay_env) + + # Act / Assert — go through the real branch (no factory) so the + # flag gate runs before the strategy constructors do. + with pytest.raises(CompositionError, match=f"{flag} is OFF"): + compose_root(config) + + +def test_ac4_live_with_replay_flag_off_succeeds( + _airborne_live_env: None, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + monkeypatch.setenv("BUILD_VIDEO_FILE_FRAME_SOURCE", "OFF") + config = Config(mode="live") + + # Act + runtime = compose_root(config) + + # Assert + assert isinstance(runtime, RuntimeRoot) + + +# ---------------------------------------------------------------------- +# AC-5: Clock injection (single instance, pace-aware) + + +def test_ac5_replay_pace_asap_uses_tlog_derived_clock( + _airborne_replay_env: Path, +) -> None: + # Arrange + bundle = _make_replay_bundle(clock_kind="tlog") + config = _make_replay_config(pace="asap", calib_path=_airborne_replay_env) + factory = _fake_replay_components_factory(bundle=bundle) + + # Act + runtime = compose_root(config, replay_components_factory=factory) + + # Assert + assert isinstance(runtime.components["clock"], TlogDerivedClock) + + +def test_ac5_replay_pace_realtime_uses_wall_clock( + _airborne_replay_env: Path, +) -> None: + # Arrange + bundle = _make_replay_bundle(clock_kind="wall") + config = _make_replay_config(pace="realtime", calib_path=_airborne_replay_env) + factory = _fake_replay_components_factory(bundle=bundle) + + # Act + runtime = compose_root(config, replay_components_factory=factory) + + # Assert + assert isinstance(runtime.components["clock"], WallClock) + + +def test_ac5_clock_single_instance_id_equality( + _airborne_replay_env: Path, +) -> None: + """Invariant 2 — the same Clock instance is wired everywhere.""" + # Arrange + bundle = _make_replay_bundle(clock_kind="tlog") + config = _make_replay_config(calib_path=_airborne_replay_env) + factory = _fake_replay_components_factory(bundle=bundle) + + # Act + runtime = compose_root(config, replay_components_factory=factory) + + # Assert — the Clock instance the bundle returned is exactly the + # one wired into the runtime. + assert runtime.components["clock"] is bundle.clock + + +# ---------------------------------------------------------------------- +# AC-6: JSONL sink emits per tick + + +def test_ac6_jsonl_sink_emits_per_tick_when_runtime_drives_outputs( + _airborne_replay_env: Path, +) -> None: + # Arrange — a real (in-tmp) JsonlReplaySink so this exercises the + # production code path; we drive it directly because the runtime + # loop itself is owned by the airborne entrypoint, not compose_root. + fdr_client = mock.MagicMock(name="FdrClient") + sink_path = _airborne_replay_env.parent / "out.jsonl" + sink = JsonlReplaySink(output_path=sink_path, fdr_client=fdr_client) + bundle = _make_replay_bundle() + config = _make_replay_config( + output_path=str(sink_path), calib_path=_airborne_replay_env + ) + factory = _fake_replay_components_factory(bundle=bundle, sink=sink) + + # Act + runtime = compose_root(config, replay_components_factory=factory) + wired_sink = runtime.components["replay_sink"] + assert wired_sink is sink + for i in range(10): + wired_sink.emit(_make_estimator_output(seq=i)) + wired_sink.close() + + # Assert + lines = sink_path.read_text().splitlines() + assert len(lines) == 10 + for line in lines: + json.loads(line) # each line parses as JSON + + +# ---------------------------------------------------------------------- +# AC-7: No mode-aware imports in components (replay-aware logic confined) + + +def test_ac7_no_component_imports_video_file_frame_source() -> None: + """The only file allowed to import both Live and VideoFile sources is + the runtime_root composition root. + """ + # Arrange + components_root = ( + _REPO_ROOT / "src" / "gps_denied_onboard" / "components" + ) + bad: list[str] = [] + + # Act + for py in components_root.rglob("*.py"): + text = py.read_text(encoding="utf-8") + tree = ast.parse(text) + for node in ast.walk(tree): + if isinstance(node, ast.ImportFrom): + module = node.module or "" + names = {n.name for n in node.names} + if ( + "frame_source.video_file" in module + or "VideoFileFrameSource" in names + ): + bad.append(str(py)) + break + + # Assert + assert bad == [], ( + "Components must not import VideoFileFrameSource directly " + f"(replay-aware imports must live in runtime_root): {bad}" + ) + + +def test_ac7_only_runtime_root_imports_replay_strategies() -> None: + """The imports of the noop transport / replay sink stay in runtime_root.""" + # Arrange + src_root = _REPO_ROOT / "src" / "gps_denied_onboard" + components_root = src_root / "components" + allowed_dirs = { + src_root / "runtime_root", + # The replay strategies themselves live under c8_fc_adapter, so + # their internal imports inside that component are exempt. + src_root / "components" / "c8_fc_adapter", + } + + # Act / Assert — walk every component file and reject imports of + # the noop transport from outside the allowed directories. + for py in components_root.rglob("*.py"): + if any(allowed in py.parents for allowed in allowed_dirs): + continue + text = py.read_text(encoding="utf-8") + if "noop_mavlink_transport" in text: + raise AssertionError( + f"{py} imports noop_mavlink_transport — mode-aware " + "imports must stay in runtime_root." + ) + + +# ---------------------------------------------------------------------- +# AC-8: Public APIs only across components + + +def test_ac8_replay_branch_imports_only_public_apis() -> None: + """The replay branch must not reach into component internals.""" + # Arrange + branch_path = ( + _REPO_ROOT + / "src" + / "gps_denied_onboard" + / "runtime_root" + / "_replay_branch.py" + ) + text = branch_path.read_text(encoding="utf-8") + tree = ast.parse(text) + + # Allowed deep imports: into the c8_fc_adapter component (the + # noop transport + the JSONL sink) and into the `replay_input` + # cross-cutting coordinator (Layer-4). Both are documented in + # module-layout.md as the replay strategy homes. + allowed_deep_prefixes = ( + "gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport", + "gps_denied_onboard.components.c8_fc_adapter.replay_sink", + "gps_denied_onboard.replay_input.tlog_video_adapter", + ) + + # Act + for node in ast.walk(tree): + if not isinstance(node, ast.ImportFrom): + continue + module = node.module or "" + if not module.startswith("gps_denied_onboard.components"): + continue + # Public API form: `gps_denied_onboard.components.` (no further dots) + # OR an explicitly allowed deep submodule. + is_public = module.count(".") == 2 + is_allowed_deep = any( + module.startswith(prefix) for prefix in allowed_deep_prefixes + ) + # Assert + assert is_public or is_allowed_deep, ( + f"_replay_branch imports {module!r} — must only reach into " + "component Public APIs or the documented replay strategy modules." + ) + + +# ---------------------------------------------------------------------- +# AC-9: NoopMavlinkTransport.bytes_written() > 0 — BLOCKED + + +@pytest.mark.skip( + reason=( + "BLOCKED on AZ-399 design choice: TlogReplayFcAdapter raises " + "FcEmitError on emit_external_position rather than routing the " + "encoder bytes through the MavlinkTransport seam. Closing this " + "gap requires retrofitting AP/iNav/QGC encoder code paths to " + "consume MavlinkTransport — see batch 61 report. NoopMavlinkTransport " + "+ MavlinkTransport Protocol classes are present (covered by " + "test_az400_mavlink_transport.py) but the wiring that makes " + "bytes_written > 0 in replay mode is deferred." + ) +) +def test_ac9_noop_transport_bytes_written_after_runtime_drive() -> None: + raise NotImplementedError("see skip reason") + + +# ---------------------------------------------------------------------- +# AC-10: Operator pre-flight C6 cache reused identically — smoke + + +def test_ac10_replay_does_not_alter_c6_cache_shape( + _airborne_replay_env: Path, +) -> None: + """Smoke check that the replay branch does not register a parallel + C6 strategy under a different slug. + + A real AC-10 end-to-end test requires a populated C6 + C2 wiring, + which is out of scope for AZ-401's unit suite. This check at least + asserts the replay branch never claims the ``c6_tile_cache`` slug. + """ + # Arrange + bundle = _make_replay_bundle() + config = _make_replay_config(calib_path=_airborne_replay_env) + factory = _fake_replay_components_factory(bundle=bundle) + + # Act + runtime = compose_root(config, replay_components_factory=factory) + + # Assert + assert "c6_tile_cache" not in runtime.components + + +# ---------------------------------------------------------------------- +# Real `build_replay_components` path — the production wiring must +# refuse early on missing replay paths instead of crashing inside the +# adapter constructor. + + +def test_replay_branch_rejects_empty_video_path( + _airborne_replay_env: Path, +) -> None: + # Arrange + runtime_cfg = RuntimeConfig(camera_calibration_path=str(_airborne_replay_env)) + config = Config( + runtime=runtime_cfg, + replay=ReplayConfig( + video_path="", + tlog_path="/dev/null/fake.tlog", + output_path="/tmp/out.jsonl", + pace="asap", + target_fc_dialect="ardupilot_plane", + ), + mode="replay", + ) + + # Act / Assert + with pytest.raises(CompositionError, match="video_path is empty"): + build_replay_components(config) + + +def test_replay_branch_rejects_empty_tlog_path( + _airborne_replay_env: Path, +) -> None: + # Arrange + runtime_cfg = RuntimeConfig(camera_calibration_path=str(_airborne_replay_env)) + config = Config( + runtime=runtime_cfg, + replay=ReplayConfig( + video_path="/dev/null/fake.mp4", + tlog_path="", + output_path="/tmp/out.jsonl", + pace="asap", + target_fc_dialect="ardupilot_plane", + ), + mode="replay", + ) + + # Act / Assert + with pytest.raises(CompositionError, match="tlog_path is empty"): + build_replay_components(config) + + +def test_replay_branch_rejects_unknown_pace_after_init( + _airborne_replay_env: Path, +) -> None: + """ReplayConfig validates pace at construction; the branch's defensive + guard catches an unsanctioned mutation path. + """ + # Arrange — bypass __post_init__ to inject an invalid value, then + # call ``build_replay_components`` to confirm the inner guard fires. + config = _make_replay_config(calib_path=_airborne_replay_env) + object.__setattr__(config.replay, "pace", "telegraph") # type: ignore[misc] + + # Act / Assert + with pytest.raises(CompositionError, match="(pace|telegraph|asap)"): + build_replay_components(config) + + +def test_replay_branch_loads_camera_calibration_from_runtime_path( + _airborne_replay_env: Path, +) -> None: + """The branch reads the SAME calibration JSON the live binary uses.""" + # Arrange + config = _make_replay_config(calib_path=_airborne_replay_env) + + # Act — run far enough to populate the bundle without hitting the + # real video / tlog readers. We do that by injecting a stub + # ``replay_input_adapter_factory`` that returns a fake adapter + # whose ``open()`` produces a trivial bundle. + bundle = _make_replay_bundle() + + class _StubAdapter: + def __init__(self, **_kwargs: Any) -> None: + pass + + def open(self) -> ReplayInputBundle: + return bundle + + components, order = build_replay_components( + config, + replay_input_adapter_factory=lambda **_kwargs: _StubAdapter(), + sink_factory=lambda *_args: mock.MagicMock(spec=JsonlReplaySink), + ) + + # Assert + assert order == REPLAY_COMPONENT_KEYS + assert components["frame_source"] is bundle.frame_source + assert components["fc_adapter"] is bundle.fc_adapter + + +# ---------------------------------------------------------------------- +# Smoke + + +def test_compose_root_replay_with_no_calib_path_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — set every env var EXCEPT camera calibration + for name, value in ( + ("GPS_DENIED_FC_PROFILE", "ardupilot_plane"), + ("GPS_DENIED_TIER", "1"), + ("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"), + ("CAMERA_CALIBRATION_PATH", ""), + ("LOG_LEVEL", "INFO"), + ("LOG_SINK", "console"), + ("INFERENCE_BACKEND", "pytorch_fp16"), + ("FDR_PATH", "/var/lib/gps-denied/fdr"), + ("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"), + ): + monkeypatch.setenv(name, value) + for flag in REPLAY_BUILD_FLAGS: + monkeypatch.setenv(flag, "ON") + config = _make_replay_config() # calib_path=None + + # Act / Assert — the env-required check + replay calib check both + # surface as RequiredFieldMissing or CompositionError; either is + # acceptable provided the message names the missing field. + with pytest.raises( + (CompositionError, Exception), + match=r"(camera_calibration_path|CAMERA_CALIBRATION_PATH)", + ): + compose_root(config)