mirror of
https://github.com/azaion/ai-training.git
synced 2026-04-22 08:26:35 +00:00
174 lines
7.0 KiB
Python
174 lines
7.0 KiB
Python
import os
|
|
import shutil
|
|
import sys
|
|
|
|
import yaml
|
|
import asyncio
|
|
from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext
|
|
from rstream.amqp import amqp_decoder
|
|
from os import path, makedirs
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
|
|
from annotation_queue_dto import AnnotationStatus, AnnotationMessage, AnnotationBulkMessage
|
|
|
|
# Module-wide logger. INFO and above go to two sinks, in this order:
#   1. a file under ./logs (named after today's date) that rotates at midnight
#      and keeps 7 backups;
#   2. stdout (this handler has no formatter of its own).
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# File records look like "HH:MM:SS|message".
formatter = logging.Formatter(fmt='%(asctime)s|%(message)s', datefmt="%H:%M:%S")

makedirs('logs', exist_ok=True)
handler = TimedRotatingFileHandler(
    filename=path.join("logs", '{:%Y-%m-%d}.log'.format(datetime.now())),
    when="midnight",
    interval=1,
    backupCount=7,
    encoding="utf-8",
)
handler.setFormatter(formatter)

logger.addHandler(handler)
logger.addHandler(logging.StreamHandler(sys.stdout))
|
|
|
|
|
|
class AnnotationQueueHandler:
|
|
CONFIG_FILE = 'config.yaml'
|
|
OFFSET_FILE = 'offset.yaml'
|
|
QUEUE_ANNOTATION_STATUS_RECORD = b'AnnotationStatus'
|
|
QUEUE_EMAIL_RECORD = b'Email'
|
|
|
|
JPG_EXT = '.jpg'
|
|
TXT_EXT = '.txt'
|
|
|
|
class AnnotationName:
|
|
def __init__(self, h, name):
|
|
self.img_data = path.join(h.data_dir, h.images_dir, f"{name}{h.JPG_EXT}")
|
|
self.lbl_data = path.join(h.data_dir, h.labels_dir, f"{name}{h.TXT_EXT}")
|
|
self.img_seed = path.join(h.data_seed_dir, h.images_dir, f"{name}{h.JPG_EXT}")
|
|
self.lbl_seed = path.join(h.data_seed_dir, h.labels_dir, f"{name}{h.TXT_EXT}")
|
|
|
|
def __init__(self):
|
|
with open(self.CONFIG_FILE, "r") as f:
|
|
config_dict = yaml.safe_load(f)
|
|
|
|
root = config_dict['dirs']['root']
|
|
|
|
self.data_dir = path.join(root, config_dict['dirs']['data'])
|
|
self.data_seed_dir = path.join(root, config_dict['dirs']['data_seed'])
|
|
self.images_dir = config_dict['dirs']['images']
|
|
self.labels_dir = config_dict['dirs']['labels']
|
|
|
|
self.del_img_dir = path.join(root, config_dict['dirs']['data_deleted'], self.images_dir)
|
|
self.del_lbl_dir = path.join(root, config_dict['dirs']['data_deleted'], self.labels_dir)
|
|
|
|
makedirs(path.join(self.data_dir, self.images_dir), exist_ok=True)
|
|
makedirs(path.join(self.data_dir, self.labels_dir), exist_ok=True)
|
|
makedirs(path.join(self.data_seed_dir, self.images_dir), exist_ok=True)
|
|
makedirs(path.join(self.data_seed_dir, self.labels_dir), exist_ok=True)
|
|
|
|
makedirs(self.del_img_dir, exist_ok=True)
|
|
makedirs(self.del_lbl_dir, exist_ok=True)
|
|
|
|
self.consumer = Consumer(
|
|
host=config_dict['queue']['host'],
|
|
port=config_dict['queue']['port'],
|
|
username=config_dict['queue']['consumer_user'],
|
|
password=config_dict['queue']['consumer_pw']
|
|
)
|
|
self.queue_name = config_dict['queue']['name']
|
|
|
|
try:
|
|
with open(self.OFFSET_FILE, 'r') as f:
|
|
offset_dict = yaml.safe_load(f)
|
|
self.offset_queue = offset_dict['offset_queue']
|
|
except (FileNotFoundError, ValueError):
|
|
with open(self.OFFSET_FILE, 'w') as f:
|
|
f.writelines(['offset_queue: 0'])
|
|
self.offset_queue = 0
|
|
|
|
def on_message(self, message: AMQPMessage, context: MessageContext):
|
|
try:
|
|
str_status = message.application_properties[bytes(self.QUEUE_ANNOTATION_STATUS_RECORD)].decode('utf-8')
|
|
ann_status = AnnotationStatus[str_status]
|
|
match ann_status:
|
|
case AnnotationStatus.Created | AnnotationStatus.Edited:
|
|
annotation_message = AnnotationMessage(message.body)
|
|
self.save_annotation(annotation_message)
|
|
logger.info(f'{annotation_message}')
|
|
case AnnotationStatus.Validated:
|
|
bulk_validate_message = AnnotationBulkMessage(message.body)
|
|
bulk_validate_message.annotation_status = AnnotationStatus.Validated
|
|
self.validate(bulk_validate_message)
|
|
logger.info(f'{bulk_validate_message}')
|
|
case AnnotationStatus.Deleted:
|
|
bulk_delete_message = AnnotationBulkMessage(message.body)
|
|
bulk_delete_message.annotation_status = AnnotationStatus.Deleted
|
|
self.delete(bulk_delete_message)
|
|
logger.info(f'{bulk_delete_message}')
|
|
self.offset_queue = context.offset + 1
|
|
with open(self.OFFSET_FILE, 'w') as offset_file:
|
|
yaml.dump({'offset_queue': self.offset_queue}, offset_file)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
|
|
async def start(self):
|
|
await self.consumer.start()
|
|
await self.consumer.subscribe(stream=self.queue_name,
|
|
callback=self.on_message,
|
|
decoder=amqp_decoder,
|
|
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue))
|
|
|
|
try:
|
|
logger.info(f'Start reading queue from {self.offset_queue}')
|
|
await self.consumer.run()
|
|
except (KeyboardInterrupt, asyncio.CancelledError):
|
|
logger.info("Closing Consumer...")
|
|
return
|
|
|
|
def save_annotation(self, ann):
|
|
a = self.AnnotationName(self, ann.name)
|
|
is_val = ann.createdRole.is_validator()
|
|
|
|
try:
|
|
# save label anyway
|
|
lbl = a.lbl_data if is_val else a.lbl_seed
|
|
with open(lbl, 'w') as label_file:
|
|
label_file.writelines([
|
|
f'{detection.cls} {detection.x} {detection.y} {detection.w} {detection.h}'
|
|
for detection in ann.detections
|
|
])
|
|
if ann.status == AnnotationStatus.Created:
|
|
img = a.img_data if is_val else a.img_seed
|
|
with open(img, 'wb') as image_file:
|
|
image_file.write(ann.image)
|
|
else:
|
|
if is_val and path.exists(a.img_seed):
|
|
shutil.move(a.img_seed, a.img_data)
|
|
if is_val and path.exists(a.lbl_seed):
|
|
os.remove(a.lbl_seed)
|
|
|
|
except IOError as e:
|
|
logger.error(f"Error during saving: {e}")
|
|
|
|
def validate(self, msg):
|
|
for name in msg.annotation_names:
|
|
a = self.AnnotationName(self, name)
|
|
shutil.move(a.img_seed, a.img_data)
|
|
shutil.move(a.lbl_seed, a.lbl_data)
|
|
|
|
def delete(self, msg):
|
|
for name in msg.annotation_names:
|
|
a = self.AnnotationName(self, name)
|
|
if path.exists(a.img_data):
|
|
shutil.move(a.img_data, self.del_img_dir)
|
|
if path.exists(a.img_seed):
|
|
shutil.move(a.img_seed, self.del_img_dir)
|
|
|
|
if path.exists(a.lbl_data):
|
|
shutil.move(a.lbl_data, self.del_lbl_dir)
|
|
if path.exists(a.lbl_seed):
|
|
shutil.move(a.lbl_seed, self.del_lbl_dir)
|
|
|
|
|
|
if __name__ == '__main__':
    # Drive the stream consumer on a dedicated event loop. The Runner
    # closes that loop when the with-block exits.
    with asyncio.Runner() as event_loop_runner:
        queue_handler = AnnotationQueueHandler()
        event_loop_runner.run(queue_handler.start())
|