Files
satellite-provider/SatelliteProvider.Services/RegionProcessingService.cs
T
Anton Martynenko 9048a7b3ec cleanup
2025-11-19 18:27:35 +01:00

76 lines
2.3 KiB
C#

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SatelliteProvider.Common.Configs;
using SatelliteProvider.Common.Interfaces;
namespace SatelliteProvider.Services;
public class RegionProcessingService : BackgroundService
{
private readonly IRegionRequestQueue _queue;
private readonly IRegionService _regionService;
private readonly ProcessingConfig _processingConfig;
private readonly ILogger<RegionProcessingService> _logger;
public RegionProcessingService(
IRegionRequestQueue queue,
IRegionService regionService,
IOptions<ProcessingConfig> processingConfig,
ILogger<RegionProcessingService> logger)
{
_queue = queue;
_regionService = regionService;
_processingConfig = processingConfig.Value;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Region Processing Service started with {MaxConcurrent} parallel workers",
_processingConfig.MaxConcurrentRegions);
var workers = new List<Task>();
for (int i = 0; i < _processingConfig.MaxConcurrentRegions; i++)
{
var workerId = i + 1;
workers.Add(ProcessRegionWorkerAsync(workerId, stoppingToken));
}
await Task.WhenAll(workers);
_logger.LogInformation("Region Processing Service stopped");
}
private async Task ProcessRegionWorkerAsync(int workerId, CancellationToken stoppingToken)
{
if (workerId > 1)
{
var startupDelay = Random.Shared.Next(100, 500);
await Task.Delay(startupDelay, stoppingToken);
}
while (!stoppingToken.IsCancellationRequested)
{
try
{
var request = await _queue.DequeueAsync(stoppingToken);
if (request != null)
{
await _regionService.ProcessRegionAsync(request.Id, stoppingToken);
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Worker {WorkerId}: Error processing region request", workerId);
}
}
}
}