Files
annotations/Azaion.Common/Services/AnnotationService.cs
T
Alex Bezdieniezhnykh 83ae6a0ae9 move detection classes and other system values from local config to remote
forbid non validators to read from queue
create better visualization in detector control
make colors for detection classes more distinguishable
fix bug with removing detection (probably completely)
2025-04-02 19:53:03 +03:00

211 lines
8.7 KiB
C#

using System.Drawing.Imaging;
using System.IO;
using System.Net;
using Azaion.Common.Database;
using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.Common.DTO.Queue;
using Azaion.Common.Events;
using Azaion.Common.Extensions;
using Azaion.CommonSecurity.DTO;
using Azaion.CommonSecurity.Services;
using LinqToDB;
using LinqToDB.Data;
using MediatR;
using MessagePack;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
namespace Azaion.Common.Services;
/// <summary>
/// Saves annotations (database rows, image file, label file and optionally a thumbnail),
/// consumes annotation messages from the stream queue when the current user is a validator,
/// and deletes an annotation's files when an <see cref="AnnotationsDeletedEvent"/> is handled.
/// </summary>
public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
{
private readonly IDbFactory _dbFactory;
private readonly FailsafeAnnotationsProducer _producer;
private readonly IGalleryService _galleryService;
private readonly IMediator _mediator;
private readonly IHardwareService _hardwareService;
private readonly IAuthProvider _authProvider;
private readonly QueueConfig _queueConfig;
// Assigned in Init; stays unassigned when the current user is not a validator, because Init returns early in that case.
private Consumer _consumer = null!;
// Key passed to ThrottleExt.ThrottleRunAfter for the throttled _dbFactory.SaveToDisk() calls; static, so shared by all instances.
private static readonly Guid SaveTaskId = Guid.NewGuid();
/// <summary>
/// Stores the injected dependencies and then blocks until <see cref="Init"/> has completed.
/// </summary>
/// <param name="resourceLoader">Accepted for dependency injection; not used by this constructor.</param>
/// <param name="dbFactory">Database access used for annotations, detections and queue offsets.</param>
/// <param name="producer">Producer that saved annotations are handed to when they do not originate from the queue.</param>
/// <param name="queueConfig">Options wrapper; its <c>Value</c> supplies host, port and consumer credentials.</param>
/// <param name="galleryService">Used to create thumbnails for saved annotations.</param>
/// <param name="mediator">Used to publish <c>AnnotationCreatedEvent</c>.</param>
/// <param name="hardwareService">Supplies the hardware hash used as the consumer reference.</param>
/// <param name="authProvider">Supplies the current user (role and email).</param>
public AnnotationService(
    IResourceLoader resourceLoader,
    IDbFactory dbFactory,
    FailsafeAnnotationsProducer producer,
    IOptions<QueueConfig> queueConfig,
    IGalleryService galleryService,
    IMediator mediator,
    IHardwareService hardwareService,
    IAuthProvider authProvider)
{
    _queueConfig = queueConfig.Value;
    _authProvider = authProvider;
    _hardwareService = hardwareService;
    _mediator = mediator;
    _galleryService = galleryService;
    _producer = producer;
    _dbFactory = dbFactory;

    // Init is started through Task.Run and waited on synchronously,
    // so the constructor returns only after Init has finished (or throws if it failed).
    var initTask = Task.Run(() => Init());
    initTask.Wait();
}
/// <summary>
/// Connects to the annotations stream and starts consuming it.
/// Returns immediately, without connecting, when the current user is not a validator.
/// </summary>
/// <param name="cancellationToken">
/// Token passed to the database calls, the throttled save and the annotation save,
/// including those made later from inside the message handler.
/// </param>
private async Task Init(CancellationToken cancellationToken = default)
{
    var isValidator = _authProvider.CurrentUser.Role.IsValidator();
    if (!isValidator)
        return;

    var systemConfig = new StreamSystemConfig
    {
        Endpoints = new List<EndPoint>{new DnsEndPoint(_queueConfig.Host, _queueConfig.Port)},
        UserName = _queueConfig.ConsumerUsername,
        Password = _queueConfig.ConsumerPassword
    };
    var streamSystem = await StreamSystem.Create(systemConfig);

    // Offset stored for this queue; falls back to 0 when no row exists.
    var storedOffset = await _dbFactory.Run(db => db.QueueOffsets.FirstOrDefaultAsync(
        x => x.QueueName == Constants.MQ_ANNOTATIONS_QUEUE, token: cancellationToken));
    var offset = storedOffset?.Offset ?? 0;

    var consumerConfig = new ConsumerConfig(streamSystem, Constants.MQ_ANNOTATIONS_QUEUE)
    {
        Reference = _hardwareService.GetHardware().Hash,
        // NOTE(review): with no stored row this starts at offset 1 — confirm offset 0 never holds a real message.
        OffsetSpec = new OffsetTypeOffset(offset + 1),
        MessageHandler = async (_, _, context, message) =>
        {
            var msg = MessagePackSerializer.Deserialize<AnnotationCreatedMessage>(message.Data.Contents);

            // Record the offset of this message first.
            // NOTE(review): this happens before the message is processed — confirm that is intended.
            await _dbFactory.Run(async db => await db.QueueOffsets
                .Where(x => x.QueueName == Constants.MQ_ANNOTATIONS_QUEUE)
                .Set(x => x.Offset, context.Offset)
                .UpdateAsync(token: cancellationToken));

            await ThrottleExt.ThrottleRunAfter(() =>
            {
                _dbFactory.SaveToDisk();
                return Task.CompletedTask;
            }, SaveTaskId, TimeSpan.FromSeconds(5), cancellationToken);

            // Messages created by the current user are not processed again.
            var isOwnMessage = msg.CreatedEmail == _authProvider.CurrentUser.Email;
            if (isOwnMessage)
                return;

            List<Detection> detections = JsonConvert.DeserializeObject<List<Detection>>(msg.Detections) ?? [];
            await SaveAnnotationInner(
                msg.CreatedDate,
                msg.OriginalMediaName,
                msg.Time,
                detections,
                msg.Source,
                new MemoryStream(msg.Image),
                msg.CreatedRole,
                msg.CreatedEmail,
                generateThumbnail: true,
                fromQueue: true,
                token: cancellationToken);
        }
    };
    _consumer = await Consumer.Create(consumerConfig);
}
/// <summary>
/// Saves an annotation produced by AI (source <c>SourceEnum.AI</c>) on behalf of the current user,
/// generating a thumbnail.
/// </summary>
/// <param name="a">
/// AI result. Its <c>Time</c> is overwritten with the value derived from <c>Milliseconds</c>;
/// its <c>Image</c> bytes are used as the image stream.
/// </param>
/// <param name="ct">Cancellation token passed on to the save.</param>
/// <returns>The saved annotation.</returns>
public async Task<Annotation> SaveAnnotation(AnnotationImage a, CancellationToken ct = default)
{
    a.Time = TimeSpan.FromMilliseconds(a.Milliseconds);

    var user = _authProvider.CurrentUser;
    using var imageStream = new MemoryStream(a.Image);

    // UTC, consistent with the manual save and validate entry points, so CreatedDate is never a mix of local and UTC.
    return await SaveAnnotationInner(DateTime.UtcNow, a.OriginalMediaName, a.Time, a.Detections.ToList(),
        SourceEnum.AI, imageStream, user.Role, user.Email, generateThumbnail: true, token: ct);
}
/// <summary>
/// Saves a manually created annotation (source <c>SourceEnum.Manual</c>) on behalf of the current user,
/// stamped with the current UTC time, and generates a thumbnail.
/// </summary>
/// <param name="originalMediaName">Name of the media the annotation belongs to.</param>
/// <param name="time">Position within the media.</param>
/// <param name="detections">Detections to store for the annotation.</param>
/// <param name="stream">Optional image data; when null no image file is written.</param>
/// <param name="token">Cancellation token passed on to the save.</param>
/// <returns>The saved annotation.</returns>
public async Task<Annotation> SaveAnnotation(string originalMediaName, TimeSpan time, List<Detection> detections, Stream? stream = null, CancellationToken token = default)
{
    var createdDate = DateTime.UtcNow;
    var user = _authProvider.CurrentUser;

    var saved = await SaveAnnotationInner(createdDate, originalMediaName, time, detections, SourceEnum.Manual, stream,
        user.Role, user.Email, generateThumbnail: true, token: token);
    return saved;
}
/// <summary>
/// Re-saves an existing annotation as a manual save by the current user, stamped with the current UTC time.
/// No image stream is passed and no thumbnail is generated.
/// </summary>
/// <param name="annotation">Annotation whose media name, time and detections are saved again.</param>
/// <param name="token">Cancellation token passed on to the save.</param>
public async Task ValidateAnnotation(Annotation annotation, CancellationToken token = default)
{
    var createdDate = DateTime.UtcNow;
    var detections = annotation.Detections.ToList();
    var user = _authProvider.CurrentUser;

    await SaveAnnotationInner(createdDate, annotation.OriginalMediaName, annotation.Time, detections, SourceEnum.Manual, null,
        user.Role, user.Email, token: token);
}
// Manual save from Validators -> Validated -> stream: azaion-annotations-confirm
// AI, Manual save from Operators -> Created -> stream: azaion-annotations
/// <summary>
/// Creates or updates the annotation named after the media name and time, replaces its detections,
/// writes the image (when a stream is given) and the label file, optionally creates a thumbnail,
/// forwards the annotation to the producer unless it came from the queue, publishes
/// <c>AnnotationCreatedEvent</c> and schedules a throttled save of the database to disk.
/// </summary>
/// <param name="createdDate">Creation timestamp stored on the annotation.</param>
/// <param name="originalMediaName">Name of the media the annotation belongs to.</param>
/// <param name="time">Position within the media; combined with the media name to form the annotation name.</param>
/// <param name="detections">Detections that replace any stored for this annotation name.</param>
/// <param name="source">Origin of the annotation; together with <paramref name="userRole"/> it decides the status.</param>
/// <param name="stream">Optional image data; when null no image file is written.</param>
/// <param name="userRole">Role of the creating user.</param>
/// <param name="createdEmail">Email of the creating user.</param>
/// <param name="generateThumbnail">When true, a thumbnail is created through the gallery service.</param>
/// <param name="fromQueue">When true, the annotation is not sent back to the producer.</param>
/// <param name="token">Cancellation token passed to the database, file and messaging calls.</param>
/// <returns>The inserted annotation, or the existing one refreshed with the values just saved.</returns>
private async Task<Annotation> SaveAnnotationInner(DateTime createdDate, string originalMediaName, TimeSpan time, List<Detection> detections, SourceEnum source, Stream? stream,
    RoleEnum userRole,
    string createdEmail,
    bool generateThumbnail = false,
    bool fromQueue = false,
    CancellationToken token = default)
{
    var fName = originalMediaName.ToTimeName(time);

    // Only a manual save made by a validator counts as validated; everything else is merely created.
    var status = userRole.IsValidator() && source == SourceEnum.Manual
        ? AnnotationStatus.Validated
        : AnnotationStatus.Created;

    var annotation = await _dbFactory.Run(async db =>
    {
        var ann = await db.Annotations.FirstOrDefaultAsync(x => x.Name == fName, token: token);

        // Replace the stored detections for this annotation name with the supplied ones.
        await db.Detections.DeleteAsync(x => x.AnnotationName == fName, token: token);
        await db.BulkCopyAsync(detections, cancellationToken: token);

        if (ann != null)
        {
            await db.Annotations
                .Where(x => x.Name == fName)
                .Set(x => x.Source, source)
                .Set(x => x.AnnotationStatus, status)
                .Set(x => x.CreatedDate, createdDate)
                .Set(x => x.CreatedEmail, createdEmail)
                .Set(x => x.CreatedRole, userRole)
                .UpdateAsync(token: token);

            // Mirror the update on the loaded object: it is what gets sent to the producer
            // and published below, so it must not carry the previous status/source/author.
            ann.Source = source;
            ann.AnnotationStatus = status;
            ann.CreatedDate = createdDate;
            ann.CreatedEmail = createdEmail;
            ann.CreatedRole = userRole;
            ann.Detections = detections;
        }
        else
        {
            ann = new Annotation
            {
                CreatedDate = createdDate,
                Name = fName,
                OriginalMediaName = originalMediaName,
                Time = time,
                ImageExtension = Constants.JPG_EXT,
                CreatedEmail = createdEmail,
                CreatedRole = userRole,
                AnnotationStatus = status,
                Source = source,
                Detections = detections
            };
            await db.InsertAsync(ann, token: token);
        }
        return ann;
    });

    if (stream != null)
    {
        // Dispose the decoded image once it has been written; it is not needed afterwards.
        using var img = System.Drawing.Image.FromStream(stream);
        img.Save(annotation.ImagePath, ImageFormat.Jpeg); //todo: check png images coming from queue
    }

    await YoloLabel.WriteToFile(detections, annotation.LabelPath, token);

    if (generateThumbnail)
        await _galleryService.CreateThumbnail(annotation, token);

    if (!fromQueue) //Send to queue only if we're not getting from queue already
        await _producer.SendToInnerQueue(annotation, token);

    await _mediator.Publish(new AnnotationCreatedEvent(annotation), token);

    await ThrottleExt.ThrottleRunAfter(() =>
    {
        _dbFactory.SaveToDisk();
        return Task.CompletedTask;
    }, SaveTaskId, TimeSpan.FromSeconds(5), token);

    return annotation;
}
/// <summary>
/// Handles <see cref="AnnotationsDeletedEvent"/>: deletes the annotations through the database factory,
/// then deletes each annotation's image, label and thumbnail files.
/// </summary>
/// <param name="notification">Event carrying the annotations to delete.</param>
/// <param name="cancellationToken">Cancellation token passed to the database delete.</param>
public async Task Handle(AnnotationsDeletedEvent notification, CancellationToken cancellationToken)
{
    await _dbFactory.DeleteAnnotations(notification.Annotations, cancellationToken);

    foreach (var deleted in notification.Annotations)
    {
        // Same order as the records are stored on disk: image, label, thumbnail.
        string[] paths = [deleted.ImagePath, deleted.LabelPath, deleted.ThumbPath];
        foreach (var path in paths)
            File.Delete(path);
    }
}
}