"""Offline augmentation of a YOLO-labelled image dataset using kornia.

For every source image the script writes the resized original plus
``num_augmented_images`` augmented variants (geometric + intensity) together
with their YOLO label files into the ``*_cuda`` output directories.

NOTE(review): the custom augmentations target the kornia API in which
``apply_transform`` receives ``(input, params, flags, transform=None)`` and
random parameters come from ``generate_parameters`` -- confirm against the
kornia version pinned by the project.
"""
import os
import time
import traceback
from datetime import datetime, timedelta
from pathlib import Path

import cv2
import numpy as np
import torch
import kornia.augmentation as K
import kornia.utils as KU
from torch.utils.data import Dataset, DataLoader
import concurrent.futures

from constants import (data_images_dir, data_labels_dir, processed_images_dir, processed_labels_dir)
from dto.imageLabel import ImageLabel

# Configurable parameters
num_augmented_images = 7
augmentation_probability = 0.5  # general probability for augmentations, can be adjusted per augmentation
RESIZE_SIZE = (1080, 1920)  # Resize images to Full HD 1920x1080 (height, width)

processed_images_dir = processed_images_dir + '_cuda'
processed_labels_dir = processed_labels_dir + '_cuda'

# Ensure directories exist
os.makedirs(processed_images_dir, exist_ok=True)
os.makedirs(processed_labels_dir, exist_ok=True)

# Running count of written images; reset by main(), updated by save_batch_results().
total_files_processed = 0


def _sample_uniform(batch_size: int, low: float, high: float, same_on_batch: bool) -> torch.Tensor:
    """Return ``batch_size`` samples from U[low, high) as a 1-D CPU tensor.

    With ``same_on_batch`` a single value is drawn and repeated. A batch size
    of zero (no sample selected for augmentation) yields an empty tensor.
    """
    count = 1 if same_on_batch else batch_size
    values = torch.rand(count) * (high - low) + low
    if same_on_batch:
        values = values.repeat(batch_size)
    return values


# Custom Augmentations
class RandomFog(K.IntensityAugmentationBase2D):
    """Blend each image towards white by a random fog coefficient.

    Args:
        fog_coef_range: ``(min, max)`` of the blend factor; 0 leaves the image
            untouched, 1 turns it fully white. Images are expected in [0, 1].
        p: probability of applying the augmentation to a sample.
        same_on_batch: use one fog coefficient for the whole batch.
    """

    def __init__(self, fog_coef_range=(0, 0.3), p=augmentation_probability, same_on_batch=True):
        super().__init__(p=p, same_on_batch=same_on_batch)
        self.fog_coef_range = fog_coef_range

    def generate_parameters(self, batch_shape) -> dict:
        """Sample one fog coefficient per selected sample."""
        low, high = self.fog_coef_range
        return {"fog_factor": _sample_uniform(batch_shape[0], low, high, self.same_on_batch)}

    def apply_transform(self, input: torch.Tensor, params: dict, flags: dict, transform=None) -> torch.Tensor:
        """Return ``input * (1 - fog) + fog`` with ``fog`` broadcast over C, H, W."""
        # Parameters are sampled on CPU; move them next to the image tensor.
        fog_factor = params["fog_factor"].to(input).view(-1, 1, 1, 1)
        return input * (1.0 - fog_factor) + fog_factor


