mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 21:56:38 +00:00
dd9835c0cd
- ruff --fix: removed trailing whitespace (W293), sorted imports (I001) - Manual: broke long lines (E501) in eskf, rotation, vo, gpr, metric, pipeline, rotation tests - Removed unused imports (F401) in models.py, schemas/__init__.py - pyproject.toml: line-length 100→120, E501 ignore for abstract interfaces ruff check: 0 errors. pytest: 195 passed / 8 skipped. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
128 lines
4.6 KiB
Python
128 lines
4.6 KiB
Python
"""Tests for FlightProcessor full processing pipeline (Stage 10)."""
|
|
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied.core.chunk_manager import RouteChunkManager
|
|
from gps_denied.core.gpr import GlobalPlaceRecognition
|
|
from gps_denied.core.graph import FactorGraphOptimizer
|
|
from gps_denied.core.metric import MetricRefinement
|
|
from gps_denied.core.models import ModelManager
|
|
from gps_denied.core.processor import FlightProcessor, TrackingState
|
|
from gps_denied.core.recovery import FailureRecoveryCoordinator
|
|
from gps_denied.core.vo import SequentialVisualOdometry
|
|
from gps_denied.schemas.graph import FactorGraphConfig
|
|
|
|
|
|
@pytest.fixture
|
|
def processor():
|
|
repo = MagicMock()
|
|
streamer = MagicMock()
|
|
streamer.push_event = AsyncMock()
|
|
|
|
proc = FlightProcessor(repo, streamer)
|
|
|
|
mm = ModelManager()
|
|
vo = SequentialVisualOdometry(mm)
|
|
gpr = GlobalPlaceRecognition(mm)
|
|
gpr.load_index("test", "dummy")
|
|
metric = MetricRefinement(mm)
|
|
graph = FactorGraphOptimizer(FactorGraphConfig())
|
|
chunk_mgr = RouteChunkManager(graph)
|
|
recovery = FailureRecoveryCoordinator(chunk_mgr, gpr, metric)
|
|
|
|
proc.attach_components(
|
|
vo=vo, gpr=gpr, metric=metric,
|
|
graph=graph, recovery=recovery, chunk_mgr=chunk_mgr
|
|
)
|
|
return proc
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_process_first_frame(processor):
|
|
"""First frame has no previous image → VO skips, should still publish."""
|
|
img = np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
|
|
result = await processor.process_frame("fl1", 0, img)
|
|
|
|
assert result.frame_id == 0
|
|
assert result.tracking_state == TrackingState.NORMAL
|
|
# First frame — no previous so VO is skipped
|
|
assert result.vo_success is False
|
|
# SSE event published
|
|
processor.streamer.push_event.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_process_consecutive_frames_normal(processor):
|
|
"""Two consecutive frames → VO attempts (mock may or may not succeed)."""
|
|
flight = "fl2"
|
|
img1 = np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
|
|
img2 = np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
|
|
|
|
r1 = await processor.process_frame(flight, 0, img1)
|
|
r2 = await processor.process_frame(flight, 1, img2)
|
|
|
|
assert r1.frame_id == 0
|
|
assert r2.frame_id == 1
|
|
# At minimum, both published SSE
|
|
assert processor.streamer.push_event.call_count == 2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tracking_loss_triggers_recovery(processor, monkeypatch):
|
|
"""When VO returns tracking_good=False, state transitions to RECOVERY."""
|
|
flight = "fl3"
|
|
img0 = np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
|
|
img1 = np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
|
|
|
|
# Process frame 0 — will be NORMAL
|
|
await processor.process_frame(flight, 0, img0)
|
|
|
|
# Force VO to fail by monkeypatching
|
|
def bad_vo(*args, **kwargs):
|
|
from gps_denied.schemas.vo import RelativePose
|
|
return RelativePose(
|
|
translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6),
|
|
confidence=0.0, inlier_count=0, total_matches=0, tracking_good=False
|
|
)
|
|
|
|
monkeypatch.setattr(processor._vo, "compute_relative_pose", bad_vo)
|
|
|
|
# Also force recovery NOT to succeed so state stays RECOVERY
|
|
monkeypatch.setattr(processor._recovery, "process_chunk_recovery", lambda *a, **k: False)
|
|
|
|
r1 = await processor.process_frame(flight, 1, img1)
|
|
|
|
# State should be RECOVERY (LOST transitioning immediately)
|
|
assert r1.tracking_state == TrackingState.RECOVERY
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_state_machine_full_cycle(processor, monkeypatch):
|
|
"""Full cycle: NORMAL → LOST → RECOVERY → NORMAL."""
|
|
flight = "fl_cycle"
|
|
imgs = [np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8) for _ in range(3)]
|
|
|
|
# Frame 0: NORMAL
|
|
r0 = await processor.process_frame(flight, 0, imgs[0])
|
|
assert r0.tracking_state == TrackingState.NORMAL
|
|
|
|
# Frame 1: Force VO fail → RECOVERY
|
|
def bad_vo(*a, **k):
|
|
from gps_denied.schemas.vo import RelativePose
|
|
return RelativePose(
|
|
translation=np.zeros(3), rotation=np.eye(3), covariance=np.eye(6),
|
|
confidence=0, inlier_count=0, total_matches=0, tracking_good=False
|
|
)
|
|
monkeypatch.setattr(processor._vo, "compute_relative_pose", bad_vo)
|
|
|
|
# Also force recovery to succeed
|
|
monkeypatch.setattr(processor._recovery, "process_chunk_recovery", lambda *a, **k: True)
|
|
|
|
r1 = await processor.process_frame(flight, 1, imgs[1])
|
|
# Should have recovered back to NORMAL
|
|
assert r1.tracking_state == TrackingState.NORMAL
|
|
assert r1.alignment_success is True
|