# Batch 14 / Cycle 1 — Implementation Report

**Date**: 2026-05-20
**Tasks**: AZ-675
**Verdict**: PASS_WITH_WARNINGS

- Pre-existing autopilot lint from batch 4 (C5) still open.
- Pre-existing intermittent flake `mission_executor::state_machine::ac3_bounded_retry_then_success` (carried from batch 8) now fails reproducibly *under workspace load* on this dev box but still passes in isolation; root cause is a 5 ms polling-interval race in the test, not in `mission_executor` production code. Documented as A2 below — unchanged by this batch and unrelated to telemetry_stream.

## 1. Scope

| Ticket | Title | Crate | Complexity |
|---|---|---|---|
| AZ-675 | telemetry_stream Tonic gRPC server + per-client lossy queue | `telemetry_stream` | 3 |

Batch 14 is a single-ticket batch by deliberate choice. AZ-675 and AZ-658 were the only two unblocked tasks; AZ-658 has an open architectural decision (which H.264 binding) and was held back. Picking AZ-675 also unblocks AZ-676 / AZ-677 / AZ-678 / AZ-679 (the full telemetry → operator-bridge frontier) for subsequent batches.

## 2. Approach

### Tonic infrastructure decision

`telemetry_stream/description.md §9` lists the operator-link protocol (WebRTC / WebSocket-H.264 / gRPC server-streaming) as an open architectural question. AZ-675's task spec, however, names **Tonic gRPC** explicitly and the Runtime Completeness gate says "Production code that must exist: real gRPC server". The user picked path **A: commit to Tonic now**, which:

- Pins the operator-link transport to gRPC server-streaming (closes architecture Q2 in the affirmative for the gRPC option).
- Adds **first-time** `tonic` / `prost` / `tonic-build` infrastructure to the workspace. The `detection_client/Cargo.toml` comment on line 16 anticipated this; the next ticket to need it (AZ-660) can now reuse the same workspace pins.
- Uses `protoc-bin-vendored` as a build-dependency so neither dev machines nor CI need a system `protoc` install.
  The build is hermetic and reproducible across platforms.

Workspace pins added: `tonic = "0.14"`, `tonic-prost = "0.14"`, `prost = "0.14"`, `prost-types = "0.14"`, `tonic-prost-build = "0.14"` (build-dep), `protoc-bin-vendored = "3"` (build-dep), `tokio-stream = "0.1"` with `sync,net` features (needed for `BroadcastStream` + `TcpListenerStream`), `parking_lot = "0.12"`.

### Back-pressure model — broadcast-direct, no intermediate buffer

The first draft of `internal/server.rs` used a per-client mpsc forwarder between the broadcast queue and the tonic stream. That hid the back-pressure: the forwarder blocked on `mpsc::send` long before the broadcast ring ever overflowed, so `RecvError::Lagged(n)` never fired and drop counters stayed at zero. **Lesson**: in a multi-stage queue where the *outer* stage is supposed to enforce drop-oldest, do not introduce a buffering middle stage that absorbs the back-pressure invisibly.

The shipped design feeds the broadcast receivers **directly** into the tonic-streamed response (via `tokio_stream::wrappers::BroadcastStream` merged through `tokio_stream::StreamMap`). When a wire/client is slow, tonic stops polling our stream → broadcast ring overruns that client's cursor → next poll yields `Err(BroadcastStreamRecvError::Lagged(n))` → drop counter incremented per (client_id, topic). Other clients are unaffected.

### What this batch ships in production

- **`proto/telemetry.proto`** — `TelemetryStream` service with a single server-streaming `Subscribe(SubscribeRequest) -> stream TelemetryMessage` RPC, five topics (`TelemetrySample`, `GimbalState`, `DetectionEvent`, `MovementCandidate`, `MapObjectsBundle`). Payloads are carried as opaque JSON in `bytes payload_json` so the canonical Rust models in `crates/shared/models/` stay authoritative.
- **`build.rs`** — wires `protoc-bin-vendored` into `tonic-prost-build` so codegen runs from `cargo build` alone.
- **`internal/publisher.rs`** — `TelemetryPublisher` with one `tokio::sync::broadcast` channel per topic, per-(client, topic) drop counters under `parking_lot::Mutex`, atomic `subscribed_clients` / `published_total` / `bytes_out_per_topic`.
- **`internal/server.rs`** — `TelemetryService` implementing `proto::telemetry_stream_server::TelemetryStream::subscribe`; validates `client_id` non-empty; resolves topic list (empty = subscribe-all); merges per-topic `BroadcastStream`s with `StreamMap`; converts `Lagged` into drop-counter updates; `StreamGuard` decrements `subscribed_clients` on stream drop.
- **`src/lib.rs`** — rewritten public surface:
  - `TelemetryStreamConfig { listen_addr, topic_capacity, downlink_capacity }`.
  - `TelemetryStream::spawn_grpc_server` (binds an addr) and `spawn_grpc_server_on(listener)` (serves on an already-bound `std::net::TcpListener` — used by tests to pick ephemeral ports).
  - `GrpcShutdown` RAII handle.
  - `TelemetryStreamHandle::publish(topic, &T)` non-blocking publish API.
  - `TelemetryStreamHandle::snapshot()` for health-aggregator integration.
  - `TelemetryStreamHandle::health()` flips yellow when any (client, topic) has ≥ 100 drops.
  - `TelemetrySink::push_detections` is now real (publishes on `DetectionEvent` topic). `push_frame` still returns `NotImplemented(AZ-676)` because video carries different framing semantics that AZ-676 will pin.

## 3. Files touched

- `Cargo.toml` — workspace pins for tonic stack + tokio-stream features + parking_lot.
- `Cargo.lock` — regenerated for the new deps.
- `crates/telemetry_stream/Cargo.toml` — concrete deps + `build.rs` declaration.
- `crates/telemetry_stream/build.rs` — new (vendored protoc + tonic-prost-build).
- `crates/telemetry_stream/proto/telemetry.proto` — new.
- `crates/telemetry_stream/src/lib.rs` — rewrite (public surface).
- `crates/telemetry_stream/src/internal/mod.rs` — new.
- `crates/telemetry_stream/src/internal/proto.rs` — new (`tonic::include_proto!` hook).
- `crates/telemetry_stream/src/internal/publisher.rs` — new (with 4 unit tests).
- `crates/telemetry_stream/src/internal/server.rs` — new (gRPC service impl).
- `crates/telemetry_stream/tests/grpc_subscribe.rs` — new (5 integration tests covering AC-1..AC-3 + edge cases).
- `_docs/02_tasks/done/AZ-675_telemetry_stream_grpc_server.md` — moved from `todo/`.
- `_docs/_autodev_state.md` — phase update.
- `_docs/03_implementation/batch_14_cycle1_report.md` — this report.

## 4. Test results

| Crate | Unit | Integration | Total |
|---|---|---|---|
| `telemetry_stream` | 6 | 5 | 11 |

Clippy: `cargo clippy -p telemetry_stream --all-targets -- -D warnings` is clean.

Workspace `cargo test --workspace`: all suites green **except** the pre-existing `mission_executor::state_machine::ac3_bounded_retry_then_success` flake — see A2.

### Acceptance criteria

| AC | Test | Status |
|---|---|---|
| AC-1 multiple subscribers receive same stream (ordering preserved) | `tests/grpc_subscribe.rs::ac1_multiple_subscribers_receive_same_stream` | ✅ |
| AC-2 slow subscriber drops oldest, healthy unaffected | `tests/grpc_subscribe.rs::ac2_slow_subscriber_drops_oldest_healthy_unaffected` + `internal/publisher.rs::slow_subscriber_lags_fast_subscriber_does_not` | ✅ |
| AC-3 disconnect cleanly removes subscriber | `tests/grpc_subscribe.rs::ac3_disconnect_decrements_subscribed_clients` | ✅ |
| Empty topics defaults to ALL | `tests/grpc_subscribe.rs::empty_topics_list_defaults_to_all` | ✅ |
| Empty client_id rejected at boundary | `tests/grpc_subscribe.rs::empty_client_id_is_rejected` | ✅ |

## 5. Findings (this batch)

### A1. Pre-existing dead-code error in `autopilot::Runtime::vlm_provider_name`

**Severity**: High (still blocks workspace `-D warnings` clippy gate)
**Status**: OPEN — carried since batch 4. Not introduced by this batch. Tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` and cumulative finding C5.

### A2. Pre-existing `ac3_bounded_retry_then_success` flake escalation

**Severity**: Medium (Test design)
**Category**: Tests
**Origin**: Batch 8 mission_executor; behaviour unchanged by this batch.

The test polls `handle.state()` every 5 ms while waiting for `MissionUploaded`, but with `tick_interval=5ms` and a one-rejected-then-accepted scripted driver, the FSM can pass *through* `MissionUploaded` between two polls, so the wait never observes that state and reports `stuck at WaitAuto`. Confirmed pre-existing: stashing the batch-14 changes (`git stash`) still reproduces the same intermittent failure, and the test passes in isolation. The proximate cause is the test's polling design, not `mission_executor` production code. This batch's new transitive deps (tonic/prost stack) increase background compile / runtime load on dev boxes, which may make the test more likely to lose the race. The fix belongs in a small, focused test refactor (latch on FSM transition events instead of polling), filed as a leftover.

→ Filed `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`.

### A3. Architecture Q2 (operator-link protocol) now decided

**Severity**: Low (Architecture / doc sync)
**Detail**: `telemetry_stream/description.md §9` listed protocol as TBD. With AZ-675 shipping a Tonic-based gRPC server, this is effectively decided in favour of gRPC server-streaming. The architecture doc was not edited in this batch (out of scope; see C3 doc-sync sweep). When the doc sweep runs, this should move from "open question" to a recorded decision in `_docs/02_document/decision-rationale.md`.

## 6. Cumulative findings — open carry-over

Batch 14 is mid-triplet (13 / 14 / 15); cumulative review lands at the end of batch 15.

| ID | Severity | Category | Status |
|---|---|---|---|
| C1 | Medium | Maintainability | OPEN — duplicated `SendCommandError` mapping in `gimbal_controller` (batches 9-10) |
| C2 | Low | Style | OPEN — `MavlinkCommandIssuer` naming inconsistency (batch 9) |
| C3 | Low | Architecture | OPEN — `module-layout.md` drift: grows by `telemetry_stream/internal/{publisher,server,proto}.rs` this batch |
| C4 | Low | Architecture | OPEN — `data_model.md §PanPlan` definition still missing (batch 11) |
| C5 | High | Maintenance | OPEN — pre-existing `autopilot/runtime.rs::vlm_provider_name` dead-code error blocking workspace `-D warnings` clippy |
| C6 (new) | Medium | Tests | OPEN — `ac3_bounded_retry_then_success` polling race (see A2) |
| C7 (new) | Low | Architecture | OPEN — record Tonic-gRPC decision in `decision-rationale.md` (see A3) |

## 7. Next-batch candidates

- **AZ-678** — operator_bridge command authentication. Depends on AZ-675 (now done). 5 pts.
- **AZ-679** — operator_bridge POI surface. Depends on AZ-675 (now done) + uses the AZ-683 POI queue + AZ-685 decline path. 3 pts. Cleanly buildable as a Subscribe-style or push consumer of `MapObjectsBundle` / `POI` topics through the AZ-675 server.
- **AZ-676** — telemetry_stream video path. Depends on AZ-675 (now done) + AZ-657 (already done). 3 pts. Self-contained extension to the AZ-675 server.
- **AZ-677** — telemetry_stream MapObjects snapshot. Depends on AZ-675 (now done) + AZ-667 (not done — blocked).
- **AZ-658** — frame_ingest decoder. Still needs the H.264 binding decision (retina vs ffmpeg-rs vs gstreamer). 5 pts.
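
Since the candidates above all build on the AZ-675 server, it may help to have the drop-oldest behaviour from the back-pressure model in section 2 in a form that runs in isolation. The sketch below is a std-only model, not the shipped code and not how `tokio::sync::broadcast` is implemented internally: `Ring`, `Reader`, `Recv`, `drain` and `demo` are hypothetical names introduced here for illustration. It assumes one shared bounded ring with a per-reader cursor, and shows a reader that falls behind receiving a `Lagged(n)` signal, which the caller turns into a drop count, while a reader that keeps up loses nothing.

```rust
use std::collections::VecDeque;

/// Outcome of one non-blocking receive attempt (hypothetical type).
enum Recv<T> {
    Item(T),
    /// The reader fell behind; `n` messages were evicted before it read them.
    Lagged(u64),
    Empty,
}

/// Bounded ring shared by all readers. Illustrative model only.
struct Ring<T> {
    buf: VecDeque<T>,
    capacity: usize,
    /// Sequence number of the oldest message still held in `buf`.
    head_seq: u64,
}

/// Per-reader cursor: sequence number of the next message to read.
#[derive(Default)]
struct Reader {
    next_seq: u64,
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Ring { buf: VecDeque::with_capacity(capacity), capacity, head_seq: 0 }
    }

    /// Never blocks: when the ring is full the oldest message is evicted.
    fn publish(&mut self, item: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.head_seq += 1;
        }
        self.buf.push_back(item);
    }

    fn recv(&self, reader: &mut Reader) -> Recv<T> {
        if reader.next_seq < self.head_seq {
            // Cursor points at evicted data: report the gap and jump the
            // cursor forward to the oldest message that is still retained.
            let missed = self.head_seq - reader.next_seq;
            reader.next_seq = self.head_seq;
            return Recv::Lagged(missed);
        }
        let idx = (reader.next_seq - self.head_seq) as usize;
        match self.buf.get(idx) {
            Some(item) => {
                reader.next_seq += 1;
                Recv::Item(item.clone())
            }
            None => Recv::Empty,
        }
    }
}

/// Reads until the ring is empty for this reader, converting every
/// `Lagged(n)` into a drop-counter update instead of an error.
fn drain(ring: &Ring<u32>, reader: &mut Reader, seen: &mut Vec<u32>, dropped: &mut u64) {
    loop {
        match ring.recv(reader) {
            Recv::Item(m) => seen.push(m),
            Recv::Lagged(n) => *dropped += n,
            Recv::Empty => break,
        }
    }
}

/// Publishes 10 messages into a ring of capacity 4. The fast reader drains
/// after every publish; the slow reader drains only once at the end.
fn demo() -> (Vec<u32>, u64, Vec<u32>, u64) {
    let mut ring = Ring::new(4);
    let (mut fast, mut slow) = (Reader::default(), Reader::default());
    let (mut fast_seen, mut slow_seen) = (Vec::new(), Vec::new());
    let (mut fast_dropped, mut slow_dropped) = (0u64, 0u64);
    for msg in 0..10u32 {
        ring.publish(msg);
        drain(&ring, &mut fast, &mut fast_seen, &mut fast_dropped);
    }
    drain(&ring, &mut slow, &mut slow_seen, &mut slow_dropped);
    (fast_seen, fast_dropped, slow_seen, slow_dropped)
}

fn main() {
    let (fast_seen, fast_dropped, slow_seen, slow_dropped) = demo();
    println!("fast: seen={:?} dropped={}", fast_seen, fast_dropped);
    println!("slow: seen={:?} dropped={}", slow_seen, slow_dropped);
}
```

In this model the slow reader ends up with only the four newest messages and a drop count of 6, while the fast reader sees all ten. Inserting a buffering stage between the ring and the reader would keep the cursor moving and suppress the `Lagged` signal, which is the failure mode the first draft of `internal/server.rs` ran into.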