mirror of
https://github.com/azaion/annotations.git
synced 2026-04-22 10:46:30 +00:00
throttle reimplemented
This commit is contained in:
@@ -1,22 +1,70 @@
|
||||
namespace Azaion.Common.Extensions;
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace Azaion.Common.Extensions;
|
||||
|
||||
public static class ThrottleExt
|
||||
{
|
||||
private static readonly Dictionary<Delegate, DateTime> LastExecution = new();
|
||||
private static readonly object Lock = new();
|
||||
|
||||
public static async Task Throttle(this Func<Task> func, TimeSpan interval, CancellationToken ct = default)
|
||||
private class ThrottleState(Func<Task> action)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(func);
|
||||
public Func<Task> Action { get; } = action ?? throw new ArgumentNullException(nameof(action));
|
||||
public bool IsCoolingDown = false;
|
||||
public bool CallScheduledDuringCooldown = false;
|
||||
public Task CooldownTask = Task.CompletedTask;
|
||||
public readonly object StateLock = new();
|
||||
}
|
||||
|
||||
lock (Lock)
|
||||
private static readonly ConcurrentDictionary<Guid, ThrottleState> ThrottlerStates = new();
|
||||
|
||||
public static void Throttle(Func<Task> action, Guid actionId, TimeSpan interval)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(action);
|
||||
if (actionId == Guid.Empty)
|
||||
throw new ArgumentException("Throttle identifier cannot be empty.", nameof(actionId));
|
||||
if (interval <= TimeSpan.Zero)
|
||||
throw new ArgumentOutOfRangeException(nameof(interval), "Interval must be positive.");
|
||||
|
||||
var state = ThrottlerStates.GetOrAdd(actionId, new ThrottleState(action));
|
||||
|
||||
lock (state.StateLock)
|
||||
{
|
||||
if (LastExecution.ContainsKey(func) && DateTime.UtcNow - LastExecution[func] < interval)
|
||||
return;
|
||||
|
||||
func();
|
||||
LastExecution[func] = DateTime.UtcNow;
|
||||
if (!state.IsCoolingDown)
|
||||
{
|
||||
state.IsCoolingDown = true;
|
||||
state.CooldownTask = ExecuteAndManageCooldownStaticAsync(actionId, interval, state);
|
||||
}
|
||||
else
|
||||
{
|
||||
state.CallScheduledDuringCooldown = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task ExecuteAndManageCooldownStaticAsync(Guid throttleId, TimeSpan interval, ThrottleState state)
|
||||
{
|
||||
try
|
||||
{
|
||||
await state.Action();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"[Throttled Action Error - ID: {throttleId}] {ex.GetType().Name}: {ex.Message}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
await Task.Delay(interval);
|
||||
|
||||
lock (state.StateLock)
|
||||
{
|
||||
if (state.CallScheduledDuringCooldown)
|
||||
{
|
||||
state.CallScheduledDuringCooldown = false;
|
||||
state.CooldownTask = ExecuteAndManageCooldownStaticAsync(throttleId, interval, state);
|
||||
}
|
||||
else
|
||||
{
|
||||
state.IsCoolingDown = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -65,25 +65,22 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
|
||||
Password = _queueConfig.ConsumerPassword
|
||||
});
|
||||
|
||||
var offset = (ulong)(_api.CurrentUser.UserConfig?.QueueConfig?.AnnotationsOffset ?? 0);
|
||||
var offsets = _api.CurrentUser.UserConfig?.QueueOffsets ?? new UserQueueOffsets();
|
||||
|
||||
_consumer = await Consumer.Create(new ConsumerConfig(consumerSystem, Constants.MQ_ANNOTATIONS_QUEUE)
|
||||
{
|
||||
Reference = _api.CurrentUser.Email,
|
||||
OffsetSpec = new OffsetTypeOffset(offset + 1),
|
||||
OffsetSpec = new OffsetTypeOffset(offsets.AnnotationsOffset + 1),
|
||||
MessageHandler = async (_, _, context, message) =>
|
||||
{
|
||||
var msg = MessagePackSerializer.Deserialize<AnnotationCreatedMessage>(message.Data.Contents);
|
||||
await _dbFactory.Run(async db => await db.QueueOffsets
|
||||
.Where(x => x.QueueName == Constants.MQ_ANNOTATIONS_QUEUE)
|
||||
.Set(x => x.Offset, context.Offset)
|
||||
.UpdateAsync(token: cancellationToken));
|
||||
|
||||
await ThrottleExt.Throttle(() =>
|
||||
offsets.AnnotationsOffset = context.Offset;
|
||||
ThrottleExt.Throttle(() =>
|
||||
{
|
||||
_dbFactory.SaveToDisk();
|
||||
_api.UpdateOffsets(offsets);
|
||||
return Task.CompletedTask;
|
||||
}, TimeSpan.FromSeconds(10), cancellationToken);
|
||||
}, SaveTaskId, TimeSpan.FromSeconds(10));
|
||||
|
||||
if (msg.CreatedEmail == _api.CurrentUser.Email) //Don't process messages by yourself
|
||||
return;
|
||||
@@ -194,11 +191,11 @@ public class AnnotationService : INotificationHandler<AnnotationsDeletedEvent>
|
||||
await _producer.SendToInnerQueue(annotation, token);
|
||||
|
||||
await _mediator.Publish(new AnnotationCreatedEvent(annotation), token);
|
||||
await ThrottleExt.Throttle(() =>
|
||||
ThrottleExt.Throttle(async () =>
|
||||
{
|
||||
_dbFactory.SaveToDisk();
|
||||
return Task.CompletedTask;
|
||||
}, TimeSpan.FromSeconds(5), token);
|
||||
await Task.CompletedTask;
|
||||
}, SaveTaskId, TimeSpan.FromSeconds(5));
|
||||
return annotation;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user