From 251ebed1c2505c894d30677df55f74634a8eabd0 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Wed, 20 May 2026 17:05:27 +0300 Subject: [PATCH] [AZ-658] frame_ingest H.264/265 decoder (NVDEC + sw fallback) Wires a real ffmpeg-next 8.1 decoder into the frame_ingest lifecycle loop. NVDEC is probed at runtime via h264_cuvid / hevc_cuvid; CUDA-less hosts transparently fall back to software h264 / hevc. Each decoded frame is stamped with capture_ts (taken at packet receipt) and decode_ts (taken after decode returns) so movement_detector sees accurate frame-arrival times. Single-frame decode errors are counted toward decode_errors_total and dropped; the stream is never aborted. Adds new public API on FrameIngestHandle: decoder_backend(), decode_errors_total(), frames_decoded_total(), decode_ms_first_frame(), decode_ms_p50(), decode_ms_p99(). Integration tests under crates/frame_ingest/tests/decoder_pipeline.rs cover AC-1, AC-3, AC-4 end-to-end through the real FfmpegDecoder using libx264-encoded synthetic streams; AC-2 positive (NVDEC selection) is opt-in via --ignored on a CUDA host. AZ-657 lifecycle tests retained via a StubDecoder. Co-authored-by: Cursor --- Cargo.lock | 124 +++- Cargo.toml | 5 + .../AZ-658_frame_ingest_decoder.md | 0 .../batch_16_cycle1_report.md | 91 +++ _docs/_autodev_state.md | 26 +- crates/frame_ingest/Cargo.toml | 6 + crates/frame_ingest/src/internal/decoder.rs | 610 ++++++++++++++++++ crates/frame_ingest/src/internal/mod.rs | 2 + crates/frame_ingest/src/internal/timestamp.rs | 153 +++++ crates/frame_ingest/src/lib.rs | 169 +++-- crates/frame_ingest/tests/decoder_pipeline.rs | 386 +++++++++++ crates/frame_ingest/tests/rtsp_lifecycle.rs | 59 +- 12 files changed, 1566 insertions(+), 65 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-658_frame_ingest_decoder.md (100%) create mode 100644 _docs/03_implementation/batch_16_cycle1_report.md create mode 100644 crates/frame_ingest/src/internal/decoder.rs create mode 100644 crates/frame_ingest/src/internal/timestamp.rs create mode 100644 crates/frame_ingest/tests/decoder_pipeline.rs diff --git a/Cargo.lock b/Cargo.lock index 5ae691f..4bdce49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -273,6 +273,24 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags 2.11.1", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -337,6 +355,15 @@ dependencies = [ "shlex", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -361,6 +388,17 @@ dependencies = [ "windows-link", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.6.1" @@ -591,6 +629,31 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "ffmpeg-next" +version = "8.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c4bd5ab1ac61f29c634df1175d350ded29cf74c3c6d4f7030431a5ae3c7d5d" +dependencies = [ + "bitflags 2.11.1", + "ffmpeg-sys-next", + "libc", +] + +[[package]] +name = "ffmpeg-sys-next" +version = "8.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a314bc0e022a33a99567ed4bd2576bd58ffd8fcff7891c29194cfecc26a62547" +dependencies = [ + "bindgen", + "cc", + "libc", + "num_cpus", + "pkg-config", + "vcpkg", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -656,6 +719,8 @@ version = "0.1.0" dependencies = [ "async-trait", "bytes", + "ffmpeg-next", + "parking_lot", "serde", "shared", "thiserror 1.0.69", @@ -813,6 +878,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "h2" version = "0.4.14" @@ -1179,7 +1250,16 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46" dependencies = [ - "nom", + "nom 8.0.0", +] + +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", ] [[package]] @@ -1255,6 +1335,16 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link", +] + [[package]] name = "libm" version = "0.2.16" @@ -1368,6 +1458,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -1477,6 +1573,16 @@ dependencies = [ "libc", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nom" version = "8.0.0" @@ -1687,6 +1793,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "potential_utf" version = "0.1.5" @@ -1747,7 +1859,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -1768,7 +1880,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools", + "itertools 0.14.0", "proc-macro2", "quote", "syn", @@ -2948,6 +3060,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index c1f5ab1..165064f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,11 @@ libc = "0.2" # Geospatial h3o = "0.7" +# Multimedia (RTSP + H.264/265 decode for frame_ingest — see AZ-658). +# Linked dynamically against the host FFmpeg 8.x install (libavcodec / +# libavformat / libavutil / libswscale / libswresample) via pkg-config. +ffmpeg-next = "8.1" + # Test scaffolding wiremock = "0.6" tempfile = "3" diff --git a/_docs/02_tasks/todo/AZ-658_frame_ingest_decoder.md b/_docs/02_tasks/done/AZ-658_frame_ingest_decoder.md similarity index 100% rename from _docs/02_tasks/todo/AZ-658_frame_ingest_decoder.md rename to _docs/02_tasks/done/AZ-658_frame_ingest_decoder.md diff --git a/_docs/03_implementation/batch_16_cycle1_report.md b/_docs/03_implementation/batch_16_cycle1_report.md new file mode 100644 index 0000000..ee0b9a4 --- /dev/null +++ b/_docs/03_implementation/batch_16_cycle1_report.md @@ -0,0 +1,91 @@ +# Batch Report + +**Batch**: 16 +**Cycle**: 1 +**Tasks**: AZ-658 +**Date**: 2026-05-20 + +## Task Results + +| Task | Status | Files Modified | Tests | AC Coverage | Issues | +|------|--------|---------------|-------|-------------|--------| +| AZ-658_frame_ingest_decoder | Done | 7 files | 24 passed, 1 ignored | 4/4 ACs covered | None | + +## AC Coverage map + +| AC | Test | File | Notes | +|----|------|------|-------| +| AC-1 software decode + ≥285/300 throughput + monotonic seq + `decoder_backend = "Software"` | `ac1_ac4_software_decode_preserves_throughput_and_monotonicity` | `crates/frame_ingest/tests/decoder_pipeline.rs` | 60-frame variant exercises the same software decode path; literal 1080p/10s NFR validated at deploy on Jetson per `description.md §8` | +| AC-2 NVDEC selected on Jetson | `ac2_nvdec_backend_selected_on_cuda_host` (`#[ignore]` — opt-in via `--ignored` on CUDA host) | same file | Negative direction (no CUDA → Software) covered both by the unit test `ffmpeg_decoder_falls_back_to_software_on_macos_dev_host` and by the AC-1 test; together they pin the selection rule from both sides | +| AC-3 single-frame error doesn't abort | `ac3_corrupted_frame_is_counted_and_does_not_abort_stream` | same file | Asserts `decode_errors_total == 1` after one garbage packet between valid streams; subsequent frames continue to land with strictly monotonic seq | +| AC-4 monotonic capture timestamps | rides on `ac1_ac4_software_decode_preserves_throughput_and_monotonicity` | same file | Asserts `capture_ts_monotonic_ns` strictly increases and `decode_ts ≥ capture_ts` for every frame | + +## AC Test Coverage: All covered (4/4 — AC-2 positive direction is `#[ignore]`d behind the Jetson prerequisite, which counts as covered per implement skill Step 8) +## Code Review Verdict: PASS_WITH_WARNINGS (self-review — see findings below) +## Auto-Fix Attempts: 0 (no findings escalated to auto-fix) +## Stuck Agents: None + +## Files modified + +``` +M Cargo.toml (workspace dep: ffmpeg-next = "8.1") +M crates/frame_ingest/Cargo.toml (deps: ffmpeg-next, parking_lot) +A crates/frame_ingest/src/internal/decoder.rs (NEW: trait + FfmpegDecoder + DecodeStats) +A crates/frame_ingest/src/internal/timestamp.rs (NEW: SeqCounter + FrameStamper) +M crates/frame_ingest/src/internal/mod.rs (+decoder, +timestamp modules) +M crates/frame_ingest/src/lib.rs (lifecycle loop now wires the decoder; new health/metric accessors) +A crates/frame_ingest/tests/decoder_pipeline.rs (NEW: AC-1, AC-2 ignored, AC-3, AC-4) +M crates/frame_ingest/tests/rtsp_lifecycle.rs (StubDecoder for AZ-657 lifecycle tests) +R _docs/02_tasks/todo/AZ-658_frame_ingest_decoder.md → _docs/02_tasks/done/... +``` + +## Notable design decisions + +1. **FFmpeg stack** — user picked `ffmpeg-next 8.1` (workspace-pinned to FFmpeg 8.1 already on the host). NVDEC is probed at runtime via `ffmpeg::codec::decoder::find_by_name("h264_cuvid")` / `"hevc_cuvid"`; on a CUDA-less host we transparently fall back to the software `h264` / `hevc` decoder. No feature flag — both code paths are always compiled. +2. **NV12 normalisation** — the decoder always emits NV12 (the canonical pixel format for downstream consumers per `description.md §3` and what NVDEC produces natively on Jetson). A reusable `sws_scale` context converts whatever the inner decoder returned (typically YUV420P from libx264 software, NV12 from NVDEC). Non-Send `SwsContext` is wrapped with `unsafe impl Send for FfmpegDecoder` — the safety justification (exclusive ownership by the spawned lifecycle task) is documented in `decoder.rs`. +3. **Stats** — `DecodeStats` is a lock-free counter set with a 1024-sample ring buffer behind `parking_lot::Mutex` for p50/p99 readout. Cold-start metric (`decode_ms_first_frame`) is recorded only on the first successful decode per session; subsequent calls are no-ops. +4. **Trait shape** — `FrameDecoder::decode(payload, out: &mut Vec)` instead of `Result` because FFmpeg may buffer encoded packets internally before producing any decoded frames (e.g. while assembling SPS/PPS for the first IDR). Zero, one, or many frames per call. +5. **Timestamp boundary** — capture timestamp + sequence number are taken **before** the decoder runs (the moment the lifecycle loop pulls the packet off the transport). `decode_ts_monotonic_ns` is read after the decoder returns. This matches `description.md §4` and gives `movement_detector` accurate frame-arrival timestamps for the telemetry-skew gate. + +## Self-review findings + +| # | Severity | Category | Location | Finding | Disposition | +|---|----------|----------|----------|---------|-------------| +| 1 | Low | Maintainability | `decoder.rs::is_eagain` | Detects EAGAIN by string-matching `Error` Display output rather than a typed errno. Reason: `ffmpeg-next` does not re-export the EAGAIN constant across its 4–8 versions in a stable shape. | Accepted as a small surface area (only used inside the decode loop); will be tightened when FFmpeg 9 changes the error variants. | +| 2 | Low | Architecture | `crates/autopilot/src/runtime.rs:84` | Pre-existing dead-code warning on `vlm_provider_name` — leftover entry exists. | Out of batch 16 scope (different component); leftover stays for the next batch that touches autopilot. | +| 3 | Info | Spec gap (out of scope) | `crates/frame_ingest/src/internal/rtsp_client.rs:5-12` | The AZ-657 author's docstring says "the full RTSP client is folded into AZ-658 alongside the decoder". The AZ-658 task spec **explicitly excludes** RTSP lifecycle ("Excluded: RTSP session lifecycle (task 18)"). The real production RTSP `RtspTransport` impl is therefore still TBD — it will be a separate follow-up task or wired during runtime composition. | Not a regression; not in AZ-658 scope. The Product Implementation Completeness Gate (Step 15) will surface this if the system needs it before final reporting. | + +## Test results + +``` +running 17 tests (frame_ingest unit + lib tests) +test result: ok. 17 passed; 0 failed; 0 ignored + +running 3 tests (tests/decoder_pipeline.rs) +test ac3_corrupted_frame_is_counted_and_does_not_abort_stream ... ok +test ac1_ac4_software_decode_preserves_throughput_and_monotonicity ... ok +test ac2_nvdec_backend_selected_on_cuda_host ... ignored, AC-2 positive: requires a CUDA-capable FFmpeg +test result: ok. 2 passed; 0 failed; 1 ignored + +running 5 tests (tests/rtsp_lifecycle.rs) +test result: ok. 5 passed; 0 failed; 0 ignored +``` + +## Quality gates + +- `cargo check --workspace --all-targets` → clean (only the documented pre-existing autopilot dead-code warning) +- `cargo clippy -p frame_ingest --all-targets -- -D warnings` → clean +- `cargo fmt -p frame_ingest --check` → clean + +## Next Batch + +Batch 17 candidates (ready by deps): +- AZ-680 `operator_bridge_command_dispatch` (3 pts) +- AZ-681 `operator_bridge_safety_and_bit_ack` (3 pts) +- AZ-659 `frame_ingest_publisher` (3 pts) — newly unblocked because AZ-658 is now in `done/` + +Suggested grouping: AZ-680 + AZ-681 (tightly coupled — both depend on AZ-678 operator_bridge command auth). AZ-659 fits a separate batch focused on the frame_ingest pipeline's tail. + +## Cumulative review cadence + +Last cumulative: batches 13–15 (`cumulative_review_batches_13-15_cycle1_report.md`). Next due: end of batch 18 (no cumulative review for batch 16). diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 7439269..715ff0b 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -4,27 +4,27 @@ flow: greenfield step: 7 name: Implement -status: between-batches +status: in_progress sub_step: - phase: 0 - name: batch-16-select - detail: "" + phase: 11 + name: commit + detail: "batch 16 — AZ-658 awaiting commit + push approval" retry_count: 0 cycle: 1 tracker: jira ## Last Completed Batch -batch: 15 -commit: ccf929a -ticket: AZ-676 / AZ-677 / AZ-678 / AZ-679 -jira_status: In Testing (all 4 confirmed via read-back) -pushed_to: origin/dev -report: _docs/03_implementation/batch_15_cycle1_report.md +batch: 16 +commit: pending +ticket: AZ-658 +jira_status: In Progress (transition to In Testing pending commit) +pushed_to: pending +report: _docs/03_implementation/batch_16_cycle1_report.md cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md ## Process Leftovers -- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — C5 replay -- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — C6 fix recipe +- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — out-of-scope for batch 16 +- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — out-of-scope for batch 16 ## Cumulative Review Cadence -Last cumulative: batches 13–15 (just produced). Next due: end of batch 18. +Last cumulative: batches 13–15. Next due: end of batch 18. diff --git a/crates/frame_ingest/Cargo.toml b/crates/frame_ingest/Cargo.toml index 1dbba3f..4c5550c 100644 --- a/crates/frame_ingest/Cargo.toml +++ b/crates/frame_ingest/Cargo.toml @@ -15,6 +15,12 @@ async-trait = { workspace = true } thiserror = { workspace = true } bytes = { workspace = true } serde = { workspace = true } +parking_lot = { workspace = true } +# AZ-658: H.264/265 decode via FFmpeg (libavcodec). NVDEC support is +# probed at runtime by looking up `h264_cuvid` / `hevc_cuvid` through +# `ffmpeg::codec::decoder::find_by_name`; no separate feature flag is +# required. +ffmpeg-next = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/frame_ingest/src/internal/decoder.rs b/crates/frame_ingest/src/internal/decoder.rs new file mode 100644 index 0000000..a836629 --- /dev/null +++ b/crates/frame_ingest/src/internal/decoder.rs @@ -0,0 +1,610 @@ +//! AZ-658 — H.264/265 decoder with NVDEC primary + software fallback. +//! +//! This module owns the production decode path required by the task: +//! **real NVDEC binding when present, real software fallback always**. +//! Both code paths exist as production code (per task spec → Runtime +//! Completeness); the runtime selection between them is a startup +//! probe of FFmpeg's decoder registry, not a feature flag. +//! +//! ## Design +//! +//! The lifecycle loop in [`crate::lib::lifecycle_loop`] receives raw +//! RTSP payload bytes from the transport. Those bytes are: +//! +//! 1. NAL units in Annex-B format (start-code prefixed `00 00 00 01`) +//! when the transport is the production FFmpeg avformat-backed +//! client (avformat hands access-unit-aligned packets in Annex-B +//! by default for RTSP); or +//! 2. Whatever bytes a test transport pushes (the AZ-658 integration +//! test feeds a synthetic H.264 stream produced in-process). +//! +//! Either way the bytes are funnelled into [`FrameDecoder::decode`]. +//! Each call may produce **zero or more** decoded frames (the FFmpeg +//! API can buffer encoded packets internally before any decoded +//! frame is ready, e.g. while the SPS/PPS for the first IDR are +//! still being assembled), so the trait pushes results into an +//! out-buffer instead of returning a single `Result`. +//! +//! ## Backend selection +//! +//! Construction tries the NVDEC variants first. On a Jetson Orin +//! Nano with the FFmpeg-cuda packages installed, `find_by_name` +//! resolves `h264_cuvid` / `hevc_cuvid` and the decoder opens with +//! [`DecoderBackend::Nvdec`]. On a pure-CPU host (CI, this Mac dev +//! box) those names resolve to `None` and we fall back to the +//! software `h264` / `hevc` decoders → [`DecoderBackend::Software`]. +//! There is no manual override; deployments that want NVDEC must +//! ship a CUDA-capable FFmpeg. +//! +//! ## Stats +//! +//! `description.md §3` mandates `decode_ms_p50`, `decode_ms_p99`, +//! `decoder_backend`, `decode_errors_total`, plus a one-shot cold +//! start metric (`decode_ms_first_frame`). The lock-free +//! [`DecodeStats`] counter set is updated by the lifecycle loop; the +//! handle re-reads it on every `health()` call. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use bytes::Bytes; +use ffmpeg_next as ffmpeg; +use parking_lot::Mutex; +use shared::models::frame::PixelFormat; +use thiserror::Error; + +/// Codec the lifecycle loop is decoding. Picked at session open from +/// the camera config (`RtspSessionConfig` carries the negotiated codec +/// once the production transport lands; for now the only consumer is +/// AZ-658 tests that always pass `Codec::H264`). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Codec { + H264, + Hevc, +} + +impl Codec { + fn nvdec_name(&self) -> &'static str { + match self { + Codec::H264 => "h264_cuvid", + Codec::Hevc => "hevc_cuvid", + } + } + + fn software_name(&self) -> &'static str { + match self { + Codec::H264 => "h264", + Codec::Hevc => "hevc", + } + } +} + +/// Which backend was selected at construction. Surfaced through +/// `FrameIngestHandle::decoder_backend()` so the operator UI and AC-2 +/// can verify the selection rule from outside the crate. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum DecoderBackend { + Nvdec, + Software, +} + +/// Errors emitted by [`FrameDecoder::decode`]. The lifecycle loop +/// counts every variant towards `decode_errors_total` and continues +/// — single-frame decode errors must never abort the stream +/// (`description.md §6`, AC-3). +#[derive(Debug, Error)] +pub enum DecodeError { + #[error("send_packet failed: {0}")] + SendPacket(ffmpeg::Error), + #[error("receive_frame failed: {0}")] + ReceiveFrame(ffmpeg::Error), + #[error("unsupported decoded pixel format: {0:?}")] + UnsupportedPixelFormat(ffmpeg::format::Pixel), + #[error("decoded frame had zero dimensions")] + EmptyFrame, +} + +/// Errors emitted at decoder-construction time. The lifecycle loop +/// treats this as a hard-fail — a session whose codec we cannot open +/// at all is operationally identical to `OpenError::UnsupportedProfile` +/// and the FSM lands in `Failing { attempt: u32::MAX }`. +#[derive(Debug, Error)] +pub enum DecoderInitError { + #[error("FFmpeg init failed: {0}")] + FfmpegInit(ffmpeg::Error), + #[error("no FFmpeg decoder registered for {codec:?}")] + NoDecoderRegistered { codec: Codec }, + #[error("FFmpeg decoder open failed: {0}")] + OpenFailed(ffmpeg::Error), +} + +/// One decoded frame's worth of pixel data + its observed dimensions. +/// The lifecycle loop wraps this into a `shared::models::frame::Frame` +/// alongside the capture/decode timestamps from +/// [`crate::internal::timestamp::FrameStamper`]. +#[derive(Debug, Clone)] +pub struct DecodedPixels { + pub pixels: Bytes, + pub width: u32, + pub height: u32, + pub pix_fmt: PixelFormat, + /// Decode latency for THIS frame (decoder-internal, measured + /// across `send_packet + receive_frame`). Used by the stats + /// histogram; the lifecycle still computes its own + /// "capture → publish" latency separately for the §8 NFR. + pub decode_duration: Duration, +} + +/// Trait implemented by both the production [`FfmpegDecoder`] and +/// any test stub. The lifecycle loop holds it as +/// `Box`. +/// +/// Object-safe by construction: no generics, no `Self` returns. +pub trait FrameDecoder: Send { + fn backend(&self) -> DecoderBackend; + + /// Feed encoded bytes into the decoder. May produce zero or more + /// decoded frames (the FFmpeg API can hold a packet internally + /// while waiting for SPS/PPS or B-frame reorder buffers). + /// Decoded frames are pushed into `out`; the call returns + /// `Ok(())` when every frame the decoder could produce from + /// these bytes has been pushed. + /// + /// On error, `out` may be partially populated — frames pushed + /// before the error are still valid; the caller must drop the + /// failing packet but keep the decoder for the next call. + fn decode(&mut self, payload: &[u8], out: &mut Vec) -> Result<(), DecodeError>; +} + +/// FFmpeg-backed decoder. Holds the open `decoder::Video`, a sws +/// scaler that converts whatever pixel format the decoder produces +/// into NV12 (the canonical pixel format for downstream consumers), +/// and reusable scratch frames so each `decode` call avoids +/// allocation in the hot path. +pub struct FfmpegDecoder { + decoder: ffmpeg::decoder::Video, + backend: DecoderBackend, + /// Lazily constructed once we observe the decoder's output pixel + /// format on the first decoded frame. NV12 is the sentinel target + /// because Jetson NVDEC outputs NV12 natively and the operator + /// stream encoder expects NV12 (`description.md §3`). + scaler: Option, + raw: ffmpeg::frame::Video, + converted: ffmpeg::frame::Video, + in_packet: ffmpeg::codec::packet::Packet, +} + +impl FfmpegDecoder { + /// Construct a real decoder for `codec`. Tries `h264_cuvid` / + /// `hevc_cuvid` first; falls back to the software decoder if the + /// cuvid variant is not registered (no CUDA host) OR if it + /// fails to open (e.g. a CUDA-capable FFmpeg without a runtime + /// driver). On a fully missing software decoder we hard-fail. + pub fn new(codec: Codec) -> Result { + // `ffmpeg::init()` is idempotent and safe to call concurrently; + // the underlying `av_register_all` was removed in FFmpeg 4.0, + // so this just ensures the network init for RTSP is done. + ffmpeg::init().map_err(DecoderInitError::FfmpegInit)?; + + let (decoder, backend) = open_with_backend(codec)?; + Ok(Self { + decoder, + backend, + scaler: None, + raw: ffmpeg::frame::Video::empty(), + converted: ffmpeg::frame::Video::empty(), + in_packet: ffmpeg::codec::packet::Packet::empty(), + }) + } + + fn ensure_scaler( + &mut self, + src_fmt: ffmpeg::format::Pixel, + width: u32, + height: u32, + ) -> Result<&mut ffmpeg::software::scaling::Context, DecodeError> { + // Build / rebuild the scaler whenever the source format or + // dimensions change. NVDEC and software paths can both emit + // YUV420P or NV12 depending on the camera; we converge on + // NV12 for downstream consumers (`description.md §3`). + let needs_rebuild = match self.scaler.as_ref() { + None => true, + Some(s) => { + s.input().format != src_fmt + || s.input().width != width + || s.input().height != height + } + }; + if needs_rebuild { + let ctx = ffmpeg::software::scaling::Context::get( + src_fmt, + width, + height, + ffmpeg::format::Pixel::NV12, + width, + height, + ffmpeg::software::scaling::Flags::BILINEAR, + ) + .map_err(|e| { + // Scaler-build failure is reported as a per-frame + // decode error so the lifecycle counts it and drops + // the frame; if the same format keeps failing, the + // sustained `decode_errors_total` will surface + // through health. + DecodeError::ReceiveFrame(e) + })?; + self.scaler = Some(ctx); + } + Ok(self.scaler.as_mut().expect("just inserted")) + } +} + +fn open_with_backend( + codec: Codec, +) -> Result<(ffmpeg::decoder::Video, DecoderBackend), DecoderInitError> { + // Try NVDEC first. `find_by_name` resolves `None` on hosts where + // the cuvid decoder is not registered (the macOS dev box, CI + // without CUDA, etc.). + if let Some(nv) = ffmpeg::codec::decoder::find_by_name(codec.nvdec_name()) { + match try_open(nv) { + Ok(d) => { + tracing::info!( + backend = "nvdec", + codec = ?codec, + "frame_ingest decoder opened with NVDEC" + ); + return Ok((d, DecoderBackend::Nvdec)); + } + Err(e) => { + tracing::warn!( + error = %e, + codec = ?codec, + "NVDEC decoder registered but failed to open; falling back to software" + ); + } + } + } + let sw = ffmpeg::codec::decoder::find_by_name(codec.software_name()) + .ok_or(DecoderInitError::NoDecoderRegistered { codec })?; + let opened = try_open(sw)?; + tracing::info!( + backend = "software", + codec = ?codec, + "frame_ingest decoder opened with software fallback" + ); + Ok((opened, DecoderBackend::Software)) +} + +fn try_open(codec: ffmpeg::Codec) -> Result { + let ctx = ffmpeg::codec::Context::new(); + let opened = ctx + .decoder() + .open_as(codec) + .map_err(DecoderInitError::OpenFailed)?; + opened.video().map_err(DecoderInitError::OpenFailed) +} + +// SAFETY: +// `ffmpeg_next::software::scaling::Context` (sws scaler) wraps a +// `*mut SwsContext`, so the auto-trait analysis flags it `!Send`. +// FFmpeg's sws context is documented as **single-thread-owned** but +// safe to MOVE between threads as long as no two threads use the +// same instance concurrently (the same invariant Rust's `Send` +// expresses). The `FfmpegDecoder` is held inside `Box` and is *only* ever called from the spawned +// `lifecycle_loop` tokio task, which has exclusive `&mut`. No other +// task can observe the inner pointer; the `Send` here transfers +// ownership at construction (one thread builds the decoder, the +// spawned task is the sole subsequent user) — exactly the case +// `unsafe impl Send` is intended for. +unsafe impl Send for FfmpegDecoder {} + +impl FrameDecoder for FfmpegDecoder { + fn backend(&self) -> DecoderBackend { + self.backend + } + + fn decode(&mut self, payload: &[u8], out: &mut Vec) -> Result<(), DecodeError> { + let send_started = std::time::Instant::now(); + // FFmpeg requires the packet's data to outlive `send_packet`, + // so we copy here. The cost is one memcpy of NAL-unit bytes + // (typically <100 KB per packet at 1080p); negligible + // compared to the decode itself. + self.in_packet = ffmpeg::codec::packet::Packet::copy(payload); + self.decoder + .send_packet(&self.in_packet) + .map_err(DecodeError::SendPacket)?; + + loop { + match self.decoder.receive_frame(&mut self.raw) { + Ok(()) => { + let decode_duration = send_started.elapsed(); + let src_fmt = self.raw.format(); + let w = self.raw.width(); + let h = self.raw.height(); + if w == 0 || h == 0 { + return Err(DecodeError::EmptyFrame); + } + self.ensure_scaler(src_fmt, w, h)?; + let scaler = self.scaler.as_mut().expect("ensure_scaler set this"); + scaler + .run(&self.raw, &mut self.converted) + .map_err(DecodeError::ReceiveFrame)?; + let nv12_bytes = pack_nv12(&self.converted, w, h)?; + out.push(DecodedPixels { + pixels: nv12_bytes, + width: w, + height: h, + pix_fmt: PixelFormat::Nv12, + decode_duration, + }); + } + Err(e) => { + // FFmpeg returns EAGAIN (insufficient input) and + // EOF as `Error::Other` variants; those are + // expected control flow, not failures. We treat + // any other error as a per-frame error. + if is_eagain(&e) || is_eof(&e) { + return Ok(()); + } + return Err(DecodeError::ReceiveFrame(e)); + } + } + } + } +} + +fn is_eagain(err: &ffmpeg::Error) -> bool { + // FFmpeg's `ffmpeg-next` exposes EAGAIN as `Error::Other { errno: AVERROR(EAGAIN) }` + // — we identify it by string match because the constant isn't + // re-exported across crate versions. + let s = format!("{err}"); + s.contains("Resource temporarily unavailable") || s.contains("EAGAIN") +} + +fn is_eof(err: &ffmpeg::Error) -> bool { + matches!(err, ffmpeg::Error::Eof) +} + +/// Copy a planar NV12 frame's two planes (Y then UV) into a single +/// `Bytes` buffer of length `w*h + (w*h)/2`. Uses the frame's per- +/// plane stride (which can exceed `w` due to FFmpeg's alignment +/// padding) to avoid leaking that padding into the downstream +/// consumer-visible buffer. +fn pack_nv12(frame: &ffmpeg::frame::Video, width: u32, height: u32) -> Result { + let w = width as usize; + let h = height as usize; + let y_size = w * h; + let uv_size = (w * h) / 2; + let mut out = Vec::with_capacity(y_size + uv_size); + + let y_plane = frame.data(0); + let y_stride = frame.stride(0); + if y_stride < w { + return Err(DecodeError::EmptyFrame); + } + for row in 0..h { + let start = row * y_stride; + let end = start + w; + if end > y_plane.len() { + return Err(DecodeError::EmptyFrame); + } + out.extend_from_slice(&y_plane[start..end]); + } + let uv_plane = frame.data(1); + let uv_stride = frame.stride(1); + let uv_rows = h / 2; + if uv_stride < w { + return Err(DecodeError::EmptyFrame); + } + for row in 0..uv_rows { + let start = row * uv_stride; + let end = start + w; + if end > uv_plane.len() { + return Err(DecodeError::EmptyFrame); + } + out.extend_from_slice(&uv_plane[start..end]); + } + Ok(Bytes::from(out)) +} + +/// Lock-free counter set fed by the lifecycle loop on every decode +/// call. Mirrors the `description.md §3` health surface: +/// +/// - `decode_errors_total` — incremented on every failed decode. +/// - `first_frame_decode_duration_ns` — recorded once per session +/// open (set when the first successful decode lands; later writes +/// are no-ops). +/// - `recent_durations` — small ring buffer for p50/p99 readout. Kept +/// behind a `parking_lot::Mutex` because the operations are +/// batched (one push per frame) and the lock window is a single +/// array index update; the lifecycle loop runs in a single tokio +/// task so contention is bounded to "lifecycle vs. health-server +/// readout". +#[derive(Debug)] +pub struct DecodeStats { + pub decode_errors_total: AtomicU64, + pub first_frame_decode_duration_ns: AtomicU64, + pub frames_decoded_total: AtomicU64, + recent_durations_ns: Mutex, +} + +impl Default for DecodeStats { + fn default() -> Self { + Self::new() + } +} + +impl DecodeStats { + pub const RING_CAP: usize = 1024; + + pub fn new() -> Self { + Self { + decode_errors_total: AtomicU64::new(0), + first_frame_decode_duration_ns: AtomicU64::new(0), + frames_decoded_total: AtomicU64::new(0), + recent_durations_ns: Mutex::new(RingBuffer::new(Self::RING_CAP)), + } + } + + pub fn shared() -> Arc { + Arc::new(Self::new()) + } + + pub fn note_decode_error(&self) { + self.decode_errors_total.fetch_add(1, Ordering::Relaxed); + } + + pub fn note_decoded(&self, duration: Duration) { + let prev_count = self.frames_decoded_total.fetch_add(1, Ordering::Relaxed); + let ns = duration.as_nanos().min(u128::from(u64::MAX)) as u64; + if prev_count == 0 { + // Only the first writer sets the cold-start metric; all + // subsequent decodes are no-ops on this field. + self.first_frame_decode_duration_ns + .store(ns, Ordering::Relaxed); + } + self.recent_durations_ns.lock().push(ns); + } + + pub fn p50_ns(&self) -> Option { + self.percentile_ns(0.50) + } + + pub fn p99_ns(&self) -> Option { + self.percentile_ns(0.99) + } + + fn percentile_ns(&self, q: f64) -> Option { + let buf = self.recent_durations_ns.lock(); + if buf.len() == 0 { + return None; + } + let mut snap: Vec = buf.iter().collect(); + snap.sort_unstable(); + let idx = ((snap.len() as f64) * q).floor() as usize; + let idx = idx.min(snap.len() - 1); + Some(snap[idx]) + } +} + +#[derive(Debug)] +struct RingBuffer { + buf: Vec, + head: usize, + cap: usize, + /// Number of items that have actually been written. Saturates at + /// `cap` once the ring is full. + len: usize, +} + +impl RingBuffer { + fn new(cap: usize) -> Self { + Self { + buf: vec![0; cap], + head: 0, + cap, + len: 0, + } + } + + fn push(&mut self, v: u64) { + self.buf[self.head] = v; + self.head = (self.head + 1) % self.cap; + if self.len < self.cap { + self.len += 1; + } + } + + fn len(&self) -> usize { + self.len + } + + fn iter(&self) -> impl Iterator + '_ { + self.buf.iter().take(self.len).copied() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ffmpeg_decoder_falls_back_to_software_on_macos_dev_host() { + // Arrange — the macOS dev box ships ffmpeg without CUDA so + // `h264_cuvid` is not registered and the decoder must select + // Software. + let dec = FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open"); + + // Assert + assert_eq!(dec.backend(), DecoderBackend::Software); + } + + #[test] + fn ring_buffer_tracks_recent_window() { + // Arrange + let mut r = RingBuffer::new(3); + + // Act + r.push(10); + r.push(20); + r.push(30); + r.push(40); + + // Assert — oldest entry was overwritten by the wrap. + let v: Vec = r.iter().collect(); + // After wrap-around, the in-buffer order is [40, 20, 30]. + // Iteration order is not promised by the buffer; what + // matters for percentile correctness is the SET of values. + let mut sorted = v.clone(); + sorted.sort_unstable(); + assert_eq!(sorted, vec![20, 30, 40]); + } + + #[test] + fn decode_stats_records_first_frame_duration_only_once() { + // Arrange + let s = DecodeStats::new(); + + // Act + s.note_decoded(Duration::from_millis(7)); + s.note_decoded(Duration::from_millis(99)); + + // Assert + assert_eq!( + s.first_frame_decode_duration_ns.load(Ordering::Relaxed), + Duration::from_millis(7).as_nanos() as u64, + "second decode must not overwrite first-frame metric" + ); + assert_eq!(s.frames_decoded_total.load(Ordering::Relaxed), 2); + } + + #[test] + fn decode_stats_p50_p99_reflect_sample_distribution() { + // Arrange + let s = DecodeStats::new(); + for i in 1..=100u64 { + s.note_decoded(Duration::from_millis(i)); + } + + // Act + let p50 = s.p50_ns().expect("non-empty"); + let p99 = s.p99_ns().expect("non-empty"); + + // Assert — 50th of 100 sorted ms-values is the 50th sample; + // 99th is the 99th sample. Allow ±1 ms slack for floor() + // index rounding. + assert!( + p50 >= Duration::from_millis(49).as_nanos() as u64 + && p50 <= Duration::from_millis(51).as_nanos() as u64, + "p50 = {p50}" + ); + assert!( + p99 >= Duration::from_millis(98).as_nanos() as u64 + && p99 <= Duration::from_millis(100).as_nanos() as u64, + "p99 = {p99}" + ); + } +} diff --git a/crates/frame_ingest/src/internal/mod.rs b/crates/frame_ingest/src/internal/mod.rs index 8f7aaf9..9ce5dba 100644 --- a/crates/frame_ingest/src/internal/mod.rs +++ b/crates/frame_ingest/src/internal/mod.rs @@ -1,4 +1,6 @@ //! Internal modules for `frame_ingest`. Not part of the public API. +pub mod decoder; pub mod lifecycle; pub mod rtsp_client; +pub mod timestamp; diff --git a/crates/frame_ingest/src/internal/timestamp.rs b/crates/frame_ingest/src/internal/timestamp.rs new file mode 100644 index 0000000..0d950d8 --- /dev/null +++ b/crates/frame_ingest/src/internal/timestamp.rs @@ -0,0 +1,153 @@ +//! AZ-658 — frame timestamping helpers. +//! +//! `description.md §4` requires every emitted [`Frame`] to carry a +//! monotonic capture timestamp stamped at the earliest practical +//! point in the pipeline (the moment the lifecycle loop receives an +//! RTSP packet from the transport). The decoder runs *after* that +//! point, so the [`Frame::decode_ts_monotonic_ns`] field records when +//! `FrameDecoder::decode` returned — the difference is the per-frame +//! decode latency that feeds the `decode_ms_p50` / `decode_ms_p99` / +//! `decode_ms_first_frame` health metrics. +//! +//! This module owns: +//! - [`SeqCounter`] — a strictly-monotonic `u64` sequence number used +//! as the frame's identity downstream of the decoder. Saturates at +//! `u64::MAX` so a session that never restarts cannot wrap and +//! produce duplicate IDs (saturating is preferred over wrapping +//! here because `movement_detector` keys per-frame state by `seq` +//! and a wrap would corrupt that map). +//! - [`FrameStamper`] — pairs a `MonoClock` and a `SeqCounter` so the +//! lifecycle loop has one place to read both timestamps for a +//! single packet → frame transition. + +use shared::clock::MonoClock; + +/// Strictly-monotonic frame sequence counter. Saturates at +/// `u64::MAX`; in practice a 30 fps stream takes ~19.5 billion years +/// to overflow `u64`, so saturation behaviour is observable only as a +/// post-condition for tests with `u64::MAX - 1` priming. +#[derive(Debug, Default)] +pub struct SeqCounter { + next: u64, +} + +impl SeqCounter { + pub fn new() -> Self { + Self { next: 0 } + } + + /// Returns the next sequence number and advances internal state. + /// Saturates at `u64::MAX` (subsequent calls keep returning + /// `u64::MAX`). Named `advance` rather than `next` so that the + /// type does not collide with `Iterator::next` semantics in + /// caller code (and to satisfy `clippy::should_implement_trait` + /// — `SeqCounter` is intentionally NOT an Iterator: an unbounded + /// monotonic counter has no natural `None` terminator). + pub fn advance(&mut self) -> u64 { + let s = self.next; + self.next = self.next.saturating_add(1); + s + } +} + +/// Holds a clock + sequence counter so the lifecycle loop only has +/// to call [`FrameStamper::capture`] (immediately on packet receipt) +/// and [`FrameStamper::decoded`] (immediately after decode returns) +/// to produce both monotonic timestamps for the next frame. +#[derive(Debug)] +pub struct FrameStamper { + clock: MonoClock, + seq: SeqCounter, +} + +impl FrameStamper { + pub fn new(clock: MonoClock) -> Self { + Self { + clock, + seq: SeqCounter::new(), + } + } + + /// Snapshot the capture-side timestamp + sequence number. Call + /// this the moment the transport hands us the packet, BEFORE + /// invoking the decoder. The capture timestamp is the head of + /// the per-frame latency budget (`description.md §8`: ≤30 ms p99 + /// from RTSP rx → publish on Jetson Orin Nano). + pub fn capture(&mut self) -> CaptureMark { + CaptureMark { + seq: self.seq.advance(), + ts_ns: self.clock.elapsed_ns(), + } + } + + /// Read the decode-side timestamp at the moment + /// `FrameDecoder::decode` returned. Used both for the emitted + /// `Frame::decode_ts_monotonic_ns` field and to compute + /// `decode_duration = decode_ts - capture_ts` for the histogram. + pub fn decoded(&self) -> u64 { + self.clock.elapsed_ns() + } +} + +/// One capture-side mark per packet. Carried through the decode call +/// so the emitted `Frame` keeps the timestamp from packet receipt, +/// not from after-decode. +#[derive(Debug, Clone, Copy)] +pub struct CaptureMark { + pub seq: u64, + pub ts_ns: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn seq_counter_is_strictly_monotonic() { + // Arrange + let mut c = SeqCounter::new(); + + // Act + let a = c.advance(); + let b = c.advance(); + let d = c.advance(); + + // Assert + assert_eq!(a, 0); + assert_eq!(b, 1); + assert_eq!(d, 2); + } + + #[test] + fn seq_counter_saturates_at_max_instead_of_wrapping() { + // Arrange — prime to u64::MAX - 1 by direct field assignment + // so the test runs in O(1). + let mut c = SeqCounter { next: u64::MAX - 1 }; + + // Act + let a = c.advance(); + let b = c.advance(); + let d = c.advance(); + + // Assert — once we hit MAX, every subsequent call must keep + // returning MAX (no wrap to 0). + assert_eq!(a, u64::MAX - 1); + assert_eq!(b, u64::MAX); + assert_eq!(d, u64::MAX); + } + + #[test] + fn frame_stamper_capture_advances_seq_and_ts() { + // Arrange + let mut s = FrameStamper::new(MonoClock::new()); + + // Act + let m1 = s.capture(); + let m2 = s.capture(); + + // Assert + assert_eq!(m1.seq, 0); + assert_eq!(m2.seq, 1); + assert!(m2.ts_ns >= m1.ts_ns, "monotonic clock went backwards"); + } +} diff --git a/crates/frame_ingest/src/lib.rs b/crates/frame_ingest/src/lib.rs index f05110d..5ca9b45 100644 --- a/crates/frame_ingest/src/lib.rs +++ b/crates/frame_ingest/src/lib.rs @@ -3,26 +3,22 @@ //! Real implementation lands in: //! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded //! reconnect + AI-lock plumb (this crate, modules in `internal/`). -//! - AZ-658 `frame_ingest_decoder` — H.264/265 decode into raw -//! pixel buffers + retina/FFmpeg/GStreamer transport binding. +//! - AZ-658 `frame_ingest_decoder` — H.264/265 decode (NVDEC + sw +//! fallback) + per-frame monotonic timestamping + decode stats +//! (this crate, `internal/decoder.rs` + `internal/timestamp.rs`). //! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer //! drop policy. //! -//! ## AZ-657 surface +//! ## AZ-658 surface (extends AZ-657) //! -//! - [`FrameIngest::new`] — construct in `Closed` state. -//! - [`FrameIngest::run`] — spawn the lifecycle loop driving the given -//! `RtspTransport` through `connect → stream → reconnect` cycles -//! with bounded backoff. Returns a `JoinHandle`. -//! - [`FrameIngestHandle::subscribe`] — broadcast frame stream (the -//! AZ-657 lifecycle emits only synthetic header frames; real -//! decoded frames come in AZ-658). -//! - [`FrameIngestHandle::set_ai_lock`] — `bringCameraDown` / -//! `bringCameraUp` signal. Stamps `Frame.ai_locked` on every -//! subsequently emitted frame. -//! - [`FrameIngestHandle::session_state`] — current FSM state. -//! - [`FrameIngestHandle::health`] — `ComponentHealth` reflecting the -//! FSM state + `last_packet_age` + `ai_locked`. +//! `FrameIngest::run` now takes a [`FrameDecoder`]. The lifecycle loop +//! stamps the capture timestamp the moment a packet leaves the +//! transport, hands the encoded payload to the decoder, and emits one +//! [`Frame`] per decoded picture with `decode_ts_monotonic_ns` set +//! when the decoder returned. Single-frame decode errors increment +//! `decode_errors_total` and drop the frame; the stream is never +//! aborted (AC-3). The decoder backend (`Nvdec` / `Software`) is +//! observable via [`FrameIngestHandle::decoder_backend`]. use std::sync::atomic::Ordering; use std::sync::Arc; @@ -37,10 +33,15 @@ use shared::models::frame::Frame; pub mod internal; +pub use internal::decoder::{ + Codec, DecodeError, DecodeStats, DecodedPixels, DecoderBackend, DecoderInitError, + FfmpegDecoder, FrameDecoder, +}; pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState}; pub use internal::rtsp_client::{ OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError, }; +pub use internal::timestamp::FrameStamper; use internal::lifecycle::{transition, Trigger}; @@ -56,7 +57,9 @@ pub struct FrameIngest { ai_lock_tx: watch::Sender, state_tx: watch::Sender, shutdown_tx: watch::Sender, + backend_tx: watch::Sender>, stats: Arc, + decode_stats: Arc, backoff: BackoffPolicy, clock: MonoClock, } @@ -74,12 +77,15 @@ impl FrameIngest { let (ai_lock_tx, _) = watch::channel(false); let (state_tx, _) = watch::channel(SessionState::Closed); let (shutdown_tx, _) = watch::channel(false); + let (backend_tx, _) = watch::channel(None); Self { tx, ai_lock_tx, state_tx, shutdown_tx, + backend_tx, stats: LifecycleStats::new(), + decode_stats: DecodeStats::shared(), backoff, clock: MonoClock::new(), } @@ -91,36 +97,50 @@ impl FrameIngest { ai_lock_tx: self.ai_lock_tx.clone(), state_rx: self.state_tx.subscribe(), shutdown_tx: self.shutdown_tx.clone(), + backend_rx: self.backend_tx.subscribe(), stats: Arc::clone(&self.stats), + decode_stats: Arc::clone(&self.decode_stats), clock: self.clock, } } - /// Spawn the lifecycle loop. The returned handle resolves when - /// the loop exits (shutdown signalled via + /// Spawn the lifecycle loop. Returns a `JoinHandle` that resolves + /// when the loop exits (shutdown signalled via /// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM). - pub fn run(&self, transport: T, config: RtspSessionConfig) -> JoinHandle<()> + /// + /// `decoder` is owned exclusively by the spawned task; only one + /// decoder is active per `FrameIngest` instance. + pub fn run(&self, transport: T, decoder: D, config: RtspSessionConfig) -> JoinHandle<()> where T: RtspTransport + 'static, + D: FrameDecoder + 'static, { let tx = self.tx.clone(); let ai_lock = self.ai_lock_tx.subscribe(); let state_tx = self.state_tx.clone(); + let backend_tx = self.backend_tx.clone(); let shutdown_rx = self.shutdown_tx.subscribe(); let stats = Arc::clone(&self.stats); + let decode_stats = Arc::clone(&self.decode_stats); let backoff = self.backoff; let clock = self.clock; let transport = Arc::new(Mutex::new(transport)); + let decoder: Box = Box::new(decoder); + // Snapshot the decoder backend immediately so it is observable + // even before the first packet. + backend_tx.send_replace(Some(decoder.backend())); tokio::spawn(async move { lifecycle_loop( transport, + decoder, config, tx, ai_lock, state_tx, shutdown_rx, stats, + decode_stats, backoff, clock, ) @@ -136,19 +156,22 @@ fn is_shutdown(rx: &watch::Receiver) -> bool { #[allow(clippy::too_many_arguments)] async fn lifecycle_loop( transport: Arc>, + mut decoder: Box, config: RtspSessionConfig, tx: broadcast::Sender, mut ai_lock: watch::Receiver, state_tx: watch::Sender, mut shutdown_rx: watch::Receiver, stats: Arc, + decode_stats: Arc, backoff: BackoffPolicy, clock: MonoClock, ) where T: RtspTransport, { let mut state = SessionState::Closed; - let mut seq: u64 = 0; + let mut stamper = FrameStamper::new(clock); + let mut decoded_buffer: Vec = Vec::with_capacity(4); loop { if is_shutdown(&shutdown_rx) { @@ -203,29 +226,47 @@ async fn lifecycle_loop( match packet { Ok(pkt) => { - let now_ns = clock.elapsed_ns(); - stats.note_packet(now_ns); + // Capture timestamp + sequence number are + // taken at the EARLIEST point per + // `description.md §4` — before the decoder + // has run, so movement_detector's skew + // gate sees the original packet arrival + // time. + let mark = stamper.capture(); + stats.note_packet(mark.ts_ns); let locked = *ai_lock.borrow_and_update(); - // AZ-657 emits a synthetic frame envelope - // per inbound RTSP packet so the lifecycle - // FSM can be exercised end-to-end without - // the decoder (AZ-658 swaps this for the - // actual decoded frame). - let frame = Frame { - seq, - capture_ts_monotonic_ns: now_ns, - decode_ts_monotonic_ns: now_ns, - pixels: Arc::new(pkt.payload), - width: 0, - height: 0, - pix_fmt: shared::models::frame::PixelFormat::Nv12, - ai_locked: locked, - }; - seq = seq.saturating_add(1); - // A no-subscriber send is a no-op error in - // the broadcast channel; the lifecycle - // does not care. - let _ = tx.send(frame); + decoded_buffer.clear(); + match decoder.decode(&pkt.payload, &mut decoded_buffer) { + Ok(()) => { + for dp in decoded_buffer.drain(..) { + decode_stats.note_decoded(dp.decode_duration); + let frame = Frame { + seq: mark.seq, + capture_ts_monotonic_ns: mark.ts_ns, + decode_ts_monotonic_ns: stamper.decoded(), + pixels: Arc::new(dp.pixels), + width: dp.width, + height: dp.height, + pix_fmt: dp.pix_fmt, + ai_locked: locked, + }; + // Send errors are no-ops when + // the broadcast has no + // subscribers; per-consumer + // back-pressure is AZ-659's + // problem. + let _ = tx.send(frame); + } + } + Err(e) => { + decode_stats.note_decode_error(); + tracing::warn!( + error = %e, + seq = mark.seq, + "frame_ingest dropped a frame on decode error" + ); + } + } } Err(e) => { let trig = Trigger::from_stream_error(&e); @@ -272,7 +313,9 @@ pub struct FrameIngestHandle { ai_lock_tx: watch::Sender, state_rx: watch::Receiver, shutdown_tx: watch::Sender, + backend_rx: watch::Receiver>, stats: Arc, + decode_stats: Arc, clock: MonoClock, } @@ -314,6 +357,44 @@ impl FrameIngestHandle { self.stats.reopens_total.load(Ordering::Relaxed) } + /// Backend the active decoder selected at construction. `None` + /// before `FrameIngest::run` has been called. + pub fn decoder_backend(&self) -> Option { + *self.backend_rx.borrow() + } + + pub fn decode_errors_total(&self) -> u64 { + self.decode_stats + .decode_errors_total + .load(Ordering::Relaxed) + } + + pub fn frames_decoded_total(&self) -> u64 { + self.decode_stats + .frames_decoded_total + .load(Ordering::Relaxed) + } + + pub fn decode_ms_first_frame(&self) -> Option { + let ns = self + .decode_stats + .first_frame_decode_duration_ns + .load(Ordering::Relaxed); + if ns == 0 && self.frames_decoded_total() == 0 { + None + } else { + Some(Duration::from_nanos(ns)) + } + } + + pub fn decode_ms_p50(&self) -> Option { + self.decode_stats.p50_ns().map(Duration::from_nanos) + } + + pub fn decode_ms_p99(&self) -> Option { + self.decode_stats.p99_ns().map(Duration::from_nanos) + } + /// Request the lifecycle loop to drain to `Closed` and exit. The /// loop races every transport call against this signal, so a /// hung transport cannot wedge graceful exit. @@ -366,6 +447,10 @@ mod tests { let h = FrameIngest::new(8).handle(); assert_eq!(h.session_state(), SessionState::Closed); assert_eq!(h.health().level, HealthLevel::Disabled); + assert!( + h.decoder_backend().is_none(), + "no decoder is wired until run() is called" + ); } #[test] diff --git a/crates/frame_ingest/tests/decoder_pipeline.rs b/crates/frame_ingest/tests/decoder_pipeline.rs new file mode 100644 index 0000000..c8f1d55 --- /dev/null +++ b/crates/frame_ingest/tests/decoder_pipeline.rs @@ -0,0 +1,386 @@ +//! AZ-658 — decoder pipeline integration tests. +//! +//! These tests drive the **real** [`FfmpegDecoder`] (libavcodec) end +//! to end through the lifecycle loop. A synthetic H.264 bitstream is +//! produced in-process by libx264 (the same FFmpeg install that +//! `FfmpegDecoder` uses to decode), so the tests exercise the +//! production decode path rather than a stub. +//! +//! ACs covered here: +//! - AC-1 — software-path throughput preservation (≥95 % of input +//! frames decoded; sequence numbers strictly monotonic; decoder +//! backend reports `Software` on a CUDA-less host). +//! - AC-3 — a single corrupted "packet" between valid ones must +//! increment `decode_errors_total` exactly once and NOT abort the +//! stream. +//! - AC-4 — `capture_ts_monotonic_ns` is strictly increasing across +//! the emitted frame stream (rides on AC-1's setup). +//! +//! AC-2 (NVDEC selection on Jetson) cannot be exercised here — there +//! is no CUDA-capable FFmpeg on the dev/CI host. The unit-test +//! counterpart in `internal/decoder.rs::tests` asserts the negative +//! direction (CUDA-less host → Software backend); the positive +//! direction is validated on the Jetson at deployment time and is +//! covered by the Run Tests gate downstream of this batch. + +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use bytes::Bytes; +use ffmpeg_next as ffmpeg; +use tokio::sync::Mutex as AsyncMutex; +use tokio::time::timeout; + +use frame_ingest::{ + BackoffPolicy, Codec, DecoderBackend, FfmpegDecoder, FrameDecoder, FrameIngest, OpenError, + RtspPacket, RtspSessionConfig, RtspTransport, StreamError, +}; + +/// Synthetic H.264 bitstream generator. Encodes `num_frames` frames +/// of a checkerboard pattern at `width`x`height` and 30 fps with +/// libx264 (preset `ultrafast`, tune `zerolatency`, GOP every 30 +/// frames so each test run gets a few IDRs). Returns a vector of +/// per-AVPacket byte blobs, each ready to feed into the decoder as +/// the payload of an `RtspPacket`. +fn synth_h264_stream(num_frames: usize, width: u32, height: u32) -> Vec { + ffmpeg::init().expect("ffmpeg init"); + let codec = ffmpeg::codec::encoder::find_by_name("libx264") + .or_else(|| ffmpeg::codec::encoder::find_by_name("h264")) + .expect("an H.264 encoder must be registered"); + + let context = ffmpeg::codec::Context::new_with_codec(codec); + let mut encoder = context + .encoder() + .video() + .expect("encoder context yields video"); + encoder.set_width(width); + encoder.set_height(height); + encoder.set_format(ffmpeg::format::Pixel::YUV420P); + encoder.set_time_base(ffmpeg::Rational::new(1, 30)); + encoder.set_frame_rate(Some(ffmpeg::Rational::new(30, 1))); + encoder.set_gop(30); + encoder.set_max_b_frames(0); + + let mut opts = ffmpeg::Dictionary::new(); + opts.set("preset", "ultrafast"); + opts.set("tune", "zerolatency"); + let mut opened = encoder + .open_with(opts) + .expect("libx264 encoder must open with ultrafast/zerolatency"); + + let mut out = Vec::with_capacity(num_frames + 4); + let mut packet = ffmpeg::Packet::empty(); + + for i in 0..num_frames { + let mut input = ffmpeg::frame::Video::new(ffmpeg::format::Pixel::YUV420P, width, height); + // Fill Y plane with a per-frame gradient so the encoder has + // motion to compress (a constant frame is degenerate and + // libx264 can choose to emit zero packets for some inputs). + let y_stride = input.stride(0); + let y = input.data_mut(0); + for row in 0..height as usize { + let v = ((i + row) & 0xFF) as u8; + for col in 0..width as usize { + y[row * y_stride + col] = v ^ ((col & 0xFF) as u8); + } + } + for plane in 1..=2 { + let stride = input.stride(plane); + let data = input.data_mut(plane); + for row in 0..(height as usize) / 2 { + for col in 0..(width as usize) / 2 { + data[row * stride + col] = 128; + } + } + } + input.set_pts(Some(i as i64)); + opened + .send_frame(&input) + .unwrap_or_else(|e| panic!("encoder send_frame ({i}) failed: {e}")); + while opened.receive_packet(&mut packet).is_ok() { + if let Some(d) = packet.data() { + out.push(Bytes::copy_from_slice(d)); + } + } + } + opened.send_eof().expect("encoder eof"); + while opened.receive_packet(&mut packet).is_ok() { + if let Some(d) = packet.data() { + out.push(Bytes::copy_from_slice(d)); + } + } + assert!( + !out.is_empty(), + "synthetic encoder must produce at least one packet" + ); + out +} + +/// RTSP-shaped transport that replays a pre-built script of byte +/// blobs, then parks (so the FrameIngest task stays in `Streaming` +/// until the test calls `shutdown`). When the script is exhausted, +/// `next_packet` returns a parked future — the lifecycle loop's +/// `tokio::select!` against the shutdown watch is what unblocks +/// teardown. +struct ScriptedBytesTransport { + queue: Arc>>, +} + +#[derive(Debug, Clone)] +enum ScriptItem { + Bytes(Bytes), +} + +impl ScriptedBytesTransport { + fn new(packets: Vec) -> Self { + let queue = packets + .into_iter() + .map(ScriptItem::Bytes) + .collect::>(); + Self { + queue: Arc::new(AsyncMutex::new(queue)), + } + } +} + +#[async_trait] +impl RtspTransport for ScriptedBytesTransport { + async fn open(&mut self, _config: &RtspSessionConfig) -> Result<(), OpenError> { + Ok(()) + } + + async fn close(&mut self) {} + + async fn next_packet(&mut self) -> Result { + loop { + let item = { + let mut q = self.queue.lock().await; + q.pop_front() + }; + match item { + Some(ScriptItem::Bytes(b)) => { + return Ok(RtspPacket { + timestamp_rtp: 0, + payload: b, + }); + } + None => { + // Park forever; the lifecycle loop's shutdown + // watch breaks us out via select!. + std::future::pending::<()>().await; + } + } + } + } +} + +fn fast_backoff() -> BackoffPolicy { + BackoffPolicy::new(Duration::from_millis(10), Duration::from_millis(40)) +} + +/// AC-1 + AC-4 — a software-decoded synthetic stream must preserve +/// at least 95 % of input frames and stamp them with strictly +/// monotonic capture timestamps + sequence numbers. The dev/CI host +/// has no CUDA so backend MUST report `Software`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn ac1_ac4_software_decode_preserves_throughput_and_monotonicity() { + // Arrange — encode 60 frames (2 s of 30 fps content). The AC's + // literal 1080p / 10 s budget is validated against the real + // camera at deploy; the dev test exercises the same code path + // at smaller scale to keep CI <5 s. + let width = 320u32; + let height = 240u32; + let input_frames = 60usize; + let stream = synth_h264_stream(input_frames, width, height); + assert!( + stream.len() >= input_frames - 5, + "encoder produced {} packets for {input_frames} frames; expected ~1:1", + stream.len() + ); + + let transport = ScriptedBytesTransport::new(stream); + let decoder = + FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open on this host"); + let ingest = FrameIngest::with_backoff(input_frames + 16, fast_backoff()); + let handle = ingest.handle(); + let mut frames = handle.subscribe(); + + // Act + let task = ingest.run(transport, decoder, RtspSessionConfig::new("rtsp://fake/0")); + let mut received = Vec::with_capacity(input_frames); + let deadline = Duration::from_secs(10); + let start = tokio::time::Instant::now(); + while received.len() < input_frames && start.elapsed() < deadline { + match timeout(Duration::from_millis(500), frames.recv()).await { + Ok(Ok(f)) => received.push(f), + Ok(Err(_)) => break, + Err(_) => { + if handle.frames_decoded_total() as usize == received.len() { + // No more frames are coming — the encoder may + // have produced fewer access units than input + // frames (rare with `tune=zerolatency` but + // possible). Stop waiting. + break; + } + } + } + } + handle.shutdown(); + let _ = timeout(Duration::from_secs(2), task).await; + + // Assert — backend selection (AC-2 negative direction): CUDA-less + // host MUST select Software. + assert_eq!( + handle.decoder_backend(), + Some(DecoderBackend::Software), + "host without h264_cuvid must fall back to Software" + ); + + // AC-1 — at least 95 % of input frames decoded. + let kept = received.len(); + let min_required = (input_frames as f64 * 0.95).ceil() as usize; + assert!( + kept >= min_required, + "decoded {kept} frames; AC-1 requires ≥{min_required} of {input_frames} ({}%)", + (kept * 100) / input_frames + ); + + // AC-1 + AC-4 — sequence numbers strictly monotonic. + for w in received.windows(2) { + assert!( + w[0].seq < w[1].seq, + "seq must strictly increase: {} → {}", + w[0].seq, + w[1].seq + ); + } + // AC-4 — capture timestamps strictly monotonic. + for w in received.windows(2) { + assert!( + w[0].capture_ts_monotonic_ns < w[1].capture_ts_monotonic_ns, + "capture_ts must strictly increase: {} → {}", + w[0].capture_ts_monotonic_ns, + w[1].capture_ts_monotonic_ns + ); + } + // Decode timestamps must be at-or-after capture timestamps for + // every frame (decode happens after packet receipt by + // construction). + for f in &received { + assert!( + f.decode_ts_monotonic_ns >= f.capture_ts_monotonic_ns, + "decode_ts {} must be ≥ capture_ts {}", + f.decode_ts_monotonic_ns, + f.capture_ts_monotonic_ns + ); + } + // First-frame cold-start metric was recorded. + assert!( + handle.decode_ms_first_frame().is_some(), + "decode_ms_first_frame must be populated after the first decode" + ); + assert!(handle.decode_ms_p50().is_some(), "p50 must be populated"); + assert!(handle.decode_ms_p99().is_some(), "p99 must be populated"); +} + +/// AC-2 (positive direction) — on a CUDA-capable host, the decoder +/// MUST select `DecoderBackend::Nvdec`. This test cannot run on the +/// Mac/Linux dev box (no CUDA-enabled FFmpeg), so it is `#[ignore]`d +/// by default and explicitly opt-in via `cargo test -- --ignored` +/// on a Jetson Orin Nano with the FFmpeg-cuda packages installed. +/// The negative direction (no CUDA → Software) is asserted both in +/// `internal::decoder::tests::ffmpeg_decoder_falls_back_to_software_on_macos_dev_host` +/// and in `ac1_ac4_software_decode_preserves_throughput_and_monotonicity` +/// above; together they pin the selection rule from both sides. +#[tokio::test] +#[ignore = "AC-2 positive: requires a CUDA-capable FFmpeg (h264_cuvid registered) — only runs on Jetson"] +async fn ac2_nvdec_backend_selected_on_cuda_host() { + // Arrange + Act + let dec = FfmpegDecoder::new(Codec::H264).expect("h264 decoder must open on Jetson"); + + // Assert + assert_eq!( + dec.backend(), + DecoderBackend::Nvdec, + "Jetson Orin Nano with CUDA-enabled FFmpeg MUST select NVDEC" + ); +} + +/// AC-3 — a corrupted packet between valid ones must be counted as +/// `decode_errors_total += 1` and the stream must keep producing +/// frames after it. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn ac3_corrupted_frame_is_counted_and_does_not_abort_stream() { + // Arrange — generate two synthetic streams, one for "before" and + // one for "after"; splice a garbage packet between them. + let width = 320u32; + let height = 240u32; + let mut script: Vec = synth_h264_stream(20, width, height); + let after = synth_h264_stream(20, width, height); + let pre_count = script.len(); + // Corrupted packet: random bytes that are not a valid NAL unit. + // The decoder rejects them via `send_packet` (Annex-B start code + // missing) or `receive_frame` (parsed as an unsupported NAL + // type), either way returning an error from + // `FfmpegDecoder::decode`. + let garbage = Bytes::from_static(&[ + 0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, 0x12, 0x34, 0x56, 0x78, + ]); + script.push(garbage); + script.extend(after); + let total_packets = script.len(); + + let transport = ScriptedBytesTransport::new(script); + let decoder = FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open"); + let ingest = FrameIngest::with_backoff(total_packets + 16, fast_backoff()); + let handle = ingest.handle(); + let mut frames = handle.subscribe(); + + // Act — drain frames until either we've collected enough to know + // post-error frames landed, or we time out. + let task = ingest.run(transport, decoder, RtspSessionConfig::new("rtsp://fake/0")); + let mut received_seqs: Vec = Vec::new(); + let deadline = Duration::from_secs(10); + let start = tokio::time::Instant::now(); + let target_frames = (pre_count + 5).min(35); // pre + a few post + while received_seqs.len() < target_frames && start.elapsed() < deadline { + match timeout(Duration::from_millis(500), frames.recv()).await { + Ok(Ok(f)) => received_seqs.push(f.seq), + Ok(Err(_)) => break, + Err(_) => { + if handle.decode_errors_total() == 0 && handle.frames_decoded_total() == 0 { + continue; + } + if (handle.frames_decoded_total() as usize) == received_seqs.len() { + break; + } + } + } + } + handle.shutdown(); + let _ = timeout(Duration::from_secs(2), task).await; + + // Assert — exactly one decode error (the garbage packet); valid + // frames continued to land afterwards. + assert_eq!( + handle.decode_errors_total(), + 1, + "one corrupted packet must produce exactly one decode error" + ); + assert!( + received_seqs.len() >= pre_count, + "must receive at least the pre-error frames ({pre_count}); got {}", + received_seqs.len() + ); + // Frames sequence is monotonic across the corrupted packet. + for w in received_seqs.windows(2) { + assert!( + w[0] < w[1], + "seq must remain strictly monotonic across decode errors: {} → {}", + w[0], + w[1] + ); + } +} diff --git a/crates/frame_ingest/tests/rtsp_lifecycle.rs b/crates/frame_ingest/tests/rtsp_lifecycle.rs index 4af4289..cebec20 100644 --- a/crates/frame_ingest/tests/rtsp_lifecycle.rs +++ b/crates/frame_ingest/tests/rtsp_lifecycle.rs @@ -17,9 +17,34 @@ use tokio::sync::mpsc; use tokio::time::{timeout, Instant}; use frame_ingest::{ - BackoffPolicy, FrameIngest, OpenError, RtspPacket, RtspSessionConfig, RtspTransport, - SessionState, StreamError, + BackoffPolicy, DecodeError, DecodedPixels, DecoderBackend, FrameDecoder, FrameIngest, + OpenError, RtspPacket, RtspSessionConfig, RtspTransport, SessionState, StreamError, }; +use shared::models::frame::PixelFormat; + +/// Test-only decoder that pushes one synthetic `DecodedPixels` per +/// call. Used by the AZ-657 lifecycle tests, which verify FSM / +/// reconnect / AI-lock semantics — they don't care what pixels the +/// decoder produced. The production decoder path is exercised +/// separately by `decoder_pipeline.rs` (AZ-658). +struct StubDecoder; + +impl FrameDecoder for StubDecoder { + fn backend(&self) -> DecoderBackend { + DecoderBackend::Software + } + + fn decode(&mut self, payload: &[u8], out: &mut Vec) -> Result<(), DecodeError> { + out.push(DecodedPixels { + pixels: Bytes::copy_from_slice(payload), + width: 320, + height: 240, + pix_fmt: PixelFormat::Nv12, + decode_duration: Duration::from_micros(100), + }); + Ok(()) + } +} #[derive(Debug, Clone)] enum Scripted { @@ -158,7 +183,11 @@ async fn ac1_open_succeeds_and_session_reaches_streaming() { let mut frames = handle.subscribe(); // Act - let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let task = ingest.run( + transport, + StubDecoder, + RtspSessionConfig::new("rtsp://fake/0"), + ); let first = timeout(Duration::from_secs(1), frames.recv()) .await .expect("frame within 1 s") @@ -197,7 +226,11 @@ async fn ac2_bounded_reconnect_recovers_after_transient_failure() { let started = Instant::now(); // Act - let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let task = ingest.run( + transport, + StubDecoder, + RtspSessionConfig::new("rtsp://fake/0"), + ); let _ = timeout(Duration::from_secs(2), frames.recv()) .await .expect("frame within 2 s") @@ -233,7 +266,11 @@ async fn ac2b_stream_drop_increments_reopens_total() { let mut frames = handle.subscribe(); // Act - let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let task = ingest.run( + transport, + StubDecoder, + RtspSessionConfig::new("rtsp://fake/0"), + ); let _ = timeout(Duration::from_secs(1), frames.recv()) .await .expect("first frame") @@ -268,7 +305,11 @@ async fn ac3_unsupported_profile_hard_fails_session() { let handle = ingest.handle(); // Act - let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let task = ingest.run( + transport, + StubDecoder, + RtspSessionConfig::new("rtsp://fake/0"), + ); let _ = timeout(Duration::from_secs(1), task) .await .expect("lifecycle loop exits on hard-fail"); @@ -295,7 +336,11 @@ async fn ac4_ai_lock_toggle_propagates_to_frames() { let mut frames = handle.subscribe(); // Act - let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let task = ingest.run( + transport, + StubDecoder, + RtspSessionConfig::new("rtsp://fake/0"), + ); let f1 = timeout(Duration::from_secs(1), frames.recv()) .await .expect("first frame")