//! `frame_ingest` — RTSP pull + decode + timestamp.
//!
//! Real implementation lands in:
//! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded
//! reconnect + AI-lock plumbing (this crate, modules in `internal/`).
//! - AZ-658 `frame_ingest_decoder` — H.264/265 decode (NVDEC + sw
//! fallback) + per-frame monotonic timestamping + decode stats
//! (this crate, `internal/decoder.rs` + `internal/timestamp.rs`).
//! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer
//! drop policy.
//!
//! ## AZ-658 surface (extends AZ-657)
//!
//! `FrameIngest::run` now takes a [`FrameDecoder`]. The lifecycle loop
//! stamps the capture timestamp the moment a packet leaves the
//! transport, hands the encoded payload to the decoder, and emits one
//! [`Frame`] per decoded picture with `decode_ts_monotonic_ns` set
//! when the decoder returned. Single-frame decode errors increment
//! `decode_errors_total` and drop the frame; the stream is never
//! aborted (AC-3). The decoder backend (`Nvdec` / `Software`) is
//! observable via [`FrameIngestHandle::decoder_backend`].
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch, Mutex};
use tokio::task::JoinHandle;
use shared::clock::MonoClock;
use shared::health::{ComponentHealth, HealthLevel};
use shared::models::frame::Frame;
pub mod internal;
pub use internal::decoder::{
Codec, DecodeError, DecodeStats, DecodedPixels, DecoderBackend, DecoderInitError,
FfmpegDecoder, FrameDecoder,
};
pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState};
pub use internal::rtsp_client::{
OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError,
};
pub use internal::timestamp::FrameStamper;
use internal::lifecycle::{transition, Trigger};
/// Component name passed to every `ComponentHealth` constructor in
/// `FrameIngestHandle::health`.
const NAME: &str = "frame_ingest";
/// Packet-age threshold used by `health()`. A `Streaming` session whose
/// last packet is older than this reports `Red`; a `Failing` session
/// reports `Red` past the threshold and `Yellow` below it. Aligned with
/// `description.md §6` (red after `last_frame_age_ms` exceeds a
/// configured threshold).
const RED_FRAME_AGE: Duration = Duration::from_secs(5);
/// Owning side of the frame-ingest component.
///
/// Holds the sending half of every channel plus the shared counters.
/// [`FrameIngest::run`] spawns the lifecycle task and
/// [`FrameIngest::handle`] hands out [`FrameIngestHandle`]s that observe
/// and control it.
pub struct FrameIngest {
/// Broadcast sender the lifecycle loop publishes decoded frames on.
tx: broadcast::Sender ,
/// AI-lock flag; starts `false` and is changed through
/// `FrameIngestHandle::set_ai_lock`.
ai_lock_tx: watch::Sender,
/// Latest FSM state; starts at `SessionState::Closed`.
state_tx: watch::Sender,
/// Shutdown request flag; starts `false`, set to `true` by
/// `FrameIngestHandle::shutdown`.
shutdown_tx: watch::Sender,
/// Decoder backend in use; `None` until `run` publishes it.
backend_tx: watch::Sender>,
/// Lifecycle counters shared with the spawned loop and every handle.
stats: Arc,
/// Decode counters shared with the spawned loop and every handle.
decode_stats: Arc,
/// Reconnect backoff policy handed to the FSM `transition` function.
backoff: BackoffPolicy,
/// Monotonic clock used for frame stamping and for the packet-age
/// computation in `health()`.
clock: MonoClock,
}
impl FrameIngest {
/// Build an ingest with the default reconnect backoff,
/// `BackoffPolicy::new(1 s, 30 s)` (presumably initial and maximum
/// delay — confirm against `BackoffPolicy::new`).
///
/// `channel_capacity` is forwarded unchanged to
/// `broadcast::channel`.
// NOTE(review): tokio documents that `broadcast::channel` panics for a
// capacity of 0 — confirm callers never pass 0.
pub fn new(channel_capacity: usize) -> Self {
Self::with_backoff(
channel_capacity,
BackoffPolicy::new(Duration::from_secs(1), Duration::from_secs(30)),
)
}
/// Build an ingest with an explicit reconnect `backoff` policy.
///
/// Creates the frame broadcast channel with `channel_capacity` and the
/// watch channels with their initial values: AI lock `false`, state
/// `Closed`, shutdown `false`, decoder backend `None`. The initial
/// receivers are discarded; consumers obtain their own through
/// [`FrameIngest::handle`].
pub fn with_backoff(channel_capacity: usize, backoff: BackoffPolicy) -> Self {
let (tx, _rx) = broadcast::channel(channel_capacity);
let (ai_lock_tx, _) = watch::channel(false);
let (state_tx, _) = watch::channel(SessionState::Closed);
let (shutdown_tx, _) = watch::channel(false);
let (backend_tx, _) = watch::channel(None);
Self {
tx,
ai_lock_tx,
state_tx,
shutdown_tx,
backend_tx,
stats: LifecycleStats::new(),
decode_stats: DecodeStats::shared(),
backoff,
clock: MonoClock::new(),
}
}
/// Create a handle bound to this ingest's channels and counters.
///
/// The handle gets clones of the frame, AI-lock and shutdown senders,
/// fresh receivers on the state and decoder-backend channels, shared
/// references to both counter sets and a copy of the clock.
pub fn handle(&self) -> FrameIngestHandle {
FrameIngestHandle {
tx: self.tx.clone(),
ai_lock_tx: self.ai_lock_tx.clone(),
state_rx: self.state_tx.subscribe(),
shutdown_tx: self.shutdown_tx.clone(),
backend_rx: self.backend_tx.subscribe(),
stats: Arc::clone(&self.stats),
decode_stats: Arc::clone(&self.decode_stats),
clock: self.clock,
}
}
/// Spawn the lifecycle loop. Returns a `JoinHandle` that resolves
/// when the loop exits (shutdown signalled via
/// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM).
///
/// `transport` and `config` are used by the spawned task for every
/// open attempt. `decoder` is owned exclusively by the spawned task;
/// only one decoder is active per `FrameIngest` instance.
///
/// The decoder backend is published before the task is spawned, so
/// [`FrameIngestHandle::decoder_backend`] is `Some` as soon as this
/// call returns.
pub fn run(&self, transport: T, decoder: D, config: RtspSessionConfig) -> JoinHandle<()>
where
T: RtspTransport + 'static,
D: FrameDecoder + 'static,
{
// Everything the task needs is cloned or subscribed here, on the
// caller's side, so the spawned future borrows nothing from `self`.
let tx = self.tx.clone();
let ai_lock = self.ai_lock_tx.subscribe();
let state_tx = self.state_tx.clone();
let backend_tx = self.backend_tx.clone();
let shutdown_rx = self.shutdown_tx.subscribe();
let stats = Arc::clone(&self.stats);
let decode_stats = Arc::clone(&self.decode_stats);
let backoff = self.backoff;
let clock = self.clock;
// The loop locks the transport around each open / next_packet /
// close call.
let transport = Arc::new(Mutex::new(transport));
let decoder: Box = Box::new(decoder);
// Snapshot the decoder backend immediately so it is observable
// even before the first packet.
backend_tx.send_replace(Some(decoder.backend()));
tokio::spawn(async move {
lifecycle_loop(
transport,
decoder,
config,
tx,
ai_lock,
state_tx,
shutdown_rx,
stats,
decode_stats,
backoff,
clock,
)
.await;
})
}
}
/// Returns the value currently held by the shutdown watch channel, i.e.
/// `true` once shutdown has been requested.
fn is_shutdown(rx: &watch::Receiver) -> bool {
*rx.borrow()
}
/// Runs the RTSP session state machine until shutdown is requested or the
/// FSM reports that no further reopen should be attempted.
///
/// Each pass of the outer loop attempts to open the transport. On success
/// the inner loop pulls packets, takes the capture stamp as soon as a
/// packet is returned, decodes it and broadcasts one [`Frame`] per decoded
/// picture. A stream error leaves the inner loop and, after the wait
/// chosen by [`transition`], the transport is opened again.
///
/// Shutdown handling:
/// - Every `select!` is `biased` with the shutdown arm listed first, so
///   the signal is polled before the transport on every iteration and
///   cannot be starved by a transport that always has a result ready.
/// - Backoff waits are raced against the shutdown signal too, so a
///   request made during a reconnect delay cuts the wait short instead of
///   being noticed only after the full delay.
/// - On shutdown the transport is closed and `SessionState::Closed` is
///   published before returning.
///
/// A decode error increments the decode-error counter, logs a warning and
/// drops that packet's output; it does not end the stream.
///
/// When `transition` reports `reopen == false` the function returns
/// without closing the transport and leaves the last published state in
/// place.
#[allow(clippy::too_many_arguments)] // private entry point fed once by `FrameIngest::run`
async fn lifecycle_loop(
    transport: Arc>,
    mut decoder: Box,
    config: RtspSessionConfig,
    tx: broadcast::Sender ,
    mut ai_lock: watch::Receiver,
    state_tx: watch::Sender,
    mut shutdown_rx: watch::Receiver,
    stats: Arc,
    decode_stats: Arc,
    backoff: BackoffPolicy,
    clock: MonoClock,
) where
    T: RtspTransport,
{
    let mut state = SessionState::Closed;
    let mut stamper = FrameStamper::new(clock);
    // Reused for every packet: cleared before each decode, drained after.
    let mut decoded_buffer: Vec = Vec::with_capacity(4);
    loop {
        // Catches a shutdown requested before the first open attempt and
        // one that interrupted a backoff wait below.
        if is_shutdown(&shutdown_rx) {
            let mut t = transport.lock().await;
            t.close().await;
            state_tx.send_replace(SessionState::Closed);
            return;
        }
        state = transition(state, Trigger::OpenAttempted, &backoff).next;
        state_tx.send_replace(state);
        // Race the open call against shutdown so a hung transport
        // (real RTSP can block on `DESCRIBE` for many seconds) cannot
        // wedge graceful exit. Shutdown is listed first: with `biased`
        // the arms are polled in order, and the signal must win over an
        // open call that happens to be ready.
        let open_result = tokio::select! {
            biased;
            _ = shutdown_rx.changed() => {
                let mut t = transport.lock().await;
                t.close().await;
                state_tx.send_replace(SessionState::Closed);
                return;
            }
            res = async {
                let mut t = transport.lock().await;
                t.open(&config).await
            } => res,
        };
        match open_result {
            Ok(()) => {
                state = transition(state, Trigger::OpenSucceeded, &backoff).next;
                state_tx.send_replace(state);
                stats.note_streaming();
                loop {
                    // Shutdown first: a transport that always has a
                    // packet ready would otherwise be selected on every
                    // iteration and the shutdown arm would never be
                    // polled.
                    let packet = tokio::select! {
                        biased;
                        _ = shutdown_rx.changed() => None,
                        res = async {
                            let mut t = transport.lock().await;
                            t.next_packet().await
                        } => Some(res),
                    };
                    let Some(packet) = packet else {
                        let mut t = transport.lock().await;
                        t.close().await;
                        state_tx.send_replace(SessionState::Closed);
                        return;
                    };
                    match packet {
                        Ok(pkt) => {
                            // Capture timestamp + sequence number are
                            // taken at the EARLIEST point per
                            // `description.md §4` — before the decoder
                            // has run, so movement_detector's skew
                            // gate sees the original packet arrival
                            // time.
                            let mark = stamper.capture();
                            stats.note_packet(mark.ts_ns);
                            // One lock snapshot per packet: every picture
                            // decoded from it carries the same flag.
                            let locked = *ai_lock.borrow_and_update();
                            decoded_buffer.clear();
                            match decoder.decode(&pkt.payload, &mut decoded_buffer) {
                                Ok(()) => {
                                    for dp in decoded_buffer.drain(..) {
                                        decode_stats.note_decoded(dp.decode_duration);
                                        let frame = Frame {
                                            seq: mark.seq,
                                            capture_ts_monotonic_ns: mark.ts_ns,
                                            decode_ts_monotonic_ns: stamper.decoded(),
                                            pixels: Arc::new(dp.pixels),
                                            width: dp.width,
                                            height: dp.height,
                                            pix_fmt: dp.pix_fmt,
                                            ai_locked: locked,
                                        };
                                        // Send errors are no-ops when
                                        // the broadcast has no
                                        // subscribers; per-consumer
                                        // back-pressure is AZ-659's
                                        // problem.
                                        let _ = tx.send(frame);
                                    }
                                }
                                Err(e) => {
                                    decode_stats.note_decode_error();
                                    tracing::warn!(
                                        error = %e,
                                        seq = mark.seq,
                                        "frame_ingest dropped a frame on decode error"
                                    );
                                }
                            }
                        }
                        Err(e) => {
                            let trig = Trigger::from_stream_error(&e);
                            let step = transition(state, trig, &backoff);
                            state = step.next;
                            state_tx.send_replace(state);
                            stats.note_reopen();
                            if let Some(wait) = step.wait_before_next {
                                // Interruptible backoff: a shutdown
                                // request ends the wait early; the check
                                // at the top of the outer loop then
                                // performs the actual close.
                                tokio::select! {
                                    biased;
                                    _ = shutdown_rx.changed() => {}
                                    _ = tokio::time::sleep(wait) => {}
                                }
                            }
                            if !step.reopen {
                                return;
                            }
                            break;
                        }
                    }
                }
            }
            Err(err) => {
                let trig = Trigger::from_open_error(&err);
                let step = transition(state, trig, &backoff);
                state = step.next;
                state_tx.send_replace(state);
                if let SessionState::Failing { attempt } = state {
                    stats.note_open_failure(attempt);
                }
                if let Some(wait) = step.wait_before_next {
                    // Interruptible backoff, same as on the stream-error
                    // path: shutdown cuts the wait short.
                    tokio::select! {
                        biased;
                        _ = shutdown_rx.changed() => {}
                        _ = tokio::time::sleep(wait) => {}
                    }
                }
                if !step.reopen {
                    // Hard-fail (e.g. UnsupportedProfile): leave the
                    // FSM parked in Failing and exit. The supervisor
                    // restarts the process; the operator decides.
                    return;
                }
            }
        }
    }
}
/// Cloneable handle onto a [`FrameIngest`]: subscribe to frames, toggle
/// the AI lock, request shutdown and read state, counters and health.
/// Created by [`FrameIngest::handle`].
#[derive(Clone)]
pub struct FrameIngestHandle {
/// Frame broadcast sender; kept so `subscribe()` can create receivers.
tx: broadcast::Sender ,
/// Sender for the AI-lock flag (`set_ai_lock` / `ai_locked`).
ai_lock_tx: watch::Sender,
/// Receiver of the latest FSM state.
state_rx: watch::Receiver,
/// Sender for the shutdown request flag.
shutdown_tx: watch::Sender,
/// Receiver of the decoder backend published by `FrameIngest::run`.
backend_rx: watch::Receiver>,
/// Lifecycle counters shared with the ingest.
stats: Arc,
/// Decode counters shared with the ingest.
decode_stats: Arc,
/// Copy of the ingest clock, used to compute packet age in `health()`.
clock: MonoClock,
}
impl FrameIngestHandle {
    /// Open a new receiver on the frame broadcast. A receiver sees the
    /// frames sent after it was created; slow consumers experience
    /// broadcast-channel lag (the slow-consumer policy is AZ-659).
    pub fn subscribe(&self) -> broadcast::Receiver {
        self.tx.subscribe()
    }

