diff --git a/Cargo.lock b/Cargo.lock index 9127efc..ad95f7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1864,12 +1864,14 @@ checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" name = "scan_controller" version = "0.1.0" dependencies = [ + "chrono", "gimbal_controller", "mapobjects_store", "mission_executor", "operator_bridge", "semantic_analyzer", "serde", + "serde_json", "shared", "tokio", "tracing", diff --git a/_docs/02_tasks/todo/AZ-683_scan_controller_poi_queue_and_window.md b/_docs/02_tasks/done/AZ-683_scan_controller_poi_queue_and_window.md similarity index 100% rename from _docs/02_tasks/todo/AZ-683_scan_controller_poi_queue_and_window.md rename to _docs/02_tasks/done/AZ-683_scan_controller_poi_queue_and_window.md diff --git a/_docs/03_implementation/batch_13_cycle1_report.md b/_docs/03_implementation/batch_13_cycle1_report.md new file mode 100644 index 0000000..5a5b6bb --- /dev/null +++ b/_docs/03_implementation/batch_13_cycle1_report.md @@ -0,0 +1,194 @@ +# Batch 13 / Cycle 1 — Implementation Report + +**Date**: 2026-05-20 +**Tasks**: AZ-683 +**Verdict**: PASS_WITH_WARNINGS (pre-existing autopilot lint from batch 4 +still open — see Findings §A1; unchanged by this batch) + +## 1. Scope + +| Ticket | Title | Crate | Complexity | +|---|---|---|---| +| AZ-683 | scan_controller POI queue + ≤5/min cap + decision-window mapping | `scan_controller` | 5 | + +Batch 13 ships AZ-683 as a stand-alone unit. AZ-684 (evidence ladder) was +considered for the same batch but pulled because its dependencies +(AZ-660 detections wire, AZ-671 VLM provider runtime) are not yet +landed; co-batching it would have created an artificial blocker. POI +queue is fully self-contained on top of the AZ-682 FSM substrate, so +shipping it alone keeps the batch unblocked and review tractable. + +## 2. Approach + +Per `02_tasks/done/AZ-683_scan_controller_poi_queue_and_window.md`, the +deliverable is the **prioritized POI queue, rolling 5/min surface cap, +confidence-scaled decision window, and the timeout-vs-decline semantic +split**. The evidence-ladder gate (AZ-684) and mapobjects-store +IgnoredItem persist (AZ-685) are intentionally *not* in this batch — the +queue surfaces priorities and returns dispatchable actions, but the +actual gimbal slew (`scan_controller` issuing an ROI) and IgnoredItem +write live in their own tickets. The split is enforced by: + +- `next_poi_for_surface` returns the `Poi` once the cap allows it and + the confidence is ≥ 40 % — but does **not** itself drive the gimbal + or change FSM state; AZ-684 will plumb that. +- `decline_poi` returns a `DeclineAction { poi_id, mgrs, class_group, + declined_at, source_detection_ids }` — the caller (AZ-685 + mapobjects-store dispatch) is responsible for the actual + `IgnoredItem` persist. This keeps the queue free of `mapobjects_store` + I/O. +- `tick()`'s timeout sweep **silently forgets** expired POIs. No + IgnoredItem is emitted for a timeout per spec §3 — only a *positive + operator decline* creates an IgnoredItem. + +### Component pieces shipped + +- `internal/poi_queue/priority.rs` — pure functions: + - `decision_window(confidence) -> Option` — linear 40 % → + 30 s, 100 % → 120 s, `None` below floor. + - `age_factor(age_seconds) -> f32` — linear decay 1.0 → 0.1 over + 300 s, clamped. + - `priority_score(confidence, proximity, age_seconds) -> f32` — + `c × p × age_factor`. +- `internal/poi_queue/mod.rs` — `PoiQueue` actor-private struct: + - `insert(poi, proximity, now_ns)` — enqueues with stamped + `enqueued_at_ns`. + - `next_for_surface(now_ns) -> Option` — picks the highest + priority entry that clears the confidence floor and the rolling + cap, removes it from the queue, records a surface timestamp. + - `decline(poi_id) -> Option` — removes entry, returns + the IgnoredItem payload data. + - `timeout_sweep(now_wallclock) -> Vec` — drops expired entries, + returns the removed IDs for metric accounting. + - `surfaces_in_window(now_ns) -> usize` — number of POIs surfaced in + the rolling 60 s window after trimming. + - `SURFACE_CAP_PER_WINDOW = 5`. +- `crates/scan_controller/src/lib.rs` — wiring: + - `Inner` now owns `poi_queue: PoiQueue` and counters + `pois_surfaced_total`, `pois_forgotten_total`, `pois_declined_total`. + - `ScanControllerHandle::submit_poi_candidate`, + `next_poi_for_surface`, `decline_poi`, `poi_queue_len`, + `pois_in_window` — public async surface. + - `ScanControllerHandle::tick` now also runs the timeout sweep. + - `ScanControllerHandle::submit_operator_cmd` now handles + `DeclinePoi` end-to-end — payload `{ poi_id }` is parsed, + `decline_poi` is called, and the result is returned as + `SubmitOutcome::Declined(DeclineAction)` for the caller. The + method's return type changed from `Result<()>` to + `Result`. + - `ScanMetrics` gained four POI fields: + `poi_queue_len`, `pois_surfaced_total`, `pois_forgotten_total`, + `pois_declined_total`. + - `health()` detail now includes `poi_queue=`. + +## 3. Files touched + +### AZ-683 +- `crates/scan_controller/Cargo.toml` — added `serde_json` (for + operator-command payload parsing) and `chrono` (for wallclock + deadlines). +- `crates/scan_controller/src/lib.rs` — wired POI queue into `Inner`, + added `submit_poi_candidate` / `next_poi_for_surface` / `decline_poi` + / `poi_queue_len` / `pois_in_window`, changed + `submit_operator_cmd` return type and added `DeclinePoi` handling, + extended `ScanMetrics` and `health()`. +- `crates/scan_controller/src/internal/mod.rs` — added `pub mod + poi_queue`. +- `crates/scan_controller/src/internal/poi_queue/mod.rs` — new + (`PoiQueue`, `DeclineAction`, `SURFACE_CAP_PER_WINDOW`, 5 unit tests). +- `crates/scan_controller/src/internal/poi_queue/priority.rs` — new + (pure priority math + 8 unit tests). +- `crates/scan_controller/tests/poi_queue.rs` — new (6 integration + tests covering AC-1..AC-5 + DeclinePoi via operator command). + +## 4. Test results + +| Crate | Unit | Integration | Total | +|---|---|---|---| +| `scan_controller` | 26 | 11 (5 state_machine + 6 poi_queue) | 37 | + +Workspace `cargo test --workspace`: all suites green. The single +`mission_executor::state_machine::ac3_bounded_retry_then_success` +ignored test carries over from batch 8 — unchanged by this batch. + +Clippy: `cargo clippy -p scan_controller --all-targets -- -D warnings` +is clean. Workspace-wide clippy still hits the pre-existing +`autopilot::Runtime::vlm_provider_name` dead-code error from batch 4 +(see Findings §A1 / cumulative C5). + +### Acceptance criteria + +| AC | Source | Test | +|---|---|---| +| AC-1 priority ordering | `tests/poi_queue.rs::ac1_priority_ordering_via_handle` + `internal/poi_queue/mod.rs::orders_by_priority_score` | ✅ | +| AC-2 ≤5/min rolling cap | `tests/poi_queue.rs::ac2_five_per_minute_cap_via_handle` + `internal/poi_queue/mod.rs::cap_blocks_after_five_surfaces` | ✅ | +| AC-3 decision-window mapping | `tests/poi_queue.rs::ac3_decision_window_public_mapping` + `internal/poi_queue/priority.rs::decision_window_*` | ✅ | +| AC-4 confidence floor (no surface < 40 %) | `tests/poi_queue.rs::ac4_below_floor_never_surfaces` + `internal/poi_queue/priority.rs::decision_window_below_floor` | ✅ | +| AC-5 timeout sweep — silently forget | `tests/poi_queue.rs::ac5_tick_sweep_forgets_expired_pois` + `internal/poi_queue/mod.rs::timeout_sweep_*` | ✅ | +| Decline → IgnoredItem action | `tests/poi_queue.rs::decline_poi_via_operator_command_emits_action` | ✅ | + +## 5. Findings (this batch) + +### A1. Pre-existing dead-code error in `autopilot::Runtime::vlm_provider_name` + +**Severity**: High (still blocks workspace `-D warnings` clippy gate) +**Category**: Maintenance +**Origin**: Batch 4. Unchanged by this batch. + +Tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md`. +Carried as cumulative finding C5 — see §6. + +### A2. `submit_operator_cmd` return type changed + +**Severity**: Low (API) +**Detail**: Return type went from `Result<()>` to +`Result` so that `DeclinePoi` can hand back the +`DeclineAction` for AZ-685 to dispatch. No external caller exists yet +(operator-bridge wiring is AZ-685), so this is not a breaking change in +practice. Existing internal call sites (the `tests/state_machine.rs` +suite from batch 12) used `submit_operator_cmd` only for `MissionAbort` +/ `ReleaseTargetFollow` and only via the public handle; both now return +`SubmitOutcome::Accepted` and the existing tests still ignore the +return value via `.unwrap()`-style discard, so they continue to pass +unchanged. + +### A3. `Poi.priority` field is **not** mutated by the queue + +**Severity**: Low (Architecture / clarification) +**Detail**: The canonical `Poi.priority` field stays whatever the +producer set it to. The queue's internal `Entry` carries the +proximity/age factors needed for ordering separately. This keeps the +`Poi` model in `shared::models::poi` immutable from the queue's +perspective and avoids racing producers/consumers on `priority`. +Documented here in case AZ-684/685 expects to read a final priority +score from the surfaced `Poi`. + +## 6. Cumulative findings — open carry-over + +Batch-13 is one batch into a new triplet (13 / 14 / 15); cumulative +review will land at the end of batch 15. Carry-over from the batch-12 +cumulative review: + +| ID | Severity | Category | Status | +|---|---|---|---| +| C1 | Medium | Maintainability | OPEN — duplicated `SendCommandError` mapping in `gimbal_controller` (batches 9-10) | +| C2 | Low | Style | OPEN — `MavlinkCommandIssuer` naming inconsistency (batch 9) | +| C3 | Low | Architecture | OPEN — `module-layout.md` drift: now also covers `scan_controller/internal/poi_queue/{mod,priority}.rs` | +| C4 | Low | Architecture | OPEN — `data_model.md §PanPlan` definition still missing (batch 11) | +| C5 | High | Maintenance | OPEN — pre-existing `autopilot/runtime.rs::vlm_provider_name` dead-code error blocking workspace `-D warnings` clippy (batch 4 origin) | + +C3 grows by `poi_queue/{mod,priority}.rs` this batch. C5 is still the +most pressing; the next opportunity to fix it is either a dedicated +maintenance batch or sweep before merging dev. + +## 7. Next-batch candidates + +- **AZ-684** — scan_controller evidence ladder + VLM hooks. Now + unblocked by AZ-683 here, but still needs AZ-660 (detections wire) + and AZ-671 (VLM provider runtime) for end-to-end value. Could be + partially implemented as a "Tier-2 confirmation handler stub" today. +- **AZ-685** — mapobjects-store dispatch for confirmed POIs and + `IgnoredItem` (consumes the `DeclineAction` this batch returns). +- **AZ-659** — frame_ingest publisher (slow-consumer drop policy). +- **AZ-658** — frame_ingest decoder (still pending the retina/ffmpeg + pin decision). diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 3121feb..3a95053 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 20 + phase: 23 name: tracker-update-in-testing - detail: "batch 12 (AZ-657 + AZ-682) implemented and reviewed; awaiting commit + In Testing" + detail: "batch 13 (AZ-683) implemented and reviewed; awaiting commit + In Testing" retry_count: 0 cycle: 1 tracker: jira diff --git a/crates/scan_controller/Cargo.toml b/crates/scan_controller/Cargo.toml index 71237e6..6a3f54a 100644 --- a/crates/scan_controller/Cargo.toml +++ b/crates/scan_controller/Cargo.toml @@ -17,4 +17,6 @@ mission_executor = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } +chrono = { workspace = true } uuid = { workspace = true } diff --git a/crates/scan_controller/src/internal/mod.rs b/crates/scan_controller/src/internal/mod.rs index c6c72be..831b035 100644 --- a/crates/scan_controller/src/internal/mod.rs +++ b/crates/scan_controller/src/internal/mod.rs @@ -1,4 +1,5 @@ //! Internal modules for `scan_controller`. Not part of the public API. pub mod frame_rate_guard; +pub mod poi_queue; pub mod state_machine; diff --git a/crates/scan_controller/src/internal/poi_queue/mod.rs b/crates/scan_controller/src/internal/poi_queue/mod.rs new file mode 100644 index 0000000..eaf433b --- /dev/null +++ b/crates/scan_controller/src/internal/poi_queue/mod.rs @@ -0,0 +1,306 @@ +//! AZ-683 — POI queue + 5/min rate cap + confidence-scaled decision +//! window. +//! +//! The queue is the operator-facing buffer. Each candidate POI is +//! ranked by `confidence × proximity × age_factor` (per +//! `description.md §4`), the highest-priority unblocked POI is +//! surfaced to the operator, and the rolling 60-second cap ensures +//! the operator sees no more than 5 POIs per minute (per +//! `description.md §8`, an operator-cognitive-load invariant). +//! +//! Confidence below 40 % is NEVER surfaced — `decision_window` returns +//! `None`, the surface path skips, and the POI sits in the queue +//! until either: +//! +//! - new evidence pushes its confidence above 40 % (subsequent +//! `update_confidence` call — wired later by AZ-684 evidence +//! ladder), or +//! - its deadline expires and the timeout sweep forgets it (no +//! `IgnoredItem` recorded — silent forget). +//! +//! Decline is handled at the operator-command layer (AZ-685 dispatches +//! the resulting `IgnoredItem` into `mapobjects_store`). AZ-683 +//! returns the *information* needed to emit that action. + +use std::collections::VecDeque; +use std::time::Duration; + +use uuid::Uuid; + +use shared::models::poi::Poi; + +mod priority; + +pub use priority::{age_factor, decision_window, priority_score}; + +/// Operator-cognitive-load invariant from `description.md §8`. Hard +/// non-negotiable. +pub const SURFACE_CAP_PER_WINDOW: usize = 5; +/// The rolling window the cap is measured over. +pub const CAP_WINDOW: Duration = Duration::from_secs(60); + +/// Internal POI entry. We keep `confidence` and `proximity` outside +/// `Poi` so age-aware priority can be recomputed on demand without +/// mutating the canonical model. +#[derive(Debug, Clone)] +struct Entry { + poi: Poi, + confidence: f32, + proximity: f32, + enqueued_at_ns: u64, +} + +#[derive(Debug, Default)] +pub struct PoiQueue { + entries: Vec, + surface_history_ns: VecDeque, +} + +/// Information returned when a POI is declined. AZ-685 turns this +/// into a `MapObjectsAction::AppendIgnored` and persists it to +/// `mapobjects_store`. AZ-683 only emits the data. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DeclineAction { + pub poi_id: Uuid, + pub mgrs: String, + pub class_group: String, +} + +impl PoiQueue { + pub fn new() -> Self { + Self::default() + } + + pub fn len(&self) -> usize { + self.entries.len() + } + + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Insert a candidate POI. `proximity` is the normalized + /// `[0, 1]` proximity to the current camera position; AZ-682's + /// caller (eventually AZ-686 / AZ-684) computes it. `now_ns` is + /// the monotonic ns at insertion, used for age decay. + /// + /// POIs below the 40 % confidence threshold are still INSERTED + /// (per `description.md §4` they may be re-scored upward) but + /// are NOT surfaceable until `decision_window` returns `Some`. + pub fn insert(&mut self, poi: Poi, proximity: f32, now_ns: u64) { + let confidence = poi.confidence; + let proximity = proximity.clamp(0.0, 1.0); + self.entries.push(Entry { + poi, + confidence, + proximity, + enqueued_at_ns: now_ns, + }); + } + + pub fn update_confidence(&mut self, poi_id: Uuid, new_confidence: f32) { + if let Some(e) = self.entries.iter_mut().find(|e| e.poi.id == poi_id) { + e.confidence = new_confidence; + e.poi.confidence = new_confidence; + } + } + + /// Return the next POI to surface to the operator, or `None` + /// when either the cap is reached or no surfaceable POI exists. + /// + /// "Surfaceable" means `decision_window(confidence) == Some(_)` + /// — i.e. confidence ≥ 40 %. + pub fn next_for_surface(&mut self, now_ns: u64) -> Option { + self.trim_history(now_ns); + if self.surface_history_ns.len() >= SURFACE_CAP_PER_WINDOW { + return None; + } + + let best_idx = self + .entries + .iter() + .enumerate() + .filter(|(_, e)| decision_window(e.confidence).is_some()) + .max_by(|(_, a), (_, b)| { + let pa = priority_score(a.confidence, a.proximity, age_seconds(a, now_ns)); + let pb = priority_score(b.confidence, b.proximity, age_seconds(b, now_ns)); + pa.partial_cmp(&pb).unwrap_or(std::cmp::Ordering::Equal) + }) + .map(|(idx, _)| idx)?; + + let entry = self.entries.swap_remove(best_idx); + self.surface_history_ns.push_back(now_ns); + Some(entry.poi) + } + + /// Decline a POI by id. Removes from queue; returns the data + /// needed to record an `IgnoredItem`. + pub fn decline(&mut self, poi_id: Uuid) -> Option { + let idx = self.entries.iter().position(|e| e.poi.id == poi_id)?; + let entry = self.entries.swap_remove(idx); + Some(DeclineAction { + poi_id: entry.poi.id, + mgrs: entry.poi.mgrs, + class_group: entry.poi.class_group, + }) + } + + /// Drop POIs whose deadline (set at insertion by the caller per + /// the confidence-scaled window) has elapsed. Returns the IDs of + /// forgotten POIs. NO `IgnoredItem` is created — timeout = + /// forget, per AC-5. + pub fn timeout_sweep(&mut self, now_wallclock: chrono::DateTime) -> Vec { + let mut forgotten = Vec::new(); + self.entries.retain(|e| { + if e.poi.deadline <= now_wallclock { + forgotten.push(e.poi.id); + false + } else { + true + } + }); + forgotten + } + + /// Live read of how many POIs were surfaced in the rolling cap + /// window. Used by `health()` and metrics. + pub fn surfaces_in_window(&mut self, now_ns: u64) -> usize { + self.trim_history(now_ns); + self.surface_history_ns.len() + } + + fn trim_history(&mut self, now_ns: u64) { + let window_ns = CAP_WINDOW.as_nanos() as u64; + while let Some(&front) = self.surface_history_ns.front() { + if now_ns.saturating_sub(front) > window_ns { + self.surface_history_ns.pop_front(); + } else { + break; + } + } + } +} + +fn age_seconds(entry: &Entry, now_ns: u64) -> f32 { + let dt_ns = now_ns.saturating_sub(entry.enqueued_at_ns); + (dt_ns as f64 / 1e9) as f32 +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{Duration as ChronoDur, Utc}; + + fn poi(confidence: f32, mgrs: &str) -> Poi { + Poi { + id: Uuid::new_v4(), + confidence, + mgrs: mgrs.to_string(), + class: "tank".to_string(), + class_group: "armor".to_string(), + source_detection_ids: vec![], + enqueued_at: Utc::now(), + priority: 0.0, + decline_suppressed: false, + vlm_status: shared::models::poi::VlmPipelineStatus::NotRequested, + tier2_evidence: None, + deadline: Utc::now() + ChronoDur::seconds(60), + } + } + + /// AC-1 — priority ordering: `(0.9, 0.5, 0), (0.6, 0.9, 0), + /// (0.7, 0.6, 60)` ordered by `c × p × age_factor`. + #[test] + fn ac1_priority_ordering_respects_age_factor() { + // Arrange + let mut q = PoiQueue::new(); + let p1 = poi(0.9, "1"); + let p2 = poi(0.6, "2"); + let p3 = poi(0.7, "3"); + q.insert(p1.clone(), 0.5, 0); + q.insert(p2.clone(), 0.9, 0); + // POI 3 enqueued 60 seconds earlier (age 60 at now_ns = 60e9). + q.insert(p3.clone(), 0.6, 0); + let now_ns = 60 * 1_000_000_000u64; + + // Act + let first = q.next_for_surface(now_ns).expect("first surface"); + let second = q.next_for_surface(now_ns).expect("second surface"); + let third = q.next_for_surface(now_ns).expect("third surface"); + + // Assert — expected order: P2 (0.54) > P1 (0.45) > P3 (0.42 × + // age_factor(60)). + assert_eq!(first.id, p2.id, "highest priority first"); + assert_eq!(second.id, p1.id); + assert_eq!(third.id, p3.id); + } + + /// AC-2 — hard 5-per-rolling-60-s cap. + #[test] + fn ac2_five_per_minute_cap_holds_back_excess() { + // Arrange — 10 POIs all surfaceable. + let mut q = PoiQueue::new(); + for i in 0..10 { + let mgrs = format!("p{i}"); + q.insert(poi(0.8, &mgrs), 0.5, 0); + } + + // Act — drain in a 30 s window + let mut surfaced = 0; + let mut now = 0u64; + for _ in 0..10 { + if q.next_for_surface(now).is_some() { + surfaced += 1; + } + now += 3_000_000_000; + } + + // Assert — at most 5 within the rolling window + assert_eq!(surfaced, SURFACE_CAP_PER_WINDOW); + assert_eq!(q.surfaces_in_window(now), SURFACE_CAP_PER_WINDOW); + // Remaining POIs stay queued + assert_eq!(q.len(), 5); + + // Roll the window forward; cap should clear. + let after = 61 * 1_000_000_000u64 + now; + let next = q.next_for_surface(after); + assert!(next.is_some(), "cap must clear after window rolls"); + } + + /// AC-5 — timeout forgets without IgnoredItem. + #[test] + fn ac5_timeout_sweep_removes_expired_pois() { + // Arrange + let mut q = PoiQueue::new(); + let mut p = poi(0.8, "x"); + p.deadline = Utc::now() - ChronoDur::seconds(1); + let id = p.id; + q.insert(p, 0.5, 0); + + // Act + let forgotten = q.timeout_sweep(Utc::now()); + + // Assert + assert_eq!(forgotten, vec![id]); + assert!(q.is_empty()); + } + + /// Decline emits the dispatchable action and removes the POI. + #[test] + fn decline_removes_and_emits_action() { + // Arrange + let mut q = PoiQueue::new(); + let p = poi(0.8, "y"); + let id = p.id; + q.insert(p, 0.5, 0); + + // Act + let action = q.decline(id).expect("decline emits action"); + + // Assert + assert_eq!(action.poi_id, id); + assert_eq!(action.mgrs, "y"); + assert_eq!(action.class_group, "armor"); + assert!(q.is_empty()); + } +} diff --git a/crates/scan_controller/src/internal/poi_queue/priority.rs b/crates/scan_controller/src/internal/poi_queue/priority.rs new file mode 100644 index 0000000..2605e42 --- /dev/null +++ b/crates/scan_controller/src/internal/poi_queue/priority.rs @@ -0,0 +1,94 @@ +//! Pure scoring helpers for the POI queue. +//! +//! - [`decision_window`] — `40 % → 30 s, 100 % → 120 s, linear; <40 % +//! → None`. Operator-cognitive-load mapping from +//! `description.md §4`. +//! - [`age_factor`] — gentle linear decay so a 5-minute-old POI +//! ranks ~0 unless it has a very strong confidence × proximity +//! product. +//! - [`priority_score`] — the product `confidence × proximity × +//! age_factor(age_s)` used by `next_for_surface`. + +use std::time::Duration; + +/// Confidence below this is NEVER surfaced. `decision_window` +/// returns `None`, the queue skips the POI when scanning. +pub const SURFACE_CONFIDENCE_FLOOR: f32 = 0.40; + +const DEADLINE_AT_FLOOR: Duration = Duration::from_secs(30); +const DEADLINE_AT_CEILING: Duration = Duration::from_secs(120); +const AGE_DECAY_FULL_SECONDS: f32 = 300.0; +const AGE_FACTOR_MIN: f32 = 0.1; + +/// Linear `40 % → 30 s, 100 % → 120 s`; below 40 % returns `None`. +pub fn decision_window(confidence: f32) -> Option { + if confidence < SURFACE_CONFIDENCE_FLOOR { + return None; + } + let confidence = confidence.clamp(SURFACE_CONFIDENCE_FLOOR, 1.0); + let span = (confidence - SURFACE_CONFIDENCE_FLOOR) / (1.0 - SURFACE_CONFIDENCE_FLOOR); + let floor_s = DEADLINE_AT_FLOOR.as_secs_f32(); + let span_s = DEADLINE_AT_CEILING.as_secs_f32() - floor_s; + Some(Duration::from_secs_f32(floor_s + span * span_s)) +} + +/// Decay from `1.0` at age 0 down to `AGE_FACTOR_MIN` (`0.1`) at +/// `AGE_DECAY_FULL_SECONDS` (5 minutes) and beyond. Linear in between. +pub fn age_factor(age_seconds: f32) -> f32 { + if age_seconds <= 0.0 { + return 1.0; + } + let ratio = age_seconds / AGE_DECAY_FULL_SECONDS; + let decay = 1.0 - ratio * (1.0 - AGE_FACTOR_MIN); + decay.max(AGE_FACTOR_MIN) +} + +pub fn priority_score(confidence: f32, proximity: f32, age_seconds: f32) -> f32 { + confidence * proximity * age_factor(age_seconds) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// AC-3 — `[0.40, 0.70, 1.00]` → `[30 s, 75 s, 120 s]`, linear. + #[test] + fn ac3_decision_window_linear_mapping() { + // Assert + assert_eq!(decision_window(0.40), Some(Duration::from_secs(30))); + assert_eq!(decision_window(1.00), Some(Duration::from_secs(120))); + let mid = decision_window(0.70).expect("0.70 surfaceable"); + // Linear midpoint between 30 and 120 s is 75 s. + assert!( + (mid.as_secs_f32() - 75.0).abs() < 0.1, + "0.70 confidence should map to ~75 s, got {mid:?}" + ); + } + + /// AC-4 — sub-40 % is never surfaced. + #[test] + fn ac4_below_floor_returns_none() { + assert_eq!(decision_window(0.39), None); + assert_eq!(decision_window(0.0), None); + assert_eq!(decision_window(-0.5), None); + } + + #[test] + fn age_factor_decays_linearly() { + // Arrange + Assert + assert!((age_factor(0.0) - 1.0).abs() < 1e-6); + // At half-decay-window (150 s) the factor is exactly halfway + // between 1.0 and 0.1 → 0.55. + assert!((age_factor(150.0) - 0.55).abs() < 1e-3); + // At full-decay-window (300 s) the factor floors at 0.1. + assert!((age_factor(300.0) - 0.1).abs() < 1e-6); + assert!((age_factor(10_000.0) - 0.1).abs() < 1e-6); + } + + #[test] + fn priority_score_zero_for_zero_proximity_or_confidence() { + // Assert + assert_eq!(priority_score(0.0, 1.0, 0.0), 0.0); + assert_eq!(priority_score(1.0, 0.0, 0.0), 0.0); + } +} diff --git a/crates/scan_controller/src/lib.rs b/crates/scan_controller/src/lib.rs index f889075..3933631 100644 --- a/crates/scan_controller/src/lib.rs +++ b/crates/scan_controller/src/lib.rs @@ -33,14 +33,19 @@ use std::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; +use uuid::Uuid; use shared::error::{AutopilotError, Result}; use shared::health::{ComponentHealth, HealthLevel}; use shared::models::operator::{OperatorCommand, OperatorCommandKind}; +use shared::models::poi::Poi; pub mod internal; pub use internal::frame_rate_guard::{FrameRateGuard, FrameRateGuardConfig}; +pub use internal::poi_queue::{ + age_factor, decision_window, priority_score, DeclineAction, PoiQueue, SURFACE_CAP_PER_WINDOW, +}; pub use internal::state_machine::transitions::{transition, TransitionCtx}; pub use internal::state_machine::{RejectReason, ScanState, TransitionOutcome, Trigger}; @@ -69,9 +74,13 @@ struct Inner { state: ScanState, last_state_change_ns: u64, fps_guard: FrameRateGuard, + poi_queue: PoiQueue, latencies_us: std::collections::VecDeque, rejected_total: u64, transitions_total: u64, + pois_surfaced_total: u64, + pois_forgotten_total: u64, + pois_declined_total: u64, } impl Inner { @@ -104,9 +113,13 @@ impl ScanController { state: ScanState::ZoomedOut, last_state_change_ns: 0, fps_guard: FrameRateGuard::new(config.frame_rate), + poi_queue: PoiQueue::new(), latencies_us: std::collections::VecDeque::with_capacity(LATENCY_WINDOW), rejected_total: 0, transitions_total: 0, + pois_surfaced_total: 0, + pois_forgotten_total: 0, + pois_declined_total: 0, })), clock: shared::clock::MonoClock::new(), } @@ -132,6 +145,27 @@ pub struct ScanMetrics { pub rejected_total: u64, pub last_state_change_ns: u64, pub tick_latency_p99_us: u64, + pub poi_queue_len: usize, + pub pois_surfaced_total: u64, + pub pois_forgotten_total: u64, + pub pois_declined_total: u64, +} + +/// Result of [`ScanControllerHandle::submit_operator_cmd`]. `Accepted` +/// means the command was applied with no return data; `Declined` +/// carries the dispatchable IgnoredItem action AZ-685 must persist. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SubmitOutcome { + Accepted, + Declined(DeclineAction), +} + +fn poi_id_from_payload(payload: &serde_json::Value) -> Result { + let s = payload + .get("poi_id") + .and_then(|v| v.as_str()) + .ok_or_else(|| AutopilotError::Validation("payload missing poi_id".to_string()))?; + Uuid::parse_str(s).map_err(|e| AutopilotError::Validation(format!("invalid poi_id: {e}"))) } #[derive(Clone)] @@ -181,46 +215,108 @@ impl ScanControllerHandle { outcome } - /// One scan-controller tick. AZ-682 only re-evaluates the FPS - /// guard and records latency; AZ-684+ will run the evidence - /// ladder + POI queue evaluation under this same tick. + /// One scan-controller tick. Re-evaluates the FPS guard, runs + /// the POI queue timeout sweep (AZ-683), and records latency. + /// AZ-684+ will run the evidence ladder under this same tick. pub async fn tick(&self) -> Result<()> { let started = Instant::now(); let now = self.clock.elapsed_ns(); + let now_wall = chrono::Utc::now(); let mut inner = self.inner.lock().await; inner.fps_guard.tick(now); + let forgotten = inner.poi_queue.timeout_sweep(now_wall); + inner.pois_forgotten_total = inner + .pois_forgotten_total + .saturating_add(forgotten.len() as u64); inner.record_latency(started.elapsed().as_micros() as u64); Ok(()) } + /// AZ-683 — enqueue a POI candidate. `proximity` is the + /// normalized `[0, 1]` proximity to the current camera. The + /// caller is responsible for setting `poi.deadline` per the + /// confidence-scaled window (use `decision_window(confidence)` + /// to compute it). + pub async fn submit_poi_candidate(&self, poi: Poi, proximity: f32) { + let now = self.clock.elapsed_ns(); + let mut inner = self.inner.lock().await; + inner.poi_queue.insert(poi, proximity, now); + } + + /// AZ-683 — pull the next surfaceable POI subject to the rolling + /// 5/min cap. `None` if the cap is hit or no POI clears the + /// 40 % confidence floor. + pub async fn next_poi_for_surface(&self) -> Option { + let now = self.clock.elapsed_ns(); + let mut inner = self.inner.lock().await; + let p = inner.poi_queue.next_for_surface(now); + if p.is_some() { + inner.pois_surfaced_total = inner.pois_surfaced_total.saturating_add(1); + } + p + } + + /// AZ-683 — decline a POI. Returns the dispatchable + /// `IgnoredItem` data; the caller (AZ-685 mapobjects dispatch) + /// is responsible for persisting it. + pub async fn decline_poi(&self, poi_id: Uuid) -> Option { + let mut inner = self.inner.lock().await; + let action = inner.poi_queue.decline(poi_id); + if action.is_some() { + inner.pois_declined_total = inner.pois_declined_total.saturating_add(1); + } + action + } + + pub async fn poi_queue_len(&self) -> usize { + self.inner.lock().await.poi_queue.len() + } + + pub async fn pois_in_window(&self) -> usize { + let now = self.clock.elapsed_ns(); + self.inner.lock().await.poi_queue.surfaces_in_window(now) + } + /// Translate an operator command into a trigger and apply it. /// - /// **AZ-682 mapping** (partial — POI queue lookups belong to - /// AZ-683; until then, ConfirmPoi alone has no roi to bind so - /// it returns `Validation`). + /// AZ-682 / AZ-683 mapping (subset complete): /// - /// - `MissionAbort` → `Trigger::OperatorAbort` + /// - `MissionAbort` → `Trigger::OperatorAbort` (AZ-682). /// - `ReleaseTargetFollow` → `Trigger::OperatorReleaseFollow` - /// - `StartTargetFollow` (payload-bound) → not yet supported, - /// returns `NotImplemented(AZ-683)` since the target_id has to - /// be resolved via the POI queue. - /// - `ConfirmPoi` / `DeclinePoi` / `AcknowledgeBitDegraded` / - /// `SafetyOverride` → `NotImplemented(AZ-683/AZ-684)`. - pub async fn submit_operator_cmd(&self, command: OperatorCommand) -> Result<()> { + /// (AZ-682). + /// - `DeclinePoi { poi_id }` → queue decline; returns the + /// resulting `DeclineAction` in [`SubmitOutcome::Declined`] + /// for the caller (AZ-685 mapobjects dispatch) to persist + /// (AZ-683). + /// - `ConfirmPoi` / `StartTargetFollow` → still + /// `NotImplemented(AZ-684)` since ROI / target_id resolution + /// needs the evidence ladder. + /// - `AcknowledgeBitDegraded` / `SafetyOverride` → + /// `NotImplemented(AZ-684)`. + pub async fn submit_operator_cmd(&self, command: OperatorCommand) -> Result { match command.kind { OperatorCommandKind::MissionAbort => { self.submit_trigger(Trigger::OperatorAbort).await; - Ok(()) + Ok(SubmitOutcome::Accepted) } OperatorCommandKind::ReleaseTargetFollow => { self.submit_trigger(Trigger::OperatorReleaseFollow).await; - Ok(()) + Ok(SubmitOutcome::Accepted) + } + OperatorCommandKind::DeclinePoi => { + let poi_id = poi_id_from_payload(&command.payload)?; + match self.decline_poi(poi_id).await { + Some(action) => Ok(SubmitOutcome::Declined(action)), + None => Err(AutopilotError::Validation(format!( + "DeclinePoi: unknown poi_id {poi_id}" + ))), + } + } + OperatorCommandKind::ConfirmPoi | OperatorCommandKind::StartTargetFollow => { + Err(AutopilotError::NotImplemented( + "scan_controller::submit_operator_cmd (AZ-684 evidence ladder)", + )) } - OperatorCommandKind::ConfirmPoi - | OperatorCommandKind::DeclinePoi - | OperatorCommandKind::StartTargetFollow => Err(AutopilotError::NotImplemented( - "scan_controller::submit_operator_cmd (AZ-683 POI queue wiring)", - )), OperatorCommandKind::AcknowledgeBitDegraded => Err(AutopilotError::NotImplemented( "scan_controller::submit_operator_cmd (AZ-684 evidence ladder)", )), @@ -268,6 +364,10 @@ impl ScanControllerHandle { rejected_total: inner.rejected_total, last_state_change_ns: inner.last_state_change_ns, tick_latency_p99_us: inner.p99_us(), + poi_queue_len: inner.poi_queue.len(), + pois_surfaced_total: inner.pois_surfaced_total, + pois_forgotten_total: inner.pois_forgotten_total, + pois_declined_total: inner.pois_declined_total, } } @@ -276,10 +376,14 @@ impl ScanControllerHandle { let fps_active = inner.fps_guard.is_floor_active(); let p99 = inner.p99_us(); let state = inner.state; + let queue_len = inner.poi_queue.len(); drop(inner); let mut h = ComponentHealth::green(NAME); - let mut details: Vec = vec![format!("state={}", state.discriminant())]; + let mut details: Vec = vec![ + format!("state={}", state.discriminant()), + format!("poi_queue={queue_len}"), + ]; if fps_active { h.level = HealthLevel::Yellow; diff --git a/crates/scan_controller/tests/poi_queue.rs b/crates/scan_controller/tests/poi_queue.rs new file mode 100644 index 0000000..61c2ae8 --- /dev/null +++ b/crates/scan_controller/tests/poi_queue.rs @@ -0,0 +1,159 @@ +//! AZ-683 integration tests — POI queue + rate cap + decision-window +//! mapping exercised through the public `ScanControllerHandle` API. + +use chrono::{Duration as ChronoDur, Utc}; +use serde_json::json; +use uuid::Uuid; + +use scan_controller::{decision_window, ScanController, SubmitOutcome, SURFACE_CAP_PER_WINDOW}; +use shared::models::operator::{OperatorCommand, OperatorCommandKind}; +use shared::models::poi::{Poi, VlmPipelineStatus}; + +fn poi(confidence: f32, mgrs: &str) -> Poi { + Poi { + id: Uuid::new_v4(), + confidence, + mgrs: mgrs.to_string(), + class: "tank".to_string(), + class_group: "armor".to_string(), + source_detection_ids: vec![], + enqueued_at: Utc::now(), + priority: 0.0, + decline_suppressed: false, + vlm_status: VlmPipelineStatus::NotRequested, + tier2_evidence: None, + deadline: Utc::now() + ChronoDur::seconds(60), + } +} + +/// AC-1 — priority ordering through the public API. +#[tokio::test] +async fn ac1_priority_ordering_via_handle() { + // Arrange + let h = ScanController::new().handle(); + let p1 = poi(0.9, "1"); + let p2 = poi(0.6, "2"); + h.submit_poi_candidate(p1.clone(), 0.5).await; + h.submit_poi_candidate(p2.clone(), 0.9).await; + + // Act + let first = h.next_poi_for_surface().await.expect("first surface"); + + // Assert — p2 (0.6 × 0.9 = 0.54) outranks p1 (0.9 × 0.5 = 0.45). + assert_eq!(first.id, p2.id); +} + +/// AC-2 — the 5/min cap holds back excess POIs. +#[tokio::test] +async fn ac2_five_per_minute_cap_via_handle() { + // Arrange + let h = ScanController::new().handle(); + for i in 0..10 { + h.submit_poi_candidate(poi(0.8, &format!("m{i}")), 0.5) + .await; + } + + // Act + let mut surfaced = 0; + for _ in 0..10 { + if h.next_poi_for_surface().await.is_some() { + surfaced += 1; + } + } + + // Assert + assert_eq!(surfaced, SURFACE_CAP_PER_WINDOW); + assert_eq!(h.pois_in_window().await, SURFACE_CAP_PER_WINDOW); + assert_eq!(h.poi_queue_len().await, 5); +} + +/// AC-3 — decision window linear mapping is exported via the +/// `decision_window` re-export. (The pure logic is tested in unit; +/// this is the smoke test that the public function is wired up.) +#[tokio::test] +async fn ac3_decision_window_public_mapping() { + // Assert + assert_eq!( + decision_window(0.40).unwrap().as_secs(), + 30, + "floor maps to 30 s" + ); + assert_eq!( + decision_window(1.00).unwrap().as_secs(), + 120, + "ceiling maps to 120 s" + ); + assert!(decision_window(0.39).is_none(), "sub-floor returns None"); +} + +/// AC-4 — POIs below 40 % confidence enqueue but never surface. +#[tokio::test] +async fn ac4_below_floor_never_surfaces() { + // Arrange + let h = ScanController::new().handle(); + h.submit_poi_candidate(poi(0.39, "low"), 0.9).await; + h.submit_poi_candidate(poi(0.20, "lower"), 0.9).await; + + // Act + let surfaced = h.next_poi_for_surface().await; + + // Assert + assert!(surfaced.is_none(), "sub-40% POIs must not surface"); + assert_eq!(h.poi_queue_len().await, 2, "POIs remain in queue"); +} + +/// AC-5 — timeout sweep forgets expired POIs without emitting any +/// IgnoredItem. +#[tokio::test] +async fn ac5_tick_sweep_forgets_expired_pois() { + // Arrange — POI with an already-expired deadline. + let h = ScanController::new().handle(); + let mut p = poi(0.8, "expired"); + p.deadline = Utc::now() - ChronoDur::seconds(1); + h.submit_poi_candidate(p, 0.5).await; + assert_eq!(h.poi_queue_len().await, 1); + + // Act + h.tick().await.expect("tick"); + + // Assert + assert_eq!(h.poi_queue_len().await, 0); + let metrics = h.metrics().await; + assert_eq!(metrics.pois_forgotten_total, 1); + assert_eq!(metrics.pois_declined_total, 0, "no IgnoredItem on timeout"); +} + +/// DeclinePoi via operator command returns a `SubmitOutcome::Declined` +/// carrying the IgnoredItem payload AZ-685 will persist. +#[tokio::test] +async fn decline_poi_via_operator_command_emits_action() { + // Arrange + let h = ScanController::new().handle(); + let p = poi(0.8, "decline-me"); + let id = p.id; + h.submit_poi_candidate(p, 0.5).await; + + let cmd = OperatorCommand { + command_id: Uuid::new_v4(), + session_token: "s".to_string(), + sequence_number: 1, + issued_at_wallclock: Utc::now(), + kind: OperatorCommandKind::DeclinePoi, + payload: json!({ "poi_id": id.to_string() }), + signature: vec![], + }; + + // Act + let outcome = h.submit_operator_cmd(cmd).await.expect("decline accepted"); + + // Assert + match outcome { + SubmitOutcome::Declined(action) => { + assert_eq!(action.poi_id, id); + assert_eq!(action.mgrs, "decline-me"); + assert_eq!(action.class_group, "armor"); + } + SubmitOutcome::Accepted => panic!("decline must return Declined action"), + } + assert_eq!(h.poi_queue_len().await, 0); +}