refactor(01-05): migrate satellite+metric to satellite_matcher component

- Move SatelliteDataManager impl to components/satellite_matcher/local_tile_loader.py
- Move MetricRefinement impl to components/satellite_matcher/metric_refinement.py
- MetricRefinement imports IMetricRefinement from protocol.py (no ABC copy)
- Replace core/satellite.py and core/metric.py with re-export shims
- Update satellite_matcher __init__.py to export both classes + protocols
- 216/216 tests pass (regression floor maintained)
This commit is contained in:
Yuzviak
2026-05-11 08:49:32 +03:00
parent 55ef732b96
commit 4c65770702
5 changed files with 500 additions and 501 deletions
@@ -0,0 +1,13 @@
"""satellite_matcher component public API."""
from .local_tile_loader import SatelliteDataManager
from .metric_refinement import MetricRefinement
from .protocol import IMetricRefinement, MetricRefiner, SatelliteTileLoader
__all__ = [
"SatelliteDataManager",
"MetricRefinement",
"IMetricRefinement",
"MetricRefiner",
"SatelliteTileLoader",
]
@@ -0,0 +1,283 @@
"""Local-disk tile loader (SAT-01/02). Phase 1 home of the existing SatelliteDataManager impl."""
import hashlib
import logging
import math
import os
from concurrent.futures import ThreadPoolExecutor
import cv2
import numpy as np
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.satellite import TileBounds, TileCoords
from gps_denied.utils import mercator
class SatelliteDataManager:
"""Manages satellite tiles from a local pre-loaded directory.
Directory layout (SAT-01):
{tile_dir}/{zoom}/{x}/{y}.png — standard Web Mercator slippy-map layout
No live HTTP requests are made during flight. A separate offline tooling step
downloads and stores tiles before the mission.
"""
_logger = logging.getLogger(__name__)
def __init__(
self,
tile_dir: str = ".satellite_tiles",
cache_dir: str = ".satellite_cache",
max_size_gb: float = 10.0,
):
self.tile_dir = tile_dir
self.thread_pool = ThreadPoolExecutor(max_workers=4)
# In-memory LRU for hot tiles (avoids repeated disk reads)
self._mem_cache: dict[str, np.ndarray] = {}
self._mem_cache_max = 256
# SHA-256 manifest for tile integrity (якщо файл існує)
self._manifest: dict[str, str] = self._load_manifest()
# ------------------------------------------------------------------
# SAT-01: Local tile reads (no HTTP)
# ------------------------------------------------------------------
def _load_manifest(self) -> dict[str, str]:
"""Завантажити SHA-256 manifest з tile_dir/manifest.sha256."""
path = os.path.join(self.tile_dir, "manifest.sha256")
if not os.path.isfile(path):
return {}
manifest: dict[str, str] = {}
with open(path) as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(maxsplit=1)
if len(parts) == 2:
manifest[parts[1].strip()] = parts[0].strip()
return manifest
def _verify_tile_integrity(self, rel_path: str, file_path: str) -> bool:
"""Перевірити SHA-256 тайла проти manifest (якщо manifest існує)."""
if not self._manifest:
return True # без manifest — пропускаємо
expected = self._manifest.get(rel_path)
if expected is None:
return True # тайл не в manifest — OK
sha = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha.update(chunk)
actual = sha.hexdigest()
if actual != expected:
self._logger.warning("Tile integrity failed: %s (exp %s, got %s)",
rel_path, expected[:12], actual[:12])
return False
return True
def load_local_tile(self, tile_coords: TileCoords) -> np.ndarray | None:
"""Load a tile image from the local pre-loaded directory.
Expected path: {tile_dir}/{zoom}/{x}/{y}.png
Returns None if the file does not exist.
"""
key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}"
if key in self._mem_cache:
return self._mem_cache[key]
rel_path = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}.png"
path = os.path.join(self.tile_dir, rel_path)
if not os.path.isfile(path):
return None
if not self._verify_tile_integrity(rel_path, path):
return None # тайл пошкоджений
img = cv2.imread(path, cv2.IMREAD_COLOR)
if img is None:
return None
# LRU eviction: drop oldest if full
if len(self._mem_cache) >= self._mem_cache_max:
oldest = next(iter(self._mem_cache))
del self._mem_cache[oldest]
self._mem_cache[key] = img
return img
def save_local_tile(self, tile_coords: TileCoords, image: np.ndarray) -> bool:
"""Persist a tile to the local directory (used by offline pre-fetch tooling)."""
path = os.path.join(self.tile_dir, str(tile_coords.zoom),
str(tile_coords.x), f"{tile_coords.y}.png")
os.makedirs(os.path.dirname(path), exist_ok=True)
ok, encoded = cv2.imencode(".png", image)
if not ok:
return False
with open(path, "wb") as f:
f.write(encoded.tobytes())
key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}"
self._mem_cache[key] = image
return True
# ------------------------------------------------------------------
# SAT-02: Tile selection for ESKF position ± 3σ_horizontal
# ------------------------------------------------------------------
@staticmethod
def _meters_to_degrees(meters: float, lat: float) -> tuple[float, float]:
"""Convert a radius in metres to (Δlat°, Δlon°) at the given latitude."""
delta_lat = meters / 111_320.0
delta_lon = meters / (111_320.0 * math.cos(math.radians(lat)))
return delta_lat, delta_lon
def select_tiles_for_eskf_position(
self, gps: GPSPoint, sigma_h_m: float, zoom: int
) -> list[TileCoords]:
"""Return all tile coords covering the ESKF position ± 3σ_horizontal area.
Args:
gps: ESKF best-estimate position.
sigma_h_m: 1-σ horizontal uncertainty in metres (from ESKF covariance).
zoom: Web Mercator zoom level (18 recommended ≈ 0.6 m/px).
"""
radius_m = 3.0 * sigma_h_m
dlat, dlon = self._meters_to_degrees(radius_m, gps.lat)
# Bounding box corners
lat_min, lat_max = gps.lat - dlat, gps.lat + dlat
lon_min, lon_max = gps.lon - dlon, gps.lon + dlon
# Convert corners to tile coords
tc_nw = mercator.latlon_to_tile(lat_max, lon_min, zoom)
tc_se = mercator.latlon_to_tile(lat_min, lon_max, zoom)
tiles: list[TileCoords] = []
for x in range(tc_nw.x, tc_se.x + 1):
for y in range(tc_nw.y, tc_se.y + 1):
tiles.append(TileCoords(x=x, y=y, zoom=zoom))
return tiles
def assemble_mosaic(
self,
tile_list: list[tuple[TileCoords, np.ndarray]],
target_size: int = 512,
) -> tuple[np.ndarray, TileBounds] | None:
"""Assemble a list of (TileCoords, image) pairs into a single mosaic.
Returns (mosaic_image, combined_bounds) or None if tile_list is empty.
The mosaic is resized to (target_size × target_size) for the matcher.
"""
if not tile_list:
return None
xs = [tc.x for tc, _ in tile_list]
ys = [tc.y for tc, _ in tile_list]
zoom = tile_list[0][0].zoom
x_min, x_max = min(xs), max(xs)
y_min, y_max = min(ys), max(ys)
cols = x_max - x_min + 1
rows = y_max - y_min + 1
# Determine single-tile pixel size from first image
sample = tile_list[0][1]
th, tw = sample.shape[:2]
canvas = np.zeros((rows * th, cols * tw, 3), dtype=np.uint8)
for tc, img in tile_list:
col = tc.x - x_min
row = tc.y - y_min
h, w = img.shape[:2]
canvas[row * th: row * th + h, col * tw: col * tw + w] = img
mosaic = cv2.resize(canvas, (target_size, target_size), interpolation=cv2.INTER_AREA)
# Compute combined GPS bounds
nw_bounds = mercator.compute_tile_bounds(TileCoords(x=x_min, y=y_min, zoom=zoom))
se_bounds = mercator.compute_tile_bounds(TileCoords(x=x_max, y=y_max, zoom=zoom))
combined = TileBounds(
nw=nw_bounds.nw,
ne=GPSPoint(lat=nw_bounds.nw.lat, lon=se_bounds.se.lon),
sw=GPSPoint(lat=se_bounds.se.lat, lon=nw_bounds.nw.lon),
se=se_bounds.se,
center=GPSPoint(
lat=(nw_bounds.nw.lat + se_bounds.se.lat) / 2,
lon=(nw_bounds.nw.lon + se_bounds.se.lon) / 2,
),
gsd=nw_bounds.gsd,
)
return mosaic, combined
def fetch_tiles_for_position(
self, gps: GPSPoint, sigma_h_m: float, zoom: int
) -> tuple[np.ndarray, TileBounds] | None:
"""High-level helper: select tiles + load + assemble mosaic.
Returns (mosaic, bounds) or None if no local tiles are available.
"""
coords = self.select_tiles_for_eskf_position(gps, sigma_h_m, zoom)
loaded: list[tuple[TileCoords, np.ndarray]] = []
for tc in coords:
img = self.load_local_tile(tc)
if img is not None:
loaded.append((tc, img))
return self.assemble_mosaic(loaded) if loaded else None
# ------------------------------------------------------------------
# Cache helpers (backward-compat, also used for warm-path caching)
# ------------------------------------------------------------------
def cache_tile(self, flight_id: str, tile_coords: TileCoords, tile_data: np.ndarray) -> bool:
"""Cache a tile image in memory (used by tests and offline tools)."""
key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}"
self._mem_cache[key] = tile_data
return True
def get_cached_tile(self, flight_id: str, tile_coords: TileCoords) -> np.ndarray | None:
"""Retrieve a cached tile from memory."""
key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}"
return self._mem_cache.get(key)
# ------------------------------------------------------------------
# Tile math helpers
# ------------------------------------------------------------------
def get_tile_grid(self, center: TileCoords, grid_size: int) -> list[TileCoords]:
"""Return grid_size tiles centered on center."""
if grid_size == 1:
return [center]
side = int(grid_size ** 0.5)
half = side // 2
coords: list[TileCoords] = []
for dy in range(-half, half + 1):
for dx in range(-half, half + 1):
coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom))
if grid_size == 4:
coords = []
for dy in range(2):
for dx in range(2):
coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom))
return coords[:grid_size]
def expand_search_grid(self, center: TileCoords, current_size: int, new_size: int) -> list[TileCoords]:
"""Return only the NEW tiles when expanding from current_size to new_size grid."""
old_set = {(c.x, c.y) for c in self.get_tile_grid(center, current_size)}
return [c for c in self.get_tile_grid(center, new_size) if (c.x, c.y) not in old_set]
def compute_tile_coords(self, lat: float, lon: float, zoom: int) -> TileCoords:
return mercator.latlon_to_tile(lat, lon, zoom)
def compute_tile_bounds(self, tile_coords: TileCoords) -> TileBounds:
return mercator.compute_tile_bounds(tile_coords)
def clear_flight_cache(self, flight_id: str) -> bool:
"""Clear in-memory cache (flight scoping is tile-key-based)."""
self._mem_cache.clear()
return True
@@ -0,0 +1,190 @@
"""Metric Refinement implementation (SAT-03/04). Phase 1 home of MetricRefinement impl.
SAT-03: GSD normalization — downsample camera frame to satellite resolution.
SAT-04: RANSAC homography → WGS84 position; confidence = inlier_ratio.
"""
import logging
from typing import List, Optional, Tuple
import cv2
import numpy as np
from gps_denied.components.satellite_matcher.protocol import IMetricRefinement
from gps_denied.core.models import IModelManager
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult, Sim3Transform
from gps_denied.schemas.satellite import TileBounds
logger = logging.getLogger(__name__)
class MetricRefinement(IMetricRefinement):
"""LiteSAM/XFeat-based alignment with GSD normalization.
SAT-03: normalize_gsd() downsamples UAV frame to match satellite GSD before matching.
SAT-04: confidence is computed as inlier_count / total_correspondences (inlier ratio).
"""
def __init__(self, model_manager: IModelManager):
self.model_manager = model_manager
# ------------------------------------------------------------------
# SAT-03: GSD normalization
# ------------------------------------------------------------------
@staticmethod
def normalize_gsd(
uav_image: np.ndarray,
uav_gsd_mpp: float,
sat_gsd_mpp: float,
) -> np.ndarray:
"""Resize UAV frame to match satellite GSD (meters-per-pixel).
Args:
uav_image: Raw UAV camera frame.
uav_gsd_mpp: UAV GSD in m/px (e.g. 0.159 at 600 m altitude).
sat_gsd_mpp: Satellite tile GSD in m/px (e.g. 0.6 at zoom 18).
Returns:
Resized image. If already coarser than satellite, returned unchanged.
"""
if uav_gsd_mpp <= 0 or sat_gsd_mpp <= 0:
return uav_image
scale = uav_gsd_mpp / sat_gsd_mpp
if scale >= 1.0:
return uav_image # UAV already coarser, nothing to do
h, w = uav_image.shape[:2]
new_w = max(1, int(w * scale))
new_h = max(1, int(h * scale))
return cv2.resize(uav_image, (new_w, new_h), interpolation=cv2.INTER_AREA)
def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]:
engine = self.model_manager.get_inference_engine("LiteSAM")
# In reality we pass both images, for mock we just invoke to get generated format
res = engine.infer({"img1": uav_image, "img2": satellite_tile})
if res["inlier_count"] < 15:
return None
return res["homography"]
def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint:
# UAV image center
cx, cy = image_center
# Apply homography
pt = np.array([cx, cy, 1.0])
# transformed = H * pt
transformed = homography @ pt
transformed = transformed / transformed[2]
tx, ty = transformed[0], transformed[1]
# Approximate GPS mapping using bounds
# ty maps to latitude (ty=0 is North, ty=Height is South)
# tx maps to longitude (tx=0 is West, tx=Width is East)
# We assume standard 256x256 tiles for this mock calculation
tile_size = 256.0
lat_span = tile_bounds.nw.lat - tile_bounds.sw.lat
lon_span = tile_bounds.ne.lon - tile_bounds.nw.lon
# Calculate offsets
# If ty is down, lat decreases
lat_rel = (tile_size - ty) / tile_size
lon_rel = tx / tile_size
target_lat = tile_bounds.sw.lat + (lat_span * lat_rel)
target_lon = tile_bounds.nw.lon + (lon_span * lon_rel)
return GPSPoint(lat=target_lat, lon=target_lon)
def align_to_satellite(
self,
uav_image: np.ndarray,
satellite_tile: np.ndarray,
tile_bounds: TileBounds,
uav_gsd_mpp: float = 0.0,
) -> Optional[AlignmentResult]:
"""Align UAV frame to satellite tile.
Args:
uav_gsd_mpp: If > 0, the UAV frame is GSD-normalised to satellite
resolution before matching (SAT-03).
"""
# SAT-03: optional GSD normalization
sat_gsd = tile_bounds.gsd
if uav_gsd_mpp > 0 and sat_gsd > 0:
uav_image = self.normalize_gsd(uav_image, uav_gsd_mpp, sat_gsd)
engine = self.model_manager.get_inference_engine("LiteSAM")
res = engine.infer({"img1": uav_image, "img2": satellite_tile})
if res["inlier_count"] < 15:
return None
h, w = uav_image.shape[:2] if hasattr(uav_image, "shape") else (480, 640)
gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2))
# SAT-04: confidence = inlier_ratio (not raw engine confidence)
total = res.get("total_correspondences", max(res["inlier_count"], 1))
inlier_ratio = res["inlier_count"] / max(total, 1)
align = AlignmentResult(
matched=True,
homography=res["homography"],
gps_center=gps,
confidence=inlier_ratio,
inlier_count=res["inlier_count"],
total_correspondences=total,
reprojection_error=res.get("reprojection_error", 1.0),
)
return align if self.compute_match_confidence(align) > 0.5 else None
def compute_match_confidence(self, alignment: AlignmentResult) -> float:
# Complex heuristic combining inliers, reprojection error
score = alignment.confidence
# Penalty for high reproj error
if alignment.reprojection_error > 2.0:
score -= 0.2
return max(0.0, min(1.0, score))
def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]:
# Aggregate logic is complex, for mock we just use the first image's match
if not chunk_images:
return None
return self.compute_homography(chunk_images[0], satellite_tile)
def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
if not chunk_images:
return None
engine = self.model_manager.get_inference_engine("LiteSAM")
res = engine.infer({"img1": chunk_images[0], "img2": satellite_tile})
# Demands higher inliners for chunk
if res["inlier_count"] < 30:
return None
h, w = chunk_images[0].shape[:2] if hasattr(chunk_images[0], "shape") else (480, 640)
gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2))
# Fake sim3
sim3 = Sim3Transform(
translation=np.array([10., 0., 0.]),
rotation=np.eye(3),
scale=1.0
)
chunk_align = ChunkAlignmentResult(
matched=True,
chunk_id="chunk1",
chunk_center_gps=gps,
rotation_angle=0.0,
confidence=res["confidence"],
inlier_count=res["inlier_count"],
transform=sim3,
reprojection_error=1.0
)
return chunk_align
+9 -215
View File
@@ -1,216 +1,10 @@
"""Metric Refinement (Component F09).
"""Legacy import path. Phase 1 shim — code lives in components/satellite_matcher/."""
from gps_denied.components.satellite_matcher.protocol import ( # noqa: F401
MetricRefiner,
IMetricRefinement,
)
from gps_denied.components.satellite_matcher.metric_refinement import ( # noqa: F401
MetricRefinement,
)
SAT-03: GSD normalization — downsample camera frame to satellite resolution.
SAT-04: RANSAC homography → WGS84 position; confidence = inlier_ratio.
"""
import logging
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple
import cv2
import numpy as np
from gps_denied.core.models import IModelManager
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult, Sim3Transform
from gps_denied.schemas.satellite import TileBounds
logger = logging.getLogger(__name__)
class IMetricRefinement(ABC):
@abstractmethod
def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[AlignmentResult]:
pass
@abstractmethod
def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]:
pass
@abstractmethod
def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint:
pass
@abstractmethod
def compute_match_confidence(self, alignment: AlignmentResult) -> float:
pass
@abstractmethod
def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
pass
@abstractmethod
def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]:
pass
class MetricRefinement(IMetricRefinement):
"""LiteSAM/XFeat-based alignment with GSD normalization.
SAT-03: normalize_gsd() downsamples UAV frame to match satellite GSD before matching.
SAT-04: confidence is computed as inlier_count / total_correspondences (inlier ratio).
"""
def __init__(self, model_manager: IModelManager):
self.model_manager = model_manager
# ------------------------------------------------------------------
# SAT-03: GSD normalization
# ------------------------------------------------------------------
@staticmethod
def normalize_gsd(
uav_image: np.ndarray,
uav_gsd_mpp: float,
sat_gsd_mpp: float,
) -> np.ndarray:
"""Resize UAV frame to match satellite GSD (meters-per-pixel).
Args:
uav_image: Raw UAV camera frame.
uav_gsd_mpp: UAV GSD in m/px (e.g. 0.159 at 600 m altitude).
sat_gsd_mpp: Satellite tile GSD in m/px (e.g. 0.6 at zoom 18).
Returns:
Resized image. If already coarser than satellite, returned unchanged.
"""
if uav_gsd_mpp <= 0 or sat_gsd_mpp <= 0:
return uav_image
scale = uav_gsd_mpp / sat_gsd_mpp
if scale >= 1.0:
return uav_image # UAV already coarser, nothing to do
h, w = uav_image.shape[:2]
new_w = max(1, int(w * scale))
new_h = max(1, int(h * scale))
return cv2.resize(uav_image, (new_w, new_h), interpolation=cv2.INTER_AREA)
def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]:
engine = self.model_manager.get_inference_engine("LiteSAM")
# In reality we pass both images, for mock we just invoke to get generated format
res = engine.infer({"img1": uav_image, "img2": satellite_tile})
if res["inlier_count"] < 15:
return None
return res["homography"]
def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint:
# UAV image center
cx, cy = image_center
# Apply homography
pt = np.array([cx, cy, 1.0])
# transformed = H * pt
transformed = homography @ pt
transformed = transformed / transformed[2]
tx, ty = transformed[0], transformed[1]
# Approximate GPS mapping using bounds
# ty maps to latitude (ty=0 is North, ty=Height is South)
# tx maps to longitude (tx=0 is West, tx=Width is East)
# We assume standard 256x256 tiles for this mock calculation
tile_size = 256.0
lat_span = tile_bounds.nw.lat - tile_bounds.sw.lat
lon_span = tile_bounds.ne.lon - tile_bounds.nw.lon
# Calculate offsets
# If ty is down, lat decreases
lat_rel = (tile_size - ty) / tile_size
lon_rel = tx / tile_size
target_lat = tile_bounds.sw.lat + (lat_span * lat_rel)
target_lon = tile_bounds.nw.lon + (lon_span * lon_rel)
return GPSPoint(lat=target_lat, lon=target_lon)
def align_to_satellite(
self,
uav_image: np.ndarray,
satellite_tile: np.ndarray,
tile_bounds: TileBounds,
uav_gsd_mpp: float = 0.0,
) -> Optional[AlignmentResult]:
"""Align UAV frame to satellite tile.
Args:
uav_gsd_mpp: If > 0, the UAV frame is GSD-normalised to satellite
resolution before matching (SAT-03).
"""
# SAT-03: optional GSD normalization
sat_gsd = tile_bounds.gsd
if uav_gsd_mpp > 0 and sat_gsd > 0:
uav_image = self.normalize_gsd(uav_image, uav_gsd_mpp, sat_gsd)
engine = self.model_manager.get_inference_engine("LiteSAM")
res = engine.infer({"img1": uav_image, "img2": satellite_tile})
if res["inlier_count"] < 15:
return None
h, w = uav_image.shape[:2] if hasattr(uav_image, "shape") else (480, 640)
gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2))
# SAT-04: confidence = inlier_ratio (not raw engine confidence)
total = res.get("total_correspondences", max(res["inlier_count"], 1))
inlier_ratio = res["inlier_count"] / max(total, 1)
align = AlignmentResult(
matched=True,
homography=res["homography"],
gps_center=gps,
confidence=inlier_ratio,
inlier_count=res["inlier_count"],
total_correspondences=total,
reprojection_error=res.get("reprojection_error", 1.0),
)
return align if self.compute_match_confidence(align) > 0.5 else None
def compute_match_confidence(self, alignment: AlignmentResult) -> float:
# Complex heuristic combining inliers, reprojection error
score = alignment.confidence
# Penalty for high reproj error
if alignment.reprojection_error > 2.0:
score -= 0.2
return max(0.0, min(1.0, score))
def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]:
# Aggregate logic is complex, for mock we just use the first image's match
if not chunk_images:
return None
return self.compute_homography(chunk_images[0], satellite_tile)
def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
if not chunk_images:
return None
engine = self.model_manager.get_inference_engine("LiteSAM")
res = engine.infer({"img1": chunk_images[0], "img2": satellite_tile})
# Demands higher inliners for chunk
if res["inlier_count"] < 30:
return None
h, w = chunk_images[0].shape[:2] if hasattr(chunk_images[0], "shape") else (480, 640)
gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2))
# Fake sim3
sim3 = Sim3Transform(
translation=np.array([10., 0., 0.]),
rotation=np.eye(3),
scale=1.0
)
chunk_align = ChunkAlignmentResult(
matched=True,
chunk_id="chunk1",
chunk_center_gps=gps,
rotation_angle=0.0,
confidence=res["confidence"],
inlier_count=res["inlier_count"],
transform=sim3,
reprojection_error=1.0
)
return chunk_align
__all__ = ["MetricRefinement", "IMetricRefinement", "MetricRefiner"]
+5 -286
View File
@@ -1,287 +1,6 @@
"""Satellite Data Manager (Component F04).
"""Legacy import path. Phase 1 shim — code lives in components/satellite_matcher/."""
from gps_denied.components.satellite_matcher.local_tile_loader import ( # noqa: F401
SatelliteDataManager,
)
SAT-01: Reads pre-loaded tiles from a local z/x/y directory (no live HTTP during flight).
SAT-02: Tile selection uses ESKF position ± 3σ_horizontal to define search area.
"""
import hashlib
import logging
import math
import os
from concurrent.futures import ThreadPoolExecutor
import cv2
import numpy as np
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.satellite import TileBounds, TileCoords
from gps_denied.utils import mercator
class SatelliteDataManager:
"""Manages satellite tiles from a local pre-loaded directory.
Directory layout (SAT-01):
{tile_dir}/{zoom}/{x}/{y}.png — standard Web Mercator slippy-map layout
No live HTTP requests are made during flight. A separate offline tooling step
downloads and stores tiles before the mission.
"""
_logger = logging.getLogger(__name__)
def __init__(
self,
tile_dir: str = ".satellite_tiles",
cache_dir: str = ".satellite_cache",
max_size_gb: float = 10.0,
):
self.tile_dir = tile_dir
self.thread_pool = ThreadPoolExecutor(max_workers=4)
# In-memory LRU for hot tiles (avoids repeated disk reads)
self._mem_cache: dict[str, np.ndarray] = {}
self._mem_cache_max = 256
# SHA-256 manifest for tile integrity (якщо файл існує)
self._manifest: dict[str, str] = self._load_manifest()
# ------------------------------------------------------------------
# SAT-01: Local tile reads (no HTTP)
# ------------------------------------------------------------------
def _load_manifest(self) -> dict[str, str]:
"""Завантажити SHA-256 manifest з tile_dir/manifest.sha256."""
path = os.path.join(self.tile_dir, "manifest.sha256")
if not os.path.isfile(path):
return {}
manifest: dict[str, str] = {}
with open(path) as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(maxsplit=1)
if len(parts) == 2:
manifest[parts[1].strip()] = parts[0].strip()
return manifest
def _verify_tile_integrity(self, rel_path: str, file_path: str) -> bool:
"""Перевірити SHA-256 тайла проти manifest (якщо manifest існує)."""
if not self._manifest:
return True # без manifest — пропускаємо
expected = self._manifest.get(rel_path)
if expected is None:
return True # тайл не в manifest — OK
sha = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha.update(chunk)
actual = sha.hexdigest()
if actual != expected:
self._logger.warning("Tile integrity failed: %s (exp %s, got %s)",
rel_path, expected[:12], actual[:12])
return False
return True
def load_local_tile(self, tile_coords: TileCoords) -> np.ndarray | None:
"""Load a tile image from the local pre-loaded directory.
Expected path: {tile_dir}/{zoom}/{x}/{y}.png
Returns None if the file does not exist.
"""
key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}"
if key in self._mem_cache:
return self._mem_cache[key]
rel_path = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}.png"
path = os.path.join(self.tile_dir, rel_path)
if not os.path.isfile(path):
return None
if not self._verify_tile_integrity(rel_path, path):
return None # тайл пошкоджений
img = cv2.imread(path, cv2.IMREAD_COLOR)
if img is None:
return None
# LRU eviction: drop oldest if full
if len(self._mem_cache) >= self._mem_cache_max:
oldest = next(iter(self._mem_cache))
del self._mem_cache[oldest]
self._mem_cache[key] = img
return img
def save_local_tile(self, tile_coords: TileCoords, image: np.ndarray) -> bool:
"""Persist a tile to the local directory (used by offline pre-fetch tooling)."""
path = os.path.join(self.tile_dir, str(tile_coords.zoom),
str(tile_coords.x), f"{tile_coords.y}.png")
os.makedirs(os.path.dirname(path), exist_ok=True)
ok, encoded = cv2.imencode(".png", image)
if not ok:
return False
with open(path, "wb") as f:
f.write(encoded.tobytes())
key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}"
self._mem_cache[key] = image
return True
# ------------------------------------------------------------------
# SAT-02: Tile selection for ESKF position ± 3σ_horizontal
# ------------------------------------------------------------------
@staticmethod
def _meters_to_degrees(meters: float, lat: float) -> tuple[float, float]:
"""Convert a radius in metres to (Δlat°, Δlon°) at the given latitude."""
delta_lat = meters / 111_320.0
delta_lon = meters / (111_320.0 * math.cos(math.radians(lat)))
return delta_lat, delta_lon
def select_tiles_for_eskf_position(
self, gps: GPSPoint, sigma_h_m: float, zoom: int
) -> list[TileCoords]:
"""Return all tile coords covering the ESKF position ± 3σ_horizontal area.
Args:
gps: ESKF best-estimate position.
sigma_h_m: 1-σ horizontal uncertainty in metres (from ESKF covariance).
zoom: Web Mercator zoom level (18 recommended ≈ 0.6 m/px).
"""
radius_m = 3.0 * sigma_h_m
dlat, dlon = self._meters_to_degrees(radius_m, gps.lat)
# Bounding box corners
lat_min, lat_max = gps.lat - dlat, gps.lat + dlat
lon_min, lon_max = gps.lon - dlon, gps.lon + dlon
# Convert corners to tile coords
tc_nw = mercator.latlon_to_tile(lat_max, lon_min, zoom)
tc_se = mercator.latlon_to_tile(lat_min, lon_max, zoom)
tiles: list[TileCoords] = []
for x in range(tc_nw.x, tc_se.x + 1):
for y in range(tc_nw.y, tc_se.y + 1):
tiles.append(TileCoords(x=x, y=y, zoom=zoom))
return tiles
def assemble_mosaic(
self,
tile_list: list[tuple[TileCoords, np.ndarray]],
target_size: int = 512,
) -> tuple[np.ndarray, TileBounds] | None:
"""Assemble a list of (TileCoords, image) pairs into a single mosaic.
Returns (mosaic_image, combined_bounds) or None if tile_list is empty.
The mosaic is resized to (target_size × target_size) for the matcher.
"""
if not tile_list:
return None
xs = [tc.x for tc, _ in tile_list]
ys = [tc.y for tc, _ in tile_list]
zoom = tile_list[0][0].zoom
x_min, x_max = min(xs), max(xs)
y_min, y_max = min(ys), max(ys)
cols = x_max - x_min + 1
rows = y_max - y_min + 1
# Determine single-tile pixel size from first image
sample = tile_list[0][1]
th, tw = sample.shape[:2]
canvas = np.zeros((rows * th, cols * tw, 3), dtype=np.uint8)
for tc, img in tile_list:
col = tc.x - x_min
row = tc.y - y_min
h, w = img.shape[:2]
canvas[row * th: row * th + h, col * tw: col * tw + w] = img
mosaic = cv2.resize(canvas, (target_size, target_size), interpolation=cv2.INTER_AREA)
# Compute combined GPS bounds
nw_bounds = mercator.compute_tile_bounds(TileCoords(x=x_min, y=y_min, zoom=zoom))
se_bounds = mercator.compute_tile_bounds(TileCoords(x=x_max, y=y_max, zoom=zoom))
combined = TileBounds(
nw=nw_bounds.nw,
ne=GPSPoint(lat=nw_bounds.nw.lat, lon=se_bounds.se.lon),
sw=GPSPoint(lat=se_bounds.se.lat, lon=nw_bounds.nw.lon),
se=se_bounds.se,
center=GPSPoint(
lat=(nw_bounds.nw.lat + se_bounds.se.lat) / 2,
lon=(nw_bounds.nw.lon + se_bounds.se.lon) / 2,
),
gsd=nw_bounds.gsd,
)
return mosaic, combined
def fetch_tiles_for_position(
self, gps: GPSPoint, sigma_h_m: float, zoom: int
) -> tuple[np.ndarray, TileBounds] | None:
"""High-level helper: select tiles + load + assemble mosaic.
Returns (mosaic, bounds) or None if no local tiles are available.
"""
coords = self.select_tiles_for_eskf_position(gps, sigma_h_m, zoom)
loaded: list[tuple[TileCoords, np.ndarray]] = []
for tc in coords:
img = self.load_local_tile(tc)
if img is not None:
loaded.append((tc, img))
return self.assemble_mosaic(loaded) if loaded else None
# ------------------------------------------------------------------
# Cache helpers (backward-compat, also used for warm-path caching)
# ------------------------------------------------------------------
def cache_tile(self, flight_id: str, tile_coords: TileCoords, tile_data: np.ndarray) -> bool:
"""Cache a tile image in memory (used by tests and offline tools)."""
key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}"
self._mem_cache[key] = tile_data
return True
def get_cached_tile(self, flight_id: str, tile_coords: TileCoords) -> np.ndarray | None:
"""Retrieve a cached tile from memory."""
key = f"{tile_coords.zoom}/{tile_coords.x}/{tile_coords.y}"
return self._mem_cache.get(key)
# ------------------------------------------------------------------
# Tile math helpers
# ------------------------------------------------------------------
def get_tile_grid(self, center: TileCoords, grid_size: int) -> list[TileCoords]:
"""Return grid_size tiles centered on center."""
if grid_size == 1:
return [center]
side = int(grid_size ** 0.5)
half = side // 2
coords: list[TileCoords] = []
for dy in range(-half, half + 1):
for dx in range(-half, half + 1):
coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom))
if grid_size == 4:
coords = []
for dy in range(2):
for dx in range(2):
coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom))
return coords[:grid_size]
def expand_search_grid(self, center: TileCoords, current_size: int, new_size: int) -> list[TileCoords]:
"""Return only the NEW tiles when expanding from current_size to new_size grid."""
old_set = {(c.x, c.y) for c in self.get_tile_grid(center, current_size)}
return [c for c in self.get_tile_grid(center, new_size) if (c.x, c.y) not in old_set]
def compute_tile_coords(self, lat: float, lon: float, zoom: int) -> TileCoords:
return mercator.latlon_to_tile(lat, lon, zoom)
def compute_tile_bounds(self, tile_coords: TileCoords) -> TileBounds:
return mercator.compute_tile_bounds(tile_coords)
def clear_flight_cache(self, flight_id: str) -> bool:
"""Clear in-memory cache (flight scoping is tile-key-based)."""
self._mem_cache.clear()
return True
__all__ = ["SatelliteDataManager"]