From cf563571c8592e3ba81e4161ae8c3400cf3a361b Mon Sep 17 00:00:00 2001 From: Alex Bezdieniezhnykh Date: Sat, 17 May 2025 19:38:07 +0300 Subject: [PATCH] log queue errors --- Azaion.Common/Services/AnnotationService.cs | 75 ++++++++++++--------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/Azaion.Common/Services/AnnotationService.cs b/Azaion.Common/Services/AnnotationService.cs index 0e11315..843cd86 100644 --- a/Azaion.Common/Services/AnnotationService.cs +++ b/Azaion.Common/Services/AnnotationService.cs @@ -13,6 +13,7 @@ using LinqToDB; using LinqToDB.Data; using MediatR; using MessagePack; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; using RabbitMQ.Stream.Client; @@ -27,6 +28,7 @@ public class AnnotationService : IAnnotationService, INotificationHandler _logger; private readonly QueueConfig _queueConfig; private Consumer _consumer = null!; private readonly UIConfig _uiConfig; @@ -42,13 +44,15 @@ public class AnnotationService : IAnnotationService, INotificationHandler directoriesConfig, IGalleryService galleryService, IMediator mediator, - IAzaionApi api) + IAzaionApi api, + ILogger logger) { _dbFactory = dbFactory; _producer = producer; _galleryService = galleryService; _mediator = mediator; _api = api; + _logger = logger; _queueConfig = queueConfig.Value; _uiConfig = uiConfig.Value; _dirConfig = directoriesConfig.Value; @@ -76,40 +80,47 @@ public class AnnotationService : IAnnotationService, INotificationHandler { - var email = (string)message.ApplicationProperties[nameof(User.Email)]!; - if (email == _api.CurrentUser.Email) //Don't process messages by yourself - return; - var annotationStatus = (AnnotationStatus)message.ApplicationProperties[nameof(AnnotationStatus)]; - if (annotationStatus.In(AnnotationStatus.Created, AnnotationStatus.ValidatedEdited)) + try { - var msg = MessagePackSerializer.Deserialize(message.Data.Contents); - await SaveAnnotationInner( - msg.CreatedDate, - msg.OriginalMediaName, - msg.Time, - JsonConvert.DeserializeObject>(msg.Detections) ?? [], - msg.Source, - new MemoryStream(msg.Image), - msg.Role, - msg.Email, - fromQueue: true, - token: cancellationToken); - } - else - { - var msg = MessagePackSerializer.Deserialize(message.Data.Contents); - if (annotationStatus == AnnotationStatus.Validated) - await ValidateAnnotations(msg.AnnotationNames.ToList(), true, cancellationToken); - if (annotationStatus == AnnotationStatus.Deleted) - await _mediator.Publish(new AnnotationsDeletedEvent(msg.AnnotationNames.ToList(), fromQueue:true), cancellationToken); - } + var email = (string)message.ApplicationProperties[nameof(User.Email)]!; + if (email == _api.CurrentUser.Email) //Don't process messages by yourself + return; + var annotationStatus = (AnnotationStatus)message.ApplicationProperties[nameof(AnnotationStatus)]; + if (annotationStatus.In(AnnotationStatus.Created, AnnotationStatus.ValidatedEdited)) + { + var msg = MessagePackSerializer.Deserialize(message.Data.Contents); + await SaveAnnotationInner( + msg.CreatedDate, + msg.OriginalMediaName, + msg.Time, + JsonConvert.DeserializeObject>(msg.Detections) ?? [], + msg.Source, + new MemoryStream(msg.Image), + msg.Role, + msg.Email, + fromQueue: true, + token: cancellationToken); + } + else + { + var msg = MessagePackSerializer.Deserialize(message.Data.Contents); + if (annotationStatus == AnnotationStatus.Validated) + await ValidateAnnotations(msg.AnnotationNames.ToList(), true, cancellationToken); + if (annotationStatus == AnnotationStatus.Deleted) + await _mediator.Publish(new AnnotationsDeletedEvent(msg.AnnotationNames.ToList(), fromQueue:true), cancellationToken); + } - offsets.AnnotationsOffset = context.Offset; - ThrottleExt.Throttle(() => + offsets.AnnotationsOffset = context.Offset; + ThrottleExt.Throttle(() => + { + _api.UpdateOffsets(offsets); + return Task.CompletedTask; + }, SaveQueueOffsetTaskId, TimeSpan.FromSeconds(10), scheduleCallAfterCooldown: true); + } + catch (Exception e) { - _api.UpdateOffsets(offsets); - return Task.CompletedTask; - }, SaveQueueOffsetTaskId, TimeSpan.FromSeconds(10), scheduleCallAfterCooldown: true); + _logger.LogError(e, e.Message); + } } }); }