Files
gps-denied-onboard/tests/unit/c8_fc_adapter/test_az394_inav_outbound.py
T
Oleksandr Bezdieniezhnykh 1e0be08e8a [AZ-393] [AZ-394] [AZ-395] C8 outbound chain + AP MAVLink2 signing
AZ-393 ArduPilot outbound: PymavlinkArdupilotAdapter encodes
EstimatorOutput to MAVLink2 GPS_INPUT via gps_input_send; emits
NAMED_VALUE_FLOAT(name="src_lbl") every frame and STATUSTEXT on
source_label transition (1 Hz per-severity cap). Smoothed-output
guard (Invariant 6), single-writer thread (Invariant 8), SPD
propagation. Shared helper _outbound_provenance.py owns the
canonical source-label-to-float table + transition rate-limiter.

AZ-394 iNav outbound: Msp2InavAdapter encodes EstimatorOutput to
hand-rolled MSP2_SENSOR_GPS (0x1F03, 52-byte LE payload via
_msp2_sensor_gps_encoder.py + YAMSPy send_RAW_msg). Secondary
unsigned MAVLink channel for STATUSTEXT transitions. open()
rejects non-None signing_key (RESTRICT-COMM-2 / Invariant 2);
request_source_set_switch raises SourceSetSwitchNotSupportedError
(Invariant 9 verified: never calls setup_signing on secondary).

AZ-395 AP MAVLink2 signing: ephemeral per-flight 32-byte key
from secrets.token_bytes; pymavlink setup_signing handshake at
open(); in-place bytearray zeroisation on close(); mid-flight
signing-failure detection (ERROR log + WARNING STATUSTEXT + no
raise; threshold configurable). Key never logged / persisted /
serialised (regex-scanned by AC-4/AC-5). BUILD_DEV_STATIC_KEY=ON
enables repeatable static-key dev path; rejected at open() when
the build flag is absent.

Shared: EstimatorOutput.smoothed (default False) added for the
Invariant 6 gate at the C8 boundary; FcConfig extended with
dev_static_signing_key + signing_failure_threshold (additive
defaults; cross-field validation in __post_init__).

Tests: 33 new AC tests (11 + 11 + 11) covering all 30 ACs; full
suite 476 passing / 2 skipped / 0 failing (was 443). Contract
surfaces unchanged at fc_adapter_protocol v1.0.0 and
composition_root v1.2.0.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 04:47:44 +03:00

302 lines
9.5 KiB
Python

