[AZ-643] [AZ-665] [AZ-672] mavlink+mapobjects+vlm batch 4
ci/woodpecker/push/build-arm Pipeline failed

AZ-643 mavlink_layer:
- ack demux on COMMAND_LONG/COMMAND_ACK with oneshot dispatch and
  configurable deadline; MavlinkHandle::send_command + SendCommandError
- MAVLink-2 signing: Signer/Verifier built on SHA-256, key + timestamp
  source, incompat-flag wiring in encoder, reject + counter in decoder
- new tests: tests/ack_demux.rs (3) + tests/signing.rs (5)

AZ-665 mapobjects_store:
- internal/h3_index.rs (h3o wrapper, cell_of, grid_disk, haversine)
- internal/store.rs (in-memory (cell -> Vec<MapObject>) hashmap with
  k-ring classify and class-group resolution)
- public API: MapObjectsStoreHandle::classify(ClassifyInput) ->
  Classification {New|Moved|Existing}
- AC1-4 in tests/classify.rs; AC5 perf gate (#[ignore], passes in
  --release)

AZ-672 vlm_client + autopilot:
- DisabledVlmProvider in shared::contracts; VlmProvider::name() for
  composition-root diagnostics
- vlm_client::VlmClient gated behind feature = "vlm"; placeholder
  until AZ-673 lands the real NanoLLM IPC
- autopilot: vlm_client is now optional = true under feature vlm;
  Runtime::select_vlm_provider picks DisabledVlmProvider when feature
  off OR config.vlm.enabled = false

