fix rabbit consumer

This commit is contained in:
Alex Bezdieniezhnykh
2025-03-07 14:33:02 +02:00
parent 4e8c67995a
commit edf685fabc
3 changed files with 101 additions and 58 deletions
+46 -35
View File
@@ -6,51 +6,62 @@ from dto.annotation_created_message import AnnotationCreatedMessage
from utils import Dotdict
async def main():
with open(constants.CONFIG_FILE, "r") as f:
config_dict = yaml.safe_load(f)
d_config = Dotdict(config_dict)
queue_config = Dotdict(d_config.queue)
class AnnotationQueueHandler:
def __init__(self):
with open(constants.CONFIG_FILE, "r") as f:
config_dict = yaml.safe_load(f)
d_config = Dotdict(config_dict)
queue_config = Dotdict(d_config.queue)
try:
with open(constants.OFFSET_FILE, 'r') as f:
offset_dict = yaml.safe_load(f)
offset_queue = offset_dict['offset_queue']
offset_confirmed_queue = offset_dict['offset_confirmed_queue']
except (FileNotFoundError, ValueError):
with open(constants.OFFSET_FILE, 'w') as f:
f.writelines(['offset_queue: 0',
'offset_confirmed_queue: 0'])
offset_queue = 0
offset_confirmed_queue = 0
self.consumer = Consumer(
host=queue_config.host,
port=queue_config.port,
username=queue_config.consumer_user,
password=queue_config.consumer_pw
)
try:
with open(constants.OFFSET_FILE, 'r') as f:
offset_dict = yaml.safe_load(f)
self.offset_queue = offset_dict['offset_queue']
self.offset_confirmed_queue = offset_dict['offset_confirmed_queue']
except (FileNotFoundError, ValueError):
with open(constants.OFFSET_FILE, 'w') as f:
f.writelines(['offset_queue: 0',
'offset_confirmed_queue: 0'])
self.offset_queue = 0
self.offset_confirmed_queue = 0
consumer = Consumer(
host=queue_config.host,
port=queue_config.port,
username=queue_config.consumer_user,
password=queue_config.consumer_pw
)
def on_message(message: AMQPMessage, context: MessageContext, is_seed: bool):
def on_message(self, message: AMQPMessage, context: MessageContext, is_seed: bool):
annotation_message = AnnotationCreatedMessage(message)
print(f'Received: {annotation_message}')
save_folder = constants.data_seeds_dir if is_seed else constants.data_rabbit_dir
annotation_message.save_annotation(save_folder)
if is_seed:
self.offset_queue = context.offset
else:
self.offset_confirmed_queue = context.offset
with open(constants.OFFSET_FILE, 'w') as offset_file:
yaml.dump({'offset_queue': self.offset_queue,
'offset_confirmed_queue': self.offset_confirmed_queue}, offset_file)
await consumer.start()
await consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=True),
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_queue))
async def start(self):
await self.consumer.start()
await self.consumer.subscribe(stream=constants.ANNOTATIONS_QUEUE,
callback=lambda msg, context: self.on_message(msg, context, is_seed=True),
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_queue))
await consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE, callback=lambda msg, context: on_message(msg, context, is_seed=False),
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset_confirmed_queue))
await self.consumer.subscribe(stream=constants.ANNOTATIONS_CONFIRMED_QUEUE,
callback=lambda msg, context: self.on_message(msg, context, is_seed=False),
offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, self.offset_confirmed_queue))
try:
await self.consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
print("Closing Consumer...")
return
try:
await consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
print("Closing Consumer...")
return
if __name__ == '__main__':
with asyncio.Runner() as runner:
runner.run(main())
runner.run(AnnotationQueueHandler().start())