feat(01-03): move ORB + SequentialVO into components/vio/orbslam_backend.py

- Extract SequentialVisualOdometry and ORBVisualOdometry from core/vo.py
  into a dedicated pure-Python OpenCV backend module.
- Module deliberately does NOT import cuvslam — keeps optional-SDK
  isolation from the cuvslam backend (Plan 01-03 Task 1).
- Both classes inherit from the components.vio.protocol.ISequentialVisualOdometry
  Protocol alias (Plan 01-02 surface).
- Barrel-export both classes from components/vio/__init__.py.
- core/vo.py is unchanged in this commit; the shim wires up in Task 4.
This commit is contained in:
Yuzviak
2026-05-10 22:59:03 +03:00
parent e13df36c9a
commit d9895acb77
2 changed files with 289 additions and 0 deletions
+29
View File
@@ -0,0 +1,29 @@
"""VIO component (ARCH-01).
Public surface for visual-inertial odometry adapters. Phase-1 split of
the legacy ``core/vo.py`` monolith into per-backend modules:
- protocol.py — VisualOdometry Protocol (alias ISequentialVisualOdometry)
- orbslam_backend.py — pure-Python OpenCV: SequentialVisualOdometry + ORBVisualOdometry
- cuvslam_backend.py — Jetson cuVSLAM SDK bridge: CuVSLAMVisualOdometry + CuVSLAMMonoDepthVisualOdometry
- factory.py — create_vo_backend env-aware DI seed
- native/ — placeholder for future cuvslam SDK native glue
The legacy ``gps_denied.core.vo`` import path is preserved as a thin
re-export shim for one phase; tests still import from there.
"""
from gps_denied.components.vio.protocol import (
ISequentialVisualOdometry,
VisualOdometry,
)
from gps_denied.components.vio.orbslam_backend import (
ORBVisualOdometry,
SequentialVisualOdometry,
)
__all__ = [
"VisualOdometry",
"ISequentialVisualOdometry",
"ORBVisualOdometry",
"SequentialVisualOdometry",
]
@@ -0,0 +1,260 @@
"""Pure-Python OpenCV VO backends (ARCH-01 / ARCH-05).
Houses the two OpenCV-only VO implementations that have no native SDK
dependency:
- SequentialVisualOdometry — SuperPoint + LightGlue (TRT on Jetson / Mock on dev)
- ORBVisualOdometry — OpenCV ORB + BFMatcher (dev/CI stub, VO-02)
Both implement the ``VisualOdometry`` Protocol (alias
``ISequentialVisualOdometry``) defined in ``components.vio.protocol``. This
module deliberately does NOT import ``cuvslam`` — the cuVSLAM-bridge
backends live in ``components.vio.cuvslam_backend`` and keep that
optional-import block isolated.
"""
from __future__ import annotations
import logging
from typing import Optional
import cv2
import numpy as np
from gps_denied.components.vio.protocol import ISequentialVisualOdometry
from gps_denied.core.models import IModelManager
from gps_denied.schemas import CameraParameters
from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose
logger = logging.getLogger(__name__)
class SequentialVisualOdometry(ISequentialVisualOdometry):
"""Frame-to-frame visual odometry using SuperPoint + LightGlue."""
def __init__(self, model_manager: IModelManager):
self.model_manager = model_manager
def extract_features(self, image: np.ndarray) -> Features:
"""Extracts keypoints and descriptors using SuperPoint."""
engine = self.model_manager.get_inference_engine("SuperPoint")
result = engine.infer(image)
return Features(
keypoints=result["keypoints"],
descriptors=result["descriptors"],
scores=result["scores"]
)
def match_features(self, features1: Features, features2: Features) -> Matches:
"""Matches features using LightGlue."""
engine = self.model_manager.get_inference_engine("LightGlue")
result = engine.infer({
"features1": features1,
"features2": features2
})
return Matches(
matches=result["matches"],
scores=result["scores"],
keypoints1=result["keypoints1"],
keypoints2=result["keypoints2"]
)
def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None:
"""Estimates camera motion using Essential Matrix (RANSAC)."""
inlier_threshold = 20
if len(matches.matches) < 8:
return None
pts1 = np.ascontiguousarray(matches.keypoints1)
pts2 = np.ascontiguousarray(matches.keypoints2)
# Build camera matrix
f_px = camera_params.focal_length * (camera_params.resolution_width / camera_params.sensor_width)
if camera_params.principal_point:
cx, cy = camera_params.principal_point
else:
cx = camera_params.resolution_width / 2.0
cy = camera_params.resolution_height / 2.0
K = np.array([
[f_px, 0, cx],
[0, f_px, cy],
[0, 0, 1]
], dtype=np.float64)
try:
E, inliers = cv2.findEssentialMat(
pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0
)
except Exception as e:
logger.error(f"Error finding essential matrix: {e}")
return None
if E is None or E.shape != (3, 3):
return None
inliers_mask = inliers.flatten().astype(bool)
inlier_count = np.sum(inliers_mask)
if inlier_count < inlier_threshold:
logger.warning(f"Insufficient inliers: {inlier_count} < {inlier_threshold}")
return None
# Recover pose
try:
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers)
except Exception as e:
logger.error(f"Error recovering pose: {e}")
return None
return Motion(
translation=t.flatten(),
rotation=R,
inliers=inliers_mask,
inlier_count=inlier_count
)
def compute_relative_pose(
self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
) -> RelativePose | None:
"""Computes relative pose between two frames."""
f1 = self.extract_features(prev_image)
f2 = self.extract_features(curr_image)
matches = self.match_features(f1, f2)
motion = self.estimate_motion(matches, camera_params)
if motion is None:
return None
tracking_good = motion.inlier_count > 50
return RelativePose(
translation=motion.translation,
rotation=motion.rotation,
confidence=float(motion.inlier_count / max(1, len(matches.matches))),
inlier_count=motion.inlier_count,
total_matches=len(matches.matches),
tracking_good=tracking_good,
scale_ambiguous=True,
)
# ---------------------------------------------------------------------------
# ORBVisualOdometry — OpenCV ORB stub for dev/CI (VO-02)
# ---------------------------------------------------------------------------
class ORBVisualOdometry(ISequentialVisualOdometry):
"""OpenCV ORB-based VO stub for x86 dev/CI environments.
Satisfies the same ISequentialVisualOdometry interface as the cuVSLAM wrapper.
Translation is unit-scale (scale_ambiguous=True) — metric scale requires ESKF.
"""
_MIN_INLIERS = 20
_N_FEATURES = 2000
def __init__(self):
self._orb = cv2.ORB_create(nfeatures=self._N_FEATURES)
self._matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
# ------------------------------------------------------------------
# ISequentialVisualOdometry interface
# ------------------------------------------------------------------
def extract_features(self, image: np.ndarray) -> Features:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if image.ndim == 3 else image
kps, descs = self._orb.detectAndCompute(gray, None)
if descs is None or len(kps) == 0:
return Features(
keypoints=np.zeros((0, 2), dtype=np.float32),
descriptors=np.zeros((0, 32), dtype=np.uint8),
scores=np.zeros(0, dtype=np.float32),
)
pts = np.array([[k.pt[0], k.pt[1]] for k in kps], dtype=np.float32)
scores = np.array([k.response for k in kps], dtype=np.float32)
return Features(keypoints=pts, descriptors=descs.astype(np.float32), scores=scores)
def match_features(self, features1: Features, features2: Features) -> Matches:
if len(features1.keypoints) == 0 or len(features2.keypoints) == 0:
return Matches(
matches=np.zeros((0, 2), dtype=np.int32),
scores=np.zeros(0, dtype=np.float32),
keypoints1=np.zeros((0, 2), dtype=np.float32),
keypoints2=np.zeros((0, 2), dtype=np.float32),
)
d1 = features1.descriptors.astype(np.uint8)
d2 = features2.descriptors.astype(np.uint8)
raw = self._matcher.knnMatch(d1, d2, k=2)
# Lowe ratio test
good = []
for pair in raw:
if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance:
good.append(pair[0])
if not good:
return Matches(
matches=np.zeros((0, 2), dtype=np.int32),
scores=np.zeros(0, dtype=np.float32),
keypoints1=np.zeros((0, 2), dtype=np.float32),
keypoints2=np.zeros((0, 2), dtype=np.float32),
)
idx = np.array([[m.queryIdx, m.trainIdx] for m in good], dtype=np.int32)
scores = np.array([1.0 / (1.0 + m.distance) for m in good], dtype=np.float32)
kp1 = features1.keypoints[idx[:, 0]]
kp2 = features2.keypoints[idx[:, 1]]
return Matches(matches=idx, scores=scores, keypoints1=kp1, keypoints2=kp2)
def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]:
if len(matches.matches) < 8:
return None
pts1 = np.ascontiguousarray(matches.keypoints1, dtype=np.float64)
pts2 = np.ascontiguousarray(matches.keypoints2, dtype=np.float64)
f_px = camera_params.focal_length * (
camera_params.resolution_width / camera_params.sensor_width
)
cx = (camera_params.principal_point[0]
if camera_params.principal_point
else camera_params.resolution_width / 2.0)
cy = (camera_params.principal_point[1]
if camera_params.principal_point
else camera_params.resolution_height / 2.0)
K = np.array([[f_px, 0, cx], [0, f_px, cy], [0, 0, 1]], dtype=np.float64)
try:
E, inliers = cv2.findEssentialMat(pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0)
except Exception as exc:
logger.warning("ORB findEssentialMat failed: %s", exc)
return None
if E is None or E.shape != (3, 3) or inliers is None:
return None
inlier_mask = inliers.flatten().astype(bool)
inlier_count = int(np.sum(inlier_mask))
if inlier_count < self._MIN_INLIERS:
return None
try:
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers)
except Exception as exc:
logger.warning("ORB recoverPose failed: %s", exc)
return None
return Motion(translation=t.flatten(), rotation=R, inliers=inlier_mask, inlier_count=inlier_count)
def compute_relative_pose(
self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
) -> Optional[RelativePose]:
f1 = self.extract_features(prev_image)
f2 = self.extract_features(curr_image)
matches = self.match_features(f1, f2)
motion = self.estimate_motion(matches, camera_params)
if motion is None:
return None
tracking_good = motion.inlier_count >= self._MIN_INLIERS
return RelativePose(
translation=motion.translation,
rotation=motion.rotation,
confidence=float(motion.inlier_count / max(1, len(matches.matches))),
inlier_count=motion.inlier_count,
total_matches=len(matches.matches),
tracking_good=tracking_good,
scale_ambiguous=True, # monocular ORB cannot recover metric scale
)