mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 06:56:36 +00:00
85 lines
3.5 KiB
Python
85 lines
3.5 KiB
Python
import pytest
|
|
# Skip this entire module during collection. With allow_module_level=True the
# call raises pytest's skip outcome right here, so nothing below this line
# (including the imports that follow) is executed.
pytest.skip("Obsolete test file replaced by component-specific unit tests", allow_module_level=True)
|
|
|
|
from unittest.mock import Mock, MagicMock
|
|
from f02_1_flight_lifecycle_manager import FlightLifecycleManager, UserFixRequest, GPSPoint
|
|
|
|
@pytest.fixture
def mock_engine_factory():
    """Provide a mock engine factory.

    Calling the returned mock yields an engine mock whose ``is_running``
    attribute is ``False``; the same engine mock is returned on every call.
    """
    idle_engine = Mock(is_running=False)
    return Mock(return_value=idle_engine)
|
|
|
|
@pytest.fixture
def mock_deps(mock_engine_factory):
    """Provide the dependency mapping handed to ``FlightLifecycleManager``.

    The three collaborator entries are plain ``Mock`` objects; the
    ``engine_factory`` entry is the ``mock_engine_factory`` fixture.
    """
    collaborator_names = (
        "image_input_pipeline",
        "sse_event_streamer",
        "coordinate_transformer",
    )
    deps = {name: Mock() for name in collaborator_names}
    deps["engine_factory"] = mock_engine_factory
    return deps
|
|
|
|
@pytest.fixture
def manager(mock_deps):
    """Provide a ``FlightLifecycleManager`` constructed from the mocked dependencies."""
    lifecycle_manager = FlightLifecycleManager(mock_deps)
    return lifecycle_manager
|
|
|
|
class TestProcessingDelegation:
    """Tests for how ``FlightLifecycleManager`` hands work to its collaborators.

    Each test uses the ``manager`` fixture, which is built from the mocks in
    ``mock_deps``.
    """

    def test_queue_images_delegates_to_pipeline(self, manager, mock_deps):
        """A queued batch reaches the pipeline's ``queue_batch`` and the call returns True."""
        pipeline = mock_deps["image_input_pipeline"]
        pipeline.queue_batch.return_value = True

        outcome = manager.queue_images("flight_1", "mock_batch")

        assert outcome is True
        pipeline.queue_batch.assert_called_once_with("flight_1", "mock_batch")

    def test_queue_images_creates_and_starts_engine(self, manager, mock_deps, mock_engine_factory):
        """The first batch for a flight creates an engine and starts processing on it."""
        flight_id = "flight_1"
        mock_deps["image_input_pipeline"].queue_batch.return_value = True

        # No engine is registered for the flight before anything is queued.
        assert flight_id not in manager.active_engines
        manager.queue_images(flight_id, "mock_batch")
        assert flight_id in manager.active_engines

        mock_engine_factory.assert_called_once_with(flight_id)

        # The registered engine must have been told to start processing.
        started_engine = manager.active_engines[flight_id]
        started_engine.start_processing.assert_called_once_with(flight_id)

    def test_queue_images_reuses_existing_engine(self, manager, mock_deps, mock_engine_factory):
        """A second batch for the same flight does not invoke the engine factory again."""
        flight_id = "flight_1"
        mock_deps["image_input_pipeline"].queue_batch.return_value = True

        # The initial batch is what triggers engine creation.
        manager.queue_images(flight_id, "batch_1")
        assert mock_engine_factory.call_count == 1

        # Mark the engine mock as running by hand before queueing again.
        # NOTE(review): presumably the manager inspects is_running - confirm.
        manager.active_engines[flight_id].is_running = True

        # The follow-up batch must leave the factory call count unchanged.
        manager.queue_images(flight_id, "batch_2")
        assert mock_engine_factory.call_count == 1

    def test_handle_user_fix_returns_error_if_no_engine(self, manager):
        """A user fix for a flight with no registered engine yields an error response."""
        anchor = GPSPoint(lat=48.0, lon=37.0)
        request = UserFixRequest(frame_id=1, uav_pixel=[1, 1], satellite_gps=anchor)

        response = manager.handle_user_fix("missing_flight", request)

        assert response["status"] == "error"
        assert "No active processing engine" in response["message"]

    def test_handle_user_fix_delegates_to_engine(self, manager):
        """A user fix for a registered engine succeeds and adds one chunk anchor."""
        anchor = GPSPoint(lat=48.0, lon=37.0)
        request = UserFixRequest(frame_id=1, uav_pixel=[1, 1], satellite_gps=anchor)

        # Register a hand-built engine mock directly on the manager.
        engine = Mock(current_chunk_id="chunk_0")
        manager.active_engines["test_flight"] = engine

        response = manager.handle_user_fix("test_flight", request)

        assert response["status"] == "success"
        engine.optimizer.add_chunk_anchor.assert_called_once()

    def test_create_client_stream_delegates_to_f15(self, manager, mock_deps):
        """Creating a client stream calls ``create_stream`` on the SSE event streamer mock."""
        streamer = mock_deps["sse_event_streamer"]

        manager.create_client_stream("flight_1", "client_a")

        streamer.create_stream.assert_called_once_with("flight_1", "client_a")

    def test_convert_object_to_gps_delegates_to_f13(self, manager, mock_deps):
        """Converting an object pixel returns a GPS result with the expected latitude."""
        pixel = (500.0, 500.0)

        result = manager.convert_object_to_gps("flight_1", 10, pixel)

        assert result is not None
        # The expected latitude comes from "the mock implementation".
        # NOTE(review): mock_deps["coordinate_transformer"] is a bare Mock() and no
        # delegation call is asserted here - confirm where 48.0 originates.
        assert result.lat == 48.0