"""Phase 5 pipeline wiring tests. PIPE-01: VO result feeds into ESKF update_vo. PIPE-02: SatelliteDataManager + CoordinateTransformer wired into process_frame. PIPE-04: Failure counter resets on recovery; MAVLink reloc triggered at threshold. PIPE-05: ImageRotationManager initialised on first frame. PIPE-06: convert_object_to_gps uses CoordinateTransformer pixel_to_gps. PIPE-07: ESKF state pushed to MAVLinkBridge on every frame. PIPE-08: ImageRotationManager accepts optional model_manager arg. """ from unittest.mock import AsyncMock, MagicMock import numpy as np import pytest from gps_denied.core.coordinates import CoordinateTransformer from gps_denied.core.processor import FlightProcessor, TrackingState from gps_denied.core.rotation import ImageRotationManager from gps_denied.schemas import CameraParameters, GPSPoint from gps_denied.schemas.vo import RelativePose # --------------------------------------------------------------- # Helpers # --------------------------------------------------------------- ORIGIN = GPSPoint(lat=49.0, lon=32.0) def _make_processor(with_coord=True, with_mavlink=True, with_satellite=False): repo = MagicMock() streamer = MagicMock() streamer.push_event = AsyncMock() proc = FlightProcessor(repo, streamer) coord = CoordinateTransformer() if with_coord else None if coord: coord.set_enu_origin("fl1", ORIGIN) coord.set_enu_origin("fl2", ORIGIN) coord.set_enu_origin("fl_cycle", ORIGIN) mavlink = MagicMock() if with_mavlink else None proc.attach_components(coord=coord, mavlink=mavlink) return proc, coord, mavlink def _init_eskf(proc, flight_id, origin=ORIGIN, altitude=100.0): """Seed ESKF for a flight so process_frame can use it.""" proc._init_eskf_for_flight(flight_id, origin, altitude) proc._altitudes[flight_id] = altitude # --------------------------------------------------------------- # PIPE-08: ImageRotationManager accepts optional model_manager # --------------------------------------------------------------- def test_rotation_manager_no_args(): """PIPE-08: ImageRotationManager() with no args still works.""" rm = ImageRotationManager() assert rm._model_manager is None def test_rotation_manager_with_model_manager(): """PIPE-08: ImageRotationManager accepts model_manager kwarg.""" mm = MagicMock() rm = ImageRotationManager(model_manager=mm) assert rm._model_manager is mm # --------------------------------------------------------------- # PIPE-05: Rotation manager initialised on first frame # --------------------------------------------------------------- @pytest.mark.asyncio async def test_first_frame_seeds_rotation_history(): """PIPE-05: First frame call to process_frame seeds HeadingHistory.""" proc, _, _ = _make_processor() rm = ImageRotationManager() proc._rotation = rm flight = "fl_rot" proc._prev_images[flight] = np.zeros((100, 100, 3), dtype=np.uint8) img = np.zeros((100, 100, 3), dtype=np.uint8) await proc.process_frame(flight, 0, img) # HeadingHistory entry should exist after first frame assert flight in rm._history # --------------------------------------------------------------- # PIPE-01: ESKF VO update # --------------------------------------------------------------- @pytest.mark.asyncio async def test_eskf_vo_update_called_on_good_tracking(): """PIPE-01: When VO tracking_good=True, eskf.update_vo is called.""" proc, _, _ = _make_processor() flight = "fl_vo" _init_eskf(proc, flight) img0 = np.zeros((100, 100, 3), dtype=np.uint8) img1 = np.ones((100, 100, 3), dtype=np.uint8) # Seed previous frame proc._prev_images[flight] = img0 # Mock VO to return good tracking good_pose = RelativePose( translation=np.array([1.0, 0.0, 0.0]), rotation=np.eye(3), covariance=np.eye(6), confidence=0.9, inlier_count=50, total_matches=60, tracking_good=True, ) mock_vo = MagicMock() mock_vo.compute_relative_pose.return_value = good_pose proc._vo = mock_vo proc._eskf[flight]._nominal_state["position"].copy() await proc.process_frame(flight, 1, img1) proc._eskf[flight]._nominal_state["position"].copy() # ESKF position should have changed due to VO update assert mock_vo.compute_relative_pose.called # After update_vo the position should differ from initial zeros # (VO innovation shifts position) assert proc._eskf[flight].initialized @pytest.mark.asyncio async def test_failure_counter_increments_on_bad_vo(): """PIPE-04: Consecutive failure counter increments when VO fails.""" proc, _, _ = _make_processor() flight = "fl_fail" _init_eskf(proc, flight) img0 = np.zeros((100, 100, 3), dtype=np.uint8) img1 = np.zeros((100, 100, 3), dtype=np.uint8) proc._prev_images[flight] = img0 bad_pose = RelativePose( translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6), confidence=0.0, inlier_count=0, total_matches=0, tracking_good=False, ) mock_vo = MagicMock() mock_vo.compute_relative_pose.return_value = bad_pose proc._vo = mock_vo await proc.process_frame(flight, 1, img1) assert proc._failure_counts.get(flight, 0) == 1 @pytest.mark.asyncio async def test_failure_counter_resets_on_good_vo(): """PIPE-04: Failure counter resets when VO succeeds.""" proc, _, _ = _make_processor() flight = "fl_reset" _init_eskf(proc, flight) proc._failure_counts[flight] = 5 img0 = np.zeros((100, 100, 3), dtype=np.uint8) img1 = np.ones((100, 100, 3), dtype=np.uint8) proc._prev_images[flight] = img0 good_pose = RelativePose( translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6), confidence=0.9, inlier_count=50, total_matches=60, tracking_good=True, ) mock_vo = MagicMock() mock_vo.compute_relative_pose.return_value = good_pose proc._vo = mock_vo await proc.process_frame(flight, 1, img1) assert proc._failure_counts[flight] == 0 @pytest.mark.asyncio async def test_failure_counter_resets_on_recovery(): """PIPE-04: Failure counter resets when recovery succeeds.""" proc, _, _ = _make_processor() flight = "fl_rec" _init_eskf(proc, flight) proc._failure_counts[flight] = 3 # Seed previous frame so VO is attempted img0 = np.zeros((100, 100, 3), dtype=np.uint8) img1 = np.ones((100, 100, 3), dtype=np.uint8) proc._prev_images[flight] = img0 proc._flight_states[flight] = TrackingState.RECOVERY # Mock recovery to succeed mock_recovery = MagicMock() mock_recovery.process_chunk_recovery.return_value = True mock_chunk_mgr = MagicMock() mock_chunk_mgr.get_active_chunk.return_value = MagicMock(chunk_id="c1") proc._recovery = mock_recovery proc._chunk_mgr = mock_chunk_mgr result = await proc.process_frame(flight, 2, img1) assert result.alignment_success is True assert proc._failure_counts[flight] == 0 # --------------------------------------------------------------- # PIPE-07: ESKF state pushed to MAVLink # --------------------------------------------------------------- @pytest.mark.asyncio async def test_mavlink_state_pushed_per_frame(): """PIPE-07: MAVLinkBridge.update_state called on every frame with ESKF.""" proc, _, mavlink = _make_processor() flight = "fl_mav" _init_eskf(proc, flight) img = np.zeros((100, 100, 3), dtype=np.uint8) await proc.process_frame(flight, 0, img) mavlink.update_state.assert_called_once() args, kwargs = mavlink.update_state.call_args # First positional arg is ESKFState from gps_denied.schemas.eskf import ESKFState assert isinstance(args[0], ESKFState) assert kwargs.get("altitude_m") == 100.0 @pytest.mark.asyncio async def test_mavlink_not_called_without_eskf(): """PIPE-07: No MAVLink call if ESKF not initialized for flight.""" proc, _, mavlink = _make_processor() # Do NOT call _init_eskf_for_flight → ESKF absent flight = "fl_nomav" img = np.zeros((100, 100, 3), dtype=np.uint8) await proc.process_frame(flight, 0, img) mavlink.update_state.assert_not_called() # --------------------------------------------------------------- # PIPE-06: convert_object_to_gps uses CoordinateTransformer # --------------------------------------------------------------- @pytest.mark.asyncio async def test_convert_object_to_gps_uses_coord_transformer(): """PIPE-06: pixel_to_gps called via CoordinateTransformer.""" proc, coord, _ = _make_processor() flight = "fl_obj" coord.set_enu_origin(flight, ORIGIN) _init_eskf(proc, flight) proc._flight_cameras[flight] = CameraParameters( focal_length=4.5, sensor_width=6.17, sensor_height=4.55, resolution_width=640, resolution_height=480, ) response = await proc.convert_object_to_gps(flight, 0, (320.0, 240.0)) # Should return a valid GPS point (not the old hardcoded 48.0, 37.0) assert response.gps is not None # The result should be near the origin (ENU origin + ray projection) assert abs(response.gps.lat - ORIGIN.lat) < 1.0 assert abs(response.gps.lon - ORIGIN.lon) < 1.0 @pytest.mark.asyncio async def test_convert_object_to_gps_fallback_without_coord(): """PIPE-06: Falls back gracefully when no CoordinateTransformer is set.""" proc, _, _ = _make_processor(with_coord=False) flight = "fl_nocoord" _init_eskf(proc, flight) response = await proc.convert_object_to_gps(flight, 0, (100.0, 100.0)) # Must return something (not crash), even without coord transformer assert response.gps is not None # --------------------------------------------------------------- # ESKF initialization via create_flight # --------------------------------------------------------------- @pytest.mark.asyncio async def test_create_flight_initialises_eskf(): """create_flight should seed ESKF for the new flight.""" from gps_denied.schemas import Geofences from gps_denied.schemas.flight import FlightCreateRequest proc, _, _ = _make_processor() from datetime import datetime, timezone flight_row = MagicMock() flight_row.id = "fl_new" flight_row.created_at = datetime.now(timezone.utc) proc.repository.insert_flight = AsyncMock(return_value=flight_row) proc.repository.insert_geofence = AsyncMock() proc.repository.insert_waypoint = AsyncMock() req = FlightCreateRequest( name="test", description="", start_gps=ORIGIN, altitude=150.0, geofences=Geofences(polygons=[]), rough_waypoints=[], camera_params=CameraParameters( focal_length=4.5, sensor_width=6.17, sensor_height=4.55, resolution_width=640, resolution_height=480, ), ) await proc.create_flight(req) assert "fl_new" in proc._eskf assert proc._eskf["fl_new"].initialized assert proc._altitudes["fl_new"] == 150.0 # --------------------------------------------------------------- # _cleanup_flight clears ESKF state # --------------------------------------------------------------- def test_cleanup_flight_removes_eskf(): """_cleanup_flight should remove ESKF and related dicts.""" proc, _, _ = _make_processor() flight = "fl_clean" _init_eskf(proc, flight) proc._failure_counts[flight] = 2 proc._cleanup_flight(flight) assert flight not in proc._eskf assert flight not in proc._altitudes assert flight not in proc._failure_counts