[AZ-178] Implement streaming video detection endpoint

- Added `/detect/video` endpoint for true streaming video detection, allowing inference to start as upload bytes arrive.
- Introduced `run_detect_video_stream` method in the inference module to handle video processing from a file-like object.
- Updated media hashing to include a new function for computing hashes directly from files with minimal I/O.
- Enhanced documentation to reflect changes in video processing and API behavior.

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-04-01 03:11:43 +03:00
parent e65d8da6a3
commit be4cab4fcb
42 changed files with 2983 additions and 29 deletions
+19
View File
@@ -199,6 +199,25 @@ cdef class Inference:
writer_done.wait()
wt.join(timeout=3600)
    cpdef run_detect_video_stream(self, object readable, AIRecognitionConfig ai_config, str media_name,
                                  object annotation_callback, object status_callback=None):
        """Run video detection from a file-like object instead of a path.

        Enables streaming inference: ``readable`` only needs to expose the
        read/seek protocol that PyAV's ``av.open`` accepts (e.g. the
        StreamingBuffer fed by the upload endpoint), so decoding can begin
        before the full file exists on disk.

        Parameters:
            readable: file-like object handed directly to ``av.open``.
            ai_config: recognition configuration for this run.
            media_name: display name of the media; spaces are stripped
                before it is used as the detection-count key.
            annotation_callback: invoked with per-frame annotations
                (stored on ``self``; consumed by ``_process_video_pyav``).
            status_callback: optional completion/status callback; may be
                ``None``.

        Returns ``None``. If the engine is unavailable the call logs and
        returns without raising.
        """
        cdef str original_media_name
        self._annotation_callback = annotation_callback
        self._status_callback = status_callback
        # Clear any stop request left over from a previous run.
        self.stop_signal = <bint>False
        self.init_ai()
        if self.engine is None:
            # init_ai() could not produce an engine (e.g. model conversion
            # still running) — bail out quietly rather than raising.
            constants_inf.log(<str> "AI engine not available. Conversion may be in progress. Skipping inference.")
            return
        original_media_name = media_name.replace(" ", "")
        # Reset per-run detection counters, keyed by the normalized name.
        self.detection_counts = {}
        self.detection_counts[original_media_name] = 0
        container = av.open(readable)
        try:
            self._process_video_pyav(ai_config, original_media_name, container)
        finally:
            # Always release the demuxer, even if processing raises.
            container.close()
