using System.IO;
using System.Net;
using Azaion.Common.Database;
using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.Common.DTO.Queue;
using Azaion.Common.Extensions;
using LinqToDB;
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.AMQP;
using RabbitMQ.Stream.Client.Reliable;

namespace Azaion.Common.Services;

/// <summary>
/// Persists annotation operations in a local database table and forwards them to the
/// annotations stream from a background loop, deleting the local records only after
/// the batch has been handed to the producer.
/// </summary>
public class FailsafeAnnotationsProducer
{
    /// <summary>Pause between two consecutive polls of the local queue table.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(10);

    /// <summary>Additional pause applied after a failed attempt, before the regular poll pause.</summary>
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(10);

    private readonly ILogger<FailsafeAnnotationsProducer> _logger;
    private readonly IDbFactory _dbFactory;
    private readonly IAzaionApi _azaionApi;
    private readonly QueueConfig _queueConfig;
    private readonly UIConfig _uiConfig;

    private Producer _annotationProducer = null!;

    /// <summary>
    /// Stores the dependencies and starts the background queue processing loop.
    /// </summary>
    public FailsafeAnnotationsProducer(
        ILogger<FailsafeAnnotationsProducer> logger,
        IDbFactory dbFactory,
        IOptions<QueueConfig> queueConfig,
        IOptions<UIConfig> uiConfig,
        IAzaionApi azaionApi)
    {
        _logger = logger;
        _dbFactory = dbFactory;
        _azaionApi = azaionApi;
        _queueConfig = queueConfig.Value;
        _uiConfig = uiConfig.Value;

        // Fire-and-forget: ProcessQueue logs its own failures, so the task is not awaited.
        _ = Task.Run(() => ProcessQueue());
    }

    /// <summary>
    /// Creates a stream system connected to the configured host and port
    /// using the producer credentials.
    /// </summary>
    private async Task<StreamSystem> GetProducerQueueConfig()
    {
        var endpoint = new IPEndPoint(IPAddress.Parse(_queueConfig.Host), _queueConfig.Port);
        var config = new StreamSystemConfig
        {
            Endpoints = new List<EndPoint> { endpoint },
            UserName = _queueConfig.ProducerUsername,
            Password = _queueConfig.ProducerPassword
        };
        return await StreamSystem.Create(config);
    }

