import os.path import time from datetime import datetime from pathlib import Path import albumentations as A import cv2 import numpy as np from dateutil.relativedelta import relativedelta from config import Config from constants import current_images_dir, current_labels_dir, annotation_classes, prefix, date_format, \ current_dataset_dir from dto.imageLabel import ImageLabel config = Config() def image_processing(img_ann: ImageLabel) -> [ImageLabel]: transforms = [ A.Compose([A.HorizontalFlip(always_apply=True)], bbox_params=A.BboxParams(format='yolo')), A.Compose([A.RandomBrightnessContrast(always_apply=True)], bbox_params=A.BboxParams(format='yolo')), A.Compose([A.SafeRotate(limit=90, always_apply=True)], bbox_params=A.BboxParams(format='yolo')), A.Compose([A.SafeRotate(limit=90, always_apply=True), A.RandomBrightnessContrast(always_apply=True)], bbox_params=A.BboxParams(format='yolo')), A.Compose([A.ShiftScaleRotate(scale_limit=0.2, always_apply=True), A.VerticalFlip(always_apply=True), ], bbox_params=A.BboxParams(format='yolo')), A.Compose([A.ShiftScaleRotate(scale_limit=0.2, always_apply=True)], bbox_params=A.BboxParams(format='yolo')), A.Compose([A.SafeRotate(limit=90, always_apply=True), A.RandomBrightnessContrast(always_apply=True)], bbox_params=A.BboxParams(format='yolo')) ] results = [] for i, transform in enumerate(transforms): try: res = transform(image=img_ann.image, bboxes=img_ann.labels) path = Path(img_ann.image_path) name = f'{path.stem}_{i + 1}' img = ImageLabel( image=res['image'], labels=res['bboxes'], image_path=os.path.join(current_images_dir, f'{name}{path.suffix}'), labels_path=os.path.join(current_labels_dir, f'{name}.txt') ) results.append(img) except Exception as e: print(f'Error during transformtation: {e}') return results def write_result(img_ann: ImageLabel, show_image=False): os.makedirs(os.path.dirname(img_ann.image_path), exist_ok=True) os.makedirs(os.path.dirname(img_ann.labels_path), exist_ok=True) if show_image: img_ann.visualize(annotation_classes) cv2.imencode('.jpg', img_ann.image)[1].tofile(img_ann.image_path) print(f'{img_ann.image_path} written') with open(img_ann.labels_path, 'w') as f: lines = [f'{ann[4]} {round(ann[0], 5)} {round(ann[1], 5)} {round(ann[2], 5)} {round(ann[3], 5)}\n' for ann in img_ann.labels] f.writelines(lines) f.close() print(f'{img_ann.labels_path} written') def read_labels(labels_path) -> [[]]: with open(labels_path, 'r') as f: rows = f.readlines() arr = [] for row in rows: str_coordinates = row.split(' ') class_num = str_coordinates.pop(0) coordinates = [float(n.replace(',', '.')) for n in str_coordinates] coordinates.append(class_num) arr.append(coordinates) return arr def process_image(img_ann): results = image_processing(img_ann) for res_ann in results: write_result(res_ann) write_result(ImageLabel( image=img_ann.image, labels=img_ann.labels, image_path=os.path.join(current_images_dir, Path(img_ann.image_path).name), labels_path=os.path.join(current_labels_dir, Path(img_ann.labels_path).name) )) # os.remove(img_ann.image_path) # os.remove(img_ann.labels_path) def get_checkpoint(): if config.checkpoint is not None: return config.checkpoint dates = [] for directory in os.listdir('models'): try: dates.append(datetime.strptime(directory[len(prefix):], date_format)) except: continue if len(dates) == 0: return datetime.now() - relativedelta(years=1) else: return max(dates) def main(): last_date = checkpoint = get_checkpoint() while True: images = [] with os.scandir(config.images_dir) as imd: for image_file in imd: if not image_file.is_file(): continue mod_time = datetime.fromtimestamp(image_file.stat().st_mtime) if mod_time > checkpoint: images.append(image_file) last_date = max(last_date, mod_time) for image_file in images: try: image_path = os.path.join(config.images_dir, image_file.name) labels_path = os.path.join(config.labels_dir, f'{Path(image_path).stem}.txt') image = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), cv2.IMREAD_UNCHANGED) process_image(ImageLabel( image_path=image_path, image=image, labels_path=labels_path, labels=read_labels(labels_path) )) except Exception as e: print(f'Error appeared {e}') if last_date != checkpoint: checkpoint = config.checkpoint = last_date config.write() time.sleep(5) def check_labels(): for label in os.listdir(os.path.join(current_dataset_dir, 'labels')): with open(os.path.join(current_dataset_dir, 'labels', label), 'r') as f: lines = f.readlines() for line in lines: list_c = line.split(' ')[1:] for l in list_c: if float(l) > 1: print('Error!') def fix_class(folder): for label in os.listdir(folder): if label.startswith('0000'): with open(os.path.join(folder, label), 'r+') as f: lines = f.readlines() for i in range(0, len(lines)): l = lines[i] lines[i] = f'2{l[1:]}' f.seek(0) # rewind f.writelines(lines) if __name__ == '__main__': fix_class('datasets/zombobase-2024-06-18/test/labels') fix_class('datasets/zombobase-2024-06-18/train/labels') fix_class('datasets/zombobase-2024-06-18/valid/labels') # main()