"""Unit tests for ``runner.helpers.mavproxy_tlog_reader.iter_messages``. AZ-416 fills in the pymavlink-backed body; AZ-406 committed the public surface. These tests synthesise a tiny tlog on the fly so the parser can be exercised without needing a captured `.tlog` artifact. """ from __future__ import annotations import struct from pathlib import Path import pytest from pymavlink.dialects.v20 import ardupilotmega as mavlink from runner.helpers.mavproxy_tlog_reader import ( TlogMessage, count_by_type, iter_messages, ) _SRC_SYSTEM = 1 _SRC_COMPONENT = mavlink.MAV_COMP_ID_AUTOPILOT1 _BASE_TS_US = 1_700_000_000_000_000 def _write_tlog(tlog_path: Path, records: list[tuple[int, bytes]]) -> Path: """Write a synthetic tlog: ``[8B big-endian ts_us][raw frame]`` per record.""" with tlog_path.open("wb") as fh: for ts_us, payload in records: fh.write(struct.pack(">Q", ts_us)) fh.write(payload) return tlog_path def _make_mav() -> mavlink.MAVLink: return mavlink.MAVLink( file=None, srcSystem=_SRC_SYSTEM, srcComponent=_SRC_COMPONENT, ) def _heartbeat(mav: mavlink.MAVLink) -> bytes: return mav.heartbeat_encode( type=mavlink.MAV_TYPE_FIXED_WING, autopilot=mavlink.MAV_AUTOPILOT_ARDUPILOTMEGA, base_mode=mavlink.MAV_MODE_FLAG_AUTO_ENABLED, custom_mode=10, system_status=mavlink.MAV_STATE_ACTIVE, ).pack(mav) def _gps_raw_int(mav: mavlink.MAVLink, *, fix_type: int = 3, eph: int = 100) -> bytes: return mav.gps_raw_int_encode( time_usec=_BASE_TS_US, fix_type=fix_type, lat=487750000, lon=375940000, alt=280000, eph=eph, epv=200, vel=12000, cog=18000, satellites_visible=12, ).pack(mav) def _gps_input(mav: mavlink.MAVLink) -> bytes: return mav.gps_input_encode( time_usec=_BASE_TS_US, gps_id=0, ignore_flags=0, time_week_ms=0, time_week=0, fix_type=3, lat=487750000, lon=375940000, alt=280.0, hdop=1.0, vdop=2.0, vn=10.0, ve=5.0, vd=0.5, speed_accuracy=0.3, horiz_accuracy=1.0, vert_accuracy=2.0, satellites_visible=12, ).pack(mav) def test_iter_messages_raises_on_missing_file(tmp_path: Path) -> None: # Act / Assert with pytest.raises(FileNotFoundError, match="tlog not found"): list(iter_messages(tmp_path / "absent.tlog")) def test_iter_messages_yields_message_type_and_fields(tmp_path: Path) -> None: """A single heartbeat round-trips through iter_messages.""" # Arrange mav = _make_mav() tlog = _write_tlog(tmp_path / "single.tlog", [(_BASE_TS_US, _heartbeat(mav))]) # Act msgs = list(iter_messages(tlog)) # Assert assert len(msgs) == 1 m = msgs[0] assert isinstance(m, TlogMessage) assert m.msg_type == "HEARTBEAT" assert m.fields["autopilot"] == mavlink.MAV_AUTOPILOT_ARDUPILOTMEGA assert "mavpackettype" not in m.fields # excluded by the impl def test_iter_messages_preserves_order(tmp_path: Path) -> None: """Multiple records are yielded oldest-first.""" # Arrange mav = _make_mav() tlog = _write_tlog( tmp_path / "ordered.tlog", [ (_BASE_TS_US + 0, _heartbeat(mav)), (_BASE_TS_US + 1_000_000, _gps_raw_int(mav)), (_BASE_TS_US + 2_000_000, _gps_input(mav)), ], ) # Act types = [m.msg_type for m in iter_messages(tlog)] # Assert assert types == ["HEARTBEAT", "GPS_RAW_INT", "GPS_INPUT"] def test_iter_messages_timestamp_in_microseconds(tmp_path: Path) -> None: """``msg._timestamp`` is seconds; we expose microseconds.""" # Arrange mav = _make_mav() tlog = _write_tlog(tmp_path / "ts.tlog", [(_BASE_TS_US + 5_000_000, _heartbeat(mav))]) # Act msg = next(iter_messages(tlog)) # Assert — pymavlink rounds to its frame timestamp; tolerate ±1ms slop. assert abs(msg.timestamp_us - (_BASE_TS_US + 5_000_000)) <= 1_000 def test_iter_messages_signed_flag_default_false(tmp_path: Path) -> None: """Plain pymavlink-encoded frame is NOT signed → signed=False.""" # Arrange mav = _make_mav() tlog = _write_tlog(tmp_path / "u.tlog", [(_BASE_TS_US, _heartbeat(mav))]) # Act msg = next(iter_messages(tlog)) # Assert assert msg.signed is False def test_count_by_type_tallies_correctly(tmp_path: Path) -> None: """count_by_type runs iter_messages and aggregates the type counts.""" # Arrange mav = _make_mav() tlog = _write_tlog( tmp_path / "mixed.tlog", [ (_BASE_TS_US + 0, _heartbeat(mav)), (_BASE_TS_US + 1, _heartbeat(mav)), (_BASE_TS_US + 2, _gps_raw_int(mav)), ], ) # Act counts = count_by_type(tlog) # Assert assert counts["HEARTBEAT"] == 2 assert counts["GPS_RAW_INT"] == 1