Files
Oleksandr Bezdieniezhnykh c4eff40dbc [AZ-680] [AZ-681] operator_bridge command dispatch + safety lane
Add the operator-command dispatcher behind a typed CommandAck:
60 s per-command-id idempotency cache, surfaced-POI registry with
unknown_poi_id + expired gates, BIT-degraded ack severity check, and
SafetyOverride forwarding to mission_executor with structured audit
log (redacts signature + session_token).

Cross-layer wiring goes through three new traits in shared::contracts
(ScanCommandRouter, MissionSafetyRouter, BitReportSeverityLookup) so
operator_bridge stays free of direct scan_controller / mission_executor
imports. scan_controller::ScanControllerHandle implements the scan
router; a new mission_executor::SafetyDispatchHandle wraps the BIT
ack channel + battery monitor handle and implements the safety router;
BitControllerHandle gains a bounded (16-entry) report-severity cache
for the lookup trait.

scan_controller also picks up ConfirmPoi handling: PoiQueue::confirm
removes the entry and SubmitOutcome::Confirmed carries the typed
(target_mgrs, target_class) hint for AZ-684/AZ-686 downstream.

Tests: 9 new integration tests in operator_bridge/tests/dispatcher.rs
cover AZ-680 AC-1..AC-5 + AZ-681 AC-1..AC-4. scan_controller adds 2
ConfirmPoi tests. All modified-crate suites green; one pre-existing
mission_executor state-machine test flake (already documented in
_docs/_process_leftovers) updated to note ac1 also affected.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 17:32:59 +03:00

440 lines
13 KiB
Rust

//! AZ-680 + AZ-681 — operator-command dispatcher acceptance tests.
//!
//! These tests exercise the dispatcher through the public
//! `OperatorBridgeHandle::dispatch_command` surface so the wiring
//! between the surfaced-POI registry, the idempotency cache, the
//! scan router, the safety router, the BIT severity lookup, and the
//! audit sink is covered end-to-end.
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use async_trait::async_trait;
use chrono::{Duration as ChronoDuration, Utc};
use parking_lot::Mutex;
use serde_json::json;
use uuid::Uuid;
use operator_bridge::{
ack_reasons, AuditEntry, AuditSink, CommandAck, OperatorBridge, SurfacedPoi,
};
use shared::contracts::{BitReportSeverityLookup, MissionSafetyRouter, ScanCommandRouter};
use shared::error::Result;
use shared::models::operator::{OperatorCommand, OperatorCommandKind, SafetyOverrideScope};
// ============================================================================
// Test doubles
// ============================================================================
/// Scan-router test double: remembers every command passed to `route`
/// so a test can inspect what the dispatcher forwarded.
#[derive(Default)]
struct RecordingScanRouter {
    /// Commands received by `route`, in arrival order.
    calls: StdMutex<Vec<OperatorCommand>>,
}

#[async_trait]
impl ScanCommandRouter for RecordingScanRouter {
    /// Appends `command` to `calls` and always reports success.
    ///
    /// Panics if the mutex guarding `calls` has been poisoned.
    async fn route(&self, command: OperatorCommand) -> Result<()> {
        let mut recorded = self.calls.lock().unwrap();
        recorded.push(command);
        Ok(())
    }
}
/// Safety-router test double: captures the arguments of each call so
/// tests can assert on what the dispatcher forwarded.
#[derive(Default)]
struct RecordingSafetyRouter {
    /// `(report_id, operator_id)` for every `acknowledge_bit_degraded` call.
    bit_acks: StdMutex<Vec<(Uuid, Option<String>)>>,
    /// `(scope, duration_secs, operator_id, rationale)` for every
    /// `apply_safety_override` call.
    overrides: StdMutex<Vec<(SafetyOverrideScope, u32, String, String)>>,
}

#[async_trait]
impl MissionSafetyRouter for RecordingSafetyRouter {
    /// Records the acknowledgement and always reports success.
    ///
    /// Panics if the mutex guarding `bit_acks` has been poisoned.
    async fn acknowledge_bit_degraded(
        &self,
        report_id: Uuid,
        operator_id: Option<String>,
    ) -> Result<()> {
        let ack = (report_id, operator_id);
        let mut recorded = self.bit_acks.lock().unwrap();
        recorded.push(ack);
        Ok(())
    }

