Files
gps-denied-onboard/f07_sequential_visual_odometry.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

274 lines
11 KiB
Python

import cv2
import numpy as np
import logging
from typing import Optional, Tuple, Dict, Any
from pydantic import BaseModel
from abc import ABC, abstractmethod
from f02_1_flight_lifecycle_manager import CameraParameters
logger = logging.getLogger(__name__)
# --- Data Models ---
class Features(BaseModel):
    """Keypoints, descriptors and confidence scores extracted from one image."""
    keypoints: np.ndarray    # (N, 2) array of (x, y) pixel coordinates
    descriptors: np.ndarray  # (N, 256) array of descriptors
    scores: np.ndarray       # (N,) array of confidence scores
    # numpy arrays are not pydantic-native types; allow them without validation.
    model_config = {"arbitrary_types_allowed": True}
class Matches(BaseModel):
    """Correspondences between two feature sets, with per-match confidence."""
    matches: np.ndarray     # (M, 2) pairs of indices (into features1 / features2)
    scores: np.ndarray      # (M,) match confidence
    keypoints1: np.ndarray  # (M, 2) matched pixel coordinates in image 1
    keypoints2: np.ndarray  # (M, 2) matched pixel coordinates in image 2
    # numpy arrays are not pydantic-native types; allow them without validation.
    model_config = {"arbitrary_types_allowed": True}
class RelativePose(BaseModel):
    """Frame-to-frame camera motion estimate (monocular, so translation is up to scale)."""
    translation: np.ndarray         # (3,) unit vector — direction only, scale unknown
    rotation: np.ndarray            # (3, 3) rotation matrix
    confidence: float               # tracking quality score in [0, 1]
    inlier_count: int               # RANSAC inliers supporting the estimate
    total_matches: int              # number of matches fed into motion estimation
    tracking_good: bool             # False when tracking is considered lost
    scale_ambiguous: bool = True    # monocular VO cannot recover metric scale
    chunk_id: Optional[str] = None  # not set within this module; presumably tags a map chunk — TODO confirm with callers
    # numpy arrays are not pydantic-native types; allow them without validation.
    model_config = {"arbitrary_types_allowed": True}
class Motion(BaseModel):
    """Raw result of essential-matrix motion estimation."""
    translation: np.ndarray  # (3, 1) column vector from cv2.recoverPose (unit scale)
    rotation: np.ndarray     # (3, 3) rotation matrix
    inliers: np.ndarray      # boolean inlier mask over the input matches
    inlier_count: int        # number of True entries in inliers
    # numpy arrays are not pydantic-native types; allow them without validation.
    model_config = {"arbitrary_types_allowed": True}
# --- Interface ---
class ISequentialVisualOdometry(ABC):
    """Interface for frame-to-frame (sequential) visual odometry."""
    @abstractmethod
    def compute_relative_pose(self, prev_image: np.ndarray, curr_image: np.ndarray) -> Optional[RelativePose]:
        """Full pipeline: extract, match, estimate; None when estimation fails."""
        pass
    @abstractmethod
    def extract_features(self, image: np.ndarray) -> Features:
        """Detect keypoints and compute descriptors for a single image."""
        pass
    @abstractmethod
    def match_features(self, features1: Features, features2: Features) -> Matches:
        """Match two feature sets, returning confidence-filtered correspondences."""
        pass
    @abstractmethod
    def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]:
        """Estimate relative (R, t) from matches; None when estimation fails."""
        pass
# --- Implementation ---
class SequentialVisualOdometry(ISequentialVisualOdometry):
    """
    F07: Sequential Visual Odometry
    Performs frame-to-frame metric tracking, relying on SuperPoint for feature extraction
    and LightGlue for matching to handle low-overlap and low-texture scenarios.
    """
    def __init__(self, model_manager=None):
        # model_manager is expected to expose run_superpoint / run_lightglue;
        # when it is None (or lacks those hooks) classical CV fallbacks
        # (SIFT + BFMatcher) are used instead.
        self.model_manager = model_manager
