mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 22:36:37 +00:00
dd9835c0cd
- ruff --fix: removed trailing whitespace (W293), sorted imports (I001) - Manual: broke long lines (E501) in eskf, rotation, vo, gpr, metric, pipeline, rotation tests - Removed unused imports (F401) in models.py, schemas/__init__.py - pyproject.toml: line-length 100→120, E501 ignore for abstract interfaces ruff check: 0 errors. pytest: 195 passed / 8 skipped. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
328 lines
11 KiB
Python
328 lines
11 KiB
Python
"""SITL Integration Tests — GPS_INPUT delivery to ArduPilot SITL.
|
|
|
|
These tests verify the full MAVLink GPS_INPUT pipeline against a real
|
|
ArduPilot SITL flight controller. They are **skipped** unless the
|
|
``ARDUPILOT_SITL_HOST`` environment variable is set.
|
|
|
|
Run via Docker Compose SITL harness:
|
|
docker compose -f docker-compose.sitl.yml run integration-tests
|
|
|
|
Or manually with SITL running locally:
|
|
ARDUPILOT_SITL_HOST=localhost ARDUPILOT_SITL_PORT=5762 pytest tests/test_sitl_integration.py -v
|
|
|
|
Test IDs:
|
|
SITL-01: MAVLink connection to ArduPilot SITL succeeds.
|
|
SITL-02: GPS_INPUT message accepted by SITL FC (GPS_RAW_INT shows 3D fix).
|
|
SITL-03: MAVLinkBridge.start/stop lifecycle with real connection.
|
|
SITL-04: IMU RAW_IMU callback fires after connecting to SITL.
|
|
SITL-05: 5 consecutive GPS_INPUT messages delivered within 1.1s (≥5 Hz).
|
|
SITL-06: Telemetry NAMED_VALUE_FLOAT messages reach SITL at 1 Hz.
|
|
SITL-07: After 3 consecutive FAILED-confidence updates, reloc request fires.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import socket
|
|
import time
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied.schemas import GPSPoint
|
|
from gps_denied.schemas.eskf import ConfidenceTier, ESKFState
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Skip guard — all tests in this file are skipped unless SITL is available
|
|
# ---------------------------------------------------------------------------
|
|
|
|
SITL_HOST = os.environ.get("ARDUPILOT_SITL_HOST", "")
|
|
SITL_PORT = int(os.environ.get("ARDUPILOT_SITL_PORT", "5762"))
|
|
|
|
_SITL_AVAILABLE = bool(SITL_HOST)
|
|
|
|
pytestmark = pytest.mark.skipif(
|
|
not _SITL_AVAILABLE,
|
|
reason="SITL integration tests require ARDUPILOT_SITL_HOST env var",
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ORIGIN = GPSPoint(lat=49.0, lon=32.0)
|
|
_MAVLINK_CONN = f"tcp:{SITL_HOST}:{SITL_PORT}" if SITL_HOST else "mock://"
|
|
|
|
|
|
def _make_eskf_state(
|
|
pos=(0.0, 0.0, 0.0),
|
|
vel=(0.0, 0.0, 0.0),
|
|
confidence: ConfidenceTier = ConfidenceTier.HIGH,
|
|
cov_scale: float = 1.0,
|
|
) -> ESKFState:
|
|
cov = np.eye(15) * cov_scale
|
|
return ESKFState(
|
|
position=np.array(pos, dtype=float),
|
|
velocity=np.array(vel, dtype=float),
|
|
quaternion=np.array([1.0, 0.0, 0.0, 0.0]),
|
|
accel_bias=np.zeros(3),
|
|
gyro_bias=np.zeros(3),
|
|
covariance=cov,
|
|
timestamp=time.time(),
|
|
confidence=confidence,
|
|
)
|
|
|
|
|
|
def _wait_for_tcp(host: str, port: int, timeout: float = 30.0) -> bool:
|
|
"""Block until TCP port is accepting connections (or timeout)."""
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
try:
|
|
with socket.create_connection((host, port), timeout=2.0):
|
|
return True
|
|
except OSError:
|
|
time.sleep(1.0)
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SITL-01: Connection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_sitl_tcp_port_reachable():
|
|
"""SITL-01: ArduPilot SITL TCP port is reachable before running tests."""
|
|
reachable = _wait_for_tcp(SITL_HOST, SITL_PORT, timeout=30.0)
|
|
assert reachable, (
|
|
f"SITL not reachable at {SITL_HOST}:{SITL_PORT} — "
|
|
"is docker-compose.sitl.yml running?"
|
|
)
|
|
|
|
|
|
def test_pymavlink_connection_to_sitl():
|
|
"""SITL-01: pymavlink connects to SITL without error."""
|
|
pytest.importorskip("pymavlink", reason="pymavlink not installed")
|
|
from pymavlink import mavutil
|
|
|
|
mav = mavutil.mavlink_connection(_MAVLINK_CONN)
|
|
# Wait for heartbeat (up to 15s)
|
|
msg = mav.recv_match(type="HEARTBEAT", blocking=True, timeout=15)
|
|
mav.close()
|
|
assert msg is not None, "No HEARTBEAT received from SITL within 15s"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SITL-02: GPS_INPUT accepted by SITL EKF
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_gps_input_accepted_by_sitl():
|
|
"""SITL-02: Sending GPS_INPUT produces GPS_RAW_INT with fix_type >= 3."""
|
|
pytest.importorskip("pymavlink", reason="pymavlink not installed")
|
|
from pymavlink import mavutil
|
|
|
|
mav = mavutil.mavlink_connection(_MAVLINK_CONN)
|
|
# Wait for SITL ready
|
|
mav.recv_match(type="HEARTBEAT", blocking=True, timeout=15)
|
|
|
|
# Send 10 GPS_INPUT messages at ~5 Hz
|
|
for _ in range(10):
|
|
now = time.time()
|
|
gps_s = now - 315_964_800
|
|
week = int(gps_s // (7 * 86400))
|
|
week_ms = int((gps_s % (7 * 86400)) * 1000)
|
|
|
|
mav.mav.gps_input_send(
|
|
int(now * 1_000_000), # time_usec
|
|
0, # gps_id
|
|
0, # ignore_flags
|
|
week_ms, # time_week_ms
|
|
week, # time_week
|
|
3, # fix_type (3D)
|
|
int(_ORIGIN.lat * 1e7), # lat
|
|
int(_ORIGIN.lon * 1e7), # lon
|
|
600.0, # alt MSL
|
|
1.0, # hdop
|
|
1.5, # vdop
|
|
0.0, # vn
|
|
0.0, # ve
|
|
0.0, # vd
|
|
0.3, # speed_accuracy
|
|
5.0, # horiz_accuracy
|
|
2.0, # vert_accuracy
|
|
10, # satellites_visible
|
|
)
|
|
time.sleep(0.2)
|
|
|
|
# Wait for GPS_RAW_INT confirming fix
|
|
deadline = time.time() + 10.0
|
|
fix_type = 0
|
|
while time.time() < deadline:
|
|
msg = mav.recv_match(type="GPS_RAW_INT", blocking=True, timeout=2.0)
|
|
if msg and msg.fix_type >= 3:
|
|
fix_type = msg.fix_type
|
|
break
|
|
|
|
mav.close()
|
|
assert fix_type >= 3, (
|
|
f"SITL GPS_RAW_INT fix_type={fix_type} after GPS_INPUT — "
|
|
"expected 3D fix (≥3)"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SITL-03: MAVLinkBridge lifecycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_mavlink_bridge_start_stop_with_sitl():
|
|
"""SITL-03: MAVLinkBridge.start/stop with real SITL TCP connection."""
|
|
pytest.importorskip("pymavlink", reason="pymavlink not installed")
|
|
|
|
from gps_denied.core.mavlink import MAVLinkBridge
|
|
|
|
bridge = MAVLinkBridge(
|
|
connection_string=_MAVLINK_CONN,
|
|
output_hz=5.0,
|
|
telemetry_hz=1.0,
|
|
)
|
|
bridge.update_state(_make_eskf_state(), altitude_m=600.0)
|
|
|
|
await bridge.start(_ORIGIN)
|
|
# Let it run for one output period
|
|
await asyncio.sleep(0.25)
|
|
await bridge.stop()
|
|
|
|
assert not bridge._running
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SITL-04: IMU receive callback
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_imu_callback_fires_from_sitl():
|
|
"""SITL-04: IMU callback is invoked when SITL sends RAW_IMU messages."""
|
|
pytest.importorskip("pymavlink", reason="pymavlink not installed")
|
|
|
|
from gps_denied.core.mavlink import MAVLinkBridge
|
|
from gps_denied.schemas.eskf import IMUMeasurement
|
|
|
|
received: list[IMUMeasurement] = []
|
|
|
|
bridge = MAVLinkBridge(connection_string=_MAVLINK_CONN, output_hz=5.0)
|
|
bridge.set_imu_callback(received.append)
|
|
bridge.update_state(_make_eskf_state(), altitude_m=600.0)
|
|
|
|
await bridge.start(_ORIGIN)
|
|
# SITL sends RAW_IMU at ~50-200 Hz; wait 1s
|
|
await asyncio.sleep(1.0)
|
|
await bridge.stop()
|
|
|
|
assert len(received) >= 1, (
|
|
"No IMUMeasurement received from SITL in 1s — "
|
|
"check that SITL is sending RAW_IMU messages"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SITL-05: GPS_INPUT rate ≥ 5 Hz
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_gps_input_rate_at_least_5hz():
|
|
"""SITL-05: MAVLinkBridge delivers GPS_INPUT at ≥5 Hz over 1 second."""
|
|
pytest.importorskip("pymavlink", reason="pymavlink not installed")
|
|
from pymavlink import mavutil
|
|
|
|
from gps_denied.core.mavlink import MAVLinkBridge
|
|
|
|
# Monitor incoming GPS_INPUT on a separate MAVLink connection
|
|
monitor = mavutil.mavlink_connection(_MAVLINK_CONN)
|
|
monitor.recv_match(type="HEARTBEAT", blocking=True, timeout=10)
|
|
|
|
bridge = MAVLinkBridge(connection_string=_MAVLINK_CONN, output_hz=5.0)
|
|
bridge.update_state(_make_eskf_state(confidence=ConfidenceTier.HIGH), altitude_m=600.0)
|
|
await bridge.start(_ORIGIN)
|
|
|
|
t_start = time.time()
|
|
count = 0
|
|
while time.time() - t_start < 1.1:
|
|
msg = monitor.recv_match(type="GPS_INPUT", blocking=True, timeout=0.5)
|
|
if msg:
|
|
count += 1
|
|
await asyncio.sleep(0)
|
|
|
|
await bridge.stop()
|
|
monitor.close()
|
|
|
|
assert count >= 5, f"Only {count} GPS_INPUT messages in 1.1s — expected ≥5 (5 Hz)"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SITL-06: Telemetry at 1 Hz
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telemetry_reaches_sitl_at_1hz():
|
|
"""SITL-06: NAMED_VALUE_FLOAT CONF_SCORE delivered at ~1 Hz."""
|
|
pytest.importorskip("pymavlink", reason="pymavlink not installed")
|
|
from pymavlink import mavutil
|
|
|
|
from gps_denied.core.mavlink import MAVLinkBridge
|
|
|
|
monitor = mavutil.mavlink_connection(_MAVLINK_CONN)
|
|
monitor.recv_match(type="HEARTBEAT", blocking=True, timeout=10)
|
|
|
|
bridge = MAVLinkBridge(connection_string=_MAVLINK_CONN, output_hz=5.0, telemetry_hz=1.0)
|
|
bridge.update_state(_make_eskf_state(confidence=ConfidenceTier.MEDIUM), altitude_m=600.0)
|
|
await bridge.start(_ORIGIN)
|
|
|
|
t_start = time.time()
|
|
conf_count = 0
|
|
while time.time() - t_start < 2.2:
|
|
msg = monitor.recv_match(type="NAMED_VALUE_FLOAT", blocking=True, timeout=0.5)
|
|
if msg and getattr(msg, "name", "").startswith("CONF"):
|
|
conf_count += 1
|
|
await asyncio.sleep(0)
|
|
|
|
await bridge.stop()
|
|
monitor.close()
|
|
|
|
assert conf_count >= 2, (
|
|
f"Only {conf_count} CONF_SCORE messages in 2.2s — expected ≥2 (1 Hz)"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SITL-07: Reloc request after 3 consecutive failures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reloc_request_after_3_failures_with_sitl():
|
|
"""SITL-07: After 3 FAILED-confidence updates, reloc callback fires."""
|
|
pytest.importorskip("pymavlink", reason="pymavlink not installed")
|
|
|
|
from gps_denied.core.mavlink import MAVLinkBridge
|
|
from gps_denied.schemas.mavlink import RelocalizationRequest
|
|
|
|
received: list[RelocalizationRequest] = []
|
|
|
|
bridge = MAVLinkBridge(connection_string=_MAVLINK_CONN, output_hz=5.0)
|
|
bridge.set_reloc_callback(received.append)
|
|
bridge._origin = _ORIGIN
|
|
bridge._last_state = _make_eskf_state()
|
|
bridge._consecutive_failures = 3
|
|
|
|
await bridge.start(_ORIGIN)
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Trigger reloc manually (simulates 3 consecutive failures)
|
|
bridge._send_reloc_request()
|
|
|
|
await asyncio.sleep(0.1)
|
|
await bridge.stop()
|
|
|
|
assert len(received) == 1, f"Expected 1 reloc request, got {len(received)}"
|
|
assert received[0].consecutive_failures == 3
|
|
assert received[0].last_lat is not None
|