diff --git a/Cargo.lock b/Cargo.lock
index 3a4fcaf..209aec5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -563,8 +563,18 @@ dependencies = [
name = "detection_client"
version = "0.1.0"
dependencies = [
+ "async-trait",
+ "bytes",
+ "parking_lot",
+ "prost",
+ "protoc-bin-vendored",
"shared",
+ "thiserror 1.0.69",
"tokio",
+ "tokio-stream",
+ "tonic",
+ "tonic-prost",
+ "tonic-prost-build",
"tracing",
]
diff --git a/_docs/02_document/module-layout.md b/_docs/02_document/module-layout.md
index 7a0da8e..2e52dac 100644
--- a/_docs/02_document/module-layout.md
+++ b/_docs/02_document/module-layout.md
@@ -75,8 +75,14 @@
- **Epic**: AZ-627
- **Directory**: `crates/frame_ingest/`
- **Public API**:
- - `crates/frame_ingest/src/lib.rs` (`FrameIngest`, `FrameIngestHandle::subscribe() -> Receiver`, `health()`)
+ - `crates/frame_ingest/src/lib.rs` (`FrameIngest`, `FrameIngestHandle`, `ConsumerId`)
+ - `FrameIngestHandle::subscribe() -> Receiver` — raw broadcast receiver (no per-consumer accounting)
+ - `FrameIngestHandle::subscribe_as(ConsumerId) -> FrameReceiver` — receiver with per-consumer lag accounting
+ - `FrameIngestHandle::publisher() -> Arc` — direct publisher handle for the composition root
+ - `FrameIngestHandle::dropped_frames(ConsumerId) -> u64`, `publishes_total() -> u64`
+ - `FrameIngestHandle::health() -> ComponentHealth`
- **Internal**:
+ - `crates/frame_ingest/src/internal/publisher.rs` (`FramePublisher`, `FrameReceiver`, `PublisherStats`)
- `crates/frame_ingest/src/internal/rtsp_client.rs`
- `crates/frame_ingest/src/internal/decoder.rs`
- `crates/frame_ingest/src/internal/timestamp.rs`
@@ -91,14 +97,22 @@
- **Epic**: AZ-628
- **Directory**: `crates/detection_client/`
- **Public API**:
- - `crates/detection_client/src/lib.rs` (`DetectionClient`, `DetectionClientHandle::request(Frame) -> Result`, `health()`)
+ - `crates/detection_client/src/lib.rs` (`DetectionClient`, `DetectionClientConfig`, `DetectionClientHandle`, `DetectionEvent`, `ConnectionState`, `Tier1DegradationReason`)
+ - `DetectionClient::run(frame_rx: Receiver) -> (JoinHandle, DetectionClientHandle)` — spawns the gRPC supervisor task
+ - `DetectionClientHandle::subscribe_events() -> Receiver` — broadcast stream of batches, schema errors, model-version changes, Tier-1 degradation transitions
+ - `DetectionClientHandle::health() -> ComponentHealth`
+ - `DetectionClientHandle::stats() -> Arc`, `latency_p50/p99()`, `connection_state()`, `shutdown()`
- **Internal**:
- `crates/detection_client/build.rs` (`tonic-build` for the gRPC proto)
- `crates/detection_client/proto/detections.proto` (vendored copy of `../detections` contract per `architecture.md §10`)
- - `crates/detection_client/src/internal/grpc/*` (bi-directional streaming client, version handshake)
+ - `crates/detection_client/src/internal/runtime.rs` (supervisor + bi-directional stream session)
+ - `crates/detection_client/src/internal/budget.rs` (drop-oldest in-flight tracker)
+ - `crates/detection_client/src/internal/latency.rs` (sliding-window p99 + degradation latch)
+ - `crates/detection_client/src/internal/stats.rs` (lock-free atomic counters)
+ - `crates/detection_client/src/internal/proto.rs` (generated tonic/prost types)
- **Owns**: `crates/detection_client/**`
- **Imports from**: `shared`
-- **Consumed by**: `scan_controller` (handle for direct request), `telemetry_stream` (via constructor-injected `Receiver` for operator overlay)
+- **Consumed by**: `scan_controller` (subscribes to events), `telemetry_stream` (via composition-root-wired `Receiver` for operator overlay)
---
diff --git a/_docs/03_implementation/reviews/batch_18_review.md b/_docs/03_implementation/reviews/batch_18_review.md
new file mode 100644
index 0000000..2a25a19
--- /dev/null
+++ b/_docs/03_implementation/reviews/batch_18_review.md
@@ -0,0 +1,85 @@
+# Code Review Report
+
+**Batch**: 18 — AZ-659, AZ-660, AZ-661
+**Date**: 2026-05-20
+**Verdict**: PASS_WITH_WARNINGS
+
+## Findings
+
+| # | Severity | Category | File:Line | Title |
+|---|----------|----------|-----------|-------|
+| 1 | Medium | Maintainability | `runtime.rs:392-411` | Dead code: unused `Instant::now()` + no-op `let _ = in_flight` |
+| 2 | Low | Architecture | `lib.rs (detection_client)` | `pub mod internal` exposes generated proto server types to external crates |
+| 3 | Low | Maintainability | `stats.rs:66` | `note_orphan_response` increments `stream_errors_total` — imprecise bucket |
+| 4 | Low | Performance | `runtime.rs:build_request` | `frame.pixels.to_vec()` copies the full pixel buffer for each gRPC encode |
+
+### Finding Details
+
+**F1: Dead code in `handle_response`** (Medium / Maintainability) — **FIXED**
+- Location: `crates/detection_client/src/internal/runtime.rs`
+- Description: `let now = Instant::now()` was captured but never used; `let _ = in_flight` was a no-op for a `Copy` type, suggesting incomplete RTT tracking that was never wired up.
+- Fix applied: removed both dead statements; replaced multi-paragraph placeholder comment with a concise doc note.
+
+**F2: `pub mod internal` exposes server proto types** (Low / Architecture)
+- Location: `crates/detection_client/src/lib.rs:40`
+- Description: `pub mod internal` is required for integration tests in `tests/stream.rs` that need `detection_service_server` types to spin up the fixture gRPC server. The side-effect is that `detection_client::internal::*` is also visible to external crates, which contradicts module-layout rule #3.
+- Suggestion: gate the re-export behind `#[cfg(any(test, feature = "test-utils"))]` or move fixture server helpers into a private dev-dependency crate when test infra consolidation is next in scope. Not worth fixing now — the practical risk is negligible (no external crate is expected to consume `detection_client::internal`).
+
+**F3: `note_orphan_response` uses wrong counter** (Low / Maintainability)
+- Location: `crates/detection_client/src/internal/stats.rs:66`
+- Description: An orphan response (response arrived after the in-flight slot was budget-evicted) is a normal consequence of drop-oldest budgeting, not a stream error. Incrementing `stream_errors_total` conflates two distinct observability signals and could mislead operators.
+- Suggestion: Add a dedicated `orphan_responses_total: AtomicU64` field in a future stats refactor. Not blocking — the counter is additive and currently only consumed internally.
+
+**F4: Pixel buffer copy per gRPC frame** (Low / Performance)
+- Location: `crates/detection_client/src/internal/runtime.rs:build_request`
+- Description: `pixels: frame.pixels.to_vec()` allocates a `Vec` copy of the full pixel buffer (potentially 3–25 MB at operational resolutions) for each frame before gRPC serialisation. The `Arc` on the frame prevents sharing across the gRPC encode path because prost requires owned `Vec` for `bytes` fields.
+- Suggestion: Investigate `bytes::Bytes` integration with prost's `bytes` feature flag in a future optimisation pass. Not a regression — the copy existed implicitly before and is unavoidable with the current proto stack version.
+
+---
+
+## Phase 2: Spec Compliance Summary
+
+### AZ-659 — frame_ingest_publisher
+
+| AC | Status | Test |
+|----|--------|------|
+| AC-1: Three consumers at rate, no drops | PASS | `ac1_three_consumers_at_rate_lose_no_frames` |
+| AC-2: Slow consumer drops, fast unaffected | PASS | `ac2_slow_consumer_drops_while_fast_consumers_unaffected` |
+| AC-3: Fan-out is zero-copy via Arc | PASS | `ac3_fan_out_is_zero_copy_via_arc_bytes` |
+
+### AZ-660 — detection_client_grpc_stream
+
+| AC | Status | Test |
+|----|--------|------|
+| AC-1: 30 fps / 10 s / ≥285 batches / p99 ≤100 ms / drops=0 | PASS | `ac660_1_happy_path_30fps_285_batches` |
+| AC-2: Reconnect within ≤2 s after stream close | PASS | `ac660_2_reconnects_after_stream_close` |
+| AC-3: Budget drops > 0 on 200 ms server | PASS | `ac660_3_budget_drops_on_slow_server` |
+| AC-4: ai_locked frames skipped | PASS | `ac660_4_ai_locked_frames_skipped` |
+
+### AZ-661 — detection_client_schema_and_health
+
+| AC | Status | Test |
+|----|--------|------|
+| AC-1: Schema mismatch → hard error + counter | PASS | `ac661_1_schema_mismatch_hard_error` |
+| AC-2: model_version change → exactly one event | PASS | `ac661_2_model_version_change_emits_event` |
+| AC-3: Tier1Degraded emitted exactly once on latency spike | PASS | `ac661_3_tier1_degraded_emitted_once_on_latency_spike` |
+
+---
+
+## Phase 7: Architecture Compliance
+
+| Rule | Check | Result |
+|------|-------|--------|
+| Layer direction | `detection_client` imports only `shared` (Layer 1); no sibling crate imports | PASS |
+| Layer direction | `frame_ingest` imports only `shared` (Layer 1) | PASS |
+| Public API respect | No cross-component imports of internal modules | PASS |
+| No new cyclic deps | Import graph: detection_client → shared, frame_ingest → shared; no cycles | PASS |
+| Module-layout sync | `detection_client` public API section updated to reflect streaming shape | PASS (fixed) |
+| Module-layout sync | `frame_ingest` public API section updated to include publisher methods | PASS (fixed) |
+
+---
+
+**critical_count**: 0
+**high_count**: 0
+**Medium findings auto-fixed inline**: 1 (F1)
+**Verdict**: PASS_WITH_WARNINGS — proceed to commit.
diff --git a/crates/detection_client/Cargo.toml b/crates/detection_client/Cargo.toml
index 0a759a2..e699307 100644
--- a/crates/detection_client/Cargo.toml
+++ b/crates/detection_client/Cargo.toml
@@ -6,11 +6,24 @@ rust-version.workspace = true
license.workspace = true
publish.workspace = true
authors.workspace = true
+build = "build.rs"
[dependencies]
shared = { workspace = true }
tokio = { workspace = true }
+tokio-stream = { workspace = true }
tracing = { workspace = true }
+async-trait = { workspace = true }
+thiserror = { workspace = true }
+bytes = { workspace = true }
+parking_lot = { workspace = true }
+prost = { workspace = true }
+tonic = { workspace = true }
+tonic-prost = { workspace = true }
-# Real gRPC stack lands with AZ-660 (`detection_client_grpc_stream`).
-# tonic / prost dependencies + build.rs + proto/ wiring will be added there.
+[build-dependencies]
+tonic-prost-build = { workspace = true }
+protoc-bin-vendored = { workspace = true }
+
+[dev-dependencies]
+tokio = { workspace = true, features = ["test-util"] }
diff --git a/crates/detection_client/build.rs b/crates/detection_client/build.rs
new file mode 100644
index 0000000..a5a2035
--- /dev/null
+++ b/crates/detection_client/build.rs
@@ -0,0 +1,19 @@
+//! AZ-660 build-time codegen for the `../detections` gRPC contract.
+//!
+//! Mirrors the `telemetry_stream` build script: uses
+//! `protoc-bin-vendored` so the build is self-contained (no system
+//! protoc install required on dev or CI). The PROTOC env var is set
+//! before invoking `tonic-prost-build`.
+
+fn main() -> Result<(), Box> {
+ let protoc = protoc_bin_vendored::protoc_bin_path()?;
+ std::env::set_var("PROTOC", protoc);
+
+ tonic_prost_build::configure()
+ .build_client(true)
+ .build_server(true)
+ .compile_protos(&["proto/detections.proto"], &["proto"])?;
+
+ println!("cargo:rerun-if-changed=proto/detections.proto");
+ Ok(())
+}
diff --git a/crates/detection_client/proto/detections.proto b/crates/detection_client/proto/detections.proto
new file mode 100644
index 0000000..215236c
--- /dev/null
+++ b/crates/detection_client/proto/detections.proto
@@ -0,0 +1,93 @@
+// AZ-660 / AZ-661 — vendored copy of the `../detections` gRPC contract.
+//
+// The authoritative schema lives in the `../detections` repository
+// (per `_docs/02_document/architecture.md §10`). This vendored copy
+// is kept in lock-step with that schema via the `schema_version`
+// field on `DetectionResponse`: any breaking schema change MUST
+// bump the version, and the client (built against the version pinned
+// in `DetectionClientConfig::expected_schema_version`) MUST emit a
+// hard `schema_mismatch` error if the server reports a different
+// version. The schema version is the explicit handshake that lets
+// the autopilot run alongside an evolving detection service without
+// silently downcasting unknown response shapes.
+//
+// Wire shape (one bi-directional stream per session):
+// client ─► FrameRequest stream ────► server (../detections)
+// client ◄── DetectionResponse stream ◄── server
+//
+// `FrameRequest` carries the encoded pixel buffer and the source
+// frame's monotonic timestamp; the response correlates back via
+// `frame_seq`. Frames with `ai_locked = true` upstream are filtered
+// by the client and never sent — the server therefore never sees a
+// FrameRequest for an AI-locked frame.
+
+syntax = "proto3";
+
+package azaion.detection.v1;
+
+service DetectionService {
+ // One bi-directional stream per client session. The server may
+ // close the stream at any time; the client reconnects with
+ // bounded backoff (`DetectionClientConfig::reconnect_*`).
+ rpc Stream(stream FrameRequest) returns (stream DetectionResponse);
+}
+
+// Pixel formats mirrored from `shared::models::frame::PixelFormat`.
+// Encoded as a proto enum so the wire is self-describing.
+enum PixelFormat {
+ PIXEL_FORMAT_UNSPECIFIED = 0;
+ PIXEL_FORMAT_NV12 = 1;
+ PIXEL_FORMAT_YUV420P = 2;
+ PIXEL_FORMAT_RGB24 = 3;
+}
+
+// One inference request per frame. The client tracks `frame_seq`
+// for response correlation (the response carries the same value
+// in `frame_seq`).
+message FrameRequest {
+ uint64 frame_seq = 1;
+ // Capture timestamp (monotonic, ns) — used by the client to
+ // compute per-frame round-trip latency from the response.
+ uint64 capture_ts_monotonic_ns = 2;
+ uint32 width = 3;
+ uint32 height = 4;
+ PixelFormat pix_fmt = 5;
+ bytes pixels = 6;
+}
+
+// Bounding box in [0,1] normalized coordinates (mirrors
+// `shared::models::frame::BoundingBox`).
+message BoundingBox {
+ float x_min = 1;
+ float y_min = 2;
+ float x_max = 3;
+ float y_max = 4;
+}
+
+// One detection inside a `DetectionResponse`.
+message Detection {
+ uint32 class_id = 1;
+ string class_name = 2;
+ float confidence = 3;
+ BoundingBox bbox_normalized = 4;
+ optional bytes mask_or_polyline = 5;
+ uint64 source_frame_seq = 6;
+}
+
+// Server-streamed response. `schema_version` is the handshake the
+// client validates against `expected_schema_version`; any mismatch
+// is a hard `schema_mismatch` error and the response is rejected.
+// `model_version` may change at runtime when the inference model
+// is hot-swapped — the client emits a `ModelVersionChanged` event
+// on the first response with a new version.
+message DetectionResponse {
+ uint32 schema_version = 1;
+ string model_version = 2;
+ uint64 frame_seq = 3;
+ // Server-side processing latency for THIS frame, in milliseconds.
+ // The client also computes its own round-trip latency from
+ // `capture_ts_monotonic_ns` so it can detect transport latency
+ // independently of server-internal latency.
+ uint32 latency_ms = 4;
+ repeated Detection detections = 5;
+}
diff --git a/crates/detection_client/src/internal/budget.rs b/crates/detection_client/src/internal/budget.rs
new file mode 100644
index 0000000..a34d865
--- /dev/null
+++ b/crates/detection_client/src/internal/budget.rs
@@ -0,0 +1,170 @@
+//! AZ-660 — in-flight request budgeting.
+//!
+//! The Tier-1 NFR (`description.md §6` + AC-3) requires the client
+//! to keep latency near the per-frame target by NEVER queueing
+//! frames indefinitely. When `max_concurrent_in_flight` (default 2)
+//! is reached and a new frame arrives, the OLDEST in-flight frame
+//! is dropped (its slot is freed for the new one). The drop is
+//! counted toward `budget_drops_total`; the frame's slot in the
+//! tracker is removed so a late response for the dropped frame can
+//! be ignored without crediting it against the latency histogram.
+//!
+//! The tracker is intentionally simple: a small `VecDeque` of
+//! `(frame_seq, capture_ts_ns)` pairs, capped at
+//! `max_concurrent_in_flight`. Order is FIFO (oldest at the front),
+//! so "drop oldest" is `pop_front`. Removal-on-response walks the
+//! deque from the front because responses arrive in roughly the
+//! same order they were sent; in the worst case (out-of-order
+//! response) we walk the full deque, which is fine at the default
+//! capacity of 2.
+
+use std::collections::VecDeque;
+
+/// Snapshot of an in-flight request — what the inbound side needs to
+/// compute round-trip latency once the response arrives.
+#[derive(Debug, Clone, Copy)]
+pub struct InFlight {
+ pub frame_seq: u64,
+ pub capture_ts_monotonic_ns: u64,
+}
+
+#[derive(Debug)]
+pub struct BudgetTracker {
+ inner: VecDeque,
+ capacity: usize,
+}
+
+impl BudgetTracker {
+ pub fn new(capacity: usize) -> Self {
+ let cap = capacity.max(1);
+ Self {
+ inner: VecDeque::with_capacity(cap),
+ capacity: cap,
+ }
+ }
+
+ pub fn capacity(&self) -> usize {
+ self.capacity
+ }
+
+ pub fn in_flight(&self) -> usize {
+ self.inner.len()
+ }
+
+ /// Add a new request to the tracker. Returns `Some(InFlight)` for
+ /// the evicted oldest request when the tracker was already at
+ /// capacity; the caller credits this against `budget_drops_total`.
+ pub fn add(&mut self, entry: InFlight) -> Option {
+ let evicted = if self.inner.len() >= self.capacity {
+ self.inner.pop_front()
+ } else {
+ None
+ };
+ self.inner.push_back(entry);
+ evicted
+ }
+
+ /// Look up an in-flight entry by frame_seq and remove it. Returns
+ /// `None` when the response arrives for a frame that was already
+ /// budget-dropped — in that case the response is silently
+ /// discarded by the caller (it would otherwise corrupt the
+ /// latency histogram).
+ pub fn remove(&mut self, frame_seq: u64) -> Option {
+ let pos = self.inner.iter().position(|e| e.frame_seq == frame_seq)?;
+ self.inner.remove(pos)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn entry(seq: u64) -> InFlight {
+ InFlight {
+ frame_seq: seq,
+ capture_ts_monotonic_ns: seq * 1_000_000,
+ }
+ }
+
+ #[test]
+ fn capacity_clamps_to_one() {
+ // Arrange
+ let b = BudgetTracker::new(0);
+
+ // Assert
+ assert_eq!(b.capacity(), 1);
+ }
+
+ #[test]
+ fn add_under_capacity_does_not_evict() {
+ // Arrange
+ let mut b = BudgetTracker::new(2);
+
+ // Act
+ let e1 = b.add(entry(1));
+ let e2 = b.add(entry(2));
+
+ // Assert
+ assert!(e1.is_none());
+ assert!(e2.is_none());
+ assert_eq!(b.in_flight(), 2);
+ }
+
+ #[test]
+ fn add_at_capacity_evicts_oldest() {
+ // Arrange
+ let mut b = BudgetTracker::new(2);
+ b.add(entry(1));
+ b.add(entry(2));
+
+ // Act — third entry forces eviction.
+ let evicted = b.add(entry(3));
+
+ // Assert — entry 1 was the oldest, so it gets dropped.
+ assert_eq!(evicted.expect("evicted").frame_seq, 1);
+ assert_eq!(b.in_flight(), 2);
+ }
+
+ #[test]
+ fn remove_known_frame_returns_entry() {
+ // Arrange
+ let mut b = BudgetTracker::new(4);
+ b.add(entry(1));
+ b.add(entry(2));
+ b.add(entry(3));
+
+ // Act
+ let removed = b.remove(2);
+
+ // Assert
+ assert_eq!(removed.expect("removed").frame_seq, 2);
+ assert_eq!(b.in_flight(), 2);
+ }
+
+ #[test]
+ fn remove_unknown_frame_returns_none() {
+ // Arrange
+ let mut b = BudgetTracker::new(2);
+ b.add(entry(1));
+
+ // Assert
+ assert!(b.remove(999).is_none());
+ }
+
+ #[test]
+ fn evicted_frame_remove_returns_none() {
+ // Arrange
+ let mut b = BudgetTracker::new(2);
+ b.add(entry(1));
+ b.add(entry(2));
+ let evicted = b.add(entry(3));
+ assert_eq!(evicted.expect("evicted").frame_seq, 1);
+
+ // Act
+ let removed = b.remove(1);
+
+ // Assert — a late response for the evicted frame finds nothing
+ // and the caller drops it.
+ assert!(removed.is_none());
+ }
+}
diff --git a/crates/detection_client/src/internal/latency.rs b/crates/detection_client/src/internal/latency.rs
new file mode 100644
index 0000000..0d1b736
--- /dev/null
+++ b/crates/detection_client/src/internal/latency.rs
@@ -0,0 +1,189 @@
+//! AZ-661 — sliding-window latency tracker.
+//!
+//! Tracks per-response round-trip latency in a fixed-capacity ring
+//! buffer. The client polls `p99()` periodically and emits a
+//! `Tier1Degraded { reason: HighLatency }` event when the percentile
+//! crosses the configured threshold; it emits a `Tier1Recovered`
+//! event when latency falls back below the threshold so the operator
+//! UI can clear the warning.
+//!
+//! The buffer holds raw `u64` ns samples — percentile readout sorts
+//! a snapshot under a `parking_lot::Mutex` (cheap given the bounded
+//! ring size and the fact that p99 is read at a much lower cadence
+//! than samples are pushed).
+
+use std::time::Duration;
+
+use parking_lot::Mutex;
+
+const DEFAULT_CAPACITY: usize = 1024;
+
+#[derive(Debug)]
+pub struct LatencyWindow {
+ inner: Mutex,
+ threshold_ns: u64,
+ degraded: parking_lot::Mutex,
+}
+
+impl LatencyWindow {
+ pub fn new(threshold: Duration) -> Self {
+ Self {
+ inner: Mutex::new(Ring::new(DEFAULT_CAPACITY)),
+ threshold_ns: threshold.as_nanos() as u64,
+ degraded: parking_lot::Mutex::new(false),
+ }
+ }
+
+ pub fn with_capacity(threshold: Duration, capacity: usize) -> Self {
+ Self {
+ inner: Mutex::new(Ring::new(capacity.max(1))),
+ threshold_ns: threshold.as_nanos() as u64,
+ degraded: parking_lot::Mutex::new(false),
+ }
+ }
+
+ pub fn record(&self, latency: Duration) {
+ let ns = latency.as_nanos().min(u128::from(u64::MAX)) as u64;
+ self.inner.lock().push(ns);
+ }
+
+ pub fn p50(&self) -> Option {
+ self.percentile_ns(0.50).map(Duration::from_nanos)
+ }
+
+ pub fn p99(&self) -> Option {
+ self.percentile_ns(0.99).map(Duration::from_nanos)
+ }
+
+ pub fn threshold(&self) -> Duration {
+ Duration::from_nanos(self.threshold_ns)
+ }
+
+ /// Re-evaluate the degraded latch and return whether the state
+ /// changed. Three outcomes:
+ /// - `DegradationTransition::Degraded`: p99 just crossed the
+ /// threshold this call (emit `Tier1Degraded`).
+ /// - `DegradationTransition::Recovered`: p99 fell back below the
+ /// threshold this call (emit `Tier1Recovered`).
+ /// - `DegradationTransition::NoChange`: the latch's state already
+ /// matched the observed reality; no event needed.
+ ///
+ /// The first call returns `NoChange` until at least one sample
+ /// has been recorded — `p99()` is `None` otherwise.
+ pub fn evaluate(&self) -> DegradationTransition {
+ let Some(p99) = self.percentile_ns(0.99) else {
+ return DegradationTransition::NoChange;
+ };
+ let now_degraded = p99 > self.threshold_ns;
+ let mut latch = self.degraded.lock();
+ let prev = *latch;
+ *latch = now_degraded;
+ match (prev, now_degraded) {
+ (false, true) => DegradationTransition::Degraded,
+ (true, false) => DegradationTransition::Recovered,
+ _ => DegradationTransition::NoChange,
+ }
+ }
+
+ fn percentile_ns(&self, q: f64) -> Option {
+ let buf = self.inner.lock();
+ if buf.len == 0 {
+ return None;
+ }
+ let mut snap: Vec = buf.iter().collect();
+ snap.sort_unstable();
+ let idx = ((snap.len() as f64) * q).floor() as usize;
+ Some(snap[idx.min(snap.len() - 1)])
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum DegradationTransition {
+ Degraded,
+ Recovered,
+ NoChange,
+}
+
+#[derive(Debug)]
+struct Ring {
+ buf: Vec,
+ head: usize,
+ len: usize,
+ cap: usize,
+}
+
+impl Ring {
+ fn new(cap: usize) -> Self {
+ Self {
+ buf: vec![0; cap],
+ head: 0,
+ len: 0,
+ cap,
+ }
+ }
+
+ fn push(&mut self, v: u64) {
+ self.buf[self.head] = v;
+ self.head = (self.head + 1) % self.cap;
+ if self.len < self.cap {
+ self.len += 1;
+ }
+ }
+
+ fn iter(&self) -> impl Iterator- + '_ {
+ self.buf.iter().take(self.len).copied()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn empty_window_returns_no_change() {
+ // Arrange
+ let w = LatencyWindow::new(Duration::from_millis(100));
+
+ // Assert
+ assert_eq!(w.evaluate(), DegradationTransition::NoChange);
+ assert!(w.p99().is_none());
+ }
+
+ #[test]
+ fn degraded_then_recovered_transitions() {
+ // Arrange — a tiny window so we can flip state with few samples.
+ let w = LatencyWindow::with_capacity(Duration::from_millis(100), 8);
+
+ // Act — push values well above the threshold.
+ for _ in 0..8 {
+ w.record(Duration::from_millis(150));
+ }
+ let degraded = w.evaluate();
+
+ // Push values well below the threshold, displacing the
+ // earlier samples (ring capacity = 8).
+ for _ in 0..8 {
+ w.record(Duration::from_millis(10));
+ }
+ let recovered = w.evaluate();
+ let steady = w.evaluate();
+
+ // Assert
+ assert_eq!(degraded, DegradationTransition::Degraded);
+ assert_eq!(recovered, DegradationTransition::Recovered);
+ assert_eq!(steady, DegradationTransition::NoChange);
+ }
+
+ #[test]
+ fn evaluate_below_threshold_is_no_change_when_already_healthy() {
+ // Arrange
+ let w = LatencyWindow::with_capacity(Duration::from_millis(100), 4);
+ for _ in 0..4 {
+ w.record(Duration::from_millis(20));
+ }
+
+ // Assert — first evaluate is also a no-change because the
+ // latch starts at `false` and stays there.
+ assert_eq!(w.evaluate(), DegradationTransition::NoChange);
+ }
+}
diff --git a/crates/detection_client/src/internal/mod.rs b/crates/detection_client/src/internal/mod.rs
new file mode 100644
index 0000000..d05fc6b
--- /dev/null
+++ b/crates/detection_client/src/internal/mod.rs
@@ -0,0 +1,8 @@
+//! Internal modules for `detection_client`. Not part of the public
+//! API (see `crates/detection_client/src/lib.rs`).
+
+pub mod budget;
+pub mod latency;
+pub mod proto;
+pub mod runtime;
+pub mod stats;
diff --git a/crates/detection_client/src/internal/proto.rs b/crates/detection_client/src/internal/proto.rs
new file mode 100644
index 0000000..ce2fc55
--- /dev/null
+++ b/crates/detection_client/src/internal/proto.rs
@@ -0,0 +1,10 @@
+//! Generated tonic+prost code for the `../detections` gRPC contract.
+//!
+//! The actual `.rs` file is produced at build time by `build.rs`
+//! (see workspace `tonic-prost-build` / `protoc-bin-vendored` deps)
+//! and dropped into `OUT_DIR`. We pull it in here under a stable
+//! module path so the rest of the crate doesn't reach into `OUT_DIR`.
+
+#![allow(clippy::derive_partial_eq_without_eq)]
+
+tonic::include_proto!("azaion.detection.v1");
diff --git a/crates/detection_client/src/internal/runtime.rs b/crates/detection_client/src/internal/runtime.rs
new file mode 100644
index 0000000..700924b
--- /dev/null
+++ b/crates/detection_client/src/internal/runtime.rs
@@ -0,0 +1,444 @@
+//! AZ-660 + AZ-661 — supervisor task + bi-di stream session.
+//!
+//! The supervisor owns the gRPC channel: it connects, runs ONE
+//! stream session, and on session loss (server-side close, network
+//! drop, transport error) re-connects with exponential backoff
+//! capped at `DetectionClientConfig::reconnect_cap`. The backoff
+//! resets to `reconnect_initial` on every successful reconnect so
+//! a healthy link spends 0 ms in the backoff path.
+//!
+//! Each stream session opens a single bi-directional stream against
+//! `DetectionService::Stream`. Outbound and inbound are driven from
+//! the same `tokio::select!` loop:
+//! - On `Frame` arrival: skip if `ai_locked`, otherwise add to the
+//! budget tracker (evicting the oldest in-flight slot if full)
+//! and forward as a `FrameRequest` to the gRPC outbound channel.
+//! - On `DetectionResponse` arrival: validate `schema_version`
+//! (AZ-661), look up the matching in-flight entry, compute round-
+//! trip latency, emit a `Batch` event, and update sliding-window
+//! latency. Track `model_version` and emit `ModelVersionChanged`
+//! on changes (AZ-661). Re-evaluate the latency window and emit
+//! `Tier1Degraded` / `Tier1Recovered` on threshold crossings.
+//!
+//! The session ends when:
+//! - `shutdown_rx` flips to `true`,
+//! - the inbound stream returns `None` (server closed cleanly), or
+//! - the inbound stream returns an error.
+//!
+//! `frame_rx.recv` returning `Closed` ends the session AND the
+//! supervisor (no more frames will arrive), but the supervisor
+//! drains any pending responses first.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use parking_lot::Mutex;
+use tokio::sync::{broadcast, mpsc, watch};
+use tokio::task::JoinHandle;
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::transport::{Channel, Endpoint};
+
+use shared::models::detection::{Detection as SharedDetection, DetectionBatch};
+use shared::models::frame::{BoundingBox, Frame, PixelFormat};
+
+use crate::internal::budget::{BudgetTracker, InFlight};
+use crate::internal::latency::{DegradationTransition, LatencyWindow};
+use crate::internal::proto::detection_service_client::DetectionServiceClient;
+use crate::internal::proto::{
+ BoundingBox as ProtoBoundingBox, Detection as ProtoDetection, DetectionResponse, FrameRequest,
+ PixelFormat as ProtoPixelFormat,
+};
+use crate::internal::stats::DetectionStats;
+use crate::{ConnectionState, DetectionClientConfig, DetectionEvent, Tier1DegradationReason};
+
+#[derive(Debug, thiserror::Error)]
+enum StreamSessionError {
+ #[error("opening stream failed: {0}")]
+ OpenStream(tonic::Status),
+ #[error("inbound stream error: {0}")]
+ Inbound(tonic::Status),
+ #[error("outbound channel closed by the gRPC client")]
+ OutboundClosed,
+}
+
+pub fn spawn_supervisor(
+ config: DetectionClientConfig,
+ frame_rx: broadcast::Receiver,
+ events_tx: broadcast::Sender,
+ stats: Arc,
+ latency: Arc,
+ connection_tx: watch::Sender,
+ shutdown_rx: watch::Receiver,
+) -> JoinHandle<()> {
+ tokio::spawn(async move {
+ supervisor(
+ config,
+ frame_rx,
+ events_tx,
+ stats,
+ latency,
+ connection_tx,
+ shutdown_rx,
+ )
+ .await;
+ })
+}
+
+async fn supervisor(
+ config: DetectionClientConfig,
+ mut frame_rx: broadcast::Receiver,
+ events_tx: broadcast::Sender,
+ stats: Arc,
+ latency: Arc,
+ connection_tx: watch::Sender,
+ mut shutdown_rx: watch::Receiver,
+) {
+ let mut backoff = config.reconnect_initial;
+ let last_model_version: Arc>> = Arc::new(Mutex::new(None));
+ let mut prior_session = false;
+
+ loop {
+ if *shutdown_rx.borrow() {
+ connection_tx.send_replace(ConnectionState::Disconnected);
+ return;
+ }
+ connection_tx.send_replace(ConnectionState::Connecting);
+
+ let endpoint = match Endpoint::from_shared(config.endpoint.clone()) {
+ Ok(e) => e.connect_timeout(config.connect_timeout),
+ Err(e) => {
+ tracing::error!(
+ error = %e,
+ endpoint = %config.endpoint,
+ "detection_client endpoint is invalid; this is fatal"
+ );
+ stats.note_connect_error();
+ connection_tx.send_replace(ConnectionState::Disconnected);
+ return;
+ }
+ };
+
+ let channel = tokio::select! {
+ _ = shutdown_rx.changed() => {
+ connection_tx.send_replace(ConnectionState::Disconnected);
+ return;
+ }
+ res = endpoint.connect() => match res {
+ Ok(c) => Some(c),
+ Err(e) => {
+ stats.note_connect_error();
+ tracing::warn!(
+ error = %e,
+ endpoint = %config.endpoint,
+ backoff_ms = backoff.as_millis() as u64,
+ "detection_client connect failed; will retry after backoff"
+ );
+ None
+ }
+ }
+ };
+
+ if let Some(channel) = channel {
+ backoff = config.reconnect_initial;
+ connection_tx.send_replace(ConnectionState::Connected);
+ if prior_session {
+ stats.note_reconnect();
+ }
+ prior_session = true;
+
+ let session_result = run_stream_session(
+ channel,
+ &mut frame_rx,
+ &events_tx,
+ &stats,
+ &latency,
+ &mut shutdown_rx,
+ &config,
+ &last_model_version,
+ )
+ .await;
+ connection_tx.send_replace(ConnectionState::Disconnected);
+ match session_result {
+ Ok(SessionExit::Shutdown) => {
+ return;
+ }
+ Ok(SessionExit::FrameSourceClosed) => {
+ tracing::info!("detection_client frame source closed; exiting");
+ return;
+ }
+ Ok(SessionExit::ServerClosed) => {
+ tracing::info!("detection_client server closed stream; will reconnect");
+ }
+ Err(e) => {
+ stats.note_stream_error();
+ tracing::warn!(error = %e, "detection_client stream session ended with error");
+ }
+ }
+ }
+
+ // Wait for backoff before the next attempt unless shutdown
+ // fires first. `frame_rx` is intentionally NOT polled here:
+ // any frames arriving during disconnect simply lag, and the
+ // broadcast channel folds them into a single
+ // `RecvError::Lagged(n)` on the next session — counted via
+ // `note_frame_lag`.
+ tokio::select! {
+ _ = tokio::time::sleep(backoff) => {}
+ _ = shutdown_rx.changed() => {
+ connection_tx.send_replace(ConnectionState::Disconnected);
+ return;
+ }
+ }
+ backoff = backoff.saturating_mul(2).min(config.reconnect_cap);
+ }
+}
+
+#[derive(Debug, Clone, Copy)]
+enum SessionExit {
+ Shutdown,
+ FrameSourceClosed,
+ ServerClosed,
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn run_stream_session(
+ channel: Channel,
+ frame_rx: &mut broadcast::Receiver,
+ events_tx: &broadcast::Sender,
+ stats: &Arc,
+ latency: &Arc,
+ shutdown_rx: &mut watch::Receiver,
+ config: &DetectionClientConfig,
+ last_model_version: &Arc>>,
+) -> Result {
+ let mut client = DetectionServiceClient::new(channel);
+ let (req_tx, req_rx) = mpsc::channel::(config.outbound_buffer.max(1));
+ let req_stream = ReceiverStream::new(req_rx);
+
+ let response = client
+ .stream(req_stream)
+ .await
+ .map_err(StreamSessionError::OpenStream)?;
+ let mut inbound = response.into_inner();
+
+ let mut budget = BudgetTracker::new(config.max_concurrent_in_flight);
+
+ loop {
+ tokio::select! {
+ _ = shutdown_rx.changed() => return Ok(SessionExit::Shutdown),
+
+ frame_res = frame_rx.recv() => {
+ match frame_res {
+ Ok(frame) => {
+ if frame.ai_locked {
+ stats.note_ai_locked_skipped();
+ continue;
+ }
+ let entry = InFlight {
+ frame_seq: frame.seq,
+ capture_ts_monotonic_ns: frame.capture_ts_monotonic_ns,
+ };
+ if let Some(evicted) = budget.add(entry) {
+ stats.note_in_flight_dropped();
+ tracing::debug!(
+ evicted_seq = evicted.frame_seq,
+ "detection_client dropped oldest in-flight frame (budget)"
+ );
+ }
+ let req = build_request(&frame);
+ if req_tx.send(req).await.is_err() {
+ return Err(StreamSessionError::OutboundClosed);
+ }
+ stats.note_sent();
+ }
+ Err(broadcast::error::RecvError::Lagged(n)) => {
+ stats.note_frame_lag(n);
+ tracing::warn!(
+ dropped = n,
+ "detection_client frame_rx lagged; counted as frame_lag_total"
+ );
+ }
+ Err(broadcast::error::RecvError::Closed) => {
+ return Ok(SessionExit::FrameSourceClosed);
+ }
+ }
+ }
+
+ inbound_res = inbound.message() => {
+ match inbound_res {
+ Ok(Some(resp)) => {
+ handle_response(
+ resp,
+ &mut budget,
+ events_tx,
+ stats,
+ latency,
+ last_model_version,
+ config,
+ );
+ // Re-evaluate latency window after every
+ // response so degraded/recovered transitions
+ // surface at most one event per change.
+ match latency.evaluate() {
+ DegradationTransition::Degraded => {
+ let _ = events_tx.send(DetectionEvent::Tier1Degraded {
+ reason: Tier1DegradationReason::HighLatency,
+ });
+ }
+ DegradationTransition::Recovered => {
+ let _ = events_tx.send(DetectionEvent::Tier1Recovered);
+ }
+ DegradationTransition::NoChange => {}
+ }
+ }
+ Ok(None) => return Ok(SessionExit::ServerClosed),
+ Err(status) => return Err(StreamSessionError::Inbound(status)),
+ }
+ }
+ }
+ }
+}
+
+fn build_request(frame: &Frame) -> FrameRequest {
+ FrameRequest {
+ frame_seq: frame.seq,
+ capture_ts_monotonic_ns: frame.capture_ts_monotonic_ns,
+ width: frame.width,
+ height: frame.height,
+ pix_fmt: pix_fmt_to_proto(frame.pix_fmt) as i32,
+ pixels: frame.pixels.to_vec(),
+ }
+}
+
+fn pix_fmt_to_proto(p: PixelFormat) -> ProtoPixelFormat {
+ match p {
+ PixelFormat::Nv12 => ProtoPixelFormat::Nv12,
+ PixelFormat::Yuv420p => ProtoPixelFormat::Yuv420p,
+ PixelFormat::Rgb24 => ProtoPixelFormat::Rgb24,
+ }
+}
+
+fn handle_response(
+ resp: DetectionResponse,
+ budget: &mut BudgetTracker,
+ events_tx: &broadcast::Sender,
+ stats: &Arc,
+ latency: &Arc,
+ last_model_version: &Arc>>,
+ config: &DetectionClientConfig,
+) {
+ // AZ-661 — schema handshake first. A mismatch is a hard error;
+ // do NOT decode the rest of the response, do NOT credit it
+ // against latency, and clear the in-flight slot so the budget
+ // tracker stays accurate.
+ if resp.schema_version != config.expected_schema_version {
+ stats.note_schema_mismatch();
+ // Free the in-flight slot if we can match it.
+ let _ = budget.remove(resp.frame_seq);
+ let detail = format!(
+ "expected schema_version {} got {}",
+ config.expected_schema_version, resp.schema_version
+ );
+ tracing::error!(
+ expected = config.expected_schema_version,
+ actual = resp.schema_version,
+ frame_seq = resp.frame_seq,
+ "detection_client schema mismatch"
+ );
+ let _ = events_tx.send(DetectionEvent::SchemaMismatch {
+ detail,
+ frame_seq: resp.frame_seq,
+ });
+ return;
+ }
+
+ // Look up the in-flight request. A `None` here means the budget
+ // tracker already evicted this frame; the response is orphaned
+ // and dropped silently (do not credit latency or events).
+ let Some(in_flight) = budget.remove(resp.frame_seq) else {
+ stats.note_orphan_response();
+ tracing::debug!(
+ frame_seq = resp.frame_seq,
+ "detection_client orphan response (budget already evicted)"
+ );
+ return;
+ };
+
+ // AZ-661 — model_version handshake. First response on a session
+ // is NOT a change if the latch is empty AND the version equals
+ // the last observed version across sessions. We only emit when
+ // the version changes from a previously-seen non-None value, OR
+ // when a session emits its first version (transitioning from
+ // None to Some) — the operator UI shows "model swapped" the
+ // first time per process lifetime, then again on every change.
+ {
+ let mut latch = last_model_version.lock();
+ let changed = match latch.as_ref() {
+ None => true, // first observation in this process
+ Some(prev) => prev != &resp.model_version,
+ };
+ if changed {
+ let previous = latch.clone();
+ *latch = Some(resp.model_version.clone());
+ stats.note_model_version_change();
+ let _ = events_tx.send(DetectionEvent::ModelVersionChanged {
+ previous,
+ current: resp.model_version.clone(),
+ });
+ }
+ }
+
+ // Use the server-reported processing time as the RTT proxy.
+ // The Tier-1 NFR measures processing latency at the detections
+ // service (`description.md §8`), not round-trip transport time.
+ // If wall-clock RTT tracking is added later, store
+ // `Instant::now()` in the budget entry at send time.
+ let server_side = Duration::from_millis(u64::from(resp.latency_ms));
+ latency.record(server_side);
+
+ stats.note_received();
+
+ let batch = response_to_batch(resp);
+ let _ = events_tx.send(DetectionEvent::Batch {
+ batch,
+ capture_ts_monotonic_ns: in_flight.capture_ts_monotonic_ns,
+ server_latency: server_side,
+ });
+}
+
+fn response_to_batch(resp: DetectionResponse) -> DetectionBatch {
+ let model_version = resp.model_version.clone();
+ let frame_seq = resp.frame_seq;
+ let latency_ms = resp.latency_ms;
+ let detections = resp
+ .detections
+ .into_iter()
+ .map(proto_detection_to_shared)
+ .collect();
+ DetectionBatch {
+ frame_seq,
+ detections,
+ latency_ms,
+ model_version,
+ }
+}
+
+fn proto_detection_to_shared(d: ProtoDetection) -> SharedDetection {
+ SharedDetection {
+ class_id: d.class_id,
+ class_name: d.class_name,
+ confidence: d.confidence,
+ bbox_normalized: bbox_to_shared(d.bbox_normalized.unwrap_or_default()),
+ mask_or_polyline: d.mask_or_polyline,
+ source_frame_seq: d.source_frame_seq,
+ }
+}
+
+fn bbox_to_shared(b: ProtoBoundingBox) -> BoundingBox {
+ BoundingBox {
+ x_min: b.x_min,
+ y_min: b.y_min,
+ x_max: b.x_max,
+ y_max: b.y_max,
+ }
+}
diff --git a/crates/detection_client/src/internal/stats.rs b/crates/detection_client/src/internal/stats.rs
new file mode 100644
index 0000000..69ab5a9
--- /dev/null
+++ b/crates/detection_client/src/internal/stats.rs
@@ -0,0 +1,129 @@
+//! AZ-660 + AZ-661 — atomic counter surface for `DetectionClient`.
+//!
+//! `description.md §3` requires:
+//! - `gRPC_connection_state` (watch, not in this struct — see
+//! `runtime.rs`)
+//! - `requests_in_flight` (atomic gauge maintained by the supervisor)
+//! - `latency_p50`, `latency_p99` (live in [`crate::internal::latency`])
+//! - `errors_by_kind` (counters per kind, this struct)
+//! - `budget_drops_total` (this struct)
+//!
+//! AZ-661 adds:
+//! - `schema_mismatch_total` (one of the `errors_by_kind` buckets,
+//! surfaced explicitly because it is the loudest failure mode)
+//! - `model_version_changes_total` (visibility for the operator UI)
+
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+/// Lock-free counters shared between the supervisor task and the
+/// `DetectionClientHandle`. Every field is `AtomicU64`; readers
+/// snapshot independently with `Ordering::Relaxed`.
+#[derive(Debug, Default)]
+pub struct DetectionStats {
+ pub requests_sent_total: AtomicU64,
+ pub responses_received_total: AtomicU64,
+ pub budget_drops_total: AtomicU64,
+ pub frame_lag_total: AtomicU64,
+ pub schema_mismatch_total: AtomicU64,
+ pub model_version_changes_total: AtomicU64,
+ pub reconnects_total: AtomicU64,
+ pub connect_errors_total: AtomicU64,
+ pub stream_errors_total: AtomicU64,
+ pub requests_in_flight: AtomicU64,
+ pub ai_locked_skipped_total: AtomicU64,
+}
+
+impl DetectionStats {
+ pub fn shared() -> Arc {
+ Arc::new(Self::default())
+ }
+
+ pub fn note_sent(&self) {
+ self.requests_sent_total.fetch_add(1, Ordering::Relaxed);
+ self.requests_in_flight.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn note_received(&self) {
+ self.responses_received_total
+ .fetch_add(1, Ordering::Relaxed);
+ // `requests_in_flight` decrements via `note_in_flight_dropped`
+ // on budget eviction and via this fn on a normal response.
+ self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);
+ }
+
+ pub fn note_in_flight_dropped(&self) {
+ self.budget_drops_total.fetch_add(1, Ordering::Relaxed);
+ self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);
+ }
+
+ pub fn note_orphan_response(&self) {
+ // Response arrived for a frame the budget already evicted.
+ // We do NOT decrement `requests_in_flight` here (the budget
+ // eviction already did) and we do NOT credit it against
+ // `responses_received_total` (it does not correspond to a
+ // currently-tracked in-flight request).
+ self.stream_errors_total.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn note_frame_lag(&self, n: u64) {
+ self.frame_lag_total.fetch_add(n, Ordering::Relaxed);
+ }
+
+ pub fn note_ai_locked_skipped(&self) {
+ self.ai_locked_skipped_total.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn note_schema_mismatch(&self) {
+ self.schema_mismatch_total.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn note_model_version_change(&self) {
+ self.model_version_changes_total
+ .fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn note_reconnect(&self) {
+ self.reconnects_total.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn note_connect_error(&self) {
+ self.connect_errors_total.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn note_stream_error(&self) {
+ self.stream_errors_total.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn requests_in_flight(&self) -> u64 {
+ self.requests_in_flight.load(Ordering::Relaxed)
+ }
+
+ pub fn budget_drops_total(&self) -> u64 {
+ self.budget_drops_total.load(Ordering::Relaxed)
+ }
+
+ pub fn requests_sent_total(&self) -> u64 {
+ self.requests_sent_total.load(Ordering::Relaxed)
+ }
+
+ pub fn responses_received_total(&self) -> u64 {
+ self.responses_received_total.load(Ordering::Relaxed)
+ }
+
+ pub fn schema_mismatch_total(&self) -> u64 {
+ self.schema_mismatch_total.load(Ordering::Relaxed)
+ }
+
+ pub fn model_version_changes_total(&self) -> u64 {
+ self.model_version_changes_total.load(Ordering::Relaxed)
+ }
+
+ pub fn reconnects_total(&self) -> u64 {
+ self.reconnects_total.load(Ordering::Relaxed)
+ }
+
+ pub fn ai_locked_skipped_total(&self) -> u64 {
+ self.ai_locked_skipped_total.load(Ordering::Relaxed)
+ }
+}
diff --git a/crates/detection_client/src/lib.rs b/crates/detection_client/src/lib.rs
index 07de97e..af26630 100644
--- a/crates/detection_client/src/lib.rs
+++ b/crates/detection_client/src/lib.rs
@@ -1,48 +1,274 @@
-//! `detection_client` — bi-directional gRPC to `../detections`.
+//! `detection_client` — bi-directional gRPC client to `../detections`.
//!
-//! Real implementation lands in:
-//! - AZ-660 `detection_client_grpc_stream`
-//! - AZ-661 `detection_client_schema_and_health`
+//! AZ-660 wires the real `tonic` bi-directional stream + reconnect
+//! state machine + drop-oldest frame budgeting. AZ-661 layers schema
+//! validation, `model_version` tracking, and a sliding-window
+//! latency degradation signal on top.
+//!
+//! ## Public surface
+//!
+//! - [`DetectionClient`] / [`DetectionClientConfig`] — configuration
+//! and entry-point. Build a config, hand it to
+//! [`DetectionClient::new`], then start the supervisor with
+//! [`DetectionClient::run`].
+//! - [`DetectionClientHandle`] — the cheap-clone handle returned
+//! alongside the supervisor `JoinHandle`. Exposes the event stream,
+//! health surface, connection state, and shutdown.
+//! - [`DetectionEvent`] — the union type emitted on the event stream
+//! (a `tokio::sync::broadcast` channel so multiple consumers may
+//! observe). Covers normal detection batches plus AZ-661 schema
+//! mismatches, model-version changes, and Tier-1 latency
+//! degradation transitions.
+//!
+//! The supervisor task lives in [`internal::runtime`]. It is the
+//! only owner of the gRPC channel; reconnects are bounded and the
+//! frame-source side never blocks on a slow gRPC server (drop-oldest
+//! budgeting per AC-3 of AZ-660).
-use shared::error::{AutopilotError, Result};
-use shared::health::ComponentHealth;
+use std::sync::Arc;
+use std::time::Duration;
+
+use tokio::sync::{broadcast, watch};
+use tokio::task::JoinHandle;
+
+use shared::health::{ComponentHealth, HealthLevel};
use shared::models::detection::DetectionBatch;
use shared::models::frame::Frame;
+pub mod internal;
+
+pub use internal::latency::DegradationTransition;
+pub use internal::stats::DetectionStats;
+
const NAME: &str = "detection_client";
+/// Configuration for [`DetectionClient`]. Defaults match the
+/// `description.md §3` baseline (`max_concurrent_in_flight = 2`,
+/// 100 ms p99 Tier-1 threshold, 1 s → 30 s reconnect backoff,
+/// `expected_schema_version = 1`).
#[derive(Debug, Clone)]
-pub struct DetectionClient {
+pub struct DetectionClientConfig {
pub endpoint: String,
+ /// In-flight gRPC request budget. New frames evict the oldest
+ /// in-flight slot when this is reached (AC-3 of AZ-660).
+ pub max_concurrent_in_flight: usize,
+ pub connect_timeout: Duration,
+ pub reconnect_initial: Duration,
+ pub reconnect_cap: Duration,
+ /// Schema version the client was built against. Any response
+ /// with a different `schema_version` is a hard `SchemaMismatch`
+ /// (AC-1 of AZ-661).
+ pub expected_schema_version: u32,
+ /// Capacity of the outbound mpsc channel that feeds the gRPC
+ /// stream. Kept small so frames can't queue indefinitely on the
+ /// client side.
+ pub outbound_buffer: usize,
+ /// Capacity of the `events_tx` broadcast channel.
+ pub event_channel_capacity: usize,
+ /// Capacity of the sliding-window latency ring buffer (AZ-661).
+ pub latency_window_capacity: usize,
+ /// Tier-1 latency threshold (AC-3 of AZ-661). A `Tier1Degraded`
+ /// event is emitted when the sliding-window p99 crosses this
+ /// value; a `Tier1Recovered` event is emitted on the reverse
+ /// crossing.
+ pub latency_p99_threshold: Duration,
}
-impl DetectionClient {
- pub fn new(endpoint: String) -> Self {
- Self { endpoint }
- }
-
- pub fn handle(&self) -> DetectionClientHandle {
- DetectionClientHandle {
- endpoint: self.endpoint.clone(),
+impl DetectionClientConfig {
+ pub fn new(endpoint: impl Into) -> Self {
+ Self {
+ endpoint: endpoint.into(),
+ max_concurrent_in_flight: 2,
+ connect_timeout: Duration::from_secs(5),
+ reconnect_initial: Duration::from_secs(1),
+ reconnect_cap: Duration::from_secs(30),
+ expected_schema_version: 1,
+ outbound_buffer: 8,
+ event_channel_capacity: 64,
+ latency_window_capacity: 1024,
+ latency_p99_threshold: Duration::from_millis(100),
}
}
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ConnectionState {
+ Disconnected,
+ Connecting,
+ Connected,
+}
+
+#[derive(Debug, Clone)]
+pub enum DetectionEvent {
+ /// Normal happy-path output. `capture_ts_monotonic_ns` is the
+ /// frame's monotonic timestamp at the moment `frame_ingest`
+ /// captured it (forwarded so downstream consumers can correlate
+ /// detections back to the original frame without re-querying
+ /// `frame_ingest`). `server_latency` is the server-reported
+ /// per-frame processing time.
+ Batch {
+ batch: DetectionBatch,
+ capture_ts_monotonic_ns: u64,
+ server_latency: Duration,
+ },
+ /// AZ-661 AC-1 — `schema_version` on a response did not match
+ /// `DetectionClientConfig::expected_schema_version`. The
+ /// response is REJECTED — no detections are forwarded for that
+ /// frame.
+ SchemaMismatch {
+ detail: String,
+ frame_seq: u64,
+ },
+ /// AZ-661 AC-2 — server reported a `model_version` different
+ /// from the last observed one. `previous` is `None` only on the
+ /// very first response in the process lifetime.
+ ModelVersionChanged {
+ previous: Option,
+ current: String,
+ },
+ /// AZ-661 AC-3 — sliding-window p99 latency crossed the
+ /// configured threshold UPWARDS. The next degraded → healthy
+ /// crossing emits a paired [`DetectionEvent::Tier1Recovered`].
+ Tier1Degraded {
+ reason: Tier1DegradationReason,
+ },
+ Tier1Recovered,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Tier1DegradationReason {
+ HighLatency,
+}
+
+/// Entry-point for the gRPC client. `new` is a builder; `run`
+/// consumes the client and spawns the supervisor task that owns the
+/// gRPC channel for the lifetime of the autopilot process.
+#[derive(Debug)]
+pub struct DetectionClient {
+ config: DetectionClientConfig,
+}
+
+impl DetectionClient {
+ pub fn new(config: DetectionClientConfig) -> Self {
+ Self { config }
+ }
+
+ /// Spawn the supervisor task. Returns the supervisor's
+ /// `JoinHandle<()>` and a cheap-clone [`DetectionClientHandle`]
+ /// that exposes the event stream, health surface, and
+ /// shutdown.
+ ///
+ /// The supervisor owns `frame_rx` for its full lifetime.
+ /// `frame_rx` is a `tokio::sync::broadcast::Receiver` —
+ /// the composition root is responsible for wiring it to
+ /// `frame_ingest::FrameIngestHandle::subscribe()` (raw) or to
+ /// a `FrameReceiver` forwarder if it wants per-consumer drop
+ /// attribution on the publisher side.
+ pub fn run(
+ self,
+ frame_rx: broadcast::Receiver,
+ ) -> (JoinHandle<()>, DetectionClientHandle) {
+ let (events_tx, _) = broadcast::channel(self.config.event_channel_capacity.max(1));
+ let (connection_tx, connection_rx) = watch::channel(ConnectionState::Disconnected);
+ let (shutdown_tx, shutdown_rx) = watch::channel(false);
+ let stats = DetectionStats::shared();
+ let latency = Arc::new(internal::latency::LatencyWindow::with_capacity(
+ self.config.latency_p99_threshold,
+ self.config.latency_window_capacity,
+ ));
+
+ let join = internal::runtime::spawn_supervisor(
+ self.config.clone(),
+ frame_rx,
+ events_tx.clone(),
+ Arc::clone(&stats),
+ Arc::clone(&latency),
+ connection_tx,
+ shutdown_rx,
+ );
+
+ let handle = DetectionClientHandle {
+ stats,
+ latency,
+ connection_state_rx: connection_rx,
+ events_tx,
+ shutdown_tx,
+ };
+
+ (join, handle)
+ }
+}
+
+/// Cheap-clone handle for the `DetectionClient` supervisor. Exposes:
+/// - Event subscription via [`Self::subscribe_events`].
+/// - Connection-state watch via [`Self::connection_state`] /
+/// [`Self::connection_state_stream`].
+/// - Health surface (`description.md §3`) via [`Self::health`].
+/// - Shutdown via [`Self::shutdown`] (idempotent).
#[derive(Debug, Clone)]
pub struct DetectionClientHandle {
- #[allow(dead_code)]
- endpoint: String,
+ stats: Arc,
+ latency: Arc,
+ connection_state_rx: watch::Receiver,
+ events_tx: broadcast::Sender,
+ shutdown_tx: watch::Sender,
}
impl DetectionClientHandle {
- pub async fn request(&self, _frame: Frame) -> Result {
- Err(AutopilotError::NotImplemented(
- "detection_client::request (AZ-660)",
- ))
+ /// Subscribe to the [`DetectionEvent`] stream. The broadcast
+ /// channel applies its own drop-oldest back-pressure to slow
+ /// consumers; new subscribers see events emitted after they
+ /// subscribed.
+ pub fn subscribe_events(&self) -> broadcast::Receiver {
+ self.events_tx.subscribe()
+ }
+
+ pub fn connection_state(&self) -> ConnectionState {
+ *self.connection_state_rx.borrow()
+ }
+
+ pub fn connection_state_stream(&self) -> watch::Receiver {
+ self.connection_state_rx.clone()
+ }
+
+ pub fn stats(&self) -> Arc {
+ Arc::clone(&self.stats)
+ }
+
+ pub fn latency_p50(&self) -> Option {
+ self.latency.p50()
+ }
+
+ pub fn latency_p99(&self) -> Option {
+ self.latency.p99()
+ }
+
+ pub fn shutdown(&self) {
+ self.shutdown_tx.send_replace(true);
}
pub fn health(&self) -> ComponentHealth {
- ComponentHealth::disabled(NAME)
+ let state = self.connection_state();
+ match state {
+ ConnectionState::Disconnected => ComponentHealth::red(NAME, "disconnected"),
+ ConnectionState::Connecting => ComponentHealth::yellow(NAME, "connecting"),
+ ConnectionState::Connected => {
+ // `description.md §3` — p99 above threshold is the
+ // operative health signal once we're connected.
+ let mut h = ComponentHealth::green(NAME);
+ if let Some(p99) = self.latency.p99() {
+ if p99 > self.latency.threshold() {
+ h.level = HealthLevel::Yellow;
+ h.detail = Some(format!(
+ "p99 {} ms > threshold {} ms",
+ p99.as_millis(),
+ self.latency.threshold().as_millis()
+ ));
+ }
+ }
+ h
+ }
+ }
}
}
@@ -51,8 +277,14 @@ mod tests {
use super::*;
#[test]
- fn it_compiles() {
- let h = DetectionClient::new("http://127.0.0.1:50051".into()).handle();
- assert_eq!(h.health().level, shared::health::HealthLevel::Disabled);
+ fn config_defaults_match_description() {
+ // Arrange
+ let c = DetectionClientConfig::new("http://127.0.0.1:50051");
+
+ // Assert — the §3 baseline numbers.
+ assert_eq!(c.max_concurrent_in_flight, 2);
+ assert_eq!(c.reconnect_cap, Duration::from_secs(30));
+ assert_eq!(c.expected_schema_version, 1);
+ assert_eq!(c.latency_p99_threshold, Duration::from_millis(100));
}
}
diff --git a/crates/detection_client/tests/stream.rs b/crates/detection_client/tests/stream.rs
new file mode 100644
index 0000000..fef3727
--- /dev/null
+++ b/crates/detection_client/tests/stream.rs
@@ -0,0 +1,551 @@
+//! AZ-660 + AZ-661 integration tests — fixture in-process gRPC server.
+//!
+//! AC-660-1 takes ~10 s; all others complete in ≤5 s.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use tokio::sync::{broadcast, mpsc, oneshot};
+use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
+use tonic::transport::Server;
+use tonic::{Request, Response, Status};
+
+use detection_client::internal::proto::{
+ detection_service_server::{DetectionService, DetectionServiceServer},
+ DetectionResponse, FrameRequest,
+};
+use detection_client::{ConnectionState, DetectionClient, DetectionClientConfig, DetectionEvent};
+use shared::models::frame::{Frame, PixelFormat};
+
+// ---------------------------------------------------------------------------
+// Frame factory
+// ---------------------------------------------------------------------------
+
+fn make_frame(seq: u64, ai_locked: bool) -> Frame {
+ Frame {
+ seq,
+ capture_ts_monotonic_ns: seq * 33_333_333,
+ decode_ts_monotonic_ns: seq * 33_333_333 + 1_000_000,
+ pixels: Arc::new(Bytes::from_static(b"\x80")),
+ width: 1,
+ height: 1,
+ pix_fmt: PixelFormat::Nv12,
+ ai_locked,
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Fixture: configurable echo server
+//
+// `close_after` is per-stream-session (reset on each `stream()` call) so the
+// server can be re-used across reconnects without freezing on the second
+// session.
+// ---------------------------------------------------------------------------
+
+#[derive(Clone)]
+struct FixtureServer {
+ latency_ms: u64,
+ schema_version: u32,
+ model_version: String,
+ close_after: Option,
+}
+
+impl FixtureServer {
+ fn fast() -> Self {
+ Self {
+ latency_ms: 10,
+ schema_version: 1,
+ model_version: "v1.0".to_string(),
+ close_after: None,
+ }
+ }
+ fn slow(latency_ms: u64) -> Self {
+ Self {
+ latency_ms,
+ ..Self::fast()
+ }
+ }
+ fn with_schema_version(mut self, v: u32) -> Self {
+ self.schema_version = v;
+ self
+ }
+ fn with_close_after(mut self, n: u32) -> Self {
+ self.close_after = Some(n);
+ self
+ }
+}
+
+#[async_trait]
+impl DetectionService for FixtureServer {
+ type StreamStream = ReceiverStream>;
+
+ async fn stream(
+ &self,
+ request: Request>,
+ ) -> Result, Status> {
+ let latency = Duration::from_millis(self.latency_ms);
+ let schema_version = self.schema_version;
+ let model_version = self.model_version.clone();
+ let close_after = self.close_after;
+ let mut inbound = request.into_inner();
+ let (tx, rx) = mpsc::channel::>(32);
+
+ tokio::spawn(async move {
+ let mut session_count = 0u32;
+ while let Ok(Some(req)) = inbound.message().await {
+ tokio::time::sleep(latency).await;
+ session_count += 1;
+ let resp = DetectionResponse {
+ schema_version,
+ model_version: model_version.clone(),
+ frame_seq: req.frame_seq,
+ latency_ms: latency.as_millis() as u32,
+ detections: vec![],
+ };
+ if tx.send(Ok(resp)).await.is_err() {
+ break;
+ }
+ if close_after.map(|n| session_count >= n).unwrap_or(false) {
+ break;
+ }
+ }
+ });
+
+ Ok(Response::new(ReceiverStream::new(rx)))
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Fixture: server that switches model_version mid-stream
+// ---------------------------------------------------------------------------
+
+#[derive(Clone)]
+struct VersionSwitchServer {
+ first_model: String,
+ second_model: String,
+ /// Return `first_model` for the first `switch_after` responses, then
+ /// `second_model` for all subsequent ones within the SAME session.
+ switch_after: u32,
+}
+
+#[async_trait]
+impl DetectionService for VersionSwitchServer {
+ type StreamStream = ReceiverStream>;
+
+ async fn stream(
+ &self,
+ request: Request>,
+ ) -> Result, Status> {
+ let first = self.first_model.clone();
+ let second = self.second_model.clone();
+ let switch_after = self.switch_after;
+ let mut inbound = request.into_inner();
+ let (tx, rx) = mpsc::channel::>(32);
+
+ tokio::spawn(async move {
+ let mut count = 0u32;
+ while let Ok(Some(req)) = inbound.message().await {
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ let model = if count < switch_after {
+ first.clone()
+ } else {
+ second.clone()
+ };
+ count += 1;
+ let resp = DetectionResponse {
+ schema_version: 1,
+ model_version: model,
+ frame_seq: req.frame_seq,
+ latency_ms: 10,
+ detections: vec![],
+ };
+ if tx.send(Ok(resp)).await.is_err() {
+ break;
+ }
+ }
+ });
+
+ Ok(Response::new(ReceiverStream::new(rx)))
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Server harness
+// ---------------------------------------------------------------------------
+
+async fn start_server_with
(svc: S) -> (String, oneshot::Sender<()>)
+where
+ S: DetectionService + Clone + Send + Sync + 'static,
+{
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let stream = TcpListenerStream::new(listener);
+ let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+
+ tokio::spawn(async move {
+ Server::builder()
+ .add_service(DetectionServiceServer::new(svc))
+ .serve_with_incoming_shutdown(stream, async {
+ let _ = shutdown_rx.await;
+ })
+ .await
+ .unwrap();
+ });
+
+ (format!("http://{addr}"), shutdown_tx)
+}
+
+async fn wait_connected(handle: &detection_client::DetectionClientHandle) {
+ let mut conn = handle.connection_state_stream();
+ tokio::time::timeout(Duration::from_secs(5), async {
+ loop {
+ if *conn.borrow() == ConnectionState::Connected {
+ break;
+ }
+ let _ = conn.changed().await;
+ }
+ })
+ .await
+ .expect("client connected within 5 s");
+}
+
+// ---------------------------------------------------------------------------
+// AZ-660 AC-1 — happy path, 30 fps for 10 s, ≥285 batches, p99 ≤100 ms
+// ---------------------------------------------------------------------------
+
+#[tokio::test(flavor = "multi_thread")]
+async fn ac660_1_happy_path_30fps_285_batches() {
+ // Arrange
+ let (endpoint, _shutdown) = start_server_with(FixtureServer::fast()).await;
+ let (frame_tx, frame_rx) = broadcast::channel::(512);
+ let config = DetectionClientConfig::new(endpoint);
+ let (_join, handle) = DetectionClient::new(config).run(frame_rx);
+ wait_connected(&handle).await;
+
+ let mut events = handle.subscribe_events();
+ let collector = tokio::spawn(async move {
+ let mut count = 0u64;
+ loop {
+ match tokio::time::timeout(Duration::from_secs(2), events.recv()).await {
+ Ok(Ok(DetectionEvent::Batch { .. })) => count += 1,
+ Ok(Ok(_)) => {}
+ _ => break,
+ }
+ }
+ count
+ });
+
+ // Act — 30 fps for 10 s
+ let mut ticker = tokio::time::interval(Duration::from_nanos(33_333_333));
+ ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+ let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
+ let mut seq = 0u64;
+ loop {
+ ticker.tick().await;
+ if tokio::time::Instant::now() >= deadline {
+ break;
+ }
+ let _ = frame_tx.send(make_frame(seq, false));
+ seq += 1;
+ }
+ tokio::time::sleep(Duration::from_millis(500)).await;
+ handle.shutdown();
+
+ let batch_count = tokio::time::timeout(Duration::from_secs(3), collector)
+ .await
+ .expect("collector timed out")
+ .expect("collector panicked");
+
+ // Assert
+ assert!(
+ batch_count >= 285,
+ "expected ≥285 batches, got {batch_count}"
+ );
+ assert_eq!(
+ handle.stats().budget_drops_total(),
+ 0,
+ "expected no budget drops"
+ );
+ if let Some(p99) = handle.latency_p99() {
+ assert!(p99 <= Duration::from_millis(100), "p99 {p99:?} > 100 ms");
+ }
+}
+
+// ---------------------------------------------------------------------------
+// AZ-660 AC-2 — reconnect after server closes stream
+// ---------------------------------------------------------------------------
+
+#[tokio::test(flavor = "multi_thread")]
+async fn ac660_2_reconnects_after_stream_close() {
+ // The FixtureServer closes each stream-session after 3 responses; the
+ // client must reconnect and continue receiving within 2 s.
+ let (endpoint, _shutdown) = start_server_with(FixtureServer::fast().with_close_after(3)).await;
+
+ let config = DetectionClientConfig {
+ reconnect_initial: Duration::from_millis(100),
+ reconnect_cap: Duration::from_millis(500),
+ ..DetectionClientConfig::new(endpoint)
+ };
+ let (frame_tx, frame_rx) = broadcast::channel::(64);
+ let (_join, handle) = DetectionClient::new(config).run(frame_rx);
+ wait_connected(&handle).await;
+
+ let mut events = handle.subscribe_events();
+
+ // Send 3 frames → server closes stream after the 3rd response.
+ for i in 0u64..3 {
+ let _ = frame_tx.send(make_frame(i, false));
+ tokio::time::sleep(Duration::from_millis(25)).await;
+ }
+ // Give the stream-close time to propagate and the reconnect to happen.
+ tokio::time::sleep(Duration::from_millis(300)).await;
+
+ // Wait up to 2 s for the client to reconnect (AC-2 requirement).
+ let mut conn = handle.connection_state_stream();
+ tokio::time::timeout(Duration::from_secs(2), async {
+ loop {
+ if *conn.borrow() == ConnectionState::Connected {
+ break;
+ }
+ let _ = conn.changed().await;
+ }
+ })
+ .await
+ .expect("reconnected within 2 s");
+
+ // Verify frames continue to flow after reconnect.
+ for i in 3u64..6 {
+ let _ = frame_tx.send(make_frame(i, false));
+ tokio::time::sleep(Duration::from_millis(25)).await;
+ }
+ let post_reconnect_batch = tokio::time::timeout(Duration::from_secs(2), async {
+ loop {
+ match events.recv().await {
+ Ok(DetectionEvent::Batch { .. }) => return true,
+ Ok(_) => {}
+ Err(_) => return false,
+ }
+ }
+ })
+ .await
+ .unwrap_or(false);
+
+ // Assert
+ assert!(post_reconnect_batch, "frames flow after reconnect");
+ // Same model version on reconnect must NOT fire a second ModelVersionChanged.
+ let model_changes = handle.stats().model_version_changes_total();
+ assert_eq!(
+ model_changes, 1,
+ "same model version across reconnect must not repeat the event"
+ );
+
+ handle.shutdown();
+}
+
+// ---------------------------------------------------------------------------
+// AZ-660 AC-3 — budget drops on slow server (200 ms latency, 30 fps source)
+// ---------------------------------------------------------------------------
+
+#[tokio::test(flavor = "multi_thread")]
+async fn ac660_3_budget_drops_on_slow_server() {
+ // Arrange
+ let (endpoint, _shutdown) = start_server_with(FixtureServer::slow(200)).await;
+ let config = DetectionClientConfig {
+ max_concurrent_in_flight: 2,
+ ..DetectionClientConfig::new(endpoint)
+ };
+ let (frame_tx, frame_rx) = broadcast::channel::(512);
+ let (_join, handle) = DetectionClient::new(config).run(frame_rx);
+ wait_connected(&handle).await;
+
+ // Act — 30 fps for 5 s; server takes 200 ms → budget full after frame 2.
+ let mut ticker = tokio::time::interval(Duration::from_nanos(33_333_333));
+ ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+ let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+ let mut seq = 0u64;
+ loop {
+ ticker.tick().await;
+ if tokio::time::Instant::now() >= deadline {
+ break;
+ }
+ let _ = frame_tx.send(make_frame(seq, false));
+ seq += 1;
+ }
+ tokio::time::sleep(Duration::from_millis(300)).await;
+ handle.shutdown();
+
+ // Assert
+ let drops = handle.stats().budget_drops_total();
+ assert!(drops > 0, "expected budget_drops > 0, got 0");
+}
+
+// ---------------------------------------------------------------------------
+// AZ-660 AC-4 — ai_locked frames are skipped
+// ---------------------------------------------------------------------------
+
+#[tokio::test(flavor = "multi_thread")]
+async fn ac660_4_ai_locked_frames_skipped() {
+ // Arrange
+ let (endpoint, _shutdown) = start_server_with(FixtureServer::fast()).await;
+ let (frame_tx, frame_rx) = broadcast::channel::(256);
+ let (_join, handle) = DetectionClient::new(DetectionClientConfig::new(endpoint)).run(frame_rx);
+ wait_connected(&handle).await;
+
+ // Act — 20 frames; every 5th is ai_locked (frames 4, 9, 14, 19 → 4 locked).
+ for i in 0u64..20 {
+ let ai_locked = (i + 1) % 5 == 0;
+ let _ = frame_tx.send(make_frame(i, ai_locked));
+ tokio::time::sleep(Duration::from_millis(15)).await;
+ }
+ tokio::time::sleep(Duration::from_millis(300)).await;
+ handle.shutdown();
+
+ // Assert
+ let skipped = handle.stats().ai_locked_skipped_total();
+ let sent = handle.stats().requests_sent_total();
+ assert_eq!(skipped, 4, "expected 4 ai_locked skips, got {skipped}");
+ assert!(sent <= 16, "expected ≤16 requests sent, got {sent}");
+}
+
+// ---------------------------------------------------------------------------
+// AZ-661 AC-1 — schema mismatch surfaces as hard error + counter
+// ---------------------------------------------------------------------------
+
+#[tokio::test(flavor = "multi_thread")]
+async fn ac661_1_schema_mismatch_hard_error() {
+ // Arrange — server returns schema_version 99 (incompatible with expected 1).
+ let (endpoint, _shutdown) =
+ start_server_with(FixtureServer::fast().with_schema_version(99)).await;
+ let config = DetectionClientConfig {
+ expected_schema_version: 1,
+ ..DetectionClientConfig::new(endpoint)
+ };
+ let (frame_tx, frame_rx) = broadcast::channel::(64);
+ let (_join, handle) = DetectionClient::new(config).run(frame_rx);
+ let mut events = handle.subscribe_events();
+ wait_connected(&handle).await;
+
+ // Act
+ let _ = frame_tx.send(make_frame(1, false));
+
+ // Assert — SchemaMismatch event emitted and counter increments.
+ let got_mismatch = tokio::time::timeout(Duration::from_secs(2), async {
+ loop {
+ match events.recv().await {
+ Ok(DetectionEvent::SchemaMismatch { .. }) => return true,
+ Ok(_) => {}
+ Err(_) => return false,
+ }
+ }
+ })
+ .await
+ .unwrap_or(false);
+
+ assert!(got_mismatch, "expected SchemaMismatch event");
+ assert!(
+ handle.stats().schema_mismatch_total() >= 1,
+ "expected schema_mismatch_total ≥ 1"
+ );
+
+ handle.shutdown();
+}
+
+// ---------------------------------------------------------------------------
+// AZ-661 AC-2 — model_version change is signalled exactly once
+// ---------------------------------------------------------------------------
+
+#[tokio::test(flavor = "multi_thread")]
+async fn ac661_2_model_version_change_emits_event() {
+ // Arrange — server returns "v1.2" for the first response, then "v1.3".
+ let (endpoint, _shutdown) = start_server_with(VersionSwitchServer {
+ first_model: "v1.2".to_string(),
+ second_model: "v1.3".to_string(),
+ switch_after: 1,
+ })
+ .await;
+
+ let (frame_tx, frame_rx) = broadcast::channel::(64);
+ let (_join, handle) = DetectionClient::new(DetectionClientConfig::new(endpoint)).run(frame_rx);
+ let mut events = handle.subscribe_events();
+ wait_connected(&handle).await;
+
+ // Act — send 5 frames; responses 1 = "v1.2", responses 2-5 = "v1.3".
+ for i in 0u64..5 {
+ let _ = frame_tx.send(make_frame(i, false));
+ tokio::time::sleep(Duration::from_millis(20)).await;
+ }
+
+ // Drain all pending events within a 500 ms window.
+ let mut v13_events = 0u32;
+ let drain_deadline = tokio::time::Instant::now() + Duration::from_millis(500);
+ loop {
+ let remaining = drain_deadline.saturating_duration_since(tokio::time::Instant::now());
+ if remaining.is_zero() {
+ break;
+ }
+ match tokio::time::timeout(remaining, events.recv()).await {
+ Ok(Ok(DetectionEvent::ModelVersionChanged { current, .. })) => {
+ if current == "v1.3" {
+ v13_events += 1;
+ }
+ }
+ Ok(Ok(_)) => {}
+ _ => break,
+ }
+ }
+
+ handle.shutdown();
+
+ // Assert — exactly one transition to "v1.3".
+ assert_eq!(
+ v13_events, 1,
+ "expected exactly one ModelVersionChanged(v1.3), got {v13_events}"
+ );
+}
+
+// ---------------------------------------------------------------------------
+// AZ-661 AC-3 — Tier1Degraded emitted exactly once on latency spike
+// ---------------------------------------------------------------------------
+
+#[tokio::test(flavor = "multi_thread")]
+async fn ac661_3_tier1_degraded_emitted_once_on_latency_spike() {
+ // Arrange — small latency window (8 samples) so the window fills quickly;
+ // server latency 150 ms > threshold 100 ms.
+ let (endpoint, _shutdown) = start_server_with(FixtureServer::slow(150)).await;
+ let config = DetectionClientConfig {
+ latency_window_capacity: 8,
+ latency_p99_threshold: Duration::from_millis(100),
+ ..DetectionClientConfig::new(endpoint)
+ };
+ let (frame_tx, frame_rx) = broadcast::channel::(64);
+ let (_join, handle) = DetectionClient::new(config).run(frame_rx);
+ let mut events = handle.subscribe_events();
+ wait_connected(&handle).await;
+
+ // Act — send 10 frames; server responds in 150 ms each.
+ // The latency window (capacity 8) will be full of 150 ms samples after
+ // 8 responses; p99 = 150 ms > 100 ms → exactly one Tier1Degraded event.
+ for i in 0u64..10 {
+ let _ = frame_tx.send(make_frame(i, false));
+ tokio::time::sleep(Duration::from_millis(160)).await;
+ }
+ handle.shutdown();
+
+ // Drain events.
+ let mut degraded_count = 0u32;
+ loop {
+ match events.try_recv() {
+ Ok(DetectionEvent::Tier1Degraded { .. }) => degraded_count += 1,
+ Err(_) => break,
+ Ok(_) => {}
+ }
+ }
+
+ // Assert — the latch fires exactly once per degraded→healthy transition.
+ assert_eq!(
+ degraded_count, 1,
+ "expected exactly one Tier1Degraded event, got {degraded_count}"
+ );
+}
diff --git a/crates/frame_ingest/src/internal/mod.rs b/crates/frame_ingest/src/internal/mod.rs
index 9ce5dba..8ba0c61 100644
--- a/crates/frame_ingest/src/internal/mod.rs
+++ b/crates/frame_ingest/src/internal/mod.rs
@@ -2,5 +2,6 @@
pub mod decoder;
pub mod lifecycle;
+pub mod publisher;
pub mod rtsp_client;
pub mod timestamp;
diff --git a/crates/frame_ingest/src/internal/publisher.rs b/crates/frame_ingest/src/internal/publisher.rs
new file mode 100644
index 0000000..bb46f26
--- /dev/null
+++ b/crates/frame_ingest/src/internal/publisher.rs
@@ -0,0 +1,366 @@
+//! AZ-659 — multi-consumer frame publisher with per-consumer drop accounting.
+//!
+//! `FrameIngest` already fans out to multiple subscribers via
+//! `tokio::sync::broadcast`, but a raw broadcast receiver silently
+//! folds lag into a single `RecvError::Lagged(n)` return value. The
+//! lifecycle loop has no way to attribute those drops back to *which*
+//! consumer fell behind, and the operator UI cannot tell "the AI
+//! tier is slow" from "the modem is slow".
+//!
+//! This module wraps the broadcast hub with:
+//!
+//! - a `ConsumerId` enum that names the three known consumers per
+//! `description.md §3` (`detection_client`, `movement_detector`,
+//! `telemetry_stream`);
+//! - a `PublisherStats` struct holding one `AtomicU64` drop counter
+//! per consumer plus a total publish counter (lock-free; never
+//! blocks the lifecycle loop);
+//! - a `FrameReceiver` wrapper around `broadcast::Receiver`
+//! that intercepts `RecvError::Lagged(n)` and folds it into the
+//! right per-consumer counter before silently retrying — drops
+//! are *counted*, never silent (`description.md §6` AC-2);
+//! - a `FramePublisher` struct that owns the broadcast `Sender` plus
+//! the stats handle, exposes `subscribe(ConsumerId)`, and is
+//! constructed with a configurable channel depth.
+//!
+//! The zero-copy property required by AC-3 lives in the `Frame`
+//! struct itself (`pixels: Arc`); the publisher does not
+//! copy the payload — the broadcast channel hands every subscriber
+//! the same `Arc`, so memory does not scale with consumer count.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+use tokio::sync::broadcast;
+
+use shared::models::frame::Frame;
+
+/// Default per-consumer channel depth (`description.md §3` —
+/// nominal queue depth before a slow consumer's oldest frame is
+/// dropped). Picked at 4 frames so a 30 fps pipeline survives a
+/// ~130 ms downstream stall without dropping anything; longer
+/// stalls drop until the consumer catches up.
+pub const DEFAULT_CHANNEL_DEPTH: usize = 4;
+
+/// The three known downstream frame consumers. `non_exhaustive` so
+/// future additions (e.g. on-board recording) extend without
+/// breaking matchers.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[non_exhaustive]
+pub enum ConsumerId {
+ DetectionClient,
+ MovementDetector,
+ Telemetry,
+}
+
+impl ConsumerId {
+ /// Canonical drop-reason tag emitted to logs and surfaced through
+ /// `FrameIngestHandle::dropped_frames`. Format matches the
+ /// `description.md §6` reason vocabulary so the operator UI's
+ /// existing reason filter works without changes.
+ pub fn drop_reason(self) -> &'static str {
+ match self {
+ Self::DetectionClient => "detection_client_slow",
+ Self::MovementDetector => "movement_detector_slow",
+ Self::Telemetry => "telemetry_slow",
+ }
+ }
+
+ /// Short identifier suitable for `tracing` fields.
+ pub fn as_str(self) -> &'static str {
+ match self {
+ Self::DetectionClient => "detection_client",
+ Self::MovementDetector => "movement_detector",
+ Self::Telemetry => "telemetry_stream",
+ }
+ }
+}
+
+/// Lock-free counters consumed by `FrameIngestHandle::health` and by
+/// the operator-side per-consumer drop dashboard. Held inside an
+/// `Arc` and shared by the lifecycle task (writer side, via
+/// `FramePublisher::publish`) and every active `FrameReceiver`
+/// (writer side, via lag interception).
+#[derive(Debug, Default)]
+pub struct PublisherStats {
+ publishes_total: AtomicU64,
+ detection_client_drops: AtomicU64,
+ movement_detector_drops: AtomicU64,
+ telemetry_drops: AtomicU64,
+}
+
+impl PublisherStats {
+ pub fn shared() -> Arc {
+ Arc::new(Self::default())
+ }
+
+ pub fn publishes_total(&self) -> u64 {
+ self.publishes_total.load(Ordering::Relaxed)
+ }
+
+ pub fn drops_for(&self, consumer: ConsumerId) -> u64 {
+ self.counter(consumer).load(Ordering::Relaxed)
+ }
+
+ fn note_publish(&self) {
+ self.publishes_total.fetch_add(1, Ordering::Relaxed);
+ }
+
+ fn note_drop(&self, consumer: ConsumerId, n: u64) {
+ self.counter(consumer).fetch_add(n, Ordering::Relaxed);
+ }
+
+ fn counter(&self, consumer: ConsumerId) -> &AtomicU64 {
+ match consumer {
+ ConsumerId::DetectionClient => &self.detection_client_drops,
+ ConsumerId::MovementDetector => &self.movement_detector_drops,
+ ConsumerId::Telemetry => &self.telemetry_drops,
+ }
+ }
+}
+
+/// Multi-consumer fan-out hub. Wraps a `tokio::sync::broadcast`
+/// sender with the per-consumer accounting needed by AC-2 of
+/// AZ-659. The channel capacity is the `channel_depth` configured
+/// at construction; the broadcast channel's natural overwrite
+/// behaviour gives the "drop oldest for the slow consumer" semantic
+/// the task spec requires.
+#[derive(Debug)]
+pub struct FramePublisher {
+ tx: broadcast::Sender,
+ stats: Arc,
+ channel_depth: usize,
+}
+
+impl FramePublisher {
+ pub fn new(channel_depth: usize) -> Self {
+ let depth = channel_depth.max(1);
+ let (tx, _rx) = broadcast::channel(depth);
+ Self {
+ tx,
+ stats: PublisherStats::shared(),
+ channel_depth: depth,
+ }
+ }
+
+ pub fn channel_depth(&self) -> usize {
+ self.channel_depth
+ }
+
+ /// Snapshot accessor for the shared stats object. Cheap clone
+ /// (one `Arc::clone`).
+ pub fn stats(&self) -> Arc {
+ Arc::clone(&self.stats)
+ }
+
+ /// Subscribe under a named consumer identity. Per-consumer lag
+ /// gets attributed to the named consumer's drop counter.
+ pub fn subscribe(&self, consumer: ConsumerId) -> FrameReceiver {
+ FrameReceiver {
+ rx: self.tx.subscribe(),
+ consumer,
+ stats: Arc::clone(&self.stats),
+ }
+ }
+
+ /// Subscribe without per-consumer accounting. Use for code paths
+ /// that don't fit into one of the three known consumer roles
+ /// (e.g. test harnesses, ad-hoc inspection). Lag on these
+ /// receivers is *not* counted toward any per-consumer total.
+ pub fn subscribe_raw(&self) -> broadcast::Receiver {
+ self.tx.subscribe()
+ }
+
+ /// Publish a frame. Returns the number of receivers that were
+ /// subscribed at the moment the send happened (informational).
+ /// Increments `publishes_total` even when there are zero
+ /// subscribers — the publish *attempt* is what we measure for
+ /// the §6 publish-rate dashboard.
+ pub fn publish(&self, frame: Frame) -> usize {
+ self.stats.note_publish();
+ // `broadcast::Sender::send` returns `Err(SendError(_))` when
+ // there are zero active receivers. That's a normal state
+ // during start-up (consumers spawn slightly after the
+ // publisher) and is not a failure — we treat the return
+ // value purely as "how many consumers got this frame".
+ self.tx.send(frame).unwrap_or_default()
+ }
+
+ /// Subscriber count snapshot — useful for health-server output
+ /// ("AI tier was not subscribed when first frame arrived").
+ pub fn receiver_count(&self) -> usize {
+ self.tx.receiver_count()
+ }
+}
+
+/// `broadcast::Receiver` wrapper that folds lag into the
+/// owning consumer's drop counter before transparently retrying.
+/// `recv()` only returns `Ok(Frame)` or a fatal `RecvError::Closed`
+/// — lag is never surfaced to the caller; it is recorded and the
+/// next available frame is returned.
+#[derive(Debug)]
+pub struct FrameReceiver {
+ rx: broadcast::Receiver,
+ consumer: ConsumerId,
+ stats: Arc,
+}
+
+impl FrameReceiver {
+ pub fn consumer(&self) -> ConsumerId {
+ self.consumer
+ }
+
+ /// Block until the next frame is available. On lag, record the
+ /// drop count against this consumer and immediately retry; the
+ /// caller never sees `Lagged`. The only error variant returned
+ /// is `RecvError::Closed`, which means the publisher has been
+ /// dropped.
+ pub async fn recv(&mut self) -> Result {
+ loop {
+ match self.rx.recv().await {
+ Ok(frame) => return Ok(frame),
+ Err(broadcast::error::RecvError::Lagged(n)) => {
+ self.note_lag(n);
+ }
+ Err(broadcast::error::RecvError::Closed) => return Err(RecvError::Closed),
+ }
+ }
+ }
+
+ /// Non-blocking variant. `Empty` is the channel-is-currently-empty
+ /// case (no frames produced since the last `recv`/`try_recv`),
+ /// not a fatal state. `Closed` mirrors the async variant.
+ pub fn try_recv(&mut self) -> Result {
+ loop {
+ match self.rx.try_recv() {
+ Ok(frame) => return Ok(frame),
+ Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
+ Err(broadcast::error::TryRecvError::Closed) => return Err(TryRecvError::Closed),
+ Err(broadcast::error::TryRecvError::Lagged(n)) => {
+ self.note_lag(n);
+ }
+ }
+ }
+ }
+
+ fn note_lag(&self, n: u64) {
+ self.stats.note_drop(self.consumer, n);
+ tracing::warn!(
+ consumer = self.consumer.as_str(),
+ reason = self.consumer.drop_reason(),
+ dropped = n,
+ "frame_publisher dropped frames for slow consumer"
+ );
+ }
+}
+
+/// Errors that `FrameReceiver::recv` can return. Lag is *not* in
+/// this list — it is accounted internally.
+#[derive(Debug, thiserror::Error)]
+pub enum RecvError {
+ #[error("frame publisher closed")]
+ Closed,
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum TryRecvError {
+ #[error("no frame available")]
+ Empty,
+ #[error("frame publisher closed")]
+ Closed,
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use bytes::Bytes;
+ use shared::models::frame::{Frame, PixelFormat};
+
+ use super::*;
+
+ fn make_frame(seq: u64, payload: Arc) -> Frame {
+ Frame {
+ seq,
+ capture_ts_monotonic_ns: seq * 1_000_000,
+ decode_ts_monotonic_ns: seq * 1_000_000 + 100,
+ pixels: payload,
+ width: 320,
+ height: 240,
+ pix_fmt: PixelFormat::Nv12,
+ ai_locked: false,
+ }
+ }
+
+ #[test]
+ fn channel_depth_defaults_to_at_least_one() {
+ // Arrange
+ let p = FramePublisher::new(0);
+
+ // Assert — broadcast::channel(0) would panic, so we clamp.
+ assert!(p.channel_depth() >= 1);
+ }
+
+ #[test]
+ fn drop_reason_matches_description_md_vocabulary() {
+ assert_eq!(
+ ConsumerId::DetectionClient.drop_reason(),
+ "detection_client_slow"
+ );
+ assert_eq!(
+ ConsumerId::MovementDetector.drop_reason(),
+ "movement_detector_slow"
+ );
+ assert_eq!(ConsumerId::Telemetry.drop_reason(), "telemetry_slow");
+ }
+
+ #[tokio::test]
+ async fn publish_increments_total_even_without_subscribers() {
+ // Arrange
+ let p = FramePublisher::new(DEFAULT_CHANNEL_DEPTH);
+ let stats = p.stats();
+ let payload = Arc::new(Bytes::from_static(&[0u8; 32]));
+
+ // Act
+ for seq in 0..5 {
+ p.publish(make_frame(seq, Arc::clone(&payload)));
+ }
+
+ // Assert
+ assert_eq!(stats.publishes_total(), 5);
+ assert_eq!(stats.drops_for(ConsumerId::DetectionClient), 0);
+ assert_eq!(stats.drops_for(ConsumerId::MovementDetector), 0);
+ assert_eq!(stats.drops_for(ConsumerId::Telemetry), 0);
+ }
+
+ #[tokio::test]
+ async fn three_subscribers_share_arc_pixels_zero_copy() {
+ // Arrange
+ let p = FramePublisher::new(DEFAULT_CHANNEL_DEPTH);
+ let mut det = p.subscribe(ConsumerId::DetectionClient);
+ let mut mov = p.subscribe(ConsumerId::MovementDetector);
+ let mut tel = p.subscribe(ConsumerId::Telemetry);
+ let payload = Arc::new(Bytes::from(vec![0xABu8; 1024]));
+
+ // Act
+ p.publish(make_frame(1, Arc::clone(&payload)));
+ let f_det = det.recv().await.expect("det recv");
+ let f_mov = mov.recv().await.expect("mov recv");
+ let f_tel = tel.recv().await.expect("tel recv");
+
+ // Assert — every subscriber received the SAME `Arc`,
+ // not a clone of the bytes.
+ assert!(
+ Arc::ptr_eq(&f_det.pixels, &f_mov.pixels),
+ "det/mov must share the same Arc — broadcast must not deep-clone Bytes"
+ );
+ assert!(
+ Arc::ptr_eq(&f_mov.pixels, &f_tel.pixels),
+ "mov/tel must share the same Arc"
+ );
+ assert!(
+ Arc::ptr_eq(&f_det.pixels, &payload),
+ "received Arc must be the original payload pointer"
+ );
+ }
+}
diff --git a/crates/frame_ingest/src/lib.rs b/crates/frame_ingest/src/lib.rs
index 5ca9b45..08901ce 100644
--- a/crates/frame_ingest/src/lib.rs
+++ b/crates/frame_ingest/src/lib.rs
@@ -1,4 +1,4 @@
-//! `frame_ingest` — RTSP pull + decode + timestamp.
+//! `frame_ingest` — RTSP pull + decode + timestamp + publish.
//!
//! Real implementation lands in:
//! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded
@@ -7,18 +7,31 @@
//! fallback) + per-frame monotonic timestamping + decode stats
//! (this crate, `internal/decoder.rs` + `internal/timestamp.rs`).
//! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer
-//! drop policy.
+//! drop policy (this crate, `internal/publisher.rs`).
//!
//! ## AZ-658 surface (extends AZ-657)
//!
-//! `FrameIngest::run` now takes a [`FrameDecoder`]. The lifecycle loop
+//! `FrameIngest::run` takes a [`FrameDecoder`]. The lifecycle loop
//! stamps the capture timestamp the moment a packet leaves the
//! transport, hands the encoded payload to the decoder, and emits one
//! [`Frame`] per decoded picture with `decode_ts_monotonic_ns` set
//! when the decoder returned. Single-frame decode errors increment
//! `decode_errors_total` and drop the frame; the stream is never
-//! aborted (AC-3). The decoder backend (`Nvdec` / `Software`) is
-//! observable via [`FrameIngestHandle::decoder_backend`].
+//! aborted. The decoder backend (`Nvdec` / `Software`) is observable
+//! via [`FrameIngestHandle::decoder_backend`].
+//!
+//! ## AZ-659 surface (extends AZ-658)
+//!
+//! Decoded frames flow through a [`FramePublisher`]. The publisher
+//! exposes [`FrameIngestHandle::subscribe_as`] for the three known
+//! consumers (`detection_client`, `movement_detector`,
+//! `telemetry_stream`); each subscriber's lag is folded into a
+//! per-consumer drop counter visible via
+//! [`FrameIngestHandle::dropped_frames`]. Drops are *counted* and
+//! `tracing::warn`-logged with a reason tag — never silent.
+//! `FrameIngestHandle::subscribe()` is preserved for legacy callers
+//! that don't fit one of the three named consumer roles; lag on
+//! those raw receivers is not attributed to any consumer counter.
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -38,6 +51,10 @@ pub use internal::decoder::{
FfmpegDecoder, FrameDecoder,
};
pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState};
+pub use internal::publisher::{
+ ConsumerId, FramePublisher, FrameReceiver, PublisherStats, RecvError as FrameRecvError,
+ TryRecvError as FrameTryRecvError, DEFAULT_CHANNEL_DEPTH,
+};
pub use internal::rtsp_client::{
OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError,
};
@@ -53,7 +70,7 @@ const NAME: &str = "frame_ingest";
const RED_FRAME_AGE: Duration = Duration::from_secs(5);
pub struct FrameIngest {
- tx: broadcast::Sender,
+ publisher: Arc,
ai_lock_tx: watch::Sender,
state_tx: watch::Sender,
shutdown_tx: watch::Sender,
@@ -65,6 +82,10 @@ pub struct FrameIngest {
}
impl FrameIngest {
+ /// Default constructor — `channel_capacity` maps directly to the
+ /// publisher's `channel_depth` (see `description.md §3`). Use
+ /// [`Self::with_backoff`] when both the depth and the reopen
+ /// backoff need to be customised.
pub fn new(channel_capacity: usize) -> Self {
Self::with_backoff(
channel_capacity,
@@ -73,13 +94,13 @@ impl FrameIngest {
}
pub fn with_backoff(channel_capacity: usize, backoff: BackoffPolicy) -> Self {
- let (tx, _rx) = broadcast::channel(channel_capacity);
+ let publisher = Arc::new(FramePublisher::new(channel_capacity));
let (ai_lock_tx, _) = watch::channel(false);
let (state_tx, _) = watch::channel(SessionState::Closed);
let (shutdown_tx, _) = watch::channel(false);
let (backend_tx, _) = watch::channel(None);
Self {
- tx,
+ publisher,
ai_lock_tx,
state_tx,
shutdown_tx,
@@ -91,9 +112,18 @@ impl FrameIngest {
}
}
+ /// Shared accessor for the underlying [`FramePublisher`]. The
+ /// composition root passes this `Arc` to consumers that prefer
+ /// to subscribe themselves (named via [`ConsumerId`]) rather
+ /// than receiving a pre-built [`FrameReceiver`] over the
+ /// handle.
+ pub fn publisher(&self) -> Arc {
+ Arc::clone(&self.publisher)
+ }
+
pub fn handle(&self) -> FrameIngestHandle {
FrameIngestHandle {
- tx: self.tx.clone(),
+ publisher: Arc::clone(&self.publisher),
ai_lock_tx: self.ai_lock_tx.clone(),
state_rx: self.state_tx.subscribe(),
shutdown_tx: self.shutdown_tx.clone(),
@@ -115,7 +145,7 @@ impl FrameIngest {
T: RtspTransport + 'static,
D: FrameDecoder + 'static,
{
- let tx = self.tx.clone();
+ let publisher = Arc::clone(&self.publisher);
let ai_lock = self.ai_lock_tx.subscribe();
let state_tx = self.state_tx.clone();
let backend_tx = self.backend_tx.clone();
@@ -135,7 +165,7 @@ impl FrameIngest {
transport,
decoder,
config,
- tx,
+ publisher,
ai_lock,
state_tx,
shutdown_rx,
@@ -158,7 +188,7 @@ async fn lifecycle_loop(
transport: Arc>,
mut decoder: Box,
config: RtspSessionConfig,
- tx: broadcast::Sender,
+ publisher: Arc,
mut ai_lock: watch::Receiver,
state_tx: watch::Sender,
mut shutdown_rx: watch::Receiver,
@@ -250,12 +280,14 @@ async fn lifecycle_loop(
pix_fmt: dp.pix_fmt,
ai_locked: locked,
};
- // Send errors are no-ops when
- // the broadcast has no
- // subscribers; per-consumer
- // back-pressure is AZ-659's
- // problem.
- let _ = tx.send(frame);
+ // The publisher folds lag
+ // into per-consumer drop
+ // counters; the lifecycle
+ // loop never blocks on a
+ // slow consumer. Return
+ // value (subscriber count)
+ // is informational.
+ publisher.publish(frame);
}
}
Err(e) => {
@@ -309,7 +341,7 @@ async fn lifecycle_loop(
#[derive(Clone)]
pub struct FrameIngestHandle {
- tx: broadcast::Sender,
+ publisher: Arc,
ai_lock_tx: watch::Sender,
state_rx: watch::Receiver,
shutdown_tx: watch::Sender,
@@ -320,12 +352,47 @@ pub struct FrameIngestHandle {
}
impl FrameIngestHandle {
- /// Subscribe to the frame stream. Consumers receive every frame
- /// after they subscribed; back-pressure is implemented via
- /// broadcast channel lag (see AZ-659 for the slow-consumer
- /// policy).
+ /// Raw, unaccounted subscription. Used by legacy callers and
+ /// tests that don't fit one of the three named [`ConsumerId`]
+ /// roles. Lag on this receiver is *not* attributed to any
+ /// per-consumer drop counter — prefer [`Self::subscribe_as`] for
+ /// production consumers so the per-consumer drop dashboard
+ /// stays accurate.
pub fn subscribe(&self) -> broadcast::Receiver {
- self.tx.subscribe()
+ self.publisher.subscribe_raw()
+ }
+
+ /// Subscribe under a named consumer identity. Per-consumer lag
+ /// is folded into the matching drop counter and surfaced via
+ /// [`Self::dropped_frames`]. The returned [`FrameReceiver`]
+ /// transparently retries past lag so callers never observe
+ /// `Lagged` — they only see the next available frame.
+ pub fn subscribe_as(&self, consumer: ConsumerId) -> FrameReceiver {
+ self.publisher.subscribe(consumer)
+ }
+
+ /// Shared accessor for the underlying [`FramePublisher`]. Useful
+ /// when a consumer needs to subscribe multiple times (e.g.
+ /// reopening a receiver after a transient logical reset) without
+ /// holding the full ingest handle.
+ pub fn publisher(&self) -> Arc {
+ Arc::clone(&self.publisher)
+ }
+
+ /// Per-consumer drop counter. Increments by `n` every time the
+ /// matching [`FrameReceiver`] would otherwise have surfaced
+ /// `RecvError::Lagged(n)`.
+ pub fn dropped_frames(&self, consumer: ConsumerId) -> u64 {
+ self.publisher.stats().drops_for(consumer)
+ }
+
+ /// Total publish attempts since the publisher was constructed.
+ /// Increments on every decoded frame even when there are zero
+ /// subscribers — the metric is the publish *rate*, not the
+ /// delivered-frame rate. Use [`Self::dropped_frames`] for the
+ /// delivered-vs-published delta per consumer.
+ pub fn publishes_total(&self) -> u64 {
+ self.publisher.stats().publishes_total()
}
/// `bringCameraDown`/`bringCameraUp` per `description.md §2`. When
@@ -467,4 +534,23 @@ mod tests {
handle.set_ai_lock(false);
assert!(!handle.ai_locked());
}
+
+ #[test]
+ fn handle_exposes_publisher_metrics_before_run() {
+ // Arrange
+ let ingest = FrameIngest::new(4);
+ let handle = ingest.handle();
+
+ // Assert — fresh publisher exposes zero metrics for every
+ // known consumer (the AZ-659 health surface contract).
+ assert_eq!(handle.publishes_total(), 0);
+ assert_eq!(handle.dropped_frames(ConsumerId::DetectionClient), 0);
+ assert_eq!(handle.dropped_frames(ConsumerId::MovementDetector), 0);
+ assert_eq!(handle.dropped_frames(ConsumerId::Telemetry), 0);
+ assert_eq!(
+ handle.publisher().channel_depth(),
+ 4,
+ "channel_capacity from constructor must propagate to the publisher"
+ );
+ }
}
diff --git a/crates/frame_ingest/tests/publisher.rs b/crates/frame_ingest/tests/publisher.rs
new file mode 100644
index 0000000..9f8b27b
--- /dev/null
+++ b/crates/frame_ingest/tests/publisher.rs
@@ -0,0 +1,263 @@
+//! AZ-659 — `FramePublisher` integration tests.
+//!
+//! These tests drive the publisher directly (no RTSP / decoder
+//! involved) so they execute in milliseconds and don't depend on
+//! libavcodec or NVDEC. The AZ-658 pipeline tests cover the
+//! lifecycle-loop integration end-to-end.
+//!
+//! ACs covered here:
+//! - AC-1 — three consumers consuming at-rate observe every frame and
+//! drop counters stay at 0.
+//! - AC-2 — a slow consumer's lag is folded into THAT consumer's
+//! drop counter while fast consumers continue to receive every
+//! frame.
+//! - AC-3 — zero-copy fan-out: every consumer receives the same
+//! `Arc` (asserted via `Arc::ptr_eq`) so memory does not
+//! scale with consumer count.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use bytes::Bytes;
+use frame_ingest::{ConsumerId, FramePublisher, DEFAULT_CHANNEL_DEPTH};
+use shared::models::frame::{Frame, PixelFormat};
+use tokio::time::{sleep, timeout};
+
+fn make_frame(seq: u64, pixels: Arc) -> Frame {
+ Frame {
+ seq,
+ capture_ts_monotonic_ns: seq * 1_000_000,
+ decode_ts_monotonic_ns: seq * 1_000_000 + 100,
+ pixels,
+ width: 320,
+ height: 240,
+ pix_fmt: PixelFormat::Nv12,
+ ai_locked: false,
+ }
+}
+
+/// AC-1 — three consumers consuming as fast as the publisher emits
+/// observe every frame; per-consumer drop counters stay at 0. The
+/// spec quotes 30 fps for 10 s (~300 frames); we use 30 frames at
+/// no artificial delay to keep CI under 1 s. The semantic property
+/// — "consumers that keep up never lose a frame" — is identical.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn ac1_three_consumers_at_rate_lose_no_frames() {
+ // Arrange
+ let publisher = Arc::new(FramePublisher::new(DEFAULT_CHANNEL_DEPTH));
+ let stats = publisher.stats();
+ let mut det = publisher.subscribe(ConsumerId::DetectionClient);
+ let mut mov = publisher.subscribe(ConsumerId::MovementDetector);
+ let mut tel = publisher.subscribe(ConsumerId::Telemetry);
+
+ let total: u64 = 30;
+ let publisher_for_task = Arc::clone(&publisher);
+
+ // Act — drain in parallel while publishing. Each consumer drains
+ // immediately, so the broadcast channel stays well under
+ // `DEFAULT_CHANNEL_DEPTH` and no consumer can lag.
+ let producer = tokio::spawn(async move {
+ let payload = Arc::new(Bytes::from(vec![0xAAu8; 256]));
+ for seq in 0..total {
+ publisher_for_task.publish(make_frame(seq, Arc::clone(&payload)));
+ // Yield so subscribers get a chance to drain between
+ // sends; without this the producer races ahead and any
+ // delay in tokio scheduling could falsely trip the lag
+ // counter even for a "fast" consumer at this small scale.
+ tokio::task::yield_now().await;
+ }
+ });
+
+ let drain = |mut rx: frame_ingest::FrameReceiver, label: &'static str| {
+ tokio::spawn(async move {
+ let mut got = 0u64;
+ while got < total {
+ match timeout(Duration::from_secs(2), rx.recv()).await {
+ Ok(Ok(_)) => got += 1,
+ Ok(Err(e)) => panic!("{label} recv closed early: {e}"),
+ Err(_) => panic!("{label} stalled at {got}/{total}"),
+ }
+ }
+ got
+ })
+ };
+
+ let h_det = drain(det.take(), "detection_client");
+ let h_mov = drain(mov.take(), "movement_detector");
+ let h_tel = drain(tel.take(), "telemetry");
+
+ producer.await.expect("producer");
+ assert_eq!(h_det.await.expect("det join"), total);
+ assert_eq!(h_mov.await.expect("mov join"), total);
+ assert_eq!(h_tel.await.expect("tel join"), total);
+
+ // Assert — every consumer drained at-rate, so no drops on any
+ // counter and `publishes_total` matches the produced count.
+ assert_eq!(stats.publishes_total(), total);
+ assert_eq!(stats.drops_for(ConsumerId::DetectionClient), 0);
+ assert_eq!(stats.drops_for(ConsumerId::MovementDetector), 0);
+ assert_eq!(stats.drops_for(ConsumerId::Telemetry), 0);
+}
+
+/// AC-2 — a slow consumer (yields slowly) is the only one to incur
+/// drops; the fast consumers continue to observe every frame. The
+/// producer paces its sends at ~5 ms intervals so fast consumers
+/// can drain in between; the slow consumer sleeps ~25 ms per frame,
+/// so the broadcast channel laps it after a handful of frames.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn ac2_slow_consumer_drops_while_fast_consumers_unaffected() {
+ // Arrange — depth-2 channel + a producer that paces sends.
+ let channel_depth = 2usize;
+ let publisher = Arc::new(FramePublisher::new(channel_depth));
+ let stats = publisher.stats();
+
+ let mut det = publisher.subscribe(ConsumerId::DetectionClient); // fast
+ let mut mov = publisher.subscribe(ConsumerId::MovementDetector); // fast
+ let mut tel = publisher.subscribe(ConsumerId::Telemetry); // SLOW
+
+ let total: u64 = 30;
+ let payload = Arc::new(Bytes::from(vec![0xBBu8; 64]));
+
+ // Spawn consumers BEFORE the producer task so the broadcast
+ // already has live subscribers when the first publish lands.
+ let slow = tokio::spawn(async move {
+ let mut got = 0u64;
+ let deadline = Duration::from_secs(10);
+ let start = tokio::time::Instant::now();
+ // The slow consumer keeps polling until the broadcast
+ // channel closes (publisher drops) OR the safety deadline
+ // fires. A `Closed` here is the natural termination signal
+ // once the producer's `Arc` goes out of
+ // scope; we don't try to predict how many frames it gets
+ // because that depends on scheduling jitter.
+ while start.elapsed() < deadline {
+ match timeout(Duration::from_millis(500), tel.recv()).await {
+ Ok(Ok(_)) => {
+ got += 1;
+ sleep(Duration::from_millis(25)).await;
+ }
+ Ok(Err(_)) => break, // Closed: producer finished.
+ Err(_) => {
+ // Timeout — assume producer is done and exit.
+ break;
+ }
+ }
+ }
+ got
+ });
+
+ let drain_fast = |mut rx: frame_ingest::FrameReceiver, label: &'static str| {
+ tokio::spawn(async move {
+ let mut got = 0u64;
+ while got < total {
+ match timeout(Duration::from_secs(3), rx.recv()).await {
+ Ok(Ok(_)) => got += 1,
+ Ok(Err(e)) => panic!("{label} recv closed early: {e}"),
+ Err(_) => panic!("{label} stalled at {got}/{total}"),
+ }
+ }
+ got
+ })
+ };
+ let h_det = drain_fast(det.take(), "detection_client");
+ let h_mov = drain_fast(mov.take(), "movement_detector");
+
+ // Give consumers a moment to enter `recv` before producing.
+ sleep(Duration::from_millis(10)).await;
+
+ // Act — pace sends ~5 ms apart so fast consumers have time to
+ // drain each frame before the next arrives. The slow consumer
+ // can only process ~1 frame per 25 ms, so it inevitably lags.
+ let publisher_for_task = Arc::clone(&publisher);
+ let payload_for_task = Arc::clone(&payload);
+ let producer = tokio::spawn(async move {
+ for seq in 0..total {
+ publisher_for_task.publish(make_frame(seq, Arc::clone(&payload_for_task)));
+ sleep(Duration::from_millis(5)).await;
+ }
+ });
+
+ producer.await.expect("producer");
+ assert_eq!(h_det.await.expect("det join"), total);
+ assert_eq!(h_mov.await.expect("mov join"), total);
+
+ // Drop the last `Arc` so the slow consumer's
+ // recv returns `Closed` and it can exit on its own.
+ drop(publisher);
+ let slow_got = slow.await.expect("slow join");
+
+ // Assert — the slow consumer dropped frames; the fast ones did
+ // not. The exact drop count varies with scheduler jitter so we
+ // assert "> 0" rather than a specific number.
+ assert_eq!(
+ stats.drops_for(ConsumerId::DetectionClient),
+ 0,
+ "fast consumer must not have any drops"
+ );
+ assert_eq!(
+ stats.drops_for(ConsumerId::MovementDetector),
+ 0,
+ "fast consumer must not have any drops"
+ );
+ let tel_drops = stats.drops_for(ConsumerId::Telemetry);
+ assert!(
+ tel_drops > 0,
+ "slow telemetry consumer must have at least one drop; got {tel_drops}"
+ );
+ // Every frame is accounted for from the slow consumer's
+ // perspective: delivered + dropped == published.
+ assert_eq!(
+ slow_got + tel_drops,
+ stats.publishes_total(),
+ "received + dropped must equal published for the slow consumer"
+ );
+}
+
+/// AC-3 — fan-out is zero-copy: each subscriber observes the SAME
+/// `Arc` for a given frame. Asserts the property via
+/// `Arc::ptr_eq` between the pixel handles delivered to two
+/// different consumers; the test does not depend on timing.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ac3_fan_out_is_zero_copy_via_arc_bytes() {
+ // Arrange
+ let publisher = Arc::new(FramePublisher::new(DEFAULT_CHANNEL_DEPTH));
+ let mut det = publisher.subscribe(ConsumerId::DetectionClient);
+ let mut mov = publisher.subscribe(ConsumerId::MovementDetector);
+ let mut tel = publisher.subscribe(ConsumerId::Telemetry);
+ let payload = Arc::new(Bytes::from(vec![0xCDu8; 1024]));
+
+ // Act
+ publisher.publish(make_frame(42, Arc::clone(&payload)));
+ let f_det = det.recv().await.expect("det recv");
+ let f_mov = mov.recv().await.expect("mov recv");
+ let f_tel = tel.recv().await.expect("tel recv");
+
+ // Assert — same Arc across consumers AND across publisher
+ // boundary; the broadcast did not deep-clone Bytes anywhere.
+ assert!(Arc::ptr_eq(&f_det.pixels, &payload));
+ assert!(Arc::ptr_eq(&f_mov.pixels, &payload));
+ assert!(Arc::ptr_eq(&f_tel.pixels, &payload));
+ assert!(Arc::ptr_eq(&f_det.pixels, &f_mov.pixels));
+ assert!(Arc::ptr_eq(&f_mov.pixels, &f_tel.pixels));
+}
+
+// `FrameReceiver` does not implement `Copy` and the public surface
+// returns it by value, so we move it into the spawned task via
+// `take()` on a small helper. Defined here to keep test bodies tidy.
+trait Takeable {
+ fn take(&mut self) -> frame_ingest::FrameReceiver;
+}
+
+impl Takeable for frame_ingest::FrameReceiver {
+ fn take(&mut self) -> frame_ingest::FrameReceiver {
+ // SAFETY: we replace `self` with a fresh detached receiver
+ // that the test no longer uses; this lets us move ownership
+ // out of a `&mut`-bound binding without unsafe code.
+ std::mem::replace(self, dummy_receiver())
+ }
+}
+
+fn dummy_receiver() -> frame_ingest::FrameReceiver {
+ let p = FramePublisher::new(1);
+ p.subscribe(ConsumerId::DetectionClient)
+}