feat: stage7 — Model Manager (F16) and Sequential VO (F07)

This commit is contained in:
Yuzviak
2026-03-22 22:59:55 +02:00
parent 9ef046d623
commit 058ed315dd
8 changed files with 494 additions and 2 deletions
+122
View File
@@ -0,0 +1,122 @@
"""Model Manager (Component F16)."""
import logging
from abc import ABC, abstractmethod
from typing import Any
import numpy as np
from gps_denied.schemas.model import InferenceEngine
logger = logging.getLogger(__name__)
class IModelManager(ABC):
    """Abstract interface for the Model Manager component (F16)."""

    @abstractmethod
    def load_model(self, model_name: str, model_format: str) -> bool:
        """Load a model in the given format; return True on success."""
        ...

    @abstractmethod
    def get_inference_engine(self, model_name: str) -> InferenceEngine:
        """Return an inference engine for the named model."""
        ...

    @abstractmethod
    def optimize_to_tensorrt(self, model_name: str, onnx_path: str) -> str:
        """Optimize an ONNX model to TensorRT; return the resulting path."""
        ...

    @abstractmethod
    def fallback_to_onnx(self, model_name: str) -> bool:
        """Fall back to ONNX execution for the model; return True on success."""
        ...

    @abstractmethod
    def warmup_model(self, model_name: str) -> bool:
        """Run a warm-up pass for a loaded model; return True on success."""
        ...
class MockInferenceEngine(InferenceEngine):
    """A mock implementation of Inference Engine for rapid testing.

    Produces random but shape-consistent outputs for the models used by
    the prototype pipeline (SuperPoint features, LightGlue matches).
    """

    def infer(self, input_data: Any) -> Any:
        """Return mock inference output for the configured model.

        Args:
            input_data: For "SuperPoint", an image array of shape (H, W, 3)
                (falls back to 480x640 if it has no ``shape``); for
                "LightGlue", a dict with "features1"/"features2" feature sets.

        Returns:
            A dict of mock results, or ``None`` for the not-yet-implemented
            "LiteSAM" model (placeholder for F09).

        Raises:
            ValueError: If the model name is not one of the known mocks.
        """
        if self.model_name == "SuperPoint":
            # Mock extracting 500 features
            n_features = 500
            # Assuming input_data is an image of shape (H, W, 3)
            h, w = input_data.shape[:2] if hasattr(input_data, "shape") else (480, 640)
            keypoints = np.random.rand(n_features, 2) * [w, h]
            descriptors = np.random.rand(n_features, 256)
            scores = np.random.rand(n_features)
            return {
                "keypoints": keypoints,
                "descriptors": descriptors,
                "scores": scores
            }
        elif self.model_name == "LightGlue":
            # Mock matching.
            # input_data expected to be a tuple/dict of two feature sets
            f1, f2 = input_data["features1"], input_data["features2"]
            kp1 = f1.keypoints
            kp2 = f2.keypoints
            # Create up to ~100 random matches; replace=False keeps indices
            # unique within each side.
            n_matches = min(100, len(kp1), len(kp2))
            indices1 = np.random.choice(len(kp1), n_matches, replace=False)
            indices2 = np.random.choice(len(kp2), n_matches, replace=False)
            matches = np.stack([indices1, indices2], axis=1)
            scores = np.random.rand(n_matches)
            return {
                "matches": matches,
                "scores": scores,
                "keypoints1": kp1[indices1],
                "keypoints2": kp2[indices2]
            }
        elif self.model_name == "LiteSAM":
            # Placeholder for F09. BUG FIX: previously this branch was `pass`
            # and fell through to the "Unknown mock model" ValueError below,
            # so a *known* model raised an "unknown model" error. Return None
            # explicitly until F09 is implemented.
            return None
        raise ValueError(f"Unknown mock model: {self.model_name}")
class ModelManager(IModelManager):
    """Manages ML models lifecycle and provisioning."""

    def __init__(self):
        # Registry mapping model name -> ready-to-use inference engine.
        self._loaded_models: dict[str, InferenceEngine] = {}

    def load_model(self, model_name: str, model_format: str) -> bool:
        """Loads a model (or mock)."""
        logger.info(f"Loading {model_name} in format {model_format}")
        # For prototype, we strictly use Mock
        self._loaded_models[model_name] = MockInferenceEngine(model_name, model_format)
        self.warmup_model(model_name)
        return True

    def get_inference_engine(self, model_name: str) -> InferenceEngine:
        """Gets an inference engine for a specific model."""
        try:
            return self._loaded_models[model_name]
        except KeyError:
            # Auto load if not loaded
            self.load_model(model_name, "mock")
            return self._loaded_models[model_name]

    def optimize_to_tensorrt(self, model_name: str, onnx_path: str) -> str:
        """Placeholder for TensorRT optimization."""
        return onnx_path + ".trt"

    def fallback_to_onnx(self, model_name: str) -> bool:
        """Placeholder for fallback logic."""
        logger.warning(f"Falling back to ONNX for {model_name}")
        return True

    def warmup_model(self, model_name: str) -> bool:
        """Warms up a loaded model."""
        logger.info(f"Warming up {model_name}")
        return True
