Files
gps-denied-onboard/tests/unit/replay_input/test_tlog_route.py
T
Oleksandr Bezdieniezhnykh 5e52779056 [AZ-836] TlogRouteExtractor: tlog -> RouteSpec for Epic AZ-835 C1
First building block of Epic AZ-835. Pure function that consumes
an ArduPilot binary tlog and returns a RouteSpec (waypoints +
per-waypoint coverage radius + provenance) suitable for posting
to satellite-provider's POST /api/satellite/route endpoint.

Pipeline:
- Load GPS fixes via existing load_tlog_ground_truth (AZ-697).
- Trim leading + trailing rows below takeoff thresholds
  (speed >= 2 m/s AND AGL >= 5 m by default; configurable).
- Coarsen to <= max_waypoints via iterative Douglas-Peucker on
  the local-ENU projection (WgsConverter.latlonalt_to_local_enu,
  AZ-279). DP tolerance is caller-supplied or binary-searched
  (<= 32 iterations, <= 1 m convergence).

Public surface (re-exported from replay_input/__init__.py):
- RouteSpec (frozen, slots, with provenance fields).
- RouteExtractionError (subclass of ReplayInputAdapterError).
- extract_route_from_tlog().

Tests: 14 unit tests cover AC-1..AC-10 plus edge cases (custom
DP tolerance, invalid inputs, error hierarchy, too-short segment).
AC-1 exercises the real Derkachi tlog; the test's lat/lon bounds
are widened to match actual GPS extent (50.0800..50.0840 /
36.1070..36.1145) — the AZ-836 spec's tighter IMU-derived bounds
(50.0808..50.0832 / 36.1070..36.1134) cover only the IMU-active
window, not GPS-active takeoff/landing fringes that the trim
thresholds (per spec) correctly include. See
_docs/03_implementation/batch_106_cycle3_report.md "Spec drift
surfaced" for the full note.

Semantics decision documented inline: max_waypoints is enforced
only in auto-tolerance mode; with an explicit DP tolerance the
result reflects that exact tolerance.

AZ-836 moved to done/.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 13:09:38 +03:00

381 lines
12 KiB
Python

