Files
Oleksandr Bezdieniezhnykh 288e7f8c46
ci/woodpecker/push/build-arm Pipeline failed
[AZ-653] gimbal_controller ViewPro A40 vendor UDP transport (batch 10)
Implements the vendor wire protocol for the A40 gimbal (XOR-8 checksum,
not CRC16 — task spec corrected against ArduPilot AP_Mount_Viewpro.h):
frame encode/decode, typed FrameId/CameraCommand/ImageSensor, A1 angles,
C1 camera, C2 set-zoom command builders, and a tokio UdpSocket transport
with bounded retry, per-command deadline, and atomic vendor-fault
counters surfaced via faults()/health(). GimbalControllerHandle::set_pose
and zoom now ride the transport when wired; remain disabled when no
transport is bound. 32/32 gimbal_controller tests green; workspace test
suite green except for a pre-existing flake in
mission_executor::state_machine::ac3_bounded_retry_then_success that
reproduces only under parallel workspace test load (passes 5/5 in
isolation; flagged in batch 8 report, unrelated to this batch).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-19 20:07:32 +03:00

359 lines
12 KiB
Rust

//! AZ-653 integration tests for the ViewPro A40 transport.
//!
//! Strategy: bring up a fake A40 endpoint on a second `UdpSocket` in
//! the same process; pair it with the transport under test via a
//! pre-bound `peer` address; drive scenarios by scripting the fake's
//! reply behaviour (echo, drop, corrupt the XOR-8 checksum).
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
use gimbal_controller::{
build_a1_angles, decode_frame, encode_frame, A40Transport, CameraCommand, FrameId,
GimbalCommand, GimbalController, ImageSensor,
};
use shared::models::gimbal::GimbalState;
/// IPv4 loopback address (127.0.0.1) used by every socket in this file.
const LOCALHOST: Ipv4Addr = Ipv4Addr::LOCALHOST;

/// Pairs the loopback address with `port` to form a socket address.
fn loopback(port: u16) -> SocketAddr {
    SocketAddr::from((LOCALHOST, port))
}
/// Starting `GimbalState` handed to the controller under test: zero yaw
/// and pitch, zoom 1.0, timestamp 0, and no command in flight.
fn initial_state() -> GimbalState {
    let (yaw, pitch, zoom) = (0.0, 0.0, 1.0);
    GimbalState {
        yaw,
        pitch,
        zoom,
        ts_monotonic_ns: 0,
        command_in_flight: false,
    }
}
/// Binds a UDP socket to loopback port 0 so the OS assigns a free port,
/// then returns the socket (wrapped in an `Arc` for sharing) together
/// with the address it was actually bound to.
///
/// Panics if the bind or the local-address lookup fails.
async fn bind_ephemeral() -> (Arc<UdpSocket>, SocketAddr) {
    let socket = UdpSocket::bind(loopback(0)).await.expect("bind ephemeral");
    let bound_addr = socket.local_addr().expect("local_addr");
    (Arc::new(socket), bound_addr)
}
/// Helper — minimal fake A40 endpoint.
///
/// It only owns a bound loopback socket and remembers its address. The
/// reply behaviour is not part of this type: each test clones `socket`
/// and spawns its own task that decides how to answer inbound frames.
struct FakeA40 {
    /// Shared handle to the fake's UDP socket.
    socket: Arc<UdpSocket>,
    /// Address the socket is bound to; tests use it as the transport's peer.
    addr: SocketAddr,
}

