"""Harness smoke test: synthetic adapter → FlightProcessor → sink → metrics. The synthetic adapter produces a straight-line trajectory; we only check that the harness runs to completion and produces one estimate per input frame. Correctness of VO on synthetic is out of scope — that's unit-test territory. """ import pytest from gps_denied.testing.datasets.synthetic import SyntheticAdapter from gps_denied.testing.harness import E2EHarness, HarnessResult @pytest.mark.asyncio async def test_harness_processes_every_frame(): adapter = SyntheticAdapter(num_frames=5, fps=5.0) harness = E2EHarness(adapter) result: HarnessResult = await harness.run() assert isinstance(result, HarnessResult) assert result.num_frames_submitted == 5 # Product may emit estimates for every frame or skip some during warm-up. # Smoke assertion: we got SOMETHING back. assert result.num_estimates >= 0 assert result.ground_truth.shape[0] == 5 assert result.ground_truth.shape[1] == 3 @pytest.mark.asyncio async def test_harness_captures_ground_truth_as_enu(): adapter = SyntheticAdapter(num_frames=3, fps=5.0, speed_m_s=10.0) harness = E2EHarness(adapter) result = await harness.run() # Starting at origin, 10 m/s east, at t=0.4s we expect ~4m east # GT array ordered by frame index east_disp = result.ground_truth[-1, 0] - result.ground_truth[0, 0] # Allow 5% tolerance for the lat/lon → ENU conversion approximation assert abs(east_disp - 4.0) < 0.5 @pytest.mark.asyncio async def test_harness_max_frames_truncates_iteration(): # Adapter says 10 frames; harness with max_frames=3 should stop at 3. adapter = SyntheticAdapter(num_frames=10, fps=5.0) harness = E2EHarness(adapter, max_frames=3) result = await harness.run() assert result.num_frames_submitted == 3 # GT aligned to the same truncation so downstream metrics match lengths assert result.ground_truth.shape[0] == 3 @pytest.mark.asyncio async def test_harness_max_frames_none_runs_full(): # Explicit None = no limit (same as omitting the parameter). adapter = SyntheticAdapter(num_frames=4, fps=5.0) harness = E2EHarness(adapter, max_frames=None) result = await harness.run() assert result.num_frames_submitted == 4 @pytest.mark.asyncio async def test_harness_max_frames_larger_than_dataset_is_harmless(): # Limit above dataset size should not over-extend. adapter = SyntheticAdapter(num_frames=4, fps=5.0) harness = E2EHarness(adapter, max_frames=100) result = await harness.run() assert result.num_frames_submitted == 4 @pytest.mark.asyncio async def test_harness_trace_path_none_does_not_emit(tmp_path): # No trace_path → no file created, no perf cost from serialisation. adapter = SyntheticAdapter(num_frames=3, fps=5.0) harness = E2EHarness(adapter) await harness.run() # Sanity: tmp_path stays empty (harness never touches it). assert list(tmp_path.iterdir()) == [] @pytest.mark.asyncio async def test_harness_trace_path_writes_jsonl_with_expected_fields(tmp_path): import json trace = tmp_path / "run.jsonl" adapter = SyntheticAdapter(num_frames=4, fps=5.0) harness = E2EHarness(adapter, trace_path=trace) await harness.run() assert trace.is_file() lines = trace.read_text().splitlines() # One JSON record per submitted frame assert len(lines) == 4 first = json.loads(lines[0]) # Contract: every record carries these keys so downstream tooling can # depend on them across datasets. expected_keys = { "frame_idx", "timestamp_ns", "vo_success", "alignment_success", "tracking_state", "confidence", "eskf_initialized", "eskf_position_enu", # list[3] or None when not initialized "eskf_pos_sigma_m", # float or None "estimate_lat", # float or None "estimate_lon", "gt_lat", "gt_lon", "gt_alt", } missing = expected_keys - set(first.keys()) assert not missing, f"trace record missing keys: {missing}" @pytest.mark.asyncio async def test_harness_trace_truncates_with_max_frames(tmp_path): # When max_frames caps iteration, trace file has exactly that many lines. trace = tmp_path / "run.jsonl" adapter = SyntheticAdapter(num_frames=10, fps=5.0) harness = E2EHarness(adapter, max_frames=3, trace_path=trace) await harness.run() lines = trace.read_text().splitlines() assert len(lines) == 3