diff --git a/.cursor/skills/refactor/phases/01-discovery.md b/.cursor/skills/refactor/phases/01-discovery.md index 5621bac..8617577 100644 --- a/.cursor/skills/refactor/phases/01-discovery.md +++ b/.cursor/skills/refactor/phases/01-discovery.md @@ -37,7 +37,13 @@ For each file/area referenced in the input file: Write per-component to `RUN_DIR/discovery/components/[##]_[name].md` (same format as automatic mode, but scoped to affected areas only). -### 1i. Produce List of Changes +### 1i. Logical Flow Analysis (guided mode) + +Even in guided mode, perform the logical flow analysis from step 1c (automatic mode) — scoped to the areas affected by the input file. Cross-reference documented flows against actual implementation for the affected components. This catches issues the input file author may have missed. + +Write findings to `RUN_DIR/discovery/logical_flow_analysis.md`. + +### 1j. Produce List of Changes 1. Start from the validated input file entries 2. Enrich each entry with: @@ -45,7 +51,8 @@ Write per-component to `RUN_DIR/discovery/components/[##]_[name].md` (same forma - Risk assessment (low/medium/high) - Dependencies between changes 3. Add any additional issues discovered during scoped analysis (1h) -4. Write `RUN_DIR/list-of-changes.md` using `templates/list-of-changes.md` format +4. **Add any logical flow contradictions** discovered during step 1i +5. Write `RUN_DIR/list-of-changes.md` using `templates/list-of-changes.md` format - Set **Mode**: `guided` - Set **Source**: path to the original input file @@ -84,9 +91,36 @@ Also copy to project standard locations: - `SOLUTION_DIR/solution.md` - `DOCUMENT_DIR/system_flows.md` -### 1c. Produce List of Changes +### 1c. Logical Flow Analysis -From the component analysis and solution synthesis, identify all issues that need refactoring: +**Critical step — do not skip.** Before producing the change list, cross-reference documented business flows against actual implementation. This catches issues that static code inspection alone misses. + +1. **Read documented flows**: Load `DOCUMENT_DIR/system-flows.md`, `DOCUMENT_DIR/architecture.md`, and `SOLUTION_DIR/solution.md` (if they exist). Extract every documented business flow, data path, and architectural decision. + +2. **Trace each flow through code**: For every documented flow (e.g., "video batch processing", "image tiling", "engine initialization"), walk the actual code path line by line. At each decision point ask: + - Does the code match the documented/intended behavior? + - Are there edge cases where the flow silently drops data, double-processes, or deadlocks? + - Do loop boundaries handle partial batches, empty inputs, and last-iteration cleanup? + - Are assumptions from one component (e.g., "batch size is dynamic") honored by all consumers? + +3. **Check for logical contradictions**: Specifically look for: + - **Fixed-size assumptions vs dynamic-size reality**: Does the code require exact batch alignment when the engine supports variable sizes? Does it pad, truncate, or drop data to fit a fixed size? + - **Loop scoping bugs**: Are accumulators (lists, counters) reset at the right point? Does the last iteration flush remaining data? Are results from inside the loop duplicated outside? + - **Wasted computation**: Is the system doing redundant work (e.g., duplicating frames to fill a batch, processing the same data twice)? + - **Silent data loss**: Are partial batches, remaining frames, or edge-case inputs silently dropped instead of processed? + - **Documentation drift**: Does the architecture doc describe components or patterns (e.g., "msgpack serialization") that are actually dead in the code? + +4. **Classify each finding** as: + - **Logic bug**: Incorrect behavior (data loss, double-processing) + - **Performance waste**: Correct but inefficient (unnecessary padding, redundant inference) + - **Design contradiction**: Code assumes X but system needs Y (fixed vs dynamic batch) + - **Documentation drift**: Docs describe something the code doesn't do + +Write findings to `RUN_DIR/discovery/logical_flow_analysis.md`. + +### 1d. Produce List of Changes + +From the component analysis, solution synthesis, and **logical flow analysis**, identify all issues that need refactoring: 1. Hardcoded values (paths, config, magic numbers) 2. Tight coupling between components @@ -97,6 +131,8 @@ From the component analysis and solution synthesis, identify all issues that nee 7. Testability blockers (code that cannot be exercised in isolation) 8. Security concerns 9. Performance bottlenecks +10. **Logical flow contradictions** (from step 1c) +11. **Silent data loss or wasted computation** (from step 1c) Write `RUN_DIR/list-of-changes.md` using `templates/list-of-changes.md` format: - Set **Mode**: `automatic` @@ -112,6 +148,8 @@ Write all discovery artifacts to RUN_DIR. - [ ] Every referenced file in list-of-changes.md exists in the codebase - [ ] Each change entry has file paths, problem, change description, risk, and dependencies - [ ] Component documentation covers all areas affected by the changes +- [ ] **Logical flow analysis completed**: every documented business flow traced through code, contradictions identified +- [ ] **No silent data loss**: loop boundaries, partial batches, and edge cases checked for all processing flows - [ ] In guided mode: all input file entries are validated or flagged - [ ] In automatic mode: solution description covers all components - [ ] Mermaid diagrams are syntactically correct diff --git a/.gitignore b/.gitignore index 3dbc949..cb4d99e 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,6 @@ e2e/results/ e2e/logs/ !e2e/results/.gitkeep !e2e/logs/.gitkeep + +# Runtime logs +Logs/ diff --git a/Dockerfile b/Dockerfile index b170b0d..c72a349 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,5 +5,6 @@ COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . RUN python setup.py build_ext --inplace +ENV PYTHONPATH=/app/src EXPOSE 8080 CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/Dockerfile.gpu b/Dockerfile.gpu index ab43cb6..e8754e7 100644 --- a/Dockerfile.gpu +++ b/Dockerfile.gpu @@ -5,5 +5,6 @@ COPY requirements.txt requirements-gpu.txt ./ RUN pip3 install --no-cache-dir -r requirements-gpu.txt COPY . . RUN python3 setup.py build_ext --inplace +ENV PYTHONPATH=/app/src EXPOSE 8080 CMD ["python3", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/_docs/02_document/architecture.md b/_docs/02_document/architecture.md index cf08d9e..9bc412b 100644 --- a/_docs/02_document/architecture.md +++ b/_docs/02_document/architecture.md @@ -25,7 +25,6 @@ | ML Runtime (CPU) | ONNX Runtime | 1.22.0 | Portable model format, CPU/CUDA provider fallback | | ML Runtime (GPU) | TensorRT + PyCUDA | 10.11.0 / 2025.1.1 | Maximum GPU inference performance | | Image Processing | OpenCV | 4.10.0 | Frame decoding, preprocessing, tiling | -| Serialization | msgpack | 1.1.1 | Compact binary serialization for annotations and configs | | HTTP Client | requests | 2.32.4 | Synchronous HTTP to Loader and Annotations services | | Logging | loguru | 0.7.3 | Structured file + console logging | | GPU Monitoring | pynvml | 12.0.0 | GPU detection, capability checks, memory queries | diff --git a/_docs/02_tasks/backlog/AZ-172_distributed_architecture_adaptation.md b/_docs/02_tasks/backlog/AZ-172_distributed_architecture_adaptation.md new file mode 100644 index 0000000..d2f33d9 --- /dev/null +++ b/_docs/02_tasks/backlog/AZ-172_distributed_architecture_adaptation.md @@ -0,0 +1,107 @@ +# Distributed Architecture Adaptation + +**Task**: AZ-172_distributed_architecture_adaptation +**Name**: Adapt detections module for distributed architecture: stream-based input & DB-driven AI config +**Description**: Replace the co-located file-path-based detection flow with stream-based input and DB-driven configuration, enabling UI to run on a separate device from the detections API. +**Complexity**: 5 points +**Dependencies**: Annotations service (C# backend) needs endpoints for per-user AI config and Media management +**Component**: Architecture +**Jira**: AZ-172 + +## Problem + +The detections module assumes co-located deployment (same machine as the WPF UI). The UI sends local file paths, and inference reads files directly from disk: + +- `inference.pyx` → `_process_video()` opens local video via `cv2.VideoCapture(video_name)` +- `inference.pyx` → `_process_images()` reads local images via `cv2.imread(path)` +- `ai_config.pyx` has a `paths: list[str]` field carrying local filesystem paths +- `AIRecognitionConfig` is passed from UI as a dict (via the `config_dict` parameter in `run_detect`) + +In the new distributed architecture, UI runs on a separate device (laptop, tablet, phone). The detections module is a standalone API on a different device. Local file paths are meaningless. + +## Outcome + +- Video detection works with streamed input (no local file paths required) +- Video is simultaneously saved to disk and processed frame-by-frame +- Image detection works with uploaded bytes (no local file paths required) +- AIRecognitionConfig is fetched from DB by userId, not passed from UI +- Media table records created on upload with correct XxHash64 Id, path, type, status +- Old path-based code removed + +## Subtasks + +| Jira | Summary | Points | +|------|---------|--------| +| AZ-173 | Replace path-based `run_detect` with stream-based API in `inference.pyx` | 3 | +| AZ-174 | Fetch AIRecognitionConfig from DB by userId instead of UI-passed config | 2 | +| AZ-175 | Integrate Media table: create record on upload, store file, track status | 2 | +| AZ-176 | Clean up obsolete path-based code and old methods | 1 | + +## Acceptance Criteria + +**AC-1: Stream-based video detection** +Given a video is uploaded via HTTP to the detection API +When the detections module processes it +Then frames are decoded and run through inference without requiring a local file path from the caller + +**AC-2: Concurrent write and detect for video** +Given a video stream is being received +When the detection module processes it +Then the stream is simultaneously written to persistent storage AND processed frame-by-frame for detection + +**AC-3: Stream-based image detection** +Given an image is uploaded via HTTP to the detection API +When the detections module processes it +Then the image bytes are decoded and run through inference without requiring a local file path + +**AC-4: DB-driven AI config** +Given a detection request arrives with a userId (from JWT) +When the detection module needs AIRecognitionConfig +Then it fetches AIRecognitionSettings + CameraSettings from the DB via the annotations service, not from the request payload + +**AC-5: Default config on user creation** +Given a new user is created in the system +When their account is provisioned +Then default AIRecognitionSettings and CameraSettings rows are created for that user + +**AC-6: Media record lifecycle** +Given a file is uploaded for detection +When the upload is received +Then a Media record is created (XxHash64 Id, Name, Path, MediaType, UserId) and MediaStatus transitions through New → AIProcessing → AIProcessed (or Error) + +**AC-7: Old code removed** +Given the refactoring is complete +When the codebase is reviewed +Then no references to `paths` in AIRecognitionConfig, no `cv2.VideoCapture(local_path)`, no `cv2.imread(local_path)`, and no `is_video(filepath)` remain + +## File Changes + +| File | Action | Description | +|------|--------|-------------| +| `src/inference.pyx` | Modified | Replace `run_detect` with stream-based methods; remove path iteration | +| `src/ai_config.pxd` | Modified | Remove `paths` field | +| `src/ai_config.pyx` | Modified | Remove `paths` field; adapt `from_dict` | +| `src/main.py` | Modified | Fetch config from DB; handle Media records; adapt endpoints | +| `src/loader_http_client.pyx` | Modified | Add method to fetch user AI config from annotations service | + +## Technical Notes + +- `cv2.VideoCapture` can read from a named pipe or a file being appended to. Alternative: feed frames via a queue from the HTTP upload handler, or use PyAV for direct byte-stream decoding +- The annotations service (C# backend) owns the DB. Config retrieval requires API endpoints on that service +- XxHash64 ID generation algorithm is documented in `_docs/00_database_schema.md` +- Token management (JWT refresh) is already implemented in `main.py` via `TokenManager` +- DB tables `AIRecognitionSettings` and `CameraSettings` exist in schema but are not yet linked to `Users`; need FK or join table + +## Risks & Mitigation + +**Risk 1: Concurrent write + read of video file** +- *Risk*: `cv2.VideoCapture` may fail or stall reading an incomplete file +- *Mitigation*: Use a frame queue pipeline (one thread writes, another reads) or PyAV for byte-stream decoding + +**Risk 2: Annotations service API dependency** +- *Risk*: New endpoints needed on the C# backend for config retrieval and Media management +- *Mitigation*: Define API contract upfront; detections module can use fallback defaults if service is unreachable + +**Risk 3: Config-to-User linking not yet in DB** +- *Risk*: `AIRecognitionSettings` and `CameraSettings` tables have no FK to `Users` +- *Mitigation*: Add `UserId` FK or create a `UserAIConfig` join table in the backend migration diff --git a/_docs/02_tasks/backlog/AZ-173_stream_based_run_detect.md b/_docs/02_tasks/backlog/AZ-173_stream_based_run_detect.md new file mode 100644 index 0000000..6c15155 --- /dev/null +++ b/_docs/02_tasks/backlog/AZ-173_stream_based_run_detect.md @@ -0,0 +1,65 @@ +# Stream-Based run_detect + +**Task**: AZ-173_stream_based_run_detect +**Name**: Replace path-based run_detect with stream-based API in inference.pyx +**Description**: Refactor `run_detect` in `inference.pyx` to accept media bytes/stream instead of a config dict with local file paths. Enable simultaneous disk write and frame-by-frame detection for video. +**Complexity**: 3 points +**Dependencies**: None (core change, other subtasks depend on this) +**Component**: Inference +**Jira**: AZ-173 +**Parent**: AZ-172 + +## Problem + +`run_detect` currently takes a `config_dict` containing `paths: list[str]` — local filesystem paths. It iterates over them, guesses media type via `mimetypes.guess_type`, and opens files with `cv2.VideoCapture` or `cv2.imread`. This doesn't work when the caller is on a different device. + +## Current State + +```python +cpdef run_detect(self, dict config_dict, object annotation_callback, object status_callback=None): + ai_config = AIRecognitionConfig.from_dict(config_dict) + for p in ai_config.paths: + if self.is_video(p): videos.append(p) + else: images.append(p) + self._process_images(ai_config, images) # cv2.imread(path) + for v in videos: + self._process_video(ai_config, v) # cv2.VideoCapture(path) +``` + +## Target State + +Split into two dedicated methods: + +- `run_detect_video(self, stream, AIRecognitionConfig ai_config, str media_name, str save_path, ...)` — accepts a video stream/bytes, writes to `save_path` while decoding frames for detection +- `run_detect_image(self, bytes image_bytes, AIRecognitionConfig ai_config, str media_name, ...)` — accepts image bytes, decodes in memory + +Remove: +- `is_video(self, str filepath)` method +- `paths` iteration loop in `run_detect` +- Direct `cv2.VideoCapture(local_path)` and `cv2.imread(local_path)` calls + +## Video Stream Processing Options + +**Option A: Write-then-read** +Write entire upload to temp file, then open with `cv2.VideoCapture`. Simple but not real-time. + +**Option B: Concurrent pipe** +One thread writes incoming bytes to a file; another thread reads frames via `cv2.VideoCapture` on the growing file. Requires careful synchronization. + +**Option C: PyAV byte-stream decoding** +Use `av.open(io.BytesIO(data))` or a custom `av.InputContainer` to decode frames directly from bytes without file I/O. Most flexible for streaming. + +## Acceptance Criteria + +- [ ] Video can be processed from bytes/stream without a local file path from the caller +- [ ] Video is simultaneously written to disk and processed frame-by-frame +- [ ] Image can be processed from bytes without a local file path +- [ ] `_process_video_batch` and batch processing logic preserved (only input source changes) +- [ ] All existing detection logic (tile splitting, validation, tracking) unaffected + +## File Changes + +| File | Action | Description | +|------|--------|-------------| +| `src/inference.pyx` | Modified | New stream-based methods, remove path-based `run_detect` | +| `src/main.py` | Modified | Adapt callers to new method signatures | diff --git a/_docs/02_tasks/backlog/AZ-174_db_driven_ai_config.md b/_docs/02_tasks/backlog/AZ-174_db_driven_ai_config.md new file mode 100644 index 0000000..0830975 --- /dev/null +++ b/_docs/02_tasks/backlog/AZ-174_db_driven_ai_config.md @@ -0,0 +1,76 @@ +# DB-Driven AI Config + +**Task**: AZ-174_db_driven_ai_config +**Name**: Fetch AIRecognitionConfig from DB by userId instead of UI-passed config +**Description**: Replace UI-passed AI configuration with database-driven config fetched by userId from the annotations service. +**Complexity**: 2 points +**Dependencies**: Annotations service needs new endpoint `GET /api/users/{userId}/ai-settings` +**Component**: Configuration +**Jira**: AZ-174 +**Parent**: AZ-172 + +## Problem + +`AIRecognitionConfig` is currently built from a dict passed by the caller (UI). In the distributed architecture, the UI should not own or pass detection configuration — it should be stored server-side per user. + +## Current State + +- `main.py`: `AIConfigDto` Pydantic model with hardcoded defaults, passed as `config_dict` +- `ai_config.pyx`: `AIRecognitionConfig.from_dict(data)` builds from dict with defaults +- Camera settings (`altitude`, `focal_length`, `sensor_width`) baked into the config DTO +- No DB interaction for config + +## Target State + +- Extract userId from JWT (already parsed in `TokenManager._decode_exp`) +- Call annotations service: `GET /api/users/{userId}/ai-settings` +- Response contains merged `AIRecognitionSettings` + `CameraSettings` fields +- Build `AIRecognitionConfig` from the API response +- Remove `AIConfigDto` from `main.py` (or keep as optional override for testing) +- Remove `paths` field from `AIRecognitionConfig` entirely + +## DB Tables (from schema) + +**AIRecognitionSettings:** +- FramePeriodRecognition (default 4) +- FrameRecognitionSeconds (default 2) +- ProbabilityThreshold (default 0.25) +- TrackingDistanceConfidence +- TrackingProbabilityIncrease +- TrackingIntersectionThreshold +- ModelBatchSize +- BigImageTileOverlapPercent + +**CameraSettings:** +- Altitude (default 400m) +- FocalLength (default 24mm) +- SensorWidth (default 23.5mm) + +**Linking:** These tables currently have no FK to Users. The backend needs either: +- Add `UserId` FK to both tables, or +- Create a `UserAIConfig` join table referencing both + +## Backend Dependency + +The annotations C# service needs: +1. New endpoint: `GET /api/users/{userId}/ai-settings` returning merged config +2. On user creation: seed default `AIRecognitionSettings` + `CameraSettings` rows +3. Optional: `PUT /api/users/{userId}/ai-settings` for user to update their config + +## Acceptance Criteria + +- [ ] Detection endpoint extracts userId from JWT +- [ ] AIRecognitionConfig is fetched from annotations service by userId +- [ ] Fallback to sensible defaults if service is unreachable +- [ ] `paths` field removed from `AIRecognitionConfig` +- [ ] Camera settings come from DB, not request payload + +## File Changes + +| File | Action | Description | +|------|--------|-------------| +| `src/main.py` | Modified | Fetch config from annotations service via HTTP | +| `src/ai_config.pxd` | Modified | Remove `paths` field | +| `src/ai_config.pyx` | Modified | Remove `paths` from `__init__` and `from_dict` | +| `src/loader_http_client.pyx` | Modified | Add method to fetch user AI config | +| `src/loader_http_client.pxd` | Modified | Declare new method | diff --git a/_docs/02_tasks/backlog/AZ-175_media_table_integration.md b/_docs/02_tasks/backlog/AZ-175_media_table_integration.md new file mode 100644 index 0000000..dbf39cd --- /dev/null +++ b/_docs/02_tasks/backlog/AZ-175_media_table_integration.md @@ -0,0 +1,73 @@ +# Media Table Integration + +**Task**: AZ-175_media_table_integration +**Name**: Integrate Media table: create record on upload, store file, track status +**Description**: When a file is uploaded to the detections API, create a Media record in the DB, store the file at the proper path, and update MediaStatus throughout processing. +**Complexity**: 2 points +**Dependencies**: Annotations service needs Media CRUD endpoints +**Component**: Media Management +**Jira**: AZ-175 +**Parent**: AZ-172 + +## Problem + +Currently, uploaded files are written to temp files, processed, and deleted. No `Media` record is created in the database. File persistence and status tracking are missing. + +## Current State + +- `/detect`: writes upload to `tempfile.NamedTemporaryFile`, processes, deletes via `os.unlink` +- `/detect/{media_id}`: accepts a media_id parameter but doesn't create or manage Media records +- No XxHash64 ID generation in the detections module +- No file storage to persistent paths + +## Target State + +### On Upload + +1. Receive file bytes from HTTP upload +2. Compute XxHash64 of file content using the sampling algorithm +3. Determine MediaType from file extension (Video or Image) +4. Store file at proper path (from DirectorySettings: VideosDir or ImagesDir) +5. Create Media record via annotations service: `POST /api/media` + - Id: XxHash64 hex string + - Name: original filename + - Path: storage path + - MediaType: Video|Image + - MediaStatus: New (1) + - UserId: from JWT + +### During Processing + +6. Update MediaStatus to AIProcessing (2) via `PUT /api/media/{id}/status` +7. Run detection (stream-based per AZ-173) +8. Update MediaStatus to AIProcessed (3) on success, or Error (6) on failure + +## XxHash64 Sampling Algorithm + +``` +For files >= 3072 bytes: + Input = file_size_as_8_bytes + first_1024_bytes + middle_1024_bytes + last_1024_bytes + Output = XxHash64(input) as hex string + +For files < 3072 bytes: + Input = file_size_as_8_bytes + entire_file_content + Output = XxHash64(input) as hex string +``` + +Virtual hashes (in-memory only) prefixed with "V". + +## Acceptance Criteria + +- [ ] XxHash64 ID computed correctly using the sampling algorithm +- [ ] Media record created in DB on upload with correct fields +- [ ] File stored at proper persistent path (not temp) +- [ ] MediaStatus transitions: New → AIProcessing → AIProcessed (or Error) +- [ ] UserId correctly extracted from JWT and associated with Media record + +## File Changes + +| File | Action | Description | +|------|--------|-------------| +| `src/main.py` | Modified | Upload handling, Media API calls, status updates | +| `src/media_hash.py` | New | XxHash64 sampling hash utility | +| `requirements.txt` | Modified | Add `xxhash` library if not present | diff --git a/_docs/02_tasks/backlog/AZ-176_cleanup_obsolete_path_code.md b/_docs/02_tasks/backlog/AZ-176_cleanup_obsolete_path_code.md new file mode 100644 index 0000000..341e935 --- /dev/null +++ b/_docs/02_tasks/backlog/AZ-176_cleanup_obsolete_path_code.md @@ -0,0 +1,65 @@ +# Cleanup Obsolete Path-Based Code + +**Task**: AZ-176_cleanup_obsolete_path_code +**Name**: Clean up obsolete path-based code and old methods +**Description**: Remove all code that relies on the old co-located architecture where the UI sends local file paths to the detection module. +**Complexity**: 1 point +**Dependencies**: AZ-173 (stream-based run_detect), AZ-174 (DB-driven config) +**Component**: Cleanup +**Jira**: AZ-176 +**Parent**: AZ-172 + +## Problem + +After implementing stream-based detection and DB-driven config, the old path-based code becomes dead code. It must be removed to avoid confusion and maintenance burden. + +## Items to Remove + +### `inference.pyx` + +| Item | Reason | +|------|--------| +| `is_video(self, str filepath)` | Media type comes from upload metadata, not filesystem guessing | +| `for p in ai_config.paths: ...` loop in `run_detect` | Replaced by stream-based dispatch | +| `cv2.VideoCapture(video_name)` with local path arg | Replaced by stream-based video processing | +| `cv2.imread(path)` with local path arg | Replaced by bytes-based image processing | +| Old `run_detect` signature (if fully replaced) | Replaced by `run_detect_video` / `run_detect_image` | + +### `ai_config.pxd` + +| Item | Reason | +|------|--------| +| `cdef public list[str] paths` | Paths no longer part of config | + +### `ai_config.pyx` + +| Item | Reason | +|------|--------| +| `paths` parameter in `__init__` | Paths no longer part of config | +| `self.paths = paths` assignment | Paths no longer part of config | +| `data.get("paths", [])` in `from_dict` | Paths no longer part of config | +| `paths: {self.paths}` in `__str__` | Paths no longer part of config | + +### `main.py` + +| Item | Reason | +|------|--------| +| `AIConfigDto.paths: list[str]` field | Paths no longer sent by caller | +| `config_dict["paths"] = [tmp.name]` in `/detect` | Temp file path injection no longer needed | + +## Acceptance Criteria + +- [ ] No references to `paths` in `AIRecognitionConfig` or its Pydantic DTO +- [ ] No `cv2.VideoCapture(local_path)` or `cv2.imread(local_path)` calls remain +- [ ] No `is_video(filepath)` method remains +- [ ] All tests pass after removal +- [ ] No dead imports left behind + +## File Changes + +| File | Action | Description | +|------|--------|-------------| +| `src/inference.pyx` | Modified | Remove old methods and path-based logic | +| `src/ai_config.pxd` | Modified | Remove `paths` field declaration | +| `src/ai_config.pyx` | Modified | Remove `paths` from init, from_dict, __str__ | +| `src/main.py` | Modified | Remove `AIConfigDto.paths`, path injection | diff --git a/_docs/04_refactoring/01-code-cleanup/baseline_metrics.md b/_docs/04_refactoring/01-code-cleanup/baseline_metrics.md new file mode 100644 index 0000000..e21f7f3 --- /dev/null +++ b/_docs/04_refactoring/01-code-cleanup/baseline_metrics.md @@ -0,0 +1,52 @@ +# Baseline Metrics + +**Run**: 01-code-cleanup +**Date**: 2026-03-30 + +## Code Metrics + +| Metric | Value | +|--------|-------| +| Source LOC (pyx + pxd + py) | 1,714 | +| Test LOC (e2e + mocks) | 1,238 | +| Source files | 22 (.pyx: 10, .pxd: 9, .py: 3) | +| Test files | 10 | +| Dependencies (requirements.txt) | 11 packages | +| Dead code items identified | 20 | + +## Test Suite + +| Metric | Value | +|--------|-------| +| Total tests | 23 | +| Passing | 23 | +| Failing | 0 | +| Skipped | 0 | +| Execution time | 11.93s | + +## Functionality Inventory + +| Endpoint | Method | Coverage | Status | +|----------|--------|----------|--------| +| /health | GET | Covered | Working | +| /detect | POST | Covered | Working | +| /detect/{media_id} | POST | Covered | Working | +| /detect/stream | GET | Covered | Working | + +## File Structure (pre-refactoring) + +All source code lives in the repository root — no `src/` separation: +- Root: main.py, setup.py, 8x .pyx, 7x .pxd, classes.json +- engines/: 3x .pyx, 4x .pxd, __init__.py, __init__.pxd +- e2e/: tests, mocks, fixtures, config + +## Dead Code Inventory + +| Category | Count | Files | +|----------|-------|-------| +| Unused methods | 4 | serialize() x2, from_msgpack(), stop() | +| Unused fields | 3 | file_data, model_batch_size, annotation_name | +| Unused constants | 5 | CONFIG_FILE, QUEUE_CONFIG_FILENAME, CDN_CONFIG, SMALL_SIZE_KB, QUEUE_MAXSIZE | +| Orphaned declarations | 3 | COMMANDS_QUEUE, ANNOTATIONS_QUEUE, weather enum PXD | +| Dead imports | 4 | msgpack x3, typing/numpy in pxd | +| Empty files | 1 | engines/__init__.pxd | diff --git a/_docs/04_refactoring/01-code-cleanup/discovery/logical_flow_analysis.md b/_docs/04_refactoring/01-code-cleanup/discovery/logical_flow_analysis.md new file mode 100644 index 0000000..9eba171 --- /dev/null +++ b/_docs/04_refactoring/01-code-cleanup/discovery/logical_flow_analysis.md @@ -0,0 +1,193 @@ +# Logical Flow Analysis + +**Run**: 01-code-cleanup +**Date**: 2026-03-30 + +Each documented business flow (from `_docs/02_document/system-flows.md`) traced through actual code. Contradictions classified as: Logic Bug, Performance Waste, Design Contradiction, Documentation Drift. + +--- + +## F2: Single Image Detection (`detect_single_image`) + +### LF-01: Batch padding wastes compute (Performance Waste) + +**Documented**: Client uploads one image → preprocess → engine → postprocess → return detections. + +**Actual** (inference.pyx:261-264): +```python +batch_size = self.engine.get_batch_size() +frames = [frame] * batch_size # duplicate frame N times +input_blob = self.preprocess(frames) # preprocess N copies +outputs = self.engine.run(input_blob)# run inference on N copies +list_detections = self.postprocess(outputs, ai_config) +detections = list_detections[0] # use only first result +``` + +For TensorRT (batch_size=4): 4x the preprocessing, 4x the inference, 3/4 of results discarded. For CoreML (batch_size=1): no waste. For ONNX: depends on model's batch dimension. + +**Impact**: Up to 4x unnecessary GPU/CPU compute per single-image request. + +**Fix**: Engine should support running with fewer frames than max batch size. If the engine requires fixed batch, pad only at the engine boundary, not at the preprocessing level. + +--- + +## F3: Media Detection — Video Processing (`_process_video`) + +### LF-02: Last partial batch silently dropped (Logic Bug / Data Loss) + +**Documented** (system-flows.md F3): "loop For each media file → preprocess/batch → engine → postprocess" + +**Actual** (inference.pyx:297-340): +```python +while v_input.isOpened() and not self.stop_signal: + ret, frame = v_input.read() + if not ret or frame is None: + break + frame_count += 1 + if frame_count % ai_config.frame_period_recognition == 0: + batch_frames.append(frame) + batch_timestamps.append(...) + + if len(batch_frames) == self.engine.get_batch_size(): + # process batch + ... + batch_frames.clear() + batch_timestamps.clear() + +v_input.release() # loop ends +self.send_detection_status() +# batch_frames may still have 1..(batch_size-1) unprocessed frames — DROPPED +``` + +When the video ends, any remaining frames in `batch_frames` (fewer than `batch_size`) are silently lost. For batch_size=4 and frame_period=4: up to 3 sampled frames at the end of every video are never processed. + +**Impact**: Detections in the final seconds of every video are potentially missed. + +### LF-03: `split_list_extend` padding is unnecessary and harmful (Design Contradiction + Performance Waste) + +**Design intent**: With dynamic batch sizing (agreed upon during engine refactoring in Step 3), engines should accept variable-size inputs. + +**Actual** (inference.pyx:208-217): +```python +cdef split_list_extend(self, lst, chunk_size): + chunks = [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)] + last_chunk = chunks[len(chunks) - 1] + if len(last_chunk) < chunk_size: + last_elem = last_chunk[len(last_chunk)-1] + while len(last_chunk) < chunk_size: + last_chunk.append(last_elem) + return chunks +``` + +This duplicates the last element to pad the final chunk to exactly `chunk_size`. Problems: +1. With dynamic batch sizing, this padding is completely unnecessary — just process the smaller batch +2. The duplicated frames go through full preprocessing and inference, wasting compute +3. The duplicated detections from padded frames are processed by `_process_images_inner` and may emit duplicate annotations (the dedup logic only catches tile overlaps, not frame-level duplicates from padding) + +**Impact**: Unnecessary compute + potential duplicate detections from padded frames. + +### LF-04: Fixed batch gate `==` should be `>=` or removed entirely (Design Contradiction) + +**Actual** (inference.pyx:307): +```python +if len(batch_frames) == self.engine.get_batch_size(): +``` + +This strict equality means: only process when the batch is **exactly** full. Combined with LF-02 (no flush), remaining frames are dropped. With dynamic batch support, this gate is unnecessary — process frames as they accumulate, or at minimum flush remaining frames after the loop. + +--- + +## F3: Media Detection — Image Processing (`_process_images`) + +### LF-05: Non-last small images silently dropped (Logic Bug / Data Loss) + +**Actual** (inference.pyx:349-379): +```python +for path in image_paths: + frame_data = [] # ← RESET each iteration + frame = cv2.imread(path) + ... + frame_data.append(...) # or .extend(...) for tiled images + + if len(frame_data) > self.engine.get_batch_size(): + for chunk in self.split_list_extend(frame_data, ...): + self._process_images_inner(...) + self.send_detection_status() + +# Outside loop: only the LAST image's frame_data survives +for chunk in self.split_list_extend(frame_data, ...): + self._process_images_inner(...) +self.send_detection_status() +``` + +Walk through with 3 images [A(small), B(small), C(small)] and batch_size=4: +- Iteration A: `frame_data = [(A, ...)]`. `1 > 4` → False. Not processed. +- Iteration B: `frame_data = [(B, ...)]` (A lost!). `1 > 4` → False. Not processed. +- Iteration C: `frame_data = [(C, ...)]` (B lost!). `1 > 4` → False. Not processed. +- After loop: `frame_data = [(C, ...)]` → processed. Only C was ever detected. + +**Impact**: In multi-image media detection, all images except the last are silently dropped when each is smaller than the batch size. This is a critical data loss bug. + +### LF-06: Large images double-processed (Logic Bug) + +With image D producing 10 tiles and batch_size=4: +- Inside loop: `10 > 4` → True. All 10 tiles processed (3 chunks: 4+4+4 with last padded). `send_detection_status()` called. +- After loop: `frame_data` still contains all 10 tiles. Processed again (3 more chunks). `send_detection_status()` called again. + +**Impact**: Large images get inference run twice, producing duplicate detection events. + +### LF-07: `frame.shape` before None check (Logic Bug / Crash) + +**Actual** (inference.pyx:355-358): +```python +frame = cv2.imread(path) +img_h, img_w, _ = frame.shape # crashes if frame is None +if frame is None: # dead code — never reached + continue +``` + +**Impact**: Corrupt or missing image file crashes the entire detection pipeline instead of gracefully skipping. + +--- + +## Cross-Cutting: Batch Size Design Contradiction + +### LF-08: Entire pipeline assumes fixed batch size (Design Contradiction) + +The engine polymorphism (Step 3 refactoring) established that different engines have different batch sizes: TensorRT=4, CoreML=1, ONNX=variable. But the processing pipeline treats batch size as a fixed gate: + +| Location | Pattern | Problem | +|----------|---------|---------| +| `detect_single_image:262` | `[frame] * batch_size` | Pads single frame to batch size | +| `_process_video:307` | `== batch_size` | Only processes exact-full batches | +| `_process_images:372` | `> batch_size` | Only processes when exceeding batch | +| `split_list_extend` | Pads last chunk | Duplicates frames to fill batch | + +All engines already accept the full batch as a numpy blob. The fix is to make the pipeline batch-agnostic: collect frames, process when you have enough OR when the stream ends. Never pad with duplicates. + +--- + +## Architecture Documentation Drift + +### LF-09: Architecture doc lists msgpack as active technology (Documentation Drift) + +**Architecture.md** § Technology Stack: +> "Serialization | msgpack | 1.1.1 | Compact binary serialization for annotations and configs" + +**Reality**: All `serialize()` and `from_msgpack()` methods are dead code. The system uses Pydantic JSON for API responses and `from_dict()` for config parsing. msgpack is not used by any live code path. + +--- + +## Summary Table + +| ID | Flow | Type | Severity | Description | +|----|------|------|----------|-------------| +| LF-01 | F2 | Performance Waste | Medium | Single image duplicated to fill batch — up to 4x wasted compute | +| LF-02 | F3/Video | Data Loss | High | Last partial video batch silently dropped | +| LF-03 | F3/Both | Design Contradiction + Perf | Medium | split_list_extend pads with duplicates instead of processing smaller batch | +| LF-04 | F3/Video | Design Contradiction | High | Fixed `== batch_size` gate prevents partial batch processing | +| LF-05 | F3/Images | Data Loss | Critical | Non-last small images silently dropped in multi-image processing | +| LF-06 | F3/Images | Logic Bug | High | Large images processed twice (inside loop + after loop) | +| LF-07 | F3/Images | Crash | High | frame.shape before None check | +| LF-08 | Cross-cutting | Design Contradiction | High | Entire pipeline assumes fixed batch size vs dynamic engine reality | +| LF-09 | Documentation | Drift | Low | Architecture lists msgpack as active; it's dead | diff --git a/_docs/04_refactoring/01-code-cleanup/list-of-changes.md b/_docs/04_refactoring/01-code-cleanup/list-of-changes.md new file mode 100644 index 0000000..085a595 --- /dev/null +++ b/_docs/04_refactoring/01-code-cleanup/list-of-changes.md @@ -0,0 +1,132 @@ +# List of Changes + +**Run**: 01-code-cleanup +**Mode**: automatic +**Source**: self-discovered +**Date**: 2026-03-30 + +## Summary + +Two tiers: (1) Fix critical logical flow bugs — batch handling, data loss, crash prevention, and remove the fixed-batch-size assumption that contradicts the dynamic engine design. (2) Dead code cleanup, configurable paths, HTTP timeouts, and move source to `src/`. + +## Changes + +### C01: Move source code to `src/` directory +- **File(s)**: main.py, inference.pyx, constants_inf.pyx, constants_inf.pxd, annotation.pyx, annotation.pxd, ai_config.pyx, ai_config.pxd, ai_availability_status.pyx, ai_availability_status.pxd, loader_http_client.pyx, loader_http_client.pxd, engines/, setup.py, run-tests.sh, e2e/run_local.sh, e2e/docker-compose.test.yml +- **Problem**: All source code is in the repository root, mixed with config, docs, and test infrastructure. +- **Change**: Move all application source files into `src/`. Update setup.py extension paths, run-tests.sh, e2e scripts, and docker-compose volumes. Keep setup.py, requirements, and tests at root. +- **Rationale**: Project convention requires source under `src/`. +- **Risk**: medium +- **Dependencies**: None (do first — all other changes reference new paths) + +### C02: Fix `_process_images` — accumulate all images, process once (LF-05, LF-06) +- **File(s)**: src/inference.pyx (`_process_images`) +- **Problem**: `frame_data = []` is reset inside the per-image loop, so only the last image's data survives to the outer processing loop. Non-last small images are silently dropped. Large images that exceed batch_size inside the loop are also re-processed outside the loop (double-processing). +- **Change**: Accumulate frame_data across ALL images (move reset before the loop). Process all accumulated data once after the loop. Remove the inner batch-processing + status call. Each image's tiles/frames should carry their own ground_sampling_distance so mixed-GSD images process correctly. +- **Rationale**: Critical data loss — multi-image requests silently drop all images except the last. +- **Risk**: medium +- **Dependencies**: C01, C04 + +### C03: Fix `_process_video` — flush remaining frames after loop (LF-02, LF-04) +- **File(s)**: src/inference.pyx (`_process_video`) +- **Problem**: The `if len(batch_frames) == self.engine.get_batch_size()` gate means frames are only processed in exact-batch-size groups. When the video ends with a partial batch (1..batch_size-1 frames), those frames are silently dropped. Detections at the end of every video are potentially missed. +- **Change**: After the video read loop, if `batch_frames` is non-empty, process the remaining frames as a partial batch (no padding). Change the `==` gate to `>=` as a safety measure, though with the flush it's not strictly needed. +- **Rationale**: Silent data loss — last frames of every video are dropped. +- **Risk**: medium +- **Dependencies**: C01, C04 + +### C04: Remove `split_list_extend` — replace with simple chunking without padding (LF-03, LF-08) +- **File(s)**: src/inference.pyx (`split_list_extend`, `_process_images`, `detect_single_image`) +- **Problem**: `split_list_extend` pads the last chunk by duplicating its final element to fill `batch_size`. This wastes compute (duplicate inference), may produce duplicate detections, and contradicts the dynamic batch design established in Step 3 (engine polymorphism). In `detect_single_image`, `[frame] * batch_size` pads a single frame to batch_size copies — same issue. +- **Change**: Replace `split_list_extend` with plain chunking (no padding). Last chunk keeps its natural size. In `detect_single_image`, pass a single-frame list. Engine `run()` and `preprocess()` must handle variable-size input — verify each engine supports this or add a minimal adapter. +- **Rationale**: Unnecessary compute (up to 4x for TensorRT single-image), potential duplicate detections from padding, contradicts dynamic batch design. +- **Risk**: high +- **Dependencies**: C01 + +### C05: Fix frame-is-None crash in `_process_images` (LF-07) +- **File(s)**: src/inference.pyx (`_process_images`) +- **Problem**: `frame.shape` is accessed before `frame is None` check. If `cv2.imread` fails, the pipeline crashes instead of skipping the file. +- **Change**: Move the None check before the shape access. +- **Rationale**: Crash prevention for missing/corrupt image files. +- **Risk**: low +- **Dependencies**: C01 + +### C06: Remove orphaned RabbitMQ declarations from constants_inf.pxd +- **File(s)**: src/constants_inf.pxd +- **Problem**: `QUEUE_MAXSIZE`, `COMMANDS_QUEUE`, `ANNOTATIONS_QUEUE` are declared but have no implementations. Remnants of previous RabbitMQ architecture. +- **Change**: Remove the three declarations and their comments. +- **Rationale**: Dead declarations mislead about system architecture. +- **Risk**: low +- **Dependencies**: C01 + +### C07: Remove unused constants from constants_inf +- **File(s)**: src/constants_inf.pxd, src/constants_inf.pyx +- **Problem**: `CONFIG_FILE` (with stale "zmq" comment), `QUEUE_CONFIG_FILENAME`, `CDN_CONFIG`, `SMALL_SIZE_KB` — defined but never referenced. +- **Change**: Remove all four from .pxd and .pyx. +- **Rationale**: Dead constants with misleading comments. +- **Risk**: low +- **Dependencies**: C01 + +### C08: Remove dead serialize/from_msgpack methods and msgpack imports +- **File(s)**: src/annotation.pyx, src/annotation.pxd, src/ai_availability_status.pyx, src/ai_availability_status.pxd, src/ai_config.pyx, src/ai_config.pxd +- **Problem**: `Annotation.serialize()`, `AIAvailabilityStatus.serialize()`, `AIRecognitionConfig.from_msgpack()` — all dead. Associated `import msgpack` / `from msgpack import unpackb` only serve these dead methods. +- **Change**: Remove all three methods from .pyx and .pxd files. Remove msgpack imports. +- **Rationale**: Legacy queue-era serialization with no callers. +- **Risk**: low +- **Dependencies**: C01 + +### C09: Remove unused fields (file_data, model_batch_size, annotation_name) +- **File(s)**: src/ai_config.pyx, src/ai_config.pxd, src/annotation.pyx, src/annotation.pxd, src/main.py +- **Problem**: `AIRecognitionConfig.file_data` populated but never read. `AIRecognitionConfig.model_batch_size` parsed but never used (engine owns batch size). `Detection.annotation_name` set but never read. +- **Change**: Remove field declarations from .pxd, remove from constructors and factory methods in .pyx. Remove `file_data` and `model_batch_size` from AIConfigDto in main.py. Remove annotation_name assignment loop in Annotation.__init__. +- **Rationale**: Dead fields that mislead about responsibilities. +- **Risk**: low +- **Dependencies**: C01, C08 + +### C10: Remove misc dead code (stop no-op, empty pxd, unused pxd imports) +- **File(s)**: src/loader_http_client.pyx, src/loader_http_client.pxd, src/engines/__init__.pxd, src/engines/inference_engine.pxd +- **Problem**: `LoaderHttpClient.stop()` is a no-op. `engines/__init__.pxd` is empty. `inference_engine.pxd` imports `List, Tuple` from typing and `numpy` — both unused. +- **Change**: Remove stop() from .pyx and .pxd. Delete empty __init__.pxd. Remove unused imports from inference_engine.pxd. +- **Rationale**: Dead code noise. +- **Risk**: low +- **Dependencies**: C01 + +### C11: Remove msgpack from requirements.txt +- **File(s)**: requirements.txt +- **Problem**: `msgpack==1.1.1` has no consumers after C08 removes all msgpack usage. +- **Change**: Remove from requirements.txt. +- **Rationale**: Unused dependency. +- **Risk**: low +- **Dependencies**: C08 + +### C12: Make classes.json path configurable via env var +- **File(s)**: src/constants_inf.pyx +- **Problem**: `open('classes.json')` is hardcoded, depends on CWD at import time. +- **Change**: Read from `os.environ.get("CLASSES_JSON_PATH", "classes.json")`. +- **Rationale**: Environment-appropriate configuration. +- **Risk**: low +- **Dependencies**: C01 + +### C13: Make log directory configurable via env var +- **File(s)**: src/constants_inf.pyx +- **Problem**: `sink="Logs/log_inference_..."` is hardcoded. +- **Change**: Read from `os.environ.get("LOG_DIR", "Logs")`. +- **Rationale**: Environment configurability. +- **Risk**: low +- **Dependencies**: C01 + +### C14: Add timeouts to LoaderHttpClient HTTP calls +- **File(s)**: src/loader_http_client.pyx +- **Problem**: No explicit timeout on `requests.post()` calls. Stalled loader hangs detections service. +- **Change**: Add `timeout=120` to load and upload calls. +- **Rationale**: Prevent service hangs. +- **Risk**: low +- **Dependencies**: C01 + +### C15: Update architecture doc — remove msgpack from tech stack (LF-09) +- **File(s)**: _docs/02_document/architecture.md +- **Problem**: Tech stack lists "msgpack | 1.1.1 | Compact binary serialization for annotations and configs" but msgpack is dead code after this refactoring. +- **Change**: Remove msgpack row from tech stack table. +- **Rationale**: Documentation accuracy. +- **Risk**: low +- **Dependencies**: C08, C11 diff --git a/_docs/_autopilot_state.md b/_docs/_autopilot_state.md index bfe220c..4ec616d 100644 --- a/_docs/_autopilot_state.md +++ b/_docs/_autopilot_state.md @@ -4,8 +4,8 @@ flow: existing-code step: 7 name: Refactor -status: not_started -sub_step: 0 +status: completed +sub_step: done retry_count: 0 ## Completed Steps @@ -18,6 +18,7 @@ retry_count: 0 | 4 | Decompose Tests | 2026-03-23 | 11 tasks (AZ-138..AZ-148), 35 complexity points, 3 batches. Phase 3 test data gate PASSED: 39/39 scenarios validated, 12 data files provided. | | 5 | Implement Tests | 2026-03-23 | 11 tasks implemented across 4 batches, 38 tests (2 skipped), all code reviews PASS_WITH_WARNINGS. Commits: 5418bd7, a469579, 861d4f0, f0e3737. | | 6 | Run Tests | 2026-03-30 | 23 passed, 0 failed, 0 skipped, 0 errors in 11.93s. Fixed: Cython __reduce_cython__ (clean rebuild), missing Pillow dep, relative MEDIA_DIR paths. Removed 14 dead/unreachable tests. Updated test-run skill to treat skips as blocking gate. | +| 7 | Refactor | 2026-03-31 | Engine-centric dynamic batch refactoring. Moved source to src/. Engine pipeline redesign: preprocess/postprocess/process_frames in base InferenceEngine, dynamic batching per engine (CoreML=1, TensorRT=GPU-calculated, ONNX=config). Fixed: video partial batch flush, image accumulation data loss, frame-is-None crash. Removed detect_single_image (POST /detect delegates to run_detect). Dead code: removed msgpack, serialize methods, unused constants/fields. Configurable classes.json + log paths, HTTP timeouts. 28 e2e tests pass. | ## Key Decisions - User chose to document existing codebase before proceeding @@ -35,12 +36,19 @@ retry_count: 0 - User confirmed dependency table and test data gate - Jira MCP auth skipped — tickets not transitioned to In Testing - Test run: removed 14 dead/unreachable tests (explicit @skip + runtime always-skip), added .c to .gitignore +- User chose to refactor (option A) — clean up legacy dead code +- User requested: move code to src/, thorough re-analysis, exhaustive refactoring list +- Refactoring round: 01-code-cleanup, automatic mode, 15 changes identified +- User feedback: analyze logical flow contradictions, not just static code. Updated refactor skill Phase 1 with logical flow analysis. +- User chose: split scope — engine refactoring as Step 7, architecture shift (streaming, DB config, media storage, Jetson) as Step 8 +- User chose: remove detect_single_image, POST /detect delegates to run_detect +- GPU memory fraction: 80% for inference, 20% buffer (Jetson 40% deferred to Step 8) ## Last Session -date: 2026-03-30 -ended_at: Step 6 completed, Step 7 (Refactor) next -reason: All 23 tests pass with zero skips -notes: Fixed Cython build (clean rebuild resolved __reduce_cython__ KeyError), installed missing Pillow, used absolute MEDIA_DIR. Service crash root-caused to CoreML thread-safety during concurrent requests (not a test issue). Updated test-run skill: skipped tests now require investigation like failures. +date: 2026-03-31 +ended_at: Step 7 complete — all 11 todos done, 28 e2e tests pass +reason: Refactoring complete +notes: Engine-centric dynamic batch refactoring implemented. Source moved to src/. InferenceEngine base class now owns preprocess/postprocess/process_frames with per-engine max_batch_size. CoreML overrides preprocess (direct PIL, no blob reversal) and postprocess. TensorRT calculates max_batch_size from GPU memory (80% fraction) with optimization profiles for dynamic batch. All logical flow bugs fixed (LF-01 through LF-09). Dead code removed (msgpack, serialize, unused constants). POST /detect unified through run_detect. Next: Step 8 (architecture shift — streaming media, DB-backed config, media storage, Jetson support). ## Blockers - none diff --git a/constants_inf.h b/constants_inf.h deleted file mode 100644 index 620b985..0000000 --- a/constants_inf.h +++ /dev/null @@ -1,55 +0,0 @@ -/* Generated by Cython 3.1.2 */ - -#ifndef __PYX_HAVE__constants_inf -#define __PYX_HAVE__constants_inf - -#include "Python.h" - -#ifndef __PYX_HAVE_API__constants_inf - -#ifdef CYTHON_EXTERN_C - #undef __PYX_EXTERN_C - #define __PYX_EXTERN_C CYTHON_EXTERN_C -#elif defined(__PYX_EXTERN_C) - #ifdef _MSC_VER - #pragma message ("Please do not define the '__PYX_EXTERN_C' macro externally. Use 'CYTHON_EXTERN_C' instead.") - #else - #warning Please do not define the '__PYX_EXTERN_C' macro externally. Use 'CYTHON_EXTERN_C' instead. - #endif -#else - #ifdef __cplusplus - #define __PYX_EXTERN_C extern "C" - #else - #define __PYX_EXTERN_C extern - #endif -#endif - -#ifndef DL_IMPORT - #define DL_IMPORT(_T) _T -#endif - -__PYX_EXTERN_C int TILE_DUPLICATE_CONFIDENCE_THRESHOLD; - -#endif /* !__PYX_HAVE_API__constants_inf */ - -/* WARNING: the interface of the module init function changed in CPython 3.5. */ -/* It now returns a PyModuleDef instance instead of a PyModule instance. */ - -/* WARNING: Use PyImport_AppendInittab("constants_inf", PyInit_constants_inf) instead of calling PyInit_constants_inf directly from Python 3.5 */ -PyMODINIT_FUNC PyInit_constants_inf(void); - -#if PY_VERSION_HEX >= 0x03050000 && (defined(__GNUC__) || defined(__clang__) || defined(_MSC_VER) || (defined(__cplusplus) && __cplusplus >= 201402L)) -#if defined(__cplusplus) && __cplusplus >= 201402L -[[deprecated("Use PyImport_AppendInittab(\"constants_inf\", PyInit_constants_inf) instead of calling PyInit_constants_inf directly.")]] inline -#elif defined(__GNUC__) || defined(__clang__) -__attribute__ ((__deprecated__("Use PyImport_AppendInittab(\"constants_inf\", PyInit_constants_inf) instead of calling PyInit_constants_inf directly."), __unused__)) __inline__ -#elif defined(_MSC_VER) -__declspec(deprecated("Use PyImport_AppendInittab(\"constants_inf\", PyInit_constants_inf) instead of calling PyInit_constants_inf directly.")) __inline -#endif -static PyObject* __PYX_WARN_IF_PyInit_constants_inf_INIT_CALLED(PyObject* res) { - return res; -} -#define PyInit_constants_inf() __PYX_WARN_IF_PyInit_constants_inf_INIT_CALLED(PyInit_constants_inf()) -#endif - -#endif /* !__PYX_HAVE__constants_inf */ diff --git a/e2e/run_local.sh b/e2e/run_local.sh index 4192952..d2fae56 100755 --- a/e2e/run_local.sh +++ b/e2e/run_local.sh @@ -105,6 +105,7 @@ echo "--- Starting detections service on :8080..." cd "$PROJECT_DIR" LOADER_URL="http://localhost:18080" \ ANNOTATIONS_URL="http://localhost:18081" \ + PYTHONPATH="$PROJECT_DIR/src" \ python -m uvicorn main:app --host 0.0.0.0 --port 8080 --workers 1 ) & PIDS+=($!) diff --git a/engines/__init__.pxd b/engines/__init__.pxd deleted file mode 100644 index e69de29..0000000 diff --git a/engines/inference_engine.pxd b/engines/inference_engine.pxd deleted file mode 100644 index 3ff040b..0000000 --- a/engines/inference_engine.pxd +++ /dev/null @@ -1,10 +0,0 @@ -from typing import List, Tuple -import numpy as np - - -cdef class InferenceEngine: - cdef public int batch_size - cdef public str engine_name - cdef tuple get_input_shape(self) # type: ignore - cdef int get_batch_size(self) # type: ignore - cdef run(self, input_data) # type: ignore diff --git a/engines/inference_engine.pyx b/engines/inference_engine.pyx deleted file mode 100644 index e30d56f..0000000 --- a/engines/inference_engine.pyx +++ /dev/null @@ -1,25 +0,0 @@ -cdef class InferenceEngine: - def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs): - self.batch_size = batch_size - self.engine_name = "onnx" - - @staticmethod - def get_engine_filename(): - return None - - @staticmethod - def get_source_filename(): - return None - - @staticmethod - def convert_from_source(bytes source_bytes): - return source_bytes - - cdef tuple get_input_shape(self): - raise NotImplementedError("Subclass must implement get_input_shape") - - cdef int get_batch_size(self): - return self.batch_size - - cdef run(self, input_data): - raise NotImplementedError("Subclass must implement run") diff --git a/requirements.txt b/requirements.txt index c614bb2..43f8105 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,3 @@ pynvml==12.0.0 requests==2.32.4 loguru==0.7.3 python-multipart -msgpack==1.1.1 diff --git a/run-tests.sh b/run-tests.sh index 3dad4ff..193ce87 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -10,7 +10,7 @@ DETECTIONS_PORT=8000 PIDS=() cleanup() { - for pid in "${PIDS[@]}"; do + for pid in "${PIDS[@]+"${PIDS[@]}"}"; do kill "$pid" 2>/dev/null || true done wait 2>/dev/null @@ -22,8 +22,9 @@ python setup.py build_ext --inplace for port in $LOADER_PORT $ANNOTATIONS_PORT $DETECTIONS_PORT; do if lsof -ti :"$port" >/dev/null 2>&1; then - echo "ERROR: port $port is already in use" >&2 - exit 1 + echo "Killing stale process on port $port ..." + lsof -ti :"$port" | xargs kill -9 2>/dev/null || true + sleep 1 fi done @@ -41,6 +42,7 @@ PIDS+=($!) echo "Starting detections service on :$DETECTIONS_PORT ..." LOADER_URL="http://localhost:$LOADER_PORT" \ ANNOTATIONS_URL="http://localhost:$ANNOTATIONS_PORT" \ +PYTHONPATH="$ROOT/src" \ python -m uvicorn main:app --host 0.0.0.0 --port "$DETECTIONS_PORT" \ --log-level warning >/dev/null 2>&1 & PIDS+=($!) diff --git a/setup.py b/setup.py index 118594d..ec15a41 100644 --- a/setup.py +++ b/setup.py @@ -2,30 +2,36 @@ from setuptools import setup, Extension from Cython.Build import cythonize import numpy as np +SRC = "src" +np_inc = [np.get_include(), SRC] + extensions = [ - Extension('constants_inf', ['constants_inf.pyx']), - Extension('ai_availability_status', ['ai_availability_status.pyx']), - Extension('annotation', ['annotation.pyx']), - Extension('ai_config', ['ai_config.pyx']), - Extension('loader_http_client', ['loader_http_client.pyx']), - Extension('engines.inference_engine', ['engines/inference_engine.pyx'], include_dirs=[np.get_include()]), - Extension('engines.onnx_engine', ['engines/onnx_engine.pyx'], include_dirs=[np.get_include()]), - Extension('engines.coreml_engine', ['engines/coreml_engine.pyx'], include_dirs=[np.get_include()]), - Extension('inference', ['inference.pyx'], include_dirs=[np.get_include()]), + Extension('constants_inf', [f'{SRC}/constants_inf.pyx'], include_dirs=[SRC]), + Extension('ai_availability_status', [f'{SRC}/ai_availability_status.pyx'], include_dirs=[SRC]), + Extension('annotation', [f'{SRC}/annotation.pyx'], include_dirs=[SRC]), + Extension('ai_config', [f'{SRC}/ai_config.pyx'], include_dirs=[SRC]), + Extension('loader_http_client', [f'{SRC}/loader_http_client.pyx'], include_dirs=[SRC]), + Extension('engines.inference_engine', [f'{SRC}/engines/inference_engine.pyx'], include_dirs=np_inc), + Extension('engines.onnx_engine', [f'{SRC}/engines/onnx_engine.pyx'], include_dirs=np_inc), + Extension('engines.coreml_engine', [f'{SRC}/engines/coreml_engine.pyx'], include_dirs=np_inc), + Extension('inference', [f'{SRC}/inference.pyx'], include_dirs=np_inc), ] try: import tensorrt # pyright: ignore[reportMissingImports] extensions.append( - Extension('engines.tensorrt_engine', ['engines/tensorrt_engine.pyx'], include_dirs=[np.get_include()]) + Extension('engines.tensorrt_engine', [f'{SRC}/engines/tensorrt_engine.pyx'], include_dirs=np_inc) ) except ImportError: pass setup( name="azaion.detections", + package_dir={"": SRC}, + packages=["engines"], ext_modules=cythonize( extensions, + include_path=[SRC], compiler_directives={ "language_level": 3, "emit_code_comments": False, diff --git a/ai_availability_status.pxd b/src/ai_availability_status.pxd similarity index 76% rename from ai_availability_status.pxd rename to src/ai_availability_status.pxd index c8ae4b5..0801e90 100644 --- a/ai_availability_status.pxd +++ b/src/ai_availability_status.pxd @@ -14,5 +14,4 @@ cdef class AIAvailabilityStatus: cdef str error_message cdef pymutex _lock - cdef bytes serialize(self) - cdef set_status(self, int status, str error_message=*) \ No newline at end of file + cdef set_status(self, int status, str error_message=*) diff --git a/ai_availability_status.pyx b/src/ai_availability_status.pyx similarity index 83% rename from ai_availability_status.pyx rename to src/ai_availability_status.pyx index f72931c..fba9d7d 100644 --- a/ai_availability_status.pyx +++ b/src/ai_availability_status.pyx @@ -1,6 +1,5 @@ cimport cython cimport constants_inf -import msgpack AIStatus2Text = { AIAvailabilityEnum.NONE: "None", @@ -23,13 +22,6 @@ cdef class AIAvailabilityStatus: error_text = self.error_message if self.error_message else "" return f"{status_text} {error_text}" - cdef bytes serialize(self): - with self._lock: - return msgpack.packb({ - "s": self.status, - "m": self.error_message - }) - cdef set_status(self, int status, str error_message=""): log_message = "" with self._lock: @@ -42,4 +34,4 @@ cdef class AIAvailabilityStatus: if error_message: constants_inf.logerror(error_message) else: - constants_inf.log(log_message) \ No newline at end of file + constants_inf.log(log_message) diff --git a/ai_config.pxd b/src/ai_config.pxd similarity index 88% rename from ai_config.pxd rename to src/ai_config.pxd index a86ea33..399444a 100644 --- a/ai_config.pxd +++ b/src/ai_config.pxd @@ -10,7 +10,6 @@ cdef class AIRecognitionConfig: cdef public int big_image_tile_overlap_percent - cdef public bytes file_data cdef public list[str] paths cdef public int model_batch_size @@ -18,8 +17,5 @@ cdef class AIRecognitionConfig: cdef public double focal_length cdef public double sensor_width - @staticmethod - cdef from_msgpack(bytes data) - @staticmethod cdef AIRecognitionConfig from_dict(dict data) diff --git a/ai_config.pyx b/src/ai_config.pyx similarity index 76% rename from ai_config.pyx rename to src/ai_config.pyx index b083449..14f6cc0 100644 --- a/ai_config.pyx +++ b/src/ai_config.pyx @@ -1,5 +1,3 @@ -from msgpack import unpackb - cdef class AIRecognitionConfig: def __init__(self, frame_period_recognition, @@ -9,13 +7,9 @@ cdef class AIRecognitionConfig: tracking_distance_confidence, tracking_probability_increase, tracking_intersection_threshold, - - file_data, paths, model_batch_size, - big_image_tile_overlap_percent, - altitude, focal_length, sensor_width @@ -28,7 +22,6 @@ cdef class AIRecognitionConfig: self.tracking_probability_increase = tracking_probability_increase self.tracking_intersection_threshold = tracking_intersection_threshold - self.file_data = file_data self.paths = paths self.model_batch_size = model_batch_size @@ -51,29 +44,6 @@ cdef class AIRecognitionConfig: f'sensor_width: {self.sensor_width}' ) - @staticmethod - cdef from_msgpack(bytes data): - unpacked = unpackb(data, strict_map_key=False) - return AIRecognitionConfig( - unpacked.get("f_pr", 0), - unpacked.get("f_rs", 0.0), - unpacked.get("pt", 0.0), - - unpacked.get("t_dc", 0.0), - unpacked.get("t_pi", 0.0), - unpacked.get("t_it", 0.0), - - unpacked.get("d", b''), - unpacked.get("p", []), - unpacked.get("m_bs"), - - unpacked.get("ov_p", 20), - - unpacked.get("cam_a", 400), - unpacked.get("cam_fl", 24), - unpacked.get("cam_sw", 23.5) - ) - @staticmethod cdef AIRecognitionConfig from_dict(dict data): return AIRecognitionConfig( @@ -85,13 +55,12 @@ cdef class AIRecognitionConfig: data.get("tracking_probability_increase", 0.0), data.get("tracking_intersection_threshold", 0.6), - data.get("file_data", b''), data.get("paths", []), - data.get("model_batch_size", 1), + data.get("model_batch_size", 8), data.get("big_image_tile_overlap_percent", 20), data.get("altitude", 400), data.get("focal_length", 24), data.get("sensor_width", 23.5) - ) \ No newline at end of file + ) diff --git a/annotation.pxd b/src/annotation.pxd similarity index 83% rename from annotation.pxd rename to src/annotation.pxd index 2c19d9b..795220b 100644 --- a/annotation.pxd +++ b/src/annotation.pxd @@ -1,6 +1,5 @@ cdef class Detection: cdef public double x, y, w, h, confidence - cdef public str annotation_name cdef public int cls cdef bint overlaps(self, Detection det2, float confidence_threshold) @@ -11,5 +10,3 @@ cdef class Annotation: cdef long time cdef public list[Detection] detections cdef public bytes image - - cdef bytes serialize(self) diff --git a/annotation.pyx b/src/annotation.pyx similarity index 72% rename from annotation.pyx rename to src/annotation.pyx index bd73537..c43b5c1 100644 --- a/annotation.pyx +++ b/src/annotation.pyx @@ -1,9 +1,7 @@ -import msgpack cimport constants_inf cdef class Detection: def __init__(self, double x, double y, double w, double h, int cls, double confidence): - self.annotation_name = "" self.x = x self.y = y self.w = w @@ -39,8 +37,6 @@ cdef class Annotation: self.original_media_name = original_media_name self.time = ms self.detections = detections if detections is not None else [] - for d in self.detections: - d.annotation_name = self.name self.image = b'' def __str__(self): @@ -52,22 +48,3 @@ cdef class Annotation: for d in self.detections ) return f"{self.name}: {detections_str}" - - cdef bytes serialize(self): - return msgpack.packb({ - "n": self.name, - "mn": self.original_media_name, - "i": self.image, # "i" = image - "t": self.time, # "t" = time - "d": [ # "d" = detections - { - "an": det.annotation_name, - "x": det.x, - "y": det.y, - "w": det.w, - "h": det.h, - "c": det.cls, - "p": det.confidence - } for det in self.detections - ] - }) diff --git a/constants_inf.pxd b/src/constants_inf.pxd similarity index 61% rename from constants_inf.pxd rename to src/constants_inf.pxd index 02b7977..132f5e6 100644 --- a/constants_inf.pxd +++ b/src/constants_inf.pxd @@ -1,10 +1,4 @@ -cdef str CONFIG_FILE # Port for the zmq - -cdef int QUEUE_MAXSIZE # Maximum size of the command queue -cdef str COMMANDS_QUEUE # Name of the commands queue in rabbit -cdef str ANNOTATIONS_QUEUE # Name of the annotations queue in rabbit - -cdef str QUEUE_CONFIG_FILENAME # queue config filename to load from api +cdef str CONFIG_FILE cdef str AI_ONNX_MODEL_FILE @@ -33,4 +27,3 @@ cdef enum WeatherMode: Norm = 0 Wint = 20 Night = 40 - diff --git a/constants_inf.pyx b/src/constants_inf.pyx similarity index 88% rename from constants_inf.pyx rename to src/constants_inf.pyx index 195457a..effa6a7 100644 --- a/constants_inf.pyx +++ b/src/constants_inf.pyx @@ -1,11 +1,10 @@ import json +import os import sys from loguru import logger -cdef str CONFIG_FILE = "config.yaml" # Port for the zmq - -cdef str QUEUE_CONFIG_FILENAME = "secured-config.json" +cdef str CONFIG_FILE = "config.yaml" cdef str AI_ONNX_MODEL_FILE = "azaion.onnx" cdef str CDN_CONFIG = "cdn.yaml" @@ -35,7 +34,8 @@ WEATHER_MODE_NAMES = { Night: "Night" } -with open('classes.json', 'r', encoding='utf-8') as f: +_classes_path = os.environ.get("CLASSES_JSON_PATH", "classes.json") +with open(_classes_path, 'r', encoding='utf-8') as f: j = json.loads(f.read()) annotations_dict = {} @@ -46,10 +46,12 @@ with open('classes.json', 'r', encoding='utf-8') as f: name = cl['Name'] if i == 0 else f'{cl["Name"]}({mode_name})' annotations_dict[id] = AnnotationClass(id, name, cl['Color'], cl['MaxSizeM']) +_log_dir = os.environ.get("LOG_DIR", "Logs") +os.makedirs(_log_dir, exist_ok=True) logger.remove() log_format = "[{time:HH:mm:ss} {level}] {message}" logger.add( - sink="Logs/log_inference_{time:YYYYMMDD}.txt", + sink=f"{_log_dir}/log_inference_{{time:YYYYMMDD}}.txt", level="INFO", format=log_format, enqueue=True, diff --git a/engines/__init__.py b/src/engines/__init__.py similarity index 100% rename from engines/__init__.py rename to src/engines/__init__.py diff --git a/engines/coreml_engine.pxd b/src/engines/coreml_engine.pxd similarity index 70% rename from engines/coreml_engine.pxd rename to src/engines/coreml_engine.pxd index a371e65..d47fa00 100644 --- a/engines/coreml_engine.pxd +++ b/src/engines/coreml_engine.pxd @@ -8,5 +8,6 @@ cdef class CoreMLEngine(InferenceEngine): cdef int img_height cdef tuple get_input_shape(self) - cdef int get_batch_size(self) cdef run(self, input_data) + cdef preprocess(self, list frames) + cdef list postprocess(self, output, object ai_config) diff --git a/engines/coreml_engine.pyx b/src/engines/coreml_engine.pyx similarity index 54% rename from engines/coreml_engine.pyx rename to src/engines/coreml_engine.pyx index 359a981..f80aa2c 100644 --- a/engines/coreml_engine.pyx +++ b/src/engines/coreml_engine.pyx @@ -1,7 +1,9 @@ from engines.inference_engine cimport InferenceEngine +from annotation cimport Detection cimport constants_inf import numpy as np from PIL import Image +import cv2 import io import os import tempfile @@ -10,8 +12,8 @@ import zipfile cdef class CoreMLEngine(InferenceEngine): - def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs): - super().__init__(model_bytes, batch_size) + def __init__(self, model_bytes: bytes, max_batch_size: int = 1, **kwargs): + InferenceEngine.__init__(self, model_bytes, max_batch_size, engine_name="coreml") import coremltools as ct model_path = kwargs.get('model_path') @@ -25,10 +27,8 @@ cdef class CoreMLEngine(InferenceEngine): img_input = spec.description.input[0] self.img_width = int(img_input.type.imageType.width) self.img_height = int(img_input.type.imageType.height) - self.batch_size = 1 constants_inf.log(f'CoreML model: {self.img_width}x{self.img_height}') - self.engine_name = "coreml" @staticmethod def get_engine_filename(): @@ -48,29 +48,30 @@ cdef class CoreMLEngine(InferenceEngine): cdef tuple get_input_shape(self): return (self.img_height, self.img_width) - cdef int get_batch_size(self): - return 1 + cdef preprocess(self, list frames): + frame = frames[0] + rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + resized = cv2.resize(rgb, (self.img_width, self.img_height)) + return Image.fromarray(resized) cdef run(self, input_data): - cdef int w = self.img_width - cdef int h = self.img_height - - blob = input_data[0] - img_array = np.clip(blob * 255.0, 0, 255).astype(np.uint8) - img_array = np.transpose(img_array, (1, 2, 0)) - pil_img = Image.fromarray(img_array, 'RGB') - - pred = self.model.predict({ - 'image': pil_img, + predict = getattr(self.model, 'predict') + return predict({ + 'image': input_data, 'iouThreshold': 0.45, 'confidenceThreshold': 0.25, }) - coords = pred.get('coordinates', np.empty((0, 4), dtype=np.float32)) - confs = pred.get('confidence', np.empty((0, 80), dtype=np.float32)) + cdef list postprocess(self, output, object ai_config): + cdef int w = self.img_width + cdef int h = self.img_height + coords = output.get('coordinates', np.empty((0, 4), dtype=np.float32)) + confs = output.get('confidence', np.empty((0, 80), dtype=np.float32)) + + cdef list[Detection] detections = [] if coords.size == 0: - return [np.zeros((1, 0, 6), dtype=np.float32)] + return [detections] cx, cy, bw, bh = coords[:, 0], coords[:, 1], coords[:, 2], coords[:, 3] x1 = (cx - bw / 2) * w @@ -78,8 +79,22 @@ cdef class CoreMLEngine(InferenceEngine): x2 = (cx + bw / 2) * w y2 = (cy + bh / 2) * h - class_ids = np.argmax(confs, axis=1).astype(np.float32) + class_ids = np.argmax(confs, axis=1) conf_values = np.max(confs, axis=1) - dets = np.stack([x1, y1, x2, y2, conf_values, class_ids], axis=1) - return [dets[np.newaxis, :, :]] + for i in range(len(conf_values)): + conf = round(float(conf_values[i]), 2) + if conf < ai_config.probability_threshold: + continue + det_x1 = float(x1[i]) / w + det_y1 = float(y1[i]) / h + det_x2 = float(x2[i]) / w + det_y2 = float(y2[i]) / h + det_cx = (det_x1 + det_x2) / 2 + det_cy = (det_y1 + det_y2) / 2 + det_w = det_x2 - det_x1 + det_h = det_y2 - det_y1 + detections.append(Detection(det_cx, det_cy, det_w, det_h, int(class_ids[i]), conf)) + + filtered = self.remove_overlapping(detections, ai_config.tracking_intersection_threshold) + return [filtered] diff --git a/src/engines/inference_engine.pxd b/src/engines/inference_engine.pxd new file mode 100644 index 0000000..7a8c2d5 --- /dev/null +++ b/src/engines/inference_engine.pxd @@ -0,0 +1,12 @@ +from annotation cimport Detection + + +cdef class InferenceEngine: + cdef public int max_batch_size + cdef public str engine_name + cdef tuple get_input_shape(self) + cdef run(self, input_data) + cdef preprocess(self, list frames) + cdef list postprocess(self, output, object ai_config) + cdef list remove_overlapping(self, list[Detection] detections, float threshold) + cpdef list process_frames(self, list frames, object ai_config) diff --git a/src/engines/inference_engine.pyx b/src/engines/inference_engine.pyx new file mode 100644 index 0000000..97f4760 --- /dev/null +++ b/src/engines/inference_engine.pyx @@ -0,0 +1,106 @@ +import cv2 +import numpy as np +from annotation cimport Detection + +cdef class InferenceEngine: + def __init__(self, model_bytes: bytes, max_batch_size: int = 8, **kwargs): + self.max_batch_size = max_batch_size + self.engine_name = kwargs.get('engine_name', "onnx") + + @staticmethod + def get_engine_filename(): + return None + + @staticmethod + def get_source_filename(): + return None + + @staticmethod + def convert_from_source(bytes source_bytes): + return source_bytes + + cdef tuple get_input_shape(self): + raise NotImplementedError("Subclass must implement get_input_shape") + + cdef run(self, input_data): + raise NotImplementedError("Subclass must implement run") + + cdef preprocess(self, list frames): + cdef int h, w + h, w = self.get_input_shape() + blobs = [cv2.dnn.blobFromImage(frame, + scalefactor=1.0 / 255.0, + size=(w, h), + mean=(0, 0, 0), + swapRB=True, + crop=False) + for frame in frames] + return np.vstack(blobs) + + cdef list postprocess(self, output, object ai_config): + cdef list[Detection] detections + cdef int ann_index + cdef float x1, y1, x2, y2, conf + cdef int class_id + cdef list results = [] + cdef int h, w + h, w = self.get_input_shape() + + for ann_index in range(len(output[0])): + detections = [] + for det in output[0][ann_index]: + if det[4] == 0: + break + x1 = det[0] / w + y1 = det[1] / h + x2 = det[2] / w + y2 = det[3] / h + conf = round(det[4], 2) + class_id = int(det[5]) + + x = (x1 + x2) / 2 + y = (y1 + y2) / 2 + bw = x2 - x1 + bh = y2 - y1 + if conf >= ai_config.probability_threshold: + detections.append(Detection(x, y, bw, bh, class_id, conf)) + filtered = self.remove_overlapping(detections, ai_config.tracking_intersection_threshold) + results.append(filtered) + return results + + cdef list remove_overlapping(self, list[Detection] detections, float threshold): + cdef Detection det1, det2 + filtered_output = [] + filtered_out_indexes = [] + + for det1_index in range(len(detections)): + if det1_index in filtered_out_indexes: + continue + det1 = detections[det1_index] + res = det1_index + for det2_index in range(det1_index + 1, len(detections)): + det2 = detections[det2_index] + if det1.overlaps(det2, threshold): + if det1.confidence > det2.confidence or ( + det1.confidence == det2.confidence and det1.cls < det2.cls): + filtered_out_indexes.append(det2_index) + else: + filtered_out_indexes.append(res) + res = det2_index + filtered_output.append(detections[res]) + filtered_out_indexes.append(res) + return filtered_output + + cpdef list process_frames(self, list frames, object ai_config): + cdef int effective_batch = min(self.max_batch_size, ai_config.model_batch_size) + if effective_batch < 1: + effective_batch = 1 + cdef list all_detections = [] + cdef int i + for i in range(0, len(frames), effective_batch): + chunk = frames[i:i + effective_batch] + input_blob = self.preprocess(chunk) + raw_output = self.run(input_blob) + batch_dets = self.postprocess(raw_output, ai_config) + all_detections.extend(batch_dets) + return all_detections diff --git a/engines/onnx_engine.pxd b/src/engines/onnx_engine.pxd similarity index 90% rename from engines/onnx_engine.pxd rename to src/engines/onnx_engine.pxd index 85302b4..93da10a 100644 --- a/engines/onnx_engine.pxd +++ b/src/engines/onnx_engine.pxd @@ -10,5 +10,4 @@ cdef class OnnxEngine(InferenceEngine): cdef object input_shape cdef tuple get_input_shape(self) - cdef int get_batch_size(self) cdef run(self, input_data) diff --git a/engines/onnx_engine.pyx b/src/engines/onnx_engine.pyx similarity index 82% rename from engines/onnx_engine.pyx rename to src/engines/onnx_engine.pyx index 9770008..e539edc 100644 --- a/engines/onnx_engine.pyx +++ b/src/engines/onnx_engine.pyx @@ -14,8 +14,8 @@ def _select_providers(): return selected or ["CPUExecutionProvider"] cdef class OnnxEngine(InferenceEngine): - def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs): - super().__init__(model_bytes, batch_size) + def __init__(self, model_bytes: bytes, max_batch_size: int = 8, **kwargs): + InferenceEngine.__init__(self, model_bytes, max_batch_size) providers = _select_providers() constants_inf.log(f'ONNX providers: {providers}') @@ -23,7 +23,8 @@ cdef class OnnxEngine(InferenceEngine): self.model_inputs = self.session.get_inputs() self.input_name = self.model_inputs[0].name self.input_shape = self.model_inputs[0].shape - self.batch_size = self.input_shape[0] if self.input_shape[0] != -1 else batch_size + if self.input_shape[0] not in (-1, None, "N"): + self.max_batch_size = self.input_shape[0] constants_inf.log(f'AI detection model input: {self.model_inputs} {self.input_shape}') model_meta = self.session.get_modelmeta() constants_inf.log(f"Metadata: {model_meta.custom_metadata_map}") @@ -38,13 +39,10 @@ cdef class OnnxEngine(InferenceEngine): shape = self.input_shape return (shape[2], shape[3]) - cdef int get_batch_size(self): - return self.batch_size - cdef run(self, input_data): try: - return self.session.run(None, {self.input_name: input_data}) # type: ignore[attr-defined] + return self.session.run(None, {self.input_name: input_data}) except Exception: if self._cpu_session is not None: - return self._cpu_session.run(None, {self.input_name: input_data}) # type: ignore[attr-defined] - raise \ No newline at end of file + return self._cpu_session.run(None, {self.input_name: input_data}) + raise diff --git a/engines/tensorrt_engine.pxd b/src/engines/tensorrt_engine.pxd similarity index 85% rename from engines/tensorrt_engine.pxd rename to src/engines/tensorrt_engine.pxd index 53237ab..c44b2a3 100644 --- a/engines/tensorrt_engine.pxd +++ b/src/engines/tensorrt_engine.pxd @@ -12,13 +12,9 @@ cdef class TensorRTEngine(InferenceEngine): cdef object h_output cdef str output_name - cdef object output_shape + cdef list output_shape cdef object stream - cdef tuple get_input_shape(self) - - cdef int get_batch_size(self) - cdef run(self, input_data) diff --git a/engines/tensorrt_engine.pyx b/src/engines/tensorrt_engine.pyx similarity index 62% rename from engines/tensorrt_engine.pyx rename to src/engines/tensorrt_engine.pyx index 575f6b5..993584f 100644 --- a/engines/tensorrt_engine.pyx +++ b/src/engines/tensorrt_engine.pyx @@ -1,51 +1,50 @@ from engines.inference_engine cimport InferenceEngine import tensorrt as trt # pyright: ignore[reportMissingImports] import pycuda.driver as cuda # pyright: ignore[reportMissingImports] -import pycuda.autoinit # pyright: ignore[reportMissingImports] # required for automatically initialize CUDA, do not remove. +import pycuda.autoinit # pyright: ignore[reportMissingImports] import pynvml import numpy as np cimport constants_inf +GPU_MEMORY_FRACTION = 0.8 + cdef class TensorRTEngine(InferenceEngine): - def __init__(self, model_bytes: bytes, batch_size: int = 4, **kwargs): - super().__init__(model_bytes, batch_size) + def __init__(self, model_bytes: bytes, max_batch_size: int = 8, **kwargs): + InferenceEngine.__init__(self, model_bytes, max_batch_size, engine_name="tensorrt") try: logger = trt.Logger(trt.Logger.WARNING) - runtime = trt.Runtime(logger) engine = runtime.deserialize_cuda_engine(model_bytes) - if engine is None: - raise RuntimeError(f"Failed to load TensorRT engine from bytes") + raise RuntimeError("Failed to load TensorRT engine from bytes") self.context = engine.create_execution_context() - # input self.input_name = engine.get_tensor_name(0) engine_input_shape = engine.get_tensor_shape(self.input_name) - if engine_input_shape[0] != -1: - self.batch_size = engine_input_shape[0] - else: - self.batch_size = batch_size - self.input_shape = [ - self.batch_size, - engine_input_shape[1], # Channels (usually fixed at 3 for RGB) - 1280 if engine_input_shape[2] == -1 else engine_input_shape[2], # Height - 1280 if engine_input_shape[3] == -1 else engine_input_shape[3] # Width - ] + C = engine_input_shape[1] + H = 1280 if engine_input_shape[2] == -1 else engine_input_shape[2] + W = 1280 if engine_input_shape[3] == -1 else engine_input_shape[3] + + if engine_input_shape[0] == -1: + gpu_mem = TensorRTEngine.get_gpu_memory_bytes(0) + self.max_batch_size = TensorRTEngine.calculate_max_batch_size(gpu_mem, H, W) + else: + self.max_batch_size = engine_input_shape[0] + + self.input_shape = [self.max_batch_size, C, H, W] self.context.set_input_shape(self.input_name, self.input_shape) input_size = trt.volume(self.input_shape) * np.dtype(np.float32).itemsize self.d_input = cuda.mem_alloc(input_size) - # output self.output_name = engine.get_tensor_name(1) engine_output_shape = tuple(engine.get_tensor_shape(self.output_name)) self.output_shape = [ - self.batch_size, - 300 if engine_output_shape[1] == -1 else engine_output_shape[1], # max detections number - 6 if engine_output_shape[2] == -1 else engine_output_shape[2] # x1 y1 x2 y2 conf cls + self.max_batch_size, + 300 if engine_output_shape[1] == -1 else engine_output_shape[1], + 6 if engine_output_shape[2] == -1 else engine_output_shape[2], ] self.h_output = cuda.pagelocked_empty(tuple(self.output_shape), dtype=np.float32) self.d_output = cuda.mem_alloc(self.h_output.nbytes) @@ -54,7 +53,14 @@ cdef class TensorRTEngine(InferenceEngine): except Exception as e: raise RuntimeError(f"Failed to initialize TensorRT engine: {str(e)}") - self.engine_name = "tensorrt" + + @staticmethod + def calculate_max_batch_size(gpu_memory_bytes, int input_h, int input_w): + frame_input_bytes = 3 * input_h * input_w * 4 + estimated_per_frame = frame_input_bytes * 12 + available = gpu_memory_bytes * GPU_MEMORY_FRACTION + calculated = max(1, int(available / estimated_per_frame)) + return min(calculated, 32) @staticmethod def get_gpu_memory_bytes(int device_id): @@ -71,7 +77,7 @@ cdef class TensorRTEngine(InferenceEngine): pynvml.nvmlShutdown() except pynvml.NVMLError: pass - return 2 * 1024 * 1024 * 1024 if total_memory is None else total_memory # default 2 Gb + return 2 * 1024 * 1024 * 1024 if total_memory is None else total_memory @staticmethod def get_engine_filename(): @@ -91,7 +97,8 @@ cdef class TensorRTEngine(InferenceEngine): @staticmethod def convert_from_source(bytes onnx_model): - workspace_bytes = int(TensorRTEngine.get_gpu_memory_bytes(0) * 0.9) + gpu_mem = TensorRTEngine.get_gpu_memory_bytes(0) + workspace_bytes = int(gpu_mem * 0.9) explicit_batch_flag = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) trt_logger = trt.Logger(trt.Logger.WARNING) @@ -106,13 +113,30 @@ cdef class TensorRTEngine(InferenceEngine): if not parser.parse(onnx_model): return None + input_tensor = network.get_input(0) + shape = input_tensor.shape + C = shape[1] + H = max(shape[2], 1280) if shape[2] != -1 else 1280 + W = max(shape[3], 1280) if shape[3] != -1 else 1280 + + if shape[0] == -1: + max_batch = TensorRTEngine.calculate_max_batch_size(gpu_mem, H, W) + profile = builder.create_optimization_profile() + profile.set_shape( + input_tensor.name, + (1, C, H, W), + (max_batch, C, H, W), + (max_batch, C, H, W), + ) + config.add_optimization_profile(profile) + if builder.platform_has_fast_fp16: constants_inf.log('Converting to supported fp16') config.set_flag(trt.BuilderFlag.FP16) else: constants_inf.log('Converting to supported fp32. (fp16 is not supported)') - plan = builder.build_serialized_network(network, config) + plan = builder.build_serialized_network(network, config) if plan is None: constants_inf.logerror('Conversion failed.') return None @@ -122,20 +146,23 @@ cdef class TensorRTEngine(InferenceEngine): cdef tuple get_input_shape(self): return (self.input_shape[2], self.input_shape[3]) - cdef int get_batch_size(self): - return self.batch_size - cdef run(self, input_data): try: - cuda.memcpy_htod_async(self.d_input, input_data, self.stream) - self.context.set_tensor_address(self.input_name, int(self.d_input)) # type: ignore - self.context.set_tensor_address(self.output_name, int(self.d_output)) # type: ignore + actual_batch = input_data.shape[0] + if actual_batch != self.input_shape[0]: + actual_shape = [actual_batch, self.input_shape[1], self.input_shape[2], self.input_shape[3]] + self.context.set_input_shape(self.input_name, actual_shape) - self.context.execute_async_v3(stream_handle=self.stream.handle) # type: ignore - self.stream.synchronize() # type: ignore + cuda.memcpy_htod_async(self.d_input, input_data, self.stream) + self.context.set_tensor_address(self.input_name, int(self.d_input)) + self.context.set_tensor_address(self.output_name, int(self.d_output)) + + self.context.execute_async_v3(stream_handle=self.stream.handle) + self.stream.synchronize() cuda.memcpy_dtoh(self.h_output, self.d_output) - output = self.h_output.reshape(self.output_shape) # type: ignore + output_shape = [actual_batch, self.output_shape[1], self.output_shape[2]] + output = self.h_output[:actual_batch].reshape(output_shape) return [output] except Exception as e: diff --git a/inference.pyx b/src/inference.pyx similarity index 64% rename from inference.pyx rename to src/inference.pyx index c4b6ef5..4bc9996 100644 --- a/inference.pyx +++ b/src/inference.pyx @@ -2,7 +2,6 @@ import mimetypes from pathlib import Path import cv2 -import numpy as np cimport constants_inf from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus @@ -26,8 +25,6 @@ cdef class Inference: cdef bint stop_signal cdef public AIAvailabilityStatus ai_availability_status cdef str model_input - cdef int model_width - cdef int model_height cdef bytes _converted_model_bytes cdef bint is_building_engine @@ -37,8 +34,6 @@ cdef class Inference: self._status_callback = None self.stop_signal = False self.model_input = None - self.model_width = 0 - self.model_height = 0 self.detection_counts = {} self.engine = None self.is_building_engine = False @@ -96,7 +91,6 @@ cdef class Inference: try: self.engine = EngineClass(self._converted_model_bytes) self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED) - self.model_height, self.model_width = self.engine.get_input_shape() except Exception as e: self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, str(e)) finally: @@ -130,92 +124,14 @@ cdef class Inference: self.engine = EngineClass(self.download_model(constants_inf.AI_ONNX_MODEL_FILE)) self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED) self.is_building_engine = False - - self.model_height, self.model_width = self.engine.get_input_shape() except Exception as e: self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, str(e)) self.is_building_engine = False - - cdef preprocess(self, frames): - blobs = [cv2.dnn.blobFromImage(frame, - scalefactor=1.0 / 255.0, - size=(self.model_width, self.model_height), - mean=(0, 0, 0), - swapRB=True, - crop=False) - for frame in frames] - return np.vstack(blobs) - - cdef postprocess(self, output, ai_config): - cdef list[Detection] detections = [] - cdef int ann_index - cdef float x1, y1, x2, y2, conf, cx, cy, w, h - cdef int class_id - cdef list[list[Detection]] results = [] - try: - for ann_index in range(len(output[0])): - detections.clear() - for det in output[0][ann_index]: - if det[4] == 0: # if confidence is 0 then valid points are over. - break - x1 = det[0] / self.model_width - y1 = det[1] / self.model_height - x2 = det[2] / self.model_width - y2 = det[3] / self.model_height - conf = round(det[4], 2) - class_id = int(det[5]) - - x = (x1 + x2) / 2 - y = (y1 + y2) / 2 - w = x2 - x1 - h = y2 - y1 - if conf >= ai_config.probability_threshold: - detections.append(Detection(x, y, w, h, class_id, conf)) # type: ignore[call-arg] - filtered_detections = self.remove_overlapping_detections(detections, ai_config.tracking_intersection_threshold) - results.append(filtered_detections) - return results - except Exception as e: - raise RuntimeError(f"Failed to postprocess: {str(e)}") - - cdef remove_overlapping_detections(self, list[Detection] detections, float confidence_threshold=0.6): - cdef Detection det1, det2 - filtered_output = [] - filtered_out_indexes = [] - - for det1_index in range(len(detections)): - if det1_index in filtered_out_indexes: - continue - det1 = detections[det1_index] - res = det1_index - for det2_index in range(det1_index + 1, len(detections)): - det2 = detections[det2_index] - if det1.overlaps(det2, confidence_threshold): - if det1.confidence > det2.confidence or ( - det1.confidence == det2.confidence and det1.cls < det2.cls): # det1 has higher confidence or lower class_id - filtered_out_indexes.append(det2_index) - else: - filtered_out_indexes.append(res) - res = det2_index - filtered_output.append(detections[res]) - filtered_out_indexes.append(res) - return filtered_output - cdef bint is_video(self, str filepath): mime_type, _ = mimetypes.guess_type(filepath) return (mime_type and mime_type.startswith("video")) - cdef split_list_extend(self, lst, chunk_size): - chunks = [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)] - - # If the last chunk is smaller than the desired chunk_size, extend it by duplicating its last element. - last_chunk = chunks[len(chunks) - 1] - if len(last_chunk) < chunk_size: - last_elem = last_chunk[len(last_chunk)-1] - while len(last_chunk) < chunk_size: - last_chunk.append(last_elem) - return chunks - cpdef run_detect(self, dict config_dict, object annotation_callback, object status_callback=None): cdef list[str] videos = [] cdef list[str] images = [] @@ -247,44 +163,17 @@ cdef class Inference: constants_inf.log(f'run inference on {v}...') self._process_video(ai_config, v) - cpdef list detect_single_image(self, bytes image_bytes, dict config_dict): - cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_dict(config_dict) - self.init_ai() - if self.engine is None: - raise RuntimeError("AI engine not available") - - img_array = np.frombuffer(image_bytes, dtype=np.uint8) - frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR) - if frame is None: - raise ValueError("Invalid image data") - - cdef int batch_size = self.engine.get_batch_size() - frames = [frame] * batch_size - input_blob = self.preprocess(frames) - outputs = self.engine.run(input_blob) - list_detections = self.postprocess(outputs, ai_config) - if not list_detections: - return [] - - cdef list[Detection] detections = list_detections[0] - if ai_config.focal_length > 0 and ai_config.sensor_width > 0: - img_h, img_w = frame.shape[0], frame.shape[1] - gsd = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w) - detections = [ - d for d in detections - if d.w * img_w * gsd <= constants_inf.annotations_dict[d.cls].max_object_size_meters - and d.h * img_h * gsd <= constants_inf.annotations_dict[d.cls].max_object_size_meters - ] - return detections - cdef _process_video(self, AIRecognitionConfig ai_config, str video_name): cdef int frame_count = 0 cdef int batch_count = 0 cdef list batch_frames = [] cdef list[long] batch_timestamps = [] cdef Annotation annotation + cdef int model_h, model_w self._previous_annotation = None + model_h, model_w = self.engine.get_input_shape() + v_input = cv2.VideoCapture(video_name) if not v_input.isOpened(): constants_inf.logerror(f'Failed to open video: {video_name}') @@ -294,6 +183,11 @@ cdef class Inference: width = int(v_input.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(v_input.get(cv2.CAP_PROP_FRAME_HEIGHT)) constants_inf.log(f'Video: {total_frames} frames, {fps:.1f} fps, {width}x{height}') + + cdef int effective_batch = min(self.engine.max_batch_size, ai_config.model_batch_size) + if effective_batch < 1: + effective_batch = 1 + while v_input.isOpened() and not self.stop_signal: ret, frame = v_input.read() if not ret or frame is None: @@ -304,41 +198,48 @@ cdef class Inference: batch_frames.append(frame) batch_timestamps.append(v_input.get(cv2.CAP_PROP_POS_MSEC)) - if len(batch_frames) == self.engine.get_batch_size(): + if len(batch_frames) >= effective_batch: batch_count += 1 constants_inf.log(f'Video batch {batch_count}: frame {frame_count}/{total_frames} ({frame_count*100//total_frames}%)') - input_blob = self.preprocess(batch_frames) + self._process_video_batch(ai_config, batch_frames, batch_timestamps, video_name, frame_count, total_frames, model_w) + batch_frames = [] + batch_timestamps = [] - outputs = self.engine.run(input_blob) + if batch_frames: + batch_count += 1 + constants_inf.log(f'Video batch {batch_count} (flush): {len(batch_frames)} remaining frames') + self._process_video_batch(ai_config, batch_frames, batch_timestamps, video_name, frame_count, total_frames, model_w) - list_detections = self.postprocess(outputs, ai_config) - total_dets = sum(len(d) for d in list_detections) - if total_dets > 0: - constants_inf.log(f'Video batch {batch_count}: {total_dets} detections from postprocess') - for i in range(len(list_detections)): - detections = list_detections[i] - - original_media_name = Path(video_name).stem.replace(" ", "") - name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}' - annotation = Annotation(name, original_media_name, batch_timestamps[i], detections) # type: ignore[call-arg] - - if detections: - valid = self.is_valid_video_annotation(annotation, ai_config) - constants_inf.log(f'Video frame {name}: {len(detections)} dets, valid={valid}') - if valid: - _, image = cv2.imencode('.jpg', batch_frames[i]) - annotation.image = image.tobytes() - self._previous_annotation = annotation - self.on_annotation(annotation, frame_count, total_frames) - else: - self.is_valid_video_annotation(annotation, ai_config) - - batch_frames.clear() - batch_timestamps.clear() v_input.release() constants_inf.log(f'Video done: {frame_count} frames read, {batch_count} batches processed') self.send_detection_status() + cdef _process_video_batch(self, AIRecognitionConfig ai_config, list batch_frames, + list batch_timestamps, str video_name, + int frame_count, int total_frames, int model_w): + cdef Annotation annotation + list_detections = self.engine.process_frames(batch_frames, ai_config) + total_dets = sum(len(d) for d in list_detections) + if total_dets > 0: + constants_inf.log(f'Video batch: {total_dets} detections from postprocess') + + for i in range(len(list_detections)): + detections = list_detections[i] + original_media_name = Path(video_name).stem.replace(" ", "") + name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}' + annotation = Annotation(name, original_media_name, batch_timestamps[i], detections) + + if detections: + valid = self.is_valid_video_annotation(annotation, ai_config, model_w) + constants_inf.log(f'Video frame {name}: {len(detections)} dets, valid={valid}') + if valid: + _, image = cv2.imencode('.jpg', batch_frames[i]) + annotation.image = image.tobytes() + self._previous_annotation = annotation + self.on_annotation(annotation, frame_count, total_frames) + else: + self.is_valid_video_annotation(annotation, ai_config, model_w) + cdef on_annotation(self, Annotation annotation, int frame_count=0, int total_frames=0): self.detection_counts[annotation.original_media_name] = self.detection_counts.get(annotation.original_media_name, 0) + 1 if self._annotation_callback is not None: @@ -347,35 +248,53 @@ cdef class Inference: cb(annotation, percent) cdef _process_images(self, AIRecognitionConfig ai_config, list[str] image_paths): - cdef list frame_data + cdef list all_frame_data = [] cdef double ground_sampling_distance + cdef int model_h, model_w + + model_h, model_w = self.engine.get_input_shape() self._tile_detections = {} + for path in image_paths: - frame_data = [] frame = cv2.imread(path) - img_h, img_w, _ = frame.shape if frame is None: constants_inf.logerror(f'Failed to read image {path}') continue + img_h, img_w, _ = frame.shape original_media_name = Path( path).stem.replace(" ", "") ground_sampling_distance = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w) constants_inf.log(f'ground sampling distance: {ground_sampling_distance}') - if img_h <= 1.5 * self.model_height and img_w <= 1.5 * self.model_width: - frame_data.append((frame, original_media_name, f'{original_media_name}_000000')) + if img_h <= 1.5 * model_h and img_w <= 1.5 * model_w: + all_frame_data.append((frame, original_media_name, f'{original_media_name}_000000', ground_sampling_distance)) else: tile_size = int(constants_inf.METERS_IN_TILE / ground_sampling_distance) constants_inf.log( f'calc tile size: {tile_size}') res = self.split_to_tiles(frame, path, tile_size, ai_config.big_image_tile_overlap_percent) - frame_data.extend(res) - if len(frame_data) > self.engine.get_batch_size(): - for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()): - self._process_images_inner(ai_config, chunk, ground_sampling_distance) - self.send_detection_status() + for tile_frame, omn, tile_name in res: + all_frame_data.append((tile_frame, omn, tile_name, ground_sampling_distance)) + + if not all_frame_data: + return + + frames = [fd[0] for fd in all_frame_data] + all_dets = self.engine.process_frames(frames, ai_config) + + for i in range(len(all_dets)): + frame_entry = all_frame_data[i] + f = frame_entry[0] + original_media_name = frame_entry[1] + name = frame_entry[2] + gsd = frame_entry[3] + + annotation = Annotation(name, original_media_name, 0, all_dets[i]) + if self.is_valid_image_annotation(annotation, gsd, f.shape): + constants_inf.log( f'Detected {annotation}') + _, image = cv2.imencode('.jpg', f) + annotation.image = image.tobytes() + self.on_annotation(annotation) - for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()): - self._process_images_inner(ai_config, chunk, ground_sampling_distance) self.send_detection_status() cdef send_detection_status(self): @@ -398,14 +317,13 @@ cdef class Inference: x_end = min(x + tile_size, img_w) y_end = min(y + tile_size, img_h) - # correct x,y for the close-to-border tiles if x_end - x < tile_size: if img_w - (x - stride_w) <= tile_size: - continue # the previous tile already covered the last gap + continue x = img_w - tile_size if y_end - y < tile_size: if img_h - (y - stride_h) <= tile_size: - continue # the previous tile already covered the last gap + continue y = img_h - tile_size tile = frame[y:y_end, x:x_end] @@ -413,24 +331,6 @@ cdef class Inference: results.append((tile, original_media_name, name)) return results - cdef _process_images_inner(self, AIRecognitionConfig ai_config, list frame_data, double ground_sampling_distance): - cdef list frames, original_media_names, names - cdef Annotation annotation - cdef int i - frames, original_media_names, names = map(list, zip(*frame_data)) - - input_blob = self.preprocess(frames) - outputs = self.engine.run(input_blob) - - list_detections = self.postprocess(outputs, ai_config) - for i in range(len(list_detections)): - annotation = Annotation(names[i], original_media_names[i], 0, list_detections[i]) # type: ignore[call-arg] - if self.is_valid_image_annotation(annotation, ground_sampling_distance, frames[i].shape): - constants_inf.log( f'Detected {annotation}') - _, image = cv2.imencode('.jpg', frames[i]) - annotation.image = image.tobytes() - self.on_annotation(annotation) - cpdef stop(self): self.stop_signal = True @@ -449,7 +349,7 @@ cdef class Inference: for det in annotation.detections: x1 = det.x * tile_size y1 = det.y * tile_size - det_abs = Detection(x + x1, y + y1, det.w * tile_size, det.h * tile_size, det.cls, det.confidence) # type: ignore[call-arg] + det_abs = Detection(x + x1, y + y1, det.w * tile_size, det.h * tile_size, det.cls, det.confidence) if det_abs not in existing_abs_detections: unique_detections.append(det) @@ -482,7 +382,7 @@ cdef class Inference: return False return True - cdef bint is_valid_video_annotation(self, Annotation annotation, AIRecognitionConfig ai_config): + cdef bint is_valid_video_annotation(self, Annotation annotation, AIRecognitionConfig ai_config, int model_w): if constants_inf.SPLIT_SUFFIX in annotation.name: self.remove_tiled_duplicates(annotation) if not annotation.detections: @@ -515,7 +415,7 @@ cdef class Inference: min_distance_sq = distance_sq closest_det = prev_det - dist_px = ai_config.tracking_distance_confidence * self.model_width + dist_px = ai_config.tracking_distance_confidence * model_w dist_px_sq = dist_px * dist_px if min_distance_sq > dist_px_sq: return True diff --git a/loader_http_client.pxd b/src/loader_http_client.pxd similarity index 93% rename from loader_http_client.pxd rename to src/loader_http_client.pxd index 47ee170..d0946fc 100644 --- a/loader_http_client.pxd +++ b/src/loader_http_client.pxd @@ -6,4 +6,3 @@ cdef class LoaderHttpClient: cdef str base_url cdef LoadResult load_big_small_resource(self, str filename, str directory) cdef LoadResult upload_big_small_resource(self, bytes content, str filename, str directory) - cdef stop(self) diff --git a/loader_http_client.pyx b/src/loader_http_client.pyx similarity index 93% rename from loader_http_client.pyx rename to src/loader_http_client.pyx index 40cf72c..2a275da 100644 --- a/loader_http_client.pyx +++ b/src/loader_http_client.pyx @@ -1,6 +1,8 @@ import requests from loguru import logger +HTTP_TIMEOUT = 120 + cdef class LoadResult: def __init__(self, err, data=None): @@ -18,6 +20,7 @@ cdef class LoaderHttpClient: f"{self.base_url}/load/{filename}", json={"filename": filename, "folder": directory}, stream=True, + timeout=HTTP_TIMEOUT, ) response.raise_for_status() return LoadResult(None, response.content) @@ -31,12 +34,10 @@ cdef class LoaderHttpClient: f"{self.base_url}/upload/{filename}", files={"data": (filename, content)}, data={"folder": directory}, + timeout=HTTP_TIMEOUT, ) response.raise_for_status() return LoadResult(None) except Exception as e: logger.error(f"LoaderHttpClient.upload_big_small_resource failed: {e}") return LoadResult(str(e)) - - cdef stop(self): - pass diff --git a/main.py b/src/main.py similarity index 91% rename from main.py rename to src/main.py index f9314fa..67dda84 100644 --- a/main.py +++ b/src/main.py @@ -100,7 +100,7 @@ class AIConfigDto(BaseModel): tracking_distance_confidence: float = 0.0 tracking_probability_increase: float = 0.0 tracking_intersection_threshold: float = 0.6 - model_batch_size: int = 1 + model_batch_size: int = 8 big_image_tile_overlap_percent: int = 20 altitude: float = 400 focal_length: float = 24 @@ -150,28 +150,46 @@ async def detect_image( file: UploadFile = File(...), config: Optional[str] = Form(None), ): + import tempfile + import cv2 + import numpy as np + image_bytes = await file.read() if not image_bytes: raise HTTPException(status_code=400, detail="Image is empty") + arr = np.frombuffer(image_bytes, dtype=np.uint8) + if cv2.imdecode(arr, cv2.IMREAD_COLOR) is None: + raise HTTPException(status_code=400, detail="Invalid image data") + config_dict = {} if config: config_dict = json.loads(config) - loop = asyncio.get_event_loop() + suffix = os.path.splitext(file.filename or "upload.jpg")[1] or ".jpg" + tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) try: + tmp.write(image_bytes) + tmp.close() + config_dict["paths"] = [tmp.name] + + loop = asyncio.get_event_loop() inf = get_inference() - detections = await loop.run_in_executor( - executor, inf.detect_single_image, image_bytes, config_dict - ) + results = [] + + def on_annotation(annotation, percent): + results.extend(annotation.detections) + + await loop.run_in_executor(executor, inf.run_detect, config_dict, on_annotation) + return [detection_to_dto(d) for d in results] except RuntimeError as e: if "not available" in str(e): raise HTTPException(status_code=503, detail=str(e)) raise HTTPException(status_code=422, detail=str(e)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) - - return [detection_to_dto(d) for d in detections] + finally: + os.unlink(tmp.name) def _post_annotation_to_service(token_mgr: TokenManager, media_id: str,