Files
Oleksandr Bezdieniezhnykh 64d961f60c [AZ-697] [AZ-702] tlog GPS truth + KHP20S30 factory calibration
Batch 98 (cycle 2) — first two PBIs of epic AZ-696 (real-flight
validation harness):

AZ-697: direct binary-tlog GPS-truth extractor

- New src/gps_denied_onboard/replay_input/tlog_ground_truth.py reads
  GLOBAL_POSITION_INT (with GPS_RAW_INT fallback) from a binary
  ArduPilot tlog via pymavlink.mavutil and returns a frozen+slotted
  TlogGroundTruth DTO with per-record ts_ns / lat_deg / lon_deg / alt_m
  / hdg_deg / vx_m_s / vy_m_s / vz_m_s.
- Promoted l2_horizontal_m + match_percentage + GroundTruthRow from
  tests/e2e/replay/_helpers.py into the new production module
  src/gps_denied_onboard/helpers/gps_compare.py. The e2e helper now
  re-exports the same objects (identity, not copies) so existing test
  imports continue working untouched.
- tests/e2e/replay/conftest.py prefers the real derkachi.tlog when
  present, falls back to the CSV synth path otherwise.
- 22 new unit tests cover AC-1..AC-5 (mypy --strict subprocess test
  included). All passing.

AZ-702: Topotek KHP20S30 factory-sheet camera calibration

- New _docs/00_problem/input_data/flight_derkachi/khp20s30_factory.json:
  fx = fy = 4644.444, cx = 960, cy = 540, HFOV ~ 23.3 deg, VFOV ~ 13.2
  deg, computed from the published 8.5 mm focal length + 1/2.8" sensor
  + 1920x1080 capture at lowest zoom step. Distortion zeroed,
  body_to_camera_se3 = identity with nadir convention. Acquisition
  method explicitly recorded as factory_sheet so downstream code can
  expect higher residual error than a lab calibration.
- _docs/00_problem/input_data/flight_derkachi/camera_info.md updated
  to document the assumptions, expected residual error window, and
  conftest pick-up rule.
- tests/e2e/replay/conftest.py::_calibration_path() prefers
  khp20s30_factory.json when present, falls back to adti26.json.
- 9 new unit tests cover AC-1..AC-4 (schema, intrinsics traceback,
  doc reference, conftest pick-up). All passing.

Test run: 45 new tests, all passing. Full-suite gate deferred to
Step 16 (after the last batch in cycle 2 per the implement skill).

Adjacent note (not fixed in this batch, recorded in the batch report):
auto_sync.py has the same redundant pymavlink type:ignore + a few
numpy/cv2 mypy --strict issues. None on this batch's path.

Refs: _docs/03_implementation/batch_98_cycle2_report.md
Refs: _docs/02_tasks/done/AZ-697_tlog_ground_truth_extractor.md
Refs: _docs/02_tasks/done/AZ-702_khp20s30_calibration.md

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 16:09:03 +03:00

134 lines
4.6 KiB
Python

"""Helpers shared by the AZ-404 E2E replay tests.
The numerical kernels (``l2_horizontal_m``, ``match_percentage``,
``GroundTruthRow``) moved into production code at
:mod:`gps_denied_onboard.helpers.gps_compare` in AZ-697; they're
re-exported here so existing import sites stay stable.
* :func:`parse_jsonl` — read the ``JsonlReplaySink`` output into a list
of dicts with one entry per emit.
* :class:`CapturingMavlinkTransport` — test-only ``MavlinkTransport``
impl that records every ``write`` so AC-4b can compare the byte
streams produced by ``compose_root(config_live)`` vs.
``compose_root(config_replay)``.
* :func:`load_ground_truth_csv` — the IMU CSV's ``GLOBAL_POSITION_INT``
columns ARE the AC-3 reference (the original tlog's GPS rows
exported to CSV); this helper materialises them. Retained for the
CSV-only fallback path; the real-tlog branch uses
:func:`gps_denied_onboard.replay_input.load_tlog_ground_truth`
instead.
All functions are pure / deterministic and stay safely importable on
dev macOS without ``RUN_REPLAY_E2E``; the regular regression suite
calls them via the unit-level helper test in this module's sibling
``test_helpers.py``.
"""
from __future__ import annotations
import csv
import json
from pathlib import Path
from typing import Any
from gps_denied_onboard.helpers.gps_compare import (
GroundTruthRow,
l2_horizontal_m,
match_percentage,
)
__all__ = [
"CapturingMavlinkTransport",
"GroundTruthRow",
"l2_horizontal_m",
"load_ground_truth_csv",
"match_percentage",
"parse_jsonl",
]
def parse_jsonl(path: Path) -> list[dict[str, Any]]:
"""Return one dict per line of a JsonlReplaySink output file.
Empty trailing lines are tolerated (orjson always terminates with
``\\n`` so the last newline is followed by ``""``); other empty
lines indicate a corrupt file and surface as a JSON decode error.
"""
records: list[dict[str, Any]] = []
with path.open(encoding="utf-8") as fp:
for lineno, line in enumerate(fp, start=1):
stripped = line.rstrip("\n")
if not stripped:
continue
try:
records.append(json.loads(stripped))
except json.JSONDecodeError as exc:
raise AssertionError(
f"line {lineno} in {path} is not valid JSON: {exc.msg!r}"
) from exc
return records
def load_ground_truth_csv(csv_path: Path) -> list[GroundTruthRow]:
"""Load the Derkachi IMU CSV's GPS rows as ground truth.
The original ``flight_derkachi.tlog``'s ``GLOBAL_POSITION_INT``
messages were exported to ``data_imu.csv``; the ``lat / lon /
alt`` columns are degrees * 1e7 / metres * 1e3 (mavlink integer
encoding), so we divide accordingly.
"""
rows: list[GroundTruthRow] = []
with csv_path.open(newline="") as fp:
reader = csv.DictReader(fp)
for r in reader:
rows.append(
GroundTruthRow(
t_s=float(r["Time"]),
lat_deg=float(r["GLOBAL_POSITION_INT.lat"]) / 1e7,
lon_deg=float(r["GLOBAL_POSITION_INT.lon"]) / 1e7,
alt_m=float(r["GLOBAL_POSITION_INT.alt"]) / 1e3,
)
)
return rows
class CapturingMavlinkTransport:
"""Test-only :class:`MavlinkTransport` that records every write.
Used by AZ-404 AC-4b: capture the byte streams produced by
``compose_root(config_live).c8.emit_external_position(out)`` and
``compose_root(config_replay).c8.emit_external_position(out)`` to
assert byte-identity per replay protocol Invariant 5.
NOTE: AC-4b is currently SKIPPED (blocked on AZ-558 — the C8
encoders still bypass the ``MavlinkTransport`` seam by calling
``mav.*_send`` directly). This class is in place so the test
fixture is ready the moment AZ-558 lands.
"""
def __init__(self) -> None:
self._chunks: list[bytes] = []
self._closed = False
def write(self, payload: bytes) -> int:
if self._closed:
raise RuntimeError("CapturingMavlinkTransport.write after close")
self._chunks.append(bytes(payload))
return len(payload)
def bytes_written(self) -> int:
return sum(len(c) for c in self._chunks)
def close(self) -> None:
self._closed = True
@property
def captured_payloads(self) -> tuple[bytes, ...]:
"""Tuple of every payload passed to :meth:`write`, in order."""
return tuple(self._chunks)
@property
def captured_concat(self) -> bytes:
"""All captured payloads concatenated — the wire-byte stream."""
return b"".join(self._chunks)