Component: Annotation Queue Service

Overview

Self-contained async service that consumes annotation CRUD events from a RabbitMQ Streams queue and persists images and labels to the filesystem. It operates independently of the training pipeline.

Pattern: Message-driven event handler / consumer service
Upstream: External RabbitMQ Streams queue (Azaion platform)
Downstream: Data Pipeline (files written become input for augmentation)

Modules

  • annotation-queue/annotation_queue_dto — message DTOs (AnnotationMessage, AnnotationBulkMessage, AnnotationStatus, Detection, etc.)
  • annotation-queue/annotation_queue_handler — async queue consumer with message routing and file management

Internal Interfaces

AnnotationQueueHandler

AnnotationQueueHandler()
AnnotationQueueHandler.start() -> async
AnnotationQueueHandler.on_message(message: AMQPMessage, context: MessageContext) -> None
AnnotationQueueHandler.save_annotation(ann: AnnotationMessage) -> None
AnnotationQueueHandler.validate(msg: AnnotationBulkMessage) -> None
AnnotationQueueHandler.delete(msg: AnnotationBulkMessage) -> None

Key DTOs

AnnotationMessage(msgpack_bytes)  # Full annotation with image + detections
AnnotationBulkMessage(msgpack_bytes)  # Bulk validate/delete
AnnotationStatus: Created(10), Edited(20), Validated(30), Deleted(40)
RoleEnum: Operator(10), Validator(20), CompanionPC(30), Admin(40), ApiAdmin(1000)
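
The numeric codes above map naturally onto integer enums. The following is a minimal sketch, assuming `IntEnum` as the base class (the actual base class is not shown in this document); `is_validator` follows the role rule described under Implementation Details.

```python
from enum import IntEnum


class AnnotationStatus(IntEnum):
    Created = 10
    Edited = 20
    Validated = 30
    Deleted = 40


class RoleEnum(IntEnum):
    Operator = 10
    Validator = 20
    CompanionPC = 30
    Admin = 40
    ApiAdmin = 1000

    def is_validator(self) -> bool:
        # Roles that write straight to the validated data directory.
        return self in (RoleEnum.Validator, RoleEnum.Admin, RoleEnum.ApiAdmin)
```

With this shape, an integer taken from a decoded message can be turned back into a member with `RoleEnum(value)`, which raises `ValueError` for unknown codes.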

Data Access Patterns

  • Queue: Consumes from the RabbitMQ Streams queue azaion-annotations using the rstream library
  • Offset persistence: offset.yaml tracks the offset of the last processed message so consumption can resume after a restart
  • Filesystem writes:
    • Validated annotations → {root}/data/images/ + {root}/data/labels/
    • Unvalidated (seed) → {root}/data-seed/images/ + {root}/data-seed/labels/
    • Deleted → {root}/data_deleted/images/ + {root}/data_deleted/labels/
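
The directory layout above can be expressed as a small path helper. This is only an illustrative sketch: the `BUCKET_DIRS` table, the `annotation_paths` function, and the `.jpg` / `.txt` extensions are assumptions made for the example, not names taken from the service.

```python
from pathlib import Path

# Hypothetical lookup table: bucket name -> directory name under {root}.
BUCKET_DIRS = {
    "validated": "data",
    "seed": "data-seed",
    "deleted": "data_deleted",
}


def annotation_paths(root: str, bucket: str, name: str) -> tuple[Path, Path]:
    """Return (image_path, label_path) for one annotation in the given bucket."""
    base = Path(root) / BUCKET_DIRS[bucket]
    # File extensions are assumed for illustration only.
    return base / "images" / f"{name}.jpg", base / "labels" / f"{name}.txt"
```

Keeping the image and label paths in one place means a move between buckets (for example seed to validated) only needs the source and destination bucket names.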

Implementation Details

  • Message routing: Based on AnnotationStatus from AMQP application properties:
    • Created/Edited → save label + optionally image; validator role writes to data, operator to seed
    • Validated (bulk) → move from seed to data
    • Deleted (bulk) → move to deleted directory
  • Role-based logic: RoleEnum.is_validator() returns True for Validator, Admin, and ApiAdmin; these roles write directly to the validated data directory
  • Serialization: Messages are msgpack-encoded with positional integer keys. Detections are embedded as a JSON string within the msgpack payload.
  • Offset tracking: After each successfully processed message, offset is persisted to offset.yaml (survives restarts)
  • Logging: TimedRotatingFileHandler with daily rotation, 7-day retention, writes to logs/ directory
  • Separate dependencies: Own requirements.txt (pyyaml, msgpack, rstream only)
  • Own config.yaml: Points to test directories by default (data-test, data-test-seed)
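
The routing rules above can be sketched as a pure decision function. This is a simplified illustration, not the handler's actual code: `route` is a hypothetical helper, the enum is rebuilt here from the codes in Key DTOs, and the returned strings merely name the handler method and target directory described in this document.

```python
from enum import IntEnum

# Codes copied from the Key DTOs section; using IntEnum is an assumption.
AnnotationStatus = IntEnum(
    "AnnotationStatus",
    {"Created": 10, "Edited": 20, "Validated": 30, "Deleted": 40},
)


def route(status: int, sender_is_validator: bool) -> tuple[str, str]:
    """Return (handler_name, target_directory) for one message."""
    status = AnnotationStatus(status)  # raises ValueError on unknown codes
    if status in (AnnotationStatus.Created, AnnotationStatus.Edited):
        # Validator roles write straight to data, everyone else to the seed set.
        target = "data" if sender_is_validator else "data-seed"
        return ("save_annotation", target)
    if status is AnnotationStatus.Validated:
        return ("validate", "data")  # bulk move from seed to data
    return ("delete", "data_deleted")  # bulk move to the deleted directory
```

Separating the decision from the file operations keeps the routing table easy to test without a queue connection or a filesystem.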

Caveats

  • Credentials hardcoded in config.yaml (queue host, user, password)
  • AnnotationClass duplicated (third copy) with slight differences from dto/ version
  • No reconnection logic for queue disconnections
  • No dead-letter queue or message retry on processing failures
  • save_annotation writes empty label files when detections list has no newline separators between entries
  • The annotation-queue config.yaml uses different directory names (data-test vs data) than the main config.yaml, which is likely a test-versus-production configuration mismatch

Dependency Graph

graph TD
    annotation_queue_dto --> annotation_queue_handler
    rstream_ext[rstream library] --> annotation_queue_handler
    msgpack_ext[msgpack library] --> annotation_queue_dto

Logging Strategy

Uses the standard logging module with TimedRotatingFileHandler. Format: HH:MM:SS|message. Daily rotation, 7-day retention. Log records are also written to stdout.
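
A logger with these properties could be configured roughly as follows. This is a sketch under stated assumptions: the `build_logger` function, the logger name, the log file name, and the choice of `when="midnight"` for daily rotation are illustrative and may differ from the service's actual setup.

```python
import logging
import sys
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path


def build_logger(log_dir: str = "logs", name: str = "annotation_queue") -> logging.Logger:
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    # Produces lines such as "18:18:30|message".
    formatter = logging.Formatter("%(asctime)s|%(message)s", datefmt="%H:%M:%S")

    file_handler = TimedRotatingFileHandler(
        Path(log_dir) / "annotation_queue.log",  # hypothetical file name
        when="midnight",  # roll over once a day (assumed rotation point)
        backupCount=7,    # keep seven rotated files, older ones are deleted
    )
    stream_handler = logging.StreamHandler(sys.stdout)

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    for handler in (file_handler, stream_handler):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
```

Calling `build_logger()` once at startup and reusing the returned logger avoids attaching duplicate handlers.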