"""Tests for the Global Place Recognition component (feature group 08).

Covers descriptor computation (08.02), index management (08.01) and
candidate retrieval (08.03) of ``GlobalPlaceRecognition``.
"""

import json
import os
import tempfile
import time
from unittest.mock import Mock, patch

import numpy as np
import pytest

from f02_1_flight_lifecycle_manager import GPSPoint
from f08_global_place_recognition import (
    GlobalPlaceRecognition,
    TileCandidate,
    IndexNotFoundError,
    IndexCorruptedError,
    MetadataMismatchError,
)

# Fixed seed so that randomly generated inputs are identical on every run and
# a failing test can be reproduced.
_RNG_SEED = 0


def _seeded_rng():
    """Return a fresh NumPy generator seeded with ``_RNG_SEED``."""
    return np.random.default_rng(_RNG_SEED)


@pytest.fixture
def gpr():
    """Provide a ``GlobalPlaceRecognition`` built with default arguments."""
    return GlobalPlaceRecognition()


@pytest.fixture
def dummy_image():
    """Provide a reproducible random 1080x1920 3-channel ``uint8`` image."""
    # ``high`` is exclusive, so 256 is required to cover the full uint8 range.
    return _seeded_rng().integers(0, 256, size=(1080, 1920, 3), dtype=np.uint8)


