import base64
import json
import os
import tempfile
import threading
import time
from unittest.mock import MagicMock, patch

import cv2
import numpy as np
import pytest
from fastapi.testclient import TestClient

pytest.importorskip("inference")
import inference as inference_mod


def _access_jwt(sub: str = "u1") -> str:
    """Return an unsigned JWT-shaped token for the given subject.

    The payload is compact JSON with ``exp`` set one hour ahead and ``sub``
    set to *sub*, URL-safe base64 encoded with the padding stripped. The
    header and signature segments are the placeholders ``h`` and ``s``.
    """
    raw = json.dumps(
        {"exp": int(time.time()) + 3600, "sub": sub}, separators=(",", ":")
    ).encode()
    payload = base64.urlsafe_b64encode(raw).decode().rstrip("=")
    return f"h.{payload}.s"


def _jpeg_bytes() -> bytes:
    """Return the JPEG encoding of a 2x2 all-zero, 3-channel uint8 image."""
    _, buf = cv2.imencode(".jpg", np.zeros((2, 2, 3), dtype=np.uint8))
    return buf.tobytes()


class _FakeInfVideo:
    """Inference stand-in that only persists the uploaded video bytes."""

    def run_detect_video(
        self,
        video_bytes,
        ai_cfg,
        media_name,
        save_path,
        on_annotation,
        status_callback=None,
    ):
        """Write *video_bytes* to *save_path* on a worker thread and wait.

        The writer is looked up on the ``inference`` module at call time, so
        a patched ``_write_video_bytes_to_path`` is the one that runs.
        """
        writer = inference_mod._write_video_bytes_to_path
        ev = threading.Event()
        t = threading.Thread(
            target=writer,
            args=(save_path, video_bytes, ev),
        )
        t.start()
        # NOTE(review): presumably the writer sets ``ev`` once it has made
        # progress on the file -- confirm against the inference module.
        ev.wait(timeout=60)
        t.join(timeout=60)

    def run_detect_image(self, *args, **kwargs):
        """Do nothing; image detection is not exercised by this fake."""
        pass


class _FakeInfVideoConcurrent:
    """Inference stand-in that checks the writer runs off the caller thread."""

    def run_detect_video(
        self,
        video_bytes,
        ai_cfg,
        media_name,
        save_path,
        on_annotation,
        status_callback=None,
    ):
        """Write the video on a new thread and assert the thread ids differ."""
        holder = {}
        writer = inference_mod._write_video_bytes_to_path

        def write():
            # Record the worker's thread id before doing the actual write.
            holder["tid"] = threading.get_ident()
            writer(save_path, video_bytes, threading.Event())

        t = threading.Thread(target=write)
        t.start()
        holder["caller_tid"] = threading.get_ident()
        t.join(timeout=60)
        assert holder["tid"] != holder["caller_tid"]

    def run_detect_image(self, *args, **kwargs):
        """Do nothing; image detection is not exercised by this fake."""
        pass


@pytest.fixture
def reset_main_inference():
    """Set ``main.inference`` to ``None`` before and after the test."""
    import main

    main.inference = None
    yield
    main.inference = None


def test_auth_video_storage_path_opened_wb_once(reset_main_inference):
    """An authenticated video upload is written exactly once to VIDEOS_DIR."""
    # Arrange
    import main
    from media_hash import compute_media_content_hash

    write_paths = []
    real_write = inference_mod._write_video_bytes_to_path

    def tracking_write(path, data, ev):
        write_paths.append(os.path.abspath(str(path)))
        return real_write(path, data, ev)

    video_body = b"vid-bytes-" * 20
    token = _access_jwt()
    mock_post = MagicMock()
    mock_post.return_value.status_code = 201
    mock_put = MagicMock()
    mock_put.return_value.status_code = 204
    # patch.dict restores the environment on exit, so VIDEOS_DIR never keeps
    # pointing at the deleted temporary directory for later tests.
    with (
        tempfile.TemporaryDirectory() as vd,
        patch.dict(os.environ, {"VIDEOS_DIR": vd}),
    ):
        content_hash = compute_media_content_hash(video_body)
        expected_path = os.path.abspath(os.path.join(vd, f"{content_hash}.mp4"))
        client = TestClient(main.app)
        with (
            patch.object(inference_mod, "_write_video_bytes_to_path", tracking_write),
            patch.object(main.http_requests, "post", mock_post),
            patch.object(main.http_requests, "put", mock_put),
            patch.object(main, "get_inference", return_value=_FakeInfVideo()),
        ):
            # Act
            r = client.post(
                "/detect",
                files={"file": ("clip.mp4", video_body, "video/mp4")},
                headers={"Authorization": f"Bearer {token}"},
            )
        # Assert
        assert r.status_code == 200
        assert write_paths.count(expected_path) == 1
        with open(expected_path, "rb") as f:
            assert f.read() == video_body
        assert mock_post.called
        assert mock_put.call_count >= 2


