"""AZ-393 — PymavlinkArdupilotAdapter outbound AC tests. Covers all 10 ACs of AZ-393. The AP signing path is covered by ``test_az395_mavlink_signing.py``; this file uses ``signing_key_source="none"`` so the AP open path is signing-free. """ from __future__ import annotations import logging import threading from typing import Any from unittest import mock from uuid import UUID, uuid4 import numpy as np import pytest from gps_denied_onboard._types.fc import FcKind, PortConfig from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.state import ( EstimatorOutput, PoseSourceLabel, Quat, ) from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( CovarianceProjector, ) from gps_denied_onboard.components.c8_fc_adapter._outbound_provenance import ( source_label_to_float, ) from gps_denied_onboard.components.c8_fc_adapter.errors import FcEmitError from gps_denied_onboard.components.c8_fc_adapter.pymavlink_ardupilot_adapter import ( PymavlinkArdupilotAdapter, ) from gps_denied_onboard.config import load_config # ---------------------------------------------------------------------- # Helpers — pymavlink stand-in class _MavStub: """Captures pymavlink ``mav.*_send`` calls for wire-level assertions.""" def __init__(self) -> None: self.gps_input_calls: list[tuple[Any, ...]] = [] self.named_value_float_calls: list[tuple[Any, ...]] = [] self.statustext_calls: list[tuple[int, bytes]] = [] def gps_input_send(self, *args: Any) -> None: self.gps_input_calls.append(args) def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None: self.named_value_float_calls.append((time_boot_ms, name, value)) def statustext_send(self, severity: int, text: bytes) -> None: self.statustext_calls.append((severity, text)) class _ConnStub: def __init__(self) -> None: self.mav = _MavStub() self.setup_signing_calls: list[bytes] = [] self.closed = False def setup_signing(self, key: bytes) -> None: self.setup_signing_calls.append(bytes(key)) def close(self) -> None: self.closed = True @pytest.fixture def conn() -> _ConnStub: return _ConnStub() @pytest.fixture def adapter(conn: _ConnStub, tmp_path) -> PymavlinkArdupilotAdapter: cfg = _config_for_ap(tmp_path, signing_key_source="none") fdr = mock.MagicMock() cov = CovarianceProjector(fdr_client=fdr) adapter = PymavlinkArdupilotAdapter( config=cfg, wgs_converter=mock.MagicMock(), covariance_projector=cov, fdr_client=fdr, clock=lambda: 1.0, flight_id="flt-test", connect_factory=lambda device, baud: conn, ) port = PortConfig( fc_kind=FcKind.ARDUPILOT_PLANE, device="/dev/null", baud=921600, ) adapter.open(port, signing_key=None) yield adapter adapter.close() def _config_for_ap(tmp_path, *, signing_key_source: str = "none"): """Build a Config with ``fc.adapter='ardupilot_plane'``.""" env = { "FC_ADAPTER": "ardupilot_plane", "FC_PORT_DEVICE": "/dev/null", "FC_PORT_BAUD": "921600", "FC_SIGNING_KEY_SOURCE": signing_key_source, } return load_config(env=env, paths=(), require_env=False) def _make_output( *, source_label: PoseSourceLabel = PoseSourceLabel.VISUAL_PROPAGATED, smoothed: bool = False, cov: np.ndarray | None = None, wgs: LatLonAlt | None = None, frame_id: UUID | int | None = None, ) -> EstimatorOutput: if cov is None: cov = np.eye(6, dtype=np.float64) * 0.25 if wgs is None: wgs = LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0) if frame_id is None: frame_id = uuid4() elif isinstance(frame_id, int): # Deterministic UUID for legacy int-keyed tests. frame_id = UUID(int=frame_id) return EstimatorOutput( frame_id=frame_id, position_wgs84=wgs, orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0), velocity_world_mps=(0.0, 0.0, 0.0), covariance_6x6=cov, source_label=source_label, last_satellite_anchor_age_ms=0, smoothed=smoothed, emitted_at=0, ) # ---------------------------------------------------------------------- # AC-1: GPS_INPUT field fidelity def test_ac1_gps_input_field_fidelity(adapter: PymavlinkArdupilotAdapter, conn: _ConnStub) -> None: # Arrange cov = np.diag([0.25, 0.25, 9.0, 1.0, 1.0, 1.0]).astype(np.float64) wgs = LatLonAlt(lat_deg=50.4501, lon_deg=30.5234, alt_m=180.0) output = _make_output(cov=cov, wgs=wgs) expected_horiz_m = CovarianceProjector( fdr_client=mock.MagicMock() ).to_ardupilot_horiz_accuracy_m(output) # Act result = adapter.emit_external_position(output) # Assert assert len(conn.mav.gps_input_calls) == 1 call = conn.mav.gps_input_calls[0] # gps_input_send signature: (time_us, gps_id, ignore_flags, time_week_ms, # time_week, fix_type, lat_e7, lon_e7, alt, hdop, vdop, vn, ve, vd, # speed_accuracy, horiz_accuracy, vert_accuracy, sat_visible, yaw) assert call[5] == 3 # fix type 3D assert call[6] == int(wgs.lat_deg * 1e7) assert call[7] == int(wgs.lon_deg * 1e7) assert call[8] == pytest.approx(wgs.alt_m) assert call[15] == pytest.approx(expected_horiz_m, abs=1e-3) assert result.horiz_accuracy_m == pytest.approx(expected_horiz_m) assert result.fc_kind is FcKind.ARDUPILOT_PLANE assert result.sequence_number == 1 # ---------------------------------------------------------------------- # AC-2: GPS_INPUT every frame def test_ac2_gps_input_every_frame(adapter: PymavlinkArdupilotAdapter, conn: _ConnStub) -> None: for i in range(100): adapter.emit_external_position(_make_output(frame_id=i)) assert len(conn.mav.gps_input_calls) == 100 # ---------------------------------------------------------------------- # AC-3: NAMED_VALUE_FLOAT every frame, correct mapping def test_ac3_named_value_float_every_frame( adapter: PymavlinkArdupilotAdapter, conn: _ConnStub ) -> None: for i in range(100): adapter.emit_external_position(_make_output(frame_id=i)) assert len(conn.mav.named_value_float_calls) == 100 for _, name, value in conn.mav.named_value_float_calls: assert name == b"src_lbl" assert value == pytest.approx(source_label_to_float(PoseSourceLabel.VISUAL_PROPAGATED)) # ---------------------------------------------------------------------- # AC-4: STATUSTEXT rate-limited on transition def test_ac4_statustext_only_on_transition( adapter: PymavlinkArdupilotAdapter, conn: _ConnStub ) -> None: """100 frames, source_label toggles every 10; expect 10 transitions.""" # Loosen the 1 s per-severity hard cap for the test by patching the # rate-limiter's min_interval_s — AC-4 measures the transition # behaviour, not the secondary 1 Hz spam-defence cap. adapter._provenance._min_interval_s = 0.0 # type: ignore[attr-defined] labels = [PoseSourceLabel.VISUAL_PROPAGATED, PoseSourceLabel.SATELLITE_ANCHORED] for i in range(100): label = labels[(i // 10) % 2] adapter.emit_external_position(_make_output(source_label=label, frame_id=i)) # Each block of 10 frames is a single label; that's 10 blocks → 9 # transitions between adjacent blocks plus 1 for the initial # None->visual_propagated bootstrap. assert len(conn.mav.statustext_calls) == 10 def test_ac4_statustext_zero_within_state( adapter: PymavlinkArdupilotAdapter, conn: _ConnStub ) -> None: """100 frames all on the same source label → exactly 1 STATUSTEXT (the bootstrap).""" adapter._provenance._min_interval_s = 0.0 # type: ignore[attr-defined] for i in range(100): adapter.emit_external_position(_make_output(frame_id=i)) assert len(conn.mav.statustext_calls) == 1 # ---------------------------------------------------------------------- # AC-5: Smoothed output rejected def test_ac5_smoothed_output_rejected( adapter: PymavlinkArdupilotAdapter, conn: _ConnStub, caplog: pytest.LogCaptureFixture ) -> None: output = _make_output(smoothed=True) with caplog.at_level(logging.ERROR), pytest.raises(FcEmitError, match="smoothed"): adapter.emit_external_position(output) assert len(conn.mav.gps_input_calls) == 0 assert any(getattr(rec, "kind", None) == "c8.ap.emit_failed" for rec in caplog.records) # ---------------------------------------------------------------------- # AC-6: Non-SPD covariance rejected def test_ac6_non_spd_covariance_rejected( adapter: PymavlinkArdupilotAdapter, conn: _ConnStub ) -> None: bad_cov = np.eye(6, dtype=np.float64) bad_cov[0, 0] = -1.0 # not positive-definite output = _make_output(cov=bad_cov) with pytest.raises(FcEmitError): adapter.emit_external_position(output) assert len(conn.mav.gps_input_calls) == 0 # ---------------------------------------------------------------------- # AC-7: Single-writer thread def test_ac7_single_writer_thread(adapter: PymavlinkArdupilotAdapter) -> None: adapter.emit_external_position(_make_output()) # binds to main thread err: list[BaseException] = [] def run() -> None: try: adapter.emit_external_position(_make_output(frame_id=2)) except RuntimeError as e: err.append(e) t = threading.Thread(target=run) t.start() t.join(timeout=2.0) assert len(err) == 1 assert "single-writer" in str(err[0]).lower() # ---------------------------------------------------------------------- # AC-8: Open without signing key (placeholder; AZ-395 tightens this) def test_ac8_open_without_signing_key_succeeds(conn: _ConnStub, tmp_path) -> None: cfg = _config_for_ap(tmp_path, signing_key_source="none") fdr = mock.MagicMock() a = PymavlinkArdupilotAdapter( config=cfg, wgs_converter=mock.MagicMock(), covariance_projector=CovarianceProjector(fdr_client=fdr), fdr_client=fdr, connect_factory=lambda device, baud: conn, ) port = PortConfig( fc_kind=FcKind.ARDUPILOT_PLANE, device="/dev/null", baud=921600, ) a.open(port, signing_key=None) try: assert a._opened is True finally: a.close() # ---------------------------------------------------------------------- # AC-9: source-set switch is wired (AZ-396 replaced AZ-393's placeholder) # # AZ-393's original AC-9 asserted `request_source_set_switch()` raises # `NotImplementedError`; AZ-396 (batch 11) replaced that placeholder # with the real body. The AZ-396 AC tests cover the new behaviour; # the AZ-393 surface is now exercised by those tests rather than this # one. The check left here verifies the method is present and callable. def test_ac9_source_set_switch_method_callable(adapter: PymavlinkArdupilotAdapter) -> None: assert callable(adapter.request_source_set_switch) # ---------------------------------------------------------------------- # AC-10: First emit logged once def test_ac10_first_emit_logged_once( adapter: PymavlinkArdupilotAdapter, caplog: pytest.LogCaptureFixture ) -> None: with caplog.at_level(logging.INFO): for i in range(5): adapter.emit_external_position(_make_output(frame_id=i)) first_emit_records = [ rec for rec in caplog.records if getattr(rec, "kind", None) == "c8.ap.first_emit" ] assert len(first_emit_records) == 1