#!/usr/bin/env python3 """Satellite tile downloader for GPS-denied navigation. Downloads Web Mercator satellite tiles for a given bounding box and stores them in the standard z/x/y.png directory layout expected by SatelliteDataManager. Usage: # Dry run (count tiles only) python scripts/download_tiles.py --lat-min 48.5 --lat-max 49.5 --lon-min 31.5 --lon-max 32.5 --dry-run # Download from OpenStreetMap (default, no API key) python scripts/download_tiles.py --lat-min 48.5 --lat-max 49.5 --lon-min 31.5 --lon-max 32.5 # Custom zoom and output dir python scripts/download_tiles.py --lat-min 49.0 --lat-max 49.1 --lon-min 32.0 --lon-max 32.1 \ --zoom 18 --output .satellite_tiles # Google Maps provider (requires API key) python scripts/download_tiles.py --lat-min 49.0 --lat-max 49.1 --lon-min 32.0 --lon-max 32.1 \ --provider google --api-key YOUR_KEY """ from __future__ import annotations import argparse import asyncio import os import sys import time sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) from gps_denied.schemas.satellite import TileCoords from gps_denied.utils.mercator import latlon_to_tile # Tile provider URL templates _PROVIDERS = { "osm": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", "google": ( "https://maps.googleapis.com/maps/api/staticmap" "?center={lat},{lon}&zoom={z}&size=256x256&maptype=satellite&key={api_key}" ), } _USER_AGENT = "GPS-Denied-Onboard/1.0 (tile prefetch)" def compute_tile_range( lat_min: float, lat_max: float, lon_min: float, lon_max: float, zoom: int ) -> list[TileCoords]: """Compute all tile coordinates within a lat/lon bounding box.""" nw = latlon_to_tile(lat_max, lon_min, zoom) # NW corner se = latlon_to_tile(lat_min, lon_max, zoom) # SE corner tiles: list[TileCoords] = [] for x in range(nw.x, se.x + 1): for y in range(nw.y, se.y + 1): tiles.append(TileCoords(x=x, y=y, zoom=zoom)) return tiles async def download_tiles( tiles: list[TileCoords], output_dir: str, provider: str = "osm", api_key: str = "", max_concurrent: int = 4, delay_s: float = 0.1, ) -> tuple[int, int]: """Download tiles concurrently. Returns (success_count, error_count).""" try: import httpx except ImportError: print("ERROR: httpx required. Install: pip install httpx") return 0, len(tiles) url_template = _PROVIDERS.get(provider, _PROVIDERS["osm"]) semaphore = asyncio.Semaphore(max_concurrent) success = 0 errors = 0 async def fetch_one(client: httpx.AsyncClient, tc: TileCoords) -> bool: nonlocal success, errors out_path = os.path.join(output_dir, str(tc.zoom), str(tc.x), f"{tc.y}.png") if os.path.isfile(out_path): success += 1 return True if provider == "google": from gps_denied.utils.mercator import tile_to_latlon center = tile_to_latlon(tc.x + 0.5, tc.y + 0.5, tc.zoom) url = url_template.format( z=tc.zoom, x=tc.x, y=tc.y, lat=center.lat, lon=center.lon, api_key=api_key, ) else: url = url_template.format(z=tc.zoom, x=tc.x, y=tc.y) async with semaphore: try: resp = await client.get(url) resp.raise_for_status() os.makedirs(os.path.dirname(out_path), exist_ok=True) with open(out_path, "wb") as f: f.write(resp.content) success += 1 await asyncio.sleep(delay_s) return True except Exception as exc: errors += 1 print(f" FAIL {tc.zoom}/{tc.x}/{tc.y}: {exc}") return False async with httpx.AsyncClient( timeout=30.0, headers={"User-Agent": _USER_AGENT}, follow_redirects=True, ) as client: tasks = [fetch_one(client, tc) for tc in tiles] await asyncio.gather(*tasks) return success, errors def main() -> int: parser = argparse.ArgumentParser(description="Download satellite tiles for GPS-denied navigation") parser.add_argument("--lat-min", type=float, required=True) parser.add_argument("--lat-max", type=float, required=True) parser.add_argument("--lon-min", type=float, required=True) parser.add_argument("--lon-max", type=float, required=True) parser.add_argument("--zoom", type=int, default=18) parser.add_argument("--output", default=".satellite_tiles", help="Output directory") parser.add_argument("--provider", choices=["osm", "google"], default="osm") parser.add_argument("--api-key", default="", help="API key (for google provider)") parser.add_argument("--max-concurrent", type=int, default=4) parser.add_argument("--dry-run", action="store_true", help="Only count tiles, don't download") args = parser.parse_args() tiles = compute_tile_range(args.lat_min, args.lat_max, args.lon_min, args.lon_max, args.zoom) # Estimate size: ~30KB per tile est_mb = len(tiles) * 30 / 1024 print(f"Tiles: {len(tiles)} at zoom {args.zoom}") print(f"Bounding box: ({args.lat_min}, {args.lon_min}) → ({args.lat_max}, {args.lon_max})") print(f"Estimated size: ~{est_mb:.1f} MB") print(f"Output: {args.output}") if args.dry_run: print("\n--dry-run: no tiles downloaded.") return 0 t0 = time.time() ok, err = asyncio.run(download_tiles( tiles, args.output, provider=args.provider, api_key=args.api_key, max_concurrent=args.max_concurrent, )) elapsed = time.time() - t0 print(f"\nDone in {elapsed:.1f}s: {ok} downloaded, {err} errors") return 1 if err > 0 else 0 if __name__ == "__main__": sys.exit(main())