    /// `bringCameraDown`/`bringCameraUp` per `description.md §2`. While
    /// `locked == true`, frames emitted afterwards carry
    /// `Frame::ai_locked = true`; downstream AI consumers
    /// (detection_client, movement_detector) MUST skip detection on
    /// them. `telemetry_stream` keeps consuming so the operator still
    /// sees the raw stream.
    pub fn set_ai_lock(&self, locked: bool) {
        let _previous = self.ai_lock_tx.send_replace(locked);
    }

    /// Current value of the AI-lock flag.
    pub fn ai_locked(&self) -> bool {
        *self.ai_lock_tx.borrow()
    }

    /// Most recently published FSM state.
    pub fn session_state(&self) -> SessionState {
        *self.state_rx.borrow()
    }

    /// A receiver for FSM state transitions. Useful for operator UI and
    /// supervisor watchdogs (the latter restarts on prolonged
    /// `Failing`).
    pub fn session_state_stream(&self) -> watch::Receiver {
        self.state_rx.clone()
    }

    /// Value of the `reopens_total` lifecycle counter.
    pub fn reopens_total(&self) -> u64 {
        let counter = &self.stats.reopens_total;
        counter.load(Ordering::Relaxed)
    }

    /// Backend the active decoder selected at construction. `None`
    /// before `FrameIngest::run` has been called.
    pub fn decoder_backend(&self) -> Option {
        *self.backend_rx.borrow()
    }

