Oleksandr Bezdieniezhnykh ff790bd639
[AZ-675] telemetry_stream Tonic gRPC server + per-client lossy queue
Pins operator-link transport to gRPC server-streaming (closes
architecture Q2 in favour of gRPC). Adds first-time tonic / prost /
tonic-build infrastructure to the workspace; uses
protoc-bin-vendored so neither dev machines nor CI need system
protoc installed.

Design — back-pressure lives in the per-topic tokio::sync::broadcast
ring, drained directly by the tonic-streamed response via
BroadcastStream + StreamMap. No intermediate mpsc buffer that could
absorb back-pressure invisibly. Slow client overrun -> Lagged(n)
event -> per-(client_id, topic) drop counter incremented; healthy
clients on the same topic are unaffected.

Service surface — Subscribe(SubscribeRequest) -> stream
TelemetryMessage; five topics (TelemetrySample, GimbalState,
DetectionEvent, MovementCandidate, MapObjectsBundle); empty topics
list defaults to subscribe-all; empty client_id rejected; stream
drop decrements subscribed_clients via StreamGuard. TelemetrySink
push_detections is now real; push_frame still NotImplemented(AZ-676
video path).

Tests — 6 unit + 5 integration (AC-1..AC-3 via in-process gRPC
client, plus subscribe-all default + empty-client_id rejection).
Clippy on telemetry_stream clean.

Pre-existing mission_executor ac3 test polling race surfaces more
reliably under the new tonic build pressure; documented as
_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md
and unchanged by this batch.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 12:44:39 +03:00


Batch 14 / Cycle 1 — Implementation Report

Date: 2026-05-20
Tasks: AZ-675
Verdict: PASS_WITH_WARNINGS

  • Pre-existing autopilot lint from batch 4 (C5) still open.
  • Pre-existing intermittent flake mission_executor::state_machine::ac3_bounded_retry_then_success (carried from batch 8) now fails reproducibly under workspace load on this dev box but still passes in isolation. The root cause is a 5 ms polling-interval race in the test, not in mission_executor production code. Documented as A2 below; it is unchanged by this batch and unrelated to telemetry_stream.

1. Scope

| Ticket | Title | Crate | Complexity |
|---|---|---|---|
| AZ-675 | telemetry_stream Tonic gRPC server + per-client lossy queue | telemetry_stream | 3 |

Batch 14 is a single-ticket batch by deliberate choice. AZ-675 and AZ-658 were the only two unblocked tasks; AZ-658 has an open architectural decision (which H.264 binding to use) and was held back. Picking AZ-675 also unblocks AZ-676 / AZ-677 / AZ-678 / AZ-679 (the full telemetry → operator-bridge frontier) for subsequent batches.

2. Approach

Tonic infrastructure decision

telemetry_stream/description.md §9 lists the operator-link protocol (WebRTC / WebSocket-H.264 / gRPC server-streaming) as an open architectural question. AZ-675's task spec, however, names Tonic gRPC explicitly and the Runtime Completeness gate says "Production code that must exist: real gRPC server". The user picked path A: commit to Tonic now, which:

  • Pins the operator-link transport to gRPC server-streaming (closes architecture Q2 in the affirmative for the gRPC option).
  • Adds first-time tonic / prost / tonic-build infrastructure to the workspace. The detection_client/Cargo.toml comment on line 16 anticipated this; the next ticket to need it (AZ-660) can now reuse the same workspace pins.
  • Uses protoc-bin-vendored as a build-dependency so neither dev machines nor CI need a system protoc install. The build is hermetic and reproducible across platforms.

Workspace pins added: tonic = "0.14", tonic-prost = "0.14", prost = "0.14", prost-types = "0.14", tonic-prost-build = "0.14" (build-dep), protoc-bin-vendored = "3" (build-dep), tokio-stream = "0.1" with sync,net features (needed for BroadcastStream + TcpListenerStream), parking_lot = "0.12".
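
For orientation, the pins listed above could be laid out roughly as follows. This is only a sketch: it assumes the pins live in a [workspace.dependencies] table of the root Cargo.toml and that member crates opt in with `workspace = true`; the report does not show the actual file.

```toml
# Root Cargo.toml (assumed layout; versions copied from the list above)
[workspace.dependencies]
tonic = "0.14"
tonic-prost = "0.14"
prost = "0.14"
prost-types = "0.14"
tonic-prost-build = "0.14"      # consumed as a build-dependency
protoc-bin-vendored = "3"       # consumed as a build-dependency
tokio-stream = { version = "0.1", features = ["sync", "net"] }
parking_lot = "0.12"
```

With this layout a member crate such as crates/telemetry_stream/Cargo.toml would reference each entry as, for example, `tonic = { workspace = true }`, listing the two build-time crates under [build-dependencies].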

Back-pressure model — broadcast-direct, no intermediate buffer

The first draft of internal/server.rs used a per-client mpsc forwarder between the broadcast queue and the tonic stream. That hid the back-pressure: the forwarder blocked on mpsc::send long before the broadcast ring ever overflowed, so RecvError::Lagged(n) never fired and drop counters stayed at zero. Lesson: in a multi-stage queue where the outer stage is supposed to enforce drop-oldest, do not introduce a buffering middle stage that absorbs the back-pressure invisibly.

The shipped design feeds the broadcast receivers directly into the tonic-streamed response (via tokio_stream::wrappers::BroadcastStream merged through tokio_stream::StreamMap). When the wire or the client is slow, tonic stops polling our stream → the broadcast ring overruns that client's cursor → the next poll yields Err(BroadcastStreamRecvError::Lagged(n)) → the drop counter for that (client_id, topic) is incremented. Other clients are unaffected.
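
The drop-oldest behaviour described above can be modelled without any async runtime. The sketch below is a std-only stand-in, not the tokio implementation: Ring, Recv, drain, DropCounters and demo are hypothetical names, and the ring only imitates the observable behaviour of a bounded broadcast channel (one shared buffer, one cursor per receiver, a Lagged(n) signal when a cursor falls behind the oldest retained item).

```rust
use std::collections::{HashMap, VecDeque};

/// Result of one receive attempt (hypothetical; shaped after the Lagged(n) idea).
#[derive(Debug, PartialEq)]
enum Recv<T> {
    Item(T),
    Lagged(u64),
    Empty,
}

/// Bounded ring shared by all receivers; each receiver owns only a cursor.
struct Ring<T> {
    cap: usize,
    buf: VecDeque<T>,
    head_seq: u64, // sequence number of buf[0]
}

impl<T: Clone> Ring<T> {
    fn new(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be non-zero");
        Ring { cap, buf: VecDeque::with_capacity(cap), head_seq: 0 }
    }

    /// Never blocks: when the ring is full the oldest item is discarded.
    fn publish(&mut self, value: T) {
        if self.buf.len() == self.cap {
            self.buf.pop_front();
            self.head_seq += 1;
        }
        self.buf.push_back(value);
    }

    /// A cursor behind head_seq has been overrun: report how many items
    /// were missed and jump the cursor to the oldest retained item.
    fn recv(&self, cursor: &mut u64) -> Recv<T> {
        if *cursor < self.head_seq {
            let missed = self.head_seq - *cursor;
            *cursor = self.head_seq;
            return Recv::Lagged(missed);
        }
        match self.buf.get((*cursor - self.head_seq) as usize) {
            Some(v) => {
                *cursor += 1;
                Recv::Item(v.clone())
            }
            None => Recv::Empty,
        }
    }
}

/// Drop counters keyed by (client_id, topic).
type DropCounters = HashMap<(String, String), u64>;

/// Reads everything currently available, turning Lagged(n) into a counter update.
fn drain<T: Clone>(
    ring: &Ring<T>,
    cursor: &mut u64,
    client: &str,
    topic: &str,
    drops: &mut DropCounters,
) -> Vec<T> {
    let mut out = Vec::new();
    loop {
        match ring.recv(cursor) {
            Recv::Item(v) => out.push(v),
            Recv::Lagged(n) => {
                *drops.entry((client.to_string(), topic.to_string())).or_insert(0) += n
            }
            Recv::Empty => return out,
        }
    }
}

/// One fast and one slow client on the same topic, ring capacity 4, 10 messages.
fn demo() -> (Vec<u32>, Vec<u32>, DropCounters) {
    let mut ring = Ring::new(4);
    let (mut fast, mut slow) = (0u64, 0u64);
    let mut drops = DropCounters::new();
    let mut fast_seen = Vec::new();
    for i in 0..10u32 {
        ring.publish(i);
        fast_seen.extend(drain(&ring, &mut fast, "fast", "GimbalState", &mut drops));
    }
    let slow_seen = drain(&ring, &mut slow, "slow", "GimbalState", &mut drops);
    (fast_seen, slow_seen, drops)
}
```

In this model the fast client sees all ten messages, while the slow client is told it missed six and then receives the four newest. Because the publisher never waits on a receiver, there is no place for back-pressure to hide, which is the property the broadcast-direct design relies on.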

What this batch ships in production

  • proto/telemetry.proto: TelemetryStream service with a single server-streaming Subscribe(SubscribeRequest) -> stream TelemetryMessage RPC and five topics (TelemetrySample, GimbalState, DetectionEvent, MovementCandidate, MapObjectsBundle). Payloads are carried as opaque JSON in bytes payload_json so the canonical Rust models in crates/shared/models/ stay authoritative.
  • build.rs — wires protoc-bin-vendored into tonic-prost-build so codegen runs from cargo build alone.
  • internal/publisher.rs: TelemetryPublisher with one tokio::sync::broadcast channel per topic, per-(client, topic) drop counters under parking_lot::Mutex, and atomic subscribed_clients / published_total / bytes_out_per_topic counters.
  • internal/server.rs: TelemetryService implementing proto::telemetry_stream_server::TelemetryStream::subscribe; validates that client_id is non-empty; resolves the topic list (empty = subscribe-all); merges per-topic BroadcastStreams with StreamMap; converts Lagged into drop-counter updates; StreamGuard decrements subscribed_clients on stream drop.
  • src/lib.rs — rewritten public surface:
    • TelemetryStreamConfig { listen_addr, topic_capacity, downlink_capacity }.
    • TelemetryStream::spawn_grpc_server (binds an addr) and spawn_grpc_server_on(listener) (binds a pre-resolved std::net::TcpListener — used by tests to pick ephemeral ports).
    • GrpcShutdown RAII handle.
    • TelemetryStreamHandle::publish<T>(topic, &T) non-blocking publish API.
    • TelemetryStreamHandle::snapshot() for health-aggregator integration.
    • TelemetryStreamHandle::health() flips yellow when any (client, topic) has ≥ 100 drops.
    • TelemetrySink::push_detections is now real (publishes on DetectionEvent topic). push_frame still returns NotImplemented(AZ-676) because video carries different framing semantics that AZ-676 will pin.
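
Two of the rules listed above (request validation with the subscribe-all default, and the health threshold) are simple enough to sketch in isolation. The following is a minimal std-only illustration under stated assumptions: resolve_subscription, health, Health::Green, ALL_TOPICS and YELLOW_DROP_THRESHOLD are hypothetical names, the error is a plain String rather than whatever gRPC status the real service returns, and the Topic enum simply mirrors the five topic names from the proto.

```rust
use std::collections::HashMap;

/// The five topics named in the report.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Topic {
    TelemetrySample,
    GimbalState,
    DetectionEvent,
    MovementCandidate,
    MapObjectsBundle,
}

const ALL_TOPICS: [Topic; 5] = [
    Topic::TelemetrySample,
    Topic::GimbalState,
    Topic::DetectionEvent,
    Topic::MovementCandidate,
    Topic::MapObjectsBundle,
];

/// Health flips to yellow once any (client, topic) pair reaches this many drops.
const YELLOW_DROP_THRESHOLD: u64 = 100;

/// Only "yellow" is named in the report; Green is an assumed label for the healthy state.
#[derive(Debug, PartialEq, Eq)]
enum Health {
    Green,
    Yellow,
}

/// Rejects an empty client_id and treats an empty topic list as subscribe-all.
fn resolve_subscription(client_id: &str, requested: &[Topic]) -> Result<Vec<Topic>, String> {
    if client_id.is_empty() {
        return Err("client_id must not be empty".to_string());
    }
    if requested.is_empty() {
        return Ok(ALL_TOPICS.to_vec());
    }
    Ok(requested.to_vec())
}

/// Yellow when any single (client, topic) counter has reached the threshold.
fn health(drops: &HashMap<(String, Topic), u64>) -> Health {
    if drops.values().any(|&n| n >= YELLOW_DROP_THRESHOLD) {
        Health::Yellow
    } else {
        Health::Green
    }
}
```

Keeping the threshold per (client, topic) rather than summed across clients means one persistently slow subscriber is enough to surface in health, which matches the per-pair drop counters the publisher maintains.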

3. Files touched

  • Cargo.toml — workspace pins for tonic stack + tokio-stream features + parking_lot.
  • Cargo.lock — regenerated for the new deps.
  • crates/telemetry_stream/Cargo.toml — concrete deps + build.rs declaration.
  • crates/telemetry_stream/build.rs — new (vendored protoc + tonic-prost-build).
  • crates/telemetry_stream/proto/telemetry.proto — new.
  • crates/telemetry_stream/src/lib.rs — rewrite (public surface).
  • crates/telemetry_stream/src/internal/mod.rs — new.
  • crates/telemetry_stream/src/internal/proto.rs — new (tonic::include_proto! hook).
  • crates/telemetry_stream/src/internal/publisher.rs — new (with 4 unit tests).
  • crates/telemetry_stream/src/internal/server.rs — new (gRPC service impl).
  • crates/telemetry_stream/tests/grpc_subscribe.rs — new (5 integration tests covering AC-1..AC-3 + edge cases).
  • _docs/02_tasks/done/AZ-675_telemetry_stream_grpc_server.md — moved from todo/.
  • _docs/_autodev_state.md — phase update.
  • _docs/03_implementation/batch_14_cycle1_report.md — this report.

4. Test results

| Crate | Unit | Integration | Total |
|---|---|---|---|
| telemetry_stream | 6 | 5 | 11 |

Clippy: cargo clippy -p telemetry_stream --all-targets -- -D warnings is clean.

Workspace cargo test --workspace: all suites green except the pre-existing mission_executor::state_machine::ac3_bounded_retry_then_success flake — see A2.

Acceptance criteria

| AC | Test | Status |
|---|---|---|
| AC-1 multiple subscribers receive same stream (ordering preserved) | tests/grpc_subscribe.rs::ac1_multiple_subscribers_receive_same_stream | |
| AC-2 slow subscriber drops oldest, healthy unaffected | tests/grpc_subscribe.rs::ac2_slow_subscriber_drops_oldest_healthy_unaffected + internal/publisher.rs::slow_subscriber_lags_fast_subscriber_does_not | |
| AC-3 disconnect cleanly removes subscriber | tests/grpc_subscribe.rs::ac3_disconnect_decrements_subscribed_clients | |
| Empty topics defaults to ALL | tests/grpc_subscribe.rs::empty_topics_list_defaults_to_all | |
| Empty client_id rejected at boundary | tests/grpc_subscribe.rs::empty_client_id_is_rejected | |
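
The mechanism that AC-3 exercises (subscribed_clients going back down when a stream is dropped) is an RAII guard. The sketch below shows the general pattern using only std; the field name, the register constructor and the choice of AtomicUsize are assumptions, since the report only states that a StreamGuard decrements subscribed_clients on stream drop.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Held for the lifetime of one response stream (hypothetical shape).
struct StreamGuard {
    subscribed_clients: Arc<AtomicUsize>,
}

impl StreamGuard {
    /// Counts the client in when the stream is created.
    fn register(subscribed_clients: Arc<AtomicUsize>) -> Self {
        subscribed_clients.fetch_add(1, Ordering::SeqCst);
        StreamGuard { subscribed_clients }
    }
}

impl Drop for StreamGuard {
    /// Runs however the stream ends: clean disconnect, error, or cancellation.
    fn drop(&mut self) {
        self.subscribed_clients.fetch_sub(1, Ordering::SeqCst);
    }
}
```

Tying the decrement to Drop rather than to an explicit unsubscribe call means no exit path can forget it, so a test only has to drop the client stream and then observe the counter.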

5. Findings (this batch)

A1. Pre-existing dead-code error in autopilot::Runtime::vlm_provider_name

Severity: High (still blocks workspace -D warnings clippy gate)
Status: OPEN, carried since batch 4. Not introduced by this batch. Tracked in _docs/_process_leftovers/2026-05-20_autopilot_clippy.md and cumulative finding C5.

A2. Pre-existing ac3_bounded_retry_then_success flake escalation

Severity: Medium (Test design)
Category: Tests
Origin: Batch 8 mission_executor; behaviour unchanged by this batch.

The test polls handle.state() every 5 ms while waiting for MissionUploaded. With tick_interval=5ms and a scripted driver that rejects once and then accepts, the FSM can pass through MissionUploaded between two polls, so the wait never observes that state and reports the FSM as stuck at WaitAuto. The failure is confirmed pre-existing: it still reproduces intermittently with the batch-14 changes stashed (git stash), and the test passes in isolation. The proximate cause is the test's polling design, not mission_executor production code.

This batch's new transitive deps (tonic/prost stack) increase background compile and runtime load on dev boxes, which may make the test more likely to lose the race. The fix is a small, focused test refactor (latch on FSM transition events instead of polling), filed as a leftover.

→ Filed _docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md.
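
The proposed refactor (latch on transition events rather than sampling state) can be illustrated with a toy FSM. This is a sketch only: it uses std threads and std::sync::mpsc as stand-ins for whatever the mission_executor tests actually use, run_fsm, wait_for and observed_mission_uploaded are hypothetical helpers, and State::Armed is an invented follow-on state; only WaitAuto and MissionUploaded come from the report.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    WaitAuto,
    MissionUploaded,
    Armed, // hypothetical state that follows MissionUploaded
}

/// Toy FSM: moves through its states back to back and records every transition.
fn run_fsm(events: mpsc::Sender<State>) {
    for state in [State::WaitAuto, State::MissionUploaded, State::Armed] {
        // A send error only means the observer has gone away.
        let _ = events.send(state);
    }
}

/// Waits until `target` appears in the transition log or the deadline passes.
/// Transitions are queued, so a state that lasted only microseconds is still seen.
fn wait_for(events: &mpsc::Receiver<State>, target: State, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match events.recv_timeout(remaining) {
            Ok(state) if state == target => return true,
            Ok(_) => continue,
            Err(_) => return false, // timed out or the FSM dropped its sender
        }
    }
}

/// The FSM has already left MissionUploaded by the time the observer looks,
/// which is exactly the case a polling observer would miss.
fn observed_mission_uploaded() -> bool {
    let (tx, rx) = mpsc::channel();
    let fsm = thread::spawn(move || run_fsm(tx));
    fsm.join().unwrap();
    wait_for(&rx, State::MissionUploaded, Duration::from_secs(1))
}
```

The key difference from polling is that the observer consumes a history instead of a snapshot, so the outcome no longer depends on how the poll cadence lines up with the tick interval.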

A3. Operator-link protocol decision not yet recorded in the architecture docs

Severity: Low (Architecture / doc sync)
Detail: telemetry_stream/description.md §9 listed the protocol as TBD. With AZ-675 shipping a Tonic-based gRPC server, this is effectively decided in favour of gRPC server-streaming. The architecture doc was not edited in this batch (out of scope; see C3 doc-sync sweep). When the doc sweep runs, this should move from "open question" to a recorded decision in _docs/02_document/decision-rationale.md.

6. Cumulative findings — open carry-over

Batch 14 is mid-triplet (13 / 14 / 15); cumulative review lands at the end of batch 15.

| ID | Severity | Category | Status |
|---|---|---|---|
| C1 | Medium | Maintainability | OPEN: duplicated SendCommandError mapping in gimbal_controller (batches 9-10) |
| C2 | Low | Style | OPEN: MavlinkCommandIssuer naming inconsistency (batch 9) |
| C3 | Low | Architecture | OPEN: module-layout.md drift; grows by telemetry_stream/internal/{publisher,server,proto}.rs this batch |
| C4 | Low | Architecture | OPEN: data_model.md §PanPlan definition still missing (batch 11) |
| C5 | High | Maintenance | OPEN: pre-existing autopilot/runtime.rs::vlm_provider_name dead-code error blocking workspace -D warnings clippy |
| C6 (new) | Medium | Tests | OPEN: ac3_bounded_retry_then_success polling race (see A2) |
| C7 (new) | Low | Architecture | OPEN: record Tonic-gRPC decision in decision-rationale.md (see A3) |

7. Next-batch candidates

  • AZ-678 — operator_bridge command authentication. Depends on AZ-675 (now done). 5 pts.
  • AZ-679 — operator_bridge POI surface. Depends on AZ-675 (now done) + uses the AZ-683 POI queue + AZ-685 decline path. 3 pts. Cleanly buildable as a Subscribe-style or push consumer of MapObjectsBundle / POI topics through the AZ-675 server.
  • AZ-676 — telemetry_stream video path. Depends on AZ-675 (now done) + AZ-657 (already done). 3 pts. Self-contained extension to the AZ-675 server.
  • AZ-677 — telemetry_stream MapObjects snapshot. Depends on AZ-675 (now done) + AZ-667 (not done — blocked).
  • AZ-658 — frame_ingest decoder. Still needs the H.264 binding decision (retina vs ffmpeg-rs vs gstreamer). 5 pts.