# Batch 15 / Cycle 1 — Implementation Report

**Date**: 2026-05-20

**Tasks**: AZ-676, AZ-677, AZ-678, AZ-679

**Verdict**: PASS_WITH_WARNINGS

- Pre-existing autopilot dead-code warning still open (C5; not touched by this batch).
- Pre-existing `mission_executor::state_machine::ac3_bounded_retry_then_success` flake still intermittent under workspace test load (C6; not touched by this batch).
- New optional surface in `OperatorBridge` (telemetry sink wiring) is gated behind the `with_telemetry_sink` / `with_validator` builder methods; the composition root in `crates/autopilot` will wire them in a future ticket (AZ-680 dispatch).

## 1. Scope

| Ticket | Title | Crate | Complexity (pts) |
|---|---|---|---|
| AZ-676 | telemetry_stream video path (rtsp_forward + bytes_inline) + ai_locked | `telemetry_stream` | 3 |
| AZ-677 | telemetry_stream MapObjects snapshot + diffs + reconnect resync | `telemetry_stream` | 3 |
| AZ-678 | operator_bridge command authentication (HMAC, replay, session) | `operator_bridge` | 5 |
| AZ-679 | operator_bridge POI surface mapper + dequeue + deadline carriage | `operator_bridge` | 3 |

The batch was chosen explicitly for **Telemetry+Operator foundation cohesion** — all four tickets sit on top of AZ-675 (gRPC server, shipped in batch 14) and AZ-667 (mapobjects_store hydrate, shipped in an earlier batch). AZ-676 closes the video transport question for the operator side; AZ-677 closes the MapObjects-bundle transport pattern; AZ-678 lays down the authentication invariants every command will cross; AZ-679 produces the wire-format POI events the GS UI consumes. Subsequent operator-side work (AZ-680 dispatch, AZ-681 safety/BIT ACK, AZ-684 VLM label) plugs into these four contracts.

`AZ-658` (frame_ingest decoder, 5 pts) and `AZ-668` (scan_controller queue) remained unblocked but were deliberately deferred: AZ-658 has an open H.264-binding decision the team hasn't committed to (retina vs ffmpeg-rs vs gstreamer; cf. cumulative C7-adjacent risk), and AZ-668 is better picked up as part of the next scan_controller batch, where its consumer surface lands.

## 2. Approach

### AZ-676 — Video path

The two delivery modes named in the task spec map to a `VideoPath` enum (`RtspForward { url }` / `BytesInline { … }`) on the runtime side, and to a single `SubscribeVideo` RPC on the wire. The session-start contract was promoted into its own proto message (`VideoSessionStart`) so the client can branch on `oneof` without re-reading config.

**ai_locked coordination** is a single `Arc` owned by the `VideoPublisher`; session register / deregister flips it behind a session counter (the first register sets it, the last deregister clears it), so concurrent subscribers don't toggle it back and forth. Consumers (`frame_ingest`, AZ-657, already done; `detection_client`, AZ-660) read the flag via `TelemetryStreamHandle::ai_locked_handle()`: no cross-crate observer registration, just a shared atomic.

The `bytes_inline` path uses the same `tokio::sync::broadcast` machinery as the telemetry topics (lossy ring buffer, per-client drop counters). The `rtsp_forward` path is a no-op for `push_frame`: `frame_ingest` keeps calling it without branching on configuration, and the publisher decides.

### AZ-677 — MapObjects snapshot + diff

The contract added is `MapObjectsSnapshotSource`, a trait that `telemetry_stream` calls into. The production implementation will be `mapobjects_store::Store` behind a thin adapter; that adapter is not wired yet, and the `EmptyMapObjectsSource` fixture stands in for now. The wire format is a tagged enum `MapObjectsTopicMessage::{ Snapshot, Diff }` so the operator UI can branch deterministically.

**Snapshot-on-subscribe** is implemented via a `StartThen` stream combinator inside the gRPC `subscribe` handler: when the requested topic list includes `MapObjectsBundle`, we synchronously call `current_snapshot_message()` and prepend it to the broadcast stream. **Reconnect** therefore Just Works — a new subscribe is a new snapshot, with no replay state to manage.
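The snapshot-then-diff pattern can be sketched in std-only Rust. This is a minimal illustration, not the crate's code: `std::sync::mpsc` channels stand in for `tokio::sync::broadcast`, and `Publisher`, `push_diff` and the `Diff { id, label }` shape are hypothetical simplifications of `MapObjectsTopicMessage`. The sketch takes the snapshot and registers the subscriber under one lock, so no diff can fall into the gap between them.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical, simplified stand-in for `MapObjectsTopicMessage`.
#[derive(Debug, Clone, PartialEq)]
enum MapObjectsTopicMessage {
    /// Full state, sent once per subscribe (and therefore once per reconnect).
    Snapshot(BTreeMap<u32, String>),
    /// Incremental change: `Some(label)` upserts the object, `None` removes it.
    Diff { id: u32, label: Option<String> },
}

#[derive(Default)]
struct Inner {
    objects: BTreeMap<u32, String>,
    subscribers: Vec<Sender<MapObjectsTopicMessage>>,
}

/// Hypothetical publisher; one mpsc sender per client stands in for a broadcast channel.
#[derive(Default)]
struct Publisher {
    inner: Mutex<Inner>,
}

impl Publisher {
    /// Every subscribe starts with a snapshot, so a reconnecting client needs no replay state.
    fn subscribe(&self) -> Receiver<MapObjectsTopicMessage> {
        let (tx, rx) = channel();
        let mut inner = self.inner.lock().unwrap();
        // Snapshot and registration happen under one lock: no diff can land between them.
        tx.send(MapObjectsTopicMessage::Snapshot(inner.objects.clone()))
            .expect("receiver is still held locally");
        inner.subscribers.push(tx);
        rx
    }

    /// Applies a change to the authoritative state, then fans the diff out to every subscriber.
    fn push_diff(&self, id: u32, label: Option<String>) {
        let mut inner = self.inner.lock().unwrap();
        match &label {
            Some(l) => {
                inner.objects.insert(id, l.clone());
            }
            None => {
                inner.objects.remove(&id);
            }
        }
        let msg = MapObjectsTopicMessage::Diff { id, label };
        // A failed send means the client disconnected; drop its sender.
        inner.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
    }
}
```

With a real `tokio::sync::broadcast` channel there is no shared lock, so the safe order is to call `subscribe()` on the sender before reading the snapshot: a diff published in between then shows up both inside the snapshot and as a `Diff`, which is harmless when diffs are idempotent upserts and removals. Reading the snapshot first would instead risk silently missing that diff. The report does not state which order the `StartThen` handler uses.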
**Diff fan-out** uses the existing publisher: `TelemetryStreamHandle::push_mapobjects_diff(diff)` serialises the diff and publishes it on `Topic::MapObjectsBundle`. The wire enum tag (`kind: snapshot | diff`) keeps both message types on the same topic.

### AZ-678 — Command authentication

The contract `OperatorCommandValidator` and its types (`SignedCommand`, `ValidatedCommand`, `AuthError`) live in `shared::contracts::operator_auth`, so dispatch call sites (`scan_controller`, `mission_executor`) can depend on the trait without importing `operator_bridge`, a layering invariant the architecture deliberately preserves.

The default implementation, `HmacOperatorValidator` (`operator_bridge::internal::auth`), is intentionally narrow:

- HMAC-SHA256 over `(session_token || '|' || seq_be || '|' || canonical_payload_json)`. The separator byte keeps the boundaries between the three fields unambiguous, so bytes cannot shift from one field into its neighbour; canonical JSON is `serde_json::to_vec` of the `serde_json::Value` (deterministic for the operator's signing side).
- Constant-time compare via `hmac::Mac::verify_slice` (no timing oracle, per NFR-Security).
- Per-session replay tracker: `last_seen_seq: Option` advances on Ok, never on rejection. Rejecting `seq=N` does not poison the session: a legitimate retry can still land with `N+1`. This subtlety drove the explicit AC-2 + AC-3 tests.
- The session registry is an in-process `HashMap` keyed by an opaque token. `register_session(token, secret)` is called from the (out-of-scope) Ground Station handshake; revoke and TTL (default 30 min) are first-class.
- Rejection counters live in a fixed-shape `AuthCounters` array (one slot per entry in `REJECTION_REASONS`), exposed to the health surface.
- **Health-red gate**: a sliding-window `VecDeque` of signature-failure timestamps over the trailing 60 s; once the count reaches `signature_failure_red_threshold` (default 30/min), the health surface goes red. Pruning is amortised O(1) on every record and every health probe.
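The acceptance logic in the list above (signing-material layout, advance-on-success replay tracking, and the trailing-window health gate) can be sketched in std-only Rust. It is a minimal illustration under stated assumptions, not `HmacOperatorValidator` itself: the HMAC check is reduced to a `sig_ok` flag (the real path verifies with `hmac::Mac::verify_slice`), `Validator`, `check` and the one-argument `register_session` are hypothetical, and the replay rule assumes a command is accepted only when its `seq` is strictly greater than the last accepted one.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum AuthError {
    UnknownSession,
    SignatureInvalid,
    Replay,
}

/// Bytes fed to HMAC-SHA256: token, '|', big-endian seq (fixed 8 bytes), '|', payload JSON.
fn signing_material(token: &str, seq: u64, payload_json: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(token.len() + 10 + payload_json.len());
    out.extend_from_slice(token.as_bytes());
    out.push(b'|');
    out.extend_from_slice(&seq.to_be_bytes());
    out.push(b'|');
    out.extend_from_slice(payload_json);
    out
}

/// Hypothetical validator: per-session replay state plus a sliding signature-failure window.
struct Validator {
    last_seen: HashMap<String, Option<u64>>,
    failures: VecDeque<Instant>,
    window: Duration,
    red_threshold: usize,
}

impl Validator {
    fn new(red_threshold: usize) -> Self {
        Self {
            last_seen: HashMap::new(),
            failures: VecDeque::new(),
            window: Duration::from_secs(60),
            red_threshold,
        }
    }

    /// Simplified: the real `register_session` also stores the HMAC secret and a TTL.
    fn register_session(&mut self, token: &str) {
        self.last_seen.insert(token.to_string(), None);
    }

    /// `sig_ok` stands in for the constant-time HMAC verification result.
    fn check(&mut self, token: &str, seq: u64, sig_ok: bool, now: Instant) -> Result<u64, AuthError> {
        let last = *self.last_seen.get(token).ok_or(AuthError::UnknownSession)?;
        if !sig_ok {
            self.failures.push_back(now);
            self.prune(now);
            // Rejection never advances `last_seen`, so a forged seq=N cannot block the genuine one.
            return Err(AuthError::SignatureInvalid);
        }
        if last.is_some_and(|l| seq <= l) {
            return Err(AuthError::Replay);
        }
        self.last_seen.insert(token.to_string(), Some(seq));
        Ok(seq)
    }

    /// Drops timestamps older than the window; each entry is popped at most once (amortised O(1)).
    fn prune(&mut self, now: Instant) {
        while let Some(&oldest) = self.failures.front() {
            if now.duration_since(oldest) < self.window {
                break;
            }
            self.failures.pop_front();
        }
    }

    /// Red once the trailing-window failure count reaches the threshold.
    fn health_red(&mut self, now: Instant) -> bool {
        self.prune(now);
        self.failures.len() >= self.red_threshold
    }
}
```

Because timestamps are appended in order and only ever removed from the front, both recording a failure and probing health touch each entry at most twice over its lifetime, which is where the amortised O(1) pruning cost comes from.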
### AZ-679 — POI surface

The wire shape is the canonical model `shared::models::operator_event::OperatorPoiEvent` (matches `architecture.md §7.10`). `PoiSurfaceMapper::map(&poi, photo_metadata)` is a pure transform; `surface(&poi, photo_metadata)` is map plus a push through the `TelemetrySink::push_operator_event` extension. `emit_dequeued(poi_id, reason)` produces a `PoiDequeued` event. Both flow over a new `Topic::OperatorEvent` channel; the wire payload is a tagged enum (`OperatorEvent::{ PoiSurfaced, PoiDequeued }` with serde tag `kind`).

`vlm_label` is intentionally `None` for now: the `Poi` model carries `vlm_status` (the pipeline status) but not the assistant-label string. The label will be threaded through in AZ-684, when scan_controller's VLM assessment ladder lands; the wire field is already in place so the operator UI can render it without a future schema change.

`PoiSurfaceMetrics` exposes `pois_surfaced_per_min` (sliding 60 s window) plus cumulative totals. Health is green by default and goes red only when the validator's signature-failure window crosses its threshold (AC-5 via AZ-678).

### Cross-crate wiring

- `TelemetrySink` (in `shared::contracts`) gained `push_operator_event(OperatorEvent) -> Result<()>`. Only `telemetry_stream::TelemetryStreamHandle` implements `TelemetrySink`; production code already constructs the handle in the composition root, so the new method is wired automatically once batch 15 lands.
- `OperatorBridge` got two optional builder methods, `with_telemetry_sink(Arc)` and `with_validator(Arc)`. Existing call sites (tests, partial scaffolding in `autopilot/runtime.rs`) keep compiling. The composition-root wiring (`autopilot/runtime.rs`) is left for AZ-680, since dispatch, sink and validator are most naturally bundled.

## 3. Files touched

### Production

- `Cargo.toml` — `hmac = "0.12"` workspace dep.
- `crates/shared/src/models/operator_event.rs` — **new**. `Tier2EvidenceSummary`, `PhotoMetadata`, `OperatorPoiEvent`, `DequeueReason`, `PoiDequeued`, `OperatorEvent`.
- `crates/shared/src/models/mod.rs` — `pub mod operator_event;`.
- `crates/shared/src/contracts/operator_auth.rs` — **new**. `SignedCommand`, `ValidatedCommand`, `AuthError`, `OperatorCommandValidator` trait.
- `crates/shared/src/contracts/mod.rs` — `pub mod operator_auth;` + `TelemetrySink::push_operator_event`.
- `crates/telemetry_stream/Cargo.toml` — `bytes` dep.
- `crates/telemetry_stream/proto/telemetry.proto` — `Topic::OperatorEvent`; `SubscribeVideo` RPC + supporting messages.
- `crates/telemetry_stream/src/internal/mod.rs` — `pub mod {mapobjects, video, video_server};`.
- `crates/telemetry_stream/src/internal/mapobjects.rs` — **new**. Snapshot + diff types, `MapObjectsSnapshotSource` trait, `EmptyMapObjectsSource` fixture.
- `crates/telemetry_stream/src/internal/video.rs` — **new**. `VideoPath`, `VideoFrameMessage`, `VideoSnapshot`, `VideoPublisher` (with ai_locked atomic + session counter).
- `crates/telemetry_stream/src/internal/video_server.rs` — **new**. `SubscribeVideo` RPC handler.
- `crates/telemetry_stream/src/internal/publisher.rs` — `OperatorEvent` topic added to `ALL_TOPICS`; snapshot/diff source + counters wired.
- `crates/telemetry_stream/src/internal/server.rs` — gRPC `subscribe_video` delegate; `subscribe` snapshot-prepend on `MapObjectsBundle`.
- `crates/telemetry_stream/src/lib.rs` — `TelemetryStreamConfig` video knobs; `VideoPublisher` construction; `ai_locked_handle`; `set_mapobjects_snapshot_source`; `push_mapobjects_diff`; `video_snapshot`; `TelemetrySink::push_frame` + `push_operator_event` impls.
- `crates/operator_bridge/Cargo.toml` — `serde_json`, `parking_lot`, `chrono`, `uuid`, `hmac`, `sha2`, `thiserror`.
- `crates/operator_bridge/src/internal/mod.rs` — `pub mod {auth, poi_surface};`.
- `crates/operator_bridge/src/internal/auth.rs` — **new**. `HmacValidatorConfig`, `HmacOperatorValidator`, `AuthCounters`, `REJECTION_REASONS`, session registry, replay tracker, health-red sliding window.
- `crates/operator_bridge/src/internal/poi_surface.rs` — **new**. `PoiSurfaceMapper`, `PoiSurfaceMetrics`, `SurfaceRateWindow`.
- `crates/operator_bridge/src/lib.rs` — `with_telemetry_sink`, `with_validator`, `surface_poi`, `surface_poi_with_photo`, `emit_poi_dequeued`, `poi_metrics`, updated `health()`.

### Tests

- `crates/telemetry_stream/tests/video_path.rs` — **new**. 4 integration tests (AC-1, AC-2, AC-3, empty-client guard).
- `crates/telemetry_stream/tests/mapobjects_snapshot.rs` — **new**. 3 integration tests (AC-1, AC-2, AC-3).

### Process

- `_docs/02_tasks/done/AZ-676_telemetry_stream_video_path.md` — moved from `todo/`.
- `_docs/02_tasks/done/AZ-677_telemetry_stream_mapobjects_snapshot.md` — moved from `todo/`.
- `_docs/02_tasks/done/AZ-678_operator_bridge_command_auth.md` — moved from `todo/`.
- `_docs/02_tasks/done/AZ-679_operator_bridge_poi_surface.md` — moved from `todo/`.
- `_docs/_autodev_state.md` — phase update.
- `_docs/03_implementation/batch_15_cycle1_report.md` — this report.
- `_docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md` — cumulative review (separate file).

## 4. Test results

| Crate | Unit | Integration | Total |
|---|---|---|---|
| `shared` | 9 (+2 new for operator_event serde) | — | 9 |
| `telemetry_stream` | 18 (+6 new for video + 3 new for mapobjects) | 12 (+4 video_path, +3 mapobjects_snapshot) | 30 |
| `operator_bridge` | 11 (5 auth AC + 1 smoke + 3 poi_surface AC + 2 bridge wiring) | — | 11 |

`cargo clippy -p shared -p telemetry_stream -p operator_bridge --all-targets -- -D warnings`: clean after the test-time `assert_eq!(.., false)` → `assert!(!..)` rewrite.

`cargo fmt -p shared -p telemetry_stream -p operator_bridge`: no diff.
Workspace `cargo test --workspace`: all suites green **except** the carried-over `mission_executor::state_machine::ac3_bounded_retry_then_success` flake (see C6 — unchanged by this batch).

### Acceptance criteria

| Ticket | AC | Test | Status |
|---|---|---|---|
| AZ-676 | AC-1 rtsp_forward URL only | `tests/video_path.rs::ac1_rtsp_forward_emits_url_only` | ✅ |
| AZ-676 | AC-2 bytes_inline forwards frames | `tests/video_path.rs::ac2_bytes_inline_forwards_frames` + `internal/video.rs::bytes_inline_publish_frame_counts_and_fans_out` | ✅ |
| AZ-676 | AC-3 ai_locked toggles on session start/stop | `tests/video_path.rs::ac3_ai_locked_toggles_on_session_start_and_stop` + `internal/video.rs::register_first_session_flips_ai_locked_true` + `deregister_last_session_flips_ai_locked_false` | ✅ |
| AZ-677 | AC-1 first subscribe → snapshot | `tests/mapobjects_snapshot.rs::ac1_first_subscribe_receives_snapshot` | ✅ |
| AZ-677 | AC-2 in-flight diffs | `tests/mapobjects_snapshot.rs::ac2_inflight_changes_emit_diffs` | ✅ |
| AZ-677 | AC-3 reconnect re-snapshots | `tests/mapobjects_snapshot.rs::ac3_reconnect_resnaps_without_replay` | ✅ |
| AZ-678 | AC-1 valid signed command passes | `internal/auth.rs::ac1_valid_signed_command_passes` | ✅ |
| AZ-678 | AC-2 invalid signature rejected, seq not advanced | `internal/auth.rs::ac2_invalid_signature_rejected_and_seq_not_advanced` | ✅ |
| AZ-678 | AC-3 replay detected | `internal/auth.rs::ac3_replay_detected` | ✅ |
| AZ-678 | AC-4 unknown/expired session rejected | `internal/auth.rs::ac4_unknown_or_expired_session_rejected` | ✅ |
| AZ-678 | AC-5 sustained sig failures → health red | `internal/auth.rs::ac5_sustained_signature_failures_flip_health_red` | ✅ |
| AZ-679 | AC-1 all required fields populated | `internal/poi_surface.rs::ac1_full_poi_maps_all_required_fields` | ✅ |
| AZ-679 | AC-2 VLM-disabled carries explicit status | `internal/poi_surface.rs::ac2_vlm_disabled_carries_explicit_status` | ✅ |
| AZ-679 | AC-3 dequeue emits event through sink | `internal/poi_surface.rs::ac3_dequeue_emits_event_through_sink` | ✅ |

## 5. Code-review findings (this batch)

**Verdict**: PASS_WITH_WARNINGS — zero Critical, zero High; one Medium and three Low.

| # | Severity | Category | File:Line | Title |
|---|---|---|---|---|
| F1 | Medium | Maintainability | `crates/operator_bridge/src/internal/auth.rs:191-198` | `serde_json::to_vec(payload).unwrap_or_default()` silently substitutes empty bytes on a serialisation failure |
| F2 | Low | Spec-Gap | `crates/operator_bridge/src/internal/poi_surface.rs:103-111` | `vlm_label` is hard-coded `None`; AC-1 wording allows this for AZ-684 follow-up, but the wire field is exposed without a producer for now |
| F3 | Low | Architecture / Doc-sync | `crates/telemetry_stream/proto/telemetry.proto` + `_docs/02_document/architecture.md §7.x` | New proto topic + RPC (`Topic::OperatorEvent`, `SubscribeVideo`) not yet reflected in the architecture doc surface table — doc sweep ticket needed |
| F4 | Low | Scope | `crates/operator_bridge/src/lib.rs:120-128` | `surface_poi` returns `NotImplemented` after pushing the surface event — convenient placeholder for AZ-680, but a caller could mistake the side-effect for a successful round-trip |

### Finding details

**F1: silent fallback on signing-payload serialisation** (Medium / Maintainability)

- Location: `crates/operator_bridge/src/internal/auth.rs:191-198`.
- Description: `signing_material` calls `serde_json::to_vec(payload).unwrap_or_default()`. A `serde_json::Value` cannot in practice fail to serialise (no foreign types in `Value`), so the failure path is unreachable today. But the silent `unwrap_or_default()` would produce a signing string with **empty** payload bytes on a hypothetical failure — which would then HMAC-verify against a sign-side that also failed identically, masking the issue.
- Suggestion: replace with `.expect("serde_json::Value always serialises")` so the failure mode is loud, OR return `Err(AuthError::SignatureInvalid)` (treating the failure as unverifiable input). Either is consistent with the project rule "never suppress errors silently".
- Task: AZ-678.

**F2: vlm_label producer deferred** (Low / Spec-Gap)

- Location: `crates/operator_bridge/src/internal/poi_surface.rs:103-111`.
- Description: AZ-679 AC-1 says the wire event has every required field populated; the architecture §7.10 schema lists `vlm_label` as optional. The mapper produces `None` for every status, including `VlmPipelineStatus::Ok`, where the label *should* be present. The `Poi` model does not carry the label string (it only has the pipeline status), so this is a producer-side gap, not a transport gap.
- Suggestion: add an explicit comment naming AZ-684 (scan_controller VLM ladder) as the producer, and at that point introduce either a richer `Poi::vlm_label: Option` field or a label-aware overload such as `PoiSurfaceMapper::map_with_label(poi, label)`. The current code comment is accurate, but the gap is worth tracking until AZ-684 lands.
- Task: AZ-679.

**F3: architecture doc surface table out of sync with new proto topics** (Low / Architecture)

- Location: `crates/telemetry_stream/proto/telemetry.proto` (now defines `Topic::OperatorEvent` + `SubscribeVideo` RPC).
- Description: `architecture.md §7.x` enumerates the telemetry topic catalogue and the operator-link RPC surface. Batches 14 + 15 together have added the gRPC server, video subscribe, MapObjects snapshot-on-subscribe, and operator events. The architecture doc's surface table has not yet been refreshed.
- Suggestion: schedule a doc-sync sweep covering batches 13-15 (the architecture topic table, a decision-rationale entry recording Tonic-gRPC as the resolution of Q2, and a brief note on the snapshot-then-diff pattern for MapObjects). Fold it into the next monorepo-document/architecture-sync ticket.
- Task: batches 13-15 collectively (carried as C3 + C7).

**F4: surface_poi placeholder returns NotImplemented after side-effect** (Low / Scope)

- Location: `crates/operator_bridge/src/lib.rs:120-128`.
- Description: `OperatorBridgeHandle::surface_poi` pushes the surface event through the sink and then returns `Err(NotImplemented(AZ-680))`. The intent is "the surface IS pushed; the decision round-trip is AZ-680". A caller that retries on error would double-push.
- Suggestion: when AZ-680 lands, replace this with a real decision channel. Until then, document explicitly that callers should treat `NotImplemented` here as "fire-and-forget, decision pending" — or rename to `enqueue_surface_only_pending_decision_loop` to make the placeholder posture unambiguous.
- Task: AZ-679 (placeholder), AZ-680 (real fix).

## 6. Open cumulative findings touched

- **C5 (autopilot dead-code clippy)** — unchanged; still blocks `--all-targets -D warnings` at the workspace level. Not fixable inside batch 15 scope.
- **C6 (mission_executor ac3 flake)** — unchanged; reproduced once during the workspace test run, and passes when re-run in isolation (`-p mission_executor --test state_machine ac3_bounded_retry_then_success`). Documented in `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`.

## 7. Cumulative review trigger

End of triplet 13 / 14 / 15: the cumulative review for these three batches is produced as `_docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md`.

## 8. Next-batch candidates

- **AZ-680** — operator command dispatch (the consumer of AZ-678's `ValidatedCommand`). Naturally bundles with composition-root wiring (`autopilot/runtime.rs`) of `OperatorBridge::with_validator` + `with_telemetry_sink`.
- **AZ-668** — scan_controller POI queue. Becomes much more tractable now that the wire format (AZ-679) is fixed.
- **AZ-684** — scan_controller VLM assessment ladder; resolves F2 above.
- **AZ-658** — frame_ingest decoder. Still needs the H.264-binding decision.
- Doc sweep covering batches 13-15 (architecture topic table, Tonic-gRPC decision, snapshot-then-diff pattern).