mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-23 01:56:37 +00:00
328 lines
12 KiB
Python
328 lines
12 KiB
Python
import logging
import os
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from pydantic import BaseModel, Field

from f02_1_flight_lifecycle_manager import GPSPoint
from f04_satellite_data_manager import TileCoords
|
|
|
|
# Module-level logger named after this module so deployments can filter
# recovery-coordinator output hierarchically.
logger = logging.getLogger(__name__)


# --- Data Models ---
|
|
|
|
class RelativePose(BaseModel):
    """Relative pose estimate produced by visual odometry between frames."""

    # Transform between the two frames in matrix form (exact shape depends
    # on the VO producer -- TODO confirm against the odometry component).
    transform: np.ndarray
    # Number of feature correspondences consistent with the transform.
    inlier_count: int
    # Residual alignment error, presumably mean pixel reprojection error --
    # confirm with the producer.
    reprojection_error: float

    # np.ndarray is not a pydantic-native type; allow it without validation.
    model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class Sim3Transform(BaseModel):
    """Similarity transform (Sim(3)): rotation + translation + uniform scale."""

    # Translation vector component.
    translation: np.ndarray
    # Rotation matrix component.
    rotation: np.ndarray
    # Uniform scale factor.
    scale: float

    # Permit numpy arrays as field values (no pydantic validation applied).
    model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class AlignmentResult(BaseModel):
    """Outcome of aligning a single UAV frame against a satellite tile."""

    # True when the alignment met the acceptance criteria.
    matched: bool
    # Estimated geographic position of the frame.
    gps: GPSPoint
    # Match confidence; callers threshold this at 0.7 (see try_current_grid),
    # so a [0, 1] range is assumed -- confirm with the producer.
    confidence: float
    # Number of inlier correspondences supporting the alignment.
    inlier_count: int
    # Transform relating the UAV image to the tile (matrix form).
    transform: np.ndarray

    # Required for the np.ndarray field above.
    model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class ConfidenceAssessment(BaseModel):
    """Fused tracking-health snapshot combining VO and LiteSAM signals."""

    # max of the two per-source confidences (see check_confidence).
    overall_confidence: float
    # Derived from the VO inlier count, scaled against a 50-inlier ceiling.
    vo_confidence: float
    # Taken directly from the latest LiteSAM alignment, 0.0 when absent.
    litesam_confidence: float
    # Raw VO inlier count the assessment was based on.
    inlier_count: int
    tracking_status: str  # "good", "degraded", "lost"
|
|
|
|
class SearchSession(BaseModel):
    """Mutable state of one progressive tile-grid search after tracking loss."""

    session_id: str
    flight_id: str
    # Frame at which the search was started.
    frame_id: int
    # Search centre, taken from the last trusted GPS estimate.
    center_gps: GPSPoint
    # Current grid size in tiles; grows through 1, 4, 9, 16, 25
    # (see expand_search_radius).
    current_grid_size: int
    # Upper bound of the progression (25).
    max_grid_size: int
    # True once an alignment has been accepted for this session.
    found: bool
    # True once the grid progression has been used up without a match.
    exhausted: bool
|
|
|
|
class SearchStatus(BaseModel):
    """Read-only snapshot of a SearchSession, suitable for status reporting."""

    current_grid_size: int
    found: bool
    exhausted: bool
|
|
|
|
class TileCandidate(BaseModel):
    """A satellite tile proposed by global place recognition."""

    # Identifier of the candidate tile.
    tile_id: str
    # Retrieval score from place recognition (higher presumed better --
    # confirm with the F08 component).
    score: float
    # Geographic position associated with the tile.
    gps: GPSPoint
