mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:11:13 +00:00
refactor(01-03): replace core/vo.py with re-export shim to components/vio
- core/vo.py is now ~30 LOC of pure re-exports from
components/vio/{protocol, orbslam_backend, cuvslam_backend, factory}.
- All 8 public symbols (VisualOdometry, ISequentialVisualOdometry,
ORBVisualOdometry, SequentialVisualOdometry, CuVSLAMVisualOdometry,
CuVSLAMMonoDepthVisualOdometry, create_vo_backend, _CUVSLAM_AVAILABLE)
remain importable from the legacy path with class identity preserved
(re-export, not redefinition — isinstance checks still hold).
- tests/test_vo.py: 22/22 passing unchanged. No test files edited.
- Shim is removed in Phase 2 when TEST-01 reorganizes test taxonomy.
This commit is contained in:
+30
-573
@@ -1,575 +1,32 @@
|
||||
"""Sequential Visual Odometry (Component F07).
|
||||
"""Legacy import path for VIO. Phase 1 shim — code lives in components/vio/.
|
||||
|
||||
Three concrete backends:
|
||||
- SequentialVisualOdometry — SuperPoint + LightGlue (TRT on Jetson / Mock on dev)
|
||||
- ORBVisualOdometry — OpenCV ORB + BFMatcher (dev/CI stub, VO-02)
|
||||
- CuVSLAMVisualOdometry — NVIDIA cuVSLAM Inertial mode (Jetson only, VO-01)
|
||||
|
||||
Factory: create_vo_backend() selects the right one at runtime.
|
||||
This shim preserves ``from gps_denied.core.vo import ...`` for tests that
|
||||
were green at the start of Phase 1. Future phases may migrate test
|
||||
imports to the new path; the shim is removed in Phase 2 (TEST-01
|
||||
reorganization).
|
||||
"""
|
||||
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from gps_denied.core.models import IModelManager
|
||||
from gps_denied.schemas import CameraParameters
|
||||
from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ISequentialVisualOdometry(ABC):
|
||||
@abstractmethod
|
||||
def compute_relative_pose(
|
||||
self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
|
||||
) -> RelativePose | None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def extract_features(self, image: np.ndarray) -> Features:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def match_features(self, features1: Features, features2: Features) -> Matches:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None:
|
||||
pass
|
||||
|
||||
|
||||
class SequentialVisualOdometry(ISequentialVisualOdometry):
|
||||
"""Frame-to-frame visual odometry using SuperPoint + LightGlue."""
|
||||
|
||||
def __init__(self, model_manager: IModelManager):
|
||||
self.model_manager = model_manager
|
||||
|
||||
def extract_features(self, image: np.ndarray) -> Features:
|
||||
"""Extracts keypoints and descriptors using SuperPoint."""
|
||||
engine = self.model_manager.get_inference_engine("SuperPoint")
|
||||
result = engine.infer(image)
|
||||
|
||||
return Features(
|
||||
keypoints=result["keypoints"],
|
||||
descriptors=result["descriptors"],
|
||||
scores=result["scores"]
|
||||
)
|
||||
|
||||
def match_features(self, features1: Features, features2: Features) -> Matches:
|
||||
"""Matches features using LightGlue."""
|
||||
engine = self.model_manager.get_inference_engine("LightGlue")
|
||||
result = engine.infer({
|
||||
"features1": features1,
|
||||
"features2": features2
|
||||
})
|
||||
|
||||
return Matches(
|
||||
matches=result["matches"],
|
||||
scores=result["scores"],
|
||||
keypoints1=result["keypoints1"],
|
||||
keypoints2=result["keypoints2"]
|
||||
)
|
||||
|
||||
def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None:
|
||||
"""Estimates camera motion using Essential Matrix (RANSAC)."""
|
||||
inlier_threshold = 20
|
||||
if len(matches.matches) < 8:
|
||||
return None
|
||||
|
||||
pts1 = np.ascontiguousarray(matches.keypoints1)
|
||||
pts2 = np.ascontiguousarray(matches.keypoints2)
|
||||
|
||||
# Build camera matrix
|
||||
f_px = camera_params.focal_length * (camera_params.resolution_width / camera_params.sensor_width)
|
||||
if camera_params.principal_point:
|
||||
cx, cy = camera_params.principal_point
|
||||
else:
|
||||
cx = camera_params.resolution_width / 2.0
|
||||
cy = camera_params.resolution_height / 2.0
|
||||
|
||||
K = np.array([
|
||||
[f_px, 0, cx],
|
||||
[0, f_px, cy],
|
||||
[0, 0, 1]
|
||||
], dtype=np.float64)
|
||||
|
||||
try:
|
||||
E, inliers = cv2.findEssentialMat(
|
||||
pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error finding essential matrix: {e}")
|
||||
return None
|
||||
|
||||
if E is None or E.shape != (3, 3):
|
||||
return None
|
||||
|
||||
inliers_mask = inliers.flatten().astype(bool)
|
||||
inlier_count = np.sum(inliers_mask)
|
||||
|
||||
if inlier_count < inlier_threshold:
|
||||
logger.warning(f"Insufficient inliers: {inlier_count} < {inlier_threshold}")
|
||||
return None
|
||||
|
||||
# Recover pose
|
||||
try:
|
||||
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers)
|
||||
except Exception as e:
|
||||
logger.error(f"Error recovering pose: {e}")
|
||||
return None
|
||||
|
||||
return Motion(
|
||||
translation=t.flatten(),
|
||||
rotation=R,
|
||||
inliers=inliers_mask,
|
||||
inlier_count=inlier_count
|
||||
)
|
||||
|
||||
def compute_relative_pose(
|
||||
self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
|
||||
) -> RelativePose | None:
|
||||
"""Computes relative pose between two frames."""
|
||||
f1 = self.extract_features(prev_image)
|
||||
f2 = self.extract_features(curr_image)
|
||||
|
||||
matches = self.match_features(f1, f2)
|
||||
|
||||
motion = self.estimate_motion(matches, camera_params)
|
||||
|
||||
if motion is None:
|
||||
return None
|
||||
|
||||
tracking_good = motion.inlier_count > 50
|
||||
|
||||
return RelativePose(
|
||||
translation=motion.translation,
|
||||
rotation=motion.rotation,
|
||||
confidence=float(motion.inlier_count / max(1, len(matches.matches))),
|
||||
inlier_count=motion.inlier_count,
|
||||
total_matches=len(matches.matches),
|
||||
tracking_good=tracking_good,
|
||||
scale_ambiguous=True,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORBVisualOdometry — OpenCV ORB stub for dev/CI (VO-02)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class ORBVisualOdometry(ISequentialVisualOdometry):
|
||||
"""OpenCV ORB-based VO stub for x86 dev/CI environments.
|
||||
|
||||
Satisfies the same ISequentialVisualOdometry interface as the cuVSLAM wrapper.
|
||||
Translation is unit-scale (scale_ambiguous=True) — metric scale requires ESKF.
|
||||
"""
|
||||
|
||||
_MIN_INLIERS = 20
|
||||
_N_FEATURES = 2000
|
||||
|
||||
def __init__(self):
|
||||
self._orb = cv2.ORB_create(nfeatures=self._N_FEATURES)
|
||||
self._matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# ISequentialVisualOdometry interface
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def extract_features(self, image: np.ndarray) -> Features:
|
||||
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if image.ndim == 3 else image
|
||||
kps, descs = self._orb.detectAndCompute(gray, None)
|
||||
if descs is None or len(kps) == 0:
|
||||
return Features(
|
||||
keypoints=np.zeros((0, 2), dtype=np.float32),
|
||||
descriptors=np.zeros((0, 32), dtype=np.uint8),
|
||||
scores=np.zeros(0, dtype=np.float32),
|
||||
)
|
||||
pts = np.array([[k.pt[0], k.pt[1]] for k in kps], dtype=np.float32)
|
||||
scores = np.array([k.response for k in kps], dtype=np.float32)
|
||||
return Features(keypoints=pts, descriptors=descs.astype(np.float32), scores=scores)
|
||||
|
||||
def match_features(self, features1: Features, features2: Features) -> Matches:
|
||||
if len(features1.keypoints) == 0 or len(features2.keypoints) == 0:
|
||||
return Matches(
|
||||
matches=np.zeros((0, 2), dtype=np.int32),
|
||||
scores=np.zeros(0, dtype=np.float32),
|
||||
keypoints1=np.zeros((0, 2), dtype=np.float32),
|
||||
keypoints2=np.zeros((0, 2), dtype=np.float32),
|
||||
)
|
||||
d1 = features1.descriptors.astype(np.uint8)
|
||||
d2 = features2.descriptors.astype(np.uint8)
|
||||
raw = self._matcher.knnMatch(d1, d2, k=2)
|
||||
# Lowe ratio test
|
||||
good = []
|
||||
for pair in raw:
|
||||
if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance:
|
||||
good.append(pair[0])
|
||||
if not good:
|
||||
return Matches(
|
||||
matches=np.zeros((0, 2), dtype=np.int32),
|
||||
scores=np.zeros(0, dtype=np.float32),
|
||||
keypoints1=np.zeros((0, 2), dtype=np.float32),
|
||||
keypoints2=np.zeros((0, 2), dtype=np.float32),
|
||||
)
|
||||
idx = np.array([[m.queryIdx, m.trainIdx] for m in good], dtype=np.int32)
|
||||
scores = np.array([1.0 / (1.0 + m.distance) for m in good], dtype=np.float32)
|
||||
kp1 = features1.keypoints[idx[:, 0]]
|
||||
kp2 = features2.keypoints[idx[:, 1]]
|
||||
return Matches(matches=idx, scores=scores, keypoints1=kp1, keypoints2=kp2)
|
||||
|
||||
def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]:
|
||||
if len(matches.matches) < 8:
|
||||
return None
|
||||
pts1 = np.ascontiguousarray(matches.keypoints1, dtype=np.float64)
|
||||
pts2 = np.ascontiguousarray(matches.keypoints2, dtype=np.float64)
|
||||
f_px = camera_params.focal_length * (
|
||||
camera_params.resolution_width / camera_params.sensor_width
|
||||
)
|
||||
cx = (camera_params.principal_point[0]
|
||||
if camera_params.principal_point
|
||||
else camera_params.resolution_width / 2.0)
|
||||
cy = (camera_params.principal_point[1]
|
||||
if camera_params.principal_point
|
||||
else camera_params.resolution_height / 2.0)
|
||||
K = np.array([[f_px, 0, cx], [0, f_px, cy], [0, 0, 1]], dtype=np.float64)
|
||||
try:
|
||||
E, inliers = cv2.findEssentialMat(pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0)
|
||||
except Exception as exc:
|
||||
logger.warning("ORB findEssentialMat failed: %s", exc)
|
||||
return None
|
||||
if E is None or E.shape != (3, 3) or inliers is None:
|
||||
return None
|
||||
inlier_mask = inliers.flatten().astype(bool)
|
||||
inlier_count = int(np.sum(inlier_mask))
|
||||
if inlier_count < self._MIN_INLIERS:
|
||||
return None
|
||||
try:
|
||||
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers)
|
||||
except Exception as exc:
|
||||
logger.warning("ORB recoverPose failed: %s", exc)
|
||||
return None
|
||||
return Motion(translation=t.flatten(), rotation=R, inliers=inlier_mask, inlier_count=inlier_count)
|
||||
|
||||
def compute_relative_pose(
|
||||
self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
|
||||
) -> Optional[RelativePose]:
|
||||
f1 = self.extract_features(prev_image)
|
||||
f2 = self.extract_features(curr_image)
|
||||
matches = self.match_features(f1, f2)
|
||||
motion = self.estimate_motion(matches, camera_params)
|
||||
if motion is None:
|
||||
return None
|
||||
tracking_good = motion.inlier_count >= self._MIN_INLIERS
|
||||
return RelativePose(
|
||||
translation=motion.translation,
|
||||
rotation=motion.rotation,
|
||||
confidence=float(motion.inlier_count / max(1, len(matches.matches))),
|
||||
inlier_count=motion.inlier_count,
|
||||
total_matches=len(matches.matches),
|
||||
tracking_good=tracking_good,
|
||||
scale_ambiguous=True, # monocular ORB cannot recover metric scale
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CuVSLAMVisualOdometry — NVIDIA cuVSLAM Inertial mode (Jetson, VO-01)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class CuVSLAMVisualOdometry(ISequentialVisualOdometry):
|
||||
"""cuVSLAM wrapper for Jetson Orin (Inertial mode).
|
||||
|
||||
Provides metric relative poses in NED (scale_ambiguous=False).
|
||||
Falls back to ORBVisualOdometry internally when the cuVSLAM SDK is absent
|
||||
so the same class can be instantiated on dev/CI with scale_ambiguous reflecting
|
||||
the actual backend capability.
|
||||
|
||||
Usage on Jetson:
|
||||
vo = CuVSLAMVisualOdometry(camera_params, imu_params)
|
||||
pose = vo.compute_relative_pose(prev, curr, cam) # scale_ambiguous=False
|
||||
"""
|
||||
|
||||
def __init__(self, camera_params: Optional[CameraParameters] = None, imu_params: Optional[dict] = None):
|
||||
self._camera_params = camera_params
|
||||
self._imu_params = imu_params or {}
|
||||
self._cuvslam = None
|
||||
self._tracker = None
|
||||
self._orb_fallback = ORBVisualOdometry()
|
||||
|
||||
try:
|
||||
import cuvslam # type: ignore # only available on Jetson
|
||||
self._cuvslam = cuvslam
|
||||
self._init_tracker()
|
||||
logger.info("CuVSLAMVisualOdometry: cuVSLAM SDK loaded (Jetson mode)")
|
||||
except ImportError:
|
||||
logger.info("CuVSLAMVisualOdometry: cuVSLAM not available — using ORB fallback (dev/CI mode)")
|
||||
|
||||
def _init_tracker(self):
|
||||
"""Initialise cuVSLAM tracker in Inertial mode."""
|
||||
if self._cuvslam is None:
|
||||
return
|
||||
try:
|
||||
cam = self._camera_params
|
||||
rig_params = self._cuvslam.CameraRigParams()
|
||||
if cam is not None:
|
||||
f_px = cam.focal_length * (cam.resolution_width / cam.sensor_width)
|
||||
cx = cam.principal_point[0] if cam.principal_point else cam.resolution_width / 2.0
|
||||
cy = cam.principal_point[1] if cam.principal_point else cam.resolution_height / 2.0
|
||||
rig_params.cameras[0].intrinsics = self._cuvslam.CameraIntrinsics(
|
||||
fx=f_px, fy=f_px, cx=cx, cy=cy,
|
||||
width=cam.resolution_width, height=cam.resolution_height,
|
||||
)
|
||||
tracker_params = self._cuvslam.TrackerParams()
|
||||
tracker_params.use_imu = True
|
||||
tracker_params.imu_noise_model = self._cuvslam.ImuNoiseModel(
|
||||
accel_noise=self._imu_params.get("accel_noise", 0.01),
|
||||
gyro_noise=self._imu_params.get("gyro_noise", 0.001),
|
||||
)
|
||||
self._tracker = self._cuvslam.Tracker(rig_params, tracker_params)
|
||||
logger.info("cuVSLAM tracker initialised in Inertial mode")
|
||||
except Exception as exc:
|
||||
logger.error("cuVSLAM tracker init failed: %s", exc)
|
||||
self._cuvslam = None
|
||||
|
||||
@property
|
||||
def _has_cuvslam(self) -> bool:
|
||||
return self._cuvslam is not None and self._tracker is not None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# ISequentialVisualOdometry interface — delegate to cuVSLAM or ORB
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def extract_features(self, image: np.ndarray) -> Features:
|
||||
return self._orb_fallback.extract_features(image)
|
||||
|
||||
def match_features(self, features1: Features, features2: Features) -> Matches:
|
||||
return self._orb_fallback.match_features(features1, features2)
|
||||
|
||||
def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]:
|
||||
return self._orb_fallback.estimate_motion(matches, camera_params)
|
||||
|
||||
def compute_relative_pose(
|
||||
self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
|
||||
) -> Optional[RelativePose]:
|
||||
if self._has_cuvslam:
|
||||
return self._compute_via_cuvslam(curr_image, camera_params)
|
||||
# Dev/CI fallback — ORB with scale_ambiguous still marked False to signal
|
||||
# this class is *intended* as the metric backend (ESKF provides scale externally)
|
||||
pose = self._orb_fallback.compute_relative_pose(prev_image, curr_image, camera_params)
|
||||
if pose is None:
|
||||
return None
|
||||
return RelativePose(
|
||||
translation=pose.translation,
|
||||
rotation=pose.rotation,
|
||||
confidence=pose.confidence,
|
||||
inlier_count=pose.inlier_count,
|
||||
total_matches=pose.total_matches,
|
||||
tracking_good=pose.tracking_good,
|
||||
scale_ambiguous=False, # VO-04: cuVSLAM Inertial = metric; ESKF provides scale ref on dev
|
||||
)
|
||||
|
||||
def _compute_via_cuvslam(self, image: np.ndarray, camera_params: CameraParameters) -> Optional[RelativePose]:
|
||||
"""Run cuVSLAM tracking step and convert result to RelativePose."""
|
||||
try:
|
||||
result = self._tracker.track(image)
|
||||
if result is None or not result.tracking_ok:
|
||||
return None
|
||||
R = np.array(result.rotation).reshape(3, 3)
|
||||
t = np.array(result.translation)
|
||||
return RelativePose(
|
||||
translation=t,
|
||||
rotation=R,
|
||||
confidence=float(getattr(result, "confidence", 1.0)),
|
||||
inlier_count=int(getattr(result, "inlier_count", 100)),
|
||||
total_matches=int(getattr(result, "total_matches", 100)),
|
||||
tracking_good=True,
|
||||
scale_ambiguous=False, # VO-04: cuVSLAM Inertial mode = metric NED
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error("cuVSLAM tracking step failed: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CuVSLAMMonoDepthVisualOdometry — cuVSLAM Mono-Depth mode (sprint 1 production)
|
||||
# ---------------------------------------------------------------------------
|
||||
# TODO(sprint 2): collapse duplicated SDK-load / _init_tracker scaffolding with
|
||||
# CuVSLAMVisualOdometry once Inertial mode is removed. Kept separate for sprint 1
|
||||
# so the Inertial → Mono-Depth migration is reversible.
|
||||
|
||||
# Reference altitude used to normalise ORB unit-scale translation in dev/CI.
|
||||
# At this altitude the ORB unit vector is scaled to match expected metric displacement.
|
||||
_MONO_DEPTH_REFERENCE_ALTITUDE_M = 600.0
|
||||
|
||||
|
||||
class CuVSLAMMonoDepthVisualOdometry(ISequentialVisualOdometry):
|
||||
"""cuVSLAM Mono-Depth wrapper — barometer altitude as synthetic depth.
|
||||
|
||||
Replaces CuVSLAMVisualOdometry (Inertial) which requires a stereo camera.
|
||||
cuVSLAM Mono-Depth accepts a depth hint (barometric altitude) to recover
|
||||
metric scale from a single nadir camera.
|
||||
|
||||
On dev/CI (no cuVSLAM SDK): falls back to ORBVisualOdometry and scales
|
||||
translation by depth_hint_m / _MONO_DEPTH_REFERENCE_ALTITUDE_M so that
|
||||
the dev/CI metric magnitude is consistent with the Jetson production output.
|
||||
|
||||
Note — solution.md records OdometryMode=INERTIAL which requires stereo.
|
||||
This class uses OdometryMode=MONO_DEPTH, the correct mode for our hardware.
|
||||
Decision recorded in docs/superpowers/specs/2026-04-18-oss-stack-tech-audit-design.md.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
depth_hint_m: float = _MONO_DEPTH_REFERENCE_ALTITUDE_M,
|
||||
camera_params: Optional[CameraParameters] = None,
|
||||
imu_params: Optional[dict] = None,
|
||||
):
|
||||
self._depth_hint_m = depth_hint_m
|
||||
self._camera_params = camera_params
|
||||
self._imu_params = imu_params or {}
|
||||
self._cuvslam = None
|
||||
self._tracker = None
|
||||
self._orb_fallback = ORBVisualOdometry()
|
||||
|
||||
try:
|
||||
import cuvslam # type: ignore
|
||||
self._cuvslam = cuvslam
|
||||
self._init_tracker()
|
||||
logger.info("CuVSLAMMonoDepthVisualOdometry: cuVSLAM SDK loaded (Jetson Mono-Depth mode)")
|
||||
except ImportError:
|
||||
logger.info("CuVSLAMMonoDepthVisualOdometry: cuVSLAM not available — using scaled ORB fallback")
|
||||
|
||||
def update_depth_hint(self, depth_hint_m: float) -> None:
|
||||
"""Update barometric altitude used for scale recovery. Call each frame."""
|
||||
self._depth_hint_m = max(depth_hint_m, 1.0)
|
||||
|
||||
def _init_tracker(self) -> None:
|
||||
if self._cuvslam is None:
|
||||
return
|
||||
try:
|
||||
cam = self._camera_params
|
||||
rig_params = self._cuvslam.CameraRigParams()
|
||||
if cam is not None:
|
||||
f_px = cam.focal_length * (cam.resolution_width / cam.sensor_width)
|
||||
cx = cam.principal_point[0] if cam.principal_point else cam.resolution_width / 2.0
|
||||
cy = cam.principal_point[1] if cam.principal_point else cam.resolution_height / 2.0
|
||||
rig_params.cameras[0].intrinsics = self._cuvslam.CameraIntrinsics(
|
||||
fx=f_px, fy=f_px, cx=cx, cy=cy,
|
||||
width=cam.resolution_width, height=cam.resolution_height,
|
||||
)
|
||||
tracker_params = self._cuvslam.TrackerParams()
|
||||
tracker_params.use_imu = False
|
||||
tracker_params.odometry_mode = self._cuvslam.OdometryMode.MONO_DEPTH
|
||||
self._tracker = self._cuvslam.Tracker(rig_params, tracker_params)
|
||||
logger.info("cuVSLAM tracker initialised in Mono-Depth mode")
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"cuVSLAM Mono-Depth tracker init FAILED — falling back to ORB. "
|
||||
"Production Jetson path is DISABLED until this is fixed."
|
||||
)
|
||||
self._cuvslam = None
|
||||
|
||||
@property
|
||||
def _has_cuvslam(self) -> bool:
|
||||
return self._cuvslam is not None and self._tracker is not None
|
||||
|
||||
def extract_features(self, image: np.ndarray) -> Features:
|
||||
return self._orb_fallback.extract_features(image)
|
||||
|
||||
def match_features(self, features1: Features, features2: Features) -> Matches:
|
||||
return self._orb_fallback.match_features(features1, features2)
|
||||
|
||||
def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]:
|
||||
return self._orb_fallback.estimate_motion(matches, camera_params)
|
||||
|
||||
def compute_relative_pose(
|
||||
self,
|
||||
prev_image: np.ndarray,
|
||||
curr_image: np.ndarray,
|
||||
camera_params: CameraParameters,
|
||||
) -> Optional[RelativePose]:
|
||||
if self._has_cuvslam:
|
||||
return self._compute_via_cuvslam(curr_image)
|
||||
return self._compute_via_orb_scaled(prev_image, curr_image, camera_params)
|
||||
|
||||
def _compute_via_cuvslam(self, image: np.ndarray) -> Optional[RelativePose]:
|
||||
try:
|
||||
result = self._tracker.track(image, depth_hint=self._depth_hint_m)
|
||||
if result is None or not result.tracking_ok:
|
||||
return None
|
||||
return RelativePose(
|
||||
translation=np.array(result.translation),
|
||||
rotation=np.array(result.rotation).reshape(3, 3),
|
||||
confidence=float(getattr(result, "confidence", 1.0)),
|
||||
inlier_count=int(getattr(result, "inlier_count", 100)),
|
||||
total_matches=int(getattr(result, "total_matches", 100)),
|
||||
tracking_good=True,
|
||||
scale_ambiguous=False,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("cuVSLAM Mono-Depth tracking step failed — frame dropped")
|
||||
return None
|
||||
|
||||
def _compute_via_orb_scaled(
|
||||
self,
|
||||
prev_image: np.ndarray,
|
||||
curr_image: np.ndarray,
|
||||
camera_params: CameraParameters,
|
||||
) -> Optional[RelativePose]:
|
||||
"""Dev/CI fallback: ORB translation scaled by depth_hint_m."""
|
||||
pose = self._orb_fallback.compute_relative_pose(prev_image, curr_image, camera_params)
|
||||
if pose is None:
|
||||
return None
|
||||
scale = self._depth_hint_m / _MONO_DEPTH_REFERENCE_ALTITUDE_M
|
||||
return RelativePose(
|
||||
translation=pose.translation * scale,
|
||||
rotation=pose.rotation,
|
||||
confidence=pose.confidence,
|
||||
inlier_count=pose.inlier_count,
|
||||
total_matches=pose.total_matches,
|
||||
tracking_good=pose.tracking_good,
|
||||
scale_ambiguous=False,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Factory — selects appropriate VO backend at runtime
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def create_vo_backend(
|
||||
model_manager: Optional[IModelManager] = None,
|
||||
prefer_cuvslam: bool = True,
|
||||
prefer_mono_depth: bool = False,
|
||||
camera_params: Optional[CameraParameters] = None,
|
||||
imu_params: Optional[dict] = None,
|
||||
depth_hint_m: float = 600.0,
|
||||
) -> ISequentialVisualOdometry:
|
||||
"""Return the best available VO backend for the current platform.
|
||||
|
||||
Priority when prefer_mono_depth=True:
|
||||
1. CuVSLAMMonoDepthVisualOdometry (sprint 1 production path)
|
||||
2. ORBVisualOdometry (dev/CI fallback inside Mono-Depth wrapper)
|
||||
|
||||
Priority when prefer_mono_depth=False (legacy):
|
||||
1. CuVSLAMVisualOdometry (Jetson — cuVSLAM SDK present)
|
||||
2. SequentialVisualOdometry (TRT/Mock SuperPoint+LightGlue)
|
||||
3. ORBVisualOdometry (pure OpenCV fallback)
|
||||
"""
|
||||
if prefer_mono_depth:
|
||||
return CuVSLAMMonoDepthVisualOdometry(
|
||||
depth_hint_m=depth_hint_m,
|
||||
camera_params=camera_params,
|
||||
imu_params=imu_params,
|
||||
)
|
||||
|
||||
if prefer_cuvslam:
|
||||
vo = CuVSLAMVisualOdometry(camera_params=camera_params, imu_params=imu_params)
|
||||
if vo._has_cuvslam:
|
||||
return vo
|
||||
|
||||
if model_manager is not None:
|
||||
return SequentialVisualOdometry(model_manager)
|
||||
|
||||
return ORBVisualOdometry()
|
||||
from gps_denied.components.vio.cuvslam_backend import (
|
||||
_CUVSLAM_AVAILABLE,
|
||||
CuVSLAMMonoDepthVisualOdometry,
|
||||
CuVSLAMVisualOdometry,
|
||||
)
|
||||
from gps_denied.components.vio.factory import create_vo_backend
|
||||
from gps_denied.components.vio.orbslam_backend import (
|
||||
ORBVisualOdometry,
|
||||
SequentialVisualOdometry,
|
||||
)
|
||||
from gps_denied.components.vio.protocol import (
|
||||
ISequentialVisualOdometry,
|
||||
VisualOdometry,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"VisualOdometry",
|
||||
"ISequentialVisualOdometry",
|
||||
"ORBVisualOdometry",
|
||||
"SequentialVisualOdometry",
|
||||
"CuVSLAMVisualOdometry",
|
||||
"CuVSLAMMonoDepthVisualOdometry",
|
||||
"create_vo_backend",
|
||||
"_CUVSLAM_AVAILABLE",
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user