diff --git a/src/gps_denied/testing/harness.py b/src/gps_denied/testing/harness.py
index d5b22d5..ef2ed8c 100644
--- a/src/gps_denied/testing/harness.py
+++ b/src/gps_denied/testing/harness.py
@@ -13,7 +13,9 @@ What the harness does NOT do:
 
 from __future__ import annotations
 
+import json
 from dataclasses import dataclass, field
+from pathlib import Path
 from typing import Optional
 from unittest.mock import AsyncMock, MagicMock
 
@@ -29,7 +31,11 @@ from gps_denied.core.processor import FlightProcessor
 from gps_denied.core.recovery import FailureRecoveryCoordinator
 from gps_denied.core.vo import SequentialVisualOdometry
 from gps_denied.schemas.graph import FactorGraphConfig
-from gps_denied.testing.datasets.base import DatasetAdapter, PlatformClass
+from gps_denied.testing.datasets.base import (
+    DatasetAdapter,
+    DatasetPose,
+    PlatformClass,
+)
 
 EARTH_R = 6_378_137.0
 
@@ -59,10 +65,12 @@ class E2EHarness:
         adapter: DatasetAdapter,
         flight_id: str = "e2e-flight",
         max_frames: Optional[int] = None,
+        trace_path: Optional[Path] = None,
     ) -> None:
         self._adapter = adapter
         self._flight_id = flight_id
         self._max_frames = max_frames
+        self._trace_path: Optional[Path] = Path(trace_path) if trace_path else None
         self._estimates: list[tuple[int, Optional[tuple[float, float, float]]]] = []
 
     async def run(self) -> HarnessResult:
@@ -74,15 +82,35 @@ class E2EHarness:
             frames = frames[: self._max_frames]
             gt_poses = gt_poses[: self._max_frames]
 
-        for frame in frames:
-            image = self._load_or_synth_image(frame.image_path)
-            result = await processor.process_frame(
-                self._flight_id, frame.frame_idx, image
-            )
-            est = None
-            if result.gps is not None:
-                est = (result.gps.lat, result.gps.lon, 0.0)  # alt not returned here
-            self._estimates.append((frame.frame_idx, est))
+        # Key GT poses by list position; the lookup below uses frame.frame_idx,
+        # which assumes frame_idx is that same 0-based position.
+        # NOTE(review): confirm for adapters with sparse or offset indices.
+        gt_by_idx: dict[int, DatasetPose] = dict(enumerate(gt_poses))
+
+        trace_fh = None
+        if self._trace_path is not None:
+            self._trace_path.parent.mkdir(parents=True, exist_ok=True)
+            trace_fh = self._trace_path.open("w", encoding="utf-8")
+
+        try:
+            for frame in frames:
+                image = self._load_or_synth_image(frame.image_path)
+                result = await processor.process_frame(
+                    self._flight_id, frame.frame_idx, image
+                )
+                est = None
+                if result.gps is not None:
+                    # Altitude is not returned here, so 0.0 is a placeholder.
+                    est = (result.gps.lat, result.gps.lon, 0.0)
+                self._estimates.append((frame.frame_idx, est))
+
+                if trace_fh is not None:
+                    gt = gt_by_idx.get(frame.frame_idx)
+                    record = self._trace_record(processor, frame, result, gt)
+                    trace_fh.write(json.dumps(record) + "\n")
+        finally:
+            if trace_fh is not None:
+                trace_fh.close()
 
         gt_enu = self._poses_to_enu(gt_poses)
         est_enu = self._estimates_to_enu(gt_poses[0] if gt_poses else None)
@@ -96,6 +124,62 @@ class E2EHarness:
             platform_class=self._adapter.capabilities.platform_class,
         )
 
