From b2af48a2e57e3b9a33fa7950253fc75b4ddf9503 Mon Sep 17 00:00:00 2001 From: Roman Meshko Date: Fri, 8 May 2026 18:05:51 +0300 Subject: [PATCH] Fixed stabilize e2e SSE event handling --- e2e/conftest.py | 101 +++++++++++++++++++++++------------- e2e/tests/test_async_sse.py | 41 +-------------- 2 files changed, 66 insertions(+), 76 deletions(-) diff --git a/e2e/conftest.py b/e2e/conftest.py index bdaf093..af3da3f 100644 --- a/e2e/conftest.py +++ b/e2e/conftest.py @@ -88,38 +88,6 @@ def image_detect(http_client, auth_headers): def _detect(image_bytes, filename="img.jpg", config=None, timeout=30): cid = str(uuid.uuid4()) headers = {**auth_headers, "X-Channel-Id": cid} - detections = [] - errors = [] - done = threading.Event() - connected = threading.Event() - - def _listen(): - try: - with http_client.get( - f"/detect/events/{cid}", - stream=True, - timeout=timeout + 2, - headers=auth_headers, - ) as resp: - resp.raise_for_status() - connected.set() - for ev in sseclient.SSEClient(resp).events(): - if not ev.data or not str(ev.data).strip(): - continue - data = json.loads(ev.data) - if data.get("mediaStatus") == "AIProcessing": - detections.extend(data.get("annotations", [])) - if data.get("mediaStatus") in ("AIProcessed", "Error"): - break - except BaseException as e: - errors.append(e) - finally: - connected.set() - done.set() - - th = threading.Thread(target=_listen, daemon=True) - th.start() - connected.wait(timeout=5) data_form = {} if config: @@ -133,19 +101,78 @@ def image_detect(http_client, auth_headers): headers=headers, timeout=timeout, ) - completed = done.wait(timeout=timeout) + assert r.status_code == 202, f"Expected 202, got {r.status_code}: {r.text}" + events = _read_sse_events_until_terminal( + http_client, cid, auth_headers, timeout=timeout + ) elapsed_ms = (time.perf_counter() - t0) * 1000.0 - assert r.status_code == 202, f"Expected 202, got {r.status_code}: {r.text}" - assert completed, f"Timed out waiting for terminal SSE event on channel {cid}" - assert not errors, f"SSE errors: {errors}" + assert events, f"No SSE events received on channel {cid}" + assert events[-1].get("mediaStatus") in ("AIProcessed", "Error") - th.join(timeout=1) + detections = [] + for event in events: + if event.get("mediaStatus") == "AIProcessing": + detections.extend(event.get("annotations", [])) return detections, elapsed_ms return _detect +def _read_sse_events_until_terminal(http_client, channel_id, headers, timeout=30): + deadline = time.time() + timeout + after_ts = 0 + collected = [] + last_error = None + + while time.time() < deadline: + remaining = max(0.1, deadline - time.time()) + read_timeout = min(5, remaining) + try: + with http_client.get( + f"/detect/events/{channel_id}?after_ts={after_ts}", + stream=True, + timeout=(5, read_timeout), + headers=headers, + ) as resp: + resp.raise_for_status() + for ev in sseclient.SSEClient(resp).events(): + if ev.id: + try: + after_ts = max(after_ts, int(ev.id)) + except ValueError: + pass + if not ev.data or not str(ev.data).strip(): + continue + data = json.loads(ev.data) + collected.append(data) + if data.get("mediaStatus") in ("AIProcessed", "Error"): + return collected + if time.time() >= deadline: + break + except (requests.exceptions.RequestException, OSError) as exc: + last_error = exc + time.sleep(0.2) + + if collected: + return collected + if last_error is not None: + raise AssertionError( + f"SSE timed out on channel {channel_id}: {last_error}" + ) from last_error + return collected + + +@pytest.fixture(scope="session") +def sse_events_until_terminal(http_client, auth_headers): + def _read(channel_id, headers=None, timeout=30): + return _read_sse_events_until_terminal( + http_client, channel_id, headers or auth_headers, timeout=timeout + ) + + return _read + + @pytest.fixture def sse_client_factory(http_client, auth_headers): @contextmanager diff --git a/e2e/tests/test_async_sse.py b/e2e/tests/test_async_sse.py index 0feb038..3ff6cee 100644 --- a/e2e/tests/test_async_sse.py +++ b/e2e/tests/test_async_sse.py @@ -1,10 +1,7 @@ -import json -import threading import time import uuid import pytest -import sseclient def _ai_config_video() -> dict: @@ -31,50 +28,16 @@ def test_ft_p08_immediate_async_response( @pytest.mark.timeout(30) def test_ft_p09_sse_event_delivery( - warm_engine, http_client, jwt_token + warm_engine, http_client, jwt_token, sse_events_until_terminal ): media_id = f"event-{uuid.uuid4().hex}" channel_id = str(uuid.uuid4()) body = _ai_config_video() auth_header = {"Authorization": f"Bearer {jwt_token}"} post_headers = {**auth_header, "X-Channel-Id": channel_id} - collected: list[dict] = [] - thread_exc: list[BaseException] = [] - first_event = threading.Event() - connected = threading.Event() - - def _listen(): - try: - with http_client.get( - f"/detect/events/{channel_id}", - stream=True, - timeout=600, - headers=auth_header, - ) as resp: - resp.raise_for_status() - connected.set() - for event in sseclient.SSEClient(resp).events(): - if not event.data or not str(event.data).strip(): - continue - data = json.loads(event.data) - collected.append(data) - first_event.set() - if data.get("mediaStatus") in ("AIProcessed", "Error"): - break - except BaseException as e: - thread_exc.append(e) - finally: - connected.set() - first_event.set() - - th = threading.Thread(target=_listen, daemon=True) - th.start() - connected.wait(timeout=5) r = http_client.post(f"/detect/{media_id}", json=body, headers=post_headers) assert r.status_code == 202 - first_event.wait(timeout=5) - th.join(timeout=5) - assert not thread_exc, thread_exc + collected = sse_events_until_terminal(channel_id, headers=auth_header, timeout=20) assert collected, "no SSE events received"