using System.Diagnostics; using System.IO; using System.Text; using Azaion.Common.Database; using Azaion.Common.Extensions; using Azaion.CommonSecurity; using Azaion.CommonSecurity.DTO; using Azaion.CommonSecurity.DTO.Commands; using Azaion.CommonSecurity.Exceptions; using Azaion.CommonSecurity.Services; using MessagePack; using Microsoft.Extensions.Options; using NetMQ; using NetMQ.Sockets; namespace Azaion.Common.Services; public interface IInferenceClient : IDisposable { event EventHandler? InferenceDataReceived; event EventHandler? AIAvailabilityReceived; void Send(RemoteCommand create); void Stop(); } public class InferenceClient(IOptions inferenceConfig, IOptions loaderConfig) : IInferenceClient { public event EventHandler? BytesReceived; public event EventHandler? InferenceDataReceived; public event EventHandler? AIAvailabilityReceived; private readonly DealerSocket _dealer = new(); private readonly NetMQPoller _poller = new(); private readonly Guid _clientId = Guid.NewGuid(); private readonly InferenceClientConfig _inferenceClientConfig = inferenceConfig.Value; private readonly LoaderClientConfig _loaderClientConfig = loaderConfig.Value; private void Start() { try { using var process = new Process(); process.StartInfo = new ProcessStartInfo { FileName = SecurityConstants.EXTERNAL_INFERENCE_PATH, Arguments = $"--port {_inferenceClientConfig.ZeroMqPort} --loader-port {_loaderClientConfig.ZeroMqPort} --api {_inferenceClientConfig.ApiUrl}", //RedirectStandardOutput = true, //RedirectStandardError = true, //CreateNoWindow = true }; process.OutputDataReceived += (_, e) => { if (e.Data != null) Console.WriteLine(e.Data); }; process.ErrorDataReceived += (_, e) => { if (e.Data != null) Console.WriteLine(e.Data); }; process.Start(); } catch (Exception e) { Console.WriteLine(e); //throw; } _dealer.Options.Identity = Encoding.UTF8.GetBytes(_clientId.ToString("N")); _dealer.Connect($"tcp://{_inferenceClientConfig.ZeroMqHost}:{_inferenceClientConfig.ZeroMqPort}"); _dealer.ReceiveReady += (_, e) => ProcessClientCommand(e.Socket); _poller.Add(_dealer); _ = Task.Run(() => _poller.RunAsync()); } private void ProcessClientCommand(NetMQSocket socket, CancellationToken ct = default) { while (socket.TryReceiveFrameBytes(TimeSpan.Zero, out var bytes)) { if (bytes?.Length == 0) continue; var remoteCommand = MessagePackSerializer.Deserialize(bytes, cancellationToken: ct); switch (remoteCommand.CommandType) { case CommandType.DataBytes: BytesReceived?.Invoke(this, remoteCommand); break; case CommandType.InferenceData: InferenceDataReceived?.Invoke(this, remoteCommand); break; case CommandType.AIAvailabilityResult: AIAvailabilityReceived?.Invoke(this, remoteCommand); break; } } } public void Stop() => Send(RemoteCommand.Create(CommandType.StopInference)); public void Send(RemoteCommand command) => _dealer.SendFrame(MessagePackSerializer.Serialize(command)); public void Dispose() { _poller.Stop(); _poller.Dispose(); _dealer.SendFrame(MessagePackSerializer.Serialize(new RemoteCommand(CommandType.Exit))); _dealer.Disconnect($"tcp://{_inferenceClientConfig.ZeroMqHost}:{_inferenceClientConfig.ZeroMqPort}"); _dealer.Close(); _dealer.Dispose(); } }