Files
gps-denied-onboard/f02_2_flight_processing_engine.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

319 lines
14 KiB
Python

import logging
import threading
import time
from typing import Optional, Any, Dict
import numpy as np
from pydantic import BaseModel
from abc import ABC, abstractmethod
from f02_1_flight_lifecycle_manager import UserFixRequest, GPSPoint
logger = logging.getLogger(__name__)
# --- Data Models ---
class FrameResult(BaseModel):
frame_id: int
success: bool
pose: Optional[Any] = None
image: Optional[np.ndarray] = None
model_config = {"arbitrary_types_allowed": True}
class UserFixResult(BaseModel):
status: str
message: str
class RecoveryStatus:
FOUND = "FOUND"
FAILED = "FAILED"
BLOCKED = "BLOCKED"
class ChunkHandle(BaseModel):
chunk_id: str
# --- Interface ---
class IFlightProcessingEngine(ABC):
@abstractmethod
def start_processing(self, flight_id: str) -> None: pass
@abstractmethod
def stop_processing(self, flight_id: str) -> None: pass
@abstractmethod
def process_frame(self, flight_id: str, frame_id: int, image: np.ndarray) -> FrameResult: pass
@abstractmethod
def apply_user_fix(self, flight_id: str, fix_data: UserFixRequest) -> UserFixResult: pass
@abstractmethod
def handle_tracking_loss(self, flight_id: str, frame_id: int, image: np.ndarray) -> str: pass
@abstractmethod
def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]: pass
@abstractmethod
def create_new_chunk(self, flight_id: str, frame_id: int) -> ChunkHandle: pass
# --- Implementation ---
class FlightProcessingEngine(IFlightProcessingEngine):
"""
Core frame-by-frame processing orchestration running the main visual odometry pipeline.
Manages flight state machine and coordinates chunking and recovery logic.
"""
def __init__(self, f04=None, f05=None, f06=None, f07=None, f08=None, f09=None, f10=None, f11=None, f12=None, f13=None, f14=None, f15=None, f17=None):
self.f04 = f04
self.f05 = f05
self.f06 = f06
self.f07 = f07
self.f08 = f08
self.f09 = f09
self.f10 = f10
self.f11 = f11
self.f12 = f12
self.f13 = f13
self.f14 = f14
self.f15 = f15
self.f17 = f17
self._threads: Dict[str, threading.Thread] = {}
self._stop_events: Dict[str, threading.Event] = {}
self._flight_status: Dict[str, str] = {}
def _get_flight_status(self, flight_id: str) -> str:
return self._flight_status.get(flight_id, "CREATED")
def _update_flight_status(self, flight_id: str, status: str) -> bool:
current = self._get_flight_status(flight_id)
# State Machine Validation
if current == "COMPLETED" and status not in ["COMPLETED", "DELETED"]:
logger.warning(f"Invalid state transition attempted for {flight_id}: {current} -> {status}")
return False
self._flight_status[flight_id] = status
logger.info(f"Flight {flight_id} transitioned to state: {status}")
return True
def _is_processing_active(self, flight_id: str) -> bool:
if flight_id not in self._stop_events:
return False
return not self._stop_events[flight_id].is_set()
def _process_single_frame(self, flight_id: str, image_data: Any) -> FrameResult:
if hasattr(image_data, 'sequence'):
frame_id = image_data.sequence
image = image_data.image
else:
frame_id = image_data.get("frame_id", 0) if isinstance(image_data, dict) else 0
image = image_data.get("image") if isinstance(image_data, dict) else None
return self.process_frame(flight_id, frame_id, image)
def _check_tracking_status(self, vo_result: FrameResult) -> bool:
return vo_result.success
def start_processing(self, flight_id: str) -> None:
if flight_id in self._threads and self._threads[flight_id].is_alive():
return
self._stop_events[flight_id] = threading.Event()
self._update_flight_status(flight_id, "PROCESSING")
thread = threading.Thread(target=self._run_processing_loop, args=(flight_id,), daemon=True)
self._threads[flight_id] = thread
thread.start()
def stop_processing(self, flight_id: str) -> None:
if flight_id in self._stop_events:
self._stop_events[flight_id].set()
if flight_id in self._threads:
self._threads[flight_id].join(timeout=2.0)
def _run_processing_loop(self, flight_id: str):
while self._is_processing_active(flight_id):
try:
if self._get_flight_status(flight_id) == "BLOCKED":
time.sleep(0.1) # Wait for user fix
continue
# Decode queued byte streams to disk so they are available for processing
if hasattr(self.f05, 'process_next_batch'):
self.f05.process_next_batch(flight_id)
# 1. Fetch next image
image_data = self.f05.get_next_image(flight_id) if self.f05 else None
if not image_data:
time.sleep(0.5) # Wait for the UAV to upload the next batch
continue
# 2. Process Frame
result = self._process_single_frame(flight_id, image_data)
frame_id = result.frame_id
image = result.image
# 3. Check Tracking Status and Manage Lifecycle
if self._check_tracking_status(result):
# Do not attempt to add relative constraints on the very first initialization frame
if result.pose is not None:
self._add_frame_to_active_chunk(flight_id, frame_id, result)
else:
if not self.get_active_chunk(flight_id):
self.create_new_chunk(flight_id, frame_id)
chunk = self.get_active_chunk(flight_id)
# Flow 4: Normal Frame Processing
if self.f04 and self.f09 and self.f13 and self.f10 and chunk:
traj = self.f10.get_chunk_trajectory(flight_id, chunk.chunk_id)
last_pose = traj.get(frame_id - 1)
if last_pose:
est_gps = self.f13.enu_to_gps(flight_id, tuple(last_pose.position))
tile = self.f04.fetch_tile(est_gps.lat, est_gps.lon, 18)
bounds = self.f04.compute_tile_bounds(self.f04.compute_tile_coords(est_gps.lat, est_gps.lon, 18))
align_res = self.f09.align_to_satellite(image, tile, bounds)
if align_res and align_res.matched:
self.f10.add_absolute_factor(flight_id, frame_id, align_res.gps_center, np.eye(3), False)
if self.f06: self.f06.update_heading(flight_id, frame_id, 0.0, datetime.utcnow())
self.f10.optimize_chunk(flight_id, chunk.chunk_id, 5)
traj = self.f10.get_chunk_trajectory(flight_id, chunk.chunk_id)
curr_pose = traj.get(frame_id)
if curr_pose and self.f14:
curr_gps = self.f13.enu_to_gps(flight_id, tuple(curr_pose.position))
from f14_result_manager import FrameResult as F14Result
from datetime import datetime
fr = F14Result(frame_id=frame_id, gps_center=curr_gps, altitude=400.0, heading=0.0, confidence=0.8, timestamp=datetime.utcnow())
self.f14.update_frame_result(flight_id, frame_id, fr)
else:
# Detect Chunk Boundary and trigger proactive chunk creation
if self._detect_chunk_boundary(flight_id, frame_id, tracking_status=False):
self._create_chunk_on_tracking_loss(flight_id, frame_id)
# Escalate to Recovery
recovery_status = self.handle_tracking_loss(flight_id, frame_id, image)
if recovery_status == RecoveryStatus.BLOCKED:
self._update_flight_status(flight_id, "BLOCKED")
except Exception as e:
logger.error(f"Critical error in processing loop: {e}", exc_info=True)
time.sleep(1.0) # Prevent tight spinning loop if DB goes down
# --- Core Pipeline Operations ---
def process_frame(self, flight_id: str, frame_id: int, image: np.ndarray) -> FrameResult:
success = False
pose = None
if self.f06 and self.f06.requires_rotation_sweep(flight_id):
if self.f04 and self.f13:
try:
origin = self.f13.get_enu_origin(flight_id)
tile = self.f04.fetch_tile(origin.lat, origin.lon, 18)
bounds = self.f04.compute_tile_bounds(self.f04.compute_tile_coords(origin.lat, origin.lon, 18))
if tile is not None and self.f09:
from datetime import datetime
self.f06.try_rotation_steps(flight_id, frame_id, image, tile, bounds, datetime.utcnow(), self.f09)
except Exception:
pass
if self.f07 and hasattr(self.f07, 'last_image') and self.f07.last_image is not None:
pose = self.f07.compute_relative_pose(self.f07.last_image, image)
if pose and pose.tracking_good:
success = True
elif self.f07:
# First frame initialization is implicitly successful
success = True
if self.f07:
self.f07.last_image = image
return FrameResult(frame_id=frame_id, success=success, pose=pose, image=image)
# --- Tracking Loss Recovery (Feature 02.2.02) ---
def _run_progressive_search(self, flight_id: str, frame_id: int, image: np.ndarray) -> str:
if not self.f13 or not self.f10 or not self.f11: return RecoveryStatus.FAILED
traj = self.f10.get_trajectory(flight_id)
last_pose = traj.get(frame_id - 1)
est_gps = self.f13.enu_to_gps(flight_id, tuple(last_pose.position)) if last_pose else GPSPoint(lat=48.0, lon=37.0)
session = self.f11.start_search(flight_id, frame_id, est_gps)
for _ in range(5):
tile_coords = self.f11.expand_search_radius(session)
tiles_dict = {}
for tc in tile_coords:
tile_img = self.f04.fetch_tile(est_gps.lat, est_gps.lon, tc.zoom) if self.f04 else np.zeros((256,256,3))
bounds = self.f04.compute_tile_bounds(tc) if self.f04 else None
tiles_dict[f"{tc.x}_{tc.y}"] = (tile_img, bounds)
if self.f11.try_current_grid(session, tiles_dict, image):
return RecoveryStatus.FOUND
return RecoveryStatus.FAILED
def _request_user_input(self, flight_id: str, frame_id: int, request: Any):
if self.f15:
self.f15.send_user_input_request(flight_id, request)
def handle_tracking_loss(self, flight_id: str, frame_id: int, image: np.ndarray) -> str:
if not self.f11:
return RecoveryStatus.FAILED
status = self._run_progressive_search(flight_id, frame_id, image)
if status == RecoveryStatus.FOUND:
return status
req = self.f11.create_user_input_request(flight_id, frame_id, image, [])
self._request_user_input(flight_id, frame_id, req)
return RecoveryStatus.BLOCKED
def _validate_user_fix(self, fix_data: UserFixRequest) -> bool:
return not (fix_data.uav_pixel[0] < 0 or fix_data.uav_pixel[1] < 0)
def _apply_fix_and_resume(self, flight_id: str, fix_data: UserFixRequest) -> UserFixResult:
if self.f11 and self.f11.apply_user_anchor(flight_id, fix_data):
self._update_flight_status(flight_id, "PROCESSING")
return UserFixResult(status="success", message="Processing resumed")
return UserFixResult(status="error", message="Failed to apply fix via F11")
def apply_user_fix(self, flight_id: str, fix_data: UserFixRequest) -> UserFixResult:
if self._get_flight_status(flight_id) != "BLOCKED":
return UserFixResult(status="error", message="Flight not in blocked state")
if not self._validate_user_fix(fix_data):
return UserFixResult(status="error", message="Invalid pixel coordinates")
return self._apply_fix_and_resume(flight_id, fix_data)
def _add_frame_to_active_chunk(self, flight_id: str, frame_id: int, frame_result: FrameResult):
if self.f12:
chunk = self.f12.get_active_chunk(flight_id)
if chunk:
self.f12.add_frame_to_chunk(chunk.chunk_id, frame_id, frame_result.pose)
# --- Chunk Lifecycle Orchestration (Feature 02.2.03) ---
def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]:
if self.f12:
return self.f12.get_active_chunk(flight_id)
return None
def create_new_chunk(self, flight_id: str, frame_id: int) -> ChunkHandle:
if self.f12:
return self.f12.create_chunk(flight_id, frame_id)
return ChunkHandle(chunk_id=f"chunk_{frame_id}")
def _detect_chunk_boundary(self, flight_id: str, frame_id: int, tracking_status: bool) -> bool:
# Chunk boundaries occur on tracking loss
return not tracking_status
def _should_create_chunk_on_tracking_loss(self, flight_id: str) -> bool:
return True
def _create_chunk_on_tracking_loss(self, flight_id: str, frame_id: int) -> ChunkHandle:
logger.info(f"Proactive chunk creation at frame {frame_id} due to tracking loss.")
return self.create_new_chunk(flight_id, frame_id)