Component: Annotation Queue Service
Overview
Self-contained async service that consumes annotation CRUD events from a RabbitMQ Streams queue and persists images + labels to the filesystem. Operates independently from the training pipeline.
Pattern: Message-driven event handler / consumer service
Upstream: External RabbitMQ Streams queue (Azaion platform)
Downstream: Data Pipeline (files written become input for augmentation)
Modules
- annotation-queue/annotation_queue_dto: message DTOs (AnnotationMessage, AnnotationBulkMessage, AnnotationStatus, Detection, etc.)
- annotation-queue/annotation_queue_handler: async queue consumer with message routing and file management
Internal Interfaces
AnnotationQueueHandler
AnnotationQueueHandler()
AnnotationQueueHandler.start() -> async
AnnotationQueueHandler.on_message(message: AMQPMessage, context: MessageContext) -> None
AnnotationQueueHandler.save_annotation(ann: AnnotationMessage) -> None
AnnotationQueueHandler.validate(msg: AnnotationBulkMessage) -> None
AnnotationQueueHandler.delete(msg: AnnotationBulkMessage) -> None
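As a rough illustration of how these methods might fit together, the sketch below dispatches a message to save_annotation, validate, or delete based on its status. HandlerSketch and its route method are hypothetical stand-ins, not the actual handler: the real on_message receives an AMQPMessage from rstream and reads the status from the AMQP application properties.

```python
from enum import IntEnum


class AnnotationStatus(IntEnum):
    # Values as listed under Key DTOs.
    Created = 10
    Edited = 20
    Validated = 30
    Deleted = 40


class HandlerSketch:
    """Hypothetical stand-in for AnnotationQueueHandler; it only records calls."""

    def __init__(self):
        self.calls = []  # (method name, payload) pairs, for illustration

    def save_annotation(self, ann):
        self.calls.append(("save_annotation", ann))

    def validate(self, msg):
        self.calls.append(("validate", msg))

    def delete(self, msg):
        self.calls.append(("delete", msg))

    def route(self, status, payload):
        # Created/Edited carry a full annotation; Validated/Deleted are bulk messages.
        if status in (AnnotationStatus.Created, AnnotationStatus.Edited):
            self.save_annotation(payload)
        elif status == AnnotationStatus.Validated:
            self.validate(payload)
        elif status == AnnotationStatus.Deleted:
            self.delete(payload)
        else:
            raise ValueError(f"unsupported status: {status!r}")
```

Raising on an unknown status is a choice made for this sketch; the source does not say how the real handler treats unexpected values.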
Key DTOs
AnnotationMessage(msgpack_bytes) # Full annotation with image + detections
AnnotationBulkMessage(msgpack_bytes) # Bulk validate/delete
AnnotationStatus: Created(10), Edited(20), Validated(30), Deleted(40)
RoleEnum: Operator(10), Validator(20), CompanionPC(30), Admin(40), ApiAdmin(1000)
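A minimal sketch of the role enum, assuming it is modelled as an IntEnum (the source lists only the names and values). The is_validator rule follows the role-based logic described under Implementation Details.

```python
from enum import IntEnum


class RoleEnum(IntEnum):
    Operator = 10
    Validator = 20
    CompanionPC = 30
    Admin = 40
    ApiAdmin = 1000

    def is_validator(self) -> bool:
        # Validator, Admin and ApiAdmin write directly to the validated data directory.
        return self in (RoleEnum.Validator, RoleEnum.Admin, RoleEnum.ApiAdmin)
```

For example, RoleEnum(20).is_validator() is True, while RoleEnum.Operator.is_validator() is False.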
Data Access Patterns
- Queue: Consumes from RabbitMQ Streams queue azaion-annotations using rstream library
- Offset persistence: offset.yaml tracks last processed message offset for resume
- Filesystem writes:
  - Validated annotations → {root}/data/images/ + {root}/data/labels/
  - Unvalidated (seed) → {root}/data-seed/images/ + {root}/data-seed/labels/
  - Deleted → {root}/data_deleted/images/ + {root}/data_deleted/labels/
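The directory layout above can be expressed as a small lookup. This is a sketch only: the target_dirs helper and the "validated" / "seed" / "deleted" keys are hypothetical names, and the directory names are the ones listed in this section rather than values read from config.yaml.

```python
from pathlib import Path

# Directory names as listed above; the dictionary keys are hypothetical labels.
_LAYOUT = {
    "validated": "data",
    "seed": "data-seed",
    "deleted": "data_deleted",
}


def target_dirs(root, kind):
    """Return (images_dir, labels_dir) for one of the three storage areas."""
    base = Path(root) / _LAYOUT[kind]
    return base / "images", base / "labels"
```

A bulk validate would then move files from target_dirs(root, "seed") to target_dirs(root, "validated"), and a bulk delete would move them to target_dirs(root, "deleted").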
Implementation Details
- Message routing: Based on AnnotationStatus from AMQP application properties:
  - Created/Edited → save label + optionally image; validator role writes to data, operator to seed
  - Validated (bulk) → move from seed to data
  - Deleted (bulk) → move to deleted directory
- Role-based logic: RoleEnum.is_validator() returns True for Validator, Admin, ApiAdmin; these roles write directly to validated data directory
- Serialization: Messages are msgpack-encoded with positional integer keys. Detections are embedded as a JSON string within the msgpack payload.
- Offset tracking: After each successfully processed message, offset is persisted to offset.yaml (survives restarts)
- Logging: TimedRotatingFileHandler with daily rotation, 7-day retention, writes to logs/ directory
- Separate dependencies: Own requirements.txt (pyyaml, msgpack, rstream only)
- Own config.yaml: Points to test directories by default (data-test, data-test-seed)
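To illustrate the two-stage decoding implied by the serialization note, the sketch below starts from a payload that msgpack has already unpacked into an integer-keyed dict and then parses the embedded JSON detections string. The key positions (0 and 1), the DecodedAnnotation class, and the decode_unpacked helper are all assumptions made for illustration; the real positions and fields are defined in annotation_queue_dto.

```python
import json
from dataclasses import dataclass

# Hypothetical key positions; the real mapping lives in annotation_queue_dto.
KEY_NAME = 0
KEY_DETECTIONS = 1


@dataclass
class DecodedAnnotation:
    name: str
    detections: list


def decode_unpacked(payload):
    """Build a DTO from a payload that msgpack has already unpacked.

    The detections field arrives as a JSON string, so it needs a second
    decoding step after the msgpack one. A missing or empty field is
    treated as an empty list here (an assumption of this sketch).
    """
    raw = payload.get(KEY_DETECTIONS) or "[]"
    return DecodedAnnotation(name=payload[KEY_NAME], detections=json.loads(raw))
```

With the msgpack Python package, a map with integer keys is typically unpacked via msgpack.unpackb(data, strict_map_key=False), since the default rejects non-string keys.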
Caveats
- Credentials hardcoded in config.yaml (queue host, user, password)
- AnnotationClass duplicated (third copy) with slight differences from dto/ version
- No reconnection logic for queue disconnections
- No dead-letter queue or message retry on processing failures
- save_annotation writes empty label files when detections list has no newline separators between entries
- The annotation-queue config.yaml uses different directory names (data-test vs data) than the main config.yaml; likely a test vs production configuration issue
Dependency Graph
graph TD
annotation_queue_dto --> annotation_queue_handler
rstream_ext[rstream library] --> annotation_queue_handler
msgpack_ext[msgpack library] --> annotation_queue_dto
Logging Strategy
logging module with TimedRotatingFileHandler. Format: HH:MM:SS|message. Daily rotation, 7-day retention. Also outputs to stdout.
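A logging setup matching this description might look like the sketch below. The build_logger helper, the logger name, and the annotation_queue.log file name are hypothetical. Rotating at midnight is one way to get daily rotation; the source does not say which `when` value is used.

```python
import logging
import sys
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path


def build_logger(log_dir="logs", name="annotation_queue"):
    """Configure daily-rotated file logging plus stdout output."""
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    # Produces lines such as "14:03:27|message".
    formatter = logging.Formatter("%(asctime)s|%(message)s", datefmt="%H:%M:%S")

    file_handler = TimedRotatingFileHandler(
        Path(log_dir) / "annotation_queue.log",  # hypothetical file name
        when="midnight",  # rotate once per day
        backupCount=7,    # keep seven rotated files
        encoding="utf-8",
    )
    stream_handler = logging.StreamHandler(sys.stdout)

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    for handler in (file_handler, stream_handler):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
```

Calling build_logger() more than once with the same name would attach duplicate handlers, so it is meant to run once at startup.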