//! `telemetry_stream` — always-on uplink to the Ground Station + operator-command downlink. //! //! Real implementation lands in: //! - AZ-675 `telemetry_stream_grpc_server` //! - AZ-676 `telemetry_stream_video_path` //! - AZ-677 `telemetry_stream_mapobjects_snapshot` use async_trait::async_trait; use tokio::sync::mpsc; use shared::contracts::TelemetrySink; use shared::error::{AutopilotError, Result}; use shared::health::ComponentHealth; use shared::models::detection::DetectionBatch; use shared::models::frame::Frame; use shared::models::operator::OperatorCommand; const NAME: &str = "telemetry_stream"; pub struct TelemetryStream { commands_tx: mpsc::Sender, commands_rx: Option>, } impl TelemetryStream { pub fn new(downlink_capacity: usize) -> Self { let (commands_tx, commands_rx) = mpsc::channel(downlink_capacity); Self { commands_tx, commands_rx: Some(commands_rx), } } pub fn handle(&self) -> TelemetryStreamHandle { TelemetryStreamHandle { commands_tx: self.commands_tx.clone(), } } /// Take the downlink command receiver. The composition root forwards it to /// `operator_bridge` as `Receiver`. pub fn take_command_receiver(&mut self) -> Option> { self.commands_rx.take() } } #[derive(Clone)] pub struct TelemetryStreamHandle { commands_tx: mpsc::Sender, } impl TelemetryStreamHandle { /// Inject an operator command. Production path is fed by the downlink /// receiver in `internal::downlink/*`; tests can call this directly. pub async fn submit_command(&self, command: OperatorCommand) -> Result<()> { self.commands_tx .send(command) .await .map_err(|_| AutopilotError::Internal("downlink channel closed".into())) } pub fn health(&self) -> ComponentHealth { ComponentHealth::disabled(NAME) } } #[async_trait] impl TelemetrySink for TelemetryStreamHandle { async fn push_frame(&self, _frame: Frame) -> Result<()> { Err(AutopilotError::NotImplemented( "telemetry_stream::push_frame (AZ-676)", )) } async fn push_detections(&self, _batch: DetectionBatch) -> Result<()> { Err(AutopilotError::NotImplemented( "telemetry_stream::push_detections (AZ-675)", )) } } #[cfg(test)] mod tests { use super::*; #[test] fn it_compiles() { let h = TelemetryStream::new(8).handle(); assert_eq!(h.health().level, shared::health::HealthLevel::Disabled); } }