diff --git a/src/gps_denied/components/vio/__init__.py b/src/gps_denied/components/vio/__init__.py index e69de29..f23864f 100644 --- a/src/gps_denied/components/vio/__init__.py +++ b/src/gps_denied/components/vio/__init__.py @@ -0,0 +1,29 @@ +"""VIO component (ARCH-01). + +Public surface for visual-inertial odometry adapters. Phase-1 split of +the legacy ``core/vo.py`` monolith into per-backend modules: + + - protocol.py — VisualOdometry Protocol (alias ISequentialVisualOdometry) + - orbslam_backend.py — pure-Python OpenCV: SequentialVisualOdometry + ORBVisualOdometry + - cuvslam_backend.py — Jetson cuVSLAM SDK bridge: CuVSLAMVisualOdometry + CuVSLAMMonoDepthVisualOdometry + - factory.py — create_vo_backend env-aware DI seed + - native/ — placeholder for future cuvslam SDK native glue + +The legacy ``gps_denied.core.vo`` import path is preserved as a thin +re-export shim for one phase; tests still import from there. +""" +from gps_denied.components.vio.protocol import ( + ISequentialVisualOdometry, + VisualOdometry, +) +from gps_denied.components.vio.orbslam_backend import ( + ORBVisualOdometry, + SequentialVisualOdometry, +) + +__all__ = [ + "VisualOdometry", + "ISequentialVisualOdometry", + "ORBVisualOdometry", + "SequentialVisualOdometry", +] diff --git a/src/gps_denied/components/vio/orbslam_backend.py b/src/gps_denied/components/vio/orbslam_backend.py new file mode 100644 index 0000000..6ef5b56 --- /dev/null +++ b/src/gps_denied/components/vio/orbslam_backend.py @@ -0,0 +1,260 @@ +"""Pure-Python OpenCV VO backends (ARCH-01 / ARCH-05). + +Houses the two OpenCV-only VO implementations that have no native SDK +dependency: + + - SequentialVisualOdometry — SuperPoint + LightGlue (TRT on Jetson / Mock on dev) + - ORBVisualOdometry — OpenCV ORB + BFMatcher (dev/CI stub, VO-02) + +Both implement the ``VisualOdometry`` Protocol (alias +``ISequentialVisualOdometry``) defined in ``components.vio.protocol``. This +module deliberately does NOT import ``cuvslam`` — the cuVSLAM-bridge +backends live in ``components.vio.cuvslam_backend`` and keep that +optional-import block isolated. +""" +from __future__ import annotations + +import logging +from typing import Optional + +import cv2 +import numpy as np + +from gps_denied.components.vio.protocol import ISequentialVisualOdometry +from gps_denied.core.models import IModelManager +from gps_denied.schemas import CameraParameters +from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose + +logger = logging.getLogger(__name__) + + +class SequentialVisualOdometry(ISequentialVisualOdometry): + """Frame-to-frame visual odometry using SuperPoint + LightGlue.""" + + def __init__(self, model_manager: IModelManager): + self.model_manager = model_manager + + def extract_features(self, image: np.ndarray) -> Features: + """Extracts keypoints and descriptors using SuperPoint.""" + engine = self.model_manager.get_inference_engine("SuperPoint") + result = engine.infer(image) + + return Features( + keypoints=result["keypoints"], + descriptors=result["descriptors"], + scores=result["scores"] + ) + + def match_features(self, features1: Features, features2: Features) -> Matches: + """Matches features using LightGlue.""" + engine = self.model_manager.get_inference_engine("LightGlue") + result = engine.infer({ + "features1": features1, + "features2": features2 + }) + + return Matches( + matches=result["matches"], + scores=result["scores"], + keypoints1=result["keypoints1"], + keypoints2=result["keypoints2"] + ) + + def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None: + """Estimates camera motion using Essential Matrix (RANSAC).""" + inlier_threshold = 20 + if len(matches.matches) < 8: + return None + + pts1 = np.ascontiguousarray(matches.keypoints1) + pts2 = np.ascontiguousarray(matches.keypoints2) + + # Build camera matrix + f_px = camera_params.focal_length * (camera_params.resolution_width / camera_params.sensor_width) + if camera_params.principal_point: + cx, cy = camera_params.principal_point + else: + cx = camera_params.resolution_width / 2.0 + cy = camera_params.resolution_height / 2.0 + + K = np.array([ + [f_px, 0, cx], + [0, f_px, cy], + [0, 0, 1] + ], dtype=np.float64) + + try: + E, inliers = cv2.findEssentialMat( + pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0 + ) + except Exception as e: + logger.error(f"Error finding essential matrix: {e}") + return None + + if E is None or E.shape != (3, 3): + return None + + inliers_mask = inliers.flatten().astype(bool) + inlier_count = np.sum(inliers_mask) + + if inlier_count < inlier_threshold: + logger.warning(f"Insufficient inliers: {inlier_count} < {inlier_threshold}") + return None + + # Recover pose + try: + _, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers) + except Exception as e: + logger.error(f"Error recovering pose: {e}") + return None + + return Motion( + translation=t.flatten(), + rotation=R, + inliers=inliers_mask, + inlier_count=inlier_count + ) + + def compute_relative_pose( + self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters + ) -> RelativePose | None: + """Computes relative pose between two frames.""" + f1 = self.extract_features(prev_image) + f2 = self.extract_features(curr_image) + + matches = self.match_features(f1, f2) + + motion = self.estimate_motion(matches, camera_params) + + if motion is None: + return None + + tracking_good = motion.inlier_count > 50 + + return RelativePose( + translation=motion.translation, + rotation=motion.rotation, + confidence=float(motion.inlier_count / max(1, len(matches.matches))), + inlier_count=motion.inlier_count, + total_matches=len(matches.matches), + tracking_good=tracking_good, + scale_ambiguous=True, + ) + + +# --------------------------------------------------------------------------- +# ORBVisualOdometry — OpenCV ORB stub for dev/CI (VO-02) +# --------------------------------------------------------------------------- + +class ORBVisualOdometry(ISequentialVisualOdometry): + """OpenCV ORB-based VO stub for x86 dev/CI environments. + + Satisfies the same ISequentialVisualOdometry interface as the cuVSLAM wrapper. + Translation is unit-scale (scale_ambiguous=True) — metric scale requires ESKF. + """ + + _MIN_INLIERS = 20 + _N_FEATURES = 2000 + + def __init__(self): + self._orb = cv2.ORB_create(nfeatures=self._N_FEATURES) + self._matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) + + # ------------------------------------------------------------------ + # ISequentialVisualOdometry interface + # ------------------------------------------------------------------ + + def extract_features(self, image: np.ndarray) -> Features: + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if image.ndim == 3 else image + kps, descs = self._orb.detectAndCompute(gray, None) + if descs is None or len(kps) == 0: + return Features( + keypoints=np.zeros((0, 2), dtype=np.float32), + descriptors=np.zeros((0, 32), dtype=np.uint8), + scores=np.zeros(0, dtype=np.float32), + ) + pts = np.array([[k.pt[0], k.pt[1]] for k in kps], dtype=np.float32) + scores = np.array([k.response for k in kps], dtype=np.float32) + return Features(keypoints=pts, descriptors=descs.astype(np.float32), scores=scores) + + def match_features(self, features1: Features, features2: Features) -> Matches: + if len(features1.keypoints) == 0 or len(features2.keypoints) == 0: + return Matches( + matches=np.zeros((0, 2), dtype=np.int32), + scores=np.zeros(0, dtype=np.float32), + keypoints1=np.zeros((0, 2), dtype=np.float32), + keypoints2=np.zeros((0, 2), dtype=np.float32), + ) + d1 = features1.descriptors.astype(np.uint8) + d2 = features2.descriptors.astype(np.uint8) + raw = self._matcher.knnMatch(d1, d2, k=2) + # Lowe ratio test + good = [] + for pair in raw: + if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance: + good.append(pair[0]) + if not good: + return Matches( + matches=np.zeros((0, 2), dtype=np.int32), + scores=np.zeros(0, dtype=np.float32), + keypoints1=np.zeros((0, 2), dtype=np.float32), + keypoints2=np.zeros((0, 2), dtype=np.float32), + ) + idx = np.array([[m.queryIdx, m.trainIdx] for m in good], dtype=np.int32) + scores = np.array([1.0 / (1.0 + m.distance) for m in good], dtype=np.float32) + kp1 = features1.keypoints[idx[:, 0]] + kp2 = features2.keypoints[idx[:, 1]] + return Matches(matches=idx, scores=scores, keypoints1=kp1, keypoints2=kp2) + + def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]: + if len(matches.matches) < 8: + return None + pts1 = np.ascontiguousarray(matches.keypoints1, dtype=np.float64) + pts2 = np.ascontiguousarray(matches.keypoints2, dtype=np.float64) + f_px = camera_params.focal_length * ( + camera_params.resolution_width / camera_params.sensor_width + ) + cx = (camera_params.principal_point[0] + if camera_params.principal_point + else camera_params.resolution_width / 2.0) + cy = (camera_params.principal_point[1] + if camera_params.principal_point + else camera_params.resolution_height / 2.0) + K = np.array([[f_px, 0, cx], [0, f_px, cy], [0, 0, 1]], dtype=np.float64) + try: + E, inliers = cv2.findEssentialMat(pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0) + except Exception as exc: + logger.warning("ORB findEssentialMat failed: %s", exc) + return None + if E is None or E.shape != (3, 3) or inliers is None: + return None + inlier_mask = inliers.flatten().astype(bool) + inlier_count = int(np.sum(inlier_mask)) + if inlier_count < self._MIN_INLIERS: + return None + try: + _, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers) + except Exception as exc: + logger.warning("ORB recoverPose failed: %s", exc) + return None + return Motion(translation=t.flatten(), rotation=R, inliers=inlier_mask, inlier_count=inlier_count) + + def compute_relative_pose( + self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters + ) -> Optional[RelativePose]: + f1 = self.extract_features(prev_image) + f2 = self.extract_features(curr_image) + matches = self.match_features(f1, f2) + motion = self.estimate_motion(matches, camera_params) + if motion is None: + return None + tracking_good = motion.inlier_count >= self._MIN_INLIERS + return RelativePose( + translation=motion.translation, + rotation=motion.rotation, + confidence=float(motion.inlier_count / max(1, len(matches.matches))), + inlier_count=motion.inlier_count, + total_matches=len(matches.matches), + tracking_good=tracking_good, + scale_ambiguous=True, # monocular ORB cannot recover metric scale + )