From 080f257423f3ae261a8c77379f2d4614030511cd Mon Sep 17 00:00:00 2001 From: Roman Meshko Date: Fri, 15 May 2026 13:15:42 +0300 Subject: [PATCH] Fix e2e tests --- e2e/tests/test_streaming_video_upload.py | 58 ++++++++++++++++++------ e2e/tests/test_video.py | 2 +- 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/e2e/tests/test_streaming_video_upload.py b/e2e/tests/test_streaming_video_upload.py index 1e1f195..bce3336 100644 --- a/e2e/tests/test_streaming_video_upload.py +++ b/e2e/tests/test_streaming_video_upload.py @@ -1,9 +1,9 @@ """ AZ-178: True streaming video detection — e2e tests. -Both tests upload video_test01.mp4 (12 MB), wait for the first SSE event, -then stop. The goal is to prove the service starts and produces detections, -not to process the whole file. +Both tests upload video_test01.mp4 (12 MB), wait for the first SSE event +to prove streaming starts early, then keep draining SSE until the terminal +event so later tests do not overlap with background video inference. Run with: pytest e2e/tests/test_streaming_video_upload.py -s -v """ @@ -18,7 +18,8 @@ import sseclient FIXTURES_DIR = Path(__file__).resolve().parent.parent / "fixtures" _TIMEOUT = 5.0 -_STOP_AFTER = 5 +_DRAIN_TIMEOUT = 45.0 +_VIDEO_CONFIG = json.dumps({"model_batch_size": 1, "frame_period_recognition": 100}) def _fixture_path(name: str) -> str: @@ -39,10 +40,19 @@ def _chunked_reader(path: str, chunk_size: int = 64 * 1024): def _start_sse_listener( http_client, channel_id: str, auth_headers: dict -) -> tuple[list[dict], list[BaseException], threading.Event]: +) -> tuple[ + list[dict], + list[BaseException], + threading.Event, + threading.Event, + threading.Event, + threading.Thread, +]: events: list[dict] = [] errors: list[BaseException] = [] first_event = threading.Event() + terminal_event = threading.Event() + listener_done = threading.Event() connected = threading.Event() def _listen(): @@ -50,7 +60,7 @@ def _start_sse_listener( with http_client.get( f"/detect/events/{channel_id}", stream=True, - timeout=_TIMEOUT + 2, + timeout=_DRAIN_TIMEOUT + 5, headers=auth_headers, ) as resp: resp.raise_for_status() @@ -58,8 +68,11 @@ def _start_sse_listener( for event in sseclient.SSEClient(resp).events(): if not event.data or not str(event.data).strip(): continue - events.append(json.loads(event.data)) - if len(events) >= _STOP_AFTER: + data = json.loads(event.data) + events.append(data) + first_event.set() + if data.get("mediaStatus") in ("AIProcessed", "Error"): + terminal_event.set() first_event.set() break except BaseException as exc: @@ -67,21 +80,24 @@ def _start_sse_listener( finally: connected.set() first_event.set() + listener_done.set() th = threading.Thread(target=_listen, daemon=True) th.start() connected.wait(timeout=3) - return events, errors, first_event + return events, errors, first_event, terminal_event, listener_done, th -@pytest.mark.timeout(10) +@pytest.mark.timeout(60) def test_streaming_video_detections_appear_during_upload( warm_engine, http_client, auth_headers ): # Arrange video_path = _fixture_path("video_test01.mp4") channel_id = str(uuid.uuid4()) - events, errors, first_event = _start_sse_listener(http_client, channel_id, auth_headers) + events, errors, first_event, terminal_event, listener_done, th = _start_sse_listener( + http_client, channel_id, auth_headers + ) # Act r = http_client.post( @@ -91,27 +107,34 @@ def test_streaming_video_detections_appear_during_upload( **auth_headers, "X-Channel-Id": channel_id, "X-Filename": "video_test01.mp4", + "X-Config": _VIDEO_CONFIG, "Content-Type": "application/octet-stream", }, timeout=8, ) assert r.status_code == 202 - first_event.wait(timeout=_TIMEOUT) + assert first_event.wait(timeout=_TIMEOUT) + assert terminal_event.wait(timeout=_DRAIN_TIMEOUT) + assert listener_done.wait(timeout=2) + th.join(timeout=2) # Assert assert not errors, f"SSE thread error: {errors}" assert len(events) >= 1, "Expected at least one SSE event within 5s" + assert events[-1].get("mediaStatus") == "AIProcessed" print(f"\n First {len(events)} SSE events:") for e in events: print(f" {e}") -@pytest.mark.timeout(10) +@pytest.mark.timeout(60) def test_non_faststart_video_still_works(warm_engine, http_client, auth_headers): # Arrange video_path = _fixture_path("video_test01.mp4") channel_id = str(uuid.uuid4()) - events, errors, first_event = _start_sse_listener(http_client, channel_id, auth_headers) + events, errors, first_event, terminal_event, listener_done, th = _start_sse_listener( + http_client, channel_id, auth_headers + ) # Act r = http_client.post( @@ -121,16 +144,21 @@ def test_non_faststart_video_still_works(warm_engine, http_client, auth_headers) **auth_headers, "X-Channel-Id": channel_id, "X-Filename": "video_test01_plain.mp4", + "X-Config": _VIDEO_CONFIG, "Content-Type": "application/octet-stream", }, timeout=8, ) assert r.status_code == 202 - first_event.wait(timeout=_TIMEOUT) + assert first_event.wait(timeout=_TIMEOUT) + assert terminal_event.wait(timeout=_DRAIN_TIMEOUT) + assert listener_done.wait(timeout=2) + th.join(timeout=2) # Assert assert not errors, f"SSE thread error: {errors}" assert len(events) >= 1, "Expected at least one SSE event within 5s" + assert events[-1].get("mediaStatus") == "AIProcessed" print(f"\n First {len(events)} SSE events:") for e in events: print(f" {e}") diff --git a/e2e/tests/test_video.py b/e2e/tests/test_video.py index b003332..e5afcc3 100644 --- a/e2e/tests/test_video.py +++ b/e2e/tests/test_video.py @@ -73,7 +73,7 @@ def video_events(warm_engine, http_client, auth_headers): "X-Channel-Id": channel_id, "X-Filename": "video_test01.mp4", "X-Config": json.dumps( - {"model_batch_size": 1, "frame_period_recognition": 25} + {"model_batch_size": 1, "frame_period_recognition": 100} ), "Content-Type": "application/octet-stream", },