Files
gps-denied-onboard/tests/unit/c8_fc_adapter/test_az395_mavlink_signing.py
T
Oleksandr Bezdieniezhnykh beed43724f [AZ-381] C5 StateEstimator protocol + factory + C8 DTO reshape
- Add StateEstimator Protocol (6 methods, @runtime_checkable) + DTOs
  (EstimatorOutput, EstimatorHealth, IsamState, PoseSourceLabel, Quat)
  in _types/state.py per state_estimator_protocol.md v1.0.0.
- Add C5 error hierarchy (StateEstimatorError + 3 subclasses) and
  C5StateConfig (strategy, keyframe_window, spoof gates,
  no_estimate_fallback_s) with __post_init__ validation.
- Add ISam2GraphHandle Protocol + ISam2GraphHandleImpl skeleton (all
  4 methods raise NotImplementedError naming AZ-382 as owner).
- Add build_state_estimator factory + bind_state_ingest_thread for
  single-writer enforcement; ADR-002 build-flag gating
  (BUILD_STATE_<variant>); INFO log on success.
- Strict reshape of legacy EstimatorOutput / EstimatorHealth across
  all 6 C8 production files (_outbound_provenance,
  _covariance_projector, pymavlink_ardupilot_adapter,
  msp2_inav_adapter, mavlink_gcs_adapter, interface) + 6 C8 test
  files (UUID frame_id, LatLonAlt position_wgs84, Quat orientation,
  PoseSourceLabel enum source_label). Remove ad-hoc DTOs from
  _types/pose.py and from C4's public __init__ (EstimatorOutput is a
  C5 concept, not a C4 one).
- 20 AZ-381 AC tests (10 ACs + 4 config range + NFR + conformance).
- Full suite: 521 passed, 2 skipped (+20 vs Batch 11).
- Contracts: state_estimator_protocol.md v1.0.0 -> active;
  composition_root_protocol.md v1.2.0 -> v1.3.0 (additive state
  block + factory + ingest-thread binding).
- Impl report: _docs/03_implementation/batch_12_cycle1_report.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 05:35:20 +03:00

315 lines
12 KiB
Python

"""AZ-395 — AP MAVLink 2.0 per-flight signing AC tests."""
from __future__ import annotations
import logging
import re
from types import SimpleNamespace
from typing import Any
from unittest import mock
from uuid import UUID, uuid4
import numpy as np
import pytest
from gps_denied_onboard._types.fc import FcKind, PortConfig, Severity
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.state import (
EstimatorOutput,
PoseSourceLabel,
Quat,
)
from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import (
CovarianceProjector,
)
from gps_denied_onboard.components.c8_fc_adapter.errors import (
FcOpenError,
SigningHandshakeError,
)
from gps_denied_onboard.components.c8_fc_adapter.pymavlink_ardupilot_adapter import (
PymavlinkArdupilotAdapter,
)
from gps_denied_onboard.config import load_config
# Hex-encoded signing key passed as `dev_static_key` by the AC-8 tests; the
# repeatable-key test also compares it against the key handed to setup_signing().
_DEV_STATIC_KEY = "00112233445566778899aabbccddeeff" * 2 # 64 hex chars = 32 bytes
class _MavStub:
    """Recording stand-in for the ``connection.mav`` object.

    Each ``*_send`` method appends its arguments to a dedicated list so a
    test can inspect what was emitted afterwards.
    """

    def __init__(self, signing_failure_count: int = 0) -> None:
        # pymavlink exposes `connection.mav.signing.sig_count` after
        # setup_signing(...); the stub mirrors that attribute path.
        self.signing = SimpleNamespace(sig_count=signing_failure_count)
        self.statustext_calls: list[tuple[int, bytes]] = []
        self.named_value_float_calls: list[tuple[Any, ...]] = []
        self.gps_input_calls: list[tuple[Any, ...]] = []

    def gps_input_send(self, *args: Any) -> None:
        """Record the positional arguments of one GPS_INPUT send."""
        self.gps_input_calls += [args]

    def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None:
        """Record one NAMED_VALUE_FLOAT send as ``(time_boot_ms, name, value)``."""
        entry = (time_boot_ms, name, value)
        self.named_value_float_calls.append(entry)

    def statustext_send(self, severity: int, text: bytes) -> None:
        """Record one STATUSTEXT send as ``(severity, text)``."""
        entry = (severity, text)
        self.statustext_calls.append(entry)
