diff --git a/annotation_queue_handler.py b/annotation_queue_handler.py index 5ba04a7..1179779 100644 --- a/annotation_queue_handler.py +++ b/annotation_queue_handler.py @@ -1,11 +1,12 @@ import yaml +import asyncio from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext import constants from dto.annotation_created_message import AnnotationCreatedMessage from utils import Dotdict -def main(): +async def main(): with open(constants.CONFIG_FILE, "r") as f: config_dict = yaml.safe_load(f) d_config = Dotdict(config_dict) @@ -20,8 +21,10 @@ def main(): with open(constants.OFFSET_FILE, 'w') as f: f.writelines(['offset_queue: 0', 'offset_confirmed_queue: 0']) + offset_queue = 0 + offset_confirmed_queue = 0 - command_consumer = Consumer( + consumer = Consumer( host=queue_config.host, port=queue_config.port, username=queue_config.consumer_user, @@ -29,18 +32,25 @@ def main(): ) def on_message(message: AMQPMessage, context: MessageContext, is_seed: bool): - annotation_message = AnnotationCreatedMessage(message.body) + annotation_message = AnnotationCreatedMessage(message) print(f'Received: {annotation_message}') save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir annotation_message.save_annotation(save_folder) - command_consumer.start() - command_consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=True), + + await consumer.start() + await consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=True), offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_queue)) - command_consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=False), + await consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=False), offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_confirmed_queue)) + try: + await consumer.run() + except (KeyboardInterrupt, asyncio.CancelledError): + print("Closing Consumer...") + return if __name__ == '__main__': - main() + with asyncio.Runner() as runner: + runner.run(main()) \ No newline at end of file diff --git a/dto/annotation_created_message.py b/dto/annotation_created_message.py index fa9f818..3e2a44e 100644 --- a/dto/annotation_created_message.py +++ b/dto/annotation_created_message.py @@ -19,7 +19,7 @@ class Detection: class AnnotationCreatedMessage: def __init__(self, msgpack_bytes): - unpacked_data = msgpack.unpackb(msgpack_bytes, raw=False) + unpacked_data = msgpack.unpackb(msgpack_bytes) self.createdDate = unpacked_data.get(0) self.name = unpacked_data.get(1) self.originalMediaName = unpacked_data.get(2) diff --git a/rabbit_receiver.py b/rabbit_receiver.py deleted file mode 100644 index e69de29..0000000