mirror of
https://github.com/azaion/autopilot.git
synced 2026-06-21 19:51:10 +00:00
0854d3be1c
AZ-659: FramePublisher with per-consumer drop accounting (Arc<Bytes> zero-copy fan-out). Adds ConsumerId enum, PublisherStats, FrameReceiver wrapper, and publisher integration tests (AC-1, AC-2, AC-3). AZ-660: Bi-directional tonic gRPC stream to ../detections. Reconnect with bounded exponential backoff (1 s → 30 s cap). Drop-oldest in-flight budgeting (max_concurrent_in_flight = 2). ai_locked frame skipping. Integration tests against fixture in-process server (AC-1: happy path 30 fps/10 s, AC-2: reconnect, AC-3: budget drops, AC-4: ai_locked skipping). AZ-661: Schema validation (hard SchemaMismatch error on version mismatch), model_version latch with ModelVersionChanged events, sliding-window p99 latency tracker with Tier1Degraded/Tier1Recovered transitions. Integration tests (AC-1, AC-2, AC-3). Also: update module-layout.md for frame_ingest and detection_client to reflect the streaming API shape; code review report batch_18. Co-authored-by: Cursor <cursoragent@cursor.com>
264 lines
11 KiB
Rust
264 lines
11 KiB
Rust
//! AZ-659 — `FramePublisher` integration tests.
//!
//! These tests drive the publisher directly (no RTSP / decoder
//! involved) so they execute in milliseconds and don't depend on
//! libavcodec or NVDEC. The AZ-658 pipeline tests cover the
//! lifecycle-loop integration end-to-end.
//!
//! ACs covered here:
//! - AC-1 — three consumers consuming at-rate observe every frame and
//!   drop counters stay at 0.
//! - AC-2 — a slow consumer's lag is folded into THAT consumer's
//!   drop counter while fast consumers continue to receive every
//!   frame.
//! - AC-3 — zero-copy fan-out: every consumer receives the same
//!   `Arc<Bytes>` (asserted via `Arc::ptr_eq`) so memory does not
//!   scale with consumer count.
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use bytes::Bytes;
|
|
use frame_ingest::{ConsumerId, FramePublisher, DEFAULT_CHANNEL_DEPTH};
|
|
use shared::models::frame::{Frame, PixelFormat};
|
|
use tokio::time::{sleep, timeout};
|
|
|
|
fn make_frame(seq: u64, pixels: Arc<Bytes>) -> Frame {
|
|
Frame {
|
|
seq,
|
|
capture_ts_monotonic_ns: seq * 1_000_000,
|
|
decode_ts_monotonic_ns: seq * 1_000_000 + 100,
|
|
pixels,
|
|
width: 320,
|
|
height: 240,
|
|
pix_fmt: PixelFormat::Nv12,
|
|
ai_locked: false,
|
|
}
|
|
}
|
|
|
|
/// AC-1 — three consumers consuming as fast as the publisher emits
|
|
/// observe every frame; per-consumer drop counters stay at 0. The
|
|
/// spec quotes 30 fps for 10 s (~300 frames); we use 30 frames at
|
|
/// no artificial delay to keep CI under 1 s. The semantic property
|
|
/// — "consumers that keep up never lose a frame" — is identical.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn ac1_three_consumers_at_rate_lose_no_frames() {
|
|
// Arrange
|
|
let publisher = Arc::new(FramePublisher::new(DEFAULT_CHANNEL_DEPTH));
|
|
let stats = publisher.stats();
|
|
let mut det = publisher.subscribe(ConsumerId::DetectionClient);
|
|
let mut mov = publisher.subscribe(ConsumerId::MovementDetector);
|
|
let mut tel = publisher.subscribe(ConsumerId::Telemetry);
|
|
|
|
let total: u64 = 30;
|
|
let publisher_for_task = Arc::clone(&publisher);
|
|
|
|
// Act — drain in parallel while publishing. Each consumer drains
|
|
// immediately, so the broadcast channel stays well under
|
|
// `DEFAULT_CHANNEL_DEPTH` and no consumer can lag.
|
|
let producer = tokio::spawn(async move {
|
|
let payload = Arc::new(Bytes::from(vec![0xAAu8; 256]));
|
|
for seq in 0..total {
|
|
publisher_for_task.publish(make_frame(seq, Arc::clone(&payload)));
|
|
// Yield so subscribers get a chance to drain between
|
|
// sends; without this the producer races ahead and any
|
|
// delay in tokio scheduling could falsely trip the lag
|
|
// counter even for a "fast" consumer at this small scale.
|
|
tokio::task::yield_now().await;
|
|
}
|
|
});
|
|
|
|
let drain = |mut rx: frame_ingest::FrameReceiver, label: &'static str| {
|
|
tokio::spawn(async move {
|
|
let mut got = 0u64;
|
|
while got < total {
|
|
match timeout(Duration::from_secs(2), rx.recv()).await {
|
|
Ok(Ok(_)) => got += 1,
|
|
Ok(Err(e)) => panic!("{label} recv closed early: {e}"),
|
|
Err(_) => panic!("{label} stalled at {got}/{total}"),
|
|
}
|
|
}
|
|
got
|
|
})
|
|
};
|
|
|
|
let h_det = drain(det.take(), "detection_client");
|
|
let h_mov = drain(mov.take(), "movement_detector");
|
|
let h_tel = drain(tel.take(), "telemetry");
|
|
|
|
producer.await.expect("producer");
|
|
assert_eq!(h_det.await.expect("det join"), total);
|
|
assert_eq!(h_mov.await.expect("mov join"), total);
|
|
assert_eq!(h_tel.await.expect("tel join"), total);
|
|
|
|
// Assert — every consumer drained at-rate, so no drops on any
|
|
// counter and `publishes_total` matches the produced count.
|
|
assert_eq!(stats.publishes_total(), total);
|
|
assert_eq!(stats.drops_for(ConsumerId::DetectionClient), 0);
|
|
assert_eq!(stats.drops_for(ConsumerId::MovementDetector), 0);
|
|
assert_eq!(stats.drops_for(ConsumerId::Telemetry), 0);
|
|
}
|
|
|
|
/// AC-2 — a slow consumer (yields slowly) is the only one to incur
|
|
/// drops; the fast consumers continue to observe every frame. The
|
|
/// producer paces its sends at ~5 ms intervals so fast consumers
|
|
/// can drain in between; the slow consumer sleeps ~25 ms per frame,
|
|
/// so the broadcast channel laps it after a handful of frames.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn ac2_slow_consumer_drops_while_fast_consumers_unaffected() {
|
|
// Arrange — depth-2 channel + a producer that paces sends.
|
|
let channel_depth = 2usize;
|
|
let publisher = Arc::new(FramePublisher::new(channel_depth));
|
|
let stats = publisher.stats();
|
|
|
|
let mut det = publisher.subscribe(ConsumerId::DetectionClient); // fast
|
|
let mut mov = publisher.subscribe(ConsumerId::MovementDetector); // fast
|
|
let mut tel = publisher.subscribe(ConsumerId::Telemetry); // SLOW
|
|
|
|
let total: u64 = 30;
|
|
let payload = Arc::new(Bytes::from(vec![0xBBu8; 64]));
|
|
|
|
// Spawn consumers BEFORE the producer task so the broadcast
|
|
// already has live subscribers when the first publish lands.
|
|
let slow = tokio::spawn(async move {
|
|
let mut got = 0u64;
|
|
let deadline = Duration::from_secs(10);
|
|
let start = tokio::time::Instant::now();
|
|
// The slow consumer keeps polling until the broadcast
|
|
// channel closes (publisher drops) OR the safety deadline
|
|
// fires. A `Closed` here is the natural termination signal
|
|
// once the producer's `Arc<FramePublisher>` goes out of
|
|
// scope; we don't try to predict how many frames it gets
|
|
// because that depends on scheduling jitter.
|
|
while start.elapsed() < deadline {
|
|
match timeout(Duration::from_millis(500), tel.recv()).await {
|
|
Ok(Ok(_)) => {
|
|
got += 1;
|
|
sleep(Duration::from_millis(25)).await;
|
|
}
|
|
Ok(Err(_)) => break, // Closed: producer finished.
|
|
Err(_) => {
|
|
// Timeout — assume producer is done and exit.
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
got
|
|
});
|
|
|
|
let drain_fast = |mut rx: frame_ingest::FrameReceiver, label: &'static str| {
|
|
tokio::spawn(async move {
|
|
let mut got = 0u64;
|
|
while got < total {
|
|
match timeout(Duration::from_secs(3), rx.recv()).await {
|
|
Ok(Ok(_)) => got += 1,
|
|
Ok(Err(e)) => panic!("{label} recv closed early: {e}"),
|
|
Err(_) => panic!("{label} stalled at {got}/{total}"),
|
|
}
|
|
}
|
|
got
|
|
})
|
|
};
|
|
let h_det = drain_fast(det.take(), "detection_client");
|
|
let h_mov = drain_fast(mov.take(), "movement_detector");
|
|
|
|
// Give consumers a moment to enter `recv` before producing.
|
|
sleep(Duration::from_millis(10)).await;
|
|
|
|
// Act — pace sends ~5 ms apart so fast consumers have time to
|
|
// drain each frame before the next arrives. The slow consumer
|
|
// can only process ~1 frame per 25 ms, so it inevitably lags.
|
|
let publisher_for_task = Arc::clone(&publisher);
|
|
let payload_for_task = Arc::clone(&payload);
|
|
let producer = tokio::spawn(async move {
|
|
for seq in 0..total {
|
|
publisher_for_task.publish(make_frame(seq, Arc::clone(&payload_for_task)));
|
|
sleep(Duration::from_millis(5)).await;
|
|
}
|
|
});
|
|
|
|
producer.await.expect("producer");
|
|
assert_eq!(h_det.await.expect("det join"), total);
|
|
assert_eq!(h_mov.await.expect("mov join"), total);
|
|
|
|
// Drop the last `Arc<FramePublisher>` so the slow consumer's
|
|
// recv returns `Closed` and it can exit on its own.
|
|
drop(publisher);
|
|
let slow_got = slow.await.expect("slow join");
|
|
|
|
// Assert — the slow consumer dropped frames; the fast ones did
|
|
// not. The exact drop count varies with scheduler jitter so we
|
|
// assert "> 0" rather than a specific number.
|
|
assert_eq!(
|
|
stats.drops_for(ConsumerId::DetectionClient),
|
|
0,
|
|
"fast consumer must not have any drops"
|
|
);
|
|
assert_eq!(
|
|
stats.drops_for(ConsumerId::MovementDetector),
|
|
0,
|
|
"fast consumer must not have any drops"
|
|
);
|
|
let tel_drops = stats.drops_for(ConsumerId::Telemetry);
|
|
assert!(
|
|
tel_drops > 0,
|
|
"slow telemetry consumer must have at least one drop; got {tel_drops}"
|
|
);
|
|
// Every frame is accounted for from the slow consumer's
|
|
// perspective: delivered + dropped == published.
|
|
assert_eq!(
|
|
slow_got + tel_drops,
|
|
stats.publishes_total(),
|
|
"received + dropped must equal published for the slow consumer"
|
|
);
|
|
}
|
|
|
|
/// AC-3 — fan-out is zero-copy: each subscriber observes the SAME
|
|
/// `Arc<Bytes>` for a given frame. Asserts the property via
|
|
/// `Arc::ptr_eq` between the pixel handles delivered to two
|
|
/// different consumers; the test does not depend on timing.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn ac3_fan_out_is_zero_copy_via_arc_bytes() {
|
|
// Arrange
|
|
let publisher = Arc::new(FramePublisher::new(DEFAULT_CHANNEL_DEPTH));
|
|
let mut det = publisher.subscribe(ConsumerId::DetectionClient);
|
|
let mut mov = publisher.subscribe(ConsumerId::MovementDetector);
|
|
let mut tel = publisher.subscribe(ConsumerId::Telemetry);
|
|
let payload = Arc::new(Bytes::from(vec![0xCDu8; 1024]));
|
|
|
|
// Act
|
|
publisher.publish(make_frame(42, Arc::clone(&payload)));
|
|
let f_det = det.recv().await.expect("det recv");
|
|
let f_mov = mov.recv().await.expect("mov recv");
|
|
let f_tel = tel.recv().await.expect("tel recv");
|
|
|
|
// Assert — same Arc across consumers AND across publisher
|
|
// boundary; the broadcast did not deep-clone Bytes anywhere.
|
|
assert!(Arc::ptr_eq(&f_det.pixels, &payload));
|
|
assert!(Arc::ptr_eq(&f_mov.pixels, &payload));
|
|
assert!(Arc::ptr_eq(&f_tel.pixels, &payload));
|
|
assert!(Arc::ptr_eq(&f_det.pixels, &f_mov.pixels));
|
|
assert!(Arc::ptr_eq(&f_mov.pixels, &f_tel.pixels));
|
|
}
|
|
|
|
// `FrameReceiver` does not implement `Copy` and the public surface
|
|
// returns it by value, so we move it into the spawned task via
|
|
// `take()` on a small helper. Defined here to keep test bodies tidy.
|
|
trait Takeable {
|
|
fn take(&mut self) -> frame_ingest::FrameReceiver;
|
|
}
|
|
|
|
impl Takeable for frame_ingest::FrameReceiver {
|
|
fn take(&mut self) -> frame_ingest::FrameReceiver {
|
|
// SAFETY: we replace `self` with a fresh detached receiver
|
|
// that the test no longer uses; this lets us move ownership
|
|
// out of a `&mut`-bound binding without unsafe code.
|
|
std::mem::replace(self, dummy_receiver())
|
|
}
|
|
}
|
|
|
|
fn dummy_receiver() -> frame_ingest::FrameReceiver {
|
|
let p = FramePublisher::new(1);
|
|
p.subscribe(ConsumerId::DetectionClient)
|
|
}
|