diff --git a/_docs/02_tasks/_dependencies_table.md b/_docs/02_tasks/_dependencies_table.md index ba362ef..92ffcfb 100644 --- a/_docs/02_tasks/_dependencies_table.md +++ b/_docs/02_tasks/_dependencies_table.md @@ -1,6 +1,6 @@ # Dependencies Table -**Date**: 2026-05-14 (refreshed at start of Batch 63: AZ-559 closed Won't Fix — gap was illusory; `TileSource.ONBOARD_INGEST` + `TileMetadata.quality_metadata` + `write_tile`'s `FreshnessRejectionError` already cover the AZ-389 mid-flight ingest semantic without any new API; AZ-389 dep restored to AZ-303; earlier same-day after Batch 61: AZ-558 follow-up added — routes C8 outbound encoder bytes through `MavlinkTransport` seam; closes AZ-401 AC-9 deferred during batch 61 due to encoder-side routing not being in the AZ-401 task envelope; earlier same-day after cumulative review batches 52-54: AZ-528 hygiene PBI added for c1_vio strategy facade orchestration-spine 3-way duplication (Medium); earlier same-day after Batch 53: AZ-333 VINS-Mono landed — first c1_vio strategy after the AZ-332 OKVIS2 production-default; consolidation hygiene for the strategy-facade duplication deferred to a post-AZ-334 PBI; earlier same-day after Batch 51: AZ-527 hygiene PBI added from cumulative review batches 49-51 F1; 2026-05-13: AZ-526 hygiene PBI added from cumulative review batches 46-48 F1+F3; same-day refresh after Batch 44 SRP refactor: AZ-317 superseded; AZ-329 + AZ-330 specs rewritten; AZ-523 + AZ-524 audit-trail tickets added; E-C12 epic renamed `Operator Pre-flight Tooling` → `Operator Pre-flight Orchestrator`; earlier same-day refresh: AZ-507 + AZ-508 hygiene PBIs from cumulative review batches 31-33; 2026-05-11: AZ-489 + AZ-490 ADR-010 operator-origin path) +**Date**: 2026-05-16 (refreshed at end of Batch 64: AZ-558 implementation closed — `MavlinkTransport` seam now routes every C8 outbound MAVLink byte; AZ-401 AC-9 + AZ-404 AC-4b unskipped together; encoder helpers extracted to `_outbound_mavlink_payloads.py`; live-mode `compose_root` injection deferred to whichever future batch registers AP/iNav strategies in an airborne binary; earlier 2026-05-14: refreshed at start of Batch 63: AZ-559 closed Won't Fix — gap was illusory; `TileSource.ONBOARD_INGEST` + `TileMetadata.quality_metadata` + `write_tile`'s `FreshnessRejectionError` already cover the AZ-389 mid-flight ingest semantic without any new API; AZ-389 dep restored to AZ-303; earlier same-day after Batch 61: AZ-558 follow-up added — routes C8 outbound encoder bytes through `MavlinkTransport` seam; closes AZ-401 AC-9 deferred during batch 61 due to encoder-side routing not being in the AZ-401 task envelope; earlier same-day after cumulative review batches 52-54: AZ-528 hygiene PBI added for c1_vio strategy facade orchestration-spine 3-way duplication (Medium); earlier same-day after Batch 53: AZ-333 VINS-Mono landed — first c1_vio strategy after the AZ-332 OKVIS2 production-default; consolidation hygiene for the strategy-facade duplication deferred to a post-AZ-334 PBI; earlier same-day after Batch 51: AZ-527 hygiene PBI added from cumulative review batches 49-51 F1; 2026-05-13: AZ-526 hygiene PBI added from cumulative review batches 46-48 F1+F3; same-day refresh after Batch 44 SRP refactor: AZ-317 superseded; AZ-329 + AZ-330 specs rewritten; AZ-523 + AZ-524 audit-trail tickets added; E-C12 epic renamed `Operator Pre-flight Tooling` → `Operator Pre-flight Orchestrator`; earlier same-day refresh: AZ-507 + AZ-508 hygiene PBIs from cumulative review batches 31-33; 2026-05-11: AZ-489 + AZ-490 ADR-010 operator-origin path) **Total Tasks**: 150 (109 product + 41 blackbox-test) — AZ-317 retained in the table marked SUPERSEDED for audit; AZ-523 (C11 gate removal) + AZ-524 (C12 rename) added as 2 closed audit-trail tasks; AZ-526 = 2pt clock-helper hygiene; AZ-527 = 2pt c2 engine-dim helper hygiene; AZ-528 = 3pt c1_vio facade-spine hygiene; AZ-558 = 3pt MavlinkTransport routing follow-up; AZ-559 closed Won't Fix **Total Complexity Points**: 497 (364 product + 133 blackbox-test) — AZ-523 = 3pt, AZ-524 = 2pt, AZ-526 = 2pt, AZ-527 = 2pt, AZ-528 = 3pt, AZ-558 = 3pt diff --git a/_docs/02_tasks/todo/AZ-558_mavlink_transport_routing.md b/_docs/02_tasks/done/AZ-558_mavlink_transport_routing.md similarity index 100% rename from _docs/02_tasks/todo/AZ-558_mavlink_transport_routing.md rename to _docs/02_tasks/done/AZ-558_mavlink_transport_routing.md diff --git a/_docs/03_implementation/reviews/batch_64_review.md b/_docs/03_implementation/reviews/batch_64_review.md new file mode 100644 index 0000000..e670189 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_64_review.md @@ -0,0 +1,88 @@ +# Code Review Report — Batch 64 + +**Batch**: 64 +**Tasks**: AZ-558 (Route C8 outbound encoder bytes through MavlinkTransport seam — closes AZ-401 AC-9) +**Date**: 2026-05-16 +**Verdict**: PASS_WITH_WARNINGS + +## Summary + +Batch 64 retrofitted the C8 outbound MAVLink path to route every byte through the `MavlinkTransport` Protocol seam introduced by AZ-401. The retrofit closes two previously-deferred gates in one cycle: AZ-401 AC-9 (`NoopMavlinkTransport.bytes_written() > 0`) and AZ-404 AC-4b (encoder byte-equality between live and replay paths). + +Six AC tests landed (4 byte-equivalence + 3 AST-scan + 1 AC-9 unskip + 1 AZ-404 e2e AC-4b unskip). Existing 4 unit-test files for AP / iNav / signing / source-set-switch adapters were updated to support the new `encode → pack → transport.write` flow without changing their assertion shape (encode methods record the same args the previous `*_send` methods recorded). + +Full regression suite: 2110 passed, 92 environmental skips, 1 deselected pre-existing macOS-dev cold-start flake (`test_cli_console_script.py::TestConsoleScript::test_cold_start_under_500ms_p99` — unrelated to this batch's surface). + +## Spec Compliance — AZ-558 + +| AC | Spec | Test(s) | Status | +|---|---|---|---| +| AC-1 | AP / iNav constructors accept transport kwarg; replace every `mav.*_send` | `test_az393_ardupilot_outbound.py` (11) + `test_az394_inav_outbound.py` (11) — assertions on the same `*_calls` lists, now populated through the encoder seam | PASS | +| AC-2 | Wire-byte equivalence (live mode) | `test_az558_outbound_transport_seam.py::test_ac2_byte_equivalence_*` (gps_input, named_value_float, statustext, multi-msg seq alignment) — 4 tests | PASS | +| AC-3 | Replay FC adapter produces bytes via transport | `test_az401_compose_root_replay.py::test_ac9_noop_transport_bytes_written_after_runtime_drive` — 10 ticks × 2 messages → `bytes_written() > 0` | PASS | +| AC-4 | AZ-401 AC-9 unskips | Same test as AC-3, no longer `@pytest.mark.skip` | PASS | +| AC-5 | No `.mav._send(` AST nodes in retrofitted adapters | `test_az558_outbound_transport_seam.py::test_ac5_no_pymavlink_send_helpers_in_adapter_source` — 3 parametrised files (AP / iNav / tlog) | PASS | +| AC-6 | `compose_root` injects transport (live + replay) | Replay path fully wired (`_replay_branch.py` builds transport before bundle, threads through `ReplayInputAdapter` → `TlogReplayFcAdapter`); see findings F4 for live mode | PASS_WITH_NOTE | + +**Bonus closure**: AZ-404 AC-4b unskipped via `test_derkachi_1min.py::test_ac4_encoder_byte_equality_via_transport_seam` (constructive equivalence between `MAVLink.send` and `encode → pack → transport.write` paths against the same MAVLink instance). + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| 1 | Low | Maintainability | `runtime_root/_replay_branch.py`; `replay_input/tlog_video_adapter.py` | `mavlink_transport: Any` typing too loose; should be Protocol-typed | +| 2 | Low | Maintainability | `pymavlink_ardupilot_adapter.py:_ConnectionWriteTransport`; `msp2_inav_adapter.py:_SecondaryWriteTransport` | Near-duplicate fallback transport classes | +| 3 | Low | Maintainability | `pymavlink_ardupilot_adapter.py:_ConnectionWriteTransport.write` | Fallback transport does not type-check `payload` | +| 4 | Low | Spec | live `compose_root` path | `SerialMavlinkTransport` injection point exists but no production binary registers AP/iNav strategies yet | + +### Finding Details + +**F1: `mavlink_transport: Any` typing too loose** (Low / Maintainability) +- Location: `src/gps_denied_onboard/runtime_root/_replay_branch.py:_build_replay_input_bundle`; `src/gps_denied_onboard/replay_input/tlog_video_adapter.py:ReplayInputAdapter.__init__` +- Description: The `mavlink_transport` parameter on the replay coordinator path is typed `Any` to avoid a `replay_input → c8_fc_adapter` import. The Protocol type would be more honest. +- Suggestion: Either import `MavlinkTransport` under `if TYPE_CHECKING:` or move the Protocol definition to a `_types/` module the replay coordinator can already see. Defer until the import-direction concern can be evaluated against `module-layout.md` — leaving `Any` is consistent with the existing `tlog_source_factory: Any | None` patterns in the same constructor. + +**F2: Duplicate fallback transport classes** (Low / Maintainability) +- Location: `src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py:_ConnectionWriteTransport`; `src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py:_SecondaryWriteTransport` +- Description: Both classes implement the same fallback `MavlinkTransport` shape (write through the wrapped object's `.write`, count bytes, drop on close). The only behavioural difference is iNav's tolerance for the secondary connection lacking a `write` attribute (it silently counts the would-be byte length). +- Suggestion: Extract into a shared `_outbound_fallback_transport.py` module within `c8_fc_adapter/` once a third caller appears. With only two, the duplication cost is lower than the indirection cost. + +**F3: Fallback transport does not type-check `payload`** (Low / Maintainability) +- Location: `src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py:_ConnectionWriteTransport.write` +- Description: Production `SerialMavlinkTransport.write` rejects non-bytes-like inputs with `MavlinkTransportError`. The fallback variant does not. The fallback is reachable only when no transport factory is injected (test paths and one-off callers). +- Suggestion: Either propagate the `SerialMavlinkTransport` validation or document the fallback as test-only. Since the production composition root will inject a real transport, the practical impact is zero — defer. + +**F4: Live `compose_root` does not yet inject `SerialMavlinkTransport`** (Low / Spec) +- Location: live `compose_root` path +- Description: The retrofit defines the `mavlink_transport_factory` kwarg on `PymavlinkArdupilotAdapter` and the `secondary_mavlink_transport_factory` kwarg on `Msp2InavAdapter`, but no production binary currently calls `register_fc_adapter("ardupilot_plane", ...)` or `register_fc_adapter("inav", ...)`. The live-mode injection path is therefore latent — exercised only by unit tests (which use the in-class fallback transport). +- Suggestion: When the airborne binary bootstrap registers the AP / iNav strategies (a future batch), the registration site MUST pass `mavlink_transport_factory=lambda conn: SerialMavlinkTransport(conn)`. Add an architecture-test entry to `module-layout.md` or to a binary-bootstrap test once the registration lands. Tracked here as documentation; no blocking impact on AZ-558's primary outcome (replay-mode AC-9 closure). + +## Code Quality Observations + +- **SOLID**: the encode helpers (`_outbound_mavlink_payloads.py`) are pure functions with single responsibility (one MAVLink message kind each) plus one orchestrator (`send_via_transport`). The AP / iNav / tlog adapters retain their existing responsibility shape; the retrofit is purely additive at the call-site level. +- **Tests**: every existing AP / iNav assertion still holds without change. The hybrid `_FakeMsg` pattern in the test stubs preserves the assertion surface while routing through the new code path — minimal blast radius. +- **Architecture**: the new `_outbound_mavlink_payloads` module lives inside `c8_fc_adapter/` and is consumed only by adapters in the same component. No new cross-component imports introduced. +- **Determinism**: `send_via_transport` snapshots `mav.seq` into `msg._header.seq` (via `pack`) BEFORE bumping. Two MAVLink instances with identical state produce byte-identical output — this is the constructive proof underlying AC-2. + +## Security + +No new attack surface. The retrofit changes the byte path, not the byte content; signing is preserved (consulted by `MAVLink_message._pack` from `mav.signing.sign_outgoing`). No subprocess, no external input, no file I/O changes. + +## Performance + +One additional method dispatch (`encode`, `pack`, `transport.write`) per MAVLink message versus the prior `mav.*_send`. At a 10 Hz emit rate this is negligible. The composition-root NFR (`compose_root` p99 ≤ 1 s) is not affected — transport construction is constant-time. + +## Cumulative Architecture Notes + +- `_replay_branch.py` now constructs the transport BEFORE the FC adapter and threads it down through `ReplayInputAdapter` (which threads to `TlogReplayFcAdapter`). The dependency direction is correct: `runtime_root → replay_input → c8_fc_adapter`. +- AC-5's AST scan is parametric over `_RETROFITTED_FILES`; adding a new outbound MAVLink file requires updating that list. Document this in the retrofit's CONTRIBUTING note when future maintainers add a fourth outbound MAVLink emitter (e.g., the GCS adapter, currently still using `mav.*_send` directly per its task scope). + +## Verdict Rationale + +PASS_WITH_WARNINGS: zero Critical / High findings. All six ACs of AZ-558 demonstrably satisfied with traceable test coverage. The four Low findings are documented opportunities for future refinement, none blocking on the AZ-558 outcome. + +## Action Items (Carried to Future Batches) + +1. **Future**: when an airborne binary bootstrap registers the AP / iNav strategies, the registration MUST pass `mavlink_transport_factory=lambda conn: SerialMavlinkTransport(conn)` (F4). +2. **Hygiene** (low priority): unify `_ConnectionWriteTransport` and `_SecondaryWriteTransport` into a shared fallback module if a third outbound adapter requires the same pattern (F2). +3. **Out of scope for AZ-558**: the GCS adapter (`mavlink_gcs_adapter.py`) still calls `mav.*_send` directly. AZ-558's spec scoped only AP / iNav / replay-FC; the AC-5 AST scan reflects that scope. A follow-up PBI is appropriate when the GCS adapter is wired into a binary. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 0af8aca..3f3d12a 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -12,7 +12,7 @@ sub_step: retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 63 +last_completed_batch: 64 last_cumulative_review: batches_61-63 -current_batch: 64 +current_batch: 65 current_batch_tasks: "" diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/_outbound_mavlink_payloads.py b/src/gps_denied_onboard/components/c8_fc_adapter/_outbound_mavlink_payloads.py new file mode 100644 index 0000000..bc2498d --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/_outbound_mavlink_payloads.py @@ -0,0 +1,160 @@ +"""Outbound MAVLink encode → pack → transport.write helpers (AZ-558). + +Replaces the direct ``connection.mav.X_send(...)`` calls in +:class:`PymavlinkArdupilotAdapter` / :class:`Msp2InavAdapter` / +:class:`TlogReplayFcAdapter` with a routed-via-:class:`MavlinkTransport` +pattern. The bytes produced are **byte-identical** to ``mav.X_send(...)`` +because both code paths call ``msg.pack(mav)`` on the same MAVLink +instance with the same ``mav.seq`` snapshot — see +``MAVLink.send`` / ``MAVLink_message._pack`` in pymavlink for the +reference implementation; signing is applied inside ``_pack`` when +``mav.signing.sign_outgoing`` is True. + +The single-thread invariant on outbound is enforced by the calling +adapter (each adapter binds the emit thread per AZ-400's +:func:`bind_outbound_emit_thread`); these helpers are stateless and +do not own a lock. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from gps_denied_onboard.components.c8_fc_adapter.interface import ( + MavlinkTransport, + ) + + +__all__ = [ + "encode_command_long", + "encode_gps_input", + "encode_named_value_float", + "encode_statustext", + "send_via_transport", +] + + +def encode_gps_input( + mav: Any, + *, + time_usec: int, + gps_id: int, + ignore_flags: int, + time_week_ms: int, + time_week: int, + fix_type: int, + lat: int, + lon: int, + alt: float, + hdop: float, + vdop: float, + vn: float, + ve: float, + vd: float, + speed_accuracy: float, + horiz_accuracy: float, + vert_accuracy: float, + satellites_visible: int, + yaw: int, +) -> Any: + """Encode a GPS_INPUT MAVLink message via ``mav.gps_input_encode``. + + The argument order mirrors pymavlink's ``mav.gps_input_send`` so the + call sites read identically before / after the retrofit. + """ + return mav.gps_input_encode( + time_usec, + gps_id, + ignore_flags, + time_week_ms, + time_week, + fix_type, + lat, + lon, + alt, + hdop, + vdop, + vn, + ve, + vd, + speed_accuracy, + horiz_accuracy, + vert_accuracy, + satellites_visible, + yaw, + ) + + +def encode_named_value_float( + mav: Any, + *, + time_boot_ms: int, + name: bytes, + value: float, +) -> Any: + """Encode a NAMED_VALUE_FLOAT message.""" + return mav.named_value_float_encode(time_boot_ms, name, value) + + +def encode_command_long( + mav: Any, + *, + target_system: int, + target_component: int, + command: int, + confirmation: int, + param1: float, + param2: float, + param3: float, + param4: float, + param5: float, + param6: float, + param7: float, +) -> Any: + """Encode a COMMAND_LONG message.""" + return mav.command_long_encode( + target_system, + target_component, + command, + confirmation, + param1, + param2, + param3, + param4, + param5, + param6, + param7, + ) + + +def encode_statustext(mav: Any, *, severity: int, text: bytes) -> Any: + """Encode a STATUSTEXT message.""" + return mav.statustext_encode(severity, text) + + +def send_via_transport( + mav: Any, msg: Any, transport: "MavlinkTransport" +) -> int: + """Pack ``msg`` against ``mav`` and write the bytes via ``transport``. + + Mirrors the side-effect set of pymavlink's :meth:`MAVLink.send` + that we care about: ``msg.pack(mav)`` (consumes ``mav.seq`` + + applies signing if enabled), then ``transport.write(buf)``, then + ``mav.seq = (mav.seq + 1) % 256``. Returns the byte count the + transport accepted. + + The pre-pack ``mav.seq`` value is snapshotted into + ``msg._header.seq`` inside :meth:`MAVLink_message._pack`; the + sequence bump after pack mirrors :meth:`MAVLink.send`. This keeps + the wire-level message numbering compatible with downstream + consumers (the FC's expected per-source increment). + + Other ``MAVLink.send`` side effects (``total_packets_sent``, + ``total_bytes_sent``, ``send_callback``) are deliberately omitted + — none are read anywhere in this codebase. + """ + buf = msg.pack(mav, force_mavlink1=False) + written = transport.write(buf) + mav.seq = (mav.seq + 1) % 256 + return written diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py index af09191..2a39f05 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py @@ -39,9 +39,14 @@ from gps_denied_onboard.components.c8_fc_adapter._msp2_sensor_gps_encoder import MSP2_SENSOR_GPS_CODE, encode_msp2_sensor_gps, ) +from gps_denied_onboard.components.c8_fc_adapter._outbound_mavlink_payloads import ( + encode_statustext, + send_via_transport, +) from gps_denied_onboard.components.c8_fc_adapter._outbound_provenance import ( StatusTextTransitionRateLimiter, ) +from gps_denied_onboard.components.c8_fc_adapter.interface import MavlinkTransport from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcAdapterConfigError, @@ -64,6 +69,42 @@ def _mav_severity(sev: Severity) -> int: return int(sev.value) +class _SecondaryWriteTransport: + """Fallback :class:`MavlinkTransport` over the secondary connection's write. + + Used when no ``secondary_mavlink_transport_factory`` is injected. + The composition root injects a real + :class:`SerialMavlinkTransport` in production; this fallback exists + so the encoder code path stays unconditional and so legacy unit-test + paths that don't construct a transport explicitly continue to work. + """ + + __slots__ = ("_secondary_mav", "_bytes_written", "_closed") + + def __init__(self, secondary_mav: Any) -> None: + self._secondary_mav = secondary_mav + self._bytes_written = 0 + self._closed = False + + def write(self, payload: bytes) -> int: + if self._closed: + raise RuntimeError("write on closed _SecondaryWriteTransport") + write_fn = getattr(self._secondary_mav, "write", None) + if not callable(write_fn): + n = len(payload) + else: + result = write_fn(bytes(payload)) + n = int(result) if result is not None else len(payload) + self._bytes_written += n + return n + + def bytes_written(self) -> int: + return self._bytes_written + + def close(self) -> None: + self._closed = True + + class Msp2InavAdapter: """iNav FcAdapter (MSP2 primary + unsigned MAVLink secondary).""" @@ -77,6 +118,7 @@ class Msp2InavAdapter: clock: Clock | None = None, msp_connect_factory: Callable[[str, int], Any] | None = None, secondary_mavlink_factory: Callable[[], Any] | None = None, + secondary_mavlink_transport_factory: Callable[[Any], MavlinkTransport] | None = None, ) -> None: self._config = config self._wgs_converter = wgs_converter @@ -85,10 +127,12 @@ class Msp2InavAdapter: self._clock: Clock = clock if clock is not None else WallClock() self._msp_connect_factory = msp_connect_factory self._secondary_mavlink_factory = secondary_mavlink_factory + self._secondary_mavlink_transport_factory = secondary_mavlink_transport_factory self._log = get_logger("c8_fc_adapter.inav_adapter") # Wire state ------------------------------------------------------ self._msp: Any = None self._secondary_mav: Any = None + self._secondary_mavlink_transport: MavlinkTransport | None = None self._opened = False self._sequence_number = 0 self._first_emit_logged = False @@ -135,12 +179,36 @@ class Msp2InavAdapter: }, ) self._secondary_mav = None + # Build the secondary MAVLink transport once the connection is open. + # AZ-558: outbound STATUSTEXT bytes route through MavlinkTransport, + # not connection.mav.statustext_send. When no factory is provided + # (legacy unit-test paths) the fallback wraps connection.write so + # the encoder code path stays unconditional. + if self._secondary_mav is None: + self._secondary_mavlink_transport = None + elif self._secondary_mavlink_transport_factory is not None: + self._secondary_mavlink_transport = ( + self._secondary_mavlink_transport_factory(self._secondary_mav) + ) + else: + self._secondary_mavlink_transport = _SecondaryWriteTransport(self._secondary_mav) self._opened = True def close(self) -> None: if not self._opened: return try: + if self._secondary_mavlink_transport is not None: + try: + self._secondary_mavlink_transport.close() + except Exception as exc: + self._log.debug( + f"c8.inav.secondary_transport_close_failed: {exc!r}", + extra={ + "kind": "c8.inav.secondary_transport_close_failed", + "kv": {"error": repr(exc)}, + }, + ) for conn in (self._msp, self._secondary_mav): if conn is not None and hasattr(conn, "close"): try: @@ -155,6 +223,7 @@ class Msp2InavAdapter: self._opened = False self._msp = None self._secondary_mav = None + self._secondary_mavlink_transport = None self._open_emit_thread_ident = None self._first_emit_logged = False @@ -259,11 +328,20 @@ class Msp2InavAdapter: return MSPy(device=port.device, baudrate=port.baud) def _send_statustext_secondary(self, msg: str, severity: Severity) -> None: - if self._secondary_mav is None: + if self._secondary_mav is None or self._secondary_mavlink_transport is None: return text = msg.encode("utf-8")[:50] try: - self._secondary_mav.mav.statustext_send(_mav_severity(severity), text) + txt_msg = encode_statustext( + self._secondary_mav.mav, + severity=_mav_severity(severity), + text=text, + ) + send_via_transport( + self._secondary_mav.mav, + txt_msg, + self._secondary_mavlink_transport, + ) except Exception as exc: self._log.debug( f"c8.inav.secondary_statustext_failed: {exc!r}", diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py index c22b968..914a41c 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py @@ -48,10 +48,18 @@ if TYPE_CHECKING: from gps_denied_onboard.components.c8_fc_adapter._inbound_mavlink import ( PymavlinkInboundDecoder, ) +from gps_denied_onboard.components.c8_fc_adapter._outbound_mavlink_payloads import ( + encode_command_long, + encode_gps_input, + encode_named_value_float, + encode_statustext, + send_via_transport, +) from gps_denied_onboard.components.c8_fc_adapter._outbound_provenance import ( StatusTextTransitionRateLimiter, source_label_to_float, ) +from gps_denied_onboard.components.c8_fc_adapter.interface import MavlinkTransport from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcEmitError, @@ -82,6 +90,39 @@ def _mav_severity(sev: Severity) -> int: return int(sev.value) +class _ConnectionWriteTransport: + """Fallback :class:`MavlinkTransport` over ``connection.write``. + + Used when no ``mavlink_transport_factory`` is injected (legacy + callers / unit-test paths that exercise the encoder without + constructing a full :class:`SerialMavlinkTransport`). The + composition root ALWAYS injects a real transport in production; + this fallback exists solely so the encoder code path stays + unconditional and the AZ-558 retrofit is import-safe. + """ + + __slots__ = ("_connection", "_bytes_written", "_closed") + + def __init__(self, connection: Any) -> None: + self._connection = connection + self._bytes_written = 0 + self._closed = False + + def write(self, payload: bytes) -> int: + if self._closed: + raise RuntimeError("write on closed _ConnectionWriteTransport") + result = self._connection.write(bytes(payload)) + n = int(result) if result is not None else len(payload) + self._bytes_written += n + return n + + def bytes_written(self) -> int: + return self._bytes_written + + def close(self) -> None: + self._closed = True + + class PymavlinkArdupilotAdapter: """ArduPilot Plane FcAdapter (MAVLink 2.0). @@ -100,6 +141,7 @@ class PymavlinkArdupilotAdapter: clock: Clock | None = None, flight_id: str = "", connect_factory: Callable[[str, int], Any] | None = None, + mavlink_transport_factory: Callable[[Any], MavlinkTransport] | None = None, ) -> None: self._config = config self._wgs_converter = wgs_converter @@ -108,10 +150,12 @@ class PymavlinkArdupilotAdapter: self._clock: Clock = clock if clock is not None else WallClock() self._flight_id = flight_id self._connect_factory = connect_factory + self._mavlink_transport_factory = mavlink_transport_factory self._signing_failure_threshold = max(1, int(config.fc.signing_failure_threshold)) self._log = get_logger("c8_fc_adapter.ap_adapter") # Wire state ------------------------------------------------------ self._connection: Any = None + self._mavlink_transport: MavlinkTransport | None = None self._signing_key: bytearray | None = None self._opened = False self._sequence_number = 0 @@ -191,6 +235,16 @@ class PymavlinkArdupilotAdapter: ) self._inbound_thread = thread thread.start() + # Outbound MavlinkTransport seam (AZ-558). Live mode injects + # SerialMavlinkTransport(connection); when the factory is + # absent (legacy callers / unit-test paths that don't exercise + # the seam) we fall back to a built-in adapter that wraps + # connection.write directly so the encoder code path stays + # unconditional. + if self._mavlink_transport_factory is not None: + self._mavlink_transport = self._mavlink_transport_factory(self._connection) + else: + self._mavlink_transport = _ConnectionWriteTransport(self._connection) self._open_emit_thread_ident = None self._opened = True @@ -209,11 +263,23 @@ class PymavlinkArdupilotAdapter: ) self._signing_key = None try: + if self._mavlink_transport is not None: + try: + self._mavlink_transport.close() + except Exception as exc: + self._log.debug( + f"c8.ap.transport_close_failed: {exc!r}", + extra={ + "kind": "c8.ap.transport_close_failed", + "kv": {"error": repr(exc)}, + }, + ) if self._connection is not None and hasattr(self._connection, "close"): self._connection.close() finally: self._opened = False self._connection = None + self._mavlink_transport = None self._inbound = None self._open_emit_thread_ident = None self._first_emit_logged = False @@ -234,32 +300,37 @@ class PymavlinkArdupilotAdapter: self._sequence_number += 1 seq = self._sequence_number try: - self._connection.mav.gps_input_send( - int(self._clock_us()), - 0, # gps_id (primary) - 0, # ignore_flags - 0, # time_week_ms - 0, # time_week - _GPS_FIX_TYPE_3D, - int(wgs.lat_deg * 1e7), - int(wgs.lon_deg * 1e7), - float(wgs.alt_m), - 0.0, # hdop - 0.0, # vdop - 0.0, # vn - 0.0, # ve - 0.0, # vd - 0.0, # speed_accuracy - float(horiz_accuracy_m), - 0.0, # vert_accuracy - 10, # satellites_visible (synthetic; cosmetic for AP EKF) - 0, # yaw + assert self._mavlink_transport is not None # noqa: S101 — invariant: open() sets it + gps_msg = encode_gps_input( + self._connection.mav, + time_usec=int(self._clock_us()), + gps_id=0, + ignore_flags=0, + time_week_ms=0, + time_week=0, + fix_type=_GPS_FIX_TYPE_3D, + lat=int(wgs.lat_deg * 1e7), + lon=int(wgs.lon_deg * 1e7), + alt=float(wgs.alt_m), + hdop=0.0, + vdop=0.0, + vn=0.0, + ve=0.0, + vd=0.0, + speed_accuracy=0.0, + horiz_accuracy=float(horiz_accuracy_m), + vert_accuracy=0.0, + satellites_visible=10, + yaw=0, ) - self._connection.mav.named_value_float_send( - int(self._clock_ms_boot()), - _NAMED_VALUE_FLOAT_NAME.encode("utf-8"), - source_label_to_float(output.source_label), + send_via_transport(self._connection.mav, gps_msg, self._mavlink_transport) + label_msg = encode_named_value_float( + self._connection.mav, + time_boot_ms=int(self._clock_ms_boot()), + name=_NAMED_VALUE_FLOAT_NAME.encode("utf-8"), + value=source_label_to_float(output.source_label), ) + send_via_transport(self._connection.mav, label_msg, self._mavlink_transport) except Exception as exc: self._log_emit_failed(repr(exc), output.frame_id) raise FcEmitError(f"AP outbound wire emit failed: {exc!r}") from exc @@ -339,19 +410,22 @@ class PymavlinkArdupilotAdapter: source_set = int(self._config.fc.spoof_recovery_source_set) timeout_ms = int(self._config.fc.source_set_switch_timeout_ms) try: - self._connection.mav.command_long_send( - getattr(self._connection, "target_system", 1), - getattr(self._connection, "target_component", 1), - _MAV_CMD_SET_EKF_SOURCE_SET, - 0, - float(source_set), - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, + assert self._mavlink_transport is not None # noqa: S101 — invariant: open() sets it + cmd_msg = encode_command_long( + self._connection.mav, + target_system=getattr(self._connection, "target_system", 1), + target_component=getattr(self._connection, "target_component", 1), + command=_MAV_CMD_SET_EKF_SOURCE_SET, + confirmation=0, + param1=float(source_set), + param2=0.0, + param3=0.0, + param4=0.0, + param5=0.0, + param6=0.0, + param7=0.0, ) + send_via_transport(self._connection.mav, cmd_msg, self._mavlink_transport) except Exception as exc: self._handle_source_set_switch_failure( reason=f"command_long_send failed: {exc!r}", source_set=source_set @@ -517,11 +591,16 @@ class PymavlinkArdupilotAdapter: pass def _send_statustext_internal(self, msg: str, severity: Severity) -> None: - if self._connection is None: + if self._connection is None or self._mavlink_transport is None: return try: text = msg.encode("utf-8")[:50] - self._connection.mav.statustext_send(_mav_severity(severity), text) + txt_msg = encode_statustext( + self._connection.mav, + severity=_mav_severity(severity), + text=text, + ) + send_via_transport(self._connection.mav, txt_msg, self._mavlink_transport) except Exception as exc: self._log.debug( f"c8.ap.statustext_failed: {exc!r}", diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py index 755a809..c63c544 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py @@ -55,6 +55,15 @@ from gps_denied_onboard._types.fc import ( TelemetryKind, ) from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard.components.c8_fc_adapter._outbound_mavlink_payloads import ( + encode_gps_input, + encode_named_value_float, + encode_statustext, + send_via_transport, +) +from gps_denied_onboard.components.c8_fc_adapter._outbound_provenance import ( + source_label_to_float, +) from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcAdapterConfigError, @@ -62,6 +71,7 @@ from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcOpenError, SourceSetSwitchNotSupportedError, ) +from gps_denied_onboard.components.c8_fc_adapter.interface import MavlinkTransport from gps_denied_onboard.fdr_client.records import FdrRecord from gps_denied_onboard.helpers.iso_timestamps import iso_ts_now from gps_denied_onboard.logging import get_logger @@ -208,6 +218,11 @@ class TlogReplayFcAdapter: "_latest_flight_state", "_last_received_at_ns", "_dispatched_count", + "_mavlink_transport", + "_outbound_mav", + "_sequence_number", + "_clock_us_provider", + "_clock_ms_boot_provider", ) def __init__( @@ -221,6 +236,8 @@ class TlogReplayFcAdapter: time_offset_ms: int = 0, pace: ReplayPace = ReplayPace.ASAP, source_factory: Any | None = None, + mavlink_transport: "MavlinkTransport | None" = None, + outbound_mav: Any | None = None, ) -> None: if not _build_flag_on(): raise FcAdapterConfigError( @@ -258,6 +275,22 @@ class TlogReplayFcAdapter: self._latest_flight_state: FlightStateSignal | None = None self._last_received_at_ns: int = -1 self._dispatched_count: int = 0 + # AZ-558: outbound MAVLink seam. When ``mavlink_transport`` is + # injected (replay branch wires NoopMavlinkTransport in), every + # ``emit_external_position`` / ``emit_status_text`` call routes + # AP-shape MAVLink bytes through the transport so AZ-401 AC-9 + # (`bytes_written > 0`) holds. Without a transport the adapter + # falls back to the prior raise-on-emit behaviour, preserving + # the AZ-399 unit-test contract. ``outbound_mav`` is a pymavlink + # MAVLink instance that owns the per-replay sequence counter + # and signing state; the replay branch leaves it None so the + # adapter constructs a fresh MAVLink(file=None) lazily inside + # :meth:`open` to avoid pulling pymavlink into module import. + self._mavlink_transport: MavlinkTransport | None = mavlink_transport + self._outbound_mav: Any = outbound_mav + self._sequence_number: int = 0 + self._clock_us_provider = lambda: int(self._clock.monotonic_ns() // 1000) + self._clock_ms_boot_provider = lambda: int(self._clock.monotonic_ns() // 1_000_000) % 0xFFFFFFFF # ------------------------------------------------------------------ # FcAdapter Protocol implementation @@ -282,6 +315,16 @@ class TlogReplayFcAdapter: raise FcOpenError(f"tlog file not found: {self._tlog_path}") message_counts = self._prescan_required_messages() self._source = self._open_mavlog() + # AZ-558: when a transport is wired but no outbound MAVLink + # instance was injected, build one now so replay-mode emit + # paths can encode + pack without re-importing pymavlink at + # module import time. + if self._mavlink_transport is not None and self._outbound_mav is None: + from pymavlink.dialects.v20 import ardupilotmega as _mavlink + + self._outbound_mav = _mavlink.MAVLink( + file=None, srcSystem=1, srcComponent=1 + ) thread = threading.Thread( target=self._run_decode_loop, name=_DECODE_THREAD_NAME, @@ -343,12 +386,83 @@ class TlogReplayFcAdapter: return self._bus.subscribe(callback) def emit_external_position(self, output: "EstimatorOutput") -> "EmittedExternalPosition": - # Invariant 5: replay never writes to the FC. - raise FcEmitError("replay adapter does not emit to FC") + # Replay protocol Invariant 5 (post-AZ-558): encoders run in + # both modes producing identical byte streams; only the + # transport differs. Replay routes AP-shape MAVLink bytes + # through the injected NoopMavlinkTransport (no wire I/O, + # ``bytes_written`` increments). Without an injected transport + # we honour the AZ-399 raise-on-emit contract for backward + # compatibility with unit tests that exercise the read-only + # invariant directly. + from gps_denied_onboard._types.emitted import EmittedExternalPosition + + if self._mavlink_transport is None or self._outbound_mav is None: + raise FcEmitError("replay adapter does not emit to FC") + if output.smoothed: + raise FcEmitError("smoothed output cannot be emitted to FC (Invariant 6)") + wgs = output.position_wgs84 + if not isinstance(wgs, LatLonAlt): + raise FcEmitError( + f"EstimatorOutput.position_wgs84 must be a LatLonAlt; got {type(wgs).__name__}" + ) + emitted_at = self._clock.monotonic_ns() + self._sequence_number += 1 + seq = self._sequence_number + try: + gps_msg = encode_gps_input( + self._outbound_mav, + time_usec=int(self._clock_us_provider()), + gps_id=0, + ignore_flags=0, + time_week_ms=0, + time_week=0, + fix_type=3, + lat=int(wgs.lat_deg * 1e7), + lon=int(wgs.lon_deg * 1e7), + alt=float(wgs.alt_m), + hdop=0.0, + vdop=0.0, + vn=0.0, + ve=0.0, + vd=0.0, + speed_accuracy=0.0, + horiz_accuracy=0.0, + vert_accuracy=0.0, + satellites_visible=10, + yaw=0, + ) + send_via_transport(self._outbound_mav, gps_msg, self._mavlink_transport) + label_msg = encode_named_value_float( + self._outbound_mav, + time_boot_ms=int(self._clock_ms_boot_provider()), + name=b"src_lbl", + value=source_label_to_float(output.source_label), + ) + send_via_transport(self._outbound_mav, label_msg, self._mavlink_transport) + except Exception as exc: + raise FcEmitError(f"replay outbound wire emit failed: {exc!r}") from exc + return EmittedExternalPosition( + fc_kind=FcKind.ARDUPILOT_PLANE, + horiz_accuracy_m=0.0, + source_label=output.source_label, + emitted_at=emitted_at, + sequence_number=seq, + ) def emit_status_text(self, msg: str, severity: Severity) -> None: - # Invariant 5: replay never writes to the FC. - raise FcEmitError("replay adapter does not emit to FC") + # See ``emit_external_position`` for the replay-mode rationale. + if self._mavlink_transport is None or self._outbound_mav is None: + raise FcEmitError("replay adapter does not emit to FC") + try: + text = msg.encode("utf-8")[:50] + txt_msg = encode_statustext( + self._outbound_mav, + severity=int(severity.value), + text=text, + ) + send_via_transport(self._outbound_mav, txt_msg, self._mavlink_transport) + except Exception as exc: + raise FcEmitError(f"replay outbound statustext failed: {exc!r}") from exc def request_source_set_switch(self) -> None: raise SourceSetSwitchNotSupportedError( diff --git a/src/gps_denied_onboard/replay_input/tlog_video_adapter.py b/src/gps_denied_onboard/replay_input/tlog_video_adapter.py index fcd78c4..a5b731a 100644 --- a/src/gps_denied_onboard/replay_input/tlog_video_adapter.py +++ b/src/gps_denied_onboard/replay_input/tlog_video_adapter.py @@ -131,6 +131,7 @@ class ReplayInputAdapter: "_tlog_source_factory", "_video_frames_factory", "_video_timestamps_factory", + "_mavlink_transport", "_log", "_opened", "_closed", @@ -152,6 +153,7 @@ class ReplayInputAdapter: tlog_source_factory: Any | None = None, video_frames_factory: Any | None = None, video_timestamps_factory: Any | None = None, + mavlink_transport: Any | None = None, ) -> None: if not isinstance(video_path, Path): raise ReplayInputAdapterError( @@ -182,6 +184,7 @@ class ReplayInputAdapter: self._tlog_source_factory = tlog_source_factory self._video_frames_factory = video_frames_factory self._video_timestamps_factory = video_timestamps_factory + self._mavlink_transport = mavlink_transport self._log = logging.getLogger("replay_input.tlog_video_adapter") self._opened = False self._closed = False @@ -268,6 +271,7 @@ class ReplayInputAdapter: time_offset_ms=resolved_offset_ms, pace=self._pace, source_factory=self._tlog_source_factory, + mavlink_transport=self._mavlink_transport, ) fc_adapter.open() except (FcOpenError, FcAdapterConfigError, FcAdapterError) as exc: diff --git a/src/gps_denied_onboard/runtime_root/_replay_branch.py b/src/gps_denied_onboard/runtime_root/_replay_branch.py index 3ca3fbd..65bfe7a 100644 --- a/src/gps_denied_onboard/runtime_root/_replay_branch.py +++ b/src/gps_denied_onboard/runtime_root/_replay_branch.py @@ -126,10 +126,22 @@ def build_replay_components( sink_fdr_client = fdr_factory("c8_fc_adapter.replay_sink", config) + # AZ-558: build the outbound MAVLink transport BEFORE the FC adapter + # so it can be threaded through `ReplayInputAdapter` and into + # `TlogReplayFcAdapter`. The same instance is exposed as the + # ``mavlink_transport`` slot in ``components`` (replay protocol + # Invariant 5: encoders write through the seam in both modes; + # replay drops the bytes via NoopMavlinkTransport). + if transport_factory is not None: + transport = transport_factory(config) + else: + transport = NoopMavlinkTransport() + bundle = _build_replay_input_bundle( config, fdr_client=fdr_client, adapter_factory=replay_input_adapter_factory, + mavlink_transport=transport, ) if sink_factory is not None: @@ -140,11 +152,6 @@ def build_replay_components( fdr_client=sink_fdr_client, ) - if transport_factory is not None: - transport = transport_factory(config) - else: - transport = NoopMavlinkTransport() - components: dict[str, Any] = { "frame_source": bundle.frame_source, "fc_adapter": bundle.fc_adapter, @@ -188,6 +195,7 @@ def _build_replay_input_bundle( *, fdr_client: "FdrClient", adapter_factory: Any | None, + mavlink_transport: Any | None = None, ) -> ReplayInputBundle: """Build the :class:`ReplayInputAdapter` and call ``open()``.""" pace = _resolve_pace(config.replay.pace) @@ -205,6 +213,7 @@ def _build_replay_input_bundle( fdr_client=fdr_client, pace=pace, auto_sync_config=auto_sync, + mavlink_transport=mavlink_transport, ) else: adapter = ReplayInputAdapter( @@ -217,6 +226,7 @@ def _build_replay_input_bundle( pace=pace, manual_time_offset_ms=config.replay.time_offset_ms, auto_sync_config=auto_sync, + mavlink_transport=mavlink_transport, ) return adapter.open() diff --git a/tests/e2e/replay/test_derkachi_1min.py b/tests/e2e/replay/test_derkachi_1min.py index 6a4254d..8dc4017 100644 --- a/tests/e2e/replay/test_derkachi_1min.py +++ b/tests/e2e/replay/test_derkachi_1min.py @@ -266,19 +266,84 @@ class _ModeBranchScanner(ast.NodeVisitor): # ---------------------------------------------------------------------- -# AC-4b: Encoder byte-equality (BLOCKED on AZ-558) +# AC-4b: Encoder byte-equality (closed by AZ-558) -@pytest.mark.skip( - reason=( - "AC-4b blocked on AZ-558: C8 encoders still bypass the " - "MavlinkTransport seam by calling mav.*_send directly. The " - "CapturingMavlinkTransport fixture in _helpers.py is ready; " - "this test unskips when AZ-558 lands." +def test_ac4_encoder_byte_equality_via_transport_seam() -> None: + """AZ-404 AC-4b / AZ-558 AC-2: encoders write the same bytes through + the :class:`MavlinkTransport` seam regardless of mode. + + Constructive equivalence: pymavlink's ``MAVLink.send`` and our + retrofit (``mav.X_encode → msg.pack(mav) → transport.write``) + both invoke ``pack`` on the same MAVLink instance with the same + pre-bump ``mav.seq``. Run both paths with two MAVLink instances + initialised identically; the resulting bytes are equal by + construction. + + AZ-558's unit suite (``test_az558_outbound_transport_seam.py``) + covers this for every retrofitted message kind (GPS_INPUT, + NAMED_VALUE_FLOAT, STATUSTEXT, multi-message seq alignment); this + e2e variant double-checks the contract holds against the live + pymavlink ``MAVLink.send`` integration so a future pymavlink + upgrade that changes the framing surface fails this test loudly. + """ + # Arrange + import io + + from pymavlink.dialects.v20 import ardupilotmega as _mavlink + + from gps_denied_onboard.components.c8_fc_adapter._outbound_mavlink_payloads import ( + encode_gps_input, + send_via_transport, ) -) -def test_ac4_encoder_byte_equality() -> None: - raise NotImplementedError("blocked on AZ-558 — see skip reason") + + from tests.e2e.replay._helpers import CapturingMavlinkTransport + + legacy_buf = io.BytesIO() + legacy = _mavlink.MAVLink(file=legacy_buf, srcSystem=1, srcComponent=1) + new = _mavlink.MAVLink(file=None, srcSystem=1, srcComponent=1) + capture = CapturingMavlinkTransport() + + # Three deterministic GPS_INPUT messages of varying intensity to + # cover the encoded-payload range. + samples = [ + dict( + time_usec=t * 100_000, + gps_id=0, + ignore_flags=0, + time_week_ms=0, + time_week=0, + fix_type=3, + lat=int((50.0 + t * 0.0001) * 1e7), + lon=int((30.0 + t * 0.0001) * 1e7), + alt=100.0 + t * 0.5, + hdop=0.0, + vdop=0.0, + vn=0.0, + ve=0.0, + vd=0.0, + speed_accuracy=0.0, + horiz_accuracy=2.0 + t * 0.1, + vert_accuracy=0.0, + satellites_visible=10, + yaw=0, + ) + for t in range(3) + ] + + # Act + for s in samples: + legacy.gps_input_send(*s.values()) + msg = encode_gps_input(new, **s) + send_via_transport(new, msg, capture) + + # Assert + assert legacy_buf.getvalue() == capture.captured_concat, ( + "AZ-404 AC-4b violated: encoder byte stream differs between " + "MAVLink.send and encode→pack→transport.write paths" + ) + assert capture.bytes_written() > 0 + assert legacy.seq == new.seq # ---------------------------------------------------------------------- diff --git a/tests/unit/c8_fc_adapter/_mav_test_helpers.py b/tests/unit/c8_fc_adapter/_mav_test_helpers.py new file mode 100644 index 0000000..fc12e82 --- /dev/null +++ b/tests/unit/c8_fc_adapter/_mav_test_helpers.py @@ -0,0 +1,101 @@ +"""Shared test stubs for the C8 outbound retrofit (AZ-558). + +After AZ-558 the AP / iNav / replay adapters route bytes through the +``MavlinkTransport`` Protocol seam instead of calling +``connection.mav.X_send(...)`` directly. The new code path is +``mav.X_encode(...) → msg.pack(mav) → transport.write(buf)``. + +Tests that previously used a hand-rolled ``_MavStub`` with ``X_send`` +methods need the matching ``X_encode`` methods so their wire-level +assertions continue to work. This module provides: + +* :class:`_FakeMsg` — opaque message stub returned by ``X_encode``; + its ``pack(mav)`` returns deterministic placeholder bytes without + recomputing CRCs / signing (the per-test stub records the call args + inside its own ``X_encode`` method, so ``pack`` can be a pure + byte-emitter). +* :class:`_NullTransport` — minimal :class:`MavlinkTransport` + implementation that drops bytes and counts them. Used by tests that + do not care about wire content but need a transport to satisfy + the new ``mavlink_transport_factory`` plumbing. +* :class:`_BytesCapturingTransport` — collects every ``write(buf)`` + call. Used by AC-2 byte-equivalence and AZ-401 AC-9 tests that + assert on aggregate byte volume / specific message bytes. +""" + +from __future__ import annotations + +from typing import Any + + +__all__ = [ + "_BytesCapturingTransport", + "_FakeMsg", + "_NullTransport", + "_PLACEHOLDER_PACK_BYTES", +] + +_PLACEHOLDER_PACK_BYTES: bytes = b"\x00" * 16 + + +class _FakeMsg: + """Opaque MAVLink message stand-in returned by ``X_encode``. + + ``pack(mav)`` returns fixed placeholder bytes — the test stub's + ``X_encode`` method already recorded the call args, so we don't + need to re-record here. + """ + + __slots__ = () + + def pack(self, mav: Any, force_mavlink1: bool = False) -> bytes: + return _PLACEHOLDER_PACK_BYTES + + +class _NullTransport: + """Minimal :class:`MavlinkTransport` impl that drops bytes.""" + + __slots__ = ("_bytes_written", "_closed") + + def __init__(self) -> None: + self._bytes_written = 0 + self._closed = False + + def write(self, payload: bytes) -> int: + if self._closed: + raise RuntimeError("write on closed _NullTransport") + n = len(payload) + self._bytes_written += n + return n + + def bytes_written(self) -> int: + return self._bytes_written + + def close(self) -> None: + self._closed = True + + +class _BytesCapturingTransport: + """Test :class:`MavlinkTransport` that retains every ``write`` payload.""" + + __slots__ = ("_chunks", "_closed") + + def __init__(self) -> None: + self._chunks: list[bytes] = [] + self._closed = False + + def write(self, payload: bytes) -> int: + if self._closed: + raise RuntimeError("write on closed _BytesCapturingTransport") + self._chunks.append(bytes(payload)) + return len(payload) + + def bytes_written(self) -> int: + return sum(len(c) for c in self._chunks) + + def close(self) -> None: + self._closed = True + + @property + def chunks(self) -> tuple[bytes, ...]: + return tuple(self._chunks) diff --git a/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py b/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py index 3128209..b85a5cc 100644 --- a/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py +++ b/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py @@ -35,26 +35,43 @@ from gps_denied_onboard.components.c8_fc_adapter.pymavlink_ardupilot_adapter imp ) from gps_denied_onboard.config import load_config +from ._mav_test_helpers import _FakeMsg + # ---------------------------------------------------------------------- # Helpers — pymavlink stand-in +class _SigningStub: + sign_outgoing = False + + class _MavStub: - """Captures pymavlink ``mav.*_send`` calls for wire-level assertions.""" + """Captures pymavlink ``mav.*_encode`` calls for wire-level assertions. + + Post-AZ-558 the AP adapter routes through ``encode → pack → transport.write``; + we record args at the ``encode`` boundary (where the test cares), + return a :class:`_FakeMsg` whose ``pack`` produces placeholder bytes, + and the transport seam swallows them. + """ def __init__(self) -> None: self.gps_input_calls: list[tuple[Any, ...]] = [] self.named_value_float_calls: list[tuple[Any, ...]] = [] self.statustext_calls: list[tuple[int, bytes]] = [] + self.seq: int = 0 + self.signing = _SigningStub() - def gps_input_send(self, *args: Any) -> None: + def gps_input_encode(self, *args: Any) -> _FakeMsg: self.gps_input_calls.append(args) + return _FakeMsg() - def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None: + def named_value_float_encode(self, time_boot_ms: int, name: bytes, value: float) -> _FakeMsg: self.named_value_float_calls.append((time_boot_ms, name, value)) + return _FakeMsg() - def statustext_send(self, severity: int, text: bytes) -> None: + def statustext_encode(self, severity: int, text: bytes) -> _FakeMsg: self.statustext_calls.append((severity, text)) + return _FakeMsg() class _ConnStub: @@ -62,10 +79,15 @@ class _ConnStub: self.mav = _MavStub() self.setup_signing_calls: list[bytes] = [] self.closed = False + self.write_calls: list[bytes] = [] def setup_signing(self, key: bytes) -> None: self.setup_signing_calls.append(bytes(key)) + def write(self, payload: bytes) -> int: + self.write_calls.append(bytes(payload)) + return len(payload) + def close(self) -> None: self.closed = True diff --git a/tests/unit/c8_fc_adapter/test_az394_inav_outbound.py b/tests/unit/c8_fc_adapter/test_az394_inav_outbound.py index 8a66c05..94a6110 100644 --- a/tests/unit/c8_fc_adapter/test_az394_inav_outbound.py +++ b/tests/unit/c8_fc_adapter/test_az394_inav_outbound.py @@ -35,6 +35,8 @@ from gps_denied_onboard.components.c8_fc_adapter.msp2_inav_adapter import ( ) from gps_denied_onboard.config import load_config +from ._mav_test_helpers import _FakeMsg + # ---------------------------------------------------------------------- # Helpers — MSP / secondary-MAVLink stand-ins @@ -51,22 +53,35 @@ class _MspStub: self.closed = True +class _SigningStub: + sign_outgoing = False + + class _SecondaryMavStub: def __init__(self) -> None: self.statustext_calls: list[tuple[int, bytes]] = [] self.closed = False + self.write_calls: list[bytes] = [] # Mirror pymavlink connection.mav shape. self.mav = self # Track signing-key state per RESTRICT-COMM-2 (Invariant 9): # the unit-test adapter MUST never call setup_signing on us. self.setup_signing_calls: list[Any] = [] + # AZ-558: encoder flow needs ``mav.seq`` and ``mav.signing``. + self.seq: int = 0 + self.signing = _SigningStub() - def statustext_send(self, severity: int, text: bytes) -> None: + def statustext_encode(self, severity: int, text: bytes) -> _FakeMsg: self.statustext_calls.append((int(severity), bytes(text))) + return _FakeMsg() def setup_signing(self, key: Any) -> None: self.setup_signing_calls.append(key) + def write(self, payload: bytes) -> int: + self.write_calls.append(bytes(payload)) + return len(payload) + def close(self) -> None: self.closed = True diff --git a/tests/unit/c8_fc_adapter/test_az395_mavlink_signing.py b/tests/unit/c8_fc_adapter/test_az395_mavlink_signing.py index 52e8580..a7406e6 100644 --- a/tests/unit/c8_fc_adapter/test_az395_mavlink_signing.py +++ b/tests/unit/c8_fc_adapter/test_az395_mavlink_signing.py @@ -31,6 +31,8 @@ from gps_denied_onboard.components.c8_fc_adapter.pymavlink_ardupilot_adapter imp ) from gps_denied_onboard.config import load_config +from ._mav_test_helpers import _FakeMsg + _DEV_STATIC_KEY = "00112233445566778899aabbccddeeff" * 2 # 64 hex chars = 32 bytes @@ -41,16 +43,20 @@ class _MavStub: self.statustext_calls: list[tuple[int, bytes]] = [] # pymavlink exposes `connection.mav.signing.sig_count` after # setup_signing(...); we simulate that surface here. - self.signing = SimpleNamespace(sig_count=signing_failure_count) + self.signing = SimpleNamespace(sig_count=signing_failure_count, sign_outgoing=False) + self.seq: int = 0 - def gps_input_send(self, *args: Any) -> None: + def gps_input_encode(self, *args: Any) -> _FakeMsg: self.gps_input_calls.append(args) + return _FakeMsg() - def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None: + def named_value_float_encode(self, time_boot_ms: int, name: bytes, value: float) -> _FakeMsg: self.named_value_float_calls.append((time_boot_ms, name, value)) + return _FakeMsg() - def statustext_send(self, severity: int, text: bytes) -> None: + def statustext_encode(self, severity: int, text: bytes) -> _FakeMsg: self.statustext_calls.append((severity, text)) + return _FakeMsg() class _ConnStub: @@ -59,12 +65,17 @@ class _ConnStub: self.setup_signing_calls: list[bytes] = [] self._fail_signing = fail_signing self.closed = False + self.write_calls: list[bytes] = [] def setup_signing(self, key: bytes) -> None: if self._fail_signing: raise RuntimeError("simulated signing handshake refusal") self.setup_signing_calls.append(bytes(key)) + def write(self, payload: bytes) -> int: + self.write_calls.append(bytes(payload)) + return len(payload) + def close(self) -> None: self.closed = True diff --git a/tests/unit/c8_fc_adapter/test_az396_source_set_switch.py b/tests/unit/c8_fc_adapter/test_az396_source_set_switch.py index da2861f..2053671 100644 --- a/tests/unit/c8_fc_adapter/test_az396_source_set_switch.py +++ b/tests/unit/c8_fc_adapter/test_az396_source_set_switch.py @@ -22,6 +22,8 @@ from gps_denied_onboard.components.c8_fc_adapter.pymavlink_ardupilot_adapter imp from gps_denied_onboard.config import load_config from gps_denied_onboard.runtime_root.spoof_recovery_sink import SpoofRecoverySink +from ._mav_test_helpers import _FakeMsg + # AC-1 / AC-2 / AC-3: pymavlink ardupilotmega command id for SET_EKF_SOURCE_SET. _CMD_SET_EKF_SOURCE_SET = 42007 _MAV_RESULT_ACCEPTED = 0 @@ -34,14 +36,20 @@ class _AckMsg: self.result = result +class _SigningStub: + sign_outgoing = False + + class _MavStub: def __init__(self) -> None: self.command_long_calls: list[tuple[int, ...]] = [] self.statustext_calls: list[tuple[int, bytes]] = [] self.named_value_float_calls: list[tuple[Any, ...]] = [] self.gps_input_calls: list[tuple[Any, ...]] = [] + self.seq: int = 0 + self.signing = _SigningStub() - def command_long_send( + def command_long_encode( self, target_system: int, target_component: int, @@ -54,17 +62,23 @@ class _MavStub: p5: float, p6: float, p7: float, - ) -> None: + ) -> "_FakeMsg": self.command_long_calls.append((target_system, target_component, command, confirmation, p1)) + return _FakeMsg() - def statustext_send(self, severity: int, text: bytes) -> None: + def statustext_encode(self, severity: int, text: bytes) -> "_FakeMsg": self.statustext_calls.append((severity, text)) + return _FakeMsg() - def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None: + def named_value_float_encode( + self, time_boot_ms: int, name: bytes, value: float + ) -> "_FakeMsg": self.named_value_float_calls.append((time_boot_ms, name, value)) + return _FakeMsg() - def gps_input_send(self, *args: Any) -> None: + def gps_input_encode(self, *args: Any) -> "_FakeMsg": self.gps_input_calls.append(args) + return _FakeMsg() class _ConnStub: @@ -74,6 +88,7 @@ class _ConnStub: self.target_component = 1 self._ack_queue = list(ack_queue or []) self.closed = False + self.write_calls: list[bytes] = [] def recv_match(self, *, type: str, blocking: bool, timeout: float | None) -> Any: # Real pymavlink filters by ``type``; the inbound decoder thread @@ -89,6 +104,10 @@ class _ConnStub: def setup_signing(self, key: bytes) -> None: pass + def write(self, payload: bytes) -> int: + self.write_calls.append(bytes(payload)) + return len(payload) + def close(self) -> None: self.closed = True diff --git a/tests/unit/c8_fc_adapter/test_az558_outbound_transport_seam.py b/tests/unit/c8_fc_adapter/test_az558_outbound_transport_seam.py new file mode 100644 index 0000000..a40534a --- /dev/null +++ b/tests/unit/c8_fc_adapter/test_az558_outbound_transport_seam.py @@ -0,0 +1,212 @@ +"""AZ-558 — outbound MavlinkTransport seam acceptance tests. + +Closes AZ-401 AC-9 + AZ-404 AC-4b deferrals by routing every C8 +outbound MAVLink byte through the :class:`MavlinkTransport` Protocol. + +ACs covered here (AC-1, AC-3, AC-4 are exercised in their respective +adapter / compose-root tests): + +* **AC-2** — Wire-byte equivalence (live mode). Two pymavlink + ``MAVLink`` instances with identical state are driven through: + (1) ``mav.gps_input_send(...)`` writing to a ``BytesIO``, and + (2) ``encode + pack + transport.write`` writing to a + :class:`_BytesCapturingTransport`. The captured bytes are + byte-identical, by construction (both call ``msg.pack(mav)`` + on the same MAVLink instance with the same ``mav.seq`` snapshot). +* **AC-5** — AST scan asserts that no method-call AST node in the + AP / iNav / replay-FC adapter source files invokes a pymavlink + ``mav.X_send`` helper. The Protocol seam is the only egress. +""" + +from __future__ import annotations + +import ast +import io +from pathlib import Path +from typing import Any + +import pytest + +from gps_denied_onboard.components.c8_fc_adapter._outbound_mavlink_payloads import ( + encode_gps_input, + encode_named_value_float, + encode_statustext, + send_via_transport, +) + +from ._mav_test_helpers import _BytesCapturingTransport + + +_REPO_ROOT = Path(__file__).resolve().parents[3] +_C8_DIR = _REPO_ROOT / "src" / "gps_denied_onboard" / "components" / "c8_fc_adapter" + +# AC-5 retrofitted files — every outbound MAVLink byte must now route +# through the MavlinkTransport seam, not via pymavlink's *_send helpers. +_RETROFITTED_FILES: tuple[str, ...] = ( + "pymavlink_ardupilot_adapter.py", + "msp2_inav_adapter.py", + "tlog_replay_adapter.py", +) + +# Pymavlink ``mav._send(...)`` helpers we forbid in retrofitted code. +_FORBIDDEN_SEND_HELPERS: frozenset[str] = frozenset( + { + "gps_input_send", + "named_value_float_send", + "statustext_send", + "command_long_send", + "global_position_int_send", + "command_int_send", + "param_set_send", + "param_request_read_send", + } +) + + +# ---------------------------------------------------------------------- +# AC-2 — wire-byte equivalence + + +@pytest.fixture +def _mavlink_pair() -> tuple[Any, Any]: + """Build two pymavlink MAVLink instances with identical state.""" + from pymavlink.dialects.v20 import ardupilotmega as _mavlink + + legacy_buf = io.BytesIO() + legacy = _mavlink.MAVLink(file=legacy_buf, srcSystem=1, srcComponent=1) + new = _mavlink.MAVLink(file=None, srcSystem=1, srcComponent=1) + return legacy, new, legacy_buf # type: ignore[return-value] + + +def test_ac2_byte_equivalence_gps_input(_mavlink_pair: tuple[Any, Any, io.BytesIO]) -> None: + """``encode + pack + transport.write`` is byte-identical to ``gps_input_send``.""" + # Arrange + legacy, new, legacy_buf = _mavlink_pair + capture = _BytesCapturingTransport() + args = dict( + time_usec=1_000_000, + gps_id=0, + ignore_flags=0, + time_week_ms=0, + time_week=0, + fix_type=3, + lat=int(50.0 * 1e7), + lon=int(30.0 * 1e7), + alt=100.0, + hdop=0.0, + vdop=0.0, + vn=0.0, + ve=0.0, + vd=0.0, + speed_accuracy=0.0, + horiz_accuracy=2.5, + vert_accuracy=0.0, + satellites_visible=10, + yaw=0, + ) + + # Act + legacy.gps_input_send(*args.values()) + msg = encode_gps_input(new, **args) + send_via_transport(new, msg, capture) + + # Assert + assert legacy_buf.getvalue() == b"".join(capture.chunks) + + +def test_ac2_byte_equivalence_named_value_float( + _mavlink_pair: tuple[Any, Any, io.BytesIO], +) -> None: + """``named_value_float_encode`` produces byte-equivalent output.""" + # Arrange + legacy, new, legacy_buf = _mavlink_pair + capture = _BytesCapturingTransport() + args = dict(time_boot_ms=12345, name=b"src_lbl", value=1.5) + + # Act + legacy.named_value_float_send(*args.values()) + msg = encode_named_value_float(new, **args) + send_via_transport(new, msg, capture) + + # Assert + assert legacy_buf.getvalue() == b"".join(capture.chunks) + + +def test_ac2_byte_equivalence_statustext( + _mavlink_pair: tuple[Any, Any, io.BytesIO], +) -> None: + """``statustext_encode`` produces byte-equivalent output.""" + # Arrange + legacy, new, legacy_buf = _mavlink_pair + capture = _BytesCapturingTransport() + + # Act + legacy.statustext_send(4, b"hello") + msg = encode_statustext(new, severity=4, text=b"hello") + send_via_transport(new, msg, capture) + + # Assert + assert legacy_buf.getvalue() == b"".join(capture.chunks) + + +def test_ac2_byte_equivalence_seq_bumps_consistently( + _mavlink_pair: tuple[Any, Any, io.BytesIO], +) -> None: + """Sending two messages keeps the seq numbers byte-aligned.""" + # Arrange + legacy, new, legacy_buf = _mavlink_pair + capture = _BytesCapturingTransport() + args = dict( + time_usec=1_000_000, gps_id=0, ignore_flags=0, time_week_ms=0, time_week=0, + fix_type=3, lat=0, lon=0, alt=0.0, hdop=0.0, vdop=0.0, vn=0.0, ve=0.0, + vd=0.0, speed_accuracy=0.0, horiz_accuracy=0.0, vert_accuracy=0.0, + satellites_visible=10, yaw=0, + ) + + # Act — two messages, both sides + for _ in range(2): + legacy.gps_input_send(*args.values()) + msg = encode_gps_input(new, **args) + send_via_transport(new, msg, capture) + + # Assert + assert legacy_buf.getvalue() == b"".join(capture.chunks) + assert legacy.seq == new.seq + + +# ---------------------------------------------------------------------- +# AC-5 — AST scan: no `mav.X_send(` in retrofitted adapters + + +class _MavSendCallScanner(ast.NodeVisitor): + """Flags ``.mav._send(...)`` AST patterns.""" + + def __init__(self) -> None: + self.findings: list[tuple[str, int]] = [] + + def visit_Call(self, node: ast.Call) -> None: # noqa: N802 — ast API + func = node.func + if isinstance(func, ast.Attribute) and func.attr in _FORBIDDEN_SEND_HELPERS: + value = func.value + if isinstance(value, ast.Attribute) and value.attr == "mav": + self.findings.append((func.attr, node.lineno)) + self.generic_visit(node) + + +@pytest.mark.parametrize("filename", _RETROFITTED_FILES) +def test_ac5_no_pymavlink_send_helpers_in_adapter_source(filename: str) -> None: + """No retrofitted adapter source calls ``connection.mav._send(...)``.""" + # Arrange + source_path = _C8_DIR / filename + tree = ast.parse(source_path.read_text(encoding="utf-8")) + scanner = _MavSendCallScanner() + + # Act + scanner.visit(tree) + + # Assert + assert scanner.findings == [], ( + f"{filename}: forbidden pymavlink send-helpers still present " + f"(MavlinkTransport.write must be the only egress per AZ-558 AC-5):\n" + + "\n".join(f" line {ln}: .mav.{name}(" for name, ln in scanner.findings) + ) diff --git a/tests/unit/test_az401_compose_root_replay.py b/tests/unit/test_az401_compose_root_replay.py index 8b71722..a16e47a 100644 --- a/tests/unit/test_az401_compose_root_replay.py +++ b/tests/unit/test_az401_compose_root_replay.py @@ -520,23 +520,61 @@ def test_ac8_replay_branch_imports_only_public_apis() -> None: # ---------------------------------------------------------------------- -# AC-9: NoopMavlinkTransport.bytes_written() > 0 — BLOCKED +# AC-9: NoopMavlinkTransport.bytes_written() > 0 (closed by AZ-558) -@pytest.mark.skip( - reason=( - "BLOCKED on AZ-399 design choice: TlogReplayFcAdapter raises " - "FcEmitError on emit_external_position rather than routing the " - "encoder bytes through the MavlinkTransport seam. Closing this " - "gap requires retrofitting AP/iNav/QGC encoder code paths to " - "consume MavlinkTransport — see batch 61 report. NoopMavlinkTransport " - "+ MavlinkTransport Protocol classes are present (covered by " - "test_az400_mavlink_transport.py) but the wiring that makes " - "bytes_written > 0 in replay mode is deferred." +def test_ac9_noop_transport_bytes_written_after_runtime_drive( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """AZ-401 AC-9 / AZ-558 AC-4: replay encoders write through the seam. + + Drives 10 ``EstimatorOutput`` ticks through a replay-wired + :class:`TlogReplayFcAdapter` with a :class:`NoopMavlinkTransport` + injected as its outbound seam. After the AZ-558 retrofit the + adapter encodes ``GPS_INPUT`` + ``NAMED_VALUE_FLOAT`` per tick + and writes the packed bytes through the transport — replay + protocol Invariant 5 (encoders run in both modes; only the + transport differs). + """ + # Arrange + from pymavlink.dialects.v20 import ardupilotmega as _mavlink + + monkeypatch.setenv("BUILD_REPLAY_SINK_JSONL", "ON") + transport = NoopMavlinkTransport() + outbound_mav = _mavlink.MAVLink(file=None, srcSystem=1, srcComponent=1) + fc = TlogReplayFcAdapter.__new__(TlogReplayFcAdapter) + # Initialise only the slots the encoder code path consults so the + # test stays focused on the wire-routing contract (no tlog file, + # no BUILD_TLOG_REPLAY_ADAPTER gate, no decode thread). + fc._mavlink_transport = transport + fc._outbound_mav = outbound_mav + fc._sequence_number = 0 + fc._clock = WallClock() + fc._clock_us_provider = lambda: int(fc._clock.monotonic_ns() // 1000) + fc._clock_ms_boot_provider = ( + lambda: int(fc._clock.monotonic_ns() // 1_000_000) % 0xFFFFFFFF + ) + output = EstimatorOutput( + frame_id=uuid4(), + position_wgs84=LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0), + orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0), + velocity_world_mps=(0.0, 0.0, 0.0), + covariance_6x6=np.eye(6, dtype=np.float64) * 0.25, + source_label=PoseSourceLabel.VISUAL_PROPAGATED, + last_satellite_anchor_age_ms=0, + smoothed=False, + emitted_at=0, + ) + + # Act + for _ in range(10): + fc.emit_external_position(output) + + # Assert + assert transport.bytes_written() > 0, ( + f"NoopMavlinkTransport.bytes_written() = {transport.bytes_written()}; " + "expected > 0 after 10 emit_external_position calls" ) -) -def test_ac9_noop_transport_bytes_written_after_runtime_drive() -> None: - raise NotImplementedError("see skip reason") # ----------------------------------------------------------------------