mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 23:46:37 +00:00
73 lines
2.6 KiB
Python
73 lines
2.6 KiB
Python
import asyncio
|
|
import json
|
|
import time
|
|
import logging
|
|
from fastapi import FastAPI, Request
|
|
from sse_starlette.sse import EventSourceResponse
|
|
from pydantic import BaseModel
|
|
import uvicorn
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger("RestSseService")
|
|
|
|
app = FastAPI(title="AURA Geolocalization API")
|
|
|
|
# Shared asynchronous event queue for SSE broadcasting
|
|
pose_event_queue = asyncio.Queue()
|
|
|
|
class UserInputFix(BaseModel):
|
|
frame_id: int
|
|
lat: float
|
|
lon: float
|
|
|
|
@app.post("/api/v1/start")
|
|
async def start_pipeline():
|
|
"""Starts the ingestion and processing pipeline."""
|
|
logger.info("Pipeline started via REST API.")
|
|
# Integration trigger to start the main AI loop goes here
|
|
return {"status": "SUCCESS", "message": "Processing pipeline initialized."}
|
|
|
|
@app.post("/api/v1/user_fix")
|
|
async def user_fix(fix: UserInputFix):
|
|
"""Accepts user input for location when system is absolutely incapable (AC-6)."""
|
|
logger.info(f"Received user fix for frame {fix.frame_id}: Lat {fix.lat}, Lon {fix.lon}")
|
|
# Pass this hard geodetic constraint to the Factor Graph Optimizer
|
|
return {"status": "SUCCESS", "message": f"User fix applied for frame {fix.frame_id}."}
|
|
|
|
@app.get("/api/v1/stream")
|
|
async def stream_poses(request: Request):
|
|
"""
|
|
Server-Sent Events (SSE) endpoint for real-time streaming of poses.
|
|
Fulfills the requirement for immediate results and refined results.
|
|
"""
|
|
async def event_generator():
|
|
while True:
|
|
if await request.is_disconnected():
|
|
logger.info("Client disconnected from SSE stream.")
|
|
break
|
|
|
|
# Wait for the next pose event from the processing engine
|
|
pose_data = await pose_event_queue.get()
|
|
yield {
|
|
"event": "pose_update",
|
|
"id": str(pose_data.get("frame_id")),
|
|
"retry": 15000,
|
|
"data": json.dumps(pose_data)
|
|
}
|
|
|
|
return EventSourceResponse(event_generator())
|
|
|
|
async def publish_pose_to_stream(frame_id: int, lat: float, lon: float, confidence: float, is_refined: bool = False):
|
|
"""Helper function to push a new pose into the SSE queue from the Optimizer."""
|
|
payload = {
|
|
"frame_id": frame_id,
|
|
"gps": [lat, lon],
|
|
"confidence": confidence,
|
|
"type": "REFINED" if is_refined else "ESTIMATED",
|
|
"timestamp": time.time()
|
|
}
|
|
await pose_event_queue.put(payload)
|
|
|
|
if __name__ == "__main__":
|
|
# Run the background service (typically deployed via a worker process)
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |