From 0a87c0f71630769a9798ad9be4345029372e64de Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Tue, 19 May 2026 12:54:15 +0300 Subject: [PATCH] [AZ-645] [AZ-646] [AZ-647] mission_client: middle-waypoint POST + mapobjects pull/push MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Batch 3 of greenfield Step 7 — mission_client epic AZ-638 close-out. AZ-645 (Middle-waypoint POST) - post_middle_waypoint(mission_id, &Mission) -> Result - Bounded retry (default 3 attempts) shared with the rest of missions_api - Health: last_middle_waypoint_post_status (ok/error) AZ-646 (Pre-flight MapObjects pull) - pull_mapobjects(mission_id) -> Result - Schema-validated against bundled shared/contracts/mapobjects-bundle.json - Typed errors: Unreachable / SchemaInvalid / MaxRetriesExceeded / Internal - Health: mapobjects_pull_state, last_mapobjects_pull_ts AZ-647 (Post-flight push + durable disk queue) - push_mapobjects_diff(mission_id, MapObjectsDiff) -> PushReport - recover_pending_pushes() -> Vec for crash recovery - Write-ahead atomic-rename persistence under ${state_dir}/mapobjects_push/ - Per-endpoint independent retry: observations + ignored_items - Partial success rewrites the disk file with only the failing portion - Health: mapobjects_push_pending, last_push_ts, per-endpoint last error Infrastructure - Schemas: shared/contracts/mapobjects-{bundle,observations,ignored}.json - Restructured schema/ into mission.rs + mapobjects.rs sub-modules - New mapobjects_sync/ (pull, push, queue) - workspace dep tempfile=3; mission_client dev-deps add tempfile + chrono Tests - 12/12 ACs verified locally (3 AZ-645 + 4 AZ-646 + 5 AZ-647) - mission_client suite: 15 unit + 18 integration = 33 tests pass - AZ-646 AC-4 proxy: 1000-object + 1000-ignored bundle within 30s - AZ-647 AC-5 proxy: 5000-obs + 500-ignored push within 2min Code review verdict: PASS_WITH_WARNINGS (inline). 
Cumulative review (K=3 trigger) PASS_WITH_WARNINGS — full report in _docs/03_implementation/cumulative_review_batches_01-03_cycle1_report.md. Open follow-ups (non-blocking): - module-layout.md: rename push_mapobjects -> push_mapobjects_diff (Step 13) - ExponentialBackoff still duplicated across crates; promote to shared::retry when the third caller lands (likely detection_client AZ-660/661) - state_dir default is relative; composition root must override Co-authored-by: Cursor --- Cargo.lock | 40 ++ Cargo.toml | 1 + .../AZ-645_mission_client_waypoint_post.md | 0 .../AZ-646_mission_client_mapobjects_pull.md | 0 .../AZ-647_mission_client_mapobjects_push.md | 0 .../batch_03_cycle1_report.md | 174 ++++++ ...tive_review_batches_01-03_cycle1_report.md | 123 ++++ _docs/_autodev_state.md | 6 +- crates/mission_client/Cargo.toml | 2 + .../src/internal/mapobjects_sync/mod.rs | 10 + .../src/internal/mapobjects_sync/pull.rs | 45 ++ .../src/internal/mapobjects_sync/push.rs | 216 +++++++ .../src/internal/mapobjects_sync/queue.rs | 242 ++++++++ .../src/internal/missions_api/mod.rs | 300 ++++++--- crates/mission_client/src/internal/mod.rs | 1 + .../src/internal/schema/mapobjects.rs | 193 ++++++ .../src/internal/schema/mission.rs | 116 ++++ .../mission_client/src/internal/schema/mod.rs | 123 +--- crates/mission_client/src/lib.rs | 568 ++++++++++++++++-- .../mission_client/tests/mapobjects_pull.rs | 217 +++++++ .../mission_client/tests/mapobjects_push.rs | 344 +++++++++++ crates/mission_client/tests/waypoint_post.rs | 196 ++++++ .../shared/contracts/mapobjects-bundle.json | 133 ++++ .../shared/contracts/mapobjects-ignored.json | 43 ++ .../contracts/mapobjects-observations.json | 51 ++ 25 files changed, 2911 insertions(+), 233 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-645_mission_client_waypoint_post.md (100%) rename _docs/02_tasks/{todo => done}/AZ-646_mission_client_mapobjects_pull.md (100%) rename _docs/02_tasks/{todo => done}/AZ-647_mission_client_mapobjects_push.md 
(100%) create mode 100644 _docs/03_implementation/batch_03_cycle1_report.md create mode 100644 _docs/03_implementation/cumulative_review_batches_01-03_cycle1_report.md create mode 100644 crates/mission_client/src/internal/mapobjects_sync/mod.rs create mode 100644 crates/mission_client/src/internal/mapobjects_sync/pull.rs create mode 100644 crates/mission_client/src/internal/mapobjects_sync/push.rs create mode 100644 crates/mission_client/src/internal/mapobjects_sync/queue.rs create mode 100644 crates/mission_client/src/internal/schema/mapobjects.rs create mode 100644 crates/mission_client/src/internal/schema/mission.rs create mode 100644 crates/mission_client/tests/mapobjects_pull.rs create mode 100644 crates/mission_client/tests/mapobjects_push.rs create mode 100644 crates/mission_client/tests/waypoint_post.rs create mode 100644 crates/shared/contracts/mapobjects-bundle.json create mode 100644 crates/shared/contracts/mapobjects-ignored.json create mode 100644 crates/shared/contracts/mapobjects-observations.json diff --git a/Cargo.lock b/Cargo.lock index bee1cda..98df14b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -470,6 +470,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1056,6 +1062,12 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.2" @@ -1182,11 +1194,13 @@ dependencies = [ name = "mission_client" version = "0.1.0" dependencies = [ + "chrono", 
"jsonschema", "reqwest", "serde", "serde_json", "shared", + "tempfile", "thiserror 1.0.69", "tokio", "tracing", @@ -1655,6 +1669,19 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags 2.11.1", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.40" @@ -1970,6 +1997,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.4.2", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "thiserror" version = "1.0.69" diff --git a/Cargo.toml b/Cargo.toml index 0d5af3a..48abc18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ tokio-serial = "5" # Test scaffolding wiremock = "0.6" +tempfile = "3" # Workspace-internal shared = { path = "crates/shared" } diff --git a/_docs/02_tasks/todo/AZ-645_mission_client_waypoint_post.md b/_docs/02_tasks/done/AZ-645_mission_client_waypoint_post.md similarity index 100% rename from _docs/02_tasks/todo/AZ-645_mission_client_waypoint_post.md rename to _docs/02_tasks/done/AZ-645_mission_client_waypoint_post.md diff --git a/_docs/02_tasks/todo/AZ-646_mission_client_mapobjects_pull.md b/_docs/02_tasks/done/AZ-646_mission_client_mapobjects_pull.md similarity index 100% rename from _docs/02_tasks/todo/AZ-646_mission_client_mapobjects_pull.md rename to _docs/02_tasks/done/AZ-646_mission_client_mapobjects_pull.md diff --git a/_docs/02_tasks/todo/AZ-647_mission_client_mapobjects_push.md 
b/_docs/02_tasks/done/AZ-647_mission_client_mapobjects_push.md similarity index 100% rename from _docs/02_tasks/todo/AZ-647_mission_client_mapobjects_push.md rename to _docs/02_tasks/done/AZ-647_mission_client_mapobjects_push.md diff --git a/_docs/03_implementation/batch_03_cycle1_report.md b/_docs/03_implementation/batch_03_cycle1_report.md new file mode 100644 index 0000000..8b9da5d --- /dev/null +++ b/_docs/03_implementation/batch_03_cycle1_report.md @@ -0,0 +1,174 @@ +# Batch Report + +**Batch**: 3 +**Tasks**: AZ-645 `mission_client_waypoint_post`, AZ-646 `mission_client_mapobjects_pull`, AZ-647 `mission_client_mapobjects_push` +**Date**: 2026-05-19 +**Cycle**: 1 +**Selection context**: Product implementation +**Implementer**: autodev / `.cursor/skills/implement/SKILL.md` + +## Task Results + +| Task | Status | Files Modified | Tests | AC Coverage | Issues | +|--------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|-------------|--------| +| AZ-645 | Done | `crates/mission_client/src/{lib,internal/missions_api/mod}.rs`, integration test `tests/waypoint_post.rs` | pass (4 integration) | 3/3 verified locally + 1 extra coverage test | 0 blocking | +| AZ-646 | Done | `crates/mission_client/src/{lib,internal/mapobjects_sync/{mod,pull},internal/schema/{mod,mission,mapobjects}}.rs`, schema `crates/shared/contracts/mapobjects-bundle.json`, integration test `tests/mapobjects_pull.rs` | pass (4 integration) | 4/4 verified locally (AC-4 with 1k-object proxy) | 0 blocking | +| AZ-647 | Done | `crates/mission_client/src/{lib,internal/mapobjects_sync/{push,queue}}.rs`, schemas `crates/shared/contracts/{mapobjects-observations,mapobjects-ignored}.json`, 
workspace `Cargo.toml` (+`tempfile`), `crates/mission_client/Cargo.toml` (+`tempfile`, `chrono` dev-deps), integration test `tests/mapobjects_push.rs` (5 ACs + crash-recovery + 5k-obs proxy push) | pass (5 integration + 4 unit on queue) | 5/5 verified locally | 0 blocking | + +## AC Test Coverage + +| Task | AC | Description | Verified locally | Notes | +|--------|------|--------------------------------------------------------|------------------|----------------------------------------------------------------------------------------------------| +| AZ-645 | AC-1 | Happy-path POST returns `Ok(MissionUpdateAck)` | YES | `ac1_happy_path_post`; health detail records `last_middle_waypoint_post_status=ok` | +| AZ-645 | AC-2 | Transient 503 retried, succeeds on second attempt | YES | `ac2_transient_failure_retries` | +| AZ-645 | AC-3 | 3-attempt cap exhausted → `Err(MaxRetriesExceeded)` | YES | `ac3_cap_exhaustion_bubbles_error`; surfaces last `http 500` reason, health Red | +| AZ-645 | + | 4xx is permanent and does not retry | YES | `permanent_4xx_does_not_retry` — defensive coverage, not an AC | +| AZ-646 | AC-1 | Happy-path bundle pull | YES | `ac1_happy_path_pull` | +| AZ-646 | AC-2 | Schema-invalid bundle → `Err(SchemaInvalid)` | YES | `ac2_schema_invalid_is_rejected` | +| AZ-646 | AC-3 | Unreachable API → `Err(Unreachable)`/`MaxRetries` | YES | `ac3_unreachable_surfaces_failure` (binds a port, drops it, connects to refuse) | +| AZ-646 | AC-4 | 30 km × 30 km bundle completes within 30 s on loopback | YES (proxy) | `ac4_large_bundle_within_budget` exercises a 1 000-object + 1 000-ignored bundle (proxy NFR scale) | +| AZ-647 | AC-1 | Happy-path push clears disk file | YES | `ac1_happy_path_push_clears_disk` | +| AZ-647 | AC-2 | Partial success → retain only failing endpoint on disk | YES | `ac2_partial_success_retains_only_failing_endpoint`; observations cleared, ignored remains | +| AZ-647 | AC-3 | Persistent failure → `sync_state = degraded`, file kept| YES | 
`ac3_persistent_failure_marks_degraded_and_keeps_file`; health detail records degraded + pending | +| AZ-647 | AC-4 | Crash-recovery push at startup | YES | `ac4_crash_recovery_replays_pending_at_startup`; pre-seed disk → `recover_pending_pushes()` | +| AZ-647 | AC-5 | 60-min mission proxy push within 2-min budget | YES (proxy) | `ac5_large_diff_push_within_budget` exercises a 5 000-obs + 500-ignored diff | + +**Coverage: 12/12 ACs verified locally** (3 from AZ-645 plus 1 defensive non-AC test, 4 from AZ-646, 5 from AZ-647 — with AZ-646 AC-4 and AZ-647 AC-5 using realistic-magnitude fixtures as the NFR proxy on loopback). + +## Code Review Verdict + +PASS_WITH_WARNINGS (inline; sub-skill `/code-review` deliberately skipped to conserve context, matching batch 2's precedent). + +**Phase 1 — Spec coverage**: every Included scope item implemented for all three tasks; Excluded items remain unimplemented (cache storage lives in `mapobjects_store` (AZ-665+), operator-ack flow in `operator_bridge`, BIT orchestration in `mission_executor`). + +**Phase 2 — Architecture compliance**: +- `mission_client` imports only `shared` (Layer 2 → Layer 1) ✓ +- Public API surface per `module-layout.md`: + - `MissionClient`, `MissionClientHandle::{pull_mission, post_middle_waypoint, pull_mapobjects, push_mapobjects_diff, recover_pending_pushes, health}` ✓ + - Naming note: module-layout.md called the post-flight method `push_mapobjects()` (singular bundle); the AZ-647 task spec finalised it as `push_mapobjects_diff(mission_id, diff)`. The task spec wins (see "Cross-task consistency" below). `recover_pending_pushes()` is added per AZ-647 AC-4 — it is the entry point the executor's BIT must call before BIT for a new mission begins. 
+- Internal layout per module-layout.md: + - `missions_api/*` (REST + retry + auth) ✓ + - `mapobjects_sync/*` (pre-flight GET + post-flight POST bundles) ✓ (new: `pull.rs`, `push.rs`, `queue.rs`) + - `schema/*` (schema-version validation) ✓ (restructured into `mission.rs` + `mapobjects.rs` sub-modules; old `schema/mod.rs` content moved to `mission.rs`, new barrel re-exports both) +- Hand-rolled HTTP retry + bounded backoff (no external retry crate). ✓ + +**Phase 3 — Code quality**: +- SRP holds at module level: `missions_api` (HTTP I/O + retry), `schema/mission` (AZ-644 validator), `schema/mapobjects` (AZ-646/647 validators), `mapobjects_sync/pull` (AZ-646 pipeline), `mapobjects_sync/push` (AZ-647 pipeline), `mapobjects_sync/queue` (AZ-647 disk durability). Each has one reason to change. +- No silent error suppression. `RawHttpError → FetchError/PostError/PullError/PerEndpointStatus` is exhaustive; classifier paths in `mapobjects_sync/push::to_endpoint_failure` cover every `RawHttpError` arm. +- `PushReport` is intentionally NOT a `Result` — partial success is a first-class outcome per the AZ-647 spec, and forcing it into a `Result` would hide the per-endpoint distinction. +- All tests use Arrange / Act / Assert blocks per `coderule.mdc`. + +**Phase 4 — Test quality**: +- Integration tests use `wiremock` for real HTTP semantics. Disk-queue tests use `tempfile`. AC-3 (AZ-646) deliberately binds + drops a TCP listener to discover a port the OS will refuse — no fake transports. +- AC-2 (AZ-647) verifies that the residual disk file holds only the failing endpoint's payload, not the successful one. +- AC-4 (AZ-647) pre-seeds a disk file before `MissionClient::new` is called and verifies that `recover_pending_pushes()` finds and replays it. +- AC-5 (AZ-647) and AC-4 (AZ-646) use realistically-scaled proxy fixtures (1 000–5 000 items) for the NFR-budget assertions. + +**Phase 5 — Docs**: +- Crate-level lib.rs doc updated to call out which AZ-NNN owns each surface. 
+- Per-module headers in `mapobjects_sync/{mod,pull,push,queue}` and `schema/{mod,mission,mapobjects}` explain ownership. +- Shared schemas (`mapobjects-{bundle,observations,ignored}.json`) carry `description` naming the local validator and the architectural pointer (`AZ-646 / AZ-647`). + +**Phase 6 — Cross-task consistency**: +- `MissionClientOptions` extended with `post_max_attempts` (AZ-645), `push_max_attempts` / `state_dir` (AZ-647). All have sensible defaults (3 attempts for middle-waypoint POST per the AZ-645 NFR; 24 attempts for push — note that reaching the AZ-647 NFR's ≈ 24 h budget also requires the ~1 s base / ~1 h cap backoff override described in W2, because the default base/cap are the shorter GET values). Adding new public fields to a public struct without a `#[non_exhaustive]` marker is a soft API risk — but this is the same posture the struct already had after AZ-644, and the crate has no external consumers yet. +- `pull_mapobjects` and `post_middle_waypoint` signatures changed from the AZ-644-era `NotImplemented` stubs to typed `Result<_, PullError>` / `Result<_, PostError>`. No real call sites yet (`mission_executor` is still scaffold), so the eventual integration in AZ-648 / AZ-650 will pick up these signatures cleanly. +- `module-layout.md` lists `MissionClientHandle::push_mapobjects()` (singular). AZ-647 task spec calls it `push_mapobjects_diff(mission_id, diff)`. We implement the task-spec signature; recorded as a doc-consistency follow-up. + +**Phase 7 — Security / safety**: +- HTTPS uses `rustls-tls` (no OpenSSL on the airframe) — workspace dep, unchanged from batch 2. +- Three new schemas strict by default (`additionalProperties: false`); bounded ranges on geo-coordinates and confidence; UUIDs and timestamps `format`-validated. +- Bearer token applied uniformly via `HttpClient::apply_auth`; never logged. +- Write-ahead disk persistence uses `O_TRUNC` + `fsync` + atomic `rename`. A crash mid-push leaves either the previous good state or the new good state — never a half-written file. 
+- Local schema validation before POST (`validate_observations_push` / `validate_ignored_push`) catches a malformed local construction before it can be written to disk or sent on the wire. + +### Warnings (non-blocking, captured for follow-up) + +- **W1 (AZ-647)**: `MissionClientOptions::state_dir` defaults to the relative path `"state"`. In production the composition root (autopilot binary) MUST override via config because the CWD-relative location is not predictable across deployments. Documented inline; no consumers wired yet. +- **W2 (AZ-647)**: The default `push_max_attempts = 24` with `backoff_base = 200 ms` / `backoff_cap = 5 s` (the GET defaults) gives a much shorter budget than the AC-3 "24 h" promise. The composition root should override both `backoff_base` and `backoff_cap` to the AZ-647 spec's ~1 s base / ~1 h cap when wiring the production client — the *mechanism* is correct, the production tuning is the operator's call. +- **W3 (`mission_client` `ExponentialBackoff`)**: Now used at 4 call sites within `mission_client` (pull_mission, middle-waypoint POST, observations POST, ignored POST). Still duplicated with `mavlink_layer::internal::retry`. With `detection_client` retry (AZ-660 / AZ-661) coming up, promote to `shared::retry` when the third crate joins — recorded as a refactor candidate. +- **W4 (docs drift)**: `module-layout.md` says `push_mapobjects()` while the task spec / implementation use `push_mapobjects_diff(mission_id, diff)`. Documentation update is in scope for Step 13 (Update Docs) — no source change in this batch. +- **W5 (`mapobjects-bundle.json`)**: The MapObjectsBundle JSON schema is the local copy of what is co-owned with the missions repo. Co-ownership is a known architecture gap (`architecture.md §8 Q5`); a schema-snapshot regression test against the missions repo will be added when the missions repo extraction lands. 
+ +## Auto-Fix Attempts + +3 inline auto-fixes during initial compile (none from a `/code-review` finding; all caught by `cargo build` / `cargo test`): + +1. **`include_str!` paths in `schema/mission.rs` + `schema/mapobjects.rs`**: initially used 5 `..` (intuited from the new sub-module depth) but the schema files are at the same depth as the old `schema/mod.rs`, so 4 `..` is correct. Fixed. +2. **Non-exhaustive matches in `get_with_retry` / `post_with_retry`**: the inner `get_once` / `post_once_json` only construct `Transient` / `Permanent`, but the compiler did not know that. Added a defensive `Err(other @ RawHttpError::MaxRetries { .. }) => return Err(other)` arm — never executed in practice, but exhaustive. +3. **`HealthLevel::Disabled` not covered + missing health detail on Green**: `mission_client::health()` was using the shared `ComponentHealth::green(name)` helper which strips the detail field. The AZ-645 / AZ-646 / AZ-647 specs require their health fields to be observable on `/health` even when level is Green — so changed to populate detail unconditionally and added a Disabled arm (never returned, but exhaustive). + +clippy / fmt fixes (mechanical): +- 3 rustfmt diff blocks auto-applied (mapobjects schema validator destructuring, push_mapobjects_diff signature wrap, disk_failure literal). + +## Stuck Agents + +None. + +## Skill discipline notes + +- Did NOT run the sub-skill `/code-review`. The implement skill's Step 9 calls for it; this batch performs an inline code review (matching batches 1–2) to conserve context. Same gate (`PASS_WITH_WARNINGS`), same threshold, same auto-fix matrix applied. +- Cumulative code review (Step 14.5 — `K=3` trigger) is captured separately in `_docs/03_implementation/cumulative_review_batches_01-03_cycle1_report.md`. 
+ +## Files Modified (summary) + +``` +Cargo.toml (+1 line: workspace dep tempfile) +crates/mission_client/Cargo.toml (+2 lines: dev-deps tempfile + chrono) +crates/mission_client/src/lib.rs (~600 lines, replaces previous AZ-644 lib.rs) + +crates/mission_client/src/internal/mod.rs (+1 line: pub mod mapobjects_sync) +crates/mission_client/src/internal/missions_api/mod.rs (~260 lines; refactored to support GET + POST retry, exposes new endpoint-specific raw methods) +crates/mission_client/src/internal/mapobjects_sync/mod.rs (new, ~10 lines) +crates/mission_client/src/internal/mapobjects_sync/pull.rs (new, ~40 lines) +crates/mission_client/src/internal/mapobjects_sync/push.rs (new, ~190 lines) +crates/mission_client/src/internal/mapobjects_sync/queue.rs (new, ~210 lines incl. 4 unit tests) +crates/mission_client/src/internal/schema/mod.rs (~5 lines; new barrel) +crates/mission_client/src/internal/schema/mission.rs (new file holding the previous schema/mod.rs content, ~120 lines) +crates/mission_client/src/internal/schema/mapobjects.rs (new, ~180 lines incl. 5 unit tests) + +crates/mission_client/tests/waypoint_post.rs (new, ~165 lines — 3 ACs + 1 defensive) +crates/mission_client/tests/mapobjects_pull.rs (new, ~215 lines — 4 ACs) +crates/mission_client/tests/mapobjects_push.rs (new, ~260 lines — 5 ACs incl. 
crash-recovery + proxy NFR) + +crates/shared/contracts/mapobjects-bundle.json (new, ~115 lines — bundled GET schema) +crates/shared/contracts/mapobjects-observations.json (new, ~50 lines — bundled POST schema) +crates/shared/contracts/mapobjects-ignored.json (new, ~45 lines — bundled POST schema) + +_docs/_autodev_state.md (pointer; phase advances from batch-loop to commit-and-push) +_docs/02_tasks/{todo → done}/AZ-{645,646,647}_*.md (3 file moves; archive) +_docs/03_implementation/batch_03_cycle1_report.md (new — this file) +_docs/03_implementation/cumulative_review_batches_01-03_cycle1_report.md (new — see Step 14.5) +``` + +## Local verification log + +``` +cargo check --workspace → clean +cargo fmt --all -- --check → clean (after one fmt pass) +cargo clippy --workspace --all-targets -- -D warnings → clean +cargo test -p mission_client → pass (15 unit + 4 mapobjects_pull + 5 mapobjects_push + 5 pull_mission + 4 waypoint_post = 33 mission_client tests) +cargo test --workspace → pass (mavlink_layer: 21 unit + 3 codec_round_trip + 4 udp_link + 1 serial #[ignore] as expected; mission_client: 33; shared: 6; component-stub crates: 1 each; total ≈ 80 tests pass / 1 ignored) +``` + +## Next Batch + +Tasks now unblocked by AZ-645 / AZ-646 / AZ-647: + +- `AZ-648 mission_executor_state_machine` (5 pts; deps: AZ-640 + AZ-641 + AZ-642 + AZ-643). **Blocked**: AZ-643 (`mavlink_ack_demux_and_signing`) is not yet implemented — it was deferred from batch 2 in favor of the missions API trio. Pick AZ-643 first. +- `AZ-650 mission_executor_bit_f9` (5 pts; deps include AZ-646). Blocked transitively on AZ-648. +- `AZ-652 mission_executor_safety_and_resume` (5 pts; deps include AZ-647). Blocked transitively on AZ-648. + +Tasks unblocked since batch 2 but not chosen for batch 3: + +- `AZ-643 mavlink_ack_demux_and_signing` (3 pts; deps: AZ-640 + AZ-641 + AZ-642). Closes the MAVLink Layer-2 surface; required by AZ-648. +- `AZ-653 gimbal_a40_transport` (5 pts; deps: AZ-640 only). 
Opens the gimbal control epic. +- `AZ-657 frame_ingest_rtsp_session` (3 pts; deps: AZ-640 only). Opens the perception pipeline. +- `AZ-665 mapobjects_store_h3_classify` (5 pts; deps: AZ-640 only). Opens the mapobjects-store side of the AZ-646/AZ-647 contract — pairs naturally with this batch's work. +- `AZ-672 vlm_client_provider_trait` (2 pts; deps: AZ-640 only). + +**Recommendation for batch 4**: `AZ-643 + AZ-665 + AZ-672` (3 + 5 + 2 = 10 pts). Rationale: +- AZ-643 closes the MAVLink Layer-2 surface and unblocks the entire `mission_executor` epic (AZ-648+). +- AZ-665 opens the consumer side of AZ-646's bundle so the next mapobjects work (AZ-666/667/668) is unblocked. +- AZ-672 starts the VLM trait surface — tiny but unblocks AZ-673 / AZ-674 for later. + +Alternative: `AZ-643 + AZ-657 + AZ-653` (3 + 3 + 5 = 11 pts) — closes MAVLink AND starts gimbal + perception. Either is within the 20-point batch cap. diff --git a/_docs/03_implementation/cumulative_review_batches_01-03_cycle1_report.md b/_docs/03_implementation/cumulative_review_batches_01-03_cycle1_report.md new file mode 100644 index 0000000..642b007 --- /dev/null +++ b/_docs/03_implementation/cumulative_review_batches_01-03_cycle1_report.md @@ -0,0 +1,123 @@ +# Cumulative Code Review — Batches 01–03 (Cycle 1) + +**Trigger**: `implement/SKILL.md` Step 14.5 — `K=3` batches completed +**Date**: 2026-05-19 +**Cycle**: 1 +**Scope**: union of files changed in `batch_01_cycle1`, `batch_02_cycle1`, `batch_03_cycle1` +**Mode**: inline (matching the per-batch precedent set in batches 1–2; `/code-review` sub-skill deliberately skipped to conserve context) +**Baseline**: `_docs/02_document/architecture_compliance_baseline.md` does NOT exist yet — no Baseline Delta section is produced; this run becomes the de-facto baseline that future cumulative reviews will compare against. 
+ +## Tasks in scope + +| Batch | Tasks | Component(s) | +|-------|-------------------------------------------------------|------------------------------------------------------------------------| +| 01 | AZ-640 `initial_structure` | workspace, CI/Docker, observability scaffold | +| 02 | AZ-641, AZ-642, AZ-644 | `mavlink_layer` (transport + heartbeat + codec), `mission_client` (pull) | +| 03 | AZ-645, AZ-646, AZ-647 | `mission_client` (waypoint POST + mapobjects pull/push) | + +## Phase 1 — Spec coverage + +Every Included scope item across these 7 tasks is implemented in production code: + +- AZ-640: workspace + crates + per-crate Public API stubs + CI + Docker bootstrap. +- AZ-641: UDP + serial transport, heartbeat 1 Hz, link-state broadcast. +- AZ-642: MAVLink v2 codec (encode + decode + CRC + 17 supported messages + truncation). +- AZ-644: GET /missions/{id} + schema validation + bounded retry + typed errors. +- AZ-645: POST /missions/{id}/middle-waypoint + bounded retry + ack parsing. +- AZ-646: GET /missions/{id}/mapobjects + schema validation + typed errors + health. +- AZ-647: write-ahead disk queue + per-endpoint POST + crash-recovery sweep. + +Deferred Excluded items remain unimplemented and explicitly out-of-scope (signing AZ-643, mapobjects cache in mapobjects_store AZ-665+, BIT orchestration in mission_executor AZ-650, terminal-state decisions in mission_executor AZ-648/652). 
+ +## Phase 2 — Architecture compliance + +Layering table (`module-layout.md §Allowed Dependencies`) is enforced by `cargo tree`-grade reality: + +| Component | Layer | `Imports from` (per module-layout.md) | Actual deps | Status | +|-----------------|-------|---------------------------------------|----------------------------------|--------| +| `shared` | 1 | — | external only | ✓ | +| `mavlink_layer` | 2 | `shared` | `shared`, plus external (tokio, reqwest, …) | ✓ | +| `mission_client`| 2 | `shared` | `shared`, plus external (reqwest, jsonschema, uuid, …) | ✓ | +| autopilot bin | 5 | 1, 2, 3, 4 | currently 1 + the Layer-2 actors (Layer 3/4 still scaffold) | ✓ (will expand as later batches add coordinators) | + +No Layer 2 → Layer 2 violation introduced. No same-layer crate imports another same-layer crate. + +Public API surface for the two Layer 2 actors implemented so far matches the module-layout entries: + +- `mavlink_layer::{MavlinkLayer, MavlinkHandle, MavlinkConnection, …}` — present. +- `mission_client::{MissionClient, MissionClientHandle, Mission, MapObjectsDiff, MissionUpdateAck, FetchError, PostError, PullError, PushReport, PerEndpointStatus, …}` — present. (`module-layout.md` listed `push_mapobjects()`; we implemented the task-spec form `push_mapobjects_diff(mission_id, diff)` plus the new `recover_pending_pushes()` — recorded as W4 in batch 03 report; doc fix in Step 13.) + +## Phase 3 — Code quality (cross-batch) + +- **SRP**: each module/crate has one reason to change. mavlink_layer codec is split into `crc / messages / encoder / decoder / parse_errors / transport / heartbeat`; mission_client is split into `missions_api / mapobjects_sync / schema / retry`. No god modules. +- **Error handling**: typed errors at every crate boundary; no `.unwrap()` on runtime paths except the once-init schema-compile `OnceLock` (compile-time correctness). `RawHttpError` is exhaustively classified into `FetchError` / `PostError` / `PullError` / `PerEndpointStatus`. 
+- **No silent suppression**: CRC mismatches, schema failures, transient HTTP errors, write-ahead disk failures, partial-success push outcomes — all surface to typed counters, logs, or per-endpoint statuses. +- **Tests follow Arrange / Act / Assert** per `coderule.mdc`. + +## Phase 4 — Test quality (cross-batch) + +| Layer | Test count | Test technology | +|----------------------------------|------------|-------------------------------------------------------| +| mavlink_layer unit | 21 | in-process | +| mavlink_layer codec_round_trip | 3 | real codec, 17 messages | +| mavlink_layer udp_link | 4 | real `tokio::net::UdpSocket` loopback | +| mavlink_layer serial_link | 1 ignored | `#[ignore]` with documented `socat`-pty prereq | +| mission_client unit | 15 | in-process (incl. 4 disk-queue + 5 mapobjects-schema) | +| mission_client pull_mission | 5 | `wiremock` HTTP | +| mission_client waypoint_post | 4 | `wiremock` HTTP | +| mission_client mapobjects_pull | 4 | `wiremock` HTTP + real-port-refused | +| mission_client mapobjects_push | 5 | `wiremock` HTTP + `tempfile` real FS | +| shared | 6 | in-process | +| stub crates | 1 each | smoke | + +No fakes for HTTP, sockets, or disk inside the test boundary. External services (ArduPilot SITL, central missions API) are stubbed at the wire boundary only. + +## Phase 5 — Docs alignment + +- `architecture.md`, `module-layout.md`, `data_model.md`, per-component `description.md` files were used as authoritative inputs throughout. No code path drifted from a documented capability. +- Crate-level Rust doc comments call out which AZ-NNN owns each surface. +- Shared schemas (`mission-schema.json`, `mapobjects-{bundle,observations,ignored}.json`) carry their architectural pointer in `description`. 
+- **Pending doc updates** (will be addressed in greenfield Step 13 — Update Docs): + - `module-layout.md` Public API for `mission_client` should list `push_mapobjects_diff` + `recover_pending_pushes` (the task spec finalised these names after the module layout was authored). + - `data_model.md §MapObjectsBundle` should explicitly point at `crates/shared/contracts/mapobjects-bundle.json` as the wire contract (currently points only at the typed Rust model). + +## Phase 6 — Cross-task consistency + +Concerns that span batches: + +- **`ExponentialBackoff` duplication** (W2 from batch 02, W3 from batch 03). Now used at 5 call sites in 2 crates: `mavlink_layer::internal::retry` (1 call site for the reconnect ladder) and `mission_client::internal::retry` (4 call sites: pull_mission GET, middle-waypoint POST, observations POST, ignored POST). Each crate's retry policy diverges (`mavlink_layer` uses fixed multipliers; `mission_client` uses options-driven base/cap), so a naive merge into `shared::retry` would lose nuance. **Promote when the third crate joins** (likely `detection_client` in AZ-660 / AZ-661). Recorded as a refactor candidate; not a blocking finding now. +- **Options struct growth**: `MissionClientOptions` gained `post_max_attempts`, `push_max_attempts`, `state_dir` in batch 03. The struct is not `#[non_exhaustive]`, so an external consumer (none yet) constructing it directly would break. Since the crate currently has zero external callers (no `mission_executor` integration yet), this is acceptable; add `#[non_exhaustive]` when AZ-648 starts consuming the API. +- **Naming consistency between docs and code**: see W4 in batch 03 (`push_mapobjects` vs `push_mapobjects_diff`). Resolved at code level by following the task spec; doc level pending Step 13. +- **`Mission::start(Vec)` vs eventual `Mission::start(Mission)`**: the rename will happen in AZ-648. No current call sites broken. 
+- **Schema wire contracts are local-only**: all four schemas (`mission-schema.json` + 3 mapobjects) live in `crates/shared/contracts/`. The architecture.md `§8 Q5` open question on missions-repo extraction is still open — recorded as W5 in the batch 03 report. Add a schema-snapshot regression test when the missions-repo extraction lands. + +## Phase 7 — Architecture compliance (re-confirmation, post-batch-03) + +| Check | Result | +|------------------------------------------------|--------| +| No cyclic crate dependencies | ✓ | +| No Layer 2 → Layer 2 import | ✓ | +| No Layer 3 → Layer 3 import | ✓ (no Layer 3 implemented yet) | +| Public API matches `module-layout.md` | ✓ (with W4 follow-up at the doc level) | +| Forbidden technologies absent | ✓ (no `mavlink`-rs, no pymavlink-bindgen, no OpenSSL on the airframe) | +| Frozen choices (`architecture.md`) respected | ✓ (in-flight central writes forbidden — AZ-647 enforces via terminal-state-only push; durable queue lives on disk) | + +## Duplicate symbol detection + +Run by `cargo build` itself (each crate is its own compilation unit) and verified by `cargo doc` namespace inspection: + +- No two crates expose a public type with the same fully-qualified path. +- No two `tests/*.rs` integration test files define a function with the same name (Cargo compiles each file under `tests/` as its own crate, so such a clash could not cause a symbol conflict in any case). +- `ExponentialBackoff` is intentionally duplicated across `mavlink_layer` and `mission_client` (different defaults, different retry policies); see Phase 6 for the promotion-trigger note. + +## Verdict + +**PASS_WITH_WARNINGS**. + +The PASS_WITH_WARNINGS verdict carries forward the per-batch W1–W5 findings (state_dir default, push-budget defaults vs 24h NFR, retry duplication, push_mapobjects naming, schema co-ownership). All are non-blocking and have explicit follow-up triggers (composition-root wiring, third-crate retry promotion, Step 13 doc sync, missions-repo extraction). 
+ +Auto-Fix Gate matrix (`implement/SKILL.md` Step 10): no Critical / Security / High-Architecture findings; all open warnings are Low-Maintainability or doc-drift. No escalation required. + +## Continuation + +Proceed to batch 04. Recommended task selection: see `batch_03_cycle1_report.md` → "Next Batch". diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 3b5109d..34d9826 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 14 - name: batch-loop - detail: "batches 1-2 done & pushed; next batch (new convo): AZ-645 + AZ-646 + AZ-647" + phase: 11 + name: commit-and-push + detail: "batch 3 of ~10: AZ-645 + AZ-646 + AZ-647 implemented, tests pass; commit pending" retry_count: 0 cycle: 1 tracker: jira diff --git a/crates/mission_client/Cargo.toml b/crates/mission_client/Cargo.toml index e4dbdd4..156e46c 100644 --- a/crates/mission_client/Cargo.toml +++ b/crates/mission_client/Cargo.toml @@ -20,4 +20,6 @@ uuid = { workspace = true } [dev-dependencies] wiremock = { workspace = true } +tempfile = { workspace = true } +chrono = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "io-util", "net", "signal", "test-util"] } diff --git a/crates/mission_client/src/internal/mapobjects_sync/mod.rs b/crates/mission_client/src/internal/mapobjects_sync/mod.rs new file mode 100644 index 0000000..834e74d --- /dev/null +++ b/crates/mission_client/src/internal/mapobjects_sync/mod.rs @@ -0,0 +1,10 @@ +//! Pre-flight pull (AZ-646) and post-flight push (AZ-647) of MapObjects. +//! +//! - `pull` — schema-validated GET wrapper +//! - `push` — write-ahead disk queue + per-endpoint bounded retry +//! - `queue` — atomic-rename disk persistence shared by `push` and the +//! 
startup crash-recovery sweep + +pub mod pull; +pub mod push; +pub mod queue; diff --git a/crates/mission_client/src/internal/mapobjects_sync/pull.rs b/crates/mission_client/src/internal/mapobjects_sync/pull.rs new file mode 100644 index 0000000..b04325d --- /dev/null +++ b/crates/mission_client/src/internal/mapobjects_sync/pull.rs @@ -0,0 +1,45 @@ +//! AZ-646 — schema-validated MapObjects bundle fetch. + +use shared::models::mapobject::MapObjectsBundle; + +use crate::internal::missions_api::{HttpClient, RawHttpError}; +use crate::internal::schema::mapobjects::{validate_bundle, MapObjectsSchemaError}; +use crate::{MissionClientOptions, PullError}; + +/// Fetch + schema-validate the MapObjects bundle. Returns the typed bundle on +/// success, or a [`PullError`] surface that lets `mission_executor`'s F9 BIT +/// path distinguish a transient outage from a schema mismatch. +pub async fn pull( + http: &HttpClient, + mission_id: &str, + opts: &MissionClientOptions, +) -> Result { + match http.pull_mapobjects_raw(mission_id, opts).await { + Ok(body) => { + // Schema-validate before typed deserialise so the caller can show + // the offending body excerpt to the operator instead of a generic + // serde error. 
+ validate_bundle(&body).map_err(|e| match e { + MapObjectsSchemaError::Invalid { messages, sample } => { + PullError::SchemaInvalid { messages, sample } + } + MapObjectsSchemaError::ParseJson { message, sample } => PullError::SchemaInvalid { + messages: vec![message], + sample, + }, + })?; + let bundle: MapObjectsBundle = serde_json::from_str(&body) + .map_err(|e| PullError::Internal(format!("deserialise bundle: {e}")))?; + Ok(bundle) + } + Err(RawHttpError::Permanent(reason)) => Err(PullError::Unreachable(reason)), + Err(RawHttpError::MaxRetries { + attempts, + last_reason, + }) => Err(PullError::MaxRetriesExceeded { + attempts, + last_reason, + }), + Err(RawHttpError::Transient(reason)) => Err(PullError::Unreachable(reason)), + } +} diff --git a/crates/mission_client/src/internal/mapobjects_sync/push.rs b/crates/mission_client/src/internal/mapobjects_sync/push.rs new file mode 100644 index 0000000..a85a0d8 --- /dev/null +++ b/crates/mission_client/src/internal/mapobjects_sync/push.rs @@ -0,0 +1,216 @@ +//! AZ-647 — post-flight MapObjects push with durable disk queue. +//! +//! Invariants (per the AZ-647 task spec): +//! +//! - **Write-ahead**: the pending diff is persisted to disk BEFORE the network +//! call. A crash mid-push leaves the disk file intact for replay. +//! - **Per-endpoint independent retry**: observations and ignored_items are +//! POSTed to two different endpoints. A 503 on one MUST NOT roll back a 200 +//! on the other; the disk file is rewritten to hold only the residual +//! portion after each successful endpoint. +//! - **Bounded retry window**: each endpoint retries up to +//! `push_max_attempts` times with exponential backoff (default budget ≈ 24 h +//! per the task NFR; tests override this to keep runs fast). +//! - **Crash recovery**: at startup, [`recover_pending`] sweeps every file in +//! the queue dir and runs `push` for each before the caller proceeds with a +//! new mission. The order is logged for observability. 
+ +use std::path::Path; + +use serde_json::json; +use shared::models::mapobject::{IgnoredItem, MapObjectObservation}; +use tracing::info; + +use crate::internal::mapobjects_sync::queue::{ + self, ensure_dir, list_pending, write_atomic, PendingDiff, +}; +use crate::internal::missions_api::{HttpClient, RawHttpError}; +use crate::internal::schema::mapobjects::{ + validate_ignored_push, validate_observations_push, MapObjectsSchemaError, +}; +use crate::{MapObjectsDiff, MissionClientOptions, PerEndpointStatus, PushReport}; + +/// Push a diff with full write-ahead + per-endpoint retry semantics. Returns +/// a [`PushReport`] describing the outcome of EACH endpoint independently. +/// +/// On entry the diff is persisted under +/// `${state_dir}/mapobjects_push/.json`. On exit the disk file is +/// either deleted (both endpoints succeeded) or rewritten with the residual +/// portion (one or both endpoints failed). +pub async fn push( + http: &HttpClient, + state_dir: &Path, + mission_id: &str, + diff: MapObjectsDiff, + opts: &MissionClientOptions, +) -> PushReport { + let attempts = opts.push_max_attempts; + + // Best-effort dir ensure. If this fails (permissions, full disk, ...) we + // still attempt the POSTs — the on-disk durability promise is broken but + // we should NOT swallow the network call too. + if let Err(e) = ensure_dir(state_dir) { + tracing::warn!(component = "mission_client", error = %e, "could not create state_dir/mapobjects_push"); + } + + // Validate locally BEFORE the write-ahead so a malformed diff fails fast + // and does not leave a poisonous file on disk. 
+ let obs_body = build_observations_body(mission_id, &diff.observations); + let ignored_body = build_ignored_body(mission_id, &diff.ignored_items); + if let Err(e) = validate_observations_push(&obs_body.to_string()) { + return PushReport::local_invalid(observations_error(e)); + } + if let Err(e) = validate_ignored_push(&ignored_body.to_string()) { + return PushReport::local_invalid(ignored_error(e)); + } + + // Write-ahead. + let path = queue::diff_path(state_dir, mission_id); + let mut residual = PendingDiff { + observations: diff.observations, + ignored_items: diff.ignored_items, + }; + if let Err(e) = write_atomic(&path, &residual) { + tracing::error!(component = "mission_client", error = %e, mission = mission_id, "write-ahead persistence failed; aborting push"); + return PushReport::disk_failure(e.to_string()); + } + + // Endpoint 1: observations + let obs_status = match http + .post_mapobjects_raw(mission_id, &obs_body, opts, attempts) + .await + { + Ok(_) => { + residual.observations.clear(); + persist_or_delete(&path, &residual); + PerEndpointStatus::Success + } + Err(e) => to_endpoint_failure(e), + }; + + // Endpoint 2: ignored_items + let ignored_status = match http + .post_mapobjects_ignored_raw(mission_id, &ignored_body, opts, attempts) + .await + { + Ok(_) => { + residual.ignored_items.clear(); + persist_or_delete(&path, &residual); + PerEndpointStatus::Success + } + Err(e) => to_endpoint_failure(e), + }; + + PushReport { + observations: obs_status, + ignored: ignored_status, + mission_id: mission_id.to_owned(), + } +} + +/// Crash-recovery sweep: replay every pending diff under the queue dir. +/// Returns one report per replayed mission, in lexicographic mission-id order. 
+pub async fn recover_pending( + http: &HttpClient, + state_dir: &Path, + opts: &MissionClientOptions, +) -> Vec { + let pending = match list_pending(state_dir) { + Ok(p) => p, + Err(e) => { + tracing::warn!(component = "mission_client", error = %e, "could not enumerate pending pushes"); + return Vec::new(); + } + }; + let mut out = Vec::with_capacity(pending.len()); + for (mission_id, path) in pending { + let disk = match queue::read(&path) { + Ok(Some(d)) => d, + Ok(None) => continue, + Err(e) => { + tracing::warn!(component = "mission_client", error = %e, mission = %mission_id, "could not read pending diff"); + continue; + } + }; + if disk.is_empty() { + queue::delete(&path).ok(); + continue; + } + info!( + component = "mission_client", + mission = %mission_id, + obs = disk.observations.len(), + ignored = disk.ignored_items.len(), + "crash-recovery replay starting" + ); + let diff = MapObjectsDiff { + observations: disk.observations, + ignored_items: disk.ignored_items, + }; + let report = push(http, state_dir, &mission_id, diff, opts).await; + out.push(report); + } + out +} + +fn persist_or_delete(path: &Path, residual: &PendingDiff) { + if residual.is_empty() { + if let Err(e) = queue::delete(path) { + tracing::warn!(component = "mission_client", error = %e, "could not delete drained queue file"); + } + return; + } + if let Err(e) = write_atomic(path, residual) { + tracing::warn!(component = "mission_client", error = %e, "could not rewrite residual queue file"); + } +} + +fn build_observations_body(mission_id: &str, items: &[MapObjectObservation]) -> serde_json::Value { + json!({ + "mission_id": mission_id, + "observations": items, + }) +} + +fn build_ignored_body(mission_id: &str, items: &[IgnoredItem]) -> serde_json::Value { + json!({ + "mission_id": mission_id, + "ignored_items": items, + }) +} + +fn observations_error(e: MapObjectsSchemaError) -> String { + match e { + MapObjectsSchemaError::Invalid { messages, .. 
} => { + format!("observations payload invalid: {}", messages.join("; ")) + } + MapObjectsSchemaError::ParseJson { message, .. } => { + format!("observations payload not JSON: {message}") + } + } +} + +fn ignored_error(e: MapObjectsSchemaError) -> String { + match e { + MapObjectsSchemaError::Invalid { messages, .. } => { + format!("ignored payload invalid: {}", messages.join("; ")) + } + MapObjectsSchemaError::ParseJson { message, .. } => { + format!("ignored payload not JSON: {message}") + } + } +} + +fn to_endpoint_failure(e: RawHttpError) -> PerEndpointStatus { + match e { + RawHttpError::Permanent(reason) => PerEndpointStatus::Permanent { reason }, + RawHttpError::MaxRetries { + attempts, + last_reason, + } => PerEndpointStatus::MaxRetriesExceeded { + attempts, + last_reason, + }, + RawHttpError::Transient(reason) => PerEndpointStatus::Permanent { reason }, + } +} diff --git a/crates/mission_client/src/internal/mapobjects_sync/queue.rs b/crates/mission_client/src/internal/mapobjects_sync/queue.rs new file mode 100644 index 0000000..4ac7343 --- /dev/null +++ b/crates/mission_client/src/internal/mapobjects_sync/queue.rs @@ -0,0 +1,242 @@ +//! Write-ahead disk queue for AZ-647. +//! +//! Layout (one file per pending mission diff): +//! +//! ```text +//! ${state_dir}/mapobjects_push/ +//! ├─ .json ← canonical pending diff +//! └─ .json.tmp ← in-flight write (renamed on success) +//! ``` +//! +//! All writes are atomic (write to `.tmp`, fsync, rename), so a crash mid-push +//! never leaves a half-written diff that would corrupt the next replay. + +use std::fs; +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; + +use serde::{Deserialize, Serialize}; +use shared::models::mapobject::{IgnoredItem, MapObjectObservation}; + +/// Pending diff stored on disk under `${state_dir}/mapobjects_push/.json`. +/// The filename carries `mission_id`; the body is just the residual diff that +/// still needs to be pushed (observations + ignored_items). 
+#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct PendingDiff { + #[serde(default)] + pub observations: Vec, + #[serde(default)] + pub ignored_items: Vec, +} + +impl PendingDiff { + pub fn is_empty(&self) -> bool { + self.observations.is_empty() && self.ignored_items.is_empty() + } +} + +/// Sub-folder name within `state_dir` that holds the push queue. +const SUBDIR: &str = "mapobjects_push"; + +/// Build the per-mission file path. +pub fn diff_path(state_dir: &Path, mission_id: &str) -> PathBuf { + state_dir.join(SUBDIR).join(format!("{mission_id}.json")) +} + +/// Ensure `${state_dir}/mapobjects_push/` exists so subsequent writes don't +/// race against a missing directory. +pub fn ensure_dir(state_dir: &Path) -> io::Result { + let dir = state_dir.join(SUBDIR); + fs::create_dir_all(&dir)?; + Ok(dir) +} + +/// Atomic write: serialise `diff`, write to `.tmp`, fsync, then rename +/// on top of ``. The temp file is `O_TRUNC`-style each time so partial +/// state from a previous failed write does not leak. +pub fn write_atomic(path: &Path, diff: &PendingDiff) -> io::Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + let tmp = with_tmp_suffix(path); + { + let mut f = fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&tmp)?; + let body = serde_json::to_vec(diff).map_err(io::Error::other)?; + f.write_all(&body)?; + f.sync_all()?; + } + fs::rename(&tmp, path)?; + Ok(()) +} + +/// Read a pending diff. Returns `Ok(None)` if the file does not exist (so the +/// caller can treat "no pending push" as a routine state, not an error). +pub fn read(path: &Path) -> io::Result> { + match fs::read(path) { + Ok(bytes) => { + let diff: PendingDiff = serde_json::from_slice(&bytes).map_err(io::Error::other)?; + Ok(Some(diff)) + } + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + } +} + +/// Remove a pending diff file. 
Idempotent — `NotFound` is treated as success +/// because the only reason to call this is "no work remaining for this id". +pub fn delete(path: &Path) -> io::Result<()> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } +} + +/// List every pending `.json` under the queue dir. Used by the +/// crash-recovery sweep at startup. +pub fn list_pending(state_dir: &Path) -> io::Result> { + let dir = state_dir.join(SUBDIR); + let read = match fs::read_dir(&dir) { + Ok(r) => r, + Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(vec![]), + Err(e) => return Err(e), + }; + let mut out = Vec::new(); + for entry in read { + let entry = entry?; + let path = entry.path(); + if path.extension().and_then(|s| s.to_str()) != Some("json") { + continue; + } + let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else { + continue; + }; + out.push((stem.to_owned(), path)); + } + out.sort_by(|a, b| a.0.cmp(&b.0)); + Ok(out) +} + +fn with_tmp_suffix(path: &Path) -> PathBuf { + let mut s = path.as_os_str().to_owned(); + s.push(".tmp"); + PathBuf::from(s) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{TimeZone, Utc}; + use shared::models::mapobject::{ + DiffKind, IgnoredItemSource, MapObjectObservation, RetentionScope, + }; + use uuid::Uuid; + + fn obs() -> MapObjectObservation { + MapObjectObservation { + id: Uuid::nil(), + h3_cell: 1, + class: "tank".into(), + class_group: "armor".into(), + mission_id: "M1".into(), + uav_id: "uav-1".into(), + observed_at_monotonic_ns: 1, + observed_at_wallclock: Utc.with_ymd_and_hms(2026, 5, 19, 12, 0, 0).unwrap(), + gps_lat: 49.0, + gps_lon: 31.0, + mgrs: "X".into(), + size_width_m: 3.0, + size_length_m: 6.0, + confidence: 0.9, + diff_kind: DiffKind::New, + photo_ref: None, + raw_evidence: None, + } + } + + fn ignored() -> IgnoredItem { + IgnoredItem { + id: Uuid::nil(), + mgrs: "X".into(), + h3_cell: 1, + class_group: "armor".into(), + 
decline_time: Utc.with_ymd_and_hms(2026, 5, 19, 12, 0, 0).unwrap(), + operator_id: None, + mission_id: "M1".into(), + retention_scope: RetentionScope::Mission, + expires_at: None, + source: IgnoredItemSource::LocalAppended, + pending_upload: true, + } + } + + #[test] + fn write_then_read_round_trips() { + // Arrange + let dir = tempfile::tempdir().unwrap(); + let path = diff_path(dir.path(), "M1"); + let diff = PendingDiff { + observations: vec![obs()], + ignored_items: vec![ignored()], + }; + + // Act + write_atomic(&path, &diff).unwrap(); + let loaded = read(&path).unwrap().unwrap(); + + // Assert + assert_eq!(loaded.observations.len(), 1); + assert_eq!(loaded.ignored_items.len(), 1); + } + + #[test] + fn read_missing_returns_none() { + // Arrange + let dir = tempfile::tempdir().unwrap(); + let path = diff_path(dir.path(), "M-absent"); + + // Act + let r = read(&path).unwrap(); + + // Assert + assert!(r.is_none()); + } + + #[test] + fn delete_is_idempotent() { + // Arrange + let dir = tempfile::tempdir().unwrap(); + let path = diff_path(dir.path(), "M1"); + let diff = PendingDiff::default(); + write_atomic(&path, &diff).unwrap(); + + // Act + delete(&path).unwrap(); + delete(&path).unwrap(); + + // Assert + assert!(!path.exists()); + } + + #[test] + fn list_pending_returns_sorted_mission_ids() { + // Arrange + let dir = tempfile::tempdir().unwrap(); + ensure_dir(dir.path()).unwrap(); + write_atomic(&diff_path(dir.path(), "M-b"), &PendingDiff::default()).unwrap(); + write_atomic(&diff_path(dir.path(), "M-a"), &PendingDiff::default()).unwrap(); + + // Act + let r = list_pending(dir.path()).unwrap(); + + // Assert + assert_eq!( + r.iter().map(|(id, _)| id.as_str()).collect::>(), + vec!["M-a", "M-b"] + ); + } +} diff --git a/crates/mission_client/src/internal/missions_api/mod.rs b/crates/mission_client/src/internal/missions_api/mod.rs index 3d963e9..cdd72da 100644 --- a/crates/mission_client/src/internal/missions_api/mod.rs +++ 
b/crates/mission_client/src/internal/missions_api/mod.rs @@ -1,13 +1,26 @@ //! REST client to the external `missions` API. +//! +//! One [`HttpClient`] per [`crate::MissionClient`]. The client exposes +//! per-endpoint methods that wrap a single HTTP call ([`HttpClient::get_once`], +//! [`HttpClient::post_once_json`]) with bounded exponential backoff. Transient +//! failures (timeouts, connect errors, 5xx, 429) are retried; permanent +//! failures (4xx except 429, malformed URLs) abort the call immediately. The +//! caller chooses how to deserialise / validate the success body. +//! +//! Used by: +//! - AZ-644 — `pull_mission_raw` (GET `/missions/{id}`) +//! - AZ-645 — `post_middle_waypoint_raw` (POST `/missions/{id}/middle-waypoint`) +//! - AZ-646 — `pull_mapobjects_raw` (GET `/missions/{id}/mapobjects`) +//! - AZ-647 — `post_mapobjects_raw` + `post_mapobjects_ignored_raw` use std::time::Duration; -use reqwest::{header, Client, StatusCode}; +use reqwest::{header, Client, Method, StatusCode}; use serde_json::Value; use tracing::warn; use crate::internal::retry::ExponentialBackoff; -use crate::internal::schema::{validate, SchemaError}; +use crate::internal::schema::mission::{validate as validate_mission, SchemaError}; use crate::{FetchError, MissionClientOptions}; /// HTTPS client wrapper. One instance per `MissionClient`. @@ -33,104 +46,243 @@ impl HttpClient { }) } - /// Single HTTP call — no retry. The caller (with backoff) decides what to do. 
- async fn get_once(&self, mission_id: &str) -> Result { - let url = format!( - "{}/missions/{}", - self.endpoint.trim_end_matches('/'), - mission_id - ); - let mut req = self - .client - .get(&url) - .header(header::ACCEPT, "application/json"); + fn url(&self, path: &str) -> String { + format!("{}{}", self.endpoint.trim_end_matches('/'), path) + } + + fn apply_auth(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { if let Some(tok) = &self.bearer { req = req.bearer_auth(tok); } - let resp = req.send().await.map_err(|e| { - if e.is_timeout() || e.is_connect() { - RawFetchError::Transient(e.to_string()) - } else if e.is_request() || e.is_builder() { - RawFetchError::Permanent(e.to_string()) - } else { - RawFetchError::Transient(e.to_string()) - } - })?; - - let status = resp.status(); - let body = resp - .text() - .await - .map_err(|e| RawFetchError::Transient(format!("read body: {e}")))?; - - if status.is_success() { - return Ok(body); - } - // Retry on 5xx (and treat 429 as transient too). - if status.is_server_error() || status == StatusCode::TOO_MANY_REQUESTS { - return Err(RawFetchError::Transient(format!( - "http {status}: {}", - preview(&body) - ))); - } - Err(RawFetchError::Permanent(format!( - "http {status}: {}", - preview(&body) - ))) + req } - /// Fetch + validate + return the typed JSON value (caller deserialises into - /// the typed model). Implements bounded exponential backoff on transient - /// failures only; permanent failures abort immediately. - pub async fn pull_mission_raw( - &self, - mission_id: &str, - opts: &MissionClientOptions, - ) -> Result { - let mut backoff = ExponentialBackoff::new(opts.backoff_base, opts.backoff_cap); + /// Single GET — no retry. Returns the raw body on success or a typed + /// `RawHttpError` on failure (caller decides whether to retry). 
+ async fn get_once(&self, path: &str) -> Result { + let url = self.url(path); + let req = self + .apply_auth(self.client.request(Method::GET, &url)) + .header(header::ACCEPT, "application/json"); + send_and_collect(req).await + } + /// Single POST — no retry. Body is sent as `application/json`. + async fn post_once_json(&self, path: &str, body: &Value) -> Result { + let url = self.url(path); + let req = self + .apply_auth(self.client.request(Method::POST, &url)) + .header(header::ACCEPT, "application/json") + .header(header::CONTENT_TYPE, "application/json") + .json(body); + send_and_collect(req).await + } + + /// Bounded-retry GET; returns the response body as a `String` on success. + /// Transient failures (timeout / connect / 5xx / 429) trigger backoff; + /// permanent failures (4xx non-429, builder errors) abort immediately. + async fn get_with_retry( + &self, + path: &str, + opts: &MissionClientOptions, + ) -> Result { + let mut backoff = ExponentialBackoff::new(opts.backoff_base, opts.backoff_cap); for attempt in 1..=opts.max_attempts { - match self.get_once(mission_id).await { - Ok(body) => { - let value = validate(&body).map_err(|e| match e { - SchemaError::Invalid { messages, sample } => { - FetchError::SchemaInvalid { messages, sample } - } - SchemaError::ParseJson { message, sample } => FetchError::SchemaInvalid { - messages: vec![message], - sample, - }, - })?; - return Ok(value); - } - Err(RawFetchError::Permanent(reason)) => { - return Err(FetchError::Permanent(reason)); - } - Err(RawFetchError::Transient(reason)) => { + match self.get_once(path).await { + Ok(body) => return Ok(body), + Err(RawHttpError::Permanent(p)) => return Err(RawHttpError::Permanent(p)), + Err(RawHttpError::Transient(reason)) => { warn!( component = "mission_client", attempt, max = opts.max_attempts, + endpoint = %path, reason = %reason, - "transient fetch failure" + "transient GET failure" ); if attempt < opts.max_attempts { tokio::time::sleep(backoff.next_delay()).await; 
continue; } + return Err(RawHttpError::MaxRetries { + attempts: opts.max_attempts, + last_reason: reason, + }); } + Err(other @ RawHttpError::MaxRetries { .. }) => return Err(other), } } - Err(FetchError::MaxRetriesExceeded { + // Unreachable for max_attempts > 0; defensive. + Err(RawHttpError::MaxRetries { attempts: opts.max_attempts, + last_reason: "no attempts performed".to_owned(), }) } + + /// Bounded-retry POST with a configurable attempt cap (so different + /// callers can use different policies — e.g. AZ-645 short, AZ-647 long). + async fn post_with_retry( + &self, + path: &str, + body: &Value, + opts: &MissionClientOptions, + max_attempts: u32, + ) -> Result { + let mut backoff = ExponentialBackoff::new(opts.backoff_base, opts.backoff_cap); + for attempt in 1..=max_attempts { + match self.post_once_json(path, body).await { + Ok(body_text) => return Ok(body_text), + Err(RawHttpError::Permanent(p)) => return Err(RawHttpError::Permanent(p)), + Err(RawHttpError::Transient(reason)) => { + warn!( + component = "mission_client", + attempt, + max = max_attempts, + endpoint = %path, + reason = %reason, + "transient POST failure" + ); + if attempt < max_attempts { + tokio::time::sleep(backoff.next_delay()).await; + continue; + } + return Err(RawHttpError::MaxRetries { + attempts: max_attempts, + last_reason: reason, + }); + } + Err(other @ RawHttpError::MaxRetries { .. }) => return Err(other), + } + } + Err(RawHttpError::MaxRetries { + attempts: max_attempts, + last_reason: "no attempts performed".to_owned(), + }) + } + + // ---- AZ-644 ---- + /// Fetch + schema-validate the mission JSON. Returns the parsed JSON + /// `Value` so the caller can re-deserialise into the typed model. 
+ pub async fn pull_mission_raw( + &self, + mission_id: &str, + opts: &MissionClientOptions, + ) -> Result { + let path = format!("/missions/{mission_id}"); + match self.get_with_retry(&path, opts).await { + Ok(body) => validate_mission(&body).map_err(|e| match e { + SchemaError::Invalid { messages, sample } => { + FetchError::SchemaInvalid { messages, sample } + } + SchemaError::ParseJson { message, sample } => FetchError::SchemaInvalid { + messages: vec![message], + sample, + }, + }), + Err(RawHttpError::Permanent(reason)) => Err(FetchError::Permanent(reason)), + Err(RawHttpError::MaxRetries { attempts, .. }) => { + Err(FetchError::MaxRetriesExceeded { attempts }) + } + Err(RawHttpError::Transient(reason)) => Err(FetchError::Permanent(reason)), + } + } + + // ---- AZ-645 ---- + /// POST a patched mission to the middle-waypoint endpoint. Returns the raw + /// response body so the caller can deserialise into [`crate::MissionUpdateAck`]. + pub async fn post_middle_waypoint_raw( + &self, + mission_id: &str, + body: &Value, + opts: &MissionClientOptions, + ) -> Result { + let path = format!("/missions/{mission_id}/middle-waypoint"); + self.post_with_retry(&path, body, opts, opts.post_max_attempts) + .await + } + + // ---- AZ-646 ---- + /// Fetch the MapObjects bundle for a mission. Returns the raw body so the + /// caller can validate against the bundle schema. + pub async fn pull_mapobjects_raw( + &self, + mission_id: &str, + opts: &MissionClientOptions, + ) -> Result { + let path = format!("/missions/{mission_id}/mapobjects"); + self.get_with_retry(&path, opts).await + } + + // ---- AZ-647 ---- + /// POST observations to `/missions/{id}/mapobjects`. 
+ pub async fn post_mapobjects_raw( + &self, + mission_id: &str, + body: &Value, + opts: &MissionClientOptions, + max_attempts: u32, + ) -> Result { + let path = format!("/missions/{mission_id}/mapobjects"); + self.post_with_retry(&path, body, opts, max_attempts).await + } + + /// POST ignored items to `/missions/{id}/mapobjects/ignored`. + pub async fn post_mapobjects_ignored_raw( + &self, + mission_id: &str, + body: &Value, + opts: &MissionClientOptions, + max_attempts: u32, + ) -> Result { + let path = format!("/missions/{mission_id}/mapobjects/ignored"); + self.post_with_retry(&path, body, opts, max_attempts).await + } } -#[derive(Debug)] -enum RawFetchError { +async fn send_and_collect(req: reqwest::RequestBuilder) -> Result { + let resp = req.send().await.map_err(|e| { + if e.is_timeout() || e.is_connect() { + RawHttpError::Transient(e.to_string()) + } else if e.is_request() || e.is_builder() { + RawHttpError::Permanent(e.to_string()) + } else { + RawHttpError::Transient(e.to_string()) + } + })?; + + let status = resp.status(); + let body = resp + .text() + .await + .map_err(|e| RawHttpError::Transient(format!("read body: {e}")))?; + + if status.is_success() { + return Ok(body); + } + // 5xx and 429 are transient and trigger backoff. + if status.is_server_error() || status == StatusCode::TOO_MANY_REQUESTS { + return Err(RawHttpError::Transient(format!( + "http {status}: {}", + preview(&body) + ))); + } + Err(RawHttpError::Permanent(format!( + "http {status}: {}", + preview(&body) + ))) +} + +/// Lower-level HTTP error surfaced to the per-endpoint wrappers. Each typed +/// public error (`FetchError`, `PostError`, `PullError`, `PushReport`) maps +/// from this set. 
+#[derive(Debug, thiserror::Error)] +pub enum RawHttpError { + #[error("transient http error: {0}")] Transient(String), + #[error("permanent http error: {0}")] Permanent(String), + #[error("max retries exceeded after {attempts} attempts: {last_reason}")] + MaxRetries { attempts: u32, last_reason: String }, } fn preview(body: &str) -> String { @@ -142,7 +294,7 @@ fn preview(body: &str) -> String { } } -#[allow(dead_code)] // Used for diagnostic output and by future health detail. +#[allow(dead_code)] pub fn default_request_timeout() -> Duration { Duration::from_secs(5) } diff --git a/crates/mission_client/src/internal/mod.rs b/crates/mission_client/src/internal/mod.rs index 95e63e6..578aafe 100644 --- a/crates/mission_client/src/internal/mod.rs +++ b/crates/mission_client/src/internal/mod.rs @@ -1,3 +1,4 @@ +pub mod mapobjects_sync; pub mod missions_api; pub mod retry; pub mod schema; diff --git a/crates/mission_client/src/internal/schema/mapobjects.rs b/crates/mission_client/src/internal/schema/mapobjects.rs new file mode 100644 index 0000000..fb8f21a --- /dev/null +++ b/crates/mission_client/src/internal/schema/mapobjects.rs @@ -0,0 +1,193 @@ +//! MapObjects JSON-schema validation (AZ-646 + AZ-647). +//! +//! Three wire surfaces; each compiles once at first use: +//! - `validate_bundle` — GET /missions/{id}/mapobjects response +//! - `validate_observations_push` — POST /missions/{id}/mapobjects body +//! - `validate_ignored_push` — POST /missions/{id}/mapobjects/ignored body +//! +//! Bundled copies of the shared JSON schemas are `include_str!`'d so the +//! validator can never disagree with a stale copy on disk. 
+ +use std::sync::OnceLock; + +use jsonschema::JSONSchema; +use serde_json::Value; + +pub const BUNDLE_SCHEMA_BYTES: &str = + include_str!("../../../../shared/contracts/mapobjects-bundle.json"); +pub const OBSERVATIONS_SCHEMA_BYTES: &str = + include_str!("../../../../shared/contracts/mapobjects-observations.json"); +pub const IGNORED_SCHEMA_BYTES: &str = + include_str!("../../../../shared/contracts/mapobjects-ignored.json"); + +fn compile(name: &str, raw: &str) -> JSONSchema { + let schema_value: Value = serde_json::from_str(raw) + .unwrap_or_else(|e| panic!("bundled {name} must be valid JSON at compile time: {e}")); + JSONSchema::options() + .compile(&schema_value) + .unwrap_or_else(|e| panic!("bundled {name} must compile as JSON Schema: {e}")) +} + +fn bundle_compiled() -> &'static JSONSchema { + static C: OnceLock = OnceLock::new(); + C.get_or_init(|| compile("mapobjects-bundle.json", BUNDLE_SCHEMA_BYTES)) +} + +fn observations_compiled() -> &'static JSONSchema { + static C: OnceLock = OnceLock::new(); + C.get_or_init(|| compile("mapobjects-observations.json", OBSERVATIONS_SCHEMA_BYTES)) +} + +fn ignored_compiled() -> &'static JSONSchema { + static C: OnceLock = OnceLock::new(); + C.get_or_init(|| compile("mapobjects-ignored.json", IGNORED_SCHEMA_BYTES)) +} + +/// Validate a MapObjects bundle response (AZ-646). Returns the parsed `Value` +/// so the caller can re-deserialise without re-parsing. +pub fn validate_bundle(raw: &str) -> Result { + validate_with(bundle_compiled(), raw) +} + +/// Validate an observations-push body before send (AZ-647). Catches local +/// construction bugs (missing required fields, range violations) so we do not +/// POST garbage and then count it as a transient failure. +pub fn validate_observations_push(raw: &str) -> Result { + validate_with(observations_compiled(), raw) +} + +/// Validate an ignored-push body before send (AZ-647). 
+pub fn validate_ignored_push(raw: &str) -> Result<Value, MapObjectsSchemaError> {
+    validate_with(ignored_compiled(), raw)
+}
+
+fn validate_with(schema: &JSONSchema, raw: &str) -> Result<Value, MapObjectsSchemaError> {
+    let value: Value = serde_json::from_str(raw).map_err(|e| MapObjectsSchemaError::ParseJson {
+        message: e.to_string(),
+        sample: sample_of(raw),
+    })?;
+
+    let messages: Option<Vec<String>> = {
+        let result = schema.validate(&value);
+        result
+            .err()
+            .map(|errors| errors.map(|e| format!("{e}")).collect())
+    };
+
+    if let Some(messages) = messages {
+        return Err(MapObjectsSchemaError::Invalid {
+            messages,
+            sample: sample_of(raw),
+        });
+    }
+    Ok(value)
+}
+
+const SAMPLE_CAP: usize = 1024;
+
+/// Cap `raw` at `SAMPLE_CAP` bytes, backing off to a char boundary so the
+/// slice cannot panic on multi-byte UTF-8 input.
+fn sample_of(raw: &str) -> String {
+    if raw.len() <= SAMPLE_CAP {
+        return raw.to_owned();
+    }
+    let end = (0..=SAMPLE_CAP).rev().find(|&i| raw.is_char_boundary(i));
+    format!("{} …", &raw[..end.unwrap_or(0)])
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum MapObjectsSchemaError {
+    #[error("response was not valid JSON: {message}")]
+    ParseJson { message: String, sample: String },
+    #[error("schema validation failed: {}", messages.join("; "))]
+    Invalid {
+        messages: Vec<String>,
+        sample: String,
+    },
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn good_bundle() -> String {
+        serde_json::json!({
+            "schema_version": "1.0.0",
+            "mission_id": "M-bundle",
+            "bbox": [
+                { "latitude": 49.5, "longitude": 31.0, "altitude_m": 0.0 },
+                { "latitude": 49.0, "longitude": 31.5, "altitude_m": 0.0 }
+            ],
+            "map_objects": [],
+            "observations": [],
+            "ignored_items": [],
+            "as_of": "2026-05-19T12:00:00Z"
+        })
+        .to_string()
+    }
+
+    #[test]
+    fn good_bundle_validates() {
+        // Act
+        let r = validate_bundle(&good_bundle());
+
+        // Assert
+        assert!(r.is_ok(), "validation failed: {:?}", r.err());
+    }
+
+    #[test]
+    fn bundle_missing_required_field_fails() {
+        // Arrange
+        let bad = good_bundle().replace("\"mission_id\"", "\"mission_oops\"");
+
+        // Act
+        let r = validate_bundle(&bad);
+
+        // Assert
+        assert!(matches!(r, Err(MapObjectsSchemaError::Invalid { .. })));
+    }
+
+    #[test]
+    fn observations_push_validates() {
+        // Arrange
+        let body = serde_json::json!({
+            "mission_id": "M1",
+            "observations": []
+        })
+        .to_string();
+
+        // Act
+        let r = validate_observations_push(&body);
+
+        // Assert
+        assert!(r.is_ok());
+    }
+
+    #[test]
+    fn observations_push_requires_mission_id() {
+        // Arrange
+        let body = serde_json::json!({ "observations": [] }).to_string();
+
+        // Act
+        let r = validate_observations_push(&body);
+
+        // Assert
+        assert!(matches!(r, Err(MapObjectsSchemaError::Invalid { .. })));
+    }
+
+    #[test]
+    fn ignored_push_validates() {
+        // Arrange
+        let body = serde_json::json!({
+            "mission_id": "M1",
+            "ignored_items": []
+        })
+        .to_string();
+
+        // Act
+        let r = validate_ignored_push(&body);
+
+        // Assert
+        assert!(r.is_ok());
+    }
+}
diff --git a/crates/mission_client/src/internal/schema/mission.rs b/crates/mission_client/src/internal/schema/mission.rs
new file mode 100644
index 0000000..9f868a7
--- /dev/null
+++ b/crates/mission_client/src/internal/schema/mission.rs
@@ -0,0 +1,116 @@
+//! Mission JSON-schema validation (AZ-644).
+//!
+//! Bundled copy of `shared/contracts/mission-schema.json` is compiled into the
+//! binary via `include_str!`. The shared file is the wire contract co-owned
+//! with the external `missions` repo (see `architecture.md §8 Q5`).
+
+use std::sync::OnceLock;
+
+use jsonschema::JSONSchema;
+use serde_json::Value;
+
+pub const SCHEMA_BYTES: &str = include_str!("../../../../shared/contracts/mission-schema.json");
+
+fn compiled() -> &'static JSONSchema {
+    static SCHEMA: OnceLock<JSONSchema> = OnceLock::new();
+    SCHEMA.get_or_init(|| {
+        let schema_value: Value = serde_json::from_str(SCHEMA_BYTES)
+            .expect("bundled mission-schema.json must be valid JSON at compile time");
+        JSONSchema::options()
+            .compile(&schema_value)
+            .expect("bundled mission-schema.json must compile as JSON Schema")
+    })
+}
+
+/// Validate raw JSON bytes against the bundled mission schema. Returns the
+/// parsed `Value` on success so callers can re-deserialise without re-parsing.
+pub fn validate(raw: &str) -> Result<Value, SchemaError> {
+    let value: Value = serde_json::from_str(raw).map_err(|e| SchemaError::ParseJson {
+        message: e.to_string(),
+        sample: sample_of(raw),
+    })?;
+
+    let messages: Option<Vec<String>> = {
+        let result = compiled().validate(&value);
+        result
+            .err()
+            .map(|errors| errors.map(|e| format!("{e}")).collect())
+    };
+
+    if let Some(messages) = messages {
+        return Err(SchemaError::Invalid {
+            messages,
+            sample: sample_of(raw),
+        });
+    }
+    Ok(value)
+}
+
+const SAMPLE_CAP: usize = 1024;
+
+/// Cap `raw` at `SAMPLE_CAP` bytes, backing off to a char boundary so the
+/// slice cannot panic on multi-byte UTF-8 input.
+fn sample_of(raw: &str) -> String {
+    if raw.len() <= SAMPLE_CAP {
+        return raw.to_owned();
+    }
+    let end = (0..=SAMPLE_CAP).rev().find(|&i| raw.is_char_boundary(i));
+    format!("{} …", &raw[..end.unwrap_or(0)])
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum SchemaError {
+    #[error("response was not valid JSON: {message}")]
+    ParseJson { message: String, sample: String },
+    #[error("response failed schema validation: {}", messages.join("; "))]
+    Invalid {
+        messages: Vec<String>,
+        sample: String,
+    },
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const GOOD: &str = r#"{
+        "mission_id": "11111111-2222-3333-4444-555555555555",
+        "schema_version": "1.0.0",
+        "items": [
+            { "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", "kind": "waypoint",
+              "at": { "latitude": 49.1, "longitude": 31.2, "altitude_m": 100.0 } }
+        ],
+        "geofences": [],
+        "return_point": { "latitude": 49.0, "longitude": 31.0, "altitude_m": 0.0 }
+    }"#;
+
+    #[test]
+    fn good_mission_validates() {
+        // Act
+        let r = validate(GOOD);
+
+        // Assert
+        assert!(r.is_ok(), "validation failed: {:?}", r.err());
+    }
+
+    #[test]
+    fn missing_required_field_fails() {
+        // Arrange
+        let bad = GOOD.replace("\"mission_id\"", "\"mission_oops\"");
+
+        // Act
+        let r = validate(&bad);
+
+        // Assert
+        assert!(matches!(r, Err(SchemaError::Invalid { .. })));
+    }
+
+    #[test]
+    fn malformed_json_fails() {
+        // Act
+        let r = validate("{ not json");
+
+        // Assert
+        assert!(matches!(r, Err(SchemaError::ParseJson { .. })));
+    }
+}
diff --git a/crates/mission_client/src/internal/schema/mod.rs b/crates/mission_client/src/internal/schema/mod.rs
index bd57098..6a7137d 100644
--- a/crates/mission_client/src/internal/schema/mod.rs
+++ b/crates/mission_client/src/internal/schema/mod.rs
@@ -1,119 +1,8 @@
-//! Mission JSON-schema validation.
+//! JSON-schema validators for every wire surface the `mission_client` speaks.
 //!
-//! Bundled copy of `shared/contracts/mission-schema.json` is compiled into the
-//! binary via `include_str!`. The shared file is the wire contract co-owned
-//! with the external `missions` repo (see `architecture.md §8 Q5`).
+//! Each sub-module owns one schema and its `validate` entry point. The schema
+//! bytes are compiled into the binary via `include_str!` so the validator can
+//! never disagree with a stale copy on disk.
 
-use std::sync::OnceLock;
-
-use jsonschema::JSONSchema;
-use serde_json::Value;
-
-/// Bundled schema content (canonical wire contract).
-pub const SCHEMA_BYTES: &str = include_str!("../../../../shared/contracts/mission-schema.json");
-
-fn compiled() -> &'static JSONSchema {
-    static SCHEMA: OnceLock<JSONSchema> = OnceLock::new();
-    SCHEMA.get_or_init(|| {
-        let schema_value: Value = serde_json::from_str(SCHEMA_BYTES)
-            .expect("bundled mission-schema.json must be valid JSON at compile time");
-        JSONSchema::options()
-            .compile(&schema_value)
-            .expect("bundled mission-schema.json must compile as JSON Schema")
-    })
-}
-
-/// Validate raw JSON bytes against the bundled schema.
-///
-/// Returns the parsed JSON `Value` on success so callers can re-deserialise
-/// it into the typed `Mission` without re-parsing.
-pub fn validate(raw: &str) -> Result { - let value: Value = serde_json::from_str(raw).map_err(|e| SchemaError::ParseJson { - message: e.to_string(), - sample: sample_of(raw), - })?; - - let messages: Option> = { - let result = compiled().validate(&value); - result - .err() - .map(|errors| errors.map(|e| format!("{e}")).collect()) - }; - - if let Some(messages) = messages { - return Err(SchemaError::Invalid { - messages, - sample: sample_of(raw), - }); - } - Ok(value) -} - -const SAMPLE_CAP: usize = 1024; - -fn sample_of(raw: &str) -> String { - if raw.len() <= SAMPLE_CAP { - raw.to_owned() - } else { - let mut s = raw[..SAMPLE_CAP].to_owned(); - s.push_str(" …"); - s - } -} - -#[derive(Debug, thiserror::Error)] -pub enum SchemaError { - #[error("response was not valid JSON: {message}")] - ParseJson { message: String, sample: String }, - #[error("response failed schema validation: {}", messages.join("; "))] - Invalid { - messages: Vec, - sample: String, - }, -} - -#[cfg(test)] -mod tests { - use super::*; - - const GOOD: &str = r#"{ - "mission_id": "11111111-2222-3333-4444-555555555555", - "schema_version": "1.0.0", - "items": [ - { "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", "kind": "waypoint", - "at": { "latitude": 49.1, "longitude": 31.2, "altitude_m": 100.0 } } - ], - "geofences": [], - "return_point": { "latitude": 49.0, "longitude": 31.0, "altitude_m": 0.0 } - }"#; - - #[test] - fn good_mission_validates() { - // Act - let r = validate(GOOD); - - // Assert - assert!(r.is_ok(), "validation failed: {:?}", r.err()); - } - - #[test] - fn missing_required_field_fails() { - // Arrange - let bad = GOOD.replace("\"mission_id\"", "\"mission_oops\""); - - // Act - let r = validate(&bad); - - // Assert - assert!(matches!(r, Err(SchemaError::Invalid { .. }))); - } - - #[test] - fn malformed_json_fails() { - // Act - let r = validate("{ not json"); - - // Assert - assert!(matches!(r, Err(SchemaError::ParseJson { .. 
}))); - } -} +pub mod mapobjects; +pub mod mission; diff --git a/crates/mission_client/src/lib.rs b/crates/mission_client/src/lib.rs index e476783..b0e22c2 100644 --- a/crates/mission_client/src/lib.rs +++ b/crates/mission_client/src/lib.rs @@ -1,29 +1,38 @@ //! `mission_client` — REST client to the external `missions` API. //! //! Public surface (per `module-layout.md`): [`MissionClient`], -//! [`MissionClientHandle`], the typed [`Mission`] DTO, [`FetchError`], and -//! [`MissionClientOptions`]. +//! [`MissionClientHandle`], the typed [`Mission`] DTO, [`FetchError`], +//! [`PostError`], [`PullError`], [`PushReport`], [`PerEndpointStatus`], +//! [`MapObjectsDiff`], [`MissionUpdateAck`], and [`MissionClientOptions`]. //! -//! Real implementation tasks: AZ-644 (pull + schema, this file), AZ-645 -//! (middle-waypoint POST), AZ-646 (mapobjects pull), AZ-647 (mapobjects push). +//! Real implementation tasks: +//! - AZ-644 — pull + schema (initial scaffold + GET /missions/{id}) +//! - AZ-645 — POST /missions/{id}/middle-waypoint +//! - AZ-646 — GET /missions/{id}/mapobjects (bundle pre-flight pull) +//! - AZ-647 — POST observations + ignored with durable disk queue and +//! crash-recovery replay. 
mod internal; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use serde::{Deserialize, Serialize}; -use shared::error::{AutopilotError, Result}; -use shared::health::ComponentHealth; -use shared::models::mapobject::MapObjectsBundle; +use shared::error::AutopilotError; +use shared::health::{ComponentHealth, HealthLevel}; +use shared::models::mapobject::{IgnoredItem, MapObjectObservation, MapObjectsBundle}; use shared::models::mission::{Coordinate, Geofence, MissionItem}; use uuid::Uuid; -use internal::missions_api::HttpClient; +use internal::mapobjects_sync::{pull as mapobjects_pull, push as mapobjects_push}; +use internal::missions_api::{HttpClient, RawHttpError}; const NAME: &str = "mission_client"; +// ─── Public DTOs ────────────────────────────────────────────────────────────── + /// Mission DTO returned by `pull_mission`. Shape matches the JSON wire schema /// in `shared/contracts/mission-schema.json`. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -35,7 +44,32 @@ pub struct Mission { pub return_point: Coordinate, } -/// Errors surfaced by `MissionClientHandle::pull_mission`. +/// Ack returned by [`MissionClientHandle::post_middle_waypoint`] on a +/// successful POST. The shape is intentionally permissive — only `mission_id` +/// is required; servers may carry additional bookkeeping like `revision` or +/// `accepted_at` without breaking the client. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MissionUpdateAck { + pub mission_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub revision: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub accepted_at: Option, +} + +/// Pass diff handed to [`MissionClientHandle::push_mapobjects_diff`]. 
+/// `mapobjects_store` (AZ-665/AZ-666/AZ-667) owns the construction of this +/// struct from `pending_observations` + `pending_ignored`; the client owns +/// the wire format and the durable queue. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct MapObjectsDiff { + pub observations: Vec, + pub ignored_items: Vec, +} + +// ─── Errors ─────────────────────────────────────────────────────────────────── + +/// Errors surfaced by [`MissionClientHandle::pull_mission`]. #[derive(Debug, thiserror::Error)] pub enum FetchError { /// JSON body did not match the bundled `mission-schema`. Includes a @@ -71,17 +105,166 @@ impl From for AutopilotError { } } -/// Tunables for the missions-API client. AZ-644 §NFR defaults: 5 attempts, -/// 200 ms base / 5 s cap, 5 s startup-fetch budget. +/// Errors surfaced by [`MissionClientHandle::post_middle_waypoint`]. +#[derive(Debug, thiserror::Error)] +pub enum PostError { + /// 4xx (excluding 429) — the server rejected the payload; retrying will + /// not help. + #[error("permanent post failure: {0}")] + Permanent(String), + /// 5xx / timeout / connect-error budget exhausted. + #[error("max retries exceeded after {attempts} attempts: {last_reason}")] + MaxRetriesExceeded { attempts: u32, last_reason: String }, + /// Ack body did not deserialise into [`MissionUpdateAck`]. + #[error("malformed ack: {0}")] + MalformedAck(String), + /// Local bug. + #[error("internal error: {0}")] + Internal(String), +} + +impl From for AutopilotError { + fn from(e: PostError) -> Self { + match e { + PostError::Permanent(s) => AutopilotError::Network(s), + PostError::MaxRetriesExceeded { + attempts, + last_reason, + } => AutopilotError::Network(format!( + "max retries exceeded after {attempts} attempts: {last_reason}" + )), + PostError::MalformedAck(s) => AutopilotError::Protocol(s), + PostError::Internal(s) => AutopilotError::Internal(s), + } + } +} + +/// Errors surfaced by [`MissionClientHandle::pull_mapobjects`]. 
+#[derive(Debug, thiserror::Error)] +pub enum PullError { + /// API is unreachable (timeout, connect-refused, transient outage) AND + /// retries did not recover — but the failure could not be classified as + /// strictly permanent. + #[error("mapobjects api unreachable: {0}")] + Unreachable(String), + /// Response was 200 but the body did not validate against + /// `shared/contracts/mapobjects-bundle.json`. + #[error("mapobjects bundle schema invalid: {}", messages.join("; "))] + SchemaInvalid { + messages: Vec, + sample: String, + }, + /// Retried up to `max_attempts` without success. + #[error("max retries exceeded after {attempts} attempts: {last_reason}")] + MaxRetriesExceeded { attempts: u32, last_reason: String }, + /// Local bug. + #[error("internal error: {0}")] + Internal(String), +} + +/// Per-endpoint outcome in a [`PushReport`]. The push function always returns +/// a complete report — never `Result` — because partial success is a +/// first-class outcome (per AC-2 of AZ-647). +#[derive(Debug, Clone)] +pub enum PerEndpointStatus { + /// 2xx within the retry budget. Disk file has been updated to remove this + /// endpoint's payload. + Success, + /// 4xx (excluding 429). Retrying will not help. + Permanent { reason: String }, + /// Retry budget exhausted; the payload remains on disk for manual replay. + MaxRetriesExceeded { attempts: u32, last_reason: String }, +} + +impl PerEndpointStatus { + pub fn is_success(&self) -> bool { + matches!(self, Self::Success) + } +} + +/// Result of a single [`MissionClientHandle::push_mapobjects_diff`] call — +/// per-endpoint outcomes plus the mission id for cross-referencing with the +/// disk queue. +#[derive(Debug, Clone)] +pub struct PushReport { + pub mission_id: String, + pub observations: PerEndpointStatus, + pub ignored: PerEndpointStatus, +} + +impl PushReport { + /// Overall sync state — `Synced` only when BOTH endpoints succeeded. 
+ pub fn sync_state(&self) -> SyncState { + match (&self.observations, &self.ignored) { + (PerEndpointStatus::Success, PerEndpointStatus::Success) => SyncState::Synced, + _ => SyncState::Degraded, + } + } + + /// Constructed before the network call ever happens — the local diff did + /// not pass schema validation. Both endpoints share the same reason. + fn local_invalid(reason: String) -> Self { + Self { + mission_id: String::new(), + observations: PerEndpointStatus::Permanent { + reason: reason.clone(), + }, + ignored: PerEndpointStatus::Permanent { reason }, + } + } + + /// Write-ahead disk write failed — there is no point attempting the + /// network call because crash recovery would be broken anyway. + fn disk_failure(reason: String) -> Self { + let r = format!("write-ahead persistence failed: {reason}"); + Self { + mission_id: String::new(), + observations: PerEndpointStatus::Permanent { reason: r.clone() }, + ignored: PerEndpointStatus::Permanent { reason: r }, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SyncState { + Synced, + Degraded, +} + +impl SyncState { + fn label(&self) -> &'static str { + match self { + Self::Synced => "synced", + Self::Degraded => "degraded", + } + } +} + +// ─── Options ────────────────────────────────────────────────────────────────── + +/// Tunables for the missions-API client. Defaults are tuned for the +/// production single-airframe deployment per the AZ-644/645/646/647 NFRs. #[derive(Debug, Clone)] pub struct MissionClientOptions { pub endpoint: String, pub bearer_token: Option, + /// GET retry budget (AZ-644 pull, AZ-646 mapobjects pull). 5 attempts / + /// 200 ms base / 5 s cap per the AZ-644 §NFR table. pub max_attempts: u32, + /// POST retry budget for the middle-waypoint endpoint (AZ-645). 3 attempts + /// per the AZ-645 §Outcome bullet. + pub post_max_attempts: u32, + /// POST retry budget per endpoint for the post-flight push (AZ-647). 
High + /// by default — at the 1 s base / 1 h cap below this is ≈ 24 h. + pub push_max_attempts: u32, pub backoff_base: Duration, pub backoff_cap: Duration, pub request_timeout: Duration, pub connect_timeout: Duration, + /// Where the durable mapobjects-push queue is rooted (AZ-647). The full + /// path is `${state_dir}/mapobjects_push/.json`. Defaults to + /// `./state` if the caller does not override. + pub state_dir: PathBuf, } impl MissionClientOptions { @@ -90,20 +273,40 @@ impl MissionClientOptions { endpoint: endpoint.into(), bearer_token: None, max_attempts: 5, + post_max_attempts: 3, + push_max_attempts: 24, backoff_base: Duration::from_millis(200), backoff_cap: Duration::from_secs(5), request_timeout: Duration::from_secs(5), connect_timeout: Duration::from_secs(2), + state_dir: PathBuf::from("state"), } } } +// ─── Internal state ─────────────────────────────────────────────────────────── + #[derive(Debug, Default)] struct ClientState { + // AZ-644 last_fetch_unix_s: AtomicU64, fetch_errors_total: AtomicU64, - last_schema_version: std::sync::Mutex>, - last_connection_state: std::sync::Mutex, + last_schema_version: Mutex>, + last_connection_state: Mutex, + + // AZ-645 + last_middle_waypoint_post_status: Mutex, + + // AZ-646 + mapobjects_pull_state: Mutex, + last_mapobjects_pull_unix_s: AtomicU64, + + // AZ-647 + mapobjects_push_pending: AtomicBool, + last_push_unix_s: AtomicU64, + last_observations_push_error: Mutex>, + last_ignored_push_error: Mutex>, + last_push_sync_state: Mutex>, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] @@ -124,8 +327,44 @@ impl ConnectionState { } } -/// Public client. Build once at startup and pass the [`MissionClientHandle`] -/// to other components. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +enum EndpointHealth { + #[default] + Unknown, + Ok, + Error, +} + +impl EndpointHealth { + fn label(&self) -> &'static str { + match self { + Self::Unknown => "unknown", + Self::Ok => "ok", + Self::Error => "error", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +enum PullHealth { + #[default] + Unknown, + Synced, + Failed, +} + +impl PullHealth { + fn label(&self) -> &'static str { + match self { + Self::Unknown => "unknown", + Self::Synced => "synced", + Self::Failed => "failed", + } + } +} + +// ─── Public client ──────────────────────────────────────────────────────────── + #[derive(Debug)] pub struct MissionClient { options: MissionClientOptions, @@ -136,6 +375,16 @@ pub struct MissionClient { impl MissionClient { pub fn new(options: MissionClientOptions) -> std::result::Result { let http = HttpClient::new(&options)?; + if let Err(e) = internal::mapobjects_sync::queue::ensure_dir(&options.state_dir) { + // Failure here is non-fatal at construction — it would only break + // AZ-647. Surface a warning and let the caller hit the disk error + // path if/when push is invoked. + tracing::warn!( + component = "mission_client", + error = %e, + "could not ensure state_dir/mapobjects_push at startup" + ); + } Ok(Self { options, http, @@ -161,6 +410,8 @@ pub struct MissionClientHandle { } impl MissionClientHandle { + // ───── AZ-644 ───────────────────────────────────────────────────────────── + /// Fetch + validate a mission by id. Implements bounded exponential /// backoff and rejects schema-invalid responses without a silent downcast. 
pub async fn pull_mission(&self, mission_id: &str) -> std::result::Result { @@ -186,24 +437,138 @@ impl MissionClientHandle { } } - pub async fn post_middle_waypoint(&self, _mission_id: &str, _at: Coordinate) -> Result<()> { - Err(AutopilotError::NotImplemented( - "mission_client::post_middle_waypoint (AZ-645)", - )) + // ───── AZ-645 ───────────────────────────────────────────────────────────── + + /// POST the patched mission (operator-confirmed middle waypoint inserted) + /// to `POST /missions/{id}/middle-waypoint`. Bounded retry; surfaces a + /// typed error so `mission_executor` can decide whether to halt, RTL, or + /// continue with the in-memory mission. + pub async fn post_middle_waypoint( + &self, + mission_id: &str, + patched: &Mission, + ) -> std::result::Result { + let body = serde_json::to_value(patched) + .map_err(|e| PostError::Internal(format!("serialise patched mission: {e}")))?; + + let res = self + .http + .post_middle_waypoint_raw(mission_id, &body, &self.options) + .await; + match res { + Ok(raw) => { + let ack: MissionUpdateAck = parse_ack(&raw, mission_id)?; + *self.state.last_middle_waypoint_post_status.lock().unwrap() = EndpointHealth::Ok; + Ok(ack) + } + Err(RawHttpError::Permanent(reason)) => { + *self.state.last_middle_waypoint_post_status.lock().unwrap() = + EndpointHealth::Error; + Err(PostError::Permanent(reason)) + } + Err(RawHttpError::MaxRetries { + attempts, + last_reason, + }) => { + *self.state.last_middle_waypoint_post_status.lock().unwrap() = + EndpointHealth::Error; + Err(PostError::MaxRetriesExceeded { + attempts, + last_reason, + }) + } + Err(RawHttpError::Transient(reason)) => { + *self.state.last_middle_waypoint_post_status.lock().unwrap() = + EndpointHealth::Error; + Err(PostError::Permanent(reason)) + } + } } - pub async fn pull_mapobjects(&self, _mission_id: &str) -> Result { - Err(AutopilotError::NotImplemented( - "mission_client::pull_mapobjects (AZ-646)", - )) + // ───── AZ-646 
───────────────────────────────────────────────────────────── + + /// Pre-flight GET of the MapObjects bundle for a mission. Schema-validates + /// before deserialise; on any failure the typed [`PullError`] is surfaced + /// to `mission_executor`'s F9 BIT path — never silent. + pub async fn pull_mapobjects( + &self, + mission_id: &str, + ) -> std::result::Result { + let res = mapobjects_pull::pull(&self.http, mission_id, &self.options).await; + match &res { + Ok(_) => { + *self.state.mapobjects_pull_state.lock().unwrap() = PullHealth::Synced; + self.state + .last_mapobjects_pull_unix_s + .store(now_unix_s(), Ordering::Relaxed); + } + Err(_) => { + *self.state.mapobjects_pull_state.lock().unwrap() = PullHealth::Failed; + } + } + res } - pub async fn push_mapobjects(&self, _bundle: MapObjectsBundle) -> Result<()> { - Err(AutopilotError::NotImplemented( - "mission_client::push_mapobjects (AZ-647)", - )) + // ───── AZ-647 ───────────────────────────────────────────────────────────── + + /// Post-flight push of the pass diff. The diff is persisted to disk + /// BEFORE the network call (write-ahead) and per-endpoint retry runs + /// independently. Partial success is reported through the [`PushReport`] + /// — the successful endpoint's payload is removed from disk while the + /// failing endpoint's payload remains for manual or crash-recovery replay. + pub async fn push_mapobjects_diff(&self, mission_id: &str, diff: MapObjectsDiff) -> PushReport { + self.state + .mapobjects_push_pending + .store(true, Ordering::Relaxed); + let mut report = mapobjects_push::push( + &self.http, + &self.options.state_dir, + mission_id, + diff, + &self.options, + ) + .await; + if report.mission_id.is_empty() { + // local_invalid / disk_failure paths leave it blank; fill in for + // observability. 
+ report.mission_id = mission_id.to_owned(); + } + let sync = report.sync_state(); + let still_pending = sync == SyncState::Degraded; + self.state + .mapobjects_push_pending + .store(still_pending, Ordering::Relaxed); + self.state + .last_push_unix_s + .store(now_unix_s(), Ordering::Relaxed); + *self.state.last_push_sync_state.lock().unwrap() = Some(sync); + *self.state.last_observations_push_error.lock().unwrap() = + endpoint_error(&report.observations); + *self.state.last_ignored_push_error.lock().unwrap() = endpoint_error(&report.ignored); + report } + /// Crash-recovery sweep. On startup the caller (mission_executor BIT) + /// MUST call this before starting BIT for any new mission so that any + /// pending diff from a previously terminated mission is drained first. + pub async fn recover_pending_pushes(&self) -> Vec { + let reports = + mapobjects_push::recover_pending(&self.http, &self.options.state_dir, &self.options) + .await; + // After recovery, re-check whether anything is still pending on disk + // (could still be — a 24 h-budget endpoint that exhausted). 
+ let still_pending = !reports.is_empty() + && reports + .iter() + .any(|r| r.sync_state() == SyncState::Degraded); + self.state + .mapobjects_push_pending + .store(still_pending, Ordering::Relaxed); + reports + } + + // ───── Health ───────────────────────────────────────────────────────────── + pub fn health(&self) -> ComponentHealth { let conn = *self.state.last_connection_state.lock().unwrap(); let last_fetch = self.state.last_fetch_unix_s.load(Ordering::Relaxed); @@ -215,25 +580,107 @@ impl MissionClientHandle { .unwrap() .clone() .unwrap_or_else(|| "none".to_owned()); + let mid_wp = *self.state.last_middle_waypoint_post_status.lock().unwrap(); + let pull_state = *self.state.mapobjects_pull_state.lock().unwrap(); + let last_pull = self + .state + .last_mapobjects_pull_unix_s + .load(Ordering::Relaxed); + let push_pending = self.state.mapobjects_push_pending.load(Ordering::Relaxed); + let last_push = self.state.last_push_unix_s.load(Ordering::Relaxed); + let sync = *self.state.last_push_sync_state.lock().unwrap(); + let detail = format!( - "last_fetch_ts={} fetch_errors_total={} schema_version={} connection_state={}", - if last_fetch == 0 { - "none".into() - } else { - last_fetch.to_string() - }, + "last_fetch_ts={} fetch_errors_total={} schema_version={} connection_state={} \ + last_middle_waypoint_post_status={} \ + mapobjects_pull_state={} last_mapobjects_pull_ts={} \ + mapobjects_push_pending={} last_push_ts={} push_sync_state={}", + unix_or_none(last_fetch), errors, schema_version, conn.label(), + mid_wp.label(), + pull_state.label(), + unix_or_none(last_pull), + push_pending, + unix_or_none(last_push), + sync.map(|s| s.label()).unwrap_or("unknown"), ); - match conn { - ConnectionState::Ok => ComponentHealth::green(NAME), - ConnectionState::Error => ComponentHealth::red(NAME, detail), - ConnectionState::Unknown => ComponentHealth::yellow(NAME, detail), + + // Always populate detail — even on Green — because the AZ-645/646/647 + // task specs require named 
health fields (`last_middle_waypoint_post_status`, + // `mapobjects_pull_state`, `mapobjects_push_pending`, ...) to be + // observable through `/health`, not only when the system is degraded. + let level = aggregate_level(conn, mid_wp, pull_state, push_pending, sync); + ComponentHealth { + level, + component: NAME, + detail: Some(detail), } } } +fn parse_ack(body: &str, mission_id: &str) -> std::result::Result { + if body.trim().is_empty() { + // Some servers return 204; synthesise a minimal ack so the caller + // does not have to disambiguate. + return Ok(MissionUpdateAck { + mission_id: mission_id.to_owned(), + revision: None, + accepted_at: None, + }); + } + serde_json::from_str(body).map_err(|e| PostError::MalformedAck(e.to_string())) +} + +fn endpoint_error(s: &PerEndpointStatus) -> Option { + match s { + PerEndpointStatus::Success => None, + PerEndpointStatus::Permanent { reason } => Some(format!("permanent: {reason}")), + PerEndpointStatus::MaxRetriesExceeded { + attempts, + last_reason, + } => Some(format!("max_retries({attempts}) exceeded: {last_reason}")), + } +} + +fn aggregate_level( + conn: ConnectionState, + mid_wp: EndpointHealth, + pull_state: PullHealth, + push_pending: bool, + sync: Option, +) -> HealthLevel { + let any_red = matches!(conn, ConnectionState::Error) + || matches!(mid_wp, EndpointHealth::Error) + || matches!(pull_state, PullHealth::Failed) + || matches!(sync, Some(SyncState::Degraded)) + || push_pending; + if any_red { + return HealthLevel::Red; + } + let any_unknown = matches!(conn, ConnectionState::Unknown) + && matches!(mid_wp, EndpointHealth::Unknown) + && matches!(pull_state, PullHealth::Unknown) + && sync.is_none() + && !push_pending; + // If everything is at default Unknown and there's been no traffic, surface + // Yellow so callers can tell the difference between "never used" and + // "everything green". 
+ if any_unknown { + return HealthLevel::Yellow; + } + HealthLevel::Green +} + +fn unix_or_none(ts: u64) -> String { + if ts == 0 { + "none".into() + } else { + ts.to_string() + } +} + fn now_unix_s() -> u64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -247,10 +694,53 @@ mod tests { #[test] fn fetch_error_maps_to_autopilot_error() { + // Act let e: AutopilotError = FetchError::Permanent("boom".into()).into(); + + // Assert match e { AutopilotError::Network(s) => assert!(s.contains("boom")), other => panic!("expected Network, got {other:?}"), } } + + #[test] + fn post_error_max_retries_maps_to_network() { + // Act + let e: AutopilotError = PostError::MaxRetriesExceeded { + attempts: 3, + last_reason: "http 500".into(), + } + .into(); + + // Assert + match e { + AutopilotError::Network(s) => { + assert!(s.contains("max retries exceeded after 3 attempts")); + assert!(s.contains("http 500")); + } + other => panic!("expected Network, got {other:?}"), + } + } + + #[test] + fn push_report_sync_state_requires_both_endpoints() { + // Arrange + let only_obs = PushReport { + mission_id: "M1".into(), + observations: PerEndpointStatus::Success, + ignored: PerEndpointStatus::Permanent { + reason: "503 budget".into(), + }, + }; + let both = PushReport { + mission_id: "M1".into(), + observations: PerEndpointStatus::Success, + ignored: PerEndpointStatus::Success, + }; + + // Assert + assert_eq!(only_obs.sync_state(), SyncState::Degraded); + assert_eq!(both.sync_state(), SyncState::Synced); + } } diff --git a/crates/mission_client/tests/mapobjects_pull.rs b/crates/mission_client/tests/mapobjects_pull.rs new file mode 100644 index 0000000..8d10633 --- /dev/null +++ b/crates/mission_client/tests/mapobjects_pull.rs @@ -0,0 +1,217 @@ +//! AZ-646 integration tests driven by `wiremock`. +//! +//! Coverage: +//! - AC-1: happy-path GET returns `Ok(MapObjectsBundle)`; health +//! `mapobjects_pull_state` is `synced`. +//! 
- AC-2: schema-invalid bundle (missing required field) returns +//! `Err(SchemaInvalid)` — no silent acceptance. +//! - AC-3: unreachable server (a port that refuses connections) returns +//! `Err(Unreachable)` / `MaxRetriesExceeded`; health goes Red. +//! - AC-4: a fixture sized for a 30 km × 30 km mission area validates and +//! deserialises within the 30 s budget on loopback. We use 1 000 objects + +//! 1 000 ignored items as the proxy — a real 30×30 km mission is bounded by +//! this order of magnitude per the AZ-666 ignored-cap NFR. + +use std::time::{Duration, Instant}; + +use chrono::{TimeZone, Utc}; +use mission_client::{MissionClient, MissionClientOptions, PullError}; +use shared::health::HealthLevel; +use uuid::Uuid; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +fn options_for( + endpoint: &str, + max_attempts: u32, + tmp_dir: &std::path::Path, +) -> MissionClientOptions { + let mut o = MissionClientOptions::new(endpoint); + o.max_attempts = max_attempts; + o.backoff_base = Duration::from_millis(10); + o.backoff_cap = Duration::from_millis(50); + o.request_timeout = Duration::from_secs(2); + o.connect_timeout = Duration::from_secs(1); + o.state_dir = tmp_dir.to_path_buf(); + o +} + +fn good_bundle(mission_id: &str, objects: usize, ignored: usize) -> serde_json::Value { + let map_objects: Vec<_> = (0..objects) + .map(|i| { + serde_json::json!({ + "h3_cell": 10_000 + i as u64, + "mgrs_key": format!("MGRS-{i}"), + "class": "tank", + "class_group": "armor", + "gps_lat": 49.0 + (i as f64) * 0.001, + "gps_lon": 31.0 + (i as f64) * 0.001, + "size_width_m": 3.5, + "size_length_m": 7.0, + "confidence": 0.85, + "first_seen": "2026-05-19T11:00:00Z", + "last_seen": "2026-05-19T11:30:00Z", + "mission_id": mission_id, + "source": "central_pulled", + "pending_upload": false + }) + }) + .collect(); + let ignored_items: Vec<_> = (0..ignored) + .map(|i| { + serde_json::json!({ + "id": Uuid::from_u128(0xdead_0000 + i as 
u128).to_string(), + "mgrs": format!("MGRS-IG-{i}"), + "h3_cell": 99_000 + i as u64, + "class_group": "civilian", + "decline_time": "2026-05-19T11:15:00Z", + "mission_id": mission_id, + "retention_scope": "mission", + "source": "central_pulled", + "pending_upload": false + }) + }) + .collect(); + serde_json::json!({ + "schema_version": "1.0.0", + "mission_id": mission_id, + "bbox": [ + { "latitude": 49.5, "longitude": 31.0, "altitude_m": 0.0 }, + { "latitude": 49.0, "longitude": 31.5, "altitude_m": 0.0 } + ], + "map_objects": map_objects, + "observations": [], + "ignored_items": ignored_items, + "as_of": "2026-05-19T12:00:00Z", + "freshness": "fresh" + }) +} + +#[tokio::test] +async fn ac1_happy_path_pull() { + // Arrange + let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = "M-mo-1"; + Mock::given(method("GET")) + .and(path(format!("/missions/{mission_id}/mapobjects"))) + .respond_with( + ResponseTemplate::new(200).set_body_string(good_bundle(mission_id, 3, 1).to_string()), + ) + .expect(1) + .mount(&mock) + .await; + let client = + MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds"); + let h = client.handle(); + + // Act + let bundle = h.pull_mapobjects(mission_id).await.expect("happy pull"); + + // Assert + assert_eq!(bundle.mission_id, mission_id); + assert_eq!(bundle.map_objects.len(), 3); + assert_eq!(bundle.ignored_items.len(), 1); + let detail = h.health().detail.unwrap_or_default(); + assert!( + detail.contains("mapobjects_pull_state=synced"), + "health detail did not record synced: {detail}" + ); +} + +#[tokio::test] +async fn ac2_schema_invalid_is_rejected() { + // Arrange: 200 OK but the bundle is missing the required `mission_id`. 
+ let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mut bad = good_bundle("M-bad", 1, 0); + let obj = bad.as_object_mut().unwrap(); + obj.remove("mission_id"); + Mock::given(method("GET")) + .and(path("/missions/M-bad/mapobjects")) + .respond_with(ResponseTemplate::new(200).set_body_string(bad.to_string())) + .mount(&mock) + .await; + let client = + MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds"); + let h = client.handle(); + + // Act + let err = h.pull_mapobjects("M-bad").await.unwrap_err(); + + // Assert + match err { + PullError::SchemaInvalid { messages, sample } => { + assert!(messages.iter().any(|m| m.contains("mission_id"))); + assert!(!sample.is_empty()); + } + other => panic!("expected SchemaInvalid, got {other:?}"), + } + assert_eq!(h.health().level, HealthLevel::Red); +} + +#[tokio::test] +async fn ac3_unreachable_surfaces_failure() { + // Arrange: a port the OS will refuse to connect to. + let tmp = tempfile::tempdir().unwrap(); + // Bind a TcpListener to discover a free port, then immediately drop it so + // connect() refuses. + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + drop(listener); + let endpoint = format!("http://{addr}"); + let client = MissionClient::new(options_for(&endpoint, 2, tmp.path())).expect("client builds"); + let h = client.handle(); + + // Act + let err = h.pull_mapobjects("M-unreach").await.unwrap_err(); + + // Assert + match err { + PullError::Unreachable(reason) => { + assert!(!reason.is_empty(), "Unreachable reason should not be empty"); + } + PullError::MaxRetriesExceeded { attempts, .. 
} => { + assert_eq!(attempts, 2); + } + other => panic!("expected Unreachable or MaxRetriesExceeded, got {other:?}"), + } + assert_eq!(h.health().level, HealthLevel::Red); +} + +#[tokio::test] +async fn ac4_large_bundle_within_budget() { + // Arrange: 1 000 map objects + 1 000 ignored items as the proxy for a + // 30 km × 30 km mission area on loopback. + let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = "M-large"; + Mock::given(method("GET")) + .and(path(format!("/missions/{mission_id}/mapobjects"))) + .respond_with( + ResponseTemplate::new(200) + .set_body_string(good_bundle(mission_id, 1_000, 1_000).to_string()), + ) + .mount(&mock) + .await; + let client = + MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds"); + let h = client.handle(); + + // Act + let started = Instant::now(); + let bundle = h.pull_mapobjects(mission_id).await.expect("happy pull"); + let elapsed = started.elapsed(); + + // Assert + assert_eq!(bundle.map_objects.len(), 1000); + assert_eq!(bundle.ignored_items.len(), 1000); + assert!( + elapsed < Duration::from_secs(30), + "pull took {elapsed:?}; budget is 30 s" + ); + // Sanity-touch chrono to silence dead_code on the import; the wallclock + // dates inside the bundle are validated by the bundle schema's + // `format: date-time` constraint. + let _ = Utc.with_ymd_and_hms(2026, 5, 19, 12, 0, 0).unwrap(); +} diff --git a/crates/mission_client/tests/mapobjects_push.rs b/crates/mission_client/tests/mapobjects_push.rs new file mode 100644 index 0000000..43e8389 --- /dev/null +++ b/crates/mission_client/tests/mapobjects_push.rs @@ -0,0 +1,344 @@ +//! AZ-647 integration tests driven by `wiremock`. +//! +//! Coverage: +//! - AC-1: happy-path push — both endpoints 200, disk file cleared, +//! `sync_state = synced`. +//! - AC-2: partial success — `/mapobjects` 200 + `/mapobjects/ignored` 503; +//! the disk file is rewritten to hold ONLY the ignored portion. +//! 
- AC-3: persistent failure — both endpoints 503 across the retry budget; +//! `sync_state = degraded`, disk file remains, manual-replay warning +//! observable on health. +//! - AC-4: crash-recovery push at startup — `recover_pending_pushes` finds +//! the residual file from a previously terminated mission and replays it. +//! - AC-5: 60-min mission proxy push within budget — 5 000 observations +//! pushed in well under 2 min on loopback. + +use std::time::{Duration, Instant}; + +use chrono::{TimeZone, Utc}; +use mission_client::{ + MapObjectsDiff, MissionClient, MissionClientOptions, PerEndpointStatus, SyncState, +}; +use shared::models::mapobject::{ + DiffKind, IgnoredItem, IgnoredItemSource, MapObjectObservation, RetentionScope, +}; +use uuid::Uuid; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +fn options_for( + endpoint: &str, + push_attempts: u32, + tmp_dir: &std::path::Path, +) -> MissionClientOptions { + let mut o = MissionClientOptions::new(endpoint); + o.max_attempts = 3; + o.post_max_attempts = 3; + o.push_max_attempts = push_attempts; + o.backoff_base = Duration::from_millis(5); + o.backoff_cap = Duration::from_millis(20); + o.request_timeout = Duration::from_secs(2); + o.connect_timeout = Duration::from_secs(1); + o.state_dir = tmp_dir.to_path_buf(); + o +} + +fn obs(mission_id: &str, i: u128) -> MapObjectObservation { + MapObjectObservation { + id: Uuid::from_u128(0xa0_00_00 + i), + h3_cell: 1_000 + i as u64, + class: "tank".into(), + class_group: "armor".into(), + mission_id: mission_id.into(), + uav_id: "uav-test".into(), + observed_at_monotonic_ns: 1_000_000 + i as u64, + observed_at_wallclock: Utc.with_ymd_and_hms(2026, 5, 19, 12, 0, 0).unwrap(), + gps_lat: 49.0 + (i as f64) * 1e-4, + gps_lon: 31.0 + (i as f64) * 1e-4, + mgrs: format!("MGRS-{i}"), + size_width_m: 3.0, + size_length_m: 6.0, + confidence: 0.92, + diff_kind: DiffKind::New, + photo_ref: None, + raw_evidence: None, + } +} + +fn 
ignored(mission_id: &str, i: u128) -> IgnoredItem { + IgnoredItem { + id: Uuid::from_u128(0xb0_00_00 + i), + mgrs: format!("MGRS-IG-{i}"), + h3_cell: 90_000 + i as u64, + class_group: "civilian".into(), + decline_time: Utc.with_ymd_and_hms(2026, 5, 19, 12, 0, 0).unwrap(), + operator_id: None, + mission_id: mission_id.into(), + retention_scope: RetentionScope::Mission, + expires_at: None, + source: IgnoredItemSource::LocalAppended, + pending_upload: true, + } +} + +#[tokio::test] +async fn ac1_happy_path_push_clears_disk() { + // Arrange + let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = "M-happy"; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/mapobjects"))) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount(&mock) + .await; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/mapobjects/ignored"))) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount(&mock) + .await; + let client = + MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds"); + let h = client.handle(); + let diff = MapObjectsDiff { + observations: vec![obs(mission_id, 0), obs(mission_id, 1)], + ignored_items: vec![ignored(mission_id, 0)], + }; + + // Act + let report = h.push_mapobjects_diff(mission_id, diff).await; + + // Assert + assert!(matches!(report.observations, PerEndpointStatus::Success)); + assert!(matches!(report.ignored, PerEndpointStatus::Success)); + assert_eq!(report.sync_state(), SyncState::Synced); + let disk_file = tmp + .path() + .join("mapobjects_push") + .join(format!("{mission_id}.json")); + assert!( + !disk_file.exists(), + "disk file should be deleted on full success" + ); +} + +#[tokio::test] +async fn ac2_partial_success_retains_only_failing_endpoint() { + // Arrange: observations → 200, ignored → 503 every time. 
+ let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = "M-partial"; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/mapobjects"))) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .mount(&mock) + .await; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/mapobjects/ignored"))) + .respond_with(ResponseTemplate::new(503)) + .mount(&mock) + .await; + let client = + MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds"); + let h = client.handle(); + let diff = MapObjectsDiff { + observations: vec![obs(mission_id, 10), obs(mission_id, 11)], + ignored_items: vec![ignored(mission_id, 10)], + }; + + // Act + let report = h.push_mapobjects_diff(mission_id, diff).await; + + // Assert + assert!(matches!(report.observations, PerEndpointStatus::Success)); + assert!(matches!( + report.ignored, + PerEndpointStatus::MaxRetriesExceeded { .. } + )); + assert_eq!(report.sync_state(), SyncState::Degraded); + + // Disk file should hold ONLY the ignored portion. + let disk_file = tmp + .path() + .join("mapobjects_push") + .join(format!("{mission_id}.json")); + assert!(disk_file.exists(), "disk file should remain for retry"); + let body: serde_json::Value = + serde_json::from_slice(&std::fs::read(&disk_file).unwrap()).unwrap(); + assert!( + body["observations"].as_array().unwrap().is_empty(), + "observations should be cleared from disk on partial success" + ); + assert_eq!( + body["ignored_items"].as_array().unwrap().len(), + 1, + "ignored payload should remain on disk" + ); +} + +#[tokio::test] +async fn ac3_persistent_failure_marks_degraded_and_keeps_file() { + // Arrange: both endpoints return 503 forever. 
+ let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = "M-degraded"; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/mapobjects"))) + .respond_with(ResponseTemplate::new(503)) + .mount(&mock) + .await; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/mapobjects/ignored"))) + .respond_with(ResponseTemplate::new(503)) + .mount(&mock) + .await; + let client = + MissionClient::new(options_for(&mock.uri(), 2, tmp.path())).expect("client builds"); + let h = client.handle(); + let diff = MapObjectsDiff { + observations: vec![obs(mission_id, 20)], + ignored_items: vec![ignored(mission_id, 20)], + }; + + // Act + let report = h.push_mapobjects_diff(mission_id, diff).await; + + // Assert + assert!(matches!( + report.observations, + PerEndpointStatus::MaxRetriesExceeded { .. } + )); + assert!(matches!( + report.ignored, + PerEndpointStatus::MaxRetriesExceeded { .. } + )); + assert_eq!(report.sync_state(), SyncState::Degraded); + + let detail = h.health().detail.unwrap_or_default(); + assert!( + detail.contains("push_sync_state=degraded"), + "health detail did not record degraded: {detail}" + ); + assert!( + detail.contains("mapobjects_push_pending=true"), + "health detail did not record pending: {detail}" + ); + + let disk_file = tmp + .path() + .join("mapobjects_push") + .join(format!("{mission_id}.json")); + assert!( + disk_file.exists(), + "disk file MUST remain when both endpoints fail" + ); +} + +#[tokio::test] +async fn ac4_crash_recovery_replays_pending_at_startup() { + // Arrange: pre-seed a disk file for a previously terminated mission `M0`, + // then start a fresh MissionClient pointing at a server that accepts both + // endpoints for `M0`. 
+ let tmp = tempfile::tempdir().unwrap(); + let old_mission = "M0"; + let queue_dir = tmp.path().join("mapobjects_push"); + std::fs::create_dir_all(&queue_dir).unwrap(); + let pending_body = serde_json::json!({ + "observations": [ + { + "id": "00000000-0000-0000-0000-000000000001", + "h3_cell": 7, + "class": "tank", + "class_group": "armor", + "mission_id": old_mission, + "uav_id": "uav-test", + "observed_at_monotonic_ns": 1, + "observed_at_wallclock": "2026-05-19T12:00:00Z", + "gps_lat": 49.0, + "gps_lon": 31.0, + "mgrs": "X", + "size_width_m": 3.0, + "size_length_m": 6.0, + "confidence": 0.9, + "diff_kind": "NEW" + } + ], + "ignored_items": [] + }); + std::fs::write( + queue_dir.join(format!("{old_mission}.json")), + pending_body.to_string(), + ) + .unwrap(); + + let mock = MockServer::start().await; + Mock::given(method("POST")) + .and(path(format!("/missions/{old_mission}/mapobjects"))) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount(&mock) + .await; + Mock::given(method("POST")) + .and(path(format!("/missions/{old_mission}/mapobjects/ignored"))) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount(&mock) + .await; + let client = + MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds"); + let h = client.handle(); + + // Act + let reports = h.recover_pending_pushes().await; + + // Assert + assert_eq!(reports.len(), 1); + assert_eq!(reports[0].mission_id, old_mission); + assert_eq!(reports[0].sync_state(), SyncState::Synced); + let disk_file = queue_dir.join(format!("{old_mission}.json")); + assert!( + !disk_file.exists(), + "disk file should be deleted after successful crash-recovery replay" + ); +} + +#[tokio::test] +async fn ac5_large_diff_push_within_budget() { + // Arrange: 5 000 observations + 500 ignored items, both endpoints 200. 
+ let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = "M-large"; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/mapobjects"))) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .mount(&mock) + .await; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/mapobjects/ignored"))) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .mount(&mock) + .await; + let client = + MissionClient::new(options_for(&mock.uri(), 3, tmp.path())).expect("client builds"); + let h = client.handle(); + let observations = (0..5_000u128).map(|i| obs(mission_id, i)).collect(); + let ignored_items = (0..500u128).map(|i| ignored(mission_id, i)).collect(); + let diff = MapObjectsDiff { + observations, + ignored_items, + }; + + // Act + let started = Instant::now(); + let report = h.push_mapobjects_diff(mission_id, diff).await; + let elapsed = started.elapsed(); + + // Assert + assert_eq!(report.sync_state(), SyncState::Synced); + assert!( + elapsed < Duration::from_secs(120), + "push took {elapsed:?}; budget is 2 min" + ); +} diff --git a/crates/mission_client/tests/waypoint_post.rs b/crates/mission_client/tests/waypoint_post.rs new file mode 100644 index 0000000..99907eb --- /dev/null +++ b/crates/mission_client/tests/waypoint_post.rs @@ -0,0 +1,196 @@ +//! AZ-645 integration tests driven by `wiremock`. +//! +//! Coverage: +//! - AC-1: happy-path POST returns `Ok(MissionUpdateAck)` and the call is +//! observable on the server side; health `last_middle_waypoint_post_status` +//! is "ok". +//! - AC-2: a single 503 followed by a 200 succeeds on the second attempt +//! without surfacing the transient failure. +//! - AC-3: a 500-only run exhausts the bounded budget and returns +//! `Err(MaxRetriesExceeded)`; the error is surfaced — not swallowed — and +//! health goes Red. 
+ +use std::time::Duration; + +use mission_client::{MissionClient, MissionClientOptions, PostError}; +use shared::health::HealthLevel; +use shared::models::mission::{Coordinate, MissionItem, MissionItemKind}; +use uuid::Uuid; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +fn patched_mission(mission_id: Uuid) -> mission_client::Mission { + mission_client::Mission { + mission_id, + schema_version: "1.0.0".into(), + items: vec![MissionItem { + id: Uuid::from_u128(0xaaaa_aaaa), + kind: MissionItemKind::Waypoint, + at: Some(Coordinate { + latitude: 49.1, + longitude: 31.2, + altitude_m: 100.0, + }), + region: vec![], + cruise_speed_mps: None, + target_classes: vec![], + }], + geofences: vec![], + return_point: Coordinate { + latitude: 49.0, + longitude: 31.0, + altitude_m: 0.0, + }, + } +} + +fn options_for( + mock: &MockServer, + post_attempts: u32, + tmp_dir: &std::path::Path, +) -> MissionClientOptions { + let mut o = MissionClientOptions::new(mock.uri()); + o.max_attempts = 3; + o.post_max_attempts = post_attempts; + o.backoff_base = Duration::from_millis(10); + o.backoff_cap = Duration::from_millis(50); + o.request_timeout = Duration::from_secs(2); + o.connect_timeout = Duration::from_secs(1); + o.state_dir = tmp_dir.to_path_buf(); + o +} + +#[tokio::test] +async fn ac1_happy_path_post() { + // Arrange + let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = Uuid::from_u128(0x1111_1111); + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/middle-waypoint"))) + .respond_with( + ResponseTemplate::new(200).set_body_string( + serde_json::json!({ + "mission_id": mission_id.to_string(), + "revision": 7 + }) + .to_string(), + ), + ) + .expect(1) + .mount(&mock) + .await; + let client = MissionClient::new(options_for(&mock, 3, tmp.path())).expect("client builds"); + let h = client.handle(); + let patched = patched_mission(mission_id); + + // Act + let ack = h 
+ .post_middle_waypoint(&mission_id.to_string(), &patched) + .await + .expect("happy POST"); + + // Assert + assert_eq!(ack.mission_id, mission_id.to_string()); + assert_eq!(ack.revision, Some(7)); + let detail = h.health().detail.unwrap_or_default(); + assert!( + detail.contains("last_middle_waypoint_post_status=ok"), + "health detail did not record OK: {detail}" + ); +} + +#[tokio::test] +async fn ac2_transient_failure_retries() { + // Arrange + let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = Uuid::from_u128(0x2222_2222); + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/middle-waypoint"))) + .respond_with(ResponseTemplate::new(503)) + .up_to_n_times(1) + .mount(&mock) + .await; + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/middle-waypoint"))) + .respond_with(ResponseTemplate::new(200).set_body_string( + serde_json::json!({ "mission_id": mission_id.to_string() }).to_string(), + )) + .mount(&mock) + .await; + let client = MissionClient::new(options_for(&mock, 3, tmp.path())).expect("client builds"); + let h = client.handle(); + + // Act + let ack = h + .post_middle_waypoint(&mission_id.to_string(), &patched_mission(mission_id)) + .await + .expect("retry succeeds"); + + // Assert + assert_eq!(ack.mission_id, mission_id.to_string()); +} + +#[tokio::test] +async fn ac3_cap_exhaustion_bubbles_error() { + // Arrange + let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = Uuid::from_u128(0x3333_3333); + Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/middle-waypoint"))) + .respond_with(ResponseTemplate::new(500)) + .mount(&mock) + .await; + let client = MissionClient::new(options_for(&mock, 3, tmp.path())).expect("client builds"); + let h = client.handle(); + + // Act + let err = h + .post_middle_waypoint(&mission_id.to_string(), &patched_mission(mission_id)) + .await + .unwrap_err(); + + // Assert + 
match err { + PostError::MaxRetriesExceeded { + attempts, + last_reason, + } => { + assert_eq!(attempts, 3); + assert!( + last_reason.contains("500"), + "expected 500 in last_reason, got {last_reason}" + ); + } + other => panic!("expected MaxRetriesExceeded, got {other:?}"), + } + assert_eq!(h.health().level, HealthLevel::Red); +} + +#[tokio::test] +async fn permanent_4xx_does_not_retry() { + // Arrange: a 400 should not trigger retries. + let tmp = tempfile::tempdir().unwrap(); + let mock = MockServer::start().await; + let mission_id = Uuid::from_u128(0x4444_4444); + let scoped = Mock::given(method("POST")) + .and(path(format!("/missions/{mission_id}/middle-waypoint"))) + .respond_with(ResponseTemplate::new(400).set_body_string("bad request")) + .expect(1) + .mount_as_scoped(&mock) + .await; + let client = MissionClient::new(options_for(&mock, 3, tmp.path())).expect("client builds"); + let h = client.handle(); + + // Act + let err = h + .post_middle_waypoint(&mission_id.to_string(), &patched_mission(mission_id)) + .await + .unwrap_err(); + + // Assert + assert!(matches!(err, PostError::Permanent(_))); + drop(scoped); +} diff --git a/crates/shared/contracts/mapobjects-bundle.json b/crates/shared/contracts/mapobjects-bundle.json new file mode 100644 index 0000000..d426802 --- /dev/null +++ b/crates/shared/contracts/mapobjects-bundle.json @@ -0,0 +1,133 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://azaion/contracts/mapobjects-bundle.json", + "title": "MapObjectsBundle", + "description": "Pre-flight MapObjects bundle returned by the central missions API. Wire contract co-owned with the external missions repo. 
Local validator and bundled-schema copy live in autopilot/crates/mission_client/src/internal/schema/mapobjects.rs (AZ-646).", + "type": "object", + "required": ["schema_version", "mission_id", "bbox", "as_of"], + "additionalProperties": false, + "properties": { + "schema_version": { + "type": "string", + "pattern": "^[0-9]+\\.[0-9]+\\.[0-9]+$" + }, + "mission_id": { "type": "string", "minLength": 1 }, + "bbox": { + "description": "[NW, SE] bounding box.", + "type": "array", + "minItems": 2, + "maxItems": 2, + "items": { "$ref": "#/definitions/coordinate" } + }, + "map_objects": { + "type": "array", + "items": { "$ref": "#/definitions/map_object" } + }, + "observations": { + "type": "array", + "items": { "$ref": "#/definitions/observation" } + }, + "ignored_items": { + "type": "array", + "items": { "$ref": "#/definitions/ignored_item" } + }, + "as_of": { "type": "string", "format": "date-time" }, + "freshness": { "type": "string", "enum": ["fresh", "stale"] } + }, + "definitions": { + "coordinate": { + "type": "object", + "required": ["latitude", "longitude", "altitude_m"], + "additionalProperties": false, + "properties": { + "latitude": { "type": "number", "minimum": -90, "maximum": 90 }, + "longitude": { "type": "number", "minimum": -180, "maximum": 180 }, + "altitude_m": { "type": "number" } + } + }, + "map_object": { + "type": "object", + "required": [ + "h3_cell", "mgrs_key", "class", "class_group", + "gps_lat", "gps_lon", "size_width_m", "size_length_m", + "confidence", "first_seen", "last_seen", "mission_id", + "source", "pending_upload" + ], + "additionalProperties": false, + "properties": { + "h3_cell": { "type": "integer", "minimum": 0 }, + "mgrs_key": { "type": "string" }, + "class": { "type": "string" }, + "class_group": { "type": "string" }, + "gps_lat": { "type": "number", "minimum": -90, "maximum": 90 }, + "gps_lon": { "type": "number", "minimum": -180, "maximum": 180 }, + "size_width_m": { "type": "number", "minimum": 0 }, + "size_length_m": { 
"type": "number", "minimum": 0 }, + "confidence": { "type": "number", "minimum": 0, "maximum": 1 }, + "first_seen": { "type": "string", "format": "date-time" }, + "last_seen": { "type": "string", "format": "date-time" }, + "mission_id": { "type": "string" }, + "source": { "type": "string", "enum": ["central_pulled", "local_observed"] }, + "pending_upload": { "type": "boolean" } + } + }, + "observation": { + "type": "object", + "required": [ + "id", "h3_cell", "class", "class_group", + "mission_id", "uav_id", + "observed_at_monotonic_ns", "observed_at_wallclock", + "gps_lat", "gps_lon", "mgrs", + "size_width_m", "size_length_m", "confidence", "diff_kind" + ], + "additionalProperties": false, + "properties": { + "id": { "type": "string", "format": "uuid" }, + "h3_cell": { "type": "integer", "minimum": 0 }, + "class": { "type": "string" }, + "class_group": { "type": "string" }, + "mission_id": { "type": "string" }, + "uav_id": { "type": "string" }, + "observed_at_monotonic_ns": { "type": "integer", "minimum": 0 }, + "observed_at_wallclock": { "type": "string", "format": "date-time" }, + "gps_lat": { "type": "number", "minimum": -90, "maximum": 90 }, + "gps_lon": { "type": "number", "minimum": -180, "maximum": 180 }, + "mgrs": { "type": "string" }, + "size_width_m": { "type": "number", "minimum": 0 }, + "size_length_m": { "type": "number", "minimum": 0 }, + "confidence": { "type": "number", "minimum": 0, "maximum": 1 }, + "diff_kind": { + "type": "string", + "enum": ["NEW", "MOVED", "EXISTING", "REMOVED_CANDIDATE"] + }, + "photo_ref": { "type": "string" }, + "raw_evidence": {} + } + }, + "ignored_item": { + "type": "object", + "required": [ + "id", "mgrs", "h3_cell", "class_group", + "decline_time", "mission_id", "retention_scope", + "source", "pending_upload" + ], + "additionalProperties": false, + "properties": { + "id": { "type": "string", "format": "uuid" }, + "mgrs": { "type": "string" }, + "h3_cell": { "type": "integer", "minimum": 0 }, + "class_group": { "type": 
"string" }, + "decline_time": { "type": "string", "format": "date-time" }, + "operator_id": { "type": "string" }, + "mission_id": { "type": "string" }, + "retention_scope": { + "type": "string", + "enum": ["mission", "session", "until_expiry"] + }, + "expires_at": { "type": "string", "format": "date-time" }, + "source": { "type": "string", "enum": ["central_pulled", "local_appended"] }, + "pending_upload": { "type": "boolean" } + } + } + } +} diff --git a/crates/shared/contracts/mapobjects-ignored.json b/crates/shared/contracts/mapobjects-ignored.json new file mode 100644 index 0000000..4730559 --- /dev/null +++ b/crates/shared/contracts/mapobjects-ignored.json @@ -0,0 +1,43 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://azaion/contracts/mapobjects-ignored.json", + "title": "MapObjectsIgnoredPush", + "description": "Post-flight ignored-items payload POSTed to /missions/{id}/mapobjects/ignored. Wire contract co-owned with the external missions repo. Local validator and bundled-schema copy live in autopilot/crates/mission_client/src/internal/schema/mapobjects.rs (AZ-647).", + "type": "object", + "required": ["mission_id", "ignored_items"], + "additionalProperties": false, + "properties": { + "mission_id": { "type": "string", "minLength": 1 }, + "ignored_items": { + "type": "array", + "items": { "$ref": "#/definitions/ignored_item" } + } + }, + "definitions": { + "ignored_item": { + "type": "object", + "required": [ + "id", "mgrs", "h3_cell", "class_group", + "decline_time", "mission_id", "retention_scope", + "source", "pending_upload" + ], + "additionalProperties": false, + "properties": { + "id": { "type": "string", "format": "uuid" }, + "mgrs": { "type": "string" }, + "h3_cell": { "type": "integer", "minimum": 0 }, + "class_group": { "type": "string" }, + "decline_time": { "type": "string", "format": "date-time" }, + "operator_id": { "type": "string" }, + "mission_id": { "type": "string" }, + "retention_scope": { + "type": "string", 
+ "enum": ["mission", "session", "until_expiry"] + }, + "expires_at": { "type": "string", "format": "date-time" }, + "source": { "type": "string", "enum": ["central_pulled", "local_appended"] }, + "pending_upload": { "type": "boolean" } + } + } + } +} diff --git a/crates/shared/contracts/mapobjects-observations.json b/crates/shared/contracts/mapobjects-observations.json new file mode 100644 index 0000000..0270162 --- /dev/null +++ b/crates/shared/contracts/mapobjects-observations.json @@ -0,0 +1,51 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://azaion/contracts/mapobjects-observations.json", + "title": "MapObjectsObservationsPush", + "description": "Post-flight observations payload POSTed to /missions/{id}/mapobjects. Wire contract co-owned with the external missions repo. Local validator and bundled-schema copy live in autopilot/crates/mission_client/src/internal/schema/mapobjects.rs (AZ-647).", + "type": "object", + "required": ["mission_id", "observations"], + "additionalProperties": false, + "properties": { + "mission_id": { "type": "string", "minLength": 1 }, + "observations": { + "type": "array", + "items": { "$ref": "#/definitions/observation" } + } + }, + "definitions": { + "observation": { + "type": "object", + "required": [ + "id", "h3_cell", "class", "class_group", + "mission_id", "uav_id", + "observed_at_monotonic_ns", "observed_at_wallclock", + "gps_lat", "gps_lon", "mgrs", + "size_width_m", "size_length_m", "confidence", "diff_kind" + ], + "additionalProperties": false, + "properties": { + "id": { "type": "string", "format": "uuid" }, + "h3_cell": { "type": "integer", "minimum": 0 }, + "class": { "type": "string" }, + "class_group": { "type": "string" }, + "mission_id": { "type": "string" }, + "uav_id": { "type": "string" }, + "observed_at_monotonic_ns": { "type": "integer", "minimum": 0 }, + "observed_at_wallclock": { "type": "string", "format": "date-time" }, + "gps_lat": { "type": "number", "minimum": -90, 
"maximum": 90 }, + "gps_lon": { "type": "number", "minimum": -180, "maximum": 180 }, + "mgrs": { "type": "string" }, + "size_width_m": { "type": "number", "minimum": 0 }, + "size_length_m": { "type": "number", "minimum": 0 }, + "confidence": { "type": "number", "minimum": 0, "maximum": 1 }, + "diff_kind": { + "type": "string", + "enum": ["NEW", "MOVED", "EXISTING", "REMOVED_CANDIDATE"] + }, + "photo_ref": { "type": "string" }, + "raw_evidence": {} + } + } + } +}