Mirror of https://github.com/azaion/autopilot.git, synced 2026-06-21 08:31:10 +00:00
chore: clean up batch 18 todo stubs
Co-authored-by: Cursor <cursoragent@cursor.com>
@@ -1,64 +0,0 @@
# Multi-Consumer Frame Publisher + Back-Pressure Drops

**Task**: AZ-659_frame_ingest_publisher
**Name**: Tokio broadcast publisher + per-consumer drop counters + zero-copy `Arc<Bytes>`
**Description**: Publish `Frame`s through a single multi-consumer channel using `Arc<Bytes>` for pixel data so consumers do not copy. Drop frames when downstream consumers fall behind beyond a configured queue depth; record per-consumer drop counters with reason tags.
**Complexity**: 3 points
**Dependencies**: AZ-640_initial_structure, AZ-657_frame_ingest_rtsp_session, AZ-658_frame_ingest_decoder
**Component**: frame_ingest
**Tracker**: AZ-659
**Epic**: AZ-627

## Problem

Three downstream consumers (`detection_client`, `movement_detector`, `telemetry_stream`) all need the same frames at the same rate. A single-consumer queue would serialise all three behind the slowest; a per-consumer fan-out with cloned pixel buffers would multiply memory. The right structure is a Tokio `broadcast` channel (or equivalent) carrying `Arc<Bytes>` so pixels are shared by reference. Slow consumers drop their oldest frame, with the drop counted (and reason-tagged), never silently coalesced.

## Outcome

- `FramePublisher::subscribe() -> FrameReceiver` returns a per-consumer receiver.
- `Frame` carries `Arc<Bytes>` for `pixels` so consumers do not copy.
- When a consumer falls behind beyond `channel_depth` (configurable, default 4), the oldest frame is dropped for THAT consumer; per-consumer counters increment with reason tag (`{detection_client_slow, movement_detector_slow, telemetry_slow}`).
- Health surface: per-consumer drop counters, total publish count.
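
The outcome above can be modelled with the standard library alone. The sketch below is a rough illustration, not the task's implementation: it uses one bounded queue per consumer instead of `tokio::sync::broadcast`, `Arc<[u8]>` as a stand-in for `Arc<Bytes>`, and a hypothetical `subscribe(reason)` signature that takes the reason tag as an argument. With Tokio's broadcast channel, the same counter would presumably be fed from the `RecvError::Lagged(n)` value that a lagging receiver gets back from `recv()`.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical frame type: pixel data is shared by reference, never copied.
#[derive(Clone)]
pub struct Frame {
    pub seq: u64,
    pub pixels: Arc<[u8]>, // stand-in for `Arc<Bytes>`
}

/// Per-consumer state: a bounded queue plus a reason-tagged drop counter.
struct Slot {
    reason: &'static str,
    queue: Mutex<VecDeque<Frame>>,
    drops: AtomicU64,
}

pub struct FramePublisher {
    depth: usize,
    slots: Vec<Arc<Slot>>,
    published: AtomicU64,
}

pub struct FrameReceiver {
    slot: Arc<Slot>,
}

impl FramePublisher {
    pub fn new(channel_depth: usize) -> Self {
        // A depth of zero would make the queue unbounded below, so clamp to 1.
        Self { depth: channel_depth.max(1), slots: Vec::new(), published: AtomicU64::new(0) }
    }

    /// Assumption: the reason tag is supplied at subscription time.
    pub fn subscribe(&mut self, reason: &'static str) -> FrameReceiver {
        let slot = Arc::new(Slot {
            reason,
            queue: Mutex::new(VecDeque::new()),
            drops: AtomicU64::new(0),
        });
        self.slots.push(Arc::clone(&slot));
        FrameReceiver { slot }
    }

    /// Never waits for a slow consumer: a full queue loses its oldest frame.
    pub fn publish(&self, frame: Frame) {
        self.published.fetch_add(1, Ordering::Relaxed);
        for slot in &self.slots {
            let mut queue = slot.queue.lock().unwrap();
            if queue.len() >= self.depth {
                queue.pop_front();
                slot.drops.fetch_add(1, Ordering::Relaxed);
            }
            queue.push_back(frame.clone()); // clones the Arc handle, not the pixels
        }
    }

    pub fn total_published(&self) -> u64 {
        self.published.load(Ordering::Relaxed)
    }
}

impl FrameReceiver {
    /// Non-blocking receive of the oldest queued frame, if any.
    pub fn try_recv(&self) -> Option<Frame> {
        self.slot.queue.lock().unwrap().pop_front()
    }

    /// Health surface for this consumer: (reason tag, drop count).
    pub fn drops(&self) -> (&'static str, u64) {
        (self.slot.reason, self.slot.drops.load(Ordering::Relaxed))
    }
}
```

A consumer that never reads while ten frames are published at depth 4 ends up with six counted drops and frames 6 to 9 still queued, while a consumer that keeps up sees every frame and a zero counter.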

## Scope

### Included
- `tokio::sync::broadcast` (or equivalent) with `Arc<Bytes>` payload.
- Per-consumer drop counter (statically known three consumer ids; future-extensible).
- Channel-depth config.

### Excluded
- RTSP session (task 18).
- Decoder (task 19).

## Acceptance Criteria

**AC-1: Three consumers receive every frame at nominal rate**
Given three subscribers consuming at 30 fps and source at 30 fps
When the publisher runs for 10 s
Then each consumer observes ~300 frames; per-consumer drop counters = 0.

**AC-2: Slow consumer drops, fast consumers unaffected**
Given a slow consumer that yields every 200 ms while source is 30 fps and `channel_depth = 4`
When the publisher runs for 5 s
Then the slow consumer's drop counter increments and fast consumers continue to receive every frame.

**AC-3: Zero-copy under load**
Given a publisher emitting at 30 fps for 60 s with three subscribers
When peak memory is sampled
Then memory does not scale linearly with consumer count (i.e. `Arc<Bytes>` is correctly shared).
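
Besides sampling peak memory, sharing can be checked directly on the handles. The sketch below is one possible complement to AC-3, assuming `Arc<[u8]>` as a stand-in for `Arc<Bytes>`; `fan_out` and `all_share_one_buffer` are hypothetical helper names, not part of the task.

```rust
use std::sync::Arc;

/// Hands one handle per consumer to the same pixel allocation.
pub fn fan_out(pixels: &Arc<[u8]>, consumers: usize) -> Vec<Arc<[u8]>> {
    (0..consumers).map(|_| Arc::clone(pixels)).collect()
}

/// Returns true when every handle points at the same allocation.
pub fn all_share_one_buffer(handles: &[Arc<[u8]>]) -> bool {
    handles.windows(2).all(|pair| Arc::ptr_eq(&pair[0], &pair[1]))
}
```

With three consumers, `Arc::strong_count` on the source buffer reads 4 (the publisher's handle plus three clones), whereas a buffer rebuilt with `to_vec()` fails the `Arc::ptr_eq` check because it lives in a separate allocation.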

## Non-Functional Requirements

**Performance**
- Publish-to-consumer p99 ≤5 ms (helps keep total RTSP-rx-to-publish under the 30 ms p99 budget).

**Reliability**
- Drops are counted with reason; never silent.
- No unbounded memory growth on slow consumer.

## Runtime Completeness

- **Named capability**: lossy multi-consumer frame fan-out with `Arc<Bytes>`.
- **Production code that must exist**: real broadcast channel; real per-consumer drop accounting.
- **Unacceptable substitutes**: cloning pixel buffers per consumer is unacceptable (multiplies memory); blocking the publisher on a slow consumer is unacceptable (gates the whole pipeline).
@@ -1,77 +0,0 @@
# Detection gRPC Bi-Directional Stream + Frame Budgeting

**Task**: AZ-660_detection_client_grpc_stream
**Name**: Bi-directional gRPC stream to ../detections + drop-oldest frame budgeting
**Description**: Single bi-directional gRPC stream to the external `../detections` service. Reconnect on stream loss with bounded exponential backoff. Frame budgeting: drop older in-flight frames if a new frame arrives before the previous response, respecting the Tier-1 ≤100 ms/frame target.
**Complexity**: 5 points
**Dependencies**: AZ-640_initial_structure, AZ-659_frame_ingest_publisher
**Component**: detection_client
**Tracker**: AZ-660
**Epic**: AZ-628

## Problem

`detection_client` is the only autopilot component talking to `../detections`. The contract is a bi-directional gRPC stream; the client must maintain it (reconnect with bounded backoff), respect the Tier-1 latency target by NOT queueing frames indefinitely (drop-oldest in-flight when a newer frame arrives), and never block the upstream `frame_ingest` publisher.

## Outcome

- `DetectionClient::run(frame_rx)` maintains one bi-directional gRPC stream to `../detections`; reconnect on stream loss with exponential backoff capped at 30 s.
- Outbound: send each `Frame` (skipping `ai_locked` ones) up to `max_concurrent_in_flight` (default 2); drop older in-flight frames when the budget is full and a new frame arrives (logged as `budget_drop`).
- Inbound: receive `DetectionBatch` and publish on the output channel; tag with the source frame's `monotonic_ts`.
- Health surface: `gRPC_connection_state`, `requests_in_flight`, `latency_p50/p99`, `errors_by_kind`, `budget_drops_total`.
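
The outbound budgeting rule can be sketched as a small in-flight tracker over `frame_seq` values. This is a minimal illustration under stated assumptions: `InFlightTracker`, `Offer`, `offer`, and `complete` are hypothetical names, and "dropping" an in-flight frame is taken to mean that the client stops waiting for it and treats any late response as stale.

```rust
use std::collections::VecDeque;

/// What happened to a frame offered to the tracker.
#[derive(Debug, PartialEq)]
pub enum Offer {
    /// `ai_locked` frame: no request is sent.
    Skipped,
    /// Sent with budget to spare.
    Sent,
    /// Sent after evicting the oldest in-flight `frame_seq` (a `budget_drop`).
    SentEvicting(u64),
}

/// Sliding window of `frame_seq` values that still await a response.
pub struct InFlightTracker {
    max_in_flight: usize,
    window: VecDeque<u64>,
    budget_drops_total: u64,
}

impl InFlightTracker {
    pub fn new(max_concurrent_in_flight: usize) -> Self {
        Self {
            max_in_flight: max_concurrent_in_flight.max(1),
            window: VecDeque::new(),
            budget_drops_total: 0,
        }
    }

    /// Registers an outgoing frame, evicting the oldest one if the budget is full.
    pub fn offer(&mut self, frame_seq: u64, ai_locked: bool) -> Offer {
        if ai_locked {
            return Offer::Skipped;
        }
        let evicted = if self.window.len() >= self.max_in_flight {
            self.budget_drops_total += 1;
            self.window.pop_front()
        } else {
            None
        };
        self.window.push_back(frame_seq);
        match evicted {
            Some(seq) => Offer::SentEvicting(seq),
            None => Offer::Sent,
        }
    }

    /// Marks a response as received. Returns false for a stale response,
    /// that is, one whose frame was already evicted from the window.
    pub fn complete(&mut self, frame_seq: u64) -> bool {
        match self.window.iter().position(|&seq| seq == frame_seq) {
            Some(index) => {
                self.window.remove(index);
                true
            }
            None => false,
        }
    }

    pub fn requests_in_flight(&self) -> usize {
        self.window.len()
    }

    pub fn budget_drops_total(&self) -> u64 {
        self.budget_drops_total
    }
}
```

Because `offer` never waits, the caller can keep draining its frame receiver at source rate, which is what keeps the upstream publisher from being blocked.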

## Scope

### Included
- `tonic` (or equivalent) gRPC client + bi-directional streaming.
- Reconnect state machine.
- In-flight tracker (sliding window of `frame_seq`).
- Drop-oldest budgeting.

### Excluded
- Schema validation + model_version handling (task 22).
- The `../detections` service itself (separate repo).

## Acceptance Criteria

**AC-1: Happy path against fixture**
Given a fixture gRPC server that returns a `DetectionBatch` per request within 50 ms
When `DetectionClient::run` is started against a 30 fps frame source for 10 s
Then ≥285 `DetectionBatch` messages are observed on the output channel; latency_p99 ≤100 ms; budget_drops_total = 0.

**AC-2: Reconnect after server restart**
Given a healthy stream
When the gRPC server is killed and restarted
Then the client reconnects within 2 s; subsequent frames flow through.

**AC-3: Budget drop on slow server**
Given the server takes 200 ms per response and the source is 30 fps
When the client runs for 5 s
Then `budget_drops_total > 0`, frames continue to flow, and the publisher is never blocked.

**AC-4: ai_locked frames are skipped**
Given a frame stream where every 5th frame has `ai_locked = true`
When the client runs
Then no requests are sent for `ai_locked` frames (observable via outgoing count).

## Non-Functional Requirements

**Performance**
- Per-frame round-trip ≤100 ms p99 (Tier-1 NFR; mostly owned by `../detections`).
- Reconnect latency: ≤2 s after `../detections` returns.

**Reliability**
- Drop-oldest never queues indefinitely.
- Reconnect is bounded.
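
A bounded reconnect schedule can be sketched as a doubling delay clamped to the 30 s cap named in the Outcome. The 250 ms base delay and the `Backoff` type are assumptions for illustration only; jitter is omitted for brevity.

```rust
use std::time::Duration;

/// Exponential backoff: base * 2^attempt, clamped to `cap`.
pub struct Backoff {
    base: Duration,
    cap: Duration,
    attempt: u32,
}

impl Backoff {
    pub fn new(base: Duration, cap: Duration) -> Self {
        Self { base, cap, attempt: 0 }
    }

    /// Delay to wait before the next reconnect attempt.
    pub fn next_delay(&mut self) -> Duration {
        // Saturating arithmetic keeps very large attempt counts from overflowing.
        let factor = 2u32.saturating_pow(self.attempt);
        let delay = self.base.saturating_mul(factor).min(self.cap);
        self.attempt = self.attempt.saturating_add(1);
        delay
    }

    /// Called once the stream is healthy again.
    pub fn reset(&mut self) {
        self.attempt = 0;
    }
}
```

With a 250 ms base, the delays run 250 ms, 500 ms, 1 s, 2 s, 4 s, 8 s, 16 s and then stay at 30 s. Whether that schedule meets the ≤2 s reconnect target after a long outage depends on where in the schedule the client is when `../detections` returns, so the base and cap would need tuning against AC-2.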

## Contract

- gRPC service contract owner: `../_docs/03_detections.md`.
- Canonical typed model: `data_model.md §Detection`, `§DetectionBatch`.

## Runtime Completeness

- **Named capability**: bi-directional gRPC stream against `../detections`.
- **Production code that must exist**: real `tonic` (or equivalent) bi-directional stream; real budgeting.
- **Allowed external stubs**: a fixture gRPC server in tests; the real `../detections` for integration.
- **Unacceptable substitutes**: a unary call-per-frame instead of streaming is unacceptable (multiplies per-request overhead).
@@ -1,62 +0,0 @@
# Detection Schema Validation + Model-Version + Health

**Task**: AZ-661_detection_client_schema_and_health
**Name**: Response schema validation + model_version tracking + Tier-1 health degradation signal
**Description**: Validate every `DetectionBatch` response against the schema version the client was built against. Surface a hard error on schema mismatch (never silent downcast). Track `model_version`; on change, surface to `scan_controller` so per-class thresholds can be reloaded. Track sliding-window latency; on `latency_p99 > 100 ms` flip health → yellow so `scan_controller` can degrade to alternate-frame inference.
**Complexity**: 2 points
**Dependencies**: AZ-640_initial_structure, AZ-660_detection_client_grpc_stream
**Component**: detection_client
**Tracker**: AZ-661
**Epic**: AZ-628

## Problem

Schema drift between `../detections` and autopilot must be caught loudly, not silently downcast. The model version can change at runtime (model swap); when it does, the per-class confidence thresholds may need to be reloaded by `scan_controller`. The Tier-1 latency target (≤100 ms) is mostly owned by `../detections`, but autopilot must observe drift and surface health degradation so the scan controller can take action.

## Outcome

- Every response is validated against the bundled schema; on mismatch, the client returns a hard error on the output channel and sets health → red.
- `last_model_version` is tracked; on change, a `ModelVersionChanged(new_version)` event is emitted on the output channel.
- A sliding-window latency tracker (e.g. last 1 min) emits a `Tier1Degraded { reason: HighLatency }` event when `latency_p99 > 100 ms`.

## Scope

### Included
- Schema validation hook on every response.
- `model_version` tracker.
- Sliding-window latency tracker + degradation signal.

### Excluded
- The reaction to `Tier1Degraded` (lives in `scan_controller`).
- The schema definition itself (lives in the contract).

## Acceptance Criteria

**AC-1: Schema mismatch surfaces as hard error**
Given the fixture server returns a `DetectionBatch` with an unknown field type
When the client validates the response
Then a hard error is emitted on the output channel and `errors_by_kind{kind="schema_mismatch"}` increments by 1.

**AC-2: Model version change is signalled**
Given the server reports `model_version = "v1.2"` on initial stream open
When a subsequent response reports `model_version = "v1.3"`
Then exactly one `ModelVersionChanged("v1.3")` event is emitted.
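
The "exactly one event" behaviour in AC-2 amounts to comparing each reported version with the last one seen. The sketch below is a minimal illustration with a hypothetical `ModelVersionTracker`; it assumes the first version observed on stream open is a baseline and does not itself count as a change.

```rust
/// Event emitted on the output channel when the reported model version changes.
#[derive(Debug, PartialEq)]
pub struct ModelVersionChanged(pub String);

#[derive(Default)]
pub struct ModelVersionTracker {
    last_model_version: Option<String>,
}

impl ModelVersionTracker {
    /// Feeds the `model_version` of one response; returns an event only when
    /// it differs from the previously recorded version.
    pub fn observe(&mut self, version: &str) -> Option<ModelVersionChanged> {
        let is_first = self.last_model_version.is_none();
        if self.last_model_version.as_deref() == Some(version) {
            return None; // unchanged: nothing to report
        }
        self.last_model_version = Some(version.to_owned());
        if is_first {
            None // assumption: the initial version is a baseline, not a change
        } else {
            Some(ModelVersionChanged(version.to_owned()))
        }
    }
}
```

Repeated responses carrying `"v1.3"` after the switch produce no further events, so `scan_controller` would reload thresholds once per swap.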

**AC-3: Latency degradation signal**
Given the server's response latency rises to 150 ms p99 over a 1-min window
When the latency tracker evaluates
Then `Tier1Degraded { reason: HighLatency }` is emitted exactly once until latency falls back below 100 ms.
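
The "exactly once until latency falls back" rule is an edge-triggered signal over a sliding window. The sketch below is one way to express it, under these assumptions: `LatencyTracker` is a hypothetical name, time is passed in as a `Duration` since start (to keep the example deterministic), p99 uses the nearest-rank method, and `record` returns `true` only on the healthy-to-degraded transition, where the caller would emit `Tier1Degraded { reason: HighLatency }`.

```rust
use std::collections::VecDeque;
use std::time::Duration;

pub struct LatencyTracker {
    window: Duration,
    threshold: Duration,
    /// (arrival time since start, observed latency), oldest first.
    samples: VecDeque<(Duration, Duration)>,
    degraded: bool,
}

impl LatencyTracker {
    pub fn new(window: Duration, threshold: Duration) -> Self {
        Self { window, threshold, samples: VecDeque::new(), degraded: false }
    }

    /// Records one response latency observed at `now`. Returns true only when
    /// p99 crosses above the threshold, not on every sample while degraded.
    pub fn record(&mut self, now: Duration, latency: Duration) -> bool {
        self.samples.push_back((now, latency));
        // Evict samples that have aged out of the sliding window.
        while let Some(&(seen_at, _)) = self.samples.front() {
            if now.saturating_sub(seen_at) > self.window {
                self.samples.pop_front();
            } else {
                break;
            }
        }
        let over = self.p99() > self.threshold;
        let fire = over && !self.degraded;
        self.degraded = over;
        fire
    }

    /// Nearest-rank 99th percentile of the latencies currently in the window.
    pub fn p99(&self) -> Duration {
        let mut sorted: Vec<Duration> = self.samples.iter().map(|&(_, l)| l).collect();
        if sorted.is_empty() {
            return Duration::ZERO;
        }
        sorted.sort();
        let rank = (sorted.len() * 99 + 99) / 100; // ceil(0.99 * n), 1-indexed
        sorted[rank - 1]
    }

    pub fn is_degraded(&self) -> bool {
        self.degraded
    }
}
```

Sorting the window on every sample keeps the sketch short; at 30 fps over a 1-min window that is roughly 1800 samples per sort, so a production tracker would more likely use a histogram or another incremental percentile structure.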

## Non-Functional Requirements

**Performance**
- Validation overhead: ≤1 ms per response.

**Reliability**
- Schema mismatches are never silent.

## Runtime Completeness

- **Named capability**: response schema validation + model-version awareness + latency-degradation signal.
- **Production code that must exist**: real schema validation; real model-version tracker; real percentile tracker.
- **Unacceptable substitutes**: silently downcasting an unknown response shape is unacceptable.