Files
ai-training/annotation_queue_handler.py
T
Alex Bezdieniezhnykh b4a4826b19 add rabbit consumer
2025-03-05 20:58:33 +02:00

47 lines
1.9 KiB
Python

import yaml
from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext
import constants
from dto.annotation_created_message import AnnotationCreatedMessage
from utils import Dotdict
def _load_offsets(offset_file):
    """Return ``(offset_queue, offset_confirmed_queue)`` stored in *offset_file*.

    The file is expected to be a YAML mapping with the keys ``offset_queue``
    and ``offset_confirmed_queue``. If it is missing, empty, malformed or
    lacks one of the keys, it is (re)created with both offsets set to 0 and
    ``(0, 0)`` is returned, so the caller always gets usable values.
    """
    try:
        with open(offset_file, 'r') as f:
            offset_dict = yaml.safe_load(f)
        # An empty file makes safe_load return None -> TypeError on indexing;
        # a missing key raises KeyError. Both are treated like a missing file.
        return offset_dict['offset_queue'], offset_dict['offset_confirmed_queue']
    except (FileNotFoundError, ValueError, KeyError, TypeError, yaml.YAMLError):
        with open(offset_file, 'w') as f:
            # writelines() does not add line separators; without the explicit
            # newlines both entries end up on one line and the file is no
            # longer the two-key YAML mapping read above.
            f.writelines(['offset_queue: 0\n',
                          'offset_confirmed_queue: 0\n'])
        return 0, 0


async def _consume(queue_config, offset_queue, offset_confirmed_queue):
    """Subscribe to both annotation streams and process messages until stopped.

    Args:
        queue_config: object exposing ``host``, ``port``, ``consumer_user``
            and ``consumer_pw`` attributes used to build the consumer.
        offset_queue: offset to start reading ``constants.ANNOTATIONS_QUEUE`` from.
        offset_confirmed_queue: offset to start reading
            ``constants.ANNOTATIONS_CONFIRMED_QUEUE`` from.
    """
    command_consumer = Consumer(
        host=queue_config.host,
        port=queue_config.port,
        username=queue_config.consumer_user,
        password=queue_config.consumer_pw,
    )

    def on_message(message: AMQPMessage, context: MessageContext, is_seed: bool):
        """Parse one message and save its annotation.

        ``is_seed`` selects the target folder: ``constants.data_seeds_dir``
        when true, ``constants.data_rabbit_dir`` otherwise.
        """
        # NOTE(review): subscribe() below is called without a ``decoder``;
        # rstream appears to deliver raw bytes in that case, which would make
        # ``message.body`` fail. Confirm whether ``decoder=amqp_decoder`` is
        # needed here.
        annotation_message = AnnotationCreatedMessage(message.body)
        print(f'Received: {annotation_message}')
        save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir
        annotation_message.save_annotation(save_folder)

    # start(), subscribe(), run() and close() are coroutines in rstream; they
    # must be awaited, otherwise nothing is ever connected or consumed.
    await command_consumer.start()
    try:
        await command_consumer.subscribe(
            stream=constants.ANNOTATIONS_QUEUE,
            callback=lambda msg, context: on_message(msg, context, is_seed=True),
            offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_queue),
        )
        await command_consumer.subscribe(
            stream=constants.ANNOTATIONS_CONFIRMED_QUEUE,
            callback=lambda msg, context: on_message(msg, context, is_seed=False),
            offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_confirmed_queue),
        )
        # Keeps the consumer alive and dispatching messages to the callbacks.
        await command_consumer.run()
    finally:
        # Release the connection even when interrupted or on error.
        await command_consumer.close()


def main():
    """Load configuration and stored offsets, then consume annotation streams.

    Reads the queue settings from ``constants.CONFIG_FILE`` and the starting
    offsets from ``constants.OFFSET_FILE`` (created with zero offsets when
    unusable), then runs the stream consumer until it is stopped.
    """
    # Local import: asyncio is only needed to drive the coroutine-based consumer.
    import asyncio

    with open(constants.CONFIG_FILE, "r") as f:
        config_dict = yaml.safe_load(f)
    d_config = Dotdict(config_dict)
    queue_config = Dotdict(d_config.queue)

    offset_queue, offset_confirmed_queue = _load_offsets(constants.OFFSET_FILE)

    asyncio.run(_consume(queue_config, offset_queue, offset_confirmed_queue))
# Start the consumer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()