Files
annotations/Azaion.Common/Services/FailsafeProducer.cs
T
Alex Bezdieniezhnykh d842466594 gps matcher async
put cryptography lib to fixed version
fix race condition bug in queue handler
add lock to db writing and backup to file db on each write
2025-05-29 00:35:35 +03:00

163 lines
6.9 KiB
C#

using System.IO;
using System.Net;
using Azaion.Common.Database;
using Azaion.Common.DTO.Config;
using Azaion.Common.DTO.Queue;
using Azaion.Common.Extensions;
using Azaion.CommonSecurity.DTO;
using Azaion.CommonSecurity.Services;
using LinqToDB;
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.AMQP;
using RabbitMQ.Stream.Client.Reliable;
namespace Azaion.Common.Services;
public class FailsafeAnnotationsProducer
{
private readonly ILogger<FailsafeAnnotationsProducer> _logger;
private readonly IDbFactory _dbFactory;
private readonly IAzaionApi _azaionApi;
private readonly QueueConfig _queueConfig;
private readonly UIConfig _uiConfig;
private Producer _annotationProducer = null!;
public FailsafeAnnotationsProducer(ILogger<FailsafeAnnotationsProducer> logger,
IDbFactory dbFactory,
IOptions<QueueConfig> queueConfig,
IOptions<UIConfig> uiConfig,
IAzaionApi azaionApi)
{
_logger = logger;
_dbFactory = dbFactory;
_azaionApi = azaionApi;
_queueConfig = queueConfig.Value;
_uiConfig = uiConfig.Value;
Task.Run(async () => await ProcessQueue());
}
private async Task<StreamSystem> GetProducerQueueConfig()
{
return await StreamSystem.Create(new StreamSystemConfig
{
Endpoints = new List<EndPoint> { new IPEndPoint(IPAddress.Parse(_queueConfig.Host), _queueConfig.Port) },
UserName = _queueConfig.ProducerUsername,
Password = _queueConfig.ProducerPassword
});
}
private async Task ProcessQueue(CancellationToken ct = default)
{
_annotationProducer = await Producer.Create(new ProducerConfig(await GetProducerQueueConfig(), Constants.MQ_ANNOTATIONS_QUEUE));
while (!ct.IsCancellationRequested)
{
var sent = false;
while (!sent || !ct.IsCancellationRequested) //Waiting for send
{
try
{
var (records, annotationsDict) = await _dbFactory.Run(async db =>
{
var records = await db.AnnotationsQueueRecords.OrderBy(x => x.DateTime).ToListAsync(token: ct);
var editedCreatedNames = records
.Where(x => x.Operation.In(AnnotationStatus.Created, AnnotationStatus.Edited))
.Select(x => x.AnnotationNames.FirstOrDefault())
.ToList();
var annotationsDict = await db.Annotations.LoadWith(x => x.Detections)
.Where(x => editedCreatedNames.Contains(x.Name))
.ToDictionaryAsync(a => a.Name, token: ct);
return (records, annotationsDict);
});
var messages = new List<Message>();
foreach (var record in records)
{
var appProperties = new ApplicationProperties
{
{ nameof(AnnotationStatus), record.Operation.ToString() },
{ nameof(User.Email), _azaionApi.CurrentUser.Email }
};
if (record.Operation.In(AnnotationStatus.Validated, AnnotationStatus.Deleted))
{
var message = new Message(MessagePackSerializer.Serialize(new AnnotationBulkMessage
{
AnnotationNames = record.AnnotationNames.ToArray(),
AnnotationStatus = record.Operation,
Email = _azaionApi.CurrentUser.Email,
CreatedDate = record.DateTime
})) { ApplicationProperties = appProperties };
messages.Add(message);
}
else
{
var annotation = annotationsDict!.GetValueOrDefault(record.AnnotationNames.FirstOrDefault());
if (annotation == null)
continue;
var image = record.Operation == AnnotationStatus.Created
? await File.ReadAllBytesAsync(annotation.ImagePath, ct)
: null;
var annMessage = new AnnotationMessage
{
Name = annotation.Name,
OriginalMediaName = annotation.OriginalMediaName,
Time = annotation.Time,
Role = annotation.CreatedRole,
Email = annotation.CreatedEmail,
CreatedDate = annotation.CreatedDate,
Status = annotation.AnnotationStatus,
ImageExtension = annotation.ImageExtension,
Image = image,
Detections = JsonConvert.SerializeObject(annotation.Detections),
Source = annotation.Source,
};
var message = new Message(MessagePackSerializer.Serialize(annMessage)) { ApplicationProperties = appProperties };
messages.Add(message);
}
}
if (messages.Any())
{
await _annotationProducer.Send(messages, CompressionType.Gzip);
var ids = records.Select(x => x.Id).ToList();
var removed = await _dbFactory.RunWrite(async db => await db.AnnotationsQueueRecords.DeleteAsync(x => ids.Contains(x.Id), token: ct));
sent = true;
}
}
catch (Exception e)
{
_logger.LogError(e, e.Message);
await Task.Delay(TimeSpan.FromSeconds(10), ct);
}
await Task.Delay(TimeSpan.FromSeconds(10), ct);
}
}
await Task.Delay(TimeSpan.FromSeconds(5), ct);
}
public async Task SendToInnerQueue(List<string> annotationNames, AnnotationStatus status, CancellationToken cancellationToken = default)
{
if (_uiConfig.SilentDetection)
return;
await _dbFactory.RunWrite(async db =>
await db.InsertAsync(new AnnotationQueueRecord
{
Id = Guid.NewGuid(),
DateTime = DateTime.UtcNow,
Operation = status,
AnnotationNames = annotationNames
}, token: cancellationToken));
}
}