import pytest

# Module-level skip: this raises during collection, so nothing below this
# line is executed while the skip is in place.
pytest.skip(
    "Obsolete test file replaced by component-specific unit tests",
    allow_module_level=True,
)

import numpy as np
import threading
import time
from unittest.mock import Mock, patch
from f02_flight_processing_engine import FlightProcessingEngine


@pytest.fixture
def mock_deps():
    """Return a dict of fresh ``Mock`` collaborators keyed by short name."""
    return {
        "vo": Mock(),
        "graph": Mock(),
        "cvgl": Mock(),
        "publisher": Mock(),
        "failure_coord": Mock(),
        "result_mgr": Mock(),
        "camera_params": Mock(),
        "image_pipeline": Mock(),
        "chunk_manager": Mock(),
    }


@pytest.fixture
def engine(mock_deps):
    """Build a ``FlightProcessingEngine`` wired entirely to the mocks in ``mock_deps``."""
    return FlightProcessingEngine(
        vo_frontend=mock_deps["vo"],
        factor_graph=mock_deps["graph"],
        cvgl_backend=mock_deps["cvgl"],
        async_pose_publisher=mock_deps["publisher"],
        failure_coordinator=mock_deps["failure_coord"],
        result_manager=mock_deps["result_mgr"],
        camera_params=mock_deps["camera_params"],
        image_input_pipeline=mock_deps["image_pipeline"],
        route_chunk_manager=mock_deps["chunk_manager"],
    )


class TestFrameProcessingLoop:
    """Tests defined in 02.2.01_feature_frame_processing_loop.md"""

    def test_start_processing_initiates_background_loop(self, engine):
        """Starting a flight sets the running flags and spawns a live thread."""
        engine.start_processing("flight_123")
        try:
            assert engine.is_running is True
            assert engine.flight_status == "PROCESSING"
            assert isinstance(engine.processing_thread, threading.Thread)
            assert engine.processing_thread.is_alive()
        finally:
            # Stop even when an assertion fails, so the background thread
            # is not left running into later tests.
            engine.stop_processing()

    def test_stop_processing_gracefully_terminates_active_processing(self, engine):
        """Stopping clears ``is_running`` and the thread exits within one second."""
        engine.start_processing("flight_123")
        engine.stop_processing()

        assert engine.is_running is False
        engine.processing_thread.join(timeout=1.0)
        assert not engine.processing_thread.is_alive()

    def test_process_frame_returns_valid_FrameResult_with_pose(self, engine, mock_deps):
        """A well-tracked frame is reported as PROCESSED and optimized once."""
        # Setup active chunk
        mock_chunk = Mock(chunk_id="chunk_0")
        engine.get_active_chunk = Mock(return_value=mock_chunk)

        # Setup successful VO tracking
        mock_rel_pose = Mock(
            tracking_good=True,
            rotation=np.eye(3),
            translation=np.array([1, 0, 0]),
        )
        mock_deps["vo"].compute_relative_pose.return_value = mock_rel_pose

        # Setup optimizer optimization success
        mock_deps["graph"].optimize_chunk.return_value = (True, {10: np.eye(4)})

        dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
        engine.last_image = np.zeros((100, 100, 3), dtype=np.uint8)
        engine.last_frame_id = 9

        result = engine.process_frame("flight_123", 10, dummy_img)

        assert result["status"] == "PROCESSED"
        assert result["frame"] == 10
        mock_deps["graph"].add_relative_factor_to_chunk.assert_called_once()
        mock_deps["graph"].optimize_chunk.assert_called_once_with("chunk_0")

    def test_flight_status_transitions_to_blocked_on_tracking_loss(self, engine, mock_deps):
        """A frame with lost tracking is routed to ``handle_tracking_loss``."""
        engine.get_active_chunk = Mock(return_value=Mock(chunk_id="chunk_0"))
        mock_deps["vo"].compute_relative_pose.return_value = Mock(tracking_good=False)
        engine.handle_tracking_loss = Mock(return_value="BLOCKED")

        dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
        engine.last_image = np.zeros((100, 100, 3), dtype=np.uint8)

        engine.process_frame("flight_123", 10, dummy_img)

        # NOTE(review): only the delegation is checked here; the status
        # transition named in the test title is not asserted - confirm
        # where the engine exposes it before adding a check.
        engine.handle_tracking_loss.assert_called_once_with("flight_123", 10)