import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
from pydantic import BaseModel

from f02_1_flight_lifecycle_manager import GPSPoint
from f04_satellite_data_manager import TileBounds

logger = logging.getLogger(__name__)


# --- Data Models ---

class AlignmentResult(BaseModel):
    """Outcome of aligning a single UAV image against a satellite tile."""
    matched: bool
    homography: Any  # np.ndarray (3, 3): UAV pixel -> satellite pixel projective transform
    gps_center: GPSPoint
    confidence: float
    inlier_count: int
    total_correspondences: int
    reprojection_error: float  # mean reprojection error over inliers, in pixels
    model_config = {"arbitrary_types_allowed": True}


class Sim3Transform(BaseModel):
    """Similarity transform (translation + rotation + uniform scale)."""
    translation: Any  # np.ndarray (3,)
    rotation: Any  # np.ndarray (3, 3)
    scale: float
    model_config = {"arbitrary_types_allowed": True}


class ChunkAlignmentResult(BaseModel):
    """Outcome of aligning a chunk (sequence) of UAV images against a satellite tile."""
    matched: bool
    chunk_id: str
    chunk_center_gps: GPSPoint
    rotation_angle: float  # in-plane rotation recovered from the homography, degrees
    confidence: float
    inlier_count: int
    transform: Sim3Transform
    reprojection_error: float
    model_config = {"arbitrary_types_allowed": True}


class LiteSAMConfig(BaseModel):
    """Tunables for the LiteSAM cross-view matching back-end."""
    model_path: str = "litesam.onnx"
    confidence_threshold: float = 0.7
    min_inliers: int = 15
    max_reprojection_error: float = 2.0  # pixels
    multi_scale_levels: int = 3
    chunk_min_inliers: int = 30


# --- Interface ---

