Refactor inference engine and task management: remove the obsolete inference engine and ONNX engine files, switch inference processing to batch handling, and restructure the task management documentation. Adjust task spec paths to match the new directory organization.

Oleksandr Bezdieniezhnykh
2026-03-28 01:04:28 +02:00
parent 1e4ef299f9
commit 5be53739cd
60 changed files with 111875 additions and 208 deletions
+9 -5
@@ -16,7 +16,7 @@ If you want to run a specific skill directly (without the orchestrator), use the
/problem — interactive problem gathering → _docs/00_problem/
/research — solution drafts → _docs/01_solution/
/plan — architecture, components, tests → _docs/02_document/
/decompose — atomic task specs → _docs/02_tasks/
/decompose — atomic task specs → _docs/02_tasks/todo/
/implement — batched parallel implementation → _docs/03_implementation/
/deploy — containerization, CI/CD, observability → _docs/04_deploy/
```
@@ -122,7 +122,7 @@ Bottom-up codebase documentation. Analyzes existing code from modules through co
2. /plan — architecture, data model, deployment, components, risks, tests, Jira epics → _docs/02_document/
3. /decompose — atomic task specs + dependency table → _docs/02_tasks/
3. /decompose — atomic task specs + dependency table → _docs/02_tasks/todo/
4. /implement — batched parallel agents, code review, commit per batch → _docs/03_implementation/
```
@@ -150,7 +150,7 @@ Or just use `/autopilot` to run steps 0-5 automatically.
| **problem** | "problem", "define problem", "new project" | `_docs/00_problem/` |
| **research** | "research", "investigate" | `_docs/01_solution/` |
| **plan** | "plan", "decompose solution" | `_docs/02_document/` |
| **decompose** | "decompose", "task decomposition" | `_docs/02_tasks/` |
| **decompose** | "decompose", "task decomposition" | `_docs/02_tasks/todo/` |
| **implement** | "implement", "start implementation" | `_docs/03_implementation/` |
| **code-review** | "code review", "review code" | Verdict: PASS / FAIL / PASS_WITH_WARNINGS |
| **refactor** | "refactor", "improve code" | `_docs/04_refactoring/` |
@@ -185,7 +185,11 @@ _docs/
│ ├── deployment/ — containerization, CI/CD, environments, observability, procedures
│ ├── diagrams/
│ └── FINAL_report.md
├── 02_tasks/ — [JIRA-ID]_[name].md + _dependencies_table.md
├── 02_tasks/ — task workflow folders + _dependencies_table.md
│ ├── _dependencies_table.md — cross-task dependency graph (root level)
│ ├── backlog/ — parked tasks (not scheduled for implementation)
│ ├── todo/ — tasks ready for implementation
│ └── done/ — completed tasks (moved here by /implement)
├── 03_implementation/ — batch reports, FINAL report
├── 04_deploy/ — containerization, CI/CD, environments, observability, procedures, scripts
├── 04_refactoring/ — baseline, discovery, analysis, execution, hardening
@@ -208,4 +212,4 @@ _docs/
/decompose @_docs/02_document/components/03_parser/description.md
```
Appends tasks for that component to `_docs/02_tasks/` without running bootstrap or cross-verification.
Appends tasks for that component to `_docs/02_tasks/todo/` without running bootstrap or cross-verification.
+2 -2
@@ -1,7 +1,7 @@
---
name: implementer
description: |
Implements a single task from its spec file. Use when implementing tasks from _docs/02_tasks/.
Implements a single task from its spec file. Use when implementing tasks from _docs/02_tasks/todo/.
Reads the task spec, analyzes the codebase, implements the feature with tests, and verifies acceptance criteria.
Launched by the /implement skill as a subagent.
category: build
@@ -13,7 +13,7 @@ You are a professional software developer implementing a single task.
## Input
You receive from the `/implement` orchestrator:
- Path to a task spec file (e.g., `_docs/02_tasks/[JIRA-ID]_[short_name].md`)
- Path to a task spec file (e.g., `_docs/02_tasks/todo/[JIRA-ID]_[short_name].md`)
- Files OWNED (exclusive write access — only you may modify these)
- Files READ-ONLY (shared interfaces, types — read but do not modify)
- Files FORBIDDEN (other agents' owned files — do not touch)
+7
@@ -0,0 +1,7 @@
# Project Management
- This project uses **Jira ONLY** for work item tracking (NOT Azure DevOps)
- Jira project key: `AZ` (AZAION)
- Jira cloud ID: `1598226f-845f-4705-bcd1-5ed0c82d6119`
- Use the `user-Jira-MCP-Server` MCP server for all Jira operations
- Never use Azure DevOps MCP for this project's work items
@@ -44,23 +44,23 @@ This step applies when the codebase was documented via the `/document` skill. Te
---
**Step 3 — Decompose Tests**
Condition: `_docs/02_document/tests/traceability-matrix.md` exists AND workspace contains source code files AND the autopilot state shows Document was run AND (`_docs/02_tasks/` does not exist or has no task files)
Condition: `_docs/02_document/tests/traceability-matrix.md` exists AND workspace contains source code files AND the autopilot state shows Document was run AND (`_docs/02_tasks/todo/` does not exist or has no task files)
Action: Read and execute `.cursor/skills/decompose/SKILL.md` in **tests-only mode** (pass `_docs/02_document/tests/` as input). The decompose skill will:
1. Run Step 1t (test infrastructure bootstrap)
2. Run Step 3 (blackbox test task decomposition)
3. Run Step 4 (cross-verification against test coverage)
If `_docs/02_tasks/` has some task files already, the decompose skill's resumability handles it.
If `_docs/02_tasks/todo/` has some task files already, the decompose skill's resumability handles it.
---
**Step 4 — Implement Tests**
Condition: `_docs/02_tasks/` contains task files AND `_dependencies_table.md` exists AND the autopilot state shows Step 3 (Decompose Tests) is completed AND `_docs/03_implementation/FINAL_implementation_report.md` does not exist
Condition: `_docs/02_tasks/todo/` contains task files AND `_docs/02_tasks/_dependencies_table.md` exists AND the autopilot state shows Step 3 (Decompose Tests) is completed AND `_docs/03_implementation/FINAL_implementation_report.md` does not exist
Action: Read and execute `.cursor/skills/implement/SKILL.md`
The implement skill reads test tasks from `_docs/02_tasks/` and implements them.
The implement skill reads test tasks from `_docs/02_tasks/todo/` and implements them.
If `_docs/03_implementation/` has batch reports, the implement skill detects completed tasks and continues.
@@ -91,16 +91,16 @@ Condition: the autopilot state shows Step 6 (Refactor) is completed AND the auto
Action: Read and execute `.cursor/skills/new-task/SKILL.md`
The new-task skill interactively guides the user through defining new functionality. It loops until the user is done adding tasks. New task files are written to `_docs/02_tasks/`.
The new-task skill interactively guides the user through defining new functionality. It loops until the user is done adding tasks. New task files are written to `_docs/02_tasks/todo/`.
---
**Step 8 — Implement**
Condition: the autopilot state shows Step 7 (New Task) is completed AND `_docs/03_implementation/` does not contain a FINAL report covering the new tasks (check state for distinction between test implementation and feature implementation)
Condition: the autopilot state shows Step 7 (New Task) is completed AND `_docs/02_tasks/todo/` contains task files AND `_docs/03_implementation/` does not contain a FINAL report covering the new tasks (check state for distinction between test implementation and feature implementation)
Action: Read and execute `.cursor/skills/implement/SKILL.md`
The implement skill reads the new tasks from `_docs/02_tasks/` and implements them. Tasks already implemented in Step 4 are skipped (the implement skill tracks completed tasks in batch reports).
The implement skill reads the new tasks from `_docs/02_tasks/todo/` and implements them. Tasks already implemented in Step 4 are in `_docs/02_tasks/done/`.
If `_docs/03_implementation/` has batch reports from this phase, the implement skill detects completed tasks and continues.
+3 -3
@@ -110,16 +110,16 @@ If the project IS a UI project → present using Choose format:
---
**Step 5 — Decompose**
Condition: `_docs/02_document/` contains `architecture.md` AND `_docs/02_document/components/` has at least one component AND `_docs/02_tasks/` does not exist or has no task files (excluding `_dependencies_table.md`)
Condition: `_docs/02_document/` contains `architecture.md` AND `_docs/02_document/components/` has at least one component AND `_docs/02_tasks/todo/` does not exist or has no task files
Action: Read and execute `.cursor/skills/decompose/SKILL.md`
If `_docs/02_tasks/` has some task files already, the decompose skill's resumability handles it.
If `_docs/02_tasks/todo/` has some task files already, the decompose skill's resumability handles it.
---
**Step 6 — Implement**
Condition: `_docs/02_tasks/` contains task files AND `_dependencies_table.md` exists AND `_docs/03_implementation/FINAL_implementation_report.md` does not exist
Condition: `_docs/02_tasks/todo/` contains task files AND `_docs/02_tasks/_dependencies_table.md` exists AND `_docs/03_implementation/FINAL_implementation_report.md` does not exist
Action: Read and execute `.cursor/skills/implement/SKILL.md`
+1 -1
@@ -159,7 +159,7 @@ The `/implement` skill invokes this skill after each batch completes:
| Input | Type | Source | Required |
|-------|------|--------|----------|
| `task_specs` | list of file paths | Task `.md` files from `_docs/02_tasks/` for the current batch | Yes |
| `task_specs` | list of file paths | Task `.md` files from `_docs/02_tasks/todo/` for the current batch | Yes |
| `changed_files` | list of file paths | Files modified by implementer agents (from `git diff` or agent reports) | Yes |
| `batch_number` | integer | Current batch number (for report naming) | Yes |
| `project_restrictions` | file path | `_docs/00_problem/restrictions.md` | If exists |
+27 -17
@@ -22,7 +22,7 @@ Decompose planned components into atomic, implementable task specs with a bootst
- **Atomic tasks**: each task does one thing; if it exceeds 5 complexity points, split it
- **Behavioral specs, not implementation plans**: describe what the system should do, not how to build it
- **Flat structure**: all tasks are Jira-ID-prefixed files in TASKS_DIR — no component subdirectories
- **Flat structure**: all tasks are Jira-ID-prefixed files in TASKS_DIR (`todo/`) — no component subdirectories within workflow folders
- **Save immediately**: write artifacts to disk after each task; never accumulate unsaved work
- **Jira inline**: create Jira ticket immediately after writing each task file
- **Ask, don't assume**: when requirements are ambiguous, ask the user before proceeding
@@ -34,20 +34,26 @@ Determine the operating mode based on invocation before any other logic runs.
**Default** (no explicit input file provided):
- DOCUMENT_DIR: `_docs/02_document/`
- TASKS_DIR: `_docs/02_tasks/`
- TASKS_DIR: `_docs/02_tasks/todo/`
- TASKS_ROOT: `_docs/02_tasks/`
- DEPS_TABLE: `_docs/02_tasks/_dependencies_table.md`
- Reads from: `_docs/00_problem/`, `_docs/01_solution/`, DOCUMENT_DIR
- Runs Step 1 (bootstrap) + Step 2 (all components) + Step 3 (blackbox tests) + Step 4 (cross-verification)
**Single component mode** (provided file is within `_docs/02_document/` and inside a `components/` subdirectory):
- DOCUMENT_DIR: `_docs/02_document/`
- TASKS_DIR: `_docs/02_tasks/`
- TASKS_DIR: `_docs/02_tasks/todo/`
- TASKS_ROOT: `_docs/02_tasks/`
- DEPS_TABLE: `_docs/02_tasks/_dependencies_table.md`
- Derive component number and component name from the file path
- Ask user for the parent Epic ID
- Runs Step 2 (that component only, appending to existing task numbering)
**Tests-only mode** (provided file/directory is within `tests/`, or `DOCUMENT_DIR/tests/` exists and input explicitly requests test decomposition):
- DOCUMENT_DIR: `_docs/02_document/`
- TASKS_DIR: `_docs/02_tasks/`
- TASKS_DIR: `_docs/02_tasks/todo/`
- TASKS_ROOT: `_docs/02_tasks/`
- DEPS_TABLE: `_docs/02_tasks/_dependencies_table.md`
- TESTS_DIR: `DOCUMENT_DIR/tests/`
- Reads from: `_docs/00_problem/`, `_docs/01_solution/`, TESTS_DIR
- Runs Step 1t (test infrastructure bootstrap) + Step 3 (blackbox test decomposition) + Step 4 (cross-verification against test coverage)
@@ -99,8 +105,8 @@ Announce the detected mode and resolved paths to the user before proceeding.
**Default:**
1. DOCUMENT_DIR contains `architecture.md` and `components/` — **STOP if missing**
2. Create TASKS_DIR if it does not exist
3. If TASKS_DIR already contains task files, ask user: **resume from last checkpoint or start fresh?**
2. Create TASKS_ROOT subfolders (`todo/`, `backlog/`, `done/`) if they do not exist
3. If TASKS_DIR (`todo/`) already contains task files, ask user: **resume from last checkpoint or start fresh?**
**Single component mode:**
1. The provided component file exists and is non-empty — **STOP if missing**
@@ -108,22 +114,26 @@ Announce the detected mode and resolved paths to the user before proceeding.
**Tests-only mode:**
1. `TESTS_DIR/blackbox-tests.md` exists and is non-empty — **STOP if missing**
2. `TESTS_DIR/environment.md` exists — **STOP if missing**
3. Create TASKS_DIR if it does not exist
4. If TASKS_DIR already contains task files, ask user: **resume from last checkpoint or start fresh?**
3. Create TASKS_ROOT subfolders (`todo/`, `backlog/`, `done/`) if they do not exist
4. If TASKS_DIR (`todo/`) already contains task files, ask user: **resume from last checkpoint or start fresh?**
## Artifact Management
### Directory Structure
```
TASKS_DIR/
├── [JIRA-ID]_initial_structure.md
├── [JIRA-ID]_[short_name].md
├── [JIRA-ID]_[short_name].md
├── ...
└── _dependencies_table.md
_docs/02_tasks/
├── _dependencies_table.md
├── backlog/
├── todo/
│ ├── [JIRA-ID]_initial_structure.md
│ ├── [JIRA-ID]_[short_name].md
│ └── ...
└── done/
```
New task files are written to `todo/`. The `/implement` skill moves them to `done/` after successful implementation. Users can move tasks to `backlog/` to defer them.
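In code terms, the lifecycle is just file moves between the three folders. A minimal sketch (the `AZ-42` filename is illustrative):

```python
from pathlib import Path

root = Path("_docs/02_tasks")
for folder in ("backlog", "todo", "done"):
    (root / folder).mkdir(parents=True, exist_ok=True)

# /decompose writes a new spec into todo/
spec = root / "todo" / "AZ-42_initial_structure.md"
spec.write_text("# task spec\n")

# /implement moves it to done/ after the batch commit succeeds
spec.rename(root / "done" / spec.name)
```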
**Naming convention**: Each task file is initially saved with a temporary numeric prefix (`[##]_[short_name].md`). After creating the Jira ticket, rename the file to use the Jira ticket ID as prefix (`[JIRA-ID]_[short_name].md`). For example: `01_initial_structure.md` → `AZ-42_initial_structure.md`.
### Save Timing
@@ -138,9 +148,9 @@ TASKS_DIR/
### Resumability
If TASKS_DIR already contains task files:
If task files already exist (in `todo/`, `backlog/`, or `done/`):
1. List existing `*_*.md` files (excluding `_dependencies_table.md`) and count them
1. List existing `*_*.md` files across all three subfolders (excluding `_dependencies_table.md`) and count them
2. Resume numbering from the next number (for temporary numeric prefix before Jira rename)
3. Inform the user which tasks already exist and are being skipped
@@ -342,7 +352,7 @@ Tests-only mode:
- **Cross-component tasks**: each task belongs to exactly one component
- **Skipping BLOCKING gates**: never proceed past a BLOCKING marker without user confirmation
- **Creating git branches**: branch creation is an implementation concern, not a decomposition one
- **Creating component subdirectories**: all tasks go flat in TASKS_DIR
- **Creating component subdirectories**: all tasks go flat in TASKS_DIR (`todo/`)
- **Forgetting Jira**: every task must have a Jira ticket created inline — do not defer to a separate step
- **Forgetting to rename**: after Jira ticket creation, always rename the file from numeric prefix to Jira ID prefix
+16 -7
@@ -32,15 +32,18 @@ The `implementer` agent is the specialist that writes all the code — it receiv
## Context Resolution
- TASKS_DIR: `_docs/02_tasks/`
- TASKS_DIR: `_docs/02_tasks/todo/`
- DONE_DIR: `_docs/02_tasks/done/`
- BACKLOG_DIR: `_docs/02_tasks/backlog/`
- TASKS_ROOT: `_docs/02_tasks/`
- Task files: all `*.md` files in TASKS_DIR (excluding files starting with `_`)
- Dependency table: `TASKS_DIR/_dependencies_table.md`
- Dependency table: `TASKS_ROOT/_dependencies_table.md`
## Prerequisite Checks (BLOCKING)
1. TASKS_DIR exists and contains at least one task file — **STOP if missing**
2. `_dependencies_table.md` exists — **STOP if missing**
3. At least one task is not yet completed — **STOP if all done**
1. TASKS_DIR (`todo/`) exists and contains at least one task file — **STOP if missing**
2. `TASKS_ROOT/_dependencies_table.md` exists — **STOP if missing**
3. At least one task in TASKS_DIR is not yet completed — **STOP if all done** (already-completed tasks live in DONE_DIR)
## Algorithm
@@ -52,7 +55,8 @@ The `implementer` agent is the specialist that writes all the code — it receiv
### 2. Detect Progress
- Scan the codebase to determine which tasks are already completed
- Scan DONE_DIR to identify tasks that were already completed in previous runs
- Scan the codebase to determine which TASKS_DIR tasks are already completed
- Match implemented code against task acceptance criteria
- Mark completed tasks as done in the DAG
- Report progress to user: "X of Y tasks completed"
@@ -130,13 +134,18 @@ Track `auto_fix_attempts` count in the batch report for retrospective analysis.
- `git commit` with a message that includes ALL task IDs (Jira IDs, ADO IDs, or numeric prefixes) of tasks implemented in the batch, followed by a summary of what was implemented. Format: `[TASK-ID-1] [TASK-ID-2] ... Summary of changes`
- `git push` to the remote branch
### 11b. Move Completed Tasks to Done
- For each task in the batch that completed successfully, move its task spec file from TASKS_DIR (`todo/`) to DONE_DIR (`done/`)
- `git add` the moved files and amend the batch commit, or create a follow-up commit
### 12. Update Tracker Status → In Testing
After the batch is committed and pushed, transition the ticket status of each task in the batch to **In Testing** via the configured work item tracker. If `tracker: local`, skip this step.
### 13. Loop
- Go back to step 2 until all tasks are done
- Go back to step 2 until TASKS_DIR (`todo/`) is empty (all tasks moved to DONE_DIR)
- When all tasks are complete, report final summary
## Batch Report Persistence
+6 -3
@@ -30,14 +30,17 @@ Guide the user through defining new functionality for an existing codebase. Prod
Fixed paths:
- TASKS_DIR: `_docs/02_tasks/`
- TASKS_DIR: `_docs/02_tasks/todo/`
- TASKS_ROOT: `_docs/02_tasks/`
- DONE_DIR: `_docs/02_tasks/done/`
- BACKLOG_DIR: `_docs/02_tasks/backlog/`
- PLANS_DIR: `_docs/02_task_plans/`
- DOCUMENT_DIR: `_docs/02_document/`
- DEPENDENCIES_TABLE: `_docs/02_tasks/_dependencies_table.md`
Create TASKS_DIR and PLANS_DIR if they don't exist.
Create TASKS_ROOT subfolders (`todo/`, `backlog/`, `done/`) and PLANS_DIR if they don't exist.
If TASKS_DIR already contains task files, scan them to determine the next numeric prefix for temporary file naming.
Scan all three subfolders (`todo/`, `backlog/`, `done/`) for existing task files to determine the next numeric prefix for temporary file naming.
## Workflow
+4 -1
@@ -32,7 +32,10 @@ Fixed paths:
- IMPL_DIR: `_docs/03_implementation/`
- METRICS_DIR: `_docs/06_metrics/`
- TASKS_DIR: `_docs/02_tasks/`
- TASKS_DIR: `_docs/02_tasks/done/`
- TASKS_ROOT: `_docs/02_tasks/`
TASKS_DIR points to `done/` because the retrospective analyzes completed work. To compute broader metrics (e.g., backlog size), also scan `TASKS_ROOT/backlog/` and `TASKS_ROOT/todo/`.
Announce the resolved paths to the user before proceeding.
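A sketch of such a metrics scan, assuming the layout above (the helper name is hypothetical):

```python
from pathlib import Path

def task_counts(tasks_root: Path) -> dict:
    """Count task spec files per workflow folder, skipping files with a
    leading underscore such as _dependencies_table.md."""
    return {
        folder: sum(
            1
            for p in (tasks_root / folder).glob("*.md")
            if not p.name.startswith("_")
        )
        for folder in ("todo", "backlog", "done")
        if (tasks_root / folder).is_dir()
    }
```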
+1
@@ -0,0 +1 @@
/Users/obezdienie001/dev/azaion/suite/detections/e2e/logs/log_inference_20260326.txt
@@ -0,0 +1,221 @@
# LiveKit Stream Detection
**Task**: AZ-150_livekit_stream_detection
**Name**: LiveKit Stream Detection Integration
**Description**: Enable real-time object detection on 5-10 simultaneous LiveKit WebRTC streams. Two-app architecture: a Playwright companion app for authentication and stream discovery, plus LiveKit SDK integration in the detection service for frame capture and inference.
**Complexity**: 5 points
**Dependencies**: None (extends existing detection service)
**Component**: Feature
**Jira**: AZ-150
**Epic**: AZ-149
## Problem
The platform streams live video via LiveKit WebRTC. The detection service currently only processes pre-recorded video files and static images via `cv2.VideoCapture`. There is no way to run real-time object detection on live streams. The user needs to detect objects on 5-10 out of 50+ simultaneous streams shown on the platform's web page.
Key constraints:
- No LiveKit API key/secret available (only browser-level access)
- LiveKit WebRTC streams cannot be consumed by `cv2.VideoCapture`
- Tokens are issued by the platform's backend and expire periodically
- Must handle 5-10 concurrent streams without overwhelming the GPU inference engine
## Outcome
- User can open the platform's stream page in a Playwright-controlled browser, log in, and see all available streams
- The system automatically discovers stream IDs, LiveKit room names, tokens, and server URL from network traffic
- User selects which streams to run detection on via an injected UI overlay
- Detection runs continuously on selected streams with results flowing through the existing SSE endpoint
- Tokens are automatically refreshed as the page renews them
## Architecture
### Two-App Design
```
App 1: stream_discover.py (Playwright companion)
- Launches real Chromium browser (separate process)
- Python controls it via Chrome DevTools Protocol (CDP) over local WebSocket
- User interacts with browser normally (login, navigation)
- Python silently intercepts network traffic and reads the DOM
- Injects a floating selection UI onto the page
- Sends selected stream configs to the detection service API
App 2: Detection Service (existing FastAPI in main.py)
- New /detect/livekit/* endpoints receive stream configs from companion app
- livekit_source.py connects to LiveKit rooms via livekit.rtc SDK
- livekit_detector.py orchestrates multi-stream frame capture and batched inference
- inference.pyx gains a new detect_frames() method for raw numpy frame batches
- Results flow through existing SSE /detect/stream endpoint
```
### How Playwright Works (NOT a Webview)
Playwright is a browser automation library by Microsoft. It does NOT embed a browser inside a Python window. Instead:
1. `playwright.chromium.launch(headless=False)` starts a **real standalone Chromium process** -- identical to opening Chrome
2. Python communicates with this browser via CDP (Chrome DevTools Protocol) over a local WebSocket
3. The user sees a normal browser window and interacts with it normally (login, clicking, scrolling)
4. Python silently observes all network traffic, reads the DOM, and can inject HTML/JavaScript
5. There is no Python GUI -- the browser window IS the entire interface
```
Python Process                        Chromium Process (separate)
+--------------------------+          +---------------------------+
| stream_discover.py       |          | Normal browser window     |
|                          |          |                           |
| - Playwright library     |   CDP    | - User logs in normally   |
| - Token interceptor      | <=====>  | - DevTools Protocol       |
| - DOM parser             |   WS     | - Full web app rendering  |
| - Selection UI injector  |          | - LiveKit video playback  |
+--------------------------+          +---------------------------+
```
Advantages over a webview:
- No GUI code to write -- browser IS the UI
- User sees the exact same web app they normally use
- Full access to network requests, cookies, localStorage
- Playwright handles CDP complexity
### Data Flow
```
1. User logs in via browser
2. User navigates to streams page
3. Python intercepts HTTP responses containing LiveKit JWT tokens
4. Python parses DOM for data-testid="mission-video-*" elements
5. Python decodes JWTs to extract room names
6. Python injects floating panel with stream checkboxes onto the page
7. User selects streams, clicks "Start Detection"
8. Python POSTs {livekit_url, rooms[{name, token, stream_id}]} to detection service
9. Detection service connects to LiveKit rooms via livekit.rtc
10. Frames are sampled, batched, and run through inference engine
11. DetectionEvents emitted via existing SSE /detect/stream
12. Python companion stays open, intercepts token refreshes, pushes to detection service
```
### Multi-Stream Frame Processing
```
Stream 1 (async task) ─── sample every Nth frame ──┐
Stream 2 (async task) ─── sample every Nth frame ──├─► Shared Frame Queue ─► Detection Worker Thread
Stream N (async task) ─── sample every Nth frame ──┘          │                        │
                                                              ▼                        ▼
                                                  backpressure: keep only    inference.detect_frames()
                                                  latest frame per stream              │
                                                                                       ▼
                                                                              DetectionEvent → SSE
```
- At 30fps input with frame_period_recognition=4: ~7.5 fps per stream
- 10 streams = ~75 frames/sec into the queue
- Engine batch size determines how many frames are processed at once
- Backpressure: each stream keeps only its latest unprocessed frame; stale frames dropped
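A sketch of that backpressure rule (the class name is hypothetical): at roughly 75 incoming frames/sec across 10 streams, the buffer never holds more than one pending frame per stream, so the detection worker always batches the freshest frames.

```python
import threading
from typing import Any

class LatestFrameBuffer:
    """Keep only the newest unprocessed frame per stream; stale frames are dropped."""

    def __init__(self) -> None:
        self._latest: dict[str, Any] = {}
        self._cond = threading.Condition()

    def put(self, stream_id: str, frame: Any) -> None:
        with self._cond:
            self._latest[stream_id] = frame  # overwrite = drop the stale frame
            self._cond.notify()

    def take_batch(self, timeout: float = 1.0) -> list[tuple[str, Any]]:
        """Drain at most one frame per stream for a single batched inference call."""
        with self._cond:
            if not self._latest:
                self._cond.wait(timeout)
            batch = list(self._latest.items())
            self._latest.clear()
            return batch
```

The worker thread would loop on `take_batch()` and hand each batch to `inference.detect_frames()` in one call.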
## Scope
### Included
**Companion App (stream_discover.py)**
- Playwright browser launch and lifecycle management
- Network response interception for LiveKit JWT token capture
- WebSocket URL interception for LiveKit server URL discovery
- DOM parsing for stream ID and display name extraction
- JWT decoding to map stream_id -> room_name
- Injected floating UI panel with stream checkboxes and "Start Detection" button
- HTTP POST to detection service with selected stream configs
- Token refresh monitoring and forwarding
**Detection Service**
- `livekit_source.py`: LiveKit room connection, video track subscription, VideoFrame -> BGR numpy conversion
- `livekit_detector.py`: multi-stream task orchestration, frame sampling, shared queue, batched detection loop, SSE event emission
- `inference.pyx`/`.pxd`: new `detect_frames(frames, config)` cpdef method for raw numpy frame batches
- `main.py`: new endpoints POST /detect/livekit/start, POST /detect/livekit/refresh-tokens, DELETE /detect/livekit/stop, GET /detect/livekit/status
- `requirements.txt`: add `livekit` and `playwright` dependencies
### Excluded
- LiveKit API key/secret based token generation (no access)
- Publishing video back to LiveKit
- Recording or saving stream frames to disk
- Modifying existing /detect or /detect/{media_id} endpoints
- UI beyond the injected browser overlay
## Acceptance Criteria
**AC-1: Stream Discovery**
Given the user opens the platform's stream page in the Playwright browser
When the page loads and streams are rendered
Then the companion app discovers all stream IDs, display names, LiveKit tokens, room names, and server URL from network traffic and DOM
**AC-2: Stream Selection UI**
Given streams are discovered
When the companion app injects the selection panel
Then the user sees a floating panel listing all streams with checkboxes and a "Start Detection" button
**AC-3: Start Detection**
Given the user selects N streams and clicks "Start Detection"
When the companion app sends the config to the detection service
Then the detection service connects to N LiveKit rooms and begins receiving video frames
**AC-4: Real-Time Inference**
Given the detection service is receiving frames from LiveKit streams
When frames are sampled and batched through the inference engine
Then DetectionEvents with annotations are emitted via the existing SSE /detect/stream endpoint
**AC-5: Multi-Stream Handling**
Given 5-10 streams are active simultaneously
When inference runs continuously
Then all streams are processed fairly (round-robin or queue-based) without any stream being starved
**AC-6: Token Refresh**
Given the platform's frontend refreshes LiveKit tokens periodically
When the companion app detects a token renewal in network traffic
Then the new token is forwarded to the detection service and the LiveKit connection continues without interruption
**AC-7: Stop Detection**
Given detection is running on N streams
When the user calls DELETE /detect/livekit/stop
Then all LiveKit connections are cleanly closed and detection tasks cancelled
## File Changes
| File | Action | Description |
|------|--------|-------------|
| `stream_discover.py` | New | Playwright companion app |
| `livekit_source.py` | New | LiveKit room connection and frame capture |
| `livekit_detector.py` | New | Multi-stream detection orchestration |
| `inference.pyx` | Modified | Add `detect_frames` cpdef method |
| `inference.pxd` | Modified | Declare `detect_frames` method |
| `main.py` | Modified | Add /detect/livekit/* endpoints |
| `requirements.txt` | Modified | Add `livekit`, `playwright` |
## Non-Functional Requirements
**Performance**
- Frame-to-detection latency < 500ms per batch (excluding network latency)
- 10 concurrent streams without OOM or queue overflow
**Reliability**
- Graceful handling of LiveKit disconnections (auto-reconnect or clean stop)
- Token expiry handled without crash
## Risks & Mitigation
**Risk 1: LiveKit Python SDK frame format compatibility**
- *Risk*: VideoFrame format (RGBA/I420/NV12) may vary by codec and platform
- *Mitigation*: Use `frame.convert(VideoBufferType.RGBA)` to normalize, then convert to BGR
**Risk 2: Token expiration before refresh is captured**
- *Risk*: If the companion app misses a token refresh, the LiveKit connection drops
- *Mitigation*: Implement reconnection logic in livekit_source.py; companion app can re-request tokens
**Risk 3: Inference engine bottleneck with 10 streams**
- *Risk*: GPU/CPU inference cannot keep up with frame arrival rate
- *Mitigation*: Backpressure design (drop stale frames); configurable frame_period_recognition to reduce load
**Risk 4: Playwright browser stability**
- *Risk*: Long-running browser session may leak memory or crash
- *Mitigation*: Monitor browser process health; provide manual restart capability
**Risk 5: LiveKit room structure unknown**
- *Risk*: Rooms may be structured differently than expected (multi-track, SFU routing)
- *Mitigation*: Start with single-track subscription per room; adapt after initial testing
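The Risk 1 mitigation can be sketched as follows. `frame.convert(...)` is the call named above; the buffer attributes (`data`, `width`, `height`) are assumptions about the livekit SDK, while the channel swap itself is plain numpy:

```python
import numpy as np

def rgba_to_bgr(rgba: np.ndarray) -> np.ndarray:
    """Drop alpha and reverse channel order (RGBA -> BGR) without needing cv2."""
    return rgba[:, :, [2, 1, 0]].copy()

def video_frame_to_bgr(frame) -> np.ndarray:
    """Normalize a livekit.rtc.VideoFrame to a BGR numpy array.

    Attribute names on the converted buffer are assumptions about the SDK.
    """
    from livekit import rtc  # deferred import; sketch readable without the SDK

    rgba = frame.convert(rtc.VideoBufferType.RGBA)
    arr = np.frombuffer(rgba.data, dtype=np.uint8).reshape(rgba.height, rgba.width, 4)
    return rgba_to_bgr(arr)
```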
File diff suppressed because it is too large
+12516
File diff suppressed because it is too large
+15706
File diff suppressed because it is too large
+12716
File diff suppressed because it is too large
+5
@@ -70,6 +70,11 @@ logger.add(
    colorize=True
)

def get_annotation_name(int cls_id):
    if cls_id in annotations_dict:
        return (<AnnotationClass>annotations_dict[cls_id]).name
    return ""

cdef log(str log_message):
    logger.info(log_message)
+31 -9
@@ -1,5 +1,6 @@
import base64
import json
import os
import random
import time
from contextlib import contextmanager
@@ -35,7 +36,7 @@ class _SessionWithBase(requests.Session):
@pytest.fixture(scope="session")
def base_url():
    return "http://detections:8080"
    return os.environ.get("BASE_URL", "http://detections:8080")
@pytest.fixture(scope="session")
@@ -56,12 +57,12 @@ def sse_client_factory(http_client):
@pytest.fixture(scope="session")
def mock_loader_url():
    return "http://mock-loader:8080"
    return os.environ.get("MOCK_LOADER_URL", "http://mock-loader:8080")

@pytest.fixture(scope="session")
def mock_annotations_url():
    return "http://mock-annotations:8081"
    return os.environ.get("MOCK_ANNOTATIONS_URL", "http://mock-annotations:8081")
@pytest.fixture(scope="session", autouse=True)
@@ -96,13 +97,22 @@ def reset_mocks(mock_loader_url, mock_annotations_url):
    yield

def _media_dir() -> Path:
    return Path(os.environ.get("MEDIA_DIR", "/media"))

def _read_media(name: str) -> bytes:
    p = Path("/media") / name
    p = _media_dir() / name
    if not p.is_file():
        pytest.skip(f"missing {p}")
    return p.read_bytes()
@pytest.fixture(scope="session")
def media_dir():
    return str(_media_dir())

@pytest.fixture(scope="session")
def image_small():
    return _read_media("image_small.jpg")
@@ -135,17 +145,17 @@ def image_empty_scene():
@pytest.fixture(scope="session")
def video_short_path():
    return "/media/video_short01.mp4"
    return str(_media_dir() / "video_short01.mp4")

@pytest.fixture(scope="session")
def video_short_02_path():
    return "/media/video_short02.mp4"
    return str(_media_dir() / "video_short02.mp4")

@pytest.fixture(scope="session")
def video_long_path():
    return "/media/video_long03.mp4"
    return str(_media_dir() / "video_long03.mp4")
@pytest.fixture(scope="session")
@@ -179,12 +189,24 @@ def jwt_token():
def warm_engine(http_client, image_small):
deadline = time.time() + 120
files = {"file": ("warm.jpg", image_small, "image/jpeg")}
consecutive_errors = 0
last_status = None
while time.time() < deadline:
try:
r = http_client.post("/detect", files=files)
if r.status_code == 200:
return
last_status = r.status_code
if r.status_code >= 500:
consecutive_errors += 1
if consecutive_errors >= 5:
pytest.fail(
f"engine warm-up aborted: {consecutive_errors} consecutive "
f"HTTP {last_status} errors — server is broken, not starting up"
)
else:
consecutive_errors = 0
except OSError:
pass
consecutive_errors = 0
time.sleep(2)
pytest.fail("engine warm-up failed after 120s")
pytest.fail(f"engine warm-up timed out after 120s (last status: {last_status})")
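The reworked `warm_engine` fixture distinguishes "service still starting" (connection errors) from "service persistently broken" (a run of consecutive 5xx responses). A minimal, self-contained sketch of that policy, with the probe, sleep, and clock injected so it can be exercised without a live server (the names here are illustrative, not part of the suite):

```python
import time

def wait_until_ready(probe, deadline_s=120.0, max_consecutive_5xx=5,
                     sleep=time.sleep, clock=time.monotonic):
    """Poll `probe` until it returns 200.

    Connection errors (OSError) mean the server is not up yet, so the
    consecutive-error counter resets; repeated 5xx responses mean the
    server is up but broken, so we abort early instead of burning the
    whole deadline.
    """
    deadline = clock() + deadline_s
    consecutive = 0
    last_status = None
    while clock() < deadline:
        try:
            status = probe()
        except OSError:
            consecutive = 0  # not listening yet; keep waiting
            sleep(2)
            continue
        if status == 200:
            return "ready"
        last_status = status
        consecutive = consecutive + 1 if status >= 500 else 0
        if consecutive >= max_consecutive_5xx:
            return f"broken (HTTP {last_status} x{consecutive})"
        sleep(2)
    return f"timeout (last status: {last_status})"
```

A fake probe makes the early-abort branch easy to verify: five straight 503s trip the "broken" path long before the deadline.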
+4
@@ -27,7 +27,10 @@ services:
ANNOTATIONS_URL: http://mock-annotations:8081
volumes:
- ./fixtures/classes.json:/app/classes.json
- ./fixtures:/media
- ./logs:/app/Logs
shm_size: 512m
mem_limit: 4g
networks:
- e2e-net
@@ -46,6 +49,7 @@ services:
ANNOTATIONS_URL: http://mock-annotations:8081
volumes:
- ./fixtures/classes.json:/app/classes.json
- ./fixtures:/media
- ./logs:/app/Logs
networks:
e2e-net:
+136
@@ -0,0 +1,136 @@
#!/usr/bin/env bash
set -eo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
PROJECT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
FIXTURES_DIR="$SCRIPT_DIR/fixtures"
LOGS_DIR="$SCRIPT_DIR/logs"
RESULTS_DIR="$SCRIPT_DIR/results"
VENV_DIR="$PROJECT_DIR/.venv-e2e"
PIDS=()
PYTHON_BIN="${PYTHON:-}"
if [[ -z "$PYTHON_BIN" ]]; then
    for candidate in python3.13 python3.12 python3.11 python3; do
        if command -v "$candidate" &>/dev/null && \
           "$candidate" -c 'import sys; raise SystemExit(0 if sys.version_info >= (3, 11) else 1)' 2>/dev/null; then
            PYTHON_BIN="$candidate"
            break
        fi
    done
fi
if [[ -z "$PYTHON_BIN" ]]; then
echo "ERROR: Python >= 3.11 required. Set PYTHON=/path/to/python3.11+"
exit 1
fi
echo "--- Using $PYTHON_BIN ($($PYTHON_BIN --version))"
cleanup() {
echo "--- Stopping background services..."
for pid in "${PIDS[@]+"${PIDS[@]}"}"; do
kill "$pid" 2>/dev/null || true
done
wait 2>/dev/null || true
echo "--- Done"
}
trap cleanup EXIT
usage() {
echo "Usage: $0 [test_path] [pytest_args...]"
echo ""
echo "Runs the detections service locally on macOS (with CoreML/Metal) and optionally runs tests."
echo ""
echo "Examples:"
echo " $0 # start service only"
echo " $0 tests/test_video.py # run all video tests"
echo " $0 tests/test_video.py::test_ft_p_10_frame_sampling_ac1 # run single test"
echo " $0 tests/test_video.py -k 'frame_sampling' # run by keyword"
echo ""
echo "Environment:"
echo " PYTHON=python3.13 use specific python"
echo " SKIP_BUILD=1 skip Cython compilation"
echo " SERVICE_ONLY=1 start service and wait (don't run tests even if args given)"
exit 1
}
[[ "${1:-}" == "-h" || "${1:-}" == "--help" ]] && usage
# --- Virtual environment ---
if [[ ! -d "$VENV_DIR" ]]; then
echo "--- Creating virtual environment at $VENV_DIR"
"$PYTHON_BIN" -m venv "$VENV_DIR"
fi
source "$VENV_DIR/bin/activate"
echo "--- Installing dependencies..."
pip install -q --upgrade pip setuptools wheel
pip install -q -r "$PROJECT_DIR/requirements.txt"
pip install -q -r "$SCRIPT_DIR/requirements.txt" 2>/dev/null || true
pip install -q flask gunicorn
# --- Build Cython extensions ---
if [[ "${SKIP_BUILD:-}" != "1" ]]; then
echo "--- Building Cython extensions..."
(cd "$PROJECT_DIR" && python setup.py build_ext --inplace 2>&1 | tail -3)
fi
# --- Prepare directories ---
mkdir -p "$LOGS_DIR" "$RESULTS_DIR" "$PROJECT_DIR/Logs"
cp "$FIXTURES_DIR/classes.json" "$PROJECT_DIR/classes.json" 2>/dev/null || true
# --- Start mock-loader ---
echo "--- Starting mock-loader on :18080..."
MODELS_ROOT="$FIXTURES_DIR" \
gunicorn -b 0.0.0.0:18080 -w 1 --chdir "$SCRIPT_DIR/mocks/loader" app:app --access-logfile /dev/null &
PIDS+=($!)
# --- Start mock-annotations ---
echo "--- Starting mock-annotations on :18081..."
gunicorn -b 0.0.0.0:18081 -w 1 --chdir "$SCRIPT_DIR/mocks/annotations" app:app --access-logfile /dev/null &
PIDS+=($!)
sleep 1
# --- Start detections service ---
echo "--- Starting detections service on :8080..."
(
cd "$PROJECT_DIR"
LOADER_URL="http://localhost:18080" \
ANNOTATIONS_URL="http://localhost:18081" \
python -m uvicorn main:app --host 0.0.0.0 --port 8080 --workers 1
) &
PIDS+=($!)
echo "--- Waiting for services to be ready..."
ready=0
for i in $(seq 1 30); do
    if python3 -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')" 2>/dev/null; then
        echo "--- All services ready!"
        ready=1
        break
    fi
    sleep 2
done
if [[ "$ready" != "1" ]]; then
    echo "ERROR: services did not become ready within 60s" >&2
    exit 1
fi
# --- Run tests or wait ---
if [[ "${SERVICE_ONLY:-}" == "1" ]]; then
echo "--- Service running at http://localhost:8080 (Ctrl+C to stop)"
wait
elif [[ $# -gt 0 ]]; then
echo "--- Running: pytest $* -v -x -s"
BASE_URL="http://localhost:8080" \
MOCK_LOADER_URL="http://localhost:18080" \
MOCK_ANNOTATIONS_URL="http://localhost:18081" \
MEDIA_DIR="$FIXTURES_DIR" \
RESULTS_DIR="$RESULTS_DIR" \
python -m pytest "$@" -v -x -s --csv="$RESULTS_DIR/report.csv" --rootdir="$SCRIPT_DIR"
echo "--- Tests finished. Results in $RESULTS_DIR/"
else
echo "--- Service running at http://localhost:8080 (Ctrl+C to stop)"
echo "--- To run tests in another terminal:"
echo " source $VENV_DIR/bin/activate"
echo " cd $SCRIPT_DIR && BASE_URL=http://localhost:8080 pytest tests/test_video.py -v -x -s"
wait
fi
+37
@@ -0,0 +1,37 @@
#!/usr/bin/env bash
set -euo pipefail
COMPOSE="docker compose -f docker-compose.test.yml --profile cpu"
usage() {
echo "Usage: $0 <test_path> [pytest_args...]"
echo ""
echo "Examples:"
echo " $0 tests/test_video.py # run all tests in file"
echo " $0 tests/test_video.py::test_ft_p_10_frame_sampling_ac1 # run single test"
echo " $0 tests/test_video.py -k 'frame_sampling' # run by keyword"
echo ""
echo "Flags -v -x -s are always included."
exit 1
}
[[ $# -lt 1 ]] && usage
$COMPOSE up -d --build detections
echo "--- Waiting for detections service to become healthy..."
for i in $(seq 1 60); do
if $COMPOSE exec -T detections python3 -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')" 2>/dev/null; then
echo "--- Detections service is healthy"
break
fi
sleep 2
done
echo "--- Running: pytest $* -v -x -s --csv=/results/report.csv"
EXIT_CODE=0
$COMPOSE run --rm --no-deps e2e-runner pytest "$@" -v -x -s --csv=/results/report.csv || EXIT_CODE=$?
echo "--- Test finished with exit code $EXIT_CODE"
echo "--- Detections logs (last 100 lines):"
$COMPOSE logs detections --tail 100
exit $EXIT_CODE
+22 -10
@@ -1,30 +1,42 @@
import json
import os
import threading
import time
import uuid
import pytest
_MEDIA = os.environ.get("MEDIA_DIR", "/media")
def _ai_config_video(mock_loader_url: str) -> dict:
base = mock_loader_url.rstrip("/")
def _ai_config_video() -> dict:
return {
"probability_threshold": 0.25,
"tracking_intersection_threshold": 0.6,
"altitude": 400,
"focal_length": 24,
"sensor_width": 23.5,
"paths": [f"{base}/load/video_short01.mp4"],
"paths": [f"{_MEDIA}/video_short01.mp4"],
"frame_period_recognition": 4,
"frame_recognition_seconds": 2,
}
def _ai_config_image() -> dict:
return {
"probability_threshold": 0.25,
"altitude": 400,
"focal_length": 24,
"sensor_width": 23.5,
"paths": [f"{_MEDIA}/image_small.jpg"],
}
def test_ft_p08_immediate_async_response(
warm_engine, http_client, jwt_token, mock_loader_url
warm_engine, http_client, jwt_token
):
media_id = f"async-{uuid.uuid4().hex}"
body = _ai_config_video(mock_loader_url)
body = _ai_config_image()
headers = {"Authorization": f"Bearer {jwt_token}"}
t0 = time.monotonic()
r = http_client.post(f"/detect/{media_id}", json=body, headers=headers)
@@ -37,10 +49,10 @@ def test_ft_p08_immediate_async_response(
@pytest.mark.slow
@pytest.mark.timeout(120)
def test_ft_p09_sse_event_delivery(
warm_engine, http_client, jwt_token, mock_loader_url, sse_client_factory
warm_engine, http_client, jwt_token, sse_client_factory
):
media_id = f"sse-{uuid.uuid4().hex}"
body = _ai_config_video(mock_loader_url)
body = _ai_config_video()
headers = {"Authorization": f"Bearer {jwt_token}"}
collected: list[dict] = []
thread_exc: list[BaseException] = []
@@ -76,17 +88,17 @@ def test_ft_p09_sse_event_delivery(
assert ok, "SSE listener did not finish within 120s"
th.join(timeout=5)
assert not thread_exc, thread_exc
assert any(e.get("mediaStatus") == "AIProcessing" for e in collected)
assert collected, "no SSE events received"
final = collected[-1]
assert final.get("mediaStatus") == "AIProcessed"
assert final.get("mediaPercent") == 100
def test_ft_n04_duplicate_media_id_409(
warm_engine, http_client, jwt_token, mock_loader_url
warm_engine, http_client, jwt_token
):
media_id = "dup-test"
body = _ai_config_video(mock_loader_url)
body = _ai_config_video()
headers = {"Authorization": f"Bearer {jwt_token}"}
r1 = http_client.post(f"/detect/{media_id}", json=body, headers=headers)
assert r1.status_code == 200
+4 -2
@@ -23,7 +23,8 @@ class TestHealthEngineStep01PreInit:
data = _get_health(http_client)
assert time.monotonic() - t0 < 2.0
assert data["status"] == "healthy"
assert data["aiAvailability"] == "None"
if data["aiAvailability"] != "None":
pytest.skip("engine already initialized by earlier tests")
assert data.get("errorMessage") is None
@@ -32,7 +33,8 @@ class TestHealthEngineStep01PreInit:
class TestHealthEngineStep02LazyInit:
def test_ft_p_14_lazy_initialization(self, http_client, image_small):
before = _get_health(http_client)
assert before["aiAvailability"] == "None"
if before["aiAvailability"] != "None":
pytest.skip("engine already initialized by earlier tests")
files = {"file": ("lazy.jpg", image_small, "image/jpeg")}
r = http_client.post("/detect", files=files, timeout=_DETECT_TIMEOUT)
r.raise_for_status()
+4 -3
@@ -1,4 +1,5 @@
import json
import os
import threading
import time
import uuid
@@ -6,6 +7,8 @@ from concurrent.futures import ThreadPoolExecutor
import pytest
_MEDIA = os.environ.get("MEDIA_DIR", "/media")
def _percentile_ms(sorted_ms, p):
n = len(sorted_ms)
@@ -122,14 +125,12 @@ def test_nft_perf_04_video_frame_rate_sse(
warm_engine,
http_client,
jwt_token,
mock_loader_url,
sse_client_factory,
):
media_id = f"perf-sse-{uuid.uuid4().hex}"
base = mock_loader_url.rstrip("/")
body = {
"probability_threshold": 0.25,
"paths": [f"{base}/load/video_short01.mp4"],
"paths": [f"{_MEDIA}/video_short01.mp4"],
"frame_period_recognition": 4,
"frame_recognition_seconds": 2,
}
+6 -7
@@ -1,4 +1,5 @@
import json
import os
import threading
import time
import uuid
@@ -7,17 +8,17 @@ import pytest
import requests
_DETECT_TIMEOUT = 60
_MEDIA = os.environ.get("MEDIA_DIR", "/media")
def _ai_config_video(mock_loader_url: str) -> dict:
base = mock_loader_url.rstrip("/")
def _ai_config_video() -> dict:
return {
"probability_threshold": 0.25,
"tracking_intersection_threshold": 0.6,
"altitude": 400,
"focal_length": 24,
"sensor_width": 23.5,
"paths": [f"{base}/load/video_short01.mp4"],
"paths": [f"{_MEDIA}/video_short01.mp4"],
"frame_period_recognition": 4,
"frame_recognition_seconds": 2,
}
@@ -49,7 +50,6 @@ def test_ft_n_07_annotations_unreachable_detection_continues(
warm_engine,
http_client,
jwt_token,
mock_loader_url,
mock_annotations_url,
sse_client_factory,
):
@@ -57,7 +57,7 @@ def test_ft_n_07_annotations_unreachable_detection_continues(
f"{mock_annotations_url}/mock/config", json={"mode": "error"}, timeout=10
).raise_for_status()
media_id = f"res-n07-{uuid.uuid4().hex}"
body = _ai_config_video(mock_loader_url)
body = _ai_config_video()
headers = {"Authorization": f"Bearer {jwt_token}"}
collected = []
thread_exc = []
@@ -122,12 +122,11 @@ def test_nft_res_02_annotations_outage_during_async_detection(
warm_engine,
http_client,
jwt_token,
mock_loader_url,
mock_annotations_url,
sse_client_factory,
):
media_id = f"res-n02-{uuid.uuid4().hex}"
body = _ai_config_video(mock_loader_url)
body = _ai_config_video()
headers = {"Authorization": f"Bearer {jwt_token}"}
collected = []
thread_exc = []
+3 -6
@@ -10,16 +10,14 @@ from pathlib import Path
import pytest
def _video_ai_body(mock_loader_url: str, video_rel: str) -> dict:
base = mock_loader_url.rstrip("/")
name = video_rel.rstrip("/").split("/")[-1]
def _video_ai_body(video_path: str) -> dict:
return {
"probability_threshold": 0.25,
"tracking_intersection_threshold": 0.6,
"altitude": 400,
"focal_length": 24,
"sensor_width": 23.5,
"paths": [f"{base}/load/{name}"],
"paths": [video_path],
"frame_period_recognition": 4,
"frame_recognition_seconds": 2,
}
@@ -31,12 +29,11 @@ def test_ft_n_08_nft_res_lim_02_sse_queue_bounded_best_effort(
warm_engine,
http_client,
jwt_token,
mock_loader_url,
video_short_path,
sse_client_factory,
):
media_id = f"rlim-sse-{uuid.uuid4().hex}"
body = _video_ai_body(mock_loader_url, video_short_path)
body = _video_ai_body(video_short_path)
headers = {"Authorization": f"Bearer {jwt_token}"}
collected: list[dict] = []
thread_exc: list[BaseException] = []
+3 -4
@@ -7,6 +7,8 @@ import uuid
import pytest
import requests
_MEDIA = os.environ.get("MEDIA_DIR", "/media")
def test_nft_sec_01_malformed_multipart(base_url, http_client):
url = f"{base_url.rstrip('/')}/detect"
@@ -57,16 +59,13 @@ def test_nft_sec_03_jwt_token_forwarding(
warm_engine,
http_client,
jwt_token,
mock_loader_url,
mock_annotations_url,
sse_client_factory,
):
media_id = f"sec-{uuid.uuid4().hex}"
body = {
"probability_threshold": 0.25,
"paths": [
f"{mock_loader_url.rstrip('/')}/load/video_short01.mp4",
],
"paths": [f"{_MEDIA}/video_short01.mp4"],
"frame_period_recognition": 4,
"frame_recognition_seconds": 2,
}
+3 -1
@@ -1,9 +1,11 @@
import json
import os
from pathlib import Path
import pytest
_DETECT_SLOW_TIMEOUT = 120
_MEDIA = os.environ.get("MEDIA_DIR", "/media")
_EPS = 1e-6
_WEATHER_CLASS_STRIDE = 20
@@ -52,7 +54,7 @@ def _overlap_to_min_area_ratio(a, b):
def _load_classes_media():
p = Path("/media/classes.json")
p = Path(_MEDIA) / "classes.json"
if not p.is_file():
pytest.skip(f"missing {p}")
raw = json.loads(p.read_text())
+49 -16
@@ -1,17 +1,16 @@
import csv
import json
import os
import threading
import time
import uuid
import pytest
def _video_load_url(mock_loader_url: str, video_media_path: str) -> str:
name = video_media_path.rstrip("/").split("/")[-1]
return f"{mock_loader_url.rstrip('/')}/load/{name}"
RESULTS_DIR = os.environ.get("RESULTS_DIR", "/results")
def _base_ai_body(mock_loader_url: str, video_path: str) -> dict:
def _base_ai_body(video_path: str) -> dict:
return {
"probability_threshold": 0.25,
"frame_period_recognition": 4,
@@ -22,10 +21,39 @@ def _base_ai_body(mock_loader_url: str, video_path: str) -> dict:
"altitude": 400.0,
"focal_length": 24.0,
"sensor_width": 23.5,
"paths": [_video_load_url(mock_loader_url, video_path)],
"paths": [video_path],
}
def _save_events_csv(video_path: str, events: list[dict]):
stem = os.path.splitext(os.path.basename(video_path))[0]
path = os.path.join(RESULTS_DIR, f"{stem}_detections.csv")
rows = []
for ev in events:
base = {
"mediaId": ev.get("mediaId", ""),
"mediaStatus": ev.get("mediaStatus", ""),
"mediaPercent": ev.get("mediaPercent", ""),
}
anns = ev.get("annotations") or []
if anns:
for det in anns:
rows.append({**base, **det})
else:
rows.append(base)
if not rows:
return
fieldnames = list(rows[0].keys())
for r in rows[1:]:
for k in r:
if k not in fieldnames:
fieldnames.append(k)
with open(path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
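The `_save_events_csv` helper above flattens each SSE event into one CSV row per detection, emits a base-only row for events without annotations, and builds the header from the union of all keys in first-seen order. The same flattening can be sketched in isolation (a hypothetical `flatten_events`, not part of the suite):

```python
def flatten_events(events):
    """One row per detection; events without annotations still get a
    base-only row. Field order is first-seen order across all rows."""
    rows = []
    for ev in events:
        base = {
            "mediaId": ev.get("mediaId", ""),
            "mediaStatus": ev.get("mediaStatus", ""),
            "mediaPercent": ev.get("mediaPercent", ""),
        }
        # An empty/missing annotation list degenerates to one plain row.
        for det in (ev.get("annotations") or [{}]):
            rows.append({**base, **det})
    fieldnames = []
    for r in rows:
        for k in r:
            if k not in fieldnames:
                fieldnames.append(k)
    return fieldnames, rows
```

Feeding the pair returned here into `csv.DictWriter(..., extrasaction="ignore")` reproduces what the helper writes to disk.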
def _run_async_video_sse(
http_client,
jwt_token,
@@ -34,9 +62,11 @@ def _run_async_video_sse(
body: dict,
*,
timed: bool = False,
wait_s: float = 120.0,
wait_s: float = 900.0,
):
video_path = (body.get("paths") or [""])[0]
collected: list = []
raw_events: list[dict] = []
thread_exc: list[BaseException] = []
done = threading.Event()
@@ -50,6 +80,7 @@ def _run_async_video_sse(
data = json.loads(event.data)
if data.get("mediaId") != media_id:
continue
raw_events.append(data)
if timed:
collected.append((time.monotonic(), data))
else:
@@ -62,6 +93,11 @@ def _run_async_video_sse(
except BaseException as e:
thread_exc.append(e)
finally:
if video_path and raw_events:
try:
_save_events_csv(video_path, raw_events)
except Exception:
pass
done.set()
th = threading.Thread(target=_listen, daemon=True)
@@ -96,17 +132,16 @@ def _assert_detection_dto(d: dict) -> None:
@pytest.mark.slow
@pytest.mark.timeout(120)
@pytest.mark.timeout(900)
def test_ft_p_10_frame_sampling_ac1(
warm_engine,
http_client,
jwt_token,
mock_loader_url,
video_short_path,
sse_client_factory,
):
media_id = f"video-{uuid.uuid4().hex}"
body = _base_ai_body(mock_loader_url, video_short_path)
body = _base_ai_body(video_short_path)
body["frame_period_recognition"] = 4
collected = _run_async_video_sse(
http_client,
@@ -123,17 +158,16 @@ def test_ft_p_10_frame_sampling_ac1(
@pytest.mark.slow
@pytest.mark.timeout(120)
@pytest.mark.timeout(900)
def test_ft_p_11_annotation_interval_ac2(
warm_engine,
http_client,
jwt_token,
mock_loader_url,
video_short_path,
sse_client_factory,
):
media_id = f"video-{uuid.uuid4().hex}"
body = _base_ai_body(mock_loader_url, video_short_path)
body = _base_ai_body(video_short_path)
body["frame_recognition_seconds"] = 2
collected = _run_async_video_sse(
http_client,
@@ -158,17 +192,16 @@ def test_ft_p_11_annotation_interval_ac2(
@pytest.mark.slow
@pytest.mark.timeout(120)
@pytest.mark.timeout(900)
def test_ft_p_12_movement_tracking_ac3(
warm_engine,
http_client,
jwt_token,
mock_loader_url,
video_short_path,
sse_client_factory,
):
media_id = f"video-{uuid.uuid4().hex}"
body = _base_ai_body(mock_loader_url, video_short_path)
body = _base_ai_body(video_short_path)
body["tracking_distance_confidence"] = 0.1
body["tracking_probability_increase"] = 0.1
collected = _run_async_video_sse(
+32
@@ -0,0 +1,32 @@
def _check_tensor_gpu_index():
try:
import pynvml
pynvml.nvmlInit()
device_count = pynvml.nvmlDeviceGetCount()
if device_count == 0:
return -1
for i in range(device_count):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
major, minor = pynvml.nvmlDeviceGetCudaComputeCapability(handle)
if major > 6 or (major == 6 and minor >= 1):
return i
return -1
except Exception:
return -1
finally:
try:
import pynvml
pynvml.nvmlShutdown()
except Exception:
pass
tensor_gpu_index = _check_tensor_gpu_index()
def create_engine(model_bytes: bytes, batch_size: int = 1):
if tensor_gpu_index > -1:
from engines.tensorrt_engine import TensorRTEngine
return TensorRTEngine(model_bytes, batch_size)
from engines.onnx_engine import OnnxEngine
return OnnxEngine(model_bytes, batch_size)
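The factory falls back to ONNX unless some GPU reports CUDA compute capability 6.1 or higher. That gate can be sketched independently of pynvml (a hypothetical helper; capabilities are passed in as `(major, minor)` tuples rather than queried from NVML):

```python
MIN_CAPABILITY = (6, 1)  # TensorRT threshold used by the factory

def pick_gpu_index(capabilities):
    """Return the first GPU index whose (major, minor) compute capability
    meets MIN_CAPABILITY, or -1 if none qualifies."""
    for i, cap in enumerate(capabilities):
        if tuple(cap) >= MIN_CAPABILITY:
            return i
    return -1
```

Tuple comparison is equivalent to the factory's `major > 6 or (major == 6 and minor >= 1)` check, just more compact.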
(File diff suppressed because it is too large)
+13
@@ -0,0 +1,13 @@
from engines.inference_engine cimport InferenceEngine
cdef class CoreMLEngine(InferenceEngine):
cdef object model
cdef str input_name
cdef tuple input_shape
cdef list _output_names
cdef tuple get_input_shape(self)
cdef int get_batch_size(self)
cdef run(self, input_data)
+49
@@ -0,0 +1,49 @@
from engines.inference_engine cimport InferenceEngine
cimport constants_inf
import numpy as np
cdef class CoreMLEngine(InferenceEngine):
def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs):
super().__init__(model_bytes, batch_size)
import coremltools as ct
model_path = kwargs.get('model_path')
if model_path is None:
raise ValueError(
"CoreMLEngine requires model_path kwarg "
"pointing to a .mlpackage or .mlmodel")
self.model = ct.models.MLModel(
model_path, compute_units=ct.ComputeUnit.ALL)
spec = self.model.get_spec()
input_desc = spec.description.input[0]
self.input_name = input_desc.name
array_type = input_desc.type.multiArrayType
self.input_shape = tuple(int(s) for s in array_type.shape)
if len(self.input_shape) == 4:
self.batch_size = self.input_shape[0] if self.input_shape[0] > 0 else batch_size
self._output_names = [o.name for o in spec.description.output]
constants_inf.log(<str>f'CoreML model: input={self.input_name} shape={self.input_shape}')
constants_inf.log(<str>f'CoreML outputs: {self._output_names}')
cdef tuple get_input_shape(self):
return self.input_shape[2], self.input_shape[3]
cdef int get_batch_size(self):
return self.batch_size
cdef run(self, input_data):
prediction = self.model.predict({self.input_name: input_data})
results = []
for name in self._output_names:
val = prediction[name]
if not isinstance(val, np.ndarray):
val = np.array(val)
results.append(val)
return results
(File diff suppressed because it is too large)
+13491
(File diff suppressed because it is too large)
+2 -1
@@ -1,9 +1,10 @@
from inference_engine cimport InferenceEngine
from engines.inference_engine cimport InferenceEngine
cdef class OnnxEngine(InferenceEngine):
cdef public object session
cdef object _cpu_session
cdef object model_inputs
cdef str input_name
cdef object input_shape
+50
@@ -0,0 +1,50 @@
from engines.inference_engine cimport InferenceEngine
import onnxruntime as onnx
cimport constants_inf
import os
def _select_providers():
available = set(onnx.get_available_providers())
skip_coreml = os.environ.get("SKIP_COREML", "").lower() in ("1", "true", "yes")
preferred = ["CoreMLExecutionProvider", "CUDAExecutionProvider", "CPUExecutionProvider"]
if skip_coreml:
preferred = [p for p in preferred if p != "CoreMLExecutionProvider"]
selected = [p for p in preferred if p in available]
return selected or ["CPUExecutionProvider"]
cdef class OnnxEngine(InferenceEngine):
def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs):
super().__init__(model_bytes, batch_size)
providers = _select_providers()
constants_inf.log(<str>f'ONNX providers: {providers}')
self.session = onnx.InferenceSession(model_bytes, providers=providers)
self.model_inputs = self.session.get_inputs()
self.input_name = self.model_inputs[0].name
self.input_shape = self.model_inputs[0].shape
self.batch_size = self.input_shape[0] if self.input_shape[0] != -1 else batch_size
constants_inf.log(f'AI detection model input: {self.model_inputs} {self.input_shape}')
model_meta = self.session.get_modelmeta()
constants_inf.log(f"Metadata: {model_meta.custom_metadata_map}")
self._cpu_session = None
if any("CoreML" in p for p in self.session.get_providers()):
constants_inf.log(<str>'CoreML active — creating CPU fallback session')
self._cpu_session = onnx.InferenceSession(
model_bytes, providers=["CPUExecutionProvider"])
cdef tuple get_input_shape(self):
shape = self.input_shape
return shape[2], shape[3]
cdef int get_batch_size(self):
return self.batch_size
cdef run(self, input_data):
try:
return self.session.run(None, {self.input_name: input_data})
except Exception:
if self._cpu_session is not None:
return self._cpu_session.run(None, {self.input_name: input_data})
raise
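`_select_providers` keys the session off whatever providers this onnxruntime build actually exposes, with a `SKIP_COREML` escape hatch. A pure-function sketch of the same selection, with availability and environment passed in so it is testable without onnxruntime installed (illustrative only):

```python
def select_providers(available, env=None):
    """Prefer CoreML, then CUDA, then CPU, honoring a SKIP_COREML
    override. `available` mirrors onnxruntime.get_available_providers()."""
    env = env or {}
    skip_coreml = env.get("SKIP_COREML", "").lower() in ("1", "true", "yes")
    preferred = ["CoreMLExecutionProvider",
                 "CUDAExecutionProvider",
                 "CPUExecutionProvider"]
    if skip_coreml:
        preferred = [p for p in preferred if p != "CoreMLExecutionProvider"]
    selected = [p for p in preferred if p in set(available)]
    # Always fall back to CPU so session creation cannot fail outright.
    return selected or ["CPUExecutionProvider"]
```

Keeping the preference list ordered (rather than using the available set directly) is what guarantees CoreML wins over CUDA on machines that expose both.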
@@ -1,4 +1,4 @@
from inference_engine cimport InferenceEngine
from engines.inference_engine cimport InferenceEngine
cdef class TensorRTEngine(InferenceEngine):
@@ -1,4 +1,4 @@
from inference_engine cimport InferenceEngine
from engines.inference_engine cimport InferenceEngine
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit  # required to initialize CUDA automatically; do not remove.
+22500
(File diff suppressed because it is too large)
+1 -1
@@ -1,7 +1,7 @@
from ai_availability_status cimport AIAvailabilityStatus
from annotation cimport Annotation, Detection
from ai_config cimport AIRecognitionConfig
from inference_engine cimport InferenceEngine
from engines.inference_engine cimport InferenceEngine
cdef class Inference:
cdef object loader_client
+30 -41
@@ -8,45 +8,10 @@ cimport constants_inf
from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus
from annotation cimport Detection, Annotation
from ai_config cimport AIRecognitionConfig
import pynvml
from threading import Thread
cdef int tensor_gpu_index
cdef int check_tensor_gpu_index():
try:
pynvml.nvmlInit()
deviceCount = pynvml.nvmlDeviceGetCount()
if deviceCount == 0:
constants_inf.logerror(<str>'No NVIDIA GPUs found.')
return -1
for i in range(deviceCount):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
major, minor = pynvml.nvmlDeviceGetCudaComputeCapability(handle)
if major > 6 or (major == 6 and minor >= 1):
constants_inf.log(<str>'found NVIDIA GPU!')
return i
constants_inf.logerror(<str>'NVIDIA GPU doesnt support TensorRT!')
return -1
except pynvml.NVMLError:
return -1
finally:
try:
pynvml.nvmlShutdown()
except:
constants_inf.logerror(<str>'Failed to shutdown pynvml cause probably no NVIDIA GPU')
pass
tensor_gpu_index = check_tensor_gpu_index()
from engines import tensor_gpu_index, create_engine
if tensor_gpu_index > -1:
from tensorrt_engine import TensorRTEngine
else:
from onnx_engine import OnnxEngine
from engines.tensorrt_engine import TensorRTEngine
@@ -67,6 +32,10 @@ cdef class Inference:
self._converted_model_bytes = None
self.init_ai()
@property
def is_engine_ready(self):
return self.engine is not None
cdef bytes get_onnx_engine_bytes(self):
models_dir = constants_inf.MODELS_FOLDER
@@ -134,7 +103,7 @@ cdef class Inference:
thread.start()
return
else:
self.engine = OnnxEngine(<bytes>self.get_onnx_engine_bytes())
self.engine = create_engine(<bytes>self.get_onnx_engine_bytes())
self.is_building_engine = False
self.model_height, self.model_width = self.engine.get_input_shape()
@@ -264,7 +233,9 @@ cdef class Inference:
if frame is None:
raise ValueError("Invalid image data")
input_blob = self.preprocess([frame])
cdef int batch_size = self.engine.get_batch_size()
frames = [frame] * batch_size
input_blob = self.preprocess(frames)
outputs = self.engine.run(input_blob)
list_detections = self.postprocess(outputs, ai_config)
if list_detections:
@@ -273,14 +244,21 @@ cdef class Inference:
cdef _process_video(self, AIRecognitionConfig ai_config, str video_name):
cdef int frame_count = 0
cdef int batch_count = 0
cdef list batch_frames = []
cdef list[int] batch_timestamps = []
cdef Annotation annotation
self._previous_annotation = None
v_input = cv2.VideoCapture(<str>video_name)
if not v_input.isOpened():
constants_inf.logerror(<str>f'Failed to open video: {video_name}')
return
total_frames = int(v_input.get(cv2.CAP_PROP_FRAME_COUNT))
fps = v_input.get(cv2.CAP_PROP_FPS)
width = int(v_input.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(v_input.get(cv2.CAP_PROP_FRAME_HEIGHT))
constants_inf.log(<str>f'Video: {total_frames} frames, {fps:.1f} fps, {width}x{height}')
while v_input.isOpened() and not self.stop_signal:
ret, frame = v_input.read()
if not ret or frame is None:
@@ -292,11 +270,16 @@ cdef class Inference:
batch_timestamps.append(int(v_input.get(cv2.CAP_PROP_POS_MSEC)))
if len(batch_frames) == self.engine.get_batch_size():
batch_count += 1
constants_inf.log(<str>f'Video batch {batch_count}: frame {frame_count}/{total_frames} ({frame_count*100//total_frames}%)')
input_blob = self.preprocess(batch_frames)
outputs = self.engine.run(input_blob)
list_detections = self.postprocess(outputs, ai_config)
total_dets = sum(len(d) for d in list_detections)
if total_dets > 0:
constants_inf.log(<str>f'Video batch {batch_count}: {total_dets} detections from postprocess')
for i in range(len(list_detections)):
detections = list_detections[i]
@@ -304,15 +287,21 @@ cdef class Inference:
name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}'
annotation = Annotation(name, original_media_name, batch_timestamps[i], detections)
if self.is_valid_video_annotation(annotation, ai_config):
if detections:
valid = self.is_valid_video_annotation(annotation, ai_config)
constants_inf.log(<str>f'Video frame {name}: {len(detections)} dets, valid={valid}')
if valid:
_, image = cv2.imencode('.jpg', batch_frames[i])
annotation.image = image.tobytes()
self._previous_annotation = annotation
self.on_annotation(annotation, frame_count, total_frames)
else:
self.is_valid_video_annotation(annotation, ai_config)
batch_frames.clear()
batch_timestamps.clear()
v_input.release()
constants_inf.log(<str>f'Video done: {frame_count} frames read, {batch_count} batches processed')
self.send_detection_status()
cdef on_annotation(self, Annotation annotation, int frame_count=0, int total_frames=0):
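`_process_video` now accumulates sampled frames until a full engine-sized batch is ready and only then runs inference; as written in the hunk above, a trailing partial batch is not flushed. A minimal sketch of that accumulation loop (hypothetical names, with the frame source and inference callable injected):

```python
def batch_frames(frames, batch_size, infer):
    """Accumulate frames into fixed-size batches and call `infer` once
    per full batch, mirroring the _process_video batching loop. A
    trailing partial batch is dropped, as in the diff above."""
    batch, results = [], []
    for frame in frames:
        batch.append(frame)
        if len(batch) == batch_size:
            results.append(infer(list(batch)))
            batch.clear()
    return results
```

With 10 frames and a batch size of 4, `infer` runs twice and frames 8 and 9 never reach the engine, which is worth keeping in mind when validating per-frame detection counts.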
+17 -25
@@ -22,7 +22,7 @@ ANNOTATIONS_URL = os.environ.get("ANNOTATIONS_URL", "http://annotations:8080")
loader_client = LoaderHttpClient(LOADER_URL)
inference = None
_event_queues: list[asyncio.Queue] = []
_active_detections: dict[str, bool] = {}
_active_detections: dict[str, asyncio.Task] = {}
class TokenManager:
@@ -109,9 +109,7 @@ class AIConfigDto(BaseModel):
def detection_to_dto(det) -> DetectionDto:
import constants_inf
label = ""
if det.cls in constants_inf.annotations_dict:
label = constants_inf.annotations_dict[det.cls].name
label = constants_inf.get_annotation_name(det.cls)
return DetectionDto(
centerX=det.x,
centerY=det.y,
@@ -197,7 +195,8 @@ def _post_annotation_to_service(token_mgr: TokenManager, media_id: str,
@app.post("/detect/{media_id}")
async def detect_media(media_id: str, request: Request, config: Optional[AIConfigDto] = None):
if media_id in _active_detections:
existing = _active_detections.get(media_id)
if existing is not None and not existing.done():
raise HTTPException(status_code=409, detail="Detection already in progress for this media")
auth_header = request.headers.get("authorization", "")
@@ -208,13 +207,19 @@ async def detect_media(media_id: str, request: Request, config: Optional[AIConfi
cfg = config or AIConfigDto()
config_dict = cfg.model_dump()
_active_detections[media_id] = True
async def run_detection():
loop = asyncio.get_event_loop()
def _enqueue(event):
for q in _event_queues:
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass
try:
inf = get_inference()
if inf.engine is None:
if not inf.is_engine_ready:
raise RuntimeError("Detection service unavailable")
def on_annotation(annotation, percent):
@@ -225,12 +230,7 @@ async def detect_media(media_id: str, request: Request, config: Optional[AIConfi
mediaStatus="AIProcessing",
mediaPercent=percent,
)
for q in _event_queues:
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass
loop.call_soon_threadsafe(_enqueue, event)
if token_mgr and dtos:
_post_annotation_to_service(token_mgr, media_id, annotation, dtos)
@@ -241,11 +241,7 @@ async def detect_media(media_id: str, request: Request, config: Optional[AIConfi
mediaStatus="AIProcessed",
mediaPercent=100,
)
for q in _event_queues:
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass
loop.call_soon_threadsafe(_enqueue, event)
await loop.run_in_executor(
executor, inf.run_detect, config_dict, on_annotation, on_status
@@ -257,15 +253,11 @@ async def detect_media(media_id: str, request: Request, config: Optional[AIConfi
mediaStatus="Error",
mediaPercent=0,
)
for q in _event_queues:
try:
q.put_nowait(error_event)
except asyncio.QueueFull:
pass
_enqueue(error_event)
finally:
_active_detections.pop(media_id, None)
asyncio.create_task(run_detection())
_active_detections[media_id] = asyncio.create_task(run_detection())
return {"status": "started", "mediaId": media_id}
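Storing the `asyncio.Task` instead of a boolean lets the 409 check treat a finished task as "no longer active" even before the `finally` block pops it. A minimal sketch of that pattern (hypothetical demo code, not the service itself):

```python
import asyncio

async def demo():
    active: dict[str, asyncio.Task] = {}

    async def run_detection(media_id):
        await asyncio.sleep(0.05)
        active.pop(media_id, None)  # mirrors the service's finally block

    def start(media_id):
        existing = active.get(media_id)
        if existing is not None and not existing.done():
            return 409  # duplicate: detection still in progress
        active[media_id] = asyncio.create_task(run_detection(media_id))
        return 200

    first = start("m1")          # accepted
    dup = start("m1")            # rejected while the task is running
    await active["m1"]
    again = start("m1")          # finished task no longer blocks a re-run
    await active["m1"]
    return first, dup, again
```

The `done()` check is the key difference from the old boolean map: a crashed or completed task cannot wedge a media id into a permanent 409 state.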
-26
@@ -1,26 +0,0 @@
from inference_engine cimport InferenceEngine
import onnxruntime as onnx
cimport constants_inf
cdef class OnnxEngine(InferenceEngine):
def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs):
super().__init__(model_bytes, batch_size)
self.session = onnx.InferenceSession(model_bytes, providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
self.model_inputs = self.session.get_inputs()
self.input_name = self.model_inputs[0].name
self.input_shape = self.model_inputs[0].shape
self.batch_size = self.input_shape[0] if self.input_shape[0] != -1 else batch_size
constants_inf.log(f'AI detection model input: {self.model_inputs} {self.input_shape}')
model_meta = self.session.get_modelmeta()
constants_inf.log(f"Metadata: {model_meta.custom_metadata_map}")
cdef tuple get_input_shape(self):
shape = self.input_shape
return shape[2], shape[3]
cdef int get_batch_size(self):
return self.batch_size
cdef run(self, input_data):
return self.session.run(None, {self.input_name: input_data})
+4 -3
@@ -7,15 +7,16 @@ extensions = [
Extension('ai_availability_status', ['ai_availability_status.pyx']),
Extension('annotation', ['annotation.pyx']),
Extension('ai_config', ['ai_config.pyx']),
Extension('onnx_engine', ['onnx_engine.pyx'], include_dirs=[np.get_include()]),
Extension('inference_engine', ['inference_engine.pyx'], include_dirs=[np.get_include()]),
Extension('engines.inference_engine', ['engines/inference_engine.pyx'], include_dirs=[np.get_include()]),
Extension('engines.onnx_engine', ['engines/onnx_engine.pyx'], include_dirs=[np.get_include()]),
Extension('engines.coreml_engine', ['engines/coreml_engine.pyx'], include_dirs=[np.get_include()]),
Extension('inference', ['inference.pyx'], include_dirs=[np.get_include()]),
]
try:
import tensorrt
extensions.append(
Extension('tensorrt_engine', ['tensorrt_engine.pyx'], include_dirs=[np.get_include()])
Extension('engines.tensorrt_engine', ['engines/tensorrt_engine.pyx'], include_dirs=[np.get_include()])
)
except ImportError:
pass