"""AZ-394 — Msp2InavAdapter outbound AC tests.""" from __future__ import annotations import logging import threading from typing import Any from unittest import mock from uuid import UUID, uuid4 import numpy as np import pytest from gps_denied_onboard._types.fc import FcKind, PortConfig from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.state import ( EstimatorOutput, PoseSourceLabel, Quat, ) from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( CovarianceProjector, ) from gps_denied_onboard.components.c8_fc_adapter._msp2_sensor_gps_encoder import ( MSP2_SENSOR_GPS_CODE, decode_msp2_sensor_gps, ) from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcAdapterConfigError, FcEmitError, SourceSetSwitchNotSupportedError, ) from gps_denied_onboard.components.c8_fc_adapter.msp2_inav_adapter import ( Msp2InavAdapter, ) from gps_denied_onboard.config import load_config # ---------------------------------------------------------------------- # Helpers — MSP / secondary-MAVLink stand-ins class _MspStub: def __init__(self) -> None: self.raw_msgs: list[tuple[int, bytes]] = [] self.closed = False def send_RAW_msg(self, code: int, data: bytes) -> None: self.raw_msgs.append((int(code), bytes(data))) def close(self) -> None: self.closed = True class _SecondaryMavStub: def __init__(self) -> None: self.statustext_calls: list[tuple[int, bytes]] = [] self.closed = False # Mirror pymavlink connection.mav shape. self.mav = self # Track signing-key state per RESTRICT-COMM-2 (Invariant 9): # the unit-test adapter MUST never call setup_signing on us. self.setup_signing_calls: list[Any] = [] def statustext_send(self, severity: int, text: bytes) -> None: self.statustext_calls.append((int(severity), bytes(text))) def setup_signing(self, key: Any) -> None: self.setup_signing_calls.append(key) def close(self) -> None: self.closed = True @pytest.fixture def msp() -> _MspStub: return _MspStub() @pytest.fixture def secondary() -> _SecondaryMavStub: return _SecondaryMavStub() def _inav_config(tmp_path) -> Any: env = { "FC_ADAPTER": "inav", "FC_PORT_DEVICE": "/dev/null", "FC_PORT_BAUD": "115200", "FC_SIGNING_KEY_SOURCE": "none", } return load_config(env=env, paths=(), require_env=False) def _make_output( *, source_label: PoseSourceLabel = PoseSourceLabel.VISUAL_PROPAGATED, smoothed: bool = False, cov: np.ndarray | None = None, wgs: LatLonAlt | None = None, frame_id: UUID | int | None = None, ) -> EstimatorOutput: if cov is None: cov = np.eye(6, dtype=np.float64) * 0.25 if wgs is None: wgs = LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0) if frame_id is None: frame_id = uuid4() elif isinstance(frame_id, int): frame_id = UUID(int=frame_id) return EstimatorOutput( frame_id=frame_id, position_wgs84=wgs, orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0), velocity_world_mps=(0.0, 0.0, 0.0), covariance_6x6=cov, source_label=source_label, last_satellite_anchor_age_ms=0, smoothed=smoothed, emitted_at=0, ) @pytest.fixture def adapter(msp: _MspStub, secondary: _SecondaryMavStub, tmp_path) -> Msp2InavAdapter: cfg = _inav_config(tmp_path) fdr = mock.MagicMock() a = Msp2InavAdapter( config=cfg, wgs_converter=mock.MagicMock(), covariance_projector=CovarianceProjector(fdr_client=fdr), fdr_client=fdr, msp_connect_factory=lambda device, baud: msp, secondary_mavlink_factory=lambda: secondary, ) port = PortConfig(fc_kind=FcKind.INAV, device="/dev/null", baud=115200) a.open(port, signing_key=None) yield a a.close() # ---------------------------------------------------------------------- # AC-1: field fidelity def test_ac1_msp2_field_fidelity(adapter: Msp2InavAdapter, msp: _MspStub) -> None: # Arrange cov = np.diag([0.25, 0.25, 9.0, 1.0, 1.0, 1.0]).astype(np.float64) wgs = LatLonAlt(lat_deg=50.4501, lon_deg=30.5234, alt_m=180.0) output = _make_output(cov=cov, wgs=wgs) expected_mm = CovarianceProjector(fdr_client=mock.MagicMock()).to_inav_h_pos_accuracy_mm(output) # Act result = adapter.emit_external_position(output) # Assert assert len(msp.raw_msgs) == 1 code, payload = msp.raw_msgs[0] assert code == MSP2_SENSOR_GPS_CODE decoded = decode_msp2_sensor_gps(payload) assert decoded.fix_type == 3 assert decoded.latitude_e7 == int(wgs.lat_deg * 1e7) assert decoded.longitude_e7 == int(wgs.lon_deg * 1e7) assert decoded.msl_altitude_cm == int(wgs.alt_m * 100.0) assert decoded.h_pos_accuracy_mm == expected_mm assert result.fc_kind is FcKind.INAV assert result.horiz_accuracy_m == pytest.approx(expected_mm / 1000.0) # ---------------------------------------------------------------------- # AC-2: every frame, monotonic seq def test_ac2_msp2_every_frame_with_seq(adapter: Msp2InavAdapter, msp: _MspStub) -> None: seqs = [] for i in range(100): result = adapter.emit_external_position(_make_output(frame_id=i)) seqs.append(result.sequence_number) assert len(msp.raw_msgs) == 100 # AC-2: monotonically incrementing (modulo uint8 wrap) assert seqs[0] == 1 assert seqs[-1] == 100 & 0xFF # ---------------------------------------------------------------------- # AC-3: STATUSTEXT on secondary channel only, on transitions def test_ac3_statustext_secondary_only_on_transitions( adapter: Msp2InavAdapter, msp: _MspStub, secondary: _SecondaryMavStub ) -> None: adapter._provenance._min_interval_s = 0.0 # type: ignore[attr-defined] labels = [PoseSourceLabel.VISUAL_PROPAGATED, PoseSourceLabel.SATELLITE_ANCHORED] for i in range(100): label = labels[(i // 10) % 2] adapter.emit_external_position(_make_output(source_label=label, frame_id=i)) # 10 transitions including the initial None->visual_propagated assert len(secondary.statustext_calls) == 10 # AC-3: NEVER on primary MSP2 channel for code, _ in msp.raw_msgs: assert code == MSP2_SENSOR_GPS_CODE # ---------------------------------------------------------------------- # AC-4: signing-key rejection def test_ac4_signing_key_rejected(msp: _MspStub, secondary: _SecondaryMavStub, tmp_path) -> None: cfg = _inav_config(tmp_path) fdr = mock.MagicMock() a = Msp2InavAdapter( config=cfg, wgs_converter=mock.MagicMock(), covariance_projector=CovarianceProjector(fdr_client=fdr), fdr_client=fdr, msp_connect_factory=lambda device, baud: msp, secondary_mavlink_factory=lambda: secondary, ) port = PortConfig(fc_kind=FcKind.INAV, device="/dev/null", baud=115200) with pytest.raises(FcAdapterConfigError, match="iNav does not support MAVLink signing"): a.open(port, signing_key=b"\x00" * 32) # ---------------------------------------------------------------------- # AC-5: signing-asymmetry — adapter never calls setup_signing on secondary def test_ac5_signing_asymmetry_no_signed_flag( adapter: Msp2InavAdapter, secondary: _SecondaryMavStub ) -> None: for i in range(50): adapter.emit_external_position(_make_output(frame_id=i)) assert secondary.setup_signing_calls == [] # ---------------------------------------------------------------------- # AC-6: source-set-switch unsupported def test_ac6_source_set_switch_unsupported(adapter: Msp2InavAdapter) -> None: with pytest.raises(SourceSetSwitchNotSupportedError, match="iNav"): adapter.request_source_set_switch() # ---------------------------------------------------------------------- # AC-7: smoothed rejected def test_ac7_smoothed_rejected(adapter: Msp2InavAdapter, msp: _MspStub) -> None: with pytest.raises(FcEmitError, match="smoothed"): adapter.emit_external_position(_make_output(smoothed=True)) assert msp.raw_msgs == [] # ---------------------------------------------------------------------- # AC-8: non-SPD cov rejected def test_ac8_non_spd_covariance_rejected(adapter: Msp2InavAdapter, msp: _MspStub) -> None: bad = np.eye(6, dtype=np.float64) bad[0, 0] = -1.0 with pytest.raises(FcEmitError): adapter.emit_external_position(_make_output(cov=bad)) assert msp.raw_msgs == [] # ---------------------------------------------------------------------- # AC-9: single-writer thread def test_ac9_single_writer_thread(adapter: Msp2InavAdapter) -> None: adapter.emit_external_position(_make_output()) err: list[BaseException] = [] def run() -> None: try: adapter.emit_external_position(_make_output(frame_id=2)) except RuntimeError as e: err.append(e) t = threading.Thread(target=run) t.start() t.join(timeout=2.0) assert len(err) == 1 assert "single-writer" in str(err[0]).lower() # ---------------------------------------------------------------------- # AC-10: first emit logged once def test_ac10_first_emit_logged_once( adapter: Msp2InavAdapter, caplog: pytest.LogCaptureFixture ) -> None: with caplog.at_level(logging.INFO): for i in range(5): adapter.emit_external_position(_make_output(frame_id=i)) first = [rec for rec in caplog.records if getattr(rec, "kind", None) == "c8.inav.first_emit"] assert len(first) == 1 # ---------------------------------------------------------------------- # Invariant 2 cross-check: iNav config rejects ephemeral signing def test_inav_config_rejects_signing(tmp_path) -> None: env = { "FC_ADAPTER": "inav", "FC_SIGNING_KEY_SOURCE": "ephemeral_per_flight", } with pytest.raises(Exception, match="adapter='inav'"): load_config(env=env, paths=(), require_env=False)