Oleksandr Bezdieniezhnykh be4cab4fcb [AZ-178] Implement streaming video detection endpoint
- Added `/detect/video` endpoint for true streaming video detection, allowing inference to start as upload bytes arrive.
- Introduced `run_detect_video_stream` method in the inference module to handle video processing from a file-like object.
- Updated media hashing to include a new function for computing hashes directly from files with minimal I/O.
- Enhanced documentation to reflect changes in video processing and API behavior.

Made-with: Cursor
2026-04-01 03:11:43 +03:00


Component: Inference Pipeline

Overview

Purpose: Orchestrates the full inference lifecycle — engine initialization with fallback strategy, stream-based media preprocessing (images + video from bytes), batched inference execution, postprocessing with detection filtering, and result delivery via callbacks.

Pattern: Façade + Pipeline — Inference class is the single entry point that coordinates engine selection, preprocessing, inference, and postprocessing stages.

Upstream: Domain (data models, config, status), Inference Engines (OnnxEngine/TensorRTEngine), External Client (LoaderHttpClient). Downstream: API (creates Inference, calls run_detect_image and run_detect_video).
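The Façade + Pipeline shape can be sketched as follows. This is illustrative only: the stage bodies are stand-ins, and everything except the `run_detect_image` signature and the callback contracts is an assumption, not the real Cython implementation.

```python
class InferenceSketch:
    """Sketch of the façade: one entry point coordinating init, preprocess,
    inference, postprocess, and callback delivery. Not the real code."""

    def __init__(self, loader_client):
        self.loader_client = loader_client
        self.engine = None

    def init_ai(self):
        # Idempotent: re-checks engine state on every entry point (see Caveats).
        if self.engine is None:
            self.engine = "engine"  # stands in for OnnxEngine / TensorRTEngine

    def run_detect_image(self, image_bytes, ai_config, media_name,
                         annotation_callback, status_callback=None):
        self.init_ai()
        # Placeholder result standing in for preprocess -> infer -> postprocess:
        detections = [{"bbox": (0.1, 0.2, 0.3, 0.4), "conf": 0.9}]
        for i, det in enumerate(detections, 1):
            annotation_callback(det, int(100 * i / len(detections)))
        if status_callback:
            status_callback(media_name, len(detections))
```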

Modules

Module Role
inference Core orchestrator: engine lifecycle, stream-based image/video processing, postprocessing
loader_http_client HTTP client for model download/upload (Loader) and API queries (Annotations service)
media_hash XxHash64 content hashing with sampling algorithm for media identification

Internal Interfaces

Inference

cdef class Inference:
    __init__(loader_client)
    cpdef run_detect_image(bytes image_bytes, AIRecognitionConfig ai_config, str media_name,
                           annotation_callback, status_callback=None)
    cpdef run_detect_video(bytes video_bytes, AIRecognitionConfig ai_config, str media_name,
                           str save_path, annotation_callback, status_callback=None)
    cpdef run_detect_video_stream(object readable, AIRecognitionConfig ai_config, str media_name,
                                  annotation_callback, status_callback=None)
    cpdef stop()

    # Internal pipeline stages:
    cdef init_ai()
    cdef _process_video_pyav(AIRecognitionConfig, str original_media_name, object container)
    cdef _process_video_batch(AIRecognitionConfig, list frames, list timestamps, str name, int frame_count, int total, int model_w)
    cdef _append_image_frame_entries(AIRecognitionConfig, list all_frame_data, frame, str original_media_name)
    cdef _finalize_image_inference(AIRecognitionConfig, list all_frame_data)
    cdef split_to_tiles(frame, str media_stem, tile_size, overlap_percent)
    cdef remove_overlapping_detections(list[Detection], float threshold) -> list[Detection]  (delegated to engine)

Free Functions

def ai_config_from_dict(dict data) -> AIRecognitionConfig
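A plausible shape for this converter, assuming dataclass-like defaults. The field names below are hypothetical; the real `AIRecognitionConfig` fields are not shown in this document.

```python
from dataclasses import dataclass

@dataclass
class AIRecognitionConfigSketch:
    # Hypothetical fields — placeholders for the real config attributes.
    confidence_threshold: float = 0.5
    overlap_threshold: float = 0.5

def ai_config_from_dict_sketch(data: dict) -> AIRecognitionConfigSketch:
    # Unknown keys are ignored; missing keys fall back to defaults.
    fields = AIRecognitionConfigSketch.__dataclass_fields__
    return AIRecognitionConfigSketch(
        **{k: v for k, v in data.items() if k in fields})
```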

LoaderHttpClient

class LoaderHttpClient:
    cdef load_big_small_resource(str filename, str directory) -> LoadResult
    cdef upload_big_small_resource(bytes content, str filename, str directory) -> LoadResult
    cpdef fetch_user_ai_settings(str user_id, str bearer_token) -> object
    cpdef fetch_media_path(str media_id, str bearer_token) -> object

media_hash

def compute_media_content_hash(data: bytes, virtual: bool = False) -> str
def compute_media_content_hash_from_file(path: str, virtual: bool = False) -> str
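One way such a sampling hash with minimal I/O can work is to hash fixed-size chunks from the head, middle, and tail of the file plus the file size, rather than the whole content. The chunk size, sampling positions, and the meaning of `virtual` below are assumptions, and `blake2b` is used only as a stdlib stand-in for XxHash64 (which requires the third-party `xxhash` package).

```python
import hashlib
import os

CHUNK = 64 * 1024  # assumption: real sampling sizes are not documented here

def sampled_hash_from_file(path: str, virtual: bool = False) -> str:
    """Hash head, middle, and tail chunks plus the file size — minimal I/O.
    blake2b stands in for XxHash64 in this sketch."""
    h = hashlib.blake2b(digest_size=8)
    size = os.path.getsize(path)
    with open(path, "rb") as f:
        for offset in (0, max(size // 2 - CHUNK // 2, 0), max(size - CHUNK, 0)):
            f.seek(offset)
            h.update(f.read(CHUNK))
    h.update(size.to_bytes(8, "little"))
    if virtual:
        h.update(b"virtual")  # assumption about what the flag distinguishes
    return h.hexdigest()
```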

External API

None — internal component, consumed by API layer.

Data Access Patterns

  • Model bytes downloaded from Loader service (HTTP)
  • Converted TensorRT engines uploaded back to Loader for caching
  • Video frames decoded from in-memory bytes via PyAV (av.open(BytesIO)) — run_detect_video
  • Video frames decoded from streaming file-like via PyAV (av.open(readable)) — run_detect_video_stream (AZ-178)
  • Images decoded from in-memory bytes via cv2.imdecode
  • Video bytes concurrently written to persistent storage path in background thread (run_detect_video) or via StreamingBuffer (run_detect_video_stream)
  • All inference processing is in-memory

Implementation Details

Engine Initialization Strategy

1. Check GPU availability (pynvml, compute capability ≥ 6.1)
2. If GPU:
   a. Try loading pre-built TensorRT engine from Loader
   b. If fails → download ONNX model → start background conversion thread
   c. Background thread: convert ONNX→TensorRT → upload to Loader → set _converted_model_bytes
   d. Next init_ai() call: load from _converted_model_bytes
3. If no GPU:
   a. Download ONNX model from Loader → create OnnxEngine
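The fallback order above can be condensed into a small decision function. This is a sketch of the control flow only; the function and parameter names are assumptions, and the real code manages threads and engine objects rather than returning tuples.

```python
def select_engine(gpu_ok, load_trt, load_onnx, start_conversion,
                  converted_bytes=None):
    """Sketch of the init_ai() fallback strategy described above."""
    if not gpu_ok:
        return ("onnx", load_onnx())          # step 3: CPU path
    if converted_bytes is not None:
        return ("tensorrt", converted_bytes)  # step 2d: conversion finished
    trt = load_trt()
    if trt is not None:
        return ("tensorrt", trt)              # step 2a: cached engine on Loader
    onnx = load_onnx()
    start_conversion(onnx)                    # steps 2b/2c: convert in background
    return ("onnx", onnx)                     # serve ONNX until conversion lands
```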

Preprocessing

  • cv2.dnn.blobFromImage: normalize 0..1, resize to model input, BGR→RGB
  • Batch via np.vstack
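The semantics of that `cv2.dnn.blobFromImage` call can be approximated in plain NumPy (naive nearest-neighbour resize in place of OpenCV's interpolation) to make the tensor layout explicit:

```python
import numpy as np

def blob_from_image_sketch(bgr: np.ndarray, size: int) -> np.ndarray:
    """Approximates cv2.dnn.blobFromImage(img, 1/255.0, (size, size), swapRB=True):
    resize to the model input, scale to 0..1, BGR->RGB, HWC -> 1xCxHxW."""
    h, w = bgr.shape[:2]
    ys = np.arange(size) * h // size
    xs = np.arange(size) * w // size
    resized = bgr[ys][:, xs]                             # nearest-neighbour resize
    rgb = resized[..., ::-1].astype(np.float32) / 255.0  # BGR->RGB, normalize
    return rgb.transpose(2, 0, 1)[None]                  # HWC -> NCHW

# Batching: np.vstack stacks the resulting 1xCxHxW blobs along the batch axis.
```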

Postprocessing

  • Parse [batch][det][x1,y1,x2,y2,conf,cls] output
  • Normalize coordinates to 0..1
  • Convert to center-format Detection objects
  • Filter by confidence threshold
  • Remove overlapping detections (greedy: keep higher confidence, tie-break by lower class_id)
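The filtering and greedy overlap-removal steps can be sketched as below. Threshold semantics and the flat-list detection format are assumptions for illustration; the real code works on `Detection` objects inside the engine.

```python
def postprocess_sketch(raw, conf_threshold, iou_threshold):
    """raw: [[x1, y1, x2, y2, conf, cls], ...] for one image, coords in 0..1.
    Greedy removal: keep higher confidence; on ties, lower class_id wins."""
    def iou(a, b):
        ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
        ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
        inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
        area = lambda d: (d[2] - d[0]) * (d[3] - d[1])
        union = area(a) + area(b) - inter
        return inter / union if union > 0 else 0.0

    dets = [d for d in raw if d[4] >= conf_threshold]      # confidence filter
    dets.sort(key=lambda d: (-d[4], d[5]))                 # conf desc, class_id asc
    kept = []
    for d in dets:
        if all(iou(d, k) < iou_threshold for k in kept):
            kept.append(d)
    return kept  # center-format conversion: cx=(x1+x2)/2, cy=(y1+y2)/2, w, h
```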

Large Image Tiling

  • Ground Sampling Distance: sensor_width * altitude / (focal_length * image_width)
  • Tile size: METERS_IN_TILE / GSD pixels
  • Overlap: configurable percentage
  • Tile deduplication: absolute-coordinate Detection equality across adjacent tiles
  • Physical size filtering: remove detections exceeding class max_object_size_meters
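A worked example of the GSD and tile-size arithmetic, with sensor width and focal length in millimetres and altitude in metres. The `METERS_IN_TILE` value and the camera figures below are assumptions chosen for illustration, not values from the codebase.

```python
METERS_IN_TILE = 50.0  # assumption: the real constant's value is not shown here

def gsd_meters_per_px(sensor_width_mm, altitude_m, focal_length_mm, image_width_px):
    """Ground Sampling Distance: ground metres covered by one pixel."""
    return sensor_width_mm * altitude_m / (focal_length_mm * image_width_px)

def tile_size_px(gsd):
    """Tile edge in pixels covering METERS_IN_TILE metres of ground."""
    return round(METERS_IN_TILE / gsd)

# Example: 13.2 mm sensor, 100 m altitude, 8.8 mm focal length, 5472 px wide image
gsd = gsd_meters_per_px(13.2, 100.0, 8.8, 5472)  # ~0.0274 m/px
```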

Video Processing (PyAV-based — AZ-173)

  • Reads video from in-memory BytesIO via av.open (no filesystem read needed)
  • Concurrently writes bytes to save_path for persistent storage
  • Frame sampling: every Nth frame
  • Annotation validity heuristics: time gap, detection count increase, spatial movement, confidence improvement
  • JPEG encoding of valid frames for annotation images
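The annotation-validity heuristics listed above amount to an any-of check between the previous accepted annotation and the candidate. The thresholds and the annotation fields below are assumptions for illustration only.

```python
# Assumed thresholds — the real values are not documented here.
MIN_TIME_GAP_S = 1.0
MIN_MOVEMENT = 0.05    # in normalized 0..1 coordinates
MIN_CONF_GAIN = 0.1

def is_annotation_valid(prev, cur):
    """prev/cur: dicts with 'ts' (seconds), 'count', 'center' (cx, cy), 'conf'."""
    if prev is None:
        return True                              # first annotation always passes
    if cur["ts"] - prev["ts"] >= MIN_TIME_GAP_S:
        return True                              # enough time has passed
    if cur["count"] > prev["count"]:
        return True                              # new objects appeared
    dx = cur["center"][0] - prev["center"][0]
    dy = cur["center"][1] - prev["center"][1]
    if (dx * dx + dy * dy) ** 0.5 >= MIN_MOVEMENT:
        return True                              # noticeable spatial movement
    return cur["conf"] - prev["conf"] >= MIN_CONF_GAIN  # confidence improved
```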

Streaming Video Processing (AZ-178)

  • run_detect_video_stream accepts a file-like readable (e.g. StreamingBuffer) instead of bytes
  • Opens av.open(readable) directly — PyAV calls read()/seek() on the object as needed
  • No writer thread — the StreamingBuffer already persists data to disk as the HTTP handler feeds it chunks
  • Reuses _process_video_pyav for all frame decoding, batching, and annotation logic
  • For faststart MP4/MKV/WebM: true streaming (~500ms to first frame)
  • For standard MP4 (moov at end): graceful degradation via blocking SEEK_END
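A minimal sketch of the file-like object this mode relies on: something PyAV can `read()`/`seek()` while the HTTP handler `feed()`s it chunks, with the same storage doubling as persistence. The real StreamingBuffer additionally blocks readers that reach the not-yet-received tail (hence the blocking SEEK_END above); this single-threaded sketch omits that synchronization.

```python
import io
import tempfile

class StreamingBufferSketch:
    """Grow-as-you-feed buffer backed by a file (which is also the persisted
    copy), exposing the read()/seek() interface PyAV needs."""

    def __init__(self):
        self._file = tempfile.TemporaryFile()
        self._write_pos = 0
        self._read_pos = 0

    def feed(self, chunk: bytes):
        """Called by the HTTP handler as upload bytes arrive."""
        self._file.seek(self._write_pos)
        self._file.write(chunk)
        self._write_pos += len(chunk)

    def read(self, n=-1):
        self._file.seek(self._read_pos)
        if n < 0:
            n = self._write_pos - self._read_pos
        data = self._file.read(n)
        self._read_pos += len(data)
        return data

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            self._read_pos = offset
        elif whence == io.SEEK_CUR:
            self._read_pos += offset
        else:  # SEEK_END — the real buffer blocks here until upload completes
            self._read_pos = self._write_pos + offset
        return self._read_pos
```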

Callbacks

  • annotation_callback(annotation, percent) — called per valid annotation
  • status_callback(media_name, count) — called when all detections for a media item are complete

Caveats

  • ThreadPoolExecutor with max_workers=2 limits concurrent inference (set in main.py)
  • Background TensorRT conversion runs in a daemon thread — may be interrupted on shutdown
  • init_ai() called on every detection entry point — idempotent but checks engine state each time
  • Video processing is sequential per video (no parallel video processing)
  • _tile_detections dict is instance-level state that accumulates across tile-processing calls within a single run_detect_image invocation

Dependency Graph

graph TD
    inference --> constants_inf
    inference --> ai_availability_status
    inference --> annotation
    inference --> ai_config
    inference -.-> onnx_engine
    inference -.-> tensorrt_engine
    inference --> loader_http_client
    main --> media_hash

Logging Strategy

Extensive logging via constants_inf.log: engine init status, media processing start, GSD calculation, tile splitting, detection results, size filtering decisions.