# Component: Inference Pipeline

## Overview

**Purpose**: Orchestrates the full inference lifecycle — engine initialization with fallback strategy, stream-based media preprocessing (images + video from bytes), batched inference execution, postprocessing with detection filtering, and result delivery via callbacks.

**Pattern**: Façade + Pipeline — `Inference` class is the single entry point that coordinates engine selection, preprocessing, inference, and postprocessing stages.

**Upstream**: Domain (data models, config, status), Inference Engines (OnnxEngine/TensorRTEngine), External Client (LoaderHttpClient).

**Downstream**: API (creates Inference, calls `run_detect_image` and `run_detect_video`).

## Modules

| Module | Role |
|--------|------|
| `inference` | Core orchestrator: engine lifecycle, stream-based image/video processing, postprocessing |
| `loader_http_client` | HTTP client for model download/upload (Loader) and API queries (Annotations service) |
| `media_hash` | XxHash64 content hashing with sampling algorithm for media identification |

## Internal Interfaces

### Inference

```
cdef class Inference:
    __init__(loader_client)
    cpdef run_detect_image(bytes image_bytes, AIRecognitionConfig ai_config, str media_name,
                           annotation_callback, status_callback=None)
    cpdef run_detect_video(bytes video_bytes, AIRecognitionConfig ai_config, str media_name,
                           str save_path, annotation_callback, status_callback=None)
    cpdef run_detect_video_stream(object readable, AIRecognitionConfig ai_config, str media_name,
                                  annotation_callback, status_callback=None)
    cpdef stop()

    # Internal pipeline stages:
    cdef init_ai()
    cdef _process_video_pyav(AIRecognitionConfig, str original_media_name, object container)
    cdef _process_video_batch(AIRecognitionConfig, list frames, list timestamps, str name,
                              int frame_count, int total, int model_w)
    cdef _append_image_frame_entries(AIRecognitionConfig, list all_frame_data, frame,
                                     str original_media_name)
    cdef _finalize_image_inference(AIRecognitionConfig,
                                   list all_frame_data)
    cdef split_to_tiles(frame, str media_stem, tile_size, overlap_percent)
    cdef remove_overlapping_detections(list[Detection], float threshold) -> list[Detection]  # delegated to engine
```

### Free Functions

```
def ai_config_from_dict(dict data) -> AIRecognitionConfig
```

### LoaderHttpClient

```
class LoaderHttpClient:
    cdef load_big_small_resource(str filename, str directory) -> LoadResult
    cdef upload_big_small_resource(bytes content, str filename, str directory) -> LoadResult
    cpdef fetch_user_ai_settings(str user_id, str bearer_token) -> object
    cpdef fetch_media_path(str media_id, str bearer_token) -> object
```

### media_hash

```
def compute_media_content_hash(data: bytes, virtual: bool = False) -> str
def compute_media_content_hash_from_file(path: str, virtual: bool = False) -> str
```

## External API

None — internal component, consumed by API layer.

## Data Access Patterns

- Model bytes downloaded from Loader service (HTTP)
- Converted TensorRT engines uploaded back to Loader for caching
- Video frames decoded from in-memory bytes via PyAV (`av.open(BytesIO)`) — `run_detect_video`
- Video frames decoded from streaming file-like via PyAV (`av.open(readable)`) — `run_detect_video_stream` (AZ-178)
- Images decoded from in-memory bytes via `cv2.imdecode`
- Video bytes concurrently written to persistent storage path in background thread (`run_detect_video`) or via StreamingBuffer (`run_detect_video_stream`)
- All inference processing is in-memory

## Implementation Details

### Engine Initialization Strategy

```
1. Check GPU availability (pynvml, compute capability ≥ 6.1)
2. If GPU:
   a. Try loading pre-built TensorRT engine from Loader
   b. If fails → download ONNX model → start background conversion thread
   c. Background thread: convert ONNX→TensorRT → upload to Loader → set _converted_model_bytes
   d. Next init_ai() call: load from _converted_model_bytes
3. If no GPU:
   a. Download ONNX model from Loader → create OnnxEngine
```

### Preprocessing

- `cv2.dnn.blobFromImage`: normalize 0..1, resize to model input, BGR→RGB
- Batch via `np.vstack`

### Postprocessing

- Parse `[batch][det][x1,y1,x2,y2,conf,cls]` output
- Normalize coordinates to 0..1
- Convert to center-format Detection objects
- Filter by confidence threshold
- Remove overlapping detections (greedy: keep higher confidence, tie-break by lower class_id)

### Large Image Tiling

- Ground Sampling Distance: `sensor_width * altitude / (focal_length * image_width)`
- Tile size: `METERS_IN_TILE / GSD` pixels
- Overlap: configurable percentage
- Tile deduplication: absolute-coordinate Detection equality across adjacent tiles
- Physical size filtering: remove detections exceeding class max_object_size_meters

### Video Processing (PyAV-based — AZ-173)

- Reads video from in-memory `BytesIO` via `av.open` (no filesystem read needed)
- Concurrently writes bytes to `save_path` for persistent storage
- Frame sampling: every Nth frame
- Annotation validity heuristics: time gap, detection count increase, spatial movement, confidence improvement
- JPEG encoding of valid frames for annotation images

### Streaming Video Processing (AZ-178)

- `run_detect_video_stream` accepts a file-like `readable` (e.g.
  `StreamingBuffer`) instead of `bytes`
- Opens `av.open(readable)` directly — PyAV calls `read()`/`seek()` on the object as needed
- No writer thread — the `StreamingBuffer` already persists data to disk as the HTTP handler feeds it chunks
- Reuses `_process_video_pyav` for all frame decoding, batching, and annotation logic
- For faststart MP4/MKV/WebM: true streaming (~500ms to first frame)
- For standard MP4 (moov at end): graceful degradation via blocking SEEK_END

### Callbacks

- `annotation_callback(annotation, percent)` — called per valid annotation
- `status_callback(media_name, count)` — called when all detections for a media item are complete

## Caveats

- `ThreadPoolExecutor` with max_workers=2 limits concurrent inference (set in main.py)
- Background TensorRT conversion runs in a daemon thread — may be interrupted on shutdown
- `init_ai()` called on every detection entry point — idempotent but checks engine state each time
- Video processing is sequential per video (no parallel video processing)
- `_tile_detections` dict is instance-level state that persists across image calls within a single `run_detect_image` invocation

## Dependency Graph

```mermaid
graph TD
    inference --> constants_inf
    inference --> ai_availability_status
    inference --> annotation
    inference --> ai_config
    inference -.-> onnx_engine
    inference -.-> tensorrt_engine
    inference --> loader_http_client
    main --> media_hash
```

## Logging Strategy

Extensive logging via `constants_inf.log`: engine init status, media processing start, GSD calculation, tile splitting, detection results, size filtering decisions.
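## Appendix: Overlap-Removal Sketch

The greedy overlap-removal rule described under Postprocessing (keep higher confidence, tie-break by lower class_id) can be sketched in plain Python. This is an illustration, not the engine's actual code — the `Detection` dataclass, its corner-format fields, and the IoU-based overlap test are assumptions for the example:

```python
from dataclasses import dataclass

@dataclass
class Detection:
    # Corner-format box in normalized 0..1 coordinates (illustrative fields)
    x1: float
    y1: float
    x2: float
    y2: float
    conf: float
    class_id: int

def iou(a: Detection, b: Detection) -> float:
    """Intersection-over-union of two corner-format boxes."""
    ix1, iy1 = max(a.x1, b.x1), max(a.y1, b.y1)
    ix2, iy2 = min(a.x2, b.x2), min(a.y2, b.y2)
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a.x2 - a.x1) * (a.y2 - a.y1)
    area_b = (b.x2 - b.x1) * (b.y2 - b.y1)
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0

def remove_overlapping_detections(dets: list, threshold: float) -> list:
    # Greedy pass: visit detections best-first (higher confidence wins,
    # ties broken by lower class_id) and keep one only if it does not
    # overlap an already-kept detection beyond the threshold.
    ordered = sorted(dets, key=lambda d: (-d.conf, d.class_id))
    kept: list = []
    for d in ordered:
        if all(iou(d, k) <= threshold for k in kept):
            kept.append(d)
    return kept
```

For two heavily overlapping boxes, the higher-confidence one survives; at equal confidence, the one with the lower class_id survives, matching the tie-break rule above.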