From ca327034c0e39b65964e674323cb3d32d3d1dcf9 Mon Sep 17 00:00:00 2001 From: Yuzviak Date: Sun, 22 Mar 2026 23:35:12 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20P0+P1=20audit=20=E2=80=94=20memory=20lea?= =?UTF-8?q?k,=20hardcoded=20camera/GPS,=20lifespan=20init,=20background=20?= =?UTF-8?q?processing,=20batch=20validation,=20ABC=20interfaces?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gps_denied/api/deps.py | 19 +++++++++-- src/gps_denied/api/routers/flights.py | 26 +++++++++++++-- src/gps_denied/app.py | 36 ++++++++++++++++++++ src/gps_denied/core/chunk_manager.py | 9 ++++- src/gps_denied/core/graph.py | 48 +++++++++++++++++---------- src/gps_denied/core/processor.py | 20 +++++++++-- src/gps_denied/core/recovery.py | 5 ++- tests/test_acceptance.py | 17 +++++++--- tests/test_graph.py | 19 +++++++---- 9 files changed, 161 insertions(+), 38 deletions(-) diff --git a/src/gps_denied/api/deps.py b/src/gps_denied/api/deps.py index 97d4287..e36a53e 100644 --- a/src/gps_denied/api/deps.py +++ b/src/gps_denied/api/deps.py @@ -1,7 +1,7 @@ from collections.abc import AsyncGenerator from typing import Annotated -from fastapi import Depends +from fastapi import Depends, Request from sqlalchemy.ext.asyncio import AsyncSession from gps_denied.core.processor import FlightProcessor @@ -12,17 +12,32 @@ from gps_denied.db.repository import FlightRepository # Singleton instance of SSE Event Streamer _sse_streamer = SSEEventStreamer() +# Singleton FlightProcessor (one per process, reused across requests) +_processor: FlightProcessor | None = None + + def get_sse_streamer() -> SSEEventStreamer: return _sse_streamer async def get_repository(session: AsyncSession = Depends(get_session)) -> FlightRepository: return FlightRepository(session) + async def get_flight_processor( + request: Request, repo: FlightRepository = Depends(get_repository), sse: SSEEventStreamer = Depends(get_sse_streamer), ) -> FlightProcessor: - return FlightProcessor(repo, sse) + global _processor + if _processor is None: + _processor = FlightProcessor(repo, sse) + # Attach pipeline components from lifespan (P1#4) + components = getattr(request.app.state, "pipeline_components", None) + if components: + _processor.attach_components(**components) + # Always update repo (new session per request) + _processor.repository = repo + return _processor # Type aliases for cleaner router definitions diff --git a/src/gps_denied/api/routers/flights.py b/src/gps_denied/api/routers/flights.py index 1aea133..617a994 100644 --- a/src/gps_denied/api/routers/flights.py +++ b/src/gps_denied/api/routers/flights.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import json from typing import Annotated @@ -115,11 +116,30 @@ async def upload_image_batch( if not f_info: raise HTTPException(status_code=404, detail="Flight not found") - if not (10 <= len(images) <= 50): - # Allow fewer for small tests, but raise bad request based on spec typically - pass + # P1#6: Batch size validation (allow 1-50 for dev, spec says 10-50) + if len(images) < 1 or len(images) > 50: + raise HTTPException( + status_code=422, + detail=f"Batch must contain 1-50 images, got {len(images)}", + ) res = await processor.queue_images(flight_id, meta_obj, len(images)) + + # P1#5: Spawn background task to process each frame + import cv2 + import numpy as np + + async def _process_batch(): + for idx, upload in enumerate(images): + raw = await upload.read() + arr = np.frombuffer(raw, dtype=np.uint8) + img = cv2.imdecode(arr, cv2.IMREAD_COLOR) + if img is not None: + frame_id = meta_obj.start_frame_id + idx + await processor.process_frame(flight_id, frame_id, img) + + asyncio.create_task(_process_batch()) + await session.commit() return res diff --git a/src/gps_denied/app.py b/src/gps_denied/app.py index ce8e029..7054172 100644 --- a/src/gps_denied/app.py +++ b/src/gps_denied/app.py @@ -1,17 +1,53 @@ """FastAPI application factory.""" +from contextlib import asynccontextmanager + from fastapi import FastAPI from gps_denied import __version__ from gps_denied.api.routers import flights +@asynccontextmanager +async def lifespan(app: FastAPI): + """Initialise core pipeline components on startup.""" + from gps_denied.core.models import ModelManager + from gps_denied.core.vo import SequentialVisualOdometry + from gps_denied.core.gpr import GlobalPlaceRecognition + from gps_denied.core.metric import MetricRefinement + from gps_denied.core.graph import FactorGraphOptimizer + from gps_denied.core.chunk_manager import RouteChunkManager + from gps_denied.core.recovery import FailureRecoveryCoordinator + from gps_denied.core.rotation import ImageRotationManager + from gps_denied.schemas.graph import FactorGraphConfig + + mm = ModelManager() + vo = SequentialVisualOdometry(mm) + gpr = GlobalPlaceRecognition(mm) + metric = MetricRefinement(mm) + graph = FactorGraphOptimizer(FactorGraphConfig()) + chunk_mgr = RouteChunkManager(graph) + recovery = FailureRecoveryCoordinator(chunk_mgr, gpr, metric) + rotation = ImageRotationManager(mm) + + # Store on app.state so deps can access them + app.state.pipeline_components = { + "vo": vo, "gpr": gpr, "metric": metric, + "graph": graph, "recovery": recovery, + "chunk_mgr": chunk_mgr, "rotation": rotation, + } + yield + # Cleanup + app.state.pipeline_components = None + + def create_app() -> FastAPI: """Factory function to create and configure the FastAPI application.""" app = FastAPI( title="GPS-Denied Onboard API", description="REST API for UAV Flight Processing in GPS-denied environments.", version=__version__, + lifespan=lifespan, ) app.include_router(flights.router) diff --git a/src/gps_denied/core/chunk_manager.py b/src/gps_denied/core/chunk_manager.py index 7075674..0fdef25 100644 --- a/src/gps_denied/core/chunk_manager.py +++ b/src/gps_denied/core/chunk_manager.py @@ -1,6 +1,7 @@ """Route Chunk Manager (Component F12).""" import logging +from abc import ABC, abstractmethod from typing import Dict, List, Optional import uuid @@ -11,22 +12,28 @@ from gps_denied.schemas.metric import Sim3Transform logger = logging.getLogger(__name__) -class IRouteChunkManager: +class IRouteChunkManager(ABC): + @abstractmethod def create_new_chunk(self, flight_id: str, start_frame_id: int) -> ChunkHandle: pass + @abstractmethod def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]: pass + @abstractmethod def get_all_chunks(self, flight_id: str) -> List[ChunkHandle]: pass + @abstractmethod def add_frame_to_chunk(self, flight_id: str, frame_id: int) -> bool: pass + @abstractmethod def update_chunk_status(self, flight_id: str, chunk_id: str, status: ChunkStatus) -> bool: pass + @abstractmethod def merge_chunks(self, flight_id: str, new_chunk_id: str, main_chunk_id: str, transform: Sim3Transform) -> bool: pass diff --git a/src/gps_denied/core/graph.py b/src/gps_denied/core/graph.py index 03d4d7b..13e0219 100644 --- a/src/gps_denied/core/graph.py +++ b/src/gps_denied/core/graph.py @@ -3,7 +3,7 @@ import logging from abc import ABC, abstractmethod from typing import Dict, List, Optional -from datetime import datetime +from datetime import datetime, timezone import numpy as np @@ -87,10 +87,11 @@ class FactorGraphOptimizer(IFactorGraphOptimizer): def __init__(self, config: FactorGraphConfig): self.config = config # Keyed by flight_id - # Value structure: {"graph": graph, "initial": values, "isam": isam2_obj, "poses": {frame_id: Pose}} self._flights_state: Dict[str, dict] = {} # Keyed by chunk_id self._chunks_state: Dict[str, dict] = {} + # Per-flight ENU origin (set from first absolute GPS factor) + self._enu_origins: Dict[str, GPSPoint] = {} def _init_flight(self, flight_id: str): if flight_id not in self._flights_state: @@ -134,26 +135,32 @@ class FactorGraphOptimizer(IFactorGraphOptimizer): frame_id=frame_j, position=new_pos, orientation=new_orientation, - timestamp=datetime.now(), + timestamp=datetime.now(timezone.utc), covariance=np.eye(6) ) state["dirty"] = True return True return False + def _gps_to_enu(self, flight_id: str, gps: GPSPoint) -> np.ndarray: + """Convert GPS to local ENU using per-flight origin.""" + origin = self._enu_origins.get(flight_id) + if origin is None: + # First GPS factor sets the origin + self._enu_origins[flight_id] = gps + return np.zeros(3) + enu_x = (gps.lon - origin.lon) * 111000 * np.cos(np.radians(origin.lat)) + enu_y = (gps.lat - origin.lat) * 111000 + return np.array([enu_x, enu_y, 0.0]) + def add_absolute_factor(self, flight_id: str, frame_id: int, gps: GPSPoint, covariance: np.ndarray, is_user_anchor: bool) -> bool: self._init_flight(flight_id) state = self._flights_state[flight_id] - # Mock GPS to ENU mapping (1 degree lat ~= 111km) - # Assuming origin is some coordinate - enu_x = (gps.lon - 30.0) * 111000 * np.cos(np.radians(49.0)) - enu_y = (gps.lat - 49.0) * 111000 - enu_z = 0.0 + enu = self._gps_to_enu(flight_id, gps) if frame_id in state["poses"]: - # Hard snap - state["poses"][frame_id].position = np.array([enu_x, enu_y, enu_z]) + state["poses"][frame_id].position = enu state["dirty"] = True return True return False @@ -163,7 +170,11 @@ class FactorGraphOptimizer(IFactorGraphOptimizer): state = self._flights_state[flight_id] if frame_id in state["poses"]: - state["poses"][frame_id].position[2] = altitude + state["poses"][frame_id].position = np.array([ + state["poses"][frame_id].position[0], + state["poses"][frame_id].position[1], + altitude, + ]) state["dirty"] = True return True return False @@ -201,7 +212,7 @@ class FactorGraphOptimizer(IFactorGraphOptimizer): frame_id=start_frame_id, position=np.zeros(3), orientation=np.eye(3), - timestamp=datetime.now(), + timestamp=datetime.now(timezone.utc), covariance=np.eye(6) ) return True @@ -219,7 +230,7 @@ class FactorGraphOptimizer(IFactorGraphOptimizer): frame_id=frame_j, position=new_pos, orientation=np.eye(3), - timestamp=datetime.now(), + timestamp=datetime.now(timezone.utc), covariance=np.eye(6) ) state["dirty"] = True @@ -232,9 +243,8 @@ class FactorGraphOptimizer(IFactorGraphOptimizer): state = self._chunks_state[chunk_id] if frame_id in state["poses"]: - # Snap logic for mock - state["poses"][frame_id].position[0] = (gps.lon - 30.0) * 111000 * np.cos(np.radians(49.0)) - state["poses"][frame_id].position[1] = (gps.lat - 49.0) * 111000 + enu = self._gps_to_enu(flight_id, gps) + state["poses"][frame_id].position = enu state["dirty"] = True return True return False @@ -296,7 +306,9 @@ class FactorGraphOptimizer(IFactorGraphOptimizer): ) def delete_flight_graph(self, flight_id: str) -> bool: + removed = False if flight_id in self._flights_state: del self._flights_state[flight_id] - return True - return False + removed = True + self._enu_origins.pop(flight_id, None) + return removed diff --git a/src/gps_denied/core/processor.py b/src/gps_denied/core/processor.py index 32ce628..a9b6de0 100644 --- a/src/gps_denied/core/processor.py +++ b/src/gps_denied/core/processor.py @@ -23,6 +23,7 @@ from gps_denied.schemas.flight import ( BatchMetadata, BatchResponse, BatchUpdateResponse, + CameraParameters, DeleteResponse, FlightCreateRequest, FlightDetailResponse, @@ -76,6 +77,7 @@ class FlightProcessor: # Per-flight processing state self._flight_states: dict[str, TrackingState] = {} self._prev_images: dict[str, np.ndarray] = {} # previous frame cache + self._flight_cameras: dict[str, CameraParameters] = {} # per-flight camera # Lazy-initialised component references (set via `attach_components`) self._vo = None # SequentialVisualOdometry @@ -130,11 +132,10 @@ class FlightProcessor: vo_ok = False if self._vo and flight_id in self._prev_images: try: - from gps_denied.schemas.flight import CameraParameters - cam = CameraParameters( + cam = self._flight_cameras.get(flight_id, CameraParameters( focal_length=4.5, sensor_width=6.17, sensor_height=4.55, resolution_width=640, resolution_height=480, - ) + )) rel_pose = self._vo.compute_relative_pose( self._prev_images[flight_id], image, cam ) @@ -246,6 +247,9 @@ class FlightProcessor: altitude=req.altitude, camera_params=req.camera_params.model_dump(), ) + # P0#2: Store camera params for process_frame VO calls + self._flight_cameras[flight.id] = req.camera_params + for poly in req.geofences.polygons: await self.repository.insert_geofence( flight.id, @@ -308,8 +312,18 @@ class FlightProcessor: async def delete_flight(self, flight_id: str) -> DeleteResponse: deleted = await self.repository.delete_flight(flight_id) + # P0#1: Cleanup in-memory state to prevent memory leaks + self._cleanup_flight(flight_id) return DeleteResponse(deleted=deleted, flight_id=flight_id) + def _cleanup_flight(self, flight_id: str) -> None: + """Remove all in-memory state for a flight (prevents memory leaks).""" + self._prev_images.pop(flight_id, None) + self._flight_states.pop(flight_id, None) + self._flight_cameras.pop(flight_id, None) + if self._graph: + self._graph.delete_flight_graph(flight_id) + async def update_waypoint( self, flight_id: str, waypoint_id: str, waypoint: Waypoint ) -> UpdateResponse: diff --git a/src/gps_denied/core/recovery.py b/src/gps_denied/core/recovery.py index b2f5da4..acce720 100644 --- a/src/gps_denied/core/recovery.py +++ b/src/gps_denied/core/recovery.py @@ -1,6 +1,7 @@ """Failure Recovery Coordinator (Component F11).""" import logging +from abc import ABC, abstractmethod from typing import List, Optional import numpy as np @@ -13,10 +14,12 @@ from gps_denied.schemas.chunk import ChunkStatus logger = logging.getLogger(__name__) -class IFailureRecoveryCoordinator: +class IFailureRecoveryCoordinator(ABC): + @abstractmethod def handle_tracking_lost(self, flight_id: str, current_frame_id: int) -> bool: pass + @abstractmethod def process_chunk_recovery(self, flight_id: str, chunk_id: str, images: List[np.ndarray]) -> bool: pass diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index 2b69758..c7e4c49 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -155,19 +155,28 @@ async def test_ac4_user_anchor_fix(wired_processor): flight = "ac4_anchor" graph = wired_processor._graph - # Inject initial pose + # Inject two poses graph._init_flight(flight) graph._flights_state[flight]["poses"][0] = Pose( frame_id=0, position=np.zeros(3), orientation=np.eye(3), timestamp=datetime.now(), ) + graph._flights_state[flight]["poses"][1] = Pose( + frame_id=1, position=np.zeros(3), + orientation=np.eye(3), timestamp=datetime.now(), + ) - gps = GPSPoint(lat=49.5, lon=31.0) - ok = graph.add_absolute_factor(flight, 0, gps, np.eye(2), is_user_anchor=True) + # First GPS sets origin + origin = GPSPoint(lat=49.0, lon=30.0) + graph.add_absolute_factor(flight, 0, origin, np.eye(2), is_user_anchor=True) + + # Second GPS — 0.5° north + gps2 = GPSPoint(lat=49.5, lon=31.0) + ok = graph.add_absolute_factor(flight, 1, gps2, np.eye(2), is_user_anchor=True) assert ok is True traj = graph.get_trajectory(flight) - assert traj[0].position[1] > 50000 # ~55 km north of origin + assert traj[1].position[1] > 50000 # ~55 km north of origin # --------------------------------------------------------------- diff --git a/tests/test_graph.py b/tests/test_graph.py index 3ef2ab8..9a4f25b 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -50,21 +50,28 @@ def test_add_absolute_factor(optimizer): flight_id = "test_f_2" optimizer._init_flight(flight_id) - # Inject 0 + # Inject frame 0 and frame 1 from gps_denied.schemas.graph import Pose from datetime import datetime optimizer._flights_state[flight_id]["poses"][0] = Pose( frame_id=0, position=np.zeros(3), orientation=np.eye(3), timestamp=datetime.now() ) + optimizer._flights_state[flight_id]["poses"][1] = Pose( + frame_id=1, position=np.zeros(3), orientation=np.eye(3), timestamp=datetime.now() + ) - gps = GPSPoint(lat=49.1, lon=30.1) - res = optimizer.add_absolute_factor(flight_id, 0, gps, np.eye(2), is_user_anchor=True) + # First GPS sets the ENU origin → position becomes [0,0,0] + origin_gps = GPSPoint(lat=49.0, lon=30.0) + optimizer.add_absolute_factor(flight_id, 0, origin_gps, np.eye(2), is_user_anchor=True) + assert np.allclose(optimizer.get_trajectory(flight_id)[0].position, [0, 0, 0]) + + # Second GPS at different coords → should produce non-zero ENU displacement + gps2 = GPSPoint(lat=49.1, lon=30.1) + res = optimizer.add_absolute_factor(flight_id, 1, gps2, np.eye(2), is_user_anchor=True) assert res is True - # Verify modification traj = optimizer.get_trajectory(flight_id) - # The x,y are roughly calcualted - assert traj[0].position[1] > 1000 # lat 49.1 > 49.0 + assert traj[1].position[1] > 1000 # ~11 km north def test_optimize_and_retrieve(optimizer): flight_id = "test_f_3"