//! AZ-677 integration tests — snapshot on subscribe, diff stream while //! connected, fresh snapshot on reconnect (no diff replay). use std::net::TcpListener; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use chrono::Utc; use tokio::time::timeout; use tokio_stream::StreamExt; use tonic::transport::{Channel, Endpoint}; use tonic::Request; use uuid::Uuid; use shared::models::mapobject::{ BundleFreshness, DiffKind, IgnoredItem, IgnoredItemSource, MapObject, MapObjectObservation, MapObjectSource, MapObjectsBundle, RetentionScope, }; use shared::models::mission::Coordinate; use telemetry_stream::internal::mapobjects::MapObjectsDiff; use telemetry_stream::{ MapObjectsSnapshotSource, MapObjectsTopicMessage, SubscribeRequest, TelemetryStream, TelemetryStreamClient, TelemetryTopic, }; fn bind_ephemeral() -> (TcpListener, u16) { let l = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral"); let port = l.local_addr().unwrap().port(); (l, port) } async fn connect(port: u16) -> TelemetryStreamClient { let url = format!("http://127.0.0.1:{port}"); let endpoint = Endpoint::from_shared(url) .unwrap() .connect_timeout(Duration::from_secs(2)); for _ in 0..50 { if let Ok(c) = TelemetryStreamClient::connect(endpoint.clone()).await { return c; } tokio::time::sleep(Duration::from_millis(20)).await; } panic!("gRPC client failed to connect"); } fn coord(lat: f64, lon: f64) -> Coordinate { Coordinate { latitude: lat, longitude: lon, altitude_m: 0.0, } } fn make_mapobject(class: &str) -> MapObject { let now = Utc::now(); MapObject { h3_cell: 0, mgrs_key: "33UWP00".to_string(), class: class.to_string(), class_group: "vehicle".to_string(), gps_lat: 0.0, gps_lon: 0.0, size_width_m: 2.0, size_length_m: 4.0, confidence: 0.8, first_seen: now, last_seen: now, mission_id: "m1".to_string(), source: MapObjectSource::LocalObserved, pending_upload: true, } } fn make_ignored() -> IgnoredItem { IgnoredItem { id: Uuid::new_v4(), mgrs: "33UWP01".to_string(), h3_cell: 0, class_group: "vehicle".to_string(), decline_time: Utc::now(), operator_id: None, mission_id: "m1".to_string(), retention_scope: RetentionScope::Mission, expires_at: None, source: IgnoredItemSource::LocalAppended, pending_upload: true, } } fn make_observation(class: &str) -> MapObjectObservation { MapObjectObservation { id: Uuid::new_v4(), h3_cell: 0, class: class.to_string(), class_group: "vehicle".to_string(), mission_id: "m1".to_string(), uav_id: "uav_1".to_string(), observed_at_monotonic_ns: 1, observed_at_wallclock: Utc::now(), gps_lat: 0.0, gps_lon: 0.0, mgrs: "33UWP02".to_string(), size_width_m: 2.0, size_length_m: 4.0, confidence: 0.7, diff_kind: DiffKind::New, photo_ref: None, raw_evidence: None, } } /// Snapshot source whose `snapshot()` content can be mutated between /// subscribes. Tracks how many times it has been called so the /// reconnect test can verify the source was re-queried (rather than a /// cached snapshot). struct MutableSource { bundle: parking_lot::Mutex, snapshots_emitted: AtomicUsize, } impl MutableSource { fn new(initial: MapObjectsBundle) -> Self { Self { bundle: parking_lot::Mutex::new(initial), snapshots_emitted: AtomicUsize::new(0), } } fn set(&self, b: MapObjectsBundle) { *self.bundle.lock() = b; } } impl MapObjectsSnapshotSource for MutableSource { fn snapshot(&self) -> MapObjectsBundle { self.snapshots_emitted.fetch_add(1, Ordering::Relaxed); self.bundle.lock().clone() } } fn empty_bundle() -> MapObjectsBundle { MapObjectsBundle { schema_version: "1.0".to_string(), mission_id: "m1".to_string(), bbox: [coord(0.0, 0.0), coord(1.0, 1.0)], map_objects: Vec::new(), observations: Vec::new(), ignored_items: Vec::new(), as_of: Utc::now(), freshness: Some(BundleFreshness::Fresh), } } /// AC-1 — A fresh subscriber to MapObjectsBundle receives exactly one /// snapshot, populated from the configured source. #[tokio::test] async fn ac1_first_subscribe_receives_snapshot() { // Arrange let (listener, port) = bind_ephemeral(); let server = TelemetryStream::new(64); let handle = server.handle(); let initial = MapObjectsBundle { map_objects: (0..50).map(|_| make_mapobject("tank")).collect(), ignored_items: (0..10).map(|_| make_ignored()).collect(), ..empty_bundle() }; let src = Arc::new(MutableSource::new(initial)); server.set_mapobjects_snapshot_source(src.clone()); let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); let mut client = connect(port).await; let mut stream = client .subscribe(Request::new(SubscribeRequest { client_id: "op_a".to_string(), topics: vec![TelemetryTopic::MapObjectsBundle as i32], })) .await .unwrap() .into_inner(); // Act — pull the first message off the stream. let msg = timeout(Duration::from_secs(2), stream.next()) .await .unwrap() .unwrap() .unwrap(); let payload: MapObjectsTopicMessage = serde_json::from_slice(&msg.payload_json).unwrap(); // Assert let snap = match payload { MapObjectsTopicMessage::Snapshot(s) => s, MapObjectsTopicMessage::Diff(_) => panic!("expected Snapshot first; got Diff"), }; assert_eq!(snap.bundle.map_objects.len(), 50); assert_eq!(snap.bundle.ignored_items.len(), 10); assert_eq!(src.snapshots_emitted.load(Ordering::Relaxed), 1); // Drop client + handle scope (cleanup). drop(stream); drop(client); drop(handle); } /// AC-2 — While connected, diffs appended by the composition root are /// received by the client. #[tokio::test] async fn ac2_inflight_changes_emit_diffs() { // Arrange let (listener, port) = bind_ephemeral(); let server = TelemetryStream::new(64); let handle = server.handle(); let src = Arc::new(MutableSource::new(empty_bundle())); server.set_mapobjects_snapshot_source(src); let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); let mut client = connect(port).await; let mut stream = client .subscribe(Request::new(SubscribeRequest { client_id: "op_b".to_string(), topics: vec![TelemetryTopic::MapObjectsBundle as i32], })) .await .unwrap() .into_inner(); // Drain the snapshot first. let snap_msg = timeout(Duration::from_secs(2), stream.next()) .await .unwrap() .unwrap() .unwrap(); let snap_payload: MapObjectsTopicMessage = serde_json::from_slice(&snap_msg.payload_json).unwrap(); assert!(matches!(snap_payload, MapObjectsTopicMessage::Snapshot(_))); // Wait for the client to register before publishing diffs. for _ in 0..50 { if handle.snapshot().subscribed_clients == 1 { break; } tokio::time::sleep(Duration::from_millis(10)).await; } // Act — push one diff with 3 added observations + 1 ignored item. let diff = MapObjectsDiff { added: vec![ make_observation("tank"), make_observation("apc"), make_observation("truck"), ], moved: vec![], removed_candidates: vec![], ignored: vec![make_ignored()], }; handle.push_mapobjects_diff(diff).unwrap(); // Pull the next message — must be a Diff carrying our content. let diff_msg = timeout(Duration::from_secs(2), stream.next()) .await .unwrap() .unwrap() .unwrap(); let diff_payload: MapObjectsTopicMessage = serde_json::from_slice(&diff_msg.payload_json).unwrap(); let received_diff = match diff_payload { MapObjectsTopicMessage::Diff(d) => d, MapObjectsTopicMessage::Snapshot(_) => panic!("expected Diff; got Snapshot"), }; // Assert assert_eq!(received_diff.added.len(), 3); assert_eq!(received_diff.ignored.len(), 1); assert!(received_diff.moved.is_empty()); assert!(received_diff.removed_candidates.is_empty()); } /// AC-3 — Reconnect after disconnect emits a fresh snapshot reflecting /// current state; diffs that flew during the gap are NOT replayed. #[tokio::test] async fn ac3_reconnect_resnaps_without_replay() { // Arrange let (listener, port) = bind_ephemeral(); let server = TelemetryStream::new(64); let handle = server.handle(); let src = Arc::new(MutableSource::new(empty_bundle())); server.set_mapobjects_snapshot_source(src.clone()); let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); // Subscribe once, drain snapshot, drop. let mut client = connect(port).await; let mut stream = client .subscribe(Request::new(SubscribeRequest { client_id: "op_c".to_string(), topics: vec![TelemetryTopic::MapObjectsBundle as i32], })) .await .unwrap() .into_inner(); let first_snap = timeout(Duration::from_secs(2), stream.next()) .await .unwrap() .unwrap() .unwrap(); let first: MapObjectsTopicMessage = serde_json::from_slice(&first_snap.payload_json).unwrap(); let first_snap = match first { MapObjectsTopicMessage::Snapshot(s) => s, MapObjectsTopicMessage::Diff(_) => panic!("first must be Snapshot"), }; assert!(first_snap.bundle.map_objects.is_empty()); // Disconnect. drop(stream); drop(client); for _ in 0..50 { if handle.snapshot().subscribed_clients == 0 { break; } tokio::time::sleep(Duration::from_millis(20)).await; } // Act — store grew by 5 while client was disconnected. Also push // a couple of diffs that the reconnecting client must NOT see. src.set(MapObjectsBundle { map_objects: (0..5).map(|_| make_mapobject("tank")).collect(), ..empty_bundle() }); handle .push_mapobjects_diff(MapObjectsDiff { added: vec![make_observation("ghost_during_gap")], ..MapObjectsDiff::default() }) .unwrap(); // Reconnect with the same client_id. let mut client2 = connect(port).await; let mut stream2 = client2 .subscribe(Request::new(SubscribeRequest { client_id: "op_c".to_string(), topics: vec![TelemetryTopic::MapObjectsBundle as i32], })) .await .unwrap() .into_inner(); let resnap_msg = timeout(Duration::from_secs(2), stream2.next()) .await .unwrap() .unwrap() .unwrap(); let resnap_payload: MapObjectsTopicMessage = serde_json::from_slice(&resnap_msg.payload_json).unwrap(); // Assert — first message after reconnect is a snapshot reflecting // the new bundle. The skipped-during-gap diff is NOT in the // stream (we read with a short timeout to prove no replay). let resnap = match resnap_payload { MapObjectsTopicMessage::Snapshot(s) => s, MapObjectsTopicMessage::Diff(_) => { panic!("first message after reconnect MUST be Snapshot, not Diff"); } }; assert_eq!( resnap.bundle.map_objects.len(), 5, "snapshot must reflect post-gap store" ); assert!( src.snapshots_emitted.load(Ordering::Relaxed) >= 2, "snapshot source must have been re-queried on reconnect" ); // The reconnected stream should NOT immediately deliver another // message — the gap diff was broadcast before reconnect and a // late subscriber MUST not see it. let maybe_extra = timeout(Duration::from_millis(300), stream2.next()).await; assert!( maybe_extra.is_err(), "reconnect MUST NOT replay gap diff (got unexpected message)" ); }