"""AZ-394 — Msp2InavAdapter outbound AC tests."""
from __future__ import annotations
import logging
import threading
from datetime import datetime, timezone
from typing import Any
from unittest import mock
import numpy as np
import pytest
from gps_denied_onboard._types.fc import FcKind, PortConfig
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.pose import EstimatorOutput
from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import (
CovarianceProjector,
)
from gps_denied_onboard.components.c8_fc_adapter._msp2_sensor_gps_encoder import (
MSP2_SENSOR_GPS_CODE,
decode_msp2_sensor_gps,
)
from gps_denied_onboard.components.c8_fc_adapter.errors import (
FcAdapterConfigError,
FcEmitError,
SourceSetSwitchNotSupportedError,
)
from gps_denied_onboard.components.c8_fc_adapter.msp2_inav_adapter import (
Msp2InavAdapter,
)
from gps_denied_onboard.config import load_config
# ----------------------------------------------------------------------
# Helpers — MSP / secondary-MAVLink stand-ins
class _MspStub:
def __init__(self) -> None:
self.raw_msgs: list[tuple[int, bytes]] = []
self.closed = False
def send_RAW_msg(self, code: int, data: bytes) -> None:
self.raw_msgs.append((int(code), bytes(data)))
def close(self) -> None:
self.closed = True
class _SecondaryMavStub:
def __init__(self) -> None:
self.statustext_calls: list[tuple[int, bytes]] = []
self.closed = False
# Mirror pymavlink connection.mav shape.
self.mav = self
# Track signing-key state per RESTRICT-COMM-2 (Invariant 9):
# the unit-test adapter MUST never call setup_signing on us.
self.setup_signing_calls: list[Any] = []
def statustext_send(self, severity: int, text: bytes) -> None:
self.statustext_calls.append((int(severity), bytes(text)))
def setup_signing(self, key: Any) -> None:
self.setup_signing_calls.append(key)
def close(self) -> None:
self.closed = True
@pytest.fixture
def msp() -> _MspStub:
return _MspStub()
@pytest.fixture
def secondary() -> _SecondaryMavStub:
return _SecondaryMavStub()
def _inav_config(tmp_path) -> Any:
env = {
"FC_ADAPTER": "inav",
"FC_PORT_DEVICE": "/dev/null",
"FC_PORT_BAUD": "115200",
"FC_SIGNING_KEY_SOURCE": "none",
}
return load_config(env=env, paths=(), require_env=False)
def _make_output(
*,
source_label: str = "visual_propagated",
smoothed: bool = False,
cov: np.ndarray | None = None,
wgs: LatLonAlt | None = None,
frame_id: int = 1,
) -> EstimatorOutput:
if cov is None:
cov = np.eye(6, dtype=np.float64) * 0.25
if wgs is None:
wgs = LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0)
return EstimatorOutput(
frame_id=frame_id,
timestamp=datetime.now(tz=timezone.utc),
pose_se3=np.eye(4),
covariance_6x6=cov,
source_label=source_label,
smoothed=smoothed,
extras={"wgs84": wgs},
)
@pytest.fixture
def adapter(msp: _MspStub, secondary: _SecondaryMavStub, tmp_path) -> Msp2InavAdapter:
cfg = _inav_config(tmp_path)
fdr = mock.MagicMock()
a = Msp2InavAdapter(
config=cfg,
wgs_converter=mock.MagicMock(),
covariance_projector=CovarianceProjector(fdr_client=fdr),
fdr_client=fdr,
msp_connect_factory=lambda device, baud: msp,
secondary_mavlink_factory=lambda: secondary,
)
port = PortConfig(fc_kind=FcKind.INAV, device="/dev/null", baud=115200)
a.open(port, signing_key=None)
yield a
a.close()
# ----------------------------------------------------------------------
# AC-1: field fidelity
def test_ac1_msp2_field_fidelity(adapter: Msp2InavAdapter, msp: _MspStub) -> None:
# Arrange
cov = np.diag([0.25, 0.25, 9.0, 1.0, 1.0, 1.0]).astype(np.float64)
wgs = LatLonAlt(lat_deg=50.4501, lon_deg=30.5234, alt_m=180.0)
output = _make_output(cov=cov, wgs=wgs)
expected_mm = CovarianceProjector(fdr_client=mock.MagicMock()).to_inav_h_pos_accuracy_mm(output)
# Act
result = adapter.emit_external_position(output)
# Assert
assert len(msp.raw_msgs) == 1
code, payload = msp.raw_msgs[0]
assert code == MSP2_SENSOR_GPS_CODE
decoded = decode_msp2_sensor_gps(payload)
assert decoded.fix_type == 3
assert decoded.latitude_e7 == int(wgs.lat_deg * 1e7)
assert decoded.longitude_e7 == int(wgs.lon_deg * 1e7)
assert decoded.msl_altitude_cm == int(wgs.alt_m * 100.0)
assert decoded.h_pos_accuracy_mm == expected_mm
assert result.fc_kind is FcKind.INAV
assert result.horiz_accuracy_m == pytest.approx(expected_mm / 1000.0)
# ----------------------------------------------------------------------
# AC-2: every frame, monotonic seq
def test_ac2_msp2_every_frame_with_seq(adapter: Msp2InavAdapter, msp: _MspStub) -> None:
seqs = []
for i in range(100):
result = adapter.emit_external_position(_make_output(frame_id=i))
seqs.append(result.sequence_number)
assert len(msp.raw_msgs) == 100
# AC-2: monotonically incrementing (modulo uint8 wrap)
assert seqs[0] == 1
assert seqs[-1] == 100 & 0xFF
# ----------------------------------------------------------------------
# AC-3: STATUSTEXT on secondary channel only, on transitions
def test_ac3_statustext_secondary_only_on_transitions(
adapter: Msp2InavAdapter, msp: _MspStub, secondary: _SecondaryMavStub
) -> None:
adapter._provenance._min_interval_s = 0.0 # type: ignore[attr-defined]
labels = ["visual_propagated", "sat_anchored"]
for i in range(100):
label = labels[(i // 10) % 2]
adapter.emit_external_position(_make_output(source_label=label, frame_id=i))
# 10 transitions including the initial None->visual_propagated
assert len(secondary.statustext_calls) == 10
# AC-3: NEVER on primary MSP2 channel
for code, _ in msp.raw_msgs:
assert code == MSP2_SENSOR_GPS_CODE
# ----------------------------------------------------------------------
# AC-4: signing-key rejection
def test_ac4_signing_key_rejected(msp: _MspStub, secondary: _SecondaryMavStub, tmp_path) -> None:
cfg = _inav_config(tmp_path)
fdr = mock.MagicMock()
a = Msp2InavAdapter(
config=cfg,
wgs_converter=mock.MagicMock(),
covariance_projector=CovarianceProjector(fdr_client=fdr),
fdr_client=fdr,
msp_connect_factory=lambda device, baud: msp,
secondary_mavlink_factory=lambda: secondary,
)
port = PortConfig(fc_kind=FcKind.INAV, device="/dev/null", baud=115200)
with pytest.raises(FcAdapterConfigError, match="iNav does not support MAVLink signing"):
a.open(port, signing_key=b"\x00" * 32)
# ----------------------------------------------------------------------
# AC-5: signing-asymmetry — adapter never calls setup_signing on secondary
def test_ac5_signing_asymmetry_no_signed_flag(
adapter: Msp2InavAdapter, secondary: _SecondaryMavStub
) -> None:
for i in range(50):
adapter.emit_external_position(_make_output(frame_id=i))
assert secondary.setup_signing_calls == []
# ----------------------------------------------------------------------
# AC-6: source-set-switch unsupported
def test_ac6_source_set_switch_unsupported(adapter: Msp2InavAdapter) -> None:
with pytest.raises(SourceSetSwitchNotSupportedError, match="iNav"):
adapter.request_source_set_switch()
# ----------------------------------------------------------------------
# AC-7: smoothed rejected
def test_ac7_smoothed_rejected(adapter: Msp2InavAdapter, msp: _MspStub) -> None:
with pytest.raises(FcEmitError, match="smoothed"):
adapter.emit_external_position(_make_output(smoothed=True))
assert msp.raw_msgs == []
# ----------------------------------------------------------------------
# AC-8: non-SPD cov rejected
def test_ac8_non_spd_covariance_rejected(adapter: Msp2InavAdapter, msp: _MspStub) -> None:
bad = np.eye(6, dtype=np.float64)
bad[0, 0] = -1.0
with pytest.raises(FcEmitError):
adapter.emit_external_position(_make_output(cov=bad))
assert msp.raw_msgs == []
# ----------------------------------------------------------------------
# AC-9: single-writer thread
def test_ac9_single_writer_thread(adapter: Msp2InavAdapter) -> None:
adapter.emit_external_position(_make_output())
err: list[BaseException] = []
def run() -> None:
try:
adapter.emit_external_position(_make_output(frame_id=2))
except RuntimeError as e:
err.append(e)
t = threading.Thread(target=run)
t.start()
t.join(timeout=2.0)
assert len(err) == 1
assert "single-writer" in str(err[0]).lower()
# ----------------------------------------------------------------------
# AC-10: first emit logged once
def test_ac10_first_emit_logged_once(
adapter: Msp2InavAdapter, caplog: pytest.LogCaptureFixture
) -> None:
with caplog.at_level(logging.INFO):
for i in range(5):
adapter.emit_external_position(_make_output(frame_id=i))
first = [rec for rec in caplog.records if getattr(rec, "kind", None) == "c8.inav.first_emit"]
assert len(first) == 1
# ----------------------------------------------------------------------
# Invariant 2 cross-check: iNav config rejects ephemeral signing
def test_inav_config_rejects_signing(tmp_path) -> None:
env = {
"FC_ADAPTER": "inav",
"FC_SIGNING_KEY_SOURCE": "ephemeral_per_flight",
}
with pytest.raises(Exception, match="adapter='inav'"):
load_config(env=env, paths=(), require_env=False)