mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 11:06:37 +00:00
218 lines
9.0 KiB
Python
218 lines
9.0 KiB
Python
import cv2
|
|
import math
|
|
import numpy as np
|
|
from datetime import datetime
|
|
from typing import List, Optional, Dict, Any, Tuple
|
|
from pydantic import BaseModel, Field
|
|
from abc import ABC, abstractmethod
|
|
|
|
from h07_image_rotation_utils import ImageRotationUtils
|
|
|
|
# --- Data Models ---
|
|
|
|
class RotationResult(BaseModel):
|
|
matched: bool
|
|
initial_angle: float
|
|
precise_angle: float
|
|
confidence: float
|
|
homography: Any
|
|
inlier_count: int
|
|
|
|
model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class HeadingHistory(BaseModel):
|
|
flight_id: str
|
|
current_heading: float
|
|
heading_history: List[float] = Field(default_factory=list)
|
|
last_update: datetime
|
|
sharp_turns: int = 0
|
|
|
|
class RotationConfig(BaseModel):
|
|
step_angle: float = 30.0
|
|
sharp_turn_threshold: float = 45.0
|
|
confidence_threshold: float = 0.7
|
|
history_size: int = 10
|
|
|
|
class AlignmentResult(BaseModel):
|
|
matched: bool
|
|
confidence: float
|
|
homography: Any
|
|
inlier_count: int
|
|
|
|
model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class ChunkAlignmentResult(BaseModel):
|
|
matched: bool
|
|
confidence: float
|
|
homography: Any
|
|
inlier_count: int
|
|
|
|
model_config = {"arbitrary_types_allowed": True}
|
|
|
|
# --- Interface ---
|
|
|
|
class IImageMatcher(ABC):
|
|
@abstractmethod
|
|
def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: Any) -> AlignmentResult: pass
|
|
|
|
@abstractmethod
|
|
def align_chunk_to_satellite(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: Any) -> ChunkAlignmentResult: pass
|
|
|
|
class IImageRotationManager(ABC):
|
|
@abstractmethod
|
|
def rotate_image_360(self, image: np.ndarray, angle: float) -> np.ndarray: pass
|
|
|
|
@abstractmethod
|
|
def try_rotation_steps(self, flight_id: str, frame_id: int, image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: Any, timestamp: datetime, matcher: IImageMatcher) -> Optional[RotationResult]: pass
|
|
|
|
@abstractmethod
|
|
def calculate_precise_angle(self, homography: np.ndarray, initial_angle: float) -> float: pass
|
|
|
|
@abstractmethod
|
|
def get_current_heading(self, flight_id: str) -> Optional[float]: pass
|
|
|
|
@abstractmethod
|
|
def update_heading(self, flight_id: str, frame_id: int, heading: float, timestamp: datetime) -> bool: pass
|
|
|
|
@abstractmethod
|
|
def detect_sharp_turn(self, flight_id: str, new_heading: float) -> bool: pass
|
|
|
|
@abstractmethod
|
|
def requires_rotation_sweep(self, flight_id: str) -> bool: pass
|
|
|
|
@abstractmethod
|
|
def rotate_chunk_360(self, chunk_images: List[np.ndarray], angle: float) -> List[np.ndarray]: pass
|
|
|
|
@abstractmethod
|
|
def try_chunk_rotation_steps(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: Any, matcher: IImageMatcher) -> Optional[RotationResult]: pass
|
|
|
|
# --- Implementation ---
|
|
|
|
class ImageRotationManager(IImageRotationManager):
|
|
def __init__(self, config: Optional[RotationConfig] = None):
|
|
self.config = config or RotationConfig()
|
|
self.heading_states: Dict[str, HeadingHistory] = {}
|
|
self.sweep_flags: Dict[str, bool] = {}
|
|
self.rot_utils = ImageRotationUtils()
|
|
|
|
def rotate_image_360(self, image: np.ndarray, angle: float) -> np.ndarray:
|
|
return self.rot_utils.rotate_image(image, angle)
|
|
|
|
def rotate_chunk_360(self, chunk_images: List[np.ndarray], angle: float) -> List[np.ndarray]:
|
|
return [self.rotate_image_360(img, angle) for img in chunk_images]
|
|
|
|
def _extract_rotation_from_homography(self, homography: Any) -> float:
|
|
if homography is None or homography.shape != (3, 3): return 0.0
|
|
return math.degrees(math.atan2(homography[1, 0], homography[0, 0]))
|
|
|
|
def _combine_angles(self, initial_angle: float, delta_angle: float) -> float:
|
|
return self.rot_utils.normalize_angle(initial_angle + delta_angle)
|
|
|
|
def calculate_precise_angle(self, homography: Any, initial_angle: float) -> float:
|
|
delta = self._extract_rotation_from_homography(homography)
|
|
return self._combine_angles(initial_angle, delta)
|
|
|
|
# --- 06.02 Heading Management Internals ---
|
|
|
|
def _normalize_angle(self, angle: float) -> float:
|
|
return self.rot_utils.normalize_angle(angle)
|
|
|
|
def _calculate_angle_delta(self, angle1: float, angle2: float) -> float:
|
|
delta = abs(self._normalize_angle(angle1) - self._normalize_angle(angle2))
|
|
if delta > 180.0:
|
|
delta = 360.0 - delta
|
|
return delta
|
|
|
|
def _get_flight_state(self, flight_id: str) -> Optional[HeadingHistory]:
|
|
return self.heading_states.get(flight_id)
|
|
|
|
def _add_to_history(self, flight_id: str, heading: float):
|
|
state = self.heading_states[flight_id]
|
|
state.heading_history.append(heading)
|
|
if len(state.heading_history) > self.config.history_size:
|
|
state.heading_history.pop(0)
|
|
|
|
def _set_sweep_required(self, flight_id: str, required: bool):
|
|
self.sweep_flags[flight_id] = required
|
|
|
|
# --- 06.02 Heading Management Public API ---
|
|
|
|
def get_current_heading(self, flight_id: str) -> Optional[float]:
|
|
state = self._get_flight_state(flight_id)
|
|
return state.current_heading if state else None
|
|
|
|
def update_heading(self, flight_id: str, frame_id: int, heading: float, timestamp: datetime) -> bool:
|
|
normalized = self._normalize_angle(heading)
|
|
state = self._get_flight_state(flight_id)
|
|
|
|
if not state:
|
|
self.heading_states[flight_id] = HeadingHistory(
|
|
flight_id=flight_id, current_heading=normalized,
|
|
heading_history=[], last_update=timestamp, sharp_turns=0
|
|
)
|
|
else:
|
|
state.current_heading = normalized
|
|
state.last_update = timestamp
|
|
|
|
self._add_to_history(flight_id, normalized)
|
|
# Automatically clear any pending sweep flag since we successfully oriented
|
|
self._set_sweep_required(flight_id, False)
|
|
return True
|
|
|
|
def detect_sharp_turn(self, flight_id: str, new_heading: float) -> bool:
|
|
current = self.get_current_heading(flight_id)
|
|
if current is None:
|
|
return False
|
|
|
|
delta = self._calculate_angle_delta(new_heading, current)
|
|
is_sharp = delta > self.config.sharp_turn_threshold
|
|
if is_sharp and self._get_flight_state(flight_id):
|
|
self.heading_states[flight_id].sharp_turns += 1
|
|
return is_sharp
|
|
|
|
def requires_rotation_sweep(self, flight_id: str) -> bool:
|
|
if not self._get_flight_state(flight_id):
|
|
return True # Always sweep on the first frame
|
|
return self.sweep_flags.get(flight_id, False)
|
|
|
|
def _get_rotation_steps(self) -> List[float]:
|
|
return [float(a) for a in range(0, 360, int(self.config.step_angle))]
|
|
|
|
def _select_best_result(self, results: List[Tuple[float, Any]]) -> Optional[Tuple[float, Any]]:
|
|
valid_results = [
|
|
(angle, res) for angle, res in results
|
|
if res and res.matched and res.confidence > self.config.confidence_threshold
|
|
]
|
|
if not valid_results:
|
|
return None
|
|
return max(valid_results, key=lambda item: item[1].confidence)
|
|
|
|
def _run_sweep(self, match_func, *args) -> Optional[Tuple[float, Any]]:
|
|
steps = self._get_rotation_steps()
|
|
all_results = [(angle, match_func(angle, *args)) for angle in steps]
|
|
return self._select_best_result(all_results)
|
|
|
|
def try_rotation_steps(self, flight_id: str, frame_id: int, image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: Any, timestamp: datetime, matcher: IImageMatcher) -> Optional[RotationResult]:
|
|
def match_wrapper(angle, img, sat, bnd):
|
|
rotated = self.rotate_image_360(img, angle)
|
|
return matcher.align_to_satellite(rotated, sat, bnd)
|
|
|
|
best = self._run_sweep(match_wrapper, image, satellite_tile, tile_bounds)
|
|
if best:
|
|
angle, res = best
|
|
precise_angle = self.calculate_precise_angle(res.homography, angle)
|
|
self.update_heading(flight_id, frame_id, precise_angle, timestamp)
|
|
return RotationResult(matched=True, initial_angle=angle, precise_angle=precise_angle, confidence=res.confidence, homography=res.homography, inlier_count=res.inlier_count)
|
|
return None
|
|
|
|
def try_chunk_rotation_steps(self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray, tile_bounds: Any, matcher: IImageMatcher) -> Optional[RotationResult]:
|
|
def chunk_match_wrapper(angle, chunk, sat, bnd):
|
|
rotated_chunk = self.rotate_chunk_360(chunk, angle)
|
|
return matcher.align_chunk_to_satellite(rotated_chunk, sat, bnd)
|
|
|
|
best = self._run_sweep(chunk_match_wrapper, chunk_images, satellite_tile, tile_bounds)
|
|
if best:
|
|
angle, res = best
|
|
precise_angle = self.calculate_precise_angle(res.homography, angle)
|
|
return RotationResult(matched=True, initial_angle=angle, precise_angle=precise_angle, confidence=res.confidence, homography=res.homography, inlier_count=res.inlier_count)
|
|
return None |