import yaml import asyncio from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext import constants from dto.annotation_created_message import AnnotationCreatedMessage from utils import Dotdict async def main(): with open(constants.CONFIG_FILE, "r") as f: config_dict = yaml.safe_load(f) d_config = Dotdict(config_dict) queue_config = Dotdict(d_config.queue) try: with open(constants.OFFSET_FILE, 'r') as f: offset_dict = yaml.safe_load(f) offset_queue = offset_dict['offset_queue'] offset_confirmed_queue = offset_dict['offset_confirmed_queue'] except (FileNotFoundError, ValueError): with open(constants.OFFSET_FILE, 'w') as f: f.writelines(['offset_queue: 0', 'offset_confirmed_queue: 0']) offset_queue = 0 offset_confirmed_queue = 0 consumer = Consumer( host=queue_config.host, port=queue_config.port, username=queue_config.consumer_user, password=queue_config.consumer_pw ) def on_message(message: AMQPMessage, context: MessageContext, is_seed: bool): annotation_message = AnnotationCreatedMessage(message) print(f'Received: {annotation_message}') save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir annotation_message.save_annotation(save_folder) await consumer.start() await consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=True), offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_queue)) await consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=False), offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_confirmed_queue)) try: await consumer.run() except (KeyboardInterrupt, asyncio.CancelledError): print("Closing Consumer...") return if __name__ == '__main__': with asyncio.Runner() as runner: runner.run(main())