Files
autopilot/crates/vlm_client/src/internal/wire.rs
T
Oleksandr Bezdieniezhnykh e56d428753 [AZ-649] [AZ-674] [AZ-667] telemetry + vlm schema + mapobjects hydrate batch 6
AZ-649 mission_executor telemetry forwarding:
- shared::models::telemetry::UavTelemetry canonical model
- TelemetryForwarder with atomic ArcSwap snapshot + 3 lossy
  tokio::sync::broadcast channels (MissionExecutor, ScanController,
  MavlinkUplink) + per-consumer drop counters
- MavlinkProjection::from_mavlink for HEARTBEAT/GLOBAL_POSITION_INT/
  ATTITUDE/SYS_STATUS
- spawn_mavlink_pump bridges mavlink_layer into the forwarder at the
  binary edge

AZ-674 vlm_client schema validation + model_version tracking:
- AssessmentParser owns schema validation + model-version state
- wire::read_response_raw splits raw bytes from parsing so invalid
  payloads can be logged size-capped
- VlmStatus gains an Inconclusive variant; exhaustive-match test
  guards downstream consumers
- VlmPipelineStatus mirrors the new variant in shared::models::poi

AZ-667 mapobjects_store hydrate + pending logs + cascade:
- SyncState enum aligned with description.md (FreshBoot, Synced,
  CachedFallback, Degraded, Failed)
- Store::hydrate(MapObjectsBundle) replaces in-memory map atomically;
  freshness=Stale -> CachedFallback
- classify() + end_of_pass append MapObjectObservation events to
  pending_observations (New/Moved/Existing/RemovedCandidate)
- apply_decline + LocalAppended ignored items append to pending_ignored
- drain_pending() returns and clears both logs
- cascade_mission(id) purges by_cell + IgnoredSet + pending logs
- Health surface reports sync_state, pending_obs, pending_ign

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-19 17:40:43 +03:00

170 lines
5.7 KiB
Rust

//! Wire framing for NanoLLM UDS IPC.
//!
//! Single request → single response, length-prefixed JSON:
//!
//! ```text
//! uint32 BE length || JSON payload
//! ```
//!
//! The request payload is `{"prompt": "...", "roi_b64": "..."}`. The
//! response payload is a `shared::models::vlm::VlmAssessment` JSON
//! object — the same shape `VlmProvider::assess` returns. This layer
//! only frames bytes: `read_response_raw` returns the body unparsed so
//! that `crate::internal::parser` can apply the AZ-674 schema
//! validation. Only the test-only `read_response` helper still decodes
//! with a bare `serde_json::from_slice` (the AZ-673 behaviour).
use base64::Engine;
use serde::{Deserialize, Serialize};
#[cfg(test)]
use shared::models::vlm::VlmAssessment;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
/// Upper bound, in bytes, on the body of a single frame (8 MiB).
///
/// `read_response_raw` rejects any declared length above this before
/// allocating, so a misbehaving or corrupted peer cannot force an
/// arbitrarily large buffer. `write_request` applies the same limit to
/// the encoded request body.
pub const MAX_FRAME_BYTES: u32 = 8 << 20;
/// JSON request envelope that `write_request` serializes into a frame body.
///
/// No serde renames are applied, so the field names are the JSON keys
/// on the wire (`prompt`, `roi_b64`).
#[derive(Debug, Serialize, Deserialize)]
pub struct AssessRequest {
/// Prompt text; `write_request` copies it from its `prompt` argument.
pub prompt: String,
/// Base64-encoded ROI bytes. Kept inline in the JSON envelope so
/// the wire is one read/write per direction. `write_request` encodes
/// with the `STANDARD` base64 engine.
pub roi_b64: String,
}
/// Errors returned by the wire-framing helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum WireError {
/// Failure reported by the underlying reader or writer. `#[from]`
/// lets `?` convert a `std::io::Error` into this variant.
#[error("io: {0}")]
Io(#[from] std::io::Error),
/// A frame length exceeded `MAX_FRAME_BYTES`; the payload is the
/// offending length in bytes.
#[error("frame too large: {0} bytes (max {MAX_FRAME_BYTES})")]
FrameTooLarge(u32),
/// JSON serialization or deserialization failed. `#[from]` lets `?`
/// convert a `serde_json::Error` into this variant.
#[error("json: {0}")]
Json(#[from] serde_json::Error),
/// Intended for a stream that ends before the declared frame body
/// has been read in full.
#[error("unexpected eof while reading frame body")]
UnexpectedEof,
}
pub async fn write_request<W: AsyncWrite + Unpin>(
w: &mut W,
prompt: &str,
roi: &[u8],
) -> Result<(), WireError> {
let req = AssessRequest {
prompt: prompt.to_string(),
roi_b64: base64::engine::general_purpose::STANDARD.encode(roi),
};
let body = serde_json::to_vec(&req)?;
let len = body.len() as u32;
if len > MAX_FRAME_BYTES {
return Err(WireError::FrameTooLarge(len));
}
w.write_all(&len.to_be_bytes()).await?;
w.write_all(&body).await?;
w.flush().await?;
Ok(())
}
/// Read one length-prefixed frame body. The body is returned as raw
/// bytes; JSON parsing is the [`crate::internal::parser`]'s job
/// (AZ-674 §AC-2 — schema-invalid responses must be observable as
/// `VlmAssessment{ status: SchemaInvalid }`, not as `Err`).
///
/// The declared length is checked against [`MAX_FRAME_BYTES`] before
/// the body buffer is allocated.
///
/// # Errors
///
/// * [`WireError::Io`] if the 4-byte length prefix cannot be read, or
///   if reading the body fails for any reason other than end-of-stream.
/// * [`WireError::FrameTooLarge`] if the declared length exceeds
///   [`MAX_FRAME_BYTES`].
/// * [`WireError::UnexpectedEof`] if the stream ends before the
///   declared number of body bytes has arrived.
pub async fn read_response_raw<R: AsyncRead + Unpin>(r: &mut R) -> Result<Vec<u8>, WireError> {
    let mut lenbuf = [0u8; 4];
    r.read_exact(&mut lenbuf).await?;
    let len = u32::from_be_bytes(lenbuf);
    if len > MAX_FRAME_BYTES {
        return Err(WireError::FrameTooLarge(len));
    }
    let mut body = vec![0u8; len as usize];
    // `read_exact` either fills the whole buffer or fails with
    // `ErrorKind::UnexpectedEof`; it never succeeds with a short count.
    // A truncated body therefore has to be detected on the error path.
    match r.read_exact(&mut body).await {
        Ok(_) => Ok(body),
        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
            Err(WireError::UnexpectedEof)
        }
        Err(e) => Err(WireError::Io(e)),
    }
}
/// Test-only convenience: read one frame and decode it straight into a
/// `VlmAssessment` with `serde_json::from_slice`.
///
/// Kept for the in-tree wire-layer tests. Production code calls
/// `read_response_raw` and then `AssessmentParser::parse`.
#[cfg(test)]
pub async fn read_response<R: AsyncRead + Unpin>(r: &mut R) -> Result<VlmAssessment, WireError> {
    let raw = read_response_raw(r).await?;
    serde_json::from_slice::<VlmAssessment>(&raw).map_err(WireError::Json)
}
#[cfg(test)]
mod tests {
    use super::*;
    use shared::models::vlm::{VlmLabel, VlmStatus};
    use tokio::io::duplex;

