mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 10:06:37 +00:00
319 lines
14 KiB
Python
319 lines
14 KiB
Python
import logging
|
|
import threading
|
|
import time
|
|
from typing import Optional, Any, Dict
|
|
import numpy as np
|
|
from pydantic import BaseModel
|
|
from abc import ABC, abstractmethod
|
|
|
|
from f02_1_flight_lifecycle_manager import UserFixRequest, GPSPoint
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Data Models ---
|
|
|
|
class FrameResult(BaseModel):
|
|
frame_id: int
|
|
success: bool
|
|
pose: Optional[Any] = None
|
|
image: Optional[np.ndarray] = None
|
|
model_config = {"arbitrary_types_allowed": True}
|
|
|
|
class UserFixResult(BaseModel):
|
|
status: str
|
|
message: str
|
|
|
|
class RecoveryStatus:
|
|
FOUND = "FOUND"
|
|
FAILED = "FAILED"
|
|
BLOCKED = "BLOCKED"
|
|
|
|
class ChunkHandle(BaseModel):
|
|
chunk_id: str
|
|
|
|
# --- Interface ---
|
|
|
|
class IFlightProcessingEngine(ABC):
|
|
@abstractmethod
|
|
def start_processing(self, flight_id: str) -> None: pass
|
|
|
|
@abstractmethod
|
|
def stop_processing(self, flight_id: str) -> None: pass
|
|
|
|
@abstractmethod
|
|
def process_frame(self, flight_id: str, frame_id: int, image: np.ndarray) -> FrameResult: pass
|
|
|
|
@abstractmethod
|
|
def apply_user_fix(self, flight_id: str, fix_data: UserFixRequest) -> UserFixResult: pass
|
|
|
|
@abstractmethod
|
|
def handle_tracking_loss(self, flight_id: str, frame_id: int, image: np.ndarray) -> str: pass
|
|
|
|
@abstractmethod
|
|
def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]: pass
|
|
|
|
@abstractmethod
|
|
def create_new_chunk(self, flight_id: str, frame_id: int) -> ChunkHandle: pass
|
|
|
|
|
|
# --- Implementation ---
|
|
|
|
class FlightProcessingEngine(IFlightProcessingEngine):
|
|
"""
|
|
Core frame-by-frame processing orchestration running the main visual odometry pipeline.
|
|
Manages flight state machine and coordinates chunking and recovery logic.
|
|
"""
|
|
def __init__(self, f04=None, f05=None, f06=None, f07=None, f08=None, f09=None, f10=None, f11=None, f12=None, f13=None, f14=None, f15=None, f17=None):
|
|
self.f04 = f04
|
|
self.f05 = f05
|
|
self.f06 = f06
|
|
self.f07 = f07
|
|
self.f08 = f08
|
|
self.f09 = f09
|
|
self.f10 = f10
|
|
self.f11 = f11
|
|
self.f12 = f12
|
|
self.f13 = f13
|
|
self.f14 = f14
|
|
self.f15 = f15
|
|
self.f17 = f17
|
|
|
|
self._threads: Dict[str, threading.Thread] = {}
|
|
self._stop_events: Dict[str, threading.Event] = {}
|
|
self._flight_status: Dict[str, str] = {}
|
|
|
|
def _get_flight_status(self, flight_id: str) -> str:
|
|
return self._flight_status.get(flight_id, "CREATED")
|
|
|
|
def _update_flight_status(self, flight_id: str, status: str) -> bool:
|
|
current = self._get_flight_status(flight_id)
|
|
|
|
# State Machine Validation
|
|
if current == "COMPLETED" and status not in ["COMPLETED", "DELETED"]:
|
|
logger.warning(f"Invalid state transition attempted for {flight_id}: {current} -> {status}")
|
|
return False
|
|
|
|
self._flight_status[flight_id] = status
|
|
logger.info(f"Flight {flight_id} transitioned to state: {status}")
|
|
return True
|
|
|
|
def _is_processing_active(self, flight_id: str) -> bool:
|
|
if flight_id not in self._stop_events:
|
|
return False
|
|
return not self._stop_events[flight_id].is_set()
|
|
|
|
def _process_single_frame(self, flight_id: str, image_data: Any) -> FrameResult:
|
|
if hasattr(image_data, 'sequence'):
|
|
frame_id = image_data.sequence
|
|
image = image_data.image
|
|
else:
|
|
frame_id = image_data.get("frame_id", 0) if isinstance(image_data, dict) else 0
|
|
image = image_data.get("image") if isinstance(image_data, dict) else None
|
|
return self.process_frame(flight_id, frame_id, image)
|
|
|
|
def _check_tracking_status(self, vo_result: FrameResult) -> bool:
|
|
return vo_result.success
|
|
|
|
def start_processing(self, flight_id: str) -> None:
|
|
if flight_id in self._threads and self._threads[flight_id].is_alive():
|
|
return
|
|
|
|
self._stop_events[flight_id] = threading.Event()
|
|
self._update_flight_status(flight_id, "PROCESSING")
|
|
|
|
thread = threading.Thread(target=self._run_processing_loop, args=(flight_id,), daemon=True)
|
|
self._threads[flight_id] = thread
|
|
thread.start()
|
|
|
|
def stop_processing(self, flight_id: str) -> None:
|
|
if flight_id in self._stop_events:
|
|
self._stop_events[flight_id].set()
|
|
if flight_id in self._threads:
|
|
self._threads[flight_id].join(timeout=2.0)
|
|
|
|
def _run_processing_loop(self, flight_id: str):
|
|
while self._is_processing_active(flight_id):
|
|
try:
|
|
if self._get_flight_status(flight_id) == "BLOCKED":
|
|
time.sleep(0.1) # Wait for user fix
|
|
continue
|
|
|
|
# Decode queued byte streams to disk so they are available for processing
|
|
if hasattr(self.f05, 'process_next_batch'):
|
|
self.f05.process_next_batch(flight_id)
|
|
|
|
# 1. Fetch next image
|
|
image_data = self.f05.get_next_image(flight_id) if self.f05 else None
|
|
if not image_data:
|
|
time.sleep(0.5) # Wait for the UAV to upload the next batch
|
|
continue
|
|
|
|
# 2. Process Frame
|
|
result = self._process_single_frame(flight_id, image_data)
|
|
frame_id = result.frame_id
|
|
image = result.image
|
|
|
|
# 3. Check Tracking Status and Manage Lifecycle
|
|
if self._check_tracking_status(result):
|
|
# Do not attempt to add relative constraints on the very first initialization frame
|
|
if result.pose is not None:
|
|
self._add_frame_to_active_chunk(flight_id, frame_id, result)
|
|
else:
|
|
if not self.get_active_chunk(flight_id):
|
|
self.create_new_chunk(flight_id, frame_id)
|
|
|
|
chunk = self.get_active_chunk(flight_id)
|
|
|
|
# Flow 4: Normal Frame Processing
|
|
if self.f04 and self.f09 and self.f13 and self.f10 and chunk:
|
|
traj = self.f10.get_chunk_trajectory(flight_id, chunk.chunk_id)
|
|
last_pose = traj.get(frame_id - 1)
|
|
if last_pose:
|
|
est_gps = self.f13.enu_to_gps(flight_id, tuple(last_pose.position))
|
|
tile = self.f04.fetch_tile(est_gps.lat, est_gps.lon, 18)
|
|
bounds = self.f04.compute_tile_bounds(self.f04.compute_tile_coords(est_gps.lat, est_gps.lon, 18))
|
|
|
|
align_res = self.f09.align_to_satellite(image, tile, bounds)
|
|
if align_res and align_res.matched:
|
|
self.f10.add_absolute_factor(flight_id, frame_id, align_res.gps_center, np.eye(3), False)
|
|
if self.f06: self.f06.update_heading(flight_id, frame_id, 0.0, datetime.utcnow())
|
|
|
|
self.f10.optimize_chunk(flight_id, chunk.chunk_id, 5)
|
|
|
|
traj = self.f10.get_chunk_trajectory(flight_id, chunk.chunk_id)
|
|
curr_pose = traj.get(frame_id)
|
|
if curr_pose and self.f14:
|
|
curr_gps = self.f13.enu_to_gps(flight_id, tuple(curr_pose.position))
|
|
from f14_result_manager import FrameResult as F14Result
|
|
from datetime import datetime
|
|
fr = F14Result(frame_id=frame_id, gps_center=curr_gps, altitude=400.0, heading=0.0, confidence=0.8, timestamp=datetime.utcnow())
|
|
self.f14.update_frame_result(flight_id, frame_id, fr)
|
|
else:
|
|
# Detect Chunk Boundary and trigger proactive chunk creation
|
|
if self._detect_chunk_boundary(flight_id, frame_id, tracking_status=False):
|
|
self._create_chunk_on_tracking_loss(flight_id, frame_id)
|
|
|
|
# Escalate to Recovery
|
|
recovery_status = self.handle_tracking_loss(flight_id, frame_id, image)
|
|
if recovery_status == RecoveryStatus.BLOCKED:
|
|
self._update_flight_status(flight_id, "BLOCKED")
|
|
except Exception as e:
|
|
logger.error(f"Critical error in processing loop: {e}", exc_info=True)
|
|
time.sleep(1.0) # Prevent tight spinning loop if DB goes down
|
|
|
|
# --- Core Pipeline Operations ---
|
|
|
|
def process_frame(self, flight_id: str, frame_id: int, image: np.ndarray) -> FrameResult:
|
|
success = False
|
|
pose = None
|
|
|
|
if self.f06 and self.f06.requires_rotation_sweep(flight_id):
|
|
if self.f04 and self.f13:
|
|
try:
|
|
origin = self.f13.get_enu_origin(flight_id)
|
|
tile = self.f04.fetch_tile(origin.lat, origin.lon, 18)
|
|
bounds = self.f04.compute_tile_bounds(self.f04.compute_tile_coords(origin.lat, origin.lon, 18))
|
|
if tile is not None and self.f09:
|
|
from datetime import datetime
|
|
self.f06.try_rotation_steps(flight_id, frame_id, image, tile, bounds, datetime.utcnow(), self.f09)
|
|
except Exception:
|
|
pass
|
|
|
|
if self.f07 and hasattr(self.f07, 'last_image') and self.f07.last_image is not None:
|
|
pose = self.f07.compute_relative_pose(self.f07.last_image, image)
|
|
if pose and pose.tracking_good:
|
|
success = True
|
|
elif self.f07:
|
|
# First frame initialization is implicitly successful
|
|
success = True
|
|
|
|
if self.f07:
|
|
self.f07.last_image = image
|
|
|
|
return FrameResult(frame_id=frame_id, success=success, pose=pose, image=image)
|
|
|
|
# --- Tracking Loss Recovery (Feature 02.2.02) ---
|
|
|
|
def _run_progressive_search(self, flight_id: str, frame_id: int, image: np.ndarray) -> str:
|
|
if not self.f13 or not self.f10 or not self.f11: return RecoveryStatus.FAILED
|
|
|
|
traj = self.f10.get_trajectory(flight_id)
|
|
last_pose = traj.get(frame_id - 1)
|
|
est_gps = self.f13.enu_to_gps(flight_id, tuple(last_pose.position)) if last_pose else GPSPoint(lat=48.0, lon=37.0)
|
|
|
|
session = self.f11.start_search(flight_id, frame_id, est_gps)
|
|
|
|
for _ in range(5):
|
|
tile_coords = self.f11.expand_search_radius(session)
|
|
tiles_dict = {}
|
|
for tc in tile_coords:
|
|
tile_img = self.f04.fetch_tile(est_gps.lat, est_gps.lon, tc.zoom) if self.f04 else np.zeros((256,256,3))
|
|
bounds = self.f04.compute_tile_bounds(tc) if self.f04 else None
|
|
tiles_dict[f"{tc.x}_{tc.y}"] = (tile_img, bounds)
|
|
|
|
if self.f11.try_current_grid(session, tiles_dict, image):
|
|
return RecoveryStatus.FOUND
|
|
return RecoveryStatus.FAILED
|
|
|
|
def _request_user_input(self, flight_id: str, frame_id: int, request: Any):
|
|
if self.f15:
|
|
self.f15.send_user_input_request(flight_id, request)
|
|
|
|
def handle_tracking_loss(self, flight_id: str, frame_id: int, image: np.ndarray) -> str:
|
|
if not self.f11:
|
|
return RecoveryStatus.FAILED
|
|
|
|
status = self._run_progressive_search(flight_id, frame_id, image)
|
|
if status == RecoveryStatus.FOUND:
|
|
return status
|
|
|
|
req = self.f11.create_user_input_request(flight_id, frame_id, image, [])
|
|
self._request_user_input(flight_id, frame_id, req)
|
|
return RecoveryStatus.BLOCKED
|
|
|
|
def _validate_user_fix(self, fix_data: UserFixRequest) -> bool:
|
|
return not (fix_data.uav_pixel[0] < 0 or fix_data.uav_pixel[1] < 0)
|
|
|
|
def _apply_fix_and_resume(self, flight_id: str, fix_data: UserFixRequest) -> UserFixResult:
|
|
if self.f11 and self.f11.apply_user_anchor(flight_id, fix_data):
|
|
self._update_flight_status(flight_id, "PROCESSING")
|
|
return UserFixResult(status="success", message="Processing resumed")
|
|
return UserFixResult(status="error", message="Failed to apply fix via F11")
|
|
|
|
def apply_user_fix(self, flight_id: str, fix_data: UserFixRequest) -> UserFixResult:
|
|
if self._get_flight_status(flight_id) != "BLOCKED":
|
|
return UserFixResult(status="error", message="Flight not in blocked state")
|
|
|
|
if not self._validate_user_fix(fix_data):
|
|
return UserFixResult(status="error", message="Invalid pixel coordinates")
|
|
|
|
return self._apply_fix_and_resume(flight_id, fix_data)
|
|
|
|
def _add_frame_to_active_chunk(self, flight_id: str, frame_id: int, frame_result: FrameResult):
|
|
if self.f12:
|
|
chunk = self.f12.get_active_chunk(flight_id)
|
|
if chunk:
|
|
self.f12.add_frame_to_chunk(chunk.chunk_id, frame_id, frame_result.pose)
|
|
|
|
# --- Chunk Lifecycle Orchestration (Feature 02.2.03) ---
|
|
|
|
def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]:
|
|
if self.f12:
|
|
return self.f12.get_active_chunk(flight_id)
|
|
return None
|
|
|
|
def create_new_chunk(self, flight_id: str, frame_id: int) -> ChunkHandle:
|
|
if self.f12:
|
|
return self.f12.create_chunk(flight_id, frame_id)
|
|
return ChunkHandle(chunk_id=f"chunk_{frame_id}")
|
|
|
|
def _detect_chunk_boundary(self, flight_id: str, frame_id: int, tracking_status: bool) -> bool:
|
|
# Chunk boundaries occur on tracking loss
|
|
return not tracking_status
|
|
|
|
def _should_create_chunk_on_tracking_loss(self, flight_id: str) -> bool:
|
|
return True
|
|
|
|
def _create_chunk_on_tracking_loss(self, flight_id: str, frame_id: int) -> ChunkHandle:
|
|
logger.info(f"Proactive chunk creation at frame {frame_id} due to tracking loss.")
|
|
return self.create_new_chunk(flight_id, frame_id) |