using System.Diagnostics;
using System.Text;
using Azaion.Common.DTO;
using MediatR;
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NetMQ;
using NetMQ.Sockets;

namespace Azaion.Common.Services.Inference;

/// <summary>
/// Sends <see cref="RemoteCommand"/> messages to the external inference process.
/// </summary>
public interface IInferenceClient : IDisposable
{
    /// <summary>Serializes the command with MessagePack and sends it as a single frame.</summary>
    /// <param name="create">The command to send.</param>
    void Send(RemoteCommand create);

    /// <summary>Sends a <c>StopInference</c> command.</summary>
    void Stop();
}

/// <summary>
/// NetMQ dealer-socket client for the external inference executable.
/// On construction it tries to launch the executable, connects to it, and starts a poller
/// that turns incoming frames into MediatR notifications.
/// </summary>
public class InferenceClient : IInferenceClient
{
    private readonly ILogger _logger;
    private readonly DealerSocket _dealer = new();
    private readonly NetMQPoller _poller = new();
    private readonly Guid _clientId = Guid.NewGuid();
    private readonly InferenceClientConfig _inferenceClientConfig;
    private readonly LoaderClientConfig _loaderClientConfig;
    private readonly IMediator _mediator;
    private bool _disposed;

    /// <summary>
    /// Stores the dependencies and immediately starts the client (process launch, connect, poller).
    /// </summary>
    /// <param name="logger">Logger used for start-up, receive and shutdown failures.</param>
    /// <param name="inferenceConfig">Host, port and API URL of the inference process.</param>
    /// <param name="mediator">Mediator that received messages are published to.</param>
    /// <param name="loaderConfig">Loader settings; only its port is passed to the inference process.</param>
    public InferenceClient(
        ILogger logger,
        IOptions<InferenceClientConfig> inferenceConfig,
        IMediator mediator,
        IOptions<LoaderClientConfig> loaderConfig)
    {
        _logger = logger;
        _inferenceClientConfig = inferenceConfig.Value;
        _loaderClientConfig = loaderConfig.Value;
        _mediator = mediator;
        Start();
    }

    /// <summary>ZeroMQ endpoint of the inference process, built from the inference config.</summary>
    private string Endpoint =>
        $"tcp://{_inferenceClientConfig.ZeroMqHost}:{_inferenceClientConfig.ZeroMqPort}";

    /// <summary>
    /// Launches the inference executable (best effort), connects the dealer socket and starts polling.
    /// </summary>
    private void Start()
    {
        try
        {
            using var process = new Process();
            process.StartInfo = new ProcessStartInfo
            {
                FileName = Constants.EXTERNAL_INFERENCE_PATH,
                Arguments = $"-p {_inferenceClientConfig.ZeroMqPort} -lp {_loaderClientConfig.ZeroMqPort} -a {_inferenceClientConfig.ApiUrl}",
                CreateNoWindow = true
            };
            process.Start();
        }
        catch (Exception e)
        {
            // A failed launch is logged but not fatal: the socket is still connected below.
            // The message is passed as an argument, not as the template, so braces in it
            // cannot break log formatting.
            _logger.LogError(e, "{Message}", e.Message);
        }

        _dealer.Options.Identity = Encoding.UTF8.GetBytes(_clientId.ToString("N"));
        _dealer.Connect(Endpoint);
        _dealer.ReceiveReady += OnDealerReceiveReady;
        _poller.Add(_dealer);

        // RunAsync starts the poller on its own background thread, so no Task.Run wrapper is
        // needed; calling it directly also surfaces start-up failures instead of losing them.
        _poller.RunAsync();
    }

    /// <summary>
    /// Poller callback for incoming frames. It is an event handler and therefore async void,
    /// so every exception is caught and logged here rather than escaping unobserved.
    /// </summary>
    private async void OnDealerReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        try
        {
            await ProcessClientCommand(e.Socket);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process messages from the inference process");
        }
    }

    /// <summary>
    /// Drains every frame currently available on the socket and dispatches each one.
    /// Empty frames are skipped; a frame that fails is logged and does not stop the drain.
    /// </summary>
    /// <param name="socket">Socket to read frames from.</param>
    /// <param name="ct">Token passed to deserialization and publishing.</param>
    private async Task ProcessClientCommand(NetMQSocket socket, CancellationToken ct = default)
    {
        while (socket.TryReceiveFrameBytes(TimeSpan.Zero, out var bytes))
        {
            if (bytes.Length == 0)
            {
                continue;
            }

            try
            {
                await DispatchFrame(bytes, ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                _logger.LogError(ex, "Failed to handle a message from the inference process");
            }
        }
    }

    /// <summary>Deserializes one frame and publishes the matching notification.</summary>
    /// <exception cref="ArgumentOutOfRangeException">The command type is not handled by this client.</exception>
    private async Task DispatchFrame(byte[] bytes, CancellationToken ct)
    {
        var remoteCommand = MessagePackSerializer.Deserialize<RemoteCommand>(bytes, cancellationToken: ct);
        switch (remoteCommand.CommandType)
        {
            case CommandType.InferenceData:
                await _mediator.Publish(new InferenceDataEvent(remoteCommand), ct);
                break;

            case CommandType.AIAvailabilityResult:
                // NOTE(review): this call has no type argument and does not appear to compile as
                // written; supply the notification type carried in RemoteCommand.Data for
                // AIAvailabilityResult (declared outside this file) - TODO confirm.
                var aiAvailabilityStatus = MessagePackSerializer.Deserialize(remoteCommand.Data, cancellationToken: ct);
                await _mediator.Publish(aiAvailabilityStatus, ct);
                break;

            default:
                throw new ArgumentOutOfRangeException(
                    nameof(remoteCommand),
                    remoteCommand.CommandType,
                    "Unsupported command type received from the inference process.");
        }
    }

    /// <inheritdoc />
    public void Stop() => Send(RemoteCommand.Create(CommandType.StopInference));

    /// <inheritdoc />
    // NOTE(review): this writes to the same socket the poller thread reads from. NetMQ documents
    // sockets as not thread-safe, so sends from other threads may need to be marshalled through
    // the poller (e.g. a NetMQQueue) - confirm against the callers.
    public void Send(RemoteCommand command) => _dealer.SendFrame(MessagePackSerializer.Serialize(command));

    /// <summary>
    /// Stops the poller, sends an <c>Exit</c> command to the inference process and releases the socket.
    /// Safe to call more than once; a failure while sending Exit is logged, not thrown.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Only stop a poller that is actually running.
        if (_poller.IsRunning)
        {
            _poller.Stop();
        }

        _poller.Dispose();
        _dealer.ReceiveReady -= OnDealerReceiveReady;

        try
        {
            // NOTE(review): disconnecting right after the send may drop the Exit frame if it has
            // not been flushed yet (depends on the socket's linger setting) - confirm.
            _dealer.SendFrame(MessagePackSerializer.Serialize(new RemoteCommand(CommandType.Exit)));
            _dealer.Disconnect(Endpoint);
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Failed to send Exit to the inference process during dispose");
        }
        finally
        {
            // Release the socket even when the Exit command could not be sent.
            _dealer.Close();
            _dealer.Dispose();
        }
    }
}