mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 22:21:13 +00:00
7e64ef8d2b
Phase 2 deliverables not yet committed from plan execution: - structlog wired to 10 hot-path files (orchestrator, eskf, components) - bind_contextvars(correlation_id=frame_id) in process_frame - obs/logging_config.py: configure_logging(env) JSON/console renderer - pyproject.toml: structlog>=25.1, --strict-markers, 6 markers registered - tests/conftest.py: ac(id) validator plugin + pytest_collection hooks Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
191 lines
7.1 KiB
Python
191 lines
7.1 KiB
Python
"""Metric Refinement implementation (SAT-03/04). Phase 1 home of MetricRefinement impl.
|
|
|
|
SAT-03: GSD normalization — downsample camera frame to satellite resolution.
|
|
SAT-04: RANSAC homography → WGS84 position; confidence = inlier_ratio.
|
|
"""
|
|
|
|
from typing import List, Optional, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import structlog
|
|
|
|
from gps_denied.components.satellite_matcher.protocol import IMetricRefinement
|
|
from gps_denied.core.models import IModelManager
|
|
from gps_denied.schemas import GPSPoint
|
|
from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult, Sim3Transform
|
|
from gps_denied.schemas.satellite import TileBounds
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
class MetricRefinement(IMetricRefinement):
|
|
"""LiteSAM/XFeat-based alignment with GSD normalization.
|
|
|
|
SAT-03: normalize_gsd() downsamples UAV frame to match satellite GSD before matching.
|
|
SAT-04: confidence is computed as inlier_count / total_correspondences (inlier ratio).
|
|
"""
|
|
|
|
def __init__(self, model_manager: IModelManager):
|
|
self.model_manager = model_manager
|
|
|
|
# ------------------------------------------------------------------
|
|
# SAT-03: GSD normalization
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def normalize_gsd(
|
|
uav_image: np.ndarray,
|
|
uav_gsd_mpp: float,
|
|
sat_gsd_mpp: float,
|
|
) -> np.ndarray:
|
|
"""Resize UAV frame to match satellite GSD (meters-per-pixel).
|
|
|
|
Args:
|
|
uav_image: Raw UAV camera frame.
|
|
uav_gsd_mpp: UAV GSD in m/px (e.g. 0.159 at 600 m altitude).
|
|
sat_gsd_mpp: Satellite tile GSD in m/px (e.g. 0.6 at zoom 18).
|
|
|
|
Returns:
|
|
Resized image. If already coarser than satellite, returned unchanged.
|
|
"""
|
|
if uav_gsd_mpp <= 0 or sat_gsd_mpp <= 0:
|
|
return uav_image
|
|
scale = uav_gsd_mpp / sat_gsd_mpp
|
|
if scale >= 1.0:
|
|
return uav_image # UAV already coarser, nothing to do
|
|
h, w = uav_image.shape[:2]
|
|
new_w = max(1, int(w * scale))
|
|
new_h = max(1, int(h * scale))
|
|
return cv2.resize(uav_image, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
|
|
|
def compute_homography(self, uav_image: np.ndarray, satellite_tile: np.ndarray) -> Optional[np.ndarray]:
|
|
engine = self.model_manager.get_inference_engine("LiteSAM")
|
|
# In reality we pass both images, for mock we just invoke to get generated format
|
|
res = engine.infer({"img1": uav_image, "img2": satellite_tile})
|
|
|
|
if res["inlier_count"] < 15:
|
|
return None
|
|
|
|
return res["homography"]
|
|
|
|
def extract_gps_from_alignment(self, homography: np.ndarray, tile_bounds: TileBounds, image_center: Tuple[int, int]) -> GPSPoint:
|
|
# UAV image center
|
|
cx, cy = image_center
|
|
# Apply homography
|
|
pt = np.array([cx, cy, 1.0])
|
|
# transformed = H * pt
|
|
transformed = homography @ pt
|
|
transformed = transformed / transformed[2]
|
|
|
|
tx, ty = transformed[0], transformed[1]
|
|
|
|
# Approximate GPS mapping using bounds
|
|
# ty maps to latitude (ty=0 is North, ty=Height is South)
|
|
# tx maps to longitude (tx=0 is West, tx=Width is East)
|
|
# We assume standard 256x256 tiles for this mock calculation
|
|
tile_size = 256.0
|
|
|
|
lat_span = tile_bounds.nw.lat - tile_bounds.sw.lat
|
|
lon_span = tile_bounds.ne.lon - tile_bounds.nw.lon
|
|
|
|
# Calculate offsets
|
|
# If ty is down, lat decreases
|
|
lat_rel = (tile_size - ty) / tile_size
|
|
lon_rel = tx / tile_size
|
|
|
|
target_lat = tile_bounds.sw.lat + (lat_span * lat_rel)
|
|
target_lon = tile_bounds.nw.lon + (lon_span * lon_rel)
|
|
|
|
return GPSPoint(lat=target_lat, lon=target_lon)
|
|
|
|
def align_to_satellite(
|
|
self,
|
|
uav_image: np.ndarray,
|
|
satellite_tile: np.ndarray,
|
|
tile_bounds: TileBounds,
|
|
uav_gsd_mpp: float = 0.0,
|
|
) -> Optional[AlignmentResult]:
|
|
"""Align UAV frame to satellite tile.
|
|
|
|
Args:
|
|
uav_gsd_mpp: If > 0, the UAV frame is GSD-normalised to satellite
|
|
resolution before matching (SAT-03).
|
|
"""
|
|
# SAT-03: optional GSD normalization
|
|
sat_gsd = tile_bounds.gsd
|
|
if uav_gsd_mpp > 0 and sat_gsd > 0:
|
|
uav_image = self.normalize_gsd(uav_image, uav_gsd_mpp, sat_gsd)
|
|
|
|
engine = self.model_manager.get_inference_engine("LiteSAM")
|
|
res = engine.infer({"img1": uav_image, "img2": satellite_tile})
|
|
|
|
if res["inlier_count"] < 15:
|
|
return None
|
|
|
|
h, w = uav_image.shape[:2] if hasattr(uav_image, "shape") else (480, 640)
|
|
gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2))
|
|
|
|
# SAT-04: confidence = inlier_ratio (not raw engine confidence)
|
|
total = res.get("total_correspondences", max(res["inlier_count"], 1))
|
|
inlier_ratio = res["inlier_count"] / max(total, 1)
|
|
|
|
align = AlignmentResult(
|
|
matched=True,
|
|
homography=res["homography"],
|
|
gps_center=gps,
|
|
confidence=inlier_ratio,
|
|
inlier_count=res["inlier_count"],
|
|
total_correspondences=total,
|
|
reprojection_error=res.get("reprojection_error", 1.0),
|
|
)
|
|
return align if self.compute_match_confidence(align) > 0.5 else None
|
|
|
|
def compute_match_confidence(self, alignment: AlignmentResult) -> float:
|
|
# Complex heuristic combining inliers, reprojection error
|
|
score = alignment.confidence
|
|
# Penalty for high reproj error
|
|
if alignment.reprojection_error > 2.0:
|
|
score -= 0.2
|
|
return max(0.0, min(1.0, score))
|
|
|
|
def match_chunk_homography(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray) -> Optional[np.ndarray]:
|
|
# Aggregate logic is complex, for mock we just use the first image's match
|
|
if not chunk_images:
|
|
return None
|
|
return self.compute_homography(chunk_images[0], satellite_tile)
|
|
|
|
def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: TileBounds) -> Optional[ChunkAlignmentResult]:
|
|
if not chunk_images:
|
|
return None
|
|
|
|
engine = self.model_manager.get_inference_engine("LiteSAM")
|
|
res = engine.infer({"img1": chunk_images[0], "img2": satellite_tile})
|
|
|
|
# Demands higher inliners for chunk
|
|
if res["inlier_count"] < 30:
|
|
return None
|
|
|
|
h, w = chunk_images[0].shape[:2] if hasattr(chunk_images[0], "shape") else (480, 640)
|
|
gps = self.extract_gps_from_alignment(res["homography"], tile_bounds, (w // 2, h // 2))
|
|
|
|
# Fake sim3
|
|
sim3 = Sim3Transform(
|
|
translation=np.array([10., 0., 0.]),
|
|
rotation=np.eye(3),
|
|
scale=1.0
|
|
)
|
|
|
|
chunk_align = ChunkAlignmentResult(
|
|
matched=True,
|
|
chunk_id="chunk1",
|
|
chunk_center_gps=gps,
|
|
rotation_angle=0.0,
|
|
confidence=res["confidence"],
|
|
inlier_count=res["inlier_count"],
|
|
transform=sim3,
|
|
reprojection_error=1.0
|
|
)
|
|
|
|
return chunk_align
|