mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 09:16:38 +00:00
test(e2e): add SHA256-verified dataset downloader + EuRoC registry entry
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,28 @@
|
|||||||
|
"""CLI: python scripts/download_dataset.py euroc_mh01"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from gps_denied.testing.download import DATASET_REGISTRY, download_dataset
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: list[str]) -> int:
    """Download one registered dataset named on the command line.

    Args:
        argv: Full argument vector with the program name first
            (``sys.argv`` style).

    Returns:
        0 after ``download_dataset`` completes, 1 when the download or its
        verification fails, and 2 on a usage error (wrong argument count or
        a name that is not a key of ``DATASET_REGISTRY``).
    """
    # Function-scope import: only this entry point needs URL parsing.
    from urllib.parse import urlsplit

    available = ", ".join(DATASET_REGISTRY)
    if len(argv) != 2:
        # argv can be empty when main() is invoked programmatically, so do
        # not index it unconditionally.
        prog = argv[0] if argv else "download_dataset.py"
        print(f"usage: {prog} <dataset_name>", file=sys.stderr)
        print(f"available: {available}", file=sys.stderr)
        return 2

    name = argv[1]
    if name not in DATASET_REGISTRY:
        print(f"unknown dataset: {name}", file=sys.stderr)
        # Tell the user what they could have typed instead.
        print(f"available: {available}", file=sys.stderr)
        return 2

    spec = DATASET_REGISTRY[name]
    # Use only the URL's path component for the file name, so a query string
    # or fragment never ends up in the on-disk name.
    filename = Path(urlsplit(spec.url).path).name
    dest = Path("datasets") / spec.target_subdir / filename
    print(f"→ {dest}")
    try:
        download_dataset(spec, dest)
    except (RuntimeError, OSError) as exc:
        # RuntimeError: hash mismatch reported by download_dataset.
        # OSError: network (urllib's URLError derives from it) or filesystem
        # failure. Either way exit non-zero with a one-line message rather
        # than a traceback.
        print(f"error: {exc}", file=sys.stderr)
        return 1
    print("OK")
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
|
||||||
@@ -0,0 +1,68 @@
|
|||||||
|
"""Dataset downloader — registry + SHA256-verified fetch."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import urllib.request
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class DatasetSpec:
    """Immutable record describing one downloadable dataset file."""

    # Location the file is fetched from.
    url: str
    # Expected SHA256 digest of the downloaded file, as a hex string.
    sha256: str
    # Sub-directory name associated with this dataset; how it is joined to a
    # root directory is up to the caller.
    target_subdir: str
    # NOTE(review): presumably "extract the archive after download"; no code
    # in this module reads the flag — confirm against callers.
    unpack: bool = True
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE ON THE SHA256 VALUES IN THIS REGISTRY
#
# The EuRoC MH_01 digest is currently a placeholder made of 64 zeros. Before
# the first real download, the engineer MUST:
#   1. Fetch the zip by hand:
#        curl -L <url from spec> -o /tmp/MH_01_easy.zip
#   2. Compute its digest:
#        sha256sum /tmp/MH_01_easy.zip
#   3. Replace the placeholder below with the real hex string.
#   4. Commit the updated digest together with the first real test run.
#
# The placeholder is deliberately length-valid (64 characters): the registry
# stays well-formed, and because the download function refuses to keep any
# file whose digest does not match, the placeholder state fails loudly
# instead of silently accepting whatever was downloaded.
DATASET_REGISTRY: dict[str, DatasetSpec] = dict(
    euroc_mh01=DatasetSpec(
        url=(
            "http://robotics.ethz.ch/~asl-datasets/ijrr_euroc_mav_dataset"
            "/machine_hall/MH_01_easy/MH_01_easy.zip"
        ),
        sha256=64 * "0",  # placeholder — replace per the procedure above
        target_subdir="euroc/MH_01",
    ),
)
|
||||||
|
|
||||||
|
|
||||||
|
def verify_sha256(path: Path, expected_hex: str) -> bool:
    """Return True iff the SHA256 of the file at `path` equals `expected_hex`.

    The comparison ignores letter case and surrounding whitespace in
    `expected_hex`, so a digest pasted from a tool that prints uppercase hex,
    or copied with a trailing newline, still matches.

    Args:
        path: File to hash; it is opened in binary mode and read to the end.
        expected_hex: Expected digest as a hexadecimal string.

    Returns:
        True when the computed digest matches, False otherwise.

    Raises:
        OSError: If `path` cannot be opened or read.
    """
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # Feed the hash in 1 MiB reads so the whole file is never held in
        # memory at once; read() returns b"" at end of file, ending the loop.
        for chunk in iter(lambda: fh.read(1024 * 1024), b""):
            digest.update(chunk)
    # hexdigest() is always lowercase, so normalise the expected value to it.
    return digest.hexdigest() == expected_hex.strip().lower()
