Files
gps-denied-onboard/tests/test_processor_full.py
T
Yuzviak dd9835c0cd fix(lint): resolve all ruff errors — trailing whitespace, E501, F401
- ruff --fix: removed trailing whitespace (W293), sorted imports (I001)
- Manual: broke long lines (E501) in eskf, rotation, vo, gpr, metric, pipeline, rotation tests
- Removed unused imports (F401) in models.py, schemas/__init__.py
- pyproject.toml: line-length 100→120, E501 ignore for abstract interfaces

ruff check: 0 errors. pytest: 195 passed / 8 skipped.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-02 17:09:47 +03:00

128 lines
4.6 KiB
Python

"""Tests for FlightProcessor full processing pipeline (Stage 10)."""
from unittest.mock import AsyncMock, MagicMock
import numpy as np
import pytest
from gps_denied.core.chunk_manager import RouteChunkManager
from gps_denied.core.gpr import GlobalPlaceRecognition
from gps_denied.core.graph import FactorGraphOptimizer
from gps_denied.core.metric import MetricRefinement
from gps_denied.core.models import ModelManager
from gps_denied.core.processor import FlightProcessor, TrackingState
from gps_denied.core.recovery import FailureRecoveryCoordinator
from gps_denied.core.vo import SequentialVisualOdometry
from gps_denied.schemas.graph import FactorGraphConfig
@pytest.fixture
def processor():
    """Provide a FlightProcessor with mocked I/O and pipeline components attached.

    The repository and the streamer are ``MagicMock`` objects; the streamer's
    ``push_event`` is replaced by an ``AsyncMock`` so tests can inspect its calls.
    """
    event_streamer = MagicMock()
    event_streamer.push_event = AsyncMock()
    flight_processor = FlightProcessor(MagicMock(), event_streamer)

    # Every model-backed component is built from the same ModelManager instance.
    models = ModelManager()
    odometry = SequentialVisualOdometry(models)
    place_recognition = GlobalPlaceRecognition(models)
    place_recognition.load_index("test", "dummy")
    refinement = MetricRefinement(models)

    # The chunk manager wraps the optimizer; the recovery coordinator wraps both
    # the chunk manager and the two matching components.
    optimizer = FactorGraphOptimizer(FactorGraphConfig())
    chunks = RouteChunkManager(optimizer)
    coordinator = FailureRecoveryCoordinator(chunks, place_recognition, refinement)

    flight_processor.attach_components(
        vo=odometry,
        gpr=place_recognition,
        metric=refinement,
        graph=optimizer,
        recovery=coordinator,
        chunk_mgr=chunks,
    )
    return flight_processor
@pytest.mark.asyncio
async def test_process_first_frame(processor):
    """The first frame has no predecessor: VO is skipped, yet an event is published."""
    frame_shape = (200, 200, 3)
    first_frame = np.random.randint(0, 255, frame_shape, dtype=np.uint8)

    outcome = await processor.process_frame("fl1", 0, first_frame)

    assert outcome.frame_id == 0
    assert outcome.tracking_state == TrackingState.NORMAL
    # With no previous image there is nothing to match against, so VO reports no success.
    assert outcome.vo_success is False
    # Exactly one SSE event is expected for the single processed frame.
    processor.streamer.push_event.assert_called_once()
@pytest.mark.asyncio
async def test_process_consecutive_frames_normal(processor):
    """Process two frames back to back; VO runs on the second (outcome not asserted)."""
    flight_id = "fl2"
    frames = [
        np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
        for _ in range(2)
    ]

    # Frames are submitted sequentially, using their list position as frame id.
    results = [
        await processor.process_frame(flight_id, index, frame)
        for index, frame in enumerate(frames)
    ]

    for expected_id, result in enumerate(results):
        assert result.frame_id == expected_id
    # Whatever VO decided, each frame must have produced one SSE event.
    assert processor.streamer.push_event.call_count == 2
@pytest.mark.asyncio
async def test_tracking_loss_triggers_recovery(processor, monkeypatch):
    """A VO result with tracking_good=False must leave the processor in RECOVERY."""
    flight_id = "fl3"
    first_frame, second_frame = (
        np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
        for _ in range(2)
    )

    # Frame 0 goes through the pipeline before anything is patched.
    await processor.process_frame(flight_id, 0, first_frame)

    def failing_vo(*args, **kwargs):
        """Stand-in for compute_relative_pose that always reports lost tracking."""
        from gps_denied.schemas.vo import RelativePose

        return RelativePose(
            translation=np.zeros(3),
            rotation=np.eye(3),
            covariance=np.eye(6),
            confidence=0.0,
            inlier_count=0,
            total_matches=0,
            tracking_good=False,
        )

    def recovery_never_succeeds(*args, **kwargs):
        """Stand-in for process_chunk_recovery that always reports failure."""
        return False

    monkeypatch.setattr(processor._vo, "compute_relative_pose", failing_vo)
    # Recovery is forced to fail so the state cannot return to NORMAL within this frame.
    monkeypatch.setattr(
        processor._recovery, "process_chunk_recovery", recovery_never_succeeds
    )

    lost_result = await processor.process_frame(flight_id, 1, second_frame)

    # LOST is expected to hand over to RECOVERY immediately.
    assert lost_result.tracking_state == TrackingState.RECOVERY
@pytest.mark.asyncio
async def test_state_machine_full_cycle(processor, monkeypatch):
    """Full cycle: NORMAL → LOST → RECOVERY → NORMAL.

    Frame 0 is processed with the unpatched pipeline and must report NORMAL.
    For frame 1, VO is patched to report lost tracking and chunk recovery is
    patched to report success, so the result returned for frame 1 is expected
    to be NORMAL again with ``alignment_success`` set.
    """
    flight = "fl_cycle"
    # Only two frames are ever processed, so only two are generated.
    frame0, frame1 = (
        np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
        for _ in range(2)
    )

    # Frame 0: NORMAL
    r0 = await processor.process_frame(flight, 0, frame0)
    assert r0.tracking_state == TrackingState.NORMAL

    # Frame 1: force VO to fail so the processor enters recovery.
    def bad_vo(*a, **k):
        """Stand-in for compute_relative_pose that always reports lost tracking."""
        from gps_denied.schemas.vo import RelativePose

        return RelativePose(
            translation=np.zeros(3),
            rotation=np.eye(3),
            covariance=np.eye(6),
            confidence=0.0,
            inlier_count=0,
            total_matches=0,
            tracking_good=False,
        )

    monkeypatch.setattr(processor._vo, "compute_relative_pose", bad_vo)
    # Also force recovery to succeed so the state can return to NORMAL.
    monkeypatch.setattr(processor._recovery, "process_chunk_recovery", lambda *a, **k: True)

    r1 = await processor.process_frame(flight, 1, frame1)

    # Should have recovered back to NORMAL within the same frame.
    assert r1.tracking_state == TrackingState.NORMAL
    assert r1.alignment_success is True