impl FakeA40 {
    /// Binds the fake on an OS-chosen loopback port.
    async fn bind() -> Self {
        let bound = bind_ephemeral().await;
        Self {
            socket: bound.0,
            addr: bound.1,
        }
    }
}
/// Happy path: one well-formed A1 command is answered by a well-formed
/// T1_F1_B1_D1 reply, and neither the `crc` nor the `timeout` fault
/// counter moves.
#[tokio::test]
async fn ac1_crc_round_trip_no_faults() {
    // Arrange — bring up the fake; build a yaw-30 A1 frame; spawn a
    // task that answers the (well-formed) command with a
    // T1_F1_B1_D1 reply (the vendor's angle-feedback frame).
    let fake = FakeA40::bind().await;
    // The transport side's own bound address is never needed, so it is
    // discarded right here instead of being bound and ignored later.
    let (test_socket, _) = bind_ephemeral().await;
    test_socket.connect(fake.addr).await.expect("connect");
    let fake_socket = fake.socket.clone();
    let echo_task = tokio::spawn(async move {
        let mut buf = [0u8; 128];
        let (n, from) = fake_socket
            .recv_from(&mut buf)
            .await
            .expect("fake recv_from");
        // Validate the incoming A1 frame parses cleanly.
        let inbound = decode_frame(&buf[..n]).expect("inbound decode");
        assert_eq!(inbound.frame_id, FrameId::A1);
        // Reply with T1_F1_B1_D1 (12 bytes of arbitrary feedback
        // payload — content unchecked by the transport).
        let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply");
        fake_socket
            .send_to(&reply, from)
            .await
            .expect("fake send_to");
    });
    // The socket is moved into the transport: nothing else in this test
    // needs a second handle, so no extra `Arc` clone is taken.
    let (transport, _recv_task) =
        A40Transport::from_socket(test_socket, fake.addr).expect("from_socket");
    let payload = build_a1_angles(30.0, 0.0);

    // Act
    let reply = transport
        .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1)
        .await
        .expect("send_with_response");

    // Assert
    assert_eq!(reply.frame_id, FrameId::T1F1B1D1);
    assert_eq!(transport.faults().crc, 0);
    assert_eq!(transport.faults().timeout, 0);
    // Joining the fake surfaces any panic raised by its own assertions.
    echo_task.await.expect("echo task");
}
/// A reply whose final (checksum) byte has been tampered with must not
/// be accepted: the command fails, and both the `crc` and `timeout`
/// fault counters end up at one or more.
#[tokio::test]
async fn ac2_crc_mismatch_counted_and_dropped() {
    // Arrange — the fake answers the first command it receives with a
    // frame whose last byte has its lowest bit flipped.
    let fake = FakeA40::bind().await;
    let (client_socket, _) = bind_ephemeral().await;
    client_socket.connect(fake.addr).await.expect("connect");
    let responder_socket = fake.socket.clone();
    tokio::spawn(async move {
        let mut scratch = [0u8; 128];
        let (_len, sender) = responder_socket
            .recv_from(&mut scratch)
            .await
            .expect("fake recv_from");
        // Start from a valid reply, then damage its trailing byte.
        let mut corrupted = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply");
        let checksum_idx = corrupted.len() - 1;
        corrupted[checksum_idx] ^= 0x01;
        responder_socket
            .send_to(&corrupted, sender)
            .await
            .expect("fake send_to");
    });
    let (transport, _recv_task) =
        A40Transport::from_socket(client_socket, fake.addr).expect("from_socket");
    // Short deadline and a single attempt keep the failing path fast.
    let deadline = Duration::from_millis(80);
    let transport = transport
        .with_command_deadline(deadline)
        .with_max_retries(1);
    let a1_payload = build_a1_angles(30.0, 0.0);

    // Act — expected to fail: the damaged frame is discarded and no
    // valid reply shows up before the deadline.
    let result = transport
        .send_with_response(FrameId::A1, &a1_payload, FrameId::T1F1B1D1)
        .await;

    // Assert — the send failed, and both counters registered it.
    assert!(
        result.is_err(),
        "expected MaxRetriesExceeded; got {result:?}"
    );
    // The receive loop runs on its own task; pause briefly so it has
    // had a chance to record the fault before the counters are read.
    let settle = Duration::from_millis(20);
    tokio::time::sleep(settle).await;
    let counters = transport.faults();
    assert!(
        counters.crc >= 1,
        "expected ≥1 CRC fault, got {}",
        counters.crc
    );
    assert!(
        counters.timeout >= 1,
        "expected ≥1 timeout fault, got {}",
        counters.timeout
    );
}
/// The first command goes unanswered and the retry is answered: the
/// send must succeed with exactly one timeout fault and no `crc` fault.
#[tokio::test]
async fn ac3_command_timeout_retries_then_succeeds() {
    // Arrange — the fake swallows the first frame it receives and
    // answers every later one.
    let fake = FakeA40::bind().await;
    let (client_socket, _) = bind_ephemeral().await;
    client_socket.connect(fake.addr).await.expect("connect");
    let frames_seen = Arc::new(AtomicU8::new(0));
    let responder_socket = fake.socket.clone();
    let frames_seen_by_fake = Arc::clone(&frames_seen);
    tokio::spawn(async move {
        let mut scratch = [0u8; 128];
        // The task ends as soon as a receive fails.
        while let Ok((_len, sender)) = responder_socket.recv_from(&mut scratch).await {
            // `fetch_add` yields the count from *before* this frame, so
            // zero identifies the very first command, which gets no reply.
            let already_seen = frames_seen_by_fake.fetch_add(1, Ordering::Relaxed);
            if already_seen > 0 {
                let reply =
                    encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply");
                // Best-effort send: a failure here is deliberately ignored.
                let _ = responder_socket.send_to(&reply, sender).await;
            }
        }
    });
    let (transport, _recv_task) =
        A40Transport::from_socket(client_socket, fake.addr).expect("from_socket");
    let deadline = Duration::from_millis(80);
    let transport = transport
        .with_command_deadline(deadline)
        .with_max_retries(3);
    let a1_payload = build_a1_angles(30.0, 0.0);

    // Act
    let reply = transport
        .send_with_response(FrameId::A1, &a1_payload, FrameId::T1F1B1D1)
        .await
        .expect("retry should succeed");

    // Assert — one timeout for the swallowed attempt, then a valid
    // reply on the following attempt.
    assert_eq!(reply.frame_id, FrameId::T1F1B1D1);
    let counters = transport.faults();
    assert_eq!(
        counters.timeout, 1,
        "expected 1 timeout fault, got {}",
        counters.timeout
    );
    assert_eq!(counters.crc, 0);
    let total_frames = frames_seen.load(Ordering::Relaxed);
    assert!(total_frames >= 2, "fake should have seen ≥2 commands");
}
#[tokio::test]
async fn ac4_cap_exhaustion_returns_max_retries_exceeded() {
// Arrange — fake never replies. The transport should fail after
// exactly `max_retries` attempts with `MaxRetriesExceeded`.
let fake = FakeA40::bind().await;
let (test_socket, _) = bind_ephemeral().await;
test_socket.connect(fake.addr).await.expect("connect");
let attempts_seen = Arc::new(Mutex::new(0u32));
let fake_socket = fake.socket.clone();
let attempts_for_task = attempts_seen.clone();
tokio::spawn(async move {
loop {
let mut buf = [0u8; 128];
let Ok((_, _from)) = fake_socket.recv_from(&mut buf).await else {
return;
};
*attempts_for_task.lock().await += 1;
// Never reply.
}
});
let (transport, _recv_task) =
A40Transport::from_socket(test_socket, fake.addr).expect("from_socket");
let transport = transport
.with_command_deadline(Duration::from_millis(60))
.with_max_retries(3);
let payload = build_a1_angles(30.0, 0.0);
// Act
let err = transport
.send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1)
.await
.expect_err("should hit cap");
// Assert
assert!(
matches!(
err,
gimbal_controller::A40Error::MaxRetriesExceeded { attempts: 3, .. }
),
"expected MaxRetriesExceeded(3); got {err:?}"
);
let faults = transport.faults();
assert_eq!(
faults.timeout, 3,
"expected 3 timeout faults; got {}",
faults.timeout
);
// Give the fake one final beat to record the final attempt.
tokio::time::sleep(Duration::from_millis(20)).await;
let seen = *attempts_seen.lock().await;
assert_eq!(seen, 3, "fake should have seen exactly 3 attempts");
}
/// End-to-end: `set_pose` issued through a transport-backed controller
/// must publish the commanded yaw/pitch on the state stream without
/// recording a timeout fault.
#[tokio::test]
async fn set_pose_via_transport_updates_state_stream() {
    // Arrange — controller and transport wired together; the fake
    // answers every inbound frame with a T1_F1_B1_D1 reply.
    let fake = FakeA40::bind().await;
    let (client_socket, _) = bind_ephemeral().await;
    client_socket.connect(fake.addr).await.expect("connect");
    let responder_socket = fake.socket.clone();
    tokio::spawn(async move {
        let mut scratch = [0u8; 128];
        // The task ends as soon as a receive fails.
        while let Ok((_len, sender)) = responder_socket.recv_from(&mut scratch).await {
            let ack = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply");
            // Best-effort send: a failure here is deliberately ignored.
            let _ = responder_socket.send_to(&ack, sender).await;
        }
    });
    let (transport, _recv_task) =
        A40Transport::from_socket(client_socket, fake.addr).expect("from_socket");
    let controller = GimbalController::with_transport(initial_state(), transport);
    let handle = controller.handle();
    // Subscribe before commanding so the update below is observed.
    // NOTE(review): `state_stream()` looks like a watch-style receiver
    // (`changed()` / `borrow()`) — confirm against the controller.
    let mut state_rx = handle.state_stream();

    // Act
    let target = GimbalCommand {
        yaw_deg: 45.0,
        pitch_deg: -10.0,
    };
    handle.set_pose(target).await.expect("set_pose");

    // Assert
    state_rx.changed().await.expect("state changed");
    let published = *state_rx.borrow();
    assert_eq!(published.yaw, 45.0);
    assert_eq!(published.pitch, -10.0);
    let counters = handle.faults().expect("transport present");
    assert_eq!(counters.timeout, 0);
}
/// End-to-end: `zoom(4.0)` issued through a transport-backed controller
/// must leave the controller's state reporting a zoom of 4.0.
#[tokio::test]
async fn zoom_via_transport_updates_zoom_state() {
    // Arrange — the fake answers every inbound frame with a
    // T1_F1_B1_D1 reply.
    let fake = FakeA40::bind().await;
    let (client_socket, _) = bind_ephemeral().await;
    client_socket.connect(fake.addr).await.expect("connect");
    let responder_socket = fake.socket.clone();
    tokio::spawn(async move {
        let mut scratch = [0u8; 128];
        // The task ends as soon as a receive fails.
        while let Ok((_len, sender)) = responder_socket.recv_from(&mut scratch).await {
            let ack = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply");
            // Best-effort send: a failure here is deliberately ignored.
            let _ = responder_socket.send_to(&ack, sender).await;
        }
    });
    let (transport, _recv_task) =
        A40Transport::from_socket(client_socket, fake.addr).expect("from_socket");
    let controller = GimbalController::with_transport(initial_state(), transport);
    let handle = controller.handle();

    // Act
    let requested_zoom = 4.0;
    handle.zoom(requested_zoom).await.expect("zoom");

    // Assert
    let current = handle.state();
    assert_eq!(current.zoom, 4.0);
}
/// Pins the two-byte C1 camera payload produced for
/// (`ImageSensor::Eo1`, `CameraCommand::ZoomIn`).
///
/// The builder is called synchronously and nothing here awaits, so this
/// is a plain `#[test]` — no async runtime has to be started for it.
#[test]
fn build_c1_camera_payload_matches_vendor_layout() {
    // Arrange + Act
    let payload = gimbal_controller::build_c1_camera(ImageSensor::Eo1, CameraCommand::ZoomIn);

    // Assert — sanity-check the byte layout the transport will send.
    assert_eq!(payload, [0x01, 0x09]);
}