using System.Drawing.Imaging; using System.IO; using System.Net; using Azaion.Common.Database; using Azaion.Common.DTO; using Azaion.Common.DTO.Config; using Azaion.Common.DTO.Queue; using Azaion.CommonSecurity.DTO; using Azaion.CommonSecurity.Services; using LinqToDB; using MediatR; using MessagePack; using Microsoft.Extensions.Options; using Newtonsoft.Json; using RabbitMQ.Stream.Client; using RabbitMQ.Stream.Client.Reliable; namespace Azaion.Common.Services; public class AnnotationService { private readonly AzaionApiClient _apiClient; private readonly IDbFactory _dbFactory; private readonly FailsafeAnnotationsProducer _producer; private readonly IGalleryService _galleryService; private readonly IMediator _mediator; private readonly QueueConfig _queueConfig; private Consumer _consumer = null!; public AnnotationService(AzaionApiClient apiClient, IDbFactory dbFactory, FailsafeAnnotationsProducer producer, IOptions queueConfig, IGalleryService galleryService, IMediator mediator) { _apiClient = apiClient; _dbFactory = dbFactory; _producer = producer; _galleryService = galleryService; _mediator = mediator; _queueConfig = queueConfig.Value; Task.Run(async () => await Init()).Wait(); } private async Task Init() { var consumerSystem = await StreamSystem.Create(new StreamSystemConfig { Endpoints = new List{new DnsEndPoint(_queueConfig.Host, _queueConfig.Port)}, UserName = _queueConfig.ConsumerUsername, Password = _queueConfig.ConsumerPassword }); _consumer = await Consumer.Create(new ConsumerConfig(consumerSystem, Constants.MQ_ANNOTATIONS_QUEUE) { OffsetSpec = new OffsetTypeFirst(), MessageHandler = async (stream, _, _, message) => await Consume(MessagePackSerializer.Deserialize(message.Data.Contents)), }); } //AI / Manual public async Task SaveAnnotation(string fName, string imageExtension, List detections, SourceEnum source, Stream? stream = null, CancellationToken token = default) => await SaveAnnotationInner(DateTime.UtcNow, fName, imageExtension, detections, source, stream, _apiClient.User.Role, _apiClient.User.Email, token); //Queue (only from operators) public async Task Consume(AnnotationCreatedMessage message, CancellationToken cancellationToken = default) { if (message.CreatedRole == RoleEnum.Validator) //Don't proceed our own messages (or from another Validator) return; await SaveAnnotationInner( message.CreatedDate, message.Name, message.ImageExtension, JsonConvert.DeserializeObject>(message.Detections) ?? [], message.Source, new MemoryStream(message.Image), message.CreatedRole, message.CreatedEmail, cancellationToken); } private async Task SaveAnnotationInner(DateTime createdDate, string fName, string imageExtension, List detections, SourceEnum source, Stream? stream, RoleEnum createdRole, string createdEmail, CancellationToken token = default) { //Flow for roles: // Operator: // sourceEnum: (manual, ai) // Validator: // sourceEnum: (manual) if was in received.json then else // sourceEnum: (queue, AI) if queue CreatedMessage with the same user - do nothing Add to received.json var classes = detections.Select(x => x.ClassNumber).Distinct().ToList() ?? []; AnnotationStatus status; var annotation = await _dbFactory.Run(async db => { var ann = await db.Annotations.FirstOrDefaultAsync(x => x.Name == fName, token: token); status = ann?.AnnotationStatus == AnnotationStatus.Created && createdRole == RoleEnum.Validator ? AnnotationStatus.Validated : AnnotationStatus.Created; if (ann != null) await db.Annotations .Where(x => x.Name == fName) .Set(x => x.Classes, classes) .Set(x => x.Source, source) .Set(x => x.AnnotationStatus, status) .UpdateAsync(token: token); else { ann = new Annotation { CreatedDate = createdDate, Name = fName, ImageExtension = imageExtension, CreatedEmail = createdEmail, CreatedRole = createdRole, AnnotationStatus = status, Source = source, Detections = detections }; await db.InsertAsync(ann, token: token); } return ann; }); if (stream != null) { var img = System.Drawing.Image.FromStream(stream); img.Save(annotation.ImagePath, ImageFormat.Jpeg); //todo: check png images coming from queue } await YoloLabel.WriteToFile(detections, annotation.LabelPath, token); await _galleryService.CreateThumbnail(annotation, token); await _producer.SendToQueue(annotation, token); await _mediator.Publish(new AnnotationCreatedEvent(annotation), token); } }