mirror of
https://github.com/azaion/annotations.git
synced 2026-04-22 17:46:36 +00:00
update AI initializing
rework AIAvailabilityStatus events to mediatr
This commit is contained in:
@@ -21,6 +21,7 @@ using RabbitMQ.Stream.Client.Reliable;
|
||||
namespace Azaion.Common.Services;
|
||||
|
||||
// SHOULD BE ONLY ONE INSTANCE OF AnnotationService. Do not add ANY NotificationHandler to it!
|
||||
// Queue consumer should be created only once.
|
||||
public class AnnotationService : IAnnotationService
|
||||
{
|
||||
private readonly IDbFactory _dbFactory;
|
||||
|
||||
+15
-16
@@ -1,18 +1,17 @@
|
||||
using System.Diagnostics;
|
||||
using System.Text;
|
||||
using Azaion.Common.DTO;
|
||||
using MediatR;
|
||||
using MessagePack;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
|
||||
namespace Azaion.Common.Services;
|
||||
namespace Azaion.Common.Services.Inference;
|
||||
|
||||
public interface IInferenceClient : IDisposable
|
||||
{
|
||||
event EventHandler<RemoteCommand>? InferenceDataReceived;
|
||||
event EventHandler<RemoteCommand>? AIAvailabilityReceived;
|
||||
void Send(RemoteCommand create);
|
||||
void Stop();
|
||||
}
|
||||
@@ -20,21 +19,22 @@ public interface IInferenceClient : IDisposable
|
||||
public class InferenceClient : IInferenceClient
|
||||
{
|
||||
private readonly ILogger<InferenceClient> _logger;
|
||||
public event EventHandler<RemoteCommand>? BytesReceived;
|
||||
public event EventHandler<RemoteCommand>? InferenceDataReceived;
|
||||
public event EventHandler<RemoteCommand>? AIAvailabilityReceived;
|
||||
|
||||
private readonly DealerSocket _dealer = new();
|
||||
private readonly NetMQPoller _poller = new();
|
||||
private readonly Guid _clientId = Guid.NewGuid();
|
||||
private readonly InferenceClientConfig _inferenceClientConfig;
|
||||
private readonly LoaderClientConfig _loaderClientConfig;
|
||||
private readonly IMediator _mediator;
|
||||
|
||||
public InferenceClient(ILogger<InferenceClient> logger, IOptions<InferenceClientConfig> inferenceConfig, IOptions<LoaderClientConfig> loaderConfig)
|
||||
public InferenceClient(ILogger<InferenceClient> logger, IOptions<InferenceClientConfig> inferenceConfig,
|
||||
IMediator mediator,
|
||||
IOptions<LoaderClientConfig> loaderConfig)
|
||||
{
|
||||
_logger = logger;
|
||||
_inferenceClientConfig = inferenceConfig.Value;
|
||||
_loaderClientConfig = loaderConfig.Value;
|
||||
_mediator = mediator;
|
||||
Start();
|
||||
}
|
||||
|
||||
@@ -59,32 +59,31 @@ public class InferenceClient : IInferenceClient
|
||||
_dealer.Options.Identity = Encoding.UTF8.GetBytes(_clientId.ToString("N"));
|
||||
_dealer.Connect($"tcp://{_inferenceClientConfig.ZeroMqHost}:{_inferenceClientConfig.ZeroMqPort}");
|
||||
|
||||
_dealer.ReceiveReady += (_, e) => ProcessClientCommand(e.Socket);
|
||||
_dealer.ReceiveReady += async (_, e) => await ProcessClientCommand(e.Socket);
|
||||
_poller.Add(_dealer);
|
||||
_ = Task.Run(() => _poller.RunAsync());
|
||||
}
|
||||
|
||||
private void ProcessClientCommand(NetMQSocket socket, CancellationToken ct = default)
|
||||
private async Task ProcessClientCommand(NetMQSocket socket, CancellationToken ct = default)
|
||||
{
|
||||
while (socket.TryReceiveFrameBytes(TimeSpan.Zero, out var bytes))
|
||||
{
|
||||
if (bytes?.Length == 0)
|
||||
if (bytes.Length == 0)
|
||||
continue;
|
||||
|
||||
var remoteCommand = MessagePackSerializer.Deserialize<RemoteCommand>(bytes, cancellationToken: ct);
|
||||
switch (remoteCommand.CommandType)
|
||||
{
|
||||
case CommandType.DataBytes:
|
||||
BytesReceived?.Invoke(this, remoteCommand);
|
||||
break;
|
||||
case CommandType.InferenceData:
|
||||
InferenceDataReceived?.Invoke(this, remoteCommand);
|
||||
await _mediator.Publish(new InferenceDataEvent(remoteCommand), ct);
|
||||
break;
|
||||
case CommandType.AIAvailabilityResult:
|
||||
AIAvailabilityReceived?.Invoke(this, remoteCommand);
|
||||
var aiAvailabilityStatus = MessagePackSerializer.Deserialize<AIAvailabilityStatusEvent>(remoteCommand.Data, cancellationToken: ct);
|
||||
await _mediator.Publish(aiAvailabilityStatus, ct);
|
||||
break;
|
||||
default:
|
||||
throw new ArgumentOutOfRangeException();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
using Azaion.Common.DTO;
|
||||
using Azaion.Common.DTO.Config;
|
||||
using Azaion.Common.Extensions;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Azaion.Common.Services.Inference;
|
||||
|
||||
public interface IInferenceService
|
||||
{
|
||||
Task RunInference(List<string> mediaPaths, CancellationToken ct = default);
|
||||
CancellationTokenSource InferenceCancelTokenSource { get; set; }
|
||||
void StopInference();
|
||||
}
|
||||
|
||||
// SHOULD BE ONLY ONE INSTANCE OF InferenceService. Do not add ANY NotificationHandler to it!
|
||||
// _inferenceCancelTokenSource should be created only once.
|
||||
public class InferenceService : IInferenceService
|
||||
{
|
||||
private readonly IInferenceClient _client;
|
||||
private readonly IAzaionApi _azaionApi;
|
||||
private readonly IOptions<AIRecognitionConfig> _aiConfigOptions;
|
||||
public CancellationTokenSource InferenceCancelTokenSource { get; set; } = new();
|
||||
public CancellationTokenSource CheckAIAvailabilityTokenSource { get; set; } = new();
|
||||
|
||||
public InferenceService(IInferenceClient client, IAzaionApi azaionApi, IOptions<AIRecognitionConfig> aiConfigOptions)
|
||||
{
|
||||
_client = client;
|
||||
_azaionApi = azaionApi;
|
||||
_aiConfigOptions = aiConfigOptions;
|
||||
}
|
||||
|
||||
public async Task CheckAIAvailabilityStatus()
|
||||
{
|
||||
CheckAIAvailabilityTokenSource = new CancellationTokenSource();
|
||||
while (!CheckAIAvailabilityTokenSource.IsCancellationRequested)
|
||||
{
|
||||
_client.Send(RemoteCommand.Create(CommandType.AIAvailabilityCheck));
|
||||
await Task.Delay(10000, CheckAIAvailabilityTokenSource.Token);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task RunInference(List<string> mediaPaths, CancellationToken ct = default)
|
||||
{
|
||||
InferenceCancelTokenSource = new CancellationTokenSource();
|
||||
_client.Send(RemoteCommand.Create(CommandType.Login, _azaionApi.Credentials));
|
||||
|
||||
var aiConfig = _aiConfigOptions.Value;
|
||||
aiConfig.Paths = mediaPaths;
|
||||
_client.Send(RemoteCommand.Create(CommandType.Inference, aiConfig));
|
||||
|
||||
using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct, InferenceCancelTokenSource.Token);
|
||||
await combinedTokenSource.Token.AsTask();
|
||||
}
|
||||
|
||||
public void StopInference() => _client.Stop();
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
using Azaion.Common.Database;
|
||||
using Azaion.Common.DTO;
|
||||
using Azaion.Common.Events;
|
||||
using MediatR;
|
||||
using MessagePack;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Azaion.Common.Services.Inference;
|
||||
|
||||
public class InferenceServiceEventHandler(IInferenceService inferenceService,
|
||||
IAnnotationService annotationService,
|
||||
IMediator mediator,
|
||||
ILogger<InferenceServiceEventHandler> logger) :
|
||||
INotificationHandler<InferenceDataEvent>,
|
||||
INotificationHandler<AIAvailabilityStatusEvent>
|
||||
{
|
||||
public async Task Handle(InferenceDataEvent e, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (e.Command.Message == "DONE")
|
||||
{
|
||||
await inferenceService.InferenceCancelTokenSource.CancelAsync();
|
||||
return;
|
||||
}
|
||||
|
||||
var annImage = MessagePackSerializer.Deserialize<AnnotationImage>(e.Command.Data, cancellationToken: ct);
|
||||
var annotation = await annotationService.SaveAnnotation(annImage, ct);
|
||||
await mediator.Publish(new AnnotationAddedEvent(annotation), ct);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, ex.Message);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task Handle(AIAvailabilityStatusEvent e, CancellationToken ct)
|
||||
{
|
||||
|
||||
e.Status = AIAvailabilityEnum.Enabled;
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
using Azaion.Common.DTO;
|
||||
using MediatR;
|
||||
|
||||
namespace Azaion.Common.Services.Inference;
|
||||
|
||||
public class InferenceDataEvent(RemoteCommand command) : INotification
|
||||
{
|
||||
public RemoteCommand Command { get; set; } = command;
|
||||
}
|
||||
@@ -1,82 +0,0 @@
|
||||
using Azaion.Common.Database;
|
||||
using Azaion.Common.DTO;
|
||||
using Azaion.Common.DTO.Config;
|
||||
using Azaion.Common.Events;
|
||||
using Azaion.Common.Extensions;
|
||||
using MediatR;
|
||||
using MessagePack;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Azaion.Common.Services;
|
||||
|
||||
public interface IInferenceService
|
||||
{
|
||||
Task RunInference(List<string> mediaPaths, CancellationToken ct = default);
|
||||
void StopInference();
|
||||
}
|
||||
|
||||
public class InferenceService : IInferenceService
|
||||
{
|
||||
private readonly IInferenceClient _client;
|
||||
private readonly IAzaionApi _azaionApi;
|
||||
private readonly IOptions<AIRecognitionConfig> _aiConfigOptions;
|
||||
private readonly IAnnotationService _annotationService;
|
||||
private readonly IMediator _mediator;
|
||||
private CancellationTokenSource _inferenceCancelTokenSource = new();
|
||||
|
||||
public InferenceService(
|
||||
ILogger<InferenceService> logger,
|
||||
IInferenceClient client,
|
||||
IAzaionApi azaionApi,
|
||||
IOptions<AIRecognitionConfig> aiConfigOptions,
|
||||
IAnnotationService annotationService,
|
||||
IMediator mediator)
|
||||
{
|
||||
_client = client;
|
||||
_azaionApi = azaionApi;
|
||||
_aiConfigOptions = aiConfigOptions;
|
||||
_annotationService = annotationService;
|
||||
_mediator = mediator;
|
||||
|
||||
client.InferenceDataReceived += async (sender, command) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
if (command.Message == "DONE")
|
||||
{
|
||||
_inferenceCancelTokenSource?.Cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
var annImage = MessagePackSerializer.Deserialize<AnnotationImage>(command.Data);
|
||||
await ProcessDetection(annImage);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
logger.LogError(e, e.Message);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private async Task ProcessDetection(AnnotationImage annotationImage, CancellationToken ct = default)
|
||||
{
|
||||
var annotation = await _annotationService.SaveAnnotation(annotationImage, ct);
|
||||
await _mediator.Publish(new AnnotationAddedEvent(annotation), ct);
|
||||
}
|
||||
|
||||
public async Task RunInference(List<string> mediaPaths, CancellationToken ct = default)
|
||||
{
|
||||
_inferenceCancelTokenSource = new CancellationTokenSource();
|
||||
_client.Send(RemoteCommand.Create(CommandType.Login, _azaionApi.Credentials));
|
||||
|
||||
var aiConfig = _aiConfigOptions.Value;
|
||||
aiConfig.Paths = mediaPaths;
|
||||
_client.Send(RemoteCommand.Create(CommandType.Inference, aiConfig));
|
||||
|
||||
using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct, _inferenceCancelTokenSource.Token);
|
||||
await combinedTokenSource.Token.AsTask();
|
||||
}
|
||||
|
||||
public void StopInference() => _client.Stop();
|
||||
}
|
||||
Reference in New Issue
Block a user