"""AZ-395 — AP MAVLink 2.0 per-flight signing AC tests.""" from __future__ import annotations import logging import re from datetime import datetime, timezone from types import SimpleNamespace from typing import Any from unittest import mock import numpy as np import pytest from gps_denied_onboard._types.fc import FcKind, PortConfig, Severity from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.pose import EstimatorOutput from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( CovarianceProjector, ) from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcOpenError, SigningHandshakeError, ) from gps_denied_onboard.components.c8_fc_adapter.pymavlink_ardupilot_adapter import ( PymavlinkArdupilotAdapter, ) from gps_denied_onboard.config import load_config _DEV_STATIC_KEY = "00112233445566778899aabbccddeeff" * 2 # 64 hex chars = 32 bytes class _MavStub: def __init__(self, signing_failure_count: int = 0) -> None: self.gps_input_calls: list[tuple[Any, ...]] = [] self.named_value_float_calls: list[tuple[Any, ...]] = [] self.statustext_calls: list[tuple[int, bytes]] = [] # pymavlink exposes `connection.mav.signing.sig_count` after # setup_signing(...); we simulate that surface here. 
self.signing = SimpleNamespace(sig_count=signing_failure_count) def gps_input_send(self, *args: Any) -> None: self.gps_input_calls.append(args) def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None: self.named_value_float_calls.append((time_boot_ms, name, value)) def statustext_send(self, severity: int, text: bytes) -> None: self.statustext_calls.append((severity, text)) class _ConnStub: def __init__(self, *, fail_signing: bool = False, signing_failure_count: int = 0) -> None: self.mav = _MavStub(signing_failure_count=signing_failure_count) self.setup_signing_calls: list[bytes] = [] self._fail_signing = fail_signing self.closed = False def setup_signing(self, key: bytes) -> None: if self._fail_signing: raise RuntimeError("simulated signing handshake refusal") self.setup_signing_calls.append(bytes(key)) def close(self) -> None: self.closed = True def _ap_config(*, signing_key_source: str, dev_static_key: str = "") -> Any: env: dict[str, str] = { "FC_ADAPTER": "ardupilot_plane", "FC_PORT_DEVICE": "/dev/null", "FC_PORT_BAUD": "921600", "FC_SIGNING_KEY_SOURCE": signing_key_source, } if dev_static_key: env["FC_DEV_STATIC_SIGNING_KEY"] = dev_static_key return load_config(env=env, paths=(), require_env=False) def _build_adapter( conn: _ConnStub, *, signing_key_source: str, dev_static_key: str = "" ) -> PymavlinkArdupilotAdapter: cfg = _ap_config(signing_key_source=signing_key_source, dev_static_key=dev_static_key) fdr = mock.MagicMock() return PymavlinkArdupilotAdapter( config=cfg, wgs_converter=mock.MagicMock(), covariance_projector=CovarianceProjector(fdr_client=fdr), fdr_client=fdr, flight_id="flt-az395", connect_factory=lambda device, baud: conn, ) def _port() -> PortConfig: return PortConfig(fc_kind=FcKind.ARDUPILOT_PLANE, device="/dev/null", baud=921600) def _make_output(frame_id: int = 1) -> EstimatorOutput: return EstimatorOutput( frame_id=frame_id, timestamp=datetime.now(tz=timezone.utc), pose_se3=np.eye(4), 
covariance_6x6=np.eye(6, dtype=np.float64) * 0.25, source_label="visual_propagated", smoothed=False, extras={"wgs84": LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0)}, ) # ---------------------------------------------------------------------- # AC-1 / AC-10: signing_key=None with ephemeral source generates the key def test_ac1_ephemeral_generates_key_when_none_passed() -> None: conn = _ConnStub() adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight") adapter.open(_port(), signing_key=None) try: assert len(conn.setup_signing_calls) == 1 assert len(conn.setup_signing_calls[0]) == 32 finally: adapter.close() # ---------------------------------------------------------------------- # AC-2: two opens produce distinct ephemeral keys def test_ac2_ephemeral_distinct_per_flight() -> None: conn1 = _ConnStub() a1 = _build_adapter(conn1, signing_key_source="ephemeral_per_flight") a1.open(_port(), signing_key=None) a1.close() conn2 = _ConnStub() a2 = _build_adapter(conn2, signing_key_source="ephemeral_per_flight") a2.open(_port(), signing_key=None) a2.close() assert conn1.setup_signing_calls[0] != conn2.setup_signing_calls[0] # ---------------------------------------------------------------------- # AC-3: handshake failure raises SigningHandshakeError def test_ac3_handshake_failure_raises(caplog: pytest.LogCaptureFixture) -> None: conn = _ConnStub(fail_signing=True) adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight") with caplog.at_level(logging.ERROR), pytest.raises(SigningHandshakeError): adapter.open(_port(), signing_key=None) assert any( getattr(rec, "kind", None) == "c8.ap.signing_handshake_failed" for rec in caplog.records ) # ---------------------------------------------------------------------- # AC-4: handshake success FDR record has NO key bytes def test_ac4_handshake_success_fdr_has_no_key_bytes() -> None: conn = _ConnStub() adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight") adapter.open(_port(), 
signing_key=None) try: key = bytes(conn.setup_signing_calls[0]) rotated_calls = [ call for call in adapter._fdr_client.enqueue.mock_calls # type: ignore[attr-defined] if call.args[0].payload.get("kind") == "c8.ap.signing_key_rotated" ] assert len(rotated_calls) == 1 rec_payload = rotated_calls[0].args[0].payload # AC-4: assert NO key byte sub-sequence appears in the FDR payload. rendered = str(rec_payload) assert key.hex() not in rendered for i in range(0, len(key) - 4): chunk = key[i : i + 4].hex() assert chunk not in rendered, f"4-byte chunk leak at offset {i}" finally: adapter.close() # ---------------------------------------------------------------------- # AC-5: key never in any log line def test_ac5_key_never_in_logs(caplog: pytest.LogCaptureFixture) -> None: conn = _ConnStub() adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight") with caplog.at_level(logging.DEBUG): adapter.open(_port(), signing_key=None) for i in range(5): adapter.emit_external_position(_make_output(frame_id=i)) adapter.close() key = bytes(conn.setup_signing_calls[0]) rendered_logs = "\n".join(rec.getMessage() for rec in caplog.records) assert key.hex() not in rendered_logs pattern = re.compile(re.escape(key[:4].hex())) assert pattern.search(rendered_logs) is None # ---------------------------------------------------------------------- # AC-6: mid-flight signing failure does NOT raise def test_ac6_mid_flight_signing_failure_no_raise(caplog: pytest.LogCaptureFixture) -> None: conn = _ConnStub(signing_failure_count=5) adapter = _build_adapter(conn, signing_key_source="ephemeral_per_flight") adapter.open(_port(), signing_key=None) try: with caplog.at_level(logging.ERROR): adapter.emit_external_position(_make_output()) adapter.emit_external_position(_make_output(frame_id=2)) assert any(getattr(rec, "kind", None) == "c8.ap.signing_failure" for rec in caplog.records) assert any(sev == int(Severity.WARNING.value) for sev, _ in conn.mav.statustext_calls) finally: adapter.close() 
# ----------------------------------------------------------------------
# AC-7: key zeroisation on close


def test_ac7_key_zeroisation_on_close(caplog: pytest.LogCaptureFixture) -> None:
    """close() zeroes the signing-key buffer and logs a ``signing_key_zeroised`` record."""
    stub_conn = _ConnStub()
    adapter = _build_adapter(stub_conn, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)

    # Keep a handle on the adapter's own key buffer rather than a copy: the
    # buffer is a mutable bytearray, so the in-place wipe performed by
    # _zeroise_signing_key during close() is observable through this handle.
    live_key = adapter._signing_key
    assert live_key is not None
    assert any(live_key)  # non-zero before close

    with caplog.at_level(logging.INFO):
        adapter.close()

    assert not any(live_key)
    logged_kinds = [getattr(rec, "kind", None) for rec in caplog.records]
    assert "c8.ap.signing_key_zeroised" in logged_kinds


# ----------------------------------------------------------------------
# AC-8: BUILD_DEV_STATIC_KEY=ON enables static repeatable key path


def test_ac8_dev_static_key_repeatable(monkeypatch: pytest.MonkeyPatch) -> None:
    """With the build flag ON, two flights present the same configured static key."""
    monkeypatch.setenv("BUILD_DEV_STATIC_KEY", "ON")

    connections = [_ConnStub(), _ConnStub()]
    for stub_conn in connections:
        adapter = _build_adapter(
            stub_conn, signing_key_source="dev_static", dev_static_key=_DEV_STATIC_KEY
        )
        adapter.open(_port(), signing_key=None)
        adapter.close()

    first_key, second_key = (stub_conn.setup_signing_calls[0] for stub_conn in connections)
    assert first_key == second_key
    assert first_key.hex() == _DEV_STATIC_KEY


def test_ac8_dev_static_key_blocked_without_build_flag(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without BUILD_DEV_STATIC_KEY in the environment, open() raises FcOpenError."""
    monkeypatch.delenv("BUILD_DEV_STATIC_KEY", raising=False)
    adapter = _build_adapter(
        _ConnStub(), signing_key_source="dev_static", dev_static_key=_DEV_STATIC_KEY
    )
    with pytest.raises(FcOpenError, match="BUILD_DEV_STATIC_KEY"):
        adapter.open(_port(), signing_key=None)


# ----------------------------------------------------------------------
# AC-9 (indirect):
# mid-flight signing failure STATUSTEXT severity = WARNING.
# AC-3 handshake-failure raises and logs ERROR; the STATUSTEXT severity is
# not exercised because handshake-failure aborts open() before any emit.


def test_ac9_statustext_severity_on_mid_flight_failure() -> None:
    """A mid-flight signing failure produces at least one WARNING STATUSTEXT."""
    stub_conn = _ConnStub(signing_failure_count=5)
    adapter = _build_adapter(stub_conn, signing_key_source="ephemeral_per_flight")
    adapter.open(_port(), signing_key=None)
    try:
        adapter.emit_external_position(_make_output())
        # The signing-failure WARNING is expected first; provenance
        # STATUSTEXT at INFO may follow it, so only require that a WARNING
        # is present somewhere among the recorded sends.
        warning_level = int(Severity.WARNING.value)
        sent_severities = [severity for severity, _ in stub_conn.mav.statustext_calls]
        assert warning_level in sent_severities
    finally:
        adapter.close()


# ----------------------------------------------------------------------
# AC-10: AZ-393 placeholder tightened — `signing_key_source="none"` is the
# only path that bypasses signing; `"ephemeral_per_flight"` with no key
# supplied uses an internally generated one (AC-1 verifies that). The
# matching tightening surface is that an unknown source is rejected.


def test_ac10_unknown_signing_source_rejected() -> None:
    """An unrecognised signing_key_source makes open() raise FcOpenError."""
    # FcConfig enforces the allowed-source list at config-load time, so an
    # unsupported source never reaches the adapter through load_config.
    # To reach the adapter's own FcOpenError path we start from a valid
    # config and then overwrite the validated field (test seam).
    adapter = _build_adapter(_ConnStub(), signing_key_source="none")

    # Simulate an unknown source slipping past validation
    # (defence-in-depth) — the adapter must refuse it.
    object.__setattr__(adapter._config.fc, "signing_key_source", "bogus")  # type: ignore[arg-type]

    with pytest.raises(FcOpenError, match="unknown signing_key_source"):
        adapter.open(_port(), signing_key=None)