Add core functionality for API client, CDN management, and data augmentation

- Introduced `ApiClient` for handling API interactions, including file uploads and downloads.
- Implemented `CDNManager` for managing CDN operations with AWS S3.
- Added `Augmentator` class for image augmentation, including bounding box corrections and transformations.
- Created utility functions for annotation conversion and dataset visualization.
- Established a new rules file for sound notifications during human input requests.

These additions enhance the system's capabilities for data handling and user interaction, laying the groundwork for future features.

Simplify autopilot state file to minimal current-step pointer; add execution safety rule to cursor-meta; remove Completed Steps/Key Decisions/Retry Log/Blockers from state template and all references.
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-03-28 00:12:54 +02:00
parent 142c6c4de8
commit c20018745b
31 changed files with 0 additions and 0 deletions
@@ -0,0 +1,142 @@
import json
from datetime import datetime, timedelta
from enum import Enum
import msgpack
class WeatherMode(Enum):
    """Weather/illumination variant; the value is an id offset added to the base class id."""
    Norm = 0    # normal conditions
    Wint = 20   # winter
    Night = 40  # night
class AnnotationClass:
    """One detectable class: numeric id, display name and its colours."""

    def __init__(self, id, name, color):
        self.id = id
        self.name = name
        self.color = color  # '#rrggbb' hex string
        hex_digits = color.lstrip('#')
        red = int(hex_digits[0:2], 16)
        green = int(hex_digits[2:4], 16)
        blue = int(hex_digits[4:6], 16)
        # OpenCV expects BGR channel order
        self.opencv_color = (blue, green, red)

    @staticmethod
    def read_json():
        """Load classes.json and expand every class across all weather modes."""
        with open('classes.json', 'r', encoding='utf-8') as f:
            class_list = json.load(f)
        annotations_dict = {}
        for mode in WeatherMode:
            for cl in class_list:
                class_id = mode.value + cl['Id']
                if mode.value == 0:
                    class_name = cl['Name']
                else:
                    class_name = f'{cl["Name"]}({mode.name})'
                annotations_dict[class_id] = AnnotationClass(class_id, class_name, cl['Color'])
        return annotations_dict
# Module-level registry (class id -> AnnotationClass), loaded once at import
# time from classes.json and expanded across all WeatherMode variants.
annotation_classes = AnnotationClass.read_json()
class AnnotationStatus(Enum):
    """Lifecycle state of an annotation, as carried in queue messages."""
    Created = 10
    Edited = 20
    Validated = 30
    Deleted = 40

    def __str__(self):
        return self.name
class SourceEnum(Enum):
    """Origin of an annotation: produced by the AI model or drawn manually."""
    AI = 0
    Manual = 1
class RoleEnum(Enum):
    """User roles known to the annotation system."""
    Operator = 10
    Validator = 20
    CompanionPC = 30
    Admin = 40
    ApiAdmin = 1000

    def __str__(self):
        return self.name

    def is_validator(self) -> bool:
        """True for roles whose annotations count as validated."""
        validator_roles = (RoleEnum.Validator, RoleEnum.Admin, RoleEnum.ApiAdmin)
        return self in validator_roles
class Detection:
    """A single bounding box in YOLO-style relative coordinates."""

    def __init__(self, annotation_name, cls, x, y, w, h, confidence=None):
        self.annotation_name = annotation_name  # name of the parent annotation
        self.cls = cls                          # class id (key into annotation_classes)
        self.x = x                              # box centre x (relative, 0..1)
        self.y = y                              # box centre y (relative, 0..1)
        self.w = w                              # box width (relative)
        self.h = h                              # box height (relative)
        self.confidence = confidence            # detector confidence, or None (e.g. manual boxes)

    def __str__(self):
        name = annotation_classes[self.cls].name
        # confidence defaults to None (the 'p' field may be absent in the wire
        # format); the old formatting raised TypeError on `None * 100`.
        if self.confidence is None:
            return name
        return f'{name}: {(self.confidence * 100):.1f}%'
class AnnotationCreatedMessageNarrow:
    """Minimal view of a 'created' message: only name and author e-mail.

    Decoded from a msgpack map keyed by integers (field 1 = name, 2 = e-mail);
    missing fields become None.
    """

    def __init__(self, msgpack_bytes):
        unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False)
        self.name = unpacked_data.get(1)          # annotation name
        self.createdEmail = unpacked_data.get(2)  # author e-mail
class AnnotationMessage:
    """Full annotation payload decoded from a msgpack-encoded queue message.

    Fields are positional (integer keys 0..10) in the producer's wire format.
    """

    def __init__(self, msgpack_bytes):
        unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False)
        ts = unpacked_data[0]
        # ts exposes seconds + nanoseconds (msgpack Timestamp); convert to a UTC datetime
        self.createdDate = datetime.utcfromtimestamp(ts.seconds) + timedelta(microseconds=ts.nanoseconds/1000)
        self.name = unpacked_data[1]               # annotation (file) name
        self.originalMediaName = unpacked_data[2]  # source media name
        # field 3 divided by 10 to get microseconds — presumably 100-ns ticks; TODO confirm
        self.time = timedelta(microseconds=unpacked_data[3]/10)
        self.imageExtension = unpacked_data[4]
        self.detections = self._parse_detections(unpacked_data[5])
        self.image = unpacked_data[6]              # raw encoded image bytes
        self.createdRole = RoleEnum(unpacked_data[7])
        self.createdEmail = unpacked_data[8]
        self.source = SourceEnum(unpacked_data[9])
        self.status = AnnotationStatus(unpacked_data[10])

    def __str__(self):
        detections_str = ""
        if self.detections:
            detections_str_list = [str(detection) for detection in self.detections]
            detections_str = " [" + ", ".join(detections_str_list) + "]"
        # AI-sourced annotations are attributed to 'AI' instead of the user role
        createdBy = 'AI' if self.source == SourceEnum.AI else self.createdRole
        return f'{self.status} {self.name} by [{createdBy} {self.createdEmail}|{self.createdDate:%m-%d %H:%M}]{detections_str}'

    @staticmethod
    def _parse_detections(detections_json_str):
        """Decode the JSON-encoded detections field into Detection objects ([] when empty)."""
        if detections_json_str:
            detections_list = json.loads(detections_json_str)
            return [Detection(
                d.get('an'),  # annotation name
                d.get('cl'),  # class id
                d.get('x'),
                d.get('y'),
                d.get('w'),
                d.get('h'),
                d.get('p')    # confidence; may be absent
            ) for d in detections_list]
        return []
class AnnotationBulkMessage:
    """Bulk status-change message (validate/delete) covering a list of annotation names."""

    def __init__(self, msgpack_bytes):
        unpacked_data = msgpack.unpackb(msgpack_bytes, strict_map_key=False)
        self.annotation_names = unpacked_data[0]                     # affected annotation names
        self.annotation_status = AnnotationStatus(unpacked_data[1])  # new status for all of them
        self.createdEmail = unpacked_data[2]                         # user who triggered the change
        ts = unpacked_data[3]
        # msgpack Timestamp (seconds + nanoseconds) -> UTC datetime
        self.createdDate = datetime.utcfromtimestamp(ts.seconds) + timedelta(microseconds=ts.nanoseconds / 1000)

    def __str__(self):
        return f'{self.annotation_status}: [{self.annotation_names}] by [{self.createdEmail}|{self.createdDate:%m-%d %H:%M}]'
@@ -0,0 +1,173 @@
import os
import shutil
import sys
import yaml
import asyncio
from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext
from rstream.amqp import amqp_decoder
from os import path, makedirs
from datetime import datetime, timedelta
import logging
from logging.handlers import TimedRotatingFileHandler
from annotation_queue_dto import AnnotationStatus, AnnotationMessage, AnnotationBulkMessage
# Module logger: INFO level, one log file per day under ./logs rotated at
# midnight (7 days kept), mirrored to stdout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
makedirs('logs', exist_ok=True)
handler = TimedRotatingFileHandler(
    path.join("logs", '{:%Y-%m-%d}.log'.format(datetime.now())),
    when="midnight",
    interval=1,
    backupCount=7,
    encoding="utf-8"
)
formatter = logging.Formatter('%(asctime)s|%(message)s', "%H:%M:%S")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.addHandler(logging.StreamHandler(sys.stdout))
class AnnotationQueueHandler:
CONFIG_FILE = 'config.yaml'
OFFSET_FILE = 'offset.yaml'
QUEUE_ANNOTATION_STATUS_RECORD = b'AnnotationStatus'
QUEUE_EMAIL_RECORD = b'Email'
JPG_EXT = '.jpg'
TXT_EXT = '.txt'
class AnnotationName:
def __init__(self, h, name):
self.img_data = path.join(h.data_dir, h.images_dir, f"{name}{h.JPG_EXT}")
self.lbl_data = path.join(h.data_dir, h.labels_dir, f"{name}{h.TXT_EXT}")
self.img_seed = path.join(h.data_seed_dir, h.images_dir, f"{name}{h.JPG_EXT}")
self.lbl_seed = path.join(h.data_seed_dir, h.labels_dir, f"{name}{h.TXT_EXT}")
def __init__(self):
with open(self.CONFIG_FILE, "r") as f:
config_dict = yaml.safe_load(f)
root = config_dict['dirs']['root']
self.data_dir = path.join(root, config_dict['dirs']['data'])
self.data_seed_dir = path.join(root, config_dict['dirs']['data_seed'])
self.images_dir = config_dict['dirs']['images']
self.labels_dir = config_dict['dirs']['labels']
self.del_img_dir = path.join(root, config_dict['dirs']['data_deleted'], self.images_dir)
self.del_lbl_dir = path.join(root, config_dict['dirs']['data_deleted'], self.labels_dir)
makedirs(path.join(self.data_dir, self.images_dir), exist_ok=True)
makedirs(path.join(self.data_dir, self.labels_dir), exist_ok=True)
makedirs(path.join(self.data_seed_dir, self.images_dir), exist_ok=True)
makedirs(path.join(self.data_seed_dir, self.labels_dir), exist_ok=True)
makedirs(self.del_img_dir, exist_ok=True)
makedirs(self.del_lbl_dir, exist_ok=True)
self.consumer = Consumer(
host=config_dict['queue']['host'],
port=config_dict['queue']['port'],
username=config_dict['queue']['consumer_user'],
password=config_dict['queue']['consumer_pw']
)
self.queue_name = config_dict['queue']['name']
try:
with open(self.OFFSET_FILE, 'r') as f:
offset_dict = yaml.safe_load(f)
self.offset_queue = offset_dict['offset_queue']
except (FileNotFoundError, ValueError):
with open(self.OFFSET_FILE, 'w') as f:
f.writelines(['offset_queue: 0'])
self.offset_queue = 0
def on_message(self, message: AMQPMessage, context: MessageContext):
try:
str_status = message.application_properties[bytes(self.QUEUE_ANNOTATION_STATUS_RECORD)].decode('utf-8')
ann_status = AnnotationStatus[str_status]
match ann_status:
case AnnotationStatus.Created | AnnotationStatus.Edited:
annotation_message = AnnotationMessage(message.body)
self.save_annotation(annotation_message)
logger.info(f'{annotation_message}')
case AnnotationStatus.Validated:
bulk_validate_message = AnnotationBulkMessage(message.body)
bulk_validate_message.annotation_status = AnnotationStatus.Validated
self.validate(bulk_validate_message)
logger.info(f'{bulk_validate_message}')
case AnnotationStatus.Deleted:
bulk_delete_message = AnnotationBulkMessage(message.body)
bulk_delete_message.annotation_status = AnnotationStatus.Deleted
self.delete(bulk_delete_message)
logger.info(f'{bulk_delete_message}')
self.offset_queue = context.offset + 1
with open(self.OFFSET_FILE, 'w') as offset_file:
yaml.dump({'offset_queue': self.offset_queue}, offset_file)
except Exception as e:
logger.error(e)
async def start(self):
await self.consumer.start()
await self.consumer.subscribe(stream=self.queue_name,
callback=self.on_message,
decoder=amqp_decoder,
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue))
try:
logger.info(f'Start reading queue from {self.offset_queue}')
await self.consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
logger.info("Closing Consumer...")
return
def save_annotation(self, ann):
a = self.AnnotationName(self, ann.name)
is_val = ann.createdRole.is_validator()
try:
# save label anyway
lbl = a.lbl_data if is_val else a.lbl_seed
with open(lbl, 'w') as label_file:
label_file.writelines([
f'{detection.cls} {detection.x} {detection.y} {detection.w} {detection.h}'
for detection in ann.detections
])
if ann.status == AnnotationStatus.Created:
img = a.img_data if is_val else a.img_seed
with open(img, 'wb') as image_file:
image_file.write(ann.image)
else:
if is_val and path.exists(a.img_seed):
shutil.move(a.img_seed, a.img_data)
if is_val and path.exists(a.lbl_seed):
os.remove(a.lbl_seed)
except IOError as e:
logger.error(f"Error during saving: {e}")
def validate(self, msg):
for name in msg.annotation_names:
a = self.AnnotationName(self, name)
shutil.move(a.img_seed, a.img_data)
shutil.move(a.lbl_seed, a.lbl_data)
def delete(self, msg):
for name in msg.annotation_names:
a = self.AnnotationName(self, name)
if path.exists(a.img_data):
shutil.move(a.img_data, self.del_img_dir)
if path.exists(a.img_seed):
shutil.move(a.img_seed, self.del_img_dir)
if path.exists(a.lbl_data):
shutil.move(a.lbl_data, self.del_lbl_dir)
if path.exists(a.lbl_seed):
shutil.move(a.lbl_seed, self.del_lbl_dir)
# Entry point: run the consumer inside a fresh asyncio event loop (asyncio.Runner, Python 3.11+).
if __name__ == '__main__':
    with asyncio.Runner() as runner:
        runner.run(AnnotationQueueHandler().start())
