"""Unit tests for FlightProcessingEngine with all external components mocked."""

import time
from unittest.mock import ANY, Mock, call

import numpy as np
import pytest

from f02_1_flight_lifecycle_manager import GPSPoint, UserFixRequest
from f02_2_flight_processing_engine import (
    ChunkHandle,
    FlightProcessingEngine,
    FrameResult,
    RecoveryStatus,
    UserFixResult,
)


@pytest.fixture
def mocked_engine():
    """Provides an engine with fully mocked external dependencies."""
    engine = FlightProcessingEngine(
        f04=Mock(),
        f05=Mock(),
        f06=Mock(),
        f07=Mock(),
        f08=Mock(),
        f09=Mock(),
        f10=Mock(),
        f11=Mock(),
        f12=Mock(),
        f13=Mock(),
        f14=Mock(),
        f15=Mock(),
        f17=Mock(),
    )
    # Give the mocks concrete empty containers instead of auto-created Mock
    # return values for these three calls.
    engine.f10.get_trajectory.return_value = {}
    engine.f10.get_chunk_trajectory.return_value = {}
    engine.f11.expand_search_radius.return_value = []
    return engine


class TestFlightProcessingEngine:
    """Tests for FlightProcessingEngine, grouped by the feature sections below."""

    # --- Feature 02.2.01: Frame Processing Loop ---

    def test_start_and_stop_processing(self, mocked_engine):
        """Starting marks the flight PROCESSING with a live thread; stopping ends the thread."""
        # NOTE(review): frames here are dicts, while the chunk tests feed objects
        # exposing `.sequence` / `.image` -- confirm the engine accepts both shapes.
        mocked_engine.f05.get_next_image.return_value = {
            "frame_id": 1,
            "image": np.zeros((10, 10)),
        }

        mocked_engine.start_processing("flight_1")
        assert mocked_engine._get_flight_status("flight_1") == "PROCESSING"
        assert mocked_engine._threads["flight_1"].is_alive()

        mocked_engine.stop_processing("flight_1")
        mocked_engine._threads["flight_1"].join(timeout=1.0)
        assert not mocked_engine._threads["flight_1"].is_alive()

    def test_flight_status_transitions_to_completed(self, mocked_engine):
        """An exhausted image queue ends the flight in the COMPLETED status."""
        # Return None to simulate an empty image queue
        mocked_engine.f05.get_next_image.return_value = None

        mocked_engine.start_processing("flight_1")
        mocked_engine._threads["flight_1"].join(timeout=1.0)

        assert mocked_engine._get_flight_status("flight_1") == "COMPLETED"

    def test_process_frame_success(self, mocked_engine):
        """A pose with good tracking yields a successful FrameResult for that frame id."""
        mocked_engine.f07.last_image = np.zeros((10, 10))
        mock_pose = Mock(tracking_good=True)
        mocked_engine.f07.compute_relative_pose.return_value = mock_pose

        result = mocked_engine.process_frame("flight_1", 10, np.zeros((10, 10)))

        assert result.success is True
        assert result.frame_id == 10
        mocked_engine.f06.requires_rotation_sweep.assert_called_once_with("flight_1")

    def test_process_frame_tracking_loss(self, mocked_engine):
        """A missing relative pose (None) yields an unsuccessful FrameResult."""
        mocked_engine.f07.compute_relative_pose.return_value = None

        result = mocked_engine.process_frame("flight_1", 11, np.zeros((10, 10)))

        assert result.success is False
        assert result.frame_id == 11

    def test_processing_stops_mid_flight(self, mocked_engine):
        """Stopping while a frame is in progress ends the thread and leaves PROCESSING."""

        # Simulate a processing delay so we can stop it mid-flight
        def slow_process(*args, **kwargs):
            time.sleep(0.2)
            return FrameResult(frame_id=1, success=True)

        mocked_engine.process_frame = slow_process
        mocked_engine.f05.get_next_image.return_value = {
            "frame_id": 1,
            "image": np.zeros((10, 10)),
        }

        mocked_engine.start_processing("flight_mid_stop")
        time.sleep(0.1)  # Let the loop start
        mocked_engine.stop_processing("flight_mid_stop")

        # Wait for thread to finish
        mocked_engine._threads["flight_mid_stop"].join(timeout=1.0)
        assert not mocked_engine._threads["flight_mid_stop"].is_alive()

        # Status should remain PROCESSING because the queue wasn't exhausted,
        # it was interrupted
        assert mocked_engine._get_flight_status("flight_mid_stop") == "PROCESSING"

    def test_state_machine_rejects_invalid_transitions(self, mocked_engine):
        """A COMPLETED flight cannot be moved back to PROCESSING."""
        mocked_engine._update_flight_status("flight_state_test", "COMPLETED")

        # Attempting to go from COMPLETED back to PROCESSING should fail
        success = mocked_engine._update_flight_status("flight_state_test", "PROCESSING")

        assert success is False
        assert mocked_engine._get_flight_status("flight_state_test") == "COMPLETED"

    def test_internal_helpers(self, mocked_engine):
        """Exercise _check_tracking_status and _is_processing_active."""
        # Test _check_tracking_status
        assert mocked_engine._check_tracking_status(FrameResult(frame_id=1, success=True)) is True
        assert mocked_engine._check_tracking_status(FrameResult(frame_id=2, success=False)) is False

        # Test _is_processing_active
        assert mocked_engine._is_processing_active("unknown") is False

        # NOTE(review): with an empty queue (None) the worker may finish before the
        # next assertion runs -- confirm _is_processing_active does not depend on
        # the loop still being alive, otherwise this check is racy.
        mocked_engine.f05.get_next_image.return_value = None
        mocked_engine.start_processing("flight_active_test")
        assert mocked_engine._is_processing_active("flight_active_test") is True

        mocked_engine.stop_processing("flight_active_test")
        assert mocked_engine._is_processing_active("flight_active_test") is False

    # --- Feature 02.2.02: Tracking Loss Recovery ---

    def test_handle_tracking_loss_success(self, mocked_engine):
        """Recovery reports FOUND once the grid search succeeds on the third attempt."""
        mocked_engine.f11.expand_search_radius.return_value = [Mock(x=0, y=0, zoom=18)]
        # Simulate F11 finding a match on the 3rd radius expansion
        side_effects = [False, False, True]
        mocked_engine.f11.try_current_grid.side_effect = side_effects

        status = mocked_engine.handle_tracking_loss("flight_2", 20, np.zeros((10, 10)))

        assert status == RecoveryStatus.FOUND
        assert mocked_engine.f11.try_current_grid.call_count == 3

    def test_handle_tracking_loss_exhausted_blocks(self, mocked_engine):
        """Five failed grid attempts end in BLOCKED and a single user-input request."""
        # Simulate F11 never finding a match across the 5 radius expansions
        # supplied below
        mocked_engine.f11.try_current_grid.return_value = False
        mocked_engine.f11.expand_search_radius.side_effect = [[Mock()]] * 5

        status = mocked_engine.handle_tracking_loss("flight_2", 20, np.zeros((10, 10)))

        assert status == RecoveryStatus.BLOCKED
        assert mocked_engine.f11.try_current_grid.call_count == 5
        mocked_engine.f11.create_user_input_request.assert_called_once()
        mocked_engine.f15.send_user_input_request.assert_called_once()

    def test_internal_recovery_helpers(self, mocked_engine):
        """Exercise the private recovery helpers one after another."""
        # 1. Test _run_progressive_search
        mocked_engine.f11.expand_search_radius.return_value = [Mock(x=0, y=0, zoom=18)]
        mocked_engine.f11.try_current_grid.return_value = True
        assert (
            mocked_engine._run_progressive_search("flight_search", 1, np.zeros((10, 10)))
            == RecoveryStatus.FOUND
        )

        mocked_engine.f11.try_current_grid.return_value = False
        assert (
            mocked_engine._run_progressive_search("flight_search", 1, np.zeros((10, 10)))
            == RecoveryStatus.FAILED
        )

        # 2. Test _request_user_input
        mocked_engine._request_user_input("flight_req", 1, "test_req")
        mocked_engine.f15.send_user_input_request.assert_called_once_with("flight_req", "test_req")

        # 3. Test _validate_user_fix
        valid_fix = UserFixRequest(
            frame_id=1, uav_pixel=(100, 100), satellite_gps=GPSPoint(lat=0, lon=0)
        )
        invalid_fix = UserFixRequest(
            frame_id=1, uav_pixel=(-1, 100), satellite_gps=GPSPoint(lat=0, lon=0)
        )
        assert mocked_engine._validate_user_fix(valid_fix) is True
        assert mocked_engine._validate_user_fix(invalid_fix) is False

        # 4. Test _apply_fix_and_resume
        mocked_engine.f11.apply_user_anchor.return_value = True
        res = mocked_engine._apply_fix_and_resume("flight_apply", valid_fix)
        assert res.status == "success"
        assert mocked_engine._get_flight_status("flight_apply") == "PROCESSING"

    def test_apply_user_fix_success_resumes_processing(self, mocked_engine):
        """A valid fix on a BLOCKED flight succeeds and returns it to PROCESSING."""
        mocked_engine._update_flight_status("flight_3", "BLOCKED")
        mocked_engine.f11.apply_user_anchor.return_value = True
        fix_data = UserFixRequest(
            frame_id=20, uav_pixel=(500, 500), satellite_gps=GPSPoint(lat=48.0, lon=37.0)
        )

        result = mocked_engine.apply_user_fix("flight_3", fix_data)

        assert result.status == "success"
        assert mocked_engine._get_flight_status("flight_3") == "PROCESSING"

    def test_apply_user_fix_rejects_non_blocked(self, mocked_engine):
        """A fix for a flight that is not BLOCKED is rejected with an error."""
        mocked_engine._update_flight_status("flight_3", "PROCESSING")
        fix_data = UserFixRequest(
            frame_id=20, uav_pixel=(500, 500), satellite_gps=GPSPoint(lat=48.0, lon=37.0)
        )

        result = mocked_engine.apply_user_fix("flight_3", fix_data)

        assert result.status == "error"
        assert "not in blocked state" in result.message.lower()

    def test_apply_user_fix_invalid_pixel(self, mocked_engine):
        """A fix with a negative pixel coordinate is rejected with an error."""
        mocked_engine._update_flight_status("flight_3", "BLOCKED")
        fix_data = UserFixRequest(
            frame_id=20, uav_pixel=(-1, 500), satellite_gps=GPSPoint(lat=48.0, lon=37.0)
        )

        result = mocked_engine.apply_user_fix("flight_3", fix_data)

        assert result.status == "error"
        assert "invalid pixel" in result.message.lower()

    # --- Feature 02.2.03: Chunk Lifecycle Orchestration ---

    def test_get_and_create_chunk(self, mocked_engine):
        """get_active_chunk and create_new_chunk return what F12 provides."""
        expected_chunk = ChunkHandle(chunk_id="chunk_99")
        mocked_engine.f12.get_active_chunk.return_value = expected_chunk
        mocked_engine.f12.create_chunk.return_value = ChunkHandle(chunk_id="chunk_100")

        assert mocked_engine.get_active_chunk("flight_4") == expected_chunk

        new_chunk = mocked_engine.create_new_chunk("flight_4", 100)
        assert new_chunk.chunk_id == "chunk_100"
        mocked_engine.f12.create_chunk.assert_called_once_with("flight_4", 100)

    def test_get_active_chunk_none(self, mocked_engine):
        """get_active_chunk returns None when F12 reports no active chunk."""
        mocked_engine.f12.get_active_chunk.return_value = None

        assert mocked_engine.get_active_chunk("flight_no_chunk") is None

    def test_add_frame_to_active_chunk(self, mocked_engine):
        """The frame's pose is forwarded to F12 under the active chunk's id."""
        mock_chunk = ChunkHandle(chunk_id="chunk_test")
        mocked_engine.f12.get_active_chunk.return_value = mock_chunk
        res = FrameResult(frame_id=1, success=True, pose=Mock())

        mocked_engine._add_frame_to_active_chunk("flight_add", 1, res)

        mocked_engine.f12.add_frame_to_chunk.assert_called_once_with("chunk_test", 1, res.pose)

    def test_chunk_boundary_detection_and_creation_on_loss(self, mocked_engine):
        """Tracking loss on a frame creates a new chunk and starts the F11 search."""
        # Provide sequence: Image -> Tracking Failed -> Stop Processing
        mocked_engine.f11.expand_search_radius.return_value = []
        mock_img = Mock(sequence=5, image=np.zeros((10, 10)))
        image_gen = iter([mock_img, None])

        def get_next_image(fid):
            # Keep returning None once the scripted frames are used up.
            return next(image_gen, None)

        mocked_engine.f05.get_next_image.side_effect = get_next_image
        mocked_engine.f07.compute_relative_pose.return_value = None  # Force tracking loss
        mocked_engine.f12.create_chunk.return_value = ChunkHandle(chunk_id="chunk_5")

        mocked_engine.start_processing("flight_boundary_test")
        mocked_engine._threads["flight_boundary_test"].join(timeout=1.0)

        # Validate the proactive chunk creation was triggered upon tracking loss
        mocked_engine.f12.create_chunk.assert_called_once_with("flight_boundary_test", 5)
        # Ensure F11 recovery was escalated into after the proactive chunk creation
        mocked_engine.f11.start_search.assert_called_once()

    def test_chunk_lifecycle_integration_flow(self, mocked_engine):
        """Loss on frame 2 creates a chunk; frame 3 is then added to the active chunk."""
        # Flow: Good Frame (1) -> Tracking Loss (2) -> New Chunk -> Good Frame (3)
        mocked_engine.f11.expand_search_radius.return_value = []
        # Each frame gets a distinct constant image so compute_pose can tell
        # frame 2 apart by pixel content.
        i1 = Mock(sequence=1, image=np.ones((10, 10)) * 1)
        i2 = Mock(sequence=2, image=np.ones((10, 10)) * 2)
        i3 = Mock(sequence=3, image=np.ones((10, 10)) * 3)
        image_gen = iter([i1, i2, i3, None])

        def get_next_image(fid):
            # Keep returning None once the scripted frames are used up.
            return next(image_gen, None)

        mocked_engine.f05.get_next_image.side_effect = get_next_image

        def compute_pose(last_img, img):
            if np.array_equal(img, i2.image):
                return None  # Fail on frame 2
            return Mock(tracking_good=True)

        mocked_engine.f07.compute_relative_pose.side_effect = compute_pose
        mocked_engine.f07.last_image = i1.image

        mock_chunk = ChunkHandle(chunk_id="chunk_test")
        mocked_engine.f12.get_active_chunk.return_value = mock_chunk

        mocked_engine.start_processing("flight_lifecycle")
        mocked_engine._threads["flight_lifecycle"].join(timeout=1.0)

        # Verifications
        # 1. Chunk creation triggered on frame 2 tracking loss
        mocked_engine.f12.create_chunk.assert_called_once_with("flight_lifecycle", 2)
        # 2. Frame 3 added to the chunk (tracking recovered/resumed)
        mocked_engine.f12.add_frame_to_chunk.assert_any_call("chunk_test", 3, ANY)