diff --git a/src/gps_denied/components/coordinate_transforms/protocol.py b/src/gps_denied/components/coordinate_transforms/protocol.py new file mode 100644 index 0000000..395af9b --- /dev/null +++ b/src/gps_denied/components/coordinate_transforms/protocol.py @@ -0,0 +1,68 @@ +"""Protocol surface for the coordinate_transforms component (ARCH-05). + +Per ARCH-04 the implementation stays in ``core/coordinates.py`` — +this directory holds ONLY the Protocol so the ARCH-01 directory +inventory is complete. Method signatures mirror the concrete +``CoordinateTransformer`` public surface. +""" +from __future__ import annotations + +from typing import Protocol, runtime_checkable + +import numpy as np + +from gps_denied.schemas import CameraParameters, GPSPoint + + +@runtime_checkable +class CoordinateTransformsProtocol(Protocol): + """ENU origin management + WGS84⇄ENU⇄pixel transforms.""" + + def set_enu_origin(self, flight_id: str, origin_gps: GPSPoint) -> None: ... + + def get_enu_origin(self, flight_id: str) -> GPSPoint: ... + + def gps_to_enu( + self, flight_id: str, gps: GPSPoint + ) -> tuple[float, float, float]: ... + + def enu_to_gps( + self, flight_id: str, enu: tuple[float, float, float] + ) -> GPSPoint: ... + + def pixel_to_gps( + self, + flight_id: str, + pixel: tuple[float, float], + frame_pose: dict, + camera_params: CameraParameters, + altitude: float, + quaternion: np.ndarray | None = None, + ) -> GPSPoint: ... + + def gps_to_pixel( + self, + flight_id: str, + gps: GPSPoint, + frame_pose: dict, + camera_params: CameraParameters, + altitude: float, + quaternion: np.ndarray | None = None, + ) -> tuple[float, float]: ... + + def image_object_to_gps( + self, + flight_id: str, + frame_id: int, + object_pixel: tuple[float, float], + frame_pose: dict | None = None, + camera_params: CameraParameters | None = None, + altitude: float = 100.0, + quaternion: np.ndarray | None = None, + ) -> GPSPoint: ... + + def transform_points( + self, + points: list[tuple[float, float]], + transformation: list[list[float]], + ) -> list[tuple[float, float]]: ... diff --git a/src/gps_denied/components/gpr/protocol.py b/src/gps_denied/components/gpr/protocol.py new file mode 100644 index 0000000..d4c52b9 --- /dev/null +++ b/src/gps_denied/components/gpr/protocol.py @@ -0,0 +1,43 @@ +"""Protocol surface for the GPR component (ARCH-05). + +Phase 1: mirrors ``IGlobalPlaceRecognition`` from ``core/gpr.py``. +Adapters move here in Plan 05 (GPR). +""" +from __future__ import annotations + +from typing import List, Protocol, runtime_checkable + +import numpy as np + +from gps_denied.schemas.gpr import DatabaseMatch, TileCandidate + + +@runtime_checkable +class GlobalPlaceRecognition(Protocol): + """Coarse localisation surface (mirrors IGlobalPlaceRecognition).""" + + def retrieve_candidate_tiles( + self, image: np.ndarray, top_k: int + ) -> List[TileCandidate]: ... + + def compute_location_descriptor(self, image: np.ndarray) -> np.ndarray: ... + + def query_database( + self, descriptor: np.ndarray, top_k: int + ) -> List[DatabaseMatch]: ... + + def rank_candidates( + self, candidates: List[TileCandidate] + ) -> List[TileCandidate]: ... + + def load_index(self, flight_id: str, index_path: str) -> bool: ... + + def retrieve_candidate_tiles_for_chunk( + self, chunk_images: List[np.ndarray], top_k: int + ) -> List[TileCandidate]: ... + + def compute_chunk_descriptor(self, chunk_images: List[np.ndarray]) -> np.ndarray: ... + + +# Backwards-compat alias. +IGlobalPlaceRecognition = GlobalPlaceRecognition diff --git a/src/gps_denied/components/mavlink_io/protocol.py b/src/gps_denied/components/mavlink_io/protocol.py new file mode 100644 index 0000000..79e0c1a --- /dev/null +++ b/src/gps_denied/components/mavlink_io/protocol.py @@ -0,0 +1,39 @@ +"""Protocol surface for the MAVLink I/O component (ARCH-05). + +Phase 1: mirrors the concrete ``MAVLinkBridge`` public surface from +``core/mavlink.py`` (no ABC today). Adapters move here in Plan 07 +(mavlink_io); private helpers ``_confidence_to_fix_type`` and +``_eskf_to_gps_input`` MUST stay re-exported from the old path. +""" +from __future__ import annotations + +from typing import Callable, Optional, Protocol, runtime_checkable + +from gps_denied.schemas import GPSPoint +from gps_denied.schemas.eskf import ESKFState, IMUMeasurement +from gps_denied.schemas.mavlink import GPSInputMessage, RelocalizationRequest + + +@runtime_checkable +class MAVLinkBridgeProtocol(Protocol): + """Public surface of the MAVLink GPS_INPUT/IMU/telemetry bridge.""" + + def set_imu_callback( + self, cb: Callable[[IMUMeasurement], None] + ) -> None: ... + + def set_reloc_callback( + self, cb: Callable[[RelocalizationRequest], None] + ) -> None: ... + + def update_state(self, state: ESKFState, altitude_m: float = 0.0) -> None: ... + + def notify_satellite_correction(self) -> None: ... + + def update_drift_estimate(self, drift_m: float) -> None: ... + + async def start(self, origin: GPSPoint) -> None: ... + + async def stop(self) -> None: ... + + def build_gps_input(self) -> Optional[GPSInputMessage]: ... diff --git a/src/gps_denied/components/satellite_matcher/protocol.py b/src/gps_denied/components/satellite_matcher/protocol.py new file mode 100644 index 0000000..2fda709 --- /dev/null +++ b/src/gps_denied/components/satellite_matcher/protocol.py @@ -0,0 +1,103 @@ +"""Protocol surfaces for the satellite_matcher component (ARCH-05). + +Two Protocols live here per PATTERNS.md §3: + + * ``SatelliteTileLoader`` — mirrors the concrete ``SatelliteDataManager`` + public surface from ``core/satellite.py`` (no existing ABC). + * ``MetricRefiner`` — mirrors ``IMetricRefinement`` from ``core/metric.py``. + +Adapters move here in Plan 06 (satellite_matcher). +""" +from __future__ import annotations + +from typing import List, Optional, Protocol, Tuple, runtime_checkable + +import numpy as np + +from gps_denied.schemas import GPSPoint +from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult +from gps_denied.schemas.satellite import TileBounds, TileCoords + + +@runtime_checkable +class SatelliteTileLoader(Protocol): + """Local satellite-tile reader / mosaic builder. + + Mirrors the public surface of ``SatelliteDataManager`` (no ABC today). + """ + + def load_local_tile(self, tile_coords: TileCoords) -> np.ndarray | None: ... + + def select_tiles_for_eskf_position( + self, gps: GPSPoint, sigma_h_m: float, zoom: int + ) -> list[TileCoords]: ... + + def assemble_mosaic( + self, + tile_list: list[tuple[TileCoords, np.ndarray]], + target_size: int = 512, + ) -> tuple[np.ndarray, TileBounds] | None: ... + + def fetch_tiles_for_position( + self, gps: GPSPoint, sigma_h_m: float, zoom: int + ) -> tuple[np.ndarray, TileBounds] | None: ... + + def get_cached_tile( + self, flight_id: str, tile_coords: TileCoords + ) -> np.ndarray | None: ... + + def cache_tile( + self, flight_id: str, tile_coords: TileCoords, tile_data: np.ndarray + ) -> bool: ... + + def compute_tile_coords(self, lat: float, lon: float, zoom: int) -> TileCoords: ... + + def compute_tile_bounds(self, tile_coords: TileCoords) -> TileBounds: ... + + def clear_flight_cache(self, flight_id: str) -> bool: ... + + def expand_search_grid( + self, center: TileCoords, current_size: int, new_size: int + ) -> list[TileCoords]: ... + + def get_tile_grid(self, center: TileCoords, grid_size: int) -> list[TileCoords]: ... + + +@runtime_checkable +class MetricRefiner(Protocol): + """LiteSAM-style satellite alignment surface (mirrors IMetricRefinement).""" + + def align_to_satellite( + self, + uav_image: np.ndarray, + satellite_tile: np.ndarray, + tile_bounds: TileBounds, + ) -> Optional[AlignmentResult]: ... + + def compute_homography( + self, uav_image: np.ndarray, satellite_tile: np.ndarray + ) -> Optional[np.ndarray]: ... + + def extract_gps_from_alignment( + self, + homography: np.ndarray, + tile_bounds: TileBounds, + image_center: Tuple[int, int], + ) -> GPSPoint: ... + + def compute_match_confidence(self, alignment: AlignmentResult) -> float: ... + + def align_chunk_to_satellite( + self, + chunk_images: List[np.ndarray], + satellite_tile: np.ndarray, + tile_bounds: TileBounds, + ) -> Optional[ChunkAlignmentResult]: ... + + def match_chunk_homography( + self, chunk_images: List[np.ndarray], satellite_tile: np.ndarray + ) -> Optional[np.ndarray]: ... + + +# Backwards-compat alias for the only ABC that previously existed. +IMetricRefinement = MetricRefiner diff --git a/src/gps_denied/components/vio/protocol.py b/src/gps_denied/components/vio/protocol.py new file mode 100644 index 0000000..a4627ab --- /dev/null +++ b/src/gps_denied/components/vio/protocol.py @@ -0,0 +1,35 @@ +"""Protocol surface for the VIO component (ARCH-05). + +Phase 1: defines the Protocol that concrete adapters in this directory +implement. Method signatures mirror ``ISequentialVisualOdometry`` from +``core/vo.py``. Adapters are NOT moved here yet — see Plan 04 (VIO). +""" +from __future__ import annotations + +from typing import Protocol, runtime_checkable + +import numpy as np + +from gps_denied.schemas import CameraParameters +from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose + + +@runtime_checkable +class VisualOdometry(Protocol): + """Sequential visual odometry surface (mirrors ISequentialVisualOdometry).""" + + def compute_relative_pose( + self, prev_image: np.ndarray, curr_image: np.ndarray, camera_params: CameraParameters + ) -> RelativePose | None: ... + + def extract_features(self, image: np.ndarray) -> Features: ... + + def match_features(self, features1: Features, features2: Features) -> Matches: ... + + def estimate_motion( + self, matches: Matches, camera_params: CameraParameters + ) -> Motion | None: ... + + +# Backwards-compat alias — Phase 2 will deprecate the I-prefix. +ISequentialVisualOdometry = VisualOdometry