using System.Drawing.Imaging; using System.IO; using System.Net; using Azaion.Common.Database; using Azaion.Common.DTO; using Azaion.Common.DTO.Config; using Azaion.Common.DTO.Queue; using Azaion.Common.Events; using Azaion.Common.Extensions; using Azaion.CommonSecurity.DTO; using Azaion.CommonSecurity.Services; using LinqToDB; using LinqToDB.Data; using MediatR; using MessagePack; using Microsoft.Extensions.Options; using Newtonsoft.Json; using RabbitMQ.Stream.Client; using RabbitMQ.Stream.Client.Reliable; namespace Azaion.Common.Services; public class AnnotationService : INotificationHandler { private readonly IDbFactory _dbFactory; private readonly FailsafeAnnotationsProducer _producer; private readonly IGalleryService _galleryService; private readonly IMediator _mediator; private readonly IHardwareService _hardwareService; private readonly IAuthProvider _authProvider; private readonly QueueConfig _queueConfig; private Consumer _consumer = null!; private static readonly Guid SaveTaskId = Guid.NewGuid(); public AnnotationService( IResourceLoader resourceLoader, IDbFactory dbFactory, FailsafeAnnotationsProducer producer, IOptions queueConfig, IGalleryService galleryService, IMediator mediator, IHardwareService hardwareService, IAuthProvider authProvider) { _dbFactory = dbFactory; _producer = producer; _galleryService = galleryService; _mediator = mediator; _hardwareService = hardwareService; _authProvider = authProvider; _queueConfig = queueConfig.Value; Task.Run(async () => await Init()).Wait(); } private async Task Init(CancellationToken cancellationToken = default) { var consumerSystem = await StreamSystem.Create(new StreamSystemConfig { Endpoints = new List{new DnsEndPoint(_queueConfig.Host, _queueConfig.Port)}, UserName = _queueConfig.ConsumerUsername, Password = _queueConfig.ConsumerPassword }); var offset = (await _dbFactory.Run(db => db.QueueOffsets.FirstOrDefaultAsync( x => x.QueueName == Constants.MQ_ANNOTATIONS_QUEUE, token: cancellationToken)) )?.Offset ?? 0; _consumer = await Consumer.Create(new ConsumerConfig(consumerSystem, Constants.MQ_ANNOTATIONS_QUEUE) { Reference = _hardwareService.GetHardware().Hash, OffsetSpec = new OffsetTypeOffset(offset + 1), MessageHandler = async (stream, consumer, context, message) => { var msg = MessagePackSerializer.Deserialize(message.Data.Contents); if (msg.CreatedRole != RoleEnum.Operator) //Process only operator's messages return; await SaveAnnotationInner( msg.CreatedDate, msg.OriginalMediaName, msg.Time, msg.ImageExtension, JsonConvert.DeserializeObject>(msg.Detections) ?? [], msg.Source, new MemoryStream(msg.Image), msg.CreatedRole, msg.CreatedEmail, generateThumbnail: true, cancellationToken); await _dbFactory.Run(async db => await db.QueueOffsets .Where(x => x.QueueName == Constants.MQ_ANNOTATIONS_QUEUE) .Set(x => x.Offset, context.Offset) .UpdateAsync(token: cancellationToken)); await ThrottleExt.ThrottleRunAfter(() => { _dbFactory.SaveToDisk(); return Task.CompletedTask; }, SaveTaskId, TimeSpan.FromSeconds(3), cancellationToken); } }); } //AI public async Task SaveAnnotation(AnnotationImage a, CancellationToken cancellationToken = default) { a.Time = TimeSpan.FromMilliseconds(a.Milliseconds); return await SaveAnnotationInner(DateTime.Now, a.OriginalMediaName, a.Time, ".jpg", a.Detections.ToList(), a.Source, new MemoryStream(a.Image), a.CreatedRole, a.CreatedEmail, generateThumbnail: true, cancellationToken); } //Manual public async Task SaveAnnotation(string originalMediaName, TimeSpan time, string imageExtension, List detections, SourceEnum source, Stream? stream = null, CancellationToken token = default) => await SaveAnnotationInner(DateTime.UtcNow, originalMediaName, time, imageExtension, detections, source, stream, _authProvider.CurrentUser.Role, _authProvider.CurrentUser.Email, generateThumbnail: true, token); //Manual Validate existing public async Task ValidateAnnotation(Annotation annotation, CancellationToken token = default) => await SaveAnnotationInner(DateTime.UtcNow, annotation.OriginalMediaName, annotation.Time, annotation.ImageExtension, annotation.Detections.ToList(), SourceEnum.Manual, null, _authProvider.CurrentUser.Role, _authProvider.CurrentUser.Email, generateThumbnail: false, token); // Manual save from Validators -> Validated -> stream: azaion-annotations-confirm // AI, Manual save from Operators -> Created -> stream: azaion-annotations private async Task SaveAnnotationInner(DateTime createdDate, string originalMediaName, TimeSpan time, string imageExtension, List detections, SourceEnum source, Stream? stream, RoleEnum userRole, string createdEmail, bool generateThumbnail = false, CancellationToken token = default) { AnnotationStatus status; var fName = originalMediaName.ToTimeName(time); var annotation = await _dbFactory.Run(async db => { var ann = await db.Annotations.FirstOrDefaultAsync(x => x.Name == fName, token: token); status = userRole.IsValidator() && source == SourceEnum.Manual ? AnnotationStatus.Validated : AnnotationStatus.Created; await db.Detections.DeleteAsync(x => x.AnnotationName == fName, token: token); await db.BulkCopyAsync(detections, cancellationToken: token); if (ann != null) { await db.Annotations .Where(x => x.Name == fName) .Set(x => x.Source, source) .Set(x => x.AnnotationStatus, status) .UpdateAsync(token: token); ann.Detections = detections; } else { ann = new Annotation { CreatedDate = createdDate, Name = fName, OriginalMediaName = originalMediaName, Time = time, ImageExtension = imageExtension, CreatedEmail = createdEmail, CreatedRole = userRole, AnnotationStatus = status, Source = source, Detections = detections }; await db.InsertAsync(ann, token: token); } return ann; }); if (stream != null) { var img = System.Drawing.Image.FromStream(stream); img.Save(annotation.ImagePath, ImageFormat.Jpeg); //todo: check png images coming from queue } await YoloLabel.WriteToFile(detections, annotation.LabelPath, token); if (generateThumbnail) await _galleryService.CreateThumbnail(annotation, token); await _producer.SendToInnerQueue(annotation, token); await _mediator.Publish(new AnnotationCreatedEvent(annotation), token); await ThrottleExt.ThrottleRunAfter(() => { _dbFactory.SaveToDisk(); return Task.CompletedTask; }, SaveTaskId, TimeSpan.FromSeconds(5), token); return annotation; } public async Task Handle(AnnotationsDeletedEvent notification, CancellationToken cancellationToken) { await _dbFactory.DeleteAnnotations(notification.Annotations, cancellationToken); foreach (var annotation in notification.Annotations) { File.Delete(annotation.ImagePath); File.Delete(annotation.LabelPath); File.Delete(annotation.ThumbPath); } } }