# Component: Annotation Queue Service

## Overview

Self-contained async service that consumes annotation CRUD events from a RabbitMQ Streams queue and persists images and labels to the filesystem. Operates independently of the training pipeline.

**Pattern**: Message-driven event handler / consumer service
**Upstream**: External RabbitMQ Streams queue (Azaion platform)
**Downstream**: Data Pipeline (files written become input for augmentation)

## Modules

- `annotation-queue/annotation_queue_dto` — message DTOs (AnnotationMessage, AnnotationBulkMessage, AnnotationStatus, Detection, etc.)
- `annotation-queue/annotation_queue_handler` — async queue consumer with message routing and file management

## Internal Interfaces

### AnnotationQueueHandler

```python
AnnotationQueueHandler()
async AnnotationQueueHandler.start() -> None
AnnotationQueueHandler.on_message(message: AMQPMessage, context: MessageContext) -> None
AnnotationQueueHandler.save_annotation(ann: AnnotationMessage) -> None
AnnotationQueueHandler.validate(msg: AnnotationBulkMessage) -> None
AnnotationQueueHandler.delete(msg: AnnotationBulkMessage) -> None
```

### Key DTOs

```python
AnnotationMessage(msgpack_bytes)      # Full annotation with image + detections
AnnotationBulkMessage(msgpack_bytes)  # Bulk validate/delete
AnnotationStatus: Created(10), Edited(20), Validated(30), Deleted(40)
RoleEnum: Operator(10), Validator(20), CompanionPC(30), Admin(40), ApiAdmin(1000)
```

## Data Access Patterns

- **Queue**: Consumes from the RabbitMQ Streams queue `azaion-annotations` using the rstream library
- **Offset persistence**: `offset.yaml` tracks the last processed message offset for resume
- **Filesystem writes**:
  - Validated annotations → `{root}/data/images/` + `{root}/data/labels/`
  - Unvalidated (seed) → `{root}/data-seed/images/` + `{root}/data-seed/labels/`
  - Deleted → `{root}/data_deleted/images/` + `{root}/data_deleted/labels/`

## Implementation Details

- **Message routing**: Based on `AnnotationStatus` from AMQP application
properties:
  - Created/Edited → save label + optionally image; validator roles write to data, operator to seed
  - Validated (bulk) → move from seed to data
  - Deleted (bulk) → move to deleted directory
- **Role-based logic**: `RoleEnum.is_validator()` returns True for Validator, Admin, ApiAdmin — these roles write directly to the validated data directory
- **Serialization**: Messages are msgpack-encoded with positional integer keys. Detections are embedded as a JSON string within the msgpack payload.
- **Offset tracking**: After each successfully processed message, the offset is persisted to `offset.yaml` (survives restarts)
- **Logging**: TimedRotatingFileHandler with daily rotation, 7-day retention, writes to the `logs/` directory
- **Separate dependencies**: Own `requirements.txt` (pyyaml, msgpack, rstream only)
- **Own config.yaml**: Points to test directories by default (`data-test`, `data-test-seed`)

## Caveats

- Credentials hardcoded in `config.yaml` (queue host, user, password)
- `AnnotationClass` duplicated (third copy) with slight differences from the `dto/` version
- No reconnection logic for queue disconnections
- No dead-letter queue or message retry on processing failures
- `save_annotation` writes empty label files when the detections list has no newline separators between entries
- The annotation-queue `config.yaml` uses different directory names (`data-test` vs `data`) than the main `config.yaml` — likely a test vs. production configuration issue

## Dependency Graph

```mermaid
graph TD
    annotation_queue_dto --> annotation_queue_handler
    rstream_ext[rstream library] --> annotation_queue_handler
    msgpack_ext[msgpack library] --> annotation_queue_dto
```

## Logging Strategy

`logging` module with TimedRotatingFileHandler. Format: `HH:MM:SS|message`. Daily rotation, 7-day retention. Also outputs to stdout.
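The serialization scheme described under Implementation Details (msgpack with positional integer keys, detections embedded as a JSON string) can be sketched as below. The key numbers (0/1/2) and field names are assumptions for illustration; the real mapping lives in `annotation_queue_dto` and may differ.

```python
import json

import msgpack

# Hypothetical positional keys -- the actual assignments are defined
# in annotation_queue_dto and may not match these numbers.
KEY_STATUS, KEY_IMAGE, KEY_DETECTIONS = 0, 1, 2


def decode_annotation(payload: bytes) -> dict:
    """Decode a msgpack payload whose detections field is an embedded JSON string."""
    # strict_map_key=False is required because the map keys are integers,
    # which msgpack-python rejects by default.
    raw = msgpack.unpackb(payload, strict_map_key=False)
    return {
        "status": raw[KEY_STATUS],
        "image": raw[KEY_IMAGE],
        "detections": json.loads(raw[KEY_DETECTIONS]),
    }


# Round-trip demo with a fabricated message
msg = msgpack.packb({
    KEY_STATUS: 30,  # Validated
    KEY_IMAGE: b"\x89PNG",  # image bytes (truncated placeholder)
    KEY_DETECTIONS: json.dumps([{"cls": 0, "bbox": [0.1, 0.2, 0.3, 0.4]}]),
})
ann = decode_annotation(msg)
```

Embedding detections as a JSON string inside msgpack means consumers must do a second deserialization step, which is worth keeping in mind when debugging malformed label files.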
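Offset tracking as described above (persist after each processed message, resume after restart) amounts to a small read/write pair around `offset.yaml`. This is a minimal sketch assuming a single `offset` key in the file; the handler's actual schema may differ.

```python
import os

import yaml

OFFSET_FILE = "offset.yaml"  # file name taken from the component description


def load_offset(path: str = OFFSET_FILE) -> int:
    """Return the last processed stream offset, or 0 on first run."""
    if not os.path.exists(path):
        return 0
    with open(path) as fh:
        data = yaml.safe_load(fh) or {}
    return int(data.get("offset", 0))


def store_offset(offset: int, path: str = OFFSET_FILE) -> None:
    """Persist the offset after each successfully processed message."""
    with open(path, "w") as fh:
        yaml.safe_dump({"offset": offset}, fh)
```

On startup the stored value would be handed to rstream's offset specification so consumption resumes where it left off. Note that persisting after processing gives at-least-once semantics: a crash between processing and persisting replays the last message.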
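The logging strategy above is reproducible with the stdlib alone. A sketch follows; the logger name and log file name are assumptions, but the rotation, retention, format, and stdout mirroring match the description.

```python
import logging
import os
import sys
from logging.handlers import TimedRotatingFileHandler


def build_logger(log_dir: str = "logs") -> logging.Logger:
    """Logger with daily rotation, 7-day retention, HH:MM:SS|message format,
    and a duplicate stream to stdout."""
    os.makedirs(log_dir, exist_ok=True)
    logger = logging.getLogger("annotation_queue")  # name is an assumption
    logger.setLevel(logging.INFO)
    fmt = logging.Formatter("%(asctime)s|%(message)s", datefmt="%H:%M:%S")

    file_handler = TimedRotatingFileHandler(
        os.path.join(log_dir, "annotation_queue.log"),  # file name is an assumption
        when="midnight",  # rotate once per day
        backupCount=7,    # keep 7 rotated files (7-day retention)
    )
    file_handler.setFormatter(fmt)
    logger.addHandler(file_handler)

    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(fmt)
    logger.addHandler(stream_handler)
    return logger
```

`backupCount=7` with `when="midnight"` is the standard way to express "daily rotation, 7-day retention" with `TimedRotatingFileHandler`.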