class RandomShadow(K.IntensityAugmentationBase2D):
    """Darken a random axis-aligned rectangle of each image.

    Args:
        shadow_factor_range: ``(min, max)`` multiplier applied to pixels inside
            the rectangle (smaller means darker).
        p: probability of applying the augmentation to a sample.
        same_on_batch: use one rectangle / factor for the whole batch.
    """

    def __init__(self, shadow_factor_range=(0.2, 0.8), p=augmentation_probability, same_on_batch=True):
        super().__init__(p=p, same_on_batch=same_on_batch)
        self.shadow_factor_range = shadow_factor_range

    def generate_parameters(self, batch_shape) -> dict:
        """Sample the rectangle corners (pixels) and the darkening factor."""
        batch_size, height, width = batch_shape[0], batch_shape[-2], batch_shape[-1]
        same = self.same_on_batch

        def fraction() -> torch.Tensor:
            return _sample_uniform(batch_size, 0.0, 1.0, same)

        # torch.randint only accepts Python ints as bounds, so the per-sample
        # lower bound (x2 > x1, y2 > y1) is realised by scaling uniform draws.
        x1 = (fraction() * width).floor().long()
        y1 = (fraction() * height).floor().long()
        x2 = (x1 + 1 + (fraction() * (width - x1)).floor().long()).clamp(max=width)
        y2 = (y1 + 1 + (fraction() * (height - y1)).floor().long()).clamp(max=height)

        low, high = self.shadow_factor_range
        shadow_factor = _sample_uniform(batch_size, low, high, same)
        return {"x1": x1, "y1": y1, "x2": x2, "y2": y2, "shadow_factor": shadow_factor}

    def apply_transform(self, input: torch.Tensor, params: dict, flags: dict, transform=None) -> torch.Tensor:
        """Multiply the pixels inside each sample's rectangle by its shadow factor."""
        mask = torch.zeros_like(input)
        x1, y1 = params["x1"].tolist(), params["y1"].tolist()
        x2, y2 = params["x2"].tolist(), params["y2"].tolist()
        for b in range(input.shape[0]):
            mask[b, :, y1[b]:y2[b], x1[b]:x2[b]] = 1
        shadow_factor = params["shadow_factor"].to(input).view(-1, 1, 1, 1)
        return input * (1.0 - mask) + input * mask * shadow_factor


class ImageDataset(Dataset):
    """Dataset yielding ``(image, labels, filename)`` for every file in ``images_dir``.

    ``image`` is a float ``(C, H, W)`` RGB tensor with values in [0, 255],
    resized to ``RESIZE_SIZE``. ``labels`` is a list of
    ``[x_center, y_center, width, height, class_id]`` rows (normalized YOLO),
    empty when the image has no label file.
    """

    def __init__(self, images_dir, labels_dir):
        self.images_dir = images_dir
        self.labels_dir = labels_dir
        # Sorted so the processing order is deterministic across runs.
        self.image_filenames = sorted(
            f for f in os.listdir(images_dir) if os.path.isfile(os.path.join(images_dir, f))
        )
        self.resize = K.Resize(RESIZE_SIZE)

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, idx):
        """Load, convert to RGB and resize one image; read its labels if present.

        Raises:
            FileNotFoundError: if OpenCV cannot read the image file.
        """
        image_filename = self.image_filenames[idx]
        image_path = os.path.join(self.images_dir, image_filename)
        label_path = os.path.join(self.labels_dir, Path(image_filename).stem + '.txt')

        image_np = cv2.imread(image_path)
        if image_np is None:
            raise FileNotFoundError(f"Error reading image: {image_path}")
        image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)  # kornia works on RGB

        image = KU.image_to_tensor(image_np, keepdim=False).float()  # HWC -> 1xCxHxW
        image = self.resize(image)
        if image.dim() == 4:
            # Drop the batch dimension so every sample is CxHxW.
            image = image.squeeze(0)

        labels = self._read_labels(label_path) if os.path.exists(label_path) else []
        return image, labels, image_filename

    def _read_labels(self, labels_path):
        """Parse a YOLO label file into ``[x_center, y_center, w, h, class_id]`` rows.

        Blank lines are skipped.
        """
        labels = []
        with open(labels_path, 'r') as f:
            for row in f:
                fields = row.split()
                if not fields:
                    continue
                class_num = int(fields[0])
                coordinates = [float(n) for n in fields[1:]]
                labels.append([*coordinates, class_num])
        return labels


