[AZ-683] scan_controller POI queue + 5/min cap + decision window
ci/woodpecker/push/build-arm Pipeline failed

Adds the prioritized POI queue on top of the AZ-682 FSM substrate:
priority = confidence x proximity x age_factor; rolling 60s window
caps surfaces at 5; confidence-scaled decision window (40% -> 30s,
100% -> 120s, linear; <40% never surfaces); tick() runs the timeout
sweep and silently forgets expired POIs (no IgnoredItem per spec);
DeclinePoi via operator command returns a DeclineAction for AZ-685
to persist.

ScanControllerHandle gains submit_poi_candidate /
next_poi_for_surface / decline_poi / poi_queue_len /
pois_in_window. submit_operator_cmd return type widens from
Result<()> to Result<SubmitOutcome>. ScanMetrics and health()
surface queue depth and counters.

Tests: 26 unit + 11 integration in scan_controller (all AC1..AC5 +
DeclinePoi end-to-end). Workspace clippy on scan_controller clean.
Pre-existing autopilot::Runtime::vlm_provider_name dead-code error
from batch 4 still open (see cumulative C5).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-20 09:04:29 +03:00
parent 745ab806f1
commit 9fe0bbeac9
10 changed files with 885 additions and 23 deletions
+159
View File
@@ -0,0 +1,159 @@
//! AZ-683 integration tests — POI queue + rate cap + decision-window
//! mapping exercised through the public `ScanControllerHandle` API.
use chrono::{Duration as ChronoDur, Utc};
use serde_json::json;
use uuid::Uuid;
use scan_controller::{decision_window, ScanController, SubmitOutcome, SURFACE_CAP_PER_WINDOW};
use shared::models::operator::{OperatorCommand, OperatorCommandKind};
use shared::models::poi::{Poi, VlmPipelineStatus};
fn poi(confidence: f32, mgrs: &str) -> Poi {
Poi {
id: Uuid::new_v4(),
confidence,
mgrs: mgrs.to_string(),
class: "tank".to_string(),
class_group: "armor".to_string(),
source_detection_ids: vec![],
enqueued_at: Utc::now(),
priority: 0.0,
decline_suppressed: false,
vlm_status: VlmPipelineStatus::NotRequested,
tier2_evidence: None,
deadline: Utc::now() + ChronoDur::seconds(60),
}
}
/// AC-1 — priority ordering through the public API.
#[tokio::test]
async fn ac1_priority_ordering_via_handle() {
// Arrange
let h = ScanController::new().handle();
let p1 = poi(0.9, "1");
let p2 = poi(0.6, "2");
h.submit_poi_candidate(p1.clone(), 0.5).await;
h.submit_poi_candidate(p2.clone(), 0.9).await;
// Act
let first = h.next_poi_for_surface().await.expect("first surface");
// Assert — p2 (0.6 × 0.9 = 0.54) outranks p1 (0.9 × 0.5 = 0.45).
assert_eq!(first.id, p2.id);
}
/// AC-2 — the 5/min cap holds back excess POIs.
#[tokio::test]
async fn ac2_five_per_minute_cap_via_handle() {
// Arrange
let h = ScanController::new().handle();
for i in 0..10 {
h.submit_poi_candidate(poi(0.8, &format!("m{i}")), 0.5)
.await;
}
// Act
let mut surfaced = 0;
for _ in 0..10 {
if h.next_poi_for_surface().await.is_some() {
surfaced += 1;
}
}
// Assert
assert_eq!(surfaced, SURFACE_CAP_PER_WINDOW);
assert_eq!(h.pois_in_window().await, SURFACE_CAP_PER_WINDOW);
assert_eq!(h.poi_queue_len().await, 5);
}
/// AC-3 — decision window linear mapping is exported via the
/// `decision_window` re-export. (The pure logic is tested in unit;
/// this is the smoke test that the public function is wired up.)
#[tokio::test]
async fn ac3_decision_window_public_mapping() {
// Assert
assert_eq!(
decision_window(0.40).unwrap().as_secs(),
30,
"floor maps to 30 s"
);
assert_eq!(
decision_window(1.00).unwrap().as_secs(),
120,
"ceiling maps to 120 s"
);
assert!(decision_window(0.39).is_none(), "sub-floor returns None");
}
/// AC-4 — POIs below 40 % confidence enqueue but never surface.
#[tokio::test]
async fn ac4_below_floor_never_surfaces() {
// Arrange
let h = ScanController::new().handle();
h.submit_poi_candidate(poi(0.39, "low"), 0.9).await;
h.submit_poi_candidate(poi(0.20, "lower"), 0.9).await;
// Act
let surfaced = h.next_poi_for_surface().await;
// Assert
assert!(surfaced.is_none(), "sub-40% POIs must not surface");
assert_eq!(h.poi_queue_len().await, 2, "POIs remain in queue");
}
/// AC-5 — timeout sweep forgets expired POIs without emitting any
/// IgnoredItem.
#[tokio::test]
async fn ac5_tick_sweep_forgets_expired_pois() {
// Arrange — POI with an already-expired deadline.
let h = ScanController::new().handle();
let mut p = poi(0.8, "expired");
p.deadline = Utc::now() - ChronoDur::seconds(1);
h.submit_poi_candidate(p, 0.5).await;
assert_eq!(h.poi_queue_len().await, 1);
// Act
h.tick().await.expect("tick");
// Assert
assert_eq!(h.poi_queue_len().await, 0);
let metrics = h.metrics().await;
assert_eq!(metrics.pois_forgotten_total, 1);
assert_eq!(metrics.pois_declined_total, 0, "no IgnoredItem on timeout");
}
/// DeclinePoi via operator command returns a `SubmitOutcome::Declined`
/// carrying the IgnoredItem payload AZ-685 will persist.
#[tokio::test]
async fn decline_poi_via_operator_command_emits_action() {
// Arrange
let h = ScanController::new().handle();
let p = poi(0.8, "decline-me");
let id = p.id;
h.submit_poi_candidate(p, 0.5).await;
let cmd = OperatorCommand {
command_id: Uuid::new_v4(),
session_token: "s".to_string(),
sequence_number: 1,
issued_at_wallclock: Utc::now(),
kind: OperatorCommandKind::DeclinePoi,
payload: json!({ "poi_id": id.to_string() }),
signature: vec![],
};
// Act
let outcome = h.submit_operator_cmd(cmd).await.expect("decline accepted");
// Assert
match outcome {
SubmitOutcome::Declined(action) => {
assert_eq!(action.poi_id, id);
assert_eq!(action.mgrs, "decline-me");
assert_eq!(action.class_group, "armor");
}
SubmitOutcome::Accepted => panic!("decline must return Declined action"),
}
assert_eq!(h.poi_queue_len().await, 0);
}