# --- Feature Extraction (07.01) ---
def _preprocess_image(self, image: np.ndarray) -> np.ndarray:
if len(image.shape) == 3 and image.shape[2] == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image
return gray.astype(np.float32) / 255.0
def _run_superpoint_inference(self, preprocessed: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
if self.model_manager and hasattr(self.model_manager, 'run_superpoint'):
return self.model_manager.run_superpoint(preprocessed)
# Functional Classical CV Fallback (SIFT) for testing on real images without TensorRT
sift = cv2.SIFT_create(nfeatures=2000)
img_uint8 = (preprocessed * 255.0).astype(np.uint8)
kpts, descs = sift.detectAndCompute(img_uint8, None)
if kpts is None or len(kpts) == 0:
return np.empty((0, 2)), np.empty((0, 256)), np.empty((0,))
keypoints = np.array([k.pt for k in kpts]).astype(np.float32)
scores = np.array([k.response for k in kpts]).astype(np.float32)
# Pad SIFT's 128-dim descriptors to 256 to match the expected interface dimensions
descs_padded = np.pad(descs, ((0, 0), (0, 128)), 'constant').astype(np.float32)
return keypoints, descs_padded, scores
def _apply_nms(self, keypoints: np.ndarray, scores: np.ndarray, nms_radius: int) -> np.ndarray:
# Simplified Mock NMS: Sort by score and keep top 2000 for standard tracking
if len(scores) == 0:
return np.array([], dtype=int)
sorted_indices = np.argsort(scores)[::-1]
return sorted_indices[:2000]
def extract_features(self, image: np.ndarray) -> Features:
    """Extract keypoints/descriptors/scores from one image.

    Returns an empty Features set for a None or zero-size image.
    """
    if image is None or image.size == 0:
        return Features(keypoints=np.empty((0, 2)), descriptors=np.empty((0, 256)), scores=np.empty((0,)))
    processed = self._preprocess_image(image)
    keypoints, descriptors, scores = self._run_superpoint_inference(processed)
    selected = self._apply_nms(keypoints, scores, nms_radius=4)
    return Features(
        keypoints=keypoints[selected],
        descriptors=descriptors[selected],
        scores=scores[selected],
    )
# --- Feature Matching (07.02) ---
def _prepare_features_for_lightglue(self, features: Features) -> Dict[str, Any]:
# In a real implementation, this would convert numpy arrays to torch tensors
# on the correct device (e.g., 'cuda').
return {
'keypoints': features.keypoints,
'descriptors': features.descriptors,
'image_size': np.array([1920, 1080]) # Placeholder size
}
def _run_lightglue_inference(self, features1_dict: Dict, features2_dict: Dict) -> Tuple[np.ndarray, np.ndarray]:
if self.model_manager and hasattr(self.model_manager, 'run_lightglue'):
return self.model_manager.run_lightglue(features1_dict, features2_dict)
# Functional Classical CV Fallback (BFMatcher)
# Extract the original 128 dimensions (ignoring the padding added in the SIFT fallback)
desc1 = features1_dict['descriptors'][:, :128].astype(np.float32)
desc2 = features2_dict['descriptors'][:, :128].astype(np.float32)
if len(desc1) == 0 or len(desc2) == 0:
return np.empty((0, 2), dtype=int), np.empty((0,))
matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=True)
raw_matches = matcher.match(desc1, desc2)
if not raw_matches:
return np.empty((0, 2), dtype=int), np.empty((0,))
match_indices = np.array([[m.queryIdx, m.trainIdx] for m in raw_matches])
# Map L2 distances into a [0, 1] confidence score so our filter doesn't reject them
distances = np.array([m.distance for m in raw_matches])
scores = np.exp(-distances / 100.0).astype(np.float32)
return match_indices, scores
def _filter_matches_by_confidence(self, matches: np.ndarray, scores: np.ndarray, threshold: float) -> Tuple[np.ndarray, np.ndarray]:
keep = scores > threshold
return matches[keep], scores[keep]
def _extract_matched_keypoints(self, features1: Features, features2: Features, match_indices: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
kpts1 = features1.keypoints[match_indices[:, 0]]
kpts2 = features2.keypoints[match_indices[:, 1]]
return kpts1, kpts2
def match_features(self, features1: Features, features2: Features, min_confidence: float = 0.1) -> Matches:
    """Match two feature sets and return confidence-filtered correspondences.

    Args:
        features1: Features from the first (previous) image.
        features2: Features from the second (current) image.
        min_confidence: matches with score <= this value are discarded.
            Fix: the original comment claimed the LightGlue-paper threshold
            of ~0.9 while the code used 0.1; the permissive 0.1 is kept as
            the default because the SIFT/BFMatcher fallback produces
            exp(-distance/100) scores that rarely approach 0.9, and it is
            now a parameter instead of a magic number.
    """
    f1_lg = self._prepare_features_for_lightglue(features1)
    f2_lg = self._prepare_features_for_lightglue(features2)
    raw_matches, raw_scores = self._run_lightglue_inference(f1_lg, f2_lg)
    filtered_matches, filtered_scores = self._filter_matches_by_confidence(raw_matches, raw_scores, min_confidence)
    kpts1, kpts2 = self._extract_matched_keypoints(features1, features2, filtered_matches)
    return Matches(matches=filtered_matches, scores=filtered_scores, keypoints1=kpts1, keypoints2=kpts2)
# --- Relative Pose Computation (07.03) ---
def _get_camera_matrix(self, camera_params: CameraParameters) -> np.ndarray:
w = camera_params.resolution.get("width", 1920)
h = camera_params.resolution.get("height", 1080)
f_mm = camera_params.focal_length_mm
sw_mm = camera_params.sensor_width_mm
f_px = (f_mm / sw_mm) * w if sw_mm > 0 else w
return np.array([
[f_px, 0.0, w / 2.0],
[0.0, f_px, h / 2.0],
[0.0, 0.0, 1.0]
], dtype=np.float64)
def _normalize_keypoints(self, keypoints: np.ndarray, camera_params: CameraParameters) -> np.ndarray:
K = self._get_camera_matrix(camera_params)
fx, fy = K[0, 0], K[1, 1]
cx, cy = K[0, 2], K[1, 2]
normalized = np.empty_like(keypoints, dtype=np.float64)
if len(keypoints) > 0:
normalized[:, 0] = (keypoints[:, 0] - cx) / fx
normalized[:, 1] = (keypoints[:, 1] - cy) / fy
return normalized
def _estimate_essential_matrix(self, points1: np.ndarray, points2: np.ndarray, K: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
if len(points1) < 8 or len(points2) < 8:
return None, None
E, mask = cv2.findEssentialMat(points1, points2, K, method=cv2.RANSAC, prob=0.999, threshold=1.0)
return E, mask
def _decompose_essential_matrix(self, E: np.ndarray, points1: np.ndarray, points2: np.ndarray, K: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
if E is None or E.shape != (3, 3):
return None, None
_, R, t, mask = cv2.recoverPose(E, points1, points2, K)
return R, t
def _compute_tracking_quality(self, inlier_count: int, total_matches: int) -> Tuple[float, bool]:
if total_matches == 0:
return 0.0, False
inlier_ratio = inlier_count / total_matches
confidence = min(1.0, inlier_ratio * (inlier_count / 100.0))
if inlier_count > 50 and inlier_ratio > 0.5:
return float(confidence), True
elif inlier_count >= 20:
return float(confidence * 0.5), True # Degraded
return 0.0, False # Lost
def _build_relative_pose(self, motion: Motion, matches: Matches) -> RelativePose:
    """Assemble a RelativePose from a raw motion estimate and its matches.

    The translation is flattened from recoverPose's column vector; scale
    remains ambiguous (monocular odometry).
    """
    total = len(matches.matches)
    confidence, tracking_good = self._compute_tracking_quality(motion.inlier_count, total)
    return RelativePose(
        translation=motion.translation.flatten(),
        rotation=motion.rotation,
        confidence=confidence,
        inlier_count=motion.inlier_count,
        total_matches=total,
        tracking_good=tracking_good,
        scale_ambiguous=True,
    )
def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]:
    """Estimate relative (R, t) from matched keypoints.

    Returns:
        Motion with an inlier mask, or None when there are fewer than 8
        matches or essential-matrix decomposition fails.
    """
    if len(matches.matches) < 8:
        return None
    K = self._get_camera_matrix(camera_params)
    pts1, pts2 = matches.keypoints1, matches.keypoints2
    E, inlier_mask = self._estimate_essential_matrix(pts1, pts2, K)
    R, t = self._decompose_essential_matrix(E, pts1, pts2, K)
    if R is None or t is None:
        return None
    if inlier_mask is None:
        inliers = np.zeros(len(pts1), dtype=bool)
    else:
        inliers = inlier_mask.flatten() == 1
    return Motion(translation=t, rotation=R, inliers=inliers, inlier_count=int(np.sum(inliers)))
def compute_relative_pose(self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: Optional[CameraParameters] = None) -> Optional[RelativePose]:
    """Full frame-to-frame pipeline: extract -> match -> estimate -> package.

    When camera_params is omitted, a default 25mm lens on a 36mm-wide
    sensor at 1920x1080 is assumed. Returns None when motion estimation fails.
    """
    if camera_params is None:
        camera_params = CameraParameters(focal_length_mm=25.0, sensor_width_mm=36.0, resolution={"width": 1920, "height": 1080})
    prev_features = self.extract_features(prev_image)
    curr_features = self.extract_features(curr_image)
    correspondences = self.match_features(prev_features, curr_features)
    motion = self.estimate_motion(correspondences, camera_params)
    return None if motion is None else self._build_relative_pose(motion, correspondences)