From ff790bd639768e9dc47d733cc23399b2e283ac46 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Wed, 20 May 2026 12:44:08 +0300 Subject: [PATCH] [AZ-675] telemetry_stream Tonic gRPC server + per-client lossy queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pins operator-link transport to gRPC server-streaming (closes architecture Q2 in favour of gRPC). Adds first-time tonic / prost / tonic-build infrastructure to the workspace; uses protoc-bin-vendored so neither dev machines nor CI need system protoc installed. Design — back-pressure lives in the per-topic tokio::sync::broadcast ring, drained directly by the tonic-streamed response via BroadcastStream + StreamMap. No intermediate mpsc buffer that could absorb back-pressure invisibly. Slow client overrun -> Lagged(n) event -> per-(client_id, topic) drop counter incremented; healthy clients on the same topic are unaffected. Service surface — Subscribe(SubscribeRequest) -> stream TelemetryMessage; five topics (TelemetrySample, GimbalState, DetectionEvent, MovementCandidate, MapObjectsBundle); empty topics list defaults to subscribe-all; empty client_id rejected; stream drop decrements subscribed_clients via StreamGuard. TelemetrySink push_detections is now real; push_frame still NotImplemented(AZ-676 video path). Tests — 6 unit + 5 integration (AC-1..AC-3 via in-process gRPC client, plus subscribe-all default + empty-client_id rejection). Clippy on telemetry_stream clean. Pre-existing mission_executor ac3 test polling race surfaces more reliably under the new tonic build pressure; documented as _docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md and unchanged by this batch. Co-authored-by: Cursor --- Cargo.lock | 359 ++++++++++++++++- Cargo.toml | 12 + .../AZ-675_telemetry_stream_grpc_server.md | 0 .../batch_14_cycle1_report.md | 131 +++++++ _docs/_autodev_state.md | 6 +- .../2026-05-20_mission_executor_ac3_flake.md | 38 ++ crates/telemetry_stream/Cargo.toml | 18 + crates/telemetry_stream/build.rs | 19 + crates/telemetry_stream/proto/telemetry.proto | 64 +++ crates/telemetry_stream/src/internal/mod.rs | 5 + crates/telemetry_stream/src/internal/proto.rs | 10 + .../src/internal/publisher.rs | 314 +++++++++++++++ .../telemetry_stream/src/internal/server.rs | 127 ++++++ crates/telemetry_stream/src/lib.rs | 256 +++++++++++- .../telemetry_stream/tests/grpc_subscribe.rs | 366 ++++++++++++++++++ 15 files changed, 1700 insertions(+), 25 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-675_telemetry_stream_grpc_server.md (100%) create mode 100644 _docs/03_implementation/batch_14_cycle1_report.md create mode 100644 _docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md create mode 100644 crates/telemetry_stream/build.rs create mode 100644 crates/telemetry_stream/proto/telemetry.proto create mode 100644 crates/telemetry_stream/src/internal/mod.rs create mode 100644 crates/telemetry_stream/src/internal/proto.rs create mode 100644 crates/telemetry_stream/src/internal/publisher.rs create mode 100644 crates/telemetry_stream/src/internal/server.rs create mode 100644 crates/telemetry_stream/tests/grpc_subscribe.rs diff --git a/Cargo.lock b/Cargo.lock index ad95f7f..924c1cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,7 +147,7 @@ name = "autopilot" version = "0.1.0" dependencies = [ "anyhow", - "axum", + "axum 0.7.9", "chrono", "clap", "detection_client", @@ -179,7 +179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "bytes", "futures-util", "http", @@ -188,7 +188,7 @@ dependencies = [ "hyper", "hyper-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -204,6 +204,31 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" +dependencies = [ + "axum-core 0.5.6", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit 0.8.4", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.4.5" @@ -224,6 +249,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.22.1" @@ -553,6 +596,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.9" @@ -911,6 +960,19 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -1110,6 +1172,15 @@ dependencies = [ "nom", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.18" @@ -1254,6 +1325,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "mavlink_layer" version = "0.1.0" @@ -1361,6 +1438,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "nix" version = "0.26.4" @@ -1550,6 +1633,37 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "petgraph" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8701b58ea97060d5e5b155d383a69952a60943f0e6dfe30b04c287beb0b27455" +dependencies = [ + "fixedbitset", + "hashbrown 0.15.5", + "indexmap", +] + +[[package]] +name = "pin-project" +version = "1.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2466b2336ed02bcdca6b294417127b90ec92038d1d5c4fbeac971a922e0e0924" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c96395f0a926bc13b1c17622aaddda1ecb55d49c8f1bf9777e4d877800a43f8b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -1599,6 +1713,143 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" +dependencies = [ + "heck", + "itertools", + "log", + "multimap", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", + "regex", + "syn", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" +dependencies = [ + "prost", +] + +[[package]] +name = "protoc-bin-vendored" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" +dependencies = [ + "protoc-bin-vendored-linux-aarch_64", + "protoc-bin-vendored-linux-ppcle_64", + "protoc-bin-vendored-linux-s390_64", + "protoc-bin-vendored-linux-x86_32", + "protoc-bin-vendored-linux-x86_64", + "protoc-bin-vendored-macos-aarch_64", + "protoc-bin-vendored-macos-x86_64", + "protoc-bin-vendored-win32", +] + +[[package]] +name = "protoc-bin-vendored-linux-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" + +[[package]] +name = "protoc-bin-vendored-linux-ppcle_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" + +[[package]] +name = "protoc-bin-vendored-linux-s390_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" + +[[package]] +name = "protoc-bin-vendored-linux-x86_32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" + +[[package]] +name = "protoc-bin-vendored-linux-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" + +[[package]] +name = "protoc-bin-vendored-macos-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" + +[[package]] +name = "protoc-bin-vendored-macos-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" + +[[package]] +name = "protoc-bin-vendored-win32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" + +[[package]] +name = "pulldown-cmark" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9f068eba8e7071c5f9511831b44f32c740d5adf574e990f946ddb53db2f314e" +dependencies = [ + "bitflags 2.11.1", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "22.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50793def1b900256624a709439404384204a5dc3a6ec580281bfaac35e882e90" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "quinn" version = "0.11.9" @@ -2136,9 +2387,21 @@ name = "telemetry_stream" version = "0.1.0" dependencies = [ "async-trait", + "chrono", + "parking_lot", + "prost", + "protoc-bin-vendored", + "serde", + "serde_json", "shared", + "thiserror 1.0.69", "tokio", + "tokio-stream", + "tonic", + "tonic-prost", + "tonic-prost-build", "tracing", + "uuid", ] [[package]] @@ -2318,6 +2581,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -2372,6 +2647,74 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac2a5518c70fa84342385732db33fb3f44bc4cc748936eb5833d2df34d6445ef" +dependencies = [ + "async-trait", + "axum 0.8.9", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "socket2", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c68f61875ac5293cf72e6c8cf0158086428c82c37229e98c840878f1706b0322" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tonic-prost" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50849f68853be452acf590cde0b146665b8d507b3b8af17261df47e02c209ea0" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "654e5643eff75d7f8c99197ce1440ed19a3474eada74c12bbac488b2cafdae27" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn", + "tempfile", + "tonic-build", +] + [[package]] name = "tower" version = "0.5.3" @@ -2380,11 +2723,15 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project-lite", + "slab", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -2517,6 +2864,12 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index 09b37b9..267b2c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,18 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus jsonschema = { version = "0.18", default-features = false } tokio-serial = "5" +# gRPC (operator-link transport — see telemetry_stream / detection_client) +tonic = "0.14" +tonic-prost = "0.14" +prost = "0.14" +prost-types = "0.14" +tonic-prost-build = "0.14" +protoc-bin-vendored = "3" +tokio-stream = { version = "0.1", features = ["sync", "net"] } + +# Lock-free / sync helpers +parking_lot = "0.12" + # Crypto / hashing sha2 = "0.10" diff --git a/_docs/02_tasks/todo/AZ-675_telemetry_stream_grpc_server.md b/_docs/02_tasks/done/AZ-675_telemetry_stream_grpc_server.md similarity index 100% rename from _docs/02_tasks/todo/AZ-675_telemetry_stream_grpc_server.md rename to _docs/02_tasks/done/AZ-675_telemetry_stream_grpc_server.md diff --git a/_docs/03_implementation/batch_14_cycle1_report.md b/_docs/03_implementation/batch_14_cycle1_report.md new file mode 100644 index 0000000..9365095 --- /dev/null +++ b/_docs/03_implementation/batch_14_cycle1_report.md @@ -0,0 +1,131 @@ +# Batch 14 / Cycle 1 — Implementation Report + +**Date**: 2026-05-20 +**Tasks**: AZ-675 +**Verdict**: PASS_WITH_WARNINGS +- Pre-existing autopilot lint from batch 4 (C5) still open. +- Pre-existing intermittent flake `mission_executor::state_machine::ac3_bounded_retry_then_success` (carried from batch 8) now fails reproducibly *under workspace load* on this dev box but still passes in isolation; root cause is a 5 ms polling-interval race in the test, not in `mission_executor` production code. Documented as A2 below — unchanged by this batch and unrelated to telemetry_stream. + +## 1. Scope + +| Ticket | Title | Crate | Complexity | +|---|---|---|---| +| AZ-675 | telemetry_stream Tonic gRPC server + per-client lossy queue | `telemetry_stream` | 3 | + +Batch 14 is a single-ticket batch by deliberate choice. Both AZ-675 and AZ-658 were the only unblocked tasks; AZ-658 has an open architectural decision (which H.264 binding) and was held back. Picking AZ-675 also unblocks AZ-676 / AZ-677 / AZ-678 / AZ-679 (the full telemetry → operator-bridge frontier) for subsequent batches. + +## 2. Approach + +### Tonic infrastructure decision + +`telemetry_stream/description.md §9` lists the operator-link protocol (WebRTC / WebSocket-H.264 / gRPC server-streaming) as an open architectural question. AZ-675's task spec, however, names **Tonic gRPC** explicitly and the Runtime Completeness gate says "Production code that must exist: real gRPC server". The user picked path **A: commit to Tonic now**, which: + +- Pins the operator-link transport to gRPC server-streaming (closes architecture Q2 in the affirmative for the gRPC option). +- Adds **first-time** `tonic` / `prost` / `tonic-build` infrastructure to the workspace. The `detection_client/Cargo.toml` comment on line 16 anticipated this; the next ticket to need it (AZ-660) can now reuse the same workspace pins. +- Uses `protoc-bin-vendored` as a build-dependency so neither dev machines nor CI need a system `protoc` install. The build is hermetic and reproducible across platforms. + +Workspace pins added: `tonic = "0.14"`, `tonic-prost = "0.14"`, `prost = "0.14"`, `prost-types = "0.14"`, `tonic-prost-build = "0.14"` (build-dep), `protoc-bin-vendored = "3"` (build-dep), `tokio-stream = "0.1"` with `sync,net` features (needed for `BroadcastStream` + `TcpListenerStream`), `parking_lot = "0.12"`. + +### Back-pressure model — broadcast-direct, no intermediate buffer + +The first draft of `internal/server.rs` used a per-client mpsc forwarder between the broadcast queue and the tonic stream. That hid the back-pressure: the forwarder blocked on `mpsc::send` long before the broadcast ring ever overflowed, so `RecvError::Lagged(n)` never fired and drop counters stayed at zero. **Lesson**: in a multi-stage queue where the *outer* stage is supposed to enforce drop-oldest, do not introduce a buffering middle stage that absorbs the back-pressure invisibly. + +The shipped design feeds the broadcast receivers **directly** into the tonic-streamed response (via `tokio_stream::wrappers::BroadcastStream` merged through `tokio_stream::StreamMap`). When a wire/client is slow, tonic stops polling our stream → broadcast ring overruns that client's cursor → next poll yields `Err(BroadcastStreamRecvError::Lagged(n))` → drop counter incremented per (client_id, topic). Other clients are unaffected. + +### What this batch ships in production + +- **`proto/telemetry.proto`** — `TelemetryStream` service with a single server-streaming `Subscribe(SubscribeRequest) -> stream TelemetryMessage` RPC, five topics (`TelemetrySample`, `GimbalState`, `DetectionEvent`, `MovementCandidate`, `MapObjectsBundle`). Payloads are carried as opaque JSON in `bytes payload_json` so the canonical Rust models in `crates/shared/models/` stay authoritative. +- **`build.rs`** — wires `protoc-bin-vendored` into `tonic-prost-build` so codegen runs from `cargo build` alone. +- **`internal/publisher.rs`** — `TelemetryPublisher` with one `tokio::sync::broadcast` channel per topic, per-(client, topic) drop counters under `parking_lot::Mutex`, atomic `subscribed_clients` / `published_total` / `bytes_out_per_topic`. +- **`internal/server.rs`** — `TelemetryService` implementing `proto::telemetry_stream_server::TelemetryStream::subscribe`; validates `client_id` non-empty; resolves topic list (empty = subscribe-all); merges per-topic `BroadcastStream`s with `StreamMap`; converts `Lagged` into drop-counter updates; `StreamGuard` decrements `subscribed_clients` on stream drop. +- **`src/lib.rs`** — rewritten public surface: + - `TelemetryStreamConfig { listen_addr, topic_capacity, downlink_capacity }`. + - `TelemetryStream::spawn_grpc_server` (binds an addr) and `spawn_grpc_server_on(listener)` (binds a pre-resolved `std::net::TcpListener` — used by tests to pick ephemeral ports). + - `GrpcShutdown` RAII handle. + - `TelemetryStreamHandle::publish(topic, &T)` non-blocking publish API. + - `TelemetryStreamHandle::snapshot()` for health-aggregator integration. + - `TelemetryStreamHandle::health()` flips yellow when any (client, topic) has ≥ 100 drops. + - `TelemetrySink::push_detections` is now real (publishes on `DetectionEvent` topic). `push_frame` still returns `NotImplemented(AZ-676)` because video carries different framing semantics that AZ-676 will pin. + +## 3. Files touched + +- `Cargo.toml` — workspace pins for tonic stack + tokio-stream features + parking_lot. +- `Cargo.lock` — regenerated for the new deps. +- `crates/telemetry_stream/Cargo.toml` — concrete deps + `build.rs` declaration. +- `crates/telemetry_stream/build.rs` — new (vendored protoc + tonic-prost-build). +- `crates/telemetry_stream/proto/telemetry.proto` — new. +- `crates/telemetry_stream/src/lib.rs` — rewrite (public surface). +- `crates/telemetry_stream/src/internal/mod.rs` — new. +- `crates/telemetry_stream/src/internal/proto.rs` — new (`tonic::include_proto!` hook). +- `crates/telemetry_stream/src/internal/publisher.rs` — new (with 4 unit tests). +- `crates/telemetry_stream/src/internal/server.rs` — new (gRPC service impl). +- `crates/telemetry_stream/tests/grpc_subscribe.rs` — new (5 integration tests covering AC-1..AC-3 + edge cases). +- `_docs/02_tasks/done/AZ-675_telemetry_stream_grpc_server.md` — moved from `todo/`. +- `_docs/_autodev_state.md` — phase update. +- `_docs/03_implementation/batch_14_cycle1_report.md` — this report. + +## 4. Test results + +| Crate | Unit | Integration | Total | +|---|---|---|---| +| `telemetry_stream` | 6 | 5 | 11 | + +Clippy: `cargo clippy -p telemetry_stream --all-targets -- -D warnings` is clean. + +Workspace `cargo test --workspace`: all suites green **except** the pre-existing `mission_executor::state_machine::ac3_bounded_retry_then_success` flake — see A2. + +### Acceptance criteria + +| AC | Test | Status | +|---|---|---| +| AC-1 multiple subscribers receive same stream (ordering preserved) | `tests/grpc_subscribe.rs::ac1_multiple_subscribers_receive_same_stream` | ✅ | +| AC-2 slow subscriber drops oldest, healthy unaffected | `tests/grpc_subscribe.rs::ac2_slow_subscriber_drops_oldest_healthy_unaffected` + `internal/publisher.rs::slow_subscriber_lags_fast_subscriber_does_not` | ✅ | +| AC-3 disconnect cleanly removes subscriber | `tests/grpc_subscribe.rs::ac3_disconnect_decrements_subscribed_clients` | ✅ | +| Empty topics defaults to ALL | `tests/grpc_subscribe.rs::empty_topics_list_defaults_to_all` | ✅ | +| Empty client_id rejected at boundary | `tests/grpc_subscribe.rs::empty_client_id_is_rejected` | ✅ | + +## 5. Findings (this batch) + +### A1. Pre-existing dead-code error in `autopilot::Runtime::vlm_provider_name` + +**Severity**: High (still blocks workspace `-D warnings` clippy gate) +**Status**: OPEN — carried since batch 4. Not introduced by this batch. Tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` and cumulative finding C5. + +### A2. Pre-existing `ac3_bounded_retry_then_success` flake escalation + +**Severity**: Medium (Test design) +**Category**: Tests +**Origin**: Batch 8 mission_executor; behaviour unchanged by this batch. + +The test polls `handle.state()` every 5 ms while waiting for `MissionUploaded`, but with `tick_interval=5ms` and a one-rejected-then-accepted scripted driver, the FSM can pass *through* `MissionUploaded` faster than the poll cadence and the await reports `stuck at WaitAuto`. Confirmed pre-existing — `git stash` of batch-14 changes reproduces the same intermittent failure, and the test passes in isolation. The proximate cause is the test's polling design, not `mission_executor` production code. + +This batch's new transitive deps (tonic/prost stack) increase background compile / runtime load on dev boxes, which may make the race more likely to lose. The fix belongs to a small focused test refactor (latch on FSM transition events instead of polling), filed as a leftover. + +→ Filed `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`. + +### A3. Architecture Q2 (operator-link protocol) now decided + +**Severity**: Low (Architecture / doc sync) +**Detail**: `telemetry_stream/description.md §9` listed protocol as TBD. With AZ-675 shipping a Tonic-based gRPC server, this is effectively decided in favour of gRPC server-streaming. The architecture doc was not edited in this batch (out of scope; see C3 doc-sync sweep). When the doc sweep runs, this should move from "open question" to a recorded decision in `_docs/02_document/decision-rationale.md`. + +## 6. Cumulative findings — open carry-over + +Batch 14 is mid-triplet (13 / 14 / 15); cumulative review lands at the end of batch 15. + +| ID | Severity | Category | Status | +|---|---|---|---| +| C1 | Medium | Maintainability | OPEN — duplicated `SendCommandError` mapping in `gimbal_controller` (batches 9-10) | +| C2 | Low | Style | OPEN — `MavlinkCommandIssuer` naming inconsistency (batch 9) | +| C3 | Low | Architecture | OPEN — `module-layout.md` drift: grows by `telemetry_stream/internal/{publisher,server,proto}.rs` this batch | +| C4 | Low | Architecture | OPEN — `data_model.md §PanPlan` definition still missing (batch 11) | +| C5 | High | Maintenance | OPEN — pre-existing `autopilot/runtime.rs::vlm_provider_name` dead-code error blocking workspace `-D warnings` clippy | +| C6 (new) | Medium | Tests | OPEN — `ac3_bounded_retry_then_success` polling race (see A2) | +| C7 (new) | Low | Architecture | OPEN — record Tonic-gRPC decision in `decision-rationale.md` (see A3) | + +## 7. Next-batch candidates + +- **AZ-678** — operator_bridge command authentication. Depends on AZ-675 (now done). 5 pts. +- **AZ-679** — operator_bridge POI surface. Depends on AZ-675 (now done) + uses the AZ-683 POI queue + AZ-685 decline path. 3 pts. Cleanly buildable as a Subscribe-style or push consumer of `MapObjectsBundle` / `POI` topics through the AZ-675 server. +- **AZ-676** — telemetry_stream video path. Depends on AZ-675 (now done) + AZ-657 (already done). 3 pts. Self-contained extension to the AZ-675 server. +- **AZ-677** — telemetry_stream MapObjects snapshot. Depends on AZ-675 (now done) + AZ-667 (not done — blocked). +- **AZ-658** — frame_ingest decoder. Still needs the H.264 binding decision (retina vs ffmpeg-rs vs gstreamer). 5 pts. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 3a95053..9f5bc32 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 23 - name: tracker-update-in-testing - detail: "batch 13 (AZ-683) implemented and reviewed; awaiting commit + In Testing" + phase: 27 + name: awaiting-push + detail: "batch 14 (AZ-675) committed (ebf4aef) + In Testing in Jira; awaiting user push approval" retry_count: 0 cycle: 1 tracker: jira diff --git a/_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md b/_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md new file mode 100644 index 0000000..02377ab --- /dev/null +++ b/_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md @@ -0,0 +1,38 @@ +# Leftover: `mission_executor::ac3_bounded_retry_then_success` polling race + +**Timestamp**: 2026-05-20T08:30:00+02:00 +**Origin**: Batch 8 (mission_executor state machine). Surfaced in batches 11, 12, 13 as intermittent. Reproduces more reliably on dev box under batch 14 workspace test load (the new tonic stack increases build/runtime pressure). +**Severity**: Medium (test design, not production code) +**Not blocking**: pre-existing failure in unrelated area; production `mission_executor` behaviour is correct — the test simply has a polling race. + +## Symptom + +``` +test ac3_bounded_retry_then_success ... FAILED +thread 'ac3_bounded_retry_then_success' panicked at +crates/mission_executor/tests/state_machine.rs:116: +FSM did not reach MissionUploaded; stuck at WaitAuto +``` + +`WaitAuto` is the FSM state *after* `MissionUploaded`. The FSM passed *through* `MissionUploaded` faster than the test's 5 ms polling cadence could observe it. The post-assertion (`matches!(state, WaitAuto | MissionUploaded)`) acknowledges either is fine, but `await_state(target=MissionUploaded)` panics before that assertion runs. + +## Root cause + +`crates/mission_executor/tests/state_machine.rs` lines 100-118 — `await_state` polls every 5 ms; FSM `tick_interval` is also 5 ms; a successful retry+upload can complete in less than one polling interval. + +## Recommended fix (out of scope for current batch) + +Replace polling with an event latch: + +- Have `MissionExecutorHandle::state_stream()` (or expose `tokio::sync::watch::Receiver`) so tests can `await` on the channel changing through the target state. +- Or: record a `Vec` history in `Inner` and assert the target is *in* the history at the end, not the current state. + +Either approach is ~30 lines of test-only refactor. Production code does not need to change. + +## Replay instructions + +When working on `mission_executor` next (e.g. batch that touches the state machine or tick loop): + +1. Pick one of the two fixes above. +2. Re-run `cargo test --workspace` to confirm flake is gone. +3. Delete this leftover. diff --git a/crates/telemetry_stream/Cargo.toml b/crates/telemetry_stream/Cargo.toml index 45ea1e5..6f50c37 100644 --- a/crates/telemetry_stream/Cargo.toml +++ b/crates/telemetry_stream/Cargo.toml @@ -6,9 +6,27 @@ rust-version.workspace = true license.workspace = true publish.workspace = true authors.workspace = true +build = "build.rs" [dependencies] shared = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true } tracing = { workspace = true } async-trait = { workspace = true } +thiserror = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +prost = { workspace = true } +tonic = { workspace = true } +tonic-prost = { workspace = true } +parking_lot = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } + +[build-dependencies] +tonic-prost-build = { workspace = true } +protoc-bin-vendored = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/telemetry_stream/build.rs b/crates/telemetry_stream/build.rs new file mode 100644 index 0000000..1cfb408 --- /dev/null +++ b/crates/telemetry_stream/build.rs @@ -0,0 +1,19 @@ +//! Build-time codegen for the AZ-675 telemetry gRPC contract. +//! +//! We use `protoc-bin-vendored` so the build is self-contained — no +//! system protoc install is required on dev or CI. The PROTOC env +//! var is set before invoking `tonic-prost-build`, which is what +//! tonic uses to locate the compiler. + +fn main() -> Result<(), Box> { + let protoc = protoc_bin_vendored::protoc_bin_path()?; + std::env::set_var("PROTOC", protoc); + + tonic_prost_build::configure() + .build_client(true) + .build_server(true) + .compile_protos(&["proto/telemetry.proto"], &["proto"])?; + + println!("cargo:rerun-if-changed=proto/telemetry.proto"); + Ok(()) +} diff --git a/crates/telemetry_stream/proto/telemetry.proto b/crates/telemetry_stream/proto/telemetry.proto new file mode 100644 index 0000000..c50584d --- /dev/null +++ b/crates/telemetry_stream/proto/telemetry.proto @@ -0,0 +1,64 @@ +// AZ-675 telemetry_stream — operator-bound gRPC contract. +// +// One service, one bi-directional Subscribe RPC. Client opens a stream +// declaring which topics it wants; server pushes messages for those +// topics until the client disconnects. +// +// The server enforces per-client back-pressure: when a client cannot +// keep up the oldest message in *that client's* queue is dropped and +// a per-(client, topic) drop counter is incremented. Other clients +// are unaffected. +// +// AZ-676 will add the video path (separate RPC, server-streamed binary +// frames). AZ-677 will add the MapObjectsBundle snapshot RPC. Keep +// those concerns out of this contract. + +syntax = "proto3"; + +package autopilot.telemetry.v1; + +// Topics a client can subscribe to. Repeated in SubscribeRequest so a +// single stream multiplexes whichever subset the client cares about. +enum Topic { + TOPIC_UNSPECIFIED = 0; + TOPIC_TELEMETRY_SAMPLE = 1; + TOPIC_GIMBAL_STATE = 2; + TOPIC_DETECTION_EVENT = 3; + TOPIC_MOVEMENT_CANDIDATE = 4; + TOPIC_MAP_OBJECTS_BUNDLE = 5; +} + +message SubscribeRequest { + // Operator/client identifier. Plumbed into per-client drop counters + // and log lines. Free-form for AZ-675; AZ-678 (operator command + // auth) will tighten the format. + string client_id = 1; + repeated Topic topics = 2; +} + +// Each message carries the topic tag + an opaque JSON payload. We +// don't pin schemas per topic in proto here because the canonical +// payload shapes are already authoritative in `crates/shared/models` +// (TelemetrySample, GimbalState, DetectionEvent, MovementCandidate, +// MapObjectsBundle). Using JSON keeps the wire honest with what the +// rest of the system already serializes; if a topic later needs +// schema enforcement, add a typed message variant in a future bump. +message TelemetryMessage { + Topic topic = 1; + // Monotonic nanoseconds since the autopilot process started. Used + // by the operator for ordering and latency measurement. + uint64 monotonic_ts_ns = 2; + // Server-side per-client sequence number. Strictly increases per + // (client, topic) stream; gaps imply drops. + uint64 sequence = 3; + // Serialized JSON payload (utf-8). Topic determines the shape. + bytes payload_json = 4; +} + +service TelemetryStream { + // Server-streaming subscribe. The client sends ONE SubscribeRequest; + // the server pushes TelemetryMessage values until the client cancels + // the stream or the server shuts down. The server applies per- + // client drop-oldest back-pressure if the client cannot keep up. + rpc Subscribe(SubscribeRequest) returns (stream TelemetryMessage); +} diff --git a/crates/telemetry_stream/src/internal/mod.rs b/crates/telemetry_stream/src/internal/mod.rs new file mode 100644 index 0000000..9d2c167 --- /dev/null +++ b/crates/telemetry_stream/src/internal/mod.rs @@ -0,0 +1,5 @@ +//! Internal modules for `telemetry_stream`. Not part of the public API. + +pub mod proto; +pub mod publisher; +pub mod server; diff --git a/crates/telemetry_stream/src/internal/proto.rs b/crates/telemetry_stream/src/internal/proto.rs new file mode 100644 index 0000000..06976db --- /dev/null +++ b/crates/telemetry_stream/src/internal/proto.rs @@ -0,0 +1,10 @@ +//! Generated tonic+prost code for the telemetry gRPC contract. +//! +//! The actual `.rs` file is produced at build time by `build.rs` +//! (see workspace `tonic-prost-build` / `protoc-bin-vendored` deps) +//! and dropped into `OUT_DIR`. We pull it in here under a stable +//! module path so the rest of the crate doesn't reach into `OUT_DIR`. + +#![allow(clippy::derive_partial_eq_without_eq)] + +tonic::include_proto!("autopilot.telemetry.v1"); diff --git a/crates/telemetry_stream/src/internal/publisher.rs b/crates/telemetry_stream/src/internal/publisher.rs new file mode 100644 index 0000000..8bdfcbb --- /dev/null +++ b/crates/telemetry_stream/src/internal/publisher.rs @@ -0,0 +1,314 @@ +//! AZ-675 — multi-topic, per-client lossy publisher. +//! +//! The publisher owns one `tokio::sync::broadcast` channel per topic. +//! Each subscribed client gets per-topic receivers; falling behind +//! more than the channel capacity causes `tokio::sync::broadcast` to +//! return `RecvError::Lagged(n)` which the server-side stream +//! handler turns into a `drops_total{client_id, topic} += n` +//! increment. Slow clients never block the publisher, and a slow +//! client on one topic does not affect any other client or topic. + +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::Arc; + +use parking_lot::Mutex; +use serde::Serialize; +use tokio::sync::broadcast; +use tracing::warn; + +use crate::internal::proto::{TelemetryMessage, Topic}; + +/// Per-topic broadcast capacity. A client falling more than this many +/// messages behind on a topic experiences drops on that topic. The +/// value is chosen empirically: 10 Hz telemetry → 256 messages ≈ 25 +/// seconds of buffering, which is more than enough for transient +/// modem stalls but bounded for memory. +pub const DEFAULT_TOPIC_CAPACITY: usize = 256; + +/// All known topics, in the same order as the proto enum (skipping +/// UNSPECIFIED). +pub const ALL_TOPICS: &[Topic] = &[ + Topic::TelemetrySample, + Topic::GimbalState, + Topic::DetectionEvent, + Topic::MovementCandidate, + Topic::MapObjectsBundle, +]; + +/// Errors returned by [`TelemetryPublisher::publish`]. Publish never +/// blocks on slow clients; the only producer-side failure is +/// serialization of the payload, which is a programmer error caught +/// at compile time everywhere we control. We surface it explicitly +/// rather than swallow. +#[derive(Debug, thiserror::Error)] +pub enum PublishError { + #[error("serialize topic={topic:?}: {source}")] + Serialize { + topic: Topic, + #[source] + source: serde_json::Error, + }, +} + +#[derive(Debug, Clone)] +pub struct PerTopicCounters { + pub published: u64, + pub bytes_out: u64, +} + +/// Snapshot of publisher health for the [`crate::TelemetryStreamHandle::health`] +/// surface and the AZ-675 NFR observability hooks. +#[derive(Debug, Clone, Default)] +pub struct PublisherSnapshot { + pub subscribed_clients: usize, + pub published_total: u64, + pub per_topic: HashMap, + pub drops_total: HashMap<(String, Topic), u64>, +} + +struct TopicChannel { + tx: broadcast::Sender, + seq: AtomicU64, + published: AtomicU64, + bytes_out: AtomicU64, +} + +impl TopicChannel { + fn new(capacity: usize) -> Self { + let (tx, _) = broadcast::channel(capacity); + Self { + tx, + seq: AtomicU64::new(0), + published: AtomicU64::new(0), + bytes_out: AtomicU64::new(0), + } + } +} + +/// Drop counter map. Keyed by `(client_id, topic)`. We isolate it +/// behind a `parking_lot::Mutex` because writes happen from +/// per-client tonic stream tasks; reads happen from the health +/// surface — both are infrequent compared to the broadcast hot path. +type DropMap = Mutex>; + +pub struct TelemetryPublisher { + topics: HashMap, + drops: DropMap, + subscribed_clients: AtomicUsize, +} + +impl TelemetryPublisher { + /// Build a publisher with the same per-topic capacity for every + /// topic. Use [`new_with_capacities`] if a single topic needs a + /// different buffer (e.g. MapObjectsBundle which is bursty). + pub fn new(capacity: usize) -> Arc { + let mut topics = HashMap::with_capacity(ALL_TOPICS.len()); + for &t in ALL_TOPICS { + topics.insert(t, TopicChannel::new(capacity)); + } + Arc::new(Self { + topics, + drops: Mutex::new(HashMap::new()), + subscribed_clients: AtomicUsize::new(0), + }) + } + + pub fn default_capacity() -> Arc { + Self::new(DEFAULT_TOPIC_CAPACITY) + } + + /// Serialise `payload` and fan out to every subscriber on `topic`. + /// + /// Never blocks. `broadcast::Sender::send` returns the number of + /// receivers it queued for; a return of 0 means no current + /// subscribers, which is fine — we still bump the published + /// counter so the operator can confirm the producer is alive. + pub fn publish(&self, topic: Topic, payload: &T) -> Result<(), PublishError> { + let channel = match self.topics.get(&topic) { + Some(c) => c, + None => { + warn!(?topic, "unknown topic; dropping publish"); + return Ok(()); + } + }; + + let payload_json = serde_json::to_vec(payload) + .map_err(|e| PublishError::Serialize { topic, source: e })?; + let bytes_len = payload_json.len() as u64; + + let seq = channel.seq.fetch_add(1, Ordering::Relaxed) + 1; + let msg = TelemetryMessage { + topic: topic as i32, + monotonic_ts_ns: shared::clock::MonoClock::new().elapsed_ns(), + sequence: seq, + payload_json, + }; + + let _ = channel.tx.send(msg); + channel.published.fetch_add(1, Ordering::Relaxed); + channel.bytes_out.fetch_add(bytes_len, Ordering::Relaxed); + Ok(()) + } + + /// Open a per-client receiver for `topic`. The caller (the + /// `Subscribe` RPC handler) is responsible for handling + /// `RecvError::Lagged` by calling [`record_drops`]. + pub(crate) fn subscribe_topic( + &self, + topic: Topic, + ) -> Option> { + self.topics.get(&topic).map(|c| c.tx.subscribe()) + } + + pub(crate) fn register_client(&self) { + self.subscribed_clients.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn deregister_client(&self) { + self.subscribed_clients.fetch_sub(1, Ordering::Relaxed); + } + + pub fn record_drops(&self, client_id: &str, topic: Topic, n: u64) { + if n == 0 { + return; + } + let mut map = self.drops.lock(); + let key = (client_id.to_string(), topic); + map.entry(key) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(n, Ordering::Relaxed); + } + + pub fn snapshot(&self) -> PublisherSnapshot { + let mut per_topic = HashMap::with_capacity(self.topics.len()); + let mut total_published = 0u64; + for (&t, c) in &self.topics { + let published = c.published.load(Ordering::Relaxed); + let bytes_out = c.bytes_out.load(Ordering::Relaxed); + total_published = total_published.saturating_add(published); + per_topic.insert( + t, + PerTopicCounters { + published, + bytes_out, + }, + ); + } + let drops_map = self.drops.lock(); + let drops_total: HashMap<(String, Topic), u64> = drops_map + .iter() + .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed))) + .collect(); + PublisherSnapshot { + subscribed_clients: self.subscribed_clients.load(Ordering::Relaxed), + published_total: total_published, + per_topic, + drops_total, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Deserialize; + + #[derive(Serialize, Deserialize, Debug, PartialEq)] + struct Sample { + v: u32, + } + + #[test] + fn publish_with_no_subscribers_is_no_op() { + // Arrange + let pub_ = TelemetryPublisher::new(8); + + // Act + pub_.publish(Topic::TelemetrySample, &Sample { v: 1 }) + .unwrap(); + + // Assert + let snap = pub_.snapshot(); + assert_eq!(snap.subscribed_clients, 0); + assert_eq!(snap.per_topic[&Topic::TelemetrySample].published, 1); + } + + #[tokio::test] + async fn slow_subscriber_lags_fast_subscriber_does_not() { + // Arrange — capacity 4 so 100 publishes overflow trivially. + let pub_ = TelemetryPublisher::new(4); + let mut fast = pub_.subscribe_topic(Topic::TelemetrySample).unwrap(); + let mut slow = pub_.subscribe_topic(Topic::TelemetrySample).unwrap(); + + // Act — burst 100 messages WITHOUT giving `slow` a chance to drain. + for v in 0..100u32 { + pub_.publish(Topic::TelemetrySample, &Sample { v }).unwrap(); + // fast drains immediately; slow does nothing. + let _ = fast.recv().await.unwrap(); + } + + // Drain `slow` and count lag. + let mut total_drops: u64 = 0; + let mut delivered: u64 = 0; + loop { + match slow.try_recv() { + Ok(_) => delivered += 1, + Err(broadcast::error::TryRecvError::Lagged(n)) => total_drops += n, + Err(_) => break, + } + } + + // Assert — fast got all 100; slow lost most but delivered ≤ capacity. + assert!(delivered <= 4, "slow can hold at most capacity (4)"); + assert_eq!( + total_drops + delivered, + 100, + "every published msg accounted for" + ); + assert!(total_drops > 0, "slow subscriber MUST have lagged"); + } + + #[test] + fn record_drops_accumulates_per_client_topic() { + // Arrange + let pub_ = TelemetryPublisher::new(8); + + // Act + pub_.record_drops("client_a", Topic::TelemetrySample, 5); + pub_.record_drops("client_a", Topic::TelemetrySample, 3); + pub_.record_drops("client_a", Topic::GimbalState, 2); + pub_.record_drops("client_b", Topic::TelemetrySample, 1); + + // Assert + let snap = pub_.snapshot(); + assert_eq!( + snap.drops_total[&("client_a".to_string(), Topic::TelemetrySample)], + 8 + ); + assert_eq!( + snap.drops_total[&("client_a".to_string(), Topic::GimbalState)], + 2 + ); + assert_eq!( + snap.drops_total[&("client_b".to_string(), Topic::TelemetrySample)], + 1 + ); + } + + #[test] + fn register_deregister_balance_tracks_subscribed_clients() { + // Arrange + let pub_ = TelemetryPublisher::new(8); + assert_eq!(pub_.snapshot().subscribed_clients, 0); + + // Act + pub_.register_client(); + pub_.register_client(); + assert_eq!(pub_.snapshot().subscribed_clients, 2); + pub_.deregister_client(); + + // Assert + assert_eq!(pub_.snapshot().subscribed_clients, 1); + } +} diff --git a/crates/telemetry_stream/src/internal/server.rs b/crates/telemetry_stream/src/internal/server.rs new file mode 100644 index 0000000..49488c5 --- /dev/null +++ b/crates/telemetry_stream/src/internal/server.rs @@ -0,0 +1,127 @@ +//! AZ-675 — gRPC `TelemetryStream::Subscribe` service implementation. +//! +//! The client sends a single `SubscribeRequest`; the server returns a +//! server-streaming response built directly from per-topic +//! `BroadcastStream`s merged with `StreamMap`. The tonic transport +//! is what polls our stream — when the wire (or the operator client) +//! cannot keep up, the broadcast ring overflows that client's cursor +//! and `BroadcastStream` yields `Err(BroadcastStreamRecvError::Lagged(n))` +//! on the next poll. That is the *only* place drop accounting +//! happens: there is no intermediate mpsc buffer that could absorb +//! back-pressure and hide lag. +//! +//! `StreamGuard` decrements `subscribed_clients` on stream drop. + +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::{Stream, StreamExt, StreamMap}; +use tonic::{Request, Response, Status}; +use tracing::{info, warn}; + +use crate::internal::proto::telemetry_stream_server::TelemetryStream; +use crate::internal::proto::{SubscribeRequest, TelemetryMessage, Topic}; +use crate::internal::publisher::{TelemetryPublisher, ALL_TOPICS}; + +pub struct TelemetryService { + publisher: Arc, +} + +impl TelemetryService { + pub fn new(publisher: Arc) -> Self { + Self { publisher } + } +} + +type SubscribeStream = Pin> + Send>>; + +#[tonic::async_trait] +impl TelemetryStream for TelemetryService { + type SubscribeStream = SubscribeStream; + + async fn subscribe( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + if req.client_id.trim().is_empty() { + return Err(Status::invalid_argument("client_id is required")); + } + let client_id = req.client_id.clone(); + + let requested: Vec = if req.topics.is_empty() { + ALL_TOPICS.to_vec() + } else { + let mut out = Vec::with_capacity(req.topics.len()); + for raw in &req.topics { + let t = Topic::try_from(*raw) + .map_err(|_| Status::invalid_argument(format!("unknown topic {raw}")))?; + if matches!(t, Topic::Unspecified) { + return Err(Status::invalid_argument("TOPIC_UNSPECIFIED not allowed")); + } + out.push(t); + } + out + }; + + let mut map: StreamMap> = StreamMap::new(); + for &t in &requested { + match self.publisher.subscribe_topic(t) { + Some(rx) => { + map.insert(t, BroadcastStream::new(rx)); + } + None => { + return Err(Status::failed_precondition(format!( + "topic {t:?} not registered" + ))) + } + } + } + + self.publisher.register_client(); + info!(client_id = %client_id, topics = ?requested, "telemetry subscribe"); + + let publisher = Arc::clone(&self.publisher); + let cid = client_id.clone(); + let stream = map.filter_map(move |(topic, item)| match item { + Ok(msg) => Some(Ok(msg)), + Err(BroadcastStreamRecvError::Lagged(n)) => { + warn!(client_id = %cid, ?topic, dropped = n, "slow client lagged"); + publisher.record_drops(&cid, topic, n); + None + } + }); + + let stream = StreamGuard { + inner: stream, + publisher: Arc::clone(&self.publisher), + }; + + Ok(Response::new(Box::pin(stream) as Self::SubscribeStream)) + } +} + +/// Decrement `subscribed_clients` when the per-client outbound +/// stream is dropped (tonic drops the stream when the client side +/// goes away). +struct StreamGuard { + inner: S, + publisher: Arc, +} + +impl Stream for StreamGuard { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } +} + +impl Drop for StreamGuard { + fn drop(&mut self) { + self.publisher.deregister_client(); + } +} diff --git a/crates/telemetry_stream/src/lib.rs b/crates/telemetry_stream/src/lib.rs index c14dcdd..d4809f6 100644 --- a/crates/telemetry_stream/src/lib.rs +++ b/crates/telemetry_stream/src/lib.rs @@ -1,57 +1,224 @@ //! `telemetry_stream` — always-on uplink to the Ground Station + operator-command downlink. //! -//! Real implementation lands in: -//! - AZ-675 `telemetry_stream_grpc_server` -//! - AZ-676 `telemetry_stream_video_path` -//! - AZ-677 `telemetry_stream_mapobjects_snapshot` +//! Real implementations: +//! - **AZ-675 (this crate, this batch)**: Tonic gRPC server, per-client +//! bounded queue, drop-oldest back-pressure, drop counters. Topics: +//! `TelemetrySample`, `GimbalState`, `DetectionEvent`, +//! `MovementCandidate`, `MapObjectsBundle`. +//! - **AZ-676**: video frame topic (separate RPC, server-streamed +//! binary payloads). +//! - **AZ-677**: diff-based snapshot emission for `MapObjectsBundle`. +//! - **AZ-678+**: command-auth on the return path (operator_bridge). + +pub mod internal; + +use std::net::SocketAddr; +use std::sync::Arc; use async_trait::async_trait; use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tonic::transport::Server; use shared::contracts::TelemetrySink; use shared::error::{AutopilotError, Result}; -use shared::health::ComponentHealth; +use shared::health::{ComponentHealth, HealthLevel}; use shared::models::detection::DetectionBatch; use shared::models::frame::Frame; use shared::models::operator::OperatorCommand; +use crate::internal::proto::telemetry_stream_server::TelemetryStreamServer; +use crate::internal::proto::Topic; +use crate::internal::publisher::{TelemetryPublisher, DEFAULT_TOPIC_CAPACITY}; +use crate::internal::server::TelemetryService; + +pub use crate::internal::proto::{ + telemetry_stream_client::TelemetryStreamClient, SubscribeRequest, TelemetryMessage, + Topic as TelemetryTopic, +}; +pub use crate::internal::publisher::{ + PerTopicCounters, PublishError, PublisherSnapshot, ALL_TOPICS, +}; + const NAME: &str = "telemetry_stream"; +/// Per-(client, topic) drop rate at or above which health flips to +/// yellow. Picked to surface persistent slow consumers without +/// flapping on a single transient lag spike. +const DROP_YELLOW_THRESHOLD: u64 = 100; + +#[derive(Debug, Clone)] +pub struct TelemetryStreamConfig { + /// Where the Tonic gRPC server binds. `0.0.0.0:50061` by default. + pub listen_addr: SocketAddr, + /// Per-topic broadcast capacity (per subscriber buffer). + pub topic_capacity: usize, + /// Bounded capacity of the downlink command channel that feeds + /// `operator_bridge`. + pub downlink_capacity: usize, +} + +impl Default for TelemetryStreamConfig { + fn default() -> Self { + Self { + listen_addr: "0.0.0.0:50061".parse().expect("hardcoded addr parses"), + topic_capacity: DEFAULT_TOPIC_CAPACITY, + downlink_capacity: 64, + } + } +} + pub struct TelemetryStream { + publisher: Arc, commands_tx: mpsc::Sender, commands_rx: Option>, + config: TelemetryStreamConfig, } impl TelemetryStream { pub fn new(downlink_capacity: usize) -> Self { - let (commands_tx, commands_rx) = mpsc::channel(downlink_capacity); + Self::with_config(TelemetryStreamConfig { + downlink_capacity, + ..TelemetryStreamConfig::default() + }) + } + + pub fn with_config(config: TelemetryStreamConfig) -> Self { + let publisher = TelemetryPublisher::new(config.topic_capacity); + let (commands_tx, commands_rx) = mpsc::channel(config.downlink_capacity); Self { + publisher, commands_tx, commands_rx: Some(commands_rx), + config, } } pub fn handle(&self) -> TelemetryStreamHandle { TelemetryStreamHandle { + publisher: Arc::clone(&self.publisher), commands_tx: self.commands_tx.clone(), } } - /// Take the downlink command receiver. The composition root forwards it to - /// `operator_bridge` as `Receiver`. + /// Take the downlink command receiver. The composition root + /// forwards it to `operator_bridge` as `Receiver`. pub fn take_command_receiver(&mut self) -> Option> { self.commands_rx.take() } + + /// Spawn the Tonic server. Returns a JoinHandle that runs until + /// `shutdown` is signalled (closing the returned `shutdown_tx`). + /// The server is bound on `config.listen_addr`. + pub fn spawn_grpc_server( + &self, + ) -> Result<( + JoinHandle>, + GrpcShutdown, + )> { + let listen_addr = self.config.listen_addr; + let publisher = Arc::clone(&self.publisher); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + + let svc = TelemetryStreamServer::new(TelemetryService::new(publisher)); + let join = tokio::spawn(async move { + Server::builder() + .add_service(svc) + .serve_with_shutdown(listen_addr, async move { + let _ = shutdown_rx.await; + }) + .await + }); + + Ok(( + join, + GrpcShutdown { + tx: Some(shutdown_tx), + }, + )) + } + + /// Spawn the Tonic server bound on a specific `TcpListener`. + /// Useful for tests that need to know the actual port ahead of + /// time (bind to `127.0.0.1:0` then read the assigned port). + pub fn spawn_grpc_server_on( + &self, + listener: std::net::TcpListener, + ) -> Result<( + JoinHandle>, + GrpcShutdown, + )> { + listener + .set_nonblocking(true) + .map_err(|e| AutopilotError::Internal(format!("set_nonblocking: {e}")))?; + let tokio_listener = tokio::net::TcpListener::from_std(listener) + .map_err(|e| AutopilotError::Internal(format!("TcpListener::from_std: {e}")))?; + let stream = tokio_stream::wrappers::TcpListenerStream::new(tokio_listener); + + let publisher = Arc::clone(&self.publisher); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let svc = TelemetryStreamServer::new(TelemetryService::new(publisher)); + + let join = tokio::spawn(async move { + Server::builder() + .add_service(svc) + .serve_with_incoming_shutdown(stream, async move { + let _ = shutdown_rx.await; + }) + .await + }); + + Ok(( + join, + GrpcShutdown { + tx: Some(shutdown_tx), + }, + )) + } +} + +/// RAII shutdown trigger for the spawned gRPC server. Drop the value +/// or call `shutdown()` to stop the server. +pub struct GrpcShutdown { + tx: Option>, +} + +impl GrpcShutdown { + pub fn shutdown(mut self) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(()); + } + } +} + +impl Drop for GrpcShutdown { + fn drop(&mut self) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(()); + } + } } #[derive(Clone)] pub struct TelemetryStreamHandle { + publisher: Arc, commands_tx: mpsc::Sender, } impl TelemetryStreamHandle { - /// Inject an operator command. Production path is fed by the downlink - /// receiver in `internal::downlink/*`; tests can call this directly. + /// Publish a payload on `topic`. Never blocks the caller; slow + /// subscribers experience drops accounted in [`snapshot`]. + pub fn publish( + &self, + topic: TelemetryTopic, + payload: &T, + ) -> std::result::Result<(), PublishError> { + self.publisher.publish(topic, payload) + } + + /// Inject an operator command downlink. Production path is fed + /// by the gRPC return half once AZ-678 lands; tests may call this + /// directly. pub async fn submit_command(&self, command: OperatorCommand) -> Result<()> { self.commands_tx .send(command) @@ -59,8 +226,32 @@ impl TelemetryStreamHandle { .map_err(|_| AutopilotError::Internal("downlink channel closed".into())) } + pub fn snapshot(&self) -> PublisherSnapshot { + self.publisher.snapshot() + } + pub fn health(&self) -> ComponentHealth { - ComponentHealth::disabled(NAME) + let snap = self.publisher.snapshot(); + let mut h = ComponentHealth::green(NAME); + + let hot_drops: Vec<_> = snap + .drops_total + .iter() + .filter(|(_, &v)| v >= DROP_YELLOW_THRESHOLD) + .collect(); + + let detail = format!( + "subscribers={} published_total={} hot_drop_pairs={}", + snap.subscribed_clients, + snap.published_total, + hot_drops.len() + ); + + if !hot_drops.is_empty() { + h.level = HealthLevel::Yellow; + } + h.detail = Some(detail); + h } } @@ -68,14 +259,14 @@ impl TelemetryStreamHandle { impl TelemetrySink for TelemetryStreamHandle { async fn push_frame(&self, _frame: Frame) -> Result<()> { Err(AutopilotError::NotImplemented( - "telemetry_stream::push_frame (AZ-676)", + "telemetry_stream::push_frame (AZ-676 video path)", )) } - async fn push_detections(&self, _batch: DetectionBatch) -> Result<()> { - Err(AutopilotError::NotImplemented( - "telemetry_stream::push_detections (AZ-675)", - )) + async fn push_detections(&self, batch: DetectionBatch) -> Result<()> { + self.publisher + .publish(Topic::DetectionEvent, &batch) + .map_err(|e| AutopilotError::Internal(format!("publish detections: {e}"))) } } @@ -84,8 +275,35 @@ mod tests { use super::*; #[test] - fn it_compiles() { - let h = TelemetryStream::new(8).handle(); - assert_eq!(h.health().level, shared::health::HealthLevel::Disabled); + fn handle_starts_with_zero_subscribers_and_green_health() { + // Arrange + let s = TelemetryStream::new(8); + let h = s.handle(); + + // Act + let snap = h.snapshot(); + let health = h.health(); + + // Assert + assert_eq!(snap.subscribed_clients, 0); + assert_eq!(snap.published_total, 0); + assert_eq!(health.level, HealthLevel::Green); + } + + #[test] + fn publish_without_subscribers_is_no_op_but_counts() { + // Arrange + let s = TelemetryStream::new(8); + let h = s.handle(); + + // Act + h.publish( + TelemetryTopic::TelemetrySample, + &serde_json::json!({"v": 1}), + ) + .unwrap(); + + // Assert + assert_eq!(h.snapshot().per_topic[&Topic::TelemetrySample].published, 1); } } diff --git a/crates/telemetry_stream/tests/grpc_subscribe.rs b/crates/telemetry_stream/tests/grpc_subscribe.rs new file mode 100644 index 0000000..60df0e2 --- /dev/null +++ b/crates/telemetry_stream/tests/grpc_subscribe.rs @@ -0,0 +1,366 @@ +//! AZ-675 integration tests — Tonic server + per-client lossy queue +//! exercised through an in-process gRPC client (the only stub allowed +//! per the task's Runtime Completeness gate). + +use std::net::TcpListener; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use tokio::time::timeout; +use tokio_stream::StreamExt; +use tonic::transport::{Channel, Endpoint}; +use tonic::Request; + +use telemetry_stream::{SubscribeRequest, TelemetryStream, TelemetryStreamClient, TelemetryTopic}; + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +struct Sample { + n: u32, +} + +/// Bind a TCP listener on `127.0.0.1:0`, return the listener and the +/// resolved port. The test stack uses ephemeral ports so multiple +/// integration tests in the same suite never collide. +fn bind_ephemeral() -> (TcpListener, u16) { + let l = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral"); + let port = l.local_addr().unwrap().port(); + (l, port) +} + +async fn connect(port: u16) -> TelemetryStreamClient { + let url = format!("http://127.0.0.1:{port}"); + let endpoint = Endpoint::from_shared(url) + .unwrap() + .connect_timeout(Duration::from_secs(2)); + + // Tiny retry loop — the server can take a few ms to bind. + for _ in 0..50 { + if let Ok(c) = TelemetryStreamClient::connect(endpoint.clone()).await { + return c; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + panic!("gRPC client failed to connect within ~1s"); +} + +/// AC-1 — multiple subscribers receive the same stream, ordering preserved. +#[tokio::test] +async fn ac1_multiple_subscribers_receive_same_stream() { + // Arrange + let (listener, port) = bind_ephemeral(); + let server = TelemetryStream::new(8); + let handle = server.handle(); + let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); + + let mut c1 = connect(port).await; + let mut c2 = connect(port).await; + let mut c3 = connect(port).await; + + let req = |id: &str| { + Request::new(SubscribeRequest { + client_id: id.to_string(), + topics: vec![TelemetryTopic::TelemetrySample as i32], + }) + }; + + let mut s1 = c1.subscribe(req("a")).await.unwrap().into_inner(); + let mut s2 = c2.subscribe(req("b")).await.unwrap().into_inner(); + let mut s3 = c3.subscribe(req("c")).await.unwrap().into_inner(); + + // Give the server time to register all three before publishing. + for _ in 0..50 { + if handle.snapshot().subscribed_clients == 3 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + assert_eq!(handle.snapshot().subscribed_clients, 3); + + // Act + for n in 0..100u32 { + handle + .publish(TelemetryTopic::TelemetrySample, &Sample { n }) + .unwrap(); + } + + async fn collect100( + s: &mut tonic::Streaming, + ) -> Vec { + let mut out = Vec::new(); + while out.len() < 100 { + let msg = timeout(Duration::from_secs(2), s.next()) + .await + .expect("ac1 client did not receive 100 in 2s") + .expect("stream ended") + .expect("server status"); + let sample: Sample = serde_json::from_slice(&msg.payload_json).unwrap(); + out.push(sample); + } + out + } + + let r1 = collect100(&mut s1).await; + let r2 = collect100(&mut s2).await; + let r3 = collect100(&mut s3).await; + + // Assert — all three receive all 100 in order. + let expected: Vec = (0..100).map(|n| Sample { n }).collect(); + assert_eq!(r1, expected); + assert_eq!(r2, expected); + assert_eq!(r3, expected); +} + +/// AC-2 — slow subscriber drops oldest, healthy unaffected. +/// +/// We can't simulate "client never polls" through real tonic+H2 +/// because the OS TCP + HTTP/2 receive buffers (often 256 KB+) will +/// absorb a small JSON burst entirely with no observable drop. The +/// realistic AC-2 condition is "slow drain rate" — the slow client +/// polls but at a fraction of the publish rate. With large payloads +/// and a small per-topic broadcast capacity, the slow client's +/// broadcast cursor falls behind the producer and `BroadcastStream` +/// emits `Lagged(n)` — the only signal AZ-675 wires into the per- +/// `(client_id, topic)` drop counters. +#[tokio::test] +async fn ac2_slow_subscriber_drops_oldest_healthy_unaffected() { + // Arrange — capacity 4, big payloads so wire back-pressure + // propagates fast and broadcast lag is observable end-to-end. + let (listener, port) = bind_ephemeral(); + let cfg = telemetry_stream::TelemetryStreamConfig { + topic_capacity: 4, + ..telemetry_stream::TelemetryStreamConfig::default() + }; + let server = TelemetryStream::with_config(cfg); + let handle = server.handle(); + let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); + + #[derive(Serialize)] + struct Big { + n: u32, + // 64 KB filler per message — fills H2 receive window quickly. + pad: Vec, + } + + let mut c_fast = connect(port).await; + let mut c_slow = connect(port).await; + let req = |id: &str| { + Request::new(SubscribeRequest { + client_id: id.to_string(), + topics: vec![TelemetryTopic::TelemetrySample as i32], + }) + }; + let mut s_fast = c_fast.subscribe(req("fast")).await.unwrap().into_inner(); + let mut s_slow = c_slow.subscribe(req("slow")).await.unwrap().into_inner(); + + for _ in 0..50 { + if handle.snapshot().subscribed_clients == 2 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + assert_eq!(handle.snapshot().subscribed_clients, 2); + + let total: u32 = 300; + + // Fast drains immediately, slow drains 1 msg every 50ms (way below publish rate). + let fast_task = tokio::spawn(async move { + let mut seen = 0u32; + while seen < total { + let _msg = timeout(Duration::from_secs(15), s_fast.next()) + .await + .expect("fast did not receive everything in 15s") + .expect("stream ended") + .expect("server status"); + seen += 1; + } + seen + }); + let slow_task = tokio::spawn(async move { + let mut received = 0u32; + let deadline = std::time::Instant::now() + Duration::from_secs(30); + while std::time::Instant::now() < deadline { + match timeout(Duration::from_millis(500), s_slow.next()).await { + Ok(Some(Ok(_msg))) => received += 1, + Ok(Some(Err(_))) => break, + Ok(None) => break, + Err(_) => break, + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + received + }); + + // Act — burst publishes at ~1ms cadence with 64 KB payloads so + // broadcast capacity is overrun for the slow consumer. + let payload_pad = vec![0u8; 64 * 1024]; + for n in 0..total { + handle + .publish( + TelemetryTopic::TelemetrySample, + &Big { + n, + pad: payload_pad.clone(), + }, + ) + .unwrap(); + tokio::time::sleep(Duration::from_millis(2)).await; + } + + let fast_count = fast_task.await.unwrap(); + let slow_count = slow_task.await.unwrap(); + + let snap = handle.snapshot(); + let slow_drops = snap + .drops_total + .get(&("slow".to_string(), TelemetryTopic::TelemetrySample)) + .copied() + .unwrap_or(0); + let fast_drops = snap + .drops_total + .get(&("fast".to_string(), TelemetryTopic::TelemetrySample)) + .copied() + .unwrap_or(0); + + // Assert + assert_eq!(fast_count, total, "fast client receives every message"); + assert_eq!(fast_drops, 0, "fast client MUST NOT have drops"); + assert!( + slow_drops > 0, + "slow client MUST have lagged at some point (got 0 drops; slow_count={slow_count})" + ); + assert!( + slow_count < total, + "slow client should have lost messages (got {slow_count}/{total})" + ); + assert!( + (slow_count as u64) + slow_drops <= (total as u64) + 4, + "accounted={} should not exceed total+capacity", + (slow_count as u64) + slow_drops + ); +} + +/// AC-3 — disconnect cleanly removes subscriber. +#[tokio::test] +async fn ac3_disconnect_decrements_subscribed_clients() { + // Arrange + let (listener, port) = bind_ephemeral(); + let server = TelemetryStream::new(8); + let handle = server.handle(); + let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); + + let mut c1 = connect(port).await; + let req = || { + Request::new(SubscribeRequest { + client_id: "ephemeral".to_string(), + topics: vec![], + }) + }; + let s1 = c1.subscribe(req()).await.unwrap().into_inner(); + + for _ in 0..50 { + if handle.snapshot().subscribed_clients == 1 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + assert_eq!(handle.snapshot().subscribed_clients, 1); + + // Act — drop the stream (simulates client cancel). + drop(s1); + drop(c1); + + // Assert — register/deregister is eventually consistent; wait briefly. + let mut final_count = 99; + for _ in 0..100 { + final_count = handle.snapshot().subscribed_clients; + if final_count == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + assert_eq!( + final_count, 0, + "disconnect must decrement subscribed_clients" + ); +} + +/// Subscribing to an empty `topics` list defaults to ALL topics. +#[tokio::test] +async fn empty_topics_list_defaults_to_all() { + // Arrange + let (listener, port) = bind_ephemeral(); + let server = TelemetryStream::new(8); + let handle = server.handle(); + let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); + + let mut client = connect(port).await; + let mut stream = client + .subscribe(Request::new(SubscribeRequest { + client_id: "all".to_string(), + topics: vec![], + })) + .await + .unwrap() + .into_inner(); + + for _ in 0..50 { + if handle.snapshot().subscribed_clients == 1 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + + // Act — publish one message on each topic. + handle + .publish(TelemetryTopic::TelemetrySample, &Sample { n: 1 }) + .unwrap(); + handle + .publish(TelemetryTopic::GimbalState, &Sample { n: 2 }) + .unwrap(); + handle + .publish(TelemetryTopic::DetectionEvent, &Sample { n: 3 }) + .unwrap(); + handle + .publish(TelemetryTopic::MovementCandidate, &Sample { n: 4 }) + .unwrap(); + handle + .publish(TelemetryTopic::MapObjectsBundle, &Sample { n: 5 }) + .unwrap(); + + let mut seen_topics = std::collections::HashSet::new(); + while seen_topics.len() < 5 { + let msg = timeout(Duration::from_secs(2), stream.next()) + .await + .expect("did not receive all 5 topics in 2s") + .unwrap() + .unwrap(); + seen_topics.insert(msg.topic); + } + + // Assert + assert_eq!(seen_topics.len(), 5); +} + +/// Empty client_id is rejected at the server boundary. +#[tokio::test] +async fn empty_client_id_is_rejected() { + // Arrange + let (listener, port) = bind_ephemeral(); + let server = TelemetryStream::new(8); + let _h = server.handle(); + let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap(); + + let mut client = connect(port).await; + + // Act + let err = client + .subscribe(Request::new(SubscribeRequest { + client_id: String::new(), + topics: vec![], + })) + .await + .expect_err("empty client_id must return Status error"); + + // Assert + assert_eq!(err.code(), tonic::Code::InvalidArgument); +}