"""Unit tests for ``FailureRecoveryCoordinator`` with all collaborators mocked."""

from datetime import datetime
from unittest.mock import Mock, patch

import numpy as np
import pytest

from f11_failure_recovery_coordinator import (
    FailureRecoveryCoordinator,
    ConfidenceAssessment,
    SearchSession,
    RelativePose,
    AlignmentResult,
    GPSPoint,
    TileCandidate,
    ChunkHandle,
    UserAnchor,
    Sim3Transform,
    ChunkAlignmentResult,
)


def _blank_image():
    """Return a 10x10 all-zero array used as a placeholder image."""
    return np.zeros((10, 10))


def _identity_sim3():
    """Return a ``Sim3Transform`` with zero translation, identity rotation, unit scale."""
    return Sim3Transform(translation=np.zeros(3), rotation=np.eye(3), scale=1.0)


def _single_candidate():
    """Return a one-element list holding a ``TileCandidate`` for tile ``"t1"``."""
    return [
        TileCandidate(
            tile_id="t1",
            score=0.9,
            gps=GPSPoint(lat=0, lon=0),
            rank=1,
            similarity_score=0.9,
        )
    ]


@pytest.fixture
def deps():
    """Provide a dict of ``Mock`` collaborators keyed by dependency name."""
    return {
        "satellite_data_manager": Mock(),
        "image_rotation_manager": Mock(),
        "global_place_recognition": Mock(),
        "metric_refinement": Mock(),
        "factor_graph_optimizer": Mock(),
        "route_chunk_manager": Mock(),
    }


@pytest.fixture
def coordinator(deps):
    """Build a ``FailureRecoveryCoordinator`` from the mocked dependencies."""
    return FailureRecoveryCoordinator(deps)


