Oleksandr Bezdieniezhnykh 0a87c0f716
ci/woodpecker/push/build-arm Pipeline failed
[AZ-645] [AZ-646] [AZ-647] mission_client: middle-waypoint POST + mapobjects pull/push
Batch 3 of greenfield Step 7 — mission_client epic AZ-638 close-out.

AZ-645 (Middle-waypoint POST)
- post_middle_waypoint(mission_id, &Mission) -> Result<MissionUpdateAck, PostError>
- Bounded retry (default 3 attempts) shared with the rest of missions_api
- Health: last_middle_waypoint_post_status (ok/error)

AZ-646 (Pre-flight MapObjects pull)
- pull_mapobjects(mission_id) -> Result<MapObjectsBundle, PullError>
- Schema-validated against bundled shared/contracts/mapobjects-bundle.json
- Typed errors: Unreachable / SchemaInvalid / MaxRetriesExceeded / Internal
- Health: mapobjects_pull_state, last_mapobjects_pull_ts

AZ-647 (Post-flight push + durable disk queue)
- push_mapobjects_diff(mission_id, MapObjectsDiff) -> PushReport
- recover_pending_pushes() -> Vec<PushReport> for crash recovery
- Write-ahead atomic-rename persistence under ${state_dir}/mapobjects_push/
- Per-endpoint independent retry: observations + ignored_items
- Partial success rewrites the disk file with only the failing portion
- Health: mapobjects_push_pending, last_push_ts, per-endpoint last error
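
The write-ahead atomic-rename persistence listed under AZ-647 can be illustrated with a minimal, std-only sketch. This is not the crate's actual queue code: `write_atomic` and the `.tmp` suffix are hypothetical names chosen for illustration. The idea is that a crash leaves either the old file or the new one on disk, never a half-written one, because the bytes are synced to a sibling temp file first and then renamed over the target.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: durably replace `final_path` with `bytes`.
/// The temp file lives in the same directory so the rename stays on one
/// filesystem, which is what makes the replacement atomic on POSIX systems.
fn write_atomic(final_path: &Path, bytes: &[u8]) -> io::Result<()> {
    if let Some(dir) = final_path.parent() {
        fs::create_dir_all(dir)?;
    }
    // Assumed naming scheme: same stem, `.tmp` extension.
    let tmp_path = final_path.with_extension("tmp");
    {
        let mut f = File::create(&tmp_path)?;
        f.write_all(bytes)?;
        // Flush file contents and metadata to disk before the rename.
        f.sync_all()?;
    }
    // Swap the fully written temp file into place.
    fs::rename(&tmp_path, final_path)?;
    Ok(())
}
```

Under the same assumptions, a partial success could call `write_atomic` again with only the still-failing portion, which matches the "rewrites the disk file" bullet above. A production queue would likely also fsync the parent directory after the rename; that step is omitted here.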

Infrastructure
- Schemas: shared/contracts/mapobjects-{bundle,observations,ignored}.json
- Restructured schema/ into mission.rs + mapobjects.rs sub-modules
- New mapobjects_sync/ (pull, push, queue)
- workspace dep tempfile=3; mission_client dev-deps add tempfile + chrono

Tests
- 13/13 ACs verified locally (4 AZ-645 + 4 AZ-646 + 5 AZ-647)
- mission_client suite: 15 unit + 18 integration = 33 tests pass
- AZ-646 AC-4 proxy: 1000-object + 1000-ignored bundle within 30s
- AZ-647 AC-5 proxy: 5000-obs + 500-ignored push within 2min

Code review verdict: PASS_WITH_WARNINGS (inline). Cumulative review
(K=3 trigger) PASS_WITH_WARNINGS — full report in
_docs/03_implementation/cumulative_review_batches_01-03_cycle1_report.md.

Open follow-ups (non-blocking):
- module-layout.md: rename push_mapobjects -> push_mapobjects_diff (Step 13)
- ExponentialBackoff still duplicated across crates; promote to shared::retry
  when the third caller lands (likely detection_client AZ-660/661)
- state_dir default is relative; composition root must override
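
For readers unfamiliar with the bounded retry and `ExponentialBackoff` mentioned above, here is a rough std-only sketch of capped exponential backoff. It is not the duplicated `ExponentialBackoff` type itself, whose exact behaviour is not shown in this commit: `backoff_delay` and `retry_bounded` are hypothetical names, and doubling from a base with a hard cap is an assumption modelled on the `backoff_base` / `backoff_cap` / `max_attempts` options.

```rust
use std::time::Duration;

/// Hypothetical: delay before retry number `retry` (0-based), doubling from
/// `base` and clamped to `cap`. Overflow saturates to `cap`.
fn backoff_delay(base: Duration, cap: Duration, retry: u32) -> Duration {
    let factor = 2u32.checked_pow(retry).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(cap).min(cap)
}

/// Hypothetical: run `op` until it succeeds or `max_attempts` is reached.
/// `op` always runs at least once. On exhaustion the last error is returned
/// together with the number of attempts made.
fn retry_bounded<T, E>(
    max_attempts: u32,
    base: Duration,
    cap: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, (u32, E)> {
    let mut attempt = 0;
    loop {
        attempt += 1;
        match op(attempt) {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err((attempt, e)),
            // Sleep between attempts only; no sleep after the final failure.
            Err(_) => std::thread::sleep(backoff_delay(base, cap, attempt - 1)),
        }
    }
}
```

With a 10 ms base and a 50 ms cap, this sketch yields delays of 10, 20, 40, 50, 50, ... ms. An async client would use a non-blocking sleep instead of `std::thread::sleep`.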

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-19 12:54:15 +03:00

218 lines
7.7 KiB
Rust
//! AZ-646 integration tests driven by `wiremock`.
//!
//! Coverage:
//! - AC-1: happy-path GET returns `Ok(MapObjectsBundle)`; health
//!   `mapobjects_pull_state` is `synced`.
//! - AC-2: schema-invalid bundle (missing required field) returns
//!   `Err(SchemaInvalid)`; no silent acceptance.
//! - AC-3: unreachable server (a port that refuses connections) returns
//!   `Err(Unreachable)` or `Err(MaxRetriesExceeded)`; health goes Red.
//! - AC-4: a fixture sized for a 30 km × 30 km mission area validates and
//!   deserialises within the 30 s budget on loopback. We use 1 000 objects +
//!   1 000 ignored items as the proxy: a real 30×30 km mission is bounded by
//!   this order of magnitude per the AZ-666 ignored-cap NFR.

use std::time::{Duration, Instant};

use chrono::{TimeZone, Utc};
use mission_client::{MissionClient, MissionClientOptions, PullError};
use shared::health::HealthLevel;
use uuid::Uuid;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

fn options_for(
    endpoint: &str,
    max_attempts: u32,
    tmp_dir: &std::path::Path,
) -> MissionClientOptions {
    let mut o = MissionClientOptions::new(endpoint);
    o.max_attempts = max_attempts;
    o.backoff_base = Duration::from_millis(10);
    o.backoff_cap = Duration::from_millis(50);
    o.request_timeout = Duration::from_secs(2);
    o.connect_timeout = Duration::from_secs(1);
    o.state_dir = tmp_dir.to_path_buf();
    o
}

fn good_bundle(mission_id: &str, objects: usize, ignored: usize) -> serde_json::Value {
    let map_objects: Vec<_> = (0..objects)
        .map(|i| {
            serde_json::json!({
                "h3_cell": 10_000 + i as u64,
                "mgrs_key": format!("MGRS-{i}"),
                "class": "tank",
                "class_group": "armor",
                "gps_lat": 49.0 + (i as f64) * 0.001,
                "gps_lon": 31.0 + (i as f64) * 0.001,
                "size_width_m": 3.5,
                "size_length_m": 7.0,
                "confidence": 0.85,
                "first_seen": "2026-05-19T11:00:00Z",
                "last_seen": "2026-05-19T11:30:00Z",
                "mission_id": mission_id,
                "source": "central_pulled",
                "pending_upload": false
            })
        })
        .collect();
    let ignored_items: Vec<_> = (0..ignored)
        .map(|i| {
            serde_json::json!({
                "id": Uuid::from_u128(0xdead_0000 + i as u128).to_string(),
                "mgrs": format!("MGRS-IG-{i}"),
                "h3_cell": 99_000 + i as u64,
                "class_group": "civilian",
                "decline_time": "2026-05-19T11:15:00Z",
                "mission_id": mission_id,
                "retention_scope": "mission",
                "source": "central_pulled",
                "pending_upload": false
            })
        })
        .collect();
    serde_json::json!({
        "schema_version": "1.0.0",
        "mission_id": mission_id,
        "bbox": [
            { "latitude": 49.5, "longitude": 31.0, "altitude_m": 0.0 },
            { "latitude": 49.0, "longitude": 31.5, "altitude_m": 0.0 }
        ],
        "map_objects": map_objects,
        "observations": [],
        "ignored_items": ignored_items,
        "as_of": "2026-05-19T12:00:00Z",
        "freshness": "fresh"
    })
}

#[tokio::test]
async fn ac1_happy_path_pull() {
    // Arrange
    let tmp = tempfile::tempdir().unwrap();
    let mock = MockServer::start().await;
    let mission_id = "M-mo-1";
    Mock::given(method("GET"))
        .and(path(format!("/missions/{mission_id}/mapobjects")))
        .respond_with(
            ResponseTemplate::new(200).set_body_string(good_bundle(mission_id, 3, 1).to_string()),
        )
        .expect(1)
        .mount(&mock)
        .await;
    let client =
        MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds");
    let h = client.handle();

    // Act
    let bundle = h.pull_mapobjects(mission_id).await.expect("happy pull");

    // Assert
    assert_eq!(bundle.mission_id, mission_id);
    assert_eq!(bundle.map_objects.len(), 3);
    assert_eq!(bundle.ignored_items.len(), 1);
    let detail = h.health().detail.unwrap_or_default();
    assert!(
        detail.contains("mapobjects_pull_state=synced"),
        "health detail did not record synced: {detail}"
    );
}

#[tokio::test]
async fn ac2_schema_invalid_is_rejected() {
    // Arrange: 200 OK but the bundle is missing the required `mission_id`.
    let tmp = tempfile::tempdir().unwrap();
    let mock = MockServer::start().await;
    let mut bad = good_bundle("M-bad", 1, 0);
    let obj = bad.as_object_mut().unwrap();
    obj.remove("mission_id");
    Mock::given(method("GET"))
        .and(path("/missions/M-bad/mapobjects"))
        .respond_with(ResponseTemplate::new(200).set_body_string(bad.to_string()))
        .mount(&mock)
        .await;
    let client =
        MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds");
    let h = client.handle();

    // Act
    let err = h.pull_mapobjects("M-bad").await.unwrap_err();

    // Assert
    match err {
        PullError::SchemaInvalid { messages, sample } => {
            assert!(messages.iter().any(|m| m.contains("mission_id")));
            assert!(!sample.is_empty());
        }
        other => panic!("expected SchemaInvalid, got {other:?}"),
    }
    assert_eq!(h.health().level, HealthLevel::Red);
}

#[tokio::test]
async fn ac3_unreachable_surfaces_failure() {
    // Arrange: a port the OS will refuse to connect to.
    let tmp = tempfile::tempdir().unwrap();
    // Bind a TcpListener to discover a free port, then immediately drop it so
    // connect() refuses.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    drop(listener);
    let endpoint = format!("http://{addr}");
    let client = MissionClient::new(options_for(&endpoint, 2, tmp.path())).expect("client builds");
    let h = client.handle();

    // Act
    let err = h.pull_mapobjects("M-unreach").await.unwrap_err();

    // Assert
    match err {
        PullError::Unreachable(reason) => {
            assert!(!reason.is_empty(), "Unreachable reason should not be empty");
        }
        PullError::MaxRetriesExceeded { attempts, .. } => {
            assert_eq!(attempts, 2);
        }
        other => panic!("expected Unreachable or MaxRetriesExceeded, got {other:?}"),
    }
    assert_eq!(h.health().level, HealthLevel::Red);
}

#[tokio::test]
async fn ac4_large_bundle_within_budget() {
    // Arrange: 1 000 map objects + 1 000 ignored items as the proxy for a
    // 30 km × 30 km mission area on loopback.
    let tmp = tempfile::tempdir().unwrap();
    let mock = MockServer::start().await;
    let mission_id = "M-large";
    Mock::given(method("GET"))
        .and(path(format!("/missions/{mission_id}/mapobjects")))
        .respond_with(
            ResponseTemplate::new(200)
                .set_body_string(good_bundle(mission_id, 1_000, 1_000).to_string()),
        )
        .mount(&mock)
        .await;
    let client =
        MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds");
    let h = client.handle();

    // Act
    let started = Instant::now();
    let bundle = h.pull_mapobjects(mission_id).await.expect("happy pull");
    let elapsed = started.elapsed();

    // Assert
    assert_eq!(bundle.map_objects.len(), 1000);
    assert_eq!(bundle.ignored_items.len(), 1000);
    assert!(
        elapsed < Duration::from_secs(30),
        "pull took {elapsed:?}; budget is 30 s"
    );
    // Sanity-touch chrono so the `chrono` import does not trigger the
    // `unused_imports` lint; the wallclock dates inside the bundle are
    // validated by the bundle schema's `format: date-time` constraint.
    let _ = Utc.with_ymd_and_hms(2026, 5, 19, 12, 0, 0).unwrap();
}