Workspace deps: +sha2 (mavlink signing), +h3o (mapobjects index).
Batch report: _docs/03_implementation/batch_04_cycle1_report.md

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-19 13:31:42 +03:00
parent 0a87c0f716
commit 69c0629350
29 changed files with 2492 additions and 131 deletions
+2
View File
@@ -15,6 +15,8 @@ async-trait = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }
tokio-serial = { workspace = true }
sha2 = { workspace = true }
chrono = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "io-util", "net", "signal", "test-util"] }
@@ -0,0 +1,150 @@
//! In-flight `COMMAND_LONG` → `COMMAND_ACK` demultiplexer.
//!
//! Each outbound `COMMAND_LONG` registers a one-shot waiter keyed by
//! `command_id`. The inbound message pump (`MavlinkLayer::process_decoder_event`)
//! looks up the waiter on every `COMMAND_ACK` and resolves it. Unmatched acks
//! are logged but do not break the link. Timeouts are enforced by the caller's
//! `send_command` future; on timeout the waiter is removed so the map cannot
//! leak.
use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::oneshot;
use super::codec::messages::CommandAck;
#[derive(Debug, Default)]
pub struct AckDemux {
waiters: Mutex<HashMap<u16, oneshot::Sender<CommandAck>>>,
}
#[derive(Debug)]
pub enum AckDemuxRegister {
/// Caller now owns this receiver; the demux owns the matching sender.
Receiver(oneshot::Receiver<CommandAck>),
/// A waiter for `command_id` is already in flight; refuse to register a
/// second one.
Duplicate,
}
impl AckDemux {
pub fn new() -> Self {
Self {
waiters: Mutex::new(HashMap::new()),
}
}
/// Register a waiter for the given `command_id`. Returns the receiver end
/// of the oneshot channel; the demux holds the sender until the matching
/// ack arrives, the caller times out, or the entry is force-cleared.
pub fn register(&self, command_id: u16) -> AckDemuxRegister {
let (tx, rx) = oneshot::channel();
let mut guard = self.waiters.lock().expect("ack demux mutex poisoned");
if guard.contains_key(&command_id) {
return AckDemuxRegister::Duplicate;
}
guard.insert(command_id, tx);
AckDemuxRegister::Receiver(rx)
}
/// Deliver an inbound ack to its waiter. Returns `true` if a waiter was
/// resolved; `false` if the ack did not match any in-flight command (the
/// caller may log this for visibility but must not treat it as fatal).
pub fn dispatch(&self, ack: CommandAck) -> bool {
let waiter = {
let mut guard = self.waiters.lock().expect("ack demux mutex poisoned");
guard.remove(&ack.command)
};
match waiter {
Some(tx) => tx.send(ack).is_ok(),
None => false,
}
}
/// Forget the waiter for `command_id` without resolving it. Used by the
/// caller's timeout path so the map cannot leak.
pub fn forget(&self, command_id: u16) {
let mut guard = self.waiters.lock().expect("ack demux mutex poisoned");
guard.remove(&command_id);
}
/// Number of currently-in-flight commands. Exposed via `MavlinkHandle::health`.
pub fn in_flight(&self) -> usize {
let guard = self.waiters.lock().expect("ack demux mutex poisoned");
guard.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ack(command: u16, result: u8) -> CommandAck {
CommandAck { command, result }
}
#[tokio::test]
async fn register_then_dispatch_resolves_waiter() {
// Arrange
let demux = AckDemux::new();
let rx = match demux.register(20) {
AckDemuxRegister::Receiver(rx) => rx,
AckDemuxRegister::Duplicate => panic!("expected fresh registration"),
};
// Act
let dispatched = demux.dispatch(ack(20, 0));
let got = rx.await.expect("waiter must be resolved");
// Assert
assert!(dispatched);
assert_eq!(got.command, 20);
assert_eq!(got.result, 0);
assert_eq!(demux.in_flight(), 0);
}
#[tokio::test]
async fn duplicate_registration_is_refused() {
// Arrange
let demux = AckDemux::new();
let _rx = demux.register(20);
// Act
let again = demux.register(20);
// Assert
assert!(matches!(again, AckDemuxRegister::Duplicate));
}
#[tokio::test]
async fn dispatch_without_waiter_returns_false() {
// Arrange
let demux = AckDemux::new();
// Act
let dispatched = demux.dispatch(ack(20, 0));
// Assert
assert!(!dispatched);
}
#[tokio::test]
async fn forget_removes_entry_without_resolving() {
// Arrange
let demux = AckDemux::new();
let rx = match demux.register(20) {
AckDemuxRegister::Receiver(rx) => rx,
_ => panic!("expected receiver"),
};
// Act
demux.forget(20);
let dispatched = demux.dispatch(ack(20, 0));
// Assert: dropping the sender closes the receiver
assert!(!dispatched);
assert_eq!(demux.in_flight(), 0);
assert!(rx.await.is_err());
}
}
@@ -8,6 +8,7 @@
use super::crc::frame_crc;
use super::messages::{crc_extra_for_id, MavlinkMessage};
use super::parse_errors::{ParseErrorKind, ParseErrors};
use super::signing::Verifier;
use super::{HEADER_LEN, INCOMPAT_FLAG_SIGNED, MAVLINK_V2_STX, MAX_PAYLOAD, SIGNATURE_LEN};
#[derive(Debug, Clone, PartialEq)]
@@ -37,6 +38,21 @@ pub enum DecoderEvent {
expected: u8,
actual: u8,
},
/// A signed frame's trailer did not match the configured verifier, or a
/// signing-required link received an unsigned frame.
SigningMismatch {
msg_id: u32,
seq: u8,
reason: SigningReject,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SigningReject {
/// `incompat_flags` bit 0 was set but the signature trailer did not verify.
BadSignature,
/// A verifier is configured but the inbound frame was unsigned.
Unsigned,
}
#[derive(Debug)]
@@ -46,6 +62,7 @@ pub struct Decoder {
pub errors: ParseErrors,
/// Last sequence number per (sysid, compid).
last_seq: std::collections::HashMap<(u8, u8), u8>,
verifier: Option<Verifier>,
}
impl Default for Decoder {
@@ -60,9 +77,27 @@ impl Decoder {
buf: Vec::with_capacity(4 * 1024),
errors: ParseErrors::new(),
last_seq: std::collections::HashMap::new(),
verifier: None,
}
}
/// Construct a decoder that validates the MAVLink-2 signing trailer on
/// inbound frames. Mismatched signatures are surfaced as
/// [`DecoderEvent::SigningMismatch`] and counted via
/// [`ParseErrorKind::SigningMismatch`].
pub fn with_verifier(verifier: Verifier) -> Self {
Self {
buf: Vec::with_capacity(4 * 1024),
errors: ParseErrors::new(),
last_seq: std::collections::HashMap::new(),
verifier: Some(verifier),
}
}
pub fn signing_enabled(&self) -> bool {
self.verifier.is_some()
}
/// Push raw bytes into the decoder and drain any complete events.
pub fn feed(&mut self, bytes: &[u8]) -> Vec<DecoderEvent> {
self.buf.extend_from_slice(bytes);
@@ -147,6 +182,35 @@ impl Decoder {
}
}
if let Some(verifier) = &self.verifier {
let is_signed = incompat & INCOMPAT_FLAG_SIGNED != 0;
if !is_signed {
self.errors.record(ParseErrorKind::SigningMismatch);
events.push(DecoderEvent::SigningMismatch {
msg_id,
seq,
reason: SigningReject::Unsigned,
});
self.buf.drain(..total_frame);
continue;
}
let body_end = HEADER_LEN + payload_len + 2;
let trailer_start = body_end;
let trailer_end = trailer_start + SIGNATURE_LEN;
let frame_until_crc = &self.buf[..body_end];
let trailer = &self.buf[trailer_start..trailer_end];
if !verifier.verify(frame_until_crc, trailer) {
self.errors.record(ParseErrorKind::SigningMismatch);
events.push(DecoderEvent::SigningMismatch {
msg_id,
seq,
reason: SigningReject::BadSignature,
});
self.buf.drain(..total_frame);
continue;
}
}
let payload = &self.buf[HEADER_LEN..HEADER_LEN + payload_len];
match MavlinkMessage::decode(msg_id, payload) {
Ok(message) => {
@@ -1,19 +1,23 @@
//! MAVLink v2 frame encoder.
//!
//! The encoder owns the per-link outbound `tx_seq` counter and is the single
//! place that lays down the wire bytes.
//! place that lays down the wire bytes. When configured with a [`Signer`],
//! it appends the 13-byte MAVLink-2 signing trailer and sets the `incompat`
//! flag accordingly (AZ-643).
use std::sync::atomic::{AtomicU8, Ordering};
use super::crc::frame_crc;
use super::messages::MavlinkMessage;
use super::{HEADER_LEN, MAVLINK_V2_STX};
use super::signing::Signer;
use super::{HEADER_LEN, INCOMPAT_FLAG_SIGNED, MAVLINK_V2_STX};
#[derive(Debug)]
pub struct Encoder {
sysid: u8,
compid: u8,
tx_seq: AtomicU8,
signer: Option<Signer>,
}
impl Encoder {
@@ -22,6 +26,17 @@ impl Encoder {
sysid,
compid,
tx_seq: AtomicU8::new(0),
signer: None,
}
}
/// Construct an encoder that signs every outbound frame.
pub fn with_signer(sysid: u8, compid: u8, signer: Signer) -> Self {
Self {
sysid,
compid,
tx_seq: AtomicU8::new(0),
signer: Some(signer),
}
}
@@ -33,10 +48,16 @@ impl Encoder {
self.compid
}
pub fn signing_enabled(&self) -> bool {
self.signer.is_some()
}
/// Encode `msg` into a self-contained MAVLink v2 frame on the wire.
///
/// Trailing-zero payload bytes are truncated per the MAVLink spec. Each
/// call advances the per-link tx sequence counter by 1 with wrap-around.
/// If the encoder was constructed with a [`Signer`], the 13-byte signing
/// trailer is appended and `incompat_flags` bit 0 is set.
pub fn encode(&self, msg: &MavlinkMessage) -> Vec<u8> {
let mut full_payload = Vec::with_capacity(64);
msg.encode_payload(&mut full_payload);
@@ -45,13 +66,20 @@ impl Encoder {
let msg_id = msg.msg_id();
let seq = self.tx_seq.fetch_add(1, Ordering::Relaxed);
let mut frame = Vec::with_capacity(HEADER_LEN + payload_len + 2);
let incompat_flags = if self.signer.is_some() {
INCOMPAT_FLAG_SIGNED
} else {
0
};
let trailer_len = if self.signer.is_some() { 13 } else { 0 };
let mut frame = Vec::with_capacity(HEADER_LEN + payload_len + 2 + trailer_len);
frame.push(MAVLINK_V2_STX);
// Body that the CRC covers begins here.
let body_start = frame.len();
frame.push(payload_len as u8);
frame.push(0); // incompat_flags (no signing in this task — AZ-643)
frame.push(incompat_flags);
frame.push(0); // compat_flags
frame.push(seq);
frame.push(self.sysid);
@@ -64,6 +92,10 @@ impl Encoder {
let crc = frame_crc(&frame[body_start..], msg.crc_extra());
frame.extend_from_slice(&crc.to_le_bytes());
if let Some(signer) = &self.signer {
signer.sign_into(&mut frame);
}
frame
}
}
@@ -9,6 +9,7 @@ pub mod decoder;
pub mod encoder;
pub mod messages;
pub mod parse_errors;
pub mod signing;
pub use decoder::{Decoder, DecoderEvent};
pub use encoder::Encoder;
@@ -18,6 +19,7 @@ pub use messages::{
MissionItemReached, MissionRequestInt, MissionSetCurrent, SetMode, StatusText, SysStatus,
};
pub use parse_errors::{ParseErrorKind, ParseErrors};
pub use signing::{Signer, SigningKey, Verifier};
/// MAVLink v2 frame start byte.
pub const MAVLINK_V2_STX: u8 = 0xFD;
@@ -14,6 +14,8 @@ pub enum ParseErrorKind {
SequenceGap,
/// Message-specific payload decode failed (e.g. enum out of range).
InvalidPayload,
/// MAVLink-2 signing trailer did not match the verifier's secret + timestamp policy.
SigningMismatch,
}
#[derive(Debug, Default)]
@@ -23,6 +25,7 @@ pub struct ParseErrors {
unknown_id: AtomicU64,
sequence_gap: AtomicU64,
invalid_payload: AtomicU64,
signing_mismatch: AtomicU64,
}
impl ParseErrors {
@@ -37,6 +40,7 @@ impl ParseErrors {
ParseErrorKind::UnknownId => &self.unknown_id,
ParseErrorKind::SequenceGap => &self.sequence_gap,
ParseErrorKind::InvalidPayload => &self.invalid_payload,
ParseErrorKind::SigningMismatch => &self.signing_mismatch,
};
cell.fetch_add(1, Ordering::Relaxed);
}
@@ -48,6 +52,7 @@ impl ParseErrors {
unknown_id: self.unknown_id.load(Ordering::Relaxed),
sequence_gap: self.sequence_gap.load(Ordering::Relaxed),
invalid_payload: self.invalid_payload.load(Ordering::Relaxed),
signing_mismatch: self.signing_mismatch.load(Ordering::Relaxed),
}
}
}
@@ -59,11 +64,17 @@ pub struct ParseErrorsSnapshot {
pub unknown_id: u64,
pub sequence_gap: u64,
pub invalid_payload: u64,
pub signing_mismatch: u64,
}
impl ParseErrorsSnapshot {
pub fn total(&self) -> u64 {
self.crc + self.truncated + self.unknown_id + self.sequence_gap + self.invalid_payload
self.crc
+ self.truncated
+ self.unknown_id
+ self.sequence_gap
+ self.invalid_payload
+ self.signing_mismatch
}
}
@@ -0,0 +1,280 @@
//! MAVLink v2 message signing (per the MAVLink spec).
//!
//! When signing is enabled, outbound frames carry a 13-byte trailer:
//! `link_id(1) || timestamp(6, LE) || signature(6)` and have the `incompat_flags`
//! bit 0 set. Inbound signed frames are validated against the configured
//! secret key; mismatched signatures are rejected by the decoder.
//!
//! Algorithm: `signature = first 6 bytes of SHA-256(secret_key(32) || frame(1+9+payload+2) || link_id(1) || timestamp_le(6))`
//! where `frame` is the full pre-signature MAVLink-2 frame (STX through CRC).
//!
//! Timestamp epoch: 2015-01-01T00:00:00Z, granularity 10 μs (per the spec).
use std::sync::atomic::{AtomicU64, Ordering};
use chrono::{TimeZone, Utc};
use sha2::{Digest, Sha256};
use super::SIGNATURE_LEN;
/// Length of the truncated signature within the trailer.
pub const SIGNATURE_BYTES: usize = 6;
/// Length of the signing key in bytes.
pub const SIGNING_KEY_LEN: usize = 32;
/// Length of the per-frame timestamp field within the trailer.
pub const TIMESTAMP_LEN: usize = 6;
/// MAVLink-2 signing epoch — `2015-01-01T00:00:00Z`.
fn mavlink_epoch_unix_micros() -> i64 {
Utc.with_ymd_and_hms(2015, 1, 1, 0, 0, 0)
.single()
.expect("2015-01-01 is a valid date")
.timestamp_micros()
}
/// 32-byte symmetric signing key shared with the link peer.
#[derive(Clone)]
pub struct SigningKey([u8; SIGNING_KEY_LEN]);
impl SigningKey {
pub fn new(bytes: [u8; SIGNING_KEY_LEN]) -> Self {
Self(bytes)
}
pub fn as_bytes(&self) -> &[u8; SIGNING_KEY_LEN] {
&self.0
}
}
impl std::fmt::Debug for SigningKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Never log key bytes (security gate).
f.debug_struct("SigningKey")
.field("len", &SIGNING_KEY_LEN)
.finish()
}
}
/// Monotonic 48-bit timestamp source used by the outbound signer.
///
/// Resolution is 10 μs since the MAVLink epoch (2015-01-01 UTC). The source
/// is strictly monotonic: if wall-clock time goes backwards (NTP step), the
/// next stamp is `last + 1` so timestamps still increase per the spec.
#[derive(Debug)]
pub struct TimestampSource {
last: AtomicU64,
}
impl TimestampSource {
pub fn new() -> Self {
Self {
last: AtomicU64::new(0),
}
}
/// Return the next 6-byte timestamp value to embed in a signed frame.
pub fn next(&self) -> u64 {
let now_us = Utc::now().timestamp_micros();
let candidate = compute_timestamp(now_us);
loop {
let prev = self.last.load(Ordering::Relaxed);
let next = candidate.max(prev + 1) & 0x0000_FFFF_FFFF_FFFF;
match self
.last
.compare_exchange(prev, next, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => return next,
Err(_) => continue,
}
}
}
}
impl Default for TimestampSource {
fn default() -> Self {
Self::new()
}
}
fn compute_timestamp(now_unix_micros: i64) -> u64 {
let delta_us = now_unix_micros - mavlink_epoch_unix_micros();
if delta_us <= 0 {
return 0;
}
(delta_us / 10) as u64
}
/// Outbound signer. Produces the 13-byte signing trailer for a frame.
#[derive(Debug)]
pub struct Signer {
key: SigningKey,
link_id: u8,
timestamps: TimestampSource,
}
impl Signer {
pub fn new(key: SigningKey, link_id: u8) -> Self {
Self {
key,
link_id,
timestamps: TimestampSource::new(),
}
}
pub fn link_id(&self) -> u8 {
self.link_id
}
/// Append the 13-byte signing trailer to `frame`. Caller must have already
/// set `incompat_flags` bit 0 (the `INCOMPAT_FLAG_SIGNED` bit) **before**
/// calling this — the trailer hash covers the bytes as-emitted.
///
/// `frame` must contain `[STX, header(9), payload, crc_lo, crc_hi]` and
/// nothing else when this is called.
pub fn sign_into(&self, frame: &mut Vec<u8>) {
let timestamp = self.timestamps.next();
let timestamp_le = timestamp_to_bytes(timestamp);
let signature = compute_signature(self.key.as_bytes(), frame, self.link_id, &timestamp_le);
frame.push(self.link_id);
frame.extend_from_slice(&timestamp_le);
frame.extend_from_slice(&signature);
}
}
/// Inbound verifier. Returns `true` when the trailer matches the secret +
/// frame body; `false` otherwise.
#[derive(Debug, Clone)]
pub struct Verifier {
key: SigningKey,
}
impl Verifier {
pub fn new(key: SigningKey) -> Self {
Self { key }
}
/// Verify the 13-byte trailer against `frame_until_crc` (the bytes
/// `[STX..crc_hi]` inclusive). `trailer` must be exactly 13 bytes.
pub fn verify(&self, frame_until_crc: &[u8], trailer: &[u8]) -> bool {
if trailer.len() != SIGNATURE_LEN {
return false;
}
let link_id = trailer[0];
let mut timestamp_le = [0u8; TIMESTAMP_LEN];
timestamp_le.copy_from_slice(&trailer[1..1 + TIMESTAMP_LEN]);
let want_sig = &trailer[1 + TIMESTAMP_LEN..];
let got_sig =
compute_signature(self.key.as_bytes(), frame_until_crc, link_id, &timestamp_le);
constant_time_eq(want_sig, &got_sig)
}
}
fn compute_signature(
key: &[u8; SIGNING_KEY_LEN],
frame_until_crc: &[u8],
link_id: u8,
timestamp_le: &[u8; TIMESTAMP_LEN],
) -> [u8; SIGNATURE_BYTES] {
let mut hasher = Sha256::new();
hasher.update(key);
hasher.update(frame_until_crc);
hasher.update([link_id]);
hasher.update(timestamp_le);
let digest = hasher.finalize();
let mut out = [0u8; SIGNATURE_BYTES];
out.copy_from_slice(&digest[..SIGNATURE_BYTES]);
out
}
fn timestamp_to_bytes(ts: u64) -> [u8; TIMESTAMP_LEN] {
let bytes = ts.to_le_bytes();
let mut out = [0u8; TIMESTAMP_LEN];
out.copy_from_slice(&bytes[..TIMESTAMP_LEN]);
out
}
#[inline]
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
if a.len() != b.len() {
return false;
}
let mut diff = 0u8;
for (x, y) in a.iter().zip(b.iter()) {
diff |= x ^ y;
}
diff == 0
}
#[cfg(test)]
mod tests {
use super::*;
fn fixed_key() -> SigningKey {
let mut k = [0u8; SIGNING_KEY_LEN];
for (i, b) in k.iter_mut().enumerate() {
*b = i as u8;
}
SigningKey::new(k)
}
#[test]
fn signature_round_trip_validates() {
// Arrange
let key = fixed_key();
let signer = Signer::new(key.clone(), 7);
let verifier = Verifier::new(key);
let mut frame: Vec<u8> = vec![
0xFD, 0x09, 0x01, 0x00, 0x00, 0x01, 0xBE, 0x00, 0x00, 0x00, // header
0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, // payload
0x12, 0x34, // crc
];
let body_end = frame.len();
// Act
signer.sign_into(&mut frame);
// Assert
assert_eq!(frame.len() - body_end, SIGNATURE_LEN);
let trailer = frame[body_end..].to_vec();
assert!(verifier.verify(&frame[..body_end], &trailer));
}
#[test]
fn tampered_trailer_fails_verification() {
// Arrange
let signer = Signer::new(fixed_key(), 0);
let verifier = Verifier::new(fixed_key());
let mut frame: Vec<u8> = vec![
0xFD, 0x00, 0x01, 0x00, 0x05, 0x01, 0xBE, 0x00, 0x00, 0x00, 0x00, 0x00,
];
let body_end = frame.len();
signer.sign_into(&mut frame);
// Flip one bit in the signature region.
*frame.last_mut().unwrap() ^= 0x01;
// Act
let ok = verifier.verify(&frame[..body_end], &frame[body_end..]);
// Assert
assert!(!ok);
}
#[test]
fn timestamps_are_strictly_monotonic() {
// Arrange
let src = TimestampSource::new();
// Act
let a = src.next();
let b = src.next();
let c = src.next();
// Assert
assert!(b > a);
assert!(c > b);
}
}
+1
View File
@@ -1,3 +1,4 @@
pub mod ack_demux;
pub mod codec;
pub mod heartbeat;
pub mod retry;
+146 -11
View File
@@ -12,7 +12,7 @@
//! from `internal::codec`.
//!
//! Real implementation tasks: AZ-641 (transport + heartbeat), AZ-642 (codec),
//! AZ-643 (ack demux + signing — future).
//! AZ-643 (ack demux + signing).
mod internal;
@@ -27,11 +27,14 @@ use shared::contracts::MavlinkSink;
use shared::error::{AutopilotError, Result};
use shared::health::ComponentHealth;
pub use internal::ack_demux::{AckDemux, AckDemuxRegister};
pub use internal::codec::decoder::SigningReject;
pub use internal::codec::{
Attitude, CommandAck, CommandLong, Decoder, DecoderEvent, Encoder, ExtendedSysState,
GlobalPositionInt, Heartbeat, MavlinkMessage, MavlinkParseError, MissionAck, MissionClearAll,
MissionCount, MissionCurrent, MissionItemInt, MissionItemReached, MissionRequestInt,
MissionSetCurrent, ParseErrorKind, ParseErrors, SetMode, StatusText, SysStatus,
MissionSetCurrent, ParseErrorKind, ParseErrors, SetMode, Signer, SigningKey, StatusText,
SysStatus, Verifier,
};
pub use internal::heartbeat::LinkEvent;
pub use internal::uri::{ConnectionUri, DEFAULT_SERIAL_BAUD};
@@ -62,7 +65,7 @@ impl MavlinkConnection {
}
}
/// Tunables for the MAVLink actor. Defaults follow AZ-641 §NFR.
/// Tunables for the MAVLink actor. Defaults follow AZ-641 §NFR + AZ-643 §AC.
#[derive(Debug, Clone)]
pub struct MavlinkLayerOptions {
pub connection: MavlinkConnection,
@@ -76,9 +79,17 @@ pub struct MavlinkLayerOptions {
pub reconnect_cap: Duration,
/// Base delay for the open-loop exponential backoff.
pub reconnect_base: Duration,
/// MAVLink-2 signing flag; plumbed through to health, not enforced here
/// (AZ-643 owns the signing path).
pub signing_enabled: bool,
/// Default deadline for `send_command` if the caller passes `None`.
pub command_ack_deadline: Duration,
/// MAVLink-2 signing config; `None` disables signing on this link.
pub signing: Option<SigningOptions>,
}
/// Signing configuration for a MAVLink-2 link.
#[derive(Debug, Clone)]
pub struct SigningOptions {
pub key: SigningKey,
pub link_id: u8,
}
impl MavlinkLayerOptions {
@@ -90,9 +101,14 @@ impl MavlinkLayerOptions {
link_timeout: Duration::from_millis(internal::heartbeat::DEFAULT_LINK_TIMEOUT_MS),
reconnect_cap: Duration::from_secs(5),
reconnect_base: Duration::from_millis(100),
signing_enabled: false,
command_ack_deadline: Duration::from_secs(1),
signing: None,
}
}
pub fn signing_enabled(&self) -> bool {
self.signing.is_some()
}
}
#[derive(Debug, Clone)]
@@ -117,6 +133,9 @@ struct LinkState {
inbound: broadcast::Sender<InboundMessage>,
connected: AtomicBool,
signing_enabled: bool,
verifier: Option<Verifier>,
ack_demux: Arc<AckDemux>,
command_ack_deadline: Duration,
}
/// Long-running actor that owns the transport, reconnect loop, and codec.
@@ -140,13 +159,30 @@ impl MavlinkLayer {
let (tx, rx) = mpsc::channel(OUTBOUND_CHAN_CAP);
let (inbound_tx, _inbound_rx) = broadcast::channel(INBOUND_CHAN_CAP);
let (watchdog, _link_rx) = InboundWatchdog::new(options.link_timeout.as_millis() as u64);
let encoder = match &options.signing {
Some(sign) => Encoder::with_signer(
options.sysid,
options.compid,
Signer::new(sign.key.clone(), sign.link_id),
),
None => Encoder::new(options.sysid, options.compid),
};
let verifier = options
.signing
.as_ref()
.map(|s| Verifier::new(s.key.clone()));
let state = Arc::new(LinkState {
encoder: Encoder::new(options.sysid, options.compid),
encoder,
parse_errors: Arc::new(ParseErrors::new()),
watchdog,
inbound: inbound_tx,
connected: AtomicBool::new(false),
signing_enabled: options.signing_enabled,
signing_enabled: options.signing.is_some(),
verifier,
ack_demux: Arc::new(AckDemux::new()),
command_ack_deadline: options.command_ack_deadline,
});
let layer = Self {
options,
@@ -214,7 +250,10 @@ impl MavlinkLayer {
transport: &mut dyn Transport,
shutdown: &mut watch::Receiver<bool>,
) -> LinkOutcome {
let mut decoder = Decoder::new();
let mut decoder = match self.state.verifier.clone() {
Some(v) => Decoder::with_verifier(v),
None => Decoder::new(),
};
let mut heartbeat_tick = tokio::time::interval(heartbeat_period());
heartbeat_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut watchdog_tick = tokio::time::interval(Duration::from_millis(200));
@@ -293,6 +332,17 @@ impl MavlinkLayer {
if matches!(message, MavlinkMessage::Heartbeat(_)) {
self.state.watchdog.note_inbound_heartbeat();
}
if let MavlinkMessage::CommandAck(ack) = &message {
let matched = self.state.ack_demux.dispatch(*ack);
if !matched {
tracing::debug!(
component = NAME,
command = ack.command,
result = ack.result,
"command_ack with no in-flight waiter"
);
}
}
let _ = self.state.inbound.send(InboundMessage {
sysid,
compid,
@@ -340,6 +390,22 @@ impl MavlinkLayer {
"mavlink sequence gap"
);
}
DecoderEvent::SigningMismatch {
msg_id,
seq,
reason,
} => {
self.state
.parse_errors
.record(ParseErrorKind::SigningMismatch);
tracing::warn!(
component = NAME,
msg_id,
seq,
?reason,
"mavlink signing rejected"
);
}
}
}
}
@@ -363,6 +429,20 @@ enum LinkOutcome {
TransportLost(String),
}
/// Errors returned by `MavlinkHandle::send_command`.
#[derive(Debug, thiserror::Error)]
pub enum SendCommandError {
/// The configured ack deadline elapsed without a matching `COMMAND_ACK`.
#[error("command ack timeout after {0:?}")]
Timeout(Duration),
/// A waiter for the same `command_id` is already in flight on this link.
#[error("duplicate command in flight (command_id={0})")]
Duplicate(u16),
/// The outbound channel is closed (link shutting down).
#[error("mavlink send_command channel closed: {0}")]
ChannelClosed(String),
}
impl MavlinkHandle {
/// Send a typed MAVLink message — encoded with the actor's sysid/compid
/// and the next outbound sequence number.
@@ -385,6 +465,51 @@ impl MavlinkHandle {
})
}
/// Send a `COMMAND_LONG` and resolve when the matching `COMMAND_ACK`
/// arrives, or return [`SendCommandError::Timeout`] if `deadline` elapses
/// first.
///
/// `deadline = None` uses [`MavlinkLayerOptions::command_ack_deadline`].
/// Retry policy (if any) is the caller's concern (AZ-643 §Scope: caller
/// owns the retry decision).
pub async fn send_command(
&self,
cmd: CommandLong,
deadline: Option<Duration>,
) -> std::result::Result<CommandAck, SendCommandError> {
let command_id = cmd.command;
let rx = match self.state.ack_demux.register(command_id) {
AckDemuxRegister::Receiver(rx) => rx,
AckDemuxRegister::Duplicate => {
return Err(SendCommandError::Duplicate(command_id));
}
};
if let Err(e) = self
.outbound_tx
.send(OutboundItem::Message(MavlinkMessage::CommandLong(cmd)))
.await
{
// Channel closed — drop the waiter to keep the map clean.
self.state.ack_demux.forget(command_id);
return Err(SendCommandError::ChannelClosed(e.to_string()));
}
let wall = deadline.unwrap_or(self.state.command_ack_deadline);
match tokio::time::timeout(wall, rx).await {
Ok(Ok(ack)) => Ok(ack),
Ok(Err(_recv_err)) => {
// Sender was dropped (forget called elsewhere); treat as timeout
// for the caller's safety. Map is already clean.
Err(SendCommandError::Timeout(wall))
}
Err(_elapsed) => {
self.state.ack_demux.forget(command_id);
Err(SendCommandError::Timeout(wall))
}
}
}
pub fn subscribe_inbound(&self) -> broadcast::Receiver<InboundMessage> {
self.state.inbound.subscribe()
}
@@ -397,15 +522,25 @@ impl MavlinkHandle {
self.state.parse_errors.snapshot()
}
/// Currently in-flight `COMMAND_LONG` requests awaiting ack.
pub fn commands_in_flight(&self) -> usize {
self.state.ack_demux.in_flight()
}
pub fn signing_enabled(&self) -> bool {
self.state.signing_enabled
}
pub fn health(&self) -> ComponentHealth {
let connected = self.state.connected.load(Ordering::Relaxed);
let age = self.state.watchdog.last_inbound_age_ms();
let detail = format!(
"connected={connected} last_heartbeat_age_ms={} signing_enabled={} outbound={} parse_errors={}",
"connected={connected} last_heartbeat_age_ms={} signing_enabled={} outbound={} parse_errors={} commands_in_flight={}",
age.map(|m| m.to_string()).unwrap_or_else(|| "none".into()),
self.state.signing_enabled,
self.state.watchdog.outbound_total(),
self.parse_errors().total(),
self.commands_in_flight(),
);
if !connected {
ComponentHealth::red(NAME, detail)
+194
View File
@@ -0,0 +1,194 @@
//! AZ-643 — ack-demux integration tests (AC-1 happy path, AC-2 timeout).
//!
//! A fake UDP peer either acks immediately or stays silent; the autopilot side
//! issues `send_command(...)` and asserts on the resolution.
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::watch;
use tokio::time::timeout;
use mavlink_layer::{
CommandAck, CommandLong, Decoder, DecoderEvent, Encoder, Heartbeat, MavlinkConnection,
MavlinkLayer, MavlinkLayerOptions, MavlinkMessage, SendCommandError,
};
const MAV_CMD_NAV_RETURN_TO_LAUNCH: u16 = 20;
const MAV_RESULT_ACCEPTED: u8 = 0;
const SHORT_TIMEOUT_MS: u64 = 250;
fn options_for(uri: String, link_timeout_ms: u64) -> MavlinkLayerOptions {
let mut o = MavlinkLayerOptions::new(MavlinkConnection::new(uri));
o.link_timeout = Duration::from_millis(link_timeout_ms);
o.reconnect_base = Duration::from_millis(50);
o.reconnect_cap = Duration::from_millis(200);
// Keep the ack deadline tight so AC-2 finishes fast.
o.command_ack_deadline = Duration::from_millis(500);
o
}
async fn drain_first_heartbeat_addr(peer: &UdpSocket) -> std::net::SocketAddr {
let mut buf = vec![0u8; 1024];
let (_, layer_addr) = timeout(Duration::from_secs(2), peer.recv_from(&mut buf))
.await
.expect("first heartbeat must arrive")
.expect("udp recv_from");
layer_addr
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac1_send_command_happy_path() {
// Arrange: a peer that acks any inbound COMMAND_LONG promptly.
let peer = UdpSocket::bind("127.0.0.1:0").await.expect("bind peer");
let peer_addr = peer.local_addr().expect("peer addr").to_string();
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
let (layer, handle) =
MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT_MS));
tokio::spawn(layer.run(shutdown_rx));
// Capture the layer's source address from its first heartbeat.
let layer_addr = drain_first_heartbeat_addr(&peer).await;
let peer_enc = Encoder::new(2, 1);
// Peer task: on every inbound COMMAND_LONG, reply with COMMAND_ACK.
let peer_arc = std::sync::Arc::new(peer);
let peer_for_task = peer_arc.clone();
tokio::spawn(async move {
let mut dec = Decoder::new();
let mut buf = vec![0u8; 2048];
loop {
let n = match peer_for_task.recv(&mut buf).await {
Ok(n) => n,
Err(_) => return,
};
for ev in dec.feed(&buf[..n]) {
if let DecoderEvent::Message {
message: MavlinkMessage::CommandLong(cl),
..
} = ev
{
let ack = peer_enc.encode(&MavlinkMessage::CommandAck(CommandAck {
command: cl.command,
result: MAV_RESULT_ACCEPTED,
}));
let _ = peer_for_task.send_to(&ack, layer_addr).await;
}
}
}
});
// Act: call send_command and await resolution.
let cmd = CommandLong {
param1: 0.0,
param2: 0.0,
param3: 0.0,
param4: 0.0,
param5: 0.0,
param6: 0.0,
param7: 0.0,
command: MAV_CMD_NAV_RETURN_TO_LAUNCH,
target_system: 1,
target_component: 1,
confirmation: 0,
};
let ack = timeout(Duration::from_secs(2), handle.send_command(cmd, None))
.await
.expect("ack must arrive within 2 s")
.expect("send_command must succeed");
// Assert: ack matches and in-flight map is clear.
assert_eq!(ack.command, MAV_CMD_NAV_RETURN_TO_LAUNCH);
assert_eq!(ack.result, MAV_RESULT_ACCEPTED);
assert_eq!(
handle.commands_in_flight(),
0,
"in-flight map must be drained"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac2_send_command_timeout_returns_explicit_error() {
// Arrange: a peer that NEVER acks.
let peer = UdpSocket::bind("127.0.0.1:0").await.expect("bind peer");
let peer_addr = peer.local_addr().expect("peer addr").to_string();
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
let (layer, handle) =
MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT_MS));
tokio::spawn(layer.run(shutdown_rx));
// Pull the layer's first heartbeat just so the link is open.
let _ = drain_first_heartbeat_addr(&peer).await;
let cmd = CommandLong {
param1: 0.0,
param2: 0.0,
param3: 0.0,
param4: 0.0,
param5: 0.0,
param6: 0.0,
param7: 0.0,
command: MAV_CMD_NAV_RETURN_TO_LAUNCH,
target_system: 1,
target_component: 1,
confirmation: 0,
};
// Act
let result = handle
.send_command(cmd, Some(Duration::from_millis(300)))
.await;
// Assert
match result {
Err(SendCommandError::Timeout(d)) => {
assert_eq!(d, Duration::from_millis(300));
}
other => panic!("expected Timeout, got {other:?}"),
}
assert_eq!(
handle.commands_in_flight(),
0,
"in-flight map must be cleared on timeout (no leaks)"
);
}
/// Defensive coverage: a stray COMMAND_ACK without a matching waiter must not
/// crash the link or leak entries.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unmatched_ack_is_dropped_without_side_effect() {
// Arrange
let peer = UdpSocket::bind("127.0.0.1:0").await.expect("bind peer");
let peer_addr = peer.local_addr().expect("peer addr").to_string();
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
let (layer, handle) =
MavlinkLayer::new(options_for(format!("udp://{peer_addr}"), SHORT_TIMEOUT_MS));
tokio::spawn(layer.run(shutdown_rx));
let layer_addr = drain_first_heartbeat_addr(&peer).await;
// Act: send a HEARTBEAT (to keep watchdog happy) and a stray COMMAND_ACK.
let peer_enc = Encoder::new(2, 1);
let hb = peer_enc.encode(&MavlinkMessage::Heartbeat(Heartbeat {
custom_mode: 0,
mavtype: 2,
autopilot: 3,
base_mode: 0,
system_status: 4,
mavlink_version: 3,
}));
peer.send_to(&hb, layer_addr).await.unwrap();
let stray = peer_enc.encode(&MavlinkMessage::CommandAck(CommandAck {
command: MAV_CMD_NAV_RETURN_TO_LAUNCH,
result: MAV_RESULT_ACCEPTED,
}));
peer.send_to(&stray, layer_addr).await.unwrap();
// Give the layer a beat to process.
tokio::time::sleep(Duration::from_millis(150)).await;
// Assert
assert_eq!(handle.commands_in_flight(), 0);
let snap = handle.parse_errors();
assert_eq!(snap.signing_mismatch, 0);
assert_eq!(snap.crc, 0);
}
+236
View File
@@ -0,0 +1,236 @@
//! AZ-643 — MAVLink-2 signing integration tests (AC-3 rejection, AC-4 disabled).
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::watch;
use tokio::time::timeout;
use mavlink_layer::{
Decoder, DecoderEvent, Encoder, Heartbeat, MavlinkConnection, MavlinkLayer,
MavlinkLayerOptions, MavlinkMessage, Signer, SigningKey, SigningReject, Verifier,
};
fn options_for(uri: String) -> MavlinkLayerOptions {
let mut o = MavlinkLayerOptions::new(MavlinkConnection::new(uri));
o.link_timeout = Duration::from_millis(500);
o.reconnect_base = Duration::from_millis(50);
o.reconnect_cap = Duration::from_millis(200);
o
}
fn fixed_key(b: u8) -> SigningKey {
let mut k = [0u8; 32];
for (i, byte) in k.iter_mut().enumerate() {
*byte = b.wrapping_add(i as u8);
}
SigningKey::new(k)
}
#[test]
fn ac3_decoder_rejects_bad_signature() {
// Arrange: build a signed frame, then flip one bit in the signature trailer.
let signer = Signer::new(fixed_key(0x10), 5);
let encoder = Encoder::with_signer(1, 191, signer);
let _ = encoder; // signing is exercised through encode()
// Use a separate signer-on-encoder to produce a signed frame for the test.
let local_signer = Encoder::with_signer(1, 191, Signer::new(fixed_key(0x10), 5));
let mut frame = local_signer.encode(&MavlinkMessage::Heartbeat(Heartbeat {
custom_mode: 0,
mavtype: 2,
autopilot: 3,
base_mode: 0,
system_status: 4,
mavlink_version: 3,
}));
let last = frame.len() - 1;
frame[last] ^= 0x01;
// Act: feed it to a decoder with the matching verifier.
let mut dec = Decoder::with_verifier(Verifier::new(fixed_key(0x10)));
let events = dec.feed(&frame);
// Assert
let rejected = events.iter().find(|e| {
matches!(
e,
DecoderEvent::SigningMismatch {
reason: SigningReject::BadSignature,
..
}
)
});
assert!(
rejected.is_some(),
"expected SigningMismatch event, got {events:?}"
);
assert_eq!(dec.errors.snapshot().signing_mismatch, 1);
// The HEARTBEAT must NOT have been emitted as a Message.
let emitted = events
.iter()
.any(|e| matches!(e, DecoderEvent::Message { .. }));
assert!(!emitted, "rejected frame must not surface as Message");
}
#[test]
fn ac3_signed_frame_with_matching_key_passes() {
// Arrange
let encoder = Encoder::with_signer(1, 191, Signer::new(fixed_key(0xAB), 9));
let frame = encoder.encode(&MavlinkMessage::Heartbeat(Heartbeat {
custom_mode: 0,
mavtype: 2,
autopilot: 3,
base_mode: 0,
system_status: 4,
mavlink_version: 3,
}));
// Act
let mut dec = Decoder::with_verifier(Verifier::new(fixed_key(0xAB)));
let events = dec.feed(&frame);
// Assert
let mut got_message = false;
let mut got_mismatch = false;
for ev in &events {
match ev {
DecoderEvent::Message {
message: MavlinkMessage::Heartbeat(_),
..
} => got_message = true,
DecoderEvent::SigningMismatch { .. } => got_mismatch = true,
_ => {}
}
}
assert!(
got_message,
"valid signed heartbeat must surface as Message"
);
assert!(!got_mismatch, "valid signature must not trigger mismatch");
assert_eq!(dec.errors.snapshot().signing_mismatch, 0);
}
#[test]
fn ac4_signing_disabled_ignores_signature_field() {
// Arrange: build BOTH a signed frame and an unsigned frame.
let signed_enc = Encoder::with_signer(1, 191, Signer::new(fixed_key(0x33), 1));
let unsigned_enc = Encoder::new(1, 191);
let hb = MavlinkMessage::Heartbeat(Heartbeat {
custom_mode: 0,
mavtype: 2,
autopilot: 3,
base_mode: 0,
system_status: 4,
mavlink_version: 3,
});
let signed_frame = signed_enc.encode(&hb);
let unsigned_frame = unsigned_enc.encode(&hb);
// Act: feed both into a Decoder with NO verifier (signing disabled).
let mut dec = Decoder::new();
let signed_events = dec.feed(&signed_frame);
let unsigned_events = dec.feed(&unsigned_frame);
// Assert: both surface as Message, signing_mismatch counter stays at 0.
let signed_ok = signed_events.iter().any(|e| {
matches!(
e,
DecoderEvent::Message {
message: MavlinkMessage::Heartbeat(_),
..
}
)
});
let unsigned_ok = unsigned_events.iter().any(|e| {
matches!(
e,
DecoderEvent::Message {
message: MavlinkMessage::Heartbeat(_),
..
}
)
});
assert!(signed_ok, "with verifier=None, signed frames must decode");
assert!(
unsigned_ok,
"with verifier=None, unsigned frames must decode"
);
assert_eq!(
dec.errors.snapshot().signing_mismatch,
0,
"signing_mismatch counter must stay at 0 in disabled mode"
);
}
#[test]
fn unsigned_frame_rejected_when_verifier_present() {
// Defensive coverage: per the MAVLink spec, with signing enabled the
// decoder rejects unsigned frames. AC-3 only specifies the bad-signature
// case, but the spec-consistent behaviour is to reject both.
let unsigned_enc = Encoder::new(1, 191);
let frame = unsigned_enc.encode(&MavlinkMessage::Heartbeat(Heartbeat {
custom_mode: 0,
mavtype: 2,
autopilot: 3,
base_mode: 0,
system_status: 4,
mavlink_version: 3,
}));
let mut dec = Decoder::with_verifier(Verifier::new(fixed_key(0x44)));
let events = dec.feed(&frame);
assert!(events.iter().any(|e| matches!(
e,
DecoderEvent::SigningMismatch {
reason: SigningReject::Unsigned,
..
}
)));
assert_eq!(dec.errors.snapshot().signing_mismatch, 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn signing_enabled_layer_reports_via_health() {
// Arrange: a layer with signing on, plus a peer that captures the frames.
let peer = UdpSocket::bind("127.0.0.1:0").await.expect("bind peer");
let peer_addr = peer.local_addr().expect("peer addr").to_string();
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
let mut opts = options_for(format!("udp://{peer_addr}"));
opts.signing = Some(mavlink_layer::SigningOptions {
key: fixed_key(0x55),
link_id: 3,
});
let (layer, handle) = MavlinkLayer::new(opts);
tokio::spawn(layer.run(shutdown_rx));
// Act: wait for one heartbeat so we have at least one signed frame.
let mut buf = vec![0u8; 1024];
let n = timeout(Duration::from_secs(2), peer.recv(&mut buf))
.await
.expect("heartbeat must arrive within 2 s")
.expect("udp recv");
// Assert: incompat_flags bit 0 (signed) is set on the outbound frame.
assert!(n >= 10, "frame too short");
assert!(
handle.signing_enabled(),
"signing_enabled() must reflect config"
);
assert_eq!(
buf[2] & 0x01,
0x01,
"outbound frame must have INCOMPAT_FLAG_SIGNED set when signing is enabled"
);
let detail = handle.health().detail.unwrap_or_default();
assert!(
detail.contains("signing_enabled=true"),
"health detail must surface signing_enabled=true; got {detail:?}"
);
assert!(
detail.contains("commands_in_flight=0"),
"health detail must surface commands_in_flight; got {detail:?}"
);
}