+19
View File
@@ -0,0 +1,19 @@
[
{ "Id": 0, "Name": "ArmorVehicle", "ShortName": "Броня", "Color": "#ff0000" },
{ "Id": 1, "Name": "Truck", "ShortName": "Вантаж.", "Color": "#00ff00" },
{ "Id": 2, "Name": "Vehicle", "ShortName": "Машина", "Color": "#0000ff" },
{ "Id": 3, "Name": "Atillery", "ShortName": "Арта", "Color": "#ffff00" },
{ "Id": 4, "Name": "Shadow", "ShortName": "Тінь", "Color": "#ff00ff" },
{ "Id": 5, "Name": "Trenches", "ShortName": "Окопи", "Color": "#00ffff" },
{ "Id": 6, "Name": "MilitaryMan", "ShortName": "Військов", "Color": "#188021" },
{ "Id": 7, "Name": "TyreTracks", "ShortName": "Накати", "Color": "#800000" },
{ "Id": 8, "Name": "AdditArmoredTank", "ShortName": "Танк.захист", "Color": "#008000" },
{ "Id": 9, "Name": "Smoke", "ShortName": "Дим", "Color": "#000080" },
{ "Id": 10, "Name": "Plane", "ShortName": "Літак", "Color": "#a52a2a" },
{ "Id": 11, "Name": "Moto", "ShortName": "Мото", "Color": "#808000" },
{ "Id": 12, "Name": "CamouflageNet", "ShortName": "Сітка", "Color": "#87ceeb" },
{ "Id": 13, "Name": "CamouflageBranches", "ShortName": "Гілки", "Color": "#2f4f4f" },
{ "Id": 14, "Name": "Roof", "ShortName": "Дах", "Color": "#1e90ff" },
{ "Id": 15, "Name": "Building", "ShortName": "Будівля", "Color": "#ffb6c1" },
{ "Id": 16, "Name": "Caponier", "ShortName": "Капонір", "Color": "#ffa500" }
]
+20
View File
@@ -0,0 +1,20 @@
api:
url: 'https://api.azaion.com'
email: 'uploader@azaion.com'
password: 'Az@1on_10Upl0@der'
queue:
host: '188.245.120.247'
port: 5552
consumer_user: 'azaion_receiver'
consumer_pw: 'Az1onRecce777ve2r'
name: 'azaion-annotations'
dirs:
root: '/azaion'
data: 'data-test'
data_seed: 'data-test-seed'
data_processed: 'data-test-processed'
data_deleted: 'data-test_deleted'
images: 'images'
labels: 'labels'
+1
View File
@@ -0,0 +1 @@
offset_queue: 1
+3
View File
@@ -0,0 +1,3 @@
pyyaml
msgpack
rstream
+7
View File
@@ -0,0 +1,7 @@
# Bootstrap a local virtualenv on first run, then install dependencies
# and start the annotation queue handler.
if [ ! -d "venv" ]; then
    python3 -m venv venv
fi
venv/bin/python -m pip install --upgrade pip
venv/bin/pip install -r requirements.txt
venv/bin/python annotation_queue_handler.py
+128
View File
@@ -0,0 +1,128 @@
import io
import json
import os
from http import HTTPStatus
from os import path
import requests
import yaml
import constants
from cdn_manager import CDNCredentials, CDNManager
from hardware_service import get_hardware_info
from security import Security
class ApiCredentials:
    """Plain value object holding the API endpoint and login credentials."""

    def __init__(self, url, email, password):
        self.url = url            # base API URL, no trailing slash
        self.email = email        # login e-mail
        self.password = password  # login password
class ApiClient:
    """Client for the annotation API: login, encrypted resource download, and
    two-part ("big"/"small") model upload split between the CDN and the API.
    """

    def __init__(self):
        # Bearer token; obtained lazily on first authenticated request.
        self.token = None
        with open(constants.CONFIG_FILE, "r") as f:
            config_dict = yaml.safe_load(f)
        api_c = config_dict["api"]
        self.credentials = ApiCredentials(api_c["url"],
                                          api_c["email"],
                                          api_c["password"])
        # CDN credentials are themselves fetched (encrypted) through the API.
        yaml_bytes = self.load_bytes(constants.CDN_CONFIG, '')
        data = yaml.safe_load(io.BytesIO(yaml_bytes))
        creds = CDNCredentials(data["host"],
                               data["downloader_access_key"],
                               data["downloader_access_secret"],
                               data["uploader_access_key"],
                               data["uploader_access_secret"])
        self.cdn_manager = CDNManager(creds)

    def login(self):
        """Authenticate and cache the bearer token; raises on HTTP error."""
        response = requests.post(f'{self.credentials.url}/login',
                                 json={"email": self.credentials.email, "password": self.credentials.password})
        response.raise_for_status()
        token = response.json()["token"]
        self.token = token

    def upload_file(self, filename: str, file_bytes: bytearray, folder):
        """Upload raw bytes as a multipart file to /resources/<folder>.

        Best-effort: failures are printed and swallowed.
        """
        if self.token is None:
            self.login()
        url = f"{self.credentials.url}/resources/{folder}"
        headers = {"Authorization": f"Bearer {self.token}"}
        files = {'data': (filename, io.BytesIO(file_bytes))}
        try:
            r = requests.post(url, headers=headers, files=files, allow_redirects=True)
            r.raise_for_status()
            print(f"Upload {len(file_bytes)} bytes ((unknown)) to {self.credentials.url}. Result: {r.status_code}")
        except Exception as e:
            print(f"Upload fail: {e}")

    def load_bytes(self, filename, folder):
        """Download and decrypt a resource from /resources/get/<folder>.

        Retries once with a fresh login on 401/403. Returns the decrypted bytes.
        """
        hardware_str = get_hardware_info()
        if self.token is None:
            self.login()
        url = f"{self.credentials.url}/resources/get/{folder}"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }
        payload = json.dumps(
            {
                "password": self.credentials.password,
                "hardware": hardware_str,
                "fileName": filename
            }, indent=4)
        response = requests.post(url, data=payload, headers=headers, stream=True)
        if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN:
            # Token likely expired: re-login and retry once.
            self.login()
            headers = {
                "Authorization": f"Bearer {self.token}",
                "Content-Type": "application/json"
            }
            response = requests.post(url, data=payload, headers=headers, stream=True)
        if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
            print('500!')
        # Payload is encrypted with a key derived from credentials + hardware hash.
        hw_hash = Security.get_hw_hash(hardware_str)
        key = Security.get_api_encryption_key(self.credentials, hw_hash)
        resp_bytes = response.raw.read()
        data = Security.decrypt_to(resp_bytes, key)
        print(f'Downloaded file: (unknown), {len(data)} bytes')
        return data

    def load_big_small_resource(self, resource_name, folder, key):
        """Reassemble a resource split into a local '.big' part and an
        API-served '.small' part, then decrypt the concatenation with `key`.
        """
        big_part = path.join(folder, f'{resource_name}.big')
        small_part = f'{resource_name}.small'
        with open(big_part, 'rb') as binary_file:
            encrypted_bytes_big = binary_file.read()
        encrypted_bytes_small = self.load_bytes(small_part, folder)
        # the small part holds the leading bytes of the ciphertext
        encrypted_bytes = encrypted_bytes_small + encrypted_bytes_big
        result = Security.decrypt_to(encrypted_bytes, key)
        return result

    def upload_big_small_resource(self, resource, resource_name, folder, key):
        """Encrypt `resource` and upload it in two parts: a small head through
        the API and the big remainder to the CDN (plus a local copy on disk).
        """
        big_part_name = f'{resource_name}.big'
        small_part_name = f'{resource_name}.small'
        resource_encrypted = Security.encrypt_to(resource, key)
        # head is at most SMALL_SIZE_KB, and never more than 20% of the payload
        part_small_size = min(constants.SMALL_SIZE_KB * 1024, int(0.2 * len(resource_encrypted)))
        part_small = resource_encrypted[:part_small_size]  # slice bytes for part1
        part_big = resource_encrypted[part_small_size:]
        self.cdn_manager.upload(constants.MODELS_FOLDER, big_part_name, part_big)
        os.makedirs(folder, exist_ok=True)
        with open(path.join(folder, big_part_name), 'wb') as f:
            f.write(part_big)
        self.upload_file(small_part_name, part_small, constants.MODELS_FOLDER)