    /// <summary>
    /// Creates the producer, resolves the current user and then polls the local queue table
    /// until <paramref name="ct"/> is cancelled. A failed attempt is logged and retried after
    /// <see cref="RetryDelay"/> plus the regular <see cref="PollInterval"/>.
    /// </summary>
    /// <param name="ct">Token that stops the polling loop.</param>
    private async Task ProcessQueue(CancellationToken ct = default)
    {
        try
        {
            _annotationProducer = await Producer.Create(
                new ProducerConfig(await GetProducerQueueConfig(), Constants.MQ_ANNOTATIONS_QUEUE));
            var currentUser = await _azaionApi.GetCurrentUserAsync();
            var userEmail = currentUser.Email;

            while (!ct.IsCancellationRequested)
            {
                try
                {
                    await SendPendingRecords(userEmail, ct);
                }
                catch (Exception e) when (!ct.IsCancellationRequested)
                {
                    _logger.LogError(e, e.Message);
                    await Task.Delay(RetryDelay, ct);
                }

                await Task.Delay(PollInterval, ct);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Cancellation is the regular way to stop the loop; nothing to report.
        }
        catch (Exception e)
        {
            // Without this the failure would vanish inside the unobserved background task.
            _logger.LogError(e, "Annotations queue processing stopped: {Message}", e.Message);
        }
    }

    /// <summary>
    /// Loads all queued records ordered by time, converts them to stream messages, sends them
    /// as one gzip-compressed batch and then deletes the loaded records from the local table.
    /// </summary>
    /// <param name="userEmail">E-mail of the current user, attached to every message.</param>
    /// <param name="ct">Token passed to the database and file operations.</param>
    private async Task SendPendingRecords(string userEmail, CancellationToken ct)
    {
        var (records, annotationsDict) = await _dbFactory.Run(async db =>
        {
            var queued = await db.AnnotationsQueueRecords
                .OrderBy(x => x.DateTime)
                .ToListAsync(token: ct);

            // Only Created/Edited records carry a full annotation payload that has to be loaded.
            var editedCreatedNames = queued
                .Where(x => x.Operation.In(AnnotationStatus.Created, AnnotationStatus.Edited))
                .Select(x => x.AnnotationNames.FirstOrDefault())
                .ToList();

            var annotations = await db.Annotations
                .LoadWith(x => x.Detections)
                .Where(x => editedCreatedNames.Contains(x.Name))
                .ToDictionaryAsync(a => a.Name, token: ct);

            return (queued, annotations);
        });

        var messages = new List<Message>();
        foreach (var record in records)
        {
            var appProperties = new ApplicationProperties
            {
                { nameof(AnnotationStatus), record.Operation.ToString() },
                { nameof(User.Email), userEmail }
            };

            if (record.Operation.In(AnnotationStatus.Validated, AnnotationStatus.Deleted))
            {
                var bulkMessage = new AnnotationBulkMessage
                {
                    AnnotationNames = record.AnnotationNames.ToArray(),
                    AnnotationStatus = record.Operation,
                    Email = userEmail,
                    CreatedDate = record.DateTime
                };
                messages.Add(new Message(MessagePackSerializer.Serialize(bulkMessage))
                {
                    ApplicationProperties = appProperties
                });
                continue;
            }

            // A record without a name cannot be looked up; a null dictionary key would throw
            // and make the whole batch fail on every retry, so skip it like a missing annotation.
            var annotationName = record.AnnotationNames.FirstOrDefault();
            if (annotationName is null)
                continue;

            var annotation = annotationsDict!.GetValueOrDefault(annotationName);
            if (annotation == null)
                continue;

            // The image bytes are attached only when the annotation is first created.
            var image = record.Operation == AnnotationStatus.Created
                ? await File.ReadAllBytesAsync(annotation.ImagePath, ct)
                : null;

            var annMessage = new AnnotationMessage
            {
                Name = annotation.Name,
                OriginalMediaName = annotation.OriginalMediaName,
                Time = annotation.Time,
                Role = annotation.CreatedRole,
                Email = annotation.CreatedEmail,
                CreatedDate = annotation.CreatedDate,
                Status = annotation.AnnotationStatus,
                ImageExtension = annotation.ImageExtension,
                Image = image,
                Detections = JsonConvert.SerializeObject(annotation.Detections),
                Source = annotation.Source,
            };
            messages.Add(new Message(MessagePackSerializer.Serialize(annMessage))
            {
                ApplicationProperties = appProperties
            });
        }

        // NOTE(review): when every loaded record is skipped nothing is deleted, so those
        // records are reloaded on each poll - confirm whether they should be purged instead.
        if (messages.Count == 0)
            return;

        await _annotationProducer.Send(messages, CompressionType.Gzip);

        // Delete exactly the records that were loaded; rows inserted meanwhile stay queued.
        var ids = records.Select(x => x.Id).ToList();
        await _dbFactory.RunWrite(async db =>
            await db.AnnotationsQueueRecords.DeleteAsync(x => ids.Contains(x.Id), token: ct));
    }

    /// <summary>
    /// Inserts a record describing an annotation operation into the local queue table.
    /// Does nothing when silent detection is enabled in the UI configuration.
    /// </summary>
    /// <param name="annotationNames">Names of the annotations the operation applies to.</param>
    /// <param name="status">Operation that was performed on the annotations.</param>
    /// <param name="cancellationToken">Token passed to the database insert.</param>
    public async Task SendToInnerQueue(List<string> annotationNames, AnnotationStatus status, CancellationToken cancellationToken = default)
    {
        if (_uiConfig.SilentDetection)
            return;

        var record = new AnnotationQueueRecord
        {
            Id = Guid.NewGuid(),
            DateTime = DateTime.UtcNow,
            Operation = status,
            AnnotationNames = annotationNames
        };

        await _dbFactory.RunWrite(async db =>
            await db.InsertAsync(record, token: cancellationToken));
    }
}