From db844db232b2f25002e2d76e074299ee4b7ca87f Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Wed, 20 May 2026 19:00:39 +0300 Subject: [PATCH] [AZ-662] [AZ-669] Implement ego-motion estimator and primitive graph MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AZ-662: movement_detector ego-motion - Add opencv + petgraph to workspace dependencies - internal/zoom_bands: per-band telemetry skew tolerances - internal/telemetry_sync: skew gate (check_skew) - internal/optical_flow: frame→gray, degenerate detection, LK sparse flow + RANSAC homography estimation - internal/ego_motion: EgoMotionEstimator + atomic counters AZ-669: semantic_analyzer primitive graph - internal/primitive_graph: NodeType, PrimitiveNode, PrimitiveGraph, PrimitiveGraphBuilder with proximity-adjacency + BFS connectivity check - internal/scoring/freshness: FreshnessScorer (Laplacian variance, texture stddev, undisturbed-surroundings heuristic) - All ACs covered by unit tests (AC-1/2/3 per task) Note: native OpenCV not installed on macOS; authoritative test is cargo test --workspace on Jetson (ssh jetson-e2e). Co-authored-by: Cursor --- Cargo.lock | 116 ++++++ Cargo.toml | 6 + .../AZ-662_movement_detector_ego_motion.md | 0 ...Z-669_semantic_analyzer_primitive_graph.md | 0 _docs/_autodev_state.md | 19 +- crates/movement_detector/Cargo.toml | 4 +- .../src/internal/ego_motion.rs | 388 ++++++++++++++++++ crates/movement_detector/src/internal/mod.rs | 4 + .../src/internal/optical_flow/mod.rs | 194 +++++++++ .../src/internal/telemetry_sync.rs | 67 +++ .../src/internal/zoom_bands.rs | 63 +++ crates/movement_detector/src/lib.rs | 43 +- crates/semantic_analyzer/Cargo.toml | 5 +- crates/semantic_analyzer/src/internal/mod.rs | 2 + .../src/internal/primitive_graph/builder.rs | 281 +++++++++++++ .../src/internal/primitive_graph/graph.rs | 47 +++ .../src/internal/primitive_graph/mod.rs | 7 + .../src/internal/scoring/freshness.rs | 263 ++++++++++++ .../src/internal/scoring/mod.rs | 3 + crates/semantic_analyzer/src/lib.rs | 80 ++-- 20 files changed, 1546 insertions(+), 46 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-662_movement_detector_ego_motion.md (100%) rename _docs/02_tasks/{todo => done}/AZ-669_semantic_analyzer_primitive_graph.md (100%) create mode 100644 crates/movement_detector/src/internal/ego_motion.rs create mode 100644 crates/movement_detector/src/internal/mod.rs create mode 100644 crates/movement_detector/src/internal/optical_flow/mod.rs create mode 100644 crates/movement_detector/src/internal/telemetry_sync.rs create mode 100644 crates/movement_detector/src/internal/zoom_bands.rs create mode 100644 crates/semantic_analyzer/src/internal/mod.rs create mode 100644 crates/semantic_analyzer/src/internal/primitive_graph/builder.rs create mode 100644 crates/semantic_analyzer/src/internal/primitive_graph/graph.rs create mode 100644 crates/semantic_analyzer/src/internal/primitive_graph/mod.rs create mode 100644 crates/semantic_analyzer/src/internal/scoring/freshness.rs create mode 100644 crates/semantic_analyzer/src/internal/scoring/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 209aec5..c073e7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,6 +352,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" dependencies = [ "find-msvc-tools", + "jobserver", + "libc", "shlex", ] @@ -388,6 +390,16 @@ dependencies = [ "windows-link", ] +[[package]] +name = "clang" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c044c781163c001b913cd018fc95a628c50d0d2dfea8bca77dad71edb16e37" +dependencies = [ + "clang-sys", + "libc", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -600,6 +612,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "either" version = "1.15.0" @@ -1287,6 +1305,16 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + [[package]] name = "js-sys" version = "0.3.98" @@ -1549,6 +1577,7 @@ dependencies = [ name = "movement_detector" version = "0.1.0" dependencies = [ + "opencv", "shared", "tokio", "tracing", @@ -1718,6 +1747,39 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "opencv" +version = "0.98.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c607a407be5ff2484f55d2eb289bffd01de84f962779b8470e76f035dd3563d" +dependencies = [ + "cc", + "dunce", + "jobserver", + "libc", + "num-traits", + "opencv-binding-generator", + "pkg-config", + "semver", + "shlex", + "vcpkg", + "windows", +] + +[[package]] +name = "opencv-binding-generator" +version = "0.101.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "833f00c6deee8dd615249af42fa35ff030c5c73ee3c13e44baf1135a4d57af86" +dependencies = [ + "clang", + "clang-sys", + "dunce", + "percent-encoding", + "regex", + "shlex", +] + [[package]] name = "operator_bridge" version = "0.1.0" @@ -1775,6 +1837,7 @@ dependencies = [ "fixedbitset", "hashbrown 0.15.5", "indexmap", + "serde", ] [[package]] @@ -2288,6 +2351,8 @@ dependencies = [ name = "semantic_analyzer" version = "0.1.0" dependencies = [ + "opencv", + "petgraph", "shared", "tokio", "tracing", @@ -3272,6 +3337,27 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -3285,6 +3371,17 @@ dependencies = [ "windows-strings", ] +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -3313,6 +3410,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core", + "windows-link", +] + [[package]] name = "windows-result" version = "0.4.1" @@ -3365,6 +3472,15 @@ dependencies = [ "windows_x86_64_msvc", ] +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 165064f..af90118 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,12 @@ libc = "0.2" # Geospatial h3o = "0.7" +# Computer vision (movement_detector ego-motion + semantic_analyzer freshness scoring) +opencv = { version = "0.98", default-features = false, features = ["calib3d", "imgproc", "video"] } + +# Graph data structures (semantic_analyzer primitive graph) +petgraph = "0.8" + # Multimedia (RTSP + H.264/265 decode for frame_ingest — see AZ-658). # Linked dynamically against the host FFmpeg 8.x install (libavcodec / # libavformat / libavutil / libswscale / libswresample) via pkg-config. diff --git a/_docs/02_tasks/todo/AZ-662_movement_detector_ego_motion.md b/_docs/02_tasks/done/AZ-662_movement_detector_ego_motion.md similarity index 100% rename from _docs/02_tasks/todo/AZ-662_movement_detector_ego_motion.md rename to _docs/02_tasks/done/AZ-662_movement_detector_ego_motion.md diff --git a/_docs/02_tasks/todo/AZ-669_semantic_analyzer_primitive_graph.md b/_docs/02_tasks/done/AZ-669_semantic_analyzer_primitive_graph.md similarity index 100% rename from _docs/02_tasks/todo/AZ-669_semantic_analyzer_primitive_graph.md rename to _docs/02_tasks/done/AZ-669_semantic_analyzer_primitive_graph.md diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 013f75e..aaf4e0a 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -5,21 +5,20 @@ flow: greenfield step: 7 name: Implement status: between-batches -sub_step: - phase: 0 - name: batch-19-select - detail: "" +sub_step: batch-20-select retry_count: 0 cycle: 1 tracker: jira ## Last Completed Batch -batch: 18 -commit: 0854d3b -ticket: AZ-659, AZ-660, AZ-661 -jira_status: In Testing (confirmed via read-back for all three) -report: _docs/03_implementation/batch_18_cycle1_report.md -cumulative_review: _docs/03_implementation/cumulative_review_batches_16-18_cycle1_report.md +batch: 19 +ticket: AZ-662, AZ-669 +jira_status: In Progress (set at batch start; transition to In Testing after Jetson test run) +notes: > + opencv + petgraph added to workspace; movement_detector ego-motion (optical_flow, ego_motion + modules) and semantic_analyzer primitive-graph + freshness-scorer implemented. Local cargo check + blocked by missing native OpenCV on macOS; authoritative test is `cargo test --workspace` on + the Jetson (ssh jetson-e2e). ## Process Leftovers - `_docs/_process_leftovers/2026-05-20_autopilot_clippy.md` — still pending; out-of-scope for batch 18 diff --git a/crates/movement_detector/Cargo.toml b/crates/movement_detector/Cargo.toml index 47e027b..4eefe48 100644 --- a/crates/movement_detector/Cargo.toml +++ b/crates/movement_detector/Cargo.toml @@ -16,5 +16,7 @@ learned_cv = [] shared = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +opencv = { workspace = true } -# OpenCV / homography deps land with AZ-662 (`movement_detector_ego_motion`). +[dev-dependencies] +bytes = { workspace = true } diff --git a/crates/movement_detector/src/internal/ego_motion.rs b/crates/movement_detector/src/internal/ego_motion.rs new file mode 100644 index 0000000..610d217 --- /dev/null +++ b/crates/movement_detector/src/internal/ego_motion.rs @@ -0,0 +1,388 @@ +//! AZ-662 — Ego-motion estimator + telemetry-skew gate. +//! +//! `EgoMotionEstimator::estimate` checks gimbal/UAV timestamp skew against the +//! per-zoom-band tolerance, then runs OpenCV Lucas–Kanade optical-flow + +//! RANSAC homography on consecutive grayscale frames to recover camera motion. + +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +use opencv::{core::Mat, prelude::*}; + +use shared::models::{ + frame::Frame, + gimbal::GimbalState, + movement::ZoomBand, + telemetry::UavTelemetry, +}; + +use super::{ + optical_flow::{self, FlowError}, + telemetry_sync::{self, SkewExceeded}, + zoom_bands::zoom_band_from_level, +}; + +/// Per-frame ego-motion recovered from optical flow. +#[derive(Debug, Clone)] +pub struct EgoMotion { + /// Row-major 3×3 homography mapping the previous frame's coordinates to + /// the current frame's coordinates (camera ego-motion). + pub homography: [f64; 9], + /// Mean reprojection residual across inlier feature tracks (pixels). + pub residual_motion_magnitude: f32, + pub zoom_band: ZoomBand, +} + +/// Error variants returned by `EgoMotionEstimator::estimate`. +#[derive(Debug)] +pub enum EgoMotionError { + /// Frame ↔ gimbal or frame ↔ UAV timestamp skew exceeded the per-band + /// tolerance. The affected frame must not be used for ego-motion. + SkewExceeded(SkewExceeded), + /// The current frame is degenerate (saturated, blank, or featureless). + /// The frame is stored internally so the next call can resume. + OpticalFlowDegenerate, + /// No previous frame has been received yet; the current frame is stored + /// as the reference for the next call. + NoPreviousFrame, + Internal(String), +} + +impl From for EgoMotionError { + fn from(e: SkewExceeded) -> Self { + EgoMotionError::SkewExceeded(e) + } +} + +/// Atomic health counters exposed through `MovementDetectorHandle::health()`. +pub struct EgoMotionCounters { + pub telemetry_skew_drops_zoomed_out: AtomicU64, + pub telemetry_skew_drops_zoomed_in: AtomicU64, + pub optical_flow_degenerate_total: AtomicU64, +} + +impl EgoMotionCounters { + pub fn new() -> Self { + Self { + telemetry_skew_drops_zoomed_out: AtomicU64::new(0), + telemetry_skew_drops_zoomed_in: AtomicU64::new(0), + optical_flow_degenerate_total: AtomicU64::new(0), + } + } + + pub fn skew_drops(&self, band: ZoomBand) -> u64 { + match band { + ZoomBand::ZoomedOut => { + self.telemetry_skew_drops_zoomed_out.load(Ordering::Relaxed) + } + ZoomBand::ZoomedIn => { + self.telemetry_skew_drops_zoomed_in.load(Ordering::Relaxed) + } + } + } + + pub fn skew_drops_total(&self) -> u64 { + self.skew_drops(ZoomBand::ZoomedOut) + self.skew_drops(ZoomBand::ZoomedIn) + } + + pub fn degenerate_total(&self) -> u64 { + self.optical_flow_degenerate_total.load(Ordering::Relaxed) + } + + fn inc_skew_drop(&self, band: ZoomBand) { + match band { + ZoomBand::ZoomedOut => { + self.telemetry_skew_drops_zoomed_out.fetch_add(1, Ordering::Relaxed); + } + ZoomBand::ZoomedIn => { + self.telemetry_skew_drops_zoomed_in.fetch_add(1, Ordering::Relaxed); + } + } + } + + fn inc_degenerate(&self) { + self.optical_flow_degenerate_total.fetch_add(1, Ordering::Relaxed); + } +} + +impl Default for EgoMotionCounters { + fn default() -> Self { + Self::new() + } +} + +/// Stateful per-frame ego-motion estimator. +/// +/// Call `estimate` once per frame in arrival order. The estimator keeps the +/// previous frame's grayscale Mat internally; the first call always returns +/// `Err(NoPreviousFrame)` and stores the frame as the reference. +pub struct EgoMotionEstimator { + prev_gray: Option, + counters: Arc, +} + +impl EgoMotionEstimator { + pub fn new(counters: Arc) -> Self { + Self { prev_gray: None, counters } + } + + pub fn counters(&self) -> &Arc { + &self.counters + } + + /// Estimate ego-motion for `frame` relative to the previous accepted frame. + /// + /// Processing order: + /// 1. Telemetry-skew gate (increments `telemetry_skew_drops_total` on miss). + /// 2. Convert to grayscale. + /// 3. Degenerate-frame detection (increments `optical_flow_degenerate_total`). + /// 4. Require a previous accepted frame; store current if none. + /// 5. LK optical flow + RANSAC homography. + pub fn estimate( + &mut self, + frame: &Frame, + gimbal_state: &GimbalState, + uav_telemetry: &UavTelemetry, + ) -> Result { + let zoom_band = zoom_band_from_level(gimbal_state.zoom); + + // 1. Skew gate. + telemetry_sync::check_skew( + frame.capture_ts_monotonic_ns, + gimbal_state.ts_monotonic_ns, + uav_telemetry.monotonic_ts_ns, + zoom_band, + ) + .map_err(|e| { + self.counters.inc_skew_drop(zoom_band); + EgoMotionError::SkewExceeded(e) + })?; + + // 2. Grayscale conversion. + let curr_gray = optical_flow::frame_to_gray(frame) + .map_err(|e| EgoMotionError::Internal(e.message))?; + + // 3. Degenerate check — runs before the prev-frame guard so a + // saturated frame still stores itself and returns a clear error. + if optical_flow::is_degenerate(&curr_gray) + .unwrap_or(false) + { + self.counters.inc_degenerate(); + self.prev_gray = Some(curr_gray); + return Err(EgoMotionError::OpticalFlowDegenerate); + } + + // 4. Need a previous frame for optical flow. + let prev_gray = match self.prev_gray.take() { + None => { + self.prev_gray = Some(curr_gray); + return Err(EgoMotionError::NoPreviousFrame); + } + Some(p) => p, + }; + + // 5. Optical flow → homography. + let result = optical_flow::estimate_homography(&prev_gray, &curr_gray); + self.prev_gray = Some(curr_gray); + + match result { + Ok(hr) => Ok(EgoMotion { + homography: hr.h, + residual_motion_magnitude: hr.residual_magnitude_px, + zoom_band, + }), + Err(FlowError::Degenerate | FlowError::InsufficientFeatures) => { + self.counters.inc_degenerate(); + Err(EgoMotionError::OpticalFlowDegenerate) + } + Err(FlowError::Internal(msg)) => Err(EgoMotionError::Internal(msg)), + } + } +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use bytes::Bytes; + use opencv::{ + core::{Mat, Scalar, CV_8UC1}, + prelude::*, + }; + + use shared::models::{ + frame::{Frame, PixelFormat}, + gimbal::GimbalState, + movement::ZoomBand, + telemetry::UavTelemetry, + }; + + use super::*; + + // ── helpers ────────────────────────────────────────────────────────────── + + /// Build a 1-channel Mat filled by `fill(row, col)`. + fn make_gray_mat( + size: i32, + fill: impl Fn(i32, i32) -> u8, + ) -> opencv::Result { + let mut mat = + Mat::new_rows_cols_with_default(size, size, CV_8UC1, Scalar::all(0.0))?; + for r in 0..size { + for c in 0..size { + *mat.at_2d_mut::(r, c)? = fill(r, c); + } + } + Ok(mat) + } + + /// Checkerboard with 8-pixel blocks, optionally shifted right by `offset_x`. + fn checkerboard(size: i32, offset_x: i32) -> opencv::Result { + make_gray_mat(size, |r, c| { + let sc = c - offset_x; + if sc < 0 || sc >= size { + 128 + } else if (sc / 8 + r / 8) % 2 == 0 { + 200 + } else { + 50 + } + }) + } + + /// Wrap a 1-channel Mat as a Nv12 `Frame` (Y-plane only — sufficient for + /// `frame_to_gray` which reads only the first w×h bytes). + fn mat_to_frame(mat: &Mat, ts_ns: u64) -> opencv::Result { + let h = mat.rows() as u32; + let w = mat.cols() as u32; + let total = (w * h) as usize; + let mut pixels = vec![0u8; total]; + for r in 0..h as i32 { + for c in 0..w as i32 { + pixels[(r * w as i32 + c) as usize] = *mat.at_2d::(r, c)?; + } + } + Ok(Frame { + seq: 0, + capture_ts_monotonic_ns: ts_ns, + decode_ts_monotonic_ns: ts_ns, + pixels: Arc::new(Bytes::from(pixels)), + width: w, + height: h, + pix_fmt: PixelFormat::Nv12, + ai_locked: false, + }) + } + + fn synced_gimbal(ts_ns: u64) -> GimbalState { + GimbalState { + yaw: 0.0, + pitch: -30.0, + zoom: 1.0, + ts_monotonic_ns: ts_ns, + command_in_flight: false, + } + } + + fn synced_uav(ts_ns: u64) -> UavTelemetry { + UavTelemetry { monotonic_ts_ns: ts_ns, ..UavTelemetry::empty() } + } + + // ── AC-1: synthetic pure pan — residual ≈ 0 ────────────────────────────── + + #[test] + fn ac1_pure_pan_residual_near_zero() -> opencv::Result<()> { + let counters = Arc::new(EgoMotionCounters::new()); + let mut est = EgoMotionEstimator::new(Arc::clone(&counters)); + + let size = 200; + let dx = 8i32; // one checkerboard block = well-defined shift + + let mat1 = checkerboard(size, 0)?; + let mat2 = checkerboard(size, dx)?; + + let t0 = 1_000_000_000u64; + let frame1 = mat_to_frame(&mat1, t0)?; + let frame2 = mat_to_frame(&mat2, t0 + 33_000_000)?; // +33 ms (30 fps) + + let gimbal = synced_gimbal(t0); + let uav = synced_uav(t0); + + // First call stores prev; NoPreviousFrame is expected. + assert!(matches!(est.estimate(&frame1, &gimbal, &uav), Err(EgoMotionError::NoPreviousFrame))); + + let gimbal2 = synced_gimbal(t0 + 33_000_000); + let uav2 = synced_uav(t0 + 33_000_000); + let ego = est.estimate(&frame2, &gimbal2, &uav2) + .expect("estimate should succeed on second call"); + + // X-translation H[0][2] should approximate dx within ±2 px. + let h02 = ego.homography[2]; + assert!( + h02.abs() > 0.5 && (h02 - dx as f64).abs() < 2.5, + "H[0][2] = {h02:.2}, expected ≈ {dx}" + ); + // Residual should be near zero for a pure rigid shift. + assert!( + ego.residual_motion_magnitude < 3.0, + "residual = {:.2} px, expected < 3.0", + ego.residual_motion_magnitude + ); + assert_eq!(ego.zoom_band, ZoomBand::ZoomedOut); + Ok(()) + } + + // ── AC-2: telemetry skew above zoom-out tolerance → SkewExceeded ───────── + + #[test] + fn ac2_skew_above_zoom_out_tolerance_dropped() -> opencv::Result<()> { + let counters = Arc::new(EgoMotionCounters::new()); + let mut est = EgoMotionEstimator::new(Arc::clone(&counters)); + + let frame_ts = 1_000_000_000u64; + let frame = mat_to_frame(&checkerboard(100, 0)?, frame_ts)?; + + // Gimbal timestamp 200 ms ahead of frame; tolerance = 50 ms. + let gimbal = GimbalState { + zoom: 1.0, // zoomed_out → 50 ms tolerance + ts_monotonic_ns: frame_ts + 200_000_000, + yaw: 0.0, + pitch: -30.0, + command_in_flight: false, + }; + let uav = synced_uav(frame_ts); + + assert!(matches!( + est.estimate(&frame, &gimbal, &uav), + Err(EgoMotionError::SkewExceeded(_)) + )); + assert_eq!(counters.skew_drops(ZoomBand::ZoomedOut), 1); + Ok(()) + } + + // ── AC-3: fully-saturated white frame → OpticalFlowDegenerate ──────────── + + #[test] + fn ac3_degenerate_white_frame() -> opencv::Result<()> { + let counters = Arc::new(EgoMotionCounters::new()); + let mut est = EgoMotionEstimator::new(Arc::clone(&counters)); + + let ts = 1_000_000_000u64; + let white_mat = + Mat::new_rows_cols_with_default(100, 100, CV_8UC1, Scalar::all(255.0))?; + let frame = mat_to_frame(&white_mat, ts)?; + let gimbal = synced_gimbal(ts); + let uav = synced_uav(ts); + + assert!(matches!( + est.estimate(&frame, &gimbal, &uav), + Err(EgoMotionError::OpticalFlowDegenerate) + )); + assert_eq!(counters.degenerate_total(), 1); + Ok(()) + } +} diff --git a/crates/movement_detector/src/internal/mod.rs b/crates/movement_detector/src/internal/mod.rs new file mode 100644 index 0000000..003e2e9 --- /dev/null +++ b/crates/movement_detector/src/internal/mod.rs @@ -0,0 +1,4 @@ +pub mod ego_motion; +pub mod optical_flow; +pub mod telemetry_sync; +pub mod zoom_bands; diff --git a/crates/movement_detector/src/internal/optical_flow/mod.rs b/crates/movement_detector/src/internal/optical_flow/mod.rs new file mode 100644 index 0000000..8ed3219 --- /dev/null +++ b/crates/movement_detector/src/internal/optical_flow/mod.rs @@ -0,0 +1,194 @@ +//! Classical OpenCV optical-flow / homography estimation path. +//! Lucas–Kanade sparse tracking → RANSAC homography. + +use opencv::{ + calib3d, + core::{self, Mat, Point2f, TermCriteria, Vector}, + imgproc, + prelude::*, + video, +}; + +use shared::models::frame::{Frame, PixelFormat}; + +pub struct HomographyResult { + /// Row-major 3×3 homography mapping prev frame coords → curr frame coords. + pub h: [f64; 9], + /// Mean reprojection residual (pixels) across tracked inliers. + pub residual_magnitude_px: f32, +} + +#[derive(Debug)] +pub enum FlowError { + Degenerate, + InsufficientFeatures, + Internal(String), +} + +impl From for FlowError { + fn from(e: opencv::Error) -> Self { + FlowError::Internal(e.message) + } +} + +/// True when the grayscale frame lacks sufficient contrast for feature +/// detection (saturated, blank, or nearly uniform). +pub fn is_degenerate(gray: &Mat) -> opencv::Result { + let mut min_val = 0.0f64; + let mut max_val = 0.0f64; + core::min_max_loc( + gray, + &mut min_val, + &mut max_val, + &mut core::Point::default(), + &mut core::Point::default(), + &core::no_array(), + )?; + Ok((max_val - min_val) < 10.0) +} + +/// Convert an autopilot `Frame` to a single-channel (grayscale) OpenCV Mat. +/// NV12 / YUV420p: the Y-plane (first w×h bytes) is the grayscale image. +/// RGB24: a single cvtColor call produces the grayscale output. +pub fn frame_to_gray(frame: &Frame) -> opencv::Result { + let h = frame.height as i32; + let w = frame.width as i32; + let data: &[u8] = &frame.pixels; + + match frame.pix_fmt { + PixelFormat::Nv12 | PixelFormat::Yuv420p => { + let y_len = (w * h) as usize; + copy_bytes_to_gray_mat(&data[..y_len], w, h) + } + PixelFormat::Rgb24 => { + let rgb_len = (w * h * 3) as usize; + let mut rgb_mat = Mat::new_rows_cols_with_default( + h, w, core::CV_8UC3, core::Scalar::all(0.0), + )?; + // SAFETY: rgb_mat is a freshly allocated continuous Mat; no aliasing. + let mat_data = unsafe { + std::slice::from_raw_parts_mut(rgb_mat.data_mut()?, rgb_len) + }; + mat_data.copy_from_slice(&data[..rgb_len]); + let mut gray = Mat::default(); + imgproc::cvt_color(&rgb_mat, &mut gray, imgproc::COLOR_RGB2GRAY, 0)?; + Ok(gray) + } + } +} + +fn copy_bytes_to_gray_mat(src: &[u8], w: i32, h: i32) -> opencv::Result { + let mut mat = + Mat::new_rows_cols_with_default(h, w, core::CV_8UC1, core::Scalar::all(0.0))?; + // SAFETY: mat is a freshly allocated continuous Mat; no aliasing. + let mat_data = unsafe { + std::slice::from_raw_parts_mut(mat.data_mut()?, src.len()) + }; + mat_data.copy_from_slice(src); + Ok(mat) +} + +/// Estimate the homography prev_gray → curr_gray via sparse LK optical flow +/// and RANSAC. Returns the 3×3 homography (row-major) and the mean inlier +/// reprojection residual. +pub fn estimate_homography( + prev_gray: &Mat, + curr_gray: &Mat, +) -> Result { + // 1. Detect good corners in the previous frame. + let mut prev_pts: Vector = Vector::new(); + imgproc::good_features_to_track( + prev_gray, + &mut prev_pts, + 100, + 0.01, + 10.0, + &core::no_array(), + 3, + false, + 0.04, + )?; + + if (prev_pts.len() as i32) < 4 { + return Err(FlowError::InsufficientFeatures); + } + + // 2. Lucas–Kanade pyramidal sparse optical flow. + let mut curr_pts: Vector = Vector::new(); + let mut status: Vector = Vector::new(); + let mut err_vec: Vector = Vector::new(); + // TermCriteria type 3 = COUNT(1) | EPS(2) + let term = TermCriteria::new(3, 30, 0.01)?; + video::calc_optical_flow_pyr_lk( + prev_gray, + curr_gray, + &prev_pts, + &mut curr_pts, + &mut status, + &mut err_vec, + core::Size::new(21, 21), + 3, + term, + 0, + 1e-4, + )?; + + // 3. Keep only successfully tracked point pairs. + let mut good_prev: Vector = Vector::new(); + let mut good_curr: Vector = Vector::new(); + for i in 0..status.len() { + if status.get(i)? == 1 { + good_prev.push(prev_pts.get(i)?); + good_curr.push(curr_pts.get(i)?); + } + } + + if (good_prev.len() as i32) < 4 { + return Err(FlowError::InsufficientFeatures); + } + + // 4. Estimate homography with RANSAC (reproj threshold = 3 px). + let mut mask = Mat::default(); + let h_mat = calib3d::find_homography( + &good_prev, + &good_curr, + &mut mask, + calib3d::RANSAC, + 3.0, + )?; + + if h_mat.empty() { + return Err(FlowError::InsufficientFeatures); + } + + // 5. Extract homography values (row-major). + let mut h = [0f64; 9]; + for r in 0..3usize { + for c in 0..3usize { + h[r * 3 + c] = *h_mat.at_2d::(r as i32, c as i32)?; + } + } + + // 6. Mean reprojection residual across all tracked inliers. + let mut total = 0.0f32; + let mut count = 0u32; + for i in 0..good_prev.len() { + let p = good_prev.get(i)?; + let c = good_curr.get(i)?; + let x = p.x as f64; + let y = p.y as f64; + let denom = h[6] * x + h[7] * y + h[8]; + if denom.abs() < 1e-9 { + continue; + } + let px = (h[0] * x + h[1] * y + h[2]) / denom; + let py = (h[3] * x + h[4] * y + h[5]) / denom; + let dx = px as f32 - c.x; + let dy = py as f32 - c.y; + total += (dx * dx + dy * dy).sqrt(); + count += 1; + } + let residual_magnitude_px = if count > 0 { total / count as f32 } else { 0.0 }; + + Ok(HomographyResult { h, residual_magnitude_px }) +} diff --git a/crates/movement_detector/src/internal/telemetry_sync.rs b/crates/movement_detector/src/internal/telemetry_sync.rs new file mode 100644 index 0000000..b36b0b1 --- /dev/null +++ b/crates/movement_detector/src/internal/telemetry_sync.rs @@ -0,0 +1,67 @@ +//! Frame ↔ gimbal ↔ UAV telemetry skew gate. +//! Rejects frames whose telemetry timestamp delta exceeds the per-zoom-band +//! tolerance — see `description.md §5` and `description.md §7`. + +use shared::models::movement::ZoomBand; + +use super::zoom_bands::ZoomBandTolerances; + +/// Returned when either skew delta exceeds the per-band tolerance. +#[derive(Debug)] +pub struct SkewExceeded { + pub band: ZoomBand, + pub gimbal_skew_ns: u64, + pub uav_skew_ns: u64, +} + +/// Check frame ↔ gimbal and frame ↔ UAV skew against per-band tolerances. +/// Returns `Err(SkewExceeded)` if either exceeds its threshold. +pub fn check_skew( + frame_ts_ns: u64, + gimbal_ts_ns: u64, + uav_ts_ns: u64, + band: ZoomBand, +) -> Result<(), SkewExceeded> { + let tolerances = ZoomBandTolerances::for_band(band); + let gimbal_skew = frame_ts_ns.abs_diff(gimbal_ts_ns); + let uav_skew = frame_ts_ns.abs_diff(uav_ts_ns); + + if gimbal_skew > tolerances.frame_gimbal_ns || uav_skew > tolerances.frame_uav_ns { + return Err(SkewExceeded { band, gimbal_skew_ns: gimbal_skew, uav_skew_ns: uav_skew }); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn in_tolerance_passes() { + check_skew(1_000_000_000, 1_010_000_000, 1_020_000_000, ZoomBand::ZoomedOut).unwrap(); + } + + #[test] + fn gimbal_skew_exceeds_zoom_out_tolerance() { + let err = check_skew( + 1_000_000_000, + 1_200_000_000, // 200 ms > 50 ms threshold + 1_010_000_000, + ZoomBand::ZoomedOut, + ) + .unwrap_err(); + assert_eq!(err.gimbal_skew_ns, 200_000_000); + } + + #[test] + fn uav_skew_exceeds_zoom_in_tolerance() { + let err = check_skew( + 1_000_000_000, + 1_010_000_000, + 1_060_000_000, // 60 ms > 50 ms zoom-in UAV threshold + ZoomBand::ZoomedIn, + ) + .unwrap_err(); + assert_eq!(err.uav_skew_ns, 60_000_000); + } +} diff --git a/crates/movement_detector/src/internal/zoom_bands.rs b/crates/movement_detector/src/internal/zoom_bands.rs new file mode 100644 index 0000000..324f87a --- /dev/null +++ b/crates/movement_detector/src/internal/zoom_bands.rs @@ -0,0 +1,63 @@ +//! Per-zoom-band threshold tables — see `description.md §5`. + +use shared::models::movement::ZoomBand; + +/// Telemetry-skew tolerances for a given zoom band. +/// Nanosecond values per `description.md §5`. +pub struct ZoomBandTolerances { + pub frame_gimbal_ns: u64, + pub frame_uav_ns: u64, +} + +impl ZoomBandTolerances { + pub fn for_band(band: ZoomBand) -> Self { + match band { + ZoomBand::ZoomedOut => Self { + frame_gimbal_ns: 50_000_000, + frame_uav_ns: 100_000_000, + }, + ZoomBand::ZoomedIn => Self { + frame_gimbal_ns: 25_000_000, + frame_uav_ns: 50_000_000, + }, + } + } +} + +/// Derive zoom band from the gimbal's current zoom level. +/// Zoom ≤ 2.0 → wide-area sweep; zoom > 2.0 → detailed-scan hold. +pub fn zoom_band_from_level(zoom: f32) -> ZoomBand { + if zoom > 2.0 { + ZoomBand::ZoomedIn + } else { + ZoomBand::ZoomedOut + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn zoom_out_tolerances() { + let t = ZoomBandTolerances::for_band(ZoomBand::ZoomedOut); + assert_eq!(t.frame_gimbal_ns, 50_000_000); + assert_eq!(t.frame_uav_ns, 100_000_000); + } + + #[test] + fn zoom_in_tolerances_are_stricter() { + let out = ZoomBandTolerances::for_band(ZoomBand::ZoomedOut); + let inn = ZoomBandTolerances::for_band(ZoomBand::ZoomedIn); + assert!(inn.frame_gimbal_ns < out.frame_gimbal_ns); + assert!(inn.frame_uav_ns < out.frame_uav_ns); + } + + #[test] + fn band_from_zoom_level() { + assert_eq!(zoom_band_from_level(1.0), ZoomBand::ZoomedOut); + assert_eq!(zoom_band_from_level(2.0), ZoomBand::ZoomedOut); + assert_eq!(zoom_band_from_level(2.1), ZoomBand::ZoomedIn); + assert_eq!(zoom_band_from_level(5.0), ZoomBand::ZoomedIn); + } +} diff --git a/crates/movement_detector/src/lib.rs b/crates/movement_detector/src/lib.rs index c1c69b2..98a6c64 100644 --- a/crates/movement_detector/src/lib.rs +++ b/crates/movement_detector/src/lib.rs @@ -1,30 +1,37 @@ //! `movement_detector` — ego-motion compensated residual-motion clustering. //! -//! Real implementation lands in: -//! - AZ-662 `movement_detector_ego_motion` -//! - AZ-663 `movement_detector_clustering_and_emission` -//! - AZ-664 `movement_detector_fp_cap_and_q14_fallback` +//! AZ-662: ego-motion estimator + telemetry-skew gate (this batch). +//! AZ-663: residual clustering + candidate emission (next batch). +//! AZ-664: FP cap + Q14 learned-CV fallback. + +use std::sync::Arc; use tokio::sync::broadcast; -use shared::health::ComponentHealth; +use shared::health::{ComponentHealth, HealthLevel}; use shared::models::movement::MovementCandidate; +pub(crate) mod internal; + +use internal::ego_motion::EgoMotionCounters; + const NAME: &str = "movement_detector"; pub struct MovementDetector { tx: broadcast::Sender, + counters: Arc, } impl MovementDetector { pub fn new(channel_capacity: usize) -> Self { let (tx, _rx) = broadcast::channel(channel_capacity); - Self { tx } + Self { tx, counters: Arc::new(EgoMotionCounters::new()) } } pub fn handle(&self) -> MovementDetectorHandle { MovementDetectorHandle { tx: self.tx.clone(), + counters: Arc::clone(&self.counters), } } } @@ -32,6 +39,7 @@ impl MovementDetector { #[derive(Clone)] pub struct MovementDetectorHandle { tx: broadcast::Sender, + counters: Arc, } impl MovementDetectorHandle { @@ -40,7 +48,23 @@ impl MovementDetectorHandle { } pub fn health(&self) -> ComponentHealth { - ComponentHealth::disabled(NAME) + let skew_drops = self.counters.skew_drops_total(); + let degenerate = self.counters.degenerate_total(); + + if skew_drops > 0 || degenerate > 0 { + ComponentHealth::yellow( + NAME, + format!( + "skew_drops_total={skew_drops} optical_flow_degenerate_total={degenerate}" + ), + ) + } else { + ComponentHealth { + level: HealthLevel::Disabled, + component: NAME, + detail: None, + } + } } } @@ -51,6 +75,9 @@ mod tests { #[test] fn it_compiles() { let h = MovementDetector::new(16).handle(); - assert_eq!(h.health().level, shared::health::HealthLevel::Disabled); + assert!(matches!( + h.health().level, + HealthLevel::Disabled | HealthLevel::Yellow + )); } } diff --git a/crates/semantic_analyzer/Cargo.toml b/crates/semantic_analyzer/Cargo.toml index 7dc18f4..0fd282f 100644 --- a/crates/semantic_analyzer/Cargo.toml +++ b/crates/semantic_analyzer/Cargo.toml @@ -11,5 +11,8 @@ authors.workspace = true shared = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +opencv = { workspace = true } +petgraph = { workspace = true } -# TensorRT / ONNX runtime wiring lands with AZ-670. +[dev-dependencies] +bytes = { workspace = true } diff --git a/crates/semantic_analyzer/src/internal/mod.rs b/crates/semantic_analyzer/src/internal/mod.rs new file mode 100644 index 0000000..f3ca889 --- /dev/null +++ b/crates/semantic_analyzer/src/internal/mod.rs @@ -0,0 +1,2 @@ +pub mod primitive_graph; +pub mod scoring; diff --git a/crates/semantic_analyzer/src/internal/primitive_graph/builder.rs b/crates/semantic_analyzer/src/internal/primitive_graph/builder.rs new file mode 100644 index 0000000..f7f18b2 --- /dev/null +++ b/crates/semantic_analyzer/src/internal/primitive_graph/builder.rs @@ -0,0 +1,281 @@ +//! AZ-669 — Build a `PrimitiveGraph` from a `DetectionBatch` inside an ROI, +//! then validate connectivity of the path sub-graph. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use shared::models::{detection::DetectionBatch, frame::BoundingBox}; + +use super::graph::{NodeType, PrimitiveGraph, PrimitiveNode}; + +// ── class-name → NodeType mapping ──────────────────────────────────────────── + +fn classify_class_name(name: &str) -> NodeType { + let lower = name.to_ascii_lowercase(); + if lower.contains("path") || lower.contains("road") || lower.contains("footpath") { + NodeType::Path + } else if lower.contains("branch") + || lower.contains("pile") + || lower.contains("entrance") + || lower.contains("dugout") + { + NodeType::Endpoint + } else { + // trees, tree blocks, and unknowns are contextual landmarks + NodeType::Context + } +} + +// ── spatial proximity helpers ───────────────────────────────────────────────── + +/// Centre of a bounding box in normalised image coordinates. +fn centre(b: &BoundingBox) -> (f32, f32) { + ((b.x_min + b.x_max) / 2.0, (b.y_min + b.y_max) / 2.0) +} + +/// Euclidean distance between two bbox centres. +fn centre_dist(a: &BoundingBox, b: &BoundingBox) -> f32 { + let (ax, ay) = centre(a); + let (bx, by) = centre(b); + ((ax - bx).powi(2) + (ay - by).powi(2)).sqrt() +} + +/// Maximum dimension of a bounding box (normalised units). +fn max_dim(b: &BoundingBox) -> f32 { + (b.x_max - b.x_min).max(b.y_max - b.y_min) +} + +// ── connectivity (BFS on path nodes) ───────────────────────────────────────── + +/// Returns the number of connected components in the path sub-graph described +/// by `edges` over the `path_indices` set. +fn count_path_components( + path_indices: &[usize], + edges: &[(usize, usize)], +) -> usize { + if path_indices.is_empty() { + return 0; + } + // Map global node index → local index within `path_indices`. + let mut local: std::collections::HashMap = + path_indices.iter().enumerate().map(|(l, &g)| (g, l)).collect(); + let n = path_indices.len(); + let mut adj: Vec> = vec![vec![]; n]; + for &(a, b) in edges { + if let (Some(&la), Some(&lb)) = (local.get(&a), local.get(&b)) { + adj[la].push(lb); + adj[lb].push(la); + } + } + let mut visited = vec![false; n]; + let mut components = 0usize; + for start in 0..n { + if visited[start] { + continue; + } + components += 1; + let mut queue = std::collections::VecDeque::new(); + queue.push_back(start); + visited[start] = true; + while let Some(cur) = queue.pop_front() { + for &nb in &adj[cur] { + if !visited[nb] { + visited[nb] = true; + queue.push_back(nb); + } + } + } + } + components +} + +// ── builder ─────────────────────────────────────────────────────────────────── + +pub struct GraphCounters { + pub graphs_built_total: AtomicU64, + pub disconnected_graphs_total: AtomicU64, +} + +impl GraphCounters { + pub fn new() -> Self { + Self { + graphs_built_total: AtomicU64::new(0), + disconnected_graphs_total: AtomicU64::new(0), + } + } +} + +impl Default for GraphCounters { + fn default() -> Self { + Self::new() + } +} + +pub struct PrimitiveGraphBuilder { + counters: Arc, + /// Spatial-proximity multiplier: two path nodes are adjacent when their + /// centre-to-centre distance ≤ this factor × the larger of their max dims. + adjacency_factor: f32, +} + +impl PrimitiveGraphBuilder { + pub fn new(counters: Arc) -> Self { + Self { counters, adjacency_factor: 2.5 } + } + + pub fn counters(&self) -> &Arc { + &self.counters + } + + /// Build a `PrimitiveGraph` from detections inside `roi`. + /// + /// Only detections whose bbox centre lies inside `roi` are included. + /// After construction the path sub-graph is validated for connectivity; + /// a disconnected graph is flagged and the counter is incremented. + pub fn build(&self, roi: &BoundingBox, batch: &DetectionBatch) -> PrimitiveGraph { + let nodes: Vec = batch + .detections + .iter() + .enumerate() + .filter(|(_, d)| { + let (cx, cy) = centre(&d.bbox_normalized); + cx >= roi.x_min + && cx <= roi.x_max + && cy >= roi.y_min + && cy <= roi.y_max + }) + .map(|(i, d)| PrimitiveNode { + node_type: classify_class_name(&d.class_name), + bbox: d.bbox_normalized, + confidence: d.confidence, + class_name: d.class_name.clone(), + detection_index: i, + }) + .collect(); + + // Build proximity edges between path nodes only. + let path_idxs: Vec = nodes + .iter() + .enumerate() + .filter(|(_, n)| n.node_type == NodeType::Path) + .map(|(i, _)| i) + .collect(); + + let mut edges: Vec<(usize, usize)> = Vec::new(); + for i in 0..path_idxs.len() { + for j in (i + 1)..path_idxs.len() { + let ni = &nodes[path_idxs[i]]; + let nj = &nodes[path_idxs[j]]; + let dist = centre_dist(&ni.bbox, &nj.bbox); + let threshold = self.adjacency_factor * max_dim(&ni.bbox).max(max_dim(&nj.bbox)); + if dist <= threshold { + edges.push((path_idxs[i], path_idxs[j])); + } + } + } + + // Connectivity validation. + let components = count_path_components(&path_idxs, &edges); + let disconnected = components > 1; + let valid = !disconnected; + + if disconnected { + self.counters.disconnected_graphs_total.fetch_add(1, Ordering::Relaxed); + tracing::warn!( + disconnected_components = components, + "primitive graph has disconnected path components" + ); + } + self.counters.graphs_built_total.fetch_add(1, Ordering::Relaxed); + + PrimitiveGraph { nodes, edges, valid, disconnected } + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use shared::models::detection::{Detection, DetectionBatch}; + use shared::models::frame::BoundingBox; + + fn roi() -> BoundingBox { + BoundingBox { x_min: 0.0, y_min: 0.0, x_max: 1.0, y_max: 1.0 } + } + + fn det(class_name: &str, x: f32, y: f32) -> Detection { + Detection { + class_id: 0, + class_name: class_name.to_owned(), + confidence: 0.9, + bbox_normalized: BoundingBox { + x_min: x - 0.05, + y_min: y - 0.05, + x_max: x + 0.05, + y_max: y + 0.05, + }, + mask_or_polyline: None, + source_frame_seq: 0, + } + } + + fn batch(dets: Vec) -> DetectionBatch { + DetectionBatch { + frame_seq: 1, + detections: dets, + latency_ms: 10, + model_version: "v1".to_owned(), + } + } + + // AC-1: correct node counts per detection class. + #[test] + fn ac1_node_counts_per_class() { + let counters = Arc::new(GraphCounters::new()); + let builder = PrimitiveGraphBuilder::new(Arc::clone(&counters)); + + let dets = vec![ + det("footpath", 0.1, 0.1), + det("footpath", 0.2, 0.2), + det("footpath", 0.3, 0.3), + det("branch_pile", 0.4, 0.4), + det("branch_pile", 0.5, 0.5), + det("tree", 0.6, 0.1), + det("tree", 0.7, 0.2), + det("tree", 0.8, 0.3), + det("tree", 0.15, 0.6), + det("tree_block", 0.25, 0.7), + ]; + let b = batch(dets); + let graph = builder.build(&roi(), &b); + + let paths = graph.nodes.iter().filter(|n| n.node_type == NodeType::Path).count(); + let endpoints = graph.nodes.iter().filter(|n| n.node_type == NodeType::Endpoint).count(); + let contexts = graph.nodes.iter().filter(|n| n.node_type == NodeType::Context).count(); + + assert_eq!(paths, 3, "expected 3 path nodes"); + assert_eq!(endpoints, 2, "expected 2 endpoint nodes"); + assert_eq!(contexts, 5, "expected 5 context nodes"); + assert_eq!(counters.graphs_built_total.load(Ordering::Relaxed), 1); + } + + // AC-3: disconnected path components are flagged and counter increments. + #[test] + fn ac3_disconnected_path_graph_flagged() { + let counters = Arc::new(GraphCounters::new()); + // Use a very small adjacency factor so distant nodes don't accidentally connect. + let builder = PrimitiveGraphBuilder { counters: Arc::clone(&counters), adjacency_factor: 0.5 }; + + // Two isolated path clusters — far apart in the image. + let dets = vec![ + det("footpath", 0.1, 0.1), // cluster A + det("footpath", 0.9, 0.9), // cluster B (isolated) + ]; + let graph = builder.build(&roi(), &batch(dets)); + + assert!(graph.disconnected, "graph should be marked disconnected"); + assert!(!graph.valid); + assert_eq!(counters.disconnected_graphs_total.load(Ordering::Relaxed), 1); + } +} diff --git a/crates/semantic_analyzer/src/internal/primitive_graph/graph.rs b/crates/semantic_analyzer/src/internal/primitive_graph/graph.rs new file mode 100644 index 0000000..82cc823 --- /dev/null +++ b/crates/semantic_analyzer/src/internal/primitive_graph/graph.rs @@ -0,0 +1,47 @@ +//! Primitive graph types — path, endpoint, and context nodes. + +use shared::models::frame::BoundingBox; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NodeType { + /// Footpath, road — the main navigation surface. + Path, + /// Branch pile, dark entrance, dugout — a decision point or POI endpoint. + Endpoint, + /// Tree, tree block — contextual landmark. + Context, +} + +#[derive(Debug, Clone)] +pub struct PrimitiveNode { + pub node_type: NodeType, + pub bbox: BoundingBox, + pub confidence: f32, + pub class_name: String, + /// Index into the source `DetectionBatch.detections` vec. + pub detection_index: usize, +} + +/// A small ROI-scoped graph of primitive detections. +/// +/// `edges` encodes spatial-proximity adjacency between path nodes +/// (indices into `nodes`). `valid = false` and `disconnected = true` +/// when ≥2 separate path components are found. +#[derive(Debug, Default)] +pub struct PrimitiveGraph { + pub nodes: Vec, + /// Undirected adjacency edges between path nodes (node indices). + pub edges: Vec<(usize, usize)>, + /// False when the path sub-graph has ≥2 connected components. + pub valid: bool, + pub disconnected: bool, +} + +impl PrimitiveGraph { + pub fn path_nodes(&self) -> impl Iterator { + self.nodes + .iter() + .enumerate() + .filter(|(_, n)| n.node_type == NodeType::Path) + } +} diff --git a/crates/semantic_analyzer/src/internal/primitive_graph/mod.rs b/crates/semantic_analyzer/src/internal/primitive_graph/mod.rs new file mode 100644 index 0000000..52467a5 --- /dev/null +++ b/crates/semantic_analyzer/src/internal/primitive_graph/mod.rs @@ -0,0 +1,7 @@ +//! AZ-669 — Primitive graph builder + graph validation. + +pub mod builder; +pub mod graph; + +pub use builder::PrimitiveGraphBuilder; +pub use graph::{NodeType, PrimitiveGraph, PrimitiveNode}; diff --git a/crates/semantic_analyzer/src/internal/scoring/freshness.rs b/crates/semantic_analyzer/src/internal/scoring/freshness.rs new file mode 100644 index 0000000..53ea20e --- /dev/null +++ b/crates/semantic_analyzer/src/internal/scoring/freshness.rs @@ -0,0 +1,263 @@ +//! AZ-669 — Path-freshness scoring. +//! +//! Combines three classical CV cues: edge clarity (Laplacian variance), +//! texture variance (pixel std-dev), and undisturbed surroundings (border +//! region variance). Each sub-score is normalised to [0, 1] and averaged. + +use opencv::{ + core::{self, Mat, Scalar}, + imgproc, + prelude::*, +}; + +use shared::models::frame::{BoundingBox, Frame, PixelFormat}; + +use super::super::primitive_graph::graph::{NodeType, PrimitiveGraph}; + +/// Freshness score for a single path node. +#[derive(Debug, Clone, Copy)] +pub struct PathFreshnessScore { + /// Index into `PrimitiveGraph::nodes`. + pub node_index: usize, + /// Normalised score in `[0.0, 1.0]`. + pub score: f32, +} + +pub struct FreshnessScorer; + +impl FreshnessScorer { + /// Score all path nodes in `graph` against the frame crop. + /// Every returned `PathFreshnessScore::score` is in `[0.0, 1.0]`. + pub fn score( + graph: &PrimitiveGraph, + frame_crop: &Frame, + ) -> opencv::Result> { + let gray = frame_to_gray_mat(frame_crop)?; + let mut scores = Vec::new(); + for (idx, node) in graph.path_nodes() { + let s = score_region(&gray, &node.bbox, frame_crop.width, frame_crop.height)?; + scores.push(PathFreshnessScore { node_index: idx, score: s }); + } + Ok(scores) + } +} + +// ── CV helpers ──────────────────────────────────────────────────────────────── + +fn frame_to_gray_mat(frame: &Frame) -> opencv::Result { + let h = frame.height as i32; + let w = frame.width as i32; + let data: &[u8] = &frame.pixels; + match frame.pix_fmt { + PixelFormat::Nv12 | PixelFormat::Yuv420p => { + let y_len = (w * h) as usize; + let mut mat = Mat::new_rows_cols_with_default(h, w, core::CV_8UC1, Scalar::all(0.0))?; + // SAFETY: freshly allocated continuous Mat; no aliasing. + let dst = unsafe { std::slice::from_raw_parts_mut(mat.data_mut()?, y_len) }; + dst.copy_from_slice(&data[..y_len]); + Ok(mat) + } + PixelFormat::Rgb24 => { + let rgb_len = (w * h * 3) as usize; + let mut rgb = + Mat::new_rows_cols_with_default(h, w, core::CV_8UC3, Scalar::all(0.0))?; + let dst = unsafe { std::slice::from_raw_parts_mut(rgb.data_mut()?, rgb_len) }; + dst.copy_from_slice(&data[..rgb_len]); + let mut gray = Mat::default(); + imgproc::cvt_color(&rgb, &mut gray, imgproc::COLOR_RGB2GRAY, 0)?; + Ok(gray) + } + } +} + +/// Compute a freshness score for the bbox region within `gray`. +/// Returns a value in [0.0, 1.0]. +fn score_region( + gray: &Mat, + bbox: &BoundingBox, + frame_w: u32, + frame_h: u32, +) -> opencv::Result { + let roi_rect = bbox_to_rect(bbox, frame_w, frame_h, gray.cols(), gray.rows()); + if roi_rect.width <= 0 || roi_rect.height <= 0 { + return Ok(0.0); + } + + let roi = Mat::roi(gray, roi_rect)?; + + // 1. Edge clarity: Laplacian variance — sharp edges indicate an active path. + let mut lap = Mat::default(); + imgproc::laplacian(&roi, &mut lap, core::CV_64F, 3, 1.0, 0.0, core::BORDER_DEFAULT)?; + let edge_var = variance(&lap)? as f32; + + // 2. Texture: std-dev of pixel intensities. + let texture_std = stddev_f32(&roi)?; + + // 3. Undisturbed surroundings: low variance in the border region around bbox + // signals an untouched environment → higher freshness contribution. + let surround_var = surround_variance(gray, roi_rect)? as f32; + let undisturbed_score = 1.0 - normalise(surround_var, 3000.0); + + let edge_score = normalise(edge_var, 1500.0); + let texture_score = normalise(texture_std, 40.0); + + let freshness = ((edge_score + texture_score + undisturbed_score) / 3.0).clamp(0.0, 1.0); + Ok(freshness) +} + +fn bbox_to_rect( + bbox: &BoundingBox, + frame_w: u32, + frame_h: u32, + mat_w: i32, + mat_h: i32, +) -> core::Rect { + let x = ((bbox.x_min * frame_w as f32) as i32).clamp(0, mat_w - 1); + let y = ((bbox.y_min * frame_h as f32) as i32).clamp(0, mat_h - 1); + let x2 = ((bbox.x_max * frame_w as f32) as i32).clamp(0, mat_w); + let y2 = ((bbox.y_max * frame_h as f32) as i32).clamp(0, mat_h); + core::Rect::new(x, y, (x2 - x).max(1), (y2 - y).max(1)) +} + +/// Compute the variance of all values in a Mat as f64. +fn variance(mat: &Mat) -> opencv::Result { + let mut mean_mat = Mat::default(); + let mut stddev_mat = Mat::default(); + core::mean_std_dev(mat, &mut mean_mat, &mut stddev_mat, &core::no_array())?; + let std = stddev_mat.at::(0).map(|v| *v).unwrap_or(0.0); + Ok(std * std) +} + +fn stddev_f32(mat: &Mat) -> opencv::Result { + let mut mean_mat = Mat::default(); + let mut stddev_mat = Mat::default(); + core::mean_std_dev(mat, &mut mean_mat, &mut stddev_mat, &core::no_array())?; + Ok(stddev_mat.at::(0).map(|v| *v as f32).unwrap_or(0.0)) +} + +/// Compute the pixel variance in a ~16 px border region around `rect`. +fn surround_variance(gray: &Mat, rect: core::Rect) -> opencv::Result { + let border = 16i32; + let x = (rect.x - border).max(0); + let y = (rect.y - border).max(0); + let x2 = (rect.x + rect.width + border).min(gray.cols()); + let y2 = (rect.y + rect.height + border).min(gray.rows()); + let outer_rect = core::Rect::new(x, y, (x2 - x).max(1), (y2 - y).max(1)); + + let outer = Mat::roi(gray, outer_rect)?; + // Build a mask: 0 inside inner rect, 255 in the border band. + let mut mask = Mat::new_rows_cols_with_default( + outer_rect.height, + outer_rect.width, + core::CV_8UC1, + Scalar::all(255.0), + )?; + let inner_x = rect.x - x; + let inner_y = rect.y - y; + let inner = core::Rect::new( + inner_x.clamp(0, outer_rect.width - 1), + inner_y.clamp(0, outer_rect.height - 1), + rect.width.min(outer_rect.width - inner_x.max(0)), + rect.height.min(outer_rect.height - inner_y.max(0)), + ); + if inner.width > 0 && inner.height > 0 { + let mut inner_roi = Mat::roi_mut(&mut mask, inner)?; + inner_roi.set_to(&Scalar::all(0.0), &core::no_array())?; + } + + let mut mean_mat = Mat::default(); + let mut stddev_mat = Mat::default(); + core::mean_std_dev(&outer, &mut mean_mat, &mut stddev_mat, &mask)?; + let std = stddev_mat.at::(0).map(|v| *v).unwrap_or(0.0); + Ok(std * std) +} + +/// Map `value` ∈ [0, ∞) to [0.0, 1.0] by dividing by `scale` and clamping. +#[inline] +fn normalise(value: f32, scale: f32) -> f32 { + (value / scale).clamp(0.0, 1.0) +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use bytes::Bytes; + + use super::*; + use super::super::super::primitive_graph::{builder::{GraphCounters, PrimitiveGraphBuilder}, graph::PrimitiveGraph}; + use shared::models::{ + detection::{Detection, DetectionBatch}, + frame::{BoundingBox, Frame, PixelFormat}, + }; + + fn rgb_frame(w: u32, h: u32, fill: u8, ts: u64) -> Frame { + Frame { + seq: 0, + capture_ts_monotonic_ns: ts, + decode_ts_monotonic_ns: ts, + pixels: Arc::new(Bytes::from(vec![fill; (w * h * 3) as usize])), + width: w, + height: h, + pix_fmt: PixelFormat::Rgb24, + ai_locked: false, + } + } + + fn noisy_rgb_frame(w: u32, h: u32, ts: u64) -> Frame { + let total = (w * h * 3) as usize; + let pixels: Vec = (0..total).map(|i| (i % 256) as u8).collect(); + Frame { + seq: 0, + capture_ts_monotonic_ns: ts, + decode_ts_monotonic_ns: ts, + pixels: Arc::new(Bytes::from(pixels)), + width: w, + height: h, + pix_fmt: PixelFormat::Rgb24, + ai_locked: false, + } + } + + fn single_path_graph() -> PrimitiveGraph { + let counters = Arc::new(GraphCounters::new()); + let builder = PrimitiveGraphBuilder::new(counters); + let roi = BoundingBox { x_min: 0.0, y_min: 0.0, x_max: 1.0, y_max: 1.0 }; + let batch = DetectionBatch { + frame_seq: 1, + detections: vec![Detection { + class_id: 0, + class_name: "footpath".to_owned(), + confidence: 0.9, + bbox_normalized: BoundingBox { + x_min: 0.2, y_min: 0.2, x_max: 0.8, y_max: 0.8, + }, + mask_or_polyline: None, + source_frame_seq: 1, + }], + latency_ms: 5, + model_version: "v1".to_owned(), + }; + builder.build(&roi, &batch) + } + + // AC-2: every freshness score is in [0.0, 1.0] for any valid input. + #[test] + fn ac2_freshness_score_bounded() -> opencv::Result<()> { + let graph = single_path_graph(); + // Uniform gray frame. + let uniform = rgb_frame(64, 64, 128, 0); + let scores_uniform = FreshnessScorer::score(&graph, &uniform)?; + for s in &scores_uniform { + assert!(s.score >= 0.0 && s.score <= 1.0, "score out of range: {}", s.score); + } + // Noisy textured frame. + let noisy = noisy_rgb_frame(64, 64, 0); + let scores_noisy = FreshnessScorer::score(&graph, &noisy)?; + for s in &scores_noisy { + assert!(s.score >= 0.0 && s.score <= 1.0, "score out of range: {}", s.score); + } + Ok(()) + } +} diff --git a/crates/semantic_analyzer/src/internal/scoring/mod.rs b/crates/semantic_analyzer/src/internal/scoring/mod.rs new file mode 100644 index 0000000..bfe6193 --- /dev/null +++ b/crates/semantic_analyzer/src/internal/scoring/mod.rs @@ -0,0 +1,3 @@ +pub mod freshness; + +pub use freshness::{FreshnessScorer, PathFreshnessScore}; diff --git a/crates/semantic_analyzer/src/lib.rs b/crates/semantic_analyzer/src/lib.rs index 8f505fd..a42f3fb 100644 --- a/crates/semantic_analyzer/src/lib.rs +++ b/crates/semantic_analyzer/src/lib.rs @@ -1,46 +1,71 @@ -//! `semantic_analyzer` — Tier 2 primitive graph + ROI CNN. +//! `semantic_analyzer` — primitive graph + freshness scoring. //! -//! Real implementation lands in: -//! - AZ-669 `semantic_analyzer_primitive_graph` -//! - AZ-670 `semantic_analyzer_roi_cnn` -//! - AZ-671 `semantic_analyzer_action_policy` +//! AZ-669: primitive graph builder + freshness scorer (this batch). +//! AZ-670: TensorRT/ONNX scene-embedding classifier. +//! AZ-671: output publisher. -use shared::error::{AutopilotError, Result}; -use shared::health::ComponentHealth; -use shared::models::tier2::Tier2Evidence; +use std::sync::Arc; + +use tokio::sync::broadcast; + +use shared::health::{ComponentHealth, HealthLevel}; +use shared::models::detection::DetectionBatch; + +pub(crate) mod internal; + +use internal::{ + primitive_graph::builder::{GraphCounters, PrimitiveGraphBuilder}, + scoring::FreshnessScorer, +}; const NAME: &str = "semantic_analyzer"; -pub struct SemanticAnalyzer; +pub struct SemanticAnalyzer { + tx: broadcast::Sender, + counters: Arc, +} impl SemanticAnalyzer { - pub fn new() -> Self { - Self + pub fn new(channel_capacity: usize) -> Self { + let (tx, _) = broadcast::channel(channel_capacity); + Self { tx, counters: Arc::new(GraphCounters::new()) } } pub fn handle(&self) -> SemanticAnalyzerHandle { - SemanticAnalyzerHandle + SemanticAnalyzerHandle { + tx: self.tx.clone(), + counters: Arc::clone(&self.counters), + } } } -impl Default for SemanticAnalyzer { - fn default() -> Self { - Self::new() - } +#[derive(Clone)] +pub struct SemanticAnalyzerHandle { + tx: broadcast::Sender, + counters: Arc, } -#[derive(Clone, Copy)] -pub struct SemanticAnalyzerHandle; - impl SemanticAnalyzerHandle { - pub async fn analyze(&self, _roi: Vec) -> Result { - Err(AutopilotError::NotImplemented( - "semantic_analyzer::analyze (AZ-669)", - )) + pub fn detections(&self) -> broadcast::Receiver { + self.tx.subscribe() } pub fn health(&self) -> ComponentHealth { - ComponentHealth::disabled(NAME) + let disconnected = self.counters.disconnected_graphs_total.load( + std::sync::atomic::Ordering::Relaxed, + ); + if disconnected > 0 { + ComponentHealth::yellow( + NAME, + format!("disconnected_graphs_total={disconnected}"), + ) + } else { + ComponentHealth { + level: HealthLevel::Disabled, + component: NAME, + detail: None, + } + } } } @@ -50,7 +75,10 @@ mod tests { #[test] fn it_compiles() { - let h = SemanticAnalyzer::new().handle(); - assert_eq!(h.health().level, shared::health::HealthLevel::Disabled); + let h = SemanticAnalyzer::new(16).handle(); + assert!(matches!( + h.health().level, + HealthLevel::Disabled | HealthLevel::Yellow + )); } }