From d42e6e546cdafee0540a4a347a74fecb1f02f823 Mon Sep 17 00:00:00 2001 From: Yuzviak Date: Thu, 16 Apr 2026 21:59:34 +0300 Subject: [PATCH] test(e2e): add MARSLVIGAdapter (rotary, RTK, raw IMU) Co-Authored-By: Claude Opus 4.7 (1M context) --- src/gps_denied/testing/datasets/mars_lvig.py | 86 ++++++++++++++++++++ tests/e2e/test_mars_lvig_adapter.py | 64 +++++++++++++++ 2 files changed, 150 insertions(+) create mode 100644 src/gps_denied/testing/datasets/mars_lvig.py create mode 100644 tests/e2e/test_mars_lvig_adapter.py diff --git a/src/gps_denied/testing/datasets/mars_lvig.py b/src/gps_denied/testing/datasets/mars_lvig.py new file mode 100644 index 0000000..fc5230d --- /dev/null +++ b/src/gps_denied/testing/datasets/mars_lvig.py @@ -0,0 +1,86 @@ +"""Adapter for MARS-LVIG sequences — reads pre-extracted images/ + imu.csv + gnss.csv.""" + +from __future__ import annotations + +import csv +from pathlib import Path +from typing import Iterator + +from gps_denied.testing.datasets.base import ( + DatasetAdapter, + DatasetCapabilities, + DatasetFrame, + DatasetIMU, + DatasetNotAvailableError, + DatasetPose, + PlatformClass, +) + + +class MARSLVIGAdapter(DatasetAdapter): + """Reads one MARS-LVIG sequence that has been pre-extracted from its ROS bag. + + Expected layout under `root/`: + images/{timestamp_ns}.jpg + imu.csv — timestamp_ns, ax, ay, az, gx, gy, gz + gnss.csv — timestamp_ns, lat, lon, alt + """ + + def __init__(self, root: Path) -> None: + self._root = Path(root) + self._img_dir = self._root / "images" + self._imu_csv = self._root / "imu.csv" + self._gnss_csv = self._root / "gnss.csv" + missing = [ + str(p) for p in (self._img_dir, self._imu_csv, self._gnss_csv) + if not p.exists() + ] + if missing: + raise DatasetNotAvailableError( + f"MARS-LVIG sequence incomplete at {self._root}; missing: {missing}. " + "Pre-extract the ROS bag into the expected layout first." + ) + + @property + def name(self) -> str: + return f"mars_lvig:{self._root.name}" + + @property + def capabilities(self) -> DatasetCapabilities: + return DatasetCapabilities( + has_raw_imu=True, + has_rtk_gt=True, # RTK + DJI L1 reference + has_loop_closures=False, + platform_class=PlatformClass.ROTARY, + ) + + def iter_frames(self) -> Iterator[DatasetFrame]: + for idx, img in enumerate(sorted(self._img_dir.glob("*.jpg"))): + ts_ns = int(img.stem) + yield DatasetFrame( + frame_idx=idx, + timestamp_ns=ts_ns, + image_path=str(img), + ) + + def iter_imu(self) -> Iterator[DatasetIMU]: + with self._imu_csv.open() as fh: + reader = csv.DictReader(fh) + for row in reader: + yield DatasetIMU( + timestamp_ns=int(row["timestamp_ns"]), + accel=(float(row["ax"]), float(row["ay"]), float(row["az"])), + gyro=(float(row["gx"]), float(row["gy"]), float(row["gz"])), + ) + + def iter_ground_truth(self) -> Iterator[DatasetPose]: + with self._gnss_csv.open() as fh: + reader = csv.DictReader(fh) + for row in reader: + yield DatasetPose( + timestamp_ns=int(row["timestamp_ns"]), + lat=float(row["lat"]), + lon=float(row["lon"]), + alt=float(row["alt"]), + qx=0.0, qy=0.0, qz=0.0, qw=1.0, + ) diff --git a/tests/e2e/test_mars_lvig_adapter.py b/tests/e2e/test_mars_lvig_adapter.py new file mode 100644 index 0000000..040f248 --- /dev/null +++ b/tests/e2e/test_mars_lvig_adapter.py @@ -0,0 +1,64 @@ +"""MARSLVIGAdapter — reads pre-extracted per-topic files from a sequence dir.""" + +from pathlib import Path + +import pytest + +from gps_denied.testing.datasets.base import ( + DatasetNotAvailableError, + PlatformClass, +) +from gps_denied.testing.datasets.mars_lvig import MARSLVIGAdapter + + +@pytest.fixture +def fake_mars_seq(tmp_path: Path) -> Path: + seq = tmp_path / "HKairport_01" + (seq / "images").mkdir(parents=True) + for ts in (100_000_000, 200_000_000): + (seq / "images" / f"{ts}.jpg").write_bytes(b"\xff\xd8\xff\xd9") + (seq / "imu.csv").write_text( + "timestamp_ns,ax,ay,az,gx,gy,gz\n" + "100000000,0.0,0.0,-9.81,0.0,0.0,0.0\n" + "200000000,0.0,0.0,-9.81,0.0,0.0,0.0\n" + ) + (seq / "gnss.csv").write_text( + "timestamp_ns,lat,lon,alt\n" + "100000000,22.32,114.17,100.0\n" + "200000000,22.3201,114.1702,100.0\n" + ) + return seq + + +def test_raises_when_missing(tmp_path: Path): + with pytest.raises(DatasetNotAvailableError): + MARSLVIGAdapter(tmp_path / "nope") + + +def test_capabilities(fake_mars_seq: Path): + adapter = MARSLVIGAdapter(fake_mars_seq) + cap = adapter.capabilities + assert cap.has_raw_imu is True + assert cap.has_rtk_gt is True + assert cap.platform_class == PlatformClass.ROTARY + + +def test_iter_frames(fake_mars_seq: Path): + adapter = MARSLVIGAdapter(fake_mars_seq) + frames = list(adapter.iter_frames()) + assert len(frames) == 2 + assert frames[0].timestamp_ns == 100_000_000 + + +def test_iter_imu(fake_mars_seq: Path): + adapter = MARSLVIGAdapter(fake_mars_seq) + imu = list(adapter.iter_imu()) + assert len(imu) == 2 + assert imu[0].accel == (0.0, 0.0, -9.81) + + +def test_iter_ground_truth(fake_mars_seq: Path): + adapter = MARSLVIGAdapter(fake_mars_seq) + gt = list(adapter.iter_ground_truth()) + assert len(gt) == 2 + assert gt[0].lat == pytest.approx(22.32)