Files
gps-denied-onboard/src/gps_denied/app.py
T
Yuzviak 0bb94da3c4 feat(01-08): rewire app.py lifespan and deps.py to use build_pipeline
- app.py: replace inline component wiring with build_pipeline(env=cfg.env)
  - Store processor as app.state.processor (and backwards-compat pipeline_components)
  - RuntimeConfig replaces get_settings(); MAVLink stop() on shutdown
- deps.py: get_flight_processor prefers app.state.processor from lifespan
  - Falls back to build_pipeline() for test contexts without lifespan
  - Per-request repo/streamer swap preserved
2026-05-11 09:04:56 +03:00

77 lines
2.1 KiB
Python

"""FastAPI application factory."""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from gps_denied import __version__
from gps_denied.api.routers import flights
from gps_denied.config import RuntimeConfig
from gps_denied.pipeline import build_pipeline
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialise core pipeline components on startup via build_pipeline."""
cfg = RuntimeConfig()
processor = build_pipeline(env=cfg.env, config=cfg)
# Retrieve MAVLink bridge from processor internals for lifecycle management
mavlink = processor._mavlink
app.state.processor = processor
app.state.config = cfg
# Keep backwards-compat key so any code reading pipeline_components still works
app.state.pipeline_components = {
"vo": processor._vo,
"gpr": processor._gpr,
"metric": processor._metric,
"graph": processor._graph,
"recovery": processor._recovery,
"chunk_mgr": processor._chunk_mgr,
"rotation": processor._rotation,
"coord": processor._coord,
"satellite": processor._satellite,
"mavlink": mavlink,
}
app.state.eskf_config = processor._eskf_config
logger.info(
"Pipeline ready — env=%s, MAVLink: %s, tiles: %s",
cfg.env, cfg.mavlink.connection, cfg.satellite.tile_dir,
)
yield
# Cleanup MAVLink on shutdown
if mavlink is not None:
try:
await mavlink.stop()
except Exception:
pass
app.state.pipeline_components = None
def create_app() -> FastAPI:
"""Factory function to create and configure the FastAPI application."""
app = FastAPI(
title="GPS-Denied Onboard API",
description="REST API for UAV Flight Processing in GPS-denied environments.",
version=__version__,
lifespan=lifespan,
)
app.include_router(flights.router)
@app.get("/health", tags=["Health"])
async def health() -> dict[str, str]:
"""Simple health check endpoint."""
return {"status": "ok"}
return app
app = create_app()