Files
gps-denied-onboard/f12_route_chunk_manager.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

258 lines
9.9 KiB
Python

import uuid
import logging
import numpy as np
from typing import List, Optional, Dict, Any
from pydantic import BaseModel
from abc import ABC, abstractmethod
from f02_1_flight_lifecycle_manager import GPSPoint
from f07_sequential_visual_odometry import RelativePose
from f09_local_geospatial_anchoring import Sim3Transform
logger = logging.getLogger(__name__)
# --- Data Models ---
class ChunkHandle(BaseModel):
chunk_id: str
flight_id: str
start_frame_id: int
end_frame_id: Optional[int] = None
frames: List[int] = []
is_active: bool = True
has_anchor: bool = False
anchor_frame_id: Optional[int] = None
anchor_gps: Optional[GPSPoint] = None
matching_status: str = "unanchored" # "unanchored", "matching", "anchored", "merged"
class ChunkBounds(BaseModel):
estimated_center: GPSPoint
estimated_radius: float
confidence: float
class ChunkConfig(BaseModel):
min_frames_for_matching: int = 5
max_frames_per_chunk: int = 20
descriptor_aggregation: str = "mean"
# --- Interface ---
class IRouteChunkManager(ABC):
@abstractmethod
def create_chunk(self, flight_id: str, start_frame_id: int) -> ChunkHandle: pass
@abstractmethod
def add_frame_to_chunk(self, chunk_id: str, frame_id: int, vo_result: RelativePose) -> bool: pass
@abstractmethod
def get_chunk_frames(self, chunk_id: str) -> List[int]: pass
@abstractmethod
def get_chunk_images(self, chunk_id: str) -> List[np.ndarray]: pass
@abstractmethod
def get_chunk_composite_descriptor(self, chunk_id: str) -> Optional[np.ndarray]: pass
@abstractmethod
def get_chunk_bounds(self, chunk_id: str) -> ChunkBounds: pass
@abstractmethod
def is_chunk_ready_for_matching(self, chunk_id: str) -> bool: pass
@abstractmethod
def mark_chunk_anchored(self, chunk_id: str, frame_id: int, gps: GPSPoint) -> bool: pass
@abstractmethod
def get_chunks_for_matching(self, flight_id: str) -> List[ChunkHandle]: pass
@abstractmethod
def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]: pass
@abstractmethod
def deactivate_chunk(self, chunk_id: str) -> bool: pass
@abstractmethod
def merge_chunks(self, main_chunk_id: str, new_chunk_id: str, transform: Sim3Transform) -> bool: pass
@abstractmethod
def mark_chunk_matching(self, chunk_id: str) -> bool: pass
@abstractmethod
def save_chunk_state(self, flight_id: str) -> bool: pass
@abstractmethod
def load_chunk_state(self, flight_id: str) -> bool: pass
# --- Implementation ---
class RouteChunkManager(IRouteChunkManager):
"""
F12: Route Chunk Manager
Tracks the independent mapping states and chunk readiness of Atlas multi-map fragments.
Ensures transactional integrity with F10 Factor Graph Optimizer.
"""
def __init__(self, f03=None, f05=None, f08=None, f10=None, config: Optional[ChunkConfig] = None):
self.f03 = f03 # Flight Database
self.f05 = f05 # Image Input Pipeline
self.f08 = f08 # Global Place Recognition
self.f10 = f10 # Factor Graph Optimizer
self.config = config or ChunkConfig()
self._chunks: Dict[str, ChunkHandle] = {}
def _generate_chunk_id(self) -> str:
return f"chunk_{uuid.uuid4().hex[:8]}"
def _get_chunk_by_id(self, chunk_id: str) -> Optional[ChunkHandle]:
return self._chunks.get(chunk_id)
def _validate_chunk_active(self, chunk_id: str) -> bool:
chunk = self._get_chunk_by_id(chunk_id)
return chunk is not None and chunk.is_active
# --- 12.01 Chunk Lifecycle Management ---
def create_chunk(self, flight_id: str, start_frame_id: int) -> ChunkHandle:
chunk_id = self._generate_chunk_id()
# Transactional: Create in F10 first
if self.f10:
self.f10.create_chunk_subgraph(flight_id, chunk_id, start_frame_id)
chunk = ChunkHandle(
chunk_id=chunk_id,
flight_id=flight_id,
start_frame_id=start_frame_id,
end_frame_id=start_frame_id,
frames=[start_frame_id],
is_active=True,
has_anchor=False,
matching_status="unanchored"
)
self._chunks[chunk_id] = chunk
logger.info(f"Created new chunk {chunk_id} for flight {flight_id} starting at frame {start_frame_id}")
return chunk
def add_frame_to_chunk(self, chunk_id: str, frame_id: int, vo_result: RelativePose) -> bool:
if not self._validate_chunk_active(chunk_id):
return False
chunk = self._chunks[chunk_id]
# Assumes the relative factor is from the last frame added to the current frame
prev_frame_id = chunk.frames[-1] if chunk.frames else chunk.start_frame_id
# Transactional: Add to F10 first
if self.f10 and not self.f10.add_relative_factor_to_chunk(chunk.flight_id, chunk_id, prev_frame_id, frame_id, vo_result, np.eye(6)):
return False
chunk.frames.append(frame_id)
chunk.end_frame_id = frame_id
return True
def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]:
for chunk in self._chunks.values():
if chunk.flight_id == flight_id and chunk.is_active:
return chunk
return None
def deactivate_chunk(self, chunk_id: str) -> bool:
chunk = self._get_chunk_by_id(chunk_id)
if not chunk:
return False
chunk.is_active = False
return True
# --- 12.02 Chunk Data Retrieval ---
def get_chunk_frames(self, chunk_id: str) -> List[int]:
chunk = self._get_chunk_by_id(chunk_id)
return chunk.frames if chunk else []
def get_chunk_images(self, chunk_id: str) -> List[np.ndarray]:
chunk = self._get_chunk_by_id(chunk_id)
if not chunk or not self.f05:
return []
images = []
for fid in chunk.frames:
img_data = self.f05.get_image_by_sequence(chunk.flight_id, fid)
if img_data and img_data.image is not None:
images.append(img_data.image)
return images
def get_chunk_composite_descriptor(self, chunk_id: str) -> Optional[np.ndarray]:
images = self.get_chunk_images(chunk_id)
if not images or not self.f08:
return None
return self.f08.compute_chunk_descriptor(images)
def get_chunk_bounds(self, chunk_id: str) -> ChunkBounds:
chunk = self._get_chunk_by_id(chunk_id)
if not chunk:
return ChunkBounds(estimated_center=GPSPoint(lat=0, lon=0), estimated_radius=0.0, confidence=0.0)
trajectory = self.f10.get_chunk_trajectory(chunk.flight_id, chunk_id) if self.f10 else {}
positions = [pose.position for pose in trajectory.values()] if trajectory else []
radius = max(np.linalg.norm(p - np.mean(positions, axis=0)) for p in positions) if positions else 50.0
center_gps = chunk.anchor_gps if chunk.has_anchor else GPSPoint(lat=0.0, lon=0.0)
conf = 0.8 if chunk.has_anchor else 0.2
return ChunkBounds(estimated_center=center_gps, estimated_radius=float(radius), confidence=conf)
# --- 12.03 Chunk Matching Coordination ---
def is_chunk_ready_for_matching(self, chunk_id: str) -> bool:
chunk = self._get_chunk_by_id(chunk_id)
if not chunk: return False
if chunk.matching_status in ["anchored", "merged", "matching"]: return False
return self.config.min_frames_for_matching <= len(chunk.frames) <= self.config.max_frames_per_chunk
def get_chunks_for_matching(self, flight_id: str) -> List[ChunkHandle]:
return [c for c in self._chunks.values() if c.flight_id == flight_id and self.is_chunk_ready_for_matching(c.chunk_id)]
def mark_chunk_matching(self, chunk_id: str) -> bool:
chunk = self._get_chunk_by_id(chunk_id)
if not chunk: return False
chunk.matching_status = "matching"
return True
def mark_chunk_anchored(self, chunk_id: str, frame_id: int, gps: GPSPoint) -> bool:
chunk = self._get_chunk_by_id(chunk_id)
if not chunk: return False
if self.f10 and not self.f10.add_chunk_anchor(chunk.flight_id, chunk_id, frame_id, gps, np.eye(3)):
return False
chunk.has_anchor = True
chunk.anchor_frame_id = frame_id
chunk.anchor_gps = gps
chunk.matching_status = "anchored"
return True
def merge_chunks(self, main_chunk_id: str, new_chunk_id: str, transform: Sim3Transform) -> bool:
main_chunk = self._get_chunk_by_id(main_chunk_id)
new_chunk = self._get_chunk_by_id(new_chunk_id)
if not main_chunk or not new_chunk:
return False
# Transactional: Call F10 to apply Sim3 Transform and fuse subgraphs
if self.f10 and not self.f10.merge_chunk_subgraphs(main_chunk.flight_id, new_chunk_id, main_chunk_id, transform):
return False
# Absorb frames
main_chunk.frames.extend(new_chunk.frames)
main_chunk.end_frame_id = new_chunk.end_frame_id
new_chunk.is_active = False
new_chunk.matching_status = "merged"
if self.f03:
self.f03.save_chunk_state(main_chunk.flight_id, main_chunk)
self.f03.save_chunk_state(new_chunk.flight_id, new_chunk)
return True
# --- 12.04 Chunk State Persistence ---
def save_chunk_state(self, flight_id: str) -> bool:
if not self.f03: return False
success = True
for chunk in self._chunks.values():
if chunk.flight_id == flight_id:
if not self.f03.save_chunk_state(flight_id, chunk):
success = False
return success
def load_chunk_state(self, flight_id: str) -> bool:
if not self.f03: return False
loaded_chunks = self.f03.load_chunk_states(flight_id)
for chunk in loaded_chunks:
self._chunks[chunk.chunk_id] = chunk
return True