import json import threading import time import uuid import pytest def _video_load_url(mock_loader_url: str, video_media_path: str) -> str: name = video_media_path.rstrip("/").split("/")[-1] return f"{mock_loader_url.rstrip('/')}/load/{name}" def _base_ai_body(mock_loader_url: str, video_path: str) -> dict: return { "probability_threshold": 0.25, "frame_period_recognition": 4, "frame_recognition_seconds": 2, "tracking_distance_confidence": 0.0, "tracking_probability_increase": 0.0, "tracking_intersection_threshold": 0.6, "altitude": 400.0, "focal_length": 24.0, "sensor_width": 23.5, "paths": [_video_load_url(mock_loader_url, video_path)], } def _run_async_video_sse( http_client, jwt_token, sse_client_factory, media_id: str, body: dict, *, timed: bool = False, wait_s: float = 120.0, ): collected: list = [] thread_exc: list[BaseException] = [] done = threading.Event() def _listen(): try: with sse_client_factory() as sse: time.sleep(0.3) for event in sse.events(): if not event.data or not str(event.data).strip(): continue data = json.loads(event.data) if data.get("mediaId") != media_id: continue if timed: collected.append((time.monotonic(), data)) else: collected.append(data) if ( data.get("mediaStatus") == "AIProcessed" and data.get("mediaPercent") == 100 ): break except BaseException as e: thread_exc.append(e) finally: done.set() th = threading.Thread(target=_listen, daemon=True) th.start() time.sleep(0.5) r = http_client.post( f"/detect/{media_id}", json=body, headers={"Authorization": f"Bearer {jwt_token}"}, ) assert r.status_code == 200 assert r.json() == {"status": "started", "mediaId": media_id} assert done.wait(timeout=wait_s) th.join(timeout=5) assert not thread_exc, thread_exc return collected def _assert_detection_dto(d: dict) -> None: assert isinstance(d["centerX"], (int, float)) assert isinstance(d["centerY"], (int, float)) assert isinstance(d["width"], (int, float)) assert isinstance(d["height"], (int, float)) assert 0.0 <= float(d["centerX"]) <= 1.0 assert 0.0 <= float(d["centerY"]) <= 1.0 assert 0.0 <= float(d["width"]) <= 1.0 assert 0.0 <= float(d["height"]) <= 1.0 assert isinstance(d["classNum"], int) assert isinstance(d["label"], str) assert isinstance(d["confidence"], (int, float)) assert 0.0 <= float(d["confidence"]) <= 1.0 @pytest.mark.slow @pytest.mark.timeout(120) def test_ft_p_10_frame_sampling_ac1( warm_engine, http_client, jwt_token, mock_loader_url, video_short_path, sse_client_factory, ): media_id = f"video-{uuid.uuid4().hex}" body = _base_ai_body(mock_loader_url, video_short_path) body["frame_period_recognition"] = 4 collected = _run_async_video_sse( http_client, jwt_token, sse_client_factory, media_id, body, ) processing = [e for e in collected if e.get("mediaStatus") == "AIProcessing"] assert len(processing) >= 2 final = collected[-1] assert final.get("mediaStatus") == "AIProcessed" assert final.get("mediaPercent") == 100 @pytest.mark.slow @pytest.mark.timeout(120) def test_ft_p_11_annotation_interval_ac2( warm_engine, http_client, jwt_token, mock_loader_url, video_short_path, sse_client_factory, ): media_id = f"video-{uuid.uuid4().hex}" body = _base_ai_body(mock_loader_url, video_short_path) body["frame_recognition_seconds"] = 2 collected = _run_async_video_sse( http_client, jwt_token, sse_client_factory, media_id, body, timed=True, ) processing = [ (t, d) for t, d in collected if d.get("mediaStatus") == "AIProcessing" ] assert len(processing) >= 2 gaps = [ processing[i][0] - processing[i - 1][0] for i in range(1, len(processing)) ] assert all(g >= 0.0 for g in gaps) final = collected[-1][1] assert final.get("mediaStatus") == "AIProcessed" assert final.get("mediaPercent") == 100 @pytest.mark.slow @pytest.mark.timeout(120) def test_ft_p_12_movement_tracking_ac3( warm_engine, http_client, jwt_token, mock_loader_url, video_short_path, sse_client_factory, ): media_id = f"video-{uuid.uuid4().hex}" body = _base_ai_body(mock_loader_url, video_short_path) body["tracking_distance_confidence"] = 0.1 body["tracking_probability_increase"] = 0.1 collected = _run_async_video_sse( http_client, jwt_token, sse_client_factory, media_id, body, ) for e in collected: anns = e.get("annotations") if not anns: continue assert isinstance(anns, list) for d in anns: _assert_detection_dto(d) final = collected[-1] assert final.get("mediaStatus") == "AIProcessed" assert final.get("mediaPercent") == 100