Files
gps-denied-onboard/test_02_2_01_feature_frame_processing_loop.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

94 lines
3.5 KiB
Python

import pytest
pytest.skip("Obsolete test file replaced by component-specific unit tests", allow_module_level=True)
import numpy as np
import threading
import time
from unittest.mock import Mock, patch
from f02_flight_processing_engine import FlightProcessingEngine
@pytest.fixture
def mock_deps():
return {
"vo": Mock(),
"graph": Mock(),
"cvgl": Mock(),
"publisher": Mock(),
"failure_coord": Mock(),
"result_mgr": Mock(),
"camera_params": Mock(),
"image_pipeline": Mock(),
"chunk_manager": Mock()
}
@pytest.fixture
def engine(mock_deps):
eng = FlightProcessingEngine(
vo_frontend=mock_deps["vo"],
factor_graph=mock_deps["graph"],
cvgl_backend=mock_deps["cvgl"],
async_pose_publisher=mock_deps["publisher"],
failure_coordinator=mock_deps["failure_coord"],
result_manager=mock_deps["result_mgr"],
camera_params=mock_deps["camera_params"],
image_input_pipeline=mock_deps["image_pipeline"],
route_chunk_manager=mock_deps["chunk_manager"]
)
return eng
class TestFrameProcessingLoop:
"""Tests defined in 02.2.01_feature_frame_processing_loop.md"""
def test_start_processing_initiates_background_loop(self, engine):
engine.start_processing("flight_123")
assert engine.is_running is True
assert engine.flight_status == "PROCESSING"
assert isinstance(engine.processing_thread, threading.Thread)
assert engine.processing_thread.is_alive()
engine.stop_processing()
def test_stop_processing_gracefully_terminates_active_processing(self, engine):
engine.start_processing("flight_123")
engine.stop_processing()
assert engine.is_running is False
engine.processing_thread.join(timeout=1.0)
assert not engine.processing_thread.is_alive()
def test_process_frame_returns_valid_FrameResult_with_pose(self, engine, mock_deps):
# Setup active chunk
mock_chunk = Mock(chunk_id="chunk_0")
engine.get_active_chunk = Mock(return_value=mock_chunk)
# Setup successful VO tracking
mock_rel_pose = Mock(tracking_good=True, rotation=np.eye(3), translation=np.array([1, 0, 0]))
mock_deps["vo"].compute_relative_pose.return_value = mock_rel_pose
# Setup optimizer optimization success
mock_deps["graph"].optimize_chunk.return_value = (True, {10: np.eye(4)})
dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
engine.last_image = np.zeros((100, 100, 3), dtype=np.uint8)
engine.last_frame_id = 9
result = engine.process_frame("flight_123", 10, dummy_img)
assert result["status"] == "PROCESSED"
assert result["frame"] == 10
mock_deps["graph"].add_relative_factor_to_chunk.assert_called_once()
mock_deps["graph"].optimize_chunk.assert_called_once_with("chunk_0")
def test_flight_status_transitions_to_blocked_on_tracking_loss(self, engine, mock_deps):
engine.get_active_chunk = Mock(return_value=Mock(chunk_id="chunk_0"))
mock_deps["vo"].compute_relative_pose.return_value = Mock(tracking_good=False)
engine.handle_tracking_loss = Mock(return_value="BLOCKED")
dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
engine.last_image = np.zeros((100, 100, 3), dtype=np.uint8)
result = engine.process_frame("flight_123", 10, dummy_img)
engine.handle_tracking_loss.assert_called_once_with("flight_123", 10)