diff --git a/Cargo.lock b/Cargo.lock index 70caf86..9127efc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -604,7 +604,11 @@ dependencies = [ name = "frame_ingest" version = "0.1.0" dependencies = [ + "async-trait", + "bytes", + "serde", "shared", + "thiserror 1.0.69", "tokio", "tracing", ] diff --git a/_docs/02_tasks/todo/AZ-657_frame_ingest_rtsp_session.md b/_docs/02_tasks/done/AZ-657_frame_ingest_rtsp_session.md similarity index 100% rename from _docs/02_tasks/todo/AZ-657_frame_ingest_rtsp_session.md rename to _docs/02_tasks/done/AZ-657_frame_ingest_rtsp_session.md diff --git a/_docs/02_tasks/todo/AZ-682_scan_controller_state_machine.md b/_docs/02_tasks/done/AZ-682_scan_controller_state_machine.md similarity index 100% rename from _docs/02_tasks/todo/AZ-682_scan_controller_state_machine.md rename to _docs/02_tasks/done/AZ-682_scan_controller_state_machine.md diff --git a/_docs/03_implementation/batch_12_cycle1_report.md b/_docs/03_implementation/batch_12_cycle1_report.md new file mode 100644 index 0000000..60b2197 --- /dev/null +++ b/_docs/03_implementation/batch_12_cycle1_report.md @@ -0,0 +1,230 @@ +# Batch 12 / Cycle 1 — Implementation Report + +**Date**: 2026-05-20 +**Tasks**: AZ-657, AZ-682 +**Verdict**: PASS_WITH_WARNINGS (pre-existing autopilot lint pre-dates this +batch — see Findings §A1) + +## 1. Scope + +| Ticket | Title | Crate | Complexity | +|---|---|---|---| +| AZ-657 | frame_ingest RTSP session + reconnect + AI-lock | `frame_ingest` | 3 | +| AZ-682 | scan_controller typed state machine + fps-floor monitor | `scan_controller` | 5 | + +## 2. Approach + +### AZ-657 — RTSP session lifecycle + +Per the task spec, the *production deliverable* is the **session lifecycle FSM ++ bounded reconnect + AI-lock plumb**. The actual RTSP wire client (retina / +FFmpeg / GStreamer binding) is pinned in AZ-658 alongside the H.264 decoder, +because the codec choice is what pins the client. 
To deliver real production +code today without prematurely committing to a binding, the lifecycle is +abstracted over an `RtspTransport` trait — the same pattern AZ-653 uses for +the A40 UDP wire. + +**What this batch ships in production**: + +- `RtspSessionConfig`, `OpenError` (incl. `UnsupportedProfile` for the AC-3 + SPS/PPS hard-fail), `StreamError`, `RtspTransport` trait, `RtspPacket` + envelope. (`internal/rtsp_client.rs`) +- `SessionState` FSM (`Closed | Connecting { attempt } | Streaming | + Failing { attempt }`), pure `transition(state, trigger, backoff)`, + `BackoffPolicy` (1 s → 30 s cap per `description.md §6`), + `LifecycleStats`. (`internal/lifecycle.rs`) +- `FrameIngest::run(transport, config)` — the actor that drives the + lifecycle: opens via the transport, races every transport call against a + shutdown signal via `tokio::select!` (so a hung transport cannot wedge + graceful exit), pulls packets, stamps `Frame.ai_locked` from the + supervisor `watch::Sender`, broadcasts. (`src/lib.rs`) +- `FrameIngestHandle` — public surface: `subscribe()`, `set_ai_lock`, + `session_state`, `session_state_stream`, `reopens_total`, `shutdown`, + `health` (Disabled/Yellow/Red mapped per `description.md §6`). + +**What ships in AZ-658** (already scaffolded as the `RtspTransport` trait): + +- The real client binding (retina or FFmpeg-rs). +- The H.264/265 decoder that turns `RtspPacket` payloads into pixel buffers. +- Real-camera + MediaMTX integration tests gated behind a `--features + live-rtsp` flag. + +### AZ-682 — Scan controller state machine + +Per the task spec, scope is the **typed FSM + frame-rate floor + tick +observability**. The POI queue (AZ-683), evidence ladder (AZ-684), mapobjects +dispatch (AZ-685), and gimbal issuance (AZ-686) are intentionally left to +follow-up tickets. The FSM here is the substrate those tickets build on. 
+ +**What this batch ships**: + +- `ScanState { ZoomedOut | ZoomedIn { roi, hold_started_at_ns } | + TargetFollow { target_id, started_at_ns } }` — typed, exhaustive, lives + in `internal/state_machine/mod.rs`. +- `Trigger` catalogue — `PoiSelected | RoiRejected | RoiHoldTimeout | + TargetConfirmed | TargetLost | OperatorReleaseFollow | OperatorAbort`. + Every `(state, trigger) → next_state` from `description.md §1/§4/§5` is + enumerated; spec-disallowed pairs return + `TransitionOutcome { accepted: false, reject_reason: + UnsupportedTransition }` instead of silently no-opping. +- `transition(state, trigger, ctx)` — pure function in + `internal/state_machine/transitions.rs`, unit-testable without spinning + up the actor. +- `FrameRateGuard` — rolling window of frame arrivals, hysteresis band + `[fps_floor, fps_clear)` to dampen oscillation, 1-second window. + Gates `ZoomedOut → ZoomedIn` per `description.md §5/§6/§8`. + (`internal/frame_rate_guard.rs`) +- `ScanController` / `ScanControllerHandle` — async-safe wrapper around a + `tokio::Mutex` holding the state, FPS guard, rolling latency + window (100 samples ≈ 10 s at 10 Hz), transition counters. Records + per-call latency on `submit_trigger` and `tick`; surfaces `health()` + yellow when fps-floor active or tick p99 > 10 ms. +- `OperatorCommand → Trigger` mapping for the kinds that don't need POI + queue context (`MissionAbort → OperatorAbort`, + `ReleaseTargetFollow → OperatorReleaseFollow`); the rest deliberately + return `NotImplemented(AZ-683/AZ-684)` so the wiring failure is loud. + +## 3. Files touched + +### AZ-657 +- `crates/frame_ingest/Cargo.toml` — added `async-trait`, `thiserror`, + `bytes`, `serde`. +- `crates/frame_ingest/src/lib.rs` — full rewrite (lifecycle loop, + handle, health). +- `crates/frame_ingest/src/internal/mod.rs` — new. +- `crates/frame_ingest/src/internal/rtsp_client.rs` — new. +- `crates/frame_ingest/src/internal/lifecycle.rs` — new. 
+- `crates/frame_ingest/tests/rtsp_lifecycle.rs` — new (5 ACs + fake + transport with explicit script controller). + +### AZ-682 +- `crates/scan_controller/src/lib.rs` — full rewrite (handle, metrics, + health, operator-cmd mapping). +- `crates/scan_controller/src/internal/mod.rs` — new. +- `crates/scan_controller/src/internal/state_machine/mod.rs` — new + (ScanState + Trigger + TransitionOutcome + RejectReason). +- `crates/scan_controller/src/internal/state_machine/transitions.rs` — + new (pure transition function + 7 unit tests). +- `crates/scan_controller/src/internal/frame_rate_guard.rs` — new (FPS + monitor + hysteresis + 6 unit tests). +- `crates/scan_controller/tests/state_machine.rs` — new (5 ACs). + +## 4. Test results + +| Crate | Unit | Integration | Total | +|---|---|---|---| +| `frame_ingest` | 10 | 5 | 15 | +| `scan_controller` | 18 | 5 | 23 | + +Workspace `cargo test --workspace`: 280+ tests pass, 1 ignored (pre-existing +flaky `mission_executor::state_machine::ac3_bounded_retry_then_success` +documented in batch 8 — still passes in isolation, intermittent under load, +unchanged by this batch). + +Clippy: `cargo clippy -p frame_ingest -p scan_controller --all-targets -- +-D warnings` is clean. Workspace-wide clippy hits one pre-existing dead-code +error in `autopilot/src/runtime.rs` (see Findings §A1). + +## 5. Findings (this batch) + +### A1. Pre-existing dead-code error in `autopilot::Runtime::vlm_provider_name` + +**Severity**: High (blocks workspace `-D warnings` clippy gate) +**Category**: Maintenance +**Origin**: Batch 4 (commit 69c0629, `[AZ-643] [AZ-665] [AZ-672] +mavlink+mapobjects+vlm batch 4`). Predates this batch. + +`Runtime::vlm_provider_name` is only called from `#[cfg(test)]` code in the +same file. Compiling the `autopilot` binary target without test cfg flags +it as dead code, which under `-D warnings` becomes an error. Not introduced +by AZ-657 or AZ-682 — confirmed by stashing this batch and running clippy +against batch-11 HEAD. 
+ +Per `coderule.mdc` "Pre-existing lint errors should only be fixed if they're +in the modified area" → not fixed here. Recorded as a leftover for a +follow-up sweep: + +→ See `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md`. + +### A2. AZ-682 `Inner` fields surfaced via new `metrics()` API + +**Severity**: Low (would have been dead-code in clippy) +**Resolution**: Added `pub async fn metrics() -> ScanMetrics` returning +`transitions_total`, `rejected_total`, `last_state_change_ns`, +`tick_latency_p99_us` — fields are now publicly observable per the +documented health surface in `description.md §3`. No deferred warning. + +### A3. Spec drift — `module-layout.md` is now out of date for `frame_ingest` + and `scan_controller` + +**Severity**: Low (Architecture) +**Detail**: `module-layout.md` already lists the right internal paths for +`frame_ingest` and `scan_controller`, but it does not yet enumerate by +stable name the files now present in them or in `gimbal_controller` +(sweep.rs/smooth_pan.rs/centre_on_target.rs/transport.rs from batches +10-11 are still pending; this batch adds +lifecycle.rs/rtsp_client.rs/state_machine/{mod,transitions}.rs/ +frame_rate_guard.rs). + +Cumulative leftover with batches 10-11 — same item, deferred to the +documentation sync sweep. + +### A4. Spec drift — `data_model.md §PanPlan` still missing from batch 11 + +**Severity**: Low (Architecture) +**Detail**: Carried from batch 11 — `PanPlan` / `PanGoal` exist in +`crates/shared/src/models/gimbal.rs` but are not enumerated in +`data_model.md`. Unchanged by this batch. + +## 6. Cumulative code review — batches 10, 11, 12 + +The autodev cadence is "cumulative code review every 3 batches". Inputs: +batch 10 (AZ-653 A40 UDP transport), batch 11 (AZ-654/655/656 sweep/ +smooth_pan/centre_on_target + MonoClock fix), batch 12 (AZ-657 RTSP +lifecycle + AZ-682 scan FSM).
+ +### Cumulative findings + +| ID | Severity | Category | Status | +|---|---|---|---| +| C1 | Medium | Maintainability | OPEN — duplicated `SendCommandError` mapping in `gimbal_controller` (batches 9-10) | +| C2 | Low | Style | OPEN — `MavlinkCommandIssuer` naming inconsistency (batch 9) | +| C3 | Low | Architecture | OPEN — `module-layout.md` drift: `gimbal_controller/internal/transport.rs`, `sweep.rs`, `smooth_pan.rs`, `centre_on_target.rs`, `frame_ingest/internal/{lifecycle,rtsp_client}.rs`, `scan_controller/internal/{state_machine,frame_rate_guard}.rs` | +| C4 | Low | Architecture | OPEN — `data_model.md §PanPlan` definition still missing (batch 11) | +| C5 | High | Maintenance | OPEN — pre-existing `autopilot/runtime.rs::vlm_provider_name` dead-code error blocking workspace `-D warnings` clippy (batch 4 origin) | + +### Cross-batch positive observations + +- **Pattern consistency**: AZ-653 (A40Transport trait), AZ-655 (PlanExecutor + taking real Instant clock), AZ-657 (RtspTransport trait) all follow the + same "trait + real impl + fake-for-tests" pattern. This is starting to + look like a workspace idiom worth documenting in `coderule.mdc` — + candidate rule: "wire I/O behind a trait; production impl talks to real + hardware; test impl is in-memory / deterministic; bound the trait in + one place to keep the abstraction thin". +- **MonoClock adoption**: AZ-653's flawed `SystemTime::now()` was caught + by AZ-656 (batch 11) and fixed. AZ-657 and AZ-682 both depend on + `shared::clock::MonoClock` directly from the start — no repeat of the + bug. +- **Error-typing discipline**: AZ-657's `OpenError::UnsupportedProfile` + and AZ-682's `RejectReason::UnsupportedTransition` both use the typed + refusal pattern instead of silent no-op or panic. Good practice that's + now consistent across the brain (scan_controller) and the perception + edge (frame_ingest). + +### Cumulative recommendation + +None of C1–C5 are blockers for batch 12. 
C5 is the most pressing and is +recorded as a non-user-input leftover for next autodev tick. C3 / C4 are +documentation sync that should land before the next architecture review. + +## 7. Next-batch candidates + +The natural follow-on to batch 12 is: + +- **AZ-658** — frame_ingest decoder (the H.264 decode that turns + `RtspPacket.payload` into a real `Frame.pixels` buffer). Needs the + retina/ffmpeg pin decision. +- **AZ-683** — scan_controller POI queue + ≤5/min cap + operator-decision + window. Uses the AZ-682 FSM as the substrate. +- **AZ-659** — frame_ingest publisher (slow-consumer drop policy). diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 02d2566..3121feb 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 17 + phase: 20 name: tracker-update-in-testing - detail: "batch 11 (AZ-654/655/656) committed; awaiting In Testing + push" + detail: "batch 12 (AZ-657 + AZ-682) implemented and reviewed; awaiting commit + In Testing" retry_count: 0 cycle: 1 tracker: jira diff --git a/_docs/_process_leftovers/2026-05-20_autopilot_clippy.md b/_docs/_process_leftovers/2026-05-20_autopilot_clippy.md new file mode 100644 index 0000000..eb8d1cb --- /dev/null +++ b/_docs/_process_leftovers/2026-05-20_autopilot_clippy.md @@ -0,0 +1,59 @@ +# Leftover — autopilot dead-code clippy gate + +- **Timestamp**: 2026-05-20T05:30:00Z +- **Source**: discovered during batch 12 (`AZ-657` + `AZ-682`) +- **Origin**: commit `69c0629` — `[AZ-643] [AZ-665] [AZ-672] + mavlink+mapobjects+vlm batch 4` +- **Blocked operation**: `cargo clippy --workspace --all-targets -- + -D warnings` + +## Symptom + +``` +error: method `vlm_provider_name` is never used + --> crates/autopilot/src/runtime.rs:84:12 + | +58 | impl Runtime { + | ------------ method in this implementation +... 
+84 | pub fn vlm_provider_name(&self) -> &'static str { + | ^^^^^^^^^^^^^^^^^ + | + = note: `-D dead-code` implied by `-D warnings` +``` + +`Runtime::vlm_provider_name` is only called from `#[cfg(test)]` code in the +same file (`runtime.rs:215`, `runtime.rs:228`). When the `autopilot` binary +target is compiled without `cfg(test)`, rustc flags the method as dead code; +under `-D warnings` this is an error. + +## Why not fixed in batch 12 + +Per `.cursor/rules/coderule.mdc`: + +> Pre-existing lint errors should only be fixed if they're in the modified +> area. + +The autopilot crate is outside the AZ-657 / AZ-682 scope (which touch +`frame_ingest` and `scan_controller` only). Fixing this would expand scope +and obscure the batch-12 diff. The lint must be cleared before the next +CI gate that enforces workspace `-D warnings`. + +## Recommended fix + +Pick one of: + +1. `#[cfg(test)]` on the method (it's only called from tests). +2. `#[allow(dead_code)]` on the method. +3. Add a real (non-test) caller — e.g. expose it through the `/health` + JSON so the field becomes load-bearing. + +Option (3) is preferred because it surfaces a useful field; (1) is the +narrowest change. + +## Replay + +This leftover requires no Jira write — it is a code-quality gate. Replay +on the next autodev tick by either folding (3) into a relevant batch +(any batch that touches `autopilot/src/runtime.rs` or the health surface) +or opening a small standalone Maintenance ticket.
diff --git a/crates/frame_ingest/Cargo.toml b/crates/frame_ingest/Cargo.toml index 49e6f4f..1dbba3f 100644 --- a/crates/frame_ingest/Cargo.toml +++ b/crates/frame_ingest/Cargo.toml @@ -11,3 +11,10 @@ authors.workspace = true shared = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +async-trait = { workspace = true } +thiserror = { workspace = true } +bytes = { workspace = true } +serde = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/frame_ingest/src/internal/lifecycle.rs b/crates/frame_ingest/src/internal/lifecycle.rs new file mode 100644 index 0000000..a370f9b --- /dev/null +++ b/crates/frame_ingest/src/internal/lifecycle.rs @@ -0,0 +1,341 @@ +//! AZ-657 — RTSP session lifecycle FSM. +//! +//! Owns the state transitions between `Closed → Connecting → Streaming +//! → Failing → Connecting → …` and the bounded exponential backoff. +//! Pure FSM logic + `LifecycleStats` are tested in this module; the +//! end-to-end loop that drives the FSM against a transport lives in +//! [`super::super::FrameIngest::run`]. + +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use serde::Serialize; + +use super::rtsp_client::{OpenError, StreamError}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(tag = "state", rename_all = "snake_case")] +pub enum SessionState { + Closed, + /// `attempt` is 1 on the first open attempt and increments by 1 + /// every time a reopen is launched. + Connecting { + attempt: u32, + }, + Streaming, + /// Backoff is active. `attempt` is the attempt that just failed + /// (0 if we landed here from a stream drop without any preceding + /// failed open). The next `OpenAttempted` transitions to + /// `Connecting { attempt: attempt + 1 }`. + Failing { + attempt: u32, + }, +} + +/// Result of feeding a transport event into the FSM. 
`wait_before_next` +/// is `Some(d)` when the FSM has moved into `Failing` and the loop +/// owes a `tokio::time::sleep(d)` before re-attempting open. +#[derive(Debug, PartialEq, Eq)] +pub struct Transition { + pub next: SessionState, + pub wait_before_next: Option, + pub reopen: bool, +} + +/// Bounded exponential backoff per `description.md §6` (1 s → 30 s +/// cap). Pure value object, no I/O. +#[derive(Debug, Clone, Copy)] +pub struct BackoffPolicy { + pub initial: Duration, + pub cap: Duration, + pub factor: u32, +} + +impl BackoffPolicy { + pub fn new(initial: Duration, cap: Duration) -> Self { + Self { + initial, + cap, + factor: 2, + } + } + + /// `attempt` is 1-indexed: first failure waits `initial`, second + /// waits `initial * factor`, capped at `cap`. Saturating math + /// guards against overflow on pathological backoff configs. + pub fn next_delay(&self, attempt: u32) -> Duration { + if attempt == 0 { + return self.initial; + } + let exp = attempt.saturating_sub(1); + let mult = self.factor.saturating_pow(exp); + let raw = self.initial.saturating_mul(mult); + raw.min(self.cap) + } +} + +/// Pure transition function: given the current state + the latest +/// transport event, return the next state and (optionally) the +/// backoff delay the loop must sleep before reopening. +/// +/// Triggers: +/// - [`Trigger::OpenAttempted`] — loop entering `Connecting`. +/// - [`Trigger::OpenSucceeded`] — transport `open()` returned `Ok`. +/// - [`Trigger::OpenFailed`] — transport `open()` returned an error +/// that is NOT a hard-fail (e.g. transient network). +/// - [`Trigger::HardFail`] — `OpenError::UnsupportedProfile` per AC-3. +/// The session does NOT auto-reopen; the FSM stays in `Failing` +/// indefinitely until an operator-driven reset. +/// - [`Trigger::StreamDropped`] — `next_packet` returned a +/// `StreamError`, including `EndOfStream`. +/// - [`Trigger::Closed`] — supervisor-driven shutdown. 
+#[derive(Debug, PartialEq, Eq)] +pub enum Trigger { + OpenAttempted, + OpenSucceeded, + OpenFailed, + HardFail, + StreamDropped, + Closed, +} + +impl Trigger { + pub fn from_open_error(err: &OpenError) -> Self { + match err { + OpenError::UnsupportedProfile { .. } => Trigger::HardFail, + _ => Trigger::OpenFailed, + } + } + + pub fn from_stream_error(_err: &StreamError) -> Self { + Trigger::StreamDropped + } +} + +pub fn transition(state: SessionState, trigger: Trigger, backoff: &BackoffPolicy) -> Transition { + match (state, trigger) { + (_, Trigger::Closed) => Transition { + next: SessionState::Closed, + wait_before_next: None, + reopen: false, + }, + (SessionState::Closed, Trigger::OpenAttempted) => Transition { + next: SessionState::Connecting { attempt: 1 }, + wait_before_next: None, + reopen: false, + }, + (SessionState::Failing { attempt }, Trigger::OpenAttempted) => Transition { + next: SessionState::Connecting { + attempt: attempt.saturating_add(1), + }, + wait_before_next: None, + reopen: false, + }, + (SessionState::Connecting { .. }, Trigger::OpenSucceeded) => Transition { + next: SessionState::Streaming, + wait_before_next: None, + reopen: false, + }, + (SessionState::Connecting { attempt }, Trigger::OpenFailed) => Transition { + next: SessionState::Failing { attempt }, + wait_before_next: Some(backoff.next_delay(attempt)), + reopen: true, + }, + (SessionState::Connecting { .. } | SessionState::Failing { .. }, Trigger::HardFail) => { + Transition { + next: SessionState::Failing { attempt: u32::MAX }, + wait_before_next: None, + reopen: false, + } + } + (SessionState::Streaming, Trigger::StreamDropped) => Transition { + // A drop hasn't failed an open yet; record `attempt = 0` + // so the next OpenAttempted → `Connecting { attempt: 1 }`. + next: SessionState::Failing { attempt: 0 }, + wait_before_next: Some(backoff.next_delay(1)), + reopen: true, + }, + // Defensive: any unexpected (state, trigger) combo is a + // no-op — the FSM stays put. 
A transport bug cannot crash + // the lifecycle loop. + _ => Transition { + next: state, + wait_before_next: None, + reopen: false, + }, + } +} + +/// Process-wide counters consumed by `FrameIngestHandle::health`. +/// Kept lock-free so the lifecycle loop never blocks on metric +/// updates. +#[derive(Debug, Default)] +pub struct LifecycleStats { + pub reopens_total: AtomicU64, + pub open_failures_total: AtomicU64, + pub last_packet_at_ns: AtomicU64, + pub current_attempt: AtomicU32, +} + +impl LifecycleStats { + pub fn new() -> Arc { + Arc::new(Self::default()) + } + + pub fn note_reopen(&self) { + self.reopens_total.fetch_add(1, Ordering::Relaxed); + } + + pub fn note_open_failure(&self, attempt: u32) { + self.open_failures_total.fetch_add(1, Ordering::Relaxed); + self.current_attempt.store(attempt, Ordering::Relaxed); + } + + pub fn note_streaming(&self) { + self.current_attempt.store(0, Ordering::Relaxed); + } + + pub fn note_packet(&self, ts_ns: u64) { + self.last_packet_at_ns.store(ts_ns, Ordering::Relaxed); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn policy() -> BackoffPolicy { + BackoffPolicy::new(Duration::from_millis(100), Duration::from_secs(30)) + } + + #[test] + fn backoff_increments_then_caps() { + // Arrange + let p = policy(); + + // Assert + assert_eq!(p.next_delay(1), Duration::from_millis(100)); + assert_eq!(p.next_delay(2), Duration::from_millis(200)); + assert_eq!(p.next_delay(3), Duration::from_millis(400)); + assert!(p.next_delay(20) <= p.cap); + assert_eq!(p.next_delay(20), p.cap); + } + + #[test] + fn happy_path_closed_to_streaming() { + // Arrange + let p = policy(); + + // Act + let t1 = transition(SessionState::Closed, Trigger::OpenAttempted, &p); + let t2 = transition(t1.next, Trigger::OpenSucceeded, &p); + + // Assert + assert_eq!(t1.next, SessionState::Connecting { attempt: 1 }); + assert_eq!(t2.next, SessionState::Streaming); + assert!(t2.wait_before_next.is_none()); + } + + #[test] + fn 
open_failure_enters_failing_with_backoff() { + // Arrange + let p = policy(); + + // Act + let connecting = transition(SessionState::Closed, Trigger::OpenAttempted, &p).next; + let t = transition(connecting, Trigger::OpenFailed, &p); + + // Assert + assert_eq!(t.next, SessionState::Failing { attempt: 1 }); + assert_eq!(t.wait_before_next, Some(Duration::from_millis(100))); + assert!(t.reopen); + } + + #[test] + fn repeated_failures_grow_backoff() { + // Arrange + let p = policy(); + let mut state = SessionState::Closed; + let mut delays = vec![]; + + // Act — drive the loop the same way `lifecycle_loop` does: + // OpenAttempted → OpenFailed cycle, attempt grows by one + // per cycle. + for _ in 0..4 { + state = transition(state, Trigger::OpenAttempted, &p).next; + let t = transition(state, Trigger::OpenFailed, &p); + delays.push(t.wait_before_next.unwrap()); + state = t.next; + } + + // Assert + assert_eq!(delays[0], Duration::from_millis(100)); + assert_eq!(delays[1], Duration::from_millis(200)); + assert_eq!(delays[2], Duration::from_millis(400)); + assert_eq!(delays[3], Duration::from_millis(800)); + } + + #[test] + fn stream_drop_triggers_reopen_at_initial_delay() { + // Arrange + let p = policy(); + + // Act + let t = transition(SessionState::Streaming, Trigger::StreamDropped, &p); + + // Assert + assert_eq!(t.next, SessionState::Failing { attempt: 0 }); + assert!(t.reopen); + assert_eq!(t.wait_before_next, Some(Duration::from_millis(100))); + } + + #[test] + fn hard_fail_stays_failing_without_reopen() { + // Arrange + let p = policy(); + + // Act + let t = transition( + SessionState::Connecting { attempt: 1 }, + Trigger::HardFail, + &p, + ); + + // Assert + assert_eq!(t.next, SessionState::Failing { attempt: u32::MAX }); + assert!(!t.reopen); + assert_eq!(t.wait_before_next, None); + } + + #[test] + fn closed_trigger_resets_from_any_state() { + // Arrange + let p = policy(); + + // Assert + for s in [ + SessionState::Closed, + SessionState::Connecting { 
attempt: 3 }, + SessionState::Streaming, + SessionState::Failing { attempt: 7 }, + ] { + let t = transition(s, Trigger::Closed, &p); + assert_eq!(t.next, SessionState::Closed); + assert!(!t.reopen); + } + } + + #[test] + fn from_open_error_maps_unsupported_profile_to_hard_fail() { + // Arrange + let unsupported = OpenError::UnsupportedProfile { + details: "H265 main10".to_string(), + }; + let transient = OpenError::Timeout; + + // Assert + assert_eq!(Trigger::from_open_error(&unsupported), Trigger::HardFail); + assert_eq!(Trigger::from_open_error(&transient), Trigger::OpenFailed); + } +} diff --git a/crates/frame_ingest/src/internal/mod.rs b/crates/frame_ingest/src/internal/mod.rs new file mode 100644 index 0000000..8f7aaf9 --- /dev/null +++ b/crates/frame_ingest/src/internal/mod.rs @@ -0,0 +1,4 @@ +//! Internal modules for `frame_ingest`. Not part of the public API. + +pub mod lifecycle; +pub mod rtsp_client; diff --git a/crates/frame_ingest/src/internal/rtsp_client.rs b/crates/frame_ingest/src/internal/rtsp_client.rs new file mode 100644 index 0000000..a53d33b --- /dev/null +++ b/crates/frame_ingest/src/internal/rtsp_client.rs @@ -0,0 +1,117 @@ +//! AZ-657 — RTSP transport abstraction. +//! +//! The session lifecycle (open / reconnect / AI-lock) is the production +//! deliverable of AZ-657 and lives in [`super::lifecycle`]. The +//! transport that actually speaks RTSP to the camera is wired in +//! through this trait so: +//! +//! - **Production**: a real client (retina or FFmpeg/GStreamer binding, +//! pinned by AZ-658) opens RTSP against the ViewPro A40 and pushes +//! raw NAL units up to the decoder. The full client is folded into +//! AZ-658 alongside the decoder because the codec choice is what +//! pins the client. +//! - **Tests**: a fake transport drives the lifecycle deterministically +//! without needing MediaMTX / Docker. This is the same `*Transport` +//! pattern AZ-653 uses for the A40 UDP wire. +//! +//! 
What AZ-657 owns regardless of transport: +//! - `RtspSessionConfig` (url, transport hint, backoff override). +//! - `OpenError` / `StreamError` taxonomy (including the +//! `UnsupportedProfile` hard-fail required by AC-3). +//! - The `RtspTransport` trait every transport must implement. + +use std::time::Duration; + +use async_trait::async_trait; +use thiserror::Error; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RtspTransportHint { + Tcp, + Udp, + Auto, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct RtspSessionConfig { + pub url: String, + pub transport: RtspTransportHint, + pub open_timeout: Duration, + pub backoff_initial: Duration, + pub backoff_cap: Duration, +} + +impl RtspSessionConfig { + /// Builder default per `description.md §8`: ≤5 s reconnect target + /// drives `backoff_initial = 1 s`, `backoff_cap = 30 s`. The + /// open_timeout is conservative (≤2 s) to match AC-1. + pub fn new(url: impl Into) -> Self { + Self { + url: url.into(), + transport: RtspTransportHint::Auto, + open_timeout: Duration::from_secs(2), + backoff_initial: Duration::from_secs(1), + backoff_cap: Duration::from_secs(30), + } + } +} + +/// Errors returned from `RtspTransport::open`. `UnsupportedProfile` is +/// the AC-3 hard-fail: if the camera negotiates a codec / profile the +/// decoder cannot consume, the session must fail with this typed +/// variant instead of silently picking a wrong decode path. +#[derive(Debug, Error)] +pub enum OpenError { + #[error("RTSP open timed out")] + Timeout, + + #[error("RTSP network error: {0}")] + Network(String), + + #[error("RTSP authentication failed")] + AuthFailed, + + #[error("RTSP unsupported codec profile: {details}")] + UnsupportedProfile { details: String }, +} + +/// Errors emitted from `RtspTransport::next_packet`. Distinguished +/// from `OpenError` because reconnect policy differs: stream loss +/// triggers backoff + reopen; an open-time hard error (e.g. 
+/// `UnsupportedProfile`) escalates to the session's `failing` state +/// and surfaces health red. +#[derive(Debug, Error)] +pub enum StreamError { + #[error("RTSP stream ended (clean EOS)")] + EndOfStream, + + #[error("RTSP stream dropped: {0}")] + Dropped(String), + + #[error("RTSP read timed out")] + ReadTimeout, +} + +/// Trait the lifecycle FSM consumes. Implementors are responsible for +/// real RTSP I/O OR for simulating it in tests. Lifetime semantics: +/// `open` must be safe to call repeatedly (the FSM calls it on every +/// reconnect attempt); `close` must be safe to call on a not-yet-open +/// transport. +#[async_trait] +pub trait RtspTransport: Send + Sync { + async fn open(&mut self, config: &RtspSessionConfig) -> Result<(), OpenError>; + async fn close(&mut self); + /// Returns the next packet from the open session, or a + /// `StreamError` if the session has dropped. Calls after the FSM + /// reaches `Failing` state are not expected. + async fn next_packet(&mut self) -> Result; +} + +/// Minimal envelope carrying one inbound RTSP unit. AZ-657's lifecycle +/// loop only counts packets and timestamps them; AZ-658 parses the +/// `payload` bytes through the H.264/265 decoder. +#[derive(Debug, Clone)] +pub struct RtspPacket { + pub timestamp_rtp: u32, + pub payload: bytes::Bytes, +} diff --git a/crates/frame_ingest/src/lib.rs b/crates/frame_ingest/src/lib.rs index 195d4b8..f05110d 100644 --- a/crates/frame_ingest/src/lib.rs +++ b/crates/frame_ingest/src/lib.rs @@ -1,30 +1,267 @@ //! `frame_ingest` — RTSP pull + decode + timestamp. //! //! Real implementation lands in: -//! - AZ-657 `frame_ingest_rtsp_session` -//! - AZ-658 `frame_ingest_decoder` -//! - AZ-659 `frame_ingest_publisher` +//! - AZ-657 `frame_ingest_rtsp_session` — session lifecycle + bounded +//! reconnect + AI-lock plumb (this crate, modules in `internal/`). +//! - AZ-658 `frame_ingest_decoder` — H.264/265 decode into raw +//! pixel buffers + retina/FFmpeg/GStreamer transport binding. 
+//! - AZ-659 `frame_ingest_publisher` — bounded broadcast + per-consumer +//! drop policy. +//! +//! ## AZ-657 surface +//! +//! - [`FrameIngest::new`] — construct in `Closed` state. +//! - [`FrameIngest::run`] — spawn the lifecycle loop driving the given +//! `RtspTransport` through `connect → stream → reconnect` cycles +//! with bounded backoff. Returns a `JoinHandle`. +//! - [`FrameIngestHandle::subscribe`] — broadcast frame stream (the +//! AZ-657 lifecycle emits only synthetic header frames; real +//! decoded frames come in AZ-658). +//! - [`FrameIngestHandle::set_ai_lock`] — `bringCameraDown` / +//! `bringCameraUp` signal. Stamps `Frame.ai_locked` on every +//! subsequently emitted frame. +//! - [`FrameIngestHandle::session_state`] — current FSM state. +//! - [`FrameIngestHandle::health`] — `ComponentHealth` reflecting the +//! FSM state + `last_packet_age` + `ai_locked`. -use tokio::sync::broadcast; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; -use shared::health::ComponentHealth; +use tokio::sync::{broadcast, watch, Mutex}; +use tokio::task::JoinHandle; + +use shared::clock::MonoClock; +use shared::health::{ComponentHealth, HealthLevel}; use shared::models::frame::Frame; +pub mod internal; + +pub use internal::lifecycle::{BackoffPolicy, LifecycleStats, SessionState}; +pub use internal::rtsp_client::{ + OpenError, RtspPacket, RtspSessionConfig, RtspTransport, RtspTransportHint, StreamError, +}; + +use internal::lifecycle::{transition, Trigger}; + const NAME: &str = "frame_ingest"; +/// Threshold past which `health()` flips to `Red` while the session is +/// not `Streaming`. Aligned with `description.md §6` (red after +/// `last_frame_age_ms` exceeds a configured threshold). 
+const RED_FRAME_AGE: Duration = Duration::from_secs(5); + pub struct FrameIngest { tx: broadcast::Sender, + ai_lock_tx: watch::Sender, + state_tx: watch::Sender, + shutdown_tx: watch::Sender, + stats: Arc, + backoff: BackoffPolicy, + clock: MonoClock, } impl FrameIngest { pub fn new(channel_capacity: usize) -> Self { + Self::with_backoff( + channel_capacity, + BackoffPolicy::new(Duration::from_secs(1), Duration::from_secs(30)), + ) + } + + pub fn with_backoff(channel_capacity: usize, backoff: BackoffPolicy) -> Self { let (tx, _rx) = broadcast::channel(channel_capacity); - Self { tx } + let (ai_lock_tx, _) = watch::channel(false); + let (state_tx, _) = watch::channel(SessionState::Closed); + let (shutdown_tx, _) = watch::channel(false); + Self { + tx, + ai_lock_tx, + state_tx, + shutdown_tx, + stats: LifecycleStats::new(), + backoff, + clock: MonoClock::new(), + } } pub fn handle(&self) -> FrameIngestHandle { FrameIngestHandle { tx: self.tx.clone(), + ai_lock_tx: self.ai_lock_tx.clone(), + state_rx: self.state_tx.subscribe(), + shutdown_tx: self.shutdown_tx.clone(), + stats: Arc::clone(&self.stats), + clock: self.clock, + } + } + + /// Spawn the lifecycle loop. The returned handle resolves when + /// the loop exits (shutdown signalled via + /// [`FrameIngestHandle::shutdown`] or a hard-fail trapped the FSM). 
+ pub fn run(&self, transport: T, config: RtspSessionConfig) -> JoinHandle<()> + where + T: RtspTransport + 'static, + { + let tx = self.tx.clone(); + let ai_lock = self.ai_lock_tx.subscribe(); + let state_tx = self.state_tx.clone(); + let shutdown_rx = self.shutdown_tx.subscribe(); + let stats = Arc::clone(&self.stats); + let backoff = self.backoff; + let clock = self.clock; + let transport = Arc::new(Mutex::new(transport)); + + tokio::spawn(async move { + lifecycle_loop( + transport, + config, + tx, + ai_lock, + state_tx, + shutdown_rx, + stats, + backoff, + clock, + ) + .await; + }) + } +} + +fn is_shutdown(rx: &watch::Receiver) -> bool { + *rx.borrow() +} + +#[allow(clippy::too_many_arguments)] +async fn lifecycle_loop( + transport: Arc>, + config: RtspSessionConfig, + tx: broadcast::Sender, + mut ai_lock: watch::Receiver, + state_tx: watch::Sender, + mut shutdown_rx: watch::Receiver, + stats: Arc, + backoff: BackoffPolicy, + clock: MonoClock, +) where + T: RtspTransport, +{ + let mut state = SessionState::Closed; + let mut seq: u64 = 0; + + loop { + if is_shutdown(&shutdown_rx) { + let mut t = transport.lock().await; + t.close().await; + state_tx.send_replace(SessionState::Closed); + return; + } + + state = transition(state, Trigger::OpenAttempted, &backoff).next; + state_tx.send_replace(state); + + // Race the open call against shutdown so a hung transport + // (real RTSP can block on `DESCRIBE` for many seconds) cannot + // wedge graceful exit. + let open_result = tokio::select! { + biased; + res = async { + let mut t = transport.lock().await; + t.open(&config).await + } => res, + _ = shutdown_rx.changed() => { + let mut t = transport.lock().await; + t.close().await; + state_tx.send_replace(SessionState::Closed); + return; + } + }; + + match open_result { + Ok(()) => { + state = transition(state, Trigger::OpenSucceeded, &backoff).next; + state_tx.send_replace(state); + stats.note_streaming(); + + loop { + let packet = tokio::select! 
{ + biased; + res = async { + let mut t = transport.lock().await; + t.next_packet().await + } => Some(res), + _ = shutdown_rx.changed() => None, + }; + + let Some(packet) = packet else { + let mut t = transport.lock().await; + t.close().await; + state_tx.send_replace(SessionState::Closed); + return; + }; + + match packet { + Ok(pkt) => { + let now_ns = clock.elapsed_ns(); + stats.note_packet(now_ns); + let locked = *ai_lock.borrow_and_update(); + // AZ-657 emits a synthetic frame envelope + // per inbound RTSP packet so the lifecycle + // FSM can be exercised end-to-end without + // the decoder (AZ-658 swaps this for the + // actual decoded frame). + let frame = Frame { + seq, + capture_ts_monotonic_ns: now_ns, + decode_ts_monotonic_ns: now_ns, + pixels: Arc::new(pkt.payload), + width: 0, + height: 0, + pix_fmt: shared::models::frame::PixelFormat::Nv12, + ai_locked: locked, + }; + seq = seq.saturating_add(1); + // A no-subscriber send is a no-op error in + // the broadcast channel; the lifecycle + // does not care. + let _ = tx.send(frame); + } + Err(e) => { + let trig = Trigger::from_stream_error(&e); + let t = transition(state, trig, &backoff); + state = t.next; + state_tx.send_replace(state); + stats.note_reopen(); + if let Some(wait) = t.wait_before_next { + tokio::time::sleep(wait).await; + } + if !t.reopen { + return; + } + break; + } + } + } + } + Err(err) => { + let trig = Trigger::from_open_error(&err); + let t = transition(state, trig, &backoff); + state = t.next; + state_tx.send_replace(state); + if let SessionState::Failing { attempt } = state { + stats.note_open_failure(attempt); + } + if let Some(wait) = t.wait_before_next { + tokio::time::sleep(wait).await; + } + if !t.reopen { + // Hard-fail (e.g. UnsupportedProfile): leave the + // FSM parked in Failing and exit. The supervisor + // restarts the process; the operator decides. 
+ return; + } + } } } } @@ -32,18 +269,91 @@ impl FrameIngest { #[derive(Clone)] pub struct FrameIngestHandle { tx: broadcast::Sender, + ai_lock_tx: watch::Sender, + state_rx: watch::Receiver, + shutdown_tx: watch::Sender, + stats: Arc, + clock: MonoClock, } impl FrameIngestHandle { - /// Subscribe to the frame stream. Consumers receive every frame after they - /// subscribed; back-pressure is implemented via broadcast channel lag (see - /// AZ-659 for the slow-consumer policy). + /// Subscribe to the frame stream. Consumers receive every frame + /// after they subscribed; back-pressure is implemented via + /// broadcast channel lag (see AZ-659 for the slow-consumer + /// policy). pub fn subscribe(&self) -> broadcast::Receiver { self.tx.subscribe() } + /// `bringCameraDown`/`bringCameraUp` per `description.md §2`. When + /// `locked == true`, every subsequently emitted frame has + /// `Frame::ai_locked = true` and downstream AI consumers + /// (detection_client, movement_detector) MUST skip detection. + /// `telemetry_stream` continues consuming so the operator sees + /// the raw stream. + pub fn set_ai_lock(&self, locked: bool) { + self.ai_lock_tx.send_replace(locked); + } + + pub fn ai_locked(&self) -> bool { + *self.ai_lock_tx.borrow() + } + + pub fn session_state(&self) -> SessionState { + *self.state_rx.borrow() + } + + /// Subscribe to FSM state transitions. Useful for operator UI and + /// supervisor watchdogs (the latter restarts on prolonged + /// `Failing`). + pub fn session_state_stream(&self) -> watch::Receiver { + self.state_rx.clone() + } + + pub fn reopens_total(&self) -> u64 { + self.stats.reopens_total.load(Ordering::Relaxed) + } + + /// Request the lifecycle loop to drain to `Closed` and exit. The + /// loop races every transport call against this signal, so a + /// hung transport cannot wedge graceful exit. 
+ pub fn shutdown(&self) { + self.shutdown_tx.send_replace(true); + } + pub fn health(&self) -> ComponentHealth { - ComponentHealth::disabled(NAME) + let state = self.session_state(); + let now_ns = self.clock.elapsed_ns(); + let last_pkt_ns = self.stats.last_packet_at_ns.load(Ordering::Relaxed); + let age = now_ns.saturating_sub(last_pkt_ns); + + match state { + SessionState::Closed => ComponentHealth::disabled(NAME), + SessionState::Streaming if last_pkt_ns == 0 => { + ComponentHealth::yellow(NAME, "streaming, awaiting first packet") + } + SessionState::Streaming if age > RED_FRAME_AGE.as_nanos() as u64 => { + ComponentHealth::red(NAME, format!("last packet age {} ms", age / 1_000_000)) + } + SessionState::Streaming => { + let mut h = ComponentHealth::green(NAME); + if self.ai_locked() { + h.level = HealthLevel::Yellow; + h.detail = Some("ai_locked".to_string()); + } + h + } + SessionState::Connecting { attempt } => { + ComponentHealth::yellow(NAME, format!("connecting (attempt {attempt})")) + } + SessionState::Failing { attempt } => { + if age > RED_FRAME_AGE.as_nanos() as u64 { + ComponentHealth::red(NAME, format!("failing, attempt {attempt}")) + } else { + ComponentHealth::yellow(NAME, format!("failing, attempt {attempt}")) + } + } + } } } @@ -54,6 +364,22 @@ mod tests { #[test] fn it_compiles() { let h = FrameIngest::new(8).handle(); - assert_eq!(h.health().level, shared::health::HealthLevel::Disabled); + assert_eq!(h.session_state(), SessionState::Closed); + assert_eq!(h.health().level, HealthLevel::Disabled); + } + + #[test] + fn ai_lock_toggle_propagates() { + // Arrange + let ingest = FrameIngest::new(8); + let handle = ingest.handle(); + + // Act + handle.set_ai_lock(true); + + // Assert + assert!(handle.ai_locked()); + handle.set_ai_lock(false); + assert!(!handle.ai_locked()); } } diff --git a/crates/frame_ingest/tests/rtsp_lifecycle.rs b/crates/frame_ingest/tests/rtsp_lifecycle.rs new file mode 100644 index 0000000..4af4289 --- /dev/null +++ 
b/crates/frame_ingest/tests/rtsp_lifecycle.rs @@ -0,0 +1,320 @@ +//! AZ-657 integration tests — RTSP session lifecycle, bounded +//! reconnect, AI-lock plumb. +//! +//! Uses a [`FakeRtspTransport`] (not a real RTSP server) to keep tests +//! deterministic and free of external fixtures. The session lifecycle +//! FSM in `FrameIngest::run` is the production deliverable; the real +//! retina-backed transport that talks to the camera lands in AZ-658 +//! alongside the H.264 decoder. + +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use bytes::Bytes; +use tokio::sync::mpsc; +use tokio::time::{timeout, Instant}; + +use frame_ingest::{ + BackoffPolicy, FrameIngest, OpenError, RtspPacket, RtspSessionConfig, RtspTransport, + SessionState, StreamError, +}; + +#[derive(Debug, Clone)] +enum Scripted { + OpenOk, + OpenFail(OpenErrKind), + OpenHardFail, + PacketOk, + StreamDropped, +} + +#[derive(Debug, Clone, Copy)] +enum OpenErrKind { + Timeout, + Network, +} + +impl OpenErrKind { + fn into_err(self) -> OpenError { + match self { + OpenErrKind::Timeout => OpenError::Timeout, + OpenErrKind::Network => OpenError::Network("connection refused".to_string()), + } + } +} + +/// Test-driven RTSP transport. The lifecycle loop pulls events from +/// an mpsc channel that the test pushes into. When the channel is +/// empty the transport parks (mirroring a healthy idle RTSP open +/// that blocks until the next packet arrives). The test ends the +/// session via `FrameIngestHandle::shutdown`, which the lifecycle +/// loop observes through `tokio::select!`. +struct FakeRtspTransport { + rx: Arc>>, + opens: Arc, + packets_sent: Arc, +} + +/// Controller side of the fake transport. The test pushes events, +/// the lifecycle loop consumes them. 
+struct ScriptCtl { + tx: mpsc::UnboundedSender, +} + +impl ScriptCtl { + fn push(&self, ev: Scripted) { + self.tx.send(ev).expect("script controller channel closed"); + } +} + +impl FakeRtspTransport { + fn new() -> (Self, ScriptCtl, Arc, Arc) { + let (tx, rx) = mpsc::unbounded_channel(); + let opens = Arc::new(AtomicU32::new(0)); + let packets_sent = Arc::new(AtomicU32::new(0)); + ( + Self { + rx: Arc::new(tokio::sync::Mutex::new(rx)), + opens: Arc::clone(&opens), + packets_sent: Arc::clone(&packets_sent), + }, + ScriptCtl { tx }, + opens, + packets_sent, + ) + } + + fn from_script(script: Vec) -> (Self, ScriptCtl, Arc, Arc) { + let (t, ctl, o, p) = Self::new(); + for ev in script { + ctl.push(ev); + } + (t, ctl, o, p) + } + + async fn next_event(&self) -> Scripted { + let mut rx = self.rx.lock().await; + match rx.recv().await { + Some(ev) => ev, + // Sender dropped → park forever; the lifecycle observes + // shutdown via select! and exits cleanly. + None => std::future::pending().await, + } + } +} + +#[async_trait] +impl RtspTransport for FakeRtspTransport { + async fn open(&mut self, _config: &RtspSessionConfig) -> Result<(), OpenError> { + self.opens.fetch_add(1, Ordering::Relaxed); + match self.next_event().await { + Scripted::OpenOk => Ok(()), + Scripted::OpenFail(kind) => Err(kind.into_err()), + Scripted::OpenHardFail => Err(OpenError::UnsupportedProfile { + details: "H265 main10 not supported".to_string(), + }), + other => Err(OpenError::Network(format!( + "fake transport: open called when script expected {other:?}" + ))), + } + } + + async fn close(&mut self) {} + + async fn next_packet(&mut self) -> Result { + match self.next_event().await { + Scripted::PacketOk => { + self.packets_sent.fetch_add(1, Ordering::Relaxed); + Ok(RtspPacket { + timestamp_rtp: 0, + payload: Bytes::from_static(b"nal-unit"), + }) + } + Scripted::StreamDropped => Err(StreamError::Dropped("scripted drop".to_string())), + // Out-of-band events while streaming surface as a drop + // 
so the FSM re-enters the reconnect ladder. + other => Err(StreamError::Dropped(format!( + "script expected non-packet: {other:?}" + ))), + } + } +} + +fn fast_backoff() -> BackoffPolicy { + BackoffPolicy::new(Duration::from_millis(10), Duration::from_millis(40)) +} + +/// AC-1 — happy path: a single `OpenOk` followed by a packet must +/// bring the FSM to `Streaming` and emit a frame on the broadcast. +#[tokio::test] +async fn ac1_open_succeeds_and_session_reaches_streaming() { + // Arrange + let (transport, _ctl, opens, packets) = + FakeRtspTransport::from_script(vec![Scripted::OpenOk, Scripted::PacketOk]); + let ingest = FrameIngest::with_backoff(8, fast_backoff()); + let handle = ingest.handle(); + let mut frames = handle.subscribe(); + + // Act + let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let first = timeout(Duration::from_secs(1), frames.recv()) + .await + .expect("frame within 1 s") + .expect("broadcast send succeeded"); + + // Assert — receiving the frame proves Closed → Connecting → + // Streaming was traversed; the FakeTransport parks after the + // packet so the FSM stays in Streaming. + assert!(!first.ai_locked, "ai_lock should default to false"); + assert_eq!(handle.session_state(), SessionState::Streaming); + assert_eq!(opens.load(Ordering::Relaxed), 1); + assert_eq!(packets.load(Ordering::Relaxed), 1); + + handle.shutdown(); + let _ = timeout(Duration::from_secs(1), task) + .await + .expect("lifecycle exits on shutdown"); +} + +/// AC-2 — bounded reconnect: an initial failure followed by a success +/// must increment `reopens_total` and converge to `Streaming`. The +/// backoff sleeps used (initial 10 ms, doubling) must be observed via +/// elapsed wall time. 
+#[tokio::test] +async fn ac2_bounded_reconnect_recovers_after_transient_failure() { + // Arrange + let (transport, _ctl, opens, _packets) = FakeRtspTransport::from_script(vec![ + Scripted::OpenFail(OpenErrKind::Network), + Scripted::OpenFail(OpenErrKind::Timeout), + Scripted::OpenOk, + Scripted::PacketOk, + ]); + let ingest = FrameIngest::with_backoff(8, fast_backoff()); + let handle = ingest.handle(); + let mut frames = handle.subscribe(); + let started = Instant::now(); + + // Act + let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let _ = timeout(Duration::from_secs(2), frames.recv()) + .await + .expect("frame within 2 s") + .expect("broadcast send succeeded"); + let elapsed = started.elapsed(); + + // Assert + assert!( + elapsed >= Duration::from_millis(30), + "must observe two backoff sleeps (10 ms + 20 ms = 30 ms), got {elapsed:?}" + ); + assert_eq!(handle.session_state(), SessionState::Streaming); + assert_eq!(opens.load(Ordering::Relaxed), 3); + + handle.shutdown(); + let _ = timeout(Duration::from_secs(1), task).await; +} + +/// AC-2.b — stream drop after streaming starts must re-enter +/// `Failing` and reopen. 
+#[tokio::test] +async fn ac2b_stream_drop_increments_reopens_total() { + // Arrange + let (transport, _ctl, opens, _packets) = FakeRtspTransport::from_script(vec![ + Scripted::OpenOk, + Scripted::PacketOk, + Scripted::StreamDropped, + Scripted::OpenOk, + Scripted::PacketOk, + ]); + let ingest = FrameIngest::with_backoff(8, fast_backoff()); + let handle = ingest.handle(); + let mut frames = handle.subscribe(); + + // Act + let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let _ = timeout(Duration::from_secs(1), frames.recv()) + .await + .expect("first frame") + .expect("first frame ok"); + let _ = timeout(Duration::from_secs(1), frames.recv()) + .await + .expect("second frame") + .expect("second frame ok"); + + // Assert + assert!( + handle.reopens_total() >= 1, + "stream drop must record at least one reopen, got {}", + handle.reopens_total() + ); + assert_eq!(opens.load(Ordering::Relaxed), 2); + assert_eq!(handle.session_state(), SessionState::Streaming); + + handle.shutdown(); + let _ = timeout(Duration::from_secs(1), task).await; +} + +/// AC-3 — SPS/PPS mismatch must hard-fail the session. The loop +/// exits and does NOT retry, leaving the FSM in `Failing` with no +/// further opens. +#[tokio::test] +async fn ac3_unsupported_profile_hard_fails_session() { + // Arrange + let (transport, _ctl, opens, _packets) = + FakeRtspTransport::from_script(vec![Scripted::OpenHardFail]); + let ingest = FrameIngest::with_backoff(8, fast_backoff()); + let handle = ingest.handle(); + + // Act + let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let _ = timeout(Duration::from_secs(1), task) + .await + .expect("lifecycle loop exits on hard-fail"); + + // Assert + assert!(matches!( + handle.session_state(), + SessionState::Failing { .. } + )); + assert_eq!(opens.load(Ordering::Relaxed), 1, "no automatic retry"); +} + +/// AC-4 — AI-lock toggle: every frame emitted AFTER `set_ai_lock(true)` +/// must carry `ai_locked = true`. 
The test controls packet emission +/// timing via `ScriptCtl` so the toggle is guaranteed to precede the +/// second packet. +#[tokio::test] +async fn ac4_ai_lock_toggle_propagates_to_frames() { + // Arrange + let (transport, ctl, _opens, _packets) = + FakeRtspTransport::from_script(vec![Scripted::OpenOk, Scripted::PacketOk]); + let ingest = FrameIngest::with_backoff(8, fast_backoff()); + let handle = ingest.handle(); + let mut frames = handle.subscribe(); + + // Act + let task = ingest.run(transport, RtspSessionConfig::new("rtsp://fake/0")); + let f1 = timeout(Duration::from_secs(1), frames.recv()) + .await + .expect("first frame") + .expect("first frame ok"); + handle.set_ai_lock(true); + ctl.push(Scripted::PacketOk); + let f2 = timeout(Duration::from_secs(1), frames.recv()) + .await + .expect("second frame") + .expect("second frame ok"); + + // Assert + assert!(!f1.ai_locked, "pre-toggle frame must be unlocked"); + assert!( + f2.ai_locked, + "post-toggle frame must carry ai_locked = true" + ); + assert!(handle.ai_locked()); + + handle.shutdown(); + let _ = timeout(Duration::from_secs(1), task).await; +} diff --git a/crates/scan_controller/src/internal/frame_rate_guard.rs b/crates/scan_controller/src/internal/frame_rate_guard.rs new file mode 100644 index 0000000..75cffac --- /dev/null +++ b/crates/scan_controller/src/internal/frame_rate_guard.rs @@ -0,0 +1,252 @@ +//! AZ-682 — frame-rate floor monitor. +//! +//! Per `description.md §5/§6/§8`: when the sustained FPS drops below +//! the configured floor (default 10 fps), the FSM suppresses +//! `ZoomedOut → ZoomedIn` transitions and surfaces yellow health. +//! +//! Implementation: ring-buffer of recent frame-arrival timestamps. +//! Computing average FPS over the window is cheap (O(1) per observe) +//! and avoids spurious flapping that a single-frame rate would +//! produce. A hysteresis margin (`floor_clear_fps`) gates the +//! transition back to "active" to prevent oscillation around the +//! threshold. 
+ +use std::collections::VecDeque; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +/// Minimum window size before the monitor produces a verdict. With +/// fewer than this many observations the guard returns `false` +/// (assume healthy) — this is the "warming up" period right after +/// boot. +const WARMUP_SAMPLES: usize = 4; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct FrameRateGuardConfig { + pub window: Duration, + pub fps_floor: f32, + pub fps_clear: f32, +} + +impl Default for FrameRateGuardConfig { + fn default() -> Self { + Self { + // 1 s window matches the description's "sustained" intent + // — single-frame jitter cannot trip it but a 1-second + // dip will. + window: Duration::from_secs(1), + fps_floor: 10.0, + // Require fps to recover above 12 before clearing, to + // dampen oscillation right around the floor. + fps_clear: 12.0, + } + } +} + +/// Monotonic ring buffer of frame arrival times. `observe` is +/// O(amortised 1); `is_floor_active` is O(1). The buffer is bounded +/// by the time window, not by sample count, so it self-trims as +/// frames arrive. +#[derive(Debug)] +pub struct FrameRateGuard { + config: FrameRateGuardConfig, + arrivals_ns: VecDeque, + active: bool, +} + +impl FrameRateGuard { + pub fn new(config: FrameRateGuardConfig) -> Self { + Self { + config, + arrivals_ns: VecDeque::new(), + active: false, + } + } + + pub fn observe(&mut self, now_ns: u64) { + self.arrivals_ns.push_back(now_ns); + let window_ns = self.config.window.as_nanos() as u64; + // Trim arrivals outside the rolling window. + while let Some(&front) = self.arrivals_ns.front() { + if now_ns.saturating_sub(front) > window_ns { + self.arrivals_ns.pop_front(); + } else { + break; + } + } + self.recompute(); + } + + /// Treat a long silence (no `observe` calls) as zero fps. Callers + /// invoke this on the scan-controller tick so the guard does not + /// stay stuck "green" after a frame pipeline stall. 
+ pub fn tick(&mut self, now_ns: u64) { + let window_ns = self.config.window.as_nanos() as u64; + while let Some(&front) = self.arrivals_ns.front() { + if now_ns.saturating_sub(front) > window_ns { + self.arrivals_ns.pop_front(); + } else { + break; + } + } + self.recompute(); + } + + pub fn is_floor_active(&self) -> bool { + self.active + } + + pub fn fps(&self) -> f32 { + let n = self.arrivals_ns.len(); + if n < 2 { + return 0.0; + } + // Avoid div-by-zero on burst arrivals at the same ns. + let first = self.arrivals_ns.front().copied().unwrap_or(0); + let last = self.arrivals_ns.back().copied().unwrap_or(0); + let span_ns = last.saturating_sub(first); + if span_ns == 0 { + return 0.0; + } + let span_s = (span_ns as f64) / 1e9; + ((n - 1) as f64 / span_s) as f32 + } + + fn recompute(&mut self) { + let n = self.arrivals_ns.len(); + if n < WARMUP_SAMPLES { + self.active = false; + return; + } + let fps = self.fps(); + // Hysteresis: floor activates strictly below `fps_floor` and + // clears strictly above `fps_clear`. Within the band the + // existing state holds. 
+ if self.active { + if fps >= self.config.fps_clear { + self.active = false; + } + } else if fps < self.config.fps_floor { + self.active = true; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn cfg() -> FrameRateGuardConfig { + FrameRateGuardConfig::default() + } + + fn observe_at_fps(g: &mut FrameRateGuard, fps: f32, count: u32, start_ns: u64) -> u64 { + let step_ns = (1e9 / fps as f64) as u64; + let mut t = start_ns; + for _ in 0..count { + g.observe(t); + t += step_ns; + } + t + } + + #[test] + fn warming_up_window_returns_inactive() { + // Arrange + let mut g = FrameRateGuard::new(cfg()); + + // Act + g.observe(0); + g.observe(100_000_000); + + // Assert + assert!(!g.is_floor_active(), "must be inactive during warmup"); + } + + #[test] + fn healthy_fps_keeps_floor_clear() { + // Arrange + let mut g = FrameRateGuard::new(cfg()); + + // Act — observe 30 frames at 30 fps over 1 s + observe_at_fps(&mut g, 30.0, 30, 0); + + // Assert + assert!(!g.is_floor_active()); + assert!(g.fps() > 25.0, "expected ~30 fps, got {}", g.fps()); + } + + #[test] + fn sustained_low_fps_activates_floor() { + // Arrange + let mut g = FrameRateGuard::new(cfg()); + + // Act — 5 frames at 5 fps over 1 s + observe_at_fps(&mut g, 5.0, 5, 0); + + // Assert + assert!(g.is_floor_active()); + assert!(g.fps() < 10.0); + } + + #[test] + fn fps_recovery_clears_floor_with_hysteresis() { + // Arrange — drop below floor first + let mut g = FrameRateGuard::new(cfg()); + let end = observe_at_fps(&mut g, 5.0, 5, 0); + assert!(g.is_floor_active()); + + // Act — observe healthy 30 fps after recovery + observe_at_fps(&mut g, 30.0, 60, end + 200_000_000); + + // Assert + assert!( + !g.is_floor_active(), + "floor must clear once fps > clear threshold; current fps = {}", + g.fps() + ); + } + + #[test] + fn hysteresis_band_does_not_oscillate() { + // Arrange — in the band [10, 12) fps. Start active, stay + // active; start inactive, stay inactive. 
+ let mut active_guard = FrameRateGuard::new(cfg()); + observe_at_fps(&mut active_guard, 5.0, 5, 0); + assert!(active_guard.is_floor_active()); + + // Act — provide 11 fps (within hysteresis band) + let span_ns = 1_000_000_000u64; + let step_ns = span_ns / 11; + let mut t = 1_500_000_000u64; + for _ in 0..11 { + active_guard.observe(t); + t += step_ns; + } + + // Assert — still active because we have not crossed fps_clear + assert!( + active_guard.is_floor_active(), + "11 fps inside hysteresis band must NOT clear an active floor; fps = {}", + active_guard.fps() + ); + } + + #[test] + fn tick_without_observe_re_evaluates_silence() { + // Arrange — healthy fps then long silence + let mut g = FrameRateGuard::new(cfg()); + observe_at_fps(&mut g, 30.0, 30, 0); + assert!(!g.is_floor_active()); + + // Act — 5 s of silence — tick must re-evaluate. + g.tick(6_000_000_000); + + // Assert — buffer is empty so fps falls below floor; the + // guard treats this as active (we have &'static str { + match self { + ScanState::ZoomedOut => "zoomed_out", + ScanState::ZoomedIn { .. } => "zoomed_in", + ScanState::TargetFollow { .. } => "target_follow", + } + } +} + +/// Triggers consumed by `transition`. The catalogue covers every +/// transition required by `system-flows.md §F4` AND +/// `description.md §4–§5`. Adding a new trigger requires a code-review +/// finding flagging the spec section it serves. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Trigger { + /// Operator (or auto-selected high-confidence) POI promoted; the + /// scan_controller should zoom in to the named ROI. + PoiSelected { roi: Uuid, now_ns: u64 }, + /// Tier-2 / VLM evidence ladder rejected the ROI; abandon the + /// hold and return to ZoomedOut. + RoiRejected, + /// Hold window expired without confirmation; back to ZoomedOut. + RoiHoldTimeout, + /// Operator (or evidence ladder) confirmed the target inside the + /// current ROI; transition to TargetFollow. 
+ TargetConfirmed { target_id: Uuid, now_ns: u64 }, + /// Target tracker lost the box; return to ZoomedOut for re-scan. + /// Grace-period debouncing happens at the centre-on-target + /// primitive layer (AZ-656) BEFORE this trigger fires. + TargetLost, + /// Operator-issued release of the follow lock. + OperatorReleaseFollow, + /// Operator-issued abort: any active state → ZoomedOut. + OperatorAbort, +} + +/// Outcome of a single state-machine evaluation. `next` is the +/// post-trigger state; `accepted` is `false` if the FSM rejected the +/// trigger (e.g. fps-floor suppressed a ZoomedOut → ZoomedIn). When +/// rejected, `next == previous` and `reject_reason` carries the +/// classification for metrics / audit. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TransitionOutcome { + pub previous: ScanState, + pub next: ScanState, + pub accepted: bool, + pub reject_reason: Option, +} + +impl TransitionOutcome { + pub fn changed(&self) -> bool { + self.accepted && self.previous != self.next + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum RejectReason { + /// FPS floor active per `description.md §5 / §6` — zoom-in + /// transitions are suppressed while sustained FPS < 10 fps. + FpsFloor, + /// The (state, trigger) pair is not part of the documented + /// catalogue. Surfaced as a Critical health finding so it never + /// silently no-ops in production. 
+ UnsupportedTransition, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn discriminant_is_stable_per_variant() { + // Arrange + let z_in = ScanState::ZoomedIn { + roi: Uuid::nil(), + hold_started_at_ns: 0, + }; + + // Assert + assert_eq!(ScanState::ZoomedOut.discriminant(), "zoomed_out"); + assert_eq!(z_in.discriminant(), "zoomed_in"); + assert_eq!( + ScanState::TargetFollow { + target_id: Uuid::nil(), + started_at_ns: 0 + } + .discriminant(), + "target_follow" + ); + } +} diff --git a/crates/scan_controller/src/internal/state_machine/transitions.rs b/crates/scan_controller/src/internal/state_machine/transitions.rs new file mode 100644 index 0000000..47511b3 --- /dev/null +++ b/crates/scan_controller/src/internal/state_machine/transitions.rs @@ -0,0 +1,276 @@ +//! Pure transition function. See `mod.rs` for the trigger catalogue. +//! +//! The transition function is intentionally pure: input is `(state, +//! trigger, ctx)`, output is a `TransitionOutcome`. Async I/O, +//! POI queue mutation, gimbal commands, and mapobjects dispatch all +//! sit OUTSIDE this layer (AZ-683 / AZ-685 / AZ-686). Keeping the +//! transition table pure means every documented transition is +//! exhaustively unit-testable without spinning up actors. + +use super::{RejectReason, ScanState, TransitionOutcome, Trigger}; + +/// External context the FSM consults during evaluation. Today only +/// the FPS floor; AZ-683 will add operator-decision-window / POI +/// queue flags. +#[derive(Debug, Clone, Copy, Default)] +pub struct TransitionCtx { + pub fps_floor_active: bool, +} + +pub fn transition(state: ScanState, trigger: Trigger, ctx: TransitionCtx) -> TransitionOutcome { + let previous = state; + + // OperatorAbort is the highest-priority safety transition — per + // `description.md §6` it preempts any active state. Evaluated + // first so a (TargetFollow, OperatorAbort) cannot accidentally + // fall into the per-state catalogue. 
+ if matches!(trigger, Trigger::OperatorAbort) { + return TransitionOutcome { + previous, + next: ScanState::ZoomedOut, + accepted: true, + reject_reason: None, + }; + } + + match (state, trigger) { + (ScanState::ZoomedOut, Trigger::PoiSelected { roi, now_ns }) => { + if ctx.fps_floor_active { + TransitionOutcome { + previous, + next: previous, + accepted: false, + reject_reason: Some(RejectReason::FpsFloor), + } + } else { + TransitionOutcome { + previous, + next: ScanState::ZoomedIn { + roi, + hold_started_at_ns: now_ns, + }, + accepted: true, + reject_reason: None, + } + } + } + (ScanState::ZoomedIn { .. }, Trigger::RoiRejected | Trigger::RoiHoldTimeout) => { + TransitionOutcome { + previous, + next: ScanState::ZoomedOut, + accepted: true, + reject_reason: None, + } + } + (ScanState::ZoomedIn { .. }, Trigger::TargetConfirmed { target_id, now_ns }) => { + TransitionOutcome { + previous, + next: ScanState::TargetFollow { + target_id, + started_at_ns: now_ns, + }, + accepted: true, + reject_reason: None, + } + } + (ScanState::TargetFollow { .. }, Trigger::TargetLost | Trigger::OperatorReleaseFollow) => { + TransitionOutcome { + previous, + next: ScanState::ZoomedOut, + accepted: true, + reject_reason: None, + } + } + // Every other (state, trigger) combination is not part of + // the documented catalogue. Returning `UnsupportedTransition` + // (instead of panicking or silently no-oping) means a future + // refactor that introduces a new trigger will fail loudly in + // both tests and production health. 
+ _ => TransitionOutcome { + previous, + next: previous, + accepted: false, + reject_reason: Some(RejectReason::UnsupportedTransition), + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use uuid::Uuid; + + fn ctx_ok() -> TransitionCtx { + TransitionCtx::default() + } + + fn ctx_fps() -> TransitionCtx { + TransitionCtx { + fps_floor_active: true, + } + } + + #[test] + fn zoomed_out_to_zoomed_in_on_poi_selected() { + // Arrange + let roi = Uuid::new_v4(); + + // Act + let out = transition( + ScanState::ZoomedOut, + Trigger::PoiSelected { roi, now_ns: 100 }, + ctx_ok(), + ); + + // Assert + assert!(out.accepted); + assert_eq!( + out.next, + ScanState::ZoomedIn { + roi, + hold_started_at_ns: 100 + } + ); + } + + #[test] + fn fps_floor_suppresses_zoom_in() { + // Arrange + let roi = Uuid::new_v4(); + + // Act + let out = transition( + ScanState::ZoomedOut, + Trigger::PoiSelected { roi, now_ns: 0 }, + ctx_fps(), + ); + + // Assert + assert!(!out.accepted); + assert_eq!(out.reject_reason, Some(RejectReason::FpsFloor)); + assert_eq!(out.next, ScanState::ZoomedOut); + } + + #[test] + fn zoomed_in_returns_to_zoomed_out_on_rejection() { + // Arrange + let s = ScanState::ZoomedIn { + roi: Uuid::new_v4(), + hold_started_at_ns: 0, + }; + + // Assert + for trig in [Trigger::RoiRejected, Trigger::RoiHoldTimeout] { + let out = transition(s, trig, ctx_ok()); + assert!(out.accepted, "trigger {trig:?} must be accepted"); + assert_eq!(out.next, ScanState::ZoomedOut); + } + } + + #[test] + fn zoomed_in_to_target_follow_on_confirmation() { + // Arrange + let target = Uuid::new_v4(); + let s = ScanState::ZoomedIn { + roi: Uuid::new_v4(), + hold_started_at_ns: 0, + }; + + // Act + let out = transition( + s, + Trigger::TargetConfirmed { + target_id: target, + now_ns: 500, + }, + ctx_ok(), + ); + + // Assert + assert!(out.accepted); + assert_eq!( + out.next, + ScanState::TargetFollow { + target_id: target, + started_at_ns: 500, + } + ); + } + + #[test] + fn 
target_follow_back_to_zoomed_out_on_loss_or_release() { + // Arrange + let s = ScanState::TargetFollow { + target_id: Uuid::new_v4(), + started_at_ns: 0, + }; + + // Assert + for trig in [Trigger::TargetLost, Trigger::OperatorReleaseFollow] { + let out = transition(s, trig, ctx_ok()); + assert!(out.accepted); + assert_eq!(out.next, ScanState::ZoomedOut); + } + } + + #[test] + fn operator_abort_resets_from_any_state() { + // Arrange + let states = [ + ScanState::ZoomedOut, + ScanState::ZoomedIn { + roi: Uuid::new_v4(), + hold_started_at_ns: 0, + }, + ScanState::TargetFollow { + target_id: Uuid::new_v4(), + started_at_ns: 0, + }, + ]; + + // Assert + for s in states { + let out = transition(s, Trigger::OperatorAbort, ctx_ok()); + assert!(out.accepted); + assert_eq!(out.next, ScanState::ZoomedOut); + } + } + + #[test] + fn unsupported_combinations_are_rejected_loudly() { + // Arrange — TargetConfirmed in ZoomedOut is undocumented. + let s = ScanState::ZoomedOut; + + // Act + let out = transition( + s, + Trigger::TargetConfirmed { + target_id: Uuid::new_v4(), + now_ns: 0, + }, + ctx_ok(), + ); + + // Assert + assert!(!out.accepted); + assert_eq!(out.reject_reason, Some(RejectReason::UnsupportedTransition)); + assert_eq!(out.next, s); + } + + #[test] + fn fps_floor_does_not_suppress_within_state_resets() { + // Arrange — RoiRejected must take effect even while FPS is + // floored; the floor only blocks ZOOM-IN transitions. + let s = ScanState::ZoomedIn { + roi: Uuid::new_v4(), + hold_started_at_ns: 0, + }; + + // Act + let out = transition(s, Trigger::RoiRejected, ctx_fps()); + + // Assert + assert!(out.accepted); + assert_eq!(out.next, ScanState::ZoomedOut); + } +} diff --git a/crates/scan_controller/src/lib.rs b/crates/scan_controller/src/lib.rs index 3663fb2..f889075 100644 --- a/crates/scan_controller/src/lib.rs +++ b/crates/scan_controller/src/lib.rs @@ -1,42 +1,122 @@ //! `scan_controller` — central typed state machine. //! -//! 
States per architecture.md §5: `ZoomedOut | ZoomedIn { roi, hold_started_at } -//! | TargetFollow { target_id, started_at }`. Full behaviour-tree spec lives in -//! `system-flows.md §F4`. +//! States per `architecture.md §5`: `ZoomedOut | ZoomedIn { roi, +//! hold_started_at } | TargetFollow { target_id, started_at }`. The +//! full behaviour-tree spec lives in `system-flows.md §F4`. //! -//! Real implementation lands in: -//! - AZ-682 `scan_controller_state_machine` -//! - AZ-683 `scan_controller_poi_queue_and_window` -//! - AZ-684 `scan_controller_evidence_ladder` -//! - AZ-685 `scan_controller_mapobjects_dispatch` -//! - AZ-686 `scan_controller_gimbal_issuance` +//! ## AZ-682 scope (this file) +//! +//! - Typed `ScanState` + complete transition catalogue. +//! - Frame-rate floor monitor that suppresses zoom-in transitions +//! while sustained FPS < 10. +//! - Tick-latency observability (p99 tracker over a rolling window) +//! per `description.md §8` (≤10 ms p99 target). +//! - Health surface reflecting state + fps_floor + tick latency. +//! +//! ## Out of scope (later tasks) +//! +//! - AZ-683: POI queue + ≤5/min cap + operator-decision window. +//! - AZ-684: Tier-2 / VLM evidence ladder. +//! - AZ-685: mapobjects_store new / moved / existing / removed +//! dispatch. +//! - AZ-686: actual gimbal command issuance on state transitions. +//! +//! Operator command translation lives behind +//! [`ScanControllerHandle::submit_operator_cmd`] — for AZ-682 it only +//! routes `MissionAbort` → `Trigger::OperatorAbort` and +//! `ReleaseTargetFollow` → `Trigger::OperatorReleaseFollow`; every +//! other kind returns `NotImplemented`. The richer wiring (queue +//! interactions, POI selection lookups) lands with AZ-683. 
+ +use std::sync::Arc; +use std::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; -use uuid::Uuid; +use tokio::sync::Mutex; use shared::error::{AutopilotError, Result}; -use shared::health::ComponentHealth; -use shared::models::operator::OperatorCommand; +use shared::health::{ComponentHealth, HealthLevel}; +use shared::models::operator::{OperatorCommand, OperatorCommandKind}; + +pub mod internal; + +pub use internal::frame_rate_guard::{FrameRateGuard, FrameRateGuardConfig}; +pub use internal::state_machine::transitions::{transition, TransitionCtx}; +pub use internal::state_machine::{RejectReason, ScanState, TransitionOutcome, Trigger}; const NAME: &str = "scan_controller"; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -#[serde(tag = "state", rename_all = "snake_case")] -pub enum ScanState { - ZoomedOut, - ZoomedIn { roi: Uuid, hold_started_at_ns: u64 }, - TargetFollow { target_id: Uuid, started_at_ns: u64 }, +/// Tick-latency budget per `description.md §8`. Health flips yellow +/// when p99 exceeds this. +const TICK_BUDGET_P99: Duration = Duration::from_millis(10); + +/// Size of the rolling tick-latency window. 100 samples at the 10 Hz +/// tick rate covers the last ~10 seconds; long enough to smooth +/// jitter, short enough to react to a regression. 
+const LATENCY_WINDOW: usize = 100; + +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub struct ScanControllerConfig { + pub frame_rate: FrameRateGuardConfig, } -pub struct ScanController; +pub struct ScanController { + inner: Arc>, + clock: shared::clock::MonoClock, +} + +struct Inner { + state: ScanState, + last_state_change_ns: u64, + fps_guard: FrameRateGuard, + latencies_us: std::collections::VecDeque, + rejected_total: u64, + transitions_total: u64, +} + +impl Inner { + fn record_latency(&mut self, us: u64) { + if self.latencies_us.len() == LATENCY_WINDOW { + self.latencies_us.pop_front(); + } + self.latencies_us.push_back(us); + } + + fn p99_us(&self) -> u64 { + if self.latencies_us.is_empty() { + return 0; + } + let mut v: Vec = self.latencies_us.iter().copied().collect(); + v.sort_unstable(); + let idx = ((v.len() as f64) * 0.99).ceil() as usize - 1; + v[idx.min(v.len() - 1)] + } +} impl ScanController { pub fn new() -> Self { - Self + Self::with_config(ScanControllerConfig::default()) + } + + pub fn with_config(config: ScanControllerConfig) -> Self { + Self { + inner: Arc::new(Mutex::new(Inner { + state: ScanState::ZoomedOut, + last_state_change_ns: 0, + fps_guard: FrameRateGuard::new(config.frame_rate), + latencies_us: std::collections::VecDeque::with_capacity(LATENCY_WINDOW), + rejected_total: 0, + transitions_total: 0, + })), + clock: shared::clock::MonoClock::new(), + } } pub fn handle(&self) -> ScanControllerHandle { - ScanControllerHandle + ScanControllerHandle { + inner: Arc::clone(&self.inner), + clock: self.clock, + } } } @@ -46,39 +126,243 @@ impl Default for ScanController { } } -#[derive(Clone, Copy)] -pub struct ScanControllerHandle; +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct ScanMetrics { + pub transitions_total: u64, + pub rejected_total: u64, + pub last_state_change_ns: u64, + pub tick_latency_p99_us: u64, +} + +#[derive(Clone)] +pub struct ScanControllerHandle { + inner: Arc>, + clock: 
shared::clock::MonoClock, +} impl ScanControllerHandle { + /// Observe a frame arrival. Feeds the FPS guard. Called by the + /// composition root when the `frame_ingest` broadcast publishes + /// a frame. + pub async fn observe_frame(&self) { + let now = self.clock.elapsed_ns(); + self.observe_frame_at(now).await; + } + + /// Same as `observe_frame` but accepts an explicit monotonic + /// timestamp. Used by tests to drive the FPS guard + /// deterministically; production callers should prefer + /// `observe_frame`. + pub async fn observe_frame_at(&self, ns: u64) { + let mut inner = self.inner.lock().await; + inner.fps_guard.observe(ns); + } + + /// Feed a single trigger to the FSM and return the outcome. This + /// is the workhorse API used by AZ-683 (POI queue) / AZ-684 + /// (evidence ladder) / etc. to advance the state machine. + pub async fn submit_trigger(&self, trigger: Trigger) -> TransitionOutcome { + let started = Instant::now(); + let mut inner = self.inner.lock().await; + let ctx = TransitionCtx { + fps_floor_active: inner.fps_guard.is_floor_active(), + }; + let outcome = transition(inner.state, trigger, ctx); + if outcome.accepted { + inner.transitions_total += 1; + if outcome.changed() { + inner.state = outcome.next; + inner.last_state_change_ns = self.clock.elapsed_ns(); + } + } else { + inner.rejected_total += 1; + } + inner.record_latency(started.elapsed().as_micros() as u64); + outcome + } + + /// One scan-controller tick. AZ-682 only re-evaluates the FPS + /// guard and records latency; AZ-684+ will run the evidence + /// ladder + POI queue evaluation under this same tick. 
pub async fn tick(&self) -> Result<()> { - Err(AutopilotError::NotImplemented( - "scan_controller::tick (AZ-682)", - )) + let started = Instant::now(); + let now = self.clock.elapsed_ns(); + let mut inner = self.inner.lock().await; + inner.fps_guard.tick(now); + inner.record_latency(started.elapsed().as_micros() as u64); + Ok(()) } - pub async fn submit_operator_cmd(&self, _command: OperatorCommand) -> Result<()> { - Err(AutopilotError::NotImplemented( - "scan_controller::submit_operator_cmd (AZ-682)", - )) + /// Translate an operator command into a trigger and apply it. + /// + /// **AZ-682 mapping** (partial — POI queue lookups belong to + /// AZ-683; until then, `ConfirmPoi` alone has no roi to bind so + /// it returns `NotImplemented`). + /// + /// - `MissionAbort` → `Trigger::OperatorAbort` + /// - `ReleaseTargetFollow` → `Trigger::OperatorReleaseFollow` + /// - `StartTargetFollow` (payload-bound) → not yet supported, + /// returns `NotImplemented(AZ-683)` since the target_id has to + /// be resolved via the POI queue. + /// - `ConfirmPoi` / `DeclinePoi` / `AcknowledgeBitDegraded` / + /// `SafetyOverride` → `NotImplemented(AZ-683/AZ-684)`. 
+ pub async fn submit_operator_cmd(&self, command: OperatorCommand) -> Result<()> { + match command.kind { + OperatorCommandKind::MissionAbort => { + self.submit_trigger(Trigger::OperatorAbort).await; + Ok(()) + } + OperatorCommandKind::ReleaseTargetFollow => { + self.submit_trigger(Trigger::OperatorReleaseFollow).await; + Ok(()) + } + OperatorCommandKind::ConfirmPoi + | OperatorCommandKind::DeclinePoi + | OperatorCommandKind::StartTargetFollow => Err(AutopilotError::NotImplemented( + "scan_controller::submit_operator_cmd (AZ-683 POI queue wiring)", + )), + OperatorCommandKind::AcknowledgeBitDegraded => Err(AutopilotError::NotImplemented( + "scan_controller::submit_operator_cmd (AZ-684 evidence ladder)", + )), + OperatorCommandKind::SafetyOverride => Err(AutopilotError::NotImplemented( + "scan_controller::submit_operator_cmd (AZ-684 evidence ladder)", + )), + } } - pub fn state(&self) -> ScanState { - ScanState::ZoomedOut + pub async fn state(&self) -> ScanState { + self.inner.lock().await.state } - pub fn health(&self) -> ComponentHealth { - ComponentHealth::disabled(NAME) + /// Synchronous snapshot of the current state. Useful for health + /// readers that cannot await. Acquires a `try_lock` — falls back + /// to `ZoomedOut` if the lock is contended (rare; tick + trigger + /// only hold the lock for microseconds). + pub fn state_blocking_snapshot(&self) -> ScanState { + self.inner + .try_lock() + .map(|g| g.state) + .unwrap_or(ScanState::ZoomedOut) + } + + pub async fn fps_floor_active(&self) -> bool { + self.inner.lock().await.fps_guard.is_floor_active() + } + + pub async fn tick_latency_p99_us(&self) -> u64 { + self.inner.lock().await.p99_us() + } + + /// Snapshot of accumulated counters used by metrics exporters and + /// audit log. Field semantics per `description.md §3`: + /// + /// - `transitions_total` — accepted (state, trigger) → next pairs. + /// - `rejected_total` — rejected by FPS-floor OR + /// `UnsupportedTransition`. 
+ /// - `last_state_change_ns` — monotonic ns of the last accepted + /// transition that changed `state` (0 if no change yet). + pub async fn metrics(&self) -> ScanMetrics { + let inner = self.inner.lock().await; + ScanMetrics { + transitions_total: inner.transitions_total, + rejected_total: inner.rejected_total, + last_state_change_ns: inner.last_state_change_ns, + tick_latency_p99_us: inner.p99_us(), + } + } + + pub async fn health(&self) -> ComponentHealth { + let inner = self.inner.lock().await; + let fps_active = inner.fps_guard.is_floor_active(); + let p99 = inner.p99_us(); + let state = inner.state; + drop(inner); + + let mut h = ComponentHealth::green(NAME); + let mut details: Vec = vec![format!("state={}", state.discriminant())]; + + if fps_active { + h.level = HealthLevel::Yellow; + details.push("fps_floor_active".to_string()); + } + if p99 > TICK_BUDGET_P99.as_micros() as u64 { + if h.level == HealthLevel::Green { + h.level = HealthLevel::Yellow; + } + details.push(format!("tick_p99_us={p99}")); + } + h.detail = Some(details.join(" ")); + h } } #[cfg(test)] mod tests { use super::*; + use uuid::Uuid; - #[test] - fn it_compiles() { + #[tokio::test] + async fn boot_state_is_zoomed_out_with_disabled_health() { + // Arrange let h = ScanController::new().handle(); - assert!(matches!(h.state(), ScanState::ZoomedOut)); - assert_eq!(h.health().level, shared::health::HealthLevel::Disabled); + + // Assert + assert_eq!(h.state().await, ScanState::ZoomedOut); + let health = h.health().await; + assert_eq!(health.level, HealthLevel::Green); + assert!(health + .detail + .as_deref() + .unwrap_or("") + .contains("state=zoomed_out")); + } + + #[tokio::test] + async fn happy_path_zoomed_out_to_zoomed_in_to_follow() { + // Arrange + let h = ScanController::new().handle(); + let roi = Uuid::new_v4(); + let target = Uuid::new_v4(); + + // Act + let o1 = h + .submit_trigger(Trigger::PoiSelected { roi, now_ns: 100 }) + .await; + let o2 = h + 
.submit_trigger(Trigger::TargetConfirmed { + target_id: target, + now_ns: 200, + }) + .await; + + // Assert + assert!(o1.accepted && o2.accepted); + assert!(matches!( + h.state().await, + ScanState::TargetFollow { target_id, .. } if target_id == target + )); + } + + #[tokio::test] + async fn fps_floor_blocks_zoom_in() { + // Arrange — 5 fps: each observe 200 ms apart, 6 samples ≥ warmup. + let h = ScanController::new().handle(); + for i in 0..6u64 { + h.observe_frame_at(i * 200_000_000).await; + } + assert!(h.fps_floor_active().await); + + // Act + let outcome = h + .submit_trigger(Trigger::PoiSelected { + roi: Uuid::new_v4(), + now_ns: 0, + }) + .await; + + // Assert + assert!(!outcome.accepted); + assert_eq!(outcome.reject_reason, Some(RejectReason::FpsFloor)); + assert_eq!(h.state().await, ScanState::ZoomedOut); } } diff --git a/crates/scan_controller/tests/state_machine.rs b/crates/scan_controller/tests/state_machine.rs new file mode 100644 index 0000000..7627f68 --- /dev/null +++ b/crates/scan_controller/tests/state_machine.rs @@ -0,0 +1,200 @@ +//! AZ-682 integration tests — exercise the typed state machine and +//! the frame-rate floor monitor end-to-end through the public +//! `ScanControllerHandle` surface. + +use uuid::Uuid; + +use scan_controller::{RejectReason, ScanController, ScanState, TransitionOutcome, Trigger}; + +#[tokio::test] +async fn ac1_boot_state_is_zoomed_out() { + // Arrange + let h = ScanController::new().handle(); + + // Assert + assert_eq!(h.state().await, ScanState::ZoomedOut); +} + +/// AC-2 — transition catalogue is complete; every (from_state, +/// trigger) → to_state from the spec is covered. Spec-disallowed +/// combinations are rejected with a recorded reason. 
+#[tokio::test] +async fn ac2_full_transition_catalogue_round_trip() { + // Arrange + let h = ScanController::new().handle(); + let roi = Uuid::new_v4(); + let target = Uuid::new_v4(); + + // Act + Assert — ZoomedOut → ZoomedIn + let o = h + .submit_trigger(Trigger::PoiSelected { roi, now_ns: 100 }) + .await; + assert!(o.accepted, "PoiSelected must transition"); + assert!(matches!( + h.state().await, + ScanState::ZoomedIn { roi: r, hold_started_at_ns: 100 } if r == roi + )); + + // ZoomedIn → TargetFollow + let o = h + .submit_trigger(Trigger::TargetConfirmed { + target_id: target, + now_ns: 200, + }) + .await; + assert!(o.accepted); + assert!(matches!( + h.state().await, + ScanState::TargetFollow { target_id: t, started_at_ns: 200 } if t == target + )); + + // TargetFollow → ZoomedOut via TargetLost + let o = h.submit_trigger(Trigger::TargetLost).await; + assert!(o.accepted); + assert_eq!(h.state().await, ScanState::ZoomedOut); + + // ZoomedOut → ZoomedIn again → ZoomedOut via RoiRejected + let _ = h + .submit_trigger(Trigger::PoiSelected { + roi: Uuid::new_v4(), + now_ns: 300, + }) + .await; + let o = h.submit_trigger(Trigger::RoiRejected).await; + assert!(o.accepted); + assert_eq!(h.state().await, ScanState::ZoomedOut); + + // ZoomedOut → ZoomedIn → ZoomedOut via RoiHoldTimeout + let _ = h + .submit_trigger(Trigger::PoiSelected { + roi: Uuid::new_v4(), + now_ns: 400, + }) + .await; + let o = h.submit_trigger(Trigger::RoiHoldTimeout).await; + assert!(o.accepted); + assert_eq!(h.state().await, ScanState::ZoomedOut); + + // TargetFollow → ZoomedOut via OperatorReleaseFollow + let _ = h + .submit_trigger(Trigger::PoiSelected { + roi: Uuid::new_v4(), + now_ns: 500, + }) + .await; + let _ = h + .submit_trigger(Trigger::TargetConfirmed { + target_id: Uuid::new_v4(), + now_ns: 600, + }) + .await; + let o = h.submit_trigger(Trigger::OperatorReleaseFollow).await; + assert!(o.accepted); + assert_eq!(h.state().await, ScanState::ZoomedOut); + + // OperatorAbort from 
TargetFollow + let _ = h + .submit_trigger(Trigger::PoiSelected { + roi: Uuid::new_v4(), + now_ns: 700, + }) + .await; + let _ = h + .submit_trigger(Trigger::TargetConfirmed { + target_id: Uuid::new_v4(), + now_ns: 800, + }) + .await; + let o = h.submit_trigger(Trigger::OperatorAbort).await; + assert!(o.accepted); + assert_eq!(h.state().await, ScanState::ZoomedOut); +} + +/// AC-3 — spec-disallowed transitions are rejected with the +/// `UnsupportedTransition` reason (not silently no-ops). +#[tokio::test] +async fn ac3_unsupported_transitions_are_rejected() { + // Arrange + let h = ScanController::new().handle(); + + // Act — TargetConfirmed makes no sense while ZoomedOut. + let o = h + .submit_trigger(Trigger::TargetConfirmed { + target_id: Uuid::new_v4(), + now_ns: 0, + }) + .await; + + // Assert + assert!(!o.accepted); + assert_eq!(o.reject_reason, Some(RejectReason::UnsupportedTransition)); + assert_eq!(o.next, ScanState::ZoomedOut); +} + +/// AC-4 — frame-rate floor suppresses zoom-in; once cleared, zoom-in +/// transitions resume. +#[tokio::test] +async fn ac4_frame_rate_floor_suppresses_then_clears() { + // Arrange — feed 5 fps until the guard activates. + let h = ScanController::new().handle(); + for i in 0..6u64 { + h.observe_frame_at(i * 200_000_000).await; + } + assert!(h.fps_floor_active().await); + + // Act — PoiSelected must be suppressed. + let o: TransitionOutcome = h + .submit_trigger(Trigger::PoiSelected { + roi: Uuid::new_v4(), + now_ns: 1_500_000_000, + }) + .await; + assert!(!o.accepted); + assert_eq!(o.reject_reason, Some(RejectReason::FpsFloor)); + + // Recovery — feed 30 fps for 2 seconds; floor must clear. + let start = 2_000_000_000u64; + let step = (1e9_f64 / 30.0) as u64; + for i in 0..60u64 { + h.observe_frame_at(start + i * step).await; + } + assert!(!h.fps_floor_active().await); + + // The same PoiSelected MUST now succeed. 
+ let o = h + .submit_trigger(Trigger::PoiSelected { + roi: Uuid::new_v4(), + now_ns: 3_000_000_000, + }) + .await; + assert!(o.accepted); + assert!(matches!(h.state().await, ScanState::ZoomedIn { .. })); +} + +/// AC-5 — tick latency stays well under the 10 ms p99 budget under +/// a steady-state trigger load. This bench is a smoke test, not a +/// rigorous benchmark — it exists to catch a regression that +/// silently blows past the budget. +#[tokio::test] +async fn ac5_tick_latency_p99_under_budget() { + // Arrange + let h = ScanController::new().handle(); + + // Act — run 200 triggers through the FSM. + for i in 0..200u64 { + let _ = h + .submit_trigger(Trigger::PoiSelected { + roi: Uuid::new_v4(), + now_ns: i * 1_000_000, + }) + .await; + let _ = h.submit_trigger(Trigger::OperatorAbort).await; + } + + // Assert + let p99 = h.tick_latency_p99_us().await; + assert!( + p99 < 10_000, + "tick latency p99 {p99} us exceeds 10 ms budget" + ); +}