using System.Drawing.Imaging;
using System.IO;
using System.Net;
using Azaion.Common.Database;
using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.Common.DTO.Queue;
using Azaion.Common.Events;
using Azaion.Common.Extensions;
using Azaion.CommonSecurity.DTO;
using Azaion.CommonSecurity.Services;
using LinqToDB;
using LinqToDB.Data;
using MediatR;
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace Azaion.Common.Services;

/// <summary>
/// Saves, validates and deletes annotations: writes them to the database, writes the image and
/// label files, asks the gallery service for thumbnails, publishes mediator events and forwards
/// local changes to the annotations queue. For validator users it also consumes that queue and
/// applies changes published by other users.
/// </summary>
/// <remarks>
/// NOTE(review): the text this file was reviewed from had its generic type arguments stripped.
/// They were restored from usage. <c>Detection</c>, <c>AnnotationMessage</c> and
/// <c>AnnotationBulkMessage</c> are not visible in this file and are assumed names; confirm them
/// against Azaion.Common.DTO / Azaion.Common.DTO.Queue.
/// </remarks>
public class AnnotationService : IAnnotationService, INotificationHandler<AnnotationsDeletedEvent>
{
    private readonly IDbFactory _dbFactory;
    private readonly FailsafeAnnotationsProducer _producer;
    private readonly IGalleryService _galleryService;
    private readonly IMediator _mediator;
    private readonly IAzaionApi _api;
    private readonly ILogger<AnnotationService> _logger;
    private readonly QueueConfig _queueConfig;
    private Consumer _consumer = null!;
    private readonly UIConfig _uiConfig;
    private readonly DirectoriesConfig _dirConfig;

    // Keys passed to ThrottleExt so that every "save to disk" request shares one throttle slot
    // and every "save queue offset" request shares another.
    private static readonly Guid SaveTaskId = Guid.NewGuid();
    private static readonly Guid SaveQueueOffsetTaskId = Guid.NewGuid();

    /// <summary>
    /// Stores the dependencies and synchronously starts the queue consumer.
    /// </summary>
    public AnnotationService(
        IDbFactory dbFactory,
        FailsafeAnnotationsProducer producer,
        IOptions<QueueConfig> queueConfig,
        IOptions<UIConfig> uiConfig,
        IOptions<DirectoriesConfig> directoriesConfig,
        IGalleryService galleryService,
        IMediator mediator,
        IAzaionApi api,
        ILogger<AnnotationService> logger)
    {
        _dbFactory = dbFactory;
        _producer = producer;
        _galleryService = galleryService;
        _mediator = mediator;
        _api = api;
        _logger = logger;
        _queueConfig = queueConfig.Value;
        _uiConfig = uiConfig.Value;
        _dirConfig = directoriesConfig.Value;

        // Blocks until the consumer is created (or skipped). Kept as-is on purpose: failures
        // surface from the constructor as AggregateException and callers may rely on that.
        Task.Run(async () => await InitQueueConsumer()).Wait();
    }

    /// <summary>
    /// Creates the annotations queue consumer. Does nothing unless the current user is a validator.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the operations triggered by incoming messages.</param>
    private async Task InitQueueConsumer(CancellationToken cancellationToken = default)
    {
        if (!_api.CurrentUser.Role.IsValidator())
            return;

        var consumerSystem = await StreamSystem.Create(new StreamSystemConfig
        {
            Endpoints = new List<EndPoint> { new DnsEndPoint(_queueConfig.Host, _queueConfig.Port) },
            UserName = _queueConfig.ConsumerUsername,
            Password = _queueConfig.ConsumerPassword
        });

        var offsets = _api.CurrentUser.UserConfig?.QueueOffsets ?? new UserQueueOffsets();

        _consumer = await Consumer.Create(new ConsumerConfig(consumerSystem, Constants.MQ_ANNOTATIONS_QUEUE)
        {
            Reference = _api.CurrentUser.Email,
            OffsetSpec = new OffsetTypeOffset(offsets.AnnotationsOffset),
            MessageHandler = async (_, _, context, message) =>
            {
                try
                {
                    var email = (string)message.ApplicationProperties[nameof(User.Email)]!;
                    // A message whose status cannot be parsed is skipped without advancing the offset.
                    if (!Enum.TryParse<AnnotationStatus>((string)message.ApplicationProperties[nameof(AnnotationStatus)], out var annotationStatus))
                        return;

                    if (email != _api.CurrentUser.Email) // Don't process messages published by the current user
                    {
                        if (annotationStatus.In(AnnotationStatus.Created, AnnotationStatus.Edited))
                        {
                            // NOTE(review): DTO type name assumed (generic argument missing in the reviewed text) - confirm.
                            var msg = MessagePackSerializer.Deserialize<AnnotationMessage>(message.Data.Contents);
                            using var imageStream = msg.Image == null ? null : new MemoryStream(msg.Image);
                            await SaveAnnotationInner(
                                msg.CreatedDate,
                                msg.OriginalMediaName,
                                msg.Time,
                                JsonConvert.DeserializeObject<List<Detection>>(msg.Detections) ?? [],
                                msg.Source,
                                imageStream,
                                msg.Role,
                                msg.Email,
                                fromQueue: true,
                                token: cancellationToken);
                        }
                        else
                        {
                            // NOTE(review): DTO type name assumed (generic argument missing in the reviewed text) - confirm.
                            var msg = MessagePackSerializer.Deserialize<AnnotationBulkMessage>(message.Data.Contents);
                            if (annotationStatus == AnnotationStatus.Validated)
                                await ValidateAnnotations(msg.AnnotationNames.ToList(), true, cancellationToken);
                            if (annotationStatus == AnnotationStatus.Deleted)
                                await _mediator.Publish(new AnnotationsDeletedEvent(msg.AnnotationNames.ToList(), fromQueue: true), cancellationToken);
                        }
                    }

                    // Remember the next offset so the next launch consumes from the next message.
                    offsets.AnnotationsOffset = context.Offset + 1;
                    ThrottleExt.Throttle(() =>
                    {
                        _api.UpdateOffsets(offsets);
                        return Task.CompletedTask;
                    }, SaveQueueOffsetTaskId, TimeSpan.FromSeconds(10), scheduleCallAfterCooldown: true);
                }
                catch (Exception e)
                {
                    // Log and keep consuming: the exception is not rethrown to the consumer.
                    _logger.LogError(e, e.Message);
                }
            }
        });
    }

