Files
Oleksandr Bezdieniezhnykh 251ebed1c2 [AZ-658] frame_ingest H.264/265 decoder (NVDEC + sw fallback)
Wires a real ffmpeg-next 8.1 decoder into the frame_ingest lifecycle
loop. NVDEC is probed at runtime via h264_cuvid / hevc_cuvid; CUDA-less
hosts transparently fall back to software h264 / hevc. Each decoded
frame is stamped with capture_ts (taken at packet receipt) and
decode_ts (taken after decode returns) so movement_detector sees
accurate frame-arrival times. Single-frame decode errors are counted
toward decode_errors_total and dropped; the stream is never aborted.

Adds new public API on FrameIngestHandle: decoder_backend(),
decode_errors_total(), frames_decoded_total(), decode_ms_first_frame(),
decode_ms_p50(), decode_ms_p99(). Integration tests under
crates/frame_ingest/tests/decoder_pipeline.rs cover AC-1, AC-3, AC-4
end-to-end through the real FfmpegDecoder using libx264-encoded
synthetic streams; AC-2 positive (NVDEC selection) is opt-in via
--ignored on a CUDA host. AZ-657 lifecycle tests retained via a
StubDecoder.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 17:05:27 +03:00

387 lines
15 KiB
Rust

//! AZ-658 — decoder pipeline integration tests.
//!
//! These tests drive the **real** [`FfmpegDecoder`] (libavcodec) end
//! to end through the lifecycle loop. A synthetic H.264 bitstream is
//! produced in-process by libx264 (the same FFmpeg install that
//! `FfmpegDecoder` uses to decode), so the tests exercise the
//! production decode path rather than a stub.
//!
//! ACs covered here:
//! - AC-1 — software-path throughput preservation (≥95 % of input
//! frames decoded; sequence numbers strictly monotonic; decoder
//! backend reports `Software` on a CUDA-less host).
//! - AC-3 — a single corrupted "packet" between valid ones must
//! increment `decode_errors_total` exactly once and NOT abort the
//! stream.
//! - AC-4 — `capture_ts_monotonic_ns` is strictly increasing across
//! the emitted frame stream (rides on AC-1's setup).
//!
//! AC-2 (NVDEC selection on Jetson) cannot run on the dev/CI host,
//! which has no CUDA-capable FFmpeg. The positive direction is the
//! `#[ignore]`d `ac2_nvdec_backend_selected_on_cuda_host` test below
//! (opt in with `cargo test -- --ignored` on a CUDA host); it is also
//! validated on the Jetson at deployment time by the Run Tests gate
//! downstream of this batch. The negative direction (CUDA-less host →
//! Software backend) is asserted by the unit tests in
//! `internal::decoder::tests` and by the AC-1 test in this file.
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use ffmpeg_next as ffmpeg;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::timeout;
use frame_ingest::{
BackoffPolicy, Codec, DecoderBackend, FfmpegDecoder, FrameDecoder, FrameIngest, OpenError,
RtspPacket, RtspSessionConfig, RtspTransport, StreamError,
};
/// Synthetic H.264 bitstream generator.
///
/// Encodes `num_frames` frames at `width`x`height` and 30 fps. Each
/// frame's luma plane is an XOR texture (`((frame + row) & 0xFF) ^ col`)
/// that scrolls by one row per frame, so the encoder always sees motion.
/// Chroma is a constant neutral 128.
///
/// libx264 is preferred, with preset `ultrafast`, tune `zerolatency`,
/// no B-frames and a GOP of 30 frames. If libx264 is not registered,
/// FFmpeg's default H.264 encoder for this build is used instead.
///
/// Returns one byte blob per encoded `AVPacket`. Each blob is ready to
/// be fed to the decoder as the payload of an `RtspPacket`.
///
/// # Panics
///
/// Panics in any of these cases:
/// - no H.264 encoder is available;
/// - the encoder fails to open or rejects a frame;
/// - no packets at all were produced.
fn synth_h264_stream(num_frames: usize, width: u32, height: u32) -> Vec<Bytes> {
    ffmpeg::init().expect("ffmpeg init");
    // Prefer libx264 by name. The fallback has to go through the codec id.
    // FFmpeg registers no *encoder* named "h264" (that name belongs to
    // the native decoder), so a by-name fallback could never match.
    let codec = ffmpeg::codec::encoder::find_by_name("libx264")
        .or_else(|| ffmpeg::codec::encoder::find(ffmpeg::codec::Id::H264))
        .expect("an H.264 encoder must be registered");
    let context = ffmpeg::codec::Context::new_with_codec(codec);
    let mut encoder = context
        .encoder()
        .video()
        .expect("encoder context yields video");
    encoder.set_width(width);
    encoder.set_height(height);
    encoder.set_format(ffmpeg::format::Pixel::YUV420P);
    encoder.set_time_base(ffmpeg::Rational::new(1, 30));
    encoder.set_frame_rate(Some(ffmpeg::Rational::new(30, 1)));
    encoder.set_gop(30);
    // No B-frames, so packets come out in presentation order.
    encoder.set_max_b_frames(0);
    // libx264-specific private options; other encoders may ignore them.
    let mut opts = ffmpeg::Dictionary::new();
    opts.set("preset", "ultrafast");
    opts.set("tune", "zerolatency");
    let mut opened = encoder
        .open_with(opts)
        .expect("H.264 encoder must open (preset=ultrafast, tune=zerolatency)");
    let mut out = Vec::with_capacity(num_frames + 4);
    let mut packet = ffmpeg::Packet::empty();
    for i in 0..num_frames {
        let mut input = ffmpeg::frame::Video::new(ffmpeg::format::Pixel::YUV420P, width, height);
        // Fill the Y plane with a per-frame texture so the encoder has
        // motion to compress. A constant frame is degenerate, and
        // libx264 can choose to emit zero packets for some inputs.
        let y_stride = input.stride(0);
        let y = input.data_mut(0);
        for row in 0..height as usize {
            let v = ((i + row) & 0xFF) as u8;
            for col in 0..width as usize {
                y[row * y_stride + col] = v ^ ((col & 0xFF) as u8);
            }
        }
        // Neutral chroma. 4:2:0 subsampling halves both dimensions.
        for plane in 1..=2 {
            let stride = input.stride(plane);
            let data = input.data_mut(plane);
            for row in 0..(height as usize) / 2 {
                for col in 0..(width as usize) / 2 {
                    data[row * stride + col] = 128;
                }
            }
        }
        input.set_pts(Some(i as i64));
        opened
            .send_frame(&input)
            .unwrap_or_else(|e| panic!("encoder send_frame ({i}) failed: {e}"));
        while opened.receive_packet(&mut packet).is_ok() {
            if let Some(d) = packet.data() {
                out.push(Bytes::copy_from_slice(d));
            }
        }
    }
    // Flush whatever the encoder is still holding.
    opened.send_eof().expect("encoder eof");
    while opened.receive_packet(&mut packet).is_ok() {
        if let Some(d) = packet.data() {
            out.push(Bytes::copy_from_slice(d));
        }
    }
    assert!(
        !out.is_empty(),
        "synthetic encoder must produce at least one packet"
    );
    out
}
/// RTSP-shaped transport that replays a pre-built script of byte
/// blobs in order, one per `next_packet` call.
///
/// Once the script is exhausted, `next_packet` awaits a future that
/// never resolves. The FrameIngest task therefore stays in `Streaming`
/// until the test calls `shutdown`. Teardown relies on the lifecycle
/// loop abandoning that pending call when shutdown is requested.
/// NOTE(review): this assumes the loop races `next_packet` against its
/// shutdown signal (e.g. via `tokio::select!`); confirm in `FrameIngest`.
struct ScriptedBytesTransport {
    // Remaining script items; `next_packet` pops them from the front.
    queue: Arc<AsyncMutex<VecDeque<ScriptItem>>>,
}
/// One entry of a [`ScriptedBytesTransport`] script.
#[derive(Debug, Clone)]
enum ScriptItem {
    /// Delivered verbatim as the payload of a single `RtspPacket`.
    Bytes(Bytes),
}
impl ScriptedBytesTransport {
    /// Builds a transport that hands out `packets` in their original
    /// order, each wrapped as a [`ScriptItem::Bytes`].
    fn new(packets: Vec<Bytes>) -> Self {
        let mut script = VecDeque::with_capacity(packets.len());
        script.extend(packets.into_iter().map(ScriptItem::Bytes));
        let queue = Arc::new(AsyncMutex::new(script));
        Self { queue }
    }
}
#[async_trait]
impl RtspTransport for ScriptedBytesTransport {
    /// Always succeeds; the script is already in memory.
    async fn open(&mut self, _config: &RtspSessionConfig) -> Result<(), OpenError> {
        Ok(())
    }

