Files
satellite-provider/SatelliteProvider.Services.RouteManagement/TileProvision/RouteTileDeliveryOrchestrator.cs
T
Oleksandr Bezdieniezhnykh 275ee1b554
ci/woodpecker/push/01-test Pipeline failed
ci/woodpecker/push/02-build-push unknown status
Add TileProvision configuration and gRPC service for tile delivery
- Introduced new TileProvision settings in appsettings.json, including MaxTilesPerBatch and ProgressEmitIntervalSeconds.
- Configured TileProvisionConfig in Program.cs to bind the new settings.
- Added gRPC service for RouteTileDelivery in Program.cs to handle tile delivery requests.
- Updated SatelliteProvider.Api.csproj to include Grpc.AspNetCore package and added protobuf file for tile provision.
- Enhanced AuthenticationServiceCollectionExtensions to handle JWT token extraction from the Authorization header.
- Registered additional services in RouteManagementServiceCollectionExtensions for tile processing.

These changes enhance the API's capability to manage tile provisioning and delivery efficiently.
2026-06-23 13:18:59 +03:00

337 lines
12 KiB
C#

using System.Security.Cryptography;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SatelliteProvider.Common.Configs;
using SatelliteProvider.Common.Interfaces;
using SatelliteProvider.Common.Utils;
using SatelliteProvider.DataAccess.Models;
using SatelliteProvider.DataAccess.Repositories;
namespace SatelliteProvider.Services.RouteManagement.TileProvision;
public sealed class RouteTileDeliveryOrchestrator
{
// Collaborators used to expand a route into tile candidates, look up cached tiles,
// and download the ones that are missing.
private readonly RouteTileExpander _expander;
private readonly ITileRepository _tileRepository;
private readonly ITileService _tileService;

// Option snapshots captured once at construction time:
//   _mapConfig        - AllowedZoomLevels and TileSizePixels
//   _processingConfig - MaxConcurrentDownloads
//   _provisionConfig  - MaxTilesPerBatch and ProgressEmitIntervalSeconds
private readonly MapConfig _mapConfig;
private readonly ProcessingConfig _processingConfig;
private readonly TileProvisionConfig _provisionConfig;

private readonly ILogger<RouteTileDeliveryOrchestrator> _logger;

/// <summary>
/// Creates the orchestrator from its collaborators and unwraps the three
/// option objects into plain configuration instances.
/// </summary>
public RouteTileDeliveryOrchestrator(
    RouteTileExpander expander,
    ITileRepository tileRepository,
    ITileService tileService,
    IOptions<MapConfig> mapConfig,
    IOptions<ProcessingConfig> processingConfig,
    IOptions<TileProvisionConfig> provisionConfig,
    ILogger<RouteTileDeliveryOrchestrator> logger)
{
    // Services and logger first.
    _logger = logger;
    _expander = expander;
    _tileRepository = tileRepository;
    _tileService = tileService;

    // Then the configuration values, read in the same relative order as before.
    _mapConfig = mapConfig.Value;
    _processingConfig = processingConfig.Value;
    _provisionConfig = provisionConfig.Value;
}
/// <summary>
/// Expands the job's route into tile candidates, drops the ones
/// <c>ClientTileCatalog.ShouldSkipForClient</c> reports as already held by the client,
/// and streams the remainder to <paramref name="sink"/>.
/// </summary>
/// <remarks>
/// Sink call sequence: one <c>WriteManifestAsync</c>, zero or more <c>WriteBatchAsync</c>
/// (tiles with a database row first, then downloaded tiles), interleaved
/// <c>WriteProgressAsync</c> during the download phase, and a final <c>WriteCompleteAsync</c>.
/// Tiles that can neither be read from disk nor downloaded are reported as
/// "server filtered" in the completion message.
/// </remarks>
/// <param name="job">Route, zoom, geofence and client tile catalogue describing the request.</param>
/// <param name="sink">Receiver of manifest, batches, progress and completion messages.</param>
/// <param name="cancellationToken">Cancels the whole delivery, including in-flight downloads.</param>
/// <exception cref="ArgumentNullException"><paramref name="job"/> or <paramref name="sink"/> is null.</exception>
/// <exception cref="ArgumentException">The job fails <c>ValidateJob</c>.</exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
public async Task DeliverAsync(
    RouteTileDeliveryJob job,
    IRouteTileDeliverySink sink,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(job);
    ArgumentNullException.ThrowIfNull(sink);
    ValidateJob(job);

    // The BoundedChannelOptions and SemaphoreSlim constructors reject values below 1.
    // Clamp so a zero/unset configuration value cannot abort a delivery after the
    // manifest has already been sent.
    var batchSize = Math.Max(1, _provisionConfig.MaxTilesPerBatch);
    var maxConcurrentDownloads = Math.Max(1, _processingConfig.MaxConcurrentDownloads);

    var jobStartedAt = DateTime.UtcNow;
    var candidates = _expander.Expand(
        job.Waypoints,
        job.RegionSizeMeters,
        job.Zoom,
        job.GeofenceVertices,
        job.IncludeGeofenceTiles);
    var clientIndex = ClientTileCatalog.IndexByZxy(job.ClientTiles);
    var locationHashes = candidates
        .Select(c => Uuidv5.LocationHashForTile(c.Z, c.X, c.Y))
        .Distinct()
        .ToArray();
    var dbTiles = await _tileRepository.GetTilesByLocationHashesAsync(locationHashes);

    // Decide per candidate whether the client needs it at all.
    var workItems = new List<TileWorkItem>(candidates.Count);
    var skippedByClient = 0u;
    foreach (var candidate in candidates)
    {
        var hash = Uuidv5.LocationHashForTile(candidate.Z, candidate.X, candidate.Y);
        dbTiles.TryGetValue(hash, out var dbTile);
        var prospect = BuildServerProspect(candidate, dbTile, jobStartedAt);
        clientIndex.TryGetValue((candidate.Z, candidate.X, candidate.Y), out var clientTile);
        if (ClientTileCatalog.ShouldSkipForClient(clientTile, prospect))
        {
            skippedByClient++;
            continue;
        }

        workItems.Add(new TileWorkItem(candidate, dbTile));
    }

    var totalCandidates = (uint)candidates.Count;
    var toDeliver = (uint)workItems.Count;
    await sink.WriteManifestAsync(totalCandidates, skippedByClient, toDeliver, cancellationToken);
    if (toDeliver == 0)
    {
        await sink.WriteCompleteAsync(0, skippedByClient, 0, cancellationToken);
        return;
    }

    var delivered = 0u;
    var batchSeq = 0u;
    var pendingBatch = new List<PreparedTileDelivery>(batchSize);

    // Sends the accumulated tiles as one batch, ordered by route priority.
    // Only ever called from this method's own flow, so no synchronisation is needed.
    async Task FlushBatchAsync()
    {
        if (pendingBatch.Count == 0)
        {
            return;
        }

        pendingBatch.Sort((a, b) => a.Candidate.RoutePriority.CompareTo(b.Candidate.RoutePriority));
        // Hand the sink a copy: pendingBatch is cleared and reused right after.
        await sink.WriteBatchAsync(batchSeq++, pendingBatch.ToList(), cancellationToken);
        delivered += (uint)pendingBatch.Count;
        pendingBatch.Clear();
    }

    var cachedItems = workItems.Where(w => w.DbTile is not null).ToList();
    var missingItems = workItems.Where(w => w.DbTile is null).ToList();

    // Phase 1: tiles that already have a database row.
    foreach (var item in cachedItems.OrderBy(w => w.Candidate.RoutePriority))
    {
        cancellationToken.ThrowIfCancellationRequested();
        var prepared = await TryPrepareFromDatabaseAsync(item.Candidate, item.DbTile!, cancellationToken);
        if (prepared is null)
        {
            // Row exists but the file could not be read: fall back to downloading it.
            missingItems.Add(item);
            continue;
        }

        pendingBatch.Add(prepared);
        if (pendingBatch.Count >= batchSize)
        {
            await FlushBatchAsync();
        }
    }

    await FlushBatchAsync();

    // Phase 2: download what is missing, streaming results through a bounded channel.
    if (missingItems.Count > 0)
    {
        var downloadChannel = System.Threading.Channels.Channel.CreateBounded<PreparedTileDelivery>(
            new System.Threading.Channels.BoundedChannelOptions(batchSize * 2)
            {
                FullMode = System.Threading.Channels.BoundedChannelFullMode.Wait,
                SingleReader = true,
                SingleWriter = false,
            });
        var inFlight = 0;
        var lastProgressAt = DateTime.UtcNow;

        // Emits a progress message at most once per ProgressEmitIntervalSeconds.
        async Task EmitProgressIfDueAsync()
        {
            var now = DateTime.UtcNow;
            if ((now - lastProgressAt).TotalSeconds < _provisionConfig.ProgressEmitIntervalSeconds)
            {
                return;
            }

            lastProgressAt = now;
            await sink.WriteProgressAsync(delivered, toDeliver, (uint)Volatile.Read(ref inFlight), cancellationToken);
        }

        // Linked source lets the consumer stop the downloads when it fails for a
        // reason other than the caller's own cancellation (e.g. the sink throws).
        using var downloadCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var downloadToken = downloadCts.Token;
        using var downloadLimiter = new SemaphoreSlim(maxConcurrentDownloads, maxConcurrentDownloads);

        // CancellationToken.None: the delegate must always run so that the channel
        // is completed even when cancellation has already been requested.
        var producer = Task.Run(async () =>
        {
            try
            {
                var downloads = missingItems
                    .OrderBy(w => w.Candidate.RoutePriority)
                    .Select(async item =>
                    {
                        await downloadLimiter.WaitAsync(downloadToken);
                        Interlocked.Increment(ref inFlight);
                        try
                        {
                            var prepared = await TryDownloadAndPrepareAsync(item.Candidate, downloadToken);
                            if (prepared is not null)
                            {
                                await downloadChannel.Writer.WriteAsync(prepared, downloadToken);
                            }
                        }
                        finally
                        {
                            Interlocked.Decrement(ref inFlight);
                            downloadLimiter.Release();
                        }
                    });
                await Task.WhenAll(downloads);
            }
            finally
            {
                downloadChannel.Writer.Complete();
            }
        }, CancellationToken.None);

        try
        {
            await foreach (var prepared in downloadChannel.Reader.ReadAllAsync(cancellationToken))
            {
                await EmitProgressIfDueAsync();
                pendingBatch.Add(prepared);
                if (pendingBatch.Count >= batchSize)
                {
                    await FlushBatchAsync();
                }
            }
        }
        catch
        {
            // The consumer is gone: stop outstanding downloads and wait for them to
            // unwind before the semaphore is disposed. The consumer's exception is
            // the one reported to the caller.
            downloadCts.Cancel();
            try
            {
                await producer;
            }
            catch (Exception producerEx)
            {
                _logger.LogDebug(producerEx, "Tile download producer stopped after delivery was aborted");
            }

            throw;
        }

        // Normal completion: surface any producer failure.
        await producer;
    }

    await FlushBatchAsync();
    var serverFiltered = toDeliver - delivered;
    await sink.WriteCompleteAsync(delivered, skippedByClient, serverFiltered, cancellationToken);
}
/// <summary>
/// Rejects a delivery job whose basic parameters are unusable. Checks run in this
/// order: route id, waypoint count, region size, zoom level; the first failure throws.
/// </summary>
/// <param name="job">Job to validate.</param>
/// <exception cref="ArgumentException">
/// Empty route id, fewer than two waypoints, or a zoom not in <c>MapConfig.AllowedZoomLevels</c>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">Region size is zero or negative.</exception>
private void ValidateJob(RouteTileDeliveryJob job)
{
    var routeIdMissing = job.RouteId == Guid.Empty;
    if (routeIdMissing)
    {
        throw new ArgumentException("route_id must be a non-empty UUID", nameof(job));
    }

    var waypointCount = job.Waypoints.Count;
    if (waypointCount < 2)
    {
        throw new ArgumentException("Route must have at least 2 waypoints", nameof(job));
    }

    if (job.RegionSizeMeters <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(job), "region_size_meters must be positive");
    }

