Architecture
Analysis Date: 2026-04-01
Pattern Overview
Overall: Layered async service with component-injected processing pipeline
Key Characteristics:
- FastAPI HTTP layer delegates all logic to a singleton FlightProcessor orchestrator
- Core processing components are instantiated at app startup via lifespan and injected via attach_components()
- All components define ABC interfaces (ISequentialVisualOdometry, IFactorGraphOptimizer, etc.) with a single concrete implementation, enabling future substitution
- All inference engines are mocked behind IModelManager/MockInferenceEngine; no real GPU/TRT execution exists in code yet
- Database layer is async SQLAlchemy (aiosqlite default) with a thin FlightRepository DAO
- SSE streaming is fully wired: per-flight async queues, EventSourceResponse at GET /flights/{id}/stream
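The interface-plus-injection pattern listed above can be illustrated with a minimal sketch. The names ISequentialVisualOdometry, FlightProcessor, attach_components and compute_relative_pose come from this document; the method bodies, the StubVisualOdometry class and the returned dictionary are assumptions made for illustration and are not taken from the repository.

```python
from abc import ABC, abstractmethod


class ISequentialVisualOdometry(ABC):
    """Interface name from the document; the exact signature is assumed."""

    @abstractmethod
    def compute_relative_pose(self, prev, curr, cam):
        ...


class StubVisualOdometry(ISequentialVisualOdometry):
    """Hypothetical stand-in implementation, used only in this sketch."""

    def compute_relative_pose(self, prev, curr, cam):
        # Pretend the camera did not move between the two frames.
        return {"dx": 0.0, "dy": 0.0, "dz": 0.0}


class FlightProcessor:
    """Orchestrator that receives its components after construction."""

    def __init__(self):
        self._vo = None

    def attach_components(self, vo: ISequentialVisualOdometry) -> None:
        # In the real app this would be called once from the lifespan hook.
        self._vo = vo

    def process_frame(self, prev, curr, cam=None):
        if self._vo is None:
            raise RuntimeError("components not attached")
        return self._vo.compute_relative_pose(prev, curr, cam)
```

Because the processor only depends on the abstract interface, a real implementation could later replace the stub without touching the orchestration code.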
Layers
API Layer:
- Purpose: HTTP request routing, validation, auth-free (no JWT in code despite spec)
- Location: src/gps_denied/api/
- Contains: FastAPI routers, dependency injection wiring, deps.py singletons
- Depends on: FlightProcessor, FlightRepository, SSEEventStreamer
- Used by: External callers, other onboard systems
Orchestration Layer:
- Purpose: Manages per-flight state machine, invokes pipeline components in sequence
- Location: src/gps_denied/core/processor.py
- Contains: FlightProcessor, TrackingState enum (NORMAL/LOST/RECOVERY), FrameResult
- Depends on: All core components, FlightRepository, SSEEventStreamer
- Used by: API layer via dependency injection
Core Pipeline Components:
- Purpose: Individual processing stages, each behind an interface
- Location: src/gps_denied/core/
- Contains: ImageInputPipeline, SequentialVisualOdometry, GlobalPlaceRecognition, MetricRefinement, FactorGraphOptimizer, RouteChunkManager, FailureRecoveryCoordinator, ImageRotationManager, CoordinateTransformer, ResultManager, SSEEventStreamer, SatelliteDataManager, ModelManager
- Depends on: IModelManager, FlightRepository (some), schemas
- Used by: FlightProcessor
Inference Layer:
- Purpose: AI model lifecycle and inference dispatch
- Location: src/gps_denied/core/models.py
- Contains: IModelManager, ModelManager, MockInferenceEngine
- Depends on: schemas/model.py
- Used by: SequentialVisualOdometry, GlobalPlaceRecognition, MetricRefinement
Database Layer:
- Purpose: Async persistence, all SQL via SQLAlchemy ORM
- Location: src/gps_denied/db/
- Contains: FlightRepository, ORM models (FlightRow, WaypointRow, GeofenceRow, FlightStateRow, FrameResultRow, HeadingRow, ImageRow, ChunkRow)
- Depends on: SQLAlchemy async engine
- Used by: FlightProcessor, ResultManager, API deps
Schema Layer:
- Purpose: Pydantic models for validation and inter-component data contracts
- Location: src/gps_denied/schemas/
- Contains: Domain models (GPSPoint, CameraParameters), request/response schemas, VO/GPR/metric/satellite/rotation/chunk schemas, SSE event types
- Depends on: Nothing internal
- Used by: All layers
Data Flow
Frame Processing (primary path):
- Client uploads image batch → POST /flights/{id}/images/batch
- Router spawns asyncio.create_task(_process_batch()), returns 202 immediately
- _process_batch calls processor.process_frame(flight_id, frame_id, image) per image
- FlightProcessor.process_frame:
  a. Calls SequentialVisualOdometry.compute_relative_pose(prev, curr, cam)
  b. If VO succeeds: adds relative factor to FactorGraphOptimizer
  c. State machine: NORMAL → LOST (on VO failure) → RECOVERY → NORMAL (on recovery)
  d. On RECOVERY: FailureRecoveryCoordinator.process_chunk_recovery() calls GPR + MetricRefinement
  e. In NORMAL: calls GlobalPlaceRecognition.retrieve_candidate_tiles() then MetricRefinement.align_to_satellite()
  f. Runs incremental FactorGraphOptimizer.optimize()
  g. Publishes FrameResult via SSEEventStreamer.push_event()
- SSE clients receive real-time frame events
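The "spawn a task, return 202 immediately" behaviour described above can be sketched with plain asyncio. This is an illustrative model only: the function bodies, the dictionary standing in for the HTTP response, and the events queue standing in for the SSE streamer are assumptions, not the router's actual code.

```python
import asyncio


async def process_frame(flight_id, frame_id, image, events):
    """Stand-in for processor.process_frame(); it only publishes an event."""
    await asyncio.sleep(0)  # yield control, as real async work would
    await events.put({"flight_id": flight_id, "frame_id": frame_id})


async def _process_batch(flight_id, images, events):
    # Frames are processed strictly in upload order.
    for frame_id, image in enumerate(images):
        await process_frame(flight_id, frame_id, image, events)


async def upload_image_batch(flight_id, images, events):
    """Schedules the batch and returns at once, like the 202 response."""
    task = asyncio.create_task(_process_batch(flight_id, images, events))
    return {"status": 202, "accepted": len(images)}, task


async def demo():
    events = asyncio.Queue()
    response, task = await upload_image_batch("f1", [b"a", b"b", b"c"], events)
    pending_at_return = not task.done()  # work has not finished yet
    await task
    received = [events.get_nowait()["frame_id"] for _ in range(events.qsize())]
    return response, pending_at_return, received
```

Running asyncio.run(demo()) shows that the response is available while the background task is still pending, and that events arrive in frame order once the task completes.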
Tracking Loss / Chunk Recovery:
- VO fails → processor._flight_states[id] = LOST
- FailureRecoveryCoordinator.handle_tracking_lost() creates new chunk via RouteChunkManager
- Next frame enters RECOVERY: process_chunk_recovery() runs GPR on chunk images
- GPR finds candidate tiles → MetricRefinement.align_chunk_to_satellite() computes homography
- If aligned: chunk anchored, state → NORMAL
- If not aligned: chunk stays UNANCHORED, state stays RECOVERY
Satellite Tile Fetch:
- SatelliteDataManager.fetch_tile() checks diskcache first
- On miss: fetches from https://mt1.google.com/vt/lyrs=s&x=... via httpx
- Decoded to numpy array, stored in diskcache
- fetch_tile_grid() and prefetch_route_corridor() do parallel async fetches
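A cache-first fetch with parallel grid loading, as outlined above, might look like the following sketch. It deliberately avoids httpx and diskcache: an in-memory dict stands in for the disk cache and the download coroutine is injected, so the class name TileCache, the TileKey tuple and the download callback are all hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional

TileKey = tuple[int, int, int]  # (zoom, x, y); an assumed key layout


class TileCache:
    """Cache-first tile fetcher; a dict stands in for diskcache."""

    def __init__(self, download: Callable[[TileKey], Awaitable[Optional[bytes]]]):
        self._download = download
        self._cache: dict[TileKey, bytes] = {}

    async def fetch_tile(self, key: TileKey) -> Optional[bytes]:
        if key in self._cache:  # cache hit: no network call
            return self._cache[key]
        data = await self._download(key)
        if data is not None:  # a failed download is treated as a missing tile
            self._cache[key] = data
        return data

    async def fetch_tile_grid(self, keys: list[TileKey]) -> dict[TileKey, Optional[bytes]]:
        # Fetch all tiles concurrently; gather preserves input order.
        tiles = await asyncio.gather(*(self.fetch_tile(k) for k in keys))
        return dict(zip(keys, tiles))
```

Returning None for a failed download, rather than raising, mirrors the "tile treated as missing" behaviour noted under Error Handling.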
State Management:
- Per-flight tracking state held in FlightProcessor._flight_states: dict[str, TrackingState]
- Per-flight previous frame cache in FlightProcessor._prev_images: dict[str, np.ndarray]
- Per-flight chunk state in RouteChunkManager._chunks: dict[str, dict[str, ChunkHandle]]
- Per-flight factor graph in FactorGraphOptimizer._flights_state: dict[str, dict]
- Per-flight SSE queues in SSEEventStreamer._streams: dict[str, dict[str, Queue]]
- All persistent state (waypoints, frame results, flight metadata) in SQLite via FlightRepository
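The nested dict[str, dict[str, Queue]] layout used for SSE queues can be sketched as below. Only the _streams shape and the push_event name come from this document; the class name InMemoryEventStreamer, the subscribe/unsubscribe methods and the client_id key are assumptions for illustration.

```python
import asyncio
from collections import defaultdict


class InMemoryEventStreamer:
    """Fan-out of events to every subscriber queue of one flight."""

    def __init__(self):
        # flight_id -> client_id -> queue
        self._streams = defaultdict(dict)

    def subscribe(self, flight_id: str, client_id: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._streams[flight_id][client_id] = queue
        return queue

    def unsubscribe(self, flight_id: str, client_id: str) -> None:
        self._streams[flight_id].pop(client_id, None)

    def push_event(self, flight_id: str, event: dict) -> int:
        """Deliver the event to each subscriber; return how many received it."""
        queues = self._streams.get(flight_id, {})
        for queue in queues.values():
            queue.put_nowait(event)
        return len(queues)
```

Since all of this state lives in process memory, it is lost on restart, unlike the data persisted through FlightRepository.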
Key Abstractions
TrackingState (State Machine):
- Purpose: Three-state machine per flight controlling pipeline branch selection
- Location:
src/gps_denied/core/processor.py - States: NORMAL (VO active + drift correction) → LOST (VO failed, chunk created) → RECOVERY (GPR + metric) → NORMAL
- Note: Simplified vs. documented 5-state design; no IMU-only prediction state
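A compact way to read the three-state machine is as a pure transition function. The sketch below encodes one interpretation of the transitions described in this document (LOST always advances to RECOVERY on the next frame; RECOVERY persists until a chunk is aligned). The enum values and the next_state helper are hypothetical and are not copied from processor.py.

```python
from enum import Enum


class TrackingState(Enum):
    NORMAL = "normal"
    LOST = "lost"
    RECOVERY = "recovery"


def next_state(state: TrackingState, vo_ok: bool, chunk_aligned: bool = False) -> TrackingState:
    """Return the state for the next frame under the assumed transition rules."""
    if state is TrackingState.NORMAL:
        # A visual-odometry failure is the only way out of NORMAL.
        return TrackingState.NORMAL if vo_ok else TrackingState.LOST
    if state is TrackingState.LOST:
        # The frame after a loss always enters recovery.
        return TrackingState.RECOVERY
    # RECOVERY: stay until the chunk is anchored to satellite imagery.
    return TrackingState.NORMAL if chunk_aligned else TrackingState.RECOVERY
```

Keeping the transition logic free of side effects makes it straightforward to unit-test each branch independently of the pipeline components.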
IModelManager / MockInferenceEngine:
- Purpose: Decouples inference calls from model backend; enables mock-first development
- Location:
src/gps_denied/core/models.py - Pattern: All models auto-loaded as
MockInferenceEnginewhen first accessed; no real TRT/ONNX loading - Mock models: SuperPoint (500 random features), LightGlue (100 random matches), DINOv2 (4096-dim random descriptor), LiteSAM (random homography, 80% match probability)
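The load-on-first-access pattern can be sketched as follows. The output sizes (500 features, 100 matches, 4096 dimensions) follow the mock models listed above; the method names infer and get_inference_engine, the output formats, and the seeded random generator are assumptions, and LiteSAM is omitted to keep the sketch short.

```python
import random


class MockInferenceEngine:
    """Returns random data shaped like the listed mock outputs."""

    def __init__(self, name: str, seed: int = 0):
        self.name = name
        self._rng = random.Random(seed)  # seeded so runs are repeatable

    def infer(self, _input=None):
        if self.name == "SuperPoint":
            # 500 random (x, y) keypoints in normalised coordinates
            return [(self._rng.random(), self._rng.random()) for _ in range(500)]
        if self.name == "LightGlue":
            # 100 random index pairs into two 500-feature sets
            return [(self._rng.randrange(500), self._rng.randrange(500)) for _ in range(100)]
        if self.name == "DINOv2":
            # one 4096-dimensional descriptor
            return [self._rng.random() for _ in range(4096)]
        raise KeyError(f"no mock defined for {self.name}")


class ModelManager:
    """Creates a mock engine the first time a model is requested."""

    def __init__(self):
        self._engines = {}

    def get_inference_engine(self, name: str) -> MockInferenceEngine:
        if name not in self._engines:
            self._engines[name] = MockInferenceEngine(name)
        return self._engines[name]
```

Swapping in a real backend would then only require changing what the manager constructs, provided the real engine exposes the same call surface.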
ChunkHandle / RouteChunkManager:
- Purpose: Represents a disconnected trajectory segment between tracking losses
- Location:
src/gps_denied/core/chunk_manager.py - Lifecycle: UNANCHORED → MATCHING → ANCHORED or UNANCHORED → MERGED
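One possible reading of the lifecycle line is that a failed match returns a chunk to UNANCHORED while a successful one leads to ANCHORED and then MERGED. The sketch below encodes that reading as a transition table; the ChunkStatus enum, the ALLOWED table and the advance helper are hypothetical, and the real chunk manager may permit different transitions.

```python
from enum import Enum


class ChunkStatus(Enum):
    UNANCHORED = "unanchored"
    MATCHING = "matching"
    ANCHORED = "anchored"
    MERGED = "merged"


# Assumed transition table; derived from the lifecycle line, not from the code.
ALLOWED = {
    ChunkStatus.UNANCHORED: {ChunkStatus.MATCHING},
    ChunkStatus.MATCHING: {ChunkStatus.ANCHORED, ChunkStatus.UNANCHORED},
    ChunkStatus.ANCHORED: {ChunkStatus.MERGED},
    ChunkStatus.MERGED: set(),  # terminal state
}


def advance(current: ChunkStatus, target: ChunkStatus) -> ChunkStatus:
    """Return target if the transition is allowed, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target
```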
FactorGraphOptimizer:
- Purpose: Maintains per-flight pose graph with relative (VO) and absolute (GPS/satellite) factors
- Location:
src/gps_denied/core/graph.py - Reality: GTSAM import is optional (
try: import gtsam); concrete implementation is a mock using simple vector arithmetic
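The optional-import-with-fallback arrangement can be sketched like this. Only the try: import gtsam guard is taken from this document; the VectorPoseGraph class, its method names, and the rule that an absolute factor shifts the corrected pose and all later poses are assumptions chosen to illustrate "simple vector arithmetic". No GTSAM API is used here.

```python
try:
    import gtsam  # optional dependency; not required for this sketch
except ImportError:
    gtsam = None

HAS_GTSAM = gtsam is not None


class VectorPoseGraph:
    """Fallback pose chain built from plain tuple arithmetic."""

    def __init__(self, origin=(0.0, 0.0, 0.0)):
        self._poses = [tuple(origin)]

    def add_relative_factor(self, delta):
        # Append a pose by adding the relative translation to the last pose.
        last = self._poses[-1]
        self._poses.append(tuple(a + b for a, b in zip(last, delta)))

    def add_absolute_factor(self, index, position):
        # Snap pose `index` to the measured position and shift later poses
        # by the same correction (an assumed, simplistic drift fix).
        correction = tuple(p - q for p, q in zip(position, self._poses[index]))
        for i in range(index, len(self._poses)):
            self._poses[i] = tuple(a + c for a, c in zip(self._poses[i], correction))

    def trajectory(self):
        return list(self._poses)
```

A real factor-graph backend would weight relative and absolute factors by their uncertainty instead of applying a rigid shift.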
Entry Points
Application startup:
- Location: src/gps_denied/app.py
- Triggers: uvicorn or python -m gps_denied (via src/gps_denied/__main__.py)
- Responsibilities: Creates FastAPI app, registers /flights router, wires lifespan (instantiates all pipeline components, stores on app.state.pipeline_components)
Frame processing:
- Location: src/gps_denied/api/routers/flights.py → upload_image_batch
- Triggers: POST /flights/{id}/images/batch multipart form
- Responsibilities: Validates batch, spawns background task, each frame calls processor.process_frame()
SSE stream:
- Location: src/gps_denied/api/routers/flights.py → create_sse_stream
- Triggers: GET /flights/{id}/stream
- Responsibilities: Returns EventSourceResponse wrapping async generator from SSEEventStreamer
Error Handling
Strategy: Exception swallowing in processor with logger.warning; most component failures are non-fatal
Patterns:
- VO failure: caught with except Exception as exc, logged, vo_ok = False → state machine handles
- Drift correction failure: caught with except Exception as exc, logged, frame continues without correction
- HTTP errors in satellite fetching: httpx.HTTPError caught, returns None (tile treated as missing)
- DB not-found: returns None, router converts to HTTP 404
- Batch upload errors: HTTP 422 with detail string
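The log-and-continue pattern behind the first two items can be expressed as a small wrapper. The run_stage helper and its (ok, value) return shape are hypothetical; in the codebase the try/except blocks are described as inline in the processor.

```python
import logging

logger = logging.getLogger(__name__)


def run_stage(name, stage, *args, default=None):
    """Run one pipeline stage; log and swallow any exception.

    Returns (ok, value) so the caller can branch on success, for example
    to set vo_ok = False and let the state machine react.
    """
    try:
        return True, stage(*args)
    except Exception as exc:  # deliberately broad, matching the described pattern
        logger.warning("%s failed: %s", name, exc)
        return False, default
```

The trade-off of catching Exception this broadly is that programming errors are logged at warning level alongside expected failures, which can hide real bugs.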
Cross-Cutting Concerns
Logging: Standard logging.getLogger(__name__) in every module; no structured logging and no log-level configuration in code
Validation: Pydantic models at API boundary; no internal validation between pipeline components
Authentication: Documented as JWT in solution spec; not implemented in code — no auth middleware, no JWT verification on any endpoint
Coordinate System: CoordinateTransformer (src/gps_denied/core/coordinates.py) handles ENU↔GPS conversion with real math; pixel_to_gps is a placeholder with fake scaling (1px = 0.1m)
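For orientation, an ENU↔GPS conversion for small offsets can be written with a local tangent-plane approximation on a spherical Earth. This is a generic textbook approximation and not necessarily the math used in coordinates.py; the function names, the spherical radius constant and the two-dimensional (east, north) scope are assumptions.

```python
import math

EARTH_RADIUS_M = 6_378_137.0  # WGS-84 equatorial radius, used as a sphere here


def enu_to_gps(origin_lat, origin_lon, east_m, north_m):
    """Convert a small east/north offset in metres to latitude/longitude in degrees."""
    lat = origin_lat + math.degrees(north_m / EARTH_RADIUS_M)
    lon = origin_lon + math.degrees(
        east_m / (EARTH_RADIUS_M * math.cos(math.radians(origin_lat)))
    )
    return lat, lon


def gps_to_enu(origin_lat, origin_lon, lat, lon):
    """Inverse of enu_to_gps under the same small-offset approximation."""
    north_m = math.radians(lat - origin_lat) * EARTH_RADIUS_M
    east_m = (
        math.radians(lon - origin_lon)
        * EARTH_RADIUS_M
        * math.cos(math.radians(origin_lat))
    )
    return east_m, north_m
```

The approximation degrades with distance from the origin and near the poles, so it is only suitable for offsets that are small relative to the Earth's radius.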
ESKF / MAVLink / cuVSLAM: Not present in code. The solution document specifies all three in detail, but the codebase contains none of them. The implemented architecture is a ground-processing post-flight pipeline (images uploaded via REST), not the real-time onboard ESKF+cuVSLAM system described in solution.md.
Divergence: Documented Design vs. Implemented Code
This is a critical architectural gap. The solution document describes a real-time embedded system; the code implements a batch REST processing service:
| Aspect | solution.md (documented) | Code (implemented) |
|---|---|---|
| Processing model | Real-time, 0.7fps camera stream | Batch HTTP upload, async background task |
| State estimator | ESKF (15-state, IMU-driven 5-10Hz) | FactorGraphOptimizer (mock GTSAM/pose graph) |
| Visual odometry | cuVSLAM Inertial mode | SuperPoint + LightGlue (mocked) via SequentialVisualOdometry |
| Satellite matching | LiteSAM/XFeat TRT Engine FP16 | LiteSAM via MockInferenceEngine (random homography) |
| Place recognition | Not mentioned as separate component | AnyLoc DINOv2 (GlobalPlaceRecognition, mocked) |
| GPS output | MAVLink GPS_INPUT via pymavlink UART | None — GPS positions computed but not sent anywhere |
| FC integration | pymavlink over UART | Not present |
| CUDA streams | Dual CUDA streams (Stream A/B) | Not present |
| Deployment | Jetson Orin Nano Super, systemd service | Local dev server (uvicorn, SQLite) |
| Auth | JWT on all endpoints | Not implemented |
The code is TRL ~2 for the actual target system. It is a functional prototype of the processing logic with all AI inference mocked.
Architecture analysis: 2026-04-01