import asyncio import json import socket import struct import threading import msgpack from msgpack import packb from rstream import Producer, Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext cimport constants from api_client cimport ApiClient from processor_command cimport FileCommand, ProcessorType from annotation cimport Annotation cdef class QueueConfig: cdef str host, cdef int port cdef str producer_user, producer_pw, consumer_user, consumer_pw @staticmethod cdef QueueConfig from_json(str json_string): cdef dict config_dict = json.loads(json_string)["QueueConfig"] cdef QueueConfig config = QueueConfig() config.host = config_dict["Host"] config.port = config_dict["Port"] config.producer_user = config_dict["ProducerUsername"] config.producer_pw = config_dict["ProducerPassword"] config.consumer_user = config_dict["ConsumerUsername"] config.consumer_pw = config_dict["ConsumerPassword"] return config cdef class SocketHandler: """Handles socket communication with size-prefixed messages.""" def __init__(self, object on_message): self.on_message = on_message self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.bind((constants.SOCKET_HOST, constants.SOCKET_PORT)) self._socket.listen(1) cdef start(self): threading.Thread(target=self.start_inner, daemon=True).start() cdef start_inner(self): while True: self._connection, client_address = self._socket.accept() size_data = self._connection.recv(4) if not size_data: raise ConnectionError("Connection closed while reading size prefix.") data_size = struct.unpack('>I', size_data)[0] # Read the full message data = b"" while len(data) < data_size: packet = self._socket.recv(data_size - len(data)) if not packet: raise ConnectionError("Connection closed while reading data.") data += packet cmd = FileCommand.from_msgpack(data, ProcessorType.SOCKET) self.on_message(cmd) cdef send(self, list[Annotation] message): data = msgpack.packb(message) size_prefix = len(data).to_bytes(4, 'big') self._connection.sendall(size_prefix + data) cdef close(self): if self._socket: self._socket.close() self._socket = None cdef class RabbitHandler: def __init__(self, ApiClient api_client, object on_message): self.on_message = on_message cdef str config_str = api_client.load_queue_config() queue_config = QueueConfig.from_json(config_str) self.annotation_producer = Producer( host=queue_config.host, port=queue_config.port, username=queue_config.producer_user, password=queue_config.producer_pw ) self.command_consumer = Consumer( host=queue_config.host, port=queue_config.port, username=queue_config.consumer_user, password=queue_config.consumer_pw ) cdef start(self): threading.Thread(target=self._run_async, daemon=True).start() def _run_async(self): asyncio.run(self.start_inner()) async def start_inner(self): await self.command_consumer.start() await self.command_consumer.subscribe(stream=constants.COMMANDS_QUEUE, callback=self.on_message_inner, offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None)) # put real offset def on_message_inner(self, message: AMQPMessage, message_context: MessageContext): cdef bytes body = message.body cmd = FileCommand.from_msgpack(body, ProcessorType.RABBIT) self.on_message(cmd) cdef send(self, object message): packed_message = AMQPMessage(body=packb(message)) self.annotation_producer.send(constants.ANNOTATIONS_QUEUE, packed_message) cdef close(self): if self.annotation_producer: self.annotation_producer.close() if self.command_consumer: self.command_consumer.close()