mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 22:46:36 +00:00
dd9835c0cd
- ruff --fix: removed trailing whitespace (W293), sorted imports (I001) - Manual: broke long lines (E501) in eskf, rotation, vo, gpr, metric, pipeline, rotation tests - Removed unused imports (F401) in models.py, schemas/__init__.py - pyproject.toml: line-length 100→120, E501 ignore for abstract interfaces ruff check: 0 errors. pytest: 195 passed / 8 skipped. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
212 lines
7.5 KiB
Python
212 lines
7.5 KiB
Python
"""Acceptance & Performance Tests (Stage 11).

Scenarios tested:
  AC-1: Normal flight — 20 frames processed without tracking loss.
  AC-2: Tracking loss & recovery — VO fails mid-flight, recovery re-anchors.
  AC-3: Performance — process_frame < 5 s per frame.
  AC-4: User anchor fix — user provides GPS fix, graph snaps.
  AC-5: Sustained throughput — 50 frames processed in under 30 s.
  AC-6: Factor graph optimization — optimizer reports convergence.
"""
|
|
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied.core.chunk_manager import RouteChunkManager
|
|
from gps_denied.core.gpr import GlobalPlaceRecognition
|
|
from gps_denied.core.graph import FactorGraphOptimizer
|
|
from gps_denied.core.metric import MetricRefinement
|
|
from gps_denied.core.models import ModelManager
|
|
from gps_denied.core.processor import FlightProcessor, TrackingState
|
|
from gps_denied.core.recovery import FailureRecoveryCoordinator
|
|
from gps_denied.core.vo import SequentialVisualOdometry
|
|
from gps_denied.schemas.graph import FactorGraphConfig
|
|
|
|
|
|
# ---------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------
|
|
@pytest.fixture
def wired_processor():
    """Build a FlightProcessor with every pipeline component attached.

    The repository and the event streamer are ``MagicMock`` objects; the
    streamer's ``push_event`` is an ``AsyncMock`` so it can be awaited and
    its calls counted by the tests. All other components are real instances
    sharing a single ``ModelManager``.
    """
    event_streamer = MagicMock()
    event_streamer.push_event = AsyncMock()
    processor = FlightProcessor(MagicMock(), event_streamer)

    models = ModelManager()
    visual_odometry = SequentialVisualOdometry(models)

    place_recognition = GlobalPlaceRecognition(models)
    # The index must be loaded before the recognizer is handed to recovery.
    place_recognition.load_index("ac", "dummy")

    refinement = MetricRefinement(models)
    optimizer = FactorGraphOptimizer(FactorGraphConfig())
    chunks = RouteChunkManager(optimizer)
    recovery_coordinator = FailureRecoveryCoordinator(chunks, place_recognition, refinement)

    processor.attach_components(
        vo=visual_odometry,
        gpr=place_recognition,
        metric=refinement,
        graph=optimizer,
        recovery=recovery_coordinator,
        chunk_mgr=chunks,
    )
    return processor
|
|
|
|
|
|
def _random_frame(h=200, w=200):
    """Return a random 3-channel uint8 image used as a synthetic frame.

    Args:
        h: Frame height in pixels.
        w: Frame width in pixels.

    Returns:
        A ``numpy.ndarray`` of shape ``(h, w, 3)`` and dtype ``uint8`` whose
        values are drawn uniformly from the full 0-255 range.
    """
    # ``high`` is exclusive in np.random.randint, so 256 is required for the
    # value 255 to be reachable.
    return np.random.randint(0, 256, (h, w, 3), dtype=np.uint8)
