"""Route Chunk Manager (Component F12).""" import logging import uuid from abc import ABC, abstractmethod from typing import Dict, List, Optional from gps_denied.core.graph import IFactorGraphOptimizer from gps_denied.schemas.chunk import ChunkHandle, ChunkStatus from gps_denied.schemas.metric import Sim3Transform logger = logging.getLogger(__name__) class IRouteChunkManager(ABC): @abstractmethod def create_new_chunk(self, flight_id: str, start_frame_id: int) -> ChunkHandle: pass @abstractmethod def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]: pass @abstractmethod def get_all_chunks(self, flight_id: str) -> List[ChunkHandle]: pass @abstractmethod def add_frame_to_chunk(self, flight_id: str, frame_id: int) -> bool: pass @abstractmethod def update_chunk_status(self, flight_id: str, chunk_id: str, status: ChunkStatus) -> bool: pass @abstractmethod def merge_chunks(self, flight_id: str, new_chunk_id: str, main_chunk_id: str, transform: Sim3Transform) -> bool: pass class RouteChunkManager(IRouteChunkManager): """Manages disconnected trajectory segments.""" def __init__(self, optimizer: IFactorGraphOptimizer): self.optimizer = optimizer # Dictionary flight_id -> Dict[chunk_id -> ChunkHandle] self._chunks: Dict[str, Dict[str, ChunkHandle]] = {} def _init_flight(self, flight_id: str): if flight_id not in self._chunks: self._chunks[flight_id] = {} def create_new_chunk(self, flight_id: str, start_frame_id: int) -> ChunkHandle: self._init_flight(flight_id) # Deactivate previous active chunk if any active = self.get_active_chunk(flight_id) if active: active.is_active = False chunk_id = f"chunk_{uuid.uuid4().hex[:8]}" # Call F10 to initialize subgraph self.optimizer.create_chunk_subgraph(flight_id, chunk_id, start_frame_id) handle = ChunkHandle( chunk_id=chunk_id, flight_id=flight_id, start_frame_id=start_frame_id, frames=[start_frame_id], is_active=True, matching_status=ChunkStatus.UNANCHORED ) self._chunks[flight_id][chunk_id] = handle logger.info(f"Created new chunk {chunk_id} starting at frame {start_frame_id}") return handle def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]: if flight_id not in self._chunks: return None for chunk in self._chunks[flight_id].values(): if chunk.is_active: return chunk return None def get_all_chunks(self, flight_id: str) -> List[ChunkHandle]: if flight_id not in self._chunks: return [] return list(self._chunks[flight_id].values()) def add_frame_to_chunk(self, flight_id: str, frame_id: int) -> bool: active = self.get_active_chunk(flight_id) if not active: return False if frame_id not in active.frames: active.frames.append(frame_id) return True def update_chunk_status(self, flight_id: str, chunk_id: str, status: ChunkStatus) -> bool: if flight_id not in self._chunks or chunk_id not in self._chunks[flight_id]: return False self._chunks[flight_id][chunk_id].matching_status = status return True def merge_chunks(self, flight_id: str, new_chunk_id: str, main_chunk_id: str, transform: Sim3Transform) -> bool: if flight_id not in self._chunks: return False if new_chunk_id not in self._chunks[flight_id] or main_chunk_id not in self._chunks[flight_id]: return False # Perform graph merge success = self.optimizer.merge_chunk_subgraphs(flight_id, new_chunk_id, main_chunk_id, transform) if success: new_chunk = self._chunks[flight_id][new_chunk_id] main_chunk = self._chunks[flight_id][main_chunk_id] # Transfer frames ownership for frame_id in new_chunk.frames: if frame_id not in main_chunk.frames: main_chunk.frames.append(frame_id) new_chunk.frames.clear() new_chunk.matching_status = ChunkStatus.MERGED new_chunk.is_active = False logger.info(f"Merged chunk {new_chunk_id} into {main_chunk_id}") return True return False