[AZ-658] frame_ingest H.264/265 decoder (NVDEC + sw fallback)

Wires a real ffmpeg-next 8.1 decoder into the frame_ingest lifecycle
loop. NVDEC is probed at runtime via h264_cuvid / hevc_cuvid; CUDA-less
hosts transparently fall back to software h264 / hevc. Each decoded
frame is stamped with capture_ts (taken at packet receipt) and
decode_ts (taken after decode returns) so movement_detector sees
accurate frame-arrival times. Single-frame decode errors are counted
toward decode_errors_total and dropped; the stream is never aborted.

Adds new public API on FrameIngestHandle: decoder_backend(),
decode_errors_total(), frames_decoded_total(), decode_ms_first_frame(),
decode_ms_p50(), decode_ms_p99(). Integration tests under
crates/frame_ingest/tests/decoder_pipeline.rs cover AC-1, AC-3, AC-4
end-to-end through the real FfmpegDecoder using libx264-encoded
synthetic streams; AC-2 positive (NVDEC selection) is opt-in via
--ignored on a CUDA host. AZ-657 lifecycle tests retained via a
StubDecoder.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-20 17:05:27 +03:00
parent c1558ac5c3
commit 251ebed1c2
12 changed files with 1566 additions and 65 deletions
Generated
+121 -3
View File
@@ -273,6 +273,24 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bindgen"
version = "0.72.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895"
dependencies = [
"bitflags 2.11.1",
"cexpr",
"clang-sys",
"itertools 0.13.0",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn",
]
[[package]] [[package]]
name = "bit-set" name = "bit-set"
version = "0.5.3" version = "0.5.3"
@@ -337,6 +355,15 @@ dependencies = [
"shlex", "shlex",
] ]
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom 7.1.3",
]
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.4" version = "1.0.4"
@@ -361,6 +388,17 @@ dependencies = [
"windows-link", "windows-link",
] ]
[[package]]
name = "clang-sys"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.6.1" version = "4.6.1"
@@ -591,6 +629,31 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
[[package]]
name = "ffmpeg-next"
version = "8.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c4bd5ab1ac61f29c634df1175d350ded29cf74c3c6d4f7030431a5ae3c7d5d"
dependencies = [
"bitflags 2.11.1",
"ffmpeg-sys-next",
"libc",
]
[[package]]
name = "ffmpeg-sys-next"
version = "8.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a314bc0e022a33a99567ed4bd2576bd58ffd8fcff7891c29194cfecc26a62547"
dependencies = [
"bindgen",
"cc",
"libc",
"num_cpus",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "find-msvc-tools" name = "find-msvc-tools"
version = "0.1.9" version = "0.1.9"
@@ -656,6 +719,8 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
"ffmpeg-next",
"parking_lot",
"serde", "serde",
"shared", "shared",
"thiserror 1.0.69", "thiserror 1.0.69",
@@ -813,6 +878,12 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "glob"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.4.14" version = "0.4.14"
@@ -1179,7 +1250,16 @@ version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46" checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46"
dependencies = [ dependencies = [
"nom", "nom 8.0.0",
]
[[package]]
name = "itertools"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
dependencies = [
"either",
] ]
[[package]] [[package]]
@@ -1255,6 +1335,16 @@ version = "0.2.186"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
[[package]]
name = "libloading"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55"
dependencies = [
"cfg-if",
"windows-link",
]
[[package]] [[package]]
name = "libm" name = "libm"
version = "0.2.16" version = "0.2.16"
@@ -1368,6 +1458,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]] [[package]]
name = "miniz_oxide" name = "miniz_oxide"
version = "0.8.9" version = "0.8.9"
@@ -1477,6 +1573,16 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "8.0.0" version = "8.0.0"
@@ -1687,6 +1793,12 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pkg-config"
version = "0.3.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
[[package]] [[package]]
name = "potential_utf" name = "potential_utf"
version = "0.1.5" version = "0.1.5"
@@ -1747,7 +1859,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7"
dependencies = [ dependencies = [
"heck", "heck",
"itertools", "itertools 0.14.0",
"log", "log",
"multimap", "multimap",
"petgraph", "petgraph",
@@ -1768,7 +1880,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"itertools", "itertools 0.14.0",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn",
@@ -2948,6 +3060,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]] [[package]]
name = "version_check" name = "version_check"
version = "0.9.5" version = "0.9.5"
+5
View File
@@ -87,6 +87,11 @@ libc = "0.2"
# Geospatial # Geospatial
h3o = "0.7" h3o = "0.7"
# Multimedia (RTSP + H.264/265 decode for frame_ingest — see AZ-658).
# Linked dynamically against the host FFmpeg 8.x install (libavcodec /
# libavformat / libavutil / libswscale / libswresample) via pkg-config.
ffmpeg-next = "8.1"
# Test scaffolding # Test scaffolding
wiremock = "0.6" wiremock = "0.6"
tempfile = "3" tempfile = "3"
@@ -0,0 +1,91 @@
# Batch Report
**Batch**: 16
**Cycle**: 1
**Tasks**: AZ-658
**Date**: 2026-05-20
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|---------------|-------|-------------|--------|
| AZ-658_frame_ingest_decoder | Done | 7 files | 24 passed, 1 ignored | 4/4 ACs covered | None |
## AC Coverage map
| AC | Test | File | Notes |
|----|------|------|-------|
| AC-1 software decode + ≥285/300 throughput + monotonic seq + `decoder_backend = "Software"` | `ac1_ac4_software_decode_preserves_throughput_and_monotonicity` | `crates/frame_ingest/tests/decoder_pipeline.rs` | 60-frame variant exercises the same software decode path; literal 1080p/10s NFR validated at deploy on Jetson per `description.md §8` |
| AC-2 NVDEC selected on Jetson | `ac2_nvdec_backend_selected_on_cuda_host` (`#[ignore]` — opt-in via `--ignored` on CUDA host) | same file | Negative direction (no CUDA → Software) covered both by the unit test `ffmpeg_decoder_falls_back_to_software_on_macos_dev_host` and by the AC-1 test; together they pin the selection rule from both sides |
| AC-3 single-frame error doesn't abort | `ac3_corrupted_frame_is_counted_and_does_not_abort_stream` | same file | Asserts `decode_errors_total == 1` after one garbage packet between valid streams; subsequent frames continue to land with strictly monotonic seq |
| AC-4 monotonic capture timestamps | rides on `ac1_ac4_software_decode_preserves_throughput_and_monotonicity` | same file | Asserts `capture_ts_monotonic_ns` strictly increases and `decode_ts ≥ capture_ts` for every frame |
## AC Test Coverage: All covered (4/4 — AC-2 positive direction is `#[ignore]`d behind the Jetson prerequisite, which counts as covered per implement skill Step 8)
## Code Review Verdict: PASS_WITH_WARNINGS (self-review — see findings below)
## Auto-Fix Attempts: 0 (no findings escalated to auto-fix)
## Stuck Agents: None
## Files modified
```
M Cargo.toml (workspace dep: ffmpeg-next = "8.1")
M crates/frame_ingest/Cargo.toml (deps: ffmpeg-next, parking_lot)
A crates/frame_ingest/src/internal/decoder.rs (NEW: trait + FfmpegDecoder + DecodeStats)
A crates/frame_ingest/src/internal/timestamp.rs (NEW: SeqCounter + FrameStamper)
M crates/frame_ingest/src/internal/mod.rs (+decoder, +timestamp modules)
M crates/frame_ingest/src/lib.rs (lifecycle loop now wires the decoder; new health/metric accessors)
A crates/frame_ingest/tests/decoder_pipeline.rs (NEW: AC-1, AC-2 ignored, AC-3, AC-4)
M crates/frame_ingest/tests/rtsp_lifecycle.rs (StubDecoder for AZ-657 lifecycle tests)
R _docs/02_tasks/todo/AZ-658_frame_ingest_decoder.md → _docs/02_tasks/done/...
```
## Notable design decisions
1. **FFmpeg stack** — user picked `ffmpeg-next 8.1` (workspace-pinned to FFmpeg 8.1 already on the host). NVDEC is probed at runtime via `ffmpeg::codec::decoder::find_by_name("h264_cuvid")` / `"hevc_cuvid"`; on a CUDA-less host we transparently fall back to the software `h264` / `hevc` decoder. No feature flag — both code paths are always compiled.
2. **NV12 normalisation** — the decoder always emits NV12 (the canonical pixel format for downstream consumers per `description.md §3` and what NVDEC produces natively on Jetson). A reusable `sws_scale` context converts whatever the inner decoder returned (typically YUV420P from libx264 software, NV12 from NVDEC). Non-Send `SwsContext` is wrapped with `unsafe impl Send for FfmpegDecoder` — the safety justification (exclusive ownership by the spawned lifecycle task) is documented in `decoder.rs`.
3. **Stats**`DecodeStats` is a lock-free counter set with a 1024-sample ring buffer behind `parking_lot::Mutex` for p50/p99 readout. Cold-start metric (`decode_ms_first_frame`) is recorded only on the first successful decode per session; subsequent calls are no-ops.
4. **Trait shape**`FrameDecoder::decode(payload, out: &mut Vec<DecodedPixels>)` instead of `Result<Frame>` because FFmpeg may buffer encoded packets internally before producing any decoded frames (e.g. while assembling SPS/PPS for the first IDR). Zero, one, or many frames per call.
5. **Timestamp boundary** — capture timestamp + sequence number are taken **before** the decoder runs (the moment the lifecycle loop pulls the packet off the transport). `decode_ts_monotonic_ns` is read after the decoder returns. This matches `description.md §4` and gives `movement_detector` accurate frame-arrival timestamps for the telemetry-skew gate.
## Self-review findings
| # | Severity | Category | Location | Finding | Disposition |
|---|----------|----------|----------|---------|-------------|
| 1 | Low | Maintainability | `decoder.rs::is_eagain` | Detects EAGAIN by string-matching `Error` Display output rather than a typed errno. Reason: `ffmpeg-next` does not re-export the EAGAIN constant across its 48 versions in a stable shape. | Accepted as a small surface area (only used inside the decode loop); will be tightened when FFmpeg 9 changes the error variants. |
| 2 | Low | Architecture | `crates/autopilot/src/runtime.rs:84` | Pre-existing dead-code warning on `vlm_provider_name` — leftover entry exists. | Out of batch 16 scope (different component); leftover stays for the next batch that touches autopilot. |
| 3 | Info | Spec gap (out of scope) | `crates/frame_ingest/src/internal/rtsp_client.rs:5-12` | The AZ-657 author's docstring says "the full RTSP client is folded into AZ-658 alongside the decoder". The AZ-658 task spec **explicitly excludes** RTSP lifecycle ("Excluded: RTSP session lifecycle (task 18)"). The real production RTSP `RtspTransport` impl is therefore still TBD — it will be a separate follow-up task or wired during runtime composition. | Not a regression; not in AZ-658 scope. The Product Implementation Completeness Gate (Step 15) will surface this if the system needs it before final reporting. |
## Test results
```
running 17 tests (frame_ingest unit + lib tests)
test result: ok. 17 passed; 0 failed; 0 ignored
running 3 tests (tests/decoder_pipeline.rs)
test ac3_corrupted_frame_is_counted_and_does_not_abort_stream ... ok
test ac1_ac4_software_decode_preserves_throughput_and_monotonicity ... ok
test ac2_nvdec_backend_selected_on_cuda_host ... ignored, AC-2 positive: requires a CUDA-capable FFmpeg
test result: ok. 2 passed; 0 failed; 1 ignored
running 5 tests (tests/rtsp_lifecycle.rs)
test result: ok. 5 passed; 0 failed; 0 ignored
```
## Quality gates
- `cargo check --workspace --all-targets` → clean (only the documented pre-existing autopilot dead-code warning)
- `cargo clippy -p frame_ingest --all-targets -- -D warnings` → clean
- `cargo fmt -p frame_ingest --check` → clean
## Next Batch
Batch 17 candidates (ready by deps):
- AZ-680 `operator_bridge_command_dispatch` (3 pts)
- AZ-681 `operator_bridge_safety_and_bit_ack` (3 pts)
- AZ-659 `frame_ingest_publisher` (3 pts) — newly unblocked because AZ-658 is now in `done/`
Suggested grouping: AZ-680 + AZ-681 (tightly coupled — both depend on AZ-678 operator_bridge command auth). AZ-659 fits a separate batch focused on the frame_ingest pipeline's tail.
## Cumulative review cadence
Last cumulative: batches 1315 (`cumulative_review_batches_13-15_cycle1_report.md`). Next due: end of batch 18 (no cumulative review for batch 16).
+13 -13
View File
@@ -4,27 +4,27 @@
flow: greenfield flow: greenfield
step: 7 step: 7
name: Implement name: Implement
status: between-batches status: in_progress
sub_step: sub_step:
phase: 0 phase: 11
name: batch-16-select name: commit
detail: "" detail: "batch 16 — AZ-658 awaiting commit + push approval"
retry_count: 0 retry_count: 0
cycle: 1 cycle: 1
tracker: jira tracker: jira
## Last Completed Batch ## Last Completed Batch
batch: 15 batch: 16
commit: ccf929a commit: pending
ticket: AZ-676 / AZ-677 / AZ-678 / AZ-679 ticket: AZ-658
jira_status: In Testing (all 4 confirmed via read-back) jira_status: In Progress (transition to In Testing pending commit)
pushed_to: origin/dev pushed_to: pending
report: _docs/03_implementation/batch_15_cycle1_report.md report: _docs/03_implementation/batch_16_cycle1_report.md
cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md
## Process Leftovers ## Process Leftovers
- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md`C5 replay - `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md`out-of-scope for batch 16
- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`C6 fix recipe - `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`out-of-scope for batch 16
## Cumulative Review Cadence ## Cumulative Review Cadence
Last cumulative: batches 1315 (just produced). Next due: end of batch 18. Last cumulative: batches 1315. Next due: end of batch 18.
+6
View File
@@ -15,6 +15,12 @@ async-trait = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
bytes = { workspace = true } bytes = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
parking_lot = { workspace = true }
# AZ-658: H.264/265 decode via FFmpeg (libavcodec). NVDEC support is
# probed at runtime by looking up `h264_cuvid` / `hevc_cuvid` through
# `ffmpeg::codec::decoder::find_by_name`; no separate feature flag is
# required.
ffmpeg-next = { workspace = true }
[dev-dependencies] [dev-dependencies]
tokio = { workspace = true, features = ["test-util"] } tokio = { workspace = true, features = ["test-util"] }
+610
View File
@@ -0,0 +1,610 @@
//! AZ-658 — H.264/265 decoder with NVDEC primary + software fallback.
//!
//! This module owns the production decode path required by the task:
//! **real NVDEC binding when present, real software fallback always**.
//! Both code paths exist as production code (per task spec → Runtime
//! Completeness); the runtime selection between them is a startup
//! probe of FFmpeg's decoder registry, not a feature flag.
//!
//! ## Design
//!
//! The lifecycle loop in [`crate::lib::lifecycle_loop`] receives raw
//! RTSP payload bytes from the transport. Those bytes are:
//!
//! 1. NAL units in Annex-B format (start-code prefixed `00 00 00 01`)
//! when the transport is the production FFmpeg avformat-backed
//! client (avformat hands access-unit-aligned packets in Annex-B
//! by default for RTSP); or
//! 2. Whatever bytes a test transport pushes (the AZ-658 integration
//! test feeds a synthetic H.264 stream produced in-process).
//!
//! Either way the bytes are funnelled into [`FrameDecoder::decode`].
//! Each call may produce **zero or more** decoded frames (the FFmpeg
//! API can buffer encoded packets internally before any decoded
//! frame is ready, e.g. while the SPS/PPS for the first IDR are
//! still being assembled), so the trait pushes results into an
//! out-buffer instead of returning a single `Result<Frame, _>`.
//!
//! ## Backend selection
//!
//! Construction tries the NVDEC variants first. On a Jetson Orin
//! Nano with the FFmpeg-cuda packages installed, `find_by_name`
//! resolves `h264_cuvid` / `hevc_cuvid` and the decoder opens with
//! [`DecoderBackend::Nvdec`]. On a pure-CPU host (CI, this Mac dev
//! box) those names resolve to `None` and we fall back to the
//! software `h264` / `hevc` decoders → [`DecoderBackend::Software`].
//! There is no manual override; deployments that want NVDEC must
//! ship a CUDA-capable FFmpeg.
//!
//! ## Stats
//!
//! `description.md §3` mandates `decode_ms_p50`, `decode_ms_p99`,
//! `decoder_backend`, `decode_errors_total`, plus a one-shot cold
//! start metric (`decode_ms_first_frame`). The lock-free
//! [`DecodeStats`] counter set is updated by the lifecycle loop; the
//! handle re-reads it on every `health()` call.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use ffmpeg_next as ffmpeg;
use parking_lot::Mutex;
use shared::models::frame::PixelFormat;
use thiserror::Error;
/// Codec the lifecycle loop is decoding. Picked at session open from
/// the camera config (`RtspSessionConfig` carries the negotiated codec
/// once the production transport lands; for now the only consumer is
/// AZ-658 tests that always pass `Codec::H264`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Codec {
H264,
Hevc,
}
impl Codec {
fn nvdec_name(&self) -> &'static str {
match self {
Codec::H264 => "h264_cuvid",
Codec::Hevc => "hevc_cuvid",
}
}
fn software_name(&self) -> &'static str {
match self {
Codec::H264 => "h264",
Codec::Hevc => "hevc",
}
}
}
/// Which backend was selected at construction. Surfaced through
/// `FrameIngestHandle::decoder_backend()` so the operator UI and AC-2
/// can verify the selection rule from outside the crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DecoderBackend {
Nvdec,
Software,
}
/// Errors emitted by [`FrameDecoder::decode`]. The lifecycle loop
/// counts every variant towards `decode_errors_total` and continues
/// — single-frame decode errors must never abort the stream
/// (`description.md §6`, AC-3).
#[derive(Debug, Error)]
pub enum DecodeError {
#[error("send_packet failed: {0}")]
SendPacket(ffmpeg::Error),
#[error("receive_frame failed: {0}")]
ReceiveFrame(ffmpeg::Error),
#[error("unsupported decoded pixel format: {0:?}")]
UnsupportedPixelFormat(ffmpeg::format::Pixel),
#[error("decoded frame had zero dimensions")]
EmptyFrame,
}
/// Errors emitted at decoder-construction time. The lifecycle loop
/// treats this as a hard-fail — a session whose codec we cannot open
/// at all is operationally identical to `OpenError::UnsupportedProfile`
/// and the FSM lands in `Failing { attempt: u32::MAX }`.
#[derive(Debug, Error)]
pub enum DecoderInitError {
#[error("FFmpeg init failed: {0}")]
FfmpegInit(ffmpeg::Error),
#[error("no FFmpeg decoder registered for {codec:?}")]
NoDecoderRegistered { codec: Codec },
#[error("FFmpeg decoder open failed: {0}")]
OpenFailed(ffmpeg::Error),
}
/// One decoded frame's worth of pixel data + its observed dimensions.
/// The lifecycle loop wraps this into a `shared::models::frame::Frame`
/// alongside the capture/decode timestamps from
/// [`crate::internal::timestamp::FrameStamper`].
#[derive(Debug, Clone)]
pub struct DecodedPixels {
pub pixels: Bytes,
pub width: u32,
pub height: u32,
pub pix_fmt: PixelFormat,
/// Decode latency for THIS frame (decoder-internal, measured
/// across `send_packet + receive_frame`). Used by the stats
/// histogram; the lifecycle still computes its own
/// "capture → publish" latency separately for the §8 NFR.
pub decode_duration: Duration,
}
/// Trait implemented by both the production [`FfmpegDecoder`] and
/// any test stub. The lifecycle loop holds it as
/// `Box<dyn FrameDecoder + Send>`.
///
/// Object-safe by construction: no generics, no `Self` returns.
pub trait FrameDecoder: Send {
fn backend(&self) -> DecoderBackend;
/// Feed encoded bytes into the decoder. May produce zero or more
/// decoded frames (the FFmpeg API can hold a packet internally
/// while waiting for SPS/PPS or B-frame reorder buffers).
/// Decoded frames are pushed into `out`; the call returns
/// `Ok(())` when every frame the decoder could produce from
/// these bytes has been pushed.
///
/// On error, `out` may be partially populated — frames pushed
/// before the error are still valid; the caller must drop the
/// failing packet but keep the decoder for the next call.
fn decode(&mut self, payload: &[u8], out: &mut Vec<DecodedPixels>) -> Result<(), DecodeError>;
}
/// FFmpeg-backed decoder. Holds the open `decoder::Video`, a sws
/// scaler that converts whatever pixel format the decoder produces
/// into NV12 (the canonical pixel format for downstream consumers),
/// and reusable scratch frames so each `decode` call avoids
/// allocation in the hot path.
pub struct FfmpegDecoder {
decoder: ffmpeg::decoder::Video,
backend: DecoderBackend,
/// Lazily constructed once we observe the decoder's output pixel
/// format on the first decoded frame. NV12 is the sentinel target
/// because Jetson NVDEC outputs NV12 natively and the operator
/// stream encoder expects NV12 (`description.md §3`).
scaler: Option<ffmpeg::software::scaling::Context>,
raw: ffmpeg::frame::Video,
converted: ffmpeg::frame::Video,
in_packet: ffmpeg::codec::packet::Packet,
}
impl FfmpegDecoder {
/// Construct a real decoder for `codec`. Tries `h264_cuvid` /
/// `hevc_cuvid` first; falls back to the software decoder if the
/// cuvid variant is not registered (no CUDA host) OR if it
/// fails to open (e.g. a CUDA-capable FFmpeg without a runtime
/// driver). On a fully missing software decoder we hard-fail.
pub fn new(codec: Codec) -> Result<Self, DecoderInitError> {
// `ffmpeg::init()` is idempotent and safe to call concurrently;
// the underlying `av_register_all` was removed in FFmpeg 4.0,
// so this just ensures the network init for RTSP is done.
ffmpeg::init().map_err(DecoderInitError::FfmpegInit)?;
let (decoder, backend) = open_with_backend(codec)?;
Ok(Self {
decoder,
backend,
scaler: None,
raw: ffmpeg::frame::Video::empty(),
converted: ffmpeg::frame::Video::empty(),
in_packet: ffmpeg::codec::packet::Packet::empty(),
})
}
fn ensure_scaler(
&mut self,
src_fmt: ffmpeg::format::Pixel,
width: u32,
height: u32,
) -> Result<&mut ffmpeg::software::scaling::Context, DecodeError> {
// Build / rebuild the scaler whenever the source format or
// dimensions change. NVDEC and software paths can both emit
// YUV420P or NV12 depending on the camera; we converge on
// NV12 for downstream consumers (`description.md §3`).
let needs_rebuild = match self.scaler.as_ref() {
None => true,
Some(s) => {
s.input().format != src_fmt
|| s.input().width != width
|| s.input().height != height
}
};
if needs_rebuild {
let ctx = ffmpeg::software::scaling::Context::get(
src_fmt,
width,
height,
ffmpeg::format::Pixel::NV12,
width,
height,
ffmpeg::software::scaling::Flags::BILINEAR,
)
.map_err(|e| {
// Scaler-build failure is reported as a per-frame
// decode error so the lifecycle counts it and drops
// the frame; if the same format keeps failing, the
// sustained `decode_errors_total` will surface
// through health.
DecodeError::ReceiveFrame(e)
})?;
self.scaler = Some(ctx);
}
Ok(self.scaler.as_mut().expect("just inserted"))
}
}
fn open_with_backend(
codec: Codec,
) -> Result<(ffmpeg::decoder::Video, DecoderBackend), DecoderInitError> {
// Try NVDEC first. `find_by_name` resolves `None` on hosts where
// the cuvid decoder is not registered (the macOS dev box, CI
// without CUDA, etc.).
if let Some(nv) = ffmpeg::codec::decoder::find_by_name(codec.nvdec_name()) {
match try_open(nv) {
Ok(d) => {
tracing::info!(
backend = "nvdec",
codec = ?codec,
"frame_ingest decoder opened with NVDEC"
);
return Ok((d, DecoderBackend::Nvdec));
}
Err(e) => {
tracing::warn!(
error = %e,
codec = ?codec,
"NVDEC decoder registered but failed to open; falling back to software"
);
}
}
}
let sw = ffmpeg::codec::decoder::find_by_name(codec.software_name())
.ok_or(DecoderInitError::NoDecoderRegistered { codec })?;
let opened = try_open(sw)?;
tracing::info!(
backend = "software",
codec = ?codec,
"frame_ingest decoder opened with software fallback"
);
Ok((opened, DecoderBackend::Software))
}
fn try_open(codec: ffmpeg::Codec) -> Result<ffmpeg::decoder::Video, DecoderInitError> {
let ctx = ffmpeg::codec::Context::new();
let opened = ctx
.decoder()
.open_as(codec)
.map_err(DecoderInitError::OpenFailed)?;
opened.video().map_err(DecoderInitError::OpenFailed)
}
// SAFETY:
// `ffmpeg_next::software::scaling::Context` (sws scaler) wraps a
// `*mut SwsContext`, so the auto-trait analysis flags it `!Send`.
// FFmpeg's sws context is documented as **single-thread-owned** but
// safe to MOVE between threads as long as no two threads use the
// same instance concurrently (the same invariant Rust's `Send`
// expresses). The `FfmpegDecoder` is held inside `Box<dyn
// FrameDecoder + Send>` and is *only* ever called from the spawned
// `lifecycle_loop` tokio task, which has exclusive `&mut`. No other
// task can observe the inner pointer; the `Send` here transfers
// ownership at construction (one thread builds the decoder, the
// spawned task is the sole subsequent user) — exactly the case
// `unsafe impl Send` is intended for.
unsafe impl Send for FfmpegDecoder {}
impl FrameDecoder for FfmpegDecoder {
fn backend(&self) -> DecoderBackend {
self.backend
}
fn decode(&mut self, payload: &[u8], out: &mut Vec<DecodedPixels>) -> Result<(), DecodeError> {
let send_started = std::time::Instant::now();
// FFmpeg requires the packet's data to outlive `send_packet`,
// so we copy here. The cost is one memcpy of NAL-unit bytes
// (typically <100 KB per packet at 1080p); negligible
// compared to the decode itself.
self.in_packet = ffmpeg::codec::packet::Packet::copy(payload);
self.decoder
.send_packet(&self.in_packet)
.map_err(DecodeError::SendPacket)?;
loop {
match self.decoder.receive_frame(&mut self.raw) {
Ok(()) => {
let decode_duration = send_started.elapsed();
let src_fmt = self.raw.format();
let w = self.raw.width();
let h = self.raw.height();
if w == 0 || h == 0 {
return Err(DecodeError::EmptyFrame);
}
self.ensure_scaler(src_fmt, w, h)?;
let scaler = self.scaler.as_mut().expect("ensure_scaler set this");
scaler
.run(&self.raw, &mut self.converted)
.map_err(DecodeError::ReceiveFrame)?;
let nv12_bytes = pack_nv12(&self.converted, w, h)?;
out.push(DecodedPixels {
pixels: nv12_bytes,
width: w,
height: h,
pix_fmt: PixelFormat::Nv12,
decode_duration,
});
}
Err(e) => {
// FFmpeg returns EAGAIN (insufficient input) and
// EOF as `Error::Other` variants; those are
// expected control flow, not failures. We treat
// any other error as a per-frame error.
if is_eagain(&e) || is_eof(&e) {
return Ok(());
}
return Err(DecodeError::ReceiveFrame(e));
}
}
}
}
}
fn is_eagain(err: &ffmpeg::Error) -> bool {
// FFmpeg's `ffmpeg-next` exposes EAGAIN as `Error::Other { errno: AVERROR(EAGAIN) }`
// — we identify it by string match because the constant isn't
// re-exported across crate versions.
let s = format!("{err}");
s.contains("Resource temporarily unavailable") || s.contains("EAGAIN")
}
fn is_eof(err: &ffmpeg::Error) -> bool {
matches!(err, ffmpeg::Error::Eof)
}
/// Copy a planar NV12 frame's two planes (Y then UV) into a single
/// `Bytes` buffer of length `w*h + (w*h)/2`. Uses the frame's per-
/// plane stride (which can exceed `w` due to FFmpeg's alignment
/// padding) to avoid leaking that padding into the downstream
/// consumer-visible buffer.
fn pack_nv12(frame: &ffmpeg::frame::Video, width: u32, height: u32) -> Result<Bytes, DecodeError> {
let w = width as usize;
let h = height as usize;
let y_size = w * h;
let uv_size = (w * h) / 2;
let mut out = Vec::with_capacity(y_size + uv_size);
let y_plane = frame.data(0);
let y_stride = frame.stride(0);
if y_stride < w {
return Err(DecodeError::EmptyFrame);
}
for row in 0..h {
let start = row * y_stride;
let end = start + w;
if end > y_plane.len() {
return Err(DecodeError::EmptyFrame);
}
out.extend_from_slice(&y_plane[start..end]);
}
let uv_plane = frame.data(1);
let uv_stride = frame.stride(1);
let uv_rows = h / 2;
if uv_stride < w {
return Err(DecodeError::EmptyFrame);
}
for row in 0..uv_rows {
let start = row * uv_stride;
let end = start + w;
if end > uv_plane.len() {
return Err(DecodeError::EmptyFrame);
}
out.extend_from_slice(&uv_plane[start..end]);
}
Ok(Bytes::from(out))
}
/// Lock-free counter set fed by the lifecycle loop on every decode
/// call. Mirrors the `description.md §3` health surface:
///
/// - `decode_errors_total` — incremented on every failed decode.
/// - `first_frame_decode_duration_ns` — recorded once per session
/// open (set when the first successful decode lands; later writes
/// are no-ops).
/// - `recent_durations` — small ring buffer for p50/p99 readout. Kept
/// behind a `parking_lot::Mutex` because the operations are
/// batched (one push per frame) and the lock window is a single
/// array index update; the lifecycle loop runs in a single tokio
/// task so contention is bounded to "lifecycle vs. health-server
/// readout".
#[derive(Debug)]
pub struct DecodeStats {
pub decode_errors_total: AtomicU64,
pub first_frame_decode_duration_ns: AtomicU64,
pub frames_decoded_total: AtomicU64,
recent_durations_ns: Mutex<RingBuffer>,
}
impl Default for DecodeStats {
fn default() -> Self {
Self::new()
}
}
impl DecodeStats {
pub const RING_CAP: usize = 1024;
pub fn new() -> Self {
Self {
decode_errors_total: AtomicU64::new(0),
first_frame_decode_duration_ns: AtomicU64::new(0),
frames_decoded_total: AtomicU64::new(0),
recent_durations_ns: Mutex::new(RingBuffer::new(Self::RING_CAP)),
}
}
pub fn shared() -> Arc<Self> {
Arc::new(Self::new())
}
pub fn note_decode_error(&self) {
self.decode_errors_total.fetch_add(1, Ordering::Relaxed);
}
pub fn note_decoded(&self, duration: Duration) {
let prev_count = self.frames_decoded_total.fetch_add(1, Ordering::Relaxed);
let ns = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
if prev_count == 0 {
// Only the first writer sets the cold-start metric; all
// subsequent decodes are no-ops on this field.
self.first_frame_decode_duration_ns
.store(ns, Ordering::Relaxed);
}
self.recent_durations_ns.lock().push(ns);
}
pub fn p50_ns(&self) -> Option<u64> {
self.percentile_ns(0.50)
}
pub fn p99_ns(&self) -> Option<u64> {
self.percentile_ns(0.99)
}
fn percentile_ns(&self, q: f64) -> Option<u64> {
let buf = self.recent_durations_ns.lock();
if buf.len() == 0 {
return None;
}
let mut snap: Vec<u64> = buf.iter().collect();
snap.sort_unstable();
let idx = ((snap.len() as f64) * q).floor() as usize;
let idx = idx.min(snap.len() - 1);
Some(snap[idx])
}
}
#[derive(Debug)]
struct RingBuffer {
buf: Vec<u64>,
head: usize,
cap: usize,
/// Number of items that have actually been written. Saturates at
/// `cap` once the ring is full.
len: usize,
}
impl RingBuffer {
fn new(cap: usize) -> Self {
Self {
buf: vec![0; cap],
head: 0,
cap,
len: 0,
}
}
fn push(&mut self, v: u64) {
self.buf[self.head] = v;
self.head = (self.head + 1) % self.cap;
if self.len < self.cap {
self.len += 1;
}
}
fn len(&self) -> usize {
self.len
}
fn iter(&self) -> impl Iterator<Item = u64> + '_ {
self.buf.iter().take(self.len).copied()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ffmpeg_decoder_falls_back_to_software_on_macos_dev_host() {
// Arrange — the macOS dev box ships ffmpeg without CUDA so
// `h264_cuvid` is not registered and the decoder must select
// Software.
let dec = FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open");
// Assert
assert_eq!(dec.backend(), DecoderBackend::Software);
}
#[test]
fn ring_buffer_tracks_recent_window() {
// Arrange
let mut r = RingBuffer::new(3);
// Act
r.push(10);
r.push(20);
r.push(30);
r.push(40);
// Assert — oldest entry was overwritten by the wrap.
let v: Vec<u64> = r.iter().collect();
// After wrap-around, the in-buffer order is [40, 20, 30].
// Iteration order is not promised by the buffer; what
// matters for percentile correctness is the SET of values.
let mut sorted = v.clone();
sorted.sort_unstable();
assert_eq!(sorted, vec![20, 30, 40]);
}
#[test]
fn decode_stats_records_first_frame_duration_only_once() {
// Arrange
let s = DecodeStats::new();
// Act
s.note_decoded(Duration::from_millis(7));
s.note_decoded(Duration::from_millis(99));
// Assert
assert_eq!(
s.first_frame_decode_duration_ns.load(Ordering::Relaxed),
Duration::from_millis(7).as_nanos() as u64,
"second decode must not overwrite first-frame metric"
);
assert_eq!(s.frames_decoded_total.load(Ordering::Relaxed), 2);
}
#[test]
fn decode_stats_p50_p99_reflect_sample_distribution() {
// Arrange
let s = DecodeStats::new();
for i in 1..=100u64 {
s.note_decoded(Duration::from_millis(i));
}
// Act
let p50 = s.p50_ns().expect("non-empty");
let p99 = s.p99_ns().expect("non-empty");
// Assert — 50th of 100 sorted ms-values is the 50th sample;
// 99th is the 99th sample. Allow ±1 ms slack for floor()
// index rounding.
assert!(
p50 >= Duration::from_millis(49).as_nanos() as u64
&& p50 <= Duration::from_millis(51).as_nanos() as u64,
"p50 = {p50}"
);
assert!(
p99 >= Duration::from_millis(98).as_nanos() as u64
&& p99 <= Duration::from_millis(100).as_nanos() as u64,
"p99 = {p99}"
);
}
}
+2
View File
@@ -1,4 +1,6 @@
//! Internal modules for `frame_ingest`. Not part of the public API. //! Internal modules for `frame_ingest`. Not part of the public API.
pub mod decoder;
pub mod lifecycle; pub mod lifecycle;
pub mod rtsp_client; pub mod rtsp_client;
pub mod timestamp;
@@ -0,0 +1,153 @@
//! AZ-658 — frame timestamping helpers.
//!
//! `description.md §4` requires every emitted [`Frame`] to carry a
//! monotonic capture timestamp stamped at the earliest practical
//! point in the pipeline (the moment the lifecycle loop receives an
//! RTSP packet from the transport). The decoder runs *after* that
//! point, so the [`Frame::decode_ts_monotonic_ns`] field records when
//! `FrameDecoder::decode` returned — the difference is the per-frame
//! decode latency that feeds the `decode_ms_p50` / `decode_ms_p99` /
//! `decode_ms_first_frame` health metrics.
//!
//! This module owns:
//! - [`SeqCounter`] — a strictly-monotonic `u64` sequence number used
//! as the frame's identity downstream of the decoder. Saturates at
//! `u64::MAX` so a session that never restarts cannot wrap and
//! produce duplicate IDs (saturating is preferred over wrapping
//! here because `movement_detector` keys per-frame state by `seq`
//! and a wrap would corrupt that map).
//! - [`FrameStamper`] — pairs a `MonoClock` and a `SeqCounter` so the
//! lifecycle loop has one place to read both timestamps for a
//! single packet → frame transition.
use shared::clock::MonoClock;
/// Strictly-monotonic frame sequence counter. Saturates at
/// `u64::MAX`; in practice a 30 fps stream takes ~19.5 billion years
/// to overflow `u64`, so saturation behaviour is observable only as a
/// post-condition for tests with `u64::MAX - 1` priming.
#[derive(Debug, Default)]
pub struct SeqCounter {
next: u64,
}
impl SeqCounter {
pub fn new() -> Self {
Self { next: 0 }
}
/// Returns the next sequence number and advances internal state.
/// Saturates at `u64::MAX` (subsequent calls keep returning
/// `u64::MAX`). Named `advance` rather than `next` so that the
/// type does not collide with `Iterator::next` semantics in
/// caller code (and to satisfy `clippy::should_implement_trait`
/// — `SeqCounter` is intentionally NOT an Iterator: an unbounded
/// monotonic counter has no natural `None` terminator).
pub fn advance(&mut self) -> u64 {
let s = self.next;
self.next = self.next.saturating_add(1);
s
}
}
/// Holds a clock + sequence counter so the lifecycle loop only has
/// to call [`FrameStamper::capture`] (immediately on packet receipt)
/// and [`FrameStamper::decoded`] (immediately after decode returns)
/// to produce both monotonic timestamps for the next frame.
#[derive(Debug)]
pub struct FrameStamper {
clock: MonoClock,
seq: SeqCounter,
}
impl FrameStamper {
pub fn new(clock: MonoClock) -> Self {
Self {
clock,
seq: SeqCounter::new(),
}
}
/// Snapshot the capture-side timestamp + sequence number. Call
/// this the moment the transport hands us the packet, BEFORE
/// invoking the decoder. The capture timestamp is the head of
/// the per-frame latency budget (`description.md §8`: ≤30 ms p99
/// from RTSP rx → publish on Jetson Orin Nano).
pub fn capture(&mut self) -> CaptureMark {
CaptureMark {
seq: self.seq.advance(),
ts_ns: self.clock.elapsed_ns(),
}
}
/// Read the decode-side timestamp at the moment
/// `FrameDecoder::decode` returned. Used both for the emitted
/// `Frame::decode_ts_monotonic_ns` field and to compute
/// `decode_duration = decode_ts - capture_ts` for the histogram.
pub fn decoded(&self) -> u64 {
self.clock.elapsed_ns()
}
}
/// One capture-side mark per packet. Carried through the decode call
/// so the emitted `Frame` keeps the timestamp from packet receipt,
/// not from after-decode.
#[derive(Debug, Clone, Copy)]
pub struct CaptureMark {
pub seq: u64,
pub ts_ns: u64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn seq_counter_is_strictly_monotonic() {
// Arrange
let mut c = SeqCounter::new();
// Act
let a = c.advance();
let b = c.advance();
let d = c.advance();
// Assert
assert_eq!(a, 0);
assert_eq!(b, 1);
assert_eq!(d, 2);
}
#[test]
fn seq_counter_saturates_at_max_instead_of_wrapping() {
// Arrange — prime to u64::MAX - 1 by direct field assignment
// so the test runs in O(1).
let mut c = SeqCounter { next: u64::MAX - 1 };
// Act
let a = c.advance();
let b = c.advance();
let d = c.advance();
// Assert — once we hit MAX, every subsequent call must keep
// returning MAX (no wrap to 0).
assert_eq!(a, u64::MAX - 1);
assert_eq!(b, u64::MAX);
assert_eq!(d, u64::MAX);
}
#[test]
fn frame_stamper_capture_advances_seq_and_ts() {
// Arrange
let mut s = FrameStamper::new(MonoClock::new());
// Act
let m1 = s.capture();
let m2 = s.capture();
// Assert
assert_eq!(m1.seq, 0);
assert_eq!(m2.seq, 1);
assert!(m2.ts_ns >= m1.ts_ns, "monotonic clock went backwards");
}
}
+123 -38
View File
@@ -3,26 +3,22 @@
//! Real implementation lands in: //! Real implementation lands in:
//! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded //! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded
//! reconnect + AI-lock plumb (this crate, modules in `internal/`). //! reconnect + AI-lock plumb (this crate, modules in `internal/`).
//! - AZ-658 `frame_ingest_decoder` — H.264/265 decode into raw //! - AZ-658 `frame_ingest_decoder` — H.264/265 decode (NVDEC + sw
//! pixel buffers + retina/FFmpeg/GStreamer transport binding. //! fallback) + per-frame monotonic timestamping + decode stats
//! (this crate, `internal/decoder.rs` + `internal/timestamp.rs`).
//! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer //! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer
//! drop policy. //! drop policy.
//! //!
//! ## AZ-657 surface //! ## AZ-658 surface (extends AZ-657)
//! //!
//! - [`FrameIngest::new`] — construct in `Closed` state. //! `FrameIngest::run` now takes a [`FrameDecoder`]. The lifecycle loop
//! - [`FrameIngest::run`] — spawn the lifecycle loop driving the given //! stamps the capture timestamp the moment a packet leaves the
//! `RtspTransport` through `connect → stream → reconnect` cycles //! transport, hands the encoded payload to the decoder, and emits one
//! with bounded backoff. Returns a `JoinHandle`. //! [`Frame`] per decoded picture with `decode_ts_monotonic_ns` set
//! - [`FrameIngestHandle::subscribe`] — broadcast frame stream (the //! when the decoder returned. Single-frame decode errors increment
//! AZ-657 lifecycle emits only synthetic header frames; real //! `decode_errors_total` and drop the frame; the stream is never
//! decoded frames come in AZ-658). //! aborted (AC-3). The decoder backend (`Nvdec` / `Software`) is
//! - [`FrameIngestHandle::set_ai_lock`] — `bringCameraDown` / //! observable via [`FrameIngestHandle::decoder_backend`].
//! `bringCameraUp` signal. Stamps `Frame.ai_locked` on every
//! subsequently emitted frame.
//! - [`FrameIngestHandle::session_state`] — current FSM state.
//! - [`FrameIngestHandle::health`] — `ComponentHealth` reflecting the
//! FSM state + `last_packet_age` + `ai_locked`.
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
@@ -37,10 +33,15 @@ use shared::models::frame::Frame;
pub mod internal; pub mod internal;
pub use internal::decoder::{
Codec, DecodeError, DecodeStats, DecodedPixels, DecoderBackend, DecoderInitError,
FfmpegDecoder, FrameDecoder,
};
pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState}; pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState};
pub use internal::rtsp_client::{ pub use internal::rtsp_client::{
OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError, OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError,
}; };
pub use internal::timestamp::FrameStamper;
use internal::lifecycle::{transition, Trigger}; use internal::lifecycle::{transition, Trigger};
@@ -56,7 +57,9 @@ pub struct FrameIngest {
ai_lock_tx: watch::Sender<bool>, ai_lock_tx: watch::Sender<bool>,
state_tx: watch::Sender<SessionState>, state_tx: watch::Sender<SessionState>,
shutdown_tx: watch::Sender<bool>, shutdown_tx: watch::Sender<bool>,
backend_tx: watch::Sender<Option<DecoderBackend>>,
stats: Arc<LifecycleStats>, stats: Arc<LifecycleStats>,
decode_stats: Arc<DecodeStats>,
backoff: BackoffPolicy, backoff: BackoffPolicy,
clock: MonoClock, clock: MonoClock,
} }
@@ -74,12 +77,15 @@ impl FrameIngest {
let (ai_lock_tx, _) = watch::channel(false); let (ai_lock_tx, _) = watch::channel(false);
let (state_tx, _) = watch::channel(SessionState::Closed); let (state_tx, _) = watch::channel(SessionState::Closed);
let (shutdown_tx, _) = watch::channel(false); let (shutdown_tx, _) = watch::channel(false);
let (backend_tx, _) = watch::channel(None);
Self { Self {
tx, tx,
ai_lock_tx, ai_lock_tx,
state_tx, state_tx,
shutdown_tx, shutdown_tx,
backend_tx,
stats: LifecycleStats::new(), stats: LifecycleStats::new(),
decode_stats: DecodeStats::shared(),
backoff, backoff,
clock: MonoClock::new(), clock: MonoClock::new(),
} }
@@ -91,36 +97,50 @@ impl FrameIngest {
ai_lock_tx: self.ai_lock_tx.clone(), ai_lock_tx: self.ai_lock_tx.clone(),
state_rx: self.state_tx.subscribe(), state_rx: self.state_tx.subscribe(),
shutdown_tx: self.shutdown_tx.clone(), shutdown_tx: self.shutdown_tx.clone(),
backend_rx: self.backend_tx.subscribe(),
stats: Arc::clone(&self.stats), stats: Arc::clone(&self.stats),
decode_stats: Arc::clone(&self.decode_stats),
clock: self.clock, clock: self.clock,
} }
} }
/// Spawn the lifecycle loop. The returned handle resolves when /// Spawn the lifecycle loop. Returns a `JoinHandle` that resolves
/// the loop exits (shutdown signalled via /// when the loop exits (shutdown signalled via
/// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM). /// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM).
pub fn run<T>(&self, transport: T, config: RtspSessionConfig) -> JoinHandle<()> ///
/// `decoder` is owned exclusively by the spawned task; only one
/// decoder is active per `FrameIngest` instance.
pub fn run<T, D>(&self, transport: T, decoder: D, config: RtspSessionConfig) -> JoinHandle<()>
where where
T: RtspTransport + 'static, T: RtspTransport + 'static,
D: FrameDecoder + 'static,
{ {
let tx = self.tx.clone(); let tx = self.tx.clone();
let ai_lock = self.ai_lock_tx.subscribe(); let ai_lock = self.ai_lock_tx.subscribe();
let state_tx = self.state_tx.clone(); let state_tx = self.state_tx.clone();
let backend_tx = self.backend_tx.clone();
let shutdown_rx = self.shutdown_tx.subscribe(); let shutdown_rx = self.shutdown_tx.subscribe();
let stats = Arc::clone(&self.stats); let stats = Arc::clone(&self.stats);
let decode_stats = Arc::clone(&self.decode_stats);
let backoff = self.backoff; let backoff = self.backoff;
let clock = self.clock; let clock = self.clock;
let transport = Arc::new(Mutex::new(transport)); let transport = Arc::new(Mutex::new(transport));
let decoder: Box<dyn FrameDecoder + Send> = Box::new(decoder);
// Snapshot the decoder backend immediately so it is observable
// even before the first packet.
backend_tx.send_replace(Some(decoder.backend()));
tokio::spawn(async move { tokio::spawn(async move {
lifecycle_loop( lifecycle_loop(
transport, transport,
decoder,
config, config,
tx, tx,
ai_lock, ai_lock,
state_tx, state_tx,
shutdown_rx, shutdown_rx,
stats, stats,
decode_stats,
backoff, backoff,
clock, clock,
) )
@@ -136,19 +156,22 @@ fn is_shutdown(rx: &watch::Receiver<bool>) -> bool {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn lifecycle_loop<T>( async fn lifecycle_loop<T>(
transport: Arc<Mutex<T>>, transport: Arc<Mutex<T>>,
mut decoder: Box<dyn FrameDecoder + Send>,
config: RtspSessionConfig, config: RtspSessionConfig,
tx: broadcast::Sender<Frame>, tx: broadcast::Sender<Frame>,
mut ai_lock: watch::Receiver<bool>, mut ai_lock: watch::Receiver<bool>,
state_tx: watch::Sender<SessionState>, state_tx: watch::Sender<SessionState>,
mut shutdown_rx: watch::Receiver<bool>, mut shutdown_rx: watch::Receiver<bool>,
stats: Arc<LifecycleStats>, stats: Arc<LifecycleStats>,
decode_stats: Arc<DecodeStats>,
backoff: BackoffPolicy, backoff: BackoffPolicy,
clock: MonoClock, clock: MonoClock,
) where ) where
T: RtspTransport, T: RtspTransport,
{ {
let mut state = SessionState::Closed; let mut state = SessionState::Closed;
let mut seq: u64 = 0; let mut stamper = FrameStamper::new(clock);
let mut decoded_buffer: Vec<DecodedPixels> = Vec::with_capacity(4);
loop { loop {
if is_shutdown(&shutdown_rx) { if is_shutdown(&shutdown_rx) {
@@ -203,30 +226,48 @@ async fn lifecycle_loop<T>(
match packet { match packet {
Ok(pkt) => { Ok(pkt) => {
let now_ns = clock.elapsed_ns(); // Capture timestamp + sequence number are
stats.note_packet(now_ns); // taken at the EARLIEST point per
// `description.md §4` — before the decoder
// has run, so movement_detector's skew
// gate sees the original packet arrival
// time.
let mark = stamper.capture();
stats.note_packet(mark.ts_ns);
let locked = *ai_lock.borrow_and_update(); let locked = *ai_lock.borrow_and_update();
// AZ-657 emits a synthetic frame envelope decoded_buffer.clear();
// per inbound RTSP packet so the lifecycle match decoder.decode(&pkt.payload, &mut decoded_buffer) {
// FSM can be exercised end-to-end without Ok(()) => {
// the decoder (AZ-658 swaps this for the for dp in decoded_buffer.drain(..) {
// actual decoded frame). decode_stats.note_decoded(dp.decode_duration);
let frame = Frame { let frame = Frame {
seq, seq: mark.seq,
capture_ts_monotonic_ns: now_ns, capture_ts_monotonic_ns: mark.ts_ns,
decode_ts_monotonic_ns: now_ns, decode_ts_monotonic_ns: stamper.decoded(),
pixels: Arc::new(pkt.payload), pixels: Arc::new(dp.pixels),
width: 0, width: dp.width,
height: 0, height: dp.height,
pix_fmt: shared::models::frame::PixelFormat::Nv12, pix_fmt: dp.pix_fmt,
ai_locked: locked, ai_locked: locked,
}; };
seq = seq.saturating_add(1); // Send errors are no-ops when
// A no-subscriber send is a no-op error in // the broadcast has no
// the broadcast channel; the lifecycle // subscribers; per-consumer
// does not care. // back-pressure is AZ-659's
// problem.
let _ = tx.send(frame); let _ = tx.send(frame);
} }
}
Err(e) => {
decode_stats.note_decode_error();
tracing::warn!(
error = %e,
seq = mark.seq,
"frame_ingest dropped a frame on decode error"
);
}
}
}
Err(e) => { Err(e) => {
let trig = Trigger::from_stream_error(&e); let trig = Trigger::from_stream_error(&e);
let t = transition(state, trig, &backoff); let t = transition(state, trig, &backoff);
@@ -272,7 +313,9 @@ pub struct FrameIngestHandle {
ai_lock_tx: watch::Sender<bool>, ai_lock_tx: watch::Sender<bool>,
state_rx: watch::Receiver<SessionState>, state_rx: watch::Receiver<SessionState>,
shutdown_tx: watch::Sender<bool>, shutdown_tx: watch::Sender<bool>,
backend_rx: watch::Receiver<Option<DecoderBackend>>,
stats: Arc<LifecycleStats>, stats: Arc<LifecycleStats>,
decode_stats: Arc<DecodeStats>,
clock: MonoClock, clock: MonoClock,
} }
@@ -314,6 +357,44 @@ impl FrameIngestHandle {
self.stats.reopens_total.load(Ordering::Relaxed) self.stats.reopens_total.load(Ordering::Relaxed)
} }
/// Backend the active decoder selected at construction. `None`
/// before `FrameIngest::run` has been called.
pub fn decoder_backend(&self) -> Option<DecoderBackend> {
*self.backend_rx.borrow()
}
pub fn decode_errors_total(&self) -> u64 {
self.decode_stats
.decode_errors_total
.load(Ordering::Relaxed)
}
pub fn frames_decoded_total(&self) -> u64 {
self.decode_stats
.frames_decoded_total
.load(Ordering::Relaxed)
}
pub fn decode_ms_first_frame(&self) -> Option<Duration> {
let ns = self
.decode_stats
.first_frame_decode_duration_ns
.load(Ordering::Relaxed);
if ns == 0 && self.frames_decoded_total() == 0 {
None
} else {
Some(Duration::from_nanos(ns))
}
}
pub fn decode_ms_p50(&self) -> Option<Duration> {
self.decode_stats.p50_ns().map(Duration::from_nanos)
}
pub fn decode_ms_p99(&self) -> Option<Duration> {
self.decode_stats.p99_ns().map(Duration::from_nanos)
}
/// Request the lifecycle loop to drain to `Closed` and exit. The /// Request the lifecycle loop to drain to `Closed` and exit. The
/// loop races every transport call against this signal, so a /// loop races every transport call against this signal, so a
/// hung transport cannot wedge graceful exit. /// hung transport cannot wedge graceful exit.
@@ -366,6 +447,10 @@ mod tests {
let h = FrameIngest::new(8).handle(); let h = FrameIngest::new(8).handle();
assert_eq!(h.session_state(), SessionState::Closed); assert_eq!(h.session_state(), SessionState::Closed);
assert_eq!(h.health().level, HealthLevel::Disabled); assert_eq!(h.health().level, HealthLevel::Disabled);
assert!(
h.decoder_backend().is_none(),
"no decoder is wired until run() is called"
);
} }
#[test] #[test]
@@ -0,0 +1,386 @@
//! AZ-658 — decoder pipeline integration tests.
//!
//! These tests drive the **real** [`FfmpegDecoder`] (libavcodec) end
//! to end through the lifecycle loop. A synthetic H.264 bitstream is
//! produced in-process by libx264 (the same FFmpeg install that
//! `FfmpegDecoder` uses to decode), so the tests exercise the
//! production decode path rather than a stub.
//!
//! ACs covered here:
//! - AC-1 — software-path throughput preservation (≥95 % of input
//! frames decoded; sequence numbers strictly monotonic; decoder
//! backend reports `Software` on a CUDA-less host).
//! - AC-3 — a single corrupted "packet" between valid ones must
//! increment `decode_errors_total` exactly once and NOT abort the
//! stream.
//! - AC-4 — `capture_ts_monotonic_ns` is strictly increasing across
//! the emitted frame stream (rides on AC-1's setup).
//!
//! AC-2 (NVDEC selection on Jetson) cannot be exercised here — there
//! is no CUDA-capable FFmpeg on the dev/CI host. The unit-test
//! counterpart in `internal/decoder.rs::tests` asserts the negative
//! direction (CUDA-less host → Software backend); the positive
//! direction is validated on the Jetson at deployment time and is
//! covered by the Run Tests gate downstream of this batch.
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use ffmpeg_next as ffmpeg;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::timeout;
use frame_ingest::{
BackoffPolicy, Codec, DecoderBackend, FfmpegDecoder, FrameDecoder, FrameIngest, OpenError,
RtspPacket, RtspSessionConfig, RtspTransport, StreamError,
};
/// Synthetic H.264 bitstream generator. Encodes `num_frames` frames
/// of a checkerboard pattern at `width`x`height` and 30 fps with
/// libx264 (preset `ultrafast`, tune `zerolatency`, GOP every 30
/// frames so each test run gets a few IDRs). Returns a vector of
/// per-AVPacket byte blobs, each ready to feed into the decoder as
/// the payload of an `RtspPacket`.
fn synth_h264_stream(num_frames: usize, width: u32, height: u32) -> Vec<Bytes> {
ffmpeg::init().expect("ffmpeg init");
let codec = ffmpeg::codec::encoder::find_by_name("libx264")
.or_else(|| ffmpeg::codec::encoder::find_by_name("h264"))
.expect("an H.264 encoder must be registered");
let context = ffmpeg::codec::Context::new_with_codec(codec);
let mut encoder = context
.encoder()
.video()
.expect("encoder context yields video");
encoder.set_width(width);
encoder.set_height(height);
encoder.set_format(ffmpeg::format::Pixel::YUV420P);
encoder.set_time_base(ffmpeg::Rational::new(1, 30));
encoder.set_frame_rate(Some(ffmpeg::Rational::new(30, 1)));
encoder.set_gop(30);
encoder.set_max_b_frames(0);
let mut opts = ffmpeg::Dictionary::new();
opts.set("preset", "ultrafast");
opts.set("tune", "zerolatency");
let mut opened = encoder
.open_with(opts)
.expect("libx264 encoder must open with ultrafast/zerolatency");
let mut out = Vec::with_capacity(num_frames + 4);
let mut packet = ffmpeg::Packet::empty();
for i in 0..num_frames {
let mut input = ffmpeg::frame::Video::new(ffmpeg::format::Pixel::YUV420P, width, height);
// Fill Y plane with a per-frame gradient so the encoder has
// motion to compress (a constant frame is degenerate and
// libx264 can choose to emit zero packets for some inputs).
let y_stride = input.stride(0);
let y = input.data_mut(0);
for row in 0..height as usize {
let v = ((i + row) & 0xFF) as u8;
for col in 0..width as usize {
y[row * y_stride + col] = v ^ ((col & 0xFF) as u8);
}
}
for plane in 1..=2 {
let stride = input.stride(plane);
let data = input.data_mut(plane);
for row in 0..(height as usize) / 2 {
for col in 0..(width as usize) / 2 {
data[row * stride + col] = 128;
}
}
}
input.set_pts(Some(i as i64));
opened
.send_frame(&input)
.unwrap_or_else(|e| panic!("encoder send_frame ({i}) failed: {e}"));
while opened.receive_packet(&mut packet).is_ok() {
if let Some(d) = packet.data() {
out.push(Bytes::copy_from_slice(d));
}
}
}
opened.send_eof().expect("encoder eof");
while opened.receive_packet(&mut packet).is_ok() {
if let Some(d) = packet.data() {
out.push(Bytes::copy_from_slice(d));
}
}
assert!(
!out.is_empty(),
"synthetic encoder must produce at least one packet"
);
out
}
/// RTSP-shaped transport that replays a pre-built script of byte
/// blobs, then parks (so the FrameIngest task stays in `Streaming`
/// until the test calls `shutdown`). When the script is exhausted,
/// `next_packet` returns a parked future — the lifecycle loop's
/// `tokio::select!` against the shutdown watch is what unblocks
/// teardown.
struct ScriptedBytesTransport {
queue: Arc<AsyncMutex<VecDeque<ScriptItem>>>,
}
#[derive(Debug, Clone)]
enum ScriptItem {
Bytes(Bytes),
}
impl ScriptedBytesTransport {
fn new(packets: Vec<Bytes>) -> Self {
let queue = packets
.into_iter()
.map(ScriptItem::Bytes)
.collect::<VecDeque<_>>();
Self {
queue: Arc::new(AsyncMutex::new(queue)),
}
}
}
#[async_trait]
impl RtspTransport for ScriptedBytesTransport {
async fn open(&mut self, _config: &RtspSessionConfig) -> Result<(), OpenError> {
Ok(())
}
async fn close(&mut self) {}
async fn next_packet(&mut self) -> Result<RtspPacket, StreamError> {
loop {
let item = {
let mut q = self.queue.lock().await;
q.pop_front()
};
match item {
Some(ScriptItem::Bytes(b)) => {
return Ok(RtspPacket {
timestamp_rtp: 0,
payload: b,
});
}
None => {
// Park forever; the lifecycle loop's shutdown
// watch breaks us out via select!.
std::future::pending::<()>().await;
}
}
}
}
}
fn fast_backoff() -> BackoffPolicy {
BackoffPolicy::new(Duration::from_millis(10), Duration::from_millis(40))
}
/// AC-1 + AC-4 — a software-decoded synthetic stream must preserve
/// at least 95 % of input frames and stamp them with strictly
/// monotonic capture timestamps + sequence numbers. The dev/CI host
/// has no CUDA so backend MUST report `Software`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac1_ac4_software_decode_preserves_throughput_and_monotonicity() {
// Arrange — encode 60 frames (2 s of 30 fps content). The AC's
// literal 1080p / 10 s budget is validated against the real
// camera at deploy; the dev test exercises the same code path
// at smaller scale to keep CI <5 s.
let width = 320u32;
let height = 240u32;
let input_frames = 60usize;
let stream = synth_h264_stream(input_frames, width, height);
assert!(
stream.len() >= input_frames - 5,
"encoder produced {} packets for {input_frames} frames; expected ~1:1",
stream.len()
);
let transport = ScriptedBytesTransport::new(stream);
let decoder =
FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open on this host");
let ingest = FrameIngest::with_backoff(input_frames + 16, fast_backoff());
let handle = ingest.handle();
let mut frames = handle.subscribe();
// Act
let task = ingest.run(transport, decoder, RtspSessionConfig::new("rtsp://fake/0"));
let mut received = Vec::with_capacity(input_frames);
let deadline = Duration::from_secs(10);
let start = tokio::time::Instant::now();
while received.len() < input_frames && start.elapsed() < deadline {
match timeout(Duration::from_millis(500), frames.recv()).await {
Ok(Ok(f)) => received.push(f),
Ok(Err(_)) => break,
Err(_) => {
if handle.frames_decoded_total() as usize == received.len() {
// No more frames are coming — the encoder may
// have produced fewer access units than input
// frames (rare with `tune=zerolatency` but
// possible). Stop waiting.
break;
}
}
}
}
handle.shutdown();
let _ = timeout(Duration::from_secs(2), task).await;
// Assert — backend selection (AC-2 negative direction): CUDA-less
// host MUST select Software.
assert_eq!(
handle.decoder_backend(),
Some(DecoderBackend::Software),
"host without h264_cuvid must fall back to Software"
);
// AC-1 — at least 95 % of input frames decoded.
let kept = received.len();
let min_required = (input_frames as f64 * 0.95).ceil() as usize;
assert!(
kept >= min_required,
"decoded {kept} frames; AC-1 requires ≥{min_required} of {input_frames} ({}%)",
(kept * 100) / input_frames
);
// AC-1 + AC-4 — sequence numbers strictly monotonic.
for w in received.windows(2) {
assert!(
w[0].seq < w[1].seq,
"seq must strictly increase: {} → {}",
w[0].seq,
w[1].seq
);
}
// AC-4 — capture timestamps strictly monotonic.
for w in received.windows(2) {
assert!(
w[0].capture_ts_monotonic_ns < w[1].capture_ts_monotonic_ns,
"capture_ts must strictly increase: {} → {}",
w[0].capture_ts_monotonic_ns,
w[1].capture_ts_monotonic_ns
);
}
// Decode timestamps must be at-or-after capture timestamps for
// every frame (decode happens after packet receipt by
// construction).
for f in &received {
assert!(
f.decode_ts_monotonic_ns >= f.capture_ts_monotonic_ns,
"decode_ts {} must be ≥ capture_ts {}",
f.decode_ts_monotonic_ns,
f.capture_ts_monotonic_ns
);
}
// First-frame cold-start metric was recorded.
assert!(
handle.decode_ms_first_frame().is_some(),
"decode_ms_first_frame must be populated after the first decode"
);
assert!(handle.decode_ms_p50().is_some(), "p50 must be populated");
assert!(handle.decode_ms_p99().is_some(), "p99 must be populated");
}
/// AC-2 (positive direction) — on a CUDA-capable host, the decoder
/// MUST select `DecoderBackend::Nvdec`. This test cannot run on the
/// Mac/Linux dev box (no CUDA-enabled FFmpeg), so it is `#[ignore]`d
/// by default and explicitly opt-in via `cargo test -- --ignored`
/// on a Jetson Orin Nano with the FFmpeg-cuda packages installed.
/// The negative direction (no CUDA → Software) is asserted both in
/// `internal::decoder::tests::ffmpeg_decoder_falls_back_to_software_on_macos_dev_host`
/// and in `ac1_ac4_software_decode_preserves_throughput_and_monotonicity`
/// above; together they pin the selection rule from both sides.
#[tokio::test]
#[ignore = "AC-2 positive: requires a CUDA-capable FFmpeg (h264_cuvid registered) — only runs on Jetson"]
async fn ac2_nvdec_backend_selected_on_cuda_host() {
// Arrange + Act
let dec = FfmpegDecoder::new(Codec::H264).expect("h264 decoder must open on Jetson");
// Assert
assert_eq!(
dec.backend(),
DecoderBackend::Nvdec,
"Jetson Orin Nano with CUDA-enabled FFmpeg MUST select NVDEC"
);
}
/// AC-3 — a corrupted packet between valid ones must be counted as
/// `decode_errors_total += 1` and the stream must keep producing
/// frames after it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ac3_corrupted_frame_is_counted_and_does_not_abort_stream() {
// Arrange — generate two synthetic streams, one for "before" and
// one for "after"; splice a garbage packet between them.
let width = 320u32;
let height = 240u32;
let mut script: Vec<Bytes> = synth_h264_stream(20, width, height);
let after = synth_h264_stream(20, width, height);
let pre_count = script.len();
// Corrupted packet: random bytes that are not a valid NAL unit.
// The decoder rejects them via `send_packet` (Annex-B start code
// missing) or `receive_frame` (parsed as an unsupported NAL
// type), either way returning an error from
// `FfmpegDecoder::decode`.
let garbage = Bytes::from_static(&[
0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, 0x12, 0x34, 0x56, 0x78,
]);
script.push(garbage);
script.extend(after);
let total_packets = script.len();
let transport = ScriptedBytesTransport::new(script);
let decoder = FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open");
let ingest = FrameIngest::with_backoff(total_packets + 16, fast_backoff());
let handle = ingest.handle();
let mut frames = handle.subscribe();
// Act — drain frames until either we've collected enough to know
// post-error frames landed, or we time out.
let task = ingest.run(transport, decoder, RtspSessionConfig::new("rtsp://fake/0"));
let mut received_seqs: Vec<u64> = Vec::new();
let deadline = Duration::from_secs(10);
let start = tokio::time::Instant::now();
let target_frames = (pre_count + 5).min(35); // pre + a few post
while received_seqs.len() < target_frames && start.elapsed() < deadline {
match timeout(Duration::from_millis(500), frames.recv()).await {
Ok(Ok(f)) => received_seqs.push(f.seq),
Ok(Err(_)) => break,
Err(_) => {
if handle.decode_errors_total() == 0 && handle.frames_decoded_total() == 0 {
continue;
}
if (handle.frames_decoded_total() as usize) == received_seqs.len() {
break;
}
}
}
}
handle.shutdown();
let _ = timeout(Duration::from_secs(2), task).await;
// Assert — exactly one decode error (the garbage packet); valid
// frames continued to land afterwards.
assert_eq!(
handle.decode_errors_total(),
1,
"one corrupted packet must produce exactly one decode error"
);
assert!(
received_seqs.len() >= pre_count,
"must receive at least the pre-error frames ({pre_count}); got {}",
received_seqs.len()
);
// Frames sequence is monotonic across the corrupted packet.
for w in received_seqs.windows(2) {
assert!(
w[0] < w[1],
"seq must remain strictly monotonic across decode errors: {} → {}",
w[0],
w[1]
);
}
}
+52 -7
View File
@@ -17,9 +17,34 @@ use tokio::sync::mpsc;
use tokio::time::{timeout, Instant}; use tokio::time::{timeout, Instant};
use frame_ingest::{ use frame_ingest::{
BackoffPolicy, FrameIngest, OpenError, RtspPacket, RtspSessionConfig, RtspTransport, BackoffPolicy, DecodeError, DecodedPixels, DecoderBackend, FrameDecoder, FrameIngest,
SessionState, StreamError, OpenError, RtspPacket, RtspSessionConfig, RtspTransport, SessionState, StreamError,
}; };
use shared::models::frame::PixelFormat;
/// Test-only decoder that pushes one synthetic `DecodedPixels` per
/// call. Used by the AZ-657 lifecycle tests, which verify FSM /
/// reconnect / AI-lock semantics — they don't care what pixels the
/// decoder produced. The production decoder path is exercised
/// separately by `decoder_pipeline.rs` (AZ-658).
struct StubDecoder;
impl FrameDecoder for StubDecoder {
fn backend(&self) -> DecoderBackend {
DecoderBackend::Software
}
fn decode(&mut self, payload: &[u8], out: &mut Vec<DecodedPixels>) -> Result<(), DecodeError> {
out.push(DecodedPixels {
pixels: Bytes::copy_from_slice(payload),
width: 320,
height: 240,
pix_fmt: PixelFormat::Nv12,
decode_duration: Duration::from_micros(100),
});
Ok(())
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
enum Scripted { enum Scripted {
@@ -158,7 +183,11 @@ async fn ac1_open_succeeds_and_session_reaches_streaming() {
let mut frames = handle.subscribe(); let mut frames = handle.subscribe();
// Act // Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); let task = ingest.run(
transport,
StubDecoder,
RtspSessionConfig::new("rtsp://fake/0"),
);
let first = timeout(Duration::from_secs(1), frames.recv()) let first = timeout(Duration::from_secs(1), frames.recv())
.await .await
.expect("frame within 1 s") .expect("frame within 1 s")
@@ -197,7 +226,11 @@ async fn ac2_bounded_reconnect_recovers_after_transient_failure() {
let started = Instant::now(); let started = Instant::now();
// Act // Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); let task = ingest.run(
transport,
StubDecoder,
RtspSessionConfig::new("rtsp://fake/0"),
);
let _ = timeout(Duration::from_secs(2), frames.recv()) let _ = timeout(Duration::from_secs(2), frames.recv())
.await .await
.expect("frame within 2 s") .expect("frame within 2 s")
@@ -233,7 +266,11 @@ async fn ac2b_stream_drop_increments_reopens_total() {
let mut frames = handle.subscribe(); let mut frames = handle.subscribe();
// Act // Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); let task = ingest.run(
transport,
StubDecoder,
RtspSessionConfig::new("rtsp://fake/0"),
);
let _ = timeout(Duration::from_secs(1), frames.recv()) let _ = timeout(Duration::from_secs(1), frames.recv())
.await .await
.expect("first frame") .expect("first frame")
@@ -268,7 +305,11 @@ async fn ac3_unsupported_profile_hard_fails_session() {
let handle = ingest.handle(); let handle = ingest.handle();
// Act // Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); let task = ingest.run(
transport,
StubDecoder,
RtspSessionConfig::new("rtsp://fake/0"),
);
let _ = timeout(Duration::from_secs(1), task) let _ = timeout(Duration::from_secs(1), task)
.await .await
.expect("lifecycle loop exits on hard-fail"); .expect("lifecycle loop exits on hard-fail");
@@ -295,7 +336,11 @@ async fn ac4_ai_lock_toggle_propagates_to_frames() {
let mut frames = handle.subscribe(); let mut frames = handle.subscribe();
// Act // Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); let task = ingest.run(
transport,
StubDecoder,
RtspSessionConfig::new("rtsp://fake/0"),
);
let f1 = timeout(Duration::from_secs(1), frames.recv()) let f1 = timeout(Duration::from_secs(1), frames.recv())
.await .await
.expect("first frame") .expect("first frame")