import threading
import logging
import numpy as np
import asyncio
import time
from queue import Queue, Empty
from typing import Optional, Callable, Any

from f13_result_manager import ResultData, GPSPoint
from h05_performance_monitor import PerformanceMonitor

logger = logging.getLogger(__name__)


class FlightProcessingEngine:
    """
    Orchestrates the main frame-by-frame processing loop.

    Coordinates Visual Odometry (Front-End), Cross-View Geo-Localization
    (Back-End), and the Factor Graph Optimizer. Manages chunk lifecycles
    and real-time streaming.
    """

    def __init__(
        self,
        vo_frontend: Any,
        factor_graph: Any,
        cvgl_backend: Any,
        async_pose_publisher: Optional[Callable] = None,
        event_loop: Optional[asyncio.AbstractEventLoop] = None,
        failure_coordinator: Any = None,
        result_manager: Any = None,
        camera_params: Any = None,
    ):
        """
        Args:
            vo_frontend: Visual-odometry front-end exposing
                ``compute_relative_pose(prev_img, img, camera_params)``.
            factor_graph: Optimizer exposing ``create_chunk_subgraph``,
                ``add_relative_factor_to_chunk``, ``optimize_chunk``,
                ``add_chunk_anchor`` and a ``chunks`` container.
            cvgl_backend: Cross-view geo-localization back-end exposing
                ``retrieve_and_match(image, index)``.
            async_pose_publisher: Optional coroutine function
                ``(frame_id, lat, lon, confidence, is_refined)`` for SSE streaming.
            event_loop: Loop on which the publisher coroutine is scheduled.
            failure_coordinator: Optional handler for tracking-loss chunk
                creation and background chunk recovery.
            result_manager: Optional sink exposing ``store_result(ResultData)``.
            camera_params: Opaque camera parameters forwarded to the VO front-end.
        """
        self.vo = vo_frontend
        self.optimizer = factor_graph
        self.cvgl = cvgl_backend
        self.async_pose_publisher = async_pose_publisher
        self.event_loop = event_loop
        self.failure_coordinator = failure_coordinator
        self.result_manager = result_manager
        self.camera_params = camera_params

        self.image_queue: Queue = Queue()
        self.is_running = False
        self.processing_thread: Optional[threading.Thread] = None
        self.recovery_thread: Optional[threading.Thread] = None
        # Signalled on stop so sleeping background loops wake promptly
        # instead of waiting out their full sleep interval.
        self._stop_event = threading.Event()

        # State Machine & Flight Data
        self.active_flight_id: Optional[str] = None
        self.current_chunk_id = "chunk_0"
        self.chunk_counter = 0
        self.last_frame_id = -1            # -1 means "no reference frame yet"
        self.last_image: Optional[np.ndarray] = None
        self.unanchored_chunks: set = set()   # chunk ids still lacking a global anchor
        self.chunk_image_cache: dict = {}     # chunk_id -> [images] for sequence descriptors

        self.perf_monitor = PerformanceMonitor(ac7_limit_s=5.0)

        # External Index for CVGL Back-End (installed via set_satellite_index)
        self.satellite_index = None

    def set_satellite_index(self, index):
        """Sets the Faiss Index containing local satellite tiles."""
        self.satellite_index = index

    def start_processing(self, flight_id: str):
        """Starts the main processing loop in a background thread.

        Idempotent guard: a second call while running only logs a warning.
        """
        if self.is_running:
            logger.warning("Engine is already running.")
            return
        self.active_flight_id = flight_id
        self.is_running = True
        self._stop_event.clear()
        self.processing_thread = threading.Thread(target=self._run_processing_loop, daemon=True)
        self.processing_thread.start()
        self.recovery_thread = threading.Thread(target=self._chunk_recovery_loop, daemon=True)
        self.recovery_thread.start()
        logger.info(f"Started processing loop for flight {self.active_flight_id}")

    def stop_processing(self):
        """Stops the processing loop gracefully and joins both worker threads."""
        self.is_running = False
        # Wake the recovery loop immediately rather than letting it finish
        # its 2 s wait.
        self._stop_event.set()
        if self.processing_thread:
            self.processing_thread.join()
        if self.recovery_thread:
            self.recovery_thread.join()
        logger.info("Flight Processing Engine stopped.")

    def add_image(self, frame_id: int, image: np.ndarray):
        """Ingests an image into the processing queue."""
        self.image_queue.put((frame_id, image))

    def _run_processing_loop(self):
        """The core continuous loop running in a background thread."""
        while self.is_running:
            try:
                # Wait at most 1 s so the loop re-checks is_running regularly.
                frame_id, image = self.image_queue.get(timeout=1.0)
                with self.perf_monitor.measure(f"frame_{frame_id}_total", limit_ms=5000.0):
                    self._process_single_frame(frame_id, image)
            except Empty:
                continue
            except Exception:
                # Fix: logger.exception preserves the traceback that the
                # original f-string error log discarded.
                logger.exception("Critical error processing frame")

    def _begin_chunk(self, frame_id: int, image: np.ndarray) -> None:
        """Registers frame_id/image as the reference frame of the current chunk.

        Creates the chunk subgraph in the optimizer and immediately attempts
        global anchoring so a freshly started chunk is not left floating.
        """
        self.last_image = image
        self.last_frame_id = frame_id
        self.optimizer.create_chunk_subgraph(self.current_chunk_id, frame_id)
        self._attempt_global_anchoring(frame_id, image)

    def _process_single_frame(self, frame_id: int, image: np.ndarray):
        """Processes a single frame through the VO -> Graph -> CVGL pipeline."""
        # Very first frame: nothing to track against yet.
        if self.last_image is None:
            self._begin_chunk(frame_id, image)
            return

        # 1. Front-End: Compute Unscaled Relative Pose (High Frequency)
        with self.perf_monitor.measure(f"frame_{frame_id}_vo_tracking"):
            rel_pose = self.vo.compute_relative_pose(self.last_image, image, self.camera_params)

        if not rel_pose or not rel_pose.tracking_good:
            logger.warning(f"Tracking lost at frame {frame_id}. Initiating new chunk.")
            # AC-4: Handle sharp turns by creating a disconnected map chunk
            if self.failure_coordinator and self.active_flight_id:
                chunk_handle = self.failure_coordinator.create_chunk_on_tracking_loss(
                    self.active_flight_id, frame_id
                )
                self.current_chunk_id = chunk_handle.chunk_id
                self.unanchored_chunks.add(self.current_chunk_id)
            else:
                self.chunk_counter += 1
                self.current_chunk_id = f"chunk_{self.chunk_counter}"
            # Fix: the original assigned last_frame_id = -1 only to overwrite
            # it immediately; _begin_chunk sets the definitive reference state.
            self._begin_chunk(frame_id, image)
            return

        transform = np.eye(4)
        transform[:3, :3] = rel_pose.rotation
        transform[:3, 3] = rel_pose.translation.flatten()

        # 2. Factor Graph: Initialize Chunk or Add Relative Factor
        if self.last_frame_id == -1 or self.current_chunk_id not in self.optimizer.chunks:
            # Fix: also refresh last_image here — the new chunk's reference
            # frame is `frame_id`, so the next relative pose must be computed
            # against `image`, not the stale previous image the original kept.
            self._begin_chunk(frame_id, image)
            return

        self.optimizer.add_relative_factor_to_chunk(
            self.current_chunk_id, self.last_frame_id, frame_id, transform
        )

        # Cache images for unanchored chunks to build sequence descriptors
        if self.current_chunk_id in self.unanchored_chunks:
            self.chunk_image_cache.setdefault(self.current_chunk_id, []).append(image)

        # 3. Optimize and Stream Immediate Unscaled Pose (< 5s | AC-7)
        opt_success, results = self.optimizer.optimize_chunk(self.current_chunk_id)
        if opt_success and frame_id in results:
            self._publish_result(frame_id, results[frame_id], is_refined=False)

        # 4. Back-End: Global Anchoring (Low Frequency / Periodic)
        # We run the heavy global search only every 15 frames to save compute
        if frame_id % 15 == 0:
            self._attempt_global_anchoring(frame_id, image)

        self.last_frame_id = frame_id
        self.last_image = image

    def _attempt_global_anchoring(self, frame_id: int, image: np.ndarray):
        """Queries the CVGL Back-End for an absolute metric GPS anchor."""
        # Fix: explicit None check — truthiness of an arbitrary index object
        # is not a reliable "is it configured" test.
        if self.satellite_index is None:
            return

        with self.perf_monitor.measure(f"frame_{frame_id}_cvgl_anchoring"):
            found, H_transform, sat_info = self.cvgl.retrieve_and_match(image, self.satellite_index)

        if found and sat_info:
            logger.info(f"Global metric anchor found for frame {frame_id}!")
            # Pass hard constraint to Factor Graph Optimizer
            # Note: sat_info should ideally contain the absolute metric X, Y, Z translation
            anchor_gps = np.array([sat_info.get('lat', 0.0), sat_info.get('lon', 0.0), 400.0])
            self.optimizer.add_chunk_anchor(self.current_chunk_id, frame_id, anchor_gps)

            # Re-optimize. The graph will resolve scale drift.
            opt_success, results = self.optimizer.optimize_chunk(self.current_chunk_id)
            if opt_success:
                # Stream asynchronous Refined Poses (AC-8)
                for fid, pose_matrix in results.items():
                    self._publish_result(fid, pose_matrix, is_refined=True)

    def _publish_result(self, frame_id: int, pose_matrix: np.ndarray, is_refined: bool):
        """Safely pushes the pose event to the async SSE stream.

        Persists the result via the result manager (best-effort: storage
        failures are logged, not raised) and schedules the async publisher
        on the configured event loop from this worker thread.
        """
        # Simplified ENU to Lat/Lon mock logic for demonstration
        lat = 48.0 + pose_matrix[0, 3] * 0.00001
        lon = 37.0 + pose_matrix[1, 3] * 0.00001
        confidence = 0.9 if is_refined else 0.5

        if self.result_manager and self.active_flight_id:
            try:
                res = ResultData(
                    flight_id=self.active_flight_id,
                    image_id=f"AD{frame_id:06d}.jpg",
                    sequence_number=frame_id,
                    estimated_gps=GPSPoint(lat=lat, lon=lon, altitude_m=400.0),
                    confidence=confidence,
                    source="factor_graph" if is_refined else "vo_frontend",
                    refinement_reason="Global Anchor Merge" if is_refined else None
                )
                self.result_manager.store_result(res)
            except Exception as e:
                logger.error(f"Failed to store result for frame {frame_id}: {e}")

        if self.async_pose_publisher and self.event_loop:
            # run_coroutine_threadsafe is the only safe way to schedule a
            # coroutine on the loop from this non-async worker thread.
            asyncio.run_coroutine_threadsafe(
                self.async_pose_publisher(frame_id, lat, lon, confidence, is_refined),
                self.event_loop
            )

    def _chunk_recovery_loop(self):
        """Background task to asynchronously match and merge unanchored chunks."""
        while self.is_running:
            if self.failure_coordinator and self.active_flight_id:
                self.failure_coordinator.process_unanchored_chunks(self.active_flight_id)
            # Fix: Event.wait instead of time.sleep so stop_processing() does
            # not have to wait out the full 2 s interval.
            self._stop_event.wait(2.0)