Files
detections/e2e/tests/test_streaming_video_upload.py
Roman Meshko 080f257423
ci/woodpecker/push/02-build-push Pipeline was canceled
ci/woodpecker/manual/01-test Pipeline was successful
ci/woodpecker/manual/02-build-push Pipeline was successful
ci/woodpecker/manual/e2e-smoke-jetson Pipeline was successful
Fix e2e tests
2026-05-15 13:15:42 +03:00

165 lines
4.9 KiB
Python

"""
AZ-178: True streaming video detection — e2e tests.
Both tests upload video_test01.mp4 (12 MB), wait for the first SSE event
to prove streaming starts early, then keep draining SSE until the terminal
event so later tests do not overlap with background video inference.
Run with: pytest e2e/tests/test_streaming_video_upload.py -s -v
"""
import json
import threading
import time
import uuid
from pathlib import Path
import pytest
import sseclient
# The "fixtures" directory that sits beside the directory containing this file.
FIXTURES_DIR = Path(__file__).resolve().parent.parent / "fixtures"
# Seconds the tests wait for the first SSE event once the upload request has returned.
_TIMEOUT = 5.0
# Seconds the tests allow for the SSE stream to reach its terminal event; the
# listener's HTTP timeout is derived from this value plus a small margin.
_DRAIN_TIMEOUT = 45.0
# JSON-encoded configuration sent with every upload in the X-Config header.
_VIDEO_CONFIG = json.dumps({"model_batch_size": 1, "frame_period_recognition": 100})
def _fixture_path(name: str) -> str:
    """Return the path of fixture *name* as a string.

    The file is looked up inside ``FIXTURES_DIR``.  When it is not a regular
    file the calling test is skipped via ``pytest.skip`` instead of failing.
    """
    fixture = FIXTURES_DIR.joinpath(name)
    if fixture.is_file():
        return str(fixture)
    # pytest.skip raises, so nothing is returned on this path.
    pytest.skip(f"missing fixture {fixture}")
def _chunked_reader(path: str, chunk_size: int = 64 * 1024):
    """Yield the contents of the file at *path* as successive ``bytes`` chunks.

    The file is opened in binary mode and read ``chunk_size`` bytes at a time
    (64 KiB by default); iteration stops at the first empty read.  The file
    stays open only while the generator is being consumed.
    """
    with open(path, "rb") as stream:
        # An empty bytes object is falsy, which ends the loop at end of file.
        while block_bytes := stream.read(chunk_size):
            yield block_bytes
def _start_sse_listener(
    http_client, channel_id: str, auth_headers: dict
) -> tuple[
    list[dict],
    list[BaseException],
    threading.Event,
    threading.Event,
    threading.Event,
    threading.Thread,
]:
    """Subscribe to a channel's SSE stream on a background daemon thread.

    The thread issues a streaming GET to ``/detect/events/{channel_id}`` and
    appends every non-blank, JSON-decoded event to a shared list.  It stops
    after an event whose ``mediaStatus`` is ``"AIProcessed"`` or ``"Error"``.
    Before returning, this function waits up to three seconds for the
    connection attempt to either succeed or fail.

    Returns:
        A tuple ``(events, errors, first_event, terminal_event,
        listener_done, thread)`` where

        * ``events`` collects the decoded event payloads in arrival order;
        * ``errors`` collects any exception raised inside the thread;
        * ``first_event`` is set once an event was stored *or* the thread
          exited, so callers must still inspect ``events`` / ``errors``;
        * ``terminal_event`` is set only when a terminal status was received;
        * ``listener_done`` is set when the thread body has finished;
        * ``thread`` is the already-started listener thread.
    """
    connected = threading.Event()
    listener_done = threading.Event()
    terminal_event = threading.Event()
    first_event = threading.Event()
    errors: list[BaseException] = []
    events: list[dict] = []
    terminal_statuses = ("AIProcessed", "Error")

    def _consume():
        try:
            with http_client.get(
                f"/detect/events/{channel_id}",
                stream=True,
                timeout=_DRAIN_TIMEOUT + 5,
                headers=auth_headers,
            ) as response:
                response.raise_for_status()
                connected.set()
                for message in sseclient.SSEClient(response).events():
                    raw = message.data
                    # Ignore events that carry no payload or only whitespace.
                    if not (raw and str(raw).strip()):
                        continue
                    payload = json.loads(raw)
                    events.append(payload)
                    first_event.set()
                    if payload.get("mediaStatus") in terminal_statuses:
                        terminal_event.set()
                        break
        except BaseException as exc:
            # Record everything so the test thread can report it.
            errors.append(exc)
        finally:
            # Release every waiter, whether the stream ended normally or not.
            for flag in (connected, first_event, listener_done):
                flag.set()

    worker = threading.Thread(target=_consume, daemon=True)
    worker.start()
    # Give the subscription a moment to be established before the caller uploads.
    connected.wait(timeout=3)
    return events, errors, first_event, terminal_event, listener_done, worker
