Files
annotations/Azaion.Common/Database/DbFactory.cs
T
Alex Bezdieniezhnykh d842466594 gps matcher async
put cryptography lib to fixed version
fix race condition bug in queue handler
add lock to db writing and backup to file db on each write
2025-05-29 00:35:35 +03:00

182 lines
6.5 KiB
C#

using System.Data.SQLite;
using System.IO;
using Azaion.Common.DTO;
using Azaion.Common.DTO.Config;
using Azaion.Common.Extensions;
using LinqToDB;
using LinqToDB.DataProvider.SQLite;
using LinqToDB.Mapping;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace Azaion.Common.Database;
/// <summary>
/// Entry point for running callbacks against an <see cref="AnnotationsDb"/> context.
/// </summary>
public interface IDbFactory
{
/// <summary>
/// Runs <paramref name="func"/> against a database context and returns its result.
/// </summary>
/// <remarks>
/// The <c>DbFactory</c> implementation in this file takes no write lock and schedules no file
/// backup for this call, so it is presumably meant for read-only queries.
/// </remarks>
/// <typeparam name="T">Type of the value produced by <paramref name="func"/>.</typeparam>
/// <param name="func">Callback that receives the database context.</param>
/// <returns>The value returned by <paramref name="func"/>.</returns>
Task<T> Run<T>(Func<AnnotationsDb, Task<T>> func);
/// <summary>
/// Runs a modifying callback against a database context.
/// </summary>
/// <remarks>
/// The <c>DbFactory</c> implementation in this file lets only one write callback run at a time,
/// schedules a throttled backup to the file database after the callback completes, and logs and
/// rethrows any exception.
/// </remarks>
/// <param name="func">Callback that receives the database context.</param>
Task RunWrite(Func<AnnotationsDb, Task> func);
/// <summary>
/// Same as <see cref="RunWrite(Func{AnnotationsDb, Task})"/>, but returns the value produced by
/// <paramref name="func"/>.
/// </summary>
/// <typeparam name="T">Type of the value produced by <paramref name="func"/>.</typeparam>
/// <param name="func">Callback that receives the database context.</param>
/// <returns>The value returned by <paramref name="func"/>.</returns>
Task<T> RunWrite<T>(Func<AnnotationsDb, Task<T>> func);
/// <summary>
/// Deletes the annotations with the given names together with the detections that reference them.
/// </summary>
/// <param name="annotationNames">Names of the annotations to delete.</param>
/// <param name="cancellationToken">Token forwarded to the delete queries.</param>
Task DeleteAnnotations(List<string> annotationNames, CancellationToken cancellationToken = default);
}
/// <summary>
/// <see cref="IDbFactory"/> implementation that serves every callback from an in-memory SQLite
/// database and mirrors that database to the file configured in <see cref="AnnotationConfig"/>.
/// </summary>
/// <remarks>
/// The constructor copies the file database into memory. Write callbacks modify the in-memory
/// copy and are followed by a throttled backup of the in-memory database into the file.
/// NOTE(review): the two SQLite connections are never disposed by this class; confirm the
/// instance is meant to live for the whole process.
/// </remarks>
public class DbFactory : IDbFactory
{
    private const string MemoryConnStr = "Data Source=:memory:";

    // Database name passed as both source and destination of every backup.
    private const string MainDbName = "main";

    // Throttle window handed to ThrottleExt for the memory -> file backup.
    private static readonly TimeSpan BackupThrottleInterval = TimeSpan.FromSeconds(5);

    // Static, so write callbacks are serialized across all DbFactory instances.
    private static readonly SemaphoreSlim WriteSemaphore = new(1, 1);

    // Identifier passed to ThrottleExt for the backup action.
    private static readonly Guid SaveTaskId = Guid.NewGuid();

    private readonly ILogger<DbFactory> _logger;
    private readonly AnnotationConfig _annConfig;

    private readonly SQLiteConnection _memoryConnection;
    private readonly DataOptions _memoryDataOptions;

    private readonly SQLiteConnection _fileConnection;
    private readonly DataOptions _fileDataOptions;

