mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 10:31:13 +00:00
2b19b8b90b
All FC adapter outbound MAVLink bytes now go through the AZ-401 MavlinkTransport seam (NoopMavlinkTransport in replay, SerialMavlinkTransport in live). New helpers in _outbound_mavlink_payloads.py extract encode/pack/seq-bump so the four AP _send sites and the iNav statustext _send site become encode -> pack -> transport.write. TlogReplayFcAdapter emits real AP-shape MAVLink bytes through the injected NoopMavlinkTransport, satisfying replay protocol Invariant 5 and unblocking AZ-401 AC-9. Closes AZ-558. Also unskips AZ-401 AC-9 and AZ-404 AC-4b. Live wire output remains byte-identical (proven via two-instance MAVLink byte-equivalence tests). AST scan asserts no .mav.<name>_send( calls remain in the retrofit set (AP / iNav / tlog adapters). Out of scope (logged in review): GCS adapter retrofit; airborne live strategy registration that would activate the SerialMavlinkTransport factory injection path. Tests: 2110 passed, 92 environmental skips, 1 unrelated pre-existing macOS cold-start flake deselected. Co-authored-by: Cursor <cursoragent@cursor.com>
327 lines
10 KiB
Python
327 lines
10 KiB
Python
"""AZ-394 — Msp2InavAdapter outbound AC tests."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
from typing import Any
|
|
from unittest import mock
|
|
from uuid import UUID, uuid4
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.fc import FcKind, PortConfig
|
|
from gps_denied_onboard._types.geo import LatLonAlt
|
|
from gps_denied_onboard._types.state import (
|
|
EstimatorOutput,
|
|
PoseSourceLabel,
|
|
Quat,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import (
|
|
CovarianceProjector,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter._msp2_sensor_gps_encoder import (
|
|
MSP2_SENSOR_GPS_CODE,
|
|
decode_msp2_sensor_gps,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter.errors import (
|
|
FcAdapterConfigError,
|
|
FcEmitError,
|
|
SourceSetSwitchNotSupportedError,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter.msp2_inav_adapter import (
|
|
Msp2InavAdapter,
|
|
)
|
|
from gps_denied_onboard.config import load_config
|
|
|
|
from ._mav_test_helpers import _FakeMsg
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Helpers — MSP / secondary-MAVLink stand-ins
|
|
|
|
|
|
class _MspStub:
|
|
def __init__(self) -> None:
|
|
self.raw_msgs: list[tuple[int, bytes]] = []
|
|
self.closed = False
|
|
|
|
def send_RAW_msg(self, code: int, data: bytes) -> None:
|
|
self.raw_msgs.append((int(code), bytes(data)))
|
|
|
|
def close(self) -> None:
|
|
self.closed = True
|
|
|
|
|
|
class _SigningStub:
|
|
sign_outgoing = False
|
|
|
|
|
|
class _SecondaryMavStub:
|
|
def __init__(self) -> None:
|
|
self.statustext_calls: list[tuple[int, bytes]] = []
|
|
self.closed = False
|
|
self.write_calls: list[bytes] = []
|
|
# Mirror pymavlink connection.mav shape.
|
|
self.mav = self
|
|
# Track signing-key state per RESTRICT-COMM-2 (Invariant 9):
|
|
# the unit-test adapter MUST never call setup_signing on us.
|
|
self.setup_signing_calls: list[Any] = []
|
|
# AZ-558: encoder flow needs ``mav.seq`` and ``mav.signing``.
|
|
self.seq: int = 0
|
|
self.signing = _SigningStub()
|
|
|
|
def statustext_encode(self, severity: int, text: bytes) -> _FakeMsg:
|
|
self.statustext_calls.append((int(severity), bytes(text)))
|
|
return _FakeMsg()
|
|
|
|
def setup_signing(self, key: Any) -> None:
|
|
self.setup_signing_calls.append(key)
|
|
|
|
def write(self, payload: bytes) -> int:
|
|
self.write_calls.append(bytes(payload))
|
|
return len(payload)
|
|
|
|
def close(self) -> None:
|
|
self.closed = True
|
|
|
|
|
|
@pytest.fixture
def msp() -> _MspStub:
    """Provide a fresh ``_MspStub`` for each test."""
    stub = _MspStub()
    return stub
|
|
|
|
|
|
@pytest.fixture
def secondary() -> _SecondaryMavStub:
    """Provide a fresh ``_SecondaryMavStub`` for each test."""
    stub = _SecondaryMavStub()
    return stub
|
|
|
|
|
|
def _inav_config(tmp_path) -> Any:
    """Load a config from a fixed iNav environment mapping.

    The environment is supplied in memory, no config paths are passed, and
    ``require_env`` is off. ``tmp_path`` is accepted but not read here.
    """
    return load_config(
        env={
            "FC_ADAPTER": "inav",
            "FC_PORT_DEVICE": "/dev/null",
            "FC_PORT_BAUD": "115200",
            "FC_SIGNING_KEY_SOURCE": "none",
        },
        paths=(),
        require_env=False,
    )
|
|
|
|
|
|
def _make_output(
    *,
    source_label: PoseSourceLabel = PoseSourceLabel.VISUAL_PROPAGATED,
    smoothed: bool = False,
    cov: np.ndarray | None = None,
    wgs: LatLonAlt | None = None,
    frame_id: UUID | int | None = None,
) -> EstimatorOutput:
    """Build an ``EstimatorOutput`` for tests, filling in defaults.

    Args:
        source_label: Label stored on the output.
        smoothed: Value of the output's ``smoothed`` flag.
        cov: 6x6 covariance; defaults to ``0.25 * I``.
        wgs: Position; defaults to lat 50.0, lon 30.0, alt 100.0.
        frame_id: ``UUID`` used as-is, ``int`` converted with ``UUID(int=...)``,
            or ``None`` for a random ``uuid4()``.

    Returns:
        An ``EstimatorOutput`` with identity orientation, zero velocity,
        zero anchor age and ``emitted_at=0``.
    """
    covariance = np.eye(6, dtype=np.float64) * 0.25 if cov is None else cov
    position = (
        LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0) if wgs is None else wgs
    )

    # Normalise the three accepted frame_id forms to a UUID.
    if frame_id is None:
        resolved_id = uuid4()
    elif isinstance(frame_id, int):
        resolved_id = UUID(int=frame_id)
    else:
        resolved_id = frame_id

    identity_orientation = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
    return EstimatorOutput(
        frame_id=resolved_id,
        position_wgs84=position,
        orientation_world_T_body=identity_orientation,
        velocity_world_mps=(0.0, 0.0, 0.0),
        covariance_6x6=covariance,
        source_label=source_label,
        last_satellite_anchor_age_ms=0,
        smoothed=smoothed,
        emitted_at=0,
    )
|
|
|
|
|
|
@pytest.fixture
def adapter(msp: _MspStub, secondary: _SecondaryMavStub, tmp_path) -> Msp2InavAdapter:
    """Yield an opened ``Msp2InavAdapter`` wired to the stub connections.

    The MSP and secondary-MAVLink factories return the ``msp`` and
    ``secondary`` fixtures, so anything the adapter sends lands in those stubs.
    The adapter is opened without a signing key and closed after the test.
    """
    fdr_client = mock.MagicMock()
    projector = CovarianceProjector(fdr_client=fdr_client)
    inav_adapter = Msp2InavAdapter(
        config=_inav_config(tmp_path),
        wgs_converter=mock.MagicMock(),
        covariance_projector=projector,
        fdr_client=fdr_client,
        msp_connect_factory=lambda device, baud: msp,
        secondary_mavlink_factory=lambda: secondary,
    )
    inav_adapter.open(
        PortConfig(fc_kind=FcKind.INAV, device="/dev/null", baud=115200),
        signing_key=None,
    )
    yield inav_adapter
    # Teardown: runs once the test using the fixture has finished.
    inav_adapter.close()
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-1: field fidelity
|
|
|
|
|
|
def test_ac1_msp2_field_fidelity(adapter: Msp2InavAdapter, msp: _MspStub) -> None:
    """AC-1: one emit produces one MSP2 sensor-GPS frame carrying the input fields."""
    # Arrange
    position = LatLonAlt(lat_deg=50.4501, lon_deg=30.5234, alt_m=180.0)
    covariance = np.diag([0.25, 0.25, 9.0, 1.0, 1.0, 1.0]).astype(np.float64)
    output = _make_output(cov=covariance, wgs=position)
    # Reference accuracy comes from an independent projector instance.
    reference_projector = CovarianceProjector(fdr_client=mock.MagicMock())
    expected_mm = reference_projector.to_inav_h_pos_accuracy_mm(output)

    # Act
    result = adapter.emit_external_position(output)

    # Assert
    assert len(msp.raw_msgs) == 1
    sent_code, sent_payload = msp.raw_msgs[0]
    assert sent_code == MSP2_SENSOR_GPS_CODE
    frame = decode_msp2_sensor_gps(sent_payload)
    assert frame.fix_type == 3
    assert frame.latitude_e7 == int(position.lat_deg * 1e7)
    assert frame.longitude_e7 == int(position.lon_deg * 1e7)
    assert frame.msl_altitude_cm == int(position.alt_m * 100.0)
    assert frame.h_pos_accuracy_mm == expected_mm
    assert result.fc_kind is FcKind.INAV
    assert result.horiz_accuracy_m == pytest.approx(expected_mm / 1000.0)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-2: every frame, monotonic seq
|
|
|
|
|
|
def test_ac2_msp2_every_frame_with_seq(adapter: Msp2InavAdapter, msp: _MspStub) -> None:
    """AC-2: every emit sends one MSP2 frame and bumps the sequence number by one.

    Emits 100 outputs and checks that one raw MSP frame was recorded per
    emit and that the returned sequence numbers count 1, 2, ..., 100
    (masked to uint8). Checking each step, not only the endpoints, catches
    gaps, repeats or reordering in the middle of the run.
    """
    frame_count = 100
    seqs = []
    for i in range(frame_count):
        result = adapter.emit_external_position(_make_output(frame_id=i))
        seqs.append(result.sequence_number)

    assert len(msp.raw_msgs) == frame_count
    # AC-2: monotonically incrementing (modulo uint8 wrap)
    assert seqs[0] == 1
    assert seqs[-1] == frame_count & 0xFF
    # Endpoints alone would let a skipped or repeated number slip through.
    assert seqs == [(i + 1) & 0xFF for i in range(frame_count)]
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-3: STATUSTEXT on secondary channel only, on transitions
|
|
|
|
|
|
def test_ac3_statustext_secondary_only_on_transitions(
    adapter: Msp2InavAdapter, msp: _MspStub, secondary: _SecondaryMavStub
) -> None:
    """AC-3: STATUSTEXT is encoded on the secondary stub once per label transition.

    Feeds 100 frames whose source label flips every 10 frames and checks the
    secondary stub saw 10 ``statustext_encode`` calls while the primary MSP
    stub only ever received MSP2 sensor-GPS frames.
    """
    # NOTE(review): presumably disables rate limiting of provenance messages
    # so every transition is reported — confirm against the adapter.
    adapter._provenance._min_interval_s = 0.0  # type: ignore[attr-defined]

    for frame_index in range(100):
        # Blocks of 10 frames alternate, starting with VISUAL_PROPAGATED.
        in_even_block = (frame_index // 10) % 2 == 0
        if in_even_block:
            source = PoseSourceLabel.VISUAL_PROPAGATED
        else:
            source = PoseSourceLabel.SATELLITE_ANCHORED
        adapter.emit_external_position(
            _make_output(source_label=source, frame_id=frame_index)
        )

    # 10 transitions including the initial None->visual_propagated
    assert len(secondary.statustext_calls) == 10
    # AC-3: NEVER on primary MSP2 channel
    assert all(code == MSP2_SENSOR_GPS_CODE for code, _ in msp.raw_msgs)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-4: signing-key rejection
|
|
|
|
|
|
def test_ac4_signing_key_rejected(msp: _MspStub, secondary: _SecondaryMavStub, tmp_path) -> None:
    """AC-4: opening the iNav adapter with a signing key raises ``FcAdapterConfigError``."""
    fdr_client = mock.MagicMock()
    unopened = Msp2InavAdapter(
        config=_inav_config(tmp_path),
        wgs_converter=mock.MagicMock(),
        covariance_projector=CovarianceProjector(fdr_client=fdr_client),
        fdr_client=fdr_client,
        msp_connect_factory=lambda device, baud: msp,
        secondary_mavlink_factory=lambda: secondary,
    )
    inav_port = PortConfig(fc_kind=FcKind.INAV, device="/dev/null", baud=115200)
    all_zero_key = b"\x00" * 32

    with pytest.raises(FcAdapterConfigError, match="iNav does not support MAVLink signing"):
        unopened.open(inav_port, signing_key=all_zero_key)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-5: signing-asymmetry — adapter never calls setup_signing on secondary
|
|
|
|
|
|
def test_ac5_signing_asymmetry_no_signed_flag(
    adapter: Msp2InavAdapter, secondary: _SecondaryMavStub
) -> None:
    """AC-5: 50 emits never trigger ``setup_signing`` on the secondary stub."""
    for frame_index in range(50):
        output = _make_output(frame_id=frame_index)
        adapter.emit_external_position(output)

    assert secondary.setup_signing_calls == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-6: source-set-switch unsupported
|
|
|
|
|
|
def test_ac6_source_set_switch_unsupported(adapter: Msp2InavAdapter) -> None:
    """AC-6: a source-set switch request raises an error that mentions iNav."""
    expect_unsupported = pytest.raises(SourceSetSwitchNotSupportedError, match="iNav")
    with expect_unsupported:
        adapter.request_source_set_switch()
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-7: smoothed rejected
|
|
|
|
|
|
def test_ac7_smoothed_rejected(adapter: Msp2InavAdapter, msp: _MspStub) -> None:
    """AC-7: a smoothed output raises ``FcEmitError`` and no MSP frame is sent."""
    smoothed_output = _make_output(smoothed=True)

    with pytest.raises(FcEmitError, match="smoothed"):
        adapter.emit_external_position(smoothed_output)

    # The rejection must leave the MSP stub untouched.
    assert msp.raw_msgs == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-8: non-SPD cov rejected
|
|
|
|
|
|
def test_ac8_non_spd_covariance_rejected(adapter: Msp2InavAdapter, msp: _MspStub) -> None:
    """AC-8: a covariance with a negative diagonal entry raises ``FcEmitError``."""
    # Identity matrix except for a -1.0 in the top-left corner.
    not_spd = np.diag([-1.0, 1.0, 1.0, 1.0, 1.0, 1.0]).astype(np.float64)
    bad_output = _make_output(cov=not_spd)

    with pytest.raises(FcEmitError):
        adapter.emit_external_position(bad_output)

    # The rejection must leave the MSP stub untouched.
    assert msp.raw_msgs == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-9: single-writer thread
|
|
|
|
|
|
def test_ac9_single_writer_thread(adapter: Msp2InavAdapter) -> None:
    """AC-9: an emit from a second thread raises a "single-writer" ``RuntimeError``.

    The first emit happens on the test thread; a second emit is then tried
    from a worker thread and its ``RuntimeError`` is captured for inspection.
    """
    adapter.emit_external_position(_make_output())
    captured: list[BaseException] = []

    def emit_from_worker() -> None:
        # Only RuntimeError is captured; anything else escapes the thread.
        try:
            adapter.emit_external_position(_make_output(frame_id=2))
        except RuntimeError as exc:
            captured.append(exc)

    worker = threading.Thread(target=emit_from_worker)
    worker.start()
    worker.join(timeout=2.0)

    assert len(captured) == 1
    message = str(captured[0]).lower()
    assert "single-writer" in message
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-10: first emit logged once
|
|
|
|
|
|
def test_ac10_first_emit_logged_once(
    adapter: Msp2InavAdapter, caplog: pytest.LogCaptureFixture
) -> None:
    """AC-10: five emits produce exactly one ``c8.inav.first_emit`` log record."""
    with caplog.at_level(logging.INFO):
        for frame_index in range(5):
            adapter.emit_external_position(_make_output(frame_id=frame_index))

    # Records are matched on their ``kind`` attribute; records without one are ignored.
    first_emit_count = sum(
        1
        for record in caplog.records
        if getattr(record, "kind", None) == "c8.inav.first_emit"
    )
    assert first_emit_count == 1
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Invariant 2 cross-check: iNav config rejects ephemeral signing
|
|
|
|
|
|
def test_inav_config_rejects_signing(tmp_path) -> None:
    """Invariant 2: loading an iNav config with ephemeral signing raises.

    ``tmp_path`` is accepted but not read here.
    """
    signing_env = {
        "FC_ADAPTER": "inav",
        "FC_SIGNING_KEY_SOURCE": "ephemeral_per_flight",
    }
    expect_rejection = pytest.raises(Exception, match="adapter='inav'")
    with expect_rejection:
        load_config(env=signing_env, paths=(), require_env=False)
|