def test_non_auth_temp_video_opened_wb_once_and_removed(reset_main_inference):
    """An anonymous video upload is written once to a path that is removed."""
    # Arrange
    import main

    write_paths = []
    real_write = inference_mod._write_video_bytes_to_path

    def tracking_write(path, data, ev):
        write_paths.append(os.path.abspath(str(path)))
        return real_write(path, data, ev)

    video_body = b"tmp-vid-" * 30
    client = TestClient(main.app)
    tmp_path_holder = []

    class _CaptureTmp(_FakeInfVideo):
        """Record the save path the endpoint chose, then defer to the fake."""

        def run_detect_video(
            self,
            video_bytes,
            ai_cfg,
            media_name,
            save_path,
            on_annotation,
            status_callback=None,
        ):
            tmp_path_holder.append(os.path.abspath(str(save_path)))
            super().run_detect_video(
                video_bytes, ai_cfg, media_name, save_path, on_annotation, status_callback
            )

    with (
        patch.object(inference_mod, "_write_video_bytes_to_path", tracking_write),
        patch.object(main, "get_inference", return_value=_CaptureTmp()),
    ):
        # Act
        r = client.post(
            "/detect",
            files={"file": ("n.mp4", video_body, "video/mp4")},
        )
    # Assert
    assert r.status_code == 200
    assert len(tmp_path_holder) == 1
    tmp_path = tmp_path_holder[0]
    assert write_paths.count(tmp_path) == 1
    assert not os.path.isfile(tmp_path)


def test_auth_image_still_writes_once_before_detect(reset_main_inference):
    """An authenticated image upload is opened in ``wb`` mode exactly once."""
    # Arrange
    import builtins

    import main
    from media_hash import compute_media_content_hash

    real_open = builtins.open
    wb_hits = []

    def tracking_open(file, mode="r", *args, **kwargs):
        if mode == "wb":
            wb_hits.append(os.path.abspath(str(file)))
        return real_open(file, mode, *args, **kwargs)

    img = _jpeg_bytes()
    token = _access_jwt()
    mock_post = MagicMock()
    mock_post.return_value.status_code = 201
    mock_put = MagicMock()
    mock_put.return_value.status_code = 204
    # patch.dict restores the environment on exit, so IMAGES_DIR never keeps
    # pointing at the deleted temporary directory for later tests.
    with (
        tempfile.TemporaryDirectory() as idir,
        patch.dict(os.environ, {"IMAGES_DIR": idir}),
    ):
        content_hash = compute_media_content_hash(img)
        expected_path = os.path.abspath(os.path.join(idir, f"{content_hash}.jpg"))
        client = TestClient(main.app)
        with (
            patch.object(builtins, "open", tracking_open),
            patch.object(main.http_requests, "post", mock_post),
            patch.object(main.http_requests, "put", mock_put),
            patch.object(main, "get_inference", return_value=_FakeInfVideo()),
        ):
            # Act
            r = client.post(
                "/detect",
                files={"file": ("p.jpg", img, "image/jpeg")},
                headers={"Authorization": f"Bearer {token}"},
            )
        # Assert
        assert r.status_code == 200
        assert wb_hits.count(expected_path) == 1
        with real_open(expected_path, "rb") as f:
            assert f.read() == img


def test_video_writer_runs_in_separate_thread_from_executor(reset_main_inference):
    """The request succeeds when the fake asserts the writer thread differs."""
    # Arrange
    import main

    token = _access_jwt()
    mock_post = MagicMock()
    mock_post.return_value.status_code = 201
    mock_put = MagicMock()
    mock_put.return_value.status_code = 204
    video_body = b"thr-test-" * 15
    # patch.dict restores the environment on exit, so VIDEOS_DIR never keeps
    # pointing at the deleted temporary directory for later tests.
    with (
        tempfile.TemporaryDirectory() as vd,
        patch.dict(os.environ, {"VIDEOS_DIR": vd}),
    ):
        client = TestClient(main.app)
        with (
            patch.object(main.http_requests, "post", mock_post),
            patch.object(main.http_requests, "put", mock_put),
            patch.object(main, "get_inference", return_value=_FakeInfVideoConcurrent()),
        ):
            # Act
            r = client.post(
                "/detect",
                files={"file": ("c.mp4", video_body, "video/mp4")},
                headers={"Authorization": f"Bearer {token}"},
            )
        # Assert
        assert r.status_code == 200