class _ConnStub:
    """Fake connection object handed to the adapter via ``connect_factory``.

    It owns a ``_MavStub`` as ``mav``, remembers every key passed to
    ``setup_signing`` and whether ``close`` was called, and can be told to
    refuse the signing handshake.
    """

    def __init__(self, *, fail_signing: bool = False, signing_failure_count: int = 0) -> None:
        self.closed = False
        self._fail_signing = fail_signing
        self.setup_signing_calls: list[bytes] = []
        self.mav = _MavStub(signing_failure_count=signing_failure_count)

    def setup_signing(self, key: bytes) -> None:
        """Store an immutable copy of *key*, or raise when configured to fail."""
        if not self._fail_signing:
            # Copy into `bytes` so later in-place changes to a mutable key
            # buffer do not alter the recorded value.
            self.setup_signing_calls.append(bytes(key))
            return
        raise RuntimeError("simulated signing handshake refusal")

    def close(self) -> None:
        """Mark the connection as closed."""
        self.closed = True
def _ap_config(*, signing_key_source: str, dev_static_key: str = "") -> Any:
    """Load a config from a synthetic environment for the ArduPilot adapter.

    ``FC_DEV_STATIC_SIGNING_KEY`` is added to the environment only when
    *dev_static_key* is a non-empty string.
    """
    env: dict[str, str] = dict(
        FC_ADAPTER="ardupilot_plane",
        FC_PORT_DEVICE="/dev/null",
        FC_PORT_BAUD="921600",
        FC_SIGNING_KEY_SOURCE=signing_key_source,
    )
    if dev_static_key:
        env.update(FC_DEV_STATIC_SIGNING_KEY=dev_static_key)
    return load_config(env=env, paths=(), require_env=False)
def _build_adapter(
    conn: _ConnStub, *, signing_key_source: str, dev_static_key: str = ""
) -> PymavlinkArdupilotAdapter:
    """Build an adapter whose connection factory always returns *conn*.

    The WGS converter and the FDR client are ``MagicMock`` instances; the
    covariance projector and the adapter share the same FDR mock.
    """
    adapter_config = _ap_config(
        signing_key_source=signing_key_source, dev_static_key=dev_static_key
    )
    fdr_mock = mock.MagicMock()

    def _connect(device: Any, baud: Any) -> _ConnStub:
        # Ignore the requested port settings and hand back the stub.
        return conn

    return PymavlinkArdupilotAdapter(
        config=adapter_config,
        wgs_converter=mock.MagicMock(),
        covariance_projector=CovarianceProjector(fdr_client=fdr_mock),
        fdr_client=fdr_mock,
        flight_id="flt-az395",
        connect_factory=_connect,
    )
def _port() -> PortConfig:
    """Return the port configuration every test passes to ``adapter.open``."""
    port_config = PortConfig(
        fc_kind=FcKind.ARDUPILOT_PLANE,
        device="/dev/null",
        baud=921600,
    )
    return port_config
