"""Acceptance & Performance Tests (Stage 11). Scenarios tested: AC-1: Normal flight — 20 frames processed without tracking loss. AC-2: Tracking loss & recovery — VO fails mid-flight, recovery re-anchors. AC-3: Performance — process_frame < 5 s per frame. AC-4: User anchor fix — user provides GPS fix, graph snaps. """ import time import numpy as np import pytest from unittest.mock import MagicMock, AsyncMock from gps_denied.core.processor import FlightProcessor, TrackingState from gps_denied.core.models import ModelManager from gps_denied.core.vo import SequentialVisualOdometry from gps_denied.core.gpr import GlobalPlaceRecognition from gps_denied.core.metric import MetricRefinement from gps_denied.core.graph import FactorGraphOptimizer from gps_denied.core.chunk_manager import RouteChunkManager from gps_denied.core.recovery import FailureRecoveryCoordinator from gps_denied.schemas.graph import FactorGraphConfig # --------------------------------------------------------------- # Fixtures # --------------------------------------------------------------- @pytest.fixture def wired_processor(): """Fully wired FlightProcessor with all components attached.""" repo = MagicMock() streamer = MagicMock() streamer.push_event = AsyncMock() proc = FlightProcessor(repo, streamer) mm = ModelManager() vo = SequentialVisualOdometry(mm) gpr = GlobalPlaceRecognition(mm) gpr.load_index("ac", "dummy") metric = MetricRefinement(mm) graph = FactorGraphOptimizer(FactorGraphConfig()) chunk_mgr = RouteChunkManager(graph) recovery = FailureRecoveryCoordinator(chunk_mgr, gpr, metric) proc.attach_components( vo=vo, gpr=gpr, metric=metric, graph=graph, recovery=recovery, chunk_mgr=chunk_mgr, ) return proc def _random_frame(h=200, w=200): return np.random.randint(0, 255, (h, w, 3), dtype=np.uint8) # --------------------------------------------------------------- # AC-1: Normal flight — 20 consecutive frames # --------------------------------------------------------------- @pytest.mark.asyncio async def test_ac1_normal_flight(wired_processor): """Twenty frames processed without crash; SSE events emitted for each.""" flight = "ac1_normal" for i in range(20): result = await wired_processor.process_frame(flight, i, _random_frame()) assert result.frame_id == i # All 20 frames should have emitted SSE assert wired_processor.streamer.push_event.call_count == 20 # --------------------------------------------------------------- # AC-2: Tracking loss → recovery cycle # --------------------------------------------------------------- @pytest.mark.asyncio async def test_ac2_tracking_loss_and_recovery(wired_processor, monkeypatch): """ Frames 0-4 normal, frame 5 VO fails (LOST), recovery succeeds → back to NORMAL. """ flight = "ac2_loss" call_count = {"n": 0} def _controlled_vo(*a, **k): from gps_denied.schemas.vo import RelativePose call_count["n"] += 1 if call_count["n"] <= 4: # Good VO for frames 1-4 return RelativePose( translation=np.array([1.0, 0, 0]), rotation=np.eye(3), covariance=np.eye(6), confidence=0.9, inlier_count=100, total_matches=120, tracking_good=True, ) # Bad VO for frame 5 return RelativePose( translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6), confidence=0, inlier_count=0, total_matches=0, tracking_good=False, ) monkeypatch.setattr(wired_processor._vo, "compute_relative_pose", _controlled_vo) # Frames 0-4 normal for i in range(5): r = await wired_processor.process_frame(flight, i, _random_frame()) assert r.tracking_state == TrackingState.NORMAL # Frame 5: VO fails → recovery monkeypatch.setattr( wired_processor._recovery, "process_chunk_recovery", lambda *a, **k: True ) r5 = await wired_processor.process_frame(flight, 5, _random_frame()) # Recovery succeeded → back to NORMAL assert r5.tracking_state == TrackingState.NORMAL assert r5.alignment_success is True # --------------------------------------------------------------- # AC-3: Performance — < 5 s per frame # --------------------------------------------------------------- @pytest.mark.asyncio async def test_ac3_performance_per_frame(wired_processor): """Each process_frame call must complete in < 5 seconds (mock pipeline).""" flight = "ac3_perf" timings = [] for i in range(10): t0 = time.perf_counter() await wired_processor.process_frame(flight, i, _random_frame()) elapsed = time.perf_counter() - t0 timings.append(elapsed) avg = sum(timings) / len(timings) max_t = max(timings) # With mocks this should be sub-second assert max_t < 5.0, f"Max frame time {max_t:.3f}s exceeds 5s limit" assert avg < 1.0, f"Average frame time {avg:.3f}s unexpectedly high" # --------------------------------------------------------------- # AC-4: User anchor fix # --------------------------------------------------------------- @pytest.mark.asyncio async def test_ac4_user_anchor_fix(wired_processor): """ Verify that add_absolute_factor with is_user_anchor=True is accepted by the graph and the trajectory incorporates the anchor. """ from gps_denied.schemas.flight import GPSPoint from gps_denied.schemas.graph import Pose from datetime import datetime flight = "ac4_anchor" graph = wired_processor._graph # Inject two poses graph._init_flight(flight) graph._flights_state[flight]["poses"][0] = Pose( frame_id=0, position=np.zeros(3), orientation=np.eye(3), timestamp=datetime.now(), ) graph._flights_state[flight]["poses"][1] = Pose( frame_id=1, position=np.zeros(3), orientation=np.eye(3), timestamp=datetime.now(), ) # First GPS sets origin origin = GPSPoint(lat=49.0, lon=30.0) graph.add_absolute_factor(flight, 0, origin, np.eye(2), is_user_anchor=True) # Second GPS — 0.5° north gps2 = GPSPoint(lat=49.5, lon=31.0) ok = graph.add_absolute_factor(flight, 1, gps2, np.eye(2), is_user_anchor=True) assert ok is True traj = graph.get_trajectory(flight) assert traj[1].position[1] > 50000 # ~55 km north of origin # --------------------------------------------------------------- # AC-5: Sustained throughput — 50 frames # --------------------------------------------------------------- @pytest.mark.asyncio async def test_ac5_sustained_throughput(wired_processor): """Process 50 frames back-to-back; no crashes, total < 30 seconds.""" flight = "ac5_sustained" t0 = time.perf_counter() for i in range(50): await wired_processor.process_frame(flight, i, _random_frame()) total = time.perf_counter() - t0 assert total < 30.0, f"Total time for 50 frames: {total:.2f}s" assert wired_processor.streamer.push_event.call_count == 50 # --------------------------------------------------------------- # AC-6: Factor graph optimization converges # --------------------------------------------------------------- @pytest.mark.asyncio async def test_ac6_graph_optimization_convergence(wired_processor): """After N frames the graph should report convergence.""" flight = "ac6_converge" for i in range(10): await wired_processor.process_frame(flight, i, _random_frame()) opt = wired_processor._graph.optimize(flight, 10) assert opt.converged is True assert opt.final_error < 1.0