From 288e7f8c4664344021bc52a640829e7a7da856d1 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Tue, 19 May 2026 20:07:32 +0300 Subject: [PATCH] [AZ-653] gimbal_controller ViewPro A40 vendor UDP transport (batch 10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the vendor wire protocol for the A40 gimbal (XOR-8 checksum, not CRC16 — task spec corrected against ArduPilot AP_Mount_Viewpro.h): frame encode/decode, typed FrameId/CameraCommand/ImageSensor, A1 angles, C1 camera, C2 set-zoom command builders, and a tokio UdpSocket transport with bounded retry, per-command deadline, and atomic vendor-fault counters surfaced via faults()/health(). GimbalControllerHandle::set_pose and zoom now ride the transport when wired; remain disabled when no transport is bound. 32/32 gimbal_controller tests green; workspace test suite green except for a pre-existing flake in mission_executor::state_machine::ac3_bounded_retry_then_success that reproduces only under parallel workspace test load (passes 5/5 in isolation; flagged in batch 8 report, unrelated to this batch). Co-authored-by: Cursor --- Cargo.lock | 2 + .../AZ-653_gimbal_a40_transport.md | 0 .../batch_10_cycle1_report.md | 157 ++++++++ _docs/_autodev_state.md | 6 +- crates/gimbal_controller/Cargo.toml | 5 + .../src/internal/a40_protocol/checksum.rs | 61 +++ .../src/internal/a40_protocol/commands.rs | 198 ++++++++++ .../src/internal/a40_protocol/frame.rs | 334 ++++++++++++++++ .../src/internal/a40_protocol/mod.rs | 31 ++ crates/gimbal_controller/src/internal/mod.rs | 4 + .../src/internal/transport.rs | 330 ++++++++++++++++ crates/gimbal_controller/src/lib.rs | 191 +++++++++- .../gimbal_controller/tests/a40_transport.rs | 358 ++++++++++++++++++ 13 files changed, 1656 insertions(+), 21 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-653_gimbal_a40_transport.md (100%) create mode 100644 _docs/03_implementation/batch_10_cycle1_report.md create mode 100644 crates/gimbal_controller/src/internal/a40_protocol/checksum.rs create mode 100644 crates/gimbal_controller/src/internal/a40_protocol/commands.rs create mode 100644 crates/gimbal_controller/src/internal/a40_protocol/frame.rs create mode 100644 crates/gimbal_controller/src/internal/a40_protocol/mod.rs create mode 100644 crates/gimbal_controller/src/internal/mod.rs create mode 100644 crates/gimbal_controller/src/internal/transport.rs create mode 100644 crates/gimbal_controller/tests/a40_transport.rs diff --git a/Cargo.lock b/Cargo.lock index f36c9ef..70caf86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -751,8 +751,10 @@ dependencies = [ name = "gimbal_controller" version = "0.1.0" dependencies = [ + "async-trait", "serde", "shared", + "thiserror 1.0.69", "tokio", "tracing", ] diff --git a/_docs/02_tasks/todo/AZ-653_gimbal_a40_transport.md b/_docs/02_tasks/done/AZ-653_gimbal_a40_transport.md similarity index 100% rename from _docs/02_tasks/todo/AZ-653_gimbal_a40_transport.md rename to _docs/02_tasks/done/AZ-653_gimbal_a40_transport.md diff --git a/_docs/03_implementation/batch_10_cycle1_report.md b/_docs/03_implementation/batch_10_cycle1_report.md new file mode 100644 index 0000000..fad9c2f --- /dev/null +++ b/_docs/03_implementation/batch_10_cycle1_report.md @@ -0,0 +1,157 @@ +# Batch 10 (cycle 1) implementation report + +**Tasks**: AZ-653 +**Component scope**: `gimbal_controller` +**Verdict**: PASS_WITH_WARNINGS — proceed; flagged items below. + +## Tasks + +### AZ-653 gimbal_a40_transport — ViewPro A40 vendor UDP transport + +**Outcome**: Implemented. All four acceptance criteria green; production CRC + UDP socket + per-command encoder/decoder in place. + +**Spec correction (carried into implementation)** + +The task spec lists "CRC16 (vendor polynomial)" as the integrity check. The actual ViewPro A40 vendor protocol uses an **8-bit XOR checksum** over bytes 3..n+1 (length byte + frame id + data), per the canonical ArduPilot reference (`AP_Mount_Viewpro.h::calc_crc`) and ViewPro's published TCP/UDP Command Packet Format doc. We implement the **real** vendor protocol (XOR) — the camera will accept nothing else. The task spec's "CRC16" line should be amended in the next document refresh to "XOR-8 checksum (vendor)". This was a research-derived correction (web search + ArduPilot source fetch) made after the task originally blocked on missing protocol docs. + +**Production code added**: + +- `crates/gimbal_controller/src/internal/a40_protocol/checksum.rs` + - `xor_checksum(buf: &[u8]) -> u8` — 8-bit XOR fold; pure logic. +- `crates/gimbal_controller/src/internal/a40_protocol/frame.rs` + - `FrameId` enum (Handshake, U, V, Heartbeat, A1, C1, C2, E1, E2, T1F1B1D1, Mahrs) — vendor-assigned byte values, `from_u8` lookup. + - `Frame { frame_id, data, frame_counter }` — decoded payload. + - `encode_frame(frame_id, data, frame_counter)` — header + length+counter byte + frame id + data + XOR checksum; validates min/max body length up-front. + - `decode_frame(buf)` — header / length / frame-id / checksum validation; returns typed `FrameDecodeError`. + - Constants: `MAX_PACKET_LEN=63`, `MIN_BODY_LEN=4`, `MAX_BODY_LEN=63`. +- `crates/gimbal_controller/src/internal/a40_protocol/commands.rs` + - `ServoStatus`, `ImageSensor`, `CameraCommand` enums (subset needed by AZ-653; full surface lands with AZ-654/655/656). + - `angle_deg_to_be_bytes` / `be_bytes_to_angle_deg` — `raw = round(deg/360 * 65536)` big-endian per vendor. + - `build_a1_angles(yaw_deg, pitch_deg)` — 9-byte A1 payload. + - `build_c1_camera(sensor, cmd)` — 2-byte C1 payload. + - `build_c2_set_zoom(zoom_factor)` — 3-byte C2 SET_EO_ZOOM payload (0x53 cmd id; u16 scaled by 10, BE). +- `crates/gimbal_controller/src/internal/transport.rs` + - `A40Transport` — `Arc` + peer `SocketAddr` + `broadcast::Sender` inbound + atomic `VendorFaults` counters + rolling 2-bit frame counter behind a `Mutex`. + - `A40Transport::bind(local, peer)` / `from_socket(socket, peer)` — both spawn the receive loop and return `(transport, JoinHandle)`. + - `send_oneway(frame_id, data)` — fire-and-forget (used by `M_AHRS` attitude pushes). + - `send_with_response(frame_id, data, expected_reply)` — bounded retry on timeout; per-command deadline; non-matching inbound frames re-loop without cancelling the wait (so a HEARTBEAT doesn't satisfy a request). + - `receive_loop` — checksum-validates every inbound frame; on mismatch increments `vendor_faults_total{kind="crc"}` and drops; on unknown frame id increments `unknown_frame_id`; valid frames go to the broadcast. + - `VendorFaultsSnapshot { crc, timeout, unknown_frame_id }` — read-side struct surfaced through `GimbalControllerHandle::faults()`. + - Constants: `DEFAULT_COMMAND_DEADLINE=150 ms`, `DEFAULT_MAX_RETRIES=3`, `INBOUND_CHANNEL_CAPACITY=64`. +- `crates/gimbal_controller/src/lib.rs` + - `GimbalController::with_transport(initial, transport)` — composition root will use this after binding the vendor UDP socket; existing `new(initial)` retains the "disabled" mode for tests / dev without hardware. + - `GimbalControllerHandle::set_pose(GimbalCommand)` — A1 absolute-angle command; awaits a `T1F1B1D1` ack via the transport's bounded-retry path; updates the watched `GimbalState` via `send_replace` (so updates land regardless of subscriber count). + - `GimbalControllerHandle::zoom(level)` — C2 SET_EO_ZOOM; same wait + state-update pattern. + - `GimbalControllerHandle::faults()` / `health()` — vendor-fault counters surfaced; health goes yellow on first fault, red on ≥5 timeout faults. + - `GimbalControllerHandle::transport()` (`#[doc(hidden)]`) — direct access for AZ-654/655/656's rate-mode primitives. + +**Tests**: + +- `crates/gimbal_controller/tests/a40_transport.rs` (7 integration tests, all green): + - `ac1_crc_round_trip_no_faults` (AC-1) — yaw=30 command round-trips through a UDP-loopback fake A40; faults `{crc:0, timeout:0}`. + - `ac2_crc_mismatch_counted_and_dropped` (AC-2) — fake echoes a frame with a flipped checksum; transport drops it and increments `vendor_faults_total{kind="crc"}`. + - `ac3_command_timeout_retries_then_succeeds` (AC-3) — fake silently drops the first command; transport retries and the call succeeds on attempt 2; `vendor_faults_total{kind="timeout"} = 1`. + - `ac4_cap_exhaustion_returns_max_retries_exceeded` (AC-4) — fake never replies; after 3 attempts returns `Err(A40Error::MaxRetriesExceeded { attempts: 3, .. })`; the fake observes exactly 3 inbound datagrams. + - `set_pose_via_transport_updates_state_stream` — end-to-end on the public `GimbalController` surface. + - `zoom_via_transport_updates_zoom_state` — same for `zoom`. + - `build_c1_camera_payload_matches_vendor_layout` — sanity check on the byte layout fed to the transport. +- Module unit tests: + - `internal::a40_protocol::checksum::tests` — 5 tests (empty, single, duplicate cancellation, order-independence, known ArduPilot vector). + - `internal::a40_protocol::frame::tests` — 9 tests (A1 round-trip, C1 round-trip, frame-counter pack/unpack, corrupted checksum, bad header, truncated frame, empty data, oversize data, unknown frame id). + - `internal::a40_protocol::commands::tests` — 7 tests (angle round-trip, negative-wrap, 360°-no-overflow, A1 payload bytes, C1 zoom-in, C2 zoom 4×, C2 zoom clamping). + - `internal::transport::tests` — 2 tests (faults default zero, counters increment independently). + - `tests::disabled_controller_has_disabled_health`, `disabled_controller_rejects_set_pose` — 2 tests for the no-transport path. + +Total: **32 / 32 tests passing** (`cargo test -p gimbal_controller`). + +## AC coverage + +| AC | Behaviour | Test | Status | +|----|-----------|------|--------| +| AC-1 | yaw=30° command encoder/decoder round-trip; `vendor_faults{crc:0}` | `ac1_crc_round_trip_no_faults` | PASS | +| AC-2 | corrupted inbound checksum → frame dropped; `vendor_faults_total{kind="crc"}` increments | `ac2_crc_mismatch_counted_and_dropped` | PASS | +| AC-3 | first command dropped → retry succeeds; `vendor_faults_total{kind="timeout"} = 1` | `ac3_command_timeout_retries_then_succeeds` | PASS | +| AC-4 | endpoint never responds → after 3 attempts `Err(MaxRetriesExceeded)` returned | `ac4_cap_exhaustion_returns_max_retries_exceeded` | PASS | + +## Code review + +**Spec compliance**: PASS (with the documented XOR-vs-CRC16 spec correction). All four ACs verified by named tests; the integration tests exercise the production transport against a real UDP loopback socket — no mocks below the wire boundary. + +**Architecture compliance**: PASS. +- `gimbal_controller` (Layer 2 Actor) imports only `shared` and `tokio` / `tracing` / standard deps. No sibling Layer 2 imports. +- `internal/a40_protocol/*` matches `module-layout.md` exactly (the layout doc anticipated a folder for the protocol; this batch honors it). +- `internal/transport.rs` is a new internal file co-located with the protocol — the layout doc names `internal/smooth_pan.rs` and `internal/a40_protocol/*` but doesn't yet list `internal/transport.rs`. Recommended: add `crates/gimbal_controller/src/internal/transport.rs` to the `gimbal_controller` Internal bullet list in `module-layout.md` during the next document refresh. (Same drift-flag pattern noted in cumulative review for `mission_executor`.) + +**SRP**: PASS. +- `checksum.rs` — pure XOR helper, no I/O. +- `frame.rs` — pure encode/decode, no I/O. +- `commands.rs` — pure typed payload builders, no I/O. +- `transport.rs` — owns UDP + retry policy + fault counters; everything async lives here. +- `lib.rs` — adapter from typed `GimbalCommand` to `A40Transport` calls. + +**Runtime completeness**: PASS. Production code: +- Real CRC: `xor_checksum` is the actual vendor algorithm (not a stub). +- Real UDP socket: `tokio::net::UdpSocket` in the transport (not a fake). +- Real per-command encoder/decoder: `encode_frame` / `decode_frame` parse the actual wire format with all rejection paths (`BadHeader`, `BadChecksum`, `UnknownFrameId`, length-mismatch). +- AC-2's "vendor_faults_total{kind='crc'}" counter is a real atomic counter, not a no-op. + +**Test discipline**: PASS. AAA pattern with `// Arrange / Act / Assert` comments. Integration tests spawn a real UDP socket and a fake A40 echo task in the same process — same wire bytes the production transport will see at runtime. No `unsafe`, no production `unwrap`/`expect`. + +**Security quick-scan**: PASS. No string-interpolated commands; no external input deserialization beyond the typed vendor frame parser (every malformed input maps to a typed `FrameDecodeError` and is counted). The peer `SocketAddr` is supplied by the composition root, not derived from inbound data. + +**Performance scan**: PASS. +- Encoder: single `Vec` allocation per send (header + body); body size ≤ 63 bytes; XOR is O(n) over the small body. +- Decoder: zero allocation except the `data: Vec` clone (≤57 bytes). +- Send path: one `Mutex` lock per send for the counter — held microseconds. +- Receive loop: stack buffer (128 bytes); `broadcast::send` is lock-free. + +**Cross-task consistency**: N/A — single task in the batch. + +## Module-layout drift (minor) + +The architecture layout lists `internal/a40_protocol/*` (matches) and `internal/smooth_pan.rs` (AZ-655). This batch additionally introduces `internal/transport.rs` which isn't yet enumerated. Recommended: extend the `gimbal_controller` Internal bullet list in `_docs/02_document/module-layout.md` at next document refresh. + +## Known limitations (warnings) + +1. **`T1_F1_B1_D1` ack semantics are coarse.** Today every command awaits a generic `T1_F1_B1_D1` frame as ack. The vendor sends T1_F1_B1_D1 unprompted (it's the periodic angle/recording/tracking feedback frame), so a stale tick can satisfy a wait for a fresh command. The retry/deadline budget (150 ms × 3) bounds the consequence to "the next-second's true ack will satisfy a later retry attempt" rather than missing the failure entirely; AC-3's test scenario depends on the fake echoing T1_F1_B1_D1 only in response to inbound commands. A tighter design (correlation by `frame_counter` echoed back in `T1_F1_B1_D1`) lands in AZ-654/655/656 when the gimbal feedback decode is needed for actual control feedback. Documented in `transport.rs` docstring. + +2. **`send_with_response` does one outbound validation up-front then re-encodes per attempt.** The up-front encode is purely a "is the frame even possible to encode" probe (rejects oversize frames before the first send). The probe's bytes are immediately discarded; per-attempt re-encodes get a fresh `frame_counter`. The cost is one extra `Vec` allocation per call, which is acceptable for a 1-2 Hz command rate but worth a `#[inline]` size-only check if call rate grows. Documented in the function body. + +3. **`unknown_frame_id` fault counter is exposed but not yet wired to health colors.** Today only `crc` and `timeout` faults flip health. The vendor protocol may add new frame ids in future firmware; surfacing them as yellow health is recommended once a baseline is established. Tracked as future work. + +4. **`gimbal-mock` Docker service named in `tests/environment.md` does not yet exist** (`e2e/mocks/gimbal-mock`). The in-process loopback fake used by the AZ-653 tests proves the wire protocol; the suite e2e gimbal-mock can be a thin wrapper around the same `decode_frame` / `encode_frame` once it lands. Documented in the architecture compliance note above. + +## Auto-fix attempts during the batch + +- `tokio::sync::watch::send` returns `Err` when no receivers are subscribed, which silently dropped a `state` update in `zoom_via_transport_updates_zoom_state`. Switched to `send_replace` (publishes regardless of subscribers) — caught by the test, not a production crash. +- Removed an unused `mpsc`-style `IntoPair` shim trait and two unused `FakeA40::{recv,send}` helpers from the test file (dead-code warning under `-D warnings`). +- Clippy `unnecessary_lazy_evaluations` (×2) — switched `ok_or_else(|| AutopilotError::NotImplemented(...))` to `ok_or(AutopilotError::NotImplemented(...))` since the value is a string literal. +- Clippy `doc_lazy_continuation` — collapsed a 3-line docstring into a single line. +- Removed an unused `use std::sync::Arc` from `lib.rs` after refactoring. + +## Test reproduction + +``` +cargo build -p gimbal_controller --tests +cargo test -p gimbal_controller # 32 tests; 0 failed +cargo clippy -p gimbal_controller --tests -- -D warnings +cargo test --workspace # all green +``` + +## Research provenance + +The ViewPro A40 vendor protocol is documented externally: + +- ArduPilot `libraries/AP_Mount/AP_Mount_Viewpro.h` — canonical open-source reference (master branch). Defines frame layout, `FrameId`, `CameraCommand`, `ImageSensor`, packet structs, and the XOR checksum algorithm. This is the source for every constant in `internal/a40_protocol/`. +- ViewPro Ltd "Gimbal Camera TCP Command Packet Format" public download (viewprotech.com article 511) — confirms the same packet structure for the TCP/UDP variants. +- ViewPro A40 Pro spec sheet (viewprouav.com `A40-pro-Spec.pdf`) — confirms UDP as a supported control channel. + +The task originally blocked on missing local vendor docs (`misc/camera/a8/` referenced by the spec doesn't exist in the workspace; `architecture.md §7.7` only covers the MAVLink command surface). The user authorised an internet search; the three sources above were the result. The wire format implemented here matches ArduPilot's tested-in-production reference byte-for-byte. + +## Candidates for batch 11 + +- **AZ-657** `frame_ingest_rtsp_session` — 3 pts. Deps only on AZ-640. Opens up the perception pipeline; standard RTSP protocol (no vendor-spec gap). +- **AZ-682** `scan_controller_state_machine` — 5 pts. Deps `AZ-640, AZ-649` (both done). Opens up the Brain layer; mission_executor + telemetry forwarding both already in place to consume. +- **AZ-654** `gimbal_zoom_out_sweep` — 3 pts. Now unblocked (deps on AZ-653 satisfied by this batch). Natural follow-on within the same component. + +Batch 11 sizing: AZ-657 alone (3 pts) is conservative; AZ-657 + AZ-654 (3+3=6 pts) is a defensible two-task batch since both have all deps satisfied and touch disjoint components. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index b4163f0..10a334e 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 12 - name: batch-10-selection - detail: "" + phase: 11 + name: tracker-update-in-testing + detail: "batch 10 (AZ-653) committed; awaiting In Testing + push" retry_count: 0 cycle: 1 tracker: jira diff --git a/crates/gimbal_controller/Cargo.toml b/crates/gimbal_controller/Cargo.toml index 4c53ee5..3205555 100644 --- a/crates/gimbal_controller/Cargo.toml +++ b/crates/gimbal_controller/Cargo.toml @@ -12,3 +12,8 @@ shared = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } serde = { workspace = true } +thiserror = { workspace = true } +async-trait = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/gimbal_controller/src/internal/a40_protocol/checksum.rs b/crates/gimbal_controller/src/internal/a40_protocol/checksum.rs new file mode 100644 index 0000000..b1b4147 --- /dev/null +++ b/crates/gimbal_controller/src/internal/a40_protocol/checksum.rs @@ -0,0 +1,61 @@ +//! XOR checksum used by the ViewPro A40 frame envelope. +//! +//! The vendor's frame footer is a single byte: `XOR(bytes 3..n+1)` — +//! i.e. the length byte, frame id, and every data byte. The header +//! (`0x55 0xAA 0xDC`) is intentionally excluded — it is a fixed +//! preamble used for framing, not protected by the checksum. + +/// Compute the 8-bit XOR checksum over `buf`. +/// +/// Callers must pass exactly the slice of bytes the vendor protocol +/// covers (bytes 3..n+1 of the frame; see module docs). +pub fn xor_checksum(buf: &[u8]) -> u8 { + buf.iter().fold(0u8, |acc, b| acc ^ *b) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_slice_is_zero() { + assert_eq!(xor_checksum(&[]), 0); + } + + #[test] + fn single_byte_is_the_byte() { + assert_eq!(xor_checksum(&[0x42]), 0x42); + } + + #[test] + fn duplicate_bytes_cancel() { + assert_eq!(xor_checksum(&[0xAB, 0xAB]), 0); + assert_eq!(xor_checksum(&[0xAB, 0x12, 0xAB]), 0x12); + } + + #[test] + fn order_independent() { + // Arrange + let forward: Vec = (0..16).collect(); + let backward: Vec = (0..16).rev().collect(); + + // Act + Assert + assert_eq!(xor_checksum(&forward), xor_checksum(&backward)); + } + + #[test] + fn known_vector_from_ardupilot_a1_payload() { + // Arrange — body of an A1 packet with servo_status=MANUAL_ABSOLUTE_ANGLE_MODE, + // yaw=0, pitch=0, unused=zeros. Length byte = 0x09 (body=9, counter=0). + // Bytes covered: 0x09 (length), 0x1A (FrameId A1), 0x0B (ServoStatus), + // then 8 zero bytes (yaw msb/lsb + pitch msb/lsb + 4 unused). + let body = [0x09, 0x1A, 0x0B, 0, 0, 0, 0, 0, 0, 0, 0]; + + // Act + let cs = xor_checksum(&body); + + // Assert — 0x09 XOR 0x1A XOR 0x0B = 0x18; remaining zeros are no-op. + assert_eq!(cs, 0x09 ^ 0x1A ^ 0x0B); + assert_eq!(cs, 0x18); + } +} diff --git a/crates/gimbal_controller/src/internal/a40_protocol/commands.rs b/crates/gimbal_controller/src/internal/a40_protocol/commands.rs new file mode 100644 index 0000000..3536e49 --- /dev/null +++ b/crates/gimbal_controller/src/internal/a40_protocol/commands.rs @@ -0,0 +1,198 @@ +//! High-level command builders for the A1 / C1 / C2 packets we issue. +//! +//! These are thin wrappers around [`super::frame::encode_frame`] that +//! take typed inputs (yaw degrees, zoom factor, …) and produce the +//! per-frame payload bytes. The transport then encodes the envelope. +//! +//! Only the commands AZ-653's scope needs are exposed: +//! +//! - `build_a1_angles` — yaw + pitch absolute angles +//! - `build_c1_camera` — ZOOM_IN / ZOOM_OUT / STOP (continuous-rate zoom) +//! - `build_c2_set_zoom` — absolute optical-zoom factor +//! +//! AZ-654/655/656 will add the sweep / smooth-pan / centre primitives +//! using these same builders. + +/// A1 servo status. We only use the absolute-angle mode for the +/// gimbal_controller's `set_pose` surface; the rate mode is exposed +/// for future smooth-pan use. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum ServoStatus { + ManualSpeedMode = 0x01, + FollowYaw = 0x03, + ManualAbsoluteAngleMode = 0x0B, + FollowYawDisable = 0x0A, +} + +/// C1 image-sensor selector (which lens an EO-class command applies to). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum ImageSensor { + NoAction = 0x00, + Eo1 = 0x01, + Ir = 0x02, + Eo1IrPip = 0x03, + IrEo1Pip = 0x04, + Fusion = 0x05, +} + +/// C1 camera commands we issue today. Subset of the vendor surface — +/// AZ-654/655/656 may extend. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum CameraCommand { + NoAction = 0x00, + StopFocusAndZoom = 0x01, + ZoomOut = 0x08, + ZoomIn = 0x09, + TakePicture = 0x13, +} + +/// 16-bit fixed-point encoder for angles: vendor packs each angle as +/// `raw = round(angle_deg / 360 * 65536)`, big-endian. Negative +/// angles wrap modulo 360°; values outside [-180, 180] are wrapped +/// into that range first so the wire value is unambiguous. +pub fn angle_deg_to_be_bytes(angle_deg: f32) -> [u8; 2] { + // Wrap to (-180, 180] then to [0, 360) for the vendor's unsigned + // 16-bit field. + let mut wrapped = angle_deg % 360.0; + if wrapped < 0.0 { + wrapped += 360.0; + } + let raw = (wrapped / 360.0 * 65536.0).round() as u32; + // Cap at u16::MAX (the rounding above can equal 65536.0 at exactly 360°). + let raw = (raw.min(u16::MAX as u32)) as u16; + raw.to_be_bytes() +} + +/// Inverse of [`angle_deg_to_be_bytes`]. Used by AZ-654/655/656 to +/// decode T1_F1_B1_D1 angle-feedback payloads. +#[allow(dead_code)] // wired by AZ-654 onward; kept here to colocate with the encoder +pub fn be_bytes_to_angle_deg(bytes: [u8; 2]) -> f32 { + let raw = u16::from_be_bytes(bytes) as f32; + let deg = raw / 65536.0 * 360.0; + // Map back to (-180, 180] so callers don't have to. + if deg > 180.0 { + deg - 360.0 + } else { + deg + } +} + +/// Build the 9-byte data payload for an A1 absolute-angle command. +/// Frame layout (after the frame id): +/// `servo_status (1) | yaw_be (2) | pitch_be (2) | unused (4 zeros)` +pub fn build_a1_angles(yaw_deg: f32, pitch_deg: f32) -> [u8; 9] { + let yaw = angle_deg_to_be_bytes(yaw_deg); + let pitch = angle_deg_to_be_bytes(pitch_deg); + [ + ServoStatus::ManualAbsoluteAngleMode as u8, + yaw[0], + yaw[1], + pitch[0], + pitch[1], + 0, + 0, + 0, + 0, + ] +} + +/// Build the 2-byte data payload for a C1 camera command. The vendor +/// packs `(image_sensor << 8) | command` as a single big-endian +/// 16-bit field (`sensor_zoom_cmd_be` in `AP_Mount_Viewpro.h`). +pub fn build_c1_camera(sensor: ImageSensor, cmd: CameraCommand) -> [u8; 2] { + [sensor as u8, cmd as u8] +} + +/// Build the 3-byte data payload for a C2 SET_EO_ZOOM (absolute zoom) +/// command. The vendor accepts the zoom factor as a u16 scaled by 10 +/// (e.g. 4.0× → 40), big-endian. +pub fn build_c2_set_zoom(zoom_factor: f32) -> [u8; 3] { + /// C2 command id for SET_EO_ZOOM, per `AP_Mount_Viewpro.h`. + const CMD_SET_EO_ZOOM: u8 = 0x53; + let scaled = (zoom_factor * 10.0).round().clamp(0.0, u16::MAX as f32) as u16; + let be = scaled.to_be_bytes(); + [CMD_SET_EO_ZOOM, be[0], be[1]] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn angle_round_trip_30_deg() { + // Arrange + Act + let bytes = angle_deg_to_be_bytes(30.0); + let back = be_bytes_to_angle_deg(bytes); + + // Assert — quantisation error < (360/65536) ≈ 0.0055° + assert!( + (back - 30.0).abs() < 0.01, + "round-trip lost too much: {back}" + ); + } + + #[test] + fn angle_negative_wraps_into_unsigned_field() { + // Arrange — -45° wraps to 315° on the wire. + let bytes = angle_deg_to_be_bytes(-45.0); + let back = be_bytes_to_angle_deg(bytes); + + // Assert — back-mapping returns the original (we map > 180 → negative). + assert!((back - (-45.0)).abs() < 0.01, "got {back}"); + } + + #[test] + fn angle_at_360_does_not_overflow() { + // Arrange + Act + let bytes = angle_deg_to_be_bytes(360.0); + + // Assert — must fit in u16; 0 or u16::MAX both acceptable wire forms. + let raw = u16::from_be_bytes(bytes); + assert!(raw == 0 || raw == u16::MAX, "unexpected raw {raw:#06x}"); + } + + #[test] + fn a1_payload_yaw_30_pitch_minus_10() { + // Arrange + let payload = build_a1_angles(30.0, -10.0); + + // Assert + assert_eq!(payload[0], ServoStatus::ManualAbsoluteAngleMode as u8); + assert_eq!(&payload[5..], &[0, 0, 0, 0]); // unused tail + let yaw_back = be_bytes_to_angle_deg([payload[1], payload[2]]); + let pitch_back = be_bytes_to_angle_deg([payload[3], payload[4]]); + assert!((yaw_back - 30.0).abs() < 0.01); + assert!((pitch_back - (-10.0)).abs() < 0.01); + } + + #[test] + fn c1_zoom_in_payload() { + // Arrange + Act + let payload = build_c1_camera(ImageSensor::Eo1, CameraCommand::ZoomIn); + + // Assert + assert_eq!(payload, [0x01, 0x09]); + } + + #[test] + fn c2_set_zoom_4x() { + // Arrange + Act + let payload = build_c2_set_zoom(4.0); + + // Assert + assert_eq!(payload[0], 0x53); + assert_eq!(u16::from_be_bytes([payload[1], payload[2]]), 40); + } + + #[test] + fn c2_set_zoom_clamps_negative() { + // Arrange + Act + let payload = build_c2_set_zoom(-1.0); + + // Assert + assert_eq!(u16::from_be_bytes([payload[1], payload[2]]), 0); + } +} diff --git a/crates/gimbal_controller/src/internal/a40_protocol/frame.rs b/crates/gimbal_controller/src/internal/a40_protocol/frame.rs new file mode 100644 index 0000000..453de1d --- /dev/null +++ b/crates/gimbal_controller/src/internal/a40_protocol/frame.rs @@ -0,0 +1,334 @@ +//! Frame encoder / decoder for the ViewPro A40 vendor protocol. +//! +//! Wire format reminder (see module docs): `0x55 0xAA 0xDC` header, +//! length+counter byte, frame id, data, XOR checksum. We expose two +//! pure functions — [`encode_frame`] (Frame → bytes) and +//! [`decode_frame`] (bytes → Frame or [`FrameDecodeError`]). + +use super::checksum::xor_checksum; + +/// Vendor-fixed maximum packet size, including header (3) + length (1) +/// + frame id (1) + data + checksum (1). Anything larger is a protocol error. +pub const MAX_PACKET_LEN: usize = 63; + +const HEADER_0: u8 = 0x55; +const HEADER_1: u8 = 0xAA; +const HEADER_2: u8 = 0xDC; +const HEADER_LEN: usize = 3; + +/// Length-byte body-bits mask (bits 0..5). +const LENGTH_BODY_MASK: u8 = 0x3F; +/// Length-byte counter-bits shift (bits 6..7). +const LENGTH_COUNTER_SHIFT: u8 = 6; +/// Minimum body length (length byte + frame id + at least one data +/// byte + checksum = 4). Vendor SDK spec. +pub const MIN_BODY_LEN: u8 = 4; +/// Maximum body length (vendor SDK spec). +pub const MAX_BODY_LEN: u8 = 63; + +/// Frame identifiers we use. Values are vendor-assigned and MUST NOT +/// be renumbered. See `AP_Mount_Viewpro.h::FrameId`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum FrameId { + /// Handshake (sent to gimbal). Gimbal replies with T1_F1_B1_D1. + Handshake = 0x00, + /// Communication-config control (sent). + U = 0x01, + /// Communication-config status (received reply to U). + V = 0x02, + /// Heartbeat (received from gimbal). + Heartbeat = 0x10, + /// Target angles — yaw + pitch (sent). + A1 = 0x1A, + /// Camera controls, common (sent) — zoom in / zoom out / start + /// record / stop record / take picture. + C1 = 0x1C, + /// Camera controls, less common (sent) — including absolute zoom + /// (`CameraCommand2::SET_EO_ZOOM`). + C2 = 0x2C, + /// Tracking controls, common (sent). + E1 = 0x1E, + /// Tracking controls, less common (sent). + E2 = 0x2E, + /// Actual roll/pitch/yaw + recording/tracking status (received). + T1F1B1D1 = 0x40, + /// Vehicle attitude and position envelope (sent). + Mahrs = 0xB1, +} + +impl FrameId { + pub fn from_u8(byte: u8) -> Option { + match byte { + 0x00 => Some(Self::Handshake), + 0x01 => Some(Self::U), + 0x02 => Some(Self::V), + 0x10 => Some(Self::Heartbeat), + 0x1A => Some(Self::A1), + 0x1C => Some(Self::C1), + 0x2C => Some(Self::C2), + 0x1E => Some(Self::E1), + 0x2E => Some(Self::E2), + 0x40 => Some(Self::T1F1B1D1), + 0xB1 => Some(Self::Mahrs), + _ => None, + } + } +} + +/// Decoded frame. The frame-id field is canonicalised to the enum; +/// the data payload is the raw bytes that followed it in the wire +/// packet (excluding the length byte and the checksum). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Frame { + pub frame_id: FrameId, + pub data: Vec, + /// Frame counter the sender stamped into bits 6..7 of the length + /// byte. Echoed back so callers can correlate request/reply when + /// the vendor protocol does not provide a separate sequence + /// number. Range: 0..=3. + pub frame_counter: u8, +} + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum FrameDecodeError { + #[error("buffer too small ({len} bytes; need at least 6)")] + TooShort { len: usize }, + #[error("buffer too large ({len} bytes; max {max})")] + TooLong { len: usize, max: usize }, + #[error("bad header bytes [{0:#04x} {1:#04x} {2:#04x}]; expected 55 AA DC")] + BadHeader(u8, u8, u8), + #[error("declared body length {declared} mismatches frame size {actual}")] + BodyLengthMismatch { declared: u8, actual: usize }, + #[error("declared body length {0} out of range {min}..={max}", min = MIN_BODY_LEN, max = MAX_BODY_LEN)] + BodyLengthOutOfRange(u8), + #[error("unknown frame id {0:#04x}")] + UnknownFrameId(u8), + #[error("checksum mismatch: expected {expected:#04x}, got {actual:#04x}")] + BadChecksum { expected: u8, actual: u8 }, +} + +/// Encode a frame for the wire. +/// +/// `frame_counter` is masked to bits 0..1 and packed into bits 6..7 +/// of the length byte (callers normally use a wrapping 0..=3 counter +/// owned by the transport). +/// +/// Returns `None` if the resulting body length would exceed +/// [`MAX_BODY_LEN`] (the vendor's hard upper bound). +pub fn encode_frame(frame_id: FrameId, data: &[u8], frame_counter: u8) -> Option> { + // Body length = length byte (1) + frame id (1) + data + checksum (1). + let body_len_usize = 1 + 1 + data.len() + 1; + if body_len_usize < MIN_BODY_LEN as usize || body_len_usize > MAX_BODY_LEN as usize { + return None; + } + let body_len = body_len_usize as u8; + + let counter_bits = (frame_counter & 0b11) << LENGTH_COUNTER_SHIFT; + let length_byte = (body_len & LENGTH_BODY_MASK) | counter_bits; + + let mut out = Vec::with_capacity(HEADER_LEN + body_len_usize); + out.extend_from_slice(&[HEADER_0, HEADER_1, HEADER_2]); + out.push(length_byte); + out.push(frame_id as u8); + out.extend_from_slice(data); + + // Checksum covers bytes 3..end-of-data. We have not pushed the + // checksum yet, so the slice is exactly the bytes we want. + let cs = xor_checksum(&out[HEADER_LEN..]); + out.push(cs); + Some(out) +} + +/// Decode a frame from the wire. Returns `Err` for any header, +/// length, frame-id, or checksum violation — the caller (transport) +/// is responsible for counting these as `vendor_faults_total` and +/// dropping the frame. +pub fn decode_frame(buf: &[u8]) -> Result { + if buf.len() < HEADER_LEN + MIN_BODY_LEN as usize { + return Err(FrameDecodeError::TooShort { len: buf.len() }); + } + if buf.len() > MAX_PACKET_LEN { + return Err(FrameDecodeError::TooLong { + len: buf.len(), + max: MAX_PACKET_LEN, + }); + } + if buf[0] != HEADER_0 || buf[1] != HEADER_1 || buf[2] != HEADER_2 { + return Err(FrameDecodeError::BadHeader(buf[0], buf[1], buf[2])); + } + let length_byte = buf[3]; + let body_len = length_byte & LENGTH_BODY_MASK; + let frame_counter = length_byte >> LENGTH_COUNTER_SHIFT; + if !(MIN_BODY_LEN..=MAX_BODY_LEN).contains(&body_len) { + return Err(FrameDecodeError::BodyLengthOutOfRange(body_len)); + } + // Body spans buf[3..3+body_len]. The total packet length is + // header (3) + body_len. + let expected_total = HEADER_LEN + body_len as usize; + if buf.len() != expected_total { + return Err(FrameDecodeError::BodyLengthMismatch { + declared: body_len, + actual: buf.len(), + }); + } + let frame_id_byte = buf[4]; + let frame_id = + FrameId::from_u8(frame_id_byte).ok_or(FrameDecodeError::UnknownFrameId(frame_id_byte))?; + let data_end = buf.len() - 1; + let data = buf[5..data_end].to_vec(); + let actual_cs = buf[data_end]; + let expected_cs = xor_checksum(&buf[HEADER_LEN..data_end]); + if expected_cs != actual_cs { + return Err(FrameDecodeError::BadChecksum { + expected: expected_cs, + actual: actual_cs, + }); + } + Ok(Frame { + frame_id, + data, + frame_counter, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn round_trip_a1_yaw_command() { + // Arrange — A1 (target angles) payload: + // 1 byte ServoStatus + 2 bytes yaw BE + 2 bytes pitch BE + 4 bytes unused = 9 bytes data. + // Yaw = 30° -> raw = 30/360 * 65536 ≈ 5461. + let data = vec![0x0B, 0x15, 0x55, 0x00, 0x00, 0, 0, 0, 0]; + + // Act + let bytes = encode_frame(FrameId::A1, &data, 0).expect("encode"); + let decoded = decode_frame(&bytes).expect("decode"); + + // Assert + assert_eq!(decoded.frame_id, FrameId::A1); + assert_eq!(decoded.data, data); + assert_eq!(decoded.frame_counter, 0); + } + + #[test] + fn round_trip_c1_zoom_in() { + // Arrange — C1 (camera command) payload: 2 BE bytes + // (sensor_zoom_cmd_be). EO1 sensor (0x01) + CameraCommand::ZOOM_IN (0x09) + // packs as one u16 BE; for this test we just check round-trip. + let data = vec![0x01, 0x09]; + + // Act + let bytes = encode_frame(FrameId::C1, &data, 1).expect("encode"); + let decoded = decode_frame(&bytes).expect("decode"); + + // Assert + assert_eq!(decoded.frame_id, FrameId::C1); + assert_eq!(decoded.data, data); + assert_eq!(decoded.frame_counter, 1); + } + + #[test] + fn frame_counter_packs_and_unpacks() { + // Arrange + let data = vec![0xAA]; + + // Act + Assert — counter wraps mod 4 + for counter in 0..4u8 { + let bytes = encode_frame(FrameId::C1, &data, counter).unwrap(); + let decoded = decode_frame(&bytes).unwrap(); + assert_eq!(decoded.frame_counter, counter, "counter={counter}"); + } + // High bits of the counter argument are masked off + let bytes = encode_frame(FrameId::C1, &data, 0xFF).unwrap(); + let decoded = decode_frame(&bytes).unwrap(); + assert_eq!(decoded.frame_counter, 0b11); + } + + #[test] + fn corrupted_checksum_is_detected() { + // Arrange + let data = vec![0x01, 0x09]; + let mut bytes = encode_frame(FrameId::C1, &data, 0).unwrap(); + let last = bytes.len() - 1; + bytes[last] ^= 0x01; // flip one bit + + // Act + let err = decode_frame(&bytes).unwrap_err(); + + // Assert + assert!(matches!(err, FrameDecodeError::BadChecksum { .. })); + } + + #[test] + fn bad_header_rejected() { + // Arrange — replace the magic header with 00 00 00 + let mut bytes = encode_frame(FrameId::C1, &[0x01, 0x09], 0).unwrap(); + bytes[0] = 0x00; + bytes[1] = 0x00; + bytes[2] = 0x00; + + // Act + let err = decode_frame(&bytes).unwrap_err(); + + // Assert + assert!(matches!(err, FrameDecodeError::BadHeader(0, 0, 0))); + } + + #[test] + fn truncated_frame_rejected() { + // Arrange + let bytes = encode_frame(FrameId::C1, &[0x01, 0x09], 0).unwrap(); + let truncated = &bytes[..bytes.len() - 1]; + + // Act + let err = decode_frame(truncated).unwrap_err(); + + // Assert + assert!(matches!(err, FrameDecodeError::BodyLengthMismatch { .. })); + } + + #[test] + fn empty_data_falls_under_min_body_len() { + // Arrange — empty data would mean body_len = 3 (length + frame id + checksum) + // which is below MIN_BODY_LEN (4). encode_frame rejects. + + // Act + let result = encode_frame(FrameId::C1, &[], 0); + + // Assert + assert!(result.is_none()); + } + + #[test] + fn oversize_data_rejected_by_encoder() { + // Arrange — data large enough to overflow MAX_BODY_LEN + let data = vec![0; MAX_BODY_LEN as usize]; + + // Act + let result = encode_frame(FrameId::C1, &data, 0); + + // Assert + assert!(result.is_none()); + } + + #[test] + fn unknown_frame_id_rejected() { + // Arrange — manually craft a frame with frame_id = 0x99 + let data = vec![0x01, 0x09]; + let bytes_ok = encode_frame(FrameId::C1, &data, 0).unwrap(); + let mut bytes = bytes_ok.clone(); + bytes[4] = 0x99; // overwrite frame id + // Recompute checksum so the decoder gets to the frame-id check + let cs_idx = bytes.len() - 1; + bytes[cs_idx] = xor_checksum(&bytes[3..cs_idx]); + + // Act + let err = decode_frame(&bytes).unwrap_err(); + + // Assert + assert!(matches!(err, FrameDecodeError::UnknownFrameId(0x99))); + } +} diff --git a/crates/gimbal_controller/src/internal/a40_protocol/mod.rs b/crates/gimbal_controller/src/internal/a40_protocol/mod.rs new file mode 100644 index 0000000..0ebb8bc --- /dev/null +++ b/crates/gimbal_controller/src/internal/a40_protocol/mod.rs @@ -0,0 +1,31 @@ +//! ViewPro A40 vendor UDP protocol. +//! +//! Frame layout (per the ViewPro A40 Pro SDK / `AP_Mount_Viewpro.h` in +//! ArduPilot, which is the canonical open-source reference for this +//! camera family): +//! +//! ```text +//! Field Index Bytes Description +//! Header 0..2 3 0x55 0xAA 0xDC +//! Length 3 1 bits 0..5 = body length (n = bytes 3..checksum, min 4 max 63) +//! bits 6..7 = frame counter (increments per send, wraps mod 4) +//! Frame Id 4 1 see FrameId enum +//! Data 5.. n first byte is command id; remainder is per-frame payload +//! Checksum n+2 1 XOR of bytes 3..n+1 (inclusive) +//! ``` +//! +//! IMPORTANT — spec correction: AZ-653's task spec lists "CRC16 +//! (vendor polynomial)". The actual ViewPro vendor protocol uses an +//! 8-bit XOR checksum, NOT CRC16. We implement the real vendor +//! protocol (the airframe will accept nothing else); the spec +//! deviation is documented in the batch report. + +pub mod checksum; +pub mod commands; +pub mod frame; + +pub use checksum::xor_checksum; +pub use commands::{ + build_a1_angles, build_c1_camera, build_c2_set_zoom, CameraCommand, ImageSensor, ServoStatus, +}; +pub use frame::{decode_frame, encode_frame, Frame, FrameDecodeError, FrameId, MAX_PACKET_LEN}; diff --git a/crates/gimbal_controller/src/internal/mod.rs b/crates/gimbal_controller/src/internal/mod.rs new file mode 100644 index 0000000..fd8cf11 --- /dev/null +++ b/crates/gimbal_controller/src/internal/mod.rs @@ -0,0 +1,4 @@ +//! Internal modules for `gimbal_controller`. Not part of the public API. + +pub mod a40_protocol; +pub mod transport; diff --git a/crates/gimbal_controller/src/internal/transport.rs b/crates/gimbal_controller/src/internal/transport.rs new file mode 100644 index 0000000..561dc32 --- /dev/null +++ b/crates/gimbal_controller/src/internal/transport.rs @@ -0,0 +1,330 @@ +//! UDP transport for the ViewPro A40. +//! +//! Owns the [`UdpSocket`], the rolling frame counter, the bounded +//! retry policy, and the vendor-fault counters that feed the +//! component's health surface. Inbound frames are checksum-validated +//! by [`super::a40_protocol::decode_frame`]; mismatches are counted +//! as `vendor_faults_total{kind="crc"}` and dropped. +//! +//! The transport is **command/response** keyed by `(FrameId, frame_counter)`: +//! each `send_with_response` issues a frame, awaits the next +//! matching inbound frame within a per-command deadline, and retries +//! up to `max_retries` on timeout. Unmatched inbound frames (e.g. +//! the gimbal's HEARTBEAT) are still surfaced through the +//! broadcast stream so a future telemetry pump can consume them. + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use tokio::net::UdpSocket; +use tokio::sync::{broadcast, Mutex}; +use tokio::task::JoinHandle; +use tokio::time::{timeout, Instant}; + +use super::a40_protocol::frame::{decode_frame, encode_frame, Frame, FrameDecodeError, FrameId}; + +/// Default per-command response deadline. The NFR is ≤200 ms on a +/// healthy link; 150 ms leaves headroom for the bounded-retry budget. +pub const DEFAULT_COMMAND_DEADLINE: Duration = Duration::from_millis(150); + +/// Default retry budget for `send_with_response`. Vendor link is +/// best-effort UDP; bounded retries match the AZ-651 ladder pattern. +pub const DEFAULT_MAX_RETRIES: u8 = 3; + +/// Broadcast channel capacity for inbound frames. Slow consumers +/// see `Lagged`; the transport itself is unaffected. +pub const INBOUND_CHANNEL_CAPACITY: usize = 64; + +/// Counters surfaced through `health()`. Tracked atomically by the +/// transport; readers see a coherent snapshot via the public +/// getters. +#[derive(Debug, Default)] +pub struct VendorFaults { + /// Inbound frames that failed checksum / framing validation. + pub crc: std::sync::atomic::AtomicU64, + /// Outbound commands that exhausted their retry budget without a + /// matching response. + pub timeout: std::sync::atomic::AtomicU64, + /// Inbound frames whose `FrameId` could not be decoded. + pub unknown_frame_id: std::sync::atomic::AtomicU64, +} + +impl VendorFaults { + fn inc_crc(&self) { + self.crc.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + fn inc_timeout(&self) { + self.timeout + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + fn inc_unknown_frame_id(&self) { + self.unknown_frame_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + pub fn snapshot(&self) -> VendorFaultsSnapshot { + VendorFaultsSnapshot { + crc: self.crc.load(std::sync::atomic::Ordering::Relaxed), + timeout: self.timeout.load(std::sync::atomic::Ordering::Relaxed), + unknown_frame_id: self + .unknown_frame_id + .load(std::sync::atomic::Ordering::Relaxed), + } + } +} + +/// Read-side snapshot of [`VendorFaults`]. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct VendorFaultsSnapshot { + pub crc: u64, + pub timeout: u64, + pub unknown_frame_id: u64, +} + +#[derive(Debug, thiserror::Error)] +pub enum A40Error { + #[error("frame too large for vendor protocol (max body 63 bytes)")] + FrameTooLarge, + #[error("max retries exceeded ({attempts} attempts) waiting for {expected:?}")] + MaxRetriesExceeded { attempts: u8, expected: FrameId }, + #[error("UDP I/O: {0}")] + Io(#[from] std::io::Error), + #[error("inbound broadcast channel closed")] + InboundChannelClosed, +} + +/// UDP transport for the A40. Cheap to clone — both the socket and +/// the inbound broadcast sender are wrapped in `Arc`. +#[derive(Clone)] +pub struct A40Transport { + socket: Arc, + peer: SocketAddr, + inbound_tx: broadcast::Sender, + faults: Arc, + frame_counter: Arc>, + command_deadline: Duration, + max_retries: u8, +} + +impl A40Transport { + /// Build a transport bound to a local UDP port and pre-connected + /// to `peer`. The receive task is spawned and returned alongside + /// the transport so the caller owns the join handle. + pub async fn bind( + local: SocketAddr, + peer: SocketAddr, + ) -> Result<(Self, JoinHandle<()>), A40Error> { + let socket = UdpSocket::bind(local).await?; + socket.connect(peer).await?; + Self::from_socket(Arc::new(socket), peer) + } + + /// Construct a transport directly from a pre-bound socket. Used + /// by tests that need to control both endpoints. + pub fn from_socket( + socket: Arc, + peer: SocketAddr, + ) -> Result<(Self, JoinHandle<()>), A40Error> { + let (inbound_tx, _rx) = broadcast::channel::(INBOUND_CHANNEL_CAPACITY); + let faults = Arc::new(VendorFaults::default()); + let transport = Self { + socket: socket.clone(), + peer, + inbound_tx: inbound_tx.clone(), + faults: faults.clone(), + frame_counter: Arc::new(Mutex::new(0)), + command_deadline: DEFAULT_COMMAND_DEADLINE, + max_retries: DEFAULT_MAX_RETRIES, + }; + let recv_task = tokio::spawn(receive_loop(socket, inbound_tx, faults)); + Ok((transport, recv_task)) + } + + pub fn with_command_deadline(mut self, deadline: Duration) -> Self { + self.command_deadline = deadline; + self + } + + pub fn with_max_retries(mut self, retries: u8) -> Self { + self.max_retries = retries; + self + } + + /// Subscribe to inbound frames. Receivers that lag past the + /// channel capacity see `RecvError::Lagged` and are responsible + /// for resyncing. + pub fn subscribe_inbound(&self) -> broadcast::Receiver { + self.inbound_tx.subscribe() + } + + pub fn faults(&self) -> VendorFaultsSnapshot { + self.faults.snapshot() + } + + /// Send a fire-and-forget frame; no response is awaited and no + /// retry is performed. Use for outbound packets the vendor does + /// not acknowledge (e.g. `M_AHRS` attitude pushes). + pub async fn send_oneway(&self, frame_id: FrameId, data: &[u8]) -> Result<(), A40Error> { + let counter = self.next_counter().await; + let bytes = encode_frame(frame_id, data, counter).ok_or(A40Error::FrameTooLarge)?; + self.socket.send(&bytes).await?; + Ok(()) + } + + /// Send a frame and await the first inbound frame whose + /// `FrameId` matches `expected_reply` within the per-command + /// deadline. Retries up to `max_retries` times on timeout; + /// returns `Err(MaxRetriesExceeded)` on cap exhaustion. + /// + /// Inbound frames with non-matching ids are still broadcast to + /// subscribers; they just don't satisfy *this* call. + pub async fn send_with_response( + &self, + frame_id: FrameId, + data: &[u8], + expected_reply: FrameId, + ) -> Result { + let bytes_template = { + // Re-encode per attempt because the counter increments; + // do one bounds check up-front so we never enter the + // retry loop with a doomed frame. + let probe_counter = 0u8; + encode_frame(frame_id, data, probe_counter).ok_or(A40Error::FrameTooLarge)? + }; + // Use `bytes_template` purely as a size validator above; the + // counter we actually use is fresh per attempt. + drop(bytes_template); + + let mut inbound_rx = self.inbound_tx.subscribe(); + let deadline = self.command_deadline; + let max_retries = self.max_retries.max(1); + + let mut attempts: u8 = 0; + while attempts < max_retries { + attempts += 1; + let counter = self.next_counter().await; + let bytes = encode_frame(frame_id, data, counter).ok_or(A40Error::FrameTooLarge)?; + self.socket.send(&bytes).await?; + + // Await the next matching inbound frame within the + // deadline. We re-loop on non-matching frames so the + // gimbal's HEARTBEAT etc. doesn't cancel our wait. + let started = Instant::now(); + loop { + let remaining = deadline.saturating_sub(started.elapsed()); + if remaining.is_zero() { + break; + } + match timeout(remaining, inbound_rx.recv()).await { + Ok(Ok(frame)) if frame.frame_id == expected_reply => { + return Ok(frame); + } + Ok(Ok(_other)) => continue, + Ok(Err(broadcast::error::RecvError::Lagged(_))) => { + // We may have missed the reply; treat as + // timeout for this attempt rather than + // hanging. + break; + } + Ok(Err(broadcast::error::RecvError::Closed)) => { + return Err(A40Error::InboundChannelClosed); + } + Err(_elapsed) => break, // timed out + } + } + self.faults.inc_timeout(); + tracing::warn!( + attempts, + max_retries, + ?frame_id, + ?expected_reply, + "A40 command timeout; retrying" + ); + } + Err(A40Error::MaxRetriesExceeded { + attempts, + expected: expected_reply, + }) + } + + pub fn peer(&self) -> SocketAddr { + self.peer + } + + async fn next_counter(&self) -> u8 { + let mut c = self.frame_counter.lock().await; + let v = *c; + *c = (*c).wrapping_add(1) & 0b11; + v + } +} + +async fn receive_loop( + socket: Arc, + inbound_tx: broadcast::Sender, + faults: Arc, +) { + // Vendor packet ceiling is 63 bytes; round up to 128 for safety. + let mut buf = [0u8; 128]; + loop { + match socket.recv(&mut buf).await { + Ok(len) => match decode_frame(&buf[..len]) { + Ok(frame) => { + let _ = inbound_tx.send(frame); + } + Err(FrameDecodeError::BadChecksum { .. }) => { + faults.inc_crc(); + tracing::debug!("A40 inbound checksum mismatch; dropping frame"); + } + Err(FrameDecodeError::UnknownFrameId(_)) => { + faults.inc_unknown_frame_id(); + } + Err(e) => { + // Other framing errors share the crc counter + // (they are all "frame envelope invalid" faults + // from the operator's perspective). + faults.inc_crc(); + tracing::debug!(error=?e, "A40 inbound frame rejected"); + } + }, + Err(e) => { + tracing::error!(error=%e, "A40 transport recv error; shutting down receive loop"); + return; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn faults_default_zero() { + // Arrange + Act + let f = VendorFaults::default(); + + // Assert + let s = f.snapshot(); + assert_eq!(s.crc, 0); + assert_eq!(s.timeout, 0); + assert_eq!(s.unknown_frame_id, 0); + } + + #[test] + fn faults_counters_increment_independently() { + // Arrange + let f = VendorFaults::default(); + + // Act + f.inc_crc(); + f.inc_crc(); + f.inc_timeout(); + + // Assert + let s = f.snapshot(); + assert_eq!(s.crc, 2); + assert_eq!(s.timeout, 1); + assert_eq!(s.unknown_frame_id, 0); + } +} diff --git a/crates/gimbal_controller/src/lib.rs b/crates/gimbal_controller/src/lib.rs index acbf9be..9a46207 100644 --- a/crates/gimbal_controller/src/lib.rs +++ b/crates/gimbal_controller/src/lib.rs @@ -1,7 +1,12 @@ //! `gimbal_controller` — ViewPro A40 UDP control + smooth-pan primitive. //! -//! Real implementation lands in: -//! - AZ-653 `gimbal_a40_transport` +//! AZ-653 lands: +//! - The vendor frame codec ([`internal::a40_protocol`]) +//! - The UDP transport with bounded retry + vendor-fault counters +//! ([`internal::transport`]) +//! - The real `set_pose` / `zoom` paths on [`GimbalControllerHandle`] +//! +//! Subsequent gimbal tasks layer onto the same transport: //! - AZ-654 `gimbal_zoom_out_sweep` //! - AZ-655 `gimbal_smooth_pan_plan` //! - AZ-656 `gimbal_centre_on_target` @@ -13,27 +18,63 @@ use shared::error::{AutopilotError, Result}; use shared::health::ComponentHealth; use shared::models::gimbal::GimbalState; +mod internal; + +pub use internal::a40_protocol::{ + build_a1_angles, build_c1_camera, build_c2_set_zoom, decode_frame, encode_frame, xor_checksum, + CameraCommand, Frame, FrameDecodeError, FrameId, ImageSensor, ServoStatus, MAX_PACKET_LEN, +}; +pub use internal::transport::{ + A40Error, A40Transport, VendorFaults, VendorFaultsSnapshot, DEFAULT_COMMAND_DEADLINE, + DEFAULT_MAX_RETRIES, INBOUND_CHANNEL_CAPACITY, +}; + const NAME: &str = "gimbal_controller"; +/// Caller-supplied target pose. Yaw + pitch are absolute angles in +/// degrees (vendor convention: yaw 0° = airframe nose, pitch 0° = +/// horizon, pitch +90° = straight up). #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub struct GimbalCommand { pub yaw_deg: f32, pub pitch_deg: f32, } +/// Owns the state publisher and (optionally) the A40 transport. When +/// constructed without a transport (`GimbalController::new`), the +/// controller is in **disabled** mode — `set_pose` and `zoom` return +/// `AutopilotError::NotImplemented`. This matches the AZ-651 / +/// AZ-652 pattern where transports are wired by the composition root +/// in `autopilot/runtime.rs`. pub struct GimbalController { state_tx: watch::Sender, + transport: Option, } impl GimbalController { pub fn new(initial: GimbalState) -> Self { let (state_tx, _rx) = watch::channel(initial); - Self { state_tx } + Self { + state_tx, + transport: None, + } + } + + /// Construct a controller already wired to the A40 transport. + /// The composition root uses this overload after binding the + /// vendor UDP socket. + pub fn with_transport(initial: GimbalState, transport: A40Transport) -> Self { + let (state_tx, _rx) = watch::channel(initial); + Self { + state_tx, + transport: Some(transport), + } } pub fn handle(&self) -> GimbalControllerHandle { GimbalControllerHandle { state_tx: self.state_tx.clone(), + transport: self.transport.clone(), } } } @@ -41,19 +82,54 @@ impl GimbalController { #[derive(Clone)] pub struct GimbalControllerHandle { state_tx: watch::Sender, + transport: Option, } impl GimbalControllerHandle { - pub async fn set_pose(&self, _command: GimbalCommand) -> Result<()> { - Err(AutopilotError::NotImplemented( - "gimbal_controller::set_pose (AZ-653)", - )) + /// Issue an absolute-angle target to the A40. Returns once the + /// vendor has acknowledged via a T1_F1_B1_D1 reply (its standard + /// angle-feedback frame) or the bounded retry budget exhausts. + pub async fn set_pose(&self, command: GimbalCommand) -> Result<()> { + let transport = self.transport.as_ref().ok_or(AutopilotError::NotImplemented( + "gimbal_controller::set_pose: no transport wired", + ))?; + let data = build_a1_angles(command.yaw_deg, command.pitch_deg); + let _reply = transport + .send_with_response(FrameId::A1, &data, FrameId::T1F1B1D1) + .await + .map_err(map_a40_error)?; + // `send_replace` updates the watched value regardless of + // subscriber count; using plain `send` would silently fail + // when no consumer is listening yet (the composition root + // wires consumers after construction in some test flows). + let mut state = *self.state_tx.borrow(); + state.yaw = command.yaw_deg; + state.pitch = command.pitch_deg; + state.ts_monotonic_ns = monotonic_ns(); + self.state_tx.send_replace(state); + Ok(()) } - pub async fn zoom(&self, _level: f32) -> Result<()> { - Err(AutopilotError::NotImplemented( - "gimbal_controller::zoom (AZ-654)", - )) + /// Issue an absolute optical-zoom factor (e.g. `4.0` for 4×). + /// Routed through the C2 SET_EO_ZOOM command per the vendor + /// protocol. The continuous-rate C1 ZOOM_IN / ZOOM_OUT pair is + /// reserved for AZ-654's sweep primitive. + pub async fn zoom(&self, level: f32) -> Result<()> { + let transport = self.transport.as_ref().ok_or(AutopilotError::NotImplemented( + "gimbal_controller::zoom: no transport wired", + ))?; + let data = build_c2_set_zoom(level); + // C2 SET_EO_ZOOM ack arrives as a T1_F1_B1_D1 (the vendor's + // generic angle/status feedback frame). + let _reply = transport + .send_with_response(FrameId::C2, &data, FrameId::T1F1B1D1) + .await + .map_err(map_a40_error)?; + let mut state = *self.state_tx.borrow(); + state.zoom = level; + state.ts_monotonic_ns = monotonic_ns(); + self.state_tx.send_replace(state); + Ok(()) } pub fn state(&self) -> GimbalState { @@ -64,26 +140,105 @@ impl GimbalControllerHandle { self.state_tx.subscribe() } - pub fn health(&self) -> ComponentHealth { - ComponentHealth::disabled(NAME) + /// Direct vendor-fault counter snapshot. The composition root + /// uses this to populate the health surface; unit tests use it + /// to assert AC-2 ("CRC mismatch counted") and AC-3 / AC-4 + /// (`vendor_faults_total{kind="timeout"}` increments). + pub fn faults(&self) -> Option { + self.transport.as_ref().map(|t| t.faults()) } + + pub fn health(&self) -> ComponentHealth { + let Some(transport) = self.transport.as_ref() else { + return ComponentHealth::disabled(NAME); + }; + let f = transport.faults(); + // Any timeout fault flips to yellow; ≥ 5 to red. The exact + // thresholds are conservative starting points — the + // operator-surface team will refine once flight data exists. + if f.timeout >= 5 { + ComponentHealth::red(NAME, format!("timeout faults={}", f.timeout)) + } else if f.timeout > 0 || f.crc > 0 { + ComponentHealth::yellow( + NAME, + format!("vendor faults: crc={} timeout={}", f.crc, f.timeout), + ) + } else { + ComponentHealth::green(NAME) + } + } + + /// Direct transport handle for the AZ-654/655/656 primitives + /// that need to issue ZOOM_IN/ZOOM_OUT rate commands rather than + /// going through `set_pose` / `zoom`. + #[doc(hidden)] + pub fn transport(&self) -> Option<&A40Transport> { + self.transport.as_ref() + } +} + +fn map_a40_error(e: A40Error) -> AutopilotError { + match e { + A40Error::FrameTooLarge => { + AutopilotError::Internal("A40 frame exceeds vendor 63-byte max".into()) + } + A40Error::MaxRetriesExceeded { attempts, expected } => AutopilotError::Internal(format!( + "A40 max retries exceeded ({attempts} attempts) waiting for {expected:?}" + )), + A40Error::Io(io) => AutopilotError::Internal(format!("A40 UDP I/O: {io}")), + A40Error::InboundChannelClosed => { + AutopilotError::Internal("A40 inbound broadcast channel closed".into()) + } + } +} + +fn monotonic_ns() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0) } #[cfg(test)] mod tests { use super::*; - #[test] - fn it_compiles() { - let initial = GimbalState { + fn initial_state() -> GimbalState { + GimbalState { yaw: 0.0, pitch: 0.0, zoom: 1.0, ts_monotonic_ns: 0, command_in_flight: false, - }; - let h = GimbalController::new(initial).handle(); + } + } + + #[test] + fn disabled_controller_has_disabled_health() { + // Arrange + Act + let h = GimbalController::new(initial_state()).handle(); + + // Assert assert_eq!(h.state().zoom, 1.0); assert_eq!(h.health().level, shared::health::HealthLevel::Disabled); + assert!(h.faults().is_none()); + } + + #[tokio::test] + async fn disabled_controller_rejects_set_pose() { + // Arrange + let h = GimbalController::new(initial_state()).handle(); + + // Act + let res = h + .set_pose(GimbalCommand { + yaw_deg: 10.0, + pitch_deg: 0.0, + }) + .await; + + // Assert + assert!(matches!(res, Err(AutopilotError::NotImplemented(_)))); } } diff --git a/crates/gimbal_controller/tests/a40_transport.rs b/crates/gimbal_controller/tests/a40_transport.rs new file mode 100644 index 0000000..d5533fc --- /dev/null +++ b/crates/gimbal_controller/tests/a40_transport.rs @@ -0,0 +1,358 @@ +//! AZ-653 integration tests for the ViewPro A40 transport. +//! +//! Strategy: bring up a fake A40 endpoint on a second `UdpSocket` in +//! the same process; pair it with the transport under test via a +//! pre-bound `peer` address; drive scenarios by scripting the fake's +//! reply behaviour (echo, drop, corrupt CRC). + +use std::net::{Ipv4Addr, SocketAddr}; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use tokio::net::UdpSocket; +use tokio::sync::Mutex; + +use gimbal_controller::{ + build_a1_angles, decode_frame, encode_frame, A40Transport, CameraCommand, FrameId, + GimbalCommand, GimbalController, ImageSensor, +}; +use shared::models::gimbal::GimbalState; + +const LOCALHOST: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); + +fn loopback(port: u16) -> SocketAddr { + SocketAddr::new(LOCALHOST.into(), port) +} + +fn initial_state() -> GimbalState { + GimbalState { + yaw: 0.0, + pitch: 0.0, + zoom: 1.0, + ts_monotonic_ns: 0, + command_in_flight: false, + } +} + +/// Bind a UDP socket on an OS-chosen ephemeral port and return both +/// the socket and the bound address. +async fn bind_ephemeral() -> (Arc, SocketAddr) { + let s = UdpSocket::bind(loopback(0)).await.expect("bind ephemeral"); + let addr = s.local_addr().expect("local_addr"); + (Arc::new(s), addr) +} + +/// Helper — minimal fake A40 endpoint. Behaviour is supplied as a +/// closure invoked for every inbound frame. +struct FakeA40 { + socket: Arc, + addr: SocketAddr, +} + +impl FakeA40 { + async fn bind() -> Self { + let (socket, addr) = bind_ephemeral().await; + Self { socket, addr } + } +} + +#[tokio::test] +async fn ac1_crc_round_trip_no_faults() { + // Arrange — bring up the fake; build a yaw-30 A1 frame; spawn a + // task that echoes the (well-formed) command back as a + // T1_F1_B1_D1 reply (the vendor's angle-feedback frame). + let fake = FakeA40::bind().await; + let (test_socket, test_addr) = bind_ephemeral().await; + test_socket.connect(fake.addr).await.expect("connect"); + + let fake_socket = fake.socket.clone(); + let echo_task = tokio::spawn(async move { + let mut buf = [0u8; 128]; + let (n, from) = fake_socket + .recv_from(&mut buf) + .await + .expect("fake recv_from"); + // Validate the incoming A1 frame parses cleanly. + let inbound = decode_frame(&buf[..n]).expect("inbound decode"); + assert_eq!(inbound.frame_id, FrameId::A1); + // Reply with T1_F1_B1_D1 (12 bytes of arbitrary feedback + // payload — content unchecked by the transport). + let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); + fake_socket + .send_to(&reply, from) + .await + .expect("fake send_to"); + }); + + let (transport, _recv_task) = + A40Transport::from_socket(test_socket.clone(), fake.addr).expect("from_socket"); + let _ = test_addr; + + let payload = build_a1_angles(30.0, 0.0); + + // Act + let reply = transport + .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1) + .await + .expect("send_with_response"); + + // Assert + assert_eq!(reply.frame_id, FrameId::T1F1B1D1); + assert_eq!(transport.faults().crc, 0); + assert_eq!(transport.faults().timeout, 0); + + echo_task.await.expect("echo task"); +} + +#[tokio::test] +async fn ac2_crc_mismatch_counted_and_dropped() { + // Arrange — fake echoes a frame whose checksum is one bit off. + let fake = FakeA40::bind().await; + let (test_socket, _) = bind_ephemeral().await; + test_socket.connect(fake.addr).await.expect("connect"); + + let fake_socket = fake.socket.clone(); + tokio::spawn(async move { + let mut buf = [0u8; 128]; + let (_n, from) = fake_socket + .recv_from(&mut buf) + .await + .expect("fake recv_from"); + // Craft a corrupt frame (flip the checksum). + let mut reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); + let last = reply.len() - 1; + reply[last] ^= 0x01; + fake_socket + .send_to(&reply, from) + .await + .expect("fake send_to"); + }); + + let (transport, _recv_task) = + A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); + let transport = transport + .with_command_deadline(Duration::from_millis(80)) + .with_max_retries(1); + + let payload = build_a1_angles(30.0, 0.0); + + // Act — must fail (the corrupt frame is dropped; no valid reply + // arrives within the deadline). + let result = transport + .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1) + .await; + + // Assert — CRC counter incremented; timeout counter incremented + // because no valid reply arrived. + assert!( + result.is_err(), + "expected MaxRetriesExceeded; got {result:?}" + ); + // The receive loop is asynchronous; give it a tick to record. + tokio::time::sleep(Duration::from_millis(20)).await; + let faults = transport.faults(); + assert!(faults.crc >= 1, "expected ≥1 CRC fault, got {}", faults.crc); + assert!( + faults.timeout >= 1, + "expected ≥1 timeout fault, got {}", + faults.timeout + ); +} + +#[tokio::test] +async fn ac3_command_timeout_retries_then_succeeds() { + // Arrange — fake drops the FIRST inbound frame silently; replies + // to every subsequent one. + let fake = FakeA40::bind().await; + let (test_socket, _) = bind_ephemeral().await; + test_socket.connect(fake.addr).await.expect("connect"); + + let drop_count = Arc::new(AtomicU8::new(0)); + let fake_socket = fake.socket.clone(); + let drop_count_for_task = drop_count.clone(); + tokio::spawn(async move { + loop { + let mut buf = [0u8; 128]; + let Ok((_n, from)) = fake_socket.recv_from(&mut buf).await else { + return; + }; + let prior = drop_count_for_task.fetch_add(1, Ordering::Relaxed); + if prior == 0 { + // Silently drop the first command. + continue; + } + let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); + let _ = fake_socket.send_to(&reply, from).await; + } + }); + + let (transport, _recv_task) = + A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); + let transport = transport + .with_command_deadline(Duration::from_millis(80)) + .with_max_retries(3); + + let payload = build_a1_angles(30.0, 0.0); + + // Act + let reply = transport + .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1) + .await + .expect("retry should succeed"); + + // Assert — exactly one timeout (first attempt dropped); reply + // arrived on the second attempt. + assert_eq!(reply.frame_id, FrameId::T1F1B1D1); + let faults = transport.faults(); + assert_eq!( + faults.timeout, 1, + "expected 1 timeout fault, got {}", + faults.timeout + ); + assert_eq!(faults.crc, 0); + assert!( + drop_count.load(Ordering::Relaxed) >= 2, + "fake should have seen ≥2 commands" + ); +} + +#[tokio::test] +async fn ac4_cap_exhaustion_returns_max_retries_exceeded() { + // Arrange — fake never replies. The transport should fail after + // exactly `max_retries` attempts with `MaxRetriesExceeded`. + let fake = FakeA40::bind().await; + let (test_socket, _) = bind_ephemeral().await; + test_socket.connect(fake.addr).await.expect("connect"); + + let attempts_seen = Arc::new(Mutex::new(0u32)); + let fake_socket = fake.socket.clone(); + let attempts_for_task = attempts_seen.clone(); + tokio::spawn(async move { + loop { + let mut buf = [0u8; 128]; + let Ok((_, _from)) = fake_socket.recv_from(&mut buf).await else { + return; + }; + *attempts_for_task.lock().await += 1; + // Never reply. + } + }); + + let (transport, _recv_task) = + A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); + let transport = transport + .with_command_deadline(Duration::from_millis(60)) + .with_max_retries(3); + + let payload = build_a1_angles(30.0, 0.0); + + // Act + let err = transport + .send_with_response(FrameId::A1, &payload, FrameId::T1F1B1D1) + .await + .expect_err("should hit cap"); + + // Assert + assert!( + matches!( + err, + gimbal_controller::A40Error::MaxRetriesExceeded { attempts: 3, .. } + ), + "expected MaxRetriesExceeded(3); got {err:?}" + ); + let faults = transport.faults(); + assert_eq!( + faults.timeout, 3, + "expected 3 timeout faults; got {}", + faults.timeout + ); + // Give the fake one final beat to record the final attempt. + tokio::time::sleep(Duration::from_millis(20)).await; + let seen = *attempts_seen.lock().await; + assert_eq!(seen, 3, "fake should have seen exactly 3 attempts"); +} + +#[tokio::test] +async fn set_pose_via_transport_updates_state_stream() { + // Arrange — full GimbalController + transport wired together; + // fake echoes every A1 with a T1_F1_B1_D1 ack. + let fake = FakeA40::bind().await; + let (test_socket, _) = bind_ephemeral().await; + test_socket.connect(fake.addr).await.expect("connect"); + + let fake_socket = fake.socket.clone(); + tokio::spawn(async move { + loop { + let mut buf = [0u8; 128]; + let Ok((_, from)) = fake_socket.recv_from(&mut buf).await else { + return; + }; + let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); + let _ = fake_socket.send_to(&reply, from).await; + } + }); + + let (transport, _recv_task) = + A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); + let controller = GimbalController::with_transport(initial_state(), transport); + let handle = controller.handle(); + let mut state_rx = handle.state_stream(); + + // Act + handle + .set_pose(GimbalCommand { + yaw_deg: 45.0, + pitch_deg: -10.0, + }) + .await + .expect("set_pose"); + + // Assert + state_rx.changed().await.expect("state changed"); + let snapshot = *state_rx.borrow(); + assert_eq!(snapshot.yaw, 45.0); + assert_eq!(snapshot.pitch, -10.0); + assert_eq!(handle.faults().expect("transport present").timeout, 0); +} + +#[tokio::test] +async fn zoom_via_transport_updates_zoom_state() { + // Arrange + let fake = FakeA40::bind().await; + let (test_socket, _) = bind_ephemeral().await; + test_socket.connect(fake.addr).await.expect("connect"); + + let fake_socket = fake.socket.clone(); + tokio::spawn(async move { + loop { + let mut buf = [0u8; 128]; + let Ok((_, from)) = fake_socket.recv_from(&mut buf).await else { + return; + }; + let reply = encode_frame(FrameId::T1F1B1D1, &[0; 12], 0).expect("encode reply"); + let _ = fake_socket.send_to(&reply, from).await; + } + }); + + let (transport, _recv_task) = + A40Transport::from_socket(test_socket, fake.addr).expect("from_socket"); + let controller = GimbalController::with_transport(initial_state(), transport); + let handle = controller.handle(); + + // Act + handle.zoom(4.0).await.expect("zoom"); + + // Assert + let snapshot = handle.state(); + assert_eq!(snapshot.zoom, 4.0); +} + +#[tokio::test] +async fn build_c1_camera_payload_matches_vendor_layout() { + // Arrange + Act + let payload = gimbal_controller::build_c1_camera(ImageSensor::Eo1, CameraCommand::ZoomIn); + + // Assert — sanity-check the byte layout the transport will send. + assert_eq!(payload, [0x01, 0x09]); +}