|
|
|
|
|
|
# ---------------------------------------------------------------
|
|
# AC-1: Normal flight — 20 consecutive frames
|
|
# ---------------------------------------------------------------
|
|
@pytest.mark.asyncio
async def test_ac1_normal_flight(wired_processor):
    """Feed twenty frames through one flight and check the bookkeeping.

    Every result must echo the frame id it was submitted with, and the
    streamer's ``push_event`` mock must have been called once per frame.
    """
    flight_id = "ac1_normal"
    frame_total = 20

    for frame_idx in range(frame_total):
        outcome = await wired_processor.process_frame(flight_id, frame_idx, _random_frame())
        assert outcome.frame_id == frame_idx

    # One SSE push is expected for each processed frame.
    events_pushed = wired_processor.streamer.push_event.call_count
    assert events_pushed == frame_total
|
|
|
|
|
|
# ---------------------------------------------------------------
|
|
# AC-2: Tracking loss → recovery cycle
|
|
# ---------------------------------------------------------------
|
|
@pytest.mark.asyncio
async def test_ac2_tracking_loss_and_recovery(wired_processor, monkeypatch):
    """Simulate a VO failure on frame 5 followed by a successful recovery.

    ``compute_relative_pose`` is replaced by a scripted stub: its first four
    invocations return a good pose, every later invocation returns a pose
    with ``tracking_good=False``. Frames 0-4 must stay NORMAL. Before frame 5
    the recovery coordinator's ``process_chunk_recovery`` is stubbed to
    return ``True``, and the frame must come back NORMAL with
    ``alignment_success`` set.
    """
    from gps_denied.schemas.vo import RelativePose

    flight_id = "ac2_loss"
    vo_calls = 0

    def _scripted_vo(*_args, **_kwargs):
        nonlocal vo_calls
        vo_calls += 1
        if vo_calls > 4:
            # Fifth call onwards: tracking is reported as lost.
            return RelativePose(
                translation=np.zeros(3),
                rotation=np.eye(3),
                covariance=np.eye(6),
                confidence=0,
                inlier_count=0,
                total_matches=0,
                tracking_good=False,
            )
        # Calls 1-4: healthy relative pose.
        return RelativePose(
            translation=np.array([1.0, 0, 0]),
            rotation=np.eye(3),
            covariance=np.eye(6),
            confidence=0.9,
            inlier_count=100,
            total_matches=120,
            tracking_good=True,
        )

    def _recovery_always_succeeds(*_args, **_kwargs):
        return True

    monkeypatch.setattr(wired_processor._vo, "compute_relative_pose", _scripted_vo)

    # Frames 0-4 are expected to track normally.
    for frame_idx in range(5):
        outcome = await wired_processor.process_frame(flight_id, frame_idx, _random_frame())
        assert outcome.tracking_state == TrackingState.NORMAL

    # Frame 5: VO reports failure, the stubbed recovery reports success.
    monkeypatch.setattr(
        wired_processor._recovery, "process_chunk_recovery", _recovery_always_succeeds
    )
    recovered = await wired_processor.process_frame(flight_id, 5, _random_frame())

    assert recovered.tracking_state == TrackingState.NORMAL
    assert recovered.alignment_success is True
|
|
|
|
|
|
# ---------------------------------------------------------------
|
|
# AC-3: Performance — < 5 s per frame
|
|
# ---------------------------------------------------------------
|
|
@pytest.mark.asyncio
async def test_ac3_performance_per_frame(wired_processor):
    """Each process_frame call must complete in < 5 seconds (mock pipeline).

    Ten frames are processed for a single flight and each call is timed on
    its own. The synthetic frame is created before the clock starts so that
    only ``process_frame`` contributes to the measurement. The test fails if
    the slowest call reaches 5 s or the mean reaches 1 s.
    """
    flight = "ac3_perf"
    timings = []
    for i in range(10):
        # Build the input outside the timed window; frame generation is test
        # overhead, not part of the pipeline under measurement.
        frame = _random_frame()
        t0 = time.perf_counter()
        await wired_processor.process_frame(flight, i, frame)
        timings.append(time.perf_counter() - t0)

    avg = sum(timings) / len(timings)
    max_t = max(timings)

    # With mocks this should be sub-second
    assert max_t < 5.0, f"Max frame time {max_t:.3f}s exceeds 5s limit"
    assert avg < 1.0, f"Average frame time {avg:.3f}s unexpectedly high"
