Files
ai-training/annotation_queue_handler.py
T
Alex Bezdieniezhnykh edf685fabc fix rabbit consumer
2025-03-07 14:33:02 +02:00

67 lines
2.9 KiB
Python

import yaml
import asyncio
from rstream import Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext
import constants
from dto.annotation_created_message import AnnotationCreatedMessage
from utils import Dotdict
class AnnotationQueueHandler:
def __init__(self):
with open(constants.CONFIG_FILE, "r") as f:
config_dict = yaml.safe_load(f)
d_config = Dotdict(config_dict)
queue_config = Dotdict(d_config.queue)
self.consumer = Consumer(
host=queue_config.host,
port=queue_config.port,
username=queue_config.consumer_user,
password=queue_config.consumer_pw
)
try:
with open(constants.OFFSET_FILE, 'r') as f:
offset_dict = yaml.safe_load(f)
self.offset_queue = offset_dict['offset_queue']
self.offset_confirmed_queue = offset_dict['offset_confirmed_queue']
except (FileNotFoundError, ValueError):
with open(constants.OFFSET_FILE, 'w') as f:
f.writelines(['offset_queue: 0',
'offset_confirmed_queue: 0'])
self.offset_queue = 0
self.offset_confirmed_queue = 0
def on_message(self, message: AMQPMessage, context: MessageContext, is_seed: bool):
annotation_message = AnnotationCreatedMessage(message)
print(f'Received: {annotation_message}')
save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir
annotation_message.save_annotation(save_folder)
if is_seed:
self.offset_queue = context.offset
else:
self.offset_confirmed_queue = context.offset
with open(constants.OFFSET_FILE, 'w') as offset_file:
yaml.dump({'offset_queue': self.offset_queue,
'offset_confirmed_queue': self.offset_confirmed_queue}, offset_file)
async def start(self):
await self.consumer.start()
await self.consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE,
callback=lambda msg, context: self.on_message(msg, context, is_seed=True),
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue))
await self.consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE,
callback=lambda msg, context: self.on_message(msg, context, is_seed=False),
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_confirmed_queue))
try:
await self.consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
print("Closing Consumer...")
return
if __name__ == '__main__':
with asyncio.Runner() as runner:
runner.run(AnnotationQueueHandler().start())