parallel processing for routes and regions

This commit is contained in:
Anton Martynenko
2025-11-19 13:01:30 +01:00
parent 7f33567632
commit b66d3a0277
8 changed files with 331 additions and 133 deletions
+5 -1
View File
@@ -32,7 +32,11 @@ builder.Services.AddSingleton<GoogleMapsDownloaderV2>();
builder.Services.AddSingleton<ITileService, TileService>(); builder.Services.AddSingleton<ITileService, TileService>();
var processingConfig = builder.Configuration.GetSection("ProcessingConfig").Get<ProcessingConfig>() ?? new ProcessingConfig(); var processingConfig = builder.Configuration.GetSection("ProcessingConfig").Get<ProcessingConfig>() ?? new ProcessingConfig();
builder.Services.AddSingleton<IRegionRequestQueue>(sp => new RegionRequestQueue(processingConfig.QueueCapacity)); builder.Services.AddSingleton<IRegionRequestQueue>(sp =>
{
var logger = sp.GetRequiredService<ILogger<RegionRequestQueue>>();
return new RegionRequestQueue(processingConfig.QueueCapacity, logger);
});
builder.Services.AddSingleton<IRegionService, RegionService>(); builder.Services.AddSingleton<IRegionService, RegionService>();
builder.Services.AddHostedService<RegionProcessingService>(); builder.Services.AddHostedService<RegionProcessingService>();
builder.Services.AddSingleton<IRouteService, RouteService>(); builder.Services.AddSingleton<IRouteService, RouteService>();
+7 -2
View File
@@ -4,7 +4,9 @@
"MinimumLevel": { "MinimumLevel": {
"Default": "Information", "Default": "Information",
"Override": { "Override": {
"Microsoft.AspNetCore": "Warning" "Microsoft.AspNetCore": "Warning",
"SatelliteProvider.Services.RegionRequestQueue": "Debug",
"SatelliteProvider.Services.GoogleMapsDownloaderV2": "Debug"
} }
}, },
"WriteTo": [ "WriteTo": [
@@ -33,7 +35,10 @@
}, },
"ProcessingConfig": { "ProcessingConfig": {
"MaxConcurrentDownloads": 4, "MaxConcurrentDownloads": 4,
"MaxConcurrentRegions": 3,
"DefaultZoomLevel": 20, "DefaultZoomLevel": 20,
"QueueCapacity": 100 "QueueCapacity": 1000,
"DelayBetweenRequestsMs": 50,
"SessionTokenReuseCount": 100
} }
} }
@@ -3,7 +3,10 @@ namespace SatelliteProvider.Common.Configs;
public class ProcessingConfig public class ProcessingConfig
{ {
public int MaxConcurrentDownloads { get; set; } = 4; public int MaxConcurrentDownloads { get; set; } = 4;
public int MaxConcurrentRegions { get; set; } = 3;
public int DefaultZoomLevel { get; set; } = 20; public int DefaultZoomLevel { get; set; } = 20;
public int QueueCapacity { get; set; } = 100; public int QueueCapacity { get; set; } = 100;
public int DelayBetweenRequestsMs { get; set; } = 50;
public int SessionTokenReuseCount { get; set; } = 100;
} }
@@ -28,18 +28,24 @@ public class GoogleMapsDownloaderV2
private readonly ILogger<GoogleMapsDownloaderV2> _logger; private readonly ILogger<GoogleMapsDownloaderV2> _logger;
private readonly string _apiKey; private readonly string _apiKey;
private readonly StorageConfig _storageConfig; private readonly StorageConfig _storageConfig;
private readonly ProcessingConfig _processingConfig;
private readonly IHttpClientFactory _httpClientFactory; private readonly IHttpClientFactory _httpClientFactory;
private readonly SemaphoreSlim _downloadSemaphore;
private static readonly System.Collections.Concurrent.ConcurrentDictionary<string, Task<DownloadedTileInfoV2>> _activeDownloads = new();
public GoogleMapsDownloaderV2( public GoogleMapsDownloaderV2(
ILogger<GoogleMapsDownloaderV2> logger, ILogger<GoogleMapsDownloaderV2> logger,
IOptions<MapConfig> mapConfig, IOptions<MapConfig> mapConfig,
IOptions<StorageConfig> storageConfig, IOptions<StorageConfig> storageConfig,
IOptions<ProcessingConfig> processingConfig,
IHttpClientFactory httpClientFactory) IHttpClientFactory httpClientFactory)
{ {
_logger = logger; _logger = logger;
_apiKey = mapConfig.Value.ApiKey; _apiKey = mapConfig.Value.ApiKey;
_storageConfig = storageConfig.Value; _storageConfig = storageConfig.Value;
_processingConfig = processingConfig.Value;
_httpClientFactory = httpClientFactory; _httpClientFactory = httpClientFactory;
_downloadSemaphore = new SemaphoreSlim(_processingConfig.MaxConcurrentDownloads, _processingConfig.MaxConcurrentDownloads);
} }
private record SessionResponse(string Session); private record SessionResponse(string Session);
@@ -247,15 +253,13 @@ public class GoogleMapsDownloaderV2
centerGeoPoint.Lat, centerGeoPoint.Lon, radiusM, zoomLevel); centerGeoPoint.Lat, centerGeoPoint.Lon, radiusM, zoomLevel);
_logger.LogInformation("Tile range: X=[{XMin}, {XMax}], Y=[{YMin}, {YMax}]", xMin, xMax, yMin, yMax); _logger.LogInformation("Tile range: X=[{XMin}, {XMax}], Y=[{YMin}, {YMax}]", xMin, xMax, yMin, yMax);
var downloadedTiles = new List<DownloadedTileInfoV2>(); var tilesToDownload = new List<(int x, int y, GeoPoint center, double tileSizeMeters)>();
int skippedCount = 0; int skippedCount = 0;
for (var y = yMin; y <= yMax; y++) for (var y = yMin; y <= yMax; y++)
{ {
for (var x = xMin; x <= xMax; x++) for (var x = xMin; x <= xMax; x++)
{ {
token.ThrowIfCancellationRequested();
var tileCenter = GeoUtils.TileToWorldPos(x, y, zoomLevel); var tileCenter = GeoUtils.TileToWorldPos(x, y, zoomLevel);
var existingTile = existingTiles.FirstOrDefault(t => var existingTile = existingTiles.FirstOrDefault(t =>
@@ -266,15 +270,120 @@ public class GoogleMapsDownloaderV2
if (existingTile != null) if (existingTile != null)
{ {
skippedCount++; skippedCount++;
_logger.LogInformation("Skipping tile ({X}, {Y}) - already exists at {FilePath}", x, y, existingTile.FilePath);
continue; continue;
} }
var tileSizeMeters = CalculateTileSizeInMeters(zoomLevel, tileCenter.Lat); var tileSizeMeters = CalculateTileSizeInMeters(zoomLevel, tileCenter.Lat);
tilesToDownload.Add((x, y, tileCenter, tileSizeMeters));
}
}
_logger.LogInformation("Need to download {Count} tiles (skipped {Skipped} existing), using {MaxConcurrent} parallel downloads",
tilesToDownload.Count, skippedCount, _processingConfig.MaxConcurrentDownloads);
if (tilesToDownload.Count == 0)
{
_logger.LogInformation("All tiles already exist, returning empty list");
return new List<DownloadedTileInfoV2>();
}
_logger.LogInformation("Getting initial session token before starting {Count} downloads", tilesToDownload.Count);
var sessionToken = await GetSessionToken();
_logger.LogInformation("Session token obtained, starting parallel downloads");
var downloadTasks = new List<Task<DownloadedTileInfoV2?>>();
int sessionTokenUsageCount = 0;
for (int i = 0; i < tilesToDownload.Count; i++)
{
var tileInfo = tilesToDownload[i];
if (sessionTokenUsageCount >= _processingConfig.SessionTokenReuseCount)
{
_logger.LogInformation("Session token usage limit reached ({Count}), requesting new token", sessionTokenUsageCount);
sessionToken = await GetSessionToken();
sessionTokenUsageCount = 0;
_logger.LogInformation("New session token obtained, continuing downloads");
}
var currentToken = sessionToken;
var tileIndex = i;
sessionTokenUsageCount++;
var downloadTask = DownloadTileAsync(
tileInfo.x,
tileInfo.y,
tileInfo.center,
tileInfo.tileSizeMeters,
zoomLevel,
currentToken,
tileIndex,
tilesToDownload.Count,
token);
downloadTasks.Add(downloadTask);
}
_logger.LogInformation("All {Count} download tasks created, waiting for completion", downloadTasks.Count);
var results = await Task.WhenAll(downloadTasks);
_logger.LogInformation("Task.WhenAll completed, processing results");
var downloadedTiles = results.Where(r => r != null).Cast<DownloadedTileInfoV2>().ToList();
_logger.LogInformation("Parallel download completed: {Downloaded} tiles downloaded, {Skipped} skipped, {Failed} failed",
downloadedTiles.Count, skippedCount, tilesToDownload.Count - downloadedTiles.Count);
return downloadedTiles;
}
private async Task<DownloadedTileInfoV2?> DownloadTileAsync(
int x,
int y,
GeoPoint tileCenter,
double tileSizeMeters,
int zoomLevel,
string? sessionToken,
int tileIndex,
int totalTiles,
CancellationToken token)
{
var tileKey = $"{zoomLevel}_{x}_{y}";
var downloadTask = _activeDownloads.GetOrAdd(tileKey, _ => PerformDownloadAsync(
x, y, tileCenter, tileSizeMeters, zoomLevel, sessionToken, tileIndex, totalTiles, token));
try try
{ {
var sessionToken = await GetSessionToken(); return await downloadTask;
}
finally
{
_activeDownloads.TryRemove(tileKey, out _);
}
}
private async Task<DownloadedTileInfoV2> PerformDownloadAsync(
int x,
int y,
GeoPoint tileCenter,
double tileSizeMeters,
int zoomLevel,
string? sessionToken,
int tileIndex,
int totalTiles,
CancellationToken token)
{
_logger.LogDebug("Tile ({X},{Y}) [{Index}/{Total}]: Waiting for semaphore slot", x, y, tileIndex + 1, totalTiles);
await _downloadSemaphore.WaitAsync(token);
_logger.LogDebug("Tile ({X},{Y}) [{Index}/{Total}]: Acquired semaphore slot, starting download", x, y, tileIndex + 1, totalTiles);
try
{
if (_processingConfig.DelayBetweenRequestsMs > 0)
{
await Task.Delay(_processingConfig.DelayBetweenRequestsMs, token);
}
var server = (x + y) % 4; var server = (x + y) % 4;
var url = string.Format(TILE_URL_TEMPLATE, server, x, y, zoomLevel, sessionToken); var url = string.Format(TILE_URL_TEMPLATE, server, x, y, zoomLevel, sessionToken);
@@ -294,8 +403,8 @@ public class GoogleMapsDownloaderV2
if (!response.IsSuccessStatusCode) if (!response.IsSuccessStatusCode)
{ {
var errorBody = await response.Content.ReadAsStringAsync(token); var errorBody = await response.Content.ReadAsStringAsync(token);
_logger.LogError("Tile download failed. Tile: ({X}, {Y}), Status: {StatusCode}, URL: {Url}, Response: {Response}", _logger.LogError("Tile download failed. Tile: ({X}, {Y}), Status: {StatusCode}, Response: {Response}",
x, y, response.StatusCode, url, errorBody); x, y, response.StatusCode, errorBody);
} }
response.EnsureSuccessStatusCode(); response.EnsureSuccessStatusCode();
@@ -305,15 +414,18 @@ public class GoogleMapsDownloaderV2
await File.WriteAllBytesAsync(filePath, imageBytes, token); await File.WriteAllBytesAsync(filePath, imageBytes, token);
_logger.LogInformation("Downloaded tile ({X}, {Y}) to {FilePath}, center=({Lat:F6}, {Lon:F6}), size={Size:F2}m", if ((tileIndex + 1) % 10 == 0 || tileIndex == 0 || tileIndex == totalTiles - 1)
x, y, filePath, tileCenter.Lat, tileCenter.Lon, tileSizeMeters); {
_logger.LogInformation("Progress: {Current}/{Total} tiles downloaded - tile ({X}, {Y})",
tileIndex + 1, totalTiles, x, y);
}
downloadedTiles.Add(new DownloadedTileInfoV2( return new DownloadedTileInfoV2(
x, y, zoomLevel, tileCenter.Lat, tileCenter.Lon, filePath, tileSizeMeters)); x, y, zoomLevel, tileCenter.Lat, tileCenter.Lon, filePath, tileSizeMeters);
} }
catch (TaskCanceledException ex) catch (TaskCanceledException ex)
{ {
_logger.LogError(ex, "Tile download cancelled for ({X}, {Y}). This may be due to HttpClient timeout or explicit cancellation.", x, y); _logger.LogError(ex, "Tile download cancelled for ({X}, {Y})", x, y);
throw; throw;
} }
catch (OperationCanceledException ex) catch (OperationCanceledException ex)
@@ -323,26 +435,25 @@ public class GoogleMapsDownloaderV2
} }
catch (RateLimitException ex) catch (RateLimitException ex)
{ {
_logger.LogError(ex, "Rate limit exceeded for tile ({X}, {Y}). Google Maps API is throttling requests. Consider reducing concurrent requests or adding delays.", x, y); _logger.LogError(ex, "Rate limit exceeded for tile ({X}, {Y})", x, y);
throw; throw;
} }
catch (HttpRequestException ex) catch (HttpRequestException ex)
{ {
_logger.LogError(ex, "HTTP request failed for tile ({X}, {Y}). StatusCode: {StatusCode}, Message: {Message}", _logger.LogError(ex, "HTTP request failed for tile ({X}, {Y}). StatusCode: {StatusCode}",
x, y, ex.StatusCode, ex.Message); x, y, ex.StatusCode);
throw; throw;
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Unexpected error downloading tile ({X}, {Y}). Type: {ExceptionType}, Message: {Message}", _logger.LogError(ex, "Unexpected error downloading tile ({X}, {Y})", x, y);
x, y, ex.GetType().Name, ex.Message);
throw; throw;
} }
} finally
} {
_logger.LogDebug("Tile ({X},{Y}) [{Index}/{Total}]: Releasing semaphore slot", x, y, tileIndex + 1, totalTiles);
_logger.LogInformation("Downloaded {Count} new tiles, skipped {Skipped} existing tiles", downloadedTiles.Count, skippedCount); _downloadSemaphore.Release();
return downloadedTiles; }
} }
} }
@@ -1,5 +1,7 @@
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SatelliteProvider.Common.Configs;
using SatelliteProvider.Common.Interfaces; using SatelliteProvider.Common.Interfaces;
namespace SatelliteProvider.Services; namespace SatelliteProvider.Services;
@@ -8,21 +10,49 @@ public class RegionProcessingService : BackgroundService
{ {
private readonly IRegionRequestQueue _queue; private readonly IRegionRequestQueue _queue;
private readonly IRegionService _regionService; private readonly IRegionService _regionService;
private readonly ProcessingConfig _processingConfig;
private readonly ILogger<RegionProcessingService> _logger; private readonly ILogger<RegionProcessingService> _logger;
public RegionProcessingService( public RegionProcessingService(
IRegionRequestQueue queue, IRegionRequestQueue queue,
IRegionService regionService, IRegionService regionService,
IOptions<ProcessingConfig> processingConfig,
ILogger<RegionProcessingService> logger) ILogger<RegionProcessingService> logger)
{ {
_queue = queue; _queue = queue;
_regionService = regionService; _regionService = regionService;
_processingConfig = processingConfig.Value;
_logger = logger; _logger = logger;
} }
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
_logger.LogInformation("Region Processing Service started"); _logger.LogInformation("Region Processing Service started with {MaxConcurrent} parallel workers",
_processingConfig.MaxConcurrentRegions);
var workers = new List<Task>();
for (int i = 0; i < _processingConfig.MaxConcurrentRegions; i++)
{
var workerId = i + 1;
workers.Add(ProcessRegionWorkerAsync(workerId, stoppingToken));
}
await Task.WhenAll(workers);
_logger.LogInformation("Region Processing Service stopped");
}
private async Task ProcessRegionWorkerAsync(int workerId, CancellationToken stoppingToken)
{
if (workerId > 1)
{
var startupDelay = Random.Shared.Next(100, 500);
_logger.LogInformation("Region worker {WorkerId} starting with {Delay}ms delay to reduce contention",
workerId, startupDelay);
await Task.Delay(startupDelay, stoppingToken);
}
_logger.LogInformation("Region worker {WorkerId} started", workerId);
while (!stoppingToken.IsCancellationRequested) while (!stoppingToken.IsCancellationRequested)
{ {
@@ -32,9 +62,13 @@ public class RegionProcessingService : BackgroundService
if (request != null) if (request != null)
{ {
_logger.LogInformation("Dequeued region request {RegionId}", request.Id); _logger.LogInformation("Worker {WorkerId}: Dequeued region request {RegionId}",
workerId, request.Id);
await _regionService.ProcessRegionAsync(request.Id, stoppingToken); await _regionService.ProcessRegionAsync(request.Id, stoppingToken);
_logger.LogInformation("Worker {WorkerId}: Completed region {RegionId}",
workerId, request.Id);
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
@@ -43,11 +77,11 @@ public class RegionProcessingService : BackgroundService
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error processing region request"); _logger.LogError(ex, "Worker {WorkerId}: Error processing region request", workerId);
} }
} }
_logger.LogInformation("Region Processing Service stopped"); _logger.LogInformation("Region worker {WorkerId} stopped", workerId);
} }
} }
@@ -1,4 +1,5 @@
using System.Threading.Channels; using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using SatelliteProvider.Common.DTO; using SatelliteProvider.Common.DTO;
using SatelliteProvider.Common.Interfaces; using SatelliteProvider.Common.Interfaces;
@@ -7,19 +8,33 @@ namespace SatelliteProvider.Services;
public class RegionRequestQueue : IRegionRequestQueue public class RegionRequestQueue : IRegionRequestQueue
{ {
private readonly Channel<RegionRequest> _queue; private readonly Channel<RegionRequest> _queue;
private readonly ILogger<RegionRequestQueue>? _logger;
private int _totalEnqueued = 0;
private int _totalDequeued = 0;
public RegionRequestQueue(int capacity) public RegionRequestQueue(int capacity, ILogger<RegionRequestQueue>? logger = null)
{ {
var options = new BoundedChannelOptions(capacity) var options = new BoundedChannelOptions(capacity)
{ {
FullMode = BoundedChannelFullMode.Wait FullMode = BoundedChannelFullMode.Wait
}; };
_queue = Channel.CreateBounded<RegionRequest>(options); _queue = Channel.CreateBounded<RegionRequest>(options);
_logger = logger;
_logger?.LogInformation("RegionRequestQueue created with capacity {Capacity}", capacity);
} }
public async ValueTask EnqueueAsync(RegionRequest request, CancellationToken cancellationToken = default) public async ValueTask EnqueueAsync(RegionRequest request, CancellationToken cancellationToken = default)
{ {
var queueDepthBefore = Count;
_totalEnqueued++;
_logger?.LogDebug("Enqueuing region {RegionId} (queue depth before: {Depth}, total enqueued: {Total})",
request.Id, queueDepthBefore, _totalEnqueued);
await _queue.Writer.WriteAsync(request, cancellationToken); await _queue.Writer.WriteAsync(request, cancellationToken);
var queueDepthAfter = Count;
_logger?.LogDebug("Enqueued region {RegionId} (queue depth after: {Depth})",
request.Id, queueDepthAfter);
} }
public async ValueTask<RegionRequest?> DequeueAsync(CancellationToken cancellationToken = default) public async ValueTask<RegionRequest?> DequeueAsync(CancellationToken cancellationToken = default)
@@ -28,6 +43,10 @@ public class RegionRequestQueue : IRegionRequestQueue
{ {
if (_queue.Reader.TryRead(out var request)) if (_queue.Reader.TryRead(out var request))
{ {
_totalDequeued++;
var queueDepth = Count;
_logger?.LogDebug("Dequeued region {RegionId} (queue depth: {Depth}, total dequeued: {Total})",
request.Id, queueDepth, _totalDequeued);
return request; return request;
} }
} }
+10 -7
View File
@@ -79,20 +79,22 @@ public class RegionService : IRegionService
public async Task ProcessRegionAsync(Guid id, CancellationToken cancellationToken = default) public async Task ProcessRegionAsync(Guid id, CancellationToken cancellationToken = default)
{ {
_logger.LogInformation("Processing region {RegionId}", id); _logger.LogInformation("Processing region {RegionId} - START", id);
var startTime = DateTime.UtcNow; var startTime = DateTime.UtcNow;
var region = await _regionRepository.GetByIdAsync(id); var region = await _regionRepository.GetByIdAsync(id);
if (region == null) if (region == null)
{ {
_logger.LogWarning("Region {RegionId} not found", id); _logger.LogWarning("Region {RegionId} not found in database", id);
return; return;
} }
_logger.LogInformation("Region {RegionId}: Updating status to 'processing' (was: {OldStatus})", id, region.Status);
region.Status = "processing"; region.Status = "processing";
region.UpdatedAt = DateTime.UtcNow; region.UpdatedAt = DateTime.UtcNow;
await _regionRepository.UpdateAsync(region); await _regionRepository.UpdateAsync(region);
_logger.LogInformation("Region {RegionId}: Creating timeout CTS (5 minutes)", id);
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); using var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);
@@ -103,32 +105,33 @@ public class RegionService : IRegionService
try try
{ {
_logger.LogInformation("Downloading tiles for region {RegionId} at ({Lat}, {Lon}) size {Size}m zoom {Zoom}", _logger.LogInformation("Region {RegionId}: Step 1 - Starting download for location ({Lat}, {Lon}) size {Size}m zoom {Zoom}",
id, region.Latitude, region.Longitude, region.SizeMeters, region.ZoomLevel); id, region.Latitude, region.Longitude, region.SizeMeters, region.ZoomLevel);
var processingStartTime = DateTime.UtcNow; var processingStartTime = DateTime.UtcNow;
_logger.LogInformation("Checking for existing tiles in region {RegionId}", id); _logger.LogInformation("Region {RegionId}: Step 2 - Checking for existing tiles", id);
var tilesBeforeDownload = await _tileService.GetTilesByRegionAsync( var tilesBeforeDownload = await _tileService.GetTilesByRegionAsync(
region.Latitude, region.Latitude,
region.Longitude, region.Longitude,
region.SizeMeters, region.SizeMeters,
region.ZoomLevel); region.ZoomLevel);
var existingTileIds = new HashSet<Guid>(tilesBeforeDownload.Select(t => t.Id)); var existingTileIds = new HashSet<Guid>(tilesBeforeDownload.Select(t => t.Id));
_logger.LogInformation("Found {Count} existing tiles for region {RegionId}", existingTileIds.Count, id); _logger.LogInformation("Region {RegionId}: Step 3 - Found {Count} existing tiles", id, existingTileIds.Count);
_logger.LogInformation("Starting tile download for region {RegionId}", id); _logger.LogInformation("Region {RegionId}: Step 4 - Calling DownloadAndStoreTilesAsync", id);
tiles = await _tileService.DownloadAndStoreTilesAsync( tiles = await _tileService.DownloadAndStoreTilesAsync(
region.Latitude, region.Latitude,
region.Longitude, region.Longitude,
region.SizeMeters, region.SizeMeters,
region.ZoomLevel, region.ZoomLevel,
linkedCts.Token); linkedCts.Token);
_logger.LogInformation("Region {RegionId}: Step 5 - DownloadAndStoreTilesAsync completed with {Count} tiles", id, tiles.Count);
tilesDownloaded = tiles.Count(t => !existingTileIds.Contains(t.Id)); tilesDownloaded = tiles.Count(t => !existingTileIds.Contains(t.Id));
tilesReused = tiles.Count(t => existingTileIds.Contains(t.Id)); tilesReused = tiles.Count(t => existingTileIds.Contains(t.Id));
_logger.LogInformation("Region {RegionId}: Downloaded {Downloaded} tiles, Reused {Reused} tiles", _logger.LogInformation("Region {RegionId}: Step 6 - Downloaded {Downloaded} new tiles, Reused {Reused} existing tiles",
id, tilesDownloaded, tilesReused); id, tilesDownloaded, tilesReused);
var readyDir = _storageConfig.ReadyDirectory; var readyDir = _storageConfig.ReadyDirectory;
@@ -97,76 +97,95 @@ public class RouteProcessingService : BackgroundService
using var scope = _serviceProvider.CreateScope(); using var scope = _serviceProvider.CreateScope();
var regionService = scope.ServiceProvider.GetRequiredService<Common.Interfaces.IRegionService>(); var regionService = scope.ServiceProvider.GetRequiredService<Common.Interfaces.IRegionService>();
_logger.LogInformation("Route {RouteId}: Starting sequential region processing for {PointCount} points", _logger.LogInformation("Route {RouteId}: Starting parallel region processing for {PointCount} points",
routeId, routePointsList.Count); routeId, routePointsList.Count);
var firstPoint = routePointsList.First(); var queuedRegionIds = new List<Guid>();
foreach (var point in routePointsList)
{
var regionId = Guid.NewGuid(); var regionId = Guid.NewGuid();
await regionService.RequestRegionAsync( await regionService.RequestRegionAsync(
regionId, regionId,
firstPoint.Latitude, point.Latitude,
firstPoint.Longitude, point.Longitude,
route.RegionSizeMeters, route.RegionSizeMeters,
route.ZoomLevel, route.ZoomLevel,
stitchTiles: false); stitchTiles: false);
await _routeRepository.LinkRouteToRegionAsync(routeId, regionId); await _routeRepository.LinkRouteToRegionAsync(routeId, regionId);
queuedRegionIds.Add(regionId);
}
_logger.LogInformation("Route {RouteId}: Queued first region {RegionId} (1/{TotalPoints})", _logger.LogInformation("Route {RouteId}: Queued all {Count} regions for parallel processing. Region IDs: {RegionIds}",
routeId, regionId, routePointsList.Count); routeId, queuedRegionIds.Count, string.Join(", ", queuedRegionIds.Take(5)) + (queuedRegionIds.Count > 5 ? "..." : ""));
return; return;
} }
var lastRegion = await _regionRepository.GetByIdAsync(regionIdsList.Last()); var regions = new List<DataAccess.Models.RegionEntity>();
if (lastRegion == null) foreach (var regionId in regionIdsList)
{ {
var region = await _regionRepository.GetByIdAsync(regionId);
if (region != null)
{
regions.Add(region);
}
}
var completedRegions = regions.Where(r => r.Status == "completed").ToList();
var failedRegions = regions.Where(r => r.Status == "failed").ToList();
var processingRegions = regions.Where(r => r.Status == "queued" || r.Status == "processing").ToList();
var hasEnoughCompleted = completedRegions.Count >= routePointsList.Count;
var activeRegions = completedRegions.Count + processingRegions.Count;
var shouldRetryFailed = failedRegions.Count > 0 && !hasEnoughCompleted && activeRegions < routePointsList.Count;
if (hasEnoughCompleted)
{
_logger.LogInformation("Route {RouteId}: Have {Completed} completed regions (required: {Required}). Generating final maps. Ignoring {Processing} processing and {Failed} failed regions.",
routeId, completedRegions.Count, routePointsList.Count, processingRegions.Count, failedRegions.Count);
await GenerateRouteMapsAsync(routeId, route, completedRegions.Take(routePointsList.Count).Select(r => r.Id), cancellationToken);
return; return;
} }
if (lastRegion.Status == "queued" || lastRegion.Status == "processing") if (shouldRetryFailed)
{ {
return; _logger.LogWarning("Route {RouteId}: {FailedCount} region(s) failed: {FailedRegions}. Active regions: {ActiveCount}/{RequiredCount}. Attempting to retry with new regions.",
} routeId, failedRegions.Count, string.Join(", ", failedRegions.Select(r => r.Id)), activeRegions, routePointsList.Count);
if (lastRegion.Status == "failed")
{
_logger.LogError("Route {RouteId}: Region {RegionId} failed. Stopping route processing.",
routeId, lastRegion.Id);
route.MapsReady = false;
route.UpdatedAt = DateTime.UtcNow;
await _routeRepository.UpdateRouteAsync(route);
return;
}
if (regionIdsList.Count < routePointsList.Count)
{
using var scope = _serviceProvider.CreateScope(); using var scope = _serviceProvider.CreateScope();
var regionService = scope.ServiceProvider.GetRequiredService<Common.Interfaces.IRegionService>(); var regionService = scope.ServiceProvider.GetRequiredService<Common.Interfaces.IRegionService>();
var nextPoint = routePointsList[regionIdsList.Count]; foreach (var failedRegion in failedRegions)
var regionId = Guid.NewGuid(); {
var newRegionId = Guid.NewGuid();
_logger.LogInformation("Route {RouteId}: Retrying failed region {OldRegionId} with new region {NewRegionId}",
routeId, failedRegion.Id, newRegionId);
await regionService.RequestRegionAsync( await regionService.RequestRegionAsync(
regionId, newRegionId,
nextPoint.Latitude, failedRegion.Latitude,
nextPoint.Longitude, failedRegion.Longitude,
route.RegionSizeMeters, failedRegion.SizeMeters,
route.ZoomLevel, failedRegion.ZoomLevel,
stitchTiles: false); stitchTiles: false);
await _routeRepository.LinkRouteToRegionAsync(routeId, regionId); await _routeRepository.LinkRouteToRegionAsync(routeId, newRegionId);
}
_logger.LogInformation("Route {RouteId}: Queued next region {RegionId} ({CurrentRegion}/{TotalPoints})", _logger.LogInformation("Route {RouteId}: Queued {Count} retry regions", routeId, failedRegions.Count);
routeId, regionId, regionIdsList.Count + 1, routePointsList.Count);
return; return;
} }
_logger.LogInformation("Route {RouteId}: All {RegionCount} regions completed, generating final maps", var anyProcessing = processingRegions.Count > 0;
routeId, regionIdsList.Count); if (anyProcessing)
{
await GenerateRouteMapsAsync(routeId, route, regionIdsList, cancellationToken); _logger.LogInformation("Route {RouteId}: Progress - {Completed}/{Required} regions completed, {Processing} still processing, {Failed} failed (will retry if needed)",
routeId, completedRegions.Count, routePointsList.Count, processingRegions.Count, failedRegions.Count);
return;
}
} }
private async Task GenerateRouteMapsAsync( private async Task GenerateRouteMapsAsync(