"""Satellite Data Manager (Component F04). SAT-01: Reads pre-loaded tiles from a local z/x/y directory (no live HTTP during flight). SAT-02: Tile selection uses ESKF position ± 3σ_horizontal to define search area. """ import hashlib import logging import math import os from concurrent.futures import ThreadPoolExecutor import cv2 import numpy as np from gps_denied.schemas import GPSPoint from gps_denied.schemas.satellite import TileBounds, TileCoords from gps_denied.utils import mercator class SatelliteDataManager: """Manages satellite tiles from a local pre-loaded directory. Directory layout (SAT-01): {tile_dir}/{zoom}/{x}/{y}.png — standard Web Mercator slippy-map layout No live HTTP requests are made during flight. A separate offline tooling step downloads and stores tiles before the mission. """ _logger = logging.getLogger(__name__) def __init__( self, tile_dir: str = ".satellite_tiles", cache_dir: str = ".satellite_cache", max_size_gb: float = 10.0, ): self.tile_dir = tile_dir self.thread_pool = ThreadPoolExecutor(max_workers=4) # In-memory LRU for hot tiles (avoids repeated disk reads) self._mem_cache: dict[str, np.ndarray] = {} self._mem_cache_max = 256 # SHA-256 manifest for tile integrity (якщо файл існує) self._manifest: dict[str, str] = self._load_manifest() # ------------------------------------------------------------------ # SAT-01: Local tile reads (no HTTP) # ------------------------------------------------------------------ def _load_manifest(self) -> dict[str, str]: """Завантажити SHA-256 manifest з tile_dir/manifest.sha256.""" path = os.path.join(self.tile_dir, "manifest.sha256") if not os.path.isfile(path): return {} manifest: dict[str, str] = {} with open(path) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue parts = line.split(maxsplit=1) if len(parts) == 2: manifest[parts[1].strip()] = parts[0].strip() return manifest def _verify_tile_integrity(self, rel_path: str, file_path: str) -> bool: """Перевірити SHA-256 тайла проти manifest (якщо manifest існує).""" if not self._manifest: return True # без manifest — пропускаємо expected = self._manifest.get(rel_path) if expected is None: return True # тайл не в manifest — OK sha = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): sha.update(chunk) actual = sha.hexdigest() if actual != expected: self._logger.warning("Tile integrity failed: %s (exp %s, got %s)", rel_path, expected[:12], actual[:12]) return False return True def load_local_tile(self, tile_coords: TileCoords) -> np.ndarray | None: """Load a tile image from the local pre-loaded directory. Expected path: {tile_dir}/{zoom}/{x}/{y}.png Returns None if the file does not exist. """ key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" if key in self._mem_cache: return self._mem_cache[key] rel_path = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}.png" path = os.path.join(self.tile_dir, rel_path) if not os.path.isfile(path): return None if not self._verify_tile_integrity(rel_path, path): return None # тайл пошкоджений img = cv2.imread(path, cv2.IMREAD_COLOR) if img is None: return None # LRU eviction: drop oldest if full if len(self._mem_cache) >= self._mem_cache_max: oldest = next(iter(self._mem_cache)) del self._mem_cache[oldest] self._mem_cache[key] = img return img def save_local_tile(self, tile_coords: TileCoords, image: np.ndarray) -> bool: """Persist a tile to the local directory (used by offline pre-fetch tooling).""" path = os.path.join(self.tile_dir, str(tile_coords.zoom), str(tile_coords.x), f"{tile_coords.y}.png") os.makedirs(os.path.dirname(path), exist_ok=True) ok, encoded = cv2.imencode(".png", image) if not ok: return False with open(path, "wb") as f: f.write(encoded.tobytes()) key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" self._mem_cache[key] = image return True # ------------------------------------------------------------------ # SAT-02: Tile selection for ESKF position ± 3σ_horizontal # ------------------------------------------------------------------ @staticmethod def _meters_to_degrees(meters: float, lat: float) -> tuple[float, float]: """Convert a radius in metres to (Δlat°, Δlon°) at the given latitude.""" delta_lat = meters / 111_320.0 delta_lon = meters / (111_320.0 * math.cos(math.radians(lat))) return delta_lat, delta_lon def select_tiles_for_eskf_position( self, gps: GPSPoint, sigma_h_m: float, zoom: int ) -> list[TileCoords]: """Return all tile coords covering the ESKF position ± 3σ_horizontal area. Args: gps: ESKF best-estimate position. sigma_h_m: 1-σ horizontal uncertainty in metres (from ESKF covariance). zoom: Web Mercator zoom level (18 recommended ≈ 0.6 m/px). """ radius_m = 3.0 * sigma_h_m dlat, dlon = self._meters_to_degrees(radius_m, gps.lat) # Bounding box corners lat_min, lat_max = gps.lat - dlat, gps.lat + dlat lon_min, lon_max = gps.lon - dlon, gps.lon + dlon # Convert corners to tile coords tc_nw = mercator.latlon_to_tile(lat_max, lon_min, zoom) tc_se = mercator.latlon_to_tile(lat_min, lon_max, zoom) tiles: list[TileCoords] = [] for x in range(tc_nw.x, tc_se.x + 1): for y in range(tc_nw.y, tc_se.y + 1): tiles.append(TileCoords(x=x, y=y, zoom=zoom)) return tiles def assemble_mosaic( self, tile_list: list[tuple[TileCoords, np.ndarray]], target_size: int = 512, ) -> tuple[np.ndarray, TileBounds] | None: """Assemble a list of (TileCoords, image) pairs into a single mosaic. Returns (mosaic_image, combined_bounds) or None if tile_list is empty. The mosaic is resized to (target_size × target_size) for the matcher. """ if not tile_list: return None xs = [tc.x for tc, _ in tile_list] ys = [tc.y for tc, _ in tile_list] zoom = tile_list[0][0].zoom x_min, x_max = min(xs), max(xs) y_min, y_max = min(ys), max(ys) cols = x_max - x_min + 1 rows = y_max - y_min + 1 # Determine single-tile pixel size from first image sample = tile_list[0][1] th, tw = sample.shape[:2] canvas = np.zeros((rows * th, cols * tw, 3), dtype=np.uint8) for tc, img in tile_list: col = tc.x - x_min row = tc.y - y_min h, w = img.shape[:2] canvas[row * th: row * th + h, col * tw: col * tw + w] = img mosaic = cv2.resize(canvas, (target_size, target_size), interpolation=cv2.INTER_AREA) # Compute combined GPS bounds nw_bounds = mercator.compute_tile_bounds(TileCoords(x=x_min, y=y_min, zoom=zoom)) se_bounds = mercator.compute_tile_bounds(TileCoords(x=x_max, y=y_max, zoom=zoom)) combined = TileBounds( nw=nw_bounds.nw, ne=GPSPoint(lat=nw_bounds.nw.lat, lon=se_bounds.se.lon), sw=GPSPoint(lat=se_bounds.se.lat, lon=nw_bounds.nw.lon), se=se_bounds.se, center=GPSPoint( lat=(nw_bounds.nw.lat + se_bounds.se.lat) / 2, lon=(nw_bounds.nw.lon + se_bounds.se.lon) / 2, ), gsd=nw_bounds.gsd, ) return mosaic, combined def fetch_tiles_for_position( self, gps: GPSPoint, sigma_h_m: float, zoom: int ) -> tuple[np.ndarray, TileBounds] | None: """High-level helper: select tiles + load + assemble mosaic. Returns (mosaic, bounds) or None if no local tiles are available. """ coords = self.select_tiles_for_eskf_position(gps, sigma_h_m, zoom) loaded: list[tuple[TileCoords, np.ndarray]] = [] for tc in coords: img = self.load_local_tile(tc) if img is not None: loaded.append((tc, img)) return self.assemble_mosaic(loaded) if loaded else None # ------------------------------------------------------------------ # Cache helpers (backward-compat, also used for warm-path caching) # ------------------------------------------------------------------ def cache_tile(self, flight_id: str, tile_coords: TileCoords, tile_data: np.ndarray) -> bool: """Cache a tile image in memory (used by tests and offline tools).""" key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" self._mem_cache[key] = tile_data return True def get_cached_tile(self, flight_id: str, tile_coords: TileCoords) -> np.ndarray | None: """Retrieve a cached tile from memory.""" key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" return self._mem_cache.get(key) # ------------------------------------------------------------------ # Tile math helpers # ------------------------------------------------------------------ def get_tile_grid(self, center: TileCoords, grid_size: int) -> list[TileCoords]: """Return grid_size tiles centered on center.""" if grid_size == 1: return [center] side = int(grid_size ** 0.5) half = side // 2 coords: list[TileCoords] = [] for dy in range(-half, half + 1): for dx in range(-half, half + 1): coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom)) if grid_size == 4: coords = [] for dy in range(2): for dx in range(2): coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom)) return coords[:grid_size] def expand_search_grid(self, center: TileCoords, current_size: int, new_size: int) -> list[TileCoords]: """Return only the NEW tiles when expanding from current_size to new_size grid.""" old_set = {(c.x, c.y) for c in self.get_tile_grid(center, current_size)} return [c for c in self.get_tile_grid(center, new_size) if (c.x, c.y) not in old_set] def compute_tile_coords(self, lat: float, lon: float, zoom: int) -> TileCoords: return mercator.latlon_to_tile(lat, lon, zoom) def compute_tile_bounds(self, tile_coords: TileCoords) -> TileBounds: return mercator.compute_tile_bounds(tile_coords) def clear_flight_cache(self, flight_id: str) -> bool: """Clear in-memory cache (flight scoping is tile-key-based).""" self._mem_cache.clear() return True