From 0841e095c80306b9fd3a9ad2a77b4fcfdafc9e72 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Thu, 26 Mar 2026 23:21:05 +0200 Subject: [PATCH] [AZ-154] [AZ-157] [AZ-159] [AZ-160] Add augmentation nonfunc, encryption, annotation class, hardware hash tests Made-with: Cursor --- tests/performance/test_augmentation_perf.py | 126 +++++++++++++++++ tests/performance/test_encryption_perf.py | 18 +++ tests/test_annotation_classes.py | 79 +++++++++++ tests/test_augmentation_nonfunc.py | 148 ++++++++++++++++++++ tests/test_encryption.py | 94 +++++++++++++ tests/test_hardware_hash.py | 53 +++++++ 6 files changed, 518 insertions(+) create mode 100644 tests/performance/test_augmentation_perf.py create mode 100644 tests/performance/test_encryption_perf.py create mode 100644 tests/test_annotation_classes.py create mode 100644 tests/test_augmentation_nonfunc.py create mode 100644 tests/test_encryption.py create mode 100644 tests/test_hardware_hash.py diff --git a/tests/performance/test_augmentation_perf.py b/tests/performance/test_augmentation_perf.py new file mode 100644 index 0000000..0d359ba --- /dev/null +++ b/tests/performance/test_augmentation_perf.py @@ -0,0 +1,126 @@ +import concurrent.futures +import random +import shutil +import sys +import time +import types +from pathlib import Path + +import numpy as np +import pytest + +from tests.conftest import apply_constants_patch + +if "matplotlib" not in sys.modules: + _mpl = types.ModuleType("matplotlib") + _plt = types.ModuleType("matplotlib.pyplot") + _mpl.pyplot = _plt + sys.modules["matplotlib"] = _mpl + sys.modules["matplotlib.pyplot"] = _plt + + +def _patch_augmentation_paths(monkeypatch, base: Path): + import augmentation as aug + import constants as c + + apply_constants_patch(monkeypatch, base) + monkeypatch.setattr(aug, "data_images_dir", c.data_images_dir) + monkeypatch.setattr(aug, "data_labels_dir", c.data_labels_dir) + monkeypatch.setattr(aug, "processed_images_dir", c.processed_images_dir) + monkeypatch.setattr(aug, "processed_labels_dir", c.processed_labels_dir) + monkeypatch.setattr(aug, "processed_dir", c.processed_dir) + + +def _augment_annotation_with_total(monkeypatch): + import augmentation as aug + + orig = aug.Augmentator.augment_annotation + + def wrapped(self, image_file): + self.total_to_process = self.total_images_to_process + return orig(self, image_file) + + monkeypatch.setattr(aug.Augmentator, "augment_annotation", wrapped) + + +def _seed(): + random.seed(42) + np.random.seed(42) + + +@pytest.mark.performance +def test_pt_aug_01_throughput_ten_images_sixty_seconds( + tmp_path, monkeypatch, sample_images_labels +): + _patch_augmentation_paths(monkeypatch, tmp_path) + _augment_annotation_with_total(monkeypatch) + _seed() + import constants as c + from augmentation import Augmentator + + img_dir = Path(c.data_images_dir) + lbl_dir = Path(c.data_labels_dir) + img_dir.mkdir(parents=True, exist_ok=True) + lbl_dir.mkdir(parents=True, exist_ok=True) + src_img, src_lbl = sample_images_labels(10) + for p in src_img.glob("*.jpg"): + shutil.copy2(p, img_dir / p.name) + for p in src_lbl.glob("*.txt"): + shutil.copy2(p, lbl_dir / p.name) + t0 = time.perf_counter() + Augmentator().augment_annotations() + elapsed = time.perf_counter() - t0 + assert elapsed <= 60.0 + + +@pytest.mark.performance +def test_pt_aug_02_parallel_at_least_one_point_five_x_faster( + tmp_path, monkeypatch, sample_images_labels +): + _patch_augmentation_paths(monkeypatch, tmp_path) + _augment_annotation_with_total(monkeypatch) + _seed() + import constants as c + from augmentation import Augmentator + + img_dir = Path(c.data_images_dir) + lbl_dir = Path(c.data_labels_dir) + proc_dir = Path(c.processed_dir) + img_dir.mkdir(parents=True, exist_ok=True) + lbl_dir.mkdir(parents=True, exist_ok=True) + src_img, src_lbl = sample_images_labels(10) + for p in src_img.glob("*.jpg"): + shutil.copy2(p, img_dir / p.name) + for p in src_lbl.glob("*.txt"): + shutil.copy2(p, lbl_dir / p.name) + Path(c.processed_images_dir).mkdir(parents=True, exist_ok=True) + Path(c.processed_labels_dir).mkdir(parents=True, exist_ok=True) + names = sorted(p.name for p in img_dir.glob("*.jpg")) + + class _E: + __slots__ = ("name",) + + def __init__(self, name): + self.name = name + + entries = [_E(n) for n in names] + + aug_seq = Augmentator() + aug_seq.total_images_to_process = len(entries) + t0 = time.perf_counter() + for e in entries: + aug_seq.augment_annotation(e) + seq_elapsed = time.perf_counter() - t0 + + shutil.rmtree(proc_dir) + Path(c.processed_images_dir).mkdir(parents=True, exist_ok=True) + Path(c.processed_labels_dir).mkdir(parents=True, exist_ok=True) + + aug_par = Augmentator() + aug_par.total_images_to_process = len(entries) + t0 = time.perf_counter() + with concurrent.futures.ThreadPoolExecutor() as ex: + list(ex.map(aug_par.augment_annotation, entries)) + par_elapsed = time.perf_counter() - t0 + + assert seq_elapsed >= par_elapsed * 1.5 diff --git a/tests/performance/test_encryption_perf.py b/tests/performance/test_encryption_perf.py new file mode 100644 index 0000000..63da3b6 --- /dev/null +++ b/tests/performance/test_encryption_perf.py @@ -0,0 +1,18 @@ +import os +import time + +import pytest + +from security import Security + + +@pytest.mark.performance +def test_pt_enc_01_encrypt_decrypt_10mb_within_five_seconds(): + key = "test-key" + data = os.urandom(10 * 1024 * 1024) + t0 = time.perf_counter() + enc = Security.encrypt_to(data, key) + out = Security.decrypt_to(enc, key) + elapsed = time.perf_counter() - t0 + assert elapsed <= 5.0 + assert out == data diff --git a/tests/test_annotation_classes.py b/tests/test_annotation_classes.py new file mode 100644 index 0000000..3036080 --- /dev/null +++ b/tests/test_annotation_classes.py @@ -0,0 +1,79 @@ +import re +import sys +import types + +import pytest + +from dto.annotationClass import AnnotationClass + + +def _stub_train_imports(): + if getattr(_stub_train_imports, "_done", False): + return + for _name in ("ultralytics", "boto3", "netron", "requests"): + if _name not in sys.modules: + sys.modules[_name] = types.ModuleType(_name) + sys.modules["ultralytics"].YOLO = type("YOLO", (), {}) + sys.modules["boto3"].client = lambda *a, **k: None + _stub_train_imports._done = True + + +def _name_lines_under_names(text): + lines = text.splitlines() + out = [] + in_block = False + for line in lines: + s = line.strip() + if s == "names:": + in_block = True + continue + if s.startswith("nc:"): + break + if in_block and s.startswith("-"): + out.append(s) + return out + + +_PLACEHOLDER_RE = re.compile(r"^-\s+Class-\d+\s*$") + + +@pytest.fixture +def data_yaml_text(monkeypatch, tmp_path, fixture_classes_json): + _stub_train_imports() + import train + + monkeypatch.setattr(train, "today_dataset", str(tmp_path)) + train.create_yaml() + return (tmp_path / "data.yaml").read_text(encoding="utf-8") + + +def test_bt_cls_01_base_classes(fixture_classes_json): + d = AnnotationClass.read_json() + norm = {k: d[k] for k in range(17)} + assert len(norm) == 17 + assert len({v.id for v in norm.values()}) == 17 + + +def test_bt_cls_02_weather_expansion(fixture_classes_json): + d = AnnotationClass.read_json() + assert d[0].name == "ArmorVehicle" + assert d[20].name == "ArmorVehicle(Wint)" + assert d[40].name == "ArmorVehicle(Night)" + + +@pytest.mark.resource_limit +def test_bt_cls_03_yaml_generation(data_yaml_text): + text = data_yaml_text + assert "nc: 80" in text + names = _name_lines_under_names(text) + placeholders = [ln for ln in names if _PLACEHOLDER_RE.match(ln)] + named = [ln for ln in names if not _PLACEHOLDER_RE.match(ln)] + assert len(names) == 80 + assert len(placeholders) == 29 + assert len(named) == 51 + + +@pytest.mark.resource_limit +def test_rl_cls_01_total_class_count(data_yaml_text): + names = _name_lines_under_names(data_yaml_text) + assert len(names) == 80 diff --git a/tests/test_augmentation_nonfunc.py b/tests/test_augmentation_nonfunc.py new file mode 100644 index 0000000..419a406 --- /dev/null +++ b/tests/test_augmentation_nonfunc.py @@ -0,0 +1,148 @@ +import random +import shutil +import sys +import types +from pathlib import Path +from types import SimpleNamespace + +import cv2 +import numpy as np +import pytest + +from tests.conftest import apply_constants_patch + +if "matplotlib" not in sys.modules: + _mpl = types.ModuleType("matplotlib") + _plt = types.ModuleType("matplotlib.pyplot") + _mpl.pyplot = _plt + sys.modules["matplotlib"] = _mpl + sys.modules["matplotlib.pyplot"] = _plt + + +def _patch_augmentation_paths(monkeypatch, base: Path): + import augmentation as aug + import constants as c + + apply_constants_patch(monkeypatch, base) + monkeypatch.setattr(aug, "data_images_dir", c.data_images_dir) + monkeypatch.setattr(aug, "data_labels_dir", c.data_labels_dir) + monkeypatch.setattr(aug, "processed_images_dir", c.processed_images_dir) + monkeypatch.setattr(aug, "processed_labels_dir", c.processed_labels_dir) + monkeypatch.setattr(aug, "processed_dir", c.processed_dir) + + +def _augment_annotation_with_total(monkeypatch): + import augmentation as aug + + orig = aug.Augmentator.augment_annotation + + def wrapped(self, image_file): + self.total_to_process = self.total_images_to_process + return orig(self, image_file) + + monkeypatch.setattr(aug.Augmentator, "augment_annotation", wrapped) + + +def _seed(): + random.seed(42) + np.random.seed(42) + + +@pytest.mark.resilience +def test_rt_aug_01_corrupted_image_skipped( + tmp_path, monkeypatch, fixture_images_dir, fixture_labels_dir +): + _patch_augmentation_paths(monkeypatch, tmp_path) + _augment_annotation_with_total(monkeypatch) + _seed() + import constants as c + from augmentation import Augmentator + + img_dir = Path(c.data_images_dir) + lbl_dir = Path(c.data_labels_dir) + img_dir.mkdir(parents=True, exist_ok=True) + lbl_dir.mkdir(parents=True, exist_ok=True) + stem = sorted(fixture_images_dir.glob("*.jpg"))[0].stem + shutil.copy2(fixture_images_dir / f"{stem}.jpg", img_dir / f"{stem}.jpg") + shutil.copy2(fixture_labels_dir / f"{stem}.txt", lbl_dir / f"{stem}.txt") + raw = (fixture_images_dir / f"{stem}.jpg").read_bytes()[:200] + (img_dir / "corrupted_trunc.jpg").write_bytes(raw) + Augmentator().augment_annotations() + proc_img = Path(c.processed_images_dir) + assert len(list(proc_img.glob("*.jpg"))) == 8 + + +@pytest.mark.resilience +def test_rt_aug_02_missing_label_no_crash(tmp_path, monkeypatch, fixture_images_dir): + _patch_augmentation_paths(monkeypatch, tmp_path) + _augment_annotation_with_total(monkeypatch) + import constants as c + from augmentation import Augmentator + + img_dir = Path(c.data_images_dir) + lbl_dir = Path(c.data_labels_dir) + img_dir.mkdir(parents=True, exist_ok=True) + lbl_dir.mkdir(parents=True, exist_ok=True) + stem = "no_label_here" + shutil.copy2(sorted(fixture_images_dir.glob("*.jpg"))[0], img_dir / f"{stem}.jpg") + aug = Augmentator() + aug.total_images_to_process = 1 + aug.augment_annotation(SimpleNamespace(name=f"{stem}.jpg")) + assert len(list(Path(c.processed_images_dir).glob("*.jpg"))) == 0 + + +@pytest.mark.resilience +def test_rt_aug_03_narrow_bbox_fewer_or_eight_variants( + tmp_path, monkeypatch, fixture_images_dir +): + _patch_augmentation_paths(monkeypatch, tmp_path) + _seed() + from augmentation import Augmentator + from dto.imageLabel import ImageLabel + + stem = "narrow_bbox" + proc_img = Path(tmp_path) / "azaion" / "data-processed" / "images" / f"{stem}.jpg" + proc_lbl = Path(tmp_path) / "azaion" / "data-processed" / "labels" / f"{stem}.txt" + proc_img.parent.mkdir(parents=True, exist_ok=True) + proc_lbl.parent.mkdir(parents=True, exist_ok=True) + src_img = sorted(fixture_images_dir.glob("*.jpg"))[0] + img = cv2.imdecode(np.fromfile(str(src_img), dtype=np.uint8), cv2.IMREAD_COLOR) + aug = Augmentator() + labels = [[0.5, 0.5, 0.0005, 0.0005, 0]] + img_ann = ImageLabel( + image_path=str(proc_img), + image=img, + labels_path=str(proc_lbl), + labels=labels, + ) + out = aug.augment_inner(img_ann) + assert 1 <= len(out) <= 8 + + +@pytest.mark.resource_limit +def test_rl_aug_01_augment_inner_exactly_eight_outputs( + tmp_path, monkeypatch, fixture_images_dir, fixture_labels_dir +): + _patch_augmentation_paths(monkeypatch, tmp_path) + _seed() + from augmentation import Augmentator + from dto.imageLabel import ImageLabel + + stem = sorted(fixture_images_dir.glob("*.jpg"))[0].stem + img_path = fixture_images_dir / f"{stem}.jpg" + lbl_path = fixture_labels_dir / f"{stem}.txt" + img = cv2.imdecode(np.fromfile(str(img_path), dtype=np.uint8), cv2.IMREAD_COLOR) + aug = Augmentator() + labels = aug.read_labels(lbl_path) + proc_img = Path(tmp_path) / "azaion" / "data-processed" / "images" / f"{stem}.jpg" + proc_lbl = Path(tmp_path) / "azaion" / "data-processed" / "labels" / f"{stem}.txt" + proc_img.parent.mkdir(parents=True, exist_ok=True) + proc_lbl.parent.mkdir(parents=True, exist_ok=True) + img_ann = ImageLabel( + image_path=str(proc_img), + image=img, + labels_path=str(proc_lbl), + labels=labels, + ) + out = aug.augment_inner(img_ann) + assert len(out) == 8 diff --git a/tests/test_encryption.py b/tests/test_encryption.py new file mode 100644 index 0000000..1e5a127 --- /dev/null +++ b/tests/test_encryption.py @@ -0,0 +1,94 @@ +import os + +import pytest + +from security import Security + + +def test_bt_enc_01_roundtrip_1024_random_bytes(): + key = "test-key" + data = os.urandom(1024) + enc = Security.encrypt_to(data, key) + assert Security.decrypt_to(enc, key) == data + + +def test_bt_enc_02_roundtrip_onnx_model(fixture_onnx_model): + key = Security.get_model_encryption_key() + data = fixture_onnx_model + enc = Security.encrypt_to(data, key) + assert Security.decrypt_to(enc, key) == data + + +def test_bt_enc_03_roundtrip_empty_input(): + key = "k" + data = b"" + enc = Security.encrypt_to(data, key) + assert Security.decrypt_to(enc, key) == b"" + + +def test_bt_enc_04_roundtrip_single_zero_byte(): + key = "k" + data = b"\x00" + enc = Security.encrypt_to(data, key) + assert Security.decrypt_to(enc, key) == b"\x00" + + +def test_bt_enc_05_same_data_different_keys_different_ciphertext(): + data = b"payload" + a = Security.encrypt_to(data, "key-a") + b = Security.encrypt_to(data, "key-b") + assert a != b + + +def test_bt_enc_06_decrypt_wrong_key_not_equal_original(): + original = b"secret" + enc = Security.encrypt_to(original, "key-a") + out = Security.decrypt_to(enc, "key-b") + assert out != original + + +@pytest.mark.resilience +def test_rt_enc_01_corrupted_ciphertext(): + key = "k" + original = b"hello world" + enc = Security.encrypt_to(original, key) + corrupted = bytearray(enc) + corrupted[len(corrupted) // 2] ^= 0xFF + try: + result = Security.decrypt_to(bytes(corrupted), key) + except Exception: + return + assert result != original + + +@pytest.mark.security +def test_st_enc_01_same_data_same_key_two_encryptions_differ(): + key = "k" + data = b"x" * 64 + a = Security.encrypt_to(data, key) + b = Security.encrypt_to(data, key) + assert a != b + + +@pytest.mark.security +def test_st_enc_02_wrong_key_cannot_recover_plaintext(): + original = b"data" + enc = Security.encrypt_to(original, "key-one") + out = Security.decrypt_to(enc, "key-two") + assert out != original + + +@pytest.mark.security +def test_st_enc_03_model_encryption_key_deterministic(): + a = Security.get_model_encryption_key() + b = Security.get_model_encryption_key() + assert a == b + + +@pytest.mark.resource_limit +def test_rl_enc_01_encrypted_size_at_most_plaintext_plus_32(): + key = "k" + for n in (0, 1, 15, 16, 17, 1024, 4096): + data = os.urandom(n) + enc = Security.encrypt_to(data, key) + assert len(enc) <= n + 32 diff --git a/tests/test_hardware_hash.py b/tests/test_hardware_hash.py new file mode 100644 index 0000000..bb997ce --- /dev/null +++ b/tests/test_hardware_hash.py @@ -0,0 +1,53 @@ +import re +from types import SimpleNamespace + +import pytest + +from security import Security + + +def test_bt_hsh_01_deterministic_hw_hash(): + h1 = Security.get_hw_hash("test-hardware-info") + h2 = Security.get_hw_hash("test-hardware-info") + assert h1 == h2 + + +def test_bt_hsh_02_different_hardware_different_hash(): + assert Security.get_hw_hash("hw-a") != Security.get_hw_hash("hw-b") + + +def test_bt_hsh_03_output_valid_base64(): + h = Security.get_hw_hash("test-hardware-info") + assert re.match(r"^[A-Za-z0-9+/]+=*$", h) + + +@pytest.mark.security +def test_st_hsh_01_hardware_hash_deterministic(): + h1 = Security.get_hw_hash("test-hardware-info") + h2 = Security.get_hw_hash("test-hardware-info") + assert h1 == h2 + + +@pytest.mark.security +def test_st_hsh_02_different_hardware_different_hash(): + assert Security.get_hw_hash("hw-a") != Security.get_hw_hash("hw-b") + + +@pytest.mark.security +def test_st_hsh_03_api_key_depends_on_hardware(): + creds = SimpleNamespace(email="a@b.com", password="pass1") + hw1 = Security.get_hw_hash("hw-1") + hw2 = Security.get_hw_hash("hw-2") + k1 = Security.get_api_encryption_key(creds, hw1) + k2 = Security.get_api_encryption_key(creds, hw2) + assert k1 != k2 + + +@pytest.mark.security +def test_st_hsh_04_api_key_depends_on_credentials(): + hw = Security.get_hw_hash("fixed-hw") + c1 = SimpleNamespace(email="a@b.com", password="pass1") + c2 = SimpleNamespace(email="x@y.com", password="pass2") + k1 = Security.get_api_encryption_key(c1, hw) + k2 = Security.get_api_encryption_key(c2, hw) + assert k1 != k2