Files
autopilot/crates/telemetry_stream/proto/telemetry.proto
T
Oleksandr Bezdieniezhnykh ff790bd639
ci/woodpecker/push/build-arm Pipeline failed
[AZ-675] telemetry_stream Tonic gRPC server + per-client lossy queue
Pins operator-link transport to gRPC server-streaming (closes
architecture Q2 in favour of gRPC). Adds first-time tonic / prost /
tonic-build infrastructure to the workspace; uses
protoc-bin-vendored so neither dev machines nor CI need system
protoc installed.

Design — back-pressure lives in the per-topic tokio::sync::broadcast
ring, drained directly by the tonic-streamed response via
BroadcastStream + StreamMap. No intermediate mpsc buffer that could
absorb back-pressure invisibly. Slow client overrun -> Lagged(n)
event -> per-(client_id, topic) drop counter incremented; healthy
clients on the same topic are unaffected.

Service surface — Subscribe(SubscribeRequest) -> stream
TelemetryMessage; five topics (TelemetrySample, GimbalState,
DetectionEvent, MovementCandidate, MapObjectsBundle); empty topics
list defaults to subscribe-all; empty client_id rejected; stream
drop decrements subscribed_clients via StreamGuard. TelemetrySink
push_detections is now real; push_frame still NotImplemented(AZ-676
video path).

Tests — 6 unit + 5 integration (AC-1..AC-3 via in-process gRPC
client, plus subscribe-all default + empty-client_id rejection).
Clippy on telemetry_stream clean.

Pre-existing mission_executor ac3 test polling race surfaces more
reliably under the new tonic build pressure; documented as
_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md
and unchanged by this batch.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 12:44:39 +03:00

65 lines
2.5 KiB
Protocol Buffer

// AZ-675 telemetry_stream — operator-bound gRPC contract.
//
// One service, one bi-directional Subscribe RPC. Client opens a stream
// declaring which topics it wants; server pushes messages for those
// topics until the client disconnects.
//
// The server enforces per-client back-pressure: when a client cannot
// keep up the oldest message in *that client's* queue is dropped and
// a per-(client, topic) drop counter is incremented. Other clients
// are unaffected.
//
// AZ-676 will add the video path (separate RPC, server-streamed binary
// frames). AZ-677 will add the MapObjectsBundle snapshot RPC. Keep
// those concerns out of this contract.
syntax = "proto3";
package autopilot.telemetry.v1;
// Topics a client can subscribe to. Repeated in SubscribeRequest so a
// single stream multiplexes whichever subset the client cares about.
enum Topic {
TOPIC_UNSPECIFIED = 0;
TOPIC_TELEMETRY_SAMPLE = 1;
TOPIC_GIMBAL_STATE = 2;
TOPIC_DETECTION_EVENT = 3;
TOPIC_MOVEMENT_CANDIDATE = 4;
TOPIC_MAP_OBJECTS_BUNDLE = 5;
}
message SubscribeRequest {
// Operator/client identifier. Plumbed into per-client drop counters
// and log lines. Free-form for AZ-675; AZ-678 (operator command
// auth) will tighten the format.
string client_id = 1;
repeated Topic topics = 2;
}
// Each message carries the topic tag + an opaque JSON payload. We
// don't pin schemas per topic in proto here because the canonical
// payload shapes are already authoritative in `crates/shared/models`
// (TelemetrySample, GimbalState, DetectionEvent, MovementCandidate,
// MapObjectsBundle). Using JSON keeps the wire honest with what the
// rest of the system already serializes; if a topic later needs
// schema enforcement, add a typed message variant in a future bump.
message TelemetryMessage {
Topic topic = 1;
// Monotonic nanoseconds since the autopilot process started. Used
// by the operator for ordering and latency measurement.
uint64 monotonic_ts_ns = 2;
// Server-side per-client sequence number. Strictly increases per
// (client, topic) stream; gaps imply drops.
uint64 sequence = 3;
// Serialized JSON payload (utf-8). Topic determines the shape.
bytes payload_json = 4;
}
service TelemetryStream {
// Server-streaming subscribe. The client sends ONE SubscribeRequest;
// the server pushes TelemetryMessage values until the client cancels
// the stream or the server shuts down. The server applies per-
// client drop-oldest back-pressure if the client cannot keep up.
rpc Subscribe(SubscribeRequest) returns (stream TelemetryMessage);
}