Files
gps-denied-onboard/f11_failure_recovery_coordinator.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

328 lines
12 KiB
Python

import time
import logging
import os
from datetime import datetime
from typing import List, Optional, Tuple, Dict, Any
import numpy as np
from pydantic import BaseModel, Field
from f02_1_flight_lifecycle_manager import GPSPoint
from f04_satellite_data_manager import TileCoords
logger = logging.getLogger(__name__)
# --- Data Models ---
class RelativePose(BaseModel):
transform: np.ndarray
inlier_count: int
reprojection_error: float
model_config = {"arbitrary_types_allowed": True}
class Sim3Transform(BaseModel):
translation: np.ndarray
rotation: np.ndarray
scale: float
model_config = {"arbitrary_types_allowed": True}
class AlignmentResult(BaseModel):
matched: bool
gps: GPSPoint
confidence: float
inlier_count: int
transform: np.ndarray
model_config = {"arbitrary_types_allowed": True}
class ConfidenceAssessment(BaseModel):
overall_confidence: float
vo_confidence: float
litesam_confidence: float
inlier_count: int
tracking_status: str # "good", "degraded", "lost"
class SearchSession(BaseModel):
session_id: str
flight_id: str
frame_id: int
center_gps: GPSPoint
current_grid_size: int
max_grid_size: int
found: bool
exhausted: bool
class SearchStatus(BaseModel):
current_grid_size: int
found: bool
exhausted: bool
class TileCandidate(BaseModel):
tile_id: str
score: float
gps: GPSPoint
class UserInputRequest(BaseModel):
request_id: str
flight_id: str
frame_id: int
uav_image: Any = Field(exclude=True)
candidate_tiles: List[TileCandidate]
message: str
created_at: datetime
model_config = {"arbitrary_types_allowed": True}
class UserAnchor(BaseModel):
uav_pixel: Tuple[float, float]
satellite_gps: GPSPoint
confidence: float = 1.0
class ChunkHandle(BaseModel):
chunk_id: str
flight_id: str
start_frame_id: int = 0
end_frame_id: Optional[int] = None
frames: List[int] = []
is_active: bool = True
has_anchor: bool = False
anchor_frame_id: Optional[int] = None
anchor_gps: Optional[GPSPoint] = None
matching_status: str = "unanchored" # "unanchored", "matching", "anchored", "merged"
class ChunkAlignmentResult(BaseModel):
matched: bool
chunk_id: str
chunk_center_gps: GPSPoint
rotation_angle: float
confidence: float
inlier_count: int
transform: Sim3Transform
class RecoveryStatus(BaseModel):
success: bool
method: str
gps: Optional[GPSPoint]
chunk_id: Optional[str]
message: Optional[str]
# --- Implementation ---
class FailureRecoveryCoordinator:
"""
Coordinates failure recovery strategies (progressive search, chunk matching, user input).
Pure logic component: decides what to do, delegates execution to dependencies.
"""
def __init__(self, deps: Dict[str, Any]):
# Dependencies injected via constructor dictionary to prevent circular imports
self.f04 = deps.get("satellite_data_manager")
self.f06 = deps.get("image_rotation_manager")
self.f08 = deps.get("global_place_recognition")
self.f09 = deps.get("metric_refinement")
self.f10 = deps.get("factor_graph_optimizer")
self.f12 = deps.get("route_chunk_manager")
# --- Status Checks ---
def check_confidence(self, vo_result: RelativePose, litesam_result: Optional[AlignmentResult]) -> ConfidenceAssessment:
inliers = vo_result.inlier_count if vo_result else 0
if inliers > 50:
status = "good"
vo_conf = 1.0
elif inliers >= 20:
status = "degraded"
vo_conf = inliers / 50.0
else:
status = "lost"
vo_conf = 0.0
ls_conf = litesam_result.confidence if litesam_result else 0.0
overall = max(vo_conf, ls_conf)
return ConfidenceAssessment(
overall_confidence=overall,
vo_confidence=vo_conf,
litesam_confidence=ls_conf,
inlier_count=inliers,
tracking_status=status
)
def detect_tracking_loss(self, confidence: ConfidenceAssessment) -> bool:
return confidence.tracking_status == "lost"
# --- Search & Recovery ---
def start_search(self, flight_id: str, frame_id: int, estimated_gps: GPSPoint) -> SearchSession:
logger.info(f"Starting progressive search for flight {flight_id}, frame {frame_id} at {estimated_gps}")
return SearchSession(
session_id=f"search_{flight_id}_{frame_id}",
flight_id=flight_id,
frame_id=frame_id,
center_gps=estimated_gps,
current_grid_size=1,
max_grid_size=25,
found=False,
exhausted=False
)
def expand_search_radius(self, session: SearchSession) -> List[TileCoords]:
grid_progression = [1, 4, 9, 16, 25]
try:
idx = grid_progression.index(session.current_grid_size)
if idx + 1 < len(grid_progression):
new_size = grid_progression[idx + 1]
# Mocking tile expansion assuming F04 has compute_tile_coords
center_tc = self.f04.compute_tile_coords(session.center_gps.lat, session.center_gps.lon, zoom=18)
new_tiles = self.f04.expand_search_grid(center_tc, session.current_grid_size, new_size)
session.current_grid_size = new_size
return new_tiles
except ValueError:
pass
session.exhausted = True
return []
def try_current_grid(self, session: SearchSession, tiles: Dict[str, Tuple[np.ndarray, Any]], uav_image: np.ndarray) -> Optional[AlignmentResult]:
if os.environ.get("USE_MOCK_MODELS") == "1":
# Fake a successful satellite recovery to keep the simulation moving automatically
mock_res = AlignmentResult(
matched=True,
gps=session.center_gps,
confidence=0.9,
inlier_count=100,
transform=np.eye(3)
)
self.mark_found(session, mock_res)
return mock_res
for tile_id, (tile_img, bounds) in tiles.items():
if self.f09:
result = self.f09.align_to_satellite(uav_image, tile_img, bounds)
if result and result.confidence > 0.7:
self.mark_found(session, result)
return result
return None
def mark_found(self, session: SearchSession, result: AlignmentResult) -> bool:
session.found = True
logger.info(f"Search session {session.session_id} succeeded at grid size {session.current_grid_size}.")
return True
def get_search_status(self, session: SearchSession) -> SearchStatus:
return SearchStatus(
current_grid_size=session.current_grid_size,
found=session.found,
exhausted=session.exhausted
)
def create_user_input_request(self, flight_id: str, frame_id: int, uav_image: np.ndarray, candidate_tiles: List[TileCandidate]) -> UserInputRequest:
return UserInputRequest(
request_id=f"usr_req_{flight_id}_{frame_id}",
flight_id=flight_id,
frame_id=frame_id,
uav_image=uav_image,
candidate_tiles=candidate_tiles,
message="Tracking lost and automatic recovery failed. Please provide a location anchor.",
created_at=datetime.utcnow()
)
def apply_user_anchor(self, flight_id: str, frame_id: int, anchor: UserAnchor) -> bool:
logger.info(f"Applying user anchor for frame {frame_id} at {anchor.satellite_gps}")
gps_array = np.array([anchor.satellite_gps.lat, anchor.satellite_gps.lon, 400.0]) # Defaulting alt
# Delegate to Factor Graph Optimizer to add hard constraint
# Note: In a real integration, we'd need to find which chunk this frame belongs to
chunk_id = self.f12.get_chunk_for_frame(flight_id, frame_id)
if chunk_id:
self.f10.add_chunk_anchor(chunk_id, frame_id, gps_array)
return True
return False
# --- Chunk Recovery ---
def create_chunk_on_tracking_loss(self, flight_id: str, frame_id: int) -> ChunkHandle:
logger.warning(f"Creating proactive recovery chunk starting at frame {frame_id}")
chunk = self.f12.create_chunk(flight_id, frame_id)
return chunk
def try_chunk_semantic_matching(self, chunk_id: str) -> Optional[List[TileCandidate]]:
"""Attempts semantic matching for a whole chunk using aggregate descriptors."""
logger.info(f"Attempting semantic matching for chunk {chunk_id}.")
if not hasattr(self.f12, 'get_chunk_images'):
return None
chunk_images = self.f12.get_chunk_images(chunk_id)
if not chunk_images:
return None
candidates = self.f08.retrieve_candidate_tiles_for_chunk(chunk_images)
return candidates if candidates else None
def try_chunk_litesam_matching(self, chunk_id: str, candidate_tiles: List[TileCandidate]) -> Optional[ChunkAlignmentResult]:
"""Attempts LiteSAM matching across candidate tiles with rotation sweeps."""
logger.info(f"Attempting LiteSAM rotation sweeps for chunk {chunk_id}")
if not hasattr(self.f12, 'get_chunk_images'):
return None
chunk_images = self.f12.get_chunk_images(chunk_id)
if not chunk_images:
return None
for candidate in candidate_tiles:
tile_img = self.f04.fetch_tile(candidate.gps.lat, candidate.gps.lon, zoom=18)
if tile_img is not None:
coords = self.f04.compute_tile_coords(candidate.gps.lat, candidate.gps.lon, zoom=18)
bounds = self.f04.compute_tile_bounds(coords)
rot_result = self.f06.try_chunk_rotation_steps(chunk_images, tile_img, bounds, self.f09)
if rot_result and rot_result.matched:
sim3 = self.f09._compute_sim3_transform(rot_result.homography, bounds) if hasattr(self.f09, '_compute_sim3_transform') else Sim3Transform(translation=np.zeros(3), rotation=np.eye(3), scale=1.0)
gps = self.f09._get_chunk_center_gps(rot_result.homography, bounds, chunk_images) if hasattr(self.f09, '_get_chunk_center_gps') else candidate.gps
return ChunkAlignmentResult(
matched=True,
chunk_id=chunk_id,
chunk_center_gps=gps,
rotation_angle=rot_result.precise_angle,
confidence=rot_result.confidence,
inlier_count=rot_result.inlier_count,
transform=sim3,
reprojection_error=0.0
)
return None
def merge_chunk_to_trajectory(self, flight_id: str, chunk_id: str, alignment_result: ChunkAlignmentResult) -> bool:
logger.info(f"Merging chunk {chunk_id} to global trajectory.")
main_chunk_id = self.f12.get_preceding_chunk(flight_id, chunk_id) if hasattr(self.f12, 'get_preceding_chunk') else "main"
if not main_chunk_id:
main_chunk_id = "main"
anchor_gps = np.array([alignment_result.chunk_center_gps.lat, alignment_result.chunk_center_gps.lon, 400.0])
if hasattr(self.f12, 'mark_chunk_anchored'):
self.f12.mark_chunk_anchored(chunk_id, anchor_gps)
if hasattr(self.f12, 'merge_chunks'):
return self.f12.merge_chunks(main_chunk_id, chunk_id, alignment_result.transform)
return False
def process_unanchored_chunks(self, flight_id: str) -> None:
"""Background task loop structure designed to be called by a worker thread."""
if not hasattr(self.f12, 'get_chunks_for_matching'):
return
unanchored_chunks = self.f12.get_chunks_for_matching(flight_id)
for chunk in unanchored_chunks:
if hasattr(self.f12, 'is_chunk_ready_for_matching') and self.f12.is_chunk_ready_for_matching(chunk.chunk_id):
if hasattr(self.f12, 'mark_chunk_matching'):
self.f12.mark_chunk_matching(chunk.chunk_id)
candidates = self.try_chunk_semantic_matching(chunk.chunk_id)
if candidates:
alignment = self.try_chunk_litesam_matching(chunk.chunk_id, candidates)
if alignment:
self.merge_chunk_to_trajectory(flight_id, chunk.chunk_id, alignment)