mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 08:46:38 +00:00
288 lines
12 KiB
Python
import cv2
|
|
import numpy as np
|
|
import logging
|
|
from typing import List, Optional, Tuple, Dict, Any
|
|
from pydantic import BaseModel
|
|
from abc import ABC, abstractmethod
|
|
|
|
from f02_1_flight_lifecycle_manager import GPSPoint
|
|
from f04_satellite_data_manager import TileBounds
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Data Models ---
|
|
|
|
class AlignmentResult(BaseModel):
    """Result of aligning a single UAV image to a satellite tile."""

    matched: bool  # True when a homography passed the min-inlier gate
    homography: Any  # np.ndarray (3, 3): UAV-pixel -> satellite-pixel mapping
    gps_center: GPSPoint  # GPS of the UAV image center projected into the tile
    confidence: float  # blended score in [0, 1] from compute_match_confidence
    inlier_count: int  # RANSAC inliers supporting the homography
    total_correspondences: int  # candidate matches before RANSAC filtering
    reprojection_error: float  # mean reprojection error (px) over inliers

    # Raw np.ndarray fields require arbitrary types in pydantic v2.
    model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class Sim3Transform(BaseModel):
    """Planar similarity transform (translation + rotation + scale) decomposed
    from the affine part of a chunk homography."""

    translation: Any  # np.ndarray (3,): (tx, ty, 0) in satellite tile pixels
    rotation: Any  # np.ndarray (3, 3): rotation about the z-axis
    scale: float  # isotropic scale factor

    # Raw np.ndarray fields require arbitrary types in pydantic v2.
    model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class ChunkAlignmentResult(BaseModel):
    """Result of aligning a multi-image chunk to a satellite tile."""

    matched: bool  # True when the chunk passed both inlier and confidence gates
    chunk_id: str  # identifier of the matched chunk
    chunk_center_gps: GPSPoint  # GPS of the chunk's middle frame center
    rotation_angle: float  # in-plane rotation (degrees) from the homography
    confidence: float  # blended score in [0, 1] from compute_match_confidence
    inlier_count: int  # RANSAC inliers over the aggregated correspondences
    transform: Sim3Transform  # planar Sim(3) decomposition of the homography
    reprojection_error: float  # mean reprojection error (px) over inliers

    # Raw np.ndarray fields (inside transform) require arbitrary types.
    model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class LiteSAMConfig(BaseModel):
    """Tunable thresholds and model settings for LiteSAM matching."""

    model_path: str = "litesam.onnx"  # ONNX weights; presumably consumed by the model manager — not read in this module
    confidence_threshold: float = 0.7  # minimum confidence to validate a chunk match
    min_inliers: int = 15  # minimum RANSAC inliers for a single-image match
    max_reprojection_error: float = 2.0  # px; above this, confidence is forced to 0
    multi_scale_levels: int = 3  # NOTE(review): unused in this module — confirm consumer
    chunk_min_inliers: int = 30  # stricter inlier gate for chunk alignment
|
|
|
|
# --- Interface ---
|
|
|
|
class IMetricRefinement(ABC):
    """Interface for metric refinement back-ends: absolute GPS anchoring via
    UAV-to-satellite image alignment.

    NOTE(review): the concrete implementation below returns (H, stats) tuples
    from the homography methods and takes four scalars in
    compute_match_confidence — these declarations have drifted out of sync
    with it; confirm intended contract before unifying.
    """

    @abstractmethod
    def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[AlignmentResult]:
        """Align one UAV image to a satellite tile; None when no valid match."""
        pass

    @abstractmethod
    def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]:
        """Estimate the UAV->satellite homography; None on failure."""
        pass

    @abstractmethod
    def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint:
        """Project an image-center pixel through the homography to GPS."""
        pass

    @abstractmethod
    def compute_match_confidence(self, alignment: Any) -> float:
        """Score a match in [0, 1]."""
        pass

    @abstractmethod
    def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
        """Align a multi-image chunk to a satellite tile; None when no match."""
        pass

    @abstractmethod
    def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]:
        """Estimate one homography from aggregated chunk correspondences."""
        pass
