mirror of https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 06:56:36 +00:00
274 lines · 11 KiB · Python
import cv2
|
|
import numpy as np
|
|
import logging
|
|
from typing import Optional, Tuple, Dict, Any
|
|
from pydantic import BaseModel
|
|
from abc import ABC, abstractmethod
|
|
|
|
from f02_1_flight_lifecycle_manager import CameraParameters
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Data Models ---
|
|
|
|
class Features(BaseModel):
    """Keypoints, descriptors and detection scores extracted from one image."""

    keypoints: np.ndarray  # (N, 2) array of (x, y) coordinates
    descriptors: np.ndarray  # (N, 256) array of descriptors
    scores: np.ndarray  # (N,) array of confidence scores

    # numpy arrays are not natively validated by pydantic, so allow them as-is
    model_config = {"arbitrary_types_allowed": True}
class Matches(BaseModel):
    """Correspondences between two Features sets, with matched coordinates."""

    matches: np.ndarray  # (M, 2) pairs of indices
    scores: np.ndarray  # (M,) match confidence
    keypoints1: np.ndarray  # (M, 2)
    keypoints2: np.ndarray  # (M, 2)

    # numpy arrays are not natively validated by pydantic, so allow them as-is
    model_config = {"arbitrary_types_allowed": True}
class RelativePose(BaseModel):
    """Relative camera pose between two frames, recovered up to scale."""

    translation: np.ndarray  # (3,) unit vector
    rotation: np.ndarray  # (3, 3) matrix
    confidence: float  # tracking confidence in [0, 1]
    inlier_count: int  # RANSAC inliers supporting the pose
    total_matches: int  # matches considered before RANSAC
    tracking_good: bool  # True when inlier count/ratio passed the quality gate
    scale_ambiguous: bool = True  # monocular VO: translation magnitude is unknown
    chunk_id: Optional[str] = None  # presumably ties the pose to a map chunk — verify against callers

    # numpy arrays are not natively validated by pydantic, so allow them as-is
    model_config = {"arbitrary_types_allowed": True}
class Motion(BaseModel):
    """Raw relative motion recovered from essential-matrix decomposition."""

    translation: np.ndarray  # (3, 1) translation direction from cv2.recoverPose
    rotation: np.ndarray  # (3, 3) rotation matrix
    inliers: np.ndarray  # boolean mask over the input matches
    inlier_count: int  # number of True entries in `inliers`

    # numpy arrays are not natively validated by pydantic, so allow them as-is
    model_config = {"arbitrary_types_allowed": True}
# --- Interface ---
|
|
|
|
class ISequentialVisualOdometry(ABC):
    """Abstract contract for frame-to-frame sequential visual odometry."""

    @abstractmethod
    def compute_relative_pose(self, prev_image: np.ndarray, curr_image: np.ndarray) -> Optional[RelativePose]:
        """Estimate the relative camera pose between two consecutive frames."""

    @abstractmethod
    def extract_features(self, image: np.ndarray) -> Features:
        """Detect keypoints and compute descriptors/scores for one image."""

    @abstractmethod
    def match_features(self, features1: Features, features2: Features) -> Matches:
        """Match two feature sets and return correspondences with scores."""

    @abstractmethod
    def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]:
        """Recover relative rotation and translation direction from matches."""