|
|
|
|
class UserInputRequest(BaseModel):
    """Request shown to the operator when automatic recovery has failed."""

    request_id: str
    flight_id: str
    # Frame for which a manual anchor is needed.
    frame_id: int
    # Raw UAV image payload; excluded from serialization so dumps stay small.
    uav_image: Any = Field(exclude=True)
    # Tiles the operator can pick from, ranked by place recognition.
    candidate_tiles: List[TileCandidate]
    # Human-readable explanation shown alongside the request.
    message: str
    created_at: datetime

    # Required for the untyped image payload above.
    model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class UserAnchor(BaseModel):
    """Operator-supplied correspondence between a UAV pixel and a GPS point."""

    # Pixel coordinates in the UAV image chosen by the operator
    # (assumed (x, y) order -- TODO confirm with the UI component).
    uav_pixel: Tuple[float, float]
    # GPS location on the satellite imagery matching that pixel.
    satellite_gps: GPSPoint
    # Trust placed in the manual anchor; defaults to full confidence.
    confidence: float = 1.0
|
|
|
|
class ChunkHandle(BaseModel):
    """Bookkeeping record for a route chunk built while tracking is lost."""

    chunk_id: str
    flight_id: str
    # First frame included in the chunk.
    start_frame_id: int = 0
    # Last frame; None while the chunk is still being extended.
    end_frame_id: Optional[int] = None
    # Frame ids accumulated so far. (Safe despite the mutable default:
    # pydantic deep-copies field defaults per instance.)
    frames: List[int] = []
    is_active: bool = True
    has_anchor: bool = False
    # Frame and GPS of the anchor, once one has been established.
    anchor_frame_id: Optional[int] = None
    anchor_gps: Optional[GPSPoint] = None
    matching_status: str = "unanchored"  # "unanchored", "matching", "anchored", "merged"
|
|
|
|
class ChunkAlignmentResult(BaseModel):
    """Result of aligning an entire route chunk to satellite imagery.

    Note: try_chunk_litesam_matching passes `reprojection_error`, but the
    model previously did not declare it, so pydantic silently dropped the
    value (default config is extra='ignore'). The field is now declared
    with a default of 0.0, which keeps all existing call sites valid.
    """

    # True when the chunk was matched to a candidate tile.
    matched: bool
    # Identifier of the aligned chunk.
    chunk_id: str
    # Estimated geographic position of the chunk centre.
    chunk_center_gps: GPSPoint
    # In-plane rotation found by the rotation sweep (units per the rotation
    # manager -- TODO confirm degrees vs radians).
    rotation_angle: float
    # Match confidence (assumed [0, 1] -- confirm with producer).
    confidence: float
    # Inlier correspondence count supporting the alignment.
    inlier_count: int
    # Full similarity transform mapping the chunk frame into the world.
    transform: Sim3Transform
    # Residual alignment error of the accepted match; 0.0 when unknown.
    reprojection_error: float = 0.0
|
|
|
|
class RecoveryStatus(BaseModel):
    """Summary of a recovery attempt returned to the caller."""

    success: bool
    # Which strategy produced the result (e.g. search / chunk / user input).
    method: str
    # Recovered position, when one was obtained. Optional[...] with no
    # default is still a required field in pydantic v2 -- pass None explicitly.
    gps: Optional[GPSPoint]
    chunk_id: Optional[str]
    message: Optional[str]
