Files
Oleksandr Bezdieniezhnykh 740bf37d76 [AZ-641] [AZ-642] [AZ-644] mavlink transport + codec + mission pull
Lands the second batch under epic AZ-626's implementation plan.

mavlink_layer (AZ-641 + AZ-642):
- Hand-rolled MAVLink v2 codec covering the §7.7 surface: HEARTBEAT,
  SYS_STATUS, SET_MODE, ATTITUDE, GLOBAL_POSITION_INT, MISSION_* (7),
  COMMAND_LONG, COMMAND_ACK, EXTENDED_SYS_STATE, STATUSTEXT (17 total).
- Streaming decoder demuxes arbitrary-sized byte arrivals, drops malformed
  frames with typed parse-error counters (crc/truncated/unknown_id/seq_gap),
  and surfaces sequence gaps without hard-failing the link.
- Encoder tracks the per-link tx_seq counter and applies the MAVLink v2
  trailing-zero payload truncation rule.
- UDP and POSIX-serial transports behind a single async Transport trait;
  the run loop owns transport open with bounded exponential backoff
  (2 s serial / 5 s UDP cap) and a tokio::select! per-link read+write
  loop.
- 1 Hz outbound HEARTBEAT scheduler + inbound-heartbeat watchdog that
  fires LinkUp / LinkLost on a broadcast channel and feeds health detail
  (connected, last_heartbeat_age_ms, signing_enabled, parse_errors).

mission_client (AZ-644):
- HTTPS GET /missions/{id} over rustls (no OpenSSL on the airframe).
- Bundled JSON Schema (crates/shared/contracts/mission-schema.json,
  draft-07, additionalProperties:false) validates every response;
  schema-invalid bodies surface as FetchError::SchemaInvalid with a
  1 KiB sample of the raw body for offline analysis.
- Transient failures (timeout, 5xx, 429) retry with bounded exponential
  backoff up to MissionClientOptions.max_attempts (default 5); permanent
  failures (4xx, malformed URL) abort immediately.
- Health surface mirrors AC-1's contract: last_fetch_ts,
  fetch_errors_total, schema_version, connection_state.

Caught and fixed before commit (NOT a code-review finding — caught by
the unit test that hand-computed CRC("123456789")): the hand-rolled
X.25 CRC accumulator was operating in u16 throughout. The MAVLink C
reference declares `tmp` as uint8_t, which silently truncates the
shifted-in bits. Round-trip tests passed (encoder and decoder shared
the bug); a real MAVLink peer would have rejected every frame. Fixed
by mirroring the C reference: `let mut tmp: u8 = …; tmp ^= tmp.wrapping_shl(4);`.
Added a regression test asserting CRC("123456789") == 0x6F91 against
pymavlink's reference value (NOT the textbook 0x29B1 — MAVLink uses a
byte-wise variant, not the bit-reflected CCITT).

AC verification (full detail in
_docs/03_implementation/batch_02_cycle1_report.md):

