test(e2e): implement harness skeleton + synthetic smoke test + pytest markers

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Yuzviak
2026-04-16 21:48:41 +03:00
parent 568939cd35
commit 95accb8f7a
3 changed files with 190 additions and 0 deletions
+147
View File
@@ -0,0 +1,147 @@
"""E2E harness — runs a DatasetAdapter through FlightProcessor and collects output.
The harness instantiates `FlightProcessor` with a minimal in-memory repository
and an event sink instead of the real SSE streamer. It pushes adapter frames
through `process_frame()` and records each emitted FrameResult for comparison
against the adapter's ground truth.
What the harness does NOT do:
- Train/tune ESKF noise parameters (use adapter-specific defaults)
- SE(3) trajectory alignment (callers apply as needed)
- Image decoding beyond OpenCV (extended formats handled per-adapter)
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
from unittest.mock import AsyncMock, MagicMock
import cv2
import numpy as np
from gps_denied.core.chunk_manager import RouteChunkManager
from gps_denied.core.gpr import GlobalPlaceRecognition
from gps_denied.core.graph import FactorGraphOptimizer
from gps_denied.core.metric import MetricRefinement
from gps_denied.core.models import ModelManager
from gps_denied.core.processor import FlightProcessor
from gps_denied.core.recovery import FailureRecoveryCoordinator
from gps_denied.core.vo import SequentialVisualOdometry
from gps_denied.schemas.graph import FactorGraphConfig
from gps_denied.testing.datasets.base import DatasetAdapter, PlatformClass
EARTH_R = 6_378_137.0
@dataclass
class HarnessResult:
num_frames_submitted: int
num_estimates: int
estimated_positions_enu: np.ndarray = field(default_factory=lambda: np.zeros((0, 3)))
ground_truth: np.ndarray = field(default_factory=lambda: np.zeros((0, 3)))
adapter_name: str = ""
platform_class: PlatformClass = PlatformClass.SYNTHETIC
class E2EHarness:
"""Drives FlightProcessor from a DatasetAdapter; collects results."""
def __init__(self, adapter: DatasetAdapter, flight_id: str = "e2e-flight") -> None:
self._adapter = adapter
self._flight_id = flight_id
self._estimates: list[tuple[int, Optional[tuple[float, float, float]]]] = []
async def run(self) -> HarnessResult:
processor = self._build_processor()
frames = list(self._adapter.iter_frames())
gt_poses = list(self._adapter.iter_ground_truth())
for frame in frames:
image = self._load_or_synth_image(frame.image_path)
result = await processor.process_frame(
self._flight_id, frame.frame_idx, image
)
est = None
if result.gps is not None:
est = (result.gps.lat, result.gps.lon, 0.0) # alt not returned here
self._estimates.append((frame.frame_idx, est))
gt_enu = self._poses_to_enu(gt_poses)
est_enu = self._estimates_to_enu(gt_poses[0] if gt_poses else None)
return HarnessResult(
num_frames_submitted=len(frames),
num_estimates=sum(1 for _, e in self._estimates if e is not None),
estimated_positions_enu=est_enu,
ground_truth=gt_enu,
adapter_name=self._adapter.name,
platform_class=self._adapter.capabilities.platform_class,
)
def _build_processor(self) -> FlightProcessor:
repo = MagicMock()
streamer = MagicMock()
streamer.push_event = AsyncMock()
proc = FlightProcessor(repo, streamer)
mm = ModelManager()
vo = SequentialVisualOdometry(mm)
gpr = GlobalPlaceRecognition(mm)
gpr.load_index(self._flight_id, "dummy")
metric = MetricRefinement(mm)
graph = FactorGraphOptimizer(FactorGraphConfig())
chunk_mgr = RouteChunkManager(graph)
recovery = FailureRecoveryCoordinator(chunk_mgr, gpr, metric)
proc.attach_components(
vo=vo, gpr=gpr, metric=metric,
graph=graph, recovery=recovery, chunk_mgr=chunk_mgr,
)
return proc
def _load_or_synth_image(self, path: str) -> np.ndarray:
if path:
img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
if img is None:
raise FileNotFoundError(f"Could not read {path}")
return img
# Synthetic adapter: generate a checkerboard so OpenCV / VO have corners
sz = 480
board = np.zeros((sz, sz), dtype=np.uint8)
cell = 40
for r in range(0, sz, cell):
for c in range(0, sz, cell):
if ((r // cell) + (c // cell)) % 2 == 0:
board[r:r+cell, c:c+cell] = 255
return board
def _poses_to_enu(self, poses) -> np.ndarray:
if not poses:
return np.zeros((0, 3))
origin = poses[0]
arr = np.zeros((len(poses), 3))
for i, p in enumerate(poses):
dlat_m = (p.lat - origin.lat) * (np.pi / 180.0) * EARTH_R
dlon_m = (
(p.lon - origin.lon) * (np.pi / 180.0)
* EARTH_R * np.cos(np.radians(origin.lat))
)
# ENU: x=east, y=north, z=up
arr[i] = [dlon_m, dlat_m, p.alt - origin.alt]
return arr
def _estimates_to_enu(self, origin) -> np.ndarray:
if origin is None:
return np.zeros((0, 3))
rows = []
for _, est in self._estimates:
if est is None:
continue
lat, lon, alt = est
dlat_m = (lat - origin.lat) * (np.pi / 180.0) * EARTH_R
dlon_m = (
(lon - origin.lon) * (np.pi / 180.0)
* EARTH_R * np.cos(np.radians(origin.lat))
)
rows.append([dlon_m, dlat_m, alt])
return np.array(rows) if rows else np.zeros((0, 3))