cdef _process_video_pyav(self, AIRecognitionConfig ai_config, str original_media_name, object container):
cdef int frame_count = 0
cdef int batch_count = 0
+132
View File
@@ -467,6 +467,138 @@ async def detect_image(
pass
@app.post("/detect/video")
async def detect_video_upload(request: Request):
    """Streaming video detection: inference starts while the upload arrives.

    Request contract (all metadata travels in headers; the body is the raw
    video bytes):
      - ``x-filename``: original file name (default ``upload.mp4``); its
        extension must be in ``_VIDEO_EXTENSIONS`` or a 400 is raised.
      - ``x-config``: optional JSON-encoded AI recognition config.
      - ``authorization`` (Bearer) / ``x-refresh-token``: optional; when a
        valid token and user id are present, the finished file is persisted
        under its content hash and registered with the media service.
        Otherwise the temp file is deleted once inference finishes.
      - ``content-length``: when present, lets StreamingBuffer satisfy
        end-relative seeks before the upload completes.

    Returns ``{"status": "started", "mediaId": <content hash>}`` as soon as
    the request body has been fully consumed; detection events are pushed
    to SSE/websocket subscribers via ``_event_queues`` while inference
    continues in the background.
    """
    # Function-scope imports — presumably to avoid circular imports at
    # module load time; TODO confirm against the module import graph.
    from media_hash import compute_media_content_hash_from_file
    from inference import ai_config_from_dict
    from streaming_buffer import StreamingBuffer
    filename = request.headers.get("x-filename", "upload.mp4")
    config_json = request.headers.get("x-config", "")
    ext = _normalize_upload_ext(filename)
    if ext not in _VIDEO_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Expected a video file extension")
    config_dict = json.loads(config_json) if config_json else {}
    ai_cfg = ai_config_from_dict(config_dict)
    # Optional authentication: without a token the upload is "anonymous"
    # and nothing is persisted or reported to the media service.
    auth_header = request.headers.get("authorization", "")
    access_token = auth_header.removeprefix("Bearer ").strip() if auth_header else ""
    refresh_token = request.headers.get("x-refresh-token", "")
    token_mgr = TokenManager(access_token, refresh_token) if access_token else None
    user_id = TokenManager.decode_user_id(access_token) if access_token else None
    videos_dir = os.environ.get(
        "VIDEOS_DIR", os.path.join(os.getcwd(), "data", "videos")
    )
    os.makedirs(videos_dir, exist_ok=True)
    content_length = request.headers.get("content-length")
    total_size = int(content_length) if content_length else None
    # Spool the upload into a temp file that the inference thread can read
    # concurrently; total_size enables whence=2 seeks mid-upload.
    buffer = StreamingBuffer(videos_dir, total_size=total_size)
    media_name = Path(filename).stem.replace(" ", "")
    # NOTE(review): inside a coroutine, asyncio.get_running_loop() is the
    # modern equivalent; get_event_loop() still works here.
    loop = asyncio.get_event_loop()
    inf = get_inference()
    def _enqueue(event):
        # Fan the event out to every subscriber queue; drop on full rather
        # than block the event loop.
        for q in _event_queues:
            try:
                q.put_nowait(event)
            except asyncio.QueueFull:
                pass
    # The content hash isn't known until the upload finishes, so early
    # progress events use a placeholder id derived from the temp file name.
    placeholder_id = f"tmp_{os.path.basename(buffer.path)}"
    def on_annotation(annotation, percent):
        # Called from the inference worker thread: marshal per-frame
        # detections onto the event loop as a progress event.
        dtos = [detection_to_dto(d) for d in annotation.detections]
        event = DetectionEvent(
            annotations=dtos,
            mediaId=placeholder_id,
            mediaStatus="AIProcessing",
            mediaPercent=percent,
        )
        loop.call_soon_threadsafe(_enqueue, event)
    def on_status(media_name_cb, count):
        # Terminal status callback from the worker thread; the arguments
        # are unused — only a completion event is emitted.
        event = DetectionEvent(
            annotations=[],
            mediaId=placeholder_id,
            mediaStatus="AIProcessed",
            mediaPercent=100,
        )
        loop.call_soon_threadsafe(_enqueue, event)
    def run_inference():
        # Worker-thread entry point: the inference engine reads directly
        # from the (still growing) streaming buffer.
        inf.run_detect_video_stream(buffer, ai_cfg, media_name, on_annotation, on_status)
    # Kick off inference BEFORE consuming the body — this is what makes the
    # endpoint "streaming".
    inference_future = loop.run_in_executor(executor, run_inference)
    try:
        async for chunk in request.stream():
            # Offload the blocking file write to the default executor so
            # the event loop keeps servicing other requests.
            await loop.run_in_executor(None, buffer.append, chunk)
    except Exception:
        # Client disconnect / stream error: signal EOF so the inference
        # thread unblocks, then release file handles and re-raise.
        buffer.close_writer()
        buffer.close()
        raise
    buffer.close_writer()
    # Upload is complete; hash the spooled file to get the canonical id.
    content_hash = compute_media_content_hash_from_file(buffer.path)
    if not ext.startswith("."):
        ext = "." + ext
    storage_path = os.path.abspath(os.path.join(videos_dir, f"{content_hash}{ext}"))
    if token_mgr and user_id:
        # Persist under the content hash and register with the media
        # service. NOTE(review): the inference thread may still hold the
        # old path open — fine on POSIX (open fd survives rename), but
        # confirm behavior on Windows.
        os.rename(buffer.path, storage_path)
        payload = {
            "id": content_hash,
            "name": Path(filename).name,
            "path": storage_path,
            "mediaType": "Video",
            "mediaStatus": _MEDIA_STATUS_NEW,
            "userId": user_id,
        }
        bearer = token_mgr.get_valid_token()
        _post_media_record(payload, bearer)
        _put_media_status(content_hash, _MEDIA_STATUS_AI_PROCESSING, bearer)
    async def _wait_inference():
        # Background task: await inference completion, report the terminal
        # status (success or error), and clean up the buffer/temp file.
        try:
            await inference_future
            if token_mgr and user_id:
                _put_media_status(
                    content_hash, _MEDIA_STATUS_AI_PROCESSED,
                    token_mgr.get_valid_token(),
                )
            done_event = DetectionEvent(
                annotations=[],
                mediaId=content_hash,
                mediaStatus="AIProcessed",
                mediaPercent=100,
            )
            _enqueue(done_event)
        except Exception:
            # Inference failed: flag the media as errored and notify
            # subscribers; the exception itself is swallowed here.
            if token_mgr and user_id:
                _put_media_status(
                    content_hash, _MEDIA_STATUS_ERROR,
                    token_mgr.get_valid_token(),
                )
            err_event = DetectionEvent(
                annotations=[], mediaId=content_hash,
                mediaStatus="Error", mediaPercent=0,
            )
            _enqueue(err_event)
        finally:
            _active_detections.pop(content_hash, None)
            buffer.close()
            # Anonymous uploads were never renamed — delete the temp file.
            if not (token_mgr and user_id) and os.path.isfile(buffer.path):
                try:
                    os.unlink(buffer.path)
                except OSError:
                    pass
    _active_detections[content_hash] = asyncio.create_task(_wait_inference())
    return {"status": "started", "mediaId": content_hash}
def _post_annotation_to_service(token_mgr: TokenManager, media_id: str,
annotation, dtos: list[DetectionDto]):
try:
+20
View File
@@ -1,3 +1,5 @@
import os
import xxhash
@@ -14,3 +16,21 @@ def compute_media_content_hash(data: bytes, virtual: bool = False) -> str:
payload = _sampling_payload(data)
h = xxhash.xxh64(payload).hexdigest()
return f"V{h}" if virtual else h
def compute_media_content_hash_from_file(path: str, virtual: bool = False) -> str:
    """Compute the sampled content hash of a file with minimal I/O.

    Small files (under 3072 bytes) are hashed whole; anything larger
    contributes only three 1 KiB samples — head, middle, and tail — so the
    cost stays constant regardless of file size. The 8-byte little-endian
    file size is always prepended, so equal samples from different-sized
    files still hash differently. Presumably this mirrors the in-memory
    sampling of compute_media_content_hash — verify against
    _sampling_payload.

    Parameters:
        path: filesystem path of the file to hash.
        virtual: when True, prefix the digest with "V" to mark a virtual
            media entry.

    Returns the xxh64 hex digest (optionally "V"-prefixed).
    """
    file_size = os.path.getsize(path)
    prefix = file_size.to_bytes(8, "little")
    with open(path, "rb") as fh:
        if file_size < 3072:
            sample = prefix + fh.read()
        else:
            pieces = [prefix, fh.read(1024)]
            # Middle 1 KiB, centered over the span between head and tail.
            fh.seek((file_size - 1024) // 2)
            pieces.append(fh.read(1024))
            # Last 1 KiB, addressed relative to end-of-file.
            fh.seek(-1024, 2)
            pieces.append(fh.read(1024))
            sample = b"".join(pieces)
    digest = xxhash.xxh64(sample).hexdigest()
    return f"V{digest}" if virtual else digest
+93
View File
@@ -0,0 +1,93 @@
import os
import tempfile
import threading
class StreamingBuffer:
def __init__(self, temp_dir: str | None = None, total_size: int | None = None):
fd, self.path = tempfile.mkstemp(dir=temp_dir, suffix=".tmp")
os.close(fd)
self._writer = open(self.path, "wb")
self._reader = open(self.path, "rb")
self._written = 0
self._total_size = total_size
self._eof = False
self._cond = threading.Condition()
def append(self, data: bytes) -> None:
with self._cond:
self._writer.write(data)
self._writer.flush()
self._written += len(data)
self._cond.notify_all()
def close_writer(self) -> None:
with self._cond:
self._writer.close()
self._eof = True
self._cond.notify_all()
def read(self, size: int = -1) -> bytes:
with self._cond:
pos = self._reader.tell()
if size < 0:
while not self._eof:
self._cond.wait()
to_read = self._written - pos
else:
while self._written <= pos and not self._eof:
self._cond.wait()
available = self._written - pos
if available <= 0:
return b""
to_read = min(size, available)
return self._reader.read(to_read)
def seek(self, offset: int, whence: int = 0) -> int:
with self._cond:
if whence == 2:
if self._total_size is not None:
target = self._total_size + offset
self._reader.seek(target, 0)
return target
while not self._eof:
self._cond.wait()
return self._reader.seek(offset, 2)
if whence == 1:
target = self._reader.tell() + offset
else:
target = offset
if target <= self._written:
return self._reader.seek(offset, whence)
if self._total_size is not None:
self._reader.seek(target, 0)
return target
while target > self._written and not self._eof:
self._cond.wait()
return self._reader.seek(offset, whence)
def tell(self) -> int:
return self._reader.tell()
def readable(self) -> bool:
return True
def seekable(self) -> bool:
return True
def writable(self) -> bool:
return False
def close(self) -> None:
try:
self._reader.close()
except Exception:
pass
try:
self._writer.close()
except Exception:
pass
@property
def written(self) -> int:
return self._written