def _make_output(frame_id: UUID | int | None = None) -> EstimatorOutput:
    """Build a fixed ``EstimatorOutput`` whose only variable field is the frame id.

    *frame_id* may be a ready-made ``UUID``, an ``int`` (converted with
    ``UUID(int=...)``), or ``None`` for a fresh random ``uuid4()``.
    """
    if frame_id is None:
        resolved_id = uuid4()
    elif isinstance(frame_id, int):
        resolved_id = UUID(int=frame_id)
    else:
        resolved_id = frame_id

    fixed_position = LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0)
    identity_rotation = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
    covariance = 0.25 * np.identity(6, dtype=np.float64)

    return EstimatorOutput(
        frame_id=resolved_id,
        position_wgs84=fixed_position,
        orientation_world_T_body=identity_rotation,
        velocity_world_mps=(0.0, 0.0, 0.0),
        covariance_6x6=covariance,
        source_label=PoseSourceLabel.VISUAL_PROPAGATED,
        last_satellite_anchor_age_ms=0,
        smoothed=False,
        emitted_at=0,
    )
# ----------------------------------------------------------------------
# AC-1 / AC-10: signing_key=None with ephemeral source generates the key
def test_ac1_ephemeral_generates_key_when_none_passed() -> None:
    """Opening with ``signing_key=None`` must install exactly one 32-byte key."""
    stub_conn = _ConnStub()
    adapter = _build_adapter(stub_conn, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        installed_keys = stub_conn.setup_signing_calls
        assert len(installed_keys) == 1
        assert len(installed_keys[0]) == 32
    finally:
        adapter.close()
# ----------------------------------------------------------------------
# AC-2: two opens produce distinct ephemeral keys
def test_ac2_ephemeral_distinct_per_flight() -> None:
    """Two independent open/close cycles must install different keys."""
    conns: list[_ConnStub] = []
    for _ in range(2):
        stub_conn = _ConnStub()
        adapter = _build_adapter(stub_conn, signing_key_source="ephemeral_per_flight")
        adapter.open(_port(), signing_key=None)
        adapter.close()
        conns.append(stub_conn)

    first_conn, second_conn = conns
    assert first_conn.setup_signing_calls[0] != second_conn.setup_signing_calls[0]
# ----------------------------------------------------------------------
# AC-3: handshake failure raises SigningHandshakeError
def test_ac3_handshake_failure_raises(caplog: pytest.LogCaptureFixture) -> None:
    """A refused handshake must raise and log ``c8.ap.signing_handshake_failed``."""
    failing_conn = _ConnStub(fail_signing=True)
    adapter = _build_adapter(failing_conn, signing_key_source="ephemeral_per_flight")

    with caplog.at_level(logging.ERROR):
        with pytest.raises(SigningHandshakeError):
            adapter.open(_port(), signing_key=None)

    # The structured `kind` field is read off the record; records without it
    # contribute None.
    logged_kinds = [getattr(rec, "kind", None) for rec in caplog.records]
    assert "c8.ap.signing_handshake_failed" in logged_kinds
# ----------------------------------------------------------------------
# AC-4: handshake success FDR record has NO key bytes
def test_ac4_handshake_success_fdr_has_no_key_bytes() -> None:
    """The ``c8.ap.signing_key_rotated`` FDR payload must not contain key bytes.

    Exactly one such record must be enqueued. Its ``str()`` rendering is
    checked for the full hex key and for every 4-byte window of the key.
    """
    conn = _ConnStub()
    adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        key = bytes(conn.setup_signing_calls[0])
        rotated_calls = [
            call
            for call in adapter._fdr_client.enqueue.mock_calls  # type: ignore[attr-defined]
            if call.args[0].payload.get("kind") == "c8.ap.signing_key_rotated"
        ]
        assert len(rotated_calls) == 1
        rec_payload = rotated_calls[0].args[0].payload
        # AC-4: assert NO key byte sub-sequence appears in the FDR payload.
        rendered = str(rec_payload)
        assert key.hex() not in rendered
        window = 4
        # `+ 1` so the final window (offset len(key) - 4) is checked too; the
        # previous bound `len(key) - 4` silently skipped the key's last 4 bytes.
        for offset in range(len(key) - window + 1):
            chunk = key[offset : offset + window].hex()
            assert chunk not in rendered, f"4-byte chunk leak at offset {offset}"
    finally:
        adapter.close()
# ----------------------------------------------------------------------
# AC-5: key never in any log line
def test_ac5_key_never_in_logs(caplog: pytest.LogCaptureFixture) -> None:
    """No log record from open/emit/close may contain signing-key material.

    The scan covers each record's formatted message and any non-standard
    ``LogRecord`` attributes, since tests in this file read structured fields
    such as ``kind`` directly off the records. It looks for the full hex key
    and for every 4-byte window of the key.
    """
    conn = _ConnStub()
    adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight")
    with caplog.at_level(logging.DEBUG):
        adapter.open(_port(), signing_key=None)
        try:
            for i in range(5):
                adapter.emit_external_position(_make_output(frame_id=i))
        finally:
            # Close even when an emit raises, while the capture is still active.
            adapter.close()
    key = bytes(conn.setup_signing_calls[0])

    # Attributes every LogRecord carries; anything else is treated as a
    # caller-supplied structured field.
    standard_attrs = set(vars(logging.makeLogRecord({})))
    rendered_parts: list[str] = []
    for rec in caplog.records:
        rendered_parts.append(rec.getMessage())
        for attr_name, attr_value in vars(rec).items():
            if attr_name in standard_attrs:
                continue
            rendered_parts.append(f"{attr_name}={attr_value!r}")
            if isinstance(attr_value, (bytes, bytearray, memoryview)):
                # repr() of raw bytes is not hex, so add a hex rendering too.
                rendered_parts.append(bytes(attr_value).hex())
    rendered_logs = "\n".join(rendered_parts)

    assert key.hex() not in rendered_logs
    # One alternation over every 4-byte window, not only the first one.
    fragments = [re.escape(key[i : i + 4].hex()) for i in range(len(key) - 3)]
    # An empty alternation would match everywhere, so require a real window.
    assert fragments, "signing key is shorter than one 4-byte window"
    leak = re.search("|".join(fragments), rendered_logs)
    # Report only the position so a failure message does not echo key bytes.
    assert leak is None, f"4-byte key fragment found at log offset {leak.start()}"
# ----------------------------------------------------------------------
# AC-6: mid-flight signing failure does NOT raise
def test_ac6_mid_flight_signing_failure_no_raise(caplog: pytest.LogCaptureFixture) -> None:
    """With a non-zero ``sig_count`` on the stub, emits must log and warn, not raise.

    Expects a ``c8.ap.signing_failure`` log record and at least one
    STATUSTEXT sent at WARNING severity.
    """
    stub_conn = _ConnStub(signing_failure_count=5)
    adapter = _build_adapter(stub_conn, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        with caplog.at_level(logging.ERROR):
            # None yields a random frame id; 2 yields UUID(int=2).
            for frame_id in (None, 2):
                adapter.emit_external_position(_make_output(frame_id=frame_id))

        logged_kinds = [getattr(rec, "kind", None) for rec in caplog.records]
        assert "c8.ap.signing_failure" in logged_kinds

        sent_severities = [severity for severity, _ in stub_conn.mav.statustext_calls]
        assert int(Severity.WARNING.value) in sent_severities
    finally:
        adapter.close()
# ----------------------------------------------------------------------
# AC-7: key zeroisation on close
def test_ac7_key_zeroisation_on_close(caplog: pytest.LogCaptureFixture) -> None:
    """``close()`` must zero the key buffer in place and log the zeroisation."""
    stub_conn = _ConnStub()
    adapter = _build_adapter(stub_conn, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)

    # Hold a reference to the adapter's own key buffer. The original note
    # says it is a bytearray wiped in place by _zeroise_signing_key, so this
    # same object can be inspected after close().
    live_key = adapter._signing_key
    assert live_key is not None
    assert not all(byte == 0 for byte in live_key)  # non-zero before close

    with caplog.at_level(logging.INFO):
        adapter.close()

    assert not any(byte != 0 for byte in live_key)
    logged_kinds = [getattr(rec, "kind", None) for rec in caplog.records]
    assert "c8.ap.signing_key_zeroised" in logged_kinds
# ----------------------------------------------------------------------
# AC-8: BUILD_DEV_STATIC_KEY=ON enables static repeatable key path
def test_ac8_dev_static_key_repeatable(monkeypatch: pytest.MonkeyPatch) -> None:
    """With ``BUILD_DEV_STATIC_KEY=ON`` two flights install the same configured key."""
    monkeypatch.setenv("BUILD_DEV_STATIC_KEY", "ON")

    conns: list[_ConnStub] = []
    for _ in range(2):
        stub_conn = _ConnStub()
        adapter = _build_adapter(
            stub_conn, signing_key_source="dev_static", dev_static_key=_DEV_STATIC_KEY
        )
        adapter.open(_port(), signing_key=None)
        adapter.close()
        conns.append(stub_conn)

    first_conn, second_conn = conns
    assert first_conn.setup_signing_calls[0] == second_conn.setup_signing_calls[0]
    # The installed key must be the configured hex string, byte for byte.
    assert first_conn.setup_signing_calls[0].hex() == _DEV_STATIC_KEY
def test_ac8_dev_static_key_blocked_without_build_flag(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without ``BUILD_DEV_STATIC_KEY`` in the environment, ``dev_static`` must be refused."""
    # raising=False: the variable may already be absent.
    monkeypatch.delenv("BUILD_DEV_STATIC_KEY", raising=False)
    adapter = _build_adapter(
        _ConnStub(), signing_key_source="dev_static", dev_static_key=_DEV_STATIC_KEY
    )
    expect_refusal = pytest.raises(FcOpenError, match="BUILD_DEV_STATIC_KEY")
    with expect_refusal:
        adapter.open(_port(), signing_key=None)
# ----------------------------------------------------------------------
# AC-9 (indirect): mid-flight signing failure STATUSTEXT severity = WARNING.
# AC-3 handshake-failure raises and logs ERROR; the STATUSTEXT severity is
# not exercised because handshake-failure aborts open() before any emit.
def test_ac9_statustext_severity_on_mid_flight_failure() -> None:
    """One emit with a non-zero ``sig_count`` must send a WARNING STATUSTEXT."""
    stub_conn = _ConnStub(signing_failure_count=5)
    adapter = _build_adapter(stub_conn, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        adapter.emit_external_position(_make_output())
        # The signing-failure WARNING is expected first; provenance
        # STATUSTEXT at INFO may follow, so only require that at least one
        # WARNING is present.
        sent_severities = [severity for severity, _ in stub_conn.mav.statustext_calls]
        assert int(Severity.WARNING.value) in sent_severities
    finally:
        adapter.close()
# ----------------------------------------------------------------------
# AC-10: AZ-393 placeholder tightened — `signing_key_source="none"` is the
# only path that bypasses signing; `"ephemeral_per_flight"` handles a missing
# key by generating one internally (AC-1 verifies that). The remaining
# tightening checked here is that an unknown source is rejected.
def test_ac10_unknown_signing_source_rejected() -> None:
    """An unrecognised ``signing_key_source`` must make ``open()`` raise ``FcOpenError``."""
    # Per the original author's note, config loading already validates the
    # allowed-source list, so an invalid value cannot be passed through
    # `_ap_config`. The adapter is therefore built with a valid source and
    # its config is patched afterwards (test seam).
    adapter = _build_adapter(_ConnStub(), signing_key_source="none")

    # Simulate an unknown source slipping past validation (defence in depth).
    # object.__setattr__ is used instead of plain assignment — presumably
    # because the config object is frozen; confirm against FcConfig.
    fc_section = adapter._config.fc
    object.__setattr__(fc_section, "signing_key_source", "bogus")  # type: ignore[arg-type]

    expect_refusal = pytest.raises(FcOpenError, match="unknown signing_key_source")
    with expect_refusal:
        adapter.open(_port(), signing_key=None)