From bae8587c51c9d521b95e27d497201136e9df6e72 Mon Sep 17 00:00:00 2001 From: Yuzviak Date: Sun, 10 May 2026 23:01:17 +0300 Subject: [PATCH] refactor(01-03): replace core/vo.py with re-export shim to components/vio MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - core/vo.py is now ~30 LOC of pure re-exports from components/vio/{protocol, orbslam_backend, cuvslam_backend, factory}. - All 8 public symbols (VisualOdometry, ISequentialVisualOdometry, ORBVisualOdometry, SequentialVisualOdometry, CuVSLAMVisualOdometry, CuVSLAMMonoDepthVisualOdometry, create_vo_backend, _CUVSLAM_AVAILABLE) remain importable from the legacy path with class identity preserved (re-export, not redefinition — isinstance checks still hold). - tests/test_vo.py: 22/22 passing unchanged. No test files edited. - Shim is removed in Phase 2 when TEST-01 reorganizes test taxonomy. --- src/gps_denied/core/vo.py | 603 ++------------------------------------ 1 file changed, 30 insertions(+), 573 deletions(-) diff --git a/src/gps_denied/core/vo.py b/src/gps_denied/core/vo.py index f0831c0..ba20b25 100644 --- a/src/gps_denied/core/vo.py +++ b/src/gps_denied/core/vo.py @@ -1,575 +1,32 @@ -"""Sequential Visual Odometry (Component F07). +"""Legacy import path for VIO. Phase 1 shim — code lives in components/vio/. -Three concrete backends: - - SequentialVisualOdometry — SuperPoint + LightGlue (TRT on Jetson / Mock on dev) - - ORBVisualOdometry — OpenCV ORB + BFMatcher (dev/CI stub, VO-02) - - CuVSLAMVisualOdometry — NVIDIA cuVSLAM Inertial mode (Jetson only, VO-01) - -Factory: create_vo_backend() selects the right one at runtime. +This shim preserves ``from gps_denied.core.vo import ...`` for tests that +were green at the start of Phase 1. Future phases may migrate test +imports to the new path; the shim is removed in Phase 2 (TEST-01 +reorganization). """ - -import logging -from abc import ABC, abstractmethod -from typing import Optional - -import cv2 -import numpy as np - -from gps_denied.core.models import IModelManager -from gps_denied.schemas import CameraParameters -from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose - -logger = logging.getLogger(__name__) - - -class ISequentialVisualOdometry(ABC): - @abstractmethod - def compute_relative_pose( - self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters - ) -> RelativePose | None: - pass - - @abstractmethod - def extract_features(self, image: np.ndarray) -> Features: - pass - - @abstractmethod - def match_features(self, features1: Features, features2: Features) -> Matches: - pass - - @abstractmethod - def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None: - pass - - -class SequentialVisualOdometry(ISequentialVisualOdometry): - """Frame-to-frame visual odometry using SuperPoint + LightGlue.""" - - def __init__(self, model_manager: IModelManager): - self.model_manager = model_manager - - def extract_features(self, image: np.ndarray) -> Features: - """Extracts keypoints and descriptors using SuperPoint.""" - engine = self.model_manager.get_inference_engine("SuperPoint") - result = engine.infer(image) - - return Features( - keypoints=result["keypoints"], - descriptors=result["descriptors"], - scores=result["scores"] - ) - - def match_features(self, features1: Features, features2: Features) -> Matches: - """Matches features using LightGlue.""" - engine = self.model_manager.get_inference_engine("LightGlue") - result = engine.infer({ - "features1": features1, - "features2": features2 - }) - - return Matches( - matches=result["matches"], - scores=result["scores"], - keypoints1=result["keypoints1"], - keypoints2=result["keypoints2"] - ) - - def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None: - """Estimates camera motion using Essential Matrix (RANSAC).""" - inlier_threshold = 20 - if len(matches.matches) < 8: - return None - - pts1 = np.ascontiguousarray(matches.keypoints1) - pts2 = np.ascontiguousarray(matches.keypoints2) - - # Build camera matrix - f_px = camera_params.focal_length * (camera_params.resolution_width / camera_params.sensor_width) - if camera_params.principal_point: - cx, cy = camera_params.principal_point - else: - cx = camera_params.resolution_width / 2.0 - cy = camera_params.resolution_height / 2.0 - - K = np.array([ - [f_px, 0, cx], - [0, f_px, cy], - [0, 0, 1] - ], dtype=np.float64) - - try: - E, inliers = cv2.findEssentialMat( - pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0 - ) - except Exception as e: - logger.error(f"Error finding essential matrix: {e}") - return None - - if E is None or E.shape != (3, 3): - return None - - inliers_mask = inliers.flatten().astype(bool) - inlier_count = np.sum(inliers_mask) - - if inlier_count < inlier_threshold: - logger.warning(f"Insufficient inliers: {inlier_count} < {inlier_threshold}") - return None - - # Recover pose - try: - _, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers) - except Exception as e: - logger.error(f"Error recovering pose: {e}") - return None - - return Motion( - translation=t.flatten(), - rotation=R, - inliers=inliers_mask, - inlier_count=inlier_count - ) - - def compute_relative_pose( - self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters - ) -> RelativePose | None: - """Computes relative pose between two frames.""" - f1 = self.extract_features(prev_image) - f2 = self.extract_features(curr_image) - - matches = self.match_features(f1, f2) - - motion = self.estimate_motion(matches, camera_params) - - if motion is None: - return None - - tracking_good = motion.inlier_count > 50 - - return RelativePose( - translation=motion.translation, - rotation=motion.rotation, - confidence=float(motion.inlier_count / max(1, len(matches.matches))), - inlier_count=motion.inlier_count, - total_matches=len(matches.matches), - tracking_good=tracking_good, - scale_ambiguous=True, - ) - - -# --------------------------------------------------------------------------- -# ORBVisualOdometry — OpenCV ORB stub for dev/CI (VO-02) -# --------------------------------------------------------------------------- - -class ORBVisualOdometry(ISequentialVisualOdometry): - """OpenCV ORB-based VO stub for x86 dev/CI environments. - - Satisfies the same ISequentialVisualOdometry interface as the cuVSLAM wrapper. - Translation is unit-scale (scale_ambiguous=True) — metric scale requires ESKF. - """ - - _MIN_INLIERS = 20 - _N_FEATURES = 2000 - - def __init__(self): - self._orb = cv2.ORB_create(nfeatures=self._N_FEATURES) - self._matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) - - # ------------------------------------------------------------------ - # ISequentialVisualOdometry interface - # ------------------------------------------------------------------ - - def extract_features(self, image: np.ndarray) -> Features: - gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if image.ndim == 3 else image - kps, descs = self._orb.detectAndCompute(gray, None) - if descs is None or len(kps) == 0: - return Features( - keypoints=np.zeros((0, 2), dtype=np.float32), - descriptors=np.zeros((0, 32), dtype=np.uint8), - scores=np.zeros(0, dtype=np.float32), - ) - pts = np.array([[k.pt[0], k.pt[1]] for k in kps], dtype=np.float32) - scores = np.array([k.response for k in kps], dtype=np.float32) - return Features(keypoints=pts, descriptors=descs.astype(np.float32), scores=scores) - - def match_features(self, features1: Features, features2: Features) -> Matches: - if len(features1.keypoints) == 0 or len(features2.keypoints) == 0: - return Matches( - matches=np.zeros((0, 2), dtype=np.int32), - scores=np.zeros(0, dtype=np.float32), - keypoints1=np.zeros((0, 2), dtype=np.float32), - keypoints2=np.zeros((0, 2), dtype=np.float32), - ) - d1 = features1.descriptors.astype(np.uint8) - d2 = features2.descriptors.astype(np.uint8) - raw = self._matcher.knnMatch(d1, d2, k=2) - # Lowe ratio test - good = [] - for pair in raw: - if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance: - good.append(pair[0]) - if not good: - return Matches( - matches=np.zeros((0, 2), dtype=np.int32), - scores=np.zeros(0, dtype=np.float32), - keypoints1=np.zeros((0, 2), dtype=np.float32), - keypoints2=np.zeros((0, 2), dtype=np.float32), - ) - idx = np.array([[m.queryIdx, m.trainIdx] for m in good], dtype=np.int32) - scores = np.array([1.0 / (1.0 + m.distance) for m in good], dtype=np.float32) - kp1 = features1.keypoints[idx[:, 0]] - kp2 = features2.keypoints[idx[:, 1]] - return Matches(matches=idx, scores=scores, keypoints1=kp1, keypoints2=kp2) - - def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]: - if len(matches.matches) < 8: - return None - pts1 = np.ascontiguousarray(matches.keypoints1, dtype=np.float64) - pts2 = np.ascontiguousarray(matches.keypoints2, dtype=np.float64) - f_px = camera_params.focal_length * ( - camera_params.resolution_width / camera_params.sensor_width - ) - cx = (camera_params.principal_point[0] - if camera_params.principal_point - else camera_params.resolution_width / 2.0) - cy = (camera_params.principal_point[1] - if camera_params.principal_point - else camera_params.resolution_height / 2.0) - K = np.array([[f_px, 0, cx], [0, f_px, cy], [0, 0, 1]], dtype=np.float64) - try: - E, inliers = cv2.findEssentialMat(pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0) - except Exception as exc: - logger.warning("ORB findEssentialMat failed: %s", exc) - return None - if E is None or E.shape != (3, 3) or inliers is None: - return None - inlier_mask = inliers.flatten().astype(bool) - inlier_count = int(np.sum(inlier_mask)) - if inlier_count < self._MIN_INLIERS: - return None - try: - _, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers) - except Exception as exc: - logger.warning("ORB recoverPose failed: %s", exc) - return None - return Motion(translation=t.flatten(), rotation=R, inliers=inlier_mask, inlier_count=inlier_count) - - def compute_relative_pose( - self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters - ) -> Optional[RelativePose]: - f1 = self.extract_features(prev_image) - f2 = self.extract_features(curr_image) - matches = self.match_features(f1, f2) - motion = self.estimate_motion(matches, camera_params) - if motion is None: - return None - tracking_good = motion.inlier_count >= self._MIN_INLIERS - return RelativePose( - translation=motion.translation, - rotation=motion.rotation, - confidence=float(motion.inlier_count / max(1, len(matches.matches))), - inlier_count=motion.inlier_count, - total_matches=len(matches.matches), - tracking_good=tracking_good, - scale_ambiguous=True, # monocular ORB cannot recover metric scale - ) - - -# --------------------------------------------------------------------------- -# CuVSLAMVisualOdometry — NVIDIA cuVSLAM Inertial mode (Jetson, VO-01) -# --------------------------------------------------------------------------- - -class CuVSLAMVisualOdometry(ISequentialVisualOdometry): - """cuVSLAM wrapper for Jetson Orin (Inertial mode). - - Provides metric relative poses in NED (scale_ambiguous=False). - Falls back to ORBVisualOdometry internally when the cuVSLAM SDK is absent - so the same class can be instantiated on dev/CI with scale_ambiguous reflecting - the actual backend capability. - - Usage on Jetson: - vo = CuVSLAMVisualOdometry(camera_params, imu_params) - pose = vo.compute_relative_pose(prev, curr, cam) # scale_ambiguous=False - """ - - def __init__(self, camera_params: Optional[CameraParameters] = None, imu_params: Optional[dict] = None): - self._camera_params = camera_params - self._imu_params = imu_params or {} - self._cuvslam = None - self._tracker = None - self._orb_fallback = ORBVisualOdometry() - - try: - import cuvslam # type: ignore # only available on Jetson - self._cuvslam = cuvslam - self._init_tracker() - logger.info("CuVSLAMVisualOdometry: cuVSLAM SDK loaded (Jetson mode)") - except ImportError: - logger.info("CuVSLAMVisualOdometry: cuVSLAM not available — using ORB fallback (dev/CI mode)") - - def _init_tracker(self): - """Initialise cuVSLAM tracker in Inertial mode.""" - if self._cuvslam is None: - return - try: - cam = self._camera_params - rig_params = self._cuvslam.CameraRigParams() - if cam is not None: - f_px = cam.focal_length * (cam.resolution_width / cam.sensor_width) - cx = cam.principal_point[0] if cam.principal_point else cam.resolution_width / 2.0 - cy = cam.principal_point[1] if cam.principal_point else cam.resolution_height / 2.0 - rig_params.cameras[0].intrinsics = self._cuvslam.CameraIntrinsics( - fx=f_px, fy=f_px, cx=cx, cy=cy, - width=cam.resolution_width, height=cam.resolution_height, - ) - tracker_params = self._cuvslam.TrackerParams() - tracker_params.use_imu = True - tracker_params.imu_noise_model = self._cuvslam.ImuNoiseModel( - accel_noise=self._imu_params.get("accel_noise", 0.01), - gyro_noise=self._imu_params.get("gyro_noise", 0.001), - ) - self._tracker = self._cuvslam.Tracker(rig_params, tracker_params) - logger.info("cuVSLAM tracker initialised in Inertial mode") - except Exception as exc: - logger.error("cuVSLAM tracker init failed: %s", exc) - self._cuvslam = None - - @property - def _has_cuvslam(self) -> bool: - return self._cuvslam is not None and self._tracker is not None - - # ------------------------------------------------------------------ - # ISequentialVisualOdometry interface — delegate to cuVSLAM or ORB - # ------------------------------------------------------------------ - - def extract_features(self, image: np.ndarray) -> Features: - return self._orb_fallback.extract_features(image) - - def match_features(self, features1: Features, features2: Features) -> Matches: - return self._orb_fallback.match_features(features1, features2) - - def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]: - return self._orb_fallback.estimate_motion(matches, camera_params) - - def compute_relative_pose( - self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters - ) -> Optional[RelativePose]: - if self._has_cuvslam: - return self._compute_via_cuvslam(curr_image, camera_params) - # Dev/CI fallback — ORB with scale_ambiguous still marked False to signal - # this class is *intended* as the metric backend (ESKF provides scale externally) - pose = self._orb_fallback.compute_relative_pose(prev_image, curr_image, camera_params) - if pose is None: - return None - return RelativePose( - translation=pose.translation, - rotation=pose.rotation, - confidence=pose.confidence, - inlier_count=pose.inlier_count, - total_matches=pose.total_matches, - tracking_good=pose.tracking_good, - scale_ambiguous=False, # VO-04: cuVSLAM Inertial = metric; ESKF provides scale ref on dev - ) - - def _compute_via_cuvslam(self, image: np.ndarray, camera_params: CameraParameters) -> Optional[RelativePose]: - """Run cuVSLAM tracking step and convert result to RelativePose.""" - try: - result = self._tracker.track(image) - if result is None or not result.tracking_ok: - return None - R = np.array(result.rotation).reshape(3, 3) - t = np.array(result.translation) - return RelativePose( - translation=t, - rotation=R, - confidence=float(getattr(result, "confidence", 1.0)), - inlier_count=int(getattr(result, "inlier_count", 100)), - total_matches=int(getattr(result, "total_matches", 100)), - tracking_good=True, - scale_ambiguous=False, # VO-04: cuVSLAM Inertial mode = metric NED - ) - except Exception as exc: - logger.error("cuVSLAM tracking step failed: %s", exc) - return None - - -# --------------------------------------------------------------------------- -# CuVSLAMMonoDepthVisualOdometry — cuVSLAM Mono-Depth mode (sprint 1 production) -# --------------------------------------------------------------------------- -# TODO(sprint 2): collapse duplicated SDK-load / _init_tracker scaffolding with -# CuVSLAMVisualOdometry once Inertial mode is removed. Kept separate for sprint 1 -# so the Inertial → Mono-Depth migration is reversible. - -# Reference altitude used to normalise ORB unit-scale translation in dev/CI. -# At this altitude the ORB unit vector is scaled to match expected metric displacement. -_MONO_DEPTH_REFERENCE_ALTITUDE_M = 600.0 - - -class CuVSLAMMonoDepthVisualOdometry(ISequentialVisualOdometry): - """cuVSLAM Mono-Depth wrapper — barometer altitude as synthetic depth. - - Replaces CuVSLAMVisualOdometry (Inertial) which requires a stereo camera. - cuVSLAM Mono-Depth accepts a depth hint (barometric altitude) to recover - metric scale from a single nadir camera. - - On dev/CI (no cuVSLAM SDK): falls back to ORBVisualOdometry and scales - translation by depth_hint_m / _MONO_DEPTH_REFERENCE_ALTITUDE_M so that - the dev/CI metric magnitude is consistent with the Jetson production output. - - Note — solution.md records OdometryMode=INERTIAL which requires stereo. - This class uses OdometryMode=MONO_DEPTH, the correct mode for our hardware. - Decision recorded in docs/superpowers/specs/2026-04-18-oss-stack-tech-audit-design.md. - """ - - def __init__( - self, - depth_hint_m: float = _MONO_DEPTH_REFERENCE_ALTITUDE_M, - camera_params: Optional[CameraParameters] = None, - imu_params: Optional[dict] = None, - ): - self._depth_hint_m = depth_hint_m - self._camera_params = camera_params - self._imu_params = imu_params or {} - self._cuvslam = None - self._tracker = None - self._orb_fallback = ORBVisualOdometry() - - try: - import cuvslam # type: ignore - self._cuvslam = cuvslam - self._init_tracker() - logger.info("CuVSLAMMonoDepthVisualOdometry: cuVSLAM SDK loaded (Jetson Mono-Depth mode)") - except ImportError: - logger.info("CuVSLAMMonoDepthVisualOdometry: cuVSLAM not available — using scaled ORB fallback") - - def update_depth_hint(self, depth_hint_m: float) -> None: - """Update barometric altitude used for scale recovery. Call each frame.""" - self._depth_hint_m = max(depth_hint_m, 1.0) - - def _init_tracker(self) -> None: - if self._cuvslam is None: - return - try: - cam = self._camera_params - rig_params = self._cuvslam.CameraRigParams() - if cam is not None: - f_px = cam.focal_length * (cam.resolution_width / cam.sensor_width) - cx = cam.principal_point[0] if cam.principal_point else cam.resolution_width / 2.0 - cy = cam.principal_point[1] if cam.principal_point else cam.resolution_height / 2.0 - rig_params.cameras[0].intrinsics = self._cuvslam.CameraIntrinsics( - fx=f_px, fy=f_px, cx=cx, cy=cy, - width=cam.resolution_width, height=cam.resolution_height, - ) - tracker_params = self._cuvslam.TrackerParams() - tracker_params.use_imu = False - tracker_params.odometry_mode = self._cuvslam.OdometryMode.MONO_DEPTH - self._tracker = self._cuvslam.Tracker(rig_params, tracker_params) - logger.info("cuVSLAM tracker initialised in Mono-Depth mode") - except Exception: - logger.exception( - "cuVSLAM Mono-Depth tracker init FAILED — falling back to ORB. " - "Production Jetson path is DISABLED until this is fixed." - ) - self._cuvslam = None - - @property - def _has_cuvslam(self) -> bool: - return self._cuvslam is not None and self._tracker is not None - - def extract_features(self, image: np.ndarray) -> Features: - return self._orb_fallback.extract_features(image) - - def match_features(self, features1: Features, features2: Features) -> Matches: - return self._orb_fallback.match_features(features1, features2) - - def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]: - return self._orb_fallback.estimate_motion(matches, camera_params) - - def compute_relative_pose( - self, - prev_image: np.ndarray, - curr_image: np.ndarray, - camera_params: CameraParameters, - ) -> Optional[RelativePose]: - if self._has_cuvslam: - return self._compute_via_cuvslam(curr_image) - return self._compute_via_orb_scaled(prev_image, curr_image, camera_params) - - def _compute_via_cuvslam(self, image: np.ndarray) -> Optional[RelativePose]: - try: - result = self._tracker.track(image, depth_hint=self._depth_hint_m) - if result is None or not result.tracking_ok: - return None - return RelativePose( - translation=np.array(result.translation), - rotation=np.array(result.rotation).reshape(3, 3), - confidence=float(getattr(result, "confidence", 1.0)), - inlier_count=int(getattr(result, "inlier_count", 100)), - total_matches=int(getattr(result, "total_matches", 100)), - tracking_good=True, - scale_ambiguous=False, - ) - except Exception: - logger.exception("cuVSLAM Mono-Depth tracking step failed — frame dropped") - return None - - def _compute_via_orb_scaled( - self, - prev_image: np.ndarray, - curr_image: np.ndarray, - camera_params: CameraParameters, - ) -> Optional[RelativePose]: - """Dev/CI fallback: ORB translation scaled by depth_hint_m.""" - pose = self._orb_fallback.compute_relative_pose(prev_image, curr_image, camera_params) - if pose is None: - return None - scale = self._depth_hint_m / _MONO_DEPTH_REFERENCE_ALTITUDE_M - return RelativePose( - translation=pose.translation * scale, - rotation=pose.rotation, - confidence=pose.confidence, - inlier_count=pose.inlier_count, - total_matches=pose.total_matches, - tracking_good=pose.tracking_good, - scale_ambiguous=False, - ) - - -# --------------------------------------------------------------------------- -# Factory — selects appropriate VO backend at runtime -# --------------------------------------------------------------------------- - -def create_vo_backend( - model_manager: Optional[IModelManager] = None, - prefer_cuvslam: bool = True, - prefer_mono_depth: bool = False, - camera_params: Optional[CameraParameters] = None, - imu_params: Optional[dict] = None, - depth_hint_m: float = 600.0, -) -> ISequentialVisualOdometry: - """Return the best available VO backend for the current platform. - - Priority when prefer_mono_depth=True: - 1. CuVSLAMMonoDepthVisualOdometry (sprint 1 production path) - 2. ORBVisualOdometry (dev/CI fallback inside Mono-Depth wrapper) - - Priority when prefer_mono_depth=False (legacy): - 1. CuVSLAMVisualOdometry (Jetson — cuVSLAM SDK present) - 2. SequentialVisualOdometry (TRT/Mock SuperPoint+LightGlue) - 3. ORBVisualOdometry (pure OpenCV fallback) - """ - if prefer_mono_depth: - return CuVSLAMMonoDepthVisualOdometry( - depth_hint_m=depth_hint_m, - camera_params=camera_params, - imu_params=imu_params, - ) - - if prefer_cuvslam: - vo = CuVSLAMVisualOdometry(camera_params=camera_params, imu_params=imu_params) - if vo._has_cuvslam: - return vo - - if model_manager is not None: - return SequentialVisualOdometry(model_manager) - - return ORBVisualOdometry() +from gps_denied.components.vio.cuvslam_backend import ( + _CUVSLAM_AVAILABLE, + CuVSLAMMonoDepthVisualOdometry, + CuVSLAMVisualOdometry, +) +from gps_denied.components.vio.factory import create_vo_backend +from gps_denied.components.vio.orbslam_backend import ( + ORBVisualOdometry, + SequentialVisualOdometry, +) +from gps_denied.components.vio.protocol import ( + ISequentialVisualOdometry, + VisualOdometry, +) + +__all__ = [ + "VisualOdometry", + "ISequentialVisualOdometry", + "ORBVisualOdometry", + "SequentialVisualOdometry", + "CuVSLAMVisualOdometry", + "CuVSLAMMonoDepthVisualOdometry", + "create_vo_backend", + "_CUVSLAM_AVAILABLE", +]