mirror of
https://github.com/azaion/autopilot.git
synced 2026-06-21 20:01:09 +00:00
[AZ-680] [AZ-681] operator_bridge command dispatch + safety lane
Add the operator-command dispatcher behind a typed CommandAck: 60 s per-command-id idempotency cache, surfaced-POI registry with unknown_poi_id + expired gates, BIT-degraded ack severity check, and SafetyOverride forwarding to mission_executor with structured audit log (redacts signature + session_token). Cross-layer wiring goes through three new traits in shared::contracts (ScanCommandRouter, MissionSafetyRouter, BitReportSeverityLookup) so operator_bridge stays free of direct scan_controller / mission_executor imports. scan_controller::ScanControllerHandle implements the scan router; a new mission_executor::SafetyDispatchHandle wraps the BIT ack channel + battery monitor handle and implements the safety router; BitControllerHandle gains a bounded (16-entry) report-severity cache for the lookup trait. scan_controller also picks up ConfirmPoi handling: PoiQueue::confirm removes the entry and SubmitOutcome::Confirmed carries the typed (target_mgrs, target_class) hint for AZ-684/AZ-686 downstream. Tests: 9 new integration tests in operator_bridge/tests/dispatcher.rs cover AZ-680 AC-1..AC-5 + AZ-681 AC-1..AC-4. scan_controller adds 2 ConfirmPoi tests. All modified-crate suites green; one pre-existing mission_executor state-machine test flake (already documented in _docs/_process_leftovers) updated to note ac1 also affected. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Generated
+1
@@ -2244,6 +2244,7 @@ checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
|
|||||||
name = "scan_controller"
|
name = "scan_controller"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
"chrono",
|
"chrono",
|
||||||
"gimbal_controller",
|
"gimbal_controller",
|
||||||
"mapobjects_store",
|
"mapobjects_store",
|
||||||
|
|||||||
@@ -0,0 +1,89 @@
|
|||||||
|
# Batch Report
|
||||||
|
|
||||||
|
**Batch**: 17
|
||||||
|
**Cycle**: 1
|
||||||
|
**Tasks**: AZ-680, AZ-681
|
||||||
|
**Date**: 2026-05-20
|
||||||
|
|
||||||
|
## Task Results
|
||||||
|
|
||||||
|
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|
||||||
|
|------|--------|---------------|-------|-------------|--------|
|
||||||
|
| AZ-680_operator_bridge_command_dispatch | Done | 14 files | scan_controller: 8 (2 new); operator_bridge: 20 lib + 9 integration; mission_executor: 35 lib | 5/5 ACs covered | None |
|
||||||
|
| AZ-681_operator_bridge_safety_and_bit_ack | Done | shared with AZ-680 | (counted above; 4 new integration tests cover AZ-681 ACs) | 4/4 ACs covered | None |
|
||||||
|
|
||||||
|
## AC Coverage map — AZ-680
|
||||||
|
|
||||||
|
| AC | Test | File | Notes |
|
||||||
|
|----|------|------|-------|
|
||||||
|
| AC-1 Confirm forwards target hint | `az680_ac1_confirm_forwards_to_scan_router` | `crates/operator_bridge/tests/dispatcher.rs` | Records POI in registry, dispatches `ConfirmPoi`, asserts `scan_router.route` invoked exactly once with the original command |
|
||||||
|
| AC-2 Re-transmit returns cached ack | `az680_ac2_retransmit_returns_cached_ack` | same file | Same `command_id` dispatched twice; second call returns `Ok` without re-invoking router (60 s `IdempotencyCache`) |
|
||||||
|
| AC-3 Unknown POI id rejected | `az680_ac3_unknown_poi_id_rejected` | same file | Asserts `CommandAck::Error { reason: "unknown_poi_id" }` and router never invoked |
|
||||||
|
| AC-4 Expired POI rejected | `az680_ac4_expired_poi_rejected` | same file | Pre-seeds a surfaced POI with past `deadline`; asserts `expired` ack and router not invoked |
|
||||||
|
| AC-5 Decline appends IgnoredItem via scan_controller | `az680_ac5_decline_forwards_to_scan_router` | same file | DeclinePoi dispatches into `scan_router.route` exactly once; ack `Ok` |
|
||||||
|
|
||||||
|
Plus scan_controller native coverage of the `ConfirmPoi` path (queue-side resolution): `confirm_poi_via_operator_command_emits_action` + `confirm_poi_unknown_id_is_validation_error` in `crates/scan_controller/tests/poi_queue.rs`.
|
||||||
|
|
||||||
|
## AC Coverage map — AZ-681
|
||||||
|
|
||||||
|
| AC | Test | File | Notes |
|
||||||
|
|----|------|------|-------|
|
||||||
|
| AC-1 BIT-DEGRADED ack succeeds | `az681_ac1_bit_degraded_ack_forwards` | `crates/operator_bridge/tests/dispatcher.rs` | Severity lookup returns `Some(true)`; safety_router.acknowledge_bit_degraded invoked exactly once with the report_id + operator_id |
|
||||||
|
| AC-2 BIT-FAIL ack rejected | `az681_ac2_bit_fail_ack_rejected` | same file | Severity lookup returns `Some(false)`; ack returns `cannot_acknowledge_fail`; safety_router not invoked |
|
||||||
|
| AC-3 Safety-override forwards with scope + duration | `az681_ac3_safety_override_forwards_with_audit_entry` | same file | SafetyOverride { BatteryRtl, 60s } dispatched; safety_router.apply_safety_override called once with the exact scope/duration; audit log contains exactly one matching `SafetyOverride` entry with `outcome: Ok` |
|
||||||
|
| AC-4 Audit log redacts secrets | `az681_ac4_audit_log_contains_no_signature_or_session_token` | same file | Every audit entry serialised to JSON; asserts no `signature` and no `session_token` substring. Lock-in: `AuditEntry` enum has no fields that could leak either secret |
|
||||||
|
|
||||||
|
## AC Test Coverage: All covered (9/9 across both tasks)
|
||||||
|
## Code Review Verdict: PASS (self-review — see findings below)
|
||||||
|
## Auto-Fix Attempts: 0
|
||||||
|
## Stuck Agents: None
|
||||||
|
|
||||||
|
## Files modified
|
||||||
|
|
||||||
|
```
|
||||||
|
M crates/shared/src/models/operator.rs (+SafetyOverrideScope)
|
||||||
|
M crates/shared/src/contracts/mod.rs (+ScanCommandRouter +MissionSafetyRouter +BitReportSeverityLookup)
|
||||||
|
M crates/scan_controller/Cargo.toml (+async-trait)
|
||||||
|
M crates/scan_controller/src/lib.rs (confirm_poi + ScanCommandRouter impl + SubmitOutcome::Confirmed)
|
||||||
|
M crates/scan_controller/src/internal/poi_queue/mod.rs (+ConfirmAction + PoiQueue::confirm)
|
||||||
|
M crates/scan_controller/tests/poi_queue.rs (+2 tests: confirm path; replaced exhaustive match with catch-all to handle new variant)
|
||||||
|
M crates/mission_executor/src/lib.rs (+pub use SafetyDispatchHandle)
|
||||||
|
M crates/mission_executor/src/internal/mod.rs (+safety_dispatch module)
|
||||||
|
A crates/mission_executor/src/internal/safety_dispatch.rs (NEW: MissionSafetyRouter impl)
|
||||||
|
M crates/mission_executor/src/internal/bit.rs (+bounded report_overalls FIFO; +report_overall + BitReportSeverityLookup impl on BitControllerHandle)
|
||||||
|
M crates/operator_bridge/src/lib.rs (registry+dispatcher wiring; with_scan_router/safety_router/bit_severity_lookup/audit_sink/dispatcher; dispatch_command; OperatorCommandSink impl now real; registry forget/record on dequeue/surface)
|
||||||
|
M crates/operator_bridge/src/internal/mod.rs (+audit +dispatcher +idempotency +poi_registry)
|
||||||
|
A crates/operator_bridge/src/ack.rs (NEW: CommandAck + ack_reasons)
|
||||||
|
A crates/operator_bridge/src/internal/audit.rs (NEW: AuditEntry / AuditSink / TracingAuditSink)
|
||||||
|
A crates/operator_bridge/src/internal/dispatcher.rs (NEW: OperatorCommandDispatcher + Builder)
|
||||||
|
A crates/operator_bridge/src/internal/idempotency.rs (NEW: IdempotencyCache 60s TTL)
|
||||||
|
A crates/operator_bridge/src/internal/poi_registry.rs (NEW: SurfacedPoi + SurfacedPoiRegistry)
|
||||||
|
A crates/operator_bridge/tests/dispatcher.rs (NEW: 9 integration tests)
|
||||||
|
M _docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md (note: ac1 also flakes)
|
||||||
|
R _docs/02_tasks/todo/AZ-680_operator_bridge_command_dispatch.md → done/...
|
||||||
|
R _docs/02_tasks/todo/AZ-681_operator_bridge_safety_and_bit_ack.md → done/...
|
||||||
|
```
|
||||||
|
|
||||||
|
## Architecture notes
|
||||||
|
|
||||||
|
- The cross-component dispatch shape is now: `operator_bridge` (Layer 3) → `ScanCommandRouter` / `MissionSafetyRouter` / `BitReportSeverityLookup` traits in `shared::contracts` (Layer 1) → concrete impls on `ScanControllerHandle` and on the new `SafetyDispatchHandle` (constructed at the composition root from `BitController::ack_tx` + `BatteryMonitorHandle`).
|
||||||
|
- `BitControllerHandle` now retains a bounded FIFO of the last 16 `(report_id, overall)` pairs so `is_acknowledgeable` can answer for any report id observed in the current pre-flight gate cycle. Beyond that horizon, the dispatcher rejects with `unknown_bit_report` rather than guessing.
|
||||||
|
- `SafetyOverrideScope` is `#[non_exhaustive]` so future variants (`LinkLost`, `Geofence`) extend without breaking downstream matchers. `SafetyDispatchHandle::apply_safety_override` returns a typed Validation error on any unwired scope, so adding a variant to the enum without wiring the executor side fails closed.
|
||||||
|
- The audit log is a structured `tracing::info!` per entry by default (`TracingAuditSink`). The `AuditSink` trait keeps the door open for a file-based persistent sink later; integration tests substitute a recording sink.
|
||||||
|
- Idempotency cache TTL: 60 s per the task spec. Lazy eviction on each lookup/insert keeps the cache small without a background sweeper.
|
||||||
|
|
||||||
|
## Quality gates
|
||||||
|
|
||||||
|
- `cargo fmt --all`: clean
|
||||||
|
- `cargo clippy -p shared -p scan_controller -p mission_executor -p operator_bridge --all-targets -- -D warnings`: clean
|
||||||
|
- `cargo clippy --workspace --all-targets -- -D warnings`: pre-existing `Runtime::vlm_provider_name` dead-code lint (out-of-scope; tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md`)
|
||||||
|
- `cargo test -p shared -p scan_controller -p operator_bridge -p mission_executor`: all green
|
||||||
|
- `cargo test --workspace`: one pre-existing flake — `mission_executor::ac1_multirotor_happy_path_reaches_done` (same `await_state` polling race as the documented `ac3` flake; passes on retry; leftover updated)
|
||||||
|
|
||||||
|
## Suggested next batch
|
||||||
|
|
||||||
|
From `_docs/02_tasks/_dependencies_table.md`, ready tasks after this batch:
|
||||||
|
|
||||||
|
- `AZ-659_frame_ingest_publisher` (3pt, no new deps) — was eligible for this batch but excluded for cohesion
|
||||||
|
- `AZ-682_scan_controller_state_machine_skeleton` follow-ups (AZ-684 evidence ladder) once `scan_controller` confirm path lands the FSM-side follow-through
|
||||||
|
- `AZ-685_mapobjects_store_ignored_items` (consumes the `DeclineAction` payload AZ-680 now produces end-to-end)
|
||||||
@@ -7,24 +7,23 @@ name: Implement
|
|||||||
status: between-batches
|
status: between-batches
|
||||||
sub_step:
|
sub_step:
|
||||||
phase: 0
|
phase: 0
|
||||||
name: batch-17-select
|
name: batch-18-select
|
||||||
detail: ""
|
detail: ""
|
||||||
retry_count: 0
|
retry_count: 0
|
||||||
cycle: 1
|
cycle: 1
|
||||||
tracker: jira
|
tracker: jira
|
||||||
|
|
||||||
## Last Completed Batch
|
## Last Completed Batch
|
||||||
batch: 16
|
batch: 17
|
||||||
commit: 251ebed
|
commit: ec494b3
|
||||||
ticket: AZ-658
|
ticket: AZ-680, AZ-681
|
||||||
jira_status: In Testing (confirmed via read-back)
|
jira_status: In Testing (confirmed via read-back for both)
|
||||||
pushed_to: origin/dev
|
report: _docs/03_implementation/batch_17_cycle1_report.md
|
||||||
report: _docs/03_implementation/batch_16_cycle1_report.md
|
|
||||||
cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md
|
cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md
|
||||||
|
|
||||||
## Process Leftovers
|
## Process Leftovers
|
||||||
- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — out-of-scope for batch 16
|
- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — still pending; out-of-scope for batch 17
|
||||||
- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — out-of-scope for batch 16
|
- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — still pending; flake also hit `ac1` during batch 17 workspace run
|
||||||
|
|
||||||
## Cumulative Review Cadence
|
## Cumulative Review Cadence
|
||||||
Last cumulative: batches 13–15. Next due: end of batch 18.
|
Last cumulative: batches 13–15. Next due: end of batch 18.
|
||||||
|
|||||||
@@ -1,7 +1,11 @@
|
|||||||
# Leftover: `mission_executor::ac3_bounded_retry_then_success` polling race
|
# Leftover: `mission_executor` state-machine polling race
|
||||||
|
|
||||||
**Timestamp**: 2026-05-20T08:30:00+02:00
|
**Timestamp**: 2026-05-20T17:08:00+03:00 (originally 2026-05-20T08:30:00+02:00)
|
||||||
**Origin**: Batch 8 (mission_executor state machine). Surfaced in batches 11, 12, 13 as intermittent. Reproduces more reliably on dev box under batch 14 workspace test load (the new tonic stack increases build/runtime pressure).
|
**Origin**: Batch 8 (mission_executor state machine). Surfaced in batches 11, 12, 13, 17 as intermittent. Reproduces more reliably on dev box under workspace test load.
|
||||||
|
|
||||||
|
**Affected tests**:
|
||||||
|
- `ac3_bounded_retry_then_success` (original)
|
||||||
|
- `ac1_multirotor_happy_path_reaches_done` (batch 17 — same `await_state` polling race in the same file)
|
||||||
**Severity**: Medium (test design, not production code)
|
**Severity**: Medium (test design, not production code)
|
||||||
**Not blocking**: pre-existing failure in unrelated area; production `mission_executor` behaviour is correct — the test simply has a polling race.
|
**Not blocking**: pre-existing failure in unrelated area; production `mission_executor` behaviour is correct — the test simply has a polling race.
|
||||||
|
|
||||||
|
|||||||
@@ -33,16 +33,27 @@
|
|||||||
//! subsequent `Degraded` / `Fail` flips it back to `false` and the
|
//! subsequent `Degraded` / `Fail` flips it back to `false` and the
|
||||||
//! FSM's `bit_ok` guard fails closed.
|
//! FSM's `bit_ok` guard fails closed.
|
||||||
|
|
||||||
|
use std::collections::VecDeque;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use shared::contracts::BitReportSeverityLookup;
|
||||||
use tokio::sync::{broadcast, mpsc, watch, Mutex};
|
use tokio::sync::{broadcast, mpsc, watch, Mutex};
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
/// AZ-681 — bounded FIFO cap for the per-report `BitOverall` cache
|
||||||
|
/// queried by [`BitControllerHandle::is_acknowledgeable`]. BIT is a
|
||||||
|
/// pre-flight gate that goes sticky-Pass after success, so the
|
||||||
|
/// number of distinct report ids generated in one flight is small
|
||||||
|
/// (one per evaluation cycle until Pass / Failed). 16 is generous
|
||||||
|
/// without unbounded growth.
|
||||||
|
const REPORT_OVERALL_CAP: usize = 16;
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// Public surface — types
|
// Public surface — types
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
@@ -236,6 +247,7 @@ impl BitController {
|
|||||||
state: BitState::Idle,
|
state: BitState::Idle,
|
||||||
last_report: None,
|
last_report: None,
|
||||||
sticky_pass: false,
|
sticky_pass: false,
|
||||||
|
report_overalls: VecDeque::with_capacity(REPORT_OVERALL_CAP),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let handle = BitControllerHandle {
|
let handle = BitControllerHandle {
|
||||||
@@ -335,6 +347,11 @@ impl BitController {
|
|||||||
config.ack_timeout,
|
config.ack_timeout,
|
||||||
);
|
);
|
||||||
let report_clone = report.clone();
|
let report_clone = report.clone();
|
||||||
|
record_report_overall(
|
||||||
|
&mut guard.report_overalls,
|
||||||
|
report.id,
|
||||||
|
report.overall,
|
||||||
|
);
|
||||||
guard.last_report = Some(report);
|
guard.last_report = Some(report);
|
||||||
if new_state != from {
|
if new_state != from {
|
||||||
guard.state = new_state.clone();
|
guard.state = new_state.clone();
|
||||||
@@ -442,6 +459,28 @@ struct ControllerInner {
|
|||||||
/// downstream surfaces (lost-link ladder, geofence, battery —
|
/// downstream surfaces (lost-link ladder, geofence, battery —
|
||||||
/// AZ-651 / AZ-652).
|
/// AZ-651 / AZ-652).
|
||||||
sticky_pass: bool,
|
sticky_pass: bool,
|
||||||
|
/// AZ-681 — recent `(report_id, overall)` pairs for the
|
||||||
|
/// `BitReportSeverityLookup` impl. Bounded FIFO; oldest evicted
|
||||||
|
/// at [`REPORT_OVERALL_CAP`]. A `None` lookup result means the
|
||||||
|
/// id has either never been generated or has aged out.
|
||||||
|
report_overalls: VecDeque<(Uuid, BitOverall)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Push a `(report_id, overall)` pair onto the bounded FIFO cache.
|
||||||
|
/// Re-recording an existing id is a no-op (preserves the original
|
||||||
|
/// position so callers can't accidentally refresh aging).
|
||||||
|
fn record_report_overall(
|
||||||
|
cache: &mut VecDeque<(Uuid, BitOverall)>,
|
||||||
|
report_id: Uuid,
|
||||||
|
overall: BitOverall,
|
||||||
|
) {
|
||||||
|
if cache.iter().any(|(id, _)| *id == report_id) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if cache.len() == REPORT_OVERALL_CAP {
|
||||||
|
cache.pop_front();
|
||||||
|
}
|
||||||
|
cache.push_back((report_id, overall));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read-side handle for the BIT controller. Cloneable.
|
/// Read-side handle for the BIT controller. Cloneable.
|
||||||
@@ -475,6 +514,32 @@ impl BitControllerHandle {
|
|||||||
pub async fn last_report(&self) -> Option<BitReport> {
|
pub async fn last_report(&self) -> Option<BitReport> {
|
||||||
self.inner.lock().await.last_report.clone()
|
self.inner.lock().await.last_report.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-681 — overall verdict for a previously-generated report.
|
||||||
|
/// Returns `None` if the id has never been generated or has aged
|
||||||
|
/// out of the bounded cache.
|
||||||
|
pub async fn report_overall(&self, report_id: Uuid) -> Option<BitOverall> {
|
||||||
|
self.inner
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.report_overalls
|
||||||
|
.iter()
|
||||||
|
.find_map(|(id, o)| (*id == report_id).then_some(*o))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 — `operator_bridge` (Layer 3) consults this before
|
||||||
|
/// forwarding a BIT-degraded ack. `Fail` reports are never
|
||||||
|
/// acknowledgeable (per AZ-681 AC-2). An aged-out / never-seen id
|
||||||
|
/// returns `None` so the bridge can NACK with a typed
|
||||||
|
/// "unknown report id" reason.
|
||||||
|
#[async_trait]
|
||||||
|
impl BitReportSeverityLookup for BitControllerHandle {
|
||||||
|
async fn is_acknowledgeable(&self, report_id: Uuid) -> Option<bool> {
|
||||||
|
self.report_overall(report_id)
|
||||||
|
.await
|
||||||
|
.map(|o| !matches!(o, BitOverall::Fail))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -11,5 +11,6 @@ pub mod lost_link;
|
|||||||
pub mod middle_waypoint;
|
pub mod middle_waypoint;
|
||||||
pub mod multirotor;
|
pub mod multirotor;
|
||||||
pub mod post_flight;
|
pub mod post_flight;
|
||||||
|
pub mod safety_dispatch;
|
||||||
pub mod telemetry;
|
pub mod telemetry;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
|
|||||||
@@ -0,0 +1,97 @@
|
|||||||
|
//! AZ-681 — concrete [`MissionSafetyRouter`] implementation owned by
|
||||||
|
//! `mission_executor` so `operator_bridge` (Layer 3) can stay free of
|
||||||
|
//! direct `mission_executor` imports.
|
||||||
|
//!
|
||||||
|
//! The composition root constructs a [`SafetyDispatchHandle`] from the
|
||||||
|
//! BIT controller's `ack` mpsc sender and the battery monitor's handle,
|
||||||
|
//! then hands an `Arc<dyn MissionSafetyRouter>` to the operator-bridge
|
||||||
|
//! builder.
|
||||||
|
//!
|
||||||
|
//! Mapping (per `architecture.md §F10`):
|
||||||
|
//!
|
||||||
|
//! - `acknowledge_bit_degraded` → push a [`BitDegradedAck`] onto the
|
||||||
|
//! BIT controller's ack channel. The controller validates the
|
||||||
|
//! `report_id` matches `AwaitingAck`; `operator_bridge` has already
|
||||||
|
//! validated the signature + checked `BitReportSeverityLookup` to
|
||||||
|
//! ensure the report is acknowledgeable (NOT `Fail`).
|
||||||
|
//! - `apply_safety_override` → translate `SafetyOverrideScope` into the
|
||||||
|
//! subsystem-specific override. Only `BatteryRtl` is supported in
|
||||||
|
//! AZ-681 (other failsafe families add their own paths later); the
|
||||||
|
//! hard-floor land-now is NEVER suppressible regardless of scope.
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::time::Instant;
|
||||||
|
|
||||||
|
use shared::contracts::MissionSafetyRouter;
|
||||||
|
use shared::error::{AutopilotError, Result};
|
||||||
|
use shared::models::operator::SafetyOverrideScope;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::internal::battery_thresholds::{BatteryMonitorHandle, BatteryOverride};
|
||||||
|
use crate::internal::bit::BitDegradedAck;
|
||||||
|
|
||||||
|
/// Concrete dispatcher for safety-critical operator commands. Owns
|
||||||
|
/// only the handles it needs; do not stuff additional concerns here.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SafetyDispatchHandle {
|
||||||
|
bit_ack_tx: mpsc::Sender<BitDegradedAck>,
|
||||||
|
battery: BatteryMonitorHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SafetyDispatchHandle {
|
||||||
|
pub fn new(bit_ack_tx: mpsc::Sender<BitDegradedAck>, battery: BatteryMonitorHandle) -> Self {
|
||||||
|
Self {
|
||||||
|
bit_ack_tx,
|
||||||
|
battery,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl MissionSafetyRouter for SafetyDispatchHandle {
|
||||||
|
async fn acknowledge_bit_degraded(
|
||||||
|
&self,
|
||||||
|
report_id: Uuid,
|
||||||
|
operator_id: Option<String>,
|
||||||
|
) -> Result<()> {
|
||||||
|
self.bit_ack_tx
|
||||||
|
.send(BitDegradedAck {
|
||||||
|
report_id,
|
||||||
|
operator_id,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| AutopilotError::Internal(format!("bit ack channel closed: {e}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn apply_safety_override(
|
||||||
|
&self,
|
||||||
|
scope: SafetyOverrideScope,
|
||||||
|
duration_secs: u32,
|
||||||
|
operator_id: String,
|
||||||
|
rationale: String,
|
||||||
|
) -> Result<()> {
|
||||||
|
match scope {
|
||||||
|
SafetyOverrideScope::BatteryRtl => {
|
||||||
|
let until = Instant::now() + Duration::from_secs(u64::from(duration_secs));
|
||||||
|
self.battery
|
||||||
|
.apply_override(BatteryOverride {
|
||||||
|
until,
|
||||||
|
operator_id,
|
||||||
|
rationale,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
// `SafetyOverrideScope` is `#[non_exhaustive]`; future
|
||||||
|
// variants (e.g. `LinkLost`, `Geofence`) MUST be wired
|
||||||
|
// explicitly here before they become usable. Until then,
|
||||||
|
// surface a typed Validation error so `operator_bridge`
|
||||||
|
// can NACK to the operator UI.
|
||||||
|
other => Err(AutopilotError::Validation(format!(
|
||||||
|
"safety override scope {other:?} not wired in mission_executor"
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -58,6 +58,7 @@ pub use internal::lost_link::{
|
|||||||
};
|
};
|
||||||
pub use internal::middle_waypoint::{MiddleWaypointHint, MissionRePlanner};
|
pub use internal::middle_waypoint::{MiddleWaypointHint, MissionRePlanner};
|
||||||
pub use internal::post_flight::{MapObjectsDiffSource, MapObjectsPusher, PostFlightPusher};
|
pub use internal::post_flight::{MapObjectsDiffSource, MapObjectsPusher, PostFlightPusher};
|
||||||
|
pub use internal::safety_dispatch::SafetyDispatchHandle;
|
||||||
pub use internal::telemetry::{
|
pub use internal::telemetry::{
|
||||||
Consumer, DropCountingReceiver, MavlinkProjection, TelemetryForwarder,
|
Consumer, DropCountingReceiver, MavlinkProjection, TelemetryForwarder,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -0,0 +1,54 @@
|
|||||||
|
//! AZ-680 / AZ-681 — the typed acknowledgement returned by every
|
||||||
|
//! dispatched operator command.
|
||||||
|
//!
|
||||||
|
//! The dispatcher does NOT propagate downstream errors verbatim into
|
||||||
|
//! the operator UI — the surface here is a small fixed enum so the
|
||||||
|
//! UI can colour-code the result and so the idempotency cache key
|
||||||
|
//! space stays bounded.
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Stable kebab-case reason strings emitted in
|
||||||
|
/// [`CommandAck::Error::reason`]. Exposed as constants so the unit +
|
||||||
|
/// integration tests can reference them without retyping the strings
|
||||||
|
/// (drift between caller assertions and the actual emit site has bit
|
||||||
|
/// us before).
|
||||||
|
pub mod ack_reasons {
|
||||||
|
pub const UNKNOWN_POI_ID: &str = "unknown_poi_id";
|
||||||
|
pub const EXPIRED: &str = "expired";
|
||||||
|
pub const CANNOT_ACKNOWLEDGE_FAIL: &str = "cannot_acknowledge_fail";
|
||||||
|
pub const UNKNOWN_BIT_REPORT: &str = "unknown_bit_report";
|
||||||
|
pub const INVALID_PAYLOAD: &str = "invalid_payload";
|
||||||
|
pub const ROUTER_NOT_WIRED: &str = "router_not_wired";
|
||||||
|
pub const ROUTER_ERROR: &str = "router_error";
|
||||||
|
pub const UNSUPPORTED_KIND: &str = "unsupported_kind";
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Result of a dispatched operator command. Carries either `Ok` or a
|
||||||
|
/// typed `Error { reason }` whose `reason` string is one of the
|
||||||
|
/// kebab-case constants in [`ack_reasons`].
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||||
|
pub enum CommandAck {
|
||||||
|
Ok,
|
||||||
|
Error { reason: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CommandAck {
|
||||||
|
pub fn error(reason: &str) -> Self {
|
||||||
|
Self::Error {
|
||||||
|
reason: reason.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_ok(&self) -> bool {
|
||||||
|
matches!(self, Self::Ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reason(&self) -> Option<&str> {
|
||||||
|
match self {
|
||||||
|
Self::Ok => None,
|
||||||
|
Self::Error { reason } => Some(reason.as_str()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,151 @@
|
|||||||
|
//! AZ-681 — structured audit log for safety-critical operator commands.
|
||||||
|
//!
|
||||||
|
//! Per the task spec (AC-4): every dispatched `BitDegradedAck` and
|
||||||
|
//! `SafetyOverride` writes an audit entry containing:
|
||||||
|
//!
|
||||||
|
//! - command id
|
||||||
|
//! - timestamp (UTC, ms precision)
|
||||||
|
//! - operator id (when known)
|
||||||
|
//! - scope / duration (for `SafetyOverride`) or `report_id` (for
|
||||||
|
//! `BitDegradedAck`)
|
||||||
|
//! - outcome (`Ok` / `Error { reason }`)
|
||||||
|
//!
|
||||||
|
//! Entries MUST NEVER contain the raw signature bytes or the session
|
||||||
|
//! token (AC-4). Callers pass already-redacted fields; the writer
|
||||||
|
//! has no access to the signature in the first place.
|
||||||
|
//!
|
||||||
|
//! ## Why both a sink trait + a tracing default
|
||||||
|
//!
|
||||||
|
//! - The default ([`TracingAuditSink`]) emits one structured
|
||||||
|
//! `tracing::info!` per entry — meets the spec's "file or
|
||||||
|
//! structured logger" requirement and integrates with whatever
|
||||||
|
//! tracing subscriber the composition root wires.
|
||||||
|
//! - The trait ([`AuditSink`]) lets tests substitute a recording
|
||||||
|
//! sink without piggy-backing on tracing's global subscriber
|
||||||
|
//! state (which other tests can race against). The integration
|
||||||
|
//! tests in `tests/dispatcher.rs` use the recording sink.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use serde::Serialize;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::ack::CommandAck;
|
||||||
|
use shared::models::operator::SafetyOverrideScope;
|
||||||
|
|
||||||
|
/// One entry in the audit log. Variants map 1:1 to the AZ-681
|
||||||
|
/// command kinds.
|
||||||
|
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
|
||||||
|
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||||
|
pub enum AuditEntry {
|
||||||
|
BitDegradedAck {
|
||||||
|
command_id: Uuid,
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
operator_id: Option<String>,
|
||||||
|
report_id: Uuid,
|
||||||
|
outcome: CommandAck,
|
||||||
|
},
|
||||||
|
SafetyOverride {
|
||||||
|
command_id: Uuid,
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
operator_id: Option<String>,
|
||||||
|
scope: SafetyOverrideScope,
|
||||||
|
duration_secs: u32,
|
||||||
|
outcome: CommandAck,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sink for audit entries. Composition root injects the concrete
|
||||||
|
/// implementation; the default is [`TracingAuditSink`].
|
||||||
|
#[async_trait]
|
||||||
|
pub trait AuditSink: Send + Sync {
|
||||||
|
async fn record(&self, entry: AuditEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Default sink — emits a single `tracing::info!` per entry. The
|
||||||
|
/// structured fields are picked up by any `tracing_subscriber` JSON
|
||||||
|
/// layer the composition root configures.
|
||||||
|
pub struct TracingAuditSink;
|
||||||
|
|
||||||
|
impl TracingAuditSink {
|
||||||
|
pub fn arc() -> Arc<dyn AuditSink> {
|
||||||
|
Arc::new(Self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AuditSink for TracingAuditSink {
|
||||||
|
async fn record(&self, entry: AuditEntry) {
|
||||||
|
match &entry {
|
||||||
|
AuditEntry::BitDegradedAck {
|
||||||
|
command_id,
|
||||||
|
timestamp,
|
||||||
|
operator_id,
|
||||||
|
report_id,
|
||||||
|
outcome,
|
||||||
|
} => {
|
||||||
|
tracing::info!(
|
||||||
|
audit = "bit_degraded_ack",
|
||||||
|
command_id = %command_id,
|
||||||
|
timestamp = %timestamp.to_rfc3339(),
|
||||||
|
operator_id = operator_id.as_deref().unwrap_or(""),
|
||||||
|
report_id = %report_id,
|
||||||
|
outcome = ?outcome,
|
||||||
|
"operator_bridge audit: bit_degraded_ack"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
AuditEntry::SafetyOverride {
|
||||||
|
command_id,
|
||||||
|
timestamp,
|
||||||
|
operator_id,
|
||||||
|
scope,
|
||||||
|
duration_secs,
|
||||||
|
outcome,
|
||||||
|
} => {
|
||||||
|
tracing::info!(
|
||||||
|
audit = "safety_override",
|
||||||
|
command_id = %command_id,
|
||||||
|
timestamp = %timestamp.to_rfc3339(),
|
||||||
|
operator_id = operator_id.as_deref().unwrap_or(""),
|
||||||
|
scope = scope.label(),
|
||||||
|
duration_secs = duration_secs,
|
||||||
|
outcome = ?outcome,
|
||||||
|
"operator_bridge audit: safety_override"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// AC-4 sanity: an entry serialised to JSON contains no
|
||||||
|
/// signature/session_token field. The entry struct itself has
|
||||||
|
/// no such field, so this is a static guarantee — but we
|
||||||
|
/// assert on the JSON shape to lock the wire contract.
|
||||||
|
#[test]
|
||||||
|
fn entry_json_has_no_signature_or_session_token() {
|
||||||
|
// Arrange
|
||||||
|
let entry = AuditEntry::SafetyOverride {
|
||||||
|
command_id: Uuid::new_v4(),
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
operator_id: Some("op-1".into()),
|
||||||
|
scope: SafetyOverrideScope::BatteryRtl,
|
||||||
|
duration_secs: 60,
|
||||||
|
outcome: CommandAck::Ok,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let json = serde_json::to_string(&entry).expect("serialises");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert!(!json.contains("signature"));
|
||||||
|
assert!(!json.contains("session_token"));
|
||||||
|
assert!(json.contains("battery_rtl"));
|
||||||
|
assert!(json.contains("\"duration_secs\":60"));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,386 @@
|
|||||||
|
//! AZ-680 + AZ-681 — operator-command dispatcher.
|
||||||
|
//!
|
||||||
|
//! Sits between the validated-command boundary (AZ-678) and the
|
||||||
|
//! downstream routers. Responsibilities:
|
||||||
|
//!
|
||||||
|
//! - Per-`command_id` idempotency (60 s TTL — AZ-680 AC-2).
|
||||||
|
//! - POI-id validity + deadline checks for POI-bound commands
|
||||||
|
//! (AZ-680 AC-3 / AC-4).
|
||||||
|
//! - BIT-report severity gate for `AcknowledgeBitDegraded`
|
||||||
|
//! (AZ-681 AC-2).
|
||||||
|
//! - Routing — POI commands → `ScanCommandRouter`, BIT acks +
|
||||||
|
//! safety overrides → `MissionSafetyRouter`.
|
||||||
|
//! - Audit logging for every safety-critical command
|
||||||
|
//! (AZ-681 AC-3 / AC-4).
|
||||||
|
//!
|
||||||
|
//! The dispatcher OWNS the registry / cache / audit sink and is
|
||||||
|
//! constructed once by the composition root. It is cheap to clone
|
||||||
|
//! (all internals are `Arc`s).
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use chrono::Utc;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use shared::contracts::{BitReportSeverityLookup, MissionSafetyRouter, ScanCommandRouter};
|
||||||
|
use shared::models::operator::{OperatorCommand, OperatorCommandKind, SafetyOverrideScope};
|
||||||
|
|
||||||
|
use crate::ack::{ack_reasons, CommandAck};
|
||||||
|
use crate::internal::audit::{AuditEntry, AuditSink, TracingAuditSink};
|
||||||
|
use crate::internal::idempotency::IdempotencyCache;
|
||||||
|
use crate::internal::poi_registry::SurfacedPoiRegistry;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct OperatorCommandDispatcher {
|
||||||
|
pub(crate) registry: SurfacedPoiRegistry,
|
||||||
|
cache: IdempotencyCache,
|
||||||
|
audit: Arc<dyn AuditSink>,
|
||||||
|
scan_router: Option<Arc<dyn ScanCommandRouter>>,
|
||||||
|
safety_router: Option<Arc<dyn MissionSafetyRouter>>,
|
||||||
|
bit_severity: Option<Arc<dyn BitReportSeverityLookup>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OperatorCommandDispatcher {
|
||||||
|
pub fn builder() -> OperatorCommandDispatcherBuilder {
|
||||||
|
OperatorCommandDispatcherBuilder::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Public test helper: peek into the idempotency cache. Used by
|
||||||
|
/// the integration tests to assert AC-2 ("re-transmit returns
|
||||||
|
/// cached ack").
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn cache_len(&self) -> usize {
|
||||||
|
self.cache.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-680 / AZ-681 — dispatch one validated command. Returns the
|
||||||
|
/// typed [`CommandAck`]. Idempotency is handled inside; callers
|
||||||
|
/// just re-submit the same `command_id` on retransmit.
|
||||||
|
pub async fn dispatch(&self, cmd: OperatorCommand) -> CommandAck {
|
||||||
|
let cmd_id = cmd.command_id;
|
||||||
|
self.cache
|
||||||
|
.get_or_insert_with(cmd_id, || async move { self.dispatch_inner(cmd).await })
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn dispatch_inner(&self, cmd: OperatorCommand) -> CommandAck {
|
||||||
|
match cmd.kind {
|
||||||
|
OperatorCommandKind::ConfirmPoi
|
||||||
|
| OperatorCommandKind::DeclinePoi
|
||||||
|
| OperatorCommandKind::StartTargetFollow => self.dispatch_poi_bound(cmd).await,
|
||||||
|
OperatorCommandKind::ReleaseTargetFollow => self.dispatch_via_scan_router(cmd).await,
|
||||||
|
OperatorCommandKind::AcknowledgeBitDegraded => self.dispatch_bit_ack(cmd).await,
|
||||||
|
OperatorCommandKind::SafetyOverride => self.dispatch_safety_override(cmd).await,
|
||||||
|
OperatorCommandKind::MissionAbort => self.dispatch_via_scan_router(cmd).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// POI-bound dispatch path: enforces `unknown_poi_id` (AC-3) +
|
||||||
|
/// `expired` (AC-4) before forwarding to `scan_controller`.
|
||||||
|
async fn dispatch_poi_bound(&self, cmd: OperatorCommand) -> CommandAck {
|
||||||
|
let poi_id = match poi_id_from_payload(&cmd.payload) {
|
||||||
|
Ok(id) => id,
|
||||||
|
Err(_) => return CommandAck::error(ack_reasons::INVALID_PAYLOAD),
|
||||||
|
};
|
||||||
|
let Some(surfaced) = self.registry.get(poi_id) else {
|
||||||
|
return CommandAck::error(ack_reasons::UNKNOWN_POI_ID);
|
||||||
|
};
|
||||||
|
if surfaced.deadline <= Utc::now() {
|
||||||
|
return CommandAck::error(ack_reasons::EXPIRED);
|
||||||
|
}
|
||||||
|
self.dispatch_via_scan_router(cmd).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn dispatch_via_scan_router(&self, cmd: OperatorCommand) -> CommandAck {
|
||||||
|
let Some(router) = self.scan_router.as_ref() else {
|
||||||
|
return CommandAck::error(ack_reasons::ROUTER_NOT_WIRED);
|
||||||
|
};
|
||||||
|
match router.route(cmd).await {
|
||||||
|
Ok(()) => CommandAck::Ok,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "scan router rejected operator command");
|
||||||
|
CommandAck::error(ack_reasons::ROUTER_ERROR)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn dispatch_bit_ack(&self, cmd: OperatorCommand) -> CommandAck {
|
||||||
|
let payload = match BitAckPayload::from_value(&cmd.payload) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => {
|
||||||
|
let ack = CommandAck::error(ack_reasons::INVALID_PAYLOAD);
|
||||||
|
self.audit_bit(&cmd, Uuid::nil(), &ack).await;
|
||||||
|
return ack;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let ack = self.evaluate_bit_ack(&cmd, &payload).await;
|
||||||
|
self.audit_bit(&cmd, payload.report_id, &ack).await;
|
||||||
|
ack
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn evaluate_bit_ack(&self, cmd: &OperatorCommand, payload: &BitAckPayload) -> CommandAck {
|
||||||
|
let Some(severity) = self.bit_severity.as_ref() else {
|
||||||
|
return CommandAck::error(ack_reasons::ROUTER_NOT_WIRED);
|
||||||
|
};
|
||||||
|
match severity.is_acknowledgeable(payload.report_id).await {
|
||||||
|
Some(true) => match self.safety_router.as_ref() {
|
||||||
|
Some(router) => match router
|
||||||
|
.acknowledge_bit_degraded(payload.report_id, payload.operator_id.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(()) => CommandAck::Ok,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "mission safety router rejected bit ack");
|
||||||
|
CommandAck::error(ack_reasons::ROUTER_ERROR)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => CommandAck::error(ack_reasons::ROUTER_NOT_WIRED),
|
||||||
|
},
|
||||||
|
Some(false) => CommandAck::error(ack_reasons::CANNOT_ACKNOWLEDGE_FAIL),
|
||||||
|
None => {
|
||||||
|
tracing::warn!(
|
||||||
|
command_id = %cmd.command_id,
|
||||||
|
report_id = %payload.report_id,
|
||||||
|
"bit_degraded_ack: unknown report id"
|
||||||
|
);
|
||||||
|
CommandAck::error(ack_reasons::UNKNOWN_BIT_REPORT)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn dispatch_safety_override(&self, cmd: OperatorCommand) -> CommandAck {
|
||||||
|
let payload = match SafetyOverridePayload::from_value(&cmd.payload) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => {
|
||||||
|
let ack = CommandAck::error(ack_reasons::INVALID_PAYLOAD);
|
||||||
|
self.audit_safety(&cmd, None, 0, &ack).await;
|
||||||
|
return ack;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let ack = self.apply_safety_override(&payload).await;
|
||||||
|
self.audit_safety(&cmd, Some(payload.scope), payload.duration_secs, &ack)
|
||||||
|
.await;
|
||||||
|
ack
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn apply_safety_override(&self, payload: &SafetyOverridePayload) -> CommandAck {
|
||||||
|
let Some(router) = self.safety_router.as_ref() else {
|
||||||
|
return CommandAck::error(ack_reasons::ROUTER_NOT_WIRED);
|
||||||
|
};
|
||||||
|
match router
|
||||||
|
.apply_safety_override(
|
||||||
|
payload.scope,
|
||||||
|
payload.duration_secs,
|
||||||
|
payload.operator_id.clone(),
|
||||||
|
payload.rationale.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(()) => CommandAck::Ok,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "mission safety router rejected safety override");
|
||||||
|
CommandAck::error(ack_reasons::ROUTER_ERROR)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn audit_bit(&self, cmd: &OperatorCommand, report_id: Uuid, outcome: &CommandAck) {
|
||||||
|
self.audit
|
||||||
|
.record(AuditEntry::BitDegradedAck {
|
||||||
|
command_id: cmd.command_id,
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
operator_id: cmd
|
||||||
|
.payload
|
||||||
|
.get("operator_id")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.map(String::from),
|
||||||
|
report_id,
|
||||||
|
outcome: outcome.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn audit_safety(
|
||||||
|
&self,
|
||||||
|
cmd: &OperatorCommand,
|
||||||
|
scope: Option<SafetyOverrideScope>,
|
||||||
|
duration_secs: u32,
|
||||||
|
outcome: &CommandAck,
|
||||||
|
) {
|
||||||
|
self.audit
|
||||||
|
.record(AuditEntry::SafetyOverride {
|
||||||
|
command_id: cmd.command_id,
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
operator_id: cmd
|
||||||
|
.payload
|
||||||
|
.get("operator_id")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.map(String::from),
|
||||||
|
scope: scope.unwrap_or(SafetyOverrideScope::BatteryRtl),
|
||||||
|
duration_secs,
|
||||||
|
outcome: outcome.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Builder
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct OperatorCommandDispatcherBuilder {
|
||||||
|
registry: Option<SurfacedPoiRegistry>,
|
||||||
|
cache: Option<IdempotencyCache>,
|
||||||
|
audit: Option<Arc<dyn AuditSink>>,
|
||||||
|
scan_router: Option<Arc<dyn ScanCommandRouter>>,
|
||||||
|
safety_router: Option<Arc<dyn MissionSafetyRouter>>,
|
||||||
|
bit_severity: Option<Arc<dyn BitReportSeverityLookup>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OperatorCommandDispatcherBuilder {
|
||||||
|
pub fn registry(mut self, r: SurfacedPoiRegistry) -> Self {
|
||||||
|
self.registry = Some(r);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn idempotency_cache(mut self, c: IdempotencyCache) -> Self {
|
||||||
|
self.cache = Some(c);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn audit_sink(mut self, s: Arc<dyn AuditSink>) -> Self {
|
||||||
|
self.audit = Some(s);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn scan_router(mut self, r: Arc<dyn ScanCommandRouter>) -> Self {
|
||||||
|
self.scan_router = Some(r);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn safety_router(mut self, r: Arc<dyn MissionSafetyRouter>) -> Self {
|
||||||
|
self.safety_router = Some(r);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bit_severity(mut self, s: Arc<dyn BitReportSeverityLookup>) -> Self {
|
||||||
|
self.bit_severity = Some(s);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build(self) -> OperatorCommandDispatcher {
|
||||||
|
OperatorCommandDispatcher {
|
||||||
|
registry: self.registry.unwrap_or_default(),
|
||||||
|
cache: self
|
||||||
|
.cache
|
||||||
|
.unwrap_or_else(IdempotencyCache::with_default_ttl),
|
||||||
|
audit: self.audit.unwrap_or_else(TracingAuditSink::arc),
|
||||||
|
scan_router: self.scan_router,
|
||||||
|
safety_router: self.safety_router,
|
||||||
|
bit_severity: self.bit_severity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Payload extraction
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/// Extract `poi_id` from a POI-bound command payload.
|
||||||
|
///
|
||||||
|
/// Wire shape: `{ "poi_id": "<uuid>" }`. Anything else is a hard
|
||||||
|
/// `invalid_payload` error — the auth layer guarantees the payload
|
||||||
|
/// bytes weren't tampered with, but the operator UI might still send
|
||||||
|
/// the wrong shape on a build-skew between client and autopilot.
|
||||||
|
fn poi_id_from_payload(payload: &serde_json::Value) -> Result<Uuid, ()> {
|
||||||
|
let v = payload.get("poi_id").and_then(|v| v.as_str()).ok_or(())?;
|
||||||
|
Uuid::parse_str(v).map_err(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct BitAckPayload {
|
||||||
|
report_id: Uuid,
|
||||||
|
#[serde(default)]
|
||||||
|
operator_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BitAckPayload {
|
||||||
|
fn from_value(v: &serde_json::Value) -> Result<Self, serde_json::Error> {
|
||||||
|
serde_json::from_value(v.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct SafetyOverridePayload {
|
||||||
|
scope: SafetyOverrideScope,
|
||||||
|
duration_secs: u32,
|
||||||
|
operator_id: String,
|
||||||
|
#[serde(default)]
|
||||||
|
rationale: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SafetyOverridePayload {
|
||||||
|
fn from_value(v: &serde_json::Value) -> Result<Self, serde_json::Error> {
|
||||||
|
serde_json::from_value(v.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poi_id_extracts_uuid() {
|
||||||
|
// Arrange
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let v = json!({ "poi_id": id.to_string() });
|
||||||
|
|
||||||
|
// Act + Assert
|
||||||
|
assert_eq!(poi_id_from_payload(&v).unwrap(), id);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poi_id_missing_is_err() {
|
||||||
|
// Arrange
|
||||||
|
let v = json!({ "other": "x" });
|
||||||
|
|
||||||
|
// Act + Assert
|
||||||
|
assert!(poi_id_from_payload(&v).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bit_ack_payload_round_trip() {
|
||||||
|
// Arrange
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let v = json!({ "report_id": id.to_string(), "operator_id": "op1" });
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let p = BitAckPayload::from_value(&v).expect("parse");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(p.report_id, id);
|
||||||
|
assert_eq!(p.operator_id, Some("op1".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safety_override_payload_round_trip() {
|
||||||
|
// Arrange
|
||||||
|
let v = json!({
|
||||||
|
"scope": "battery_rtl",
|
||||||
|
"duration_secs": 60,
|
||||||
|
"operator_id": "op1",
|
||||||
|
"rationale": "post-mission RTL too aggressive"
|
||||||
|
});
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let p = SafetyOverridePayload::from_value(&v).expect("parse");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(p.scope, SafetyOverrideScope::BatteryRtl);
|
||||||
|
assert_eq!(p.duration_secs, 60);
|
||||||
|
assert_eq!(p.operator_id, "op1");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,173 @@
|
|||||||
|
//! AZ-680 — per-`command_id` idempotency cache.
|
||||||
|
//!
|
||||||
|
//! The spec (AC-2): "Re-transmit returns cached ack". A 60 s sliding
|
||||||
|
//! window over `command_id → CommandAck` so the operator UI can
|
||||||
|
//! safely retransmit on a flaky modem without causing the autopilot
|
||||||
|
//! to double-dispatch.
|
||||||
|
//!
|
||||||
|
//! Design notes:
|
||||||
|
//!
|
||||||
|
//! - Lazy eviction. `get_or_insert_with` purges expired entries before
|
||||||
|
//! inserting. We do not run a background sweeper task — at the
|
||||||
|
//! command rate of ≤5 confirms/min (operator workflow), the cache
|
||||||
|
//! stays small and per-call eviction is cheap.
|
||||||
|
//! - Returns the *cached* ack on hit; on miss, runs the supplied
|
||||||
|
//! future, caches its result, returns it. The future is NOT spawned
|
||||||
|
//! — the caller awaits it.
|
||||||
|
//! - Cache key is the full `Uuid`; the operator UI generates fresh
|
||||||
|
//! `command_id`s per logical command, so collisions imply a true
|
||||||
|
//! retransmit and we want to honour that.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::ack::CommandAck;
|
||||||
|
|
||||||
|
/// Default TTL per AZ-680 spec.
|
||||||
|
pub const DEFAULT_IDEMPOTENCY_TTL: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct Entry {
|
||||||
|
ack: CommandAck,
|
||||||
|
cached_at: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bounded-by-TTL idempotency cache. Cheap to `clone` (internals are
|
||||||
|
/// an `Arc<Mutex<_>>`).
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct IdempotencyCache {
|
||||||
|
ttl: Duration,
|
||||||
|
inner: Arc<Mutex<HashMap<Uuid, Entry>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IdempotencyCache {
|
||||||
|
pub fn new(ttl: Duration) -> Self {
|
||||||
|
Self {
|
||||||
|
ttl,
|
||||||
|
inner: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_default_ttl() -> Self {
|
||||||
|
Self::new(DEFAULT_IDEMPOTENCY_TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the cached ack if `command_id` is present and not
|
||||||
|
/// expired; otherwise runs `produce`, caches its result, and
|
||||||
|
/// returns it. Concurrent calls with the same `command_id` MAY
|
||||||
|
/// each execute `produce` once — that is acceptable here because
|
||||||
|
/// the downstream routers themselves are idempotent for the same
|
||||||
|
/// validated payload (the router-level side effect is the same
|
||||||
|
/// across retries; the registry/queue lookups deduplicate POI
|
||||||
|
/// state). The cache's primary role is to short-circuit
|
||||||
|
/// re-transmits that arrive seconds later, not to serialise
|
||||||
|
/// concurrent dispatchers of the same id.
|
||||||
|
pub async fn get_or_insert_with<F, Fut>(&self, command_id: Uuid, produce: F) -> CommandAck
|
||||||
|
where
|
||||||
|
F: FnOnce() -> Fut,
|
||||||
|
Fut: Future<Output = CommandAck>,
|
||||||
|
{
|
||||||
|
if let Some(cached) = self.get(command_id) {
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
let ack = produce().await;
|
||||||
|
self.insert(command_id, ack.clone());
|
||||||
|
ack
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Snapshot lookup — also evicts expired entries opportunistically.
|
||||||
|
pub fn get(&self, command_id: Uuid) -> Option<CommandAck> {
|
||||||
|
let mut guard = self.inner.lock();
|
||||||
|
self.evict_expired(&mut guard);
|
||||||
|
guard.get(&command_id).map(|e| e.ack.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&self, command_id: Uuid, ack: CommandAck) {
|
||||||
|
let mut guard = self.inner.lock();
|
||||||
|
self.evict_expired(&mut guard);
|
||||||
|
guard.insert(
|
||||||
|
command_id,
|
||||||
|
Entry {
|
||||||
|
ack,
|
||||||
|
cached_at: Instant::now(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn evict_expired(&self, guard: &mut HashMap<Uuid, Entry>) {
|
||||||
|
let now = Instant::now();
|
||||||
|
guard.retain(|_, e| now.duration_since(e.cached_at) < self.ttl);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
let mut guard = self.inner.lock();
|
||||||
|
self.evict_expired(&mut guard);
|
||||||
|
guard.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.len() == 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::sync::atomic::{AtomicU32, Ordering};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn miss_then_hit_runs_once() {
|
||||||
|
// Arrange
|
||||||
|
let cache = IdempotencyCache::with_default_ttl();
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let count = AtomicU32::new(0);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let _ = cache
|
||||||
|
.get_or_insert_with(id, || async {
|
||||||
|
count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
CommandAck::Ok
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
let _ = cache
|
||||||
|
.get_or_insert_with(id, || async {
|
||||||
|
count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
CommandAck::Ok
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(count.load(Ordering::SeqCst), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ttl_expiry_re_runs_producer() {
|
||||||
|
// Arrange — short TTL to keep the test fast.
|
||||||
|
let cache = IdempotencyCache::new(Duration::from_millis(20));
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let count = AtomicU32::new(0);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let _ = cache
|
||||||
|
.get_or_insert_with(id, || async {
|
||||||
|
count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
CommandAck::Ok
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
tokio::time::sleep(Duration::from_millis(40)).await;
|
||||||
|
let _ = cache
|
||||||
|
.get_or_insert_with(id, || async {
|
||||||
|
count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
CommandAck::Ok
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(count.load(Ordering::SeqCst), 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,8 @@
|
|||||||
//! Internal modules for `operator_bridge`. Not part of the public API.
|
//! Internal modules for `operator_bridge`. Not part of the public API.
|
||||||
|
|
||||||
|
pub mod audit;
|
||||||
pub mod auth;
|
pub mod auth;
|
||||||
|
pub mod dispatcher;
|
||||||
|
pub mod idempotency;
|
||||||
|
pub mod poi_registry;
|
||||||
pub mod poi_surface;
|
pub mod poi_surface;
|
||||||
|
|||||||
@@ -0,0 +1,128 @@
|
|||||||
|
//! AZ-680 — currently-surfaced POI registry.
|
||||||
|
//!
|
||||||
|
//! Tracks the subset of POIs that have been pushed to the operator UI
|
||||||
|
//! and have not yet been dequeued. The dispatcher consults this
|
||||||
|
//! registry to reject:
|
||||||
|
//!
|
||||||
|
//! - `Confirm` / `Decline` / `StartTargetFollow` for unknown
|
||||||
|
//! `poi_id`s (AC-3 → `unknown_poi_id`).
|
||||||
|
//! - Commands whose POI deadline has elapsed (AC-4 → `expired`).
|
||||||
|
//!
|
||||||
|
//! The registry is intentionally a plain `HashMap` behind a
|
||||||
|
//! [`parking_lot::Mutex`] — the dispatcher's lock window is short
|
||||||
|
//! (one O(1) lookup + one O(1) remove). A `RwLock` would not buy us
|
||||||
|
//! anything because the dispatcher writes on every confirm/decline.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use shared::models::poi::Poi;
|
||||||
|
|
||||||
|
/// Snapshot of the POI fields the dispatcher needs to enforce
|
||||||
|
/// validity + deadline checks without holding a reference to the
|
||||||
|
/// full [`Poi`] struct.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub struct SurfacedPoi {
|
||||||
|
pub poi_id: Uuid,
|
||||||
|
pub mgrs: String,
|
||||||
|
pub class_group: String,
|
||||||
|
pub deadline: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&Poi> for SurfacedPoi {
|
||||||
|
fn from(poi: &Poi) -> Self {
|
||||||
|
Self {
|
||||||
|
poi_id: poi.id,
|
||||||
|
mgrs: poi.mgrs.clone(),
|
||||||
|
class_group: poi.class_group.clone(),
|
||||||
|
deadline: poi.deadline,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// In-memory registry of surfaced-but-not-dequeued POIs. Cheap to
|
||||||
|
/// `clone` — internals are an `Arc<Mutex<_>>`.
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub struct SurfacedPoiRegistry {
|
||||||
|
inner: Arc<Mutex<HashMap<Uuid, SurfacedPoi>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SurfacedPoiRegistry {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record a surfaced POI. Overwrites any prior entry with the
|
||||||
|
/// same id (the POI was re-surfaced after a rotation).
|
||||||
|
pub fn record(&self, poi: SurfacedPoi) {
|
||||||
|
self.inner.lock().insert(poi.poi_id, poi);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a POI from the surfaced set. Called when the POI is
|
||||||
|
/// dequeued (rotated, aged out, or operator-decided).
|
||||||
|
pub fn forget(&self, poi_id: Uuid) {
|
||||||
|
self.inner.lock().remove(&poi_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Look up a surfaced POI. Returns `None` if the id has never
|
||||||
|
/// been surfaced or has already been dequeued.
|
||||||
|
pub fn get(&self, poi_id: Uuid) -> Option<SurfacedPoi> {
|
||||||
|
self.inner.lock().get(&poi_id).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.inner.lock().len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.len() == 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use chrono::Duration;
|
||||||
|
|
||||||
|
fn surfaced(deadline_secs: i64) -> SurfacedPoi {
|
||||||
|
SurfacedPoi {
|
||||||
|
poi_id: Uuid::new_v4(),
|
||||||
|
mgrs: "33UWP05".into(),
|
||||||
|
class_group: "vehicle".into(),
|
||||||
|
deadline: Utc::now() + Duration::seconds(deadline_secs),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn record_then_get_returns_clone() {
|
||||||
|
// Arrange
|
||||||
|
let r = SurfacedPoiRegistry::new();
|
||||||
|
let p = surfaced(120);
|
||||||
|
r.record(p.clone());
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let got = r.get(p.poi_id).expect("must be present");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(got, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn forget_removes_entry() {
|
||||||
|
// Arrange
|
||||||
|
let r = SurfacedPoiRegistry::new();
|
||||||
|
let p = surfaced(120);
|
||||||
|
r.record(p.clone());
|
||||||
|
|
||||||
|
// Act
|
||||||
|
r.forget(p.poi_id);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert!(r.get(p.poi_id).is_none());
|
||||||
|
assert!(r.is_empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
//! `operator_bridge` — POI surfacing + operator command authentication.
|
//! `operator_bridge` — POI surfacing + operator command authentication
|
||||||
|
//! + dispatch.
|
||||||
//!
|
//!
|
||||||
//! Real implementation in this batch:
|
//! Real implementation in this batch:
|
||||||
//! - **AZ-678** `internal::auth::HmacOperatorValidator` — HMAC-SHA256
|
//! - **AZ-678** `internal::auth::HmacOperatorValidator` — HMAC-SHA256
|
||||||
@@ -7,11 +8,15 @@
|
|||||||
//! counters; sliding-window red-health gate.
|
//! counters; sliding-window red-health gate.
|
||||||
//! - **AZ-679** `internal::poi_surface::PoiSurfaceMapper` — wire-format
|
//! - **AZ-679** `internal::poi_surface::PoiSurfaceMapper` — wire-format
|
||||||
//! POI events + `PoiDequeued` events pushed through `TelemetrySink`.
|
//! POI events + `PoiDequeued` events pushed through `TelemetrySink`.
|
||||||
//!
|
//! - **AZ-680** `internal::dispatcher::OperatorCommandDispatcher` —
|
||||||
//! Real implementation lands in:
|
//! POI-bound dispatch path, per-`command_id` idempotency cache,
|
||||||
//! - AZ-680 `operator_bridge_command_dispatch`
|
//! unknown-POI + expired-deadline gates.
|
||||||
//! - AZ-681 `operator_bridge_safety_and_bit_ack`
|
//! - **AZ-681** `internal::dispatcher::OperatorCommandDispatcher` —
|
||||||
|
//! BIT-degraded ack severity gate + `SafetyOverride` forwarding
|
||||||
|
//! into `mission_executor` via `MissionSafetyRouter`; structured
|
||||||
|
//! audit log entry per safety command.
|
||||||
|
|
||||||
|
pub mod ack;
|
||||||
pub mod internal;
|
pub mod internal;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -20,7 +25,10 @@ use async_trait::async_trait;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use shared::contracts::{OperatorCommandSink, TelemetrySink};
|
use shared::contracts::{
|
||||||
|
BitReportSeverityLookup, MissionSafetyRouter, OperatorCommandSink, ScanCommandRouter,
|
||||||
|
TelemetrySink,
|
||||||
|
};
|
||||||
use shared::error::{AutopilotError, Result};
|
use shared::error::{AutopilotError, Result};
|
||||||
use shared::health::{ComponentHealth, HealthLevel};
|
use shared::health::{ComponentHealth, HealthLevel};
|
||||||
use shared::models::mission::Coordinate;
|
use shared::models::mission::Coordinate;
|
||||||
@@ -28,9 +36,16 @@ use shared::models::operator::OperatorCommand;
|
|||||||
use shared::models::operator_event::{DequeueReason, PhotoMetadata};
|
use shared::models::operator_event::{DequeueReason, PhotoMetadata};
|
||||||
use shared::models::poi::Poi;
|
use shared::models::poi::Poi;
|
||||||
|
|
||||||
|
pub use crate::ack::{ack_reasons, CommandAck};
|
||||||
|
pub use crate::internal::audit::{AuditEntry, AuditSink, TracingAuditSink};
|
||||||
pub use crate::internal::auth::{
|
pub use crate::internal::auth::{
|
||||||
AuthCounters, HmacOperatorValidator, HmacValidatorConfig, REJECTION_REASONS,
|
AuthCounters, HmacOperatorValidator, HmacValidatorConfig, REJECTION_REASONS,
|
||||||
};
|
};
|
||||||
|
pub use crate::internal::dispatcher::{
|
||||||
|
OperatorCommandDispatcher, OperatorCommandDispatcherBuilder,
|
||||||
|
};
|
||||||
|
pub use crate::internal::idempotency::{IdempotencyCache, DEFAULT_IDEMPOTENCY_TTL};
|
||||||
|
pub use crate::internal::poi_registry::{SurfacedPoi, SurfacedPoiRegistry};
|
||||||
pub use crate::internal::poi_surface::{PoiSurfaceMapper, PoiSurfaceMetrics};
|
pub use crate::internal::poi_surface::{PoiSurfaceMapper, PoiSurfaceMetrics};
|
||||||
|
|
||||||
const NAME: &str = "operator_bridge";
|
const NAME: &str = "operator_bridge";
|
||||||
@@ -71,6 +86,20 @@ pub struct OperatorBridge {
|
|||||||
/// `poi_mapper` so legacy callers continue to compile until the
|
/// `poi_mapper` so legacy callers continue to compile until the
|
||||||
/// composition root wires it in.
|
/// composition root wires it in.
|
||||||
validator: Option<Arc<HmacOperatorValidator>>,
|
validator: Option<Arc<HmacOperatorValidator>>,
|
||||||
|
/// AZ-680 — currently-surfaced POI registry. Shared between the
|
||||||
|
/// `surface_poi` / `emit_poi_dequeued` write-side and the
|
||||||
|
/// dispatcher's POI-id validity check.
|
||||||
|
poi_registry: SurfacedPoiRegistry,
|
||||||
|
/// AZ-680 / AZ-681 — command dispatcher. Optional until both the
|
||||||
|
/// scan + safety routers are wired; without it `dispatch` returns
|
||||||
|
/// `router_not_wired`.
|
||||||
|
dispatcher: Option<Arc<OperatorCommandDispatcher>>,
|
||||||
|
/// Builder-only accumulators for the dispatcher's routers + sink.
|
||||||
|
/// Consumed in [`OperatorBridge::with_dispatcher`].
|
||||||
|
scan_router: Option<Arc<dyn ScanCommandRouter>>,
|
||||||
|
safety_router: Option<Arc<dyn MissionSafetyRouter>>,
|
||||||
|
bit_severity: Option<Arc<dyn BitReportSeverityLookup>>,
|
||||||
|
audit_sink: Option<Arc<dyn AuditSink>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OperatorBridge {
|
impl OperatorBridge {
|
||||||
@@ -84,6 +113,12 @@ impl OperatorBridge {
|
|||||||
target_follow_rx: Some(tf_rx),
|
target_follow_rx: Some(tf_rx),
|
||||||
poi_mapper: None,
|
poi_mapper: None,
|
||||||
validator: None,
|
validator: None,
|
||||||
|
poi_registry: SurfacedPoiRegistry::new(),
|
||||||
|
dispatcher: None,
|
||||||
|
scan_router: None,
|
||||||
|
safety_router: None,
|
||||||
|
bit_severity: None,
|
||||||
|
audit_sink: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -97,12 +132,63 @@ impl OperatorBridge {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — wire `scan_controller`'s [`ScanCommandRouter`] impl.
|
||||||
|
pub fn with_scan_router(mut self, router: Arc<dyn ScanCommandRouter>) -> Self {
|
||||||
|
self.scan_router = Some(router);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 — wire `mission_executor`'s [`MissionSafetyRouter`] impl.
|
||||||
|
pub fn with_safety_router(mut self, router: Arc<dyn MissionSafetyRouter>) -> Self {
|
||||||
|
self.safety_router = Some(router);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 — wire `mission_executor`'s
|
||||||
|
/// [`BitReportSeverityLookup`] impl.
|
||||||
|
pub fn with_bit_severity_lookup(mut self, lookup: Arc<dyn BitReportSeverityLookup>) -> Self {
|
||||||
|
self.bit_severity = Some(lookup);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 — override the default tracing audit sink. Used by
|
||||||
|
/// integration tests; production wires the default.
|
||||||
|
pub fn with_audit_sink(mut self, sink: Arc<dyn AuditSink>) -> Self {
|
||||||
|
self.audit_sink = Some(sink);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-680 / AZ-681 — finalise the dispatcher. Returns `self` so
|
||||||
|
/// the call can sit at the end of the builder chain. Idempotent
|
||||||
|
/// (calling twice rebuilds the dispatcher with the most-recent
|
||||||
|
/// wiring) — this matters because the composition root sometimes
|
||||||
|
/// re-runs the wiring sequence on subsystem restart.
|
||||||
|
pub fn with_dispatcher(mut self) -> Self {
|
||||||
|
let mut builder = OperatorCommandDispatcher::builder().registry(self.poi_registry.clone());
|
||||||
|
if let Some(r) = self.scan_router.clone() {
|
||||||
|
builder = builder.scan_router(r);
|
||||||
|
}
|
||||||
|
if let Some(r) = self.safety_router.clone() {
|
||||||
|
builder = builder.safety_router(r);
|
||||||
|
}
|
||||||
|
if let Some(s) = self.bit_severity.clone() {
|
||||||
|
builder = builder.bit_severity(s);
|
||||||
|
}
|
||||||
|
if let Some(s) = self.audit_sink.clone() {
|
||||||
|
builder = builder.audit_sink(s);
|
||||||
|
}
|
||||||
|
self.dispatcher = Some(Arc::new(builder.build()));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
pub fn handle(&self) -> OperatorBridgeHandle {
|
pub fn handle(&self) -> OperatorBridgeHandle {
|
||||||
OperatorBridgeHandle {
|
OperatorBridgeHandle {
|
||||||
middle_waypoint_tx: self.middle_waypoint_tx.clone(),
|
middle_waypoint_tx: self.middle_waypoint_tx.clone(),
|
||||||
target_follow_tx: self.target_follow_tx.clone(),
|
target_follow_tx: self.target_follow_tx.clone(),
|
||||||
poi_mapper: self.poi_mapper.clone(),
|
poi_mapper: self.poi_mapper.clone(),
|
||||||
validator: self.validator.clone(),
|
validator: self.validator.clone(),
|
||||||
|
poi_registry: self.poi_registry.clone(),
|
||||||
|
dispatcher: self.dispatcher.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,6 +199,15 @@ impl OperatorBridge {
|
|||||||
pub fn take_target_follow_receiver(&mut self) -> Option<mpsc::Receiver<TargetFollowEvent>> {
|
pub fn take_target_follow_receiver(&mut self) -> Option<mpsc::Receiver<TargetFollowEvent>> {
|
||||||
self.target_follow_rx.take()
|
self.target_follow_rx.take()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — clone of the surfaced-POI registry. Exposed so the
|
||||||
|
/// composition root can pre-seed entries on subsystem restart
|
||||||
|
/// and so integration tests can register POIs without spinning
|
||||||
|
/// up a TelemetrySink. The registry is also wired into the
|
||||||
|
/// dispatcher.
|
||||||
|
pub fn surfaced_registry(&self) -> SurfacedPoiRegistry {
|
||||||
|
self.poi_registry.clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -123,19 +218,33 @@ pub struct OperatorBridgeHandle {
|
|||||||
target_follow_tx: mpsc::Sender<TargetFollowEvent>,
|
target_follow_tx: mpsc::Sender<TargetFollowEvent>,
|
||||||
poi_mapper: Option<Arc<PoiSurfaceMapper>>,
|
poi_mapper: Option<Arc<PoiSurfaceMapper>>,
|
||||||
validator: Option<Arc<HmacOperatorValidator>>,
|
validator: Option<Arc<HmacOperatorValidator>>,
|
||||||
|
/// AZ-680 — registry of surfaced-but-not-dequeued POIs. The
|
||||||
|
/// dispatcher consults this for unknown-id + deadline checks.
|
||||||
|
poi_registry: SurfacedPoiRegistry,
|
||||||
|
dispatcher: Option<Arc<OperatorCommandDispatcher>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OperatorBridgeHandle {
|
impl OperatorBridgeHandle {
|
||||||
/// AZ-679 — surface a POI to the operator and await the decision.
|
/// AZ-679 + AZ-680 — surface a POI to the operator. Records the
|
||||||
/// Today returns `NotImplemented` (the decision loop is AZ-680);
|
/// POI in the dispatcher's validity registry so subsequent
|
||||||
/// the surface event itself IS pushed (via the configured
|
/// confirm/decline/start-follow commands resolve. The event itself
|
||||||
/// `TelemetrySink`), so the operator UI receives it.
|
/// is pushed via the configured `TelemetrySink`.
|
||||||
|
///
|
||||||
|
/// Returns `OperatorDecision::Confirmed`/`Declined`/... is NOT
|
||||||
|
/// the responsibility of this method any more — the decision
|
||||||
|
/// arrives asynchronously via `dispatch` and the operator UI
|
||||||
|
/// applies it. The legacy `Result<OperatorDecision>` shape is
|
||||||
|
/// retained for callers that have not yet migrated; today the
|
||||||
|
/// method returns `NotImplemented` after the surface emits, and
|
||||||
|
/// `scan_controller` should use the non-decision-returning path
|
||||||
|
/// in `surface_poi_with_photo` instead.
|
||||||
pub async fn surface_poi(&self, poi: Poi) -> Result<OperatorDecision> {
|
pub async fn surface_poi(&self, poi: Poi) -> Result<OperatorDecision> {
|
||||||
match &self.poi_mapper {
|
match &self.poi_mapper {
|
||||||
Some(mapper) => {
|
Some(mapper) => {
|
||||||
|
self.poi_registry.record(SurfacedPoi::from(&poi));
|
||||||
mapper.surface(&poi, None).await?;
|
mapper.surface(&poi, None).await?;
|
||||||
Err(AutopilotError::NotImplemented(
|
Err(AutopilotError::NotImplemented(
|
||||||
"operator_bridge::surface_poi → decision loop (AZ-680)",
|
"operator_bridge::surface_poi → decision is async via dispatch (AZ-680)",
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
None => Err(AutopilotError::NotImplemented(
|
None => Err(AutopilotError::NotImplemented(
|
||||||
@@ -144,8 +253,9 @@ impl OperatorBridgeHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// AZ-679 — surface a POI together with photo metadata (preferred
|
/// AZ-679 + AZ-680 — surface a POI together with photo metadata
|
||||||
/// path when the source detection carries an ROI snapshot).
|
/// (preferred path when the source detection carries an ROI
|
||||||
|
/// snapshot). Records the POI in the dispatcher's registry.
|
||||||
pub async fn surface_poi_with_photo(
|
pub async fn surface_poi_with_photo(
|
||||||
&self,
|
&self,
|
||||||
poi: &Poi,
|
poi: &Poi,
|
||||||
@@ -154,18 +264,39 @@ impl OperatorBridgeHandle {
|
|||||||
let mapper = self.poi_mapper.as_ref().ok_or_else(|| {
|
let mapper = self.poi_mapper.as_ref().ok_or_else(|| {
|
||||||
AutopilotError::Internal("surface_poi_with_photo: telemetry sink not wired".into())
|
AutopilotError::Internal("surface_poi_with_photo: telemetry sink not wired".into())
|
||||||
})?;
|
})?;
|
||||||
|
self.poi_registry.record(SurfacedPoi::from(poi));
|
||||||
mapper.surface(poi, Some(photo_metadata)).await.map(|_| ())
|
mapper.surface(poi, Some(photo_metadata)).await.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// AZ-679 — emit a `PoiDequeued` event (rotation / age-out /
|
/// AZ-679 + AZ-680 — emit a `PoiDequeued` event (rotation /
|
||||||
/// completion). Called by `scan_controller` through the bridge.
|
/// age-out / completion). Removes the POI from the dispatcher's
|
||||||
|
/// registry so any further confirm/decline for the same id
|
||||||
|
/// resolves to `unknown_poi_id`.
|
||||||
pub async fn emit_poi_dequeued(&self, poi_id: uuid::Uuid, reason: DequeueReason) -> Result<()> {
|
pub async fn emit_poi_dequeued(&self, poi_id: uuid::Uuid, reason: DequeueReason) -> Result<()> {
|
||||||
let mapper = self.poi_mapper.as_ref().ok_or_else(|| {
|
let mapper = self.poi_mapper.as_ref().ok_or_else(|| {
|
||||||
AutopilotError::Internal("emit_poi_dequeued: telemetry sink not wired".into())
|
AutopilotError::Internal("emit_poi_dequeued: telemetry sink not wired".into())
|
||||||
})?;
|
})?;
|
||||||
|
self.poi_registry.forget(poi_id);
|
||||||
mapper.emit_dequeued(poi_id, reason).await
|
mapper.emit_dequeued(poi_id, reason).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 / AZ-681 — dispatch a validated operator command and
|
||||||
|
/// return the typed [`CommandAck`]. The dispatcher must be wired
|
||||||
|
/// via `OperatorBridge::with_dispatcher`; without it every
|
||||||
|
/// command returns `router_not_wired`.
|
||||||
|
pub async fn dispatch_command(&self, cmd: OperatorCommand) -> CommandAck {
|
||||||
|
match &self.dispatcher {
|
||||||
|
Some(d) => d.dispatch(cmd).await,
|
||||||
|
None => CommandAck::error(ack_reasons::ROUTER_NOT_WIRED),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test/observability hook: peek the surfaced-POI registry.
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn surfaced_poi_count(&self) -> usize {
|
||||||
|
self.poi_registry.len()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn poi_metrics(&self) -> Option<PoiSurfaceMetrics> {
|
pub fn poi_metrics(&self) -> Option<PoiSurfaceMetrics> {
|
||||||
self.poi_mapper.as_ref().map(|m| m.metrics())
|
self.poi_mapper.as_ref().map(|m| m.metrics())
|
||||||
}
|
}
|
||||||
@@ -197,12 +328,25 @@ impl OperatorBridgeHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — wire the bridge into the `OperatorCommandSink` trait so
|
||||||
|
/// `telemetry_stream`'s downlink can forward validated commands
|
||||||
|
/// uniformly. The trait surface is binary (`Result<()>`); the typed
|
||||||
|
/// [`CommandAck`] surfaces through [`OperatorBridgeHandle::dispatch_command`]
|
||||||
|
/// for callers that need the rejection reason. The trait impl maps:
|
||||||
|
///
|
||||||
|
/// - `CommandAck::Ok` → `Ok(())`
|
||||||
|
/// - `CommandAck::Error { reason }` → `Err(AutopilotError::Validation(reason))`
|
||||||
|
///
|
||||||
|
/// This keeps the trait minimal while still propagating actionable
|
||||||
|
/// rejection reasons to downstream consumers that only see the
|
||||||
|
/// trait surface.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl OperatorCommandSink for OperatorBridgeHandle {
|
impl OperatorCommandSink for OperatorBridgeHandle {
|
||||||
async fn dispatch(&self, _command: OperatorCommand) -> Result<()> {
|
async fn dispatch(&self, command: OperatorCommand) -> Result<()> {
|
||||||
Err(AutopilotError::NotImplemented(
|
match self.dispatch_command(command).await {
|
||||||
"operator_bridge::dispatch (AZ-680)",
|
CommandAck::Ok => Ok(()),
|
||||||
))
|
CommandAck::Error { reason } => Err(AutopilotError::Validation(reason)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,439 @@
|
|||||||
|
//! AZ-680 + AZ-681 — operator-command dispatcher acceptance tests.
|
||||||
|
//!
|
||||||
|
//! These tests exercise the dispatcher through the public
|
||||||
|
//! `OperatorBridgeHandle::dispatch_command` surface so the wiring
|
||||||
|
//! between the surfaced-POI registry, the idempotency cache, the
|
||||||
|
//! scan router, the safety router, the BIT severity lookup, and the
|
||||||
|
//! audit sink is covered end-to-end.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::Mutex as StdMutex;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::{Duration as ChronoDuration, Utc};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use serde_json::json;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use operator_bridge::{
|
||||||
|
ack_reasons, AuditEntry, AuditSink, CommandAck, OperatorBridge, SurfacedPoi,
|
||||||
|
};
|
||||||
|
use shared::contracts::{BitReportSeverityLookup, MissionSafetyRouter, ScanCommandRouter};
|
||||||
|
use shared::error::Result;
|
||||||
|
use shared::models::operator::{OperatorCommand, OperatorCommandKind, SafetyOverrideScope};
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Test doubles
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct RecordingScanRouter {
|
||||||
|
calls: StdMutex<Vec<OperatorCommand>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ScanCommandRouter for RecordingScanRouter {
|
||||||
|
async fn route(&self, command: OperatorCommand) -> Result<()> {
|
||||||
|
self.calls.lock().unwrap().push(command);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct RecordingSafetyRouter {
|
||||||
|
bit_acks: StdMutex<Vec<(Uuid, Option<String>)>>,
|
||||||
|
overrides: StdMutex<Vec<(SafetyOverrideScope, u32, String, String)>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl MissionSafetyRouter for RecordingSafetyRouter {
|
||||||
|
async fn acknowledge_bit_degraded(
|
||||||
|
&self,
|
||||||
|
report_id: Uuid,
|
||||||
|
operator_id: Option<String>,
|
||||||
|
) -> Result<()> {
|
||||||
|
self.bit_acks.lock().unwrap().push((report_id, operator_id));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn apply_safety_override(
|
||||||
|
&self,
|
||||||
|
scope: SafetyOverrideScope,
|
||||||
|
duration_secs: u32,
|
||||||
|
operator_id: String,
|
||||||
|
rationale: String,
|
||||||
|
) -> Result<()> {
|
||||||
|
self.overrides
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.push((scope, duration_secs, operator_id, rationale));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Severity lookup that returns whatever is registered for each id.
|
||||||
|
/// `Some(true)` for acknowledgeable (Degraded), `Some(false)` for
|
||||||
|
/// Fail, `None` for unknown.
|
||||||
|
#[derive(Default)]
|
||||||
|
struct StubBitSeverity {
|
||||||
|
inner: StdMutex<std::collections::HashMap<Uuid, bool>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StubBitSeverity {
|
||||||
|
fn set(&self, report_id: Uuid, acknowledgeable: bool) {
|
||||||
|
self.inner
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.insert(report_id, acknowledgeable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl BitReportSeverityLookup for StubBitSeverity {
|
||||||
|
async fn is_acknowledgeable(&self, report_id: Uuid) -> Option<bool> {
|
||||||
|
self.inner.lock().unwrap().get(&report_id).copied()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
struct RecordingAuditSink {
|
||||||
|
entries: Arc<Mutex<Vec<AuditEntry>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AuditSink for RecordingAuditSink {
|
||||||
|
async fn record(&self, entry: AuditEntry) {
|
||||||
|
self.entries.lock().push(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Helpers
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
fn cmd(kind: OperatorCommandKind, payload: serde_json::Value) -> OperatorCommand {
|
||||||
|
OperatorCommand {
|
||||||
|
command_id: Uuid::new_v4(),
|
||||||
|
session_token: "session".to_string(),
|
||||||
|
sequence_number: 1,
|
||||||
|
issued_at_wallclock: Utc::now(),
|
||||||
|
kind,
|
||||||
|
payload,
|
||||||
|
signature: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn surfaced(deadline_secs: i64) -> SurfacedPoi {
|
||||||
|
SurfacedPoi {
|
||||||
|
poi_id: Uuid::new_v4(),
|
||||||
|
mgrs: "33UWP05".into(),
|
||||||
|
class_group: "vehicle".into(),
|
||||||
|
deadline: Utc::now() + ChronoDuration::seconds(deadline_secs),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Harness {
|
||||||
|
bridge: OperatorBridge,
|
||||||
|
scan: Arc<RecordingScanRouter>,
|
||||||
|
safety: Arc<RecordingSafetyRouter>,
|
||||||
|
severity: Arc<StubBitSeverity>,
|
||||||
|
audit: RecordingAuditSink,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn harness() -> Harness {
|
||||||
|
let scan = Arc::new(RecordingScanRouter::default());
|
||||||
|
let safety = Arc::new(RecordingSafetyRouter::default());
|
||||||
|
let severity = Arc::new(StubBitSeverity::default());
|
||||||
|
let audit = RecordingAuditSink::default();
|
||||||
|
let bridge = OperatorBridge::new(8)
|
||||||
|
.with_scan_router(scan.clone() as Arc<dyn ScanCommandRouter>)
|
||||||
|
.with_safety_router(safety.clone() as Arc<dyn MissionSafetyRouter>)
|
||||||
|
.with_bit_severity_lookup(severity.clone() as Arc<dyn BitReportSeverityLookup>)
|
||||||
|
.with_audit_sink(Arc::new(audit.clone()) as Arc<dyn AuditSink>)
|
||||||
|
.with_dispatcher();
|
||||||
|
Harness {
|
||||||
|
bridge,
|
||||||
|
scan,
|
||||||
|
safety,
|
||||||
|
severity,
|
||||||
|
audit,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// AZ-680 ACs
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/// AZ-680 AC-1 — Confirm forwards target hint.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az680_ac1_confirm_forwards_to_scan_router() {
|
||||||
|
// Arrange
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
let surfaced = surfaced(120);
|
||||||
|
h.bridge.surfaced_registry().record(surfaced.clone());
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let ack = handle
|
||||||
|
.dispatch_command(cmd(
|
||||||
|
OperatorCommandKind::ConfirmPoi,
|
||||||
|
json!({ "poi_id": surfaced.poi_id.to_string() }),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(ack, CommandAck::Ok);
|
||||||
|
let calls = h.scan.calls.lock().unwrap();
|
||||||
|
assert_eq!(calls.len(), 1, "scan_router::route called exactly once");
|
||||||
|
assert!(matches!(calls[0].kind, OperatorCommandKind::ConfirmPoi));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-680 AC-2 — Re-transmit returns cached ack.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az680_ac2_retransmit_returns_cached_ack() {
|
||||||
|
// Arrange
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
let surfaced = surfaced(120);
|
||||||
|
h.bridge.surfaced_registry().record(surfaced.clone());
|
||||||
|
let command = cmd(
|
||||||
|
OperatorCommandKind::ConfirmPoi,
|
||||||
|
json!({ "poi_id": surfaced.poi_id.to_string() }),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Act — same command_id dispatched twice
|
||||||
|
let ack1 = handle.dispatch_command(command.clone()).await;
|
||||||
|
let ack2 = handle.dispatch_command(command.clone()).await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(ack1, CommandAck::Ok);
|
||||||
|
assert_eq!(ack2, CommandAck::Ok);
|
||||||
|
let calls = h.scan.calls.lock().unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
calls.len(),
|
||||||
|
1,
|
||||||
|
"scan_router::route must be invoked exactly once across retransmits"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-680 AC-3 — Unknown POI id rejected.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az680_ac3_unknown_poi_id_rejected() {
|
||||||
|
// Arrange
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
|
||||||
|
// Act — POI id never surfaced
|
||||||
|
let ack = handle
|
||||||
|
.dispatch_command(cmd(
|
||||||
|
OperatorCommandKind::ConfirmPoi,
|
||||||
|
json!({ "poi_id": Uuid::new_v4().to_string() }),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(ack.reason(), Some(ack_reasons::UNKNOWN_POI_ID));
|
||||||
|
assert!(
|
||||||
|
h.scan.calls.lock().unwrap().is_empty(),
|
||||||
|
"scan_router must not be invoked"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-680 AC-4 — Expired POI rejected.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az680_ac4_expired_poi_rejected() {
|
||||||
|
// Arrange — surface a POI whose deadline has already passed.
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
let expired = SurfacedPoi {
|
||||||
|
deadline: Utc::now() - ChronoDuration::seconds(1),
|
||||||
|
..surfaced(0)
|
||||||
|
};
|
||||||
|
h.bridge.surfaced_registry().record(expired.clone());
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let ack = handle
|
||||||
|
.dispatch_command(cmd(
|
||||||
|
OperatorCommandKind::ConfirmPoi,
|
||||||
|
json!({ "poi_id": expired.poi_id.to_string() }),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(ack.reason(), Some(ack_reasons::EXPIRED));
|
||||||
|
assert!(
|
||||||
|
h.scan.calls.lock().unwrap().is_empty(),
|
||||||
|
"scan_router must not be invoked on expired POI"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-680 AC-5 — Decline appends IgnoredItem via scan_controller.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az680_ac5_decline_forwards_to_scan_router() {
|
||||||
|
// Arrange
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
let surfaced = surfaced(120);
|
||||||
|
h.bridge.surfaced_registry().record(surfaced.clone());
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let ack = handle
|
||||||
|
.dispatch_command(cmd(
|
||||||
|
OperatorCommandKind::DeclinePoi,
|
||||||
|
json!({ "poi_id": surfaced.poi_id.to_string() }),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(ack, CommandAck::Ok);
|
||||||
|
let calls = h.scan.calls.lock().unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
calls.len(),
|
||||||
|
1,
|
||||||
|
"DeclinePoi must reach scan_router exactly once"
|
||||||
|
);
|
||||||
|
assert!(matches!(calls[0].kind, OperatorCommandKind::DeclinePoi));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// AZ-681 ACs
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/// AZ-681 AC-1 — BIT-DEGRADED ack succeeds.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az681_ac1_bit_degraded_ack_forwards() {
|
||||||
|
// Arrange
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
let report_id = Uuid::new_v4();
|
||||||
|
h.severity.set(report_id, true);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let ack = handle
|
||||||
|
.dispatch_command(cmd(
|
||||||
|
OperatorCommandKind::AcknowledgeBitDegraded,
|
||||||
|
json!({ "report_id": report_id.to_string(), "operator_id": "op1" }),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(ack, CommandAck::Ok);
|
||||||
|
let acks = h.safety.bit_acks.lock().unwrap();
|
||||||
|
assert_eq!(acks.len(), 1);
|
||||||
|
assert_eq!(acks[0], (report_id, Some("op1".to_string())));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 AC-2 — BIT-FAIL ack rejected.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az681_ac2_bit_fail_ack_rejected() {
|
||||||
|
// Arrange
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
let report_id = Uuid::new_v4();
|
||||||
|
h.severity.set(report_id, false);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let ack = handle
|
||||||
|
.dispatch_command(cmd(
|
||||||
|
OperatorCommandKind::AcknowledgeBitDegraded,
|
||||||
|
json!({ "report_id": report_id.to_string(), "operator_id": "op1" }),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(ack.reason(), Some(ack_reasons::CANNOT_ACKNOWLEDGE_FAIL));
|
||||||
|
assert!(
|
||||||
|
h.safety.bit_acks.lock().unwrap().is_empty(),
|
||||||
|
"safety_router must not be invoked on Fail report"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 AC-3 — Safety-override forwards with scope + duration, and
|
||||||
|
/// an audit entry is written.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az681_ac3_safety_override_forwards_with_audit_entry() {
|
||||||
|
// Arrange
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let ack = handle
|
||||||
|
.dispatch_command(cmd(
|
||||||
|
OperatorCommandKind::SafetyOverride,
|
||||||
|
json!({
|
||||||
|
"scope": "battery_rtl",
|
||||||
|
"duration_secs": 60,
|
||||||
|
"operator_id": "op1",
|
||||||
|
"rationale": "post-mission RTL too aggressive"
|
||||||
|
}),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert — router invoked with the right scope + duration.
|
||||||
|
assert_eq!(ack, CommandAck::Ok);
|
||||||
|
let overrides = h.safety.overrides.lock().unwrap();
|
||||||
|
assert_eq!(overrides.len(), 1);
|
||||||
|
assert_eq!(overrides[0].0, SafetyOverrideScope::BatteryRtl);
|
||||||
|
assert_eq!(overrides[0].1, 60);
|
||||||
|
assert_eq!(overrides[0].2, "op1");
|
||||||
|
|
||||||
|
// Assert — audit log has exactly one safety-override entry.
|
||||||
|
let entries = h.audit.entries.lock();
|
||||||
|
let safety_entries: Vec<_> = entries
|
||||||
|
.iter()
|
||||||
|
.filter(|e| matches!(e, AuditEntry::SafetyOverride { .. }))
|
||||||
|
.collect();
|
||||||
|
assert_eq!(safety_entries.len(), 1);
|
||||||
|
match safety_entries[0] {
|
||||||
|
AuditEntry::SafetyOverride {
|
||||||
|
scope,
|
||||||
|
duration_secs,
|
||||||
|
operator_id,
|
||||||
|
outcome,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
assert_eq!(*scope, SafetyOverrideScope::BatteryRtl);
|
||||||
|
assert_eq!(*duration_secs, 60);
|
||||||
|
assert_eq!(operator_id.as_deref(), Some("op1"));
|
||||||
|
assert_eq!(outcome, &CommandAck::Ok);
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 AC-4 — Audit log redacts secrets.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn az681_ac4_audit_log_contains_no_signature_or_session_token() {
|
||||||
|
// Arrange
|
||||||
|
let h = harness();
|
||||||
|
let handle = h.bridge.handle();
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let _ = handle
|
||||||
|
.dispatch_command(cmd(
|
||||||
|
OperatorCommandKind::SafetyOverride,
|
||||||
|
json!({
|
||||||
|
"scope": "battery_rtl",
|
||||||
|
"duration_secs": 30,
|
||||||
|
"operator_id": "op1",
|
||||||
|
"rationale": "test"
|
||||||
|
}),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Assert — every audit entry serialised to JSON must omit
|
||||||
|
// `signature` and `session_token`.
|
||||||
|
let entries = h.audit.entries.lock();
|
||||||
|
assert!(!entries.is_empty());
|
||||||
|
for entry in entries.iter() {
|
||||||
|
let json = serde_json::to_string(entry).expect("serialises");
|
||||||
|
assert!(
|
||||||
|
!json.contains("signature"),
|
||||||
|
"audit entry leaked signature: {json}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
!json.contains("session_token"),
|
||||||
|
"audit entry leaked session_token: {json}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -20,3 +20,4 @@ serde = { workspace = true }
|
|||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
|||||||
@@ -66,6 +66,22 @@ pub struct DeclineAction {
|
|||||||
pub class_group: String,
|
pub class_group: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — information returned when a POI is confirmed (or selected
|
||||||
|
/// for target-follow start). Mirrors [`DeclineAction`] so consumers
|
||||||
|
/// downstream of the confirm path (AZ-684 evidence ladder, AZ-685
|
||||||
|
/// mapobjects dispatch, AZ-686 gimbal issuance) get a typed
|
||||||
|
/// `(target_mgrs, target_class)` hint without re-querying the queue.
|
||||||
|
///
|
||||||
|
/// The POI is removed from the queue as part of `confirm`. A
|
||||||
|
/// subsequent confirm with the same `poi_id` returns `None`.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub struct ConfirmAction {
|
||||||
|
pub poi_id: Uuid,
|
||||||
|
pub target_mgrs: String,
|
||||||
|
pub target_class: String,
|
||||||
|
pub class_group: String,
|
||||||
|
}
|
||||||
|
|
||||||
impl PoiQueue {
|
impl PoiQueue {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self::default()
|
Self::default()
|
||||||
@@ -145,6 +161,23 @@ impl PoiQueue {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Confirm a POI by id. Removes from queue; returns the typed
|
||||||
|
/// `(target_mgrs, target_class)` hint that downstream consumers
|
||||||
|
/// (AZ-684 evidence ladder, AZ-686 gimbal issuance) build the
|
||||||
|
/// follow-up plan from. AZ-680 only needs the removal + the hint
|
||||||
|
/// to be carried back through `submit_operator_cmd`'s return
|
||||||
|
/// value.
|
||||||
|
pub fn confirm(&mut self, poi_id: Uuid) -> Option<ConfirmAction> {
|
||||||
|
let idx = self.entries.iter().position(|e| e.poi.id == poi_id)?;
|
||||||
|
let entry = self.entries.swap_remove(idx);
|
||||||
|
Some(ConfirmAction {
|
||||||
|
poi_id: entry.poi.id,
|
||||||
|
target_mgrs: entry.poi.mgrs,
|
||||||
|
target_class: entry.poi.class,
|
||||||
|
class_group: entry.poi.class_group,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Drop POIs whose deadline (set at insertion by the caller per
|
/// Drop POIs whose deadline (set at insertion by the caller per
|
||||||
/// the confidence-scaled window) has elapsed. Returns the IDs of
|
/// the confidence-scaled window) has elapsed. Returns the IDs of
|
||||||
/// forgotten POIs. NO `IgnoredItem` is created — timeout =
|
/// forgotten POIs. NO `IgnoredItem` is created — timeout =
|
||||||
|
|||||||
@@ -31,10 +31,12 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use shared::contracts::ScanCommandRouter;
|
||||||
use shared::error::{AutopilotError, Result};
|
use shared::error::{AutopilotError, Result};
|
||||||
use shared::health::{ComponentHealth, HealthLevel};
|
use shared::health::{ComponentHealth, HealthLevel};
|
||||||
use shared::models::operator::{OperatorCommand, OperatorCommandKind};
|
use shared::models::operator::{OperatorCommand, OperatorCommandKind};
|
||||||
@@ -44,7 +46,8 @@ pub mod internal;
|
|||||||
|
|
||||||
pub use internal::frame_rate_guard::{FrameRateGuard, FrameRateGuardConfig};
|
pub use internal::frame_rate_guard::{FrameRateGuard, FrameRateGuardConfig};
|
||||||
pub use internal::poi_queue::{
|
pub use internal::poi_queue::{
|
||||||
age_factor, decision_window, priority_score, DeclineAction, PoiQueue, SURFACE_CAP_PER_WINDOW,
|
age_factor, decision_window, priority_score, ConfirmAction, DeclineAction, PoiQueue,
|
||||||
|
SURFACE_CAP_PER_WINDOW,
|
||||||
};
|
};
|
||||||
pub use internal::state_machine::transitions::{transition, TransitionCtx};
|
pub use internal::state_machine::transitions::{transition, TransitionCtx};
|
||||||
pub use internal::state_machine::{RejectReason, ScanState, TransitionOutcome, Trigger};
|
pub use internal::state_machine::{RejectReason, ScanState, TransitionOutcome, Trigger};
|
||||||
@@ -153,11 +156,14 @@ pub struct ScanMetrics {
|
|||||||
|
|
||||||
/// Result of [`ScanControllerHandle::submit_operator_cmd`]. `Accepted`
|
/// Result of [`ScanControllerHandle::submit_operator_cmd`]. `Accepted`
|
||||||
/// means the command was applied with no return data; `Declined`
|
/// means the command was applied with no return data; `Declined`
|
||||||
/// carries the dispatchable IgnoredItem action AZ-685 must persist.
|
/// carries the dispatchable IgnoredItem action AZ-685 must persist;
|
||||||
|
/// `Confirmed` carries the typed `(target_mgrs, target_class)` hint
|
||||||
|
/// AZ-684 / AZ-686 build a follow-up plan from.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum SubmitOutcome {
|
pub enum SubmitOutcome {
|
||||||
Accepted,
|
Accepted,
|
||||||
Declined(DeclineAction),
|
Declined(DeclineAction),
|
||||||
|
Confirmed(ConfirmAction),
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poi_id_from_payload(payload: &serde_json::Value) -> Result<Uuid> {
|
fn poi_id_from_payload(payload: &serde_json::Value) -> Result<Uuid> {
|
||||||
@@ -268,6 +274,18 @@ impl ScanControllerHandle {
|
|||||||
action
|
action
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — confirm a POI (or target-follow start). Looks up the
|
||||||
|
/// POI by id, removes it from the queue, and returns the typed
|
||||||
|
/// `(target_mgrs, target_class)` hint for downstream consumers.
|
||||||
|
///
|
||||||
|
/// The FSM-side follow-through (zoom-in trigger, target-follow
|
||||||
|
/// transition) is AZ-684's evidence-ladder scope and is NOT
|
||||||
|
/// performed here — this method only resolves the queue entry.
|
||||||
|
pub async fn confirm_poi(&self, poi_id: Uuid) -> Option<ConfirmAction> {
|
||||||
|
let mut inner = self.inner.lock().await;
|
||||||
|
inner.poi_queue.confirm(poi_id)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn poi_queue_len(&self) -> usize {
|
pub async fn poi_queue_len(&self) -> usize {
|
||||||
self.inner.lock().await.poi_queue.len()
|
self.inner.lock().await.poi_queue.len()
|
||||||
}
|
}
|
||||||
@@ -279,20 +297,24 @@ impl ScanControllerHandle {
|
|||||||
|
|
||||||
/// Translate an operator command into a trigger and apply it.
|
/// Translate an operator command into a trigger and apply it.
|
||||||
///
|
///
|
||||||
/// AZ-682 / AZ-683 mapping (subset complete):
|
/// Mapping (AZ-682 / AZ-683 / AZ-680):
|
||||||
///
|
///
|
||||||
/// - `MissionAbort` → `Trigger::OperatorAbort` (AZ-682).
|
/// - `MissionAbort` → `Trigger::OperatorAbort` (AZ-682).
|
||||||
/// - `ReleaseTargetFollow` → `Trigger::OperatorReleaseFollow`
|
/// - `ReleaseTargetFollow` → `Trigger::OperatorReleaseFollow`
|
||||||
/// (AZ-682).
|
/// (AZ-682).
|
||||||
/// - `DeclinePoi { poi_id }` → queue decline; returns the
|
/// - `DeclinePoi { poi_id }` → queue decline; returns
|
||||||
/// resulting `DeclineAction` in [`SubmitOutcome::Declined`]
|
/// [`SubmitOutcome::Declined`] for the caller (AZ-685
|
||||||
/// for the caller (AZ-685 mapobjects dispatch) to persist
|
/// mapobjects dispatch) to persist (AZ-683).
|
||||||
/// (AZ-683).
|
/// - `ConfirmPoi { poi_id }` / `StartTargetFollow { poi_id }` →
|
||||||
/// - `ConfirmPoi` / `StartTargetFollow` → still
|
/// queue lookup + removal; returns
|
||||||
/// `NotImplemented(AZ-684)` since ROI / target_id resolution
|
/// [`SubmitOutcome::Confirmed`] carrying the typed
|
||||||
/// needs the evidence ladder.
|
/// `(target_mgrs, target_class)` hint (AZ-680). The FSM-side
|
||||||
/// - `AcknowledgeBitDegraded` / `SafetyOverride` →
|
/// follow-through (zoom-in trigger, target-follow transition)
|
||||||
/// `NotImplemented(AZ-684)`.
|
/// is AZ-684's scope.
|
||||||
|
/// - `AcknowledgeBitDegraded` / `SafetyOverride` are NOT
|
||||||
|
/// handled here — those go to `mission_executor` via the
|
||||||
|
/// `MissionSafetyRouter` path wired by `operator_bridge`
|
||||||
|
/// (AZ-681). Receiving one in this method is a routing bug.
|
||||||
pub async fn submit_operator_cmd(&self, command: OperatorCommand) -> Result<SubmitOutcome> {
|
pub async fn submit_operator_cmd(&self, command: OperatorCommand) -> Result<SubmitOutcome> {
|
||||||
match command.kind {
|
match command.kind {
|
||||||
OperatorCommandKind::MissionAbort => {
|
OperatorCommandKind::MissionAbort => {
|
||||||
@@ -313,16 +335,21 @@ impl ScanControllerHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
OperatorCommandKind::ConfirmPoi | OperatorCommandKind::StartTargetFollow => {
|
OperatorCommandKind::ConfirmPoi | OperatorCommandKind::StartTargetFollow => {
|
||||||
Err(AutopilotError::NotImplemented(
|
let poi_id = poi_id_from_payload(&command.payload)?;
|
||||||
"scan_controller::submit_operator_cmd (AZ-684 evidence ladder)",
|
match self.confirm_poi(poi_id).await {
|
||||||
))
|
Some(action) => Ok(SubmitOutcome::Confirmed(action)),
|
||||||
|
None => Err(AutopilotError::Validation(format!(
|
||||||
|
"{:?}: unknown poi_id {poi_id}",
|
||||||
|
command.kind
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
OperatorCommandKind::AcknowledgeBitDegraded | OperatorCommandKind::SafetyOverride => {
|
||||||
|
Err(AutopilotError::Validation(format!(
|
||||||
|
"scan_controller does not handle {:?}; route via MissionSafetyRouter",
|
||||||
|
command.kind
|
||||||
|
)))
|
||||||
}
|
}
|
||||||
OperatorCommandKind::AcknowledgeBitDegraded => Err(AutopilotError::NotImplemented(
|
|
||||||
"scan_controller::submit_operator_cmd (AZ-684 evidence ladder)",
|
|
||||||
)),
|
|
||||||
OperatorCommandKind::SafetyOverride => Err(AutopilotError::NotImplemented(
|
|
||||||
"scan_controller::submit_operator_cmd (AZ-684 evidence ladder)",
|
|
||||||
)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -400,6 +427,22 @@ impl ScanControllerHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — adapter for the `shared::contracts::ScanCommandRouter`
|
||||||
|
/// trait so `operator_bridge` (Layer 3) can dispatch operator
|
||||||
|
/// commands into `scan_controller` (Layer 4) without importing this
|
||||||
|
/// crate directly. Forwards to the inherent
|
||||||
|
/// [`ScanControllerHandle::submit_operator_cmd`] and discards the
|
||||||
|
/// `SubmitOutcome` (the trait surface is intentionally minimal —
|
||||||
|
/// `operator_bridge` does not need the typed hint; AZ-685 wires the
|
||||||
|
/// `Confirmed`/`Declined` actions into `mapobjects_store` through a
|
||||||
|
/// different path).
|
||||||
|
#[async_trait]
|
||||||
|
impl ScanCommandRouter for ScanControllerHandle {
|
||||||
|
async fn route(&self, command: OperatorCommand) -> Result<()> {
|
||||||
|
self.submit_operator_cmd(command).await.map(|_| ())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -153,7 +153,73 @@ async fn decline_poi_via_operator_command_emits_action() {
|
|||||||
assert_eq!(action.mgrs, "decline-me");
|
assert_eq!(action.mgrs, "decline-me");
|
||||||
assert_eq!(action.class_group, "armor");
|
assert_eq!(action.class_group, "armor");
|
||||||
}
|
}
|
||||||
SubmitOutcome::Accepted => panic!("decline must return Declined action"),
|
other => panic!("decline must return Declined action, got {other:?}"),
|
||||||
}
|
}
|
||||||
assert_eq!(h.poi_queue_len().await, 0);
|
assert_eq!(h.poi_queue_len().await, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — ConfirmPoi via operator command returns
|
||||||
|
/// `SubmitOutcome::Confirmed` with the typed target hint and drains
|
||||||
|
/// the POI from the queue.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn confirm_poi_via_operator_command_emits_action() {
|
||||||
|
// Arrange
|
||||||
|
let h = ScanController::new().handle();
|
||||||
|
let p = poi(0.8, "confirm-me");
|
||||||
|
let id = p.id;
|
||||||
|
let expected_class = p.class.clone();
|
||||||
|
let expected_group = p.class_group.clone();
|
||||||
|
h.submit_poi_candidate(p, 0.5).await;
|
||||||
|
|
||||||
|
let cmd = OperatorCommand {
|
||||||
|
command_id: Uuid::new_v4(),
|
||||||
|
session_token: "s".to_string(),
|
||||||
|
sequence_number: 1,
|
||||||
|
issued_at_wallclock: Utc::now(),
|
||||||
|
kind: OperatorCommandKind::ConfirmPoi,
|
||||||
|
payload: json!({ "poi_id": id.to_string() }),
|
||||||
|
signature: vec![],
|
||||||
|
};
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let outcome = h.submit_operator_cmd(cmd).await.expect("confirm accepted");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
match outcome {
|
||||||
|
SubmitOutcome::Confirmed(action) => {
|
||||||
|
assert_eq!(action.poi_id, id);
|
||||||
|
assert_eq!(action.target_mgrs, "confirm-me");
|
||||||
|
assert_eq!(action.target_class, expected_class);
|
||||||
|
assert_eq!(action.class_group, expected_group);
|
||||||
|
}
|
||||||
|
other => panic!("confirm must return Confirmed action, got {other:?}"),
|
||||||
|
}
|
||||||
|
assert_eq!(h.poi_queue_len().await, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — ConfirmPoi for an unknown poi_id must NOT silently
|
||||||
|
/// succeed. Returns a `Validation` error so `operator_bridge` can
|
||||||
|
/// surface a typed NACK to the operator UI.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn confirm_poi_unknown_id_is_validation_error() {
|
||||||
|
// Arrange
|
||||||
|
let h = ScanController::new().handle();
|
||||||
|
let cmd = OperatorCommand {
|
||||||
|
command_id: Uuid::new_v4(),
|
||||||
|
session_token: "s".to_string(),
|
||||||
|
sequence_number: 1,
|
||||||
|
issued_at_wallclock: Utc::now(),
|
||||||
|
kind: OperatorCommandKind::ConfirmPoi,
|
||||||
|
payload: json!({ "poi_id": Uuid::new_v4().to_string() }),
|
||||||
|
signature: vec![],
|
||||||
|
};
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let err = h
|
||||||
|
.submit_operator_cmd(cmd)
|
||||||
|
.await
|
||||||
|
.expect_err("unknown poi must error");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert!(matches!(err, shared::error::AutopilotError::Validation(_)));
|
||||||
|
}
|
||||||
|
|||||||
@@ -83,6 +83,66 @@ pub trait OperatorCommandSink: Send + Sync {
|
|||||||
async fn dispatch(&self, command: OperatorCommand) -> Result<()>;
|
async fn dispatch(&self, command: OperatorCommand) -> Result<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-680 — route a validated `OperatorCommand` into `scan_controller`.
|
||||||
|
///
|
||||||
|
/// Lives in `shared::contracts` so `operator_bridge` (Layer 3) can
|
||||||
|
/// depend on the trait without importing `scan_controller` (Layer 4).
|
||||||
|
/// `scan_controller` implements this for its public `Handle`.
|
||||||
|
///
|
||||||
|
/// The trait name uses `route` instead of `submit_operator_cmd` to
|
||||||
|
/// avoid a name collision with the inherent method on
|
||||||
|
/// `ScanControllerHandle`. Implementations forward to the inherent
|
||||||
|
/// method.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait ScanCommandRouter: Send + Sync {
|
||||||
|
async fn route(&self, command: OperatorCommand) -> Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 — forward safety-critical operator commands (BIT acks,
|
||||||
|
/// safety overrides) into `mission_executor`.
|
||||||
|
///
|
||||||
|
/// `operator_bridge` (Layer 3) cannot import `mission_executor`
|
||||||
|
/// (Layer 3 sibling). The composition root constructs a concrete
|
||||||
|
/// impl that wraps the executor's BIT ack channel + battery monitor
|
||||||
|
/// handle.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait MissionSafetyRouter: Send + Sync {
|
||||||
|
/// Forward a signed BIT-degraded acknowledgement. The
|
||||||
|
/// `report_id` identifies the originating BIT report that
|
||||||
|
/// produced the `Degraded` verdict. `operator_id` is carried for
|
||||||
|
/// the executor's structured-log trail.
|
||||||
|
async fn acknowledge_bit_degraded(
|
||||||
|
&self,
|
||||||
|
report_id: uuid::Uuid,
|
||||||
|
operator_id: Option<String>,
|
||||||
|
) -> Result<()>;
|
||||||
|
|
||||||
|
/// Apply a signed safety override. The override is bounded by
|
||||||
|
/// `duration_secs`; the receiving subsystem (e.g. battery
|
||||||
|
/// monitor) is responsible for enforcing the deadline.
|
||||||
|
async fn apply_safety_override(
|
||||||
|
&self,
|
||||||
|
scope: crate::models::operator::SafetyOverrideScope,
|
||||||
|
duration_secs: u32,
|
||||||
|
operator_id: String,
|
||||||
|
rationale: String,
|
||||||
|
) -> Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AZ-681 — look up the severity of a previously-generated BIT report
|
||||||
|
/// by id. `operator_bridge` consults this before forwarding a BIT-
|
||||||
|
/// degraded ack: a `Fail` severity is never acknowledgeable (per
|
||||||
|
/// AC-2).
|
||||||
|
///
|
||||||
|
/// Returns `Some(true)` when the report exists and is acknowledgeable
|
||||||
|
/// (severity is NOT `Fail`); `Some(false)` when known and `Fail`;
|
||||||
|
/// `None` when the report id has never been generated (or has aged
|
||||||
|
/// out of the lookup cache).
|
||||||
|
#[async_trait]
|
||||||
|
pub trait BitReportSeverityLookup: Send + Sync {
|
||||||
|
async fn is_acknowledgeable(&self, report_id: uuid::Uuid) -> Option<bool>;
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -20,6 +20,31 @@ pub enum OperatorCommandKind {
|
|||||||
MissionAbort,
|
MissionAbort,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// AZ-681 — scope of a `SafetyOverride` command. Each variant maps to
|
||||||
|
/// a specific failsafe family in `mission_executor` that the operator
|
||||||
|
/// is suppressing for a bounded duration (architecture.md §F10).
|
||||||
|
///
|
||||||
|
/// Marked `#[non_exhaustive]` so adding `LinkLost` / `Geofence` later
|
||||||
|
/// is a non-breaking change to downstream matchers.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub enum SafetyOverrideScope {
|
||||||
|
/// Suppress battery-RTL until the override deadline elapses. The
|
||||||
|
/// `hard_floor` land-now is NEVER suppressible regardless of
|
||||||
|
/// override (per `architecture.md §F10`).
|
||||||
|
BatteryRtl,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SafetyOverrideScope {
|
||||||
|
/// Stable kebab-case label for audit logs and metrics.
|
||||||
|
pub fn label(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::BatteryRtl => "battery_rtl",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct OperatorCommand {
|
pub struct OperatorCommand {
|
||||||
pub command_id: Uuid,
|
pub command_id: Uuid,
|
||||||
|
|||||||
Reference in New Issue
Block a user