mirror of
https://github.com/azaion/annotations.git
synced 2026-04-22 13:26:30 +00:00
small fixes, renames
This commit is contained in:
@@ -67,7 +67,22 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
|
||||
OffsetSpec = new OffsetTypeOffset(offset + 1),
|
||||
MessageHandler = async (stream, consumer, context, message) =>
|
||||
{
|
||||
await Consume(MessagePackSerializer.Deserialize<AnnotationCreatedMessage>(message.Data.Contents), cancellationToken);
|
||||
var msg = MessagePackSerializer.Deserialize<AnnotationCreatedMessage>(message.Data.Contents);
|
||||
if (msg.CreatedRole != RoleEnum.Operator) //Process only operator's messages
|
||||
return;
|
||||
|
||||
await SaveAnnotationInner(
|
||||
msg.CreatedDate,
|
||||
msg.Name,
|
||||
msg.ImageExtension,
|
||||
JsonConvert.DeserializeObject<List<Detection>>(msg.Detections) ?? [],
|
||||
msg.Source,
|
||||
new MemoryStream(msg.Image),
|
||||
msg.CreatedRole,
|
||||
msg.CreatedEmail,
|
||||
generateThumbnail: true,
|
||||
cancellationToken);
|
||||
|
||||
await _dbFactory.Run(async db => await db.QueueOffsets
|
||||
.Where(x => x.QueueName == Constants.MQ_ANNOTATIONS_QUEUE)
|
||||
.Set(x => x.Offset, context.Offset)
|
||||
@@ -92,24 +107,11 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
|
||||
await SaveAnnotationInner(DateTime.UtcNow, annotation.Name, annotation.ImageExtension, annotation.Detections.ToList(), SourceEnum.Manual, null, _apiClient.User.Role, _apiClient.User.Email,
|
||||
generateThumbnail: false, token);
|
||||
|
||||
//Queue (only from operators)
|
||||
public async Task Consume(AnnotationCreatedMessage message, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (message.CreatedRole != RoleEnum.Operator) //Process only operator's messages
|
||||
return;
|
||||
|
||||
await SaveAnnotationInner(
|
||||
message.CreatedDate,
|
||||
message.Name,
|
||||
message.ImageExtension,
|
||||
JsonConvert.DeserializeObject<List<Detection>>(message.Detections) ?? [],
|
||||
message.Source,
|
||||
new MemoryStream(message.Image),
|
||||
message.CreatedRole,
|
||||
message.CreatedEmail,
|
||||
generateThumbnail: true,
|
||||
cancellationToken);
|
||||
}
|
||||
// //Queue (only from operators)
|
||||
// public async Task Consume(AnnotationCreatedMessage message, CancellationToken cancellationToken = default)
|
||||
// {
|
||||
//
|
||||
// }
|
||||
|
||||
private async Task<Annotation> SaveAnnotationInner(DateTime createdDate, string fName, string imageExtension, List<Detection> detections, SourceEnum source, Stream? stream,
|
||||
RoleEnum userRole,
|
||||
@@ -168,7 +170,7 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
|
||||
if (generateThumbnail)
|
||||
await _galleryService.CreateThumbnail(annotation, token);
|
||||
|
||||
await _producer.SendToQueue(annotation, token);
|
||||
await _producer.SendToInnerQueue(annotation, token);
|
||||
|
||||
await _mediator.Publish(new AnnotationCreatedEvent(annotation), token);
|
||||
await ThrottleExt.ThrottleRunAfter(() =>
|
||||
|
||||
@@ -53,7 +53,7 @@ public class FailsafeAnnotationsProducer
|
||||
await Init(cancellationToken);
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
var messages = await GetFromQueue(cancellationToken);
|
||||
var messages = await GetFromInnerQueue(cancellationToken);
|
||||
foreach (var messagesChunk in messages.Chunk(10)) //Sending by 10
|
||||
{
|
||||
var sent = false;
|
||||
@@ -91,7 +91,7 @@ public class FailsafeAnnotationsProducer
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<List<AnnotationCreatedMessage>> GetFromQueue(CancellationToken cancellationToken = default)
|
||||
private async Task<List<AnnotationCreatedMessage>> GetFromInnerQueue(CancellationToken cancellationToken = default)
|
||||
{
|
||||
return await _dbFactory.Run(async db =>
|
||||
{
|
||||
@@ -124,7 +124,7 @@ public class FailsafeAnnotationsProducer
|
||||
});
|
||||
}
|
||||
|
||||
public async Task SendToQueue(Annotation annotation, CancellationToken cancellationToken = default)
|
||||
public async Task SendToInnerQueue(Annotation annotation, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await _dbFactory.Run(async db =>
|
||||
await db.InsertAsync(new AnnotationName { Name = annotation.Name }, token: cancellationToken));
|
||||
|
||||
Reference in New Issue
Block a user