using System.Diagnostics;
using System.IO;
using System.Text;
using Azaion.Common.Database;
using Azaion.Common.Extensions;
using Azaion.CommonSecurity;
using Azaion.CommonSecurity.DTO;
using Azaion.CommonSecurity.DTO.Commands;
using Azaion.CommonSecurity.Exceptions;
using Azaion.CommonSecurity.Services;
using MessagePack;
using Microsoft.Extensions.Options;
using NetMQ;
using NetMQ.Sockets;

namespace Azaion.Common.Services;

/// <summary>
/// Client side of the ZeroMQ channel to the external inference process.
/// Incoming <see cref="RemoteCommand"/> messages are surfaced through the events below,
/// one event per handled <see cref="CommandType"/>.
/// </summary>
public interface IInferenceClient : IDisposable
{
    /// <summary>Raised for every received command of type <see cref="CommandType.DataBytes"/>.</summary>
    event EventHandler<RemoteCommand> BytesReceived;

    /// <summary>Raised for every received command of type <see cref="CommandType.InferenceData"/>.</summary>
    event EventHandler<RemoteCommand>? InferenceDataReceived;

    /// <summary>Raised for every received command of type <see cref="CommandType.AIAvailabilityResult"/>.</summary>
    event EventHandler<RemoteCommand>? AIAvailabilityReceived;

    /// <summary>Serializes the command with MessagePack and sends it to the inference process.</summary>
    void Send(RemoteCommand create);

    /// <summary>Currently a no-op in <see cref="InferenceClient"/>.</summary>
    void Stop();
}

/// <summary>
/// Launches the external inference executable, connects a <see cref="DealerSocket"/> to it and
/// dispatches the commands it sends back. Also acts as an <see cref="IResourceLoader"/> by
/// requesting files from the inference process over the same socket.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="Send"/> writes to the socket from the caller's thread while the poller
/// thread reads from it. NetMQ documents sockets as not thread-safe — consider routing outgoing
/// messages through a queue owned by the poller. Not changed here.
/// </remarks>
public class InferenceClient : IInferenceClient, IResourceLoader
{
    /// <summary>How long <see cref="LoadFile"/> waits for a reply when no timeout is supplied.</summary>
    private static readonly TimeSpan DefaultLoadTimeout = TimeSpan.FromSeconds(15);

    public event EventHandler<RemoteCommand>? BytesReceived;
    public event EventHandler<RemoteCommand>? InferenceDataReceived;
    public event EventHandler<RemoteCommand>? AIAvailabilityReceived;

    private readonly DealerSocket _dealer = new();
    private readonly NetMQPoller _poller = new();
    private readonly Guid _clientId = Guid.NewGuid();
    private readonly InferenceClientConfig _inferenceClientConfig;
    private bool _disposed;

    /// <summary>
    /// Creates the client and immediately starts the inference process and the socket poller.
    /// </summary>
    /// <param name="config">Provides the ZeroMQ host/port and the API URL passed to the process.</param>
    /// <param name="ct">Token forwarded to message deserialization.</param>
    public InferenceClient(IOptions<InferenceClientConfig> config, CancellationToken ct)
    {
        _inferenceClientConfig = config.Value;
        Start(ct);
    }

    /// <summary>ZeroMQ endpoint used for both connect and disconnect.</summary>
    private string Endpoint =>
        $"tcp://{_inferenceClientConfig.ZeroMqHost}:{_inferenceClientConfig.ZeroMqPort}";

    /// <summary>
    /// Starts the external inference executable (best effort), connects the dealer socket
    /// and begins polling it for incoming commands.
    /// </summary>
    private void Start(CancellationToken ct = default)
    {
        try
        {
            // Output is not redirected, so the child process writes to its own console.
            // Disposing the Process object only releases the local handle; the child keeps running.
            using var process = Process.Start(new ProcessStartInfo
            {
                FileName = SecurityConstants.EXTERNAL_INFERENCE_PATH,
                Arguments = $"--port {_inferenceClientConfig.ZeroMqPort} --api {_inferenceClientConfig.ApiUrl}",
            });
        }
        catch (Exception e)
        {
            // Deliberately best effort: a failure to launch is logged and the client still connects.
            Console.WriteLine(e);
        }

        _dealer.Options.Identity = Encoding.UTF8.GetBytes(_clientId.ToString("N"));
        _dealer.Connect(Endpoint);
        _dealer.ReceiveReady += (_, e) => ProcessClientCommand(e.Socket, ct);

        _poller.Add(_dealer);

        // RunAsync already runs the poller on its own background thread, so no Task.Run wrapper is needed.
        _poller.RunAsync();
    }