    /// <summary>
    /// Saves an AI-produced annotation on behalf of the current user.
    /// </summary>
    /// <param name="a">Annotation image; its <c>Time</c> is overwritten from <c>Milliseconds</c>.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>The saved annotation.</returns>
    public async Task<Annotation> SaveAnnotation(AnnotationImage a, CancellationToken ct = default)
    {
        a.Time = TimeSpan.FromMilliseconds(a.Milliseconds);
        using var imageStream = new MemoryStream(a.Image);
        // UTC, to match the manual overload and ValidateAnnotations (this used to be local time).
        return await SaveAnnotationInner(DateTime.UtcNow, a.OriginalMediaName, a.Time, a.Detections.ToList(),
            SourceEnum.AI, imageStream, _api.CurrentUser.Role, _api.CurrentUser.Email, token: ct);
    }

    /// <summary>
    /// Saves a manually created annotation on behalf of the current user.
    /// </summary>
    /// <param name="originalMediaName">Name of the media the annotation belongs to.</param>
    /// <param name="time">Position within the media.</param>
    /// <param name="detections">Detections to store for the annotation.</param>
    /// <param name="stream">Optional image data; when null no image file is written. Not disposed here.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>The saved annotation.</returns>
    public async Task<Annotation> SaveAnnotation(string originalMediaName, TimeSpan time, List<Detection> detections,
        Stream? stream = null, CancellationToken token = default) =>
        await SaveAnnotationInner(DateTime.UtcNow, originalMediaName, time, detections, SourceEnum.Manual, stream,
            _api.CurrentUser.Role, _api.CurrentUser.Email, token: token);

    /// <summary>
    /// Inserts or updates the annotation row and its detections, writes the image and label files,
    /// requests thumbnail generation, publishes <see cref="AnnotationCreatedEvent"/> and, unless the
    /// change came from the queue, sends it to the queue.
    /// </summary>
    private async Task<Annotation> SaveAnnotationInner(DateTime createdDate, string originalMediaName, TimeSpan time,
        List<Detection> detections, SourceEnum source, Stream? stream, RoleEnum userRole, string createdEmail,
        bool fromQueue = false, CancellationToken token = default)
    {
        var status = AnnotationStatus.Created;
        var fName = originalMediaName.ToTimeName(time);

        var annotation = await _dbFactory.Run(async db =>
        {
            var ann = await db.Annotations
                .LoadWith(x => x.Detections)
                .FirstOrDefaultAsync(x => x.Name == fName, token: token);

            // Existing detections are always replaced by the supplied list (bulk-inserted below).
            await db.Detections.DeleteAsync(x => x.AnnotationName == fName, token: token);

            if (ann != null) // Annotation already exists
            {
                status = AnnotationStatus.Edited;

                var annotationUpdatable = db.Annotations
                    .Where(x => x.Name == fName)
                    .Set(x => x.Source, source);

                // A manual edit made by a validator also records who validated it and when.
                if (userRole.IsValidator() && source == SourceEnum.Manual)
                {
                    annotationUpdatable = annotationUpdatable
                        .Set(x => x.ValidateDate, createdDate)
                        .Set(x => x.ValidateEmail, createdEmail);
                }

                await annotationUpdatable
                    .Set(x => x.AnnotationStatus, status)
                    .UpdateAsync(token: token);

                ann.Detections = detections;
            }
            else
            {
                ann = new Annotation
                {
                    CreatedDate = createdDate,
                    Name = fName,
                    OriginalMediaName = originalMediaName,
                    Time = time,
                    ImageExtension = Constants.JPG_EXT,
                    CreatedEmail = createdEmail,
                    CreatedRole = userRole,
                    AnnotationStatus = status,
                    Source = source,
                    Detections = detections
                };
                await db.InsertAsync(ann, token: token);
            }

            await db.BulkCopyAsync(detections, cancellationToken: token);
            return ann;
        });

        if (stream != null)
        {
            // Image is IDisposable; dispose it once the file is written.
            using var img = System.Drawing.Image.FromStream(stream);
            img.Save(annotation.ImagePath, ImageFormat.Jpeg); //todo: check png images coming from queue
        }

        await YoloLabel.WriteToFile(detections, annotation.LabelPath, token);
        await _galleryService.CreateThumbnail(annotation, token);
        if (_uiConfig.GenerateAnnotatedImage)
            await _galleryService.CreateAnnotatedImage(annotation, token);

        await _mediator.Publish(new AnnotationCreatedEvent(annotation), token);

        if (!fromQueue) // Send to the queue only if the change did not come from the queue
            await _producer.SendToInnerQueue([annotation.Name], status, token);

        ScheduleSaveToDisk();
        return annotation;
    }

