Files
satellite-provider/SatelliteProvider.Api/Grpc/RouteTileDeliveryGrpcService.cs
T
Oleksandr Bezdieniezhnykh 7ed780b063
ci/woodpecker/push/01-test Pipeline failed
ci/woodpecker/push/02-build-push unknown status
[AZ-1074] [AZ-1075] Cycle 9 closeout: security, tests, metrics
Resolve F-AZ1074-1/2 (collection caps, generic gRPC internal errors).
Standalone integration compose stack, docs, security audit, perf and retro.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-25 17:32:14 +03:00

204 lines
7.1 KiB
C#

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.AspNetCore.Authorization;
using Satellite.V1;
using SatelliteProvider.Services.RouteManagement.TileProvision;
namespace SatelliteProvider.Api.Grpc;
/// <summary>
/// gRPC endpoint for route tile delivery. Validates the incoming request, maps it to a
/// <see cref="RouteTileDeliveryJob"/>, and hands it to the orchestrator together with a sink
/// that writes each delivery event onto the server response stream.
/// </summary>
[Authorize]
public sealed class RouteTileDeliveryGrpcService : RouteTileDelivery.RouteTileDeliveryBase
{
    private readonly RouteTileDeliveryOrchestrator _orchestrator;
    private readonly ILogger<RouteTileDeliveryGrpcService> _logger;

    /// <summary>Creates the service with its orchestrator and logger dependencies.</summary>
    /// <param name="orchestrator">Orchestrator that performs the delivery for a mapped job.</param>
    /// <param name="logger">Logger used for rejected requests and delivery failures.</param>
    public RouteTileDeliveryGrpcService(
        RouteTileDeliveryOrchestrator orchestrator,
        ILogger<RouteTileDeliveryGrpcService> logger)
    {
        _orchestrator = orchestrator;
        _logger = logger;
    }

    /// <summary>
    /// Server-streaming call: validates <paramref name="request"/>, then lets the orchestrator
    /// write delivery events to <paramref name="responseStream"/>.
    /// </summary>
    /// <param name="request">Request carrying the route definition and the client's tile records.</param>
    /// <param name="responseStream">Stream that receives the delivery events.</param>
    /// <param name="context">Call context; its cancellation token is passed to the orchestrator and to stream writes.</param>
    /// <exception cref="RpcException">
    /// With <see cref="StatusCode.InvalidArgument"/> when the route is missing, <c>route_id</c> is not a UUID,
    /// a client tile carries an invalid <c>captured_at</c>, or the orchestrator throws an
    /// <see cref="ArgumentException"/>.
    /// </exception>
    public override async Task DeliverRouteTiles(
        DeliverRouteTilesRequest request,
        IServerStreamWriter<RouteTileEvent> responseStream,
        ServerCallContext context)
    {
        if (request.Route is null)
        {
            throw new RpcException(new Status(StatusCode.InvalidArgument, "route is required"));
        }

        if (!Guid.TryParse(request.Route.RouteId, out var routeId))
        {
            throw new RpcException(new Status(StatusCode.InvalidArgument, "route_id must be a UUID"));
        }

        // Mapping stays outside the try block: its validation failures are already RpcExceptions
        // and must reach the client as a status, not as an in-stream error event.
        var job = MapJob(request, routeId);
        var sink = new GrpcRouteTileDeliverySink(responseStream);
        var cancellationToken = context.CancellationToken;

        try
        {
            await _orchestrator.DeliverAsync(job, sink, cancellationToken);
        }
        catch (ArgumentException ex)
        {
            _logger.LogWarning(ex, "Invalid route tile delivery request for route {RouteId}", routeId);
            throw new RpcException(new Status(StatusCode.InvalidArgument, ex.Message));
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation of the call itself is not a delivery failure; let it propagate untouched.
            throw;
        }
        catch (Exception ex)
        {
            // Details go to the log only; the client receives a generic, retryable error event.
            _logger.LogError(ex, "Route tile delivery failed for route {RouteId}", routeId);
            await sink.WriteErrorAsync("INTERNAL_ERROR", "An internal error occurred.", retryable: true, cancellationToken);
        }
    }

    /// <summary>
    /// Builds the delivery job from the request: waypoints and geofence polygons become
    /// (lat, lon) tuples and client tile records become <see cref="ClientTileSnapshot"/> values.
    /// </summary>
    /// <param name="request">Request whose <c>Route</c> the caller has already checked for null.</param>
    /// <param name="routeId">Route identifier parsed from the request.</param>
    /// <exception cref="RpcException">Propagated from <see cref="MapClientTile"/> for an invalid client tile timestamp.</exception>
    private static RouteTileDeliveryJob MapJob(DeliverRouteTilesRequest request, Guid routeId)
    {
        var route = request.Route!;

        var waypoints = route.Waypoints
            .Select(w => (w.Lat, w.Lon))
            .ToList();

        var geofences = route.Geofences
            .Select(polygon => (IReadOnlyList<(double Lat, double Lon)>)polygon.Vertices
                .Select(v => (v.Lat, v.Lon))
                .ToList())
            .ToList();

        var clientTiles = request.ClientTiles
            .Select(MapClientTile)
            .ToList();

        return new RouteTileDeliveryJob(
            routeId,
            waypoints,
            route.RegionSizeMeters,
            route.Zoom,
            geofences,
            route.IncludeGeofenceTiles,
            clientTiles);
    }