    /// The client writes a request over an in-memory duplex pipe. A
    /// spawned fixture checks the decoded request and answers with a
    /// canned assessment, which the client must read back intact.
    #[tokio::test]
    async fn round_trip_request_and_response() {
        // Arrange
        const PROMPT: &str = "describe";
        const ROI: &[u8] = b"\xff\xd8\xff\xe0\x00\x10JFIF";
        let (mut client, mut server) = duplex(64 * 1024);

        // Act — client side writes the request, fixture side reads it
        // and writes back a canned response.
        let fixture = tokio::spawn(async move {
            // Read request frame: 4-byte big-endian length, then body.
            let mut prefix = [0u8; 4];
            server.read_exact(&mut prefix).await.unwrap();
            let mut request_bytes = vec![0u8; u32::from_be_bytes(prefix) as usize];
            server.read_exact(&mut request_bytes).await.unwrap();

            let request: AssessRequest = serde_json::from_slice(&request_bytes).unwrap();
            assert_eq!(request.prompt, PROMPT);
            let decoded_roi = base64::engine::general_purpose::STANDARD
                .decode(request.roi_b64)
                .unwrap();
            assert_eq!(decoded_roi.as_slice(), ROI);

            // Write canned response.
            let canned = VlmAssessment {
                label: VlmLabel::ConfirmedConcealedPosition,
                confidence: 0.91,
                evidence_spans: vec!["foliage".into()],
                reason: "match".into(),
                status: VlmStatus::Ok,
                latency_ms: 12,
                model_version: "VILA1.5-3B-int4".into(),
            };
            let payload = serde_json::to_vec(&canned).unwrap();
            let payload_len = payload.len() as u32;
            server.write_all(&payload_len.to_be_bytes()).await.unwrap();
            server.write_all(&payload).await.unwrap();
            server.flush().await.unwrap();
        });

        write_request(&mut client, PROMPT, ROI).await.unwrap();
        let resp = read_response(&mut client).await.unwrap();
        fixture.await.unwrap();

        // Assert
        assert_eq!(resp.status, VlmStatus::Ok);
        assert_eq!(resp.label, VlmLabel::ConfirmedConcealedPosition);
        assert_eq!(resp.model_version, "VILA1.5-3B-int4");
    }

    /// A length prefix one byte over the limit must be rejected from the
    /// prefix alone; no body is ever sent.
    #[tokio::test]
    async fn rejects_oversized_inbound_frame() {
        // Arrange
        let (mut reader, mut writer) = duplex(64);
        let declared_len = MAX_FRAME_BYTES + 1;
        writer.write_all(&declared_len.to_be_bytes()).await.unwrap();
        writer.flush().await.unwrap();

        // Act
        let result = read_response(&mut reader).await;

        // Assert
        assert!(matches!(
            result,
            Err(WireError::FrameTooLarge(n)) if n == declared_len
        ));
    }
}