Component: API
Overview
Purpose: HTTP API layer exposing object detection capabilities via FastAPI — handles request/response serialization, async task management, SSE streaming, media lifecycle management, DB-driven configuration, and authentication token forwarding.
Pattern: Controller layer — thin API surface that delegates inference to the Inference Pipeline and manages media records via the Annotations service.
Upstream: Inference Pipeline (Inference class), Domain (constants_inf for labels), Annotations service (AI settings, media records). Downstream: None (top-level, client-facing).
Modules
| Module | Role |
|---|---|
| main | FastAPI app definition, endpoints, DTOs, TokenManager, SSE streaming, media lifecycle, DB-driven config resolution |
| streaming_buffer | File-like object for concurrent write+read — enables true streaming video detection (AZ-178) |
External API Specification
GET /health
Response: HealthResponse
{
"status": "healthy",
"aiAvailability": "Enabled",
"engineType": "onnx",
"errorMessage": null
}
aiAvailability values: None, Downloading, Converting, Uploading, Enabled, Warning, Error.
POST /detect
Input: Multipart form — file (image or video bytes), optional config (JSON string), optional auth headers.
Response: list[DetectionDto]
[
{
"centerX": 0.5,
"centerY": 0.5,
"width": 0.1,
"height": 0.1,
"classNum": 0,
"label": "ArmorVehicle",
"confidence": 0.85
}
]
Behavior (AZ-173, AZ-175): Accepts both images and videos. Detects upload kind by extension, falls back to content probing. If authenticated: computes content hash, persists to storage, creates media record, tracks status lifecycle (New → AI Processing → AI Processed / Error). Errors: 400 (empty/invalid image data), 422 (runtime error), 503 (engine unavailable).
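The extension-first, content-probe-fallback classification described above could be sketched as follows. The function name, extension sets, and byte signatures here are illustrative assumptions, not the actual implementation:

```python
# Hypothetical sketch of upload-kind detection: extension check first,
# magic-byte probing of the first bytes as a fallback.
VIDEO_EXTENSIONS = {".mp4", ".avi", ".mov", ".mkv", ".webm"}
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}

def detect_upload_kind(filename: str, head: bytes) -> str:
    """Classify an upload as 'image' or 'video' by extension, then content."""
    ext = ("." + filename.rsplit(".", 1)[-1].lower()) if "." in filename else ""
    if ext in VIDEO_EXTENSIONS:
        return "video"
    if ext in IMAGE_EXTENSIONS:
        return "image"
    # Fallback: probe leading bytes for well-known signatures.
    if head.startswith(b"\x89PNG") or head.startswith(b"\xff\xd8\xff"):
        return "image"
    if len(head) >= 12 and head[4:8] == b"ftyp":  # ISO BMFF container (MP4/MOV)
        return "video"
    raise ValueError("cannot determine upload kind")
```

A classification failure here would map to the 400 response for invalid data.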
POST /detect/video
Input: Raw binary body (not multipart). Headers: X-Filename (e.g. clip.mp4), optional X-Config (JSON), optional Authorization: Bearer {token}, optional X-Refresh-Token.
Response: {"status": "started", "mediaId": "..."}
Behavior (AZ-178): True streaming video detection. Bypasses Starlette multipart buffering by accepting raw body via request.stream(). Creates a StreamingBuffer (temp file), starts inference thread immediately, feeds HTTP chunks to the buffer as they arrive. PyAV reads from the buffer concurrently, decoding frames and running inference. Detections broadcast via SSE in real-time during upload. After upload: computes content hash from file (3 KB I/O), renames to permanent path, creates media record if authenticated.
Errors: 400 (non-video extension).
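The concurrent write+read pattern behind `StreamingBuffer` can be illustrated with a minimal `threading.Condition`-based sketch. This is not the actual `streaming_buffer` module (whose API likely also supports seeking for PyAV); it only demonstrates how a reader can block until the writer appends more bytes:

```python
import tempfile
import threading

class StreamingBuffer:
    """Illustrative file-backed buffer: one thread appends HTTP chunks
    while another reads them; readers block on a Condition until data
    arrives or the writer signals EOF."""

    def __init__(self):
        self._file = tempfile.NamedTemporaryFile(delete=False)
        self._cond = threading.Condition()
        self._size = 0
        self._read_pos = 0
        self._closed = False

    def write(self, chunk: bytes) -> int:
        with self._cond:
            self._file.write(chunk)
            self._file.flush()
            self._size += len(chunk)
            self._cond.notify_all()  # wake any reader waiting for data
        return len(chunk)

    def read(self, n: int = -1) -> bytes:
        with self._cond:
            # Block until bytes are available or the writer is done.
            while self._read_pos >= self._size and not self._closed:
                self._cond.wait()
            with open(self._file.name, "rb") as f:
                f.seek(self._read_pos)
                data = f.read(n if n >= 0 else None)
            self._read_pos += len(data)
            return data

    def close_writer(self) -> None:
        with self._cond:
            self._closed = True
            self._cond.notify_all()  # unblock readers at EOF
```

In the endpoint, the request handler would play the writer role (feeding `request.stream()` chunks) while the inference thread plays the reader.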
POST /detect/{media_id}
Input: Path param media_id, optional JSON body AIConfigDto, headers Authorization: Bearer {token}, x-refresh-token: {token}.
Response: {"status": "started", "mediaId": "..."} (202-style).
Behavior (AZ-174): Resolves media path and AI settings from Annotations service. Merges DB settings with client overrides. Reads file bytes from resolved path, dispatches run_detect_image or run_detect_video.
Errors: 409 (duplicate detection), 503 (media path not resolved).
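The "merge DB settings with client overrides" step could look like the recursive dict merge below. `merge_ai_config` is a hypothetical name, and the sketch handles only nested sections, not the casing-variant normalization the real resolver also performs:

```python
def merge_ai_config(db_settings: dict, overrides: dict) -> dict:
    """Overlay client overrides onto DB-resolved AI settings.
    Client values win; nested sections are merged key by key."""
    merged = dict(db_settings)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_ai_config(merged[key], value)  # recurse into section
        else:
            merged[key] = value  # scalar or new key: override wins
    return merged
```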
GET /detect/stream
Response: text/event-stream (SSE).
data: {"annotations": [...], "mediaId": "...", "mediaStatus": "AIProcessing", "mediaPercent": 50}
mediaStatus values: AIProcessing, AIProcessed, Error.
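The fan-out pattern (one queue per client, broadcast to all, drop on overflow) can be sketched as below. Names are illustrative; the real module wires this into a FastAPI `StreamingResponse`:

```python
import asyncio
import json

event_queues: list[asyncio.Queue] = []  # one queue per connected SSE client

def broadcast(event: dict) -> None:
    """Push an event to every client queue; full queues drop it silently."""
    for queue in event_queues:
        try:
            queue.put_nowait(event)
        except asyncio.QueueFull:
            pass  # slow client: event is dropped, matching the caveat below

async def sse_stream():
    """Async generator yielding SSE frames for one client connection."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    event_queues.append(queue)
    try:
        while True:
            event = await queue.get()
            yield f"data: {json.dumps(event)}\n\n"
    finally:
        event_queues.remove(queue)  # unregister on disconnect
```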
Data Access Patterns
- In-memory state:
  - `_active_detections: dict[str, asyncio.Task]` — guards against duplicate media processing
  - `_event_queues: list[asyncio.Queue]` — SSE client queues (maxsize=100)
- Persistent media storage to `VIDEOS_DIR`/`IMAGES_DIR` (AZ-175)
- No direct database access — media records managed via Annotations service HTTP API
Implementation Details
- `Inference` is lazy-loaded on first use via the `get_inference()` global function
- `ThreadPoolExecutor(max_workers=2)` runs inference off the async event loop
- SSE: one `asyncio.Queue` per connected client; events broadcast to all queues; full queues silently drop events
- `TokenManager` decodes the JWT `exp` claim from the base64 payload (no signature verification), auto-refreshes 60s before expiry
- `TokenManager.decode_user_id` extracts user identity from multiple JWT claim formats (`sub`, `userId`, `nameid`, SAML)
- DB-driven config via `_resolve_media_for_detect`: fetches AI settings from Annotations, merges nested sections and casing variants
- Media lifecycle: `_post_media_record` + `_put_media_status` manage status transitions via the Annotations API
- Content hashing via `compute_media_content_hash` (bytes, XxHash64 with sampling) and `compute_media_content_hash_from_file` (file on disk, 3 KB I/O) for media deduplication
- `StreamingBuffer` for `/detect/video`: concurrent file append + read via `threading.Condition`, enables PyAV to decode frames as HTTP chunks arrive
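The unverified `exp` decoding and 60-second refresh window described for `TokenManager` can be sketched like this. Function names here are hypothetical; only the described behavior (base64url payload decode, no signature check, refresh leeway) is taken from the source:

```python
import base64
import json
import time

def decode_jwt_exp(token: str) -> int:
    """Read the `exp` claim from a JWT payload WITHOUT verifying the
    signature, mirroring the TokenManager behavior described above."""
    payload_b64 = token.split(".")[1]
    # Base64url payloads usually omit padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return int(payload["exp"])

def needs_refresh(token: str, leeway: int = 60) -> bool:
    """True once the token is within `leeway` seconds of expiry."""
    return decode_jwt_exp(token) - time.time() < leeway
```

Because nothing is verified, a forged token passes this stage; as the caveats note, authorization ultimately rests with the Annotations service.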
Caveats
- No CORS middleware configured
- No rate limiting
- No request body size limits beyond FastAPI defaults
- `_active_detections` is an in-memory dict — not persistent across restarts, not distributed
- SSE queue overflow silently drops events (`QueueFull` caught and ignored)
- JWT token handling has no signature verification — relies entirely on the Annotations service for auth
- No graceful shutdown handling for in-progress detections
- Media record creation failures are silently caught (detection proceeds regardless)
Dependency Graph
graph TD
main --> inference
main --> constants_inf
main --> loader_http_client
main --> media_hash
main --> streaming_buffer
Logging Strategy
No explicit logging in main.py — errors are caught and returned as HTTP responses. Logging happens in downstream components.