mirror of
https://github.com/azaion/ai-training.git
synced 2026-04-22 10:16:34 +00:00
184 lines
6.5 KiB
Python
184 lines
6.5 KiB
Python
import os.path
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import albumentations as A
|
|
import cv2
|
|
import numpy as np
|
|
from dateutil.relativedelta import relativedelta
|
|
|
|
from config import Config
|
|
from constants import current_images_dir, current_labels_dir, annotation_classes, prefix, date_format, \
|
|
current_dataset_dir
|
|
from dto.imageLabel import ImageLabel
|
|
|
|
# Shared configuration object. Functions in this file read and assign its
# ``checkpoint`` attribute, read ``images_dir`` and ``labels_dir``, and call
# ``write()`` on it after the checkpoint changes.
config = Config()
|
|
|
|
|
|
def image_processing(img_ann: ImageLabel) -> [ImageLabel]:
    """Produce augmented variants of one labelled image.

    Seven albumentations pipelines (flips, brightness/contrast changes,
    rotations and shift/scale/rotate combinations) are applied to
    ``img_ann.image`` together with ``img_ann.labels``, which are passed as
    YOLO-format bounding boxes.  Variant number ``k`` (1-based) is named
    ``<stem>_<k>``; its image path is placed in ``current_images_dir`` with
    the source suffix and its label path in ``current_labels_dir`` with a
    ``.txt`` suffix.

    A pipeline that raises is reported on stdout and skipped, so the returned
    list may hold fewer than seven items.
    """
    def yolo_pipeline(*steps):
        # All pipelines share the same bounding-box handling.
        return A.Compose(list(steps), bbox_params=A.BboxParams(format='yolo'))

    pipelines = [
        yolo_pipeline(A.HorizontalFlip(always_apply=True)),
        yolo_pipeline(A.RandomBrightnessContrast(always_apply=True)),
        yolo_pipeline(A.SafeRotate(limit=90, always_apply=True)),
        yolo_pipeline(A.SafeRotate(limit=90, always_apply=True),
                      A.RandomBrightnessContrast(always_apply=True)),
        yolo_pipeline(A.ShiftScaleRotate(scale_limit=0.2, always_apply=True),
                      A.VerticalFlip(always_apply=True)),
        yolo_pipeline(A.ShiftScaleRotate(scale_limit=0.2, always_apply=True)),
        yolo_pipeline(A.SafeRotate(limit=90, always_apply=True),
                      A.RandomBrightnessContrast(always_apply=True)),
    ]

    augmented = []
    for number, pipeline in enumerate(pipelines, start=1):
        try:
            outcome = pipeline(image=img_ann.image, bboxes=img_ann.labels)
            source = Path(img_ann.image_path)
            variant = f'{source.stem}_{number}'
            augmented.append(ImageLabel(
                image=outcome['image'],
                labels=outcome['bboxes'],
                image_path=os.path.join(current_images_dir, f'{variant}{source.suffix}'),
                labels_path=os.path.join(current_labels_dir, f'{variant}.txt')
            ))
        except Exception as e:
            print(f'Error during transformtation: {e}')
    return augmented
|
|
|
|
|
|
def write_result(img_ann: ImageLabel, show_image=False):
    """Write an image and its label file to disk.

    Args:
        img_ann: Item whose ``image`` is encoded and stored at ``image_path``
            and whose ``labels`` are stored at ``labels_path``.  Each label is
            indexed as four coordinates (positions 0-3) followed by the class
            (position 4).
        show_image: When true, ``img_ann.visualize(annotation_classes)`` is
            called before anything is written.

    Raises:
        OSError: If OpenCV reports that the image could not be encoded.
    """
    for target in (img_ann.image_path, img_ann.labels_path):
        parent = os.path.dirname(target)
        # A bare file name has no directory part, and os.makedirs('') raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    if show_image:
        img_ann.visualize(annotation_classes)

    # NOTE: the image is always JPEG-encoded, whatever suffix image_path has.
    encoded_ok, buffer = cv2.imencode('.jpg', img_ann.image)
    if not encoded_ok:
        raise OSError(f'Failed to encode image for {img_ann.image_path}')
    buffer.tofile(img_ann.image_path)
    print(f'{img_ann.image_path} written')

    # One line per label: class first, then the coordinates rounded to 5 places.
    with open(img_ann.labels_path, 'w') as f:
        f.writelines(
            f'{ann[4]} {round(ann[0], 5)} {round(ann[1], 5)} {round(ann[2], 5)} {round(ann[3], 5)}\n'
            for ann in img_ann.labels
        )
    print(f'{img_ann.labels_path} written')
|
|
|
|
|
|
def read_labels(labels_path) -> list:
    """Read a label file into a list of ``[coord, ..., class]`` rows.

    Each non-blank line is split on whitespace.  The first field is the class
    and is kept as a string; every remaining field is parsed as a float, with
    a decimal comma accepted in place of a decimal point.  The class is moved
    to the end of the row.

    Args:
        labels_path: Path of the label file to read.

    Returns:
        One list per non-blank line: the float coordinates followed by the
        class string.

    Raises:
        ValueError: If a coordinate field is not a valid number.
        OSError: If the file cannot be opened.
    """
    labels = []
    with open(labels_path, 'r') as f:
        for row in f:
            # split() without a separator tolerates repeated or trailing
            # spaces and the line ending; split(' ') produced empty fields.
            fields = row.split()
            if not fields:
                # A blank line would otherwise become a label with no boxes.
                continue
            class_num, *raw_coordinates = fields
            coordinates = [float(n.replace(',', '.')) for n in raw_coordinates]
            coordinates.append(class_num)
            labels.append(coordinates)
    return labels