+    def _trace_record(
+        self,
+        processor: FlightProcessor,
+        frame,
+        result,
+        gt: Optional[DatasetPose],
+    ) -> dict:
+        """Build one JSONL record describing the product's state after a frame.
+
+        Captures VO success, ESKF state (position plus a scalar sigma), and
+        the estimate/GT pair. Enough to diagnose *where* the pipeline diverges
+        without dumping raw images or covariance matrices.
+
+        Args:
+            processor: Processor whose per-flight ESKF is inspected.
+            frame: Dataset frame just submitted (frame_idx, timestamp_ns).
+            result: Value returned by ``process_frame`` for that frame.
+            gt: Ground-truth pose paired with the frame, or None.
+
+        Returns:
+            A dict with one fixed set of keys; fields that are unavailable
+            (ESKF not initialized, no GPS estimate, no GT) are None.
+        """
+        eskf = processor._eskf.get(self._flight_id)  # noqa: SLF001 — test harness
+        eskf_initialized = bool(eskf and eskf.initialized)
+        eskf_position = None
+        eskf_sigma = None
+        if eskf_initialized:
+            pos = eskf.position
+            eskf_position = [float(pos[0]), float(pos[1]), float(pos[2])]
+            cov = eskf.covariance
+            if cov is not None:
+                # RMS of the three per-axis sigmas from the leading 3x3 block:
+                # sqrt(mean of its diagonal), not the raw trace.
+                eskf_sigma = float(np.sqrt(np.trace(cov[0:3, 0:3]) / 3.0))
+
+        est_lat = result.gps.lat if result.gps is not None else None
+        est_lon = result.gps.lon if result.gps is not None else None
+
+        return {
+            "frame_idx": frame.frame_idx,
+            "timestamp_ns": frame.timestamp_ns,
+            "vo_success": bool(result.vo_success),
+            "alignment_success": bool(result.alignment_success),
+            "tracking_state": result.tracking_state.value,
+            "confidence": float(result.confidence),
+            "eskf_initialized": eskf_initialized,
+            "eskf_position_enu": eskf_position,
+            "eskf_pos_sigma_m": eskf_sigma,
+            "estimate_lat": est_lat,
+            "estimate_lon": est_lon,
+            "gt_lat": gt.lat if gt is not None else None,
+            "gt_lon": gt.lon if gt is not None else None,
+            "gt_alt": gt.alt if gt is not None else None,
+        }
+
     def _build_processor(self) -> FlightProcessor:
         repo = MagicMock()
         streamer = MagicMock()
diff --git a/tests/e2e/test_harness_smoke.py b/tests/e2e/test_harness_smoke.py
index 60cd8dd..1eb5396 100644
--- a/tests/e2e/test_harness_smoke.py
+++ b/tests/e2e/test_harness_smoke.py
@@ -64,3 +64,64 @@ async def test_harness_max_frames_larger_than_dataset_is_harmless():
     harness = E2EHarness(adapter, max_frames=100)
     result = await harness.run()
     assert result.num_frames_submitted == 4
+
+
+@pytest.mark.asyncio
+async def test_harness_trace_path_none_does_not_emit(tmp_path, monkeypatch):
+    # No trace_path → no file created, no perf cost from serialisation.
+    # Run from tmp_path so a stray relative-path write would land there;
+    # otherwise the emptiness check below could never fail.
+    monkeypatch.chdir(tmp_path)
+    adapter = SyntheticAdapter(num_frames=3, fps=5.0)
+    harness = E2EHarness(adapter)
+    await harness.run()
+    # Sanity: tmp_path stays empty (harness never touches it).
+    assert list(tmp_path.iterdir()) == []
+
+
+@pytest.mark.asyncio
+async def test_harness_trace_path_writes_jsonl_with_expected_fields(tmp_path):
+    import json
+
+    trace = tmp_path / "run.jsonl"
+    adapter = SyntheticAdapter(num_frames=4, fps=5.0)
+    harness = E2EHarness(adapter, trace_path=trace)
+    await harness.run()
+
+    assert trace.is_file()
+    lines = trace.read_text(encoding="utf-8").splitlines()
+    # One JSON record per submitted frame
+    assert len(lines) == 4
+
+    first = json.loads(lines[0])
+    # Contract: every record carries these keys so downstream tooling can
+    # depend on them across datasets.
+    expected_keys = {
+        "frame_idx",
+        "timestamp_ns",
+        "vo_success",
+        "alignment_success",
+        "tracking_state",
+        "confidence",
+        "eskf_initialized",
+        "eskf_position_enu",  # list[3] or None when not initialized
+        "eskf_pos_sigma_m",  # float or None
+        "estimate_lat",  # float or None
+        "estimate_lon",
+        "gt_lat",
+        "gt_lon",
+        "gt_alt",
+    }
+    missing = expected_keys - set(first.keys())
+    assert not missing, f"trace record missing keys: {missing}"
+
+
+@pytest.mark.asyncio
+async def test_harness_trace_truncates_with_max_frames(tmp_path):
+    # When max_frames caps iteration, trace file has exactly that many lines.
+    trace = tmp_path / "run.jsonl"
+    adapter = SyntheticAdapter(num_frames=10, fps=5.0)
+    harness = E2EHarness(adapter, max_frames=3, trace_path=trace)
+    await harness.run()
+    lines = trace.read_text(encoding="utf-8").splitlines()
+    assert len(lines) == 3