//! AZ-660 + AZ-661 integration tests — fixture in-process gRPC server. //! //! AC-660-1 takes ~10 s; all others complete in ≤5 s. use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use tonic::transport::Server; use tonic::{Request, Response, Status}; use detection_client::internal::proto::{ detection_service_server::{DetectionService, DetectionServiceServer}, DetectionResponse, FrameRequest, }; use detection_client::{ConnectionState, DetectionClient, DetectionClientConfig, DetectionEvent}; use shared::models::frame::{Frame, PixelFormat}; // --------------------------------------------------------------------------- // Frame factory // --------------------------------------------------------------------------- fn make_frame(seq: u64, ai_locked: bool) -> Frame { Frame { seq, capture_ts_monotonic_ns: seq * 33_333_333, decode_ts_monotonic_ns: seq * 33_333_333 + 1_000_000, pixels: Arc::new(Bytes::from_static(b"\x80")), width: 1, height: 1, pix_fmt: PixelFormat::Nv12, ai_locked, } } // --------------------------------------------------------------------------- // Fixture: configurable echo server // // `close_after` is per-stream-session (reset on each `stream()` call) so the // server can be re-used across reconnects without freezing on the second // session. // --------------------------------------------------------------------------- #[derive(Clone)] struct FixtureServer { latency_ms: u64, schema_version: u32, model_version: String, close_after: Option, } impl FixtureServer { fn fast() -> Self { Self { latency_ms: 10, schema_version: 1, model_version: "v1.0".to_string(), close_after: None, } } fn slow(latency_ms: u64) -> Self { Self { latency_ms, ..Self::fast() } } fn with_schema_version(mut self, v: u32) -> Self { self.schema_version = v; self } fn with_close_after(mut self, n: u32) -> Self { self.close_after = Some(n); self } } #[async_trait] impl DetectionService for FixtureServer { type StreamStream = ReceiverStream>; async fn stream( &self, request: Request>, ) -> Result, Status> { let latency = Duration::from_millis(self.latency_ms); let schema_version = self.schema_version; let model_version = self.model_version.clone(); let close_after = self.close_after; let mut inbound = request.into_inner(); let (tx, rx) = mpsc::channel::>(32); tokio::spawn(async move { let mut session_count = 0u32; while let Ok(Some(req)) = inbound.message().await { tokio::time::sleep(latency).await; session_count += 1; let resp = DetectionResponse { schema_version, model_version: model_version.clone(), frame_seq: req.frame_seq, latency_ms: latency.as_millis() as u32, detections: vec![], }; if tx.send(Ok(resp)).await.is_err() { break; } if close_after.map(|n| session_count >= n).unwrap_or(false) { break; } } }); Ok(Response::new(ReceiverStream::new(rx))) } } // --------------------------------------------------------------------------- // Fixture: server that switches model_version mid-stream // --------------------------------------------------------------------------- #[derive(Clone)] struct VersionSwitchServer { first_model: String, second_model: String, /// Return `first_model` for the first `switch_after` responses, then /// `second_model` for all subsequent ones within the SAME session. switch_after: u32, } #[async_trait] impl DetectionService for VersionSwitchServer { type StreamStream = ReceiverStream>; async fn stream( &self, request: Request>, ) -> Result, Status> { let first = self.first_model.clone(); let second = self.second_model.clone(); let switch_after = self.switch_after; let mut inbound = request.into_inner(); let (tx, rx) = mpsc::channel::>(32); tokio::spawn(async move { let mut count = 0u32; while let Ok(Some(req)) = inbound.message().await { tokio::time::sleep(Duration::from_millis(10)).await; let model = if count < switch_after { first.clone() } else { second.clone() }; count += 1; let resp = DetectionResponse { schema_version: 1, model_version: model, frame_seq: req.frame_seq, latency_ms: 10, detections: vec![], }; if tx.send(Ok(resp)).await.is_err() { break; } } }); Ok(Response::new(ReceiverStream::new(rx))) } } // --------------------------------------------------------------------------- // Server harness // --------------------------------------------------------------------------- async fn start_server_with(svc: S) -> (String, oneshot::Sender<()>) where S: DetectionService + Clone + Send + Sync + 'static, { let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let stream = TcpListenerStream::new(listener); let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); tokio::spawn(async move { Server::builder() .add_service(DetectionServiceServer::new(svc)) .serve_with_incoming_shutdown(stream, async { let _ = shutdown_rx.await; }) .await .unwrap(); }); (format!("http://{addr}"), shutdown_tx) } async fn wait_connected(handle: &detection_client::DetectionClientHandle) { let mut conn = handle.connection_state_stream(); tokio::time::timeout(Duration::from_secs(5), async { loop { if *conn.borrow() == ConnectionState::Connected { break; } let _ = conn.changed().await; } }) .await .expect("client connected within 5 s"); } // --------------------------------------------------------------------------- // AZ-660 AC-1 — happy path, 30 fps for 10 s, ≥285 batches, p99 ≤100 ms // --------------------------------------------------------------------------- #[tokio::test(flavor = "multi_thread")] async fn ac660_1_happy_path_30fps_285_batches() { // Arrange let (endpoint, _shutdown) = start_server_with(FixtureServer::fast()).await; let (frame_tx, frame_rx) = broadcast::channel::(512); let config = DetectionClientConfig::new(endpoint); let (_join, handle) = DetectionClient::new(config).run(frame_rx); wait_connected(&handle).await; let mut events = handle.subscribe_events(); let collector = tokio::spawn(async move { let mut count = 0u64; loop { match tokio::time::timeout(Duration::from_secs(2), events.recv()).await { Ok(Ok(DetectionEvent::Batch { .. })) => count += 1, Ok(Ok(_)) => {} _ => break, } } count }); // Act — 30 fps for 10 s let mut ticker = tokio::time::interval(Duration::from_nanos(33_333_333)); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let deadline = tokio::time::Instant::now() + Duration::from_secs(10); let mut seq = 0u64; loop { ticker.tick().await; if tokio::time::Instant::now() >= deadline { break; } let _ = frame_tx.send(make_frame(seq, false)); seq += 1; } tokio::time::sleep(Duration::from_millis(500)).await; handle.shutdown(); let batch_count = tokio::time::timeout(Duration::from_secs(3), collector) .await .expect("collector timed out") .expect("collector panicked"); // Assert assert!( batch_count >= 285, "expected ≥285 batches, got {batch_count}" ); assert_eq!( handle.stats().budget_drops_total(), 0, "expected no budget drops" ); if let Some(p99) = handle.latency_p99() { assert!(p99 <= Duration::from_millis(100), "p99 {p99:?} > 100 ms"); } } // --------------------------------------------------------------------------- // AZ-660 AC-2 — reconnect after server closes stream // --------------------------------------------------------------------------- #[tokio::test(flavor = "multi_thread")] async fn ac660_2_reconnects_after_stream_close() { // The FixtureServer closes each stream-session after 3 responses; the // client must reconnect and continue receiving within 2 s. let (endpoint, _shutdown) = start_server_with(FixtureServer::fast().with_close_after(3)).await; let config = DetectionClientConfig { reconnect_initial: Duration::from_millis(100), reconnect_cap: Duration::from_millis(500), ..DetectionClientConfig::new(endpoint) }; let (frame_tx, frame_rx) = broadcast::channel::(64); let (_join, handle) = DetectionClient::new(config).run(frame_rx); wait_connected(&handle).await; let mut events = handle.subscribe_events(); // Send 3 frames → server closes stream after the 3rd response. for i in 0u64..3 { let _ = frame_tx.send(make_frame(i, false)); tokio::time::sleep(Duration::from_millis(25)).await; } // Give the stream-close time to propagate and the reconnect to happen. tokio::time::sleep(Duration::from_millis(300)).await; // Wait up to 2 s for the client to reconnect (AC-2 requirement). let mut conn = handle.connection_state_stream(); tokio::time::timeout(Duration::from_secs(2), async { loop { if *conn.borrow() == ConnectionState::Connected { break; } let _ = conn.changed().await; } }) .await .expect("reconnected within 2 s"); // Verify frames continue to flow after reconnect. for i in 3u64..6 { let _ = frame_tx.send(make_frame(i, false)); tokio::time::sleep(Duration::from_millis(25)).await; } let post_reconnect_batch = tokio::time::timeout(Duration::from_secs(2), async { loop { match events.recv().await { Ok(DetectionEvent::Batch { .. }) => return true, Ok(_) => {} Err(_) => return false, } } }) .await .unwrap_or(false); // Assert assert!(post_reconnect_batch, "frames flow after reconnect"); // Same model version on reconnect must NOT fire a second ModelVersionChanged. let model_changes = handle.stats().model_version_changes_total(); assert_eq!( model_changes, 1, "same model version across reconnect must not repeat the event" ); handle.shutdown(); } // --------------------------------------------------------------------------- // AZ-660 AC-3 — budget drops on slow server (200 ms latency, 30 fps source) // --------------------------------------------------------------------------- #[tokio::test(flavor = "multi_thread")] async fn ac660_3_budget_drops_on_slow_server() { // Arrange let (endpoint, _shutdown) = start_server_with(FixtureServer::slow(200)).await; let config = DetectionClientConfig { max_concurrent_in_flight: 2, ..DetectionClientConfig::new(endpoint) }; let (frame_tx, frame_rx) = broadcast::channel::(512); let (_join, handle) = DetectionClient::new(config).run(frame_rx); wait_connected(&handle).await; // Act — 30 fps for 5 s; server takes 200 ms → budget full after frame 2. let mut ticker = tokio::time::interval(Duration::from_nanos(33_333_333)); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let deadline = tokio::time::Instant::now() + Duration::from_secs(5); let mut seq = 0u64; loop { ticker.tick().await; if tokio::time::Instant::now() >= deadline { break; } let _ = frame_tx.send(make_frame(seq, false)); seq += 1; } tokio::time::sleep(Duration::from_millis(300)).await; handle.shutdown(); // Assert let drops = handle.stats().budget_drops_total(); assert!(drops > 0, "expected budget_drops > 0, got 0"); } // --------------------------------------------------------------------------- // AZ-660 AC-4 — ai_locked frames are skipped // --------------------------------------------------------------------------- #[tokio::test(flavor = "multi_thread")] async fn ac660_4_ai_locked_frames_skipped() { // Arrange let (endpoint, _shutdown) = start_server_with(FixtureServer::fast()).await; let (frame_tx, frame_rx) = broadcast::channel::(256); let (_join, handle) = DetectionClient::new(DetectionClientConfig::new(endpoint)).run(frame_rx); wait_connected(&handle).await; // Act — 20 frames; every 5th is ai_locked (frames 4, 9, 14, 19 → 4 locked). for i in 0u64..20 { let ai_locked = (i + 1) % 5 == 0; let _ = frame_tx.send(make_frame(i, ai_locked)); tokio::time::sleep(Duration::from_millis(15)).await; } tokio::time::sleep(Duration::from_millis(300)).await; handle.shutdown(); // Assert let skipped = handle.stats().ai_locked_skipped_total(); let sent = handle.stats().requests_sent_total(); assert_eq!(skipped, 4, "expected 4 ai_locked skips, got {skipped}"); assert!(sent <= 16, "expected ≤16 requests sent, got {sent}"); } // --------------------------------------------------------------------------- // AZ-661 AC-1 — schema mismatch surfaces as hard error + counter // --------------------------------------------------------------------------- #[tokio::test(flavor = "multi_thread")] async fn ac661_1_schema_mismatch_hard_error() { // Arrange — server returns schema_version 99 (incompatible with expected 1). let (endpoint, _shutdown) = start_server_with(FixtureServer::fast().with_schema_version(99)).await; let config = DetectionClientConfig { expected_schema_version: 1, ..DetectionClientConfig::new(endpoint) }; let (frame_tx, frame_rx) = broadcast::channel::(64); let (_join, handle) = DetectionClient::new(config).run(frame_rx); let mut events = handle.subscribe_events(); wait_connected(&handle).await; // Act let _ = frame_tx.send(make_frame(1, false)); // Assert — SchemaMismatch event emitted and counter increments. let got_mismatch = tokio::time::timeout(Duration::from_secs(2), async { loop { match events.recv().await { Ok(DetectionEvent::SchemaMismatch { .. }) => return true, Ok(_) => {} Err(_) => return false, } } }) .await .unwrap_or(false); assert!(got_mismatch, "expected SchemaMismatch event"); assert!( handle.stats().schema_mismatch_total() >= 1, "expected schema_mismatch_total ≥ 1" ); handle.shutdown(); } // --------------------------------------------------------------------------- // AZ-661 AC-2 — model_version change is signalled exactly once // --------------------------------------------------------------------------- #[tokio::test(flavor = "multi_thread")] async fn ac661_2_model_version_change_emits_event() { // Arrange — server returns "v1.2" for the first response, then "v1.3". let (endpoint, _shutdown) = start_server_with(VersionSwitchServer { first_model: "v1.2".to_string(), second_model: "v1.3".to_string(), switch_after: 1, }) .await; let (frame_tx, frame_rx) = broadcast::channel::(64); let (_join, handle) = DetectionClient::new(DetectionClientConfig::new(endpoint)).run(frame_rx); let mut events = handle.subscribe_events(); wait_connected(&handle).await; // Act — send 5 frames; responses 1 = "v1.2", responses 2-5 = "v1.3". for i in 0u64..5 { let _ = frame_tx.send(make_frame(i, false)); tokio::time::sleep(Duration::from_millis(20)).await; } // Drain all pending events within a 500 ms window. let mut v13_events = 0u32; let drain_deadline = tokio::time::Instant::now() + Duration::from_millis(500); loop { let remaining = drain_deadline.saturating_duration_since(tokio::time::Instant::now()); if remaining.is_zero() { break; } match tokio::time::timeout(remaining, events.recv()).await { Ok(Ok(DetectionEvent::ModelVersionChanged { current, .. })) => { if current == "v1.3" { v13_events += 1; } } Ok(Ok(_)) => {} _ => break, } } handle.shutdown(); // Assert — exactly one transition to "v1.3". assert_eq!( v13_events, 1, "expected exactly one ModelVersionChanged(v1.3), got {v13_events}" ); } // --------------------------------------------------------------------------- // AZ-661 AC-3 — Tier1Degraded emitted exactly once on latency spike // --------------------------------------------------------------------------- #[tokio::test(flavor = "multi_thread")] async fn ac661_3_tier1_degraded_emitted_once_on_latency_spike() { // Arrange — small latency window (8 samples) so the window fills quickly; // server latency 150 ms > threshold 100 ms. let (endpoint, _shutdown) = start_server_with(FixtureServer::slow(150)).await; let config = DetectionClientConfig { latency_window_capacity: 8, latency_p99_threshold: Duration::from_millis(100), ..DetectionClientConfig::new(endpoint) }; let (frame_tx, frame_rx) = broadcast::channel::(64); let (_join, handle) = DetectionClient::new(config).run(frame_rx); let mut events = handle.subscribe_events(); wait_connected(&handle).await; // Act — send 10 frames; server responds in 150 ms each. // The latency window (capacity 8) will be full of 150 ms samples after // 8 responses; p99 = 150 ms > 100 ms → exactly one Tier1Degraded event. for i in 0u64..10 { let _ = frame_tx.send(make_frame(i, false)); tokio::time::sleep(Duration::from_millis(160)).await; } handle.shutdown(); // Drain events. let mut degraded_count = 0u32; loop { match events.try_recv() { Ok(DetectionEvent::Tier1Degraded { .. }) => degraded_count += 1, Err(_) => break, Ok(_) => {} } } // Assert — the latch fires exactly once per degraded→healthy transition. assert_eq!( degraded_count, 1, "expected exactly one Tier1Degraded event, got {degraded_count}" ); }