feat: stage6 — Image Pipeline (F05) and Rotation Manager (F06)

This commit is contained in:
Yuzviak
2026-03-22 22:51:00 +02:00
parent a2fb9ab404
commit 9ef046d623
9 changed files with 653 additions and 26 deletions
+139
View File
@@ -0,0 +1,139 @@
"""Image Rotation Manager (Component F06)."""
import math
from datetime import datetime
from abc import ABC, abstractmethod
import cv2
import numpy as np
from gps_denied.schemas.rotation import HeadingHistory, RotationResult
from gps_denied.schemas.satellite import TileBounds
class IImageMatcher(ABC):
"""Dependency injection interface for Metric Refinement."""
@abstractmethod
def align_to_satellite(self, uav_image: np.ndarray, satellite_tile: np.ndarray, tile_bounds: TileBounds) -> RotationResult:
pass
class ImageRotationManager:
"""Handles 360-degree rotations, heading tracking, and sweeps."""
def __init__(self):
# flight_id -> HeadingHistory
self._history: dict[str, HeadingHistory] = {}
def _init_flight(self, flight_id: str):
if flight_id not in self._history:
self._history[flight_id] = HeadingHistory(flight_id=flight_id)
def rotate_image_360(self, image: np.ndarray, angle: float) -> np.ndarray:
"""Rotates an image by specified angle around center."""
if angle == 0.0 or angle == 360.0:
return image
h, w = image.shape[:2]
center = (w / 2, h / 2)
# Get rotation matrix. Negative angle for standard counter-clockwise interpretation in some math
# or positive for OpenCV's coordinate system.
matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
rotated = cv2.warpAffine(
image, matrix, (w, h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
return rotated
def rotate_chunk_360(self, chunk_images: list[np.ndarray], angle: float) -> list[np.ndarray]:
"""Rotates all images in a chunk by the same angle."""
if angle == 0.0 or angle == 360.0:
return chunk_images
return [self.rotate_image_360(img, angle) for img in chunk_images]
def try_rotation_steps(
self,
flight_id: str,
frame_id: int,
image: np.ndarray,
satellite_tile: np.ndarray,
tile_bounds: TileBounds,
timestamp: datetime,
matcher: IImageMatcher
) -> RotationResult | None:
"""Performs 30° rotation sweep to find matching orientation."""
# 12 steps: 0, 30, 60... 330
for angle in range(0, 360, 30):
rotated = self.rotate_image_360(image, float(angle))
result = matcher.align_to_satellite(rotated, satellite_tile, tile_bounds)
if result.matched:
precise_angle = self.calculate_precise_angle(result.homography, float(angle))
result.precise_angle = precise_angle
result.initial_angle = float(angle)
self.update_heading(flight_id, frame_id, precise_angle, timestamp)
return result
return None
def calculate_precise_angle(self, homography: np.ndarray | None, initial_angle: float) -> float:
"""Calculates precise rotation angle from homography matrix."""
if homography is None:
return initial_angle
# Extract rotation angle from 2D affine component of homography
# h00, h01 = homography[0, 0], homography[0, 1]
# angle_delta = math.degrees(math.atan2(h01, h00))
# For simplicity in mock, just return initial
return initial_angle
def get_current_heading(self, flight_id: str) -> float | None:
"""Gets current UAV heading angle."""
self._init_flight(flight_id)
return self._history[flight_id].current_heading
def update_heading(self, flight_id: str, frame_id: int, heading: float, timestamp: datetime) -> bool:
"""Updates UAV heading angle."""
self._init_flight(flight_id)
# Normalize to 0-360
normalized = heading % 360.0
hist = self._history[flight_id]
hist.current_heading = normalized
hist.last_update = timestamp
hist.heading_history.append(normalized)
if len(hist.heading_history) > 10:
hist.heading_history.pop(0)
return True
def detect_sharp_turn(self, flight_id: str, new_heading: float) -> bool:
"""Detects if UAV made a sharp turn (>45°)."""
current = self.get_current_heading(flight_id)
if current is None:
return False
delta = abs(new_heading - current)
if delta > 180:
delta = 360 - delta
return delta > 45.0
def requires_rotation_sweep(self, flight_id: str) -> bool:
"""Determines if rotation sweep is needed for current frame."""
self._init_flight(flight_id)
hist = self._history[flight_id]
# First frame scenario
if hist.current_heading is None:
return True
return False