//! AZ-653 integration tests for the ViewPro A40 transport. //! //! Strategy: bring up a fake A40 endpoint on a second `UdpSocket` in //! the same process; pair it with the transport under test via a //! pre-bound `peer` address; drive scenarios by scripting the fake's //! reply behaviour (echo, drop, corrupt CRC). use std::net::{Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::net::UdpSocket; use tokio::sync::Mutex; use gimbal_controller::{ build_a1_angles, decode_frame, encode_frame, A40Transport, CameraCommand, FrameId, GimbalCommand, GimbalController, ImageSensor, }; use shared::models::gimbal::GimbalState; const LOCALHOST: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); fn loopback(port: u16) -> SocketAddr { SocketAddr::new(LOCALHOST.into(), port) } fn initial_state() -> GimbalState { GimbalState { yaw: 0.0, pitch: 0.0, zoom: 1.0, ts_monotonic_ns: 0, command_in_flight: false, } } /// Bind a UDP socket on an OS-chosen ephemeral port and return both /// the socket and the bound address. async fn bind_ephemeral() -> (Arc, SocketAddr) { let s = UdpSocket::bind(loopback(0)).await.expect("bind ephemeral"); let addr = s.local_addr().expect("local_addr"); (Arc::new(s), addr) } /// Helper — minimal fake A40 endpoint. Behaviour is supplied as a /// closure invoked for every inbound frame. struct FakeA40 { socket: Arc, addr: SocketAddr, } impl FakeA40 { async fn bind() -> Self { let (socket, addr) = bind_ephemeral().await; Self { socket, addr } } } #[tokio::test] async fn ac1_crc_round_trip_no_faults() { // Arrange — bring up the fake; build a yaw-30 A1 frame; spawn a // task that echoes the (well-formed) command back as a // T1_F1_B1_D1 reply (the vendor's angle-feedback frame). let fake = FakeA40::bind().await; let (test_socket, test_addr) = bind_ephemeral().await; test_socket.connect(fake.addr).await.expect("connect"); let fake_socket = fake.socket.clone(); let echo_task = tokio::spawn(async move { let mut buf = [0u8; 128]; let (n, from) = fake_socket .recv_from(&mut buf) .await .expect("fake recv_from"); // Validate the incoming A1 frame parses cleanly. let inbound = decode_frame(&buf[..n]).expect("inbound decode"); assert_eq!(inbound.frame_id, FrameId::A1); // Reply with T1_F1_B1_D1 (12 bytes of arbitrary feedback // payload — content unchecked by the transport). let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); fake_socket .send_to(&reply, from) .await .expect("fake send_to"); }); let (transport, _recv_task) = A40Transport::from_socket(test_socket.clone(), fake.addr).expect("from_socket"); let _ = test_addr; let payload = build_a1_angles(30.0, 0.0); // Act let reply = transport .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1) .await .expect("send_with_response"); // Assert assert_eq!(reply.frame_id, FrameId::T1F1B1D1); assert_eq!(transport.faults().crc, 0); assert_eq!(transport.faults().timeout, 0); echo_task.await.expect("echo task"); } #[tokio::test] async fn ac2_crc_mismatch_counted_and_dropped() { // Arrange — fake echoes a frame whose checksum is one bit off. let fake = FakeA40::bind().await; let (test_socket, _) = bind_ephemeral().await; test_socket.connect(fake.addr).await.expect("connect"); let fake_socket = fake.socket.clone(); tokio::spawn(async move { let mut buf = [0u8; 128]; let (_n, from) = fake_socket .recv_from(&mut buf) .await .expect("fake recv_from"); // Craft a corrupt frame (flip the checksum). let mut reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); let last = reply.len() - 1; reply[last] ^= 0x01; fake_socket .send_to(&reply, from) .await .expect("fake send_to"); }); let (transport, _recv_task) = A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); let transport = transport .with_command_deadline(Duration::from_millis(80)) .with_max_retries(1); let payload = build_a1_angles(30.0, 0.0); // Act — must fail (the corrupt frame is dropped; no valid reply // arrives within the deadline). let result = transport .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1) .await; // Assert — CRC counter incremented; timeout counter incremented // because no valid reply arrived. assert!( result.is_err(), "expected MaxRetriesExceeded; got {result:?}" ); // The receive loop is asynchronous; give it a tick to record. tokio::time::sleep(Duration::from_millis(20)).await; let faults = transport.faults(); assert!(faults.crc >= 1, "expected ≥1 CRC fault, got {}", faults.crc); assert!( faults.timeout >= 1, "expected ≥1 timeout fault, got {}", faults.timeout ); } #[tokio::test] async fn ac3_command_timeout_retries_then_succeeds() { // Arrange — fake drops the FIRST inbound frame silently; replies // to every subsequent one. let fake = FakeA40::bind().await; let (test_socket, _) = bind_ephemeral().await; test_socket.connect(fake.addr).await.expect("connect"); let drop_count = Arc::new(AtomicU8::new(0)); let fake_socket = fake.socket.clone(); let drop_count_for_task = drop_count.clone(); tokio::spawn(async move { loop { let mut buf = [0u8; 128]; let Ok((_n, from)) = fake_socket.recv_from(&mut buf).await else { return; }; let prior = drop_count_for_task.fetch_add(1, Ordering::Relaxed); if prior == 0 { // Silently drop the first command. continue; } let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); let _ = fake_socket.send_to(&reply, from).await; } }); let (transport, _recv_task) = A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); let transport = transport .with_command_deadline(Duration::from_millis(80)) .with_max_retries(3); let payload = build_a1_angles(30.0, 0.0); // Act let reply = transport .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1) .await .expect("retry should succeed"); // Assert — exactly one timeout (first attempt dropped); reply // arrived on the second attempt. assert_eq!(reply.frame_id, FrameId::T1F1B1D1); let faults = transport.faults(); assert_eq!( faults.timeout, 1, "expected 1 timeout fault, got {}", faults.timeout ); assert_eq!(faults.crc, 0); assert!( drop_count.load(Ordering::Relaxed) >= 2, "fake should have seen ≥2 commands" ); } #[tokio::test] async fn ac4_cap_exhaustion_returns_max_retries_exceeded() { // Arrange — fake never replies. The transport should fail after // exactly `max_retries` attempts with `MaxRetriesExceeded`. let fake = FakeA40::bind().await; let (test_socket, _) = bind_ephemeral().await; test_socket.connect(fake.addr).await.expect("connect"); let attempts_seen = Arc::new(Mutex::new(0u32)); let fake_socket = fake.socket.clone(); let attempts_for_task = attempts_seen.clone(); tokio::spawn(async move { loop { let mut buf = [0u8; 128]; let Ok((_, _from)) = fake_socket.recv_from(&mut buf).await else { return; }; *attempts_for_task.lock().await += 1; // Never reply. } }); let (transport, _recv_task) = A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); let transport = transport .with_command_deadline(Duration::from_millis(60)) .with_max_retries(3); let payload = build_a1_angles(30.0, 0.0); // Act let err = transport .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1) .await .expect_err("should hit cap"); // Assert assert!( matches!( err, gimbal_controller::A40Error::MaxRetriesExceeded { attempts: 3, .. } ), "expected MaxRetriesExceeded(3); got {err:?}" ); let faults = transport.faults(); assert_eq!( faults.timeout, 3, "expected 3 timeout faults; got {}", faults.timeout ); // Give the fake one final beat to record the final attempt. tokio::time::sleep(Duration::from_millis(20)).await; let seen = *attempts_seen.lock().await; assert_eq!(seen, 3, "fake should have seen exactly 3 attempts"); } #[tokio::test] async fn set_pose_via_transport_updates_state_stream() { // Arrange — full GimbalController + transport wired together; // fake echoes every A1 with a T1_F1_B1_D1 ack. let fake = FakeA40::bind().await; let (test_socket, _) = bind_ephemeral().await; test_socket.connect(fake.addr).await.expect("connect"); let fake_socket = fake.socket.clone(); tokio::spawn(async move { loop { let mut buf = [0u8; 128]; let Ok((_, from)) = fake_socket.recv_from(&mut buf).await else { return; }; let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); let _ = fake_socket.send_to(&reply, from).await; } }); let (transport, _recv_task) = A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); let controller = GimbalController::with_transport(initial_state(), transport); let handle = controller.handle(); let mut state_rx = handle.state_stream(); // Act handle .set_pose(GimbalCommand { yaw_deg: 45.0, pitch_deg: -10.0, }) .await .expect("set_pose"); // Assert state_rx.changed().await.expect("state changed"); let snapshot = *state_rx.borrow(); assert_eq!(snapshot.yaw, 45.0); assert_eq!(snapshot.pitch, -10.0); assert_eq!(handle.faults().expect("transport present").timeout, 0); } #[tokio::test] async fn zoom_via_transport_updates_zoom_state() { // Arrange let fake = FakeA40::bind().await; let (test_socket, _) = bind_ephemeral().await; test_socket.connect(fake.addr).await.expect("connect"); let fake_socket = fake.socket.clone(); tokio::spawn(async move { loop { let mut buf = [0u8; 128]; let Ok((_, from)) = fake_socket.recv_from(&mut buf).await else { return; }; let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); let _ = fake_socket.send_to(&reply, from).await; } }); let (transport, _recv_task) = A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); let controller = GimbalController::with_transport(initial_state(), transport); let handle = controller.handle(); // Act handle.zoom(4.0).await.expect("zoom"); // Assert let snapshot = handle.state(); assert_eq!(snapshot.zoom, 4.0); } #[tokio::test] async fn build_c1_camera_payload_matches_vendor_layout() { // Arrange + Act let payload = gimbal_controller::build_c1_camera(ImageSensor::Eo1, CameraCommand::ZoomIn); // Assert — sanity-check the byte layout the transport will send. assert_eq!(payload, [0x01, 0x09]); }