"""CI-tier e2e: run the full pipeline on EuRoC MH_01. Skipped if the dataset is not installed under datasets/euroc/MH_01/. """ from pathlib import Path import pytest from gps_denied.testing.datasets.euroc import EuRoCAdapter from gps_denied.testing.harness import E2EHarness from gps_denied.testing.metrics import absolute_trajectory_error # Initial target — calibrated once real numbers land. EUROC_MH01_RMSE_CEILING_M = 5.0 @pytest.mark.e2e @pytest.mark.needs_dataset @pytest.mark.asyncio async def test_euroc_mh01_pipeline_completes(euroc_mh01_root: Path): adapter = EuRoCAdapter(euroc_mh01_root) harness = E2EHarness(adapter) result = await harness.run() assert result.num_frames_submitted > 100, ( "MH_01 has thousands of frames; harness should have submitted them all" ) @pytest.mark.e2e @pytest.mark.needs_dataset @pytest.mark.asyncio async def test_euroc_mh01_rmse_within_ceiling(euroc_mh01_root: Path): adapter = EuRoCAdapter(euroc_mh01_root) harness = E2EHarness(adapter) result = await harness.run() if result.estimated_positions_enu.shape[0] == 0: pytest.xfail( "Pipeline currently emits zero GPS estimates on EuRoC — " "expected: VO works but satellite matching + ESKF anchoring not yet tuned. " "Convert to regular assert once the pipeline stabilises." ) # Align lengths by truncating to shorter (estimates may lag GT at start) n = min(result.estimated_positions_enu.shape[0], result.ground_truth.shape[0]) ate = absolute_trajectory_error( result.estimated_positions_enu[:n], result.ground_truth[:n], ) assert ate["rmse"] < EUROC_MH01_RMSE_CEILING_M, f"ATE RMSE={ate['rmse']:.2f}m"