Files
gps-denied-onboard/test_f02_flight_processing_engine.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

115 lines
4.6 KiB
Python

import pytest
pytest.skip("Obsolete test file replaced by test_f02_2_flight_processing_engine", allow_module_level=True)
import numpy as np
import asyncio
from unittest.mock import Mock, MagicMock
from queue import Empty
from f02_flight_processing_engine import FlightProcessingEngine
@pytest.fixture
def mock_deps():
return {
"vo": Mock(),
"graph": Mock(),
"cvgl": Mock(),
"publisher": MagicMock(),
"failure_coord": Mock(),
"result_mgr": Mock(),
"camera_params": Mock()
}
@pytest.fixture
def engine(mock_deps):
eng = FlightProcessingEngine(
vo_frontend=mock_deps["vo"],
factor_graph=mock_deps["graph"],
cvgl_backend=mock_deps["cvgl"],
async_pose_publisher=mock_deps["publisher"],
event_loop=Mock(spec=asyncio.AbstractEventLoop),
failure_coordinator=mock_deps["failure_coord"],
result_manager=mock_deps["result_mgr"],
camera_params=mock_deps["camera_params"]
)
eng.active_flight_id = "test_flight_1"
# Mock the optimizer results
eng.optimizer.optimize_chunk.return_value = (True, {1: np.eye(4), 2: np.eye(4)})
return eng
class TestFlightProcessingEngine:
def test_first_frame_initializes_chunk(self, engine, mock_deps):
"""Verifies that the very first frame creates the origin subgraph."""
dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
engine._process_single_frame(1, dummy_img)
assert engine.last_frame_id == 1
mock_deps["graph"].create_chunk_subgraph.assert_called_once_with("chunk_0", 1)
# CVGL anchoring should be attempted on the very first frame
mock_deps["cvgl"].retrieve_and_match.assert_called_once()
def test_successful_tracking_adds_relative_factor(self, engine, mock_deps):
"""Verifies normal VO tracking updates the pose graph."""
# Initialize frame 1
engine.last_frame_id = 1
engine.last_image = np.zeros((10, 10))
engine.optimizer.chunks = {"chunk_0": Mock()}
# Mock successful VO for frame 2
mock_pose = Mock()
mock_pose.tracking_good = True
mock_pose.rotation = np.eye(3)
mock_pose.translation = np.array([1, 0, 0])
mock_deps["vo"].compute_relative_pose.return_value = mock_pose
dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
engine._process_single_frame(2, dummy_img)
# Ensure graph factor was added
mock_deps["graph"].add_relative_factor_to_chunk.assert_called_once()
# Ensure result was published
assert mock_deps["publisher"].call_count > 0
def test_tracking_loss_triggers_new_chunk(self, engine, mock_deps):
"""Verifies AC-4 (sharp turn / tracking loss) initiates a multi-map fragment."""
engine.last_frame_id = 1
engine.last_image = np.zeros((10, 10))
# Mock failed VO (e.g., sharp turn with <5% overlap)
mock_deps["vo"].compute_relative_pose.return_value = None
# Mock failure coordinator creating a new chunk
mock_chunk = Mock()
mock_chunk.chunk_id = "chunk_1"
mock_deps["failure_coord"].create_chunk_on_tracking_loss.return_value = mock_chunk
dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
engine._process_single_frame(2, dummy_img)
assert engine.current_chunk_id == "chunk_1"
assert "chunk_1" in engine.unanchored_chunks
# Ensure graph creates a new disconnected subgraph for chunk_1
mock_deps["graph"].create_chunk_subgraph.assert_called_with("chunk_1", 2)
def test_cvgl_anchoring_updates_trajectory(self, engine, mock_deps):
"""Verifies periodic CVGL anchoring triggers metric pose refinements (AC-8)."""
engine.satellite_index = "mock_index"
# Mock successful CVGL match
mock_deps["cvgl"].retrieve_and_match.return_value = (True, np.eye(3), {'lat': 48.0, 'lon': 37.0})
dummy_img = np.zeros((100, 100, 3), dtype=np.uint8)
engine._attempt_global_anchoring(15, dummy_img)
# Ensure the anchor is added as a hard constraint to the graph
mock_deps["graph"].add_chunk_anchor.assert_called_once()
# Extract the args passed to add_chunk_anchor
args, _ = mock_deps["graph"].add_chunk_anchor.call_args
assert args[0] == "chunk_0"
assert args[1] == 15
assert np.array_equal(args[2], np.array([48.0, 37.0, 400.0]))
# Ensure refined poses are published
assert mock_deps["publisher"].call_count > 0