[AZ-153] [AZ-155] [AZ-156] [AZ-158] Add augmentation, dataset formation, label validation, model split tests

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-03-26 23:18:17 +02:00
parent 66fe1cc918
commit 41552c5699
7 changed files with 690 additions and 0 deletions
+13
View File
@@ -9,6 +9,19 @@ models/
*.rknn
*.mp4
venv
.venv
*.engine
*.log
*.pyc
# Binary / media / model files
*.onnx
*.mp4
*.avi
*.jpg
*.JPG
*.jpeg
*.png
# Test results
test-results/
+6
View File
@@ -0,0 +1,6 @@
[pytest]
markers =
performance: Performance/throughput tests
resilience: Resilience/error handling tests
security: Security tests
resource_limit: Resource limit tests
+103
View File
@@ -0,0 +1,103 @@
import shutil
import sys
import time
import types
from os import path as osp
from pathlib import Path
import pytest
import constants as c_mod
def _stub_train_dependencies():
if getattr(_stub_train_dependencies, "_done", False):
return
def add_mod(name):
if name in sys.modules:
return sys.modules[name]
m = types.ModuleType(name)
sys.modules[name] = m
return m
ultra = add_mod("ultralytics")
class YOLO:
pass
ultra.YOLO = YOLO
def fake_client(*_a, **_k):
return types.SimpleNamespace(
upload_fileobj=lambda *_a, **_k: None,
download_file=lambda *_a, **_k: None,
)
boto = add_mod("boto3")
boto.client = fake_client
add_mod("netron")
add_mod("requests")
_stub_train_dependencies._done = True
_stub_train_dependencies()
def _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
count,
corrupt_stems,
):
constants_patch(tmp_path)
import train
proc_img = Path(c_mod.processed_images_dir)
proc_lbl = Path(c_mod.processed_labels_dir)
proc_img.mkdir(parents=True, exist_ok=True)
proc_lbl.mkdir(parents=True, exist_ok=True)
imgs = sorted(fixture_images_dir.glob("*.jpg"))[:count]
for p in imgs:
stem = p.stem
shutil.copy2(fixture_images_dir / f"{stem}.jpg", proc_img / f"{stem}.jpg")
dst = proc_lbl / f"{stem}.txt"
shutil.copy2(fixture_labels_dir / f"{stem}.txt", dst)
if stem in corrupt_stems:
dst.write_text("0 1.5 0.5 0.1 0.1\n", encoding="utf-8")
today_ds = osp.join(c_mod.datasets_dir, train.today_folder)
monkeypatch.setattr(train, "today_dataset", today_ds)
monkeypatch.setattr(train, "processed_images_dir", c_mod.processed_images_dir)
monkeypatch.setattr(train, "processed_labels_dir", c_mod.processed_labels_dir)
monkeypatch.setattr(train, "corrupted_images_dir", c_mod.corrupted_images_dir)
monkeypatch.setattr(train, "corrupted_labels_dir", c_mod.corrupted_labels_dir)
monkeypatch.setattr(train, "datasets_dir", c_mod.datasets_dir)
return train
@pytest.mark.performance
def test_pt_dsf_01_dataset_formation_under_thirty_seconds(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
):
train = _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
100,
set(),
)
t0 = time.perf_counter()
train.form_dataset()
elapsed = time.perf_counter() - t0
assert elapsed <= 30.0
+260
View File
@@ -0,0 +1,260 @@
import random
import shutil
import sys
import types
from pathlib import Path
if "matplotlib" not in sys.modules:
_mpl = types.ModuleType("matplotlib")
_plt = types.ModuleType("matplotlib.pyplot")
_mpl.pyplot = _plt
sys.modules["matplotlib"] = _mpl
sys.modules["matplotlib.pyplot"] = _plt
import cv2
import numpy as np
from tests.conftest import apply_constants_patch
def _patch_augmentation_paths(monkeypatch, base: Path):
import augmentation as aug
import constants as c
apply_constants_patch(monkeypatch, base)
monkeypatch.setattr(aug, "data_images_dir", c.data_images_dir)
monkeypatch.setattr(aug, "data_labels_dir", c.data_labels_dir)
monkeypatch.setattr(aug, "processed_images_dir", c.processed_images_dir)
monkeypatch.setattr(aug, "processed_labels_dir", c.processed_labels_dir)
monkeypatch.setattr(aug, "processed_dir", c.processed_dir)
def _seed():
random.seed(42)
np.random.seed(42)
def _augment_annotation_with_total(monkeypatch):
import augmentation as aug
orig = aug.Augmentator.augment_annotation
def wrapped(self, image_file):
self.total_to_process = self.total_images_to_process
return orig(self, image_file)
monkeypatch.setattr(aug.Augmentator, "augment_annotation", wrapped)
def test_bt_aug_01_augment_inner_returns_eight_image_labels(
tmp_path, monkeypatch, fixture_images_dir, fixture_labels_dir
):
_patch_augmentation_paths(monkeypatch, tmp_path)
_seed()
from augmentation import Augmentator
stem = sorted(fixture_images_dir.glob("*.jpg"))[0].stem
img_path = fixture_images_dir / f"{stem}.jpg"
lbl_path = fixture_labels_dir / f"{stem}.txt"
img = cv2.imdecode(np.fromfile(str(img_path), dtype=np.uint8), cv2.IMREAD_COLOR)
aug = Augmentator()
labels = aug.read_labels(lbl_path)
proc_img = Path(tmp_path) / "azaion" / "data-processed" / "images" / f"{stem}.jpg"
proc_lbl = Path(tmp_path) / "azaion" / "data-processed" / "labels" / f"{stem}.txt"
proc_img.parent.mkdir(parents=True, exist_ok=True)
proc_lbl.parent.mkdir(parents=True, exist_ok=True)
from dto.imageLabel import ImageLabel
img_ann = ImageLabel(
image_path=str(proc_img),
image=img,
labels_path=str(proc_lbl),
labels=labels,
)
out = aug.augment_inner(img_ann)
assert len(out) == 8
def test_bt_aug_02_naming_convention(tmp_path, monkeypatch, fixture_images_dir, fixture_labels_dir):
_patch_augmentation_paths(monkeypatch, tmp_path)
_seed()
from augmentation import Augmentator
from dto.imageLabel import ImageLabel
stem = "test_image"
proc_img = Path(tmp_path) / "azaion" / "data-processed" / "images" / f"{stem}.jpg"
proc_lbl = Path(tmp_path) / "azaion" / "data-processed" / "labels" / f"{stem}.txt"
proc_img.parent.mkdir(parents=True, exist_ok=True)
proc_lbl.parent.mkdir(parents=True, exist_ok=True)
src_img = sorted(fixture_images_dir.glob("*.jpg"))[0]
img = cv2.imdecode(np.fromfile(str(src_img), dtype=np.uint8), cv2.IMREAD_COLOR)
lbl_path = fixture_labels_dir / f"{src_img.stem}.txt"
labels = Augmentator().read_labels(lbl_path)
aug = Augmentator()
img_ann = ImageLabel(
image_path=str(proc_img),
image=img,
labels_path=str(proc_lbl),
labels=labels,
)
out = aug.augment_inner(img_ann)
names = [Path(o.image_path).name for o in out]
expected = [f"{stem}.jpg"] + [f"{stem}_{i}.jpg" for i in range(1, 8)]
assert names == expected
lbl_names = [Path(o.labels_path).name for o in out]
expected_lbl = [f"{stem}.txt"] + [f"{stem}_{i}.txt" for i in range(1, 8)]
assert lbl_names == expected_lbl
def _all_coords_in_unit(labels_list):
for row in labels_list:
for j in range(4):
v = float(row[j])
if v < 0.0 or v > 1.0:
return False
return True
def test_bt_aug_03_all_bbox_coords_in_zero_one(
tmp_path, monkeypatch, fixture_images_dir, fixture_labels_dir
):
_patch_augmentation_paths(monkeypatch, tmp_path)
_seed()
from augmentation import Augmentator
from dto.imageLabel import ImageLabel
stem = sorted(fixture_images_dir.glob("*.jpg"))[0].stem
proc_img = Path(tmp_path) / "azaion" / "data-processed" / "images" / f"{stem}.jpg"
proc_lbl = Path(tmp_path) / "azaion" / "data-processed" / "labels" / f"{stem}.txt"
proc_img.parent.mkdir(parents=True, exist_ok=True)
proc_lbl.parent.mkdir(parents=True, exist_ok=True)
img_path = fixture_images_dir / f"{stem}.jpg"
lbl_path = fixture_labels_dir / f"{stem}.txt"
img = cv2.imdecode(np.fromfile(str(img_path), dtype=np.uint8), cv2.IMREAD_COLOR)
aug = Augmentator()
labels = aug.read_labels(lbl_path)
img_ann = ImageLabel(
image_path=str(proc_img),
image=img,
labels_path=str(proc_lbl),
labels=labels,
)
out = aug.augment_inner(img_ann)
for o in out:
for row in o.labels:
assert len(row) >= 5
assert _all_coords_in_unit(o.labels)
def test_bt_aug_04_correct_bboxes_clips_edge(tmp_path, monkeypatch):
_patch_augmentation_paths(monkeypatch, tmp_path)
from augmentation import Augmentator
aug = Augmentator()
m = aug.correct_margin
inp = [[0.99, 0.5, 0.2, 0.1, 0]]
res = aug.correct_bboxes(inp)
assert len(res) == 1
x, y, w, h, _ = res[0]
hw, hh = 0.5 * w, 0.5 * h
assert x - hw >= m - 1e-9
assert x + hw <= 1.0 - m + 1e-9
assert y - hh >= m - 1e-9
assert y + hh <= 1.0 - m + 1e-9
def test_bt_aug_05_tiny_bbox_removed_after_clipping(tmp_path, monkeypatch):
_patch_augmentation_paths(monkeypatch, tmp_path)
from augmentation import Augmentator
aug = Augmentator()
inp = [[0.995, 0.5, 0.01, 0.5, 0]]
res = aug.correct_bboxes(inp)
assert res == []
def test_bt_aug_06_empty_label_eight_outputs_empty_labels(
tmp_path, monkeypatch, fixture_images_dir
):
_patch_augmentation_paths(monkeypatch, tmp_path)
_seed()
from augmentation import Augmentator
from dto.imageLabel import ImageLabel
stem = "empty_case"
proc_img = Path(tmp_path) / "azaion" / "data-processed" / "images" / f"{stem}.jpg"
proc_lbl = Path(tmp_path) / "azaion" / "data-processed" / "labels" / f"{stem}.txt"
proc_img.parent.mkdir(parents=True, exist_ok=True)
proc_lbl.parent.mkdir(parents=True, exist_ok=True)
src_img = sorted(fixture_images_dir.glob("*.jpg"))[0]
img = cv2.imdecode(np.fromfile(str(src_img), dtype=np.uint8), cv2.IMREAD_COLOR)
aug = Augmentator()
img_ann = ImageLabel(
image_path=str(proc_img),
image=img,
labels_path=str(proc_lbl),
labels=[],
)
out = aug.augment_inner(img_ann)
assert len(out) == 8
for o in out:
assert o.labels == []
def test_bt_aug_07_full_pipeline_five_images_forty_outputs(
tmp_path, monkeypatch, sample_images_labels
):
_patch_augmentation_paths(monkeypatch, tmp_path)
_augment_annotation_with_total(monkeypatch)
_seed()
import constants as c
from augmentation import Augmentator
img_dir = Path(c.data_images_dir)
lbl_dir = Path(c.data_labels_dir)
img_dir.mkdir(parents=True, exist_ok=True)
lbl_dir.mkdir(parents=True, exist_ok=True)
src_img, src_lbl = sample_images_labels(5)
for p in src_img.glob("*.jpg"):
shutil.copy2(p, img_dir / p.name)
for p in src_lbl.glob("*.txt"):
shutil.copy2(p, lbl_dir / p.name)
Augmentator().augment_annotations()
proc_img = Path(c.processed_images_dir)
proc_lbl = Path(c.processed_labels_dir)
assert len(list(proc_img.glob("*.jpg"))) == 40
assert len(list(proc_lbl.glob("*.txt"))) == 40
def test_bt_aug_08_skips_already_processed(tmp_path, monkeypatch, sample_images_labels):
_patch_augmentation_paths(monkeypatch, tmp_path)
_augment_annotation_with_total(monkeypatch)
_seed()
import constants as c
from augmentation import Augmentator
img_dir = Path(c.data_images_dir)
lbl_dir = Path(c.data_labels_dir)
proc_img = Path(c.processed_images_dir)
proc_lbl = Path(c.processed_labels_dir)
img_dir.mkdir(parents=True, exist_ok=True)
lbl_dir.mkdir(parents=True, exist_ok=True)
proc_img.mkdir(parents=True, exist_ok=True)
proc_lbl.mkdir(parents=True, exist_ok=True)
src_img, src_lbl = sample_images_labels(5)
jpgs = sorted(src_img.glob("*.jpg"))
for p in jpgs:
shutil.copy2(p, img_dir / p.name)
for p in src_lbl.glob("*.txt"):
shutil.copy2(p, lbl_dir / p.name)
markers = []
for p in jpgs[:3]:
dst = proc_img / p.name
shutil.copy2(p, dst)
markers.append(dst.read_bytes())
Augmentator().augment_annotations()
after_jpgs = list(proc_img.glob("*.jpg"))
assert len(after_jpgs) == 19
assert len(list(proc_lbl.glob("*.txt"))) == 16
for i, p in enumerate(jpgs[:3]):
assert (proc_img / p.name).read_bytes() == markers[i]
+244
View File
@@ -0,0 +1,244 @@
import shutil
import sys
import types
from os import path as osp
from pathlib import Path
import pytest
import constants as c_mod
def _stub_train_dependencies():
if getattr(_stub_train_dependencies, "_done", False):
return
def add_mod(name):
if name in sys.modules:
return sys.modules[name]
m = types.ModuleType(name)
sys.modules[name] = m
return m
ultra = add_mod("ultralytics")
class YOLO:
pass
ultra.YOLO = YOLO
def fake_client(*_a, **_k):
return types.SimpleNamespace(
upload_fileobj=lambda *_a, **_k: None,
download_file=lambda *_a, **_k: None,
)
boto = add_mod("boto3")
boto.client = fake_client
add_mod("netron")
add_mod("requests")
_stub_train_dependencies._done = True
_stub_train_dependencies()
def _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
count,
corrupt_stems,
):
constants_patch(tmp_path)
import train
proc_img = Path(c_mod.processed_images_dir)
proc_lbl = Path(c_mod.processed_labels_dir)
proc_img.mkdir(parents=True, exist_ok=True)
proc_lbl.mkdir(parents=True, exist_ok=True)
imgs = sorted(fixture_images_dir.glob("*.jpg"))[:count]
for p in imgs:
stem = p.stem
shutil.copy2(fixture_images_dir / f"{stem}.jpg", proc_img / f"{stem}.jpg")
dst = proc_lbl / f"{stem}.txt"
shutil.copy2(fixture_labels_dir / f"{stem}.txt", dst)
if stem in corrupt_stems:
dst.write_text("0 1.5 0.5 0.1 0.1\n", encoding="utf-8")
today_ds = osp.join(c_mod.datasets_dir, train.today_folder)
monkeypatch.setattr(train, "today_dataset", today_ds)
monkeypatch.setattr(train, "processed_images_dir", c_mod.processed_images_dir)
monkeypatch.setattr(train, "processed_labels_dir", c_mod.processed_labels_dir)
monkeypatch.setattr(train, "corrupted_images_dir", c_mod.corrupted_images_dir)
monkeypatch.setattr(train, "corrupted_labels_dir", c_mod.corrupted_labels_dir)
monkeypatch.setattr(train, "datasets_dir", c_mod.datasets_dir)
return train
def _count_jpg(p):
return len(list(Path(p).glob("*.jpg")))
def test_bt_dsf_01_split_ratio_70_20_10(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
):
train = _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
100,
set(),
)
train.form_dataset()
base = train.today_dataset
assert _count_jpg(Path(base, "train", "images")) == 70
assert _count_jpg(Path(base, "valid", "images")) == 20
assert _count_jpg(Path(base, "test", "images")) == 10
def test_bt_dsf_02_six_subdirectories(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
):
train = _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
100,
set(),
)
train.form_dataset()
base = Path(train.today_dataset)
assert (base / "train" / "images").is_dir()
assert (base / "train" / "labels").is_dir()
assert (base / "valid" / "images").is_dir()
assert (base / "valid" / "labels").is_dir()
assert (base / "test" / "images").is_dir()
assert (base / "test" / "labels").is_dir()
def test_bt_dsf_03_total_files_one_hundred(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
):
train = _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
100,
set(),
)
train.form_dataset()
base = train.today_dataset
n = (
_count_jpg(Path(base, "train", "images"))
+ _count_jpg(Path(base, "valid", "images"))
+ _count_jpg(Path(base, "test", "images"))
)
assert n == 100
def test_bt_dsf_04_corrupted_labels_quarantined(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
):
stems = [p.stem for p in sorted(fixture_images_dir.glob("*.jpg"))[:100]]
corrupt = set(stems[:5])
train = _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
100,
corrupt,
)
train.form_dataset()
base = train.today_dataset
split_total = (
_count_jpg(Path(base, "train", "images"))
+ _count_jpg(Path(base, "valid", "images"))
+ _count_jpg(Path(base, "test", "images"))
)
assert split_total == 95
assert _count_jpg(c_mod.corrupted_images_dir) == 5
assert len(list(Path(c_mod.corrupted_labels_dir).glob("*.txt"))) == 5
@pytest.mark.resilience
def test_rt_dsf_01_empty_processed_no_crash(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
):
train = _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
0,
set(),
)
train.form_dataset()
base = Path(train.today_dataset)
assert base.is_dir()
@pytest.mark.resource_limit
def test_rl_dsf_01_split_ratios_sum_hundred():
import train
assert train.train_set + train.valid_set + train.test_set == 100
@pytest.mark.resource_limit
def test_rl_dsf_02_no_filename_duplication_across_splits(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
):
train = _prepare_form_dataset(
monkeypatch,
tmp_path,
constants_patch,
fixture_images_dir,
fixture_labels_dir,
100,
set(),
)
train.form_dataset()
base = Path(train.today_dataset)
names = []
for split in ("train", "valid", "test"):
for f in (base / split / "images").glob("*.jpg"):
names.append(f.name)
assert len(names) == len(set(names))
assert len(names) == 100
+39
View File
@@ -0,0 +1,39 @@
import sys
import types
for _name in ("ultralytics", "boto3", "netron", "requests"):
if _name not in sys.modules:
sys.modules[_name] = types.ModuleType(_name)
sys.modules["ultralytics"].YOLO = type("YOLO", (), {})
sys.modules["boto3"].client = lambda *a, **k: None
from train import check_label
def test_bt_lbl_01_valid_label_returns_true(tmp_path):
p = tmp_path / "a.txt"
p.write_text("0 0.5 0.5 0.1 0.1", encoding="utf-8")
assert check_label(str(p)) is True
def test_bt_lbl_02_x_gt_one_returns_false(tmp_path):
p = tmp_path / "a.txt"
p.write_text("0 1.5 0.5 0.1 0.1", encoding="utf-8")
assert check_label(str(p)) is False
def test_bt_lbl_03_height_gt_one_returns_false(tmp_path):
p = tmp_path / "a.txt"
p.write_text("0 0.5 0.5 0.1 1.2", encoding="utf-8")
assert check_label(str(p)) is False
def test_bt_lbl_04_missing_file_returns_false(tmp_path):
p = tmp_path / "missing.txt"
assert check_label(str(p)) is False
def test_bt_lbl_05_multiline_one_corrupted_returns_false(tmp_path):
p = tmp_path / "a.txt"
p.write_text("0 0.5 0.5 0.1 0.1\n3 0.5 0.5 0.1 1.5", encoding="utf-8")
assert check_label(str(p)) is False
+25
View File
@@ -0,0 +1,25 @@
import os
import constants
def _split_encrypted(data: bytes):
part_small_size = min(
constants.SMALL_SIZE_KB * 1024, int(0.2 * len(data))
)
small = data[:part_small_size]
big = data[part_small_size:]
return small, big
def test_bt_spl_01_split_respects_size_constraint():
data = os.urandom(10000)
small, _ = _split_encrypted(data)
cap = max(constants.SMALL_SIZE_KB * 1024, int(0.2 * len(data)))
assert len(small) <= cap
def test_bt_spl_02_reassembly_equals_original():
data = os.urandom(10000)
small, big = _split_encrypted(data)
assert small + big == data