import base64 import json import threading import time import uuid import pytest import sseclient def _make_jwt() -> str: header = base64.urlsafe_b64encode( json.dumps({"alg": "none", "typ": "JWT"}).encode() ).decode().rstrip("=") raw = json.dumps( {"exp": int(time.time()) + 3600, "sub": "test"}, separators=(",", ":") ).encode() payload = base64.urlsafe_b64encode(raw).decode().rstrip("=") return f"{header}.{payload}.signature" @pytest.fixture(scope="module") def video_events(warm_engine, http_client): media_id = f"video-{uuid.uuid4().hex}" body = {} token = _make_jwt() collected: list[tuple[float, dict]] = [] thread_exc: list[BaseException] = [] done = threading.Event() def _listen(): try: with http_client.get("/detect/stream", stream=True, timeout=600) as resp: resp.raise_for_status() sse = sseclient.SSEClient(resp) time.sleep(0.3) for event in sse.events(): if not event.data or not str(event.data).strip(): continue data = json.loads(event.data) if data.get("mediaId") != media_id: continue collected.append((time.monotonic(), data)) if ( data.get("mediaStatus") == "AIProcessed" and data.get("mediaPercent") == 100 ): break except BaseException as e: thread_exc.append(e) finally: done.set() th = threading.Thread(target=_listen, daemon=True) th.start() time.sleep(0.5) r = http_client.post( f"/detect/{media_id}", json=body, headers={"Authorization": f"Bearer {token}"}, ) assert r.status_code == 200 assert r.json() == {"status": "started", "mediaId": media_id} assert done.wait(timeout=900) th.join(timeout=5) assert not thread_exc, thread_exc return collected @pytest.mark.slow @pytest.mark.timeout(900) def test_ft_p_10_frame_sampling_ac1(video_events): # Assert processing = [d for _, d in video_events if d.get("mediaStatus") == "AIProcessing"] assert len(processing) >= 2 final = video_events[-1][1] assert final["mediaStatus"] == "AIProcessed" assert final["mediaPercent"] == 100 @pytest.mark.slow @pytest.mark.timeout(900) def test_ft_p_11_annotation_interval_ac2(video_events): # Assert processing = [ (t, d) for t, d in video_events if d.get("mediaStatus") == "AIProcessing" ] assert len(processing) >= 2 gaps = [processing[i][0] - processing[i - 1][0] for i in range(1, len(processing))] assert all(g >= 0.0 for g in gaps) final = video_events[-1][1] assert final["mediaStatus"] == "AIProcessed" assert final["mediaPercent"] == 100 @pytest.mark.slow @pytest.mark.timeout(900) def test_ft_p_12_movement_tracking_ac3(video_events): # Assert for _, e in video_events: anns = e.get("annotations") if not anns: continue assert isinstance(anns, list) for d in anns: assert isinstance(d["centerX"], (int, float)) assert isinstance(d["centerY"], (int, float)) assert isinstance(d["width"], (int, float)) assert isinstance(d["height"], (int, float)) assert 0.0 <= float(d["centerX"]) <= 1.0 assert 0.0 <= float(d["centerY"]) <= 1.0 assert 0.0 <= float(d["width"]) <= 1.0 assert 0.0 <= float(d["height"]) <= 1.0 assert isinstance(d["classNum"], int) assert isinstance(d["label"], str) assert isinstance(d["confidence"], (int, float)) assert 0.0 <= float(d["confidence"]) <= 1.0 final = video_events[-1][1] assert final["mediaStatus"] == "AIProcessed" assert final["mediaPercent"] == 100