Azaion.Detections — System Flows
Flow Inventory
| # | Flow Name | Trigger | Primary Components | Criticality |
|---|---|---|---|---|
| F1 | Health Check | Client GET /health | API, Inference Pipeline | High |
| F2 | Single Image Detection | Client POST /detect | API, Inference Pipeline, Engines, Domain | High |
| F3 | Media Detection (Async) | Client POST /detect/{media_id} | API, Inference Pipeline, Engines, Domain, Loader, Annotations | High |
| F4 | SSE Event Streaming | Client GET /detect/stream | API | Medium |
| F5 | Engine Initialization | First detection request | Inference Pipeline, Engines, Loader | High |
| F6 | TensorRT Background Conversion | No pre-built TensorRT engine | Inference Pipeline, Engines, Loader | Medium |
Flow Dependencies
| Flow | Depends On | Shares Data With |
|---|---|---|
| F1 | F5 (for meaningful status) | — |
| F2 | F5 (engine must be ready) | — |
| F3 | F5 (engine must be ready) | F4 (via SSE event queues) |
| F4 | — | F3 (receives events) |
| F5 | — | F6 (triggers conversion if needed) |
| F6 | F5 (triggered when the pre-built TensorRT engine cannot be downloaded) | F5 (provides converted bytes) |
Flow F1: Health Check
Description
Client queries the service health status. Returns the current AI engine availability (None, Downloading, Converting, Enabled, Error, etc.) without triggering engine initialization.
Sequence Diagram
sequenceDiagram
participant Client
participant API as main.py
participant INF as Inference
participant STATUS as AIAvailabilityStatus
Client->>API: GET /health
API->>INF: get_inference()
INF-->>API: Inference instance
API->>STATUS: str(ai_availability_status)
STATUS-->>API: "Enabled" / "Downloading" / etc.
API-->>Client: HealthResponse{status, aiAvailability, errorMessage}
Error Scenarios
| Error | Where | Detection | Recovery |
|---|---|---|---|
| Inference not yet created | get_inference() | Exception caught | Returns aiAvailability="None" |
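The fallback in the table above can be sketched as a plain function. This is a minimal illustration, not the service's actual handler: the `health` signature, the `"healthy"` status value, and the dataclass shape are assumptions; only the field names and the `"None"` fallback come from this document.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class HealthResponse:
    status: str
    aiAvailability: str
    errorMessage: Optional[str] = None


def health(get_inference: Callable[[], object]) -> HealthResponse:
    """Report engine availability without triggering initialization."""
    try:
        inference = get_inference()
        # Only the status is read; init_ai() is deliberately not called here.
        availability = str(inference.ai_availability_status)
    except Exception:
        # Inference not yet created: report "None" instead of failing.
        availability = "None"
    # "healthy" is a placeholder value (assumption).
    return HealthResponse(status="healthy", aiAvailability=availability)
```

Keeping the exception handling inside the handler means a health probe still succeeds before the first detection request has created the engine.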
Flow F2: Single Image Detection
Description
Client uploads an image file and optionally provides config. The request is synchronous from the client's perspective: inference runs on a ThreadPoolExecutor worker thread, and the HTTP response carries the detection results.
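A minimal sketch of offloading a blocking inference call to a thread pool from an async handler. The pool size, the `handle_detect` name, and the stand-in `detect_single_image` body are assumptions for illustration; only the use of a ThreadPoolExecutor is stated in this document.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pool size; the real setting is not stated in this document.
_executor = ThreadPoolExecutor(max_workers=2)


def detect_single_image(image_bytes: bytes, config: dict) -> list:
    """Stand-in for the blocking inference call."""
    return [{"classNum": 0, "confidence": 0.9, "size": len(image_bytes)}]


async def handle_detect(image_bytes: bytes, config: dict) -> list:
    loop = asyncio.get_running_loop()
    # Run the blocking call on a worker thread so the event loop stays free
    # to serve other requests while inference is in progress.
    return await loop.run_in_executor(
        _executor, detect_single_image, image_bytes, config
    )
```

The caller still awaits the result, so the client sees one request and one response even though the work happens off the event loop.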
Sequence Diagram
sequenceDiagram
participant Client
participant API as main.py
participant INF as Inference
participant ENG as Engine (ONNX/TRT)
participant CONST as constants_inf
Client->>API: POST /detect (file + config?)
API->>API: Read image bytes, parse config
API->>INF: detect_single_image(bytes, config_dict)
INF->>INF: init_ai() (idempotent)
INF->>INF: cv2.imdecode → preprocess
INF->>ENG: run(input_blob)
ENG-->>INF: raw output
INF->>INF: postprocess → filter by threshold → remove overlaps
INF-->>API: list[Detection]
API->>CONST: annotations_dict[cls].name (label lookup)
API-->>Client: list[DetectionDto]
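The "filter by threshold → remove overlaps" step in the diagram above can be sketched as a greedy pass over confidence-sorted boxes. This is an illustration under assumptions: the threshold values, the IoU-based overlap rule, and the `Detection` field names are hypothetical, and the service's actual rule may differ (for example, it may be class-aware).

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Detection:
    x: float  # box center x, normalized to [0, 1]
    y: float  # box center y, normalized to [0, 1]
    w: float
    h: float
    cls: int
    confidence: float


def iou(a: Detection, b: Detection) -> float:
    """Intersection over union of two center-format boxes."""
    inter_w = min(a.x + a.w / 2, b.x + b.w / 2) - max(a.x - a.w / 2, b.x - b.w / 2)
    inter_h = min(a.y + a.h / 2, b.y + b.h / 2) - max(a.y - a.h / 2, b.y - b.h / 2)
    inter = max(0.0, inter_w) * max(0.0, inter_h)
    union = a.w * a.h + b.w * b.h - inter
    return inter / union if union > 0 else 0.0


def filter_detections(dets: List[Detection],
                      threshold: float = 0.25,
                      max_iou: float = 0.3) -> List[Detection]:
    """Drop low-confidence boxes, then keep the strongest box of each overlap group."""
    kept: List[Detection] = []
    confident = [d for d in dets if d.confidence >= threshold]
    for det in sorted(confident, key=lambda d: d.confidence, reverse=True):
        if all(iou(det, k) <= max_iou for k in kept):
            kept.append(det)
    return kept
```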
Error Scenarios
| Error | Where | Detection | Recovery |
|---|---|---|---|
| Empty image | API | len(bytes)==0 | 400 Bad Request |
| Invalid image data | imdecode | frame is None | ValueError, returned as 400 Bad Request |
| Engine not available | init_ai | engine is None | 503 Service Unavailable |
| Inference failure | run/postprocess | RuntimeError | 422 Unprocessable Entity |
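The table above amounts to a mapping from failure type to HTTP status. A hedged sketch follows; `EngineUnavailableError`, `status_for`, and the 500 fallback are hypothetical names and choices, since this document does not say how the handler signals an unavailable engine or treats other exceptions.

```python
from typing import Optional


class EngineUnavailableError(Exception):
    """Hypothetical marker for 'engine is None' after init_ai()."""


def status_for(image_bytes: bytes, error: Optional[Exception] = None) -> int:
    """Return the HTTP status code the F2 error table assigns to an outcome."""
    if len(image_bytes) == 0:
        return 400  # empty upload
    if error is None:
        return 200
    if isinstance(error, ValueError):
        return 400  # image could not be decoded
    if isinstance(error, EngineUnavailableError):
        return 503  # engine not ready
    if isinstance(error, RuntimeError):
        return 422  # inference or postprocessing failed
    return 500  # assumption: anything else is an internal error
```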
Flow F3: Media Detection (Async)
Description
Client triggers detection on media files (images/video) available via the Loader service. Processing runs asynchronously. Results are streamed via SSE (F4) and optionally posted to the Annotations service.
Sequence Diagram
sequenceDiagram
participant Client
participant API as main.py
participant INF as Inference
participant ENG as Engine
participant LDR as Loader Service
participant ANN as Annotations Service
participant SSE as SSE Queues
Client->>API: POST /detect/{media_id} (config + auth headers)
API->>API: Check _active_detections (duplicate guard)
API-->>Client: {"status": "started"}
Note over API: asyncio.Task created
API->>INF: run_detect(config, on_annotation, on_status)
loop For each media file
INF->>INF: Read/decode media (cv2)
INF->>INF: Preprocess (tile/batch)
INF->>ENG: run(input_blob)
ENG-->>INF: raw output
INF->>INF: Postprocess + validate
opt Valid annotation found
INF->>API: on_annotation(annotation, percent)
API->>SSE: DetectionEvent → all queues
opt Auth token present
API->>ANN: POST /annotations (detections + image)
end
end
end
INF->>API: on_status(media_name, count)
API->>SSE: DetectionEvent(status=AIProcessed, percent=100)
Data Flow
| Step | From | To | Data | Format |
|---|---|---|---|---|
| 1 | Client | API | media_id, config, auth tokens | HTTP POST JSON + headers |
| 2 | API | Inference | config_dict, callbacks | Python dict + callables |
| 3 | Inference | Engine | preprocessed batch | numpy ndarray |
| 4 | Engine | Inference | raw detections | numpy ndarray |
| 5 | Inference | API (callback) | Annotation + percent | Python objects |
| 6 | API | SSE clients | DetectionEvent | SSE JSON stream |
| 7 | API | Annotations Service | CreateAnnotationRequest | HTTP POST JSON |
Step 7 — Annotations POST detail:
Sent once per detection batch when an auth token is present. The request to POST {ANNOTATIONS_URL}/annotations carries:
{
"mediaId": "string",
"source": 0,
"videoTime": "00:01:23",
"detections": [
{
"centerX": 0.56, "centerY": 0.67,
"width": 0.25, "height": 0.22,
"classNum": 3, "label": "ArmorVehicle",
"confidence": 0.92
}
],
"image": "<base64 encoded frame bytes, optional>"
}
userId is not included, because the Annotations service resolves the user from the JWT. The Annotations API contract also accepts description, affiliation, and combatReadiness on each detection, but Detections does not populate them.
The request carries Authorization: Bearer {accessToken}, forwarded from the original client request. For long-running video, the token is refreshed automatically via POST {ANNOTATIONS_URL}/auth/refresh.
The Annotations service responds 201 on success, 400 if neither image nor mediaId is provided, and 404 if the mediaId is unknown. On the Annotations side, a saved annotation triggers an SSE notification to the UI and is enqueued to the RabbitMQ sync pipeline (unless SilentDetection mode is on).
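A minimal sketch of assembling the request body and header described above. The function names and the attribute-free dict input are assumptions for illustration; the field names and the `source: 0` value are copied from the example payload, and the HTTP call itself is left out.

```python
import base64
from typing import List, Optional


def build_annotation_request(media_id: str,
                             video_time: str,
                             detections: List[dict],
                             frame_bytes: Optional[bytes] = None) -> dict:
    """Build the JSON body for POST {ANNOTATIONS_URL}/annotations."""
    body = {
        "mediaId": media_id,
        "source": 0,  # value taken from the example payload
        "videoTime": video_time,
        "detections": detections,
    }
    if frame_bytes is not None:
        # The image is optional and travels as base64 text.
        body["image"] = base64.b64encode(frame_bytes).decode("ascii")
    return body


def auth_headers(access_token: str) -> dict:
    """Forward the client's bearer token; no userId is sent."""
    return {"Authorization": f"Bearer {access_token}"}
```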
Error Scenarios
| Error | Where | Detection | Recovery |
|---|---|---|---|
| Duplicate media_id | API | _active_detections check | 409 Conflict |
| Engine unavailable | run_detect | engine is None | Error event pushed to SSE |
| Inference failure | processing | Exception | Error event pushed to SSE, media_id cleared |
| Annotations POST failure | _post_annotation | Exception | Silently caught, detection continues |
| Annotations 404 | _post_annotation | MediaId not found in Annotations DB | Silently caught, detection continues |
| Token refresh failure | TokenManager | Exception on /auth/refresh | Silently caught, subsequent POSTs may fail with 401 |
| SSE queue full | event broadcast | QueueFull | Event silently dropped for that client |
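The last row of the table, dropping an event for a client whose queue is full, can be sketched with `asyncio.Queue.put_nowait`. The `broadcast` name and its return value are assumptions for illustration.

```python
import asyncio
from typing import Iterable


def broadcast(event: object, queues: Iterable[asyncio.Queue]) -> int:
    """Push an event to every client queue; return how many accepted it."""
    delivered = 0
    for queue in queues:
        try:
            queue.put_nowait(event)
            delivered += 1
        except asyncio.QueueFull:
            # A slow client loses this event; other clients are unaffected.
            pass
    return delivered
```

Using `put_nowait` instead of `await queue.put(...)` keeps one stalled client from blocking the detection task.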
Flow F4: SSE Event Streaming
Description
Client opens a persistent SSE connection and receives real-time detection events from all active F3 media detection tasks.
Sequence Diagram
sequenceDiagram
participant Client
participant API as main.py
participant Queue as asyncio.Queue
Client->>API: GET /detect/stream
API->>Queue: Create queue (maxsize=100)
API->>API: Add to _event_queues
loop Until disconnect
Queue-->>API: await event
API-->>Client: data: {DetectionEvent JSON}
end
Note over API: Client disconnects (CancelledError)
API->>API: Remove from _event_queues
Flow F5: Engine Initialization
Description
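On the first detection request, the Inference class initializes the ML engine. Strategy: with a GPU available, try the pre-built TensorRT engine first; if that download fails, download the ONNX model and convert it to TensorRT in a background thread (F6). Without a GPU, load the ONNX engine directly.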
Flowchart
flowchart TD
Start([init_ai called]) --> CheckEngine{engine exists?}
CheckEngine -->|Yes| Done([Return])
CheckEngine -->|No| CheckBuilding{is_building_engine?}
CheckBuilding -->|Yes| Done
CheckBuilding -->|No| CheckConverted{_converted_model_bytes?}
CheckConverted -->|Yes| LoadConverted[Load TensorRT from bytes]
LoadConverted --> SetEnabled[status = ENABLED]
SetEnabled --> Done
CheckConverted -->|No| CheckGPU{GPU available?}
CheckGPU -->|Yes| DownloadTRT[Download pre-built TensorRT engine]
DownloadTRT --> TRTSuccess{Success?}
TRTSuccess -->|Yes| LoadTRT[Create TensorRTEngine]
LoadTRT --> SetEnabled
TRTSuccess -->|No| DownloadONNX[Download ONNX model]
DownloadONNX --> StartConversion[Start background thread: convert ONNX→TRT]
StartConversion --> Done
CheckGPU -->|No| DownloadONNX2[Download ONNX model]
DownloadONNX2 --> LoadONNX[Create OnnxEngine]
LoadONNX --> Done
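The branching in the flowchart above can be expressed as a pure decision function. This is a sketch only: `Action`, `plan_init`, and the `try_download_trt` callback are hypothetical names, and the real init_ai performs the downloads and engine construction itself instead of returning a plan.

```python
from enum import Enum
from typing import Callable


class Action(Enum):
    NOTHING = "nothing"
    LOAD_CONVERTED = "load TensorRT from converted bytes"
    LOAD_PREBUILT_TRT = "load pre-built TensorRT engine"
    START_CONVERSION = "download ONNX and convert in background"
    LOAD_ONNX = "download ONNX and create OnnxEngine"


def plan_init(engine_exists: bool,
              is_building_engine: bool,
              has_converted_bytes: bool,
              gpu_available: bool,
              try_download_trt: Callable[[], bool]) -> Action:
    """Mirror the init_ai flowchart, one branch per decision node."""
    if engine_exists or is_building_engine:
        return Action.NOTHING  # idempotent: nothing to do
    if has_converted_bytes:
        return Action.LOAD_CONVERTED
    if not gpu_available:
        return Action.LOAD_ONNX
    if try_download_trt():
        return Action.LOAD_PREBUILT_TRT
    return Action.START_CONVERSION
```

The download callback is only invoked on the GPU path, so a CPU-only host never attempts to fetch a TensorRT engine.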
Flow F6: TensorRT Background Conversion
Description
When no pre-built TensorRT engine exists, a background daemon thread converts the ONNX model to TensorRT, uploads the result to Loader for caching, and stores the bytes for the next init_ai call.
Sequence Diagram
sequenceDiagram
participant INF as Inference
participant TRT as TensorRTEngine
participant LDR as Loader Service
participant STATUS as AIAvailabilityStatus
Note over INF: Background thread starts
INF->>STATUS: set_status(CONVERTING)
INF->>TRT: convert_from_onnx(onnx_bytes)
TRT->>TRT: Build TensorRT engine (90% GPU memory workspace)
TRT-->>INF: engine_bytes
INF->>STATUS: set_status(UPLOADING)
INF->>LDR: upload_big_small_resource(engine_bytes, filename)
LDR-->>INF: LoadResult
INF->>INF: _converted_model_bytes = engine_bytes
INF->>STATUS: set_status(ENABLED)
Note over INF: Next init_ai() call will load from _converted_model_bytes
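The sequence above can be sketched with a daemon thread and injected conversion and upload callables. The `BackgroundConverter` class, the string status values, and the ERROR transition on failure are assumptions for illustration; this document only specifies the CONVERTING, UPLOADING, ENABLED order on the success path.

```python
import threading
from typing import Callable, List, Optional


class BackgroundConverter:
    def __init__(self,
                 convert: Callable[[bytes], bytes],
                 upload: Callable[[bytes, str], object]) -> None:
        self._convert = convert
        self._upload = upload
        self.history: List[str] = []  # status transitions, for inspection
        self.converted_model_bytes: Optional[bytes] = None
        self.is_building_engine = False

    def _set_status(self, status: str) -> None:
        self.history.append(status)

    def _run(self, onnx_bytes: bytes, filename: str) -> None:
        try:
            self._set_status("CONVERTING")
            engine_bytes = self._convert(onnx_bytes)
            self._set_status("UPLOADING")
            self._upload(engine_bytes, filename)
            # Stored so the next init_ai() call can load without downloading.
            self.converted_model_bytes = engine_bytes
            self._set_status("ENABLED")
        except Exception:
            self._set_status("ERROR")  # assumption: failures surface as ERROR
        finally:
            self.is_building_engine = False

    def start(self, onnx_bytes: bytes, filename: str) -> threading.Thread:
        self.is_building_engine = True
        thread = threading.Thread(
            target=self._run, args=(onnx_bytes, filename), daemon=True
        )
        thread.start()
        return thread
```

A daemon thread does not keep the process alive, so a shutdown during conversion simply abandons the build.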