queue + local sqlite WIP

This commit is contained in:
Alex Bezdieniezhnykh
2024-12-17 18:46:33 +02:00
parent 626767469a
commit 5fa18aa514
47 changed files with 694 additions and 222 deletions
+132
View File
@@ -0,0 +1,132 @@
using System.Drawing.Imaging;
using System.IO;
using System.Net;
using Azaion.Common.Database;
using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.Common.DTO.Queue;
using Azaion.CommonSecurity.DTO;
using Azaion.CommonSecurity.Services;
using LinqToDB;
using MessagePack;
using Microsoft.Extensions.Options;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
namespace Azaion.Common.Services;
public class AnnotationService
{
private readonly AzaionApiClient _apiClient;
private readonly IDbFactory _dbFactory;
private readonly FailsafeAnnotationsProducer _producer;
private readonly QueueConfig _queueConfig;
private Consumer _consumer = null!;
public AnnotationService(AzaionApiClient apiClient,
IDbFactory dbFactory,
FailsafeAnnotationsProducer producer,
IOptions<QueueConfig> queueConfig)
{
_apiClient = apiClient;
_dbFactory = dbFactory;
_producer = producer;
_queueConfig = queueConfig.Value;
Task.Run(async () => await Init()).Wait();
}
private async Task Init()
{
var consumerSystem = await StreamSystem.Create(new StreamSystemConfig
{
Endpoints = new List<EndPoint>{new DnsEndPoint(_queueConfig.Host, _queueConfig.Port)},
UserName = _queueConfig.ConsumerUsername,
Password = _queueConfig.ConsumerPassword
});
_consumer = await Consumer.Create(new ConsumerConfig(consumerSystem, Constants.MQ_ANNOTATIONS_QUEUE)
{
OffsetSpec = new OffsetTypeFirst(),
MessageHandler = async (stream, _, _, message) =>
await Consume(MessagePackSerializer.Deserialize<AnnotationCreatedMessage>(message.Data.Contents)),
});
}
//AI / Manual
public async Task SaveAnnotation(string fName, List<YoloLabel>? labels, SourceEnum source, MemoryStream? stream = null, CancellationToken token = default) =>
await SaveAnnotationInner(DateTime.UtcNow, fName, labels, source, stream, _apiClient.User.Role, _apiClient.User.Email, token);
//Queue (only from operators)
public async Task Consume(AnnotationCreatedMessage message, CancellationToken cancellationToken = default)
{
if (message.CreatedRole == RoleEnum.Validator) //Don't proceed our own messages (or from another Validator)
return;
await SaveAnnotationInner(
message.CreatedDate,
message.Name,
YoloLabel.Deserialize(message.Label),
message.Source,
new MemoryStream(message.Image),
message.CreatedRole,
message.CreatedEmail,
cancellationToken);
}
private async Task SaveAnnotationInner(DateTime createdDate, string fName, List<YoloLabel>? labels, SourceEnum source, MemoryStream? stream,
RoleEnum createdRole,
string createdEmail,
CancellationToken token = default)
{
//Flow for roles:
// Operator:
// sourceEnum: (manual, ai) <AnnotationCreatedMessage>
// Validator:
// sourceEnum: (manual) if was in received.json then <AnnotationValidatedMessage> else <AnnotationCreatedMessage>
// sourceEnum: (queue, AI) if queue CreatedMessage with the same user - do nothing Add to received.json
var classes = labels?.Select(x => x.ClassNumber).Distinct().ToList() ?? [];
AnnotationStatus status;
var annotation = await _dbFactory.Run(async db =>
{
var ann = await db.Annotations.FirstOrDefaultAsync(x => x.Name == fName, token: token);
status = ann?.AnnotationStatus == AnnotationStatus.Created && createdRole == RoleEnum.Validator
? AnnotationStatus.Validated
: AnnotationStatus.Created;
if (ann != null)
await db.Annotations
.Where(x => x.Name == fName)
.Set(x => x.Classes, classes)
.Set(x => x.Source, source)
.Set(x => x.AnnotationStatus, status)
.UpdateAsync(token: token);
else
{
ann = new Annotation
{
CreatedDate = createdDate,
Name = fName,
Classes = classes,
CreatedEmail = createdEmail,
CreatedRole = createdRole,
AnnotationStatus = status,
Source = source
};
await db.InsertAsync(ann, token: token);
}
return ann;
});
if (stream != null)
{
var img = System.Drawing.Image.FromStream(stream);
img.Save(annotation.ImagePath, ImageFormat.Jpeg); //todo: check png images coming from queue
}
if (labels != null)
await YoloLabel.WriteToFile(labels, annotation.LabelPath, token);
await _producer.SendToQueue(annotation, token);
}
}
-98
View File
@@ -1,98 +0,0 @@
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security;
using System.Text;
using Azaion.Common.DTO;
using Newtonsoft.Json;
using System.IdentityModel.Tokens.Jwt;
namespace Azaion.Common.Services;
public class AzaionApiClient(HttpClient httpClient) : IDisposable
{
const string JSON_MEDIA = "application/json";
private string Email { get; set; } = null!;
private SecureString Password { get; set; } = new();
private string JwtToken { get; set; } = null!;
public User User { get; set; } = null!;
public void EnterCredentials(ApiCredentials credentials)
{
if (string.IsNullOrWhiteSpace(credentials.Email) || string.IsNullOrWhiteSpace(credentials.Password))
throw new Exception("Email or password is empty!");
Email = credentials.Email;
Password = credentials.Password.ToSecureString();
}
public async Task<Stream> GetResource(string fileName, string password, HardwareInfo hardware)
{
var response = await Send(httpClient, new HttpRequestMessage(HttpMethod.Post, "/resources/get")
{
Content = new StringContent(JsonConvert.SerializeObject(new { fileName, password, hardware }), Encoding.UTF8, JSON_MEDIA)
});
return await response.Content.ReadAsStreamAsync();
}
private async Task Authorize()
{
if (string.IsNullOrEmpty(Email) || Password.Length == 0)
throw new Exception("Email or password is empty! Please do EnterCredentials first!");
var payload = new
{
email = Email,
password = Password.ToRealString()
};
var response = await httpClient.PostAsync(
"login",
new StringContent(JsonConvert.SerializeObject(payload), Encoding.UTF8, JSON_MEDIA));
if (!response.IsSuccessStatusCode)
throw new Exception($"EnterCredentials failed: {response.StatusCode}");
var responseData = await response.Content.ReadAsStringAsync();
var result = JsonConvert.DeserializeObject<LoginResponse>(responseData);
if (string.IsNullOrEmpty(result?.Token))
throw new Exception("JWT Token not found in response");
var handler = new JwtSecurityTokenHandler();
var token = handler.ReadJwtToken(result.Token);
User = new User(token.Claims);
JwtToken = result.Token;
}
private async Task<HttpResponseMessage> Send(HttpClient client, HttpRequestMessage request)
{
if (string.IsNullOrEmpty(JwtToken))
await Authorize();
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", JwtToken);
var response = await client.SendAsync(request);
if (response.StatusCode == HttpStatusCode.Unauthorized)
{
await Authorize();
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", JwtToken);
response = await client.SendAsync(request);
}
if (response.IsSuccessStatusCode)
return response;
var result = await response.Content.ReadAsStringAsync();
throw new Exception($"Failed: {response.StatusCode}! Result: {result}");
}
public void Dispose()
{
httpClient.Dispose();
Password.Dispose();
}
}
+125
View File
@@ -0,0 +1,125 @@
using System.IO;
using System.Net;
using Azaion.Common.Database;
using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.Common.DTO.Queue;
using LinqToDB;
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
namespace Azaion.Common.Services;
public class FailsafeAnnotationsProducer
{
private readonly ILogger<FailsafeAnnotationsProducer> _logger;
private readonly IDbFactory _dbFactory;
private readonly QueueConfig _queueConfig;
private Producer _annotationProducer = null!;
private Producer _annotationConfirmProducer = null!;
public FailsafeAnnotationsProducer(ILogger<FailsafeAnnotationsProducer> logger, IDbFactory dbFactory, IOptions<QueueConfig> queueConfig)
{
_logger = logger;
_dbFactory = dbFactory;
_queueConfig = queueConfig.Value;
Task.Run(async () => await ProcessQueue()).Wait();
}
private async Task<StreamSystem> GetProducerQueueConfig()
{
return await StreamSystem.Create(new StreamSystemConfig
{
Endpoints = new List<EndPoint> { new IPEndPoint(IPAddress.Parse(_queueConfig.Host), _queueConfig.Port) },
UserName = _queueConfig.ProducerUsername,
Password = _queueConfig.ProducerPassword
});
}
private async Task Init(CancellationToken cancellationToken = default)
{
_annotationProducer = await Producer.Create(new ProducerConfig(await GetProducerQueueConfig(), Constants.MQ_ANNOTATIONS_QUEUE));
//_annotationConfirmProducer = await Producer.Create(new ProducerConfig(await GetProducerQueueConfig(), Constants.MQ_ANNOTATIONS_CONFIRM_QUEUE));
}
private async Task ProcessQueue(CancellationToken cancellationToken = default)
{
await Init(cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
var messages = await GetFromQueue(cancellationToken);
foreach (var messagesChunk in messages.Chunk(10)) //Sending by 10
{
var sent = false;
while (!sent || cancellationToken.IsCancellationRequested) //Waiting for send
{
try
{
var createdMessages = messagesChunk
.Where(x => x.Status == AnnotationStatus.Created)
.Select(x => new Message(MessagePackSerializer.Serialize(x)))
.ToList();
await _annotationProducer.Send(createdMessages, CompressionType.Gzip);
var validatedMessages = messagesChunk
.Where(x => x.Status == AnnotationStatus.Validated)
.Select(x => new Message(MessagePackSerializer.Serialize(x)))
.ToList();
await _annotationConfirmProducer.Send(validatedMessages, CompressionType.Gzip);
await _dbFactory.Run(async db =>
await db.AnnotationsQueue.DeleteAsync(aq => messagesChunk.Any(x => aq.Name == x.Name), token: cancellationToken));
sent = true;
}
catch (Exception e)
{
_logger.LogError(e, e.Message);
await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
}
}
}
}
}
private async Task<List<AnnotationCreatedMessage>> GetFromQueue(CancellationToken cancellationToken = default)
{
return await _dbFactory.Run(async db =>
{
var annotations = await db.AnnotationsQueue.Join(db.Annotations, aq => aq.Name, a => a.Name, (aq, a) => a)
.ToListAsync(token: cancellationToken);
var messages = new List<AnnotationCreatedMessage>();
foreach (var annotation in annotations)
{
var image = await File.ReadAllBytesAsync(annotation.ImagePath, cancellationToken);
var label = await File.ReadAllTextAsync(annotation.LabelPath, cancellationToken);
var annCreateMessage = new AnnotationCreatedMessage
{
Name = annotation.Name,
CreatedRole = annotation.CreatedRole,
CreatedEmail = annotation.CreatedEmail,
CreatedDate = annotation.CreatedDate,
Image = image,
Label = label,
Source = annotation.Source
};
messages.Add(annCreateMessage);
}
return messages;
});
}
public async Task SendToQueue(Annotation annotation, CancellationToken cancellationToken = default)
{
await _dbFactory.Run(async db =>
await db.InsertAsync(new AnnotationName { Name = annotation.Name }, token: cancellationToken));
}
}
-108
View File
@@ -1,108 +0,0 @@
using System.Diagnostics;
using System.Net.NetworkInformation;
using System.Security.Cryptography;
using System.Text;
using Azaion.Common.DTO;
namespace Azaion.Common.Services;
public interface IHardwareService
{
HardwareInfo GetHardware();
}
public class HardwareService : IHardwareService
{
private const string WIN32_GET_HARDWARE_COMMAND =
"wmic OS get TotalVisibleMemorySize /Value && " +
"wmic CPU get Name /Value && " +
"wmic path Win32_VideoController get Name /Value";
private const string UNIX_GET_HARDWARE_COMMAND =
"/bin/bash -c \"free -g | grep Mem: | awk '{print $2}' && " +
"lscpu | grep 'Model name:' | cut -d':' -f2 && " +
"lspci | grep VGA | cut -d':' -f3\"";
public HardwareInfo GetHardware()
{
try
{
var output = RunCommand(Environment.OSVersion.Platform == PlatformID.Win32NT
? WIN32_GET_HARDWARE_COMMAND
: UNIX_GET_HARDWARE_COMMAND);
var lines = output
.Replace("TotalVisibleMemorySize=", "")
.Replace("Name=", "")
.Replace(" ", " ")
.Trim()
.Split(['\n', '\r'], StringSplitOptions.RemoveEmptyEntries);
var memoryStr = "Unknown RAM";
if (lines.Length > 0)
{
memoryStr = lines[0];
if (int.TryParse(memoryStr, out var memKb))
memoryStr = $"{Math.Round(memKb / 1024.0 / 1024.0)} Gb";
}
var hardwareInfo = new HardwareInfo
{
Memory = memoryStr,
CPU = lines.Length > 1 && string.IsNullOrEmpty(lines[1])
? "Unknown RAM"
: lines[1],
GPU = lines.Length > 2 && string.IsNullOrEmpty(lines[2])
? "Unknown GPU"
: lines[2]
};
hardwareInfo.Hash = ToHash($"Azaion_{MacAddress()}_{hardwareInfo.CPU}_{hardwareInfo.GPU}");
return hardwareInfo;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw;
}
}
private string MacAddress()
{
var macAddress = NetworkInterface
.GetAllNetworkInterfaces()
.Where(nic => nic.OperationalStatus == OperationalStatus.Up)
.Select(nic => nic.GetPhysicalAddress().ToString())
.FirstOrDefault();
return macAddress ?? string.Empty;
}
private string RunCommand(string command)
{
try
{
using var process = new Process();
process.StartInfo.FileName = Environment.OSVersion.Platform == PlatformID.Unix ? "/bin/bash" : "cmd.exe";
process.StartInfo.Arguments = Environment.OSVersion.Platform == PlatformID.Unix
? $"-c \"{command}\""
: $"/c {command}";
process.StartInfo.RedirectStandardOutput = true;
process.StartInfo.UseShellExecute = false;
process.StartInfo.CreateNoWindow = true;
process.Start();
var result = process.StandardOutput.ReadToEnd();
process.WaitForExit();
return result.Trim();
}
catch
{
return string.Empty;
}
}
private static string ToHash(string str) =>
Convert.ToBase64String(SHA384.HashData(Encoding.UTF8.GetBytes(str)));
}
-58
View File
@@ -1,58 +0,0 @@
using System.IO;
using System.Reflection;
using Azaion.Common.DTO;
namespace Azaion.Common.Services;
public interface IResourceLoader
{
Task<MemoryStream> Load(string fileName, CancellationToken cancellationToken = default);
Assembly? LoadAssembly(string asmName);
}
public class ResourceLoader(AzaionApiClient api, ApiCredentials credentials) : IResourceLoader
{
private static readonly List<string> EncryptedResources =
[
"Azaion.Annotator",
"Azaion.Dataset"
];
public Assembly? LoadAssembly(string resourceName)
{
var assemblyName = resourceName.Split(',').First();
if (EncryptedResources.Contains(assemblyName))
{
try
{
var stream = Load($"{assemblyName}.dll").GetAwaiter().GetResult();
return Assembly.Load(stream.ToArray());
}
catch (Exception e)
{
Console.WriteLine(e);
var currentLocation = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)!;
var dllPath = Path.Combine(currentLocation, "dummy", $"{assemblyName}.dll");
return Assembly.LoadFile(dllPath);
}
}
var loadedAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(a => a.GetName().Name == assemblyName);
return loadedAssembly;
}
public async Task<MemoryStream> Load(string fileName, CancellationToken cancellationToken = default)
{
var hardwareService = new HardwareService();
var hardwareInfo = hardwareService.GetHardware();
var encryptedStream = Task.Run(() => api.GetResource(fileName, credentials.Password, hardwareInfo), cancellationToken).Result;
var key = Security.MakeEncryptionKey(credentials.Email, credentials.Password, hardwareInfo.Hash);
var stream = new MemoryStream();
await encryptedStream.DecryptTo(stream, key, cancellationToken);
return stream;
}
}
-83
View File
@@ -1,83 +0,0 @@
using System.IO;
using System.Runtime.InteropServices;
using System.Security;
using System.Security.Cryptography;
using System.Text;
namespace Azaion.Common.Services;
public static class Security
{
private const int BUFFER_SIZE = 524288; // 512 KB buffer size
public static string ToHash(this string str) =>
Convert.ToBase64String(SHA384.HashData(Encoding.UTF8.GetBytes(str)));
public static string MakeEncryptionKey(string email, string password, string? hardwareHash) =>
$"{email}-{password}-{hardwareHash}-#%@AzaionKey@%#---".ToHash();
public static SecureString ToSecureString(this string str)
{
var secureString = new SecureString();
foreach (var c in str.ToCharArray())
secureString.AppendChar(c);
return secureString;
}
public static string? ToRealString(this SecureString value)
{
var valuePtr = IntPtr.Zero;
try
{
valuePtr = Marshal.SecureStringToGlobalAllocUnicode(value);
return Marshal.PtrToStringUni(valuePtr);
}
finally
{
Marshal.ZeroFreeGlobalAllocUnicode(valuePtr);
}
}
public static async Task EncryptTo(this Stream stream, Stream toStream, string key, CancellationToken cancellationToken = default)
{
if (stream is { CanRead: false }) throw new ArgumentNullException(nameof(stream));
if (key is not { Length: > 0 }) throw new ArgumentNullException(nameof(key));
using var aes = Aes.Create();
aes.Key = SHA256.HashData(Encoding.UTF8.GetBytes(key));
aes.GenerateIV();
using var encryptor = aes.CreateEncryptor(aes.Key, aes.IV);
await using var cs = new CryptoStream(toStream, encryptor, CryptoStreamMode.Write, leaveOpen: true);
// Prepend IV to the encrypted data
await toStream.WriteAsync(aes.IV.AsMemory(0, aes.IV.Length), cancellationToken);
var buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer, cancellationToken)) > 0)
await cs.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken);
}
public static async Task DecryptTo(this Stream encryptedStream, Stream toStream, string key, CancellationToken cancellationToken = default)
{
using var aes = Aes.Create();
aes.Key = SHA256.HashData(Encoding.UTF8.GetBytes(key));
// Read the IV from the start of the input stream
var iv = new byte[aes.BlockSize / 8];
_ = await encryptedStream.ReadAsync(iv, cancellationToken);
aes.IV = iv;
using var decryptor = aes.CreateDecryptor(aes.Key, aes.IV);
await using var cryptoStream = new CryptoStream(encryptedStream, decryptor, CryptoStreamMode.Read, leaveOpen: true);
// Read and write in chunks
var buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = await cryptoStream.ReadAsync(buffer, cancellationToken)) > 0)
await toStream.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken);
}
}