Module: annotation-queue/annotation_queue_handler

Purpose

Async consumer for the Azaion annotation queue (RabbitMQ Streams). Listens for annotation CRUD events and writes/moves image+label files on the filesystem.

Public Interface

AnnotationQueueHandler

| Method | Signature | Returns | Description |
|---|---|---|---|
| __init__ | () | | Reads config.yaml, creates directories, initializes the rstream Consumer, reads the offset |
| start | async () | | Starts the consumer, subscribes to the queue stream, runs the event loop |
| on_message | (message: AMQPMessage, context: MessageContext) | | Message callback: routes by AnnotationStatus to save/validate/delete |
| save_annotation | (ann: AnnotationMessage) | | Writes the label file and image to the data or seed directory, depending on role |
| validate | (msg: AnnotationBulkMessage) | | Moves annotations from the seed directory to the data directory |
| delete | (msg: AnnotationBulkMessage) | | Moves annotations to the deleted directory |
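The dispatch performed by on_message can be sketched roughly as follows. This is a minimal illustration, not the module's code: the `AnnotationStatus` enum here is a hypothetical stand-in for the one in annotation_queue_dto (its real member names and values may differ), and `RoutingSketch` only records which handler would be called.

```python
from enum import Enum


class AnnotationStatus(Enum):
    # Hypothetical stand-in for annotation_queue_dto.AnnotationStatus;
    # the real member names and values may differ.
    CREATED = "Created"
    EDITED = "Edited"
    VALIDATED = "Validated"
    DELETED = "Deleted"


class RoutingSketch:
    """Records which handler each status is routed to."""

    def __init__(self):
        self.calls = []

    def save_annotation(self, payload):
        self.calls.append(("save_annotation", payload))

    def validate(self, payload):
        self.calls.append(("validate", payload))

    def delete(self, payload):
        self.calls.append(("delete", payload))

    def route(self, status_name, payload):
        # Raises ValueError for a status that is not in the enum.
        status = AnnotationStatus(status_name)
        if status in (AnnotationStatus.CREATED, AnnotationStatus.EDITED):
            self.save_annotation(payload)
        elif status is AnnotationStatus.VALIDATED:
            self.validate(payload)
        else:
            self.delete(payload)
```

In the real handler the status would come from the message's application_properties rather than from a plain string argument.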

AnnotationQueueHandler.AnnotationName (inner class)

Helper that pre-computes file paths for an annotation name across data/seed directories.
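A helper of this kind might look like the sketch below. The class name `AnnotationPaths`, its attribute names, and the `.jpg` / `.txt` extensions are assumptions made for illustration; only the data and data-seed directory names and the images/labels split come from this document.

```python
import os


class AnnotationPaths:
    """Hypothetical stand-in for AnnotationQueueHandler.AnnotationName."""

    def __init__(self, root, name, image_ext=".jpg", label_ext=".txt"):
        # Extensions are assumed; the real module may use different ones.
        self.name = name
        # Validated annotations live under {root}/data/.
        self.data_image = os.path.join(root, "data", "images", name + image_ext)
        self.data_label = os.path.join(root, "data", "labels", name + label_ext)
        # Unvalidated annotations live under {root}/data-seed/.
        self.seed_image = os.path.join(root, "data-seed", "images", name + image_ext)
        self.seed_label = os.path.join(root, "data-seed", "labels", name + label_ext)
```

With all four paths computed up front, a move from seed to data reduces to two `shutil.move` calls on matching attribute pairs.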

Internal Logic

  • Queue protocol: Subscribes to a RabbitMQ Streams queue using the rstream library with AMQP message decoding. Resumes from the offset persisted in offset.yaml.
  • Message routing (via application_properties['AnnotationStatus']):
    • Created / Edited → save_annotation: If the sender has the validator role, writes to the data dir; otherwise writes to the seed dir. For Created status, also saves the image bytes. For Edited by a validator, moves the image from seed to data.
    • Validated → validate: Bulk-moves all named annotations from the seed directory to the data directory.
    • Deleted → delete: Bulk-moves all named annotations to the deleted directory.
  • Offset tracking: After each message, increments offset and persists to offset.yaml.
  • Directory layout:
    • {root}/data/images/ + {root}/data/labels/ — validated annotations
    • {root}/data-seed/images/ + {root}/data-seed/labels/ — unvalidated annotations
    • {root}/data_deleted/images/ + {root}/data_deleted/labels/ — soft-deleted annotations
  • Logging: TimedRotatingFileHandler with daily rotation, 7-day retention, logs to logs/ directory.
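The logging setup described in the last bullet could be configured along these lines. This is a sketch under assumptions: the function name `build_logger`, the log file name, the message format, and the choice of `when="midnight"` for the daily rollover are illustrative and not taken from the module.

```python
import logging
import os
from logging.handlers import TimedRotatingFileHandler


def build_logger(log_dir="logs", name="annotation_queue"):
    # Create the log directory if it does not exist yet.
    os.makedirs(log_dir, exist_ok=True)
    handler = TimedRotatingFileHandler(
        os.path.join(log_dir, "annotation_queue.log"),  # hypothetical file name
        when="midnight",  # assumed; rolls over once per day
        interval=1,
        backupCount=7,  # keep seven rotated files, i.e. 7-day retention
        encoding="utf-8",
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    )
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger
```

`backupCount=7` is what gives the 7-day retention: once an eighth rotated file would exist, the oldest one is deleted.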

Dependencies

  • annotation_queue_dto — AnnotationStatus, AnnotationMessage, AnnotationBulkMessage
  • rstream (external) — RabbitMQ Streams consumer
  • yaml (external) — config and offset persistence
  • asyncio, os, shutil, sys, logging, datetime (stdlib)

Consumers

None (entry point — runs via __main__).

Data Models

Uses AnnotationMessage, AnnotationBulkMessage from annotation_queue_dto.

Configuration

  • config.yaml: API credentials (url, email, password), queue config (host, port, consumer_user, consumer_pw, name), directory structure (root, data, data_seed, data_processed, data_deleted, images, labels)
  • offset.yaml: persisted queue consumer offset
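Offset persistence of the kind described for offset.yaml can be sketched as below. To keep the example dependency-free it writes and parses a single `key: value` line by hand instead of using the yaml library that the module depends on. The key name `offset`, the default of 0 when the file is missing, and the write-then-rename step are all assumptions of this sketch.

```python
import os


def load_offset(path="offset.yaml"):
    """Return the stored offset, or 0 if the file or key is missing."""
    if not os.path.exists(path):
        return 0
    with open(path, encoding="utf-8") as f:
        for line in f:
            key, _, value = line.partition(":")
            if key.strip() == "offset":  # hypothetical key name
                return int(value.strip())
    return 0


def save_offset(offset, path="offset.yaml"):
    """Write the offset to a temp file, then rename it over the target."""
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.write(f"offset: {offset}\n")
    # os.replace swaps the file in one step, so a crash mid-write
    # leaves the previous offset file intact.
    os.replace(tmp, path)
```

After handling a message, a consumer would call `save_offset(load_offset(path) + 1, path)` or, more likely, keep the counter in memory and only call `save_offset`.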

External Integrations

  • RabbitMQ Streams queue (rstream library) on host 188.245.120.247:5552
  • Filesystem: /azaion/data/, /azaion/data-seed/, /azaion/data_deleted/

Security

  • Queue credentials in config.yaml (hardcoded — security concern)
  • No encryption of annotation data at rest

Tests

None.