Files
gps-denied-onboard/f09_local_geospatial_anchoring.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

288 lines
12 KiB
Python

import cv2
import numpy as np
import logging
from typing import List, Optional, Tuple, Dict, Any
from pydantic import BaseModel
from abc import ABC, abstractmethod
from f02_1_flight_lifecycle_manager import GPSPoint
from f04_satellite_data_manager import TileBounds
logger = logging.getLogger(__name__)
# --- Data Models ---
class AlignmentResult(BaseModel):
matched: bool
homography: Any # np.ndarray (3, 3)
gps_center: GPSPoint
confidence: float
inlier_count: int
total_correspondences: int
reprojection_error: float
model_config = {"arbitrary_types_allowed": True}
class Sim3Transform(BaseModel):
translation: Any # np.ndarray (3,)
rotation: Any # np.ndarray (3, 3)
scale: float
model_config = {"arbitrary_types_allowed": True}
class ChunkAlignmentResult(BaseModel):
matched: bool
chunk_id: str
chunk_center_gps: GPSPoint
rotation_angle: float
confidence: float
inlier_count: int
transform: Sim3Transform
reprojection_error: float
model_config = {"arbitrary_types_allowed": True}
class LiteSAMConfig(BaseModel):
model_path: str = "litesam.onnx"
confidence_threshold: float = 0.7
min_inliers: int = 15
max_reprojection_error: float = 2.0
multi_scale_levels: int = 3
chunk_min_inliers: int = 30
# --- Interface ---
class IMetricRefinement(ABC):
@abstractmethod
def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[AlignmentResult]: pass
@abstractmethod
def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]: pass
@abstractmethod
def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint: pass
@abstractmethod
def compute_match_confidence(self, alignment: Any) -> float: pass
@abstractmethod
def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]: pass
@abstractmethod
def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]: pass
# --- Implementation ---
class LocalGeospatialAnchoring(IMetricRefinement):
"""
F09: Local Geospatial Anchoring Back-End
Handles precise metric refinement (absolute GPS anchoring) using LiteSAM for
cross-view UAV-to-Satellite matching via homography estimation.
"""
def __init__(self, config: Optional[LiteSAMConfig] = None, model_manager=None):
self.config = config or LiteSAMConfig()
self.model_manager = model_manager
# --- Internal Math & Coordinate Helpers ---
def _pixel_to_gps(self, pixel_x: float, pixel_y: float, tile_bounds: TileBounds, tile_w: int, tile_h: int) -> GPSPoint:
# Interpolate GPS within the tile bounds (assuming Web Mercator linearity at tile scale)
x_ratio = pixel_x / tile_w
y_ratio = pixel_y / tile_h
lon = tile_bounds.nw.lon + (tile_bounds.ne.lon - tile_bounds.nw.lon) * x_ratio
lat = tile_bounds.nw.lat + (tile_bounds.sw.lat - tile_bounds.nw.lat) * y_ratio
return GPSPoint(lat=lat, lon=lon)
def _compute_reprojection_error(self, homography: np.ndarray, src_pts: np.ndarray, dst_pts: np.ndarray) -> float:
if len(src_pts) == 0: return float('inf')
src_homog = np.hstack([src_pts, np.ones((src_pts.shape[0], 1))])
projected = (homography @ src_homog.T).T
projected = projected[:, :2] / projected[:, 2:]
errors = np.linalg.norm(projected - dst_pts, axis=1)
return float(np.mean(errors))
def _compute_spatial_distribution(self, inliers: np.ndarray) -> float:
# Mock spatial distribution heuristic (1.0 = perfect spread, 0.0 = single point)
if len(inliers) < 3: return 0.0
std_x = np.std(inliers[:, 0])
std_y = np.std(inliers[:, 1])
return min(1.0, (std_x + std_y) / 100.0) # Assume good spread if std dev is > 50px
# --- 09.01 Feature: Single Image Alignment ---
def _extract_features(self, image: np.ndarray) -> np.ndarray:
# Mock TAIFormer encoder features
return np.random.rand(100, 256).astype(np.float32)
def _compute_correspondences(self, uav_features: np.ndarray, sat_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
if self.model_manager and hasattr(self.model_manager, 'run_litesam'):
return self.model_manager.run_litesam(uav_features, sat_features)
# Mock CTM correlation field: returning matched pixel coordinates
num_matches = 100
uav_pts = np.random.rand(num_matches, 2) * [640, 480]
# Create "perfect" matches + noise for RANSAC
sat_pts = uav_pts + np.array([100.0, 50.0]) + np.random.normal(0, 2.0, (num_matches, 2))
return uav_pts.astype(np.float32), sat_pts.astype(np.float32)
def _estimate_homography_ransac(self, src_pts: np.ndarray, dst_pts: np.ndarray) -> Tuple[Optional[np.ndarray], np.ndarray]:
if len(src_pts) < 4:
return None, np.array([])
H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
return H, mask
def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[Tuple[np.ndarray, dict]]:
uav_feat = self._extract_features(uav_image)
sat_feat = self._extract_features(satellite_tile)
src_pts, dst_pts = self._compute_correspondences(uav_feat, sat_feat)
H, mask = self._estimate_homography_ransac(src_pts, dst_pts)
if H is None:
return None
inliers_mask = mask.ravel() == 1
inlier_count = int(np.sum(inliers_mask))
if inlier_count < self.config.min_inliers:
return None
mre = self._compute_reprojection_error(H, src_pts[inliers_mask], dst_pts[inliers_mask])
stats = {"inlier_count": inlier_count, "total": len(src_pts), "mre": mre, "inlier_pts": src_pts[inliers_mask]}
return H, stats
def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint:
center_pt = np.array([[image_center[0], image_center[1]]], dtype=np.float32)
center_homog = np.array([[[center_pt[0,0], center_pt[0,1]]]])
transformed = cv2.perspectiveTransform(center_homog, homography)
sat_x, sat_y = transformed[0][0]
# Assuming satellite tile is 512x512 for generic testing without explicit image dims
return self._pixel_to_gps(sat_x, sat_y, tile_bounds, 512, 512)
def compute_match_confidence(self, inlier_ratio: float, inlier_count: int, mre: float, spatial_dist: float) -> float:
if inlier_count < self.config.min_inliers or mre > self.config.max_reprojection_error:
return 0.0
base_conf = min(1.0, (inlier_ratio * 0.5) + (inlier_count / 100.0 * 0.3) + (spatial_dist * 0.2))
if inlier_ratio > 0.6 and inlier_count > 50 and mre < 0.5:
return max(0.85, base_conf)
if inlier_ratio > 0.4 and inlier_count > 30:
return max(0.5, min(0.8, base_conf))
return min(0.49, base_conf)
def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[AlignmentResult]:
res = self.compute_homography(uav_image, satellite_tile)
if res is None: return None
H, stats = res
h, w = uav_image.shape[:2]
gps = self.extract_gps_from_alignment(H, tile_bounds, (w//2, h//2))
ratio = stats["inlier_count"] / stats["total"] if stats["total"] > 0 else 0
spatial = self._compute_spatial_distribution(stats["inlier_pts"])
conf = self.compute_match_confidence(ratio, stats["inlier_count"], stats["mre"], spatial)
return AlignmentResult(matched=True, homography=H, gps_center=gps, confidence=conf, inlier_count=stats["inlier_count"], total_correspondences=stats["total"], reprojection_error=stats["mre"])
# --- 09.02 Feature: Chunk Alignment ---
def _extract_chunk_features(self, chunk_images: List[np.ndarray]) -> List[np.ndarray]:
return [self._extract_features(img) for img in chunk_images]
def _aggregate_features(self, features_list: List[np.ndarray]) -> np.ndarray:
return np.mean(np.stack(features_list), axis=0)
def _aggregate_correspondences(self, correspondences_list: List[Tuple[np.ndarray, np.ndarray]]) -> Tuple[np.ndarray, np.ndarray]:
src_pts = np.vstack([c[0] for c in correspondences_list])
dst_pts = np.vstack([c[1] for c in correspondences_list])
return src_pts, dst_pts
def _estimate_chunk_homography(self, src_pts: np.ndarray, dst_pts: np.ndarray) -> Tuple[Optional[np.ndarray], dict]:
H, mask = self._estimate_homography_ransac(src_pts, dst_pts)
if H is None:
return None, {}
inliers_mask = mask.ravel() == 1
inlier_count = int(np.sum(inliers_mask))
if inlier_count < self.config.chunk_min_inliers:
return None, {}
mre = self._compute_reprojection_error(H, src_pts[inliers_mask], dst_pts[inliers_mask])
stats = {"inlier_count": inlier_count, "total": len(src_pts), "mre": mre, "inlier_pts": src_pts[inliers_mask]}
return H, stats
def _compute_sim3_transform(self, homography: np.ndarray, tile_bounds: TileBounds) -> Sim3Transform:
tx, ty = homography[0, 2], homography[1, 2]
scale = np.sqrt(homography[0, 0]**2 + homography[1, 0]**2)
rot_angle = np.arctan2(homography[1, 0], homography[0, 0])
R = np.array([
[np.cos(rot_angle), -np.sin(rot_angle), 0],
[np.sin(rot_angle), np.cos(rot_angle), 0],
[0, 0, 1]
])
return Sim3Transform(translation=np.array([tx, ty, 0.0]), rotation=R, scale=float(scale))
def _get_chunk_center_gps(self, homography: np.ndarray, tile_bounds: TileBounds, chunk_images: List[np.ndarray]) -> GPSPoint:
mid_idx = len(chunk_images) // 2
mid_img = chunk_images[mid_idx]
h, w = mid_img.shape[:2]
return self.extract_gps_from_alignment(homography, tile_bounds, (w // 2, h // 2))
def _validate_chunk_match(self, inliers: int, confidence: float) -> bool:
return inliers >= self.config.chunk_min_inliers and confidence >= self.config.confidence_threshold
def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[Tuple[np.ndarray, dict]]:
if not chunk_images:
return None
sat_feat = self._extract_features(satellite_tile)
chunk_features = self._extract_chunk_features(chunk_images)
correspondences = []
for feat in chunk_features:
src, dst = self._compute_correspondences(feat, sat_feat)
correspondences.append((src, dst))
agg_src, agg_dst = self._aggregate_correspondences(correspondences)
H, stats = self._estimate_chunk_homography(agg_src, agg_dst)
if H is None:
return None
return H, stats
def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
if not chunk_images:
return None
res = self.match_chunk_homography(chunk_images, satellite_tile)
if res is None: return None
H, stats = res
gps = self._get_chunk_center_gps(H, tile_bounds, chunk_images)
ratio = stats["inlier_count"] / stats["total"] if stats["total"] > 0 else 0
spatial = self._compute_spatial_distribution(stats["inlier_pts"])
conf = self.compute_match_confidence(ratio, stats["inlier_count"], stats["mre"], spatial)
if not self._validate_chunk_match(stats["inlier_count"], conf):
return None
sim3 = self._compute_sim3_transform(H, tile_bounds)
rot_angle_deg = float(np.degrees(np.arctan2(H[1, 0], H[0, 0])))
return ChunkAlignmentResult(
matched=True,
chunk_id="chunk_matched",
chunk_center_gps=gps,
rotation_angle=rot_angle_deg,
confidence=conf,
inlier_count=stats["inlier_count"],
transform=sim3,
reprojection_error=stats["mre"]
)