From 277aaf09b05786792ea6d7e6ba5b8d344b078d3d Mon Sep 17 00:00:00 2001 From: Alex Bezdieniezhnykh Date: Thu, 17 Apr 2025 09:16:34 +0300 Subject: [PATCH] throttle reimplemented --- Azaion.Annotator/Annotator.xaml.cs | 4 +- .../Extensions/ThrottleExtensions.cs | 72 +++++++++++++++---- Azaion.Common/Services/AnnotationService.cs | 21 +++--- Azaion.CommonSecurity/DTO/User.cs | 8 +-- .../Services/AuthProvider.cs | 45 +++++++----- Azaion.Suite/App.xaml.cs | 4 +- Azaion.Suite/MainSuite.xaml.cs | 6 +- 7 files changed, 109 insertions(+), 51 deletions(-) diff --git a/Azaion.Annotator/Annotator.xaml.cs b/Azaion.Annotator/Annotator.xaml.cs index 3b4b39b..e3b3f19 100644 --- a/Azaion.Annotator/Annotator.xaml.cs +++ b/Azaion.Annotator/Annotator.xaml.cs @@ -261,11 +261,11 @@ public partial class Annotator _appConfig.UIConfig.LeftPanelWidth = MainGrid.ColumnDefinitions.FirstOrDefault()!.Width.Value; _appConfig.UIConfig.RightPanelWidth = MainGrid.ColumnDefinitions.LastOrDefault()!.Width.Value; - await ThrottleExt.Throttle(() => + ThrottleExt.Throttle(() => { _configUpdater.Save(_appConfig); return Task.CompletedTask; - }, TimeSpan.FromSeconds(5)); + }, SaveConfigTaskId, TimeSpan.FromSeconds(5)); } private void ShowTimeAnnotations(TimeSpan time) diff --git a/Azaion.Common/Extensions/ThrottleExtensions.cs b/Azaion.Common/Extensions/ThrottleExtensions.cs index b6c12e7..a210621 100644 --- a/Azaion.Common/Extensions/ThrottleExtensions.cs +++ b/Azaion.Common/Extensions/ThrottleExtensions.cs @@ -1,22 +1,70 @@ -namespace Azaion.Common.Extensions; +using System.Collections.Concurrent; + +namespace Azaion.Common.Extensions; public static class ThrottleExt { - private static readonly Dictionary LastExecution = new(); - private static readonly object Lock = new(); - - public static async Task Throttle(this Func func, TimeSpan interval, CancellationToken ct = default) + private class ThrottleState(Func action) { - ArgumentNullException.ThrowIfNull(func); + public Func Action { get; } = action ?? throw new ArgumentNullException(nameof(action)); + public bool IsCoolingDown = false; + public bool CallScheduledDuringCooldown = false; + public Task CooldownTask = Task.CompletedTask; + public readonly object StateLock = new(); + } - lock (Lock) + private static readonly ConcurrentDictionary ThrottlerStates = new(); + + public static void Throttle(Func action, Guid actionId, TimeSpan interval) + { + ArgumentNullException.ThrowIfNull(action); + if (actionId == Guid.Empty) + throw new ArgumentException("Throttle identifier cannot be empty.", nameof(actionId)); + if (interval <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(interval), "Interval must be positive."); + + var state = ThrottlerStates.GetOrAdd(actionId, new ThrottleState(action)); + + lock (state.StateLock) { - if (LastExecution.ContainsKey(func) && DateTime.UtcNow - LastExecution[func] < interval) - return; - - func(); - LastExecution[func] = DateTime.UtcNow; + if (!state.IsCoolingDown) + { + state.IsCoolingDown = true; + state.CooldownTask = ExecuteAndManageCooldownStaticAsync(actionId, interval, state); + } + else + { + state.CallScheduledDuringCooldown = true; + } } } + private static async Task ExecuteAndManageCooldownStaticAsync(Guid throttleId, TimeSpan interval, ThrottleState state) + { + try + { + await state.Action(); + } + catch (Exception ex) + { + Console.WriteLine($"[Throttled Action Error - ID: {throttleId}] {ex.GetType().Name}: {ex.Message}"); + } + finally + { + await Task.Delay(interval); + + lock (state.StateLock) + { + if (state.CallScheduledDuringCooldown) + { + state.CallScheduledDuringCooldown = false; + state.CooldownTask = ExecuteAndManageCooldownStaticAsync(throttleId, interval, state); + } + else + { + state.IsCoolingDown = false; + } + } + } + } } \ No newline at end of file diff --git a/Azaion.Common/Services/AnnotationService.cs b/Azaion.Common/Services/AnnotationService.cs index 0f45792..293ad4f 100644 --- a/Azaion.Common/Services/AnnotationService.cs +++ b/Azaion.Common/Services/AnnotationService.cs @@ -65,25 +65,22 @@ public class AnnotationService : INotificationHandler Password = _queueConfig.ConsumerPassword }); - var offset = (ulong)(_api.CurrentUser.UserConfig?.QueueConfig?.AnnotationsOffset ?? 0); + var offsets = _api.CurrentUser.UserConfig?.QueueOffsets ?? new UserQueueOffsets(); _consumer = await Consumer.Create(new ConsumerConfig(consumerSystem, Constants.MQ_ANNOTATIONS_QUEUE) { Reference = _api.CurrentUser.Email, - OffsetSpec = new OffsetTypeOffset(offset + 1), + OffsetSpec = new OffsetTypeOffset(offsets.AnnotationsOffset + 1), MessageHandler = async (_, _, context, message) => { var msg = MessagePackSerializer.Deserialize(message.Data.Contents); - await _dbFactory.Run(async db => await db.QueueOffsets - .Where(x => x.QueueName == Constants.MQ_ANNOTATIONS_QUEUE) - .Set(x => x.Offset, context.Offset) - .UpdateAsync(token: cancellationToken)); - await ThrottleExt.Throttle(() => + offsets.AnnotationsOffset = context.Offset; + ThrottleExt.Throttle(() => { - _dbFactory.SaveToDisk(); + _api.UpdateOffsets(offsets); return Task.CompletedTask; - }, TimeSpan.FromSeconds(10), cancellationToken); + }, SaveTaskId, TimeSpan.FromSeconds(10)); if (msg.CreatedEmail == _api.CurrentUser.Email) //Don't process messages by yourself return; @@ -194,11 +191,11 @@ public class AnnotationService : INotificationHandler await _producer.SendToInnerQueue(annotation, token); await _mediator.Publish(new AnnotationCreatedEvent(annotation), token); - await ThrottleExt.Throttle(() => + ThrottleExt.Throttle(async () => { _dbFactory.SaveToDisk(); - return Task.CompletedTask; - }, TimeSpan.FromSeconds(5), token); + await Task.CompletedTask; + }, SaveTaskId, TimeSpan.FromSeconds(5)); return annotation; } diff --git a/Azaion.CommonSecurity/DTO/User.cs b/Azaion.CommonSecurity/DTO/User.cs index 9f6add9..2254574 100644 --- a/Azaion.CommonSecurity/DTO/User.cs +++ b/Azaion.CommonSecurity/DTO/User.cs @@ -10,12 +10,12 @@ public class User public class UserConfig { - public UserQueueOffsets? QueueConfig { get; set; } = new(); + public UserQueueOffsets? QueueOffsets { get; set; } = new(); } public class UserQueueOffsets { - public int AnnotationsOffset { get; set; } - public int AnnotationsConfirmOffset { get; set; } - public int AnnotationsCommandsOffset { get; set; } + public ulong AnnotationsOffset { get; set; } + public ulong AnnotationsConfirmOffset { get; set; } + public ulong AnnotationsCommandsOffset { get; set; } } \ No newline at end of file diff --git a/Azaion.CommonSecurity/Services/AuthProvider.cs b/Azaion.CommonSecurity/Services/AuthProvider.cs index 235a7e4..ca0ade1 100644 --- a/Azaion.CommonSecurity/Services/AuthProvider.cs +++ b/Azaion.CommonSecurity/Services/AuthProvider.cs @@ -10,7 +10,7 @@ public interface IAzaionApi { ApiCredentials Credentials { get; } User CurrentUser { get; } - T? Get(string url); + void UpdateOffsets(UserQueueOffsets offsets); Stream GetResource(string filename); } @@ -28,13 +28,27 @@ public class AzaionApi(HttpClient client, ICache cache, ApiCredentials credentia () => Get("currentUser")); if (user == null) throw new Exception("Can't get current user"); - return user; } - } - private HttpResponseMessage Send(HttpRequestMessage request, CancellationToken ct = default) + public Stream GetResource(string filename) + { + var hardware = cache.GetFromCache(SecurityConstants.HARDWARE_INFO_KEY, hardwareService.GetHardware); + + var response = Send(new HttpRequestMessage(HttpMethod.Post, $"/resources/get/{credentials.Folder}") + { + Content = new StringContent(JsonConvert.SerializeObject(new { filename, credentials.Password, hardware }), Encoding.UTF8, APP_JSON) + }); + return response.Content.ReadAsStream(); + } + + public void UpdateOffsets(UserQueueOffsets offsets) + { + Put($"/users/queue-offsets/{CurrentUser.Email}", offsets); + } + + private HttpResponseMessage Send(HttpRequestMessage request) { if (string.IsNullOrEmpty(_jwtToken)) Authorize(); @@ -61,15 +75,20 @@ public class AzaionApi(HttpClient client, ICache cache, ApiCredentials credentia throw new Exception($"Failed: {response.StatusCode}! Result: {content}"); } - public Stream GetResource(string filename) + private T? Get(string url) { - var hardware = cache.GetFromCache(SecurityConstants.HARDWARE_INFO_KEY, hardwareService.GetHardware); + var response = Send(new HttpRequestMessage(HttpMethod.Get, url)); + var stream = response.Content.ReadAsStream(); + var json = new StreamReader(stream).ReadToEnd(); + return JsonConvert.DeserializeObject(json); + } - var response = Send(new HttpRequestMessage(HttpMethod.Post, $"/resources/get/{credentials.Folder}") + private void Put(string url, T obj) + { + Send(new HttpRequestMessage(HttpMethod.Put, url) { - Content = new StringContent(JsonConvert.SerializeObject(new { filename, credentials.Password, hardware }), Encoding.UTF8, APP_JSON) + Content = new StringContent(JsonConvert.SerializeObject(obj), Encoding.UTF8, APP_JSON) }); - return response.Content.ReadAsStream(); } private void Authorize() @@ -107,11 +126,5 @@ public class AzaionApi(HttpClient client, ICache cache, ApiCredentials credentia } } - public T? Get(string url) - { - var response = Send(new HttpRequestMessage(HttpMethod.Get, url)); - var stream = response.Content.ReadAsStream(); - var json = new StreamReader(stream).ReadToEnd(); - return JsonConvert.DeserializeObject(json); - } + } \ No newline at end of file diff --git a/Azaion.Suite/App.xaml.cs b/Azaion.Suite/App.xaml.cs index 9561987..85ddb8f 100644 --- a/Azaion.Suite/App.xaml.cs +++ b/Azaion.Suite/App.xaml.cs @@ -187,7 +187,7 @@ public partial class App services.ConfigureSection(context.Configuration); services.ConfigureSection(context.Configuration); - services.AddSingleton(); + services.AddSingleton(_inferenceClient); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); @@ -238,7 +238,7 @@ public partial class App { var args = (KeyEventArgs)e; var keyEvent = new KeyEvent(sender, args, _formState.ActiveWindow); - _ = ThrottleExt.Throttle(() => _mediator.Publish(keyEvent), TimeSpan.FromMilliseconds(50)); + ThrottleExt.Throttle(() => _mediator.Publish(keyEvent), KeyPressTaskId, TimeSpan.FromMilliseconds(50)); } protected override async void OnExit(ExitEventArgs e) diff --git a/Azaion.Suite/MainSuite.xaml.cs b/Azaion.Suite/MainSuite.xaml.cs index 14d527b..c377f00 100644 --- a/Azaion.Suite/MainSuite.xaml.cs +++ b/Azaion.Suite/MainSuite.xaml.cs @@ -8,7 +8,6 @@ using Azaion.Common.DTO; using Azaion.Common.DTO.Config; using Azaion.Common.Extensions; using Azaion.Common.Services; -using Azaion.CommonSecurity; using Azaion.CommonSecurity.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -130,13 +129,14 @@ public partial class MainSuite } } + private async Task SaveUserSettings() { - await ThrottleExt.Throttle(() => + ThrottleExt.Throttle(() => { _configUpdater.Save(_appConfig); return Task.CompletedTask; - }, TimeSpan.FromSeconds(2)); + }, SaveConfigTaskId, TimeSpan.FromSeconds(2)); } private void OnFormClosed(object? sender, EventArgs e)