@pytest.fixture
def mock_index_files():
    """Yield ``(index_path, meta_path)`` for a stub index with 1000 entries.

    Both files live in a temporary directory that is removed once the test
    using the fixture has finished.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        index_path = os.path.join(tmpdir, "test.index")
        meta_path = os.path.join(tmpdir, "test.json")

        # Create dummy Faiss index stub
        with open(index_path, "w", encoding="utf-8") as f:
            f.write("dummy_faiss_data")

        # Create metadata matching 1000 items
        meta_dict = {
            str(i): {"tile_id": f"t_{i}", "lat": 48.0, "lon": 37.0}
            for i in range(1000)
        }
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(meta_dict, f)

        yield index_path, meta_path


class TestGlobalPlaceRecognition:
    """Unit and integration tests for ``GlobalPlaceRecognition``."""

    # --- Feature 08.02: Descriptor Computation ---

    def test_compute_location_descriptor_dimensions(self, gpr, dummy_image):
        """A location descriptor is a 4096-element float32 vector."""
        desc = gpr.compute_location_descriptor(dummy_image)
        assert desc.shape == (4096,)
        assert desc.dtype == np.float32

    def test_descriptor_normalization(self, gpr, dummy_image):
        """A location descriptor has unit L2 norm."""
        desc = gpr.compute_location_descriptor(dummy_image)
        norm = np.linalg.norm(desc)
        assert np.isclose(norm, 1.0, atol=1e-5)

    def test_deterministic_output(self, gpr, dummy_image):
        """The same image yields the same descriptor on repeated calls."""
        desc1 = gpr.compute_location_descriptor(dummy_image)
        desc2 = gpr.compute_location_descriptor(dummy_image)
        assert np.array_equal(desc1, desc2)

    def test_compute_chunk_descriptor_empty(self, gpr):
        """An empty chunk is rejected with ``ValueError``."""
        with pytest.raises(ValueError):
            gpr.compute_chunk_descriptor([])

    def test_compute_chunk_descriptor_single(self, gpr, dummy_image):
        """A one-image chunk matches that image's own descriptor."""
        desc_single = gpr.compute_location_descriptor(dummy_image)
        desc_chunk = gpr.compute_chunk_descriptor([dummy_image])
        assert np.allclose(desc_single, desc_chunk, atol=1e-5)

    def test_compute_chunk_descriptor_multiple(self, gpr):
        """A multi-image chunk yields one unit-norm 4096-element descriptor."""
        img1 = np.ones((100, 100, 3), dtype=np.uint8) * 10
        img2 = np.ones((100, 100, 3), dtype=np.uint8) * 200
        chunk = [img1, img2]

        desc = gpr.compute_chunk_descriptor(chunk)
        assert desc.shape == (4096,)
        assert np.isclose(np.linalg.norm(desc), 1.0, atol=1e-5)

    def test_season_invariance_simulation(self, gpr):
        """Slightly perturbed descriptors keep a high cosine similarity."""
        # Simulate by verifying the system relies on Cosine Similarity
        # (Dot product of L2 normalized vectors)
        rng = _seeded_rng()
        desc1 = gpr._l2_normalize(np.ones(4096, dtype=np.float32))
        desc2 = gpr._l2_normalize(
            np.ones(4096, dtype=np.float32) + rng.random(4096) * 0.1
        )
        sim = np.dot(desc1, desc2)
        assert sim > 0.7  # Meets season invariance similarity threshold

    def test_location_discrimination_simulation(self, gpr):
        """Near-opposite descriptors score a low cosine similarity."""
        # Different locations should have orthogonal or negatively correlated
        # descriptors
        rng = _seeded_rng()
        desc1 = gpr._l2_normalize(np.ones(4096, dtype=np.float32))
        desc2 = gpr._l2_normalize(
            -np.ones(4096, dtype=np.float32) + rng.random(4096) * 0.1
        )
        sim = np.dot(desc1, desc2)
        assert sim < 0.5  # Distinct locations discriminated

    def test_chunk_robustness_variance(self, gpr, dummy_image):
        """The chunk descriptor stays close to each member's descriptor."""
        # The aggregated chunk descriptor should represent a robust cluster
        # center.  Brightness offsets are applied with saturation: a plain
        # ``uint8 + i`` wraps bright pixels around to near-black, which is a
        # large change rather than the small perturbation intended here.
        chunk = [
            np.clip(dummy_image.astype(np.int16) + offset, 0, 255).astype(np.uint8)
            for offset in range(10)
        ]
        descs = [gpr.compute_location_descriptor(img) for img in chunk]
        chunk_desc = gpr.compute_chunk_descriptor(chunk)

        assert np.isclose(np.linalg.norm(chunk_desc), 1.0, atol=1e-5)
        distances = [np.linalg.norm(chunk_desc - d) for d in descs]
        assert np.mean(distances) < 1.0  # Chunk desc forms a strong centroid for the set

    # --- Integration Tests ---

    def test_model_manager_integration(self):
        """Descriptor computation calls ``run_dinov2`` exactly once."""
        rng = _seeded_rng()
        mock_manager = Mock()
        mock_manager.run_dinov2.return_value = rng.random((256, 384)).astype(
            np.float32
        )

        gpr_model = GlobalPlaceRecognition(model_manager=mock_manager)
        img = rng.integers(0, 256, size=(224, 224, 3), dtype=np.uint8)
        gpr_model.compute_location_descriptor(img)

        mock_manager.run_dinov2.assert_called_once()

    def test_descriptor_performance_budget(self, gpr, dummy_image):
        """A single descriptor is computed in under 200 ms."""
        # perf_counter is monotonic and high-resolution; time.time() can jump
        # when the system clock is adjusted.
        start = time.perf_counter()
        gpr.compute_location_descriptor(dummy_image)
        assert time.perf_counter() - start < 0.200  # Single inference budget ~150ms

    def test_chunk_performance_budget(self, gpr, dummy_image):
        """A ten-image chunk descriptor is computed in under 2 s."""
        chunk = [dummy_image for _ in range(10)]
        start = time.perf_counter()
        gpr.compute_chunk_descriptor(chunk)
        assert time.perf_counter() - start < 2.0  # Chunk budget (< 2s for 10 images)

    # --- Feature 08.01: Index Management ---

    def test_load_valid_index(self, gpr, mock_index_files):
        """Loading a valid index succeeds and exposes all 1000 metadata rows."""
        index_path, _ = mock_index_files
        assert gpr.load_index("flight_1", index_path) is True
        assert gpr.is_index_loaded is True
        assert len(gpr.tile_metadata) == 1000

    def test_index_not_found(self, gpr):
        """A missing index file raises ``IndexNotFoundError``."""
        with pytest.raises(IndexNotFoundError):
            gpr.load_index("flight_2", "/path/to/nowhere.index")

    def test_dimension_validation(self, gpr):
        """Dimension 128 is rejected; 4096 and 8192 are accepted."""
        with pytest.raises(IndexCorruptedError):
            gpr._validate_index_integrity(index_dim=128, expected_count=10)

        # These must sit outside the ``raises`` block: anything following the
        # raising call inside it would never execute.
        assert gpr._validate_index_integrity(4096, 10) is True
        assert gpr._validate_index_integrity(8192, 10) is True

    def test_metadata_mismatch(self, gpr, mock_index_files):
        """Metadata with the wrong entry count raises ``MetadataMismatchError``."""
        index_path, meta_path = mock_index_files

        # Corrupt metadata to have only 500 entries, but mock faiss returns 1000
        meta_dict = {str(i): {"tile_id": f"t_{i}"} for i in range(500)}
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(meta_dict, f)

        with pytest.raises(MetadataMismatchError):
            gpr.load_index("flight_3", index_path)

    def test_empty_metadata_file(self, gpr, mock_index_files):
        """An empty metadata file raises an error whose message matches "empty"."""
        index_path, meta_path = mock_index_files
        with open(meta_path, "w", encoding="utf-8") as f:
            f.write("")

        with pytest.raises(MetadataMismatchError, match="empty"):
            gpr.load_index("flight_empty", index_path)

    def test_load_performance(self, gpr, mock_index_files):
        """Loading an index with 10000 metadata entries takes under 10 s."""
        index_path, meta_path = mock_index_files

        # Generate large metadata
        large_meta = {str(i): {"tile_id": f"t_{i}"} for i in range(10000)}
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(large_meta, f)

        # Mock Faiss to match 10000 items
        # NOTE(review): the first branch assumes an attached faiss_manager is
        # a Mock (it assigns ``get_stats.return_value``) - confirm.
        if gpr.faiss_manager:
            gpr.faiss_manager.get_stats.return_value = (10000, 4096)
        else:
            gpr._validate_index_integrity = Mock(return_value=True)
            gpr._verify_metadata_alignment = Mock(return_value=True)

        start_time = time.perf_counter()
        gpr.load_index("flight_perf", index_path)
        duration = time.perf_counter() - start_time

        assert duration < 10.0

    def test_faiss_manager_integration(self, mock_index_files):
        """``load_index`` delegates to the injected Faiss manager once."""
        mock_faiss = Mock()
        mock_faiss.get_stats.return_value = (1000, 4096)

        gpr_with_faiss = GlobalPlaceRecognition(faiss_manager=mock_faiss)
        index_path, _ = mock_index_files

        assert gpr_with_faiss.load_index("flight_faiss", index_path) is True
        mock_faiss.load_index.assert_called_once_with(index_path)

    # --- Feature 08.03: Candidate Retrieval ---

    def test_query_database_unloaded(self, gpr):
        """Querying before any index is loaded returns no matches."""
        desc = _seeded_rng().random(4096).astype(np.float32)
        matches = gpr.query_database(desc, top_k=5)
        assert len(matches) == 0

    def test_query_database_returns_top_k(self, gpr, mock_index_files):
        """A query returns exactly ``top_k`` matches in ascending distance."""
        index_path, _ = mock_index_files
        gpr.load_index("flight_4", index_path)

        desc = _seeded_rng().random(4096).astype(np.float32)
        matches = gpr.query_database(desc, top_k=5)

        assert len(matches) == 5
        # Sorted ascending: check every adjacent pair, not only first vs last,
        # so an out-of-order middle element is detected as well.
        assert all(
            nearer.distance <= farther.distance
            for nearer, farther in zip(matches, matches[1:])
        )
        assert 0.0 <= matches[0].similarity_score <= 1.0

    def test_query_after_load(self, gpr, mock_index_files):
        """A query on a loaded index returns at least one match."""
        index_path, _ = mock_index_files
        gpr.load_index("flight_query", index_path)

        desc = _seeded_rng().random(4096).astype(np.float32)
        matches = gpr.query_database(desc, top_k=5)
        assert len(matches) > 0

    def test_query_performance(self, gpr, mock_index_files):
        """A single database query completes in under 50 ms."""
        index_path, _ = mock_index_files
        gpr.load_index("flight_query_perf", index_path)

        desc = _seeded_rng().random(4096).astype(np.float32)
        start = time.perf_counter()
        gpr.query_database(desc, top_k=5)
        assert time.perf_counter() - start < 0.05  # < 50ms

    def test_distance_to_similarity(self, gpr):
        """Distances 0, 1 and 2 map to similarities 1.0, 0.75 and 0.0."""
        assert gpr._distance_to_similarity(0.0) == 1.0
        assert gpr._distance_to_similarity(2.0) == 0.0
        assert gpr._distance_to_similarity(1.0) == 0.75

    def test_rank_candidates(self, gpr):
        """Candidates are ordered by descending similarity with 1-based ranks."""
        cands = [
            TileCandidate(
                tile_id="1",
                gps_center=GPSPoint(lat=0, lon=0),
                similarity_score=0.5,
                rank=0,
            ),
            TileCandidate(
                tile_id="2",
                gps_center=GPSPoint(lat=0, lon=0),
                similarity_score=0.9,
                rank=0,
            ),
            TileCandidate(
                tile_id="3",
                gps_center=GPSPoint(lat=0, lon=0),
                similarity_score=0.7,
                rank=0,
            ),
        ]

        ranked = gpr.rank_candidates(cands)

        assert ranked[0].tile_id == "2"
        assert ranked[0].rank == 1
        assert ranked[-1].tile_id == "1"
        assert ranked[-1].rank == 3

    def test_spatial_reranking(self, gpr):
        """Similarity ordering vs. proximity ordering with a patched reranker."""

        # Mock _apply_spatial_reranking to test its effect
        # NOTE(review): the second assertion below exercises this stand-in,
        # not the real ``_apply_spatial_reranking`` implementation.
        def mock_rerank(cands, dead_reckoning_estimate=None):
            # Explicit None check: do not depend on how the estimate object
            # defines truthiness.
            if dead_reckoning_estimate is not None:
                cands.sort(
                    key=lambda c: abs(c.gps_center.lat - dead_reckoning_estimate.lat)
                )
            return cands

        with patch.object(gpr, "_apply_spatial_reranking", side_effect=mock_rerank):
            cands = [
                TileCandidate(
                    tile_id="1",
                    gps_center=GPSPoint(lat=50.0, lon=0),
                    similarity_score=0.9,
                    rank=0,
                ),
                TileCandidate(
                    tile_id="2",
                    gps_center=GPSPoint(lat=48.1, lon=0),
                    similarity_score=0.8,
                    rank=0,
                ),
            ]
            dead_reckoning = GPSPoint(lat=48.0, lon=0)

            # Without dead reckoning, should sort by similarity
            ranked_sim = gpr.rank_candidates(cands.copy())
            assert ranked_sim[0].tile_id == "1"

            # With dead reckoning, should sort by proximity
            ranked_spatial = gpr._apply_spatial_reranking(cands.copy(), dead_reckoning)
            assert ranked_spatial[0].tile_id == "2"

    def test_retrieve_candidate_tiles_integration(
        self, gpr, mock_index_files, dummy_image
    ):
        """Image and chunk retrieval both return ``top_k`` candidates."""
        index_path, _ = mock_index_files
        gpr.load_index("flight_5", index_path)

        candidates = gpr.retrieve_candidate_tiles(dummy_image, top_k=3)
        assert len(candidates) == 3
        assert candidates[0].rank == 1

        chunk = [dummy_image, dummy_image]
        chunk_candidates = gpr.retrieve_candidate_tiles_for_chunk(chunk, top_k=3)
        assert len(chunk_candidates) == 3

    def test_retrieve_candidate_tiles_performance(
        self, gpr, mock_index_files, dummy_image
    ):
        """End-to-end retrieval for one image completes in under 250 ms."""
        index_path, _ = mock_index_files
        gpr.load_index("flight_perf_e2e", index_path)

        start = time.perf_counter()
        gpr.retrieve_candidate_tiles(dummy_image, top_k=5)
        assert time.perf_counter() - start < 0.250  # < 200ms budget + buffer

    def test_chunk_retrieval_more_accurate(self, gpr, mock_index_files):
        """Chunk retrieval returns a different top tile than one image alone."""
        # This is a conceptual test; real accuracy requires a ground-truth
        # dataset.  We simulate it by checking if the chunk descriptor is
        # different from a single one.
        # NOTE(review): a differing top-1 tile does not demonstrate better
        # accuracy, and the inequality depends on how the index ranks these
        # two inputs - confirm this is the intended check.
        index_path, _ = mock_index_files
        gpr.load_index("flight_chunk_acc", index_path)

        img1 = np.ones((224, 224, 3), dtype=np.uint8) * 50
        img2 = np.ones((224, 224, 3), dtype=np.uint8) * 150

        cands1 = gpr.retrieve_candidate_tiles(img1, top_k=1)
        cands_chunk = gpr.retrieve_candidate_tiles_for_chunk([img1, img2], top_k=1)

        assert cands1[0].tile_id != cands_chunk[0].tile_id