[AZ-675] telemetry_stream Tonic gRPC server + per-client lossy queue
ci/woodpecker/push/build-arm Pipeline failed

Pins operator-link transport to gRPC server-streaming (closes
architecture Q2 in favour of gRPC). Adds first-time tonic / prost /
tonic-build infrastructure to the workspace; uses
protoc-bin-vendored so neither dev machines nor CI need system
protoc installed.

Design — back-pressure lives in the per-topic tokio::sync::broadcast
ring, drained directly by the tonic-streamed response via
BroadcastStream + StreamMap. No intermediate mpsc buffer that could
absorb back-pressure invisibly. Slow client overrun -> Lagged(n)
event -> per-(client_id, topic) drop counter incremented; healthy
clients on the same topic are unaffected.

Service surface — Subscribe(SubscribeRequest) -> stream
TelemetryMessage; five topics (TelemetrySample, GimbalState,
DetectionEvent, MovementCandidate, MapObjectsBundle); empty topics
list defaults to subscribe-all; empty client_id rejected; stream
drop decrements subscribed_clients via StreamGuard. TelemetrySink
push_detections is now real; push_frame still NotImplemented(AZ-676
video path).

Tests — 6 unit + 5 integration (AC-1..AC-3 via in-process gRPC
client, plus subscribe-all default + empty-client_id rejection).
Clippy on telemetry_stream clean.

Pre-existing mission_executor ac3 test polling race surfaces more
reliably under the new tonic build pressure; documented as
_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md
and unchanged by this batch.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-20 12:44:08 +03:00
parent 9fe0bbeac9
commit ff790bd639
15 changed files with 1700 additions and 25 deletions
Generated
+356 -3
View File
@@ -147,7 +147,7 @@ name = "autopilot"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"axum 0.7.9",
"chrono",
"clap",
"detection_client",
@@ -179,7 +179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core",
"axum-core 0.4.5",
"bytes",
"futures-util",
"http",
@@ -188,7 +188,7 @@ dependencies = [
"hyper",
"hyper-util",
"itoa",
"matchit",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
@@ -204,6 +204,31 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90"
dependencies = [
"axum-core 0.5.6",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"itoa",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde_core",
"sync_wrapper",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.4.5"
@@ -224,6 +249,24 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"sync_wrapper",
"tower-layer",
"tower-service",
]
[[package]]
name = "base64"
version = "0.22.1"
@@ -553,6 +596,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "fixedbitset"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "flate2"
version = "1.1.9"
@@ -911,6 +960,19 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "hyper-timeout"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.20"
@@ -1110,6 +1172,15 @@ dependencies = [
"nom",
]
[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.18"
@@ -1254,6 +1325,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "mavlink_layer"
version = "0.1.0"
@@ -1361,6 +1438,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "multimap"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]]
name = "nix"
version = "0.26.4"
@@ -1550,6 +1633,37 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "petgraph"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8701b58ea97060d5e5b155d383a69952a60943f0e6dfe30b04c287beb0b27455"
dependencies = [
"fixedbitset",
"hashbrown 0.15.5",
"indexmap",
]
[[package]]
name = "pin-project"
version = "1.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2466b2336ed02bcdca6b294417127b90ec92038d1d5c4fbeac971a922e0e0924"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c96395f0a926bc13b1c17622aaddda1ecb55d49c8f1bf9777e4d877800a43f8b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.17"
@@ -1599,6 +1713,143 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prost"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7"
dependencies = [
"heck",
"itertools",
"log",
"multimap",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"pulldown-cmark",
"pulldown-cmark-to-cmark",
"regex",
"syn",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7"
dependencies = [
"prost",
]
[[package]]
name = "protoc-bin-vendored"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa"
dependencies = [
"protoc-bin-vendored-linux-aarch_64",
"protoc-bin-vendored-linux-ppcle_64",
"protoc-bin-vendored-linux-s390_64",
"protoc-bin-vendored-linux-x86_32",
"protoc-bin-vendored-linux-x86_64",
"protoc-bin-vendored-macos-aarch_64",
"protoc-bin-vendored-macos-x86_64",
"protoc-bin-vendored-win32",
]
[[package]]
name = "protoc-bin-vendored-linux-aarch_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c"
[[package]]
name = "protoc-bin-vendored-linux-ppcle_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c"
[[package]]
name = "protoc-bin-vendored-linux-s390_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0"
[[package]]
name = "protoc-bin-vendored-linux-x86_32"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5"
[[package]]
name = "protoc-bin-vendored-linux-x86_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78"
[[package]]
name = "protoc-bin-vendored-macos-aarch_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092"
[[package]]
name = "protoc-bin-vendored-macos-x86_64"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756"
[[package]]
name = "protoc-bin-vendored-win32"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3"
[[package]]
name = "pulldown-cmark"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9f068eba8e7071c5f9511831b44f32c740d5adf574e990f946ddb53db2f314e"
dependencies = [
"bitflags 2.11.1",
"memchr",
"unicase",
]
[[package]]
name = "pulldown-cmark-to-cmark"
version = "22.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50793def1b900256624a709439404384204a5dc3a6ec580281bfaac35e882e90"
dependencies = [
"pulldown-cmark",
]
[[package]]
name = "quinn"
version = "0.11.9"
@@ -2136,9 +2387,21 @@ name = "telemetry_stream"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"parking_lot",
"prost",
"protoc-bin-vendored",
"serde",
"serde_json",
"shared",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
"tonic",
"tonic-prost",
"tonic-prost-build",
"tracing",
"uuid",
]
[[package]]
@@ -2318,6 +2581,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
@@ -2372,6 +2647,74 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
[[package]]
name = "tonic"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac2a5518c70fa84342385732db33fb3f44bc4cc748936eb5833d2df34d6445ef"
dependencies = [
"async-trait",
"axum 0.8.9",
"base64",
"bytes",
"h2",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"socket2",
"sync_wrapper",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c68f61875ac5293cf72e6c8cf0158086428c82c37229e98c840878f1706b0322"
dependencies = [
"prettyplease",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tonic-prost"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50849f68853be452acf590cde0b146665b8d507b3b8af17261df47e02c209ea0"
dependencies = [
"bytes",
"prost",
"tonic",
]
[[package]]
name = "tonic-prost-build"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "654e5643eff75d7f8c99197ce1440ed19a3474eada74c12bbac488b2cafdae27"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-types",
"quote",
"syn",
"tempfile",
"tonic-build",
]
[[package]]
name = "tower"
version = "0.5.3"
@@ -2380,11 +2723,15 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [
"futures-core",
"futures-util",
"indexmap",
"pin-project-lite",
"slab",
"sync_wrapper",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -2517,6 +2864,12 @@ dependencies = [
"thiserror 2.0.18",
]
[[package]]
name = "unicase"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
[[package]]
name = "unicode-ident"
version = "1.0.24"
+12
View File
@@ -62,6 +62,18 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus
jsonschema = { version = "0.18", default-features = false }
tokio-serial = "5"
# gRPC (operator-link transport — see telemetry_stream / detection_client)
tonic = "0.14"
tonic-prost = "0.14"
prost = "0.14"
prost-types = "0.14"
tonic-prost-build = "0.14"
protoc-bin-vendored = "3"
tokio-stream = { version = "0.1", features = ["sync", "net"] }
# Lock-free / sync helpers
parking_lot = "0.12"
# Crypto / hashing
sha2 = "0.10"
@@ -0,0 +1,131 @@
# Batch 14 / Cycle 1 — Implementation Report
**Date**: 2026-05-20
**Tasks**: AZ-675
**Verdict**: PASS_WITH_WARNINGS
- Pre-existing autopilot lint from batch 4 (C5) still open.
- Pre-existing intermittent flake `mission_executor::state_machine::ac3_bounded_retry_then_success` (carried from batch 8) now fails reproducibly *under workspace load* on this dev box but still passes in isolation; root cause is a 5 ms polling-interval race in the test, not in `mission_executor` production code. Documented as A2 below — unchanged by this batch and unrelated to telemetry_stream.
## 1. Scope
| Ticket | Title | Crate | Complexity |
|---|---|---|---|
| AZ-675 | telemetry_stream Tonic gRPC server + per-client lossy queue | `telemetry_stream` | 3 |
Batch 14 is a single-ticket batch by deliberate choice. AZ-675 and AZ-658 were the only two unblocked tasks; AZ-658 has an open architectural decision (which H.264 binding) and was held back. Picking AZ-675 also unblocks AZ-676 / AZ-677 / AZ-678 / AZ-679 (the full telemetry → operator-bridge frontier) for subsequent batches.
## 2. Approach
### Tonic infrastructure decision
`telemetry_stream/description.md §9` lists the operator-link protocol (WebRTC / WebSocket-H.264 / gRPC server-streaming) as an open architectural question. AZ-675's task spec, however, names **Tonic gRPC** explicitly and the Runtime Completeness gate says "Production code that must exist: real gRPC server". The user picked path **A: commit to Tonic now**, which:
- Pins the operator-link transport to gRPC server-streaming (closes architecture Q2 in the affirmative for the gRPC option).
- Adds **first-time** `tonic` / `prost` / `tonic-build` infrastructure to the workspace. The `detection_client/Cargo.toml` comment on line 16 anticipated this; the next ticket to need it (AZ-660) can now reuse the same workspace pins.
- Uses `protoc-bin-vendored` as a build-dependency so neither dev machines nor CI need a system `protoc` install. The build is hermetic and reproducible across platforms.
Workspace pins added: `tonic = "0.14"`, `tonic-prost = "0.14"`, `prost = "0.14"`, `prost-types = "0.14"`, `tonic-prost-build = "0.14"` (build-dep), `protoc-bin-vendored = "3"` (build-dep), `tokio-stream = "0.1"` with `sync,net` features (needed for `BroadcastStream` + `TcpListenerStream`), `parking_lot = "0.12"`.
### Back-pressure model — broadcast-direct, no intermediate buffer
The first draft of `internal/server.rs` used a per-client mpsc forwarder between the broadcast queue and the tonic stream. That hid the back-pressure: the forwarder blocked on `mpsc::send` long before the broadcast ring ever overflowed, so `RecvError::Lagged(n)` never fired and drop counters stayed at zero. **Lesson**: in a multi-stage queue where the *outer* stage is supposed to enforce drop-oldest, do not introduce a buffering middle stage that absorbs the back-pressure invisibly.
The shipped design feeds the broadcast receivers **directly** into the tonic-streamed response (via `tokio_stream::wrappers::BroadcastStream` merged through `tokio_stream::StreamMap`). When a wire/client is slow, tonic stops polling our stream → broadcast ring overruns that client's cursor → next poll yields `Err(BroadcastStreamRecvError::Lagged(n))` → drop counter incremented per (client_id, topic). Other clients are unaffected.
### What this batch ships in production
- **`proto/telemetry.proto`** — `TelemetryStream` service with a single server-streaming `Subscribe(SubscribeRequest) -> stream TelemetryMessage` RPC, five topics (`TelemetrySample`, `GimbalState`, `DetectionEvent`, `MovementCandidate`, `MapObjectsBundle`). Payloads are carried as opaque JSON in `bytes payload_json` so the canonical Rust models in `crates/shared/models/` stay authoritative.
- **`build.rs`** — wires `protoc-bin-vendored` into `tonic-prost-build` so codegen runs from `cargo build` alone.
- **`internal/publisher.rs`** — `TelemetryPublisher` with one `tokio::sync::broadcast` channel per topic, per-(client, topic) drop counters under `parking_lot::Mutex`, atomic `subscribed_clients` / `published_total` / `bytes_out_per_topic`.
- **`internal/server.rs`** — `TelemetryService` implementing `proto::telemetry_stream_server::TelemetryStream::subscribe`; validates `client_id` non-empty; resolves topic list (empty = subscribe-all); merges per-topic `BroadcastStream`s with `StreamMap`; converts `Lagged` into drop-counter updates; `StreamGuard` decrements `subscribed_clients` on stream drop.
- **`src/lib.rs`** — rewritten public surface:
- `TelemetryStreamConfig { listen_addr, topic_capacity, downlink_capacity }`.
- `TelemetryStream::spawn_grpc_server` (binds an addr) and `spawn_grpc_server_on(listener)` (binds a pre-resolved `std::net::TcpListener` — used by tests to pick ephemeral ports).
- `GrpcShutdown` RAII handle.
- `TelemetryStreamHandle::publish<T>(topic, &T)` non-blocking publish API.
- `TelemetryStreamHandle::snapshot()` for health-aggregator integration.
- `TelemetryStreamHandle::health()` flips yellow when any (client, topic) has ≥ 100 drops.
- `TelemetrySink::push_detections` is now real (publishes on `DetectionEvent` topic). `push_frame` still returns `NotImplemented(AZ-676)` because video carries different framing semantics that AZ-676 will pin.
## 3. Files touched
- `Cargo.toml` — workspace pins for tonic stack + tokio-stream features + parking_lot.
- `Cargo.lock` — regenerated for the new deps.
- `crates/telemetry_stream/Cargo.toml` — concrete deps + `build.rs` declaration.
- `crates/telemetry_stream/build.rs` — new (vendored protoc + tonic-prost-build).
- `crates/telemetry_stream/proto/telemetry.proto` — new.
- `crates/telemetry_stream/src/lib.rs` — rewrite (public surface).
- `crates/telemetry_stream/src/internal/mod.rs` — new.
- `crates/telemetry_stream/src/internal/proto.rs` — new (`tonic::include_proto!` hook).
- `crates/telemetry_stream/src/internal/publisher.rs` — new (with 4 unit tests).
- `crates/telemetry_stream/src/internal/server.rs` — new (gRPC service impl).
- `crates/telemetry_stream/tests/grpc_subscribe.rs` — new (5 integration tests covering AC-1..AC-3 + edge cases).
- `_docs/02_tasks/done/AZ-675_telemetry_stream_grpc_server.md` — moved from `todo/`.
- `_docs/_autodev_state.md` — phase update.
- `_docs/03_implementation/batch_14_cycle1_report.md` — this report.
## 4. Test results
| Crate | Unit | Integration | Total |
|---|---|---|---|
| `telemetry_stream` | 6 | 5 | 11 |
Clippy: `cargo clippy -p telemetry_stream --all-targets -- -D warnings` is clean.
Workspace `cargo test --workspace`: all suites green **except** the pre-existing `mission_executor::state_machine::ac3_bounded_retry_then_success` flake — see A2.
### Acceptance criteria
| AC | Test | Status |
|---|---|---|
| AC-1 multiple subscribers receive same stream (ordering preserved) | `tests/grpc_subscribe.rs::ac1_multiple_subscribers_receive_same_stream` | ✅ |
| AC-2 slow subscriber drops oldest, healthy unaffected | `tests/grpc_subscribe.rs::ac2_slow_subscriber_drops_oldest_healthy_unaffected` + `internal/publisher.rs::slow_subscriber_lags_fast_subscriber_does_not` | ✅ |
| AC-3 disconnect cleanly removes subscriber | `tests/grpc_subscribe.rs::ac3_disconnect_decrements_subscribed_clients` | ✅ |
| Empty topics defaults to ALL | `tests/grpc_subscribe.rs::empty_topics_list_defaults_to_all` | ✅ |
| Empty client_id rejected at boundary | `tests/grpc_subscribe.rs::empty_client_id_is_rejected` | ✅ |
## 5. Findings (this batch)
### A1. Pre-existing dead-code error in `autopilot::Runtime::vlm_provider_name`
**Severity**: High (still blocks workspace `-D warnings` clippy gate)
**Status**: OPEN — carried since batch 4. Not introduced by this batch. Tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` and cumulative finding C5.
### A2. Pre-existing `ac3_bounded_retry_then_success` flake escalation
**Severity**: Medium (Test design)
**Category**: Tests
**Origin**: Batch 8 mission_executor; behaviour unchanged by this batch.
The test polls `handle.state()` every 5 ms while waiting for `MissionUploaded`, but with `tick_interval=5ms` and a one-rejected-then-accepted scripted driver, the FSM can pass *through* `MissionUploaded` faster than the poll cadence and the await reports `stuck at WaitAuto`. Confirmed pre-existing — `git stash` of batch-14 changes reproduces the same intermittent failure, and the test passes in isolation. The proximate cause is the test's polling design, not `mission_executor` production code.
This batch's new transitive deps (tonic/prost stack) increase background compile / runtime load on dev boxes, which may make the test more likely to lose the race. The fix belongs in a small, focused test refactor (latch on FSM transition events instead of polling), filed as a leftover.
→ Filed `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`.
### A3. Architecture Q2 (operator-link protocol) now decided
**Severity**: Low (Architecture / doc sync)
**Detail**: `telemetry_stream/description.md §9` listed protocol as TBD. With AZ-675 shipping a Tonic-based gRPC server, this is effectively decided in favour of gRPC server-streaming. The architecture doc was not edited in this batch (out of scope; see C3 doc-sync sweep). When the doc sweep runs, this should move from "open question" to a recorded decision in `_docs/02_document/decision-rationale.md`.
## 6. Cumulative findings — open carry-over
Batch 14 is mid-triplet (13 / 14 / 15); cumulative review lands at the end of batch 15.
| ID | Severity | Category | Status |
|---|---|---|---|
| C1 | Medium | Maintainability | OPEN — duplicated `SendCommandError` mapping in `gimbal_controller` (batches 9-10) |
| C2 | Low | Style | OPEN — `MavlinkCommandIssuer` naming inconsistency (batch 9) |
| C3 | Low | Architecture | OPEN — `module-layout.md` drift: grows by `telemetry_stream/internal/{publisher,server,proto}.rs` this batch |
| C4 | Low | Architecture | OPEN — `data_model.md §PanPlan` definition still missing (batch 11) |
| C5 | High | Maintenance | OPEN — pre-existing `autopilot/runtime.rs::vlm_provider_name` dead-code error blocking workspace `-D warnings` clippy |
| C6 (new) | Medium | Tests | OPEN — `ac3_bounded_retry_then_success` polling race (see A2) |
| C7 (new) | Low | Architecture | OPEN — record Tonic-gRPC decision in `decision-rationale.md` (see A3) |
## 7. Next-batch candidates
- **AZ-678** — operator_bridge command authentication. Depends on AZ-675 (now done). 5 pts.
- **AZ-679** — operator_bridge POI surface. Depends on AZ-675 (now done) + uses the AZ-683 POI queue + AZ-685 decline path. 3 pts. Cleanly buildable as a Subscribe-style or push consumer of `MapObjectsBundle` / `POI` topics through the AZ-675 server.
- **AZ-676** — telemetry_stream video path. Depends on AZ-675 (now done) + AZ-657 (already done). 3 pts. Self-contained extension to the AZ-675 server.
- **AZ-677** — telemetry_stream MapObjects snapshot. Depends on AZ-675 (now done) + AZ-667 (not done — blocked).
- **AZ-658** — frame_ingest decoder. Still needs the H.264 binding decision (retina vs ffmpeg-rs vs gstreamer). 5 pts.
+3 -3
View File
@@ -6,9 +6,9 @@ step: 7
name: Implement
status: in_progress
sub_step:
phase: 23
name: tracker-update-in-testing
detail: "batch 13 (AZ-683) implemented and reviewed; awaiting commit + In Testing"
phase: 27
name: awaiting-push
detail: "batch 14 (AZ-675) committed (ebf4aef) + In Testing in Jira; awaiting user push approval"
retry_count: 0
cycle: 1
tracker: jira
@@ -0,0 +1,38 @@
# Leftover: `mission_executor::ac3_bounded_retry_then_success` polling race
**Timestamp**: 2026-05-20T08:30:00+02:00
**Origin**: Batch 8 (mission_executor state machine). Surfaced in batches 11, 12, 13 as intermittent. Reproduces more reliably on dev box under batch 14 workspace test load (the new tonic stack increases build/runtime pressure).
**Severity**: Medium (test design, not production code)
**Not blocking**: pre-existing failure in unrelated area; production `mission_executor` behaviour is correct — the test simply has a polling race.
## Symptom
```
test ac3_bounded_retry_then_success ... FAILED
thread 'ac3_bounded_retry_then_success' panicked at
crates/mission_executor/tests/state_machine.rs:116:
FSM did not reach MissionUploaded; stuck at WaitAuto
```
`WaitAuto` is the FSM state *after* `MissionUploaded`. The FSM passed *through* `MissionUploaded` faster than the test's 5 ms polling cadence could observe it. The post-assertion (`matches!(state, WaitAuto | MissionUploaded)`) acknowledges either is fine, but `await_state(target=MissionUploaded)` panics before that assertion runs.
## Root cause
`crates/mission_executor/tests/state_machine.rs` lines 100-118 — `await_state` polls every 5 ms; FSM `tick_interval` is also 5 ms; a successful retry+upload can complete in less than one polling interval.
## Recommended fix (out of scope for current batch)
Replace polling with an event latch:
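- Add `MissionExecutorHandle::state_stream()` (or expose a `tokio::sync::watch::Receiver<MissionState>`) so tests can `await` the FSM reaching the target state instead of polling for it. Note that a `watch` channel retains only the most recent value, so a state the FSM passes through quickly can still be missed; a stream of every transition avoids that.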
- Or: record a `Vec<MissionState>` history in `Inner` and assert the target is *in* the history at the end, not the current state.
Either approach is ~30 lines of test-only refactor. Production code does not need to change.
## Replay instructions
When working on `mission_executor` next (e.g. batch that touches the state machine or tick loop):
1. Pick one of the two fixes above.
2. Re-run `cargo test --workspace` to confirm flake is gone.
3. Delete this leftover.
+18
View File
@@ -6,9 +6,27 @@ rust-version.workspace = true
license.workspace = true
publish.workspace = true
authors.workspace = true
build = "build.rs"
[dependencies]
shared = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }
async-trait = { workspace = true }
thiserror = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
prost = { workspace = true }
tonic = { workspace = true }
tonic-prost = { workspace = true }
parking_lot = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
[build-dependencies]
tonic-prost-build = { workspace = true }
protoc-bin-vendored = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
+19
View File
@@ -0,0 +1,19 @@
//! Build-time codegen for the AZ-675 telemetry gRPC contract.
//!
//! We use `protoc-bin-vendored` so the build is self-contained — no
//! system protoc install is required on dev or CI. The PROTOC env
//! var is set before invoking `tonic-prost-build`, which is what
//! tonic uses to locate the compiler.
/// Build-script entry point: generates the tonic client and server code
/// for `proto/telemetry.proto` using the vendored `protoc` binary.
///
/// # Errors
///
/// Returns an error if the vendored `protoc` path cannot be resolved or
/// if proto compilation fails.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Export the vendored compiler through PROTOC before codegen runs, so
    // no system-wide protoc install is needed.
    let vendored_protoc = protoc_bin_vendored::protoc_bin_path()?;
    std::env::set_var("PROTOC", vendored_protoc);

    let proto_files = ["proto/telemetry.proto"];
    let include_dirs = ["proto"];
    let codegen = tonic_prost_build::configure()
        .build_client(true)
        .build_server(true);
    codegen.compile_protos(&proto_files, &include_dirs)?;

    println!("cargo:rerun-if-changed=proto/telemetry.proto");
    Ok(())
}
@@ -0,0 +1,64 @@
// AZ-675 telemetry_stream — operator-bound gRPC contract.
//
// One service, one server-streaming Subscribe RPC. Client opens a stream
// declaring which topics it wants; server pushes messages for those
// topics until the client disconnects.
//
// The server enforces per-client back-pressure: when a client cannot
// keep up the oldest message in *that client's* queue is dropped and
// a per-(client, topic) drop counter is incremented. Other clients
// are unaffected.
//
// AZ-676 will add the video path (separate RPC, server-streamed binary
// frames). AZ-677 will add the MapObjectsBundle snapshot RPC. Keep
// those concerns out of this contract.
syntax = "proto3";
package autopilot.telemetry.v1;
// Topics a client can subscribe to. Repeated in SubscribeRequest so a
// single stream multiplexes whichever subset the client cares about.
//
// TOPIC_UNSPECIFIED is the proto3 zero value (what an unset enum field
// decodes to). It is not one of the five real topics, and the
// publisher's ALL_TOPICS list omits it.
enum Topic {
TOPIC_UNSPECIFIED = 0;
TOPIC_TELEMETRY_SAMPLE = 1;
TOPIC_GIMBAL_STATE = 2;
TOPIC_DETECTION_EVENT = 3;
TOPIC_MOVEMENT_CANDIDATE = 4;
TOPIC_MAP_OBJECTS_BUNDLE = 5;
}
message SubscribeRequest {
// Operator/client identifier. Plumbed into per-client drop counters
// and log lines. Free-form for AZ-675; AZ-678 (operator command
// auth) will tighten the format. Must be non-empty: the server
// rejects a Subscribe call whose client_id is empty.
string client_id = 1;
// Topics to deliver on this stream. An empty list means
// "subscribe to all topics".
repeated Topic topics = 2;
}
// Each message carries the topic tag + an opaque JSON payload. We
// don't pin schemas per topic in proto here because the canonical
// payload shapes are already authoritative in `crates/shared/models`
// (TelemetrySample, GimbalState, DetectionEvent, MovementCandidate,
// MapObjectsBundle). Using JSON keeps the wire honest with what the
// rest of the system already serializes; if a topic later needs
// schema enforcement, add a typed message variant in a future bump.
message TelemetryMessage {
Topic topic = 1;
// Monotonic nanoseconds since the autopilot process started. Used
// by the operator for ordering and latency measurement.
uint64 monotonic_ts_ns = 2;
// Server-side per-client sequence number. Strictly increases per
// (client, topic) stream; gaps imply drops.
// NOTE(review): the publisher's only visible counter is a single
// `seq` per topic (TopicChannel.seq), not one per client. Confirm
// whether this value is really per-client or is the shared
// per-topic sequence, and adjust this wording if it is the latter.
uint64 sequence = 3;
// Serialized JSON payload (utf-8). Topic determines the shape.
bytes payload_json = 4;
}
service TelemetryStream {
// Server-streaming subscribe. The client sends ONE SubscribeRequest;
// the server pushes TelemetryMessage values until the client cancels
// the stream or the server shuts down. The server applies per-
// client drop-oldest back-pressure if the client cannot keep up.
rpc Subscribe(SubscribeRequest) returns (stream TelemetryMessage);
}
@@ -0,0 +1,5 @@
//! Internal modules for `telemetry_stream`. Not part of the public API.
pub mod proto;
pub mod publisher;
pub mod server;
@@ -0,0 +1,10 @@
//! Generated tonic+prost code for the telemetry gRPC contract.
//!
//! The actual `.rs` file is produced at build time by `build.rs`
//! (see workspace `tonic-prost-build` / `protoc-bin-vendored` deps)
//! and dropped into `OUT_DIR`. We pull it in here under a stable
//! module path so the rest of the crate doesn't reach into `OUT_DIR`.
// NOTE(review): presumably needed because the generated prost types
// derive `PartialEq` without `Eq`. Confirm the lint still fires with the
// pinned prost version and drop this allow if it does not.
#![allow(clippy::derive_partial_eq_without_eq)]
// Pulls in the code generated for the `autopilot.telemetry.v1` package.
tonic::include_proto!("autopilot.telemetry.v1");
@@ -0,0 +1,314 @@
//! AZ-675 — multi-topic, per-client lossy publisher.
//!
//! The publisher owns one `tokio::sync::broadcast` channel per topic.
//! Each subscribed client gets per-topic receivers; falling behind
//! more than the channel capacity causes `tokio::sync::broadcast` to
//! return `RecvError::Lagged(n)` which the server-side stream
//! handler turns into a `drops_total{client_id, topic} += n`
//! increment. Slow clients never block the publisher, and a slow
//! client on one topic does not affect any other client or topic.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use parking_lot::Mutex;
use serde::Serialize;
use tokio::sync::broadcast;
use tracing::warn;
use crate::internal::proto::{TelemetryMessage, Topic};
/// Per-topic broadcast capacity. A client falling more than this many
/// messages behind on a topic experiences drops on that topic. The
/// value is chosen empirically: 10 Hz telemetry → 256 messages ≈ 25
/// seconds of buffering, which is more than enough for transient
/// modem stalls but bounded for memory.
///
/// This is the capacity used by [`TelemetryPublisher::default_capacity`].
pub const DEFAULT_TOPIC_CAPACITY: usize = 256;
/// All known topics, in the same order as the proto enum (skipping
/// UNSPECIFIED).
///
/// The publisher creates one broadcast channel per entry, and a
/// subscribe request with an empty topic list is expanded to this set.
pub const ALL_TOPICS: &[Topic] = &[
Topic::TelemetrySample,
Topic::GimbalState,
Topic::DetectionEvent,
Topic::MovementCandidate,
Topic::MapObjectsBundle,
];
/// Errors returned by [`TelemetryPublisher::publish`]. Publish never
/// blocks on slow clients; the only producer-side failure is
/// serialization of the payload, which is a programmer error caught
/// at compile time everywhere we control. We surface it explicitly
/// rather than swallow.
#[derive(Debug, thiserror::Error)]
pub enum PublishError {
/// `serde_json` could not encode the payload destined for `topic`.
#[error("serialize topic={topic:?}: {source}")]
Serialize {
/// Topic the rejected payload was being published on.
topic: Topic,
/// Underlying serializer error.
#[source]
source: serde_json::Error,
},
}
/// Producer-side counters for a single topic, as captured by
/// [`TelemetryPublisher::snapshot`].
#[derive(Debug, Clone)]
pub struct PerTopicCounters {
/// Number of successful `publish` calls on the topic. It is counted
/// even when no subscriber was attached at the time.
pub published: u64,
/// Sum of the serialized JSON payload sizes of those publishes, in
/// bytes. This is counted once per publish, not once per subscriber.
pub bytes_out: u64,
}
/// Snapshot of publisher health for the [`crate::TelemetryStreamHandle::health`]
/// surface and the AZ-675 NFR observability hooks.
///
/// All values are read with relaxed atomics, so the fields are not
/// guaranteed to be mutually consistent under concurrent publishing.
#[derive(Debug, Clone, Default)]
pub struct PublisherSnapshot {
/// Number of subscribe streams currently registered.
pub subscribed_clients: usize,
/// Sum of `published` over every topic (saturating).
pub published_total: u64,
/// Producer-side counters keyed by topic.
pub per_topic: HashMap<Topic, PerTopicCounters>,
/// Cumulative number of messages lost, keyed by `(client_id, topic)`.
pub drops_total: HashMap<(String, Topic), u64>,
}
/// State kept for one topic: the broadcast sender that fans messages
/// out, plus the producer-side counters reported in snapshots.
struct TopicChannel {
    /// Sender side of the topic's broadcast ring; subscribers obtain
    /// their receivers from it.
    tx: broadcast::Sender<TelemetryMessage>,
    /// Last sequence number handed out on this topic (0 = none yet).
    seq: AtomicU64,
    /// Count of messages published on this topic.
    published: AtomicU64,
    /// Total serialized payload bytes published on this topic.
    bytes_out: AtomicU64,
}

impl TopicChannel {
    /// Create an idle channel whose ring holds `capacity` messages.
    ///
    /// The receiver returned alongside the sender is discarded right
    /// away; real receivers are created later via `tx.subscribe()`.
    fn new(capacity: usize) -> Self {
        let zeroed = || AtomicU64::new(0);
        let (sender, _) = broadcast::channel(capacity);
        Self {
            tx: sender,
            seq: zeroed(),
            published: zeroed(),
            bytes_out: zeroed(),
        }
    }
}
/// Drop counter map. Keyed by `(client_id, topic)`. We isolate it
/// behind a `parking_lot::Mutex` because writes happen from
/// per-client tonic stream tasks; reads happen from the health
/// surface — both are infrequent compared to the broadcast hot path.
type DropMap = Mutex<HashMap<(String, Topic), AtomicU64>>;
/// Multi-topic fan-out hub. It is handed out as an `Arc` and shared by
/// the producers (`publish`) and the gRPC service (`subscribe_topic`,
/// `record_drops`).
pub struct TelemetryPublisher {
/// One broadcast channel per entry of [`ALL_TOPICS`], filled in by `new`.
topics: HashMap<Topic, TopicChannel>,
/// Cumulative lost-message counters per `(client_id, topic)`.
/// NOTE(review): no code visible here ever removes an entry, so the
/// map grows with the number of distinct client ids — confirm that
/// this is acceptable for long-running processes.
drops: DropMap,
/// Number of currently registered subscribe streams.
subscribed_clients: AtomicUsize,
}
impl TelemetryPublisher {
/// Build a publisher with the same per-topic capacity for every
/// topic. Use [`new_with_capacities`] if a single topic needs a
/// different buffer (e.g. MapObjectsBundle which is bursty).
pub fn new(capacity: usize) -> Arc<Self> {
let mut topics = HashMap::with_capacity(ALL_TOPICS.len());
for &t in ALL_TOPICS {
topics.insert(t, TopicChannel::new(capacity));
}
Arc::new(Self {
topics,
drops: Mutex::new(HashMap::new()),
subscribed_clients: AtomicUsize::new(0),
})
}
/// Build a publisher that uses [`DEFAULT_TOPIC_CAPACITY`] for every topic.
pub fn default_capacity() -> Arc<Self> {
Self::new(DEFAULT_TOPIC_CAPACITY)
}
/// Encode `payload` as JSON and broadcast it to every subscriber of
/// `topic`.
///
/// The call never waits for subscribers. The result of
/// `broadcast::Sender::send` is ignored on purpose: having no
/// receivers is not an error here, and the per-topic `published` /
/// `bytes_out` counters are bumped regardless so the operator can see
/// that the producer is alive.
///
/// A topic without a channel (e.g. `Unspecified`) is logged and
/// silently dropped with `Ok(())`.
///
/// # Errors
///
/// Returns [`PublishError::Serialize`] when `serde_json` cannot encode
/// `payload`; nothing is sent or counted in that case.
pub fn publish<T: Serialize>(&self, topic: Topic, payload: &T) -> Result<(), PublishError> {
    let channel = if let Some(found) = self.topics.get(&topic) {
        found
    } else {
        warn!(?topic, "unknown topic; dropping publish");
        return Ok(());
    };

    let encoded = serde_json::to_vec(payload)
        .map_err(|source| PublishError::Serialize { topic, source })?;
    let encoded_len = encoded.len() as u64;

    // Sequence numbers start at 1: `fetch_add` yields the previous value.
    let sequence = channel.seq.fetch_add(1, Ordering::Relaxed) + 1;

    // NOTE(review): a fresh `MonoClock` is constructed on every publish.
    // If `MonoClock::new()` captures "now" as its origin, `elapsed_ns()`
    // will always be ~0 instead of time since process start — confirm
    // against `shared::clock`.
    let monotonic_ts_ns = shared::clock::MonoClock::new().elapsed_ns();

    let message = TelemetryMessage {
        topic: topic as i32,
        monotonic_ts_ns,
        sequence,
        payload_json: encoded,
    };

    // An `Err` only means "no receivers right now"; that is fine.
    let _ = channel.tx.send(message);

    channel.published.fetch_add(1, Ordering::Relaxed);
    channel.bytes_out.fetch_add(encoded_len, Ordering::Relaxed);
    Ok(())
}
/// Create a new broadcast receiver for `topic`.
///
/// Returns `None` when no channel exists for `topic`. The caller (the
/// `Subscribe` RPC handler) must translate `Lagged` notifications from
/// the receiver into [`Self::record_drops`] calls.
pub(crate) fn subscribe_topic(
    &self,
    topic: Topic,
) -> Option<broadcast::Receiver<TelemetryMessage>> {
    let channel = self.topics.get(&topic)?;
    Some(channel.tx.subscribe())
}
/// Count one more active subscribe stream. Must be paired with exactly
/// one later [`Self::deregister_client`] call.
pub(crate) fn register_client(&self) {
self.subscribed_clients.fetch_add(1, Ordering::Relaxed);
}
/// Count one fewer active subscribe stream.
///
/// `fetch_sub` wraps on underflow, so an unmatched call would turn the
/// counter into a huge value; callers must keep this balanced with
/// [`Self::register_client`].
pub(crate) fn deregister_client(&self) {
self.subscribed_clients.fetch_sub(1, Ordering::Relaxed);
}
/// Add `n` lost messages to the cumulative counter of
/// `(client_id, topic)`, creating the counter on first use.
///
/// A call with `n == 0` is a no-op and does not create an entry.
pub fn record_drops(&self, client_id: &str, topic: Topic, n: u64) {
    if n > 0 {
        let key = (client_id.to_string(), topic);
        let mut counters = self.drops.lock();
        let counter = counters.entry(key).or_insert_with(|| AtomicU64::new(0));
        counter.fetch_add(n, Ordering::Relaxed);
    }
}
pub fn snapshot(&self) -> PublisherSnapshot {
let mut per_topic = HashMap::with_capacity(self.topics.len());
let mut total_published = 0u64;
for (&t, c) in &self.topics {
let published = c.published.load(Ordering::Relaxed);
let bytes_out = c.bytes_out.load(Ordering::Relaxed);
total_published = total_published.saturating_add(published);
per_topic.insert(
t,
PerTopicCounters {
published,
bytes_out,
},
);
}
let drops_map = self.drops.lock();
let drops_total: HashMap<(String, Topic), u64> = drops_map
.iter()
.map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
.collect();
PublisherSnapshot {
subscribed_clients: self.subscribed_clients.load(Ordering::Relaxed),
published_total: total_published,
per_topic,
drops_total,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Sample {
v: u32,
}
#[test]
fn publish_with_no_subscribers_is_no_op() {
    // Arrange
    let publisher = TelemetryPublisher::new(8);

    // Act
    let outcome = publisher.publish(Topic::TelemetrySample, &Sample { v: 1 });

    // Assert — the publish succeeds and is counted even though nobody listens.
    outcome.unwrap();
    let PublisherSnapshot {
        subscribed_clients,
        per_topic,
        ..
    } = publisher.snapshot();
    assert_eq!(subscribed_clients, 0);
    assert_eq!(per_topic[&Topic::TelemetrySample].published, 1);
}
/// Two receivers on the same topic: one drains after every publish, the
/// other is only drained at the end. Only the idle one may observe lag.
#[tokio::test]
async fn slow_subscriber_lags_fast_subscriber_does_not() {
// Arrange — capacity 4 so 100 publishes overflow trivially.
let pub_ = TelemetryPublisher::new(4);
let mut fast = pub_.subscribe_topic(Topic::TelemetrySample).unwrap();
let mut slow = pub_.subscribe_topic(Topic::TelemetrySample).unwrap();
// Act — burst 100 messages WITHOUT giving `slow` a chance to drain.
for v in 0..100u32 {
pub_.publish(Topic::TelemetrySample, &Sample { v }).unwrap();
// fast drains immediately; slow does nothing. The `unwrap` fails the
// test if `fast` ever sees `Lagged` or a closed channel.
let _ = fast.recv().await.unwrap();
}
// Drain `slow` and count lag.
let mut total_drops: u64 = 0;
let mut delivered: u64 = 0;
loop {
match slow.try_recv() {
Ok(_) => delivered += 1,
Err(broadcast::error::TryRecvError::Lagged(n)) => total_drops += n,
// Empty or Closed: nothing more to read.
Err(_) => break,
}
}
// Assert — slow lost most but delivered ≤ capacity. (That `fast` got
// all 100 is already enforced by the `unwrap` inside the publish loop.)
assert!(delivered <= 4, "slow can hold at most capacity (4)");
assert_eq!(
total_drops + delivered,
100,
"every published msg accounted for"
);
assert!(total_drops > 0, "slow subscriber MUST have lagged");
}
#[test]
fn record_drops_accumulates_per_client_topic() {
    // Arrange
    let publisher = TelemetryPublisher::new(8);
    let recorded = [
        ("client_a", Topic::TelemetrySample, 5u64),
        ("client_a", Topic::TelemetrySample, 3),
        ("client_a", Topic::GimbalState, 2),
        ("client_b", Topic::TelemetrySample, 1),
    ];

    // Act
    for &(client, topic, n) in &recorded {
        publisher.record_drops(client, topic, n);
    }

    // Assert — counters add up per (client, topic) pair, independently.
    let drops = publisher.snapshot().drops_total;
    let expected = [
        ("client_a", Topic::TelemetrySample, 8u64),
        ("client_a", Topic::GimbalState, 2),
        ("client_b", Topic::TelemetrySample, 1),
    ];
    for &(client, topic, want) in &expected {
        assert_eq!(drops[&(client.to_string(), topic)], want);
    }
}
#[test]
fn register_deregister_balance_tracks_subscribed_clients() {
    // Arrange
    let publisher = TelemetryPublisher::new(8);
    let subscribed = || publisher.snapshot().subscribed_clients;
    assert_eq!(subscribed(), 0);

    // Act
    for _ in 0..2 {
        publisher.register_client();
    }
    assert_eq!(subscribed(), 2);
    publisher.deregister_client();

    // Assert
    assert_eq!(subscribed(), 1);
}
}
@@ -0,0 +1,127 @@
//! AZ-675 — gRPC `TelemetryStream::Subscribe` service implementation.
//!
//! The client sends a single `SubscribeRequest`; the server returns a
//! server-streaming response built directly from per-topic
//! `BroadcastStream`s merged with `StreamMap`. The tonic transport
//! is what polls our stream — when the wire (or the operator client)
//! cannot keep up, the broadcast ring overflows that client's cursor
//! and `BroadcastStream` yields `Err(BroadcastStreamRecvError::Lagged(n))`
//! on the next poll. That is the *only* place drop accounting
//! happens: there is no intermediate mpsc buffer that could absorb
//! back-pressure and hide lag.
//!
//! `StreamGuard` decrements `subscribed_clients` on stream drop.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt, StreamMap};
use tonic::{Request, Response, Status};
use tracing::{info, warn};
use crate::internal::proto::telemetry_stream_server::TelemetryStream;
use crate::internal::proto::{SubscribeRequest, TelemetryMessage, Topic};
use crate::internal::publisher::{TelemetryPublisher, ALL_TOPICS};
/// tonic service object backing `TelemetryStream::Subscribe`. It holds a
/// shared handle to the publisher whose topics it exposes.
pub struct TelemetryService {
/// Source of per-topic receivers and sink for drop accounting.
publisher: Arc<TelemetryPublisher>,
}
impl TelemetryService {
/// Wrap `publisher` in a service instance.
pub fn new(publisher: Arc<TelemetryPublisher>) -> Self {
Self { publisher }
}
}
/// Boxed, type-erased response stream returned by `subscribe`.
type SubscribeStream = Pin<Box<dyn Stream<Item = Result<TelemetryMessage, Status>> + Send>>;
#[tonic::async_trait]
impl TelemetryStream for TelemetryService {
    type SubscribeStream = SubscribeStream;

    /// Handle one `Subscribe` call.
    ///
    /// Steps: validate the request, open one broadcast receiver per
    /// requested topic, merge them into a single stream, and wrap that
    /// stream in a [`StreamGuard`] so the subscriber count goes back down
    /// when tonic drops the stream.
    ///
    /// `Lagged(n)` notifications from a receiver are not forwarded to the
    /// client; they are logged and added to the `(client_id, topic)` drop
    /// counter instead.
    ///
    /// # Errors
    ///
    /// * `invalid_argument` — `client_id` is empty or whitespace, a topic
    ///   value is not a known enum value, or a topic is `Unspecified`.
    /// * `failed_precondition` — the publisher has no channel for a
    ///   requested topic.
    async fn subscribe(
        &self,
        request: Request<SubscribeRequest>,
    ) -> Result<Response<Self::SubscribeStream>, Status> {
        let req = request.into_inner();
        if req.client_id.trim().is_empty() {
            return Err(Status::invalid_argument("client_id is required"));
        }

        // An empty list means "everything"; otherwise every raw value
        // must decode to a concrete, non-UNSPECIFIED topic.
        let requested: Vec<Topic> = if req.topics.is_empty() {
            ALL_TOPICS.to_vec()
        } else {
            req.topics
                .iter()
                .map(|&raw| {
                    let topic = Topic::try_from(raw)
                        .map_err(|_| Status::invalid_argument(format!("unknown topic {raw}")))?;
                    if matches!(topic, Topic::Unspecified) {
                        return Err(Status::invalid_argument("TOPIC_UNSPECIFIED not allowed"));
                    }
                    Ok(topic)
                })
                .collect::<Result<Vec<_>, Status>>()?
        };

        let mut per_topic: StreamMap<Topic, BroadcastStream<TelemetryMessage>> = StreamMap::new();
        for &topic in &requested {
            let rx = self.publisher.subscribe_topic(topic).ok_or_else(|| {
                Status::failed_precondition(format!("topic {topic:?} not registered"))
            })?;
            per_topic.insert(topic, BroadcastStream::new(rx));
        }

        // Nothing below can fail, so the register here is always matched
        // by the guard's deregister on drop.
        self.publisher.register_client();
        let client_id = req.client_id;
        info!(client_id = %client_id, topics = ?requested, "telemetry subscribe");

        let drop_sink = Arc::clone(&self.publisher);
        let messages = per_topic.filter_map(move |(topic, item)| match item {
            Err(BroadcastStreamRecvError::Lagged(missed)) => {
                warn!(client_id = %client_id, ?topic, dropped = missed, "slow client lagged");
                drop_sink.record_drops(&client_id, topic, missed);
                None
            }
            Ok(message) => Some(Ok(message)),
        });

        let guarded = StreamGuard {
            inner: messages,
            publisher: Arc::clone(&self.publisher),
        };
        Ok(Response::new(Box::pin(guarded) as Self::SubscribeStream))
    }
}
/// Transparent stream wrapper whose only job is to call
/// `deregister_client` on the publisher when it is dropped (tonic drops
/// the response stream once the client side goes away).
struct StreamGuard<S> {
    /// The wrapped outbound stream; every poll is forwarded to it.
    inner: S,
    /// Publisher whose subscriber count is decremented on drop.
    publisher: Arc<TelemetryPublisher>,
}

impl<S: Stream + Unpin> Stream for StreamGuard<S> {
    type Item = S::Item;

    /// Delegate straight to the inner stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        Pin::new(&mut this.inner).poll_next(cx)
    }
}

impl<S> Drop for StreamGuard<S> {
    fn drop(&mut self) {
        self.publisher.deregister_client();
    }
}
+237 -19
View File
@@ -1,57 +1,224 @@
//! `telemetry_stream` — always-on uplink to the Ground Station + operator-command downlink.
//!
//! Real implementations:
//! - **AZ-675 `telemetry_stream_grpc_server` (this crate, this batch)**:
//! Tonic gRPC server, per-client bounded queue, drop-oldest
//! back-pressure, drop counters. Topics: `TelemetrySample`,
//! `GimbalState`, `DetectionEvent`, `MovementCandidate`,
//! `MapObjectsBundle`.
//! - **AZ-676 `telemetry_stream_video_path`**: video frame topic
//! (separate RPC, server-streamed binary payloads).
//! - **AZ-677 `telemetry_stream_mapobjects_snapshot`**: diff-based
//! snapshot emission for `MapObjectsBundle`.
//! - **AZ-678+**: command-auth on the return path (operator_bridge).
pub mod internal;
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tonic::transport::Server;
use shared::contracts::TelemetrySink;
use shared::error::{AutopilotError, Result};
use shared::health::ComponentHealth;
use shared::health::{ComponentHealth, HealthLevel};
use shared::models::detection::DetectionBatch;
use shared::models::frame::Frame;
use shared::models::operator::OperatorCommand;
use crate::internal::proto::telemetry_stream_server::TelemetryStreamServer;
use crate::internal::proto::Topic;
use crate::internal::publisher::{TelemetryPublisher, DEFAULT_TOPIC_CAPACITY};
use crate::internal::server::TelemetryService;
pub use crate::internal::proto::{
telemetry_stream_client::TelemetryStreamClient, SubscribeRequest, TelemetryMessage,
Topic as TelemetryTopic,
};
pub use crate::internal::publisher::{
PerTopicCounters, PublishError, PublisherSnapshot, ALL_TOPICS,
};
const NAME: &str = "telemetry_stream";
/// Cumulative per-(client, topic) drop count at or above which health
/// flips to yellow. Picked to surface persistent slow consumers without
/// flapping on a single transient lag spike.
///
/// NOTE(review): the value is compared against a running total that the
/// visible code never resets, so a pair that crosses the threshold keeps
/// health yellow from then on — confirm this is the intended behaviour.
const DROP_YELLOW_THRESHOLD: u64 = 100;
/// Settings used to build a [`TelemetryStream`].
#[derive(Debug, Clone)]
pub struct TelemetryStreamConfig {
    /// Address the Tonic gRPC server binds to (`0.0.0.0:50061` by default).
    pub listen_addr: SocketAddr,
    /// Capacity of each per-topic broadcast ring, i.e. how far a
    /// subscriber may fall behind on a topic before it loses messages.
    pub topic_capacity: usize,
    /// Bounded capacity of the downlink command channel that feeds
    /// `operator_bridge`.
    pub downlink_capacity: usize,
}

impl Default for TelemetryStreamConfig {
    /// Listen on all interfaces, port 50061, with the default topic
    /// capacity and room for 64 queued downlink commands.
    fn default() -> Self {
        let listen_addr: SocketAddr = "0.0.0.0:50061".parse().expect("hardcoded addr parses");
        Self {
            listen_addr,
            topic_capacity: DEFAULT_TOPIC_CAPACITY,
            downlink_capacity: 64,
        }
    }
}
/// Owner of the telemetry uplink publisher and the operator-command
/// downlink channel. Cheap [`TelemetryStreamHandle`]s are derived from it.
pub struct TelemetryStream {
/// Shared multi-topic publisher served over gRPC.
publisher: Arc<TelemetryPublisher>,
/// Sender half of the downlink command channel (cloned into handles).
commands_tx: mpsc::Sender<OperatorCommand>,
/// Receiver half; `Some` until `take_command_receiver` hands it out.
commands_rx: Option<mpsc::Receiver<OperatorCommand>>,
/// Configuration this instance was built with.
config: TelemetryStreamConfig,
}
impl TelemetryStream {
/// Build a stream with default settings except for the downlink
/// command-channel capacity.
///
/// All construction is delegated to [`Self::with_config`]; no channel
/// is created here, because one made locally would be dropped unused.
pub fn new(downlink_capacity: usize) -> Self {
    Self::with_config(TelemetryStreamConfig {
        downlink_capacity,
        ..TelemetryStreamConfig::default()
    })
}
/// Build a stream from an explicit configuration.
///
/// Creates the publisher with `config.topic_capacity` and a bounded
/// downlink channel with `config.downlink_capacity`. The receiver half
/// is stored until [`Self::take_command_receiver`] is called. No
/// server is started here.
pub fn with_config(config: TelemetryStreamConfig) -> Self {
    let publisher = TelemetryPublisher::new(config.topic_capacity);
    let (downlink_tx, downlink_rx) = mpsc::channel(config.downlink_capacity);
    Self {
        publisher,
        commands_tx: downlink_tx,
        commands_rx: Some(downlink_rx),
        config,
    }
}
/// Create a cloneable handle that shares this stream's publisher and
/// downlink command sender.
pub fn handle(&self) -> TelemetryStreamHandle {
TelemetryStreamHandle {
publisher: Arc::clone(&self.publisher),
commands_tx: self.commands_tx.clone(),
}
}
/// Take the downlink command receiver. The composition root
/// forwards it to `operator_bridge` as `Receiver<OperatorCommand>`.
///
/// Returns `Some` on the first call and `None` afterwards.
pub fn take_command_receiver(&mut self) -> Option<mpsc::Receiver<OperatorCommand>> {
self.commands_rx.take()
}
/// Spawn the Tonic server on `config.listen_addr`.
///
/// Returns the task's `JoinHandle` together with a [`GrpcShutdown`]
/// trigger. The server keeps running until the trigger is fired or
/// dropped; either one resolves the shutdown future handed to tonic.
///
/// This function itself always returns `Ok`. The serve call, including
/// binding the address, runs inside the spawned task, so its error is
/// delivered through the `JoinHandle` output.
///
/// Uses `tokio::spawn`, so it must be called from within a Tokio
/// runtime.
pub fn spawn_grpc_server(
    &self,
) -> Result<(
    JoinHandle<std::result::Result<(), tonic::transport::Error>>,
    GrpcShutdown,
)> {
    let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
    let addr = self.config.listen_addr;
    let service = TelemetryStreamServer::new(TelemetryService::new(Arc::clone(&self.publisher)));

    let serve = async move {
        Server::builder()
            .add_service(service)
            .serve_with_shutdown(addr, async move {
                // Resolves on an explicit signal or when the sender is dropped.
                let _ = stop_rx.await;
            })
            .await
    };

    let trigger = GrpcShutdown { tx: Some(stop_tx) };
    Ok((tokio::spawn(serve), trigger))
}
/// Spawn the Tonic server on an already-bound std `TcpListener`.
///
/// Meant for tests that need the port up front: bind `127.0.0.1:0`,
/// read the assigned port, then pass the listener in. The listener is
/// switched to non-blocking mode and converted into a Tokio listener
/// before serving.
///
/// # Errors
///
/// Returns `AutopilotError::Internal` when the listener cannot be made
/// non-blocking or cannot be registered with Tokio. Errors from the
/// serve call itself are delivered through the returned `JoinHandle`.
pub fn spawn_grpc_server_on(
    &self,
    listener: std::net::TcpListener,
) -> Result<(
    JoinHandle<std::result::Result<(), tonic::transport::Error>>,
    GrpcShutdown,
)> {
    if let Err(e) = listener.set_nonblocking(true) {
        return Err(AutopilotError::Internal(format!("set_nonblocking: {e}")));
    }
    let incoming = match tokio::net::TcpListener::from_std(listener) {
        Ok(async_listener) => tokio_stream::wrappers::TcpListenerStream::new(async_listener),
        Err(e) => {
            return Err(AutopilotError::Internal(format!(
                "TcpListener::from_std: {e}"
            )))
        }
    };

    let service = TelemetryStreamServer::new(TelemetryService::new(Arc::clone(&self.publisher)));
    let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();

    let serve = async move {
        Server::builder()
            .add_service(service)
            .serve_with_incoming_shutdown(incoming, async move {
                // Resolves on an explicit signal or when the sender is dropped.
                let _ = stop_rx.await;
            })
            .await
    };

    let trigger = GrpcShutdown { tx: Some(stop_tx) };
    Ok((tokio::spawn(serve), trigger))
}
}
/// Shutdown trigger for a spawned gRPC server.
///
/// The server stops when [`GrpcShutdown::shutdown`] is called or when
/// the value is dropped, so keep it alive for as long as the server
/// should run.
pub struct GrpcShutdown {
    /// One-shot sender; `None` once the signal has been sent.
    tx: Option<tokio::sync::oneshot::Sender<()>>,
}

impl GrpcShutdown {
    /// Stop the server now, consuming the trigger.
    pub fn shutdown(mut self) {
        self.signal();
    }

    /// Send the stop signal at most once. A send failure (the server
    /// task is already gone) is ignored.
    fn signal(&mut self) {
        if let Some(sender) = self.tx.take() {
            let _ = sender.send(());
        }
    }
}

impl Drop for GrpcShutdown {
    fn drop(&mut self) {
        self.signal();
    }
}
/// Cloneable handle onto a [`TelemetryStream`]: it publishes telemetry,
/// submits downlink commands, and reports snapshot and health.
#[derive(Clone)]
pub struct TelemetryStreamHandle {
/// Publisher shared with the owning `TelemetryStream`.
publisher: Arc<TelemetryPublisher>,
/// Sender half of the downlink command channel.
commands_tx: mpsc::Sender<OperatorCommand>,
}
impl TelemetryStreamHandle {
/// Publish a payload on `topic`. Never blocks the caller; slow
/// subscribers experience drops accounted in [`Self::snapshot`].
///
/// Returns the publisher's `PublishError` when the payload cannot be
/// serialized.
pub fn publish<T: serde::Serialize>(
&self,
topic: TelemetryTopic,
payload: &T,
) -> std::result::Result<(), PublishError> {
self.publisher.publish(topic, payload)
}
/// Inject an operator command downlink. Production path is fed
/// by the gRPC return half once AZ-678 lands; tests may call this
/// directly.
pub async fn submit_command(&self, command: OperatorCommand) -> Result<()> {
self.commands_tx
.send(command)
@@ -59,8 +226,32 @@ impl TelemetryStreamHandle {
.map_err(|_| AutopilotError::Internal("downlink channel closed".into()))
}
/// Current publisher counters: subscribers, per-topic publish totals
/// and per-(client, topic) drop totals.
pub fn snapshot(&self) -> PublisherSnapshot {
self.publisher.snapshot()
}
/// Report component health derived from the publisher snapshot.
///
/// Health is green by default. It becomes yellow when at least one
/// `(client_id, topic)` pair has accumulated `DROP_YELLOW_THRESHOLD`
/// or more dropped messages. The detail string always carries the
/// subscriber count, the total published count and the number of such
/// "hot" pairs.
pub fn health(&self) -> ComponentHealth {
    let snap = self.publisher.snapshot();

    // Only the number of hot pairs is needed, so count instead of collecting.
    let hot_drop_pairs = snap
        .drops_total
        .values()
        .filter(|&&dropped| dropped >= DROP_YELLOW_THRESHOLD)
        .count();

    let mut h = ComponentHealth::green(NAME);
    if hot_drop_pairs > 0 {
        h.level = HealthLevel::Yellow;
    }
    h.detail = Some(format!(
        "subscribers={} published_total={} hot_drop_pairs={}",
        snap.subscribed_clients, snap.published_total, hot_drop_pairs
    ));
    h
}
}
@@ -68,14 +259,14 @@ impl TelemetryStreamHandle {
impl TelemetrySink for TelemetryStreamHandle {
async fn push_frame(&self, _frame: Frame) -> Result<()> {
Err(AutopilotError::NotImplemented(
"telemetry_stream::push_frame (AZ-676)",
"telemetry_stream::push_frame (AZ-676 video path)",
))
}
async fn push_detections(&self, _batch: DetectionBatch) -> Result<()> {
Err(AutopilotError::NotImplemented(
"telemetry_stream::push_detections (AZ-675)",
))
async fn push_detections(&self, batch: DetectionBatch) -> Result<()> {
self.publisher
.publish(Topic::DetectionEvent, &batch)
.map_err(|e| AutopilotError::Internal(format!("publish detections: {e}")))
}
}
@@ -84,8 +275,35 @@ mod tests {
use super::*;
/// A freshly built stream has no subscribers, nothing published, and
/// reports green health (not `Disabled`, now that the server is real).
#[test]
fn handle_starts_with_zero_subscribers_and_green_health() {
    // Arrange
    let s = TelemetryStream::new(8);
    let h = s.handle();

    // Act
    let snap = h.snapshot();
    let health = h.health();

    // Assert
    assert_eq!(snap.subscribed_clients, 0);
    assert_eq!(snap.published_total, 0);
    assert_eq!(health.level, HealthLevel::Green);
}
/// Publishing with nobody subscribed succeeds and still bumps the
/// per-topic `published` counter.
#[test]
fn publish_without_subscribers_is_no_op_but_counts() {
    // Arrange
    let stream = TelemetryStream::new(8);
    let handle = stream.handle();
    let payload = serde_json::json!({"v": 1});

    // Act
    let outcome = handle.publish(TelemetryTopic::TelemetrySample, &payload);

    // Assert
    outcome.unwrap();
    let counters = handle.snapshot().per_topic;
    assert_eq!(counters[&Topic::TelemetrySample].published, 1);
}
}
@@ -0,0 +1,366 @@
//! AZ-675 integration tests — Tonic server + per-client lossy queue
//! exercised through an in-process gRPC client (the only stub allowed
//! per the task's Runtime Completeness gate).
use std::net::TcpListener;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::time::timeout;
use tokio_stream::StreamExt;
use tonic::transport::{Channel, Endpoint};
use tonic::Request;
use telemetry_stream::{SubscribeRequest, TelemetryStream, TelemetryStreamClient, TelemetryTopic};
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Sample {
n: u32,
}
/// Bind a std TCP listener to `127.0.0.1:0` and return it together with
/// the port the OS assigned. Using ephemeral ports keeps the tests in
/// this suite from colliding with each other.
///
/// Panics if binding or reading the local address fails.
fn bind_ephemeral() -> (TcpListener, u16) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral");
    let local = listener.local_addr().unwrap();
    (listener, local.port())
}
/// Connect a gRPC client to the test server on `127.0.0.1:{port}`.
///
/// Makes up to 50 attempts with a 20 ms pause after each failure,
/// because the server may need a few ms to start accepting. Panics
/// when every attempt fails.
async fn connect(port: u16) -> TelemetryStreamClient<Channel> {
    let endpoint = Endpoint::from_shared(format!("http://127.0.0.1:{port}"))
        .unwrap()
        .connect_timeout(Duration::from_secs(2));

    let mut attempts_left = 50;
    while attempts_left > 0 {
        match TelemetryStreamClient::connect(endpoint.clone()).await {
            Ok(client) => return client,
            Err(_) => tokio::time::sleep(Duration::from_millis(20)).await,
        }
        attempts_left -= 1;
    }
    panic!("gRPC client failed to connect within ~1s");
}
/// AC-1 — multiple subscribers receive the same stream, ordering preserved.
///
/// Three clients subscribe to `TelemetrySample`, 100 samples are
/// published in one burst, and each client must read back exactly
/// samples 0..100 in order. `TelemetryStream::new(8)` only sets the
/// downlink capacity; the topic ring keeps its default capacity.
#[tokio::test]
async fn ac1_multiple_subscribers_receive_same_stream() {
// Arrange
let (listener, port) = bind_ephemeral();
let server = TelemetryStream::new(8);
let handle = server.handle();
// `_guard` must stay alive: dropping it shuts the server down.
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
let mut c1 = connect(port).await;
let mut c2 = connect(port).await;
let mut c3 = connect(port).await;
let req = |id: &str| {
Request::new(SubscribeRequest {
client_id: id.to_string(),
topics: vec![TelemetryTopic::TelemetrySample as i32],
})
};
let mut s1 = c1.subscribe(req("a")).await.unwrap().into_inner();
let mut s2 = c2.subscribe(req("b")).await.unwrap().into_inner();
let mut s3 = c3.subscribe(req("c")).await.unwrap().into_inner();
// Give the server time to register all three before publishing.
for _ in 0..50 {
if handle.snapshot().subscribed_clients == 3 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(handle.snapshot().subscribed_clients, 3);
// Act
for n in 0..100u32 {
handle
.publish(TelemetryTopic::TelemetrySample, &Sample { n })
.unwrap();
}
// Read exactly 100 messages from one stream, decoding each payload.
// Each individual read is bounded by a 2 s timeout.
async fn collect100(
s: &mut tonic::Streaming<telemetry_stream::TelemetryMessage>,
) -> Vec<Sample> {
let mut out = Vec::new();
while out.len() < 100 {
let msg = timeout(Duration::from_secs(2), s.next())
.await
.expect("ac1 client did not receive 100 in 2s")
.expect("stream ended")
.expect("server status");
let sample: Sample = serde_json::from_slice(&msg.payload_json).unwrap();
out.push(sample);
}
out
}
let r1 = collect100(&mut s1).await;
let r2 = collect100(&mut s2).await;
let r3 = collect100(&mut s3).await;
// Assert — all three receive all 100 in order.
let expected: Vec<Sample> = (0..100).map(|n| Sample { n }).collect();
assert_eq!(r1, expected);
assert_eq!(r2, expected);
assert_eq!(r3, expected);
}
/// AC-2 — slow subscriber drops oldest, healthy unaffected.
///
/// We can't simulate "client never polls" through real tonic+H2
/// because the OS TCP + HTTP/2 receive buffers (often 256 KB+) will
/// absorb a small JSON burst entirely with no observable drop. The
/// realistic AC-2 condition is "slow drain rate" — the slow client
/// polls but at a fraction of the publish rate. With large payloads
/// and a small per-topic broadcast capacity, the slow client's
/// broadcast cursor falls behind the producer and `BroadcastStream`
/// emits `Lagged(n)` — the only signal AZ-675 wires into the per-
/// `(client_id, topic)` drop counters.
#[tokio::test]
async fn ac2_slow_subscriber_drops_oldest_healthy_unaffected() {
// Arrange — capacity 4, big payloads so wire back-pressure
// propagates fast and broadcast lag is observable end-to-end.
let (listener, port) = bind_ephemeral();
let cfg = telemetry_stream::TelemetryStreamConfig {
topic_capacity: 4,
..telemetry_stream::TelemetryStreamConfig::default()
};
let server = TelemetryStream::with_config(cfg);
let handle = server.handle();
// `_guard` must stay alive: dropping it shuts the server down.
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
#[derive(Serialize)]
struct Big {
n: u32,
// 64 KB filler per message (before JSON encoding) — fills H2
// receive window quickly.
pad: Vec<u8>,
}
let mut c_fast = connect(port).await;
let mut c_slow = connect(port).await;
let req = |id: &str| {
Request::new(SubscribeRequest {
client_id: id.to_string(),
topics: vec![TelemetryTopic::TelemetrySample as i32],
})
};
let mut s_fast = c_fast.subscribe(req("fast")).await.unwrap().into_inner();
let mut s_slow = c_slow.subscribe(req("slow")).await.unwrap().into_inner();
// Wait until the server has registered both streams.
for _ in 0..50 {
if handle.snapshot().subscribed_clients == 2 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(handle.snapshot().subscribed_clients, 2);
let total: u32 = 300;
// Fast drains immediately, slow drains 1 msg every 50ms (way below publish rate).
let fast_task = tokio::spawn(async move {
let mut seen = 0u32;
while seen < total {
let _msg = timeout(Duration::from_secs(15), s_fast.next())
.await
.expect("fast did not receive everything in 15s")
.expect("stream ended")
.expect("server status");
seen += 1;
}
seen
});
let slow_task = tokio::spawn(async move {
let mut received = 0u32;
let deadline = std::time::Instant::now() + Duration::from_secs(30);
while std::time::Instant::now() < deadline {
match timeout(Duration::from_millis(500), s_slow.next()).await {
Ok(Some(Ok(_msg))) => received += 1,
// Server error status, end of stream, or 500 ms of silence
// (publisher finished): stop reading.
Ok(Some(Err(_))) => break,
Ok(None) => break,
Err(_) => break,
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
received
});
// Act — burst publishes at ~2ms cadence with 64 KB payloads so
// broadcast capacity is overrun for the slow consumer.
let payload_pad = vec![0u8; 64 * 1024];
for n in 0..total {
handle
.publish(
TelemetryTopic::TelemetrySample,
&Big {
n,
pad: payload_pad.clone(),
},
)
.unwrap();
tokio::time::sleep(Duration::from_millis(2)).await;
}
let fast_count = fast_task.await.unwrap();
let slow_count = slow_task.await.unwrap();
let snap = handle.snapshot();
// A missing entry means no drops were ever recorded for that pair.
let slow_drops = snap
.drops_total
.get(&("slow".to_string(), TelemetryTopic::TelemetrySample))
.copied()
.unwrap_or(0);
let fast_drops = snap
.drops_total
.get(&("fast".to_string(), TelemetryTopic::TelemetrySample))
.copied()
.unwrap_or(0);
// Assert
assert_eq!(fast_count, total, "fast client receives every message");
assert_eq!(fast_drops, 0, "fast client MUST NOT have drops");
assert!(
slow_drops > 0,
"slow client MUST have lagged at some point (got 0 drops; slow_count={slow_count})"
);
assert!(
slow_count < total,
"slow client should have lost messages (got {slow_count}/{total})"
);
// Upper bound only: the `+ 4` slack is the topic capacity configured above.
assert!(
(slow_count as u64) + slow_drops <= (total as u64) + 4,
"accounted={} should not exceed total+capacity",
(slow_count as u64) + slow_drops
);
}
/// AC-3 — disconnect cleanly removes subscriber.
///
/// Subscribes one client (empty topic list = all topics), drops both the
/// stream and the client, then polls the snapshot for up to ~2 s until
/// the subscriber count returns to zero.
#[tokio::test]
async fn ac3_disconnect_decrements_subscribed_clients() {
// Arrange
let (listener, port) = bind_ephemeral();
let server = TelemetryStream::new(8);
let handle = server.handle();
// `_guard` must stay alive: dropping it shuts the server down.
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
let mut c1 = connect(port).await;
let req = || {
Request::new(SubscribeRequest {
client_id: "ephemeral".to_string(),
topics: vec![],
})
};
let s1 = c1.subscribe(req()).await.unwrap().into_inner();
for _ in 0..50 {
if handle.snapshot().subscribed_clients == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(handle.snapshot().subscribed_clients, 1);
// Act — drop the stream (simulates client cancel).
drop(s1);
drop(c1);
// Assert — register/deregister is eventually consistent; wait briefly.
// 99 is just a non-zero placeholder that is overwritten on the first poll.
let mut final_count = 99;
for _ in 0..100 {
final_count = handle.snapshot().subscribed_clients;
if final_count == 0 {
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert_eq!(
final_count, 0,
"disconnect must decrement subscribed_clients"
);
}
/// Subscribing to an empty `topics` list defaults to ALL topics.
///
/// One message is published on each of the five topics; the single
/// subscriber must see five distinct topic values within the per-read
/// 2 s timeout.
#[tokio::test]
async fn empty_topics_list_defaults_to_all() {
// Arrange
let (listener, port) = bind_ephemeral();
let server = TelemetryStream::new(8);
let handle = server.handle();
// `_guard` must stay alive: dropping it shuts the server down.
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
let mut client = connect(port).await;
let mut stream = client
.subscribe(Request::new(SubscribeRequest {
client_id: "all".to_string(),
topics: vec![],
}))
.await
.unwrap()
.into_inner();
// Best-effort wait for the server to register the stream; the outcome
// of this wait is not asserted here.
for _ in 0..50 {
if handle.snapshot().subscribed_clients == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
// Act — publish one message on each topic.
handle
.publish(TelemetryTopic::TelemetrySample, &Sample { n: 1 })
.unwrap();
handle
.publish(TelemetryTopic::GimbalState, &Sample { n: 2 })
.unwrap();
handle
.publish(TelemetryTopic::DetectionEvent, &Sample { n: 3 })
.unwrap();
handle
.publish(TelemetryTopic::MovementCandidate, &Sample { n: 4 })
.unwrap();
handle
.publish(TelemetryTopic::MapObjectsBundle, &Sample { n: 5 })
.unwrap();
// Collect the raw topic value of each received message until five
// distinct ones have been seen.
let mut seen_topics = std::collections::HashSet::new();
while seen_topics.len() < 5 {
let msg = timeout(Duration::from_secs(2), stream.next())
.await
.expect("did not receive all 5 topics in 2s")
.unwrap()
.unwrap();
seen_topics.insert(msg.topic);
}
// Assert
assert_eq!(seen_topics.len(), 5);
}
/// A subscribe request with an empty `client_id` must fail with
/// `InvalidArgument` instead of opening a stream.
#[tokio::test]
async fn empty_client_id_is_rejected() {
    // Arrange
    let (listener, port) = bind_ephemeral();
    let server = TelemetryStream::new(8);
    let _h = server.handle();
    let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
    let mut client = connect(port).await;
    let request = Request::new(SubscribeRequest {
        client_id: String::new(),
        topics: vec![],
    });

    // Act
    let status = client
        .subscribe(request)
        .await
        .expect_err("empty client_id must return Status error");

    // Assert
    assert_eq!(status.code(), tonic::Code::InvalidArgument);
}