[AZ-175] Media table integration with XxHash64 content hashing and status lifecycle

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-03-31 06:36:56 +03:00
parent 6c24d09eab
commit 40be55ac03
10 changed files with 381 additions and 30 deletions
@@ -0,0 +1,21 @@
# Batch Report
**Batch**: 5 (Feature Batch 1)
**Tasks**: AZ-173, AZ-174
**Date**: 2026-03-31
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|---------------|-------|-------------|--------|
| AZ-173_stream_based_run_detect | Done | 3 files | 2/2 unit, e2e pass | 5/5 ACs covered | None |
| AZ-174_db_driven_ai_config | Done | 8 files | 9/9 unit, 37/37 e2e | 5/5 ACs covered | None |
## AC Test Coverage: All covered
## Code Review Verdict: PASS_WITH_WARNINGS
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Commit: 6c24d09
## Next Batch: AZ-175 (media_table_integration)
+42 -2
View File
@@ -6,6 +6,7 @@ app = Flask(__name__)
_mode = "normal"
_annotations: list = []
_media_store: dict = {}
def _fail():
@@ -46,12 +47,44 @@ def user_ai_settings(user_id):
}
@app.route("/api/media", methods=["POST"])
def create_media():
    """Mock endpoint: remember a posted media record, keyed by its ``id``.

    Responds 503 while the mock is in a failing mode, 400 when the JSON body
    has no truthy ``id``, and otherwise echoes the body back with 201.
    """
    if _fail():
        return "", 503
    record = request.get_json(silent=True) or {}
    media_id = record.get("id")
    if media_id:
        # Store a copy so later mutation of the echoed body cannot leak in.
        _media_store[str(media_id)] = dict(record)
        return record, 201
    return "", 400
@app.route("/api/media/<media_id>/status", methods=["PUT"])
def update_media_status(media_id):
    """Mock endpoint: record the ``mediaStatus`` of a media item.

    An unknown ``media_id`` gets a minimal ``{"id": ...}`` record created on
    the fly. A body without ``mediaStatus`` leaves the stored status alone.
    Responds 503 while the mock is in a failing mode, otherwise 204.
    """
    if _fail():
        return "", 503
    payload = request.get_json(silent=True) or {}
    new_status = payload.get("mediaStatus")
    store_key = str(media_id)
    record = _media_store.get(store_key)
    if record is None:
        record = _media_store[store_key] = {"id": store_key}
    if new_status is not None:
        record["mediaStatus"] = new_status
    return "", 204
@app.route("/api/media/<media_id>", methods=["GET"])
def media_path(media_id):
    """Mock endpoint: resolve the file path for a media id.

    A record previously stored via ``POST /api/media`` with a non-empty
    ``path`` wins. Otherwise a fixture under ``MEDIA_DIR`` (default
    ``/media``) is returned: the test video for ids starting with ``sse-`` or
    ``video-``, the small test image for everything else.
    Responds 503 while the mock is in a failing mode.
    """
    if _fail():
        return "", 503
    key = str(media_id)
    rec = _media_store.get(key)
    if rec and rec.get("path"):
        return {"path": rec["path"]}
    root = os.environ.get("MEDIA_DIR", "/media")
    # Single prefix check on the normalised key (replaces the older duplicate
    # check on the raw ``media_id``).
    if key.startswith(("sse-", "video-")):
        return {"path": f"{root}/video_test01.mp4"}
    return {"path": f"{root}/image_small.jpg"}
@@ -69,9 +102,10 @@ def mock_config():
@app.route("/mock/reset", methods=["POST"])
def mock_reset():
    """Mock control endpoint: restore normal mode and drop all stored state."""
    # Only ``_mode`` is rebound; the containers are cleared in place, so they
    # do not need a ``global`` declaration.
    global _mode
    _mode = "normal"
    _annotations.clear()
    _media_store.clear()
    return "", 200
