[AZ-651] [AZ-668] lost-link failsafe ladder + mapobjects persistence (batch 7)

AZ-651 (mission_executor lost-link ladder):
- LostLinkLadder pure-logic state machine (LinkOk -> Degraded -> Lost
  -> LinkLostInFollow + MavlinkLost branch). Configurable thresholds
  via LostLinkConfig.
- LostLinkCommandIssuer trait + MavlinkCommandIssuer production impl
  emitting MAV_CMD_NAV_RETURN_TO_LAUNCH via MavlinkHandle::send_command.
- LostLinkDriver task wires the ladder to operator-link watch, MAVLink
  LinkEvent broadcast, and optional target-follow signal. On RTL,
  driver calls the issuer THEN MissionExecutorHandle::failsafe_trigger.
- failsafe_trigger(LinkLost | LinkLostInFollow) short-circuits FlyMission
  -> Land via direct FSM state mutation + TransitionEvent emission;
  Paused state is intentionally NOT overridden.
- Tests: 4/4 ACs locally green (degraded-no-rtl; lost-fires-once;
  follow-grace; mavlink-loss-no-rtl) plus driver + FSM integration.

AZ-668 (mapobjects_store persistence):
- Snapshot serializable shape + Store::{to_snapshot,from_snapshot}
  round trip.
- MapObjectsPersistence async trait + JsonSnapshotEngine default impl
  (write to .tmp, sync_all, atomic rename, best-effort parent fsync).
- PersistenceError::{Corrupt, SchemaMismatch} surfaces explicit errors
  on bad blob; PersistenceMetrics tracks last_snapshot_ts,
  snapshot_size_bytes, snapshot_errors_total.
- MapObjectsStore::from_snapshot factory for crash recovery from the
  composition root.
- Tests: 4/4 ACs locally green (round-trip; atomic rename ignores
  partial .tmp; crash recovery preserves pending; corruption returns
  explicit error) plus schema-mismatch + metrics smoke checks.

