Files
detections/e2e/conftest.py
T
Oleksandr Bezdieniezhnykh 097811a67b [AZ-178] Fix Critical/High security findings: auth, CVEs, non-root containers, per-job SSE
- Pin all deps; h11==0.16.0 (CVE-2025-43859), python-multipart>=1.3.1 (CVE-2026-28356), PyJWT==2.12.1
- Add HMAC JWT verification (require_auth FastAPI dependency, JWT_SECRET-gated)
- Fix TokenManager._refresh() to use ADMIN_API_URL instead of ANNOTATIONS_URL
- Rename POST /detect → POST /detect/image (image-only, rejects video files)
- Replace global SSE stream with per-job SSE: GET /detect/{media_id} with event replay buffer
- Apply require_auth to all 4 protected endpoints
- Fix on_annotation/on_status closure to use mutable current_id for correct post-upload event routing
- Add non-root appuser to Dockerfile and Dockerfile.gpu
- Add JWT_SECRET to e2e/docker-compose.test.yml and run-tests.sh
- Update all e2e tests and unit tests for new endpoints and HMAC token signing
- 64/64 tests pass

Made-with: Cursor
2026-04-02 06:32:12 +03:00

230 lines
5.9 KiB
Python

import os
import random
import time
from contextlib import contextmanager
from pathlib import Path
import jwt as pyjwt
import pytest
import requests
import sseclient
from pytest import ExitCode
def pytest_collection_modifyitems(items):
    """Reorder the collected items in place so the init-step tests come first.

    Items whose node id contains ``Step01PreInit`` or ``Step02LazyInit`` are
    moved to the front of the list. The relative order inside each of the two
    groups is left as collected.
    """
    priority_markers = ("Step01PreInit", "Step02LazyInit")

    def _runs_first(item):
        return any(marker in item.nodeid for marker in priority_markers)

    front = [item for item in items if _runs_first(item)]
    back = [item for item in items if not _runs_first(item)]
    # Slice assignment keeps the same list object the caller handed in.
    items[:] = front + back
@pytest.hookimpl(trylast=True)
def pytest_sessionfinish(session, exitstatus):
    """Turn a "no tests collected" exit status into a successful one.

    When ``exitstatus`` is ``ExitCode.NO_TESTS_COLLECTED`` (or the plain
    integer 5), ``session.exitstatus`` is overwritten with ``ExitCode.OK``.
    Any other status is left untouched.
    """
    no_tests_statuses = (ExitCode.NO_TESTS_COLLECTED, 5)
    if exitstatus not in no_tests_statuses:
        return
    session.exitstatus = ExitCode.OK
class _SessionWithBase(requests.Session):
    """``requests.Session`` that resolves relative URLs against a base URL.

    Every request also receives a default ``timeout`` unless the caller
    passes one explicitly.
    """

    def __init__(self, base: str, default_timeout: float = 30):
        """Remember the base URL (trailing slashes stripped) and the timeout."""
        super().__init__()
        self._base = base.rstrip("/")
        self._default_timeout = default_timeout

    def request(self, method, url, *args, **kwargs):
        """Send a request, prefixing ``url`` with the base unless it is absolute.

        URLs starting with ``http://`` or ``https://`` are used as given.
        Anything else is treated as a path and joined to the base with exactly
        one ``/`` between them.
        """
        is_absolute = url.startswith(("http://", "https://"))
        if not is_absolute:
            separator = "" if url.startswith("/") else "/"
            url = f"{self._base}{separator}{url}"
        # Only fill in the timeout when the caller did not supply one.
        if "timeout" not in kwargs:
            kwargs["timeout"] = self._default_timeout
        return super().request(method, url, *args, **kwargs)
@pytest.fixture(scope="session")
def base_url():
    """Base URL of the service under test, read from ``BASE_URL``."""
    fallback = "http://detections:8080"
    return os.getenv("BASE_URL", fallback)
@pytest.fixture(scope="session")
def http_client(base_url):
    """Session-scoped HTTP client bound to ``base_url`` with a default timeout of 30."""
    client = _SessionWithBase(base=base_url, default_timeout=30)
    return client
