//! AZ-643 — ack-demux integration tests (AC-1 happy path, AC-2 timeout). //! //! A fake UDP peer either acks immediately or stays silent; the autopilot side //! issues `send_command(...)` and asserts on the resolution. use std::time::Duration; use tokio::net::UdpSocket; use tokio::sync::watch; use tokio::time::timeout; use mavlink_layer::{ CommandAck, CommandLong, Decoder, DecoderEvent, Encoder, Heartbeat, MavlinkConnection, MavlinkLayer, MavlinkLayerOptions, MavlinkMessage, SendCommandError, }; const MAV_CMD_NAV_RETURN_TO_LAUNCH: u16 = 20; const MAV_RESULT_ACCEPTED: u8 = 0; const SHORT_TIMEOUT_MS: u64 = 250; fn options_for(uri: String, link_timeout_ms: u64) -> MavlinkLayerOptions { let mut o = MavlinkLayerOptions::new(MavlinkConnection::new(uri)); o.link_timeout = Duration::from_millis(link_timeout_ms); o.reconnect_base = Duration::from_millis(50); o.reconnect_cap = Duration::from_millis(200); // Keep the ack deadline tight so AC-2 finishes fast. o.command_ack_deadline = Duration::from_millis(500); o } async fn drain_first_heartbeat_addr(peer: &UdpSocket) -> std::net::SocketAddr { let mut buf = vec![0u8; 1024]; let (_, layer_addr) = timeout(Duration::from_secs(2), peer.recv_from(&mut buf)) .await .expect("first heartbeat must arrive") .expect("udp recv_from"); layer_addr } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn ac1_send_command_happy_path() { // Arrange: a peer that acks any inbound COMMAND_LONG promptly. let peer = UdpSocket::bind("127.0.0.1:0").await.expect("bind peer"); let peer_addr = peer.local_addr().expect("peer addr").to_string(); let (_shutdown_tx, shutdown_rx) = watch::channel(false); let (layer, handle) = MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT_MS)); tokio::spawn(layer.run(shutdown_rx)); // Capture the layer's source address from its first heartbeat. let layer_addr = drain_first_heartbeat_addr(&peer).await; let peer_enc = Encoder::new(2, 1); // Peer task: on every inbound COMMAND_LONG, reply with COMMAND_ACK. let peer_arc = std::sync::Arc::new(peer); let peer_for_task = peer_arc.clone(); tokio::spawn(async move { let mut dec = Decoder::new(); let mut buf = vec![0u8; 2048]; loop { let n = match peer_for_task.recv(&mut buf).await { Ok(n) => n, Err(_) => return, }; for ev in dec.feed(&buf[..n]) { if let DecoderEvent::Message { message: MavlinkMessage::CommandLong(cl), .. } = ev { let ack = peer_enc.encode(&MavlinkMessage::CommandAck(CommandAck { command: cl.command, result: MAV_RESULT_ACCEPTED, })); let _ = peer_for_task.send_to(&ack, layer_addr).await; } } } }); // Act: call send_command and await resolution. let cmd = CommandLong { param1: 0.0, param2: 0.0, param3: 0.0, param4: 0.0, param5: 0.0, param6: 0.0, param7: 0.0, command: MAV_CMD_NAV_RETURN_TO_LAUNCH, target_system: 1, target_component: 1, confirmation: 0, }; let ack = timeout(Duration::from_secs(2), handle.send_command(cmd, None)) .await .expect("ack must arrive within 2 s") .expect("send_command must succeed"); // Assert: ack matches and in-flight map is clear. assert_eq!(ack.command, MAV_CMD_NAV_RETURN_TO_LAUNCH); assert_eq!(ack.result, MAV_RESULT_ACCEPTED); assert_eq!( handle.commands_in_flight(), 0, "in-flight map must be drained" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn ac2_send_command_timeout_returns_explicit_error() { // Arrange: a peer that NEVER acks. let peer = UdpSocket::bind("127.0.0.1:0").await.expect("bind peer"); let peer_addr = peer.local_addr().expect("peer addr").to_string(); let (_shutdown_tx, shutdown_rx) = watch::channel(false); let (layer, handle) = MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT_MS)); tokio::spawn(layer.run(shutdown_rx)); // Pull the layer's first heartbeat just so the link is open. let _ = drain_first_heartbeat_addr(&peer).await; let cmd = CommandLong { param1: 0.0, param2: 0.0, param3: 0.0, param4: 0.0, param5: 0.0, param6: 0.0, param7: 0.0, command: MAV_CMD_NAV_RETURN_TO_LAUNCH, target_system: 1, target_component: 1, confirmation: 0, }; // Act let result = handle .send_command(cmd, Some(Duration::from_millis(300))) .await; // Assert match result { Err(SendCommandError::Timeout(d)) => { assert_eq!(d, Duration::from_millis(300)); } other => panic!("expected Timeout, got {other:?}"), } assert_eq!( handle.commands_in_flight(), 0, "in-flight map must be cleared on timeout (no leaks)" ); } /// Defensive coverage: a stray COMMAND_ACK without a matching waiter must not /// crash the link or leak entries. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unmatched_ack_is_dropped_without_side_effect() { // Arrange let peer = UdpSocket::bind("127.0.0.1:0").await.expect("bind peer"); let peer_addr = peer.local_addr().expect("peer addr").to_string(); let (_shutdown_tx, shutdown_rx) = watch::channel(false); let (layer, handle) = MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT_MS)); tokio::spawn(layer.run(shutdown_rx)); let layer_addr = drain_first_heartbeat_addr(&peer).await; // Act: send a HEARTBEAT (to keep watchdog happy) and a stray COMMAND_ACK. let peer_enc = Encoder::new(2, 1); let hb = peer_enc.encode(&MavlinkMessage::Heartbeat(Heartbeat { custom_mode: 0, mavtype: 2, autopilot: 3, base_mode: 0, system_status: 4, mavlink_version: 3, })); peer.send_to(&hb, layer_addr).await.unwrap(); let stray = peer_enc.encode(&MavlinkMessage::CommandAck(CommandAck { command: MAV_CMD_NAV_RETURN_TO_LAUNCH, result: MAV_RESULT_ACCEPTED, })); peer.send_to(&stray, layer_addr).await.unwrap(); // Give the layer a beat to process. tokio::time::sleep(Duration::from_millis(150)).await; // Assert assert_eq!(handle.commands_in_flight(), 0); let snap = handle.parse_errors(); assert_eq!(snap.signing_mismatch, 0); assert_eq!(snap.crc, 0); }