//! AZ-675 integration tests — Tonic server + per-client lossy queue //! exercised through an in-process gRPC client (the only stub allowed //! per the task's Runtime Completeness gate). use std::net::TcpListener; use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::time::timeout; use tokio_stream::StreamExt; use tonic::transport::{Channel, Endpoint}; use tonic::Request; use telemetry_stream::{SubscribeRequest, TelemetryStream, TelemetryStreamClient, TelemetryTopic}; #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] struct Sample { n: u32, } /// Bind a TCP listener on `127.0.0.1:0`, return the listener and the /// resolved port. The test stack uses ephemeral ports so multiple /// integration tests in the same suite never collide. fn bind_ephemeral() -> (TcpListener, u16) { let l = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral"); let port = l.local_addr().unwrap().port(); (l, port) } async fn connect(port: u16) -> TelemetryStreamClient { let url = format!("http://127.0.0.1:{port}"); let endpoint = Endpoint::from_shared(url) .unwrap() .connect_timeout(Duration::from_secs(2)); // Tiny retry loop — the server can take a few ms to bind. for _ in 0..50 { if let Ok(c) = TelemetryStreamClient::connect(endpoint.clone()).await { return c; } tokio::time::sleep(Duration::from_millis(20)).await; } panic!("gRPC client failed to connect within ~1s"); } /// AC-1 — multiple subscribers receive the same stream, ordering preserved. #[tokio::test] async fn ac1_multiple_subscribers_receive_same_stream() { // Arrange let (listener, port) = bind_ephemeral(); let server = TelemetryStream::new(8); let handle = server.handle(); let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); let mut c1 = connect(port).await; let mut c2 = connect(port).await; let mut c3 = connect(port).await; let req = |id: &str| { Request::new(SubscribeRequest { client_id: id.to_string(), topics: vec![TelemetryTopic::TelemetrySample as i32], }) }; let mut s1 = c1.subscribe(req("a")).await.unwrap().into_inner(); let mut s2 = c2.subscribe(req("b")).await.unwrap().into_inner(); let mut s3 = c3.subscribe(req("c")).await.unwrap().into_inner(); // Give the server time to register all three before publishing. for _ in 0..50 { if handle.snapshot().subscribed_clients == 3 { break; } tokio::time::sleep(Duration::from_millis(10)).await; } assert_eq!(handle.snapshot().subscribed_clients, 3); // Act for n in 0..100u32 { handle .publish(TelemetryTopic::TelemetrySample, &Sample { n }) .unwrap(); } async fn collect100( s: &mut tonic::Streaming, ) -> Vec { let mut out = Vec::new(); while out.len() < 100 { let msg = timeout(Duration::from_secs(2), s.next()) .await .expect("ac1 client did not receive 100 in 2s") .expect("stream ended") .expect("server status"); let sample: Sample = serde_json::from_slice(&msg.payload_json).unwrap(); out.push(sample); } out } let r1 = collect100(&mut s1).await; let r2 = collect100(&mut s2).await; let r3 = collect100(&mut s3).await; // Assert — all three receive all 100 in order. let expected: Vec = (0..100).map(|n| Sample { n }).collect(); assert_eq!(r1, expected); assert_eq!(r2, expected); assert_eq!(r3, expected); } /// AC-2 — slow subscriber drops oldest, healthy unaffected. /// /// We can't simulate "client never polls" through real tonic+H2 /// because the OS TCP + HTTP/2 receive buffers (often 256 KB+) will /// absorb a small JSON burst entirely with no observable drop. The /// realistic AC-2 condition is "slow drain rate" — the slow client /// polls but at a fraction of the publish rate. With large payloads /// and a small per-topic broadcast capacity, the slow client's /// broadcast cursor falls behind the producer and `BroadcastStream` /// emits `Lagged(n)` — the only signal AZ-675 wires into the per- /// `(client_id, topic)` drop counters. #[tokio::test] async fn ac2_slow_subscriber_drops_oldest_healthy_unaffected() { // Arrange — capacity 4, big payloads so wire back-pressure // propagates fast and broadcast lag is observable end-to-end. let (listener, port) = bind_ephemeral(); let cfg = telemetry_stream::TelemetryStreamConfig { topic_capacity: 4, ..telemetry_stream::TelemetryStreamConfig::default() }; let server = TelemetryStream::with_config(cfg); let handle = server.handle(); let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); #[derive(Serialize)] struct Big { n: u32, // 64 KB filler per message — fills H2 receive window quickly. pad: Vec, } let mut c_fast = connect(port).await; let mut c_slow = connect(port).await; let req = |id: &str| { Request::new(SubscribeRequest { client_id: id.to_string(), topics: vec![TelemetryTopic::TelemetrySample as i32], }) }; let mut s_fast = c_fast.subscribe(req("fast")).await.unwrap().into_inner(); let mut s_slow = c_slow.subscribe(req("slow")).await.unwrap().into_inner(); for _ in 0..50 { if handle.snapshot().subscribed_clients == 2 { break; } tokio::time::sleep(Duration::from_millis(10)).await; } assert_eq!(handle.snapshot().subscribed_clients, 2); let total: u32 = 300; // Fast drains immediately, slow drains 1 msg every 50ms (way below publish rate). let fast_task = tokio::spawn(async move { let mut seen = 0u32; while seen < total { let _msg = timeout(Duration::from_secs(15), s_fast.next()) .await .expect("fast did not receive everything in 15s") .expect("stream ended") .expect("server status"); seen += 1; } seen }); let slow_task = tokio::spawn(async move { let mut received = 0u32; let deadline = std::time::Instant::now() + Duration::from_secs(30); while std::time::Instant::now() < deadline { match timeout(Duration::from_millis(500), s_slow.next()).await { Ok(Some(Ok(_msg))) => received += 1, Ok(Some(Err(_))) => break, Ok(None) => break, Err(_) => break, } tokio::time::sleep(Duration::from_millis(50)).await; } received }); // Act — burst publishes at ~1ms cadence with 64 KB payloads so // broadcast capacity is overrun for the slow consumer. let payload_pad = vec![0u8; 64 * 1024]; for n in 0..total { handle .publish( TelemetryTopic::TelemetrySample, &Big { n, pad: payload_pad.clone(), }, ) .unwrap(); tokio::time::sleep(Duration::from_millis(2)).await; } let fast_count = fast_task.await.unwrap(); let slow_count = slow_task.await.unwrap(); let snap = handle.snapshot(); let slow_drops = snap .drops_total .get(&("slow".to_string(), TelemetryTopic::TelemetrySample)) .copied() .unwrap_or(0); let fast_drops = snap .drops_total .get(&("fast".to_string(), TelemetryTopic::TelemetrySample)) .copied() .unwrap_or(0); // Assert assert_eq!(fast_count, total, "fast client receives every message"); assert_eq!(fast_drops, 0, "fast client MUST NOT have drops"); assert!( slow_drops > 0, "slow client MUST have lagged at some point (got 0 drops; slow_count={slow_count})" ); assert!( slow_count < total, "slow client should have lost messages (got {slow_count}/{total})" ); assert!( (slow_count as u64) + slow_drops <= (total as u64) + 4, "accounted={} should not exceed total+capacity", (slow_count as u64) + slow_drops ); } /// AC-3 — disconnect cleanly removes subscriber. #[tokio::test] async fn ac3_disconnect_decrements_subscribed_clients() { // Arrange let (listener, port) = bind_ephemeral(); let server = TelemetryStream::new(8); let handle = server.handle(); let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); let mut c1 = connect(port).await; let req = || { Request::new(SubscribeRequest { client_id: "ephemeral".to_string(), topics: vec![], }) }; let s1 = c1.subscribe(req()).await.unwrap().into_inner(); for _ in 0..50 { if handle.snapshot().subscribed_clients == 1 { break; } tokio::time::sleep(Duration::from_millis(10)).await; } assert_eq!(handle.snapshot().subscribed_clients, 1); // Act — drop the stream (simulates client cancel). drop(s1); drop(c1); // Assert — register/deregister is eventually consistent; wait briefly. let mut final_count = 99; for _ in 0..100 { final_count = handle.snapshot().subscribed_clients; if final_count == 0 { break; } tokio::time::sleep(Duration::from_millis(20)).await; } assert_eq!( final_count, 0, "disconnect must decrement subscribed_clients" ); } /// Subscribing to an empty `topics` list defaults to ALL topics. #[tokio::test] async fn empty_topics_list_defaults_to_all() { // Arrange let (listener, port) = bind_ephemeral(); let server = TelemetryStream::new(8); let handle = server.handle(); let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); let mut client = connect(port).await; let mut stream = client .subscribe(Request::new(SubscribeRequest { client_id: "all".to_string(), topics: vec![], })) .await .unwrap() .into_inner(); for _ in 0..50 { if handle.snapshot().subscribed_clients == 1 { break; } tokio::time::sleep(Duration::from_millis(10)).await; } // Act — publish one message on each topic. handle .publish(TelemetryTopic::TelemetrySample, &Sample { n: 1 }) .unwrap(); handle .publish(TelemetryTopic::GimbalState, &Sample { n: 2 }) .unwrap(); handle .publish(TelemetryTopic::DetectionEvent, &Sample { n: 3 }) .unwrap(); handle .publish(TelemetryTopic::MovementCandidate, &Sample { n: 4 }) .unwrap(); handle .publish(TelemetryTopic::MapObjectsBundle, &Sample { n: 5 }) .unwrap(); let mut seen_topics = std::collections::HashSet::new(); while seen_topics.len() < 5 { let msg = timeout(Duration::from_secs(2), stream.next()) .await .expect("did not receive all 5 topics in 2s") .unwrap() .unwrap(); seen_topics.insert(msg.topic); } // Assert assert_eq!(seen_topics.len(), 5); } /// Empty client_id is rejected at the server boundary. #[tokio::test] async fn empty_client_id_is_rejected() { // Arrange let (listener, port) = bind_ephemeral(); let server = TelemetryStream::new(8); let _h = server.handle(); let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); let mut client = connect(port).await; // Act let err = client .subscribe(Request::new(SubscribeRequest { client_id: String::new(), topics: vec![], })) .await .expect_err("empty client_id must return Status error"); // Assert assert_eq!(err.code(), tonic::Code::InvalidArgument); }