class IMetricRefinement(ABC):
    """Contract for absolute (metric) GPS refinement via UAV-to-satellite matching.

    NOTE(review): return annotations below were corrected to match the concrete
    implementation — `compute_homography` / `match_chunk_homography` return an
    (H, stats) tuple, and `compute_match_confidence` takes the decomposed match
    statistics, not an opaque alignment object.
    """

    @abstractmethod
    def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray,
                           tile_bounds: TileBounds) -> Optional[AlignmentResult]:
        pass

    @abstractmethod
    def compute_homography(self, uav_image: np.ndarray,
                           satellite_tile: np.ndarray) -> Optional[Tuple[np.ndarray, dict]]:
        pass

    @abstractmethod
    def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds,
                                   image_center: Tuple[int, int],
                                   tile_size: Tuple[int, int] = (512, 512)) -> GPSPoint:
        pass

    @abstractmethod
    def compute_match_confidence(self, inlier_ratio: float, inlier_count: int,
                                 mre: float, spatial_dist: float) -> float:
        pass

    @abstractmethod
    def align_chunk_to_satellite(self, chunk_images: List[np.ndarray],
                                 satellite_tile: np.ndarray,
                                 tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
        pass

    @abstractmethod
    def match_chunk_homography(self, chunk_images: List[np.ndarray],
                               satellite_tile: np.ndarray) -> Optional[Tuple[np.ndarray, dict]]:
        pass


# --- Implementation ---

class LocalGeospatialAnchoring(IMetricRefinement):
    """
    F09: Local Geospatial Anchoring Back-End

    Handles precise metric refinement (absolute GPS anchoring) using LiteSAM
    for cross-view UAV-to-Satellite matching via homography estimation.
    """

    def __init__(self, config: Optional[LiteSAMConfig] = None, model_manager=None):
        self.config = config or LiteSAMConfig()
        # Optional injected model runner; when it exposes `run_litesam`,
        # correspondence computation is delegated to it (see
        # `_compute_correspondences`), otherwise a mock is used.
        self.model_manager = model_manager

    # --- Internal Math & Coordinate Helpers ---

    def _pixel_to_gps(self, pixel_x: float, pixel_y: float, tile_bounds: TileBounds,
                      tile_w: int, tile_h: int) -> GPSPoint:
        """Interpolate a GPS point for a tile pixel.

        Assumes Web Mercator linearity at tile scale: longitude varies linearly
        NW->NE along x, latitude varies linearly NW->SW along y.
        """
        x_ratio = pixel_x / tile_w
        y_ratio = pixel_y / tile_h
        lon = tile_bounds.nw.lon + (tile_bounds.ne.lon - tile_bounds.nw.lon) * x_ratio
        lat = tile_bounds.nw.lat + (tile_bounds.sw.lat - tile_bounds.nw.lat) * y_ratio
        return GPSPoint(lat=lat, lon=lon)

    def _compute_reprojection_error(self, homography: np.ndarray, src_pts: np.ndarray,
                                    dst_pts: np.ndarray) -> float:
        """Mean Euclidean distance (pixels) between H-projected src points and dst points.

        Returns inf for empty input or when any projected point is (numerically)
        at infinity, so callers treat the match as unusable.
        """
        if len(src_pts) == 0:
            return float('inf')
        src_homog = np.hstack([src_pts, np.ones((src_pts.shape[0], 1))])
        projected = (homography @ src_homog.T).T
        # Guard the homogeneous divide: a point mapped near the plane at
        # infinity (w ~ 0) would otherwise yield a divide-by-zero / nan error.
        w = projected[:, 2:]
        if np.any(np.abs(w) < 1e-9):
            return float('inf')
        projected = projected[:, :2] / w
        errors = np.linalg.norm(projected - dst_pts, axis=1)
        return float(np.mean(errors))

    def _compute_spatial_distribution(self, inliers: np.ndarray) -> float:
        """Heuristic spread score for inlier points (1.0 = well spread, 0.0 = degenerate).

        Score saturates at 1.0 once the combined x/y std dev reaches 100 px
        (e.g. ~50 px in each axis). Fewer than 3 points cannot define a spread.
        """
        if len(inliers) < 3:
            return 0.0
        std_x = np.std(inliers[:, 0])
        std_y = np.std(inliers[:, 1])
        return min(1.0, (std_x + std_y) / 100.0)

    # --- 09.01 Feature: Single Image Alignment ---

    def _extract_features(self, image: np.ndarray) -> np.ndarray:
        """Mock TAIFormer encoder features (100 descriptors of dim 256)."""
        return np.random.rand(100, 256).astype(np.float32)

    def _compute_correspondences(self, uav_features: np.ndarray,
                                 sat_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Return matched (uav_pts, sat_pts) pixel coordinates, shape (N, 2) each.

        Delegates to the injected model manager when available; otherwise
        produces a mock CTM correlation field: synthetic "perfect" matches
        offset by a fixed translation plus Gaussian noise, for RANSAC testing.
        """
        if self.model_manager and hasattr(self.model_manager, 'run_litesam'):
            return self.model_manager.run_litesam(uav_features, sat_features)
        num_matches = 100
        uav_pts = np.random.rand(num_matches, 2) * [640, 480]
        sat_pts = uav_pts + np.array([100.0, 50.0]) + np.random.normal(0, 2.0, (num_matches, 2))
        return uav_pts.astype(np.float32), sat_pts.astype(np.float32)

    def _estimate_homography_ransac(self, src_pts: np.ndarray,
                                    dst_pts: np.ndarray) -> Tuple[Optional[np.ndarray], np.ndarray]:
        """RANSAC homography (5 px threshold). Returns (H, inlier_mask); H may be None."""
        if len(src_pts) < 4:  # cv2.findHomography needs >= 4 correspondences
            return None, np.array([])
        H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
        return H, mask

    def compute_homography(self, uav_image: np.ndarray,
                           satellite_tile: np.ndarray) -> Optional[Tuple[np.ndarray, dict]]:
        """Estimate the UAV->satellite homography.

        Returns (H, stats) where stats holds inlier_count / total / mre /
        inlier_pts, or None when estimation fails or falls below
        `config.min_inliers`.
        """
        uav_feat = self._extract_features(uav_image)
        sat_feat = self._extract_features(satellite_tile)
        src_pts, dst_pts = self._compute_correspondences(uav_feat, sat_feat)
        H, mask = self._estimate_homography_ransac(src_pts, dst_pts)
        # cv2.findHomography may return (None, None); guard both before use.
        if H is None or mask is None:
            return None
        inliers_mask = mask.ravel() == 1
        inlier_count = int(np.sum(inliers_mask))
        if inlier_count < self.config.min_inliers:
            return None
        mre = self._compute_reprojection_error(H, src_pts[inliers_mask], dst_pts[inliers_mask])
        stats = {
            "inlier_count": inlier_count,
            "total": len(src_pts),
            "mre": mre,
            "inlier_pts": src_pts[inliers_mask],
        }
        return H, stats

    def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds,
                                   image_center: Tuple[int, int],
                                   tile_size: Tuple[int, int] = (512, 512)) -> GPSPoint:
        """Project the UAV image center through H and convert the tile pixel to GPS.

        `tile_size` is (width, height) of the satellite tile in pixels; it
        defaults to 512x512 for callers that do not know the tile dimensions.
        """
        center = np.array([[[float(image_center[0]), float(image_center[1])]]],
                          dtype=np.float32)
        transformed = cv2.perspectiveTransform(center, homography)
        sat_x, sat_y = transformed[0][0]
        return self._pixel_to_gps(float(sat_x), float(sat_y), tile_bounds,
                                  tile_size[0], tile_size[1])

    def compute_match_confidence(self, inlier_ratio: float, inlier_count: int,
                                 mre: float, spatial_dist: float) -> float:
        """Map match statistics to a confidence in [0, 1].

        Hard gates on min inliers and max reprojection error; then a weighted
        base score (ratio 50%, count 30%, spread 20%) banded so that strong
        matches land >= 0.85, moderate matches in [0.5, 0.8], weak < 0.5.
        """
        if inlier_count < self.config.min_inliers or mre > self.config.max_reprojection_error:
            return 0.0
        base_conf = min(1.0, (inlier_ratio * 0.5)
                        + (inlier_count / 100.0 * 0.3)
                        + (spatial_dist * 0.2))
        if inlier_ratio > 0.6 and inlier_count > 50 and mre < 0.5:
            return max(0.85, base_conf)
        if inlier_ratio > 0.4 and inlier_count > 30:
            return max(0.5, min(0.8, base_conf))
        return min(0.49, base_conf)

    def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray,
                           tile_bounds: TileBounds) -> Optional[AlignmentResult]:
        """Full single-image alignment: homography -> GPS -> confidence.

        Returns None when no acceptable homography can be estimated.
        """
        res = self.compute_homography(uav_image, satellite_tile)
        if res is None:
            return None
        H, stats = res
        h, w = uav_image.shape[:2]
        gps = self.extract_gps_from_alignment(H, tile_bounds, (w // 2, h // 2))
        ratio = stats["inlier_count"] / stats["total"] if stats["total"] > 0 else 0
        spatial = self._compute_spatial_distribution(stats["inlier_pts"])
        conf = self.compute_match_confidence(ratio, stats["inlier_count"], stats["mre"], spatial)
        return AlignmentResult(
            matched=True,
            homography=H,
            gps_center=gps,
            confidence=conf,
            inlier_count=stats["inlier_count"],
            total_correspondences=stats["total"],
            reprojection_error=stats["mre"],
        )

    # --- 09.02 Feature: Chunk Alignment ---

    def _extract_chunk_features(self, chunk_images: List[np.ndarray]) -> List[np.ndarray]:
        """Per-image feature extraction for a chunk."""
        return [self._extract_features(img) for img in chunk_images]

    def _aggregate_features(self, features_list: List[np.ndarray]) -> np.ndarray:
        """Element-wise mean of per-image feature maps."""
        return np.mean(np.stack(features_list), axis=0)

    def _aggregate_correspondences(self, correspondences_list: List[Tuple[np.ndarray, np.ndarray]]
                                   ) -> Tuple[np.ndarray, np.ndarray]:
        """Concatenate per-image (src, dst) correspondences into single arrays."""
        src_pts = np.vstack([c[0] for c in correspondences_list])
        dst_pts = np.vstack([c[1] for c in correspondences_list])
        return src_pts, dst_pts

    def _estimate_chunk_homography(self, src_pts: np.ndarray,
                                   dst_pts: np.ndarray) -> Tuple[Optional[np.ndarray], dict]:
        """Chunk-level RANSAC homography with the stricter `chunk_min_inliers` gate.

        Returns (H, stats) or (None, {}) on failure.
        """
        H, mask = self._estimate_homography_ransac(src_pts, dst_pts)
        if H is None or mask is None:
            return None, {}
        inliers_mask = mask.ravel() == 1
        inlier_count = int(np.sum(inliers_mask))
        if inlier_count < self.config.chunk_min_inliers:
            return None, {}
        mre = self._compute_reprojection_error(H, src_pts[inliers_mask], dst_pts[inliers_mask])
        stats = {
            "inlier_count": inlier_count,
            "total": len(src_pts),
            "mre": mre,
            "inlier_pts": src_pts[inliers_mask],
        }
        return H, stats

    def _compute_sim3_transform(self, homography: np.ndarray,
                                tile_bounds: TileBounds) -> Sim3Transform:
        """Decompose the homography's affine part into a planar Sim(3) transform.

        Scale and rotation are read off the first column of H; translation from
        the third column. `tile_bounds` is currently unused (kept for interface
        stability / future metric scaling).
        """
        tx, ty = homography[0, 2], homography[1, 2]
        scale = np.sqrt(homography[0, 0] ** 2 + homography[1, 0] ** 2)
        rot_angle = np.arctan2(homography[1, 0], homography[0, 0])
        R = np.array([
            [np.cos(rot_angle), -np.sin(rot_angle), 0],
            [np.sin(rot_angle), np.cos(rot_angle), 0],
            [0, 0, 1],
        ])
        return Sim3Transform(translation=np.array([tx, ty, 0.0]), rotation=R, scale=float(scale))

    def _get_chunk_center_gps(self, homography: np.ndarray, tile_bounds: TileBounds,
                              chunk_images: List[np.ndarray]) -> GPSPoint:
        """GPS of the middle image's center, projected through the chunk homography."""
        mid_idx = len(chunk_images) // 2
        mid_img = chunk_images[mid_idx]
        h, w = mid_img.shape[:2]
        return self.extract_gps_from_alignment(homography, tile_bounds, (w // 2, h // 2))

    def _validate_chunk_match(self, inliers: int, confidence: float) -> bool:
        """Accept a chunk match only when both inlier and confidence gates pass."""
        return inliers >= self.config.chunk_min_inliers and confidence >= self.config.confidence_threshold

    def match_chunk_homography(self, chunk_images: List[np.ndarray],
                               satellite_tile: np.ndarray) -> Optional[Tuple[np.ndarray, dict]]:
        """Estimate one homography from the pooled correspondences of a whole chunk.

        Returns (H, stats) or None on empty input / estimation failure.
        """
        if not chunk_images:
            return None
        sat_feat = self._extract_features(satellite_tile)
        chunk_features = self._extract_chunk_features(chunk_images)
        correspondences = []
        for feat in chunk_features:
            src, dst = self._compute_correspondences(feat, sat_feat)
            correspondences.append((src, dst))
        agg_src, agg_dst = self._aggregate_correspondences(correspondences)
        H, stats = self._estimate_chunk_homography(agg_src, agg_dst)
        if H is None:
            return None
        return H, stats

    def align_chunk_to_satellite(self, chunk_images: List[np.ndarray],
                                 satellite_tile: np.ndarray,
                                 tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
        """Full chunk alignment: pooled homography -> GPS -> confidence -> Sim(3).

        Returns None on empty input, failed estimation, or a match that fails
        `_validate_chunk_match`.
        """
        if not chunk_images:
            return None
        res = self.match_chunk_homography(chunk_images, satellite_tile)
        if res is None:
            return None
        H, stats = res
        gps = self._get_chunk_center_gps(H, tile_bounds, chunk_images)
        ratio = stats["inlier_count"] / stats["total"] if stats["total"] > 0 else 0
        spatial = self._compute_spatial_distribution(stats["inlier_pts"])
        conf = self.compute_match_confidence(ratio, stats["inlier_count"], stats["mre"], spatial)
        if not self._validate_chunk_match(stats["inlier_count"], conf):
            return None
        sim3 = self._compute_sim3_transform(H, tile_bounds)
        rot_angle_deg = float(np.degrees(np.arctan2(H[1, 0], H[0, 0])))
        return ChunkAlignmentResult(
            matched=True,
            chunk_id="chunk_matched",
            chunk_center_gps=gps,
            rotation_angle=rot_angle_deg,
            confidence=conf,
            inlier_count=stats["inlier_count"],
            transform=sim3,
            reprojection_error=stats["mre"],
        )