def yolo_to_xyxy(bboxes_yolo, image_width, image_height):
    """Convert normalized YOLO boxes to pixel ``[x_min, y_min, x_max, y_max, class_id]``.

    Coordinates are kept as floats (no truncation) so a later round trip back
    to YOLO format does not lose precision.

    Returns:
        A float32 tensor of shape ``(N, 5)``; ``(0, 5)`` for empty input.
    """
    bboxes_xyxy = []
    for x_center, y_center, w, h, class_id in bboxes_yolo:
        bboxes_xyxy.append([
            (x_center - w / 2) * image_width,
            (y_center - h / 2) * image_height,
            (x_center + w / 2) * image_width,
            (y_center + h / 2) * image_height,
            class_id,
        ])
    if not bboxes_xyxy:
        return torch.empty((0, 5))
    return torch.tensor(bboxes_xyxy, dtype=torch.float32)


def xyxy_to_yolo(bboxes_xyxy, image_width, image_height):
    """Convert pixel ``[x_min, y_min, x_max, y_max, class_id]`` rows to normalized YOLO.

    Accepts a tensor or any iterable of 5-element rows and always returns
    plain Python numbers, so the result can be formatted directly into a
    label file.
    """
    rows = bboxes_xyxy.tolist() if isinstance(bboxes_xyxy, torch.Tensor) else bboxes_xyxy
    bboxes_yolo = []
    for x_min, y_min, x_max, y_max, class_id in rows:
        x_min, y_min, x_max, y_max = float(x_min), float(y_min), float(x_max), float(y_max)
        bboxes_yolo.append([
            ((x_min + x_max) / 2) / image_width,
            ((y_min + y_max) / 2) / image_height,
            (x_max - x_min) / image_width,
            (y_max - y_min) / image_height,
            int(class_id),
        ])
    return bboxes_yolo


def correct_bboxes(labels):
    """Shrink YOLO boxes that cross the image border and drop tiny ones.

    Each box keeps its center; width and height are reduced symmetrically
    until the box lies inside ``[margin, 1 - margin]``. Boxes whose corrected
    width or height falls below ``min_size`` are discarded.
    """
    margin = 0.0005
    min_size = 0.01
    res = []
    for x, y, width, height, class_id in (bbox[:5] for bbox in labels):
        half_width = 0.5 * width
        half_height = 0.5 * height

        # Largest (non-positive) overshoot on either side along x.
        w_diff = min((1 - margin) - (x + half_width), (x - half_width) - margin, 0)
        w = width + 2 * w_diff
        if w < min_size:
            continue

        h_diff = min((1 - margin) - (y + half_height), (y - half_height) - margin, 0)
        h = height + 2 * h_diff
        if h < min_size:
            continue

        res.append([x, y, w, h, class_id])
    return res


def _boxes_to_corners(boxes_xyxy: torch.Tensor) -> torch.Tensor:
    """Turn ``(N, 4)`` xyxy boxes into ``(N, 4, 2)`` corner points (clockwise from top-left)."""
    x_min, y_min, x_max, y_max = boxes_xyxy.unbind(dim=-1)
    return torch.stack([
        torch.stack([x_min, y_min], dim=-1),
        torch.stack([x_max, y_min], dim=-1),
        torch.stack([x_max, y_max], dim=-1),
        torch.stack([x_min, y_max], dim=-1),
    ], dim=-2)


def _corners_to_boxes(corners: torch.Tensor) -> torch.Tensor:
    """Return the axis-aligned ``(N, 4)`` xyxy hull of ``(N, 4, 2)`` corner points."""
    return torch.cat([corners.amin(dim=-2), corners.amax(dim=-2)], dim=-1)


def _unit_tensor_to_uint8_image(image: torch.Tensor):
    """Convert a ``(C, H, W)`` tensor in [0, 1] to an ``(H, W, C)`` uint8 numpy image."""
    # Clamp first: casting out-of-range values to uint8 would wrap around.
    return KU.tensor_to_image((image.clamp(0.0, 1.0) * 255.0).round().byte().cpu())


