using System.IO;
using System.Net;
using Azaion.Common.Database;
using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.Common.DTO.Queue;
using LinqToDB;
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace Azaion.Common.Services;

/// <summary>
/// Forwards annotations to RabbitMQ streams through a database-backed inner queue.
/// <see cref="SendToInnerQueue"/> records the annotation name in the AnnotationsQueue table;
/// a background loop started by the constructor reads the queued annotations, publishes them
/// and removes the corresponding queue rows once the publish calls have returned.
/// </summary>
public class FailsafeAnnotationsProducer
{
    private readonly ILogger<FailsafeAnnotationsProducer> _logger;
    private readonly IDbFactory _dbFactory;
    private readonly QueueConfig _queueConfig;

    // Both producers are assigned in Init(), which runs on the background task before the send loop.
    private Producer _annotationProducer = null!;
    private Producer _annotationConfirmProducer = null!;

    /// <summary>
    /// Stores the dependencies and starts the queue-processing loop on a background task.
    /// </summary>
    /// <param name="logger">Logger used for send failures and for a failure of the background loop.</param>
    /// <param name="dbFactory">Factory used to run database operations.</param>
    /// <param name="queueConfig">Queue settings (host, port, producer credentials).</param>
    public FailsafeAnnotationsProducer(
        ILogger<FailsafeAnnotationsProducer> logger,
        IDbFactory dbFactory,
        IOptions<QueueConfig> queueConfig)
    {
        _logger = logger;
        _dbFactory = dbFactory;
        _queueConfig = queueConfig.Value;

        // Fire-and-forget: nobody awaits this task, so any exception escaping ProcessQueue
        // must be logged here or it would be lost without a trace.
        Task.Run(async () =>
        {
            try
            {
                await ProcessQueue();
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Annotations queue processing stopped: {Message}", e.Message);
            }
        });
    }

    /// <summary>
    /// Creates a <see cref="StreamSystem"/> for the configured host and port using the producer credentials.
    /// </summary>
    /// <returns>The created stream system.</returns>
    private async Task<StreamSystem> GetProducerQueueConfig()
    {
        return await StreamSystem.Create(new StreamSystemConfig
        {
            Endpoints = new List<EndPoint> { new IPEndPoint(IPAddress.Parse(_queueConfig.Host), _queueConfig.Port) },
            UserName = _queueConfig.ProducerUsername,
            Password = _queueConfig.ProducerPassword
        });
    }

    /// <summary>
    /// Creates the two producers: one for <c>Constants.MQ_ANNOTATIONS_QUEUE</c> and one for
    /// <c>Constants.MQ_ANNOTATIONS_CONFIRM_QUEUE</c>. Each producer gets its own stream system.
    /// </summary>
    /// <param name="cancellationToken">Currently not used by this method.</param>
    private async Task Init(CancellationToken cancellationToken = default)
    {
        _annotationProducer = await Producer.Create(
            new ProducerConfig(await GetProducerQueueConfig(), Constants.MQ_ANNOTATIONS_QUEUE));
        _annotationConfirmProducer = await Producer.Create(
            new ProducerConfig(await GetProducerQueueConfig(), Constants.MQ_ANNOTATIONS_CONFIRM_QUEUE));
    }

    /// <summary>
    /// Initializes the producers, then repeatedly drains the inner queue: messages are sent in chunks
    /// of 10, each chunk being retried until it is sent or cancellation is requested.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the loop and interrupts the delays.</param>
    private async Task ProcessQueue(CancellationToken cancellationToken = default)
    {
        await Init(cancellationToken);

        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = await GetFromInnerQueue(cancellationToken);

            foreach (var messagesChunk in messages.Chunk(10)) // Sending by 10
            {
                var sent = false;

                // Retry until the chunk is sent; give up as soon as cancellation is requested.
                while (!sent && !cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        // Annotations with status Created go to the annotations stream.
                        var createdMessages = messagesChunk
                            .Where(x => x.Status == AnnotationStatus.Created)
                            .Select(x => new Message(MessagePackSerializer.Serialize(x)))
                            .ToList();
                        if (createdMessages.Any())
                            await _annotationProducer.Send(createdMessages, CompressionType.Gzip);

                        // Annotations with status Validated go to the confirmation stream.
                        var validatedMessages = messagesChunk
                            .Where(x => x.Status == AnnotationStatus.Validated)
                            .Select(x => new Message(MessagePackSerializer.Serialize(x)))
                            .ToList();
                        if (validatedMessages.Any())
                            await _annotationConfirmProducer.Send(validatedMessages, CompressionType.Gzip);

                        // Queue rows are keyed by the annotation name (see SendToInnerQueue and the
                        // join in GetFromInnerQueue), so remove them by that same key. A plain list
                        // of names keeps the predicate a simple Contains over strings.
                        var sentNames = messagesChunk.Select(x => x.Name).ToList();
                        await _dbFactory.Run(async db =>
                            await db.AnnotationsQueue.DeleteAsync(aq => sentNames.Contains(aq.Name), token: cancellationToken));

                        sent = true;
                    }
                    catch (Exception e)
                    {
                        // Constant template: exception text must not be interpreted as a format string.
                        _logger.LogError(e, "{Message}", e.Message);
                        await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
                    }

                    // NOTE(review): this pause also runs after a successful send, so chunks are spaced
                    // 10 seconds apart (20 seconds after a failure). Kept as in the original - confirm intent.
                    await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
                }
            }

            await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        }
    }

    /// <summary>
    /// Loads every annotation whose name is present in the AnnotationsQueue table (with its detections),
    /// reads the image file of each one and converts it to an <see cref="AnnotationCreatedMessage"/>.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the database query and to the file reads.</param>
    /// <returns>The messages built from the queued annotations.</returns>
    private async Task<List<AnnotationCreatedMessage>> GetFromInnerQueue(CancellationToken cancellationToken = default)
    {
        return await _dbFactory.Run(async db =>
        {
            var annotations = await db.AnnotationsQueue.Join(
                    db.Annotations.LoadWith(x => x.Detections),
                    aq => aq.Name,
                    a => a.Name,
                    (aq, a) => a)
                .ToListAsync(token: cancellationToken);

            var messages = new List<AnnotationCreatedMessage>();
            foreach (var annotation in annotations)
            {
                var image = await File.ReadAllBytesAsync(annotation.ImagePath, cancellationToken);
                var annCreateMessage = new AnnotationCreatedMessage
                {
                    Name = annotation.Name,
                    OriginalMediaName = annotation.OriginalMediaName,
                    Time = annotation.Time,
                    CreatedRole = annotation.CreatedRole,
                    CreatedEmail = annotation.CreatedEmail,
                    CreatedDate = annotation.CreatedDate,
                    Status = annotation.AnnotationStatus,
                    ImageExtension = annotation.ImageExtension,
                    Image = image,
                    // Detections are embedded in the message as a JSON string.
                    Detections = JsonConvert.SerializeObject(annotation.Detections),
                    Source = annotation.Source,
                };
                messages.Add(annCreateMessage);
            }

            return messages;
        });
    }

    /// <summary>
    /// Enqueues an annotation for sending by inserting its name into the database.
    /// </summary>
    /// <param name="annotation">Annotation to enqueue; only its <c>Name</c> is stored.</param>
    /// <param name="cancellationToken">Token passed to the insert operation.</param>
    public async Task SendToInnerQueue(Annotation annotation, CancellationToken cancellationToken = default)
    {
        await _dbFactory.Run(async db =>
            await db.InsertAsync(new AnnotationName { Name = annotation.Name }, token: cancellationToken));
    }
}