move annotation_queue_handler to a separate repository

This commit is contained in:
Alex Bezdieniezhnykh
2025-05-22 16:50:59 +03:00
parent eecc11b1e5
commit ce29d7fc58
8 changed files with 15 additions and 6 deletions
+142
View File
@@ -0,0 +1,142 @@
import json
from datetime import datetime, timedelta
from enum import Enum
import msgpack
class WeatherMode(Enum):
Norm = 0
Wint = 20
Night = 40
class AnnotationClass:
def __init__(self, id, name, color):
self.id = id
self.name = name
self.color = color
color_str = color.lstrip('#')
self.opencv_color = (int(color_str[4:6], 16), int(color_str[2:4], 16), int(color_str[0:2], 16))
@staticmethod
def read_json():
with open('classes.json', 'r', encoding='utf-8') as f:
j = json.loads(f.read())
annotations_dict = {}
for mode in WeatherMode:
for cl in j:
id = mode.value + cl['Id']
name = cl['Name'] if mode.value == 0 else f'{cl["Name"]}({mode.name})'
annotations_dict[id] = AnnotationClass(id, name, cl['Color'])
return annotations_dict
annotation_classes = AnnotationClass.read_json()
class AnnotationStatus(Enum):
Created = 10
Edited = 20
Validated = 30
Deleted = 40
def __str__(self):
return self.name
class SourceEnum(Enum):
AI = 0
Manual = 1
class RoleEnum(Enum):
Operator = 10
Validator = 20
CompanionPC = 30
Admin = 40
ApiAdmin = 1000
def __str__(self):
return self.name
def is_validator(self) -> bool:
return self in {
self.__class__.Validator,
self.__class__.Admin,
self.__class__.ApiAdmin
}
class Detection:
def __init__(self, annotation_name, cls, x, y, w, h, confidence=None):
self.annotation_name = annotation_name
self.cls = cls
self.x = x
self.y = y
self.w = w
self.h = h
self.confidence = confidence
def __str__(self):
return f'{annotation_classes[self.cls].name}: {(self.confidence * 100):.1f}%'
class AnnotationCreatedMessageNarrow:
def __init__(self, msgpack_bytes):
unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False)
self.name = unpacked_data.get(1)
self.createdEmail = unpacked_data.get(2)
class AnnotationMessage:
def __init__(self, msgpack_bytes):
unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False)
ts = unpacked_data[0]
self.createdDate = datetime.utcfromtimestamp(ts.seconds) + timedelta(microseconds=ts.nanoseconds/1000)
self.name = unpacked_data[1]
self.originalMediaName = unpacked_data[2]
self.time = timedelta(microseconds=unpacked_data[3]/10)
self.imageExtension = unpacked_data[4]
self.detections = self._parse_detections(unpacked_data[5])
self.image = unpacked_data[6]
self.createdRole = RoleEnum(unpacked_data[7])
self.createdEmail = unpacked_data[8]
self.source = SourceEnum(unpacked_data[9])
self.status = AnnotationStatus(unpacked_data[10])
def __str__(self):
detections_str = ""
if self.detections:
detections_str_list = [str(detection) for detection in self.detections]
detections_str = " [" + ", ".join(detections_str_list) + "]"
createdBy = 'AI' if self.source == SourceEnum.AI else self.createdRole
return f'{self.status} {self.name} by [{createdBy} {self.createdEmail}|{self.createdDate:%m-%d %H:%M}]{detections_str}'
@staticmethod
def _parse_detections(detections_json_str):
if detections_json_str:
detections_list = json.loads(detections_json_str)
return [Detection(
d.get('an'),
d.get('cl'),
d.get('x'),
d.get('y'),
d.get('w'),
d.get('h'),
d.get('p')
) for d in detections_list]
return []
class AnnotationBulkMessage:
def __init__(self, msgpack_bytes):
unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False)
self.annotation_names = unpacked_data[0]
self.annotation_status = AnnotationStatus(unpacked_data[1])
self.createdEmail = unpacked_data[2]
ts = unpacked_data[3]
self.createdDate = datetime.utcfromtimestamp(ts.seconds) + timedelta(microseconds=ts.nanoseconds / 1000)
def __str__(self):
return f'{self.annotation_status}: [{self.annotation_names}] by [{self.createdEmail}|{self.createdDate:%m-%d %H:%M}]'
@@ -0,0 +1,170 @@
import os
import shutil
import yaml
import asyncio
from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext
from rstream.amqp import amqp_decoder
from os import path, makedirs
from datetime import datetime, timedelta
import logging
from logging.handlers import TimedRotatingFileHandler
from annotation_queue_dto import AnnotationStatus, AnnotationMessage, AnnotationBulkMessage
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
makedirs('logs', exist_ok=True)
handler = TimedRotatingFileHandler(
path.join("logs", '{:%Y-%m-%d}.log'.format(datetime.now())),
when="midnight",
interval=1,
backupCount=7,
encoding="utf-8"
)
formatter = logging.Formatter('%(asctime)s|%(message)s', "%H:%M:%S")
handler.setFormatter(formatter)
logger.addHandler(handler)
class AnnotationQueueHandler:
CONFIG_FILE = 'config.yaml'
OFFSET_FILE = 'offset.yaml'
QUEUE_ANNOTATION_STATUS_RECORD = b'AnnotationStatus'
QUEUE_EMAIL_RECORD = b'Email'
JPG_EXT = '.jpg'
TXT_EXT = '.txt'
class AnnotationName:
def __init__(self, h, name):
self.img_data = path.join(h.data_dir, h.images_dir, f"{name}{h.JPG_EXT}")
self.lbl_data = path.join(h.data_dir, h.labels_dir, f"{name}{h.TXT_EXT}")
self.img_seed = path.join(h.data_seed_dir, h.images_dir, f"{name}{h.JPG_EXT}")
self.lbl_seed = path.join(h.data_seed_dir, h.labels_dir, f"{name}{h.TXT_EXT}")
def __init__(self):
with open(self.CONFIG_FILE, "r") as f:
config_dict = yaml.safe_load(f)
root = config_dict['dirs']['root']
self.data_dir = path.join(root, config_dict['dirs']['data'])
self.data_seed_dir = path.join(root, config_dict['dirs']['data_seed'])
self.images_dir = config_dict['dirs']['images']
self.labels_dir = config_dict['dirs']['labels']
self.del_img_dir = path.join(root, config_dict['dirs']['data_deleted'], self.images_dir)
self.del_lbl_dir = path.join(root, config_dict['dirs']['data_deleted'], self.labels_dir)
makedirs(path.join(self.data_dir, self.images_dir), exist_ok=True)
makedirs(path.join(self.data_dir, self.labels_dir), exist_ok=True)
makedirs(path.join(self.data_seed_dir, self.images_dir), exist_ok=True)
makedirs(path.join(self.data_seed_dir, self.labels_dir), exist_ok=True)
makedirs(self.del_img_dir, exist_ok=True)
makedirs(self.del_lbl_dir, exist_ok=True)
self.consumer = Consumer(
host=config_dict['queue']['host'],
port=config_dict['queue']['port'],
username=config_dict['queue']['consumer_user'],
password=config_dict['queue']['consumer_pw']
)
self.queue_name = config_dict['queue']['name']
try:
with open(self.OFFSET_FILE, 'r') as f:
offset_dict = yaml.safe_load(f)
self.offset_queue = offset_dict['offset_queue']
except (FileNotFoundError, ValueError):
with open(self.OFFSET_FILE, 'w') as f:
f.writelines(['offset_queue: 0'])
self.offset_queue = 0
def on_message(self, message: AMQPMessage, context: MessageContext):
try:
str_status = message.application_properties[bytes(self.QUEUE_ANNOTATION_STATUS_RECORD)].decode('utf-8')
ann_status = AnnotationStatus[str_status]
match ann_status:
case AnnotationStatus.Created | AnnotationStatus.Edited:
annotation_message = AnnotationMessage(message.body)
self.save_annotation(annotation_message)
logger.info(f'{annotation_message}')
case AnnotationStatus.Validated:
bulk_validate_message = AnnotationBulkMessage(message.body)
bulk_validate_message.annotation_status = AnnotationStatus.Validated
self.validate(bulk_validate_message)
logger.info(f'{bulk_validate_message}')
case AnnotationStatus.Deleted:
bulk_delete_message = AnnotationBulkMessage(message.body)
bulk_delete_message.annotation_status = AnnotationStatus.Deleted
self.delete(bulk_delete_message)
logger.info(f'{bulk_delete_message}')
self.offset_queue = context.offset + 1
with open(self.OFFSET_FILE, 'w') as offset_file:
yaml.dump({'offset_queue': self.offset_queue}, offset_file)
except Exception as e:
logger.error(e)
async def start(self):
await self.consumer.start()
await self.consumer.subscribe(stream=self.queue_name,
callback=self.on_message,
decoder=amqp_decoder,
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue))
try:
logger.info(f'Start reading queue from {self.offset_queue}')
await self.consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
logger.info("Closing Consumer...")
return
def save_annotation(self, ann):
a = self.AnnotationName(self, ann.name)
is_val = ann.createdRole.is_validator()
try:
# save label anyway
lbl = a.lbl_data if is_val else a.lbl_seed
with open(lbl, 'w') as label_file:
label_file.writelines([
f'{detection.cls} {detection.x} {detection.y} {detection.w} {detection.h}'
for detection in ann.detections
])
if ann.status == AnnotationStatus.Created:
img = a.img_data if is_val else a.img_seed
with open(img, 'wb') as image_file:
image_file.write(ann.image)
else:
if is_val and path.exists(a.img_seed):
shutil.move(a.img_seed, a.img_data)
if is_val and path.exists(a.lbl_seed):
os.remove(a.lbl_seed)
except IOError as e:
logger.error(f"Error during saving: {e}")
def validate(self, msg):
for name in msg.annotation_names:
a = self.AnnotationName(self, name)
shutil.move(a.img_seed, a.img_data)
shutil.move(a.lbl_seed, a.lbl_data)
def delete(self, msg):
for name in msg.annotation_names:
a = self.AnnotationName(self, name)
if path.exists(a.img_data):
shutil.move(a.img_data, self.del_img_dir)
if path.exists(a.img_seed):
shutil.move(a.img_seed, self.del_img_dir)
if path.exists(a.lbl_data):
shutil.move(a.lbl_data, self.del_lbl_dir)
if path.exists(a.lbl_seed):
shutil.move(a.lbl_seed, self.del_lbl_dir)
if __name__ == '__main__':
with asyncio.Runner() as runner:
runner.run(AnnotationQueueHandler().start())
+19
View File
@@ -0,0 +1,19 @@
[
{ "Id": 0, "Name": "ArmorVehicle", "ShortName": "Броня", "Color": "#ff0000" },
{ "Id": 1, "Name": "Truck", "ShortName": "Вантаж.", "Color": "#00ff00" },
{ "Id": 2, "Name": "Vehicle", "ShortName": "Машина", "Color": "#0000ff" },
{ "Id": 3, "Name": "Atillery", "ShortName": "Арта", "Color": "#ffff00" },
{ "Id": 4, "Name": "Shadow", "ShortName": "Тінь", "Color": "#ff00ff" },
{ "Id": 5, "Name": "Trenches", "ShortName": "Окопи", "Color": "#00ffff" },
{ "Id": 6, "Name": "MilitaryMan", "ShortName": "Військов", "Color": "#188021" },
{ "Id": 7, "Name": "TyreTracks", "ShortName": "Накати", "Color": "#800000" },
{ "Id": 8, "Name": "AdditArmoredTank", "ShortName": "Танк.захист", "Color": "#008000" },
{ "Id": 9, "Name": "Smoke", "ShortName": "Дим", "Color": "#000080" },
{ "Id": 10, "Name": "Plane", "ShortName": "Літак", "Color": "#a52a2a" },
{ "Id": 11, "Name": "Moto", "ShortName": "Мото", "Color": "#808000" },
{ "Id": 12, "Name": "CamouflageNet", "ShortName": "Сітка", "Color": "#87ceeb" },
{ "Id": 13, "Name": "CamouflageBranches", "ShortName": "Гілки", "Color": "#2f4f4f" },
{ "Id": 14, "Name": "Roof", "ShortName": "Дах", "Color": "#1e90ff" },
{ "Id": 15, "Name": "Building", "ShortName": "Будівля", "Color": "#ffb6c1" },
{ "Id": 16, "Name": "Caponier", "ShortName": "Капонір", "Color": "#ffa500" }
]
+20
View File
@@ -0,0 +1,20 @@
api:
url: 'https://api.azaion.com'
email: 'uploader@azaion.com'
password: 'Az@1on_10Upl0@der'
queue:
host: '188.245.120.247'
port: 5552
consumer_user: 'azaion_receiver'
consumer_pw: 'Az1onRecce777ve2r'
name: 'azaion-annotations'
dirs:
root: '/azaion'
data: 'data'
data_seed: 'data-seed'
data_processed: 'data-processed'
data_deleted: 'data_deleted'
images: 'images'
labels: 'labels'
+1
View File
@@ -0,0 +1 @@
offset_queue: 0
+3
View File
@@ -0,0 +1,3 @@
yaml
msgpack
rstream
+7
View File
@@ -0,0 +1,7 @@
if [ ! -d "venv" ]; then
python3 -m venv venv
fi
venv/bin/python -m pip install --upgrade pip
venv/bin/pip install -r requirements.txt
venv/bin/pip install --upgrade pyinstaller pyinstaller-hooks-contrib