@pytest.fixture(scope="session")
def jwt_secret():
    """Signing secret from ``JWT_SECRET``; an empty string when it is not set."""
    secret = os.getenv("JWT_SECRET", "")
    return secret
@pytest.fixture(scope="session")
def jwt_token(jwt_secret):
    """HS256-signed token for subject ``test-user``.

    The ``exp`` claim is set 3600 seconds after the moment the fixture runs.
    Returns an empty string when ``jwt_secret`` is empty.
    """
    if not jwt_secret:
        return ""
    expires_at = int(time.time()) + 3600
    claims = {"sub": "test-user", "exp": expires_at}
    return pyjwt.encode(claims, jwt_secret, algorithm="HS256")
@pytest.fixture(scope="session")
def auth_headers(jwt_token):
    """``Authorization: Bearer ...`` header dict, or ``{}`` when there is no token."""
    if not jwt_token:
        return {}
    return {"Authorization": f"Bearer {jwt_token}"}
@pytest.fixture
def sse_client_factory(http_client, auth_headers):
    """Return a factory of context managers that open ``GET /detect/{media_id}``.

    Each context manager issues a streaming request with a timeout of 600,
    raises on a non-success HTTP status, and yields an ``sseclient.SSEClient``
    wrapping the response. The response is closed when the context exits.
    """

    @contextmanager
    def _open(media_id: str):
        stream_path = f"/detect/{media_id}"
        response = http_client.get(
            stream_path,
            stream=True,
            timeout=600,
            headers=auth_headers,
        )
        with response:
            response.raise_for_status()
            yield sseclient.SSEClient(response)

    return _open
@pytest.fixture(scope="session")
def mock_loader_url():
    """Base URL of the mock loader service, read from ``MOCK_LOADER_URL``."""
    fallback = "http://mock-loader:8080"
    return os.getenv("MOCK_LOADER_URL", fallback)
@pytest.fixture(scope="session")
def mock_annotations_url():
    """Base URL of the mock annotations service, read from ``MOCK_ANNOTATIONS_URL``."""
    fallback = "http://mock-annotations:8081"
    return os.getenv("MOCK_ANNOTATIONS_URL", fallback)
@pytest.fixture(scope="session", autouse=True)
def wait_for_services(base_url, mock_loader_url, mock_annotations_url):
    """Block the test session until every required service answers HTTP 200.

    Polls ``{base_url}/health`` and the ``/mock/status`` endpoint of both mock
    services every 2 seconds. Returns as soon as one polling round sees all
    three respond with status 200; fails the session via ``pytest.fail`` if
    that has not happened within 120 seconds.
    """
    urls = [
        f"{base_url}/health",
        f"{mock_loader_url}/mock/status",
        f"{mock_annotations_url}/mock/status",
    ]

    def _is_up(url):
        # Keep the try body to the one call that can raise a transport error.
        try:
            response = requests.get(url, timeout=5)
        except OSError:
            return False
        return response.status_code == 200

    # A monotonic clock keeps the 120 s budget immune to wall-clock adjustments.
    deadline = time.monotonic() + 120
    while time.monotonic() < deadline:
        # all() stops at the first service that is not ready.
        if all(_is_up(url) for url in urls):
            return
        time.sleep(2)
    pytest.fail("services not ready within 120s")
@pytest.fixture(autouse=True)
def reset_mocks(mock_loader_url, mock_annotations_url):
    """POST ``/mock/reset`` to both mock services before each test.

    The responses are not inspected.
    """
    for mock_base in (mock_loader_url, mock_annotations_url):
        requests.post(f"{mock_base}/mock/reset", timeout=10)
    yield
def _media_dir() -> Path:
    """Return the media directory: ``MEDIA_DIR`` from the environment, else ``/media``."""
    location = os.getenv("MEDIA_DIR", "/media")
    return Path(location)
def _read_media(name: str) -> bytes:
    """Return the bytes of ``name`` inside the media directory.

    Calls ``pytest.skip`` when the path is not an existing regular file.
    """
    media_path = _media_dir() / name
    if media_path.is_file():
        return media_path.read_bytes()
    pytest.skip(f"missing {media_path}")
