gps-denied-onboard/tests/unit/c8_fc_adapter/_mav_test_helpers.py
Oleksandr Bezdieniezhnykh 2b19b8b90b [AZ-558] Route C8 outbound encoder bytes through MavlinkTransport seam
All FC adapter outbound MAVLink bytes now go through the AZ-401
MavlinkTransport seam (NoopMavlinkTransport in replay,
SerialMavlinkTransport in live). New helpers in
_outbound_mavlink_payloads.py extract encode/pack/seq-bump so the four
AP _send sites and the iNav statustext _send site become
encode -> pack -> transport.write. TlogReplayFcAdapter emits real
AP-shape MAVLink bytes through the injected NoopMavlinkTransport,
satisfying replay protocol Invariant 5 and unblocking AZ-401 AC-9.

Closes AZ-558. Also unskips AZ-401 AC-9 and AZ-404 AC-4b. Live wire
output remains byte-identical (proven via two-instance MAVLink
byte-equivalence tests). AST scan asserts no .mav.<name>_send( calls
remain in the retrofit set (AP / iNav / tlog adapters).

Out of scope (logged in review): GCS adapter retrofit; airborne live
strategy registration that would activate the SerialMavlinkTransport
factory injection path.

Tests: 2110 passed, 92 environmental skips, 1 unrelated pre-existing
macOS cold-start flake deselected.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 05:33:56 +03:00

102 lines
3.1 KiB
Python

"""Shared test stubs for the C8 outbound retrofit (AZ-558).

After AZ-558 the AP / iNav / replay adapters route bytes through the
``MavlinkTransport`` Protocol seam instead of calling
``connection.mav.X_send(...)`` directly. The new code path is
``mav.X_encode(...) → msg.pack(mav) → transport.write(buf)``.

Tests that previously used a hand-rolled ``_MavStub`` with ``X_send``
methods need the matching ``X_encode`` methods so their wire-level
assertions continue to work. This module provides:

* :class:`_FakeMsg`, an opaque message stub returned by ``X_encode``;
  its ``pack(mav)`` returns deterministic placeholder bytes without
  recomputing CRCs / signing (the per-test stub records the call args
  inside its own ``X_encode`` method, so ``pack`` can be a pure
  byte-emitter).
* :class:`_NullTransport`, a minimal :class:`MavlinkTransport`
  implementation that drops bytes and counts them. Used by tests that
  do not care about wire content but need a transport to satisfy
  the new ``mavlink_transport_factory`` plumbing.
* :class:`_BytesCapturingTransport`, which collects every ``write(buf)``
  call. Used by AC-2 byte-equivalence and AZ-401 AC-9 tests that
  assert on aggregate byte volume / specific message bytes.
"""

from __future__ import annotations

from typing import Any

__all__ = [
    "_BytesCapturingTransport",
    "_FakeMsg",
    "_NullTransport",
    "_PLACEHOLDER_PACK_BYTES",
]

_PLACEHOLDER_PACK_BYTES: bytes = b"\x00" * 16


class _FakeMsg:
    """Opaque MAVLink message stand-in returned by ``X_encode``.

    ``pack(mav)`` returns fixed placeholder bytes; the test stub's
    ``X_encode`` method already recorded the call args, so we don't
    need to re-record here.
    """

    __slots__ = ()

    def pack(self, mav: Any, force_mavlink1: bool = False) -> bytes:
        return _PLACEHOLDER_PACK_BYTES


class _NullTransport:
    """Minimal :class:`MavlinkTransport` impl that drops bytes."""

    __slots__ = ("_bytes_written", "_closed")

    def __init__(self) -> None:
        self._bytes_written = 0
        self._closed = False

    def write(self, payload: bytes) -> int:
        if self._closed:
            raise RuntimeError("write on closed _NullTransport")
        n = len(payload)
        self._bytes_written += n
        return n

    def bytes_written(self) -> int:
        return self._bytes_written

    def close(self) -> None:
        self._closed = True


class _BytesCapturingTransport:
    """Test :class:`MavlinkTransport` that retains every ``write`` payload."""

    __slots__ = ("_chunks", "_closed")

    def __init__(self) -> None:
        self._chunks: list[bytes] = []
        self._closed = False

    def write(self, payload: bytes) -> int:
        if self._closed:
            raise RuntimeError("write on closed _BytesCapturingTransport")
        self._chunks.append(bytes(payload))
        return len(payload)

    def bytes_written(self) -> int:
        return sum(len(c) for c in self._chunks)

    def close(self) -> None:
        self._closed = True

    @property
    def chunks(self) -> tuple[bytes, ...]:
        return tuple(self._chunks)
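
Usage sketch (not part of the file): the snippet below illustrates how a test might exercise the encode -> pack -> transport.write path described in the module docstring. It is a minimal, self-contained approximation under stated assumptions: `RecordingMavStub`, its `statustext_encode` signature, and `send_statustext` are hypothetical names invented for illustration, and `FakeMsg` / `CapturingTransport` are simplified local copies of the stubs above rather than imports from the real module. The actual adapter code and the real `MavlinkTransport` Protocol may differ.

```python
from __future__ import annotations

from typing import Any

PLACEHOLDER = b"\x00" * 16  # same shape as _PLACEHOLDER_PACK_BYTES above


class FakeMsg:
    """Simplified copy of _FakeMsg: pack() emits fixed placeholder bytes."""

    def pack(self, mav: Any, force_mavlink1: bool = False) -> bytes:
        return PLACEHOLDER


class RecordingMavStub:
    """Hypothetical per-test stub: records encode args, returns a FakeMsg."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, tuple[Any, ...]]] = []

    def statustext_encode(self, severity: int, text: bytes) -> FakeMsg:
        # The call args are recorded here, so pack() can stay a pure emitter.
        self.calls.append(("statustext", (severity, text)))
        return FakeMsg()


class CapturingTransport:
    """Simplified copy of _BytesCapturingTransport (no close handling)."""

    def __init__(self) -> None:
        self._chunks: list[bytes] = []

    def write(self, payload: bytes) -> int:
        self._chunks.append(bytes(payload))
        return len(payload)

    @property
    def chunks(self) -> tuple[bytes, ...]:
        return tuple(self._chunks)


def send_statustext(mav: Any, transport: Any, severity: int, text: bytes) -> int:
    """Hypothetical send site: encode, then pack, then write to the transport."""
    msg = mav.statustext_encode(severity, text)
    buf = msg.pack(mav)
    return transport.write(buf)


mav = RecordingMavStub()
transport = CapturingTransport()
written = send_statustext(mav, transport, 6, b"GPS denied")
```

Splitting responsibilities this way lets a test assert on the recorded call arguments (`mav.calls`) and on the bytes that reached the transport (`transport.chunks`) independently, without depending on real CRC or signing behavior.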