//! AZ-659 — `FramePublisher` integration tests. //! //! These tests drive the publisher directly (no RTSP / decoder //! involved) so they execute in milliseconds and don't depend on //! libavcodec or NVDEC. The AZ-658 pipeline tests cover the //! lifecycle-loop integration end-to-end. //! //! ACs covered here: //! - AC-1 — three consumers consuming at-rate observe every frame and //! drop counters stay at 0. //! - AC-2 — a slow consumer's lag is folded into THAT consumer's //! drop counter while fast consumers continue to receive every //! frame. //! - AC-3 — zero-copy fan-out: every consumer receives the same //! `Arc` (asserted via `Arc::ptr_eq`) so memory does not //! scale with consumer count. use std::sync::Arc; use std::time::Duration; use bytes::Bytes; use frame_ingest::{ConsumerId, FramePublisher, DEFAULT_CHANNEL_DEPTH}; use shared::models::frame::{Frame, PixelFormat}; use tokio::time::{sleep, timeout}; fn make_frame(seq: u64, pixels: Arc) -> Frame { Frame { seq, capture_ts_monotonic_ns: seq * 1_000_000, decode_ts_monotonic_ns: seq * 1_000_000 + 100, pixels, width: 320, height: 240, pix_fmt: PixelFormat::Nv12, ai_locked: false, } } /// AC-1 — three consumers consuming as fast as the publisher emits /// observe every frame; per-consumer drop counters stay at 0. The /// spec quotes 30 fps for 10 s (~300 frames); we use 30 frames at /// no artificial delay to keep CI under 1 s. The semantic property /// — "consumers that keep up never lose a frame" — is identical. #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn ac1_three_consumers_at_rate_lose_no_frames() { // Arrange let publisher = Arc::new(FramePublisher::new(DEFAULT_CHANNEL_DEPTH)); let stats = publisher.stats(); let mut det = publisher.subscribe(ConsumerId::DetectionClient); let mut mov = publisher.subscribe(ConsumerId::MovementDetector); let mut tel = publisher.subscribe(ConsumerId::Telemetry); let total: u64 = 30; let publisher_for_task = Arc::clone(&publisher); // Act — drain in parallel while publishing. Each consumer drains // immediately, so the broadcast channel stays well under // `DEFAULT_CHANNEL_DEPTH` and no consumer can lag. let producer = tokio::spawn(async move { let payload = Arc::new(Bytes::from(vec![0xAAu8; 256])); for seq in 0..total { publisher_for_task.publish(make_frame(seq, Arc::clone(&payload))); // Yield so subscribers get a chance to drain between // sends; without this the producer races ahead and any // delay in tokio scheduling could falsely trip the lag // counter even for a "fast" consumer at this small scale. tokio::task::yield_now().await; } }); let drain = |mut rx: frame_ingest::FrameReceiver, label: &'static str| { tokio::spawn(async move { let mut got = 0u64; while got < total { match timeout(Duration::from_secs(2), rx.recv()).await { Ok(Ok(_)) => got += 1, Ok(Err(e)) => panic!("{label} recv closed early: {e}"), Err(_) => panic!("{label} stalled at {got}/{total}"), } } got }) }; let h_det = drain(det.take(), "detection_client"); let h_mov = drain(mov.take(), "movement_detector"); let h_tel = drain(tel.take(), "telemetry"); producer.await.expect("producer"); assert_eq!(h_det.await.expect("det join"), total); assert_eq!(h_mov.await.expect("mov join"), total); assert_eq!(h_tel.await.expect("tel join"), total); // Assert — every consumer drained at-rate, so no drops on any // counter and `publishes_total` matches the produced count. assert_eq!(stats.publishes_total(), total); assert_eq!(stats.drops_for(ConsumerId::DetectionClient), 0); assert_eq!(stats.drops_for(ConsumerId::MovementDetector), 0); assert_eq!(stats.drops_for(ConsumerId::Telemetry), 0); } /// AC-2 — a slow consumer (yields slowly) is the only one to incur /// drops; the fast consumers continue to observe every frame. The /// producer paces its sends at ~5 ms intervals so fast consumers /// can drain in between; the slow consumer sleeps ~25 ms per frame, /// so the broadcast channel laps it after a handful of frames. #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn ac2_slow_consumer_drops_while_fast_consumers_unaffected() { // Arrange — depth-2 channel + a producer that paces sends. let channel_depth = 2usize; let publisher = Arc::new(FramePublisher::new(channel_depth)); let stats = publisher.stats(); let mut det = publisher.subscribe(ConsumerId::DetectionClient); // fast let mut mov = publisher.subscribe(ConsumerId::MovementDetector); // fast let mut tel = publisher.subscribe(ConsumerId::Telemetry); // SLOW let total: u64 = 30; let payload = Arc::new(Bytes::from(vec![0xBBu8; 64])); // Spawn consumers BEFORE the producer task so the broadcast // already has live subscribers when the first publish lands. let slow = tokio::spawn(async move { let mut got = 0u64; let deadline = Duration::from_secs(10); let start = tokio::time::Instant::now(); // The slow consumer keeps polling until the broadcast // channel closes (publisher drops) OR the safety deadline // fires. A `Closed` here is the natural termination signal // once the producer's `Arc` goes out of // scope; we don't try to predict how many frames it gets // because that depends on scheduling jitter. while start.elapsed() < deadline { match timeout(Duration::from_millis(500), tel.recv()).await { Ok(Ok(_)) => { got += 1; sleep(Duration::from_millis(25)).await; } Ok(Err(_)) => break, // Closed: producer finished. Err(_) => { // Timeout — assume producer is done and exit. break; } } } got }); let drain_fast = |mut rx: frame_ingest::FrameReceiver, label: &'static str| { tokio::spawn(async move { let mut got = 0u64; while got < total { match timeout(Duration::from_secs(3), rx.recv()).await { Ok(Ok(_)) => got += 1, Ok(Err(e)) => panic!("{label} recv closed early: {e}"), Err(_) => panic!("{label} stalled at {got}/{total}"), } } got }) }; let h_det = drain_fast(det.take(), "detection_client"); let h_mov = drain_fast(mov.take(), "movement_detector"); // Give consumers a moment to enter `recv` before producing. sleep(Duration::from_millis(10)).await; // Act — pace sends ~5 ms apart so fast consumers have time to // drain each frame before the next arrives. The slow consumer // can only process ~1 frame per 25 ms, so it inevitably lags. let publisher_for_task = Arc::clone(&publisher); let payload_for_task = Arc::clone(&payload); let producer = tokio::spawn(async move { for seq in 0..total { publisher_for_task.publish(make_frame(seq, Arc::clone(&payload_for_task))); sleep(Duration::from_millis(5)).await; } }); producer.await.expect("producer"); assert_eq!(h_det.await.expect("det join"), total); assert_eq!(h_mov.await.expect("mov join"), total); // Drop the last `Arc` so the slow consumer's // recv returns `Closed` and it can exit on its own. drop(publisher); let slow_got = slow.await.expect("slow join"); // Assert — the slow consumer dropped frames; the fast ones did // not. The exact drop count varies with scheduler jitter so we // assert "> 0" rather than a specific number. assert_eq!( stats.drops_for(ConsumerId::DetectionClient), 0, "fast consumer must not have any drops" ); assert_eq!( stats.drops_for(ConsumerId::MovementDetector), 0, "fast consumer must not have any drops" ); let tel_drops = stats.drops_for(ConsumerId::Telemetry); assert!( tel_drops > 0, "slow telemetry consumer must have at least one drop; got {tel_drops}" ); // Every frame is accounted for from the slow consumer's // perspective: delivered + dropped == published. assert_eq!( slow_got + tel_drops, stats.publishes_total(), "received + dropped must equal published for the slow consumer" ); } /// AC-3 — fan-out is zero-copy: each subscriber observes the SAME /// `Arc` for a given frame. Asserts the property via /// `Arc::ptr_eq` between the pixel handles delivered to two /// different consumers; the test does not depend on timing. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn ac3_fan_out_is_zero_copy_via_arc_bytes() { // Arrange let publisher = Arc::new(FramePublisher::new(DEFAULT_CHANNEL_DEPTH)); let mut det = publisher.subscribe(ConsumerId::DetectionClient); let mut mov = publisher.subscribe(ConsumerId::MovementDetector); let mut tel = publisher.subscribe(ConsumerId::Telemetry); let payload = Arc::new(Bytes::from(vec![0xCDu8; 1024])); // Act publisher.publish(make_frame(42, Arc::clone(&payload))); let f_det = det.recv().await.expect("det recv"); let f_mov = mov.recv().await.expect("mov recv"); let f_tel = tel.recv().await.expect("tel recv"); // Assert — same Arc across consumers AND across publisher // boundary; the broadcast did not deep-clone Bytes anywhere. assert!(Arc::ptr_eq(&f_det.pixels, &payload)); assert!(Arc::ptr_eq(&f_mov.pixels, &payload)); assert!(Arc::ptr_eq(&f_tel.pixels, &payload)); assert!(Arc::ptr_eq(&f_det.pixels, &f_mov.pixels)); assert!(Arc::ptr_eq(&f_mov.pixels, &f_tel.pixels)); } // `FrameReceiver` does not implement `Copy` and the public surface // returns it by value, so we move it into the spawned task via // `take()` on a small helper. Defined here to keep test bodies tidy. trait Takeable { fn take(&mut self) -> frame_ingest::FrameReceiver; } impl Takeable for frame_ingest::FrameReceiver { fn take(&mut self) -> frame_ingest::FrameReceiver { // SAFETY: we replace `self` with a fresh detached receiver // that the test no longer uses; this lets us move ownership // out of a `&mut`-bound binding without unsafe code. std::mem::replace(self, dummy_receiver()) } } fn dummy_receiver() -> frame_ingest::FrameReceiver { let p = FramePublisher::new(1); p.subscribe(ConsumerId::DetectionClient) }