//! AZ-649 acceptance criteria. //! //! AC-1 — telemetry reaches all three downstream consumers at the //! arriving rate. //! AC-2 — slow consumer drops, fast consumers unaffected. //! AC-3 — `latest_snapshot()` is monotonic across concurrent reads. use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use mission_executor::{Consumer, MavlinkProjection, TelemetryForwarder}; use shared::models::telemetry::{UavAttitude, UavPosition}; use tokio::time::timeout; fn pos(lat: i32) -> UavPosition { UavPosition { lat_e7: lat, lon_e7: 0, alt_m: 100.0, relative_alt_m: 50.0, vx_mps: 0.0, vy_mps: 0.0, vz_mps: 0.0, heading_deg: 0.0, ts_boot_ms: lat as u32, } } fn att(yaw: f32) -> UavAttitude { UavAttitude { roll: 0.0, pitch: 0.0, yaw, rollspeed: 0.0, pitchspeed: 0.0, yawspeed: 0.0, ts_boot_ms: 0, } } #[tokio::test] async fn ac1_telemetry_reaches_all_three_consumers() { // Arrange — three fast consumers and a producer that publishes // 10 alternating position/attitude updates (simulating 10 Hz). let f = Arc::new(TelemetryForwarder::new()); let mut rx_scan = f.subscribe(Consumer::ScanController); let mut rx_movement = f.subscribe(Consumer::MovementDetector); let mut rx_telemetry = f.subscribe(Consumer::TelemetryStream); // Act — publish 10 updates (5 position, 5 attitude). for i in 0..10 { if i % 2 == 0 { f.publish_from_mavlink(&MavlinkProjection::Position(pos(i))); } else { f.publish_from_mavlink(&MavlinkProjection::Attitude(att(i as f32))); } } // Assert — each consumer received exactly 10 snapshots; the last // one carries the latest position and last-set attitude. let mut count_scan = 0; let mut last_scan = None; while let Ok(snap) = rx_scan.try_recv() { count_scan += 1; last_scan = Some(snap); } assert_eq!(count_scan, 10); let snap = last_scan.unwrap(); assert_eq!(snap.position.unwrap().lat_e7, 8); assert_eq!(snap.attitude.unwrap().yaw, 9.0); let count_movement = drain_count(&mut rx_movement); let count_telemetry = drain_count(&mut rx_telemetry); assert_eq!(count_movement, 10); assert_eq!(count_telemetry, 10); // No drops on any channel — every consumer kept up. for c in Consumer::ALL { assert_eq!(f.drop_count(c), 0, "{} drop count should be 0", c.as_str()); } } fn drain_count(rx: &mut mission_executor::DropCountingReceiver) -> usize { let mut count = 0; while rx.try_recv().is_ok() { count += 1; } count } #[tokio::test] async fn ac2_slow_consumer_drops_fast_consumers_unaffected() { // Arrange — channel cap = 4. We publish 16 messages with a slow // consumer that waits before reading. The 16 - 4 = 12 oldest // messages should be overwritten in its buffer and surface as // Lagged(12) on the next recv. let f = Arc::new(TelemetryForwarder::with_capacity(4)); let mut slow = f.subscribe(Consumer::ScanController); let mut fast1 = f.subscribe(Consumer::MovementDetector); let mut fast2 = f.subscribe(Consumer::TelemetryStream); // Spawn fast consumers that drain into local counters as messages arrive. let fast1_count = Arc::new(AtomicU64::new(0)); let fast2_count = Arc::new(AtomicU64::new(0)); let fast1_count_h = fast1_count.clone(); let fast2_count_h = fast2_count.clone(); let fast1_task = tokio::spawn(async move { loop { match fast1.recv().await { Ok(_) => { fast1_count_h.fetch_add(1, Ordering::SeqCst); } Err(_) => return, } } }); let fast2_task = tokio::spawn(async move { loop { match fast2.recv().await { Ok(_) => { fast2_count_h.fetch_add(1, Ordering::SeqCst); } Err(_) => return, } } }); // Act — publish 16 messages with a tiny yield between each so the // fast consumers can keep up while the slow consumer stays // un-polled. for i in 0..16 { f.publish_from_mavlink(&MavlinkProjection::Position(pos(i))); tokio::time::sleep(Duration::from_millis(2)).await; } // Give the fast consumers a moment to flush. tokio::time::sleep(Duration::from_millis(50)).await; // Slow consumer reads ONE message — recv returns the next not- // yet-overwritten value AND the drop counter advances by // (16 - cap) under-the-hood. let _ = timeout(Duration::from_secs(1), slow.recv()).await.unwrap(); // Assert — fast consumers saw every message; slow saw drops. assert_eq!(fast1_count.load(Ordering::SeqCst), 16); assert_eq!(fast2_count.load(Ordering::SeqCst), 16); let slow_drops = f.drop_count(Consumer::ScanController); assert!( slow_drops > 0, "expected slow consumer to register some drops, got {slow_drops}" ); // Fast consumers saw zero drops. assert_eq!(f.drop_count(Consumer::MovementDetector), 0); assert_eq!(f.drop_count(Consumer::TelemetryStream), 0); // Cleanup fast1_task.abort(); fast2_task.abort(); let _ = fast1_task.await; let _ = fast2_task.await; } #[tokio::test] async fn ac3_latest_snapshot_is_monotonic_under_concurrent_reads() { // Arrange — a producer that publishes 1 000 times in a tight // loop, and 4 reader tasks that each take 1 000 snapshots and // verify monotonicity in their own observed sequence. let f = Arc::new(TelemetryForwarder::new()); let producer = { let f = f.clone(); tokio::spawn(async move { for i in 0..1_000_i32 { f.publish_from_mavlink(&MavlinkProjection::Position(pos(i))); tokio::task::yield_now().await; } }) }; let mut readers = Vec::new(); for _ in 0..4 { let f = f.clone(); readers.push(tokio::spawn(async move { let mut prev = 0u64; for _ in 0..1_000 { let snap = f.latest_snapshot(); assert!( snap.monotonic_ts_ns >= prev, "snapshot regressed: prev={} new={}", prev, snap.monotonic_ts_ns ); prev = snap.monotonic_ts_ns; tokio::task::yield_now().await; } })); } // Act / Assert — every task must complete without panicking. producer.await.unwrap(); for r in readers { r.await.unwrap(); } // Final snapshot must have a non-zero monotonic timestamp. assert!(f.last_monotonic_ns() > 0); }