import os import shutil import yaml import asyncio from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext from rstream.amqp import amqp_decoder from os import path, makedirs from datetime import datetime, timedelta import logging from logging.handlers import TimedRotatingFileHandler from annotation_queue_dto import AnnotationStatus, AnnotationMessage, AnnotationBulkMessage logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) makedirs('logs', exist_ok=True) handler = TimedRotatingFileHandler( path.join("logs", '{:%Y-%m-%d}.log'.format(datetime.now())), when="midnight", interval=1, backupCount=7, encoding="utf-8" ) formatter = logging.Formatter('%(asctime)s|%(message)s', "%H:%M:%S") handler.setFormatter(formatter) logger.addHandler(handler) class AnnotationQueueHandler: CONFIG_FILE = 'config.yaml' OFFSET_FILE = 'offset.yaml' QUEUE_ANNOTATION_STATUS_RECORD = b'AnnotationStatus' QUEUE_EMAIL_RECORD = b'Email' JPG_EXT = '.jpg' TXT_EXT = '.txt' class AnnotationName: def __init__(self, h, name): self.img_data = path.join(h.data_dir, h.images_dir, f"{name}{h.JPG_EXT}") self.lbl_data = path.join(h.data_dir, h.labels_dir, f"{name}{h.TXT_EXT}") self.img_seed = path.join(h.data_seed_dir, h.images_dir, f"{name}{h.JPG_EXT}") self.lbl_seed = path.join(h.data_seed_dir, h.labels_dir, f"{name}{h.TXT_EXT}") def __init__(self): with open(self.CONFIG_FILE, "r") as f: config_dict = yaml.safe_load(f) root = config_dict['dirs']['root'] self.data_dir = path.join(root, config_dict['dirs']['data']) self.data_seed_dir = path.join(root, config_dict['dirs']['data_seed']) self.images_dir = config_dict['dirs']['images'] self.labels_dir = config_dict['dirs']['labels'] self.del_img_dir = path.join(root, config_dict['dirs']['data_deleted'], self.images_dir) self.del_lbl_dir = path.join(root, config_dict['dirs']['data_deleted'], self.labels_dir) makedirs(path.join(self.data_dir, self.images_dir), exist_ok=True) makedirs(path.join(self.data_dir, self.labels_dir), exist_ok=True) makedirs(path.join(self.data_seed_dir, self.images_dir), exist_ok=True) makedirs(path.join(self.data_seed_dir, self.labels_dir), exist_ok=True) makedirs(self.del_img_dir, exist_ok=True) makedirs(self.del_lbl_dir, exist_ok=True) self.consumer = Consumer( host=config_dict['queue']['host'], port=config_dict['queue']['port'], username=config_dict['queue']['consumer_user'], password=config_dict['queue']['consumer_pw'] ) self.queue_name = config_dict['queue']['name'] try: with open(self.OFFSET_FILE, 'r') as f: offset_dict = yaml.safe_load(f) self.offset_queue = offset_dict['offset_queue'] except (FileNotFoundError, ValueError): with open(self.OFFSET_FILE, 'w') as f: f.writelines(['offset_queue: 0']) self.offset_queue = 0 def on_message(self, message: AMQPMessage, context: MessageContext): try: str_status = message.application_properties[bytes(self.QUEUE_ANNOTATION_STATUS_RECORD)].decode('utf-8') ann_status = AnnotationStatus[str_status] match ann_status: case AnnotationStatus.Created | AnnotationStatus.Edited: annotation_message = AnnotationMessage(message.body) self.save_annotation(annotation_message) logger.info(f'{annotation_message}') case AnnotationStatus.Validated: bulk_validate_message = AnnotationBulkMessage(message.body) bulk_validate_message.annotation_status = AnnotationStatus.Validated self.validate(bulk_validate_message) logger.info(f'{bulk_validate_message}') case AnnotationStatus.Deleted: bulk_delete_message = AnnotationBulkMessage(message.body) bulk_delete_message.annotation_status = AnnotationStatus.Deleted self.delete(bulk_delete_message) logger.info(f'{bulk_delete_message}') self.offset_queue = context.offset + 1 with open(self.OFFSET_FILE, 'w') as offset_file: yaml.dump({'offset_queue': self.offset_queue}, offset_file) except Exception as e: logger.error(e) async def start(self): await self.consumer.start() await self.consumer.subscribe(stream=self.queue_name, callback=self.on_message, decoder=amqp_decoder, offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue)) try: logger.info(f'Start reading queue from {self.offset_queue}') await self.consumer.run() except (KeyboardInterrupt, asyncio.CancelledError): logger.info("Closing Consumer...") return def save_annotation(self, ann): a = self.AnnotationName(self, ann.name) is_val = ann.createdRole.is_validator() try: # save label anyway lbl = a.lbl_data if is_val else a.lbl_seed with open(lbl, 'w') as label_file: label_file.writelines([ f'{detection.cls} {detection.x} {detection.y} {detection.w} {detection.h}' for detection in ann.detections ]) if ann.status == AnnotationStatus.Created: img = a.img_data if is_val else a.img_seed with open(img, 'wb') as image_file: image_file.write(ann.image) else: if is_val and path.exists(a.img_seed): shutil.move(a.img_seed, a.img_data) if is_val and path.exists(a.lbl_seed): os.remove(a.lbl_seed) except IOError as e: logger.error(f"Error during saving: {e}") def validate(self, msg): for name in msg.annotation_names: a = self.AnnotationName(self, name) shutil.move(a.img_seed, a.img_data) shutil.move(a.lbl_seed, a.lbl_data) def delete(self, msg): for name in msg.annotation_names: a = self.AnnotationName(self, name) if path.exists(a.img_data): shutil.move(a.img_data, self.del_img_dir) if path.exists(a.img_seed): shutil.move(a.img_seed, self.del_img_dir) if path.exists(a.lbl_data): shutil.move(a.lbl_data, self.del_lbl_dir) if path.exists(a.lbl_seed): shutil.move(a.lbl_seed, self.del_lbl_dir) if __name__ == '__main__': with asyncio.Runner() as runner: runner.run(AnnotationQueueHandler().start())