# cython: language_level=3 import json import socket import struct import threading import msgpack from msgpack import packb from rstream import Producer, Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext import constants from api_client import ApiClient from processor_command import FileCommand, ProcessorType cdef class QueueConfig: cdef str host cdef int port cdef str producer_user cdef str producer_pw cdef str consumer_user cdef str consumer_pw @staticmethod cdef QueueConfig from_json(str json_string): cdef dict config_dict = json.loads(json_string) cdef QueueConfig config = QueueConfig() config.Host = config_dict["Host"] config.Port = config_dict["Port"] config.ProducerUsername = config_dict["ProducerUsername"] config.ProducerPassword = config_dict["ProducerPassword"] config.ConsumerUsername = config_dict["ConsumerUsername"] config.ConsumerPassword = config_dict["ConsumerPassword"] return config cdef class SocketHandler: """Handles socket communication with size-prefixed messages.""" def __init__(self, on_message): self.on_message = on_message self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.connect((constants.SOCKET_HOST, constants.SOCKET_PORT)) self._socket.listen(1) cdef start(self): threading.Thread(target=self.start, daemon=True).start() cdef start_inner(self): while True: self._connection, client_address = self._socket.accept() size_data = self._connection.recv(4) if not size_data: raise ConnectionError("Connection closed while reading size prefix.") data_size = struct.unpack('>I', size_data)[0] # Read the full message data = b"" while len(data) < data_size: packet = self._socket.recv(data_size - len(data)) if not packet: raise ConnectionError("Connection closed while reading data.") data += packet cmd = FileCommand.from_msgpack(data, ProcessorType.SOCKET) self.on_message(cmd) async def send(self, object message): data = msgpack.packb(message) size_prefix = len(data).to_bytes(4, 'big') self._connection.sendall(size_prefix + data) def close(self): if self._socket: self._socket.close() self._socket = None cdef class RabbitHandler: cdef str hardware_hash def __init__(self, config_filename, on_message): self.on_message = on_message cdef str config_str = ApiClient().load_file(constants.QUEUE_CONFIG_FILENAME).decode(encoding='utf-8') self.queue_config = QueueConfig.from_json(config_str) self.annotation_producer = Producer( host=self.queue_config.host, port=self.queue_config.port, username=self.queue_config.producer_user, password=self.queue_config.producer_pw ) self.command_consumer = Consumer( host=self.queue_config.host, port=self.queue_config.port, username=self.queue_config.consumer_user, password=self.queue_config.consumer_pw ) cdef start(self): self.command_consumer.start() self.command_consumer.subscribe(stream=constants.COMMANDS_QUEUE, callback=self.on_message_inner, offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None)) # put real offset cdef on_message_inner(self, message: AMQPMessage, message_context: MessageContext): cdef bytes body = message.body cmd = FileCommand.from_msgpack(body, ProcessorType.RABBIT) self.on_message(cmd) cpdef send(self, object message): packed_message = AMQPMessage(body=packb(message)) self.annotation_producer.send(constants.ANNOTATIONS_QUEUE, packed_message) async def close(self): if self.annotation_producer: await self.annotation_producer.close() if self.command_consumer: await self.command_consumer.close()