failsafe load dlls

add user config queue offsets
throttle improvements
This commit is contained in:
Alex Bezdieniezhnykh
2025-04-17 01:19:48 +03:00
parent 0237e279a5
commit 0c66607ed7
32 changed files with 320 additions and 188 deletions
+14 -19
View File
@@ -26,15 +26,13 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
private readonly FailsafeAnnotationsProducer _producer;
private readonly IGalleryService _galleryService;
private readonly IMediator _mediator;
private readonly IHardwareService _hardwareService;
private readonly IAuthProvider _authProvider;
private readonly IAzaionApi _api;
private readonly QueueConfig _queueConfig;
private Consumer _consumer = null!;
private readonly UIConfig _uiConfig;
private static readonly Guid SaveTaskId = Guid.NewGuid();
public AnnotationService(
IResourceLoader resourceLoader,
IDbFactory dbFactory,
FailsafeAnnotationsProducer producer,
IOptions<QueueConfig> queueConfig,
@@ -42,14 +40,13 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
IGalleryService galleryService,
IMediator mediator,
IHardwareService hardwareService,
IAuthProvider authProvider)
IAzaionApi api)
{
_dbFactory = dbFactory;
_producer = producer;
_galleryService = galleryService;
_mediator = mediator;
_hardwareService = hardwareService;
_authProvider = authProvider;
_api = api;
_queueConfig = queueConfig.Value;
_uiConfig = uiConfig.Value;
@@ -58,7 +55,7 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
private async Task Init(CancellationToken cancellationToken = default)
{
if (!_authProvider.CurrentUser.Role.IsValidator())
if (!_api.CurrentUser.Role.IsValidator())
return;
var consumerSystem = await StreamSystem.Create(new StreamSystemConfig
@@ -68,13 +65,11 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
Password = _queueConfig.ConsumerPassword
});
var offset = (await _dbFactory.Run(db => db.QueueOffsets.FirstOrDefaultAsync(
x => x.QueueName == Constants.MQ_ANNOTATIONS_QUEUE, token: cancellationToken))
)?.Offset ?? 0;
var offset = (ulong)(_api.CurrentUser.UserConfig?.QueueConfig?.AnnotationsOffset ?? 0);
_consumer = await Consumer.Create(new ConsumerConfig(consumerSystem, Constants.MQ_ANNOTATIONS_QUEUE)
{
Reference = _hardwareService.GetHardware().Hash,
Reference = _api.CurrentUser.Email,
OffsetSpec = new OffsetTypeOffset(offset + 1),
MessageHandler = async (_, _, context, message) =>
{
@@ -84,13 +79,13 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
.Set(x => x.Offset, context.Offset)
.UpdateAsync(token: cancellationToken));
await ThrottleExt.ThrottleRunAfter(() =>
await ThrottleExt.Throttle(() =>
{
_dbFactory.SaveToDisk();
return Task.CompletedTask;
}, SaveTaskId, TimeSpan.FromSeconds(5), cancellationToken);
}, TimeSpan.FromSeconds(10), cancellationToken);
if (msg.CreatedEmail == _authProvider.CurrentUser.Email) //Don't process messages by yourself
if (msg.CreatedEmail == _api.CurrentUser.Email) //Don't process messages by yourself
return;
await SaveAnnotationInner(
@@ -114,18 +109,18 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
{
a.Time = TimeSpan.FromMilliseconds(a.Milliseconds);
return await SaveAnnotationInner(DateTime.Now, a.OriginalMediaName, a.Time, a.Detections.ToList(),
SourceEnum.AI, new MemoryStream(a.Image), _authProvider.CurrentUser.Role, _authProvider.CurrentUser.Email, generateThumbnail: true, token: ct);
SourceEnum.AI, new MemoryStream(a.Image), _api.CurrentUser.Role, _api.CurrentUser.Email, generateThumbnail: true, token: ct);
}
//Manual
public async Task<Annotation> SaveAnnotation(string originalMediaName, TimeSpan time, List<Detection> detections, Stream? stream = null, CancellationToken token = default) =>
await SaveAnnotationInner(DateTime.UtcNow, originalMediaName, time, detections, SourceEnum.Manual, stream,
_authProvider.CurrentUser.Role, _authProvider.CurrentUser.Email, generateThumbnail: true, token: token);
_api.CurrentUser.Role, _api.CurrentUser.Email, generateThumbnail: true, token: token);
//Manual Validate existing
public async Task ValidateAnnotation(Annotation annotation, CancellationToken token = default) =>
await SaveAnnotationInner(DateTime.UtcNow, annotation.OriginalMediaName, annotation.Time, annotation.Detections.ToList(), SourceEnum.Manual, null,
_authProvider.CurrentUser.Role, _authProvider.CurrentUser.Email, token: token);
_api.CurrentUser.Role, _api.CurrentUser.Email, token: token);
// Manual save from Validators -> Validated -> stream: azaion-annotations-confirm
// AI, Manual save from Operators -> Created -> stream: azaion-annotations
@@ -199,11 +194,11 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
await _producer.SendToInnerQueue(annotation, token);
await _mediator.Publish(new AnnotationCreatedEvent(annotation), token);
await ThrottleExt.ThrottleRunAfter(() =>
await ThrottleExt.Throttle(() =>
{
_dbFactory.SaveToDisk();
return Task.CompletedTask;
}, SaveTaskId, TimeSpan.FromSeconds(5), token);
}, TimeSpan.FromSeconds(5), token);
return annotation;
}
@@ -3,6 +3,7 @@ using System.IO;
using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.CommonSecurity;
using Azaion.CommonSecurity.DTO;
using Microsoft.Extensions.Options;
namespace Azaion.Common.Services;
+2 -1
View File
@@ -17,10 +17,11 @@ public interface IInferenceService
void StopInference();
}
public class InferenceService(ILogger<InferenceService> logger, IInferenceClient client, IOptions<AIRecognitionConfig> aiConfigOptions) : IInferenceService
public class InferenceService(ILogger<InferenceService> logger, IInferenceClient client, IAzaionApi azaionApi, IOptions<AIRecognitionConfig> aiConfigOptions) : IInferenceService
{
public async Task RunInference(List<string> mediaPaths, Func<AnnotationImage, Task> processAnnotation, CancellationToken detectToken = default)
{
client.Send(RemoteCommand.Create(CommandType.Login, azaionApi.Credentials));
var aiConfig = aiConfigOptions.Value;
aiConfig.Paths = mediaPaths;
@@ -7,6 +7,7 @@ using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.Common.Extensions;
using Azaion.CommonSecurity;
using Azaion.CommonSecurity.DTO;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;