[AZ-657] [AZ-682] frame_ingest RTSP lifecycle + scan_controller FSM (batch 12)
ci/woodpecker/push/build-arm Pipeline failed

AZ-657 (frame_ingest): RTSP session lifecycle FSM with bounded
exponential backoff (1 s → 30 s cap), AI-lock plumb through
watch::Sender that stamps every emitted Frame, and SPS/PPS
hard-fail via OpenError::UnsupportedProfile. The actual RTSP wire
client is abstracted behind an RtspTransport trait so AZ-658 can
pin retina/FFmpeg alongside the decoder; the lifecycle FSM itself
is production code today. tokio::select! around every transport
call so a hung open/read cannot wedge graceful shutdown. 10 unit +
5 integration tests cover happy path, bounded reconnect, stream-
drop reopen, hard-fail no-retry, and AI-lock toggle.

AZ-682 (scan_controller): typed ScanState (ZoomedOut / ZoomedIn /
TargetFollow) with a complete pure transition catalogue, every
(state, trigger) → next_state from description.md §1/§4/§5 covered;
spec-disallowed combos return TransitionOutcome.accepted = false
with RejectReason::UnsupportedTransition (loud, not silent). Frame-
rate floor monitor with hysteresis suppresses ZoomedOut → ZoomedIn
while sustained FPS < 10 fps per description.md §5/§6. Rolling
100-sample tick-latency window surfaces p99; health goes yellow
above the 10 ms budget. 18 unit + 5 integration tests cover the
catalogue, fps-floor activate/clear, and tick-latency budget.

Cumulative review (batches 10-12): all OPEN findings carried
forward without regressions. See
_docs/03_implementation/batch_12_cycle1_report.md §6.

