From edf685fabc94789f4233f8b6c4d46e1295af4586 Mon Sep 17 00:00:00 2001 From: Alex Bezdieniezhnykh Date: Fri, 7 Mar 2025 14:33:02 +0200 Subject: [PATCH] fix rabbit consumer --- annotation_queue_handler.py | 81 ++++++++++++++++++------------- dto/annotation_created_message.py | 68 ++++++++++++++++++-------- preprocessing.py | 10 ++-- 3 files changed, 101 insertions(+), 58 deletions(-) diff --git a/annotation_queue_handler.py b/annotation_queue_handler.py index 1179779..6a16414 100644 --- a/annotation_queue_handler.py +++ b/annotation_queue_handler.py @@ -6,51 +6,62 @@ from dto.annotation_created_message import AnnotationCreatedMessage from utils import Dotdict -async def main(): - with open(constants.CONFIG_FILE, "r") as f: - config_dict = yaml.safe_load(f) - d_config = Dotdict(config_dict) - queue_config = Dotdict(d_config.queue) +class AnnotationQueueHandler: + def __init__(self): + with open(constants.CONFIG_FILE, "r") as f: + config_dict = yaml.safe_load(f) + d_config = Dotdict(config_dict) + queue_config = Dotdict(d_config.queue) - try: - with open(constants.OFFSET_FILE, 'r') as f: - offset_dict = yaml.safe_load(f) - offset_queue = offset_dict['offset_queue'] - offset_confirmed_queue = offset_dict['offset_confirmed_queue'] - except (FileNotFoundError, ValueError): - with open(constants.OFFSET_FILE, 'w') as f: - f.writelines(['offset_queue: 0', - 'offset_confirmed_queue: 0']) - offset_queue = 0 - offset_confirmed_queue = 0 + self.consumer = Consumer( + host=queue_config.host, + port=queue_config.port, + username=queue_config.consumer_user, + password=queue_config.consumer_pw + ) + try: + with open(constants.OFFSET_FILE, 'r') as f: + offset_dict = yaml.safe_load(f) + self.offset_queue = offset_dict['offset_queue'] + self.offset_confirmed_queue = offset_dict['offset_confirmed_queue'] + except (FileNotFoundError, ValueError): + with open(constants.OFFSET_FILE, 'w') as f: + f.writelines(['offset_queue: 0', + 'offset_confirmed_queue: 0']) + self.offset_queue = 0 + self.offset_confirmed_queue = 0 - consumer = Consumer( - host=queue_config.host, - port=queue_config.port, - username=queue_config.consumer_user, - password=queue_config.consumer_pw - ) - - def on_message(message: AMQPMessage, context: MessageContext, is_seed: bool): + def on_message(self, message: AMQPMessage, context: MessageContext, is_seed: bool): annotation_message = AnnotationCreatedMessage(message) print(f'Received: {annotation_message}') save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir annotation_message.save_annotation(save_folder) + if is_seed: + self.offset_queue = context.offset + else: + self.offset_confirmed_queue = context.offset + with open(constants.OFFSET_FILE, 'w') as offset_file: + yaml.dump({'offset_queue': self.offset_queue, + 'offset_confirmed_queue': self.offset_confirmed_queue}, offset_file) - await consumer.start() - await consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=True), - offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_queue)) + async def start(self): + await self.consumer.start() + await self.consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, + callback=lambda msg, context: self.on_message(msg, context, is_seed=True), + offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue)) - await consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=False), - offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_confirmed_queue)) + await self.consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, + callback=lambda msg, context: self.on_message(msg, context, is_seed=False), + offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_confirmed_queue)) + + try: + await self.consumer.run() + except (KeyboardInterrupt, asyncio.CancelledError): + print("Closing Consumer...") + return - try: - await consumer.run() - except (KeyboardInterrupt, asyncio.CancelledError): - print("Closing Consumer...") - return if __name__ == '__main__': with asyncio.Runner() as runner: - runner.run(main()) \ No newline at end of file + runner.run(AnnotationQueueHandler().start()) \ No newline at end of file diff --git a/dto/annotation_created_message.py b/dto/annotation_created_message.py index 3e2a44e..161239f 100644 --- a/dto/annotation_created_message.py +++ b/dto/annotation_created_message.py @@ -1,6 +1,8 @@ +import datetime import os import msgpack import json +import datetime class Detection: @@ -17,21 +19,49 @@ class Detection: return f'{self.cls}: {self.x:.2f} {self.y:.2f} {self.w:.2f} {self.h:.2f}, prob: {(self.confidence * 100):.1f}%' -class AnnotationCreatedMessage: +class AnnotationCreatedMessageNarrow: def __init__(self, msgpack_bytes): - unpacked_data = msgpack.unpackb(msgpack_bytes) - self.createdDate = unpacked_data.get(0) + unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False) self.name = unpacked_data.get(1) - self.originalMediaName = unpacked_data.get(2) - self.time = unpacked_data.get(3) - self.imageExtension = unpacked_data.get(4) - detections_json_str = unpacked_data.get(5) + self.createdEmail = unpacked_data.get(2) + + +class AnnotationCreatedMessage: + last_offset = None + + def __init__(self, msgpack_bytes): + unpacked_data = self.read_rabbit(msgpack_bytes) + ts = unpacked_data[0] + self.createdDate = datetime.datetime.utcfromtimestamp(ts.seconds) + datetime.timedelta(microseconds=ts.nanoseconds/1000) + self.name = unpacked_data[1] + self.originalMediaName = unpacked_data[2] + self.time = datetime.timedelta(microseconds=unpacked_data[3]/10) + self.imageExtension = unpacked_data[4] + detections_json_str = unpacked_data[5] self.detections = self._parse_detections(detections_json_str) - self.image = unpacked_data.get(6) - self.createdRole = unpacked_data.get(7) - self.createdEmail = unpacked_data.get(8) - self.source = unpacked_data.get(9) - self.status = unpacked_data.get(10) + self.image = unpacked_data[6] + self.createdRole = unpacked_data[7] + self.createdEmail = unpacked_data[8] + self.source = unpacked_data[9] + self.status = unpacked_data[10] + + @staticmethod + def read_rabbit(message_bytes): + if AnnotationCreatedMessage.last_offset is not None: + try: + unpacked_data = msgpack.unpackb(message_bytes[AnnotationCreatedMessage.last_offset:], raw=False, strict_map_key=False) + return unpacked_data + except Exception: + pass + + for offset in range(3, 15): + try: + unpacked_data = msgpack.unpackb(message_bytes[offset:], raw=False, strict_map_key=False) + AnnotationCreatedMessage.last_offset = offset + return unpacked_data + except Exception: + pass + raise Exception(f'Cannot read rabbit message! Bytes: {message_bytes}') def __str__(self): if self.detections: @@ -46,13 +76,13 @@ class AnnotationCreatedMessage: if detections_json_str: detections_list = json.loads(detections_json_str) return [Detection( - d.get('AnnotationName'), - d.get('ClassNumber'), - d.get('CenterX'), - d.get('CenterY'), - d.get('Height'), - d.get('Width'), - d.get('Probability') + d.get('an'), + d.get('cl'), + d.get('x'), + d.get('y'), + d.get('w'), + d.get('h'), + d.get('p') ) for d in detections_list] return [] diff --git a/preprocessing.py b/preprocessing.py index d4c30d6..1a104a9 100644 --- a/preprocessing.py +++ b/preprocessing.py @@ -28,6 +28,7 @@ transform = A.Compose([ A.HueSaturationValue(p=0.3, hue_shift_limit=8, sat_shift_limit=8, val_shift_limit=8) ], bbox_params=A.BboxParams(format='yolo')) + def correct_bboxes(labels): margin = 0.0005 min_size = 0.01 @@ -40,11 +41,11 @@ def correct_bboxes(labels): # calc how much bboxes are outside borders ( +small margin ). # value should be negative. If it's positive, then put 0, as no correction - w_diff = min( (1 - margin) - (x + half_width), (x - half_width) - margin, 0 ) + w_diff = min((1 - margin) - (x + half_width), (x - half_width) - margin, 0) w = bboxes[2] + 2*w_diff if w < min_size: continue - h_diff = min( (1 - margin) - (y + half_height), ((y - half_height) - margin), 0) + h_diff = min((1 - margin) - (y + half_height), ((y - half_height) - margin), 0) h = bboxes[3] + 2 * h_diff if h < min_size: continue @@ -125,10 +126,11 @@ def preprocess_annotations(): with concurrent.futures.ThreadPoolExecutor() as executor: executor.map(process_image_file, images) -def process_image_file(image_file): # this function will be executed in thread + +def process_image_file(image_file): try: image_path = os.path.join(data_images_dir, image_file.name) - labels_path = os.path.join(data_labels_dir, f'{Path(image_path).stem}.txt') + labels_path = os.path.join(data_labels_dir, f'{Path(str(image_path)).stem}.txt') image = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), cv2.IMREAD_UNCHANGED) img_ann = ImageLabel(