    /// <summary>
    /// Converts one client tile record into a snapshot. A missing <c>captured_at</c> maps to
    /// <see cref="DateTime.MinValue"/>; an empty or missing content hash maps to <c>null</c>.
    /// </summary>
    /// <param name="record">Client-supplied tile record.</param>
    /// <exception cref="RpcException">
    /// With <see cref="StatusCode.InvalidArgument"/> when <c>captured_at</c> cannot be converted to a
    /// <see cref="DateTime"/>.
    /// </exception>
    private static ClientTileSnapshot MapClientTile(ClientTileRecord record)
    {
        var capturedAt = DateTime.MinValue;
        if (record.CapturedAt is { } stamp)
        {
            try
            {
                // ToDateTime() already yields Kind = Utc, so no further conversion is needed.
                capturedAt = stamp.ToDateTime();
            }
            catch (InvalidOperationException)
            {
                // Timestamp.ToDateTime() rejects non-normalized/out-of-range values. The value is
                // client input, so report it as a bad argument instead of an unhandled failure.
                throw new RpcException(new Status(
                    StatusCode.InvalidArgument,
                    "client_tiles.captured_at must be a valid timestamp"));
            }
        }

        byte[]? hash = record.ContentSha256 is { Length: > 0 } sha ? sha.ToByteArray() : null;

        return new ClientTileSnapshot(
            record.Z,
            record.X,
            record.Y,
            record.ResolutionMPerPx,
            capturedAt,
            hash);
    }

    /// <summary>
    /// <see cref="IRouteTileDeliverySink"/> implementation that wraps each delivery event in a
    /// <see cref="RouteTileEvent"/> and writes it to the gRPC response stream.
    /// </summary>
    private sealed class GrpcRouteTileDeliverySink : IRouteTileDeliverySink
    {
        private readonly IServerStreamWriter<RouteTileEvent> _stream;

        /// <summary>Creates a sink over the given response stream.</summary>
        /// <param name="stream">Response stream every event is written to.</param>
        public GrpcRouteTileDeliverySink(IServerStreamWriter<RouteTileEvent> stream)
        {
            _stream = stream;
        }

        /// <summary>Writes a manifest event with the candidate, skipped and to-deliver counts.</summary>
        public async ValueTask WriteManifestAsync(uint totalCandidates, uint skippedByClient, uint toDeliver, CancellationToken cancellationToken)
        {
            await _stream.WriteAsync(new RouteTileEvent
            {
                Manifest = new RouteManifest
                {
                    TotalCandidates = totalCandidates,
                    SkippedByClient = skippedByClient,
                    ToDeliver = toDeliver,
                },
            }, cancellationToken);
        }

        /// <summary>Writes one batch event containing a payload for every tile in <paramref name="tiles"/>.</summary>
        public async ValueTask WriteBatchAsync(uint batchSeq, IReadOnlyList<PreparedTileDelivery> tiles, CancellationToken cancellationToken)
        {
            var batch = new TileBatch { BatchSeq = batchSeq };
            foreach (var tile in tiles)
            {
                batch.Tiles.Add(new TilePayload
                {
                    Z = tile.Candidate.Z,
                    X = tile.Candidate.X,
                    Y = tile.Candidate.Y,
                    ResolutionMPerPx = tile.ResolutionMetersPerPx,
                    // Timestamp.FromDateTime requires Kind = Utc, so the kind is stamped explicitly.
                    CapturedAt = Timestamp.FromDateTime(DateTime.SpecifyKind(tile.CapturedAtUtc, DateTimeKind.Utc)),
                    Source = tile.Source,
                    Jpeg = Google.Protobuf.ByteString.CopyFrom(tile.Jpeg),
                    ContentSha256 = Google.Protobuf.ByteString.CopyFrom(tile.ContentSha256),
                    RoutePriority = tile.Candidate.RoutePriority,
                });
            }

            await _stream.WriteAsync(new RouteTileEvent { Batch = batch }, cancellationToken);
        }

        /// <summary>Writes a progress event with delivered, total and downloading counts.</summary>
        public async ValueTask WriteProgressAsync(uint delivered, uint total, uint downloading, CancellationToken cancellationToken)
        {
            await _stream.WriteAsync(new RouteTileEvent
            {
                Progress = new ProgressUpdate
                {
                    Delivered = delivered,
                    Total = total,
                    Downloading = downloading,
                },
            }, cancellationToken);
        }

        /// <summary>Writes the completion event with the delivered and skipped counts.</summary>
        public async ValueTask WriteCompleteAsync(uint delivered, uint skippedClient, uint skippedServerFilter, CancellationToken cancellationToken)
        {
            await _stream.WriteAsync(new RouteTileEvent
            {
                Complete = new DeliveryComplete
                {
                    Delivered = delivered,
                    SkippedClient = skippedClient,
                    SkippedServerFilter = skippedServerFilter,
                },
            }, cancellationToken);
        }

        /// <summary>Writes an error event with the given code, message and retryable flag.</summary>
        public async ValueTask WriteErrorAsync(string code, string message, bool retryable, CancellationToken cancellationToken)
        {
            await _stream.WriteAsync(new RouteTileEvent
            {
                Error = new DeliveryError
                {
                    Code = code,
                    Message = message,
                    Retryable = retryable,
                },
            }, cancellationToken);
        }
    }
}