Files
gps-denied-onboard/f01_flight_api.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

452 lines
16 KiB
Python

import logging
from datetime import datetime
from typing import List, Optional, Tuple, Dict, Any
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Form, Request
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
# Import core data models
from f02_1_flight_lifecycle_manager import (
FlightLifecycleManager,
GPSPoint,
CameraParameters,
Waypoint,
UserFixRequest,
FlightState
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/flights", tags=["Flight Management"])
# --- Dependency Injection ---
def get_lifecycle_manager() -> FlightLifecycleManager:
"""
Dependency placeholder for the Flight Lifecycle Manager.
This will be overridden in main.py during app startup.
"""
raise NotImplementedError("FlightLifecycleManager dependency not overridden.")
def get_flight_database():
"""Dependency for direct DB access if bypassed by manager for simple CRUD."""
raise NotImplementedError("FlightDatabase dependency not overridden.")
# --- API Data Models ---
class Polygon(BaseModel):
north_west: GPSPoint
south_east: GPSPoint
class Geofences(BaseModel):
polygons: List[Polygon] = []
class FlightCreateRequest(BaseModel):
name: str
description: str = ""
start_gps: GPSPoint
rough_waypoints: List[GPSPoint] = []
geofences: Geofences = Geofences()
camera_params: CameraParameters
altitude: float
class FlightResponse(BaseModel):
flight_id: str
status: str
message: Optional[str] = None
created_at: datetime
class FlightDetailResponse(BaseModel):
flight_id: str
name: str
description: str
start_gps: GPSPoint
waypoints: List[Waypoint]
camera_params: CameraParameters
altitude: float
status: str
frames_processed: int
frames_total: int
created_at: datetime
updated_at: datetime
class DeleteResponse(BaseModel):
deleted: bool
flight_id: str
class UpdateResponse(BaseModel):
updated: bool
waypoint_id: str
class BatchUpdateResponse(BaseModel):
success: bool
updated_count: int
failed_ids: List[str]
class BatchResponse(BaseModel):
accepted: bool
sequences: List[int] = []
next_expected: int = 0
message: Optional[str] = None
class UserFixResponse(BaseModel):
accepted: bool
processing_resumed: bool
message: Optional[str] = None
class ObjectToGPSRequest(BaseModel):
pixel_x: float
pixel_y: float
class ObjectGPSResponse(BaseModel):
gps: GPSPoint
accuracy_meters: float
frame_id: int
pixel: Tuple[float, float]
class FlightStatusResponse(BaseModel):
status: str
frames_processed: int
frames_total: int
has_active_engine: bool
class ResultResponse(BaseModel):
image_id: str
sequence_number: int
estimated_gps: GPSPoint
confidence: float
source: str
class CandidateTile(BaseModel):
tile_id: str
image_url: str
center_gps: GPSPoint
class FrameContextResponse(BaseModel):
frame_id: int
uav_image_url: str
satellite_candidates: List[CandidateTile]
# --- Internal Validation & Builder Methods (Feature 01.01) ---
def _validate_gps_coordinates(lat: float, lon: float) -> bool:
"""Validate GPS coordinate ranges."""
return -90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0
def _validate_camera_params(params: CameraParameters) -> bool:
"""Validate camera parameter values."""
if params.focal_length_mm <= 0 or params.sensor_width_mm <= 0:
return False
if "width" not in params.resolution or "height" not in params.resolution:
return False
return True
def _validate_geofences(geofences: Geofences) -> bool:
"""Validate geofence polygon data."""
for poly in geofences.polygons:
if not _validate_gps_coordinates(poly.north_west.lat, poly.north_west.lon):
return False
if not _validate_gps_coordinates(poly.south_east.lat, poly.south_east.lon):
return False
return True
def _build_flight_response(flight_id: str, status: str, message: str) -> FlightResponse:
"""Build response from F02 result."""
return FlightResponse(flight_id=flight_id, status=status, message=message, created_at=datetime.utcnow())
def _build_status_response(state: FlightState) -> FlightStatusResponse:
"""Build status response."""
return FlightStatusResponse(
status=state.state,
frames_processed=state.processed_images,
frames_total=state.total_images,
has_active_engine=state.has_active_engine
)
# --- Internal Validation & Builder Methods (Feature 01.02) ---
def _validate_batch_size(images: List[UploadFile]) -> bool:
"""Validate batch contains 10-50 images."""
return 10 <= len(images) <= 50
def _validate_sequence_numbers(start_seq: int, end_seq: int, count: int) -> bool:
"""Validate start/end sequence are valid."""
if start_seq > end_seq: return False
if (end_seq - start_seq + 1) != count: return False
return True
def _validate_image_format(content_type: str, filename: str) -> bool:
"""Validate image file is valid JPEG/PNG."""
if content_type not in ["image/jpeg", "image/png"]: return False
if not any(filename.lower().endswith(ext) for ext in [".jpg", ".jpeg", ".png"]): return False
return True
def _build_batch_response(accepted: bool, start_seq: int, end_seq: int, message: str) -> BatchResponse:
"""Build response with accepted sequences."""
sequences = list(range(start_seq, end_seq + 1)) if accepted else []
next_expected = end_seq + 1 if accepted else start_seq
return BatchResponse(accepted=accepted, sequences=sequences, next_expected=next_expected, message=message)
# --- Endpoints ---
@router.get("", response_model=List[FlightResponse])
async def list_flights(
status: Optional[str] = None,
limit: int = 10,
db: Any = Depends(get_flight_database)
):
"""Retrieves a list of all flights matching the optional status filter."""
if not db:
raise HTTPException(status_code=500, detail="Database dependency missing.")
filters = {"state": status} if status else None
flights = db.query_flights(filters=filters, limit=limit)
return [
FlightResponse(
flight_id=f.flight_id,
status=f.state,
message="Retrieved successfully.",
created_at=f.created_at
) for f in flights
]
@router.post("", response_model=FlightResponse, status_code=201)
async def create_flight(
request: FlightCreateRequest,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""Creates a new flight, initializes its origin, and triggers pre-flight satellite tile prefetching."""
if not _validate_gps_coordinates(request.start_gps.lat, request.start_gps.lon):
raise HTTPException(status_code=400, detail="Invalid GPS coordinates.")
if not _validate_camera_params(request.camera_params):
raise HTTPException(status_code=400, detail="Invalid camera parameters.")
if not _validate_geofences(request.geofences):
raise HTTPException(status_code=400, detail="Invalid geofence coordinates.")
try:
flight_data = {
"flight_name": request.name,
"start_gps": request.start_gps.model_dump(),
"altitude_m": request.altitude,
"camera_params": request.camera_params.model_dump(),
"state": "prefetching"
}
flight_id = manager.create_flight(flight_data)
return _build_flight_response(flight_id, "prefetching", "Flight created. Satellite prefetching initiated asynchronously.")
except Exception as e:
logger.error(f"Flight creation failed: {e}")
raise HTTPException(status_code=500, detail="Internal server error during flight creation.")
@router.get("/{flight_id}", response_model=FlightDetailResponse)
async def get_flight(
flight_id: str,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager),
db: Any = Depends(get_flight_database)
):
"""Retrieves complete flight details including its waypoints and processing state."""
flight = manager.get_flight(flight_id)
if not flight:
raise HTTPException(status_code=404, detail="Flight not found.")
state = manager.get_flight_state(flight_id)
waypoints = db.get_waypoints(flight_id) if db else []
return FlightDetailResponse(
flight_id=flight.flight_id,
name=flight.flight_name,
description="", # Simplified for payload
start_gps=flight.start_gps,
waypoints=waypoints,
camera_params=flight.camera_params,
altitude=flight.altitude_m,
status=state.state if state else flight.state,
frames_processed=state.processed_images if state else 0,
frames_total=state.total_images if state else 0,
created_at=flight.created_at,
updated_at=flight.updated_at
)
@router.delete("/{flight_id}", response_model=DeleteResponse)
async def delete_flight(
flight_id: str,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""Stops processing, purges cached tiles, and deletes the flight trajectory from the database."""
if manager.delete_flight(flight_id):
return DeleteResponse(deleted=True, flight_id=flight_id)
raise HTTPException(status_code=404, detail="Flight not found or could not be deleted.")
@router.put("/{flight_id}/waypoints/batch", response_model=BatchUpdateResponse)
async def batch_update_waypoints(
flight_id: str,
waypoints: List[Waypoint],
db: Any = Depends(get_flight_database)
):
"""Asynchronously batch-updates trajectory waypoints after factor graph convergence."""
if not db:
raise HTTPException(status_code=500, detail="Database dependency missing.")
result = db.batch_update_waypoints(flight_id, waypoints)
return BatchUpdateResponse(
success=len(result.failed_ids) == 0,
updated_count=result.updated_count,
failed_ids=result.failed_ids
)
@router.put("/{flight_id}/waypoints/{waypoint_id}", response_model=UpdateResponse)
async def update_waypoint(
flight_id: str,
waypoint_id: str,
waypoint: Waypoint,
db: Any = Depends(get_flight_database)
):
"""Updates a single waypoint (e.g., manual refinement)."""
if db and db.update_waypoint(flight_id, waypoint_id, waypoint):
return UpdateResponse(updated=True, waypoint_id=waypoint_id)
raise HTTPException(status_code=404, detail="Waypoint or Flight not found.")
@router.post("/{flight_id}/images/batch", response_model=BatchResponse, status_code=202)
async def upload_image_batch(
flight_id: str,
start_sequence: int = Form(...),
end_sequence: int = Form(...),
batch_number: int = Form(...),
images: List[UploadFile] = File(...),
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""Ingests a sequential batch of UAV images and pushes them onto the Flight Processing Engine queue."""
if not _validate_batch_size(images):
raise HTTPException(status_code=400, detail="Batch size must be between 10 and 50 images.")
if not _validate_sequence_numbers(start_sequence, end_sequence, len(images)):
raise HTTPException(status_code=400, detail="Invalid sequence numbers or gap detected.")
for img in images:
if not _validate_image_format(img.content_type, img.filename):
raise HTTPException(status_code=400, detail=f"Invalid image format for {img.filename}. Must be JPEG or PNG.")
from f05_image_input_pipeline import ImageBatch
# Load byte data securely
image_bytes = [await img.read() for img in images]
filenames = [img.filename for img in images]
total_size = sum(len(b) for b in image_bytes)
if total_size > 500 * 1024 * 1024: # 500MB batch limit
raise HTTPException(status_code=413, detail="Batch size exceeds 500MB limit.")
batch = ImageBatch(
images=image_bytes,
filenames=filenames,
start_sequence=start_sequence,
end_sequence=end_sequence,
batch_number=batch_number
)
if manager.queue_images(flight_id, batch):
return _build_batch_response(True, start_sequence, end_sequence, "Batch queued for processing.")
raise HTTPException(status_code=400, detail="Batch validation failed.")
@router.post("/{flight_id}/user-fix", response_model=UserFixResponse)
async def submit_user_fix(
flight_id: str,
fix_data: UserFixRequest,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""Provides a manual hard geodetic anchor when autonomous recovery fails (AC-6)."""
result = manager.handle_user_fix(flight_id, fix_data)
if result.get("status") == "success":
return UserFixResponse(accepted=True, processing_resumed=True, message=result.get("message"))
error_msg = result.get("message", "Fix rejected.")
if "not in blocked state" in error_msg.lower():
raise HTTPException(status_code=409, detail=error_msg)
if "not found" in error_msg.lower():
raise HTTPException(status_code=404, detail=error_msg)
raise HTTPException(status_code=400, detail=error_msg)
@router.get("/{flight_id}/status", response_model=FlightStatusResponse)
async def get_flight_status(
flight_id: str,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""Retrieves the real-time processing and pipeline state of the flight."""
state = manager.get_flight_state(flight_id)
if not state:
raise HTTPException(status_code=404, detail="Flight not found.")
return _build_status_response(state)
@router.get("/{flight_id}/results", response_model=List[ResultResponse])
async def get_flight_results(
flight_id: str,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""Retrieves computed flight results."""
results = manager.get_flight_results(flight_id)
if results is None:
raise HTTPException(status_code=404, detail="Flight not found.")
return results
@router.get("/{flight_id}/stream")
async def create_sse_stream(
flight_id: str,
request: Request,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""Opens a Server-Sent Events (SSE) stream for sub-millisecond, low-latency trajectory updates."""
if not manager.get_flight(flight_id):
raise HTTPException(status_code=404, detail="Flight not found.")
stream_generator = manager.create_client_stream(flight_id, client_id=request.client.host)
if not stream_generator:
raise HTTPException(status_code=500, detail="Failed to initialize telemetry stream.")
return EventSourceResponse(stream_generator)
@router.post("/{flight_id}/frames/{frame_id}/object-to-gps", response_model=ObjectGPSResponse)
async def convert_object_to_gps(
flight_id: str,
frame_id: int,
request: ObjectToGPSRequest,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""
Calculates the absolute GPS coordinate of an object selected by a user pixel click.
Utilizes Ray-Cloud intersection for high precision (AC-2/AC-10).
"""
if request.pixel_x < 0 or request.pixel_y < 0:
raise HTTPException(status_code=400, detail="Invalid pixel coordinates: must be non-negative.")
try:
gps_point = manager.convert_object_to_gps(flight_id, frame_id, (request.pixel_x, request.pixel_y))
if not gps_point:
raise HTTPException(status_code=409, detail="Frame not yet processed or pose unavailable.")
return ObjectGPSResponse(
gps=gps_point,
accuracy_meters=5.0,
frame_id=frame_id,
pixel=(request.pixel_x, request.pixel_y)
)
except ValueError as ve:
raise HTTPException(status_code=400, detail=str(ve))
except Exception:
raise HTTPException(status_code=404, detail="Flight or frame not found.")
@router.get("/{flight_id}/frames/{frame_id}/context", response_model=FrameContextResponse)
async def get_frame_context(
flight_id: str,
frame_id: int,
manager: FlightLifecycleManager = Depends(get_lifecycle_manager)
):
"""
Retrieves the UAV image and top candidate satellite tiles to assist the user
in providing a manual GPS fix when the system is blocked.
"""
context = manager.get_frame_context(flight_id, frame_id)
if not context:
raise HTTPException(status_code=404, detail="Context not found for this flight or frame.")
return FrameContextResponse(**context)