mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 17:11:14 +00:00
5744ff65ac
- Add pytestmark = [pytest.mark.<category>] to all 23 root test files and 14 e2e test files - Marker distribution: 22 unit, 7 integration, 1 blackbox, 1 sitl, 5 e2e + 2 e2e integration - Add import pytest to test_models.py, test_download.py, test_synthetic_adapter.py (were missing) - Convert test_sitl_integration.py's bare pytestmark to list form preserving skipif guard - Union of all 5 markers = 298/298 = 100% coverage; 216 tests pass with --strict-markers
127 lines
4.5 KiB
Python
127 lines
4.5 KiB
Python
"""Harness smoke test: synthetic adapter → FlightProcessor → sink → metrics.
|
|
|
|
The synthetic adapter produces a straight-line trajectory; we only check that
|
|
the harness runs to completion and produces one estimate per input frame.
|
|
Correctness of VO on synthetic is out of scope — that's unit-test territory.
|
|
"""
|
|
|
|
import pytest
|
|
|
|
pytestmark = [pytest.mark.integration]
|
|
|
|
from gps_denied.testing.datasets.synthetic import SyntheticAdapter
|
|
from gps_denied.testing.harness import E2EHarness, HarnessResult
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_harness_processes_every_frame():
|
|
adapter = SyntheticAdapter(num_frames=5, fps=5.0)
|
|
harness = E2EHarness(adapter)
|
|
result: HarnessResult = await harness.run()
|
|
assert isinstance(result, HarnessResult)
|
|
assert result.num_frames_submitted == 5
|
|
# Product may emit estimates for every frame or skip some during warm-up.
|
|
# Smoke assertion: we got SOMETHING back.
|
|
assert result.num_estimates >= 0
|
|
assert result.ground_truth.shape[0] == 5
|
|
assert result.ground_truth.shape[1] == 3
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_harness_captures_ground_truth_as_enu():
|
|
adapter = SyntheticAdapter(num_frames=3, fps=5.0, speed_m_s=10.0)
|
|
harness = E2EHarness(adapter)
|
|
result = await harness.run()
|
|
# Starting at origin, 10 m/s east, at t=0.4s we expect ~4m east
|
|
# GT array ordered by frame index
|
|
east_disp = result.ground_truth[-1, 0] - result.ground_truth[0, 0]
|
|
# Allow 5% tolerance for the lat/lon → ENU conversion approximation
|
|
assert abs(east_disp - 4.0) < 0.5
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_harness_max_frames_truncates_iteration():
|
|
# Adapter says 10 frames; harness with max_frames=3 should stop at 3.
|
|
adapter = SyntheticAdapter(num_frames=10, fps=5.0)
|
|
harness = E2EHarness(adapter, max_frames=3)
|
|
result = await harness.run()
|
|
assert result.num_frames_submitted == 3
|
|
# GT aligned to the same truncation so downstream metrics match lengths
|
|
assert result.ground_truth.shape[0] == 3
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_harness_max_frames_none_runs_full():
|
|
# Explicit None = no limit (same as omitting the parameter).
|
|
adapter = SyntheticAdapter(num_frames=4, fps=5.0)
|
|
harness = E2EHarness(adapter, max_frames=None)
|
|
result = await harness.run()
|
|
assert result.num_frames_submitted == 4
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_harness_max_frames_larger_than_dataset_is_harmless():
|
|
# Limit above dataset size should not over-extend.
|
|
adapter = SyntheticAdapter(num_frames=4, fps=5.0)
|
|
harness = E2EHarness(adapter, max_frames=100)
|
|
result = await harness.run()
|
|
assert result.num_frames_submitted == 4
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_harness_trace_path_none_does_not_emit(tmp_path):
|
|
# No trace_path → no file created, no perf cost from serialisation.
|
|
adapter = SyntheticAdapter(num_frames=3, fps=5.0)
|
|
harness = E2EHarness(adapter)
|
|
await harness.run()
|
|
# Sanity: tmp_path stays empty (harness never touches it).
|
|
assert list(tmp_path.iterdir()) == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_harness_trace_path_writes_jsonl_with_expected_fields(tmp_path):
|
|
import json
|
|
|
|
trace = tmp_path / "run.jsonl"
|
|
adapter = SyntheticAdapter(num_frames=4, fps=5.0)
|
|
harness = E2EHarness(adapter, trace_path=trace)
|
|
await harness.run()
|
|
|
|
assert trace.is_file()
|
|
lines = trace.read_text().splitlines()
|
|
# One JSON record per submitted frame
|
|
assert len(lines) == 4
|
|
|
|
first = json.loads(lines[0])
|
|
# Contract: every record carries these keys so downstream tooling can
|
|
# depend on them across datasets.
|
|
expected_keys = {
|
|
"frame_idx",
|
|
"timestamp_ns",
|
|
"vo_success",
|
|
"alignment_success",
|
|
"tracking_state",
|
|
"confidence",
|
|
"eskf_initialized",
|
|
"eskf_position_enu", # list[3] or None when not initialized
|
|
"eskf_pos_sigma_m", # float or None
|
|
"estimate_lat", # float or None
|
|
"estimate_lon",
|
|
"gt_lat",
|
|
"gt_lon",
|
|
"gt_alt",
|
|
}
|
|
missing = expected_keys - set(first.keys())
|
|
assert not missing, f"trace record missing keys: {missing}"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_harness_trace_truncates_with_max_frames(tmp_path):
|
|
# When max_frames caps iteration, trace file has exactly that many lines.
|
|
trace = tmp_path / "run.jsonl"
|
|
adapter = SyntheticAdapter(num_frames=10, fps=5.0)
|
|
harness = E2EHarness(adapter, max_frames=3, trace_path=trace)
|
|
await harness.run()
|
|
lines = trace.read_text().splitlines()
|
|
assert len(lines) == 3
|