detections/_docs/02_document/system-flows.md
2026-03-23 14:07:54 +02:00

Azaion.Detections — System Flows

Flow Inventory

| # | Flow Name | Trigger | Primary Components | Criticality |
|---|-----------|---------|--------------------|-------------|
| F1 | Health Check | Client GET /health | API, Inference Pipeline | High |
| F2 | Single Image Detection | Client POST /detect | API, Inference Pipeline, Engines, Domain | High |
| F3 | Media Detection (Async) | Client POST /detect/{media_id} | API, Inference Pipeline, Engines, Domain, Loader, Annotations | High |
| F4 | SSE Event Streaming | Client GET /detect/stream | API | Medium |
| F5 | Engine Initialization | First detection request | Inference Pipeline, Engines, Loader | High |
| F6 | TensorRT Background Conversion | No pre-built TensorRT engine | Inference Pipeline, Engines, Loader | Medium |

Flow Dependencies

| Flow | Depends On | Shares Data With |
|------|------------|------------------|
| F1 | F5 (for meaningful status) | |
| F2 | F5 (engine must be ready) | |
| F3 | F5 (engine must be ready) | F4 (via SSE event queues) |
| F4 | | F3 (receives events) |
| F5 | | F6 (triggers conversion if needed) |
| F6 | F5 (triggered by init failure) | F5 (provides converted bytes) |

Flow F1: Health Check

Description

Client queries the service health status. Returns the current AI engine availability (None, Downloading, Converting, Enabled, Error, etc.) without triggering engine initialization.

Sequence Diagram

sequenceDiagram
    participant Client
    participant API as main.py
    participant INF as Inference
    participant STATUS as AIAvailabilityStatus

    Client->>API: GET /health
    API->>INF: get_inference()
    INF-->>API: Inference instance
    API->>STATUS: str(ai_availability_status)
    STATUS-->>API: "Enabled" / "Downloading" / etc.
    API-->>Client: HealthResponse{status, aiAvailability, errorMessage}

Error Scenarios

| Error | Where | Detection | Recovery |
|-------|-------|-----------|----------|
| Inference not yet created | get_inference() | Exception caught | Returns aiAvailability="None" |
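
The recovery path above can be sketched in a few lines. This is a minimal illustration, not the service's code: the `build_health_response` helper, the `"healthy"` status value, and the choice to leave `errorMessage` empty are assumptions; only `get_inference()`, `ai_availability_status`, and the `HealthResponse` field names come from the flow description.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class HealthResponse:
    status: str
    aiAvailability: str
    errorMessage: Optional[str] = None


def build_health_response(get_inference: Callable[[], object]) -> HealthResponse:
    """Report engine availability without triggering engine initialization."""
    try:
        inference = get_inference()
        # Only read the current status; init_ai() is never called from here.
        availability = str(inference.ai_availability_status)
    except Exception:
        # Inference not created yet: answer with "None" instead of failing.
        availability = "None"
    # "healthy" is a placeholder value (assumption, not from the source).
    return HealthResponse(status="healthy", aiAvailability=availability)
```

Passing `get_inference` as a parameter keeps the sketch independent of the real module layout and makes the fallback easy to exercise.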

Flow F2: Single Image Detection

Description

Client uploads an image file and optionally provides config. The service runs the blocking inference call in a ThreadPoolExecutor worker and returns the detection results in the same response.

Sequence Diagram

sequenceDiagram
    participant Client
    participant API as main.py
    participant INF as Inference
    participant ENG as Engine (ONNX/TRT)
    participant CONST as constants_inf

    Client->>API: POST /detect (file + config?)
    API->>API: Read image bytes, parse config
    API->>INF: detect_single_image(bytes, config_dict)
    INF->>INF: init_ai() (idempotent)
    INF->>INF: cv2.imdecode → preprocess
    INF->>ENG: run(input_blob)
    ENG-->>INF: raw output
    INF->>INF: postprocess → filter by threshold → remove overlaps
    INF-->>API: list[Detection]
    API->>CONST: annotations_dict[cls].name (label lookup)
    API-->>Client: list[DetectionDto]

Error Scenarios

| Error | Where | Detection | Recovery |
|-------|-------|-----------|----------|
| Empty image | API | len(bytes)==0 | 400 Bad Request |
| Invalid image data | imdecode | frame is None | 400 ValueError |
| Engine not available | init_ai | engine is None | 503 Service Unavailable |
| Inference failure | run/postprocess | RuntimeError | 422 Unprocessable Entity |
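
One way to read the table is as a mapping from failure type to HTTP status. The sketch below illustrates that mapping under stated assumptions: `detect_status` and `EngineUnavailableError` are hypothetical names, and the real service may signal an unavailable engine differently.

```python
from typing import Any, Callable, Tuple


class EngineUnavailableError(Exception):
    """Hypothetical: raised when init_ai() leaves the engine as None."""


def detect_status(image_bytes: bytes,
                  run_inference: Callable[[bytes], Any]) -> Tuple[int, Any]:
    """Return (HTTP status, body) following the F2 error table."""
    if len(image_bytes) == 0:
        return 400, "Image is empty"
    try:
        return 200, run_inference(image_bytes)
    except ValueError as exc:
        # Decoding produced no frame (invalid image data).
        return 400, str(exc)
    except EngineUnavailableError as exc:
        return 503, str(exc)
    except RuntimeError as exc:
        # Failure inside run/postprocess.
        return 422, str(exc)
```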

Flow F3: Media Detection (Async)

Description

Client triggers detection on media files (images/video) available via the Loader service. Processing runs asynchronously. Results are streamed via SSE (F4) and optionally posted to the Annotations service.

Sequence Diagram

sequenceDiagram
    participant Client
    participant API as main.py
    participant INF as Inference
    participant ENG as Engine
    participant LDR as Loader Service
    participant ANN as Annotations Service
    participant SSE as SSE Queues

    Client->>API: POST /detect/{media_id} (config + auth headers)
    API->>API: Check _active_detections (duplicate guard)
    API-->>Client: {"status": "started"}

    Note over API: asyncio.Task created

    API->>INF: run_detect(config, on_annotation, on_status)
    loop For each media file
        INF->>INF: Read/decode media (cv2)
        INF->>INF: Preprocess (tile/batch)
        INF->>ENG: run(input_blob)
        ENG-->>INF: raw output
        INF->>INF: Postprocess + validate

        opt Valid annotation found
            INF->>API: on_annotation(annotation, percent)
            API->>SSE: DetectionEvent → all queues
            opt Auth token present
                API->>ANN: POST /annotations (detections + image)
            end
        end
    end

    INF->>API: on_status(media_name, count)
    API->>SSE: DetectionEvent(status=AIProcessed, percent=100)

Data Flow

| Step | From | To | Data | Format |
|------|------|----|------|--------|
| 1 | Client | API | media_id, config, auth tokens | HTTP POST JSON + headers |
| 2 | API | Inference | config_dict, callbacks | Python dict + callables |
| 3 | Inference | Engine | preprocessed batch | numpy ndarray |
| 4 | Engine | Inference | raw detections | numpy ndarray |
| 5 | Inference | API (callback) | Annotation + percent | Python objects |
| 6 | API | SSE clients | DetectionEvent | SSE JSON stream |
| 7 | API | Annotations Service | CreateAnnotationRequest | HTTP POST JSON |
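
Steps 2 to 5 rely on callbacks passed into the inference layer. The following simplified stand-in shows that control flow only; `run_detect_sketch`, its `frames` and `detect_frame` parameters, and the percent formula are assumptions made for illustration and do not mirror the real `run_detect(config, on_annotation, on_status)` signature.

```python
from typing import Any, Callable, Iterable, List


def run_detect_sketch(frames: Iterable[Any],
                      detect_frame: Callable[[Any], List[Any]],
                      on_annotation: Callable[[List[Any], int], None],
                      on_status: Callable[[str, int], None],
                      media_name: str) -> None:
    """Call on_annotation per frame with detections, then on_status once."""
    frames = list(frames)
    count = 0
    for index, frame in enumerate(frames, start=1):
        detections = detect_frame(frame)  # stands in for preprocess/run/postprocess
        if detections:
            percent = int(index * 100 / len(frames))  # assumed progress formula
            on_annotation(detections, percent)
            count += 1
    # Final status callback, sent after all frames are processed.
    on_status(media_name, count)
```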

Step 7 — Annotations POST detail:

Fired once per detection batch when an auth token is present. The request to POST {ANNOTATIONS_URL}/annotations carries:

{
  "mediaId": "string",
  "source": 0,
  "videoTime": "00:01:23",
  "detections": [
    {
      "centerX": 0.56, "centerY": 0.67,
      "width": 0.25, "height": 0.22,
      "classNum": 3, "label": "ArmorVehicle",
      "confidence": 0.92
    }
  ],
  "image": "<base64 encoded frame bytes, optional>"
}

userId is not included — the Annotations service resolves the user from the JWT. The Annotations API contract also accepts description, affiliation, and combatReadiness on each detection, but Detections does not populate these.
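
As a rough illustration of how such a body could be assembled, the sketch below builds the dictionary shown above. The helper name `build_annotation_request` is hypothetical, and `source` is simply copied from the sample payload because its meaning is not described here.

```python
import base64
from typing import Any, Dict, List, Optional


def build_annotation_request(media_id: str,
                             video_time: str,
                             detections: List[Dict[str, Any]],
                             image_bytes: Optional[bytes] = None) -> Dict[str, Any]:
    """Build a CreateAnnotationRequest-shaped dict; userId is deliberately absent."""
    payload: Dict[str, Any] = {
        "mediaId": media_id,
        "source": 0,  # value taken from the sample payload (meaning not documented here)
        "videoTime": video_time,
        "detections": detections,
    }
    if image_bytes is not None:
        # The image is optional and travels as base64 text.
        payload["image"] = base64.b64encode(image_bytes).decode("ascii")
    return payload
```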

The request carries Authorization: Bearer {accessToken}, forwarded from the original client request. For long-running video, the token is refreshed automatically via POST {ANNOTATIONS_URL}/auth/refresh.

The Annotations service responds with 201 on success, 400 if neither image nor mediaId is provided, and 404 if mediaId is unknown. On the Annotations side, a saved annotation triggers an SSE notification to the UI and is enqueued to the RabbitMQ sync pipeline (unless SilentDetection mode is active).

Error Scenarios

| Error | Where | Detection | Recovery |
|-------|-------|-----------|----------|
| Duplicate media_id | API | _active_detections check | 409 Conflict |
| Engine unavailable | run_detect | engine is None | Error event pushed to SSE |
| Inference failure | processing | Exception | Error event pushed to SSE, media_id cleared |
| Annotations POST failure | _post_annotation | Exception | Silently caught, detection continues |
| Annotations 404 | _post_annotation | MediaId not found in Annotations DB | Silently caught, detection continues |
| Token refresh failure | TokenManager | Exception on /auth/refresh | Silently caught, subsequent POSTs may fail with 401 |
| SSE queue full | event broadcast | QueueFull | Event silently dropped for that client |

Flow F4: SSE Event Streaming

Description

Client opens a persistent SSE connection. Receives real-time detection events from all active F3 media detection tasks.

Sequence Diagram

sequenceDiagram
    participant Client
    participant API as main.py
    participant Queue as asyncio.Queue

    Client->>API: GET /detect/stream
    API->>Queue: Create queue (maxsize=100)
    API->>API: Add to _event_queues

    loop Until disconnect
        Queue-->>API: await event
        API-->>Client: data: {DetectionEvent JSON}
    end

    Note over API: Client disconnects (CancelledError)
    API->>API: Remove from _event_queues

Flow F5: Engine Initialization

Description

On first detection request, the Inference class initializes the ML engine. Strategy: try TensorRT pre-built engine → fall back to ONNX → background TensorRT conversion.

Flowchart

flowchart TD
    Start([init_ai called]) --> CheckEngine{engine exists?}
    CheckEngine -->|Yes| Done([Return])
    CheckEngine -->|No| CheckBuilding{is_building_engine?}
    CheckBuilding -->|Yes| Done
    CheckBuilding -->|No| CheckConverted{_converted_model_bytes?}
    CheckConverted -->|Yes| LoadConverted[Load TensorRT from bytes]
    LoadConverted --> SetEnabled[status = ENABLED]
    SetEnabled --> Done

    CheckConverted -->|No| CheckGPU{GPU available?}
    CheckGPU -->|Yes| DownloadTRT[Download pre-built TensorRT engine]
    DownloadTRT --> TRTSuccess{Success?}
    TRTSuccess -->|Yes| LoadTRT[Create TensorRTEngine]
    LoadTRT --> SetEnabled
    TRTSuccess -->|No| DownloadONNX[Download ONNX model]
    DownloadONNX --> StartConversion[Start background thread: convert ONNX→TRT]
    StartConversion --> Done

    CheckGPU -->|No| DownloadONNX2[Download ONNX model]
    DownloadONNX2 --> LoadONNX[Create OnnxEngine]
    LoadONNX --> Done
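
The branching in the flowchart can be expressed as a small pure function, which makes the decision order easy to check. This is a sketch under assumptions: `InitAction` and `plan_init` are hypothetical names, and the boolean inputs stand in for state and download results that the real `init_ai` gathers itself.

```python
from enum import Enum


class InitAction(Enum):
    NOTHING = "nothing"                      # engine ready, or conversion in progress
    LOAD_CONVERTED = "load_converted"        # load TensorRT from _converted_model_bytes
    LOAD_PREBUILT_TRT = "load_prebuilt_trt"  # create TensorRTEngine from download
    START_CONVERSION = "start_conversion"    # download ONNX, convert in background
    LOAD_ONNX = "load_onnx"                  # no GPU: create OnnxEngine


def plan_init(engine_exists: bool,
              is_building_engine: bool,
              has_converted_bytes: bool,
              gpu_available: bool,
              prebuilt_trt_downloaded: bool) -> InitAction:
    """Mirror the flowchart's checks in the same order."""
    if engine_exists or is_building_engine:
        return InitAction.NOTHING
    if has_converted_bytes:
        return InitAction.LOAD_CONVERTED
    if not gpu_available:
        return InitAction.LOAD_ONNX
    if prebuilt_trt_downloaded:
        return InitAction.LOAD_PREBUILT_TRT
    return InitAction.START_CONVERSION
```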

Flow F6: TensorRT Background Conversion

Description

When no pre-built TensorRT engine exists, a background daemon thread converts the ONNX model to TensorRT, uploads the result to Loader for caching, and stores the bytes for the next init_ai call.

Sequence Diagram

sequenceDiagram
    participant INF as Inference
    participant TRT as TensorRTEngine
    participant LDR as Loader Service
    participant STATUS as AIAvailabilityStatus

    Note over INF: Background thread starts
    INF->>STATUS: set_status(CONVERTING)
    INF->>TRT: convert_from_onnx(onnx_bytes)
    TRT->>TRT: Build TensorRT engine (90% GPU memory workspace)
    TRT-->>INF: engine_bytes

    INF->>STATUS: set_status(UPLOADING)
    INF->>LDR: upload_big_small_resource(engine_bytes, filename)
    LDR-->>INF: LoadResult

    INF->>INF: _converted_model_bytes = engine_bytes
    INF->>STATUS: set_status(ENABLED)
    Note over INF: Next init_ai() call will load from _converted_model_bytes
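
The hand-off between the background thread and the next `init_ai()` call might look like the sketch below. The `ConversionState` class, the `start_background_conversion` helper, the injected `convert` and `upload` callables, and the `"ERROR"` fallback are all assumptions for illustration; the status order (CONVERTING, UPLOADING, ENABLED) and the stored bytes follow the diagram.

```python
import threading
from typing import Callable, Optional


class ConversionState:
    """Hypothetical holder for the fields the diagram touches."""

    def __init__(self) -> None:
        self.status = "NONE"
        self.is_building_engine = False
        self.converted_model_bytes: Optional[bytes] = None


def start_background_conversion(state: ConversionState,
                                onnx_bytes: bytes,
                                convert: Callable[[bytes], bytes],
                                upload: Callable[[bytes], object]) -> threading.Thread:
    def _worker() -> None:
        try:
            state.status = "CONVERTING"
            engine_bytes = convert(onnx_bytes)
            state.status = "UPLOADING"
            upload(engine_bytes)
            # Kept in memory so the next init_ai() can load without a download.
            state.converted_model_bytes = engine_bytes
            state.status = "ENABLED"
        except Exception:
            state.status = "ERROR"  # assumed failure handling, not from the source
        finally:
            state.is_building_engine = False

    state.is_building_engine = True
    thread = threading.Thread(target=_worker, daemon=True)
    thread.start()
    return thread
```

Setting `is_building_engine` before the thread starts means a concurrent `init_ai()` call sees the flag and returns early, matching the "is_building_engine?" check in F5.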