mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 09:56:37 +00:00
94 lines
3.5 KiB
Python
94 lines
3.5 KiB
Python
import pytest
|
|
# Skip this entire module at collection time. With allow_module_level=True the
# skip exception is raised during import, so none of the statements below this
# line run and none of the tests in this file are collected.
pytest.skip("Obsolete test file replaced by component-specific unit tests", allow_module_level=True)
|
|
|
|
import numpy as np
|
|
import threading
|
|
import time
|
|
from unittest.mock import Mock, patch
|
|
|
|
from f02_flight_processing_engine import FlightProcessingEngine
|
|
|
|
@pytest.fixture
def mock_deps():
    """Return a dict holding one fresh ``Mock`` per engine collaborator.

    The dict is keyed by the short names that the ``engine`` fixture and the
    tests use to look up each stand-in. Every value is its own ``Mock``
    instance, so expectations configured on one never affect another.
    """
    dependency_names = (
        "vo",
        "graph",
        "cvgl",
        "publisher",
        "failure_coord",
        "result_mgr",
        "camera_params",
        "image_pipeline",
        "chunk_manager",
    )
    return {name: Mock() for name in dependency_names}
|
|
|
|
@pytest.fixture
def engine(mock_deps):
    """Return a ``FlightProcessingEngine`` wired only to the ``mock_deps`` mocks.

    Each constructor keyword receives the mock stored under the matching
    short key in ``mock_deps``.
    """
    constructor_kwargs = {
        "vo_frontend": mock_deps["vo"],
        "factor_graph": mock_deps["graph"],
        "cvgl_backend": mock_deps["cvgl"],
        "async_pose_publisher": mock_deps["publisher"],
        "failure_coordinator": mock_deps["failure_coord"],
        "result_manager": mock_deps["result_mgr"],
        "camera_params": mock_deps["camera_params"],
        "image_input_pipeline": mock_deps["image_pipeline"],
        "route_chunk_manager": mock_deps["chunk_manager"],
    }
    return FlightProcessingEngine(**constructor_kwargs)
|
|
|
|
class TestFrameProcessingLoop:
    """Tests defined in 02.2.01_feature_frame_processing_loop.md.

    Every test receives an ``engine`` built from mocks only (see the
    ``engine`` and ``mock_deps`` fixtures), so the assertions check how the
    engine drives its collaborators rather than any real processing.
    """

    def test_start_processing_initiates_background_loop(self, engine):
        """``start_processing`` marks the engine running and spawns a live thread."""
        engine.start_processing("flight_123")

        try:
            assert engine.is_running is True
            assert engine.flight_status == "PROCESSING"
            assert isinstance(engine.processing_thread, threading.Thread)
            assert engine.processing_thread.is_alive()
        finally:
            # Stop the loop even when an assertion above fails; otherwise the
            # background thread would stay alive and leak into later tests.
            engine.stop_processing()

    def test_stop_processing_gracefully_terminates_active_processing(self, engine):
        """``stop_processing`` clears the running flag and lets the thread exit."""
        engine.start_processing("flight_123")
        engine.stop_processing()

        assert engine.is_running is False
        # Give the worker up to one second to finish before checking liveness.
        engine.processing_thread.join(timeout=1.0)
        assert not engine.processing_thread.is_alive()

    def test_process_frame_returns_valid_FrameResult_with_pose(self, engine, mock_deps):
        """A well-tracked frame is reported as PROCESSED and optimised once."""
        # Setup active chunk
        mock_chunk = Mock(chunk_id="chunk_0")
        engine.get_active_chunk = Mock(return_value=mock_chunk)

        # Setup successful VO tracking
        mock_rel_pose = Mock(tracking_good=True, rotation=np.eye(3), translation=np.array([1, 0, 0]))
        mock_deps["vo"].compute_relative_pose.return_value = mock_rel_pose

        # Setup optimizer optimization success
        mock_deps["graph"].optimize_chunk.return_value = (True, {10: np.eye(4)})

        dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
        # Seed the engine's "previous frame" state (image and id 9) before
        # submitting frame 10 -- presumably what the VO step compares against;
        # confirm against FlightProcessingEngine.process_frame.
        engine.last_image = np.zeros((100, 100, 3), dtype=np.uint8)
        engine.last_frame_id = 9

        result = engine.process_frame("flight_123", 10, dummy_img)

        assert result["status"] == "PROCESSED"
        assert result["frame"] == 10
        mock_deps["graph"].add_relative_factor_to_chunk.assert_called_once()
        mock_deps["graph"].optimize_chunk.assert_called_once_with("chunk_0")

    def test_flight_status_transitions_to_blocked_on_tracking_loss(self, engine, mock_deps):
        """A frame with lost tracking is handed to ``handle_tracking_loss``."""
        engine.get_active_chunk = Mock(return_value=Mock(chunk_id="chunk_0"))

        # Report a failed VO track and stub the loss handler so the delegation
        # can be observed in isolation.
        mock_deps["vo"].compute_relative_pose.return_value = Mock(tracking_good=False)
        engine.handle_tracking_loss = Mock(return_value="BLOCKED")

        dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
        engine.last_image = np.zeros((100, 100, 3), dtype=np.uint8)

        # The return value is not inspected; only the delegation is asserted.
        engine.process_frame("flight_123", 10, dummy_img)

        # NOTE(review): the test name promises a BLOCKED status transition, but
        # only the call to handle_tracking_loss is verified here -- consider
        # asserting on the resulting status once the expected contract is confirmed.
        engine.handle_tracking_loss.assert_called_once_with("flight_123", 10)