import yaml import asyncio from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext import constants from dto.annotation_created_message import AnnotationCreatedMessage from utils import Dotdict class AnnotationQueueHandler: def __init__(self): with open(constants.CONFIG_FILE, "r") as f: config_dict = yaml.safe_load(f) d_config = Dotdict(config_dict) queue_config = Dotdict(d_config.queue) self.consumer = Consumer( host=queue_config.host, port=queue_config.port, username=queue_config.consumer_user, password=queue_config.consumer_pw ) try: with open(constants.OFFSET_FILE, 'r') as f: offset_dict = yaml.safe_load(f) self.offset_queue = offset_dict['offset_queue'] self.offset_confirmed_queue = offset_dict['offset_confirmed_queue'] except (FileNotFoundError, ValueError): with open(constants.OFFSET_FILE, 'w') as f: f.writelines(['offset_queue: 0', 'offset_confirmed_queue: 0']) self.offset_queue = 0 self.offset_confirmed_queue = 0 def on_message(self, message: AMQPMessage, context: MessageContext, is_seed: bool): annotation_message = AnnotationCreatedMessage(message) print(f'Received: {annotation_message}') save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir annotation_message.save_annotation(save_folder) if is_seed: self.offset_queue = context.offset else: self.offset_confirmed_queue = context.offset with open(constants.OFFSET_FILE, 'w') as offset_file: yaml.dump({'offset_queue': self.offset_queue, 'offset_confirmed_queue': self.offset_confirmed_queue}, offset_file) async def start(self): await self.consumer.start() await self.consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, callback=lambda msg, context: self.on_message(msg, context, is_seed=True), offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue)) await self.consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, callback=lambda msg, context: self.on_message(msg, context, is_seed=False), offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_confirmed_queue)) try: await self.consumer.run() except (KeyboardInterrupt, asyncio.CancelledError): print("Closing Consumer...") return if __name__ == '__main__': with asyncio.Runner() as runner: runner.run(AnnotationQueueHandler().start())