"""Synthesize a pymavlink ``.tlog`` from the Derkachi ``data_imu.csv``. The Derkachi fixture (``_docs/00_problem/input_data/flight_derkachi/``) ships ``flight_derkachi.mp4`` + ``data_imu.csv`` only — the original pymavlink tlog is not in-repo (it was the source the CSV was *exported* from). The AZ-404 E2E test runs ``gps-denied-replay`` which expects a tlog input, so we round-trip the CSV back to a tlog here. Output schema (per ``tlog_replay_adapter._REQUIRED_MESSAGE_GROUPS``): * ``SCALED_IMU2`` — one per CSV row (xacc/yacc/zacc/xgyro/ygyro/zgyro/ xmag/ymag/zmag fields map 1:1). * ``GPS_RAW_INT`` — one per CSV row, derived from ``GLOBAL_POSITION_INT.lat / .lon / .alt / .vx / .vy``. ``fix_type`` is held at ``GPS_FIX_TYPE_3D_FIX`` (3) for every row — the CSV is post-flight cleaned and contains valid GPS throughout. * ``ATTITUDE`` — one per CSV row. roll/pitch are synthesized as zero (the camera is mechanically locked nadir per ``camera_info.md``); yaw is derived from ``GLOBAL_POSITION_INT.hdg`` (cdeg → rad). * ``HEARTBEAT`` — one per second so the tlog-replay adapter's pre-scan find the type quickly. The tlog binary format is the pymavlink convention: ``<8-byte big-endian timestamp microseconds>``, repeated. The C8 ``TlogReplayFcAdapter`` consumes it via ``mavutil.mavlink_connection(path, mavlink_version="2.0")``. The synthesizer is deterministic: identical CSV → identical bytes. The conftest caches the output path next to the CSV so repeat runs short-circuit when the cache is up-to-date. """ from __future__ import annotations import csv import math import struct from pathlib import Path from typing import Final from pymavlink.dialects.v20 import ardupilotmega as mavlink __all__ = [ "SOURCE_COMPONENT", "SOURCE_SYSTEM", "synthesize_tlog", ] SOURCE_SYSTEM: Final[int] = 1 # vehicle id (any non-zero stable integer) SOURCE_COMPONENT: Final[int] = mavlink.MAV_COMP_ID_AUTOPILOT1 _HEARTBEAT_PERIOD_S: Final[float] = 1.0 # tlog timestamp epoch — pymavlink stores absolute microseconds. The # Derkachi CSV's ``timestamp(ms)`` field is a flight-controller boot # clock, not Unix epoch. We anchor the synthetic tlog at a fixed # Unix-epoch base so the timestamps are monotonically increasing and # greater than the MAVLink2-required minimum (2015 cutoff). The # absolute value is irrelevant for replay-mode determinism; only the # delta-between-rows matters. _TLOG_BASE_TIMESTAMP_US: Final[int] = 1_700_000_000_000_000 # 2023-11-14 22:13:20 UTC def synthesize_tlog(csv_path: Path, tlog_path: Path) -> int: """Write a tlog reproduced from ``csv_path`` to ``tlog_path``. Returns the number of bytes written. Overwrites ``tlog_path`` atomically (write to ``.tmp``, fsync, rename). The output schema satisfies ``TlogReplayFcAdapter``'s pre-scan requirements per ``c8_fc_adapter/tlog_replay_adapter.py``: ``RAW_IMU`` or ``SCALED_IMU2`` + ``ATTITUDE`` + ``GPS_RAW_INT`` or ``GPS2_RAW`` + ``HEARTBEAT``. """ tmp_path = tlog_path.with_suffix(tlog_path.suffix + ".tmp") mav = mavlink.MAVLink( file=None, srcSystem=SOURCE_SYSTEM, srcComponent=SOURCE_COMPONENT, ) bytes_written = 0 next_heartbeat_t_s = 0.0 with csv_path.open(newline="") as fp, tmp_path.open("wb") as out: reader = csv.DictReader(fp) for row in reader: t_s = float(row["Time"]) ts_us = _TLOG_BASE_TIMESTAMP_US + int(t_s * 1_000_000) time_boot_ms = int(float(row["timestamp(ms)"])) # SCALED_IMU2 ---------------------------------------------------- imu2 = mav.scaled_imu2_encode( time_boot_ms=time_boot_ms, xacc=int(float(row["SCALED_IMU2.xacc"])), yacc=int(float(row["SCALED_IMU2.yacc"])), zacc=int(float(row["SCALED_IMU2.zacc"])), xgyro=int(float(row["SCALED_IMU2.xgyro"])), ygyro=int(float(row["SCALED_IMU2.ygyro"])), zgyro=int(float(row["SCALED_IMU2.zgyro"])), xmag=int(float(row["SCALED_IMU2.xmag"])), ymag=int(float(row["SCALED_IMU2.ymag"])), zmag=int(float(row["SCALED_IMU2.zmag"])), ) bytes_written += _write_record(out, ts_us, imu2.pack(mav)) # ATTITUDE ------------------------------------------------------- yaw_cdeg = float(row["GLOBAL_POSITION_INT.hdg"]) yaw_rad = math.radians(yaw_cdeg / 100.0) if yaw_cdeg > 0 else 0.0 attitude = mav.attitude_encode( time_boot_ms=time_boot_ms, roll=0.0, pitch=0.0, yaw=yaw_rad, rollspeed=0.0, pitchspeed=0.0, yawspeed=0.0, ) bytes_written += _write_record(out, ts_us, attitude.pack(mav)) # GPS_RAW_INT ---------------------------------------------------- gps = mav.gps_raw_int_encode( time_usec=ts_us, fix_type=mavlink.GPS_FIX_TYPE_3D_FIX, lat=int(float(row["GLOBAL_POSITION_INT.lat"])), lon=int(float(row["GLOBAL_POSITION_INT.lon"])), alt=int(float(row["GLOBAL_POSITION_INT.alt"])), eph=100, epv=200, vel=int( math.hypot( float(row["GLOBAL_POSITION_INT.vx"]), float(row["GLOBAL_POSITION_INT.vy"]), ) ), cog=int(yaw_cdeg) if yaw_cdeg > 0 else 0, satellites_visible=12, ) bytes_written += _write_record(out, ts_us, gps.pack(mav)) # HEARTBEAT (1 Hz) ----------------------------------------------- if t_s >= next_heartbeat_t_s: heartbeat = mav.heartbeat_encode( type=mavlink.MAV_TYPE_FIXED_WING, autopilot=mavlink.MAV_AUTOPILOT_ARDUPILOTMEGA, base_mode=mavlink.MAV_MODE_FLAG_AUTO_ENABLED, custom_mode=10, # AUTO mode for ArduPlane system_status=mavlink.MAV_STATE_ACTIVE, ) bytes_written += _write_record(out, ts_us, heartbeat.pack(mav)) next_heartbeat_t_s = t_s + _HEARTBEAT_PERIOD_S out.flush() # fsync the temp file so the rename below is durable on power loss. # OSError here is rare; we want it to surface, not be swallowed. import os as _os _os.fsync(out.fileno()) tmp_path.replace(tlog_path) return bytes_written def _write_record(out, ts_us: int, payload: bytes) -> int: """Write one tlog record (8B big-endian timestamp + MAVLink frame).""" header = struct.pack(">Q", ts_us) out.write(header) out.write(payload) return len(header) + len(payload)