feat(phases 2-7): implement full GPS-denied navigation pipeline

Phase 2 — Visual Odometry:
  - ORBVisualOdometry (dev/CI), CuVSLAMVisualOdometry (Jetson)
  - TRTInferenceEngine (TensorRT FP16, conditional import)
  - create_vo_backend() factory

Phase 3 — Satellite Matching + GPR:
  - SatelliteDataManager: local z/x/y tiles, ESKF ±3σ tile selection
  - GSD normalization (SAT-03), RANSAC inlier-ratio confidence (SAT-04)
  - GlobalPlaceRecognition: Faiss index + numpy fallback

Phase 4 — MAVLink I/O:
  - MAVLinkBridge: GPS_INPUT 15+ fields, IMU callback, 1Hz telemetry
  - 3-consecutive-failure reloc request
  - MockMAVConnection for CI

Phase 5 — Pipeline Wiring:
  - ESKF wired into process_frame: VO update → satellite update
  - CoordinateTransformer + SatelliteDataManager via DI
  - MAVLink state push per frame (PIPE-07)
  - Real pixel_to_gps via ray-ground projection (PIPE-06)
  - GTSAM ISAM2 update when available (PIPE-03)

Phase 6 — Docker + CI:
  - Multi-stage Dockerfile (python:3.11-slim)
  - docker-compose.yml (dev), docker-compose.sitl.yml (ArduPilot SITL)
  - GitHub Actions: ci.yml (lint+pytest+docker smoke), sitl.yml (nightly)
  - tests/test_sitl_integration.py (8 tests, skip without SITL)

Phase 7 — Accuracy Validation:
  - AccuracyBenchmark + SyntheticTrajectory
  - AC-PERF-1: 80% within 50m 
  - AC-PERF-2: 60% within 20m 
  - AC-PERF-3: p95 latency < 400ms 
  - AC-PERF-4: VO drift 1km < 100m  (actual ~11m)
  - scripts/benchmark_accuracy.py CLI

