mirror of
https://github.com/azaion/ai-training.git
synced 2026-04-22 09:56:36 +00:00
aeb7f8ca8c
- Modified the existing-code workflow to automatically loop back to New Task after project completion without user confirmation. - Updated the autopilot state to reflect the current step as `done` and status as `completed`. - Clarified the deployment status report by specifying non-deployed services and their purposes. These changes enhance the automation of task management and improve documentation clarity.
214 lines
6.2 KiB
Python
214 lines
6.2 KiB
Python
import csv
|
|
import shutil
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
|
_TESTS_DIR = Path(__file__).resolve().parent
|
|
_TEST_ROOT = _TESTS_DIR / "root"
|
|
_DATASET_IMAGES = _TEST_ROOT / "data" / "images"
|
|
_DATASET_LABELS = _TEST_ROOT / "data" / "labels"
|
|
_ONNX_MODEL = _PROJECT_ROOT / "_docs/00_problem/input_data/azaion.onnx"
|
|
_PT_MODEL = _PROJECT_ROOT / "_docs/00_problem/input_data/azaion-2025-03-10.pt"
|
|
_CLASSES_JSON = _PROJECT_ROOT / "src" / "classes.json"
|
|
_CONFIG_TEST = _PROJECT_ROOT / "config.test.yaml"
|
|
_MODELS_DIR = _TEST_ROOT / "models"
|
|
|
|
collect_ignore = ["security_test.py", "imagelabel_visualize_test.py"]
|
|
|
|
_E2E_MODULE = "test_training_e2e"
|
|
|
|
_test_results = []
|
|
|
|
|
|
def pytest_collection_modifyitems(items):
|
|
e2e = [i for i in items if _E2E_MODULE in i.nodeid]
|
|
rest = [i for i in items if _E2E_MODULE not in i.nodeid]
|
|
items[:] = e2e + rest
|
|
|
|
|
|
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
|
|
def pytest_runtest_makereport(item, call):
|
|
outcome = yield
|
|
report = outcome.get_result()
|
|
if report.when == "call" or (report.when == "setup" and report.skipped):
|
|
_test_results.append({
|
|
"module": item.nodeid.rsplit("::", 1)[0],
|
|
"name": item.name,
|
|
"result": report.outcome.upper(),
|
|
"duration": round(report.duration, 3),
|
|
})
|
|
|
|
|
|
def pytest_sessionfinish(session, exitstatus):
|
|
if _test_results:
|
|
results_dir = Path(__file__).resolve().parent / "test-results"
|
|
results_dir.mkdir(exist_ok=True)
|
|
|
|
with open(results_dir / "test-results.csv", "w", newline="", encoding="utf-8") as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(["module", "test", "result", "duration_s"])
|
|
for r in _test_results:
|
|
writer.writerow([r["module"], r["name"], r["result"], f"{r['duration']:.3f}"])
|
|
|
|
import constants as c
|
|
test_config = c.Config.from_yaml(str(_CONFIG_TEST), root=str(_TEST_ROOT))
|
|
for d in (_DATASET_IMAGES, _DATASET_LABELS, test_config.datasets_dir,
|
|
test_config.corrupted_dir, str(_MODELS_DIR)):
|
|
shutil.rmtree(str(d), ignore_errors=True)
|
|
|
|
|
|
def apply_constants_patch(monkeypatch, base: Path):
|
|
import constants as c
|
|
monkeypatch.setattr(c, "config", c.Config.from_yaml(str(_CONFIG_TEST), root=str(base / "azaion")))
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def fixture_images_dir():
|
|
p = _DATASET_IMAGES
|
|
if not p.is_dir():
|
|
pytest.skip(f"missing dataset images: {p}")
|
|
return p
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def fixture_labels_dir():
|
|
p = _DATASET_LABELS
|
|
if not p.is_dir():
|
|
pytest.skip(f"missing dataset labels: {p}")
|
|
return p
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def fixture_onnx_model():
|
|
p = _ONNX_MODEL
|
|
if not p.is_file():
|
|
pytest.skip(f"missing onnx model: {p}")
|
|
return p.read_bytes()
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def fixture_pt_model():
|
|
p = _PT_MODEL
|
|
if not p.is_file():
|
|
pytest.skip(f"missing pt model: {p}")
|
|
return str(p)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def fixture_classes_json():
|
|
p = _CLASSES_JSON
|
|
if not p.is_file():
|
|
pytest.skip(f"missing classes.json: {p}")
|
|
return p
|
|
|
|
|
|
@pytest.fixture
|
|
def constants_patch(monkeypatch):
|
|
def _apply(base: Path):
|
|
apply_constants_patch(monkeypatch, base)
|
|
|
|
return _apply
|
|
|
|
|
|
@pytest.fixture
|
|
def work_dir(tmp_path):
|
|
w = tmp_path / "work"
|
|
(w / "images").mkdir(parents=True)
|
|
(w / "labels").mkdir(parents=True)
|
|
return w
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_image_label(fixture_images_dir, fixture_labels_dir, tmp_path):
|
|
imgs = sorted(fixture_images_dir.glob("*.jpg"))
|
|
if not imgs:
|
|
raise RuntimeError("no images in fixture_images_dir")
|
|
stem = imgs[0].stem
|
|
src_img = fixture_images_dir / f"{stem}.jpg"
|
|
src_lbl = fixture_labels_dir / f"{stem}.txt"
|
|
dst_img = tmp_path / "images" / f"{stem}.jpg"
|
|
dst_lbl = tmp_path / "labels" / f"{stem}.txt"
|
|
dst_img.parent.mkdir(parents=True, exist_ok=True)
|
|
dst_lbl.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.copy2(src_img, dst_img)
|
|
shutil.copy2(src_lbl, dst_lbl)
|
|
return dst_img, dst_lbl
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_images_labels(fixture_images_dir, fixture_labels_dir, tmp_path):
|
|
def _factory(count: int):
|
|
if count < 1:
|
|
raise ValueError("count must be >= 1")
|
|
imgs = sorted(fixture_images_dir.glob("*.jpg"))
|
|
if count > len(imgs):
|
|
raise ValueError("count exceeds available images")
|
|
out_img = tmp_path / "images"
|
|
out_lbl = tmp_path / "labels"
|
|
out_img.mkdir(parents=True, exist_ok=True)
|
|
out_lbl.mkdir(parents=True, exist_ok=True)
|
|
for p in imgs[:count]:
|
|
stem = p.stem
|
|
shutil.copy2(fixture_images_dir / f"{stem}.jpg", out_img / f"{stem}.jpg")
|
|
shutil.copy2(fixture_labels_dir / f"{stem}.txt", out_lbl / f"{stem}.txt")
|
|
return out_img, out_lbl
|
|
|
|
return _factory
|
|
|
|
|
|
@pytest.fixture
|
|
def corrupted_label(tmp_path):
|
|
p = tmp_path / "labels" / "corrupted.txt"
|
|
p.parent.mkdir(parents=True, exist_ok=True)
|
|
p.write_text("0 1.5 0.5 0.1 0.1\n", encoding="utf-8")
|
|
return p
|
|
|
|
|
|
@pytest.fixture
|
|
def edge_bbox_label(tmp_path):
|
|
p = tmp_path / "labels" / "edge.txt"
|
|
p.parent.mkdir(parents=True, exist_ok=True)
|
|
p.write_text("0 0.01 0.5 0.02 0.3\n", encoding="utf-8")
|
|
return p
|
|
|
|
|
|
@pytest.fixture
|
|
def empty_label(tmp_path):
|
|
p = tmp_path / "labels" / "empty.txt"
|
|
p.parent.mkdir(parents=True, exist_ok=True)
|
|
p.write_text("", encoding="utf-8")
|
|
return p
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def exported_models():
|
|
from ultralytics import YOLO
|
|
import constants as c
|
|
import exports as exports_mod
|
|
|
|
_MODELS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
pt_path = str(_MODELS_DIR / "test.pt")
|
|
YOLO("yolo11n.pt").save(pt_path)
|
|
|
|
old_config = c.config
|
|
c.config = c.Config.from_yaml(str(_CONFIG_TEST), root=str(_TEST_ROOT))
|
|
imgsz = c.config.export.onnx_imgsz
|
|
|
|
exports_mod.export_onnx(pt_path)
|
|
if sys.platform == "darwin":
|
|
exports_mod.export_coreml(pt_path)
|
|
|
|
c.config = old_config
|
|
|
|
onnx_files = list(_MODELS_DIR.glob("test*.onnx"))
|
|
return {
|
|
"onnx": str(onnx_files[0]) if onnx_files else None,
|
|
"model_dir": _MODELS_DIR,
|
|
"pt_path": pt_path,
|
|
"imgsz": imgsz,
|
|
}
|