Files
gps-denied-onboard/tests/unit/c8_fc_adapter/test_az395_mavlink_signing.py
T
Oleksandr Bezdieniezhnykh 2b19b8b90b [AZ-558] Route C8 outbound encoder bytes through MavlinkTransport seam
All FC adapter outbound MAVLink bytes now go through the AZ-401
MavlinkTransport seam (NoopMavlinkTransport in replay,
SerialMavlinkTransport in live). New helpers in
_outbound_mavlink_payloads.py extract encode/pack/seq-bump so the four
AP _send sites and the iNav statustext _send site become
encode -> pack -> transport.write. TlogReplayFcAdapter emits real
AP-shape MAVLink bytes through the injected NoopMavlinkTransport,
satisfying replay protocol Invariant 5 and unblocking AZ-401 AC-9.

Closes AZ-558. Also unskips AZ-401 AC-9 and AZ-404 AC-4b. Live wire
output remains byte-identical (proven via two-instance MAVLink
byte-equivalence tests). AST scan asserts no .mav.<name>_send( calls
remain in the retrofit set (AP / iNav / tlog adapters).

Out of scope (logged in review): GCS adapter retrofit; airborne live
strategy registration that would activate the SerialMavlinkTransport
factory injection path.

Tests: 2110 passed, 92 environmental skips, 1 unrelated pre-existing
macOS cold-start flake deselected.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 05:33:56 +03:00

326 lines
12 KiB
Python

"""AZ-395 — AP MAVLink 2.0 per-flight signing AC tests."""
from __future__ import annotations
import logging
import re
from types import SimpleNamespace
from typing import Any
from unittest import mock
from uuid import UUID, uuid4
import numpy as np
import pytest
from gps_denied_onboard._types.fc import FcKind, PortConfig, Severity
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.state import (
EstimatorOutput,
PoseSourceLabel,
Quat,
)
from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import (
CovarianceProjector,
)
from gps_denied_onboard.components.c8_fc_adapter.errors import (
FcOpenError,
SigningHandshakeError,
)
from gps_denied_onboard.components.c8_fc_adapter.pymavlink_ardupilot_adapter import (
PymavlinkArdupilotAdapter,
)
from gps_denied_onboard.config import load_config
from ._mav_test_helpers import _FakeMsg
# Fixed hex-encoded signing key handed to the dev_static tests below via
# FC_DEV_STATIC_SIGNING_KEY: a 32-hex-char pattern repeated twice.
_DEV_STATIC_KEY = "00112233445566778899aabbccddeeff" * 2  # 64 hex chars = 32 bytes
class _MavStub:
def __init__(self, signing_failure_count: int = 0) -> None:
self.gps_input_calls: list[tuple[Any, ...]] = []
self.named_value_float_calls: list[tuple[Any, ...]] = []
self.statustext_calls: list[tuple[int, bytes]] = []
# pymavlink exposes `connection.mav.signing.sig_count` after
# setup_signing(...); we simulate that surface here.
self.signing = SimpleNamespace(sig_count=signing_failure_count, sign_outgoing=False)
self.seq: int = 0
def gps_input_encode(self, *args: Any) -> _FakeMsg:
self.gps_input_calls.append(args)
return _FakeMsg()
def named_value_float_encode(self, time_boot_ms: int, name: bytes, value: float) -> _FakeMsg:
self.named_value_float_calls.append((time_boot_ms, name, value))
return _FakeMsg()
def statustext_encode(self, severity: int, text: bytes) -> _FakeMsg:
self.statustext_calls.append((severity, text))
return _FakeMsg()
class _ConnStub:
def __init__(self, *, fail_signing: bool = False, signing_failure_count: int = 0) -> None:
self.mav = _MavStub(signing_failure_count=signing_failure_count)
self.setup_signing_calls: list[bytes] = []
self._fail_signing = fail_signing
self.closed = False
self.write_calls: list[bytes] = []
def setup_signing(self, key: bytes) -> None:
if self._fail_signing:
raise RuntimeError("simulated signing handshake refusal")
self.setup_signing_calls.append(bytes(key))
def write(self, payload: bytes) -> int:
self.write_calls.append(bytes(payload))
return len(payload)
def close(self) -> None:
self.closed = True
def _ap_config(*, signing_key_source: str, dev_static_key: str = "") -> Any:
    """Load a config from an in-memory ArduPilot-plane environment.

    Args:
        signing_key_source: value placed in ``FC_SIGNING_KEY_SOURCE``.
        dev_static_key: when non-empty, placed in ``FC_DEV_STATIC_SIGNING_KEY``;
            otherwise that variable is omitted entirely.

    Returns:
        Whatever ``load_config`` returns for that environment, with no config
        file paths and ``require_env=False``.
    """
    optional = {"FC_DEV_STATIC_SIGNING_KEY": dev_static_key} if dev_static_key else {}
    env: dict[str, str] = {
        "FC_ADAPTER": "ardupilot_plane",
        "FC_PORT_DEVICE": "/dev/null",
        "FC_PORT_BAUD": "921600",
        "FC_SIGNING_KEY_SOURCE": signing_key_source,
        **optional,
    }
    return load_config(env=env, paths=(), require_env=False)
def _build_adapter(
    conn: _ConnStub, *, signing_key_source: str, dev_static_key: str = ""
) -> PymavlinkArdupilotAdapter:
    """Build an ArduPilot adapter whose connect factory always yields ``conn``.

    The WGS converter and FDR client are ``MagicMock`` instances; the same
    FDR mock is shared by the adapter and its ``CovarianceProjector``.
    """

    def _connect(device: Any, baud: Any) -> _ConnStub:
        # Ignore the requested port; always hand back the injected stub.
        return conn

    fdr_mock = mock.MagicMock()
    return PymavlinkArdupilotAdapter(
        config=_ap_config(
            signing_key_source=signing_key_source,
            dev_static_key=dev_static_key,
        ),
        wgs_converter=mock.MagicMock(),
        covariance_projector=CovarianceProjector(fdr_client=fdr_mock),
        fdr_client=fdr_mock,
        flight_id="flt-az395",
        connect_factory=_connect,
    )
def _port() -> PortConfig:
    """Return the ArduPilot-plane port settings used by every test here."""
    return PortConfig(
        fc_kind=FcKind.ARDUPILOT_PLANE,
        device="/dev/null",
        baud=921600,
    )
def _make_output(frame_id: UUID | int | None = None) -> EstimatorOutput:
    """Build a fixed ``EstimatorOutput`` whose only varying field is ``frame_id``.

    Args:
        frame_id: a ready ``UUID`` is used as-is, an ``int`` becomes
            ``UUID(int=...)``, and ``None`` draws a random ``uuid4()``.
    """
    if isinstance(frame_id, int):
        resolved_id = UUID(int=frame_id)
    elif frame_id is None:
        resolved_id = uuid4()
    else:
        resolved_id = frame_id

    position = LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0)
    identity_rotation = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
    covariance = np.eye(6, dtype=np.float64) * 0.25
    return EstimatorOutput(
        frame_id=resolved_id,
        position_wgs84=position,
        orientation_world_T_body=identity_rotation,
        velocity_world_mps=(0.0, 0.0, 0.0),
        covariance_6x6=covariance,
        source_label=PoseSourceLabel.VISUAL_PROPAGATED,
        last_satellite_anchor_age_ms=0,
        smoothed=False,
        emitted_at=0,
    )
