"""Obsolete tests for ``FlightProcessingEngine``; skipped at import time.

The module-level ``pytest.skip`` call below runs before the remaining
imports, so the rest of this file is not executed under pytest.  The skip
message names ``test_f02_2_flight_processing_engine`` as the replacement.
"""
import pytest

# Keep this ahead of the other imports: the skip aborts the module import
# right here, before the project import further down is attempted.
pytest.skip(
    "Obsolete test file replaced by test_f02_2_flight_processing_engine",
    allow_module_level=True,
)

import numpy as np
import asyncio
from unittest.mock import Mock, MagicMock
from queue import Empty
from f02_flight_processing_engine import FlightProcessingEngine


def _blank_frame():
    """Return the all-zero 100x100 three-channel uint8 image fed to the engine."""
    return np.zeros((100, 100, 3), dtype=np.uint8)


@pytest.fixture
def mock_deps():
    """Provide one fresh mock per engine collaborator, keyed by short name."""
    collaborators = {
        "vo": Mock(),
        "graph": Mock(),
        "cvgl": Mock(),
        "publisher": MagicMock(),
        "failure_coord": Mock(),
        "result_mgr": Mock(),
        "camera_params": Mock(),
    }
    return collaborators


@pytest.fixture
def engine(mock_deps):
    """Build a ``FlightProcessingEngine`` wired entirely to ``mock_deps``."""
    loop_stub = Mock(spec=asyncio.AbstractEventLoop)
    flight_engine = FlightProcessingEngine(
        vo_frontend=mock_deps["vo"],
        factor_graph=mock_deps["graph"],
        cvgl_backend=mock_deps["cvgl"],
        async_pose_publisher=mock_deps["publisher"],
        event_loop=loop_stub,
        failure_coordinator=mock_deps["failure_coord"],
        result_manager=mock_deps["result_mgr"],
        camera_params=mock_deps["camera_params"],
    )
    flight_engine.active_flight_id = "test_flight_1"

    # Canned optimizer outcome: success flag plus identity poses for frames 1, 2.
    # NOTE(review): relies on ``optimizer`` accepting a ``return_value``
    # assignment (i.e. being mock-like) -- confirm against the engine.
    canned_poses = {1: np.eye(4), 2: np.eye(4)}
    flight_engine.optimizer.optimize_chunk.return_value = (True, canned_poses)
    return flight_engine


class TestFlightProcessingEngine:
    """Frame-level behaviour checks for ``FlightProcessingEngine``."""

    def test_first_frame_initializes_chunk(self, engine, mock_deps):
        """The very first frame must create the origin subgraph."""
        engine._process_single_frame(1, _blank_frame())

        graph = mock_deps["graph"]
        cvgl = mock_deps["cvgl"]

        assert engine.last_frame_id == 1
        graph.create_chunk_subgraph.assert_called_once_with("chunk_0", 1)
        # Global (CVGL) anchoring is expected to be tried on frame one as well.
        cvgl.retrieve_and_match.assert_called_once()

    def test_successful_tracking_adds_relative_factor(self, engine, mock_deps):
        """A good VO result must extend the pose graph and publish a result."""
        # Pretend frame 1 has already been handled.
        engine.last_frame_id = 1
        engine.last_image = np.zeros((10, 10))
        engine.optimizer.chunks = {"chunk_0": Mock()}

        # Visual odometry reports a healthy relative pose for frame 2.
        good_pose = Mock(
            tracking_good=True,
            rotation=np.eye(3),
            translation=np.array([1, 0, 0]),
        )
        mock_deps["vo"].compute_relative_pose.return_value = good_pose

        engine._process_single_frame(2, _blank_frame())

        # One relative factor was added to the graph ...
        mock_deps["graph"].add_relative_factor_to_chunk.assert_called_once()
        # ... and the publisher mock was invoked at least once.
        publisher = mock_deps["publisher"]
        assert publisher.call_count > 0

    def test_tracking_loss_triggers_new_chunk(self, engine, mock_deps):
        """AC-4: tracking loss (sharp turn) must start a multi-map fragment."""
        engine.last_frame_id = 1
        engine.last_image = np.zeros((10, 10))

        # VO yields nothing, e.g. a sharp turn with <5% overlap.
        mock_deps["vo"].compute_relative_pose.return_value = None

        # The failure coordinator answers with a brand-new chunk.
        fresh_chunk = Mock(chunk_id="chunk_1")
        failure_coord = mock_deps["failure_coord"]
        failure_coord.create_chunk_on_tracking_loss.return_value = fresh_chunk

        engine._process_single_frame(2, _blank_frame())

        assert engine.current_chunk_id == "chunk_1"
        assert "chunk_1" in engine.unanchored_chunks
        # A separate, disconnected subgraph is created for chunk_1.
        mock_deps["graph"].create_chunk_subgraph.assert_called_with("chunk_1", 2)

    def test_cvgl_anchoring_updates_trajectory(self, engine, mock_deps):
        """AC-8: periodic CVGL anchoring must trigger metric pose refinement."""
        engine.satellite_index = "mock_index"

        # CVGL reports a successful match at the given lat/lon.
        matched_location = {'lat': 48.0, 'lon': 37.0}
        mock_deps["cvgl"].retrieve_and_match.return_value = (
            True,
            np.eye(3),
            matched_location,
        )

        engine._attempt_global_anchoring(15, _blank_frame())

        # The anchor lands in the graph exactly once, as a hard constraint.
        add_anchor = mock_deps["graph"].add_chunk_anchor
        add_anchor.assert_called_once()

        # Inspect the positional arguments handed to add_chunk_anchor.
        anchor_args, _anchor_kwargs = add_anchor.call_args
        expected_position = np.array([48.0, 37.0, 400.0])
        assert anchor_args[0] == "chunk_0"
        assert anchor_args[1] == 15
        assert np.array_equal(anchor_args[2], expected_position)

        # Refined poses must have gone out through the publisher.
        assert mock_deps["publisher"].call_count > 0