+152
View File
@@ -0,0 +1,152 @@
import concurrent.futures
import os.path
import shutil
import time
from datetime import datetime
from pathlib import Path
import albumentations as A
import cv2
import numpy as np
import constants
from dto.imageLabel import ImageLabel
class Augmentator:
    """Creates augmented copies of annotated images using Albumentations.

    For each source image it emits the (clamped) original plus 7 randomly
    transformed variants into the processed images/labels directories.
    """

    def __init__(self):
        # Progress counters; incremented from worker threads.
        self.total_files_processed = 0
        self.total_images_to_process = 0
        # Tolerance used when clamping boxes to the image border.
        self.correct_margin = 0.0005
        # Boxes shrunk below this relative size after clamping are dropped.
        self.correct_min_bbox_size = 0.01
        self.transform = A.Compose([
            A.HorizontalFlip(p=0.6),
            A.RandomBrightnessContrast(p=0.4, brightness_limit=(-0.3, 0.3), contrast_limit=(-0.05, 0.05)),
            A.Affine(p=0.8, scale=(0.8, 1.2), rotate=(-35, 35), shear=(-10, 10)),
            A.MotionBlur(p=0.1, blur_limit=(1, 2)),
            A.HueSaturationValue(p=0.4, hue_shift_limit=10, sat_shift_limit=10, val_shift_limit=10)
        ], bbox_params=A.BboxParams(format='yolo'))

    def correct_bboxes(self, labels):
        """Clamp YOLO boxes [x, y, w, h, cls] into the unit square; drop boxes
        whose width or height falls below correct_min_bbox_size afterwards."""
        res = []
        for bboxes in labels:
            x = bboxes[0]
            y = bboxes[1]
            half_width = 0.5*bboxes[2]
            half_height = 0.5*bboxes[3]
            # calc how much bboxes are outside borders ( +small margin ).
            # value should be negative. If it's positive, then put 0, as no correction
            w_diff = min((1 - self.correct_margin) - (x + half_width), (x - half_width) - self.correct_margin, 0)
            w = bboxes[2] + 2*w_diff
            if w < self.correct_min_bbox_size:
                continue
            h_diff = min((1 - self.correct_margin) - (y + half_height), ((y - half_height) - self.correct_margin), 0)
            h = bboxes[3] + 2 * h_diff
            if h < self.correct_min_bbox_size:
                continue
            res.append([x, y, w, h, bboxes[4]])
        return res
        pass

    def augment_inner(self, img_ann: ImageLabel) -> [ImageLabel]:
        """Return the original ImageLabel (re-targeted at the processed dirs)
        plus up to 7 transformed variants."""
        results = []
        labels = self.correct_bboxes(img_ann.labels)
        if len(labels) == 0 and len(img_ann.labels) != 0:
            # every box was dropped by clamping although the source had some
            print('no labels but was!!!')
        results.append(ImageLabel(
            image=img_ann.image,
            labels=img_ann.labels,
            image_path=os.path.join(constants.config.processed_images_dir, Path(img_ann.image_path).name),
            labels_path=os.path.join(constants.config.processed_labels_dir, Path(img_ann.labels_path).name)
        )
        )
        for i in range(7):
            try:
                res = self.transform(image=img_ann.image, bboxes=labels)
                path = Path(img_ann.image_path)
                name = f'{path.stem}_{i + 1}'
                img = ImageLabel(
                    image=res['image'],
                    labels=res['bboxes'],
                    image_path=os.path.join(constants.config.processed_images_dir, f'{name}{path.suffix}'),
                    labels_path=os.path.join(constants.config.processed_labels_dir, f'{name}.txt')
                )
                results.append(img)
            except Exception as e:
                # a transform can reject degenerate boxes; skip that variant
                print(f'Error during transformation: {e}')
        return results

    def read_labels(self, labels_path) -> [[]]:
        """Parse a YOLO label file into [x, y, w, h, cls] rows (cls kept as str)."""
        with open(labels_path, 'r') as f:
            rows = f.readlines()
        arr = []
        for row in rows:
            str_coordinates = row.split(' ')
            class_num = str_coordinates.pop(0)
            # accept decimal commas as well as decimal points
            coordinates = [float(n.replace(',', '.')) for n in str_coordinates]
            # noinspection PyTypeChecker
            coordinates.append(class_num)
            arr.append(coordinates)
        return arr

    def augment_annotation(self, image_file):
        """Thread-pool worker: load one image + labels, augment, write all outputs."""
        try:
            image_path = os.path.join(constants.config.data_images_dir, image_file.name)
            labels_path = os.path.join(constants.config.data_labels_dir, f'{Path(str(image_path)).stem}.txt')
            # imdecode + fromfile instead of imread to survive non-ASCII paths
            image = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
            img_ann = ImageLabel(
                image_path=image_path,
                image=image,
                labels_path=labels_path,
                labels=self.read_labels(labels_path)
            )
            try:
                results = self.augment_inner(img_ann)
                for annotation in results:
                    # tofile (not imwrite) again to survive non-ASCII paths
                    cv2.imencode('.jpg', annotation.image)[1].tofile(annotation.image_path)
                    with open(annotation.labels_path, 'w') as f:
                        lines = [f'{l[4]} {round(l[0], 5)} {round(l[1], 5)} {round(l[2], 5)} {round(l[3], 5)}\n' for l in
                                 annotation.labels]
                        f.writelines(lines)
                        f.close()
                print(f'{datetime.now():{"%Y-%m-%d %H:%M:%S"}}: {self.total_files_processed + 1}/{self.total_images_to_process} : {image_file.name} has augmented')
            except Exception as e:
                print(e)
            self.total_files_processed += 1
        except Exception as e:
            print(f'Error appeared in thread for {image_file.name}: {e}')

    def augment_annotations(self, from_scratch=False):
        """Augment every image that has no processed counterpart yet, in a thread pool.

        With from_scratch=True the processed directory is wiped first.
        """
        self.total_files_processed = 0
        if from_scratch:
            shutil.rmtree(constants.config.processed_dir)
        os.makedirs(constants.config.processed_images_dir, exist_ok=True)
        os.makedirs(constants.config.processed_labels_dir, exist_ok=True)
        # skip images already present in the processed folder
        processed_images = set(f.name for f in os.scandir(constants.config.processed_images_dir))
        images = []
        with os.scandir(constants.config.data_images_dir) as imd:
            for image_file in imd:
                if image_file.is_file() and image_file.name not in processed_images:
                    images.append(image_file)
        self.total_images_to_process = len(images)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.map(self.augment_annotation, images)
# Entry point: keep augmenting any newly arrived images, rescanning every 5 minutes.
if __name__ == '__main__':
    augmentator = Augmentator()
    while True:
        augmentator.augment_annotations()
        print('All processed, waiting for 5 minutes...')
        time.sleep(300)
+44
View File
@@ -0,0 +1,44 @@
import io
import sys
import boto3
import yaml
import os
class CDNCredentials:
    """S3-compatible CDN endpoint plus separate download/upload key pairs."""

    def __init__(self, host, downloader_access_key, downloader_access_secret, uploader_access_key, uploader_access_secret):
        self.host = host  # endpoint URL of the S3-compatible service
        # read-only credentials
        self.downloader_access_key = downloader_access_key
        self.downloader_access_secret = downloader_access_secret
        # write credentials
        self.uploader_access_key = uploader_access_key
        self.uploader_access_secret = uploader_access_secret
class CDNManager:
    """Thin wrapper over two boto3 S3 clients: one read-only, one write-only."""

    def __init__(self, credentials: CDNCredentials):
        self.creds = credentials
        self.download_client = boto3.client('s3', endpoint_url=self.creds.host,
                                            aws_access_key_id=self.creds.downloader_access_key,
                                            aws_secret_access_key=self.creds.downloader_access_secret)
        self.upload_client = boto3.client('s3', endpoint_url=self.creds.host,
                                          aws_access_key_id=self.creds.uploader_access_key,
                                          aws_secret_access_key=self.creds.uploader_access_secret)

    def upload(self, bucket: str, filename: str, file_bytes: bytearray):
        """Upload bytes to bucket/filename; True on success, False on any error (printed)."""
        try:
            self.upload_client.upload_fileobj(io.BytesIO(file_bytes), bucket, filename)
            print(f'uploaded (unknown) ({len(file_bytes)} bytes) to the {bucket}')
            return True
        except Exception as e:
            print(e)
            return False

    def download(self, bucket: str, filename: str):
        """Download bucket/filename into the current directory; True on success."""
        try:
            self.download_client.download_file(bucket, filename, filename)
            print(f'downloaded (unknown) from the {bucket} to current folder')
            return True
        except Exception as e:
            print(e)
            return False
+19
View File
@@ -0,0 +1,19 @@
[
{ "Id": 0, "Name": "ArmorVehicle", "ShortName": "Броня", "Color": "#ff0000" },
{ "Id": 1, "Name": "Truck", "ShortName": "Вантаж.", "Color": "#00ff00" },
{ "Id": 2, "Name": "Vehicle", "ShortName": "Машина", "Color": "#0000ff" },
{ "Id": 3, "Name": "Atillery", "ShortName": "Арта", "Color": "#ffff00" },
{ "Id": 4, "Name": "Shadow", "ShortName": "Тінь", "Color": "#ff00ff" },
{ "Id": 5, "Name": "Trenches", "ShortName": "Окопи", "Color": "#00ffff" },
{ "Id": 6, "Name": "MilitaryMan", "ShortName": "Військов", "Color": "#188021" },
{ "Id": 7, "Name": "TyreTracks", "ShortName": "Накати", "Color": "#800000" },
{ "Id": 8, "Name": "AdditArmoredTank", "ShortName": "Танк.захист", "Color": "#008000" },
{ "Id": 9, "Name": "Smoke", "ShortName": "Дим", "Color": "#000080" },
{ "Id": 10, "Name": "Plane", "ShortName": "Літак", "Color": "#a52a2a" },
{ "Id": 11, "Name": "Moto", "ShortName": "Мото", "Color": "#808000" },
{ "Id": 12, "Name": "CamouflageNet", "ShortName": "Сітка", "Color": "#87ceeb" },
{ "Id": 13, "Name": "CamouflageBranches", "ShortName": "Гілки", "Color": "#2f4f4f" },
{ "Id": 14, "Name": "Roof", "ShortName": "Дах", "Color": "#1e90ff" },
{ "Id": 15, "Name": "Building", "ShortName": "Будівля", "Color": "#ffb6c1" },
{ "Id": 16, "Name": "Caponier", "ShortName": "Капонір", "Color": "#ffa500" }
]
+118
View File
@@ -0,0 +1,118 @@
from os import path
import yaml
from pydantic import BaseModel
class DirsConfig(BaseModel):
    """Filesystem layout configuration; `root` anchors every derived path."""
    root: str = '/azaion'
