# Architecture

**Analysis Date:** 2026-04-01

## Pattern Overview

**Overall:** Layered async service with component-injected processing pipeline

**Key Characteristics:**
- FastAPI HTTP layer delegates all logic to a singleton `FlightProcessor` orchestrator
- Core processing components are instantiated at app startup via lifespan and injected via `attach_components()`
- All components define ABC interfaces (`ISequentialVisualOdometry`, `IFactorGraphOptimizer`, etc.) with a single concrete implementation, enabling future substitution
- All inference engines are mocked behind `IModelManager` / `MockInferenceEngine`; no real GPU/TRT execution exists in code yet
- Database layer is async SQLAlchemy (aiosqlite default) with a thin `FlightRepository` DAO
- SSE streaming is fully wired: per-flight async queues, `EventSourceResponse` at `GET /flights/{id}/stream`

## Layers

**API Layer:**
- Purpose: HTTP request routing, validation, auth-free (no JWT in code despite spec)
- Location: `src/gps_denied/api/`
- Contains: FastAPI routers, dependency injection wiring, `deps.py` singletons
- Depends on: `FlightProcessor`, `FlightRepository`, `SSEEventStreamer`
- Used by: External callers, other onboard systems

**Orchestration Layer:**
- Purpose: Manages per-flight state machine, invokes pipeline components in sequence
- Location: `src/gps_denied/core/processor.py`
- Contains: `FlightProcessor`, `TrackingState` enum (NORMAL/LOST/RECOVERY), `FrameResult`
- Depends on: All core components, `FlightRepository`, `SSEEventStreamer`
- Used by: API layer via dependency injection

**Core Pipeline Components:**
- Purpose: Individual processing stages, each behind an interface
- Location: `src/gps_denied/core/`
- Contains: `ImageInputPipeline`, `SequentialVisualOdometry`, `GlobalPlaceRecognition`, `MetricRefinement`, `FactorGraphOptimizer`, `RouteChunkManager`, `FailureRecoveryCoordinator`, `ImageRotationManager`, `CoordinateTransformer`, `ResultManager`, `SSEEventStreamer`, `SatelliteDataManager`, `ModelManager`
- Depends on: `IModelManager`, `FlightRepository` (some), schemas
- Used by: `FlightProcessor`

**Inference Layer:**
- Purpose: AI model lifecycle and inference dispatch
- Location: `src/gps_denied/core/models.py`
- Contains: `IModelManager`, `ModelManager`, `MockInferenceEngine`
- Depends on: `schemas/model.py`
- Used by: `SequentialVisualOdometry`, `GlobalPlaceRecognition`, `MetricRefinement`

**Database Layer:**
- Purpose: Async persistence, all SQL via SQLAlchemy ORM
- Location: `src/gps_denied/db/`
- Contains: `FlightRepository`, ORM models (`FlightRow`, `WaypointRow`, `GeofenceRow`, `FlightStateRow`, `FrameResultRow`, `HeadingRow`, `ImageRow`, `ChunkRow`)
- Depends on: SQLAlchemy async engine
- Used by: `FlightProcessor`, `ResultManager`, API deps

**Schema Layer:**
- Purpose: Pydantic models for validation and inter-component data contracts
- Location: `src/gps_denied/schemas/`
- Contains: Domain models (`GPSPoint`, `CameraParameters`), request/response schemas, VO/GPR/metric/satellite/rotation/chunk schemas, SSE event types
- Depends on: Nothing internal
- Used by: All layers

## Data Flow

**Frame Processing (primary path):**

1. Client uploads image batch → `POST /flights/{id}/images/batch`
2. Router spawns `asyncio.create_task(_process_batch())`, returns 202 immediately
3. `_process_batch` calls `processor.process_frame(flight_id, frame_id, image)` per image
4. `FlightProcessor.process_frame`:
   a. Calls `SequentialVisualOdometry.compute_relative_pose(prev, curr, cam)`
   b. If VO succeeds: adds relative factor to `FactorGraphOptimizer`
   c. State machine: NORMAL → LOST (on VO failure) → RECOVERY → NORMAL (on recovery)
   d. On RECOVERY: `FailureRecoveryCoordinator.process_chunk_recovery()` calls GPR + MetricRefinement
   e. In NORMAL: calls `GlobalPlaceRecognition.retrieve_candidate_tiles()` then `MetricRefinement.align_to_satellite()`
   f. Runs incremental `FactorGraphOptimizer.optimize()`
   g. Publishes `FrameResult` via `SSEEventStreamer.push_event()`
5. SSE clients receive real-time frame events

**Tracking Loss / Chunk Recovery:**

1. VO fails → `processor._flight_states[id] = LOST`
2. `FailureRecoveryCoordinator.handle_tracking_lost()` creates new chunk via `RouteChunkManager`
3. Next frame enters RECOVERY: `process_chunk_recovery()` runs GPR on chunk images
4. GPR finds candidate tiles → `MetricRefinement.align_chunk_to_satellite()` computes homography
5. If aligned: chunk anchored, state → NORMAL
6. If not aligned: chunk stays UNANCHORED, state stays RECOVERY

**Satellite Tile Fetch:**

1. `SatelliteDataManager.fetch_tile()` checks `diskcache` first
2. On miss: fetches from `https://mt1.google.com/vt/lyrs=s&x=...` via httpx
3. Decoded to numpy array, stored in diskcache
4. `fetch_tile_grid()` and `prefetch_route_corridor()` do parallel async fetches

**State Management:**
- Per-flight tracking state held in `FlightProcessor._flight_states: dict[str, TrackingState]`
- Per-flight previous frame cache in `FlightProcessor._prev_images: dict[str, np.ndarray]`
- Per-flight chunk state in `RouteChunkManager._chunks: dict[str, dict[str, ChunkHandle]]`
- Per-flight factor graph in `FactorGraphOptimizer._flights_state: dict[str, dict]`
- Per-flight SSE queues in `SSEEventStreamer._streams: dict[str, dict[str, Queue]]`
- All persistent state (waypoints, frame results, flight metadata) in SQLite via `FlightRepository`

## Key Abstractions

**TrackingState (State Machine):**
- Purpose: Three-state machine per flight controlling pipeline branch selection
- Location: `src/gps_denied/core/processor.py`
- States: NORMAL (VO active + drift correction) → LOST (VO failed, chunk created) → RECOVERY (GPR + metric) → NORMAL
- Note: Simplified vs. documented 5-state design; no IMU-only prediction state

**IModelManager / MockInferenceEngine:**
- Purpose: Decouples inference calls from model backend; enables mock-first development
- Location: `src/gps_denied/core/models.py`
- Pattern: All models auto-loaded as `MockInferenceEngine` when first accessed; no real TRT/ONNX loading
- Mock models: SuperPoint (500 random features), LightGlue (100 random matches), DINOv2 (4096-dim random descriptor), LiteSAM (random homography, 80% match probability)

**ChunkHandle / RouteChunkManager:**
- Purpose: Represents a disconnected trajectory segment between tracking losses
- Location: `src/gps_denied/core/chunk_manager.py`
- Lifecycle: UNANCHORED → MATCHING → ANCHORED or UNANCHORED → MERGED

**FactorGraphOptimizer:**
- Purpose: Maintains per-flight pose graph with relative (VO) and absolute (GPS/satellite) factors
- Location: `src/gps_denied/core/graph.py`
- Reality: GTSAM import is optional (`try: import gtsam`); concrete implementation is a mock using simple vector arithmetic

## Entry Points

**Application startup:**
- Location: `src/gps_denied/app.py`
- Triggers: `uvicorn` or `python -m gps_denied` (via `src/gps_denied/__main__.py`)
- Responsibilities: Creates FastAPI app, registers `/flights` router, wires lifespan (instantiates all pipeline components, stores on `app.state.pipeline_components`)

**Frame processing:**
- Location: `src/gps_denied/api/routers/flights.py` → `upload_image_batch`
- Triggers: `POST /flights/{id}/images/batch` multipart form
- Responsibilities: Validates batch, spawns background task, each frame calls `processor.process_frame()`

**SSE stream:**
- Location: `src/gps_denied/api/routers/flights.py` → `create_sse_stream`
- Triggers: `GET /flights/{id}/stream`
- Responsibilities: Returns `EventSourceResponse` wrapping async generator from `SSEEventStreamer`

## Error Handling

**Strategy:** Exception swallowing in processor with `logger.warning`; most component failures are non-fatal

**Patterns:**
- VO failure: caught with `except Exception as exc`, logged, `vo_ok = False` → state machine handles
- Drift correction failure: caught with `except Exception as exc`, logged, frame continues without correction
- HTTP errors in satellite fetching: `httpx.HTTPError` caught, returns `None` (tile treated as missing)
- DB not-found: returns `None`, router converts to HTTP 404
- Batch upload errors: HTTP 422 with detail string

## Cross-Cutting Concerns

**Logging:** Standard `logging.getLogger(__name__)` in every module; no structured logging or log levels configuration in code

**Validation:** Pydantic models at API boundary; no internal validation between pipeline components

**Authentication:** Documented as JWT in solution spec; **not implemented** in code: no auth middleware, no JWT verification on any endpoint

**Coordinate System:** `CoordinateTransformer` (`src/gps_denied/core/coordinates.py`) handles ENU↔GPS conversion with real math; `pixel_to_gps` is a placeholder with fake scaling (1 px = 0.1 m)

**ESKF / MAVLink / cuVSLAM:** **Not present in code.** The solution document specifies all three in detail, but the codebase contains none of them. The implemented architecture is a ground-processing post-flight pipeline (images uploaded via REST), not the real-time onboard ESKF+cuVSLAM system described in `solution.md`.

## Divergence: Documented Design vs. Implemented Code

This is a critical architectural gap. The solution document describes a **real-time embedded system**; the code implements a **batch REST processing service**:

| Aspect | solution.md (documented) | Code (implemented) |
|--------|--------------------------|---------------------|
| Processing model | Real-time, 0.7 fps camera stream | Batch HTTP upload, async background task |
| State estimator | ESKF (15-state, IMU-driven 5-10 Hz) | FactorGraphOptimizer (mock GTSAM/pose graph) |
| Visual odometry | cuVSLAM Inertial mode | SuperPoint + LightGlue (mocked) via SequentialVisualOdometry |
| Satellite matching | LiteSAM/XFeat TRT Engine FP16 | LiteSAM via MockInferenceEngine (random homography) |
| Place recognition | Not mentioned as separate component | AnyLoc DINOv2 (GlobalPlaceRecognition, mocked) |
| GPS output | MAVLink GPS_INPUT via pymavlink UART | None; GPS positions computed but not sent anywhere |
| FC integration | pymavlink over UART | Not present |
| CUDA streams | Dual CUDA streams (Stream A/B) | Not present |
| Deployment | Jetson Orin Nano Super, systemd service | Local dev server (uvicorn, SQLite) |
| Auth | JWT on all endpoints | Not implemented |

The code is TRL ~2 for the actual target system. It is a functional prototype of the processing logic with all AI inference mocked.

---

*Architecture analysis: 2026-04-01*
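
The three-state tracking machine described under Key Abstractions can be sketched as a pure transition function. This is a minimal illustration, not the code in `processor.py`: the `next_state` helper, its parameters, and the enum string values are hypothetical, and it assumes that a LOST flight always moves to RECOVERY on the next frame and leaves RECOVERY only when the chunk is aligned.

```python
from enum import Enum


class TrackingState(Enum):
    # State names come from the document; the string values are illustrative.
    NORMAL = "normal"
    LOST = "lost"
    RECOVERY = "recovery"


def next_state(state: TrackingState, vo_ok: bool, chunk_aligned: bool = False) -> TrackingState:
    """Hypothetical transition function for one processed frame.

    vo_ok:         whether visual odometry succeeded on this frame
    chunk_aligned: whether chunk recovery anchored the chunk to satellite imagery
    """
    if state is TrackingState.NORMAL:
        # VO failure drops the flight into LOST; otherwise tracking continues.
        return TrackingState.NORMAL if vo_ok else TrackingState.LOST
    if state is TrackingState.LOST:
        # Assumption: the frame after a loss always enters RECOVERY.
        return TrackingState.RECOVERY
    # RECOVERY: return to NORMAL only once the chunk has been anchored.
    return TrackingState.NORMAL if chunk_aligned else TrackingState.RECOVERY
```

Keeping the transition logic free of I/O makes each branch easy to unit-test independently of the pipeline components.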
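
The cache-first satellite tile fetch from the Data Flow section can be illustrated with a standard-library sketch. To stay self-contained it swaps in a plain `dict` for `diskcache` and an injected `download` coroutine for the httpx call, and it catches `OSError` where the real code is described as catching `httpx.HTTPError`; `CacheFirstTileFetcher`, `TileKey`, and `download` are hypothetical names, not the API of `SatelliteDataManager`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Sequence, Tuple

TileKey = Tuple[int, int, int]  # (zoom, x, y); hypothetical key layout


class CacheFirstTileFetcher:
    """Illustrative stand-in: in-memory dict instead of diskcache."""

    def __init__(self, download: Callable[[TileKey], Awaitable[bytes]]) -> None:
        self._download = download
        self._cache: Dict[TileKey, bytes] = {}

    async def fetch_tile(self, key: TileKey) -> Optional[bytes]:
        # 1. Check the cache first.
        cached = self._cache.get(key)
        if cached is not None:
            return cached
        # 2. On a miss, download; a failed download is treated as a missing tile.
        try:
            data = await self._download(key)
        except OSError:
            return None
        # 3. Store the result so later requests skip the network.
        self._cache[key] = data
        return data

    async def fetch_tile_grid(self, keys: Sequence[TileKey]) -> List[Optional[bytes]]:
        # 4. Fetch several tiles concurrently; results keep the input order.
        return list(await asyncio.gather(*(self.fetch_tile(k) for k in keys)))
```

Injecting the downloader keeps the caching behaviour testable without network access; the real implementation would additionally decode the bytes into a numpy array before caching.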
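
The combination of ABC interfaces, component injection, and exception swallowing described above might look roughly like the following. Only the names `ISequentialVisualOdometry`, `compute_relative_pose`, and `attach_components` come from the document; the signatures, the `StubVisualOdometry` class, and the reduced `Orchestrator` (standing in for `FlightProcessor`) are assumptions made for illustration.

```python
import logging
from abc import ABC, abstractmethod
from typing import Any, Optional

logger = logging.getLogger(__name__)


class ISequentialVisualOdometry(ABC):
    """Interface name from the document; the signature here is simplified."""

    @abstractmethod
    def compute_relative_pose(self, prev: Any, curr: Any, cam: Any) -> Optional[dict]:
        """Return a relative pose, or None when no pose could be computed."""


class StubVisualOdometry(ISequentialVisualOdometry):
    """Hypothetical concrete implementation returning a fixed pose."""

    def compute_relative_pose(self, prev: Any, curr: Any, cam: Any) -> Optional[dict]:
        return {"translation": (1.0, 0.0, 0.0)}


class Orchestrator:
    """Hypothetical, much-reduced stand-in for FlightProcessor."""

    def __init__(self) -> None:
        self._vo: Optional[ISequentialVisualOdometry] = None

    def attach_components(self, vo: ISequentialVisualOdometry) -> None:
        # Components are built elsewhere (e.g. at app startup) and injected here.
        self._vo = vo

    def process_frame(self, prev: Any, curr: Any, cam: Any) -> bool:
        """Return vo_ok: True if VO produced a pose, False on any failure."""
        if self._vo is None:
            raise RuntimeError("components not attached")
        try:
            pose = self._vo.compute_relative_pose(prev, curr, cam)
        except Exception as exc:
            # Non-fatal: log and let the state machine react to vo_ok = False.
            logger.warning("VO failed: %s", exc)
            return False
        return pose is not None
```

Because the orchestrator depends only on the interface, a real engine can later replace the stub without touching `process_frame`.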