//! `mavlink_layer` — hand-rolled MAVLink v2 transport + codec. //! //! Public surface (per `module-layout.md`): //! - [`MavlinkLayer`] — actor; runs the open / reconnect loop and the //! per-link read+write loop. //! - [`MavlinkHandle`] — clonable handle; lets callers send outbound //! messages, subscribe to inbound messages, subscribe to link events, and //! inspect health. //! - [`MavlinkConnection`] — typed URI wrapper used by callers that want the //! stricter form. [`MavlinkLayerOptions`] is the constructor argument. //! - Codec types (`MavlinkMessage`, the per-message structs) re-exported //! from `internal::codec`. //! //! Real implementation tasks: AZ-641 (transport + heartbeat), AZ-642 (codec), //! AZ-643 (ack demux + signing — future). mod internal; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use tokio::sync::{broadcast, mpsc, watch}; use shared::contracts::MavlinkSink; use shared::error::{AutopilotError, Result}; use shared::health::ComponentHealth; pub use internal::codec::{ Attitude, CommandAck, CommandLong, Decoder, DecoderEvent, Encoder, ExtendedSysState, GlobalPositionInt, Heartbeat, MavlinkMessage, MavlinkParseError, MissionAck, MissionClearAll, MissionCount, MissionCurrent, MissionItemInt, MissionItemReached, MissionRequestInt, MissionSetCurrent, ParseErrorKind, ParseErrors, SetMode, StatusText, SysStatus, }; pub use internal::heartbeat::LinkEvent; pub use internal::uri::{ConnectionUri, DEFAULT_SERIAL_BAUD}; use internal::codec::parse_errors::ParseErrorsSnapshot; use internal::heartbeat::{heartbeat_period, make_outbound_heartbeat, InboundWatchdog}; use internal::retry::ExponentialBackoff; use internal::transport::serial::SerialTransport; use internal::transport::udp::UdpTransport; use internal::transport::Transport; const NAME: &str = "mavlink_layer"; /// Default outbound channel capacity (frames). const OUTBOUND_CHAN_CAP: usize = 64; /// Default inbound broadcast capacity. const INBOUND_CHAN_CAP: usize = 256; /// Connection descriptor — the URI string a caller would put in TOML. #[derive(Debug, Clone)] pub struct MavlinkConnection { pub uri: String, } impl MavlinkConnection { pub fn new(uri: impl Into) -> Self { Self { uri: uri.into() } } } /// Tunables for the MAVLink actor. Defaults follow AZ-641 §NFR. #[derive(Debug, Clone)] pub struct MavlinkLayerOptions { pub connection: MavlinkConnection, /// MAVLink sysid this process advertises (default 1). pub sysid: u8, /// MAVLink compid this process advertises (default 191 = MAV_COMP_ID_ONBOARD_COMPUTER). pub compid: u8, /// Wall-clock budget without an inbound HEARTBEAT before `LinkLost` fires. pub link_timeout: Duration, /// Cap for the open-loop exponential backoff. pub reconnect_cap: Duration, /// Base delay for the open-loop exponential backoff. pub reconnect_base: Duration, /// MAVLink-2 signing flag; plumbed through to health, not enforced here /// (AZ-643 owns the signing path). pub signing_enabled: bool, } impl MavlinkLayerOptions { pub fn new(connection: MavlinkConnection) -> Self { Self { connection, sysid: 1, compid: 191, link_timeout: Duration::from_millis(internal::heartbeat::DEFAULT_LINK_TIMEOUT_MS), reconnect_cap: Duration::from_secs(5), reconnect_base: Duration::from_millis(100), signing_enabled: false, } } } #[derive(Debug, Clone)] pub struct InboundMessage { pub sysid: u8, pub compid: u8, pub seq: u8, pub message: MavlinkMessage, } #[derive(Debug)] enum OutboundItem { Message(MavlinkMessage), RawFrame(Vec), } #[derive(Debug)] struct LinkState { encoder: Encoder, parse_errors: Arc, watchdog: Arc, inbound: broadcast::Sender, connected: AtomicBool, signing_enabled: bool, } /// Long-running actor that owns the transport, reconnect loop, and codec. pub struct MavlinkLayer { options: MavlinkLayerOptions, outbound_rx: mpsc::Receiver, state: Arc, } /// Clonable handle to a running `MavlinkLayer`. #[derive(Debug, Clone)] pub struct MavlinkHandle { outbound_tx: mpsc::Sender, state: Arc, } impl MavlinkLayer { /// Build the layer + handle pair. The layer is **not** yet running — /// callers must spawn [`MavlinkLayer::run`] from a tokio task. pub fn new(options: MavlinkLayerOptions) -> (Self, MavlinkHandle) { let (tx, rx) = mpsc::channel(OUTBOUND_CHAN_CAP); let (inbound_tx, _inbound_rx) = broadcast::channel(INBOUND_CHAN_CAP); let (watchdog, _link_rx) = InboundWatchdog::new(options.link_timeout.as_millis() as u64); let state = Arc::new(LinkState { encoder: Encoder::new(options.sysid, options.compid), parse_errors: Arc::new(ParseErrors::new()), watchdog, inbound: inbound_tx, connected: AtomicBool::new(false), signing_enabled: options.signing_enabled, }); let layer = Self { options, outbound_rx: rx, state: state.clone(), }; let handle = MavlinkHandle { outbound_tx: tx, state, }; (layer, handle) } /// Run the open / reconnect loop until `shutdown` flips to `true`. pub async fn run(mut self, mut shutdown: watch::Receiver) -> Result<()> { let uri = ConnectionUri::parse(&self.options.connection.uri)?; let mut backoff = ExponentialBackoff::new(self.options.reconnect_base, self.options.reconnect_cap); loop { if *shutdown.borrow() { tracing::info!(component = NAME, "shutdown received before transport open"); return Ok(()); } let open_result = open_transport(&uri).await; let mut transport: Box = match open_result { Ok(t) => { backoff.reset(); self.state.connected.store(true, Ordering::SeqCst); tracing::info!(component = NAME, uri = %self.options.connection.uri, "mavlink transport opened"); t } Err(e) => { let delay = backoff.next_delay(); tracing::warn!( component = NAME, error = %e, attempts = backoff.attempts(), backoff_ms = delay.as_millis() as u64, "mavlink transport open failed; retrying" ); self.state.connected.store(false, Ordering::SeqCst); tokio::select! { _ = tokio::time::sleep(delay) => {} _ = shutdown.changed() => return Ok(()), } continue; } }; let outcome = self.run_link(&mut *transport, &mut shutdown).await; self.state.connected.store(false, Ordering::SeqCst); match outcome { LinkOutcome::Shutdown => return Ok(()), LinkOutcome::TransportLost(reason) => { tracing::warn!(component = NAME, reason = %reason, "mavlink transport lost; reconnecting"); } } } } async fn run_link( &mut self, transport: &mut dyn Transport, shutdown: &mut watch::Receiver, ) -> LinkOutcome { let mut decoder = Decoder::new(); let mut heartbeat_tick = tokio::time::interval(heartbeat_period()); heartbeat_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut watchdog_tick = tokio::time::interval(Duration::from_millis(200)); watchdog_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut read_buf = vec![0u8; 4 * 1024]; let mut pending_outbound: Vec> = Vec::new(); let mut wants_heartbeat = false; loop { tokio::select! { biased; _ = shutdown.changed() => return LinkOutcome::Shutdown, _ = heartbeat_tick.tick() => { wants_heartbeat = true; } _ = watchdog_tick.tick() => { self.state.watchdog.maybe_trip_link_lost(); } msg = self.outbound_rx.recv() => { match msg { Some(OutboundItem::Message(m)) => { let bytes = self.state.encoder.encode(&m); pending_outbound.push(bytes); } Some(OutboundItem::RawFrame(bytes)) => pending_outbound.push(bytes), None => return LinkOutcome::Shutdown, } } read = transport.read(&mut read_buf) => { match read { Ok(0) => return LinkOutcome::TransportLost("eof".into()), Ok(n) => { let events = decoder.feed(&read_buf[..n]); for ev in events { self.process_decoder_event(ev); } // Mirror decoder errors into the layer's own counters. let snap = decoder.errors.snapshot(); let _ = snap; // counters are owned by the decoder; surfaced via health } Err(e) => return LinkOutcome::TransportLost(format!("read: {e}")), } } } if wants_heartbeat { wants_heartbeat = false; let frame = self.state.encoder.encode(&make_outbound_heartbeat()); if let Err(e) = transport.write_all(&frame).await { return LinkOutcome::TransportLost(format!("write heartbeat: {e}")); } self.state.watchdog.note_outbound_heartbeat(); } while let Some(bytes) = pending_outbound.pop() { if let Err(e) = transport.write_all(&bytes).await { return LinkOutcome::TransportLost(format!("write: {e}")); } } } } fn process_decoder_event(&self, ev: DecoderEvent) { match ev { DecoderEvent::Message { sysid, compid, seq, message, } => { if matches!(message, MavlinkMessage::Heartbeat(_)) { self.state.watchdog.note_inbound_heartbeat(); } let _ = self.state.inbound.send(InboundMessage { sysid, compid, seq, message, }); } DecoderEvent::Crc { msg_id, seq } => { self.state.parse_errors.record(ParseErrorKind::Crc); tracing::warn!(component = NAME, msg_id, seq, "mavlink crc mismatch"); } DecoderEvent::UnknownId { msg_id, seq } => { self.state.parse_errors.record(ParseErrorKind::UnknownId); tracing::warn!(component = NAME, msg_id, seq, "mavlink unknown message id"); } DecoderEvent::InvalidPayload { msg_id, seq, reason, } => { self.state .parse_errors .record(ParseErrorKind::InvalidPayload); tracing::warn!( component = NAME, msg_id, seq, reason, "mavlink invalid payload" ); } DecoderEvent::SequenceGap { sysid, compid, expected, actual, } => { self.state.parse_errors.record(ParseErrorKind::SequenceGap); tracing::warn!( component = NAME, sysid, compid, expected, actual, "mavlink sequence gap" ); } } } } async fn open_transport(uri: &ConnectionUri) -> Result> { match uri { ConnectionUri::Udp { host, port } => { let t = UdpTransport::connect(&format!("{host}:{port}")).await?; Ok(Box::new(t)) } ConnectionUri::Serial { path, baud } => { let t = SerialTransport::open(path, *baud)?; Ok(Box::new(t)) } } } #[derive(Debug)] enum LinkOutcome { Shutdown, TransportLost(String), } impl MavlinkHandle { /// Send a typed MAVLink message — encoded with the actor's sysid/compid /// and the next outbound sequence number. pub async fn send(&self, msg: MavlinkMessage) -> Result<()> { self.outbound_tx .send(OutboundItem::Message(msg)) .await .map_err(|e| AutopilotError::Internal(format!("mavlink send: channel closed ({e})"))) } /// Send already-framed bytes verbatim. Used by callers that maintain /// their own encoder (e.g. tests, or external supervisors that bridge a /// second MAVLink endpoint). pub async fn send_raw_bytes(&self, frame: Vec) -> Result<()> { self.outbound_tx .send(OutboundItem::RawFrame(frame)) .await .map_err(|e| { AutopilotError::Internal(format!("mavlink send_raw: channel closed ({e})")) }) } pub fn subscribe_inbound(&self) -> broadcast::Receiver { self.state.inbound.subscribe() } pub fn subscribe_link_events(&self) -> broadcast::Receiver { self.state.watchdog.subscribe() } pub fn parse_errors(&self) -> ParseErrorsSnapshot { self.state.parse_errors.snapshot() } pub fn health(&self) -> ComponentHealth { let connected = self.state.connected.load(Ordering::Relaxed); let age = self.state.watchdog.last_inbound_age_ms(); let detail = format!( "connected={connected} last_heartbeat_age_ms={} signing_enabled={} outbound={} parse_errors={}", age.map(|m| m.to_string()).unwrap_or_else(|| "none".into()), self.state.signing_enabled, self.state.watchdog.outbound_total(), self.parse_errors().total(), ); if !connected { ComponentHealth::red(NAME, detail) } else if !self.state.watchdog.link_up() { ComponentHealth::yellow(NAME, detail) } else { ComponentHealth::green(NAME) } } } #[async_trait] impl MavlinkSink for MavlinkHandle { async fn send_raw(&self, msg: Vec) -> Result<()> { MavlinkHandle::send_raw_bytes(self, msg).await } } #[cfg(test)] mod tests { use super::*; #[test] fn handle_health_is_red_when_never_connected() { // Arrange / Act let (_layer, handle) = MavlinkLayer::new(MavlinkLayerOptions::new(MavlinkConnection::new( "udp://127.0.0.1:14550", ))); // Assert let h = handle.health(); assert_eq!(h.level, shared::health::HealthLevel::Red); } #[test] fn handle_clones() { // Arrange let (_layer, h) = MavlinkLayer::new(MavlinkLayerOptions::new(MavlinkConnection::new( "udp://127.0.0.1:14550", ))); // Act let h2 = h.clone(); // Assert assert_eq!(h.health().level, h2.health().level); } }