From b4a4826b1918154fad9b98af83b91280308b693d Mon Sep 17 00:00:00 2001 From: Alex Bezdieniezhnykh Date: Wed, 5 Mar 2025 20:58:33 +0200 Subject: [PATCH] add rabbit consumer --- annotation_queue_handler.py | 46 ++++++++++++++++ config.yaml | 8 ++- constants.py | 8 ++- dto/annotation_created_message.py | 87 +++++++++++++++++++++++++++++++ offset.yaml | 2 + requirements.txt | 4 +- 6 files changed, 152 insertions(+), 3 deletions(-) create mode 100644 annotation_queue_handler.py create mode 100644 dto/annotation_created_message.py create mode 100644 offset.yaml diff --git a/annotation_queue_handler.py b/annotation_queue_handler.py new file mode 100644 index 0000000..5ba04a7 --- /dev/null +++ b/annotation_queue_handler.py @@ -0,0 +1,46 @@ +import yaml +from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext +import constants +from dto.annotation_created_message import AnnotationCreatedMessage +from utils import Dotdict + + +def main(): + with open(constants.CONFIG_FILE, "r") as f: + config_dict = yaml.safe_load(f) + d_config = Dotdict(config_dict) + queue_config = Dotdict(d_config.queue) + + try: + with open(constants.OFFSET_FILE, 'r') as f: + offset_dict = yaml.safe_load(f) + offset_queue = offset_dict['offset_queue'] + offset_confirmed_queue = offset_dict['offset_confirmed_queue'] + except (FileNotFoundError, ValueError): + with open(constants.OFFSET_FILE, 'w') as f: + f.writelines(['offset_queue: 0', + 'offset_confirmed_queue: 0']) + + command_consumer = Consumer( + host=queue_config.host, + port=queue_config.port, + username=queue_config.consumer_user, + password=queue_config.consumer_pw + ) + + def on_message(message: AMQPMessage, context: MessageContext, is_seed: bool): + annotation_message = AnnotationCreatedMessage(message.body) + print(f'Received: {annotation_message}') + save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir + annotation_message.save_annotation(save_folder) + + command_consumer.start() + command_consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=True), + offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_queue)) + + command_consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=False), + offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_confirmed_queue)) + + +if __name__ == '__main__': + main() diff --git a/config.yaml b/config.yaml index 8591287..5a8e52f 100644 --- a/config.yaml +++ b/config.yaml @@ -8,4 +8,10 @@ api: url: 'https://api.azaion.com' user: 'admin@azaion.com' pw: 'Az@1on1000Odm$n' - folder: '' \ No newline at end of file + folder: '' + +queue: + host: '188.245.120.247' + port: 5552 + consumer_user: 'azaion_receiver' + consumer_pw: 'Az1onRecce777ve2r' diff --git a/constants.py b/constants.py index cee0bcb..5beff4a 100644 --- a/constants.py +++ b/constants.py @@ -7,6 +7,8 @@ images = 'images' labels = 'labels' data_dir = path.join(azaion, 'data') +data_seeds_dir = path.join(azaion, 'data-seeds') +data_rabbit_dir = path.join(azaion, 'data-rabbit') data_images_dir = path.join(data_dir, images) data_labels_dir = path.join(data_dir, labels) @@ -23,9 +25,13 @@ sample_dir = path.join(azaion, 'data-sample') datasets_dir = path.join(azaion, 'datasets') models_dir = path.join(azaion, 'models') + annotation_classes = AnnotationClass.read_json() date_format = '%Y-%m-%d' checkpoint_file = 'checkpoint.txt' checkpoint_date_format = '%Y-%m-%d %H:%M:%S' -CONFIG_FILE = "config.yaml" +CONFIG_FILE = 'config.yaml' +ANNOTATIONS_QUEUE = 'azaion-annotations' +ANNOTATIONS_CONFIRMED_QUEUE = 'azaion-annotations-confirm' +OFFSET_FILE = 'offset.yaml' diff --git a/dto/annotation_created_message.py b/dto/annotation_created_message.py new file mode 100644 index 0000000..fa9f818 --- /dev/null +++ b/dto/annotation_created_message.py @@ -0,0 +1,87 @@ +import os +import msgpack +import json + + +class Detection: + def __init__(self, annotation_name, cls, x, y, w, h, confidence=None): + self.annotation_name = annotation_name + self.cls = cls + self.x = x + self.y = y + self.w = w + self.h = h + self.confidence = confidence + + def __str__(self): + return f'{self.cls}: {self.x:.2f} {self.y:.2f} {self.w:.2f} {self.h:.2f}, prob: {(self.confidence * 100):.1f}%' + + +class AnnotationCreatedMessage: + def __init__(self, msgpack_bytes): + unpacked_data = msgpack.unpackb(msgpack_bytes, raw=False) + self.createdDate = unpacked_data.get(0) + self.name = unpacked_data.get(1) + self.originalMediaName = unpacked_data.get(2) + self.time = unpacked_data.get(3) + self.imageExtension = unpacked_data.get(4) + detections_json_str = unpacked_data.get(5) + self.detections = self._parse_detections(detections_json_str) + self.image = unpacked_data.get(6) + self.createdRole = unpacked_data.get(7) + self.createdEmail = unpacked_data.get(8) + self.source = unpacked_data.get(9) + self.status = unpacked_data.get(10) + + def __str__(self): + if self.detections: + detections_str_list = [str(detection) for detection in self.detections] + detections_str = ", ".join(detections_str_list) + return f'{self.name}: [{detections_str}]' + else: + return f'{self.name}: [Empty]' + + @staticmethod + def _parse_detections(detections_json_str): + if detections_json_str: + detections_list = json.loads(detections_json_str) + return [Detection( + d.get('AnnotationName'), + d.get('ClassNumber'), + d.get('CenterX'), + d.get('CenterY'), + d.get('Height'), + d.get('Width'), + d.get('Probability') + ) for d in detections_list] + return [] + + def save_annotation(self, save_folder): + image_folder = os.path.join(save_folder, 'images') + labels_folder = os.path.join(save_folder, 'labels') + + os.makedirs(image_folder, exist_ok=True) + os.makedirs(labels_folder, exist_ok=True) + + image_path = os.path.join(image_folder, f"{self.name}.{self.imageExtension}") + label_path = os.path.join(labels_folder, f"{self.name}.txt") + + try: + with open(image_path, 'wb') as image_file: + image_file.write(self.image) + print(f"Image saved to: {image_path}") + except IOError as e: + print(f"Error saving image: {e}") + + try: + with open(label_path, 'w') as label_file: + if self.detections: + label_file.writelines([ + f'{detection.cls} {detection.x} {detection.y} {detection.w} {detection.h}' + for detection in self.detections + ]) + else: + label_file.write('') + print(f'Label saved to: {label_path}') + except IOError as e: + print(f"Error saving label: {e}") diff --git a/offset.yaml b/offset.yaml new file mode 100644 index 0000000..4c18904 --- /dev/null +++ b/offset.yaml @@ -0,0 +1,2 @@ +offset_queue: 0 +offset_confirmed_queue: 0 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index fa4ab1b..61cbea4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,6 @@ PyYAML~=6.0.2 cryptography~=44.0.1 numpy~=2.1.1 requests~=2.32.3 -pyyaml \ No newline at end of file +pyyaml +msgpack +rstream \ No newline at end of file