import json import os import random import threading import time import uuid from contextlib import contextmanager from pathlib import Path import jwt as pyjwt import pytest import requests import sseclient from pytest import ExitCode def pytest_collection_modifyitems(items): early = [] rest = [] for item in items: if "Step01PreInit" in item.nodeid or "Step02LazyInit" in item.nodeid: early.append(item) else: rest.append(item) items[:] = early + rest @pytest.hookimpl(trylast=True) def pytest_sessionfinish(session, exitstatus): if exitstatus in (ExitCode.NO_TESTS_COLLECTED, 5): session.exitstatus = ExitCode.OK class _SessionWithBase(requests.Session): def __init__(self, base: str, default_timeout: float = 30): super().__init__() self._base = base.rstrip("/") self._default_timeout = default_timeout def request(self, method, url, *args, **kwargs): if url.startswith("http://") or url.startswith("https://"): full = url else: path = url if url.startswith("/") else f"/{url}" full = f"{self._base}{path}" kwargs.setdefault("timeout", self._default_timeout) return super().request(method, full, *args, **kwargs) @pytest.fixture(scope="session") def base_url(): return os.environ.get("BASE_URL", "http://detections:8080") @pytest.fixture(scope="session") def http_client(base_url): return _SessionWithBase(base_url, 30) @pytest.fixture(scope="session") def jwt_secret(): return os.environ.get("JWT_SECRET", "") @pytest.fixture(scope="session") def jwt_token(jwt_secret): if not jwt_secret: return "" return pyjwt.encode( {"sub": "test-user", "exp": int(time.time()) + 3600}, jwt_secret, algorithm="HS256", ) @pytest.fixture(scope="session") def auth_headers(jwt_token): return {"Authorization": f"Bearer {jwt_token}"} if jwt_token else {} @pytest.fixture def channel_id(): return str(uuid.uuid4()) @pytest.fixture(scope="session") def image_detect(http_client, auth_headers): def _detect(image_bytes, filename="img.jpg", config=None, timeout=30): cid = str(uuid.uuid4()) headers = {**auth_headers, "X-Channel-Id": cid} data_form = {} if config: data_form["config"] = config t0 = time.perf_counter() r = http_client.post( "/detect/image", files={"file": (filename, image_bytes, "image/jpeg")}, data=data_form, headers=headers, timeout=timeout, ) assert r.status_code == 202, f"Expected 202, got {r.status_code}: {r.text}" events = _read_sse_events_until_terminal( http_client, cid, auth_headers, timeout=timeout ) elapsed_ms = (time.perf_counter() - t0) * 1000.0 assert events, f"No SSE events received on channel {cid}" assert events[-1].get("mediaStatus") in ("AIProcessed", "Error") detections = [] for event in events: if event.get("mediaStatus") == "AIProcessing": detections.extend(event.get("annotations", [])) return detections, elapsed_ms return _detect def _read_sse_events_until_terminal(http_client, channel_id, headers, timeout=30): deadline = time.time() + timeout after_ts = 0 collected = [] last_error = None while time.time() < deadline: remaining = max(0.1, deadline - time.time()) read_timeout = min(5, remaining) try: with http_client.get( f"/detect/events/{channel_id}?after_ts={after_ts}", stream=True, timeout=(5, read_timeout), headers=headers, ) as resp: resp.raise_for_status() for ev in sseclient.SSEClient(resp).events(): if ev.id: try: after_ts = max(after_ts, int(ev.id)) except ValueError: pass if not ev.data or not str(ev.data).strip(): continue data = json.loads(ev.data) collected.append(data) if data.get("mediaStatus") in ("AIProcessed", "Error"): return collected if time.time() >= deadline: break except (requests.exceptions.RequestException, OSError) as exc: last_error = exc time.sleep(0.2) if collected: return collected if last_error is not None: raise AssertionError( f"SSE timed out on channel {channel_id}: {last_error}" ) from last_error return collected @pytest.fixture(scope="session") def sse_events_until_terminal(http_client, auth_headers): def _read(channel_id, headers=None, timeout=30): return _read_sse_events_until_terminal( http_client, channel_id, headers or auth_headers, timeout=timeout ) return _read @pytest.fixture def sse_client_factory(http_client, auth_headers): @contextmanager def _open(channel_id: str): with http_client.get( f"/detect/events/{channel_id}", stream=True, timeout=600, headers=auth_headers, ) as resp: resp.raise_for_status() yield sseclient.SSEClient(resp) return _open @pytest.fixture(scope="session") def mock_loader_url(): return os.environ.get("MOCK_LOADER_URL", "http://mock-loader:8080") @pytest.fixture(scope="session") def mock_annotations_url(): return os.environ.get("MOCK_ANNOTATIONS_URL", "http://mock-annotations:8081") @pytest.fixture(scope="session", autouse=True) def wait_for_services(base_url, mock_loader_url, mock_annotations_url): urls = [ f"{base_url}/health", f"{mock_loader_url}/mock/status", f"{mock_annotations_url}/mock/status", ] deadline = time.time() + 120 while time.time() < deadline: ok = True for u in urls: try: r = requests.get(u, timeout=5) if r.status_code != 200: ok = False break except OSError: ok = False break if ok: return time.sleep(2) pytest.fail("services not ready within 120s") @pytest.fixture(autouse=True) def reset_mocks(mock_loader_url, mock_annotations_url): requests.post(f"{mock_loader_url}/mock/reset", timeout=10) requests.post(f"{mock_annotations_url}/mock/reset", timeout=10) yield def _media_dir() -> Path: return Path(os.environ.get("MEDIA_DIR", "/media")) def _read_media(name: str) -> bytes: p = _media_dir() / name if not p.is_file(): pytest.skip(f"missing {p}") return p.read_bytes() def _health(http_client): r = http_client.get("/health") r.raise_for_status() return r.json() def _health_ai_active(data: dict) -> bool: return data.get("aiAvailability") == "Enabled" def _wait_for_ai_active(http_client, timeout: float = 30) -> dict | None: deadline = time.time() + timeout last = None while time.time() < deadline: last = _health(http_client) if _health_ai_active(last): return last time.sleep(1) return last @pytest.fixture(scope="session") def media_dir(): return str(_media_dir()) @pytest.fixture(scope="session") def image_small(): return _read_media("image_small.jpg") @pytest.fixture(scope="session") def image_large(): return _read_media("image_large.JPG") @pytest.fixture(scope="session") def image_different_types(): return _read_media("image_different_types.jpg") @pytest.fixture(scope="session") def image_empty_scene(): return _read_media("image_empty_scene.jpg") @pytest.fixture(scope="session") def image_dense(): for name in ("image_dense.jpg", "image_dense01.jpg", "image_dense02.jpg"): p = _media_dir() / name if p.is_file(): return p.read_bytes() pytest.skip("missing dense image fixture") @pytest.fixture(scope="session") def video_short_path(): return str(_media_dir() / "video_test01.mp4") @pytest.fixture(scope="session") def video_short_02_path(): return str(_media_dir() / "video_short02.mp4") @pytest.fixture(scope="session") def video_long_path(): return str(_media_dir() / "video_long03.mp4") @pytest.fixture(scope="session") def empty_image(): return b"" @pytest.fixture(scope="session") def corrupt_image(): random.seed(42) return random.randbytes(1024) @pytest.fixture(scope="module") def warm_engine(http_client, image_small, auth_headers): timeout = int(os.environ.get("E2E_ENGINE_WAIT_TIMEOUT", "900")) deadline = time.time() + timeout last_status = None consecutive_errors = 0 while time.time() < deadline: try: current_health = _health(http_client) if _health_ai_active(current_health): return except OSError: pass cid = str(uuid.uuid4()) headers = {**auth_headers, "X-Channel-Id": cid} done = threading.Event() terminal_status = None def _listen(cid=cid): nonlocal terminal_status try: with http_client.get( f"/detect/events/{cid}", stream=True, timeout=35, headers=auth_headers, ) as resp: resp.raise_for_status() for ev in sseclient.SSEClient(resp).events(): if not ev.data or not str(ev.data).strip(): continue data = json.loads(ev.data) terminal_status = data.get("mediaStatus") if terminal_status in ("AIProcessed", "Error"): break except Exception: pass finally: done.set() th = threading.Thread(target=_listen, daemon=True) th.start() time.sleep(0.1) try: r = http_client.post( "/detect/image", files={"file": ("warm.jpg", image_small, "image/jpeg")}, headers=headers, ) last_status = r.status_code if r.status_code == 202: completed = done.wait(timeout=30) th.join(timeout=1) if completed and terminal_status == "AIProcessed": if _health_ai_active(_wait_for_ai_active(http_client, timeout=30) or {}): return if completed and terminal_status == "Error": consecutive_errors += 1 else: active_health = _wait_for_ai_active(http_client, timeout=10) if _health_ai_active(active_health or {}): return time.sleep(2) continue if r.status_code >= 500: consecutive_errors += 1 if consecutive_errors >= 5: pytest.fail( f"engine warm-up aborted: {consecutive_errors} consecutive " f"HTTP {last_status} errors — server is broken, not starting up" ) else: consecutive_errors = 0 except OSError: consecutive_errors = 0 th.join(timeout=1) time.sleep(2) pytest.fail(f"engine warm-up timed out after {timeout}s (last status: {last_status})")