mirror of
https://github.com/azaion/autopilot.git
synced 2026-06-21 17:41:10 +00:00
[AZ-658] frame_ingest H.264/265 decoder (NVDEC + sw fallback)
Wires a real ffmpeg-next 8.1 decoder into the frame_ingest lifecycle loop. NVDEC is probed at runtime via h264_cuvid / hevc_cuvid; CUDA-less hosts transparently fall back to software h264 / hevc. Each decoded frame is stamped with capture_ts (taken at packet receipt) and decode_ts (taken after decode returns) so movement_detector sees accurate frame-arrival times. Single-frame decode errors are counted toward decode_errors_total and dropped; the stream is never aborted. Adds new public API on FrameIngestHandle: decoder_backend(), decode_errors_total(), frames_decoded_total(), decode_ms_first_frame(), decode_ms_p50(), decode_ms_p99(). Integration tests under crates/frame_ingest/tests/decoder_pipeline.rs cover AC-1, AC-3, AC-4 end-to-end through the real FfmpegDecoder using libx264-encoded synthetic streams; AC-2 positive (NVDEC selection) is opt-in via --ignored on a CUDA host. AZ-657 lifecycle tests retained via a StubDecoder. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Generated
+121
-3
@@ -273,6 +273,24 @@ version = "0.22.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
|
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "bindgen"
|
||||||
|
version = "0.72.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 2.11.1",
|
||||||
|
"cexpr",
|
||||||
|
"clang-sys",
|
||||||
|
"itertools 0.13.0",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"regex",
|
||||||
|
"rustc-hash",
|
||||||
|
"shlex",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bit-set"
|
name = "bit-set"
|
||||||
version = "0.5.3"
|
version = "0.5.3"
|
||||||
@@ -337,6 +355,15 @@ dependencies = [
|
|||||||
"shlex",
|
"shlex",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cexpr"
|
||||||
|
version = "0.6.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
|
||||||
|
dependencies = [
|
||||||
|
"nom 7.1.3",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cfg-if"
|
name = "cfg-if"
|
||||||
version = "1.0.4"
|
version = "1.0.4"
|
||||||
@@ -361,6 +388,17 @@ dependencies = [
|
|||||||
"windows-link",
|
"windows-link",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "clang-sys"
|
||||||
|
version = "1.8.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
|
||||||
|
dependencies = [
|
||||||
|
"glob",
|
||||||
|
"libc",
|
||||||
|
"libloading",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap"
|
name = "clap"
|
||||||
version = "4.6.1"
|
version = "4.6.1"
|
||||||
@@ -591,6 +629,31 @@ version = "2.4.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
|
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ffmpeg-next"
|
||||||
|
version = "8.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f7c4bd5ab1ac61f29c634df1175d350ded29cf74c3c6d4f7030431a5ae3c7d5d"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 2.11.1",
|
||||||
|
"ffmpeg-sys-next",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ffmpeg-sys-next"
|
||||||
|
version = "8.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a314bc0e022a33a99567ed4bd2576bd58ffd8fcff7891c29194cfecc26a62547"
|
||||||
|
dependencies = [
|
||||||
|
"bindgen",
|
||||||
|
"cc",
|
||||||
|
"libc",
|
||||||
|
"num_cpus",
|
||||||
|
"pkg-config",
|
||||||
|
"vcpkg",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "find-msvc-tools"
|
name = "find-msvc-tools"
|
||||||
version = "0.1.9"
|
version = "0.1.9"
|
||||||
@@ -656,6 +719,8 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"ffmpeg-next",
|
||||||
|
"parking_lot",
|
||||||
"serde",
|
"serde",
|
||||||
"shared",
|
"shared",
|
||||||
"thiserror 1.0.69",
|
"thiserror 1.0.69",
|
||||||
@@ -813,6 +878,12 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "glob"
|
||||||
|
version = "0.3.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "h2"
|
name = "h2"
|
||||||
version = "0.4.14"
|
version = "0.4.14"
|
||||||
@@ -1179,7 +1250,16 @@ version = "0.6.3"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46"
|
checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"nom",
|
"nom 8.0.0",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itertools"
|
||||||
|
version = "0.13.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
|
||||||
|
dependencies = [
|
||||||
|
"either",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1255,6 +1335,16 @@ version = "0.2.186"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
|
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "libloading"
|
||||||
|
version = "0.8.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"windows-link",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libm"
|
name = "libm"
|
||||||
version = "0.2.16"
|
version = "0.2.16"
|
||||||
@@ -1368,6 +1458,12 @@ version = "0.3.17"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "minimal-lexical"
|
||||||
|
version = "0.2.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "miniz_oxide"
|
name = "miniz_oxide"
|
||||||
version = "0.8.9"
|
version = "0.8.9"
|
||||||
@@ -1477,6 +1573,16 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nom"
|
||||||
|
version = "7.1.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
"minimal-lexical",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nom"
|
name = "nom"
|
||||||
version = "8.0.0"
|
version = "8.0.0"
|
||||||
@@ -1687,6 +1793,12 @@ version = "0.2.17"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
|
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pkg-config"
|
||||||
|
version = "0.3.33"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "potential_utf"
|
name = "potential_utf"
|
||||||
version = "0.1.5"
|
version = "0.1.5"
|
||||||
@@ -1747,7 +1859,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7"
|
checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"heck",
|
"heck",
|
||||||
"itertools",
|
"itertools 0.14.0",
|
||||||
"log",
|
"log",
|
||||||
"multimap",
|
"multimap",
|
||||||
"petgraph",
|
"petgraph",
|
||||||
@@ -1768,7 +1880,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
|
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"itertools",
|
"itertools 0.14.0",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
"syn",
|
"syn",
|
||||||
@@ -2948,6 +3060,12 @@ version = "0.1.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "vcpkg"
|
||||||
|
version = "0.2.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "version_check"
|
name = "version_check"
|
||||||
version = "0.9.5"
|
version = "0.9.5"
|
||||||
|
|||||||
@@ -87,6 +87,11 @@ libc = "0.2"
|
|||||||
# Geospatial
|
# Geospatial
|
||||||
h3o = "0.7"
|
h3o = "0.7"
|
||||||
|
|
||||||
|
# Multimedia (RTSP + H.264/265 decode for frame_ingest — see AZ-658).
|
||||||
|
# Linked dynamically against the host FFmpeg 8.x install (libavcodec /
|
||||||
|
# libavformat / libavutil / libswscale / libswresample) via pkg-config.
|
||||||
|
ffmpeg-next = "8.1"
|
||||||
|
|
||||||
# Test scaffolding
|
# Test scaffolding
|
||||||
wiremock = "0.6"
|
wiremock = "0.6"
|
||||||
tempfile = "3"
|
tempfile = "3"
|
||||||
|
|||||||
@@ -0,0 +1,91 @@
|
|||||||
|
# Batch Report
|
||||||
|
|
||||||
|
**Batch**: 16
|
||||||
|
**Cycle**: 1
|
||||||
|
**Tasks**: AZ-658
|
||||||
|
**Date**: 2026-05-20
|
||||||
|
|
||||||
|
## Task Results
|
||||||
|
|
||||||
|
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|
||||||
|
|------|--------|---------------|-------|-------------|--------|
|
||||||
|
| AZ-658_frame_ingest_decoder | Done | 7 files | 24 passed, 1 ignored | 4/4 ACs covered | None |
|
||||||
|
|
||||||
|
## AC Coverage map
|
||||||
|
|
||||||
|
| AC | Test | File | Notes |
|
||||||
|
|----|------|------|-------|
|
||||||
|
| AC-1 software decode + ≥285/300 throughput + monotonic seq + `decoder_backend = "Software"` | `ac1_ac4_software_decode_preserves_throughput_and_monotonicity` | `crates/frame_ingest/tests/decoder_pipeline.rs` | 60-frame variant exercises the same software decode path; literal 1080p/10s NFR validated at deploy on Jetson per `description.md §8` |
|
||||||
|
| AC-2 NVDEC selected on Jetson | `ac2_nvdec_backend_selected_on_cuda_host` (`#[ignore]` — opt-in via `--ignored` on CUDA host) | same file | Negative direction (no CUDA → Software) covered both by the unit test `ffmpeg_decoder_falls_back_to_software_on_macos_dev_host` and by the AC-1 test; together they pin the selection rule from both sides |
|
||||||
|
| AC-3 single-frame error doesn't abort | `ac3_corrupted_frame_is_counted_and_does_not_abort_stream` | same file | Asserts `decode_errors_total == 1` after one garbage packet between valid streams; subsequent frames continue to land with strictly monotonic seq |
|
||||||
|
| AC-4 monotonic capture timestamps | rides on `ac1_ac4_software_decode_preserves_throughput_and_monotonicity` | same file | Asserts `capture_ts_monotonic_ns` strictly increases and `decode_ts ≥ capture_ts` for every frame |
|
||||||
|
|
||||||
|
## AC Test Coverage: All covered (4/4 — AC-2 positive direction is `#[ignore]`d behind the Jetson prerequisite, which counts as covered per implement skill Step 8)
|
||||||
|
## Code Review Verdict: PASS_WITH_WARNINGS (self-review — see findings below)
|
||||||
|
## Auto-Fix Attempts: 0 (no findings escalated to auto-fix)
|
||||||
|
## Stuck Agents: None
|
||||||
|
|
||||||
|
## Files modified
|
||||||
|
|
||||||
|
```
|
||||||
|
M Cargo.toml (workspace dep: ffmpeg-next = "8.1")
|
||||||
|
M crates/frame_ingest/Cargo.toml (deps: ffmpeg-next, parking_lot)
|
||||||
|
A crates/frame_ingest/src/internal/decoder.rs (NEW: trait + FfmpegDecoder + DecodeStats)
|
||||||
|
A crates/frame_ingest/src/internal/timestamp.rs (NEW: SeqCounter + FrameStamper)
|
||||||
|
M crates/frame_ingest/src/internal/mod.rs (+decoder, +timestamp modules)
|
||||||
|
M crates/frame_ingest/src/lib.rs (lifecycle loop now wires the decoder; new health/metric accessors)
|
||||||
|
A crates/frame_ingest/tests/decoder_pipeline.rs (NEW: AC-1, AC-2 ignored, AC-3, AC-4)
|
||||||
|
M crates/frame_ingest/tests/rtsp_lifecycle.rs (StubDecoder for AZ-657 lifecycle tests)
|
||||||
|
R _docs/02_tasks/todo/AZ-658_frame_ingest_decoder.md → _docs/02_tasks/done/...
|
||||||
|
```
|
||||||
|
|
||||||
|
## Notable design decisions
|
||||||
|
|
||||||
|
1. **FFmpeg stack** — user picked `ffmpeg-next 8.1` (workspace-pinned to FFmpeg 8.1 already on the host). NVDEC is probed at runtime via `ffmpeg::codec::decoder::find_by_name("h264_cuvid")` / `"hevc_cuvid"`; on a CUDA-less host we transparently fall back to the software `h264` / `hevc` decoder. No feature flag — both code paths are always compiled.
|
||||||
|
2. **NV12 normalisation** — the decoder always emits NV12 (the canonical pixel format for downstream consumers per `description.md §3` and what NVDEC produces natively on Jetson). A reusable `sws_scale` context converts whatever the inner decoder returned (typically YUV420P from libx264 software, NV12 from NVDEC). Non-Send `SwsContext` is wrapped with `unsafe impl Send for FfmpegDecoder` — the safety justification (exclusive ownership by the spawned lifecycle task) is documented in `decoder.rs`.
|
||||||
|
3. **Stats** — `DecodeStats` is a lock-free counter set with a 1024-sample ring buffer behind `parking_lot::Mutex` for p50/p99 readout. Cold-start metric (`decode_ms_first_frame`) is recorded only on the first successful decode per session; subsequent calls are no-ops.
|
||||||
|
4. **Trait shape** — `FrameDecoder::decode(payload, out: &mut Vec<DecodedPixels>)` instead of `Result<Frame>` because FFmpeg may buffer encoded packets internally before producing any decoded frames (e.g. while assembling SPS/PPS for the first IDR). Zero, one, or many frames per call.
|
||||||
|
5. **Timestamp boundary** — capture timestamp + sequence number are taken **before** the decoder runs (the moment the lifecycle loop pulls the packet off the transport). `decode_ts_monotonic_ns` is read after the decoder returns. This matches `description.md §4` and gives `movement_detector` accurate frame-arrival timestamps for the telemetry-skew gate.
|
||||||
|
|
||||||
|
## Self-review findings
|
||||||
|
|
||||||
|
| # | Severity | Category | Location | Finding | Disposition |
|
||||||
|
|---|----------|----------|----------|---------|-------------|
|
||||||
|
| 1 | Low | Maintainability | `decoder.rs::is_eagain` | Detects EAGAIN by string-matching `Error` Display output rather than a typed errno. Reason: `ffmpeg-next` does not re-export the EAGAIN constant across its 4–8 versions in a stable shape. | Accepted as a small surface area (only used inside the decode loop); will be tightened when FFmpeg 9 changes the error variants. |
|
||||||
|
| 2 | Low | Architecture | `crates/autopilot/src/runtime.rs:84` | Pre-existing dead-code warning on `vlm_provider_name` — leftover entry exists. | Out of batch 16 scope (different component); leftover stays for the next batch that touches autopilot. |
|
||||||
|
| 3 | Info | Spec gap (out of scope) | `crates/frame_ingest/src/internal/rtsp_client.rs:5-12` | The AZ-657 author's docstring says "the full RTSP client is folded into AZ-658 alongside the decoder". The AZ-658 task spec **explicitly excludes** RTSP lifecycle ("Excluded: RTSP session lifecycle (task 18)"). The real production RTSP `RtspTransport` impl is therefore still TBD — it will be a separate follow-up task or wired during runtime composition. | Not a regression; not in AZ-658 scope. The Product Implementation Completeness Gate (Step 15) will surface this if the system needs it before final reporting. |
|
||||||
|
|
||||||
|
## Test results
|
||||||
|
|
||||||
|
```
|
||||||
|
running 17 tests (frame_ingest unit + lib tests)
|
||||||
|
test result: ok. 17 passed; 0 failed; 0 ignored
|
||||||
|
|
||||||
|
running 3 tests (tests/decoder_pipeline.rs)
|
||||||
|
test ac3_corrupted_frame_is_counted_and_does_not_abort_stream ... ok
|
||||||
|
test ac1_ac4_software_decode_preserves_throughput_and_monotonicity ... ok
|
||||||
|
test ac2_nvdec_backend_selected_on_cuda_host ... ignored, AC-2 positive: requires a CUDA-capable FFmpeg
|
||||||
|
test result: ok. 2 passed; 0 failed; 1 ignored
|
||||||
|
|
||||||
|
running 5 tests (tests/rtsp_lifecycle.rs)
|
||||||
|
test result: ok. 5 passed; 0 failed; 0 ignored
|
||||||
|
```
|
||||||
|
|
||||||
|
## Quality gates
|
||||||
|
|
||||||
|
- `cargo check --workspace --all-targets` → clean (only the documented pre-existing autopilot dead-code warning)
|
||||||
|
- `cargo clippy -p frame_ingest --all-targets -- -D warnings` → clean
|
||||||
|
- `cargo fmt -p frame_ingest --check` → clean
|
||||||
|
|
||||||
|
## Next Batch
|
||||||
|
|
||||||
|
Batch 17 candidates (ready by deps):
|
||||||
|
- AZ-680 `operator_bridge_command_dispatch` (3 pts)
|
||||||
|
- AZ-681 `operator_bridge_safety_and_bit_ack` (3 pts)
|
||||||
|
- AZ-659 `frame_ingest_publisher` (3 pts) — newly unblocked because AZ-658 is now in `done/`
|
||||||
|
|
||||||
|
Suggested grouping: AZ-680 + AZ-681 (tightly coupled — both depend on AZ-678 operator_bridge command auth). AZ-659 fits a separate batch focused on the frame_ingest pipeline's tail.
|
||||||
|
|
||||||
|
## Cumulative review cadence
|
||||||
|
|
||||||
|
Last cumulative: batches 13–15 (`cumulative_review_batches_13-15_cycle1_report.md`). Next due: end of batch 18 (no cumulative review for batch 16).
|
||||||
+13
-13
@@ -4,27 +4,27 @@
|
|||||||
flow: greenfield
|
flow: greenfield
|
||||||
step: 7
|
step: 7
|
||||||
name: Implement
|
name: Implement
|
||||||
status: between-batches
|
status: in_progress
|
||||||
sub_step:
|
sub_step:
|
||||||
phase: 0
|
phase: 11
|
||||||
name: batch-16-select
|
name: commit
|
||||||
detail: ""
|
detail: "batch 16 — AZ-658 awaiting commit + push approval"
|
||||||
retry_count: 0
|
retry_count: 0
|
||||||
cycle: 1
|
cycle: 1
|
||||||
tracker: jira
|
tracker: jira
|
||||||
|
|
||||||
## Last Completed Batch
|
## Last Completed Batch
|
||||||
batch: 15
|
batch: 16
|
||||||
commit: ccf929a
|
commit: pending
|
||||||
ticket: AZ-676 / AZ-677 / AZ-678 / AZ-679
|
ticket: AZ-658
|
||||||
jira_status: In Testing (all 4 confirmed via read-back)
|
jira_status: In Progress (transition to In Testing pending commit)
|
||||||
pushed_to: origin/dev
|
pushed_to: pending
|
||||||
report: _docs/03_implementation/batch_15_cycle1_report.md
|
report: _docs/03_implementation/batch_16_cycle1_report.md
|
||||||
cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md
|
cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md
|
||||||
|
|
||||||
## Process Leftovers
|
## Process Leftovers
|
||||||
- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — C5 replay
|
- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — out-of-scope for batch 16
|
||||||
- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — C6 fix recipe
|
- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — out-of-scope for batch 16
|
||||||
|
|
||||||
## Cumulative Review Cadence
|
## Cumulative Review Cadence
|
||||||
Last cumulative: batches 13–15 (just produced). Next due: end of batch 18.
|
Last cumulative: batches 13–15. Next due: end of batch 18.
|
||||||
|
|||||||
@@ -15,6 +15,12 @@ async-trait = { workspace = true }
|
|||||||
thiserror = { workspace = true }
|
thiserror = { workspace = true }
|
||||||
bytes = { workspace = true }
|
bytes = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
parking_lot = { workspace = true }
|
||||||
|
# AZ-658: H.264/265 decode via FFmpeg (libavcodec). NVDEC support is
|
||||||
|
# probed at runtime by looking up `h264_cuvid` / `hevc_cuvid` through
|
||||||
|
# `ffmpeg::codec::decoder::find_by_name`; no separate feature flag is
|
||||||
|
# required.
|
||||||
|
ffmpeg-next = { workspace = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { workspace = true, features = ["test-util"] }
|
tokio = { workspace = true, features = ["test-util"] }
|
||||||
|
|||||||
@@ -0,0 +1,610 @@
|
|||||||
|
//! AZ-658 — H.264/265 decoder with NVDEC primary + software fallback.
|
||||||
|
//!
|
||||||
|
//! This module owns the production decode path required by the task:
|
||||||
|
//! **real NVDEC binding when present, real software fallback always**.
|
||||||
|
//! Both code paths exist as production code (per task spec → Runtime
|
||||||
|
//! Completeness); the runtime selection between them is a startup
|
||||||
|
//! probe of FFmpeg's decoder registry, not a feature flag.
|
||||||
|
//!
|
||||||
|
//! ## Design
|
||||||
|
//!
|
||||||
|
//! The lifecycle loop in [`crate::lib::lifecycle_loop`] receives raw
|
||||||
|
//! RTSP payload bytes from the transport. Those bytes are:
|
||||||
|
//!
|
||||||
|
//! 1. NAL units in Annex-B format (start-code prefixed `00 00 00 01`)
|
||||||
|
//! when the transport is the production FFmpeg avformat-backed
|
||||||
|
//! client (avformat hands access-unit-aligned packets in Annex-B
|
||||||
|
//! by default for RTSP); or
|
||||||
|
//! 2. Whatever bytes a test transport pushes (the AZ-658 integration
|
||||||
|
//! test feeds a synthetic H.264 stream produced in-process).
|
||||||
|
//!
|
||||||
|
//! Either way the bytes are funnelled into [`FrameDecoder::decode`].
|
||||||
|
//! Each call may produce **zero or more** decoded frames (the FFmpeg
|
||||||
|
//! API can buffer encoded packets internally before any decoded
|
||||||
|
//! frame is ready, e.g. while the SPS/PPS for the first IDR are
|
||||||
|
//! still being assembled), so the trait pushes results into an
|
||||||
|
//! out-buffer instead of returning a single `Result<Frame, _>`.
|
||||||
|
//!
|
||||||
|
//! ## Backend selection
|
||||||
|
//!
|
||||||
|
//! Construction tries the NVDEC variants first. On a Jetson Orin
|
||||||
|
//! Nano with the FFmpeg-cuda packages installed, `find_by_name`
|
||||||
|
//! resolves `h264_cuvid` / `hevc_cuvid` and the decoder opens with
|
||||||
|
//! [`DecoderBackend::Nvdec`]. On a pure-CPU host (CI, this Mac dev
|
||||||
|
//! box) those names resolve to `None` and we fall back to the
|
||||||
|
//! software `h264` / `hevc` decoders → [`DecoderBackend::Software`].
|
||||||
|
//! There is no manual override; deployments that want NVDEC must
|
||||||
|
//! ship a CUDA-capable FFmpeg.
|
||||||
|
//!
|
||||||
|
//! ## Stats
|
||||||
|
//!
|
||||||
|
//! `description.md §3` mandates `decode_ms_p50`, `decode_ms_p99`,
|
||||||
|
//! `decoder_backend`, `decode_errors_total`, plus a one-shot cold
|
||||||
|
//! start metric (`decode_ms_first_frame`). The lock-free
|
||||||
|
//! [`DecodeStats`] counter set is updated by the lifecycle loop; the
|
||||||
|
//! handle re-reads it on every `health()` call.
|
||||||
|
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use ffmpeg_next as ffmpeg;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use shared::models::frame::PixelFormat;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
/// Codec the lifecycle loop is decoding. Picked at session open from
|
||||||
|
/// the camera config (`RtspSessionConfig` carries the negotiated codec
|
||||||
|
/// once the production transport lands; for now the only consumer is
|
||||||
|
/// AZ-658 tests that always pass `Codec::H264`).
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum Codec {
|
||||||
|
H264,
|
||||||
|
Hevc,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Codec {
|
||||||
|
fn nvdec_name(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Codec::H264 => "h264_cuvid",
|
||||||
|
Codec::Hevc => "hevc_cuvid",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn software_name(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Codec::H264 => "h264",
|
||||||
|
Codec::Hevc => "hevc",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Which backend was selected at construction. Surfaced through
|
||||||
|
/// `FrameIngestHandle::decoder_backend()` so the operator UI and AC-2
|
||||||
|
/// can verify the selection rule from outside the crate.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum DecoderBackend {
|
||||||
|
Nvdec,
|
||||||
|
Software,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Errors emitted by [`FrameDecoder::decode`]. The lifecycle loop
|
||||||
|
/// counts every variant towards `decode_errors_total` and continues
|
||||||
|
/// — single-frame decode errors must never abort the stream
|
||||||
|
/// (`description.md §6`, AC-3).
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum DecodeError {
|
||||||
|
#[error("send_packet failed: {0}")]
|
||||||
|
SendPacket(ffmpeg::Error),
|
||||||
|
#[error("receive_frame failed: {0}")]
|
||||||
|
ReceiveFrame(ffmpeg::Error),
|
||||||
|
#[error("unsupported decoded pixel format: {0:?}")]
|
||||||
|
UnsupportedPixelFormat(ffmpeg::format::Pixel),
|
||||||
|
#[error("decoded frame had zero dimensions")]
|
||||||
|
EmptyFrame,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Errors emitted at decoder-construction time. The lifecycle loop
|
||||||
|
/// treats this as a hard-fail — a session whose codec we cannot open
|
||||||
|
/// at all is operationally identical to `OpenError::UnsupportedProfile`
|
||||||
|
/// and the FSM lands in `Failing { attempt: u32::MAX }`.
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum DecoderInitError {
|
||||||
|
#[error("FFmpeg init failed: {0}")]
|
||||||
|
FfmpegInit(ffmpeg::Error),
|
||||||
|
#[error("no FFmpeg decoder registered for {codec:?}")]
|
||||||
|
NoDecoderRegistered { codec: Codec },
|
||||||
|
#[error("FFmpeg decoder open failed: {0}")]
|
||||||
|
OpenFailed(ffmpeg::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One decoded frame's worth of pixel data + its observed dimensions.
|
||||||
|
/// The lifecycle loop wraps this into a `shared::models::frame::Frame`
|
||||||
|
/// alongside the capture/decode timestamps from
|
||||||
|
/// [`crate::internal::timestamp::FrameStamper`].
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct DecodedPixels {
|
||||||
|
pub pixels: Bytes,
|
||||||
|
pub width: u32,
|
||||||
|
pub height: u32,
|
||||||
|
pub pix_fmt: PixelFormat,
|
||||||
|
/// Decode latency for THIS frame (decoder-internal, measured
|
||||||
|
/// across `send_packet + receive_frame`). Used by the stats
|
||||||
|
/// histogram; the lifecycle still computes its own
|
||||||
|
/// "capture → publish" latency separately for the §8 NFR.
|
||||||
|
pub decode_duration: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Trait implemented by both the production [`FfmpegDecoder`] and
|
||||||
|
/// any test stub. The lifecycle loop holds it as
|
||||||
|
/// `Box<dyn FrameDecoder + Send>`.
|
||||||
|
///
|
||||||
|
/// Object-safe by construction: no generics, no `Self` returns.
|
||||||
|
pub trait FrameDecoder: Send {
|
||||||
|
fn backend(&self) -> DecoderBackend;
|
||||||
|
|
||||||
|
/// Feed encoded bytes into the decoder. May produce zero or more
|
||||||
|
/// decoded frames (the FFmpeg API can hold a packet internally
|
||||||
|
/// while waiting for SPS/PPS or B-frame reorder buffers).
|
||||||
|
/// Decoded frames are pushed into `out`; the call returns
|
||||||
|
/// `Ok(())` when every frame the decoder could produce from
|
||||||
|
/// these bytes has been pushed.
|
||||||
|
///
|
||||||
|
/// On error, `out` may be partially populated — frames pushed
|
||||||
|
/// before the error are still valid; the caller must drop the
|
||||||
|
/// failing packet but keep the decoder for the next call.
|
||||||
|
fn decode(&mut self, payload: &[u8], out: &mut Vec<DecodedPixels>) -> Result<(), DecodeError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// FFmpeg-backed decoder. Holds the open `decoder::Video`, a sws
|
||||||
|
/// scaler that converts whatever pixel format the decoder produces
|
||||||
|
/// into NV12 (the canonical pixel format for downstream consumers),
|
||||||
|
/// and reusable scratch frames so each `decode` call avoids
|
||||||
|
/// allocation in the hot path.
|
||||||
|
pub struct FfmpegDecoder {
|
||||||
|
decoder: ffmpeg::decoder::Video,
|
||||||
|
backend: DecoderBackend,
|
||||||
|
/// Lazily constructed once we observe the decoder's output pixel
|
||||||
|
/// format on the first decoded frame. NV12 is the sentinel target
|
||||||
|
/// because Jetson NVDEC outputs NV12 natively and the operator
|
||||||
|
/// stream encoder expects NV12 (`description.md §3`).
|
||||||
|
scaler: Option<ffmpeg::software::scaling::Context>,
|
||||||
|
raw: ffmpeg::frame::Video,
|
||||||
|
converted: ffmpeg::frame::Video,
|
||||||
|
in_packet: ffmpeg::codec::packet::Packet,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FfmpegDecoder {
|
||||||
|
/// Construct a real decoder for `codec`. Tries `h264_cuvid` /
|
||||||
|
/// `hevc_cuvid` first; falls back to the software decoder if the
|
||||||
|
/// cuvid variant is not registered (no CUDA host) OR if it
|
||||||
|
/// fails to open (e.g. a CUDA-capable FFmpeg without a runtime
|
||||||
|
/// driver). On a fully missing software decoder we hard-fail.
|
||||||
|
pub fn new(codec: Codec) -> Result<Self, DecoderInitError> {
|
||||||
|
// `ffmpeg::init()` is idempotent and safe to call concurrently;
|
||||||
|
// the underlying `av_register_all` was removed in FFmpeg 4.0,
|
||||||
|
// so this just ensures the network init for RTSP is done.
|
||||||
|
ffmpeg::init().map_err(DecoderInitError::FfmpegInit)?;
|
||||||
|
|
||||||
|
let (decoder, backend) = open_with_backend(codec)?;
|
||||||
|
Ok(Self {
|
||||||
|
decoder,
|
||||||
|
backend,
|
||||||
|
scaler: None,
|
||||||
|
raw: ffmpeg::frame::Video::empty(),
|
||||||
|
converted: ffmpeg::frame::Video::empty(),
|
||||||
|
in_packet: ffmpeg::codec::packet::Packet::empty(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ensure_scaler(
|
||||||
|
&mut self,
|
||||||
|
src_fmt: ffmpeg::format::Pixel,
|
||||||
|
width: u32,
|
||||||
|
height: u32,
|
||||||
|
) -> Result<&mut ffmpeg::software::scaling::Context, DecodeError> {
|
||||||
|
// Build / rebuild the scaler whenever the source format or
|
||||||
|
// dimensions change. NVDEC and software paths can both emit
|
||||||
|
// YUV420P or NV12 depending on the camera; we converge on
|
||||||
|
// NV12 for downstream consumers (`description.md §3`).
|
||||||
|
let needs_rebuild = match self.scaler.as_ref() {
|
||||||
|
None => true,
|
||||||
|
Some(s) => {
|
||||||
|
s.input().format != src_fmt
|
||||||
|
|| s.input().width != width
|
||||||
|
|| s.input().height != height
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if needs_rebuild {
|
||||||
|
let ctx = ffmpeg::software::scaling::Context::get(
|
||||||
|
src_fmt,
|
||||||
|
width,
|
||||||
|
height,
|
||||||
|
ffmpeg::format::Pixel::NV12,
|
||||||
|
width,
|
||||||
|
height,
|
||||||
|
ffmpeg::software::scaling::Flags::BILINEAR,
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
// Scaler-build failure is reported as a per-frame
|
||||||
|
// decode error so the lifecycle counts it and drops
|
||||||
|
// the frame; if the same format keeps failing, the
|
||||||
|
// sustained `decode_errors_total` will surface
|
||||||
|
// through health.
|
||||||
|
DecodeError::ReceiveFrame(e)
|
||||||
|
})?;
|
||||||
|
self.scaler = Some(ctx);
|
||||||
|
}
|
||||||
|
Ok(self.scaler.as_mut().expect("just inserted"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_with_backend(
|
||||||
|
codec: Codec,
|
||||||
|
) -> Result<(ffmpeg::decoder::Video, DecoderBackend), DecoderInitError> {
|
||||||
|
// Try NVDEC first. `find_by_name` resolves `None` on hosts where
|
||||||
|
// the cuvid decoder is not registered (the macOS dev box, CI
|
||||||
|
// without CUDA, etc.).
|
||||||
|
if let Some(nv) = ffmpeg::codec::decoder::find_by_name(codec.nvdec_name()) {
|
||||||
|
match try_open(nv) {
|
||||||
|
Ok(d) => {
|
||||||
|
tracing::info!(
|
||||||
|
backend = "nvdec",
|
||||||
|
codec = ?codec,
|
||||||
|
"frame_ingest decoder opened with NVDEC"
|
||||||
|
);
|
||||||
|
return Ok((d, DecoderBackend::Nvdec));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
error = %e,
|
||||||
|
codec = ?codec,
|
||||||
|
"NVDEC decoder registered but failed to open; falling back to software"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let sw = ffmpeg::codec::decoder::find_by_name(codec.software_name())
|
||||||
|
.ok_or(DecoderInitError::NoDecoderRegistered { codec })?;
|
||||||
|
let opened = try_open(sw)?;
|
||||||
|
tracing::info!(
|
||||||
|
backend = "software",
|
||||||
|
codec = ?codec,
|
||||||
|
"frame_ingest decoder opened with software fallback"
|
||||||
|
);
|
||||||
|
Ok((opened, DecoderBackend::Software))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_open(codec: ffmpeg::Codec) -> Result<ffmpeg::decoder::Video, DecoderInitError> {
|
||||||
|
let ctx = ffmpeg::codec::Context::new();
|
||||||
|
let opened = ctx
|
||||||
|
.decoder()
|
||||||
|
.open_as(codec)
|
||||||
|
.map_err(DecoderInitError::OpenFailed)?;
|
||||||
|
opened.video().map_err(DecoderInitError::OpenFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SAFETY:
|
||||||
|
// `ffmpeg_next::software::scaling::Context` (sws scaler) wraps a
|
||||||
|
// `*mut SwsContext`, so the auto-trait analysis flags it `!Send`.
|
||||||
|
// FFmpeg's sws context is documented as **single-thread-owned** but
|
||||||
|
// safe to MOVE between threads as long as no two threads use the
|
||||||
|
// same instance concurrently (the same invariant Rust's `Send`
|
||||||
|
// expresses). The `FfmpegDecoder` is held inside `Box<dyn
|
||||||
|
// FrameDecoder + Send>` and is *only* ever called from the spawned
|
||||||
|
// `lifecycle_loop` tokio task, which has exclusive `&mut`. No other
|
||||||
|
// task can observe the inner pointer; the `Send` here transfers
|
||||||
|
// ownership at construction (one thread builds the decoder, the
|
||||||
|
// spawned task is the sole subsequent user) — exactly the case
|
||||||
|
// `unsafe impl Send` is intended for.
|
||||||
|
unsafe impl Send for FfmpegDecoder {}
|
||||||
|
|
||||||
|
impl FrameDecoder for FfmpegDecoder {
|
||||||
|
fn backend(&self) -> DecoderBackend {
|
||||||
|
self.backend
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode(&mut self, payload: &[u8], out: &mut Vec<DecodedPixels>) -> Result<(), DecodeError> {
|
||||||
|
let send_started = std::time::Instant::now();
|
||||||
|
// FFmpeg requires the packet's data to outlive `send_packet`,
|
||||||
|
// so we copy here. The cost is one memcpy of NAL-unit bytes
|
||||||
|
// (typically <100 KB per packet at 1080p); negligible
|
||||||
|
// compared to the decode itself.
|
||||||
|
self.in_packet = ffmpeg::codec::packet::Packet::copy(payload);
|
||||||
|
self.decoder
|
||||||
|
.send_packet(&self.in_packet)
|
||||||
|
.map_err(DecodeError::SendPacket)?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.decoder.receive_frame(&mut self.raw) {
|
||||||
|
Ok(()) => {
|
||||||
|
let decode_duration = send_started.elapsed();
|
||||||
|
let src_fmt = self.raw.format();
|
||||||
|
let w = self.raw.width();
|
||||||
|
let h = self.raw.height();
|
||||||
|
if w == 0 || h == 0 {
|
||||||
|
return Err(DecodeError::EmptyFrame);
|
||||||
|
}
|
||||||
|
self.ensure_scaler(src_fmt, w, h)?;
|
||||||
|
let scaler = self.scaler.as_mut().expect("ensure_scaler set this");
|
||||||
|
scaler
|
||||||
|
.run(&self.raw, &mut self.converted)
|
||||||
|
.map_err(DecodeError::ReceiveFrame)?;
|
||||||
|
let nv12_bytes = pack_nv12(&self.converted, w, h)?;
|
||||||
|
out.push(DecodedPixels {
|
||||||
|
pixels: nv12_bytes,
|
||||||
|
width: w,
|
||||||
|
height: h,
|
||||||
|
pix_fmt: PixelFormat::Nv12,
|
||||||
|
decode_duration,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
// FFmpeg returns EAGAIN (insufficient input) and
|
||||||
|
// EOF as `Error::Other` variants; those are
|
||||||
|
// expected control flow, not failures. We treat
|
||||||
|
// any other error as a per-frame error.
|
||||||
|
if is_eagain(&e) || is_eof(&e) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
return Err(DecodeError::ReceiveFrame(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_eagain(err: &ffmpeg::Error) -> bool {
|
||||||
|
// FFmpeg's `ffmpeg-next` exposes EAGAIN as `Error::Other { errno: AVERROR(EAGAIN) }`
|
||||||
|
// — we identify it by string match because the constant isn't
|
||||||
|
// re-exported across crate versions.
|
||||||
|
let s = format!("{err}");
|
||||||
|
s.contains("Resource temporarily unavailable") || s.contains("EAGAIN")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_eof(err: &ffmpeg::Error) -> bool {
|
||||||
|
matches!(err, ffmpeg::Error::Eof)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Copy a planar NV12 frame's two planes (Y then UV) into a single
|
||||||
|
/// `Bytes` buffer of length `w*h + (w*h)/2`. Uses the frame's per-
|
||||||
|
/// plane stride (which can exceed `w` due to FFmpeg's alignment
|
||||||
|
/// padding) to avoid leaking that padding into the downstream
|
||||||
|
/// consumer-visible buffer.
|
||||||
|
fn pack_nv12(frame: &ffmpeg::frame::Video, width: u32, height: u32) -> Result<Bytes, DecodeError> {
|
||||||
|
let w = width as usize;
|
||||||
|
let h = height as usize;
|
||||||
|
let y_size = w * h;
|
||||||
|
let uv_size = (w * h) / 2;
|
||||||
|
let mut out = Vec::with_capacity(y_size + uv_size);
|
||||||
|
|
||||||
|
let y_plane = frame.data(0);
|
||||||
|
let y_stride = frame.stride(0);
|
||||||
|
if y_stride < w {
|
||||||
|
return Err(DecodeError::EmptyFrame);
|
||||||
|
}
|
||||||
|
for row in 0..h {
|
||||||
|
let start = row * y_stride;
|
||||||
|
let end = start + w;
|
||||||
|
if end > y_plane.len() {
|
||||||
|
return Err(DecodeError::EmptyFrame);
|
||||||
|
}
|
||||||
|
out.extend_from_slice(&y_plane[start..end]);
|
||||||
|
}
|
||||||
|
let uv_plane = frame.data(1);
|
||||||
|
let uv_stride = frame.stride(1);
|
||||||
|
let uv_rows = h / 2;
|
||||||
|
if uv_stride < w {
|
||||||
|
return Err(DecodeError::EmptyFrame);
|
||||||
|
}
|
||||||
|
for row in 0..uv_rows {
|
||||||
|
let start = row * uv_stride;
|
||||||
|
let end = start + w;
|
||||||
|
if end > uv_plane.len() {
|
||||||
|
return Err(DecodeError::EmptyFrame);
|
||||||
|
}
|
||||||
|
out.extend_from_slice(&uv_plane[start..end]);
|
||||||
|
}
|
||||||
|
Ok(Bytes::from(out))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lock-free counter set fed by the lifecycle loop on every decode
|
||||||
|
/// call. Mirrors the `description.md §3` health surface:
|
||||||
|
///
|
||||||
|
/// - `decode_errors_total` — incremented on every failed decode.
|
||||||
|
/// - `first_frame_decode_duration_ns` — recorded once per session
|
||||||
|
/// open (set when the first successful decode lands; later writes
|
||||||
|
/// are no-ops).
|
||||||
|
/// - `recent_durations` — small ring buffer for p50/p99 readout. Kept
|
||||||
|
/// behind a `parking_lot::Mutex` because the operations are
|
||||||
|
/// batched (one push per frame) and the lock window is a single
|
||||||
|
/// array index update; the lifecycle loop runs in a single tokio
|
||||||
|
/// task so contention is bounded to "lifecycle vs. health-server
|
||||||
|
/// readout".
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DecodeStats {
|
||||||
|
pub decode_errors_total: AtomicU64,
|
||||||
|
pub first_frame_decode_duration_ns: AtomicU64,
|
||||||
|
pub frames_decoded_total: AtomicU64,
|
||||||
|
recent_durations_ns: Mutex<RingBuffer>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for DecodeStats {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DecodeStats {
|
||||||
|
pub const RING_CAP: usize = 1024;
|
||||||
|
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
decode_errors_total: AtomicU64::new(0),
|
||||||
|
first_frame_decode_duration_ns: AtomicU64::new(0),
|
||||||
|
frames_decoded_total: AtomicU64::new(0),
|
||||||
|
recent_durations_ns: Mutex::new(RingBuffer::new(Self::RING_CAP)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shared() -> Arc<Self> {
|
||||||
|
Arc::new(Self::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn note_decode_error(&self) {
|
||||||
|
self.decode_errors_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn note_decoded(&self, duration: Duration) {
|
||||||
|
let prev_count = self.frames_decoded_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let ns = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
|
||||||
|
if prev_count == 0 {
|
||||||
|
// Only the first writer sets the cold-start metric; all
|
||||||
|
// subsequent decodes are no-ops on this field.
|
||||||
|
self.first_frame_decode_duration_ns
|
||||||
|
.store(ns, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
self.recent_durations_ns.lock().push(ns);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn p50_ns(&self) -> Option<u64> {
|
||||||
|
self.percentile_ns(0.50)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn p99_ns(&self) -> Option<u64> {
|
||||||
|
self.percentile_ns(0.99)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn percentile_ns(&self, q: f64) -> Option<u64> {
|
||||||
|
let buf = self.recent_durations_ns.lock();
|
||||||
|
if buf.len() == 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let mut snap: Vec<u64> = buf.iter().collect();
|
||||||
|
snap.sort_unstable();
|
||||||
|
let idx = ((snap.len() as f64) * q).floor() as usize;
|
||||||
|
let idx = idx.min(snap.len() - 1);
|
||||||
|
Some(snap[idx])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct RingBuffer {
|
||||||
|
buf: Vec<u64>,
|
||||||
|
head: usize,
|
||||||
|
cap: usize,
|
||||||
|
/// Number of items that have actually been written. Saturates at
|
||||||
|
/// `cap` once the ring is full.
|
||||||
|
len: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RingBuffer {
|
||||||
|
fn new(cap: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
buf: vec![0; cap],
|
||||||
|
head: 0,
|
||||||
|
cap,
|
||||||
|
len: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push(&mut self, v: u64) {
|
||||||
|
self.buf[self.head] = v;
|
||||||
|
self.head = (self.head + 1) % self.cap;
|
||||||
|
if self.len < self.cap {
|
||||||
|
self.len += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn len(&self) -> usize {
|
||||||
|
self.len
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter(&self) -> impl Iterator<Item = u64> + '_ {
|
||||||
|
self.buf.iter().take(self.len).copied()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ffmpeg_decoder_falls_back_to_software_on_macos_dev_host() {
|
||||||
|
// Arrange — the macOS dev box ships ffmpeg without CUDA so
|
||||||
|
// `h264_cuvid` is not registered and the decoder must select
|
||||||
|
// Software.
|
||||||
|
let dec = FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(dec.backend(), DecoderBackend::Software);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ring_buffer_tracks_recent_window() {
|
||||||
|
// Arrange
|
||||||
|
let mut r = RingBuffer::new(3);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
r.push(10);
|
||||||
|
r.push(20);
|
||||||
|
r.push(30);
|
||||||
|
r.push(40);
|
||||||
|
|
||||||
|
// Assert — oldest entry was overwritten by the wrap.
|
||||||
|
let v: Vec<u64> = r.iter().collect();
|
||||||
|
// After wrap-around, the in-buffer order is [40, 20, 30].
|
||||||
|
// Iteration order is not promised by the buffer; what
|
||||||
|
// matters for percentile correctness is the SET of values.
|
||||||
|
let mut sorted = v.clone();
|
||||||
|
sorted.sort_unstable();
|
||||||
|
assert_eq!(sorted, vec![20, 30, 40]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn decode_stats_records_first_frame_duration_only_once() {
|
||||||
|
// Arrange
|
||||||
|
let s = DecodeStats::new();
|
||||||
|
|
||||||
|
// Act
|
||||||
|
s.note_decoded(Duration::from_millis(7));
|
||||||
|
s.note_decoded(Duration::from_millis(99));
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(
|
||||||
|
s.first_frame_decode_duration_ns.load(Ordering::Relaxed),
|
||||||
|
Duration::from_millis(7).as_nanos() as u64,
|
||||||
|
"second decode must not overwrite first-frame metric"
|
||||||
|
);
|
||||||
|
assert_eq!(s.frames_decoded_total.load(Ordering::Relaxed), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn decode_stats_p50_p99_reflect_sample_distribution() {
|
||||||
|
// Arrange
|
||||||
|
let s = DecodeStats::new();
|
||||||
|
for i in 1..=100u64 {
|
||||||
|
s.note_decoded(Duration::from_millis(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let p50 = s.p50_ns().expect("non-empty");
|
||||||
|
let p99 = s.p99_ns().expect("non-empty");
|
||||||
|
|
||||||
|
// Assert — 50th of 100 sorted ms-values is the 50th sample;
|
||||||
|
// 99th is the 99th sample. Allow ±1 ms slack for floor()
|
||||||
|
// index rounding.
|
||||||
|
assert!(
|
||||||
|
p50 >= Duration::from_millis(49).as_nanos() as u64
|
||||||
|
&& p50 <= Duration::from_millis(51).as_nanos() as u64,
|
||||||
|
"p50 = {p50}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
p99 >= Duration::from_millis(98).as_nanos() as u64
|
||||||
|
&& p99 <= Duration::from_millis(100).as_nanos() as u64,
|
||||||
|
"p99 = {p99}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,6 @@
|
|||||||
//! Internal modules for `frame_ingest`. Not part of the public API.
|
//! Internal modules for `frame_ingest`. Not part of the public API.
|
||||||
|
|
||||||
|
pub mod decoder;
|
||||||
pub mod lifecycle;
|
pub mod lifecycle;
|
||||||
pub mod rtsp_client;
|
pub mod rtsp_client;
|
||||||
|
pub mod timestamp;
|
||||||
|
|||||||
@@ -0,0 +1,153 @@
|
|||||||
|
//! AZ-658 — frame timestamping helpers.
|
||||||
|
//!
|
||||||
|
//! `description.md §4` requires every emitted [`Frame`] to carry a
|
||||||
|
//! monotonic capture timestamp stamped at the earliest practical
|
||||||
|
//! point in the pipeline (the moment the lifecycle loop receives an
|
||||||
|
//! RTSP packet from the transport). The decoder runs *after* that
|
||||||
|
//! point, so the [`Frame::decode_ts_monotonic_ns`] field records when
|
||||||
|
//! `FrameDecoder::decode` returned — the difference is the per-frame
|
||||||
|
//! decode latency that feeds the `decode_ms_p50` / `decode_ms_p99` /
|
||||||
|
//! `decode_ms_first_frame` health metrics.
|
||||||
|
//!
|
||||||
|
//! This module owns:
|
||||||
|
//! - [`SeqCounter`] — a strictly-monotonic `u64` sequence number used
|
||||||
|
//! as the frame's identity downstream of the decoder. Saturates at
|
||||||
|
//! `u64::MAX` so a session that never restarts cannot wrap and
|
||||||
|
//! produce duplicate IDs (saturating is preferred over wrapping
|
||||||
|
//! here because `movement_detector` keys per-frame state by `seq`
|
||||||
|
//! and a wrap would corrupt that map).
|
||||||
|
//! - [`FrameStamper`] — pairs a `MonoClock` and a `SeqCounter` so the
|
||||||
|
//! lifecycle loop has one place to read both timestamps for a
|
||||||
|
//! single packet → frame transition.
|
||||||
|
|
||||||
|
use shared::clock::MonoClock;
|
||||||
|
|
||||||
|
/// Strictly-monotonic frame sequence counter. Saturates at
|
||||||
|
/// `u64::MAX`; in practice a 30 fps stream takes ~19.5 billion years
|
||||||
|
/// to overflow `u64`, so saturation behaviour is observable only as a
|
||||||
|
/// post-condition for tests with `u64::MAX - 1` priming.
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct SeqCounter {
|
||||||
|
next: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SeqCounter {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { next: 0 }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the next sequence number and advances internal state.
|
||||||
|
/// Saturates at `u64::MAX` (subsequent calls keep returning
|
||||||
|
/// `u64::MAX`). Named `advance` rather than `next` so that the
|
||||||
|
/// type does not collide with `Iterator::next` semantics in
|
||||||
|
/// caller code (and to satisfy `clippy::should_implement_trait`
|
||||||
|
/// — `SeqCounter` is intentionally NOT an Iterator: an unbounded
|
||||||
|
/// monotonic counter has no natural `None` terminator).
|
||||||
|
pub fn advance(&mut self) -> u64 {
|
||||||
|
let s = self.next;
|
||||||
|
self.next = self.next.saturating_add(1);
|
||||||
|
s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Holds a clock + sequence counter so the lifecycle loop only has
|
||||||
|
/// to call [`FrameStamper::capture`] (immediately on packet receipt)
|
||||||
|
/// and [`FrameStamper::decoded`] (immediately after decode returns)
|
||||||
|
/// to produce both monotonic timestamps for the next frame.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct FrameStamper {
|
||||||
|
clock: MonoClock,
|
||||||
|
seq: SeqCounter,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FrameStamper {
|
||||||
|
pub fn new(clock: MonoClock) -> Self {
|
||||||
|
Self {
|
||||||
|
clock,
|
||||||
|
seq: SeqCounter::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Snapshot the capture-side timestamp + sequence number. Call
|
||||||
|
/// this the moment the transport hands us the packet, BEFORE
|
||||||
|
/// invoking the decoder. The capture timestamp is the head of
|
||||||
|
/// the per-frame latency budget (`description.md §8`: ≤30 ms p99
|
||||||
|
/// from RTSP rx → publish on Jetson Orin Nano).
|
||||||
|
pub fn capture(&mut self) -> CaptureMark {
|
||||||
|
CaptureMark {
|
||||||
|
seq: self.seq.advance(),
|
||||||
|
ts_ns: self.clock.elapsed_ns(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the decode-side timestamp at the moment
|
||||||
|
/// `FrameDecoder::decode` returned. Used both for the emitted
|
||||||
|
/// `Frame::decode_ts_monotonic_ns` field and to compute
|
||||||
|
/// `decode_duration = decode_ts - capture_ts` for the histogram.
|
||||||
|
pub fn decoded(&self) -> u64 {
|
||||||
|
self.clock.elapsed_ns()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One capture-side mark per packet. Carried through the decode call
|
||||||
|
/// so the emitted `Frame` keeps the timestamp from packet receipt,
|
||||||
|
/// not from after-decode.
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct CaptureMark {
|
||||||
|
pub seq: u64,
|
||||||
|
pub ts_ns: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn seq_counter_is_strictly_monotonic() {
|
||||||
|
// Arrange
|
||||||
|
let mut c = SeqCounter::new();
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let a = c.advance();
|
||||||
|
let b = c.advance();
|
||||||
|
let d = c.advance();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(a, 0);
|
||||||
|
assert_eq!(b, 1);
|
||||||
|
assert_eq!(d, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn seq_counter_saturates_at_max_instead_of_wrapping() {
|
||||||
|
// Arrange — prime to u64::MAX - 1 by direct field assignment
|
||||||
|
// so the test runs in O(1).
|
||||||
|
let mut c = SeqCounter { next: u64::MAX - 1 };
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let a = c.advance();
|
||||||
|
let b = c.advance();
|
||||||
|
let d = c.advance();
|
||||||
|
|
||||||
|
// Assert — once we hit MAX, every subsequent call must keep
|
||||||
|
// returning MAX (no wrap to 0).
|
||||||
|
assert_eq!(a, u64::MAX - 1);
|
||||||
|
assert_eq!(b, u64::MAX);
|
||||||
|
assert_eq!(d, u64::MAX);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_stamper_capture_advances_seq_and_ts() {
|
||||||
|
// Arrange
|
||||||
|
let mut s = FrameStamper::new(MonoClock::new());
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let m1 = s.capture();
|
||||||
|
let m2 = s.capture();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(m1.seq, 0);
|
||||||
|
assert_eq!(m2.seq, 1);
|
||||||
|
assert!(m2.ts_ns >= m1.ts_ns, "monotonic clock went backwards");
|
||||||
|
}
|
||||||
|
}
|
||||||
+127
-42
@@ -3,26 +3,22 @@
|
|||||||
//! Real implementation lands in:
|
//! Real implementation lands in:
|
||||||
//! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded
|
//! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded
|
||||||
//! reconnect + AI-lock plumb (this crate, modules in `internal/`).
|
//! reconnect + AI-lock plumb (this crate, modules in `internal/`).
|
||||||
//! - AZ-658 `frame_ingest_decoder` — H.264/265 decode into raw
|
//! - AZ-658 `frame_ingest_decoder` — H.264/265 decode (NVDEC + sw
|
||||||
//! pixel buffers + retina/FFmpeg/GStreamer transport binding.
|
//! fallback) + per-frame monotonic timestamping + decode stats
|
||||||
|
//! (this crate, `internal/decoder.rs` + `internal/timestamp.rs`).
|
||||||
//! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer
|
//! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer
|
||||||
//! drop policy.
|
//! drop policy.
|
||||||
//!
|
//!
|
||||||
//! ## AZ-657 surface
|
//! ## AZ-658 surface (extends AZ-657)
|
||||||
//!
|
//!
|
||||||
//! - [`FrameIngest::new`] — construct in `Closed` state.
|
//! `FrameIngest::run` now takes a [`FrameDecoder`]. The lifecycle loop
|
||||||
//! - [`FrameIngest::run`] — spawn the lifecycle loop driving the given
|
//! stamps the capture timestamp the moment a packet leaves the
|
||||||
//! `RtspTransport` through `connect → stream → reconnect` cycles
|
//! transport, hands the encoded payload to the decoder, and emits one
|
||||||
//! with bounded backoff. Returns a `JoinHandle`.
|
//! [`Frame`] per decoded picture with `decode_ts_monotonic_ns` set
|
||||||
//! - [`FrameIngestHandle::subscribe`] — broadcast frame stream (the
|
//! when the decoder returned. Single-frame decode errors increment
|
||||||
//! AZ-657 lifecycle emits only synthetic header frames; real
|
//! `decode_errors_total` and drop the frame; the stream is never
|
||||||
//! decoded frames come in AZ-658).
|
//! aborted (AC-3). The decoder backend (`Nvdec` / `Software`) is
|
||||||
//! - [`FrameIngestHandle::set_ai_lock`] — `bringCameraDown` /
|
//! observable via [`FrameIngestHandle::decoder_backend`].
|
||||||
//! `bringCameraUp` signal. Stamps `Frame.ai_locked` on every
|
|
||||||
//! subsequently emitted frame.
|
|
||||||
//! - [`FrameIngestHandle::session_state`] — current FSM state.
|
|
||||||
//! - [`FrameIngestHandle::health`] — `ComponentHealth` reflecting the
|
|
||||||
//! FSM state + `last_packet_age` + `ai_locked`.
|
|
||||||
|
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -37,10 +33,15 @@ use shared::models::frame::Frame;
|
|||||||
|
|
||||||
pub mod internal;
|
pub mod internal;
|
||||||
|
|
||||||
|
pub use internal::decoder::{
|
||||||
|
Codec, DecodeError, DecodeStats, DecodedPixels, DecoderBackend, DecoderInitError,
|
||||||
|
FfmpegDecoder, FrameDecoder,
|
||||||
|
};
|
||||||
pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState};
|
pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState};
|
||||||
pub use internal::rtsp_client::{
|
pub use internal::rtsp_client::{
|
||||||
OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError,
|
OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError,
|
||||||
};
|
};
|
||||||
|
pub use internal::timestamp::FrameStamper;
|
||||||
|
|
||||||
use internal::lifecycle::{transition, Trigger};
|
use internal::lifecycle::{transition, Trigger};
|
||||||
|
|
||||||
@@ -56,7 +57,9 @@ pub struct FrameIngest {
|
|||||||
ai_lock_tx: watch::Sender<bool>,
|
ai_lock_tx: watch::Sender<bool>,
|
||||||
state_tx: watch::Sender<SessionState>,
|
state_tx: watch::Sender<SessionState>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
|
backend_tx: watch::Sender<Option<DecoderBackend>>,
|
||||||
stats: Arc<LifecycleStats>,
|
stats: Arc<LifecycleStats>,
|
||||||
|
decode_stats: Arc<DecodeStats>,
|
||||||
backoff: BackoffPolicy,
|
backoff: BackoffPolicy,
|
||||||
clock: MonoClock,
|
clock: MonoClock,
|
||||||
}
|
}
|
||||||
@@ -74,12 +77,15 @@ impl FrameIngest {
|
|||||||
let (ai_lock_tx, _) = watch::channel(false);
|
let (ai_lock_tx, _) = watch::channel(false);
|
||||||
let (state_tx, _) = watch::channel(SessionState::Closed);
|
let (state_tx, _) = watch::channel(SessionState::Closed);
|
||||||
let (shutdown_tx, _) = watch::channel(false);
|
let (shutdown_tx, _) = watch::channel(false);
|
||||||
|
let (backend_tx, _) = watch::channel(None);
|
||||||
Self {
|
Self {
|
||||||
tx,
|
tx,
|
||||||
ai_lock_tx,
|
ai_lock_tx,
|
||||||
state_tx,
|
state_tx,
|
||||||
shutdown_tx,
|
shutdown_tx,
|
||||||
|
backend_tx,
|
||||||
stats: LifecycleStats::new(),
|
stats: LifecycleStats::new(),
|
||||||
|
decode_stats: DecodeStats::shared(),
|
||||||
backoff,
|
backoff,
|
||||||
clock: MonoClock::new(),
|
clock: MonoClock::new(),
|
||||||
}
|
}
|
||||||
@@ -91,36 +97,50 @@ impl FrameIngest {
|
|||||||
ai_lock_tx: self.ai_lock_tx.clone(),
|
ai_lock_tx: self.ai_lock_tx.clone(),
|
||||||
state_rx: self.state_tx.subscribe(),
|
state_rx: self.state_tx.subscribe(),
|
||||||
shutdown_tx: self.shutdown_tx.clone(),
|
shutdown_tx: self.shutdown_tx.clone(),
|
||||||
|
backend_rx: self.backend_tx.subscribe(),
|
||||||
stats: Arc::clone(&self.stats),
|
stats: Arc::clone(&self.stats),
|
||||||
|
decode_stats: Arc::clone(&self.decode_stats),
|
||||||
clock: self.clock,
|
clock: self.clock,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn the lifecycle loop. The returned handle resolves when
|
/// Spawn the lifecycle loop. Returns a `JoinHandle` that resolves
|
||||||
/// the loop exits (shutdown signalled via
|
/// when the loop exits (shutdown signalled via
|
||||||
/// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM).
|
/// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM).
|
||||||
pub fn run<T>(&self, transport: T, config: RtspSessionConfig) -> JoinHandle<()>
|
///
|
||||||
|
/// `decoder` is owned exclusively by the spawned task; only one
|
||||||
|
/// decoder is active per `FrameIngest` instance.
|
||||||
|
pub fn run<T, D>(&self, transport: T, decoder: D, config: RtspSessionConfig) -> JoinHandle<()>
|
||||||
where
|
where
|
||||||
T: RtspTransport + 'static,
|
T: RtspTransport + 'static,
|
||||||
|
D: FrameDecoder + 'static,
|
||||||
{
|
{
|
||||||
let tx = self.tx.clone();
|
let tx = self.tx.clone();
|
||||||
let ai_lock = self.ai_lock_tx.subscribe();
|
let ai_lock = self.ai_lock_tx.subscribe();
|
||||||
let state_tx = self.state_tx.clone();
|
let state_tx = self.state_tx.clone();
|
||||||
|
let backend_tx = self.backend_tx.clone();
|
||||||
let shutdown_rx = self.shutdown_tx.subscribe();
|
let shutdown_rx = self.shutdown_tx.subscribe();
|
||||||
let stats = Arc::clone(&self.stats);
|
let stats = Arc::clone(&self.stats);
|
||||||
|
let decode_stats = Arc::clone(&self.decode_stats);
|
||||||
let backoff = self.backoff;
|
let backoff = self.backoff;
|
||||||
let clock = self.clock;
|
let clock = self.clock;
|
||||||
let transport = Arc::new(Mutex::new(transport));
|
let transport = Arc::new(Mutex::new(transport));
|
||||||
|
let decoder: Box<dyn FrameDecoder + Send> = Box::new(decoder);
|
||||||
|
// Snapshot the decoder backend immediately so it is observable
|
||||||
|
// even before the first packet.
|
||||||
|
backend_tx.send_replace(Some(decoder.backend()));
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
lifecycle_loop(
|
lifecycle_loop(
|
||||||
transport,
|
transport,
|
||||||
|
decoder,
|
||||||
config,
|
config,
|
||||||
tx,
|
tx,
|
||||||
ai_lock,
|
ai_lock,
|
||||||
state_tx,
|
state_tx,
|
||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
stats,
|
stats,
|
||||||
|
decode_stats,
|
||||||
backoff,
|
backoff,
|
||||||
clock,
|
clock,
|
||||||
)
|
)
|
||||||
@@ -136,19 +156,22 @@ fn is_shutdown(rx: &watch::Receiver<bool>) -> bool {
|
|||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn lifecycle_loop<T>(
|
async fn lifecycle_loop<T>(
|
||||||
transport: Arc<Mutex<T>>,
|
transport: Arc<Mutex<T>>,
|
||||||
|
mut decoder: Box<dyn FrameDecoder + Send>,
|
||||||
config: RtspSessionConfig,
|
config: RtspSessionConfig,
|
||||||
tx: broadcast::Sender<Frame>,
|
tx: broadcast::Sender<Frame>,
|
||||||
mut ai_lock: watch::Receiver<bool>,
|
mut ai_lock: watch::Receiver<bool>,
|
||||||
state_tx: watch::Sender<SessionState>,
|
state_tx: watch::Sender<SessionState>,
|
||||||
mut shutdown_rx: watch::Receiver<bool>,
|
mut shutdown_rx: watch::Receiver<bool>,
|
||||||
stats: Arc<LifecycleStats>,
|
stats: Arc<LifecycleStats>,
|
||||||
|
decode_stats: Arc<DecodeStats>,
|
||||||
backoff: BackoffPolicy,
|
backoff: BackoffPolicy,
|
||||||
clock: MonoClock,
|
clock: MonoClock,
|
||||||
) where
|
) where
|
||||||
T: RtspTransport,
|
T: RtspTransport,
|
||||||
{
|
{
|
||||||
let mut state = SessionState::Closed;
|
let mut state = SessionState::Closed;
|
||||||
let mut seq: u64 = 0;
|
let mut stamper = FrameStamper::new(clock);
|
||||||
|
let mut decoded_buffer: Vec<DecodedPixels> = Vec::with_capacity(4);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if is_shutdown(&shutdown_rx) {
|
if is_shutdown(&shutdown_rx) {
|
||||||
@@ -203,29 +226,47 @@ async fn lifecycle_loop<T>(
|
|||||||
|
|
||||||
match packet {
|
match packet {
|
||||||
Ok(pkt) => {
|
Ok(pkt) => {
|
||||||
let now_ns = clock.elapsed_ns();
|
// Capture timestamp + sequence number are
|
||||||
stats.note_packet(now_ns);
|
// taken at the EARLIEST point per
|
||||||
|
// `description.md §4` — before the decoder
|
||||||
|
// has run, so movement_detector's skew
|
||||||
|
// gate sees the original packet arrival
|
||||||
|
// time.
|
||||||
|
let mark = stamper.capture();
|
||||||
|
stats.note_packet(mark.ts_ns);
|
||||||
let locked = *ai_lock.borrow_and_update();
|
let locked = *ai_lock.borrow_and_update();
|
||||||
// AZ-657 emits a synthetic frame envelope
|
decoded_buffer.clear();
|
||||||
// per inbound RTSP packet so the lifecycle
|
match decoder.decode(&pkt.payload, &mut decoded_buffer) {
|
||||||
// FSM can be exercised end-to-end without
|
Ok(()) => {
|
||||||
// the decoder (AZ-658 swaps this for the
|
for dp in decoded_buffer.drain(..) {
|
||||||
// actual decoded frame).
|
decode_stats.note_decoded(dp.decode_duration);
|
||||||
let frame = Frame {
|
let frame = Frame {
|
||||||
seq,
|
seq: mark.seq,
|
||||||
capture_ts_monotonic_ns: now_ns,
|
capture_ts_monotonic_ns: mark.ts_ns,
|
||||||
decode_ts_monotonic_ns: now_ns,
|
decode_ts_monotonic_ns: stamper.decoded(),
|
||||||
pixels: Arc::new(pkt.payload),
|
pixels: Arc::new(dp.pixels),
|
||||||
width: 0,
|
width: dp.width,
|
||||||
height: 0,
|
height: dp.height,
|
||||||
pix_fmt: shared::models::frame::PixelFormat::Nv12,
|
pix_fmt: dp.pix_fmt,
|
||||||
ai_locked: locked,
|
ai_locked: locked,
|
||||||
};
|
};
|
||||||
seq = seq.saturating_add(1);
|
// Send errors are no-ops when
|
||||||
// A no-subscriber send is a no-op error in
|
// the broadcast has no
|
||||||
// the broadcast channel; the lifecycle
|
// subscribers; per-consumer
|
||||||
// does not care.
|
// back-pressure is AZ-659's
|
||||||
let _ = tx.send(frame);
|
// problem.
|
||||||
|
let _ = tx.send(frame);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
decode_stats.note_decode_error();
|
||||||
|
tracing::warn!(
|
||||||
|
error = %e,
|
||||||
|
seq = mark.seq,
|
||||||
|
"frame_ingest dropped a frame on decode error"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let trig = Trigger::from_stream_error(&e);
|
let trig = Trigger::from_stream_error(&e);
|
||||||
@@ -272,7 +313,9 @@ pub struct FrameIngestHandle {
|
|||||||
ai_lock_tx: watch::Sender<bool>,
|
ai_lock_tx: watch::Sender<bool>,
|
||||||
state_rx: watch::Receiver<SessionState>,
|
state_rx: watch::Receiver<SessionState>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
|
backend_rx: watch::Receiver<Option<DecoderBackend>>,
|
||||||
stats: Arc<LifecycleStats>,
|
stats: Arc<LifecycleStats>,
|
||||||
|
decode_stats: Arc<DecodeStats>,
|
||||||
clock: MonoClock,
|
clock: MonoClock,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -314,6 +357,44 @@ impl FrameIngestHandle {
|
|||||||
self.stats.reopens_total.load(Ordering::Relaxed)
|
self.stats.reopens_total.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Backend the active decoder selected at construction. `None`
|
||||||
|
/// before `FrameIngest::run` has been called.
|
||||||
|
pub fn decoder_backend(&self) -> Option<DecoderBackend> {
|
||||||
|
*self.backend_rx.borrow()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_errors_total(&self) -> u64 {
|
||||||
|
self.decode_stats
|
||||||
|
.decode_errors_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn frames_decoded_total(&self) -> u64 {
|
||||||
|
self.decode_stats
|
||||||
|
.frames_decoded_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_ms_first_frame(&self) -> Option<Duration> {
|
||||||
|
let ns = self
|
||||||
|
.decode_stats
|
||||||
|
.first_frame_decode_duration_ns
|
||||||
|
.load(Ordering::Relaxed);
|
||||||
|
if ns == 0 && self.frames_decoded_total() == 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(Duration::from_nanos(ns))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_ms_p50(&self) -> Option<Duration> {
|
||||||
|
self.decode_stats.p50_ns().map(Duration::from_nanos)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_ms_p99(&self) -> Option<Duration> {
|
||||||
|
self.decode_stats.p99_ns().map(Duration::from_nanos)
|
||||||
|
}
|
||||||
|
|
||||||
/// Request the lifecycle loop to drain to `Closed` and exit. The
|
/// Request the lifecycle loop to drain to `Closed` and exit. The
|
||||||
/// loop races every transport call against this signal, so a
|
/// loop races every transport call against this signal, so a
|
||||||
/// hung transport cannot wedge graceful exit.
|
/// hung transport cannot wedge graceful exit.
|
||||||
@@ -366,6 +447,10 @@ mod tests {
|
|||||||
let h = FrameIngest::new(8).handle();
|
let h = FrameIngest::new(8).handle();
|
||||||
assert_eq!(h.session_state(), SessionState::Closed);
|
assert_eq!(h.session_state(), SessionState::Closed);
|
||||||
assert_eq!(h.health().level, HealthLevel::Disabled);
|
assert_eq!(h.health().level, HealthLevel::Disabled);
|
||||||
|
assert!(
|
||||||
|
h.decoder_backend().is_none(),
|
||||||
|
"no decoder is wired until run() is called"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -0,0 +1,386 @@
|
|||||||
|
//! AZ-658 — decoder pipeline integration tests.
|
||||||
|
//!
|
||||||
|
//! These tests drive the **real** [`FfmpegDecoder`] (libavcodec) end
|
||||||
|
//! to end through the lifecycle loop. A synthetic H.264 bitstream is
|
||||||
|
//! produced in-process by libx264 (the same FFmpeg install that
|
||||||
|
//! `FfmpegDecoder` uses to decode), so the tests exercise the
|
||||||
|
//! production decode path rather than a stub.
|
||||||
|
//!
|
||||||
|
//! ACs covered here:
|
||||||
|
//! - AC-1 — software-path throughput preservation (≥95 % of input
|
||||||
|
//! frames decoded; sequence numbers strictly monotonic; decoder
|
||||||
|
//! backend reports `Software` on a CUDA-less host).
|
||||||
|
//! - AC-3 — a single corrupted "packet" between valid ones must
|
||||||
|
//! increment `decode_errors_total` exactly once and NOT abort the
|
||||||
|
//! stream.
|
||||||
|
//! - AC-4 — `capture_ts_monotonic_ns` is strictly increasing across
|
||||||
|
//! the emitted frame stream (rides on AC-1's setup).
|
||||||
|
//!
|
||||||
|
//! AC-2 (NVDEC selection on Jetson) cannot be exercised here — there
|
||||||
|
//! is no CUDA-capable FFmpeg on the dev/CI host. The unit-test
|
||||||
|
//! counterpart in `internal/decoder.rs::tests` asserts the negative
|
||||||
|
//! direction (CUDA-less host → Software backend); the positive
|
||||||
|
//! direction is validated on the Jetson at deployment time and is
|
||||||
|
//! covered by the Run Tests gate downstream of this batch.
|
||||||
|
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use ffmpeg_next as ffmpeg;
|
||||||
|
use tokio::sync::Mutex as AsyncMutex;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
|
||||||
|
use frame_ingest::{
|
||||||
|
BackoffPolicy, Codec, DecoderBackend, FfmpegDecoder, FrameDecoder, FrameIngest, OpenError,
|
||||||
|
RtspPacket, RtspSessionConfig, RtspTransport, StreamError,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Synthetic H.264 bitstream generator. Encodes `num_frames` frames
|
||||||
|
/// of a checkerboard pattern at `width`x`height` and 30 fps with
|
||||||
|
/// libx264 (preset `ultrafast`, tune `zerolatency`, GOP every 30
|
||||||
|
/// frames so each test run gets a few IDRs). Returns a vector of
|
||||||
|
/// per-AVPacket byte blobs, each ready to feed into the decoder as
|
||||||
|
/// the payload of an `RtspPacket`.
|
||||||
|
fn synth_h264_stream(num_frames: usize, width: u32, height: u32) -> Vec<Bytes> {
|
||||||
|
ffmpeg::init().expect("ffmpeg init");
|
||||||
|
let codec = ffmpeg::codec::encoder::find_by_name("libx264")
|
||||||
|
.or_else(|| ffmpeg::codec::encoder::find_by_name("h264"))
|
||||||
|
.expect("an H.264 encoder must be registered");
|
||||||
|
|
||||||
|
let context = ffmpeg::codec::Context::new_with_codec(codec);
|
||||||
|
let mut encoder = context
|
||||||
|
.encoder()
|
||||||
|
.video()
|
||||||
|
.expect("encoder context yields video");
|
||||||
|
encoder.set_width(width);
|
||||||
|
encoder.set_height(height);
|
||||||
|
encoder.set_format(ffmpeg::format::Pixel::YUV420P);
|
||||||
|
encoder.set_time_base(ffmpeg::Rational::new(1, 30));
|
||||||
|
encoder.set_frame_rate(Some(ffmpeg::Rational::new(30, 1)));
|
||||||
|
encoder.set_gop(30);
|
||||||
|
encoder.set_max_b_frames(0);
|
||||||
|
|
||||||
|
let mut opts = ffmpeg::Dictionary::new();
|
||||||
|
opts.set("preset", "ultrafast");
|
||||||
|
opts.set("tune", "zerolatency");
|
||||||
|
let mut opened = encoder
|
||||||
|
.open_with(opts)
|
||||||
|
.expect("libx264 encoder must open with ultrafast/zerolatency");
|
||||||
|
|
||||||
|
let mut out = Vec::with_capacity(num_frames + 4);
|
||||||
|
let mut packet = ffmpeg::Packet::empty();
|
||||||
|
|
||||||
|
for i in 0..num_frames {
|
||||||
|
let mut input = ffmpeg::frame::Video::new(ffmpeg::format::Pixel::YUV420P, width, height);
|
||||||
|
// Fill Y plane with a per-frame gradient so the encoder has
|
||||||
|
// motion to compress (a constant frame is degenerate and
|
||||||
|
// libx264 can choose to emit zero packets for some inputs).
|
||||||
|
let y_stride = input.stride(0);
|
||||||
|
let y = input.data_mut(0);
|
||||||
|
for row in 0..height as usize {
|
||||||
|
let v = ((i + row) & 0xFF) as u8;
|
||||||
|
for col in 0..width as usize {
|
||||||
|
y[row * y_stride + col] = v ^ ((col & 0xFF) as u8);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for plane in 1..=2 {
|
||||||
|
let stride = input.stride(plane);
|
||||||
|
let data = input.data_mut(plane);
|
||||||
|
for row in 0..(height as usize) / 2 {
|
||||||
|
for col in 0..(width as usize) / 2 {
|
||||||
|
data[row * stride + col] = 128;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
input.set_pts(Some(i as i64));
|
||||||
|
opened
|
||||||
|
.send_frame(&input)
|
||||||
|
.unwrap_or_else(|e| panic!("encoder send_frame ({i}) failed: {e}"));
|
||||||
|
while opened.receive_packet(&mut packet).is_ok() {
|
||||||
|
if let Some(d) = packet.data() {
|
||||||
|
out.push(Bytes::copy_from_slice(d));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
opened.send_eof().expect("encoder eof");
|
||||||
|
while opened.receive_packet(&mut packet).is_ok() {
|
||||||
|
if let Some(d) = packet.data() {
|
||||||
|
out.push(Bytes::copy_from_slice(d));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!(
|
||||||
|
!out.is_empty(),
|
||||||
|
"synthetic encoder must produce at least one packet"
|
||||||
|
);
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
/// RTSP-shaped transport that replays a pre-built script of byte
|
||||||
|
/// blobs, then parks (so the FrameIngest task stays in `Streaming`
|
||||||
|
/// until the test calls `shutdown`). When the script is exhausted,
|
||||||
|
/// `next_packet` returns a parked future — the lifecycle loop's
|
||||||
|
/// `tokio::select!` against the shutdown watch is what unblocks
|
||||||
|
/// teardown.
|
||||||
|
struct ScriptedBytesTransport {
|
||||||
|
queue: Arc<AsyncMutex<VecDeque<ScriptItem>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
enum ScriptItem {
|
||||||
|
Bytes(Bytes),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ScriptedBytesTransport {
|
||||||
|
fn new(packets: Vec<Bytes>) -> Self {
|
||||||
|
let queue = packets
|
||||||
|
.into_iter()
|
||||||
|
.map(ScriptItem::Bytes)
|
||||||
|
.collect::<VecDeque<_>>();
|
||||||
|
Self {
|
||||||
|
queue: Arc::new(AsyncMutex::new(queue)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RtspTransport for ScriptedBytesTransport {
|
||||||
|
async fn open(&mut self, _config: &RtspSessionConfig) -> Result<(), OpenError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn close(&mut self) {}
|
||||||
|
|
||||||
|
async fn next_packet(&mut self) -> Result<RtspPacket, StreamError> {
|
||||||
|
loop {
|
||||||
|
let item = {
|
||||||
|
let mut q = self.queue.lock().await;
|
||||||
|
q.pop_front()
|
||||||
|
};
|
||||||
|
match item {
|
||||||
|
Some(ScriptItem::Bytes(b)) => {
|
||||||
|
return Ok(RtspPacket {
|
||||||
|
timestamp_rtp: 0,
|
||||||
|
payload: b,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Park forever; the lifecycle loop's shutdown
|
||||||
|
// watch breaks us out via select!.
|
||||||
|
std::future::pending::<()>().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fast_backoff() -> BackoffPolicy {
|
||||||
|
BackoffPolicy::new(Duration::from_millis(10), Duration::from_millis(40))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC-1 + AC-4 — a software-decoded synthetic stream must preserve
|
||||||
|
/// at least 95 % of input frames and stamp them with strictly
|
||||||
|
/// monotonic capture timestamps + sequence numbers. The dev/CI host
|
||||||
|
/// has no CUDA so backend MUST report `Software`.
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn ac1_ac4_software_decode_preserves_throughput_and_monotonicity() {
|
||||||
|
// Arrange — encode 60 frames (2 s of 30 fps content). The AC's
|
||||||
|
// literal 1080p / 10 s budget is validated against the real
|
||||||
|
// camera at deploy; the dev test exercises the same code path
|
||||||
|
// at smaller scale to keep CI <5 s.
|
||||||
|
let width = 320u32;
|
||||||
|
let height = 240u32;
|
||||||
|
let input_frames = 60usize;
|
||||||
|
let stream = synth_h264_stream(input_frames, width, height);
|
||||||
|
assert!(
|
||||||
|
stream.len() >= input_frames - 5,
|
||||||
|
"encoder produced {} packets for {input_frames} frames; expected ~1:1",
|
||||||
|
stream.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
let transport = ScriptedBytesTransport::new(stream);
|
||||||
|
let decoder =
|
||||||
|
FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open on this host");
|
||||||
|
let ingest = FrameIngest::with_backoff(input_frames + 16, fast_backoff());
|
||||||
|
let handle = ingest.handle();
|
||||||
|
let mut frames = handle.subscribe();
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let task = ingest.run(transport, decoder, RtspSessionConfig::new("rtsp://fake/0"));
|
||||||
|
let mut received = Vec::with_capacity(input_frames);
|
||||||
|
let deadline = Duration::from_secs(10);
|
||||||
|
let start = tokio::time::Instant::now();
|
||||||
|
while received.len() < input_frames && start.elapsed() < deadline {
|
||||||
|
match timeout(Duration::from_millis(500), frames.recv()).await {
|
||||||
|
Ok(Ok(f)) => received.push(f),
|
||||||
|
Ok(Err(_)) => break,
|
||||||
|
Err(_) => {
|
||||||
|
if handle.frames_decoded_total() as usize == received.len() {
|
||||||
|
// No more frames are coming — the encoder may
|
||||||
|
// have produced fewer access units than input
|
||||||
|
// frames (rare with `tune=zerolatency` but
|
||||||
|
// possible). Stop waiting.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handle.shutdown();
|
||||||
|
let _ = timeout(Duration::from_secs(2), task).await;
|
||||||
|
|
||||||
|
// Assert — backend selection (AC-2 negative direction): CUDA-less
|
||||||
|
// host MUST select Software.
|
||||||
|
assert_eq!(
|
||||||
|
handle.decoder_backend(),
|
||||||
|
Some(DecoderBackend::Software),
|
||||||
|
"host without h264_cuvid must fall back to Software"
|
||||||
|
);
|
||||||
|
|
||||||
|
// AC-1 — at least 95 % of input frames decoded.
|
||||||
|
let kept = received.len();
|
||||||
|
let min_required = (input_frames as f64 * 0.95).ceil() as usize;
|
||||||
|
assert!(
|
||||||
|
kept >= min_required,
|
||||||
|
"decoded {kept} frames; AC-1 requires ≥{min_required} of {input_frames} ({}%)",
|
||||||
|
(kept * 100) / input_frames
|
||||||
|
);
|
||||||
|
|
||||||
|
// AC-1 + AC-4 — sequence numbers strictly monotonic.
|
||||||
|
for w in received.windows(2) {
|
||||||
|
assert!(
|
||||||
|
w[0].seq < w[1].seq,
|
||||||
|
"seq must strictly increase: {} → {}",
|
||||||
|
w[0].seq,
|
||||||
|
w[1].seq
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// AC-4 — capture timestamps strictly monotonic.
|
||||||
|
for w in received.windows(2) {
|
||||||
|
assert!(
|
||||||
|
w[0].capture_ts_monotonic_ns < w[1].capture_ts_monotonic_ns,
|
||||||
|
"capture_ts must strictly increase: {} → {}",
|
||||||
|
w[0].capture_ts_monotonic_ns,
|
||||||
|
w[1].capture_ts_monotonic_ns
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// Decode timestamps must be at-or-after capture timestamps for
|
||||||
|
// every frame (decode happens after packet receipt by
|
||||||
|
// construction).
|
||||||
|
for f in &received {
|
||||||
|
assert!(
|
||||||
|
f.decode_ts_monotonic_ns >= f.capture_ts_monotonic_ns,
|
||||||
|
"decode_ts {} must be ≥ capture_ts {}",
|
||||||
|
f.decode_ts_monotonic_ns,
|
||||||
|
f.capture_ts_monotonic_ns
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// First-frame cold-start metric was recorded.
|
||||||
|
assert!(
|
||||||
|
handle.decode_ms_first_frame().is_some(),
|
||||||
|
"decode_ms_first_frame must be populated after the first decode"
|
||||||
|
);
|
||||||
|
assert!(handle.decode_ms_p50().is_some(), "p50 must be populated");
|
||||||
|
assert!(handle.decode_ms_p99().is_some(), "p99 must be populated");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC-2 (positive direction) — on a CUDA-capable host, the decoder
|
||||||
|
/// MUST select `DecoderBackend::Nvdec`. This test cannot run on the
|
||||||
|
/// Mac/Linux dev box (no CUDA-enabled FFmpeg), so it is `#[ignore]`d
|
||||||
|
/// by default and explicitly opt-in via `cargo test -- --ignored`
|
||||||
|
/// on a Jetson Orin Nano with the FFmpeg-cuda packages installed.
|
||||||
|
/// The negative direction (no CUDA → Software) is asserted both in
|
||||||
|
/// `internal::decoder::tests::ffmpeg_decoder_falls_back_to_software_on_macos_dev_host`
|
||||||
|
/// and in `ac1_ac4_software_decode_preserves_throughput_and_monotonicity`
|
||||||
|
/// above; together they pin the selection rule from both sides.
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore = "AC-2 positive: requires a CUDA-capable FFmpeg (h264_cuvid registered) — only runs on Jetson"]
|
||||||
|
async fn ac2_nvdec_backend_selected_on_cuda_host() {
|
||||||
|
// Arrange + Act
|
||||||
|
let dec = FfmpegDecoder::new(Codec::H264).expect("h264 decoder must open on Jetson");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(
|
||||||
|
dec.backend(),
|
||||||
|
DecoderBackend::Nvdec,
|
||||||
|
"Jetson Orin Nano with CUDA-enabled FFmpeg MUST select NVDEC"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC-3 — a corrupted packet between valid ones must be counted as
|
||||||
|
/// `decode_errors_total += 1` and the stream must keep producing
|
||||||
|
/// frames after it.
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn ac3_corrupted_frame_is_counted_and_does_not_abort_stream() {
|
||||||
|
// Arrange — generate two synthetic streams, one for "before" and
|
||||||
|
// one for "after"; splice a garbage packet between them.
|
||||||
|
let width = 320u32;
|
||||||
|
let height = 240u32;
|
||||||
|
let mut script: Vec<Bytes> = synth_h264_stream(20, width, height);
|
||||||
|
let after = synth_h264_stream(20, width, height);
|
||||||
|
let pre_count = script.len();
|
||||||
|
// Corrupted packet: random bytes that are not a valid NAL unit.
|
||||||
|
// The decoder rejects them via `send_packet` (Annex-B start code
|
||||||
|
// missing) or `receive_frame` (parsed as an unsupported NAL
|
||||||
|
// type), either way returning an error from
|
||||||
|
// `FfmpegDecoder::decode`.
|
||||||
|
let garbage = Bytes::from_static(&[
|
||||||
|
0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, 0x12, 0x34, 0x56, 0x78,
|
||||||
|
]);
|
||||||
|
script.push(garbage);
|
||||||
|
script.extend(after);
|
||||||
|
let total_packets = script.len();
|
||||||
|
|
||||||
|
let transport = ScriptedBytesTransport::new(script);
|
||||||
|
let decoder = FfmpegDecoder::new(Codec::H264).expect("software h264 decoder must open");
|
||||||
|
let ingest = FrameIngest::with_backoff(total_packets + 16, fast_backoff());
|
||||||
|
let handle = ingest.handle();
|
||||||
|
let mut frames = handle.subscribe();
|
||||||
|
|
||||||
|
// Act — drain frames until either we've collected enough to know
|
||||||
|
// post-error frames landed, or we time out.
|
||||||
|
let task = ingest.run(transport, decoder, RtspSessionConfig::new("rtsp://fake/0"));
|
||||||
|
let mut received_seqs: Vec<u64> = Vec::new();
|
||||||
|
let deadline = Duration::from_secs(10);
|
||||||
|
let start = tokio::time::Instant::now();
|
||||||
|
let target_frames = (pre_count + 5).min(35); // pre + a few post
|
||||||
|
while received_seqs.len() < target_frames && start.elapsed() < deadline {
|
||||||
|
match timeout(Duration::from_millis(500), frames.recv()).await {
|
||||||
|
Ok(Ok(f)) => received_seqs.push(f.seq),
|
||||||
|
Ok(Err(_)) => break,
|
||||||
|
Err(_) => {
|
||||||
|
if handle.decode_errors_total() == 0 && handle.frames_decoded_total() == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (handle.frames_decoded_total() as usize) == received_seqs.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handle.shutdown();
|
||||||
|
let _ = timeout(Duration::from_secs(2), task).await;
|
||||||
|
|
||||||
|
// Assert — exactly one decode error (the garbage packet); valid
|
||||||
|
// frames continued to land afterwards.
|
||||||
|
assert_eq!(
|
||||||
|
handle.decode_errors_total(),
|
||||||
|
1,
|
||||||
|
"one corrupted packet must produce exactly one decode error"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
received_seqs.len() >= pre_count,
|
||||||
|
"must receive at least the pre-error frames ({pre_count}); got {}",
|
||||||
|
received_seqs.len()
|
||||||
|
);
|
||||||
|
// Frames sequence is monotonic across the corrupted packet.
|
||||||
|
for w in received_seqs.windows(2) {
|
||||||
|
assert!(
|
||||||
|
w[0] < w[1],
|
||||||
|
"seq must remain strictly monotonic across decode errors: {} → {}",
|
||||||
|
w[0],
|
||||||
|
w[1]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -17,9 +17,34 @@ use tokio::sync::mpsc;
|
|||||||
use tokio::time::{timeout, Instant};
|
use tokio::time::{timeout, Instant};
|
||||||
|
|
||||||
use frame_ingest::{
|
use frame_ingest::{
|
||||||
BackoffPolicy, FrameIngest, OpenError, RtspPacket, RtspSessionConfig, RtspTransport,
|
BackoffPolicy, DecodeError, DecodedPixels, DecoderBackend, FrameDecoder, FrameIngest,
|
||||||
SessionState, StreamError,
|
OpenError, RtspPacket, RtspSessionConfig, RtspTransport, SessionState, StreamError,
|
||||||
};
|
};
|
||||||
|
use shared::models::frame::PixelFormat;
|
||||||
|
|
||||||
|
/// Test-only decoder that pushes one synthetic `DecodedPixels` per
|
||||||
|
/// call. Used by the AZ-657 lifecycle tests, which verify FSM /
|
||||||
|
/// reconnect / AI-lock semantics — they don't care what pixels the
|
||||||
|
/// decoder produced. The production decoder path is exercised
|
||||||
|
/// separately by `decoder_pipeline.rs` (AZ-658).
|
||||||
|
struct StubDecoder;
|
||||||
|
|
||||||
|
impl FrameDecoder for StubDecoder {
|
||||||
|
fn backend(&self) -> DecoderBackend {
|
||||||
|
DecoderBackend::Software
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode(&mut self, payload: &[u8], out: &mut Vec<DecodedPixels>) -> Result<(), DecodeError> {
|
||||||
|
out.push(DecodedPixels {
|
||||||
|
pixels: Bytes::copy_from_slice(payload),
|
||||||
|
width: 320,
|
||||||
|
height: 240,
|
||||||
|
pix_fmt: PixelFormat::Nv12,
|
||||||
|
decode_duration: Duration::from_micros(100),
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
enum Scripted {
|
enum Scripted {
|
||||||
@@ -158,7 +183,11 @@ async fn ac1_open_succeeds_and_session_reaches_streaming() {
|
|||||||
let mut frames = handle.subscribe();
|
let mut frames = handle.subscribe();
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
|
let task = ingest.run(
|
||||||
|
transport,
|
||||||
|
StubDecoder,
|
||||||
|
RtspSessionConfig::new("rtsp://fake/0"),
|
||||||
|
);
|
||||||
let first = timeout(Duration::from_secs(1), frames.recv())
|
let first = timeout(Duration::from_secs(1), frames.recv())
|
||||||
.await
|
.await
|
||||||
.expect("frame within 1 s")
|
.expect("frame within 1 s")
|
||||||
@@ -197,7 +226,11 @@ async fn ac2_bounded_reconnect_recovers_after_transient_failure() {
|
|||||||
let started = Instant::now();
|
let started = Instant::now();
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
|
let task = ingest.run(
|
||||||
|
transport,
|
||||||
|
StubDecoder,
|
||||||
|
RtspSessionConfig::new("rtsp://fake/0"),
|
||||||
|
);
|
||||||
let _ = timeout(Duration::from_secs(2), frames.recv())
|
let _ = timeout(Duration::from_secs(2), frames.recv())
|
||||||
.await
|
.await
|
||||||
.expect("frame within 2 s")
|
.expect("frame within 2 s")
|
||||||
@@ -233,7 +266,11 @@ async fn ac2b_stream_drop_increments_reopens_total() {
|
|||||||
let mut frames = handle.subscribe();
|
let mut frames = handle.subscribe();
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
|
let task = ingest.run(
|
||||||
|
transport,
|
||||||
|
StubDecoder,
|
||||||
|
RtspSessionConfig::new("rtsp://fake/0"),
|
||||||
|
);
|
||||||
let _ = timeout(Duration::from_secs(1), frames.recv())
|
let _ = timeout(Duration::from_secs(1), frames.recv())
|
||||||
.await
|
.await
|
||||||
.expect("first frame")
|
.expect("first frame")
|
||||||
@@ -268,7 +305,11 @@ async fn ac3_unsupported_profile_hard_fails_session() {
|
|||||||
let handle = ingest.handle();
|
let handle = ingest.handle();
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
|
let task = ingest.run(
|
||||||
|
transport,
|
||||||
|
StubDecoder,
|
||||||
|
RtspSessionConfig::new("rtsp://fake/0"),
|
||||||
|
);
|
||||||
let _ = timeout(Duration::from_secs(1), task)
|
let _ = timeout(Duration::from_secs(1), task)
|
||||||
.await
|
.await
|
||||||
.expect("lifecycle loop exits on hard-fail");
|
.expect("lifecycle loop exits on hard-fail");
|
||||||
@@ -295,7 +336,11 @@ async fn ac4_ai_lock_toggle_propagates_to_frames() {
|
|||||||
let mut frames = handle.subscribe();
|
let mut frames = handle.subscribe();
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
|
let task = ingest.run(
|
||||||
|
transport,
|
||||||
|
StubDecoder,
|
||||||
|
RtspSessionConfig::new("rtsp://fake/0"),
|
||||||
|
);
|
||||||
let f1 = timeout(Duration::from_secs(1), frames.recv())
|
let f1 = timeout(Duration::from_secs(1), frames.recv())
|
||||||
.await
|
.await
|
||||||
.expect("first frame")
|
.expect("first frame")
|
||||||
|
|||||||
Reference in New Issue
Block a user