    /// Records the override request and always reports success.
    ///
    /// Panics if the mutex guarding `overrides` has been poisoned.
    async fn apply_safety_override(
        &self,
        scope: SafetyOverrideScope,
        duration_secs: u32,
        operator_id: String,
        rationale: String,
    ) -> Result<()> {
        let request = (scope, duration_secs, operator_id, rationale);
        let mut recorded = self.overrides.lock().unwrap();
        recorded.push(request);
        Ok(())
    }
}
/// BIT severity lookup backed by an in-memory table that tests fill
/// through [`StubBitSeverity::set`].
///
/// A lookup yields `Some(true)` for an acknowledgeable (Degraded)
/// report, `Some(false)` for a Fail report, and `None` for an id that
/// was never registered.
#[derive(Default)]
struct StubBitSeverity {
    /// Registered answers, keyed by report id.
    inner: StdMutex<std::collections::HashMap<Uuid, bool>>,
}

impl StubBitSeverity {
    /// Registers (or overwrites) the answer returned for `report_id`.
    fn set(&self, report_id: Uuid, acknowledgeable: bool) {
        let mut table = self.inner.lock().unwrap();
        table.insert(report_id, acknowledgeable);
    }
}

#[async_trait]
impl BitReportSeverityLookup for StubBitSeverity {
    /// Returns the registered answer for `report_id`, or `None` when
    /// nothing was registered for it.
    async fn is_acknowledgeable(&self, report_id: Uuid) -> Option<bool> {
        let table = self.inner.lock().unwrap();
        table.get(&report_id).copied()
    }
}
/// Audit-sink test double that keeps every recorded entry in memory.
///
/// The entry list sits behind an `Arc`, so clones of the sink append
/// to, and read from, the same list.
#[derive(Default, Clone)]
struct RecordingAuditSink {
    /// Entries received by `record`, in arrival order.
    entries: Arc<Mutex<Vec<AuditEntry>>>,
}