AZ-641: AC-1 + AC-3 + AC-4 verified via UDP loopback integration tests;
        AC-2 (serial) requires a socat pty pair and runs in the SITL/CI
        tier (test exists as #[ignore]-marked stub).
AZ-642: AC-1 + AC-2 + AC-3 verified via exhaustive codec round-trip and
        decoder negative-path tests; AC-4 (SITL round-trip) requires
        ArduPilot SITL — the CRC fix above means the codec is now
        wire-correct, ready for the sitl-conformance Woodpecker stage.
AZ-644: all four ACs verified via wiremock-driven integration tests.

Workspace gates green:
- cargo check --workspace                                clean
- cargo check --workspace --no-default-features          clean
- cargo fmt --all -- --check                             clean
- cargo clippy --workspace --all-targets -- -D warnings  clean
- cargo test --workspace                                 pass (1 expected ignore)

Layering invariants from module-layout.md hold: mavlink_layer and
mission_client are Layer 2 actors importing only `shared`; no sibling
Layer-2 imports; MavlinkHandle implements shared::contracts::MavlinkSink.

Jira: AZ-641, AZ-642, AZ-644 transitioned To Do → In Progress at batch
start; the matching In Testing transitions follow this commit.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-19 12:29:49 +03:00

171 lines
5.9 KiB
Rust

//! AZ-644 integration tests driven by `wiremock`.
//!
//! Coverage:
//! - AC-1: happy-path fetch returns `Ok(Mission)` + health reflects connection_state="ok"
//! - AC-2: schema-invalid response returns `Err(SchemaInvalid)` with a sample
//! - AC-3: transient 503 → 200 sequence retries within budget
//! - AC-4: 5 consecutive 503s → `Err(MaxRetriesExceeded)` and health red
use std::time::Duration;
use shared::health::HealthLevel;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
use mission_client::{FetchError, MissionClient, MissionClientOptions};
fn good_mission_body(mission_id: &str) -> String {
serde_json::json!({
"mission_id": mission_id,
"schema_version": "1.0.0",
"items": [
{ "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", "kind": "waypoint",
"at": { "latitude": 49.1, "longitude": 31.2, "altitude_m": 100.0 } }
],
"geofences": [],
"return_point": { "latitude": 49.0, "longitude": 31.0, "altitude_m": 0.0 }
})
.to_string()
}
fn options_for(mock: &MockServer, attempts: u32) -> MissionClientOptions {
let mut o = MissionClientOptions::new(mock.uri());
o.max_attempts = attempts;
o.backoff_base = Duration::from_millis(10);
o.backoff_cap = Duration::from_millis(50);
o.request_timeout = Duration::from_secs(2);
o.connect_timeout = Duration::from_secs(1);
o
}
#[tokio::test]
async fn ac1_happy_path_fetch() {
// Arrange
let mock = MockServer::start().await;
let mission_id = "11111111-2222-3333-4444-555555555555";
Mock::given(method("GET"))
.and(path(format!("/missions/{mission_id}")))
.respond_with(ResponseTemplate::new(200).set_body_string(good_mission_body(mission_id)))
.mount(&mock)
.await;
let client = MissionClient::new(options_for(&mock, 5)).expect("client builds");
let h = client.handle();
// Act
let mission = h.pull_mission(mission_id).await.expect("happy fetch");
// Assert
assert_eq!(mission.mission_id.to_string(), mission_id);
assert_eq!(mission.schema_version, "1.0.0");
let health = h.health();
assert_eq!(health.level, HealthLevel::Green);
}
#[tokio::test]
async fn ac2_schema_invalid_is_rejected() {
// Arrange: HTTP 200 but the body is missing the required `mission_id`.
let mock = MockServer::start().await;
let bad_body = serde_json::json!({
"schema_version": "1.0.0",
"items": [
{ "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", "kind": "waypoint",
"at": { "latitude": 49.1, "longitude": 31.2, "altitude_m": 100.0 } }
],
"geofences": [],
"return_point": { "latitude": 49.0, "longitude": 31.0, "altitude_m": 0.0 }
})
.to_string();
Mock::given(method("GET"))
.and(path("/missions/M1"))
.respond_with(ResponseTemplate::new(200).set_body_string(bad_body.clone()))
.mount(&mock)
.await;
let client = MissionClient::new(options_for(&mock, 5)).expect("client builds");
let h = client.handle();
// Act
let err = h.pull_mission("M1").await.unwrap_err();
// Assert
match err {
FetchError::SchemaInvalid { messages, sample } => {
assert!(messages.iter().any(|m| m.contains("mission_id")));
assert!(!sample.is_empty());
}
other => panic!("expected SchemaInvalid, got {other:?}"),
}
let health = h.health();
assert_eq!(health.level, HealthLevel::Red);
}
#[tokio::test]
async fn ac3_transient_failure_retries_within_budget() {
// Arrange: first two requests return 503, third returns 200.
let mock = MockServer::start().await;
let mission_id = "22222222-3333-4444-5555-666666666666";
Mock::given(method("GET"))
.and(path(format!("/missions/{mission_id}")))
.respond_with(ResponseTemplate::new(503))
.up_to_n_times(2)
.mount(&mock)
.await;
Mock::given(method("GET"))
.and(path(format!("/missions/{mission_id}")))
.respond_with(ResponseTemplate::new(200).set_body_string(good_mission_body(mission_id)))
.mount(&mock)
.await;
let client = MissionClient::new(options_for(&mock, 5)).expect("client builds");
let h = client.handle();
// Act
let mission = h.pull_mission(mission_id).await.expect("retry succeeds");
// Assert
assert_eq!(mission.mission_id.to_string(), mission_id);
}
#[tokio::test]
async fn ac4_cap_exhaustion_returns_max_retries() {
// Arrange: every request returns 503; we configure 3 attempts to keep the test fast.
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/missions/M-cap"))
.respond_with(ResponseTemplate::new(503))
.mount(&mock)
.await;
let client = MissionClient::new(options_for(&mock, 3)).expect("client builds");
let h = client.handle();
// Act
let err = h.pull_mission("M-cap").await.unwrap_err();
// Assert
match err {
FetchError::MaxRetriesExceeded { attempts } => assert_eq!(attempts, 3),
other => panic!("expected MaxRetriesExceeded, got {other:?}"),
}
let health = h.health();
assert_eq!(health.level, HealthLevel::Red);
}
#[tokio::test]
async fn permanent_client_error_does_not_retry() {
// Arrange: 404 should be permanent (no retry).
let mock = MockServer::start().await;
let scoped_mock = Mock::given(method("GET"))
.and(path("/missions/M-perm"))
.respond_with(ResponseTemplate::new(404).set_body_string("not found"))
.expect(1)
.mount_as_scoped(&mock)
.await;
let client = MissionClient::new(options_for(&mock, 5)).expect("client builds");
let h = client.handle();
// Act
let err = h.pull_mission("M-perm").await.unwrap_err();
// Assert
assert!(matches!(err, FetchError::Permanent(_)));
drop(scoped_mock); // sanity-asserts the `.expect(1)` count was honored
}