|
|
|
|
# --- Implementation ---
|
|
|
|
class LocalGeospatialAnchoring(IMetricRefinement):
    """
    F09: Local Geospatial Anchoring Back-End

    Handles precise metric refinement (absolute GPS anchoring) using LiteSAM for
    cross-view UAV-to-Satellite matching via homography estimation.

    NOTE(review): compute_homography and compute_match_confidence return/accept
    richer data than the IMetricRefinement declarations — confirm the interface
    before unifying.
    """

    def __init__(self, config: Optional[LiteSAMConfig] = None, model_manager=None):
        """
        Args:
            config: Matching thresholds; defaults to LiteSAMConfig().
            model_manager: Optional object exposing run_litesam(); when absent,
                mock correspondences are generated (testing path).
        """
        self.config = config or LiteSAMConfig()
        self.model_manager = model_manager

    # --- Internal Math & Coordinate Helpers ---

    def _pixel_to_gps(self, pixel_x: float, pixel_y: float, tile_bounds: TileBounds, tile_w: int, tile_h: int) -> GPSPoint:
        """Bilinearly interpolate a tile-pixel position to GPS.

        Assumes Web Mercator linearity at tile scale (valid for small tiles).
        """
        x_ratio = pixel_x / tile_w
        y_ratio = pixel_y / tile_h

        lon = tile_bounds.nw.lon + (tile_bounds.ne.lon - tile_bounds.nw.lon) * x_ratio
        lat = tile_bounds.nw.lat + (tile_bounds.sw.lat - tile_bounds.nw.lat) * y_ratio
        return GPSPoint(lat=lat, lon=lon)

    def _compute_reprojection_error(self, homography: np.ndarray, src_pts: np.ndarray, dst_pts: np.ndarray) -> float:
        """Mean Euclidean distance between H-projected src points and dst points.

        Returns inf for empty input or degenerate projections (w ~ 0).
        """
        if len(src_pts) == 0:
            return float('inf')

        src_homog = np.hstack([src_pts, np.ones((src_pts.shape[0], 1))])
        projected = (homography @ src_homog.T).T

        # Fix: guard the homogeneous divide. Previously a ~zero w produced
        # inf/nan silently and poisoned the mean; treat it as a failed match.
        w = projected[:, 2:]
        if np.any(np.abs(w) < 1e-12):
            return float('inf')
        projected = projected[:, :2] / w

        errors = np.linalg.norm(projected - dst_pts, axis=1)
        return float(np.mean(errors))

    def _compute_spatial_distribution(self, inliers: np.ndarray) -> float:
        """Heuristic spread score in [0, 1]; 1.0 = well-distributed inliers."""
        if len(inliers) < 3:
            return 0.0
        std_x = np.std(inliers[:, 0])
        std_y = np.std(inliers[:, 1])
        # Saturates at 1.0 once the combined std dev reaches 100px (~50px/axis).
        return min(1.0, (std_x + std_y) / 100.0)

    # --- 09.01 Feature: Single Image Alignment ---

    def _extract_features(self, image: np.ndarray) -> np.ndarray:
        # Mock TAIFormer encoder features; the real model is reached via
        # model_manager in _compute_correspondences.
        return np.random.rand(100, 256).astype(np.float32)

    def _compute_correspondences(self, uav_features: np.ndarray, sat_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Return (uav_pts, sat_pts) pixel correspondences, each (N, 2) float32."""
        if self.model_manager and hasattr(self.model_manager, 'run_litesam'):
            return self.model_manager.run_litesam(uav_features, sat_features)

        # Mock CTM correlation field: synthetic matches = pure translation plus
        # Gaussian noise, so RANSAC has a recoverable model.
        num_matches = 100
        uav_pts = np.random.rand(num_matches, 2) * [640, 480]
        sat_pts = uav_pts + np.array([100.0, 50.0]) + np.random.normal(0, 2.0, (num_matches, 2))
        return uav_pts.astype(np.float32), sat_pts.astype(np.float32)

    def _estimate_homography_ransac(self, src_pts: np.ndarray, dst_pts: np.ndarray) -> Tuple[Optional[np.ndarray], np.ndarray]:
        """RANSAC homography fit; returns (H or None, inlier mask array)."""
        if len(src_pts) < 4:
            # cv2.findHomography requires at least 4 correspondences.
            return None, np.array([])
        H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
        # Fix: findHomography can yield mask=None on failure; normalize so
        # callers can always treat the mask as an array.
        if H is None or mask is None:
            return None, np.array([])
        return H, mask

    def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[Tuple[np.ndarray, dict]]:
        """Estimate the UAV->satellite homography for a single image.

        Returns:
            (H, stats) where stats has keys "inlier_count", "total", "mre",
            "inlier_pts"; or None when RANSAC fails or the min_inliers gate
            is not met.
        """
        uav_feat = self._extract_features(uav_image)
        sat_feat = self._extract_features(satellite_tile)

        src_pts, dst_pts = self._compute_correspondences(uav_feat, sat_feat)
        H, mask = self._estimate_homography_ransac(src_pts, dst_pts)

        if H is None:
            return None

        inliers_mask = mask.ravel() == 1
        inlier_count = int(np.sum(inliers_mask))

        if inlier_count < self.config.min_inliers:
            return None

        mre = self._compute_reprojection_error(H, src_pts[inliers_mask], dst_pts[inliers_mask])
        stats = {"inlier_count": inlier_count, "total": len(src_pts), "mre": mre, "inlier_pts": src_pts[inliers_mask]}
        return H, stats

    def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int],
                                   tile_size: Tuple[int, int] = (512, 512)) -> GPSPoint:
        """Project the UAV image center through H and convert to GPS.

        Args:
            tile_size: (width, height) of the satellite tile in pixels.
                Generalized from the previously hard-coded 512x512; the default
                preserves existing behavior.
        """
        # Fix: build the (1, 1, 2) point cv2.perspectiveTransform expects in a
        # single step (previously constructed twice, the second time re-wrapped).
        center = np.array([[[float(image_center[0]), float(image_center[1])]]])

        transformed = cv2.perspectiveTransform(center, homography)
        sat_x, sat_y = transformed[0][0]

        return self._pixel_to_gps(sat_x, sat_y, tile_bounds, tile_size[0], tile_size[1])

    def compute_match_confidence(self, inlier_ratio: float, inlier_count: int, mre: float, spatial_dist: float) -> float:
        """Blend inlier ratio, count, reprojection error, and spread into [0, 1].

        Hard-fails (0.0) below min_inliers or above max_reprojection_error.
        Tiered floors/ceilings: strong matches pinned >= 0.85, moderate matches
        clamped to [0.5, 0.8], everything else capped below 0.5.
        """
        if inlier_count < self.config.min_inliers or mre > self.config.max_reprojection_error:
            return 0.0

        base_conf = min(1.0, (inlier_ratio * 0.5) + (inlier_count / 100.0 * 0.3) + (spatial_dist * 0.2))
        if inlier_ratio > 0.6 and inlier_count > 50 and mre < 0.5:
            return max(0.85, base_conf)
        if inlier_ratio > 0.4 and inlier_count > 30:
            return max(0.5, min(0.8, base_conf))
        return min(0.49, base_conf)

    def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[AlignmentResult]:
        """Single-image pipeline: homography -> GPS -> confidence -> result."""
        res = self.compute_homography(uav_image, satellite_tile)
        if res is None:
            return None

        H, stats = res
        h, w = uav_image.shape[:2]
        gps = self.extract_gps_from_alignment(H, tile_bounds, (w // 2, h // 2))

        ratio = stats["inlier_count"] / stats["total"] if stats["total"] > 0 else 0
        spatial = self._compute_spatial_distribution(stats["inlier_pts"])
        conf = self.compute_match_confidence(ratio, stats["inlier_count"], stats["mre"], spatial)

        return AlignmentResult(matched=True, homography=H, gps_center=gps, confidence=conf,
                               inlier_count=stats["inlier_count"], total_correspondences=stats["total"],
                               reprojection_error=stats["mre"])

    # --- 09.02 Feature: Chunk Alignment ---

    def _extract_chunk_features(self, chunk_images: List[np.ndarray]) -> List[np.ndarray]:
        """Per-image feature maps for every frame in the chunk."""
        return [self._extract_features(img) for img in chunk_images]

    def _aggregate_features(self, features_list: List[np.ndarray]) -> np.ndarray:
        """Element-wise mean of the per-image feature maps."""
        return np.mean(np.stack(features_list), axis=0)

    def _aggregate_correspondences(self, correspondences_list: List[Tuple[np.ndarray, np.ndarray]]) -> Tuple[np.ndarray, np.ndarray]:
        """Stack per-image (src, dst) arrays into two combined (N, 2) arrays."""
        src_pts = np.vstack([c[0] for c in correspondences_list])
        dst_pts = np.vstack([c[1] for c in correspondences_list])
        return src_pts, dst_pts

    def _estimate_chunk_homography(self, src_pts: np.ndarray, dst_pts: np.ndarray) -> Tuple[Optional[np.ndarray], dict]:
        """RANSAC over aggregated chunk correspondences.

        Returns (H, stats) or (None, {}) when the fit fails or the stricter
        chunk_min_inliers gate is not met.
        """
        H, mask = self._estimate_homography_ransac(src_pts, dst_pts)
        if H is None:
            return None, {}

        inliers_mask = mask.ravel() == 1
        inlier_count = int(np.sum(inliers_mask))

        if inlier_count < self.config.chunk_min_inliers:
            return None, {}

        mre = self._compute_reprojection_error(H, src_pts[inliers_mask], dst_pts[inliers_mask])
        stats = {"inlier_count": inlier_count, "total": len(src_pts), "mre": mre, "inlier_pts": src_pts[inliers_mask]}
        return H, stats

    def _compute_sim3_transform(self, homography: np.ndarray, tile_bounds: TileBounds) -> Sim3Transform:
        """Decompose the homography's affine part into a planar Sim(3).

        Assumes negligible perspective terms. tile_bounds is currently unused;
        kept for interface stability.
        """
        tx, ty = homography[0, 2], homography[1, 2]
        scale = np.sqrt(homography[0, 0] ** 2 + homography[1, 0] ** 2)
        rot_angle = np.arctan2(homography[1, 0], homography[0, 0])

        R = np.array([
            [np.cos(rot_angle), -np.sin(rot_angle), 0],
            [np.sin(rot_angle), np.cos(rot_angle), 0],
            [0, 0, 1]
        ])
        return Sim3Transform(translation=np.array([tx, ty, 0.0]), rotation=R, scale=float(scale))

    def _get_chunk_center_gps(self, homography: np.ndarray, tile_bounds: TileBounds, chunk_images: List[np.ndarray]) -> GPSPoint:
        """GPS of the chunk's temporal middle frame center via the homography."""
        mid_idx = len(chunk_images) // 2
        mid_img = chunk_images[mid_idx]
        h, w = mid_img.shape[:2]
        return self.extract_gps_from_alignment(homography, tile_bounds, (w // 2, h // 2))

    def _validate_chunk_match(self, inliers: int, confidence: float) -> bool:
        """Gate chunk matches on both inlier count and confidence thresholds."""
        return inliers >= self.config.chunk_min_inliers and confidence >= self.config.confidence_threshold

    def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[Tuple[np.ndarray, dict]]:
        """Aggregate correspondences across a chunk and fit one homography.

        Returns (H, stats) or None for an empty chunk / failed fit.
        """
        if not chunk_images:
            return None

        sat_feat = self._extract_features(satellite_tile)
        chunk_features = self._extract_chunk_features(chunk_images)

        correspondences = []
        for feat in chunk_features:
            src, dst = self._compute_correspondences(feat, sat_feat)
            correspondences.append((src, dst))

        agg_src, agg_dst = self._aggregate_correspondences(correspondences)
        H, stats = self._estimate_chunk_homography(agg_src, agg_dst)
        if H is None:
            return None
        return H, stats

    def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
        """Chunk pipeline: homography -> GPS/confidence -> Sim(3) packaging.

        Returns None for an empty chunk, a failed fit, or a match that fails
        the combined inlier/confidence validation.
        """
        if not chunk_images:
            return None

        res = self.match_chunk_homography(chunk_images, satellite_tile)
        if res is None:
            return None

        H, stats = res
        gps = self._get_chunk_center_gps(H, tile_bounds, chunk_images)

        ratio = stats["inlier_count"] / stats["total"] if stats["total"] > 0 else 0
        spatial = self._compute_spatial_distribution(stats["inlier_pts"])
        conf = self.compute_match_confidence(ratio, stats["inlier_count"], stats["mre"], spatial)

        if not self._validate_chunk_match(stats["inlier_count"], conf):
            return None

        sim3 = self._compute_sim3_transform(H, tile_bounds)
        rot_angle_deg = float(np.degrees(np.arctan2(H[1, 0], H[0, 0])))

        return ChunkAlignmentResult(
            matched=True,
            chunk_id="chunk_matched",
            chunk_center_gps=gps,
            rotation_angle=rot_angle_deg,
            confidence=conf,
            inlier_count=stats["inlier_count"],
            transform=sim3,
            reprojection_error=stats["mre"]
        )