diff --git a/src/gps_denied/testing/datasets/euroc.py b/src/gps_denied/testing/datasets/euroc.py new file mode 100644 index 0000000..48b1204 --- /dev/null +++ b/src/gps_denied/testing/datasets/euroc.py @@ -0,0 +1,115 @@ +"""Adapter for EuRoC MAV datasets (ETHZ ASL format).""" + +from __future__ import annotations + +import csv +from pathlib import Path +from typing import Iterator + +import numpy as np + +from gps_denied.testing.datasets.base import ( + DatasetAdapter, + DatasetCapabilities, + DatasetFrame, + DatasetIMU, + DatasetNotAvailableError, + DatasetPose, + PlatformClass, +) + + +EARTH_R = 6_378_137.0 +FIXED_ORIGIN_LAT = 49.0 +FIXED_ORIGIN_LON = 32.0 + + +class EuRoCAdapter(DatasetAdapter): + """Reads a single EuRoC sequence directory, e.g. `datasets/euroc/MH_01/`.""" + + def __init__(self, root: Path) -> None: + self._root = Path(root) + self._mav0 = self._root / "mav0" + if not self._mav0.is_dir(): + raise DatasetNotAvailableError( + f"EuRoC sequence not found at {self._root} " + f"(expected {self._root}/mav0/). " + "Run `python scripts/download_dataset.py euroc_mh01` first." + ) + + @property + def name(self) -> str: + return f"euroc:{self._root.name}" + + @property + def capabilities(self) -> DatasetCapabilities: + return DatasetCapabilities( + has_raw_imu=True, + has_rtk_gt=False, # Vicon, not RTK + has_loop_closures=False, + platform_class=PlatformClass.INDOOR, + ) + + def iter_frames(self) -> Iterator[DatasetFrame]: + cam_csv = self._mav0 / "cam0" / "data.csv" + img_dir = self._mav0 / "cam0" / "data" + with cam_csv.open() as fh: + reader = csv.reader(fh) + idx = 0 + for row in reader: + if not row or row[0].startswith("#"): + continue + ts_ns = int(row[0]) + filename = row[1].strip() + yield DatasetFrame( + frame_idx=idx, + timestamp_ns=ts_ns, + image_path=str(img_dir / filename), + ) + idx += 1 + + def iter_imu(self) -> Iterator[DatasetIMU]: + imu_csv = self._mav0 / "imu0" / "data.csv" + with imu_csv.open() as fh: + reader = csv.reader(fh) + for row in reader: + if not row or row[0].startswith("#"): + continue + ts_ns = int(row[0]) + wx, wy, wz = float(row[1]), float(row[2]), float(row[3]) + ax, ay, az = float(row[4]), float(row[5]), float(row[6]) + yield DatasetIMU( + timestamp_ns=ts_ns, + accel=(ax, ay, az), + gyro=(wx, wy, wz), + ) + + def iter_ground_truth(self) -> Iterator[DatasetPose]: + gt_csv = self._mav0 / "state_groundtruth_estimate0" / "data.csv" + origin_local: tuple[float, float, float] | None = None + with gt_csv.open() as fh: + reader = csv.reader(fh) + for row in reader: + if not row or row[0].startswith("#"): + continue + ts_ns = int(row[0]) + x, y, z = float(row[1]), float(row[2]), float(row[3]) + qw, qx, qy, qz = ( + float(row[4]), float(row[5]), float(row[6]), float(row[7]) + ) + if origin_local is None: + origin_local = (x, y, z) + de = x - origin_local[0] + dn = y - origin_local[1] + du = z - origin_local[2] + dlat = np.degrees(dn / EARTH_R) + dlon = np.degrees( + de / (EARTH_R * np.cos(np.radians(FIXED_ORIGIN_LAT))) + ) + yield DatasetPose( + timestamp_ns=ts_ns, + lat=FIXED_ORIGIN_LAT + dlat, + lon=FIXED_ORIGIN_LON + dlon, + alt=du, + qx=qx, qy=qy, qz=qz, qw=qw, + ) diff --git a/tests/e2e/test_euroc_adapter.py b/tests/e2e/test_euroc_adapter.py new file mode 100644 index 0000000..55e3189 --- /dev/null +++ b/tests/e2e/test_euroc_adapter.py @@ -0,0 +1,88 @@ +"""EuRoCAdapter unit tests — uses a minimal fabricated MH_01-style directory.""" + +from pathlib import Path + +import pytest + +from gps_denied.testing.datasets.base import ( + DatasetNotAvailableError, + PlatformClass, +) +from gps_denied.testing.datasets.euroc import EuRoCAdapter + + +@pytest.fixture +def fake_euroc_root(tmp_path: Path) -> Path: + mav0 = tmp_path / "mav0" + (mav0 / "cam0" / "data").mkdir(parents=True) + (mav0 / "imu0").mkdir(parents=True) + (mav0 / "state_groundtruth_estimate0").mkdir(parents=True) + + # Two frames + for ts in (1403636579763555584, 1403636579813555456): + img = mav0 / "cam0" / "data" / f"{ts}.png" + img.write_bytes(b"\x89PNG\r\n\x1a\n") # stub header + (mav0 / "cam0" / "data.csv").write_text( + "#timestamp [ns],filename\n" + "1403636579763555584,1403636579763555584.png\n" + "1403636579813555456,1403636579813555456.png\n" + ) + + # IMU: two samples + (mav0 / "imu0" / "data.csv").write_text( + "#timestamp [ns],w_RS_S_x [rad s^-1],w_RS_S_y,w_RS_S_z," + "a_RS_S_x [m s^-2],a_RS_S_y,a_RS_S_z\n" + "1403636579763555584,0.01,0.02,0.03,0.1,0.2,-9.7\n" + "1403636579813555456,0.01,0.02,0.03,0.1,0.2,-9.7\n" + ) + + # GT: two poses + (mav0 / "state_groundtruth_estimate0" / "data.csv").write_text( + "#timestamp [ns],p_RS_R_x [m],p_RS_R_y,p_RS_R_z," + "q_RS_w [],q_RS_x,q_RS_y,q_RS_z," + "v_RS_R_x,v_RS_R_y,v_RS_R_z,b_w_x,b_w_y,b_w_z,b_a_x,b_a_y,b_a_z\n" + "1403636579763555584,0.0,0.0,0.0,1.0,0.0,0.0,0.0," + "0,0,0,0,0,0,0,0,0\n" + "1403636579813555456,0.05,0.0,0.0,1.0,0.0,0.0,0.0," + "0,0,0,0,0,0,0,0,0\n" + ) + return tmp_path + + +def test_adapter_raises_when_missing(tmp_path: Path): + with pytest.raises(DatasetNotAvailableError): + EuRoCAdapter(tmp_path / "nope") + + +def test_adapter_capabilities(fake_euroc_root: Path): + adapter = EuRoCAdapter(fake_euroc_root) + cap = adapter.capabilities + assert cap.has_raw_imu is True + assert cap.has_rtk_gt is False # EuRoC uses Vicon, not RTK + assert cap.platform_class == PlatformClass.INDOOR + + +def test_adapter_iter_frames(fake_euroc_root: Path): + adapter = EuRoCAdapter(fake_euroc_root) + frames = list(adapter.iter_frames()) + assert len(frames) == 2 + assert frames[0].timestamp_ns == 1403636579763555584 + assert frames[0].image_path.endswith("1403636579763555584.png") + + +def test_adapter_iter_imu(fake_euroc_root: Path): + adapter = EuRoCAdapter(fake_euroc_root) + imu = list(adapter.iter_imu()) + assert len(imu) == 2 + assert imu[0].gyro == (0.01, 0.02, 0.03) + assert imu[0].accel == (0.1, 0.2, -9.7) + + +def test_adapter_iter_ground_truth(fake_euroc_root: Path): + adapter = EuRoCAdapter(fake_euroc_root) + gt = list(adapter.iter_ground_truth()) + assert len(gt) == 2 + assert gt[0].qw == 1.0 + # First pose is the fictitious origin + assert gt[0].lat == pytest.approx(49.0) + assert gt[0].lon == pytest.approx(32.0)