From e65d8da6a3f0c0bf4a8d240b281e724215908562 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Wed, 1 Apr 2026 01:12:05 +0300 Subject: [PATCH] [AZ-177] Remove redundant synchronous video pre-writes in /detect endpoint Made-with: Cursor --- _docs/02_tasks/_dependencies_table.md | 10 +- .../AZ-177_remove_redundant_video_prewrite.md | 67 +++++ _docs/_autopilot_state.md | 6 +- src/main.py | 7 +- tests/test_az177_video_single_write.py | 244 ++++++++++++++++++ 5 files changed, 325 insertions(+), 9 deletions(-) create mode 100644 _docs/02_tasks/todo/AZ-177_remove_redundant_video_prewrite.md create mode 100644 tests/test_az177_video_single_write.py diff --git a/_docs/02_tasks/_dependencies_table.md b/_docs/02_tasks/_dependencies_table.md index d564677..9451a76 100644 --- a/_docs/02_tasks/_dependencies_table.md +++ b/_docs/02_tasks/_dependencies_table.md @@ -1,8 +1,8 @@ # Dependencies Table **Date**: 2026-03-31 -**Total Tasks**: 15 (11 test + 4 feature — all done) -**Total Complexity Points**: 43 +**Total Tasks**: 16 (11 test + 5 feature) +**Total Complexity Points**: 45 ## Completed Tasks (Steps 4-6: Tests) @@ -28,3 +28,9 @@ | AZ-174 | db_driven_ai_config | 2 | None | AZ-172 | done | | AZ-175 | media_table_integration | 2 | AZ-173 | AZ-172 | done | | AZ-176 | cleanup_obsolete_path_code | 1 | AZ-173, AZ-174 | AZ-172 | done | + +## Current Cycle (Step 9: New Features) + +| Task | Name | Complexity | Dependencies | Epic | Status | +|------|------|-----------|-------------|------|--------| +| AZ-177 | remove_redundant_video_prewrite | 2 | AZ-173 | AZ-172 | todo | diff --git a/_docs/02_tasks/todo/AZ-177_remove_redundant_video_prewrite.md b/_docs/02_tasks/todo/AZ-177_remove_redundant_video_prewrite.md new file mode 100644 index 0000000..7e27fad --- /dev/null +++ b/_docs/02_tasks/todo/AZ-177_remove_redundant_video_prewrite.md @@ -0,0 +1,67 @@ +# Remove Redundant Synchronous Video Pre-Write in /detect Endpoint + +**Task**: AZ-177_remove_redundant_video_prewrite +**Name**: Remove redundant synchronous video file writes in /detect endpoint +**Description**: The `/detect` endpoint writes video bytes to disk synchronously before calling `run_detect_video`, which writes them again in a background thread concurrently with frame detection. Remove the redundant pre-writes so videos are written only once — by inference.pyx — concurrently with detection, as AZ-173 intended. +**Complexity**: 2 points +**Dependencies**: AZ-173 (stream-based run_detect) +**Component**: Main +**Jira**: AZ-177 +**Parent**: AZ-172 + +## Problem + +After AZ-173 implemented simultaneous disk-write + frame-detection in `inference.pyx`, the `/detect` endpoint in `main.py` still writes video bytes to disk **synchronously before** calling `run_detect_video`. Since `run_detect_video` internally spawns a thread to write the same bytes to the same path, every video upload gets written to disk **twice**: + +1. **Auth'd path** (lines 394-395): `storage_path` is written synchronously via `open(storage_path, "wb").write(image_bytes)`, then `run_detect_video(..., save_path=storage_path, ...)` writes to the same path again in a background thread. + +2. **Non-auth'd path** (lines 427-430): A temp file is created and written synchronously via `tempfile.mkstemp` + `open(tmp_video_path, "wb").write(image_bytes)`, then `run_detect_video(..., save_path=tmp_video_path, ...)` writes to the same path again. + +This defeats the purpose of AZ-173's concurrent design: the video data is fully written before detection starts, so there is no actual concurrency between writing and detecting. + +### How inference.pyx handles it (correctly) + +```python +# inference.pyx run_detect_video: +writer_done = threading.Event() +wt = threading.Thread(target=_write_video_bytes_to_path, args=(save_path, video_bytes, writer_done)) +wt.start() # thread A: writes bytes to disk +bio = io.BytesIO(video_bytes) +container = av.open(bio) # thread B (caller): decodes frames via PyAV +self._process_video_pyav(...) # detection happens concurrently with disk write +writer_done.wait() # wait for write to finish +``` + +## Target State + +### Auth'd video uploads +- Do NOT write file at line 394 for videos — only write for images (since `run_detect_image` doesn't write to disk) +- Pass `storage_path` to `run_detect_video`; let inference handle the concurrent write + +### Non-auth'd video uploads +- Do NOT create/write a temp file with `tempfile.mkstemp` + `open().write()` +- Instead, only create a temp file **path** (empty) and pass it to `run_detect_video` which writes the data concurrently with detection +- Alternatively: use `tempfile.mktemp` or build the path manually without pre-writing + +### Image uploads (no change needed) +- `run_detect_image` does NOT write to disk, so the synchronous write at line 394 remains necessary for images + +## Acceptance Criteria + +- [ ] Video bytes are written to disk exactly once (by `_write_video_bytes_to_path` in inference.pyx), not twice +- [ ] For videos, disk write and frame detection happen concurrently (not sequentially) +- [ ] Image storage behavior is unchanged (synchronous write before detection) +- [ ] Temp file cleanup in the `finally` block still works correctly +- [ ] Auth'd video uploads: media record and status updates unchanged +- [ ] All existing tests pass + +## File Changes + +| File | Action | Description | +|------|--------|-------------| +| `src/main.py` | Modified | Split storage write by media kind; remove redundant video pre-writes | + +## Technical Notes + +- The `finally` block (lines 463-468) cleans up `tmp_video_path` — this must still work after the change. Since `run_detect_video` waits for the writer thread with `writer_done.wait()`, the file will exist when cleanup runs. +- `tempfile.mkstemp` creates the file atomically (open + create); we may need `tempfile.mkstemp` still just for the safe path generation, but skip the `write()` call. Or use a different approach to generate the temp path. diff --git a/_docs/_autopilot_state.md b/_docs/_autopilot_state.md index 803859e..8d8bb3a 100644 --- a/_docs/_autopilot_state.md +++ b/_docs/_autopilot_state.md @@ -2,8 +2,8 @@ ## Current Step flow: existing-code -step: 11 -name: Update Docs -status: done +step: 9 +name: Implement +status: in_progress sub_step: 0 retry_count: 0 diff --git a/src/main.py b/src/main.py index 036b263..e31330b 100644 --- a/src/main.py +++ b/src/main.py @@ -391,8 +391,9 @@ async def detect_image( if not ext.startswith("."): ext = "." + ext storage_path = os.path.abspath(os.path.join(base, f"{content_hash}{ext}")) - with open(storage_path, "wb") as out: - out.write(image_bytes) + if kind == "image": + with open(storage_path, "wb") as out: + out.write(image_bytes) mt = "Video" if kind == "video" else "Image" payload = { "id": content_hash, @@ -426,8 +427,6 @@ async def detect_image( suf = ext if ext.startswith(".") else ".mp4" fd, tmp_video_path = tempfile.mkstemp(suffix=suf) os.close(fd) - with open(tmp_video_path, "wb") as f: - f.write(image_bytes) save = tmp_video_path inf.run_detect_video(image_bytes, ai_cfg, media_name, save, on_annotation) else: diff --git a/tests/test_az177_video_single_write.py b/tests/test_az177_video_single_write.py new file mode 100644 index 0000000..c94bf47 --- /dev/null +++ b/tests/test_az177_video_single_write.py @@ -0,0 +1,244 @@ +import base64 +import json +import os +import tempfile +import threading +import time +from unittest.mock import MagicMock, patch + +import cv2 +import numpy as np +import pytest +from fastapi.testclient import TestClient + +pytest.importorskip("inference") + +import inference as inference_mod + + +def _access_jwt(sub: str = "u1") -> str: + raw = json.dumps( + {"exp": int(time.time()) + 3600, "sub": sub}, separators=(",", ":") + ).encode() + payload = base64.urlsafe_b64encode(raw).decode().rstrip("=") + return f"h.{payload}.s" + + +def _jpeg_bytes() -> bytes: + _, buf = cv2.imencode(".jpg", np.zeros((2, 2, 3), dtype=np.uint8)) + return buf.tobytes() + + +class _FakeInfVideo: + def run_detect_video( + self, + video_bytes, + ai_cfg, + media_name, + save_path, + on_annotation, + status_callback=None, + ): + writer = inference_mod._write_video_bytes_to_path + ev = threading.Event() + t = threading.Thread( + target=writer, + args=(save_path, video_bytes, ev), + ) + t.start() + ev.wait(timeout=60) + t.join(timeout=60) + + def run_detect_image(self, *args, **kwargs): + pass + + +class _FakeInfVideoConcurrent: + def run_detect_video( + self, + video_bytes, + ai_cfg, + media_name, + save_path, + on_annotation, + status_callback=None, + ): + holder = {} + writer = inference_mod._write_video_bytes_to_path + + def write(): + holder["tid"] = threading.get_ident() + writer(save_path, video_bytes, threading.Event()) + + t = threading.Thread(target=write) + t.start() + holder["caller_tid"] = threading.get_ident() + t.join(timeout=60) + assert holder["tid"] != holder["caller_tid"] + + def run_detect_image(self, *args, **kwargs): + pass + + +@pytest.fixture +def reset_main_inference(): + import main + + main.inference = None + yield + main.inference = None + + +def test_auth_video_storage_path_opened_wb_once(reset_main_inference): + # Arrange + import main + from media_hash import compute_media_content_hash + + write_paths = [] + real_write = inference_mod._write_video_bytes_to_path + + def tracking_write(path, data, ev): + write_paths.append(os.path.abspath(str(path))) + return real_write(path, data, ev) + + video_body = b"vid-bytes-" * 20 + token = _access_jwt() + mock_post = MagicMock() + mock_post.return_value.status_code = 201 + mock_put = MagicMock() + mock_put.return_value.status_code = 204 + with tempfile.TemporaryDirectory() as vd: + os.environ["VIDEOS_DIR"] = vd + content_hash = compute_media_content_hash(video_body) + expected_path = os.path.abspath(os.path.join(vd, f"{content_hash}.mp4")) + client = TestClient(main.app) + with ( + patch.object(inference_mod, "_write_video_bytes_to_path", tracking_write), + patch.object(main.http_requests, "post", mock_post), + patch.object(main.http_requests, "put", mock_put), + patch.object(main, "get_inference", return_value=_FakeInfVideo()), + ): + # Act + r = client.post( + "/detect", + files={"file": ("clip.mp4", video_body, "video/mp4")}, + headers={"Authorization": f"Bearer {token}"}, + ) + # Assert + assert r.status_code == 200 + assert write_paths.count(expected_path) == 1 + with open(expected_path, "rb") as f: + assert f.read() == video_body + assert mock_post.called + assert mock_put.call_count >= 2 + + +def test_non_auth_temp_video_opened_wb_once_and_removed(reset_main_inference): + # Arrange + import main + + write_paths = [] + real_write = inference_mod._write_video_bytes_to_path + + def tracking_write(path, data, ev): + write_paths.append(os.path.abspath(str(path))) + return real_write(path, data, ev) + + video_body = b"tmp-vid-" * 30 + client = TestClient(main.app) + tmp_path_holder = [] + + class _CaptureTmp(_FakeInfVideo): + def run_detect_video(self, video_bytes, ai_cfg, media_name, save_path, on_annotation, status_callback=None): + tmp_path_holder.append(os.path.abspath(str(save_path))) + super().run_detect_video( + video_bytes, ai_cfg, media_name, save_path, on_annotation, status_callback + ) + + with ( + patch.object(inference_mod, "_write_video_bytes_to_path", tracking_write), + patch.object(main, "get_inference", return_value=_CaptureTmp()), + ): + # Act + r = client.post( + "/detect", + files={"file": ("n.mp4", video_body, "video/mp4")}, + ) + # Assert + assert r.status_code == 200 + assert len(tmp_path_holder) == 1 + tmp_path = tmp_path_holder[0] + assert write_paths.count(tmp_path) == 1 + assert not os.path.isfile(tmp_path) + + +def test_auth_image_still_writes_once_before_detect(reset_main_inference): + # Arrange + import builtins + import main + from media_hash import compute_media_content_hash + + real_open = builtins.open + wb_hits = [] + + def tracking_open(file, mode="r", *args, **kwargs): + if mode == "wb": + wb_hits.append(os.path.abspath(str(file))) + return real_open(file, mode, *args, **kwargs) + + img = _jpeg_bytes() + token = _access_jwt() + mock_post = MagicMock() + mock_post.return_value.status_code = 201 + mock_put = MagicMock() + mock_put.return_value.status_code = 204 + with tempfile.TemporaryDirectory() as idir: + os.environ["IMAGES_DIR"] = idir + content_hash = compute_media_content_hash(img) + expected_path = os.path.abspath(os.path.join(idir, f"{content_hash}.jpg")) + client = TestClient(main.app) + with ( + patch.object(builtins, "open", tracking_open), + patch.object(main.http_requests, "post", mock_post), + patch.object(main.http_requests, "put", mock_put), + patch.object(main, "get_inference", return_value=_FakeInfVideo()), + ): + # Act + r = client.post( + "/detect", + files={"file": ("p.jpg", img, "image/jpeg")}, + headers={"Authorization": f"Bearer {token}"}, + ) + # Assert + assert r.status_code == 200 + assert wb_hits.count(expected_path) == 1 + with real_open(expected_path, "rb") as f: + assert f.read() == img + + +def test_video_writer_runs_in_separate_thread_from_executor(reset_main_inference): + # Arrange + import main + + token = _access_jwt() + mock_post = MagicMock() + mock_post.return_value.status_code = 201 + mock_put = MagicMock() + mock_put.return_value.status_code = 204 + video_body = b"thr-test-" * 15 + with tempfile.TemporaryDirectory() as vd: + os.environ["VIDEOS_DIR"] = vd + client = TestClient(main.app) + with ( + patch.object(main.http_requests, "post", mock_post), + patch.object(main.http_requests, "put", mock_put), + patch.object(main, "get_inference", return_value=_FakeInfVideoConcurrent()), + ): + # Act + r = client.post( + "/detect", + files={"file": ("c.mp4", video_body, "video/mp4")}, + headers={"Authorization": f"Bearer {token}"}, + ) + # Assert + assert r.status_code == 200