def process_image_and_labels(image, labels_yolo, image_filename, geometric_pipeline, intensity_pipeline, device):
    """Build the original plus ``num_augmented_images`` augmented ImageLabel entries.

    Args:
        image: ``(C, H, W)`` tensor (a leading batch dimension of 1 is
            tolerated) with values in [0, 255], already resized to RESIZE_SIZE.
        labels_yolo: list of ``[x_center, y_center, w, h, class_id]`` rows.
        image_filename: source file name, used to derive the output paths.
        geometric_pipeline: kornia AugmentationSequential with
            ``data_keys=["input", "bbox"]``.
        intensity_pipeline: kornia AugmentationSequential with
            ``data_keys=["input"]``.
        device: torch device on which the augmentations run.

    Returns:
        A list of ImageLabel objects, the unaugmented image first.
    """
    if image.dim() == 4 and image.shape[0] == 1:
        image = image.squeeze(0)
    image = image.float() / 255.0  # work in [0, 1]
    original_height, original_width = RESIZE_SIZE[0], RESIZE_SIZE[1]
    stem, suffix = Path(image_filename).stem, Path(image_filename).suffix

    processed_image_labels = []

    # 1. Original image and labels
    processed_image_labels.append(ImageLabel(
        image=_unit_tensor_to_uint8_image(image),
        labels=correct_bboxes(labels_yolo),
        image_path=os.path.join(processed_images_dir, image_filename),
        labels_path=os.path.join(processed_labels_dir, stem + '.txt')
    ))

    # Loop-invariant inputs for the augmentation passes.
    img_batch = image.unsqueeze(0).to(device)  # BCHW
    boxes = yolo_to_xyxy(labels_yolo, original_width, original_height)  # (N, 5)
    has_boxes = boxes.shape[0] > 0
    class_ids = boxes[:, 4:5]
    if has_boxes:
        corners = _boxes_to_corners(boxes[:, :4])
    else:
        # The pipeline always expects a bbox input; feed one full-image
        # placeholder box and ignore its result.
        corners = _boxes_to_corners(
            torch.tensor([[0.0, 0.0, float(original_width), float(original_height)]])
        )
    corner_batch = corners.unsqueeze(0).to(device)  # B N 4 2, the layout of the "bbox" data key

    # 2-8. Augmented images
    for i in range(num_augmented_images):
        with torch.no_grad():
            # Data is passed positionally in data_keys order; the result has the same order.
            geo_augmented_image, geo_augmented_corners = geometric_pipeline(img_batch, corner_batch)
            intensity_augmented_image = intensity_pipeline(geo_augmented_image)
        if isinstance(intensity_augmented_image, (list, tuple)):
            intensity_augmented_image = intensity_augmented_image[0]

        augmented_image_np = _unit_tensor_to_uint8_image(intensity_augmented_image.squeeze(0))

        if has_boxes:
            augmented_boxes = _corners_to_boxes(geo_augmented_corners.squeeze(0).cpu().float())
            augmented_bboxes_xyxy = torch.cat([augmented_boxes, class_ids], dim=-1)
            augmented_bboxes_yolo = xyxy_to_yolo(augmented_bboxes_xyxy, original_width, original_height)
        else:
            augmented_bboxes_yolo = []

        processed_image_labels.append(ImageLabel(
            image=augmented_image_np,
            labels=correct_bboxes(augmented_bboxes_yolo),
            image_path=os.path.join(processed_images_dir, f'{stem}_{i + 1}{suffix}'),
            labels_path=os.path.join(processed_labels_dir, f'{stem}_{i + 1}.txt')
        ))

    return processed_image_labels


