"""
AZ-178: True streaming video detection — e2e tests.

Both tests upload video_test01.mp4 (12 MB), wait for the first SSE event to
prove streaming starts early, then keep draining SSE until the terminal event
so later tests do not overlap with background video inference.

Run with: pytest e2e/tests/test_streaming_video_upload.py -s -v
"""
import json
import threading
import time
import uuid
from pathlib import Path

import pytest
import sseclient

FIXTURES_DIR = Path(__file__).resolve().parent.parent / "fixtures"

# Seconds allowed for the first SSE event to arrive after the upload is accepted.
_TIMEOUT = 5.0
# Seconds allowed for the terminal SSE event (end of background processing).
_DRAIN_TIMEOUT = 45.0
_VIDEO_CONFIG = json.dumps({"model_batch_size": 1, "frame_period_recognition": 100})

# mediaStatus values that end the SSE stream for a channel.
_TERMINAL_STATUSES = ("AIProcessed", "Error")


def _fixture_path(name: str) -> str:
    """Return the path of fixture *name* as a string, skipping the test if absent."""
    p = FIXTURES_DIR / name
    if not p.is_file():
        pytest.skip(f"missing fixture {p}")
    return str(p)


def _chunked_reader(path: str, chunk_size: int = 64 * 1024):
    """Yield the file at *path* as successive byte chunks of at most *chunk_size*.

    Passed as the request body so the upload is sent piece by piece instead of
    being read into memory first.
    """
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk


def _start_sse_listener(
    http_client, channel_id: str, auth_headers: dict
) -> tuple[
    list[dict],
    list[BaseException],
    threading.Event,
    threading.Event,
    threading.Event,
    threading.Thread,
]:
    """Start a daemon thread that collects SSE events for *channel_id*.

    Returns ``(events, errors, first_event, terminal_event, listener_done, thread)``:

    * ``events`` - decoded JSON payloads, in arrival order.
    * ``errors`` - any exception raised inside the listener thread.
    * ``first_event`` - set once the first payload is stored. It is ALSO set
      when the listener exits for any reason, so that waiters are released;
      callers must therefore inspect ``errors`` / ``events`` after waiting.
    * ``terminal_event`` - set only when a payload whose ``mediaStatus`` is
      ``"AIProcessed"`` or ``"Error"`` has been received.
    * ``listener_done`` - set when the listener thread is about to exit.

    The function blocks for at most 3 seconds waiting for the SSE connection
    to be established (or to fail) before returning.
    """
    events: list[dict] = []
    errors: list[BaseException] = []
    first_event = threading.Event()
    terminal_event = threading.Event()
    listener_done = threading.Event()
    connected = threading.Event()

    def _listen():
        try:
            with http_client.get(
                f"/detect/events/{channel_id}",
                stream=True,
                timeout=_DRAIN_TIMEOUT + 5,
                headers=auth_headers,
            ) as resp:
                resp.raise_for_status()
                connected.set()
                for event in sseclient.SSEClient(resp).events():
                    # Skip events that carry no payload.
                    if not event.data or not str(event.data).strip():
                        continue
                    data = json.loads(event.data)
                    events.append(data)
                    first_event.set()
                    if data.get("mediaStatus") in _TERMINAL_STATUSES:
                        terminal_event.set()
                        break
        except BaseException as exc:
            # Deliberately broad: whatever goes wrong in this thread must be
            # handed to the test thread, which reports it via ``errors``.
            errors.append(exc)
        finally:
            # Release every waiter even on failure so the test never hangs on
            # an event that can no longer fire.
            connected.set()
            first_event.set()
            listener_done.set()

    th = threading.Thread(target=_listen, daemon=True)
    th.start()
    # Best effort: give the listener a moment to connect before the upload starts.
    connected.wait(timeout=3)
    return events, errors, first_event, terminal_event, listener_done, th


def _upload_and_drain(http_client, auth_headers: dict, upload_filename: str) -> None:
    """Upload video_test01.mp4 as *upload_filename* and verify the SSE stream.

    Asserts that the upload is accepted with 202, that at least one SSE event
    arrives within ``_TIMEOUT`` seconds, and that the stream ends with
    ``mediaStatus == "AIProcessed"`` within ``_DRAIN_TIMEOUT`` seconds.
    Listener failures are reported as soon as they are observable instead of
    after the full drain timeout.
    """
    # Arrange
    video_path = _fixture_path("video_test01.mp4")
    channel_id = str(uuid.uuid4())
    events, errors, first_event, terminal_event, listener_done, th = _start_sse_listener(
        http_client, channel_id, auth_headers
    )
    # If the listener already failed there is no point uploading the video and
    # starting background inference that nobody will drain.
    assert not errors, f"SSE thread error: {errors}"

    # Act
    r = http_client.post(
        "/detect/video",
        data=_chunked_reader(video_path),
        headers={
            **auth_headers,
            "X-Channel-Id": channel_id,
            "X-Filename": upload_filename,
            "X-Config": _VIDEO_CONFIG,
            "Content-Type": "application/octet-stream",
        },
        timeout=8,
    )
    assert r.status_code == 202

    assert first_event.wait(
        timeout=_TIMEOUT
    ), f"Expected at least one SSE event within {_TIMEOUT:g}s"
    # ``first_event`` is also set when the listener exits, so confirm that a
    # real payload (and not a failure or an early EOF) released the wait.
    assert not errors, f"SSE thread error: {errors}"
    assert events, "SSE stream ended before any event was received"

    assert terminal_event.wait(
        timeout=_DRAIN_TIMEOUT
    ), f"No terminal SSE event within {_DRAIN_TIMEOUT:g}s; errors={errors}"
    assert listener_done.wait(timeout=2), "SSE listener did not finish after terminal event"
    th.join(timeout=2)

    # Assert
    assert not errors, f"SSE thread error: {errors}"
    assert len(events) >= 1, "Expected at least one SSE event"
    assert events[-1].get("mediaStatus") == "AIProcessed"
    print(f"\n  First {len(events)} SSE events:")
    for e in events:
        print(f"    {e}")


@pytest.mark.timeout(60)
def test_streaming_video_detections_appear_during_upload(
    warm_engine, http_client, auth_headers
):
    """SSE events start early and the stream ends with AIProcessed."""
    _upload_and_drain(http_client, auth_headers, "video_test01.mp4")


@pytest.mark.timeout(60)
def test_non_faststart_video_still_works(warm_engine, http_client, auth_headers):
    """Same flow, with the fixture uploaded under the name video_test01_plain.mp4.

    NOTE(review): the bytes sent are the same video_test01.mp4 fixture as in
    the test above; only the X-Filename header differs. Confirm this really
    exercises the non-faststart path.
    """
    _upload_and_drain(http_client, auth_headers, "video_test01_plain.mp4")