    /// <summary>
    /// Drains every frame currently available on the socket, deserializes each into a
    /// <see cref="RemoteCommand"/> and raises the event matching its command type.
    /// Other command types are ignored.
    /// </summary>
    private void ProcessClientCommand(NetMQSocket socket, CancellationToken ct = default)
    {
        while (socket.TryReceiveFrameBytes(TimeSpan.Zero, out var bytes))
        {
            if (bytes is null || bytes.Length == 0)
            {
                continue;
            }

            RemoteCommand remoteCommand;
            try
            {
                remoteCommand = MessagePackSerializer.Deserialize<RemoteCommand>(bytes, cancellationToken: ct);
            }
            catch (MessagePackSerializationException e)
            {
                // A malformed frame must not take down the poller thread; log it and keep draining.
                Console.WriteLine(e);
                continue;
            }

            switch (remoteCommand.CommandType)
            {
                case CommandType.DataBytes:
                    BytesReceived?.Invoke(this, remoteCommand);
                    break;
                case CommandType.InferenceData:
                    InferenceDataReceived?.Invoke(this, remoteCommand);
                    break;
                case CommandType.AIAvailabilityResult:
                    AIAvailabilityReceived?.Invoke(this, remoteCommand);
                    break;
            }
        }
    }

    /// <summary>No-op; present to satisfy <see cref="IInferenceClient"/>.</summary>
    public void Stop()
    {
    }

    /// <summary>Serializes <paramref name="command"/> with MessagePack and sends it as a single frame.</summary>
    public void Send(RemoteCommand command)
    {
        _dealer.SendFrame(MessagePackSerializer.Serialize(command));
    }

    /// <summary>
    /// Requests a file from the inference process and blocks until the first
    /// <see cref="CommandType.DataBytes"/> reply arrives or the timeout elapses.
    /// </summary>
    /// <param name="fileName">Name of the file to load.</param>
    /// <param name="folder">Optional folder passed along in the request.</param>
    /// <param name="timeout">Maximum time to wait for the reply; 15 seconds when omitted.</param>
    /// <returns>
    /// A stream over the received bytes. If no reply arrives within the timeout, an empty stream
    /// is returned.
    /// </returns>
    /// <exception cref="BusinessException">The reply carried no data.</exception>
    /// <remarks>
    /// Replies are not correlated with requests: whichever DataBytes command arrives first completes the call.
    /// </remarks>
    public MemoryStream LoadFile(string fileName, string? folder = null, TimeSpan? timeout = null)
    {
        //TODO: Bad solution, look for better implementation
        var pending = new TaskCompletionSource<RemoteCommand>(TaskCreationOptions.RunContinuationsAsynchronously);

        void OnBytesReceived(object? sender, RemoteCommand command) => pending.TrySetResult(command);

        // Subscribe before sending so a fast reply cannot slip past the handler.
        BytesReceived += OnBytesReceived;
        try
        {
            Send(RemoteCommand.Create(CommandType.Load, new LoadFileData(fileName, folder)));

            if (!pending.Task.Wait(timeout ?? DefaultLoadTimeout))
            {
                // Timed out: keep the historical behavior of handing back an empty stream.
                return new MemoryStream(Array.Empty<byte>());
            }
        }
        finally
        {
            BytesReceived -= OnBytesReceived;
        }

        var response = pending.Task.Result;
        if (response.Data is null)
        {
            throw new BusinessException(response.Message ?? "File is empty");
        }

        return new MemoryStream(response.Data);
    }

    /// <summary>
    /// Stops the poller, tells the inference process to exit and releases the socket.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        _poller.Stop();
        _poller.Dispose();

        try
        {
            _dealer.SendFrame(MessagePackSerializer.Serialize(new RemoteCommand(CommandType.Exit)));
            _dealer.Disconnect(Endpoint);
        }
        catch (Exception e)
        {
            // Dispose must not throw; the socket is still closed below.
            Console.WriteLine(e);
        }
        finally
        {
            _dealer.Close();
            _dealer.Dispose();
        }
    }
}