refactor external clients

put model batch size as parameter in config
This commit is contained in:
Alex Bezdieniezhnykh
2025-03-24 00:33:41 +02:00
parent 32f9de3c71
commit 6429ad62c2
28 changed files with 352 additions and 226 deletions
@@ -0,0 +1,26 @@
using Azaion.CommonSecurity.DTO;
using Azaion.CommonSecurity.DTO.Commands;
using Microsoft.Extensions.DependencyInjection;
namespace Azaion.CommonSecurity.Services;
public interface IAuthProvider
{
void Login(ApiCredentials credentials);
User CurrentUser { get; }
}
public class AuthProvider([FromKeyedServices(SecurityConstants.EXTERNAL_INFERENCE_PATH)] IExternalClient externalClient) : IAuthProvider
{
public User CurrentUser { get; private set; } = null!;
public void Login(ApiCredentials credentials)
{
externalClient.Send(RemoteCommand.Create(CommandType.Login, credentials));
var user = externalClient.Get<User>();
if (user == null)
throw new Exception("Can't get user from Auth provider");
CurrentUser = user;
}
}
@@ -0,0 +1,112 @@
using System.Diagnostics;
using System.Text;
using Azaion.CommonSecurity.DTO;
using Azaion.CommonSecurity.DTO.Commands;
using MessagePack;
using Microsoft.Extensions.Options;
using NetMQ;
using NetMQ.Sockets;
namespace Azaion.CommonSecurity.Services;
public interface IExternalClient
{
void Stop();
void Send(RemoteCommand create);
T? Get<T>(int retries = 24, int tryTimeoutSeconds = 5, CancellationToken ct = default) where T : class;
byte[]? GetBytes(int retries = 24, int tryTimeoutSeconds = 5, CancellationToken ct = default);
}
public abstract class BaseZeroMqExternalClient : IExternalClient
{
private readonly DealerSocket _dealer = new();
private readonly Guid _clientId = Guid.NewGuid();
private readonly ExternalClientConfig _externalClientConfig;
protected abstract string ClientPath { get; }
protected BaseZeroMqExternalClient(ExternalClientConfig config)
{
_externalClientConfig = config;
Start();
}
private void Start()
{
try
{
using var process = new Process();
process.StartInfo = new ProcessStartInfo
{
FileName = ClientPath
//Arguments = $"-e {credentials.Email} -p {credentials.Password} -f {apiConfig.ResourcesFolder}",
//RedirectStandardOutput = true,
//RedirectStandardError = true,
//CreateNoWindow = true
};
process.OutputDataReceived += (_, e) => { if (e.Data != null) Console.WriteLine(e.Data); };
process.ErrorDataReceived += (_, e) => { if (e.Data != null) Console.WriteLine(e.Data); };
process.Start();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
_dealer.Options.Identity = Encoding.UTF8.GetBytes(_clientId.ToString("N"));
_dealer.Connect($"tcp://{_externalClientConfig.ZeroMqHost}:{_externalClientConfig.ZeroMqPort}");
}
public void Stop()
{
if (!_dealer.IsDisposed)
{
_dealer.SendFrame(MessagePackSerializer.Serialize(new RemoteCommand(CommandType.Exit)));
_dealer.Close();
}
}
public void Send(RemoteCommand command)
{
_dealer.SendFrame(MessagePackSerializer.Serialize(command));
}
public T? Get<T>(int retries = 24, int tryTimeoutSeconds = 5, CancellationToken ct = default) where T : class
{
var bytes = GetBytes(retries, tryTimeoutSeconds, ct);
return bytes != null ? MessagePackSerializer.Deserialize<T>(bytes, cancellationToken: ct) : null;
}
public byte[]? GetBytes(int retries = 24, int tryTimeoutSeconds = 5, CancellationToken ct = default)
{
var tryNum = 0;
while (!ct.IsCancellationRequested && tryNum++ < retries)
{
if (!_dealer.TryReceiveFrameBytes(TimeSpan.FromSeconds(tryTimeoutSeconds), out var bytes))
continue;
return bytes;
}
if (!ct.IsCancellationRequested)
throw new Exception($"Unable to get bytes after {tryNum} retries, {tryTimeoutSeconds} seconds each");
return null;
}
}
public class InferenceExternalClient(IOptions<InferenceClientConfig> inferenceClientConfig)
: BaseZeroMqExternalClient(inferenceClientConfig.Value)
{
protected override string ClientPath => SecurityConstants.EXTERNAL_INFERENCE_PATH;
}
public class GpsDeniedExternalClient(IOptions<GpsDeniedClientConfig> gpsDeniedClientConfig)
: BaseZeroMqExternalClient(gpsDeniedClientConfig.Value)
{
protected override string ClientPath => SecurityConstants.EXTERNAL_GPS_DENIED_PATH;
}
@@ -1,98 +0,0 @@
using System.Diagnostics;
using System.Text;
using Azaion.CommonSecurity.DTO;
using Azaion.CommonSecurity.DTO.Commands;
using MessagePack;
using NetMQ;
using NetMQ.Sockets;
namespace Azaion.CommonSecurity.Services;
public interface IResourceLoader
{
MemoryStream LoadFileFromPython(string fileName, string? folder = null);
void StopPython();
}
public interface IAuthProvider
{
User CurrentUser { get; }
}
public class PythonResourceLoader : IResourceLoader, IAuthProvider
{
private readonly DealerSocket _dealer = new();
private readonly Guid _clientId = Guid.NewGuid();
public User CurrentUser { get; set; } = null!;
public PythonResourceLoader(PythonConfig config)
{
StartPython();
_dealer.Options.Identity = Encoding.UTF8.GetBytes(_clientId.ToString("N"));
_dealer.Connect($"tcp://{config.ZeroMqHost}:{config.ZeroMqPort}");
}
private void StartPython()
{
try
{
using var process = new Process();
process.StartInfo = new ProcessStartInfo
{
FileName = SecurityConstants.AZAION_INFERENCE_PATH,
//Arguments = $"-e {credentials.Email} -p {credentials.Password} -f {apiConfig.ResourcesFolder}",
//UseShellExecute = false,
//RedirectStandardOutput = true,
// RedirectStandardError = true,
//CreateNoWindow = true
};
process.OutputDataReceived += (_, e) => { if (e.Data != null) Console.WriteLine(e.Data); };
process.ErrorDataReceived += (_, e) => { if (e.Data != null) Console.WriteLine(e.Data); };
process.Start();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
public void Login(ApiCredentials credentials)
{
_dealer.SendFrame(RemoteCommand.Serialize(CommandType.Login, credentials));
var user = _dealer.Get<User>();
if (user == null)
throw new Exception("Can't get user from Auth provider");
CurrentUser = user;
}
public void StopPython()
{
if (!_dealer.IsDisposed)
{
_dealer.SendFrame(MessagePackSerializer.Serialize(new RemoteCommand(CommandType.Exit)));
_dealer.Close();
}
}
public MemoryStream LoadFileFromPython(string fileName, string? folder = null)
{
try
{
_dealer.SendFrame(RemoteCommand.Serialize(CommandType.Load, new LoadFileData(fileName, folder)));
if (!_dealer.TryReceiveFrameBytes(TimeSpan.FromSeconds(300), out var bytes))
throw new Exception($"Unable to receive {fileName}");
return new MemoryStream(bytes);
}
catch (Exception ex)
{
throw new Exception($"Failed to load fil0e '{fileName}': {ex.Message}", ex);
}
}
}
@@ -0,0 +1,22 @@
using Azaion.CommonSecurity.DTO.Commands;
using Microsoft.Extensions.DependencyInjection;
namespace Azaion.CommonSecurity.Services;
public interface IResourceLoader
{
MemoryStream LoadFile(string fileName, string? folder = null);
}
public class ResourceLoader([FromKeyedServices(SecurityConstants.EXTERNAL_INFERENCE_PATH)] IExternalClient externalClient) : IResourceLoader
{
public MemoryStream LoadFile(string fileName, string? folder = null)
{
externalClient.Send(RemoteCommand.Create(CommandType.Load, new LoadFileData(fileName, folder)));
var bytes = externalClient.GetBytes();
if (bytes == null)
throw new Exception($"Unable to receive {fileName}");
return new MemoryStream(bytes);
}
}