[AZ-659] [AZ-660] [AZ-661] Archive batch 18; update state and cumulative review

Co-authored-by: Cursor <cursoragent@cursor.com>
Oleksandr Bezdieniezhnykh
2026-05-20 18:27:15 +03:00
parent 0854d3be1c
commit 72cddc9c42
6 changed files with 366 additions and 10 deletions
@@ -0,0 +1,64 @@
# Multi-Consumer Frame Publisher + Back-Pressure Drops
**Task**: AZ-659_frame_ingest_publisher
**Name**: Tokio broadcast publisher + per-consumer drop counters + zero-copy `Arc<Bytes>`
**Description**: Publish `Frame`s through a single multi-consumer channel using `Arc<Bytes>` for pixel data so consumers do not copy. Drop frames when downstream consumers fall behind beyond a configured queue depth; record per-consumer drop counters with reason tags.
**Complexity**: 3 points
**Dependencies**: AZ-640_initial_structure, AZ-657_frame_ingest_rtsp_session, AZ-658_frame_ingest_decoder
**Component**: frame_ingest
**Tracker**: AZ-659
**Epic**: AZ-627
## Problem
Three downstream consumers (`detection_client`, `movement_detector`, `telemetry_stream`) all need the same frames at the same rate. A single-consumer queue would serialise the slowest; a per-consumer fan-out with cloned pixel buffers would multiply memory. The right structure is a Tokio `broadcast` channel (or equivalent) carrying `Arc<Bytes>` so pixels are shared by reference. Slow consumers drop their oldest frame, with the drop counted (and reason-tagged) — never silently coalesced.
## Outcome
- `FramePublisher::subscribe() -> FrameReceiver` returns a per-consumer receiver.
- `Frame` carries `Arc<Bytes>` for `pixels` so consumers do not copy.
- When a consumer falls behind beyond `channel_depth` (configurable, default 4), the oldest frame is dropped for THAT consumer; per-consumer counters increment with reason tag (`{detection_client_slow, movement_detector_slow, telemetry_slow}`).
- Health surface: per-consumer drop counters, total publish count.
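The drop accounting described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's actual code: the method names (`note_publish`, `note_drops`) and the field layout are assumptions; only the three reason tags come from this task. With `tokio::sync::broadcast`, one plausible source for the `skipped` count is the value carried by `RecvError::Lagged(n)` when a receiver falls behind.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// The three statically known consumers named in this task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsumerId {
    DetectionClient,
    MovementDetector,
    TelemetryStream,
}

impl ConsumerId {
    /// Reason tag recorded alongside each drop (tags taken from the Outcome list).
    pub fn reason_tag(self) -> &'static str {
        match self {
            ConsumerId::DetectionClient => "detection_client_slow",
            ConsumerId::MovementDetector => "movement_detector_slow",
            ConsumerId::TelemetryStream => "telemetry_slow",
        }
    }

    fn index(self) -> usize {
        self as usize
    }
}

/// Hypothetical lock-free stats: one drop counter per consumer plus a publish total.
#[derive(Default)]
pub struct PublisherStats {
    drops: [AtomicU64; 3],
    publishes_total: AtomicU64,
}

impl PublisherStats {
    /// Count one published frame.
    pub fn note_publish(&self) {
        self.publishes_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Record `skipped` frames lost by a single consumer.
    pub fn note_drops(&self, consumer: ConsumerId, skipped: u64) {
        self.drops[consumer.index()].fetch_add(skipped, Ordering::Relaxed);
    }

    pub fn dropped_frames(&self, consumer: ConsumerId) -> u64 {
        self.drops[consumer.index()].load(Ordering::Relaxed)
    }

    pub fn publishes_total(&self) -> u64 {
        self.publishes_total.load(Ordering::Relaxed)
    }
}
```

Keeping the counters in a fixed array indexed by consumer id avoids any lock or map lookup on the publish path; adding a fourth consumer would mean extending the enum and the array length together.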
## Scope
### Included
- `tokio::sync::broadcast` (or equivalent) with `Arc<Bytes>` payload.
- Per-consumer drop counter (statically known three consumer ids; future-extensible).
- Channel-depth config.
### Excluded
- RTSP session (task 18).
- Decoder (task 19).
## Acceptance Criteria
**AC-1: Three consumers receive every frame at nominal rate**
Given three subscribers consuming at 30 fps and source at 30 fps
When the publisher runs for 10 s
Then each consumer observes ~300 frames; per-consumer drop counters = 0.
**AC-2: Slow consumer drops, fast consumers unaffected**
Given a slow consumer that yields every 200 ms while source is 30 fps and `channel_depth = 4`
When the publisher runs for 5 s
Then the slow consumer's drop counter increments and fast consumers continue to receive every frame.
**AC-3: Zero-copy under load**
Given a publisher emitting at 30 fps for 60 s with three subscribers
When peak memory is sampled
Then memory does not scale linearly with consumer count (i.e. `Arc<Bytes>` is correctly shared).
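One way to check the sharing property in AC-3 without sampling process memory is to assert that every consumer's copy points at the same allocation. The sketch below assumes a simplified `Frame` and uses `Arc<[u8]>` as a standard-library stand-in for `Arc<Bytes>`; the helper names `fan_out` and `shares_pixels` are hypothetical.

```rust
use std::sync::Arc;

/// Simplified stand-in for the real `Frame`. `Arc<[u8]>` substitutes for
/// `Arc<Bytes>` so the sketch needs only the standard library.
#[derive(Clone)]
pub struct Frame {
    pub frame_seq: u64,
    pub pixels: Arc<[u8]>,
}

/// Hand one clone of `frame` to each of `consumers` subscribers.
/// Cloning copies the `Arc` pointer and bumps a refcount; the pixel bytes
/// themselves are not duplicated.
pub fn fan_out(frame: &Frame, consumers: usize) -> Vec<Frame> {
    (0..consumers).map(|_| frame.clone()).collect()
}

/// True when both frames point at the same pixel allocation.
pub fn shares_pixels(a: &Frame, b: &Frame) -> bool {
    Arc::ptr_eq(&a.pixels, &b.pixels)
}
```

With three subscribers the strong count on the pixel buffer is four (the publisher's handle plus three clones), while the buffer is allocated once.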
## Non-Functional Requirements
**Performance**
- Publish-to-consumer p99 ≤5 ms (helps keep total RTSP-rx-to-publish under the 30 ms p99 budget).
**Reliability**
- Drops are counted with reason; never silent.
- No unbounded memory growth on slow consumer.
## Runtime Completeness
- **Named capability**: lossy multi-consumer frame fan-out with `Arc<Bytes>`.
- **Production code that must exist**: real broadcast channel; real per-consumer drop accounting.
- **Unacceptable substitutes**: cloning pixel buffers per consumer is unacceptable (multiplies memory); blocking the publisher on a slow consumer is unacceptable (gates the whole pipeline).
@@ -0,0 +1,77 @@
# Detection gRPC Bi-Directional Stream + Frame Budgeting
**Task**: AZ-660_detection_client_grpc_stream
**Name**: Bi-directional gRPC stream to ../detections + drop-oldest frame budgeting
**Description**: Single bi-directional gRPC stream to the external `../detections` service. Reconnect on stream loss with bounded exponential backoff. Frame budgeting: drop older in-flight frames if a new frame arrives before the previous response, respecting the Tier-1 ≤100 ms/frame target.
**Complexity**: 5 points
**Dependencies**: AZ-640_initial_structure, AZ-659_frame_ingest_publisher
**Component**: detection_client
**Tracker**: AZ-660
**Epic**: AZ-628
## Problem
`detection_client` is the only autopilot component talking to `../detections`. The contract is a bi-directional gRPC stream; the client must maintain it (reconnect with bounded backoff), respect the Tier-1 latency target by NOT queueing frames indefinitely (drop-oldest in-flight when a newer frame arrives), and never block the upstream `frame_ingest` publisher.
## Outcome
- `DetectionClient::run(frame_rx)` maintains one bi-directional gRPC stream to `../detections`; reconnect on stream loss with exponential backoff capped at 30 s.
- Outbound: send each `Frame` (skipping `ai_locked` ones) up to `max_concurrent_in_flight` (default 2); drop older in-flight frames when the budget is full and a new frame arrives (logged as `budget_drop`).
- Inbound: receive `DetectionBatch` and publish on the output channel; tag with the source frame's `monotonic_ts`.
- Health surface: `gRPC_connection_state`, `requests_in_flight`, `latency_p50/p99`, `errors_by_kind`, `budget_drops_total`.
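The drop-oldest budgeting in the outbound bullet can be illustrated with a small in-flight tracker. This is a sketch under assumptions: the struct layout and the method names `admit` and `complete` are hypothetical, and the tracker stores only `frame_seq` values rather than the frames themselves.

```rust
use std::collections::VecDeque;

/// Hypothetical in-flight tracker: holds at most `capacity` frame sequence numbers.
pub struct BudgetTracker {
    in_flight: VecDeque<u64>,
    capacity: usize,
    budget_drops_total: u64,
}

impl BudgetTracker {
    /// `capacity` plays the role of `max_concurrent_in_flight` (default 2 in this task).
    pub fn new(capacity: usize) -> Self {
        Self {
            in_flight: VecDeque::new(),
            capacity: capacity.max(1),
            budget_drops_total: 0,
        }
    }

    /// Admit a new frame. If the budget is full, evict the oldest in-flight
    /// frame and return its sequence number so the caller can log `budget_drop`.
    pub fn admit(&mut self, frame_seq: u64) -> Option<u64> {
        let evicted = if self.in_flight.len() >= self.capacity {
            self.budget_drops_total += 1;
            self.in_flight.pop_front()
        } else {
            None
        };
        self.in_flight.push_back(frame_seq);
        evicted
    }

    /// Mark a response as received. Returns `false` for a response whose
    /// frame was already evicted or was never sent.
    pub fn complete(&mut self, frame_seq: u64) -> bool {
        match self.in_flight.iter().position(|&s| s == frame_seq) {
            Some(idx) => {
                self.in_flight.remove(idx);
                true
            }
            None => false,
        }
    }

    pub fn requests_in_flight(&self) -> usize {
        self.in_flight.len()
    }

    pub fn budget_drops_total(&self) -> u64 {
        self.budget_drops_total
    }
}
```

Because `admit` never waits, the caller can keep draining its frame receiver regardless of how slowly responses arrive, which is what keeps the upstream publisher unblocked.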
## Scope
### Included
- `tonic` (or equivalent) gRPC client + bi-directional streaming.
- Reconnect state machine.
- In-flight tracker (sliding window of `frame_seq`).
- Drop-oldest budgeting.
### Excluded
- Schema validation + model_version handling (task 22).
- The `../detections` service itself (separate repo).
## Acceptance Criteria
**AC-1: Happy path against fixture**
Given a fixture gRPC server that returns a `DetectionBatch` per request within 50 ms
When `DetectionClient::run` is started against a 30 fps frame source for 10 s
Then ≥285 `DetectionBatch` are observed on the output channel; latency_p99 ≤100 ms; budget_drops_total = 0.
**AC-2: Reconnect after server restart**
Given a healthy stream
When the gRPC server is killed and restarted
Then the client reconnects within ≤2 s; subsequent frames flow through.
**AC-3: Budget drop on slow server**
Given the server takes 200 ms per response and the source is 30 fps
When the client runs for 5 s
Then `budget_drops_total > 0`, frames continue to flow, and the publisher is never blocked.
**AC-4: ai_locked frames are skipped**
Given a frame stream where every 5th frame has `ai_locked = true`
When the client runs
Then no requests are sent for `ai_locked` frames (observable via outgoing count).
## Non-Functional Requirements
**Performance**
- Per-frame round-trip ≤100 ms p99 (Tier-1 NFR; mostly owned by `../detections`).
- Reconnect latency: ≤2 s after `../detections` returns.
**Reliability**
- Drop-oldest never queues indefinitely.
- Reconnect is bounded.
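A bounded reconnect delay could be computed as below. The 30 s cap comes from the Outcome section; the base delay and the pure doubling schedule (no jitter) are assumptions, and `reconnect_delay` is a hypothetical helper name.

```rust
use std::time::Duration;

/// Cap taken from the Outcome section: backoff never exceeds 30 s.
pub const MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Delay before reconnect attempt number `attempt` (0-based):
/// `base * 2^attempt`, capped at `MAX_BACKOFF`.
/// `base` is an assumption; the task does not specify the initial delay.
pub fn reconnect_delay(base: Duration, attempt: u32) -> Duration {
    // Saturate the multiplier instead of overflowing for large attempt counts.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor)
        .unwrap_or(MAX_BACKOFF)
        .min(MAX_BACKOFF)
}
```

With a 100 ms base the schedule runs 100 ms, 200 ms, 400 ms and so on, reaching the cap at the tenth attempt. A supervisor would presumably reset `attempt` to 0 after a successful connect so that the next outage starts from the short delay again.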
## Contract
- gRPC service contract owner: `../_docs/03_detections.md`.
- Canonical typed model: `data_model.md §Detection`, `§DetectionBatch`.
## Runtime Completeness
- **Named capability**: bi-directional gRPC stream against `../detections`.
- **Production code that must exist**: real `tonic` (or equivalent) bi-directional stream; real budgeting.
- **Allowed external stubs**: a fixture gRPC server in tests; the real `../detections` for integration.
- **Unacceptable substitutes**: a unary call-per-frame instead of streaming is unacceptable (multiplies per-request overhead).
@@ -0,0 +1,62 @@
# Detection Schema Validation + Model-Version + Health
**Task**: AZ-661_detection_client_schema_and_health
**Name**: Response schema validation + model_version tracking + Tier-1 health degradation signal
**Description**: Validate every `DetectionBatch` response against the schema version the client was built against. Surface a hard error on schema mismatch (never silent downcast). Track `model_version`; on change, surface to `scan_controller` so per-class thresholds can be reloaded. Track sliding-window latency; on `latency_p99 > 100 ms` flip health → yellow so `scan_controller` can degrade to alternate-frame inference.
**Complexity**: 2 points
**Dependencies**: AZ-640_initial_structure, AZ-660_detection_client_grpc_stream
**Component**: detection_client
**Tracker**: AZ-661
**Epic**: AZ-628
## Problem
Schema drift between `../detections` and autopilot must be caught loudly — not silently downcast. The model version can change at runtime (model swap); when it does, the per-class confidence thresholds may need to be reloaded by `scan_controller`. The Tier-1 latency target (≤100 ms) is mostly owned by `../detections` but autopilot must observe drift and surface health degradation so the scan controller can take action.
## Outcome
- Every response is validated against the bundled schema; on mismatch, returns a hard error to the output channel and health → red.
- `last_model_version` is tracked; on change, a `ModelVersionChanged(new_version)` event is emitted on the output channel.
- A sliding-window latency tracker (e.g. last 1 min) emits a `Tier1Degraded { reason: HighLatency }` event when `latency_p99 > 100 ms`.
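The latency tracker and its one-shot degradation signal can be sketched as a bounded window plus a latch. Several simplifications are assumed here: the window is bounded by sample count rather than by wall-clock time, p99 uses the nearest-rank method over a sorted copy, and the shape of `DegradationTransition` is hypothetical.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Transition reported by the latch; steady states produce `None`.
#[derive(Debug, PartialEq, Eq)]
pub enum DegradationTransition {
    Degraded,
    Recovered,
}

/// Hypothetical fixed-size window over the most recent response latencies.
pub struct LatencyWindow {
    samples: VecDeque<Duration>,
    capacity: usize,
    threshold: Duration,
    degraded: bool,
}

impl LatencyWindow {
    pub fn new(capacity: usize, threshold: Duration) -> Self {
        Self {
            samples: VecDeque::new(),
            capacity: capacity.max(1),
            threshold,
            degraded: false,
        }
    }

    /// Nearest-rank p99 of the current window, or `None` when it is empty.
    pub fn p99(&self) -> Option<Duration> {
        if self.samples.is_empty() {
            return None;
        }
        let mut sorted: Vec<Duration> = self.samples.iter().copied().collect();
        sorted.sort();
        let rank = (sorted.len() * 99 + 99) / 100; // ceil(0.99 * n), 1-based
        Some(sorted[rank - 1])
    }

    /// Record one sample and report a transition only when the degraded
    /// state flips, so a sustained breach yields a single `Degraded` event.
    pub fn record(&mut self, latency: Duration) -> Option<DegradationTransition> {
        if self.samples.len() == self.capacity {
            self.samples.pop_front();
        }
        self.samples.push_back(latency);
        let breached = self.p99().map_or(false, |p| p > self.threshold);
        match (self.degraded, breached) {
            (false, true) => {
                self.degraded = true;
                Some(DegradationTransition::Degraded)
            }
            (true, false) => {
                self.degraded = false;
                Some(DegradationTransition::Recovered)
            }
            _ => None,
        }
    }
}
```

Sorting a copy on every sample is simple but costs O(n log n) per response; for a large window, a histogram or a selection algorithm may be needed to stay inside the 1 ms validation budget.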
## Scope
### Included
- Schema validation hook on every response.
- `model_version` tracker.
- Sliding-window latency tracker + degradation signal.
### Excluded
- The reaction to `Tier1Degraded` (lives in `scan_controller`).
- The schema definition itself (lives in the contract).
## Acceptance Criteria
**AC-1: Schema mismatch surfaces as hard error**
Given the fixture server returns a `DetectionBatch` with an unknown field type
When the client validates the response
Then a hard error is emitted on the output channel and `errors_by_kind{kind="schema_mismatch"}` increments by 1.
**AC-2: Model version change is signalled**
Given the server reports `model_version = "v1.2"` on initial stream open
When a subsequent response reports `model_version = "v1.3"`
Then exactly one `ModelVersionChanged("v1.3")` event is emitted.
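The "exactly one event" behaviour in AC-2 amounts to a latch over the last version seen. A minimal sketch follows, assuming a hypothetical `ModelVersionTracker` type whose `observe` method returns the new version only on a real change; the first observation is treated as a baseline and emits nothing.

```rust
/// Hypothetical latch over the last `model_version` seen on the stream.
#[derive(Default)]
pub struct ModelVersionTracker {
    last_model_version: Option<String>,
}

impl ModelVersionTracker {
    /// Observe the version carried by a response. Returns `Some(new_version)`
    /// only when it differs from a previously recorded version; the first
    /// observation just records the baseline.
    pub fn observe(&mut self, version: &str) -> Option<String> {
        let previous = self.last_model_version.as_deref();
        // A change requires an earlier version to compare against.
        let changed = matches!(previous, Some(prev) if prev != version);
        if previous != Some(version) {
            self.last_model_version = Some(version.to_owned());
        }
        changed.then(|| version.to_owned())
    }
}
```

A caller would map a `Some(v)` result to a `ModelVersionChanged(v)` event on the output channel; repeated responses carrying the same version produce nothing further.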
**AC-3: Latency degradation signal**
Given the server's response latency rises to 150 ms p99 over a 1-min window
When the latency tracker evaluates
Then `Tier1Degraded { reason: HighLatency }` is emitted exactly once until latency falls back below 100 ms.
## Non-Functional Requirements
**Performance**
- Validation overhead: ≤1 ms per response.
**Reliability**
- Schema mismatches never silent.
## Runtime Completeness
- **Named capability**: response schema validation + model-version awareness + latency-degradation signal.
- **Production code that must exist**: real schema validation; real model-version tracker; real percentile tracker.
- **Unacceptable substitutes**: silently downcasting an unknown response shape is unacceptable.
@@ -0,0 +1,68 @@
# Batch 18 — Cycle 1 Implementation Report
**Tasks**: AZ-659, AZ-660, AZ-661
**Completed**: 2026-05-20
**Status**: All tests pass; code review PASS_WITH_WARNINGS; committed `0854d3b`
---
## AZ-659 — frame_ingest publisher (3 pts)
**Files added/changed**:
- `crates/frame_ingest/src/internal/publisher.rs` — `FramePublisher`, `FrameReceiver`, `ConsumerId`, `PublisherStats`
- `crates/frame_ingest/src/internal/mod.rs` — exports `publisher`
- `crates/frame_ingest/src/lib.rs` — `FrameIngestHandle` extended with `subscribe_as`, `publisher`, `dropped_frames`, `publishes_total`
- `crates/frame_ingest/tests/publisher.rs` — AC-1/2/3 integration tests
**ACs**: All passing.
---
## AZ-660 — detection_client gRPC bi-directional stream (5 pts)
**Files added/changed**:
- `crates/detection_client/Cargo.toml` — added `tonic`, `prost`, `tonic-prost-build`, `protoc-bin-vendored`
- `crates/detection_client/build.rs` — proto codegen via `tonic-prost-build`
- `crates/detection_client/proto/detections.proto` — gRPC contract (FrameRequest / DetectionResponse bi-di stream)
- `crates/detection_client/src/internal/mod.rs` — module registry
- `crates/detection_client/src/internal/proto.rs` — generated code re-export
- `crates/detection_client/src/internal/budget.rs` — `BudgetTracker` (drop-oldest `VecDeque`, default capacity 2)
- `crates/detection_client/src/internal/stats.rs` — `DetectionStats` (lock-free `AtomicU64` counters)
- `crates/detection_client/src/internal/runtime.rs` — supervisor + `run_stream_session` with bounded backoff reconnect
- `crates/detection_client/src/lib.rs` — `DetectionClient`, `DetectionClientConfig`, `DetectionClientHandle`, `DetectionEvent`, `ConnectionState`
- `crates/detection_client/tests/stream.rs` — AC-1/2/3/4 integration tests (fixture in-process gRPC server)
**ACs**: All passing.
---
## AZ-661 — schema validation + model_version + latency degradation (2 pts)
Implemented inside the same `detection_client` crate (AZ-660 and AZ-661 share the same modules):
- `src/internal/latency.rs` — `LatencyWindow` ring-buffer + `DegradationTransition` latch
- `src/internal/runtime.rs::handle_response` — schema version check, model_version latch, Tier1 degradation evaluation after every response
- `crates/detection_client/tests/stream.rs` — AC-1/2/3 integration tests
**ACs**: All passing.
---
## Code Review
**Verdict**: PASS_WITH_WARNINGS — see `_docs/03_implementation/reviews/batch_18_review.md`.
Findings:
- F1 (Medium, fixed): dead code in `handle_response` (`let now`, `let _ = in_flight`) removed.
- F2-F4: Low findings; no action required this batch.
---
## Architecture / Doc Updates
- `_docs/02_document/module-layout.md` — `frame_ingest` and `detection_client` sections updated to reflect actual streaming API.
---
## Remaining tasks in `todo/`
9 tasks remaining across 3 components (movement_detector, semantic_analyzer, scan_controller).
@@ -0,0 +1,85 @@
# Cumulative Code Review — Batches 16-18 (Cycle 1)
**Scope**: AZ-658, AZ-680, AZ-681, AZ-659, AZ-660, AZ-661
**Date**: 2026-05-20
**Overall Verdict**: PASS_WITH_WARNINGS
---
## Scope Summary
| Batch | Tasks | Components |
|-------|-------|-----------|
| 16 | AZ-658 frame_ingest decoder | frame_ingest |
| 17 | AZ-680 operator_bridge command dispatch; AZ-681 safety+BIT ack | shared, scan_controller, mission_executor, operator_bridge |
| 18 | AZ-659 frame_ingest publisher; AZ-660 detection_client gRPC stream; AZ-661 schema+health | frame_ingest, detection_client |
---
## Cross-Batch Architecture Consistency
### Layer compliance (all batches)
No layer violations found across batches 16-18. Every crate imports only `shared` (Layer 1) for cross-component types. Cross-component dispatch uses traits in `shared::contracts`. The `detection_client` receives a `broadcast::Receiver<Frame>` injected by the composition root — it does not import `frame_ingest`.
### Pattern consistency
| Pattern | Batches 16-18 usage |
|---------|---------------------|
| Async actor model | All components expose `run()` → `JoinHandle` + `Handle`. ✓ |
| `shared::models` for data | `Frame`, `DetectionBatch`, `BoundingBox`, `Detection` all come from `shared`. ✓ |
| `shared::contracts` for cross-cutting dispatch | `ScanCommandRouter`, `MissionSafetyRouter`, `BitReportSeverityLookup` added in batch 17; `detection_client` and `frame_ingest` do not need new traits. ✓ |
| Lock-free counters | `AtomicU64` used uniformly across `detection_client::DetectionStats`, `frame_ingest::PublisherStats`. ✓ |
| Broadcast channels for fan-out | Batch 18 adds `FramePublisher` (wrapping `tokio::sync::broadcast`) for the frame pipeline; consistent with the existing telemetry broadcast pattern. ✓ |
### Interface wiring readiness
The composition root (`crates/autopilot/src/runtime.rs`) still needs to wire:
- `frame_ingest.handle().subscribe_as(ConsumerId::DetectionClient)` → raw receiver forwarded to `DetectionClient::run(frame_rx)`
- `detection_client_handle.subscribe_events()` → event receiver forwarded to `scan_controller` and `telemetry_stream`
Neither wiring is in scope for batches 16-18 — they belong to the final runtime composition task. No interface mismatch found.
---
## Findings (cumulative, deduplicated)
| # | Severity | Category | File:Line | Title | Batch | Disposition |
|---|----------|----------|-----------|-------|-------|-------------|
| 1 | Low | Architecture | `detection_client/src/lib.rs` | `pub mod internal` exposes proto server types to external crates | 18 | Accepted: required for integration test fixture server; practical risk negligible |
| 2 | Low | Maintainability | `detection_client/src/internal/stats.rs:66` | `note_orphan_response` increments `stream_errors_total` — imprecise bucket | 18 | Accepted: additive counter, low severity; add `orphan_responses_total` in next stats refactor |
| 3 | Low | Performance | `detection_client/src/internal/runtime.rs:build_request` | Pixel buffer copy per gRPC frame | 18 | Accepted: unavoidable with current prost stack; revisit when `prost bytes` feature is evaluated |
| 4 | Low | Architecture | `crates/autopilot/src/runtime.rs:84` | Pre-existing dead-code lint on `vlm_provider_name` | 16 | Pre-existing; tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` |
**Critical**: 0 | **High**: 0 | **Medium**: 0 (one Medium from batch 18 was fixed inline)
---
## Per-Batch Review Cross-Reference
| Batch | Per-batch verdict | Findings fixed | Open low/med |
|-------|------------------|----------------|-------------|
| 16 | PASS_WITH_WARNINGS | — | 1 Low (FFmpeg EAGAIN string match), 1 Low (autopilot dead-code) |
| 17 | PASS | — | None |
| 18 | PASS_WITH_WARNINGS | F1 Medium (dead code) fixed inline | 3 Low accepted |
---
## Open Risks
1. **`mission_executor` polling race** — `ac1_multirotor_happy_path_reaches_done` (and the earlier `ac3`) intermittently fail under load. Tracked in `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`. Not a production defect; fix in the next `mission_executor` batch.
2. **Composition root wiring gap** — `frame_ingest` publisher and `detection_client` supervisor are not yet wired in `autopilot/src/runtime.rs`. This is expected and intentional; the composition root is wired in a dedicated final-assembly task once all leaf components are done.
3. **Real `../detections` service not tested** — `detection_client` tests use a fixture in-process gRPC server. End-to-end integration against the real service is scoped to the suite-level e2e harness.
---
## Quality Gate Status (batches 16-18 combined)
- `cargo fmt --all`: clean
- `cargo clippy -p frame_ingest -p detection_client --all-targets -- -D warnings`: clean
- `cargo test -p frame_ingest -p detection_client`: all passing (17 unit + 3 publisher + 5 rtsp_lifecycle + 10 detection_client unit + 7 detection_client integration)
- `cargo test --workspace`: one pre-existing flake in `mission_executor` (documented, not blocking)
**Verdict: PASS_WITH_WARNINGS — no Critical or High findings; proceed to batch 19.**
+10 -10
@@ -7,23 +7,23 @@ name: Implement
status: between-batches
sub_step:
phase: 0
name: batch-18-select
name: batch-19-select
detail: ""
retry_count: 0
cycle: 1
tracker: jira
## Last Completed Batch
batch: 17
commit: c4eff40
ticket: AZ-680, AZ-681
jira_status: In Testing (confirmed via read-back for both)
report: _docs/03_implementation/batch_17_cycle1_report.md
cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md
batch: 18
commit: 0854d3b
ticket: AZ-659, AZ-660, AZ-661
jira_status: In Testing (confirmed via read-back for all three)
report: _docs/03_implementation/batch_18_cycle1_report.md
cumulative_review: _docs/03_implementation/cumulative_review_batches_16-18_cycle1_report.md
## Process Leftovers
- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — still pending; out-of-scope for batch 17
- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — still pending; flake also hit `ac1` during batch 17 workspace run
- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — still pending; out-of-scope for batch 18
- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — still pending; fix when next mission_executor batch lands
## Cumulative Review Cadence
Last cumulative: batches 13-15. Next due: end of batch 18.
Last cumulative: batches 16-18. Next due: end of batch 21 (or sooner if a large-scope batch warrants it).