mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-23 02:46:36 +00:00
test(e2e): add EuRoCAdapter with local fabricated fixture tests
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,115 @@
|
||||
"""Adapter for EuRoC MAV datasets (ETHZ ASL format)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
import numpy as np
|
||||
|
||||
from gps_denied.testing.datasets.base import (
|
||||
DatasetAdapter,
|
||||
DatasetCapabilities,
|
||||
DatasetFrame,
|
||||
DatasetIMU,
|
||||
DatasetNotAvailableError,
|
||||
DatasetPose,
|
||||
PlatformClass,
|
||||
)
|
||||
|
||||
|
||||
EARTH_R = 6_378_137.0
|
||||
FIXED_ORIGIN_LAT = 49.0
|
||||
FIXED_ORIGIN_LON = 32.0
|
||||
|
||||
|
||||
class EuRoCAdapter(DatasetAdapter):
|
||||
"""Reads a single EuRoC sequence directory, e.g. `datasets/euroc/MH_01/`."""
|
||||
|
||||
def __init__(self, root: Path) -> None:
|
||||
self._root = Path(root)
|
||||
self._mav0 = self._root / "mav0"
|
||||
if not self._mav0.is_dir():
|
||||
raise DatasetNotAvailableError(
|
||||
f"EuRoC sequence not found at {self._root} "
|
||||
f"(expected {self._root}/mav0/). "
|
||||
"Run `python scripts/download_dataset.py euroc_mh01` first."
|
||||
)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return f"euroc:{self._root.name}"
|
||||
|
||||
@property
|
||||
def capabilities(self) -> DatasetCapabilities:
|
||||
return DatasetCapabilities(
|
||||
has_raw_imu=True,
|
||||
has_rtk_gt=False, # Vicon, not RTK
|
||||
has_loop_closures=False,
|
||||
platform_class=PlatformClass.INDOOR,
|
||||
)
|
||||
|
||||
def iter_frames(self) -> Iterator[DatasetFrame]:
|
||||
cam_csv = self._mav0 / "cam0" / "data.csv"
|
||||
img_dir = self._mav0 / "cam0" / "data"
|
||||
with cam_csv.open() as fh:
|
||||
reader = csv.reader(fh)
|
||||
idx = 0
|
||||
for row in reader:
|
||||
if not row or row[0].startswith("#"):
|
||||
continue
|
||||
ts_ns = int(row[0])
|
||||
filename = row[1].strip()
|
||||
yield DatasetFrame(
|
||||
frame_idx=idx,
|
||||
timestamp_ns=ts_ns,
|
||||
image_path=str(img_dir / filename),
|
||||
)
|
||||
idx += 1
|
||||
|
||||
def iter_imu(self) -> Iterator[DatasetIMU]:
|
||||
imu_csv = self._mav0 / "imu0" / "data.csv"
|
||||
with imu_csv.open() as fh:
|
||||
reader = csv.reader(fh)
|
||||
for row in reader:
|
||||
if not row or row[0].startswith("#"):
|
||||
continue
|
||||
ts_ns = int(row[0])
|
||||
wx, wy, wz = float(row[1]), float(row[2]), float(row[3])
|
||||
ax, ay, az = float(row[4]), float(row[5]), float(row[6])
|
||||
yield DatasetIMU(
|
||||
timestamp_ns=ts_ns,
|
||||
accel=(ax, ay, az),
|
||||
gyro=(wx, wy, wz),
|
||||
)
|
||||
|
||||
def iter_ground_truth(self) -> Iterator[DatasetPose]:
|
||||
gt_csv = self._mav0 / "state_groundtruth_estimate0" / "data.csv"
|
||||
origin_local: tuple[float, float, float] | None = None
|
||||
with gt_csv.open() as fh:
|
||||
reader = csv.reader(fh)
|
||||
for row in reader:
|
||||
if not row or row[0].startswith("#"):
|
||||
continue
|
||||
ts_ns = int(row[0])
|
||||
x, y, z = float(row[1]), float(row[2]), float(row[3])
|
||||
qw, qx, qy, qz = (
|
||||
float(row[4]), float(row[5]), float(row[6]), float(row[7])
|
||||
)
|
||||
if origin_local is None:
|
||||
origin_local = (x, y, z)
|
||||
de = x - origin_local[0]
|
||||
dn = y - origin_local[1]
|
||||
du = z - origin_local[2]
|
||||
dlat = np.degrees(dn / EARTH_R)
|
||||
dlon = np.degrees(
|
||||
de / (EARTH_R * np.cos(np.radians(FIXED_ORIGIN_LAT)))
|
||||
)
|
||||
yield DatasetPose(
|
||||
timestamp_ns=ts_ns,
|
||||
lat=FIXED_ORIGIN_LAT + dlat,
|
||||
lon=FIXED_ORIGIN_LON + dlon,
|
||||
alt=du,
|
||||
qx=qx, qy=qy, qz=qz, qw=qw,
|
||||
)
|
||||
Reference in New Issue
Block a user