|
|
|
|
|
|
# ---------------------------------------------------------------
|
|
# AC-4: User anchor fix
|
|
# ---------------------------------------------------------------
|
|
@pytest.mark.asyncio
async def test_ac4_user_anchor_fix(wired_processor):
    """Check that user-anchor absolute factors are accepted by the graph.

    Two poses located at the zero vector are written directly into the
    graph's per-flight state. Each then receives an absolute factor with
    ``is_user_anchor=True``. The second call must return ``True`` and the
    trajectory entry for frame 1 must end up with ``position[1]`` above
    50000.
    """
    from datetime import datetime

    from gps_denied.schemas import GPSPoint
    from gps_denied.schemas.graph import Pose

    flight = "ac4_anchor"
    optimizer = wired_processor._graph

    # Seed the flight with two identical poses for frames 0 and 1.
    optimizer._init_flight(flight)
    for frame_id in (0, 1):
        optimizer._flights_state[flight]["poses"][frame_id] = Pose(
            frame_id=frame_id,
            position=np.zeros(3),
            orientation=np.eye(3),
            timestamp=datetime.now(),
        )

    # First anchor; presumably this fixes the local origin (confirm in graph).
    first_fix = GPSPoint(lat=49.0, lon=30.0)
    optimizer.add_absolute_factor(flight, 0, first_fix, np.eye(2), is_user_anchor=True)

    # Second anchor: 0.5° further in latitude and 1° further in longitude.
    second_fix = GPSPoint(lat=49.5, lon=31.0)
    accepted = optimizer.add_absolute_factor(
        flight, 1, second_fix, np.eye(2), is_user_anchor=True
    )
    assert accepted is True

    trajectory = optimizer.get_trajectory(flight)
    # 0.5° of latitude is roughly 55 km; assumes position[1] is the north axis.
    assert trajectory[1].position[1] > 50000
|
|
|
|
|
|
# ---------------------------------------------------------------
|
|
# AC-5: Sustained throughput — 50 frames
|
|
# ---------------------------------------------------------------
|
|
@pytest.mark.asyncio
async def test_ac5_sustained_throughput(wired_processor):
    """Process 50 frames back-to-back; no crashes, total < 30 seconds.

    All synthetic frames are generated before timing starts, so the measured
    total covers only the consecutive ``process_frame`` calls. The streamer's
    ``push_event`` mock must have been called once per frame.
    """
    flight = "ac5_sustained"
    # Pre-build the inputs so frame generation does not count toward the total.
    frames = [_random_frame() for _ in range(50)]

    t0 = time.perf_counter()
    for i, frame in enumerate(frames):
        await wired_processor.process_frame(flight, i, frame)
    total = time.perf_counter() - t0

    assert total < 30.0, f"Total time for 50 frames: {total:.2f}s"
    assert wired_processor.streamer.push_event.call_count == 50
|
|
|
|
|
|
# ---------------------------------------------------------------
|
|
# AC-6: Factor graph optimization converges
|
|
# ---------------------------------------------------------------
|
|
@pytest.mark.asyncio
async def test_ac6_graph_optimization_convergence(wired_processor):
    """Run the optimizer after ten processed frames and expect convergence.

    The result of ``optimize`` must report ``converged`` as ``True`` and a
    ``final_error`` below 1.0.
    """
    flight_id = "ac6_converge"

    for frame_idx in range(10):
        await wired_processor.process_frame(flight_id, frame_idx, _random_frame())

    optimizer = wired_processor._graph
    # Second argument is passed through unchanged; its meaning is defined by
    # FactorGraphOptimizer.optimize (not visible here).
    outcome = optimizer.optimize(flight_id, 10)

    assert outcome.converged is True
    assert outcome.final_error < 1.0
|