Files
ai-training/preprocessing.py
T
Alex Bezdieniezhnykh 6c50dd19b7 add dataset-visualiser.py
2024-06-19 02:28:50 +03:00

184 lines
6.5 KiB
Python

import os.path
import time
from datetime import datetime
from pathlib import Path
import albumentations as A
import cv2
import numpy as np
from dateutil.relativedelta import relativedelta
from config import Config
from constants import current_images_dir, current_labels_dir, annotation_classes, prefix, date_format, \
current_dataset_dir
from dto.imageLabel import ImageLabel
# Module-wide settings object. Functions in this file read its `checkpoint`,
# `images_dir` and `labels_dir` attributes and call `write()` to persist it.
config = Config()
def _augmentation_pipelines():
    """Build the ordered list of augmentation pipelines applied to each image.

    The order matters: the 1-based position of a pipeline becomes the numeric
    suffix of the augmented file name.
    """
    def pipeline(*steps):
        # Every pipeline shares the same YOLO bounding-box handling.
        return A.Compose(list(steps), bbox_params=A.BboxParams(format='yolo'))

    # `p=1.0` forces every step to run. It is used instead of
    # `always_apply=True`, which newer Albumentations releases deprecate.
    return [
        pipeline(A.HorizontalFlip(p=1.0)),
        pipeline(A.RandomBrightnessContrast(p=1.0)),
        pipeline(A.SafeRotate(limit=90, p=1.0)),
        pipeline(A.SafeRotate(limit=90, p=1.0),
                 A.RandomBrightnessContrast(p=1.0)),
        pipeline(A.ShiftScaleRotate(scale_limit=0.2, p=1.0),
                 A.VerticalFlip(p=1.0)),
        pipeline(A.ShiftScaleRotate(scale_limit=0.2, p=1.0)),
        # Same steps as the fourth pipeline; the transforms are random, so it
        # yields a second, different sample.
        pipeline(A.SafeRotate(limit=90, p=1.0),
                 A.RandomBrightnessContrast(p=1.0)),
    ]


def image_processing(img_ann: ImageLabel) -> list:
    """Create augmented variants of one annotated image.

    Each pipeline is applied to `img_ann.image` together with
    `img_ann.labels` (bounding boxes passed to Albumentations in 'yolo'
    format). Nothing is written to disk here; the returned objects only carry
    their target paths.

    Args:
        img_ann: Source image with its labels and original file path.

    Returns:
        A list of ImageLabel objects, one per pipeline that succeeded. The
        target paths are `<stem>_<n><suffix>` under `current_images_dir` and
        `<stem>_<n>.txt` under `current_labels_dir`, where `n` is the 1-based
        pipeline index.
    """
    source_path = Path(img_ann.image_path)
    results = []
    for index, transform in enumerate(_augmentation_pipelines(), start=1):
        try:
            res = transform(image=img_ann.image, bboxes=img_ann.labels)
            name = f'{source_path.stem}_{index}'
            results.append(ImageLabel(
                image=res['image'],
                labels=res['bboxes'],
                image_path=os.path.join(current_images_dir, f'{name}{source_path.suffix}'),
                labels_path=os.path.join(current_labels_dir, f'{name}.txt'),
            ))
        except Exception as e:
            # Best effort: one failing pipeline must not discard the others.
            print(f'Error during transformation: {e}')
    return results
def write_result(img_ann: ImageLabel, show_image=False):
    """Write an image and its label file to the paths stored on `img_ann`.

    Args:
        img_ann: Image, labels and destination paths to persist. Each label
            row is indexed as four coordinates followed by the class at
            index 4.
        show_image: When true, call `img_ann.visualize(annotation_classes)`
            before writing.

    Each label is written as `<class> <c0> <c1> <c2> <c3>` with coordinates
    rounded to 5 decimal places.
    """
    for target in (img_ann.image_path, img_ann.labels_path):
        parent = os.path.dirname(target)
        # A bare file name has no directory part; os.makedirs('') would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)

    if show_image:
        img_ann.visualize(annotation_classes)

    # imencode + tofile is used instead of cv2.imwrite.
    # NOTE(review): the data is always JPEG-encoded, whatever suffix
    # `image_path` carries — confirm this is intended.
    cv2.imencode('.jpg', img_ann.image)[1].tofile(img_ann.image_path)
    print(f'{img_ann.image_path} written')

    lines = [
        f'{ann[4]} {round(ann[0], 5)} {round(ann[1], 5)} {round(ann[2], 5)} {round(ann[3], 5)}\n'
        for ann in img_ann.labels
    ]
    # The context manager closes the file; no explicit close() is needed.
    with open(img_ann.labels_path, 'w') as f:
        f.writelines(lines)
    print(f'{img_ann.labels_path} written')
def read_labels(labels_path) -> list:
    """Parse a label file into a list of annotation rows.

    Each non-blank line is expected to hold a class token followed by
    whitespace-separated numeric coordinates. A decimal comma is accepted and
    treated as a decimal point.

    Args:
        labels_path: Path of the label text file.

    Returns:
        A list with one entry per non-blank line: the coordinates as floats,
        followed by the class token (kept as a string) in the last position.

    Raises:
        ValueError: If a coordinate cannot be parsed as a float.
    """
    annotations = []
    with open(labels_path, 'r') as f:
        for row in f:
            # split() without arguments drops the trailing newline and
            # tolerates repeated spaces.
            fields = row.split()
            if not fields:
                # Blank lines carry no annotation.
                continue
            class_num, *raw_coordinates = fields
            coordinates = [float(n.replace(',', '.')) for n in raw_coordinates]
            coordinates.append(class_num)
            annotations.append(coordinates)
    return annotations