    /// Value of the `decode_errors_total` decode counter.
    pub fn decode_errors_total(&self) -> u64 {
        let counter = &self.decode_stats.decode_errors_total;
        counter.load(Ordering::Relaxed)
    }

    /// Value of the `frames_decoded_total` decode counter.
    pub fn frames_decoded_total(&self) -> u64 {
        let counter = &self.decode_stats.frames_decoded_total;
        counter.load(Ordering::Relaxed)
    }

    /// Decode duration recorded for the first frame, or `None` while the
    /// recorded duration is zero and no frame has been decoded yet.
    pub fn decode_ms_first_frame(&self) -> Option {
        let first_ns = self
            .decode_stats
            .first_frame_decode_duration_ns
            .load(Ordering::Relaxed);
        // A stored zero is only treated as "unset" while the decoded-frame
        // counter is still zero as well.
        if first_ns != 0 || self.frames_decoded_total() != 0 {
            return Some(Duration::from_nanos(first_ns));
        }
        None
    }

    /// Median decode duration as reported by the decode stats, if any.
    pub fn decode_ms_p50(&self) -> Option {
        let ns = self.decode_stats.p50_ns()?;
        Some(Duration::from_nanos(ns))
    }

    /// 99th-percentile decode duration as reported by the decode stats,
    /// if any.
    pub fn decode_ms_p99(&self) -> Option {
        let ns = self.decode_stats.p99_ns()?;
        Some(Duration::from_nanos(ns))
    }

