"""AZ-697 — Direct binary-tlog GPS-truth extractor. Covers AC-1..AC-5 of ``_docs/02_tasks/todo/AZ-697_tlog_ground_truth_extractor.md``: * AC-1 (Happy path on real tlog) — gated on the committed ``derkachi.tlog`` (5.8 MB binary). When present, asserts ≥ 100 records inside the Derkachi geofence. * AC-2 (Empty GPS gracefully) — synthetic source emits no messages. * AC-3 (GPS_RAW_INT fallback / mixed precedence). * AC-4 (mypy --strict) — project-wide strict via ``pyproject.toml [tool.mypy] strict = true``. A scoped smoke test re-runs mypy on the module to catch regressions before CI. * AC-5 (Helper move snapshot) — covered by ``tests/unit/helpers/test_gps_compare.py``. All tests use a synthetic ``source_factory`` for determinism (no disk IO, no real pymavlink). Style: every test follows the Arrange / Act / Assert pattern. """ from __future__ import annotations import logging import math import subprocess import sys from collections.abc import Iterator from pathlib import Path from typing import Any import pytest from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError from gps_denied_onboard.replay_input.tlog_ground_truth import ( TlogGpsFix, TlogGroundTruth, load_tlog_ground_truth, ) # --------------------------------------------------------------------- # Synthetic-source fixture helpers class _FakeMavlinkMessage: """Stand-in for a pymavlink message object. Mirrors the duck-typed surface ``load_tlog_ground_truth`` uses: ``get_type()`` returns the message-type string and ``_timestamp`` is the Unix-second float that pymavlink's mavlogfile populates on every ``recv_match()`` return. """ def __init__(self, msg_type: str, timestamp_s: float, **fields: Any) -> None: self._msg_type = msg_type self._timestamp = timestamp_s for name, value in fields.items(): setattr(self, name, value) def get_type(self) -> str: return self._msg_type class _FakeMavlinkSource: """Stand-in for pymavlink's ``mavutil.mavlink_connection`` return. ``recv_match`` walks an in-memory message queue, filtering by the ``type`` argument. Returns ``None`` once the queue is exhausted — matching mavlogfile's end-of-stream behaviour. """ def __init__(self, messages: list[_FakeMavlinkMessage]) -> None: self._iter: Iterator[_FakeMavlinkMessage] = iter(messages) self.closed = False def recv_match( self, type: list[str] | str | None = None, blocking: bool = False, ) -> _FakeMavlinkMessage | None: wanted = {type} if isinstance(type, str) else set(type or []) for msg in self._iter: if not wanted or msg.get_type() in wanted: return msg return None def close(self) -> None: self.closed = True def _global_position_int( *, ts_s: float, lat_e7: int, lon_e7: int, alt_mm: int, hdg_cdeg: int = 0, vx_cm_s: int = 0, vy_cm_s: int = 0, vz_cm_s: int = 0, ) -> _FakeMavlinkMessage: return _FakeMavlinkMessage( "GLOBAL_POSITION_INT", ts_s, lat=lat_e7, lon=lon_e7, alt=alt_mm, hdg=hdg_cdeg, vx=vx_cm_s, vy=vy_cm_s, vz=vz_cm_s, ) def _gps_raw_int( *, ts_s: float, lat_e7: int, lon_e7: int, alt_mm: int, vel_cm_s: int = 0, cog_cdeg: int = 0, ) -> _FakeMavlinkMessage: return _FakeMavlinkMessage( "GPS_RAW_INT", ts_s, lat=lat_e7, lon=lon_e7, alt=alt_mm, vel=vel_cm_s, cog=cog_cdeg, ) def _factory_from(messages: list[_FakeMavlinkMessage]) -> Any: """Return a ``source_factory`` that yields the given message list.""" def _factory(_path: str) -> _FakeMavlinkSource: return _FakeMavlinkSource(messages) return _factory # --------------------------------------------------------------------- # AC-1: Happy path on real tlog (gated on the committed binary) def _real_derkachi_tlog() -> Path: return ( Path(__file__).resolve().parents[3] / "_docs" / "00_problem" / "input_data" / "flight_derkachi" / "derkachi.tlog" ) @pytest.mark.skipif( not _real_derkachi_tlog().is_file(), reason=( "Real derkachi.tlog binary not present (gitignored 5.8 MB blob). " "Place it at _docs/00_problem/input_data/flight_derkachi/derkachi.tlog " "to exercise AC-1." ), ) def test_ac1_real_derkachi_tlog_has_geofence_records() -> None: # Arrange tlog = _real_derkachi_tlog() # Act truth = load_tlog_ground_truth(tlog) # Assert assert len(truth.records) > 100, ( f"expected > 100 GPS records, got {len(truth.records)}" ) assert truth.source in {"GLOBAL_POSITION_INT", "GPS_RAW_INT"} # Derkachi geofence: lat ≈ 50.08, lon ≈ 36.11 (Kharkiv suburb). lats = [r.lat_deg for r in truth.records if r.lat_deg != 0.0] lons = [r.lon_deg for r in truth.records if r.lon_deg != 0.0] assert lats, "every GPS record has lat == 0; tlog likely malformed" median_lat = sorted(lats)[len(lats) // 2] median_lon = sorted(lons)[len(lons) // 2] assert 49.9 <= median_lat <= 50.3, f"median lat {median_lat} outside Derkachi band" assert 35.9 <= median_lon <= 36.4, f"median lon {median_lon} outside Derkachi band" # --------------------------------------------------------------------- # AC-2: Empty GPS gracefully (no messages → empty records + WARN log) def test_ac2_empty_tlog_returns_empty_records_and_warns( tmp_path: Path, caplog: pytest.LogCaptureFixture, ) -> None: # Arrange fake_tlog = tmp_path / "empty.tlog" fake_tlog.write_bytes(b"") factory = _factory_from([]) # Act with caplog.at_level( logging.WARNING, logger="gps_denied_onboard.replay_input.tlog_ground_truth", ): truth = load_tlog_ground_truth(fake_tlog, source_factory=factory) # Assert assert truth.records == () assert truth.source == "" assert any( "contains no GLOBAL_POSITION_INT or GPS_RAW_INT" in rec.message for rec in caplog.records ) def test_missing_file_raises(tmp_path: Path) -> None: # Arrange missing = tmp_path / "absent.tlog" # Act / Assert with pytest.raises(ReplayInputAdapterError, match="tlog file not found"): load_tlog_ground_truth(missing) # --------------------------------------------------------------------- # AC-3: Fallback precedence (GPS_RAW_INT only; mixed source) def test_ac3_gps_raw_int_fallback_when_no_global_position_int(tmp_path: Path) -> None: # Arrange fake_tlog = tmp_path / "raw_only.tlog" fake_tlog.write_bytes(b"") messages = [ _gps_raw_int( ts_s=1_700_000_000.000, lat_e7=500_800_000, # 50.08 lon_e7=361_100_000, # 36.11 alt_mm=200_000, # 200 m MSL vel_cm_s=1500, # 15 m/s cog_cdeg=9000, # 90° (east) ), _gps_raw_int( ts_s=1_700_000_000.200, lat_e7=500_801_000, lon_e7=361_101_000, alt_mm=200_500, vel_cm_s=1500, cog_cdeg=9000, ), ] factory = _factory_from(messages) # Act truth = load_tlog_ground_truth(fake_tlog, source_factory=factory) # Assert assert truth.source == "GPS_RAW_INT" assert len(truth.records) == 2 first = truth.records[0] assert first.lat_deg == pytest.approx(50.08, abs=1e-6) assert first.lon_deg == pytest.approx(36.11, abs=1e-6) assert first.alt_m == pytest.approx(200.0, abs=1e-3) # cog=90° (east) ⇒ vx (north) = 0, vy (east) = 15 m/s, vz = 0. assert first.vx_m_s == pytest.approx(0.0, abs=1e-9) assert first.vy_m_s == pytest.approx(15.0, abs=1e-9) assert first.vz_m_s == 0.0 assert first.hdg_deg == pytest.approx(90.0, abs=1e-6) assert first.ts_ns == 1_700_000_000_000_000_000 def test_ac3_mixed_messages_prefer_global_position_int(tmp_path: Path) -> None: # Arrange fake_tlog = tmp_path / "mixed.tlog" fake_tlog.write_bytes(b"") messages = [ _gps_raw_int( ts_s=1.0, lat_e7=400_000_000, # 40.00 — distinguishable from GPI rows lon_e7=300_000_000, # 30.00 alt_mm=100_000, cog_cdeg=0, ), _global_position_int( ts_s=1.0, lat_e7=500_800_000, # 50.08 lon_e7=361_100_000, # 36.11 alt_mm=200_000, hdg_cdeg=4500, # 45° vx_cm_s=500, vy_cm_s=-500, vz_cm_s=100, ), _gps_raw_int( ts_s=2.0, lat_e7=400_001_000, lon_e7=300_001_000, alt_mm=100_500, cog_cdeg=0, ), _global_position_int( ts_s=2.0, lat_e7=500_801_000, lon_e7=361_101_000, alt_mm=200_500, hdg_cdeg=4500, vx_cm_s=500, vy_cm_s=-500, vz_cm_s=100, ), ] factory = _factory_from(messages) # Act truth = load_tlog_ground_truth(fake_tlog, source_factory=factory) # Assert — GLOBAL_POSITION_INT wins; GPS_RAW_INT rows are ignored. assert truth.source == "GLOBAL_POSITION_INT" assert len(truth.records) == 2 for rec in truth.records: assert rec.lat_deg == pytest.approx(50.08, abs=1e-3) assert rec.lon_deg == pytest.approx(36.11, abs=1e-3) assert rec.hdg_deg == pytest.approx(45.0, abs=1e-6) assert rec.vx_m_s == pytest.approx(5.0, abs=1e-9) assert rec.vy_m_s == pytest.approx(-5.0, abs=1e-9) assert rec.vz_m_s == pytest.approx(1.0, abs=1e-9) # --------------------------------------------------------------------- # Unit conversions (MAVLink integer encodings) def test_global_position_int_unit_conversions(tmp_path: Path) -> None: # Arrange fake_tlog = tmp_path / "units.tlog" fake_tlog.write_bytes(b"") messages = [ _global_position_int( ts_s=10.5, lat_e7=123_456_789, # 12.3456789 deg lon_e7=-98_765_432, # -9.8765432 deg alt_mm=12_345, # 12.345 m hdg_cdeg=18_000, # 180.00 deg vx_cm_s=-2_500, # -25.00 m/s vy_cm_s=0, vz_cm_s=50, # 0.5 m/s ) ] factory = _factory_from(messages) # Act truth = load_tlog_ground_truth(fake_tlog, source_factory=factory) # Assert assert truth.source == "GLOBAL_POSITION_INT" (rec,) = truth.records assert rec.lat_deg == pytest.approx(12.345_678_9, abs=1e-9) assert rec.lon_deg == pytest.approx(-9.876_543_2, abs=1e-9) assert rec.alt_m == pytest.approx(12.345, abs=1e-9) assert rec.hdg_deg == pytest.approx(180.0, abs=1e-9) assert rec.vx_m_s == pytest.approx(-25.0, abs=1e-9) assert rec.vy_m_s == 0.0 assert rec.vz_m_s == pytest.approx(0.5, abs=1e-9) assert rec.ts_ns == int(10.5 * 1_000_000_000) def test_gps_raw_int_cog_to_ned_decomposition(tmp_path: Path) -> None: # Arrange fake_tlog = tmp_path / "cog.tlog" fake_tlog.write_bytes(b"") messages = [ _gps_raw_int( ts_s=0.0, lat_e7=0, lon_e7=0, alt_mm=0, vel_cm_s=2000, # 20 m/s cog_cdeg=4500, # 45° (NE) ) ] factory = _factory_from(messages) # Act truth = load_tlog_ground_truth(fake_tlog, source_factory=factory) # Assert — 20 m/s @ 45° ⇒ vx = vy = 20/sqrt(2) ≈ 14.142. (rec,) = truth.records expected = 20.0 * math.cos(math.radians(45.0)) assert rec.vx_m_s == pytest.approx(expected, abs=1e-9) assert rec.vy_m_s == pytest.approx(expected, abs=1e-9) assert rec.vz_m_s == 0.0 assert rec.hdg_deg == pytest.approx(45.0, abs=1e-9) def test_missing_timestamp_raises(tmp_path: Path) -> None: # Arrange fake_tlog = tmp_path / "no_ts.tlog" fake_tlog.write_bytes(b"") class _MsgNoTimestamp: def get_type(self) -> str: return "GLOBAL_POSITION_INT" factory = _factory_from([_MsgNoTimestamp()]) # type: ignore[list-item] # Act / Assert with pytest.raises( ReplayInputAdapterError, match="missing _timestamp attribute" ): load_tlog_ground_truth(fake_tlog, source_factory=factory) def test_source_is_closed_after_load(tmp_path: Path) -> None: # Arrange fake_tlog = tmp_path / "close.tlog" fake_tlog.write_bytes(b"") captured: dict[str, _FakeMavlinkSource] = {} def _factory(_path: str) -> _FakeMavlinkSource: src = _FakeMavlinkSource([]) captured["src"] = src return src # Act load_tlog_ground_truth(fake_tlog, source_factory=_factory) # Assert assert captured["src"].closed is True # --------------------------------------------------------------------- # DTO surface def test_tlog_ground_truth_is_frozen() -> None: # Arrange truth = TlogGroundTruth(records=(), source="") # Act / Assert with pytest.raises((AttributeError, TypeError)): truth.source = "GLOBAL_POSITION_INT" # type: ignore[misc] def test_tlog_gps_fix_is_frozen() -> None: # Arrange fix = TlogGpsFix( ts_ns=0, lat_deg=0.0, lon_deg=0.0, alt_m=0.0, hdg_deg=0.0, vx_m_s=0.0, vy_m_s=0.0, vz_m_s=0.0, ) # Act / Assert with pytest.raises((AttributeError, TypeError)): fix.lat_deg = 1.0 # type: ignore[misc] # --------------------------------------------------------------------- # AC-4: mypy --strict scoped to the new module def test_ac4_mypy_strict_clean(tmp_path: Path) -> None: """``mypy --strict`` on the AZ-697 module reports zero errors. The project is strict-by-default via ``pyproject.toml [tool.mypy]``; this scoped run catches regressions in CI without waiting for the full-suite mypy pass. """ # Arrange module_path = ( Path(__file__).resolve().parents[2].parent / "src" / "gps_denied_onboard" / "replay_input" / "tlog_ground_truth.py" ) # Act result = subprocess.run( [sys.executable, "-m", "mypy", "--strict", str(module_path)], capture_output=True, text=True, timeout=120, ) # Assert assert result.returncode == 0, ( f"mypy --strict reported errors:\n" f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}" )