    private string FileConnStr => $"Data Source={_annConfig.AnnotationsDbFile}";

    /// <summary>
    /// Opens the in-memory and file connections, creates the database file and any missing
    /// tables, and loads the file database into memory.
    /// </summary>
    /// <param name="annConfig">Options providing the path of the annotations database file.</param>
    /// <param name="logger">Logger used for write failures and delete statistics.</param>
    public DbFactory(IOptions<AnnotationConfig> annConfig, ILogger<DbFactory> logger)
    {
        _logger = logger;
        _annConfig = annConfig.Value;

        // Opened once and kept open: an in-memory SQLite database exists only while its
        // connection is open.
        _memoryConnection = new SQLiteConnection(MemoryConnStr);
        _memoryConnection.Open();
        _memoryDataOptions = new DataOptions()
            .UseDataProvider(SQLiteTools.GetDataProvider())
            .UseConnection(_memoryConnection)
            .UseMappingSchema(AnnotationsDbSchemaHolder.MappingSchema);

        _fileConnection = new SQLiteConnection(FileConnStr);
        _fileDataOptions = new DataOptions()
            .UseDataProvider(SQLiteTools.GetDataProvider())
            .UseConnection(_fileConnection)
            .UseMappingSchema(AnnotationsDbSchemaHolder.MappingSchema);

        if (!File.Exists(_annConfig.AnnotationsDbFile))
        {
            SQLiteConnection.CreateFile(_annConfig.AnnotationsDbFile);
        }

        // NOTE(review): RecreateTables runs while _fileConnection is still closed; this presumably
        // relies on linq2db opening the supplied connection on demand - confirm before reordering.
        RecreateTables();
        _fileConnection.Open();

        // Load the persisted data into the in-memory database.
        _fileConnection.BackupDatabase(_memoryConnection, MainDbName, MainDbName, -1, null, -1);
    }

    /// <summary>
    /// Creates the annotations, detections and annotations-queue tables in the file database
    /// when they are missing. Existing tables are left untouched.
    /// </summary>
    private void RecreateTables()
    {
        using var db = new AnnotationsDb(_fileDataOptions);
        var schema = db.DataProvider.GetSchemaProvider().GetSchema(db);
        var existingTables = schema.Tables.Select(x => x.TableName).ToHashSet();

        if (!existingTables.Contains(Constants.ANNOTATIONS_TABLENAME))
        {
            db.CreateTable<Annotation>();
        }

        if (!existingTables.Contains(Constants.DETECTIONS_TABLENAME))
        {
            db.CreateTable<Detection>();
        }

        if (!existingTables.Contains(Constants.ANNOTATIONS_QUEUE_TABLENAME))
        {
            db.CreateTable<AnnotationQueueRecord>();
        }
    }

    /// <summary>
    /// Runs <paramref name="func"/> against the in-memory database without taking the write lock
    /// and without scheduling a file backup.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by <paramref name="func"/>.</typeparam>
    /// <param name="func">Callback that receives the database context.</param>
    /// <returns>The value returned by <paramref name="func"/>.</returns>
    public async Task<T> Run<T>(Func<AnnotationsDb, Task<T>> func)
    {
        await using var db = new AnnotationsDb(_memoryDataOptions);
        return await func(db);
    }

    /// <summary>
    /// Runs a modifying callback under the write lock and schedules a backup to the file database.
    /// Exceptions are logged and rethrown.
    /// </summary>
    /// <param name="func">Callback that receives the database context.</param>
    public async Task RunWrite(Func<AnnotationsDb, Task> func)
    {
        // Delegate to the generic overload so locking, backup and logging live in one place.
        await RunWrite<bool>(async db =>
        {
            await func(db);
            return true;
        });
    }

