gps matcher async

put cryptography lib to fixed version
fix race condition bug in queue handler
add lock to db writing and backup to file db on each write
This commit is contained in:
Alex Bezdieniezhnykh
2025-05-29 00:35:35 +03:00
parent 34ea821fb3
commit d842466594
12 changed files with 245 additions and 191 deletions
+43 -52
View File
@@ -1,4 +1,5 @@
using System.Drawing.Imaging;
using System.Drawing;
using System.Drawing.Imaging;
using System.IO;
using System.Net;
using Azaion.Common.Database;
@@ -21,7 +22,8 @@ using RabbitMQ.Stream.Client.Reliable;
namespace Azaion.Common.Services;
public class AnnotationService : IAnnotationService, INotificationHandler<AnnotationsDeletedEvent>
// SHOULD BE ONLY ONE INSTANCE OF AnnotationService. Do not add ANY NotificationHandler to it!
public class AnnotationService : IAnnotationService
{
private readonly IDbFactory _dbFactory;
private readonly FailsafeAnnotationsProducer _producer;
@@ -32,16 +34,16 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
private readonly QueueConfig _queueConfig;
private Consumer _consumer = null!;
private readonly UIConfig _uiConfig;
private readonly DirectoriesConfig _dirConfig;
private static readonly Guid SaveTaskId = Guid.NewGuid();
private readonly SemaphoreSlim _imageAccessSemaphore = new(1, 1);
private readonly SemaphoreSlim _messageProcessingSemaphore = new(1, 1);
private static readonly Guid SaveQueueOffsetTaskId = Guid.NewGuid();
public AnnotationService(
IDbFactory dbFactory,
FailsafeAnnotationsProducer producer,
IOptions<QueueConfig> queueConfig,
IOptions<UIConfig> uiConfig,
IOptions<DirectoriesConfig> directoriesConfig,
IGalleryService galleryService,
IMediator mediator,
IAzaionApi api,
@@ -55,7 +57,6 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
_logger = logger;
_queueConfig = queueConfig.Value;
_uiConfig = uiConfig.Value;
_dirConfig = directoriesConfig.Value;
Task.Run(async () => await InitQueueConsumer()).Wait();
}
@@ -80,6 +81,7 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
OffsetSpec = new OffsetTypeOffset(offsets.AnnotationsOffset),
MessageHandler = async (_, _, context, message) =>
{
await _messageProcessingSemaphore.WaitAsync(cancellationToken);
try
{
var email = (string)message.ApplicationProperties[nameof(User.Email)]!;
@@ -100,7 +102,7 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
msg.Image == null ? null : new MemoryStream(msg.Image),
msg.Role,
msg.Email,
fromQueue: true,
context.Offset,
token: cancellationToken);
}
else
@@ -124,6 +126,10 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
{
_logger.LogError(e, e.Message);
}
finally
{
_messageProcessingSemaphore.Release();
}
}
});
}
@@ -145,12 +151,12 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
List<Detection> detections, SourceEnum source, Stream? stream,
RoleEnum userRole,
string createdEmail,
bool fromQueue = false,
ulong? offset = null,
CancellationToken token = default)
{
var status = AnnotationStatus.Created;
var fName = originalMediaName.ToTimeName(time);
var annotation = await _dbFactory.Run(async db =>
var annotation = await _dbFactory.RunWrite(async db =>
{
var ann = await db.Annotations
.LoadWith(x => x.Detections)
@@ -200,27 +206,40 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
return ann;
});
if (stream != null)
//Save image should be done in 1 thread only
await _imageAccessSemaphore.WaitAsync(token);
try
{
var img = System.Drawing.Image.FromStream(stream);
img.Save(annotation.ImagePath, ImageFormat.Jpeg); //todo: check png images coming from queue
}
await YoloLabel.WriteToFile(detections, annotation.LabelPath, token);
Image image = null!;
if (stream != null)
{
image = Image.FromStream(stream);
if (File.Exists(annotation.ImagePath))
ResilienceExt.WithRetry(() => File.Delete(annotation.ImagePath));
image.Save(annotation.ImagePath, ImageFormat.Jpeg); //todo: check png images coming from queue
}
await _galleryService.CreateThumbnail(annotation, token);
if (_uiConfig.GenerateAnnotatedImage)
await _galleryService.CreateAnnotatedImage(annotation, token);
await YoloLabel.WriteToFile(detections, annotation.LabelPath, token);
await _galleryService.CreateThumbnail(annotation, image, token);
if (_uiConfig.GenerateAnnotatedImage)
await _galleryService.CreateAnnotatedImage(annotation, image, token);
}
catch (Exception e)
{
_logger.LogError(e, $"Try to save {annotation.ImagePath}, Error: {e.Message}");
throw;
}
finally
{
_imageAccessSemaphore.Release();
}
await _mediator.Publish(new AnnotationCreatedEvent(annotation), token);
if (!fromQueue) //Send to queue only if we're not getting from queue already
if (!offset.HasValue) //Send to queue only if we're not getting from queue already
await _producer.SendToInnerQueue([annotation.Name], status, token);
ThrottleExt.Throttle(async () =>
{
_dbFactory.SaveToDisk();
await Task.CompletedTask;
}, SaveTaskId, TimeSpan.FromSeconds(5), true);
return annotation;
}
@@ -230,7 +249,7 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
return;
var annNames = annotationNames.ToHashSet();
await _dbFactory.Run(async db =>
await _dbFactory.RunWrite(async db =>
{
await db.Annotations
.Where(x => annNames.Contains(x.Name))
@@ -241,34 +260,6 @@ public class AnnotationService : IAnnotationService, INotificationHandler<Annota
});
if (!fromQueue)
await _producer.SendToInnerQueue(annotationNames, AnnotationStatus.Validated, token);
ThrottleExt.Throttle(async () =>
{
_dbFactory.SaveToDisk();
await Task.CompletedTask;
}, SaveTaskId, TimeSpan.FromSeconds(5), true);
}
public async Task Handle(AnnotationsDeletedEvent notification, CancellationToken ct)
{
await _dbFactory.DeleteAnnotations(notification.AnnotationNames, ct);
foreach (var name in notification.AnnotationNames)
{
File.Delete(Path.Combine(_dirConfig.ImagesDirectory, $"{name}{Constants.JPG_EXT}"));
File.Delete(Path.Combine(_dirConfig.LabelsDirectory, $"{name}{Constants.TXT_EXT}"));
File.Delete(Path.Combine(_dirConfig.ThumbnailsDirectory, $"{name}{Constants.THUMBNAIL_PREFIX}{Constants.JPG_EXT}"));
File.Delete(Path.Combine(_dirConfig.ResultsDirectory, $"{name}{Constants.RESULT_PREFIX}{Constants.JPG_EXT}"));
}
//Only validators can send Delete to the queue
if (!notification.FromQueue && _api.CurrentUser.Role.IsValidator())
await _producer.SendToInnerQueue(notification.AnnotationNames, AnnotationStatus.Deleted, ct);
ThrottleExt.Throttle(async () =>
{
_dbFactory.SaveToDisk();
await Task.CompletedTask;
}, SaveTaskId, TimeSpan.FromSeconds(5), true);
}
}
+59 -61
View File
@@ -62,7 +62,7 @@ public class FailsafeAnnotationsProducer
{
try
{
var result = await _dbFactory.Run(async db =>
var (records, annotationsDict) = await _dbFactory.Run(async db =>
{
var records = await db.AnnotationsQueueRecords.OrderBy(x => x.DateTime).ToListAsync(token: ct);
var editedCreatedNames = records
@@ -73,69 +73,67 @@ public class FailsafeAnnotationsProducer
var annotationsDict = await db.Annotations.LoadWith(x => x.Detections)
.Where(x => editedCreatedNames.Contains(x.Name))
.ToDictionaryAsync(a => a.Name, token: ct);
var messages = new List<Message>();
foreach (var record in records)
{
var appProperties = new ApplicationProperties
{
{ nameof(AnnotationStatus), record.Operation.ToString() },
{ nameof(User.Email), _azaionApi.CurrentUser.Email }
};
if (record.Operation.In(AnnotationStatus.Validated, AnnotationStatus.Deleted))
{
var message = new Message(MessagePackSerializer.Serialize(new AnnotationBulkMessage
{
AnnotationNames = record.AnnotationNames.ToArray(),
AnnotationStatus = record.Operation,
Email = _azaionApi.CurrentUser.Email,
CreatedDate = record.DateTime
})) { ApplicationProperties = appProperties };
messages.Add(message);
}
else
{
var annotation = annotationsDict!.GetValueOrDefault(record.AnnotationNames.FirstOrDefault());
if (annotation == null)
continue;
var image = record.Operation == AnnotationStatus.Created
? await File.ReadAllBytesAsync(annotation.ImagePath, ct)
: null;
var annMessage = new AnnotationMessage
{
Name = annotation.Name,
OriginalMediaName = annotation.OriginalMediaName,
Time = annotation.Time,
Role = annotation.CreatedRole,
Email = annotation.CreatedEmail,
CreatedDate = annotation.CreatedDate,
Status = annotation.AnnotationStatus,
ImageExtension = annotation.ImageExtension,
Image = image,
Detections = JsonConvert.SerializeObject(annotation.Detections),
Source = annotation.Source,
};
var message = new Message(MessagePackSerializer.Serialize(annMessage)) { ApplicationProperties = appProperties };
messages.Add(message);
}
}
return (messages, records);
return (records, annotationsDict);
});
if (result.messages.Any())
var messages = new List<Message>();
foreach (var record in records)
{
await _annotationProducer.Send(result.messages, CompressionType.Gzip);
var ids = result.records.Select(x => x.Id).ToList();
var removed = await _dbFactory.Run(async db => await db.AnnotationsQueueRecords.DeleteAsync(x => ids.Contains(x.Id), token: ct));
var appProperties = new ApplicationProperties
{
{ nameof(AnnotationStatus), record.Operation.ToString() },
{ nameof(User.Email), _azaionApi.CurrentUser.Email }
};
if (record.Operation.In(AnnotationStatus.Validated, AnnotationStatus.Deleted))
{
var message = new Message(MessagePackSerializer.Serialize(new AnnotationBulkMessage
{
AnnotationNames = record.AnnotationNames.ToArray(),
AnnotationStatus = record.Operation,
Email = _azaionApi.CurrentUser.Email,
CreatedDate = record.DateTime
})) { ApplicationProperties = appProperties };
messages.Add(message);
}
else
{
var annotation = annotationsDict!.GetValueOrDefault(record.AnnotationNames.FirstOrDefault());
if (annotation == null)
continue;
var image = record.Operation == AnnotationStatus.Created
? await File.ReadAllBytesAsync(annotation.ImagePath, ct)
: null;
var annMessage = new AnnotationMessage
{
Name = annotation.Name,
OriginalMediaName = annotation.OriginalMediaName,
Time = annotation.Time,
Role = annotation.CreatedRole,
Email = annotation.CreatedEmail,
CreatedDate = annotation.CreatedDate,
Status = annotation.AnnotationStatus,
ImageExtension = annotation.ImageExtension,
Image = image,
Detections = JsonConvert.SerializeObject(annotation.Detections),
Source = annotation.Source,
};
var message = new Message(MessagePackSerializer.Serialize(annMessage)) { ApplicationProperties = appProperties };
messages.Add(message);
}
}
if (messages.Any())
{
await _annotationProducer.Send(messages, CompressionType.Gzip);
var ids = records.Select(x => x.Id).ToList();
var removed = await _dbFactory.RunWrite(async db => await db.AnnotationsQueueRecords.DeleteAsync(x => ids.Contains(x.Id), token: ct));
sent = true;
_dbFactory.SaveToDisk();
}
}
catch (Exception e)
@@ -153,7 +151,7 @@ public class FailsafeAnnotationsProducer
{
if (_uiConfig.SilentDetection)
return;
await _dbFactory.Run(async db =>
await _dbFactory.RunWrite(async db =>
await db.InsertAsync(new AnnotationQueueRecord
{
Id = Guid.NewGuid(),
+1 -2
View File
@@ -18,7 +18,6 @@ public class GpsMatcherService(IGpsMatcherClient gpsMatcherClient, ISatelliteDow
INotificationHandler<GPSMatcherResultEvent>,
INotificationHandler<GPSMatcherFinishedEvent>
{
private readonly IGpsMatcherClient _gpsMatcherClient = gpsMatcherClient;
private readonly DirectoriesConfig _dirConfig = dirConfig.Value;
private const int ZOOM_LEVEL = 18;
private const int POINTS_COUNT = 10;
@@ -69,7 +68,7 @@ public class GpsMatcherService(IGpsMatcherClient gpsMatcherClient, ISatelliteDow
.ToDictionary(x => x.Filename, x => x.Index);
await satelliteTileDownloader.GetTiles(_currentLat, _currentLon, SATELLITE_RADIUS_M, ZOOM_LEVEL, _detectToken);
_gpsMatcherClient.StartMatching(new StartMatchingEvent
gpsMatcherClient.StartMatching(new StartMatchingEvent
{
ImagesCount = POINTS_COUNT,
Latitude = _currentLat,
+15 -14
View File
@@ -61,7 +61,7 @@ public class GalleryService(
{
foreach(var file in new DirectoryInfo(_dirConfig.ThumbnailsDirectory).GetFiles())
file.Delete();
await dbFactory.Run(async db =>
await dbFactory.RunWrite(async db =>
{
await db.Detections.DeleteAsync(x => true, token: cancellationToken);
await db.Annotations.DeleteAsync(x => true, token: cancellationToken);
@@ -157,7 +157,7 @@ public class GalleryService(
if (!thumbnails.Contains(fName))
await CreateThumbnail(annotation, cancellationToken);
await CreateThumbnail(annotation, cancellationToken: cancellationToken);
}
catch (Exception e)
@@ -198,24 +198,23 @@ public class GalleryService(
.Select(x => x.Value)
.ToList();
await dbFactory.Run(async db =>
await dbFactory.RunWrite(async db =>
{
await db.BulkCopyAsync(copyOptions, annotationsToInsert);
await db.BulkCopyAsync(copyOptions, annotationsToInsert.SelectMany(x => x.Detections));
});
dbFactory.SaveToDisk();
_updateLock.Release();
}
}
public async Task CreateThumbnail(Annotation annotation, CancellationToken cancellationToken = default)
public async Task CreateThumbnail(Annotation annotation, Image? originalImage = null, CancellationToken cancellationToken = default)
{
try
{
var width = (int)_thumbnailConfig.Size.Width;
var height = (int)_thumbnailConfig.Size.Height;
var originalImage = Image.FromStream(new MemoryStream(await File.ReadAllBytesAsync(annotation.ImagePath, cancellationToken)));
originalImage ??= Image.FromStream(new MemoryStream(await File.ReadAllBytesAsync(annotation.ImagePath, cancellationToken)));
var bitmap = new Bitmap(width, height);
@@ -282,10 +281,9 @@ public class GalleryService(
logger.LogError(e, e.Message);
}
}
public async Task CreateAnnotatedImage(Annotation annotation, CancellationToken token)
public async Task CreateAnnotatedImage(Annotation annotation, Image? originalImage = null, CancellationToken token = default)
{
var originalImage = Image.FromStream(new MemoryStream(await File.ReadAllBytesAsync(annotation.ImagePath, token)));
originalImage ??= Image.FromStream(new MemoryStream(await File.ReadAllBytesAsync(annotation.ImagePath, token)));
using var g = Graphics.FromImage(originalImage);
foreach (var detection in annotation.Detections)
@@ -299,17 +297,20 @@ public class GalleryService(
var label = detection.Confidence >= 0.995 ? detClass.UIName : $"{detClass.UIName}: {detection.Confidence * 100:F0}%";
g.DrawTextBox(label, new PointF((float)(det.X + det.Width / 2.0), (float)(det.Y - 24)), brush, Brushes.Black);
}
originalImage.Save(Path.Combine(_dirConfig.ResultsDirectory, $"{annotation.Name}{Constants.RESULT_PREFIX}.jpg"), ImageFormat.Jpeg);
var imagePath = Path.Combine(_dirConfig.ResultsDirectory, $"{annotation.Name}{Constants.RESULT_PREFIX}.jpg");
if (File.Exists(imagePath))
ResilienceExt.WithRetry(() => File.Delete(imagePath));
originalImage.Save(imagePath, ImageFormat.Jpeg);
}
}
public interface IGalleryService
{
event ThumbnailsUpdatedEventHandler? ThumbnailsUpdate;
double ProcessedThumbnailsPercentage { get; set; }
Task CreateThumbnail(Annotation annotation, CancellationToken cancellationToken = default);
Task CreateThumbnail(Annotation annotation, Image? originalImage = null, CancellationToken cancellationToken = default);
Task RefreshThumbnails();
Task ClearThumbnails(CancellationToken cancellationToken = default);
Task CreateAnnotatedImage(Annotation annotation, CancellationToken token);
Task CreateAnnotatedImage(Annotation annotation, Image? originalImage = null, CancellationToken token = default);
}
+10 -17
View File
@@ -32,22 +32,15 @@ public class StartMatchingEvent
public class GpsMatcherClient : IGpsMatcherClient
{
private readonly IMediator _mediator;
private readonly GpsDeniedClientConfig _gpsDeniedClientConfig;
private string _requestAddress;
private readonly string _requestAddress;
private readonly RequestSocket _requestSocket = new();
private string _subscriberAddress;
private readonly string _subscriberAddress;
private readonly SubscriberSocket _subscriberSocket = new();
public GpsMatcherClient(IMediator mediator, IOptions<GpsDeniedClientConfig> gpsDeniedClientConfig)
public GpsMatcherClient(IMediator mediator, IOptions<GpsDeniedClientConfig> gpsConfig)
{
_mediator = mediator;
_gpsDeniedClientConfig = gpsDeniedClientConfig.Value;
Start();
}
private void Start(CancellationToken ct = default)
{
try
{
using var process = new Process();
@@ -71,16 +64,16 @@ public class GpsMatcherClient : IGpsMatcherClient
//throw;
}
_requestAddress = $"tcp://{_gpsDeniedClientConfig.ZeroMqHost}:{_gpsDeniedClientConfig.ZeroMqPort}";
_requestAddress = $"tcp://{gpsConfig.Value.ZeroMqHost}:{gpsConfig.Value.ZeroMqPort}";
_requestSocket.Connect(_requestAddress);
_subscriberAddress = $"tcp://{_gpsDeniedClientConfig.ZeroMqHost}:{_gpsDeniedClientConfig.ZeroMqSubscriberPort}";
_subscriberAddress = $"tcp://{gpsConfig.Value.ZeroMqHost}:{gpsConfig.Value.ZeroMqSubscriberPort}";
_subscriberSocket.Connect(_subscriberAddress);
_subscriberSocket.Subscribe("");
_subscriberSocket.ReceiveReady += async (_, e) => await ProcessClientCommand(e.Socket, ct);
_subscriberSocket.ReceiveReady += async (_, e) => await ProcessClientCommand(e.Socket);
}
private async Task ProcessClientCommand(NetMQSocket socket, CancellationToken ct)
private async Task ProcessClientCommand(NetMQSocket socket)
{
while (socket.TryReceiveFrameString(TimeSpan.Zero, out var str))
{
@@ -90,10 +83,10 @@ public class GpsMatcherClient : IGpsMatcherClient
switch (str)
{
case "FINISHED":
await _mediator.Publish(new GPSMatcherFinishedEvent(), ct);
await _mediator.Publish(new GPSMatcherFinishedEvent());
break;
case "OK":
await _mediator.Publish(new GPSMatcherJobAcceptedEvent(), ct);
await _mediator.Publish(new GPSMatcherJobAcceptedEvent());
break;
default:
var parts = str.Split(',');
@@ -107,7 +100,7 @@ public class GpsMatcherClient : IGpsMatcherClient
Latitude = double.Parse(parts[2]),
Longitude = double.Parse(parts[3]),
MatchType = parts[4]
}, ct);
});
break;
}
}