"""`LightGlueRuntime` — shared LightGlue matcher (AZ-278 / E-CC-HELPERS / R14 fix). Implements the `lightglue_runtime` contract v1.0.0 at `_docs/02_document/contracts/shared_helpers/lightglue_runtime.md`. Layer 1 helper — NO `gps_denied_onboard.components.*` imports. The engine handle is an opaque Protocol defined in `_types/manifests.py`; C7's `InferenceRuntime.deserialize_engine` produces the concrete handle and the composition root injects ONE shared instance into both C2.5 (InlierBasedReranker) and C3 (CrossDomainMatcher) — the structural fix for R14. Single-threaded by contract. The concurrent-access guard is non-blocking: concurrent entry RAISES `LightGlueConcurrentAccessError` rather than serialising, so a composition-root regression that wires the runtime into multiple threads is caught immediately instead of silently corrupting CUDA state. """ from __future__ import annotations import threading from gps_denied_onboard._types.manifests import EngineHandle from gps_denied_onboard._types.matching import CorrespondenceSet, KeypointSet __all__ = [ "LightGlueConcurrentAccessError", "LightGlueRuntime", "LightGlueRuntimeError", ] class LightGlueRuntimeError(RuntimeError): """Raised on construction guards or descriptor-dim mismatch.""" class LightGlueConcurrentAccessError(RuntimeError): """Raised when a concurrent ``match`` / ``match_batch`` entry is detected. The serial-access invariant is a composition-root contract — if you see this exception, the runtime was wired into more than one thread by mistake. Fix the composition root, do NOT add a lock here. """ def _validate_keypoint_set(features: KeypointSet, *, name: str, expected_dim: int) -> None: if features.descriptors.ndim != 2 or features.descriptors.shape[1] != expected_dim: actual_dim = ( features.descriptors.shape[1] if features.descriptors.ndim == 2 else "" ) raise LightGlueRuntimeError( f"{name}: descriptor dim mismatch — engine expects {expected_dim}, " f"got {actual_dim} (descriptors.shape={features.descriptors.shape})" ) class LightGlueRuntime: """Shared LightGlue inference runtime. Single-thread by contract; concurrent entry raises. """ def __init__(self, engine_handle: EngineHandle) -> None: if engine_handle is None: raise LightGlueRuntimeError( "LightGlueRuntime requires a non-None engine_handle (got None); " "composition root must inject the engine produced by C7's " "InferenceRuntime.deserialize_engine" ) try: descriptor_dim = int(engine_handle.descriptor_dim) except AttributeError as exc: raise LightGlueRuntimeError( f"engine_handle missing required Protocol attribute 'descriptor_dim': {exc}" ) from exc if descriptor_dim < 1: raise LightGlueRuntimeError( f"engine_handle.descriptor_dim must be >= 1; got {descriptor_dim}" ) self._engine = engine_handle self._descriptor_dim = descriptor_dim # Non-blocking guard: ``try_acquire`` raises on contention rather # than serialising callers, per the contract's "concurrent calls # are a bug" stance. self._in_use = threading.Lock() def descriptor_dim(self) -> int: return self._descriptor_dim def match(self, features_a: KeypointSet, features_b: KeypointSet) -> CorrespondenceSet: """Match a single pair (C2.5 path).""" if not self._in_use.acquire(blocking=False): raise LightGlueConcurrentAccessError( "LightGlueRuntime.match called from a second thread while another " "match is in flight — the runtime owns ONE CUDA stream and must be " "bound to a single hot-path thread by the composition root" ) try: _validate_keypoint_set(features_a, name="features_a", expected_dim=self._descriptor_dim) _validate_keypoint_set(features_b, name="features_b", expected_dim=self._descriptor_dim) return self._engine.forward(features_a, features_b) finally: self._in_use.release() def match_batch( self, features_a_list: list[KeypointSet], features_b_list: list[KeypointSet], ) -> list[CorrespondenceSet]: """Batch-match (C3 path) — iterates serially over the single CUDA stream.""" if len(features_a_list) != len(features_b_list): raise LightGlueRuntimeError( f"match_batch: features_a_list (len={len(features_a_list)}) and " f"features_b_list (len={len(features_b_list)}) must have equal length" ) if not self._in_use.acquire(blocking=False): raise LightGlueConcurrentAccessError( "LightGlueRuntime.match_batch called concurrently with another match" ) try: results: list[CorrespondenceSet] = [] for idx, (fa, fb) in enumerate(zip(features_a_list, features_b_list, strict=True)): _validate_keypoint_set( fa, name=f"features_a_list[{idx}]", expected_dim=self._descriptor_dim ) _validate_keypoint_set( fb, name=f"features_b_list[{idx}]", expected_dim=self._descriptor_dim ) results.append(self._engine.forward(fa, fb)) return results finally: self._in_use.release()