|
|
|
|
|
|
def process_image(img_ann):
    """Write every augmented variant of ``img_ann`` and then the original.

    The variants come from ``image_processing``.  The original image and
    labels are written last, under their own file names, into
    ``current_images_dir`` and ``current_labels_dir``.
    """
    for variant in image_processing(img_ann):
        write_result(variant)

    original_copy = ImageLabel(
        image=img_ann.image,
        labels=img_ann.labels,
        image_path=os.path.join(current_images_dir, Path(img_ann.image_path).name),
        labels_path=os.path.join(current_labels_dir, Path(img_ann.labels_path).name)
    )
    write_result(original_copy)
    # The source files stay on disk; the removal calls below are disabled.
    # os.remove(img_ann.image_path)
    # os.remove(img_ann.labels_path)
|
|
|
|
|
|
def get_checkpoint():
    """Return the moment after which images are treated as new.

    Resolution order:
      1. ``config.checkpoint`` when it is set.
      2. The latest date parsed from the entry names under ``models``: the
         first ``len(prefix)`` characters of each name are dropped and the
         rest is parsed with ``date_format``.
      3. One year before now, when no entry name yields a date or the
         ``models`` directory does not exist.

    Returns:
        The configured checkpoint value, or a ``datetime``.
    """
    if config.checkpoint is not None:
        return config.checkpoint

    try:
        entries = os.listdir('models')
    except FileNotFoundError:
        # No models directory yet: same situation as having no dated models.
        entries = []

    dates = []
    for directory in entries:
        try:
            dates.append(datetime.strptime(directory[len(prefix):], date_format))
        except ValueError:
            # The entry name does not carry a date in the expected format.
            continue

    if not dates:
        return datetime.now() - relativedelta(years=1)
    return max(dates)
|
|
|
|
|
|
def main():
    """Poll ``config.images_dir`` forever and process newly modified images.

    Every pass collects the regular files whose modification time is later
    than the current checkpoint, loads each one together with the label file
    of the same stem from ``config.labels_dir`` and hands it to
    ``process_image``.  A failure on one file is printed and does not stop the
    pass.  When a newer modification time was seen, it becomes the checkpoint
    and is saved through ``config.write()``.  Passes are five seconds apart.
    """
    newest_seen = checkpoint = get_checkpoint()
    while True:
        pending = []
        with os.scandir(config.images_dir) as entries:
            for entry in entries:
                if entry.is_file():
                    modified = datetime.fromtimestamp(entry.stat().st_mtime)
                    if modified > checkpoint:
                        pending.append(entry)
                        newest_seen = max(newest_seen, modified)

        for entry in pending:
            try:
                src_image = os.path.join(config.images_dir, entry.name)
                src_labels = os.path.join(config.labels_dir, f'{Path(src_image).stem}.txt')
                # Read the bytes with numpy and decode them in memory.
                raw_bytes = np.fromfile(src_image, dtype=np.uint8)
                pixels = cv2.imdecode(raw_bytes, cv2.IMREAD_UNCHANGED)
                item = ImageLabel(
                    image_path=src_image,
                    image=pixels,
                    labels_path=src_labels,
                    labels=read_labels(src_labels)
                )
                process_image(item)
            except Exception as e:
                print(f'Error appeared {e}')

        if newest_seen != checkpoint:
            checkpoint = config.checkpoint = newest_seen
            config.write()
        time.sleep(5)
|
|
|
|
|
|
def check_labels():
    """Print ``Error!`` once for every label value greater than 1.

    Scans each file in the ``labels`` folder of ``current_dataset_dir``.  On
    every line the first space-separated field is skipped and the remaining
    fields are compared as floats against 1.
    """
    labels_dir = os.path.join(current_dataset_dir, 'labels')
    for file_name in os.listdir(labels_dir):
        with open(os.path.join(labels_dir, file_name), 'r') as handle:
            rows = handle.readlines()
        for row in rows:
            for value in row.split(' ')[1:]:
                if float(value) > 1:
                    print('Error!')
|
|
|
|
|
|
def fix_class(folder):
    """Remove too-short lines from label files whose name starts with ``0000``.

    A line is too short when it has fewer than 25 characters, line ending
    included.  Every removed line is printed.  A file is rewritten only when
    at least one of its lines was removed; other entries in ``folder`` are
    left untouched.

    Args:
        folder: Directory whose entries are scanned.
    """
    for label in os.listdir(folder):
        if not label.startswith('0000'):
            continue
        with open(os.path.join(folder, label), 'r+') as f:
            lines = f.readlines()
            # Build a filtered copy instead of popping by index while looping:
            # popping skipped the following line and ran past the end of the
            # shrinking list.
            kept = []
            for line in lines:
                if len(line) < 25:
                    print(line)
                else:
                    kept.append(line)
            if len(kept) != len(lines):
                f.seek(0)
                f.writelines(kept)
                # Cut off whatever remains of the longer, old content.
                f.truncate()
|
|
|
|
|
|
if __name__ == '__main__':
    # Clean the label folders of each dataset split, in this order.
    label_folders = (
        'datasets/zombobase-2024-06-18/test/labels',
        'datasets/zombobase-2024-06-18/train/labels',
        'datasets/zombobase-2024-06-18/valid/labels',
    )
    for label_folder in label_folders:
        fix_class(label_folder)
    # main()
|