# Module: annotation-queue/annotation_queue_handler

## Purpose

Async consumer for the Azaion annotation queue (RabbitMQ Streams). Listens for annotation CRUD events and writes/moves image+label files on the filesystem.

## Public Interface

### AnnotationQueueHandler

| Method | Signature | Returns | Description |
|--------|-----------|---------|-------------|
| `__init__` | `()` | — | Reads config.yaml, creates directories, initializes rstream Consumer, reads offset |
| `start` | `async ()` | — | Starts consumer, subscribes to queue stream, runs event loop |
| `on_message` | `(message: AMQPMessage, context: MessageContext)` | — | Message callback: routes by AnnotationStatus to save/validate/delete |
| `save_annotation` | `(ann: AnnotationMessage)` | — | Writes label file + image to data or seed directory based on role |
| `validate` | `(msg: AnnotationBulkMessage)` | — | Moves annotations from seed to data directory |
| `delete` | `(msg: AnnotationBulkMessage)` | — | Moves annotations to deleted directory |

### AnnotationQueueHandler.AnnotationName (inner class)

Helper that pre-computes file paths for an annotation name across data/seed directories.

## Internal Logic

- **Queue protocol**: Subscribes to a RabbitMQ Streams queue using rstream library with AMQP message decoding. Resumes from a persisted offset stored in `offset.yaml`.
- **Message routing** (via `application_properties['AnnotationStatus']`):
  - `Created` / `Edited` → `save_annotation`: If validator role, writes to data dir; else writes to seed dir. For Created status, also saves the image bytes. For Edited by validator, moves image from seed to data.
  - `Validated` → `validate`: Bulk-moves all named annotations from seed to data directory.
  - `Deleted` → `delete`: Bulk-moves all named annotations to the deleted directory.
- **Offset tracking**: After each message, increments offset and persists to `offset.yaml`.
- **Directory layout**:
  - `{root}/data/images/` + `{root}/data/labels/` — validated annotations
  - `{root}/data-seed/images/` + `{root}/data-seed/labels/` — unvalidated annotations
  - `{root}/data_deleted/images/` + `{root}/data_deleted/labels/` — soft-deleted annotations
- **Logging**: TimedRotatingFileHandler with daily rotation, 7-day retention, logs to `logs/` directory.

## Dependencies

- `annotation_queue_dto` — AnnotationStatus, AnnotationMessage, AnnotationBulkMessage
- `rstream` (external) — RabbitMQ Streams consumer
- `yaml` (external) — config and offset persistence
- `asyncio`, `os`, `shutil`, `sys`, `logging`, `datetime` (stdlib)

## Consumers

None (entry point — runs via `__main__`).

## Data Models

Uses AnnotationMessage, AnnotationBulkMessage from annotation_queue_dto.

## Configuration

- `config.yaml`: API creds (url, email, password), queue config (host, port, consumer_user, consumer_pw, name), directory structure (root, data, data_seed, data_processed, data_deleted, images, labels)
- `offset.yaml`: persisted queue consumer offset

## External Integrations

- RabbitMQ Streams queue (rstream library) on host `188.245.120.247:5552`
- Filesystem: `/azaion/data/`, `/azaion/data-seed/`, `/azaion/data_deleted/`

## Security

- Queue credentials in `config.yaml` (hardcoded — security concern)
- No encryption of annotation data at rest

## Tests

None.
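
## Example: Message Routing Sketch

The routing rules listed under Internal Logic can be illustrated with a small, stdlib-only sketch. It is not the module's actual code: `Layout`, `route`, `move_all`, and `_move` are hypothetical names, the `AnnotationStatus` enum is a stand-in for the one in `annotation_queue_dto`, the `.jpg`/`.txt` extensions are assumed, and the sketch assumes deletions are moved out of the data directory (the description above does not say where they are moved from). The rstream consumer and offset persistence are deliberately left out.

```python
import shutil
import tempfile
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Iterable, Tuple


class AnnotationStatus(Enum):
    """Stand-in for the enum in annotation_queue_dto (member values assumed)."""
    CREATED = "Created"
    EDITED = "Edited"
    VALIDATED = "Validated"
    DELETED = "Deleted"


@dataclass(frozen=True)
class Layout:
    """Hypothetical helper mirroring the documented directory layout."""
    root: Path
    data: str = "data"
    seed: str = "data-seed"
    deleted: str = "data_deleted"

    def paths(self, area: str, name: str) -> Tuple[Path, Path]:
        # Returns (image_path, label_path); the extensions are assumptions.
        base = self.root / area
        return base / "images" / f"{name}.jpg", base / "labels" / f"{name}.txt"


def _move(src: Path, dst: Path) -> None:
    """Move src to dst if src exists, creating the destination directory."""
    if src.exists():
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(src), str(dst))


def save_annotation(layout: Layout, status: AnnotationStatus, name: str,
                    label_text: str, image: bytes = b"",
                    is_validator: bool = False) -> None:
    # Validators write straight to data; everyone else writes to seed.
    area = layout.data if is_validator else layout.seed
    image_path, label_path = layout.paths(area, name)
    label_path.parent.mkdir(parents=True, exist_ok=True)
    label_path.write_text(label_text)
    if status is AnnotationStatus.CREATED:
        # Only Created messages are described as carrying image bytes to save.
        image_path.parent.mkdir(parents=True, exist_ok=True)
        image_path.write_bytes(image)
    elif is_validator:
        # Edited by a validator: the existing image moves from seed to data.
        _move(layout.paths(layout.seed, name)[0], image_path)


def move_all(layout: Layout, names: Iterable[str],
             src_area: str, dst_area: str) -> None:
    """Bulk-move image and label files for each name between two areas."""
    for name in names:
        for src, dst in zip(layout.paths(src_area, name),
                            layout.paths(dst_area, name)):
            _move(src, dst)


def route(layout: Layout, status: AnnotationStatus, **payload) -> None:
    """Dispatch on status, as on_message is described as doing."""
    if status in (AnnotationStatus.CREATED, AnnotationStatus.EDITED):
        save_annotation(layout, status, **payload)
    elif status is AnnotationStatus.VALIDATED:
        move_all(layout, payload["names"], layout.seed, layout.data)
    elif status is AnnotationStatus.DELETED:
        # Assumption: deletions are taken from the data directory.
        move_all(layout, payload["names"], layout.data, layout.deleted)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        layout = Layout(Path(tmp))
        route(layout, AnnotationStatus.CREATED,
              name="img1", label_text="label contents", image=b"\xff\xd8")
        route(layout, AnnotationStatus.VALIDATED, names=["img1"])
        print(layout.paths(layout.data, "img1"))
```

Keeping the path computation in one helper (the role `AnnotationName` plays in the real module) means the save, validate, and delete paths all agree on where a given annotation's image and label live.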