mirror of
https://github.com/azaion/detections.git
synced 2026-06-21 09:41:09 +00:00
Fixed stabilize e2e SSE event handling
This commit is contained in:
+64
-37
@@ -88,38 +88,6 @@ def image_detect(http_client, auth_headers):
|
|||||||
def _detect(image_bytes, filename="img.jpg", config=None, timeout=30):
|
def _detect(image_bytes, filename="img.jpg", config=None, timeout=30):
|
||||||
cid = str(uuid.uuid4())
|
cid = str(uuid.uuid4())
|
||||||
headers = {**auth_headers, "X-Channel-Id": cid}
|
headers = {**auth_headers, "X-Channel-Id": cid}
|
||||||
detections = []
|
|
||||||
errors = []
|
|
||||||
done = threading.Event()
|
|
||||||
connected = threading.Event()
|
|
||||||
|
|
||||||
def _listen():
|
|
||||||
try:
|
|
||||||
with http_client.get(
|
|
||||||
f"/detect/events/{cid}",
|
|
||||||
stream=True,
|
|
||||||
timeout=timeout + 2,
|
|
||||||
headers=auth_headers,
|
|
||||||
) as resp:
|
|
||||||
resp.raise_for_status()
|
|
||||||
connected.set()
|
|
||||||
for ev in sseclient.SSEClient(resp).events():
|
|
||||||
if not ev.data or not str(ev.data).strip():
|
|
||||||
continue
|
|
||||||
data = json.loads(ev.data)
|
|
||||||
if data.get("mediaStatus") == "AIProcessing":
|
|
||||||
detections.extend(data.get("annotations", []))
|
|
||||||
if data.get("mediaStatus") in ("AIProcessed", "Error"):
|
|
||||||
break
|
|
||||||
except BaseException as e:
|
|
||||||
errors.append(e)
|
|
||||||
finally:
|
|
||||||
connected.set()
|
|
||||||
done.set()
|
|
||||||
|
|
||||||
th = threading.Thread(target=_listen, daemon=True)
|
|
||||||
th.start()
|
|
||||||
connected.wait(timeout=5)
|
|
||||||
|
|
||||||
data_form = {}
|
data_form = {}
|
||||||
if config:
|
if config:
|
||||||
@@ -133,19 +101,78 @@ def image_detect(http_client, auth_headers):
|
|||||||
headers=headers,
|
headers=headers,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
completed = done.wait(timeout=timeout)
|
assert r.status_code == 202, f"Expected 202, got {r.status_code}: {r.text}"
|
||||||
|
events = _read_sse_events_until_terminal(
|
||||||
|
http_client, cid, auth_headers, timeout=timeout
|
||||||
|
)
|
||||||
elapsed_ms = (time.perf_counter() - t0) * 1000.0
|
elapsed_ms = (time.perf_counter() - t0) * 1000.0
|
||||||
|
|
||||||
assert r.status_code == 202, f"Expected 202, got {r.status_code}: {r.text}"
|
assert events, f"No SSE events received on channel {cid}"
|
||||||
assert completed, f"Timed out waiting for terminal SSE event on channel {cid}"
|
assert events[-1].get("mediaStatus") in ("AIProcessed", "Error")
|
||||||
assert not errors, f"SSE errors: {errors}"
|
|
||||||
|
|
||||||
th.join(timeout=1)
|
detections = []
|
||||||
|
for event in events:
|
||||||
|
if event.get("mediaStatus") == "AIProcessing":
|
||||||
|
detections.extend(event.get("annotations", []))
|
||||||
return detections, elapsed_ms
|
return detections, elapsed_ms
|
||||||
|
|
||||||
return _detect
|
return _detect
|
||||||
|
|
||||||
|
|
||||||
|
def _read_sse_events_until_terminal(http_client, channel_id, headers, timeout=30):
|
||||||
|
deadline = time.time() + timeout
|
||||||
|
after_ts = 0
|
||||||
|
collected = []
|
||||||
|
last_error = None
|
||||||
|
|
||||||
|
while time.time() < deadline:
|
||||||
|
remaining = max(0.1, deadline - time.time())
|
||||||
|
read_timeout = min(5, remaining)
|
||||||
|
try:
|
||||||
|
with http_client.get(
|
||||||
|
f"/detect/events/{channel_id}?after_ts={after_ts}",
|
||||||
|
stream=True,
|
||||||
|
timeout=(5, read_timeout),
|
||||||
|
headers=headers,
|
||||||
|
) as resp:
|
||||||
|
resp.raise_for_status()
|
||||||
|
for ev in sseclient.SSEClient(resp).events():
|
||||||
|
if ev.id:
|
||||||
|
try:
|
||||||
|
after_ts = max(after_ts, int(ev.id))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
if not ev.data or not str(ev.data).strip():
|
||||||
|
continue
|
||||||
|
data = json.loads(ev.data)
|
||||||
|
collected.append(data)
|
||||||
|
if data.get("mediaStatus") in ("AIProcessed", "Error"):
|
||||||
|
return collected
|
||||||
|
if time.time() >= deadline:
|
||||||
|
break
|
||||||
|
except (requests.exceptions.RequestException, OSError) as exc:
|
||||||
|
last_error = exc
|
||||||
|
time.sleep(0.2)
|
||||||
|
|
||||||
|
if collected:
|
||||||
|
return collected
|
||||||
|
if last_error is not None:
|
||||||
|
raise AssertionError(
|
||||||
|
f"SSE timed out on channel {channel_id}: {last_error}"
|
||||||
|
) from last_error
|
||||||
|
return collected
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def sse_events_until_terminal(http_client, auth_headers):
|
||||||
|
def _read(channel_id, headers=None, timeout=30):
|
||||||
|
return _read_sse_events_until_terminal(
|
||||||
|
http_client, channel_id, headers or auth_headers, timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
return _read
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def sse_client_factory(http_client, auth_headers):
|
def sse_client_factory(http_client, auth_headers):
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
|||||||
@@ -1,10 +1,7 @@
|
|||||||
import json
|
|
||||||
import threading
|
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import sseclient
|
|
||||||
|
|
||||||
|
|
||||||
def _ai_config_video() -> dict:
|
def _ai_config_video() -> dict:
|
||||||
@@ -31,50 +28,16 @@ def test_ft_p08_immediate_async_response(
|
|||||||
|
|
||||||
@pytest.mark.timeout(30)
|
@pytest.mark.timeout(30)
|
||||||
def test_ft_p09_sse_event_delivery(
|
def test_ft_p09_sse_event_delivery(
|
||||||
warm_engine, http_client, jwt_token
|
warm_engine, http_client, jwt_token, sse_events_until_terminal
|
||||||
):
|
):
|
||||||
media_id = f"event-{uuid.uuid4().hex}"
|
media_id = f"event-{uuid.uuid4().hex}"
|
||||||
channel_id = str(uuid.uuid4())
|
channel_id = str(uuid.uuid4())
|
||||||
body = _ai_config_video()
|
body = _ai_config_video()
|
||||||
auth_header = {"Authorization": f"Bearer {jwt_token}"}
|
auth_header = {"Authorization": f"Bearer {jwt_token}"}
|
||||||
post_headers = {**auth_header, "X-Channel-Id": channel_id}
|
post_headers = {**auth_header, "X-Channel-Id": channel_id}
|
||||||
collected: list[dict] = []
|
|
||||||
thread_exc: list[BaseException] = []
|
|
||||||
first_event = threading.Event()
|
|
||||||
connected = threading.Event()
|
|
||||||
|
|
||||||
def _listen():
|
|
||||||
try:
|
|
||||||
with http_client.get(
|
|
||||||
f"/detect/events/{channel_id}",
|
|
||||||
stream=True,
|
|
||||||
timeout=600,
|
|
||||||
headers=auth_header,
|
|
||||||
) as resp:
|
|
||||||
resp.raise_for_status()
|
|
||||||
connected.set()
|
|
||||||
for event in sseclient.SSEClient(resp).events():
|
|
||||||
if not event.data or not str(event.data).strip():
|
|
||||||
continue
|
|
||||||
data = json.loads(event.data)
|
|
||||||
collected.append(data)
|
|
||||||
first_event.set()
|
|
||||||
if data.get("mediaStatus") in ("AIProcessed", "Error"):
|
|
||||||
break
|
|
||||||
except BaseException as e:
|
|
||||||
thread_exc.append(e)
|
|
||||||
finally:
|
|
||||||
connected.set()
|
|
||||||
first_event.set()
|
|
||||||
|
|
||||||
th = threading.Thread(target=_listen, daemon=True)
|
|
||||||
th.start()
|
|
||||||
connected.wait(timeout=5)
|
|
||||||
r = http_client.post(f"/detect/{media_id}", json=body, headers=post_headers)
|
r = http_client.post(f"/detect/{media_id}", json=body, headers=post_headers)
|
||||||
assert r.status_code == 202
|
assert r.status_code == 202
|
||||||
first_event.wait(timeout=5)
|
collected = sse_events_until_terminal(channel_id, headers=auth_header, timeout=20)
|
||||||
th.join(timeout=5)
|
|
||||||
assert not thread_exc, thread_exc
|
|
||||||
assert collected, "no SSE events received"
|
assert collected, "no SSE events received"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user