mirror of
https://github.com/azaion/autopilot.git
synced 2026-06-21 12:11:10 +00:00
c4eff40dbc
Add the operator-command dispatcher behind a typed CommandAck: 60 s per-command-id idempotency cache, surfaced-POI registry with unknown_poi_id + expired gates, BIT-degraded ack severity check, and SafetyOverride forwarding to mission_executor with structured audit log (redacts signature + session_token). Cross-layer wiring goes through three new traits in shared::contracts (ScanCommandRouter, MissionSafetyRouter, BitReportSeverityLookup) so operator_bridge stays free of direct scan_controller / mission_executor imports. scan_controller::ScanControllerHandle implements the scan router; a new mission_executor::SafetyDispatchHandle wraps the BIT ack channel + battery monitor handle and implements the safety router; BitControllerHandle gains a bounded (16-entry) report-severity cache for the lookup trait. scan_controller also picks up ConfirmPoi handling: PoiQueue::confirm removes the entry and SubmitOutcome::Confirmed carries the typed (target_mgrs, target_class) hint for AZ-684/AZ-686 downstream. Tests: 9 new integration tests in operator_bridge/tests/dispatcher.rs cover AZ-680 AC-1..AC-5 + AZ-681 AC-1..AC-4. scan_controller adds 2 ConfirmPoi tests. All modified-crate suites green; one pre-existing mission_executor state-machine test flake (already documented in _docs/_process_leftovers) updated to note ac1 also affected. Co-authored-by: Cursor <cursoragent@cursor.com>
440 lines
13 KiB
Rust
440 lines
13 KiB
Rust
//! AZ-680 + AZ-681 — operator-command dispatcher acceptance tests.
|
|
//!
|
|
//! These tests exercise the dispatcher through the public
|
|
//! `OperatorBridgeHandle::dispatch_command` surface so the wiring
|
|
//! between the surfaced-POI registry, the idempotency cache, the
|
|
//! scan router, the safety router, the BIT severity lookup, and the
|
|
//! audit sink is covered end-to-end.
|
|
|
|
use std::sync::Arc;
|
|
use std::sync::Mutex as StdMutex;
|
|
|
|
use async_trait::async_trait;
|
|
use chrono::{Duration as ChronoDuration, Utc};
|
|
use parking_lot::Mutex;
|
|
use serde_json::json;
|
|
use uuid::Uuid;
|
|
|
|
use operator_bridge::{
|
|
ack_reasons, AuditEntry, AuditSink, CommandAck, OperatorBridge, SurfacedPoi,
|
|
};
|
|
use shared::contracts::{BitReportSeverityLookup, MissionSafetyRouter, ScanCommandRouter};
|
|
use shared::error::Result;
|
|
use shared::models::operator::{OperatorCommand, OperatorCommandKind, SafetyOverrideScope};
|
|
|
|
// ============================================================================
|
|
// Test doubles
|
|
// ============================================================================
|
|
|
|
#[derive(Default)]
|
|
struct RecordingScanRouter {
|
|
calls: StdMutex<Vec<OperatorCommand>>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl ScanCommandRouter for RecordingScanRouter {
|
|
async fn route(&self, command: OperatorCommand) -> Result<()> {
|
|
self.calls.lock().unwrap().push(command);
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct RecordingSafetyRouter {
|
|
bit_acks: StdMutex<Vec<(Uuid, Option<String>)>>,
|
|
overrides: StdMutex<Vec<(SafetyOverrideScope, u32, String, String)>>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl MissionSafetyRouter for RecordingSafetyRouter {
|
|
async fn acknowledge_bit_degraded(
|
|
&self,
|
|
report_id: Uuid,
|
|
operator_id: Option<String>,
|
|
) -> Result<()> {
|
|
self.bit_acks.lock().unwrap().push((report_id, operator_id));
|
|
Ok(())
|
|
}
|
|
|
|
async fn apply_safety_override(
|
|
&self,
|
|
scope: SafetyOverrideScope,
|
|
duration_secs: u32,
|
|
operator_id: String,
|
|
rationale: String,
|
|
) -> Result<()> {
|
|
self.overrides
|
|
.lock()
|
|
.unwrap()
|
|
.push((scope, duration_secs, operator_id, rationale));
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Severity lookup that returns whatever is registered for each id.
|
|
/// `Some(true)` for acknowledgeable (Degraded), `Some(false)` for
|
|
/// Fail, `None` for unknown.
|
|
#[derive(Default)]
|
|
struct StubBitSeverity {
|
|
inner: StdMutex<std::collections::HashMap<Uuid, bool>>,
|
|
}
|
|
|
|
impl StubBitSeverity {
|
|
fn set(&self, report_id: Uuid, acknowledgeable: bool) {
|
|
self.inner
|
|
.lock()
|
|
.unwrap()
|
|
.insert(report_id, acknowledgeable);
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl BitReportSeverityLookup for StubBitSeverity {
|
|
async fn is_acknowledgeable(&self, report_id: Uuid) -> Option<bool> {
|
|
self.inner.lock().unwrap().get(&report_id).copied()
|
|
}
|
|
}
|
|
|
|
#[derive(Default, Clone)]
|
|
struct RecordingAuditSink {
|
|
entries: Arc<Mutex<Vec<AuditEntry>>>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl AuditSink for RecordingAuditSink {
|
|
async fn record(&self, entry: AuditEntry) {
|
|
self.entries.lock().push(entry);
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Helpers
|
|
// ============================================================================
|
|
|
|
fn cmd(kind: OperatorCommandKind, payload: serde_json::Value) -> OperatorCommand {
|
|
OperatorCommand {
|
|
command_id: Uuid::new_v4(),
|
|
session_token: "session".to_string(),
|
|
sequence_number: 1,
|
|
issued_at_wallclock: Utc::now(),
|
|
kind,
|
|
payload,
|
|
signature: vec![],
|
|
}
|
|
}
|
|
|
|
fn surfaced(deadline_secs: i64) -> SurfacedPoi {
|
|
SurfacedPoi {
|
|
poi_id: Uuid::new_v4(),
|
|
mgrs: "33UWP05".into(),
|
|
class_group: "vehicle".into(),
|
|
deadline: Utc::now() + ChronoDuration::seconds(deadline_secs),
|
|
}
|
|
}
|
|
|
|
struct Harness {
|
|
bridge: OperatorBridge,
|
|
scan: Arc<RecordingScanRouter>,
|
|
safety: Arc<RecordingSafetyRouter>,
|
|
severity: Arc<StubBitSeverity>,
|
|
audit: RecordingAuditSink,
|
|
}
|
|
|
|
fn harness() -> Harness {
|
|
let scan = Arc::new(RecordingScanRouter::default());
|
|
let safety = Arc::new(RecordingSafetyRouter::default());
|
|
let severity = Arc::new(StubBitSeverity::default());
|
|
let audit = RecordingAuditSink::default();
|
|
let bridge = OperatorBridge::new(8)
|
|
.with_scan_router(scan.clone() as Arc<dyn ScanCommandRouter>)
|
|
.with_safety_router(safety.clone() as Arc<dyn MissionSafetyRouter>)
|
|
.with_bit_severity_lookup(severity.clone() as Arc<dyn BitReportSeverityLookup>)
|
|
.with_audit_sink(Arc::new(audit.clone()) as Arc<dyn AuditSink>)
|
|
.with_dispatcher();
|
|
Harness {
|
|
bridge,
|
|
scan,
|
|
safety,
|
|
severity,
|
|
audit,
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// AZ-680 ACs
|
|
// ============================================================================
|
|
|
|
/// AZ-680 AC-1 — Confirm forwards target hint.
|
|
#[tokio::test]
|
|
async fn az680_ac1_confirm_forwards_to_scan_router() {
|
|
// Arrange
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
let surfaced = surfaced(120);
|
|
h.bridge.surfaced_registry().record(surfaced.clone());
|
|
|
|
// Act
|
|
let ack = handle
|
|
.dispatch_command(cmd(
|
|
OperatorCommandKind::ConfirmPoi,
|
|
json!({ "poi_id": surfaced.poi_id.to_string() }),
|
|
))
|
|
.await;
|
|
|
|
// Assert
|
|
assert_eq!(ack, CommandAck::Ok);
|
|
let calls = h.scan.calls.lock().unwrap();
|
|
assert_eq!(calls.len(), 1, "scan_router::route called exactly once");
|
|
assert!(matches!(calls[0].kind, OperatorCommandKind::ConfirmPoi));
|
|
}
|
|
|
|
/// AZ-680 AC-2 — Re-transmit returns cached ack.
|
|
#[tokio::test]
|
|
async fn az680_ac2_retransmit_returns_cached_ack() {
|
|
// Arrange
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
let surfaced = surfaced(120);
|
|
h.bridge.surfaced_registry().record(surfaced.clone());
|
|
let command = cmd(
|
|
OperatorCommandKind::ConfirmPoi,
|
|
json!({ "poi_id": surfaced.poi_id.to_string() }),
|
|
);
|
|
|
|
// Act — same command_id dispatched twice
|
|
let ack1 = handle.dispatch_command(command.clone()).await;
|
|
let ack2 = handle.dispatch_command(command.clone()).await;
|
|
|
|
// Assert
|
|
assert_eq!(ack1, CommandAck::Ok);
|
|
assert_eq!(ack2, CommandAck::Ok);
|
|
let calls = h.scan.calls.lock().unwrap();
|
|
assert_eq!(
|
|
calls.len(),
|
|
1,
|
|
"scan_router::route must be invoked exactly once across retransmits"
|
|
);
|
|
}
|
|
|
|
/// AZ-680 AC-3 — Unknown POI id rejected.
|
|
#[tokio::test]
|
|
async fn az680_ac3_unknown_poi_id_rejected() {
|
|
// Arrange
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
|
|
// Act — POI id never surfaced
|
|
let ack = handle
|
|
.dispatch_command(cmd(
|
|
OperatorCommandKind::ConfirmPoi,
|
|
json!({ "poi_id": Uuid::new_v4().to_string() }),
|
|
))
|
|
.await;
|
|
|
|
// Assert
|
|
assert_eq!(ack.reason(), Some(ack_reasons::UNKNOWN_POI_ID));
|
|
assert!(
|
|
h.scan.calls.lock().unwrap().is_empty(),
|
|
"scan_router must not be invoked"
|
|
);
|
|
}
|
|
|
|
/// AZ-680 AC-4 — Expired POI rejected.
|
|
#[tokio::test]
|
|
async fn az680_ac4_expired_poi_rejected() {
|
|
// Arrange — surface a POI whose deadline has already passed.
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
let expired = SurfacedPoi {
|
|
deadline: Utc::now() - ChronoDuration::seconds(1),
|
|
..surfaced(0)
|
|
};
|
|
h.bridge.surfaced_registry().record(expired.clone());
|
|
|
|
// Act
|
|
let ack = handle
|
|
.dispatch_command(cmd(
|
|
OperatorCommandKind::ConfirmPoi,
|
|
json!({ "poi_id": expired.poi_id.to_string() }),
|
|
))
|
|
.await;
|
|
|
|
// Assert
|
|
assert_eq!(ack.reason(), Some(ack_reasons::EXPIRED));
|
|
assert!(
|
|
h.scan.calls.lock().unwrap().is_empty(),
|
|
"scan_router must not be invoked on expired POI"
|
|
);
|
|
}
|
|
|
|
/// AZ-680 AC-5 — Decline appends IgnoredItem via scan_controller.
|
|
#[tokio::test]
|
|
async fn az680_ac5_decline_forwards_to_scan_router() {
|
|
// Arrange
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
let surfaced = surfaced(120);
|
|
h.bridge.surfaced_registry().record(surfaced.clone());
|
|
|
|
// Act
|
|
let ack = handle
|
|
.dispatch_command(cmd(
|
|
OperatorCommandKind::DeclinePoi,
|
|
json!({ "poi_id": surfaced.poi_id.to_string() }),
|
|
))
|
|
.await;
|
|
|
|
// Assert
|
|
assert_eq!(ack, CommandAck::Ok);
|
|
let calls = h.scan.calls.lock().unwrap();
|
|
assert_eq!(
|
|
calls.len(),
|
|
1,
|
|
"DeclinePoi must reach scan_router exactly once"
|
|
);
|
|
assert!(matches!(calls[0].kind, OperatorCommandKind::DeclinePoi));
|
|
}
|
|
|
|
// ============================================================================
|
|
// AZ-681 ACs
|
|
// ============================================================================
|
|
|
|
/// AZ-681 AC-1 — BIT-DEGRADED ack succeeds.
|
|
#[tokio::test]
|
|
async fn az681_ac1_bit_degraded_ack_forwards() {
|
|
// Arrange
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
let report_id = Uuid::new_v4();
|
|
h.severity.set(report_id, true);
|
|
|
|
// Act
|
|
let ack = handle
|
|
.dispatch_command(cmd(
|
|
OperatorCommandKind::AcknowledgeBitDegraded,
|
|
json!({ "report_id": report_id.to_string(), "operator_id": "op1" }),
|
|
))
|
|
.await;
|
|
|
|
// Assert
|
|
assert_eq!(ack, CommandAck::Ok);
|
|
let acks = h.safety.bit_acks.lock().unwrap();
|
|
assert_eq!(acks.len(), 1);
|
|
assert_eq!(acks[0], (report_id, Some("op1".to_string())));
|
|
}
|
|
|
|
/// AZ-681 AC-2 — BIT-FAIL ack rejected.
|
|
#[tokio::test]
|
|
async fn az681_ac2_bit_fail_ack_rejected() {
|
|
// Arrange
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
let report_id = Uuid::new_v4();
|
|
h.severity.set(report_id, false);
|
|
|
|
// Act
|
|
let ack = handle
|
|
.dispatch_command(cmd(
|
|
OperatorCommandKind::AcknowledgeBitDegraded,
|
|
json!({ "report_id": report_id.to_string(), "operator_id": "op1" }),
|
|
))
|
|
.await;
|
|
|
|
// Assert
|
|
assert_eq!(ack.reason(), Some(ack_reasons::CANNOT_ACKNOWLEDGE_FAIL));
|
|
assert!(
|
|
h.safety.bit_acks.lock().unwrap().is_empty(),
|
|
"safety_router must not be invoked on Fail report"
|
|
);
|
|
}
|
|
|
|
/// AZ-681 AC-3 — Safety-override forwards with scope + duration, and
|
|
/// an audit entry is written.
|
|
#[tokio::test]
|
|
async fn az681_ac3_safety_override_forwards_with_audit_entry() {
|
|
// Arrange
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
|
|
// Act
|
|
let ack = handle
|
|
.dispatch_command(cmd(
|
|
OperatorCommandKind::SafetyOverride,
|
|
json!({
|
|
"scope": "battery_rtl",
|
|
"duration_secs": 60,
|
|
"operator_id": "op1",
|
|
"rationale": "post-mission RTL too aggressive"
|
|
}),
|
|
))
|
|
.await;
|
|
|
|
// Assert — router invoked with the right scope + duration.
|
|
assert_eq!(ack, CommandAck::Ok);
|
|
let overrides = h.safety.overrides.lock().unwrap();
|
|
assert_eq!(overrides.len(), 1);
|
|
assert_eq!(overrides[0].0, SafetyOverrideScope::BatteryRtl);
|
|
assert_eq!(overrides[0].1, 60);
|
|
assert_eq!(overrides[0].2, "op1");
|
|
|
|
// Assert — audit log has exactly one safety-override entry.
|
|
let entries = h.audit.entries.lock();
|
|
let safety_entries: Vec<_> = entries
|
|
.iter()
|
|
.filter(|e| matches!(e, AuditEntry::SafetyOverride { .. }))
|
|
.collect();
|
|
assert_eq!(safety_entries.len(), 1);
|
|
match safety_entries[0] {
|
|
AuditEntry::SafetyOverride {
|
|
scope,
|
|
duration_secs,
|
|
operator_id,
|
|
outcome,
|
|
..
|
|
} => {
|
|
assert_eq!(*scope, SafetyOverrideScope::BatteryRtl);
|
|
assert_eq!(*duration_secs, 60);
|
|
assert_eq!(operator_id.as_deref(), Some("op1"));
|
|
assert_eq!(outcome, &CommandAck::Ok);
|
|
}
|
|
_ => unreachable!(),
|
|
}
|
|
}
|
|
|
|
/// AZ-681 AC-4 — Audit log redacts secrets.
|
|
#[tokio::test]
|
|
async fn az681_ac4_audit_log_contains_no_signature_or_session_token() {
|
|
// Arrange
|
|
let h = harness();
|
|
let handle = h.bridge.handle();
|
|
|
|
// Act
|
|
let _ = handle
|
|
.dispatch_command(cmd(
|
|
OperatorCommandKind::SafetyOverride,
|
|
json!({
|
|
"scope": "battery_rtl",
|
|
"duration_secs": 30,
|
|
"operator_id": "op1",
|
|
"rationale": "test"
|
|
}),
|
|
))
|
|
.await;
|
|
|
|
// Assert — every audit entry serialised to JSON must omit
|
|
// `signature` and `session_token`.
|
|
let entries = h.audit.entries.lock();
|
|
assert!(!entries.is_empty());
|
|
for entry in entries.iter() {
|
|
let json = serde_json::to_string(entry).expect("serialises");
|
|
assert!(
|
|
!json.contains("signature"),
|
|
"audit entry leaked signature: {json}"
|
|
);
|
|
assert!(
|
|
!json.contains("session_token"),
|
|
"audit entry leaked session_token: {json}"
|
|
);
|
|
}
|
|
}
|