# ----------------------------------------------------------------------
# AC-1 / AC-10: signing_key=None with ephemeral source generates the key
def test_ac1_ephemeral_generates_key_when_none_passed() -> None:
    """Opening with ``signing_key=None`` sets up signing once with a 32-byte key."""
    link = _ConnStub()
    adapter = _build_adapter(link, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        recorded_keys = link.setup_signing_calls
        assert len(recorded_keys) == 1
        assert len(recorded_keys[0]) == 32
    finally:
        adapter.close()
# ----------------------------------------------------------------------
# AC-2: two opens produce distinct ephemeral keys
def test_ac2_ephemeral_distinct_per_flight() -> None:
    """Two independent open/close cycles must hand different keys to signing."""

    def _fly_once() -> _ConnStub:
        # One full adapter lifecycle against a fresh connection stub.
        link = _ConnStub()
        adapter = _build_adapter(link, signing_key_source="ephemeral_per_flight")
        adapter.open(_port(), signing_key=None)
        adapter.close()
        return link

    first_link = _fly_once()
    second_link = _fly_once()
    assert first_link.setup_signing_calls[0] != second_link.setup_signing_calls[0]
# ----------------------------------------------------------------------
# AC-3: handshake failure raises SigningHandshakeError
def test_ac3_handshake_failure_raises(caplog: pytest.LogCaptureFixture) -> None:
    """A refused signing handshake raises and logs the failure kind."""
    link = _ConnStub(fail_signing=True)
    adapter = _build_adapter(link, signing_key_source="ephemeral_per_flight")
    with caplog.at_level(logging.ERROR), pytest.raises(SigningHandshakeError):
        adapter.open(_port(), signing_key=None)
    # The adapter tags its log records with a `kind` attribute.
    logged_kinds = [getattr(record, "kind", None) for record in caplog.records]
    assert "c8.ap.signing_handshake_failed" in logged_kinds
# ----------------------------------------------------------------------
# AC-4: handshake success FDR record has NO key bytes
def test_ac4_handshake_success_fdr_has_no_key_bytes() -> None:
    """AC-4: the key-rotation FDR record carries no signing-key material.

    Exactly one ``c8.ap.signing_key_rotated`` record must be enqueued, and its
    rendered payload must contain neither the full key hex nor the hex of any
    contiguous 4-byte window of the key.
    """
    conn = _ConnStub()
    adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        key = bytes(conn.setup_signing_calls[0])
        rotated_calls = [
            call
            for call in adapter._fdr_client.enqueue.mock_calls  # type: ignore[attr-defined]
            if call.args[0].payload.get("kind") == "c8.ap.signing_key_rotated"
        ]
        assert len(rotated_calls) == 1
        rec_payload = rotated_calls[0].args[0].payload
        # AC-4: assert NO key byte sub-sequence appears in the FDR payload.
        rendered = str(rec_payload)
        assert key.hex() not in rendered
        window = 4
        # `+ 1` makes the scan include the final window, which starts at
        # offset len(key) - window.
        for offset in range(len(key) - window + 1):
            chunk = key[offset : offset + window].hex()
            assert chunk not in rendered, f"4-byte chunk leak at offset {offset}"
    finally:
        adapter.close()
# ----------------------------------------------------------------------
# AC-5: key never in any log line
def test_ac5_key_never_in_logs(caplog: pytest.LogCaptureFixture) -> None:
    """AC-5: no log message contains the signing key or any 4-byte window of it.

    Logs are captured at DEBUG across open, five position emits and close.
    The rendered messages are then scanned for the full key hex and for the
    hex of every contiguous 4-byte window of the key.
    """
    conn = _ConnStub()
    adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight")
    with caplog.at_level(logging.DEBUG):
        adapter.open(_port(), signing_key=None)
        try:
            for i in range(5):
                adapter.emit_external_position(_make_output(frame_id=i))
        finally:
            # Close even when an emit raises; close() stays inside the
            # capture block so its own log lines are scanned as well.
            adapter.close()
    key = bytes(conn.setup_signing_calls[0])
    rendered_logs = "\n".join(rec.getMessage() for rec in caplog.records)
    assert key.hex() not in rendered_logs
    window = 4
    # An empty alternation would match everything, so require a full window.
    assert len(key) >= window
    # One compiled alternation covers every window, the last one included.
    pattern = re.compile(
        "|".join(
            re.escape(key[offset : offset + window].hex())
            for offset in range(len(key) - window + 1)
        )
    )
    leak = pattern.search(rendered_logs)
    # Report only the position so the failure output does not echo key bytes.
    assert leak is None, f"4-byte key chunk leaked at log offset {leak.start()}"
# ----------------------------------------------------------------------
# AC-6: mid-flight signing failure does NOT raise
def test_ac6_mid_flight_signing_failure_no_raise(caplog: pytest.LogCaptureFixture) -> None:
    """With a non-zero ``sig_count`` seed, emits log and warn instead of raising."""
    link = _ConnStub(signing_failure_count=5)
    adapter = _build_adapter(link, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        with caplog.at_level(logging.ERROR):
            # First emit uses a random frame id, the second a fixed one.
            for frame_id in (None, 2):
                adapter.emit_external_position(_make_output(frame_id=frame_id))
        logged_kinds = [getattr(record, "kind", None) for record in caplog.records]
        assert "c8.ap.signing_failure" in logged_kinds
        severities = [severity for severity, _ in link.mav.statustext_calls]
        assert any(severity == int(Severity.WARNING.value) for severity in severities)
    finally:
        adapter.close()
# ----------------------------------------------------------------------
# AC-7: key zeroisation on close
def test_ac7_key_zeroisation_on_close(caplog: pytest.LogCaptureFixture) -> None:
    """``close()`` wipes the adapter's key buffer in place and logs the wipe."""
    link = _ConnStub()
    adapter = _build_adapter(link, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    # Hold the adapter's own buffer object rather than a copy: the buffer is
    # mutable, so an in-place wipe during close() is visible through this
    # reference afterwards.
    held_buffer = adapter._signing_key
    assert held_buffer is not None
    assert any(byte != 0 for byte in held_buffer)  # key is live before close
    with caplog.at_level(logging.INFO):
        adapter.close()
    assert all(byte == 0 for byte in held_buffer)
    logged_kinds = [getattr(record, "kind", None) for record in caplog.records]
    assert "c8.ap.signing_key_zeroised" in logged_kinds
# ----------------------------------------------------------------------
# AC-8: BUILD_DEV_STATIC_KEY=ON enables static repeatable key path
def test_ac8_dev_static_key_repeatable(monkeypatch: pytest.MonkeyPatch) -> None:
    """With ``BUILD_DEV_STATIC_KEY=ON``, every open uses the configured static key."""
    monkeypatch.setenv("BUILD_DEV_STATIC_KEY", "ON")

    def _fly_once() -> _ConnStub:
        # One full adapter lifecycle against a fresh connection stub.
        link = _ConnStub()
        adapter = _build_adapter(
            link, signing_key_source="dev_static", dev_static_key=_DEV_STATIC_KEY
        )
        adapter.open(_port(), signing_key=None)
        adapter.close()
        return link

    first_link = _fly_once()
    second_link = _fly_once()
    assert first_link.setup_signing_calls[0] == second_link.setup_signing_calls[0]
    assert first_link.setup_signing_calls[0].hex() == _DEV_STATIC_KEY
def test_ac8_dev_static_key_blocked_without_build_flag(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without ``BUILD_DEV_STATIC_KEY`` in the environment, dev_static open fails."""
    # Tolerate the variable already being absent.
    monkeypatch.delenv("BUILD_DEV_STATIC_KEY", raising=False)
    link = _ConnStub()
    adapter = _build_adapter(
        link,
        signing_key_source="dev_static",
        dev_static_key=_DEV_STATIC_KEY,
    )
    with pytest.raises(FcOpenError, match="BUILD_DEV_STATIC_KEY"):
        adapter.open(_port(), signing_key=None)
# ----------------------------------------------------------------------
# AC-9 (indirect): mid-flight signing failure STATUSTEXT severity = WARNING.
# AC-3 handshake-failure raises and logs ERROR; the STATUSTEXT severity is
# not exercised because handshake-failure aborts open() before any emit.
def test_ac9_statustext_severity_on_mid_flight_failure() -> None:
    """A single emit with a non-zero ``sig_count`` seed yields a WARNING STATUSTEXT."""
    link = _ConnStub(signing_failure_count=5)
    adapter = _build_adapter(link, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        adapter.emit_external_position(_make_output())
        # The signing-failure WARNING comes first; provenance STATUSTEXT may
        # follow at INFO, so only require that some entry is a WARNING.
        severities = [severity for severity, _ in link.mav.statustext_calls]
        assert any(severity == int(Severity.WARNING.value) for severity in severities)
    finally:
        adapter.close()
# ----------------------------------------------------------------------
# AC-10: AZ-393 placeholder tightened — `signing_key_source="none"` is the
# only path that bypasses signing; `"ephemeral_per_flight"` never runs
# without a key because it generates one internally (AC-1 verifies that).
# The matching tightening surface is that an unknown source is rejected.
def test_ac10_unknown_signing_source_rejected() -> None:
    """An unrecognised ``signing_key_source`` makes ``open()`` raise ``FcOpenError``."""
    # FcConfig enforces the allowed-source list at config-load time, so an
    # invalid source cannot be supplied through the normal loader. Build a
    # valid adapter first, then overwrite the already-validated field in
    # place (test seam) to simulate a bad value slipping past validation —
    # the adapter must still refuse it (defence-in-depth).
    link = _ConnStub()
    adapter = _build_adapter(link, signing_key_source="none")
    fc_section = adapter._config.fc
    object.__setattr__(fc_section, "signing_key_source", "bogus")  # type: ignore[arg-type]
    with pytest.raises(FcOpenError, match="unknown signing_key_source"):
        adapter.open(_port(), signing_key=None)