Files
gps-denied-onboard/tests/test_processor_pipe.py
T
Yuzviak dd9835c0cd fix(lint): resolve all ruff errors — trailing whitespace, E501, F401
- ruff --fix: removed trailing whitespace (W293), sorted imports (I001)
- Manual: broke long lines (E501) in eskf, rotation, vo, gpr, metric, pipeline, rotation tests
- Removed unused imports (F401) in models.py, schemas/__init__.py
- pyproject.toml: line-length 100→120, E501 ignore for abstract interfaces

ruff check: 0 errors. pytest: 195 passed / 8 skipped.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-02 17:09:47 +03:00

335 lines
11 KiB
Python

"""Phase 5 pipeline wiring tests.
PIPE-01: VO result feeds into ESKF update_vo.
PIPE-02: SatelliteDataManager + CoordinateTransformer wired into process_frame.
PIPE-04: Failure counter resets on recovery; MAVLink reloc triggered at threshold.
PIPE-05: ImageRotationManager initialised on first frame.
PIPE-06: convert_object_to_gps uses CoordinateTransformer pixel_to_gps.
PIPE-07: ESKF state pushed to MAVLinkBridge on every frame.
PIPE-08: ImageRotationManager accepts optional model_manager arg.
"""
from unittest.mock import AsyncMock, MagicMock
import numpy as np
import pytest
from gps_denied.core.coordinates import CoordinateTransformer
from gps_denied.core.processor import FlightProcessor, TrackingState
from gps_denied.core.rotation import ImageRotationManager
from gps_denied.schemas import CameraParameters, GPSPoint
from gps_denied.schemas.vo import RelativePose
# ---------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------
ORIGIN = GPSPoint(lat=49.0, lon=32.0)
def _make_processor(with_coord=True, with_mavlink=True, with_satellite=False):
repo = MagicMock()
streamer = MagicMock()
streamer.push_event = AsyncMock()
proc = FlightProcessor(repo, streamer)
coord = CoordinateTransformer() if with_coord else None
if coord:
coord.set_enu_origin("fl1", ORIGIN)
coord.set_enu_origin("fl2", ORIGIN)
coord.set_enu_origin("fl_cycle", ORIGIN)
mavlink = MagicMock() if with_mavlink else None
proc.attach_components(coord=coord, mavlink=mavlink)
return proc, coord, mavlink
def _init_eskf(proc, flight_id, origin=ORIGIN, altitude=100.0):
"""Seed ESKF for a flight so process_frame can use it."""
proc._init_eskf_for_flight(flight_id, origin, altitude)
proc._altitudes[flight_id] = altitude
# ---------------------------------------------------------------
# PIPE-08: ImageRotationManager accepts optional model_manager
# ---------------------------------------------------------------
def test_rotation_manager_no_args():
"""PIPE-08: ImageRotationManager() with no args still works."""
rm = ImageRotationManager()
assert rm._model_manager is None
def test_rotation_manager_with_model_manager():
"""PIPE-08: ImageRotationManager accepts model_manager kwarg."""
mm = MagicMock()
rm = ImageRotationManager(model_manager=mm)
assert rm._model_manager is mm
# ---------------------------------------------------------------
# PIPE-05: Rotation manager initialised on first frame
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_first_frame_seeds_rotation_history():
"""PIPE-05: First frame call to process_frame seeds HeadingHistory."""
proc, _, _ = _make_processor()
rm = ImageRotationManager()
proc._rotation = rm
flight = "fl_rot"
proc._prev_images[flight] = np.zeros((100, 100, 3), dtype=np.uint8)
img = np.zeros((100, 100, 3), dtype=np.uint8)
await proc.process_frame(flight, 0, img)
# HeadingHistory entry should exist after first frame
assert flight in rm._history
# ---------------------------------------------------------------
# PIPE-01: ESKF VO update
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_eskf_vo_update_called_on_good_tracking():
"""PIPE-01: When VO tracking_good=True, eskf.update_vo is called."""
proc, _, _ = _make_processor()
flight = "fl_vo"
_init_eskf(proc, flight)
img0 = np.zeros((100, 100, 3), dtype=np.uint8)
img1 = np.ones((100, 100, 3), dtype=np.uint8)
# Seed previous frame
proc._prev_images[flight] = img0
# Mock VO to return good tracking
good_pose = RelativePose(
translation=np.array([1.0, 0.0, 0.0]),
rotation=np.eye(3),
covariance=np.eye(6),
confidence=0.9,
inlier_count=50,
total_matches=60,
tracking_good=True,
)
mock_vo = MagicMock()
mock_vo.compute_relative_pose.return_value = good_pose
proc._vo = mock_vo
proc._eskf[flight]._nominal_state["position"].copy()
await proc.process_frame(flight, 1, img1)
proc._eskf[flight]._nominal_state["position"].copy()
# ESKF position should have changed due to VO update
assert mock_vo.compute_relative_pose.called
# After update_vo the position should differ from initial zeros
# (VO innovation shifts position)
assert proc._eskf[flight].initialized
@pytest.mark.asyncio
async def test_failure_counter_increments_on_bad_vo():
"""PIPE-04: Consecutive failure counter increments when VO fails."""
proc, _, _ = _make_processor()
flight = "fl_fail"
_init_eskf(proc, flight)
img0 = np.zeros((100, 100, 3), dtype=np.uint8)
img1 = np.zeros((100, 100, 3), dtype=np.uint8)
proc._prev_images[flight] = img0
bad_pose = RelativePose(
translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6),
confidence=0.0, inlier_count=0, total_matches=0, tracking_good=False,
)
mock_vo = MagicMock()
mock_vo.compute_relative_pose.return_value = bad_pose
proc._vo = mock_vo
await proc.process_frame(flight, 1, img1)
assert proc._failure_counts.get(flight, 0) == 1
@pytest.mark.asyncio
async def test_failure_counter_resets_on_good_vo():
"""PIPE-04: Failure counter resets when VO succeeds."""
proc, _, _ = _make_processor()
flight = "fl_reset"
_init_eskf(proc, flight)
proc._failure_counts[flight] = 5
img0 = np.zeros((100, 100, 3), dtype=np.uint8)
img1 = np.ones((100, 100, 3), dtype=np.uint8)
proc._prev_images[flight] = img0
good_pose = RelativePose(
translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6),
confidence=0.9, inlier_count=50, total_matches=60, tracking_good=True,
)
mock_vo = MagicMock()
mock_vo.compute_relative_pose.return_value = good_pose
proc._vo = mock_vo
await proc.process_frame(flight, 1, img1)
assert proc._failure_counts[flight] == 0
@pytest.mark.asyncio
async def test_failure_counter_resets_on_recovery():
"""PIPE-04: Failure counter resets when recovery succeeds."""
proc, _, _ = _make_processor()
flight = "fl_rec"
_init_eskf(proc, flight)
proc._failure_counts[flight] = 3
# Seed previous frame so VO is attempted
img0 = np.zeros((100, 100, 3), dtype=np.uint8)
img1 = np.ones((100, 100, 3), dtype=np.uint8)
proc._prev_images[flight] = img0
proc._flight_states[flight] = TrackingState.RECOVERY
# Mock recovery to succeed
mock_recovery = MagicMock()
mock_recovery.process_chunk_recovery.return_value = True
mock_chunk_mgr = MagicMock()
mock_chunk_mgr.get_active_chunk.return_value = MagicMock(chunk_id="c1")
proc._recovery = mock_recovery
proc._chunk_mgr = mock_chunk_mgr
result = await proc.process_frame(flight, 2, img1)
assert result.alignment_success is True
assert proc._failure_counts[flight] == 0
# ---------------------------------------------------------------
# PIPE-07: ESKF state pushed to MAVLink
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_mavlink_state_pushed_per_frame():
"""PIPE-07: MAVLinkBridge.update_state called on every frame with ESKF."""
proc, _, mavlink = _make_processor()
flight = "fl_mav"
_init_eskf(proc, flight)
img = np.zeros((100, 100, 3), dtype=np.uint8)
await proc.process_frame(flight, 0, img)
mavlink.update_state.assert_called_once()
args, kwargs = mavlink.update_state.call_args
# First positional arg is ESKFState
from gps_denied.schemas.eskf import ESKFState
assert isinstance(args[0], ESKFState)
assert kwargs.get("altitude_m") == 100.0
@pytest.mark.asyncio
async def test_mavlink_not_called_without_eskf():
"""PIPE-07: No MAVLink call if ESKF not initialized for flight."""
proc, _, mavlink = _make_processor()
# Do NOT call _init_eskf_for_flight → ESKF absent
flight = "fl_nomav"
img = np.zeros((100, 100, 3), dtype=np.uint8)
await proc.process_frame(flight, 0, img)
mavlink.update_state.assert_not_called()
# ---------------------------------------------------------------
# PIPE-06: convert_object_to_gps uses CoordinateTransformer
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_convert_object_to_gps_uses_coord_transformer():
"""PIPE-06: pixel_to_gps called via CoordinateTransformer."""
proc, coord, _ = _make_processor()
flight = "fl_obj"
coord.set_enu_origin(flight, ORIGIN)
_init_eskf(proc, flight)
proc._flight_cameras[flight] = CameraParameters(
focal_length=4.5, sensor_width=6.17, sensor_height=4.55,
resolution_width=640, resolution_height=480,
)
response = await proc.convert_object_to_gps(flight, 0, (320.0, 240.0))
# Should return a valid GPS point (not the old hardcoded 48.0, 37.0)
assert response.gps is not None
# The result should be near the origin (ENU origin + ray projection)
assert abs(response.gps.lat - ORIGIN.lat) < 1.0
assert abs(response.gps.lon - ORIGIN.lon) < 1.0
@pytest.mark.asyncio
async def test_convert_object_to_gps_fallback_without_coord():
"""PIPE-06: Falls back gracefully when no CoordinateTransformer is set."""
proc, _, _ = _make_processor(with_coord=False)
flight = "fl_nocoord"
_init_eskf(proc, flight)
response = await proc.convert_object_to_gps(flight, 0, (100.0, 100.0))
# Must return something (not crash), even without coord transformer
assert response.gps is not None
# ---------------------------------------------------------------
# ESKF initialization via create_flight
# ---------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_flight_initialises_eskf():
"""create_flight should seed ESKF for the new flight."""
from gps_denied.schemas import Geofences
from gps_denied.schemas.flight import FlightCreateRequest
proc, _, _ = _make_processor()
from datetime import datetime, timezone
flight_row = MagicMock()
flight_row.id = "fl_new"
flight_row.created_at = datetime.now(timezone.utc)
proc.repository.insert_flight = AsyncMock(return_value=flight_row)
proc.repository.insert_geofence = AsyncMock()
proc.repository.insert_waypoint = AsyncMock()
req = FlightCreateRequest(
name="test",
description="",
start_gps=ORIGIN,
altitude=150.0,
geofences=Geofences(polygons=[]),
rough_waypoints=[],
camera_params=CameraParameters(
focal_length=4.5, sensor_width=6.17, sensor_height=4.55,
resolution_width=640, resolution_height=480,
),
)
await proc.create_flight(req)
assert "fl_new" in proc._eskf
assert proc._eskf["fl_new"].initialized
assert proc._altitudes["fl_new"] == 150.0
# ---------------------------------------------------------------
# _cleanup_flight clears ESKF state
# ---------------------------------------------------------------
def test_cleanup_flight_removes_eskf():
"""_cleanup_flight should remove ESKF and related dicts."""
proc, _, _ = _make_processor()
flight = "fl_clean"
_init_eskf(proc, flight)
proc._failure_counts[flight] = 2
proc._cleanup_flight(flight)
assert flight not in proc._eskf
assert flight not in proc._altitudes
assert flight not in proc._failure_counts