    /// Ask the lifecycle loop to close the transport, publish `Closed`
    /// and exit. The loop watches this signal while it waits on
    /// transport calls, so a hung transport cannot wedge graceful exit.
    pub fn shutdown(&self) {
        let _previous = self.shutdown_tx.send_replace(true);
    }

    /// Health snapshot derived from the FSM state and the age of the
    /// last packet:
    ///
    /// - `Closed` → disabled.
    /// - `Connecting` → yellow.
    /// - `Failing` → red once the last packet is older than
    ///   `RED_FRAME_AGE`, yellow before that.
    /// - `Streaming` → yellow until the first packet, red when the last
    ///   packet is older than `RED_FRAME_AGE`, otherwise green (yellow
    ///   with detail `ai_locked` while the AI lock is set).
    pub fn health(&self) -> ComponentHealth {
        let state = self.session_state();
        let now_ns = self.clock.elapsed_ns();
        let last_pkt_ns = self.stats.last_packet_at_ns.load(Ordering::Relaxed);
        let age_ns = now_ns.saturating_sub(last_pkt_ns);
        let stale = age_ns > RED_FRAME_AGE.as_nanos() as u64;

        match state {
            SessionState::Closed => ComponentHealth::disabled(NAME),
            SessionState::Connecting { attempt } => {
                ComponentHealth::yellow(NAME, format!("connecting (attempt {attempt})"))
            }
            SessionState::Failing { attempt } => {
                if stale {
                    ComponentHealth::red(NAME, format!("failing, attempt {attempt}"))
                } else {
                    ComponentHealth::yellow(NAME, format!("failing, attempt {attempt}"))
                }
            }
            SessionState::Streaming => {
                // A zero timestamp means no packet has been recorded yet.
                if last_pkt_ns == 0 {
                    return ComponentHealth::yellow(NAME, "streaming, awaiting first packet");
                }
                if stale {
                    return ComponentHealth::red(
                        NAME,
                        format!("last packet age {} ms", age_ns / 1_000_000),
                    );
                }
                let mut report = ComponentHealth::green(NAME);
                if self.ai_locked() {
                    report.level = HealthLevel::Yellow;
                    report.detail = Some("ai_locked".to_string());
                }
                report
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A handle taken before `run()` reports a closed, disabled component
    /// with no decoder backend.
    #[test]
    fn it_compiles() {
        // Arrange
        let handle = FrameIngest::new(8).handle();

        // Assert
        assert_eq!(handle.session_state(), SessionState::Closed);
        assert_eq!(handle.health().level, HealthLevel::Disabled);
        assert!(
            handle.decoder_backend().is_none(),
            "no decoder is wired until run() is called"
        );
    }

    /// Setting the AI lock through a handle is visible when read back,
    /// in both directions.
    #[test]
    fn ai_lock_toggle_propagates() {
        // Arrange
        let ingest = FrameIngest::new(8);
        let handle = ingest.handle();

        // Act + Assert: lock, then unlock.
        for locked in [true, false] {
            handle.set_ai_lock(locked);
            assert_eq!(handle.ai_locked(), locked);
        }
    }
}