import json
import threading
import time
import uuid
from pathlib import Path

import pytest
import sseclient

FIXTURES_DIR = Path(__file__).resolve().parent.parent / "fixtures"
_VIDEO = str(FIXTURES_DIR / "video_test01.mp4")


def _chunked_reader(path: str, chunk_size: int = 64 * 1024):
    """Yield the binary contents of *path* in pieces of at most *chunk_size* bytes.

    Iteration ends at the first empty read, i.e. at end of file.
    """
    with open(path, "rb") as stream:
        # Two-argument iter() keeps calling read() until it returns b"".
        yield from iter(lambda: stream.read(chunk_size), b"")


@pytest.fixture(scope="module")
def video_events(warm_engine, http_client, auth_headers):
    """Upload the test video once per module and return the SSE events it produced.

    A background thread subscribes to ``/detect/events/<channel_id>`` and
    records every non-empty event as ``(time.monotonic(), decoded_json)``
    until it sees ``mediaStatus == "AIProcessed"`` with ``mediaPercent == 100``.
    The video is then streamed to ``/detect/video`` on the same channel.

    The fixture skips when the video file is missing, and fails when the
    upload is not answered with 202, when the listener does not finish within
    30 seconds, or when the listener thread raised.

    ``warm_engine`` is not used in the body; it is requested only so that the
    fixture runs first (presumably it warms up the engine — confirm in conftest).
    """
    if not Path(_VIDEO).is_file():
        pytest.skip(f"missing fixture {_VIDEO}")

    channel_id = str(uuid.uuid4())
    received: list[tuple[float, dict]] = []
    listener_errors: list[BaseException] = []
    finished = threading.Event()
    subscribed = threading.Event()

    def _is_terminal(payload):
        # The stream is considered complete on the 100% "AIProcessed" event.
        return (
            payload.get("mediaStatus") == "AIProcessed"
            and payload.get("mediaPercent") == 100
        )

    def _listen():
        try:
            with http_client.get(
                f"/detect/events/{channel_id}",
                stream=True,
                timeout=60,
                headers=auth_headers,
            ) as response:
                response.raise_for_status()
                subscribed.set()
                for message in sseclient.SSEClient(response).events():
                    raw = message.data
                    # Ignore events with an empty or whitespace-only payload.
                    if not raw or not str(raw).strip():
                        continue
                    payload = json.loads(raw)
                    received.append((time.monotonic(), payload))
                    if _is_terminal(payload):
                        break
        except BaseException as exc:
            # Keep the error so the fixture can report it from the main thread.
            listener_errors.append(exc)
        finally:
            # Release the main thread even when the subscription failed.
            subscribed.set()
            finished.set()

    listener = threading.Thread(target=_listen, daemon=True)
    listener.start()
    # NOTE(review): the result of this wait is ignored, so the upload goes
    # ahead even if the subscription was not established within 5 seconds.
    subscribed.wait(timeout=5)

    upload_headers = {
        **auth_headers,
        "X-Channel-Id": channel_id,
        "X-Filename": "video_test01.mp4",
        "Content-Type": "application/octet-stream",
    }
    upload = http_client.post(
        "/detect/video",
        data=_chunked_reader(_VIDEO),
        headers=upload_headers,
        timeout=15,
    )
    assert upload.status_code == 202
    assert finished.wait(timeout=30)
    listener.join(timeout=5)
    assert not listener_errors, listener_errors
    return received


@pytest.mark.timeout(30)
def test_ft_p_10_frame_sampling_ac1(video_events):
    """At least two "AIProcessing" events arrive and the last event is the 100% "AIProcessed" one."""
    # Assert
    in_progress = [
        payload
        for _, payload in video_events
        if payload.get("mediaStatus") == "AIProcessing"
    ]
    assert len(in_progress) >= 2
    _, last_payload = video_events[-1]
    assert last_payload["mediaStatus"] == "AIProcessed"
    assert last_payload["mediaPercent"] == 100
@pytest.mark.timeout(30)
def test_ft_p_11_annotation_interval_ac2(video_events):
    """Arrival times of "AIProcessing" events never decrease and the stream ends at 100% "AIProcessed"."""
    # Assert
    arrival_times = [
        stamp
        for stamp, payload in video_events
        if payload.get("mediaStatus") == "AIProcessing"
    ]
    assert len(arrival_times) >= 2
    # Pair each timestamp with its successor to get the gap between events.
    gaps = [
        later - earlier
        for earlier, later in zip(arrival_times, arrival_times[1:])
    ]
    assert all(gap >= 0.0 for gap in gaps)
    _, last_payload = video_events[-1]
    assert last_payload["mediaStatus"] == "AIProcessed"
    assert last_payload["mediaPercent"] == 100


@pytest.mark.timeout(30)
def test_ft_p_12_movement_tracking_ac3(video_events):
    """Every annotation has numeric box fields and confidence within [0, 1], an int class and a str label."""
    # Assert
    box_fields = ("centerX", "centerY", "width", "height")
    for _, payload in video_events:
        annotations = payload.get("annotations")
        # Events with no (or empty) annotations carry nothing to validate.
        if not annotations:
            continue
        assert isinstance(annotations, list)
        for detection in annotations:
            for field in box_fields:
                assert isinstance(detection[field], (int, float))
            for field in box_fields:
                assert 0.0 <= float(detection[field]) <= 1.0
            assert isinstance(detection["classNum"], int)
            assert isinstance(detection["label"], str)
            assert isinstance(detection["confidence"], (int, float))
            assert 0.0 <= float(detection["confidence"]) <= 1.0
    _, last_payload = video_events[-1]
    assert last_payload["mediaStatus"] == "AIProcessed"
    assert last_payload["mediaPercent"] == 100