From 4c657707025f8e033374de13f7a4a7506026b834 Mon Sep 17 00:00:00 2001 From: Yuzviak Date: Mon, 11 May 2026 08:49:32 +0300 Subject: [PATCH] refactor(01-05): migrate satellite+metric to satellite_matcher component - Move SatelliteDataManager impl to components/satellite_matcher/local_tile_loader.py - Move MetricRefinement impl to components/satellite_matcher/metric_refinement.py - MetricRefinement imports IMetricRefinement from protocol.py (no ABC copy) - Replace core/satellite.py and core/metric.py with re-export shims - Update satellite_matcher __init__.py to export both classes + protocols - 216/216 tests pass (regression floor maintained) --- .../components/satellite_matcher/__init__.py | 13 + .../satellite_matcher/local_tile_loader.py | 283 +++++++++++++++++ .../satellite_matcher/metric_refinement.py | 190 ++++++++++++ src/gps_denied/core/metric.py | 224 +------------- src/gps_denied/core/satellite.py | 291 +----------------- 5 files changed, 500 insertions(+), 501 deletions(-) create mode 100644 src/gps_denied/components/satellite_matcher/local_tile_loader.py create mode 100644 src/gps_denied/components/satellite_matcher/metric_refinement.py diff --git a/src/gps_denied/components/satellite_matcher/__init__.py b/src/gps_denied/components/satellite_matcher/__init__.py index e69de29..13e839f 100644 --- a/src/gps_denied/components/satellite_matcher/__init__.py +++ b/src/gps_denied/components/satellite_matcher/__init__.py @@ -0,0 +1,13 @@ +"""satellite_matcher component public API.""" + +from .local_tile_loader import SatelliteDataManager +from .metric_refinement import MetricRefinement +from .protocol import IMetricRefinement, MetricRefiner, SatelliteTileLoader + +__all__ = [ + "SatelliteDataManager", + "MetricRefinement", + "IMetricRefinement", + "MetricRefiner", + "SatelliteTileLoader", +] diff --git a/src/gps_denied/components/satellite_matcher/local_tile_loader.py b/src/gps_denied/components/satellite_matcher/local_tile_loader.py new file mode 100644 index 0000000..1ebf607 --- /dev/null +++ b/src/gps_denied/components/satellite_matcher/local_tile_loader.py @@ -0,0 +1,283 @@ +"""Local-disk tile loader (SAT-01/02). Phase 1 home of the existing SatelliteDataManager impl.""" + +import hashlib +import logging +import math +import os +from concurrent.futures import ThreadPoolExecutor + +import cv2 +import numpy as np + +from gps_denied.schemas import GPSPoint +from gps_denied.schemas.satellite import TileBounds, TileCoords +from gps_denied.utils import mercator + + +class SatelliteDataManager: + """Manages satellite tiles from a local pre-loaded directory. + + Directory layout (SAT-01): + {tile_dir}/{zoom}/{x}/{y}.png — standard Web Mercator slippy-map layout + + No live HTTP requests are made during flight. A separate offline tooling step + downloads and stores tiles before the mission. + """ + + _logger = logging.getLogger(__name__) + + def __init__( + self, + tile_dir: str = ".satellite_tiles", + cache_dir: str = ".satellite_cache", + max_size_gb: float = 10.0, + ): + self.tile_dir = tile_dir + self.thread_pool = ThreadPoolExecutor(max_workers=4) + # In-memory LRU for hot tiles (avoids repeated disk reads) + self._mem_cache: dict[str, np.ndarray] = {} + self._mem_cache_max = 256 + # SHA-256 manifest for tile integrity (якщо файл існує) + self._manifest: dict[str, str] = self._load_manifest() + + # ------------------------------------------------------------------ + # SAT-01: Local tile reads (no HTTP) + # ------------------------------------------------------------------ + + def _load_manifest(self) -> dict[str, str]: + """Завантажити SHA-256 manifest з tile_dir/manifest.sha256.""" + path = os.path.join(self.tile_dir, "manifest.sha256") + if not os.path.isfile(path): + return {} + manifest: dict[str, str] = {} + with open(path) as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split(maxsplit=1) + if len(parts) == 2: + manifest[parts[1].strip()] = parts[0].strip() + return manifest + + def _verify_tile_integrity(self, rel_path: str, file_path: str) -> bool: + """Перевірити SHA-256 тайла проти manifest (якщо manifest існує).""" + if not self._manifest: + return True # без manifest — пропускаємо + expected = self._manifest.get(rel_path) + if expected is None: + return True # тайл не в manifest — OK + sha = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + sha.update(chunk) + actual = sha.hexdigest() + if actual != expected: + self._logger.warning("Tile integrity failed: %s (exp %s, got %s)", + rel_path, expected[:12], actual[:12]) + return False + return True + + def load_local_tile(self, tile_coords: TileCoords) -> np.ndarray | None: + """Load a tile image from the local pre-loaded directory. + + Expected path: {tile_dir}/{zoom}/{x}/{y}.png + Returns None if the file does not exist. + """ + key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" + if key in self._mem_cache: + return self._mem_cache[key] + + rel_path = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}.png" + path = os.path.join(self.tile_dir, rel_path) + if not os.path.isfile(path): + return None + + if not self._verify_tile_integrity(rel_path, path): + return None # тайл пошкоджений + + img = cv2.imread(path, cv2.IMREAD_COLOR) + if img is None: + return None + + # LRU eviction: drop oldest if full + if len(self._mem_cache) >= self._mem_cache_max: + oldest = next(iter(self._mem_cache)) + del self._mem_cache[oldest] + self._mem_cache[key] = img + return img + + def save_local_tile(self, tile_coords: TileCoords, image: np.ndarray) -> bool: + """Persist a tile to the local directory (used by offline pre-fetch tooling).""" + path = os.path.join(self.tile_dir, str(tile_coords.zoom), + str(tile_coords.x), f"{tile_coords.y}.png") + os.makedirs(os.path.dirname(path), exist_ok=True) + ok, encoded = cv2.imencode(".png", image) + if not ok: + return False + with open(path, "wb") as f: + f.write(encoded.tobytes()) + key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" + self._mem_cache[key] = image + return True + + # ------------------------------------------------------------------ + # SAT-02: Tile selection for ESKF position ± 3σ_horizontal + # ------------------------------------------------------------------ + + @staticmethod + def _meters_to_degrees(meters: float, lat: float) -> tuple[float, float]: + """Convert a radius in metres to (Δlat°, Δlon°) at the given latitude.""" + delta_lat = meters / 111_320.0 + delta_lon = meters / (111_320.0 * math.cos(math.radians(lat))) + return delta_lat, delta_lon + + def select_tiles_for_eskf_position( + self, gps: GPSPoint, sigma_h_m: float, zoom: int + ) -> list[TileCoords]: + """Return all tile coords covering the ESKF position ± 3σ_horizontal area. + + Args: + gps: ESKF best-estimate position. + sigma_h_m: 1-σ horizontal uncertainty in metres (from ESKF covariance). + zoom: Web Mercator zoom level (18 recommended ≈ 0.6 m/px). + """ + radius_m = 3.0 * sigma_h_m + dlat, dlon = self._meters_to_degrees(radius_m, gps.lat) + + # Bounding box corners + lat_min, lat_max = gps.lat - dlat, gps.lat + dlat + lon_min, lon_max = gps.lon - dlon, gps.lon + dlon + + # Convert corners to tile coords + tc_nw = mercator.latlon_to_tile(lat_max, lon_min, zoom) + tc_se = mercator.latlon_to_tile(lat_min, lon_max, zoom) + + tiles: list[TileCoords] = [] + for x in range(tc_nw.x, tc_se.x + 1): + for y in range(tc_nw.y, tc_se.y + 1): + tiles.append(TileCoords(x=x, y=y, zoom=zoom)) + return tiles + + def assemble_mosaic( + self, + tile_list: list[tuple[TileCoords, np.ndarray]], + target_size: int = 512, + ) -> tuple[np.ndarray, TileBounds] | None: + """Assemble a list of (TileCoords, image) pairs into a single mosaic. + + Returns (mosaic_image, combined_bounds) or None if tile_list is empty. + The mosaic is resized to (target_size × target_size) for the matcher. + """ + if not tile_list: + return None + + xs = [tc.x for tc, _ in tile_list] + ys = [tc.y for tc, _ in tile_list] + zoom = tile_list[0][0].zoom + + x_min, x_max = min(xs), max(xs) + y_min, y_max = min(ys), max(ys) + + cols = x_max - x_min + 1 + rows = y_max - y_min + 1 + + # Determine single-tile pixel size from first image + sample = tile_list[0][1] + th, tw = sample.shape[:2] + + canvas = np.zeros((rows * th, cols * tw, 3), dtype=np.uint8) + for tc, img in tile_list: + col = tc.x - x_min + row = tc.y - y_min + h, w = img.shape[:2] + canvas[row * th: row * th + h, col * tw: col * tw + w] = img + + mosaic = cv2.resize(canvas, (target_size, target_size), interpolation=cv2.INTER_AREA) + + # Compute combined GPS bounds + nw_bounds = mercator.compute_tile_bounds(TileCoords(x=x_min, y=y_min, zoom=zoom)) + se_bounds = mercator.compute_tile_bounds(TileCoords(x=x_max, y=y_max, zoom=zoom)) + combined = TileBounds( + nw=nw_bounds.nw, + ne=GPSPoint(lat=nw_bounds.nw.lat, lon=se_bounds.se.lon), + sw=GPSPoint(lat=se_bounds.se.lat, lon=nw_bounds.nw.lon), + se=se_bounds.se, + center=GPSPoint( + lat=(nw_bounds.nw.lat + se_bounds.se.lat) / 2, + lon=(nw_bounds.nw.lon + se_bounds.se.lon) / 2, + ), + gsd=nw_bounds.gsd, + ) + return mosaic, combined + + def fetch_tiles_for_position( + self, gps: GPSPoint, sigma_h_m: float, zoom: int + ) -> tuple[np.ndarray, TileBounds] | None: + """High-level helper: select tiles + load + assemble mosaic. + + Returns (mosaic, bounds) or None if no local tiles are available. + """ + coords = self.select_tiles_for_eskf_position(gps, sigma_h_m, zoom) + loaded: list[tuple[TileCoords, np.ndarray]] = [] + for tc in coords: + img = self.load_local_tile(tc) + if img is not None: + loaded.append((tc, img)) + return self.assemble_mosaic(loaded) if loaded else None + + # ------------------------------------------------------------------ + # Cache helpers (backward-compat, also used for warm-path caching) + # ------------------------------------------------------------------ + + def cache_tile(self, flight_id: str, tile_coords: TileCoords, tile_data: np.ndarray) -> bool: + """Cache a tile image in memory (used by tests and offline tools).""" + key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" + self._mem_cache[key] = tile_data + return True + + def get_cached_tile(self, flight_id: str, tile_coords: TileCoords) -> np.ndarray | None: + """Retrieve a cached tile from memory.""" + key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" + return self._mem_cache.get(key) + + # ------------------------------------------------------------------ + # Tile math helpers + # ------------------------------------------------------------------ + + def get_tile_grid(self, center: TileCoords, grid_size: int) -> list[TileCoords]: + """Return grid_size tiles centered on center.""" + if grid_size == 1: + return [center] + + side = int(grid_size ** 0.5) + half = side // 2 + + coords: list[TileCoords] = [] + for dy in range(-half, half + 1): + for dx in range(-half, half + 1): + coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom)) + + if grid_size == 4: + coords = [] + for dy in range(2): + for dx in range(2): + coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom)) + + return coords[:grid_size] + + def expand_search_grid(self, center: TileCoords, current_size: int, new_size: int) -> list[TileCoords]: + """Return only the NEW tiles when expanding from current_size to new_size grid.""" + old_set = {(c.x, c.y) for c in self.get_tile_grid(center, current_size)} + return [c for c in self.get_tile_grid(center, new_size) if (c.x, c.y) not in old_set] + + def compute_tile_coords(self, lat: float, lon: float, zoom: int) -> TileCoords: + return mercator.latlon_to_tile(lat, lon, zoom) + + def compute_tile_bounds(self, tile_coords: TileCoords) -> TileBounds: + return mercator.compute_tile_bounds(tile_coords) + + def clear_flight_cache(self, flight_id: str) -> bool: + """Clear in-memory cache (flight scoping is tile-key-based).""" + self._mem_cache.clear() + return True diff --git a/src/gps_denied/components/satellite_matcher/metric_refinement.py b/src/gps_denied/components/satellite_matcher/metric_refinement.py new file mode 100644 index 0000000..aeb8cd9 --- /dev/null +++ b/src/gps_denied/components/satellite_matcher/metric_refinement.py @@ -0,0 +1,190 @@ +"""Metric Refinement implementation (SAT-03/04). Phase 1 home of MetricRefinement impl. + +SAT-03: GSD normalization — downsample camera frame to satellite resolution. +SAT-04: RANSAC homography → WGS84 position; confidence = inlier_ratio. +""" + +import logging +from typing import List, Optional, Tuple + +import cv2 +import numpy as np + +from gps_denied.components.satellite_matcher.protocol import IMetricRefinement +from gps_denied.core.models import IModelManager +from gps_denied.schemas import GPSPoint +from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult, Sim3Transform +from gps_denied.schemas.satellite import TileBounds + +logger = logging.getLogger(__name__) + + +class MetricRefinement(IMetricRefinement): + """LiteSAM/XFeat-based alignment with GSD normalization. + + SAT-03: normalize_gsd() downsamples UAV frame to match satellite GSD before matching. + SAT-04: confidence is computed as inlier_count / total_correspondences (inlier ratio). + """ + + def __init__(self, model_manager: IModelManager): + self.model_manager = model_manager + + # ------------------------------------------------------------------ + # SAT-03: GSD normalization + # ------------------------------------------------------------------ + + @staticmethod + def normalize_gsd( + uav_image: np.ndarray, + uav_gsd_mpp: float, + sat_gsd_mpp: float, + ) -> np.ndarray: + """Resize UAV frame to match satellite GSD (meters-per-pixel). + + Args: + uav_image: Raw UAV camera frame. + uav_gsd_mpp: UAV GSD in m/px (e.g. 0.159 at 600 m altitude). + sat_gsd_mpp: Satellite tile GSD in m/px (e.g. 0.6 at zoom 18). + + Returns: + Resized image. If already coarser than satellite, returned unchanged. + """ + if uav_gsd_mpp <= 0 or sat_gsd_mpp <= 0: + return uav_image + scale = uav_gsd_mpp / sat_gsd_mpp + if scale >= 1.0: + return uav_image # UAV already coarser, nothing to do + h, w = uav_image.shape[:2] + new_w = max(1, int(w * scale)) + new_h = max(1, int(h * scale)) + return cv2.resize(uav_image, (new_w, new_h), interpolation=cv2.INTER_AREA) + + def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]: + engine = self.model_manager.get_inference_engine("LiteSAM") + # In reality we pass both images, for mock we just invoke to get generated format + res = engine.infer({"img1": uav_image, "img2": satellite_tile}) + + if res["inlier_count"] < 15: + return None + + return res["homography"] + + def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint: + # UAV image center + cx, cy = image_center + # Apply homography + pt = np.array([cx, cy, 1.0]) + # transformed = H * pt + transformed = homography @ pt + transformed = transformed / transformed[2] + + tx, ty = transformed[0], transformed[1] + + # Approximate GPS mapping using bounds + # ty maps to latitude (ty=0 is North, ty=Height is South) + # tx maps to longitude (tx=0 is West, tx=Width is East) + # We assume standard 256x256 tiles for this mock calculation + tile_size = 256.0 + + lat_span = tile_bounds.nw.lat - tile_bounds.sw.lat + lon_span = tile_bounds.ne.lon - tile_bounds.nw.lon + + # Calculate offsets + # If ty is down, lat decreases + lat_rel = (tile_size - ty) / tile_size + lon_rel = tx / tile_size + + target_lat = tile_bounds.sw.lat + (lat_span * lat_rel) + target_lon = tile_bounds.nw.lon + (lon_span * lon_rel) + + return GPSPoint(lat=target_lat, lon=target_lon) + + def align_to_satellite( + self, + uav_image: np.ndarray, + satellite_tile: np.ndarray, + tile_bounds: TileBounds, + uav_gsd_mpp: float = 0.0, + ) -> Optional[AlignmentResult]: + """Align UAV frame to satellite tile. + + Args: + uav_gsd_mpp: If > 0, the UAV frame is GSD-normalised to satellite + resolution before matching (SAT-03). + """ + # SAT-03: optional GSD normalization + sat_gsd = tile_bounds.gsd + if uav_gsd_mpp > 0 and sat_gsd > 0: + uav_image = self.normalize_gsd(uav_image, uav_gsd_mpp, sat_gsd) + + engine = self.model_manager.get_inference_engine("LiteSAM") + res = engine.infer({"img1": uav_image, "img2": satellite_tile}) + + if res["inlier_count"] < 15: + return None + + h, w = uav_image.shape[:2] if hasattr(uav_image, "shape") else (480, 640) + gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2)) + + # SAT-04: confidence = inlier_ratio (not raw engine confidence) + total = res.get("total_correspondences", max(res["inlier_count"], 1)) + inlier_ratio = res["inlier_count"] / max(total, 1) + + align = AlignmentResult( + matched=True, + homography=res["homography"], + gps_center=gps, + confidence=inlier_ratio, + inlier_count=res["inlier_count"], + total_correspondences=total, + reprojection_error=res.get("reprojection_error", 1.0), + ) + return align if self.compute_match_confidence(align) > 0.5 else None + + def compute_match_confidence(self, alignment: AlignmentResult) -> float: + # Complex heuristic combining inliers, reprojection error + score = alignment.confidence + # Penalty for high reproj error + if alignment.reprojection_error > 2.0: + score -= 0.2 + return max(0.0, min(1.0, score)) + + def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]: + # Aggregate logic is complex, for mock we just use the first image's match + if not chunk_images: + return None + return self.compute_homography(chunk_images[0], satellite_tile) + + def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]: + if not chunk_images: + return None + + engine = self.model_manager.get_inference_engine("LiteSAM") + res = engine.infer({"img1": chunk_images[0], "img2": satellite_tile}) + + # Demands higher inliners for chunk + if res["inlier_count"] < 30: + return None + + h, w = chunk_images[0].shape[:2] if hasattr(chunk_images[0], "shape") else (480, 640) + gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2)) + + # Fake sim3 + sim3 = Sim3Transform( + translation=np.array([10., 0., 0.]), + rotation=np.eye(3), + scale=1.0 + ) + + chunk_align = ChunkAlignmentResult( + matched=True, + chunk_id="chunk1", + chunk_center_gps=gps, + rotation_angle=0.0, + confidence=res["confidence"], + inlier_count=res["inlier_count"], + transform=sim3, + reprojection_error=1.0 + ) + + return chunk_align diff --git a/src/gps_denied/core/metric.py b/src/gps_denied/core/metric.py index a0d26c1..23d93ef 100644 --- a/src/gps_denied/core/metric.py +++ b/src/gps_denied/core/metric.py @@ -1,216 +1,10 @@ -"""Metric Refinement (Component F09). +"""Legacy import path. Phase 1 shim — code lives in components/satellite_matcher/.""" +from gps_denied.components.satellite_matcher.protocol import ( # noqa: F401 + MetricRefiner, + IMetricRefinement, +) +from gps_denied.components.satellite_matcher.metric_refinement import ( # noqa: F401 + MetricRefinement, +) -SAT-03: GSD normalization — downsample camera frame to satellite resolution. -SAT-04: RANSAC homography → WGS84 position; confidence = inlier_ratio. -""" - -import logging -from abc import ABC, abstractmethod -from typing import List, Optional, Tuple - -import cv2 -import numpy as np - -from gps_denied.core.models import IModelManager -from gps_denied.schemas import GPSPoint -from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult, Sim3Transform -from gps_denied.schemas.satellite import TileBounds - -logger = logging.getLogger(__name__) - - -class IMetricRefinement(ABC): - @abstractmethod - def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[AlignmentResult]: - pass - - @abstractmethod - def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]: - pass - - @abstractmethod - def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint: - pass - - @abstractmethod - def compute_match_confidence(self, alignment: AlignmentResult) -> float: - pass - - @abstractmethod - def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]: - pass - - @abstractmethod - def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]: - pass - - -class MetricRefinement(IMetricRefinement): - """LiteSAM/XFeat-based alignment with GSD normalization. - - SAT-03: normalize_gsd() downsamples UAV frame to match satellite GSD before matching. - SAT-04: confidence is computed as inlier_count / total_correspondences (inlier ratio). - """ - - def __init__(self, model_manager: IModelManager): - self.model_manager = model_manager - - # ------------------------------------------------------------------ - # SAT-03: GSD normalization - # ------------------------------------------------------------------ - - @staticmethod - def normalize_gsd( - uav_image: np.ndarray, - uav_gsd_mpp: float, - sat_gsd_mpp: float, - ) -> np.ndarray: - """Resize UAV frame to match satellite GSD (meters-per-pixel). - - Args: - uav_image: Raw UAV camera frame. - uav_gsd_mpp: UAV GSD in m/px (e.g. 0.159 at 600 m altitude). - sat_gsd_mpp: Satellite tile GSD in m/px (e.g. 0.6 at zoom 18). - - Returns: - Resized image. If already coarser than satellite, returned unchanged. - """ - if uav_gsd_mpp <= 0 or sat_gsd_mpp <= 0: - return uav_image - scale = uav_gsd_mpp / sat_gsd_mpp - if scale >= 1.0: - return uav_image # UAV already coarser, nothing to do - h, w = uav_image.shape[:2] - new_w = max(1, int(w * scale)) - new_h = max(1, int(h * scale)) - return cv2.resize(uav_image, (new_w, new_h), interpolation=cv2.INTER_AREA) - - def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]: - engine = self.model_manager.get_inference_engine("LiteSAM") - # In reality we pass both images, for mock we just invoke to get generated format - res = engine.infer({"img1": uav_image, "img2": satellite_tile}) - - if res["inlier_count"] < 15: - return None - - return res["homography"] - - def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint: - # UAV image center - cx, cy = image_center - # Apply homography - pt = np.array([cx, cy, 1.0]) - # transformed = H * pt - transformed = homography @ pt - transformed = transformed / transformed[2] - - tx, ty = transformed[0], transformed[1] - - # Approximate GPS mapping using bounds - # ty maps to latitude (ty=0 is North, ty=Height is South) - # tx maps to longitude (tx=0 is West, tx=Width is East) - # We assume standard 256x256 tiles for this mock calculation - tile_size = 256.0 - - lat_span = tile_bounds.nw.lat - tile_bounds.sw.lat - lon_span = tile_bounds.ne.lon - tile_bounds.nw.lon - - # Calculate offsets - # If ty is down, lat decreases - lat_rel = (tile_size - ty) / tile_size - lon_rel = tx / tile_size - - target_lat = tile_bounds.sw.lat + (lat_span * lat_rel) - target_lon = tile_bounds.nw.lon + (lon_span * lon_rel) - - return GPSPoint(lat=target_lat, lon=target_lon) - - def align_to_satellite( - self, - uav_image: np.ndarray, - satellite_tile: np.ndarray, - tile_bounds: TileBounds, - uav_gsd_mpp: float = 0.0, - ) -> Optional[AlignmentResult]: - """Align UAV frame to satellite tile. - - Args: - uav_gsd_mpp: If > 0, the UAV frame is GSD-normalised to satellite - resolution before matching (SAT-03). - """ - # SAT-03: optional GSD normalization - sat_gsd = tile_bounds.gsd - if uav_gsd_mpp > 0 and sat_gsd > 0: - uav_image = self.normalize_gsd(uav_image, uav_gsd_mpp, sat_gsd) - - engine = self.model_manager.get_inference_engine("LiteSAM") - res = engine.infer({"img1": uav_image, "img2": satellite_tile}) - - if res["inlier_count"] < 15: - return None - - h, w = uav_image.shape[:2] if hasattr(uav_image, "shape") else (480, 640) - gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2)) - - # SAT-04: confidence = inlier_ratio (not raw engine confidence) - total = res.get("total_correspondences", max(res["inlier_count"], 1)) - inlier_ratio = res["inlier_count"] / max(total, 1) - - align = AlignmentResult( - matched=True, - homography=res["homography"], - gps_center=gps, - confidence=inlier_ratio, - inlier_count=res["inlier_count"], - total_correspondences=total, - reprojection_error=res.get("reprojection_error", 1.0), - ) - return align if self.compute_match_confidence(align) > 0.5 else None - - def compute_match_confidence(self, alignment: AlignmentResult) -> float: - # Complex heuristic combining inliers, reprojection error - score = alignment.confidence - # Penalty for high reproj error - if alignment.reprojection_error > 2.0: - score -= 0.2 - return max(0.0, min(1.0, score)) - - def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]: - # Aggregate logic is complex, for mock we just use the first image's match - if not chunk_images: - return None - return self.compute_homography(chunk_images[0], satellite_tile) - - def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]: - if not chunk_images: - return None - - engine = self.model_manager.get_inference_engine("LiteSAM") - res = engine.infer({"img1": chunk_images[0], "img2": satellite_tile}) - - # Demands higher inliners for chunk - if res["inlier_count"] < 30: - return None - - h, w = chunk_images[0].shape[:2] if hasattr(chunk_images[0], "shape") else (480, 640) - gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2)) - - # Fake sim3 - sim3 = Sim3Transform( - translation=np.array([10., 0., 0.]), - rotation=np.eye(3), - scale=1.0 - ) - - chunk_align = ChunkAlignmentResult( - matched=True, - chunk_id="chunk1", - chunk_center_gps=gps, - rotation_angle=0.0, - confidence=res["confidence"], - inlier_count=res["inlier_count"], - transform=sim3, - reprojection_error=1.0 - ) - - return chunk_align +__all__ = ["MetricRefinement", "IMetricRefinement", "MetricRefiner"] diff --git a/src/gps_denied/core/satellite.py b/src/gps_denied/core/satellite.py index 9f9d76e..0ded6e7 100644 --- a/src/gps_denied/core/satellite.py +++ b/src/gps_denied/core/satellite.py @@ -1,287 +1,6 @@ -"""Satellite Data Manager (Component F04). +"""Legacy import path. Phase 1 shim — code lives in components/satellite_matcher/.""" +from gps_denied.components.satellite_matcher.local_tile_loader import ( # noqa: F401 + SatelliteDataManager, +) -SAT-01: Reads pre-loaded tiles from a local z/x/y directory (no live HTTP during flight). -SAT-02: Tile selection uses ESKF position ± 3σ_horizontal to define search area. -""" - -import hashlib -import logging -import math -import os -from concurrent.futures import ThreadPoolExecutor - -import cv2 -import numpy as np - -from gps_denied.schemas import GPSPoint -from gps_denied.schemas.satellite import TileBounds, TileCoords -from gps_denied.utils import mercator - - -class SatelliteDataManager: - """Manages satellite tiles from a local pre-loaded directory. - - Directory layout (SAT-01): - {tile_dir}/{zoom}/{x}/{y}.png — standard Web Mercator slippy-map layout - - No live HTTP requests are made during flight. A separate offline tooling step - downloads and stores tiles before the mission. - """ - - _logger = logging.getLogger(__name__) - - def __init__( - self, - tile_dir: str = ".satellite_tiles", - cache_dir: str = ".satellite_cache", - max_size_gb: float = 10.0, - ): - self.tile_dir = tile_dir - self.thread_pool = ThreadPoolExecutor(max_workers=4) - # In-memory LRU for hot tiles (avoids repeated disk reads) - self._mem_cache: dict[str, np.ndarray] = {} - self._mem_cache_max = 256 - # SHA-256 manifest for tile integrity (якщо файл існує) - self._manifest: dict[str, str] = self._load_manifest() - - # ------------------------------------------------------------------ - # SAT-01: Local tile reads (no HTTP) - # ------------------------------------------------------------------ - - def _load_manifest(self) -> dict[str, str]: - """Завантажити SHA-256 manifest з tile_dir/manifest.sha256.""" - path = os.path.join(self.tile_dir, "manifest.sha256") - if not os.path.isfile(path): - return {} - manifest: dict[str, str] = {} - with open(path) as f: - for line in f: - line = line.strip() - if not line or line.startswith("#"): - continue - parts = line.split(maxsplit=1) - if len(parts) == 2: - manifest[parts[1].strip()] = parts[0].strip() - return manifest - - def _verify_tile_integrity(self, rel_path: str, file_path: str) -> bool: - """Перевірити SHA-256 тайла проти manifest (якщо manifest існує).""" - if not self._manifest: - return True # без manifest — пропускаємо - expected = self._manifest.get(rel_path) - if expected is None: - return True # тайл не в manifest — OK - sha = hashlib.sha256() - with open(file_path, "rb") as f: - for chunk in iter(lambda: f.read(8192), b""): - sha.update(chunk) - actual = sha.hexdigest() - if actual != expected: - self._logger.warning("Tile integrity failed: %s (exp %s, got %s)", - rel_path, expected[:12], actual[:12]) - return False - return True - - def load_local_tile(self, tile_coords: TileCoords) -> np.ndarray | None: - """Load a tile image from the local pre-loaded directory. - - Expected path: {tile_dir}/{zoom}/{x}/{y}.png - Returns None if the file does not exist. - """ - key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" - if key in self._mem_cache: - return self._mem_cache[key] - - rel_path = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}.png" - path = os.path.join(self.tile_dir, rel_path) - if not os.path.isfile(path): - return None - - if not self._verify_tile_integrity(rel_path, path): - return None # тайл пошкоджений - - img = cv2.imread(path, cv2.IMREAD_COLOR) - if img is None: - return None - - # LRU eviction: drop oldest if full - if len(self._mem_cache) >= self._mem_cache_max: - oldest = next(iter(self._mem_cache)) - del self._mem_cache[oldest] - self._mem_cache[key] = img - return img - - def save_local_tile(self, tile_coords: TileCoords, image: np.ndarray) -> bool: - """Persist a tile to the local directory (used by offline pre-fetch tooling).""" - path = os.path.join(self.tile_dir, str(tile_coords.zoom), - str(tile_coords.x), f"{tile_coords.y}.png") - os.makedirs(os.path.dirname(path), exist_ok=True) - ok, encoded = cv2.imencode(".png", image) - if not ok: - return False - with open(path, "wb") as f: - f.write(encoded.tobytes()) - key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" - self._mem_cache[key] = image - return True - - # ------------------------------------------------------------------ - # SAT-02: Tile selection for ESKF position ± 3σ_horizontal - # ------------------------------------------------------------------ - - @staticmethod - def _meters_to_degrees(meters: float, lat: float) -> tuple[float, float]: - """Convert a radius in metres to (Δlat°, Δlon°) at the given latitude.""" - delta_lat = meters / 111_320.0 - delta_lon = meters / (111_320.0 * math.cos(math.radians(lat))) - return delta_lat, delta_lon - - def select_tiles_for_eskf_position( - self, gps: GPSPoint, sigma_h_m: float, zoom: int - ) -> list[TileCoords]: - """Return all tile coords covering the ESKF position ± 3σ_horizontal area. - - Args: - gps: ESKF best-estimate position. - sigma_h_m: 1-σ horizontal uncertainty in metres (from ESKF covariance). - zoom: Web Mercator zoom level (18 recommended ≈ 0.6 m/px). - """ - radius_m = 3.0 * sigma_h_m - dlat, dlon = self._meters_to_degrees(radius_m, gps.lat) - - # Bounding box corners - lat_min, lat_max = gps.lat - dlat, gps.lat + dlat - lon_min, lon_max = gps.lon - dlon, gps.lon + dlon - - # Convert corners to tile coords - tc_nw = mercator.latlon_to_tile(lat_max, lon_min, zoom) - tc_se = mercator.latlon_to_tile(lat_min, lon_max, zoom) - - tiles: list[TileCoords] = [] - for x in range(tc_nw.x, tc_se.x + 1): - for y in range(tc_nw.y, tc_se.y + 1): - tiles.append(TileCoords(x=x, y=y, zoom=zoom)) - return tiles - - def assemble_mosaic( - self, - tile_list: list[tuple[TileCoords, np.ndarray]], - target_size: int = 512, - ) -> tuple[np.ndarray, TileBounds] | None: - """Assemble a list of (TileCoords, image) pairs into a single mosaic. - - Returns (mosaic_image, combined_bounds) or None if tile_list is empty. - The mosaic is resized to (target_size × target_size) for the matcher. - """ - if not tile_list: - return None - - xs = [tc.x for tc, _ in tile_list] - ys = [tc.y for tc, _ in tile_list] - zoom = tile_list[0][0].zoom - - x_min, x_max = min(xs), max(xs) - y_min, y_max = min(ys), max(ys) - - cols = x_max - x_min + 1 - rows = y_max - y_min + 1 - - # Determine single-tile pixel size from first image - sample = tile_list[0][1] - th, tw = sample.shape[:2] - - canvas = np.zeros((rows * th, cols * tw, 3), dtype=np.uint8) - for tc, img in tile_list: - col = tc.x - x_min - row = tc.y - y_min - h, w = img.shape[:2] - canvas[row * th: row * th + h, col * tw: col * tw + w] = img - - mosaic = cv2.resize(canvas, (target_size, target_size), interpolation=cv2.INTER_AREA) - - # Compute combined GPS bounds - nw_bounds = mercator.compute_tile_bounds(TileCoords(x=x_min, y=y_min, zoom=zoom)) - se_bounds = mercator.compute_tile_bounds(TileCoords(x=x_max, y=y_max, zoom=zoom)) - combined = TileBounds( - nw=nw_bounds.nw, - ne=GPSPoint(lat=nw_bounds.nw.lat, lon=se_bounds.se.lon), - sw=GPSPoint(lat=se_bounds.se.lat, lon=nw_bounds.nw.lon), - se=se_bounds.se, - center=GPSPoint( - lat=(nw_bounds.nw.lat + se_bounds.se.lat) / 2, - lon=(nw_bounds.nw.lon + se_bounds.se.lon) / 2, - ), - gsd=nw_bounds.gsd, - ) - return mosaic, combined - - def fetch_tiles_for_position( - self, gps: GPSPoint, sigma_h_m: float, zoom: int - ) -> tuple[np.ndarray, TileBounds] | None: - """High-level helper: select tiles + load + assemble mosaic. - - Returns (mosaic, bounds) or None if no local tiles are available. - """ - coords = self.select_tiles_for_eskf_position(gps, sigma_h_m, zoom) - loaded: list[tuple[TileCoords, np.ndarray]] = [] - for tc in coords: - img = self.load_local_tile(tc) - if img is not None: - loaded.append((tc, img)) - return self.assemble_mosaic(loaded) if loaded else None - - # ------------------------------------------------------------------ - # Cache helpers (backward-compat, also used for warm-path caching) - # ------------------------------------------------------------------ - - def cache_tile(self, flight_id: str, tile_coords: TileCoords, tile_data: np.ndarray) -> bool: - """Cache a tile image in memory (used by tests and offline tools).""" - key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" - self._mem_cache[key] = tile_data - return True - - def get_cached_tile(self, flight_id: str, tile_coords: TileCoords) -> np.ndarray | None: - """Retrieve a cached tile from memory.""" - key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}" - return self._mem_cache.get(key) - - # ------------------------------------------------------------------ - # Tile math helpers - # ------------------------------------------------------------------ - - def get_tile_grid(self, center: TileCoords, grid_size: int) -> list[TileCoords]: - """Return grid_size tiles centered on center.""" - if grid_size == 1: - return [center] - - side = int(grid_size ** 0.5) - half = side // 2 - - coords: list[TileCoords] = [] - for dy in range(-half, half + 1): - for dx in range(-half, half + 1): - coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom)) - - if grid_size == 4: - coords = [] - for dy in range(2): - for dx in range(2): - coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom)) - - return coords[:grid_size] - - def expand_search_grid(self, center: TileCoords, current_size: int, new_size: int) -> list[TileCoords]: - """Return only the NEW tiles when expanding from current_size to new_size grid.""" - old_set = {(c.x, c.y) for c in self.get_tile_grid(center, current_size)} - return [c for c in self.get_tile_grid(center, new_size) if (c.x, c.y) not in old_set] - - def compute_tile_coords(self, lat: float, lon: float, zoom: int) -> TileCoords: - return mercator.latlon_to_tile(lat, lon, zoom) - - def compute_tile_bounds(self, tile_coords: TileCoords) -> TileBounds: - return mercator.compute_tile_bounds(tile_coords) - - def clear_flight_cache(self, flight_id: str) -> bool: - """Clear in-memory cache (flight scoping is tile-key-based).""" - self._mem_cache.clear() - return True +__all__ = ["SatelliteDataManager"]