import yaml from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext import constants from dto.annotation_created_message import AnnotationCreatedMessage from utils import Dotdict def main(): with open(constants.CONFIG_FILE, "r") as f: config_dict = yaml.safe_load(f) d_config = Dotdict(config_dict) queue_config = Dotdict(d_config.queue) try: with open(constants.OFFSET_FILE, 'r') as f: offset_dict = yaml.safe_load(f) offset_queue = offset_dict['offset_queue'] offset_confirmed_queue = offset_dict['offset_confirmed_queue'] except (FileNotFoundError, ValueError): with open(constants.OFFSET_FILE, 'w') as f: f.writelines(['offset_queue: 0', 'offset_confirmed_queue: 0']) command_consumer = Consumer( host=queue_config.host, port=queue_config.port, username=queue_config.consumer_user, password=queue_config.consumer_pw ) def on_message(message: AMQPMessage, context: MessageContext, is_seed: bool): annotation_message = AnnotationCreatedMessage(message.body) print(f'Received: {annotation_message}') save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir annotation_message.save_annotation(save_folder) command_consumer.start() command_consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=True), offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_queue)) command_consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=False), offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_confirmed_queue)) if __name__ == '__main__': main()