# --- Implementation ---
|
|
|
|
class SequentialVisualOdometry(ISequentialVisualOdometry):
    """
    F07: Sequential Visual Odometry

    Performs frame-to-frame metric tracking, relying on SuperPoint for feature extraction
    and LightGlue for matching to handle low-overlap and low-texture scenarios.

    When no model manager (or no accelerated model) is available, functional
    classical-CV fallbacks (SIFT + BFMatcher) keep the pipeline usable for
    testing on real images without TensorRT.
    """

    def __init__(self, model_manager=None):
        # Optional provider of run_superpoint / run_lightglue inference.
        self.model_manager = model_manager

    # --- Feature Extraction (07.01) ---

    def _preprocess_image(self, image: np.ndarray) -> np.ndarray:
        """Convert a 3-channel BGR image to grayscale and scale to float32 in [0, 1]."""
        if len(image.shape) == 3 and image.shape[2] == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image
        return gray.astype(np.float32) / 255.0

    def _run_superpoint_inference(self, preprocessed: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Return (keypoints (N, 2), descriptors (N, 256), scores (N,)) for an image.

        Delegates to the model manager's SuperPoint when available; otherwise
        falls back to SIFT with descriptors zero-padded from 128 to 256 dims.
        """
        if self.model_manager and hasattr(self.model_manager, 'run_superpoint'):
            return self.model_manager.run_superpoint(preprocessed)

        # Functional Classical CV Fallback (SIFT) for testing on real images without TensorRT
        sift = cv2.SIFT_create(nfeatures=2000)
        img_uint8 = (preprocessed * 255.0).astype(np.uint8)

        kpts, descs = sift.detectAndCompute(img_uint8, None)
        if kpts is None or len(kpts) == 0:
            # Fix: empty arrays are float32 to match the dtype of the non-empty path.
            return (np.empty((0, 2), dtype=np.float32),
                    np.empty((0, 256), dtype=np.float32),
                    np.empty((0,), dtype=np.float32))

        keypoints = np.array([k.pt for k in kpts]).astype(np.float32)
        scores = np.array([k.response for k in kpts]).astype(np.float32)

        # Pad SIFT's 128-dim descriptors to 256 to match the expected interface dimensions
        descs_padded = np.pad(descs, ((0, 0), (0, 128)), 'constant').astype(np.float32)

        return keypoints, descs_padded, scores

    def _apply_nms(self, keypoints: np.ndarray, scores: np.ndarray, nms_radius: int) -> np.ndarray:
        """
        Return indices of keypoints to keep, sorted by descending score.

        Simplified Mock NMS: ignores `keypoints` and `nms_radius` and keeps
        the top 2000 detections for standard tracking.
        """
        if len(scores) == 0:
            return np.array([], dtype=int)
        sorted_indices = np.argsort(scores)[::-1]
        return sorted_indices[:2000]

    def extract_features(self, image: np.ndarray) -> Features:
        """Extract features from an image; returns empty Features for None/empty input."""
        if image is None or image.size == 0:
            return Features(keypoints=np.empty((0, 2)), descriptors=np.empty((0, 256)), scores=np.empty((0,)))

        preprocessed = self._preprocess_image(image)
        kpts, desc, scores = self._run_superpoint_inference(preprocessed)

        keep_indices = self._apply_nms(kpts, scores, nms_radius=4)

        return Features(
            keypoints=kpts[keep_indices],
            descriptors=desc[keep_indices],
            scores=scores[keep_indices]
        )

    # --- Feature Matching (07.02) ---

    def _prepare_features_for_lightglue(self, features: Features) -> Dict[str, Any]:
        """Pack a Features object into the dict layout expected by LightGlue."""
        # In a real implementation, this would convert numpy arrays to torch tensors
        # on the correct device (e.g., 'cuda').
        return {
            'keypoints': features.keypoints,
            'descriptors': features.descriptors,
            'image_size': np.array([1920, 1080])  # Placeholder size
        }

    def _run_lightglue_inference(self, features1_dict: Dict, features2_dict: Dict) -> Tuple[np.ndarray, np.ndarray]:
        """
        Return (match index pairs (M, 2), confidences (M,)) between two feature dicts.

        Delegates to the model manager's LightGlue when available; otherwise
        uses a cross-checked brute-force matcher over the first 128 descriptor dims.
        """
        if self.model_manager and hasattr(self.model_manager, 'run_lightglue'):
            return self.model_manager.run_lightglue(features1_dict, features2_dict)

        # Functional Classical CV Fallback (BFMatcher)
        # Extract the original 128 dimensions (ignoring the padding added in the SIFT fallback)
        desc1 = features1_dict['descriptors'][:, :128].astype(np.float32)
        desc2 = features2_dict['descriptors'][:, :128].astype(np.float32)

        if len(desc1) == 0 or len(desc2) == 0:
            return np.empty((0, 2), dtype=int), np.empty((0,))

        matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=True)
        raw_matches = matcher.match(desc1, desc2)

        if not raw_matches:
            return np.empty((0, 2), dtype=int), np.empty((0,))

        match_indices = np.array([[m.queryIdx, m.trainIdx] for m in raw_matches])

        # Map L2 distances into a [0, 1] confidence score so our filter doesn't reject them
        distances = np.array([m.distance for m in raw_matches])
        scores = np.exp(-distances / 100.0).astype(np.float32)

        return match_indices, scores

    def _filter_matches_by_confidence(self, matches: np.ndarray, scores: np.ndarray, threshold: float) -> Tuple[np.ndarray, np.ndarray]:
        """Keep only matches whose score is strictly greater than `threshold`."""
        keep = scores > threshold
        return matches[keep], scores[keep]

    def _extract_matched_keypoints(self, features1: Features, features2: Features, match_indices: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Gather the (M, 2) pixel coordinates referenced by each match pair."""
        kpts1 = features1.keypoints[match_indices[:, 0]]
        kpts2 = features2.keypoints[match_indices[:, 1]]
        return kpts1, kpts2

    def match_features(self, features1: Features, features2: Features) -> Matches:
        """Match two feature sets and return index pairs, scores, and matched coordinates."""
        f1_lg = self._prepare_features_for_lightglue(features1)
        f2_lg = self._prepare_features_for_lightglue(features2)

        raw_matches, raw_scores = self._run_lightglue_inference(f1_lg, f2_lg)

        # A deliberately low 0.1 threshold is used so the BFMatcher fallback's
        # exp-mapped scores survive filtering; LightGlue's commonly cited ~0.9
        # cutoff would reject nearly all fallback matches.
        filtered_matches, filtered_scores = self._filter_matches_by_confidence(raw_matches, raw_scores, 0.1)

        kpts1, kpts2 = self._extract_matched_keypoints(features1, features2, filtered_matches)

        return Matches(matches=filtered_matches, scores=filtered_scores, keypoints1=kpts1, keypoints2=kpts2)

    # --- Relative Pose Computation (07.03) ---

    def _get_camera_matrix(self, camera_params: CameraParameters) -> np.ndarray:
        """Build the 3x3 pinhole intrinsics matrix K from physical camera parameters."""
        w = camera_params.resolution.get("width", 1920)
        h = camera_params.resolution.get("height", 1080)
        f_mm = camera_params.focal_length_mm
        sw_mm = camera_params.sensor_width_mm
        # Convert focal length from mm to pixels; fall back to f_px = w when the
        # sensor width is unknown/invalid to avoid division by zero.
        f_px = (f_mm / sw_mm) * w if sw_mm > 0 else w
        return np.array([
            [f_px, 0.0, w / 2.0],
            [0.0, f_px, h / 2.0],
            [0.0, 0.0, 1.0]
        ], dtype=np.float64)

    def _normalize_keypoints(self, keypoints: np.ndarray, camera_params: CameraParameters) -> np.ndarray:
        """Map pixel coordinates to normalized image coordinates using K."""
        K = self._get_camera_matrix(camera_params)
        fx, fy = K[0, 0], K[1, 1]
        cx, cy = K[0, 2], K[1, 2]

        normalized = np.empty_like(keypoints, dtype=np.float64)
        if len(keypoints) > 0:
            normalized[:, 0] = (keypoints[:, 0] - cx) / fx
            normalized[:, 1] = (keypoints[:, 1] - cy) / fy
        return normalized

    def _estimate_essential_matrix(self, points1: np.ndarray, points2: np.ndarray, K: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """Estimate the essential matrix with RANSAC; returns (E, inlier_mask) or (None, None)."""
        # The 5-point algorithm needs >= 5 correspondences; require 8 for stability.
        if len(points1) < 8 or len(points2) < 8:
            return None, None
        E, mask = cv2.findEssentialMat(points1, points2, K, method=cv2.RANSAC, prob=0.999, threshold=1.0)
        return E, mask

    def _decompose_essential_matrix(self, E: np.ndarray, points1: np.ndarray, points2: np.ndarray, K: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """Recover (R, t) from E via cheirality check; (None, None) when E is unusable."""
        if E is None:
            return None, None
        # Fix: cv2.findEssentialMat may return k stacked 3x3 candidate matrices
        # as a (3k, 3) array. Previously any such result was rejected and the
        # frame silently dropped; use the first candidate instead.
        if E.shape != (3, 3):
            if E.ndim == 2 and E.shape[1] == 3 and E.shape[0] >= 3 and E.shape[0] % 3 == 0:
                E = E[:3, :]
            else:
                return None, None
        _, R, t, mask = cv2.recoverPose(E, points1, points2, K)
        return R, t

    def _compute_tracking_quality(self, inlier_count: int, total_matches: int) -> Tuple[float, bool]:
        """
        Map inlier statistics to a (confidence, tracking_good) pair.

        Gates: >50 inliers and >0.5 ratio -> good; >=20 inliers -> degraded
        (confidence halved); otherwise tracking is considered lost.
        """
        if total_matches == 0:
            return 0.0, False

        inlier_ratio = inlier_count / total_matches
        # Confidence grows with both the inlier ratio and the absolute count,
        # saturating at 1.0 around 100+ inliers with a perfect ratio.
        confidence = min(1.0, inlier_ratio * (inlier_count / 100.0))

        if inlier_count > 50 and inlier_ratio > 0.5:
            return float(confidence), True
        elif inlier_count >= 20:
            return float(confidence * 0.5), True  # Degraded
        return 0.0, False  # Lost

    def _build_relative_pose(self, motion: Motion, matches: Matches) -> RelativePose:
        """Assemble a RelativePose result from the recovered motion and match stats."""
        confidence, tracking_good = self._compute_tracking_quality(motion.inlier_count, len(matches.matches))
        return RelativePose(
            translation=motion.translation.flatten(),
            rotation=motion.rotation,
            confidence=confidence,
            inlier_count=motion.inlier_count,
            total_matches=len(matches.matches),
            tracking_good=tracking_good,
            scale_ambiguous=True  # monocular: translation magnitude is not observable
        )

    def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Optional[Motion]:
        """Estimate relative motion from matches; None when matches are too few or E fails."""
        if len(matches.matches) < 8:
            return None

        K = self._get_camera_matrix(camera_params)
        pts1, pts2 = matches.keypoints1, matches.keypoints2

        E, mask = self._estimate_essential_matrix(pts1, pts2, K)
        R, t = self._decompose_essential_matrix(E, pts1, pts2, K)
        if R is None or t is None:
            return None

        # Inliers come from the RANSAC mask of findEssentialMat (1 = inlier).
        inliers = mask.flatten() == 1 if mask is not None else np.zeros(len(pts1), dtype=bool)
        return Motion(translation=t, rotation=R, inliers=inliers, inlier_count=int(np.sum(inliers)))

    def compute_relative_pose(self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: Optional[CameraParameters] = None) -> Optional[RelativePose]:
        """
        Full frame-to-frame pipeline: extract, match, estimate motion, build pose.

        Returns None when motion cannot be recovered. When `camera_params` is
        omitted, defaults to a 25 mm lens on a 36 mm-wide sensor at 1920x1080.
        """
        if camera_params is None:
            camera_params = CameraParameters(focal_length_mm=25.0, sensor_width_mm=36.0, resolution={"width": 1920, "height": 1080})

        feat1 = self.extract_features(prev_image)
        feat2 = self.extract_features(curr_image)

        matches = self.match_features(feat1, feat2)
        motion = self.estimate_motion(matches, camera_params)

        if motion is None:
            return None
        return self._build_relative_pose(motion, matches)