From 058ed315dd58d3be77d8eeb9cb7ef897fd775595 Mon Sep 17 00:00:00 2001
From: Yuzviak
Date: Sun, 22 Mar 2026 22:59:55 +0200
Subject: [PATCH] =?UTF-8?q?feat:=20stage7=20=E2=80=94=20Model=20Manager=20?=
 =?UTF-8?q?(F16)=20and=20Sequential=20VO=20(F07)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md                          |   2 +
 docs-Lokal/LOCAL_EXECUTION_PLAN.md |   4 +-
 src/gps_denied/core/models.py      | 122 ++++++++++++++++++++++++
 src/gps_denied/core/vo.py          | 147 +++++++++++++++++++++++++++++
 src/gps_denied/schemas/model.py    |  21 +++++
 src/gps_denied/schemas/vo.py       |  49 ++++++++++
 tests/test_models.py               |  49 ++++++++++
 tests/test_vo.py                   | 102 ++++++++++++++++++++
 8 files changed, 494 insertions(+), 2 deletions(-)
 create mode 100644 src/gps_denied/core/models.py
 create mode 100644 src/gps_denied/core/vo.py
 create mode 100644 src/gps_denied/schemas/model.py
 create mode 100644 src/gps_denied/schemas/vo.py
 create mode 100644 tests/test_models.py
 create mode 100644 tests/test_vo.py

diff --git a/README.md b/README.md
index 9f83a0b..fe95645 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,8 @@
 | **Трансформація координат (F13)** | Зберігання локального ENU Origin, конвертація WGS84 ↔ Local ENU ↔ Pixels |
 | **Вхідний пайплайн (F05)** | `cv2`, `asyncio.Queue`. Керує FIFO чергою батчів кадрів з БПЛА, здійснює базову валідацію послідовностей та збереження фотографій на диск. |
 | **Менеджер ротацій (F06)** | Оберти 360° блоками по 30° для підбору орієнтації; трекінг історії курсу з виявленням різких поворотів (>45°). |
+| **Model Manager (F16)** | Архітектура завантаження ML моделей (Mock/Fallback). |
+| **Візуальна Одометрія (F07)** | Суперпоінт / LightGlue імітація. OpenCV (`findEssentialMat` + RANSAC + `recoverPose`) для розрахунку відносного руху між кадрами без відомого масштабу. |
 | **Граф поз (VO/GPR)** | GTSAM (Python) - очікується в наступних етапах |
 
 ## Швидкий старт
diff --git a/docs-Lokal/LOCAL_EXECUTION_PLAN.md b/docs-Lokal/LOCAL_EXECUTION_PLAN.md
index 48317c1..2d78fb6 100644
--- a/docs-Lokal/LOCAL_EXECUTION_PLAN.md
+++ b/docs-Lokal/LOCAL_EXECUTION_PLAN.md
@@ -88,8 +88,8 @@
 - FIFO батчів (ImageInputPipeline), менеджер ротацій (ImageRotationManager).
 - Асинхронне збереження в кеш `image_storage`.
 
-### Етап 7 — Model manager та послідовний VO
-- Завантаження локальних вагів (SuperPoint+LightGlue), побудова ланцюжка відносних оцінок.
+### Етап 7 — Model manager та послідовний VO ✅
+- Завантаження локальних вагів (SuperPoint+LightGlue - Mock), побудова ланцюжка відносних оцінок (`SequentialVisualOdometry`).
 
 ### Етап 8 — Глобальне місце та метричне уточнення
 - Кросс-вью вирівнювання до тайла Google Maps.
