namespace Azaion.Annotator.Extensions;

/// <summary>
/// Settings that control how <see cref="ParallelExt"/> splits and reports its work.
/// </summary>
public class ParallelOptions
{
    /// <summary>
    /// Number of processed items between two progress notifications.
    /// A value less than or equal to zero disables progress notifications.
    /// </summary>
    public int ProgressUpdateInterval { get; set; } = 100;

    /// <summary>
    /// Callback that receives the number of items processed so far.
    /// When it is left unset (null) no progress is reported.
    /// </summary>
    public Func<int, Task> ProgressFn { get; set; } = null!;

    /// <summary>
    /// Percentage of <see cref="Environment.ProcessorCount"/> used to derive the number of workers.
    /// The resulting worker count is never lower than one.
    /// </summary>
    public double CpuUtilPercent { get; set; } = 100;
}

/// <summary>
/// Helpers for processing a collection with a bounded number of concurrent workers.
/// </summary>
public class ParallelExt
{
    // NOTE(review): the generic type arguments were not visible in the reviewed text.
    // Func<T, CancellationToken, ValueTask> mirrors System.Threading.Tasks.Parallel.ForEachAsync;
    // confirm against existing callers (a Task-returning delegate would need the signature adjusted).

    /// <summary>
    /// Splits <paramref name="source"/> into contiguous chunks (at most one per worker) and processes
    /// the chunks concurrently; items inside one chunk are processed sequentially, in order.
    /// </summary>
    /// <typeparam name="T">Type of the items being processed.</typeparam>
    /// <param name="source">Items to process. An empty collection completes immediately.</param>
    /// <param name="processFn">Asynchronous action invoked once per item.</param>
    /// <param name="parallelOptions">
    /// Worker and progress settings. When null, 100% of the processors are used and progress is
    /// written to the console every 100 items.
    /// </param>
    /// <param name="cancellationToken">
    /// Token passed to <paramref name="processFn"/> and checked before every item.
    /// </param>
    /// <returns>A task that completes when every worker has finished its chunk.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="processFn"/> is null.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while items were still pending.
    /// </exception>
    /// <remarks>
    /// Progress callbacks are started in the background and are not awaited by this method:
    /// they are serialized with each other, may still be running when the returned task completes,
    /// and their failures do not fail the returned task.
    /// </remarks>
    public static async Task ForEachAsync<T>(
        ICollection<T> source,
        Func<T, CancellationToken, ValueTask> processFn,
        ParallelOptions? parallelOptions = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(processFn);

        parallelOptions ??= new ParallelOptions
        {
            CpuUtilPercent = 100,
            ProgressUpdateInterval = 100,
            ProgressFn = i =>
            {
                Console.WriteLine($"Processed {i} item by Task {Task.CurrentId}%");
                return Task.CompletedTask;
            }
        };

        // A small CpuUtilPercent rounds to zero workers; clamp so the chunk-size division below
        // cannot divide by zero and at least one worker always runs.
        var threadsCount = Math.Max(
            1,
            (int)Math.Round(Environment.ProcessorCount * parallelOptions.CpuUtilPercent / 100.0));

        // Ceiling division guarantees that no more than threadsCount chunks are produced.
        var chunkSize = (int)Math.Max(1L, ((long)source.Count + threadsCount - 1) / threadsCount);
        var chunks = source.Chunk(chunkSize).ToList();

        // Read the progress settings once so every worker observes the same configuration.
        var progressFn = parallelOptions.ProgressFn;
        var progressInterval = parallelOptions.ProgressUpdateInterval;
        var reportProgress = progressFn is not null && progressInterval > 0;

        // Serializes progress callbacks so that two notifications never run at the same time.
        var progressUpdateLock = new SemaphoreSlim(1);
        var processedCount = 0;

        var tasks = new List<Task>(chunks.Count);
        foreach (var chunk in chunks)
        {
            // Task.Run returns the unwrapped task immediately, so every chunk is started before any
            // of them is awaited. Awaiting each worker inside this loop would run the chunks one
            // after another whenever processFn completes synchronously.
            tasks.Add(Task.Run(async () =>
            {
                foreach (var item in chunk)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    await processFn(item, cancellationToken);

                    // Use the value returned by the atomic increment: re-reading the shared counter
                    // afterwards races with other workers and can skip or repeat an interval boundary.
                    var processed = Interlocked.Increment(ref processedCount);
                    if (!reportProgress || processed % progressInterval != 0)
                    {
                        continue;
                    }

                    // Fire-and-forget so that a slow progress callback never stalls the worker.
                    _ = Task.Run(async () =>
                    {
                        await progressUpdateLock.WaitAsync(cancellationToken);
                        try
                        {
                            // Report the most recent count at the time the callback actually runs.
                            await progressFn!(Volatile.Read(ref processedCount));
                        }
                        finally
                        {
                            progressUpdateLock.Release();
                        }
                    }, cancellationToken);
                }
            }));
        }

        await Task.WhenAll(tasks);
    }
}