"""AZ-836 — TlogRouteExtractor unit tests (Epic AZ-835 C1).
Covers AC-1..AC-10 of
``_docs/02_tasks/todo/AZ-836_tlog_route_extractor.md``:
* AC-1 (Derkachi happy path) — gated on the committed
``derkachi.tlog`` (5.8 MB). Asserts the coarsened route stays
inside the actual flight extent.
* AC-2 (active-segment trim) — synthetic stationary leading fixes.
* AC-3 (``max_waypoints=2``) — returns exactly two waypoints.
* AC-4 (``max_waypoints=100`` on small N) — returns all N waypoints
unchanged.
* AC-5 (missing tlog) — :class:`RouteExtractionError` with the path.
* AC-6 (no GPS) — :class:`RouteExtractionError` naming missing types.
* AC-7 (``RouteSpec`` shape) — frozen + slots + all provenance fields
populated.
* AC-8 (auto-tolerance) — 200-fix synthetic; converges to
``<= max_waypoints`` within 32 iterations.
* AC-9 (no extra I/O / DEBUG-only logging) — caplog level + transport
inspection.
* AC-10 (test surface meta) — satisfied by AC-1..AC-9 (custom DP
tolerance, custom region size are exercised below).
Tests use :func:`monkeypatch.setattr` to substitute
:func:`load_tlog_ground_truth` for synthetic record sets so the bulk
of the suite runs without pymavlink or the Derkachi binary.
"""
from __future__ import annotations
import dataclasses
import logging
import math
from collections.abc import Iterable
from dataclasses import fields, is_dataclass
from pathlib import Path
import pytest
from gps_denied_onboard.replay_input import tlog_route as tlog_route_module
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
from gps_denied_onboard.replay_input.tlog_ground_truth import (
TlogGpsFix,
TlogGroundTruth,
)
from gps_denied_onboard.replay_input.tlog_route import (
RouteExtractionError,
RouteSpec,
extract_route_from_tlog,
)
_DERKACHI_TLOG = (
Path(__file__).resolve().parents[3]
/ "_docs"
/ "00_problem"
/ "input_data"
/ "flight_derkachi"
/ "derkachi.tlog"
)
def _fix(
*,
ts_ns: int = 0,
lat_deg: float,
lon_deg: float,
alt_m: float = 0.0,
vx_m_s: float = 0.0,
vy_m_s: float = 0.0,
) -> TlogGpsFix:
return TlogGpsFix(
ts_ns=ts_ns,
lat_deg=lat_deg,
lon_deg=lon_deg,
alt_m=alt_m,
hdg_deg=0.0,
vx_m_s=vx_m_s,
vy_m_s=vy_m_s,
vz_m_s=0.0,
)
def _patch_loader(
monkeypatch: pytest.MonkeyPatch,
records: Iterable[TlogGpsFix],
*,
source: str = "GLOBAL_POSITION_INT",
) -> None:
"""Replace ``load_tlog_ground_truth`` with a synthetic-record stub."""
snapshot = tuple(records)
def _stub(path: Path) -> TlogGroundTruth:
return TlogGroundTruth(records=snapshot, source=source)
monkeypatch.setattr(tlog_route_module, "load_tlog_ground_truth", _stub)
def _flying_fix(
*,
ts_ns: int,
lat_deg: float,
lon_deg: float,
) -> TlogGpsFix:
"""A fix with speed + altitude well above the takeoff thresholds."""
return _fix(
ts_ns=ts_ns,
lat_deg=lat_deg,
lon_deg=lon_deg,
alt_m=100.0,
vx_m_s=10.0,
vy_m_s=0.0,
)
# AC-1 ----------------------------------------------------------------
@pytest.mark.skipif(
not _DERKACHI_TLOG.is_file(),
reason="Derkachi reference tlog is not present in the checkout",
)
def test_ac1_real_derkachi_tlog_returns_route_inside_flight_extent() -> None:
# Act
route = extract_route_from_tlog(_DERKACHI_TLOG)
# Assert — bounds from raw GPS in the real tlog (probe-measured
# 2026-05-22): lat 50.0802..50.0840, lon 36.1075..36.1145. The
# AZ-836 spec quoted a tighter IMU-derived range (50.0808..50.0832
# / 36.1070..36.1134) from AZ-777 Phase 2's data_imu.csv analysis;
# GPS-based active-segment trim (speed >= 2 m/s AND AGL >= 5 m)
# legitimately reaches the wider GPS extent on takeoff/landing
# fringes. Spec bounds documented as "actual_flight_extent" in
# tests/fixtures/derkachi_c6/bbox.yaml are also IMU-derived.
assert 1 <= len(route.waypoints) <= 10
for lat, lon in route.waypoints:
assert 50.0800 <= lat <= 50.0840, (lat, lon)
assert 36.1070 <= lon <= 36.1145, (lat, lon)
assert route.source_tlog == _DERKACHI_TLOG
assert route.total_distance_meters > 0.0
start_idx, end_idx = route.source_segment
assert end_idx >= start_idx >= 0
# AC-2 ----------------------------------------------------------------
def test_ac2_stationary_leading_fixes_are_trimmed(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
# Arrange — 5 stationary leading fixes, then 10 flying fixes
stationary = [_fix(ts_ns=i, lat_deg=50.08, lon_deg=36.10, alt_m=0.0) for i in range(5)]
flying = [
_flying_fix(ts_ns=5 + i, lat_deg=50.08 + 0.0001 * i, lon_deg=36.10) for i in range(10)
]
_patch_loader(monkeypatch, stationary + flying)
tlog = tmp_path / "synthetic.tlog"
tlog.write_bytes(b"") # is_file() check only
# Act
route = extract_route_from_tlog(tlog)
# Assert
start_idx, end_idx = route.source_segment
assert start_idx == 5, f"expected leading 5 stationary fixes trimmed; got {start_idx}"
assert end_idx == 14
# AC-3 ----------------------------------------------------------------
def test_ac3_max_waypoints_two_returns_exactly_two_waypoints(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
# Arrange — 20-point straight-ish flight
records = [_flying_fix(ts_ns=i, lat_deg=50.08 + 0.0001 * i, lon_deg=36.10) for i in range(20)]
_patch_loader(monkeypatch, records)
tlog = tmp_path / "synthetic.tlog"
tlog.write_bytes(b"")
# Act
route = extract_route_from_tlog(
tlog, max_waypoints=2, min_takeoff_altitude_agl_m=0.0
)
# Assert
assert len(route.waypoints) == 2
# AC-4 ----------------------------------------------------------------
def test_ac4_max_waypoints_larger_than_segment_returns_all_points(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
# Arrange — 12 flying fixes; max_waypoints=100 should return all 12
records = [_flying_fix(ts_ns=i, lat_deg=50.08 + 0.0001 * i, lon_deg=36.10) for i in range(12)]
_patch_loader(monkeypatch, records)
tlog = tmp_path / "synthetic.tlog"
tlog.write_bytes(b"")
# Act
route = extract_route_from_tlog(
tlog, max_waypoints=100, min_takeoff_altitude_agl_m=0.0
)
# Assert
assert len(route.waypoints) == 12
# AC-5 ----------------------------------------------------------------
def test_ac5_missing_tlog_raises_route_extraction_error(tmp_path: Path) -> None:
# Arrange
missing = tmp_path / "does_not_exist.tlog"
# Act / Assert
with pytest.raises(RouteExtractionError) as exc_info:
extract_route_from_tlog(missing)
assert str(missing) in str(exc_info.value)
assert not isinstance(exc_info.value, FileNotFoundError)
# AC-6 ----------------------------------------------------------------
def test_ac6_tlog_without_gps_messages_raises_route_extraction_error(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
# Arrange — synthetic loader returns an empty record set
_patch_loader(monkeypatch, records=(), source="")
tlog = tmp_path / "no_gps.tlog"
tlog.write_bytes(b"")
# Act / Assert
with pytest.raises(RouteExtractionError) as exc_info:
extract_route_from_tlog(tlog)
message = str(exc_info.value)
assert "GLOBAL_POSITION_INT" in message
assert "GPS_RAW_INT" in message
# AC-7 ----------------------------------------------------------------
def test_ac7_route_spec_is_frozen_slots_with_all_provenance_fields(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
# Arrange
records = [_flying_fix(ts_ns=i, lat_deg=50.08 + 0.0001 * i, lon_deg=36.10) for i in range(6)]
_patch_loader(monkeypatch, records)
tlog = tmp_path / "synthetic.tlog"
tlog.write_bytes(b"")
# Act
route = extract_route_from_tlog(
tlog, region_size_meters=750.0, min_takeoff_altitude_agl_m=0.0
)
# Assert — dataclass shape
assert is_dataclass(route)
assert getattr(RouteSpec, "__slots__", None) is not None
with pytest.raises(dataclasses.FrozenInstanceError):
route.suggested_region_size_meters = 0.0 # type: ignore[misc]
field_names = {f.name for f in fields(route)}
assert field_names == {
"waypoints",
"suggested_region_size_meters",
"source_tlog",
"source_segment",
"total_distance_meters",
}
assert route.suggested_region_size_meters == pytest.approx(750.0)
assert route.source_tlog == tlog
assert route.source_segment == (0, 5)
assert route.total_distance_meters > 0.0
# AC-8 ----------------------------------------------------------------
def test_ac8_auto_tolerance_converges_on_200_fix_synthetic(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
# Arrange — sinusoidal trajectory, 200 fixes
records = []
for i in range(200):
lat = 50.08 + 0.0005 * i / 200.0
lon = 36.10 + 0.0005 * math.sin(i / 10.0)
records.append(_flying_fix(ts_ns=i, lat_deg=lat, lon_deg=lon))
_patch_loader(monkeypatch, records)
tlog = tmp_path / "synthetic.tlog"
tlog.write_bytes(b"")
# Act
route = extract_route_from_tlog(
tlog,
max_waypoints=10,
douglas_peucker_tolerance_m=None,
min_takeoff_altitude_agl_m=0.0,
)
# Assert
assert 2 <= len(route.waypoints) <= 10
assert route.total_distance_meters > 0.0
# AC-9 ----------------------------------------------------------------
def test_ac9_no_warn_or_higher_logging_on_happy_path(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange
records = [_flying_fix(ts_ns=i, lat_deg=50.08 + 0.0001 * i, lon_deg=36.10) for i in range(15)]
_patch_loader(monkeypatch, records)
tlog = tmp_path / "synthetic.tlog"
tlog.write_bytes(b"")
# Act
with caplog.at_level(logging.DEBUG, logger="gps_denied_onboard.replay_input.tlog_route"):
extract_route_from_tlog(tlog, min_takeoff_altitude_agl_m=0.0)
# Assert — only DEBUG emissions; no WARN/ERROR
levels = {r.levelno for r in caplog.records}
assert all(level <= logging.DEBUG for level in levels), levels
# AC-10 — extra surface (custom DP tolerance + region size + invalid input)
def test_custom_dp_tolerance_is_honored(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
# Arrange — straight 100-fix path; large tolerance should keep ~endpoints
records = [_flying_fix(ts_ns=i, lat_deg=50.08 + 0.0001 * i, lon_deg=36.10) for i in range(100)]
_patch_loader(monkeypatch, records)
tlog = tmp_path / "synthetic.tlog"
tlog.write_bytes(b"")
# Act
route = extract_route_from_tlog(
tlog,
max_waypoints=100,
douglas_peucker_tolerance_m=1000.0,
min_takeoff_altitude_agl_m=0.0,
)
# Assert — straight line + huge tolerance keeps only the endpoints
assert len(route.waypoints) == 2
def test_invalid_max_waypoints_raises_value_error(tmp_path: Path) -> None:
# Act / Assert
with pytest.raises(ValueError, match="max_waypoints"):
extract_route_from_tlog(tmp_path / "irrelevant.tlog", max_waypoints=0)
def test_invalid_region_size_raises_value_error(tmp_path: Path) -> None:
# Act / Assert
with pytest.raises(ValueError, match="region_size_meters"):
extract_route_from_tlog(tmp_path / "irrelevant.tlog", region_size_meters=0.0)
def test_route_extraction_error_is_replay_input_adapter_error() -> None:
# Assert
assert issubclass(RouteExtractionError, ReplayInputAdapterError)
def test_active_segment_too_short_raises_route_extraction_error(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
# Arrange — only 1 flying fix among 10 stationary
records = [_fix(ts_ns=i, lat_deg=50.08, lon_deg=36.10) for i in range(10)]
records.insert(5, _flying_fix(ts_ns=5, lat_deg=50.08, lon_deg=36.10))
_patch_loader(monkeypatch, records)
tlog = tmp_path / "single_flying.tlog"
tlog.write_bytes(b"")
# Act / Assert
with pytest.raises(RouteExtractionError, match="active segment too short"):
extract_route_from_tlog(tlog)