|
|
|
|
# --- Implementation ---
|
|
|
|
class FailureRecoveryCoordinator:
    """
    Coordinates failure recovery strategies (progressive search, chunk matching, user input).

    Pure logic component: decides what to do, delegates execution to dependencies.
    """

    def __init__(self, deps: Dict[str, Any]):
        """Capture injected collaborators.

        Dependencies are injected via a constructor dictionary to prevent
        circular imports. Any entry may be absent, leaving the attribute
        None; methods guard where a missing dependency is recoverable.
        """
        self.f04 = deps.get("satellite_data_manager")
        self.f06 = deps.get("image_rotation_manager")
        self.f08 = deps.get("global_place_recognition")
        self.f09 = deps.get("metric_refinement")
        self.f10 = deps.get("factor_graph_optimizer")
        self.f12 = deps.get("route_chunk_manager")

    # --- Status Checks ---

    def check_confidence(self, vo_result: RelativePose, litesam_result: Optional[AlignmentResult]) -> ConfidenceAssessment:
        """Fuse VO inlier counts and LiteSAM confidence into one assessment.

        Thresholds: more than 50 inliers -> "good"; 20..50 -> "degraded"
        with confidence inliers/50 (note exactly 50 inliers yields
        "degraded" at confidence 1.0 -- boundary kept as-is); under 20 ->
        "lost". Overall confidence is the max of the two sources, so either
        signal alone can keep it up.
        """
        inliers = vo_result.inlier_count if vo_result else 0

        if inliers > 50:
            status = "good"
            vo_conf = 1.0
        elif inliers >= 20:
            status = "degraded"
            vo_conf = inliers / 50.0
        else:
            status = "lost"
            vo_conf = 0.0

        ls_conf = litesam_result.confidence if litesam_result else 0.0
        overall = max(vo_conf, ls_conf)

        return ConfidenceAssessment(
            overall_confidence=overall,
            vo_confidence=vo_conf,
            litesam_confidence=ls_conf,
            inlier_count=inliers,
            tracking_status=status
        )

    def detect_tracking_loss(self, confidence: ConfidenceAssessment) -> bool:
        """Return True when the assessment says tracking is lost."""
        return confidence.tracking_status == "lost"

    # --- Search & Recovery ---

    def start_search(self, flight_id: str, frame_id: int, estimated_gps: GPSPoint) -> SearchSession:
        """Open a progressive search session centred on the last GPS estimate."""
        # Lazy %-style args avoid formatting when INFO is disabled.
        logger.info(
            "Starting progressive search for flight %s, frame %s at %s",
            flight_id, frame_id, estimated_gps,
        )
        return SearchSession(
            session_id=f"search_{flight_id}_{frame_id}",
            flight_id=flight_id,
            frame_id=frame_id,
            center_gps=estimated_gps,
            current_grid_size=1,
            max_grid_size=25,
            found=False,
            exhausted=False
        )

    def expand_search_radius(self, session: SearchSession) -> List[TileCoords]:
        """Grow the session's search grid to the next size in the progression.

        Returns the tiles newly added by the expansion, or an empty list
        (and marks the session exhausted) when the progression is finished
        or the current grid size is not a known progression step.
        """
        grid_progression = [1, 4, 9, 16, 25]
        try:
            # Keep the try body minimal: the original wrapped the F04 calls
            # too, so a ValueError from F04 was silently mislabelled as an
            # exhausted progression.
            idx = grid_progression.index(session.current_grid_size)
        except ValueError:
            idx = len(grid_progression)  # unknown size -> exhausted below

        if idx + 1 < len(grid_progression):
            new_size = grid_progression[idx + 1]

            # Tile expansion delegated to F04 (assumes compute_tile_coords
            # and expand_search_grid exist on the satellite data manager).
            center_tc = self.f04.compute_tile_coords(session.center_gps.lat, session.center_gps.lon, zoom=18)
            new_tiles = self.f04.expand_search_grid(center_tc, session.current_grid_size, new_size)

            session.current_grid_size = new_size
            return new_tiles

        session.exhausted = True
        return []

    def try_current_grid(self, session: SearchSession, tiles: Dict[str, Tuple[np.ndarray, Any]], uav_image: np.ndarray) -> Optional[AlignmentResult]:
        """Try to align the UAV image against every tile in the current grid.

        Returns the first alignment whose confidence exceeds 0.7 (marking
        the session found), otherwise None. Under USE_MOCK_MODELS=1 a
        synthetic success at the session centre is returned so simulations
        keep moving automatically.
        """
        if os.environ.get("USE_MOCK_MODELS") == "1":
            mock_res = AlignmentResult(
                matched=True,
                gps=session.center_gps,
                confidence=0.9,
                inlier_count=100,
                transform=np.eye(3)
            )
            self.mark_found(session, mock_res)
            return mock_res

        # Hoisted out of the loop: without a metric-refinement backend no
        # tile can ever match (original checked per iteration).
        if not self.f09:
            return None

        for tile_id, (tile_img, bounds) in tiles.items():
            result = self.f09.align_to_satellite(uav_image, tile_img, bounds)
            if result and result.confidence > 0.7:
                self.mark_found(session, result)
                return result
        return None

    def mark_found(self, session: SearchSession, result: AlignmentResult) -> bool:
        """Flag the session as successful; always returns True."""
        session.found = True
        logger.info(
            "Search session %s succeeded at grid size %s.",
            session.session_id, session.current_grid_size,
        )
        return True

    def get_search_status(self, session: SearchSession) -> SearchStatus:
        """Project the mutable session onto a read-only status snapshot."""
        return SearchStatus(
            current_grid_size=session.current_grid_size,
            found=session.found,
            exhausted=session.exhausted
        )

    def create_user_input_request(self, flight_id: str, frame_id: int, uav_image: np.ndarray, candidate_tiles: List[TileCandidate]) -> UserInputRequest:
        """Build a request asking the operator for a manual location anchor."""
        return UserInputRequest(
            request_id=f"usr_req_{flight_id}_{frame_id}",
            flight_id=flight_id,
            frame_id=frame_id,
            uav_image=uav_image,
            candidate_tiles=candidate_tiles,
            message="Tracking lost and automatic recovery failed. Please provide a location anchor.",
            # Timezone-aware timestamp; datetime.utcnow() is deprecated
            # since Python 3.12 and returns a naive datetime.
            created_at=datetime.now(timezone.utc)
        )

    def apply_user_anchor(self, flight_id: str, frame_id: int, anchor: UserAnchor) -> bool:
        """Inject a user-provided GPS anchor as a hard constraint.

        Returns True when the owning chunk was found and the constraint was
        handed to the factor-graph optimizer (F10); False when no chunk
        owns the frame or required dependencies are missing.
        """
        logger.info("Applying user anchor for frame %s at %s", frame_id, anchor.satellite_gps)

        # Guard: the original crashed with AttributeError when either
        # collaborator was absent from the deps dict.
        if not self.f12 or not self.f10:
            logger.error("Cannot apply user anchor: chunk manager or optimizer unavailable")
            return False

        # Altitude is not captured from the user; default to 400 m.
        gps_array = np.array([anchor.satellite_gps.lat, anchor.satellite_gps.lon, 400.0])

        # Find which chunk owns this frame, then delegate to F10.
        chunk_id = self.f12.get_chunk_for_frame(flight_id, frame_id)
        if chunk_id:
            self.f10.add_chunk_anchor(chunk_id, frame_id, gps_array)
            return True
        return False

    # --- Chunk Recovery ---

    def create_chunk_on_tracking_loss(self, flight_id: str, frame_id: int) -> ChunkHandle:
        """Open a fresh unanchored chunk at the frame where tracking broke."""
        logger.warning("Creating proactive recovery chunk starting at frame %s", frame_id)
        return self.f12.create_chunk(flight_id, frame_id)

    def try_chunk_semantic_matching(self, chunk_id: str) -> Optional[List[TileCandidate]]:
        """Attempts semantic matching for a whole chunk using aggregate descriptors."""
        logger.info("Attempting semantic matching for chunk %s.", chunk_id)
        # Guard against a chunk manager without image access, or a missing
        # place-recognition backend (original crashed when f08 was None).
        if not self.f08 or not hasattr(self.f12, 'get_chunk_images'):
            return None

        chunk_images = self.f12.get_chunk_images(chunk_id)
        if not chunk_images:
            return None

        candidates = self.f08.retrieve_candidate_tiles_for_chunk(chunk_images)
        return candidates if candidates else None

    def try_chunk_litesam_matching(self, chunk_id: str, candidate_tiles: List[TileCandidate]) -> Optional[ChunkAlignmentResult]:
        """Attempts LiteSAM matching across candidate tiles with rotation sweeps.

        Returns the first successful chunk alignment, or None when no
        candidate matches or chunk images are unavailable.
        """
        logger.info("Attempting LiteSAM rotation sweeps for chunk %s", chunk_id)

        if not hasattr(self.f12, 'get_chunk_images'):
            return None

        chunk_images = self.f12.get_chunk_images(chunk_id)
        if not chunk_images:
            return None

        for candidate in candidate_tiles:
            tile_img = self.f04.fetch_tile(candidate.gps.lat, candidate.gps.lon, zoom=18)
            if tile_img is None:
                continue

            coords = self.f04.compute_tile_coords(candidate.gps.lat, candidate.gps.lon, zoom=18)
            bounds = self.f04.compute_tile_bounds(coords)

            # Sweep rotations via F06, scoring each step with F09.
            rot_result = self.f06.try_chunk_rotation_steps(chunk_images, tile_img, bounds, self.f09)
            if not (rot_result and rot_result.matched):
                continue

            # Fall back to identity / candidate values when F09 lacks the
            # private helpers used to derive the precise transform and GPS.
            if hasattr(self.f09, '_compute_sim3_transform'):
                sim3 = self.f09._compute_sim3_transform(rot_result.homography, bounds)
            else:
                sim3 = Sim3Transform(translation=np.zeros(3), rotation=np.eye(3), scale=1.0)
            if hasattr(self.f09, '_get_chunk_center_gps'):
                gps = self.f09._get_chunk_center_gps(rot_result.homography, bounds, chunk_images)
            else:
                gps = candidate.gps

            return ChunkAlignmentResult(
                matched=True,
                chunk_id=chunk_id,
                chunk_center_gps=gps,
                rotation_angle=rot_result.precise_angle,
                confidence=rot_result.confidence,
                inlier_count=rot_result.inlier_count,
                transform=sim3,
                # Captured only if the model declares the field; pydantic v2
                # ignores unknown keys by default otherwise.
                reprojection_error=0.0
            )
        return None

    def merge_chunk_to_trajectory(self, flight_id: str, chunk_id: str, alignment_result: ChunkAlignmentResult) -> bool:
        """Anchor a matched chunk and merge it back into the main trajectory.

        Returns F12's merge outcome, or False when F12 cannot merge.
        """
        logger.info("Merging chunk %s to global trajectory.", chunk_id)

        # Fall back to the conventional "main" chunk id when the preceding
        # chunk cannot be resolved.
        if hasattr(self.f12, 'get_preceding_chunk'):
            main_chunk_id = self.f12.get_preceding_chunk(flight_id, chunk_id)
        else:
            main_chunk_id = "main"
        if not main_chunk_id:
            main_chunk_id = "main"

        # Altitude defaulted to 400 m (not carried in the alignment result).
        anchor_gps = np.array([alignment_result.chunk_center_gps.lat, alignment_result.chunk_center_gps.lon, 400.0])
        if hasattr(self.f12, 'mark_chunk_anchored'):
            self.f12.mark_chunk_anchored(chunk_id, anchor_gps)

        if hasattr(self.f12, 'merge_chunks'):
            return self.f12.merge_chunks(main_chunk_id, chunk_id, alignment_result.transform)
        return False

    def process_unanchored_chunks(self, flight_id: str) -> None:
        """Background task loop structure designed to be called by a worker thread."""
        if not hasattr(self.f12, 'get_chunks_for_matching'):
            return

        for chunk in self.f12.get_chunks_for_matching(flight_id):
            # Skip chunks that F12 does not consider ready yet.
            if not (hasattr(self.f12, 'is_chunk_ready_for_matching')
                    and self.f12.is_chunk_ready_for_matching(chunk.chunk_id)):
                continue

            if hasattr(self.f12, 'mark_chunk_matching'):
                self.f12.mark_chunk_matching(chunk.chunk_id)

            # Stage 1: coarse semantic retrieval; stage 2: precise LiteSAM
            # alignment; stage 3: merge on success.
            candidates = self.try_chunk_semantic_matching(chunk.chunk_id)
            if not candidates:
                continue
            alignment = self.try_chunk_litesam_matching(chunk.chunk_id, candidates)
            if alignment:
                self.merge_chunk_to_trajectory(flight_id, chunk.chunk_id, alignment)