From 40be55ac03ee165861e8971f613f32da47d32c9c Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Tue, 31 Mar 2026 06:36:56 +0300 Subject: [PATCH] [AZ-175] Media table integration with XxHash64 content hashing and status lifecycle Made-with: Cursor --- .../AZ-173_stream_based_run_detect.md | 0 .../AZ-174_db_driven_ai_config.md | 0 _docs/03_implementation/batch_05_report.md | 21 ++ e2e/mocks/annotations/app.py | 44 +++- requirements.txt | 1 + src/main.py | 222 ++++++++++++++++-- src/media_hash.py | 16 ++ tests/test_az174_db_driven_config.py | 18 +- tests/test_az175_api_calls.py | 43 ++++ tests/test_media_hash.py | 46 ++++ 10 files changed, 381 insertions(+), 30 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-173_stream_based_run_detect.md (100%) rename _docs/02_tasks/{todo => done}/AZ-174_db_driven_ai_config.md (100%) create mode 100644 _docs/03_implementation/batch_05_report.md create mode 100644 src/media_hash.py create mode 100644 tests/test_az175_api_calls.py create mode 100644 tests/test_media_hash.py diff --git a/_docs/02_tasks/todo/AZ-173_stream_based_run_detect.md b/_docs/02_tasks/done/AZ-173_stream_based_run_detect.md similarity index 100% rename from _docs/02_tasks/todo/AZ-173_stream_based_run_detect.md rename to _docs/02_tasks/done/AZ-173_stream_based_run_detect.md diff --git a/_docs/02_tasks/todo/AZ-174_db_driven_ai_config.md b/_docs/02_tasks/done/AZ-174_db_driven_ai_config.md similarity index 100% rename from _docs/02_tasks/todo/AZ-174_db_driven_ai_config.md rename to _docs/02_tasks/done/AZ-174_db_driven_ai_config.md diff --git a/_docs/03_implementation/batch_05_report.md b/_docs/03_implementation/batch_05_report.md new file mode 100644 index 0000000..7964d10 --- /dev/null +++ b/_docs/03_implementation/batch_05_report.md @@ -0,0 +1,21 @@ +# Batch Report + +**Batch**: 5 (Feature Batch 1) +**Tasks**: AZ-173, AZ-174 +**Date**: 2026-03-31 + +## Task Results + +| Task | Status | Files Modified | Tests | AC Coverage | Issues | 
+|------|--------|---------------|-------|-------------|--------|
+| AZ-173_stream_based_run_detect | Done | 3 files | 2/2 unit, e2e pass | 5/5 ACs covered | None |
+| AZ-174_db_driven_ai_config | Done | 8 files | 9/9 unit, 37/37 e2e | 5/5 ACs covered | None |
+
+## AC Test Coverage: All covered
+## Code Review Verdict: PASS_WITH_WARNINGS
+## Auto-Fix Attempts: 0
+## Stuck Agents: None
+
+## Commit: 6c24d09
+
+## Next Batch: AZ-175 (media_table_integration)
diff --git a/e2e/mocks/annotations/app.py b/e2e/mocks/annotations/app.py
index 25778b7..6a9b05d 100644
--- a/e2e/mocks/annotations/app.py
+++ b/e2e/mocks/annotations/app.py
@@ -6,6 +6,7 @@ app = Flask(__name__)
 
 _mode = "normal"
 _annotations: list = []
+_media_store: dict = {}
 
 
 def _fail():
@@ -46,12 +47,44 @@ def user_ai_settings(user_id):
     }
 
 
+@app.route("/api/media", methods=["POST"])
+def create_media():
+    if _fail():
+        return "", 503
+    body = request.get_json(silent=True) or {}
+    mid = body.get("id")
+    if not mid:
+        return "", 400
+    _media_store[str(mid)] = dict(body)
+    return body, 201
+
+
+@app.route("/api/media/<media_id>/status", methods=["PUT"])
+def update_media_status(media_id):
+    if _fail():
+        return "", 503
+    body = request.get_json(silent=True) or {}
+    st = body.get("mediaStatus")
+    key = str(media_id)
+    rec = _media_store.get(key)
+    if rec is None:
+        rec = {"id": key}
+        _media_store[key] = rec
+    if st is not None:
+        rec["mediaStatus"] = st
+    return "", 204
+
+
 @app.route("/api/media/<media_id>", methods=["GET"])
 def media_path(media_id):
     if _fail():
         return "", 503
