//! `telemetry_stream` — always-on uplink to the Ground Station + operator-command downlink. //! //! Real implementations: //! - **AZ-675**: Tonic gRPC server, per-client bounded queue, //! drop-oldest back-pressure, drop counters. Topics: //! `TelemetrySample`, `GimbalState`, `DetectionEvent`, //! `MovementCandidate`, `MapObjectsBundle`. //! - **AZ-676** (this crate, this batch): operator video path — two //! modes (`RtspForward { url }`, `BytesInline`) plus shared //! `ai_locked` atomic flipped by SubscribeVideo session counter. //! - **AZ-677** (this crate, this batch): MapObjectsBundle snapshot //! on subscribe + diff stream while connected + fresh snapshot on //! reconnect (no diff replay). //! - **AZ-678+**: command-auth on the return path (operator_bridge). pub mod internal; use std::net::SocketAddr; use std::sync::atomic::AtomicBool; use std::sync::Arc; use async_trait::async_trait; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tonic::transport::Server; use shared::contracts::TelemetrySink; use shared::error::{AutopilotError, Result}; use shared::health::{ComponentHealth, HealthLevel}; use shared::models::detection::DetectionBatch; use shared::models::frame::Frame; use shared::models::operator::OperatorCommand; use shared::models::operator_event::OperatorEvent; use crate::internal::mapobjects::{MapObjectsDiff, SharedSnapshotSource}; use crate::internal::proto::telemetry_stream_server::TelemetryStreamServer; use crate::internal::proto::Topic; use crate::internal::publisher::{TelemetryPublisher, DEFAULT_TOPIC_CAPACITY}; use crate::internal::server::TelemetryService; use crate::internal::video::{VideoPath, VideoPublisher, DEFAULT_VIDEO_CAPACITY}; pub use crate::internal::mapobjects::{ EmptyMapObjectsSource, MapObjectsBundleSnapshot, MapObjectsSnapshotSource, MapObjectsTopicMessage, }; pub use crate::internal::proto::{ telemetry_stream_client::TelemetryStreamClient, video_message, SubscribeRequest, SubscribeVideoRequest, TelemetryMessage, Topic as TelemetryTopic, VideoFrame, VideoMessage, VideoMode, VideoSessionStart, }; pub use crate::internal::publisher::{ PerTopicCounters, PublishError, PublisherSnapshot, ALL_TOPICS, }; pub use crate::internal::video::{VideoSnapshot, DEFAULT_VIDEO_CAPACITY as VIDEO_DEFAULT_CAPACITY}; const NAME: &str = "telemetry_stream"; /// Per-(client, topic) drop rate at or above which health flips to /// yellow. Picked to surface persistent slow consumers without /// flapping on a single transient lag spike. const DROP_YELLOW_THRESHOLD: u64 = 100; #[derive(Debug, Clone)] pub struct TelemetryStreamConfig { /// Where the Tonic gRPC server binds. `0.0.0.0:50061` by default. pub listen_addr: SocketAddr, /// Per-topic broadcast capacity (per subscriber buffer). pub topic_capacity: usize, /// Bounded capacity of the downlink command channel that feeds /// `operator_bridge`. pub downlink_capacity: usize, /// AZ-676 — video delivery mode + per-client video broadcast /// capacity. pub video_path: VideoPath, pub video_capacity: usize, } impl Default for TelemetryStreamConfig { fn default() -> Self { Self { listen_addr: "0.0.0.0:50061".parse().expect("hardcoded addr parses"), topic_capacity: DEFAULT_TOPIC_CAPACITY, downlink_capacity: 64, video_path: VideoPath::default(), video_capacity: DEFAULT_VIDEO_CAPACITY, } } } pub struct TelemetryStream { publisher: Arc, video: Arc, commands_tx: mpsc::Sender, commands_rx: Option>, config: TelemetryStreamConfig, } impl TelemetryStream { pub fn new(downlink_capacity: usize) -> Self { Self::with_config(TelemetryStreamConfig { downlink_capacity, ..TelemetryStreamConfig::default() }) } pub fn with_config(config: TelemetryStreamConfig) -> Self { let publisher = TelemetryPublisher::new(config.topic_capacity); let video = VideoPublisher::new(config.video_path.clone(), config.video_capacity); let (commands_tx, commands_rx) = mpsc::channel(config.downlink_capacity); Self { publisher, video, commands_tx, commands_rx: Some(commands_rx), config, } } pub fn handle(&self) -> TelemetryStreamHandle { TelemetryStreamHandle { publisher: Arc::clone(&self.publisher), video: Arc::clone(&self.video), commands_tx: self.commands_tx.clone(), } } /// AZ-676 — handle on the shared `ai_locked` atomic. /// `frame_ingest` and `detection_client` read this at decode and /// inference time. The composition root must call this and feed /// the result into their constructors. pub fn ai_locked_handle(&self) -> Arc { self.video.ai_locked_handle() } /// AZ-677 — wire the snapshot source. The composition root passes /// an adapter over `mapobjects_store::MapObjectsStore::snapshot()`. pub fn set_mapobjects_snapshot_source(&self, src: SharedSnapshotSource) { self.publisher.set_snapshot_source(src); } /// Take the downlink command receiver. The composition root /// forwards it to `operator_bridge` as `Receiver`. pub fn take_command_receiver(&mut self) -> Option> { self.commands_rx.take() } /// Spawn the Tonic server. Returns a JoinHandle that runs until /// `shutdown` is signalled (closing the returned `shutdown_tx`). /// The server is bound on `config.listen_addr`. pub fn spawn_grpc_server( &self, ) -> Result<( JoinHandle>, GrpcShutdown, )> { let listen_addr = self.config.listen_addr; let publisher = Arc::clone(&self.publisher); let video = Arc::clone(&self.video); let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let svc = TelemetryStreamServer::new(TelemetryService::new(publisher, video)); let join = tokio::spawn(async move { Server::builder() .add_service(svc) .serve_with_shutdown(listen_addr, async move { let _ = shutdown_rx.await; }) .await }); Ok(( join, GrpcShutdown { tx: Some(shutdown_tx), }, )) } /// Spawn the Tonic server bound on a specific `TcpListener`. /// Useful for tests that need to know the actual port ahead of /// time (bind to `127.0.0.1:0` then read the assigned port). pub fn spawn_grpc_server_on( &self, listener: std::net::TcpListener, ) -> Result<( JoinHandle>, GrpcShutdown, )> { listener .set_nonblocking(true) .map_err(|e| AutopilotError::Internal(format!("set_nonblocking: {e}")))?; let tokio_listener = tokio::net::TcpListener::from_std(listener) .map_err(|e| AutopilotError::Internal(format!("TcpListener::from_std: {e}")))?; let stream = tokio_stream::wrappers::TcpListenerStream::new(tokio_listener); let publisher = Arc::clone(&self.publisher); let video = Arc::clone(&self.video); let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let svc = TelemetryStreamServer::new(TelemetryService::new(publisher, video)); let join = tokio::spawn(async move { Server::builder() .add_service(svc) .serve_with_incoming_shutdown(stream, async move { let _ = shutdown_rx.await; }) .await }); Ok(( join, GrpcShutdown { tx: Some(shutdown_tx), }, )) } } /// RAII shutdown trigger for the spawned gRPC server. Drop the value /// or call `shutdown()` to stop the server. pub struct GrpcShutdown { tx: Option>, } impl GrpcShutdown { pub fn shutdown(mut self) { if let Some(tx) = self.tx.take() { let _ = tx.send(()); } } } impl Drop for GrpcShutdown { fn drop(&mut self) { if let Some(tx) = self.tx.take() { let _ = tx.send(()); } } } #[derive(Clone)] pub struct TelemetryStreamHandle { publisher: Arc, video: Arc, commands_tx: mpsc::Sender, } impl TelemetryStreamHandle { /// Publish a payload on `topic`. Never blocks the caller; slow /// subscribers experience drops accounted in [`snapshot`]. pub fn publish( &self, topic: TelemetryTopic, payload: &T, ) -> std::result::Result<(), PublishError> { self.publisher.publish(topic, payload) } /// AZ-677 — broadcast a MapObjectsDiff to operators subscribed to /// the MapObjectsBundle topic. Fed by the composition root that /// owns the `mapobjects_store` append stream. pub fn push_mapobjects_diff( &self, diff: MapObjectsDiff, ) -> std::result::Result<(), PublishError> { self.publisher.publish_mapobjects_diff(diff) } /// Inject an operator command downlink. Production path is fed /// by the gRPC return half once AZ-678 lands; tests may call this /// directly. pub async fn submit_command(&self, command: OperatorCommand) -> Result<()> { self.commands_tx .send(command) .await .map_err(|_| AutopilotError::Internal("downlink channel closed".into())) } pub fn snapshot(&self) -> PublisherSnapshot { self.publisher.snapshot() } pub fn video_snapshot(&self) -> VideoSnapshot { self.video.snapshot() } pub fn health(&self) -> ComponentHealth { let snap = self.publisher.snapshot(); let vsnap = self.video.snapshot(); let (resnap, diff_count, snap_bytes) = self.publisher.mapobjects_counters(); let mut h = ComponentHealth::green(NAME); let hot_drops: Vec<_> = snap .drops_total .iter() .filter(|(_, &v)| v >= DROP_YELLOW_THRESHOLD) .collect(); let detail = format!( "subscribers={} published_total={} hot_drop_pairs={} \ video_path={} ai_locked={} video_sessions={} \ bytes_inline_drops={} mapobjects_snapshot_bytes={} \ mapobjects_diff_count={} mapobjects_resnap_count={}", snap.subscribed_clients, snap.published_total, hot_drops.len(), vsnap.mode, vsnap.ai_locked, vsnap.video_session_count, vsnap.bytes_inline_drops_total, snap_bytes, diff_count, resnap, ); if !hot_drops.is_empty() { h.level = HealthLevel::Yellow; } h.detail = Some(detail); h } } #[async_trait] impl TelemetrySink for TelemetryStreamHandle { async fn push_frame(&self, frame: Frame) -> Result<()> { // AZ-676 — bytes_inline path. In rtsp_forward mode the // publisher returns early; the call is intentionally // infallible so frame_ingest can always push without // branching on configuration. self.video.publish_frame(&frame); Ok(()) } async fn push_detections(&self, batch: DetectionBatch) -> Result<()> { self.publisher .publish(Topic::DetectionEvent, &batch) .map_err(|e| AutopilotError::Internal(format!("publish detections: {e}"))) } async fn push_operator_event(&self, event: OperatorEvent) -> Result<()> { // AZ-679 — serialised onto Topic::OperatorEvent. JSON payload // is the tagged enum (`kind: poi_surfaced | poi_dequeued`). self.publisher .publish(Topic::OperatorEvent, &event) .map_err(|e| AutopilotError::Internal(format!("publish operator event: {e}"))) } } #[cfg(test)] mod tests { use super::*; use std::sync::atomic::Ordering; #[test] fn handle_starts_with_zero_subscribers_and_green_health() { // Arrange let s = TelemetryStream::new(8); let h = s.handle(); // Act let snap = h.snapshot(); let health = h.health(); // Assert assert_eq!(snap.subscribed_clients, 0); assert_eq!(snap.published_total, 0); assert_eq!(health.level, HealthLevel::Green); } #[test] fn publish_without_subscribers_is_no_op_but_counts() { // Arrange let s = TelemetryStream::new(8); let h = s.handle(); // Act h.publish( TelemetryTopic::TelemetrySample, &serde_json::json!({"v": 1}), ) .unwrap(); // Assert assert_eq!(h.snapshot().per_topic[&Topic::TelemetrySample].published, 1); } #[test] fn ai_locked_handle_starts_false() { // Arrange let s = TelemetryStream::new(8); // Act let flag = s.ai_locked_handle(); // Assert assert!(!flag.load(Ordering::Acquire)); assert!(!s.handle().video_snapshot().ai_locked); } #[test] fn push_frame_bytes_inline_counts_in_video_snapshot() { // Arrange let cfg = TelemetryStreamConfig { video_path: VideoPath::BytesInline, ..TelemetryStreamConfig::default() }; let s = TelemetryStream::with_config(cfg); let h = s.handle(); let f = Frame { seq: 1, capture_ts_monotonic_ns: 1, decode_ts_monotonic_ns: 2, pixels: Arc::new(bytes::Bytes::from(vec![0u8; 32])), width: 4, height: 4, pix_fmt: shared::models::frame::PixelFormat::Nv12, ai_locked: false, }; // Act let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); rt.block_on(async { h.push_frame(f).await.unwrap(); }); // Assert assert_eq!(h.video_snapshot().published_frames, 1); } #[test] fn push_frame_rtsp_forward_does_not_count() { // Arrange let cfg = TelemetryStreamConfig { video_path: VideoPath::RtspForward { url: "rtsp://x".to_string(), }, ..TelemetryStreamConfig::default() }; let s = TelemetryStream::with_config(cfg); let h = s.handle(); let f = Frame { seq: 1, capture_ts_monotonic_ns: 1, decode_ts_monotonic_ns: 2, pixels: Arc::new(bytes::Bytes::from(vec![0u8; 32])), width: 4, height: 4, pix_fmt: shared::models::frame::PixelFormat::Nv12, ai_locked: false, }; // Act let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); rt.block_on(async { h.push_frame(f).await.unwrap(); }); // Assert assert_eq!(h.video_snapshot().published_frames, 0); assert_eq!(h.video_snapshot().mode, "rtsp_forward"); } }