using System.Diagnostics;
using System.Text;
using Azaion.Common.DTO;
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NetMQ;
using NetMQ.Sockets;

namespace Azaion.Common.Services;

/// <summary>
/// Client-side channel to the external inference process: sends
/// <see cref="RemoteCommand"/> messages and surfaces the replies as events.
/// </summary>
public interface IInferenceClient : IDisposable
{
    /// <summary>Raised when a command of type <see cref="CommandType.InferenceData"/> is received.</summary>
    event EventHandler<RemoteCommand>? InferenceDataReceived;

    /// <summary>Raised when a command of type <see cref="CommandType.AIAvailabilityResult"/> is received.</summary>
    event EventHandler<RemoteCommand>? AIAvailabilityReceived;

    /// <summary>Serializes the command with MessagePack and sends it to the inference process.</summary>
    /// <param name="create">The command to send.</param>
    void Send(RemoteCommand create);

    /// <summary>Sends a <see cref="CommandType.StopInference"/> command.</summary>
    void Stop();
}

/// <summary>
/// <see cref="IInferenceClient"/> implementation that launches the external inference
/// executable and talks to it over a ZeroMQ dealer socket polled by a <see cref="NetMQPoller"/>.
/// </summary>
public class InferenceClient : IInferenceClient
{
    private readonly ILogger<InferenceClient> _logger;

    /// <summary>Raised when a command of type <see cref="CommandType.DataBytes"/> is received.</summary>
    public event EventHandler<RemoteCommand>? BytesReceived;

    /// <inheritdoc />
    public event EventHandler<RemoteCommand>? InferenceDataReceived;

    /// <inheritdoc />
    public event EventHandler<RemoteCommand>? AIAvailabilityReceived;

    private readonly DealerSocket _dealer = new();
    private readonly NetMQPoller _poller = new();
    private readonly Guid _clientId = Guid.NewGuid();
    private readonly InferenceClientConfig _inferenceClientConfig;
    private readonly LoaderClientConfig _loaderClientConfig;
    private bool _disposed;

    /// <summary>
    /// Stores the configuration and immediately starts the external process and the socket poller.
    /// </summary>
    /// <param name="logger">Logger used for start-up, message-handling and shutdown failures.</param>
    /// <param name="inferenceConfig">Host, port and API URL of the inference service.</param>
    /// <param name="loaderConfig">Loader settings; only its ZeroMQ port is forwarded to the process.</param>
    public InferenceClient(
        ILogger<InferenceClient> logger,
        IOptions<InferenceClientConfig> inferenceConfig,
        IOptions<LoaderClientConfig> loaderConfig)
    {
        _logger = logger;
        _inferenceClientConfig = inferenceConfig.Value;
        _loaderClientConfig = loaderConfig.Value;
        Start();
    }

    /// <summary>Address used for both connecting to and disconnecting from the inference process.</summary>
    private string Endpoint =>
        $"tcp://{_inferenceClientConfig.ZeroMqHost}:{_inferenceClientConfig.ZeroMqPort}";

    /// <summary>
    /// Launches the inference executable (best effort), then connects the dealer socket
    /// and starts polling it for incoming frames.
    /// </summary>
    private void Start()
    {
        LaunchInferenceProcess();

        _dealer.Options.Identity = Encoding.UTF8.GetBytes(_clientId.ToString("N"));
        _dealer.Connect(Endpoint);
        _dealer.ReceiveReady += (_, e) => ProcessClientCommand(e.Socket);
        _poller.Add(_dealer);

        // RunAsync already starts its own polling thread, so no Task.Run wrapper is needed.
        _poller.RunAsync();
    }

    /// <summary>
    /// Starts the external inference executable. A failure is logged rather than rethrown,
    /// so the socket is still connected afterwards.
    /// </summary>
    private void LaunchInferenceProcess()
    {
        try
        {
            using var process = new Process();
            process.StartInfo = new ProcessStartInfo
            {
                FileName = Constants.EXTERNAL_INFERENCE_PATH,
                Arguments = $"-p {_inferenceClientConfig.ZeroMqPort} -lp {_loaderClientConfig.ZeroMqPort} -a {_inferenceClientConfig.ApiUrl}",
                CreateNoWindow = true
            };
            process.Start();
        }
        catch (Exception e)
        {
            // Constant template: braces inside an exception message must not be parsed as placeholders.
            _logger.LogError(e, "Failed to start inference process '{Path}'", Constants.EXTERNAL_INFERENCE_PATH);
        }
    }

    /// <summary>
    /// Drains every frame currently queued on <paramref name="socket"/> and raises the event
    /// matching each command type. Commands of any other type are ignored.
    /// </summary>
    /// <param name="socket">The socket that signalled it has data to read.</param>
    /// <param name="ct">Cancellation token forwarded to the MessagePack deserializer.</param>
    private void ProcessClientCommand(NetMQSocket socket, CancellationToken ct = default)
    {
        while (socket.TryReceiveFrameBytes(TimeSpan.Zero, out var bytes))
        {
            if (bytes is null || bytes.Length == 0)
            {
                continue;
            }

            try
            {
                var remoteCommand = MessagePackSerializer.Deserialize<RemoteCommand>(bytes, cancellationToken: ct);
                if (remoteCommand is null)
                {
                    continue;
                }

                switch (remoteCommand.CommandType)
                {
                    case CommandType.DataBytes:
                        BytesReceived?.Invoke(this, remoteCommand);
                        break;
                    case CommandType.InferenceData:
                        InferenceDataReceived?.Invoke(this, remoteCommand);
                        break;
                    case CommandType.AIAvailabilityResult:
                        AIAvailabilityReceived?.Invoke(this, remoteCommand);
                        break;
                }
            }
            catch (Exception e)
            {
                // This runs on the poller thread: a malformed frame or a throwing subscriber
                // must not escape and stop all further message processing.
                _logger.LogError(e, "Failed to process a message received from the inference process");
            }
        }
    }

    /// <inheritdoc />
    public void Stop() => Send(RemoteCommand.Create(CommandType.StopInference));

    /// <inheritdoc />
    // NOTE(review): this writes to the dealer socket from the caller's thread while the poller
    // thread reads from the same socket; NetMQ sockets are not thread-safe. Consider routing
    // sends through a NetMQQueue registered on the poller — confirm against the callers.
    public void Send(RemoteCommand command) => _dealer.SendFrame(MessagePackSerializer.Serialize(command));

    /// <summary>
    /// Stops polling, asks the inference process to exit and releases the socket.
    /// Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Stop the poller first so its thread no longer touches the socket during teardown.
        if (_poller.IsRunning)
        {
            _poller.Stop();
        }

        _poller.Dispose();

        try
        {
            _dealer.SendFrame(MessagePackSerializer.Serialize(new RemoteCommand(CommandType.Exit)));
            _dealer.Disconnect(Endpoint);
        }
        catch (Exception e)
        {
            // Dispose must not throw; the socket is still released below.
            _logger.LogError(e, "Failed to send the exit command to the inference process");
        }
        finally
        {
            _dealer.Close();
            _dealer.Dispose();
        }
    }
}