[AZ-175] Media table integration with XxHash64 content hashing and status lifecycle

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-03-31 06:36:56 +03:00
parent 6c24d09eab
commit 40be55ac03
10 changed files with 381 additions and 30 deletions
+202 -20
View File
@@ -1,11 +1,17 @@
import asyncio
import base64
import io
import json
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Annotated, Optional
import av
import cv2
import numpy as np
import requests as http_requests
from fastapi import Body, FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.responses import StreamingResponse
@@ -19,6 +25,14 @@ executor = ThreadPoolExecutor(max_workers=2)
LOADER_URL = os.environ.get("LOADER_URL", "http://loader:8080")
ANNOTATIONS_URL = os.environ.get("ANNOTATIONS_URL", "http://annotations:8080")
_MEDIA_STATUS_NEW = 1
_MEDIA_STATUS_AI_PROCESSING = 2
_MEDIA_STATUS_AI_PROCESSED = 3
_MEDIA_STATUS_ERROR = 6
_VIDEO_EXTENSIONS = frozenset({".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v"})
_IMAGE_EXTENSIONS = frozenset({".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff"})
loader_client = LoaderHttpClient(LOADER_URL)
annotations_client = LoaderHttpClient(ANNOTATIONS_URL)
inference = None
@@ -214,11 +228,66 @@ def _merged_annotation_settings_payload(raw: object) -> dict:
return out
def _build_media_detect_config_dict(
def _normalize_upload_ext(filename: str) -> str:
s = Path(filename or "").suffix.lower()
return s if s else ""
def _detect_upload_kind(filename: str, data: bytes) -> tuple[str, str]:
ext = _normalize_upload_ext(filename)
if ext in _VIDEO_EXTENSIONS:
return "video", ext
if ext in _IMAGE_EXTENSIONS:
return "image", ext
arr = np.frombuffer(data, dtype=np.uint8)
if cv2.imdecode(arr, cv2.IMREAD_COLOR) is not None:
return "image", ext if ext else ".jpg"
try:
bio = io.BytesIO(data)
with av.open(bio):
pass
return "video", ext if ext else ".mp4"
except Exception:
raise HTTPException(status_code=400, detail="Invalid image or video data")
def _is_video_media_path(media_path: str) -> bool:
return Path(media_path).suffix.lower() in _VIDEO_EXTENSIONS
def _post_media_record(payload: dict, bearer: str) -> bool:
try:
headers = {"Authorization": f"Bearer {bearer}"}
r = http_requests.post(
f"{ANNOTATIONS_URL}/api/media",
json=payload,
headers=headers,
timeout=30,
)
return r.status_code in (200, 201)
except Exception:
return False
def _put_media_status(media_id: str, media_status: int, bearer: str) -> bool:
try:
headers = {"Authorization": f"Bearer {bearer}"}
r = http_requests.put(
f"{ANNOTATIONS_URL}/api/media/{media_id}/status",
json={"mediaStatus": media_status},
headers=headers,
timeout=30,
)
return r.status_code in (200, 204)
except Exception:
return False
def _resolve_media_for_detect(
media_id: str,
token_mgr: Optional[TokenManager],
override: Optional[AIConfigDto],
) -> dict:
) -> tuple[dict, str]:
cfg: dict = {}
bearer = ""
if token_mgr:
@@ -236,8 +305,7 @@ def _build_media_detect_config_dict(
status_code=503,
detail="Could not resolve media path from annotations service",
)
cfg["paths"] = [media_path]
return cfg
return cfg, media_path
def detection_to_dto(det) -> DetectionDto:
@@ -279,49 +347,120 @@ def health() -> HealthResponse:
@app.post("/detect")
async def detect_image(
request: Request,
file: UploadFile = File(...),
config: Optional[str] = Form(None),
):
import cv2
import numpy as np
from pathlib import Path
from media_hash import compute_media_content_hash
from inference import ai_config_from_dict
image_bytes = await file.read()
if not image_bytes:
raise HTTPException(status_code=400, detail="Image is empty")
arr = np.frombuffer(image_bytes, dtype=np.uint8)
if cv2.imdecode(arr, cv2.IMREAD_COLOR) is None:
raise HTTPException(status_code=400, detail="Invalid image data")
orig_name = file.filename or "upload"
kind, ext = _detect_upload_kind(orig_name, image_bytes)
config_dict = {}
if config:
config_dict = json.loads(config)
media_name = Path(file.filename or "upload.jpg").stem.replace(" ", "")
auth_header = request.headers.get("authorization", "")
access_token = auth_header.removeprefix("Bearer ").strip() if auth_header else ""
refresh_token = request.headers.get("x-refresh-token", "")
token_mgr = TokenManager(access_token, refresh_token) if access_token else None
user_id = TokenManager.decode_user_id(access_token) if access_token else None
videos_dir = os.environ.get(
"VIDEOS_DIR", os.path.join(os.getcwd(), "data", "videos")
)
images_dir = os.environ.get(
"IMAGES_DIR", os.path.join(os.getcwd(), "data", "images")
)
storage_path = None
content_hash = None
if token_mgr and user_id:
content_hash = compute_media_content_hash(image_bytes)
base = videos_dir if kind == "video" else images_dir
os.makedirs(base, exist_ok=True)
if not ext.startswith("."):
ext = "." + ext
storage_path = os.path.abspath(os.path.join(base, f"{content_hash}{ext}"))
with open(storage_path, "wb") as out:
out.write(image_bytes)
mt = "Video" if kind == "video" else "Image"
payload = {
"id": content_hash,
"name": Path(orig_name).name,
"path": storage_path,
"mediaType": mt,
"mediaStatus": _MEDIA_STATUS_NEW,
"userId": user_id,
}
bearer = token_mgr.get_valid_token()
_post_media_record(payload, bearer)
_put_media_status(content_hash, _MEDIA_STATUS_AI_PROCESSING, bearer)
media_name = Path(orig_name).stem.replace(" ", "")
loop = asyncio.get_event_loop()
inf = get_inference()
results = []
tmp_video_path = None
def on_annotation(annotation, percent):
results.extend(annotation.detections)
ai_cfg = ai_config_from_dict(config_dict)
def run_img():
inf.run_detect_image(image_bytes, ai_cfg, media_name, on_annotation)
def run_upload():
nonlocal tmp_video_path
if kind == "video":
if storage_path:
save = storage_path
else:
suf = ext if ext.startswith(".") else ".mp4"
fd, tmp_video_path = tempfile.mkstemp(suffix=suf)
os.close(fd)
with open(tmp_video_path, "wb") as f:
f.write(image_bytes)
save = tmp_video_path
inf.run_detect_video(image_bytes, ai_cfg, media_name, save, on_annotation)
else:
inf.run_detect_image(image_bytes, ai_cfg, media_name, on_annotation)
try:
await loop.run_in_executor(executor, run_img)
await loop.run_in_executor(executor, run_upload)
if token_mgr and user_id and content_hash:
_put_media_status(
content_hash, _MEDIA_STATUS_AI_PROCESSED, token_mgr.get_valid_token()
)
return [detection_to_dto(d) for d in results]
except RuntimeError as e:
if token_mgr and user_id and content_hash:
_put_media_status(
content_hash, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token()
)
if "not available" in str(e):
raise HTTPException(status_code=503, detail=str(e))
raise HTTPException(status_code=422, detail=str(e))
except ValueError as e:
if token_mgr and user_id and content_hash:
_put_media_status(
content_hash, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token()
)
raise HTTPException(status_code=400, detail=str(e))
except Exception:
if token_mgr and user_id and content_hash:
_put_media_status(
content_hash, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token()
)
raise
finally:
if tmp_video_path and os.path.isfile(tmp_video_path):
try:
os.unlink(tmp_video_path)
except OSError:
pass
def _post_annotation_to_service(token_mgr: TokenManager, media_id: str,
@@ -362,7 +501,7 @@ async def detect_media(
refresh_token = request.headers.get("x-refresh-token", "")
token_mgr = TokenManager(access_token, refresh_token) if access_token else None
config_dict = _build_media_detect_config_dict(media_id, token_mgr, config)
config_dict, media_path = _resolve_media_for_detect(media_id, token_mgr, config)
async def run_detection():
loop = asyncio.get_event_loop()
@@ -375,6 +514,22 @@ async def detect_media(
pass
try:
from inference import ai_config_from_dict
if token_mgr:
_put_media_status(
media_id,
_MEDIA_STATUS_AI_PROCESSING,
token_mgr.get_valid_token(),
)
with open(media_path, "rb") as mf:
file_bytes = mf.read()
video = _is_video_media_path(media_path)
stem_name = Path(media_path).stem.replace(" ", "")
ai_cfg = ai_config_from_dict(config_dict)
inf = get_inference()
if not inf.is_engine_ready:
raise RuntimeError("Detection service unavailable")
@@ -391,7 +546,7 @@ async def detect_media(
if token_mgr and dtos:
_post_annotation_to_service(token_mgr, media_id, annotation, dtos)
def on_status(media_name, count):
def on_status(media_name_cb, count):
event = DetectionEvent(
annotations=[],
mediaId=media_id,
@@ -399,11 +554,38 @@ async def detect_media(
mediaPercent=100,
)
loop.call_soon_threadsafe(_enqueue, event)
if token_mgr:
_put_media_status(
media_id,
_MEDIA_STATUS_AI_PROCESSED,
token_mgr.get_valid_token(),
)
await loop.run_in_executor(
executor, inf.run_detect, config_dict, on_annotation, on_status
)
def run_sync():
if video:
inf.run_detect_video(
file_bytes,
ai_cfg,
stem_name,
media_path,
on_annotation,
on_status,
)
else:
inf.run_detect_image(
file_bytes,
ai_cfg,
stem_name,
on_annotation,
on_status,
)
await loop.run_in_executor(executor, run_sync)
except Exception:
if token_mgr:
_put_media_status(
media_id, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token()
)
error_event = DetectionEvent(
annotations=[],
mediaId=media_id,
+16
View File
@@ -0,0 +1,16 @@
import xxhash
def _sampling_payload(data: bytes) -> bytes:
n = len(data)
size_bytes = n.to_bytes(8, "little")
if n < 3072:
return size_bytes + data
mid = (n - 1024) // 2
return size_bytes + data[0:1024] + data[mid : mid + 1024] + data[-1024:]
def compute_media_content_hash(data: bytes, virtual: bool = False) -> str:
payload = _sampling_payload(data)
h = xxhash.xxh64(payload).hexdigest()
return f"V{h}" if virtual else h