class TestFailureRecoveryCoordinator:
    """Behavioural tests for ``FailureRecoveryCoordinator``.

    The tests configure collaborators through the coordinator's ``f04``,
    ``f06``, ``f08``, ``f09``, ``f10`` and ``f12`` attributes.
    NOTE(review): presumably these are the mocks supplied by the ``deps``
    fixture - confirm against the coordinator's constructor.
    """

    # --- 11.01 Feature: Confidence Assessment ---

    def test_check_confidence_good(self, coordinator):
        """A 60-inlier pose plus a matched alignment is assessed as "good"."""
        vo_res = RelativePose(
            transform=np.eye(4), inlier_count=60, reprojection_error=0.5
        )
        ls_res = AlignmentResult(
            matched=True,
            gps=GPSPoint(lat=0, lon=0),
            confidence=0.8,
            inlier_count=40,
            transform=np.eye(3),
        )

        assessment = coordinator.check_confidence(vo_res, ls_res)

        assert assessment.tracking_status == "good"
        # approx: do not let float rounding in the implementation fail the test.
        assert assessment.overall_confidence == pytest.approx(1.0)

    def test_check_confidence_degraded(self, coordinator):
        """A 30-inlier pose with no alignment result is assessed as "degraded"."""
        vo_res = RelativePose(
            transform=np.eye(4), inlier_count=30, reprojection_error=1.5
        )

        assessment = coordinator.check_confidence(vo_res, None)

        assert assessment.tracking_status == "degraded"
        assert assessment.overall_confidence == pytest.approx(30 / 50.0)

    def test_check_confidence_lost(self, coordinator):
        """A 10-inlier pose with no alignment is "lost" and flagged as tracking loss."""
        vo_res = RelativePose(
            transform=np.eye(4), inlier_count=10, reprojection_error=3.0
        )

        assessment = coordinator.check_confidence(vo_res, None)

        assert assessment.tracking_status == "lost"
        assert coordinator.detect_tracking_loss(assessment) is True

    # --- 11.02 Feature: Progressive Search ---

    def test_progressive_search(self, coordinator):
        """Expanding the search grows the grid 1 -> 4 -> 9 and exhausts after 25."""
        session = coordinator.start_search("fl1", 1, GPSPoint(lat=48.0, lon=37.0))
        assert session.current_grid_size == 1

        coordinator.f04.expand_search_grid.return_value = ["tile1", "tile2", "tile3"]

        tiles = coordinator.expand_search_radius(session)
        assert session.current_grid_size == 4
        assert len(tiles) == 3

        coordinator.expand_search_radius(session)
        assert session.current_grid_size == 9

        # skip to 25
        session.current_grid_size = 25
        coordinator.expand_search_radius(session)
        assert session.exhausted is True

    def test_try_current_grid(self, coordinator):
        """A matched alignment from ``f09`` yields a result and marks the session found."""
        session = coordinator.start_search("fl1", 1, GPSPoint(lat=48.0, lon=37.0))
        mock_res = AlignmentResult(
            matched=True,
            gps=GPSPoint(lat=48.0, lon=37.0),
            confidence=0.8,
            inlier_count=40,
            transform=np.eye(3),
        )
        coordinator.f09.align_to_satellite.return_value = mock_res

        res = coordinator.try_current_grid(
            session, {"t1": (_blank_image(), Mock())}, _blank_image()
        )

        assert res is not None
        assert session.found is True

    # --- 11.03 Feature: User Input Handling ---

    def test_user_input(self, coordinator):
        """A user-input request gets a derived id and an anchor is forwarded to ``f10``."""
        req = coordinator.create_user_input_request("fl1", 1, _blank_image(), [])
        assert req.request_id == "usr_req_fl1_1"

        coordinator.f12.get_chunk_for_frame.return_value = "chunk_1"
        anchor = UserAnchor(uav_pixel=(10, 10), satellite_gps=GPSPoint(lat=48, lon=37))

        assert coordinator.apply_user_anchor("fl1", 1, anchor) is True
        coordinator.f10.add_chunk_anchor.assert_called_once()

    # --- 11.04 Feature: Chunk Recovery Coordination ---

    def test_chunk_creation(self, coordinator):
        """The chunk returned by ``f12.create_chunk`` is handed back to the caller."""
        mock_chunk = ChunkHandle(
            chunk_id="chunk_2",
            flight_id="fl1",
            start_frame_id=2,
            frames=[],
            is_active=True,
            has_anchor=False,
            matching_status="unanchored",
        )
        coordinator.f12.create_chunk.return_value = mock_chunk

        res = coordinator.create_chunk_on_tracking_loss("fl1", 2)

        assert res.chunk_id == "chunk_2"

    def test_try_chunk_semantic_matching(self, coordinator):
        """Semantic matching returns the single candidate supplied by ``f08``."""
        coordinator.f12.get_chunk_images.return_value = [_blank_image()]
        coordinator.f08.retrieve_candidate_tiles_for_chunk.return_value = (
            _single_candidate()
        )

        cands = coordinator.try_chunk_semantic_matching("chunk_1")

        assert cands is not None
        assert len(cands) == 1

    def test_try_chunk_litesam_matching(self, coordinator):
        """The matched rotation's ``precise_angle`` is reported as ``rotation_angle``."""
        coordinator.f12.get_chunk_images.return_value = [_blank_image()]
        cands = _single_candidate()
        coordinator.f04.fetch_tile.return_value = _blank_image()

        mock_rot_res = Mock(
            matched=True,
            precise_angle=120.0,
            confidence=0.9,
            inlier_count=50,
            homography=np.eye(3),
        )
        coordinator.f06.try_chunk_rotation_steps.return_value = mock_rot_res

        # Mock F09 helpers
        coordinator.f09._compute_sim3_transform.return_value = _identity_sim3()
        coordinator.f09._get_chunk_center_gps.return_value = GPSPoint(lat=0, lon=0)

        res = coordinator.try_chunk_litesam_matching("chunk_1", cands)

        assert res is not None
        assert res.rotation_angle == 120.0

    def test_merge_chunk_to_trajectory(self, coordinator):
        """A successful merge anchors the chunk and merges it exactly once."""
        coordinator.f12.get_preceding_chunk.return_value = "main"
        coordinator.f12.merge_chunks.return_value = True

        align_res = ChunkAlignmentResult(
            matched=True,
            chunk_id="chunk_1",
            chunk_center_gps=GPSPoint(lat=0, lon=0),
            rotation_angle=0,
            confidence=0.9,
            inlier_count=50,
            transform=_identity_sim3(),
            reprojection_error=0.0,
        )

        assert coordinator.merge_chunk_to_trajectory("fl1", "chunk_1", align_res) is True
        coordinator.f12.mark_chunk_anchored.assert_called_once()
        coordinator.f12.merge_chunks.assert_called_once()

    def test_process_unanchored_chunks(self, coordinator):
        """One ready chunk is marked matching, then anchored and merged once each."""
        mock_chunk = Mock()
        mock_chunk.chunk_id = "chunk_1"
        coordinator.f12.get_chunks_for_matching.return_value = [mock_chunk]
        coordinator.f12.is_chunk_ready_for_matching.return_value = True

        # Setup successful matching path
        coordinator.f12.get_chunk_images.return_value = [_blank_image()]
        cands = _single_candidate()
        coordinator.f08.retrieve_candidate_tiles_for_chunk.return_value = cands
        coordinator.f04.fetch_tile.return_value = _blank_image()

        mock_rot_res = Mock(
            matched=True,
            precise_angle=0.0,
            confidence=0.9,
            inlier_count=50,
            homography=np.eye(3),
        )
        coordinator.f06.try_chunk_rotation_steps.return_value = mock_rot_res
        coordinator.f09._compute_sim3_transform.return_value = _identity_sim3()
        coordinator.f09._get_chunk_center_gps.return_value = GPSPoint(lat=0, lon=0)

        coordinator.f12.get_preceding_chunk.return_value = "main"
        coordinator.f12.merge_chunks.return_value = True

        coordinator.process_unanchored_chunks("fl1")

        coordinator.f12.mark_chunk_matching.assert_called_once_with("chunk_1")
        coordinator.f12.mark_chunk_anchored.assert_called_once()
        coordinator.f12.merge_chunks.assert_called_once()