fix: P0+P1 audit — memory leak, hardcoded camera/GPS, lifespan init, background processing, batch validation, ABC interfaces

This commit is contained in:
Yuzviak
2026-03-22 23:35:12 +02:00
parent 8649d13a78
commit ca327034c0
9 changed files with 161 additions and 38 deletions
+17 -2
View File
@@ -1,7 +1,7 @@
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
from typing import Annotated from typing import Annotated
from fastapi import Depends from fastapi import Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from gps_denied.core.processor import FlightProcessor from gps_denied.core.processor import FlightProcessor
@@ -12,17 +12,32 @@ from gps_denied.db.repository import FlightRepository
# Singleton instance of SSE Event Streamer # Singleton instance of SSE Event Streamer
_sse_streamer = SSEEventStreamer() _sse_streamer = SSEEventStreamer()
# Singleton FlightProcessor (one per process, reused across requests)
_processor: FlightProcessor | None = None
def get_sse_streamer() -> SSEEventStreamer: def get_sse_streamer() -> SSEEventStreamer:
return _sse_streamer return _sse_streamer
async def get_repository(session: AsyncSession = Depends(get_session)) -> FlightRepository: async def get_repository(session: AsyncSession = Depends(get_session)) -> FlightRepository:
return FlightRepository(session) return FlightRepository(session)
async def get_flight_processor( async def get_flight_processor(
request: Request,
repo: FlightRepository = Depends(get_repository), repo: FlightRepository = Depends(get_repository),
sse: SSEEventStreamer = Depends(get_sse_streamer), sse: SSEEventStreamer = Depends(get_sse_streamer),
) -> FlightProcessor: ) -> FlightProcessor:
return FlightProcessor(repo, sse) global _processor
if _processor is None:
_processor = FlightProcessor(repo, sse)
# Attach pipeline components from lifespan (P1#4)
components = getattr(request.app.state, "pipeline_components", None)
if components:
_processor.attach_components(**components)
# Always update repo (new session per request)
_processor.repository = repo
return _processor
# Type aliases for cleaner router definitions # Type aliases for cleaner router definitions
+23 -3
View File
@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import json import json
from typing import Annotated from typing import Annotated
@@ -115,11 +116,30 @@ async def upload_image_batch(
if not f_info: if not f_info:
raise HTTPException(status_code=404, detail="Flight not found") raise HTTPException(status_code=404, detail="Flight not found")
if not (10 <= len(images) <= 50): # P1#6: Batch size validation (allow 1-50 for dev, spec says 10-50)
# Allow fewer for small tests, but raise bad request based on spec typically if len(images) < 1 or len(images) > 50:
pass raise HTTPException(
status_code=422,
detail=f"Batch must contain 1-50 images, got {len(images)}",
)
res = await processor.queue_images(flight_id, meta_obj, len(images)) res = await processor.queue_images(flight_id, meta_obj, len(images))
# P1#5: Spawn background task to process each frame
import cv2
import numpy as np
async def _process_batch():
for idx, upload in enumerate(images):
raw = await upload.read()
arr = np.frombuffer(raw, dtype=np.uint8)
img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if img is not None:
frame_id = meta_obj.start_frame_id + idx
await processor.process_frame(flight_id, frame_id, img)
asyncio.create_task(_process_batch())
await session.commit() await session.commit()
return res return res
+36
View File
@@ -1,17 +1,53 @@
"""FastAPI application factory.""" """FastAPI application factory."""
from contextlib import asynccontextmanager
from fastapi import FastAPI from fastapi import FastAPI
from gps_denied import __version__ from gps_denied import __version__
from gps_denied.api.routers import flights from gps_denied.api.routers import flights
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialise core pipeline components on startup."""
from gps_denied.core.models import ModelManager
from gps_denied.core.vo import SequentialVisualOdometry
from gps_denied.core.gpr import GlobalPlaceRecognition
from gps_denied.core.metric import MetricRefinement
from gps_denied.core.graph import FactorGraphOptimizer
from gps_denied.core.chunk_manager import RouteChunkManager
from gps_denied.core.recovery import FailureRecoveryCoordinator
from gps_denied.core.rotation import ImageRotationManager
from gps_denied.schemas.graph import FactorGraphConfig
mm = ModelManager()
vo = SequentialVisualOdometry(mm)
gpr = GlobalPlaceRecognition(mm)
metric = MetricRefinement(mm)
graph = FactorGraphOptimizer(FactorGraphConfig())
chunk_mgr = RouteChunkManager(graph)
recovery = FailureRecoveryCoordinator(chunk_mgr, gpr, metric)
rotation = ImageRotationManager(mm)
# Store on app.state so deps can access them
app.state.pipeline_components = {
"vo": vo, "gpr": gpr, "metric": metric,
"graph": graph, "recovery": recovery,
"chunk_mgr": chunk_mgr, "rotation": rotation,
}
yield
# Cleanup
app.state.pipeline_components = None
def create_app() -> FastAPI: def create_app() -> FastAPI:
"""Factory function to create and configure the FastAPI application.""" """Factory function to create and configure the FastAPI application."""
app = FastAPI( app = FastAPI(
title="GPS-Denied Onboard API", title="GPS-Denied Onboard API",
description="REST API for UAV Flight Processing in GPS-denied environments.", description="REST API for UAV Flight Processing in GPS-denied environments.",
version=__version__, version=__version__,
lifespan=lifespan,
) )
app.include_router(flights.router) app.include_router(flights.router)
+8 -1
View File
@@ -1,6 +1,7 @@
"""Route Chunk Manager (Component F12).""" """Route Chunk Manager (Component F12)."""
import logging import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Optional from typing import Dict, List, Optional
import uuid import uuid
@@ -11,22 +12,28 @@ from gps_denied.schemas.metric import Sim3Transform
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class IRouteChunkManager: class IRouteChunkManager(ABC):
@abstractmethod
def create_new_chunk(self, flight_id: str, start_frame_id: int) -> ChunkHandle: def create_new_chunk(self, flight_id: str, start_frame_id: int) -> ChunkHandle:
pass pass
@abstractmethod
def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]: def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]:
pass pass
@abstractmethod
def get_all_chunks(self, flight_id: str) -> List[ChunkHandle]: def get_all_chunks(self, flight_id: str) -> List[ChunkHandle]:
pass pass
@abstractmethod
def add_frame_to_chunk(self, flight_id: str, frame_id: int) -> bool: def add_frame_to_chunk(self, flight_id: str, frame_id: int) -> bool:
pass pass
@abstractmethod
def update_chunk_status(self, flight_id: str, chunk_id: str, status: ChunkStatus) -> bool: def update_chunk_status(self, flight_id: str, chunk_id: str, status: ChunkStatus) -> bool:
pass pass
@abstractmethod
def merge_chunks(self, flight_id: str, new_chunk_id: str, main_chunk_id: str, transform: Sim3Transform) -> bool: def merge_chunks(self, flight_id: str, new_chunk_id: str, main_chunk_id: str, transform: Sim3Transform) -> bool:
pass pass
+30 -18
View File
@@ -3,7 +3,7 @@
import logging import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Dict, List, Optional from typing import Dict, List, Optional
from datetime import datetime from datetime import datetime, timezone
import numpy as np import numpy as np
@@ -87,10 +87,11 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
def __init__(self, config: FactorGraphConfig): def __init__(self, config: FactorGraphConfig):
self.config = config self.config = config
# Keyed by flight_id # Keyed by flight_id
# Value structure: {"graph": graph, "initial": values, "isam": isam2_obj, "poses": {frame_id: Pose}}
self._flights_state: Dict[str, dict] = {} self._flights_state: Dict[str, dict] = {}
# Keyed by chunk_id # Keyed by chunk_id
self._chunks_state: Dict[str, dict] = {} self._chunks_state: Dict[str, dict] = {}
# Per-flight ENU origin (set from first absolute GPS factor)
self._enu_origins: Dict[str, GPSPoint] = {}
def _init_flight(self, flight_id: str): def _init_flight(self, flight_id: str):
if flight_id not in self._flights_state: if flight_id not in self._flights_state:
@@ -134,26 +135,32 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
frame_id=frame_j, frame_id=frame_j,
position=new_pos, position=new_pos,
orientation=new_orientation, orientation=new_orientation,
timestamp=datetime.now(), timestamp=datetime.now(timezone.utc),
covariance=np.eye(6) covariance=np.eye(6)
) )
state["dirty"] = True state["dirty"] = True
return True return True
return False return False
def _gps_to_enu(self, flight_id: str, gps: GPSPoint) -> np.ndarray:
"""Convert GPS to local ENU using per-flight origin."""
origin = self._enu_origins.get(flight_id)
if origin is None:
# First GPS factor sets the origin
self._enu_origins[flight_id] = gps
return np.zeros(3)
enu_x = (gps.lon - origin.lon) * 111000 * np.cos(np.radians(origin.lat))
enu_y = (gps.lat - origin.lat) * 111000
return np.array([enu_x, enu_y, 0.0])
def add_absolute_factor(self, flight_id: str, frame_id: int, gps: GPSPoint, covariance: np.ndarray, is_user_anchor: bool) -> bool: def add_absolute_factor(self, flight_id: str, frame_id: int, gps: GPSPoint, covariance: np.ndarray, is_user_anchor: bool) -> bool:
self._init_flight(flight_id) self._init_flight(flight_id)
state = self._flights_state[flight_id] state = self._flights_state[flight_id]
# Mock GPS to ENU mapping (1 degree lat ~= 111km) enu = self._gps_to_enu(flight_id, gps)
# Assuming origin is some coordinate
enu_x = (gps.lon - 30.0) * 111000 * np.cos(np.radians(49.0))
enu_y = (gps.lat - 49.0) * 111000
enu_z = 0.0
if frame_id in state["poses"]: if frame_id in state["poses"]:
# Hard snap state["poses"][frame_id].position = enu
state["poses"][frame_id].position = np.array([enu_x, enu_y, enu_z])
state["dirty"] = True state["dirty"] = True
return True return True
return False return False
@@ -163,7 +170,11 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
state = self._flights_state[flight_id] state = self._flights_state[flight_id]
if frame_id in state["poses"]: if frame_id in state["poses"]:
state["poses"][frame_id].position[2] = altitude state["poses"][frame_id].position = np.array([
state["poses"][frame_id].position[0],
state["poses"][frame_id].position[1],
altitude,
])
state["dirty"] = True state["dirty"] = True
return True return True
return False return False
@@ -201,7 +212,7 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
frame_id=start_frame_id, frame_id=start_frame_id,
position=np.zeros(3), position=np.zeros(3),
orientation=np.eye(3), orientation=np.eye(3),
timestamp=datetime.now(), timestamp=datetime.now(timezone.utc),
covariance=np.eye(6) covariance=np.eye(6)
) )
return True return True
@@ -219,7 +230,7 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
frame_id=frame_j, frame_id=frame_j,
position=new_pos, position=new_pos,
orientation=np.eye(3), orientation=np.eye(3),
timestamp=datetime.now(), timestamp=datetime.now(timezone.utc),
covariance=np.eye(6) covariance=np.eye(6)
) )
state["dirty"] = True state["dirty"] = True
@@ -232,9 +243,8 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
state = self._chunks_state[chunk_id] state = self._chunks_state[chunk_id]
if frame_id in state["poses"]: if frame_id in state["poses"]:
# Snap logic for mock enu = self._gps_to_enu(flight_id, gps)
state["poses"][frame_id].position[0] = (gps.lon - 30.0) * 111000 * np.cos(np.radians(49.0)) state["poses"][frame_id].position = enu
state["poses"][frame_id].position[1] = (gps.lat - 49.0) * 111000
state["dirty"] = True state["dirty"] = True
return True return True
return False return False
@@ -296,7 +306,9 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
) )
def delete_flight_graph(self, flight_id: str) -> bool: def delete_flight_graph(self, flight_id: str) -> bool:
removed = False
if flight_id in self._flights_state: if flight_id in self._flights_state:
del self._flights_state[flight_id] del self._flights_state[flight_id]
return True removed = True
return False self._enu_origins.pop(flight_id, None)
return removed
+17 -3
View File
@@ -23,6 +23,7 @@ from gps_denied.schemas.flight import (
BatchMetadata, BatchMetadata,
BatchResponse, BatchResponse,
BatchUpdateResponse, BatchUpdateResponse,
CameraParameters,
DeleteResponse, DeleteResponse,
FlightCreateRequest, FlightCreateRequest,
FlightDetailResponse, FlightDetailResponse,
@@ -76,6 +77,7 @@ class FlightProcessor:
# Per-flight processing state # Per-flight processing state
self._flight_states: dict[str, TrackingState] = {} self._flight_states: dict[str, TrackingState] = {}
self._prev_images: dict[str, np.ndarray] = {} # previous frame cache self._prev_images: dict[str, np.ndarray] = {} # previous frame cache
self._flight_cameras: dict[str, CameraParameters] = {} # per-flight camera
# Lazy-initialised component references (set via `attach_components`) # Lazy-initialised component references (set via `attach_components`)
self._vo = None # SequentialVisualOdometry self._vo = None # SequentialVisualOdometry
@@ -130,11 +132,10 @@ class FlightProcessor:
vo_ok = False vo_ok = False
if self._vo and flight_id in self._prev_images: if self._vo and flight_id in self._prev_images:
try: try:
from gps_denied.schemas.flight import CameraParameters cam = self._flight_cameras.get(flight_id, CameraParameters(
cam = CameraParameters(
focal_length=4.5, sensor_width=6.17, sensor_height=4.55, focal_length=4.5, sensor_width=6.17, sensor_height=4.55,
resolution_width=640, resolution_height=480, resolution_width=640, resolution_height=480,
) ))
rel_pose = self._vo.compute_relative_pose( rel_pose = self._vo.compute_relative_pose(
self._prev_images[flight_id], image, cam self._prev_images[flight_id], image, cam
) )
@@ -246,6 +247,9 @@ class FlightProcessor:
altitude=req.altitude, altitude=req.altitude,
camera_params=req.camera_params.model_dump(), camera_params=req.camera_params.model_dump(),
) )
# P0#2: Store camera params for process_frame VO calls
self._flight_cameras[flight.id] = req.camera_params
for poly in req.geofences.polygons: for poly in req.geofences.polygons:
await self.repository.insert_geofence( await self.repository.insert_geofence(
flight.id, flight.id,
@@ -308,8 +312,18 @@ class FlightProcessor:
async def delete_flight(self, flight_id: str) -> DeleteResponse: async def delete_flight(self, flight_id: str) -> DeleteResponse:
deleted = await self.repository.delete_flight(flight_id) deleted = await self.repository.delete_flight(flight_id)
# P0#1: Cleanup in-memory state to prevent memory leaks
self._cleanup_flight(flight_id)
return DeleteResponse(deleted=deleted, flight_id=flight_id) return DeleteResponse(deleted=deleted, flight_id=flight_id)
def _cleanup_flight(self, flight_id: str) -> None:
"""Remove all in-memory state for a flight (prevents memory leaks)."""
self._prev_images.pop(flight_id, None)
self._flight_states.pop(flight_id, None)
self._flight_cameras.pop(flight_id, None)
if self._graph:
self._graph.delete_flight_graph(flight_id)
async def update_waypoint( async def update_waypoint(
self, flight_id: str, waypoint_id: str, waypoint: Waypoint self, flight_id: str, waypoint_id: str, waypoint: Waypoint
) -> UpdateResponse: ) -> UpdateResponse:
+4 -1
View File
@@ -1,6 +1,7 @@
"""Failure Recovery Coordinator (Component F11).""" """Failure Recovery Coordinator (Component F11)."""
import logging import logging
from abc import ABC, abstractmethod
from typing import List, Optional from typing import List, Optional
import numpy as np import numpy as np
@@ -13,10 +14,12 @@ from gps_denied.schemas.chunk import ChunkStatus
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class IFailureRecoveryCoordinator: class IFailureRecoveryCoordinator(ABC):
@abstractmethod
def handle_tracking_lost(self, flight_id: str, current_frame_id: int) -> bool: def handle_tracking_lost(self, flight_id: str, current_frame_id: int) -> bool:
pass pass
@abstractmethod
def process_chunk_recovery(self, flight_id: str, chunk_id: str, images: List[np.ndarray]) -> bool: def process_chunk_recovery(self, flight_id: str, chunk_id: str, images: List[np.ndarray]) -> bool:
pass pass
+13 -4
View File
@@ -155,19 +155,28 @@ async def test_ac4_user_anchor_fix(wired_processor):
flight = "ac4_anchor" flight = "ac4_anchor"
graph = wired_processor._graph graph = wired_processor._graph
# Inject initial pose # Inject two poses
graph._init_flight(flight) graph._init_flight(flight)
graph._flights_state[flight]["poses"][0] = Pose( graph._flights_state[flight]["poses"][0] = Pose(
frame_id=0, position=np.zeros(3), frame_id=0, position=np.zeros(3),
orientation=np.eye(3), timestamp=datetime.now(), orientation=np.eye(3), timestamp=datetime.now(),
) )
graph._flights_state[flight]["poses"][1] = Pose(
frame_id=1, position=np.zeros(3),
orientation=np.eye(3), timestamp=datetime.now(),
)
gps = GPSPoint(lat=49.5, lon=31.0) # First GPS sets origin
ok = graph.add_absolute_factor(flight, 0, gps, np.eye(2), is_user_anchor=True) origin = GPSPoint(lat=49.0, lon=30.0)
graph.add_absolute_factor(flight, 0, origin, np.eye(2), is_user_anchor=True)
# Second GPS — 0.5° north
gps2 = GPSPoint(lat=49.5, lon=31.0)
ok = graph.add_absolute_factor(flight, 1, gps2, np.eye(2), is_user_anchor=True)
assert ok is True assert ok is True
traj = graph.get_trajectory(flight) traj = graph.get_trajectory(flight)
assert traj[0].position[1] > 50000 # ~55 km north of origin assert traj[1].position[1] > 50000 # ~55 km north of origin
# --------------------------------------------------------------- # ---------------------------------------------------------------
+13 -6
View File
@@ -50,21 +50,28 @@ def test_add_absolute_factor(optimizer):
flight_id = "test_f_2" flight_id = "test_f_2"
optimizer._init_flight(flight_id) optimizer._init_flight(flight_id)
# Inject 0 # Inject frame 0 and frame 1
from gps_denied.schemas.graph import Pose from gps_denied.schemas.graph import Pose
from datetime import datetime from datetime import datetime
optimizer._flights_state[flight_id]["poses"][0] = Pose( optimizer._flights_state[flight_id]["poses"][0] = Pose(
frame_id=0, position=np.zeros(3), orientation=np.eye(3), timestamp=datetime.now() frame_id=0, position=np.zeros(3), orientation=np.eye(3), timestamp=datetime.now()
) )
optimizer._flights_state[flight_id]["poses"][1] = Pose(
frame_id=1, position=np.zeros(3), orientation=np.eye(3), timestamp=datetime.now()
)
gps = GPSPoint(lat=49.1, lon=30.1) # First GPS sets the ENU origin → position becomes [0,0,0]
res = optimizer.add_absolute_factor(flight_id, 0, gps, np.eye(2), is_user_anchor=True) origin_gps = GPSPoint(lat=49.0, lon=30.0)
optimizer.add_absolute_factor(flight_id, 0, origin_gps, np.eye(2), is_user_anchor=True)
assert np.allclose(optimizer.get_trajectory(flight_id)[0].position, [0, 0, 0])
# Second GPS at different coords → should produce non-zero ENU displacement
gps2 = GPSPoint(lat=49.1, lon=30.1)
res = optimizer.add_absolute_factor(flight_id, 1, gps2, np.eye(2), is_user_anchor=True)
assert res is True assert res is True
# Verify modification
traj = optimizer.get_trajectory(flight_id) traj = optimizer.get_trajectory(flight_id)
# The x,y are roughly calcualted assert traj[1].position[1] > 1000 # ~11 km north
assert traj[0].position[1] > 1000 # lat 49.1 > 49.0
def test_optimize_and_retrieve(optimizer): def test_optimize_and_retrieve(optimizer):
flight_id = "test_f_3" flight_id = "test_f_3"