Files
gps-denied-onboard/f02_flight_processing_engine.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

228 lines
9.7 KiB
Python

import threading
import logging
import numpy as np
import asyncio
import time
from queue import Queue, Empty
from typing import Optional, Callable, Any
from f13_result_manager import ResultData, GPSPoint
from h05_performance_monitor import PerformanceMonitor
logger = logging.getLogger(__name__)
class FlightProcessingEngine:
"""
Orchestrates the main frame-by-frame processing loop.
Coordinates Visual Odometry (Front-End), Cross-View Geo-Localization (Back-End),
and the Factor Graph Optimizer. Manages chunk lifecycles and real-time streaming.
"""
def __init__(
self,
vo_frontend: Any,
factor_graph: Any,
cvgl_backend: Any,
async_pose_publisher: Optional[Callable] = None,
event_loop: Optional[asyncio.AbstractEventLoop] = None,
failure_coordinator: Any = None,
result_manager: Any = None,
camera_params: Any = None
):
self.vo = vo_frontend
self.optimizer = factor_graph
self.cvgl = cvgl_backend
self.async_pose_publisher = async_pose_publisher
self.event_loop = event_loop
self.failure_coordinator = failure_coordinator
self.result_manager = result_manager
self.camera_params = camera_params
self.image_queue = Queue()
self.is_running = False
self.processing_thread = None
self.recovery_thread = None
# State Machine & Flight Data
self.active_flight_id = None
self.current_chunk_id = "chunk_0"
self.chunk_counter = 0
self.last_frame_id = -1
self.last_image = None
self.unanchored_chunks = set()
self.chunk_image_cache = {}
self.perf_monitor = PerformanceMonitor(ac7_limit_s=5.0)
# External Index for CVGL Back-End
self.satellite_index = None
def set_satellite_index(self, index):
"""Sets the Faiss Index containing local satellite tiles."""
self.satellite_index = index
def start_processing(self, flight_id: str):
"""Starts the main processing loop in a background thread."""
if self.is_running:
logger.warning("Engine is already running.")
return
self.active_flight_id = flight_id
self.is_running = True
self.processing_thread = threading.Thread(target=self._run_processing_loop, daemon=True)
self.processing_thread.start()
self.recovery_thread = threading.Thread(target=self._chunk_recovery_loop, daemon=True)
self.recovery_thread.start()
logger.info(f"Started processing loop for flight {self.active_flight_id}")
def stop_processing(self):
"""Stops the processing loop gracefully."""
self.is_running = False
if self.processing_thread:
self.processing_thread.join()
if self.recovery_thread:
self.recovery_thread.join()
logger.info("Flight Processing Engine stopped.")
def add_image(self, frame_id: int, image: np.ndarray):
"""Ingests an image into the processing queue."""
self.image_queue.put((frame_id, image))
def _run_processing_loop(self):
"""The core continuous loop running in a background thread."""
while self.is_running:
try:
# Wait for up to 1 second for a new image
frame_id, image = self.image_queue.get(timeout=1.0)
with self.perf_monitor.measure(f"frame_{frame_id}_total", limit_ms=5000.0):
self._process_single_frame(frame_id, image)
except Empty:
continue
except Exception as e:
logger.error(f"Critical error processing frame: {e}")
def _process_single_frame(self, frame_id: int, image: np.ndarray):
"""Processes a single frame through the VO -> Graph -> CVGL pipeline."""
if self.last_image is None:
self.last_image = image
self.last_frame_id = frame_id
self.optimizer.create_chunk_subgraph(self.current_chunk_id, frame_id)
self._attempt_global_anchoring(frame_id, image)
return
# 1. Front-End: Compute Unscaled Relative Pose (High Frequency)
with self.perf_monitor.measure(f"frame_{frame_id}_vo_tracking"):
rel_pose = self.vo.compute_relative_pose(self.last_image, image, self.camera_params)
if not rel_pose or not rel_pose.tracking_good:
logger.warning(f"Tracking lost at frame {frame_id}. Initiating new chunk.")
# AC-4: Handle sharp turns by creating a disconnected map chunk
if self.failure_coordinator and self.active_flight_id:
chunk_handle = self.failure_coordinator.create_chunk_on_tracking_loss(self.active_flight_id, frame_id)
self.current_chunk_id = chunk_handle.chunk_id
self.unanchored_chunks.add(self.current_chunk_id)
else:
self.chunk_counter += 1
self.current_chunk_id = f"chunk_{self.chunk_counter}"
self.last_frame_id = -1
self.last_image = image
self.last_frame_id = frame_id
self.optimizer.create_chunk_subgraph(self.current_chunk_id, frame_id)
self._attempt_global_anchoring(frame_id, image)
return
transform = np.eye(4)
transform[:3, :3] = rel_pose.rotation
transform[:3, 3] = rel_pose.translation.flatten()
# 2. Factor Graph: Initialize Chunk or Add Relative Factor
if self.last_frame_id == -1 or self.current_chunk_id not in self.optimizer.chunks:
self.optimizer.create_chunk_subgraph(self.current_chunk_id, frame_id)
self.last_frame_id = frame_id
# Immediately attempt to anchor the new chunk
self._attempt_global_anchoring(frame_id, image)
return
self.optimizer.add_relative_factor_to_chunk(
self.current_chunk_id, self.last_frame_id, frame_id, transform
)
# Cache images for unanchored chunks to build sequence descriptors
if self.current_chunk_id in self.unanchored_chunks:
self.chunk_image_cache.setdefault(self.current_chunk_id, []).append(image)
# 3. Optimize and Stream Immediate Unscaled Pose (< 5s | AC-7)
opt_success, results = self.optimizer.optimize_chunk(self.current_chunk_id)
if opt_success and frame_id in results:
self._publish_result(frame_id, results[frame_id], is_refined=False)
# 4. Back-End: Global Anchoring (Low Frequency / Periodic)
# We run the heavy global search only every 15 frames to save compute
if frame_id % 15 == 0:
self._attempt_global_anchoring(frame_id, image)
self.last_frame_id = frame_id
self.last_image = image
def _attempt_global_anchoring(self, frame_id: int, image: np.ndarray):
"""Queries the CVGL Back-End for an absolute metric GPS anchor."""
if not self.satellite_index:
return
with self.perf_monitor.measure(f"frame_{frame_id}_cvgl_anchoring"):
found, H_transform, sat_info = self.cvgl.retrieve_and_match(image, self.satellite_index)
if found and sat_info:
logger.info(f"Global metric anchor found for frame {frame_id}!")
# Pass hard constraint to Factor Graph Optimizer
# Note: sat_info should ideally contain the absolute metric X, Y, Z translation
anchor_gps = np.array([sat_info.get('lat', 0.0), sat_info.get('lon', 0.0), 400.0])
self.optimizer.add_chunk_anchor(self.current_chunk_id, frame_id, anchor_gps)
# Re-optimize. The graph will resolve scale drift.
opt_success, results = self.optimizer.optimize_chunk(self.current_chunk_id)
if opt_success:
# Stream asynchronous Refined Poses (AC-8)
for fid, pose_matrix in results.items():
self._publish_result(fid, pose_matrix, is_refined=True)
def _publish_result(self, frame_id: int, pose_matrix: np.ndarray, is_refined: bool):
"""Safely pushes the pose event to the async SSE stream."""
# Simplified ENU to Lat/Lon mock logic for demonstration
lat = 48.0 + pose_matrix[0, 3] * 0.00001
lon = 37.0 + pose_matrix[1, 3] * 0.00001
confidence = 0.9 if is_refined else 0.5
if self.result_manager and self.active_flight_id:
try:
res = ResultData(
flight_id=self.active_flight_id,
image_id=f"AD{frame_id:06d}.jpg",
sequence_number=frame_id,
estimated_gps=GPSPoint(lat=lat, lon=lon, altitude_m=400.0),
confidence=confidence,
source="factor_graph" if is_refined else "vo_frontend",
refinement_reason="Global Anchor Merge" if is_refined else None
)
self.result_manager.store_result(res)
except Exception as e:
logger.error(f"Failed to store result for frame {frame_id}: {e}")
if self.async_pose_publisher and self.event_loop:
asyncio.run_coroutine_threadsafe(
self.async_pose_publisher(frame_id, lat, lon, confidence, is_refined),
self.event_loop
)
def _chunk_recovery_loop(self):
"""Background task to asynchronously match and merge unanchored chunks."""
while self.is_running:
if self.failure_coordinator and self.active_flight_id:
self.failure_coordinator.process_unanchored_chunks(self.active_flight_id)
time.sleep(2.0)