diff --git a/_docs/02_tasks/done/AZ-659_frame_ingest_publisher.md b/_docs/02_tasks/done/AZ-659_frame_ingest_publisher.md new file mode 100644 index 0000000..c30508c --- /dev/null +++ b/_docs/02_tasks/done/AZ-659_frame_ingest_publisher.md @@ -0,0 +1,64 @@ +# Multi-Consumer Frame Publisher + Back-Pressure Drops + +**Task**: AZ-659_frame_ingest_publisher +**Name**: Tokio broadcast publisher + per-consumer drop counters + zero-copy `Arc` +**Description**: Publish `Frame`s through a single multi-consumer channel using `Arc` for pixel data so consumers do not copy. Drop frames when downstream consumers fall behind beyond a configured queue depth; record per-consumer drop counters with reason tags. +**Complexity**: 3 points +**Dependencies**: AZ-640_initial_structure, AZ-657_frame_ingest_rtsp_session, AZ-658_frame_ingest_decoder +**Component**: frame_ingest +**Tracker**: AZ-659 +**Epic**: AZ-627 + +## Problem + +Three downstream consumers (`detection_client`, `movement_detector`, `telemetry_stream`) all need the same frames at the same rate. A single-consumer queue would serialise the slowest; a per-consumer fan-out with cloned pixel buffers would multiply memory. The right structure is a Tokio `broadcast` channel (or equivalent) carrying `Arc` so pixels are shared by reference. Slow consumers drop their oldest frame, with the drop counted (and reason-tagged) — never silently coalesced. + +## Outcome + +- `FramePublisher::subscribe() -> FrameReceiver` returns a per-consumer receiver. +- `Frame` carries `Arc` for `pixels` so consumers do not copy. +- When a consumer falls behind beyond `channel_depth` (configurable, default 4), the oldest frame is dropped for THAT consumer; per-consumer counters increment with reason tag (`{detection_client_slow, movement_detector_slow, telemetry_slow}`). +- Health surface: per-consumer drop counters, total publish count. + +## Scope + +### Included +- `tokio::sync::broadcast` (or equivalent) with `Arc` payload. 
+- Per-consumer drop counter (statically known three consumer ids; future-extensible). +- Channel-depth config. + +### Excluded +- RTSP session (task 18). +- Decoder (task 19). + +## Acceptance Criteria + +**AC-1: Three consumers receive every frame at nominal rate** +Given three subscribers consuming at 30 fps and source at 30 fps +When the publisher runs for 10 s +Then each consumer observes ~300 frames; per-consumer drop counters = 0. + +**AC-2: Slow consumer drops, fast consumers unaffected** +Given a slow consumer that yields every 200 ms while source is 30 fps and `channel_depth = 4` +When the publisher runs for 5 s +Then the slow consumer's drop counter increments and fast consumers continue to receive every frame. + +**AC-3: Zero-copy under load** +Given a publisher emitting at 30 fps for 60 s with three subscribers +When peak memory is sampled +Then memory does not scale linearly with consumer count (i.e. `Arc` is correctly shared). + +## Non-Functional Requirements + +**Performance** +- Publish-to-consumer p99 ≤5 ms (helps keep total RTSP-rx-to-publish under the 30 ms p99 budget). + +**Reliability** +- Drops are counted with reason; never silent. +- No unbounded memory growth on slow consumer. + +## Runtime Completeness + +- **Named capability**: lossy multi-consumer frame fan-out with `Arc`. +- **Production code that must exist**: real broadcast channel; real per-consumer drop accounting. +- **Unacceptable substitutes**: cloning pixel buffers per consumer is unacceptable (multiplies memory); blocking the publisher on a slow consumer is unacceptable (gates the whole pipeline). 
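The zero-copy and drop-accounting requirements in AZ-659 can be illustrated with a minimal, std-only sketch. It is not the code in `publisher.rs`. `ConsumerId` and `PublisherStats` are names taken from the batch report, but the `Arc<[u8]>` pixel type, the field layout, and the method names (`note_publish`, `note_drops`, `dropped`) are assumptions made for illustration. With `tokio::sync::broadcast`, a lagging receiver is told how many messages it skipped via `RecvError::Lagged(n)`; a count like that is what would be passed to `note_drops` here.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical frame type: the pixel buffer sits behind an `Arc`, so cloning
/// a `Frame` copies a pointer and bumps a reference count, not the pixels.
#[derive(Clone)]
pub struct Frame {
    pub seq: u64,
    pub pixels: Arc<[u8]>,
}

/// The three statically known consumers named in the task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsumerId {
    DetectionClient,
    MovementDetector,
    TelemetryStream,
}

impl ConsumerId {
    /// Reason tag recorded alongside a drop for this consumer.
    pub fn reason_tag(self) -> &'static str {
        match self {
            ConsumerId::DetectionClient => "detection_client_slow",
            ConsumerId::MovementDetector => "movement_detector_slow",
            ConsumerId::TelemetryStream => "telemetry_slow",
        }
    }
}

/// Lock-free counters: one drop counter per consumer plus a publish total.
#[derive(Default)]
pub struct PublisherStats {
    drops: [AtomicU64; 3],
    publishes_total: AtomicU64,
}

impl PublisherStats {
    /// Called once per published frame.
    pub fn note_publish(&self) {
        self.publishes_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Called when consumer `id` skipped `n` frames because it fell behind.
    pub fn note_drops(&self, id: ConsumerId, n: u64) {
        self.drops[id as usize].fetch_add(n, Ordering::Relaxed);
    }

    pub fn dropped(&self, id: ConsumerId) -> u64 {
        self.drops[id as usize].load(Ordering::Relaxed)
    }

    pub fn publishes_total(&self) -> u64 {
        self.publishes_total.load(Ordering::Relaxed)
    }
}
```

Because every clone of a `Frame` points at the same allocation, memory for pixel data stays roughly constant as consumers are added, which is the property AC-3 checks.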
diff --git a/_docs/02_tasks/done/AZ-660_detection_client_grpc_stream.md b/_docs/02_tasks/done/AZ-660_detection_client_grpc_stream.md new file mode 100644 index 0000000..048ab30 --- /dev/null +++ b/_docs/02_tasks/done/AZ-660_detection_client_grpc_stream.md @@ -0,0 +1,77 @@ +# Detection gRPC Bi-Directional Stream + Frame Budgeting + +**Task**: AZ-660_detection_client_grpc_stream +**Name**: Bi-directional gRPC stream to ../detections + drop-oldest frame budgeting +**Description**: Single bi-directional gRPC stream to the external `../detections` service. Reconnect on stream loss with bounded exponential backoff. Frame budgeting: drop older in-flight frames if a new frame arrives before the previous response, respecting the Tier-1 ≤100 ms/frame target. +**Complexity**: 5 points +**Dependencies**: AZ-640_initial_structure, AZ-659_frame_ingest_publisher +**Component**: detection_client +**Tracker**: AZ-660 +**Epic**: AZ-628 + +## Problem + +`detection_client` is the only autopilot component talking to `../detections`. The contract is a bi-directional gRPC stream; the client must maintain it (reconnect with bounded backoff), respect the Tier-1 latency target by NOT queueing frames indefinitely (drop-oldest in-flight when a newer frame arrives), and never block the upstream `frame_ingest` publisher. + +## Outcome + +- `DetectionClient::run(frame_rx)` maintains one bi-directional gRPC stream to `../detections`; reconnect on stream loss with exponential backoff capped at 30 s. +- Outbound: send each `Frame` (skipping `ai_locked` ones) up to `max_concurrent_in_flight` (default 2); drop older in-flight frames when the budget is full and a new frame arrives (logged as `budget_drop`). +- Inbound: receive `DetectionBatch` and publish on the output channel; tag with the source frame's `monotonic_ts`. +- Health surface: `gRPC_connection_state`, `requests_in_flight`, `latency_p50/p99`, `errors_by_kind`, `budget_drops_total`. 
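The "exponential backoff capped at 30 s" in the Outcome above can be sketched as follows. This is a std-only illustration, not the supervisor in `runtime.rs`: the `Backoff` type, its method names, and the 250 ms base used in the usage note are assumptions, and jitter is omitted for brevity.

```rust
use std::time::Duration;

/// Hypothetical reconnect backoff: the delay doubles per failed attempt and
/// never exceeds `cap`.
pub struct Backoff {
    base: Duration,
    cap: Duration,
    attempt: u32,
}

impl Backoff {
    pub fn new(base: Duration, cap: Duration) -> Self {
        Self { base, cap, attempt: 0 }
    }

    /// Returns the delay to sleep before the next reconnect attempt.
    pub fn next_delay(&mut self) -> Duration {
        // base * 2^attempt, using saturating arithmetic so a long outage
        // cannot overflow, then clamped to the cap.
        let factor = 2u32.saturating_pow(self.attempt);
        let delay = self.base.saturating_mul(factor).min(self.cap);
        self.attempt = self.attempt.saturating_add(1);
        delay
    }

    /// Call after a successful reconnect so the next outage starts from `base`.
    pub fn reset(&mut self) {
        self.attempt = 0;
    }
}
```

With `Backoff::new(Duration::from_millis(250), Duration::from_secs(30))` the delays run 250 ms, 500 ms, 1 s, 2 s, 4 s, 8 s, 16 s and then stay at 30 s. Resetting on success is what would let a restarted server be picked up again quickly, in line with the ≤2 s reconnect target in AC-2.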
+ +## Scope + +### Included +- `tonic` (or equivalent) gRPC client + bi-directional streaming. +- Reconnect state machine. +- In-flight tracker (sliding window of `frame_seq`). +- Drop-oldest budgeting. + +### Excluded +- Schema validation + model_version handling (task 22). +- The `../detections` service itself (separate repo). + +## Acceptance Criteria + +**AC-1: Happy path against fixture** +Given a fixture gRPC server that returns a `DetectionBatch` per request within 50 ms +When `DetectionClient::run` is started against a 30 fps frame source for 10 s +Then ≥285 `DetectionBatch` are observed on the output channel; latency_p99 ≤100 ms; budget_drops_total = 0. + +**AC-2: Reconnect after server restart** +Given a healthy stream +When the gRPC server is killed and restarted +Then the client reconnects within ≤2 s; subsequent frames flow through. + +**AC-3: Budget drop on slow server** +Given the server takes 200 ms per response and the source is 30 fps +When the client runs for 5 s +Then `budget_drops_total > 0`, frames continue to flow, and the publisher is never blocked. + +**AC-4: ai_locked frames are skipped** +Given a frame stream where every 5th frame has `ai_locked = true` +When the client runs +Then no requests are sent for `ai_locked` frames (observable via outgoing count). + +## Non-Functional Requirements + +**Performance** +- Per-frame round-trip ≤100 ms p99 (Tier-1 NFR; mostly owned by `../detections`). +- Reconnect latency: ≤2 s after `../detections` returns. + +**Reliability** +- Drop-oldest never queues indefinitely. +- Reconnect is bounded. + +## Contract + +- gRPC service contract owner: `../_docs/03_detections.md`. +- Canonical typed model: `data_model.md §Detection`, `§DetectionBatch`. + +## Runtime Completeness + +- **Named capability**: bi-directional gRPC stream against `../detections`. +- **Production code that must exist**: real `tonic` (or equivalent) bi-directional stream; real budgeting. 
+- **Allowed external stubs**: a fixture gRPC server in tests; the real `../detections` for integration. +- **Unacceptable substitutes**: a unary call-per-frame instead of streaming is unacceptable (multiplies per-request overhead). diff --git a/_docs/02_tasks/done/AZ-661_detection_client_schema_and_health.md b/_docs/02_tasks/done/AZ-661_detection_client_schema_and_health.md new file mode 100644 index 0000000..745c683 --- /dev/null +++ b/_docs/02_tasks/done/AZ-661_detection_client_schema_and_health.md @@ -0,0 +1,62 @@ +# Detection Schema Validation + Model-Version + Health + +**Task**: AZ-661_detection_client_schema_and_health +**Name**: Response schema validation + model_version tracking + Tier-1 health degradation signal +**Description**: Validate every `DetectionBatch` response against the schema version the client was built against. Surface a hard error on schema mismatch (never silent downcast). Track `model_version`; on change, surface to `scan_controller` so per-class thresholds can be reloaded. Track sliding-window latency; on `latency_p99 > 100 ms` flip health → yellow so `scan_controller` can degrade to alternate-frame inference. +**Complexity**: 2 points +**Dependencies**: AZ-640_initial_structure, AZ-660_detection_client_grpc_stream +**Component**: detection_client +**Tracker**: AZ-661 +**Epic**: AZ-628 + +## Problem + +Schema drift between `../detections` and autopilot must be caught loudly — not silently downcast. The model version can change at runtime (model swap); when it does, the per-class confidence thresholds may need to be reloaded by `scan_controller`. The Tier-1 latency target (≤100 ms) is mostly owned by `../detections` but autopilot must observe drift and surface health degradation so the scan controller can take action. + +## Outcome + +- Every response is validated against the bundled schema; on mismatch, returns a hard error to the output channel and health → red. 
+- `last_model_version` is tracked; on change, a `ModelVersionChanged(new_version)` event is emitted on the output channel. +- A sliding-window latency tracker (e.g. last 1 min) emits a `Tier1Degraded { reason: HighLatency }` event when `latency_p99 > 100 ms`. + +## Scope + +### Included +- Schema validation hook on every response. +- `model_version` tracker. +- Sliding-window latency tracker + degradation signal. + +### Excluded +- The reaction to `Tier1Degraded` (lives in `scan_controller`). +- The schema definition itself (lives in the contract). + +## Acceptance Criteria + +**AC-1: Schema mismatch surfaces as hard error** +Given the fixture server returns a `DetectionBatch` with an unknown field type +When the client validates the response +Then a hard error is emitted on the output channel and `errors_by_kind{kind="schema_mismatch"}` increments by 1. + +**AC-2: Model version change is signalled** +Given the server reports `model_version = "v1.2"` on initial stream open +When a subsequent response reports `model_version = "v1.3"` +Then exactly one `ModelVersionChanged("v1.3")` event is emitted. + +**AC-3: Latency degradation signal** +Given the server's response latency rises to 150 ms p99 over a 1-min window +When the latency tracker evaluates +Then `Tier1Degraded { reason: HighLatency }` is emitted exactly once until latency falls back below 100 ms. + +## Non-Functional Requirements + +**Performance** +- Validation overhead: ≤1 ms per response. + +**Reliability** +- Schema mismatches never silent. + +## Runtime Completeness + +- **Named capability**: response schema validation + model-version awareness + latency-degradation signal. +- **Production code that must exist**: real schema validation; real model-version tracker; real percentile tracker. +- **Unacceptable substitutes**: silently downcasting an unknown response shape is unacceptable. 
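The "emitted exactly once until latency falls back" behaviour in AC-3 amounts to a latch over a sliding-window percentile. The sketch below is one way to express it under stated assumptions: `LatencyWindow` and `DegradationTransition` are names from the batch report, but the internals shown here are hypothetical. In particular, the window is bounded by sample count rather than by the one-minute time span the task describes, and the p99 uses the nearest-rank method.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Edge emitted when the health state flips; nothing is emitted while the
/// state stays the same.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DegradationTransition {
    Degraded,
    Recovered,
}

/// Hypothetical count-bounded latency window with a degradation latch.
pub struct LatencyWindow {
    samples: VecDeque<Duration>,
    capacity: usize,
    threshold: Duration,
    degraded: bool,
}

impl LatencyWindow {
    pub fn new(capacity: usize, threshold: Duration) -> Self {
        assert!(capacity > 0, "window must hold at least one sample");
        Self {
            samples: VecDeque::with_capacity(capacity),
            capacity,
            threshold,
            degraded: false,
        }
    }

    /// Nearest-rank p99 over the current window; `None` when empty.
    pub fn p99(&self) -> Option<Duration> {
        if self.samples.is_empty() {
            return None;
        }
        let mut sorted: Vec<Duration> = self.samples.iter().copied().collect();
        sorted.sort_unstable();
        // 1-based rank = ceil(0.99 * n), computed in integer arithmetic.
        let rank = (sorted.len() * 99 + 99) / 100;
        Some(sorted[rank - 1])
    }

    /// Records one response latency and reports a transition only on an edge.
    pub fn record(&mut self, sample: Duration) -> Option<DegradationTransition> {
        if self.samples.len() == self.capacity {
            self.samples.pop_front(); // evict the oldest sample
        }
        self.samples.push_back(sample);

        let high = self.p99().map_or(false, |p| p > self.threshold);
        match (self.degraded, high) {
            (false, true) => {
                self.degraded = true;
                Some(DegradationTransition::Degraded)
            }
            (true, false) => {
                self.degraded = false;
                Some(DegradationTransition::Recovered)
            }
            _ => None,
        }
    }
}
```

Sorting a copy of the window on every response is the simplest correct approach. Whether it fits the ≤1 ms validation budget depends on the window size, which this sketch does not try to settle.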
diff --git a/_docs/03_implementation/batch_18_cycle1_report.md b/_docs/03_implementation/batch_18_cycle1_report.md new file mode 100644 index 0000000..f9fdd0f --- /dev/null +++ b/_docs/03_implementation/batch_18_cycle1_report.md @@ -0,0 +1,68 @@ +# Batch 18 — Cycle 1 Implementation Report + +**Tasks**: AZ-659, AZ-660, AZ-661 +**Completed**: 2026-05-20 +**Status**: All tests pass; code review PASS_WITH_WARNINGS; committed `0854d3b` + +--- + +## AZ-659 — frame_ingest publisher (3 pts) + +**Files added/changed**: +- `crates/frame_ingest/src/internal/publisher.rs` — `FramePublisher`, `FrameReceiver`, `ConsumerId`, `PublisherStats` +- `crates/frame_ingest/src/internal/mod.rs` — exports `publisher` +- `crates/frame_ingest/src/lib.rs` — `FrameIngestHandle` extended with `subscribe_as`, `publisher`, `dropped_frames`, `publishes_total` +- `crates/frame_ingest/tests/publisher.rs` — AC-1/2/3 integration tests + +**ACs**: All passing. + +--- + +## AZ-660 — detection_client gRPC bi-directional stream (5 pts) + +**Files added/changed**: +- `crates/detection_client/Cargo.toml` — added `tonic`, `prost`, `tonic-prost-build`, `protoc-bin-vendored` +- `crates/detection_client/build.rs` — proto codegen via `tonic-prost-build` +- `crates/detection_client/proto/detections.proto` — gRPC contract (FrameRequest / DetectionResponse bi-di stream) +- `crates/detection_client/src/internal/mod.rs` — module registry +- `crates/detection_client/src/internal/proto.rs` — generated code re-export +- `crates/detection_client/src/internal/budget.rs` — `BudgetTracker` (drop-oldest VecDeque, default capacity 2) +- `crates/detection_client/src/internal/stats.rs` — `DetectionStats` (lock-free AtomicU64 counters) +- `crates/detection_client/src/internal/runtime.rs` — supervisor + `run_stream_session` with bounded backoff reconnect +- `crates/detection_client/src/lib.rs` — `DetectionClient`, `DetectionClientConfig`, `DetectionClientHandle`, `DetectionEvent`, `ConnectionState` +- 
`crates/detection_client/tests/stream.rs` — AC-1/2/3/4 integration tests (fixture in-process gRPC server) + +**ACs**: All passing. + +--- + +## AZ-661 — schema validation + model_version + latency degradation (2 pts) + +Implemented inside the same `detection_client` crate (AZ-660 and AZ-661 share the same modules): +- `src/internal/latency.rs` — `LatencyWindow` ring-buffer + `DegradationTransition` latch +- `src/internal/runtime.rs::handle_response` — schema version check, model_version latch, Tier1 degradation evaluation after every response +- `crates/detection_client/tests/stream.rs` — AC-1/2/3 integration tests + +**ACs**: All passing. + +--- + +## Code Review + +**Verdict**: PASS_WITH_WARNINGS — see `_docs/03_implementation/reviews/batch_18_review.md`. + +Findings: +- F1 (Medium, fixed): dead code in `handle_response` (`let now`, `let _ = in_flight`) removed. +- F2–F4: Low findings, no action required this batch. + +--- + +## Architecture / Doc Updates + +- `_docs/02_document/module-layout.md` — `frame_ingest` and `detection_client` sections updated to reflect actual streaming API. + +--- + +## Remaining tasks in `todo/` + +9 tasks remaining across 3 components (movement_detector, semantic_analyzer, scan_controller). 
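For readers who have not opened `budget.rs`, the drop-oldest budgeting that the report attributes to `BudgetTracker` (a `VecDeque` with default capacity 2) might look roughly like the sketch below. Only the type name, the `VecDeque`, and the default capacity come from the report. The method names (`admit`, `complete`) and the plain `u64` drop counter are assumptions for illustration; per the report, the real stats use `AtomicU64` counters.

```rust
use std::collections::VecDeque;

/// Hypothetical in-flight tracker: at most `capacity` frame sequence numbers
/// are awaited at once; admitting one more evicts the oldest.
pub struct BudgetTracker {
    in_flight: VecDeque<u64>,
    capacity: usize,
    budget_drops_total: u64,
}

impl BudgetTracker {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "budget must allow at least one in-flight frame");
        Self {
            in_flight: VecDeque::with_capacity(capacity),
            capacity,
            budget_drops_total: 0,
        }
    }

    /// Registers a frame that is about to be sent. If the budget is already
    /// full, the oldest in-flight frame is evicted and returned so the caller
    /// can log it as a `budget_drop`.
    pub fn admit(&mut self, frame_seq: u64) -> Option<u64> {
        let evicted = if self.in_flight.len() >= self.capacity {
            self.budget_drops_total += 1;
            self.in_flight.pop_front()
        } else {
            None
        };
        self.in_flight.push_back(frame_seq);
        evicted
    }

    /// Marks a response as received. Returns `false` when the frame is no
    /// longer tracked, for example because it was evicted before the
    /// response arrived (an orphan response).
    pub fn complete(&mut self, frame_seq: u64) -> bool {
        match self.in_flight.iter().position(|&s| s == frame_seq) {
            Some(idx) => {
                self.in_flight.remove(idx);
                true
            }
            None => false,
        }
    }

    pub fn requests_in_flight(&self) -> usize {
        self.in_flight.len()
    }

    pub fn budget_drops_total(&self) -> u64 {
        self.budget_drops_total
    }
}
```

Evicting on admission rather than queueing keeps the tracker bounded, so a slow server shows up as a rising `budget_drops_total` instead of growing latency, which matches what AC-3 of AZ-660 expects.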
diff --git a/_docs/03_implementation/cumulative_review_batches_16-18_cycle1_report.md b/_docs/03_implementation/cumulative_review_batches_16-18_cycle1_report.md new file mode 100644 index 0000000..6ec3030 --- /dev/null +++ b/_docs/03_implementation/cumulative_review_batches_16-18_cycle1_report.md @@ -0,0 +1,85 @@ +# Cumulative Code Review — Batches 16-18 (Cycle 1) + +**Scope**: AZ-658, AZ-680, AZ-681, AZ-659, AZ-660, AZ-661 +**Date**: 2026-05-20 +**Overall Verdict**: PASS_WITH_WARNINGS + +--- + +## Scope Summary + +| Batch | Tasks | Components | +|-------|-------|-----------| +| 16 | AZ-658 frame_ingest decoder | frame_ingest | +| 17 | AZ-680 operator_bridge command dispatch; AZ-681 safety+BIT ack | shared, scan_controller, mission_executor, operator_bridge | +| 18 | AZ-659 frame_ingest publisher; AZ-660 detection_client gRPC stream; AZ-661 schema+health | frame_ingest, detection_client | + +--- + +## Cross-Batch Architecture Consistency + +### Layer compliance (all batches) + +No layer violations found across batches 16-18. Every crate imports only `shared` (Layer 1) for cross-component types. Cross-component dispatch uses traits in `shared::contracts`. The `detection_client` receives a `broadcast::Receiver` injected by the composition root — it does not import `frame_ingest`. + +### Pattern consistency + +| Pattern | Batches 16-18 usage | +|---------|---------------------| +| Async actor model | All components expose `run()` → `JoinHandle` + `Handle`. ✓ | +| `shared::models` for data | `Frame`, `DetectionBatch`, `BoundingBox`, `Detection` all come from `shared`. ✓ | +| `shared::contracts` for cross-cutting dispatch | `ScanCommandRouter`, `MissionSafetyRouter`, `BitReportSeverityLookup` added in batch 17; `detection_client` and `frame_ingest` do not need new traits. ✓ | +| Lock-free counters | `AtomicU64` used uniformly across `detection_client::DetectionStats`, `frame_ingest::PublisherStats`. 
✓ | +| Broadcast channels for fan-out | Batch 18 adds `FramePublisher` (wrapping `tokio::sync::broadcast`) for the frame pipeline; consistent with the existing telemetry broadcast pattern. ✓ | + +### Interface wiring readiness + +The composition root (`crates/autopilot/src/runtime.rs`) still needs to wire: +- `frame_ingest.handle().subscribe_as(ConsumerId::DetectionClient)` → raw receiver forwarded to `DetectionClient::run(frame_rx)` +- `detection_client_handle.subscribe_events()` → event receiver forwarded to `scan_controller` and `telemetry_stream` + +Neither wiring is in scope for batches 16-18 — they belong to the final runtime composition task. No interface mismatch found. + +--- + +## Findings (cumulative, deduplicated) + +| # | Severity | Category | File:Line | Title | Batch | Disposition | +|---|----------|----------|-----------|-------|-------|-------------| +| 1 | Low | Architecture | `detection_client/src/lib.rs` | `pub mod internal` exposes proto server types to external crates | 18 | Accepted: required for integration test fixture server; practical risk negligible | +| 2 | Low | Maintainability | `detection_client/src/internal/stats.rs:66` | `note_orphan_response` increments `stream_errors_total` — imprecise bucket | 18 | Accepted: additive counter, low severity; add `orphan_responses_total` in next stats refactor | +| 3 | Low | Performance | `detection_client/src/internal/runtime.rs:build_request` | Pixel buffer copy per gRPC frame | 18 | Accepted: unavoidable with current prost stack; revisit when `prost bytes` feature is evaluated | +| 4 | Low | Architecture | `crates/autopilot/src/runtime.rs:84` | Pre-existing dead-code lint on `vlm_provider_name` | 16 | Pre-existing; tracked in `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` | + +**Critical**: 0 | **High**: 0 | **Medium**: 0 (one Medium from batch 18 was fixed inline) + +--- + +## Per-Batch Batch Review Cross-Reference + +| Batch | Per-batch verdict | Findings fixed | Open low/med | 
+|-------|------------------|----------------|-------------| +| 16 | PASS_WITH_WARNINGS | — | 1 Low (FFmpeg EAGAIN string match), 1 Low (autopilot dead-code) | +| 17 | PASS | — | None | +| 18 | PASS_WITH_WARNINGS | F1 Medium (dead code) fixed inline | 3 Low accepted | + +--- + +## Open Risks + +1. **`mission_executor` polling race** — `ac1_multirotor_happy_path_reaches_done` (and the earlier `ac3`) intermittently fail under load. Tracked in `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md`. Not a production defect; fix in the next `mission_executor` batch. + +2. **Composition root wiring gap** — `frame_ingest` publisher and `detection_client` supervisor are not yet wired in `autopilot/src/runtime.rs`. This is expected and intentional; the composition root is wired in a dedicated final-assembly task once all leaf components are done. + +3. **Real `../detections` service not tested** — `detection_client` tests use a fixture in-process gRPC server. End-to-end integration against the real service is scoped to the suite-level e2e harness. 
+ +--- + +## Quality Gate Status (batches 16-18 combined) + +- `cargo fmt --all`: clean +- `cargo clippy -p frame_ingest -p detection_client --all-targets -- -D warnings`: clean +- `cargo test -p frame_ingest -p detection_client`: all passing (17 unit + 3 publisher + 5 rtsp_lifecycle + 10 detection_client unit + 7 detection_client integration) +- `cargo test --workspace`: one pre-existing flake in `mission_executor` (documented, not blocking) + +**Verdict: PASS_WITH_WARNINGS — no Critical or High findings; proceed to batch 19.** diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 4605a32..013f75e 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -7,23 +7,23 @@ name: Implement status: between-batches sub_step: phase: 0 - name: batch-18-select + name: batch-19-select detail: "" retry_count: 0 cycle: 1 tracker: jira ## Last Completed Batch -batch: 17 -commit: c4eff40 -ticket: AZ-680, AZ-681 -jira_status: In Testing (confirmed via read-back for both) -report: _docs/03_implementation/batch_17_cycle1_report.md -cumulative_review: _docs/03_implementation/cumulative_review_batches_13-15_cycle1_report.md +batch: 18 +commit: 0854d3b +ticket: AZ-659, AZ-660, AZ-661 +jira_status: In Testing (confirmed via read-back for all three) +report: _docs/03_implementation/batch_18_cycle1_report.md +cumulative_review: _docs/03_implementation/cumulative_review_batches_16-18_cycle1_report.md ## Process Leftovers -- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — still pending; out-of-scope for batch 17 -- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — still pending; flake also hit `ac1` during batch 17 workspace run +- `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — still pending; out-of-scope for batch 18 +- `_docs/_process_leftovers/2026-05-20_mission_executor_ac3_flake.md` — still pending; fix when next mission_executor batch lands ## Cumulative Review Cadence -Last cumulative: batches 13–15. 
Next due: end of batch 18. +Last cumulative: batches 16–18. Next due: end of batch 21 (or sooner if a large-scope batch warrants it).