detections/_docs/02_document/modules/main.md
Oleksandr Bezdieniezhnykh be4cab4fcb [AZ-178] Implement streaming video detection endpoint
- Added `/detect/video` endpoint for true streaming video detection, allowing inference to start as upload bytes arrive.
- Introduced `run_detect_video_stream` method in the inference module to handle video processing from a file-like object.
- Updated media hashing to include a new function for computing hashes directly from files with minimal I/O.
- Enhanced documentation to reflect changes in video processing and API behavior.

Made-with: Cursor
2026-04-01 03:11:43 +03:00


Module: main

Purpose

FastAPI application entry point — exposes HTTP API for object detection on images and video media, health checks, and Server-Sent Events (SSE) streaming of detection results. Manages media lifecycle (content hashing, persistent storage, media record creation, status updates) and DB-driven AI configuration.

Public Interface

API Endpoints

| Method | Path | Description |
| --- | --- | --- |
| GET | /health | Returns AI engine availability status |
| POST | /detect | Image/video detection with media lifecycle management (buffered) |
| POST | /detect/video | Streaming video detection — inference starts as upload bytes arrive (AZ-178) |
| POST | /detect/{media_id} | Start async detection on media resolved from Annotations service |
| GET | /detect/stream | SSE stream of detection events |

DTOs (Pydantic Models)

| Model | Fields | Description |
| --- | --- | --- |
| DetectionDto | centerX, centerY, width, height, classNum, label, confidence | Single detection result |
| DetectionEvent | annotations (list[DetectionDto]), mediaId, mediaStatus, mediaPercent | SSE event payload |
| HealthResponse | status, aiAvailability, engineType, errorMessage | Health check response |
| AIConfigDto | frame_period_recognition, frame_recognition_seconds, probability_threshold, tracking_*, model_batch_size, big_image_tile_overlap_percent, altitude, focal_length, sensor_width | Configuration input (no paths field — removed in AZ-174) |
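A minimal sketch of the two detection DTOs, written with plain dataclasses rather than the module's actual Pydantic models; field names follow the table above, everything else is illustrative:

```python
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class DetectionDto:
    """Single detection result in the API's camelCase naming."""
    centerX: float
    centerY: float
    width: float
    height: float
    classNum: int
    label: str
    confidence: float

@dataclass
class DetectionEvent:
    """SSE event payload grouping detections for one media item."""
    annotations: list          # list of DetectionDto dicts
    mediaId: Optional[str] = None
    mediaStatus: Optional[str] = None
    mediaPercent: Optional[float] = None

# Round-trip a detection through dict form, as an SSE layer would serialize it
dto = DetectionDto(0.5, 0.5, 0.1, 0.2, 3, "car", 0.92)
event = DetectionEvent(annotations=[asdict(dto)], mediaId="abc",
                       mediaStatus="AI_PROCESSING", mediaPercent=10.0)
print(asdict(event)["annotations"][0]["label"])  # car
```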

Class: TokenManager

| Method | Signature | Description |
| --- | --- | --- |
| `__init__` | (str access_token, str refresh_token) | Stores tokens |
| `get_valid_token` | () -> str | Returns access_token; auto-refreshes if expiring within 60s |
| `decode_user_id` | (str token) -> Optional[str] | Static. Extracts user ID from JWT claims (sub, userId, user_id, nameid, or SAML nameidentifier) |
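The claim fallback in `decode_user_id` can be sketched as an unverified base64 decode of the JWT payload followed by a lookup across the claim keys listed above. A hypothetical sketch; the SAML key is assumed to be the standard nameidentifier URI:

```python
import base64
import json
from typing import Optional

_SAML_NAMEID = "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier"

def decode_user_id(token: str) -> Optional[str]:
    """Extract a user ID from a JWT payload without verifying the signature."""
    try:
        payload_b64 = token.split(".")[1]
        # JWTs use unpadded base64url; restore padding before decoding
        payload_b64 += "=" * (-len(payload_b64) % 4)
        claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    except Exception:
        return None
    for key in ("sub", "userId", "user_id", "nameid", _SAML_NAMEID):
        value = claims.get(key)
        if value:
            return str(value)
    return None

# Build a throwaway unsigned token to demonstrate the claim lookup
header = base64.urlsafe_b64encode(b'{"alg":"none"}').rstrip(b"=").decode()
body = base64.urlsafe_b64encode(json.dumps({"userId": "u-42"}).encode()).rstrip(b"=").decode()
print(decode_user_id(f"{header}.{body}."))  # u-42
```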

Helper Functions

| Function | Signature | Description |
| --- | --- | --- |
| `_merged_annotation_settings_payload` | (raw: object) -> dict | Merges nested AI settings from the Annotations service response (handles aiRecognitionSettings and cameraSettings sub-objects and PascalCase/camelCase/snake_case aliases) |
| `_resolve_media_for_detect` | (media_id, token_mgr, override) -> tuple[dict, str] | Fetches user AI settings and media path from the Annotations service, merges with client overrides |
| `_detect_upload_kind` | (filename, data) -> tuple[str, str] | Determines whether an upload is image or video by extension, falling back to content probing (cv2/PyAV) |
| `_post_media_record` | (payload, bearer) -> bool | Creates a media record via POST /api/media on the Annotations service |
| `_put_media_status` | (media_id, status, bearer) -> bool | Updates media status via PUT /api/media/{media_id}/status on the Annotations service |
| `compute_media_content_hash` | (imported from media_hash) | XxHash64 content hash with sampling (from bytes) |
| `compute_media_content_hash_from_file` | (imported from media_hash) | XxHash64 content hash from a file on disk — reads only 3 KB |
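The sampled file hash can be sketched as below, assuming the 3 KB budget is spent as three 1 KB windows at the start, middle, and end of the file, mixed with the file size. The real function uses XxHash64 (third-party xxhash package); `hashlib.blake2b` stands in here to stay stdlib-only:

```python
import hashlib
import os
import tempfile

_SAMPLE = 1024  # bytes per sample window

def compute_media_content_hash_from_file(path: str) -> str:
    """Hash a file by sampling start/middle/end windows plus the size.

    Sketch of the sampling idea only: the real implementation uses XxHash64;
    blake2b is a stdlib stand-in.
    """
    size = os.path.getsize(path)
    h = hashlib.blake2b(digest_size=8)
    h.update(str(size).encode())
    with open(path, "rb") as f:
        for offset in (0, max(0, size // 2 - _SAMPLE // 2), max(0, size - _SAMPLE)):
            f.seek(offset)
            h.update(f.read(_SAMPLE))  # at most 3 KB read in total
    return h.hexdigest()

# Demo on a small temporary file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"frame-data" * 1000)
    tmp_path = tmp.name
digest = compute_media_content_hash_from_file(tmp_path)
print(len(digest))  # 16
```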

Internal Logic

/health

Always returns HealthResponse with status="healthy". aiAvailability reflects the engine's AIAvailabilityStatus, and engineType reports the active engine name. On exception, aiAvailability is set to "None".

/detect (unified upload — AZ-173, AZ-175)

  1. Reads uploaded file bytes, rejects empty
  2. Detects kind (image/video) via _detect_upload_kind (extension → content probe)
  3. Validates image data with cv2.imdecode if kind is image
  4. Parses optional JSON config
  5. Extracts auth tokens; if authenticated:
     a. Computes the XxHash64 content hash
     b. For images: persists the file to IMAGES_DIR synchronously (run_detect_image does not write to disk)
     c. For videos: the file path is prepared, but writing is deferred to run_detect_video, which writes concurrently with frame detection (AZ-177)
     d. Creates a media record via POST /api/media
     e. Sets status to AI_PROCESSING via PUT /api/media/{id}/status
  6. Runs run_detect_image or run_detect_video in ThreadPoolExecutor
  7. On success: sets status to AI_PROCESSED
  8. On failure: sets status to ERROR
  9. Returns list of DetectionDto

/detect/{media_id} (async — AZ-174)

  1. Checks for duplicate active detection (409)
  2. Extracts auth tokens
  3. Resolves media via _resolve_media_for_detect:
     a. Fetches user AI settings from GET /api/users/{user_id}/ai-settings
     b. Merges them with client overrides
     c. Fetches the media path from GET /api/media/{media_id}
  4. Reads file bytes from resolved path
  5. Creates asyncio.Task for background detection
  6. Calls run_detect_video or run_detect_image depending on file extension
  7. Callbacks push DetectionEvent to SSE queues and POST annotations to Annotations service
  8. Updates media status via PUT /api/media/{id}/status
  9. Returns immediately: {"status": "started", "mediaId": media_id}
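The flow above amounts to: reject duplicates, schedule a background task, answer at once, and let callbacks fan events out to per-client SSE queues. A minimal self-contained sketch with all names hypothetical and HTTP plus inference stubbed out:

```python
import asyncio

ACTIVE: set = set()    # media ids with a detection in flight (backs the 409 check)
SSE_QUEUES: list = []  # one asyncio.Queue per connected SSE client

async def _run_detection(media_id: str) -> None:
    """Stand-in for the real detection call; pushes one event and finishes."""
    try:
        await asyncio.sleep(0)  # pretend to run inference
        event = {"mediaId": media_id, "mediaStatus": "AI_PROCESSED", "mediaPercent": 100}
        for q in SSE_QUEUES:
            if not q.full():    # drop events rather than block on a slow client
                q.put_nowait(event)
    finally:
        ACTIVE.discard(media_id)

async def start_detect(media_id: str) -> dict:
    """Mirror of the endpoint's contract: reject duplicates, start, return at once."""
    if media_id in ACTIVE:
        return {"status": "conflict"}  # the real endpoint answers 409 here
    ACTIVE.add(media_id)
    asyncio.create_task(_run_detection(media_id))
    return {"status": "started", "mediaId": media_id}

async def main() -> dict:
    q: asyncio.Queue = asyncio.Queue(maxsize=100)
    SSE_QUEUES.append(q)
    resp = await start_detect("m-1")
    event = await q.get()              # the background task delivers here
    return {"resp": resp, "event": event}

result = asyncio.run(main())
print(result["resp"]["status"])  # started
```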

/detect/video (streaming upload — AZ-178)

  1. Parses X-Filename, X-Config, auth headers (no multipart — raw binary body)
  2. Validates video extension
  3. Creates StreamingBuffer backed by a temp file in VIDEOS_DIR
  4. Starts inference thread via run_in_executor: run_detect_video_stream(buffer, ...)
  5. Reads HTTP body chunks via request.stream(), feeds each to buffer.append() via executor
  6. Inference thread reads from the same buffer concurrently — PyAV decodes frames as data arrives
  7. Detections are broadcast to SSE queues in real-time during upload
  8. After upload completes: signals EOF, computes content hash from temp file (3 KB read), renames to permanent path
  9. If authenticated: creates media record, tracks status lifecycle
  10. Returns {"status": "started", "mediaId": "..."} — inference continues in background task
  11. Background task awaits inference completion, updates status to AI_PROCESSED or ERROR
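The producer/consumer handoff in steps 3–6 hinges on a thread-safe, file-backed buffer: the upload loop appends chunks while the inference thread reads through a separate handle, blocking until more data or EOF arrives. A hypothetical sketch of the streaming_buffer idea, not its actual code:

```python
import os
import tempfile
import threading
from typing import Optional

class StreamingBuffer:
    """File-backed handoff: an HTTP writer appends while a decoder thread reads."""

    def __init__(self, directory: Optional[str] = None):
        fd, self.path = tempfile.mkstemp(dir=directory, suffix=".part")
        self._writer = os.fdopen(fd, "wb")
        self._reader = open(self.path, "rb")
        self._cond = threading.Condition()
        self._written = 0
        self._eof = False

    def append(self, chunk: bytes) -> None:
        """Called by the upload loop for each received body chunk."""
        with self._cond:
            self._writer.write(chunk)
            self._writer.flush()
            self._written += len(chunk)
            self._cond.notify_all()

    def finish(self) -> None:
        """Signal EOF once the HTTP body is fully received."""
        with self._cond:
            self._eof = True
            self._cond.notify_all()

    def read(self, size: int) -> bytes:
        """Block until unread bytes exist (or EOF), then read up to size bytes."""
        with self._cond:
            while self._reader.tell() >= self._written and not self._eof:
                self._cond.wait()
        return self._reader.read(size)

# Demo: a reader thread consumes while the "upload" appends chunks
buf = StreamingBuffer()
chunks = []

def consume():
    while True:
        data = buf.read(4)
        if not data:
            break
        chunks.append(data)

t = threading.Thread(target=consume)
t.start()
buf.append(b"abcd")
buf.append(b"efgh")
buf.finish()
t.join()
result = b"".join(chunks)
os.remove(buf.path)
print(result)  # b'abcdefgh'
```

A real decoder (PyAV) only needs the file-like `read(size)` surface, which is why a plain class suffices for the handoff.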

/detect/stream (SSE)

  • Creates asyncio.Queue per client (maxsize=100)
  • Yields events in `data: {json}\n\n` SSE format
  • Cleans up queue on disconnect
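The SSE framing is a `data:` line per JSON payload followed by a blank line, drained from the per-client queue. A minimal sketch with hypothetical names, using a None sentinel for disconnect:

```python
import asyncio
import json

def format_sse(payload: dict) -> str:
    """Serialize one event in text/event-stream framing."""
    return f"data: {json.dumps(payload)}\n\n"

async def event_stream(queue: asyncio.Queue):
    """Yield queued events as SSE frames until a disconnect sentinel arrives."""
    while True:
        event = await queue.get()
        if event is None:  # sentinel: client disconnected / stream closed
            break
        yield format_sse(event)

async def demo() -> str:
    q: asyncio.Queue = asyncio.Queue(maxsize=100)
    q.put_nowait({"mediaId": "m-1", "mediaPercent": 40})
    q.put_nowait(None)
    return "".join([frame async for frame in event_stream(q)])

frames = asyncio.run(demo())
print(frames)
```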

Token Management

  • _decode_exp: Decodes JWT exp claim from base64 payload (no signature verification)
  • Auto-refreshes via POST to {ANNOTATIONS_URL}/auth/refresh when within 60s of expiry
  • decode_user_id: Extracts user identity from multiple possible JWT claim keys

Annotations Service Integration

During async media detection, detection results are posted to POST {ANNOTATIONS_URL}/annotations (F3). The media lifecycle (record creation, status updates) uses POST /api/media and PUT /api/media/{media_id}/status.

Dependencies

  • External: asyncio, base64, io, json, os, tempfile, time, concurrent.futures, pathlib, typing, av, cv2, numpy, requests, fastapi, pydantic
  • Internal: inference (lazy import), constants_inf (label lookup), loader_http_client (client instantiation), media_hash (content hashing), streaming_buffer (streaming video upload)

Consumers

None (entry point).

Data Models

  • DetectionDto, DetectionEvent, HealthResponse, AIConfigDto — Pydantic models for API
  • TokenManager — JWT token lifecycle

Configuration

| Env Var | Default | Description |
| --- | --- | --- |
| LOADER_URL | http://loader:8080 | Loader service base URL |
| ANNOTATIONS_URL | http://annotations:8080 | Annotations service base URL |
| VIDEOS_DIR | {cwd}/data/videos | Persistent video storage directory |
| IMAGES_DIR | {cwd}/data/images | Persistent image storage directory |

External Integrations

| Service | Protocol | Purpose |
| --- | --- | --- |
| Loader | HTTP (via LoaderHttpClient) | Model loading |
| Annotations | HTTP GET | User AI settings (/api/users/{id}/ai-settings), media path resolution (/api/media/{id}) |
| Annotations | HTTP POST | Annotation posting (/annotations), media record creation (/api/media) |
| Annotations | HTTP PUT | Media status updates (/api/media/{id}/status) |
| Annotations | HTTP POST | Auth refresh (/auth/refresh) |

Security

  • Bearer token from request headers, refreshed via Annotations service
  • JWT exp decoded (base64, no signature verification) — token validation is not performed locally
  • Image data validated via cv2.imdecode before processing
  • No CORS configuration
  • No rate limiting

Tests

  • tests/test_az174_db_driven_config.py — decode_user_id, _merged_annotation_settings_payload, _resolve_media_for_detect
  • tests/test_az175_api_calls.py — _post_media_record, _put_media_status
  • tests/test_az177_video_single_write.py — video single-write, image unchanged, concurrent writer thread, temp cleanup
  • e2e/tests/test_*.py — full API e2e tests (health, single image, video, async, SSE, negative, security, performance, resilience)