[AZ-676] [AZ-677] [AZ-678] [AZ-679] telemetry+operator foundation

Batch 15 ships the four foundation tickets sitting on top of AZ-675
(gRPC server) and AZ-667 (mapobjects_store hydrate):

* AZ-676: telemetry_stream video path (rtsp_forward + bytes_inline)
  with ai_locked atomic + session counter, SubscribeVideo RPC.
* AZ-677: MapObjects snapshot-on-subscribe + diff broadcast +
  reconnect-resync (StartThen stream-prepend pattern).
* AZ-678: HmacOperatorValidator with per-session monotonic seq,
  in-process session registry + TTL, constant-time HMAC compare,
  rejection-reason counters, sliding 60 s sig-failure red-health gate.
  Trait OperatorCommandValidator in shared::contracts::operator_auth.
* AZ-679: PoiSurfaceMapper produces OperatorPoiEvent per architecture
  §7.10; PoiDequeued events on rotate/age-out/complete; pushed via
  new TelemetrySink::push_operator_event extension on Topic::OperatorEvent.
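
The AZ-678 checks listed above (per-session monotonic seq, rejection reasons, sliding signature-failure window) can be sketched with the standard library alone. This is a minimal illustration, not the HmacOperatorValidator from this commit: `SeqValidator`, `Reject`, the `sig_ok` flag (standing in for the constant-time HMAC comparison), and the threshold of three failures are all assumptions, and session TTL is left out.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical rejection reasons; the real counter set may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Reject {
    UnknownSession,
    BadSignature,
    ReplayedSeq,
}

/// Hypothetical std-only skeleton of a command validator.
pub struct SeqValidator {
    /// Highest sequence number accepted so far, per session.
    last_seq: HashMap<String, u64>,
    /// Timestamps (seconds) of recent signature failures, oldest first.
    sig_failures: VecDeque<u64>,
    window_s: u64,
    red_threshold: usize,
}

impl SeqValidator {
    pub fn new(window_s: u64, red_threshold: usize) -> Self {
        Self {
            last_seq: HashMap::new(),
            sig_failures: VecDeque::new(),
            window_s,
            red_threshold,
        }
    }

    pub fn open_session(&mut self, session_id: &str) {
        self.last_seq.insert(session_id.to_string(), 0);
    }

    /// `sig_ok` models the outcome of the signature check (assumption).
    /// The signature is checked before the sequence number, so a forged
    /// command can never advance the stored sequence.
    pub fn check(
        &mut self,
        session_id: &str,
        seq: u64,
        sig_ok: bool,
        now_s: u64,
    ) -> Result<(), Reject> {
        let last = self
            .last_seq
            .get_mut(session_id)
            .ok_or(Reject::UnknownSession)?;
        if !sig_ok {
            self.sig_failures.push_back(now_s);
            return Err(Reject::BadSignature);
        }
        if seq <= *last {
            return Err(Reject::ReplayedSeq);
        }
        *last = seq;
        Ok(())
    }

    /// True when the failures inside the sliding window reach the threshold.
    pub fn is_red(&mut self, now_s: u64) -> bool {
        while let Some(&t) = self.sig_failures.front() {
            if now_s.saturating_sub(t) >= self.window_s {
                self.sig_failures.pop_front();
            } else {
                break;
            }
        }
        self.sig_failures.len() >= self.red_threshold
    }
}
```

With a 60 s window and a threshold of 3, three bad signatures at t = 10, 20 and 30 turn the gate red at t = 30, and it clears once the oldest failure ages out of the window.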

Cross-task wiring: TelemetrySink trait extended with
push_operator_event; OperatorBridge gets optional builder methods
with_telemetry_sink / with_validator (composition root wires in
AZ-680). Workspace deps: hmac = "0.12"; per-crate adds bytes,
serde_json, parking_lot, chrono, uuid, sha2, thiserror.

Tests: 14/14 ACs verified locally (4 + 3 + 5 + 3 by AC) plus
6 supporting unit tests + 7 integration tests + 2 shared serde
roundtrips. cargo clippy clean on touched crates. Cumulative
review for batches 13-15 produced; verdict PASS_WITH_WARNINGS
(0 Critical, 0 High, 1 Medium, 4 Low — all carry-overs or
deferred-producer notes for AZ-680/AZ-684).

Co-authored-by: Cursor <cursoragent@cursor.com>
Oleksandr Bezdieniezhnykh
2026-05-20 16:18:40 +03:00
parent 0eb09eec2d
commit ccf929af69
29 changed files with 3495 additions and 68 deletions
@@ -0,0 +1,178 @@
//! AZ-677 — MapObjectsBundle snapshot + in-flight diff stream.
//!
//! Pattern: every operator client that subscribes to
//! `Topic::MapObjectsBundle` first receives one
//! [`MapObjectsTopicMessage::Snapshot`] built from the configured
//! [`MapObjectsSnapshotSource`], and then receives
//! [`MapObjectsTopicMessage::Diff`] messages for every append the
//! composition root publishes via
//! [`crate::TelemetryStreamHandle::push_mapobjects_diff`]. On
//! reconnect, the client is treated as a fresh subscriber: it gets a
//! brand new snapshot — diffs that were broadcast during the gap are
//! NOT replayed (per AZ-677 spec — best-effort replay creates
//! consistency hazards).
//!
//! The snapshot source lives outside `telemetry_stream` (composition
//! root supplies an `Arc<dyn MapObjectsSnapshotSource>` that adapts
//! `mapobjects_store::MapObjectsStore::snapshot()`). The diff
//! publishing side is fed by the same composition root, which
//! subscribes to the store's append log and forwards each entry as
//! `push_mapobjects_diff(diff)`.
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use shared::models::mapobject::{IgnoredItem, MapObject, MapObjectObservation, MapObjectsBundle};
/// Wire shape of a diff message. Mirrors `data_model.md §MapObjectsDiff`
/// (added observations, moved observations, removed candidates, newly
/// ignored items). Empty vectors are valid — the publisher may emit a
/// diff with only one populated bucket.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MapObjectsDiff {
#[serde(default)]
pub added: Vec<MapObjectObservation>,
#[serde(default)]
pub moved: Vec<MapObjectObservation>,
#[serde(default)]
pub removed_candidates: Vec<MapObject>,
#[serde(default)]
pub ignored: Vec<IgnoredItem>,
}
/// Wire shape of the initial snapshot. Re-exposes the canonical
/// `MapObjectsBundle` payload — no transformation, just a tag so the
/// operator can tell snapshot from diff on the same topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MapObjectsBundleSnapshot {
pub bundle: MapObjectsBundle,
}
/// Tagged enum carried as the JSON payload on every
/// `Topic::MapObjectsBundle` message. The discriminator is
/// `"kind": "snapshot" | "diff"`, so the operator deserialises with
/// serde's internally tagged representation (`#[serde(tag = "kind")]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum MapObjectsTopicMessage {
Snapshot(MapObjectsBundleSnapshot),
Diff(MapObjectsDiff),
}
/// Provided by the composition root, implemented in
/// `mapobjects_store` (via a thin adapter). `telemetry_stream` queries
/// this on every fresh MapObjectsBundle subscribe.
///
/// Implementations MUST be cheap to call concurrently (read-only).
pub trait MapObjectsSnapshotSource: Send + Sync + 'static {
fn snapshot(&self) -> MapObjectsBundle;
}
/// Fixture impl for tests + the default "no store wired yet" mode.
/// Returns an empty bundle keyed to the supplied `mission_id`.
///
/// Production code MUST replace this with a real adapter; the empty
/// bundle is acceptable only for unit tests and for the case where
/// the composition root has not finished wiring (a green-field
/// startup race).
pub struct EmptyMapObjectsSource {
pub mission_id: String,
}
impl MapObjectsSnapshotSource for EmptyMapObjectsSource {
fn snapshot(&self) -> MapObjectsBundle {
use chrono::Utc;
use shared::models::mission::Coordinate;
let zero = Coordinate {
latitude: 0.0,
longitude: 0.0,
altitude_m: 0.0,
};
MapObjectsBundle {
schema_version: "1.0".to_string(),
mission_id: self.mission_id.clone(),
bbox: [zero, zero],
map_objects: Vec::new(),
observations: Vec::new(),
ignored_items: Vec::new(),
as_of: Utc::now(),
freshness: None,
}
}
}
/// Type-erased snapshot source — what `TelemetryStream` holds.
pub type SharedSnapshotSource = Arc<dyn MapObjectsSnapshotSource>;
#[cfg(test)]
mod tests {
use super::*;
use shared::models::mission::Coordinate;
#[test]
fn topic_message_serde_roundtrip_snapshot() {
// Arrange
let bundle = MapObjectsBundle {
schema_version: "1.0".to_string(),
mission_id: "m1".to_string(),
bbox: [
Coordinate {
latitude: 0.0,
longitude: 0.0,
altitude_m: 0.0,
},
Coordinate {
latitude: 1.0,
longitude: 1.0,
altitude_m: 0.0,
},
],
map_objects: vec![],
observations: vec![],
ignored_items: vec![],
as_of: chrono::Utc::now(),
freshness: None,
};
let msg = MapObjectsTopicMessage::Snapshot(MapObjectsBundleSnapshot { bundle });
// Act
let s = serde_json::to_string(&msg).unwrap();
let back: MapObjectsTopicMessage = serde_json::from_str(&s).unwrap();
// Assert
assert!(matches!(back, MapObjectsTopicMessage::Snapshot(_)));
assert!(s.contains("\"kind\":\"snapshot\""));
}
#[test]
fn topic_message_serde_roundtrip_diff() {
// Arrange
let msg = MapObjectsTopicMessage::Diff(MapObjectsDiff::default());
// Act
let s = serde_json::to_string(&msg).unwrap();
let back: MapObjectsTopicMessage = serde_json::from_str(&s).unwrap();
// Assert
assert!(matches!(back, MapObjectsTopicMessage::Diff(_)));
assert!(s.contains("\"kind\":\"diff\""));
}
#[test]
fn empty_source_returns_empty_bundle_with_mission_id() {
// Arrange
let src = EmptyMapObjectsSource {
mission_id: "m42".to_string(),
};
// Act
let b = src.snapshot();
// Assert
assert_eq!(b.mission_id, "m42");
assert!(b.map_objects.is_empty());
assert!(b.observations.is_empty());
assert!(b.ignored_items.is_empty());
}
}
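The snapshot-then-diff contract described in the module docs can be sketched from the subscriber's side. This is a minimal std-only illustration: `TopicMessage`, `ClientView`, and the bare `u32` object ids are hypothetical stand-ins, not the crate's `MapObjectsTopicMessage` or `MapObjectsBundle` types.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified stand-in for the snapshot/diff topic message.
#[derive(Debug, Clone)]
pub enum TopicMessage {
    /// Full state; replaces whatever the client held before.
    Snapshot(Vec<u32>),
    /// Incremental change relative to the last message the client applied.
    Diff { added: Vec<u32>, removed: Vec<u32> },
}

/// Hypothetical client-side view of the map objects.
#[derive(Debug, Default)]
pub struct ClientView {
    pub objects: BTreeSet<u32>,
    pub snapshots_seen: u64,
}

impl ClientView {
    /// Apply one message. A snapshot discards local state, so a reconnect
    /// (fresh subscribe, fresh snapshot) heals any diffs missed in the gap.
    pub fn apply(&mut self, msg: TopicMessage) {
        match msg {
            TopicMessage::Snapshot(all) => {
                self.objects = all.into_iter().collect();
                self.snapshots_seen += 1;
            }
            TopicMessage::Diff { added, removed } => {
                for id in removed {
                    self.objects.remove(&id);
                }
                self.objects.extend(added);
            }
        }
    }
}
```

Because the snapshot replaces the whole view, the client never needs to know which diffs it missed while disconnected; that is the property the no-replay rule relies on.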
@@ -1,5 +1,8 @@
//! Internal modules for `telemetry_stream`. Not part of the public API.
pub mod mapobjects;
pub mod proto;
pub mod publisher;
pub mod server;
pub mod video;
pub mod video_server;
@@ -17,6 +17,9 @@ use serde::Serialize;
use tokio::sync::broadcast;
use tracing::warn;
use crate::internal::mapobjects::{
MapObjectsBundleSnapshot, MapObjectsDiff, MapObjectsTopicMessage, SharedSnapshotSource,
};
use crate::internal::proto::{TelemetryMessage, Topic};
/// Per-topic broadcast capacity. A client falling more than this many
@@ -34,6 +37,7 @@ pub const ALL_TOPICS: &[Topic] = &[
Topic::DetectionEvent,
Topic::MovementCandidate,
Topic::MapObjectsBundle,
Topic::OperatorEvent,
];
/// Errors returned by [`TelemetryPublisher::publish`]. Publish never
@@ -96,6 +100,20 @@ pub struct TelemetryPublisher {
topics: HashMap<Topic, TopicChannel>,
drops: DropMap,
subscribed_clients: AtomicUsize,
/// AZ-677 — composition-root-supplied snapshot source. Read on
/// every fresh MapObjectsBundle subscribe.
snapshot_source: Mutex<Option<SharedSnapshotSource>>,
/// AZ-677 — `mapobjects_resnap_count` counter. Incremented every
/// time the subscribe handler emits a snapshot (new client OR
/// reconnecting client).
mapobjects_resnap_count: AtomicU64,
/// AZ-677 — `mapobjects_diff_count` counter. Incremented every
/// time `publish_mapobjects_diff` is called.
mapobjects_diff_count: AtomicU64,
/// AZ-677 — size in bytes of the most recently serialised
/// snapshot. Updated by `current_snapshot_message()` so the
/// health surface can report bundle weight without re-serialising.
last_snapshot_bytes: AtomicU64,
}
impl TelemetryPublisher {
@@ -111,9 +129,74 @@ impl TelemetryPublisher {
topics,
drops: Mutex::new(HashMap::new()),
subscribed_clients: AtomicUsize::new(0),
snapshot_source: Mutex::new(None),
mapobjects_resnap_count: AtomicU64::new(0),
mapobjects_diff_count: AtomicU64::new(0),
last_snapshot_bytes: AtomicU64::new(0),
})
}
/// Composition-root entry point. Wires the
/// `MapObjectsSnapshotSource` (typically an adapter over
/// `mapobjects_store::MapObjectsStore`). Replacing an existing
/// source is allowed (test fixtures use this).
pub fn set_snapshot_source(&self, src: SharedSnapshotSource) {
*self.snapshot_source.lock() = Some(src);
}
/// AZ-677 — build the snapshot message the subscribe handler must
/// emit before forwarding any diff. Returns `None` when no
/// snapshot source has been wired yet (an empty store is the
/// natural cold-start state), when serialising the snapshot fails,
/// or when the topic has no channel; the subscribe handler then
/// proceeds straight to the diff broadcast.
pub(crate) fn current_snapshot_message(&self) -> Option<TelemetryMessage> {
let snap_src = self.snapshot_source.lock().as_ref().map(Arc::clone)?;
let bundle = snap_src.snapshot();
let payload = MapObjectsTopicMessage::Snapshot(MapObjectsBundleSnapshot { bundle });
let bytes = match serde_json::to_vec(&payload) {
Ok(b) => b,
Err(e) => {
warn!(error = %e, "mapobjects snapshot serialise failed; skipping");
return None;
}
};
self.last_snapshot_bytes
.store(bytes.len() as u64, Ordering::Relaxed);
self.mapobjects_resnap_count.fetch_add(1, Ordering::Relaxed);
let topic = Topic::MapObjectsBundle;
let channel = self.topics.get(&topic)?;
let seq = channel.seq.fetch_add(1, Ordering::Relaxed) + 1;
Some(TelemetryMessage {
topic: topic as i32,
monotonic_ts_ns: shared::clock::MonoClock::new().elapsed_ns(),
sequence: seq,
payload_json: bytes,
})
}
/// AZ-677 — broadcast a MapObjectsDiff to every active operator
/// subscriber that has the MapObjectsBundle topic in their
/// subscription set. The composition root calls this whenever
/// `mapobjects_store` appends an observation / ignored item.
///
/// Diffs flow through the existing `Topic::MapObjectsBundle`
/// broadcast channel — discriminated from snapshots by the
/// `"kind": "diff"` tag on the JSON payload.
pub fn publish_mapobjects_diff(&self, diff: MapObjectsDiff) -> Result<(), PublishError> {
let payload = MapObjectsTopicMessage::Diff(diff);
self.publish(Topic::MapObjectsBundle, &payload)?;
self.mapobjects_diff_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
pub fn mapobjects_counters(&self) -> (u64, u64, u64) {
(
self.mapobjects_resnap_count.load(Ordering::Relaxed),
self.mapobjects_diff_count.load(Ordering::Relaxed),
self.last_snapshot_bytes.load(Ordering::Relaxed),
)
}
pub fn default_capacity() -> Arc<Self> {
Self::new(DEFAULT_TOPIC_CAPACITY)
}
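The drop-oldest back-pressure that the per-topic capacity governs can be illustrated without an async runtime. The sketch below is a std-only approximation of the behaviour a lagging `tokio::sync::broadcast` receiver observes; `BoundedQueue` and `Recv` are hypothetical names, and a single consumer is assumed.

```rust
use std::collections::VecDeque;

/// Hypothetical single-consumer queue with drop-oldest semantics.
pub struct BoundedQueue<T> {
    buf: VecDeque<T>,
    capacity: usize,
    /// Items evicted since the consumer last observed a lag.
    pending_lag: u64,
}

/// What the consumer sees on each receive attempt.
#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Item(T),
    /// The consumer fell behind; this many items were dropped.
    Lagged(u64),
    Empty,
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self {
            buf: VecDeque::with_capacity(capacity),
            capacity,
            pending_lag: 0,
        }
    }

    /// Never blocks: when full, the oldest item is evicted to make room.
    pub fn push(&mut self, item: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.pending_lag += 1;
        }
        self.buf.push_back(item);
    }

    /// Report any lag first, then hand out items oldest-first.
    pub fn recv(&mut self) -> Recv<T> {
        if self.pending_lag > 0 {
            return Recv::Lagged(std::mem::take(&mut self.pending_lag));
        }
        match self.buf.pop_front() {
            Some(item) => Recv::Item(item),
            None => Recv::Empty,
        }
    }
}
```

The producer side never waits on the consumer, which is why a slow operator client costs dropped messages (counted via the lag value) rather than stalling the publisher.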
+60 -4
@@ -23,16 +23,22 @@ use tonic::{Request, Response, Status};
use tracing::{info, warn};
use crate::internal::proto::telemetry_stream_server::TelemetryStream;
use crate::internal::proto::{SubscribeRequest, TelemetryMessage, Topic};
use crate::internal::proto::{SubscribeRequest, SubscribeVideoRequest, TelemetryMessage, Topic};
use crate::internal::publisher::{TelemetryPublisher, ALL_TOPICS};
use crate::internal::video::VideoPublisher;
use crate::internal::video_server::{VideoService, VideoStream};
pub struct TelemetryService {
publisher: Arc<TelemetryPublisher>,
video: Arc<VideoService>,
}
impl TelemetryService {
pub fn new(publisher: Arc<TelemetryPublisher>) -> Self {
Self { publisher }
pub fn new(publisher: Arc<TelemetryPublisher>, video_publisher: Arc<VideoPublisher>) -> Self {
Self {
publisher,
video: Arc::new(VideoService::new(video_publisher)),
}
}
}
@@ -41,6 +47,7 @@ type SubscribeStream = Pin<Box<dyn Stream<Item = Result<TelemetryMessage, Status
#[tonic::async_trait]
impl TelemetryStream for TelemetryService {
type SubscribeStream = SubscribeStream;
type SubscribeVideoStream = VideoStream;
async fn subscribe(
&self,
@@ -84,9 +91,24 @@ impl TelemetryStream for TelemetryService {
self.publisher.register_client();
info!(client_id = %client_id, topics = ?requested, "telemetry subscribe");
// AZ-677 — if the client asked for MapObjectsBundle (either
// explicitly or via the default "all topics" path), capture
// the current snapshot now so the per-client stream emits it
// before any diff. The snapshot is computed exactly once per
// subscribe (a reconnect = a fresh subscribe → fresh snapshot,
// diffs that flew during the gap are NOT replayed).
let mapobjects_snapshot = if requested
.iter()
.any(|t| matches!(t, Topic::MapObjectsBundle))
{
self.publisher.current_snapshot_message()
} else {
None
};
let publisher = Arc::clone(&self.publisher);
let cid = client_id.clone();
let stream = map.filter_map(move |(topic, item)| match item {
let body = map.filter_map(move |(topic, item)| match item {
Ok(msg) => Some(Ok(msg)),
Err(BroadcastStreamRecvError::Lagged(n)) => {
warn!(client_id = %cid, ?topic, dropped = n, "slow client lagged");
@@ -95,6 +117,11 @@ impl TelemetryStream for TelemetryService {
}
});
let stream = StartThen {
start: mapobjects_snapshot.map(Ok),
body,
};
let stream = StreamGuard {
inner: stream,
publisher: Arc::clone(&self.publisher),
@@ -102,6 +129,35 @@ impl TelemetryStream for TelemetryService {
Ok(Response::new(Box::pin(stream) as Self::SubscribeStream))
}
async fn subscribe_video(
&self,
request: Request<SubscribeVideoRequest>,
) -> Result<Response<Self::SubscribeVideoStream>, Status> {
self.video.handle_subscribe(request).await
}
}
/// AZ-677 — emit `start` once (the MapObjects snapshot), then yield
/// everything from `body`. When `start` is `None` the stream
/// degenerates to `body` with zero overhead.
struct StartThen<S> {
start: Option<Result<TelemetryMessage, Status>>,
body: S,
}
impl<S> Stream for StartThen<S>
where
S: Stream<Item = Result<TelemetryMessage, Status>> + Send + Unpin,
{
type Item = Result<TelemetryMessage, Status>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(msg) = self.start.take() {
return Poll::Ready(Some(msg));
}
Pin::new(&mut self.body).poll_next(cx)
}
}
/// Decrement `subscribed_clients` when the per-client outbound
@@ -0,0 +1,365 @@
//! AZ-676 — operator video path.
//!
//! Two delivery modes selected at startup via [`VideoPath`]:
//! - `RtspForward { url }`: the autopilot tells the operator which
//! RTSP URL the camera is publishing on; bytes never traverse this
//! gRPC stream. This is the recommended default (lower onboard
//! cost, no per-frame copy).
//! - `BytesInline`: the operator pulls encoded frames over the
//! `SubscribeVideo` stream. `frame_ingest` publishes each decoded
//! frame here via [`VideoPublisher::publish_frame`]; the per-client
//! stream applies drop-oldest back-pressure identical to the
//! structured `Subscribe` path so a slow operator never blocks
//! `frame_ingest`.
//!
//! ## ai_locked coordination
//!
//! [`VideoPublisher`] owns an `Arc<AtomicBool>` exposed via
//! [`VideoPublisher::ai_locked_handle`]. The atomic is shared with
//! `frame_ingest` and `detection_client` (composition root wires it
//! into their constructors). The atomic flips:
//! - `false → true` when the first operator subscribes to
//! `SubscribeVideo` (first session join).
//! - `true → false` when the last operator disconnects (last session
//! leave).
//!
//! In `RtspForward` mode the same toggle applies — even though we
//! emit only the URL, the operator is consuming the video path and
//! AI must back off the frame budget.
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::broadcast;
use tracing::warn;
use shared::models::frame::{Frame, PixelFormat};
/// Server-side per-client outbound broadcast capacity for the
/// bytes_inline frame channel. Frames are large (full-resolution
/// pixel buffers) so the budget is smaller than the structured-topic
/// publisher: ≥1 second of headroom at 30 fps is enough for transient
/// modem stalls without ballooning memory.
pub const DEFAULT_VIDEO_CAPACITY: usize = 32;
/// Selected at startup. The autopilot's `config.video_path` resolves
/// to one of these.
#[derive(Debug, Clone)]
pub enum VideoPath {
/// Emit the configured RTSP URL on session-start; no bytes flow
/// through this gRPC stream. Operator stacks pull RTSP directly
/// from the camera (most common).
RtspForward { url: String },
/// Carry encoded bytes over the gRPC stream. Used when the
/// operator cannot reach the camera's RTSP source directly.
BytesInline,
}
impl Default for VideoPath {
fn default() -> Self {
// The architecture default is rtsp_forward with an empty URL
// placeholder; the composition root must set the real URL
// before binding the server. We choose a sentinel URL so a
// misconfigured deployment surfaces in the operator session-
// start message rather than silently mis-pointing.
Self::RtspForward {
url: "rtsp://unconfigured.invalid/stream".to_string(),
}
}
}
impl VideoPath {
pub fn mode_label(&self) -> &'static str {
match self {
Self::RtspForward { .. } => "rtsp_forward",
Self::BytesInline => "bytes_inline",
}
}
}
/// Wire-shaped video frame. We carry exactly what
/// `shared::models::frame::Frame` carries, minus the `ai_locked`
/// flag (it's a control signal, not a per-frame property the
/// operator needs).
///
/// Pixels are cloned (`Bytes::clone` is a reference-count bump, O(1)) into the
/// broadcast channel; downstream the gRPC encode path turns them
/// into the proto `VideoFrame` message.
#[derive(Debug, Clone)]
pub struct VideoFrameMessage {
pub seq: u64,
pub monotonic_ts_ns: u64,
pub width: u32,
pub height: u32,
pub pix_fmt: PixelFormat,
pub pixels: bytes::Bytes,
}
impl From<&Frame> for VideoFrameMessage {
fn from(f: &Frame) -> Self {
Self {
seq: f.seq,
monotonic_ts_ns: f.decode_ts_monotonic_ns,
width: f.width,
height: f.height,
pix_fmt: f.pix_fmt,
pixels: (*f.pixels).clone(),
}
}
}
/// Snapshot of video-path health for the
/// [`crate::TelemetryStreamHandle::health`] surface.
#[derive(Debug, Clone)]
pub struct VideoSnapshot {
pub mode: &'static str,
pub ai_locked: bool,
pub video_session_count: usize,
pub published_frames: u64,
pub bytes_inline_drops_total: u64,
}
pub struct VideoPublisher {
path: VideoPath,
/// Per-client broadcast for bytes_inline mode. Allocated even in
/// rtsp_forward mode so [`publish_frame`] is a cheap no-op (no
/// branch on the hot path beyond the mode check). Subscriber
/// count drives the per-client send.
tx: broadcast::Sender<VideoFrameMessage>,
ai_locked: Arc<AtomicBool>,
/// Live operator subscribers to `SubscribeVideo`. The atomic flip
/// is keyed off the transition through zero in either direction.
video_session_count: Arc<AtomicUsize>,
/// Aggregate per-client drops on the video broadcast. Equivalent
/// to `bytes_inline_drops_total` in the AZ-676 health surface.
bytes_inline_drops: Arc<AtomicU64>,
/// Count of frames handed to the broadcast channel. Only
/// bytes_inline mode increments it; in rtsp_forward it stays 0
/// because `publish_frame` returns early.
published_frames: AtomicU64,
drops_per_client: Mutex<std::collections::HashMap<String, AtomicU64>>,
}
impl VideoPublisher {
pub fn new(path: VideoPath, capacity: usize) -> Arc<Self> {
let (tx, _) = broadcast::channel(capacity);
Arc::new(Self {
path,
tx,
ai_locked: Arc::new(AtomicBool::new(false)),
video_session_count: Arc::new(AtomicUsize::new(0)),
bytes_inline_drops: Arc::new(AtomicU64::new(0)),
published_frames: AtomicU64::new(0),
drops_per_client: Mutex::new(std::collections::HashMap::new()),
})
}
pub fn default_capacity(path: VideoPath) -> Arc<Self> {
Self::new(path, DEFAULT_VIDEO_CAPACITY)
}
/// Shared `Arc<AtomicBool>` that sibling components (`frame_ingest`,
/// `detection_client`) read at decode/inference time. The atomic
/// is owned by `telemetry_stream`; siblings only read.
pub fn ai_locked_handle(&self) -> Arc<AtomicBool> {
Arc::clone(&self.ai_locked)
}
pub fn mode(&self) -> &VideoPath {
&self.path
}
/// Publish one decoded frame. In rtsp_forward mode this is a
/// no-op (the operator never pulls bytes through this server);
/// the call exists so `frame_ingest` can always invoke
/// `TelemetrySink::push_frame` regardless of configuration.
pub fn publish_frame(&self, frame: &Frame) {
if matches!(self.path, VideoPath::RtspForward { .. }) {
return;
}
let msg = VideoFrameMessage::from(frame);
// `broadcast::send` returns the number of receivers it
// queued for; Err means no receivers, which is fine and
// expected (no operator subscribed).
let _ = self.tx.send(msg);
self.published_frames.fetch_add(1, Ordering::Relaxed);
}
pub(crate) fn subscribe_video(&self) -> broadcast::Receiver<VideoFrameMessage> {
self.tx.subscribe()
}
/// Called by the gRPC `SubscribeVideo` handler when a new client
/// joins. Returns the post-join session count. The first joiner
/// (transition 0 → 1) flips `ai_locked` to `true`.
pub(crate) fn register_session(&self) -> usize {
let prev = self.video_session_count.fetch_add(1, Ordering::AcqRel);
if prev == 0 {
self.ai_locked.store(true, Ordering::Release);
}
prev + 1
}
/// Called by the gRPC handler (via `Drop` on the per-client
/// guard) when a client disconnects. The last leaver (transition
/// 1 → 0) flips `ai_locked` back to `false`.
pub(crate) fn deregister_session(&self) -> usize {
let prev = self.video_session_count.fetch_sub(1, Ordering::AcqRel);
if prev == 1 {
self.ai_locked.store(false, Ordering::Release);
} else if prev == 0 {
// Defensive: should never underflow because every
// deregister is paired with a register. Log loudly so we
// catch wiring mistakes early.
warn!("video_session_count underflow — register/deregister mismatch");
self.video_session_count.store(0, Ordering::Release);
}
prev.saturating_sub(1)
}
pub fn record_drops(&self, client_id: &str, n: u64) {
if n == 0 {
return;
}
self.bytes_inline_drops.fetch_add(n, Ordering::Relaxed);
let mut map = self.drops_per_client.lock();
map.entry(client_id.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(n, Ordering::Relaxed);
}
pub fn snapshot(&self) -> VideoSnapshot {
VideoSnapshot {
mode: self.path.mode_label(),
ai_locked: self.ai_locked.load(Ordering::Acquire),
video_session_count: self.video_session_count.load(Ordering::Acquire),
published_frames: self.published_frames.load(Ordering::Relaxed),
bytes_inline_drops_total: self.bytes_inline_drops.load(Ordering::Relaxed),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::Ordering;
fn frame(seq: u64, ai_locked: bool) -> Frame {
Frame {
seq,
capture_ts_monotonic_ns: seq,
decode_ts_monotonic_ns: seq + 1,
pixels: Arc::new(bytes::Bytes::from(vec![0u8; 16])),
width: 4,
height: 4,
pix_fmt: PixelFormat::Nv12,
ai_locked,
}
}
#[test]
fn rtsp_forward_publish_frame_is_a_no_op() {
// Arrange
let pubv = VideoPublisher::default_capacity(VideoPath::RtspForward {
url: "rtsp://x/y".to_string(),
});
// Act
pubv.publish_frame(&frame(1, false));
pubv.publish_frame(&frame(2, false));
// Assert
let snap = pubv.snapshot();
assert_eq!(snap.published_frames, 0);
assert_eq!(snap.mode, "rtsp_forward");
}
#[test]
fn bytes_inline_publish_frame_counts_and_fans_out() {
// Arrange
let pubv = VideoPublisher::default_capacity(VideoPath::BytesInline);
let mut rx = pubv.subscribe_video();
// Act
pubv.publish_frame(&frame(1, false));
pubv.publish_frame(&frame(2, false));
// Assert
let snap = pubv.snapshot();
assert_eq!(snap.published_frames, 2);
assert_eq!(snap.mode, "bytes_inline");
assert_eq!(rx.try_recv().unwrap().seq, 1);
assert_eq!(rx.try_recv().unwrap().seq, 2);
}
#[test]
fn register_first_session_flips_ai_locked_true() {
// Arrange
let pubv = VideoPublisher::default_capacity(VideoPath::BytesInline);
let flag = pubv.ai_locked_handle();
assert!(!flag.load(Ordering::Acquire));
// Act
let n = pubv.register_session();
// Assert
assert_eq!(n, 1);
assert!(flag.load(Ordering::Acquire));
assert_eq!(pubv.snapshot().video_session_count, 1);
}
#[test]
fn deregister_last_session_flips_ai_locked_false() {
// Arrange
let pubv = VideoPublisher::default_capacity(VideoPath::BytesInline);
let flag = pubv.ai_locked_handle();
pubv.register_session();
pubv.register_session();
assert!(flag.load(Ordering::Acquire));
assert_eq!(pubv.snapshot().video_session_count, 2);
// Act 1 — one session leaves; flag must still be true.
let after_first_leave = pubv.deregister_session();
assert_eq!(after_first_leave, 1);
assert!(
flag.load(Ordering::Acquire),
"one session left → still locked"
);
// Act 2 — last session leaves; flag must flip to false.
let after_second_leave = pubv.deregister_session();
assert_eq!(after_second_leave, 0);
assert!(
!flag.load(Ordering::Acquire),
"last session left → unlocked"
);
}
#[test]
fn record_drops_aggregates_and_per_client() {
// Arrange
let pubv = VideoPublisher::default_capacity(VideoPath::BytesInline);
// Act
pubv.record_drops("op_a", 5);
pubv.record_drops("op_a", 2);
pubv.record_drops("op_b", 3);
// Assert
assert_eq!(pubv.snapshot().bytes_inline_drops_total, 10);
}
#[test]
fn mode_label_matches_task_spec_strings() {
// The AZ-676 task spec calls these out as the operator-facing
// mode strings; pin them as a regression guard.
assert_eq!(VideoPath::BytesInline.mode_label(), "bytes_inline");
assert_eq!(
VideoPath::RtspForward {
url: "rtsp://x".into()
}
.mode_label(),
"rtsp_forward"
);
}
}
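The 0 → 1 / 1 → 0 transition logic behind `ai_locked` can also be written so that an unpaired leave cannot wrap the counter. The following is a sketch under assumptions, not the code in this commit: `SessionGate`, `join`, and `leave` are hypothetical names, and the variant uses `AtomicUsize::fetch_update` with `checked_sub` instead of `fetch_sub` plus a corrective store.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the session-count / `ai_locked` pair.
pub struct SessionGate {
    sessions: AtomicUsize,
    locked: Arc<AtomicBool>,
}

impl SessionGate {
    pub fn new() -> Self {
        Self {
            sessions: AtomicUsize::new(0),
            locked: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Read-only handle for sibling components.
    pub fn locked_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.locked)
    }

    /// Returns the post-join count; the 0 -> 1 transition sets the flag.
    pub fn join(&self) -> usize {
        let prev = self.sessions.fetch_add(1, Ordering::AcqRel);
        if prev == 0 {
            self.locked.store(true, Ordering::Release);
        }
        prev + 1
    }

    /// Returns the post-leave count, or `None` if no session was registered.
    /// `checked_sub` makes `fetch_update` leave the counter untouched at
    /// zero, so an unpaired leave cannot wrap it around.
    pub fn leave(&self) -> Option<usize> {
        let prev = self
            .sessions
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
            .ok()?;
        if prev == 1 {
            self.locked.store(false, Ordering::Release);
        }
        Some(prev - 1)
    }
}
```

The counter update and the flag store remain two separate atomic operations here, so this sketch does not serialise a join that races with the last leave; closing that window would need a lock or a single combined atomic word.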
@@ -0,0 +1,167 @@
//! AZ-676 — `SubscribeVideo` RPC handler.
//!
//! Each accepted stream:
//! 1. Registers the session (increments `video_session_count`; flips
//! `ai_locked` to `true` on the 0 → 1 transition).
//! 2. Emits exactly one `VideoSessionStart` describing the configured
//! delivery mode (`rtsp_forward { rtsp_url }` or `bytes_inline`).
//! 3. In `bytes_inline` mode, forwards `VideoFrameMessage`s from the
//! publisher's broadcast channel as `VideoFrame` proto messages.
//! Lagged broadcast → drop accounting (per AZ-676 spec; the
//! `bytes_inline_drops_total` counter on the health surface).
//! 4. On stream drop, deregisters the session (decrements counter;
//! flips `ai_locked` to `false` on the 1 → 0 transition).
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status};
use tracing::{info, warn};
use crate::internal::proto::{
video_message, PixelFormat as ProtoPixelFormat, SubscribeVideoRequest, VideoFrame,
VideoMessage, VideoMode, VideoSessionStart,
};
use crate::internal::video::{VideoFrameMessage, VideoPath, VideoPublisher};
use shared::models::frame::PixelFormat as SharedPixelFormat;
pub type VideoStream = Pin<Box<dyn Stream<Item = Result<VideoMessage, Status>> + Send>>;
pub struct VideoService {
publisher: Arc<VideoPublisher>,
}
impl VideoService {
pub fn new(publisher: Arc<VideoPublisher>) -> Self {
Self { publisher }
}
pub async fn handle_subscribe(
&self,
request: Request<SubscribeVideoRequest>,
) -> Result<Response<VideoStream>, Status> {
let req = request.into_inner();
if req.client_id.trim().is_empty() {
return Err(Status::invalid_argument("client_id is required"));
}
let client_id = req.client_id.clone();
let session_n = self.publisher.register_session();
info!(client_id = %client_id, session_n, mode = self.publisher.mode().mode_label(), "video subscribe");
let start_msg = match self.publisher.mode() {
VideoPath::RtspForward { url } => VideoMessage {
kind: Some(video_message::Kind::Start(VideoSessionStart {
mode: VideoMode::RtspForward as i32,
rtsp_url: url.clone(),
})),
},
VideoPath::BytesInline => VideoMessage {
kind: Some(video_message::Kind::Start(VideoSessionStart {
mode: VideoMode::BytesInline as i32,
rtsp_url: String::new(),
})),
},
};
// Build the body stream: in bytes_inline mode, forward frames
// from the broadcast. In rtsp_forward mode the body is empty
// (operator keeps the stream open just to hold the ai_locked
// session; we hand it `pending` so it sits idle until the
// client cancels).
let publisher = Arc::clone(&self.publisher);
let cid = client_id.clone();
let body: VideoStream = match self.publisher.mode() {
VideoPath::RtspForward { .. } => Box::pin(tokio_stream::pending()),
VideoPath::BytesInline => {
let rx = self.publisher.subscribe_video();
let mapped = BroadcastStream::new(rx).filter_map(move |item| match item {
Ok(f) => Some(Ok(VideoMessage {
kind: Some(video_message::Kind::Frame(to_proto_frame(&f))),
})),
Err(BroadcastStreamRecvError::Lagged(n)) => {
warn!(client_id = %cid, dropped = n, "video client lagged");
publisher.record_drops(&cid, n);
None
}
});
Box::pin(mapped)
}
};
let stream = StartThen {
start: Some(Ok(start_msg)),
body,
};
let guarded = VideoStreamGuard {
inner: stream,
publisher: Arc::clone(&self.publisher),
};
Ok(Response::new(Box::pin(guarded) as VideoStream))
}
}
fn to_proto_frame(f: &VideoFrameMessage) -> VideoFrame {
let pix = match f.pix_fmt {
SharedPixelFormat::Nv12 => ProtoPixelFormat::Nv12,
SharedPixelFormat::Yuv420p => ProtoPixelFormat::Yuv420p,
SharedPixelFormat::Rgb24 => ProtoPixelFormat::Rgb24,
};
VideoFrame {
seq: f.seq,
monotonic_ts_ns: f.monotonic_ts_ns,
width: f.width,
height: f.height,
pix_fmt: pix as i32,
pixels: f.pixels.to_vec(),
}
}
/// Emit `start` once, then yield everything from `body`. A small
/// hand-written alternative to `stream::once(...).chain(body)` that
/// avoids nesting extra combinator types for a single message.
struct StartThen<S> {
start: Option<Result<VideoMessage, Status>>,
body: S,
}
impl<S> Stream for StartThen<S>
where
S: Stream<Item = Result<VideoMessage, Status>> + Send + Unpin,
{
type Item = Result<VideoMessage, Status>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(msg) = self.start.take() {
return Poll::Ready(Some(msg));
}
Pin::new(&mut self.body).poll_next(cx)
}
}
/// Deregister the video session when the per-client outbound stream
/// drops. This flips `ai_locked` back to `false` on the last leaver.
struct VideoStreamGuard<S> {
inner: S,
publisher: Arc<VideoPublisher>,
}
impl<S: Stream + Unpin> Stream for VideoStreamGuard<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
impl<S> Drop for VideoStreamGuard<S> {
fn drop(&mut self) {
self.publisher.deregister_session();
}
}
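The start-then-body shape used by `StartThen` is easier to see on a synchronous iterator. The sketch below is an analogue only: it implements `std::iter::Iterator` rather than the async `Stream` trait used in this file, and the `new` constructor is an addition for the example.

```rust
/// Hypothetical iterator analogue of the `StartThen` stream adapter.
pub struct StartThen<I: Iterator> {
    start: Option<I::Item>,
    body: I,
}

impl<I: Iterator> StartThen<I> {
    pub fn new(start: Option<I::Item>, body: I) -> Self {
        Self { start, body }
    }
}

impl<I: Iterator> Iterator for StartThen<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // `take()` leaves `None` behind, so the start item is yielded once.
        if let Some(first) = self.start.take() {
            return Some(first);
        }
        // Afterwards every call is forwarded straight to the body.
        self.body.next()
    }
}
```

When `start` is `None` the adapter forwards to `body` from the first call, which mirrors how the subscribe path behaves when no snapshot source is wired.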
+175 -15
@@ -1,18 +1,22 @@
//! `telemetry_stream` — always-on uplink to the Ground Station + operator-command downlink.
//!
//! Real implementations:
//! - **AZ-675 (this crate, this batch)**: Tonic gRPC server, per-client
//! bounded queue, drop-oldest back-pressure, drop counters. Topics:
//! - **AZ-675**: Tonic gRPC server, per-client bounded queue,
//! drop-oldest back-pressure, drop counters. Topics:
//! `TelemetrySample`, `GimbalState`, `DetectionEvent`,
//! `MovementCandidate`, `MapObjectsBundle`.
//! - **AZ-676**: video frame topic (separate RPC, server-streamed
//! binary payloads).
//! - **AZ-677**: diff-based snapshot emission for `MapObjectsBundle`.
//! - **AZ-676** (this crate, this batch): operator video path — two
//! modes (`RtspForward { url }`, `BytesInline`) plus shared
//! `ai_locked` atomic flipped by SubscribeVideo session counter.
//! - **AZ-677** (this crate, this batch): MapObjectsBundle snapshot
//! on subscribe + diff stream while connected + fresh snapshot on
//! reconnect (no diff replay).
//! - **AZ-678+**: command-auth on the return path (operator_bridge).
pub mod internal;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use async_trait::async_trait;
@@ -26,19 +30,28 @@ use shared::health::{ComponentHealth, HealthLevel};
use shared::models::detection::DetectionBatch;
use shared::models::frame::Frame;
use shared::models::operator::OperatorCommand;
use shared::models::operator_event::OperatorEvent;
use crate::internal::mapobjects::{MapObjectsDiff, SharedSnapshotSource};
use crate::internal::proto::telemetry_stream_server::TelemetryStreamServer;
use crate::internal::proto::Topic;
use crate::internal::publisher::{TelemetryPublisher, DEFAULT_TOPIC_CAPACITY};
use crate::internal::server::TelemetryService;
use crate::internal::video::{VideoPath, VideoPublisher, DEFAULT_VIDEO_CAPACITY};
pub use crate::internal::mapobjects::{
EmptyMapObjectsSource, MapObjectsBundleSnapshot, MapObjectsSnapshotSource,
MapObjectsTopicMessage,
};
pub use crate::internal::proto::{
telemetry_stream_client::TelemetryStreamClient, SubscribeRequest, TelemetryMessage,
Topic as TelemetryTopic,
telemetry_stream_client::TelemetryStreamClient, video_message, SubscribeRequest,
SubscribeVideoRequest, TelemetryMessage, Topic as TelemetryTopic, VideoFrame, VideoMessage,
VideoMode, VideoSessionStart,
};
pub use crate::internal::publisher::{
PerTopicCounters, PublishError, PublisherSnapshot, ALL_TOPICS,
};
pub use crate::internal::video::{VideoSnapshot, DEFAULT_VIDEO_CAPACITY as VIDEO_DEFAULT_CAPACITY};
const NAME: &str = "telemetry_stream";
@@ -56,6 +69,10 @@ pub struct TelemetryStreamConfig {
/// Bounded capacity of the downlink command channel that feeds
/// `operator_bridge`.
pub downlink_capacity: usize,
/// AZ-676 — video delivery mode + per-client video broadcast
/// capacity.
pub video_path: VideoPath,
pub video_capacity: usize,
}
impl Default for TelemetryStreamConfig {
@@ -64,12 +81,15 @@ impl Default for TelemetryStreamConfig {
listen_addr: "0.0.0.0:50061".parse().expect("hardcoded addr parses"),
topic_capacity: DEFAULT_TOPIC_CAPACITY,
downlink_capacity: 64,
video_path: VideoPath::default(),
video_capacity: DEFAULT_VIDEO_CAPACITY,
}
}
}
pub struct TelemetryStream {
publisher: Arc<TelemetryPublisher>,
video: Arc<VideoPublisher>,
commands_tx: mpsc::Sender<OperatorCommand>,
commands_rx: Option<mpsc::Receiver<OperatorCommand>>,
config: TelemetryStreamConfig,
@@ -85,9 +105,11 @@ impl TelemetryStream {
pub fn with_config(config: TelemetryStreamConfig) -> Self {
let publisher = TelemetryPublisher::new(config.topic_capacity);
let video = VideoPublisher::new(config.video_path.clone(), config.video_capacity);
let (commands_tx, commands_rx) = mpsc::channel(config.downlink_capacity);
Self {
publisher,
video,
commands_tx,
commands_rx: Some(commands_rx),
config,
@@ -97,10 +119,25 @@ impl TelemetryStream {
pub fn handle(&self) -> TelemetryStreamHandle {
TelemetryStreamHandle {
publisher: Arc::clone(&self.publisher),
video: Arc::clone(&self.video),
commands_tx: self.commands_tx.clone(),
}
}
/// AZ-676 — handle on the shared `ai_locked` atomic.
/// `frame_ingest` and `detection_client` read this flag at decode and
/// inference time, respectively. The composition root must call this and
/// pass the returned handle to both constructors.
pub fn ai_locked_handle(&self) -> Arc<AtomicBool> {
self.video.ai_locked_handle()
}
/// AZ-677 — wire the snapshot source. The composition root passes
/// an adapter over `mapobjects_store::MapObjectsStore::snapshot()`.
pub fn set_mapobjects_snapshot_source(&self, src: SharedSnapshotSource) {
self.publisher.set_snapshot_source(src);
}
/// Take the downlink command receiver. The composition root
/// forwards it to `operator_bridge` as `Receiver<OperatorCommand>`.
pub fn take_command_receiver(&mut self) -> Option<mpsc::Receiver<OperatorCommand>> {
@@ -118,9 +155,10 @@ impl TelemetryStream {
)> {
let listen_addr = self.config.listen_addr;
let publisher = Arc::clone(&self.publisher);
let video = Arc::clone(&self.video);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let svc = TelemetryStreamServer::new(TelemetryService::new(publisher));
let svc = TelemetryStreamServer::new(TelemetryService::new(publisher, video));
let join = tokio::spawn(async move {
Server::builder()
.add_service(svc)
@@ -156,8 +194,9 @@ impl TelemetryStream {
let stream = tokio_stream::wrappers::TcpListenerStream::new(tokio_listener);
let publisher = Arc::clone(&self.publisher);
let video = Arc::clone(&self.video);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let svc = TelemetryStreamServer::new(TelemetryService::new(publisher));
let svc = TelemetryStreamServer::new(TelemetryService::new(publisher, video));
let join = tokio::spawn(async move {
Server::builder()
@@ -202,6 +241,7 @@ impl Drop for GrpcShutdown {
#[derive(Clone)]
pub struct TelemetryStreamHandle {
publisher: Arc<TelemetryPublisher>,
video: Arc<VideoPublisher>,
commands_tx: mpsc::Sender<OperatorCommand>,
}
@@ -216,6 +256,16 @@ impl TelemetryStreamHandle {
self.publisher.publish(topic, payload)
}
/// AZ-677 — broadcast a MapObjectsDiff to operators subscribed to
/// the MapObjectsBundle topic. Fed by the composition root that
/// owns the `mapobjects_store` append stream.
pub fn push_mapobjects_diff(
&self,
diff: MapObjectsDiff,
) -> std::result::Result<(), PublishError> {
self.publisher.publish_mapobjects_diff(diff)
}
/// Inject an operator command into the downlink channel. In production
/// this is fed by the gRPC return half once AZ-678 lands; tests may call
/// it directly.
@@ -230,8 +280,14 @@ impl TelemetryStreamHandle {
self.publisher.snapshot()
}
pub fn video_snapshot(&self) -> VideoSnapshot {
self.video.snapshot()
}
pub fn health(&self) -> ComponentHealth {
let snap = self.publisher.snapshot();
let vsnap = self.video.snapshot();
let (resnap, diff_count, snap_bytes) = self.publisher.mapobjects_counters();
let mut h = ComponentHealth::green(NAME);
let hot_drops: Vec<_> = snap
@@ -241,10 +297,20 @@ impl TelemetryStreamHandle {
.collect();
let detail = format!(
"subscribers={} published_total={} hot_drop_pairs={}",
"subscribers={} published_total={} hot_drop_pairs={} \
video_path={} ai_locked={} video_sessions={} \
bytes_inline_drops={} mapobjects_snapshot_bytes={} \
mapobjects_diff_count={} mapobjects_resnap_count={}",
snap.subscribed_clients,
snap.published_total,
hot_drops.len()
hot_drops.len(),
vsnap.mode,
vsnap.ai_locked,
vsnap.video_session_count,
vsnap.bytes_inline_drops_total,
snap_bytes,
diff_count,
resnap,
);
if !hot_drops.is_empty() {
@@ -257,10 +323,13 @@ impl TelemetryStreamHandle {
#[async_trait]
impl TelemetrySink for TelemetryStreamHandle {
async fn push_frame(&self, _frame: Frame) -> Result<()> {
Err(AutopilotError::NotImplemented(
"telemetry_stream::push_frame (AZ-676 video path)",
))
async fn push_frame(&self, frame: Frame) -> Result<()> {
// AZ-676 — bytes_inline path. In rtsp_forward mode the
// publisher returns early; the call is intentionally
// infallible so frame_ingest can always push without
// branching on configuration.
self.video.publish_frame(&frame);
Ok(())
}
async fn push_detections(&self, batch: DetectionBatch) -> Result<()> {
@@ -268,11 +337,20 @@ impl TelemetrySink for TelemetryStreamHandle {
.publish(Topic::DetectionEvent, &batch)
.map_err(|e| AutopilotError::Internal(format!("publish detections: {e}")))
}
async fn push_operator_event(&self, event: OperatorEvent) -> Result<()> {
// AZ-679 — serialised onto Topic::OperatorEvent. JSON payload
// is the tagged enum (`kind: poi_surfaced | poi_dequeued`).
self.publisher
.publish(Topic::OperatorEvent, &event)
.map_err(|e| AutopilotError::Internal(format!("publish operator event: {e}")))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::Ordering;
#[test]
fn handle_starts_with_zero_subscribers_and_green_health() {
@@ -306,4 +384,86 @@ mod tests {
// Assert
assert_eq!(h.snapshot().per_topic[&Topic::TelemetrySample].published, 1);
}
#[test]
fn ai_locked_handle_starts_false() {
// Arrange
let s = TelemetryStream::new(8);
// Act
let flag = s.ai_locked_handle();
// Assert
assert!(!flag.load(Ordering::Acquire));
assert!(!s.handle().video_snapshot().ai_locked);
}
#[test]
fn push_frame_bytes_inline_counts_in_video_snapshot() {
// Arrange
let cfg = TelemetryStreamConfig {
video_path: VideoPath::BytesInline,
..TelemetryStreamConfig::default()
};
let s = TelemetryStream::with_config(cfg);
let h = s.handle();
let f = Frame {
seq: 1,
capture_ts_monotonic_ns: 1,
decode_ts_monotonic_ns: 2,
pixels: Arc::new(bytes::Bytes::from(vec![0u8; 32])),
width: 4,
height: 4,
pix_fmt: shared::models::frame::PixelFormat::Nv12,
ai_locked: false,
};
// Act
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
h.push_frame(f).await.unwrap();
});
// Assert
assert_eq!(h.video_snapshot().published_frames, 1);
}
#[test]
fn push_frame_rtsp_forward_does_not_count() {
// Arrange
let cfg = TelemetryStreamConfig {
video_path: VideoPath::RtspForward {
url: "rtsp://x".to_string(),
},
..TelemetryStreamConfig::default()
};
let s = TelemetryStream::with_config(cfg);
let h = s.handle();
let f = Frame {
seq: 1,
capture_ts_monotonic_ns: 1,
decode_ts_monotonic_ns: 2,
pixels: Arc::new(bytes::Bytes::from(vec![0u8; 32])),
width: 4,
height: 4,
pix_fmt: shared::models::frame::PixelFormat::Nv12,
ai_locked: false,
};
// Act
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
h.push_frame(f).await.unwrap();
});
// Assert
assert_eq!(h.video_snapshot().published_frames, 0);
assert_eq!(h.video_snapshot().mode, "rtsp_forward");
}
}
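The behaviour the two `push_frame` tests pin down (count the frame in `bytes_inline` mode, return early in `rtsp_forward` mode, never fail) can be sketched without the gRPC machinery. This is a std-only illustration under stated assumptions: `PathSketch` and `PublisherSketch` are hypothetical names, the real `VideoPublisher::publish_frame` takes a `&Frame` rather than a byte slice, and the `"bytes_inline"` mode string is assumed by analogy with the `"rtsp_forward"` string the test asserts.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical mirror of the two video delivery modes.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub enum PathSketch {
    /// Operators pull video from an external RTSP URL; no frames go inline.
    RtspForward { url: String },
    /// Frames are sent to operators as inline byte payloads.
    BytesInline,
}

impl PathSketch {
    /// Mode label as it might appear in a health detail string.
    pub fn mode(&self) -> &'static str {
        match self {
            PathSketch::RtspForward { .. } => "rtsp_forward",
            PathSketch::BytesInline => "bytes_inline", // assumed label
        }
    }
}

/// Hypothetical stand-in for the frame-publishing half of `VideoPublisher`.
pub struct PublisherSketch {
    path: PathSketch,
    published_frames: AtomicU64,
}

impl PublisherSketch {
    pub fn new(path: PathSketch) -> Self {
        Self { path, published_frames: AtomicU64::new(0) }
    }

    /// Infallible by design: callers push every frame and the publisher
    /// decides whether the configured mode needs it.
    pub fn publish_frame(&self, _pixels: &[u8]) {
        if matches!(self.path, PathSketch::RtspForward { .. }) {
            return; // nothing to do inline in forward mode
        }
        self.published_frames.fetch_add(1, Ordering::Relaxed);
    }

    pub fn published_frames(&self) -> u64 {
        self.published_frames.load(Ordering::Relaxed)
    }

    pub fn mode(&self) -> &'static str {
        self.path.mode()
    }
}
```

Keeping the mode check inside the publisher is what lets `push_frame` return `Ok(())` unconditionally, so the caller needs no knowledge of the configured video path.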