# Mirror of https://github.com/azaion/detections.git
# Synced 2026-04-22 21:56:33 +00:00, commit a469579882
# Made-with: Cursor
import json
import os
import threading
import time
import uuid

import pytest
import requests

def test_nft_sec_01_malformed_multipart(base_url, http_client):
|
|
url = f"{base_url.rstrip('/')}/detect"
|
|
r1 = requests.post(
|
|
url,
|
|
data=b"not-multipart-body",
|
|
headers={"Content-Type": "multipart/form-data"},
|
|
timeout=30,
|
|
)
|
|
assert r1.status_code in (400, 422)
|
|
r2 = requests.post(
|
|
url,
|
|
data=b"does-not-match-boundary",
|
|
headers={"Content-Type": "multipart/form-data; boundary=----abc"},
|
|
timeout=30,
|
|
)
|
|
assert r2.status_code in (400, 422)
|
|
r3 = requests.post(
|
|
url,
|
|
files={"file": ("", b"", "")},
|
|
timeout=30,
|
|
)
|
|
assert r3.status_code in (400, 422)
|
|
assert http_client.get("/health").status_code == 200
|
|
|
|
|
|
@pytest.mark.slow
|
|
@pytest.mark.timeout(300)
|
|
def test_nft_sec_02_oversized_request(http_client):
|
|
large = os.urandom(50 * 1024 * 1024)
|
|
try:
|
|
r = http_client.post(
|
|
"/detect",
|
|
files={"file": ("large.jpg", large, "image/jpeg")},
|
|
timeout=180,
|
|
)
|
|
except requests.RequestException:
|
|
pass
|
|
else:
|
|
assert r.status_code != 500
|
|
assert r.status_code in (413, 400, 422)
|
|
assert http_client.get("/health").status_code == 200
|
|
|
|
|
|
@pytest.mark.slow
|
|
@pytest.mark.timeout(120)
|
|
def test_nft_sec_03_jwt_token_forwarding(
|
|
warm_engine,
|
|
http_client,
|
|
jwt_token,
|
|
mock_loader_url,
|
|
mock_annotations_url,
|
|
sse_client_factory,
|
|
):
|
|
media_id = f"sec-{uuid.uuid4().hex}"
|
|
body = {
|
|
"probability_threshold": 0.25,
|
|
"paths": [
|
|
f"{mock_loader_url.rstrip('/')}/load/video_short01.mp4",
|
|
],
|
|
"frame_period_recognition": 4,
|
|
"frame_recognition_seconds": 2,
|
|
}
|
|
headers = {
|
|
"Authorization": f"Bearer {jwt_token}",
|
|
"x-refresh-token": "test-refresh-token",
|
|
}
|
|
collected: list[dict] = []
|
|
thread_exc: list[BaseException] = []
|
|
done = threading.Event()
|
|
|
|
def _listen():
|
|
try:
|
|
with sse_client_factory() as sse:
|
|
time.sleep(0.3)
|
|
for event in sse.events():
|
|
if not event.data or not str(event.data).strip():
|
|
continue
|
|
data = json.loads(event.data)
|
|
if data.get("mediaId") != media_id:
|
|
continue
|
|
collected.append(data)
|
|
if (
|
|
data.get("mediaStatus") == "AIProcessed"
|
|
and data.get("mediaPercent") == 100
|
|
):
|
|
break
|
|
except BaseException as e:
|
|
thread_exc.append(e)
|
|
finally:
|
|
done.set()
|
|
|
|
th = threading.Thread(target=_listen, daemon=True)
|
|
th.start()
|
|
time.sleep(0.5)
|
|
r = http_client.post(f"/detect/{media_id}", json=body, headers=headers)
|
|
assert r.status_code == 200
|
|
ok = done.wait(timeout=120)
|
|
assert ok, "SSE listener did not finish within 120s"
|
|
th.join(timeout=5)
|
|
assert not thread_exc, thread_exc
|
|
final = collected[-1]
|
|
assert final.get("mediaStatus") == "AIProcessed"
|
|
assert final.get("mediaPercent") == 100
|
|
ar = requests.get(f"{mock_annotations_url}/mock/annotations", timeout=30)
|
|
ar.raise_for_status()
|
|
anns = ar.json().get("annotations") or []
|
|
assert any(
|
|
isinstance(a, dict) and a.get("mediaId") == media_id for a in anns
|
|
), anns
|