Notes: pre-existing dead-code error in autopilot::Runtime::
vlm_provider_name (origin batch 4) blocks workspace -D warnings
clippy. Recorded in _docs/_process_leftovers/ — not in batch 12
scope.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-20 08:17:27 +03:00
parent 4c63829ccd
commit 745ab806f1
18 changed files with 2600 additions and 51 deletions
+7
View File
@@ -11,3 +11,10 @@ authors.workspace = true
shared = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
async-trait = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }
serde = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
@@ -0,0 +1,341 @@
//! AZ-657 — RTSP session lifecycle FSM.
//!
//! Owns the state transitions between `Closed → Connecting → Streaming
//! → Failing → Connecting → …` and the bounded exponential backoff.
//! Pure FSM logic + `LifecycleStats` are tested in this module; the
//! end-to-end loop that drives the FSM against a transport lives in
//! [`super::super::FrameIngest::run`].
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use serde::Serialize;
use super::rtsp_client::{OpenError, StreamError};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum SessionState {
Closed,
/// `attempt` is 1 on the first open attempt and increments by 1
/// every time a reopen is launched.
Connecting {
attempt: u32,
},
Streaming,
/// Backoff is active. `attempt` is the attempt that just failed
/// (0 if we landed here from a stream drop without any preceding
/// failed open). The next `OpenAttempted` transitions to
/// `Connecting { attempt: attempt + 1 }`.
Failing {
attempt: u32,
},
}
/// Result of feeding a transport event into the FSM. `wait_before_next`
/// is `Some(d)` when the FSM has moved into `Failing` and the loop
/// owes a `tokio::time::sleep(d)` before re-attempting open.
#[derive(Debug, PartialEq, Eq)]
pub struct Transition {
pub next: SessionState,
pub wait_before_next: Option<Duration>,
pub reopen: bool,
}
/// Bounded exponential backoff per `description.md §6` (1 s → 30 s
/// cap). Pure value object, no I/O.
#[derive(Debug, Clone, Copy)]
pub struct BackoffPolicy {
pub initial: Duration,
pub cap: Duration,
pub factor: u32,
}
impl BackoffPolicy {
pub fn new(initial: Duration, cap: Duration) -> Self {
Self {
initial,
cap,
factor: 2,
}
}
/// `attempt` is 1-indexed: first failure waits `initial`, second
/// waits `initial * factor`, capped at `cap`. Saturating math
/// guards against overflow on pathological backoff configs.
pub fn next_delay(&self, attempt: u32) -> Duration {
if attempt == 0 {
return self.initial;
}
let exp = attempt.saturating_sub(1);
let mult = self.factor.saturating_pow(exp);
let raw = self.initial.saturating_mul(mult);
raw.min(self.cap)
}
}
/// Pure transition function: given the current state + the latest
/// transport event, return the next state and (optionally) the
/// backoff delay the loop must sleep before reopening.
///
/// Triggers:
/// - [`Trigger::OpenAttempted`] — loop entering `Connecting`.
/// - [`Trigger::OpenSucceeded`] — transport `open()` returned `Ok`.
/// - [`Trigger::OpenFailed`] — transport `open()` returned an error
/// that is NOT a hard-fail (e.g. transient network).
/// - [`Trigger::HardFail`] — `OpenError::UnsupportedProfile` per AC-3.
/// The session does NOT auto-reopen; the FSM stays in `Failing`
/// indefinitely until an operator-driven reset.
/// - [`Trigger::StreamDropped`] — `next_packet` returned a
/// `StreamError`, including `EndOfStream`.
/// - [`Trigger::Closed`] — supervisor-driven shutdown.
#[derive(Debug, PartialEq, Eq)]
pub enum Trigger {
OpenAttempted,
OpenSucceeded,
OpenFailed,
HardFail,
StreamDropped,
Closed,
}
impl Trigger {
pub fn from_open_error(err: &OpenError) -> Self {
match err {
OpenError::UnsupportedProfile { .. } => Trigger::HardFail,
_ => Trigger::OpenFailed,
}
}
pub fn from_stream_error(_err: &StreamError) -> Self {
Trigger::StreamDropped
}
}
pub fn transition(state: SessionState, trigger: Trigger, backoff: &BackoffPolicy) -> Transition {
match (state, trigger) {
(_, Trigger::Closed) => Transition {
next: SessionState::Closed,
wait_before_next: None,
reopen: false,
},
(SessionState::Closed, Trigger::OpenAttempted) => Transition {
next: SessionState::Connecting { attempt: 1 },
wait_before_next: None,
reopen: false,
},
(SessionState::Failing { attempt }, Trigger::OpenAttempted) => Transition {
next: SessionState::Connecting {
attempt: attempt.saturating_add(1),
},
wait_before_next: None,
reopen: false,
},
(SessionState::Connecting { .. }, Trigger::OpenSucceeded) => Transition {
next: SessionState::Streaming,
wait_before_next: None,
reopen: false,
},
(SessionState::Connecting { attempt }, Trigger::OpenFailed) => Transition {
next: SessionState::Failing { attempt },
wait_before_next: Some(backoff.next_delay(attempt)),
reopen: true,
},
(SessionState::Connecting { .. } | SessionState::Failing { .. }, Trigger::HardFail) => {
Transition {
next: SessionState::Failing { attempt: u32::MAX },
wait_before_next: None,
reopen: false,
}
}
(SessionState::Streaming, Trigger::StreamDropped) => Transition {
// A drop hasn't failed an open yet; record `attempt = 0`
// so the next OpenAttempted → `Connecting { attempt: 1 }`.
next: SessionState::Failing { attempt: 0 },
wait_before_next: Some(backoff.next_delay(1)),
reopen: true,
},
// Defensive: any unexpected (state, trigger) combo is a
// no-op — the FSM stays put. A transport bug cannot crash
// the lifecycle loop.
_ => Transition {
next: state,
wait_before_next: None,
reopen: false,
},
}
}
/// Process-wide counters consumed by `FrameIngestHandle::health`.
/// Kept lock-free so the lifecycle loop never blocks on metric
/// updates.
#[derive(Debug, Default)]
pub struct LifecycleStats {
pub reopens_total: AtomicU64,
pub open_failures_total: AtomicU64,
pub last_packet_at_ns: AtomicU64,
pub current_attempt: AtomicU32,
}
impl LifecycleStats {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
pub fn note_reopen(&self) {
self.reopens_total.fetch_add(1, Ordering::Relaxed);
}
pub fn note_open_failure(&self, attempt: u32) {
self.open_failures_total.fetch_add(1, Ordering::Relaxed);
self.current_attempt.store(attempt, Ordering::Relaxed);
}
pub fn note_streaming(&self) {
self.current_attempt.store(0, Ordering::Relaxed);
}
pub fn note_packet(&self, ts_ns: u64) {
self.last_packet_at_ns.store(ts_ns, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
fn policy() -> BackoffPolicy {
BackoffPolicy::new(Duration::from_millis(100), Duration::from_secs(30))
}
#[test]
fn backoff_increments_then_caps() {
// Arrange
let p = policy();
// Assert
assert_eq!(p.next_delay(1), Duration::from_millis(100));
assert_eq!(p.next_delay(2), Duration::from_millis(200));
assert_eq!(p.next_delay(3), Duration::from_millis(400));
assert!(p.next_delay(20) <= p.cap);
assert_eq!(p.next_delay(20), p.cap);
}
#[test]
fn happy_path_closed_to_streaming() {
// Arrange
let p = policy();
// Act
let t1 = transition(SessionState::Closed, Trigger::OpenAttempted, &p);
let t2 = transition(t1.next, Trigger::OpenSucceeded, &p);
// Assert
assert_eq!(t1.next, SessionState::Connecting { attempt: 1 });
assert_eq!(t2.next, SessionState::Streaming);
assert!(t2.wait_before_next.is_none());
}
#[test]
fn open_failure_enters_failing_with_backoff() {
// Arrange
let p = policy();
// Act
let connecting = transition(SessionState::Closed, Trigger::OpenAttempted, &p).next;
let t = transition(connecting, Trigger::OpenFailed, &p);
// Assert
assert_eq!(t.next, SessionState::Failing { attempt: 1 });
assert_eq!(t.wait_before_next, Some(Duration::from_millis(100)));
assert!(t.reopen);
}
#[test]
fn repeated_failures_grow_backoff() {
// Arrange
let p = policy();
let mut state = SessionState::Closed;
let mut delays = vec![];
// Act — drive the loop the same way `lifecycle_loop` does:
// OpenAttempted → OpenFailed cycle, attempt grows by one
// per cycle.
for _ in 0..4 {
state = transition(state, Trigger::OpenAttempted, &p).next;
let t = transition(state, Trigger::OpenFailed, &p);
delays.push(t.wait_before_next.unwrap());
state = t.next;
}
// Assert
assert_eq!(delays[0], Duration::from_millis(100));
assert_eq!(delays[1], Duration::from_millis(200));
assert_eq!(delays[2], Duration::from_millis(400));
assert_eq!(delays[3], Duration::from_millis(800));
}
#[test]
fn stream_drop_triggers_reopen_at_initial_delay() {
// Arrange
let p = policy();
// Act
let t = transition(SessionState::Streaming, Trigger::StreamDropped, &p);
// Assert
assert_eq!(t.next, SessionState::Failing { attempt: 0 });
assert!(t.reopen);
assert_eq!(t.wait_before_next, Some(Duration::from_millis(100)));
}
#[test]
fn hard_fail_stays_failing_without_reopen() {
// Arrange
let p = policy();
// Act
let t = transition(
SessionState::Connecting { attempt: 1 },
Trigger::HardFail,
&p,
);
// Assert
assert_eq!(t.next, SessionState::Failing { attempt: u32::MAX });
assert!(!t.reopen);
assert_eq!(t.wait_before_next, None);
}
#[test]
fn closed_trigger_resets_from_any_state() {
// Arrange
let p = policy();
// Assert
for s in [
SessionState::Closed,
SessionState::Connecting { attempt: 3 },
SessionState::Streaming,
SessionState::Failing { attempt: 7 },
] {
let t = transition(s, Trigger::Closed, &p);
assert_eq!(t.next, SessionState::Closed);
assert!(!t.reopen);
}
}
#[test]
fn from_open_error_maps_unsupported_profile_to_hard_fail() {
// Arrange
let unsupported = OpenError::UnsupportedProfile {
details: "H265 main10".to_string(),
};
let transient = OpenError::Timeout;
// Assert
assert_eq!(Trigger::from_open_error(&unsupported), Trigger::HardFail);
assert_eq!(Trigger::from_open_error(&transient), Trigger::OpenFailed);
}
}
+4
View File
@@ -0,0 +1,4 @@
//! Internal modules for `frame_ingest`. Not part of the public API.
pub mod lifecycle;
pub mod rtsp_client;
@@ -0,0 +1,117 @@
//! AZ-657 — RTSP transport abstraction.
//!
//! The session lifecycle (open / reconnect / AI-lock) is the production
//! deliverable of AZ-657 and lives in [`super::lifecycle`]. The
//! transport that actually speaks RTSP to the camera is wired in
//! through this trait so:
//!
//! - **Production**: a real client (retina or FFmpeg/GStreamer binding,
//! pinned by AZ-658) opens RTSP against the ViewPro A40 and pushes
//! raw NAL units up to the decoder. The full client is folded into
//! AZ-658 alongside the decoder because the codec choice is what
//! pins the client.
//! - **Tests**: a fake transport drives the lifecycle deterministically
//! without needing MediaMTX / Docker. This is the same `*Transport`
//! pattern AZ-653 uses for the A40 UDP wire.
//!
//! What AZ-657 owns regardless of transport:
//! - `RtspSessionConfig` (url, transport hint, backoff override).
//! - `OpenError` / `StreamError` taxonomy (including the
//! `UnsupportedProfile` hard-fail required by AC-3).
//! - The `RtspTransport` trait every transport must implement.
use std::time::Duration;
use async_trait::async_trait;
use thiserror::Error;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RtspTransportHint {
Tcp,
Udp,
Auto,
}
#[derive(Debug, Clone, PartialEq)]
pub struct RtspSessionConfig {
pub url: String,
pub transport: RtspTransportHint,
pub open_timeout: Duration,
pub backoff_initial: Duration,
pub backoff_cap: Duration,
}
impl RtspSessionConfig {
/// Builder default per `description.md §8`: ≤5 s reconnect target
/// drives `backoff_initial = 1 s`, `backoff_cap = 30 s`. The
/// open_timeout is conservative (≤2 s) to match AC-1.
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
transport: RtspTransportHint::Auto,
open_timeout: Duration::from_secs(2),
backoff_initial: Duration::from_secs(1),
backoff_cap: Duration::from_secs(30),
}
}
}
/// Errors returned from `RtspTransport::open`. `UnsupportedProfile` is
/// the AC-3 hard-fail: if the camera negotiates a codec / profile the
/// decoder cannot consume, the session must fail with this typed
/// variant instead of silently picking a wrong decode path.
#[derive(Debug, Error)]
pub enum OpenError {
#[error("RTSP open timed out")]
Timeout,
#[error("RTSP network error: {0}")]
Network(String),
#[error("RTSP authentication failed")]
AuthFailed,
#[error("RTSP unsupported codec profile: {details}")]
UnsupportedProfile { details: String },
}
/// Errors emitted from `RtspTransport::next_packet`. Distinguished
/// from `OpenError` because reconnect policy differs: stream loss
/// triggers backoff + reopen; an open-time hard error (e.g.
/// `UnsupportedProfile`) escalates to the session's `failing` state
/// and surfaces health red.
#[derive(Debug, Error)]
pub enum StreamError {
#[error("RTSP stream ended (clean EOS)")]
EndOfStream,
#[error("RTSP stream dropped: {0}")]
Dropped(String),
#[error("RTSP read timed out")]
ReadTimeout,
}
/// Trait the lifecycle FSM consumes. Implementors are responsible for
/// real RTSP I/O OR for simulating it in tests. Lifetime semantics:
/// `open` must be safe to call repeatedly (the FSM calls it on every
/// reconnect attempt); `close` must be safe to call on a not-yet-open
/// transport.
#[async_trait]
pub trait RtspTransport: Send + Sync {
async fn open(&mut self, config: &RtspSessionConfig) -> Result<(), OpenError>;
async fn close(&mut self);
/// Returns the next packet from the open session, or a
/// `StreamError` if the session has dropped. Calls after the FSM
/// reaches `Failing` state are not expected.
async fn next_packet(&mut self) -> Result<RtspPacket, StreamError>;
}
/// Minimal envelope carrying one inbound RTSP unit. AZ-657's lifecycle
/// loop only counts packets and timestamps them; AZ-658 parses the
/// `payload` bytes through the H.264/265 decoder.
#[derive(Debug, Clone)]
pub struct RtspPacket {
pub timestamp_rtp: u32,
pub payload: bytes::Bytes,
}
+337 -11
View File
@@ -1,30 +1,267 @@
//! `frame_ingest` — RTSP pull + decode + timestamp.
//!
//! Real implementation lands in:
//! - AZ-657 `frame_ingest_rtsp_session`
//! - AZ-658 `frame_ingest_decoder`
//! - AZ-659 `frame_ingest_publisher`
//! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded
//! reconnect + AI-lock plumb (this crate, modules in `internal/`).
//! - AZ-658 `frame_ingest_decoder` — H.264/265 decode into raw
//! pixel buffers + retina/FFmpeg/GStreamer transport binding.
//! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer
//! drop policy.
//!
//! ## AZ-657 surface
//!
//! - [`FrameIngest::new`] — construct in `Closed` state.
//! - [`FrameIngest::run`] — spawn the lifecycle loop driving the given
//! `RtspTransport` through `connect → stream → reconnect` cycles
//! with bounded backoff. Returns a `JoinHandle`.
//! - [`FrameIngestHandle::subscribe`] — broadcast frame stream (the
//! AZ-657 lifecycle emits only synthetic header frames; real
//! decoded frames come in AZ-658).
//! - [`FrameIngestHandle::set_ai_lock`] — `bringCameraDown` /
//! `bringCameraUp` signal. Stamps `Frame.ai_locked` on every
//! subsequently emitted frame.
//! - [`FrameIngestHandle::session_state`] — current FSM state.
//! - [`FrameIngestHandle::health`] — `ComponentHealth` reflecting the
//! FSM state + `last_packet_age` + `ai_locked`.
use tokio::sync::broadcast;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use shared::health::ComponentHealth;
use tokio::sync::{broadcast, watch, Mutex};
use tokio::task::JoinHandle;
use shared::clock::MonoClock;
use shared::health::{ComponentHealth, HealthLevel};
use shared::models::frame::Frame;
pub mod internal;
pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState};
pub use internal::rtsp_client::{
OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError,
};
use internal::lifecycle::{transition, Trigger};
const NAME: &str = "frame_ingest";
/// Threshold past which `health()` flips to `Red` while the session is
/// not `Streaming`. Aligned with `description.md §6` (red after
/// `last_frame_age_ms` exceeds a configured threshold).
const RED_FRAME_AGE: Duration = Duration::from_secs(5);
pub struct FrameIngest {
tx: broadcast::Sender<Frame>,
ai_lock_tx: watch::Sender<bool>,
state_tx: watch::Sender<SessionState>,
shutdown_tx: watch::Sender<bool>,
stats: Arc<LifecycleStats>,
backoff: BackoffPolicy,
clock: MonoClock,
}
impl FrameIngest {
pub fn new(channel_capacity: usize) -> Self {
Self::with_backoff(
channel_capacity,
BackoffPolicy::new(Duration::from_secs(1), Duration::from_secs(30)),
)
}
pub fn with_backoff(channel_capacity: usize, backoff: BackoffPolicy) -> Self {
let (tx, _rx) = broadcast::channel(channel_capacity);
Self { tx }
let (ai_lock_tx, _) = watch::channel(false);
let (state_tx, _) = watch::channel(SessionState::Closed);
let (shutdown_tx, _) = watch::channel(false);
Self {
tx,
ai_lock_tx,
state_tx,
shutdown_tx,
stats: LifecycleStats::new(),
backoff,
clock: MonoClock::new(),
}
}
pub fn handle(&self) -> FrameIngestHandle {
FrameIngestHandle {
tx: self.tx.clone(),
ai_lock_tx: self.ai_lock_tx.clone(),
state_rx: self.state_tx.subscribe(),
shutdown_tx: self.shutdown_tx.clone(),
stats: Arc::clone(&self.stats),
clock: self.clock,
}
}
/// Spawn the lifecycle loop. The returned handle resolves when
/// the loop exits (shutdown signalled via
/// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM).
pub fn run<T>(&self, transport: T, config: RtspSessionConfig) -> JoinHandle<()>
where
T: RtspTransport + 'static,
{
let tx = self.tx.clone();
let ai_lock = self.ai_lock_tx.subscribe();
let state_tx = self.state_tx.clone();
let shutdown_rx = self.shutdown_tx.subscribe();
let stats = Arc::clone(&self.stats);
let backoff = self.backoff;
let clock = self.clock;
let transport = Arc::new(Mutex::new(transport));
tokio::spawn(async move {
lifecycle_loop(
transport,
config,
tx,
ai_lock,
state_tx,
shutdown_rx,
stats,
backoff,
clock,
)
.await;
})
}
}
fn is_shutdown(rx: &watch::Receiver<bool>) -> bool {
*rx.borrow()
}
#[allow(clippy::too_many_arguments)]
async fn lifecycle_loop<T>(
transport: Arc<Mutex<T>>,
config: RtspSessionConfig,
tx: broadcast::Sender<Frame>,
mut ai_lock: watch::Receiver<bool>,
state_tx: watch::Sender<SessionState>,
mut shutdown_rx: watch::Receiver<bool>,
stats: Arc<LifecycleStats>,
backoff: BackoffPolicy,
clock: MonoClock,
) where
T: RtspTransport,
{
let mut state = SessionState::Closed;
let mut seq: u64 = 0;
loop {
if is_shutdown(&shutdown_rx) {
let mut t = transport.lock().await;
t.close().await;
state_tx.send_replace(SessionState::Closed);
return;
}
state = transition(state, Trigger::OpenAttempted, &backoff).next;
state_tx.send_replace(state);
// Race the open call against shutdown so a hung transport
// (real RTSP can block on `DESCRIBE` for many seconds) cannot
// wedge graceful exit.
let open_result = tokio::select! {
biased;
res = async {
let mut t = transport.lock().await;
t.open(&config).await
} => res,
_ = shutdown_rx.changed() => {
let mut t = transport.lock().await;
t.close().await;
state_tx.send_replace(SessionState::Closed);
return;
}
};
match open_result {
Ok(()) => {
state = transition(state, Trigger::OpenSucceeded, &backoff).next;
state_tx.send_replace(state);
stats.note_streaming();
loop {
let packet = tokio::select! {
biased;
res = async {
let mut t = transport.lock().await;
t.next_packet().await
} => Some(res),
_ = shutdown_rx.changed() => None,
};
let Some(packet) = packet else {
let mut t = transport.lock().await;
t.close().await;
state_tx.send_replace(SessionState::Closed);
return;
};
match packet {
Ok(pkt) => {
let now_ns = clock.elapsed_ns();
stats.note_packet(now_ns);
let locked = *ai_lock.borrow_and_update();
// AZ-657 emits a synthetic frame envelope
// per inbound RTSP packet so the lifecycle
// FSM can be exercised end-to-end without
// the decoder (AZ-658 swaps this for the
// actual decoded frame).
let frame = Frame {
seq,
capture_ts_monotonic_ns: now_ns,
decode_ts_monotonic_ns: now_ns,
pixels: Arc::new(pkt.payload),
width: 0,
height: 0,
pix_fmt: shared::models::frame::PixelFormat::Nv12,
ai_locked: locked,
};
seq = seq.saturating_add(1);
// A no-subscriber send is a no-op error in
// the broadcast channel; the lifecycle
// does not care.
let _ = tx.send(frame);
}
Err(e) => {
let trig = Trigger::from_stream_error(&e);
let t = transition(state, trig, &backoff);
state = t.next;
state_tx.send_replace(state);
stats.note_reopen();
if let Some(wait) = t.wait_before_next {
tokio::time::sleep(wait).await;
}
if !t.reopen {
return;
}
break;
}
}
}
}
Err(err) => {
let trig = Trigger::from_open_error(&err);
let t = transition(state, trig, &backoff);
state = t.next;
state_tx.send_replace(state);
if let SessionState::Failing { attempt } = state {
stats.note_open_failure(attempt);
}
if let Some(wait) = t.wait_before_next {
tokio::time::sleep(wait).await;
}
if !t.reopen {
// Hard-fail (e.g. UnsupportedProfile): leave the
// FSM parked in Failing and exit. The supervisor
// restarts the process; the operator decides.
return;
}
}
}
}
}
@@ -32,18 +269,91 @@ impl FrameIngest {
#[derive(Clone)]
pub struct FrameIngestHandle {
tx: broadcast::Sender<Frame>,
ai_lock_tx: watch::Sender<bool>,
state_rx: watch::Receiver<SessionState>,
shutdown_tx: watch::Sender<bool>,
stats: Arc<LifecycleStats>,
clock: MonoClock,
}
impl FrameIngestHandle {
/// Subscribe to the frame stream. Consumers receive every frame after they
/// subscribed; back-pressure is implemented via broadcast channel lag (see
/// AZ-659 for the slow-consumer policy).
/// Subscribe to the frame stream. Consumers receive every frame
/// after they subscribed; back-pressure is implemented via
/// broadcast channel lag (see AZ-659 for the slow-consumer
/// policy).
pub fn subscribe(&self) -> broadcast::Receiver<Frame> {
self.tx.subscribe()
}
/// `bringCameraDown`/`bringCameraUp` per `description.md §2`. When
/// `locked == true`, every subsequently emitted frame has
/// `Frame::ai_locked = true` and downstream AI consumers
/// (detection_client, movement_detector) MUST skip detection.
/// `telemetry_stream` continues consuming so the operator sees
/// the raw stream.
pub fn set_ai_lock(&self, locked: bool) {
self.ai_lock_tx.send_replace(locked);
}
pub fn ai_locked(&self) -> bool {
*self.ai_lock_tx.borrow()
}
pub fn session_state(&self) -> SessionState {
*self.state_rx.borrow()
}
/// Subscribe to FSM state transitions. Useful for operator UI and
/// supervisor watchdogs (the latter restarts on prolonged
/// `Failing`).
pub fn session_state_stream(&self) -> watch::Receiver<SessionState> {
self.state_rx.clone()
}
pub fn reopens_total(&self) -> u64 {
self.stats.reopens_total.load(Ordering::Relaxed)
}
/// Request the lifecycle loop to drain to `Closed` and exit. The
/// loop races every transport call against this signal, so a
/// hung transport cannot wedge graceful exit.
pub fn shutdown(&self) {
self.shutdown_tx.send_replace(true);
}
pub fn health(&self) -> ComponentHealth {
ComponentHealth::disabled(NAME)
let state = self.session_state();
let now_ns = self.clock.elapsed_ns();
let last_pkt_ns = self.stats.last_packet_at_ns.load(Ordering::Relaxed);
let age = now_ns.saturating_sub(last_pkt_ns);
match state {
SessionState::Closed => ComponentHealth::disabled(NAME),
SessionState::Streaming if last_pkt_ns == 0 => {
ComponentHealth::yellow(NAME, "streaming, awaiting first packet")
}
SessionState::Streaming if age > RED_FRAME_AGE.as_nanos() as u64 => {
ComponentHealth::red(NAME, format!("last packet age {} ms", age / 1_000_000))
}
SessionState::Streaming => {
let mut h = ComponentHealth::green(NAME);
if self.ai_locked() {
h.level = HealthLevel::Yellow;
h.detail = Some("ai_locked".to_string());
}
h
}
SessionState::Connecting { attempt } => {
ComponentHealth::yellow(NAME, format!("connecting (attempt {attempt})"))
}
SessionState::Failing { attempt } => {
if age > RED_FRAME_AGE.as_nanos() as u64 {
ComponentHealth::red(NAME, format!("failing, attempt {attempt}"))
} else {
ComponentHealth::yellow(NAME, format!("failing, attempt {attempt}"))
}
}
}
}
}
@@ -54,6 +364,22 @@ mod tests {
#[test]
fn it_compiles() {
let h = FrameIngest::new(8).handle();
assert_eq!(h.health().level, shared::health::HealthLevel::Disabled);
assert_eq!(h.session_state(), SessionState::Closed);
assert_eq!(h.health().level, HealthLevel::Disabled);
}
#[test]
fn ai_lock_toggle_propagates() {
// Arrange
let ingest = FrameIngest::new(8);
let handle = ingest.handle();
// Act
handle.set_ai_lock(true);
// Assert
assert!(handle.ai_locked());
handle.set_ai_lock(false);
assert!(!handle.ai_locked());
}
}
+320
View File
@@ -0,0 +1,320 @@
//! AZ-657 integration tests — RTSP session lifecycle, bounded
//! reconnect, AI-lock plumb.
//!
//! Uses a [`FakeRtspTransport`] (not a real RTSP server) to keep tests
//! deterministic and free of external fixtures. The session lifecycle
//! FSM in `FrameIngest::run` is the production deliverable; the real
//! retina-backed transport that talks to the camera lands in AZ-658
//! alongside the H.264 decoder.
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::mpsc;
use tokio::time::{timeout, Instant};
use frame_ingest::{
BackoffPolicy, FrameIngest, OpenError, RtspPacket, RtspSessionConfig, RtspTransport,
SessionState, StreamError,
};
#[derive(Debug, Clone)]
enum Scripted {
OpenOk,
OpenFail(OpenErrKind),
OpenHardFail,
PacketOk,
StreamDropped,
}
#[derive(Debug, Clone, Copy)]
enum OpenErrKind {
Timeout,
Network,
}
impl OpenErrKind {
fn into_err(self) -> OpenError {
match self {
OpenErrKind::Timeout => OpenError::Timeout,
OpenErrKind::Network => OpenError::Network("connection refused".to_string()),
}
}
}
/// Test-driven RTSP transport. The lifecycle loop pulls events from
/// an mpsc channel that the test pushes into. When the channel is
/// empty the transport parks (mirroring a healthy idle RTSP open
/// that blocks until the next packet arrives). The test ends the
/// session via `FrameIngestHandle::shutdown`, which the lifecycle
/// loop observes through `tokio::select!`.
struct FakeRtspTransport {
rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<Scripted>>>,
opens: Arc<AtomicU32>,
packets_sent: Arc<AtomicU32>,
}
/// Controller side of the fake transport. The test pushes events,
/// the lifecycle loop consumes them.
struct ScriptCtl {
tx: mpsc::UnboundedSender<Scripted>,
}
impl ScriptCtl {
fn push(&self, ev: Scripted) {
self.tx.send(ev).expect("script controller channel closed");
}
}
impl FakeRtspTransport {
fn new() -> (Self, ScriptCtl, Arc<AtomicU32>, Arc<AtomicU32>) {
let (tx, rx) = mpsc::unbounded_channel();
let opens = Arc::new(AtomicU32::new(0));
let packets_sent = Arc::new(AtomicU32::new(0));
(
Self {
rx: Arc::new(tokio::sync::Mutex::new(rx)),
opens: Arc::clone(&opens),
packets_sent: Arc::clone(&packets_sent),
},
ScriptCtl { tx },
opens,
packets_sent,
)
}
fn from_script(script: Vec<Scripted>) -> (Self, ScriptCtl, Arc<AtomicU32>, Arc<AtomicU32>) {
let (t, ctl, o, p) = Self::new();
for ev in script {
ctl.push(ev);
}
(t, ctl, o, p)
}
async fn next_event(&self) -> Scripted {
let mut rx = self.rx.lock().await;
match rx.recv().await {
Some(ev) => ev,
// Sender dropped → park forever; the lifecycle observes
// shutdown via select! and exits cleanly.
None => std::future::pending().await,
}
}
}
#[async_trait]
impl RtspTransport for FakeRtspTransport {
async fn open(&mut self, _config: &RtspSessionConfig) -> Result<(), OpenError> {
self.opens.fetch_add(1, Ordering::Relaxed);
match self.next_event().await {
Scripted::OpenOk => Ok(()),
Scripted::OpenFail(kind) => Err(kind.into_err()),
Scripted::OpenHardFail => Err(OpenError::UnsupportedProfile {
details: "H265 main10 not supported".to_string(),
}),
other => Err(OpenError::Network(format!(
"fake transport: open called when script expected {other:?}"
))),
}
}
async fn close(&mut self) {}
async fn next_packet(&mut self) -> Result<RtspPacket, StreamError> {
match self.next_event().await {
Scripted::PacketOk => {
self.packets_sent.fetch_add(1, Ordering::Relaxed);
Ok(RtspPacket {
timestamp_rtp: 0,
payload: Bytes::from_static(b"nal-unit"),
})
}
Scripted::StreamDropped => Err(StreamError::Dropped("scripted drop".to_string())),
// Out-of-band events while streaming surface as a drop
// so the FSM re-enters the reconnect ladder.
other => Err(StreamError::Dropped(format!(
"script expected non-packet: {other:?}"
))),
}
}
}
fn fast_backoff() -> BackoffPolicy {
BackoffPolicy::new(Duration::from_millis(10), Duration::from_millis(40))
}
/// AC-1 — happy path: a single `OpenOk` followed by a packet must
/// bring the FSM to `Streaming` and emit a frame on the broadcast.
#[tokio::test]
async fn ac1_open_succeeds_and_session_reaches_streaming() {
// Arrange
let (transport, _ctl, opens, packets) =
FakeRtspTransport::from_script(vec![Scripted::OpenOk, Scripted::PacketOk]);
let ingest = FrameIngest::with_backoff(8, fast_backoff());
let handle = ingest.handle();
let mut frames = handle.subscribe();
// Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
let first = timeout(Duration::from_secs(1), frames.recv())
.await
.expect("frame within 1 s")
.expect("broadcast send succeeded");
// Assert — receiving the frame proves Closed → Connecting →
// Streaming was traversed; the FakeTransport parks after the
// packet so the FSM stays in Streaming.
assert!(!first.ai_locked, "ai_lock should default to false");
assert_eq!(handle.session_state(), SessionState::Streaming);
assert_eq!(opens.load(Ordering::Relaxed), 1);
assert_eq!(packets.load(Ordering::Relaxed), 1);
handle.shutdown();
let _ = timeout(Duration::from_secs(1), task)
.await
.expect("lifecycle exits on shutdown");
}
/// AC-2 — bounded reconnect: an initial failure followed by a success
/// must increment `reopens_total` and converge to `Streaming`. The
/// backoff sleeps used (initial 10 ms, doubling) must be observed via
/// elapsed wall time.
#[tokio::test]
async fn ac2_bounded_reconnect_recovers_after_transient_failure() {
// Arrange
let (transport, _ctl, opens, _packets) = FakeRtspTransport::from_script(vec![
Scripted::OpenFail(OpenErrKind::Network),
Scripted::OpenFail(OpenErrKind::Timeout),
Scripted::OpenOk,
Scripted::PacketOk,
]);
let ingest = FrameIngest::with_backoff(8, fast_backoff());
let handle = ingest.handle();
let mut frames = handle.subscribe();
let started = Instant::now();
// Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
let _ = timeout(Duration::from_secs(2), frames.recv())
.await
.expect("frame within 2 s")
.expect("broadcast send succeeded");
let elapsed = started.elapsed();
// Assert
assert!(
elapsed >= Duration::from_millis(30),
"must observe two backoff sleeps (10 ms + 20 ms = 30 ms), got {elapsed:?}"
);
assert_eq!(handle.session_state(), SessionState::Streaming);
assert_eq!(opens.load(Ordering::Relaxed), 3);
handle.shutdown();
let _ = timeout(Duration::from_secs(1), task).await;
}
/// AC-2.b — stream drop after streaming starts must re-enter
/// `Failing` and reopen.
#[tokio::test]
async fn ac2b_stream_drop_increments_reopens_total() {
// Arrange
let (transport, _ctl, opens, _packets) = FakeRtspTransport::from_script(vec![
Scripted::OpenOk,
Scripted::PacketOk,
Scripted::StreamDropped,
Scripted::OpenOk,
Scripted::PacketOk,
]);
let ingest = FrameIngest::with_backoff(8, fast_backoff());
let handle = ingest.handle();
let mut frames = handle.subscribe();
// Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
let _ = timeout(Duration::from_secs(1), frames.recv())
.await
.expect("first frame")
.expect("first frame ok");
let _ = timeout(Duration::from_secs(1), frames.recv())
.await
.expect("second frame")
.expect("second frame ok");
// Assert
assert!(
handle.reopens_total() >= 1,
"stream drop must record at least one reopen, got {}",
handle.reopens_total()
);
assert_eq!(opens.load(Ordering::Relaxed), 2);
assert_eq!(handle.session_state(), SessionState::Streaming);
handle.shutdown();
let _ = timeout(Duration::from_secs(1), task).await;
}
/// AC-3 — SPS/PPS mismatch must hard-fail the session. The loop
/// exits and does NOT retry, leaving the FSM in `Failing` with no
/// further opens.
#[tokio::test]
async fn ac3_unsupported_profile_hard_fails_session() {
// Arrange
let (transport, _ctl, opens, _packets) =
FakeRtspTransport::from_script(vec![Scripted::OpenHardFail]);
let ingest = FrameIngest::with_backoff(8, fast_backoff());
let handle = ingest.handle();
// Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
let _ = timeout(Duration::from_secs(1), task)
.await
.expect("lifecycle loop exits on hard-fail");
// Assert
assert!(matches!(
handle.session_state(),
SessionState::Failing { .. }
));
assert_eq!(opens.load(Ordering::Relaxed), 1, "no automatic retry");
}
/// AC-4 — AI-lock toggle: every frame emitted AFTER `set_ai_lock(true)`
/// must carry `ai_locked = true`. The test controls packet emission
/// timing via `ScriptCtl` so the toggle is guaranteed to precede the
/// second packet.
#[tokio::test]
async fn ac4_ai_lock_toggle_propagates_to_frames() {
// Arrange
let (transport, ctl, _opens, _packets) =
FakeRtspTransport::from_script(vec![Scripted::OpenOk, Scripted::PacketOk]);
let ingest = FrameIngest::with_backoff(8, fast_backoff());
let handle = ingest.handle();
let mut frames = handle.subscribe();
// Act
let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0"));
let f1 = timeout(Duration::from_secs(1), frames.recv())
.await
.expect("first frame")
.expect("first frame ok");
handle.set_ai_lock(true);
ctl.push(Scripted::PacketOk);
let f2 = timeout(Duration::from_secs(1), frames.recv())
.await
.expect("second frame")
.expect("second frame ok");
// Assert
assert!(!f1.ai_locked, "pre-toggle frame must be unlocked");
assert!(
f2.ai_locked,
"post-toggle frame must carry ai_locked = true"
);
assert!(handle.ai_locked());
handle.shutdown();
let _ = timeout(Duration::from_secs(1), task).await;
}