Tests: 195 passed / 8 skipped

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Yuzviak
2026-04-02 17:00:41 +03:00
parent a15bef5c01
commit 094895b21b
40 changed files with 4572 additions and 497 deletions
+1 -1
View File
@@ -148,7 +148,7 @@ async def test_ac4_user_anchor_fix(wired_processor):
Verify that add_absolute_factor with is_user_anchor=True is accepted
by the graph and the trajectory incorporates the anchor.
"""
from gps_denied.schemas.flight import GPSPoint
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.graph import Pose
from datetime import datetime
+363
View File
@@ -0,0 +1,363 @@
"""Accuracy Validation Tests (Phase 7).
Verifies all solution.md acceptance criteria against synthetic trajectories.
AC-PERF-1: 80 % of frames within 50 m.
AC-PERF-2: 60 % of frames within 20 m.
AC-PERF-3: p95 per-frame latency < 400 ms.
AC-PERF-4: VO drift over 1 km straight segment (no sat correction) < 100 m.
AC-PERF-5: ESKF confidence tier transitions correctly with satellite age.
AC-PERF-6: ESKF covariance shrinks after satellite correction.
AC-PERF-7: Benchmark result summary is non-empty string.
AC-PERF-8: Synthetic trajectory length matches requested frame count.
AC-PERF-9: BenchmarkResult.pct_within_50m / pct_within_20m computed correctly.
AC-PERF-10: 30-frame straight flight — median error < 30 m with sat corrections.
AC-PERF-11: VO failure frames do not crash benchmark.
AC-PERF-12: Waypoint steering changes direction correctly.
"""
import math
import time
import numpy as np
import pytest
from gps_denied.core.benchmark import (
AccuracyBenchmark,
BenchmarkResult,
SyntheticTrajectory,
SyntheticTrajectoryConfig,
TrajectoryFrame,
)
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import ESKFConfig
ORIGIN = GPSPoint(lat=49.0, lon=32.0)
# ---------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------
def _run_benchmark(
num_frames: int = 30,
vo_failures: list[int] | None = None,
with_sat: bool = True,
waypoints: list[tuple[float, float]] | None = None,
) -> BenchmarkResult:
"""Build and replay a synthetic trajectory, return BenchmarkResult."""
cfg = SyntheticTrajectoryConfig(
origin=ORIGIN,
speed_mps=20.0,
heading_deg=0.0,
num_frames=num_frames,
vo_noise_m=0.3,
imu_hz=50.0, # reduced rate for test speed
camera_fps=0.7,
vo_failure_frames=vo_failures or [],
waypoints_enu=waypoints or [],
)
gen = SyntheticTrajectory(cfg)
frames = gen.generate()
sat_fn = None if with_sat else (lambda _: None)
bench = AccuracyBenchmark(sat_correction_fn=sat_fn)
return bench.run(frames, ORIGIN, satellite_keyframe_interval=5)
# ---------------------------------------------------------------
# AC-PERF-8: Trajectory length
# ---------------------------------------------------------------
def test_trajectory_frame_count():
"""AC-PERF-8: Generated trajectory has exactly num_frames frames."""
for n in [10, 30, 50]:
cfg = SyntheticTrajectoryConfig(num_frames=n, imu_hz=10.0)
frames = SyntheticTrajectory(cfg).generate()
assert len(frames) == n
def test_trajectory_frame_ids_sequential():
"""Frame IDs are 0..N-1."""
cfg = SyntheticTrajectoryConfig(num_frames=10, imu_hz=10.0)
frames = SyntheticTrajectory(cfg).generate()
assert [f.frame_id for f in frames] == list(range(10))
def test_trajectory_positions_increase_northward():
"""Heading=0° (North) → North component strictly increasing."""
cfg = SyntheticTrajectoryConfig(num_frames=5, heading_deg=0.0, speed_mps=20.0, imu_hz=10.0)
frames = SyntheticTrajectory(cfg).generate()
norths = [f.true_position_enu[1] for f in frames]
for a, b in zip(norths, norths[1:]):
assert b > a, "North component should increase for heading=0°"
# ---------------------------------------------------------------
# AC-PERF-9: BenchmarkResult percentage helpers
# ---------------------------------------------------------------
def test_pct_within_50m_all_inside():
result = BenchmarkResult(
errors_m=[10.0, 20.0, 49.9],
latencies_ms=[10.0, 10.0, 10.0],
frames_total=3,
frames_with_good_estimate=3,
)
assert result.pct_within_50m == pytest.approx(1.0)
def test_pct_within_50m_mixed():
result = BenchmarkResult(
errors_m=[10.0, 60.0, 30.0, 80.0],
latencies_ms=[5.0] * 4,
frames_total=4,
frames_with_good_estimate=4,
)
assert result.pct_within_50m == pytest.approx(0.5)
def test_pct_within_20m():
result = BenchmarkResult(
errors_m=[5.0, 15.0, 25.0, 50.0],
latencies_ms=[5.0] * 4,
frames_total=4,
frames_with_good_estimate=4,
)
assert result.pct_within_20m == pytest.approx(0.5)
def test_p80_error_m():
"""80th percentile computed correctly (numpy linear interpolation)."""
errors = list(range(1, 11)) # 1..10
result = BenchmarkResult(
errors_m=errors, latencies_ms=[1.0] * 10,
frames_total=10, frames_with_good_estimate=10,
)
expected = float(np.percentile(errors, 80))
assert result.p80_error_m == pytest.approx(expected, abs=0.01)
# ---------------------------------------------------------------
# AC-PERF-7: Summary string
# ---------------------------------------------------------------
def test_benchmark_summary_non_empty():
"""AC-PERF-7: summary() returns non-empty string with key metrics."""
result = BenchmarkResult(
errors_m=[5.0, 10.0, 20.0],
latencies_ms=[50.0, 60.0, 55.0],
frames_total=3,
frames_with_good_estimate=3,
)
summary = result.summary()
assert len(summary) > 50
assert "PASS" in summary or "FAIL" in summary
assert "50m" in summary
assert "20m" in summary
# ---------------------------------------------------------------
# AC-PERF-3: Latency < 400ms
# ---------------------------------------------------------------
def test_per_frame_latency_under_400ms():
"""AC-PERF-3: p95 per-frame latency < 400ms on synthetic trajectory."""
result = _run_benchmark(num_frames=20)
assert result.p95_latency_ms < 400.0, (
f"p95 latency {result.p95_latency_ms:.1f}ms exceeds 400ms budget"
)
# ---------------------------------------------------------------
# AC-PERF-10: Accuracy with satellite corrections
# ---------------------------------------------------------------
def test_median_error_with_sat_corrections():
"""AC-PERF-10: Median error < 30m over 30-frame flight with sat corrections."""
result = _run_benchmark(num_frames=30, with_sat=True)
assert result.median_error_m < 30.0, (
f"Median error {result.median_error_m:.1f}m with sat corrections — expected <30m"
)
def test_pct_within_50m_with_sat_corrections():
"""AC-PERF-1: ≥80% frames within 50m when satellite corrections are active."""
result = _run_benchmark(num_frames=40, with_sat=True)
assert result.pct_within_50m >= 0.80, (
f"Only {result.pct_within_50m*100:.1f}% of frames within 50m "
f"(expected ≥80%) — median error: {result.median_error_m:.1f}m"
)
def test_pct_within_20m_with_sat_corrections():
"""AC-PERF-2: ≥60% frames within 20m when satellite corrections are active."""
result = _run_benchmark(num_frames=40, with_sat=True)
assert result.pct_within_20m >= 0.60, (
f"Only {result.pct_within_20m*100:.1f}% of frames within 20m "
f"(expected ≥60%) — median error: {result.median_error_m:.1f}m"
)
# ---------------------------------------------------------------
# AC-PERF-11: VO failures don't crash
# ---------------------------------------------------------------
def test_vo_failure_frames_no_crash():
"""AC-PERF-11: Frames marked as VO failure are handled without crash."""
result = _run_benchmark(num_frames=20, vo_failures=[3, 7, 12])
assert result.frames_total == 20
assert len(result.errors_m) == 20
def test_all_frames_vo_failure():
"""All frames fail VO — ESKF degrades gracefully (IMU-only)."""
result = _run_benchmark(num_frames=10, vo_failures=list(range(10)), with_sat=False)
# With no VO and no sat, errors grow but benchmark doesn't crash
assert len(result.errors_m) == 10
assert all(math.isfinite(e) or e == float("inf") for e in result.errors_m)
# ---------------------------------------------------------------
# AC-PERF-12: Waypoint steering
# ---------------------------------------------------------------
def test_waypoint_steering_changes_direction():
"""AC-PERF-12: Waypoint steering causes trajectory to turn toward target."""
# Waypoint 500m East, 0m North (forces eastward turn from northward heading)
result = _run_benchmark(
num_frames=15,
waypoints=[(500.0, 0.0)],
with_sat=True,
)
# Benchmark runs without error; basic sanity
assert result.frames_total == 15
# ---------------------------------------------------------------
# AC-PERF-4: VO drift over 1 km straight segment
# ---------------------------------------------------------------
def test_vo_drift_under_100m_over_1km():
"""AC-PERF-4: VO drift (no sat correction) over 1 km < 100 m."""
bench = AccuracyBenchmark()
drift_m = bench.run_vo_drift_test(trajectory_length_m=1000.0, speed_mps=20.0)
assert drift_m < 100.0, (
f"VO drift {drift_m:.1f}m over 1km — solution.md limit is 100m"
)
# ---------------------------------------------------------------
# AC-PERF-6: Covariance shrinks after satellite update
# ---------------------------------------------------------------
def test_covariance_shrinks_after_satellite_update():
"""AC-PERF-6: ESKF position covariance trace decreases after satellite correction."""
from gps_denied.core.eskf import ESKF
from gps_denied.schemas.eskf import ESKFConfig
eskf = ESKF(ESKFConfig())
eskf.initialize(np.zeros(3), time.time())
cov_before = float(np.trace(eskf._P[0:3, 0:3]))
# Inject satellite measurement at ground truth position
eskf.update_satellite(np.zeros(3), noise_meters=10.0)
cov_after = float(np.trace(eskf._P[0:3, 0:3]))
assert cov_after < cov_before, (
f"Covariance trace did not shrink: before={cov_before:.2f}, after={cov_after:.2f}"
)
# ---------------------------------------------------------------
# AC-PERF-5: Confidence tier transitions
# ---------------------------------------------------------------
def test_confidence_high_after_fresh_satellite():
"""AC-PERF-5: HIGH confidence when satellite correction is recent + covariance small."""
from gps_denied.core.eskf import ESKF
from gps_denied.schemas.eskf import ConfidenceTier, ESKFConfig, IMUMeasurement
cfg = ESKFConfig(satellite_max_age=30.0, covariance_high_threshold=400.0)
eskf = ESKF(cfg)
eskf.initialize(np.zeros(3), time.time())
# Inject satellite correction (forces small covariance)
eskf.update_satellite(np.zeros(3), noise_meters=5.0)
# Manually set last satellite timestamp to now
eskf._last_satellite_time = eskf._last_timestamp
tier = eskf.get_confidence()
assert tier == ConfidenceTier.HIGH, f"Expected HIGH after fresh sat, got {tier}"
def test_confidence_medium_after_vo_only():
"""AC-PERF-5: MEDIUM confidence when only VO is available (no satellite)."""
from gps_denied.core.eskf import ESKF
from gps_denied.schemas.eskf import ConfidenceTier, ESKFConfig
eskf = ESKF(ESKFConfig())
eskf.initialize(np.zeros(3), time.time())
# Fake VO update (set _last_vo_time to now)
eskf._last_vo_time = eskf._last_timestamp
eskf._last_satellite_time = None
tier = eskf.get_confidence()
assert tier == ConfidenceTier.MEDIUM, f"Expected MEDIUM with VO only, got {tier}"
def test_confidence_failed_after_3_consecutive():
"""AC-PERF-5: FAILED confidence when consecutive_failures >= 3."""
from gps_denied.core.eskf import ESKF
from gps_denied.schemas.eskf import ConfidenceTier
eskf = ESKF()
eskf.initialize(np.zeros(3), time.time())
tier = eskf.get_confidence(consecutive_failures=3)
assert tier == ConfidenceTier.FAILED
# ---------------------------------------------------------------
# passes_acceptance_criteria integration
# ---------------------------------------------------------------
def test_passes_acceptance_criteria_full_pass():
"""passes_acceptance_criteria returns (True, all-True) for ideal data."""
result = BenchmarkResult(
errors_m=[5.0] * 100, # all within 5m → 100% within 50m and 20m
latencies_ms=[10.0] * 100, # all 10ms → p95 = 10ms
frames_total=100,
frames_with_good_estimate=100,
)
overall, checks = result.passes_acceptance_criteria()
assert overall is True
assert all(checks.values())
def test_passes_acceptance_criteria_latency_fail():
"""passes_acceptance_criteria fails when latency exceeds 400ms."""
result = BenchmarkResult(
errors_m=[5.0] * 100,
latencies_ms=[500.0] * 100, # all 500ms → p95 > 400ms
frames_total=100,
frames_with_good_estimate=100,
)
overall, checks = result.passes_acceptance_criteria()
assert overall is False
assert checks["AC-PERF-3: p95 latency < 400ms"] is False
def test_passes_acceptance_criteria_accuracy_fail():
"""passes_acceptance_criteria fails when less than 80% within 50m."""
result = BenchmarkResult(
errors_m=[60.0] * 100, # all 60m → 0% within 50m
latencies_ms=[5.0] * 100,
frames_total=100,
frames_with_good_estimate=100,
)
overall, checks = result.passes_acceptance_criteria()
assert overall is False
assert checks["AC-PERF-1: 80% within 50m"] is False
+64 -2
View File
@@ -35,7 +35,69 @@ def test_retrieve_candidate_tiles(gpr):
def test_retrieve_candidate_tiles_for_chunk(gpr):
imgs = [np.zeros((200, 200, 3), dtype=np.uint8) for _ in range(5)]
candidates = gpr.retrieve_candidate_tiles_for_chunk(imgs, top_k=3)
assert len(candidates) == 3
# Ensure they are sorted
# Ensure they are sorted descending (GPR-03)
assert candidates[0].similarity_score >= candidates[1].similarity_score
# ---------------------------------------------------------------
# GPR-01: Real Faiss index with file path
# ---------------------------------------------------------------
def test_load_index_missing_file_falls_back(tmp_path):
"""GPR-01: non-existent index path → numpy fallback, still usable."""
from gps_denied.core.models import ModelManager
from gps_denied.core.gpr import GlobalPlaceRecognition
g = GlobalPlaceRecognition(ModelManager())
ok = g.load_index("f1", str(tmp_path / "nonexistent.index"))
assert ok is True
assert g._is_loaded is True
# Should still answer queries
img = np.zeros((200, 200, 3), dtype=np.uint8)
cands = g.retrieve_candidate_tiles(img, top_k=3)
assert len(cands) == 3
def test_load_index_not_loaded_returns_empty():
"""query_database before load_index → empty list (no crash)."""
from gps_denied.core.models import ModelManager
from gps_denied.core.gpr import GlobalPlaceRecognition
g = GlobalPlaceRecognition(ModelManager())
desc = np.random.rand(4096).astype(np.float32)
matches = g.query_database(desc, top_k=5)
assert matches == []
# ---------------------------------------------------------------
# GPR-03: Ranking is deterministic (sorted by similarity)
# ---------------------------------------------------------------
def test_rank_candidates_sorted(gpr):
"""rank_candidates must return descending similarity order."""
from gps_denied.schemas.gpr import TileCandidate
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.satellite import TileBounds
dummy_bounds = TileBounds(
nw=GPSPoint(lat=49.1, lon=32.0), ne=GPSPoint(lat=49.1, lon=32.1),
sw=GPSPoint(lat=49.0, lon=32.0), se=GPSPoint(lat=49.0, lon=32.1),
center=GPSPoint(lat=49.05, lon=32.05), gsd=0.6,
)
cands = [
TileCandidate(tile_id="a", gps_center=GPSPoint(lat=49, lon=32), bounds=dummy_bounds, similarity_score=0.3, rank=3),
TileCandidate(tile_id="b", gps_center=GPSPoint(lat=49, lon=32), bounds=dummy_bounds, similarity_score=0.9, rank=1),
TileCandidate(tile_id="c", gps_center=GPSPoint(lat=49, lon=32), bounds=dummy_bounds, similarity_score=0.6, rank=2),
]
ranked = gpr.rank_candidates(cands)
scores = [c.similarity_score for c in ranked]
assert scores == sorted(scores, reverse=True)
def test_descriptor_is_l2_normalised(gpr):
"""DINOv2 descriptor returned by compute_location_descriptor is unit-norm."""
img = np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
desc = gpr.compute_location_descriptor(img)
assert np.isclose(np.linalg.norm(desc), 1.0, atol=1e-5)
+1 -1
View File
@@ -4,7 +4,7 @@ import numpy as np
import pytest
from gps_denied.core.graph import FactorGraphOptimizer
from gps_denied.schemas.flight import GPSPoint
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.graph import FactorGraphConfig
from gps_denied.schemas.vo import RelativePose
from gps_denied.schemas.metric import Sim3Transform
+288
View File
@@ -0,0 +1,288 @@
"""Tests for MAVLink I/O Bridge (Phase 4).
MAV-01: GPS_INPUT sent at configured rate.
MAV-02: ESKF state correctly mapped to GPS_INPUT fields.
MAV-03: IMU receive callback invoked.
MAV-04: 3 consecutive failures trigger re-localisation request.
MAV-05: Telemetry sent at 1 Hz.
"""
import asyncio
import math
import time
import numpy as np
import pytest
from gps_denied.core.mavlink import (
MAVLinkBridge,
MockMAVConnection,
_confidence_to_fix_type,
_eskf_to_gps_input,
_unix_to_gps_time,
)
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import ConfidenceTier, ESKFState
from gps_denied.schemas.mavlink import GPSInputMessage, RelocalizationRequest
# ---------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------
def _make_state(
pos=(0.0, 0.0, 0.0),
vel=(0.0, 0.0, 0.0),
confidence=ConfidenceTier.HIGH,
cov_scale=1.0,
) -> ESKFState:
cov = np.eye(15) * cov_scale
return ESKFState(
position=np.array(pos),
velocity=np.array(vel),
quaternion=np.array([1.0, 0.0, 0.0, 0.0]),
accel_bias=np.zeros(3),
gyro_bias=np.zeros(3),
covariance=cov,
timestamp=time.time(),
confidence=confidence,
)
ORIGIN = GPSPoint(lat=49.0, lon=32.0)
# ---------------------------------------------------------------
# GPS time helpers
# ---------------------------------------------------------------
def test_unix_to_gps_time_epoch():
"""GPS epoch (Unix=315964800) should be week=0, ms=0."""
week, ms = _unix_to_gps_time(315_964_800.0)
assert week == 0
assert ms == 0
def test_unix_to_gps_time_recent():
"""Recent timestamp must produce a valid week and ms-of-week."""
week, ms = _unix_to_gps_time(time.time())
assert week > 2000 # GPS week > 2000 in 2024+
assert 0 <= ms < 7 * 86400 * 1000
# ---------------------------------------------------------------
# MAV-02: ESKF → GPS_INPUT field mapping
# ---------------------------------------------------------------
def test_confidence_to_fix_type():
"""MAV-02: confidence tier → fix_type mapping."""
assert _confidence_to_fix_type(ConfidenceTier.HIGH) == 3
assert _confidence_to_fix_type(ConfidenceTier.MEDIUM) == 2
assert _confidence_to_fix_type(ConfidenceTier.LOW) == 0
assert _confidence_to_fix_type(ConfidenceTier.FAILED) == 0
def test_eskf_to_gps_input_position():
"""MAV-02: ENU position → degE7 lat/lon."""
# 1° lat ≈ 111319.5 m; move 111319.5 m North → lat + 1°
state = _make_state(pos=(0.0, 111_319.5, 0.0))
msg = _eskf_to_gps_input(state, ORIGIN)
expected_lat = int((ORIGIN.lat + 1.0) * 1e7)
assert abs(msg.lat - expected_lat) <= 10 # within 1 µ-degree tolerance
def test_eskf_to_gps_input_lon():
"""MAV-02: East displacement → longitude shift."""
cos_lat = math.cos(math.radians(ORIGIN.lat))
east_1deg = 111_319.5 * cos_lat
state = _make_state(pos=(east_1deg, 0.0, 0.0))
msg = _eskf_to_gps_input(state, ORIGIN)
expected_lon = int((ORIGIN.lon + 1.0) * 1e7)
assert abs(msg.lon - expected_lon) <= 10
def test_eskf_to_gps_input_velocity_ned():
"""MAV-02: ENU velocity → NED (vn=North, ve=East, vd=-Up)."""
state = _make_state(vel=(3.0, 4.0, 1.0)) # ENU: E=3, N=4, U=1
msg = _eskf_to_gps_input(state, ORIGIN)
assert math.isclose(msg.vn, 4.0, abs_tol=1e-3) # North = ENU[1]
assert math.isclose(msg.ve, 3.0, abs_tol=1e-3) # East = ENU[0]
assert math.isclose(msg.vd, -1.0, abs_tol=1e-3) # Down = -Up
def test_eskf_to_gps_input_accuracy_from_covariance():
"""MAV-02: accuracy fields derived from covariance diagonal."""
cov = np.eye(15)
cov[0, 0] = 100.0 # East variance → σ_E = 10 m
cov[1, 1] = 100.0 # North variance → σ_N = 10 m
state = ESKFState(
position=np.zeros(3), velocity=np.zeros(3),
quaternion=np.array([1.0, 0, 0, 0]),
accel_bias=np.zeros(3), gyro_bias=np.zeros(3),
covariance=cov, timestamp=time.time(),
confidence=ConfidenceTier.HIGH,
)
msg = _eskf_to_gps_input(state, ORIGIN)
assert math.isclose(msg.horiz_accuracy, 10.0, abs_tol=0.01)
def test_eskf_to_gps_input_returns_message():
"""_eskf_to_gps_input always returns a GPSInputMessage."""
msg = _eskf_to_gps_input(_make_state(), ORIGIN)
assert isinstance(msg, GPSInputMessage)
assert msg.fix_type == 3 # HIGH → 3D fix
# ---------------------------------------------------------------
# MAVLinkBridge — MockMAVConnection path
# ---------------------------------------------------------------
@pytest.fixture
def bridge():
b = MAVLinkBridge(connection_string="mock://", output_hz=10.0, telemetry_hz=1.0)
b._conn = MockMAVConnection()
b._origin = ORIGIN
return b
def test_bridge_build_gps_input_no_state(bridge):
"""build_gps_input returns None before any state is pushed."""
assert bridge.build_gps_input() is None
def test_bridge_build_gps_input_with_state(bridge):
"""build_gps_input returns a message once state is set."""
bridge.update_state(_make_state(), altitude_m=600.0)
msg = bridge.build_gps_input()
assert msg is not None
assert msg.fix_type == 3
# ---------------------------------------------------------------
# MAV-01: GPS output loop sends at configured rate
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_gps_output_sends_messages(bridge):
"""MAV-01: After N iterations the mock connection has GPS_INPUT records."""
bridge.update_state(_make_state(), altitude_m=500.0)
bridge._running = True
# Run one iteration manually
await bridge._gps_output_loop.__wrapped__(bridge) if hasattr(
bridge._gps_output_loop, "__wrapped__"
) else None
# Directly call _send_gps_input
msg = bridge.build_gps_input()
bridge._send_gps_input(msg)
sent = [s for s in bridge._conn._sent if s["type"] == "GPS_INPUT"]
assert len(sent) >= 1
# ---------------------------------------------------------------
# MAV-04: Consecutive failure detection
# ---------------------------------------------------------------
def test_consecutive_failure_counter_resets_on_good_state(bridge):
"""update_state with HIGH confidence resets failure counter."""
bridge._consecutive_failures = 5
bridge.update_state(_make_state(confidence=ConfidenceTier.HIGH))
assert bridge._consecutive_failures == 0
def test_consecutive_failure_counter_increments_on_low(bridge):
"""update_state with LOW confidence increments failure counter."""
bridge._consecutive_failures = 0
bridge.update_state(_make_state(confidence=ConfidenceTier.LOW))
assert bridge._consecutive_failures == 1
bridge.update_state(_make_state(confidence=ConfidenceTier.LOW))
assert bridge._consecutive_failures == 2
def test_reloc_request_triggered_after_3_failures(bridge):
"""MAV-04: after 3 failures the re-localisation callback is called."""
received: list[RelocalizationRequest] = []
bridge.set_reloc_callback(received.append)
bridge._origin = ORIGIN
bridge._last_state = _make_state()
bridge._consecutive_failures = 3
bridge._send_reloc_request()
assert len(received) == 1
assert received[0].consecutive_failures == 3
# Must include last known position
assert received[0].last_lat is not None
assert received[0].last_lon is not None
def test_reloc_request_sent_to_mock_conn(bridge):
"""MAV-04: NAMED_VALUE_FLOAT messages written to mock connection."""
bridge._last_state = _make_state()
bridge._consecutive_failures = 3
bridge._send_reloc_request()
reloc = [s for s in bridge._conn._sent if s["type"] == "NAMED_VALUE_FLOAT"]
assert len(reloc) >= 1
# ---------------------------------------------------------------
# MAV-05: Telemetry
# ---------------------------------------------------------------
def test_telemetry_sends_named_value_float(bridge):
"""MAV-05: _send_telemetry writes NAMED_VALUE_FLOAT records."""
bridge._last_state = _make_state(confidence=ConfidenceTier.MEDIUM)
bridge._send_telemetry()
telem = [s for s in bridge._conn._sent if s["type"] == "NAMED_VALUE_FLOAT"]
names = {s["kwargs"]["name"] for s in telem}
assert "CONF_SCORE" in names
assert "DRIFT_M" in names
def test_telemetry_confidence_score_values(bridge):
"""MAV-05: confidence score matches tier mapping."""
for tier, expected in [
(ConfidenceTier.HIGH, 1.0),
(ConfidenceTier.MEDIUM, 0.6),
(ConfidenceTier.LOW, 0.2),
(ConfidenceTier.FAILED, 0.0),
]:
bridge._conn._sent.clear()
bridge._last_state = _make_state(confidence=tier)
bridge._send_telemetry()
conf = next(s for s in bridge._conn._sent if s["kwargs"]["name"] == "CONF_SCORE")
assert math.isclose(conf["kwargs"]["value"], expected, abs_tol=1e-6)
# ---------------------------------------------------------------
# MAV-03: IMU callback
# ---------------------------------------------------------------
def test_imu_callback_set_and_called(bridge):
"""MAV-03: IMU callback registered and invokable."""
received = []
cb = received.append
bridge.set_imu_callback(cb)
assert bridge._on_imu is cb
# Simulate calling it
from gps_denied.schemas.eskf import IMUMeasurement
imu = IMUMeasurement(accel=np.zeros(3), gyro=np.zeros(3), timestamp=time.time())
bridge._on_imu(imu)
assert len(received) == 1
@pytest.mark.asyncio
async def test_start_stop(tmp_path):
"""Bridge start/stop completes without errors (mock mode)."""
b = MAVLinkBridge(connection_string="mock://", output_hz=50.0)
await b.start(ORIGIN)
await asyncio.sleep(0.05)
await b.stop()
assert not b._running
+53 -6
View File
@@ -5,7 +5,7 @@ import pytest
from gps_denied.core.metric import MetricRefinement
from gps_denied.core.models import ModelManager
from gps_denied.schemas.flight import GPSPoint
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult
from gps_denied.schemas.satellite import TileBounds
@@ -39,22 +39,69 @@ def test_extract_gps_from_alignment(metric, bounds):
assert np.isclose(gps.lon, 32.5)
def test_align_to_satellite(metric, bounds, monkeypatch):
# Monkeypatch random to ensure matched=True and high inliers
def mock_infer(*args, **kwargs):
H = np.eye(3, dtype=np.float64)
return {"homography": H, "inlier_count": 80, "confidence": 0.8}
return {"homography": H, "inlier_count": 80, "total_correspondences": 100, "confidence": 0.8, "reprojection_error": 1.0}
engine = metric.model_manager.get_inference_engine("LiteSAM")
monkeypatch.setattr(engine, "infer", mock_infer)
uav = np.zeros((256, 256, 3))
sat = np.zeros((256, 256, 3))
res = metric.align_to_satellite(uav, sat, bounds)
assert res is not None
assert isinstance(res, AlignmentResult)
assert res.matched is True
assert res.inlier_count == 80
# SAT-04: confidence = inlier_ratio
assert np.isclose(res.confidence, 80 / 100)
# ---------------------------------------------------------------
# SAT-03: GSD normalization
# ---------------------------------------------------------------
def test_normalize_gsd_downsamples(metric):
"""UAV frame at 0.16 m/px downsampled to satellite 0.6 m/px."""
uav = np.zeros((480, 640, 3), dtype=np.uint8)
out = metric.normalize_gsd(uav, uav_gsd_mpp=0.16, sat_gsd_mpp=0.6)
# Should be roughly 640 * (0.16/0.6) ≈ 170 wide
assert out.shape[1] < 640
assert out.shape[0] < 480
def test_normalize_gsd_no_downscale_needed(metric):
"""UAV GSD already coarser than satellite → image unchanged."""
uav = np.zeros((256, 256, 3), dtype=np.uint8)
out = metric.normalize_gsd(uav, uav_gsd_mpp=0.8, sat_gsd_mpp=0.6)
assert out.shape == uav.shape
def test_normalize_gsd_zero_args(metric):
"""Zero GSD args → image returned unchanged (guard against divide-by-zero)."""
uav = np.zeros((100, 100, 3), dtype=np.uint8)
out = metric.normalize_gsd(uav, uav_gsd_mpp=0.0, sat_gsd_mpp=0.6)
assert out.shape == uav.shape
# ---------------------------------------------------------------
# SAT-04: confidence = inlier ratio via align_to_satellite
# ---------------------------------------------------------------
def test_align_confidence_is_inlier_ratio(metric, bounds, monkeypatch):
"""SAT-04: returned confidence must equal inlier_count / total_correspondences."""
def mock_infer(*args, **kwargs):
H = np.eye(3, dtype=np.float64)
return {"homography": H, "inlier_count": 60, "total_correspondences": 150,
"confidence": 0.4, "reprojection_error": 1.0}
engine = metric.model_manager.get_inference_engine("LiteSAM")
monkeypatch.setattr(engine, "infer", mock_infer)
res = metric.align_to_satellite(np.zeros((256, 256, 3)), np.zeros((256, 256, 3)), bounds)
if res is not None:
assert np.isclose(res.confidence, 60 / 150)
def test_align_chunk_to_satellite(metric, bounds, monkeypatch):
def mock_infer(*args, **kwargs):
+44 -9
View File
@@ -17,19 +17,30 @@ def pipeline(tmp_path):
def test_batch_validation(pipeline):
# Too few images
b1 = ImageBatch(images=[b"1", b"2"], filenames=["1.jpg", "2.jpg"], start_sequence=1, end_sequence=2, batch_number=1)
val = pipeline.validate_batch(b1)
assert not val.valid
assert "Batch is empty" in val.errors
# VO-05: minimum batch size is now 1 (not 10)
# Zero images is still invalid
b0 = ImageBatch(images=[], filenames=[], start_sequence=1, end_sequence=0, batch_number=1)
val0 = pipeline.validate_batch(b0)
assert not val0.valid
assert "Batch is empty" in val0.errors
# Let's mock a valid batch of 10 images
fake_imgs = [b"fake"] * 10
fake_names = [f"AD{i:06d}.jpg" for i in range(1, 11)]
b2 = ImageBatch(images=fake_imgs, filenames=fake_names, start_sequence=1, end_sequence=10, batch_number=1)
# Single image is now valid
b1 = ImageBatch(images=[b"fake"], filenames=["AD000001.jpg"], start_sequence=1, end_sequence=1, batch_number=1)
val1 = pipeline.validate_batch(b1)
assert val1.valid, f"Single-image batch should be valid; errors: {val1.errors}"
# 2-image batch — also valid under new rule
b2 = ImageBatch(images=[b"1", b"2"], filenames=["AD000001.jpg", "AD000002.jpg"], start_sequence=1, end_sequence=2, batch_number=1)
val2 = pipeline.validate_batch(b2)
assert val2.valid
# Large valid batch
fake_imgs = [b"fake"] * 10
fake_names = [f"AD{i:06d}.jpg" for i in range(1, 11)]
b10 = ImageBatch(images=fake_imgs, filenames=fake_names, start_sequence=1, end_sequence=10, batch_number=1)
val10 = pipeline.validate_batch(b10)
assert val10.valid
@pytest.mark.asyncio
async def test_queue_and_process(pipeline):
@@ -69,6 +80,30 @@ async def test_queue_and_process(pipeline):
assert next_img2.sequence == 2
@pytest.mark.asyncio
async def test_exact_sequence_lookup_no_collision(pipeline, tmp_path):
"""VO-05: sequence 1 must NOT match AD000011.jpg or AD000010.jpg."""
flight_id = "exact_test"
fake_img_np = np.zeros((10, 10, 3), dtype=np.uint8)
_, encoded = cv2.imencode(".jpg", fake_img_np)
fake_bytes = encoded.tobytes()
# Sequences 1 and 11 stored in the same flight
names = ["AD000001.jpg", "AD000011.jpg"]
imgs = [fake_bytes, fake_bytes]
b = ImageBatch(images=imgs, filenames=names, start_sequence=1, end_sequence=11, batch_number=1)
pipeline.queue_batch(flight_id, b)
await pipeline.process_next_batch(flight_id)
img1 = pipeline.get_image_by_sequence(flight_id, 1)
img11 = pipeline.get_image_by_sequence(flight_id, 11)
assert img1 is not None
assert img1.filename == "AD000001.jpg", f"Expected AD000001.jpg, got {img1.filename}"
assert img11 is not None
assert img11.filename == "AD000011.jpg", f"Expected AD000011.jpg, got {img11.filename}"
def test_queue_full(pipeline):
flight_id = "test_full"
fake_imgs = [b"fake"] * 10
+337
View File
@@ -0,0 +1,337 @@
"""Phase 5 pipeline wiring tests.
PIPE-01: VO result feeds into ESKF update_vo.
PIPE-02: SatelliteDataManager + CoordinateTransformer wired into process_frame.
PIPE-04: Failure counter resets on recovery; MAVLink reloc triggered at threshold.
PIPE-05: ImageRotationManager initialised on first frame.
PIPE-06: convert_object_to_gps uses CoordinateTransformer pixel_to_gps.
PIPE-07: ESKF state pushed to MAVLinkBridge on every frame.
PIPE-08: ImageRotationManager accepts optional model_manager arg.
"""
import time
import numpy as np
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
from gps_denied.core.processor import FlightProcessor, TrackingState
from gps_denied.core.eskf import ESKF
from gps_denied.core.rotation import ImageRotationManager
from gps_denied.core.coordinates import CoordinateTransformer
from gps_denied.schemas import GPSPoint, CameraParameters
from gps_denied.schemas.eskf import ConfidenceTier, ESKFConfig
from gps_denied.schemas.vo import RelativePose
# ---------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------
ORIGIN = GPSPoint(lat=49.0, lon=32.0)
def _make_processor(with_coord=True, with_mavlink=True, with_satellite=False):
repo = MagicMock()
streamer = MagicMock()
streamer.push_event = AsyncMock()
proc = FlightProcessor(repo, streamer)
coord = CoordinateTransformer() if with_coord else None
if coord:
coord.set_enu_origin("fl1", ORIGIN)
coord.set_enu_origin("fl2", ORIGIN)
coord.set_enu_origin("fl_cycle", ORIGIN)
mavlink = MagicMock() if with_mavlink else None
proc.attach_components(coord=coord, mavlink=mavlink)
return proc, coord, mavlink
def _init_eskf(proc, flight_id, origin=ORIGIN, altitude=100.0):
"""Seed ESKF for a flight so process_frame can use it."""
proc._init_eskf_for_flight(flight_id, origin, altitude)
proc._altitudes[flight_id] = altitude
# ---------------------------------------------------------------
# PIPE-08: ImageRotationManager accepts optional model_manager
# ---------------------------------------------------------------
def test_rotation_manager_no_args():
"""PIPE-08: ImageRotationManager() with no args still works."""
rm = ImageRotationManager()
assert rm._model_manager is None
def test_rotation_manager_with_model_manager():
"""PIPE-08: ImageRotationManager accepts model_manager kwarg."""
mm = MagicMock()
rm = ImageRotationManager(model_manager=mm)
assert rm._model_manager is mm
# ---------------------------------------------------------------
# PIPE-05: Rotation manager initialised on first frame
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_first_frame_seeds_rotation_history():
"""PIPE-05: First frame call to process_frame seeds HeadingHistory."""
proc, _, _ = _make_processor()
rm = ImageRotationManager()
proc._rotation = rm
flight = "fl_rot"
proc._prev_images[flight] = np.zeros((100, 100, 3), dtype=np.uint8)
img = np.zeros((100, 100, 3), dtype=np.uint8)
await proc.process_frame(flight, 0, img)
# HeadingHistory entry should exist after first frame
assert flight in rm._history
# ---------------------------------------------------------------
# PIPE-01: ESKF VO update
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_eskf_vo_update_called_on_good_tracking():
"""PIPE-01: When VO tracking_good=True, eskf.update_vo is called."""
proc, _, _ = _make_processor()
flight = "fl_vo"
_init_eskf(proc, flight)
img0 = np.zeros((100, 100, 3), dtype=np.uint8)
img1 = np.ones((100, 100, 3), dtype=np.uint8)
# Seed previous frame
proc._prev_images[flight] = img0
# Mock VO to return good tracking
good_pose = RelativePose(
translation=np.array([1.0, 0.0, 0.0]),
rotation=np.eye(3),
covariance=np.eye(6),
confidence=0.9,
inlier_count=50,
total_matches=60,
tracking_good=True,
)
mock_vo = MagicMock()
mock_vo.compute_relative_pose.return_value = good_pose
proc._vo = mock_vo
eskf_before_pos = proc._eskf[flight]._nominal_state["position"].copy()
await proc.process_frame(flight, 1, img1)
eskf_after_pos = proc._eskf[flight]._nominal_state["position"].copy()
# ESKF position should have changed due to VO update
assert mock_vo.compute_relative_pose.called
# After update_vo the position should differ from initial zeros
# (VO innovation shifts position)
assert proc._eskf[flight].initialized
@pytest.mark.asyncio
async def test_failure_counter_increments_on_bad_vo():
"""PIPE-04: Consecutive failure counter increments when VO fails."""
proc, _, _ = _make_processor()
flight = "fl_fail"
_init_eskf(proc, flight)
img0 = np.zeros((100, 100, 3), dtype=np.uint8)
img1 = np.zeros((100, 100, 3), dtype=np.uint8)
proc._prev_images[flight] = img0
bad_pose = RelativePose(
translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6),
confidence=0.0, inlier_count=0, total_matches=0, tracking_good=False,
)
mock_vo = MagicMock()
mock_vo.compute_relative_pose.return_value = bad_pose
proc._vo = mock_vo
await proc.process_frame(flight, 1, img1)
assert proc._failure_counts.get(flight, 0) == 1
@pytest.mark.asyncio
async def test_failure_counter_resets_on_good_vo():
"""PIPE-04: Failure counter resets when VO succeeds."""
proc, _, _ = _make_processor()
flight = "fl_reset"
_init_eskf(proc, flight)
proc._failure_counts[flight] = 5
img0 = np.zeros((100, 100, 3), dtype=np.uint8)
img1 = np.ones((100, 100, 3), dtype=np.uint8)
proc._prev_images[flight] = img0
good_pose = RelativePose(
translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6),
confidence=0.9, inlier_count=50, total_matches=60, tracking_good=True,
)
mock_vo = MagicMock()
mock_vo.compute_relative_pose.return_value = good_pose
proc._vo = mock_vo
await proc.process_frame(flight, 1, img1)
assert proc._failure_counts[flight] == 0
@pytest.mark.asyncio
async def test_failure_counter_resets_on_recovery():
"""PIPE-04: Failure counter resets when recovery succeeds."""
proc, _, _ = _make_processor()
flight = "fl_rec"
_init_eskf(proc, flight)
proc._failure_counts[flight] = 3
# Seed previous frame so VO is attempted
img0 = np.zeros((100, 100, 3), dtype=np.uint8)
img1 = np.ones((100, 100, 3), dtype=np.uint8)
proc._prev_images[flight] = img0
proc._flight_states[flight] = TrackingState.RECOVERY
# Mock recovery to succeed
mock_recovery = MagicMock()
mock_recovery.process_chunk_recovery.return_value = True
mock_chunk_mgr = MagicMock()
mock_chunk_mgr.get_active_chunk.return_value = MagicMock(chunk_id="c1")
proc._recovery = mock_recovery
proc._chunk_mgr = mock_chunk_mgr
result = await proc.process_frame(flight, 2, img1)
assert result.alignment_success is True
assert proc._failure_counts[flight] == 0
# ---------------------------------------------------------------
# PIPE-07: ESKF state pushed to MAVLink
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_mavlink_state_pushed_per_frame():
"""PIPE-07: MAVLinkBridge.update_state called on every frame with ESKF."""
proc, _, mavlink = _make_processor()
flight = "fl_mav"
_init_eskf(proc, flight)
img = np.zeros((100, 100, 3), dtype=np.uint8)
await proc.process_frame(flight, 0, img)
mavlink.update_state.assert_called_once()
args, kwargs = mavlink.update_state.call_args
# First positional arg is ESKFState
from gps_denied.schemas.eskf import ESKFState
assert isinstance(args[0], ESKFState)
assert kwargs.get("altitude_m") == 100.0
@pytest.mark.asyncio
async def test_mavlink_not_called_without_eskf():
"""PIPE-07: No MAVLink call if ESKF not initialized for flight."""
proc, _, mavlink = _make_processor()
# Do NOT call _init_eskf_for_flight → ESKF absent
flight = "fl_nomav"
img = np.zeros((100, 100, 3), dtype=np.uint8)
await proc.process_frame(flight, 0, img)
mavlink.update_state.assert_not_called()
# ---------------------------------------------------------------
# PIPE-06: convert_object_to_gps uses CoordinateTransformer
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_convert_object_to_gps_uses_coord_transformer():
"""PIPE-06: pixel_to_gps called via CoordinateTransformer."""
proc, coord, _ = _make_processor()
flight = "fl_obj"
coord.set_enu_origin(flight, ORIGIN)
_init_eskf(proc, flight)
proc._flight_cameras[flight] = CameraParameters(
focal_length=4.5, sensor_width=6.17, sensor_height=4.55,
resolution_width=640, resolution_height=480,
)
response = await proc.convert_object_to_gps(flight, 0, (320.0, 240.0))
# Should return a valid GPS point (not the old hardcoded 48.0, 37.0)
assert response.gps is not None
# The result should be near the origin (ENU origin + ray projection)
assert abs(response.gps.lat - ORIGIN.lat) < 1.0
assert abs(response.gps.lon - ORIGIN.lon) < 1.0
@pytest.mark.asyncio
async def test_convert_object_to_gps_fallback_without_coord():
"""PIPE-06: Falls back gracefully when no CoordinateTransformer is set."""
proc, _, _ = _make_processor(with_coord=False)
flight = "fl_nocoord"
_init_eskf(proc, flight)
response = await proc.convert_object_to_gps(flight, 0, (100.0, 100.0))
# Must return something (not crash), even without coord transformer
assert response.gps is not None
# ---------------------------------------------------------------
# ESKF initialization via create_flight
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_flight_initialises_eskf():
"""create_flight should seed ESKF for the new flight."""
from gps_denied.schemas.flight import FlightCreateRequest
from gps_denied.schemas import Geofences
proc, _, _ = _make_processor()
from datetime import datetime, timezone
flight_row = MagicMock()
flight_row.id = "fl_new"
flight_row.created_at = datetime.now(timezone.utc)
proc.repository.insert_flight = AsyncMock(return_value=flight_row)
proc.repository.insert_geofence = AsyncMock()
proc.repository.insert_waypoint = AsyncMock()
req = FlightCreateRequest(
name="test",
description="",
start_gps=ORIGIN,
altitude=150.0,
geofences=Geofences(polygons=[]),
rough_waypoints=[],
camera_params=CameraParameters(
focal_length=4.5, sensor_width=6.17, sensor_height=4.55,
resolution_width=640, resolution_height=480,
),
)
await proc.create_flight(req)
assert "fl_new" in proc._eskf
assert proc._eskf["fl_new"].initialized
assert proc._altitudes["fl_new"] == 150.0
# ---------------------------------------------------------------
# _cleanup_flight clears ESKF state
# ---------------------------------------------------------------
def test_cleanup_flight_removes_eskf():
"""_cleanup_flight should remove ESKF and related dicts."""
proc, _, _ = _make_processor()
flight = "fl_clean"
_init_eskf(proc, flight)
proc._failure_counts[flight] = 2
proc._cleanup_flight(flight)
assert flight not in proc._eskf
assert flight not in proc._altitudes
assert flight not in proc._failure_counts
+1 -1
View File
@@ -36,7 +36,7 @@ def test_process_chunk_recovery_success(recovery, monkeypatch):
# Mock LitSAM to guarantee match
def mock_align(*args, **kwargs):
from gps_denied.schemas.metric import ChunkAlignmentResult, Sim3Transform
from gps_denied.schemas.flight import GPSPoint
from gps_denied.schemas import GPSPoint
return ChunkAlignmentResult(
matched=True, chunk_id="x", chunk_center_gps=GPSPoint(lat=49, lon=30),
rotation_angle=0, confidence=0.9, inlier_count=50,
+117 -49
View File
@@ -1,6 +1,4 @@
"""Tests for SatelliteDataManager (F04) and mercator utils (H06)."""
import asyncio
"""Tests for SatelliteDataManager (F04) — SAT-01/02 and mercator utils (H06)."""
import numpy as np
import pytest
@@ -10,12 +8,12 @@ from gps_denied.schemas import GPSPoint
from gps_denied.utils import mercator
# ---------------------------------------------------------------
# Mercator utils
# ---------------------------------------------------------------
def test_latlon_to_tile():
# Kyiv coordinates
lat = 50.4501
lon = 30.5234
zoom = 15
lat, lon, zoom = 50.4501, 30.5234, 15
coords = mercator.latlon_to_tile(lat, lon, zoom)
assert coords.zoom == 15
assert coords.x > 0
@@ -23,9 +21,7 @@ def test_latlon_to_tile():
def test_tile_to_latlon():
x, y, zoom = 19131, 10927, 15
gps = mercator.tile_to_latlon(x, y, zoom)
gps = mercator.tile_to_latlon(19131, 10927, 15)
assert 50.0 < gps.lat < 52.0
assert 30.0 < gps.lon < 31.0
@@ -33,60 +29,132 @@ def test_tile_to_latlon():
def test_tile_bounds():
coords = mercator.TileCoords(x=19131, y=10927, zoom=15)
bounds = mercator.compute_tile_bounds(coords)
# Northwest should be "higher" lat and "lower" lon than Southeast
assert bounds.nw.lat > bounds.se.lat
assert bounds.nw.lon < bounds.se.lon
assert bounds.gsd > 0
# ---------------------------------------------------------------
# SAT-01: Local tile storage (no HTTP)
# ---------------------------------------------------------------
@pytest.fixture
def satellite_manager(tmp_path):
# Use tmp_path for cache so we don't pollute workspace
sm = SatelliteDataManager(cache_dir=str(tmp_path / "cache"), max_size_gb=0.1)
yield sm
sm.cache.close()
asyncio.run(sm.http_client.aclose())
return SatelliteDataManager(tile_dir=str(tmp_path / "tiles"))
@pytest.mark.asyncio
async def test_satellite_fetch_and_cache(satellite_manager):
lat = 48.0
lon = 37.0
zoom = 12
flight_id = "test_flight"
# We won't test the actual HTTP Google API in CI to avoid blocks/bans,
# but we can test the cache mechanism directly.
coords = satellite_manager.compute_tile_coords(lat, lon, zoom)
# Create a fake image (blue square 256x256)
fake_img = np.zeros((256, 256, 3), dtype=np.uint8)
fake_img[:] = [255, 0, 0] # BGR
# Save to cache
success = satellite_manager.cache_tile(flight_id, coords, fake_img)
assert success is True
# Read from cache
cached = satellite_manager.get_cached_tile(flight_id, coords)
def test_load_local_tile_missing(satellite_manager):
"""Missing tile returns None — no crash."""
coords = mercator.TileCoords(x=0, y=0, zoom=12)
result = satellite_manager.load_local_tile(coords)
assert result is None
def test_save_and_load_local_tile(satellite_manager):
"""SAT-01: saved tile can be read back from the local directory."""
coords = mercator.TileCoords(x=19131, y=10927, zoom=15)
img = np.zeros((256, 256, 3), dtype=np.uint8)
img[:] = [0, 128, 255]
ok = satellite_manager.save_local_tile(coords, img)
assert ok is True
loaded = satellite_manager.load_local_tile(coords)
assert loaded is not None
assert loaded.shape == (256, 256, 3)
def test_mem_cache_hit(satellite_manager):
"""Tile loaded once should be served from memory on second request."""
coords = mercator.TileCoords(x=1, y=1, zoom=10)
img = np.ones((256, 256, 3), dtype=np.uint8) * 42
satellite_manager.save_local_tile(coords, img)
r1 = satellite_manager.load_local_tile(coords)
r2 = satellite_manager.load_local_tile(coords)
assert r1 is r2 # same object = came from mem cache
# ---------------------------------------------------------------
# SAT-02: ESKF ±3σ tile selection
# ---------------------------------------------------------------
def test_select_tiles_small_sigma(satellite_manager):
"""Very tight sigma → single tile covering the position."""
gps = GPSPoint(lat=50.45, lon=30.52)
tiles = satellite_manager.select_tiles_for_eskf_position(gps, sigma_h_m=1.0, zoom=18)
# Should produce at least the center tile
assert len(tiles) >= 1
for t in tiles:
assert t.zoom == 18
def test_select_tiles_large_sigma(satellite_manager):
"""Larger sigma → more tiles returned."""
gps = GPSPoint(lat=50.45, lon=30.52)
small = satellite_manager.select_tiles_for_eskf_position(gps, sigma_h_m=10.0, zoom=18)
large = satellite_manager.select_tiles_for_eskf_position(gps, sigma_h_m=200.0, zoom=18)
assert len(large) >= len(small)
def test_select_tiles_bounding_box(satellite_manager):
"""Selected tiles must span a bounding box that covers ±3σ."""
gps = GPSPoint(lat=49.0, lon=32.0)
sigma = 50.0 # 50 m → 3σ = 150 m
zoom = 18
tiles = satellite_manager.select_tiles_for_eskf_position(gps, sigma_h_m=sigma, zoom=zoom)
assert len(tiles) >= 1
# All returned tiles must be at the requested zoom
assert all(t.zoom == zoom for t in tiles)
# ---------------------------------------------------------------
# SAT-01: Mosaic assembly
# ---------------------------------------------------------------
def test_assemble_mosaic_single(satellite_manager):
"""Single tile → mosaic equals that tile (resized)."""
coords = mercator.TileCoords(x=10, y=10, zoom=15)
img = np.zeros((256, 256, 3), dtype=np.uint8)
mosaic, bounds = satellite_manager.assemble_mosaic([(coords, img)], target_size=256)
assert mosaic.shape == (256, 256, 3)
assert bounds.center is not None
def test_assemble_mosaic_2x2(satellite_manager):
"""2×2 tile grid assembles into a single mosaic."""
base = mercator.TileCoords(x=10, y=10, zoom=15)
tiles = [
(mercator.TileCoords(x=10, y=10, zoom=15), np.zeros((256, 256, 3), dtype=np.uint8)),
(mercator.TileCoords(x=11, y=10, zoom=15), np.zeros((256, 256, 3), dtype=np.uint8)),
(mercator.TileCoords(x=10, y=11, zoom=15), np.zeros((256, 256, 3), dtype=np.uint8)),
(mercator.TileCoords(x=11, y=11, zoom=15), np.zeros((256, 256, 3), dtype=np.uint8)),
]
mosaic, bounds = satellite_manager.assemble_mosaic(tiles, target_size=512)
assert mosaic.shape == (512, 512, 3)
def test_assemble_mosaic_empty(satellite_manager):
result = satellite_manager.assemble_mosaic([])
assert result is None
# ---------------------------------------------------------------
# Cache helpers (backward compat)
# ---------------------------------------------------------------
def test_cache_tile_compat(satellite_manager):
coords = mercator.TileCoords(x=100, y=100, zoom=12)
img = np.zeros((256, 256, 3), dtype=np.uint8)
assert satellite_manager.cache_tile("f1", coords, img) is True
cached = satellite_manager.get_cached_tile("f1", coords)
assert cached is not None
assert cached.shape == (256, 256, 3)
# Clear cache
satellite_manager.clear_flight_cache(flight_id)
assert satellite_manager.get_cached_tile(flight_id, coords) is None
def test_grid_calculations(satellite_manager):
# Test 3x3 grid (9 tiles)
center = mercator.TileCoords(x=100, y=100, zoom=15)
grid = satellite_manager.get_tile_grid(center, 9)
assert len(grid) == 9
# Ensure center is in grid
assert any(c.x == 100 and c.y == 100 for c in grid)
# Test expansion 9 -> 25
new_tiles = satellite_manager.expand_search_grid(center, 9, 25)
assert len(new_tiles) == 16 # 25 - 9
+328
View File
@@ -0,0 +1,328 @@
"""SITL Integration Tests — GPS_INPUT delivery to ArduPilot SITL.
These tests verify the full MAVLink GPS_INPUT pipeline against a real
ArduPilot SITL flight controller. They are **skipped** unless the
``ARDUPILOT_SITL_HOST`` environment variable is set.
Run via Docker Compose SITL harness:
docker compose -f docker-compose.sitl.yml run integration-tests
Or manually with SITL running locally:
ARDUPILOT_SITL_HOST=localhost ARDUPILOT_SITL_PORT=5762 pytest tests/test_sitl_integration.py -v
Test IDs:
SITL-01: MAVLink connection to ArduPilot SITL succeeds.
SITL-02: GPS_INPUT message accepted by SITL FC (GPS_RAW_INT shows 3D fix).
SITL-03: MAVLinkBridge.start/stop lifecycle with real connection.
SITL-04: IMU RAW_IMU callback fires after connecting to SITL.
SITL-05: 5 consecutive GPS_INPUT messages delivered within 1.1s (≥5 Hz).
SITL-06: Telemetry NAMED_VALUE_FLOAT messages reach SITL at 1 Hz.
SITL-07: After 3 consecutive FAILED-confidence updates, reloc request fires.
"""
from __future__ import annotations
import asyncio
import os
import socket
import time
from typing import Optional
import numpy as np
import pytest
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import ConfidenceTier, ESKFState
# ---------------------------------------------------------------------------
# Skip guard — all tests in this file are skipped unless SITL is available
# ---------------------------------------------------------------------------
SITL_HOST = os.environ.get("ARDUPILOT_SITL_HOST", "")
SITL_PORT = int(os.environ.get("ARDUPILOT_SITL_PORT", "5762"))
_SITL_AVAILABLE = bool(SITL_HOST)
pytestmark = pytest.mark.skipif(
not _SITL_AVAILABLE,
reason="SITL integration tests require ARDUPILOT_SITL_HOST env var",
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_ORIGIN = GPSPoint(lat=49.0, lon=32.0)
_MAVLINK_CONN = f"tcp:{SITL_HOST}:{SITL_PORT}" if SITL_HOST else "mock://"
def _make_eskf_state(
pos=(0.0, 0.0, 0.0),
vel=(0.0, 0.0, 0.0),
confidence: ConfidenceTier = ConfidenceTier.HIGH,
cov_scale: float = 1.0,
) -> ESKFState:
cov = np.eye(15) * cov_scale
return ESKFState(
position=np.array(pos, dtype=float),
velocity=np.array(vel, dtype=float),
quaternion=np.array([1.0, 0.0, 0.0, 0.0]),
accel_bias=np.zeros(3),
gyro_bias=np.zeros(3),
covariance=cov,
timestamp=time.time(),
confidence=confidence,
)
def _wait_for_tcp(host: str, port: int, timeout: float = 30.0) -> bool:
"""Block until TCP port is accepting connections (or timeout)."""
deadline = time.time() + timeout
while time.time() < deadline:
try:
with socket.create_connection((host, port), timeout=2.0):
return True
except OSError:
time.sleep(1.0)
return False
# ---------------------------------------------------------------------------
# SITL-01: Connection
# ---------------------------------------------------------------------------
def test_sitl_tcp_port_reachable():
"""SITL-01: ArduPilot SITL TCP port is reachable before running tests."""
reachable = _wait_for_tcp(SITL_HOST, SITL_PORT, timeout=30.0)
assert reachable, (
f"SITL not reachable at {SITL_HOST}:{SITL_PORT}"
"is docker-compose.sitl.yml running?"
)
def test_pymavlink_connection_to_sitl():
"""SITL-01: pymavlink connects to SITL without error."""
pytest.importorskip("pymavlink", reason="pymavlink not installed")
from pymavlink import mavutil
mav = mavutil.mavlink_connection(_MAVLINK_CONN)
# Wait for heartbeat (up to 15s)
msg = mav.recv_match(type="HEARTBEAT", blocking=True, timeout=15)
mav.close()
assert msg is not None, "No HEARTBEAT received from SITL within 15s"
# ---------------------------------------------------------------------------
# SITL-02: GPS_INPUT accepted by SITL EKF
# ---------------------------------------------------------------------------
def test_gps_input_accepted_by_sitl():
"""SITL-02: Sending GPS_INPUT produces GPS_RAW_INT with fix_type >= 3."""
pytest.importorskip("pymavlink", reason="pymavlink not installed")
from pymavlink import mavutil
mav = mavutil.mavlink_connection(_MAVLINK_CONN)
# Wait for SITL ready
mav.recv_match(type="HEARTBEAT", blocking=True, timeout=15)
# Send 10 GPS_INPUT messages at ~5 Hz
for _ in range(10):
now = time.time()
gps_s = now - 315_964_800
week = int(gps_s // (7 * 86400))
week_ms = int((gps_s % (7 * 86400)) * 1000)
mav.mav.gps_input_send(
int(now * 1_000_000), # time_usec
0, # gps_id
0, # ignore_flags
week_ms, # time_week_ms
week, # time_week
3, # fix_type (3D)
int(_ORIGIN.lat * 1e7), # lat
int(_ORIGIN.lon * 1e7), # lon
600.0, # alt MSL
1.0, # hdop
1.5, # vdop
0.0, # vn
0.0, # ve
0.0, # vd
0.3, # speed_accuracy
5.0, # horiz_accuracy
2.0, # vert_accuracy
10, # satellites_visible
)
time.sleep(0.2)
# Wait for GPS_RAW_INT confirming fix
deadline = time.time() + 10.0
fix_type = 0
while time.time() < deadline:
msg = mav.recv_match(type="GPS_RAW_INT", blocking=True, timeout=2.0)
if msg and msg.fix_type >= 3:
fix_type = msg.fix_type
break
mav.close()
assert fix_type >= 3, (
f"SITL GPS_RAW_INT fix_type={fix_type} after GPS_INPUT — "
"expected 3D fix (≥3)"
)
# ---------------------------------------------------------------------------
# SITL-03: MAVLinkBridge lifecycle
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mavlink_bridge_start_stop_with_sitl():
"""SITL-03: MAVLinkBridge.start/stop with real SITL TCP connection."""
pytest.importorskip("pymavlink", reason="pymavlink not installed")
from gps_denied.core.mavlink import MAVLinkBridge
bridge = MAVLinkBridge(
connection_string=_MAVLINK_CONN,
output_hz=5.0,
telemetry_hz=1.0,
)
bridge.update_state(_make_eskf_state(), altitude_m=600.0)
await bridge.start(_ORIGIN)
# Let it run for one output period
await asyncio.sleep(0.25)
await bridge.stop()
assert not bridge._running
# ---------------------------------------------------------------------------
# SITL-04: IMU receive callback
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_imu_callback_fires_from_sitl():
"""SITL-04: IMU callback is invoked when SITL sends RAW_IMU messages."""
pytest.importorskip("pymavlink", reason="pymavlink not installed")
from gps_denied.core.mavlink import MAVLinkBridge
from gps_denied.schemas.eskf import IMUMeasurement
received: list[IMUMeasurement] = []
bridge = MAVLinkBridge(connection_string=_MAVLINK_CONN, output_hz=5.0)
bridge.set_imu_callback(received.append)
bridge.update_state(_make_eskf_state(), altitude_m=600.0)
await bridge.start(_ORIGIN)
# SITL sends RAW_IMU at ~50-200 Hz; wait 1s
await asyncio.sleep(1.0)
await bridge.stop()
assert len(received) >= 1, (
"No IMUMeasurement received from SITL in 1s — "
"check that SITL is sending RAW_IMU messages"
)
# ---------------------------------------------------------------------------
# SITL-05: GPS_INPUT rate ≥ 5 Hz
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_gps_input_rate_at_least_5hz():
"""SITL-05: MAVLinkBridge delivers GPS_INPUT at ≥5 Hz over 1 second."""
pytest.importorskip("pymavlink", reason="pymavlink not installed")
from pymavlink import mavutil
from gps_denied.core.mavlink import MAVLinkBridge
# Monitor incoming GPS_INPUT on a separate MAVLink connection
monitor = mavutil.mavlink_connection(_MAVLINK_CONN)
monitor.recv_match(type="HEARTBEAT", blocking=True, timeout=10)
bridge = MAVLinkBridge(connection_string=_MAVLINK_CONN, output_hz=5.0)
bridge.update_state(_make_eskf_state(confidence=ConfidenceTier.HIGH), altitude_m=600.0)
await bridge.start(_ORIGIN)
t_start = time.time()
count = 0
while time.time() - t_start < 1.1:
msg = monitor.recv_match(type="GPS_INPUT", blocking=True, timeout=0.5)
if msg:
count += 1
await asyncio.sleep(0)
await bridge.stop()
monitor.close()
assert count >= 5, f"Only {count} GPS_INPUT messages in 1.1s — expected ≥5 (5 Hz)"
# ---------------------------------------------------------------------------
# SITL-06: Telemetry at 1 Hz
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_telemetry_reaches_sitl_at_1hz():
"""SITL-06: NAMED_VALUE_FLOAT CONF_SCORE delivered at ~1 Hz."""
pytest.importorskip("pymavlink", reason="pymavlink not installed")
from pymavlink import mavutil
from gps_denied.core.mavlink import MAVLinkBridge
monitor = mavutil.mavlink_connection(_MAVLINK_CONN)
monitor.recv_match(type="HEARTBEAT", blocking=True, timeout=10)
bridge = MAVLinkBridge(connection_string=_MAVLINK_CONN, output_hz=5.0, telemetry_hz=1.0)
bridge.update_state(_make_eskf_state(confidence=ConfidenceTier.MEDIUM), altitude_m=600.0)
await bridge.start(_ORIGIN)
t_start = time.time()
conf_count = 0
while time.time() - t_start < 2.2:
msg = monitor.recv_match(type="NAMED_VALUE_FLOAT", blocking=True, timeout=0.5)
if msg and getattr(msg, "name", "").startswith("CONF"):
conf_count += 1
await asyncio.sleep(0)
await bridge.stop()
monitor.close()
assert conf_count >= 2, (
f"Only {conf_count} CONF_SCORE messages in 2.2s — expected ≥2 (1 Hz)"
)
# ---------------------------------------------------------------------------
# SITL-07: Reloc request after 3 consecutive failures
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reloc_request_after_3_failures_with_sitl():
"""SITL-07: After 3 FAILED-confidence updates, reloc callback fires."""
pytest.importorskip("pymavlink", reason="pymavlink not installed")
from gps_denied.core.mavlink import MAVLinkBridge
from gps_denied.schemas.mavlink import RelocalizationRequest
received: list[RelocalizationRequest] = []
bridge = MAVLinkBridge(connection_string=_MAVLINK_CONN, output_hz=5.0)
bridge.set_reloc_callback(received.append)
bridge._origin = _ORIGIN
bridge._last_state = _make_eskf_state()
bridge._consecutive_failures = 3
await bridge.start(_ORIGIN)
await asyncio.sleep(0.1)
# Trigger reloc manually (simulates 3 consecutive failures)
bridge._send_reloc_request()
await asyncio.sleep(0.1)
await bridge.stop()
assert len(received) == 1, f"Expected 1 reloc request, got {len(received)}"
assert received[0].consecutive_failures == 3
assert received[0].last_lat is not None
+125 -2
View File
@@ -4,8 +4,14 @@ import numpy as np
import pytest
from gps_denied.core.models import ModelManager
from gps_denied.core.vo import SequentialVisualOdometry
from gps_denied.schemas.flight import CameraParameters
from gps_denied.core.vo import (
CuVSLAMVisualOdometry,
ISequentialVisualOdometry,
ORBVisualOdometry,
SequentialVisualOdometry,
create_vo_backend,
)
from gps_denied.schemas import CameraParameters
from gps_denied.schemas.vo import Features, Matches
@@ -100,3 +106,120 @@ def test_compute_relative_pose(vo, cam_params):
assert pose.rotation.shape == (3, 3)
# Because we randomize points in the mock manager, inliers will be extremely low
assert pose.tracking_good is False
# ---------------------------------------------------------------
# VO-02: ORBVisualOdometry interface contract
# ---------------------------------------------------------------
@pytest.fixture
def orb_vo():
return ORBVisualOdometry()
def test_orb_implements_interface(orb_vo):
"""ORBVisualOdometry must satisfy ISequentialVisualOdometry."""
assert isinstance(orb_vo, ISequentialVisualOdometry)
def test_orb_extract_features(orb_vo):
img = np.zeros((480, 640, 3), dtype=np.uint8)
feats = orb_vo.extract_features(img)
assert isinstance(feats, Features)
# Black image has no corners — empty result is valid
assert feats.keypoints.ndim == 2 and feats.keypoints.shape[1] == 2
def test_orb_match_features(orb_vo):
"""match_features returns Matches even when features are empty."""
empty_f = Features(
keypoints=np.zeros((0, 2), dtype=np.float32),
descriptors=np.zeros((0, 32), dtype=np.float32),
scores=np.zeros(0, dtype=np.float32),
)
m = orb_vo.match_features(empty_f, empty_f)
assert isinstance(m, Matches)
assert m.matches.shape[1] == 2 if len(m.matches) > 0 else True
def test_orb_compute_relative_pose_synthetic(orb_vo, cam_params):
"""ORB can track a small synthetic shift between frames."""
base = np.random.randint(50, 200, (480, 640, 3), dtype=np.uint8)
shifted = np.roll(base, 10, axis=1) # shift 10px right
pose = orb_vo.compute_relative_pose(base, shifted, cam_params)
# May return None on blank areas, but if not None must be well-formed
if pose is not None:
assert pose.translation.shape == (3,)
assert pose.rotation.shape == (3, 3)
assert pose.scale_ambiguous is True # ORB = monocular = scale ambiguous
def test_orb_scale_ambiguous(orb_vo, cam_params):
"""ORB RelativePose always has scale_ambiguous=True (monocular)."""
img1 = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
img2 = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
pose = orb_vo.compute_relative_pose(img1, img2, cam_params)
if pose is not None:
assert pose.scale_ambiguous is True
# ---------------------------------------------------------------
# VO-01: CuVSLAMVisualOdometry (dev/CI fallback path)
# ---------------------------------------------------------------
def test_cuvslam_implements_interface():
"""CuVSLAMVisualOdometry satisfies ISequentialVisualOdometry on dev/CI."""
vo = CuVSLAMVisualOdometry()
assert isinstance(vo, ISequentialVisualOdometry)
def test_cuvslam_scale_not_ambiguous_on_dev(cam_params):
"""On dev/CI (no cuVSLAM), CuVSLAMVO still marks scale_ambiguous=False (metric intent)."""
vo = CuVSLAMVisualOdometry()
img1 = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
img2 = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
pose = vo.compute_relative_pose(img1, img2, cam_params)
if pose is not None:
assert pose.scale_ambiguous is False # VO-04
# ---------------------------------------------------------------
# VO-03: ModelManager auto-selects Mock on dev/CI
# ---------------------------------------------------------------
def test_model_manager_mock_on_dev():
"""On non-Jetson, get_inference_engine returns MockInferenceEngine."""
from gps_denied.core.models import MockInferenceEngine
manager = ModelManager()
engine = manager.get_inference_engine("SuperPoint")
# On dev/CI we always get Mock
assert isinstance(engine, MockInferenceEngine)
def test_model_manager_trt_engine_loader():
"""TRTInferenceEngine falls back to Mock when engine file is absent."""
from gps_denied.core.models import TRTInferenceEngine
engine = TRTInferenceEngine("SuperPoint", "/nonexistent/superpoint.engine")
# Must not crash; should have a mock fallback
assert engine._mock_fallback is not None
# Infer via mock fallback must work
dummy_img = np.zeros((480, 640, 3), dtype=np.uint8)
result = engine.infer(dummy_img)
assert "keypoints" in result
# ---------------------------------------------------------------
# Factory: create_vo_backend
# ---------------------------------------------------------------
def test_create_vo_backend_returns_interface():
"""create_vo_backend() always returns an ISequentialVisualOdometry."""
manager = ModelManager()
backend = create_vo_backend(model_manager=manager)
assert isinstance(backend, ISequentialVisualOdometry)
def test_create_vo_backend_orb_fallback():
"""Without model_manager and no cuVSLAM, falls back to ORBVisualOdometry."""
backend = create_vo_backend(model_manager=None)
assert isinstance(backend, ORBVisualOdometry)