"""REST and Server-Sent Events (SSE) endpoints of the AURA Geolocalization API."""

import asyncio
import json
import logging
import time
from typing import Optional, Set

import uvicorn
from fastapi import FastAPI, Request
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("RestSseService")

app = FastAPI(title="AURA Geolocalization API")

# Ingress queue: producers (see publish_pose_to_stream) put pose payloads here.
# A single asyncio.Queue hands each item to exactly ONE getter, so SSE clients
# must not read from it directly; _fan_out_poses copies every item into the
# per-client queues below so that all connected clients see every pose.
pose_event_queue = asyncio.Queue()

# One private queue per connected SSE client.
_subscriber_queues: Set[asyncio.Queue] = set()

# Background task running _fan_out_poses; only alive while clients are connected.
_fan_out_task: Optional[asyncio.Task] = None


class UserInputFix(BaseModel):
    """Request body of ``POST /api/v1/user_fix``: a user-supplied position for one frame."""

    frame_id: int
    # Reject coordinates outside the valid geodetic range at the API boundary
    # instead of letting them through as a "hard constraint".
    lat: float = Field(..., ge=-90.0, le=90.0)
    lon: float = Field(..., ge=-180.0, le=180.0)


async def _fan_out_poses() -> None:
    """Forward every pose from ``pose_event_queue`` to all subscriber queues."""
    while True:
        pose_data = await pose_event_queue.get()
        # No await inside this loop, so the subscriber set cannot change while
        # it is iterated; the queues are unbounded, so put_nowait cannot fail.
        for subscriber in _subscriber_queues:
            subscriber.put_nowait(pose_data)


def _subscribe() -> asyncio.Queue:
    """Register a new per-client queue and make sure the fan-out task runs."""
    global _fan_out_task
    subscriber = asyncio.Queue()
    _subscriber_queues.add(subscriber)
    if _fan_out_task is None or _fan_out_task.done():
        _fan_out_task = asyncio.create_task(_fan_out_poses())
    return subscriber


def _unsubscribe(subscriber: asyncio.Queue) -> None:
    """Remove a per-client queue; stop the fan-out task when nobody listens."""
    global _fan_out_task
    _subscriber_queues.discard(subscriber)
    if not _subscriber_queues and _fan_out_task is not None:
        # With no listeners, leave further poses buffered in pose_event_queue
        # (as before) rather than draining and dropping them.
        _fan_out_task.cancel()
        _fan_out_task = None


@app.post("/api/v1/start")
async def start_pipeline():
    """Starts the ingestion and processing pipeline."""
    logger.info("Pipeline started via REST API.")
    # Integration trigger to start the main AI loop goes here
    return {"status": "SUCCESS", "message": "Processing pipeline initialized."}


@app.post("/api/v1/user_fix")
async def user_fix(fix: UserInputFix):
    """Accepts user input for location when system is absolutely incapable (AC-6)."""
    logger.info(
        "Received user fix for frame %s: Lat %s, Lon %s",
        fix.frame_id,
        fix.lat,
        fix.lon,
    )
    # Pass this hard geodetic constraint to the Factor Graph Optimizer
    return {"status": "SUCCESS", "message": f"User fix applied for frame {fix.frame_id}."}


@app.get("/api/v1/stream")
async def stream_poses(request: Request):
    """
    Server-Sent Events (SSE) endpoint for real-time streaming of poses.
    Fulfills the requirement for immediate results and refined results.

    Every connected client receives every pose published while it is
    connected. Poses published while no client is connected stay in
    ``pose_event_queue`` and are delivered to whichever clients are connected
    when the fan-out task next runs. Poses already copied to a client's
    private queue are discarded when that client disconnects.
    """

    async def event_generator():
        subscriber = _subscribe()
        try:
            while True:
                if await request.is_disconnected():
                    logger.info("Client disconnected from SSE stream.")
                    break

                # Wait for the next pose event from the processing engine
                pose_data = await subscriber.get()

                yield {
                    "event": "pose_update",
                    "id": str(pose_data.get("frame_id")),
                    "retry": 15000,
                    "data": json.dumps(pose_data),
                }
        finally:
            # Runs on normal exit, on cancellation, and when the generator is
            # closed, so a departed client never keeps accumulating poses.
            _unsubscribe(subscriber)

    return EventSourceResponse(event_generator())


async def publish_pose_to_stream(
    frame_id: int,
    lat: float,
    lon: float,
    confidence: float,
    is_refined: bool = False,
) -> None:
    """Helper function to push a new pose into the SSE queue from the Optimizer.

    The payload carries ``frame_id``, ``gps`` as ``[lat, lon]``, ``confidence``,
    ``type`` (``"REFINED"`` when ``is_refined`` is true, else ``"ESTIMATED"``)
    and ``timestamp`` (``time.time()`` at publish time).
    """
    payload = {
        "frame_id": frame_id,
        "gps": [lat, lon],
        "confidence": confidence,
        "type": "REFINED" if is_refined else "ESTIMATED",
        "timestamp": time.time(),
    }
    await pose_event_queue.put(payload)


if __name__ == "__main__":
    # Run the background service (typically deployed via a worker process)
    uvicorn.run(app, host="0.0.0.0", port=8000)