Oleksandr Bezdieniezhnykh 740bf37d76 [AZ-641] [AZ-642] [AZ-644] mavlink transport + codec + mission pull
Lands the second batch under epic AZ-626's implementation plan.

mavlink_layer (AZ-641 + AZ-642):
- Hand-rolled MAVLink v2 codec covering the §7.7 surface: HEARTBEAT,
  SYS_STATUS, SET_MODE, ATTITUDE, GLOBAL_POSITION_INT, MISSION_* (7),
  COMMAND_LONG, COMMAND_ACK, EXTENDED_SYS_STATE, STATUSTEXT (17 total).
- Streaming decoder reassembles frames from arbitrarily sized byte
  arrivals, drops malformed frames with typed parse-error counters
  (crc/truncated/unknown_id/seq_gap), and surfaces sequence gaps without
  hard-failing the link.
- Encoder tracks the per-link tx_seq counter and applies the MAVLink v2
  trailing-zero payload truncation rule.
- UDP and POSIX-serial transports behind a single async Transport trait.
  The run loop owns opening the transport, retrying with bounded
  exponential backoff (capped at 2 s for serial, 5 s for UDP), and
  drives a per-link tokio::select! read+write loop.
- 1 Hz outbound HEARTBEAT scheduler + inbound-heartbeat watchdog that
  fires LinkUp / LinkLost on a broadcast channel and feeds health detail
  (connected, last_heartbeat_age_ms, signing_enabled, parse_errors).
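The two framing rules in the list above (trailing-zero payload truncation and wrapping sequence numbers) can be sketched as pure functions. This is a minimal sketch, not the crate's code: `truncate_payload`, `expand_payload`, and `seq_gap` are hypothetical names, and the gap helper assumes the decoder compares each frame's 8-bit `seq` with the last one seen from the same sender.

```rust
// Hypothetical helper names; a sketch of the MAVLink v2 rules, not the
// mavlink_layer API.

/// Sender side: strip trailing zero bytes from the serialized payload, but
/// always keep the first byte (an all-zero payload goes out as one 0x00).
pub fn truncate_payload(payload: &[u8]) -> &[u8] {
    let keep = payload
        .iter()
        .rposition(|&b| b != 0)
        .map_or(1, |last_nonzero| last_nonzero + 1)
        .min(payload.len());
    &payload[..keep]
}

/// Receiver side: zero-fill a truncated payload back to the message's full
/// length before decoding its fields.
pub fn expand_payload(received: &[u8], full_len: usize) -> Vec<u8> {
    let mut out = received.to_vec();
    if out.len() < full_len {
        out.resize(full_len, 0);
    }
    out
}

/// Frames missed between the last `seq` seen from a sender and the one just
/// received. The counter is 8-bit and wraps 255 -> 0, so the subtraction
/// wraps too. A duplicate or reordered frame shows up as a large gap
/// (e.g. 255), which a real decoder would need to handle separately.
pub fn seq_gap(last_seq: u8, received_seq: u8) -> u8 {
    received_seq.wrapping_sub(last_seq.wrapping_add(1))
}
```

On the encoder side the same 8-bit counter would simply advance with `tx_seq.wrapping_add(1)` per frame.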

mission_client (AZ-644):
- HTTPS GET /missions/{id} over rustls (no OpenSSL on the airframe).
- Bundled JSON Schema (crates/shared/contracts/mission-schema.json,
  draft-07, additionalProperties:false) validates every response;
  schema-invalid bodies surface as FetchError::SchemaInvalid with a
  1 KiB sample of the raw body for offline analysis.
- Transient failures (timeout, 5xx, 429) retry with bounded exponential
  backoff up to MissionClientOptions.max_attempts (default 5); permanent
  failures (other 4xx, malformed URL) abort immediately.
- Health surface mirrors AC-1's contract: last_fetch_ts,
  fetch_errors_total, schema_version, connection_state.
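The retry policy above (transient vs permanent failures, bounded exponential backoff, a `max_attempts` ceiling) can be sketched without any HTTP dependency. This is a sketch under stated assumptions: `Failure`, `is_transient`, `backoff_delay`, and `run_with_retry` are hypothetical names, the schedule `base * 2^n` clamped to a cap is one common reading of "bounded exponential backoff", and sleeping is injected so the policy can be unit-tested.

```rust
use std::time::Duration;

// Hypothetical names throughout: a sketch of the policy, not the
// mission_client API.

/// Why a fetch attempt failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Failure {
    /// No response within the request timeout.
    Timeout,
    /// A non-success HTTP status.
    Status(u16),
}

/// Timeouts, 429 and 5xx are transient; any other error status is permanent.
pub fn is_transient(f: Failure) -> bool {
    match f {
        Failure::Timeout => true,
        Failure::Status(s) => s == 429 || (500..=599).contains(&s),
    }
}

/// Bounded exponential backoff: `base * 2^retry`, clamped to `cap`.
/// `retry` is 0 for the delay before the first retry.
pub fn backoff_delay(base: Duration, cap: Duration, retry: u32) -> Duration {
    // 2^retry, saturating instead of overflowing for large retry counts.
    let factor = 1u32.checked_shl(retry).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(cap).min(cap)
}

/// Calls `attempt_fn` up to `max_attempts` times, sleeping between transient
/// failures and returning immediately on success or a permanent failure.
/// `sleep` is injected so the policy can be tested without real waiting.
pub fn run_with_retry(
    max_attempts: u32,
    base: Duration,
    cap: Duration,
    mut attempt_fn: impl FnMut() -> Result<(), Failure>,
    mut sleep: impl FnMut(Duration),
) -> Result<(), Failure> {
    let mut attempts: u32 = 0;
    loop {
        match attempt_fn() {
            Ok(()) => return Ok(()),
            Err(failure) => {
                attempts += 1;
                if !is_transient(failure) || attempts >= max_attempts {
                    return Err(failure);
                }
                sleep(backoff_delay(base, cap, attempts - 1));
            }
        }
    }
}
```

The same `backoff_delay` shape would also fit the mavlink_layer reconnect loop; with the integration tests' 50 ms base and 200 ms cap it yields 50, 100, 200, 200, … ms.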

Caught and fixed before commit (NOT a code-review finding; caught by
the unit test that hand-computed CRC("123456789")): the hand-rolled
X.25 CRC accumulator was operating in u16 throughout. The MAVLink C
reference declares `tmp` as uint8_t, which silently discards the bits
that `tmp << 4` shifts past bit 7. Round-trip tests passed (encoder and
decoder shared the bug), but a real MAVLink peer would have rejected
every frame. Fixed by mirroring the C reference:
`let mut tmp: u8 = …; tmp ^= tmp.wrapping_shl(4);`.
Added a regression test asserting CRC("123456789") == 0x6F91 against
pymavlink's reference value (NOT the textbook 0x29B1: MAVLink's
accumulator computes the bit-reflected CRC-16/MCRF4XX, i.e. reflected
poly 0x1021, init 0xFFFF, no final XOR, while 0x29B1 is the check value
of the non-reflected CRC-16/CCITT-FALSE).
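A minimal sketch of the fixed accumulator, mirroring the C reference's `crc_accumulate` with `tmp` kept as `u8`. The function names are illustrative rather than the crate's API; a bit-at-a-time implementation of the reflected polynomial is included only to cross-check the byte-wise form against the 0x6F91 check value.

```rust
// MAVLink X.25 checksum (CRC-16/MCRF4XX) sketch. Names are illustrative,
// not the mavlink_layer API.

const X25_INIT: u16 = 0xFFFF;

/// Fold one byte into the running CRC. `tmp` must be u8: the C reference
/// declares it uint8_t, so `tmp << 4` discards the bits shifted past bit 7.
pub fn crc_accumulate(byte: u8, crc: u16) -> u16 {
    let mut tmp: u8 = byte ^ (crc & 0xFF) as u8;
    tmp ^= tmp.wrapping_shl(4);
    let t = u16::from(tmp);
    (crc >> 8) ^ (t << 8) ^ (t << 3) ^ (t >> 4)
}

/// CRC over a whole buffer, starting from 0xFFFF with no final XOR.
pub fn crc_calculate(data: &[u8]) -> u16 {
    data.iter().fold(X25_INIT, |crc, &b| crc_accumulate(b, crc))
}

/// Bit-at-a-time reference: reflected polynomial 0x8408 (0x1021 bit-reversed),
/// init 0xFFFF, no final XOR. Used only to cross-check the byte-wise form.
pub fn crc_bitwise(data: &[u8]) -> u16 {
    let mut crc = X25_INIT;
    for &b in data {
        crc ^= u16::from(b);
        for _ in 0..8 {
            crc = if crc & 1 != 0 { (crc >> 1) ^ 0x8408 } else { crc >> 1 };
        }
    }
    crc
}
```

In a MAVLink v2 frame the checksum covers every byte after the 0xFD start marker through the end of the (possibly truncated) payload, then folds in the message's CRC_EXTRA byte (50 for HEARTBEAT) with one more `crc_accumulate` call.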

AC verification (full detail in
_docs/03_implementation/batch_02_cycle1_report.md):

AZ-641: AC-1 + AC-3 + AC-4 verified via UDP loopback integration tests;
        AC-2 (serial) requires a socat pty pair and runs in the SITL/CI
        tier (test exists as #[ignore]-marked stub).
AZ-642: AC-1 + AC-2 + AC-3 verified via exhaustive codec round-trip and
        decoder negative-path tests; AC-4 (SITL round-trip) requires
        ArduPilot SITL — the CRC fix above means the codec is now
        wire-correct, ready for the sitl-conformance Woodpecker stage.
AZ-644: all four ACs verified via wiremock-driven integration tests.

Workspace gates green:
- cargo check --workspace                                clean
- cargo check --workspace --no-default-features          clean
- cargo fmt --all -- --check                             clean
- cargo clippy --workspace --all-targets -- -D warnings  clean
- cargo test --workspace                                 pass (1 expected ignore)

Layering invariants from module-layout.md hold: mavlink_layer and
mission_client are Layer 2 actors importing only `shared`; no sibling
Layer-2 imports; MavlinkHandle implements shared::contracts::MavlinkSink.

Jira: AZ-641, AZ-642, AZ-644 transitioned To Do → In Progress at batch
start; the matching In Testing transitions follow this commit.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-19 12:29:49 +03:00

//! AZ-641 UDP integration tests: heartbeat cadence, reconnect, link-lost.
//!
//! These tests exercise a real `tokio::net::UdpSocket` peer on `127.0.0.1`.
//! No external services required.

use std::time::Duration;

use shared::health::HealthLevel;
use tokio::net::UdpSocket;
use tokio::sync::watch;
use tokio::time::timeout;

use mavlink_layer::{
    Decoder, DecoderEvent, Encoder, Heartbeat, LinkEvent, MavlinkConnection, MavlinkLayer,
    MavlinkLayerOptions, MavlinkMessage,
};

/// Link timeout used by tests that do not exercise link loss.
const SHORT_TIMEOUT: u64 = 250; // ms

/// Binds a UDP "peer" socket on an ephemeral loopback port and returns it
/// together with its address.
async fn fresh_peer_socket() -> (UdpSocket, String) {
    let s = UdpSocket::bind("127.0.0.1:0").await.expect("bind peer");
    let addr = s.local_addr().expect("addr").to_string();
    (s, addr)
}

/// Builds layer options pointing at `uri`, with the given link timeout and a
/// fast reconnect schedule (50 ms base, 200 ms cap) so the tests run quickly.
fn options_for(uri: String, link_timeout_ms: u64) -> MavlinkLayerOptions {
    let mut o = MavlinkLayerOptions::new(MavlinkConnection::new(uri));
    o.link_timeout = Duration::from_millis(link_timeout_ms);
    o.reconnect_base = Duration::from_millis(50);
    o.reconnect_cap = Duration::from_millis(200);
    o
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac1_udp_opens_and_emits_heartbeats() {
    // Arrange: peer is listening at a random local port; layer connects to it.
    let (peer, peer_addr) = fresh_peer_socket().await;
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    let (layer, handle) =
        MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT));
    tokio::spawn(layer.run(shutdown_rx));

    // Act: wait for at least one heartbeat frame from the layer.
    let mut buf = vec![0u8; 1024];
    let n = timeout(Duration::from_secs(2), peer.recv(&mut buf))
        .await
        .expect("first heartbeat must arrive within 2 s")
        .expect("udp recv");
    let mut dec = Decoder::new();
    let events = dec.feed(&buf[..n]);

    // Assert: the frame decodes as a HEARTBEAT.
    assert!(events.iter().any(|e| matches!(
        e,
        DecoderEvent::Message {
            message: MavlinkMessage::Heartbeat(_),
            ..
        }
    )));

    // The peer has not sent a heartbeat yet, so the link is still down:
    // health should be Yellow (transport open, link_up false) or Red
    // (transport still opening).
    let h = handle.health();
    assert!(
        h.level == HealthLevel::Red || h.level == HealthLevel::Yellow,
        "got {:?}",
        h.level
    );
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac3_emits_heartbeat_at_one_hertz() {
    // Arrange
    let (peer, peer_addr) = fresh_peer_socket().await;
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    let (layer, _handle) =
        MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT));
    tokio::spawn(layer.run(shutdown_rx));

    // Act: count heartbeat frames received within a 2.5 s window.
    let mut heartbeats: u32 = 0;
    let deadline = tokio::time::Instant::now() + Duration::from_millis(2500);
    let mut buf = vec![0u8; 1024];
    let mut dec = Decoder::new();
    while tokio::time::Instant::now() < deadline {
        let remaining = deadline - tokio::time::Instant::now();
        if remaining.is_zero() {
            break;
        }
        if let Ok(Ok(n)) = timeout(remaining, peer.recv(&mut buf)).await {
            for ev in dec.feed(&buf[..n]) {
                if let DecoderEvent::Message {
                    message: MavlinkMessage::Heartbeat(_),
                    ..
                } = ev
                {
                    heartbeats += 1;
                }
            }
        } else {
            // Window elapsed or the socket errored: stop counting.
            break;
        }
    }

    // Assert: at 1 Hz, a 2.5 s window holds 2 or 3 heartbeats depending on
    // where the ticks fall within the window.
    assert!(
        (2..=3).contains(&heartbeats),
        "expected 2 or 3 heartbeats in 2.5 s, got {heartbeats}"
    );
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac4_link_lost_when_peer_silent() {
    // Arrange: 200 ms link-timeout so the test runs fast.
    let (peer, peer_addr) = fresh_peer_socket().await;
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    let (layer, handle) = MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), 200));
    let mut link_events = handle.subscribe_link_events();
    tokio::spawn(layer.run(shutdown_rx));

    // Wait for the layer to open and send its first heartbeat; `recv_from`
    // gives us the layer's source address so we can `send_to` back.
    let mut buf = vec![0u8; 1024];
    let (_n, layer_local_addr) = timeout(Duration::from_secs(2), peer.recv_from(&mut buf))
        .await
        .expect("recv first hb")
        .expect("udp recv_from");

    // Send one peer HEARTBEAT so the watchdog reports LinkUp.
    let peer_enc = Encoder::new(2, 1); // pretend to be ArduPilot sysid=2 compid=1
    let peer_hb = peer_enc.encode(&MavlinkMessage::Heartbeat(Heartbeat {
        custom_mode: 0,
        mavtype: 2, // MAV_TYPE_QUADROTOR
        autopilot: 3, // MAV_AUTOPILOT_ARDUPILOTMEGA
        base_mode: 0,
        system_status: 4, // MAV_STATE_ACTIVE
        mavlink_version: 3,
    }));
    peer.send_to(&peer_hb, layer_local_addr)
        .await
        .expect("send_to");

    // Drain LinkUp.
    let up = timeout(Duration::from_secs(1), link_events.recv())
        .await
        .expect("LinkUp arrives")
        .expect("event ok");
    assert_eq!(up, LinkEvent::LinkUp);

    // Act: send no further peer heartbeats and wait well past the 200 ms
    // link timeout.
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Assert: LinkLost has been broadcast and health no longer reports the
    // link as up.
    let lost = timeout(Duration::from_secs(1), link_events.recv())
        .await
        .expect("LinkLost arrives")
        .expect("event ok");
    assert_eq!(lost, LinkEvent::LinkLost);
    assert!(!handle
        .health()
        .detail
        .unwrap_or_default()
        .contains("link_up=true"));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac1_udp_reconnects_after_peer_restart() {
    // Arrange: reserve a free port, then release it so the peer starts offline.
    let probe = UdpSocket::bind("127.0.0.1:0").await.expect("probe");
    let peer_addr = probe.local_addr().expect("probe addr").to_string();
    drop(probe);
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    let (layer, _handle) =
        MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT));
    tokio::spawn(layer.run(shutdown_rx));

    // Give the layer time to open its transport before the peer exists. UDP
    // has no handshake, so the open never "fails" for lack of a listener;
    // heartbeats sent before the peer binds are simply lost. Then start the
    // peer on the reserved port.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let peer = UdpSocket::bind(&peer_addr).await.expect("peer up");

    // Act: a heartbeat from the layer must reach the now-bound peer within 5 s.
    let mut buf = vec![0u8; 1024];
    let r = timeout(Duration::from_secs(5), peer.recv(&mut buf)).await;

    // Assert
    assert!(r.is_ok(), "heartbeat must arrive after peer comes up");
}