#[async_trait]
impl AuditSink for RecordingAuditSink {
    /// Appends `entry` to the shared entry list.
    async fn record(&self, entry: AuditEntry) {
        let mut log = self.entries.lock();
        log.push(entry);
    }
}
// ============================================================================
// Helpers
// ============================================================================
/// Builds an `OperatorCommand` carrying `kind` and `payload`.
///
/// Each call gets a fresh random `command_id` and the current
/// wall-clock time; the session token is the fixed string `"session"`,
/// the sequence number is `1`, and the signature is empty.
fn cmd(kind: OperatorCommandKind, payload: serde_json::Value) -> OperatorCommand {
    let command_id = Uuid::new_v4();
    let issued_at_wallclock = Utc::now();
    OperatorCommand {
        command_id,
        session_token: String::from("session"),
        sequence_number: 1,
        issued_at_wallclock,
        kind,
        payload,
        signature: Vec::new(),
    }
}
/// Builds a `SurfacedPoi` with a fresh random id, fixed MGRS / class
/// group values, and a deadline `deadline_secs` seconds from now
/// (a negative value yields a deadline in the past).
fn surfaced(deadline_secs: i64) -> SurfacedPoi {
    let poi_id = Uuid::new_v4();
    let time_to_live = ChronoDuration::seconds(deadline_secs);
    let deadline = Utc::now() + time_to_live;
    SurfacedPoi {
        poi_id,
        mgrs: "33UWP05".into(),
        class_group: "vehicle".into(),
        deadline,
    }
}
/// Test fixture returned by `harness()`: the bridge under test plus
/// handles to the test doubles wired into it, so a test can both
/// drive the bridge and inspect what the doubles recorded.
struct Harness {
// Bridge under test, built with the doubles below attached.
bridge: OperatorBridge,
// Records every command forwarded to the scan router.
scan: Arc<RecordingScanRouter>,
// Records BIT acknowledgements and safety overrides.
safety: Arc<RecordingSafetyRouter>,
// Lets a test register per-report severity answers.
severity: Arc<StubBitSeverity>,
// Shares its entry list with the sink handed to the bridge.
audit: RecordingAuditSink,
}
/// Builds an `OperatorBridge` wired to fresh recording test doubles
/// and returns it together with handles to those doubles.
fn harness() -> Harness {
    let scan = Arc::new(RecordingScanRouter::default());
    let safety = Arc::new(RecordingSafetyRouter::default());
    let severity = Arc::new(StubBitSeverity::default());
    let audit = RecordingAuditSink::default();

    // Trait-object views handed to the bridge; the concrete handles
    // stay in the harness so tests can read what was recorded.
    let scan_router: Arc<dyn ScanCommandRouter> = scan.clone();
    let safety_router: Arc<dyn MissionSafetyRouter> = safety.clone();
    let severity_lookup: Arc<dyn BitReportSeverityLookup> = severity.clone();
    let audit_sink: Arc<dyn AuditSink> = Arc::new(audit.clone());

    // NOTE(review): the `8` is presumably a capacity — confirm against
    // `OperatorBridge::new`.
    let bridge = OperatorBridge::new(8)
        .with_scan_router(scan_router)
        .with_safety_router(safety_router)
        .with_bit_severity_lookup(severity_lookup)
        .with_audit_sink(audit_sink)
        .with_dispatcher();

    Harness {
        bridge,
        scan,
        safety,
        severity,
        audit,
    }
}
// ============================================================================
// AZ-680 ACs
// ============================================================================
/// AZ-680 AC-1 — a `ConfirmPoi` for a surfaced, unexpired POI is
/// acknowledged `Ok` and reaches the scan router exactly once.
#[tokio::test]
async fn az680_ac1_confirm_forwards_to_scan_router() {
    // Arrange — surface a POI whose deadline is two minutes away.
    let fixture = harness();
    let handle = fixture.bridge.handle();
    let poi = surfaced(120);
    let poi_id = poi.poi_id;
    fixture.bridge.surfaced_registry().record(poi);
    let command = cmd(
        OperatorCommandKind::ConfirmPoi,
        json!({ "poi_id": poi_id.to_string() }),
    );

    // Act
    let ack = handle.dispatch_command(command).await;

    // Assert
    assert_eq!(ack, CommandAck::Ok);
    let routed = fixture.scan.calls.lock().unwrap();
    assert_eq!(routed.len(), 1, "scan_router::route called exactly once");
    let forwarded = &routed[0];
    assert!(matches!(forwarded.kind, OperatorCommandKind::ConfirmPoi));
}
/// AZ-680 AC-2 — dispatching the same command (same `command_id`)
/// twice yields `Ok` both times while the scan router is invoked only
/// once.
#[tokio::test]
async fn az680_ac2_retransmit_returns_cached_ack() {
    // Arrange — copy the id out first so the POI can be moved into the
    // registry without cloning it.
    let h = harness();
    let handle = h.bridge.handle();
    let poi = surfaced(120);
    let poi_id = poi.poi_id;
    h.bridge.surfaced_registry().record(poi);
    let command = cmd(
        OperatorCommandKind::ConfirmPoi,
        json!({ "poi_id": poi_id.to_string() }),
    );

    // Act — same command_id dispatched twice. Only the first dispatch
    // needs a clone; the retransmit consumes the original.
    let ack1 = handle.dispatch_command(command.clone()).await;
    let ack2 = handle.dispatch_command(command).await;

    // Assert
    assert_eq!(ack1, CommandAck::Ok);
    assert_eq!(ack2, CommandAck::Ok);
    let calls = h.scan.calls.lock().unwrap();
    assert_eq!(
        calls.len(),
        1,
        "scan_router::route must be invoked exactly once across retransmits"
    );
}
/// AZ-680 AC-3 — Unknown POI id rejected.
#[tokio::test]
async fn az680_ac3_unknown_poi_id_rejected() {
// Arrange
let h = harness();
let handle = h.bridge.handle();
// Act — POI id never surfaced
let ack = handle
.dispatch_command(cmd(
OperatorCommandKind::ConfirmPoi,
json!({ "poi_id": Uuid::new_v4().to_string() }),
))
.await;
// Assert
assert_eq!(ack.reason(), Some(ack_reasons::UNKNOWN_POI_ID));
assert!(
h.scan.calls.lock().unwrap().is_empty(),
"scan_router must not be invoked"
);
}
/// AZ-680 AC-4 — Expired POI rejected.
#[tokio::test]
async fn az680_ac4_expired_poi_rejected() {
// Arrange — surface a POI whose deadline has already passed.
let h = harness();
let handle = h.bridge.handle();
let expired = SurfacedPoi {
deadline: Utc::now() - ChronoDuration::seconds(1),
..surfaced(0)
};
h.bridge.surfaced_registry().record(expired.clone());
// Act
let ack = handle
.dispatch_command(cmd(
OperatorCommandKind::ConfirmPoi,
json!({ "poi_id": expired.poi_id.to_string() }),
))
.await;
// Assert
assert_eq!(ack.reason(), Some(ack_reasons::EXPIRED));
assert!(
h.scan.calls.lock().unwrap().is_empty(),
"scan_router must not be invoked on expired POI"
);
}
/// AZ-680 AC-5 — a `DeclinePoi` for a surfaced, unexpired POI is
/// acknowledged `Ok` and reaches the scan router exactly once.
///
/// Only the forwarding is verified here; what the scan controller does
/// with the declined POI is outside this test's view.
#[tokio::test]
async fn az680_ac5_decline_forwards_to_scan_router() {
    // Arrange
    let fixture = harness();
    let handle = fixture.bridge.handle();
    let poi = surfaced(120);
    let poi_id = poi.poi_id;
    fixture.bridge.surfaced_registry().record(poi);
    let command = cmd(
        OperatorCommandKind::DeclinePoi,
        json!({ "poi_id": poi_id.to_string() }),
    );

    // Act
    let ack = handle.dispatch_command(command).await;

    // Assert
    assert_eq!(ack, CommandAck::Ok);
    let routed = fixture.scan.calls.lock().unwrap();
    assert_eq!(
        routed.len(),
        1,
        "DeclinePoi must reach scan_router exactly once"
    );
    let forwarded = &routed[0];
    assert!(matches!(forwarded.kind, OperatorCommandKind::DeclinePoi));
}
// ============================================================================
// AZ-681 ACs
// ============================================================================
/// AZ-681 AC-1 — acknowledging a report the severity lookup marks as
/// acknowledgeable is acked `Ok` and forwarded to the safety router
/// with the report id and operator id.
#[tokio::test]
async fn az681_ac1_bit_degraded_ack_forwards() {
    // Arrange — register the report as acknowledgeable.
    let fixture = harness();
    let handle = fixture.bridge.handle();
    let report_id = Uuid::new_v4();
    fixture.severity.set(report_id, true);
    let command = cmd(
        OperatorCommandKind::AcknowledgeBitDegraded,
        json!({ "report_id": report_id.to_string(), "operator_id": "op1" }),
    );

    // Act
    let ack = handle.dispatch_command(command).await;

    // Assert
    assert_eq!(ack, CommandAck::Ok);
    let forwarded = fixture.safety.bit_acks.lock().unwrap();
    assert_eq!(forwarded.len(), 1);
    let expected = (report_id, Some("op1".to_string()));
    assert_eq!(forwarded[0], expected);
}
/// AZ-681 AC-2 — acknowledging a report the severity lookup marks as
/// not acknowledgeable is rejected with `CANNOT_ACKNOWLEDGE_FAIL` and
/// never reaches the safety router.
#[tokio::test]
async fn az681_ac2_bit_fail_ack_rejected() {
    // Arrange — register the report as NOT acknowledgeable.
    let fixture = harness();
    let handle = fixture.bridge.handle();
    let report_id = Uuid::new_v4();
    fixture.severity.set(report_id, false);
    let command = cmd(
        OperatorCommandKind::AcknowledgeBitDegraded,
        json!({ "report_id": report_id.to_string(), "operator_id": "op1" }),
    );

    // Act
    let ack = handle.dispatch_command(command).await;

    // Assert
    assert_eq!(ack.reason(), Some(ack_reasons::CANNOT_ACKNOWLEDGE_FAIL));
    let forwarded = fixture.safety.bit_acks.lock().unwrap();
    assert!(
        forwarded.is_empty(),
        "safety_router must not be invoked on Fail report"
    );
}
/// AZ-681 AC-3 — a `SafetyOverride` is acked `Ok`, forwarded to the
/// safety router with its scope, duration, operator id and rationale,
/// and produces exactly one safety-override audit entry.
#[tokio::test]
async fn az681_ac3_safety_override_forwards_with_audit_entry() {
    // Arrange
    let h = harness();
    let handle = h.bridge.handle();

    // Act
    let ack = handle
        .dispatch_command(cmd(
            OperatorCommandKind::SafetyOverride,
            json!({
                "scope": "battery_rtl",
                "duration_secs": 60,
                "operator_id": "op1",
                "rationale": "post-mission RTL too aggressive"
            }),
        ))
        .await;

    // Assert — router invoked once with every forwarded field intact.
    // The rationale is checked too: the double records it, and leaving
    // it unasserted would let a dropped or swapped argument go unnoticed.
    assert_eq!(ack, CommandAck::Ok);
    let overrides = h.safety.overrides.lock().unwrap();
    assert_eq!(overrides.len(), 1);
    let (scope, duration_secs, operator_id, rationale) = &overrides[0];
    assert_eq!(*scope, SafetyOverrideScope::BatteryRtl);
    assert_eq!(*duration_secs, 60);
    assert_eq!(operator_id, "op1");
    assert_eq!(rationale, "post-mission RTL too aggressive");

    // Assert — audit log has exactly one safety-override entry.
    let entries = h.audit.entries.lock();
    let safety_entries: Vec<_> = entries
        .iter()
        .filter(|e| matches!(e, AuditEntry::SafetyOverride { .. }))
        .collect();
    assert_eq!(safety_entries.len(), 1);
    match safety_entries[0] {
        AuditEntry::SafetyOverride {
            scope,
            duration_secs,
            operator_id,
            outcome,
            ..
        } => {
            assert_eq!(*scope, SafetyOverrideScope::BatteryRtl);
            assert_eq!(*duration_secs, 60);
            assert_eq!(operator_id.as_deref(), Some("op1"));
            assert_eq!(outcome, &CommandAck::Ok);
        }
        // The filter above keeps only `SafetyOverride` entries.
        _ => unreachable!(),
    }
}
/// AZ-681 AC-4 — no audit entry, once serialised to JSON, mentions
/// `signature` or `session_token`.
#[tokio::test]
async fn az681_ac4_audit_log_contains_no_signature_or_session_token() {
    // Arrange
    let fixture = harness();
    let handle = fixture.bridge.handle();
    let payload = json!({
        "scope": "battery_rtl",
        "duration_secs": 30,
        "operator_id": "op1",
        "rationale": "test"
    });

    // Act — the ack is irrelevant here; only the audit trail matters.
    let _ = handle
        .dispatch_command(cmd(OperatorCommandKind::SafetyOverride, payload))
        .await;

    // Assert — at least one entry was written, and none of them
    // contains either secret field name.
    // NOTE(review): `cmd()` uses an empty signature and the token
    // "session", so this checks field names only, not leaked values —
    // consider a distinctive token/signature if value leaks matter.
    let entries = fixture.audit.entries.lock();
    assert!(!entries.is_empty());
    let serialised = entries
        .iter()
        .map(|entry| serde_json::to_string(entry).expect("serialises"));
    for json in serialised {
        assert!(
            !json.contains("signature"),
            "audit entry leaked signature: {json}"
        );
        assert!(
            !json.contains("session_token"),
            "audit entry leaked session_token: {json}"
        );
    }
}