    /// <summary>
    /// Runs a modifying callback under the write lock, schedules a backup to the file database
    /// and returns the callback's result. Exceptions are logged and rethrown.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by <paramref name="func"/>.</typeparam>
    /// <param name="func">Callback that receives the database context.</param>
    /// <returns>The value returned by <paramref name="func"/>.</returns>
    public async Task<T> RunWrite<T>(Func<AnnotationsDb, Task<T>> func)
    {
        await WriteSemaphore.WaitAsync();
        try
        {
            await using var db = new AnnotationsDb(_memoryDataOptions);
            var result = await func(db);
            ScheduleFileBackup();
            return result;
        }
        catch (Exception e)
        {
            // Constant template (CA2254); the rendered text is still the exception message.
            _logger.LogError(e, "{Message}", e.Message);
            throw;
        }
        finally
        {
            WriteSemaphore.Release();
        }
    }

    /// <summary>
    /// Deletes the detections that reference the given annotations, then the annotations
    /// themselves, and logs how many rows of each were removed.
    /// </summary>
    /// <param name="annotationNames">Names of the annotations to delete.</param>
    /// <param name="cancellationToken">Token forwarded to both delete queries.</param>
    public async Task DeleteAnnotations(List<string> annotationNames, CancellationToken cancellationToken = default)
    {
        await RunWrite(async db =>
        {
            var detDeleted = await db.Detections.DeleteAsync(x => annotationNames.Contains(x.AnnotationName), token: cancellationToken);
            var annDeleted = await db.Annotations.DeleteAsync(x => annotationNames.Contains(x.Name), token: cancellationToken);
            _logger.LogInformation("Deleted {DetectionsDeleted} detections, {AnnotationsDeleted} annotations", detDeleted, annDeleted);
        });
    }

    /// <summary>
    /// Hands the memory -> file backup to <c>ThrottleExt</c> under <see cref="SaveTaskId"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): depending on how ThrottleExt schedules the action, the backup may run after
    /// <see cref="WriteSemaphore"/> has been released - confirm this is safe alongside other writers.
    /// </remarks>
    private void ScheduleFileBackup()
    {
        ThrottleExt.Throttle(async () =>
        {
            _memoryConnection.BackupDatabase(_fileConnection, MainDbName, MainDbName, -1, null, -1);
            await Task.CompletedTask;
        }, SaveTaskId, BackupThrottleInterval, true);
    }
}
/// <summary>
/// Builds, once per process, the linq2db <see cref="MappingSchema"/> for the
/// <see cref="Annotation"/>, <see cref="Detection"/> and <see cref="AnnotationQueueRecord"/> entities.
/// </summary>
public static class AnnotationsDbSchemaHolder
{
    /// <summary>Mapping schema populated by the static constructor.</summary>
    public static readonly MappingSchema MappingSchema;

    static AnnotationsDbSchemaHolder()
    {
        MappingSchema = new MappingSchema();
        var builder = new FluentMappingBuilder(MappingSchema);

        var annotationBuilder = builder.Entity<Annotation>();

        // Annotations are keyed by Name and linked to their detections through AnnotationName.
        // Time is stored as an Int64 tick count and converted back to a TimeSpan on read.
        annotationBuilder.HasTableName(Constants.ANNOTATIONS_TABLENAME)
            .HasPrimaryKey(x => x.Name)
            .Association(a => a.Detections, (a, d) => a.Name == d.AnnotationName)
            .Property(x => x.Time).HasDataType(DataType.Int64).HasConversion(ts => ts.Ticks, t => new TimeSpan(t));

        // Members excluded from the table mapping (each listed once).
        annotationBuilder
            .Ignore(x => x.Milliseconds)
            .Ignore(x => x.Classes)
            .Ignore(x => x.ImagePath)
            .Ignore(x => x.LabelPath)
            .Ignore(x => x.ThumbPath);

        builder.Entity<Detection>()
            .HasTableName(Constants.DETECTIONS_TABLENAME);

        // AnnotationNames is persisted as a JSON string; a null deserialization result
        // is replaced with an empty list.
        builder.Entity<AnnotationQueueRecord>()
            .HasTableName(Constants.ANNOTATIONS_QUEUE_TABLENAME)
            .HasPrimaryKey(x => x.Id)
            .Property(x => x.AnnotationNames)
            .HasDataType(DataType.NVarChar)
            .HasConversion(list => JsonConvert.SerializeObject(list), str => JsonConvert.DeserializeObject<List<string>>(str) ?? new List<string>());

        builder.Build();
    }
}