+147
View File
@@ -0,0 +1,147 @@
"""Sequential Visual Odometry (Component F07)."""
import logging
from abc import ABC, abstractmethod
import cv2
import numpy as np
from gps_denied.core.models import IModelManager
from gps_denied.schemas.flight import CameraParameters
from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose
logger = logging.getLogger(__name__)
class ISequentialVisualOdometry(ABC):
    """Abstract interface for Sequential Visual Odometry (F07)."""

    @abstractmethod
    def compute_relative_pose(
        self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
    ) -> RelativePose | None:
        """Compute the relative pose between two consecutive frames."""
        ...

    @abstractmethod
    def extract_features(self, image: np.ndarray) -> Features:
        """Extract keypoints and descriptors from a single image."""
        ...

    @abstractmethod
    def match_features(self, features1: Features, features2: Features) -> Matches:
        """Match two feature sets and return the correspondences."""
        ...

    @abstractmethod
    def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None:
        """Estimate camera motion from matched features; None on failure."""
        ...
class SequentialVisualOdometry(ISequentialVisualOdometry):
    """Frame-to-frame visual odometry using SuperPoint + LightGlue.

    Pipeline: extract features on both frames, match them, then recover the
    relative camera motion from the Essential matrix (RANSAC). Monocular VO
    recovers translation only up to scale, hence ``scale_ambiguous=True``
    in the returned pose.
    """

    def __init__(
        self,
        model_manager: IModelManager,
        min_inliers: int = 20,
        good_tracking_inliers: int = 50,
    ):
        """
        Args:
            model_manager: Provider of SuperPoint/LightGlue inference engines.
            min_inliers: Minimum RANSAC inlier count to accept a motion
                estimate (was a hard-coded 20; default preserves behavior).
            good_tracking_inliers: Inlier count above which tracking is
                flagged good (was a hard-coded 50; default preserves behavior).
        """
        self.model_manager = model_manager
        self.min_inliers = min_inliers
        self.good_tracking_inliers = good_tracking_inliers

    def extract_features(self, image: np.ndarray) -> Features:
        """Extracts keypoints and descriptors using SuperPoint."""
        engine = self.model_manager.get_inference_engine("SuperPoint")
        result = engine.infer(image)
        return Features(
            keypoints=result["keypoints"],
            descriptors=result["descriptors"],
            scores=result["scores"]
        )

    def match_features(self, features1: Features, features2: Features) -> Matches:
        """Matches features using LightGlue."""
        engine = self.model_manager.get_inference_engine("LightGlue")
        result = engine.infer({
            "features1": features1,
            "features2": features2
        })
        return Matches(
            matches=result["matches"],
            scores=result["scores"],
            keypoints1=result["keypoints1"],
            keypoints2=result["keypoints2"]
        )

    def _camera_matrix(self, camera_params: CameraParameters) -> np.ndarray:
        """Builds the 3x3 pinhole intrinsics matrix from camera parameters."""
        # Focal length in pixels from focal length expressed in sensor units
        # (assumes square pixels: same f is used for both axes).
        f_px = camera_params.focal_length * (
            camera_params.resolution_width / camera_params.sensor_width
        )
        if camera_params.principal_point:
            cx, cy = camera_params.principal_point
        else:
            # Default principal point: image center.
            cx = camera_params.resolution_width / 2.0
            cy = camera_params.resolution_height / 2.0
        return np.array([
            [f_px, 0, cx],
            [0, f_px, cy],
            [0, 0, 1]
        ], dtype=np.float64)

    def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None:
        """Estimates camera motion using Essential Matrix (RANSAC).

        Returns:
            Motion on success, or None when there are too few matches, the
            Essential matrix is degenerate, or the inlier count is below
            ``self.min_inliers``.
        """
        # The five-point algorithm needs >= 5 correspondences; require 8 for margin.
        if len(matches.matches) < 8:
            return None
        pts1 = np.ascontiguousarray(matches.keypoints1)
        pts2 = np.ascontiguousarray(matches.keypoints2)
        K = self._camera_matrix(camera_params)
        try:
            E, inliers = cv2.findEssentialMat(
                pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0
            )
        except Exception as e:
            logger.error(f"Error finding essential matrix: {e}")
            return None
        # findEssentialMat can fail (E is None), return stacked candidate
        # solutions (shape != 3x3), or yield no inlier mask — reject all.
        if E is None or E.shape != (3, 3) or inliers is None:
            return None
        inliers_mask = inliers.flatten().astype(bool)
        # Cast to plain int so downstream schemas get a native value.
        inlier_count = int(np.sum(inliers_mask))
        if inlier_count < self.min_inliers:
            logger.warning(f"Insufficient inliers: {inlier_count} < {self.min_inliers}")
            return None
        # Recover pose (cheirality check disambiguates the four E decompositions).
        try:
            _, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers)
        except Exception as e:
            logger.error(f"Error recovering pose: {e}")
            return None
        return Motion(
            translation=t.flatten(),
            rotation=R,
            inliers=inliers_mask,
            inlier_count=inlier_count
        )

    def compute_relative_pose(
        self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
    ) -> RelativePose | None:
        """Computes relative pose between two frames.

        Returns None when motion estimation fails; otherwise a RelativePose
        whose confidence is the inlier ratio over all matches.
        """
        f1 = self.extract_features(prev_image)
        f2 = self.extract_features(curr_image)
        matches = self.match_features(f1, f2)
        motion = self.estimate_motion(matches, camera_params)
        if motion is None:
            return None
        tracking_good = motion.inlier_count > self.good_tracking_inliers
        return RelativePose(
            translation=motion.translation,
            rotation=motion.rotation,
            confidence=float(motion.inlier_count / max(1, len(matches.matches))),
            inlier_count=motion.inlier_count,
            total_matches=len(matches.matches),
            tracking_good=tracking_good,
            # Monocular VO: translation direction only, magnitude unknown.
            scale_ambiguous=True
        )