"""``MegaLocStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-339).

MegaLoc is one of two secondary backbones (alongside
:class:`MixVprStrategy`) shipped exclusively in the research binary for
the IT-12 comparative-study matrix
(``components/02_c2_vpr/description.md`` § 1 + § 5). Per ADR-002,
``BUILD_VPR_MEGALOC`` is ON for the research binary and replay-cli, OFF
for the airborne and operator-tooling binaries — selecting ``mega_loc``
on a binary without the flag fails fast at composition-root time via
:class:`StrategyNotAvailableError` (not at first frame).

The strategy runs on the C7 TensorRT runtime (AZ-298), or the
ONNX-Runtime fallback (AZ-299), via the local
:class:`InferenceRuntimeCut` (AZ-507). Engine output key is
``"embedding"`` and the strategy applies single-stage global L2
normalisation (no NetVLAD-style intra-cluster step). Retrieval delegates
to :class:`FaissBridge` (AZ-341).

Architecture-registry differences from :class:`NetVladStrategy`:
MegaLoc consumes a pre-compiled ``.trt`` engine produced by C10's engine
compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register, so
the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``.
:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture`
no-ops for this strategy.

Engine load happens in :func:`create` (NOT at first frame) so the
engine-output-shape assertion (AC-6) surfaces at startup, not after
takeoff.

Per-frame :meth:`embed_query` pipeline:

1. ``preprocessor.preprocess(frame, calibration)`` ->
   ``(1, 3, 322, 322)`` FP16 NCHW ndarray.
2. ``inference_runtime.infer(handle, {"input": tensor})`` ->
   ``{"embedding": (1, 2048) FP16 ndarray}``.
3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage).
4. Return :class:`VprQuery` with ``frame_id``, normalised embedding,
   produced_at monotonic ns.

Error envelope: every method raises only members of :class:`VprError`.
Any ``Exception`` (e.g. ``RuntimeError``) raised by the backbone forward
is rewrapped to :class:`VprBackboneError`; :class:`VprPreprocessError`
from the preprocessor propagates unchanged. Retrieval is a single-line
delegation to :meth:`FaissBridge.retrieve`; see AZ-341 AC-10.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Final, Literal

import numpy as np

from gps_denied_onboard._types.inference import (
    BuildConfig,
    EngineHandle,
    PrecisionMode,
)
from gps_denied_onboard._types.vpr import VprQuery, VprResult
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c2_vpr._engine_dim_assertion import (
    assert_engine_output_dim,
)
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_mega_loc import (
    MegaLocBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
    DescriptorIndexCut,
)
from gps_denied_onboard.components.c2_vpr.errors import (
    VprBackboneError,
    VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import (
    InferenceRuntimeCut,
)
from gps_denied_onboard.config.schema import ConfigError
from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient
from gps_denied_onboard.fdr_client.records import (
    CURRENT_SCHEMA_VERSION,
    FdrRecord,
)
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
from gps_denied_onboard.helpers.iso_timestamps import (
    iso_ts_from_clock as _iso_ts_from_clock,
)

if TYPE_CHECKING:
    from gps_denied_onboard._types.calibration import CameraCalibration
    from gps_denied_onboard._types.nav import NavCameraFrame
    from gps_denied_onboard.config.schema import Config

__all__ = ["DESCRIPTOR_DIM", "MegaLocStrategy", "create"]

# MegaLoc's published embedding dimension (D=2048) per the upstream
# research code drop. Engine output shape is asserted at create() time
# against this constant — changing it would silently break AC-2 /
# AC-4 / AC-5 / AC-6.
DESCRIPTOR_DIM: Final[int] = 2048

_BACKBONE_LABEL: Final[Literal["mega_loc"]] = "mega_loc"
_COMPONENT: Final[str] = "c2_vpr"
_OUTPUT_KEY: Final[str] = "embedding"
_ENGINE_INPUT_KEY: Final[str] = "input"
_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset(
    {"tensorrt", "onnx_trt_ep"}
)

_LOG_KIND_READY: Final[str] = "c2.vpr.ready"
_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error"
_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun"

_FDR_KIND_EMBED: Final[str] = "vpr.embed_query"
_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error"
_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error"


class MegaLocStrategy:
    """C2 secondary VprStrategy backed by a TRT MegaLoc engine.

    See module docstring for the engine-loading + per-frame pipeline.
    Stateless across frames (INV-2); single-threaded per instance
    (INV-1, per AZ-336).
    """

    def __init__(
        self,
        *,
        inference_runtime: InferenceRuntimeCut,
        engine_handle: EngineHandle,
        descriptor_index: DescriptorIndexCut,
        preprocessor: MegaLocBackbonePreprocessor,
        normaliser: DescriptorNormaliser,
        faiss_bridge: FaissBridge,
        fdr_client: FdrClient,
        clock: Clock,
        logger: logging.Logger,
        descriptor_dim: int = DESCRIPTOR_DIM,
    ) -> None:
        """Bind the already-constructed collaborators.

        No engine work happens here; :func:`create` is the factory that
        compiles / deserialises the engine before instantiating.

        Raises:
            ValueError: if ``descriptor_dim`` is smaller than 1.
        """
        if descriptor_dim < 1:
            raise ValueError(
                f"MegaLocStrategy.descriptor_dim must be >= 1; "
                f"got {descriptor_dim}"
            )
        self._inference_runtime = inference_runtime
        self._engine_handle = engine_handle
        self._descriptor_index = descriptor_index
        self._preprocessor = preprocessor
        self._normaliser = normaliser
        self._faiss_bridge = faiss_bridge
        self._fdr_client = fdr_client
        self._clock = clock
        self._logger = logger
        self._descriptor_dim = descriptor_dim

    def embed_query(
        self,
        frame: NavCameraFrame,
        calibration: CameraCalibration,
    ) -> VprQuery:
        """Preprocess ``frame``, run the engine, and L2-normalise.

        Returns:
            A :class:`VprQuery` carrying ``frame.frame_id``, the
            normalised embedding, and the monotonic-ns timestamp taken
            right after the forward pass.

        Raises:
            VprPreprocessError: propagated unchanged from the
                preprocessor (after a log line and an FDR record).
            VprBackboneError: when the forward pass raises, when the
                output dict lacks the ``"embedding"`` key, or when the
                output shape is not ``(1, descriptor_dim)``.
        """
        try:
            tensor = self._preprocessor.preprocess(frame, calibration)
        except VprPreprocessError as exc:
            self._emit_preprocess_error(frame, exc)
            raise

        ns_start = self._clock.monotonic_ns()
        try:
            outputs = self._inference_runtime.infer(
                self._engine_handle, {_ENGINE_INPUT_KEY: tensor}
            )
        except Exception as exc:
            # Deliberately broad: the error envelope promises callers
            # only VprError members, whatever the runtime throws.
            wrapped = self._wrap_backbone_error(frame, exc)
            raise wrapped from exc
        ns_end = self._clock.monotonic_ns()
        # Clamp to 1 so a sub-microsecond reading is never reported as 0.
        latency_us = max(1, (ns_end - ns_start) // 1_000)

        if _OUTPUT_KEY not in outputs:
            err = VprBackboneError(
                f"MegaLoc forward returned no {_OUTPUT_KEY!r} key; "
                f"got {sorted(outputs.keys())!r}"
            )
            self._emit_backbone_error(frame, err)
            raise err

        raw = np.asarray(outputs[_OUTPUT_KEY])
        if raw.shape != (1, self._descriptor_dim):
            err = VprBackboneError(
                f"MegaLoc forward returned shape {raw.shape}; "
                f"expected (1, {self._descriptor_dim})"
            )
            self._emit_backbone_error(frame, err)
            raise err

        flat = np.ascontiguousarray(raw[0], dtype=np.float16)
        normalised = self._normaliser.l2_normalise(flat)

        frame_id = int(frame.frame_id)
        self._emit_embed_record(
            frame_id=frame_id, latency_us=int(latency_us)
        )
        return VprQuery(
            frame_id=frame_id,
            embedding=normalised,
            produced_at=ns_end,
        )

    def retrieve_topk(self, query: VprQuery, k: int) -> VprResult:
        """Delegate top-``k`` retrieval to the FAISS bridge."""
        return self._faiss_bridge.retrieve(
            query, k, backbone_label=_BACKBONE_LABEL
        )

    def descriptor_dim(self) -> int:
        """Return the embedding dimension this strategy was built with."""
        return self._descriptor_dim

    def _wrap_backbone_error(
        self, frame: NavCameraFrame, exc: BaseException
    ) -> VprBackboneError:
        """Build, report, and return the ``VprBackboneError`` for ``exc``.

        The caller is responsible for raising the returned error (and
        for chaining it with ``from exc``).
        """
        wrapped = VprBackboneError(
            f"MegaLoc forward raised {type(exc).__name__}: {exc}"
        )
        self._emit_backbone_error(frame, wrapped)
        return wrapped

    def _enqueue_fdr(
        self,
        *,
        kind: str,
        frame_id: int,
        payload: dict[str, object],
    ) -> None:
        """Enqueue one FDR record and warn if the buffer dropped it.

        Shared by the success and error paths so an overrun is reported
        the same way regardless of which record kind was lost.
        """
        record = FdrRecord(
            schema_version=CURRENT_SCHEMA_VERSION,
            ts=_iso_ts_from_clock(self._clock),
            producer_id=self._fdr_client.producer_id,
            kind=kind,
            payload=payload,
        )
        result = self._fdr_client.enqueue(record)
        if result == EnqueueResult.OVERRUN:
            self._logger.warning(
                f"FDR enqueue dropped {kind} record (buffer overrun)",
                extra={
                    "component": _COMPONENT,
                    "kind": _LOG_KIND_FDR_OVERRUN,
                    "kv": {
                        "frame_id": frame_id,
                        "backbone_label": _BACKBONE_LABEL,
                    },
                },
            )

    def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None:
        """Record a successful embed (frame id, dim, latency) to the FDR."""
        self._enqueue_fdr(
            kind=_FDR_KIND_EMBED,
            frame_id=frame_id,
            payload={
                "frame_id": frame_id,
                "backbone_label": _BACKBONE_LABEL,
                "descriptor_dim": self._descriptor_dim,
                "latency_us": latency_us,
            },
        )

    def _emit_error(
        self,
        frame: NavCameraFrame,
        error: BaseException,
        *,
        stage: str,
        log_kind: str,
        fdr_kind: str,
    ) -> None:
        """Log ``error`` at ERROR level and mirror it into the FDR.

        ``stage`` is the word spliced into the log message
        (``"backbone"`` / ``"preprocess"``). The FDR payload carries the
        error message truncated to 512 characters.
        """
        frame_id = int(frame.frame_id)
        error_type = type(error).__name__
        self._logger.error(
            f"MegaLoc {stage} error: {error}",
            extra={
                "component": _COMPONENT,
                "kind": log_kind,
                "kv": {
                    "frame_id": frame_id,
                    "backbone_label": _BACKBONE_LABEL,
                    "error_type": error_type,
                },
            },
        )
        self._enqueue_fdr(
            kind=fdr_kind,
            frame_id=frame_id,
            payload={
                "frame_id": frame_id,
                "backbone_label": _BACKBONE_LABEL,
                "error_type": error_type,
                "error_message": str(error)[:512],
            },
        )

    def _emit_backbone_error(
        self, frame: NavCameraFrame, error: BaseException
    ) -> None:
        """Report a backbone (forward-pass / output-shape) failure."""
        self._emit_error(
            frame,
            error,
            stage="backbone",
            log_kind=_LOG_KIND_BACKBONE_ERROR,
            fdr_kind=_FDR_KIND_BACKBONE_ERROR,
        )

    def _emit_preprocess_error(
        self, frame: NavCameraFrame, error: BaseException
    ) -> None:
        """Report a preprocessing failure."""
        self._emit_error(
            frame,
            error,
            stage="preprocess",
            log_kind=_LOG_KIND_PREPROCESS_ERROR,
            fdr_kind=_FDR_KIND_PREPROCESS_ERROR,
        )


def _build_trt_build_config() -> BuildConfig:
    """Return the FP16 build config passed to ``compile_engine``."""
    return BuildConfig(
        precision=PrecisionMode.FP16,
        workspace_mb=0,
        calibration_dataset=None,
        optimization_profiles=(),
    )


def create(
    config: Config,
    *,
    descriptor_index: DescriptorIndexCut,
    inference_runtime: InferenceRuntimeCut,
    fdr_client: FdrClient | None = None,
    clock: Clock | None = None,
    logger: logging.Logger | None = None,
) -> MegaLocStrategy:
    """Module-level factory consumed by :func:`build_vpr_strategy`.

    MegaLoc is unselectable when the C7 TRT / ONNX-RT runtimes are
    excluded — ``current_runtime_label()`` MUST be one of
    ``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected with
    :class:`ConfigError` at composition time.

    Engine output shape is asserted at create time via a single dry-run
    inference on a zero-init input; mismatch raises :class:`ConfigError`
    BEFORE the strategy is bound (AC-6).

    ``clock`` and ``logger`` are optional keyword-only injection points
    that keep tests deterministic; when omitted they default to a
    ``WallClock`` and the ``gps_denied_onboard.c2_vpr.mega_loc`` logger.
    ``fdr_client`` defaults to ``None`` in the signature but is
    required: the composition root must inject the running client.

    Raises:
        ConfigError: if the runtime label is not an allowed one.
        ValueError: if ``fdr_client`` is ``None``.
    """
    runtime_label = inference_runtime.current_runtime_label()
    if runtime_label not in _ALLOWED_RUNTIME_LABELS:
        raise ConfigError(
            f"MegaLoc requires BUILD_TENSORRT_RUNTIME=ON (or "
            f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary "
            f"has runtime_label={runtime_label!r}."
        )

    block = config.components["c2_vpr"]
    weights_path = block.backbone_weights_path

    if fdr_client is None:
        raise ValueError(
            "MegaLocStrategy.create: fdr_client is required; the "
            "composition root must inject the running FDR client."
        )
    if clock is None:
        # Imported lazily so the wall clock is only pulled in when the
        # caller did not inject one.
        from gps_denied_onboard.clock.wall_clock import WallClock

        clock = WallClock()
    if logger is None:
        logger = logging.getLogger("gps_denied_onboard.c2_vpr.mega_loc")

    entry = inference_runtime.compile_engine(
        weights_path, _build_trt_build_config()
    )
    handle = inference_runtime.deserialize_engine(entry)

    preprocessor = MegaLocBackbonePreprocessor(logger=logger)
    normaliser = DescriptorNormaliser()
    faiss_bridge = FaissBridge(
        descriptor_index=descriptor_index,
        descriptor_dim=DESCRIPTOR_DIM,
        warn_top1_threshold=block.warn_top1_threshold,
        debug_log_per_frame_distances=block.debug_per_frame_distances,
        fdr_client=fdr_client,
        logger=logger,
        clock=clock,
    )

    # Fail at startup, not at first frame, if the engine's output
    # dimension disagrees with DESCRIPTOR_DIM (AC-6).
    assert_engine_output_dim(
        inference_runtime,
        handle,
        preprocessor,
        DESCRIPTOR_DIM,
        output_key=_OUTPUT_KEY,
        input_key=_ENGINE_INPUT_KEY,
    )

    logger.info(
        "C2 VPR strategy ready",
        extra={
            "component": _COMPONENT,
            "kind": _LOG_KIND_READY,
            "kv": {
                "strategy": _BACKBONE_LABEL,
                "descriptor_dim": DESCRIPTOR_DIM,
            },
        },
    )

    return MegaLocStrategy(
        inference_runtime=inference_runtime,
        engine_handle=handle,
        descriptor_index=descriptor_index,
        preprocessor=preprocessor,
        normaliser=normaliser,
        faiss_bridge=faiss_bridge,
        fdr_client=fdr_client,
        clock=clock,
        logger=logger,
        descriptor_dim=DESCRIPTOR_DIM,
    )