"""Tests for MAVLink I/O Bridge (Phase 4). MAV-01: GPS_INPUT sent at configured rate. MAV-02: ESKF state correctly mapped to GPS_INPUT fields. MAV-03: IMU receive callback invoked. MAV-04: 3 consecutive failures trigger re-localisation request. MAV-05: Telemetry sent at 1 Hz. """ import asyncio import math import time import numpy as np import pytest from gps_denied.core.mavlink import ( MAVLinkBridge, MockMAVConnection, _confidence_to_fix_type, _eskf_to_gps_input, _unix_to_gps_time, ) from gps_denied.schemas import GPSPoint from gps_denied.schemas.eskf import ConfidenceTier, ESKFState from gps_denied.schemas.mavlink import GPSInputMessage, RelocalizationRequest # --------------------------------------------------------------- # Helpers # --------------------------------------------------------------- def _make_state( pos=(0.0, 0.0, 0.0), vel=(0.0, 0.0, 0.0), confidence=ConfidenceTier.HIGH, cov_scale=1.0, ) -> ESKFState: cov = np.eye(15) * cov_scale return ESKFState( position=np.array(pos), velocity=np.array(vel), quaternion=np.array([1.0, 0.0, 0.0, 0.0]), accel_bias=np.zeros(3), gyro_bias=np.zeros(3), covariance=cov, timestamp=time.time(), confidence=confidence, ) ORIGIN = GPSPoint(lat=49.0, lon=32.0) # --------------------------------------------------------------- # GPS time helpers # --------------------------------------------------------------- def test_unix_to_gps_time_epoch(): """GPS epoch (Unix=315964800) should be week=0, ms=0.""" week, ms = _unix_to_gps_time(315_964_800.0) assert week == 0 assert ms == 0 def test_unix_to_gps_time_recent(): """Recent timestamp must produce a valid week and ms-of-week.""" week, ms = _unix_to_gps_time(time.time()) assert week > 2000 # GPS week > 2000 in 2024+ assert 0 <= ms < 7 * 86400 * 1000 # --------------------------------------------------------------- # MAV-02: ESKF → GPS_INPUT field mapping # --------------------------------------------------------------- def test_confidence_to_fix_type(): """MAV-02: confidence tier → fix_type mapping.""" assert _confidence_to_fix_type(ConfidenceTier.HIGH) == 3 assert _confidence_to_fix_type(ConfidenceTier.MEDIUM) == 3 assert _confidence_to_fix_type(ConfidenceTier.LOW) == 0 assert _confidence_to_fix_type(ConfidenceTier.FAILED) == 0 def test_eskf_to_gps_input_position(): """MAV-02: ENU position → degE7 lat/lon.""" # 1° lat ≈ 111319.5 m; move 111319.5 m North → lat + 1° state = _make_state(pos=(0.0, 111_319.5, 0.0)) msg = _eskf_to_gps_input(state, ORIGIN) expected_lat = int((ORIGIN.lat + 1.0) * 1e7) assert abs(msg.lat - expected_lat) <= 10 # within 1 µ-degree tolerance def test_eskf_to_gps_input_lon(): """MAV-02: East displacement → longitude shift.""" cos_lat = math.cos(math.radians(ORIGIN.lat)) east_1deg = 111_319.5 * cos_lat state = _make_state(pos=(east_1deg, 0.0, 0.0)) msg = _eskf_to_gps_input(state, ORIGIN) expected_lon = int((ORIGIN.lon + 1.0) * 1e7) assert abs(msg.lon - expected_lon) <= 10 def test_eskf_to_gps_input_velocity_ned(): """MAV-02: ENU velocity → NED (vn=North, ve=East, vd=-Up).""" state = _make_state(vel=(3.0, 4.0, 1.0)) # ENU: E=3, N=4, U=1 msg = _eskf_to_gps_input(state, ORIGIN) assert math.isclose(msg.vn, 4.0, abs_tol=1e-3) # North = ENU[1] assert math.isclose(msg.ve, 3.0, abs_tol=1e-3) # East = ENU[0] assert math.isclose(msg.vd, -1.0, abs_tol=1e-3) # Down = -Up def test_eskf_to_gps_input_accuracy_from_covariance(): """MAV-02: accuracy fields derived from covariance diagonal.""" cov = np.eye(15) cov[0, 0] = 100.0 # East variance → σ_E = 10 m cov[1, 1] = 100.0 # North variance → σ_N = 10 m state = ESKFState( position=np.zeros(3), velocity=np.zeros(3), quaternion=np.array([1.0, 0, 0, 0]), accel_bias=np.zeros(3), gyro_bias=np.zeros(3), covariance=cov, timestamp=time.time(), confidence=ConfidenceTier.HIGH, ) msg = _eskf_to_gps_input(state, ORIGIN) # horiz_accuracy = sqrt(P[0,0] + P[1,1]) = sqrt(200) ≈ 14.14 assert math.isclose(msg.horiz_accuracy, math.sqrt(200.0), abs_tol=0.1) def test_eskf_to_gps_input_satellites_visible_10(): """Synthetic satellites_visible = 10 to prevent ArduPilot failsafes.""" msg = _eskf_to_gps_input(_make_state(), ORIGIN) assert msg.satellites_visible == 10 def test_eskf_to_gps_input_returns_message(): """_eskf_to_gps_input always returns a GPSInputMessage.""" msg = _eskf_to_gps_input(_make_state(), ORIGIN) assert isinstance(msg, GPSInputMessage) assert msg.fix_type == 3 # HIGH → 3D fix # --------------------------------------------------------------- # MAVLinkBridge — MockMAVConnection path # --------------------------------------------------------------- @pytest.fixture def bridge(): b = MAVLinkBridge(connection_string="mock://", output_hz=10.0, telemetry_hz=1.0) b._conn = MockMAVConnection() b._origin = ORIGIN return b def test_bridge_build_gps_input_no_state(bridge): """build_gps_input returns None before any state is pushed.""" assert bridge.build_gps_input() is None def test_bridge_build_gps_input_with_state(bridge): """build_gps_input returns a message once state is set.""" bridge.update_state(_make_state(), altitude_m=600.0) msg = bridge.build_gps_input() assert msg is not None assert msg.fix_type == 3 # --------------------------------------------------------------- # MAV-01: GPS output loop sends at configured rate # --------------------------------------------------------------- @pytest.mark.asyncio async def test_gps_output_sends_messages(bridge): """MAV-01: After N iterations the mock connection has GPS_INPUT records.""" bridge.update_state(_make_state(), altitude_m=500.0) bridge._running = True # Run one iteration manually await bridge._gps_output_loop.__wrapped__(bridge) if hasattr( bridge._gps_output_loop, "__wrapped__" ) else None # Directly call _send_gps_input msg = bridge.build_gps_input() bridge._send_gps_input(msg) sent = [s for s in bridge._conn._sent if s["type"] == "GPS_INPUT"] assert len(sent) >= 1 # --------------------------------------------------------------- # MAV-04: Consecutive failure detection # --------------------------------------------------------------- def test_consecutive_failure_counter_resets_on_good_state(bridge): """update_state with HIGH confidence resets failure counter.""" bridge._consecutive_failures = 5 bridge.update_state(_make_state(confidence=ConfidenceTier.HIGH)) assert bridge._consecutive_failures == 0 def test_consecutive_failure_counter_increments_on_low(bridge): """update_state with LOW confidence increments failure counter.""" bridge._consecutive_failures = 0 bridge.update_state(_make_state(confidence=ConfidenceTier.LOW)) assert bridge._consecutive_failures == 1 bridge.update_state(_make_state(confidence=ConfidenceTier.LOW)) assert bridge._consecutive_failures == 2 def test_reloc_request_triggered_after_3_failures(bridge): """MAV-04: after 3 failures the re-localisation callback is called.""" received: list[RelocalizationRequest] = [] bridge.set_reloc_callback(received.append) bridge._origin = ORIGIN bridge._last_state = _make_state() bridge._consecutive_failures = 3 bridge._send_reloc_request() assert len(received) == 1 assert received[0].consecutive_failures == 3 # Must include last known position assert received[0].last_lat is not None assert received[0].last_lon is not None def test_reloc_request_sent_to_mock_conn(bridge): """MAV-04: NAMED_VALUE_FLOAT messages written to mock connection.""" bridge._last_state = _make_state() bridge._consecutive_failures = 3 bridge._send_reloc_request() reloc = [s for s in bridge._conn._sent if s["type"] == "NAMED_VALUE_FLOAT"] assert len(reloc) >= 1 # --------------------------------------------------------------- # MAV-05: Telemetry # --------------------------------------------------------------- def test_telemetry_sends_named_value_float(bridge): """MAV-05: _send_telemetry writes NAMED_VALUE_FLOAT records.""" bridge._last_state = _make_state(confidence=ConfidenceTier.MEDIUM) bridge._send_telemetry() telem = [s for s in bridge._conn._sent if s["type"] == "NAMED_VALUE_FLOAT"] names = {s["kwargs"]["name"] for s in telem} assert "CONF_SCORE" in names assert "DRIFT_M" in names def test_telemetry_confidence_score_values(bridge): """MAV-05: confidence score matches tier mapping.""" for tier, expected in [ (ConfidenceTier.HIGH, 1.0), (ConfidenceTier.MEDIUM, 0.6), (ConfidenceTier.LOW, 0.2), (ConfidenceTier.FAILED, 0.0), ]: bridge._conn._sent.clear() bridge._last_state = _make_state(confidence=tier) bridge._send_telemetry() conf = next(s for s in bridge._conn._sent if s["kwargs"]["name"] == "CONF_SCORE") assert math.isclose(conf["kwargs"]["value"], expected, abs_tol=1e-6) # --------------------------------------------------------------- # MAV-03: IMU callback # --------------------------------------------------------------- def test_imu_callback_set_and_called(bridge): """MAV-03: IMU callback registered and invokable.""" received = [] cb = received.append bridge.set_imu_callback(cb) assert bridge._on_imu is cb # Simulate calling it from gps_denied.schemas.eskf import IMUMeasurement imu = IMUMeasurement(accel=np.zeros(3), gyro=np.zeros(3), timestamp=time.time()) bridge._on_imu(imu) assert len(received) == 1 @pytest.mark.asyncio async def test_start_stop(tmp_path): """Bridge start/stop completes without errors (mock mode).""" b = MAVLinkBridge(connection_string="mock://", output_hz=50.0) await b.start(ORIGIN) await asyncio.sleep(0.05) await b.stop() assert not b._running