@pytest.fixture(scope="session")
def media_dir():
    """Media directory path as a plain string."""
    directory = _media_dir()
    return str(directory)
@pytest.fixture(scope="session")
def image_small():
    """Bytes of ``image_small.jpg`` from the media directory (skips if missing)."""
    payload = _read_media("image_small.jpg")
    return payload
@pytest.fixture(scope="session")
def image_large():
    """Bytes of ``image_large.JPG`` from the media directory (skips if missing)."""
    payload = _read_media("image_large.JPG")
    return payload
@pytest.fixture(scope="session")
def image_dense():
    """Bytes of ``image_dense01.jpg`` from the media directory (skips if missing)."""
    payload = _read_media("image_dense01.jpg")
    return payload
@pytest.fixture(scope="session")
def image_dense_02():
    """Bytes of ``image_dense02.jpg`` from the media directory (skips if missing)."""
    payload = _read_media("image_dense02.jpg")
    return payload
@pytest.fixture(scope="session")
def image_different_types():
    """Bytes of ``image_different_types.jpg`` from the media directory (skips if missing)."""
    payload = _read_media("image_different_types.jpg")
    return payload
@pytest.fixture(scope="session")
def image_empty_scene():
    """Bytes of ``image_empty_scene.jpg`` from the media directory (skips if missing)."""
    payload = _read_media("image_empty_scene.jpg")
    return payload
@pytest.fixture(scope="session")
def video_short_path():
    """String path of ``video_test01.mp4`` in the media directory (existence not checked)."""
    video_file = _media_dir().joinpath("video_test01.mp4")
    return str(video_file)
@pytest.fixture(scope="session")
def video_short_02_path():
    """String path of ``video_short02.mp4`` in the media directory (existence not checked)."""
    video_file = _media_dir().joinpath("video_short02.mp4")
    return str(video_file)
@pytest.fixture(scope="session")
def video_long_path():
    """String path of ``video_long03.mp4`` in the media directory (existence not checked)."""
    video_file = _media_dir().joinpath("video_long03.mp4")
    return str(video_file)
@pytest.fixture(scope="session")
def empty_image():
    """Zero-length byte string standing in for an empty upload."""
    return bytes()
@pytest.fixture(scope="session")
def corrupt_image():
    """Return 1024 reproducible pseudo-random bytes.

    The bytes come from a private generator seeded with 42, so the value is
    the same on every run. Using a dedicated ``random.Random`` instance avoids
    reseeding the process-wide ``random`` module state, which would otherwise
    affect any other code drawing from the global generator.
    """
    generator = random.Random(42)
    return generator.randbytes(1024)
@pytest.fixture(scope="module")
def warm_engine(http_client, image_small, auth_headers):
    """Poll ``POST /detect/image`` until it answers 200 or the warm-up fails.

    Uploads ``image_small`` as ``warm.jpg`` every 2 seconds for up to 120
    seconds and returns on the first HTTP 200. Fails the test via
    ``pytest.fail`` when:

    * five responses in a row have a status of 500 or above, or
    * the 120-second budget runs out (the last seen status is reported).

    A response below 500 or a transport error (``OSError``) resets the
    consecutive-error counter.
    """
    # A monotonic clock keeps the 120 s budget immune to wall-clock adjustments.
    deadline = time.monotonic() + 120
    files = {"file": ("warm.jpg", image_small, "image/jpeg")}
    consecutive_errors = 0
    last_status = None
    while time.monotonic() < deadline:
        # Only the HTTP call sits in the try body; status handling is in else.
        try:
            r = http_client.post("/detect/image", files=files, headers=auth_headers)
        except OSError:
            consecutive_errors = 0
        else:
            if r.status_code == 200:
                return
            last_status = r.status_code
            if r.status_code >= 500:
                consecutive_errors += 1
                if consecutive_errors >= 5:
                    pytest.fail(
                        f"engine warm-up aborted: {consecutive_errors} consecutive "
                        f"HTTP {last_status} errors — server is broken, not starting up"
                    )
            else:
                consecutive_errors = 0
        time.sleep(2)
    pytest.fail(f"engine warm-up timed out after 120s (last status: {last_status})")