diff --git a/.gitignore b/.gitignore index 24b714b..dbf3c2f 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ models/ *.rknn *.mp4 venv -*.engine \ No newline at end of file +*.engine +*.log diff --git a/annotation_queue/annotation_queue_dto.py b/annotation_queue/annotation_queue_dto.py new file mode 100644 index 0000000..2394788 --- /dev/null +++ b/annotation_queue/annotation_queue_dto.py @@ -0,0 +1,142 @@ +import json +from datetime import datetime, timedelta +from enum import Enum +import msgpack + + +class WeatherMode(Enum): + Norm = 0 + Wint = 20 + Night = 40 + + +class AnnotationClass: + def __init__(self, id, name, color): + self.id = id + self.name = name + self.color = color + color_str = color.lstrip('#') + self.opencv_color = (int(color_str[4:6], 16), int(color_str[2:4], 16), int(color_str[0:2], 16)) + + @staticmethod + def read_json(): + with open('classes.json', 'r', encoding='utf-8') as f: + j = json.loads(f.read()) + annotations_dict = {} + for mode in WeatherMode: + for cl in j: + id = mode.value + cl['Id'] + name = cl['Name'] if mode.value == 0 else f'{cl["Name"]}({mode.name})' + annotations_dict[id] = AnnotationClass(id, name, cl['Color']) + return annotations_dict + + +annotation_classes = AnnotationClass.read_json() + + +class AnnotationStatus(Enum): + Created = 10 + Edited = 20 + Validated = 30 + Deleted = 40 + + def __str__(self): + return self.name + + +class SourceEnum(Enum): + AI = 0 + Manual = 1 + + +class RoleEnum(Enum): + Operator = 10 + Validator = 20 + CompanionPC = 30 + Admin = 40 + ApiAdmin = 1000 + + def __str__(self): + return self.name + + def is_validator(self) -> bool: + return self in { + self.__class__.Validator, + self.__class__.Admin, + self.__class__.ApiAdmin + } + + +class Detection: + def __init__(self, annotation_name, cls, x, y, w, h, confidence=None): + self.annotation_name = annotation_name + self.cls = cls + self.x = x + self.y = y + self.w = w + self.h = h + self.confidence = confidence + + def __str__(self): + return f'{annotation_classes[self.cls].name}: {self.x:.2f} {self.y:.2f} {self.w:.2f} {self.h:.2f}, prob: {(self.confidence * 100):.1f}%' + + +class AnnotationCreatedMessageNarrow: + def __init__(self, msgpack_bytes): + unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False) + self.name = unpacked_data.get(1) + self.createdEmail = unpacked_data.get(2) + + +class AnnotationMessage: + def __init__(self, msgpack_bytes): + unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False) + ts = unpacked_data[0] + self.createdDate = datetime.utcfromtimestamp(ts.seconds) + timedelta(microseconds=ts.nanoseconds/1000) + self.name = unpacked_data[1] + self.originalMediaName = unpacked_data[2] + self.time = timedelta(microseconds=unpacked_data[3]/10) + self.imageExtension = unpacked_data[4] + self.detections = self._parse_detections(unpacked_data[5]) + self.image = unpacked_data[6] + + self.createdRole = RoleEnum(unpacked_data[7]) + self.createdEmail = unpacked_data[8] + self.source = SourceEnum(unpacked_data[9]) + self.status = AnnotationStatus(unpacked_data[10]) + + def __str__(self): + detections_str = "" + if self.detections: + detections_str_list = [str(detection) for detection in self.detections] + detections_str = " [" + ", ".join(detections_str_list) + "]" + createdBy = 'AI' if self.source == SourceEnum.AI else self.createdRole + return f'{self.status} {self.name} by [{createdBy} {self.createdEmail}]{detections_str}' + + @staticmethod + def _parse_detections(detections_json_str): + if detections_json_str: + detections_list = json.loads(detections_json_str) + return [Detection( + d.get('an'), + d.get('cl'), + d.get('x'), + d.get('y'), + d.get('w'), + d.get('h'), + d.get('p') + ) for d in detections_list] + return [] + + +class AnnotationBulkMessage: + def __init__(self, msgpack_bytes): + unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False) + self.annotation_names = unpacked_data[0] + self.annotation_status = AnnotationStatus(unpacked_data[1]) + self.createdEmail = unpacked_data[2] + ts = unpacked_data[3] + self.createdDate = datetime.utcfromtimestamp(ts.seconds) + timedelta(microseconds=ts.nanoseconds / 1000) + + def __str__(self): + return f'{self.annotation_status}: [{self.annotation_names}]' diff --git a/annotation_queue/annotation_queue_handler.py b/annotation_queue/annotation_queue_handler.py new file mode 100644 index 0000000..8a38576 --- /dev/null +++ b/annotation_queue/annotation_queue_handler.py @@ -0,0 +1,170 @@ +import os +import shutil +import yaml +import asyncio +from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext +from rstream.amqp import amqp_decoder +from os import path, makedirs +from datetime import datetime, timedelta +import logging +from logging.handlers import TimedRotatingFileHandler + +from annotation_queue.annotation_queue_dto import AnnotationStatus, AnnotationMessage, AnnotationBulkMessage + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +makedirs('logs', exist_ok=True) +handler = TimedRotatingFileHandler( + path.join("logs", '{:%Y-%m-%d}.log'.format(datetime.now())), + when="midnight", + interval=1, + backupCount=7, + encoding="utf-8" +) +formatter = logging.Formatter('%(asctime)s|%(levelname)s|%(message)s', "%H:%M:%S") +handler.setFormatter(formatter) +logger.addHandler(handler) + + +class AnnotationQueueHandler: + CONFIG_FILE = 'config.yaml' + OFFSET_FILE = 'offset.yaml' + QUEUE_ANNOTATION_STATUS_RECORD = b'AnnotationStatus' + QUEUE_EMAIL_RECORD = b'Email' + + JPG_EXT = '.jpg' + TXT_EXT = '.txt' + + class AnnotationName: + def __init__(self, h, name): + self.img_data = path.join(h.data_dir, h.images_dir, f"{name}{h.JPG_EXT}") + self.lbl_data = path.join(h.data_dir, h.labels_dir, f"{name}{h.TXT_EXT}") + self.img_seed = path.join(h.data_seed_dir, h.images_dir, f"{name}{h.JPG_EXT}") + self.lbl_seed = path.join(h.data_seed_dir, h.labels_dir, f"{name}{h.TXT_EXT}") + + def __init__(self): + with open(self.CONFIG_FILE, "r") as f: + config_dict = yaml.safe_load(f) + + root = config_dict['dirs']['root'] + + self.data_dir = path.join(root, config_dict['dirs']['data']) + self.data_seed_dir = path.join(root, config_dict['dirs']['data_seed']) + self.images_dir = config_dict['dirs']['images'] + self.labels_dir = config_dict['dirs']['labels'] + + self.del_img_dir = path.join(root, config_dict['dirs']['data_deleted'], self.images_dir) + self.del_lbl_dir = path.join(root, config_dict['dirs']['data_deleted'], self.labels_dir) + + makedirs(path.join(self.data_dir, self.images_dir), exist_ok=True) + makedirs(path.join(self.data_dir, self.labels_dir), exist_ok=True) + makedirs(path.join(self.data_seed_dir, self.images_dir), exist_ok=True) + makedirs(path.join(self.data_seed_dir, self.labels_dir), exist_ok=True) + + makedirs(self.del_img_dir, exist_ok=True) + makedirs(self.del_lbl_dir, exist_ok=True) + + self.consumer = Consumer( + host=config_dict['queue']['host'], + port=config_dict['queue']['port'], + username=config_dict['queue']['consumer_user'], + password=config_dict['queue']['consumer_pw'] + ) + self.queue_name = config_dict['queue']['name'] + + try: + with open(self.OFFSET_FILE, 'r') as f: + offset_dict = yaml.safe_load(f) + self.offset_queue = offset_dict['offset_queue'] + except (FileNotFoundError, ValueError): + with open(self.OFFSET_FILE, 'w') as f: + f.writelines(['offset_queue: 0']) + self.offset_queue = 0 + + def on_message(self, message: AMQPMessage, context: MessageContext): + try: + str_status = message.application_properties[bytes(self.QUEUE_ANNOTATION_STATUS_RECORD)].decode('utf-8') + ann_status = AnnotationStatus[str_status] + match ann_status: + case AnnotationStatus.Created | AnnotationStatus.Edited: + annotation_message = AnnotationMessage(message.body) + self.save_annotation(annotation_message) + logger.info(f'{annotation_message}') + case AnnotationStatus.Validated: + bulk_validate_message = AnnotationBulkMessage(message.body) + bulk_validate_message.annotation_status = AnnotationStatus.Validated + self.validate(bulk_validate_message) + logger.info(f'{bulk_validate_message}') + case AnnotationStatus.Deleted: + bulk_delete_message = AnnotationBulkMessage(message.body) + bulk_delete_message.annotation_status = AnnotationStatus.Deleted + self.delete(bulk_delete_message) + logger.info(f'{bulk_delete_message}') + self.offset_queue = context.offset + 1 + with open(self.OFFSET_FILE, 'w') as offset_file: + yaml.dump({'offset_queue': self.offset_queue}, offset_file) + except Exception as e: + logger.error(e) + + async def start(self): + await self.consumer.start() + await self.consumer.subscribe(stream=self.queue_name, + callback=self.on_message, + decoder=amqp_decoder, + offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue)) + + try: + logger.info(f'Start reading queue from {self.offset_queue}') + await self.consumer.run() + except (KeyboardInterrupt, asyncio.CancelledError): + logger.info("Closing Consumer...") + return + + def save_annotation(self, ann): + a = self.AnnotationName(self, ann.name) + is_val = ann.createdRole.is_validator() + + try: + # save label anyway + lbl = a.lbl_data if is_val else a.lbl_seed + with open(lbl, 'w') as label_file: + label_file.writelines([ + f'{detection.cls} {detection.x} {detection.y} {detection.w} {detection.h}' + for detection in ann.detections + ]) + if ann.status == AnnotationStatus.Created: + img = a.img_data if is_val else a.img_seed + with open(img, 'wb') as image_file: + image_file.write(ann.image) + else: + if is_val and path.exists(a.img_seed): + shutil.move(a.img_seed, a.img_data) + if is_val and path.exists(a.lbl_seed): + os.remove(a.lbl_seed) + + except IOError as e: + logger.error(f"Error during saving: {e}") + + def validate(self, msg): + for name in msg.annotation_names: + a = self.AnnotationName(self, name) + shutil.move(a.img_seed, a.img_data) + shutil.move(a.lbl_seed, a.lbl_data) + + def delete(self, msg): + for name in msg.annotation_names: + a = self.AnnotationName(self, name) + if path.exists(a.img_data): + shutil.move(a.img_data, self.del_img_dir) + if path.exists(a.img_seed): + shutil.move(a.img_seed, self.del_img_dir) + + if path.exists(a.lbl_data): + shutil.move(a.lbl_data, self.del_lbl_dir) + if path.exists(a.lbl_seed): + shutil.move(a.lbl_seed, self.del_lbl_dir) + + +if __name__ == '__main__': + with asyncio.Runner() as runner: + runner.run(AnnotationQueueHandler().start()) diff --git a/annotation_queue/classes.json b/annotation_queue/classes.json new file mode 100644 index 0000000..1b6259c --- /dev/null +++ b/annotation_queue/classes.json @@ -0,0 +1,19 @@ +[ + { "Id": 0, "Name": "ArmorVehicle", "ShortName": "Броня", "Color": "#ff0000" }, + { "Id": 1, "Name": "Truck", "ShortName": "Вантаж.", "Color": "#00ff00" }, + { "Id": 2, "Name": "Vehicle", "ShortName": "Машина", "Color": "#0000ff" }, + { "Id": 3, "Name": "Atillery", "ShortName": "Арта", "Color": "#ffff00" }, + { "Id": 4, "Name": "Shadow", "ShortName": "Тінь", "Color": "#ff00ff" }, + { "Id": 5, "Name": "Trenches", "ShortName": "Окопи", "Color": "#00ffff" }, + { "Id": 6, "Name": "MilitaryMan", "ShortName": "Військов", "Color": "#188021" }, + { "Id": 7, "Name": "TyreTracks", "ShortName": "Накати", "Color": "#800000" }, + { "Id": 8, "Name": "AdditArmoredTank", "ShortName": "Танк.захист", "Color": "#008000" }, + { "Id": 9, "Name": "Smoke", "ShortName": "Дим", "Color": "#000080" }, + { "Id": 10, "Name": "Plane", "ShortName": "Літак", "Color": "#a52a2a" }, + { "Id": 11, "Name": "Moto", "ShortName": "Мото", "Color": "#808000" }, + { "Id": 12, "Name": "CamouflageNet", "ShortName": "Сітка", "Color": "#87ceeb" }, + { "Id": 13, "Name": "CamouflageBranches", "ShortName": "Гілки", "Color": "#2f4f4f" }, + { "Id": 14, "Name": "Roof", "ShortName": "Дах", "Color": "#1e90ff" }, + { "Id": 15, "Name": "Building", "ShortName": "Будівля", "Color": "#ffb6c1" }, + { "Id": 16, "Name": "Caponier", "ShortName": "Капонір", "Color": "#ffa500" } +] \ No newline at end of file diff --git a/annotation_queue/config.yaml b/annotation_queue/config.yaml new file mode 100644 index 0000000..33f268d --- /dev/null +++ b/annotation_queue/config.yaml @@ -0,0 +1,20 @@ +api: + url: 'https://api.azaion.com' + email: 'uploader@azaion.com' + password: 'Az@1on_10Upl0@der' + +queue: + host: '188.245.120.247' + port: 5552 + consumer_user: 'azaion_receiver' + consumer_pw: 'Az1onRecce777ve2r' + name: 'azaion-annotations' + +dirs: + root: '/azaion' + data: 'data' + data_seed: 'data-seed' + data_processed: 'data-processed' + data_deleted: 'data_deleted' + images: 'images' + labels: 'labels' diff --git a/annotation_queue/offset.yaml b/annotation_queue/offset.yaml new file mode 100644 index 0000000..3815d7c --- /dev/null +++ b/annotation_queue/offset.yaml @@ -0,0 +1 @@ +offset_queue: 0 diff --git a/annotation_queue_handler.py b/annotation_queue_handler.py deleted file mode 100644 index 6a16414..0000000 --- a/annotation_queue_handler.py +++ /dev/null @@ -1,67 +0,0 @@ -import yaml -import asyncio -from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext -import constants -from dto.annotation_created_message import AnnotationCreatedMessage -from utils import Dotdict - - -class AnnotationQueueHandler: - def __init__(self): - with open(constants.CONFIG_FILE, "r") as f: - config_dict = yaml.safe_load(f) - d_config = Dotdict(config_dict) - queue_config = Dotdict(d_config.queue) - - self.consumer = Consumer( - host=queue_config.host, - port=queue_config.port, - username=queue_config.consumer_user, - password=queue_config.consumer_pw - ) - try: - with open(constants.OFFSET_FILE, 'r') as f: - offset_dict = yaml.safe_load(f) - self.offset_queue = offset_dict['offset_queue'] - self.offset_confirmed_queue = offset_dict['offset_confirmed_queue'] - except (FileNotFoundError, ValueError): - with open(constants.OFFSET_FILE, 'w') as f: - f.writelines(['offset_queue: 0', - 'offset_confirmed_queue: 0']) - self.offset_queue = 0 - self.offset_confirmed_queue = 0 - - def on_message(self, message: AMQPMessage, context: MessageContext, is_seed: bool): - annotation_message = AnnotationCreatedMessage(message) - print(f'Received: {annotation_message}') - save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir - annotation_message.save_annotation(save_folder) - - if is_seed: - self.offset_queue = context.offset - else: - self.offset_confirmed_queue = context.offset - with open(constants.OFFSET_FILE, 'w') as offset_file: - yaml.dump({'offset_queue': self.offset_queue, - 'offset_confirmed_queue': self.offset_confirmed_queue}, offset_file) - - async def start(self): - await self.consumer.start() - await self.consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, - callback=lambda msg, context: self.on_message(msg, context, is_seed=True), - offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue)) - - await self.consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, - callback=lambda msg, context: self.on_message(msg, context, is_seed=False), - offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_confirmed_queue)) - - try: - await self.consumer.run() - except (KeyboardInterrupt, asyncio.CancelledError): - print("Closing Consumer...") - return - - -if __name__ == '__main__': - with asyncio.Runner() as runner: - runner.run(AnnotationQueueHandler().start()) \ No newline at end of file diff --git a/augmentation.py b/augmentation.py index d7aa7d4..5c8cff8 100644 --- a/augmentation.py +++ b/augmentation.py @@ -115,7 +115,7 @@ class Augmentator: f.writelines(lines) f.close() - print(f'{datetime.now():{'%Y-%m-%d %H:%M:%S'}}: {self.total_files_processed + 1}/{self.total_images_to_process} : {image_file.name} has augmented') + print(f'{datetime.now():{"%Y-%m-%d %H:%M:%S"}}: {self.total_files_processed + 1}/{self.total_to_process} : {image_file.name} has augmented') except Exception as e: print(e) self.total_files_processed += 1 @@ -148,5 +148,5 @@ if __name__ == '__main__': augmentator = Augmentator() while True: augmentator.augment_annotations() - print('All processed, waiting for 2 minutes...') + print('All processed, waiting for 5 minutes...') time.sleep(300) diff --git a/classes.json b/classes.json index b6e1ac5..1b6259c 100644 --- a/classes.json +++ b/classes.json @@ -1,18 +1,19 @@ [ - { "Id": 0, "Name": "ArmorVehicle", "ShortName": "Броня", "Color": "#FF0000" }, - { "Id": 1, "Name": "Truck", "ShortName": "Вантаж.", "Color": "#00FF00" }, - { "Id": 2, "Name": "Vehicle", "ShortName": "Машина", "Color": "#0000FF" }, - { "Id": 3, "Name": "Atillery", "ShortName": "Арта", "Color": "#FFFF00" }, - { "Id": 4, "Name": "Shadow", "ShortName": "Тінь", "Color": "#FF00FF" }, - { "Id": 5, "Name": "Trenches", "ShortName": "Окопи", "Color": "#00FFFF" }, - { "Id": 6, "Name": "MilitaryMan", "ShortName": "Військов", "Color": "#188021" }, - { "Id": 7, "Name": "TyreTracks", "ShortName": "Накати", "Color": "#800000" }, - { "Id": 8, "Name": "AdditArmoredTank", "ShortName": "Танк.захист", "Color": "#008000" }, - { "Id": 9, "Name": "Smoke", "ShortName": "Дим", "Color": "#000080" }, - { "Id": 10, "Name": "Plane", "ShortName": "Літак", "Color": "#000080" }, - { "Id": 11, "Name": "Moto", "ShortName": "Мото", "Color": "#808000" }, - { "Id": 12, "Name": "CamouflageNet", "ShortName": "Сітка", "Color": "#800080" }, - { "Id": 13, "Name": "CamouflageBranches", "ShortName": "Гілки", "Color": "#2f4f4f" }, - { "Id": 14, "Name": "Roof", "ShortName": "Дах", "Color": "#1e90ff" }, - { "Id": 15, "Name": "Building", "ShortName": "Будівля", "Color": "#ffb6c1" } + { "Id": 0, "Name": "ArmorVehicle", "ShortName": "Броня", "Color": "#ff0000" }, + { "Id": 1, "Name": "Truck", "ShortName": "Вантаж.", "Color": "#00ff00" }, + { "Id": 2, "Name": "Vehicle", "ShortName": "Машина", "Color": "#0000ff" }, + { "Id": 3, "Name": "Atillery", "ShortName": "Арта", "Color": "#ffff00" }, + { "Id": 4, "Name": "Shadow", "ShortName": "Тінь", "Color": "#ff00ff" }, + { "Id": 5, "Name": "Trenches", "ShortName": "Окопи", "Color": "#00ffff" }, + { "Id": 6, "Name": "MilitaryMan", "ShortName": "Військов", "Color": "#188021" }, + { "Id": 7, "Name": "TyreTracks", "ShortName": "Накати", "Color": "#800000" }, + { "Id": 8, "Name": "AdditArmoredTank", "ShortName": "Танк.захист", "Color": "#008000" }, + { "Id": 9, "Name": "Smoke", "ShortName": "Дим", "Color": "#000080" }, + { "Id": 10, "Name": "Plane", "ShortName": "Літак", "Color": "#a52a2a" }, + { "Id": 11, "Name": "Moto", "ShortName": "Мото", "Color": "#808000" }, + { "Id": 12, "Name": "CamouflageNet", "ShortName": "Сітка", "Color": "#87ceeb" }, + { "Id": 13, "Name": "CamouflageBranches", "ShortName": "Гілки", "Color": "#2f4f4f" }, + { "Id": 14, "Name": "Roof", "ShortName": "Дах", "Color": "#1e90ff" }, + { "Id": 15, "Name": "Building", "ShortName": "Будівля", "Color": "#ffb6c1" }, + { "Id": 16, "Name": "Caponier", "ShortName": "Капонір", "Color": "#ffa500" } ] \ No newline at end of file diff --git a/config.yaml b/config.yaml index 10943c7..33f268d 100644 --- a/config.yaml +++ b/config.yaml @@ -8,3 +8,13 @@ queue: port: 5552 consumer_user: 'azaion_receiver' consumer_pw: 'Az1onRecce777ve2r' + name: 'azaion-annotations' + +dirs: + root: '/azaion' + data: 'data' + data_seed: 'data-seed' + data_processed: 'data-processed' + data_deleted: 'data_deleted' + images: 'images' + labels: 'labels' diff --git a/constants.py b/constants.py index 339b2b1..26b2c88 100644 --- a/constants.py +++ b/constants.py @@ -6,8 +6,6 @@ images = 'images' labels = 'labels' data_dir = path.join(azaion, 'data') -data_seeds_dir = path.join(azaion, 'data-seeds') -data_rabbit_dir = path.join(azaion, 'data-rabbit') data_images_dir = path.join(data_dir, images) data_labels_dir = path.join(data_dir, labels) @@ -30,8 +28,10 @@ checkpoint_file = 'checkpoint.txt' checkpoint_date_format = '%Y-%m-%d %H:%M:%S' CONFIG_FILE = 'config.yaml' -ANNOTATIONS_QUEUE = 'azaion-annotations' -ANNOTATIONS_CONFIRMED_QUEUE = 'azaion-annotations-confirm' + +JPG_EXT = '.jpg' +TXT_EXT = '.txt' + OFFSET_FILE = 'offset.yaml' AI_ONNX_MODEL_FILE_BIG = "azaion.onnx.big" diff --git a/dto/annotation_bulk_message.py b/dto/annotation_bulk_message.py new file mode 100644 index 0000000..e69de29 diff --git a/dto/annotation_created_message.py b/dto/annotation_created_message.py deleted file mode 100644 index 161239f..0000000 --- a/dto/annotation_created_message.py +++ /dev/null @@ -1,117 +0,0 @@ -import datetime -import os -import msgpack -import json -import datetime - - -class Detection: - def __init__(self, annotation_name, cls, x, y, w, h, confidence=None): - self.annotation_name = annotation_name - self.cls = cls - self.x = x - self.y = y - self.w = w - self.h = h - self.confidence = confidence - - def __str__(self): - return f'{self.cls}: {self.x:.2f} {self.y:.2f} {self.w:.2f} {self.h:.2f}, prob: {(self.confidence * 100):.1f}%' - - -class AnnotationCreatedMessageNarrow: - def __init__(self, msgpack_bytes): - unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False) - self.name = unpacked_data.get(1) - self.createdEmail = unpacked_data.get(2) - - -class AnnotationCreatedMessage: - last_offset = None - - def __init__(self, msgpack_bytes): - unpacked_data = self.read_rabbit(msgpack_bytes) - ts = unpacked_data[0] - self.createdDate = datetime.datetime.utcfromtimestamp(ts.seconds) + datetime.timedelta(microseconds=ts.nanoseconds/1000) - self.name = unpacked_data[1] - self.originalMediaName = unpacked_data[2] - self.time = datetime.timedelta(microseconds=unpacked_data[3]/10) - self.imageExtension = unpacked_data[4] - detections_json_str = unpacked_data[5] - self.detections = self._parse_detections(detections_json_str) - self.image = unpacked_data[6] - self.createdRole = unpacked_data[7] - self.createdEmail = unpacked_data[8] - self.source = unpacked_data[9] - self.status = unpacked_data[10] - - @staticmethod - def read_rabbit(message_bytes): - if AnnotationCreatedMessage.last_offset is not None: - try: - unpacked_data = msgpack.unpackb(message_bytes[AnnotationCreatedMessage.last_offset:], raw=False, strict_map_key=False) - return unpacked_data - except Exception: - pass - - for offset in range(3, 15): - try: - unpacked_data = msgpack.unpackb(message_bytes[offset:], raw=False, strict_map_key=False) - AnnotationCreatedMessage.last_offset = offset - return unpacked_data - except Exception: - pass - raise Exception(f'Cannot read rabbit message! Bytes: {message_bytes}') - - def __str__(self): - if self.detections: - detections_str_list = [str(detection) for detection in self.detections] - detections_str = ", ".join(detections_str_list) - return f'{self.name}: [{detections_str}]' - else: - return f'{self.name}: [Empty]' - - @staticmethod - def _parse_detections(detections_json_str): - if detections_json_str: - detections_list = json.loads(detections_json_str) - return [Detection( - d.get('an'), - d.get('cl'), - d.get('x'), - d.get('y'), - d.get('w'), - d.get('h'), - d.get('p') - ) for d in detections_list] - return [] - - def save_annotation(self, save_folder): - image_folder = os.path.join(save_folder, 'images') - labels_folder = os.path.join(save_folder, 'labels') - - os.makedirs(image_folder, exist_ok=True) - os.makedirs(labels_folder, exist_ok=True) - - image_path = os.path.join(image_folder, f"{self.name}.{self.imageExtension}") - label_path = os.path.join(labels_folder, f"{self.name}.txt") - - try: - with open(image_path, 'wb') as image_file: - image_file.write(self.image) - print(f"Image saved to: {image_path}") - except IOError as e: - print(f"Error saving image: {e}") - - try: - with open(label_path, 'w') as label_file: - if self.detections: - label_file.writelines([ - f'{detection.cls} {detection.x} {detection.y} {detection.w} {detection.h}' - for detection in self.detections - ]) - else: - label_file.write('') - print(f'Label saved to: {label_path}') - except IOError as e: - print(f"Error saving label: {e}") diff --git a/dto/annotation_message.py b/dto/annotation_message.py new file mode 100644 index 0000000..e69de29 diff --git a/offset.yaml b/offset.yaml index 4c18904..c70ccd2 100644 --- a/offset.yaml +++ b/offset.yaml @@ -1,2 +1 @@ -offset_queue: 0 -offset_confirmed_queue: 0 \ No newline at end of file +offset_queue: 0 \ No newline at end of file