"""``AlikedLightGlueMatcher`` — C3 secondary CrossDomainMatcher (AZ-346).

Mirrors :mod:`gps_denied_onboard.components.c3_matcher.disk_lightglue`
modulo backbone choice: ALIKED replaces DISK for the per-frame
keypoint+descriptor extraction step; LightGlue + RANSAC stages are
unchanged.

ALIKED is the documented fallback if a future D-C3-1 IT-12 verdict shifts
away from DISK, or if DISK's licensing / upstream maintenance changes
mid-cycle. Both backbones ship in airborne / research binaries (ADR-002
allows multiple matchers at link time; only one is selected at runtime by
``config.matcher.strategy``).

Preprocessor parameters (input size, normalisation) are hard-coded per
AZ-346 Constraint — weights-coupled, same rule as DISK.

See ``disk_lightglue.py`` for the layering + composition rationale; this
module follows the same pattern.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Final, Literal

import cv2
import numpy as np

from gps_denied_onboard._types.inference import (
    BuildConfig,
    PrecisionMode,
)
from gps_denied_onboard._types.matching import KeypointSet
from gps_denied_onboard.components.c3_matcher._engine_output_assertion import (
    assert_keypoint_engine_output_schema,
)
from gps_denied_onboard.components.c3_matcher._pipeline import (
    TileExtractError,
    run_lightglue_pipeline,
)
from gps_denied_onboard.components.c3_matcher.inference_runtime_cut import (
    InferenceRuntimeCut,
)

if TYPE_CHECKING:
    from gps_denied_onboard._types.calibration import CameraCalibration
    from gps_denied_onboard._types.matcher import MatchResult, MatcherHealth
    from gps_denied_onboard._types.nav import NavCameraFrame
    from gps_denied_onboard._types.rerank import RerankResult
    from gps_denied_onboard.clock import Clock
    from gps_denied_onboard.components.c3_matcher._health_window import (
        RollingHealthWindow,
    )
    from gps_denied_onboard.config.schema import Config
    from gps_denied_onboard.fdr_client import FdrClient
    from gps_denied_onboard.helpers.lightglue_runtime import LightGlueRuntime
    from gps_denied_onboard.helpers.ransac_filter import RansacFilter

__all__ = ["MODEL_NAME", "AlikedLightGlueMatcher", "create"]

MODEL_NAME: Final[str] = "aliked"
_MATCHER_LABEL: Final[Literal["aliked_lightglue"]] = "aliked_lightglue"
_FDR_PRODUCER_ID: Final[str] = "c3_matcher.aliked_lightglue"
_LOG_KIND_READY: Final[str] = "c3.matcher.ready"

# ALIKED input contract: NCHW float32 RGB at 480x480, ImageNet-style
# normalisation. Hard-coded per Constraint — weights-coupled.
_INPUT_SIZE: Final[int] = 480
_INPUT_KEY: Final[str] = "image"
_OUTPUT_KEYPOINTS: Final[str] = "keypoints"
_OUTPUT_DESCRIPTORS: Final[str] = "descriptors"

# ImageNet mean/std for ALIKED. Same convention as UltraVPR's preprocessor.
_IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406)
_IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225)


class AlikedLightGlueMatcher:
    """ALIKED + LightGlue cross-domain matcher.

    Stateless per-frame except for the rolling health window. See the
    module docstring for the architectural picture and the
    ``cross_domain_matcher_protocol.md`` v1.0.0 contract for the public
    invariants this class satisfies.
    """

    def __init__(
        self,
        config: "Config",
        *,
        lightglue_runtime: "LightGlueRuntime",
        ransac_filter: type["RansacFilter"],
        inference_runtime: InferenceRuntimeCut,
        health_window: "RollingHealthWindow",
        engine_handle: object,
        clock: "Clock",
        fdr_client: "FdrClient | None",
        logger: logging.Logger,
    ) -> None:
        """Store the injected collaborators and cache the matcher thresholds.

        The three numeric thresholds are read from
        ``config.components["c3_matcher"]`` and coerced to ``int`` / ``float``
        once here, so :meth:`match` forwards plain Python numbers.
        """
        matcher_cfg = config.components["c3_matcher"]

        # Injected collaborators are kept by reference, untouched.
        self._config = config
        self._logger = logger
        self._clock = clock
        self._fdr_client = fdr_client
        self._health_window = health_window
        self._ransac_filter = ransac_filter
        self._lightglue_runtime = lightglue_runtime
        self._inference_runtime = inference_runtime
        self._engine_handle = engine_handle

        # Thresholds forwarded to the shared LightGlue pipeline on every match.
        self._min_inliers_threshold: int = int(matcher_cfg.min_inliers_threshold)
        self._residual_warn_threshold_px: float = float(
            matcher_cfg.residual_warn_threshold_px
        )
        self._ransac_threshold_px: float = float(matcher_cfg.ransac_threshold_px)

    def match(
        self,
        frame: "NavCameraFrame",
        rerank_result: "RerankResult",
        calibration: "CameraCalibration",
    ) -> "MatchResult":
        """Match ``frame`` against ``rerank_result`` via the shared pipeline.

        Delegates entirely to :func:`run_lightglue_pipeline`, supplying this
        strategy's label, feature extractor, collaborators and thresholds.
        ``calibration`` is part of the signature but is not used here.
        """
        del calibration  # deliberately unused by this implementation
        result = run_lightglue_pipeline(
            # Per-call inputs.
            frame=frame,
            rerank_result=rerank_result,
            # Strategy identity and backbone-specific extraction hook.
            matcher_label=_MATCHER_LABEL,
            fdr_producer_id=_FDR_PRODUCER_ID,
            extract_features=self._extract_features,
            # Injected collaborators.
            lightglue_runtime=self._lightglue_runtime,
            ransac_filter=self._ransac_filter,
            health_window=self._health_window,
            clock=self._clock,
            fdr_client=self._fdr_client,
            logger=self._logger,
            # Thresholds cached at construction time.
            min_inliers_threshold=self._min_inliers_threshold,
            ransac_threshold_px=self._ransac_threshold_px,
            residual_warn_threshold_px=self._residual_warn_threshold_px,
        )
        return result

    def health_snapshot(self) -> "MatcherHealth":
        """Return the current snapshot of the rolling health window."""
        window = self._health_window
        return window.snapshot()

    def _extract_features(self, image_bgr: np.ndarray) -> KeypointSet:
        """Run the ALIKED engine on one image and wrap its outputs.

        Preprocesses ``image_bgr``, feeds it to the inference runtime under
        the ``"image"`` input key, then validates and packages the result.
        """
        feeds = {_INPUT_KEY: _preprocess_aliked(image_bgr)}
        raw_outputs = self._inference_runtime.infer(self._engine_handle, feeds)
        return _outputs_to_keypoint_set(raw_outputs)


def _preprocess_aliked(image_bgr: np.ndarray) -> np.ndarray:
    """Convert an image into the ALIKED engine's input tensor.

    Produces a C-contiguous float32 array laid out as
    ``(1, 3, _INPUT_SIZE, _INPUT_SIZE)``: RGB channel order, scaled by
    ``1/255`` and then standardised with the ImageNet mean/std constants.
    The parameters are hard-coded because they are coupled to the weights.

    A 3-D input is converted with ``COLOR_BGR2RGB``; a 2-D input is treated
    as grayscale and expanded with ``COLOR_GRAY2RGB``.

    Raises:
        TileExtractError: if ``image_bgr`` is neither 2-D nor 3-D.
    """
    rank = image_bgr.ndim
    if rank not in (2, 3):
        raise TileExtractError(
            f"ALIKED preprocessor: image must be 2-D (gray) or 3-D (BGR); "
            f"got ndim={image_bgr.ndim} shape={image_bgr.shape}"
        )

    colour_code = cv2.COLOR_BGR2RGB if rank == 3 else cv2.COLOR_GRAY2RGB
    rgb = cv2.cvtColor(image_bgr, colour_code)

    # Resize only when the spatial size differs from the engine's fixed input.
    target_hw = (_INPUT_SIZE, _INPUT_SIZE)
    if rgb.shape[:2] != target_hw:
        rgb = cv2.resize(rgb, target_hw, interpolation=cv2.INTER_AREA)

    channel_mean = np.asarray(_IMAGENET_MEAN, dtype=np.float32)
    channel_std = np.asarray(_IMAGENET_STD, dtype=np.float32)
    unit_range = rgb.astype(np.float32) / 255.0
    standardised = (unit_range - channel_mean) / channel_std

    # HWC -> CHW, then prepend the batch axis to obtain NCHW.
    nchw = standardised.transpose(2, 0, 1)[np.newaxis, ...]
    return np.ascontiguousarray(nchw, dtype=np.float32)


def _outputs_to_keypoint_set(outputs: dict[str, np.ndarray]) -> KeypointSet:
    """Validate the engine's output mapping and wrap it in a ``KeypointSet``.

    Both arrays are converted to float32. Keypoints must have shape
    ``(N, 2)``; descriptors must be 2-D with the same leading dimension.

    Raises:
        TileExtractError: if either expected key is missing, or if the
            array shapes do not satisfy the constraints above.
    """
    has_both_keys = _OUTPUT_KEYPOINTS in outputs and _OUTPUT_DESCRIPTORS in outputs
    if not has_both_keys:
        raise TileExtractError(
            f"ALIKED forward returned unexpected keys: "
            f"{sorted(outputs.keys())!r}; expected {_OUTPUT_KEYPOINTS!r} + "
            f"{_OUTPUT_DESCRIPTORS!r}"
        )

    kpts = np.asarray(outputs[_OUTPUT_KEYPOINTS], dtype=np.float32)
    descs = np.asarray(outputs[_OUTPUT_DESCRIPTORS], dtype=np.float32)

    if not (kpts.ndim == 2 and kpts.shape[1] == 2):
        raise TileExtractError(
            f"ALIKED keypoints must have shape (N, 2); got {kpts.shape}"
        )
    if not (descs.ndim == 2 and descs.shape[0] == kpts.shape[0]):
        raise TileExtractError(
            f"ALIKED descriptors shape {descs.shape} inconsistent with "
            f"keypoints {kpts.shape}"
        )
    return KeypointSet(keypoints=kpts, descriptors=descs)


def _build_aliked_build_config() -> BuildConfig:
    """Return the fixed engine build settings used for ALIKED.

    FP16 precision, a 512 MB workspace, no calibration dataset and no
    optimisation profiles.
    """
    settings = BuildConfig(
        precision=PrecisionMode.FP16,
        workspace_mb=512,
        calibration_dataset=None,
        optimization_profiles=(),
    )
    return settings


def create(
    config: "Config",
    *,
    lightglue_runtime: "LightGlueRuntime",
    ransac_filter: type["RansacFilter"],
    inference_runtime: InferenceRuntimeCut,
    health_window: "RollingHealthWindow",
    clock: "Clock | None" = None,
    fdr_client: "FdrClient | None" = None,
    logger: logging.Logger | None = None,
) -> "AlikedLightGlueMatcher":
    """Module-level factory consumed by :func:`build_matcher_strategy`.

    Compiles and deserialises the ALIKED engine, checks its output schema
    once, emits the ``c3.matcher.ready`` log record, and returns a fully
    wired :class:`AlikedLightGlueMatcher`.

    When ``clock`` is ``None`` a ``WallClock`` is created; when ``logger``
    is ``None`` the ``gps_denied_onboard.c3_matcher.aliked_lightglue``
    logger is used.

    Raises:
        ValueError: if ``config.components["c3_matcher"].aliked_weights_path``
            is ``None``.
    """
    matcher_cfg = config.components["c3_matcher"]
    engine_source = matcher_cfg.aliked_weights_path
    if engine_source is None:
        raise ValueError(
            "AlikedLightGlueMatcher.create: config.components['c3_matcher']"
            ".aliked_weights_path is None; the runtime root MUST populate "
            "the ALIKED engine path before constructing this strategy."
        )

    if clock is None:
        # Imported lazily: only needed when the caller supplies no clock.
        from gps_denied_onboard.clock.wall_clock import WallClock

        clock = WallClock()
    logger = (
        logging.getLogger("gps_denied_onboard.c3_matcher.aliked_lightglue")
        if logger is None
        else logger
    )

    compiled = inference_runtime.compile_engine(
        engine_source, _build_aliked_build_config()
    )
    # Rebuild the cache entry with the same fields, adding ``model_name`` to
    # its extras before handing it to the deserialiser.
    tagged_fields = {
        "engine_path": compiled.engine_path,
        "sha256_hex": compiled.sha256_hex,
        "sm": compiled.sm,
        "jp": compiled.jp,
        "trt": compiled.trt,
        "precision": compiled.precision,
        "extras": {**compiled.extras, "model_name": MODEL_NAME},
    }
    tagged_entry = type(compiled)(**tagged_fields)
    engine_handle = inference_runtime.deserialize_engine(tagged_entry)

    # AZ-346 AC-special-1: validate the engine's output schema once
    # at startup so a misconfigured ALIKED engine surfaces as a
    # composition-time ConfigError instead of a mid-flight
    # InsufficientInliersError cascade.
    assert_keypoint_engine_output_schema(
        inference_runtime,
        engine_handle,
        input_size=_INPUT_SIZE,
        input_key=_INPUT_KEY,
        keypoints_key=_OUTPUT_KEYPOINTS,
        descriptors_key=_OUTPUT_DESCRIPTORS,
    )

    ready_kv = {
        "strategy": _MATCHER_LABEL,
        "min_inliers_threshold": int(matcher_cfg.min_inliers_threshold),
        "residual_warn_threshold_px": float(
            matcher_cfg.residual_warn_threshold_px
        ),
        "ransac_threshold_px": float(matcher_cfg.ransac_threshold_px),
    }
    logger.info(
        _LOG_KIND_READY,
        extra={"kind": _LOG_KIND_READY, "kv": ready_kv},
    )

    return AlikedLightGlueMatcher(
        config,
        lightglue_runtime=lightglue_runtime,
        ransac_filter=ransac_filter,
        inference_runtime=inference_runtime,
        health_window=health_window,
        engine_handle=engine_handle,
        clock=clock,
        fdr_client=fdr_client,
        logger=logger,
    )