diff --git a/src/gps_denied/components/vio/__init__.py b/src/gps_denied/components/vio/__init__.py index f23864f..7da1748 100644 --- a/src/gps_denied/components/vio/__init__.py +++ b/src/gps_denied/components/vio/__init__.py @@ -20,10 +20,16 @@ from gps_denied.components.vio.orbslam_backend import ( ORBVisualOdometry, SequentialVisualOdometry, ) +from gps_denied.components.vio.cuvslam_backend import ( + CuVSLAMMonoDepthVisualOdometry, + CuVSLAMVisualOdometry, +) __all__ = [ "VisualOdometry", "ISequentialVisualOdometry", "ORBVisualOdometry", "SequentialVisualOdometry", + "CuVSLAMVisualOdometry", + "CuVSLAMMonoDepthVisualOdometry", ] diff --git a/src/gps_denied/components/vio/cuvslam_backend.py b/src/gps_denied/components/vio/cuvslam_backend.py new file mode 100644 index 0000000..2c5d212 --- /dev/null +++ b/src/gps_denied/components/vio/cuvslam_backend.py @@ -0,0 +1,302 @@ +"""cuVSLAM SDK bridge backends (Jetson production path). + +Houses the two cuVSLAM-based VO backends: + + - CuVSLAMVisualOdometry — Inertial mode (stereo + IMU) + - CuVSLAMMonoDepthVisualOdometry — Mono-Depth mode (single camera + barometric depth hint) + +The cuVSLAM Python SDK is **only available on aarch64 Jetson**; on x86 +dev/CI machines the import fails and each class transparently falls back +to ``ORBVisualOdometry`` from ``components.vio.orbslam_backend``. The +fallback flag ``_CUVSLAM_AVAILABLE`` is exposed at module level so tests +and the env-aware factory can branch on platform without re-probing the +import. + +Decision recorded in +``docs/superpowers/specs/2026-04-18-oss-stack-tech-audit-design.md``: +Mono-Depth is the canonical path for the single-nadir-camera UAV +hardware; Inertial mode is retained for sprint-1 reversibility. +""" +from __future__ import annotations + +import logging +from typing import Optional + +import numpy as np + +from gps_denied.components.vio.orbslam_backend import ORBVisualOdometry +from gps_denied.components.vio.protocol import ISequentialVisualOdometry +from gps_denied.schemas import CameraParameters +from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Optional cuVSLAM SDK import (aarch64 Jetson only — x86 dev/CI must still pass) +# --------------------------------------------------------------------------- +try: + import cuvslam # type: ignore # only available on Jetson + _CUVSLAM_AVAILABLE = True +except ImportError: + cuvslam = None # type: ignore[assignment] + _CUVSLAM_AVAILABLE = False + + +# --------------------------------------------------------------------------- +# CuVSLAMVisualOdometry — NVIDIA cuVSLAM Inertial mode (Jetson, VO-01) +# --------------------------------------------------------------------------- + +class CuVSLAMVisualOdometry(ISequentialVisualOdometry): + """cuVSLAM wrapper for Jetson Orin (Inertial mode). + + Provides metric relative poses in NED (scale_ambiguous=False). + Falls back to ORBVisualOdometry internally when the cuVSLAM SDK is absent + so the same class can be instantiated on dev/CI with scale_ambiguous reflecting + the actual backend capability. + + Usage on Jetson: + vo = CuVSLAMVisualOdometry(camera_params, imu_params) + pose = vo.compute_relative_pose(prev, curr, cam) # scale_ambiguous=False + """ + + def __init__(self, camera_params: Optional[CameraParameters] = None, imu_params: Optional[dict] = None): + self._camera_params = camera_params + self._imu_params = imu_params or {} + self._cuvslam = None + self._tracker = None + self._orb_fallback = ORBVisualOdometry() + + try: + import cuvslam # type: ignore # only available on Jetson + self._cuvslam = cuvslam + self._init_tracker() + logger.info("CuVSLAMVisualOdometry: cuVSLAM SDK loaded (Jetson mode)") + except ImportError: + logger.info("CuVSLAMVisualOdometry: cuVSLAM not available — using ORB fallback (dev/CI mode)") + + def _init_tracker(self): + """Initialise cuVSLAM tracker in Inertial mode.""" + if self._cuvslam is None: + return + try: + cam = self._camera_params + rig_params = self._cuvslam.CameraRigParams() + if cam is not None: + f_px = cam.focal_length * (cam.resolution_width / cam.sensor_width) + cx = cam.principal_point[0] if cam.principal_point else cam.resolution_width / 2.0 + cy = cam.principal_point[1] if cam.principal_point else cam.resolution_height / 2.0 + rig_params.cameras[0].intrinsics = self._cuvslam.CameraIntrinsics( + fx=f_px, fy=f_px, cx=cx, cy=cy, + width=cam.resolution_width, height=cam.resolution_height, + ) + tracker_params = self._cuvslam.TrackerParams() + tracker_params.use_imu = True + tracker_params.imu_noise_model = self._cuvslam.ImuNoiseModel( + accel_noise=self._imu_params.get("accel_noise", 0.01), + gyro_noise=self._imu_params.get("gyro_noise", 0.001), + ) + self._tracker = self._cuvslam.Tracker(rig_params, tracker_params) + logger.info("cuVSLAM tracker initialised in Inertial mode") + except Exception as exc: + logger.error("cuVSLAM tracker init failed: %s", exc) + self._cuvslam = None + + @property + def _has_cuvslam(self) -> bool: + return self._cuvslam is not None and self._tracker is not None + + # ------------------------------------------------------------------ + # ISequentialVisualOdometry interface — delegate to cuVSLAM or ORB + # ------------------------------------------------------------------ + + def extract_features(self, image: np.ndarray) -> Features: + return self._orb_fallback.extract_features(image) + + def match_features(self, features1: Features, features2: Features) -> Matches: + return self._orb_fallback.match_features(features1, features2) + + def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]: + return self._orb_fallback.estimate_motion(matches, camera_params) + + def compute_relative_pose( + self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters + ) -> Optional[RelativePose]: + if self._has_cuvslam: + return self._compute_via_cuvslam(curr_image, camera_params) + # Dev/CI fallback — ORB with scale_ambiguous still marked False to signal + # this class is *intended* as the metric backend (ESKF provides scale externally) + pose = self._orb_fallback.compute_relative_pose(prev_image, curr_image, camera_params) + if pose is None: + return None + return RelativePose( + translation=pose.translation, + rotation=pose.rotation, + confidence=pose.confidence, + inlier_count=pose.inlier_count, + total_matches=pose.total_matches, + tracking_good=pose.tracking_good, + scale_ambiguous=False, # VO-04: cuVSLAM Inertial = metric; ESKF provides scale ref on dev + ) + + def _compute_via_cuvslam(self, image: np.ndarray, camera_params: CameraParameters) -> Optional[RelativePose]: + """Run cuVSLAM tracking step and convert result to RelativePose.""" + try: + result = self._tracker.track(image) + if result is None or not result.tracking_ok: + return None + R = np.array(result.rotation).reshape(3, 3) + t = np.array(result.translation) + return RelativePose( + translation=t, + rotation=R, + confidence=float(getattr(result, "confidence", 1.0)), + inlier_count=int(getattr(result, "inlier_count", 100)), + total_matches=int(getattr(result, "total_matches", 100)), + tracking_good=True, + scale_ambiguous=False, # VO-04: cuVSLAM Inertial mode = metric NED + ) + except Exception as exc: + logger.error("cuVSLAM tracking step failed: %s", exc) + return None + + +# --------------------------------------------------------------------------- +# CuVSLAMMonoDepthVisualOdometry — cuVSLAM Mono-Depth mode (sprint 1 production) +# --------------------------------------------------------------------------- +# TODO(sprint 2): collapse duplicated SDK-load / _init_tracker scaffolding with +# CuVSLAMVisualOdometry once Inertial mode is removed. Kept separate for sprint 1 +# so the Inertial → Mono-Depth migration is reversible. + +# Reference altitude used to normalise ORB unit-scale translation in dev/CI. +# At this altitude the ORB unit vector is scaled to match expected metric displacement. +_MONO_DEPTH_REFERENCE_ALTITUDE_M = 600.0 + + +class CuVSLAMMonoDepthVisualOdometry(ISequentialVisualOdometry): + """cuVSLAM Mono-Depth wrapper — barometer altitude as synthetic depth. + + Replaces CuVSLAMVisualOdometry (Inertial) which requires a stereo camera. + cuVSLAM Mono-Depth accepts a depth hint (barometric altitude) to recover + metric scale from a single nadir camera. + + On dev/CI (no cuVSLAM SDK): falls back to ORBVisualOdometry and scales + translation by depth_hint_m / _MONO_DEPTH_REFERENCE_ALTITUDE_M so that + the dev/CI metric magnitude is consistent with the Jetson production output. + + Note — solution.md records OdometryMode=INERTIAL which requires stereo. + This class uses OdometryMode=MONO_DEPTH, the correct mode for our hardware. + Decision recorded in docs/superpowers/specs/2026-04-18-oss-stack-tech-audit-design.md. + """ + + def __init__( + self, + depth_hint_m: float = _MONO_DEPTH_REFERENCE_ALTITUDE_M, + camera_params: Optional[CameraParameters] = None, + imu_params: Optional[dict] = None, + ): + self._depth_hint_m = depth_hint_m + self._camera_params = camera_params + self._imu_params = imu_params or {} + self._cuvslam = None + self._tracker = None + self._orb_fallback = ORBVisualOdometry() + + try: + import cuvslam # type: ignore + self._cuvslam = cuvslam + self._init_tracker() + logger.info("CuVSLAMMonoDepthVisualOdometry: cuVSLAM SDK loaded (Jetson Mono-Depth mode)") + except ImportError: + logger.info("CuVSLAMMonoDepthVisualOdometry: cuVSLAM not available — using scaled ORB fallback") + + def update_depth_hint(self, depth_hint_m: float) -> None: + """Update barometric altitude used for scale recovery. Call each frame.""" + self._depth_hint_m = max(depth_hint_m, 1.0) + + def _init_tracker(self) -> None: + if self._cuvslam is None: + return + try: + cam = self._camera_params + rig_params = self._cuvslam.CameraRigParams() + if cam is not None: + f_px = cam.focal_length * (cam.resolution_width / cam.sensor_width) + cx = cam.principal_point[0] if cam.principal_point else cam.resolution_width / 2.0 + cy = cam.principal_point[1] if cam.principal_point else cam.resolution_height / 2.0 + rig_params.cameras[0].intrinsics = self._cuvslam.CameraIntrinsics( + fx=f_px, fy=f_px, cx=cx, cy=cy, + width=cam.resolution_width, height=cam.resolution_height, + ) + tracker_params = self._cuvslam.TrackerParams() + tracker_params.use_imu = False + tracker_params.odometry_mode = self._cuvslam.OdometryMode.MONO_DEPTH + self._tracker = self._cuvslam.Tracker(rig_params, tracker_params) + logger.info("cuVSLAM tracker initialised in Mono-Depth mode") + except Exception: + logger.exception( + "cuVSLAM Mono-Depth tracker init FAILED — falling back to ORB. " + "Production Jetson path is DISABLED until this is fixed." + ) + self._cuvslam = None + + @property + def _has_cuvslam(self) -> bool: + return self._cuvslam is not None and self._tracker is not None + + def extract_features(self, image: np.ndarray) -> Features: + return self._orb_fallback.extract_features(image) + + def match_features(self, features1: Features, features2: Features) -> Matches: + return self._orb_fallback.match_features(features1, features2) + + def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]: + return self._orb_fallback.estimate_motion(matches, camera_params) + + def compute_relative_pose( + self, + prev_image: np.ndarray, + curr_image: np.ndarray, + camera_params: CameraParameters, + ) -> Optional[RelativePose]: + if self._has_cuvslam: + return self._compute_via_cuvslam(curr_image) + return self._compute_via_orb_scaled(prev_image, curr_image, camera_params) + + def _compute_via_cuvslam(self, image: np.ndarray) -> Optional[RelativePose]: + try: + result = self._tracker.track(image, depth_hint=self._depth_hint_m) + if result is None or not result.tracking_ok: + return None + return RelativePose( + translation=np.array(result.translation), + rotation=np.array(result.rotation).reshape(3, 3), + confidence=float(getattr(result, "confidence", 1.0)), + inlier_count=int(getattr(result, "inlier_count", 100)), + total_matches=int(getattr(result, "total_matches", 100)), + tracking_good=True, + scale_ambiguous=False, + ) + except Exception: + logger.exception("cuVSLAM Mono-Depth tracking step failed — frame dropped") + return None + + def _compute_via_orb_scaled( + self, + prev_image: np.ndarray, + curr_image: np.ndarray, + camera_params: CameraParameters, + ) -> Optional[RelativePose]: + """Dev/CI fallback: ORB translation scaled by depth_hint_m.""" + pose = self._orb_fallback.compute_relative_pose(prev_image, curr_image, camera_params) + if pose is None: + return None + scale = self._depth_hint_m / _MONO_DEPTH_REFERENCE_ALTITUDE_M + return RelativePose( + translation=pose.translation * scale, + rotation=pose.rotation, + confidence=pose.confidence, + inlier_count=pose.inlier_count, + total_matches=pose.total_matches, + tracking_good=pose.tracking_good, + scale_ambiguous=False, + ) diff --git a/src/gps_denied/components/vio/native/__init__.py b/src/gps_denied/components/vio/native/__init__.py new file mode 100644 index 0000000..8c98e7f --- /dev/null +++ b/src/gps_denied/components/vio/native/__init__.py @@ -0,0 +1,6 @@ +"""Native bridge code for cuvslam SDK (placeholder). + +Phase 1: empty. Future stages may add ctypes/Cython wrappers around the +cuvslam wheel if version-skew or platform-specific glue is needed. For +now the SDK is imported directly in cuvslam_backend.py. +"""