//! AZ-675 — gRPC `TelemetryStream::Subscribe` service implementation. //! //! The client sends a single `SubscribeRequest`; the server returns a //! server-streaming response built directly from per-topic //! `BroadcastStream`s merged with `StreamMap`. The tonic transport //! is what polls our stream — when the wire (or the operator client) //! cannot keep up, the broadcast ring overflows that client's cursor //! and `BroadcastStream` yields `Err(BroadcastStreamRecvError::Lagged(n))` //! on the next poll. That is the *only* place drop accounting //! happens: there is no intermediate mpsc buffer that could absorb //! back-pressure and hide lag. //! //! `StreamGuard` decrements `subscribed_clients` on stream drop. use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::{Stream, StreamExt, StreamMap}; use tonic::{Request, Response, Status}; use tracing::{info, warn}; use crate::internal::proto::telemetry_stream_server::TelemetryStream; use crate::internal::proto::{SubscribeRequest, TelemetryMessage, Topic}; use crate::internal::publisher::{TelemetryPublisher, ALL_TOPICS}; pub struct TelemetryService { publisher: Arc, } impl TelemetryService { pub fn new(publisher: Arc) -> Self { Self { publisher } } } type SubscribeStream = Pin> + Send>>; #[tonic::async_trait] impl TelemetryStream for TelemetryService { type SubscribeStream = SubscribeStream; async fn subscribe( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); if req.client_id.trim().is_empty() { return Err(Status::invalid_argument("client_id is required")); } let client_id = req.client_id.clone(); let requested: Vec = if req.topics.is_empty() { ALL_TOPICS.to_vec() } else { let mut out = Vec::with_capacity(req.topics.len()); for raw in &req.topics { let t = Topic::try_from(*raw) .map_err(|_| Status::invalid_argument(format!("unknown topic {raw}")))?; if matches!(t, Topic::Unspecified) { return Err(Status::invalid_argument("TOPIC_UNSPECIFIED not allowed")); } out.push(t); } out }; let mut map: StreamMap> = StreamMap::new(); for &t in &requested { match self.publisher.subscribe_topic(t) { Some(rx) => { map.insert(t, BroadcastStream::new(rx)); } None => { return Err(Status::failed_precondition(format!( "topic {t:?} not registered" ))) } } } self.publisher.register_client(); info!(client_id = %client_id, topics = ?requested, "telemetry subscribe"); let publisher = Arc::clone(&self.publisher); let cid = client_id.clone(); let stream = map.filter_map(move |(topic, item)| match item { Ok(msg) => Some(Ok(msg)), Err(BroadcastStreamRecvError::Lagged(n)) => { warn!(client_id = %cid, ?topic, dropped = n, "slow client lagged"); publisher.record_drops(&cid, topic, n); None } }); let stream = StreamGuard { inner: stream, publisher: Arc::clone(&self.publisher), }; Ok(Response::new(Box::pin(stream) as Self::SubscribeStream)) } } /// Decrement `subscribed_clients` when the per-client outbound /// stream is dropped (tonic drops the stream when the client side /// goes away). struct StreamGuard { inner: S, publisher: Arc, } impl Stream for StreamGuard { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_next(cx) } } impl Drop for StreamGuard { fn drop(&mut self) { self.publisher.deregister_client(); } }