Files
gps-denied-onboard/tests/test_mavlink.py
T
Yuzviak dd9835c0cd fix(lint): resolve all ruff errors — trailing whitespace, E501, F401
- ruff --fix: removed trailing whitespace (W293), sorted imports (I001)
- Manual: broke long lines (E501) in eskf, rotation, vo, gpr, metric, pipeline, rotation tests
- Removed unused imports (F401) in models.py, schemas/__init__.py
- pyproject.toml: line-length 100→120, E501 ignore for abstract interfaces

ruff check: 0 errors. pytest: 195 passed / 8 skipped.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-02 17:09:47 +03:00

288 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for MAVLink I/O Bridge (Phase 4).
MAV-01: GPS_INPUT sent at configured rate.
MAV-02: ESKF state correctly mapped to GPS_INPUT fields.
MAV-03: IMU receive callback invoked.
MAV-04: 3 consecutive failures trigger re-localisation request.
MAV-05: Telemetry sent at 1 Hz.
"""
import asyncio
import math
import time
import numpy as np
import pytest
from gps_denied.core.mavlink import (
MAVLinkBridge,
MockMAVConnection,
_confidence_to_fix_type,
_eskf_to_gps_input,
_unix_to_gps_time,
)
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import ConfidenceTier, ESKFState
from gps_denied.schemas.mavlink import GPSInputMessage, RelocalizationRequest
# ---------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------
def _make_state(
pos=(0.0, 0.0, 0.0),
vel=(0.0, 0.0, 0.0),
confidence=ConfidenceTier.HIGH,
cov_scale=1.0,
) -> ESKFState:
cov = np.eye(15) * cov_scale
return ESKFState(
position=np.array(pos),
velocity=np.array(vel),
quaternion=np.array([1.0, 0.0, 0.0, 0.0]),
accel_bias=np.zeros(3),
gyro_bias=np.zeros(3),
covariance=cov,
timestamp=time.time(),
confidence=confidence,
)
ORIGIN = GPSPoint(lat=49.0, lon=32.0)
# ---------------------------------------------------------------
# GPS time helpers
# ---------------------------------------------------------------
def test_unix_to_gps_time_epoch():
"""GPS epoch (Unix=315964800) should be week=0, ms=0."""
week, ms = _unix_to_gps_time(315_964_800.0)
assert week == 0
assert ms == 0
def test_unix_to_gps_time_recent():
"""Recent timestamp must produce a valid week and ms-of-week."""
week, ms = _unix_to_gps_time(time.time())
assert week > 2000 # GPS week > 2000 in 2024+
assert 0 <= ms < 7 * 86400 * 1000
# ---------------------------------------------------------------
# MAV-02: ESKF → GPS_INPUT field mapping
# ---------------------------------------------------------------
def test_confidence_to_fix_type():
"""MAV-02: confidence tier → fix_type mapping."""
assert _confidence_to_fix_type(ConfidenceTier.HIGH) == 3
assert _confidence_to_fix_type(ConfidenceTier.MEDIUM) == 2
assert _confidence_to_fix_type(ConfidenceTier.LOW) == 0
assert _confidence_to_fix_type(ConfidenceTier.FAILED) == 0
def test_eskf_to_gps_input_position():
"""MAV-02: ENU position → degE7 lat/lon."""
# 1° lat ≈ 111319.5 m; move 111319.5 m North → lat + 1°
state = _make_state(pos=(0.0, 111_319.5, 0.0))
msg = _eskf_to_gps_input(state, ORIGIN)
expected_lat = int((ORIGIN.lat + 1.0) * 1e7)
assert abs(msg.lat - expected_lat) <= 10 # within 1 µ-degree tolerance
def test_eskf_to_gps_input_lon():
"""MAV-02: East displacement → longitude shift."""
cos_lat = math.cos(math.radians(ORIGIN.lat))
east_1deg = 111_319.5 * cos_lat
state = _make_state(pos=(east_1deg, 0.0, 0.0))
msg = _eskf_to_gps_input(state, ORIGIN)
expected_lon = int((ORIGIN.lon + 1.0) * 1e7)
assert abs(msg.lon - expected_lon) <= 10
def test_eskf_to_gps_input_velocity_ned():
"""MAV-02: ENU velocity → NED (vn=North, ve=East, vd=-Up)."""
state = _make_state(vel=(3.0, 4.0, 1.0)) # ENU: E=3, N=4, U=1
msg = _eskf_to_gps_input(state, ORIGIN)
assert math.isclose(msg.vn, 4.0, abs_tol=1e-3) # North = ENU[1]
assert math.isclose(msg.ve, 3.0, abs_tol=1e-3) # East = ENU[0]
assert math.isclose(msg.vd, -1.0, abs_tol=1e-3) # Down = -Up
def test_eskf_to_gps_input_accuracy_from_covariance():
"""MAV-02: accuracy fields derived from covariance diagonal."""
cov = np.eye(15)
cov[0, 0] = 100.0 # East variance → σ_E = 10 m
cov[1, 1] = 100.0 # North variance → σ_N = 10 m
state = ESKFState(
position=np.zeros(3), velocity=np.zeros(3),
quaternion=np.array([1.0, 0, 0, 0]),
accel_bias=np.zeros(3), gyro_bias=np.zeros(3),
covariance=cov, timestamp=time.time(),
confidence=ConfidenceTier.HIGH,
)
msg = _eskf_to_gps_input(state, ORIGIN)
assert math.isclose(msg.horiz_accuracy, 10.0, abs_tol=0.01)
def test_eskf_to_gps_input_returns_message():
"""_eskf_to_gps_input always returns a GPSInputMessage."""
msg = _eskf_to_gps_input(_make_state(), ORIGIN)
assert isinstance(msg, GPSInputMessage)
assert msg.fix_type == 3 # HIGH → 3D fix
# ---------------------------------------------------------------
# MAVLinkBridge — MockMAVConnection path
# ---------------------------------------------------------------
@pytest.fixture
def bridge():
b = MAVLinkBridge(connection_string="mock://", output_hz=10.0, telemetry_hz=1.0)
b._conn = MockMAVConnection()
b._origin = ORIGIN
return b
def test_bridge_build_gps_input_no_state(bridge):
"""build_gps_input returns None before any state is pushed."""
assert bridge.build_gps_input() is None
def test_bridge_build_gps_input_with_state(bridge):
"""build_gps_input returns a message once state is set."""
bridge.update_state(_make_state(), altitude_m=600.0)
msg = bridge.build_gps_input()
assert msg is not None
assert msg.fix_type == 3
# ---------------------------------------------------------------
# MAV-01: GPS output loop sends at configured rate
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_gps_output_sends_messages(bridge):
"""MAV-01: After N iterations the mock connection has GPS_INPUT records."""
bridge.update_state(_make_state(), altitude_m=500.0)
bridge._running = True
# Run one iteration manually
await bridge._gps_output_loop.__wrapped__(bridge) if hasattr(
bridge._gps_output_loop, "__wrapped__"
) else None
# Directly call _send_gps_input
msg = bridge.build_gps_input()
bridge._send_gps_input(msg)
sent = [s for s in bridge._conn._sent if s["type"] == "GPS_INPUT"]
assert len(sent) >= 1
# ---------------------------------------------------------------
# MAV-04: Consecutive failure detection
# ---------------------------------------------------------------
def test_consecutive_failure_counter_resets_on_good_state(bridge):
"""update_state with HIGH confidence resets failure counter."""
bridge._consecutive_failures = 5
bridge.update_state(_make_state(confidence=ConfidenceTier.HIGH))
assert bridge._consecutive_failures == 0
def test_consecutive_failure_counter_increments_on_low(bridge):
"""update_state with LOW confidence increments failure counter."""
bridge._consecutive_failures = 0
bridge.update_state(_make_state(confidence=ConfidenceTier.LOW))
assert bridge._consecutive_failures == 1
bridge.update_state(_make_state(confidence=ConfidenceTier.LOW))
assert bridge._consecutive_failures == 2
def test_reloc_request_triggered_after_3_failures(bridge):
"""MAV-04: after 3 failures the re-localisation callback is called."""
received: list[RelocalizationRequest] = []
bridge.set_reloc_callback(received.append)
bridge._origin = ORIGIN
bridge._last_state = _make_state()
bridge._consecutive_failures = 3
bridge._send_reloc_request()
assert len(received) == 1
assert received[0].consecutive_failures == 3
# Must include last known position
assert received[0].last_lat is not None
assert received[0].last_lon is not None
def test_reloc_request_sent_to_mock_conn(bridge):
"""MAV-04: NAMED_VALUE_FLOAT messages written to mock connection."""
bridge._last_state = _make_state()
bridge._consecutive_failures = 3
bridge._send_reloc_request()
reloc = [s for s in bridge._conn._sent if s["type"] == "NAMED_VALUE_FLOAT"]
assert len(reloc) >= 1
# ---------------------------------------------------------------
# MAV-05: Telemetry
# ---------------------------------------------------------------
def test_telemetry_sends_named_value_float(bridge):
"""MAV-05: _send_telemetry writes NAMED_VALUE_FLOAT records."""
bridge._last_state = _make_state(confidence=ConfidenceTier.MEDIUM)
bridge._send_telemetry()
telem = [s for s in bridge._conn._sent if s["type"] == "NAMED_VALUE_FLOAT"]
names = {s["kwargs"]["name"] for s in telem}
assert "CONF_SCORE" in names
assert "DRIFT_M" in names
def test_telemetry_confidence_score_values(bridge):
"""MAV-05: confidence score matches tier mapping."""
for tier, expected in [
(ConfidenceTier.HIGH, 1.0),
(ConfidenceTier.MEDIUM, 0.6),
(ConfidenceTier.LOW, 0.2),
(ConfidenceTier.FAILED, 0.0),
]:
bridge._conn._sent.clear()
bridge._last_state = _make_state(confidence=tier)
bridge._send_telemetry()
conf = next(s for s in bridge._conn._sent if s["kwargs"]["name"] == "CONF_SCORE")
assert math.isclose(conf["kwargs"]["value"], expected, abs_tol=1e-6)
# ---------------------------------------------------------------
# MAV-03: IMU callback
# ---------------------------------------------------------------
def test_imu_callback_set_and_called(bridge):
"""MAV-03: IMU callback registered and invokable."""
received = []
cb = received.append
bridge.set_imu_callback(cb)
assert bridge._on_imu is cb
# Simulate calling it
from gps_denied.schemas.eskf import IMUMeasurement
imu = IMUMeasurement(accel=np.zeros(3), gyro=np.zeros(3), timestamp=time.time())
bridge._on_imu(imu)
assert len(received) == 1
@pytest.mark.asyncio
async def test_start_stop(tmp_path):
"""Bridge start/stop completes without errors (mock mode)."""
b = MAVLinkBridge(connection_string="mock://", output_hz=50.0)
await b.start(ORIGIN)
await asyncio.sleep(0.05)
await b.stop()
assert not b._running