    /// <summary>
    /// Marks the given annotations as validated by the current user. Does nothing unless the
    /// current user is a validator.
    /// </summary>
    /// <param name="annotationNames">Names of the annotations to validate.</param>
    /// <param name="fromQueue">True when the request came from the queue; it is then not sent back to it.</param>
    /// <param name="token">Cancellation token.</param>
    public async Task ValidateAnnotations(List<string> annotationNames, bool fromQueue = false, CancellationToken token = default)
    {
        if (!_api.CurrentUser.Role.IsValidator())
            return;

        var annNames = annotationNames.ToHashSet();
        await _dbFactory.Run(async db =>
        {
            await db.Annotations
                .Where(x => annNames.Contains(x.Name))
                .Set(x => x.AnnotationStatus, AnnotationStatus.Validated)
                .Set(x => x.ValidateDate, DateTime.UtcNow)
                .Set(x => x.ValidateEmail, _api.CurrentUser.Email)
                .UpdateAsync(token: token);
        });

        if (!fromQueue)
            await _producer.SendToInnerQueue(annotationNames, AnnotationStatus.Validated, token);

        ScheduleSaveToDisk();
    }

    /// <summary>
    /// Handles <see cref="AnnotationsDeletedEvent"/>: deletes the annotations from the database,
    /// deletes their image, label, thumbnail and result files and, for validators, sends the
    /// deletion to the queue unless it came from there.
    /// </summary>
    public async Task Handle(AnnotationsDeletedEvent notification, CancellationToken ct)
    {
        await _dbFactory.DeleteAnnotations(notification.AnnotationNames, ct);

        foreach (var name in notification.AnnotationNames)
        {
            File.Delete(Path.Combine(_dirConfig.ImagesDirectory, $"{name}{Constants.JPG_EXT}"));
            File.Delete(Path.Combine(_dirConfig.LabelsDirectory, $"{name}{Constants.TXT_EXT}"));
            File.Delete(Path.Combine(_dirConfig.ThumbnailsDirectory, $"{name}{Constants.THUMBNAIL_PREFIX}{Constants.JPG_EXT}"));
            File.Delete(Path.Combine(_dirConfig.ResultsDirectory, $"{name}{Constants.RESULT_PREFIX}{Constants.JPG_EXT}"));
        }

        // Only validators can send Delete to the queue
        if (!notification.FromQueue && _api.CurrentUser.Role.IsValidator())
            await _producer.SendToInnerQueue(notification.AnnotationNames, AnnotationStatus.Deleted, ct);

        ScheduleSaveToDisk();
    }

    /// <summary>
    /// Requests a throttled <c>SaveToDisk</c> on the database factory, using the shared
    /// <see cref="SaveTaskId"/> key and a 5 second interval.
    /// </summary>
    private void ScheduleSaveToDisk() =>
        ThrottleExt.Throttle(async () =>
        {
            _dbFactory.SaveToDisk();
            await Task.CompletedTask;
        }, SaveTaskId, TimeSpan.FromSeconds(5), true);
}

/// <summary>
/// Operations for saving and validating annotations.
/// </summary>
public interface IAnnotationService
{
    /// <summary>Saves an AI-produced annotation and returns the saved annotation.</summary>
    Task<Annotation> SaveAnnotation(AnnotationImage a, CancellationToken ct = default);

    /// <summary>Saves a manually created annotation and returns the saved annotation.</summary>
    Task<Annotation> SaveAnnotation(string originalMediaName, TimeSpan time, List<Detection> detections,
        Stream? stream = null, CancellationToken token = default);

    /// <summary>Marks the given annotations as validated.</summary>
    Task ValidateAnnotations(List<string> annotationNames, bool fromQueue = false, CancellationToken token = default);
}