    var allowedZooms = _mapConfig.AllowedZoomLevels;
    if (allowedZooms.Contains(job.Zoom))
    {
        return;
    }

    var allowedList = string.Join(", ", allowedZooms);
    throw new ArgumentException(
        $"zoom {job.Zoom} is not allowed. Allowed: {allowedList}",
        nameof(job));
}
/// <summary>
/// Describes what the server could offer for <paramref name="candidate"/>, so it can
/// be compared against the client's copy.
/// </summary>
/// <param name="candidate">Tile coordinates being considered.</param>
/// <param name="dbTile">Cached database row for the tile, or null when there is none.</param>
/// <param name="jobStartedAtUtc">Timestamp used as the capture time when no cached row exists.</param>
/// <returns>
/// A prospect built from the cached row when present; otherwise one carrying the
/// computed resolution, <paramref name="jobStartedAtUtc"/> and a null content hash.
/// </returns>
private ServerTileProspect BuildServerProspect(
    RouteTileCandidate candidate,
    TileEntity? dbTile,
    DateTime jobStartedAtUtc)
{
    // The computed resolution is always evaluated: it is the value for uncached
    // tiles and the fallback when the cached row has no usable pixel size.
    var tileCenter = GeoUtils.TileToWorldPos(candidate.X, candidate.Y, candidate.Z);
    var computedResolution = TileResolutionHelper.ResolutionMetersPerPixel(
        candidate.Z,
        tileCenter.Lat,
        _mapConfig.TileSizePixels);

    if (dbTile is not null)
    {
        var storedResolution = dbTile.TileSizePixels > 0
            ? dbTile.TileSizeMeters / dbTile.TileSizePixels
            : computedResolution;
        return new ServerTileProspect(storedResolution, dbTile.CapturedAt, dbTile.ContentSha256);
    }

    return new ServerTileProspect(computedResolution, jobStartedAtUtc, null);
}
/// <summary>
/// Loads the image file referenced by <paramref name="dbTile"/> and packages it for delivery.
/// </summary>
/// <param name="candidate">Tile coordinates the delivery is for.</param>
/// <param name="dbTile">Database row pointing at the stored file.</param>
/// <param name="cancellationToken">Cancels the file read.</param>
/// <returns>
/// The prepared delivery, or null when the file does not exist or reading it fails
/// (the failure is logged as a warning).
/// </returns>
/// <exception cref="OperationCanceledException">
/// <paramref name="cancellationToken"/> was cancelled while reading.
/// </exception>
private async Task<PreparedTileDelivery?> TryPrepareFromDatabaseAsync(
    RouteTileCandidate candidate,
    TileEntity dbTile,
    CancellationToken cancellationToken)
{
    if (!File.Exists(dbTile.FilePath))
    {
        return null;
    }

    try
    {
        var jpeg = await File.ReadAllBytesAsync(dbTile.FilePath, cancellationToken);

        // Reuse the stored hash only when it has the SHA-256 length (32 bytes);
        // otherwise hash the bytes that were just read.
        var hash = dbTile.ContentSha256 is { Length: 32 } existingHash
            ? existingHash
            : SHA256.HashData(jpeg);

        // NOTE(review): if TileSizeMeters and TileSizePixels are both integral this
        // division truncates - confirm the entity's property types.
        var resolution = dbTile.TileSizePixels > 0
            ? dbTile.TileSizeMeters / dbTile.TileSizePixels
            : TileResolutionHelper.ResolutionMetersPerPixel(
                candidate.Z,
                dbTile.Latitude,
                _mapConfig.TileSizePixels);

        return new PreparedTileDelivery(
            candidate,
            jpeg,
            hash,
            resolution,
            dbTile.CapturedAt,
            dbTile.Source);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Cancellation is not a read failure: let it propagate instead of logging a
        // warning and making the caller treat the tile as missing.
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Failed to read cached tile ({Z}, {X}, {Y})", candidate.Z, candidate.X, candidate.Y);
        return null;
    }
}
/// <summary>
/// Downloads and stores the tile for <paramref name="candidate"/> via the tile service,
/// then re-reads it from the repository and packages it for delivery.
/// </summary>
/// <param name="candidate">Tile coordinates to download.</param>
/// <param name="cancellationToken">Cancels the download and the subsequent file read.</param>
/// <returns>
/// The prepared delivery, or null when the download fails, no row is found afterwards,
/// or the stored file is missing or unreadable. Failures are logged as warnings.
/// </returns>
/// <exception cref="OperationCanceledException">
/// <paramref name="cancellationToken"/> was cancelled.
/// </exception>
private async Task<PreparedTileDelivery?> TryDownloadAndPrepareAsync(
    RouteTileCandidate candidate,
    CancellationToken cancellationToken)
{
    try
    {
        // The tile service is addressed by the position GeoUtils reports for the tile.
        var center = GeoUtils.TileToWorldPos(candidate.X, candidate.Y, candidate.Z);
        await _tileService.DownloadAndStoreSingleTileAsync(
            center.Lat,
            center.Lon,
            candidate.Z,
            cancellationToken);

        var dbTile = await _tileRepository.GetByTileCoordinatesAsync(candidate.Z, candidate.X, candidate.Y);
        if (dbTile is null || !File.Exists(dbTile.FilePath))
        {
            return null;
        }

        return await TryPrepareFromDatabaseAsync(candidate, dbTile, cancellationToken);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // A cancelled delivery is not a download failure. Only rethrow when our own
        // token was cancelled, so unrelated timeouts keep the best-effort handling below.
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogWarning(
            ex,
            "Failed to download tile ({Z}, {X}, {Y}) for route delivery",
            candidate.Z,
            candidate.X,
            candidate.Y);
        return null;
    }
}
/// <summary>
/// Pairs a tile candidate that must be delivered with the database row looked up for it;
/// <c>DbTile</c> is null when the lookup found no row, in which case the tile is downloaded.
/// </summary>
private sealed record TileWorkItem(RouteTileCandidate Candidate, TileEntity? DbTile);
}