mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 22:46:36 +00:00
94 lines
3.3 KiB
Python
94 lines
3.3 KiB
Python
import zmq
|
|
import json
|
|
import time
|
|
import logging
|
|
from threading import Thread
|
|
|
|
# Configure root logging once at import time so INFO-level service messages
# are emitted without further setup by the caller.
logging.basicConfig(level=logging.INFO)

# Module-wide logger shared by the service class below.
logger = logging.getLogger("ZmqBackgroundService")
|
|
|
|
class AuraBackgroundService:
    """Headless ZeroMQ service for AURA.

    A REP socket answers JSON commands (``START``, ``USER_FIX``, ``STATUS``)
    and a PUB socket streams pose results asynchronously as ``POSE <json>``
    strings.
    """

    def __init__(self, rep_port: int = 5555, pub_port: int = 5556):
        """Create the ZeroMQ context and bind both sockets on all interfaces.

        Args:
            rep_port: TCP port for the command (REP) socket.
            pub_port: TCP port for the result stream (PUB) socket.
        """
        self.context = zmq.Context()

        # Command socket (REP)
        self.rep_socket = self.context.socket(zmq.REP)
        self.rep_socket.bind(f"tcp://*:{rep_port}")

        # Data stream socket (PUB)
        self.pub_socket = self.context.socket(zmq.PUB)
        self.pub_socket.bind(f"tcp://*:{pub_port}")

        self.running = False
        self.processing_thread = None

    def start(self):
        """Start the command listener on a daemon thread.

        Calling this while a listener thread is already alive is a no-op, so
        two threads never poll the same REP socket.
        """
        worker = self.processing_thread
        if worker is not None and worker.is_alive():
            logger.warning("AURA ZeroMQ Background Service already running.")
            return

        self.running = True
        self.processing_thread = Thread(target=self._listen_for_commands, daemon=True)
        self.processing_thread.start()
        logger.info("AURA ZeroMQ Background Service Started.")

    def stop(self):
        """Stop the listener thread, then close both sockets and the context."""
        self.running = False
        # Join before closing so the listener never touches a closed socket.
        if self.processing_thread:
            self.processing_thread.join()
        self.rep_socket.close()
        self.pub_socket.close()
        self.context.term()
        logger.info("Service Stopped.")

    def _listen_for_commands(self):
        """Serve commands on the REP socket until ``self.running`` is cleared.

        The socket is polled with a 100 ms timeout so a ``stop()`` request is
        noticed promptly. Errors are logged and the loop keeps running.
        """
        while self.running:
            try:
                # Short poll instead of a blocking recv so the loop can exit.
                if not self.rep_socket.poll(100):
                    continue
                reply = self._receive_and_handle()
                self.rep_socket.send_json(reply)
            except Exception as exc:
                logger.error("Error processing command: %s", exc)

    def _receive_and_handle(self) -> dict:
        """Receive one request and return the reply to send for it.

        A REP socket must answer every request it has received before it can
        receive the next one. Payload decoding failures and handler failures
        are therefore turned into ``ERROR`` replies instead of propagating;
        otherwise one bad request would wedge the command channel. Transport
        errors raised by the receive itself still propagate to the caller.
        """
        try:
            message = self.rep_socket.recv_json()
        except ValueError as exc:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses: the frame arrived but is not valid JSON text.
            logger.error("Error processing command: %s", exc)
            return {"status": "ERROR", "message": "Invalid JSON request"}

        try:
            return self._handle_command(message)
        except Exception as exc:
            logger.error("Error processing command: %s", exc)
            return {"status": "ERROR", "message": "Internal error while processing command"}

    def _handle_command(self, msg: dict) -> dict:
        """Dispatch one decoded command message and build its reply.

        Args:
            msg: Decoded JSON request; its ``"cmd"`` key selects the action.

        Returns:
            A dict with ``"status"`` set to ``"SUCCESS"`` or ``"ERROR"`` plus
            a command-specific ``"message"`` or ``"state"`` entry.
        """
        # Valid JSON is not necessarily an object (it may be a list, string,
        # number, ...); reject those instead of failing on ``.get``.
        if not isinstance(msg, dict):
            return {"status": "ERROR", "message": "Malformed command: expected a JSON object"}

        cmd = msg.get("cmd")
        logger.info("Received command: %s", cmd)

        if cmd == "START":
            # In a real integration, this connects to the ingest pipeline
            return {"status": "SUCCESS", "message": "Pipeline started"}

        if cmd == "USER_FIX":
            # Fulfills AC-6: User Input for the next image location.
            # The request also carries "lat" and "lon"; pass them as a hard
            # constraint to the Factor Graph Optimizer here.
            frame_id = msg.get("frame_id")
            return {"status": "SUCCESS", "message": f"User fix applied at frame {frame_id}"}

        if cmd == "STATUS":
            return {"status": "SUCCESS", "state": "ACTIVE"}

        return {"status": "ERROR", "message": "Unknown command"}

    def publish_pose(self, frame_id: int, lat: float, lon: float, confidence: float, is_refined: bool = False):
        """Stream one pose result asynchronously on the PUB socket.

        Fulfills AC-8 (immediate results + async refinement).

        Args:
            frame_id: Identifier of the frame the pose belongs to.
            lat: Latitude, sent as the first element of ``"gps"``.
            lon: Longitude, sent as the second element of ``"gps"``.
            confidence: Confidence value forwarded unchanged.
            is_refined: When true the message type is ``"REFINED"``,
                otherwise ``"ESTIMATED"``.
        """
        payload = {
            "frame_id": frame_id,
            "gps": [lat, lon],
            "confidence": confidence,
            "type": "REFINED" if is_refined else "ESTIMATED",
            "timestamp": time.time(),
        }
        # The leading "POSE " is the prefix subscribers can filter on.
        self.pub_socket.send_string(f"POSE {json.dumps(payload)}")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the service with its default ports.
    service = AuraBackgroundService()
    service.start()
    try:
        # The listener runs on a daemon thread, so keep the main thread
        # alive until the user interrupts with Ctrl+C.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        service.stop()