using System.Net; using System.Text.Json; using LinqToDB; using Azaion.Annotations.Database; using Azaion.Annotations.Database.Entities; using Azaion.Annotations.DTOs; using Azaion.Annotations.Enums; using MessagePack; using RabbitMQ.Stream.Client; using RabbitMQ.Stream.Client.AMQP; using RabbitMQ.Stream.Client.Reliable; namespace Azaion.Annotations.Services; public class RabbitMqConfig { public string Host { get; set; } = "rabbitmq"; public int Port { get; set; } = 5552; public string Username { get; set; } = "azaion_producer"; public string Password { get; set; } = "producer_pass"; public string StreamName { get; set; } = "azaion-annotations"; } public class FailsafeProducer( IServiceScopeFactory scopeFactory, PathResolver pathResolver, RabbitMqConfig config, ILogger logger) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken ct) { await Task.Delay(TimeSpan.FromSeconds(5), ct); while (!ct.IsCancellationRequested) { try { await ProcessQueue(ct); } catch (OperationCanceledException) when (ct.IsCancellationRequested) { break; } catch (Exception ex) { logger.LogError(ex, ex.Message); await Task.Delay(TimeSpan.FromSeconds(10), ct); } } } private async Task ProcessQueue(CancellationToken ct) { var streamSystem = await StreamSystem.Create(new StreamSystemConfig { Endpoints = [new IPEndPoint(IPAddress.Parse(config.Host), config.Port)], UserName = config.Username, Password = config.Password }); var producer = await Producer.Create(new ProducerConfig(streamSystem, config.StreamName)); try { while (!ct.IsCancellationRequested) { await DrainQueue(producer, ct); await Task.Delay(TimeSpan.FromSeconds(10), ct); } } finally { await producer.Close(); await streamSystem.Close(); } } private async Task DrainQueue(Producer producer, CancellationToken ct) { using var scope = scopeFactory.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); var records = await db.AnnotationsQueueRecords .OrderBy(x => x.DateTime) .ToListAsync(token: ct); if (records.Count == 0) return; var createdIds = records .Where(x => x.Operation == QueueOperation.Created) .SelectMany(x => ParseIds(x.AnnotationIds)) .ToList(); var annotationsDict = createdIds.Count > 0 ? await db.Annotations .LoadWith(a => a.Detections) .Where(a => createdIds.Contains(a.Id)) .ToDictionaryAsync(a => a.Id, ct) : new Dictionary(); var messages = new List(); foreach (var record in records) { var ids = ParseIds(record.AnnotationIds); if (record.Operation is QueueOperation.Validated or QueueOperation.Deleted) { var msg = MessagePackSerializer.Serialize(new AnnotationBulkQueueMessage { AnnotationIds = ids.ToArray(), Operation = (int)record.Operation, CreatedDate = record.DateTime }); messages.Add(new Message(msg) { ApplicationProperties = new ApplicationProperties { { "Operation", record.Operation.ToString() } } }); } else { foreach (var id in ids) { if (!annotationsDict.TryGetValue(id, out var annotation)) continue; byte[]? image = null; try { var imgPath = await pathResolver.GetImagePath(id); if (File.Exists(imgPath)) image = await File.ReadAllBytesAsync(imgPath, ct); } catch { } var detectionsJson = JsonSerializer.Serialize( annotation.Detections?.Select(d => new { d.CenterX, d.CenterY, d.Width, d.Height, d.ClassNum, d.Label, d.Confidence }) ?? []); var msg = MessagePackSerializer.Serialize(new AnnotationQueueMessage { Name = annotation.Id, MediaHash = annotation.MediaId, OriginalMediaName = annotation.MediaId, Time = TimeSpan.FromTicks(annotation.TimeTicks), ImageExtension = ".jpg", Detections = detectionsJson, Image = image, Email = "", Source = (int)annotation.Source, Status = (int)annotation.Status, CreatedDate = annotation.CreatedDate }); messages.Add(new Message(msg) { ApplicationProperties = new ApplicationProperties { { "Operation", record.Operation.ToString() } } }); } } } if (messages.Count > 0) { await producer.Send(messages, CompressionType.Gzip); var recordIds = records.Select(x => x.Id).ToList(); await db.AnnotationsQueueRecords .Where(x => recordIds.Contains(x.Id)) .DeleteAsync(token: ct); } } private static List ParseIds(string json) { try { return JsonSerializer.Deserialize>(json) ?? []; } catch { return []; } } public static async Task EnqueueAsync(AppDataConnection db, string annotationId, QueueOperation operation) { var ids = JsonSerializer.Serialize(new[] { annotationId }); await db.InsertAsync(new AnnotationsQueueRecord { Id = Guid.NewGuid(), DateTime = DateTime.UtcNow, Operation = operation, AnnotationIds = ids }); } }