# Azaion.Detections — System Flows

## Flow Inventory

| # | Flow Name | Trigger | Primary Components | Criticality |
|---|-----------|---------|-------------------|-------------|
| F1 | Health Check | Client GET /health | API, Inference Pipeline | High |
| F2 | Upload Detection (Image/Video) | Client POST /detect | API, Inference Pipeline, Engines, Domain, Annotations | High |
| F3 | Media Detection (Async, DB-Driven) | Client POST /detect/{media_id} | API, Inference Pipeline, Engines, Domain, Annotations | High |
| F4 | SSE Event Streaming | Client GET /detect/stream | API | Medium |
| F5 | Engine Initialization | First detection request | Inference Pipeline, Engines, Loader | High |
| F6 | TensorRT Background Conversion | No pre-built TensorRT engine | Inference Pipeline, Engines, Loader | Medium |
| F7 | Streaming Video Detection | Client POST /detect/video | API, StreamingBuffer, Inference Pipeline, Engines, Domain, Annotations | High |

## Flow Dependencies

| Flow | Depends On | Shares Data With |
|------|-----------|-----------------|
| F1 | F5 (for meaningful status) | — |
| F2 | F5 (engine must be ready) | Annotations (media lifecycle) |
| F3 | F5 (engine must be ready) | F4 (via SSE event queues), Annotations (settings, media lifecycle) |
| F4 | — | F3, F7 (receives events) |
| F5 | — | F6 (triggers conversion if needed) |
| F6 | F5 (triggered when no pre-built TensorRT engine is available) | F5 (provides converted bytes) |
| F7 | F5 (engine must be ready) | F4 (via SSE event queues), Annotations (media lifecycle) |

---

## Flow F1: Health Check

### Description

Client queries the service health status. Returns the current AI engine availability (None, Downloading, Converting, Enabled, Error, etc.) without triggering engine initialization.

### Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant API as main.py
    participant INF as Inference
    participant STATUS as AIAvailabilityStatus

    Client->>API: GET /health
    API->>INF: get_inference()
    INF-->>API: Inference instance
    API->>STATUS: str(ai_availability_status)
    STATUS-->>API: "Enabled" / "Downloading" / etc.
    API-->>Client: HealthResponse{status, aiAvailability, errorMessage}
```

### Error Scenarios

| Error | Where | Detection | Recovery |
|-------|-------|-----------|----------|
| Inference not yet created | get_inference() | Exception caught | Returns aiAvailability="None" |

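The fallback in the error table can be sketched with the standard library alone. `get_inference`, the enum members, the `"ok"` status value, and the `error_message` attribute are assumptions for illustration; the real `main.py` handler and `HealthResponse` model are not reproduced here.

```python
from enum import Enum
from typing import Callable


class AIAvailabilityStatus(Enum):
    # Hypothetical subset of the states named in the description.
    NONE = "None"
    DOWNLOADING = "Downloading"
    CONVERTING = "Converting"
    ENABLED = "Enabled"
    ERROR = "Error"

    def __str__(self) -> str:
        return self.value


def health(get_inference: Callable[[], object]) -> dict:
    """Build a HealthResponse-shaped dict without calling init_ai()."""
    try:
        inference = get_inference()
    except Exception:
        # Inference not created yet: report "None" instead of failing the probe.
        return {"status": "ok", "aiAvailability": "None", "errorMessage": None}
    return {
        "status": "ok",  # placeholder value (assumption)
        "aiAvailability": str(inference.ai_availability_status),
        "errorMessage": getattr(inference, "error_message", None),  # attribute name assumed
    }
```

Because the probe only reads a status attribute, repeated health checks never start a model download.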
---

## Flow F2: Upload Detection (Image or Video)

### Description

Client uploads a media file (image or video) and optionally provides config and auth tokens. The service detects the media kind, manages the media lifecycle (hashing, storage, record creation, status tracking), runs inference in a ThreadPoolExecutor worker while the request waits, and returns the detection results in the HTTP response.

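The "request waits on a ThreadPoolExecutor" pattern can be sketched as follows: the async handler offloads the blocking inference call to a worker thread and awaits it, so the event loop keeps serving other requests while this one waits. `run_detect_image` here is a hypothetical stand-in that emits one fake detection through the callback, and the pool size is an assumption.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

# Hypothetical single-worker pool; the real pool size is not documented here.
_executor = ThreadPoolExecutor(max_workers=1)


def run_detect_image(data: bytes, on_annotation: Callable[[dict], None]) -> None:
    """Stand-in for the blocking Inference call: reports results via callback."""
    on_annotation({"classNum": 3, "confidence": 0.92, "size": len(data)})


async def detect_upload(data: bytes) -> List[dict]:
    if not data:
        raise ValueError("empty upload")  # the API maps this to 400 Bad Request
    results: List[dict] = []
    loop = asyncio.get_running_loop()
    # The event loop stays free while the worker thread runs inference;
    # the HTTP response is only built after all callbacks have fired.
    await loop.run_in_executor(_executor, run_detect_image, data, results.append)
    return results
```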
### Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant API as main.py
    participant HASH as media_hash
    participant ANN as Annotations Service
    participant INF as Inference
    participant ENG as Engine (ONNX/TRT)
    participant CONST as constants_inf

    Client->>API: POST /detect (file + config? + auth?)
    API->>API: Read bytes, detect kind (image/video)
    API->>API: Validate image data (cv2.imdecode)

    opt Authenticated user
        API->>HASH: compute_media_content_hash(bytes)
        HASH-->>API: content_hash
        API->>API: Persist file to VIDEOS_DIR/IMAGES_DIR
        API->>ANN: POST /api/media (create record)
        API->>ANN: PUT /api/media/{id}/status (AI_PROCESSING)
    end

    alt Image
        API->>INF: run_detect_image(bytes, ai_config, name, callback)
    else Video
        API->>INF: run_detect_video(bytes, ai_config, name, path, callback)
    end

    INF->>INF: init_ai() (idempotent)
    INF->>ENG: process_frames(batch)
    ENG-->>INF: raw output
    INF->>INF: postprocess → filter → callbacks
    INF-->>API: results via callback

    opt Authenticated user
        API->>ANN: PUT /api/media/{id}/status (AI_PROCESSED)
    end

    API->>CONST: annotations_dict[cls].name (label lookup)
    API-->>Client: list[DetectionDto]
```

### Error Scenarios

| Error | Where | Detection | Recovery |
|-------|-------|-----------|----------|
| Empty upload | API | len(bytes)==0 | 400 Bad Request |
| Invalid image data | cv2.imdecode | returns None | 400 Bad Request |
| Unrecognized format | _detect_upload_kind | cv2+PyAV probe fails | 400 Bad Request |
| Engine not available | init_ai | engine is None | 503 Service Unavailable |
| Inference failure | run/postprocess | RuntimeError | 422 Unprocessable Entity |
| Media record failure | _post_media_record | exception caught | Silently continues |

---

## Flow F3: Media Detection (Async, DB-Driven)

### Description

Client triggers detection on a media file resolved from the Annotations service. AI settings are fetched from the user's DB profile and merged with client overrides. Processing runs asynchronously. Results are streamed via SSE (F4) and optionally posted to the Annotations service. Media status is tracked throughout.

### Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant API as main.py
    participant ANN as Annotations Service
    participant INF as Inference
    participant ENG as Engine
    participant SSE as SSE Queues

    Client->>API: POST /detect/{media_id} (config? + auth headers)
    API->>API: Check _active_detections (duplicate guard)
    API->>ANN: GET /api/users/{user_id}/ai-settings
    ANN-->>API: AI settings
    API->>API: Merge settings with client overrides
    API->>ANN: GET /api/media/{media_id}
    ANN-->>API: media path
    API-->>Client: {"status": "started"}

    Note over API: asyncio.Task created

    API->>API: Read file bytes from resolved path
    API->>ANN: PUT /api/media/{id}/status (AI_PROCESSING)

    alt Video file
        API->>INF: run_detect_video(bytes, config, name, path, callbacks)
    else Image file
        API->>INF: run_detect_image(bytes, config, name, callbacks)
    end

    loop For each valid annotation
        INF->>API: on_annotation(annotation, percent)
        API->>SSE: DetectionEvent → all queues
        opt Auth token present
            API->>ANN: POST /annotations (detections + image)
        end
    end

    INF->>API: on_status(media_name, count)
    API->>SSE: DetectionEvent(status=AIProcessed, percent=100)
    API->>ANN: PUT /api/media/{id}/status (AI_PROCESSED)
```

### Data Flow

| Step | From | To | Data | Format |
|------|------|----|------|--------|
| 1 | Client | API | media_id, config, auth tokens | HTTP POST JSON + headers |
| 2 | API | Annotations | user AI settings request | HTTP GET |
| 3 | API | Annotations | media path request | HTTP GET |
| 4 | API | Annotations | media status update (AI_PROCESSING) | HTTP PUT JSON |
| 5 | API | Inference | file bytes, config, callbacks | bytes + AIRecognitionConfig + callables |
| 6 | Inference | Engine | preprocessed batch | numpy ndarray |
| 7 | Engine | Inference | raw detections | numpy ndarray |
| 8 | Inference | API (callback) | Annotation + percent | Python objects |
| 9 | API | SSE clients | DetectionEvent | SSE JSON stream |
| 10 | API | Annotations Service | CreateAnnotationRequest | HTTP POST JSON |
| 11 | API | Annotations | media status update (AI_PROCESSED) | HTTP PUT JSON |

**Step 10: Annotations POST detail**

Fired once per valid annotation when an auth token is present. The request to `POST {ANNOTATIONS_URL}/annotations` carries:

```json
{
  "mediaId": "string",
  "source": 0,
  "videoTime": "00:01:23",
  "detections": [
    {
      "centerX": 0.56, "centerY": 0.67,
      "width": 0.25, "height": 0.22,
      "classNum": 3, "label": "ArmorVehicle",
      "confidence": 0.92
    }
  ],
  "image": "<base64 encoded frame bytes, optional>"
}
```

`userId` is not included: the Annotations service resolves the user from the JWT. The Annotations API contract also accepts `description`, `affiliation`, and `combatReadiness` on each detection, but Detections does not populate these.

Authorization: `Bearer {accessToken}` forwarded from the original client request. For long-running video, the token is auto-refreshed via `POST {ANNOTATIONS_URL}/auth/refresh`.

The Annotations service responds 201 on success, 400 if neither `image` nor `mediaId` is provided, and 404 if `mediaId` is unknown. On the Annotations side, a saved annotation triggers an SSE notification to the UI and is enqueued to the RabbitMQ sync pipeline (unless SilentDetection mode is active).

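A sketch of assembling the body shown above with the standard library. The helper names are hypothetical, and the frame is assumed to arrive already encoded (for example as JPEG); the document only says "base64 encoded frame bytes". Detection fields outside the documented shape, such as `description`, are dropped on purpose.

```python
import base64
import json
from typing import Optional, Sequence

# Detection fields in the documented payload; anything else is dropped.
_DETECTION_FIELDS = ("centerX", "centerY", "width", "height", "classNum", "label", "confidence")


def build_annotation_body(
    media_id: str,
    video_time: str,
    detections: Sequence[dict],
    frame: Optional[bytes] = None,  # already-encoded image bytes (format assumed)
    source: int = 0,
) -> str:
    """Serialize a POST /annotations body in the documented shape (sketch)."""
    body = {
        "mediaId": media_id,
        "source": source,
        "videoTime": video_time,
        "detections": [{k: d[k] for k in _DETECTION_FIELDS} for d in detections],
        # No userId: the Annotations service resolves the user from the JWT.
    }
    if frame is not None:
        body["image"] = base64.b64encode(frame).decode("ascii")
    return json.dumps(body)


def auth_headers(access_token: str) -> dict:
    # The client's token is forwarded unchanged as a Bearer credential.
    return {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
```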
### Error Scenarios

| Error | Where | Detection | Recovery |
|-------|-------|-----------|----------|
| Duplicate media_id | API | _active_detections check | 409 Conflict |
| Engine unavailable | run_detect | engine is None | Error event pushed to SSE |
| Inference failure | processing | Exception | Error event pushed to SSE, media_id removed from _active_detections |
| Annotations POST failure | _post_annotation | Exception | Silently caught, detection continues |
| Annotations 404 | _post_annotation | MediaId not found in Annotations DB | Silently caught, detection continues |
| Token refresh failure | TokenManager | Exception on /auth/refresh | Silently caught, subsequent POSTs may fail with 401 |
| SSE queue full | event broadcast | QueueFull | Event silently dropped for that client |

---

## Flow F4: SSE Event Streaming

### Description

Client opens a persistent SSE connection and receives real-time detection events from all active detection tasks (F3 media detection and F7 streaming video detection).

### Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant API as main.py
    participant Queue as asyncio.Queue

    Client->>API: GET /detect/stream
    API->>Queue: Create queue (maxsize=100)
    API->>API: Add to _event_queues

    loop Until disconnect
        Queue-->>API: await event
        API-->>Client: data: {DetectionEvent JSON}
    end

    Note over API: Client disconnects (CancelledError)
    API->>API: Remove from _event_queues
```

---

## Flow F5: Engine Initialization

### Description

On the first detection request, the Inference class initializes the ML engine. On a GPU host it tries to download a pre-built TensorRT engine; if none is available, it downloads the ONNX model and starts a background TensorRT conversion (F6), and no engine is loaded until that conversion finishes. On a host without a GPU it downloads the ONNX model and runs it with OnnxEngine. Once converted bytes exist, the next `init_ai` call loads TensorRT from them.

### Flowchart

```mermaid
flowchart TD
    Start([init_ai called]) --> CheckEngine{engine exists?}
    CheckEngine -->|Yes| Done([Return])
    CheckEngine -->|No| CheckBuilding{is_building_engine?}
    CheckBuilding -->|Yes| Done
    CheckBuilding -->|No| CheckConverted{_converted_model_bytes?}
    CheckConverted -->|Yes| LoadConverted[Load TensorRT from bytes]
    LoadConverted --> SetEnabled[status = ENABLED]
    SetEnabled --> Done

    CheckConverted -->|No| CheckGPU{GPU available?}
    CheckGPU -->|Yes| DownloadTRT[Download pre-built TensorRT engine]
    DownloadTRT --> TRTSuccess{Success?}
    TRTSuccess -->|Yes| LoadTRT[Create TensorRTEngine]
    LoadTRT --> SetEnabled
    TRTSuccess -->|No| DownloadONNX[Download ONNX model]
    DownloadONNX --> StartConversion[Start background thread: convert ONNX→TRT]
    StartConversion --> Done

    CheckGPU -->|No| DownloadONNX2[Download ONNX model]
    DownloadONNX2 --> LoadONNX[Create OnnxEngine]
    LoadONNX --> Done
```

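The flowchart's decision logic can be expressed as a plain function with the I/O injected as callables, which makes each branch easy to test. This is an illustrative sketch, not the Cython `init_ai`: the state fields mirror the flowchart labels, `download_trt` returning `None` stands for the "Success? No" branch, and setting `is_building_engine` before the conversion starts is an assumption. Status updates are omitted.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class EngineInitState:
    # Hypothetical holder for the fields tested in the flowchart.
    engine: Optional[object] = None
    is_building_engine: bool = False
    converted_model_bytes: Optional[bytes] = None


def init_ai(
    state: EngineInitState,
    gpu_available: bool,
    download_trt: Callable[[], Optional[bytes]],  # None = no pre-built engine
    download_onnx: Callable[[], bytes],
    make_trt: Callable[[bytes], object],
    make_onnx: Callable[[bytes], object],
    start_conversion: Callable[[bytes], None],  # F6; expected to return immediately
) -> None:
    if state.engine is not None or state.is_building_engine:
        return  # idempotent: already loaded, or a conversion is in flight
    if state.converted_model_bytes is not None:
        state.engine = make_trt(state.converted_model_bytes)  # F6 result
        return
    if gpu_available:
        trt_bytes = download_trt()
        if trt_bytes is not None:
            state.engine = make_trt(trt_bytes)
            return
        state.is_building_engine = True  # assumption: set before the thread starts
        start_conversion(download_onnx())
        return  # engine stays None until a later call finds converted bytes
    state.engine = make_onnx(download_onnx())  # CPU-only host
```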
---

## Flow F6: TensorRT Background Conversion

### Description

When no pre-built TensorRT engine exists, a background daemon thread converts the ONNX model to TensorRT, uploads the result to Loader for caching, and stores the bytes for the next `init_ai` call.

### Sequence Diagram

```mermaid
sequenceDiagram
    participant INF as Inference
    participant TRT as TensorRTEngine
    participant LDR as Loader Service
    participant STATUS as AIAvailabilityStatus

    Note over INF: Background thread starts
    INF->>STATUS: set_status(CONVERTING)
    INF->>TRT: convert_from_onnx(onnx_bytes)
    TRT->>TRT: Build TensorRT engine (90% GPU memory workspace)
    TRT-->>INF: engine_bytes

    INF->>STATUS: set_status(UPLOADING)
    INF->>LDR: upload_big_small_resource(engine_bytes, filename)
    LDR-->>INF: LoadResult

    INF->>INF: _converted_model_bytes = engine_bytes
    INF->>STATUS: set_status(ENABLED)
    Note over INF: Next init_ai() call will load from _converted_model_bytes
```

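A sketch of F6 as a daemon thread, with conversion and upload injected as callables (hypothetical stand-ins for `TensorRTEngine.convert_from_onnx` and the Loader upload). The status strings follow the diagram; recording `"Error"` when a step raises is an assumption, since the diagram shows only the success path.

```python
import threading
from typing import Callable, List, Optional


class ConversionJob:
    """Sketch of F6; convert/upload are injected stand-ins (hypothetical names)."""

    def __init__(self, convert: Callable[[bytes], bytes], upload: Callable[[bytes, str], None]) -> None:
        self._convert = convert  # e.g. wraps TensorRTEngine.convert_from_onnx
        self._upload = upload    # e.g. wraps the Loader upload call
        self.statuses: List[str] = []
        self.converted_model_bytes: Optional[bytes] = None  # read by the next init_ai()
        self.is_building_engine = False

    def _run(self, onnx_bytes: bytes, filename: str) -> None:
        try:
            self.statuses.append("Converting")
            engine_bytes = self._convert(onnx_bytes)
            self.statuses.append("Uploading")
            self._upload(engine_bytes, filename)  # cache for future starts
            self.converted_model_bytes = engine_bytes
            self.statuses.append("Enabled")
        except Exception:
            self.statuses.append("Error")  # assumed; the diagram shows only success
        finally:
            self.is_building_engine = False

    def start(self, onnx_bytes: bytes, filename: str) -> threading.Thread:
        self.is_building_engine = True
        # daemon=True: an unfinished conversion does not keep the process alive at exit.
        worker = threading.Thread(target=self._run, args=(onnx_bytes, filename), daemon=True)
        worker.start()
        return worker
```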
---

## Flow F7: Streaming Video Detection (AZ-178)

### Description

Client uploads a video file as a raw binary body and receives near-real-time detections via SSE as frames are decoded, **during** the upload rather than after it. The endpoint bypasses FastAPI's multipart buffering entirely and uses `request.stream()` to read the HTTP body chunk by chunk. Each chunk is appended to a temp file through `StreamingBuffer`, while PyAV reads the same file concurrently from a background inference thread. For containers whose index sits at the start of the file (see Format Compatibility below), first detections appear within ~500 ms of the first decodable frames arriving at the API. Peak memory usage is bounded by model batch size × frame size (tens of MB), regardless of video file size.

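The upload side of this flow reduces to a small pump: iterate the body, hand each chunk to the buffer off the event loop, and always signal EOF. A sketch, assuming a body that is an async iterator of `bytes` (Starlette's `request.stream()` has that shape and may end with an empty chunk); `ChunkSink` is a hypothetical stand-in for `StreamingBuffer`, and `pump_upload` is an illustrative name.

```python
import asyncio
from typing import AsyncIterator, List


class ChunkSink:
    """Hypothetical stand-in for StreamingBuffer: only append/close_writer are used."""

    def __init__(self) -> None:
        self.chunks: List[bytes] = []
        self.closed = False

    def append(self, chunk: bytes) -> None:
        self.chunks.append(chunk)  # the real buffer writes + flushes to a temp file

    def close_writer(self) -> None:
        self.closed = True


async def pump_upload(body: AsyncIterator[bytes], sink: ChunkSink) -> int:
    """Copy an HTTP body into the buffer chunk by chunk; return bytes received."""
    loop = asyncio.get_running_loop()
    total = 0
    try:
        async for chunk in body:  # e.g. request.stream()
            if chunk:
                # Blocking disk I/O runs in the default executor, not on the event loop.
                await loop.run_in_executor(None, sink.append, chunk)
                total += len(chunk)
    finally:
        # Always signal EOF, even if the client disconnects mid-upload,
        # so the reader thread stops waiting for more data.
        sink.close_writer()
    return total
```

The `finally` block is what the "Client disconnects mid-upload" error row relies on: without it, a reader blocked in `read()` would wait indefinitely.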
### Activity Diagram — Full Data Pipeline

```mermaid
flowchart TD
    subgraph CLIENT ["Client (Browser)"]
        C1([Open SSE connection<br/>GET /detect/stream])
        C2([Start upload<br/>POST /detect/video])
        C3([Receive SSE events<br/>during upload])
    end

    subgraph API ["API Layer — main.py (async event loop)"]
        A1[Parse headers:<br/>X-Filename, X-Config, Auth]
        A2{Valid video<br/>extension?}
        A3[Create StreamingBuffer<br/>backed by temp file]
        A4[Start inference thread<br/>via run_in_executor]
        A5["Read chunk from<br/>request.stream()"]
        A6[buffer.append chunk<br/>via run_in_executor]
        A7{More chunks?}
        A8[buffer.close_writer<br/>signal EOF]
        A9[Compute content hash<br/>from temp file on disk<br/>reads only 3 KB]
        A10[Rename temp file →<br/>permanent storage path]
        A11[Create media record<br/>POST /api/media]
        A12["Return {status: started,<br/>mediaId: hash}"]
        A13[Register background task<br/>to await inference completion]
    end

    subgraph BUF ["StreamingBuffer — streaming_buffer.py"]
        B1[/"Temp file on disk<br/>(single file, two handles)"/]
        B2["append(data):<br/>write + flush + notify"]
        B3["read(size):<br/>block if ahead of writer<br/>return available bytes"]
        B4["seek(offset, whence):<br/>SEEK_END blocks until EOF"]
        B5["close_writer():<br/>set EOF flag, notify all"]
    end

    subgraph INF ["Inference Thread — inference.pyx"]
        I1["av.open(buffer)<br/>PyAV reads via buffer.read()"]
        I2{Moov at start?}
        I3[Decode frames immediately<br/>~500ms latency]
        I4["Blocks on seek(0, 2)<br/>until upload completes"]
        I5["Decode batch of frames<br/>(frame_period_recognition sampling)"]
        I6["engine.process_frames(batch)"]
        I7{Detections found?}
        I8["on_annotation callback<br/>→ SSE event broadcast"]
        I9{More frames?}
        I10[send_detection_status]
    end

    C2 --> A1
    A1 --> A2
    A2 -->|No| ERR([400 Bad Request])
    A2 -->|Yes| A3
    A3 --> A4
    A4 --> A5

    A5 --> A6
    A6 --> B2
    B2 --> B1
    A6 --> A7
    A7 -->|Yes| A5
    A7 -->|No| A8
    A8 --> B5

    A8 --> A9
    A9 --> A10
    A10 --> A11
    A11 --> A12
    A12 --> A13

    A4 -.->|background thread| I1
    I1 --> I2
    I2 -->|"Yes (faststart MP4,<br/>MKV, WebM)"| I3
    I2 -->|"No (standard MP4)"| I4
    I4 --> I3
    I3 --> I5
    I5 --> I6
    I6 --> I7
    I7 -->|Yes| I8
    I8 --> C3
    I7 -->|No| I9
    I8 --> I9
    I9 -->|Yes| I5
    I9 -->|No| I10

    B3 -.->|"PyAV calls<br/>read()"| I1

    style BUF fill:#e8f4fd,stroke:#2196F3
    style INF fill:#fce4ec,stroke:#e91e63
    style API fill:#e8f5e9,stroke:#4CAF50
    style CLIENT fill:#fff3e0,stroke:#FF9800
```

### Sequence Diagram — Concurrent Timeline

```mermaid
sequenceDiagram
    participant Client
    participant SSE as SSE /detect/stream
    participant API as main.py (async)
    participant BUF as StreamingBuffer
    participant INF as Inference Thread
    participant PyAV
    participant ENG as Engine (ONNX/TRT)
    participant ANN as Annotations Service

    Client->>SSE: GET /detect/stream (open)
    Client->>API: POST /detect/video (raw body, streaming)
    API->>API: Parse X-Filename, X-Config, Auth headers
    API->>BUF: Create StreamingBuffer (temp file)
    API->>INF: Start in executor thread

    par Upload stream (async event loop)
        loop Each HTTP body chunk (~8-64 KB)
            API->>BUF: append(chunk) → write + flush + notify
        end
    and Inference (background thread)
        INF->>PyAV: av.open(buffer)
        Note over PyAV,BUF: PyAV calls buffer.read().<br/>Blocks when no data yet.<br/>Resumes as chunks arrive.
        loop Each decodable frame batch
            PyAV->>BUF: read(size) → returns available bytes
            BUF-->>PyAV: video data
            PyAV-->>INF: decoded frames (BGR numpy)
            INF->>ENG: process_frames(batch)
            ENG-->>INF: detections
            opt Valid detections
                INF->>SSE: DetectionEvent (via callback)
                SSE-->>Client: data: {...detections...}
            end
        end
    end

    API->>BUF: close_writer() → EOF signal
    Note over INF: PyAV reads remaining frames, finishes

    API->>API: compute_media_content_hash_from_file(temp file), reads 3 KB
    API->>API: Rename temp file → {hash}{ext}

    opt Authenticated user
        API->>ANN: POST /api/media (create record)
        API->>ANN: PUT /api/media/{id}/status (AI_PROCESSING)
    end

    API-->>Client: {"status": "started", "mediaId": "abc123"}

    Note over API: Background task awaits inference completion

    INF-->>API: Inference completes
    opt Authenticated user
        API->>ANN: PUT /api/media/{id}/status (AI_PROCESSED)
    end
    API->>SSE: DetectionEvent(status=AIProcessed, percent=100)
    SSE-->>Client: data: {...status: AIProcessed...}
```

### Flowchart — StreamingBuffer Read/Write Coordination

```mermaid
flowchart TD
    subgraph WRITER ["Writer (upload handler, via run_in_executor)"]
        W1["Receive HTTP chunk"]
        W2["Acquire Condition lock"]
        W3["file.write(chunk) + flush()"]
        W4["_written += len(chunk)"]
        W5["notify_all() → wake reader"]
        W6["Release lock"]
        W7{More chunks?}
        W8["close_writer():<br/>set _eof = True<br/>notify_all()"]
    end

    subgraph READER ["Reader (PyAV / Inference thread)"]
        R1["PyAV calls read(size)"]
        R2["Acquire Condition lock"]
        R3{"_written > pos?"}
        R4["cond.wait()<br/>(releases lock, sleeps)"]
        R5["Calculate to_read =<br/>min(size, available)"]
        R6["Release lock"]
        R7["file.read(to_read)<br/>(outside lock)"]
        R8["Return bytes to PyAV"]
        R9{"_eof and<br/>available == 0?"}
        R10["Return b'' (EOF)"]
    end

    W1 --> W2 --> W3 --> W4 --> W5 --> W6 --> W7
    W7 -->|Yes| W1
    W7 -->|No| W8

    R1 --> R2 --> R3
    R3 -->|Yes| R5
    R3 -->|No| R9
    R9 -->|Yes| R10
    R9 -->|No| R4
    R4 -.->|"Woken by<br/>notify_all()"| R3
    R5 --> R6 --> R7 --> R8

    style WRITER fill:#e8f5e9,stroke:#4CAF50
    style READER fill:#fce4ec,stroke:#e91e63
```

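A minimal Python sketch of the coordination in this flowchart: one `threading.Condition` guards a byte counter, the writer appends and notifies, and the reader waits only when it has caught up. Class and attribute names mirror the diagram, but the code is an illustration, not the contents of `streaming_buffer.py`; whether these methods alone satisfy `av.open()`'s file-object requirements is not verified here.

```python
import os
import tempfile
import threading


class StreamingBuffer:
    """Sketch: one temp file, a writer handle and a reader handle, one Condition."""

    def __init__(self, directory=None) -> None:
        fd, self.path = tempfile.mkstemp(dir=directory)
        self._wfile = os.fdopen(fd, "wb")
        self._rfile = open(self.path, "rb")
        self._cond = threading.Condition()
        self._written = 0  # bytes flushed to disk so far
        self._eof = False
        self._pos = 0      # reader position (touched only by the reader thread)

    # Writer side (upload handler)
    def append(self, data: bytes) -> None:
        with self._cond:
            self._wfile.write(data)
            self._wfile.flush()      # make the bytes visible to the reader handle
            self._written += len(data)
            self._cond.notify_all()  # wake a reader blocked in read()/seek()

    def close_writer(self) -> None:
        with self._cond:
            self._wfile.close()
            self._eof = True
            self._cond.notify_all()

    # Reader side (PyAV in the inference thread)
    def read(self, size: int = -1) -> bytes:
        with self._cond:
            while self._written <= self._pos and not self._eof:
                self._cond.wait()    # releases the lock while sleeping
            available = self._written - self._pos
            if available <= 0:
                return b""           # EOF: writer closed and everything consumed
            to_read = available if size < 0 else min(size, available)
        self._rfile.seek(self._pos)  # file I/O happens outside the lock
        chunk = self._rfile.read(to_read)
        self._pos += len(chunk)
        return chunk

    def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
        with self._cond:
            if whence == os.SEEK_END:
                while not self._eof:  # the final size is unknown until upload ends
                    self._cond.wait()
                base = self._written
            elif whence == os.SEEK_CUR:
                base = self._pos
            else:
                base = 0
            self._pos = base + offset
            return self._pos

    def tell(self) -> int:
        return self._pos

    def close(self) -> None:
        if not self._wfile.closed:
            self.close_writer()
        self._rfile.close()
        os.remove(self.path)
```

As in the diagram, the write happens under the lock (W2 to W6), while the reader holds it only for bookkeeping and performs the file read after releasing it (R6, R7).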
### Data Flow

| Step | From | To | Data | Format |
|------|------|----|------|--------|
| 1 | Client | API | Raw video bytes (streaming) | HTTP POST body chunks |
| 2 | API | StreamingBuffer | Byte chunks (8-64 KB each) | `append(bytes)` |
| 3 | StreamingBuffer | Temp file | Same chunks | `file.write()` + `flush()` |
| 4 | StreamingBuffer | PyAV (Inference thread) | Byte segments on demand | `read(size)` blocks when ahead |
| 5 | PyAV | Inference | Decoded BGR numpy frames | ndarray |
| 6 | Inference | Engine | Preprocessed batch | ndarray |
| 7 | Engine | Inference | Raw detections | ndarray |
| 8 | Inference | SSE clients | DetectionEvent | SSE JSON via `loop.call_soon_threadsafe` |
| 9 | API | Temp file | Content hash (3 KB read) | `compute_media_content_hash_from_file` |
| 10 | API | Disk | Rename temp → permanent path | `os.rename` |
| 11 | API | Annotations Service | Media record + status | HTTP POST/PUT JSON |

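The document states only that `compute_media_content_hash_from_file` reads about 3 KB. The sketch below shows one way such a hash can work, under an explicitly assumed scheme: SHA-256 over the file size plus 1 KB samples from the start, middle, and end, with small files hashed whole. It is not the real function, and `hash_from_bytes` / `hash_from_file` are hypothetical names. The sketch keeps a bytes-based and a file-based variant in agreement, so a hash taken from the temp file on disk matches one taken from the same bytes in memory.

```python
import hashlib
import os
from typing import List, Tuple

SAMPLE = 1024  # assumed sample size: 3 samples -> about 3 KB of reads


def _spans(size: int) -> List[Tuple[int, int]]:
    """(offset, length) pairs to hash; the layout is an illustrative assumption."""
    if size <= 3 * SAMPLE:
        return [(0, size)]  # small input: hash everything
    return [(0, SAMPLE), (size // 2, SAMPLE), (size - SAMPLE, SAMPLE)]


def hash_from_bytes(data: bytes) -> str:
    h = hashlib.sha256(str(len(data)).encode())  # include the size in the digest
    for offset, length in _spans(len(data)):
        h.update(data[offset:offset + length])
    return h.hexdigest()


def hash_from_file(path: str) -> str:
    size = os.path.getsize(path)
    h = hashlib.sha256(str(size).encode())
    with open(path, "rb") as f:
        for offset, length in _spans(size):
            f.seek(offset)  # jump straight to each sample; the rest is never read
            h.update(f.read(length))
    return h.hexdigest()
```

The trade-off of any sampling hash is that two files of equal size that differ only outside the sampled ranges produce the same digest.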
### Memory Profile (2 GB video)

| Stage | Buffered upload (F2) | Streaming (F7) |
|-------|-------------|----------------|
| Starlette buffering | 2 GB (SpooledTempFile) | 0 (raw stream) |
| `file.read()` / chunk buffer | 2 GB (full bytes) | ~64 KB (one chunk) |
| BytesIO for PyAV | 2 GB (copy) | 0 (reads from buffer) |
| Writer thread | 2 GB (same ref) | 0 (no separate writer) |
| **Peak process RAM** | **~4+ GB** | **~50 MB** (batch × frame) |

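As a rough check of the "~50 MB (batch × frame)" estimate, assume (hypothetically, since neither value is stated in this document) 1080p frames held as 8-bit, 3-channel BGR arrays and a batch of 8 frames:

$$
1920 \times 1080 \times 3\ \text{B} = 6\,220\,800\ \text{B} \approx 5.9\ \text{MiB per frame},
\qquad
8 \times 6\,220\,800\ \text{B} = 49\,766\,400\ \text{B} \approx 49.8\ \text{MB} \;(47.5\ \text{MiB}).
$$

This lands near the table's figure. Under these assumptions the batch term scales linearly with batch size and frame area, while a single ~64 KB upload chunk is negligible by comparison.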
### Format Compatibility

| Container Format | Moov Location | Streaming Behavior |
|-----------------|--------------|-------------------|
| MP4 (faststart) | Beginning | True streaming: first frame decoded in ~500 ms |
| MKV / WebM | Beginning | True streaming: first frame decoded in ~500 ms |
| MP4 (standard) | End of file | Graceful degradation: `seek(0, 2)` blocks until upload completes, then decoding starts |
| MOV, AVI | Varies | Depends on header location |

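Whether an MP4 will stream (first row) or degrade (third row) can be checked before upload by walking the top-level ISO BMFF boxes. A sketch, assuming a seekable binary file: each box starts with a 32-bit big-endian size and a 4-character type, where size 1 means a 64-bit size follows and size 0 means the box extends to end of file. `moov_before_mdat` is an illustrative name.

```python
import struct
from typing import BinaryIO, Optional


def moov_before_mdat(f: BinaryIO) -> Optional[bool]:
    """True if 'moov' precedes 'mdat' (faststart), False if not, None if unknown.

    Reads only box headers and seeks past payloads.
    """
    while True:
        start = f.tell()
        header = f.read(8)
        if len(header) < 8:
            return None  # ran out of boxes without seeing either
        size, box_type = struct.unpack(">I4s", header)
        if box_type == b"moov":
            return True
        if box_type == b"mdat":
            return False
        if size == 1:  # 64-bit "largesize" follows the type field
            size = struct.unpack(">Q", f.read(8))[0]
        elif size == 0:  # box runs to end of file
            return None
        if size < 8:
            return None  # malformed header
        f.seek(start + size)
```

A standard MP4 can be rewritten into the faststart layout without re-encoding with `ffmpeg -i in.mp4 -c copy -movflags +faststart out.mp4`.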
### Error Scenarios

| Error | Where | Detection | Recovery |
|-------|-------|-----------|----------|
| Non-video extension | API | Extension check | 400 Bad Request |
| Client disconnects mid-upload | request.stream() | Exception | `buffer.close_writer()` is called in the `except` handler, so the inference thread receives EOF |
| Engine unavailable | Inference thread | engine is None | Error event via SSE |
| PyAV decode failure | Inference thread | Exception | Error event via SSE, media status set to Error |
| Disk full | StreamingBuffer.append | OSError | Propagated to API handler |
| Annotations service down | _post_media_record | Exception caught | Silently continues, detections still work |