@@ -81,9 +115,15 @@ def mock_status():
"mode": _mode,
"annotation_count": len(_annotations),
"annotations": list(_annotations),
"media_count": len(_media_store),
}
@app.route("/mock/annotations", methods=["GET"])
def mock_annotations_list():
    """Mock inspection endpoint: return a snapshot of the recorded annotations."""
    snapshot = list(_annotations)
    return {"annotations": snapshot}
@app.route("/mock/media", methods=["GET"])
def mock_media_list():
    """Mock inspection endpoint: return a shallow copy of the stored media records."""
    snapshot = dict(_media_store)
    return {"media": snapshot}
+1
View File
@@ -9,3 +9,4 @@ requests==2.32.4
loguru==0.7.3
python-multipart
av==14.2.0
xxhash==3.5.0
+202 -20
View File
@@ -1,11 +1,17 @@
import asyncio
import base64
import io
import json
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Annotated, Optional
import av
import cv2
import numpy as np
import requests as http_requests
from fastapi import Body, FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.responses import StreamingResponse
@@ -19,6 +25,14 @@ executor = ThreadPoolExecutor(max_workers=2)
LOADER_URL = os.environ.get("LOADER_URL", "http://loader:8080")
ANNOTATIONS_URL = os.environ.get("ANNOTATIONS_URL", "http://annotations:8080")
# Media lifecycle status codes sent to the annotations service as "mediaStatus".
# NOTE(review): the numeric values presumably mirror an enum owned by the
# annotations service (4 and 5 are not used in this file) — confirm before changing.
_MEDIA_STATUS_NEW = 1  # sent with the initial media record on upload
_MEDIA_STATUS_AI_PROCESSING = 2  # reported just before inference starts
_MEDIA_STATUS_AI_PROCESSED = 3  # reported after inference completes
_MEDIA_STATUS_ERROR = 6  # reported when inference raises
# Lower-case file suffixes (with leading dot) used to classify media by name.
_VIDEO_EXTENSIONS = frozenset({".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v"})
_IMAGE_EXTENSIONS = frozenset({".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff"})
loader_client = LoaderHttpClient(LOADER_URL)
annotations_client = LoaderHttpClient(ANNOTATIONS_URL)
inference = None
@@ -214,11 +228,66 @@ def _merged_annotation_settings_payload(raw: object) -> dict:
return out
def _build_media_detect_config_dict(
def _normalize_upload_ext(filename: str) -> str:
s = Path(filename or "").suffix.lower()
return s if s else ""
def _detect_upload_kind(filename: str, data: bytes) -> tuple[str, str]:
    """Classify an upload as ``"image"`` or ``"video"`` and pick its extension.

    A recognised file suffix decides the kind without inspecting *data*.
    Otherwise the bytes are probed: first as an image via OpenCV, then as a
    video container via PyAV.

    Returns:
        ``(kind, ext)`` where *ext* is the original suffix, or ``".jpg"`` /
        ``".mp4"`` when the filename had none.

    Raises:
        HTTPException: 400 when the bytes are empty or decode as neither an
            image nor a video.
    """
    ext = _normalize_upload_ext(filename)
    if ext in _VIDEO_EXTENSIONS:
        return "video", ext
    if ext in _IMAGE_EXTENSIONS:
        return "image", ext
    if not data:
        # cv2.imdecode rejects an empty buffer with an exception rather than
        # returning None, so answer with the same 400 as for undecodable data.
        raise HTTPException(status_code=400, detail="Invalid image or video data")
    arr = np.frombuffer(data, dtype=np.uint8)
    if cv2.imdecode(arr, cv2.IMREAD_COLOR) is not None:
        return "image", ext or ".jpg"
    try:
        # Opening the container is enough to validate it; nothing is decoded.
        with av.open(io.BytesIO(data)):
            pass
    except Exception as exc:
        raise HTTPException(
            status_code=400, detail="Invalid image or video data"
        ) from exc
    return "video", ext or ".mp4"
def _is_video_media_path(media_path: str) -> bool:
    """Tell whether *media_path* has one of the known video suffixes (case-insensitive)."""
    suffix = Path(media_path).suffix
    return suffix.lower() in _VIDEO_EXTENSIONS
def _post_media_record(payload: dict, bearer: str) -> bool:
    """POST a media record to the annotations service (best effort).

    Args:
        payload: JSON body describing the media record.
        bearer: access token sent as ``Authorization: Bearer <token>``.

    Returns:
        True when the service answers 200 or 201; False on any other status
        or on any exception (errors are deliberately not propagated).
    """
    try:
        response = http_requests.post(
            f"{ANNOTATIONS_URL}/api/media",
            json=payload,
            headers={"Authorization": f"Bearer {bearer}"},
            timeout=30,
        )
        accepted = response.status_code in (200, 201)
    except Exception:
        accepted = False
    return accepted
def _put_media_status(media_id: str, media_status: int, bearer: str) -> bool:
    """PUT a new lifecycle status for a media item (best effort).

    Args:
        media_id: identifier of the media record; percent-encoded before it is
            placed in the URL path.
        media_status: status code sent as ``{"mediaStatus": <code>}``.
        bearer: access token sent as ``Authorization: Bearer <token>``.

    Returns:
        True when the service answers 200 or 204; False on any other status
        or on any exception (errors are deliberately not propagated).
    """
    from urllib.parse import quote

    try:
        # media_id can originate from a request path; escape it so characters
        # such as "?", "#" or "/" cannot redirect the authenticated request.
        safe_id = quote(str(media_id), safe="")
        headers = {"Authorization": f"Bearer {bearer}"}
        r = http_requests.put(
            f"{ANNOTATIONS_URL}/api/media/{safe_id}/status",
            json={"mediaStatus": media_status},
            headers=headers,
            timeout=30,
        )
        return r.status_code in (200, 204)
    except Exception:
        return False
def _resolve_media_for_detect(
media_id: str,
token_mgr: Optional[TokenManager],
override: Optional[AIConfigDto],
) -> dict:
) -> tuple[dict, str]:
cfg: dict = {}
bearer = ""
if token_mgr:
@@ -236,8 +305,7 @@ def _build_media_detect_config_dict(
status_code=503,
detail="Could not resolve media path from annotations service",
)
cfg["paths"] = [media_path]
return cfg
return cfg, media_path
def detection_to_dto(det) -> DetectionDto:
@@ -279,49 +347,120 @@ def health() -> HealthResponse:
@app.post("/detect")
async def detect_image(
    request: Request,
    file: UploadFile = File(...),
    config: Optional[str] = Form(None),
):
    """Run detection on an uploaded image or video and return its detections.

    When the request carries an access token from which a user id can be
    decoded, the upload is also written to ``IMAGES_DIR``/``VIDEOS_DIR`` under
    its content hash, registered with the annotations service, and its status
    is moved through AI_PROCESSING to AI_PROCESSED (or ERROR on failure).
    Without a usable token the upload is processed only; a video is then
    spooled to a temporary file that is removed afterwards.

    Args:
        request: incoming request; ``authorization`` and ``x-refresh-token``
            headers are read from it.
        file: the uploaded media.
        config: optional JSON string with AI configuration overrides.

    Returns:
        A list of detection DTOs collected from all annotations.

    Raises:
        HTTPException: 400 for an empty upload, undecodable media, malformed
            ``config`` JSON or a ``ValueError`` from inference; 503 when
            inference reports it is "not available"; 422 for any other
            ``RuntimeError`` from inference.
    """
    from media_hash import compute_media_content_hash
    from inference import ai_config_from_dict

    image_bytes = await file.read()
    if not image_bytes:
        raise HTTPException(status_code=400, detail="Image is empty")

    orig_name = file.filename or "upload"
    kind, ext = _detect_upload_kind(orig_name, image_bytes)

    config_dict = {}
    if config:
        try:
            config_dict = json.loads(config)
        except json.JSONDecodeError as exc:
            # Malformed client input is a client error, not a server fault.
            raise HTTPException(
                status_code=400, detail=f"Invalid config JSON: {exc}"
            ) from exc

    auth_header = request.headers.get("authorization", "")
    access_token = auth_header.removeprefix("Bearer ").strip() if auth_header else ""
    refresh_token = request.headers.get("x-refresh-token", "")
    token_mgr = TokenManager(access_token, refresh_token) if access_token else None
    user_id = TokenManager.decode_user_id(access_token) if access_token else None

    videos_dir = os.environ.get(
        "VIDEOS_DIR", os.path.join(os.getcwd(), "data", "videos")
    )
    images_dir = os.environ.get(
        "IMAGES_DIR", os.path.join(os.getcwd(), "data", "images")
    )

    storage_path = None
    content_hash = None
    if token_mgr and user_id:
        content_hash = compute_media_content_hash(image_bytes)
        base = videos_dir if kind == "video" else images_dir
        if not ext.startswith("."):
            ext = "." + ext
        storage_path = os.path.abspath(os.path.join(base, f"{content_hash}{ext}"))
        payload = {
            "id": content_hash,
            "name": Path(orig_name).name,
            "path": storage_path,
            "mediaType": "Video" if kind == "video" else "Image",
            "mediaStatus": _MEDIA_STATUS_NEW,
            "userId": user_id,
        }

        def persist_and_register():
            os.makedirs(base, exist_ok=True)
            with open(storage_path, "wb") as out:
                out.write(image_bytes)
            bearer = token_mgr.get_valid_token()
            _post_media_record(payload, bearer)
            _put_media_status(content_hash, _MEDIA_STATUS_AI_PROCESSING, bearer)

        # Disk and HTTP I/O are blocking; keep them off the event loop.
        await asyncio.to_thread(persist_and_register)

    async def report_status(status: int) -> None:
        # Status updates only apply to uploads that were registered above.
        if token_mgr and user_id and content_hash:
            await asyncio.to_thread(
                lambda: _put_media_status(
                    content_hash, status, token_mgr.get_valid_token()
                )
            )

    media_name = Path(orig_name).stem.replace(" ", "")
    loop = asyncio.get_running_loop()
    inf = get_inference()
    results = []
    tmp_video_path = None

    def on_annotation(annotation, percent):
        results.extend(annotation.detections)

    ai_cfg = ai_config_from_dict(config_dict)

    def run_upload():
        nonlocal tmp_video_path
        if kind == "video":
            if storage_path:
                save = storage_path
            else:
                # No persistent copy was stored: spool the video to a temp file.
                suf = ext if ext.startswith(".") else ".mp4"
                fd, tmp_video_path = tempfile.mkstemp(suffix=suf)
                os.close(fd)
                with open(tmp_video_path, "wb") as f:
                    f.write(image_bytes)
                save = tmp_video_path
            inf.run_detect_video(image_bytes, ai_cfg, media_name, save, on_annotation)
        else:
            inf.run_detect_image(image_bytes, ai_cfg, media_name, on_annotation)

    try:
        await loop.run_in_executor(executor, run_upload)
        await report_status(_MEDIA_STATUS_AI_PROCESSED)
        return [detection_to_dto(d) for d in results]
    except RuntimeError as e:
        await report_status(_MEDIA_STATUS_ERROR)
        if "not available" in str(e):
            raise HTTPException(status_code=503, detail=str(e)) from e
        raise HTTPException(status_code=422, detail=str(e)) from e
    except ValueError as e:
        await report_status(_MEDIA_STATUS_ERROR)
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception:
        await report_status(_MEDIA_STATUS_ERROR)
        raise
    finally:
        if tmp_video_path and os.path.isfile(tmp_video_path):
            try:
                os.unlink(tmp_video_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask the result.
                pass
def _post_annotation_to_service(token_mgr: TokenManager, media_id: str,
@@ -362,7 +501,7 @@ async def detect_media(
refresh_token = request.headers.get("x-refresh-token", "")
token_mgr = TokenManager(access_token, refresh_token) if access_token else None
config_dict = _build_media_detect_config_dict(media_id, token_mgr, config)
config_dict, media_path = _resolve_media_for_detect(media_id, token_mgr, config)
async def run_detection():
loop = asyncio.get_event_loop()
@@ -375,6 +514,22 @@ async def detect_media(
pass
try:
from inference import ai_config_from_dict
if token_mgr:
_put_media_status(
media_id,
_MEDIA_STATUS_AI_PROCESSING,
token_mgr.get_valid_token(),
)
with open(media_path, "rb") as mf:
file_bytes = mf.read()
video = _is_video_media_path(media_path)
stem_name = Path(media_path).stem.replace(" ", "")
ai_cfg = ai_config_from_dict(config_dict)
inf = get_inference()
if not inf.is_engine_ready:
raise RuntimeError("Detection service unavailable")
@@ -391,7 +546,7 @@ async def detect_media(
if token_mgr and dtos:
_post_annotation_to_service(token_mgr, media_id, annotation, dtos)
def on_status(media_name, count):
def on_status(media_name_cb, count):
event = DetectionEvent(
annotations=[],
mediaId=media_id,
@@ -399,11 +554,38 @@ async def detect_media(
mediaPercent=100,
)
loop.call_soon_threadsafe(_enqueue, event)
if token_mgr:
_put_media_status(
media_id,
_MEDIA_STATUS_AI_PROCESSED,
token_mgr.get_valid_token(),
)
await loop.run_in_executor(
executor, inf.run_detect, config_dict, on_annotation, on_status
)
def run_sync():
if video:
inf.run_detect_video(
file_bytes,
ai_cfg,
stem_name,
media_path,
on_annotation,
on_status,
)
else:
inf.run_detect_image(
file_bytes,
ai_cfg,
stem_name,
on_annotation,
on_status,
)
await loop.run_in_executor(executor, run_sync)
except Exception:
if token_mgr:
_put_media_status(
media_id, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token()
)
error_event = DetectionEvent(
annotations=[],
mediaId=media_id,
+16
View File
@@ -0,0 +1,16 @@
import xxhash
def _sampling_payload(data: bytes) -> bytes:
n = len(data)
size_bytes = n.to_bytes(8, "little")
if n < 3072:
return size_bytes + data
mid = (n - 1024) // 2
return size_bytes + data[0:1024] + data[mid : mid + 1024] + data[-1024:]
def compute_media_content_hash(data: bytes, virtual: bool = False) -> str:
    """Return the XXH64 hex digest of the sampled payload of *data*.

    When *virtual* is true the digest is prefixed with ``"V"``.
    """
    digest = xxhash.xxh64(_sampling_payload(data)).hexdigest()
    if virtual:
        return f"V{digest}"
    return digest
+10 -8
View File
@@ -74,7 +74,7 @@ def test_merged_annotation_nested_sections():
assert out["altitude"] == 100
def test_build_media_detect_config_uses_api_path_and_defaults_when_api_empty():
def test_resolve_media_for_detect_uses_api_path_and_defaults_when_api_empty():
# Arrange
import main
@@ -84,13 +84,14 @@ def test_build_media_detect_config_uses_api_path_and_defaults_when_api_empty():
mock_ann.fetch_media_path.return_value = "/m/file.jpg"
with patch("main.annotations_client", mock_ann):
# Act
cfg = main._build_media_detect_config_dict("mid-1", tm, None)
cfg, path = main._resolve_media_for_detect("mid-1", tm, None)
# Assert
assert cfg["paths"] == ["/m/file.jpg"]
assert path == "/m/file.jpg"
assert "paths" not in cfg
assert "probability_threshold" not in cfg
def test_build_media_detect_config_override_wins():
def test_resolve_media_for_detect_override_wins():
# Arrange
import main
@@ -104,14 +105,15 @@ def test_build_media_detect_config_override_wins():
mock_ann.fetch_media_path.return_value = "/m/v.mp4"
with patch("main.annotations_client", mock_ann):
# Act
cfg = main._build_media_detect_config_dict("vid-1", tm, override)
cfg, path = main._resolve_media_for_detect("vid-1", tm, override)
# Assert
assert cfg["probability_threshold"] == 0.99
assert cfg["altitude"] == 500
assert cfg["paths"] == ["/m/v.mp4"]
assert path == "/m/v.mp4"
assert "paths" not in cfg
def test_build_media_detect_config_raises_when_no_media_path():
def test_resolve_media_for_detect_raises_when_no_media_path():
# Arrange
import main
@@ -122,5 +124,5 @@ def test_build_media_detect_config_raises_when_no_media_path():
with patch("main.annotations_client", mock_ann):
# Act / Assert
with pytest.raises(HTTPException) as exc:
main._build_media_detect_config_dict("missing", tm, None)
main._resolve_media_for_detect("missing", tm, None)
assert exc.value.status_code == 503
+43
View File
@@ -0,0 +1,43 @@
from unittest.mock import MagicMock, patch
def test_post_media_record_json_and_auth():
    """_post_media_record sends the payload as JSON with a Bearer header."""
    # Arrange
    import main

    record = {
        "id": "h1",
        "name": "a.jpg",
        "path": "/x/a.jpg",
        "mediaType": "Image",
        "mediaStatus": 1,
        "userId": "u1",
    }
    response = MagicMock()
    response.status_code = 201
    with patch.object(main.http_requests, "post", return_value=response) as post_mock:
        # Act
        result = main._post_media_record(record, "tok")
    # Assert
    assert result is True
    post_mock.assert_called_once()
    _, call_kwargs = post_mock.call_args
    assert call_kwargs["json"] == record
    assert call_kwargs["headers"]["Authorization"] == "Bearer tok"
def test_put_media_status_json():
    """_put_media_status PUTs {"mediaStatus": n} to the media status URL."""
    # Arrange
    import main

    response = MagicMock()
    response.status_code = 204
    with patch.object(main.http_requests, "put", return_value=response) as put_mock:
        # Act
        result = main._put_media_status("mid", 2, "t")
    # Assert
    assert result is True
    put_mock.assert_called_once()
    call_args, call_kwargs = put_mock.call_args
    assert call_kwargs["json"] == {"mediaStatus": 2}
    assert "/api/media/mid/status" in call_args[0]
+46
View File
@@ -0,0 +1,46 @@
import xxhash
def test_compute_media_content_hash_small_file():
    """Data under the sampling threshold is hashed whole, after its length prefix."""
    # Arrange
    from media_hash import compute_media_content_hash

    content = b"x" * 100
    length_prefix = len(content).to_bytes(8, "little")
    expected = xxhash.xxh64(length_prefix + content).hexdigest()
    # Act
    actual = compute_media_content_hash(content)
    # Assert
    assert actual == expected
def test_compute_media_content_hash_large_file():
    """Large data is hashed from its length plus head, middle and tail samples."""
    # Arrange
    from media_hash import compute_media_content_hash

    content = (bytes(range(256)) * 20)[:5000]
    size = len(content)
    middle_start = (size - 1024) // 2
    head = content[0:1024]
    middle = content[middle_start : middle_start + 1024]
    tail = content[-1024:]
    sampled = size.to_bytes(8, "little") + head + middle + tail
    expected = xxhash.xxh64(sampled).hexdigest()
    # Act
    actual = compute_media_content_hash(content)
    # Assert
    assert actual == expected
def test_compute_media_content_hash_virtual_prefix():
    """A virtual hash is the plain hash with a leading "V"."""
    # Arrange
    from media_hash import compute_media_content_hash

    # Act
    virtual_hash = compute_media_content_hash(b"abc", virtual=True)
    plain_hash = compute_media_content_hash(b"abc", virtual=False)
    # Assert
    assert virtual_hash.startswith("V")
    assert not plain_hash.startswith("V")
    assert virtual_hash == "V" + plain_hash