//! AZ-657 integration tests — RTSP session lifecycle, bounded //! reconnect, AI-lock plumb. //! //! Uses a [`FakeRtspTransport`] (not a real RTSP server) to keep tests //! deterministic and free of external fixtures. The session lifecycle //! FSM in `FrameIngest::run` is the production deliverable; the real //! retina-backed transport that talks to the camera lands in AZ-658 //! alongside the H.264 decoder. use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; use tokio::sync::mpsc; use tokio::time::{timeout, Instant}; use frame_ingest::{ BackoffPolicy, DecodeError, DecodedPixels, DecoderBackend, FrameDecoder, FrameIngest, OpenError, RtspPacket, RtspSessionConfig, RtspTransport, SessionState, StreamError, }; use shared::models::frame::PixelFormat; /// Test-only decoder that pushes one synthetic `DecodedPixels` per /// call. Used by the AZ-657 lifecycle tests, which verify FSM / /// reconnect / AI-lock semantics — they don't care what pixels the /// decoder produced. The production decoder path is exercised /// separately by `decoder_pipeline.rs` (AZ-658). struct StubDecoder; impl FrameDecoder for StubDecoder { fn backend(&self) -> DecoderBackend { DecoderBackend::Software } fn decode(&mut self, payload: &[u8], out: &mut Vec) -> Result<(), DecodeError> { out.push(DecodedPixels { pixels: Bytes::copy_from_slice(payload), width: 320, height: 240, pix_fmt: PixelFormat::Nv12, decode_duration: Duration::from_micros(100), }); Ok(()) } } #[derive(Debug, Clone)] enum Scripted { OpenOk, OpenFail(OpenErrKind), OpenHardFail, PacketOk, StreamDropped, } #[derive(Debug, Clone, Copy)] enum OpenErrKind { Timeout, Network, } impl OpenErrKind { fn into_err(self) -> OpenError { match self { OpenErrKind::Timeout => OpenError::Timeout, OpenErrKind::Network => OpenError::Network("connection refused".to_string()), } } } /// Test-driven RTSP transport. The lifecycle loop pulls events from /// an mpsc channel that the test pushes into. When the channel is /// empty the transport parks (mirroring a healthy idle RTSP open /// that blocks until the next packet arrives). The test ends the /// session via `FrameIngestHandle::shutdown`, which the lifecycle /// loop observes through `tokio::select!`. struct FakeRtspTransport { rx: Arc>>, opens: Arc, packets_sent: Arc, } /// Controller side of the fake transport. The test pushes events, /// the lifecycle loop consumes them. struct ScriptCtl { tx: mpsc::UnboundedSender, } impl ScriptCtl { fn push(&self, ev: Scripted) { self.tx.send(ev).expect("script controller channel closed"); } } impl FakeRtspTransport { fn new() -> (Self, ScriptCtl, Arc, Arc) { let (tx, rx) = mpsc::unbounded_channel(); let opens = Arc::new(AtomicU32::new(0)); let packets_sent = Arc::new(AtomicU32::new(0)); ( Self { rx: Arc::new(tokio::sync::Mutex::new(rx)), opens: Arc::clone(&opens), packets_sent: Arc::clone(&packets_sent), }, ScriptCtl { tx }, opens, packets_sent, ) } fn from_script(script: Vec) -> (Self, ScriptCtl, Arc, Arc) { let (t, ctl, o, p) = Self::new(); for ev in script { ctl.push(ev); } (t, ctl, o, p) } async fn next_event(&self) -> Scripted { let mut rx = self.rx.lock().await; match rx.recv().await { Some(ev) => ev, // Sender dropped → park forever; the lifecycle observes // shutdown via select! and exits cleanly. None => std::future::pending().await, } } } #[async_trait] impl RtspTransport for FakeRtspTransport { async fn open(&mut self, _config: &RtspSessionConfig) -> Result<(), OpenError> { self.opens.fetch_add(1, Ordering::Relaxed); match self.next_event().await { Scripted::OpenOk => Ok(()), Scripted::OpenFail(kind) => Err(kind.into_err()), Scripted::OpenHardFail => Err(OpenError::UnsupportedProfile { details: "H265 main10 not supported".to_string(), }), other => Err(OpenError::Network(format!( "fake transport: open called when script expected {other:?}" ))), } } async fn close(&mut self) {} async fn next_packet(&mut self) -> Result { match self.next_event().await { Scripted::PacketOk => { self.packets_sent.fetch_add(1, Ordering::Relaxed); Ok(RtspPacket { timestamp_rtp: 0, payload: Bytes::from_static(b"nal-unit"), }) } Scripted::StreamDropped => Err(StreamError::Dropped("scripted drop".to_string())), // Out-of-band events while streaming surface as a drop // so the FSM re-enters the reconnect ladder. other => Err(StreamError::Dropped(format!( "script expected non-packet: {other:?}" ))), } } } fn fast_backoff() -> BackoffPolicy { BackoffPolicy::new(Duration::from_millis(10), Duration::from_millis(40)) } /// AC-1 — happy path: a single `OpenOk` followed by a packet must /// bring the FSM to `Streaming` and emit a frame on the broadcast. #[tokio::test] async fn ac1_open_succeeds_and_session_reaches_streaming() { // Arrange let (transport, _ctl, opens, packets) = FakeRtspTransport::from_script(vec![Scripted::OpenOk, Scripted::PacketOk]); let ingest = FrameIngest::with_backoff(8, fast_backoff()); let handle = ingest.handle(); let mut frames = handle.subscribe(); // Act let task = ingest.run( transport, StubDecoder, RtspSessionConfig::new("rtsp://fake/0"), ); let first = timeout(Duration::from_secs(1), frames.recv()) .await .expect("frame within 1 s") .expect("broadcast send succeeded"); // Assert — receiving the frame proves Closed → Connecting → // Streaming was traversed; the FakeTransport parks after the // packet so the FSM stays in Streaming. assert!(!first.ai_locked, "ai_lock should default to false"); assert_eq!(handle.session_state(), SessionState::Streaming); assert_eq!(opens.load(Ordering::Relaxed), 1); assert_eq!(packets.load(Ordering::Relaxed), 1); handle.shutdown(); let _ = timeout(Duration::from_secs(1), task) .await .expect("lifecycle exits on shutdown"); } /// AC-2 — bounded reconnect: an initial failure followed by a success /// must increment `reopens_total` and converge to `Streaming`. The /// backoff sleeps used (initial 10 ms, doubling) must be observed via /// elapsed wall time. #[tokio::test] async fn ac2_bounded_reconnect_recovers_after_transient_failure() { // Arrange let (transport, _ctl, opens, _packets) = FakeRtspTransport::from_script(vec![ Scripted::OpenFail(OpenErrKind::Network), Scripted::OpenFail(OpenErrKind::Timeout), Scripted::OpenOk, Scripted::PacketOk, ]); let ingest = FrameIngest::with_backoff(8, fast_backoff()); let handle = ingest.handle(); let mut frames = handle.subscribe(); let started = Instant::now(); // Act let task = ingest.run( transport, StubDecoder, RtspSessionConfig::new("rtsp://fake/0"), ); let _ = timeout(Duration::from_secs(2), frames.recv()) .await .expect("frame within 2 s") .expect("broadcast send succeeded"); let elapsed = started.elapsed(); // Assert assert!( elapsed >= Duration::from_millis(30), "must observe two backoff sleeps (10 ms + 20 ms = 30 ms), got {elapsed:?}" ); assert_eq!(handle.session_state(), SessionState::Streaming); assert_eq!(opens.load(Ordering::Relaxed), 3); handle.shutdown(); let _ = timeout(Duration::from_secs(1), task).await; } /// AC-2.b — stream drop after streaming starts must re-enter /// `Failing` and reopen. #[tokio::test] async fn ac2b_stream_drop_increments_reopens_total() { // Arrange let (transport, _ctl, opens, _packets) = FakeRtspTransport::from_script(vec![ Scripted::OpenOk, Scripted::PacketOk, Scripted::StreamDropped, Scripted::OpenOk, Scripted::PacketOk, ]); let ingest = FrameIngest::with_backoff(8, fast_backoff()); let handle = ingest.handle(); let mut frames = handle.subscribe(); // Act let task = ingest.run( transport, StubDecoder, RtspSessionConfig::new("rtsp://fake/0"), ); let _ = timeout(Duration::from_secs(1), frames.recv()) .await .expect("first frame") .expect("first frame ok"); let _ = timeout(Duration::from_secs(1), frames.recv()) .await .expect("second frame") .expect("second frame ok"); // Assert assert!( handle.reopens_total() >= 1, "stream drop must record at least one reopen, got {}", handle.reopens_total() ); assert_eq!(opens.load(Ordering::Relaxed), 2); assert_eq!(handle.session_state(), SessionState::Streaming); handle.shutdown(); let _ = timeout(Duration::from_secs(1), task).await; } /// AC-3 — SPS/PPS mismatch must hard-fail the session. The loop /// exits and does NOT retry, leaving the FSM in `Failing` with no /// further opens. #[tokio::test] async fn ac3_unsupported_profile_hard_fails_session() { // Arrange let (transport, _ctl, opens, _packets) = FakeRtspTransport::from_script(vec![Scripted::OpenHardFail]); let ingest = FrameIngest::with_backoff(8, fast_backoff()); let handle = ingest.handle(); // Act let task = ingest.run( transport, StubDecoder, RtspSessionConfig::new("rtsp://fake/0"), ); let _ = timeout(Duration::from_secs(1), task) .await .expect("lifecycle loop exits on hard-fail"); // Assert assert!(matches!( handle.session_state(), SessionState::Failing { .. } )); assert_eq!(opens.load(Ordering::Relaxed), 1, "no automatic retry"); } /// AC-4 — AI-lock toggle: every frame emitted AFTER `set_ai_lock(true)` /// must carry `ai_locked = true`. The test controls packet emission /// timing via `ScriptCtl` so the toggle is guaranteed to precede the /// second packet. #[tokio::test] async fn ac4_ai_lock_toggle_propagates_to_frames() { // Arrange let (transport, ctl, _opens, _packets) = FakeRtspTransport::from_script(vec![Scripted::OpenOk, Scripted::PacketOk]); let ingest = FrameIngest::with_backoff(8, fast_backoff()); let handle = ingest.handle(); let mut frames = handle.subscribe(); // Act let task = ingest.run( transport, StubDecoder, RtspSessionConfig::new("rtsp://fake/0"), ); let f1 = timeout(Duration::from_secs(1), frames.recv()) .await .expect("first frame") .expect("first frame ok"); handle.set_ai_lock(true); ctl.push(Scripted::PacketOk); let f2 = timeout(Duration::from_secs(1), frames.recv()) .await .expect("second frame") .expect("second frame ok"); // Assert assert!(!f1.ai_locked, "pre-toggle frame must be unlocked"); assert!( f2.ai_locked, "post-toggle frame must carry ai_locked = true" ); assert!(handle.ai_locked()); handle.shutdown(); let _ = timeout(Duration::from_secs(1), task).await; }