|
||||||
|
|
||||||
|
|
||||||
|
def download_dataset(spec: DatasetSpec, dest: Path) -> Path:
    """Download `spec.url` to `dest` unless a valid copy is already there.

    If `dest` exists and its SHA256 matches `spec.sha256`, return immediately
    without touching the network. Otherwise the file is fetched into a
    sibling ``<dest>.part`` file, verified there, and only moved onto `dest`
    once the digest matches, so `dest` never holds unverified data.

    Args:
        spec: Dataset description; `url` and `sha256` are read.
        dest: Final location of the downloaded file. Missing parent
            directories are created.

    Returns:
        `dest`, which at that point holds a file whose digest matched.

    Raises:
        RuntimeError: If the downloaded file's SHA256 does not match
            `spec.sha256`. The bad download is deleted, as is any stale
            non-matching file previously at `dest`.
        OSError: Propagated from the network fetch or filesystem operations;
            the partial ``.part`` file is removed before propagating.
    """
    if dest.exists() and verify_sha256(dest, spec.sha256):
        return dest

    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".part")
    try:
        urllib.request.urlretrieve(spec.url, tmp)
        # Verify BEFORE moving into place: a failed check must never leave
        # unverified bytes at the final path.
        if not verify_sha256(tmp, spec.sha256):
            # Reaching this point means any file already at `dest` also
            # failed verification above; do not keep it either.
            if dest.exists():
                dest.unlink()
            raise RuntimeError(f"SHA256 mismatch after download of {spec.url}")
        # replace() overwrites an existing destination on every platform,
        # whereas rename() raises FileExistsError on Windows.
        tmp.replace(dest)
    finally:
        # After a successful replace() the temp file is gone; on any failure
        # (network error, mismatch) this removes the leftover partial file.
        if tmp.exists():
            tmp.unlink()
    return dest
|
||||||
@@ -0,0 +1,56 @@
|
|||||||
|
"""Dataset downloader — URL registry + SHA256 verification."""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gps_denied.testing.download import (
|
||||||
|
DATASET_REGISTRY,
|
||||||
|
DatasetSpec,
|
||||||
|
verify_sha256,
|
||||||
|
download_dataset,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_registry_has_euroc():
    """The registry contains a well-formed ``euroc_mh01`` entry."""
    key = "euroc_mh01"
    assert key in DATASET_REGISTRY

    entry = DATASET_REGISTRY[key]
    assert isinstance(entry, DatasetSpec)
    assert entry.url.startswith("http")
    # A SHA256 hex digest is always 64 characters long.
    assert len(entry.sha256) == 64
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_sha256_matches(tmp_path: Path):
    """A file verifies against the digest of its own contents."""
    payload = b"hello world"
    target = tmp_path / "x.bin"
    target.write_bytes(payload)

    digest = hashlib.sha256(payload).hexdigest()
    assert verify_sha256(target, digest) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_sha256_mismatch(tmp_path: Path):
    """A file does not verify against an all-zero digest."""
    target = tmp_path / "x.bin"
    target.write_bytes(b"hello world")

    wrong_digest = "0" * 64
    assert verify_sha256(target, wrong_digest) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_skip_if_present(tmp_path: Path, monkeypatch):
    """An existing file with the right digest is returned without a fetch."""
    payload = b"cached"
    cached = tmp_path / "cached.zip"
    cached.write_bytes(payload)

    spec = DatasetSpec(
        url="http://example.invalid/cached.zip",
        sha256=hashlib.sha256(payload).hexdigest(),
        target_subdir="cached",
    )

    # Record every attempted fetch and fail loudly: the call under test must
    # return the cached path without reaching the network layer.
    fetch_calls = []

    def forbid_fetch(*args, **kwargs):
        fetch_calls.append((args, kwargs))
        raise AssertionError("download should have been skipped")

    monkeypatch.setattr("urllib.request.urlretrieve", forbid_fetch)

    returned = download_dataset(spec, cached)
    assert returned == cached
    assert len(fetch_calls) == 0
|
||||||
Reference in New Issue
Block a user