mirror of
https://github.com/azaion/autopilot.git
synced 2026-06-21 20:11:12 +00:00
[AZ-675] telemetry_stream Tonic gRPC server + per-client lossy queue
ci/woodpecker/push/build-arm Pipeline failed
ci/woodpecker/push/build-arm Pipeline failed
Pins operator-link transport to gRPC server-streaming (closes architecture Q2 in favour of gRPC). Adds first-time tonic / prost / tonic-build infrastructure to the workspace; uses protoc-bin-vendored so neither dev machines nor CI need system protoc installed. Design — back-pressure lives in the per-topic tokio::sync::broadcast ring, drained directly by the tonic-streamed response via BroadcastStream + StreamMap. No intermediate mpsc buffer that could absorb back-pressure invisibly. Slow client overrun -> Lagged(n) event -> per-(client_id, topic) drop counter incremented; healthy clients on the same topic are unaffected. Service surface — Subscribe(SubscribeRequest) -> stream TelemetryMessage; five topics (TelemetrySample, GimbalState, DetectionEvent, MovementCandidate, MapObjectsBundle); empty topics list defaults to subscribe-all; empty client_id rejected; stream drop decrements subscribed_clients via StreamGuard. TelemetrySink push_detections is now real; push_frame still NotImplemented(AZ-676 video path). Tests — 6 unit + 5 integration (AC-1..AC-3 via in-process gRPC client, plus subscribe-all default + empty-client_id rejection). Clippy on telemetry_stream clean. Pre-existing mission_executor ac3 test polling race surfaces more reliably under the new tonic build pressure; documented as _docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md and unchanged by this batch. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Generated
+356
-3
@@ -147,7 +147,7 @@ name = "autopilot"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"axum",
|
"axum 0.7.9",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"detection_client",
|
"detection_client",
|
||||||
@@ -179,7 +179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
|
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"axum-core",
|
"axum-core 0.4.5",
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"http",
|
"http",
|
||||||
@@ -188,7 +188,7 @@ dependencies = [
|
|||||||
"hyper",
|
"hyper",
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"itoa",
|
"itoa",
|
||||||
"matchit",
|
"matchit 0.7.3",
|
||||||
"memchr",
|
"memchr",
|
||||||
"mime",
|
"mime",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
@@ -204,6 +204,31 @@ dependencies = [
|
|||||||
"tower-service",
|
"tower-service",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "axum"
|
||||||
|
version = "0.8.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90"
|
||||||
|
dependencies = [
|
||||||
|
"axum-core 0.5.6",
|
||||||
|
"bytes",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"http-body-util",
|
||||||
|
"itoa",
|
||||||
|
"matchit 0.8.4",
|
||||||
|
"memchr",
|
||||||
|
"mime",
|
||||||
|
"percent-encoding",
|
||||||
|
"pin-project-lite",
|
||||||
|
"serde_core",
|
||||||
|
"sync_wrapper",
|
||||||
|
"tower",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "axum-core"
|
name = "axum-core"
|
||||||
version = "0.4.5"
|
version = "0.4.5"
|
||||||
@@ -224,6 +249,24 @@ dependencies = [
|
|||||||
"tower-service",
|
"tower-service",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "axum-core"
|
||||||
|
version = "0.5.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"futures-core",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"http-body-util",
|
||||||
|
"mime",
|
||||||
|
"pin-project-lite",
|
||||||
|
"sync_wrapper",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "base64"
|
name = "base64"
|
||||||
version = "0.22.1"
|
version = "0.22.1"
|
||||||
@@ -553,6 +596,12 @@ version = "0.1.9"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
|
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fixedbitset"
|
||||||
|
version = "0.5.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flate2"
|
name = "flate2"
|
||||||
version = "1.1.9"
|
version = "1.1.9"
|
||||||
@@ -911,6 +960,19 @@ dependencies = [
|
|||||||
"webpki-roots",
|
"webpki-roots",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hyper-timeout"
|
||||||
|
version = "0.5.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
|
||||||
|
dependencies = [
|
||||||
|
"hyper",
|
||||||
|
"hyper-util",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tokio",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hyper-util"
|
name = "hyper-util"
|
||||||
version = "0.1.20"
|
version = "0.1.20"
|
||||||
@@ -1110,6 +1172,15 @@ dependencies = [
|
|||||||
"nom",
|
"nom",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itertools"
|
||||||
|
version = "0.14.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
|
||||||
|
dependencies = [
|
||||||
|
"either",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.18"
|
version = "1.0.18"
|
||||||
@@ -1254,6 +1325,12 @@ version = "0.7.3"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
|
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "matchit"
|
||||||
|
version = "0.8.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mavlink_layer"
|
name = "mavlink_layer"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -1361,6 +1438,12 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "multimap"
|
||||||
|
version = "0.10.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nix"
|
name = "nix"
|
||||||
version = "0.26.4"
|
version = "0.26.4"
|
||||||
@@ -1550,6 +1633,37 @@ version = "2.3.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
|
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "petgraph"
|
||||||
|
version = "0.8.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8701b58ea97060d5e5b155d383a69952a60943f0e6dfe30b04c287beb0b27455"
|
||||||
|
dependencies = [
|
||||||
|
"fixedbitset",
|
||||||
|
"hashbrown 0.15.5",
|
||||||
|
"indexmap",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project"
|
||||||
|
version = "1.1.13"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2466b2336ed02bcdca6b294417127b90ec92038d1d5c4fbeac971a922e0e0924"
|
||||||
|
dependencies = [
|
||||||
|
"pin-project-internal",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project-internal"
|
||||||
|
version = "1.1.13"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c96395f0a926bc13b1c17622aaddda1ecb55d49c8f1bf9777e4d877800a43f8b"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-lite"
|
name = "pin-project-lite"
|
||||||
version = "0.2.17"
|
version = "0.2.17"
|
||||||
@@ -1599,6 +1713,143 @@ dependencies = [
|
|||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost"
|
||||||
|
version = "0.14.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"prost-derive",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-build"
|
||||||
|
version = "0.14.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7"
|
||||||
|
dependencies = [
|
||||||
|
"heck",
|
||||||
|
"itertools",
|
||||||
|
"log",
|
||||||
|
"multimap",
|
||||||
|
"petgraph",
|
||||||
|
"prettyplease",
|
||||||
|
"prost",
|
||||||
|
"prost-types",
|
||||||
|
"pulldown-cmark",
|
||||||
|
"pulldown-cmark-to-cmark",
|
||||||
|
"regex",
|
||||||
|
"syn",
|
||||||
|
"tempfile",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-derive"
|
||||||
|
version = "0.14.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"itertools",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-types"
|
||||||
|
version = "0.14.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7"
|
||||||
|
dependencies = [
|
||||||
|
"prost",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa"
|
||||||
|
dependencies = [
|
||||||
|
"protoc-bin-vendored-linux-aarch_64",
|
||||||
|
"protoc-bin-vendored-linux-ppcle_64",
|
||||||
|
"protoc-bin-vendored-linux-s390_64",
|
||||||
|
"protoc-bin-vendored-linux-x86_32",
|
||||||
|
"protoc-bin-vendored-linux-x86_64",
|
||||||
|
"protoc-bin-vendored-macos-aarch_64",
|
||||||
|
"protoc-bin-vendored-macos-x86_64",
|
||||||
|
"protoc-bin-vendored-win32",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored-linux-aarch_64"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored-linux-ppcle_64"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored-linux-s390_64"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored-linux-x86_32"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored-linux-x86_64"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored-macos-aarch_64"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored-macos-x86_64"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protoc-bin-vendored-win32"
|
||||||
|
version = "3.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pulldown-cmark"
|
||||||
|
version = "0.13.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e9f068eba8e7071c5f9511831b44f32c740d5adf574e990f946ddb53db2f314e"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 2.11.1",
|
||||||
|
"memchr",
|
||||||
|
"unicase",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pulldown-cmark-to-cmark"
|
||||||
|
version = "22.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "50793def1b900256624a709439404384204a5dc3a6ec580281bfaac35e882e90"
|
||||||
|
dependencies = [
|
||||||
|
"pulldown-cmark",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "quinn"
|
name = "quinn"
|
||||||
version = "0.11.9"
|
version = "0.11.9"
|
||||||
@@ -2136,9 +2387,21 @@ name = "telemetry_stream"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"chrono",
|
||||||
|
"parking_lot",
|
||||||
|
"prost",
|
||||||
|
"protoc-bin-vendored",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
"shared",
|
"shared",
|
||||||
|
"thiserror 1.0.69",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-stream",
|
||||||
|
"tonic",
|
||||||
|
"tonic-prost",
|
||||||
|
"tonic-prost-build",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -2318,6 +2581,18 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-stream"
|
||||||
|
version = "0.1.18"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tokio",
|
||||||
|
"tokio-util",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-util"
|
name = "tokio-util"
|
||||||
version = "0.7.18"
|
version = "0.7.18"
|
||||||
@@ -2372,6 +2647,74 @@ version = "0.1.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
|
checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tonic"
|
||||||
|
version = "0.14.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ac2a5518c70fa84342385732db33fb3f44bc4cc748936eb5833d2df34d6445ef"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"axum 0.8.9",
|
||||||
|
"base64",
|
||||||
|
"bytes",
|
||||||
|
"h2",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"http-body-util",
|
||||||
|
"hyper",
|
||||||
|
"hyper-timeout",
|
||||||
|
"hyper-util",
|
||||||
|
"percent-encoding",
|
||||||
|
"pin-project",
|
||||||
|
"socket2",
|
||||||
|
"sync_wrapper",
|
||||||
|
"tokio",
|
||||||
|
"tokio-stream",
|
||||||
|
"tower",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
"tracing",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tonic-build"
|
||||||
|
version = "0.14.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c68f61875ac5293cf72e6c8cf0158086428c82c37229e98c840878f1706b0322"
|
||||||
|
dependencies = [
|
||||||
|
"prettyplease",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tonic-prost"
|
||||||
|
version = "0.14.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "50849f68853be452acf590cde0b146665b8d507b3b8af17261df47e02c209ea0"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"prost",
|
||||||
|
"tonic",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tonic-prost-build"
|
||||||
|
version = "0.14.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "654e5643eff75d7f8c99197ce1440ed19a3474eada74c12bbac488b2cafdae27"
|
||||||
|
dependencies = [
|
||||||
|
"prettyplease",
|
||||||
|
"proc-macro2",
|
||||||
|
"prost-build",
|
||||||
|
"prost-types",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
"tempfile",
|
||||||
|
"tonic-build",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower"
|
name = "tower"
|
||||||
version = "0.5.3"
|
version = "0.5.3"
|
||||||
@@ -2380,11 +2723,15 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
"indexmap",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
|
"slab",
|
||||||
"sync_wrapper",
|
"sync_wrapper",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-util",
|
||||||
"tower-layer",
|
"tower-layer",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -2517,6 +2864,12 @@ dependencies = [
|
|||||||
"thiserror 2.0.18",
|
"thiserror 2.0.18",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicase"
|
||||||
|
version = "2.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "unicode-ident"
|
name = "unicode-ident"
|
||||||
version = "1.0.24"
|
version = "1.0.24"
|
||||||
|
|||||||
+12
@@ -62,6 +62,18 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus
|
|||||||
jsonschema = { version = "0.18", default-features = false }
|
jsonschema = { version = "0.18", default-features = false }
|
||||||
tokio-serial = "5"
|
tokio-serial = "5"
|
||||||
|
|
||||||
|
# gRPC (operator-link transport — see telemetry_stream / detection_client)
|
||||||
|
tonic = "0.14"
|
||||||
|
tonic-prost = "0.14"
|
||||||
|
prost = "0.14"
|
||||||
|
prost-types = "0.14"
|
||||||
|
tonic-prost-build = "0.14"
|
||||||
|
protoc-bin-vendored = "3"
|
||||||
|
tokio-stream = { version = "0.1", features = ["sync", "net"] }
|
||||||
|
|
||||||
|
# Lock-free / sync helpers
|
||||||
|
parking_lot = "0.12"
|
||||||
|
|
||||||
# Crypto / hashing
|
# Crypto / hashing
|
||||||
sha2 = "0.10"
|
sha2 = "0.10"
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,131 @@
|
|||||||
|
# Batch 14 / Cycle 1 — Implementation Report
|
||||||
|
|
||||||
|
**Date**: 2026-05-20
|
||||||
|
**Tasks**: AZ-675
|
||||||
|
**Verdict**: PASS_WITH_WARNINGS
|
||||||
|
- Pre-existing autopilot lint from batch 4 (C5) still open.
|
||||||
|
- Pre-existing intermittent flake `mission_executor::state_machine::ac3_bounded_retry_then_success` (carried from batch 8) now fails more reliably *under workspace load* on this dev box but still passes in isolation; root cause is a 5 ms polling-interval race in the test, not in `mission_executor` production code. Documented as A2 below — unchanged by this batch and unrelated to telemetry_stream.
|
||||||
|
|
||||||
|
## 1. Scope
|
||||||
|
|
||||||
|
| Ticket | Title | Crate | Complexity |
|
||||||
|
|---|---|---|---|
|
||||||
|
| AZ-675 | telemetry_stream Tonic gRPC server + per-client lossy queue | `telemetry_stream` | 3 |
|
||||||
|
|
||||||
|
Batch 14 is a single-ticket batch by deliberate choice. AZ-675 and AZ-658 were the only two unblocked tasks; AZ-658 has an open architectural decision (which H.264 binding) and was held back. Picking AZ-675 also unblocks AZ-676 / AZ-677 / AZ-678 / AZ-679 (the full telemetry → operator-bridge frontier) for subsequent batches.
|
||||||
|
|
||||||
|
## 2. Approach
|
||||||
|
|
||||||
|
### Tonic infrastructure decision
|
||||||
|
|
||||||
|
`telemetry_stream/description.md §9` lists the operator-link protocol (WebRTC / WebSocket-H.264 / gRPC server-streaming) as an open architectural question. AZ-675's task spec, however, names **Tonic gRPC** explicitly and the Runtime Completeness gate says "Production code that must exist: real gRPC server". The user picked path **A: commit to Tonic now**, which:
|
||||||
|
|
||||||
|
- Pins the operator-link transport to gRPC server-streaming (closes architecture Q2 in favour of gRPC).
|
||||||
|
- Adds **first-time** `tonic` / `prost` / `tonic-build` infrastructure to the workspace. The `detection_client/Cargo.toml` comment on line 16 anticipated this; the next ticket to need it (AZ-660) can now reuse the same workspace pins.
|
||||||
|
- Uses `protoc-bin-vendored` as a build-dependency so neither dev machines nor CI need a system `protoc` install. The build is hermetic and reproducible across platforms.
|
||||||
|
|
||||||
|
Workspace pins added: `tonic = "0.14"`, `tonic-prost = "0.14"`, `prost = "0.14"`, `prost-types = "0.14"`, `tonic-prost-build = "0.14"` (build-dep), `protoc-bin-vendored = "3"` (build-dep), `tokio-stream = "0.1"` with `sync,net` features (needed for `BroadcastStream` + `TcpListenerStream`), `parking_lot = "0.12"`.
|
||||||
|
|
||||||
|
### Back-pressure model — broadcast-direct, no intermediate buffer
|
||||||
|
|
||||||
|
The first draft of `internal/server.rs` used a per-client mpsc forwarder between the broadcast queue and the tonic stream. That hid the back-pressure: the forwarder blocked on `mpsc::send` long before the broadcast ring ever overflowed, so `RecvError::Lagged(n)` never fired and drop counters stayed at zero. **Lesson**: in a multi-stage queue where the *outer* stage is supposed to enforce drop-oldest, do not introduce a buffering middle stage that absorbs the back-pressure invisibly.
|
||||||
|
|
||||||
|
The shipped design feeds the broadcast receivers **directly** into the tonic-streamed response (via `tokio_stream::wrappers::BroadcastStream` merged through `tokio_stream::StreamMap`). When a wire/client is slow, tonic stops polling our stream → broadcast ring overruns that client's cursor → next poll yields `Err(BroadcastStreamRecvError::Lagged(n))` → drop counter incremented per (client_id, topic). Other clients are unaffected.
|
||||||
|
|
||||||
|
### What this batch ships in production
|
||||||
|
|
||||||
|
- **`proto/telemetry.proto`** — `TelemetryStream` service with a single server-streaming `Subscribe(SubscribeRequest) -> stream TelemetryMessage` RPC, five topics (`TelemetrySample`, `GimbalState`, `DetectionEvent`, `MovementCandidate`, `MapObjectsBundle`). Payloads are carried as opaque JSON in `bytes payload_json` so the canonical Rust models in `crates/shared/models/` stay authoritative.
|
||||||
|
- **`build.rs`** — wires `protoc-bin-vendored` into `tonic-prost-build` so codegen runs from `cargo build` alone.
|
||||||
|
- **`internal/publisher.rs`** — `TelemetryPublisher` with one `tokio::sync::broadcast` channel per topic, per-(client, topic) drop counters under `parking_lot::Mutex`, atomic `subscribed_clients` / `published_total` / `bytes_out_per_topic`.
|
||||||
|
- **`internal/server.rs`** — `TelemetryService` implementing `proto::telemetry_stream_server::TelemetryStream::subscribe`; validates `client_id` non-empty; resolves topic list (empty = subscribe-all); merges per-topic `BroadcastStream`s with `StreamMap`; converts `Lagged` into drop-counter updates; `StreamGuard` decrements `subscribed_clients` on stream drop.
|
||||||
|
- **`src/lib.rs`** — rewritten public surface:
|
||||||
|
- `TelemetryStreamConfig { listen_addr, topic_capacity, downlink_capacity }`.
|
||||||
|
- `TelemetryStream::spawn_grpc_server` (binds an address) and `spawn_grpc_server_on(listener)` (serves on an already-bound `std::net::TcpListener` — used by tests to pick ephemeral ports).
|
||||||
|
- `GrpcShutdown` RAII handle.
|
||||||
|
- `TelemetryStreamHandle::publish<T>(topic, &T)` non-blocking publish API.
|
||||||
|
- `TelemetryStreamHandle::snapshot()` for health-aggregator integration.
|
||||||
|
- `TelemetryStreamHandle::health()` flips yellow when any (client, topic) has ≥ 100 drops.
|
||||||
|
- `TelemetrySink::push_detections` is now real (publishes on `DetectionEvent` topic). `push_frame` still returns `NotImplemented(AZ-676)` because video carries different framing semantics that AZ-676 will pin.
|
||||||
|
|
||||||
|
## 3. Files touched
|
||||||
|
|
||||||
|
- `Cargo.toml` — workspace pins for tonic stack + tokio-stream features + parking_lot.
|
||||||
|
- `Cargo.lock` — regenerated for the new deps.
|
||||||
|
- `crates/telemetry_stream/Cargo.toml` — concrete deps + `build.rs` declaration.
|
||||||
|
- `crates/telemetry_stream/build.rs` — new (vendored protoc + tonic-prost-build).
|
||||||
|
- `crates/telemetry_stream/proto/telemetry.proto` — new.
|
||||||
|
- `crates/telemetry_stream/src/lib.rs` — rewrite (public surface).
|
||||||
|
- `crates/telemetry_stream/src/internal/mod.rs` — new.
|
||||||
|
- `crates/telemetry_stream/src/internal/proto.rs` — new (`tonic::include_proto!` hook).
|
||||||
|
- `crates/telemetry_stream/src/internal/publisher.rs` — new (with 4 unit tests).
|
||||||
|
- `crates/telemetry_stream/src/internal/server.rs` — new (gRPC service impl).
|
||||||
|
- `crates/telemetry_stream/tests/grpc_subscribe.rs` — new (5 integration tests covering AC-1..AC-3 + edge cases).
|
||||||
|
- `_docs/02_tasks/done/AZ-675_telemetry_stream_grpc_server.md` — moved from `todo/`.
|
||||||
|
- `_docs/_autodev_state.md` — phase update.
|
||||||
|
- `_docs/03_implementation/batch_14_cycle1_report.md` — this report.
|
||||||
|
|
||||||
|
## 4. Test results
|
||||||
|
|
||||||
|
| Crate | Unit | Integration | Total |
|
||||||
|
|---|---|---|---|
|
||||||
|
| `telemetry_stream` | 6 | 5 | 11 |
|
||||||
|
|
||||||
|
Clippy: `cargo clippy -p telemetry_stream --all-targets -- -D warnings` is clean.
|
||||||
|
|
||||||
|
Workspace `cargo test --workspace`: all suites green **except** the pre-existing `mission_executor::state_machine::ac3_bounded_retry_then_success` flake — see A2.
|
||||||
|
|
||||||
|
### Acceptance criteria
|
||||||
|
|
||||||
|
| AC | Test | Status |
|
||||||
|
|---|---|---|
|
||||||
|
| AC-1 multiple subscribers receive same stream (ordering preserved) | `tests/grpc_subscribe.rs::ac1_multiple_subscribers_receive_same_stream` | ✅ |
|
||||||
|
| AC-2 slow subscriber drops oldest, healthy unaffected | `tests/grpc_subscribe.rs::ac2_slow_subscriber_drops_oldest_healthy_unaffected` + `internal/publisher.rs::slow_subscriber_lags_fast_subscriber_does_not` | ✅ |
|
||||||
|
| AC-3 disconnect cleanly removes subscriber | `tests/grpc_subscribe.rs::ac3_disconnect_decrements_subscribed_clients` | ✅ |
|
||||||
|
| Empty topics defaults to ALL | `tests/grpc_subscribe.rs::empty_topics_list_defaults_to_all` | ✅ |
|
||||||
|
| Empty client_id rejected at boundary | `tests/grpc_subscribe.rs::empty_client_id_is_rejected` | ✅ |
|
||||||
|
|
||||||
|
## 5. Findings (this batch)
|
||||||
|
|
||||||
|
### A1. Pre-existing dead-code error in `autopilot::Runtime::vlm_provider_name`
|
||||||
|
|
||||||
|
**Severity**: High (still blocks workspace `-D warnings` clippy gate)
|
||||||
|
**Status**: OPEN — carried since batch 4. Not introduced by this batch. Tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` and cumulative finding C5.
|
||||||
|
|
||||||
|
### A2. Pre-existing `ac3_bounded_retry_then_success` flake escalation
|
||||||
|
|
||||||
|
**Severity**: Medium (Test design)
|
||||||
|
**Category**: Tests
|
||||||
|
**Origin**: Batch 8 mission_executor; behaviour unchanged by this batch.
|
||||||
|
|
||||||
|
The test polls `handle.state()` every 5 ms while waiting for `MissionUploaded`, but with `tick_interval=5ms` and a one-rejected-then-accepted scripted driver, the FSM can pass *through* `MissionUploaded` faster than the poll cadence and the await reports `stuck at WaitAuto`. Confirmed pre-existing — `git stash` of batch-14 changes reproduces the same intermittent failure, and the test passes in isolation. The proximate cause is the test's polling design, not `mission_executor` production code.
|
||||||
|
|
||||||
|
This batch's new transitive deps (tonic/prost stack) increase background compile / runtime load on dev boxes, which may make the test more likely to lose the race. The fix belongs in a small, focused test refactor (latch on FSM transition events instead of polling), filed as a leftover.
|
||||||
|
|
||||||
|
→ Filed `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`.
|
||||||
|
|
||||||
|
### A3. Architecture Q2 (operator-link protocol) now decided
|
||||||
|
|
||||||
|
**Severity**: Low (Architecture / doc sync)
|
||||||
|
**Detail**: `telemetry_stream/description.md §9` listed protocol as TBD. With AZ-675 shipping a Tonic-based gRPC server, this is effectively decided in favour of gRPC server-streaming. The architecture doc was not edited in this batch (out of scope; see C3 doc-sync sweep). When the doc sweep runs, this should move from "open question" to a recorded decision in `_docs/02_document/decision-rationale.md`.
|
||||||
|
|
||||||
|
## 6. Cumulative findings — open carry-over
|
||||||
|
|
||||||
|
Batch 14 is mid-triplet (13 / 14 / 15); cumulative review lands at the end of batch 15.
|
||||||
|
|
||||||
|
| ID | Severity | Category | Status |
|
||||||
|
|---|---|---|---|
|
||||||
|
| C1 | Medium | Maintainability | OPEN — duplicated `SendCommandError` mapping in `gimbal_controller` (batches 9-10) |
|
||||||
|
| C2 | Low | Style | OPEN — `MavlinkCommandIssuer` naming inconsistency (batch 9) |
|
||||||
|
| C3 | Low | Architecture | OPEN — `module-layout.md` drift: grows by `telemetry_stream/internal/{publisher,server,proto}.rs` this batch |
|
||||||
|
| C4 | Low | Architecture | OPEN — `data_model.md §PanPlan` definition still missing (batch 11) |
|
||||||
|
| C5 | High | Maintenance | OPEN — pre-existing `autopilot/runtime.rs::vlm_provider_name` dead-code error blocking workspace `-D warnings` clippy |
|
||||||
|
| C6 (new) | Medium | Tests | OPEN — `ac3_bounded_retry_then_success` polling race (see A2) |
|
||||||
|
| C7 (new) | Low | Architecture | OPEN — record Tonic-gRPC decision in `decision-rationale.md` (see A3) |
|
||||||
|
|
||||||
|
## 7. Next-batch candidates
|
||||||
|
|
||||||
|
- **AZ-678** — operator_bridge command authentication. Depends on AZ-675 (now done). 5 pts.
|
||||||
|
- **AZ-679** — operator_bridge POI surface. Depends on AZ-675 (now done) + uses the AZ-683 POI queue + AZ-685 decline path. 3 pts. Cleanly buildable as a Subscribe-style or push consumer of `MapObjectsBundle` / `POI` topics through the AZ-675 server.
|
||||||
|
- **AZ-676** — telemetry_stream video path. Depends on AZ-675 (now done) + AZ-657 (already done). 3 pts. Self-contained extension to the AZ-675 server.
|
||||||
|
- **AZ-677** — telemetry_stream MapObjects snapshot. Depends on AZ-675 (now done) + AZ-667 (not done — blocked).
|
||||||
|
- **AZ-658** — frame_ingest decoder. Still needs the H.264 binding decision (retina vs ffmpeg-rs vs gstreamer). 5 pts.
|
||||||
@@ -6,9 +6,9 @@ step: 7
|
|||||||
name: Implement
|
name: Implement
|
||||||
status: in_progress
|
status: in_progress
|
||||||
sub_step:
|
sub_step:
|
||||||
phase: 23
|
phase: 27
|
||||||
name: tracker-update-in-testing
|
name: awaiting-push
|
||||||
detail: "batch 13 (AZ-683) implemented and reviewed; awaiting commit + In Testing"
|
detail: "batch 14 (AZ-675) committed (ebf4aef) + In Testing in Jira; awaiting user push approval"
|
||||||
retry_count: 0
|
retry_count: 0
|
||||||
cycle: 1
|
cycle: 1
|
||||||
tracker: jira
|
tracker: jira
|
||||||
|
|||||||
@@ -0,0 +1,38 @@
|
|||||||
|
# Leftover: `mission_executor::ac3_bounded_retry_then_success` polling race
|
||||||
|
|
||||||
|
**Timestamp**: 2026-05-20T08:30:00+02:00
|
||||||
|
**Origin**: Batch 8 (mission_executor state machine). Surfaced in batches 11, 12, 13 as intermittent. Reproduces more reliably on dev box under batch 14 workspace test load (the new tonic stack increases build/runtime pressure).
|
||||||
|
**Severity**: Medium (test design, not production code)
|
||||||
|
**Not blocking**: pre-existing failure in unrelated area; production `mission_executor` behaviour is correct — the test simply has a polling race.
|
||||||
|
|
||||||
|
## Symptom
|
||||||
|
|
||||||
|
```
|
||||||
|
test ac3_bounded_retry_then_success ... FAILED
|
||||||
|
thread 'ac3_bounded_retry_then_success' panicked at
|
||||||
|
crates/mission_executor/tests/state_machine.rs:116:
|
||||||
|
FSM did not reach MissionUploaded; stuck at WaitAuto
|
||||||
|
```
|
||||||
|
|
||||||
|
`WaitAuto` is the FSM state *after* `MissionUploaded`. The FSM passed *through* `MissionUploaded` faster than the test's 5 ms polling cadence could observe it. The post-assertion (`matches!(state, WaitAuto | MissionUploaded)`) acknowledges either is fine, but `await_state(target=MissionUploaded)` panics before that assertion runs.
|
||||||
|
|
||||||
|
## Root cause
|
||||||
|
|
||||||
|
`crates/mission_executor/tests/state_machine.rs` lines 100-118 — `await_state` polls every 5 ms; FSM `tick_interval` is also 5 ms; a successful retry+upload can complete in less than one polling interval.
|
||||||
|
|
||||||
|
## Recommended fix (out of scope for current batch)
|
||||||
|
|
||||||
|
Replace polling with an event latch:
|
||||||
|
|
||||||
|
- Add `MissionExecutorHandle::state_stream()` (or expose a `tokio::sync::watch::Receiver<MissionState>`) so tests can `await` the FSM transitioning through the target state.
|
||||||
|
- Or: record a `Vec<MissionState>` history in `Inner` and assert the target is *in* the history at the end, not the current state.
|
||||||
|
|
||||||
|
Either approach is ~30 lines of test-only refactor. Production code does not need to change.
|
||||||
|
|
||||||
|
## Replay instructions
|
||||||
|
|
||||||
|
When working on `mission_executor` next (e.g. batch that touches the state machine or tick loop):
|
||||||
|
|
||||||
|
1. Pick one of the two fixes above.
|
||||||
|
2. Re-run `cargo test --workspace` to confirm flake is gone.
|
||||||
|
3. Delete this leftover.
|
||||||
@@ -6,9 +6,27 @@ rust-version.workspace = true
|
|||||||
license.workspace = true
|
license.workspace = true
|
||||||
publish.workspace = true
|
publish.workspace = true
|
||||||
authors.workspace = true
|
authors.workspace = true
|
||||||
|
build = "build.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
shared = { workspace = true }
|
shared = { workspace = true }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
|
tokio-stream = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
|
thiserror = { workspace = true }
|
||||||
|
serde = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
prost = { workspace = true }
|
||||||
|
tonic = { workspace = true }
|
||||||
|
tonic-prost = { workspace = true }
|
||||||
|
parking_lot = { workspace = true }
|
||||||
|
uuid = { workspace = true }
|
||||||
|
chrono = { workspace = true }
|
||||||
|
|
||||||
|
[build-dependencies]
|
||||||
|
tonic-prost-build = { workspace = true }
|
||||||
|
protoc-bin-vendored = { workspace = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { workspace = true, features = ["test-util"] }
|
||||||
|
|||||||
@@ -0,0 +1,19 @@
|
|||||||
|
//! Build-time codegen for the AZ-675 telemetry gRPC contract.
|
||||||
|
//!
|
||||||
|
//! We use `protoc-bin-vendored` so the build is self-contained — no
|
||||||
|
//! system protoc install is required on dev or CI. The PROTOC env
|
||||||
|
//! var is set before invoking `tonic-prost-build`, which is what
|
||||||
|
//! tonic uses to locate the compiler.
|
||||||
|
|
||||||
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let protoc = protoc_bin_vendored::protoc_bin_path()?;
|
||||||
|
std::env::set_var("PROTOC", protoc);
|
||||||
|
|
||||||
|
tonic_prost_build::configure()
|
||||||
|
.build_client(true)
|
||||||
|
.build_server(true)
|
||||||
|
.compile_protos(&["proto/telemetry.proto"], &["proto"])?;
|
||||||
|
|
||||||
|
println!("cargo:rerun-if-changed=proto/telemetry.proto");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -0,0 +1,64 @@
|
|||||||
|
// AZ-675 telemetry_stream — operator-bound gRPC contract.
|
||||||
|
//
|
||||||
|
// One service, one server-streaming Subscribe RPC. Client opens a stream
|
||||||
|
// declaring which topics it wants; server pushes messages for those
|
||||||
|
// topics until the client disconnects.
|
||||||
|
//
|
||||||
|
// The server enforces per-client back-pressure: when a client cannot
|
||||||
|
// keep up, the oldest message in *that client's* queue is dropped and
|
||||||
|
// a per-(client, topic) drop counter is incremented. Other clients
|
||||||
|
// are unaffected.
|
||||||
|
//
|
||||||
|
// AZ-676 will add the video path (separate RPC, server-streamed binary
|
||||||
|
// frames). AZ-677 will add the MapObjectsBundle snapshot RPC. Keep
|
||||||
|
// those concerns out of this contract.
|
||||||
|
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package autopilot.telemetry.v1;
|
||||||
|
|
||||||
|
// Topics a client can subscribe to. Repeated in SubscribeRequest so a
|
||||||
|
// single stream multiplexes whichever subset the client cares about.
|
||||||
|
enum Topic {
|
||||||
|
TOPIC_UNSPECIFIED = 0;
|
||||||
|
TOPIC_TELEMETRY_SAMPLE = 1;
|
||||||
|
TOPIC_GIMBAL_STATE = 2;
|
||||||
|
TOPIC_DETECTION_EVENT = 3;
|
||||||
|
TOPIC_MOVEMENT_CANDIDATE = 4;
|
||||||
|
TOPIC_MAP_OBJECTS_BUNDLE = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeRequest {
|
||||||
|
// Operator/client identifier. Plumbed into per-client drop counters
|
||||||
|
// and log lines. Free-form for AZ-675; AZ-678 (operator command
|
||||||
|
// auth) will tighten the format.
|
||||||
|
string client_id = 1;
|
||||||
|
repeated Topic topics = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each message carries the topic tag + an opaque JSON payload. We
|
||||||
|
// don't pin schemas per topic in proto here because the canonical
|
||||||
|
// payload shapes are already authoritative in `crates/shared/models`
|
||||||
|
// (TelemetrySample, GimbalState, DetectionEvent, MovementCandidate,
|
||||||
|
// MapObjectsBundle). Using JSON keeps the wire honest with what the
|
||||||
|
// rest of the system already serializes; if a topic later needs
|
||||||
|
// schema enforcement, add a typed message variant in a future bump.
|
||||||
|
message TelemetryMessage {
|
||||||
|
Topic topic = 1;
|
||||||
|
// Monotonic nanoseconds since the autopilot process started. Used
|
||||||
|
// by the operator for ordering and latency measurement.
|
||||||
|
uint64 monotonic_ts_ns = 2;
|
||||||
|
// Server-assigned per-topic sequence number. Strictly increases per
|
||||||
|
// (client, topic) stream; gaps imply drops.
|
||||||
|
uint64 sequence = 3;
|
||||||
|
// Serialized JSON payload (utf-8). Topic determines the shape.
|
||||||
|
bytes payload_json = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
service TelemetryStream {
|
||||||
|
// Server-streaming subscribe. The client sends ONE SubscribeRequest;
|
||||||
|
// the server pushes TelemetryMessage values until the client cancels
|
||||||
|
// the stream or the server shuts down. The server applies per-
|
||||||
|
// client drop-oldest back-pressure if the client cannot keep up.
|
||||||
|
rpc Subscribe(SubscribeRequest) returns (stream TelemetryMessage);
|
||||||
|
}
|
||||||
@@ -0,0 +1,5 @@
|
|||||||
|
//! Internal modules for `telemetry_stream`. Not part of the public API.
|
||||||
|
|
||||||
|
pub mod proto;
|
||||||
|
pub mod publisher;
|
||||||
|
pub mod server;
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
//! Generated tonic+prost code for the telemetry gRPC contract.
|
||||||
|
//!
|
||||||
|
//! The actual `.rs` file is produced at build time by `build.rs`
|
||||||
|
//! (see workspace `tonic-prost-build` / `protoc-bin-vendored` deps)
|
||||||
|
//! and dropped into `OUT_DIR`. We pull it in here under a stable
|
||||||
|
//! module path so the rest of the crate doesn't reach into `OUT_DIR`.
|
||||||
|
|
||||||
|
#![allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
|
||||||
|
tonic::include_proto!("autopilot.telemetry.v1");
|
||||||
@@ -0,0 +1,314 @@
|
|||||||
|
//! AZ-675 — multi-topic, per-client lossy publisher.
|
||||||
|
//!
|
||||||
|
//! The publisher owns one `tokio::sync::broadcast` channel per topic.
|
||||||
|
//! Each subscribed client gets per-topic receivers; falling behind
|
||||||
|
//! more than the channel capacity causes `tokio::sync::broadcast` to
|
||||||
|
//! return `RecvError::Lagged(n)` which the server-side stream
|
||||||
|
//! handler turns into a `drops_total{client_id, topic} += n`
|
||||||
|
//! increment. Slow clients never block the publisher, and a slow
|
||||||
|
//! client on one topic does not affect any other client or topic.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use serde::Serialize;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use tracing::warn;
|
||||||
|
|
||||||
|
use crate::internal::proto::{TelemetryMessage, Topic};
|
||||||
|
|
||||||
|
/// Per-topic broadcast capacity. A client falling more than this many
|
||||||
|
/// messages behind on a topic experiences drops on that topic. The
|
||||||
|
/// value is chosen empirically: 10 Hz telemetry → 256 messages ≈ 25
|
||||||
|
/// seconds of buffering, which is more than enough for transient
|
||||||
|
/// modem stalls but bounded for memory.
|
||||||
|
pub const DEFAULT_TOPIC_CAPACITY: usize = 256;
|
||||||
|
|
||||||
|
/// All known topics, in the same order as the proto enum (skipping
|
||||||
|
/// UNSPECIFIED).
|
||||||
|
pub const ALL_TOPICS: &[Topic] = &[
|
||||||
|
Topic::TelemetrySample,
|
||||||
|
Topic::GimbalState,
|
||||||
|
Topic::DetectionEvent,
|
||||||
|
Topic::MovementCandidate,
|
||||||
|
Topic::MapObjectsBundle,
|
||||||
|
];
|
||||||
|
|
||||||
|
/// Errors returned by [`TelemetryPublisher::publish`]. Publish never
|
||||||
|
/// blocks on slow clients; the only producer-side failure is
|
||||||
|
/// serialization of the payload, which is a programmer error caught
|
||||||
|
/// at compile time everywhere we control. We surface it explicitly
|
||||||
|
/// rather than swallow.
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum PublishError {
|
||||||
|
#[error("serialize topic={topic:?}: {source}")]
|
||||||
|
Serialize {
|
||||||
|
topic: Topic,
|
||||||
|
#[source]
|
||||||
|
source: serde_json::Error,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct PerTopicCounters {
|
||||||
|
pub published: u64,
|
||||||
|
pub bytes_out: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Snapshot of publisher health for the [`crate::TelemetryStreamHandle::health`]
|
||||||
|
/// surface and the AZ-675 NFR observability hooks.
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct PublisherSnapshot {
|
||||||
|
pub subscribed_clients: usize,
|
||||||
|
pub published_total: u64,
|
||||||
|
pub per_topic: HashMap<Topic, PerTopicCounters>,
|
||||||
|
pub drops_total: HashMap<(String, Topic), u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TopicChannel {
|
||||||
|
tx: broadcast::Sender<TelemetryMessage>,
|
||||||
|
seq: AtomicU64,
|
||||||
|
published: AtomicU64,
|
||||||
|
bytes_out: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TopicChannel {
|
||||||
|
fn new(capacity: usize) -> Self {
|
||||||
|
let (tx, _) = broadcast::channel(capacity);
|
||||||
|
Self {
|
||||||
|
tx,
|
||||||
|
seq: AtomicU64::new(0),
|
||||||
|
published: AtomicU64::new(0),
|
||||||
|
bytes_out: AtomicU64::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drop counter map. Keyed by `(client_id, topic)`. We isolate it
|
||||||
|
/// behind a `parking_lot::Mutex` because writes happen from
|
||||||
|
/// per-client tonic stream tasks; reads happen from the health
|
||||||
|
/// surface — both are infrequent compared to the broadcast hot path.
|
||||||
|
type DropMap = Mutex<HashMap<(String, Topic), AtomicU64>>;
|
||||||
|
|
||||||
|
pub struct TelemetryPublisher {
|
||||||
|
topics: HashMap<Topic, TopicChannel>,
|
||||||
|
drops: DropMap,
|
||||||
|
subscribed_clients: AtomicUsize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TelemetryPublisher {
|
||||||
|
/// Build a publisher with the same per-topic capacity for every
|
||||||
|
/// topic. No per-topic-capacity constructor exists yet; add one if a topic needs a
|
||||||
|
/// different buffer (e.g. MapObjectsBundle which is bursty).
|
||||||
|
pub fn new(capacity: usize) -> Arc<Self> {
|
||||||
|
let mut topics = HashMap::with_capacity(ALL_TOPICS.len());
|
||||||
|
for &t in ALL_TOPICS {
|
||||||
|
topics.insert(t, TopicChannel::new(capacity));
|
||||||
|
}
|
||||||
|
Arc::new(Self {
|
||||||
|
topics,
|
||||||
|
drops: Mutex::new(HashMap::new()),
|
||||||
|
subscribed_clients: AtomicUsize::new(0),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn default_capacity() -> Arc<Self> {
|
||||||
|
Self::new(DEFAULT_TOPIC_CAPACITY)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Serialise `payload` and fan out to every subscriber on `topic`.
|
||||||
|
///
|
||||||
|
/// Never blocks. `broadcast::Sender::send` returns the number of
|
||||||
|
/// receivers it queued for; a return of 0 means no current
|
||||||
|
/// subscribers, which is fine — we still bump the published
|
||||||
|
/// counter so the operator can confirm the producer is alive.
|
||||||
|
pub fn publish<T: Serialize>(&self, topic: Topic, payload: &T) -> Result<(), PublishError> {
|
||||||
|
let channel = match self.topics.get(&topic) {
|
||||||
|
Some(c) => c,
|
||||||
|
None => {
|
||||||
|
warn!(?topic, "unknown topic; dropping publish");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let payload_json = serde_json::to_vec(payload)
|
||||||
|
.map_err(|e| PublishError::Serialize { topic, source: e })?;
|
||||||
|
let bytes_len = payload_json.len() as u64;
|
||||||
|
|
||||||
|
let seq = channel.seq.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
let msg = TelemetryMessage {
|
||||||
|
topic: topic as i32,
|
||||||
|
monotonic_ts_ns: shared::clock::MonoClock::new().elapsed_ns(),
|
||||||
|
sequence: seq,
|
||||||
|
payload_json,
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = channel.tx.send(msg);
|
||||||
|
channel.published.fetch_add(1, Ordering::Relaxed);
|
||||||
|
channel.bytes_out.fetch_add(bytes_len, Ordering::Relaxed);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Open a per-client receiver for `topic`. The caller (the
|
||||||
|
/// `Subscribe` RPC handler) is responsible for handling
|
||||||
|
/// `RecvError::Lagged` by calling [`Self::record_drops`].
|
||||||
|
pub(crate) fn subscribe_topic(
|
||||||
|
&self,
|
||||||
|
topic: Topic,
|
||||||
|
) -> Option<broadcast::Receiver<TelemetryMessage>> {
|
||||||
|
self.topics.get(&topic).map(|c| c.tx.subscribe())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn register_client(&self) {
|
||||||
|
self.subscribed_clients.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn deregister_client(&self) {
|
||||||
|
self.subscribed_clients.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn record_drops(&self, client_id: &str, topic: Topic, n: u64) {
|
||||||
|
if n == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let mut map = self.drops.lock();
|
||||||
|
let key = (client_id.to_string(), topic);
|
||||||
|
map.entry(key)
|
||||||
|
.or_insert_with(|| AtomicU64::new(0))
|
||||||
|
.fetch_add(n, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn snapshot(&self) -> PublisherSnapshot {
|
||||||
|
let mut per_topic = HashMap::with_capacity(self.topics.len());
|
||||||
|
let mut total_published = 0u64;
|
||||||
|
for (&t, c) in &self.topics {
|
||||||
|
let published = c.published.load(Ordering::Relaxed);
|
||||||
|
let bytes_out = c.bytes_out.load(Ordering::Relaxed);
|
||||||
|
total_published = total_published.saturating_add(published);
|
||||||
|
per_topic.insert(
|
||||||
|
t,
|
||||||
|
PerTopicCounters {
|
||||||
|
published,
|
||||||
|
bytes_out,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let drops_map = self.drops.lock();
|
||||||
|
let drops_total: HashMap<(String, Topic), u64> = drops_map
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
|
||||||
|
.collect();
|
||||||
|
PublisherSnapshot {
|
||||||
|
subscribed_clients: self.subscribed_clients.load(Ordering::Relaxed),
|
||||||
|
published_total: total_published,
|
||||||
|
per_topic,
|
||||||
|
drops_total,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, PartialEq)]
|
||||||
|
struct Sample {
|
||||||
|
v: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn publish_with_no_subscribers_is_no_op() {
|
||||||
|
// Arrange
|
||||||
|
let pub_ = TelemetryPublisher::new(8);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
pub_.publish(Topic::TelemetrySample, &Sample { v: 1 })
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
let snap = pub_.snapshot();
|
||||||
|
assert_eq!(snap.subscribed_clients, 0);
|
||||||
|
assert_eq!(snap.per_topic[&Topic::TelemetrySample].published, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn slow_subscriber_lags_fast_subscriber_does_not() {
|
||||||
|
// Arrange — capacity 4 so 100 publishes overflow trivially.
|
||||||
|
let pub_ = TelemetryPublisher::new(4);
|
||||||
|
let mut fast = pub_.subscribe_topic(Topic::TelemetrySample).unwrap();
|
||||||
|
let mut slow = pub_.subscribe_topic(Topic::TelemetrySample).unwrap();
|
||||||
|
|
||||||
|
// Act — burst 100 messages WITHOUT giving `slow` a chance to drain.
|
||||||
|
for v in 0..100u32 {
|
||||||
|
pub_.publish(Topic::TelemetrySample, &Sample { v }).unwrap();
|
||||||
|
// fast drains immediately; slow does nothing.
|
||||||
|
let _ = fast.recv().await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drain `slow` and count lag.
|
||||||
|
let mut total_drops: u64 = 0;
|
||||||
|
let mut delivered: u64 = 0;
|
||||||
|
loop {
|
||||||
|
match slow.try_recv() {
|
||||||
|
Ok(_) => delivered += 1,
|
||||||
|
Err(broadcast::error::TryRecvError::Lagged(n)) => total_drops += n,
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert — fast got all 100; slow lost most but delivered ≤ capacity.
|
||||||
|
assert!(delivered <= 4, "slow can hold at most capacity (4)");
|
||||||
|
assert_eq!(
|
||||||
|
total_drops + delivered,
|
||||||
|
100,
|
||||||
|
"every published msg accounted for"
|
||||||
|
);
|
||||||
|
assert!(total_drops > 0, "slow subscriber MUST have lagged");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn record_drops_accumulates_per_client_topic() {
|
||||||
|
// Arrange
|
||||||
|
let pub_ = TelemetryPublisher::new(8);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
pub_.record_drops("client_a", Topic::TelemetrySample, 5);
|
||||||
|
pub_.record_drops("client_a", Topic::TelemetrySample, 3);
|
||||||
|
pub_.record_drops("client_a", Topic::GimbalState, 2);
|
||||||
|
pub_.record_drops("client_b", Topic::TelemetrySample, 1);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
let snap = pub_.snapshot();
|
||||||
|
assert_eq!(
|
||||||
|
snap.drops_total[&("client_a".to_string(), Topic::TelemetrySample)],
|
||||||
|
8
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
snap.drops_total[&("client_a".to_string(), Topic::GimbalState)],
|
||||||
|
2
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
snap.drops_total[&("client_b".to_string(), Topic::TelemetrySample)],
|
||||||
|
1
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn register_deregister_balance_tracks_subscribed_clients() {
|
||||||
|
// Arrange
|
||||||
|
let pub_ = TelemetryPublisher::new(8);
|
||||||
|
assert_eq!(pub_.snapshot().subscribed_clients, 0);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
pub_.register_client();
|
||||||
|
pub_.register_client();
|
||||||
|
assert_eq!(pub_.snapshot().subscribed_clients, 2);
|
||||||
|
pub_.deregister_client();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(pub_.snapshot().subscribed_clients, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,127 @@
|
|||||||
|
//! AZ-675 — gRPC `TelemetryStream::Subscribe` service implementation.
|
||||||
|
//!
|
||||||
|
//! The client sends a single `SubscribeRequest`; the server returns a
|
||||||
|
//! server-streaming response built directly from per-topic
|
||||||
|
//! `BroadcastStream`s merged with `StreamMap`. The tonic transport
|
||||||
|
//! is what polls our stream — when the wire (or the operator client)
|
||||||
|
//! cannot keep up, the broadcast ring overflows that client's cursor
|
||||||
|
//! and `BroadcastStream` yields `Err(BroadcastStreamRecvError::Lagged(n))`
|
||||||
|
//! on the next poll. That is the *only* place drop accounting
|
||||||
|
//! happens: there is no intermediate mpsc buffer that could absorb
|
||||||
|
//! back-pressure and hide lag.
|
||||||
|
//!
|
||||||
|
//! `StreamGuard` decrements `subscribed_clients` on stream drop.
|
||||||
|
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
|
||||||
|
use tokio_stream::wrappers::BroadcastStream;
|
||||||
|
use tokio_stream::{Stream, StreamExt, StreamMap};
|
||||||
|
use tonic::{Request, Response, Status};
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
use crate::internal::proto::telemetry_stream_server::TelemetryStream;
|
||||||
|
use crate::internal::proto::{SubscribeRequest, TelemetryMessage, Topic};
|
||||||
|
use crate::internal::publisher::{TelemetryPublisher, ALL_TOPICS};
|
||||||
|
|
||||||
|
pub struct TelemetryService {
|
||||||
|
publisher: Arc<TelemetryPublisher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TelemetryService {
|
||||||
|
pub fn new(publisher: Arc<TelemetryPublisher>) -> Self {
|
||||||
|
Self { publisher }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type SubscribeStream = Pin<Box<dyn Stream<Item = Result<TelemetryMessage, Status>> + Send>>;
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl TelemetryStream for TelemetryService {
|
||||||
|
type SubscribeStream = SubscribeStream;
|
||||||
|
|
||||||
|
async fn subscribe(
|
||||||
|
&self,
|
||||||
|
request: Request<SubscribeRequest>,
|
||||||
|
) -> Result<Response<Self::SubscribeStream>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
if req.client_id.trim().is_empty() {
|
||||||
|
return Err(Status::invalid_argument("client_id is required"));
|
||||||
|
}
|
||||||
|
let client_id = req.client_id.clone();
|
||||||
|
|
||||||
|
let requested: Vec<Topic> = if req.topics.is_empty() {
|
||||||
|
ALL_TOPICS.to_vec()
|
||||||
|
} else {
|
||||||
|
let mut out = Vec::with_capacity(req.topics.len());
|
||||||
|
for raw in &req.topics {
|
||||||
|
let t = Topic::try_from(*raw)
|
||||||
|
.map_err(|_| Status::invalid_argument(format!("unknown topic {raw}")))?;
|
||||||
|
if matches!(t, Topic::Unspecified) {
|
||||||
|
return Err(Status::invalid_argument("TOPIC_UNSPECIFIED not allowed"));
|
||||||
|
}
|
||||||
|
out.push(t);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut map: StreamMap<Topic, BroadcastStream<TelemetryMessage>> = StreamMap::new();
|
||||||
|
for &t in &requested {
|
||||||
|
match self.publisher.subscribe_topic(t) {
|
||||||
|
Some(rx) => {
|
||||||
|
map.insert(t, BroadcastStream::new(rx));
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Err(Status::failed_precondition(format!(
|
||||||
|
"topic {t:?} not registered"
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.publisher.register_client();
|
||||||
|
info!(client_id = %client_id, topics = ?requested, "telemetry subscribe");
|
||||||
|
|
||||||
|
let publisher = Arc::clone(&self.publisher);
|
||||||
|
let cid = client_id.clone();
|
||||||
|
let stream = map.filter_map(move |(topic, item)| match item {
|
||||||
|
Ok(msg) => Some(Ok(msg)),
|
||||||
|
Err(BroadcastStreamRecvError::Lagged(n)) => {
|
||||||
|
warn!(client_id = %cid, ?topic, dropped = n, "slow client lagged");
|
||||||
|
publisher.record_drops(&cid, topic, n);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let stream = StreamGuard {
|
||||||
|
inner: stream,
|
||||||
|
publisher: Arc::clone(&self.publisher),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Response::new(Box::pin(stream) as Self::SubscribeStream))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decrement `subscribed_clients` when the per-client outbound
|
||||||
|
/// stream is dropped (tonic drops the stream when the client side
|
||||||
|
/// goes away).
|
||||||
|
struct StreamGuard<S> {
|
||||||
|
inner: S,
|
||||||
|
publisher: Arc<TelemetryPublisher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Stream + Unpin> Stream for StreamGuard<S> {
|
||||||
|
type Item = S::Item;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
Pin::new(&mut self.inner).poll_next(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Drop for StreamGuard<S> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.publisher.deregister_client();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,57 +1,224 @@
|
|||||||
//! `telemetry_stream` — always-on uplink to the Ground Station + operator-command downlink.
|
//! `telemetry_stream` — always-on uplink to the Ground Station + operator-command downlink.
|
||||||
//!
|
//!
|
||||||
//! Real implementation lands in:
|
//! Real implementations:
|
||||||
//! - AZ-675 `telemetry_stream_grpc_server`
|
//! - **AZ-675 (this crate, this batch)**: Tonic gRPC server, per-client
|
||||||
//! - AZ-676 `telemetry_stream_video_path`
|
//! bounded queue, drop-oldest back-pressure, drop counters. Topics:
|
||||||
//! - AZ-677 `telemetry_stream_mapobjects_snapshot`
|
//! `TelemetrySample`, `GimbalState`, `DetectionEvent`,
|
||||||
|
//! `MovementCandidate`, `MapObjectsBundle`.
|
||||||
|
//! - **AZ-676**: video frame topic (separate RPC, server-streamed
|
||||||
|
//! binary payloads).
|
||||||
|
//! - **AZ-677**: diff-based snapshot emission for `MapObjectsBundle`.
|
||||||
|
//! - **AZ-678+**: command-auth on the return path (operator_bridge).
|
||||||
|
|
||||||
|
pub mod internal;
|
||||||
|
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use tonic::transport::Server;
|
||||||
|
|
||||||
use shared::contracts::TelemetrySink;
|
use shared::contracts::TelemetrySink;
|
||||||
use shared::error::{AutopilotError, Result};
|
use shared::error::{AutopilotError, Result};
|
||||||
use shared::health::ComponentHealth;
|
use shared::health::{ComponentHealth, HealthLevel};
|
||||||
use shared::models::detection::DetectionBatch;
|
use shared::models::detection::DetectionBatch;
|
||||||
use shared::models::frame::Frame;
|
use shared::models::frame::Frame;
|
||||||
use shared::models::operator::OperatorCommand;
|
use shared::models::operator::OperatorCommand;
|
||||||
|
|
||||||
|
use crate::internal::proto::telemetry_stream_server::TelemetryStreamServer;
|
||||||
|
use crate::internal::proto::Topic;
|
||||||
|
use crate::internal::publisher::{TelemetryPublisher, DEFAULT_TOPIC_CAPACITY};
|
||||||
|
use crate::internal::server::TelemetryService;
|
||||||
|
|
||||||
|
pub use crate::internal::proto::{
|
||||||
|
telemetry_stream_client::TelemetryStreamClient, SubscribeRequest, TelemetryMessage,
|
||||||
|
Topic as TelemetryTopic,
|
||||||
|
};
|
||||||
|
pub use crate::internal::publisher::{
|
||||||
|
PerTopicCounters, PublishError, PublisherSnapshot, ALL_TOPICS,
|
||||||
|
};
|
||||||
|
|
||||||
const NAME: &str = "telemetry_stream";
|
const NAME: &str = "telemetry_stream";
|
||||||
|
|
||||||
|
/// Per-(client, topic) drop rate at or above which health flips to
|
||||||
|
/// yellow. Picked to surface persistent slow consumers without
|
||||||
|
/// flapping on a single transient lag spike.
|
||||||
|
const DROP_YELLOW_THRESHOLD: u64 = 100;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct TelemetryStreamConfig {
|
||||||
|
/// Where the Tonic gRPC server binds. `0.0.0.0:50061` by default.
|
||||||
|
pub listen_addr: SocketAddr,
|
||||||
|
/// Per-topic broadcast capacity (per subscriber buffer).
|
||||||
|
pub topic_capacity: usize,
|
||||||
|
/// Bounded capacity of the downlink command channel that feeds
|
||||||
|
/// `operator_bridge`.
|
||||||
|
pub downlink_capacity: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for TelemetryStreamConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
listen_addr: "0.0.0.0:50061".parse().expect("hardcoded addr parses"),
|
||||||
|
topic_capacity: DEFAULT_TOPIC_CAPACITY,
|
||||||
|
downlink_capacity: 64,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct TelemetryStream {
|
pub struct TelemetryStream {
|
||||||
|
publisher: Arc<TelemetryPublisher>,
|
||||||
commands_tx: mpsc::Sender<OperatorCommand>,
|
commands_tx: mpsc::Sender<OperatorCommand>,
|
||||||
commands_rx: Option<mpsc::Receiver<OperatorCommand>>,
|
commands_rx: Option<mpsc::Receiver<OperatorCommand>>,
|
||||||
|
config: TelemetryStreamConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TelemetryStream {
|
impl TelemetryStream {
|
||||||
pub fn new(downlink_capacity: usize) -> Self {
|
pub fn new(downlink_capacity: usize) -> Self {
|
||||||
let (commands_tx, commands_rx) = mpsc::channel(downlink_capacity);
|
Self::with_config(TelemetryStreamConfig {
|
||||||
|
downlink_capacity,
|
||||||
|
..TelemetryStreamConfig::default()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_config(config: TelemetryStreamConfig) -> Self {
|
||||||
|
let publisher = TelemetryPublisher::new(config.topic_capacity);
|
||||||
|
let (commands_tx, commands_rx) = mpsc::channel(config.downlink_capacity);
|
||||||
Self {
|
Self {
|
||||||
|
publisher,
|
||||||
commands_tx,
|
commands_tx,
|
||||||
commands_rx: Some(commands_rx),
|
commands_rx: Some(commands_rx),
|
||||||
|
config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle(&self) -> TelemetryStreamHandle {
|
pub fn handle(&self) -> TelemetryStreamHandle {
|
||||||
TelemetryStreamHandle {
|
TelemetryStreamHandle {
|
||||||
|
publisher: Arc::clone(&self.publisher),
|
||||||
commands_tx: self.commands_tx.clone(),
|
commands_tx: self.commands_tx.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Take the downlink command receiver. The composition root forwards it to
|
/// Take the downlink command receiver. The composition root
|
||||||
/// `operator_bridge` as `Receiver<OperatorCommand>`.
|
/// forwards it to `operator_bridge` as `Receiver<OperatorCommand>`.
|
||||||
pub fn take_command_receiver(&mut self) -> Option<mpsc::Receiver<OperatorCommand>> {
|
pub fn take_command_receiver(&mut self) -> Option<mpsc::Receiver<OperatorCommand>> {
|
||||||
self.commands_rx.take()
|
self.commands_rx.take()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawn the Tonic server. Returns a JoinHandle that runs until
|
||||||
|
/// `shutdown` is signalled (closing the returned `shutdown_tx`).
|
||||||
|
/// The server is bound on `config.listen_addr`.
|
||||||
|
pub fn spawn_grpc_server(
|
||||||
|
&self,
|
||||||
|
) -> Result<(
|
||||||
|
JoinHandle<std::result::Result<(), tonic::transport::Error>>,
|
||||||
|
GrpcShutdown,
|
||||||
|
)> {
|
||||||
|
let listen_addr = self.config.listen_addr;
|
||||||
|
let publisher = Arc::clone(&self.publisher);
|
||||||
|
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
|
||||||
|
|
||||||
|
let svc = TelemetryStreamServer::new(TelemetryService::new(publisher));
|
||||||
|
let join = tokio::spawn(async move {
|
||||||
|
Server::builder()
|
||||||
|
.add_service(svc)
|
||||||
|
.serve_with_shutdown(listen_addr, async move {
|
||||||
|
let _ = shutdown_rx.await;
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
join,
|
||||||
|
GrpcShutdown {
|
||||||
|
tx: Some(shutdown_tx),
|
||||||
|
},
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn the Tonic server bound on a specific `TcpListener`.
|
||||||
|
/// Useful for tests that need to know the actual port ahead of
|
||||||
|
/// time (bind to `127.0.0.1:0` then read the assigned port).
|
||||||
|
pub fn spawn_grpc_server_on(
|
||||||
|
&self,
|
||||||
|
listener: std::net::TcpListener,
|
||||||
|
) -> Result<(
|
||||||
|
JoinHandle<std::result::Result<(), tonic::transport::Error>>,
|
||||||
|
GrpcShutdown,
|
||||||
|
)> {
|
||||||
|
listener
|
||||||
|
.set_nonblocking(true)
|
||||||
|
.map_err(|e| AutopilotError::Internal(format!("set_nonblocking: {e}")))?;
|
||||||
|
let tokio_listener = tokio::net::TcpListener::from_std(listener)
|
||||||
|
.map_err(|e| AutopilotError::Internal(format!("TcpListener::from_std: {e}")))?;
|
||||||
|
let stream = tokio_stream::wrappers::TcpListenerStream::new(tokio_listener);
|
||||||
|
|
||||||
|
let publisher = Arc::clone(&self.publisher);
|
||||||
|
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
|
||||||
|
let svc = TelemetryStreamServer::new(TelemetryService::new(publisher));
|
||||||
|
|
||||||
|
let join = tokio::spawn(async move {
|
||||||
|
Server::builder()
|
||||||
|
.add_service(svc)
|
||||||
|
.serve_with_incoming_shutdown(stream, async move {
|
||||||
|
let _ = shutdown_rx.await;
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
join,
|
||||||
|
GrpcShutdown {
|
||||||
|
tx: Some(shutdown_tx),
|
||||||
|
},
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// RAII shutdown trigger for the spawned gRPC server. Drop the value
|
||||||
|
/// or call `shutdown()` to stop the server.
|
||||||
|
pub struct GrpcShutdown {
|
||||||
|
tx: Option<tokio::sync::oneshot::Sender<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GrpcShutdown {
|
||||||
|
pub fn shutdown(mut self) {
|
||||||
|
if let Some(tx) = self.tx.take() {
|
||||||
|
let _ = tx.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for GrpcShutdown {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(tx) = self.tx.take() {
|
||||||
|
let _ = tx.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TelemetryStreamHandle {
|
pub struct TelemetryStreamHandle {
|
||||||
|
publisher: Arc<TelemetryPublisher>,
|
||||||
commands_tx: mpsc::Sender<OperatorCommand>,
|
commands_tx: mpsc::Sender<OperatorCommand>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TelemetryStreamHandle {
|
impl TelemetryStreamHandle {
|
||||||
/// Inject an operator command. Production path is fed by the downlink
|
/// Publish a payload on `topic`. Never blocks the caller; slow
|
||||||
/// receiver in `internal::downlink/*`; tests can call this directly.
|
/// subscribers experience drops accounted in [`snapshot`].
|
||||||
|
pub fn publish<T: serde::Serialize>(
|
||||||
|
&self,
|
||||||
|
topic: TelemetryTopic,
|
||||||
|
payload: &T,
|
||||||
|
) -> std::result::Result<(), PublishError> {
|
||||||
|
self.publisher.publish(topic, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inject an operator command downlink. Production path is fed
|
||||||
|
/// by the gRPC return half once AZ-678 lands; tests may call this
|
||||||
|
/// directly.
|
||||||
pub async fn submit_command(&self, command: OperatorCommand) -> Result<()> {
|
pub async fn submit_command(&self, command: OperatorCommand) -> Result<()> {
|
||||||
self.commands_tx
|
self.commands_tx
|
||||||
.send(command)
|
.send(command)
|
||||||
@@ -59,8 +226,32 @@ impl TelemetryStreamHandle {
|
|||||||
.map_err(|_| AutopilotError::Internal("downlink channel closed".into()))
|
.map_err(|_| AutopilotError::Internal("downlink channel closed".into()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the publisher's current counters (subscribers, published
/// totals, per-client drop counts) as reported by the shared publisher.
pub fn snapshot(&self) -> PublisherSnapshot {
    let publisher = &self.publisher;
    publisher.snapshot()
}
|
||||||
|
|
||||||
pub fn health(&self) -> ComponentHealth {
|
pub fn health(&self) -> ComponentHealth {
|
||||||
ComponentHealth::disabled(NAME)
|
let snap = self.publisher.snapshot();
|
||||||
|
let mut h = ComponentHealth::green(NAME);
|
||||||
|
|
||||||
|
let hot_drops: Vec<_> = snap
|
||||||
|
.drops_total
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, &v)| v >= DROP_YELLOW_THRESHOLD)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let detail = format!(
|
||||||
|
"subscribers={} published_total={} hot_drop_pairs={}",
|
||||||
|
snap.subscribed_clients,
|
||||||
|
snap.published_total,
|
||||||
|
hot_drops.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
if !hot_drops.is_empty() {
|
||||||
|
h.level = HealthLevel::Yellow;
|
||||||
|
}
|
||||||
|
h.detail = Some(detail);
|
||||||
|
h
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -68,14 +259,14 @@ impl TelemetryStreamHandle {
|
|||||||
impl TelemetrySink for TelemetryStreamHandle {
|
impl TelemetrySink for TelemetryStreamHandle {
|
||||||
async fn push_frame(&self, _frame: Frame) -> Result<()> {
|
async fn push_frame(&self, _frame: Frame) -> Result<()> {
|
||||||
Err(AutopilotError::NotImplemented(
|
Err(AutopilotError::NotImplemented(
|
||||||
"telemetry_stream::push_frame (AZ-676)",
|
"telemetry_stream::push_frame (AZ-676 video path)",
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn push_detections(&self, _batch: DetectionBatch) -> Result<()> {
|
async fn push_detections(&self, batch: DetectionBatch) -> Result<()> {
|
||||||
Err(AutopilotError::NotImplemented(
|
self.publisher
|
||||||
"telemetry_stream::push_detections (AZ-675)",
|
.publish(Topic::DetectionEvent, &batch)
|
||||||
))
|
.map_err(|e| AutopilotError::Internal(format!("publish detections: {e}")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -84,8 +275,35 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
/// A freshly created handle reports no subscribers, nothing published,
/// and green health.
#[test]
fn handle_starts_with_zero_subscribers_and_green_health() {
    // Arrange
    let stream = TelemetryStream::new(8);
    let handle = stream.handle();

    // Act
    let counters = handle.snapshot();
    let level = handle.health().level;

    // Assert
    assert_eq!(counters.subscribed_clients, 0);
    assert_eq!(counters.published_total, 0);
    assert_eq!(level, HealthLevel::Green);
}
|
||||||
|
|
||||||
|
/// Publishing with nobody subscribed succeeds and still bumps the
/// per-topic published counter.
#[test]
fn publish_without_subscribers_is_no_op_but_counts() {
    // Arrange
    let stream = TelemetryStream::new(8);
    let handle = stream.handle();
    let payload = serde_json::json!({"v": 1});

    // Act
    let outcome = handle.publish(TelemetryTopic::TelemetrySample, &payload);
    outcome.unwrap();

    // Assert
    let counters = handle.snapshot();
    assert_eq!(counters.per_topic[&Topic::TelemetrySample].published, 1);
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,366 @@
|
|||||||
|
//! AZ-675 integration tests — Tonic server + per-client lossy queue
|
||||||
|
//! exercised through an in-process gRPC client (the only stub allowed
|
||||||
|
//! per the task's Runtime Completeness gate).
|
||||||
|
|
||||||
|
use std::net::TcpListener;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::time::timeout;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tonic::transport::{Channel, Endpoint};
|
||||||
|
use tonic::Request;
|
||||||
|
|
||||||
|
use telemetry_stream::{SubscribeRequest, TelemetryStream, TelemetryStreamClient, TelemetryTopic};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
|
||||||
|
struct Sample {
|
||||||
|
n: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bind a TCP listener on `127.0.0.1:0` and return it together with the
/// port the OS assigned. Ephemeral ports keep integration tests in the
/// same suite from colliding with each other.
fn bind_ephemeral() -> (TcpListener, u16) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral");
    let bound_addr = listener.local_addr().unwrap();
    (listener, bound_addr.port())
}
|
||||||
|
|
||||||
|
async fn connect(port: u16) -> TelemetryStreamClient<Channel> {
|
||||||
|
let url = format!("http://127.0.0.1:{port}");
|
||||||
|
let endpoint = Endpoint::from_shared(url)
|
||||||
|
.unwrap()
|
||||||
|
.connect_timeout(Duration::from_secs(2));
|
||||||
|
|
||||||
|
// Tiny retry loop — the server can take a few ms to bind.
|
||||||
|
for _ in 0..50 {
|
||||||
|
if let Ok(c) = TelemetryStreamClient::connect(endpoint.clone()).await {
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||||
|
}
|
||||||
|
panic!("gRPC client failed to connect within ~1s");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC-1 — multiple subscribers receive the same stream, ordering preserved.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ac1_multiple_subscribers_receive_same_stream() {
|
||||||
|
// Arrange
|
||||||
|
let (listener, port) = bind_ephemeral();
|
||||||
|
let server = TelemetryStream::new(8);
|
||||||
|
let handle = server.handle();
|
||||||
|
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
|
||||||
|
|
||||||
|
let mut c1 = connect(port).await;
|
||||||
|
let mut c2 = connect(port).await;
|
||||||
|
let mut c3 = connect(port).await;
|
||||||
|
|
||||||
|
let req = |id: &str| {
|
||||||
|
Request::new(SubscribeRequest {
|
||||||
|
client_id: id.to_string(),
|
||||||
|
topics: vec![TelemetryTopic::TelemetrySample as i32],
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut s1 = c1.subscribe(req("a")).await.unwrap().into_inner();
|
||||||
|
let mut s2 = c2.subscribe(req("b")).await.unwrap().into_inner();
|
||||||
|
let mut s3 = c3.subscribe(req("c")).await.unwrap().into_inner();
|
||||||
|
|
||||||
|
// Give the server time to register all three before publishing.
|
||||||
|
for _ in 0..50 {
|
||||||
|
if handle.snapshot().subscribed_clients == 3 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
assert_eq!(handle.snapshot().subscribed_clients, 3);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
for n in 0..100u32 {
|
||||||
|
handle
|
||||||
|
.publish(TelemetryTopic::TelemetrySample, &Sample { n })
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn collect100(
|
||||||
|
s: &mut tonic::Streaming<telemetry_stream::TelemetryMessage>,
|
||||||
|
) -> Vec<Sample> {
|
||||||
|
let mut out = Vec::new();
|
||||||
|
while out.len() < 100 {
|
||||||
|
let msg = timeout(Duration::from_secs(2), s.next())
|
||||||
|
.await
|
||||||
|
.expect("ac1 client did not receive 100 in 2s")
|
||||||
|
.expect("stream ended")
|
||||||
|
.expect("server status");
|
||||||
|
let sample: Sample = serde_json::from_slice(&msg.payload_json).unwrap();
|
||||||
|
out.push(sample);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
let r1 = collect100(&mut s1).await;
|
||||||
|
let r2 = collect100(&mut s2).await;
|
||||||
|
let r3 = collect100(&mut s3).await;
|
||||||
|
|
||||||
|
// Assert — all three receive all 100 in order.
|
||||||
|
let expected: Vec<Sample> = (0..100).map(|n| Sample { n }).collect();
|
||||||
|
assert_eq!(r1, expected);
|
||||||
|
assert_eq!(r2, expected);
|
||||||
|
assert_eq!(r3, expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC-2 — slow subscriber drops oldest, healthy unaffected.
|
||||||
|
///
|
||||||
|
/// We can't simulate "client never polls" through real tonic+H2
|
||||||
|
/// because the OS TCP + HTTP/2 receive buffers (often 256 KB+) will
|
||||||
|
/// absorb a small JSON burst entirely with no observable drop. The
|
||||||
|
/// realistic AC-2 condition is "slow drain rate" — the slow client
|
||||||
|
/// polls but at a fraction of the publish rate. With large payloads
|
||||||
|
/// and a small per-topic broadcast capacity, the slow client's
|
||||||
|
/// broadcast cursor falls behind the producer and `BroadcastStream`
|
||||||
|
/// emits `Lagged(n)` — the only signal AZ-675 wires into the per-
|
||||||
|
/// `(client_id, topic)` drop counters.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ac2_slow_subscriber_drops_oldest_healthy_unaffected() {
|
||||||
|
// Arrange — capacity 4, big payloads so wire back-pressure
|
||||||
|
// propagates fast and broadcast lag is observable end-to-end.
|
||||||
|
let (listener, port) = bind_ephemeral();
|
||||||
|
let cfg = telemetry_stream::TelemetryStreamConfig {
|
||||||
|
topic_capacity: 4,
|
||||||
|
..telemetry_stream::TelemetryStreamConfig::default()
|
||||||
|
};
|
||||||
|
let server = TelemetryStream::with_config(cfg);
|
||||||
|
let handle = server.handle();
|
||||||
|
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct Big {
|
||||||
|
n: u32,
|
||||||
|
// 64 KB filler per message — fills H2 receive window quickly.
|
||||||
|
pad: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut c_fast = connect(port).await;
|
||||||
|
let mut c_slow = connect(port).await;
|
||||||
|
let req = |id: &str| {
|
||||||
|
Request::new(SubscribeRequest {
|
||||||
|
client_id: id.to_string(),
|
||||||
|
topics: vec![TelemetryTopic::TelemetrySample as i32],
|
||||||
|
})
|
||||||
|
};
|
||||||
|
let mut s_fast = c_fast.subscribe(req("fast")).await.unwrap().into_inner();
|
||||||
|
let mut s_slow = c_slow.subscribe(req("slow")).await.unwrap().into_inner();
|
||||||
|
|
||||||
|
for _ in 0..50 {
|
||||||
|
if handle.snapshot().subscribed_clients == 2 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
assert_eq!(handle.snapshot().subscribed_clients, 2);
|
||||||
|
|
||||||
|
let total: u32 = 300;
|
||||||
|
|
||||||
|
// Fast drains immediately, slow drains 1 msg every 50ms (way below publish rate).
|
||||||
|
let fast_task = tokio::spawn(async move {
|
||||||
|
let mut seen = 0u32;
|
||||||
|
while seen < total {
|
||||||
|
let _msg = timeout(Duration::from_secs(15), s_fast.next())
|
||||||
|
.await
|
||||||
|
.expect("fast did not receive everything in 15s")
|
||||||
|
.expect("stream ended")
|
||||||
|
.expect("server status");
|
||||||
|
seen += 1;
|
||||||
|
}
|
||||||
|
seen
|
||||||
|
});
|
||||||
|
let slow_task = tokio::spawn(async move {
|
||||||
|
let mut received = 0u32;
|
||||||
|
let deadline = std::time::Instant::now() + Duration::from_secs(30);
|
||||||
|
while std::time::Instant::now() < deadline {
|
||||||
|
match timeout(Duration::from_millis(500), s_slow.next()).await {
|
||||||
|
Ok(Some(Ok(_msg))) => received += 1,
|
||||||
|
Ok(Some(Err(_))) => break,
|
||||||
|
Ok(None) => break,
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||||
|
}
|
||||||
|
received
|
||||||
|
});
|
||||||
|
|
||||||
|
// Act — burst publishes at ~1ms cadence with 64 KB payloads so
|
||||||
|
// broadcast capacity is overrun for the slow consumer.
|
||||||
|
let payload_pad = vec![0u8; 64 * 1024];
|
||||||
|
for n in 0..total {
|
||||||
|
handle
|
||||||
|
.publish(
|
||||||
|
TelemetryTopic::TelemetrySample,
|
||||||
|
&Big {
|
||||||
|
n,
|
||||||
|
pad: payload_pad.clone(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
tokio::time::sleep(Duration::from_millis(2)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let fast_count = fast_task.await.unwrap();
|
||||||
|
let slow_count = slow_task.await.unwrap();
|
||||||
|
|
||||||
|
let snap = handle.snapshot();
|
||||||
|
let slow_drops = snap
|
||||||
|
.drops_total
|
||||||
|
.get(&("slow".to_string(), TelemetryTopic::TelemetrySample))
|
||||||
|
.copied()
|
||||||
|
.unwrap_or(0);
|
||||||
|
let fast_drops = snap
|
||||||
|
.drops_total
|
||||||
|
.get(&("fast".to_string(), TelemetryTopic::TelemetrySample))
|
||||||
|
.copied()
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(fast_count, total, "fast client receives every message");
|
||||||
|
assert_eq!(fast_drops, 0, "fast client MUST NOT have drops");
|
||||||
|
assert!(
|
||||||
|
slow_drops > 0,
|
||||||
|
"slow client MUST have lagged at some point (got 0 drops; slow_count={slow_count})"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
slow_count < total,
|
||||||
|
"slow client should have lost messages (got {slow_count}/{total})"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
(slow_count as u64) + slow_drops <= (total as u64) + 4,
|
||||||
|
"accounted={} should not exceed total+capacity",
|
||||||
|
(slow_count as u64) + slow_drops
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC-3 — disconnect cleanly removes subscriber.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ac3_disconnect_decrements_subscribed_clients() {
|
||||||
|
// Arrange
|
||||||
|
let (listener, port) = bind_ephemeral();
|
||||||
|
let server = TelemetryStream::new(8);
|
||||||
|
let handle = server.handle();
|
||||||
|
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
|
||||||
|
|
||||||
|
let mut c1 = connect(port).await;
|
||||||
|
let req = || {
|
||||||
|
Request::new(SubscribeRequest {
|
||||||
|
client_id: "ephemeral".to_string(),
|
||||||
|
topics: vec![],
|
||||||
|
})
|
||||||
|
};
|
||||||
|
let s1 = c1.subscribe(req()).await.unwrap().into_inner();
|
||||||
|
|
||||||
|
for _ in 0..50 {
|
||||||
|
if handle.snapshot().subscribed_clients == 1 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
assert_eq!(handle.snapshot().subscribed_clients, 1);
|
||||||
|
|
||||||
|
// Act — drop the stream (simulates client cancel).
|
||||||
|
drop(s1);
|
||||||
|
drop(c1);
|
||||||
|
|
||||||
|
// Assert — register/deregister is eventually consistent; wait briefly.
|
||||||
|
let mut final_count = 99;
|
||||||
|
for _ in 0..100 {
|
||||||
|
final_count = handle.snapshot().subscribed_clients;
|
||||||
|
if final_count == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||||
|
}
|
||||||
|
assert_eq!(
|
||||||
|
final_count, 0,
|
||||||
|
"disconnect must decrement subscribed_clients"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribing to an empty `topics` list defaults to ALL topics.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn empty_topics_list_defaults_to_all() {
|
||||||
|
// Arrange
|
||||||
|
let (listener, port) = bind_ephemeral();
|
||||||
|
let server = TelemetryStream::new(8);
|
||||||
|
let handle = server.handle();
|
||||||
|
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
|
||||||
|
|
||||||
|
let mut client = connect(port).await;
|
||||||
|
let mut stream = client
|
||||||
|
.subscribe(Request::new(SubscribeRequest {
|
||||||
|
client_id: "all".to_string(),
|
||||||
|
topics: vec![],
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
for _ in 0..50 {
|
||||||
|
if handle.snapshot().subscribed_clients == 1 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Act — publish one message on each topic.
|
||||||
|
handle
|
||||||
|
.publish(TelemetryTopic::TelemetrySample, &Sample { n: 1 })
|
||||||
|
.unwrap();
|
||||||
|
handle
|
||||||
|
.publish(TelemetryTopic::GimbalState, &Sample { n: 2 })
|
||||||
|
.unwrap();
|
||||||
|
handle
|
||||||
|
.publish(TelemetryTopic::DetectionEvent, &Sample { n: 3 })
|
||||||
|
.unwrap();
|
||||||
|
handle
|
||||||
|
.publish(TelemetryTopic::MovementCandidate, &Sample { n: 4 })
|
||||||
|
.unwrap();
|
||||||
|
handle
|
||||||
|
.publish(TelemetryTopic::MapObjectsBundle, &Sample { n: 5 })
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut seen_topics = std::collections::HashSet::new();
|
||||||
|
while seen_topics.len() < 5 {
|
||||||
|
let msg = timeout(Duration::from_secs(2), stream.next())
|
||||||
|
.await
|
||||||
|
.expect("did not receive all 5 topics in 2s")
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
seen_topics.insert(msg.topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(seen_topics.len(), 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Empty client_id is rejected at the server boundary.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn empty_client_id_is_rejected() {
|
||||||
|
// Arrange
|
||||||
|
let (listener, port) = bind_ephemeral();
|
||||||
|
let server = TelemetryStream::new(8);
|
||||||
|
let _h = server.handle();
|
||||||
|
let (_join, _guard) = server.spawn_grpc_server_on(listener).unwrap();
|
||||||
|
|
||||||
|
let mut client = connect(port).await;
|
||||||
|
|
||||||
|
// Act
|
||||||
|
let err = client
|
||||||
|
.subscribe(Request::new(SubscribeRequest {
|
||||||
|
client_id: String::new(),
|
||||||
|
topics: vec![],
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect_err("empty client_id must return Status error");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assert_eq!(err.code(), tonic::Code::InvalidArgument);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user