Oleksandr Bezdieniezhnykh ccf929af69 [AZ-676] [AZ-677] [AZ-678] [AZ-679] telemetry+operator foundation
Batch 15 ships the four foundation tickets sitting on top of AZ-675
(gRPC server) and AZ-667 (mapobjects_store hydrate):

* AZ-676: telemetry_stream video path (rtsp_forward + bytes_inline)
  with ai_locked atomic + session counter, SubscribeVideo RPC.
* AZ-677: MapObjects snapshot-on-subscribe + diff broadcast +
  reconnect-resync (StartThen stream-prepend pattern).
* AZ-678: HmacOperatorValidator with per-session monotonic seq,
  in-process session registry + TTL, constant-time HMAC compare,
  rejection-reason counters, sliding 60 s sig-failure red-health gate.
  Trait OperatorCommandValidator in shared::contracts::operator_auth.
* AZ-679: PoiSurfaceMapper produces OperatorPoiEvent per architecture
  §7.10; PoiDequeued events on rotate/age-out/complete; pushed via
  new TelemetrySink::push_operator_event extension on Topic::OperatorEvent.

Cross-task wiring: TelemetrySink trait extended with
push_operator_event; OperatorBridge gets optional builder methods
with_telemetry_sink / with_validator (composition root wires in
AZ-680). Workspace deps: hmac = "0.12"; per-crate adds bytes,
serde_json, parking_lot, chrono, uuid, sha2, thiserror.

Tests: 14/14 ACs verified locally (3 + 3 + 5 + 3 by AC) plus
6 supporting unit tests + 7 integration tests + 2 shared serde
roundtrips. cargo clippy clean on touched crates. Cumulative
review for batches 13-15 produced; verdict PASS_WITH_WARNINGS
(0 Critical, 0 High, 1 Medium, 4 Low — all carry-overs or
deferred-producer notes for AZ-680/AZ-684).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 16:18:40 +03:00


Batch 15 / Cycle 1 — Implementation Report

Date: 2026-05-20
Tasks: AZ-676, AZ-677, AZ-678, AZ-679
Verdict: PASS_WITH_WARNINGS

  • Pre-existing autopilot dead-code warning still open (C5; not touched by this batch).
  • Pre-existing mission_executor::state_machine::ac3_bounded_retry_then_success flake still intermittent under workspace test load (C6; not touched by this batch).
  • New optional surface in OperatorBridge (telemetry sink and validator wiring) is opt-in via the with_telemetry_sink / with_validator builder methods; the composition root in crates/autopilot will wire them in a future ticket (AZ-680 dispatch).

1. Scope

| Ticket | Title | Crate | Complexity |
| --- | --- | --- | --- |
| AZ-676 | telemetry_stream video path (rtsp_forward + bytes_inline) + ai_locked | telemetry_stream | 3 |
| AZ-677 | telemetry_stream MapObjects snapshot + diffs + reconnect resync | telemetry_stream | 3 |
| AZ-678 | operator_bridge command authentication (HMAC, replay, session) | operator_bridge | 5 |
| AZ-679 | operator_bridge POI surface mapper + dequeue + deadline carriage | operator_bridge | 3 |

Batch chosen explicitly for Telemetry+Operator foundation cohesion — all four tickets sit on top of AZ-675 (gRPC server, shipped in batch 14) and AZ-667 (mapobjects_store hydrate, prior). AZ-676 closes the video transport question for the operator side; AZ-677 closes the MapObjects-bundle transport pattern; AZ-678 lays down the authentication invariants every command will cross; AZ-679 produces the wire-format POI events the GS UI consumes. Subsequent operator-side work (AZ-680 dispatch, AZ-681 safety/BIT ACK, AZ-684 VLM label) plugs into these four contracts.

AZ-658 (frame_ingest decoder, 5 pts) and AZ-668 (scan_controller queue) remained unblocked but were deliberately deferred: AZ-658 has an open H.264-binding decision the team hasn't committed to (retina vs ffmpeg-rs vs gstreamer; cf. cumulative C7-adjacent risk), and AZ-668 is better picked up as part of the next scan_controller batch where its consumer surface lands.

2. Approach

AZ-676 — Video path

Two delivery modes named in the task spec map to a VideoPath enum (RtspForward { url } / BytesInline { … }) on the runtime, and to a single SubscribeVideo RPC on the wire. The session-start contract was promoted into its own proto message (VideoSessionStart) so the client can branch on oneof without re-reading config.

ai_locked coordination is a single Arc<AtomicBool> owned by the VideoPublisher; session register / deregister flips it under a counter so concurrent subscribers don't toggle it back and forth. Consumers (frame_ingest AZ-657 already done; detection_client AZ-660) read the flag via TelemetryStreamHandle::ai_locked_handle() — no cross-crate observer registration, just a shared atomic.
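A minimal sketch of the register/deregister bookkeeping, assuming a std-only design; VideoSessions and its methods are hypothetical names, not the crate's API. It guards the counter and the flag flip with one Mutex: with two independent atomics, a deregister that takes the count to zero could store false after a racing register has already stored true.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the VideoPublisher session bookkeeping:
/// ai_locked is true while at least one video session is open.
pub struct VideoSessions {
    ai_locked: Arc<AtomicBool>,
    // One lock covers "update the count" and "flip the flag" together.
    active: Mutex<usize>,
}

impl VideoSessions {
    pub fn new() -> Self {
        Self {
            ai_locked: Arc::new(AtomicBool::new(false)),
            active: Mutex::new(0),
        }
    }

    /// Shared handle that readers poll; no observer registration needed.
    pub fn ai_locked_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.ai_locked)
    }

    pub fn register(&self) {
        let mut active = self.active.lock().unwrap();
        *active += 1;
        if *active == 1 {
            // First session: lock the AI path.
            self.ai_locked.store(true, Ordering::Release);
        }
    }

    pub fn deregister(&self) {
        let mut active = self.active.lock().unwrap();
        // saturating_sub tolerates an unmatched deregister.
        *active = active.saturating_sub(1);
        if *active == 0 {
            // Last session gone: release the AI path.
            self.ai_locked.store(false, Ordering::Release);
        }
    }
}
```

Readers only ever load the flag (Acquire), so concurrent subscribers see one transition to true on the first session and one back to false on the last.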

The bytes_inline path uses the same tokio::sync::broadcast machinery as the telemetry topics (lossy ring buffer, per-client drop counters). On the rtsp_forward path push_frame is a no-op: frame_ingest keeps calling it without branching on configuration, and the publisher decides.
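The no-op-versus-fan-out split can be sketched with std types only. This is an assumption-laden stand-in: a bounded std::sync::mpsc::sync_channel per client replaces the tokio::sync::broadcast ring (broadcast overwrites the oldest values and reports lag to slow receivers, rather than dropping at the sender), and the BytesInline variant carries no fields here because the report elides them.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical mirror of the two delivery modes; BytesInline's real
/// fields are elided in the report, so this sketch gives it none.
pub enum VideoPath {
    RtspForward { url: String },
    BytesInline,
}

struct Client {
    tx: SyncSender<Vec<u8>>,
    dropped: u64, // per-client drop counter
}

pub struct VideoPublisher {
    path: VideoPath,
    clients: Vec<Client>,
    frames_published: u64,
}

impl VideoPublisher {
    pub fn new(path: VideoPath) -> Self {
        Self { path, clients: Vec::new(), frames_published: 0 }
    }

    /// Registers a client with a bounded queue of `capacity` frames.
    pub fn subscribe(&mut self, capacity: usize) -> Receiver<Vec<u8>> {
        let (tx, rx) = sync_channel(capacity);
        self.clients.push(Client { tx, dropped: 0 });
        rx
    }

    /// frame_ingest calls this unconditionally; the publisher decides.
    pub fn push_frame(&mut self, frame: &[u8]) {
        if matches!(self.path, VideoPath::RtspForward { .. }) {
            return; // clients pull the RTSP URL directly; nothing to forward
        }
        self.frames_published += 1;
        // Lossy fan-out: a full queue counts a drop; a closed one is pruned.
        self.clients.retain_mut(|c| match c.tx.try_send(frame.to_vec()) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => {
                c.dropped += 1;
                true
            }
            Err(TrySendError::Disconnected(_)) => false,
        });
    }

    pub fn frames_published(&self) -> u64 {
        self.frames_published
    }

    pub fn dropped(&self, client: usize) -> u64 {
        self.clients[client].dropped
    }
}
```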

AZ-677 — MapObjects snapshot + diff

The contract added is MapObjectsSnapshotSource (a trait telemetry_stream calls into; the production implementation will be mapobjects_store::Store via a thin adapter, which is not yet wired, so an EmptyMapObjectsSource fixture stands in for now). The wire format is a tagged enum MapObjectsTopicMessage::{ Snapshot, Diff } so the operator UI can branch deterministically.

Snapshot-on-subscribe is implemented via a StartThen stream combinator inside the gRPC subscribe handler: when the requested topic list includes MapObjectsBundle, we synchronously call current_snapshot_message() and prepend it to the broadcast stream. Reconnect therefore Just Works — a new subscribe is a new snapshot, no replay state to manage.

Diff fan-out uses the existing publisher: TelemetryStreamHandle::push_mapobjects_diff(diff) serialises and publishes on Topic::MapObjectsBundle. The wire enum tag (kind: snapshot | diff) keeps both message types on the same topic.
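The snapshot-then-diff ordering can be modelled synchronously, assuming an iterator stands in for the async gRPC stream: std::iter::once(snapshot).chain(diffs) plays the role of the StartThen combinator. The MapObjectsTopicMessage fields and the helper names start_then / subscribe_mapobjects are hypothetical. Each call builds a fresh snapshot, which is why reconnect needs no replay state.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical wire enum; the real one is tagged with kind: snapshot | diff.
#[derive(Debug, Clone, PartialEq)]
pub enum MapObjectsTopicMessage {
    Snapshot { object_ids: Vec<u64> },
    Diff { upserted: Vec<u64>, removed: Vec<u64> },
}

/// Sync stand-in for StartThen: yield `first`, then everything from `rest`.
pub fn start_then<T>(first: T, rest: impl Iterator<Item = T>) -> impl Iterator<Item = T> {
    std::iter::once(first).chain(rest)
}

/// Each subscribe takes a fresh snapshot, so reconnect needs no replay state.
/// The caller obtains `diffs` before the snapshot is read.
pub fn subscribe_mapobjects(
    current_snapshot: impl FnOnce() -> MapObjectsTopicMessage,
    diffs: Receiver<MapObjectsTopicMessage>,
) -> impl Iterator<Item = MapObjectsTopicMessage> {
    start_then(current_snapshot(), diffs.into_iter())
}
```

In an async server the same shape is typically expressed with futures::stream::once combined with StreamExt::chain. Subscribing to the diff channel before reading the snapshot avoids a window in which a diff could be missed.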

AZ-678 — Command authentication

The contract OperatorCommandValidator + types (SignedCommand, ValidatedCommand, AuthError) lives in shared::contracts::operator_auth so dispatch callsites (scan_controller, mission_executor) can depend on the trait without importing operator_bridge — a layering invariant the architecture deliberately preserves.
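To illustrate the layering, here is a hypothetical shape for the shared contract. Only the type names SignedCommand, ValidatedCommand, AuthError, OperatorCommandValidator and the AuthError::SignatureInvalid variant come from this report; every field, the other variants, and the dispatch helper are assumptions. A dispatch call site depends on the trait object alone, never on operator_bridge.

```rust
use std::fmt;

/// Hypothetical fields; only the type names come from the report.
#[derive(Debug, Clone)]
pub struct SignedCommand {
    pub session_token: String,
    pub seq: u64,
    pub payload: String, // canonical JSON text
    pub signature: Vec<u8>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ValidatedCommand {
    pub session_token: String,
    pub seq: u64,
    pub payload: String,
}

#[derive(Debug, Clone, PartialEq)]
pub enum AuthError {
    UnknownSession,
    SessionExpired,
    SignatureInvalid,
    Replay,
}

impl fmt::Display for AuthError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "operator command rejected: {:?}", self)
    }
}

impl std::error::Error for AuthError {}

/// Lives in shared so dispatch crates never import operator_bridge.
pub trait OperatorCommandValidator: Send + Sync {
    fn validate(&self, cmd: SignedCommand) -> Result<ValidatedCommand, AuthError>;
}

/// Hypothetical dispatch call site: depends on the trait object only.
pub fn dispatch(
    validator: &dyn OperatorCommandValidator,
    cmd: SignedCommand,
) -> Result<u64, AuthError> {
    let accepted = validator.validate(cmd)?;
    Ok(accepted.seq)
}
```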

The default implementation HmacOperatorValidator (operator_bridge::internal::auth) is intentionally narrow:

  • HMAC-SHA256 over (session_token || '|' || seq_be || '|' || canonical_payload_json). The separator bytes help keep the boundaries between the three fields unambiguous (HMAC itself is not susceptible to length extension); canonical JSON is serde_json::to_vec of the serde_json::Value (deterministic for the operator's signing side).
  • Constant-time compare via hmac::Mac::verify_slice (no timing oracle, per NFR-Security).
  • Per-session replay tracker — last_seen_seq: Option<u64> advances on Ok, never on rejection. Rejecting seq=N does not poison the session: a legitimate retry can still land with N+1. This was the subtlety that drove the explicit AC-2 + AC-3 tests.
  • The session registry is an in-process HashMap<token, SessionEntry> keyed by an opaque token. register_session(token, secret) is called from the (out-of-scope) Ground Station handshake; revoke and TTL expiry (default 30 min) are first-class.
  • Rejection counters under a fixed-shape AuthCounters array (one slot per REJECTION_REASONS), exposed to the health surface.
  • Health-red gate: sliding-window VecDeque of signature-failure timestamps over the trailing 60 s; once ≥ signature_failure_red_threshold (default 30/min) the health surface goes red. Pruning is amortised O(1) on every record + every health probe.
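The replay rule above (advance on Ok, never on rejection) plus TTL expiry can be sketched as follows. This is a std-only illustration under assumptions: SessionRegistry, Rejection, and validate are hypothetical names, and the HMAC check is injected as a closure over the session secret (the real code uses hmac::Mac::verify_slice).

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Rejection {
    UnknownSession,
    Expired,
    SignatureInvalid,
    Replay,
}

struct SessionEntry {
    secret: Vec<u8>,
    expires_at: Instant,
    last_seen_seq: Option<u64>,
}

/// Hypothetical in-process registry keyed by an opaque session token.
pub struct SessionRegistry {
    ttl: Duration,
    sessions: HashMap<String, SessionEntry>,
}

impl SessionRegistry {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, sessions: HashMap::new() }
    }

    pub fn register_session(&mut self, token: &str, secret: &[u8], now: Instant) {
        let entry = SessionEntry {
            secret: secret.to_vec(),
            expires_at: now + self.ttl,
            last_seen_seq: None,
        };
        self.sessions.insert(token.to_string(), entry);
    }

    pub fn revoke(&mut self, token: &str) {
        self.sessions.remove(token);
    }

    /// `verify_sig` stands in for the constant-time HMAC check over the
    /// session secret. last_seen_seq only moves on the Ok path.
    pub fn validate(
        &mut self,
        token: &str,
        seq: u64,
        now: Instant,
        verify_sig: impl FnOnce(&[u8]) -> bool,
    ) -> Result<(), Rejection> {
        let entry = self.sessions.get_mut(token).ok_or(Rejection::UnknownSession)?;
        if now >= entry.expires_at {
            return Err(Rejection::Expired);
        }
        if !verify_sig(&entry.secret) {
            return Err(Rejection::SignatureInvalid); // seq untouched
        }
        if matches!(entry.last_seen_seq, Some(last) if seq <= last) {
            return Err(Rejection::Replay); // seq untouched
        }
        entry.last_seen_seq = Some(seq);
        Ok(())
    }
}
```

Because a failed signature returns before last_seen_seq is touched, the next sequence number the legitimate operator sends still lands, which is the AC-2/AC-3 behaviour described above.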

AZ-679 — POI surface

The wire shape is the canonical model shared::models::operator_event::OperatorPoiEvent (matches architecture.md §7.10). PoiSurfaceMapper::map(&poi, photo_metadata) is a pure transform; surface(&poi, photo_metadata) is map + push through the TelemetrySink::push_operator_event extension. emit_dequeued(poi_id, reason) produces a PoiDequeued event. Both flow over a new Topic::OperatorEvent channel; the wire payload is a tagged enum (OperatorEvent::{ PoiSurfaced, PoiDequeued } with serde tag kind).
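A simplified model of the mapper/sink split, assuming trimmed-down types: the real OperatorPoiEvent carries the full §7.10 field set and map/surface also take photo_metadata, while this sketch keeps only a POI id and vlm_label. The DequeueReason variant names, the Result<(), String> error type, passing the sink as a parameter, and RecordingSink are all hypothetical.

```rust
use std::sync::Mutex;

/// Trimmed-down event; the real one carries the full §7.10 field set.
#[derive(Debug, Clone, PartialEq)]
pub struct OperatorPoiEvent {
    pub poi_id: u64,
    pub vlm_label: Option<String>,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DequeueReason {
    Rotated,
    AgedOut,
    Completed,
}

/// One topic, two variants (the real enum is serde-tagged with `kind`).
#[derive(Debug, Clone, PartialEq)]
pub enum OperatorEvent {
    PoiSurfaced(OperatorPoiEvent),
    PoiDequeued { poi_id: u64, reason: DequeueReason },
}

pub trait TelemetrySink: Send + Sync {
    fn push_operator_event(&self, event: OperatorEvent) -> Result<(), String>;
}

pub struct Poi {
    pub id: u64,
}

pub struct PoiSurfaceMapper;

impl PoiSurfaceMapper {
    /// Pure transform; vlm_label stays None until a producer exists.
    pub fn map(poi: &Poi) -> OperatorPoiEvent {
        OperatorPoiEvent { poi_id: poi.id, vlm_label: None }
    }

    /// map + push through the sink.
    pub fn surface(poi: &Poi, sink: &dyn TelemetrySink) -> Result<(), String> {
        sink.push_operator_event(OperatorEvent::PoiSurfaced(Self::map(poi)))
    }

    pub fn emit_dequeued(
        poi_id: u64,
        reason: DequeueReason,
        sink: &dyn TelemetrySink,
    ) -> Result<(), String> {
        sink.push_operator_event(OperatorEvent::PoiDequeued { poi_id, reason })
    }
}

/// Test double that records every pushed event.
pub struct RecordingSink(pub Mutex<Vec<OperatorEvent>>);

impl TelemetrySink for RecordingSink {
    fn push_operator_event(&self, event: OperatorEvent) -> Result<(), String> {
        self.0.lock().map_err(|e| e.to_string())?.push(event);
        Ok(())
    }
}
```

Keeping map pure means the field-population ACs can be tested without any sink, while surface and emit_dequeued are exercised through a recording double.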

vlm_label is intentionally None for now — the Poi model carries vlm_status (the pipeline status) but not the assistant-label string. The label will be threaded through in AZ-684 when scan_controller's VLM assessment ladder lands; the wire field is already in place so the operator UI can render it without a future schema change.

PoiSurfaceMetrics exposes pois_surfaced_per_min (sliding 60 s window) + cumulative totals. Health is green by default and goes red only when the validator's signature-failure window crosses its threshold (AC-5 via AZ-678).
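The trailing 60 s window (used for pois_surfaced_per_min and, per AZ-678, for the signature-failure red gate) can be sketched with a VecDeque of timestamps. SlidingWindow and health_is_red are hypothetical names. Each timestamp is pushed once and popped at most once, which is where the amortised O(1) pruning comes from.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical trailing-window counter (e.g. 60 s).
pub struct SlidingWindow {
    window: Duration,
    events: VecDeque<Instant>,
    total: u64, // cumulative, never pruned
}

impl SlidingWindow {
    pub fn new(window: Duration) -> Self {
        Self { window, events: VecDeque::new(), total: 0 }
    }

    /// Drops timestamps that have aged out of the window. Each timestamp is
    /// popped at most once, so pruning is amortised O(1) per call.
    fn prune(&mut self, now: Instant) {
        while let Some(&oldest) = self.events.front() {
            if now.duration_since(oldest) >= self.window {
                self.events.pop_front();
            } else {
                break;
            }
        }
    }

    pub fn record(&mut self, now: Instant) {
        self.prune(now);
        self.events.push_back(now);
        self.total += 1;
    }

    /// Events inside the trailing window (pruned on every probe too).
    pub fn count(&mut self, now: Instant) -> usize {
        self.prune(now);
        self.events.len()
    }

    pub fn total(&self) -> u64 {
        self.total
    }
}

/// Red once the windowed count reaches the threshold.
pub fn health_is_red(window: &mut SlidingWindow, now: Instant, threshold: usize) -> bool {
    window.count(now) >= threshold
}
```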

Cross-crate wiring

  • TelemetrySink (in shared::contracts) gained push_operator_event(OperatorEvent) -> Result<()>. Only telemetry_stream::TelemetryStreamHandle implements TelemetrySink; production code already constructs the handle in the composition root, so the new method is wired automatically once batch 15 lands.
  • OperatorBridge got two optional builder methods, with_telemetry_sink(Arc<dyn TelemetrySink>) and with_validator(Arc<HmacOperatorValidator>). Existing call sites (tests, partial scaffolding in autopilot/runtime.rs) keep compiling. The composition-root wiring (autopilot/runtime.rs) is left for AZ-680 since dispatch + sink + validator are most naturally bundled.
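The opt-in builder wiring could look like the sketch below. Assumptions: TelemetrySink is reduced to a marker trait, HmacOperatorValidator to a unit struct, and is_wired is a hypothetical helper; only the two builder method names and their Arc parameter types come from the report. Call sites that never use the builders keep compiling because both fields default to None.

```rust
use std::sync::Arc;

/// Marker stand-in; the real trait has push_frame, push_operator_event, etc.
pub trait TelemetrySink: Send + Sync {}

/// Unit-struct stand-in for the real validator.
pub struct HmacOperatorValidator;

#[derive(Default)]
pub struct OperatorBridge {
    telemetry_sink: Option<Arc<dyn TelemetrySink>>,
    validator: Option<Arc<HmacOperatorValidator>>,
}

impl OperatorBridge {
    pub fn new() -> Self {
        Self::default()
    }

    /// Optional: existing call sites that skip this keep compiling.
    pub fn with_telemetry_sink(mut self, sink: Arc<dyn TelemetrySink>) -> Self {
        self.telemetry_sink = Some(sink);
        self
    }

    pub fn with_validator(mut self, validator: Arc<HmacOperatorValidator>) -> Self {
        self.validator = Some(validator);
        self
    }

    /// Hypothetical helper: true once the composition root wired both.
    pub fn is_wired(&self) -> bool {
        self.telemetry_sink.is_some() && self.validator.is_some()
    }
}
```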

3. Files touched

Production

  • Cargo.toml: hmac = "0.12" workspace dep.
  • crates/shared/src/models/operator_event.rs: new. Tier2EvidenceSummary, PhotoMetadata, OperatorPoiEvent, DequeueReason, PoiDequeued, OperatorEvent.
  • crates/shared/src/models/mod.rs: pub mod operator_event;.
  • crates/shared/src/contracts/operator_auth.rs: new. SignedCommand, ValidatedCommand, AuthError, OperatorCommandValidator trait.
  • crates/shared/src/contracts/mod.rs: pub mod operator_auth; + TelemetrySink::push_operator_event.
  • crates/telemetry_stream/Cargo.toml: bytes dep.
  • crates/telemetry_stream/proto/telemetry.proto: Topic::OperatorEvent; SubscribeVideo RPC + supporting messages.
  • crates/telemetry_stream/src/internal/mod.rs: pub mod {mapobjects, video, video_server};.
  • crates/telemetry_stream/src/internal/mapobjects.rs: new. Snapshot + diff types, MapObjectsSnapshotSource trait, EmptyMapObjectsSource fixture.
  • crates/telemetry_stream/src/internal/video.rs: new. VideoPath, VideoFrameMessage, VideoSnapshot, VideoPublisher (with ai_locked atomic + session counter).
  • crates/telemetry_stream/src/internal/video_server.rs: new. SubscribeVideo RPC handler.
  • crates/telemetry_stream/src/internal/publisher.rs: OperatorEvent topic added to ALL_TOPICS; snapshot/diff source + counters wired.
  • crates/telemetry_stream/src/internal/server.rs: gRPC subscribe_video delegate; subscribe snapshot-prepend on MapObjectsBundle.
  • crates/telemetry_stream/src/lib.rs: TelemetryStreamConfig video knobs; VideoPublisher construction; ai_locked_handle; set_mapobjects_snapshot_source; push_mapobjects_diff; video_snapshot; TelemetrySink::push_frame + push_operator_event impls.
  • crates/operator_bridge/Cargo.toml: serde_json, parking_lot, chrono, uuid, hmac, sha2, thiserror.
  • crates/operator_bridge/src/internal/mod.rs: pub mod {auth, poi_surface};.
  • crates/operator_bridge/src/internal/auth.rs: new. HmacValidatorConfig, HmacOperatorValidator, AuthCounters, REJECTION_REASONS, session registry, replay tracker, health-red sliding window.
  • crates/operator_bridge/src/internal/poi_surface.rs: new. PoiSurfaceMapper, PoiSurfaceMetrics, SurfaceRateWindow.
  • crates/operator_bridge/src/lib.rs: with_telemetry_sink, with_validator, surface_poi, surface_poi_with_photo, emit_poi_dequeued, poi_metrics, updated health().

Tests

  • crates/telemetry_stream/tests/video_path.rs: new. 4 integration tests (AC-1, AC-2, AC-3, empty-client guard).
  • crates/telemetry_stream/tests/mapobjects_snapshot.rs: new. 3 integration tests (AC-1, AC-2, AC-3).

Process

  • _docs/02_tasks/done/AZ-676_telemetry_stream_video_path.md: moved from todo/.
  • _docs/02_tasks/done/AZ-677_telemetry_stream_mapobjects_snapshot.md: moved from todo/.
  • _docs/02_tasks/done/AZ-678_operator_bridge_command_auth.md: moved from todo/.
  • _docs/02_tasks/done/AZ-679_operator_bridge_poi_surface.md: moved from todo/.
  • _docs/_autodev_state.md: phase update.
  • _docs/03_implementation/batch_15_cycle1_report.md: this report.
  • _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md: cumulative review (separate file).

4. Test results

| Crate | Unit | Integration | Total |
| --- | --- | --- | --- |
| shared | 9 (+2 new for operator_event serde) | 0 | 9 |
| telemetry_stream | 18 (+6 new for video, +3 new for mapobjects) | 12 (+4 video_path, +3 mapobjects_snapshot) | 30 |
| operator_bridge | 11 (5 auth AC + 1 smoke + 3 poi_surface AC + 2 bridge wiring) | 0 | 11 |

cargo clippy -p shared -p telemetry_stream -p operator_bridge --all-targets -- -D warnings: clean after rewriting the test-time assert_eq!(.., false) as assert!(!..).

cargo fmt -p shared -p telemetry_stream -p operator_bridge: no diff.

Workspace cargo test --workspace: all suites green except the carried-over mission_executor::state_machine::ac3_bounded_retry_then_success flake (see C6 — unchanged by this batch).

Acceptance criteria

| Ticket | AC | Test | Status |
| --- | --- | --- | --- |
| AZ-676 | AC-1 rtsp_forward URL only | tests/video_path.rs::ac1_rtsp_forward_emits_url_only | Pass |
| AZ-676 | AC-2 bytes_inline forwards frames | tests/video_path.rs::ac2_bytes_inline_forwards_frames + internal/video.rs::bytes_inline_publish_frame_counts_and_fans_out | Pass |
| AZ-676 | AC-3 ai_locked toggles on session start/stop | tests/video_path.rs::ac3_ai_locked_toggles_on_session_start_and_stop + internal/video.rs::register_first_session_flips_ai_locked_true + deregister_last_session_flips_ai_locked_false | Pass |
| AZ-677 | AC-1 first subscribe → snapshot | tests/mapobjects_snapshot.rs::ac1_first_subscribe_receives_snapshot | Pass |
| AZ-677 | AC-2 in-flight diffs | tests/mapobjects_snapshot.rs::ac2_inflight_changes_emit_diffs | Pass |
| AZ-677 | AC-3 reconnect re-snapshots | tests/mapobjects_snapshot.rs::ac3_reconnect_resnaps_without_replay | Pass |
| AZ-678 | AC-1 valid signed command passes | internal/auth.rs::ac1_valid_signed_command_passes | Pass |
| AZ-678 | AC-2 invalid signature rejected, seq not advanced | internal/auth.rs::ac2_invalid_signature_rejected_and_seq_not_advanced | Pass |
| AZ-678 | AC-3 replay detected | internal/auth.rs::ac3_replay_detected | Pass |
| AZ-678 | AC-4 unknown/expired session rejected | internal/auth.rs::ac4_unknown_or_expired_session_rejected | Pass |
| AZ-678 | AC-5 sustained sig failures → health red | internal/auth.rs::ac5_sustained_signature_failures_flip_health_red | Pass |
| AZ-679 | AC-1 all required fields populated | internal/poi_surface.rs::ac1_full_poi_maps_all_required_fields | Pass |
| AZ-679 | AC-2 VLM-disabled carries explicit status | internal/poi_surface.rs::ac2_vlm_disabled_carries_explicit_status | Pass |
| AZ-679 | AC-3 dequeue emits event through sink | internal/poi_surface.rs::ac3_dequeue_emits_event_through_sink | Pass |

5. Code-review findings (this batch)

Verdict: PASS_WITH_WARNINGS — zero Critical, zero High; one Medium and three Low.

| # | Severity | Category | File:Line | Title |
| --- | --- | --- | --- | --- |
| F1 | Medium | Maintainability | crates/operator_bridge/src/internal/auth.rs:191-198 | serde_json::to_vec(payload).unwrap_or_default() silently substitutes empty bytes on a serialisation failure |
| F2 | Low | Spec-Gap | crates/operator_bridge/src/internal/poi_surface.rs:103-111 | vlm_label is hard-coded None; AC-1 wording allows this for the AZ-684 follow-up, but the wire field is exposed without a producer for now |
| F3 | Low | Architecture / Doc-sync | crates/telemetry_stream/proto/telemetry.proto + _docs/02_document/architecture.md §7.x | New proto topics + RPC (Topic::OperatorEvent, SubscribeVideo) not yet reflected in the architecture doc surface table; doc sweep ticket needed |
| F4 | Low | Scope | crates/operator_bridge/src/lib.rs:120-128 | surface_poi returns NotImplemented after pushing the surface event: a convenient placeholder for AZ-680, but a caller could mistake the side effect for a successful round-trip |

Finding details

F1: silent fallback on signing-payload serialisation (Medium / Maintainability)

  • Location: crates/operator_bridge/src/internal/auth.rs:191-198.
  • Description: signing_material calls serde_json::to_vec(payload).unwrap_or_default(). A serde_json::Value cannot in practice fail to serialise (no foreign types in Value), so the failure path is unreachable today. But the silent unwrap_or_default() would produce a signing string with empty payload bytes on a hypothetical failure — which would then HMAC-verify against a sign-side that also failed identically, masking the issue.
  • Suggestion: replace with .expect("serde_json::Value always serialises") so the failure mode is loud, OR return Err(AuthError::SignatureInvalid) (treating the failure as un-verifiable input). Either is consistent with the project rule "never suppress errors silently".
  • Task: AZ-678.
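The second suggested option can be sketched as a signing_material variant that propagates the serialisation result instead of substituting empty bytes. Assumptions: the caller passes in the result of serde_json::to_vec (kept generic over the error type so the sketch stays std-only), and AuthError is trimmed to the one variant used.

```rust
/// Trimmed to the one variant this sketch needs.
#[derive(Debug, PartialEq)]
pub enum AuthError {
    SignatureInvalid,
}

/// Builds token | seq_be | payload. `payload` is the caller's
/// serialisation result (e.g. from serde_json::to_vec); a failure becomes
/// an explicit rejection instead of silently signing empty bytes.
pub fn signing_material<E>(
    session_token: &str,
    seq: u64,
    payload: Result<Vec<u8>, E>,
) -> Result<Vec<u8>, AuthError> {
    let payload = payload.map_err(|_| AuthError::SignatureInvalid)?;
    let mut buf = Vec::with_capacity(session_token.len() + 1 + 8 + 1 + payload.len());
    buf.extend_from_slice(session_token.as_bytes());
    buf.push(b'|');
    buf.extend_from_slice(&seq.to_be_bytes());
    buf.push(b'|');
    buf.extend_from_slice(&payload);
    Ok(buf)
}
```

With this shape, a hypothetical serialisation failure surfaces as a counted SignatureInvalid rejection rather than an HMAC over an empty payload.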

F2: vlm_label producer deferred (Low / Spec-Gap)

  • Location: crates/operator_bridge/src/internal/poi_surface.rs:103-111.
  • Description: AZ-679 AC-1 says the wire event has every required field populated; the architecture §7.10 schema lists vlm_label as optional. The mapper produces None for every status, including VlmPipelineStatus::Ok where the label should be present. The Poi model does not carry the label string (it only has the pipeline status), so this is a producer-side gap, not a transport gap.
  • Suggestion: name AZ-684 (scan_controller VLM ladder) explicitly as the producer in the code comment; when it lands, introduce either a Poi::vlm_label: Option<String> field or a PoiSurfaceMapper::map_with_label(poi, label) overload. The current comment is accurate, but the gap is worth tracking until AZ-684 lands.
  • Task: AZ-679.

F3: architecture doc surface table out of sync with new proto topics (Low / Architecture)

  • Location: crates/telemetry_stream/proto/telemetry.proto (now defines Topic::OperatorEvent + SubscribeVideo RPC).
  • Description: architecture.md §7.x enumerates the telemetry topic catalogue and the operator-link RPC surface. Batches 14 + 15 together have added: gRPC server, video subscribe, MapObjects snapshot-on-subscribe, operator events. The architecture doc has not yet had the surface table refreshed.
  • Suggestion: schedule a doc-sync sweep that covers batches 13-15 (architecture topic table + decision-rationale entries for Tonic-gRPC = closed Q2, and a brief note on the snapshot-then-diff pattern for MapObjects). Fold into the next monorepo-document/architecture-sync ticket.
  • Task: batches 13-15 collectively (carried as C3 + C7).

F4: surface_poi placeholder returns NotImplemented after side-effect (Low / Scope)

  • Location: crates/operator_bridge/src/lib.rs:120-128.
  • Description: OperatorBridgeHandle::surface_poi pushes the surface event through the sink and then returns Err(NotImplemented(AZ-680)). The intent is "the surface IS pushed; the decision round-trip is AZ-680". A caller who tries to retry on error would double-push.
  • Suggestion: when AZ-680 lands, replace with a real decision channel. Until then, document explicitly that callers should treat NotImplemented here as "fire-and-forget, decision pending" — or rename to enqueue_surface_only_pending_decision_loop to make the placeholder posture unambiguous.
  • Task: AZ-679 (placeholder), AZ-680 (real fix).

6. Open cumulative findings touched

  • C5 (autopilot dead-code clippy) — unchanged; still blocks --all-targets -D warnings at the workspace level. Not fixable inside batch 15 scope.
  • C6 (mission_executor ac3 flake) — unchanged; reproduced once during the workspace test run, passes when re-run targeted (-p mission_executor --test state_machine ac3_bounded_retry_then_success). Documented in _docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md.

7. Cumulative review trigger

End of triplet 13 / 14 / 15 — cumulative review for these three batches is produced as _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md.

8. Next-batch candidates

  • AZ-680 — operator command dispatch (the consumer of AZ-678's ValidatedCommand). Naturally bundles with composition-root wiring (autopilot/runtime.rs) of OperatorBridge::with_validator + with_telemetry_sink.
  • AZ-668 — scan_controller POI queue. Becomes much more tractable now that the wire format (AZ-679) is fixed.
  • AZ-684 — scan_controller VLM assessment ladder; resolves F2 above.
  • AZ-658 — frame_ingest decoder. Still needs the H.264-binding decision.
  • Doc sweep covering batches 13-15 (architecture topic table, Tonic-gRPC decision, snapshot-then-diff pattern).