diff --git a/Cargo.lock b/Cargo.lock index 4bdce49..3a4fcaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2244,6 +2244,7 @@ checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" name = "scan_controller" version = "0.1.0" dependencies = [ + "async-trait", "chrono", "gimbal_controller", "mapobjects_store", diff --git a/_docs/02_tasks/todo/AZ-680_operator_bridge_command_dispatch.md b/_docs/02_tasks/done/AZ-680_operator_bridge_command_dispatch.md similarity index 100% rename from _docs/02_tasks/todo/AZ-680_operator_bridge_command_dispatch.md rename to _docs/02_tasks/done/AZ-680_operator_bridge_command_dispatch.md diff --git a/_docs/02_tasks/todo/AZ-681_operator_bridge_safety_and_bit_ack.md b/_docs/02_tasks/done/AZ-681_operator_bridge_safety_and_bit_ack.md similarity index 100% rename from _docs/02_tasks/todo/AZ-681_operator_bridge_safety_and_bit_ack.md rename to _docs/02_tasks/done/AZ-681_operator_bridge_safety_and_bit_ack.md diff --git a/_docs/03_implementation/batch_17_cycle1_report.md b/_docs/03_implementation/batch_17_cycle1_report.md new file mode 100644 index 0000000..30a313e --- /dev/null +++ b/_docs/03_implementation/batch_17_cycle1_report.md @@ -0,0 +1,89 @@ +# Batch Report + +**Batch**: 17 +**Cycle**: 1 +**Tasks**: AZ-680, AZ-681 +**Date**: 2026-05-20 + +## Task Results + +| Task | Status | Files Modified | Tests | AC Coverage | Issues | +|------|--------|---------------|-------|-------------|--------| +| AZ-680_operator_bridge_command_dispatch | Done | 14 files | scan_controller: 8 (2 new); operator_bridge: 20 lib + 9 integration; mission_executor: 35 lib | 5/5 ACs covered | None | +| AZ-681_operator_bridge_safety_and_bit_ack | Done | shared with AZ-680 | (counted above; 4 new integration tests cover AZ-681 ACs) | 4/4 ACs covered | None | + +## AC Coverage map — AZ-680 + +| AC | Test | File | Notes | +|----|------|------|-------| +| AC-1 Confirm forwards target hint | `az680_ac1_confirm_forwards_to_scan_router` | 
`crates/operator_bridge/tests/dispatcher.rs` | Records POI in registry, dispatches `ConfirmPoi`, asserts `scan_router.route` invoked exactly once with the original command | +| AC-2 Re-transmit returns cached ack | `az680_ac2_retransmit_returns_cached_ack` | same file | Same `command_id` dispatched twice; second call returns `Ok` without re-invoking router (60 s `IdempotencyCache`) | +| AC-3 Unknown POI id rejected | `az680_ac3_unknown_poi_id_rejected` | same file | Asserts `CommandAck::Error { reason: "unknown_poi_id" }` and router never invoked | +| AC-4 Expired POI rejected | `az680_ac4_expired_poi_rejected` | same file | Pre-seeds a surfaced POI with past `deadline`; asserts `expired` ack and router not invoked | +| AC-5 Decline appends IgnoredItem via scan_controller | `az680_ac5_decline_forwards_to_scan_router` | same file | DeclinePoi dispatches into `scan_router.route` exactly once; ack `Ok` | + +Plus scan_controller native coverage of the `ConfirmPoi` path (queue-side resolution): `confirm_poi_via_operator_command_emits_action` + `confirm_poi_unknown_id_is_validation_error` in `crates/scan_controller/tests/poi_queue.rs`. 
+ +## AC Coverage map — AZ-681 + +| AC | Test | File | Notes | +|----|------|------|-------| +| AC-1 BIT-DEGRADED ack succeeds | `az681_ac1_bit_degraded_ack_forwards` | `crates/operator_bridge/tests/dispatcher.rs` | Severity lookup returns `Some(true)`; safety_router.acknowledge_bit_degraded invoked exactly once with the report_id + operator_id | +| AC-2 BIT-FAIL ack rejected | `az681_ac2_bit_fail_ack_rejected` | same file | Severity lookup returns `Some(false)`; ack returns `cannot_acknowledge_fail`; safety_router not invoked | +| AC-3 Safety-override forwards with scope + duration | `az681_ac3_safety_override_forwards_with_audit_entry` | same file | SafetyOverride { BatteryRtl, 60s } dispatched; safety_router.apply_safety_override called once with the exact scope/duration; audit log contains exactly one matching `SafetyOverride` entry with `outcome: Ok` | +| AC-4 Audit log redacts secrets | `az681_ac4_audit_log_contains_no_signature_or_session_token` | same file | Every audit entry serialised to JSON; asserts no `signature` and no `session_token` substring. 
Lock-in: `AuditEntry` enum has no fields that could leak either secret | + +## AC Test Coverage: All covered (9/9 across both tasks) +## Code Review Verdict: PASS (self-review — see findings below) +## Auto-Fix Attempts: 0 +## Stuck Agents: None + +## Files modified + +``` +M crates/shared/src/models/operator.rs (+SafetyOverrideScope) +M crates/shared/src/contracts/mod.rs (+ScanCommandRouter +MissionSafetyRouter +BitReportSeverityLookup) +M crates/scan_controller/Cargo.toml (+async-trait) +M crates/scan_controller/src/lib.rs (confirm_poi + ScanCommandRouter impl + SubmitOutcome::Confirmed) +M crates/scan_controller/src/internal/poi_queue/mod.rs (+ConfirmAction + PoiQueue::confirm) +M crates/scan_controller/tests/poi_queue.rs (+2 tests: confirm path; replaced exhaustive match with catch-all to handle new variant) +M crates/mission_executor/src/lib.rs (+pub use SafetyDispatchHandle) +M crates/mission_executor/src/internal/mod.rs (+safety_dispatch module) +A crates/mission_executor/src/internal/safety_dispatch.rs (NEW: MissionSafetyRouter impl) +M crates/mission_executor/src/internal/bit.rs (+bounded report_overalls FIFO; +report_overall + BitReportSeverityLookup impl on BitControllerHandle) +M crates/operator_bridge/src/lib.rs (registry+dispatcher wiring; with_scan_router/safety_router/bit_severity_lookup/audit_sink/dispatcher; dispatch_command; OperatorCommandSink impl now real; registry forget/record on dequeue/surface) +M crates/operator_bridge/src/internal/mod.rs (+audit +dispatcher +idempotency +poi_registry) +A crates/operator_bridge/src/ack.rs (NEW: CommandAck + ack_reasons) +A crates/operator_bridge/src/internal/audit.rs (NEW: AuditEntry / AuditSink / TracingAuditSink) +A crates/operator_bridge/src/internal/dispatcher.rs (NEW: OperatorCommandDispatcher + Builder) +A crates/operator_bridge/src/internal/idempotency.rs (NEW: IdempotencyCache 60s TTL) +A crates/operator_bridge/src/internal/poi_registry.rs (NEW: SurfacedPoi + SurfacedPoiRegistry) +A 
crates/operator_bridge/tests/dispatcher.rs (NEW: 9 integration tests) +M _docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md (note: ac1 also flakes) +R _docs/02_tasks/todo/AZ-680_operator_bridge_command_dispatch.md → done/... +R _docs/02_tasks/todo/AZ-681_operator_bridge_safety_and_bit_ack.md → done/... +``` + +## Architecture notes + +- The cross-component dispatch shape is now: `operator_bridge` (Layer 3) → `ScanCommandRouter` / `MissionSafetyRouter` / `BitReportSeverityLookup` traits in `shared::contracts` (Layer 1) → concrete impls on `ScanControllerHandle` and on the new `SafetyDispatchHandle` (constructed at the composition root from `BitController::ack_tx` + `BatteryMonitorHandle`). +- `BitControllerHandle` now retains a bounded FIFO of the last 16 `(report_id, overall)` pairs so `is_acknowledgeable` can answer for any report id observed in the current pre-flight gate cycle. Beyond that horizon, the dispatcher rejects with `unknown_bit_report` rather than guessing. +- `SafetyOverrideScope` is `#[non_exhaustive]` so future variants (`LinkLost`, `Geofence`) extend without breaking downstream matchers. `SafetyDispatchHandle::apply_safety_override` returns a typed Validation error on any unwired scope, so adding a variant to the enum without wiring the executor side fails closed. +- The audit log is a structured `tracing::info!` per entry by default (`TracingAuditSink`). The `AuditSink` trait keeps the door open for a file-based persistent sink later; integration tests substitute a recording sink. +- Idempotency cache TTL: 60 s per the task spec. Lazy eviction on each lookup/insert keeps the cache small without a background sweeper. 
+ +## Quality gates + +- `cargo fmt --all`: clean +- `cargo clippy -p shared -p scan_controller -p mission_executor -p operator_bridge --all-targets -- -D warnings`: clean +- `cargo clippy --workspace --all-targets -- -D warnings`: pre-existing `Runtime::vlm_provider_name` dead-code lint (out-of-scope; tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md`) +- `cargo test -p shared -p scan_controller -p operator_bridge -p mission_executor`: all green +- `cargo test --workspace`: one pre-existing flake — `mission_executor::ac1_multirotor_happy_path_reaches_done` (same `await_state` polling race as the documented `ac3` flake; passes on retry; leftover updated) + +## Suggested next batch + +From `_docs/02_tasks/_dependencies_table.md`, ready tasks after this batch: + +- `AZ-659_frame_ingest_publisher` (3pt, no new deps) — was eligible for this batch but excluded for cohesion +- `AZ-682_scan_controller_state_machine_skeleton` follow-ups (AZ-684 evidence ladder) once `scan_controller` confirm path lands the FSM-side follow-through +- `AZ-685_mapobjects_store_ignored_items` (consumes the `DeclineAction` payload AZ-680 now produces end-to-end) diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 85928df..b4b9e04 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -7,24 +7,23 @@ name: Implement status: between-batches sub_step: phase: 0 - name: batch-17-select + name: batch-18-select detail: "" retry_count: 0 cycle: 1 tracker: jira ## Last Completed Batch -batch: 16 -commit: 251ebed -ticket: AZ-658 -jira_status: In Testing (confirmed via read-back) -pushed_to: origin/dev -report: _docs/03_implementation/batch_16_cycle1_report.md +batch: 17 +commit: ec494b3 +ticket: AZ-680, AZ-681 +jira_status: In Testing (confirmed via read-back for both) +report: _docs/03_implementation/batch_17_cycle1_report.md cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md ## Process Leftovers -- 
`_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — out-of-scope for batch 16 -- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — out-of-scope for batch 16 +- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — still pending; out-of-scope for batch 17 +- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — still pending; flake also hit `ac1` during batch 17 workspace run ## Cumulative Review Cadence Last cumulative: batches 13–15. Next due: end of batch 18. diff --git a/_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md b/_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md index 02377ab..8820bfb 100644 --- a/_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md +++ b/_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md @@ -1,7 +1,11 @@ -# Leftover: `mission_executor::ac3_bounded_retry_then_success` polling race +# Leftover: `mission_executor` state-machine polling race -**Timestamp**: 2026-05-20T08:30:00+02:00 -**Origin**: Batch 8 (mission_executor state machine). Surfaced in batches 11, 12, 13 as intermittent. Reproduces more reliably on dev box under batch 14 workspace test load (the new tonic stack increases build/runtime pressure). +**Timestamp**: 2026-05-20T17:08:00+03:00 (originally 2026-05-20T08:30:00+02:00) +**Origin**: Batch 8 (mission_executor state machine). Surfaced in batches 11, 12, 13, 17 as intermittent. Reproduces more reliably on dev box under workspace test load. + +**Affected tests**: +- `ac3_bounded_retry_then_success` (original) +- `ac1_multirotor_happy_path_reaches_done` (batch 17 — same `await_state` polling race in the same file) **Severity**: Medium (test design, not production code) **Not blocking**: pre-existing failure in unrelated area; production `mission_executor` behaviour is correct — the test simply has a polling race. 
diff --git a/crates/mission_executor/src/internal/bit.rs b/crates/mission_executor/src/internal/bit.rs index 7dd6758..5eacb24 100644 --- a/crates/mission_executor/src/internal/bit.rs +++ b/crates/mission_executor/src/internal/bit.rs @@ -33,16 +33,27 @@ //! subsequent `Degraded` / `Fail` flips it back to `false` and the //! FSM's `bit_ok` guard fails closed. +use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use shared::contracts::BitReportSeverityLookup; use tokio::sync::{broadcast, mpsc, watch, Mutex}; use tokio::task::JoinHandle; use tokio::time::Instant; use uuid::Uuid; +/// AZ-681 — bounded FIFO cap for the per-report `BitOverall` cache +/// queried by [`BitControllerHandle::is_acknowledgeable`]. BIT is a +/// pre-flight gate that goes sticky-Pass after success, so the +/// number of distinct report ids generated in one flight is small +/// (one per evaluation cycle until Pass / Failed). 16 is generous +/// without unbounded growth. +const REPORT_OVERALL_CAP: usize = 16; + // ============================================================================ // Public surface — types // ============================================================================ @@ -236,6 +247,7 @@ impl BitController { state: BitState::Idle, last_report: None, sticky_pass: false, + report_overalls: VecDeque::with_capacity(REPORT_OVERALL_CAP), })); let handle = BitControllerHandle { @@ -335,6 +347,11 @@ impl BitController { config.ack_timeout, ); let report_clone = report.clone(); + record_report_overall( + &mut guard.report_overalls, + report.id, + report.overall, + ); guard.last_report = Some(report); if new_state != from { guard.state = new_state.clone(); @@ -442,6 +459,28 @@ struct ControllerInner { /// downstream surfaces (lost-link ladder, geofence, battery — /// AZ-651 / AZ-652). 
sticky_pass: bool, + /// AZ-681 — recent `(report_id, overall)` pairs for the + /// `BitReportSeverityLookup` impl. Bounded FIFO; oldest evicted + /// at [`REPORT_OVERALL_CAP`]. A `None` lookup result means the + /// id has either never been generated or has aged out. + report_overalls: VecDeque<(Uuid, BitOverall)>, +} + +/// Push a `(report_id, overall)` pair onto the bounded FIFO cache. +/// Re-recording an existing id is a no-op (preserves the original +/// position so callers can't accidentally refresh aging). +fn record_report_overall( + cache: &mut VecDeque<(Uuid, BitOverall)>, + report_id: Uuid, + overall: BitOverall, +) { + if cache.iter().any(|(id, _)| *id == report_id) { + return; + } + if cache.len() == REPORT_OVERALL_CAP { + cache.pop_front(); + } + cache.push_back((report_id, overall)); } /// Read-side handle for the BIT controller. Cloneable. @@ -475,6 +514,32 @@ impl BitControllerHandle { pub async fn last_report(&self) -> Option { self.inner.lock().await.last_report.clone() } + + /// AZ-681 — overall verdict for a previously-generated report. + /// Returns `None` if the id has never been generated or has aged + /// out of the bounded cache. + pub async fn report_overall(&self, report_id: Uuid) -> Option { + self.inner + .lock() + .await + .report_overalls + .iter() + .find_map(|(id, o)| (*id == report_id).then_some(*o)) + } +} + +/// AZ-681 — `operator_bridge` (Layer 3) consults this before +/// forwarding a BIT-degraded ack. `Fail` reports are never +/// acknowledgeable (per AZ-681 AC-2). An aged-out / never-seen id +/// returns `None` so the bridge can NACK with a typed +/// "unknown report id" reason. 
+#[async_trait] +impl BitReportSeverityLookup for BitControllerHandle { + async fn is_acknowledgeable(&self, report_id: Uuid) -> Option { + self.report_overall(report_id) + .await + .map(|o| !matches!(o, BitOverall::Fail)) + } } #[cfg(test)] diff --git a/crates/mission_executor/src/internal/mod.rs b/crates/mission_executor/src/internal/mod.rs index 9b8b39f..d3e77bf 100644 --- a/crates/mission_executor/src/internal/mod.rs +++ b/crates/mission_executor/src/internal/mod.rs @@ -11,5 +11,6 @@ pub mod lost_link; pub mod middle_waypoint; pub mod multirotor; pub mod post_flight; +pub mod safety_dispatch; pub mod telemetry; pub mod types; diff --git a/crates/mission_executor/src/internal/safety_dispatch.rs b/crates/mission_executor/src/internal/safety_dispatch.rs new file mode 100644 index 0000000..323a408 --- /dev/null +++ b/crates/mission_executor/src/internal/safety_dispatch.rs @@ -0,0 +1,97 @@ +//! AZ-681 — concrete [`MissionSafetyRouter`] implementation owned by +//! `mission_executor` so `operator_bridge` (Layer 3) can stay free of +//! direct `mission_executor` imports. +//! +//! The composition root constructs a [`SafetyDispatchHandle`] from the +//! BIT controller's `ack` mpsc sender and the battery monitor's handle, +//! then hands an `Arc` to the operator-bridge +//! builder. +//! +//! Mapping (per `architecture.md §F10`): +//! +//! - `acknowledge_bit_degraded` → push a [`BitDegradedAck`] onto the +//! BIT controller's ack channel. The controller validates the +//! `report_id` matches `AwaitingAck`; `operator_bridge` has already +//! validated the signature + checked `BitReportSeverityLookup` to +//! ensure the report is acknowledgeable (NOT `Fail`). +//! - `apply_safety_override` → translate `SafetyOverrideScope` into the +//! subsystem-specific override. Only `BatteryRtl` is supported in +//! AZ-681 (other failsafe families add their own paths later); the +//! hard-floor land-now is NEVER suppressible regardless of scope. 
+ +use std::time::Duration; + +use async_trait::async_trait; +use tokio::sync::mpsc; +use tokio::time::Instant; + +use shared::contracts::MissionSafetyRouter; +use shared::error::{AutopilotError, Result}; +use shared::models::operator::SafetyOverrideScope; +use uuid::Uuid; + +use crate::internal::battery_thresholds::{BatteryMonitorHandle, BatteryOverride}; +use crate::internal::bit::BitDegradedAck; + +/// Concrete dispatcher for safety-critical operator commands. Owns +/// only the handles it needs; do not stuff additional concerns here. +#[derive(Clone)] +pub struct SafetyDispatchHandle { + bit_ack_tx: mpsc::Sender, + battery: BatteryMonitorHandle, +} + +impl SafetyDispatchHandle { + pub fn new(bit_ack_tx: mpsc::Sender, battery: BatteryMonitorHandle) -> Self { + Self { + bit_ack_tx, + battery, + } + } +} + +#[async_trait] +impl MissionSafetyRouter for SafetyDispatchHandle { + async fn acknowledge_bit_degraded( + &self, + report_id: Uuid, + operator_id: Option, + ) -> Result<()> { + self.bit_ack_tx + .send(BitDegradedAck { + report_id, + operator_id, + }) + .await + .map_err(|e| AutopilotError::Internal(format!("bit ack channel closed: {e}"))) + } + + async fn apply_safety_override( + &self, + scope: SafetyOverrideScope, + duration_secs: u32, + operator_id: String, + rationale: String, + ) -> Result<()> { + match scope { + SafetyOverrideScope::BatteryRtl => { + let until = Instant::now() + Duration::from_secs(u64::from(duration_secs)); + self.battery + .apply_override(BatteryOverride { + until, + operator_id, + rationale, + }) + .await + } + // `SafetyOverrideScope` is `#[non_exhaustive]`; future + // variants (e.g. `LinkLost`, `Geofence`) MUST be wired + // explicitly here before they become usable. Until then, + // surface a typed Validation error so `operator_bridge` + // can NACK to the operator UI. 
+ other => Err(AutopilotError::Validation(format!( + "safety override scope {other:?} not wired in mission_executor" + ))), + } + } +} diff --git a/crates/mission_executor/src/lib.rs b/crates/mission_executor/src/lib.rs index 3d5accf..f1c63c9 100644 --- a/crates/mission_executor/src/lib.rs +++ b/crates/mission_executor/src/lib.rs @@ -58,6 +58,7 @@ pub use internal::lost_link::{ }; pub use internal::middle_waypoint::{MiddleWaypointHint, MissionRePlanner}; pub use internal::post_flight::{MapObjectsDiffSource, MapObjectsPusher, PostFlightPusher}; +pub use internal::safety_dispatch::SafetyDispatchHandle; pub use internal::telemetry::{ Consumer, DropCountingReceiver, MavlinkProjection, TelemetryForwarder, }; diff --git a/crates/operator_bridge/src/ack.rs b/crates/operator_bridge/src/ack.rs new file mode 100644 index 0000000..6c377dd --- /dev/null +++ b/crates/operator_bridge/src/ack.rs @@ -0,0 +1,54 @@ +//! AZ-680 / AZ-681 — the typed acknowledgement returned by every +//! dispatched operator command. +//! +//! The dispatcher does NOT propagate downstream errors verbatim into +//! the operator UI — the surface here is a small fixed enum so the +//! UI can colour-code the result and so the idempotency cache key +//! space stays bounded. + +use serde::{Deserialize, Serialize}; + +/// Stable snake_case reason strings emitted in +/// [`CommandAck::Error::reason`]. Exposed as constants so the unit + +/// integration tests can reference them without retyping the strings +/// (drift between caller assertions and the actual emit site has bitten +/// us before). 
+pub mod ack_reasons { + pub const UNKNOWN_POI_ID: &str = "unknown_poi_id"; + pub const EXPIRED: &str = "expired"; + pub const CANNOT_ACKNOWLEDGE_FAIL: &str = "cannot_acknowledge_fail"; + pub const UNKNOWN_BIT_REPORT: &str = "unknown_bit_report"; + pub const INVALID_PAYLOAD: &str = "invalid_payload"; + pub const ROUTER_NOT_WIRED: &str = "router_not_wired"; + pub const ROUTER_ERROR: &str = "router_error"; + pub const UNSUPPORTED_KIND: &str = "unsupported_kind"; +} + +/// Result of a dispatched operator command. Carries either `Ok` or a +/// typed `Error { reason }` whose `reason` string is one of the +/// snake_case constants in [`ack_reasons`]. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum CommandAck { + Ok, + Error { reason: String }, +} + +impl CommandAck { + pub fn error(reason: &str) -> Self { + Self::Error { + reason: reason.to_string(), + } + } + + pub fn is_ok(&self) -> bool { + matches!(self, Self::Ok) + } + + pub fn reason(&self) -> Option<&str> { + match self { + Self::Ok => None, + Self::Error { reason } => Some(reason.as_str()), + } + } +} diff --git a/crates/operator_bridge/src/internal/audit.rs b/crates/operator_bridge/src/internal/audit.rs new file mode 100644 index 0000000..7b1f008 --- /dev/null +++ b/crates/operator_bridge/src/internal/audit.rs @@ -0,0 +1,151 @@ +//! AZ-681 — structured audit log for safety-critical operator commands. +//! +//! Per the task spec (AC-4): every dispatched `BitDegradedAck` and +//! `SafetyOverride` writes an audit entry containing: +//! +//! - command id +//! - timestamp (UTC, ms precision) +//! - operator id (when known) +//! - scope / duration (for `SafetyOverride`) or `report_id` (for +//! `BitDegradedAck`) +//! - outcome (`Ok` / `Error { reason }`) +//! +//! Entries MUST NEVER contain the raw signature bytes or the session +//! token (AC-4). Callers pass already-redacted fields; the writer +//! 
has no access to the signature in the first place. +//! +//! ## Why both a sink trait + a tracing default +//! +//! - The default ([`TracingAuditSink`]) emits one structured +//! `tracing::info!` per entry — meets the spec's "file or +//! structured logger" requirement and integrates with whatever +//! tracing subscriber the composition root wires. +//! - The trait ([`AuditSink`]) lets tests substitute a recording +//! sink without piggy-backing on tracing's global subscriber +//! state (which other tests can race against). The integration +//! tests in `tests/dispatcher.rs` use the recording sink. + +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde::Serialize; +use uuid::Uuid; + +use crate::ack::CommandAck; +use shared::models::operator::SafetyOverrideScope; + +/// One entry in the audit log. Variants map 1:1 to the AZ-681 +/// command kinds. +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum AuditEntry { + BitDegradedAck { + command_id: Uuid, + timestamp: DateTime, + operator_id: Option, + report_id: Uuid, + outcome: CommandAck, + }, + SafetyOverride { + command_id: Uuid, + timestamp: DateTime, + operator_id: Option, + scope: SafetyOverrideScope, + duration_secs: u32, + outcome: CommandAck, + }, +} + +/// Sink for audit entries. Composition root injects the concrete +/// implementation; the default is [`TracingAuditSink`]. +#[async_trait] +pub trait AuditSink: Send + Sync { + async fn record(&self, entry: AuditEntry); +} + +/// Default sink — emits a single `tracing::info!` per entry. The +/// structured fields are picked up by any `tracing_subscriber` JSON +/// layer the composition root configures. 
+pub struct TracingAuditSink; + +impl TracingAuditSink { + pub fn arc() -> Arc { + Arc::new(Self) + } +} + +#[async_trait] +impl AuditSink for TracingAuditSink { + async fn record(&self, entry: AuditEntry) { + match &entry { + AuditEntry::BitDegradedAck { + command_id, + timestamp, + operator_id, + report_id, + outcome, + } => { + tracing::info!( + audit = "bit_degraded_ack", + command_id = %command_id, + timestamp = %timestamp.to_rfc3339(), + operator_id = operator_id.as_deref().unwrap_or(""), + report_id = %report_id, + outcome = ?outcome, + "operator_bridge audit: bit_degraded_ack" + ); + } + AuditEntry::SafetyOverride { + command_id, + timestamp, + operator_id, + scope, + duration_secs, + outcome, + } => { + tracing::info!( + audit = "safety_override", + command_id = %command_id, + timestamp = %timestamp.to_rfc3339(), + operator_id = operator_id.as_deref().unwrap_or(""), + scope = scope.label(), + duration_secs = duration_secs, + outcome = ?outcome, + "operator_bridge audit: safety_override" + ); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// AC-4 sanity: an entry serialised to JSON contains no + /// signature/session_token field. The entry enum itself has + /// no such field, so this is a static guarantee — but we + /// assert on the JSON shape to lock the wire contract. 
+ #[test] + fn entry_json_has_no_signature_or_session_token() { + // Arrange + let entry = AuditEntry::SafetyOverride { + command_id: Uuid::new_v4(), + timestamp: Utc::now(), + operator_id: Some("op-1".into()), + scope: SafetyOverrideScope::BatteryRtl, + duration_secs: 60, + outcome: CommandAck::Ok, + }; + + // Act + let json = serde_json::to_string(&entry).expect("serialises"); + + // Assert + assert!(!json.contains("signature")); + assert!(!json.contains("session_token")); + assert!(json.contains("battery_rtl")); + assert!(json.contains("\"duration_secs\":60")); + } +} diff --git a/crates/operator_bridge/src/internal/dispatcher.rs b/crates/operator_bridge/src/internal/dispatcher.rs new file mode 100644 index 0000000..20b301d --- /dev/null +++ b/crates/operator_bridge/src/internal/dispatcher.rs @@ -0,0 +1,386 @@ +//! AZ-680 + AZ-681 — operator-command dispatcher. +//! +//! Sits between the validated-command boundary (AZ-678) and the +//! downstream routers. Responsibilities: +//! +//! - Per-`command_id` idempotency (60 s TTL — AZ-680 AC-2). +//! - POI-id validity + deadline checks for POI-bound commands +//! (AZ-680 AC-3 / AC-4). +//! - BIT-report severity gate for `AcknowledgeBitDegraded` +//! (AZ-681 AC-2). +//! - Routing — POI commands → `ScanCommandRouter`, BIT acks + +//! safety overrides → `MissionSafetyRouter`. +//! - Audit logging for every safety-critical command +//! (AZ-681 AC-3 / AC-4). +//! +//! The dispatcher OWNS the registry / cache / audit sink and is +//! constructed once by the composition root. It is cheap to clone +//! (all internals are `Arc`s). 
+ +use std::sync::Arc; + +use chrono::Utc; +use serde::Deserialize; +use uuid::Uuid; + +use shared::contracts::{BitReportSeverityLookup, MissionSafetyRouter, ScanCommandRouter}; +use shared::models::operator::{OperatorCommand, OperatorCommandKind, SafetyOverrideScope}; + +use crate::ack::{ack_reasons, CommandAck}; +use crate::internal::audit::{AuditEntry, AuditSink, TracingAuditSink}; +use crate::internal::idempotency::IdempotencyCache; +use crate::internal::poi_registry::SurfacedPoiRegistry; + +#[derive(Clone)] +pub struct OperatorCommandDispatcher { + pub(crate) registry: SurfacedPoiRegistry, + cache: IdempotencyCache, + audit: Arc, + scan_router: Option>, + safety_router: Option>, + bit_severity: Option>, +} + +impl OperatorCommandDispatcher { + pub fn builder() -> OperatorCommandDispatcherBuilder { + OperatorCommandDispatcherBuilder::default() + } + + /// Public test helper: peek into the idempotency cache. Used by + /// the integration tests to assert AC-2 ("re-transmit returns + /// cached ack"). + #[doc(hidden)] + pub fn cache_len(&self) -> usize { + self.cache.len() + } + + /// AZ-680 / AZ-681 — dispatch one validated command. Returns the + /// typed [`CommandAck`]. Idempotency is handled inside; callers + /// just re-submit the same `command_id` on retransmit. 
+ pub async fn dispatch(&self, cmd: OperatorCommand) -> CommandAck { + let cmd_id = cmd.command_id; + self.cache + .get_or_insert_with(cmd_id, || async move { self.dispatch_inner(cmd).await }) + .await + } + + async fn dispatch_inner(&self, cmd: OperatorCommand) -> CommandAck { + match cmd.kind { + OperatorCommandKind::ConfirmPoi + | OperatorCommandKind::DeclinePoi + | OperatorCommandKind::StartTargetFollow => self.dispatch_poi_bound(cmd).await, + OperatorCommandKind::ReleaseTargetFollow => self.dispatch_via_scan_router(cmd).await, + OperatorCommandKind::AcknowledgeBitDegraded => self.dispatch_bit_ack(cmd).await, + OperatorCommandKind::SafetyOverride => self.dispatch_safety_override(cmd).await, + OperatorCommandKind::MissionAbort => self.dispatch_via_scan_router(cmd).await, + } + } + + /// POI-bound dispatch path: enforces `unknown_poi_id` (AC-3) + + /// `expired` (AC-4) before forwarding to `scan_controller`. + async fn dispatch_poi_bound(&self, cmd: OperatorCommand) -> CommandAck { + let poi_id = match poi_id_from_payload(&cmd.payload) { + Ok(id) => id, + Err(_) => return CommandAck::error(ack_reasons::INVALID_PAYLOAD), + }; + let Some(surfaced) = self.registry.get(poi_id) else { + return CommandAck::error(ack_reasons::UNKNOWN_POI_ID); + }; + if surfaced.deadline <= Utc::now() { + return CommandAck::error(ack_reasons::EXPIRED); + } + self.dispatch_via_scan_router(cmd).await + } + + async fn dispatch_via_scan_router(&self, cmd: OperatorCommand) -> CommandAck { + let Some(router) = self.scan_router.as_ref() else { + return CommandAck::error(ack_reasons::ROUTER_NOT_WIRED); + }; + match router.route(cmd).await { + Ok(()) => CommandAck::Ok, + Err(e) => { + tracing::warn!(error = %e, "scan router rejected operator command"); + CommandAck::error(ack_reasons::ROUTER_ERROR) + } + } + } + + async fn dispatch_bit_ack(&self, cmd: OperatorCommand) -> CommandAck { + let payload = match BitAckPayload::from_value(&cmd.payload) { + Ok(p) => p, + Err(_) => { + let ack = 
CommandAck::error(ack_reasons::INVALID_PAYLOAD); + self.audit_bit(&cmd, Uuid::nil(), &ack).await; + return ack; + } + }; + let ack = self.evaluate_bit_ack(&cmd, &payload).await; + self.audit_bit(&cmd, payload.report_id, &ack).await; + ack + } + + async fn evaluate_bit_ack(&self, cmd: &OperatorCommand, payload: &BitAckPayload) -> CommandAck { + let Some(severity) = self.bit_severity.as_ref() else { + return CommandAck::error(ack_reasons::ROUTER_NOT_WIRED); + }; + match severity.is_acknowledgeable(payload.report_id).await { + Some(true) => match self.safety_router.as_ref() { + Some(router) => match router + .acknowledge_bit_degraded(payload.report_id, payload.operator_id.clone()) + .await + { + Ok(()) => CommandAck::Ok, + Err(e) => { + tracing::warn!(error = %e, "mission safety router rejected bit ack"); + CommandAck::error(ack_reasons::ROUTER_ERROR) + } + }, + None => CommandAck::error(ack_reasons::ROUTER_NOT_WIRED), + }, + Some(false) => CommandAck::error(ack_reasons::CANNOT_ACKNOWLEDGE_FAIL), + None => { + tracing::warn!( + command_id = %cmd.command_id, + report_id = %payload.report_id, + "bit_degraded_ack: unknown report id" + ); + CommandAck::error(ack_reasons::UNKNOWN_BIT_REPORT) + } + } + } + + async fn dispatch_safety_override(&self, cmd: OperatorCommand) -> CommandAck { + let payload = match SafetyOverridePayload::from_value(&cmd.payload) { + Ok(p) => p, + Err(_) => { + let ack = CommandAck::error(ack_reasons::INVALID_PAYLOAD); + self.audit_safety(&cmd, None, 0, &ack).await; + return ack; + } + }; + let ack = self.apply_safety_override(&payload).await; + self.audit_safety(&cmd, Some(payload.scope), payload.duration_secs, &ack) + .await; + ack + } + + async fn apply_safety_override(&self, payload: &SafetyOverridePayload) -> CommandAck { + let Some(router) = self.safety_router.as_ref() else { + return CommandAck::error(ack_reasons::ROUTER_NOT_WIRED); + }; + match router + .apply_safety_override( + payload.scope, + payload.duration_secs, + 
payload.operator_id.clone(), + payload.rationale.clone(), + ) + .await + { + Ok(()) => CommandAck::Ok, + Err(e) => { + tracing::warn!(error = %e, "mission safety router rejected safety override"); + CommandAck::error(ack_reasons::ROUTER_ERROR) + } + } + } + + async fn audit_bit(&self, cmd: &OperatorCommand, report_id: Uuid, outcome: &CommandAck) { + self.audit + .record(AuditEntry::BitDegradedAck { + command_id: cmd.command_id, + timestamp: Utc::now(), + operator_id: cmd + .payload + .get("operator_id") + .and_then(|v| v.as_str()) + .map(String::from), + report_id, + outcome: outcome.clone(), + }) + .await; + } + + async fn audit_safety( + &self, + cmd: &OperatorCommand, + scope: Option, + duration_secs: u32, + outcome: &CommandAck, + ) { + self.audit + .record(AuditEntry::SafetyOverride { + command_id: cmd.command_id, + timestamp: Utc::now(), + operator_id: cmd + .payload + .get("operator_id") + .and_then(|v| v.as_str()) + .map(String::from), + scope: scope.unwrap_or(SafetyOverrideScope::BatteryRtl), + duration_secs, + outcome: outcome.clone(), + }) + .await; + } +} + +// ============================================================================ +// Builder +// ============================================================================ + +#[derive(Default)] +pub struct OperatorCommandDispatcherBuilder { + registry: Option, + cache: Option, + audit: Option>, + scan_router: Option>, + safety_router: Option>, + bit_severity: Option>, +} + +impl OperatorCommandDispatcherBuilder { + pub fn registry(mut self, r: SurfacedPoiRegistry) -> Self { + self.registry = Some(r); + self + } + + pub fn idempotency_cache(mut self, c: IdempotencyCache) -> Self { + self.cache = Some(c); + self + } + + pub fn audit_sink(mut self, s: Arc) -> Self { + self.audit = Some(s); + self + } + + pub fn scan_router(mut self, r: Arc) -> Self { + self.scan_router = Some(r); + self + } + + pub fn safety_router(mut self, r: Arc) -> Self { + self.safety_router = Some(r); + self + } + + pub fn 
bit_severity(mut self, s: Arc) -> Self { + self.bit_severity = Some(s); + self + } + + pub fn build(self) -> OperatorCommandDispatcher { + OperatorCommandDispatcher { + registry: self.registry.unwrap_or_default(), + cache: self + .cache + .unwrap_or_else(IdempotencyCache::with_default_ttl), + audit: self.audit.unwrap_or_else(TracingAuditSink::arc), + scan_router: self.scan_router, + safety_router: self.safety_router, + bit_severity: self.bit_severity, + } + } +} + +// ============================================================================ +// Payload extraction +// ============================================================================ + +/// Extract `poi_id` from a POI-bound command payload. +/// +/// Wire shape: `{ "poi_id": "" }`. Anything else is a hard +/// `invalid_payload` error — the auth layer guarantees the payload +/// bytes weren't tampered with, but the operator UI might still send +/// the wrong shape on a build-skew between client and autopilot. +fn poi_id_from_payload(payload: &serde_json::Value) -> Result { + let v = payload.get("poi_id").and_then(|v| v.as_str()).ok_or(())?; + Uuid::parse_str(v).map_err(|_| ()) +} + +#[derive(Debug, Deserialize)] +struct BitAckPayload { + report_id: Uuid, + #[serde(default)] + operator_id: Option, +} + +impl BitAckPayload { + fn from_value(v: &serde_json::Value) -> Result { + serde_json::from_value(v.clone()) + } +} + +#[derive(Debug, Deserialize)] +struct SafetyOverridePayload { + scope: SafetyOverrideScope, + duration_secs: u32, + operator_id: String, + #[serde(default)] + rationale: String, +} + +impl SafetyOverridePayload { + fn from_value(v: &serde_json::Value) -> Result { + serde_json::from_value(v.clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn poi_id_extracts_uuid() { + // Arrange + let id = Uuid::new_v4(); + let v = json!({ "poi_id": id.to_string() }); + + // Act + Assert + assert_eq!(poi_id_from_payload(&v).unwrap(), id); + } + + #[test] + fn 
poi_id_missing_is_err() { + // Arrange + let v = json!({ "other": "x" }); + + // Act + Assert + assert!(poi_id_from_payload(&v).is_err()); + } + + #[test] + fn bit_ack_payload_round_trip() { + // Arrange + let id = Uuid::new_v4(); + let v = json!({ "report_id": id.to_string(), "operator_id": "op1" }); + + // Act + let p = BitAckPayload::from_value(&v).expect("parse"); + + // Assert + assert_eq!(p.report_id, id); + assert_eq!(p.operator_id, Some("op1".to_string())); + } + + #[test] + fn safety_override_payload_round_trip() { + // Arrange + let v = json!({ + "scope": "battery_rtl", + "duration_secs": 60, + "operator_id": "op1", + "rationale": "post-mission RTL too aggressive" + }); + + // Act + let p = SafetyOverridePayload::from_value(&v).expect("parse"); + + // Assert + assert_eq!(p.scope, SafetyOverrideScope::BatteryRtl); + assert_eq!(p.duration_secs, 60); + assert_eq!(p.operator_id, "op1"); + } +} diff --git a/crates/operator_bridge/src/internal/idempotency.rs b/crates/operator_bridge/src/internal/idempotency.rs new file mode 100644 index 0000000..52115c4 --- /dev/null +++ b/crates/operator_bridge/src/internal/idempotency.rs @@ -0,0 +1,173 @@ +//! AZ-680 — per-`command_id` idempotency cache. +//! +//! The spec (AC-2): "Re-transmit returns cached ack". A 60 s sliding +//! window over `command_id → CommandAck` so the operator UI can +//! safely retransmit on a flaky modem without causing the autopilot +//! to double-dispatch. +//! +//! Design notes: +//! +//! - Lazy eviction. `get_or_insert_with` purges expired entries before +//! inserting. We do not run a background sweeper task — at the +//! command rate of ≤5 confirms/min (operator workflow), the cache +//! stays small and per-call eviction is cheap. +//! - Returns the *cached* ack on hit; on miss, runs the supplied +//! future, caches its result, returns it. The future is NOT spawned +//! — the caller awaits it. +//! - Cache key is the full `Uuid`; the operator UI generates fresh +//! 
`command_id`s per logical command, so collisions imply a true +//! retransmit and we want to honour that. + +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use parking_lot::Mutex; +use uuid::Uuid; + +use crate::ack::CommandAck; + +/// Default TTL per AZ-680 spec. +pub const DEFAULT_IDEMPOTENCY_TTL: Duration = Duration::from_secs(60); + +#[derive(Debug, Clone)] +struct Entry { + ack: CommandAck, + cached_at: Instant, +} + +/// Bounded-by-TTL idempotency cache. Cheap to `clone` (internals are +/// an `Arc>`). +#[derive(Clone)] +pub struct IdempotencyCache { + ttl: Duration, + inner: Arc>>, +} + +impl IdempotencyCache { + pub fn new(ttl: Duration) -> Self { + Self { + ttl, + inner: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn with_default_ttl() -> Self { + Self::new(DEFAULT_IDEMPOTENCY_TTL) + } + + /// Returns the cached ack if `command_id` is present and not + /// expired; otherwise runs `produce`, caches its result, and + /// returns it. Concurrent calls with the same `command_id` MAY + /// each execute `produce` once — that is acceptable here because + /// the downstream routers themselves are idempotent for the same + /// validated payload (the router-level side effect is the same + /// across retries; the registry/queue lookups deduplicate POI + /// state). The cache's primary role is to short-circuit + /// re-transmits that arrive seconds later, not to serialise + /// concurrent dispatchers of the same id. + pub async fn get_or_insert_with(&self, command_id: Uuid, produce: F) -> CommandAck + where + F: FnOnce() -> Fut, + Fut: Future, + { + if let Some(cached) = self.get(command_id) { + return cached; + } + let ack = produce().await; + self.insert(command_id, ack.clone()); + ack + } + + /// Snapshot lookup — also evicts expired entries opportunistically. 
+ pub fn get(&self, command_id: Uuid) -> Option { + let mut guard = self.inner.lock(); + self.evict_expired(&mut guard); + guard.get(&command_id).map(|e| e.ack.clone()) + } + + fn insert(&self, command_id: Uuid, ack: CommandAck) { + let mut guard = self.inner.lock(); + self.evict_expired(&mut guard); + guard.insert( + command_id, + Entry { + ack, + cached_at: Instant::now(), + }, + ); + } + + fn evict_expired(&self, guard: &mut HashMap) { + let now = Instant::now(); + guard.retain(|_, e| now.duration_since(e.cached_at) < self.ttl); + } + + pub fn len(&self) -> usize { + let mut guard = self.inner.lock(); + self.evict_expired(&mut guard); + guard.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicU32, Ordering}; + + #[tokio::test] + async fn miss_then_hit_runs_once() { + // Arrange + let cache = IdempotencyCache::with_default_ttl(); + let id = Uuid::new_v4(); + let count = AtomicU32::new(0); + + // Act + let _ = cache + .get_or_insert_with(id, || async { + count.fetch_add(1, Ordering::SeqCst); + CommandAck::Ok + }) + .await; + let _ = cache + .get_or_insert_with(id, || async { + count.fetch_add(1, Ordering::SeqCst); + CommandAck::Ok + }) + .await; + + // Assert + assert_eq!(count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn ttl_expiry_re_runs_producer() { + // Arrange — short TTL to keep the test fast. 
+ let cache = IdempotencyCache::new(Duration::from_millis(20)); + let id = Uuid::new_v4(); + let count = AtomicU32::new(0); + + // Act + let _ = cache + .get_or_insert_with(id, || async { + count.fetch_add(1, Ordering::SeqCst); + CommandAck::Ok + }) + .await; + tokio::time::sleep(Duration::from_millis(40)).await; + let _ = cache + .get_or_insert_with(id, || async { + count.fetch_add(1, Ordering::SeqCst); + CommandAck::Ok + }) + .await; + + // Assert + assert_eq!(count.load(Ordering::SeqCst), 2); + } +} diff --git a/crates/operator_bridge/src/internal/mod.rs b/crates/operator_bridge/src/internal/mod.rs index b0f820f..5fb68a7 100644 --- a/crates/operator_bridge/src/internal/mod.rs +++ b/crates/operator_bridge/src/internal/mod.rs @@ -1,4 +1,8 @@ //! Internal modules for `operator_bridge`. Not part of the public API. +pub mod audit; pub mod auth; +pub mod dispatcher; +pub mod idempotency; +pub mod poi_registry; pub mod poi_surface; diff --git a/crates/operator_bridge/src/internal/poi_registry.rs b/crates/operator_bridge/src/internal/poi_registry.rs new file mode 100644 index 0000000..69c0199 --- /dev/null +++ b/crates/operator_bridge/src/internal/poi_registry.rs @@ -0,0 +1,128 @@ +//! AZ-680 — currently-surfaced POI registry. +//! +//! Tracks the subset of POIs that have been pushed to the operator UI +//! and have not yet been dequeued. The dispatcher consults this +//! registry to reject: +//! +//! - `Confirm` / `Decline` / `StartTargetFollow` for unknown +//! `poi_id`s (AC-3 → `unknown_poi_id`). +//! - Commands whose POI deadline has elapsed (AC-4 → `expired`). +//! +//! The registry is intentionally a plain `HashMap` behind a +//! [`parking_lot::Mutex`] — the dispatcher's lock window is short +//! (one O(1) lookup + one O(1) remove). A `RwLock` would not buy us +//! anything because the dispatcher writes on every confirm/decline. 
+ +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use parking_lot::Mutex; +use std::collections::HashMap; +use uuid::Uuid; + +use shared::models::poi::Poi; + +/// Snapshot of the POI fields the dispatcher needs to enforce +/// validity + deadline checks without holding a reference to the +/// full [`Poi`] struct. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SurfacedPoi { + pub poi_id: Uuid, + pub mgrs: String, + pub class_group: String, + pub deadline: DateTime, +} + +impl From<&Poi> for SurfacedPoi { + fn from(poi: &Poi) -> Self { + Self { + poi_id: poi.id, + mgrs: poi.mgrs.clone(), + class_group: poi.class_group.clone(), + deadline: poi.deadline, + } + } +} + +/// In-memory registry of surfaced-but-not-dequeued POIs. Cheap to +/// `clone` — internals are an `Arc>`. +#[derive(Default, Clone)] +pub struct SurfacedPoiRegistry { + inner: Arc>>, +} + +impl SurfacedPoiRegistry { + pub fn new() -> Self { + Self::default() + } + + /// Record a surfaced POI. Overwrites any prior entry with the + /// same id (the POI was re-surfaced after a rotation). + pub fn record(&self, poi: SurfacedPoi) { + self.inner.lock().insert(poi.poi_id, poi); + } + + /// Remove a POI from the surfaced set. Called when the POI is + /// dequeued (rotated, aged out, or operator-decided). + pub fn forget(&self, poi_id: Uuid) { + self.inner.lock().remove(&poi_id); + } + + /// Look up a surfaced POI. Returns `None` if the id has never + /// been surfaced or has already been dequeued. 
+ pub fn get(&self, poi_id: Uuid) -> Option { + self.inner.lock().get(&poi_id).cloned() + } + + pub fn len(&self) -> usize { + self.inner.lock().len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration; + + fn surfaced(deadline_secs: i64) -> SurfacedPoi { + SurfacedPoi { + poi_id: Uuid::new_v4(), + mgrs: "33UWP05".into(), + class_group: "vehicle".into(), + deadline: Utc::now() + Duration::seconds(deadline_secs), + } + } + + #[test] + fn record_then_get_returns_clone() { + // Arrange + let r = SurfacedPoiRegistry::new(); + let p = surfaced(120); + r.record(p.clone()); + + // Act + let got = r.get(p.poi_id).expect("must be present"); + + // Assert + assert_eq!(got, p); + } + + #[test] + fn forget_removes_entry() { + // Arrange + let r = SurfacedPoiRegistry::new(); + let p = surfaced(120); + r.record(p.clone()); + + // Act + r.forget(p.poi_id); + + // Assert + assert!(r.get(p.poi_id).is_none()); + assert!(r.is_empty()); + } +} diff --git a/crates/operator_bridge/src/lib.rs b/crates/operator_bridge/src/lib.rs index c5e5da9..75315b2 100644 --- a/crates/operator_bridge/src/lib.rs +++ b/crates/operator_bridge/src/lib.rs @@ -1,4 +1,5 @@ -//! `operator_bridge` — POI surfacing + operator command authentication. +//! `operator_bridge` — POI surfacing + operator command authentication +//! + dispatch. //! //! Real implementation in this batch: //! - **AZ-678** `internal::auth::HmacOperatorValidator` — HMAC-SHA256 @@ -7,11 +8,15 @@ //! counters; sliding-window red-health gate. //! - **AZ-679** `internal::poi_surface::PoiSurfaceMapper` — wire-format //! POI events + `PoiDequeued` events pushed through `TelemetrySink`. -//! -//! Real implementation lands in: -//! - AZ-680 `operator_bridge_command_dispatch` -//! - AZ-681 `operator_bridge_safety_and_bit_ack` +//! - **AZ-680** `internal::dispatcher::OperatorCommandDispatcher` — +//! POI-bound dispatch path, per-`command_id` idempotency cache, +//! 
unknown-POI + expired-deadline gates. +//! - **AZ-681** `internal::dispatcher::OperatorCommandDispatcher` — +//! BIT-degraded ack severity gate + `SafetyOverride` forwarding +//! into `mission_executor` via `MissionSafetyRouter`; structured +//! audit log entry per safety command. +pub mod ack; pub mod internal; use std::sync::Arc; @@ -20,7 +25,10 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use shared::contracts::{OperatorCommandSink, TelemetrySink}; +use shared::contracts::{ + BitReportSeverityLookup, MissionSafetyRouter, OperatorCommandSink, ScanCommandRouter, + TelemetrySink, +}; use shared::error::{AutopilotError, Result}; use shared::health::{ComponentHealth, HealthLevel}; use shared::models::mission::Coordinate; @@ -28,9 +36,16 @@ use shared::models::operator::OperatorCommand; use shared::models::operator_event::{DequeueReason, PhotoMetadata}; use shared::models::poi::Poi; +pub use crate::ack::{ack_reasons, CommandAck}; +pub use crate::internal::audit::{AuditEntry, AuditSink, TracingAuditSink}; pub use crate::internal::auth::{ AuthCounters, HmacOperatorValidator, HmacValidatorConfig, REJECTION_REASONS, }; +pub use crate::internal::dispatcher::{ + OperatorCommandDispatcher, OperatorCommandDispatcherBuilder, +}; +pub use crate::internal::idempotency::{IdempotencyCache, DEFAULT_IDEMPOTENCY_TTL}; +pub use crate::internal::poi_registry::{SurfacedPoi, SurfacedPoiRegistry}; pub use crate::internal::poi_surface::{PoiSurfaceMapper, PoiSurfaceMetrics}; const NAME: &str = "operator_bridge"; @@ -71,6 +86,20 @@ pub struct OperatorBridge { /// `poi_mapper` so legacy callers continue to compile until the /// composition root wires it in. validator: Option>, + /// AZ-680 — currently-surfaced POI registry. Shared between the + /// `surface_poi` / `emit_poi_dequeued` write-side and the + /// dispatcher's POI-id validity check. + poi_registry: SurfacedPoiRegistry, + /// AZ-680 / AZ-681 — command dispatcher. 
Optional until both the + /// scan + safety routers are wired; without it `dispatch` returns + /// `router_not_wired`. + dispatcher: Option>, + /// Builder-only accumulators for the dispatcher's routers + sink. + /// Consumed in [`OperatorBridge::with_dispatcher`]. + scan_router: Option>, + safety_router: Option>, + bit_severity: Option>, + audit_sink: Option>, } impl OperatorBridge { @@ -84,6 +113,12 @@ impl OperatorBridge { target_follow_rx: Some(tf_rx), poi_mapper: None, validator: None, + poi_registry: SurfacedPoiRegistry::new(), + dispatcher: None, + scan_router: None, + safety_router: None, + bit_severity: None, + audit_sink: None, } } @@ -97,12 +132,63 @@ impl OperatorBridge { self } + /// AZ-680 — wire `scan_controller`'s [`ScanCommandRouter`] impl. + pub fn with_scan_router(mut self, router: Arc) -> Self { + self.scan_router = Some(router); + self + } + + /// AZ-681 — wire `mission_executor`'s [`MissionSafetyRouter`] impl. + pub fn with_safety_router(mut self, router: Arc) -> Self { + self.safety_router = Some(router); + self + } + + /// AZ-681 — wire `mission_executor`'s + /// [`BitReportSeverityLookup`] impl. + pub fn with_bit_severity_lookup(mut self, lookup: Arc) -> Self { + self.bit_severity = Some(lookup); + self + } + + /// AZ-681 — override the default tracing audit sink. Used by + /// integration tests; production wires the default. + pub fn with_audit_sink(mut self, sink: Arc) -> Self { + self.audit_sink = Some(sink); + self + } + + /// AZ-680 / AZ-681 — finalise the dispatcher. Returns `self` so + /// the call can sit at the end of the builder chain. Idempotent + /// (calling twice rebuilds the dispatcher with the most-recent + /// wiring) — this matters because the composition root sometimes + /// re-runs the wiring sequence on subsystem restart. 
+ pub fn with_dispatcher(mut self) -> Self { + let mut builder = OperatorCommandDispatcher::builder().registry(self.poi_registry.clone()); + if let Some(r) = self.scan_router.clone() { + builder = builder.scan_router(r); + } + if let Some(r) = self.safety_router.clone() { + builder = builder.safety_router(r); + } + if let Some(s) = self.bit_severity.clone() { + builder = builder.bit_severity(s); + } + if let Some(s) = self.audit_sink.clone() { + builder = builder.audit_sink(s); + } + self.dispatcher = Some(Arc::new(builder.build())); + self + } + pub fn handle(&self) -> OperatorBridgeHandle { OperatorBridgeHandle { middle_waypoint_tx: self.middle_waypoint_tx.clone(), target_follow_tx: self.target_follow_tx.clone(), poi_mapper: self.poi_mapper.clone(), validator: self.validator.clone(), + poi_registry: self.poi_registry.clone(), + dispatcher: self.dispatcher.clone(), } } @@ -113,6 +199,15 @@ impl OperatorBridge { pub fn take_target_follow_receiver(&mut self) -> Option> { self.target_follow_rx.take() } + + /// AZ-680 — clone of the surfaced-POI registry. Exposed so the + /// composition root can pre-seed entries on subsystem restart + /// and so integration tests can register POIs without spinning + /// up a TelemetrySink. The registry is also wired into the + /// dispatcher. + pub fn surfaced_registry(&self) -> SurfacedPoiRegistry { + self.poi_registry.clone() + } } #[derive(Clone)] @@ -123,19 +218,33 @@ pub struct OperatorBridgeHandle { target_follow_tx: mpsc::Sender, poi_mapper: Option>, validator: Option>, + /// AZ-680 — registry of surfaced-but-not-dequeued POIs. The + /// dispatcher consults this for unknown-id + deadline checks. + poi_registry: SurfacedPoiRegistry, + dispatcher: Option>, } impl OperatorBridgeHandle { - /// AZ-679 — surface a POI to the operator and await the decision. 
- /// Today returns `NotImplemented` (the decision loop is AZ-680); - /// the surface event itself IS pushed (via the configured - /// `TelemetrySink`), so the operator UI receives it. + /// AZ-679 + AZ-680 — surface a POI to the operator. Records the + /// POI in the dispatcher's validity registry so subsequent + /// confirm/decline/start-follow commands resolve. The event itself + /// is pushed via the configured `TelemetrySink`. + /// + /// Returns `OperatorDecision::Confirmed`/`Declined`/... is NOT + /// the responsibility of this method any more — the decision + /// arrives asynchronously via `dispatch` and the operator UI + /// applies it. The legacy `Result` shape is + /// retained for callers that have not yet migrated; today the + /// method returns `NotImplemented` after the surface emits, and + /// `scan_controller` should use the non-decision-returning path + /// in `surface_poi_with_photo` instead. pub async fn surface_poi(&self, poi: Poi) -> Result { match &self.poi_mapper { Some(mapper) => { + self.poi_registry.record(SurfacedPoi::from(&poi)); mapper.surface(&poi, None).await?; Err(AutopilotError::NotImplemented( - "operator_bridge::surface_poi → decision loop (AZ-680)", + "operator_bridge::surface_poi → decision is async via dispatch (AZ-680)", )) } None => Err(AutopilotError::NotImplemented( @@ -144,8 +253,9 @@ impl OperatorBridgeHandle { } } - /// AZ-679 — surface a POI together with photo metadata (preferred - /// path when the source detection carries an ROI snapshot). + /// AZ-679 + AZ-680 — surface a POI together with photo metadata + /// (preferred path when the source detection carries an ROI + /// snapshot). Records the POI in the dispatcher's registry. 
pub async fn surface_poi_with_photo( &self, poi: &Poi, @@ -154,18 +264,39 @@ impl OperatorBridgeHandle { let mapper = self.poi_mapper.as_ref().ok_or_else(|| { AutopilotError::Internal("surface_poi_with_photo: telemetry sink not wired".into()) })?; + self.poi_registry.record(SurfacedPoi::from(poi)); mapper.surface(poi, Some(photo_metadata)).await.map(|_| ()) } - /// AZ-679 — emit a `PoiDequeued` event (rotation / age-out / - /// completion). Called by `scan_controller` through the bridge. + /// AZ-679 + AZ-680 — emit a `PoiDequeued` event (rotation / + /// age-out / completion). Removes the POI from the dispatcher's + /// registry so any further confirm/decline for the same id + /// resolves to `unknown_poi_id`. pub async fn emit_poi_dequeued(&self, poi_id: uuid::Uuid, reason: DequeueReason) -> Result<()> { let mapper = self.poi_mapper.as_ref().ok_or_else(|| { AutopilotError::Internal("emit_poi_dequeued: telemetry sink not wired".into()) })?; + self.poi_registry.forget(poi_id); mapper.emit_dequeued(poi_id, reason).await } + /// AZ-680 / AZ-681 — dispatch a validated operator command and + /// return the typed [`CommandAck`]. The dispatcher must be wired + /// via `OperatorBridge::with_dispatcher`; without it every + /// command returns `router_not_wired`. + pub async fn dispatch_command(&self, cmd: OperatorCommand) -> CommandAck { + match &self.dispatcher { + Some(d) => d.dispatch(cmd).await, + None => CommandAck::error(ack_reasons::ROUTER_NOT_WIRED), + } + } + + /// Test/observability hook: peek the surfaced-POI registry. + #[doc(hidden)] + pub fn surfaced_poi_count(&self) -> usize { + self.poi_registry.len() + } + pub fn poi_metrics(&self) -> Option { self.poi_mapper.as_ref().map(|m| m.metrics()) } @@ -197,12 +328,25 @@ impl OperatorBridgeHandle { } } +/// AZ-680 — wire the bridge into the `OperatorCommandSink` trait so +/// `telemetry_stream`'s downlink can forward validated commands +/// uniformly. 
The trait surface is binary (`Result<()>`); the typed +/// [`CommandAck`] surfaces through [`OperatorBridgeHandle::dispatch_command`] +/// for callers that need the rejection reason. The trait impl maps: +/// +/// - `CommandAck::Ok` → `Ok(())` +/// - `CommandAck::Error { reason }` → `Err(AutopilotError::Validation(reason))` +/// +/// This keeps the trait minimal while still propagating actionable +/// rejection reasons to downstream consumers that only see the +/// trait surface. #[async_trait] impl OperatorCommandSink for OperatorBridgeHandle { - async fn dispatch(&self, _command: OperatorCommand) -> Result<()> { - Err(AutopilotError::NotImplemented( - "operator_bridge::dispatch (AZ-680)", - )) + async fn dispatch(&self, command: OperatorCommand) -> Result<()> { + match self.dispatch_command(command).await { + CommandAck::Ok => Ok(()), + CommandAck::Error { reason } => Err(AutopilotError::Validation(reason)), + } } } diff --git a/crates/operator_bridge/tests/dispatcher.rs b/crates/operator_bridge/tests/dispatcher.rs new file mode 100644 index 0000000..1c73d35 --- /dev/null +++ b/crates/operator_bridge/tests/dispatcher.rs @@ -0,0 +1,439 @@ +//! AZ-680 + AZ-681 — operator-command dispatcher acceptance tests. +//! +//! These tests exercise the dispatcher through the public +//! `OperatorBridgeHandle::dispatch_command` surface so the wiring +//! between the surfaced-POI registry, the idempotency cache, the +//! scan router, the safety router, the BIT severity lookup, and the +//! audit sink is covered end-to-end. 
+ +use std::sync::Arc; +use std::sync::Mutex as StdMutex; + +use async_trait::async_trait; +use chrono::{Duration as ChronoDuration, Utc}; +use parking_lot::Mutex; +use serde_json::json; +use uuid::Uuid; + +use operator_bridge::{ + ack_reasons, AuditEntry, AuditSink, CommandAck, OperatorBridge, SurfacedPoi, +}; +use shared::contracts::{BitReportSeverityLookup, MissionSafetyRouter, ScanCommandRouter}; +use shared::error::Result; +use shared::models::operator::{OperatorCommand, OperatorCommandKind, SafetyOverrideScope}; + +// ============================================================================ +// Test doubles +// ============================================================================ + +#[derive(Default)] +struct RecordingScanRouter { + calls: StdMutex>, +} + +#[async_trait] +impl ScanCommandRouter for RecordingScanRouter { + async fn route(&self, command: OperatorCommand) -> Result<()> { + self.calls.lock().unwrap().push(command); + Ok(()) + } +} + +#[derive(Default)] +struct RecordingSafetyRouter { + bit_acks: StdMutex)>>, + overrides: StdMutex>, +} + +#[async_trait] +impl MissionSafetyRouter for RecordingSafetyRouter { + async fn acknowledge_bit_degraded( + &self, + report_id: Uuid, + operator_id: Option, + ) -> Result<()> { + self.bit_acks.lock().unwrap().push((report_id, operator_id)); + Ok(()) + } + + async fn apply_safety_override( + &self, + scope: SafetyOverrideScope, + duration_secs: u32, + operator_id: String, + rationale: String, + ) -> Result<()> { + self.overrides + .lock() + .unwrap() + .push((scope, duration_secs, operator_id, rationale)); + Ok(()) + } +} + +/// Severity lookup that returns whatever is registered for each id. +/// `Some(true)` for acknowledgeable (Degraded), `Some(false)` for +/// Fail, `None` for unknown. 
+#[derive(Default)] +struct StubBitSeverity { + inner: StdMutex>, +} + +impl StubBitSeverity { + fn set(&self, report_id: Uuid, acknowledgeable: bool) { + self.inner + .lock() + .unwrap() + .insert(report_id, acknowledgeable); + } +} + +#[async_trait] +impl BitReportSeverityLookup for StubBitSeverity { + async fn is_acknowledgeable(&self, report_id: Uuid) -> Option { + self.inner.lock().unwrap().get(&report_id).copied() + } +} + +#[derive(Default, Clone)] +struct RecordingAuditSink { + entries: Arc>>, +} + +#[async_trait] +impl AuditSink for RecordingAuditSink { + async fn record(&self, entry: AuditEntry) { + self.entries.lock().push(entry); + } +} + +// ============================================================================ +// Helpers +// ============================================================================ + +fn cmd(kind: OperatorCommandKind, payload: serde_json::Value) -> OperatorCommand { + OperatorCommand { + command_id: Uuid::new_v4(), + session_token: "session".to_string(), + sequence_number: 1, + issued_at_wallclock: Utc::now(), + kind, + payload, + signature: vec![], + } +} + +fn surfaced(deadline_secs: i64) -> SurfacedPoi { + SurfacedPoi { + poi_id: Uuid::new_v4(), + mgrs: "33UWP05".into(), + class_group: "vehicle".into(), + deadline: Utc::now() + ChronoDuration::seconds(deadline_secs), + } +} + +struct Harness { + bridge: OperatorBridge, + scan: Arc, + safety: Arc, + severity: Arc, + audit: RecordingAuditSink, +} + +fn harness() -> Harness { + let scan = Arc::new(RecordingScanRouter::default()); + let safety = Arc::new(RecordingSafetyRouter::default()); + let severity = Arc::new(StubBitSeverity::default()); + let audit = RecordingAuditSink::default(); + let bridge = OperatorBridge::new(8) + .with_scan_router(scan.clone() as Arc) + .with_safety_router(safety.clone() as Arc) + .with_bit_severity_lookup(severity.clone() as Arc) + .with_audit_sink(Arc::new(audit.clone()) as Arc) + .with_dispatcher(); + Harness { + bridge, + scan, + safety, + 
severity, + audit, + } +} + +// ============================================================================ +// AZ-680 ACs +// ============================================================================ + +/// AZ-680 AC-1 — Confirm forwards target hint. +#[tokio::test] +async fn az680_ac1_confirm_forwards_to_scan_router() { + // Arrange + let h = harness(); + let handle = h.bridge.handle(); + let surfaced = surfaced(120); + h.bridge.surfaced_registry().record(surfaced.clone()); + + // Act + let ack = handle + .dispatch_command(cmd( + OperatorCommandKind::ConfirmPoi, + json!({ "poi_id": surfaced.poi_id.to_string() }), + )) + .await; + + // Assert + assert_eq!(ack, CommandAck::Ok); + let calls = h.scan.calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "scan_router::route called exactly once"); + assert!(matches!(calls[0].kind, OperatorCommandKind::ConfirmPoi)); +} + +/// AZ-680 AC-2 — Re-transmit returns cached ack. +#[tokio::test] +async fn az680_ac2_retransmit_returns_cached_ack() { + // Arrange + let h = harness(); + let handle = h.bridge.handle(); + let surfaced = surfaced(120); + h.bridge.surfaced_registry().record(surfaced.clone()); + let command = cmd( + OperatorCommandKind::ConfirmPoi, + json!({ "poi_id": surfaced.poi_id.to_string() }), + ); + + // Act — same command_id dispatched twice + let ack1 = handle.dispatch_command(command.clone()).await; + let ack2 = handle.dispatch_command(command.clone()).await; + + // Assert + assert_eq!(ack1, CommandAck::Ok); + assert_eq!(ack2, CommandAck::Ok); + let calls = h.scan.calls.lock().unwrap(); + assert_eq!( + calls.len(), + 1, + "scan_router::route must be invoked exactly once across retransmits" + ); +} + +/// AZ-680 AC-3 — Unknown POI id rejected. 
+#[tokio::test] +async fn az680_ac3_unknown_poi_id_rejected() { + // Arrange + let h = harness(); + let handle = h.bridge.handle(); + + // Act — POI id never surfaced + let ack = handle + .dispatch_command(cmd( + OperatorCommandKind::ConfirmPoi, + json!({ "poi_id": Uuid::new_v4().to_string() }), + )) + .await; + + // Assert + assert_eq!(ack.reason(), Some(ack_reasons::UNKNOWN_POI_ID)); + assert!( + h.scan.calls.lock().unwrap().is_empty(), + "scan_router must not be invoked" + ); +} + +/// AZ-680 AC-4 — Expired POI rejected. +#[tokio::test] +async fn az680_ac4_expired_poi_rejected() { + // Arrange — surface a POI whose deadline has already passed. + let h = harness(); + let handle = h.bridge.handle(); + let expired = SurfacedPoi { + deadline: Utc::now() - ChronoDuration::seconds(1), + ..surfaced(0) + }; + h.bridge.surfaced_registry().record(expired.clone()); + + // Act + let ack = handle + .dispatch_command(cmd( + OperatorCommandKind::ConfirmPoi, + json!({ "poi_id": expired.poi_id.to_string() }), + )) + .await; + + // Assert + assert_eq!(ack.reason(), Some(ack_reasons::EXPIRED)); + assert!( + h.scan.calls.lock().unwrap().is_empty(), + "scan_router must not be invoked on expired POI" + ); +} + +/// AZ-680 AC-5 — Decline appends IgnoredItem via scan_controller. 
+#[tokio::test] +async fn az680_ac5_decline_forwards_to_scan_router() { + // Arrange + let h = harness(); + let handle = h.bridge.handle(); + let surfaced = surfaced(120); + h.bridge.surfaced_registry().record(surfaced.clone()); + + // Act + let ack = handle + .dispatch_command(cmd( + OperatorCommandKind::DeclinePoi, + json!({ "poi_id": surfaced.poi_id.to_string() }), + )) + .await; + + // Assert + assert_eq!(ack, CommandAck::Ok); + let calls = h.scan.calls.lock().unwrap(); + assert_eq!( + calls.len(), + 1, + "DeclinePoi must reach scan_router exactly once" + ); + assert!(matches!(calls[0].kind, OperatorCommandKind::DeclinePoi)); +} + +// ============================================================================ +// AZ-681 ACs +// ============================================================================ + +/// AZ-681 AC-1 — BIT-DEGRADED ack succeeds. +#[tokio::test] +async fn az681_ac1_bit_degraded_ack_forwards() { + // Arrange + let h = harness(); + let handle = h.bridge.handle(); + let report_id = Uuid::new_v4(); + h.severity.set(report_id, true); + + // Act + let ack = handle + .dispatch_command(cmd( + OperatorCommandKind::AcknowledgeBitDegraded, + json!({ "report_id": report_id.to_string(), "operator_id": "op1" }), + )) + .await; + + // Assert + assert_eq!(ack, CommandAck::Ok); + let acks = h.safety.bit_acks.lock().unwrap(); + assert_eq!(acks.len(), 1); + assert_eq!(acks[0], (report_id, Some("op1".to_string()))); +} + +/// AZ-681 AC-2 — BIT-FAIL ack rejected. 
+#[tokio::test] +async fn az681_ac2_bit_fail_ack_rejected() { + // Arrange + let h = harness(); + let handle = h.bridge.handle(); + let report_id = Uuid::new_v4(); + h.severity.set(report_id, false); + + // Act + let ack = handle + .dispatch_command(cmd( + OperatorCommandKind::AcknowledgeBitDegraded, + json!({ "report_id": report_id.to_string(), "operator_id": "op1" }), + )) + .await; + + // Assert + assert_eq!(ack.reason(), Some(ack_reasons::CANNOT_ACKNOWLEDGE_FAIL)); + assert!( + h.safety.bit_acks.lock().unwrap().is_empty(), + "safety_router must not be invoked on Fail report" + ); +} + +/// AZ-681 AC-3 — Safety-override forwards with scope + duration, and +/// an audit entry is written. +#[tokio::test] +async fn az681_ac3_safety_override_forwards_with_audit_entry() { + // Arrange + let h = harness(); + let handle = h.bridge.handle(); + + // Act + let ack = handle + .dispatch_command(cmd( + OperatorCommandKind::SafetyOverride, + json!({ + "scope": "battery_rtl", + "duration_secs": 60, + "operator_id": "op1", + "rationale": "post-mission RTL too aggressive" + }), + )) + .await; + + // Assert — router invoked with the right scope + duration. + assert_eq!(ack, CommandAck::Ok); + let overrides = h.safety.overrides.lock().unwrap(); + assert_eq!(overrides.len(), 1); + assert_eq!(overrides[0].0, SafetyOverrideScope::BatteryRtl); + assert_eq!(overrides[0].1, 60); + assert_eq!(overrides[0].2, "op1"); + + // Assert — audit log has exactly one safety-override entry. + let entries = h.audit.entries.lock(); + let safety_entries: Vec<_> = entries + .iter() + .filter(|e| matches!(e, AuditEntry::SafetyOverride { .. })) + .collect(); + assert_eq!(safety_entries.len(), 1); + match safety_entries[0] { + AuditEntry::SafetyOverride { + scope, + duration_secs, + operator_id, + outcome, + .. 
+ } => { + assert_eq!(*scope, SafetyOverrideScope::BatteryRtl); + assert_eq!(*duration_secs, 60); + assert_eq!(operator_id.as_deref(), Some("op1")); + assert_eq!(outcome, &CommandAck::Ok); + } + _ => unreachable!(), + } +} + +/// AZ-681 AC-4 — Audit log redacts secrets. +#[tokio::test] +async fn az681_ac4_audit_log_contains_no_signature_or_session_token() { + // Arrange + let h = harness(); + let handle = h.bridge.handle(); + + // Act + let _ = handle + .dispatch_command(cmd( + OperatorCommandKind::SafetyOverride, + json!({ + "scope": "battery_rtl", + "duration_secs": 30, + "operator_id": "op1", + "rationale": "test" + }), + )) + .await; + + // Assert — every audit entry serialised to JSON must omit + // `signature` and `session_token`. + let entries = h.audit.entries.lock(); + assert!(!entries.is_empty()); + for entry in entries.iter() { + let json = serde_json::to_string(entry).expect("serialises"); + assert!( + !json.contains("signature"), + "audit entry leaked signature: {json}" + ); + assert!( + !json.contains("session_token"), + "audit entry leaked session_token: {json}" + ); + } +} diff --git a/crates/scan_controller/Cargo.toml b/crates/scan_controller/Cargo.toml index 6a3f54a..b214813 100644 --- a/crates/scan_controller/Cargo.toml +++ b/crates/scan_controller/Cargo.toml @@ -20,3 +20,4 @@ serde = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } uuid = { workspace = true } +async-trait = { workspace = true } diff --git a/crates/scan_controller/src/internal/poi_queue/mod.rs b/crates/scan_controller/src/internal/poi_queue/mod.rs index eaf433b..16da2a6 100644 --- a/crates/scan_controller/src/internal/poi_queue/mod.rs +++ b/crates/scan_controller/src/internal/poi_queue/mod.rs @@ -66,6 +66,22 @@ pub struct DeclineAction { pub class_group: String, } +/// AZ-680 — information returned when a POI is confirmed (or selected +/// for target-follow start). 
Mirrors [`DeclineAction`] so consumers +/// downstream of the confirm path (AZ-684 evidence ladder, AZ-685 +/// mapobjects dispatch, AZ-686 gimbal issuance) get a typed +/// `(target_mgrs, target_class)` hint without re-querying the queue. +/// +/// The POI is removed from the queue as part of `confirm`. A +/// subsequent confirm with the same `poi_id` returns `None`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ConfirmAction { + pub poi_id: Uuid, + pub target_mgrs: String, + pub target_class: String, + pub class_group: String, +} + impl PoiQueue { pub fn new() -> Self { Self::default() @@ -145,6 +161,23 @@ impl PoiQueue { }) } + /// Confirm a POI by id. Removes from queue; returns the typed + /// `(target_mgrs, target_class)` hint that downstream consumers + /// (AZ-684 evidence ladder, AZ-686 gimbal issuance) build the + /// follow-up plan from. AZ-680 only needs the removal + the hint + /// to be carried back through `submit_operator_cmd`'s return + /// value. + pub fn confirm(&mut self, poi_id: Uuid) -> Option<ConfirmAction> { + let idx = self.entries.iter().position(|e| e.poi.id == poi_id)?; + let entry = self.entries.swap_remove(idx); + Some(ConfirmAction { + poi_id: entry.poi.id, + target_mgrs: entry.poi.mgrs, + target_class: entry.poi.class, + class_group: entry.poi.class_group, + }) + } + /// Drop POIs whose deadline (set at insertion by the caller per /// the confidence-scaled window) has elapsed. Returns the IDs of /// forgotten POIs.
NO `IgnoredItem` is created — timeout = diff --git a/crates/scan_controller/src/lib.rs b/crates/scan_controller/src/lib.rs index 3933631..186059d 100644 --- a/crates/scan_controller/src/lib.rs +++ b/crates/scan_controller/src/lib.rs @@ -31,10 +31,12 @@ use std::sync::Arc; use std::time::{Duration, Instant}; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; use uuid::Uuid; +use shared::contracts::ScanCommandRouter; use shared::error::{AutopilotError, Result}; use shared::health::{ComponentHealth, HealthLevel}; use shared::models::operator::{OperatorCommand, OperatorCommandKind}; @@ -44,7 +46,8 @@ pub mod internal; pub use internal::frame_rate_guard::{FrameRateGuard, FrameRateGuardConfig}; pub use internal::poi_queue::{ - age_factor, decision_window, priority_score, DeclineAction, PoiQueue, SURFACE_CAP_PER_WINDOW, + age_factor, decision_window, priority_score, ConfirmAction, DeclineAction, PoiQueue, + SURFACE_CAP_PER_WINDOW, }; pub use internal::state_machine::transitions::{transition, TransitionCtx}; pub use internal::state_machine::{RejectReason, ScanState, TransitionOutcome, Trigger}; @@ -153,11 +156,14 @@ pub struct ScanMetrics { /// Result of [`ScanControllerHandle::submit_operator_cmd`]. `Accepted` /// means the command was applied with no return data; `Declined` -/// carries the dispatchable IgnoredItem action AZ-685 must persist. +/// carries the dispatchable IgnoredItem action AZ-685 must persist; +/// `Confirmed` carries the typed `(target_mgrs, target_class)` hint +/// AZ-684 / AZ-686 build a follow-up plan from. #[derive(Debug, Clone, PartialEq, Eq)] pub enum SubmitOutcome { Accepted, Declined(DeclineAction), + Confirmed(ConfirmAction), } fn poi_id_from_payload(payload: &serde_json::Value) -> Result<Uuid> { @@ -268,6 +274,18 @@ impl ScanControllerHandle { action } + /// AZ-680 — confirm a POI (or target-follow start).
Looks up the + /// POI by id, removes it from the queue, and returns the typed + /// `(target_mgrs, target_class)` hint for downstream consumers. + /// + /// The FSM-side follow-through (zoom-in trigger, target-follow + /// transition) is AZ-684's evidence-ladder scope and is NOT + /// performed here — this method only resolves the queue entry. + pub async fn confirm_poi(&self, poi_id: Uuid) -> Option<ConfirmAction> { + let mut inner = self.inner.lock().await; + inner.poi_queue.confirm(poi_id) + } + pub async fn poi_queue_len(&self) -> usize { self.inner.lock().await.poi_queue.len() } @@ -279,20 +297,24 @@ impl ScanControllerHandle { /// Translate an operator command into a trigger and apply it. /// - /// AZ-682 / AZ-683 mapping (subset complete): + /// Mapping (AZ-682 / AZ-683 / AZ-680): /// /// - `MissionAbort` → `Trigger::OperatorAbort` (AZ-682). /// - `ReleaseTargetFollow` → `Trigger::OperatorReleaseFollow` /// (AZ-682). - /// - `DeclinePoi { poi_id }` → queue decline; returns the - /// resulting `DeclineAction` in [`SubmitOutcome::Declined`] - /// for the caller (AZ-685 mapobjects dispatch) to persist - /// (AZ-683). - /// - `ConfirmPoi` / `StartTargetFollow` → still - /// `NotImplemented(AZ-684)` since ROI / target_id resolution - /// needs the evidence ladder. - /// - `AcknowledgeBitDegraded` / `SafetyOverride` → - /// `NotImplemented(AZ-684)`. + /// - `DeclinePoi { poi_id }` → queue decline; returns + /// [`SubmitOutcome::Declined`] for the caller (AZ-685 + /// mapobjects dispatch) to persist (AZ-683). + /// - `ConfirmPoi { poi_id }` / `StartTargetFollow { poi_id }` → + /// queue lookup + removal; returns + /// [`SubmitOutcome::Confirmed`] carrying the typed + /// `(target_mgrs, target_class)` hint (AZ-680). The FSM-side + /// follow-through (zoom-in trigger, target-follow transition) + /// is AZ-684's scope.
+ /// - `AcknowledgeBitDegraded` / `SafetyOverride` are NOT + /// handled here — those go to `mission_executor` via the + /// `MissionSafetyRouter` path wired by `operator_bridge` + /// (AZ-681). Receiving one in this method is a routing bug. pub async fn submit_operator_cmd(&self, command: OperatorCommand) -> Result<SubmitOutcome> { match command.kind { OperatorCommandKind::MissionAbort => { @@ -313,16 +335,21 @@ impl ScanControllerHandle { } } OperatorCommandKind::ConfirmPoi | OperatorCommandKind::StartTargetFollow => { - Err(AutopilotError::NotImplemented( - "scan_controller::submit_operator_cmd (AZ-684 evidence ladder)", - )) + let poi_id = poi_id_from_payload(&command.payload)?; + match self.confirm_poi(poi_id).await { + Some(action) => Ok(SubmitOutcome::Confirmed(action)), + None => Err(AutopilotError::Validation(format!( + "{:?}: unknown poi_id {poi_id}", + command.kind + ))), + } + } + OperatorCommandKind::AcknowledgeBitDegraded | OperatorCommandKind::SafetyOverride => { + Err(AutopilotError::Validation(format!( + "scan_controller does not handle {:?}; route via MissionSafetyRouter", + command.kind + ))) } - OperatorCommandKind::AcknowledgeBitDegraded => Err(AutopilotError::NotImplemented( - "scan_controller::submit_operator_cmd (AZ-684 evidence ladder)", - )), - OperatorCommandKind::SafetyOverride => Err(AutopilotError::NotImplemented( - "scan_controller::submit_operator_cmd (AZ-684 evidence ladder)", - )), } } @@ -400,6 +427,22 @@ impl ScanControllerHandle { } } +/// AZ-680 — adapter for the `shared::contracts::ScanCommandRouter` +/// trait so `operator_bridge` (Layer 3) can dispatch operator +/// commands into `scan_controller` (Layer 4) without importing this +/// crate directly.
Forwards to the inherent +/// [`ScanControllerHandle::submit_operator_cmd`] and discards the +/// `SubmitOutcome` (the trait surface is intentionally minimal — +/// `operator_bridge` does not need the typed hint; AZ-685 wires the +/// `Confirmed`/`Declined` actions into `mapobjects_store` through a +/// different path). +#[async_trait] +impl ScanCommandRouter for ScanControllerHandle { + async fn route(&self, command: OperatorCommand) -> Result<()> { + self.submit_operator_cmd(command).await.map(|_| ()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/scan_controller/tests/poi_queue.rs b/crates/scan_controller/tests/poi_queue.rs index 61c2ae8..187083d 100644 --- a/crates/scan_controller/tests/poi_queue.rs +++ b/crates/scan_controller/tests/poi_queue.rs @@ -153,7 +153,73 @@ async fn decline_poi_via_operator_command_emits_action() { assert_eq!(action.mgrs, "decline-me"); assert_eq!(action.class_group, "armor"); } - SubmitOutcome::Accepted => panic!("decline must return Declined action"), + other => panic!("decline must return Declined action, got {other:?}"), } assert_eq!(h.poi_queue_len().await, 0); } + +/// AZ-680 — ConfirmPoi via operator command returns +/// `SubmitOutcome::Confirmed` with the typed target hint and drains +/// the POI from the queue. 
+#[tokio::test] +async fn confirm_poi_via_operator_command_emits_action() { + // Arrange + let h = ScanController::new().handle(); + let p = poi(0.8, "confirm-me"); + let id = p.id; + let expected_class = p.class.clone(); + let expected_group = p.class_group.clone(); + h.submit_poi_candidate(p, 0.5).await; + + let cmd = OperatorCommand { + command_id: Uuid::new_v4(), + session_token: "s".to_string(), + sequence_number: 1, + issued_at_wallclock: Utc::now(), + kind: OperatorCommandKind::ConfirmPoi, + payload: json!({ "poi_id": id.to_string() }), + signature: vec![], + }; + + // Act + let outcome = h.submit_operator_cmd(cmd).await.expect("confirm accepted"); + + // Assert + match outcome { + SubmitOutcome::Confirmed(action) => { + assert_eq!(action.poi_id, id); + assert_eq!(action.target_mgrs, "confirm-me"); + assert_eq!(action.target_class, expected_class); + assert_eq!(action.class_group, expected_group); + } + other => panic!("confirm must return Confirmed action, got {other:?}"), + } + assert_eq!(h.poi_queue_len().await, 0); +} + +/// AZ-680 — ConfirmPoi for an unknown poi_id must NOT silently +/// succeed. Returns a `Validation` error so `operator_bridge` can +/// surface a typed NACK to the operator UI. 
+#[tokio::test] +async fn confirm_poi_unknown_id_is_validation_error() { + // Arrange + let h = ScanController::new().handle(); + let cmd = OperatorCommand { + command_id: Uuid::new_v4(), + session_token: "s".to_string(), + sequence_number: 1, + issued_at_wallclock: Utc::now(), + kind: OperatorCommandKind::ConfirmPoi, + payload: json!({ "poi_id": Uuid::new_v4().to_string() }), + signature: vec![], + }; + + // Act + let err = h + .submit_operator_cmd(cmd) + .await + .expect_err("unknown poi must error"); + + // Assert + assert!(matches!(err, shared::error::AutopilotError::Validation(_))); +} diff --git a/crates/shared/src/contracts/mod.rs b/crates/shared/src/contracts/mod.rs index a68511e..5f9db20 100644 --- a/crates/shared/src/contracts/mod.rs +++ b/crates/shared/src/contracts/mod.rs @@ -83,6 +83,66 @@ pub trait OperatorCommandSink: Send + Sync { async fn dispatch(&self, command: OperatorCommand) -> Result<()>; } +/// AZ-680 — route a validated `OperatorCommand` into `scan_controller`. +/// +/// Lives in `shared::contracts` so `operator_bridge` (Layer 3) can +/// depend on the trait without importing `scan_controller` (Layer 4). +/// `scan_controller` implements this for its public `Handle`. +/// +/// The trait name uses `route` instead of `submit_operator_cmd` to +/// avoid a name collision with the inherent method on +/// `ScanControllerHandle`. Implementations forward to the inherent +/// method. +#[async_trait] +pub trait ScanCommandRouter: Send + Sync { + async fn route(&self, command: OperatorCommand) -> Result<()>; +} + +/// AZ-681 — forward safety-critical operator commands (BIT acks, +/// safety overrides) into `mission_executor`. +/// +/// `operator_bridge` (Layer 3) cannot import `mission_executor` +/// (Layer 3 sibling). The composition root constructs a concrete +/// impl that wraps the executor's BIT ack channel + battery monitor +/// handle. 
+#[async_trait] +pub trait MissionSafetyRouter: Send + Sync { + /// Forward a signed BIT-degraded acknowledgement. The + /// `report_id` identifies the originating BIT report that + /// produced the `Degraded` verdict. `operator_id` is carried for + /// the executor's structured-log trail. + async fn acknowledge_bit_degraded( + &self, + report_id: uuid::Uuid, + operator_id: Option<String>, + ) -> Result<()>; + + /// Apply a signed safety override. The override is bounded by + /// `duration_secs`; the receiving subsystem (e.g. battery + /// monitor) is responsible for enforcing the deadline. + async fn apply_safety_override( + &self, + scope: crate::models::operator::SafetyOverrideScope, + duration_secs: u32, + operator_id: String, + rationale: String, + ) -> Result<()>; +} + +/// AZ-681 — look up the severity of a previously-generated BIT report +/// by id. `operator_bridge` consults this before forwarding a BIT- +/// degraded ack: a `Fail` severity is never acknowledgeable (per +/// AC-2). +/// +/// Returns `Some(true)` when the report exists and is acknowledgeable +/// (severity is NOT `Fail`); `Some(false)` when known and `Fail`; +/// `None` when the report id has never been generated (or has aged +/// out of the lookup cache). +#[async_trait] +pub trait BitReportSeverityLookup: Send + Sync { + async fn is_acknowledgeable(&self, report_id: uuid::Uuid) -> Option<bool>; +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/shared/src/models/operator.rs b/crates/shared/src/models/operator.rs index 12616ec..e6eaf22 100644 --- a/crates/shared/src/models/operator.rs +++ b/crates/shared/src/models/operator.rs @@ -20,6 +20,31 @@ pub enum OperatorCommandKind { MissionAbort, } +/// AZ-681 — scope of a `SafetyOverride` command. Each variant maps to +/// a specific failsafe family in `mission_executor` that the operator +/// is suppressing for a bounded duration (architecture.md §F10).
+/// +/// Marked `#[non_exhaustive]` so adding `LinkLost` / `Geofence` later +/// is a non-breaking change to downstream matchers. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +#[non_exhaustive] +pub enum SafetyOverrideScope { + /// Suppress battery-RTL until the override deadline elapses. The + /// `hard_floor` land-now is NEVER suppressible regardless of + /// override (per `architecture.md §F10`). + BatteryRtl, +} + +impl SafetyOverrideScope { + /// Stable snake_case label for audit logs and metrics. + pub fn label(self) -> &'static str { + match self { + Self::BatteryRtl => "battery_rtl", + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OperatorCommand { pub command_id: Uuid,