diff --git a/src/gps_denied/core/models.py b/src/gps_denied/core/models.py
new file mode 100644
index 0000000..3e304c0
--- /dev/null
+++ b/src/gps_denied/core/models.py
@@ -0,0 +1,122 @@
+"""Model Manager (Component F16)."""
+
+import logging
+from abc import ABC, abstractmethod
+from typing import Any
+
+import numpy as np
+
+from gps_denied.schemas.model import InferenceEngine
+
+logger = logging.getLogger(__name__)
+
+
+class IModelManager(ABC):
+    @abstractmethod
+    def load_model(self, model_name: str, model_format: str) -> bool:
+        pass
+
+    @abstractmethod
+    def get_inference_engine(self, model_name: str) -> InferenceEngine:
+        pass
+
+    @abstractmethod
+    def optimize_to_tensorrt(self, model_name: str, onnx_path: str) -> str:
+        pass
+
+    @abstractmethod
+    def fallback_to_onnx(self, model_name: str) -> bool:
+        pass
+
+    @abstractmethod
+    def warmup_model(self, model_name: str) -> bool:
+        pass
+
+
+class MockInferenceEngine(InferenceEngine):
+    """A mock implementation of Inference Engine for rapid testing."""
+
+    def infer(self, input_data: Any) -> Any:
+        if self.model_name == "SuperPoint":
+            # Mock extracting 500 features
+            n_features = 500
+            # Assuming input_data is an image of shape (H, W, 3)
+            h, w = input_data.shape[:2] if hasattr(input_data, "shape") else (480, 640)
+
+            keypoints = np.random.rand(n_features, 2) * [w, h]
+            descriptors = np.random.rand(n_features, 256)
+            scores = np.random.rand(n_features)
+
+            return {
+                "keypoints": keypoints,
+                "descriptors": descriptors,
+                "scores": scores
+            }
+
+        elif self.model_name == "LightGlue":
+            # Mock matching
+            # input_data expected to be a tuple/dict of two feature sets
+            f1, f2 = input_data["features1"], input_data["features2"]
+            kp1 = f1.keypoints
+            kp2 = f2.keypoints
+
+            # Create ~100 random matches
+            n_matches = min(100, len(kp1), len(kp2))
+
+            indices1 = np.random.choice(len(kp1), n_matches, replace=False)
+            indices2 = np.random.choice(len(kp2), n_matches, replace=False)
+
+            matches = np.stack([indices1, indices2], axis=1)
+            scores = np.random.rand(n_matches)
+
+            return {
+                "matches": matches,
+                "scores": scores,
+                "keypoints1": kp1[indices1],
+                "keypoints2": kp2[indices2]
+            }
+
+        elif self.model_name == "LiteSAM":
+            # Placeholder for F09 — fail loudly instead of falling through
+            raise NotImplementedError("LiteSAM mock is not implemented yet (F09)")
+
+        raise ValueError(f"Unknown mock model: {self.model_name}")
+
+
+class ModelManager(IModelManager):
+    """Manages ML models lifecycle and provisioning."""
+
+    def __init__(self):
+        self._loaded_models: dict[str, InferenceEngine] = {}
+
+    def load_model(self, model_name: str, model_format: str) -> bool:
+        """Loads a model (or mock)."""
+        logger.info(f"Loading {model_name} in format {model_format}")
+
+        # For prototype, we strictly use Mock
+        engine = MockInferenceEngine(model_name, model_format)
+        self._loaded_models[model_name] = engine
+
+        self.warmup_model(model_name)
+        return True
+
+    def get_inference_engine(self, model_name: str) -> InferenceEngine:
+        """Gets an inference engine for a specific model."""
+        if model_name not in self._loaded_models:
+            # Auto load if not loaded
+            self.load_model(model_name, "mock")
+
+        return self._loaded_models[model_name]
+
+    def optimize_to_tensorrt(self, model_name: str, onnx_path: str) -> str:
+        """Placeholder for TensorRT optimization."""
+        return f"{onnx_path}.trt"
+
+    def fallback_to_onnx(self, model_name: str) -> bool:
+        """Placeholder for fallback logic."""
+        logger.warning(f"Falling back to ONNX for {model_name}")
+        return True
+
+    def warmup_model(self, model_name: str) -> bool:
+        """Warms up a loaded model."""
+        logger.info(f"Warming up {model_name}")
+        return True
diff --git a/src/gps_denied/core/vo.py b/src/gps_denied/core/vo.py
new file mode 100644
index 0000000..0714921
--- /dev/null
+++ b/src/gps_denied/core/vo.py
@@ -0,0 +1,147 @@
+"""Sequential Visual Odometry (Component F07)."""
+
+import logging
+from abc import ABC, abstractmethod
+
+import cv2
+import numpy as np
+
+from gps_denied.core.models import IModelManager
+from gps_denied.schemas.flight import CameraParameters
+from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose
+
+logger = logging.getLogger(__name__)
+
+
+class ISequentialVisualOdometry(ABC):
+    @abstractmethod
+    def compute_relative_pose(
+        self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
+    ) -> RelativePose | None:
+        pass
+
+    @abstractmethod
+    def extract_features(self, image: np.ndarray) -> Features:
+        pass
+
+    @abstractmethod
+    def match_features(self, features1: Features, features2: Features) -> Matches:
+        pass
+
+    @abstractmethod
+    def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None:
+        pass
+
+
+class SequentialVisualOdometry(ISequentialVisualOdometry):
+    """Frame-to-frame visual odometry using SuperPoint + LightGlue."""
+
+    def __init__(self, model_manager: IModelManager):
+        self.model_manager = model_manager
+
+    def extract_features(self, image: np.ndarray) -> Features:
+        """Extracts keypoints and descriptors using SuperPoint."""
+        engine = self.model_manager.get_inference_engine("SuperPoint")
+        result = engine.infer(image)
+
+        return Features(
+            keypoints=result["keypoints"],
+            descriptors=result["descriptors"],
+            scores=result["scores"]
+        )
+
+    def match_features(self, features1: Features, features2: Features) -> Matches:
+        """Matches features using LightGlue."""
+        engine = self.model_manager.get_inference_engine("LightGlue")
+        result = engine.infer({
+            "features1": features1,
+            "features2": features2
+        })
+
+        return Matches(
+            matches=result["matches"],
+            scores=result["scores"],
+            keypoints1=result["keypoints1"],
+            keypoints2=result["keypoints2"]
+        )
+
+    def estimate_motion(self, matches: Matches, camera_params: CameraParameters) -> Motion | None:
+        """Estimates camera motion using Essential Matrix (RANSAC)."""
+        inlier_threshold = 20
+        if len(matches.matches) < 8:
+            return None
+
+        pts1 = np.ascontiguousarray(matches.keypoints1)
+        pts2 = np.ascontiguousarray(matches.keypoints2)
+
+        # Build camera matrix
+        f_px = camera_params.focal_length * (camera_params.resolution_width / camera_params.sensor_width)
+        if camera_params.principal_point:
+            cx, cy = camera_params.principal_point
+        else:
+            cx = camera_params.resolution_width / 2.0
+            cy = camera_params.resolution_height / 2.0
+
+        K = np.array([
+            [f_px, 0, cx],
+            [0, f_px, cy],
+            [0, 0, 1]
+        ], dtype=np.float64)
+
+        try:
+            E, inliers = cv2.findEssentialMat(
+                pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0
+            )
+        except Exception as e:
+            logger.error(f"Error finding essential matrix: {e}")
+            return None
+
+        # findEssentialMat may return E=None and/or inliers=None on degenerate input
+        if E is None or inliers is None or E.shape != (3, 3):
+            return None
+
+        inliers_mask = inliers.flatten().astype(bool)
+        inlier_count = int(np.sum(inliers_mask))
+
+        if inlier_count < inlier_threshold:
+            logger.warning(f"Insufficient inliers: {inlier_count} < {inlier_threshold}")
+            return None
+
+        # Recover pose
+        try:
+            _, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers)
+        except Exception as e:
+            logger.error(f"Error recovering pose: {e}")
+            return None
+
+        return Motion(
+            translation=t.flatten(),
+            rotation=R,
+            inliers=inliers_mask,
+            inlier_count=inlier_count
+        )
+
+    def compute_relative_pose(
+        self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
+    ) -> RelativePose | None:
+        """Computes relative pose between two frames."""
+        f1 = self.extract_features(prev_image)
+        f2 = self.extract_features(curr_image)
+
+        matches = self.match_features(f1, f2)
+
+        motion = self.estimate_motion(matches, camera_params)
+
+        if motion is None:
+            return None
+
+        tracking_good = motion.inlier_count > 50
+
+        return RelativePose(
+            translation=motion.translation,
+            rotation=motion.rotation,
+            confidence=float(motion.inlier_count / max(1, len(matches.matches))),
+            inlier_count=motion.inlier_count,
+            total_matches=len(matches.matches),
+            tracking_good=tracking_good,
+            scale_ambiguous=True
+        )
diff --git a/src/gps_denied/schemas/model.py b/src/gps_denied/schemas/model.py
new file mode 100644
index 0000000..1a06bd2
--- /dev/null
+++ b/src/gps_denied/schemas/model.py
@@ -0,0 +1,21 @@
+"""Model Manager schemas (Component F16)."""
+
+from typing import Any
+from pydantic import BaseModel
+
+class ModelConfig(BaseModel):
+    """Configuration for an ML model."""
+    model_name: str
+    model_path: str
+    format: str
+    precision: str  # "fp16", "fp32"
+    warmup_iterations: int = 3
+
+class InferenceEngine:
+    """Base definition for an inference engine."""
+    def __init__(self, model_name: str, format_: str):
+        self.model_name = model_name
+        self.format = format_
+
+    def infer(self, input_data: Any) -> Any:
+        raise NotImplementedError
diff --git a/src/gps_denied/schemas/vo.py b/src/gps_denied/schemas/vo.py
new file mode 100644
index 0000000..eb1b6af
--- /dev/null
+++ b/src/gps_denied/schemas/vo.py
@@ -0,0 +1,49 @@
+"""Sequential Visual Odometry schemas (Component F07)."""
+
+from typing import Optional
+
+import numpy as np
+from pydantic import BaseModel
+
+
+class Features(BaseModel):
+    """Extracted image features (e.g., from SuperPoint)."""
+    model_config = {"arbitrary_types_allowed": True}
+
+    keypoints: np.ndarray  # (N, 2)
+    descriptors: np.ndarray  # (N, 256)
+    scores: np.ndarray  # (N,)
+
+
+class Matches(BaseModel):
+    """Matches between two sets of features (e.g., from LightGlue)."""
+    model_config = {"arbitrary_types_allowed": True}
+
+    matches: np.ndarray  # (M, 2)
+    scores: np.ndarray  # (M,)
+    keypoints1: np.ndarray  # (M, 2)
+    keypoints2: np.ndarray  # (M, 2)
+
+
+class RelativePose(BaseModel):
+    """Relative pose between two frames."""
+    model_config = {"arbitrary_types_allowed": True}
+
+    translation: np.ndarray  # (3,)
+    rotation: np.ndarray  # (3, 3)
+    confidence: float
+    inlier_count: int
+    total_matches: int
+    tracking_good: bool
+    scale_ambiguous: bool = True
+    chunk_id: Optional[str] = None
+
+
+class Motion(BaseModel):
+    """Motion estimate from OpenCV."""
+    model_config = {"arbitrary_types_allowed": True}
+
+    translation: np.ndarray  # (3,) unit vector
+    rotation: np.ndarray  # (3, 3) rotation matrix
+    inliers: np.ndarray  # Boolean mask of inliers
+    inlier_count: int
diff --git a/tests/test_models.py b/tests/test_models.py
new file mode 100644
index 0000000..ee171c2
--- /dev/null
+++ b/tests/test_models.py
@@ -0,0 +1,49 @@
+"""Tests for Model Manager (F16)."""
+
+import numpy as np
+from gps_denied.core.models import ModelManager
+
+def test_load_and_get_model():
+    manager = ModelManager()
+
+    # Should auto-load
+    engine = manager.get_inference_engine("SuperPoint")
+    assert engine.model_name == "SuperPoint"
+
+    # Check fallback/dummy
+    assert manager.fallback_to_onnx("SuperPoint") is True
+    assert manager.optimize_to_tensorrt("SuperPoint", "path.onnx") == "path.onnx.trt"
+
+
+def test_mock_superpoint():
+    manager = ModelManager()
+    engine = manager.get_inference_engine("SuperPoint")
+
+    dummy_img = np.zeros((480, 640, 3), dtype=np.uint8)
+    res = engine.infer(dummy_img)
+
+    assert "keypoints" in res
+    assert "descriptors" in res
+    assert "scores" in res
+    assert len(res["keypoints"]) == 500
+    assert res["descriptors"].shape == (500, 256)
+
+
+def test_mock_lightglue():
+    manager = ModelManager()
+    engine = manager.get_inference_engine("LightGlue")
+
+    # Need mock features
+    class DummyF:
+        def __init__(self, keypoints):
+            self.keypoints = keypoints
+
+    f1 = DummyF(np.random.rand(120, 2))
+    f2 = DummyF(np.random.rand(150, 2))
+
+    res = engine.infer({"features1": f1, "features2": f2})
+
+    assert "matches" in res
+    assert len(res["matches"]) == 100  # min(100, 120, 150)
+    assert res["keypoints1"].shape == (100, 2)
+    assert res["keypoints2"].shape == (100, 2)
diff --git a/tests/test_vo.py b/tests/test_vo.py
new file mode 100644
index 0000000..ff4b7ff
--- /dev/null
+++ b/tests/test_vo.py
@@ -0,0 +1,102 @@
+"""Tests for Sequential Visual Odometry (F07)."""
+
+import numpy as np
+import pytest
+
+from gps_denied.core.models import ModelManager
+from gps_denied.core.vo import SequentialVisualOdometry
+from gps_denied.schemas.flight import CameraParameters
+from gps_denied.schemas.vo import Features, Matches
+
+
+@pytest.fixture
+def vo():
+    manager = ModelManager()
+    return SequentialVisualOdometry(manager)
+
+
+@pytest.fixture
+def cam_params():
+    return CameraParameters(
+        focal_length=5.0,
+        sensor_width=6.4,
+        sensor_height=4.8,
+        resolution_width=640,
+        resolution_height=480,
+        principal_point=(320.0, 240.0)
+    )
+
+
+def test_extract_features(vo):
+    img = np.zeros((480, 640, 3), dtype=np.uint8)
+    features = vo.extract_features(img)
+
+    assert isinstance(features, Features)
+    assert features.keypoints.shape == (500, 2)
+    assert features.descriptors.shape == (500, 256)
+
+
+def test_match_features(vo):
+    f1 = Features(
+        keypoints=np.random.rand(100, 2),
+        descriptors=np.random.rand(100, 256),
+        scores=np.random.rand(100)
+    )
+    f2 = Features(
+        keypoints=np.random.rand(100, 2),
+        descriptors=np.random.rand(100, 256),
+        scores=np.random.rand(100)
+    )
+
+    matches = vo.match_features(f1, f2)
+    assert isinstance(matches, Matches)
+    assert matches.matches.shape == (100, 2)
+
+
+def test_estimate_motion_insufficient_matches(vo, cam_params):
+    matches = Matches(
+        matches=np.zeros((5, 2)),
+        scores=np.zeros(5),
+        keypoints1=np.zeros((5, 2)),
+        keypoints2=np.zeros((5, 2))
+    )
+
+    # Less than 8 points should return None
+    motion = vo.estimate_motion(matches, cam_params)
+    assert motion is None
+
+
+def test_estimate_motion_synthetic(vo, cam_params):
+    # To reliably test compute_relative_pose, we create points strictly satisfying epipolar constraint
+    # Simple straight motion: Add a small shift on X axis
+    n_pts = 100
+    pts1 = np.random.rand(n_pts, 2) * 400 + 100
+    pts2 = pts1 + np.array([10.0, 0.0])  # moving 10 pixels right
+
+    matches = Matches(
+        matches=np.column_stack([np.arange(n_pts), np.arange(n_pts)]),
+        scores=np.ones(n_pts),
+        keypoints1=pts1,
+        keypoints2=pts2
+    )
+
+    motion = vo.estimate_motion(matches, cam_params)
+    assert motion is not None
+    assert motion.inlier_count > 20
+    assert motion.translation.shape == (3,)
+    assert motion.rotation.shape == (3, 3)
+
+
+def test_compute_relative_pose(vo, cam_params):
+    img1 = np.zeros((480, 640, 3), dtype=np.uint8)
+    img2 = np.zeros((480, 640, 3), dtype=np.uint8)
+
+    # Given the random nature of our mock, OpenCV's findEssentialMat will likely find 0 inliers
+    # or fail. We expect compute_relative_pose to gracefully return None or low confidence.
+    pose = vo.compute_relative_pose(img1, img2, cam_params)
+
+    if pose is not None:
+        assert pose.translation.shape == (3,)
+        assert pose.rotation.shape == (3, 3)
+        # Because we randomize points in the mock manager, inliers will be extremely low
+        assert pose.tracking_good is False