Files
ai-training/augmentation.py
T
Alex Bezdieniezhnykh eecc11b1e5 queue handler done
2025-05-22 02:15:21 +03:00

153 lines
5.9 KiB
Python

import concurrent.futures
import os.path
import shutil
import time
from datetime import datetime
from pathlib import Path
import albumentations as A
import cv2
import numpy as np
from constants import (data_images_dir, data_labels_dir, processed_images_dir, processed_labels_dir, processed_dir)
from dto.imageLabel import ImageLabel
class Augmentator:
def __init__(self):
self.total_files_processed = 0
self.total_images_to_process = 0
self.correct_margin = 0.0005
self.correct_min_bbox_size = 0.01
self.transform = A.Compose([
A.HorizontalFlip(p=0.6),
A.RandomBrightnessContrast(p=0.4, brightness_limit=(-0.3, 0.3), contrast_limit=(-0.05, 0.05)),
A.Affine(p=0.8, scale=(0.8, 1.2), rotate=(-35, 35), shear=(-10, 10)),
A.MotionBlur(p=0.1, blur_limit=(1, 2)),
A.HueSaturationValue(p=0.4, hue_shift_limit=10, sat_shift_limit=10, val_shift_limit=10)
], bbox_params=A.BboxParams(format='yolo'))
def correct_bboxes(self, labels):
res = []
for bboxes in labels:
x = bboxes[0]
y = bboxes[1]
half_width = 0.5*bboxes[2]
half_height = 0.5*bboxes[3]
# calc how much bboxes are outside borders ( +small margin ).
# value should be negative. If it's positive, then put 0, as no correction
w_diff = min((1 - self.correct_margin) - (x + half_width), (x - half_width) - self.correct_margin, 0)
w = bboxes[2] + 2*w_diff
if w < self.correct_min_bbox_size:
continue
h_diff = min((1 - self.correct_margin) - (y + half_height), ((y - half_height) - self.correct_margin), 0)
h = bboxes[3] + 2 * h_diff
if h < self.correct_min_bbox_size:
continue
res.append([x, y, w, h, bboxes[4]])
return res
pass
def augment_inner(self, img_ann: ImageLabel) -> [ImageLabel]:
results = []
labels = self.correct_bboxes(img_ann.labels)
if len(labels) == 0 and len(img_ann.labels) != 0:
print('no labels but was!!!')
results.append(ImageLabel(
image=img_ann.image,
labels=img_ann.labels,
image_path=os.path.join(processed_images_dir, Path(img_ann.image_path).name),
labels_path=os.path.join(processed_labels_dir, Path(img_ann.labels_path).name)
)
)
for i in range(7):
try:
res = self.transform(image=img_ann.image, bboxes=labels)
path = Path(img_ann.image_path)
name = f'{path.stem}_{i + 1}'
img = ImageLabel(
image=res['image'],
labels=res['bboxes'],
image_path=os.path.join(processed_images_dir, f'{name}{path.suffix}'),
labels_path=os.path.join(processed_labels_dir, f'{name}.txt')
)
results.append(img)
except Exception as e:
print(f'Error during transformation: {e}')
return results
def read_labels(self, labels_path) -> [[]]:
with open(labels_path, 'r') as f:
rows = f.readlines()
arr = []
for row in rows:
str_coordinates = row.split(' ')
class_num = str_coordinates.pop(0)
coordinates = [float(n.replace(',', '.')) for n in str_coordinates]
# noinspection PyTypeChecker
coordinates.append(class_num)
arr.append(coordinates)
return arr
def augment_annotation(self, image_file):
try:
image_path = os.path.join(data_images_dir, image_file.name)
labels_path = os.path.join(data_labels_dir, f'{Path(str(image_path)).stem}.txt')
image = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
img_ann = ImageLabel(
image_path=image_path,
image=image,
labels_path=labels_path,
labels=self.read_labels(labels_path)
)
try:
results = self.augment_inner(img_ann)
for annotation in results:
cv2.imencode('.jpg', annotation.image)[1].tofile(annotation.image_path)
with open(annotation.labels_path, 'w') as f:
lines = [f'{l[4]} {round(l[0], 5)} {round(l[1], 5)} {round(l[2], 5)} {round(l[3], 5)}\n' for l in
annotation.labels]
f.writelines(lines)
f.close()
print(f'{datetime.now():{"%Y-%m-%d %H:%M:%S"}}: {self.total_files_processed + 1}/{self.total_to_process} : {image_file.name} has augmented')
except Exception as e:
print(e)
self.total_files_processed += 1
except Exception as e:
print(f'Error appeared in thread for {image_file.name}: {e}')
def augment_annotations(self, from_scratch=False):
self.total_files_processed = 0
if from_scratch:
shutil.rmtree(processed_dir)
os.makedirs(processed_images_dir, exist_ok=True)
os.makedirs(processed_labels_dir, exist_ok=True)
processed_images = set(f.name for f in os.scandir(processed_images_dir))
images = []
with os.scandir(data_images_dir) as imd:
for image_file in imd:
if image_file.is_file() and image_file.name not in processed_images:
images.append(image_file)
self.total_images_to_process = len(images)
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(self.augment_annotation, images)
if __name__ == '__main__':
augmentator = Augmentator()
while True:
augmentator.augment_annotations()
print('All processed, waiting for 5 minutes...')
time.sleep(300)