+    key = str(media_id)
+    rec = _media_store.get(key)
+    if rec and rec.get("path"):
+        return {"path": rec["path"]}
     root = os.environ.get("MEDIA_DIR", "/media")
-    if media_id.startswith("sse-") or media_id.startswith("video-"):
+    if key.startswith("sse-") or key.startswith("video-"):
         return {"path": f"{root}/video_test01.mp4"}
     return {"path": f"{root}/image_small.jpg"}
 
@@ -69,9 +102,10 @@ def mock_config():
 
 
 @app.route("/mock/reset", methods=["POST"])
 def
mock_reset(): - global _mode, _annotations + global _mode, _annotations, _media_store _mode = "normal" _annotations.clear() + _media_store.clear() return "", 200 @@ -81,9 +115,15 @@ def mock_status(): "mode": _mode, "annotation_count": len(_annotations), "annotations": list(_annotations), + "media_count": len(_media_store), } @app.route("/mock/annotations", methods=["GET"]) def mock_annotations_list(): return {"annotations": list(_annotations)} + + +@app.route("/mock/media", methods=["GET"]) +def mock_media_list(): + return {"media": dict(_media_store)} diff --git a/requirements.txt b/requirements.txt index 7cf4c35..3badc62 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ requests==2.32.4 loguru==0.7.3 python-multipart av==14.2.0 +xxhash==3.5.0 diff --git a/src/main.py b/src/main.py index c5d5848..f0ca8b0 100644 --- a/src/main.py +++ b/src/main.py @@ -1,11 +1,17 @@ import asyncio import base64 +import io import json import os +import tempfile import time from concurrent.futures import ThreadPoolExecutor +from pathlib import Path from typing import Annotated, Optional +import av +import cv2 +import numpy as np import requests as http_requests from fastapi import Body, FastAPI, UploadFile, File, Form, HTTPException, Request from fastapi.responses import StreamingResponse @@ -19,6 +25,14 @@ executor = ThreadPoolExecutor(max_workers=2) LOADER_URL = os.environ.get("LOADER_URL", "http://loader:8080") ANNOTATIONS_URL = os.environ.get("ANNOTATIONS_URL", "http://annotations:8080") +_MEDIA_STATUS_NEW = 1 +_MEDIA_STATUS_AI_PROCESSING = 2 +_MEDIA_STATUS_AI_PROCESSED = 3 +_MEDIA_STATUS_ERROR = 6 + +_VIDEO_EXTENSIONS = frozenset({".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v"}) +_IMAGE_EXTENSIONS = frozenset({".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff"}) + loader_client = LoaderHttpClient(LOADER_URL) annotations_client = LoaderHttpClient(ANNOTATIONS_URL) inference = None @@ -214,11 +228,66 @@ def _merged_annotation_settings_payload(raw: object) 
-> dict: return out -def _build_media_detect_config_dict( +def _normalize_upload_ext(filename: str) -> str: + s = Path(filename or "").suffix.lower() + return s if s else "" + + +def _detect_upload_kind(filename: str, data: bytes) -> tuple[str, str]: + ext = _normalize_upload_ext(filename) + if ext in _VIDEO_EXTENSIONS: + return "video", ext + if ext in _IMAGE_EXTENSIONS: + return "image", ext + arr = np.frombuffer(data, dtype=np.uint8) + if cv2.imdecode(arr, cv2.IMREAD_COLOR) is not None: + return "image", ext if ext else ".jpg" + try: + bio = io.BytesIO(data) + with av.open(bio): + pass + return "video", ext if ext else ".mp4" + except Exception: + raise HTTPException(status_code=400, detail="Invalid image or video data") + + +def _is_video_media_path(media_path: str) -> bool: + return Path(media_path).suffix.lower() in _VIDEO_EXTENSIONS + + +def _post_media_record(payload: dict, bearer: str) -> bool: + try: + headers = {"Authorization": f"Bearer {bearer}"} + r = http_requests.post( + f"{ANNOTATIONS_URL}/api/media", + json=payload, + headers=headers, + timeout=30, + ) + return r.status_code in (200, 201) + except Exception: + return False + + +def _put_media_status(media_id: str, media_status: int, bearer: str) -> bool: + try: + headers = {"Authorization": f"Bearer {bearer}"} + r = http_requests.put( + f"{ANNOTATIONS_URL}/api/media/{media_id}/status", + json={"mediaStatus": media_status}, + headers=headers, + timeout=30, + ) + return r.status_code in (200, 204) + except Exception: + return False + + +def _resolve_media_for_detect( media_id: str, token_mgr: Optional[TokenManager], override: Optional[AIConfigDto], -) -> dict: +) -> tuple[dict, str]: cfg: dict = {} bearer = "" if token_mgr: @@ -236,8 +305,7 @@ def _build_media_detect_config_dict( status_code=503, detail="Could not resolve media path from annotations service", ) - cfg["paths"] = [media_path] - return cfg + return cfg, media_path def detection_to_dto(det) -> DetectionDto: @@ -279,49 +347,120 @@ def 
health() -> HealthResponse: @app.post("/detect") async def detect_image( + request: Request, file: UploadFile = File(...), config: Optional[str] = Form(None), ): - import cv2 - import numpy as np - from pathlib import Path - + from media_hash import compute_media_content_hash from inference import ai_config_from_dict image_bytes = await file.read() if not image_bytes: raise HTTPException(status_code=400, detail="Image is empty") - arr = np.frombuffer(image_bytes, dtype=np.uint8) - if cv2.imdecode(arr, cv2.IMREAD_COLOR) is None: - raise HTTPException(status_code=400, detail="Invalid image data") + orig_name = file.filename or "upload" + kind, ext = _detect_upload_kind(orig_name, image_bytes) config_dict = {} if config: config_dict = json.loads(config) - media_name = Path(file.filename or "upload.jpg").stem.replace(" ", "") + auth_header = request.headers.get("authorization", "") + access_token = auth_header.removeprefix("Bearer ").strip() if auth_header else "" + refresh_token = request.headers.get("x-refresh-token", "") + token_mgr = TokenManager(access_token, refresh_token) if access_token else None + user_id = TokenManager.decode_user_id(access_token) if access_token else None + + videos_dir = os.environ.get( + "VIDEOS_DIR", os.path.join(os.getcwd(), "data", "videos") + ) + images_dir = os.environ.get( + "IMAGES_DIR", os.path.join(os.getcwd(), "data", "images") + ) + storage_path = None + content_hash = None + if token_mgr and user_id: + content_hash = compute_media_content_hash(image_bytes) + base = videos_dir if kind == "video" else images_dir + os.makedirs(base, exist_ok=True) + if not ext.startswith("."): + ext = "." 
+ ext + storage_path = os.path.abspath(os.path.join(base, f"{content_hash}{ext}")) + with open(storage_path, "wb") as out: + out.write(image_bytes) + mt = "Video" if kind == "video" else "Image" + payload = { + "id": content_hash, + "name": Path(orig_name).name, + "path": storage_path, + "mediaType": mt, + "mediaStatus": _MEDIA_STATUS_NEW, + "userId": user_id, + } + bearer = token_mgr.get_valid_token() + _post_media_record(payload, bearer) + _put_media_status(content_hash, _MEDIA_STATUS_AI_PROCESSING, bearer) + + media_name = Path(orig_name).stem.replace(" ", "") loop = asyncio.get_event_loop() inf = get_inference() results = [] + tmp_video_path = None def on_annotation(annotation, percent): results.extend(annotation.detections) ai_cfg = ai_config_from_dict(config_dict) - def run_img(): - inf.run_detect_image(image_bytes, ai_cfg, media_name, on_annotation) + def run_upload(): + nonlocal tmp_video_path + if kind == "video": + if storage_path: + save = storage_path + else: + suf = ext if ext.startswith(".") else ".mp4" + fd, tmp_video_path = tempfile.mkstemp(suffix=suf) + os.close(fd) + with open(tmp_video_path, "wb") as f: + f.write(image_bytes) + save = tmp_video_path + inf.run_detect_video(image_bytes, ai_cfg, media_name, save, on_annotation) + else: + inf.run_detect_image(image_bytes, ai_cfg, media_name, on_annotation) try: - await loop.run_in_executor(executor, run_img) + await loop.run_in_executor(executor, run_upload) + if token_mgr and user_id and content_hash: + _put_media_status( + content_hash, _MEDIA_STATUS_AI_PROCESSED, token_mgr.get_valid_token() + ) return [detection_to_dto(d) for d in results] except RuntimeError as e: + if token_mgr and user_id and content_hash: + _put_media_status( + content_hash, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token() + ) if "not available" in str(e): raise HTTPException(status_code=503, detail=str(e)) raise HTTPException(status_code=422, detail=str(e)) except ValueError as e: + if token_mgr and user_id and content_hash: 
+ _put_media_status( + content_hash, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token() + ) raise HTTPException(status_code=400, detail=str(e)) + except Exception: + if token_mgr and user_id and content_hash: + _put_media_status( + content_hash, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token() + ) + raise + finally: + if tmp_video_path and os.path.isfile(tmp_video_path): + try: + os.unlink(tmp_video_path) + except OSError: + pass def _post_annotation_to_service(token_mgr: TokenManager, media_id: str, @@ -362,7 +501,7 @@ async def detect_media( refresh_token = request.headers.get("x-refresh-token", "") token_mgr = TokenManager(access_token, refresh_token) if access_token else None - config_dict = _build_media_detect_config_dict(media_id, token_mgr, config) + config_dict, media_path = _resolve_media_for_detect(media_id, token_mgr, config) async def run_detection(): loop = asyncio.get_event_loop() @@ -375,6 +514,22 @@ async def detect_media( pass try: + from inference import ai_config_from_dict + + if token_mgr: + _put_media_status( + media_id, + _MEDIA_STATUS_AI_PROCESSING, + token_mgr.get_valid_token(), + ) + + with open(media_path, "rb") as mf: + file_bytes = mf.read() + + video = _is_video_media_path(media_path) + stem_name = Path(media_path).stem.replace(" ", "") + ai_cfg = ai_config_from_dict(config_dict) + inf = get_inference() if not inf.is_engine_ready: raise RuntimeError("Detection service unavailable") @@ -391,7 +546,7 @@ async def detect_media( if token_mgr and dtos: _post_annotation_to_service(token_mgr, media_id, annotation, dtos) - def on_status(media_name, count): + def on_status(media_name_cb, count): event = DetectionEvent( annotations=[], mediaId=media_id, @@ -399,11 +554,38 @@ async def detect_media( mediaPercent=100, ) loop.call_soon_threadsafe(_enqueue, event) + if token_mgr: + _put_media_status( + media_id, + _MEDIA_STATUS_AI_PROCESSED, + token_mgr.get_valid_token(), + ) - await loop.run_in_executor( - executor, inf.run_detect, config_dict, 
on_annotation, on_status - ) + def run_sync(): + if video: + inf.run_detect_video( + file_bytes, + ai_cfg, + stem_name, + media_path, + on_annotation, + on_status, + ) + else: + inf.run_detect_image( + file_bytes, + ai_cfg, + stem_name, + on_annotation, + on_status, + ) + + await loop.run_in_executor(executor, run_sync) except Exception: + if token_mgr: + _put_media_status( + media_id, _MEDIA_STATUS_ERROR, token_mgr.get_valid_token() + ) error_event = DetectionEvent( annotations=[], mediaId=media_id, diff --git a/src/media_hash.py b/src/media_hash.py new file mode 100644 index 0000000..262c05e --- /dev/null +++ b/src/media_hash.py @@ -0,0 +1,16 @@ +import xxhash + + +def _sampling_payload(data: bytes) -> bytes: + n = len(data) + size_bytes = n.to_bytes(8, "little") + if n < 3072: + return size_bytes + data + mid = (n - 1024) // 2 + return size_bytes + data[0:1024] + data[mid : mid + 1024] + data[-1024:] + + +def compute_media_content_hash(data: bytes, virtual: bool = False) -> str: + payload = _sampling_payload(data) + h = xxhash.xxh64(payload).hexdigest() + return f"V{h}" if virtual else h diff --git a/tests/test_az174_db_driven_config.py b/tests/test_az174_db_driven_config.py index 15319ef..4f2db1f 100644 --- a/tests/test_az174_db_driven_config.py +++ b/tests/test_az174_db_driven_config.py @@ -74,7 +74,7 @@ def test_merged_annotation_nested_sections(): assert out["altitude"] == 100 -def test_build_media_detect_config_uses_api_path_and_defaults_when_api_empty(): +def test_resolve_media_for_detect_uses_api_path_and_defaults_when_api_empty(): # Arrange import main @@ -84,13 +84,14 @@ def test_build_media_detect_config_uses_api_path_and_defaults_when_api_empty(): mock_ann.fetch_media_path.return_value = "/m/file.jpg" with patch("main.annotations_client", mock_ann): # Act - cfg = main._build_media_detect_config_dict("mid-1", tm, None) + cfg, path = main._resolve_media_for_detect("mid-1", tm, None) # Assert - assert cfg["paths"] == ["/m/file.jpg"] + assert path == 
"/m/file.jpg" + assert "paths" not in cfg assert "probability_threshold" not in cfg -def test_build_media_detect_config_override_wins(): +def test_resolve_media_for_detect_override_wins(): # Arrange import main @@ -104,14 +105,15 @@ def test_build_media_detect_config_override_wins(): mock_ann.fetch_media_path.return_value = "/m/v.mp4" with patch("main.annotations_client", mock_ann): # Act - cfg = main._build_media_detect_config_dict("vid-1", tm, override) + cfg, path = main._resolve_media_for_detect("vid-1", tm, override) # Assert assert cfg["probability_threshold"] == 0.99 assert cfg["altitude"] == 500 - assert cfg["paths"] == ["/m/v.mp4"] + assert path == "/m/v.mp4" + assert "paths" not in cfg -def test_build_media_detect_config_raises_when_no_media_path(): +def test_resolve_media_for_detect_raises_when_no_media_path(): # Arrange import main @@ -122,5 +124,5 @@ def test_build_media_detect_config_raises_when_no_media_path(): with patch("main.annotations_client", mock_ann): # Act / Assert with pytest.raises(HTTPException) as exc: - main._build_media_detect_config_dict("missing", tm, None) + main._resolve_media_for_detect("missing", tm, None) assert exc.value.status_code == 503 diff --git a/tests/test_az175_api_calls.py b/tests/test_az175_api_calls.py new file mode 100644 index 0000000..016b159 --- /dev/null +++ b/tests/test_az175_api_calls.py @@ -0,0 +1,43 @@ +from unittest.mock import MagicMock, patch + + +def test_post_media_record_json_and_auth(): + # Arrange + import main + + mock_resp = MagicMock() + mock_resp.status_code = 201 + payload = { + "id": "h1", + "name": "a.jpg", + "path": "/x/a.jpg", + "mediaType": "Image", + "mediaStatus": 1, + "userId": "u1", + } + with patch.object(main.http_requests, "post", return_value=mock_resp) as post: + # Act + ok = main._post_media_record(payload, "tok") + # Assert + assert ok is True + post.assert_called_once() + args, kwargs = post.call_args + assert kwargs["json"] == payload + assert kwargs["headers"]["Authorization"] 
== "Bearer tok" + + +def test_put_media_status_json(): + # Arrange + import main + + mock_resp = MagicMock() + mock_resp.status_code = 204 + with patch.object(main.http_requests, "put", return_value=mock_resp) as put: + # Act + ok = main._put_media_status("mid", 2, "t") + # Assert + assert ok is True + put.assert_called_once() + _args, kwargs = put.call_args + assert kwargs["json"] == {"mediaStatus": 2} + assert "/api/media/mid/status" in put.call_args[0][0] diff --git a/tests/test_media_hash.py b/tests/test_media_hash.py new file mode 100644 index 0000000..85b7c68 --- /dev/null +++ b/tests/test_media_hash.py @@ -0,0 +1,46 @@ +import xxhash + + +def test_compute_media_content_hash_small_file(): + # Arrange + from media_hash import compute_media_content_hash + + data = b"x" * 100 + expected = xxhash.xxh64(len(data).to_bytes(8, "little") + data).hexdigest() + # Act + out = compute_media_content_hash(data) + # Assert + assert out == expected + + +def test_compute_media_content_hash_large_file(): + # Arrange + from media_hash import compute_media_content_hash + + data = (bytes(range(256)) * 20)[:5000] + n = len(data) + mid = (n - 1024) // 2 + blob = ( + n.to_bytes(8, "little") + + data[0:1024] + + data[mid : mid + 1024] + + data[-1024:] + ) + expected = xxhash.xxh64(blob).hexdigest() + # Act + out = compute_media_content_hash(data) + # Assert + assert out == expected + + +def test_compute_media_content_hash_virtual_prefix(): + # Arrange + from media_hash import compute_media_content_hash + + # Act + v = compute_media_content_hash(b"abc", virtual=True) + n = compute_media_content_hash(b"abc", virtual=False) + # Assert + assert v.startswith("V") + assert not n.startswith("V") + assert v == "V" + n