class TrainingConfig(BaseModel):
    """YOLO training hyper-parameters (defaults match the current pipeline)."""
    model: str = 'yolo11m.yaml'  # model architecture definition
    epochs: int = 120
    batch: int = 11
    imgsz: int = 1280            # training image size
    save_period: int = 1         # checkpoint every epoch
    workers: int = 24            # dataloader workers
class ExportConfig(BaseModel):
    """ONNX export parameters."""
    onnx_imgsz: int = 1280  # export image size
    onnx_batch: int = 4     # export batch dimension
class Config(BaseModel):
    """Aggregated project configuration; exposes every project directory as a
    property derived from dirs.root."""
    dirs: DirsConfig = DirsConfig()
    training: TrainingConfig = TrainingConfig()
    export: ExportConfig = ExportConfig()

    @property
    def azaion(self) -> str:
        # project root directory
        return self.dirs.root

    @property
    def data_dir(self) -> str:
        # validated dataset (images + labels)
        return path.join(self.dirs.root, 'data')

    @property
    def data_images_dir(self) -> str:
        return path.join(self.data_dir, 'images')

    @property
    def data_labels_dir(self) -> str:
        return path.join(self.data_dir, 'labels')

    @property
    def processed_dir(self) -> str:
        # augmented output of the Augmentator
        return path.join(self.dirs.root, 'data-processed')

    @property
    def processed_images_dir(self) -> str:
        return path.join(self.processed_dir, 'images')

    @property
    def processed_labels_dir(self) -> str:
        return path.join(self.processed_dir, 'labels')

    @property
    def corrupted_dir(self) -> str:
        return path.join(self.dirs.root, 'data-corrupted')

    @property
    def corrupted_images_dir(self) -> str:
        return path.join(self.corrupted_dir, 'images')

    @property
    def corrupted_labels_dir(self) -> str:
        return path.join(self.corrupted_dir, 'labels')

    @property
    def sample_dir(self) -> str:
        return path.join(self.dirs.root, 'data-sample')

    @property
    def datasets_dir(self) -> str:
        return path.join(self.dirs.root, 'datasets')

    @property
    def models_dir(self) -> str:
        return path.join(self.dirs.root, 'models')

    @property
    def current_pt_model(self) -> str:
        # `prefix` is the module-level constant defined below ('azaion-');
        # the trailing dash is stripped to get the bare model name
        return path.join(self.models_dir, f'{prefix[:-1]}.pt')

    @property
    def current_onnx_model(self) -> str:
        return path.join(self.models_dir, f'{prefix[:-1]}.onnx')

    @classmethod
    def from_yaml(cls, config_file: str, root: str = None) -> 'Config':
        """Build a Config from a YAML file; a missing file or missing keys fall
        back to the field defaults. `root` (when given) overrides dirs.root."""
        try:
            with open(config_file) as f:
                data = yaml.safe_load(f) or {}
        except FileNotFoundError:
            data = {}
        if root is not None:
            data.setdefault('dirs', {})['root'] = root
        return cls(**data)
# Naming/bookkeeping constants shared across the training pipeline.
prefix = 'azaion-'  # dataset/model name prefix
date_format = '%Y-%m-%d'
checkpoint_file = 'checkpoint.txt'
checkpoint_date_format = '%Y-%m-%d %H:%M:%S'
CONFIG_FILE = 'config.yaml'
JPG_EXT = '.jpg'
TXT_EXT = '.txt'
OFFSET_FILE = 'offset.yaml'
SMALL_SIZE_KB = 3  # max size of the 'small' resource head uploaded via the API
CDN_CONFIG = 'cdn.yaml'
MODELS_FOLDER = 'models'
# Singleton configuration instance used throughout the project.
config: Config = Config.from_yaml(CONFIG_FILE)
+119
View File
@@ -0,0 +1,119 @@
import os
import shutil
import xml.etree.cElementTree as et
from pathlib import Path
import cv2
# XML tag names used by the Pascal VOC annotation format.
tag_size = 'size'
tag_object = 'object'
tag_name = 'name'
tag_bndbox = 'bndbox'
# Maps source-dataset class names to our class ids.
name_class_map = {'Truck': 1, 'Car': 2, 'Taxi': 2}  # 1 = truck, 2 = passenger car
forbidden_classes = ['Motorcycle']  # classes to drop entirely
default_class = 1                   # fallback class id for unmapped names
image_extensions = ['jpg', 'png', 'jpeg']
def convert(folder, dest_folder, read_annotations, ann_format):
    """Convert a flat folder of images + annotation files into YOLO layout.

    Each image in `folder` must have a sibling annotation file named
    `<stem>.<ann_format>`; `read_annotations(width, height, text)` turns its
    contents into YOLO label lines. Results are written to
    dest_folder/images and dest_folder/labels; failures skip the image.
    """
    dest_images_dir = os.path.join(dest_folder, 'images')
    dest_labels_dir = os.path.join(dest_folder, 'labels')
    os.makedirs(dest_images_dir, exist_ok=True)
    os.makedirs(dest_labels_dir, exist_ok=True)
    suffixes = tuple(f'.{ext}' for ext in image_extensions)
    for f in os.listdir(folder):
        # BUG FIX: the old `f[-3:] in image_extensions` check could never match
        # the listed 'jpeg' extension (last 3 chars are 'peg') and ignored case.
        if not f.lower().endswith(suffixes):
            continue
        im = cv2.imread(os.path.join(folder, f))
        height = im.shape[0]
        width = im.shape[1]
        label = f'{Path(f).stem}.{ann_format}'
        try:
            with open(os.path.join(folder, label), 'r') as label_file:
                text = label_file.read()
            lines = read_annotations(width, height, text)
        except ValueError as val_err:
            print(f'Image {f} annotations could not be converted. Error: {val_err}')
            continue
        except Exception as e:
            print(f'Error conversion for {f}. Error: {e}')
            # BUG FIX: previously fell through and wrote the image with a stale
            # (or undefined) `lines` value from an earlier iteration.
            continue
        shutil.copy(os.path.join(folder, f), os.path.join(dest_images_dir, f))
        with open(os.path.join(dest_labels_dir, f'{Path(label).stem}.txt'), 'w') as new_label_file:
            new_label_file.writelines(lines)
        print(f'Image {f} has been processed successfully')
def minmax2yolo(width, height, xmin, xmax, ymin, ymax):
    """Convert absolute pixel min/max box corners to YOLO centre format.

    Returns (cx, cy, w, h) as image-relative fractions, each rounded to 5 d.p.
    """
    box_w = (xmax - xmin) / width
    box_h = (ymax - ymin) / height
    centre_x = xmin / width + box_w / 2
    centre_y = ymin / height + box_h / 2
    return round(centre_x, 5), round(centre_y, 5), round(box_w, 5), round(box_h, 5)
def read_pascal_voc(width, height, s):
    """Convert one Pascal VOC XML document into YOLO label lines.

    Class names are mapped through name_class_map; forbidden classes are
    skipped entirely; anything else falls back to default_class.
    """
    root = et.fromstring(s)
    lines = []
    for node_object in root.findall(tag_object):
        class_num = default_class
        c_x = c_y = c_w = c_h = 0
        for node_object_ch in node_object:
            if node_object_ch.tag == tag_name:
                key = node_object_ch.text
                if key in name_class_map:
                    class_num = name_class_map[key]
                else:
                    if key in forbidden_classes:
                        # -1 marks the whole object for skipping below; this
                        # `continue` only skips the rest of this child element
                        class_num = -1
                        continue
                    else:
                        class_num = default_class
            if node_object_ch.tag == tag_bndbox:
                bbox_dict = {bbox_ch.tag: bbox_ch.text for bbox_ch in node_object_ch}
                c_x, c_y, c_w, c_h = minmax2yolo(width, height,
                                                 int(bbox_dict['xmin']),
                                                 int(bbox_dict['xmax']),
                                                 int(bbox_dict['ymin']),
                                                 int(bbox_dict['ymax']))
        if class_num == -1:
            continue
        if c_x > 1 or c_y > 1 or c_w > 1 or c_h > 1:
            print('Values are out of bounds')
        else:
            # drop objects whose bndbox was missing (all-zero coordinates)
            if c_x != 0 and c_y != 0 and c_w != 0 and c_h != 0:
                lines.append(f'{class_num} {c_x} {c_y} {c_w} {c_h}\n')
    return lines
def read_bbox_oriented(width, height, s):
    """Convert oriented-bbox annotation text into YOLO label lines.

    Each non-empty line must hold 14 space-separated values; columns 6-9 are
    the four corner x coordinates and columns 10-13 the four y coordinates.
    Every box is emitted with class id 2; out-of-bounds boxes are skipped.
    Raises ValueError on a malformed line.
    """
    yolo_lines = []
    for line in s.split('\n'):
        if not line:
            continue
        vals = line.split(' ')
        if len(vals) != 14:
            raise ValueError('wrong format')
        corner_xs = [int(v) for v in vals[6:10]]
        corner_ys = [int(v) for v in vals[10:14]]
        c_x, c_y, c_w, c_h = minmax2yolo(width, height,
                                         min(corner_xs), max(corner_xs),
                                         min(corner_ys), max(corner_ys))
        if c_x > 1 or c_y > 1 or c_w > 1 or c_h > 1:
            print('Values are out of bounds')
        else:
            yolo_lines.append(f'2 {c_x} {c_y} {c_w} {c_h}\n')
    return yolo_lines
def rename_images(folder):
    """Bulk-rename: drop the last 7 characters of each file name and append '.png'.

    NOTE(review): assumes every file name ends with a fixed 7-character
    suffix — confirm against the source dataset before reusing.
    """
    for f in os.listdir(folder):
        shutil.move(os.path.join(folder, f), os.path.join(folder, f[:-7] + '.png'))
# Entry point: convert both raw datasets into the shared YOLO 'converted' folder.
if __name__ == '__main__':
    convert('/azaion/datasets/others/cars_height', '/azaion/datasets/converted', read_bbox_oriented, 'txt')
    convert('/azaion/datasets/others/cars', '/azaion/datasets/converted', read_pascal_voc, 'xml')