Quality gates:
- cargo fmt: clean.
- cargo clippy -p mission_executor -p mapobjects_store --tests: 0 warns.
- cargo test --workspace: all green.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-19 18:59:28 +03:00
parent 23366a5c6d
commit 2bcd4a8059
16 changed files with 1940 additions and 8 deletions
+5 -1
View File
@@ -9,7 +9,7 @@ authors.workspace = true
[dependencies]
shared = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
@@ -17,6 +17,10 @@ h3o = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
thiserror = { workspace = true }
async-trait = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }
# H3 spatial index lives in `internal::h3_index`. Engine plug points (Q3)
# materialise in AZ-668; ignored-suppression in AZ-666; hydrate / pending in AZ-667.
@@ -3,4 +3,6 @@
pub mod h3_index;
pub mod ignored;
pub mod passes;
pub mod persistence;
pub mod snapshot;
pub mod store;
@@ -0,0 +1,221 @@
//! AZ-668 — persistence trait + default JSON snapshot engine.
//!
//! Default engine per Q3: in-memory + atomic JSON snapshot. The trait
//! is kept narrow on purpose so a future SQLite+H3 / RocksDB engine
//! can swap in without touching call sites.
//!
//! Crash-safety: writes go to `${state_dir}/mapobjects/<mission_id>.json.tmp`,
//! are fsync'd, then atomically renamed onto the final path. The parent
//! directory is fsync'd after the rename so the rename itself survives
//! a power loss. Interrupted writes leave the `.tmp` file behind; the
//! next `load_snapshot` ignores it.
//!
//! Corruption surfaces as [`PersistenceError::Corrupt`]: the caller MUST
//! refuse to start with stale state and propagate the error to the
//! operator (AZ-668 AC-4). The engine does NOT silently fall back to
//! an empty store.
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use thiserror::Error;
use tokio::sync::Mutex as AsyncMutex;
use tokio::{fs, io::AsyncWriteExt};
use super::snapshot::Snapshot;
/// Errors surfaced by [`MapObjectsPersistence`].
#[derive(Debug, Error)]
pub enum PersistenceError {
#[error("persistence I/O error: {0}")]
Io(#[from] std::io::Error),
/// The snapshot file was present but unreadable. The caller MUST
/// refuse to start with stale state and surface the error to the
/// operator — never silently start empty (AZ-668 AC-4).
#[error("snapshot corrupt at {path}: {reason}")]
Corrupt { path: PathBuf, reason: String },
/// Schema version mismatch — the on-disk blob predates the running
/// binary. Treated as corruption (operator must reconcile).
#[error("snapshot schema mismatch at {path}: expected {expected}, found {found}")]
SchemaMismatch {
path: PathBuf,
expected: u32,
found: u32,
},
}
/// Engine-level metrics surfaced to the health aggregator.
/// Per AZ-668 §Outcome: `last_snapshot_ts`, `snapshot_size_bytes`,
/// `snapshot_errors_total`.
#[derive(Debug, Clone, Default)]
pub struct PersistenceMetrics {
pub last_snapshot_ts: Option<chrono::DateTime<chrono::Utc>>,
pub snapshot_size_bytes: Option<u64>,
pub snapshot_errors_total: u64,
}
/// Pluggable persistence backend. The default impl is the JSON
/// snapshot engine (below); future Q3 engines (SQLite+H3, RocksDB, …)
/// implement this trait without breaking call sites.
///
/// Methods are `async` because file I/O on the Jetson can stall while
/// the SD card is busy with detection-evidence writes; blocking the
/// runtime worker thread would starve `mavlink_layer`'s heartbeat
/// task. Implementations that do nothing async can delegate to
/// `tokio::task::spawn_blocking`.
#[async_trait]
pub trait MapObjectsPersistence: Send + Sync {
/// Atomically persist `snapshot` keyed by its `mission_id`.
/// Implementations MUST guarantee no partial writes are visible to
/// `load_snapshot` — typically by writing to a `.tmp` sibling then
/// renaming.
async fn save_snapshot(&self, snapshot: &Snapshot) -> Result<(), PersistenceError>;
/// Load the most recent snapshot for `mission_id`. Returns
/// `Ok(None)` if no snapshot exists; `Err(Corrupt)` on a present
/// but unreadable blob (the caller MUST refuse to start).
async fn load_snapshot(&self, mission_id: &str) -> Result<Option<Snapshot>, PersistenceError>;
/// Engine metrics for the health surface.
fn metrics(&self) -> PersistenceMetrics;
}
/// Default Q3 engine: one JSON file per mission, atomic-renamed on
/// each write.
///
/// Path layout: `${state_dir}/mapobjects/<mission_id>.json`. The
/// `mapobjects` subdirectory is created on first write.
pub struct JsonSnapshotEngine {
state_dir: PathBuf,
metrics: AsyncMutex<PersistenceMetrics>,
}
impl std::fmt::Debug for JsonSnapshotEngine {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JsonSnapshotEngine")
.field("state_dir", &self.state_dir)
.finish_non_exhaustive()
}
}
impl JsonSnapshotEngine {
/// Construct an engine rooted at `state_dir`. The directory does
/// not have to exist yet — it is created lazily on the first
/// successful `save_snapshot`.
pub fn new(state_dir: impl Into<PathBuf>) -> Self {
Self {
state_dir: state_dir.into(),
metrics: AsyncMutex::new(PersistenceMetrics::default()),
}
}
/// Resolve the canonical snapshot path for `mission_id`.
///
/// `mission_id` is treated as an opaque filename component. Callers
/// supply trusted ids from the central API; no path traversal
/// sanitisation is performed (the AZ-668 spec does not require it).
/// If untrusted ids ever flow in, add validation here.
pub fn snapshot_path(&self, mission_id: &str) -> PathBuf {
self.state_dir
.join("mapobjects")
.join(format!("{mission_id}.json"))
}
fn tmp_path(&self, mission_id: &str) -> PathBuf {
self.state_dir
.join("mapobjects")
.join(format!("{mission_id}.json.tmp"))
}
}
#[async_trait]
impl MapObjectsPersistence for JsonSnapshotEngine {
async fn save_snapshot(&self, snapshot: &Snapshot) -> Result<(), PersistenceError> {
let outcome = self.save_snapshot_inner(snapshot).await;
if outcome.is_err() {
let mut m = self.metrics.lock().await;
m.snapshot_errors_total = m.snapshot_errors_total.saturating_add(1);
}
outcome
}
async fn load_snapshot(&self, mission_id: &str) -> Result<Option<Snapshot>, PersistenceError> {
let path = self.snapshot_path(mission_id);
let outcome = self.load_snapshot_inner(&path).await;
if matches!(
outcome,
Err(PersistenceError::Corrupt { .. } | PersistenceError::SchemaMismatch { .. })
) {
let mut m = self.metrics.lock().await;
m.snapshot_errors_total = m.snapshot_errors_total.saturating_add(1);
}
outcome
}
fn metrics(&self) -> PersistenceMetrics {
// Cheap snapshot under a non-async borrow — `try_lock` keeps the
// health surface non-blocking; if the lock is contended we
// return zeros rather than parking the health caller.
self.metrics
.try_lock()
.map(|m| m.clone())
.unwrap_or_default()
}
}
impl JsonSnapshotEngine {
async fn save_snapshot_inner(&self, snapshot: &Snapshot) -> Result<(), PersistenceError> {
let path = self.snapshot_path(&snapshot.mission_id);
let tmp = self.tmp_path(&snapshot.mission_id);
let dir = path.parent().expect("snapshot path always has parent");
fs::create_dir_all(dir).await?;
let bytes = serde_json::to_vec(snapshot).map_err(|e| PersistenceError::Corrupt {
path: path.clone(),
reason: format!("serialize: {e}"),
})?;
let size = bytes.len() as u64;
{
let mut f = fs::File::create(&tmp).await?;
f.write_all(&bytes).await?;
f.sync_all().await?;
}
fs::rename(&tmp, &path).await?;
// Best-effort parent fsync so the rename survives a power
// loss. POSIX guarantees this is the durability anchor for
// directory operations; non-POSIX platforms ignore.
if let Ok(dir_handle) = std::fs::File::open(dir) {
let _ = dir_handle.sync_all();
}
let mut m = self.metrics.lock().await;
m.last_snapshot_ts = Some(chrono::Utc::now());
m.snapshot_size_bytes = Some(size);
Ok(())
}
async fn load_snapshot_inner(
&self,
path: &Path,
) -> Result<Option<Snapshot>, PersistenceError> {
let bytes = match fs::read(path).await {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(e.into()),
};
let snapshot: Snapshot =
serde_json::from_slice(&bytes).map_err(|e| PersistenceError::Corrupt {
path: path.to_path_buf(),
reason: format!("deserialize: {e}"),
})?;
if snapshot.schema_version != Snapshot::CURRENT_SCHEMA_VERSION {
return Err(PersistenceError::SchemaMismatch {
path: path.to_path_buf(),
expected: Snapshot::CURRENT_SCHEMA_VERSION,
found: snapshot.schema_version,
});
}
Ok(Some(snapshot))
}
}
@@ -0,0 +1,79 @@
//! AZ-668 — serializable snapshot of the in-memory MapObjects store.
//!
//! A `Snapshot` is the durable shape written to disk by
//! [`crate::JsonSnapshotEngine`] and round-tripped via
//! [`super::store::Store::to_snapshot`] /
//! [`super::store::Store::from_snapshot`].
//!
//! Schema versioning lives here so a future engine migration (e.g.
//! switching to SQLite+H3 per Q3) can bump the version and refuse to
//! load older blobs rather than silently importing them.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use shared::models::mapobject::{IgnoredItem, MapObjectObservation};
use uuid::Uuid;
use super::store::SyncState;
/// Stable, serializable shape of one stored map object. Mirrors the
/// fields the in-memory `StoredMapObject` carries minus the runtime
/// `h3o::CellIndex` (which is rebuilt from `gps_lat` / `gps_lon` on
/// load — the H3 resolution lives in `MapObjectsStoreConfig`, not the
/// snapshot, because changing resolution is a configuration choice
/// orthogonal to the snapshot blob).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SnapshotMapObject {
pub id: Uuid,
/// H3 cell at the resolution the snapshot was taken at. Stored for
/// audit / diagnostics; the `from_snapshot` path recomputes it from
/// `(gps_lat, gps_lon)` against the loading store's configured
/// resolution.
pub h3_cell: u64,
pub mgrs: String,
pub class: String,
pub class_group: String,
pub gps_lat: f64,
pub gps_lon: f64,
pub size_width_m: f32,
pub size_length_m: f32,
pub confidence: f32,
pub first_seen: DateTime<Utc>,
pub last_seen: DateTime<Utc>,
pub mission_id: String,
}
/// Durable on-disk state of a single mission. One file per mission per
/// `JsonSnapshotEngine::state_dir` — see AZ-668 §Outcome.
///
/// `PartialEq` is intentionally NOT derived — `IgnoredItem` and
/// `MapObjectObservation` are owned by the `shared` crate and do not
/// derive it. Tests compare snapshots via JSON-string round-trip,
/// which is the contract the persistence layer actually preserves.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
/// Bump on any breaking change to this struct.
pub schema_version: u32,
pub mission_id: String,
pub as_of: DateTime<Utc>,
#[serde(default)]
pub map_objects: Vec<SnapshotMapObject>,
#[serde(default)]
pub ignored_items: Vec<IgnoredItem>,
#[serde(default)]
pub pending_observations: Vec<MapObjectObservation>,
#[serde(default)]
pub pending_ignored: Vec<IgnoredItem>,
pub sync_state: SyncState,
#[serde(default)]
pub last_pull_ts: Option<DateTime<Utc>>,
#[serde(default)]
pub last_push_ts: Option<DateTime<Utc>>,
}
impl Snapshot {
/// Current schema version. Increment on any breaking change to the
/// serialized shape; older blobs then refuse to load with
/// [`crate::PersistenceError::Corrupt`].
pub const CURRENT_SCHEMA_VERSION: u32 = 1;
}
+80 -1
View File
@@ -14,6 +14,7 @@ use std::collections::HashMap;
use chrono::{DateTime, Utc};
use h3o::CellIndex;
use serde::{Deserialize, Serialize};
use shared::error::Result;
use shared::models::mapobject::{
BundleFreshness, DiffKind, IgnoredItem, IgnoredItemSource, MapObject, MapObjectObservation,
@@ -24,13 +25,15 @@ use uuid::Uuid;
use super::h3_index::{cell_of, grid_disk, haversine_m, DEFAULT_K_RING, DEFAULT_RESOLUTION};
use super::ignored::IgnoredSet;
use super::passes::{bbox_contains, PassTracker, RegionBbox};
use super::snapshot::{Snapshot, SnapshotMapObject};
/// Sync state machine surfaced to `scan_controller` + health aggregator.
///
/// See `_docs/02_document/components/mapobjects_store/description.md §3`.
/// `Failed` is the bounded-retries-exhausted terminal state for the
/// post-flight push (Frozen choice 7 / `description.md §7`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SyncState {
/// Initial state at process boot; no hydrate has run yet.
FreshBoot,
@@ -453,6 +456,82 @@ impl Store {
self.last_push_ts = Some(Utc::now());
}
/// Materialise the in-memory state into a serializable [`Snapshot`].
/// Open passes are intentionally NOT captured — they are transient
/// in-flight state and should restart after a process restart.
pub fn to_snapshot(&self, mission_id: String) -> Snapshot {
let map_objects: Vec<SnapshotMapObject> = self
.by_cell
.values()
.flatten()
.map(|o| SnapshotMapObject {
id: o.id,
h3_cell: u64::from(o.h3_cell),
mgrs: o.mgrs.clone(),
class: o.class.clone(),
class_group: o.class_group.clone(),
gps_lat: o.gps_lat,
gps_lon: o.gps_lon,
size_width_m: o.size_width_m,
size_length_m: o.size_length_m,
confidence: o.confidence,
first_seen: o.first_seen,
last_seen: o.last_seen,
mission_id: o.mission_id.clone(),
})
.collect();
let ignored_items: Vec<IgnoredItem> = self.ignored.items().cloned().collect();
Snapshot {
schema_version: Snapshot::CURRENT_SCHEMA_VERSION,
mission_id,
as_of: Utc::now(),
map_objects,
ignored_items,
pending_observations: self.pending_observations.clone(),
pending_ignored: self.pending_ignored.clone(),
sync_state: self.sync_state,
last_pull_ts: self.last_pull_ts,
last_push_ts: self.last_push_ts,
}
}
/// Rehydrate from a [`Snapshot`]. Re-keys map objects into their
/// canonical H3 buckets using the supplied config's resolution
/// (so a snapshot taken at one resolution can be loaded into a
/// store configured differently — the spatial buckets are rebuilt
/// either way).
pub fn from_snapshot(config: MapObjectsStoreConfig, snapshot: Snapshot) -> Result<Self> {
let mut store = Self::new(config);
for mo in snapshot.map_objects {
let cell = cell_of(mo.gps_lat, mo.gps_lon, store.config.h3_resolution)?;
store.by_cell.entry(cell).or_default().push(StoredMapObject {
id: mo.id,
h3_cell: cell,
mgrs: mo.mgrs,
class: mo.class,
class_group: mo.class_group,
gps_lat: mo.gps_lat,
gps_lon: mo.gps_lon,
size_width_m: mo.size_width_m,
size_length_m: mo.size_length_m,
confidence: mo.confidence,
first_seen: mo.first_seen,
last_seen: mo.last_seen,
mission_id: mo.mission_id,
});
store.len += 1;
}
for item in snapshot.ignored_items {
store.ignored.append(item);
}
store.pending_observations = snapshot.pending_observations;
store.pending_ignored = snapshot.pending_ignored;
store.sync_state = snapshot.sync_state;
store.last_pull_ts = snapshot.last_pull_ts;
store.last_push_ts = snapshot.last_push_ts;
Ok(store)
}
/// Resolve a raw class string to its canonical group key.
///
/// The first class listed in a `similar_classes` group is the group
+26
View File
@@ -30,6 +30,10 @@ mod internal;
const NAME: &str = "mapobjects_store";
pub use internal::passes::RegionBbox;
pub use internal::persistence::{
JsonSnapshotEngine, MapObjectsPersistence, PersistenceError, PersistenceMetrics,
};
pub use internal::snapshot::{Snapshot, SnapshotMapObject};
pub use internal::store::{
Classification, ClassifyInput, MapObjectsStoreConfig, RemovedCandidate, SyncState,
};
@@ -47,6 +51,16 @@ impl MapObjectsStore {
}
}
/// Construct a store from a previously-captured [`Snapshot`].
/// Used at startup by the composition root for crash recovery
/// (AZ-668 AC-3).
pub fn from_snapshot(config: MapObjectsStoreConfig, snapshot: Snapshot) -> Result<Self> {
let store = internal::store::Store::from_snapshot(config, snapshot)?;
Ok(Self {
inner: Arc::new(Mutex::new(store)),
})
}
pub fn handle(&self) -> MapObjectsStoreHandle {
MapObjectsStoreHandle {
inner: self.inner.clone(),
@@ -249,6 +263,18 @@ impl MapObjectsStoreHandle {
Ok(guard.last_push_ts())
}
/// Capture the current in-memory state as a serializable
/// [`Snapshot`]. The caller hands this to a
/// [`MapObjectsPersistence`] implementation (e.g.
/// [`JsonSnapshotEngine`]) to persist it.
pub fn to_snapshot(&self, mission_id: impl Into<String>) -> Result<Snapshot> {
let guard = self
.inner
.lock()
.map_err(|_| AutopilotError::Internal("mapobjects_store mutex poisoned".into()))?;
Ok(guard.to_snapshot(mission_id.into()))
}
/// Record a successful post-flight push: sets sync_state to
/// `Synced` and stores the wallclock as `last_push_ts`.
pub fn mark_pushed_ok(&self) -> Result<()> {
@@ -0,0 +1,308 @@
//! AZ-668 acceptance criteria — in-memory + JSON snapshot persistence.
//!
//! Covers:
//! - AC-1 snapshot + reload round-trip
//! - AC-2 atomic rename prevents partial writes
//! - AC-3 crash recovery loads pending
//! - AC-4 corruption returns explicit error (never silently empty)
//!
//! Plus a metrics smoke-check (`last_snapshot_ts`,
//! `snapshot_size_bytes`, `snapshot_errors_total`) since the AC requires
//! those three to be surfaced.
use std::path::PathBuf;
use chrono::Utc;
use mapobjects_store::{
ClassifyInput, JsonSnapshotEngine, MapObjectsPersistence, MapObjectsStore,
MapObjectsStoreConfig, PersistenceError,
};
use shared::models::mapobject::{IgnoredItem, IgnoredItemSource, RetentionScope};
use tempfile::TempDir;
use uuid::Uuid;
fn input(lat: f64, lon: f64, class: &str, mission_id: &str) -> ClassifyInput {
ClassifyInput {
gps_lat: lat,
gps_lon: lon,
mgrs: format!("MGRS({lat},{lon})"),
class: class.into(),
size_width_m: 1.0,
size_length_m: 1.0,
confidence: 0.9,
mission_id: mission_id.into(),
observed_at: Utc::now(),
uav_id: "uav1".into(),
observed_at_monotonic_ns: 0,
}
}
fn ignored_item(mgrs: &str, class_group: &str, mission_id: &str) -> IgnoredItem {
IgnoredItem {
id: Uuid::new_v4(),
mgrs: mgrs.into(),
h3_cell: 0,
class_group: class_group.into(),
decline_time: Utc::now(),
operator_id: Some("op-A".into()),
mission_id: mission_id.into(),
retention_scope: RetentionScope::Mission,
expires_at: None,
source: IgnoredItemSource::LocalAppended,
pending_upload: true,
}
}
/// AC-1 — snapshot + reload round-trip preserves indexed objects,
/// ignored items, and pending observations.
#[tokio::test]
async fn ac1_snapshot_reload_round_trip() {
// Arrange — store with 100 MapObjects across a square of latitudes,
// 10 IgnoredItems, and 5 pending observations (the latter come "for
// free" from the first 5 classify calls).
let tmp = TempDir::new().unwrap();
let mission_id = "ac1-mission";
let engine = JsonSnapshotEngine::new(tmp.path());
let store = MapObjectsStore::new(MapObjectsStoreConfig::default());
let h = store.handle();
for i in 0..100 {
let lat = 50.45 + (i as f64) * 0.001;
let lon = 30.52 + (i as f64) * 0.001;
h.classify(input(lat, lon, "tank", mission_id)).unwrap();
}
for i in 0..10 {
h.append_ignored(ignored_item(
&format!("MGRS-{i}"),
"concealed_position",
mission_id,
))
.unwrap();
}
assert_eq!(h.len().unwrap(), 100);
// Act — capture, save, then load into a brand-new store
let snap = h.to_snapshot(mission_id).unwrap();
engine.save_snapshot(&snap).await.unwrap();
let loaded = engine
.load_snapshot(mission_id)
.await
.expect("load ok")
.expect("file present");
let restored = MapObjectsStore::from_snapshot(MapObjectsStoreConfig::default(), loaded).unwrap();
let rh = restored.handle();
// Assert — counts match and pending log survived
assert_eq!(rh.len().unwrap(), 100);
assert_eq!(rh.pending_observations_count().unwrap(), 100);
// The 10 LocalAppended IgnoredItems went into pending_ignored too.
assert_eq!(rh.pending_ignored_count().unwrap(), 10);
// Verify the ignored-set survived the round trip with a probe.
assert!(rh.is_ignored("MGRS-0", "concealed_position").unwrap());
assert!(rh.is_ignored("MGRS-9", "concealed_position").unwrap());
assert!(!rh.is_ignored("MGRS-42", "concealed_position").unwrap());
}
/// AC-2 — atomic rename prevents partial writes.
///
/// We simulate a kill-9 mid-write by creating a leftover `.tmp` file
/// alongside a valid `.json` snapshot. The engine must still load the
/// good snapshot (NOT the partial `.tmp`).
#[tokio::test]
async fn ac2_atomic_rename_ignores_partial_tmp_file() {
// Arrange — write a real snapshot, then poison its sibling `.tmp`
let tmp = TempDir::new().unwrap();
let mission_id = "ac2-mission";
let engine = JsonSnapshotEngine::new(tmp.path());
let store = MapObjectsStore::new(MapObjectsStoreConfig::default());
let h = store.handle();
h.classify(input(50.45, 30.52, "tank", mission_id)).unwrap();
let snap = h.to_snapshot(mission_id).unwrap();
engine.save_snapshot(&snap).await.unwrap();
// Poison: write a half-finished blob to the .tmp sibling
let tmp_path: PathBuf = tmp
.path()
.join("mapobjects")
.join(format!("{mission_id}.json.tmp"));
tokio::fs::write(&tmp_path, b"{\"partial\":")
.await
.expect("write poisoned tmp");
assert!(tmp_path.exists(), "partial .tmp file should exist");
// Act — fresh engine loads from the same dir
let engine2 = JsonSnapshotEngine::new(tmp.path());
let loaded = engine2
.load_snapshot(mission_id)
.await
.expect("load ok")
.expect("good snapshot present");
// Assert — got the good snapshot, ignoring the partial .tmp
assert_eq!(loaded.mission_id, mission_id);
assert_eq!(loaded.map_objects.len(), 1);
// .tmp file is still on disk — the loader never touches it.
assert!(tmp_path.exists());
}
/// AC-3 — crash recovery loads pending observations.
#[tokio::test]
async fn ac3_crash_recovery_loads_pending() {
// Arrange — first process: classify, save
let tmp = TempDir::new().unwrap();
let mission_id = "ac3-mission";
let engine = JsonSnapshotEngine::new(tmp.path());
let store = MapObjectsStore::new(MapObjectsStoreConfig::default());
let h = store.handle();
for i in 0..7 {
let lat = 50.45 + (i as f64) * 0.001;
h.classify(input(lat, 30.52, "tank", mission_id)).unwrap();
}
let pre_crash_count = h.pending_observations_count().unwrap();
assert_eq!(pre_crash_count, 7);
engine
.save_snapshot(&h.to_snapshot(mission_id).unwrap())
.await
.unwrap();
drop(store); // simulate process death
// Act — second process: fresh engine, load
let engine2 = JsonSnapshotEngine::new(tmp.path());
let snap = engine2
.load_snapshot(mission_id)
.await
.unwrap()
.expect("snapshot present");
let recovered =
MapObjectsStore::from_snapshot(MapObjectsStoreConfig::default(), snap).unwrap();
// Assert — pending log matches pre-crash count
assert_eq!(
recovered.handle().pending_observations_count().unwrap(),
pre_crash_count
);
}
/// AC-4 — corruption surfaces an explicit error; metrics increment.
#[tokio::test]
async fn ac4_corruption_returns_explicit_error() {
// Arrange — write a known-truncated blob into the snapshot path
let tmp = TempDir::new().unwrap();
let mission_id = "ac4-mission";
let engine = JsonSnapshotEngine::new(tmp.path());
let dir = tmp.path().join("mapobjects");
tokio::fs::create_dir_all(&dir).await.unwrap();
let path = dir.join(format!("{mission_id}.json"));
// Truncated JSON: opening brace + half a key, no closing brace.
tokio::fs::write(&path, b"{\"schema_version\":1,\"mission_id\":\"trunc")
.await
.unwrap();
// Act
let result = engine.load_snapshot(mission_id).await;
// Assert — explicit Corrupt error; the store does NOT silently
// come up empty (caller surfaces to operator and refuses to start)
match result {
Err(PersistenceError::Corrupt { path: p, reason }) => {
assert_eq!(p, path);
assert!(reason.contains("deserialize"));
}
other => panic!("expected Corrupt, got {other:?}"),
}
// snapshot_errors_total incremented
let m = engine.metrics();
assert!(m.snapshot_errors_total >= 1);
}
/// Schema-mismatch is also treated as corruption — a future engine
/// version bump on disk must not be silently accepted by the running
/// binary.
#[tokio::test]
async fn schema_mismatch_returns_explicit_error() {
// Arrange — write a valid-shape JSON but with a future schema_version
let tmp = TempDir::new().unwrap();
let mission_id = "schema-mismatch-mission";
let engine = JsonSnapshotEngine::new(tmp.path());
let dir = tmp.path().join("mapobjects");
tokio::fs::create_dir_all(&dir).await.unwrap();
let path = dir.join(format!("{mission_id}.json"));
tokio::fs::write(
&path,
br#"{
"schema_version": 999,
"mission_id": "schema-mismatch-mission",
"as_of": "2026-01-01T00:00:00Z",
"map_objects": [],
"ignored_items": [],
"pending_observations": [],
"pending_ignored": [],
"sync_state": "fresh_boot"
}"#,
)
.await
.unwrap();
// Act
let result = engine.load_snapshot(mission_id).await;
// Assert
match result {
Err(PersistenceError::SchemaMismatch {
expected, found, ..
}) => {
assert_eq!(expected, 1);
assert_eq!(found, 999);
}
other => panic!("expected SchemaMismatch, got {other:?}"),
}
}
/// Metrics smoke-check — `last_snapshot_ts` + `snapshot_size_bytes`
/// populated after a successful save.
#[tokio::test]
async fn metrics_populated_after_successful_save() {
// Arrange
let tmp = TempDir::new().unwrap();
let engine = JsonSnapshotEngine::new(tmp.path());
let store = MapObjectsStore::new(MapObjectsStoreConfig::default());
let h = store.handle();
h.classify(input(50.45, 30.52, "tank", "metrics-mission"))
.unwrap();
// Pre-save metrics empty
let pre = engine.metrics();
assert!(pre.last_snapshot_ts.is_none());
assert!(pre.snapshot_size_bytes.is_none());
assert_eq!(pre.snapshot_errors_total, 0);
// Act
let snap = h.to_snapshot("metrics-mission").unwrap();
engine.save_snapshot(&snap).await.unwrap();
// Assert
let post = engine.metrics();
assert!(post.last_snapshot_ts.is_some());
let size = post.snapshot_size_bytes.expect("size recorded");
assert!(size > 0);
assert_eq!(post.snapshot_errors_total, 0);
}
/// `load_snapshot` for an unknown mission returns `Ok(None)` (not
/// `Err`). This is the "first boot, no prior state" case.
#[tokio::test]
async fn load_missing_returns_none() {
// Arrange
let tmp = TempDir::new().unwrap();
let engine = JsonSnapshotEngine::new(tmp.path());
// Act
let result = engine.load_snapshot("never-saved").await.unwrap();
// Assert
assert!(result.is_none());
}