mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 08:26:36 +00:00
228 lines
9.7 KiB
Python
228 lines
9.7 KiB
Python
import threading
|
|
import logging
|
|
import numpy as np
|
|
import asyncio
|
|
import time
|
|
from queue import Queue, Empty
|
|
from typing import Optional, Callable, Any
|
|
|
|
from f13_result_manager import ResultData, GPSPoint
|
|
from h05_performance_monitor import PerformanceMonitor
|
|
|
|
# Module-level logger, named after this module, used by every log call below.
logger = logging.getLogger(__name__)
|
|
|
|
class FlightProcessingEngine:
    """
    Orchestrates the main frame-by-frame processing loop.

    Coordinates Visual Odometry (Front-End), Cross-View Geo-Localization (Back-End),
    and the Factor Graph Optimizer. Manages chunk lifecycles and real-time streaming.

    ``start_processing`` launches two daemon threads: one drains ``image_queue``
    through ``_process_single_frame`` and one periodically asks the failure
    coordinator (when configured) to process unanchored chunks. Both threads
    poll ``is_running`` and exit once ``stop_processing`` clears it.
    """

    # Global anchoring is attempted on frames whose id is a multiple of this.
    ANCHOR_INTERVAL_FRAMES = 15
    # Fixed altitude value used for anchors and published GPS points.
    # NOTE(review): hard-coded placeholder; no altitude is derived from the pose.
    DEFAULT_ALTITUDE_M = 400.0
    # Constants of the simplified pose -> lat/lon mock conversion.
    MOCK_ORIGIN_LAT = 48.0
    MOCK_ORIGIN_LON = 37.0
    MOCK_DEG_PER_UNIT = 0.00001
    # How long the processing loop waits on the queue before re-checking is_running.
    QUEUE_POLL_TIMEOUT_S = 1.0
    # Pause between two chunk-recovery passes.
    RECOVERY_INTERVAL_S = 2.0
    # Per-frame time budget handed to the performance monitor (AC-7).
    FRAME_BUDGET_S = 5.0

    def __init__(
        self,
        vo_frontend: Any,
        factor_graph: Any,
        cvgl_backend: Any,
        async_pose_publisher: Optional[Callable] = None,
        event_loop: Optional[asyncio.AbstractEventLoop] = None,
        failure_coordinator: Any = None,
        result_manager: Any = None,
        camera_params: Any = None
    ):
        """
        Wire up the collaborators and reset the flight state.

        Args:
            vo_frontend: Object providing ``compute_relative_pose(prev, cur, camera_params)``.
            factor_graph: Optimizer providing ``chunks``, ``create_chunk_subgraph``,
                ``add_relative_factor_to_chunk``, ``add_chunk_anchor`` and ``optimize_chunk``.
            cvgl_backend: Object providing ``retrieve_and_match(image, index)``.
            async_pose_publisher: Optional coroutine function called as
                ``(frame_id, lat, lon, confidence, is_refined)``; only used together
                with ``event_loop``.
            event_loop: Event loop on which ``async_pose_publisher`` is scheduled.
            failure_coordinator: Optional object providing
                ``create_chunk_on_tracking_loss`` and ``process_unanchored_chunks``.
            result_manager: Optional object providing ``store_result``.
            camera_params: Passed through unchanged to the VO front-end.
        """
        self.vo = vo_frontend
        self.optimizer = factor_graph
        self.cvgl = cvgl_backend

        self.async_pose_publisher = async_pose_publisher
        self.event_loop = event_loop
        self.failure_coordinator = failure_coordinator
        self.result_manager = result_manager
        self.camera_params = camera_params

        self.image_queue = Queue()
        self.is_running = False
        self.processing_thread = None
        self.recovery_thread = None

        # State Machine & Flight Data
        self.active_flight_id = None
        self.current_chunk_id = "chunk_0"
        self.chunk_counter = 0
        self.last_frame_id = -1
        self.last_image = None
        self.unanchored_chunks = set()
        # NOTE(review): nothing in this class evicts entries from this cache.
        self.chunk_image_cache = {}

        self.perf_monitor = PerformanceMonitor(ac7_limit_s=self.FRAME_BUDGET_S)

        # External Index for CVGL Back-End
        self.satellite_index = None

    def set_satellite_index(self, index):
        """Sets the Faiss Index containing local satellite tiles."""
        self.satellite_index = index

    def start_processing(self, flight_id: str):
        """
        Starts the main processing loop in a background thread.

        Also starts the chunk-recovery thread. Does nothing except log a
        warning when the engine is already running.
        """
        if self.is_running:
            logger.warning("Engine is already running.")
            return

        self.active_flight_id = flight_id
        self.is_running = True
        self.processing_thread = threading.Thread(target=self._run_processing_loop, daemon=True)
        self.processing_thread.start()

        self.recovery_thread = threading.Thread(target=self._chunk_recovery_loop, daemon=True)
        self.recovery_thread.start()
        logger.info(f"Started processing loop for flight {self.active_flight_id}")

    def stop_processing(self):
        """
        Stops the processing loop gracefully.

        Clears ``is_running`` and blocks until both worker threads have
        observed the flag and exited.
        """
        self.is_running = False
        current = threading.current_thread()
        for worker in (self.processing_thread, self.recovery_thread):
            # Joining the calling thread raises RuntimeError, so skip it when
            # stop is requested from inside one of the workers.
            if worker is not None and worker is not current:
                worker.join()
        logger.info("Flight Processing Engine stopped.")

    def add_image(self, frame_id: int, image: np.ndarray):
        """Ingests an image into the processing queue."""
        self.image_queue.put((frame_id, image))

    def _run_processing_loop(self):
        """
        The core continuous loop running in a background thread.

        Pulls ``(frame_id, image)`` pairs from the queue and processes them one
        by one until ``is_running`` is cleared. A failure on one frame is logged
        and the loop moves on to the next frame.
        """
        while self.is_running:
            try:
                # Bounded wait so the loop can notice a stop request.
                frame_id, image = self.image_queue.get(timeout=self.QUEUE_POLL_TIMEOUT_S)

                with self.perf_monitor.measure(
                    f"frame_{frame_id}_total", limit_ms=self.FRAME_BUDGET_S * 1000.0
                ):
                    self._process_single_frame(frame_id, image)

            except Empty:
                continue
            except Exception as e:
                # Top-level boundary of the worker thread: keep the loop alive,
                # but record the traceback so the failure can be diagnosed.
                logger.exception(f"Critical error processing frame: {e}")

    def _begin_chunk_at(self, frame_id: int, image: np.ndarray):
        """
        Make ``frame_id`` the first frame of ``self.current_chunk_id``.

        Records the frame as the new VO reference (image and id together, so
        the next relative pose and the next relative factor refer to the same
        frame), creates the chunk subgraph and immediately tries to anchor it.
        """
        self.last_image = image
        self.last_frame_id = frame_id
        self.optimizer.create_chunk_subgraph(self.current_chunk_id, frame_id)
        self._attempt_global_anchoring(frame_id, image)

    def _process_single_frame(self, frame_id: int, image: np.ndarray):
        """
        Processes a single frame through the VO -> Graph -> CVGL pipeline.

        The very first frame, a frame on which tracking is lost, and a frame
        whose chunk is missing from the optimizer all start a chunk at that
        frame and return without adding a relative factor.
        """
        if self.last_image is None:
            self._begin_chunk_at(frame_id, image)
            return

        # 1. Front-End: Compute Unscaled Relative Pose (High Frequency)
        with self.perf_monitor.measure(f"frame_{frame_id}_vo_tracking"):
            rel_pose = self.vo.compute_relative_pose(self.last_image, image, self.camera_params)

        if not rel_pose or not rel_pose.tracking_good:
            logger.warning(f"Tracking lost at frame {frame_id}. Initiating new chunk.")
            # AC-4: Handle sharp turns by creating a disconnected map chunk
            if self.failure_coordinator and self.active_flight_id:
                chunk_handle = self.failure_coordinator.create_chunk_on_tracking_loss(
                    self.active_flight_id, frame_id
                )
                self.current_chunk_id = chunk_handle.chunk_id
                self.unanchored_chunks.add(self.current_chunk_id)
            else:
                # No coordinator: fall back to locally numbered chunk ids.
                self.chunk_counter += 1
                self.current_chunk_id = f"chunk_{self.chunk_counter}"
            self._begin_chunk_at(frame_id, image)
            return

        # Pack rotation and translation into a 4x4 homogeneous transform.
        transform = np.eye(4)
        transform[:3, :3] = rel_pose.rotation
        transform[:3, 3] = rel_pose.translation.flatten()

        # 2. Factor Graph: Initialize Chunk or Add Relative Factor
        if self.last_frame_id == -1 or self.current_chunk_id not in self.optimizer.chunks:
            # The reference image is refreshed together with the frame id here;
            # otherwise the next relative pose would be measured against an
            # older image than the frame the factor is attached to.
            self._begin_chunk_at(frame_id, image)
            return

        self.optimizer.add_relative_factor_to_chunk(
            self.current_chunk_id, self.last_frame_id, frame_id, transform
        )

        # Cache images for unanchored chunks to build sequence descriptors
        if self.current_chunk_id in self.unanchored_chunks:
            self.chunk_image_cache.setdefault(self.current_chunk_id, []).append(image)

        # 3. Optimize and Stream Immediate Unscaled Pose (< 5s | AC-7)
        opt_success, results = self.optimizer.optimize_chunk(self.current_chunk_id)
        if opt_success and frame_id in results:
            self._publish_result(frame_id, results[frame_id], is_refined=False)

        # 4. Back-End: Global Anchoring (Low Frequency / Periodic)
        # The heavy global search only runs on every Nth frame id to save compute.
        if frame_id % self.ANCHOR_INTERVAL_FRAMES == 0:
            self._attempt_global_anchoring(frame_id, image)

        self.last_frame_id = frame_id
        self.last_image = image

    def _attempt_global_anchoring(self, frame_id: int, image: np.ndarray):
        """
        Queries the CVGL Back-End for an absolute metric GPS anchor.

        Does nothing when no satellite index has been set. On a match, adds the
        anchor to the current chunk, re-optimizes it and publishes every pose
        of the chunk as a refined result.
        """
        # Explicit None check: the index object's own truthiness says nothing
        # about whether an index was configured.
        if self.satellite_index is None:
            return

        with self.perf_monitor.measure(f"frame_{frame_id}_cvgl_anchoring"):
            found, _h_transform, sat_info = self.cvgl.retrieve_and_match(
                image, self.satellite_index
            )

        if not (found and sat_info):
            return

        logger.info(f"Global metric anchor found for frame {frame_id}!")

        # Pass hard constraint to Factor Graph Optimizer
        # Note: sat_info should ideally contain the absolute metric X, Y, Z translation
        # NOTE(review): a missing 'lat'/'lon' key silently anchors at 0.0 — confirm intended.
        anchor_gps = np.array(
            [sat_info.get('lat', 0.0), sat_info.get('lon', 0.0), self.DEFAULT_ALTITUDE_M]
        )
        self.optimizer.add_chunk_anchor(self.current_chunk_id, frame_id, anchor_gps)

        # Re-optimize. The graph will resolve scale drift.
        opt_success, results = self.optimizer.optimize_chunk(self.current_chunk_id)
        if opt_success:
            # Stream asynchronous Refined Poses (AC-8)
            for fid, pose_matrix in results.items():
                self._publish_result(fid, pose_matrix, is_refined=True)

    def _publish_result(self, frame_id: int, pose_matrix: np.ndarray, is_refined: bool):
        """
        Safely pushes the pose event to the async SSE stream.

        Converts the pose translation to a lat/lon pair with the mock
        conversion, stores a ``ResultData`` through the result manager (when
        one is configured and a flight is active) and schedules the async
        publisher on the event loop (when both are configured). A storage
        failure is logged and does not prevent publishing.

        Args:
            frame_id: Frame the pose belongs to.
            pose_matrix: Matrix indexed as ``[0, 3]`` and ``[1, 3]`` for the
                two translation components used here.
            is_refined: True for poses produced after a global anchor; selects
                the confidence, source and refinement reason of the result.
        """
        # Simplified ENU to Lat/Lon mock logic for demonstration
        lat = self.MOCK_ORIGIN_LAT + pose_matrix[0, 3] * self.MOCK_DEG_PER_UNIT
        lon = self.MOCK_ORIGIN_LON + pose_matrix[1, 3] * self.MOCK_DEG_PER_UNIT
        confidence = 0.9 if is_refined else 0.5

        if self.result_manager and self.active_flight_id:
            try:
                res = ResultData(
                    flight_id=self.active_flight_id,
                    image_id=f"AD{frame_id:06d}.jpg",
                    sequence_number=frame_id,
                    estimated_gps=GPSPoint(lat=lat, lon=lon, altitude_m=self.DEFAULT_ALTITUDE_M),
                    confidence=confidence,
                    source="factor_graph" if is_refined else "vo_frontend",
                    refinement_reason="Global Anchor Merge" if is_refined else None
                )
                self.result_manager.store_result(res)
            except Exception as e:
                logger.error(f"Failed to store result for frame {frame_id}: {e}")

        if self.async_pose_publisher and self.event_loop:
            # This runs on a worker thread, so the coroutine is handed to the
            # loop thread-safely instead of being awaited here.
            future = asyncio.run_coroutine_threadsafe(
                self.async_pose_publisher(frame_id, lat, lon, confidence, is_refined),
                self.event_loop
            )
            # Without a callback an exception raised by the publisher would be
            # stored on the discarded future and never surface.
            future.add_done_callback(self._log_publish_failure)

    @staticmethod
    def _log_publish_failure(future):
        """Log the exception, if any, of a finished publisher future."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("Async pose publisher failed: %r", exc)

    def _chunk_recovery_loop(self):
        """
        Background task to asynchronously match and merge unanchored chunks.

        While the engine is running, asks the failure coordinator (when one is
        configured and a flight is active) to process unanchored chunks, then
        sleeps before the next pass. A failing pass is logged and retried on
        the next iteration instead of terminating the thread.
        """
        while self.is_running:
            if self.failure_coordinator and self.active_flight_id:
                try:
                    self.failure_coordinator.process_unanchored_chunks(self.active_flight_id)
                except Exception:
                    logger.exception("Chunk recovery pass failed; retrying on next pass.")
            time.sleep(self.RECOVERY_INTERVAL_S)