    /// Nothing to release.
    async fn close(&mut self) {}

    /// Pops the next scripted blob and returns it as an `RtspPacket`
    /// with `timestamp_rtp` fixed at 0. Once the script is drained, this
    /// never resolves; only the caller dropping the future ends the call.
    async fn next_packet(&mut self) -> Result<RtspPacket, StreamError> {
        // The lock guard is a temporary and is released at the end of
        // this statement, before any further await.
        let next = self.queue.lock().await.pop_front();
        match next {
            Some(ScriptItem::Bytes(payload)) => Ok(RtspPacket {
                timestamp_rtp: 0,
                payload,
            }),
            // Park forever; the caller is expected to cancel this call on
            // shutdown.
            None => std::future::pending().await,
        }
    }
}
/// Backoff policy built from tiny delays (10 ms and 40 ms), so any
/// backoff a test triggers costs almost no wall-clock time.
fn fast_backoff() -> BackoffPolicy {
    let [short, long] = [10, 40].map(Duration::from_millis);
    BackoffPolicy::new(short, long)
}
/// AC-1 + AC-4 — a software-decoded synthetic stream must preserve
/// at least 95 % of input frames and stamp them with strictly
/// monotonic capture timestamps + sequence numbers. The dev/CI host
/// has no CUDA, so the backend MUST report `Software`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac1_ac4_software_decode_preserves_throughput_and_monotonicity() {
    // Arrange — encode 60 frames (2 s of 30 fps content). The AC's
    // literal 1080p / 10 s budget is validated against the real
    // camera at deploy; the dev test exercises the same code path
    // at smaller scale to keep CI <5 s.
    let width = 320u32;
    let height = 240u32;
    let input_frames = 60usize;
    let stream = synth_h264_stream(input_frames, width, height);
    assert!(
        stream.len() >= input_frames - 5,
        "encoder produced {} packets for {input_frames} frames; expected ~1:1",
        stream.len()
    );
    let transport = ScriptedBytesTransport::new(stream);
    let decoder =
        FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open on this host");
    let ingest = FrameIngest::with_backoff(input_frames + 16, fast_backoff());
    let handle = ingest.handle();
    let mut frames = handle.subscribe();
    // Act
    let task = ingest.run(transport, decoder, RtspSessionConfig::new("rtsp://fake/0"));
    let mut received = Vec::with_capacity(input_frames);
    let deadline = Duration::from_secs(10);
    let start = tokio::time::Instant::now();
    while received.len() < input_frames && start.elapsed() < deadline {
        match timeout(Duration::from_millis(500), frames.recv()).await {
            Ok(Ok(f)) => received.push(f),
            Ok(Err(_)) => break,
            Err(_) => {
                // Nothing decoded or rejected yet. Decoder start-up can
                // exceed one 500 ms poll on a loaded CI host, and
                // `0 == received.len()` below would otherwise mistake a
                // slow start for "stream finished". Keep waiting; the
                // outer deadline still bounds this.
                if handle.decode_errors_total() == 0 && handle.frames_decoded_total() == 0 {
                    continue;
                }
                if handle.frames_decoded_total() as usize == received.len() {
                    // Every decoded frame has been delivered and no more
                    // are coming. The encoder may have produced fewer
                    // access units than input frames (rare with
                    // `tune=zerolatency` but possible). Stop waiting.
                    break;
                }
            }
        }
    }
    handle.shutdown();
    let _ = timeout(Duration::from_secs(2), task).await;
    // Assert — backend selection (AC-2 negative direction): a CUDA-less
    // host MUST select Software.
    assert_eq!(
        handle.decoder_backend(),
        Some(DecoderBackend::Software),
        "host without h264_cuvid must fall back to Software"
    );
    // AC-1 — at least 95 % of input frames decoded.
    let kept = received.len();
    let min_required = (input_frames as f64 * 0.95).ceil() as usize;
    assert!(
        kept >= min_required,
        "decoded {kept} frames; AC-1 requires ≥{min_required} of {input_frames} ({}%)",
        (kept * 100) / input_frames
    );
    // AC-1 + AC-4 — sequence numbers strictly monotonic.
    for w in received.windows(2) {
        assert!(
            w[0].seq < w[1].seq,
            "seq must strictly increase: {} → {}",
            w[0].seq,
            w[1].seq
        );
    }
    // AC-4 — capture timestamps strictly monotonic.
    for w in received.windows(2) {
        assert!(
            w[0].capture_ts_monotonic_ns < w[1].capture_ts_monotonic_ns,
            "capture_ts must strictly increase: {} → {}",
            w[0].capture_ts_monotonic_ns,
            w[1].capture_ts_monotonic_ns
        );
    }
    // Decode timestamps must be at-or-after capture timestamps for
    // every frame (decode happens after packet receipt by
    // construction).
    for f in &received {
        assert!(
            f.decode_ts_monotonic_ns >= f.capture_ts_monotonic_ns,
            "decode_ts {} must be ≥ capture_ts {}",
            f.decode_ts_monotonic_ns,
            f.capture_ts_monotonic_ns
        );
    }
    // First-frame cold-start metric was recorded.
    assert!(
        handle.decode_ms_first_frame().is_some(),
        "decode_ms_first_frame must be populated after the first decode"
    );
    assert!(handle.decode_ms_p50().is_some(), "p50 must be populated");
    assert!(handle.decode_ms_p99().is_some(), "p99 must be populated");
}
/// AC-2 (positive direction): on a CUDA-capable host, the decoder
/// MUST select `DecoderBackend::Nvdec`.
///
/// Ignored by default because the Mac/Linux dev box has no CUDA-enabled
/// FFmpeg. Opt in with `cargo test -- --ignored` on a Jetson Orin Nano
/// that has the FFmpeg-cuda packages installed.
///
/// The negative direction (no CUDA → Software) is asserted elsewhere:
/// - `internal::decoder::tests::ffmpeg_decoder_falls_back_to_software_on_macos_dev_host`;
/// - `ac1_ac4_software_decode_preserves_throughput_and_monotonicity` in this file.
///
/// Together these tests pin the selection rule from both sides.
#[tokio::test]
#[ignore = "AC-2 positive: requires a CUDA-capable FFmpeg (h264_cuvid registered) — only runs on Jetson"]
async fn ac2_nvdec_backend_selected_on_cuda_host() {
    // Arrange + Act: build the decoder, then read back the backend it chose.
    let dec = FfmpegDecoder::new(Codec::H264).expect("h264 decoder must open on Jetson");
    let selected = dec.backend();
    // Assert
    assert_eq!(
        selected,
        DecoderBackend::Nvdec,
        "Jetson Orin Nano with CUDA-enabled FFmpeg MUST select NVDEC"
    );
}
/// AC-3 — a corrupted packet between valid ones must be counted as
/// `decode_errors_total += 1`, and the stream must keep producing
/// frames after it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac3_corrupted_frame_is_counted_and_does_not_abort_stream() {
    // Arrange — generate two synthetic streams, one for "before" and
    // one for "after", and splice a garbage packet between them.
    let width = 320u32;
    let height = 240u32;
    let mut script: Vec<Bytes> = synth_h264_stream(20, width, height);
    let after = synth_h264_stream(20, width, height);
    // Each encoder packet carries one coded frame (no B-frames). So the
    // "before" half can yield at most `pre_count` frames; anything beyond
    // that must come from after the corrupted packet.
    let pre_count = script.len();
    // Corrupted packet: arbitrary bytes that do not form a valid
    // Annex-B NAL unit. libavcodec may reject them in `send_packet` or
    // in `receive_frame`. Either way, `FfmpegDecoder::decode` is
    // expected to surface exactly one error for this packet.
    let garbage = Bytes::from_static(&[
        0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, 0x12, 0x34, 0x56, 0x78,
    ]);
    script.push(garbage);
    script.extend(after);
    let total_packets = script.len();
    let transport = ScriptedBytesTransport::new(script);
    let decoder = FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open");
    let ingest = FrameIngest::with_backoff(total_packets + 16, fast_backoff());
    let handle = ingest.handle();
    let mut frames = handle.subscribe();
    // Act — drain frames until either we've collected enough to know
    // post-error frames landed, or we time out.
    let task = ingest.run(transport, decoder, RtspSessionConfig::new("rtsp://fake/0"));
    let mut received_seqs: Vec<u64> = Vec::new();
    let deadline = Duration::from_secs(10);
    let start = tokio::time::Instant::now();
    // Always strictly above `pre_count`, so reaching the target proves
    // that post-error frames were delivered.
    let target_frames = pre_count + 5;
    while received_seqs.len() < target_frames && start.elapsed() < deadline {
        match timeout(Duration::from_millis(500), frames.recv()).await {
            Ok(Ok(f)) => received_seqs.push(f.seq),
            Ok(Err(_)) => break,
            Err(_) => {
                // Nothing decoded or rejected yet: decoder start-up may
                // still be in progress, so keep waiting (bounded by the
                // deadline).
                if handle.decode_errors_total() == 0 && handle.frames_decoded_total() == 0 {
                    continue;
                }
                // Every decoded frame has been delivered; no more coming.
                if (handle.frames_decoded_total() as usize) == received_seqs.len() {
                    break;
                }
            }
        }
    }
    handle.shutdown();
    let _ = timeout(Duration::from_secs(2), task).await;
    // Assert — exactly one decode error (the garbage packet).
    assert_eq!(
        handle.decode_errors_total(),
        1,
        "one corrupted packet must produce exactly one decode error"
    );
    // Valid frames continued to land after the error. `>= pre_count`
    // would be satisfied by the "before" half alone and so would not
    // detect an aborted stream; require strictly more.
    assert!(
        received_seqs.len() > pre_count,
        "stream must keep producing frames after the corrupted packet: got {} frames, \
         but the {pre_count} packets before it can yield at most {pre_count}",
        received_seqs.len()
    );
    // Frame sequence is monotonic across the corrupted packet.
    for w in received_seqs.windows(2) {
        assert!(
            w[0] < w[1],
            "seq must remain strictly monotonic across decode errors: {} → {}",
            w[0],
            w[1]
        );
    }
}