//! `frame_ingest` — RTSP pull + decode + timestamp + publish. //! //! Real implementation lands in: //! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded //! reconnect + AI-lock plumb (this crate, modules in `internal/`). //! - AZ-658 `frame_ingest_decoder` — H.264/265 decode (NVDEC + sw //! fallback) + per-frame monotonic timestamping + decode stats //! (this crate, `internal/decoder.rs` + `internal/timestamp.rs`). //! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer //! drop policy (this crate, `internal/publisher.rs`). //! //! ## AZ-658 surface (extends AZ-657) //! //! `FrameIngest::run` takes a [`FrameDecoder`]. The lifecycle loop //! stamps the capture timestamp the moment a packet leaves the //! transport, hands the encoded payload to the decoder, and emits one //! [`Frame`] per decoded picture with `decode_ts_monotonic_ns` set //! when the decoder returned. Single-frame decode errors increment //! `decode_errors_total` and drop the frame; the stream is never //! aborted. The decoder backend (`Nvdec` / `Software`) is observable //! via [`FrameIngestHandle::decoder_backend`]. //! //! ## AZ-659 surface (extends AZ-658) //! //! Decoded frames flow through a [`FramePublisher`]. The publisher //! exposes [`FrameIngestHandle::subscribe_as`] for the three known //! consumers (`detection_client`, `movement_detector`, //! `telemetry_stream`); each subscriber's lag is folded into a //! per-consumer drop counter visible via //! [`FrameIngestHandle::dropped_frames`]. Drops are *counted* and //! `tracing::warn`-logged with a reason tag — never silent. //! `FrameIngestHandle::subscribe()` is preserved for legacy callers //! that don't fit one of the three named consumer roles; lag on //! those raw receivers is not attributed to any consumer counter. use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::sync::{broadcast, watch, Mutex}; use tokio::task::JoinHandle; use shared::clock::MonoClock; use shared::health::{ComponentHealth, HealthLevel}; use shared::models::frame::Frame; pub mod internal; pub use internal::decoder::{ Codec, DecodeError, DecodeStats, DecodedPixels, DecoderBackend, DecoderInitError, FfmpegDecoder, FrameDecoder, }; pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState}; pub use internal::publisher::{ ConsumerId, FramePublisher, FrameReceiver, PublisherStats, RecvError as FrameRecvError, TryRecvError as FrameTryRecvError, DEFAULT_CHANNEL_DEPTH, }; pub use internal::rtsp_client::{ OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError, }; pub use internal::timestamp::FrameStamper; use internal::lifecycle::{transition, Trigger}; const NAME: &str = "frame_ingest"; /// Threshold past which `health()` flips to `Red` while the session is /// not `Streaming`. Aligned with `description.md §6` (red after /// `last_frame_age_ms` exceeds a configured threshold). const RED_FRAME_AGE: Duration = Duration::from_secs(5); pub struct FrameIngest { publisher: Arc, ai_lock_tx: watch::Sender, state_tx: watch::Sender, shutdown_tx: watch::Sender, backend_tx: watch::Sender>, stats: Arc, decode_stats: Arc, backoff: BackoffPolicy, clock: MonoClock, } impl FrameIngest { /// Default constructor — `channel_capacity` maps directly to the /// publisher's `channel_depth` (see `description.md §3`). Use /// [`Self::with_backoff`] when both the depth and the reopen /// backoff need to be customised. pub fn new(channel_capacity: usize) -> Self { Self::with_backoff( channel_capacity, BackoffPolicy::new(Duration::from_secs(1), Duration::from_secs(30)), ) } pub fn with_backoff(channel_capacity: usize, backoff: BackoffPolicy) -> Self { let publisher = Arc::new(FramePublisher::new(channel_capacity)); let (ai_lock_tx, _) = watch::channel(false); let (state_tx, _) = watch::channel(SessionState::Closed); let (shutdown_tx, _) = watch::channel(false); let (backend_tx, _) = watch::channel(None); Self { publisher, ai_lock_tx, state_tx, shutdown_tx, backend_tx, stats: LifecycleStats::new(), decode_stats: DecodeStats::shared(), backoff, clock: MonoClock::new(), } } /// Shared accessor for the underlying [`FramePublisher`]. The /// composition root passes this `Arc` to consumers that prefer /// to subscribe themselves (named via [`ConsumerId`]) rather /// than receiving a pre-built [`FrameReceiver`] over the /// handle. pub fn publisher(&self) -> Arc { Arc::clone(&self.publisher) } pub fn handle(&self) -> FrameIngestHandle { FrameIngestHandle { publisher: Arc::clone(&self.publisher), ai_lock_tx: self.ai_lock_tx.clone(), state_rx: self.state_tx.subscribe(), shutdown_tx: self.shutdown_tx.clone(), backend_rx: self.backend_tx.subscribe(), stats: Arc::clone(&self.stats), decode_stats: Arc::clone(&self.decode_stats), clock: self.clock, } } /// Spawn the lifecycle loop. Returns a `JoinHandle` that resolves /// when the loop exits (shutdown signalled via /// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM). /// /// `decoder` is owned exclusively by the spawned task; only one /// decoder is active per `FrameIngest` instance. pub fn run(&self, transport: T, decoder: D, config: RtspSessionConfig) -> JoinHandle<()> where T: RtspTransport + 'static, D: FrameDecoder + 'static, { let publisher = Arc::clone(&self.publisher); let ai_lock = self.ai_lock_tx.subscribe(); let state_tx = self.state_tx.clone(); let backend_tx = self.backend_tx.clone(); let shutdown_rx = self.shutdown_tx.subscribe(); let stats = Arc::clone(&self.stats); let decode_stats = Arc::clone(&self.decode_stats); let backoff = self.backoff; let clock = self.clock; let transport = Arc::new(Mutex::new(transport)); let decoder: Box = Box::new(decoder); // Snapshot the decoder backend immediately so it is observable // even before the first packet. backend_tx.send_replace(Some(decoder.backend())); tokio::spawn(async move { lifecycle_loop( transport, decoder, config, publisher, ai_lock, state_tx, shutdown_rx, stats, decode_stats, backoff, clock, ) .await; }) } } fn is_shutdown(rx: &watch::Receiver) -> bool { *rx.borrow() } #[allow(clippy::too_many_arguments)] async fn lifecycle_loop( transport: Arc>, mut decoder: Box, config: RtspSessionConfig, publisher: Arc, mut ai_lock: watch::Receiver, state_tx: watch::Sender, mut shutdown_rx: watch::Receiver, stats: Arc, decode_stats: Arc, backoff: BackoffPolicy, clock: MonoClock, ) where T: RtspTransport, { let mut state = SessionState::Closed; let mut stamper = FrameStamper::new(clock); let mut decoded_buffer: Vec = Vec::with_capacity(4); loop { if is_shutdown(&shutdown_rx) { let mut t = transport.lock().await; t.close().await; state_tx.send_replace(SessionState::Closed); return; } state = transition(state, Trigger::OpenAttempted, &backoff).next; state_tx.send_replace(state); // Race the open call against shutdown so a hung transport // (real RTSP can block on `DESCRIBE` for many seconds) cannot // wedge graceful exit. let open_result = tokio::select! { biased; res = async { let mut t = transport.lock().await; t.open(&config).await } => res, _ = shutdown_rx.changed() => { let mut t = transport.lock().await; t.close().await; state_tx.send_replace(SessionState::Closed); return; } }; match open_result { Ok(()) => { state = transition(state, Trigger::OpenSucceeded, &backoff).next; state_tx.send_replace(state); stats.note_streaming(); loop { let packet = tokio::select! { biased; res = async { let mut t = transport.lock().await; t.next_packet().await } => Some(res), _ = shutdown_rx.changed() => None, }; let Some(packet) = packet else { let mut t = transport.lock().await; t.close().await; state_tx.send_replace(SessionState::Closed); return; }; match packet { Ok(pkt) => { // Capture timestamp + sequence number are // taken at the EARLIEST point per // `description.md §4` — before the decoder // has run, so movement_detector's skew // gate sees the original packet arrival // time. let mark = stamper.capture(); stats.note_packet(mark.ts_ns); let locked = *ai_lock.borrow_and_update(); decoded_buffer.clear(); match decoder.decode(&pkt.payload, &mut decoded_buffer) { Ok(()) => { for dp in decoded_buffer.drain(..) { decode_stats.note_decoded(dp.decode_duration); let frame = Frame { seq: mark.seq, capture_ts_monotonic_ns: mark.ts_ns, decode_ts_monotonic_ns: stamper.decoded(), pixels: Arc::new(dp.pixels), width: dp.width, height: dp.height, pix_fmt: dp.pix_fmt, ai_locked: locked, }; // The publisher folds lag // into per-consumer drop // counters; the lifecycle // loop never blocks on a // slow consumer. Return // value (subscriber count) // is informational. publisher.publish(frame); } } Err(e) => { decode_stats.note_decode_error(); tracing::warn!( error = %e, seq = mark.seq, "frame_ingest dropped a frame on decode error" ); } } } Err(e) => { let trig = Trigger::from_stream_error(&e); let t = transition(state, trig, &backoff); state = t.next; state_tx.send_replace(state); stats.note_reopen(); if let Some(wait) = t.wait_before_next { tokio::time::sleep(wait).await; } if !t.reopen { return; } break; } } } } Err(err) => { let trig = Trigger::from_open_error(&err); let t = transition(state, trig, &backoff); state = t.next; state_tx.send_replace(state); if let SessionState::Failing { attempt } = state { stats.note_open_failure(attempt); } if let Some(wait) = t.wait_before_next { tokio::time::sleep(wait).await; } if !t.reopen { // Hard-fail (e.g. UnsupportedProfile): leave the // FSM parked in Failing and exit. The supervisor // restarts the process; the operator decides. return; } } } } } #[derive(Clone)] pub struct FrameIngestHandle { publisher: Arc, ai_lock_tx: watch::Sender, state_rx: watch::Receiver, shutdown_tx: watch::Sender, backend_rx: watch::Receiver>, stats: Arc, decode_stats: Arc, clock: MonoClock, } impl FrameIngestHandle { /// Raw, unaccounted subscription. Used by legacy callers and /// tests that don't fit one of the three named [`ConsumerId`] /// roles. Lag on this receiver is *not* attributed to any /// per-consumer drop counter — prefer [`Self::subscribe_as`] for /// production consumers so the per-consumer drop dashboard /// stays accurate. pub fn subscribe(&self) -> broadcast::Receiver { self.publisher.subscribe_raw() } /// Subscribe under a named consumer identity. Per-consumer lag /// is folded into the matching drop counter and surfaced via /// [`Self::dropped_frames`]. The returned [`FrameReceiver`] /// transparently retries past lag so callers never observe /// `Lagged` — they only see the next available frame. pub fn subscribe_as(&self, consumer: ConsumerId) -> FrameReceiver { self.publisher.subscribe(consumer) } /// Shared accessor for the underlying [`FramePublisher`]. Useful /// when a consumer needs to subscribe multiple times (e.g. /// reopening a receiver after a transient logical reset) without /// holding the full ingest handle. pub fn publisher(&self) -> Arc { Arc::clone(&self.publisher) } /// Per-consumer drop counter. Increments by `n` every time the /// matching [`FrameReceiver`] would otherwise have surfaced /// `RecvError::Lagged(n)`. pub fn dropped_frames(&self, consumer: ConsumerId) -> u64 { self.publisher.stats().drops_for(consumer) } /// Total publish attempts since the publisher was constructed. /// Increments on every decoded frame even when there are zero /// subscribers — the metric is the publish *rate*, not the /// delivered-frame rate. Use [`Self::dropped_frames`] for the /// delivered-vs-published delta per consumer. pub fn publishes_total(&self) -> u64 { self.publisher.stats().publishes_total() } /// `bringCameraDown`/`bringCameraUp` per `description.md §2`. When /// `locked == true`, every subsequently emitted frame has /// `Frame::ai_locked = true` and downstream AI consumers /// (detection_client, movement_detector) MUST skip detection. /// `telemetry_stream` continues consuming so the operator sees /// the raw stream. pub fn set_ai_lock(&self, locked: bool) { self.ai_lock_tx.send_replace(locked); } pub fn ai_locked(&self) -> bool { *self.ai_lock_tx.borrow() } pub fn session_state(&self) -> SessionState { *self.state_rx.borrow() } /// Subscribe to FSM state transitions. Useful for operator UI and /// supervisor watchdogs (the latter restarts on prolonged /// `Failing`). pub fn session_state_stream(&self) -> watch::Receiver { self.state_rx.clone() } pub fn reopens_total(&self) -> u64 { self.stats.reopens_total.load(Ordering::Relaxed) } /// Backend the active decoder selected at construction. `None` /// before `FrameIngest::run` has been called. pub fn decoder_backend(&self) -> Option { *self.backend_rx.borrow() } pub fn decode_errors_total(&self) -> u64 { self.decode_stats .decode_errors_total .load(Ordering::Relaxed) } pub fn frames_decoded_total(&self) -> u64 { self.decode_stats .frames_decoded_total .load(Ordering::Relaxed) } pub fn decode_ms_first_frame(&self) -> Option { let ns = self .decode_stats .first_frame_decode_duration_ns .load(Ordering::Relaxed); if ns == 0 && self.frames_decoded_total() == 0 { None } else { Some(Duration::from_nanos(ns)) } } pub fn decode_ms_p50(&self) -> Option { self.decode_stats.p50_ns().map(Duration::from_nanos) } pub fn decode_ms_p99(&self) -> Option { self.decode_stats.p99_ns().map(Duration::from_nanos) } /// Request the lifecycle loop to drain to `Closed` and exit. The /// loop races every transport call against this signal, so a /// hung transport cannot wedge graceful exit. pub fn shutdown(&self) { self.shutdown_tx.send_replace(true); } pub fn health(&self) -> ComponentHealth { let state = self.session_state(); let now_ns = self.clock.elapsed_ns(); let last_pkt_ns = self.stats.last_packet_at_ns.load(Ordering::Relaxed); let age = now_ns.saturating_sub(last_pkt_ns); match state { SessionState::Closed => ComponentHealth::disabled(NAME), SessionState::Streaming if last_pkt_ns == 0 => { ComponentHealth::yellow(NAME, "streaming, awaiting first packet") } SessionState::Streaming if age > RED_FRAME_AGE.as_nanos() as u64 => { ComponentHealth::red(NAME, format!("last packet age {} ms", age / 1_000_000)) } SessionState::Streaming => { let mut h = ComponentHealth::green(NAME); if self.ai_locked() { h.level = HealthLevel::Yellow; h.detail = Some("ai_locked".to_string()); } h } SessionState::Connecting { attempt } => { ComponentHealth::yellow(NAME, format!("connecting (attempt {attempt})")) } SessionState::Failing { attempt } => { if age > RED_FRAME_AGE.as_nanos() as u64 { ComponentHealth::red(NAME, format!("failing, attempt {attempt}")) } else { ComponentHealth::yellow(NAME, format!("failing, attempt {attempt}")) } } } } } #[cfg(test)] mod tests { use super::*; #[test] fn it_compiles() { let h = FrameIngest::new(8).handle(); assert_eq!(h.session_state(), SessionState::Closed); assert_eq!(h.health().level, HealthLevel::Disabled); assert!( h.decoder_backend().is_none(), "no decoder is wired until run() is called" ); } #[test] fn ai_lock_toggle_propagates() { // Arrange let ingest = FrameIngest::new(8); let handle = ingest.handle(); // Act handle.set_ai_lock(true); // Assert assert!(handle.ai_locked()); handle.set_ai_lock(false); assert!(!handle.ai_locked()); } #[test] fn handle_exposes_publisher_metrics_before_run() { // Arrange let ingest = FrameIngest::new(4); let handle = ingest.handle(); // Assert — fresh publisher exposes zero metrics for every // known consumer (the AZ-659 health surface contract). assert_eq!(handle.publishes_total(), 0); assert_eq!(handle.dropped_frames(ConsumerId::DetectionClient), 0); assert_eq!(handle.dropped_frames(ConsumerId::MovementDetector), 0); assert_eq!(handle.dropped_frames(ConsumerId::Telemetry), 0); assert_eq!( handle.publisher().channel_depth(), 4, "channel_capacity from constructor must propagate to the publisher" ); } }