Component: Inference Pipeline
Overview
Purpose: Orchestrates the full inference lifecycle — engine initialization with fallback strategy, stream-based media preprocessing (images + video from bytes), batched inference execution, postprocessing with detection filtering, and result delivery via callbacks.
Pattern: Façade + Pipeline — Inference class is the single entry point that coordinates engine selection, preprocessing, inference, and postprocessing stages.
Upstream: Domain (data models, config, status), Inference Engines (OnnxEngine/TensorRTEngine), External Client (LoaderHttpClient).
Downstream: API (creates Inference, calls run_detect_image and run_detect_video).
Modules
| Module | Role |
|---|---|
| `inference` | Core orchestrator: engine lifecycle, stream-based image/video processing, postprocessing |
| `loader_http_client` | HTTP client for model download/upload (Loader) and API queries (Annotations service) |
| `media_hash` | XxHash64 content hashing with sampling algorithm for media identification |
Internal Interfaces
Inference
```
cdef class Inference:
    __init__(loader_client)
    cpdef run_detect_image(bytes image_bytes, AIRecognitionConfig ai_config, str media_name,
                           annotation_callback, status_callback=None)
    cpdef run_detect_video(bytes video_bytes, AIRecognitionConfig ai_config, str media_name,
                           str save_path, annotation_callback, status_callback=None)
    cpdef stop()

    # Internal pipeline stages:
    cdef init_ai()
    cdef _process_video_pyav(AIRecognitionConfig, str original_media_name, object container)
    cdef _process_video_batch(AIRecognitionConfig, list frames, list timestamps, str name, int frame_count, int total, int model_w)
    cdef _append_image_frame_entries(AIRecognitionConfig, list all_frame_data, frame, str original_media_name)
    cdef _finalize_image_inference(AIRecognitionConfig, list all_frame_data)
    cdef split_to_tiles(frame, str media_stem, tile_size, overlap_percent)
    cdef remove_overlapping_detections(list[Detection], float threshold) -> list[Detection]  # delegated to engine
```
Free Functions
```
def ai_config_from_dict(dict data) -> AIRecognitionConfig
```
LoaderHttpClient
```
class LoaderHttpClient:
    cdef load_big_small_resource(str filename, str directory) -> LoadResult
    cdef upload_big_small_resource(bytes content, str filename, str directory) -> LoadResult
    cpdef fetch_user_ai_settings(str user_id, str bearer_token) -> object
    cpdef fetch_media_path(str media_id, str bearer_token) -> object
```
media_hash
```
def compute_media_content_hash(data: bytes, virtual: bool = False) -> str
```
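The "sampling" idea behind `compute_media_content_hash` might look like the sketch below. This is an illustration only: the chunk size, the size threshold, the length-binding step, and the treatment of `virtual` are all hypothetical, and `hashlib.blake2b` (truncated to 64 bits) stands in for the real XxHash64 digest so the example needs no third-party package.

```python
import hashlib

# Hypothetical parameters — the real values live in media_hash, and the real
# digest is XxHash64; blake2b with an 8-byte digest stands in here so the
# sketch needs only the standard library.
SAMPLE_CHUNK = 64 * 1024           # bytes hashed from head, middle, and tail
SAMPLE_THRESHOLD = 1024 * 1024     # payloads at or below this are hashed whole

def compute_media_content_hash(data: bytes, virtual: bool = False) -> str:
    h = hashlib.blake2b(digest_size=8)   # 64-bit digest, like XxHash64
    if virtual or len(data) <= SAMPLE_THRESHOLD:
        h.update(data)                               # small media: hash everything
    else:
        mid = len(data) // 2
        h.update(data[:SAMPLE_CHUNK])                # head sample
        h.update(data[mid:mid + SAMPLE_CHUNK])       # middle sample
        h.update(data[-SAMPLE_CHUNK:])               # tail sample
        h.update(len(data).to_bytes(8, "little"))    # bind the total length
    return h.hexdigest()
```

Sampling keeps hashing O(1) for large videos while the bound length distinguishes payloads that happen to share head/middle/tail bytes.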
External API
None — internal component, consumed by API layer.
Data Access Patterns
- Model bytes downloaded from Loader service (HTTP)
- Converted TensorRT engines uploaded back to Loader for caching
- Video frames decoded from in-memory bytes via PyAV (`av.open(BytesIO)`)
- Images decoded from in-memory bytes via `cv2.imdecode`
- Video bytes concurrently written to a persistent storage path in a background thread
- All inference processing is in-memory
Implementation Details
Engine Initialization Strategy
1. Check GPU availability (pynvml, compute capability ≥ 6.1)
2. If GPU:
a. Try loading pre-built TensorRT engine from Loader
b. If fails → download ONNX model → start background conversion thread
c. Background thread: convert ONNX→TensorRT → upload to Loader → set _converted_model_bytes
d. Next init_ai() call: load from _converted_model_bytes
3. If no GPU:
a. Download ONNX model from Loader → create OnnxEngine
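The fallback strategy above can be condensed into plain Python. This is a minimal sketch: the `loader` object, the `state` dict, the model filenames, and the byte-prefix "conversion" are hypothetical stand-ins for `LoaderHttpClient`, instance attributes such as `_converted_model_bytes`, and the real ONNX→TensorRT build.

```python
import threading

MIN_COMPUTE_CAPABILITY = (6, 1)   # GPU path requires compute capability >= 6.1

def init_ai(loader, gpu_capability, state):
    """Sketch of the fallback strategy: prefer TensorRT, fall back to ONNX."""
    use_gpu = gpu_capability is not None and gpu_capability >= MIN_COMPUTE_CAPABILITY
    if use_gpu and state.get("converted_model_bytes"):
        # A background conversion finished since the last call: upgrade engine.
        state["engine"] = ("tensorrt", state["converted_model_bytes"])
    if state.get("engine"):
        return state["engine"]             # idempotent re-entry
    if use_gpu:
        engine_bytes = loader.load("model.engine")
        if engine_bytes is not None:       # pre-built engine cached on Loader
            state["engine"] = ("tensorrt", engine_bytes)
            return state["engine"]
        onnx_bytes = loader.load("model.onnx")

        def _convert():                    # daemon thread: build TRT, cache it
            trt = b"TRT:" + onnx_bytes     # placeholder for the real conversion
            loader.upload("model.engine", trt)
            state["converted_model_bytes"] = trt

        threading.Thread(target=_convert, daemon=True).start()
        state["engine"] = ("onnx", onnx_bytes)
    else:
        state["engine"] = ("onnx", loader.load("model.onnx"))
    return state["engine"]
```

The key property is that the first GPU call never blocks on conversion: it serves ONNX immediately and lets a later `init_ai()` pick up the converted engine.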
Preprocessing
- `cv2.dnn.blobFromImage`: normalize to 0..1, resize to model input, BGR→RGB
- Batch via `np.vstack`
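A NumPy-only sketch of the preprocessing stage: the real code calls `cv2.dnn.blobFromImage`, so the nearest-neighbor resize here is just a dependency-free stand-in for OpenCV's resize; the scaling, channel flip, and batching mirror the bullets above.

```python
import numpy as np

def preprocess(frame_bgr: np.ndarray, model_size: int) -> np.ndarray:
    """Resize, scale to 0..1, BGR->RGB, HWC->NCHW — what blobFromImage does."""
    h, w = frame_bgr.shape[:2]
    ys = np.arange(model_size) * h // model_size    # nearest-neighbor resize
    xs = np.arange(model_size) * w // model_size
    resized = frame_bgr[ys[:, None], xs[None, :]]
    rgb = resized[..., ::-1].astype(np.float32) / 255.0   # BGR->RGB, 0..1
    return rgb.transpose(2, 0, 1)[None]                   # -> (1, 3, S, S)

def make_batch(frames, model_size):
    # Each frame becomes a (1, 3, S, S) blob; np.vstack stacks them on axis 0.
    return np.vstack([preprocess(f, model_size) for f in frames])
```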
Postprocessing
- Parse `[batch][det][x1,y1,x2,y2,conf,cls]` output
- Normalize coordinates to 0..1
- Convert to center-format Detection objects
- Filter by confidence threshold
- Remove overlapping detections (greedy: keep higher confidence, tie-break by lower class_id)
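The greedy overlap removal can be sketched as below. The `Detection` dataclass is a minimal corner-format stand-in for the real type, and the IoU criterion is an assumption; only the tie-breaking rule (higher confidence wins, then lower `class_id`) comes from the description above.

```python
from dataclasses import dataclass

@dataclass
class Detection:          # minimal stand-in for the real Detection type
    x1: float
    y1: float
    x2: float
    y2: float
    conf: float
    class_id: int

def iou(a: Detection, b: Detection) -> float:
    ix = max(0.0, min(a.x2, b.x2) - max(a.x1, b.x1))
    iy = max(0.0, min(a.y2, b.y2) - max(a.y1, b.y1))
    inter = ix * iy
    union = ((a.x2 - a.x1) * (a.y2 - a.y1)
             + (b.x2 - b.x1) * (b.y2 - b.y1) - inter)
    return inter / union if union > 0 else 0.0

def remove_overlapping_detections(dets, threshold):
    # Greedy: higher confidence wins; ties broken by lower class_id.
    ordered = sorted(dets, key=lambda d: (-d.conf, d.class_id))
    kept = []
    for d in ordered:
        if all(iou(d, k) <= threshold for k in kept):
            kept.append(d)
    return kept
```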
Large Image Tiling
- Ground Sampling Distance (GSD): `sensor_width * altitude / (focal_length * image_width)`
- Tile size: `METERS_IN_TILE / GSD` pixels
- Overlap: configurable percentage
- Tile deduplication: absolute-coordinate Detection equality across adjacent tiles
- Physical size filtering: remove detections exceeding class max_object_size_meters
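The GSD and tile-size formulas combine as follows. The `METERS_IN_TILE` value and the example camera parameters are hypothetical; the real constant lives in the component's constants module.

```python
METERS_IN_TILE = 50.0   # hypothetical value; the real constant lives elsewhere

def gsd_meters_per_pixel(sensor_width_m, altitude_m, focal_length_m, image_width_px):
    # Ground Sampling Distance from the pinhole-camera relation above:
    # how many meters of ground one image pixel covers.
    return sensor_width_m * altitude_m / (focal_length_m * image_width_px)

def tile_size_px(gsd):
    # A tile should cover METERS_IN_TILE meters of ground, so its side is:
    return int(round(METERS_IN_TILE / gsd))
```

For example, a 13.2 mm sensor at 100 m altitude with an 8.8 mm lens and a 4000 px wide image gives a GSD of 0.0375 m/px, so a 50 m tile is about 1333 px wide.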
Video Processing (PyAV-based — AZ-173)
- Reads video from in-memory `BytesIO` via `av.open` (no filesystem read needed)
- Concurrently writes bytes to `save_path` for persistent storage
- Frame sampling: every Nth frame
- Annotation validity heuristics: time gap, detection count increase, spatial movement, confidence improvement
- JPEG encoding of valid frames for annotation images
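A sketch of the in-memory decode with every-Nth-frame sampling. The sampling helper is generic; `decode_video_from_bytes` assumes the PyAV package and the first video stream, and its output format is a guess at what the pipeline consumes.

```python
import io

def sample_every_nth(frames, n):
    """Yield (index, frame) for every n-th frame (0, n, 2n, ...)."""
    for i, frame in enumerate(frames):
        if i % n == 0:
            yield i, frame

def decode_video_from_bytes(video_bytes: bytes, every_n: int):
    """Decode frames straight from memory with PyAV — no filesystem read.

    Yields (frame_index, BGR ndarray, timestamp_seconds) for sampled frames.
    """
    import av                                     # PyAV
    container = av.open(io.BytesIO(video_bytes))  # in-memory demux/decode
    stream = container.streams.video[0]
    for i, frame in sample_every_nth(container.decode(stream), every_n):
        yield i, frame.to_ndarray(format="bgr24"), frame.time
```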
Callbacks
- `annotation_callback(annotation, percent)` — called per valid annotation
- `status_callback(media_name, count)` — called when all detections for a media item are complete
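Callers supply plain callables with these signatures; a minimal pair that records events (the event tuples and the commented call are illustrative, not the component's API beyond the signatures above):

```python
events = []

def on_annotation(annotation, percent):
    # Invoked once per valid annotation, with overall progress in percent.
    events.append(("annotation", annotation, percent))

def on_status(media_name, count):
    # Invoked when all detections for a media item are complete.
    events.append(("done", media_name, count))

# Hypothetical wiring, mirroring run_detect_image above:
# inference.run_detect_image(image_bytes, ai_config, "photo.jpg",
#                            annotation_callback=on_annotation,
#                            status_callback=on_status)
```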
Caveats
- `ThreadPoolExecutor` with `max_workers=2` limits concurrent inference (set in `main.py`)
- Background TensorRT conversion runs in a daemon thread — may be interrupted on shutdown
- `init_ai()` is called on every detection entry point — idempotent, but it checks engine state each time
- Video processing is sequential per video (no parallel video processing)
- The `_tile_detections` dict is instance-level state that persists across image calls within a single `run_detect_image` invocation
Dependency Graph
```mermaid
graph TD
    inference --> constants_inf
    inference --> ai_availability_status
    inference --> annotation
    inference --> ai_config
    inference -.-> onnx_engine
    inference -.-> tensorrt_engine
    inference --> loader_http_client
    main --> media_hash
```
Logging Strategy
Extensive logging via constants_inf.log: engine init status, media processing start, GSD calculation, tile splitting, detection results, size filtering decisions.