autopilot/crates/telemetry_stream/proto/telemetry.proto
Oleksandr Bezdieniezhnykh ccf929af69 [AZ-676] [AZ-677] [AZ-678] [AZ-679] telemetry+operator foundation
Batch 15 ships the four foundation tickets sitting on top of AZ-675
(gRPC server) and AZ-667 (mapobjects_store hydrate):

* AZ-676: telemetry_stream video path (rtsp_forward + bytes_inline)
  with ai_locked atomic + session counter, SubscribeVideo RPC.
* AZ-677: MapObjects snapshot-on-subscribe + diff broadcast +
  reconnect-resync (StartThen stream-prepend pattern).
* AZ-678: HmacOperatorValidator with per-session monotonic seq,
  in-process session registry + TTL, constant-time HMAC compare,
  rejection-reason counters, sliding 60 s sig-failure red-health gate.
  Trait OperatorCommandValidator in shared::contracts::operator_auth.
* AZ-679: PoiSurfaceMapper produces OperatorPoiEvent per architecture
  §7.10; PoiDequeued events on rotate/age-out/complete; pushed via
  new TelemetrySink::push_operator_event extension on Topic::OperatorEvent.
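
The AZ-678 checks can be illustrated with a minimal std-only sketch. It is not the actual HmacOperatorValidator: the names SessionRegistry, Rejection and constant_time_eq are hypothetical, the expected tag is passed in rather than computed with the hmac crate, and the TTL, counters and health gate are left out. It only shows the two ideas named above, a comparison that does not exit early on the first mismatching byte and a per-session sequence number that must strictly increase.

```rust
use std::collections::HashMap;

/// Illustrative rejection reasons (hypothetical names, not the crate's).
#[derive(Debug, PartialEq, Eq)]
pub enum Rejection {
    UnknownSession,
    BadSignature,
    ReplayedSeq,
}

/// Compare two byte slices without returning at the first mismatch.
/// The length check is not secret: MAC tags have a fixed, public length.
pub fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        // Accumulate differences instead of branching per byte.
        diff |= x ^ y;
    }
    diff == 0
}

/// Hypothetical in-process registry: session id -> last accepted seq.
#[derive(Default)]
pub struct SessionRegistry {
    last_seq: HashMap<String, u64>,
}

impl SessionRegistry {
    /// Register a session; the first accepted seq must be >= 1.
    pub fn open(&mut self, session: &str) {
        self.last_seq.insert(session.to_string(), 0);
    }

    /// `expected_tag` stands in for the MAC the server would compute.
    pub fn validate(
        &mut self,
        session: &str,
        seq: u64,
        tag: &[u8],
        expected_tag: &[u8],
    ) -> Result<(), Rejection> {
        let last = *self
            .last_seq
            .get(session)
            .ok_or(Rejection::UnknownSession)?;
        // Authenticate first, so unauthenticated input never touches seq state.
        if !constant_time_eq(tag, expected_tag) {
            return Err(Rejection::BadSignature);
        }
        if seq <= last {
            return Err(Rejection::ReplayedSeq);
        }
        self.last_seq.insert(session.to_string(), seq);
        Ok(())
    }
}
```

The stored seq only advances after both checks pass, so a rejected command cannot burn a sequence number. Production code would normally rely on the MAC library's own constant-time verification rather than a hand-written loop, which an optimizer is free to rewrite.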

Cross-task wiring: TelemetrySink trait extended with
push_operator_event; OperatorBridge gets optional builder methods
with_telemetry_sink / with_validator (composition root wires in
AZ-680). Workspace deps: hmac = "0.12"; per-crate adds bytes,
serde_json, parking_lot, chrono, uuid, sha2, thiserror.

Tests: 14/14 ACs verified locally (4 + 3 + 5 + 3 by AC) plus
6 supporting unit tests + 7 integration tests + 2 shared serde
roundtrips. cargo clippy clean on touched crates. Cumulative
review for batches 13-15 produced; verdict PASS_WITH_WARNINGS
(0 Critical, 0 High, 1 Medium, 4 Low — all carry-overs or
deferred-producer notes for AZ-680/AZ-684).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 16:18:40 +03:00


// AZ-675 telemetry_stream — operator-bound gRPC contract.
//
// One Subscribe RPC multiplexes the structured topics (telemetry,
// gimbal, detection, movement, MapObjects, operator events). Video is
// carried by a dedicated SubscribeVideo RPC because frame payloads are
// binary, large, and don't share the JSON-broadcast model the
// structured topics use.
//
// The Subscribe server enforces per-client drop-oldest back-pressure
// for the structured topics; SubscribeVideo applies the same back-
// pressure to the bytes_inline frame queue when the operator client
// cannot keep up.
//
// MapObjectsBundle (topic on Subscribe) is special: on subscribe the
// server first emits a Snapshot variant of MapObjectsBundleMessage
// and then forwards Diff variants for in-flight changes. Reconnect
// is treated as a new subscribe — a fresh Snapshot is emitted and
// diffs accumulated during the disconnect are NOT replayed.
syntax = "proto3";
package autopilot.telemetry.v1;
// Topics a client can subscribe to. Repeated in SubscribeRequest so a
// single stream multiplexes whichever subset the client cares about.
enum Topic {
  TOPIC_UNSPECIFIED = 0;
  TOPIC_TELEMETRY_SAMPLE = 1;
  TOPIC_GIMBAL_STATE = 2;
  TOPIC_DETECTION_EVENT = 3;
  TOPIC_MOVEMENT_CANDIDATE = 4;
  TOPIC_MAP_OBJECTS_BUNDLE = 5;
  // AZ-679 — operator-bound POI events (surfaced + dequeued). JSON
  // payload is a tagged enum (`kind: poi_surfaced | poi_dequeued`).
  TOPIC_OPERATOR_EVENT = 6;
}
message SubscribeRequest {
  // Operator/client identifier. Plumbed into per-client drop counters
  // and log lines. Free-form for AZ-675; AZ-678 (operator command
  // auth) will tighten the format.
  string client_id = 1;
  repeated Topic topics = 2;
}
// Each message carries the topic tag + an opaque JSON payload. We
// don't pin schemas per topic in proto here because the canonical
// payload shapes are already authoritative in `crates/shared/models`
// (TelemetrySample, GimbalState, DetectionEvent, MovementCandidate,
// MapObjectsBundle). Using JSON keeps the wire honest with what the
// rest of the system already serializes; if a topic later needs
// schema enforcement, add a typed message variant in a future bump.
message TelemetryMessage {
  Topic topic = 1;
  // Monotonic nanoseconds since the autopilot process started. Used
  // by the operator for ordering and latency measurement.
  uint64 monotonic_ts_ns = 2;
  // Server-side per-client sequence number. Strictly increases per
  // (client, topic) stream; gaps imply drops.
  uint64 sequence = 3;
  // Serialized JSON payload (utf-8). Topic determines the shape.
  bytes payload_json = 4;
}
// Pixel format enum mirroring `shared::models::frame::PixelFormat`.
// Only used by VideoFrame (bytes_inline mode).
enum PixelFormat {
  PIXEL_FORMAT_UNSPECIFIED = 0;
  PIXEL_FORMAT_NV12 = 1;
  PIXEL_FORMAT_YUV420P = 2;
  PIXEL_FORMAT_RGB24 = 3;
}
// Operator-bound video delivery mode. Per AZ-676 the autopilot is
// configured at startup to either forward the RTSP URL straight to
// the operator (lower onboard cost; default) or carry encoded bytes
// over this gRPC stream.
enum VideoMode {
  VIDEO_MODE_UNSPECIFIED = 0;
  VIDEO_MODE_RTSP_FORWARD = 1;
  VIDEO_MODE_BYTES_INLINE = 2;
}
message SubscribeVideoRequest {
  // Operator/client identifier — plumbed into the ai_locked session
  // counter, drop counters, and log lines.
  string client_id = 1;
}
// First message every SubscribeVideo stream emits. Tells the operator
// which mode the autopilot is configured in and, for rtsp_forward,
// the URL the operator should pull from.
message VideoSessionStart {
  VideoMode mode = 1;
  // Populated iff `mode == VIDEO_MODE_RTSP_FORWARD`.
  string rtsp_url = 2;
}
// Video frame: one decoded image from frame_ingest, with `pixels`
// laid out according to `pix_fmt`. Emitted only when
// `mode == VIDEO_MODE_BYTES_INLINE`.
message VideoFrame {
  uint64 seq = 1;
  uint64 monotonic_ts_ns = 2;
  uint32 width = 3;
  uint32 height = 4;
  PixelFormat pix_fmt = 5;
  bytes pixels = 6;
}
// Server-streamed messages on SubscribeVideo. Exactly one start
// message is always sent first, followed by zero or more frames
// (bytes_inline mode only).
message VideoMessage {
  oneof kind {
    VideoSessionStart start = 1;
    VideoFrame frame = 2;
  }
}
service TelemetryStream {
  // Server-streaming subscribe. The client sends ONE SubscribeRequest;
  // the server pushes TelemetryMessage values until the client cancels
  // the stream or the server shuts down. The server applies per-
  // client drop-oldest back-pressure if the client cannot keep up.
  rpc Subscribe(SubscribeRequest) returns (stream TelemetryMessage);
  // AZ-676 operator video path. The first message on every stream is
  // a VideoSessionStart describing the configured delivery mode; in
  // rtsp_forward mode no further messages are sent until disconnect.
  // In bytes_inline mode the server forwards frames published by
  // frame_ingest with the same per-client drop-oldest back-pressure
  // as Subscribe (a slow operator loses frames on its own stream
  // without affecting other clients or the AI pipeline).
  rpc SubscribeVideo(SubscribeVideoRequest) returns (stream VideoMessage);
}
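
The per-client drop-oldest back-pressure described in the comments above can be sketched in a few lines of std-only Rust. This is an illustration under assumptions, not the server's implementation: DropOldestQueue and count_gaps are hypothetical names, and the sketch assumes sequence numbers are assigned before a message is enqueued, which is what makes a drop visible to the client as a gap in `sequence`.

```rust
use std::collections::VecDeque;

/// Bounded per-client queue that evicts the oldest entry when full.
/// (Hypothetical type; the real server may use a different structure.)
pub struct DropOldestQueue<T> {
    buf: VecDeque<T>,
    capacity: usize,
    dropped: u64,
}

impl<T> DropOldestQueue<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self {
            buf: VecDeque::with_capacity(capacity),
            capacity,
            dropped: 0,
        }
    }

    /// Enqueue an item; if the queue is full, discard the oldest one
    /// and bump the drop counter instead of blocking the producer.
    pub fn push(&mut self, item: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.dropped += 1;
        }
        self.buf.push_back(item);
    }

    /// Dequeue the oldest surviving item, if any.
    pub fn pop(&mut self) -> Option<T> {
        self.buf.pop_front()
    }

    /// Number of items evicted so far.
    pub fn dropped(&self) -> u64 {
        self.dropped
    }
}

/// Client side: count messages missing between consecutive sequence
/// numbers of one (client, topic) stream.
pub fn count_gaps(seqs: &[u64]) -> u64 {
    seqs.windows(2)
        .map(|w| w[1].saturating_sub(w[0]).saturating_sub(1))
        .sum()
}
```

Because the producer never blocks, a slow client only loses its own oldest messages. Note that count_gaps sees only gaps between received messages; drops that happen before the first message a client receives are visible only through the server-side counter.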