test(e2e): add MARSLVIGAdapter (rotary, RTK, raw IMU)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Yuzviak
2026-04-16 21:59:34 +03:00
parent 58192f5d73
commit d42e6e546c
2 changed files with 150 additions and 0 deletions
@@ -0,0 +1,86 @@
"""Adapter for MARS-LVIG sequences — reads pre-extracted images/ + imu.csv + gnss.csv."""
from __future__ import annotations
import csv
from pathlib import Path
from typing import Iterator
from gps_denied.testing.datasets.base import (
DatasetAdapter,
DatasetCapabilities,
DatasetFrame,
DatasetIMU,
DatasetNotAvailableError,
DatasetPose,
PlatformClass,
)
class MARSLVIGAdapter(DatasetAdapter):
"""Reads one MARS-LVIG sequence that has been pre-extracted from its ROS bag.
Expected layout under `root/`:
images/{timestamp_ns}.jpg
imu.csv — timestamp_ns, ax, ay, az, gx, gy, gz
gnss.csv — timestamp_ns, lat, lon, alt
"""
def __init__(self, root: Path) -> None:
self._root = Path(root)
self._img_dir = self._root / "images"
self._imu_csv = self._root / "imu.csv"
self._gnss_csv = self._root / "gnss.csv"
missing = [
str(p) for p in (self._img_dir, self._imu_csv, self._gnss_csv)
if not p.exists()
]
if missing:
raise DatasetNotAvailableError(
f"MARS-LVIG sequence incomplete at {self._root}; missing: {missing}. "
"Pre-extract the ROS bag into the expected layout first."
)
@property
def name(self) -> str:
return f"mars_lvig:{self._root.name}"
@property
def capabilities(self) -> DatasetCapabilities:
return DatasetCapabilities(
has_raw_imu=True,
has_rtk_gt=True, # RTK + DJI L1 reference
has_loop_closures=False,
platform_class=PlatformClass.ROTARY,
)
def iter_frames(self) -> Iterator[DatasetFrame]:
for idx, img in enumerate(sorted(self._img_dir.glob("*.jpg"))):
ts_ns = int(img.stem)
yield DatasetFrame(
frame_idx=idx,
timestamp_ns=ts_ns,
image_path=str(img),
)
def iter_imu(self) -> Iterator[DatasetIMU]:
with self._imu_csv.open() as fh:
reader = csv.DictReader(fh)
for row in reader:
yield DatasetIMU(
timestamp_ns=int(row["timestamp_ns"]),
accel=(float(row["ax"]), float(row["ay"]), float(row["az"])),
gyro=(float(row["gx"]), float(row["gy"]), float(row["gz"])),
)
def iter_ground_truth(self) -> Iterator[DatasetPose]:
with self._gnss_csv.open() as fh:
reader = csv.DictReader(fh)
for row in reader:
yield DatasetPose(
timestamp_ns=int(row["timestamp_ns"]),
lat=float(row["lat"]),
lon=float(row["lon"]),
alt=float(row["alt"]),
qx=0.0, qy=0.0, qz=0.0, qw=1.0,
)