def process_image(img_ann):
    """Write all augmented variants of `img_ann`, then a copy of the original.

    The augmented variants come from `image_processing`. The original image
    and labels are written under `current_images_dir` / `current_labels_dir`
    with their file names unchanged.
    """
    for augmented in image_processing(img_ann):
        write_result(augmented)

    original_image_name = Path(img_ann.image_path).name
    original_labels_name = Path(img_ann.labels_path).name
    original_copy = ImageLabel(
        image=img_ann.image,
        labels=img_ann.labels,
        image_path=os.path.join(current_images_dir, original_image_name),
        labels_path=os.path.join(current_labels_dir, original_labels_name)
    )
    write_result(original_copy)
    # The source files are left in place (removal is disabled):
    # os.remove(img_ann.image_path)
    # os.remove(img_ann.labels_path)
def get_checkpoint():
    """Return the point in time after which images count as new.

    Resolution order:
      1. `config.checkpoint`, when it is set.
      2. The latest date parsed from the entry names in the `models`
         directory: the first `len(prefix)` characters are dropped and the
         rest is parsed with `date_format`.
      3. One year before now, when no entry name yields a date or the
         `models` directory does not exist.
    """
    if config.checkpoint is not None:
        return config.checkpoint

    try:
        directories = os.listdir('models')
    except FileNotFoundError:
        # No models directory is treated the same as no dated models.
        directories = []

    dates = []
    for directory in directories:
        try:
            dates.append(datetime.strptime(directory[len(prefix):], date_format))
        except ValueError:
            # Entry name does not match the expected date layout; skip it.
            continue

    if not dates:
        return datetime.now() - relativedelta(years=1)
    return max(dates)
def main():
    """Poll `config.images_dir` forever and preprocess newly modified images.

    Every cycle collects the files modified after the current checkpoint,
    runs `process_image` on each (labels are read from `config.labels_dir`,
    same stem with a `.txt` suffix), then advances and persists the checkpoint
    when a newer modification time was seen. Cycles are five seconds apart.
    """
    checkpoint = get_checkpoint()
    last_date = checkpoint
    while True:
        fresh_entries = []
        with os.scandir(config.images_dir) as entries:
            for entry in entries:
                if entry.is_file():
                    modified = datetime.fromtimestamp(entry.stat().st_mtime)
                    if modified > checkpoint:
                        fresh_entries.append(entry)
                    last_date = max(last_date, modified)

        for entry in fresh_entries:
            try:
                image_path = os.path.join(config.images_dir, entry.name)
                labels_path = os.path.join(config.labels_dir, f'{Path(image_path).stem}.txt')
                # fromfile + imdecode is used instead of cv2.imread.
                raw_bytes = np.fromfile(image_path, dtype=np.uint8)
                image = cv2.imdecode(raw_bytes, cv2.IMREAD_UNCHANGED)
                labels = read_labels(labels_path)
                process_image(ImageLabel(
                    image_path=image_path,
                    image=image,
                    labels_path=labels_path,
                    labels=labels
                ))
            except Exception as e:
                # A failure on one image is reported and the loop continues.
                print(f'Error appeared {e}')

        if last_date != checkpoint:
            checkpoint = config.checkpoint = last_date
            config.write()
        time.sleep(5)
def check_labels():
    """Print 'Error!' for every coordinate greater than 1 in the dataset labels.

    Scans every file in the `labels` folder of `current_dataset_dir`. The first
    space-separated token of each line (the class) is skipped; each remaining
    token is parsed as a float.
    """
    labels_dir = os.path.join(current_dataset_dir, 'labels')
    for file_name in os.listdir(labels_dir):
        with open(os.path.join(labels_dir, file_name), 'r') as label_file:
            rows = label_file.readlines()
        for row in rows:
            for value in row.split(' ')[1:]:
                if float(value) > 1:
                    print('Error!')
def fix_class(folder, *, name_prefix='0000', min_line_length=25):
    """Remove too-short lines from matching label files in `folder`.

    Only files whose name starts with `name_prefix` are inspected. Every line
    shorter than `min_line_length` characters (newline included) is printed
    and dropped. A file is rewritten only when at least one line was dropped.

    Args:
        folder: Directory containing the label files.
        name_prefix: File-name prefix selecting the files to clean.
        min_line_length: Lines shorter than this are removed.
    """
    for label in os.listdir(folder):
        if not label.startswith(name_prefix):
            continue
        with open(os.path.join(folder, label), 'r+') as f:
            lines = f.readlines()
            # Build a filtered copy rather than popping while indexing, which
            # skips elements and runs past the end of the shrinking list.
            kept = []
            for line in lines:
                if len(line) < min_line_length:
                    print(line)
                else:
                    kept.append(line)
            if len(kept) != len(lines):
                f.seek(0)
                f.truncate()
                f.writelines(kept)
if __name__ == '__main__':
    # Script entry point: currently cleans the label folders of one dataset
    # snapshot; the polling loop is disabled.
    label_folders = (
        'datasets/zombobase-2024-06-18/test/labels',
        'datasets/zombobase-2024-06-18/train/labels',
        'datasets/zombobase-2024-06-18/valid/labels',
    )
    for label_folder in label_folders:
        fix_class(label_folder)
    # main()