+52
View File
@@ -0,0 +1,52 @@
import os
from pathlib import Path
import cv2
from dto.annotationClass import AnnotationClass
from dto.imageLabel import ImageLabel
from preprocessing import read_labels
from matplotlib import pyplot as plt
import constants
# Class registry (id -> AnnotationClass) loaded once at import time.
annotation_classes = AnnotationClass.read_json()
def visualise_dataset():
    """Interactively show train images (from a fixed index on) with their boxes drawn.

    NOTE(review): the dataset date '2024-06-18' and start index 35247 are
    hard-coded debug values.
    """
    cur_dataset = os.path.join(constants.config.datasets_dir, f'{constants.prefix}2024-06-18', 'train')
    images_dir = os.path.join(cur_dataset, 'images')
    labels_dir = os.path.join(cur_dataset, 'labels')
    for f in os.listdir(images_dir)[35247:]:
        image_path = os.path.join(images_dir, f)
        labels_path = os.path.join(labels_dir, f'{Path(f).stem}.txt')
        img = ImageLabel(
            image_path=image_path,
            image=cv2.imread(image_path),
            labels_path=labels_path,
            labels=read_labels(labels_path)
        )
        img.visualize(annotation_classes)
        print(f'visualizing {image_path}')
        plt.close()
        key = input('Press any key to continue')  # pause between images
def visualise_processed_folder():
    """Show the first image of the processed folder with its boxes drawn.

    NOTE(review): only images[0] is ever displayed — `cur` is never advanced.
    """
    def show_image(img):
        # resolve the image/label pair inside the processed dirs and display it
        image_path = os.path.join(constants.config.processed_images_dir, img)
        labels_path = os.path.join(constants.config.processed_labels_dir, f'{Path(img).stem}.txt')
        img = ImageLabel(
            image_path=image_path,
            image=cv2.imread(image_path),
            labels_path=labels_path,
            labels=read_labels(labels_path)
        )
        img.visualize(annotation_classes)
    images = os.listdir(constants.config.processed_images_dir)
    cur = 0
    show_image(images[cur])
    pass
# Entry point: debug visualisation of the processed (augmented) folder.
if __name__ == '__main__':
    visualise_processed_folder()
+36
View File
@@ -0,0 +1,36 @@
import json
from enum import Enum
from os.path import dirname, join
class WeatherMode(Enum):
    """Weather/illumination variant; the value is an id offset added to the base class id."""
    Norm = 0    # normal conditions
    Wint = 20   # winter
    Night = 40  # night
class AnnotationClass:
    """One detectable class: numeric id, display name and '#rrggbb' colour string."""

    def __init__(self, id, name, color):
        self.id = id
        self.name = name
        self.color = color  # '#rrggbb' hex string (see classes.json)

    @staticmethod
    def read_json():
        """Load classes.json (one directory above this package) and expand every
        class across all WeatherMode variants (mode.value offsets the id)."""
        classes_path = join(dirname(dirname(__file__)), 'classes.json')
        with open(classes_path, 'r', encoding='utf-8') as f:
            j = json.loads(f.read())
        annotations_dict = {}
        for mode in WeatherMode:
            for cl in j:
                id = mode.value + cl['Id']
                name = cl['Name'] if mode.value == 0 else f'{cl["Name"]}({mode.name})'
                annotations_dict[id] = AnnotationClass(id, name, cl['Color'])
        return annotations_dict

    @property
    def color_tuple(self):
        """(R, G, B) integers parsed from the colour string.

        BUG FIX: the old code sliced `self.color[3:]`, which on a '#rrggbb'
        value drops the '#' plus the entire first channel and then splits the
        remaining four digits into four 1-digit components. Strip only the
        '#' and split the six hex digits into three byte pairs.
        """
        digits = self.color.lstrip('#')
        return tuple(int(digits[i:i + 2], 16) for i in (0, 2, 4))
View File
View File
+32
View File
@@ -0,0 +1,32 @@
import cv2
from matplotlib import pyplot as plt
class ImageLabel:
    """An image paired with its YOLO-format label file."""

    def __init__(self, image_path, image, labels_path, labels):
        self.image_path = image_path
        self.image = image
        self.labels_path = labels_path
        self.labels = labels

    def visualize(self, annotation_classes):
        """Draw every bounding box on a copy of the image and show it with matplotlib."""
        canvas = cv2.cvtColor(self.image.copy(), cv2.COLOR_BGR2RGB)
        height, width = canvas.shape[:2]
        for label in self.labels:
            cls_id = int(label[-1])
            # Labels are normalised centre/size; convert to pixel corners.
            cx = float(label[0])
            cy = float(label[1])
            bw = float(label[2])
            bh = float(label[3])
            left = int((cx - bw / 2) * width)
            top = int((cy - bh / 2) * height)
            right = int((cx - bw / 2 + bw) * width)
            bottom = int((cy - bh / 2 + bh) * height)
            cv2.rectangle(canvas, (left, top), (right, bottom),
                          color=annotation_classes[cls_id].color_tuple, thickness=3)
        plt.figure(figsize=(12, 12))
        plt.axis('off')
        plt.imshow(canvas)
        plt.show()
+118
View File
@@ -0,0 +1,118 @@
import os
import shutil
from os import path, scandir, makedirs
from pathlib import Path
import random
import netron
import yaml
from ultralytics import YOLO
import constants
from api_client import ApiClient, ApiCredentials
from cdn_manager import CDNManager, CDNCredentials
from security import Security
from utils import Dotdict
def export_rknn(model_path):
    """Export a YOLO model to RKNN (rk3588) and move the artifact next to the source.

    Ultralytics writes the export into a '<stem>_rknn_model' folder; the
    .rknn file is moved up to '<stem>.rknn' and the folder is removed.
    (Removed a stray `pass` and a duplicated `Path(model_path).stem` call.)
    """
    model = YOLO(model_path)
    model.export(format="rknn", name="rk3588", simplify=True)
    model_stem = Path(model_path).stem
    folder_name = f'{model_stem}_rknn_model'
    shutil.move(path.join(folder_name, f'{model_stem}-rk3588.rknn'), f'{model_stem}.rknn')
    shutil.rmtree(folder_name)
def export_onnx(model_path, batch_size=None):
    """Export a YOLO model to ONNX, replacing any previous export of the same name.

    Args:
        model_path: path to the .pt model file.
        batch_size: export batch size; defaults to the configured value.
    """
    effective_batch = constants.config.export.onnx_batch if batch_size is None else batch_size
    model = YOLO(model_path)
    onnx_path = Path(model_path).stem + '.onnx'
    if path.exists(onnx_path):
        os.remove(onnx_path)
    model.export(
        format="onnx",
        imgsz=constants.config.export.onnx_imgsz,
        batch=effective_batch,
        simplify=True,
        nms=True,
    )
def export_coreml(model_path):
    """Export a YOLO model to CoreML using the configured image size."""
    YOLO(model_path).export(
        format="coreml",
        imgsz=constants.config.export.onnx_imgsz,
    )
def export_tensorrt(model_path):
    """Export a YOLO model to a TensorRT engine (fp16, batch of 4, fused NMS)."""
    model = YOLO(model_path)
    model.export(
        format='engine',
        batch=4,
        half=True,
        simplify=True,
        nms=True
    )
def form_data_sample(destination_path, size=500, write_txt_log=False):
    """Copy a random subset of processed images into *destination_path*.

    The destination folder is recreated from scratch.

    Args:
        destination_path: target folder for the sample.
        size: maximum number of images to copy.
        write_txt_log: when True, also write 'azaion_subset.txt' listing the
            copied files as './<name>' entries.
    """
    candidates = []
    with scandir(constants.config.processed_images_dir) as imd:
        for entry in imd:
            if entry.is_file():
                candidates.append(entry)
    print('shuffling images')
    random.shuffle(candidates)
    sample = candidates[:size]
    shutil.rmtree(destination_path, ignore_errors=True)
    makedirs(destination_path, exist_ok=True)
    copied_names = []
    for entry in sample:
        shutil.copy(entry.path, path.join(destination_path, entry.name))
        copied_names.append(f'./{entry.name}')
    if write_txt_log:
        with open(path.join(destination_path, 'azaion_subset.txt'), 'w', encoding='utf-8') as f:
            f.writelines([f'{line}\n' for line in copied_names])
def show_model(model: str = None):
    # Open the model file in the Netron viewer (serves a local web page).
    netron.start(model)
def upload_model(model_path: str, filename: str, size_small_in_kb: int = 3):
    """Encrypt a model and upload it split into a small part (API) and a big part (CDN).

    The encrypted blob is split so that the head part (up to
    *size_small_in_kb* KB, capped at 30% of the blob) goes to the API server
    and the remainder to the CDN; both parts are needed to reconstruct it.

    Args:
        model_path: local path of the model file to upload.
        filename: base name for the uploaded '<filename>.small' /
            '<filename>.big' parts.
        size_small_in_kb: target size of the small part in KB.
    """
    with open(model_path, 'rb') as f_in:
        model_bytes = f_in.read()
    key = Security.get_model_encryption_key()
    model_encrypted = Security.encrypt_to(model_bytes, key)
    part1_size = min(size_small_in_kb * 1024, int(0.3 * len(model_encrypted)))
    model_part_small = model_encrypted[:part1_size]  # slice bytes for part1
    model_part_big = model_encrypted[part1_size:]
    with open(constants.CONFIG_FILE, "r") as f:
        config_dict = yaml.safe_load(f)
    d_config = Dotdict(config_dict)
    api_c = Dotdict(d_config.api)
    api = ApiClient(ApiCredentials(api_c.url, api_c.user, api_c.pw, api_c.folder))
    yaml_bytes = api.load_bytes(constants.CDN_CONFIG, '')
    data = yaml.safe_load(yaml_bytes)
    creds = CDNCredentials(data["host"],
                           data["downloader_access_key"],
                           data["downloader_access_secret"],
                           data["uploader_access_key"],
                           data["uploader_access_secret"])
    cdn_manager = CDNManager(creds)
    # Bug fix: the uploaded part names previously ignored the `filename`
    # argument entirely.
    api.upload_file(f'{filename}.small', model_part_small, constants.MODELS_FOLDER)
    cdn_manager.upload(constants.MODELS_FOLDER, f'{filename}.big', model_part_big)
