"""AZ-397 — QgcTelemetryAdapter AC tests.""" from __future__ import annotations import logging import threading from types import SimpleNamespace from typing import Any from unittest import mock from uuid import UUID, uuid4 import numpy as np import pytest from gps_denied_onboard._types.fc import ( FcKind, OperatorCommand, PortConfig, Severity, ) from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.state import ( EstimatorOutput, PoseSourceLabel, Quat, ) from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( CovarianceProjector, ) from gps_denied_onboard.components.c8_fc_adapter.errors import GcsAdapterConfigError from gps_denied_onboard.components.c8_fc_adapter.mavlink_gcs_adapter import ( QgcTelemetryAdapter, _compute_downsample_modulo, ) from gps_denied_onboard.config import load_config from gps_denied_onboard.config.schema import ConfigError, GcsConfig from gps_denied_onboard.helpers.wgs_converter import WgsConverter # ---------------------------------------------------------------------- # Helpers class _MavStub: def __init__(self) -> None: self.global_position_int_calls: list[tuple[Any, ...]] = [] self.named_value_float_calls: list[tuple[Any, ...]] = [] self.statustext_calls: list[tuple[int, bytes]] = [] def global_position_int_send(self, *args: Any) -> None: self.global_position_int_calls.append(args) def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None: self.named_value_float_calls.append((time_boot_ms, name, value)) def statustext_send(self, severity: int, text: bytes) -> None: self.statustext_calls.append((severity, text)) class _ConnStub: def __init__(self) -> None: self.mav = _MavStub() self.message_hooks: list[Any] = [] self.closed = False def close(self) -> None: self.closed = True def _config(*, summary_rate_hz: float = 2.0) -> Any: env = { "FC_ADAPTER": "ardupilot_plane", "FC_PORT_DEVICE": "/dev/null", "FC_PORT_BAUD": "921600", "FC_SIGNING_KEY_SOURCE": "none", "GCS_ADAPTER": "qgc_mavlink", "GCS_PORT_DEVICE": "/dev/null", "GCS_PORT_BAUD": "921600", "GCS_SUMMARY_RATE_HZ": str(summary_rate_hz), } return load_config(env=env, paths=(), require_env=False) def _make_output( *, source_label: PoseSourceLabel = PoseSourceLabel.VISUAL_PROPAGATED, frame_id: UUID | int | None = None, ) -> EstimatorOutput: if frame_id is None: frame_id = uuid4() elif isinstance(frame_id, int): frame_id = UUID(int=frame_id) return EstimatorOutput( frame_id=frame_id, position_wgs84=LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0), orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0), velocity_world_mps=(0.0, 0.0, 0.0), covariance_6x6=np.eye(6, dtype=np.float64) * 0.25, source_label=source_label, last_satellite_anchor_age_ms=0, smoothed=False, emitted_at=0, ) def _make_adapter(conn: _ConnStub, cfg: Any) -> QgcTelemetryAdapter: fdr = mock.MagicMock() a = QgcTelemetryAdapter( config=cfg, wgs_converter=mock.MagicMock(), covariance_projector=CovarianceProjector(fdr_client=fdr), fdr_client=fdr, connect_factory=lambda device, baud: conn, ) port = PortConfig(fc_kind=FcKind.GCS_QGC, device="/dev/null", baud=921600) a.open(port) return a # ---------------------------------------------------------------------- # AC-1: 5 -> 2 Hz downsampling def test_ac1_5hz_to_2hz_downsample() -> None: conn = _ConnStub() a = _make_adapter(conn, _config(summary_rate_hz=2.0)) try: for i in range(100): a.emit_summary(_make_output(frame_id=i)) # modulo = round(5/2) = 2 (round half to even gives 2; we just # need parity with the documented choice). 100 / 2 = 50. assert _compute_downsample_modulo(2.0) == 2 assert len(conn.mav.global_position_int_calls) == 50 finally: a.close() # ---------------------------------------------------------------------- # AC-2: configurable rate @pytest.mark.parametrize( "rate_hz,expected_frames", [ (1.0, 20), # modulo 5 -> 100/5 (5.0, 100), # modulo 1 -> no downsample ], ) def test_ac2_configurable_rate(rate_hz: float, expected_frames: int) -> None: conn = _ConnStub() a = _make_adapter(conn, _config(summary_rate_hz=rate_hz)) try: for i in range(100): a.emit_summary(_make_output(frame_id=i)) assert len(conn.mav.global_position_int_calls) == expected_frames finally: a.close() def test_ac2_out_of_range_rate_rejected_at_config() -> None: with pytest.raises(ConfigError, match="summary_rate_hz"): GcsConfig(summary_rate_hz=10.0) with pytest.raises(ConfigError, match="summary_rate_hz"): GcsConfig(summary_rate_hz=0.2) # ---------------------------------------------------------------------- # AC-3: summary frame fields def test_ac3_summary_frame_fields() -> None: conn = _ConnStub() fdr = mock.MagicMock() a = QgcTelemetryAdapter( config=_config(summary_rate_hz=5.0), # no downsample for simplicity wgs_converter=mock.MagicMock(), covariance_projector=CovarianceProjector(fdr_client=fdr), fdr_client=fdr, connect_factory=lambda device, baud: conn, ) port = PortConfig(fc_kind=FcKind.GCS_QGC, device="/dev/null", baud=921600) a.open(port) try: wgs = LatLonAlt(lat_deg=50.45, lon_deg=30.52, alt_m=180.0) output = EstimatorOutput( frame_id=UUID(int=1), position_wgs84=wgs, orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0), velocity_world_mps=(0.0, 0.0, 0.0), covariance_6x6=np.diag([0.25, 0.25, 9.0, 1.0, 1.0, 1.0]).astype(np.float64), source_label=PoseSourceLabel.VISUAL_PROPAGATED, last_satellite_anchor_age_ms=0, smoothed=False, emitted_at=0, ) a.emit_summary(output) assert len(conn.mav.global_position_int_calls) == 1 call = conn.mav.global_position_int_calls[0] # global_position_int_send(time_boot_ms, lat, lon, alt_mm, rel_alt_mm, vx, vy, vz, hdg) assert call[1] == int(wgs.lat_deg * 1e7) assert call[2] == int(wgs.lon_deg * 1e7) assert call[3] == int(wgs.alt_m * 1000.0) # horiz_accuracy via NAMED_VALUE_FLOAT assert len(conn.mav.named_value_float_calls) == 1 _, name, value = conn.mav.named_value_float_calls[0] assert name == b"horiz_m" expected = CovarianceProjector(fdr_client=mock.MagicMock()).to_ardupilot_horiz_accuracy_m( output ) assert value == pytest.approx(expected) finally: a.close() # ---------------------------------------------------------------------- # AC-4: STATUSTEXT mirror def test_ac4_statustext_mirror() -> None: conn = _ConnStub() a = _make_adapter(conn, _config(summary_rate_hz=2.0)) try: a.emit_status_text("hello", Severity.INFO) assert len(conn.mav.statustext_calls) == 1 sev, text = conn.mav.statustext_calls[0] assert sev == int(Severity.INFO.value) assert text == b"hello" finally: a.close() # ---------------------------------------------------------------------- # AC-5: operator command subscription def test_ac5_operator_command_subscription_invokes_callback() -> None: conn = _ConnStub() a = _make_adapter(conn, _config()) received: list[OperatorCommand] = [] try: a.subscribe_operator_commands(received.append) # Inject a PARAM_REQUEST_LIST stub msg = SimpleNamespace(target_system=1, target_component=1) msg.get_type = lambda: "PARAM_REQUEST_LIST" msg.get_srcSystem = lambda: 42 # Fire every registered hook for hook in conn.message_hooks: hook(conn, msg) assert len(received) == 1 cmd = received[0] assert cmd.command == "PARAM_REQUEST_LIST" assert cmd.payload.get("target_system") == 1 finally: a.close() # ---------------------------------------------------------------------- # AC-6: operator command FDR audit trail def test_ac6_operator_command_fdr_audit_trail() -> None: conn = _ConnStub() fdr = mock.MagicMock() a = QgcTelemetryAdapter( config=_config(), wgs_converter=mock.MagicMock(), covariance_projector=CovarianceProjector(fdr_client=fdr), fdr_client=fdr, connect_factory=lambda device, baud: conn, ) port = PortConfig(fc_kind=FcKind.GCS_QGC, device="/dev/null", baud=921600) a.open(port) try: a.subscribe_operator_commands(lambda _cmd: None) msg = SimpleNamespace(target_system=1, target_component=1, command=400) msg.get_type = lambda: "COMMAND_LONG" msg.get_srcSystem = lambda: 7 for hook in conn.message_hooks: hook(conn, msg) operator_records = [ call for call in fdr.enqueue.mock_calls if call.args[0].payload.get("kind") == "c8.gcs.operator_command" ] assert len(operator_records) == 1 rec = operator_records[0].args[0].payload assert rec["kv"]["command"] == "COMMAND_LONG" assert rec["kv"]["source_system"] == 7 finally: a.close() # ---------------------------------------------------------------------- # AC-7: single-writer thread for outbound def test_ac7_single_writer_thread() -> None: conn = _ConnStub() a = _make_adapter(conn, _config(summary_rate_hz=5.0)) err: list[BaseException] = [] try: a.emit_summary(_make_output()) def run() -> None: try: a.emit_summary(_make_output(frame_id=2)) except RuntimeError as e: err.append(e) t = threading.Thread(target=run) t.start() t.join(timeout=2.0) assert len(err) == 1 assert "single-writer" in str(err[0]).lower() finally: a.close() # ---------------------------------------------------------------------- # AC-8: first emit logged once def test_ac8_first_emit_logged_once(caplog: pytest.LogCaptureFixture) -> None: conn = _ConnStub() a = _make_adapter(conn, _config(summary_rate_hz=5.0)) try: with caplog.at_level(logging.INFO): for i in range(5): a.emit_summary(_make_output(frame_id=i)) first = [ r for r in caplog.records if getattr(r, "kind", None) == "c8.gcs.first_summary_emit" ] assert len(first) == 1 finally: a.close() # ---------------------------------------------------------------------- # AC-9: WGS84 round-trip (defensive) def test_ac9_wgs84_round_trip_within_1cm() -> None: # Round-trip a known lat/lon through WgsConverter ECEF and back. origin = LatLonAlt(lat_deg=50.45, lon_deg=30.52, alt_m=180.0) enu = WgsConverter.latlonalt_to_local_enu( origin, LatLonAlt(lat_deg=50.4501, lon_deg=30.5201, alt_m=180.5) ) recovered = WgsConverter.local_enu_to_latlonalt(origin, enu) expected = LatLonAlt(lat_deg=50.4501, lon_deg=30.5201, alt_m=180.5) # 1 cm threshold — pyproj's ENU round-trip is sub-millimetre at # these distances; we keep the AC's 1 cm threshold as the contract. enu_residual = WgsConverter.latlonalt_to_local_enu(expected, recovered) assert np.linalg.norm(enu_residual) < 0.01 # ---------------------------------------------------------------------- # AC-10: GcsAdapterConfigError on bad config (config-load path) def test_ac10_gcs_config_error_on_bad_rate() -> None: with pytest.raises(ConfigError, match="summary_rate_hz"): GcsConfig(summary_rate_hz=6.0) def test_ac10_open_rejects_wrong_fc_kind() -> None: conn = _ConnStub() fdr = mock.MagicMock() a = QgcTelemetryAdapter( config=_config(), wgs_converter=mock.MagicMock(), covariance_projector=CovarianceProjector(fdr_client=fdr), fdr_client=fdr, connect_factory=lambda device, baud: conn, ) port = PortConfig(fc_kind=FcKind.ARDUPILOT_PLANE, device="/dev/null", baud=921600) with pytest.raises(GcsAdapterConfigError, match="GCS_QGC"): a.open(port)