feat(01-02): add migration-target Protocols for vio/gpr/satellite_matcher/mavlink_io/coordinate_transforms (ARCH-05)

- VisualOdometry mirrors ISequentialVisualOdometry (4 methods)
- GlobalPlaceRecognition mirrors IGlobalPlaceRecognition (7 methods)
- SatelliteTileLoader mirrors SatelliteDataManager public API (11 methods)
- MetricRefiner mirrors IMetricRefinement (6 methods)
- MAVLinkBridgeProtocol mirrors MAVLinkBridge public API (8 methods)
- CoordinateTransformsProtocol mirrors CoordinateTransformer (9 methods)
- All Protocols runtime_checkable; backwards-compat I-prefixed aliases
  exposed for vio/gpr/metric (deprecated in Phase 2)
- Pure-additive: zero existing files touched
- isinstance check confirms SatelliteDataManager and CoordinateTransformer
  already satisfy the new Protocols structurally
This commit is contained in:
Yuzviak
2026-05-10 22:54:44 +03:00
parent b03567e551
commit 622b1a1ebe
5 changed files with 288 additions and 0 deletions
@@ -0,0 +1,68 @@
"""Protocol surface for the coordinate_transforms component (ARCH-05).
Per ARCH-04 the implementation stays in ``core/coordinates.py`` —
this directory holds ONLY the Protocol so the ARCH-01 directory
inventory is complete. Method signatures mirror the concrete
``CoordinateTransformer`` public surface.
"""
from __future__ import annotations
from typing import Protocol, runtime_checkable
import numpy as np
from gps_denied.schemas import CameraParameters, GPSPoint
@runtime_checkable
class CoordinateTransformsProtocol(Protocol):
"""ENU origin management + WGS84⇄ENU⇄pixel transforms."""
def set_enu_origin(self, flight_id: str, origin_gps: GPSPoint) -> None: ...
def get_enu_origin(self, flight_id: str) -> GPSPoint: ...
def gps_to_enu(
self, flight_id: str, gps: GPSPoint
) -> tuple[float, float, float]: ...
def enu_to_gps(
self, flight_id: str, enu: tuple[float, float, float]
) -> GPSPoint: ...
def pixel_to_gps(
self,
flight_id: str,
pixel: tuple[float, float],
frame_pose: dict,
camera_params: CameraParameters,
altitude: float,
quaternion: np.ndarray | None = None,
) -> GPSPoint: ...
def gps_to_pixel(
self,
flight_id: str,
gps: GPSPoint,
frame_pose: dict,
camera_params: CameraParameters,
altitude: float,
quaternion: np.ndarray | None = None,
) -> tuple[float, float]: ...
def image_object_to_gps(
self,
flight_id: str,
frame_id: int,
object_pixel: tuple[float, float],
frame_pose: dict | None = None,
camera_params: CameraParameters | None = None,
altitude: float = 100.0,
quaternion: np.ndarray | None = None,
) -> GPSPoint: ...
def transform_points(
self,
points: list[tuple[float, float]],
transformation: list[list[float]],
) -> list[tuple[float, float]]: ...
+43
View File
@@ -0,0 +1,43 @@
"""Protocol surface for the GPR component (ARCH-05).
Phase 1: mirrors ``IGlobalPlaceRecognition`` from ``core/gpr.py``.
Adapters move here in Plan 05 (GPR).
"""
from __future__ import annotations
from typing import List, Protocol, runtime_checkable
import numpy as np
from gps_denied.schemas.gpr import DatabaseMatch, TileCandidate
@runtime_checkable
class GlobalPlaceRecognition(Protocol):
"""Coarse localisation surface (mirrors IGlobalPlaceRecognition)."""
def retrieve_candidate_tiles(
self, image: np.ndarray, top_k: int
) -> List[TileCandidate]: ...
def compute_location_descriptor(self, image: np.ndarray) -> np.ndarray: ...
def query_database(
self, descriptor: np.ndarray, top_k: int
) -> List[DatabaseMatch]: ...
def rank_candidates(
self, candidates: List[TileCandidate]
) -> List[TileCandidate]: ...
def load_index(self, flight_id: str, index_path: str) -> bool: ...
def retrieve_candidate_tiles_for_chunk(
self, chunk_images: List[np.ndarray], top_k: int
) -> List[TileCandidate]: ...
def compute_chunk_descriptor(self, chunk_images: List[np.ndarray]) -> np.ndarray: ...
# Backwards-compat alias.
IGlobalPlaceRecognition = GlobalPlaceRecognition
@@ -0,0 +1,39 @@
"""Protocol surface for the MAVLink I/O component (ARCH-05).
Phase 1: mirrors the concrete ``MAVLinkBridge`` public surface from
``core/mavlink.py`` (no ABC today). Adapters move here in Plan 07
(mavlink_io); private helpers ``_confidence_to_fix_type`` and
``_eskf_to_gps_input`` MUST stay re-exported from the old path.
"""
from __future__ import annotations
from typing import Callable, Optional, Protocol, runtime_checkable
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import ESKFState, IMUMeasurement
from gps_denied.schemas.mavlink import GPSInputMessage, RelocalizationRequest
@runtime_checkable
class MAVLinkBridgeProtocol(Protocol):
"""Public surface of the MAVLink GPS_INPUT/IMU/telemetry bridge."""
def set_imu_callback(
self, cb: Callable[[IMUMeasurement], None]
) -> None: ...
def set_reloc_callback(
self, cb: Callable[[RelocalizationRequest], None]
) -> None: ...
def update_state(self, state: ESKFState, altitude_m: float = 0.0) -> None: ...
def notify_satellite_correction(self) -> None: ...
def update_drift_estimate(self, drift_m: float) -> None: ...
async def start(self, origin: GPSPoint) -> None: ...
async def stop(self) -> None: ...
def build_gps_input(self) -> Optional[GPSInputMessage]: ...
@@ -0,0 +1,103 @@
"""Protocol surfaces for the satellite_matcher component (ARCH-05).
Two Protocols live here per PATTERNS.md §3:
* ``SatelliteTileLoader`` — mirrors the concrete ``SatelliteDataManager``
public surface from ``core/satellite.py`` (no existing ABC).
* ``MetricRefiner`` — mirrors ``IMetricRefinement`` from ``core/metric.py``.
Adapters move here in Plan 06 (satellite_matcher).
"""
from __future__ import annotations
from typing import List, Optional, Protocol, Tuple, runtime_checkable
import numpy as np
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult
from gps_denied.schemas.satellite import TileBounds, TileCoords
@runtime_checkable
class SatelliteTileLoader(Protocol):
"""Local satellite-tile reader / mosaic builder.
Mirrors the public surface of ``SatelliteDataManager`` (no ABC today).
"""
def load_local_tile(self, tile_coords: TileCoords) -> np.ndarray | None: ...
def select_tiles_for_eskf_position(
self, gps: GPSPoint, sigma_h_m: float, zoom: int
) -> list[TileCoords]: ...
def assemble_mosaic(
self,
tile_list: list[tuple[TileCoords, np.ndarray]],
target_size: int = 512,
) -> tuple[np.ndarray, TileBounds] | None: ...
def fetch_tiles_for_position(
self, gps: GPSPoint, sigma_h_m: float, zoom: int
) -> tuple[np.ndarray, TileBounds] | None: ...
def get_cached_tile(
self, flight_id: str, tile_coords: TileCoords
) -> np.ndarray | None: ...
def cache_tile(
self, flight_id: str, tile_coords: TileCoords, tile_data: np.ndarray
) -> bool: ...
def compute_tile_coords(self, lat: float, lon: float, zoom: int) -> TileCoords: ...
def compute_tile_bounds(self, tile_coords: TileCoords) -> TileBounds: ...
def clear_flight_cache(self, flight_id: str) -> bool: ...
def expand_search_grid(
self, center: TileCoords, current_size: int, new_size: int
) -> list[TileCoords]: ...
def get_tile_grid(self, center: TileCoords, grid_size: int) -> list[TileCoords]: ...
@runtime_checkable
class MetricRefiner(Protocol):
"""LiteSAM-style satellite alignment surface (mirrors IMetricRefinement)."""
def align_to_satellite(
self,
uav_image: np.ndarray,
satellite_tile: np.ndarray,
tile_bounds: TileBounds,
) -> Optional[AlignmentResult]: ...
def compute_homography(
self, uav_image: np.ndarray, satellite_tile: np.ndarray
) -> Optional[np.ndarray]: ...
def extract_gps_from_alignment(
self,
homography: np.ndarray,
tile_bounds: TileBounds,
image_center: Tuple[int, int],
) -> GPSPoint: ...
def compute_match_confidence(self, alignment: AlignmentResult) -> float: ...
def align_chunk_to_satellite(
self,
chunk_images: List[np.ndarray],
satellite_tile: np.ndarray,
tile_bounds: TileBounds,
) -> Optional[ChunkAlignmentResult]: ...
def match_chunk_homography(
self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray
) -> Optional[np.ndarray]: ...
# Backwards-compat alias for the only ABC that previously existed.
IMetricRefinement = MetricRefiner
+35
View File
@@ -0,0 +1,35 @@
"""Protocol surface for the VIO component (ARCH-05).
Phase 1: defines the Protocol that concrete adapters in this directory
implement. Method signatures mirror ``ISequentialVisualOdometry`` from
``core/vo.py``. Adapters are NOT moved here yet — see Plan 04 (VIO).
"""
from __future__ import annotations
from typing import Protocol, runtime_checkable
import numpy as np
from gps_denied.schemas import CameraParameters
from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose
@runtime_checkable
class VisualOdometry(Protocol):
"""Sequential visual odometry surface (mirrors ISequentialVisualOdometry)."""
def compute_relative_pose(
self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters
) -> RelativePose | None: ...
def extract_features(self, image: np.ndarray) -> Features: ...
def match_features(self, features1: Features, features2: Features) -> Matches: ...
def estimate_motion(
self, matches: Matches, camera_params: CameraParameters
) -> Motion | None: ...
# Backwards-compat alias — Phase 2 will deprecate the I-prefix.
ISequentialVisualOdometry = VisualOdometry