+35
View File
@@ -0,0 +1,35 @@
import os
import subprocess
def get_hardware_info():
    """Collect a one-line hardware summary: CPU, GPU, RAM (KB) and system-drive serial.

    Uses PowerShell/CIM on Windows and lscpu/lspci/free on Linux. Both
    command chains emit the fields in the same order so the output can be
    parsed positionally below.
    """
    if os.name == 'nt':  # windows
        os_command = (
            "powershell -Command \""
            "Get-CimInstance -ClassName Win32_Processor | Select-Object -ExpandProperty Name | Write-Output; "
            "Get-CimInstance -ClassName Win32_VideoController | Select-Object -ExpandProperty Name | Write-Output; "
            "Get-CimInstance -ClassName Win32_OperatingSystem | Select-Object -ExpandProperty TotalVisibleMemorySize | Write-Output; "
            "(Get-Disk | Where-Object {$_.IsSystem -eq $true}).SerialNumber"
            "\""
        )
    else:
        os_command = (
            "lscpu | grep 'Model name:' | cut -d':' -f2 && "
            "lspci | grep VGA | cut -d':' -f3 && "
            "free -k | awk '/^Mem:/ {print $2}' && "
            "cat /sys/block/sda/device/vpd_pg80 2>/dev/null || cat /sys/block/sda/device/serial 2>/dev/null"
        )
    # shell=True is required: both variants chain several programs with pipes.
    result = subprocess.check_output(os_command, shell=True).decode('utf-8', errors='ignore')
    # Normalise whitespace/control bytes and drop empty lines.
    lines = [line.replace("  ", " ").replace("Name=", "").strip('\x00\x14 \t\n\r\v\f') for line in result.splitlines() if line.strip()]
    cpu = lines[0]
    gpu = lines[1]
    # could be multiple gpus, that's why take memory and drive from the 2 last lines
    len_lines = len(lines)
    memory = lines[len_lines - 2].replace("TotalVisibleMemorySize=", "")
    drive_serial = lines[len_lines - 1]
    res = f'CPU: {cpu}. GPU: {gpu}. Memory: {memory}. DriveSerial: {drive_serial}'
    return res
View File
+63
View File
@@ -0,0 +1,63 @@
import json
from enum import Enum
from os.path import join, dirname
class Detection:
    """One detected box in normalised centre/size form, with class id and confidence."""

    def __init__(self, x, y, w, h, cls, confidence):
        self.x = x
        self.y = y
        self.w = w
        self.h = h
        self.cls = cls
        self.confidence = confidence

    def overlaps(self, det2, iou_threshold):
        """Return True when the IoU with *det2* exceeds *iou_threshold*."""
        # Axis overlap from the centre distance vs. the combined half-extents.
        dx = 0.5 * (self.w + det2.w) - abs(self.x - det2.x)
        dy = 0.5 * (self.h + det2.h) - abs(self.y - det2.y)
        inter = max(0, dx) * max(0, dy)
        union = self.w * self.h + det2.w * det2.h - inter
        return inter / union > iou_threshold
class Annotation:
    """Detections found in a single video frame, together with its timestamp."""

    def __init__(self, frame, time, detections: list[Detection]):
        self.frame = frame
        self.time = time
        # Normalise a missing detection list to an empty one.
        self.detections = [] if detections is None else detections
class WeatherMode(Enum):
    # Weather/lighting modes. The value is an id offset added to the base
    # class id so each mode gets its own class-id range (see
    # AnnotationClass.read_json).
    Norm = 0
    Wint = 20
    Night = 40
class AnnotationClass:
    """Annotation class metadata: id, display name and colour in two formats."""

    def __init__(self, id, name, color):
        self.id = id
        self.name = name
        self.color = color
        # Pre-compute the BGR tuple OpenCV expects from the hex colour value.
        hex_part = color.lstrip('#')
        self.opencv_color = (int(hex_part[4:6], 16), int(hex_part[2:4], 16), int(hex_part[0:2], 16))

    @staticmethod
    def read_json():
        """Load classes.json (one level above this package) into an id->class map.

        Every class is replicated once per WeatherMode: the mode value is
        added to the base id and the mode name appended to the display name
        (except for the default Norm mode).
        """
        classes_path = join(dirname(dirname(__file__)), 'classes.json')
        with open(classes_path, 'r', encoding='utf-8') as f:
            parsed = json.loads(f.read())
        result = {}
        for mode in WeatherMode:
            for cl in parsed:
                class_id = mode.value + cl['Id']
                display = cl['Name'] if mode.value == 0 else f'{cl["Name"]}({mode.name})'
                result[class_id] = AnnotationClass(class_id, display, cl['Color'])
        return result

    @property
    def color_tuple(self):
        # Split everything after the first three characters into three
        # equal-width hex fields.
        tail = self.color[3:]
        lv = len(tail)
        step = lv // 3
        return tuple(int(tail[i:i + step], 16) for i in range(0, lv, step))
