mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-23 01:06:36 +00:00
feat: stage11 — Acceptance tests & performance benchmarks (80 tests)
This commit is contained in:
@@ -0,0 +1,201 @@
|
||||
"""Acceptance & Performance Tests (Stage 11).
|
||||
|
||||
Scenarios tested:
|
||||
AC-1: Normal flight — 20 frames processed without tracking loss.
|
||||
AC-2: Tracking loss & recovery — VO fails mid-flight, recovery re-anchors.
|
||||
AC-3: Performance — process_frame < 5 s per frame.
|
||||
AC-4: User anchor fix — user provides GPS fix, graph snaps.
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock
|
||||
|
||||
from gps_denied.core.processor import FlightProcessor, TrackingState
|
||||
from gps_denied.core.models import ModelManager
|
||||
from gps_denied.core.vo import SequentialVisualOdometry
|
||||
from gps_denied.core.gpr import GlobalPlaceRecognition
|
||||
from gps_denied.core.metric import MetricRefinement
|
||||
from gps_denied.core.graph import FactorGraphOptimizer
|
||||
from gps_denied.core.chunk_manager import RouteChunkManager
|
||||
from gps_denied.core.recovery import FailureRecoveryCoordinator
|
||||
from gps_denied.schemas.graph import FactorGraphConfig
|
||||
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------
|
||||
@pytest.fixture
|
||||
def wired_processor():
|
||||
"""Fully wired FlightProcessor with all components attached."""
|
||||
repo = MagicMock()
|
||||
streamer = MagicMock()
|
||||
streamer.push_event = AsyncMock()
|
||||
|
||||
proc = FlightProcessor(repo, streamer)
|
||||
|
||||
mm = ModelManager()
|
||||
vo = SequentialVisualOdometry(mm)
|
||||
gpr = GlobalPlaceRecognition(mm)
|
||||
gpr.load_index("ac", "dummy")
|
||||
metric = MetricRefinement(mm)
|
||||
graph = FactorGraphOptimizer(FactorGraphConfig())
|
||||
chunk_mgr = RouteChunkManager(graph)
|
||||
recovery = FailureRecoveryCoordinator(chunk_mgr, gpr, metric)
|
||||
|
||||
proc.attach_components(
|
||||
vo=vo, gpr=gpr, metric=metric,
|
||||
graph=graph, recovery=recovery, chunk_mgr=chunk_mgr,
|
||||
)
|
||||
return proc
|
||||
|
||||
|
||||
def _random_frame(h=200, w=200):
|
||||
return np.random.randint(0, 255, (h, w, 3), dtype=np.uint8)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# AC-1: Normal flight — 20 consecutive frames
|
||||
# ---------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ac1_normal_flight(wired_processor):
|
||||
"""Twenty frames processed without crash; SSE events emitted for each."""
|
||||
flight = "ac1_normal"
|
||||
for i in range(20):
|
||||
result = await wired_processor.process_frame(flight, i, _random_frame())
|
||||
assert result.frame_id == i
|
||||
|
||||
# All 20 frames should have emitted SSE
|
||||
assert wired_processor.streamer.push_event.call_count == 20
|
||||
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# AC-2: Tracking loss → recovery cycle
|
||||
# ---------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ac2_tracking_loss_and_recovery(wired_processor, monkeypatch):
|
||||
"""
|
||||
Frames 0-4 normal, frame 5 VO fails (LOST),
|
||||
recovery succeeds → back to NORMAL.
|
||||
"""
|
||||
flight = "ac2_loss"
|
||||
|
||||
call_count = {"n": 0}
|
||||
|
||||
def _controlled_vo(*a, **k):
|
||||
from gps_denied.schemas.vo import RelativePose
|
||||
call_count["n"] += 1
|
||||
if call_count["n"] <= 4:
|
||||
# Good VO for frames 1-4
|
||||
return RelativePose(
|
||||
translation=np.array([1.0, 0, 0]), rotation=np.eye(3), covariance=np.eye(6),
|
||||
confidence=0.9, inlier_count=100, total_matches=120, tracking_good=True,
|
||||
)
|
||||
# Bad VO for frame 5
|
||||
return RelativePose(
|
||||
translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6),
|
||||
confidence=0, inlier_count=0, total_matches=0, tracking_good=False,
|
||||
)
|
||||
|
||||
monkeypatch.setattr(wired_processor._vo, "compute_relative_pose", _controlled_vo)
|
||||
|
||||
# Frames 0-4 normal
|
||||
for i in range(5):
|
||||
r = await wired_processor.process_frame(flight, i, _random_frame())
|
||||
assert r.tracking_state == TrackingState.NORMAL
|
||||
|
||||
# Frame 5: VO fails → recovery
|
||||
monkeypatch.setattr(
|
||||
wired_processor._recovery, "process_chunk_recovery", lambda *a, **k: True
|
||||
)
|
||||
|
||||
r5 = await wired_processor.process_frame(flight, 5, _random_frame())
|
||||
# Recovery succeeded → back to NORMAL
|
||||
assert r5.tracking_state == TrackingState.NORMAL
|
||||
assert r5.alignment_success is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# AC-3: Performance — < 5 s per frame
|
||||
# ---------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ac3_performance_per_frame(wired_processor):
|
||||
"""Each process_frame call must complete in < 5 seconds (mock pipeline)."""
|
||||
flight = "ac3_perf"
|
||||
timings = []
|
||||
for i in range(10):
|
||||
t0 = time.perf_counter()
|
||||
await wired_processor.process_frame(flight, i, _random_frame())
|
||||
elapsed = time.perf_counter() - t0
|
||||
timings.append(elapsed)
|
||||
|
||||
avg = sum(timings) / len(timings)
|
||||
max_t = max(timings)
|
||||
|
||||
# With mocks this should be sub-second
|
||||
assert max_t < 5.0, f"Max frame time {max_t:.3f}s exceeds 5s limit"
|
||||
assert avg < 1.0, f"Average frame time {avg:.3f}s unexpectedly high"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# AC-4: User anchor fix
|
||||
# ---------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ac4_user_anchor_fix(wired_processor):
|
||||
"""
|
||||
Verify that add_absolute_factor with is_user_anchor=True is accepted
|
||||
by the graph and the trajectory incorporates the anchor.
|
||||
"""
|
||||
from gps_denied.schemas.flight import GPSPoint
|
||||
from gps_denied.schemas.graph import Pose
|
||||
from datetime import datetime
|
||||
|
||||
flight = "ac4_anchor"
|
||||
graph = wired_processor._graph
|
||||
|
||||
# Inject initial pose
|
||||
graph._init_flight(flight)
|
||||
graph._flights_state[flight]["poses"][0] = Pose(
|
||||
frame_id=0, position=np.zeros(3),
|
||||
orientation=np.eye(3), timestamp=datetime.now(),
|
||||
)
|
||||
|
||||
gps = GPSPoint(lat=49.5, lon=31.0)
|
||||
ok = graph.add_absolute_factor(flight, 0, gps, np.eye(2), is_user_anchor=True)
|
||||
assert ok is True
|
||||
|
||||
traj = graph.get_trajectory(flight)
|
||||
assert traj[0].position[1] > 50000 # ~55 km north of origin
|
||||
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# AC-5: Sustained throughput — 50 frames
|
||||
# ---------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ac5_sustained_throughput(wired_processor):
|
||||
"""Process 50 frames back-to-back; no crashes, total < 30 seconds."""
|
||||
flight = "ac5_sustained"
|
||||
t0 = time.perf_counter()
|
||||
for i in range(50):
|
||||
await wired_processor.process_frame(flight, i, _random_frame())
|
||||
total = time.perf_counter() - t0
|
||||
|
||||
assert total < 30.0, f"Total time for 50 frames: {total:.2f}s"
|
||||
assert wired_processor.streamer.push_event.call_count == 50
|
||||
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# AC-6: Factor graph optimization converges
|
||||
# ---------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ac6_graph_optimization_convergence(wired_processor):
|
||||
"""After N frames the graph should report convergence."""
|
||||
flight = "ac6_converge"
|
||||
for i in range(10):
|
||||
await wired_processor.process_frame(flight, i, _random_frame())
|
||||
|
||||
opt = wired_processor._graph.optimize(flight, 10)
|
||||
assert opt.converged is True
|
||||
assert opt.final_error < 1.0
|
||||
Reference in New Issue
Block a user