Files
gps-denied-onboard/e2e/runner/helpers/mavproxy_tlog_reader.py
T
Oleksandr Bezdieniezhnykh a644debdb7 [AZ-416] [AZ-417] [AZ-419] Test batch 72: FT-P-09 AP/iNav + FT-P-11 cold start
- AZ-416 (FT-P-09-AP): fills mavproxy_tlog_reader.iter_messages with
  pymavlink body (AZ-406 surface kept); adds ap_contract_evaluator
  covering AC-1 (signing handshake <=5s), AC-2 (GPS_INPUT >=4.5 Hz),
  AC-3 (EK3_SRC1_POSXY=3), AC-4 (GPS_RAW_INT health >=80%); scenario
  forces fc_adapter=ardupilot.
- AZ-417 (FT-P-09-iNav): msp_frame_observer covering AC-2 (MSP rate)
  and AC-3 (fix_type/provider/numSat); scenario forces
  fc_adapter=inav.
- AZ-419 (FT-P-11): cold_start_evaluator covering AC-1 (operator
  manifest origin), AC-2 (FC EKF fallback), AC-3 (no-origin abort),
  AC-4 (bounded-delta conflict, ADR-010 Principle #11 amended);
  scenario parametrized on origin_source plus dedicated no-origin
  abort scenario.
- All scenarios skip-gated on upstream frame_source_replay /
  imu_replay / fdr_reader / sitl_observer extensions.
- +67 unit tests; full e2e unit suite: 460 passed.
- K=3 cumulative review fired: PASS for batches 70-72.

See _docs/03_implementation/batch_72_report.md,
_docs/03_implementation/reviews/batch_72_review.md,
_docs/03_implementation/cumulative_review_batches_70-72_cycle1_report.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 07:49:17 +03:00

95 lines
3.1 KiB
Python

"""Parse `.tlog` files emitted by `mavproxy-listener`.
`.tlog` is the standard MAVLink dialect dump format: each message is a
6-byte unix-microsecond timestamp followed by the wire bytes of the MAVLink
frame. pymavlink ships `mavlogfile` which knows how to iterate this.
This module exposes a small typed wrapper so per-scenario tests can:
1. Filter for the message types they care about.
2. Compute summary statistics (count per type, message-rate Hz, ratio
of signed vs unsigned messages for NFT-SEC-03).
3. Attach the source `.tlog` path to the evidence bundler.
AZ-416 (FT-P-09-AP) owns the pymavlink-backed body; AZ-406 committed to
the public surface.
Public-boundary discipline: does NOT import any ``src/gps_denied_onboard``
symbol.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator
from pymavlink import mavutil
@dataclass(frozen=True)
class TlogMessage:
timestamp_us: int
msg_type: str
signed: bool
fields: dict[str, object]
def iter_messages(tlog_path: Path) -> Iterator[TlogMessage]:
"""Iterate `.tlog` messages oldest-first.
Uses ``pymavlink.mavutil.mavlink_connection`` in tlog-file mode.
Each yielded ``TlogMessage`` carries:
* ``timestamp_us`` — unix microseconds, as recorded by mavproxy
(pymavlink exposes this as ``msg._timestamp`` in seconds-float).
* ``msg_type`` — message name (e.g. ``"GPS_INPUT"``, ``"GPS_RAW_INT"``).
* ``signed`` — True iff the wire frame carried a MAVLink 2.0
signature block (`msg.get_signed()` on pymavlink ≥2.4).
* ``fields`` — dict of field name → value, via ``msg.to_dict()``
minus the ``mavpackettype`` key.
Bad / unparsable frames are skipped (mavlogfile returns ``None`` or
raises internally) but EOF closes the iterator cleanly.
"""
if not tlog_path.exists():
raise FileNotFoundError(f"tlog not found: {tlog_path}")
conn = mavutil.mavlink_connection(str(tlog_path))
try:
while True:
msg = conn.recv_match(blocking=False)
if msg is None:
break
msg_type = msg.get_type()
if msg_type == "BAD_DATA":
continue
try:
fields = msg.to_dict()
except Exception:
continue
fields.pop("mavpackettype", None)
ts_s = getattr(msg, "_timestamp", 0.0) or 0.0
try:
signed = bool(msg.get_signed())
except AttributeError:
signed = False
yield TlogMessage(
timestamp_us=int(ts_s * 1_000_000),
msg_type=msg_type,
signed=signed,
fields=fields,
)
finally:
try:
conn.close()
except Exception:
pass
def count_by_type(tlog_path: Path) -> dict[str, int]:
"""Return ``{msg_type: count}`` for every distinct message type."""
counts: dict[str, int] = {}
for msg in iter_messages(tlog_path):
counts[msg.msg_type] = counts.get(msg.msg_type, 0) + 1
return counts