+139
View File
@@ -0,0 +1,139 @@
import cv2
import numpy as np
from inference.dto import Annotation, Detection, AnnotationClass
from inference.onnx_engine import InferenceEngine
class Inference:
    """Runs batched detection over a video: preprocess, engine run, NMS, drawing."""

    def __init__(self, engine: InferenceEngine, confidence_threshold, iou_threshold):
        """Wrap an inference *engine* with confidence/IoU post-processing settings."""
        self.engine = engine
        self.confidence_threshold = confidence_threshold
        self.iou_threshold = iou_threshold
        self.batch_size = engine.get_batch_size()
        self.model_height, self.model_width = engine.get_input_shape()
        self.classes = AnnotationClass.read_json()

    def draw(self, annotation: Annotation):
        """Draw the annotation's boxes and class labels onto its frame and show it."""
        img = annotation.frame
        img_height, img_width = img.shape[:2]
        for d in annotation.detections:
            # Detections are normalised centre/size; convert to pixel corners.
            x1 = int(img_width * (d.x - d.w / 2))
            y1 = int(img_height * (d.y - d.h / 2))
            x2 = int(x1 + img_width * d.w)
            y2 = int(y1 + img_height * d.h)
            color = self.classes[d.cls].opencv_color
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
            label = f"{self.classes[d.cls].name}: {d.confidence:.2f}"
            (label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            # Place the label above the box unless it would go off-screen.
            label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10
            cv2.rectangle(
                img, (x1, label_y - label_height), (x1 + label_width, label_y + label_height), color, cv2.FILLED
            )
            cv2.putText(img, label, (x1, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
        cv2.imshow('Video', img)

    def preprocess(self, frames):
        """Convert BGR frames into one stacked, 1/255-normalised NCHW float blob."""
        blobs = [cv2.dnn.blobFromImage(frame,
                                       scalefactor=1.0 / 255.0,
                                       size=(self.model_width, self.model_height),
                                       mean=(0, 0, 0),
                                       swapRB=True,
                                       crop=False)
                 for frame in frames]
        return np.vstack(blobs)

    def postprocess(self, batch_frames, batch_timestamps, output):
        """Turn raw engine output into Annotations with thresholded, NMS-filtered detections.

        Expects output[0] rows of (x1, y1, x2, y2, conf, cls) in model-input
        pixel coordinates; a zero confidence marks the end of valid rows.
        """
        anns = []
        for i in range(len(output[0])):
            frame = batch_frames[i]
            timestamp = batch_timestamps[i]
            detections = []
            for det in output[0][i]:
                # Detection slots are zero-padded: stop at the first empty one.
                if det[4] == 0:
                    break
                if det[4] < self.confidence_threshold:
                    continue
                # Normalise corners to [0, 1] relative to the model input size.
                x1 = max(0, det[0] / self.model_width)
                y1 = max(0, det[1] / self.model_height)
                x2 = min(1, det[2] / self.model_width)
                y2 = min(1, det[3] / self.model_height)
                conf = round(det[4], 2)
                class_id = int(det[5])
                # Convert to the centre/size form used by Detection.
                x = (x1 + x2) / 2
                y = (y1 + y2) / 2
                w = x2 - x1
                h = y2 - y1
                detections.append(Detection(x, y, w, h, class_id, conf))
            filtered_detections = self.remove_overlapping_detections(detections)
            # if len(filtered_detections) > 0:
            #     _, image = cv2.imencode('.jpg', frame)
            #     image_bytes = image.tobytes()
            annotation = Annotation(frame, timestamp, filtered_detections)
            anns.append(annotation)
        return anns

    def process(self, video):
        """Run detection over *video*, drawing every 4th frame's results.

        NOTE(review): pressing 'q' only breaks out of the per-batch drawing
        loop, not the whole video loop — confirm whether that is intended.
        """
        frame_count = 0
        batch_frames = []
        batch_timestamps = []
        v_input = cv2.VideoCapture(video)
        while v_input.isOpened():
            ret, frame = v_input.read()
            if not ret or frame is None:
                break
            frame_count += 1
            # Sample every 4th frame to keep processing close to real time.
            if frame_count % 4 == 0:
                batch_frames.append(frame)
                batch_timestamps.append(int(v_input.get(cv2.CAP_PROP_POS_MSEC)))
                if len(batch_frames) == self.batch_size:
                    input_blob = self.preprocess(batch_frames)
                    outputs = self.engine.run(input_blob)
                    annotations = self.postprocess(batch_frames, batch_timestamps, outputs)
                    for annotation in annotations:
                        self.draw(annotation)
                        print(f'video: {annotation.time / 1000:.3f}s')
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break
                    batch_frames.clear()
                    batch_timestamps.clear()
        # Flush the final partial batch.
        if len(batch_frames) > 0:
            input_blob = self.preprocess(batch_frames)
            outputs = self.engine.run(input_blob)
            annotations = self.postprocess(batch_frames, batch_timestamps, outputs)
            for annotation in annotations:
                self.draw(annotation)
                print(f'video: {annotation.time / 1000:.3f}s')
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    def remove_overlapping_detections(self, detections):
        """Keep the best detection among those overlapping each anchor detection.

        For each surviving detection, every later detection that overlaps it
        past the IoU threshold is compared by confidence (ties broken by
        lower class id) and the loser is suppressed.
        """
        filtered_output = []
        filtered_out_indexes = []
        for det1_index in range(len(detections)):
            if det1_index in filtered_out_indexes:
                continue
            det1 = detections[det1_index]
            # `res` tracks the current winner among det1 and its overlaps.
            res = det1_index
            for det2_index in range(det1_index + 1, len(detections)):
                det2 = detections[det2_index]
                if det1.overlaps(det2, self.iou_threshold):
                    if det1.confidence > det2.confidence or (det1.confidence == det2.confidence and det1.cls < det2.cls):
                        filtered_out_indexes.append(det2_index)
                    else:
                        filtered_out_indexes.append(res)
                        res = det2_index
            filtered_output.append(detections[res])
            filtered_out_indexes.append(res)
        return filtered_output
+47
View File
@@ -0,0 +1,47 @@
import abc
from typing import List, Tuple
import numpy as np
import onnxruntime as onnx
class InferenceEngine(abc.ABC):
    """Abstract interface for a detection inference backend."""

    @abc.abstractmethod
    def __init__(self, model_path: str, batch_size: int = 1, **kwargs):
        pass

    @abc.abstractmethod
    def get_input_shape(self) -> Tuple[int, int]:
        """Return the model input size as (height, width)."""
        pass

    @abc.abstractmethod
    def get_batch_size(self) -> int:
        """Return the batch size the engine expects per run."""
        pass

    @abc.abstractmethod
    def run(self, input_data: np.ndarray) -> List[np.ndarray]:
        """Run inference on a preprocessed batch and return the raw outputs."""
        pass
class OnnxEngine(InferenceEngine):
    """ONNX Runtime implementation of InferenceEngine (CUDA with CPU fallback)."""

    def __init__(self, model_bytes, batch_size: int = 1, **kwargs):
        """Create a session from serialized model bytes.

        If the model declares a fixed batch dimension, it overrides the
        *batch_size* argument.
        """
        import ast  # local import: only needed to parse the metadata below
        self.batch_size = batch_size
        self.session = onnx.InferenceSession(model_bytes, providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
        self.model_inputs = self.session.get_inputs()
        self.input_name = self.model_inputs[0].name
        self.input_shape = self.model_inputs[0].shape
        if self.input_shape[0] != -1:
            self.batch_size = self.input_shape[0]
        model_meta = self.session.get_modelmeta()
        print("Metadata:", model_meta.custom_metadata_map)
        # Security fix: the metadata "names" entry is a dict literal; parse it
        # with ast.literal_eval instead of eval so arbitrary code in model
        # metadata cannot execute.
        self.class_names = ast.literal_eval(model_meta.custom_metadata_map["names"])

    def get_input_shape(self) -> Tuple[int, int]:
        # Input layout is NCHW, so (height, width) are dims 2 and 3.
        shape = self.input_shape
        return shape[2], shape[3]

    def get_batch_size(self) -> int:
        return self.batch_size

    def run(self, input_data: np.ndarray) -> List[np.ndarray]:
        return self.session.run(None, {self.input_name: input_data})
+148
View File
@@ -0,0 +1,148 @@
import re
import struct
import subprocess
from pathlib import Path
from typing import List, Tuple
import json
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
from inference.onnx_engine import InferenceEngine
# required for automatically initialize CUDA, do not remove.
import pycuda.autoinit
import pynvml
class TensorRTEngine(InferenceEngine):
    """TensorRT implementation of InferenceEngine for serialized .engine models."""

    # Shared logger for engine deserialization and ONNX->TRT conversion.
    TRT_LOGGER = trt.Logger(trt.Logger.WARNING)

    def __init__(self, model_bytes: bytes, **kwargs):
        """Deserialize *model_bytes* and pre-allocate device input/output buffers.

        Raises:
            RuntimeError: when the engine cannot be loaded or buffers set up.
        """
        try:
            # metadata_len = struct.unpack("<I", model_bytes[:4])[0]
            # try:
            #     self.metadata = json.loads(model_bytes[4:4 + metadata_len])
            #     self.class_names = self.metadata['names']
            #     print(f"Model metadata: {json.dumps(self.metadata, indent=2)}")
            # except json.JSONDecodeError as err:
            #     print(f"Failed to parse metadata")
            #     return
            # engine_data = model_bytes[4 + metadata_len:]
            runtime = trt.Runtime(self.TRT_LOGGER)
            self.engine = runtime.deserialize_cuda_engine(model_bytes)
            if self.engine is None:
                raise RuntimeError(f"Failed to load TensorRT engine!")
            self.context = self.engine.create_execution_context()
            # input
            self.input_name = self.engine.get_tensor_name(0)
            engine_input_shape = self.engine.get_tensor_shape(self.input_name)
            # NOTE(review): self.batch_size is only assigned when the engine
            # has a fixed batch dim; with a dynamic (-1) batch it is read
            # below before ever being set — confirm intended behaviour.
            if engine_input_shape[0] != -1:
                self.batch_size = engine_input_shape[0]
            self.input_shape = [
                self.batch_size,
                engine_input_shape[1],  # Channels (usually fixed at 3 for RGB)
                1280 if engine_input_shape[2] == -1 else engine_input_shape[2],  # Height
                1280 if engine_input_shape[3] == -1 else engine_input_shape[3]  # Width
            ]
            self.context.set_input_shape(self.input_name, self.input_shape)
            input_size = trt.volume(self.input_shape) * np.dtype(np.float32).itemsize
            self.d_input = cuda.mem_alloc(input_size)
            # output
            self.output_name = self.engine.get_tensor_name(1)
            engine_output_shape = tuple(self.engine.get_tensor_shape(self.output_name))
            self.output_shape = [
                4 if self.input_shape[0] == -1 else self.input_shape[0],  # by default, batch size is 4
                300 if engine_output_shape[1] == -1 else engine_output_shape[1],  # max detections number
                6 if engine_output_shape[2] == -1 else engine_output_shape[2]  # x1 y1 x2 y2 conf cls
            ]
            # Page-locked host buffer allows fast device->host copies.
            self.h_output = cuda.pagelocked_empty(tuple(self.output_shape), dtype=np.float32)
            self.d_output = cuda.mem_alloc(self.h_output.nbytes)
            self.stream = cuda.Stream()
        except Exception as e:
            raise RuntimeError(f"Failed to initialize TensorRT engine: {str(e)}")

    def get_input_shape(self) -> Tuple[int, int]:
        # NCHW layout: return (height, width).
        return self.input_shape[2], self.input_shape[3]

    def get_batch_size(self) -> int:
        return self.batch_size

    @staticmethod
    def get_gpu_memory_bytes(device_id=0) -> int:
        """Return total GPU memory in bytes via NVML; defaults to 2 GB on failure."""
        total_memory = None
        try:
            pynvml.nvmlInit()
            handle = pynvml.nvmlDeviceGetHandleByIndex(device_id)
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            total_memory = mem_info.total
        except pynvml.NVMLError:
            total_memory = None
        finally:
            try:
                pynvml.nvmlShutdown()
            except pynvml.NVMLError:
                pass
        return 2 * 1024 * 1024 * 1024 if total_memory is None else total_memory  # default 2 Gb

    @staticmethod
    def get_engine_filename(device_id=0) -> str | None:
        """Build a per-GPU engine filename from compute capability and SM count.

        Returns None when the CUDA device cannot be queried.
        """
        try:
            device = cuda.Device(device_id)
            sm_count = device.multiprocessor_count
            cc_major, cc_minor = device.compute_capability()
            return f"azaion.cc_{cc_major}.{cc_minor}_sm_{sm_count}.engine"
        except Exception:
            return None

    @staticmethod
    def convert_from_onnx(onnx_model: bytes) -> bytes | None:
        """Build a serialized TensorRT engine from ONNX bytes (fp16 when supported).

        Returns None when parsing or building fails.
        """
        # Allow TensorRT to use up to 90% of GPU memory as build workspace.
        workspace_bytes = int(TensorRTEngine.get_gpu_memory_bytes() * 0.9)
        explicit_batch_flag = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
        with trt.Builder(TensorRTEngine.TRT_LOGGER) as builder, \
                builder.create_network(explicit_batch_flag) as network, \
                trt.OnnxParser(network, TensorRTEngine.TRT_LOGGER) as parser, \
                builder.create_builder_config() as config:
            config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace_bytes)
            if not parser.parse(onnx_model):
                return None
            if builder.platform_has_fast_fp16:
                print('Converting to supported fp16')
                config.set_flag(trt.BuilderFlag.FP16)
            else:
                print('Converting to supported fp32. (fp16 is not supported)')
            plan = builder.build_serialized_network(network, config)
            if plan is None:
                print('Conversion failed.')
                return None
            return bytes(plan)

    def run(self, input_data: np.ndarray) -> List[np.ndarray]:
        """Execute one inference pass; returns [output] reshaped to self.output_shape."""
        try:
            cuda.memcpy_htod_async(self.d_input, input_data, self.stream)
            self.context.set_tensor_address(self.input_name, int(self.d_input))  # input buffer
            self.context.set_tensor_address(self.output_name, int(self.d_output))  # output buffer
            self.context.execute_async_v3(stream_handle=self.stream.handle)
            self.stream.synchronize()
            # Fix: Remove the stream parameter from memcpy_dtoh
            cuda.memcpy_dtoh(self.h_output, self.d_output)
            output = self.h_output.reshape(self.output_shape)
            return [output]
        except Exception as e:
            raise RuntimeError(f"Failed to run TensorRT inference: {str(e)}")
+21
View File
@@ -0,0 +1,21 @@
import glob
import os
import shutil
from os import path
import constants
import train
from augmentation import Augmentator
# One-off maintenance script: promote the best checkpoint of a finished
# training run to the current model, then export and upload it.
# Augmentator().augment_annotations()
# train.train_dataset()
# train.resume_training('/azaion/dev/ai-training/runs/detect/train12/weights/last.pt')
model_dir = path.join(constants.config.models_dir, f'{constants.prefix}2025-05-18')
# Drop intermediate epoch checkpoints to save disk space.
for file in glob.glob(path.join(model_dir, 'weights', 'epoch*')):
    os.remove(file)
shutil.copy(path.join(model_dir, 'weights', 'best.pt'), constants.config.current_pt_model)
train.export_current_model()
print('success!')
+68
View File
@@ -0,0 +1,68 @@
import base64
import hashlib
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
BUFFER_SIZE = 64 * 1024 # 64 KB
class Security:
@staticmethod
def encrypt_to(input_bytes, key):
aes_key = hashlib.sha256(key.encode('utf-8')).digest()
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
padder = padding.PKCS7(128).padder()
padded_plaintext = padder.update(input_bytes) + padder.finalize()
ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
return iv + ciphertext
@staticmethod
def decrypt_to(ciphertext_with_iv_bytes, key):
aes_key = hashlib.sha256(key.encode('utf-8')).digest()
iv = ciphertext_with_iv_bytes[:16]
ciphertext_bytes = ciphertext_with_iv_bytes[16:]
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
decrypted_padded_bytes = decryptor.update(ciphertext_bytes) + decryptor.finalize()
# Manual PKCS7 unpadding check and removal
padding_value = decrypted_padded_bytes[-1] # Get the last byte, which indicates padding length
if 1 <= padding_value <= 16: # Valid PKCS7 padding value range for AES-128
padding_length = padding_value
plaintext_bytes = decrypted_padded_bytes[:-padding_length] # Remove padding bytes
else:
plaintext_bytes = decrypted_padded_bytes
return bytes(plaintext_bytes)
@staticmethod
def calc_hash(key):
str_bytes = key.encode('utf-8')
hash_bytes = hashlib.sha384(str_bytes).digest()
h = base64.b64encode(hash_bytes).decode('utf-8')
return h
@staticmethod
def get_hw_hash(hardware):
key = f'Azaion_{hardware}_%$$$)0_'
return Security.calc_hash(key)
@staticmethod
def get_api_encryption_key(creds, hardware_hash):
key = f'{creds.email}-{creds.password}-{hardware_hash}-#%@AzaionKey@%#---'
return Security.calc_hash(key)
@staticmethod
def get_model_encryption_key():
key = '-#%@AzaionKey@%#---234sdfklgvhjbnn'
return Security.calc_hash(key)
+50
View File
@@ -0,0 +1,50 @@
import pycuda.driver as cuda
import yaml
import constants
from api_client import ApiClient, ApiCredentials
from cdn_manager import CDNManager, CDNCredentials
from inference.inference import Inference
from inference.tensorrt_engine import TensorRTEngine
from security import Security
from utils import Dotdict
def get_engine_filename(device_id=0):
    """Build a per-GPU engine filename from compute capability and SM count.

    Returns None when the CUDA device cannot be queried. NOTE: duplicates
    TensorRTEngine.get_engine_filename.
    """
    try:
        dev = cuda.Device(device_id)
        major, minor = dev.compute_capability()
        return f"azaion.cc_{major}.{minor}_sm_{dev.multiprocessor_count}.engine"
    except Exception:
        return None
if __name__ == "__main__":
    # Inference(OnnxEngine('azaion-2025-03-10.onnx', batch_size=4),
    #           confidence_threshold=0.5, iou_threshold=0.3).process('ForAI_test.mp4')
    # detection for the first 200sec of video:
    # onnxInference: 81 sec, 6.3Gb VRAM
    # tensorrt: 54 sec, 3.7Gb VRAM
    # Inference(TensorRTEngine('azaion-2025-03-10_int8.engine', batch_size=16),
    #           confidence_threshold=0.5, iou_threshold=0.3).process('ForAI_test.mp4')
    # INT8 for 200sec: 54 sec 3.7Gb
    # Inference(TensorRTEngine('azaion-2025-03-10_batch8.engine', batch_size=8),
    #           confidence_threshold=0.5, iou_threshold=0.3).process('ForAI_test.mp4')
    # Download the encrypted, per-GPU TensorRT engine from the backend and
    # run detection on the sample video.
    api_client = ApiClient()
    key = Security.get_model_encryption_key()
    engine_filename = TensorRTEngine.get_engine_filename()
    model_bytes = api_client.load_big_small_resource(engine_filename, 'models', key)
    Inference(TensorRTEngine(model_bytes),
              confidence_threshold=0.5, iou_threshold=0.3).process('tests/ForAI_test.mp4')
    # cdn_manager.download(cdn_c.bucket, constants.AI_TENSOR_MODEL_FILE_BIG)
    # tensor_model_bytes = api_client.load_resource(constants.AI_TENSOR_MODEL_FILE_BIG, constants.AI_TENSOR_MODEL_FILE_SMALL)
    # Inference(OnnxEngine(onxx_model_bytes, batch_size=4),
    #           confidence_threshold=0.5, iou_threshold=0.3).process('tests/ForAI_test.mp4')
+178
View File
@@ -0,0 +1,178 @@
import concurrent.futures
import glob
import os
import random
import shutil
import subprocess
from datetime import datetime
from os import path, replace, listdir, makedirs, scandir
from os.path import abspath
from pathlib import Path
from time import sleep
import yaml
from ultralytics import YOLO
import constants
from api_client import ApiCredentials, ApiClient
from cdn_manager import CDNCredentials, CDNManager
from dto.annotationClass import AnnotationClass
from inference.onnx_engine import OnnxEngine
from security import Security
from utils import Dotdict
from exports import export_tensorrt, upload_model, export_onnx
# Folder name for artifacts produced today, e.g. '<prefix>2025-05-18'.
today_folder = f'{constants.prefix}{datetime.now():{constants.date_format}}'
# Dataset split percentages (train/valid/test).
train_set = 70
valid_set = 20
test_set = 10
# NOTE(review): declared but never read in this file — presumably the share
# of previous-dataset images to keep; confirm before removing.
old_images_percentage = 75
# Number of class slots written to data.yaml.
DEFAULT_CLASS_NUM = 80
# Progress counter shared by the copy workers in copy_annotations.
total_files_copied = 0
def form_dataset():
    """Build today's dataset folder by splitting processed images into train/valid/test."""
    today_dataset = path.join(constants.config.datasets_dir, today_folder)
    shutil.rmtree(today_dataset, ignore_errors=True)
    makedirs(today_dataset)
    entries = []
    with scandir(constants.config.processed_images_dir) as imd:
        for entry in imd:
            if entry.is_file():
                entries.append(entry)
    print(f'Got {len(entries)} images. Start shuffling...')
    random.shuffle(entries)
    n_train = int(len(entries) * train_set / 100.0)
    n_valid = int(len(entries) * valid_set / 100.0)
    print(f'Start copying...')
    copy_annotations(entries[:n_train], 'train')
    copy_annotations(entries[n_train:n_train + n_valid], 'valid')
    copy_annotations(entries[n_train + n_valid:], 'test')
def copy_annotations(images, folder):
    """Copy image/label pairs into <today_dataset>/<folder>, quarantining bad labels.

    Labels failing check_label are copied (with their image) into the
    configured corrupted directories instead of the dataset.
    """
    global total_files_copied
    total_files_copied = 0

    def copy_image(image):
        # NOTE: += on the shared counter is not atomic across threads, so the
        # figure is approximate; it is only used for progress logging.
        global total_files_copied
        total_files_copied += 1
        label_name = f'{Path(image.path).stem}.txt'
        label_path = path.join(constants.config.processed_labels_dir, label_name)
        if check_label(label_path):
            shutil.copy(image.path, path.join(destination_images, image.name))
            shutil.copy(label_path, path.join(destination_labels, label_name))
        else:
            shutil.copy(image.path, path.join(constants.config.corrupted_images_dir, image.name))
            shutil.copy(label_path, path.join(constants.config.corrupted_labels_dir, label_name))
            print(f'Label {label_path} is corrupted! Copy with its image to the corrupted directory ({constants.config.corrupted_labels_dir})')
        if total_files_copied % 1000 == 0:
            print(f'{total_files_copied} copied...')

    today_dataset = path.join(constants.config.datasets_dir, today_folder)
    destination_images = path.join(today_dataset, folder, 'images')
    makedirs(destination_images, exist_ok=True)
    destination_labels = path.join(today_dataset, folder, 'labels')
    makedirs(destination_labels, exist_ok=True)
    makedirs(constants.config.corrupted_images_dir, exist_ok=True)
    makedirs(constants.config.corrupted_labels_dir, exist_ok=True)
    print(f'Copying annotations to {destination_images} and {destination_labels} folders:')
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(copy_image, images)
    # Bug fix: the summary previously printed a never-incremented local
    # `copied` variable (always 0); report the real counter instead.
    print(f'Copied all {total_files_copied} annotations to {destination_images} and {destination_labels} folders')
def check_label(label_path):
    """Validate a YOLO label file: it must exist and all coordinates lie in [0, 1].

    Each line is '<class> <x> <y> <w> <h>' with normalised coordinates.
    Returns False for a missing file or any out-of-range value.
    """
    if not path.exists(label_path):
        return False
    with open(label_path, 'r') as f:
        lines = f.readlines()
    for line in lines:
        for val in line.split(' ')[1:]:
            # Bug fix: negative values were previously accepted; normalised
            # YOLO coordinates must be within [0, 1].
            if not 0 <= float(val) <= 1:
                return False
    return True
def create_yaml():
    """Write data.yaml for today's dataset: class names, class count and split paths.

    Class-id slots without an annotation class get a 'Class-<n>' placeholder
    so the id numbering stays dense.
    """
    print('creating yaml...')
    annotation_classes = AnnotationClass.read_json()
    lines = ['names:']
    for i in range(DEFAULT_CLASS_NUM):
        class_name = annotation_classes[i].name if i in annotation_classes else f'Class-{i + 1}'
        lines.append(f'- {class_name}')
    lines.append(f'nc: {DEFAULT_CLASS_NUM}')
    lines.append('test: test/images')
    lines.append('train: train/images')
    lines.append('val: valid/images')
    lines.append('')
    today_yaml = abspath(path.join(constants.config.datasets_dir, today_folder, 'data.yaml'))
    with open(today_yaml, 'w', encoding='utf-8') as f:
        f.writelines([f'{line}\n' for line in lines])
def resume_training(last_pt_path):
    """Resume an interrupted YOLO training run from its last checkpoint.

    With resume=True ultralytics restores most settings from the checkpoint;
    the remaining arguments come from the configured training values.
    """
    model = YOLO(last_pt_path)
    # Bug fix: `data=yaml` previously passed the imported yaml *module*
    # instead of a dataset description path. Point it at today's data.yaml.
    # NOTE(review): when resuming a run started on a different day, the
    # original run's dataset yaml should be used instead — confirm.
    data_yaml = abspath(path.join(constants.config.datasets_dir, today_folder, 'data.yaml'))
    results = model.train(data=data_yaml,
                          resume=True,
                          epochs=constants.config.training.epochs,
                          batch=constants.config.training.batch,
                          imgsz=constants.config.training.imgsz,
                          save_period=constants.config.training.save_period,
                          workers=constants.config.training.workers)
def train_dataset():
    """Build today's dataset, train a fresh model on it and promote the best weights."""
    form_dataset()
    create_yaml()
    today_dataset = path.join(constants.config.datasets_dir, today_folder)
    model = YOLO(constants.config.training.model)
    results = model.train(data=abspath(path.join(today_dataset, 'data.yaml')),
                          epochs=constants.config.training.epochs,
                          batch=constants.config.training.batch,
                          imgsz=constants.config.training.imgsz,
                          save_period=constants.config.training.save_period,
                          workers=constants.config.training.workers)
    model_dir = path.join(constants.config.models_dir, today_folder)
    shutil.copytree(results.save_dir, model_dir)
    # remove unnecessary middle epochs
    for checkpoint in glob.glob(path.join(model_dir, 'weights', 'epoch*')):
        os.remove(checkpoint)
    shutil.copy(path.join(model_dir, 'weights', 'best.pt'), constants.config.current_pt_model)
def export_current_model():
    """Export the current .pt model to ONNX, encrypt it and upload it as split parts."""
    export_onnx(constants.config.current_pt_model)
    api_client = ApiClient()
    with open(constants.config.current_onnx_model, 'rb') as onnx_file:
        onnx_bytes = onnx_file.read()
    encryption_key = Security.get_model_encryption_key()
    api_client.upload_big_small_resource(onnx_bytes, 'azaion.onnx', constants.MODELS_FOLDER, encryption_key)
if __name__ == '__main__':
    # Full pipeline: train on today's dataset, then export and upload the model.
    train_dataset()
    export_current_model()
    print('success!')
+5
View File
@@ -0,0 +1,5 @@
class Dotdict(dict):
    """dot.notation access to dictionary attributes.

    Note: missing attributes resolve via dict.get and therefore return None
    rather than raising AttributeError.
    """
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__