mirror of
https://github.com/azaion/autopilot.git
synced 2026-06-21 12:11:10 +00:00
0854d3be1c
AZ-659: FramePublisher with per-consumer drop accounting (Arc<Bytes> zero-copy fan-out). Adds ConsumerId enum, PublisherStats, FrameReceiver wrapper, and publisher integration tests (AC-1, AC-2, AC-3). AZ-660: Bi-directional tonic gRPC stream to ../detections. Reconnect with bounded exponential backoff (1 s → 30 s cap). Drop-oldest in-flight budgeting (max_concurrent_in_flight = 2). ai_locked frame skipping. Integration tests against fixture in-process server (AC-1: happy path 30 fps/10 s, AC-2: reconnect, AC-3: budget drops, AC-4: ai_locked skipping). AZ-661: Schema validation (hard SchemaMismatch error on version mismatch), model_version latch with ModelVersionChanged events, sliding-window p99 latency tracker with Tier1Degraded/Tier1Recovered transitions. Integration tests (AC-1, AC-2, AC-3). Also: update module-layout.md for frame_ingest and detection_client to reflect the streaming API shape; code review report batch_18. Co-authored-by: Cursor <cursoragent@cursor.com>
552 lines
19 KiB
Rust
552 lines
19 KiB
Rust
//! AZ-660 + AZ-661 integration tests — fixture in-process gRPC server.
|
|
//!
|
|
//! AC-660-1 runs for ~10 s and AC-660-3 for ~5 s (plus settle delays); all
//! other tests finish in about 2 s or less.
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use async_trait::async_trait;
|
|
use bytes::Bytes;
|
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
|
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
|
|
use tonic::transport::Server;
|
|
use tonic::{Request, Response, Status};
|
|
|
|
use detection_client::internal::proto::{
|
|
detection_service_server::{DetectionService, DetectionServiceServer},
|
|
DetectionResponse, FrameRequest,
|
|
};
|
|
use detection_client::{ConnectionState, DetectionClient, DetectionClientConfig, DetectionEvent};
|
|
use shared::models::frame::{Frame, PixelFormat};
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Frame factory
|
|
// ---------------------------------------------------------------------------
|
|
|
|
fn make_frame(seq: u64, ai_locked: bool) -> Frame {
|
|
Frame {
|
|
seq,
|
|
capture_ts_monotonic_ns: seq * 33_333_333,
|
|
decode_ts_monotonic_ns: seq * 33_333_333 + 1_000_000,
|
|
pixels: Arc::new(Bytes::from_static(b"\x80")),
|
|
width: 1,
|
|
height: 1,
|
|
pix_fmt: PixelFormat::Nv12,
|
|
ai_locked,
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Fixture: configurable echo server
|
|
//
|
|
// `close_after` is per-stream-session (reset on each `stream()` call) so the
|
|
// server can be re-used across reconnects without freezing on the second
|
|
// session.
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Configurable echo server used by most tests.
///
/// The settings are copied into each stream session when it starts, so one
/// instance can serve any number of (re)connected sessions.
#[derive(Clone)]
struct FixtureServer {
    /// Artificial per-request processing delay, in milliseconds.
    latency_ms: u64,
    /// `schema_version` stamped on every response.
    schema_version: u32,
    /// `model_version` stamped on every response.
    model_version: String,
    /// When set, each stream session ends after this many responses.
    close_after: Option<u32>,
}

impl FixtureServer {
    /// Responsive server: 10 ms latency, schema 1, model "v1.0", never closes.
    fn fast() -> Self {
        Self {
            latency_ms: 10,
            schema_version: 1,
            model_version: String::from("v1.0"),
            close_after: None,
        }
    }

    /// Same as [`FixtureServer::fast`] but with a custom per-request latency.
    fn slow(latency_ms: u64) -> Self {
        let mut server = Self::fast();
        server.latency_ms = latency_ms;
        server
    }

    /// Overrides the schema version reported in responses.
    fn with_schema_version(self, v: u32) -> Self {
        Self {
            schema_version: v,
            ..self
        }
    }

    /// Ends every stream session after `n` responses.
    fn with_close_after(self, n: u32) -> Self {
        Self {
            close_after: Some(n),
            ..self
        }
    }
}
|
|
|
|
#[async_trait]
|
|
impl DetectionService for FixtureServer {
|
|
type StreamStream = ReceiverStream<Result<DetectionResponse, Status>>;
|
|
|
|
async fn stream(
|
|
&self,
|
|
request: Request<tonic::Streaming<FrameRequest>>,
|
|
) -> Result<Response<Self::StreamStream>, Status> {
|
|
let latency = Duration::from_millis(self.latency_ms);
|
|
let schema_version = self.schema_version;
|
|
let model_version = self.model_version.clone();
|
|
let close_after = self.close_after;
|
|
let mut inbound = request.into_inner();
|
|
let (tx, rx) = mpsc::channel::<Result<DetectionResponse, Status>>(32);
|
|
|
|
tokio::spawn(async move {
|
|
let mut session_count = 0u32;
|
|
while let Ok(Some(req)) = inbound.message().await {
|
|
tokio::time::sleep(latency).await;
|
|
session_count += 1;
|
|
let resp = DetectionResponse {
|
|
schema_version,
|
|
model_version: model_version.clone(),
|
|
frame_seq: req.frame_seq,
|
|
latency_ms: latency.as_millis() as u32,
|
|
detections: vec![],
|
|
};
|
|
if tx.send(Ok(resp)).await.is_err() {
|
|
break;
|
|
}
|
|
if close_after.map(|n| session_count >= n).unwrap_or(false) {
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(Response::new(ReceiverStream::new(rx)))
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Fixture: server that switches model_version mid-stream
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Echo server whose reported `model_version` flips part-way through a session.
#[derive(Clone)]
struct VersionSwitchServer {
    /// Model version carried by the first `switch_after` responses of a session.
    first_model: String,
    /// Model version carried by every later response of the SAME session.
    second_model: String,
    /// How many responses per session report `first_model` before switching.
    switch_after: u32,
}
|
|
|
|
#[async_trait]
|
|
impl DetectionService for VersionSwitchServer {
|
|
type StreamStream = ReceiverStream<Result<DetectionResponse, Status>>;
|
|
|
|
async fn stream(
|
|
&self,
|
|
request: Request<tonic::Streaming<FrameRequest>>,
|
|
) -> Result<Response<Self::StreamStream>, Status> {
|
|
let first = self.first_model.clone();
|
|
let second = self.second_model.clone();
|
|
let switch_after = self.switch_after;
|
|
let mut inbound = request.into_inner();
|
|
let (tx, rx) = mpsc::channel::<Result<DetectionResponse, Status>>(32);
|
|
|
|
tokio::spawn(async move {
|
|
let mut count = 0u32;
|
|
while let Ok(Some(req)) = inbound.message().await {
|
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
|
let model = if count < switch_after {
|
|
first.clone()
|
|
} else {
|
|
second.clone()
|
|
};
|
|
count += 1;
|
|
let resp = DetectionResponse {
|
|
schema_version: 1,
|
|
model_version: model,
|
|
frame_seq: req.frame_seq,
|
|
latency_ms: 10,
|
|
detections: vec![],
|
|
};
|
|
if tx.send(Ok(resp)).await.is_err() {
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(Response::new(ReceiverStream::new(rx)))
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Server harness
|
|
// ---------------------------------------------------------------------------
|
|
|
|
async fn start_server_with<S>(svc: S) -> (String, oneshot::Sender<()>)
|
|
where
|
|
S: DetectionService + Clone + Send + Sync + 'static,
|
|
{
|
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let addr = listener.local_addr().unwrap();
|
|
let stream = TcpListenerStream::new(listener);
|
|
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
|
|
|
tokio::spawn(async move {
|
|
Server::builder()
|
|
.add_service(DetectionServiceServer::new(svc))
|
|
.serve_with_incoming_shutdown(stream, async {
|
|
let _ = shutdown_rx.await;
|
|
})
|
|
.await
|
|
.unwrap();
|
|
});
|
|
|
|
(format!("http://{addr}"), shutdown_tx)
|
|
}
|
|
|
|
async fn wait_connected(handle: &detection_client::DetectionClientHandle) {
|
|
let mut conn = handle.connection_state_stream();
|
|
tokio::time::timeout(Duration::from_secs(5), async {
|
|
loop {
|
|
if *conn.borrow() == ConnectionState::Connected {
|
|
break;
|
|
}
|
|
let _ = conn.changed().await;
|
|
}
|
|
})
|
|
.await
|
|
.expect("client connected within 5 s");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// AZ-660 AC-1 — happy path, 30 fps for 10 s, ≥285 batches, p99 ≤100 ms
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn ac660_1_happy_path_30fps_285_batches() {
|
|
// Arrange
|
|
let (endpoint, _shutdown) = start_server_with(FixtureServer::fast()).await;
|
|
let (frame_tx, frame_rx) = broadcast::channel::<Frame>(512);
|
|
let config = DetectionClientConfig::new(endpoint);
|
|
let (_join, handle) = DetectionClient::new(config).run(frame_rx);
|
|
wait_connected(&handle).await;
|
|
|
|
let mut events = handle.subscribe_events();
|
|
let collector = tokio::spawn(async move {
|
|
let mut count = 0u64;
|
|
loop {
|
|
match tokio::time::timeout(Duration::from_secs(2), events.recv()).await {
|
|
Ok(Ok(DetectionEvent::Batch { .. })) => count += 1,
|
|
Ok(Ok(_)) => {}
|
|
_ => break,
|
|
}
|
|
}
|
|
count
|
|
});
|
|
|
|
// Act — 30 fps for 10 s
|
|
let mut ticker = tokio::time::interval(Duration::from_nanos(33_333_333));
|
|
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
|
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
|
|
let mut seq = 0u64;
|
|
loop {
|
|
ticker.tick().await;
|
|
if tokio::time::Instant::now() >= deadline {
|
|
break;
|
|
}
|
|
let _ = frame_tx.send(make_frame(seq, false));
|
|
seq += 1;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
handle.shutdown();
|
|
|
|
let batch_count = tokio::time::timeout(Duration::from_secs(3), collector)
|
|
.await
|
|
.expect("collector timed out")
|
|
.expect("collector panicked");
|
|
|
|
// Assert
|
|
assert!(
|
|
batch_count >= 285,
|
|
"expected ≥285 batches, got {batch_count}"
|
|
);
|
|
assert_eq!(
|
|
handle.stats().budget_drops_total(),
|
|
0,
|
|
"expected no budget drops"
|
|
);
|
|
if let Some(p99) = handle.latency_p99() {
|
|
assert!(p99 <= Duration::from_millis(100), "p99 {p99:?} > 100 ms");
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// AZ-660 AC-2 — reconnect after server closes stream
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn ac660_2_reconnects_after_stream_close() {
|
|
// The FixtureServer closes each stream-session after 3 responses; the
|
|
// client must reconnect and continue receiving within 2 s.
|
|
let (endpoint, _shutdown) = start_server_with(FixtureServer::fast().with_close_after(3)).await;
|
|
|
|
let config = DetectionClientConfig {
|
|
reconnect_initial: Duration::from_millis(100),
|
|
reconnect_cap: Duration::from_millis(500),
|
|
..DetectionClientConfig::new(endpoint)
|
|
};
|
|
let (frame_tx, frame_rx) = broadcast::channel::<Frame>(64);
|
|
let (_join, handle) = DetectionClient::new(config).run(frame_rx);
|
|
wait_connected(&handle).await;
|
|
|
|
let mut events = handle.subscribe_events();
|
|
|
|
// Send 3 frames → server closes stream after the 3rd response.
|
|
for i in 0u64..3 {
|
|
let _ = frame_tx.send(make_frame(i, false));
|
|
tokio::time::sleep(Duration::from_millis(25)).await;
|
|
}
|
|
// Give the stream-close time to propagate and the reconnect to happen.
|
|
tokio::time::sleep(Duration::from_millis(300)).await;
|
|
|
|
// Wait up to 2 s for the client to reconnect (AC-2 requirement).
|
|
let mut conn = handle.connection_state_stream();
|
|
tokio::time::timeout(Duration::from_secs(2), async {
|
|
loop {
|
|
if *conn.borrow() == ConnectionState::Connected {
|
|
break;
|
|
}
|
|
let _ = conn.changed().await;
|
|
}
|
|
})
|
|
.await
|
|
.expect("reconnected within 2 s");
|
|
|
|
// Verify frames continue to flow after reconnect.
|
|
for i in 3u64..6 {
|
|
let _ = frame_tx.send(make_frame(i, false));
|
|
tokio::time::sleep(Duration::from_millis(25)).await;
|
|
}
|
|
let post_reconnect_batch = tokio::time::timeout(Duration::from_secs(2), async {
|
|
loop {
|
|
match events.recv().await {
|
|
Ok(DetectionEvent::Batch { .. }) => return true,
|
|
Ok(_) => {}
|
|
Err(_) => return false,
|
|
}
|
|
}
|
|
})
|
|
.await
|
|
.unwrap_or(false);
|
|
|
|
// Assert
|
|
assert!(post_reconnect_batch, "frames flow after reconnect");
|
|
// Same model version on reconnect must NOT fire a second ModelVersionChanged.
|
|
let model_changes = handle.stats().model_version_changes_total();
|
|
assert_eq!(
|
|
model_changes, 1,
|
|
"same model version across reconnect must not repeat the event"
|
|
);
|
|
|
|
handle.shutdown();
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// AZ-660 AC-3 — budget drops on slow server (200 ms latency, 30 fps source)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn ac660_3_budget_drops_on_slow_server() {
|
|
// Arrange
|
|
let (endpoint, _shutdown) = start_server_with(FixtureServer::slow(200)).await;
|
|
let config = DetectionClientConfig {
|
|
max_concurrent_in_flight: 2,
|
|
..DetectionClientConfig::new(endpoint)
|
|
};
|
|
let (frame_tx, frame_rx) = broadcast::channel::<Frame>(512);
|
|
let (_join, handle) = DetectionClient::new(config).run(frame_rx);
|
|
wait_connected(&handle).await;
|
|
|
|
// Act — 30 fps for 5 s; server takes 200 ms → budget full after frame 2.
|
|
let mut ticker = tokio::time::interval(Duration::from_nanos(33_333_333));
|
|
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
|
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
|
|
let mut seq = 0u64;
|
|
loop {
|
|
ticker.tick().await;
|
|
if tokio::time::Instant::now() >= deadline {
|
|
break;
|
|
}
|
|
let _ = frame_tx.send(make_frame(seq, false));
|
|
seq += 1;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(300)).await;
|
|
handle.shutdown();
|
|
|
|
// Assert
|
|
let drops = handle.stats().budget_drops_total();
|
|
assert!(drops > 0, "expected budget_drops > 0, got 0");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// AZ-660 AC-4 — ai_locked frames are skipped
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn ac660_4_ai_locked_frames_skipped() {
|
|
// Arrange
|
|
let (endpoint, _shutdown) = start_server_with(FixtureServer::fast()).await;
|
|
let (frame_tx, frame_rx) = broadcast::channel::<Frame>(256);
|
|
let (_join, handle) = DetectionClient::new(DetectionClientConfig::new(endpoint)).run(frame_rx);
|
|
wait_connected(&handle).await;
|
|
|
|
// Act — 20 frames; every 5th is ai_locked (frames 4, 9, 14, 19 → 4 locked).
|
|
for i in 0u64..20 {
|
|
let ai_locked = (i + 1) % 5 == 0;
|
|
let _ = frame_tx.send(make_frame(i, ai_locked));
|
|
tokio::time::sleep(Duration::from_millis(15)).await;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(300)).await;
|
|
handle.shutdown();
|
|
|
|
// Assert
|
|
let skipped = handle.stats().ai_locked_skipped_total();
|
|
let sent = handle.stats().requests_sent_total();
|
|
assert_eq!(skipped, 4, "expected 4 ai_locked skips, got {skipped}");
|
|
assert!(sent <= 16, "expected ≤16 requests sent, got {sent}");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// AZ-661 AC-1 — schema mismatch surfaces as hard error + counter
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn ac661_1_schema_mismatch_hard_error() {
|
|
// Arrange — server returns schema_version 99 (incompatible with expected 1).
|
|
let (endpoint, _shutdown) =
|
|
start_server_with(FixtureServer::fast().with_schema_version(99)).await;
|
|
let config = DetectionClientConfig {
|
|
expected_schema_version: 1,
|
|
..DetectionClientConfig::new(endpoint)
|
|
};
|
|
let (frame_tx, frame_rx) = broadcast::channel::<Frame>(64);
|
|
let (_join, handle) = DetectionClient::new(config).run(frame_rx);
|
|
let mut events = handle.subscribe_events();
|
|
wait_connected(&handle).await;
|
|
|
|
// Act
|
|
let _ = frame_tx.send(make_frame(1, false));
|
|
|
|
// Assert — SchemaMismatch event emitted and counter increments.
|
|
let got_mismatch = tokio::time::timeout(Duration::from_secs(2), async {
|
|
loop {
|
|
match events.recv().await {
|
|
Ok(DetectionEvent::SchemaMismatch { .. }) => return true,
|
|
Ok(_) => {}
|
|
Err(_) => return false,
|
|
}
|
|
}
|
|
})
|
|
.await
|
|
.unwrap_or(false);
|
|
|
|
assert!(got_mismatch, "expected SchemaMismatch event");
|
|
assert!(
|
|
handle.stats().schema_mismatch_total() >= 1,
|
|
"expected schema_mismatch_total ≥ 1"
|
|
);
|
|
|
|
handle.shutdown();
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// AZ-661 AC-2 — model_version change is signalled exactly once
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn ac661_2_model_version_change_emits_event() {
|
|
// Arrange — server returns "v1.2" for the first response, then "v1.3".
|
|
let (endpoint, _shutdown) = start_server_with(VersionSwitchServer {
|
|
first_model: "v1.2".to_string(),
|
|
second_model: "v1.3".to_string(),
|
|
switch_after: 1,
|
|
})
|
|
.await;
|
|
|
|
let (frame_tx, frame_rx) = broadcast::channel::<Frame>(64);
|
|
let (_join, handle) = DetectionClient::new(DetectionClientConfig::new(endpoint)).run(frame_rx);
|
|
let mut events = handle.subscribe_events();
|
|
wait_connected(&handle).await;
|
|
|
|
// Act — send 5 frames; responses 1 = "v1.2", responses 2-5 = "v1.3".
|
|
for i in 0u64..5 {
|
|
let _ = frame_tx.send(make_frame(i, false));
|
|
tokio::time::sleep(Duration::from_millis(20)).await;
|
|
}
|
|
|
|
// Drain all pending events within a 500 ms window.
|
|
let mut v13_events = 0u32;
|
|
let drain_deadline = tokio::time::Instant::now() + Duration::from_millis(500);
|
|
loop {
|
|
let remaining = drain_deadline.saturating_duration_since(tokio::time::Instant::now());
|
|
if remaining.is_zero() {
|
|
break;
|
|
}
|
|
match tokio::time::timeout(remaining, events.recv()).await {
|
|
Ok(Ok(DetectionEvent::ModelVersionChanged { current, .. })) => {
|
|
if current == "v1.3" {
|
|
v13_events += 1;
|
|
}
|
|
}
|
|
Ok(Ok(_)) => {}
|
|
_ => break,
|
|
}
|
|
}
|
|
|
|
handle.shutdown();
|
|
|
|
// Assert — exactly one transition to "v1.3".
|
|
assert_eq!(
|
|
v13_events, 1,
|
|
"expected exactly one ModelVersionChanged(v1.3), got {v13_events}"
|
|
);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// AZ-661 AC-3 — Tier1Degraded emitted exactly once on latency spike
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn ac661_3_tier1_degraded_emitted_once_on_latency_spike() {
|
|
// Arrange — small latency window (8 samples) so the window fills quickly;
|
|
// server latency 150 ms > threshold 100 ms.
|
|
let (endpoint, _shutdown) = start_server_with(FixtureServer::slow(150)).await;
|
|
let config = DetectionClientConfig {
|
|
latency_window_capacity: 8,
|
|
latency_p99_threshold: Duration::from_millis(100),
|
|
..DetectionClientConfig::new(endpoint)
|
|
};
|
|
let (frame_tx, frame_rx) = broadcast::channel::<Frame>(64);
|
|
let (_join, handle) = DetectionClient::new(config).run(frame_rx);
|
|
let mut events = handle.subscribe_events();
|
|
wait_connected(&handle).await;
|
|
|
|
// Act — send 10 frames; server responds in 150 ms each.
|
|
// The latency window (capacity 8) will be full of 150 ms samples after
|
|
// 8 responses; p99 = 150 ms > 100 ms → exactly one Tier1Degraded event.
|
|
for i in 0u64..10 {
|
|
let _ = frame_tx.send(make_frame(i, false));
|
|
tokio::time::sleep(Duration::from_millis(160)).await;
|
|
}
|
|
handle.shutdown();
|
|
|
|
// Drain events.
|
|
let mut degraded_count = 0u32;
|
|
loop {
|
|
match events.try_recv() {
|
|
Ok(DetectionEvent::Tier1Degraded { .. }) => degraded_count += 1,
|
|
Err(_) => break,
|
|
Ok(_) => {}
|
|
}
|
|
}
|
|
|
|
// Assert — the latch fires exactly once per degraded→healthy transition.
|
|
assert_eq!(
|
|
degraded_count, 1,
|
|
"expected exactly one Tier1Degraded event, got {degraded_count}"
|
|
);
|
|
}
|