def write_result(img_ann: ImageLabel):
    """Write one image and its YOLO label file to disk.

    Raises:
        OSError: if OpenCV fails to write the image.
    """
    # OpenCV expects BGR channel order.
    if not cv2.imwrite(img_ann.image_path, cv2.cvtColor(img_ann.image, cv2.COLOR_RGB2BGR)):
        raise OSError(f"Error writing image: {img_ann.image_path}")
    print(f'{img_ann.image_path} written')

    lines = [
        f'{ann[4]} {round(ann[0], 5)} {round(ann[1], 5)} {round(ann[2], 5)} {round(ann[3], 5)}\n'
        for ann in img_ann.labels
    ]
    with open(img_ann.labels_path, 'w') as f:
        f.writelines(lines)
    print(f'{img_ann.labels_path} written')


def process_batch_wrapper(batch_data, geometric_pipeline, intensity_pipeline, device):
    """Run process_image_and_labels over ``(image, labels, filename)`` samples and flatten the results."""
    processed_batch_image_labels = []
    for image, labels_yolo, image_filename in batch_data:
        processed_batch_image_labels.extend(
            process_image_and_labels(image, labels_yolo, image_filename,
                                     geometric_pipeline, intensity_pipeline, device)
        )
    return processed_batch_image_labels


def save_batch_results(batch_image_labels):
    """Write every ImageLabel of a batch and update the global progress counter."""
    global total_files_processed
    for img_ann in batch_image_labels:
        write_result(img_ann)
        total_files_processed += 1
    print(f"Total processed images: {total_files_processed}")


def _collate_samples(batch):
    """Keep samples as a list of tuples.

    The default collate function cannot stack the variable-length label
    lists. Defined at module level so DataLoader workers can pickle it.
    """
    return batch


def main():
    """Augment every not-yet-processed image of the dataset and report throughput."""
    global total_files_processed
    total_files_processed = 0

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    geometric_pipeline = K.AugmentationSequential(
        K.RandomHorizontalFlip(p=0.5),
        K.RandomAffine(degrees=25, translate=(0.1, 0.1), scale=(0.8, 1.2), p=0.5),
        data_keys=["input", "bbox"],
        same_on_batch=False
    ).to(device)

    intensity_pipeline = K.AugmentationSequential(
        K.ColorJitter(brightness=0.1, contrast=0.07, saturation=0.1, hue=0.1, p=0.5),
        RandomFog(p=0.2),
        RandomShadow(p=0.3),
        K.RandomMotionBlur(kernel_size=3, angle=35., direction=0.5, p=0.3),
        data_keys=["input"],
        same_on_batch=False
    ).to(device)

    dataset = ImageDataset(data_images_dir, data_labels_dir)

    # Resume support: skip images whose unaugmented copy already exists.
    # NOTE(review): an image interrupted between its original and its last
    # augmented variant is also skipped on the next run.
    processed_images_set = set(os.listdir(processed_images_dir))
    images_to_process_indices = [
        i for i, filename in enumerate(dataset.image_filenames)
        if filename not in processed_images_set
    ]
    pending_dataset = torch.utils.data.Subset(dataset, images_to_process_indices)
    filtered_dataloader = DataLoader(
        pending_dataset,
        batch_size=32,  # Adjust batch_size as needed
        shuffle=False,
        num_workers=os.cpu_count() or 0,  # cpu_count() may return None
        collate_fn=_collate_samples,
    )

    start_time = time.time()
    try:
        for batch_data in filtered_dataloader:
            batch_processed_image_labels = process_batch_wrapper(
                batch_data, geometric_pipeline, intensity_pipeline, device
            )
            save_batch_results(batch_processed_image_labels)
    except Exception:
        # Top-level boundary: report the full traceback, then still print the stats.
        traceback.print_exc()

    elapsed_time = time.time() - start_time
    print(f"Total processing time: {elapsed_time:.2f} seconds")
    images_per_hour = (total_files_processed / elapsed_time) * 3600 if elapsed_time > 0 else 0.0
    print(f"Processed images per hour: {images_per_hour:.2f}")
    print("Augmentation process completed.")


if __name__ == '__main__':
    main()