//! `mission_executor` — multirotor + fixed-wing FSMs, geofence, failsafe. //! //! AZ-648 lands the variant-aware state machine, the per-transition //! retry budget, and the broadcast event stream. Subsequent tasks add: //! - AZ-649 telemetry forwarding (wires real `Telemetry` from `mavlink_layer`) //! - AZ-650 BIT F9 //! - AZ-651 lost-link ladder //! - AZ-652 safety + resume + middle-waypoint insert //! //! The FSM core is variant-agnostic; per-variant transition tables in //! [`internal::multirotor`] and [`internal::fixed_wing`] supply the //! allowed state graph. Each transition is either: //! - **Pure** — advances when its `Telemetry` guard returns `Ready`; //! no driver call is issued. //! - **Action-bearing** — invokes [`MissionDriver`] (arm, takeoff, //! mission upload, set-auto, post-flight sync) and only advances on //! `Ok(())`. On `Err` the per-transition retry counter increments; //! on cap exhaustion the FSM moves to [`MissionState::Paused`] and //! health flips to red. use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, watch, Mutex}; use tokio::task::JoinHandle; use tokio::time::Instant; use shared::error::{AutopilotError, Result}; use shared::health::ComponentHealth; use shared::models::mission::{Coordinate, MissionItem, MissionWaypoint}; mod internal; pub use internal::battery_thresholds::{ BatteryAction, BatteryCommandIssuer, BatteryConfig, BatteryDriver, BatteryEvent, BatteryMonitor, BatteryMonitorHandle, BatteryOverride, MavlinkBatteryCommandIssuer, MAV_CMD_NAV_LAND, }; pub use internal::bit::{ BitController, BitControllerConfig, BitControllerHandle, BitDegradedAck, BitEvaluator, BitEvent, BitItem, BitItemStatus, BitOverall, BitReport, BitState, }; pub use internal::bit_evaluators::{ MapObjectsSyncedEvaluator, MissionLoadedEvaluator, StateDirFreeSpaceEvaluator, WallClockBoundEvaluator, }; pub use internal::driver::{DriverError, MissionDriver}; pub use internal::geofence::{ GeofenceCommandIssuer, GeofenceDriver, GeofenceEvent, GeofenceMonitor, GeofenceMonitorHandle, GeofenceVerdict, MavlinkGeofenceCommandIssuer, }; pub use internal::lost_link::{ LadderEvent, LadderInput, LadderOutput, LadderState, LostLinkCommandIssuer, LostLinkConfig, LostLinkDriver, LostLinkLadder, LostLinkLadderHandle, MavlinkCommandIssuer, MAV_CMD_NAV_RETURN_TO_LAUNCH, }; pub use internal::middle_waypoint::{MiddleWaypointHint, MissionRePlanner}; pub use internal::post_flight::{MapObjectsDiffSource, MapObjectsPusher, PostFlightPusher}; pub use internal::safety_dispatch::SafetyDispatchHandle; pub use internal::telemetry::{ Consumer, DropCountingReceiver, MavlinkProjection, TelemetryForwarder, }; pub use internal::types::{ MissionState, StepOutcome, Telemetry, TransitionEvent, TransitionKey, Variant, }; use internal::fsm::{step_one, FsmCore}; const NAME: &str = "mission_executor"; /// Default per-transition retry budget per AZ-648 §Non-Functional Requirements. pub const DEFAULT_RETRY_CAP: u32 = 3; /// Default tick interval. ≤10 ms p99 budget per AZ-648; we tick at 50 Hz /// so each tick has ample headroom for one driver call. pub const DEFAULT_TICK: Duration = Duration::from_millis(20); /// FSM construction parameters. #[derive(Debug, Clone)] pub struct MissionExecutorConfig { pub variant: Variant, /// Multirotor only. Ignored for fixed-wing. pub takeoff_altitude_m: f32, /// Default = [`DEFAULT_RETRY_CAP`]. pub retry_cap: u32, /// Default = [`DEFAULT_TICK`]. pub tick_interval: Duration, /// Broadcast channel capacity for [`TransitionEvent`]. Consumers /// that lag past this fall behind and lose events; transitions /// themselves still happen. pub event_channel_capacity: usize, } impl MissionExecutorConfig { pub fn multirotor(takeoff_altitude_m: f32) -> Self { Self { variant: Variant::Multirotor, takeoff_altitude_m, retry_cap: DEFAULT_RETRY_CAP, tick_interval: DEFAULT_TICK, event_channel_capacity: 64, } } pub fn fixed_wing() -> Self { Self { variant: Variant::FixedWing, takeoff_altitude_m: 0.0, retry_cap: DEFAULT_RETRY_CAP, tick_interval: DEFAULT_TICK, event_channel_capacity: 64, } } } // Legacy enums retained for AZ-651 / AZ-652 to consume. Not part of the // AZ-648 surface but still publicly exported to keep the public crate // API stable. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum FailsafeKind { LinkDegraded, LinkLost, LinkLostInFollow, BatteryRtl, BatteryHardFloor, GeofenceInclusion, GeofenceExclusion, } /// Top-level executor. Construct, then call [`MissionExecutor::run`] /// to spawn the FSM task. The returned [`MissionExecutorHandle`] is /// the read-side: state, health, transition event subscription. pub struct MissionExecutor { config: MissionExecutorConfig, } impl MissionExecutor { pub fn new(config: MissionExecutorConfig) -> Self { Self { config } } /// Spawn the FSM driver. Returns a handle to read state and a join /// handle for the background task. /// /// `telemetry_rx` is a `watch::Receiver` so the producer (the /// `mavlink_layer` telemetry forwarder per AZ-649) can publish the /// latest snapshot without back-pressure. Each tick reads the /// current value; missed intermediate updates are intentionally /// dropped (the guards are level-triggered). pub fn run( &self, driver: Arc, mission: Vec, telemetry_rx: watch::Receiver, ) -> (MissionExecutorHandle, JoinHandle<()>) where D: MissionDriver + 'static, { let (events_tx, _events_rx) = broadcast::channel(self.config.event_channel_capacity.max(1)); let core = FsmCore::new( self.config.variant, self.config.retry_cap, mission, events_tx.clone(), self.config.takeoff_altitude_m, ); let core = Arc::new(Mutex::new(core)); let table: &'static [internal::fsm::Transition] = match self.config.variant { Variant::Multirotor => internal::multirotor::TABLE, Variant::FixedWing => internal::fixed_wing::TABLE, }; let tick = self.config.tick_interval; let core_for_task = core.clone(); let driver_for_task: Arc = driver; let handle = MissionExecutorHandle { core: core.clone(), events_tx: events_tx.clone(), driver: driver_for_task.clone(), hard_floor_active: Arc::new(AtomicBool::new(false)), }; let join = tokio::spawn(async move { run_loop(core_for_task, table, driver_for_task, telemetry_rx, tick).await; }); (handle, join) } } async fn run_loop( core: Arc>, table: &'static [internal::fsm::Transition], driver: Arc, mut telemetry_rx: watch::Receiver, tick: Duration, ) { let mut ticker = tokio::time::interval_at(Instant::now() + tick, tick); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { ticker.tick().await; let telemetry = *telemetry_rx.borrow_and_update(); let mut guard = core.lock().await; let outcome = step_one(&mut guard, table, &telemetry, driver.as_ref()).await; let terminal = matches!( outcome, StepOutcome::AlreadyDone | StepOutcome::Paused { .. } ); drop(guard); if terminal { return; } } } /// Read-side handle. Clone-safe. #[derive(Clone)] pub struct MissionExecutorHandle { core: Arc>, events_tx: broadcast::Sender, /// Driver used by [`insert_middle_waypoint`] and any other /// failsafe path that needs to issue a fresh mission upload. driver: Arc, /// Set to `true` once the battery hard floor (15 % default) has /// fired. Latched: only the operator-level recovery flow can /// clear it. Drives `health()` → red while active. hard_floor_active: Arc, } impl MissionExecutorHandle { /// Current FSM state. Cheap (single mutex lock). pub async fn state(&self) -> MissionState { self.core.lock().await.state } /// Subscribe to the broadcast stream of [`TransitionEvent`]s. /// Each new subscriber starts from the next event published; past /// events are not replayed. pub fn subscribe(&self) -> broadcast::Receiver { self.events_tx.subscribe() } /// Post-increment retry counter for the given transition. pub async fn retry_count(&self, key: TransitionKey) -> u32 { self.core.lock().await.retry_count(&key) } /// Reason the FSM paused, if it is paused. pub async fn paused_reason(&self) -> Option { self.core.lock().await.paused_reason.clone() } /// `true` once the battery hard floor (15 % default) has fired. /// Drives `health()` → red until cleared via /// [`MissionExecutorHandle::clear_hard_floor`]. pub fn hard_floor_active(&self) -> bool { self.hard_floor_active.load(Ordering::Relaxed) } /// Operator-acknowledged clear of the hard-floor latch. Intended /// for ground-test workflows where the battery has been swapped. pub fn clear_hard_floor(&self) { self.hard_floor_active.store(false, Ordering::Relaxed); } /// Aggregated health: red when paused or while the battery hard /// floor has fired, green when `Done`, yellow otherwise. pub async fn health(&self) -> ComponentHealth { if self.hard_floor_active() { return ComponentHealth::red(NAME, "battery hard floor active"); } let guard = self.core.lock().await; match guard.state { MissionState::Paused => { let reason = guard .paused_reason .clone() .unwrap_or_else(|| "paused".to_string()); ComponentHealth::red(NAME, reason) } MissionState::Done => ComponentHealth::green(NAME).with_detail("mission complete"), other => ComponentHealth::yellow(NAME, format!("state={other:?}")), } } /// Insert a single middle waypoint immediately after the /// currently-active waypoint (or, if the mission has not started /// yet, at the head) and re-upload via the driver. Returns once /// the airframe has acknowledged the new mission. Strategic /// placement decisions (where in geographic space the new /// waypoint belongs) are owned by `scan_controller`; this entry /// point handles the **mechanics** of patch + re-upload only. pub async fn insert_middle_waypoint(&self, at: Coordinate) -> Result<()> { let hint = MiddleWaypointHint { at, // Insert after seq 0 so the airframe still treats seq 0 // as the rejoin anchor. scan_controller will eventually // supply a richer hint via a follow-up surface. insert_after_seq: 0, label: None, }; let current_mission: Vec = { let guard = self.core.lock().await; guard.mission.clone() }; let planner = MissionRePlanner::new(self.driver.clone()); let patched = planner .on_middle_waypoint(hint, ¤t_mission) .await .map_err(|e| AutopilotError::Internal(format!("middle-waypoint re-upload: {e}")))?; let mut guard = self.core.lock().await; guard.mission = patched; Ok(()) } /// Apply a failsafe response immediately. /// /// All non-degraded variants short-circuit `MissionState::FlyMission` /// → `MissionState::Land`. The actual MAVLink command /// (`MAV_CMD_NAV_RETURN_TO_LAUNCH` or `MAV_CMD_NAV_LAND`) is /// issued by the dedicated driver for each failsafe family /// (`LostLinkDriver` for `LinkLost*`, `BatteryDriver` for /// `BatteryRtl` / `BatteryHardFloor`, `GeofenceDriver` for the /// geofence variants). The FSM transition recorded here is the /// autopilot's internal accounting of the abort; the airframe /// follows the command sent by the driver. /// /// Earlier states (`Disconnected`, `Connected`, `HealthOk`, /// `BitOk`, `Armed`, `TakeOff`, `MissionUploaded`) are NOT /// overridden: in those states the airframe's own failsafe and /// the driver's command are the right authority. /// /// Calling this while the FSM is already `Paused` is a no-op. pub async fn failsafe_trigger(&self, kind: FailsafeKind) -> Result<()> { match kind { FailsafeKind::LinkDegraded => { // Degraded is yellow-health-only; no transition needed. Ok(()) } FailsafeKind::LinkLost | FailsafeKind::LinkLostInFollow | FailsafeKind::BatteryRtl | FailsafeKind::GeofenceInclusion | FailsafeKind::GeofenceExclusion => { self.transition_flymission_to_land().await; Ok(()) } FailsafeKind::BatteryHardFloor => { self.hard_floor_active.store(true, Ordering::Relaxed); self.transition_flymission_to_land().await; Ok(()) } } } async fn transition_flymission_to_land(&self) { let mut core = self.core.lock().await; if core.state == MissionState::FlyMission { let from = core.state; core.state = MissionState::Land; let _ = self.events_tx.send(TransitionEvent { variant: core.variant, from, to: MissionState::Land, at: chrono::Utc::now(), retry_count: 0, }); } } /// Test-only back-door for forcing FSM state. The FSM normally /// advances through telemetry-gated transitions; integration /// tests that need to assert failsafe behaviour in a specific /// state use this rather than wiring a full transition harness. /// Not part of the production API. #[doc(hidden)] pub async fn force_state_for_tests(&self, state: MissionState) { let mut core = self.core.lock().await; core.state = state; } /// Pre-AZ-648 helper kept for callers that only need to validate a /// mission shape. The proper start path is [`MissionExecutor::run`]. pub async fn start(&self, _mission: Vec) -> Result<()> { Err(AutopilotError::NotImplemented( "mission_executor::start: use MissionExecutor::run (AZ-648)", )) } } trait HealthDetail { fn with_detail(self, detail: impl Into) -> Self; } impl HealthDetail for ComponentHealth { fn with_detail(mut self, detail: impl Into) -> Self { self.detail = Some(detail.into()); self } } /// Spawn a task that subscribes to `mavlink_handle.subscribe_inbound()` /// and republishes every telemetry-bearing message through /// `forwarder`. Returns the task handle. /// /// Non-telemetry MAVLink messages (mission protocol, command acks, /// status text, etc.) are intentionally ignored — they are consumed /// by other paths inside `mavlink_layer` (`send_command` demux, /// `mission_client` pull, …). /// /// `RecvError::Lagged(n)` on the inbound subscription is treated as /// a hard drop on this side too: we log `n` skipped frames at warn /// level (the forwarder doesn't even see them) and continue. The /// forwarder's downstream channels are independent and unaffected. pub fn spawn_mavlink_pump( mavlink_handle: mavlink_layer::MavlinkHandle, forwarder: Arc, ) -> JoinHandle<()> { let mut rx = mavlink_handle.subscribe_inbound(); tokio::spawn(async move { loop { match rx.recv().await { Ok(inbound) => { if let Some(projection) = MavlinkProjection::from_mavlink(&inbound.message) { forwarder.publish_from_mavlink(&projection); } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { tracing::warn!( skipped = n, "mission_executor telemetry pump lagged on mavlink inbound stream" ); } Err(tokio::sync::broadcast::error::RecvError::Closed) => { tracing::info!( "mission_executor telemetry pump: mavlink inbound stream closed" ); return; } } } }) } #[cfg(test)] mod tests { use super::*; use async_trait::async_trait; struct NeverCalledDriver; #[async_trait] impl MissionDriver for NeverCalledDriver { async fn arm(&self) -> std::result::Result<(), DriverError> { panic!("arm called"); } async fn takeoff(&self, _altitude_m: f32) -> std::result::Result<(), DriverError> { panic!("takeoff called"); } async fn upload_mission( &self, _items: &[MissionWaypoint], ) -> std::result::Result<(), DriverError> { panic!("upload_mission called"); } async fn set_auto_mode(&self) -> std::result::Result<(), DriverError> { panic!("set_auto_mode called"); } async fn post_flight_sync(&self) -> std::result::Result<(), DriverError> { panic!("post_flight_sync called"); } } #[tokio::test] async fn handle_starts_in_disconnected_with_yellow_health() { // Arrange let exec = MissionExecutor::new(MissionExecutorConfig::multirotor(10.0)); let (_tx, rx) = watch::channel(Telemetry::default()); let driver = Arc::new(NeverCalledDriver); // Act let (handle, join) = exec.run(driver, vec![], rx); // Assert assert_eq!(handle.state().await, MissionState::Disconnected); let health = handle.health().await; assert_eq!(health.level, shared::health::HealthLevel::Yellow); // Cleanup join.abort(); } }