@pytest.mark.timeout(60)
def test_streaming_video_detections_appear_during_upload(
    warm_engine, http_client, auth_headers
):
    """Upload a video in chunks and check that SSE events start early and end in ``AIProcessed``.

    The SSE listener is started first, then ``video_test01.mp4`` is POSTed to
    ``/detect/video`` as a chunked body.  The test requires a 202 response, a
    first event within ``_TIMEOUT`` seconds, and a final event whose
    ``mediaStatus`` is ``"AIProcessed"``.  The stream is drained to its end so
    that background inference does not overlap with later tests.

    ``warm_engine``, ``http_client`` and ``auth_headers`` are pytest fixtures
    defined outside this module.
    """
    # Arrange
    video_path = _fixture_path("video_test01.mp4")
    channel_id = str(uuid.uuid4())
    events, errors, first_event, terminal_event, listener_done, th = _start_sse_listener(
        http_client, channel_id, auth_headers
    )

    # Act
    r = http_client.post(
        "/detect/video",
        data=_chunked_reader(video_path),
        headers={
            **auth_headers,
            "X-Channel-Id": channel_id,
            "X-Filename": "video_test01.mp4",
            "X-Config": _VIDEO_CONFIG,
            "Content-Type": "application/octet-stream",
        },
        timeout=8,
    )

    # Assert
    assert r.status_code == 202
    assert first_event.wait(
        timeout=_TIMEOUT
    ), f"Expected at least one SSE event within {_TIMEOUT:g}s"
    # first_event is also set when the listener thread exits, so a crashed or
    # empty stream must be detected here rather than after the long drain wait.
    assert not errors, f"SSE thread error: {errors}"
    assert len(events) >= 1, f"Expected at least one SSE event within {_TIMEOUT:g}s"

    # Wait for the listener itself: it finishes right after the terminal event,
    # and also as soon as the stream fails or closes, so failures surface fast.
    assert listener_done.wait(
        timeout=_DRAIN_TIMEOUT + 2
    ), f"SSE listener still running after {_DRAIN_TIMEOUT + 2:g}s; events so far: {events}"
    th.join(timeout=2)

    assert not errors, f"SSE thread error: {errors}"
    assert terminal_event.is_set(), f"SSE stream ended without a terminal event: {events}"
    assert events[-1].get("mediaStatus") == "AIProcessed"
    print(f"\n First {len(events)} SSE events:")
    for e in events:
        print(f" {e}")
@pytest.mark.timeout(60)
def test_non_faststart_video_still_works(warm_engine, http_client, auth_headers):
    """Upload the video under a different filename and check it is still processed.

    The request matches the streaming test except that ``X-Filename`` is
    ``video_test01_plain.mp4``.  The test requires a 202 response, a first SSE
    event within ``_TIMEOUT`` seconds, and a final event whose ``mediaStatus``
    is ``"AIProcessed"``.

    NOTE(review): the uploaded bytes come from the same ``video_test01.mp4``
    fixture as the streaming test; only the header differs.  Confirm that this
    is what exercises the non-faststart path.
    """
    # Arrange
    video_path = _fixture_path("video_test01.mp4")
    channel_id = str(uuid.uuid4())
    events, errors, first_event, terminal_event, listener_done, th = _start_sse_listener(
        http_client, channel_id, auth_headers
    )

    # Act
    r = http_client.post(
        "/detect/video",
        data=_chunked_reader(video_path),
        headers={
            **auth_headers,
            "X-Channel-Id": channel_id,
            "X-Filename": "video_test01_plain.mp4",
            "X-Config": _VIDEO_CONFIG,
            "Content-Type": "application/octet-stream",
        },
        timeout=8,
    )

    # Assert
    assert r.status_code == 202
    assert first_event.wait(
        timeout=_TIMEOUT
    ), f"Expected at least one SSE event within {_TIMEOUT:g}s"
    # first_event is also set when the listener thread exits, so a crashed or
    # empty stream must be detected here rather than after the long drain wait.
    assert not errors, f"SSE thread error: {errors}"
    assert len(events) >= 1, f"Expected at least one SSE event within {_TIMEOUT:g}s"

    # Wait for the listener itself: it finishes right after the terminal event,
    # and also as soon as the stream fails or closes, so failures surface fast.
    assert listener_done.wait(
        timeout=_DRAIN_TIMEOUT + 2
    ), f"SSE listener still running after {_DRAIN_TIMEOUT + 2:g}s; events so far: {events}"
    th.join(timeout=2)

    assert not errors, f"SSE thread error: {errors}"
    assert terminal_event.is_set(), f"SSE stream ended without a terminal event: {events}"
    assert events[-1].get("mediaStatus") == "AIProcessed"
    print(f"\n First {len(events)} SSE events:")
    for e in events:
        print(f" {e}")