Files
gps-denied-onboard/zmq_background_service.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

94 lines
3.3 KiB
Python

import zmq
import json
import time
import logging
from threading import Thread
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ZmqBackgroundService")
class AuraBackgroundService:
"""
Headless ZeroMQ Service for AURA.
Handles REP/REQ for commands/user input and PUB for asynchronous streaming of results.
"""
def __init__(self, rep_port: int = 5555, pub_port: int = 5556):
self.context = zmq.Context()
# Command Socket (REP)
self.rep_socket = self.context.socket(zmq.REP)
self.rep_socket.bind(f"tcp://*:{rep_port}")
# Data Stream Socket (PUB)
self.pub_socket = self.context.socket(zmq.PUB)
self.pub_socket.bind(f"tcp://*:{pub_port}")
self.running = False
self.processing_thread = None
def start(self):
self.running = True
self.processing_thread = Thread(target=self._listen_for_commands, daemon=True)
self.processing_thread.start()
logger.info("AURA ZeroMQ Background Service Started.")
def stop(self):
self.running = False
if self.processing_thread:
self.processing_thread.join()
self.rep_socket.close()
self.pub_socket.close()
self.context.term()
logger.info("Service Stopped.")
def _listen_for_commands(self):
while self.running:
try:
# Non-blocking receive
if self.rep_socket.poll(100):
message = self.rep_socket.recv_json()
response = self._handle_command(message)
self.rep_socket.send_json(response)
except Exception as e:
logger.error(f"Error processing command: {e}")
def _handle_command(self, msg: dict) -> dict:
cmd = msg.get("cmd")
logger.info(f"Received command: {cmd}")
if cmd == "START":
# In a real integration, this connects to the ingest pipeline
return {"status": "SUCCESS", "message": "Pipeline started"}
elif cmd == "USER_FIX":
# Fulfills AC-6: User Input for the next image location
lat, lon = msg.get("lat"), msg.get("lon")
frame_id = msg.get("frame_id")
# Pass hard constraint to Factor Graph Optimizer here
return {"status": "SUCCESS", "message": f"User fix applied at frame {frame_id}"}
elif cmd == "STATUS":
return {"status": "SUCCESS", "state": "ACTIVE"}
return {"status": "ERROR", "message": "Unknown command"}
def publish_pose(self, frame_id: int, lat: float, lon: float, confidence: float, is_refined: bool = False):
"""
Streams results asynchronously. Fulfills AC-8 (Immediate results + async refinement).
"""
payload = {
"frame_id": frame_id,
"gps": [lat, lon],
"confidence": confidence,
"type": "REFINED" if is_refined else "ESTIMATED",
"timestamp": time.time()
}
self.pub_socket.send_string(f"POSE {json.dumps(payload)}")
if __name__ == "__main__":
service = AuraBackgroundService()
service.start()
try:
while True: time.sleep(1)
except KeyboardInterrupt:
service.stop()