Files
gps-denied-onboard/tests/e2e/test_harness_smoke.py
T
Yuzviak a05381ade2 feat(testing): per-frame JSONL trace in E2EHarness
Opt-in trace_path parameter dumps one JSON record per processed frame
with the fields diagnostics need:

  frame_idx, timestamp_ns, vo_success, alignment_success,
  tracking_state, confidence,
  eskf_initialized, eskf_position_enu (or None), eskf_pos_sigma_m,
  estimate_lat/lon, gt_lat/lon/alt

No perf cost when trace_path is None. File is rotated per run — safe to
point at /tmp/foo.jsonl for ad-hoc debugging.

First real run on EuRoC MH_01 (100 frames) immediately exposes the
concrete divergence: vo_success=0/100 (VO never engages on EuRoC
grayscale imagery with current SP+LG adapter), eskf_initialized=0/100,
alignment_success=77/100 (satellite-fallback path fires). Diagnosis
that was hidden behind a single "ATE=10.9 km" number is now machine-
readable per frame.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 14:29:34 +03:00

125 lines
4.4 KiB
Python

"""Harness smoke test: synthetic adapter → FlightProcessor → sink → metrics.
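The synthetic adapter produces a straight-line trajectory; we only check that
the harness runs to completion, submits every input frame, and captures one
ground-truth row per submitted frame. The number of estimates is not pinned,
because the product may skip frames during warm-up.
Correctness of VO on synthetic data is out of scope — that's unit-test territory.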
"""
import pytest
from gps_denied.testing.datasets.synthetic import SyntheticAdapter
from gps_denied.testing.harness import E2EHarness, HarnessResult
@pytest.mark.asyncio
async def test_harness_processes_every_frame():
    """Run the harness over five synthetic frames and sanity-check the result."""
    frame_count = 5
    source = SyntheticAdapter(num_frames=frame_count, fps=5.0)
    outcome: HarnessResult = await E2EHarness(source).run()

    assert isinstance(outcome, HarnessResult)
    assert outcome.num_frames_submitted == frame_count
    # The product may emit an estimate for every frame or skip some during
    # warm-up, so this smoke check only requires a non-negative count.
    assert outcome.num_estimates >= 0
    # Ground truth: one row per submitted frame, three components per row.
    gt_shape = outcome.ground_truth.shape
    assert gt_shape[0] == frame_count
    assert gt_shape[1] == 3
@pytest.mark.asyncio
async def test_harness_captures_ground_truth_as_enu():
    """Check the eastward span of the captured ground truth for a 10 m/s run."""
    source = SyntheticAdapter(num_frames=3, fps=5.0, speed_m_s=10.0)
    outcome = await E2EHarness(source).run()

    # Three frames at 5 fps span 0.4 s; starting at the origin and moving east
    # at 10 m/s, that is ~4 m of east displacement.
    # Ground-truth rows are ordered by frame index; column 0 is read as east.
    gt = outcome.ground_truth
    end_east = gt[-1, 0]
    start_east = gt[0, 0]
    travelled_east = end_east - start_east
    # 0.5 m of slack absorbs the lat/lon → ENU conversion approximation.
    assert abs(travelled_east - 4.0) < 0.5
@pytest.mark.asyncio
async def test_harness_max_frames_truncates_iteration():
    """A max_frames cap below the dataset size stops the run at the cap."""
    cap = 3
    # The adapter offers 10 frames, but the harness is limited to `cap`.
    source = SyntheticAdapter(num_frames=10, fps=5.0)
    outcome = await E2EHarness(source, max_frames=cap).run()

    assert outcome.num_frames_submitted == cap
    # Ground truth is truncated the same way, so downstream metrics see
    # arrays of matching length.
    assert outcome.ground_truth.shape[0] == cap
@pytest.mark.asyncio
async def test_harness_max_frames_none_runs_full():
    """An explicit max_frames=None imposes no limit on the run."""
    total = 4
    source = SyntheticAdapter(num_frames=total, fps=5.0)
    # Passing None explicitly should behave like omitting the parameter.
    outcome = await E2EHarness(source, max_frames=None).run()
    assert outcome.num_frames_submitted == total
@pytest.mark.asyncio
async def test_harness_max_frames_larger_than_dataset_is_harmless():
    """A max_frames cap above the dataset size does not over-extend the run."""
    total = 4
    source = SyntheticAdapter(num_frames=total, fps=5.0)
    # The cap (100) exceeds what the adapter provides; only the frames that
    # actually exist should be submitted.
    outcome = await E2EHarness(source, max_frames=100).run()
    assert outcome.num_frames_submitted == total
@pytest.mark.asyncio
async def test_harness_trace_path_none_does_not_emit(tmp_path):
    """Run without a trace_path and confirm the pytest temp dir stays empty."""
    # No trace_path is supplied: no trace file should be created and no
    # serialisation cost incurred.
    source = SyntheticAdapter(num_frames=3, fps=5.0)
    await E2EHarness(source).run()
    # Sanity check only: the harness is never handed tmp_path, so the
    # directory must still be empty afterwards.
    leftovers = list(tmp_path.iterdir())
    assert leftovers == []
@pytest.mark.asyncio
async def test_harness_trace_path_writes_jsonl_with_expected_fields(tmp_path):
    """A trace_path run writes one JSON record per frame with the agreed keys.

    Every line of the trace file is parsed and checked, not just the first,
    because the contract is that each record carries the full key set.
    """
    import json

    trace = tmp_path / "run.jsonl"
    adapter = SyntheticAdapter(num_frames=4, fps=5.0)
    harness = E2EHarness(adapter, trace_path=trace)
    await harness.run()

    assert trace.is_file()
    # Read with an explicit encoding so the result does not depend on the
    # platform's locale default.
    lines = trace.read_text(encoding="utf-8").splitlines()
    # One JSON record per submitted frame
    assert len(lines) == 4

    # Contract: every record carries these keys so downstream tooling can
    # depend on them across datasets.
    expected_keys = {
        "frame_idx",
        "timestamp_ns",
        "vo_success",
        "alignment_success",
        "tracking_state",
        "confidence",
        "eskf_initialized",
        "eskf_position_enu",  # list[3] or None when not initialized
        "eskf_pos_sigma_m",  # float or None
        "estimate_lat",  # float or None
        "estimate_lon",
        "gt_lat",
        "gt_lon",
        "gt_alt",
    }
    for record_idx, raw_line in enumerate(lines):
        record = json.loads(raw_line)
        missing = expected_keys - set(record.keys())
        assert not missing, (
            f"trace record missing keys: {missing} (record {record_idx})"
        )
@pytest.mark.asyncio
async def test_harness_trace_truncates_with_max_frames(tmp_path):
    """With max_frames set, the trace file has exactly that many lines."""
    cap = 3
    trace = tmp_path / "run.jsonl"
    source = SyntheticAdapter(num_frames=10, fps=5.0)
    await E2EHarness(source, max_frames=cap, trace_path=trace).run()

    # One line per processed frame, so the cap bounds the line count too.
    record_lines = trace.read_text().splitlines()
    assert len(record_lines) == cap