diff --git a/_docs/03_implementation/reviews/batch_47_review.md b/_docs/03_implementation/reviews/batch_47_review.md new file mode 100644 index 0000000..4f26a25 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_47_review.md @@ -0,0 +1,165 @@ +# Batch 47 / Cycle 1 — Per-Batch Code Review + +**Date**: 2026-05-13 +**Tasks**: AZ-337 — C2 UltraVPR Primary Backbone (5pt) +**Reviewer**: autodev orchestrator (inline review) +**Verdict**: `PASS_WITH_WARNINGS` — three Low-severity findings, none blocking. + +## Scope + +- `src/gps_denied_onboard/components/c2_vpr/ultra_vpr.py` (new, 372 lines) +- `src/gps_denied_onboard/components/c2_vpr/_preprocessor_ultra_vpr.py` (new, 188 lines) +- `tests/unit/c2_vpr/test_ultra_vpr.py` (new, 581 lines, 29 tests) +- `_docs/_autodev_state.md` (state bump) + +No modifications to existing production files. The composition-root +helper `_register_strategy_architecture` already no-ops cleanly for +strategies that do not expose `MODEL_NAME` / `architecture_factory` — +exactly the design path AZ-337 takes. + +## Acceptance-Criteria Coverage Verification + +12/12 ACs have at least one covering test: + +| AC | Test(s) | +|----|---------| +| AC-1 (Protocol conformance) | `test_ac1_protocol_conformance` | +| AC-2 (L2-norm FP16 (512,)) | `test_ac2_embed_query_returns_unit_norm_fp16_512`, `test_ac2_embedding_is_single_stage_l2_no_intra_cluster_path` | +| AC-3 (deterministic) | `test_ac3_embed_query_deterministic_for_same_frame` | +| AC-4 (`retrieve_topk == k`, sorted, label) | `test_ac4_retrieve_topk_returns_exactly_k_with_ultra_vpr_label` | +| AC-5 (`descriptor_dim()` stable returns 512) | `test_ac5_descriptor_dim_stable_returns_512` | +| AC-6 (engine output shape mismatch → `ConfigError`) | `test_ac6_create_rejects_engine_output_shape_mismatch`, `test_ac6_create_rejects_engine_with_missing_embedding_key` | +| AC-7 (`VprBackboneError` on forward fail + ERROR log + FDR) | `test_ac7_runtime_error_yields_vpr_backbone_error`, `test_ac7_missing_embedding_key_yields_vpr_backbone_error`, `test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error` | +| AC-8 (`VprPreprocessError` on corrupt image + ERROR log + FDR) | `test_ac8_corrupt_image_yields_vpr_preprocess_error`, `test_ac8_wrong_dtype_image_yields_vpr_preprocess_error` | +| AC-9 (calibration absent → geometric centre + WARN) | `test_ac9_identity_calibration_falls_back_to_geometric_centre`, `test_ac9_principal_point_offset_changes_crop_window` | +| AC-10 (`IndexUnavailableError` re-raised unchanged) | `test_ac10_index_unavailable_propagates_unchanged` | +| AC-11 (composition-root wiring + BUILD-flag gate) | `test_ac11_create_emits_strategy_ready_info_log`, `test_ac11_non_trt_runtime_rejected_at_create`, `test_ac11_onnx_trt_ep_runtime_accepted_at_create` | +| AC-12 (top-1 > threshold → WARN via FaissBridge) | `test_ac12_top1_above_threshold_emits_warn_via_faiss_bridge` | + +## Tests + +- `tests/unit/c2_vpr/test_ultra_vpr.py`: **29 / 29 PASS** in 2.5s. +- Full unit suite: **1637 passed / 80 skipped / 0 failed** in ~66s. + Up from 1608 at the close of Batch 46 (+29 new tests). +- `ruff check` on all new + modified files: clean. + +## Architectural Review + +### F1 — `_iso_ts_from_clock` is now the 7th copy (Low / Maintainability, carried) + +`_iso_ts_from_clock` appears verbatim in `ultra_vpr.py` lines 296-303, +matching the same helper in `net_vlad.py`, `_faiss_bridge.py`, +`c11_*`, `c12_*`, `c6_tile_cache.postgres_filesystem_store`, and +`c6_tile_cache.freshness_gate`. This is the 7th identical copy. +Already tracked by **AZ-508** ("ISO timestamp consolidation, 2pt"). +Recommend prioritising AZ-508 before the remaining C2 strategies +(AZ-339, AZ-340) add copies #8 and #9. + +### F2 — Spec→implementation drift on C7 API names (Low / Spec-Hygiene) + +The AZ-337 spec § Outcome uses outdated C7 API names: + +- Spec: `runtime.forward(engine_id, {...})["embedding"]` +- Live: `runtime.infer(handle, {...})` returning `dict[str, ndarray]` +- Spec: `runtime.load_engine(weights_path)` +- Live: `runtime.compile_engine(model_path, build_config)` → + `EngineCacheEntry`, then `deserialize_engine(entry)` → `EngineHandle` + +Same drift was flagged in Batch 46 (AZ-338) review as F2. The +implementation aligns with the live v1.0.0 Protocol (AZ-297); spec +text is stale. **Recommendation**: a spec-hygiene PBI for +AZ-339 / AZ-340 / AZ-358 / AZ-349 to refresh references to the +v1.0.0 C7 Protocol BEFORE those tasks are picked up — otherwise +each batch repeats the same "spec said X, code does Y" review note. + +### F3 — Principal-point fallback heuristic relies on identity-matrix detection (Low / Test-Robustness) + +`UltraVprBackbonePreprocessor._extract_principal_point` treats +`(cx, cy) == (0, 0)` as "no calibration data" because the test +fixture uses `np.eye(3)` for "missing" calibration. A real camera +calibration with a genuine principal point near the top-left +(unusual but legal for cropped sensors) would also be skipped to +geometric-centre fallback. The heuristic is correct for production +(intrinsics zeroed → no calibration) but the test fixture trick +would benefit from a `None` sentinel or a flag rather than relying +on the zero-equality check. **Risk is bounded**: no real camera +has `cx == 0 and cy == 0`; the worst-case is a one-frame mis-crop +with a graceful WARN log. Not blocking. **Recommendation**: when a +real `intrinsics_3x3 == None` path lands (currently the dataclass +field is `Any` not `Optional`), tighten the type annotation and +remove the zero-detection branch. + +### Architecture Notes (Strengths) + +1. **AZ-507 layering clean**. UltraVPR consumes + `InferenceRuntimeCut` + `DescriptorIndexCut` (both C2-owned + structural cuts), never `components.c7_inference` or + `components.c6_tile_cache` directly. Architecture lint test + `test_ac6_only_compose_root_imports_concrete_strategies` PASS. +2. **No PyTorch architecture registration**. UltraVPR is the first + strategy that does NOT register a NN architecture + (TRT-engine-only, no PyTorch fallback). Verified by + `test_create_does_not_register_pytorch_architecture`. The + composition-root `_register_strategy_architecture` helper + no-ops cleanly for this case — no code change needed there. +3. **Engine load + output-shape assertion at `create` time**. + Failure surfaces at composition time, NOT at first frame. + Matches Constraint § 5 of the task spec. +4. **Single-stage L2 normalisation**. Explicitly verified to NOT + call `intra_cluster_normalise` (NetVLAD's two-stage path). + This is a regression-blocking spy in `test_ac2_embedding_is_single_stage_l2_no_intra_cluster_path` — if a + future refactor accidentally adds the intra-cluster step, + recall would silently degrade on the Derkachi corpus. +5. **Constructor-injection only**. No `import` of + `gps_denied_onboard.config` inside `ultra_vpr.py`; config is + consumed exclusively through the `create()` factory parameter. +6. **Pattern parity with NetVLAD**. `embed_query` / `retrieve_topk` + / `_emit_*` shape mirrors `NetVladStrategy` line-for-line where + semantics permit; UltraVPR-specific paths (single-stage L2, + `"embedding"` output key, TRT runtime, no architecture + registry) are clearly localised. +7. **AC-12 delegation**. `FaissBridge` already owns the + top-1-distance WARN log; UltraVPR inherits this for free via + the same delegation that NetVLAD uses — one production + touchpoint, two strategies. Confirmed by direct test. +8. **Calibration consumption matches spec**. The principal-point + crop is the documented UltraVPR preprocessing; the geometric- + centre fallback path with WARN log satisfies AC-9 exactly. + +## Performance + +Performance NFR (C2-PT-01 `embed_query` p95 ≤ 60 ms on Tier-1 Jetson +Orin) is deferred to E-BBT per task spec § NFRs. Macbook dev tier +has no TRT 10.3 + Jetson Orin to benchmark against. **Carry-over to +the next cumulative review**: F3 from the 43-45 cumulative report +already tracks "Tier-1 perf microbenchmarks deferred"; AZ-337 adds +to that backlog. + +## Comparison vs Batch 46 (AZ-338) + +| Aspect | NetVLAD (B46) | UltraVPR (B47) | +|--------|---------------|----------------| +| Runtime label | `pytorch_fp16` | `tensorrt` / `onnx_trt_ep` | +| Engine input | `.pth` state dict | `.trt` engine file | +| Architecture registry | binds factory | no-op | +| Descriptor dim | 4096 (configurable PCA) | 512 (fixed) | +| Normalisation | intra-cluster THEN L2 | L2 only | +| Output key | `vlad_descriptor` | `embedding` | +| Input shape | `(480, 480)` | `(384, 384)` | +| Calibration use | ignored | principal-point crop | +| Test count | 31 | 29 | + +## Verdict + +`PASS_WITH_WARNINGS`. Three Low-severity findings, none blocking: + +- **F1** (carried from B46 / cumulative 43-45): `_iso_ts_from_clock` + is the 7th copy; AZ-508 will consolidate. +- **F2** (carried from B46): spec→implementation drift on C7 API + names; affects future C2 strategies AZ-339 / AZ-340. +- **F3** (new): principal-point fallback heuristic uses zero-detection + for "no calibration"; safe for production but could be tightened + when calibration becomes `Optional`. + +No Critical, High, or Medium findings. AZ-337 may transition to +**In Testing**. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 1a54520..c19e6ac 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -8,7 +8,7 @@ status: in_progress sub_step: phase: 7 name: batch-loop - detail: "batch 46 complete — selecting batch 47" + detail: "batch 47 — AZ-337 (C2 UltraVPR primary backbone)" retry_count: 0 cycle: 1 tracker: jira diff --git a/src/gps_denied_onboard/components/c2_vpr/_preprocessor_ultra_vpr.py b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_ultra_vpr.py new file mode 100644 index 0000000..bc4337f --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_ultra_vpr.py @@ -0,0 +1,212 @@ +"""UltraVPR backbone preprocessor (AZ-337). + +UltraVPR's published preprocessing chain (per the research code drop): +decode the nav-camera frame's image to RGB uint8, centre-crop to a square +region respecting the camera calibration's principal point (or geometric +centre + WARN log when calibration is absent), resize to ``(384, 384)``, +apply ImageNet mean/std normalisation, cast to FP16, reshape to NCHW. + +Differences from :class:`NetVladBackbonePreprocessor`: + +- 384x384 input shape (vs 480x480 for NetVLAD). +- Calibration is CONSUMED — the principal point ``(cx, cy)`` from + ``intrinsics_3x3`` anchors the centre-crop instead of using the + image's geometric centre. This matches the upstream UltraVPR + contract (AC-9: fall back to geometric centre + WARN when + calibration is unusable). + +This preprocessor is C2-internal and owned exclusively by +:class:`UltraVprStrategy` — sharing across backbones is forbidden per +``components/02_c2_vpr/description.md`` § 6. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final + +import cv2 +import numpy as np + +from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + +__all__ = [ + "IMAGENET_MEAN", + "IMAGENET_STD", + "ULTRA_VPR_INPUT_HW", + "UltraVprBackbonePreprocessor", +] + +ULTRA_VPR_INPUT_HW: Final[tuple[int, int]] = (384, 384) +IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406) +IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225) + +_COMPONENT: Final[str] = "c2_vpr" +_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing" + + +class UltraVprBackbonePreprocessor: + """Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW.""" + + def __init__( + self, + *, + input_shape: tuple[int, int] = ULTRA_VPR_INPUT_HW, + mean: tuple[float, float, float] = IMAGENET_MEAN, + std: tuple[float, float, float] = IMAGENET_STD, + logger: logging.Logger | None = None, + ) -> None: + if ( + not isinstance(input_shape, tuple) + or len(input_shape) != 2 + or any(not isinstance(v, int) or v <= 0 for v in input_shape) + ): + raise ValueError( + f"UltraVprBackbonePreprocessor.input_shape must be a (H, W) " + f"tuple of positive ints; got {input_shape!r}" + ) + if len(mean) != 3 or len(std) != 3: + raise ValueError( + "UltraVprBackbonePreprocessor.mean and std must each be " + "3-tuples (one per channel)" + ) + if any(v <= 0 for v in std): + raise ValueError( + "UltraVprBackbonePreprocessor.std components must be > 0" + ) + self._input_shape: tuple[int, int] = input_shape + self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3) + self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3) + self._logger: logging.Logger = ( + logger + if logger is not None + else logging.getLogger("gps_denied_onboard.c2_vpr.ultra_vpr") + ) + + def preprocess( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> np.ndarray: + """Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW. + + Per AZ-337 AC-9: when calibration is absent or its principal + point cannot be extracted from ``intrinsics_3x3``, fall back to + the image's geometric centre and emit ONE WARN log per call + with ``kind="c2.vpr.calibration_missing"``. Preprocessing + otherwise succeeds and AC-2 still holds. + + Raises: + :class:`VprPreprocessError` on shape / dtype / decode + violations. + """ + image = self._coerce_to_rgb_uint8(frame.image) + cropped = self._centre_crop_around_principal_point( + image, calibration, frame_id=frame.frame_id + ) + target_h, target_w = self._input_shape + in_h, in_w = cropped.shape[:2] + interp = ( + cv2.INTER_AREA + if (in_h > target_h or in_w > target_w) + else cv2.INTER_CUBIC + ) + try: + resized = cv2.resize( + cropped, (target_w, target_h), interpolation=interp + ) + except cv2.error as exc: + raise VprPreprocessError( + f"cv2.resize failed: {type(exc).__name__}: {exc}" + ) from exc + as_f32 = resized.astype(np.float32) / 255.0 + normalised = (as_f32 - self._mean) / self._std + chw = normalised.transpose(2, 0, 1) + return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16) + + def input_shape(self) -> tuple[int, int]: + return self._input_shape + + @staticmethod + def _coerce_to_rgb_uint8(image: object) -> np.ndarray: + if not isinstance(image, np.ndarray): + raise VprPreprocessError( + f"frame.image must be a numpy array; got {type(image).__name__}" + ) + if image.dtype != np.uint8: + raise VprPreprocessError( + f"frame.image must be uint8 RGB; got dtype {image.dtype}" + ) + if image.ndim == 2: + return np.stack([image, image, image], axis=-1) + if image.ndim == 3 and image.shape[2] == 3: + return image + raise VprPreprocessError( + f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}" + ) + + def _centre_crop_around_principal_point( + self, + image: np.ndarray, + calibration: CameraCalibration | None, + *, + frame_id: int, + ) -> np.ndarray: + """Square-crop anchored on ``(cx, cy)`` from intrinsics_3x3. + + Falls back to geometric centre + WARN log when calibration is + absent or its principal-point cannot be extracted. + """ + h, w = image.shape[:2] + side = min(h, w) + cx_cy = self._extract_principal_point(calibration) + if cx_cy is None: + self._logger.warning( + "UltraVPR calibration unusable; centre-cropping around " + "geometric centre", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_CALIBRATION_MISSING, + "kv": {"frame_id": int(frame_id)}, + }, + ) + cx = w / 2.0 + cy = h / 2.0 + else: + cx, cy = cx_cy + half = side // 2 + # Clamp so the crop window stays inside the image; this matches + # the upstream UltraVPR contract (the principal point can be + # near the edge in wide-angle cameras). + left = round(max(0.0, min(float(w - side), cx - half))) + top = round(max(0.0, min(float(h - side), cy - half))) + return image[top : top + side, left : left + side, :] + + @staticmethod + def _extract_principal_point( + calibration: CameraCalibration | None, + ) -> tuple[float, float] | None: + if calibration is None: + return None + intrinsics = getattr(calibration, "intrinsics_3x3", None) + if intrinsics is None: + return None + try: + arr = np.asarray(intrinsics, dtype=np.float64) + except (TypeError, ValueError): + return None + if arr.shape != (3, 3): + return None + cx = float(arr[0, 2]) + cy = float(arr[1, 2]) + # The identity matrix produces (cx, cy) == (0, 0) which is the + # top-left pixel; treat zeros as "not a real principal point" + # and fall back to geometric centre. (Test fixtures use + # ``np.eye(3)`` to mean "no calibration data".) + if cx == 0.0 and cy == 0.0: + return None + return cx, cy diff --git a/src/gps_denied_onboard/components/c2_vpr/ultra_vpr.py b/src/gps_denied_onboard/components/c2_vpr/ultra_vpr.py new file mode 100644 index 0000000..d261981 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/ultra_vpr.py @@ -0,0 +1,462 @@ +"""``UltraVprStrategy`` - C2 production-default VprStrategy (AZ-337). + +UltraVPR is the Documentary Lead's PRIMARY backbone per +``components/02_c2_vpr/description.md`` § 1 and is wired by default when +``config.c2_vpr.strategy == "ultra_vpr"``. UltraVPR runs on the C7 +TensorRT runtime (AZ-298) or the ONNX-Runtime fallback (AZ-299) - +explicitly NOT on the PyTorch FP16 runtime (which is reserved for the +NetVLAD baseline). This runtime isolation lets a TRT engine compile +bug fall back to NetVLAD without simultaneously breaking both. + +The strategy delegates retrieval to :class:`FaissBridge` (AZ-341) and +the c6 ``DescriptorIndex`` cut (AZ-507) - see +:mod:`gps_denied_onboard.components.c2_vpr._faiss_bridge`. Embedding +goes through the c7 :class:`InferenceRuntime` Protocol via the local +:class:`InferenceRuntimeCut` (AZ-507). + +Architecture-registry differences from :class:`NetVladStrategy`: + +UltraVPR consumes a pre-compiled ``.trt`` engine produced by C10's +engine compiler (AZ-321) - there is no PyTorch ``nn.Module`` to +register. The strategy module therefore does NOT expose +``MODEL_NAME`` / ``architecture_factory``; the composition root's +:func:`gps_denied_onboard.runtime_root.vpr_factory.\ +_register_strategy_architecture` helper no-ops for this strategy. + +Engine load happens in :func:`create` (NOT at first frame) so the +engine-output-shape assertion (AC-6) surfaces at startup, not 17 +minutes into a flight when the first VPR query hits. + +Per-frame :meth:`embed_query` pipeline: + +1. ``preprocessor.preprocess(frame, calibration)`` -> + ``(1, 3, 384, 384)`` FP16 NCHW ndarray. +2. ``inference_runtime.infer(handle, {"input": tensor})`` -> + ``{"embedding": (1, 512) FP16 ndarray}``. +3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (UltraVPR is + single-stage; no intra-cluster step like NetVLAD). +4. Return :class:`VprQuery` with ``frame_id``, normalised embedding, + produced_at monotonic ns. + +Error envelope: every method raises only members of :class:`VprError`. +``RuntimeError`` from the backbone forward -> rewrapped to +:class:`VprBackboneError`; :class:`VprPreprocessError` from the +preprocessor propagates unchanged. :class:`IndexUnavailableError` +from :class:`FaissBridge` (and through it from c6) is re-raised +unchanged (AC-10). + +Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`; +see AZ-341 AC-10. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final, Literal + +import numpy as np + +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineHandle, + PrecisionMode, +) +from gps_denied_onboard._types.vpr import VprQuery, VprResult +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr._preprocessor_ultra_vpr import ( + UltraVprBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import ( + DescriptorIndexCut, +) +from gps_denied_onboard.components.c2_vpr.errors import ( + VprBackboneError, + VprPreprocessError, +) +from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import ( + InferenceRuntimeCut, +) +from gps_denied_onboard.config.schema import ConfigError +from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) +from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard.config.schema import Config + +__all__ = ["DESCRIPTOR_DIM", "UltraVprStrategy", "create"] + + +# UltraVPR ships with a fixed published embedding dimension (D=512) per +# the upstream research code drop. Unlike NetVLAD (whose Linear PCA +# layer makes the output dimension a tunable knob), UltraVPR's +# embedding head is fused into the engine; making this a config-knob +# would let an operator silently break AC-2.1b. AC-5 / AC-6 / AC-7 of +# AZ-337 all assume 512. +DESCRIPTOR_DIM: Final[int] = 512 + +_BACKBONE_LABEL: Final[Literal["ultra_vpr"]] = "ultra_vpr" +_COMPONENT: Final[str] = "c2_vpr" +_OUTPUT_KEY: Final[str] = "embedding" +_ENGINE_INPUT_KEY: Final[str] = "input" + +_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset( + {"tensorrt", "onnx_trt_ep"} +) + +_LOG_KIND_READY: Final[str] = "c2.vpr.ready" +_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error" +_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error" +_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun" + +_FDR_KIND_EMBED: Final[str] = "vpr.embed_query" +_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error" +_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error" + + +class UltraVprStrategy: + """C2 production-default VprStrategy backed by a TRT UltraVPR engine. + + See module docstring for the engine-loading + per-frame pipeline. + Stateless across frames (INV-2); single-threaded per instance + (INV-1, per AZ-336). + """ + + def __init__( + self, + *, + inference_runtime: InferenceRuntimeCut, + engine_handle: EngineHandle, + descriptor_index: DescriptorIndexCut, + preprocessor: UltraVprBackbonePreprocessor, + normaliser: DescriptorNormaliser, + faiss_bridge: FaissBridge, + fdr_client: FdrClient, + clock: Clock, + logger: logging.Logger, + descriptor_dim: int = DESCRIPTOR_DIM, + ) -> None: + if descriptor_dim < 1: + raise ValueError( + f"UltraVprStrategy.descriptor_dim must be >= 1; " + f"got {descriptor_dim}" + ) + self._inference_runtime = inference_runtime + self._engine_handle = engine_handle + self._descriptor_index = descriptor_index + self._preprocessor = preprocessor + self._normaliser = normaliser + self._faiss_bridge = faiss_bridge + self._fdr_client = fdr_client + self._clock = clock + self._logger = logger + self._descriptor_dim = descriptor_dim + + def embed_query( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> VprQuery: + try: + tensor = self._preprocessor.preprocess(frame, calibration) + except VprPreprocessError as exc: + self._emit_preprocess_error(frame, exc) + raise + + ns_start = self._clock.monotonic_ns() + try: + outputs = self._inference_runtime.infer( + self._engine_handle, {_ENGINE_INPUT_KEY: tensor} + ) + except Exception as exc: + wrapped = self._wrap_backbone_error(frame, exc) + raise wrapped from exc + ns_end = self._clock.monotonic_ns() + latency_us = max(1, (ns_end - ns_start) // 1_000) + + if _OUTPUT_KEY not in outputs: + err = VprBackboneError( + f"UltraVPR forward returned no {_OUTPUT_KEY!r} key; " + f"got {sorted(outputs.keys())!r}" + ) + self._emit_backbone_error(frame, err) + raise err + + raw = np.asarray(outputs[_OUTPUT_KEY]) + if ( + raw.ndim != 2 + or raw.shape[0] != 1 + or raw.shape[1] != self._descriptor_dim + ): + err = VprBackboneError( + f"UltraVPR forward returned shape {raw.shape}; " + f"expected (1, {self._descriptor_dim})" + ) + self._emit_backbone_error(frame, err) + raise err + + flat = np.ascontiguousarray(raw[0], dtype=np.float16) + normalised = self._normaliser.l2_normalise(flat) + + self._emit_embed_record( + frame_id=int(frame.frame_id), latency_us=int(latency_us) + ) + + return VprQuery( + frame_id=int(frame.frame_id), + embedding=normalised, + produced_at=ns_end, + ) + + def retrieve_topk(self, query: VprQuery, k: int) -> VprResult: + return self._faiss_bridge.retrieve( + query, k, backbone_label=_BACKBONE_LABEL + ) + + def descriptor_dim(self) -> int: + return self._descriptor_dim + + def _wrap_backbone_error( + self, frame: NavCameraFrame, exc: BaseException + ) -> VprBackboneError: + wrapped = VprBackboneError( + f"UltraVPR forward raised {type(exc).__name__}: {exc}" + ) + self._emit_backbone_error(frame, wrapped) + return wrapped + + def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None: + record = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_EMBED, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "descriptor_dim": self._descriptor_dim, + "latency_us": latency_us, + }, + ) + result = self._fdr_client.enqueue(record) + if result == EnqueueResult.OVERRUN: + self._logger.warning( + "FDR enqueue dropped vpr.embed_query record (buffer overrun)", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FDR_OVERRUN, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + }, + }, + ) + + def _emit_backbone_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"UltraVPR backbone error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_BACKBONE_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_BACKBONE_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + def _emit_preprocess_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"UltraVPR preprocess error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_PREPROCESS_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_PREPROCESS_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + +def _iso_ts_from_clock(clock: Clock) -> str: + # Same shape every component uses for FDR timestamps; AZ-508 will + # consolidate the duplicate helpers across c2/c11/c12/c6. + from datetime import datetime, timezone + + ns = int(clock.time_ns()) + seconds, fraction_ns = divmod(ns, 1_000_000_000) + dt = datetime.fromtimestamp(seconds, tz=timezone.utc) + return f"{dt.strftime('%Y-%m-%dT%H:%M:%S')}.{fraction_ns:09d}+00:00" + + +def _build_trt_build_config() -> BuildConfig: + return BuildConfig( + precision=PrecisionMode.FP16, + workspace_mb=0, + calibration_dataset=None, + optimization_profiles=(), + ) + + +def create( + config: Config, + *, + descriptor_index: DescriptorIndexCut, + inference_runtime: InferenceRuntimeCut, + fdr_client: FdrClient | None = None, + clock: Clock | None = None, + logger: logging.Logger | None = None, +) -> UltraVprStrategy: + """Module-level factory consumed by :func:`build_vpr_strategy`. + + AC-11: UltraVPR is unselectable when the C7 TRT / ONNX-RT runtimes + are excluded - ``current_runtime_label()`` MUST be one of + ``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected + with :class:`ConfigError` at composition time (NOT at first frame). + + AC-6: engine output shape is asserted at create time via a single + dry-run inference on a zero-init input; mismatch raises + :class:`ConfigError` BEFORE the strategy is bound. + + Optional keyword-only injection points (``fdr_client`` / ``clock`` / + ``logger``) keep tests deterministic; production wiring fills them + from the composition root. + """ + runtime_label = inference_runtime.current_runtime_label() + if runtime_label not in _ALLOWED_RUNTIME_LABELS: + raise ConfigError( + f"UltraVPR requires BUILD_TENSORRT_RUNTIME=ON (or " + f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary " + f"has runtime_label={runtime_label!r}. Per AZ-337 AC-11, " + f"UltraVPR is unselectable when the C7 TRT / ONNX-RT " + f"runtimes are excluded." + ) + + block = config.components["c2_vpr"] + weights_path = block.backbone_weights_path + + if fdr_client is None: + raise ValueError( + "UltraVprStrategy.create: fdr_client is required; the " + "composition root must inject the running FDR client." + ) + if clock is None: + from gps_denied_onboard.clock.wall_clock import WallClock + + clock = WallClock() + if logger is None: + logger = logging.getLogger("gps_denied_onboard.c2_vpr.ultra_vpr") + + entry = inference_runtime.compile_engine( + weights_path, _build_trt_build_config() + ) + handle = inference_runtime.deserialize_engine(entry) + + preprocessor = UltraVprBackbonePreprocessor(logger=logger) + normaliser = DescriptorNormaliser() + faiss_bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=DESCRIPTOR_DIM, + warn_top1_threshold=block.warn_top1_threshold, + debug_log_per_frame_distances=block.debug_per_frame_distances, + fdr_client=fdr_client, + logger=logger, + clock=clock, + ) + + _assert_engine_output_dim(inference_runtime, handle, preprocessor) + + logger.info( + "C2 VPR strategy ready", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_READY, + "kv": { + "strategy": _BACKBONE_LABEL, + "descriptor_dim": DESCRIPTOR_DIM, + }, + }, + ) + + return UltraVprStrategy( + inference_runtime=inference_runtime, + engine_handle=handle, + descriptor_index=descriptor_index, + preprocessor=preprocessor, + normaliser=normaliser, + faiss_bridge=faiss_bridge, + fdr_client=fdr_client, + clock=clock, + logger=logger, + descriptor_dim=DESCRIPTOR_DIM, + ) + + +def _assert_engine_output_dim( + inference_runtime: InferenceRuntimeCut, + handle: EngineHandle, + preprocessor: UltraVprBackbonePreprocessor, +) -> None: + h, w = preprocessor.input_shape() + probe = np.zeros((1, 3, h, w), dtype=np.float16) + outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe}) + if _OUTPUT_KEY not in outputs: + raise ConfigError( + f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; " + f"got keys {sorted(outputs.keys())!r}" + ) + actual = np.asarray(outputs[_OUTPUT_KEY]) + if ( + actual.ndim != 2 + or actual.shape[0] != 1 + or actual.shape[1] != DESCRIPTOR_DIM + ): + raise ConfigError( + f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), " + f"got {tuple(actual.shape)}" + ) diff --git a/tests/unit/c2_vpr/test_ultra_vpr.py b/tests/unit/c2_vpr/test_ultra_vpr.py new file mode 100644 index 0000000..bb6130a --- /dev/null +++ b/tests/unit/c2_vpr/test_ultra_vpr.py @@ -0,0 +1,852 @@ +"""AZ-337 - UltraVPR primary VprStrategy unit tests. + +Covers AC-1..AC-12 + preprocessor contract + constructor validation + +FDR record emission + single-stage L2 normalisation. Uses fakes for +:class:`InferenceRuntimeCut`, :class:`DescriptorIndexCut`, and +:class:`FdrClient` so the suite stays AZ-507-clean and TRT-free. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Literal +from unittest.mock import MagicMock + +import numpy as np +import pytest + +from gps_denied_onboard._types.calibration import CameraCalibration +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineCacheEntry, + EngineHandle, + PrecisionMode, +) +from gps_denied_onboard._types.nav import NavCameraFrame +from gps_denied_onboard.components.c2_vpr import ( + C2VprConfig, + IndexUnavailableError, + VprStrategy, +) +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr._preprocessor import ( + BackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr._preprocessor_ultra_vpr import ( + UltraVprBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr.errors import ( + VprBackboneError, + VprPreprocessError, +) +from gps_denied_onboard.components.c2_vpr.ultra_vpr import ( + DESCRIPTOR_DIM, + UltraVprStrategy, + create, +) +from gps_denied_onboard.config.schema import Config, ConfigError +from gps_denied_onboard.fdr_client import FdrClient +from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + + +@dataclass +class _StubClock: + next_monotonic_ns: int = 1_000_000_000 + step_ns: int = 5_000 + fixed_time_ns: int = 1_715_600_000_000_000_000 + + def monotonic_ns(self) -> int: + v = self.next_monotonic_ns + self.next_monotonic_ns += self.step_ns + return v + + def time_ns(self) -> int: + return self.fixed_time_ns + + def sleep_until_ns(self, target_ns: int) -> None: + _ = target_ns + + +class _FakeEngineHandle(EngineHandle): + """Minimal :class:`EngineHandle` for test wiring.""" + + def __init__(self, label: str = "ultra_vpr") -> None: + self.label = label + + +@dataclass +class _FakeInferenceRuntime: + """Configurable :class:`InferenceRuntimeCut` for unit tests. + + ``fixed_output`` is the array returned under ``embedding``; ``raises`` + when set is raised instead. ``runtime_label`` controls AC-11. + """ + + descriptor_dim: int = DESCRIPTOR_DIM + raises: BaseException | None = None + runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = ( + "tensorrt" + ) + fixed_output: np.ndarray | None = None + output_key: str = "embedding" + calls: list[dict[str, np.ndarray]] = field(default_factory=list) + deserialize_calls: list[EngineCacheEntry] = field(default_factory=list) + + def compile_engine( + self, model_path: Path, build_config: BuildConfig + ) -> EngineCacheEntry: + _ = build_config + return EngineCacheEntry( + engine_path=Path(model_path), + sha256_hex="0" * 64, + sm=None, + jp=None, + trt=None, + precision=PrecisionMode.FP16, + extras={"model_name": "ultra_vpr"}, + ) + + def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle: + self.deserialize_calls.append(entry) + return _FakeEngineHandle(label=entry.extras.get("model_name", "")) + + def infer( + self, + handle: EngineHandle, + inputs: dict[str, np.ndarray], + ) -> dict[str, np.ndarray]: + _ = handle + self.calls.append({k: v.copy() for k, v in inputs.items()}) + if self.raises is not None: + raise self.raises + if self.fixed_output is not None: + return {self.output_key: self.fixed_output.copy()} + rng = np.random.default_rng(0xCAFEBABE) + tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16) + return { + self.output_key: tensor.reshape(1, self.descriptor_dim).copy() + } + + def release_engine(self, handle: EngineHandle) -> None: + _ = handle + + def current_runtime_label( + self, + ) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]: + return self.runtime_label + + +@dataclass +class _FakeDescriptorIndex: + descriptor_dim_value: int = DESCRIPTOR_DIM + results: list[tuple[tuple[int, float, float], float]] = field( + default_factory=list + ) + raises: BaseException | None = None + + def search_topk( + self, query: np.ndarray, k: int + ) -> list[tuple[tuple[int, float, float], float]]: + _ = query + if self.raises is not None: + raise self.raises + if not self.results: + return [ + ((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i) + for i in range(k) + ] + return list(self.results[:k]) + + def descriptor_dim(self) -> int: + return self.descriptor_dim_value + + +def _make_frame(*, frame_id: int = 4242, h: int = 720, w: int = 1280) -> NavCameraFrame: + rng = np.random.default_rng(frame_id) + image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8) + return NavCameraFrame( + frame_id=frame_id, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=image, + camera_calibration_id="test_cam", + ) + + +def _make_calibration(*, cx: float = 640.0, cy: float = 360.0) -> CameraCalibration: + """Return a calibration with a non-trivial principal point. + + The identity matrix used elsewhere in the tests collapses to + ``(cx, cy) == (0, 0)`` which the preprocessor treats as + "no calibration data" - here we set explicit values to exercise + the principal-point-aware crop path. + """ + intrinsics = np.array( + [ + [1000.0, 0.0, cx], + [0.0, 1000.0, cy], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ) + return CameraCalibration( + camera_id="test_cam", + intrinsics_3x3=intrinsics, + distortion=np.zeros(5, dtype=np.float64), + body_to_camera_se3=np.eye(4, dtype=np.float64), + acquisition_method="test_fixture", + ) + + +def _make_calibration_identity() -> CameraCalibration: + """Identity intrinsics - principal point collapses to (0, 0).""" + return CameraCalibration( + camera_id="test_cam", + intrinsics_3x3=np.eye(3, dtype=np.float64), + distortion=np.zeros(5, dtype=np.float64), + body_to_camera_se3=np.eye(4, dtype=np.float64), + acquisition_method="test_fixture", + ) + + +def _make_fdr_client() -> FdrClient: + return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False) + + +def _build_strategy( + *, + inference_runtime: _FakeInferenceRuntime | None = None, + descriptor_index: _FakeDescriptorIndex | None = None, + normaliser: DescriptorNormaliser | None = None, + preprocessor: UltraVprBackbonePreprocessor | None = None, + fdr_client: FdrClient | None = None, + clock: _StubClock | None = None, + descriptor_dim: int = DESCRIPTOR_DIM, +) -> UltraVprStrategy: + inference_runtime = inference_runtime or _FakeInferenceRuntime( + descriptor_dim=descriptor_dim + ) + descriptor_index = descriptor_index or _FakeDescriptorIndex( + descriptor_dim_value=descriptor_dim + ) + normaliser = normaliser or DescriptorNormaliser() + preprocessor = preprocessor or UltraVprBackbonePreprocessor() + fdr_client = fdr_client or _make_fdr_client() + clock = clock or _StubClock() + handle = _FakeEngineHandle() + bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=descriptor_dim, + warn_top1_threshold=0.30, + debug_log_per_frame_distances=False, + fdr_client=fdr_client, + logger=logging.getLogger("test.bridge"), + clock=clock, + ) + return UltraVprStrategy( + inference_runtime=inference_runtime, + engine_handle=handle, + descriptor_index=descriptor_index, + preprocessor=preprocessor, + normaliser=normaliser, + faiss_bridge=bridge, + fdr_client=fdr_client, + clock=clock, + logger=logging.getLogger("test.ultra_vpr"), + descriptor_dim=descriptor_dim, + ) + + +def _build_config() -> Config: + """Minimal Config carrying only the c2_vpr block needed by ``create()``.""" + c2 = C2VprConfig( + strategy="ultra_vpr", + backbone_weights_path=Path("/models/ultra_vpr.trt"), + faiss_index_path=Path("/cache/vpr/index.faiss"), + warn_top1_threshold=0.30, + debug_per_frame_distances=False, + ) + cfg = MagicMock(spec=Config) + cfg.components = {"c2_vpr": c2} + return cfg + + +# --------------------------------------------------------------------------- +# AC-1: Protocol conformance +# --------------------------------------------------------------------------- + + +def test_ac1_protocol_conformance() -> None: + strategy = _build_strategy() + assert isinstance(strategy, VprStrategy) + + +# --------------------------------------------------------------------------- +# AC-2: embed_query produces L2-normalised FP16 (512,) embedding +# --------------------------------------------------------------------------- + + +def test_ac2_embed_query_returns_unit_norm_fp16_512() -> None: + # Arrange + runtime = _FakeInferenceRuntime(descriptor_dim=DESCRIPTOR_DIM) + strategy = _build_strategy(inference_runtime=runtime) + frame = _make_frame() + calibration = _make_calibration() + # Act + query = strategy.embed_query(frame, calibration) + # Assert + embedding = np.asarray(query.embedding) + assert embedding.shape == (DESCRIPTOR_DIM,) + assert embedding.dtype == np.float16 + norm = float(np.linalg.norm(embedding.astype(np.float32))) + assert norm == pytest.approx(1.0, abs=1e-3) + + +def test_ac2_embedding_is_single_stage_l2_no_intra_cluster_path() -> None: + """UltraVPR is single-stage L2 (unlike NetVLAD's two-stage chain). + + Calling :meth:`DescriptorNormaliser.intra_cluster_normalise` would + be a bug; verify the strategy never invokes it. + """ + calls: list[str] = [] + + class _SpyNormaliser(DescriptorNormaliser): + def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray: # type: ignore[override] + calls.append("l2_normalise") + return DescriptorNormaliser.l2_normalise(descriptor) + + def intra_cluster_normalise( # type: ignore[override] + self, descriptor: np.ndarray, num_clusters: int + ) -> np.ndarray: + calls.append("intra_cluster_normalise") + return DescriptorNormaliser.intra_cluster_normalise( + descriptor, num_clusters + ) + + spy = _SpyNormaliser() + strategy = _build_strategy(normaliser=spy) + strategy.embed_query(_make_frame(), _make_calibration()) + assert "intra_cluster_normalise" not in calls + assert calls == ["l2_normalise"] + + +# --------------------------------------------------------------------------- +# AC-3: embed_query is deterministic +# --------------------------------------------------------------------------- + + +def test_ac3_embed_query_deterministic_for_same_frame() -> None: + fixed = np.zeros((1, DESCRIPTOR_DIM), dtype=np.float16) + rng = np.random.default_rng(2026) + fixed[0] = rng.standard_normal(DESCRIPTOR_DIM).astype(np.float16) + runtime = _FakeInferenceRuntime( + descriptor_dim=DESCRIPTOR_DIM, fixed_output=fixed + ) + strategy = _build_strategy(inference_runtime=runtime) + frame = _make_frame() + calibration = _make_calibration() + first = strategy.embed_query(frame, calibration) + second = strategy.embed_query(frame, calibration) + third = strategy.embed_query(frame, calibration) + np.testing.assert_array_equal( + np.asarray(first.embedding), np.asarray(second.embedding) + ) + np.testing.assert_array_equal( + np.asarray(second.embedding), np.asarray(third.embedding) + ) + + +# --------------------------------------------------------------------------- +# AC-4: retrieve_topk returns exactly k candidates sorted ascending +# --------------------------------------------------------------------------- + + +def test_ac4_retrieve_topk_returns_exactly_k_with_ultra_vpr_label() -> None: + descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM) + strategy = _build_strategy(descriptor_index=descriptor_index) + query = strategy.embed_query(_make_frame(), _make_calibration()) + result = strategy.retrieve_topk(query, k=10) + assert len(result.candidates) == 10 + assert result.backbone_label == "ultra_vpr" + assert result.candidates[0].descriptor_dim == DESCRIPTOR_DIM + distances = [c.descriptor_distance for c in result.candidates] + assert distances == sorted(distances) + + +# --------------------------------------------------------------------------- +# AC-5: descriptor_dim() is stable and returns 512 +# --------------------------------------------------------------------------- + + +def test_ac5_descriptor_dim_stable_returns_512() -> None: + strategy = _build_strategy() + for _ in range(100): + assert strategy.descriptor_dim() == 512 + + +# --------------------------------------------------------------------------- +# AC-6: Engine output shape mismatch at create() -> ConfigError +# --------------------------------------------------------------------------- + + +def test_ac6_create_rejects_engine_output_shape_mismatch() -> None: + # Arrange - engine produces (1, 256), expected (1, 512) + wrong = np.zeros((1, 256), dtype=np.float16) + runtime = _FakeInferenceRuntime( + descriptor_dim=DESCRIPTOR_DIM, fixed_output=wrong + ) + descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM) + fdr_client = _make_fdr_client() + config = _build_config() + + # Act + Assert + with pytest.raises( + ConfigError, match=r"engine output shape mismatch.*\(1, 512\).*\(1, 256\)" + ): + create( + config, + descriptor_index=descriptor_index, + inference_runtime=runtime, + fdr_client=fdr_client, + clock=_StubClock(), + ) + + +def test_ac6_create_rejects_engine_with_missing_embedding_key() -> None: + runtime = _FakeInferenceRuntime( + descriptor_dim=DESCRIPTOR_DIM, output_key="wrong_key" + ) + with pytest.raises(ConfigError, match=r"'embedding' key absent"): + create( + _build_config(), + descriptor_index=_FakeDescriptorIndex( + descriptor_dim_value=DESCRIPTOR_DIM + ), + inference_runtime=runtime, + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + + +# --------------------------------------------------------------------------- +# AC-7: VprBackboneError on forward-pass failure +# --------------------------------------------------------------------------- + + +def test_ac7_runtime_error_yields_vpr_backbone_error( + caplog: pytest.LogCaptureFixture, +) -> None: + runtime = _FakeInferenceRuntime( + descriptor_dim=DESCRIPTOR_DIM, raises=RuntimeError("CUDA OOM") + ) + fdr_client = _make_fdr_client() + strategy = _build_strategy( + inference_runtime=runtime, fdr_client=fdr_client + ) + with caplog.at_level(logging.ERROR, logger="test.ultra_vpr"): + with pytest.raises(VprBackboneError): + strategy.embed_query(_make_frame(), _make_calibration()) + assert any( + record.levelno == logging.ERROR + and getattr(record, "kind", None) == "c2.vpr.backbone_error" + for record in caplog.records + ) + records = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"] + assert len(backbone_errors) == 1 + + +def test_ac7_missing_embedding_key_yields_vpr_backbone_error() -> None: + runtime = _FakeInferenceRuntime( + descriptor_dim=DESCRIPTOR_DIM, output_key="not_embedding" + ) + strategy = _build_strategy(inference_runtime=runtime) + with pytest.raises(VprBackboneError, match=r"'embedding' key"): + strategy.embed_query(_make_frame(), _make_calibration()) + + +def test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error() -> None: + bad = np.zeros((1, 256), dtype=np.float16) + runtime = _FakeInferenceRuntime( + descriptor_dim=DESCRIPTOR_DIM, fixed_output=bad + ) + strategy = _build_strategy(inference_runtime=runtime) + with pytest.raises(VprBackboneError, match=r"expected \(1, 512\)"): + strategy.embed_query(_make_frame(), _make_calibration()) + + +# --------------------------------------------------------------------------- +# AC-8: VprPreprocessError on corrupt image bytes +# --------------------------------------------------------------------------- + + +def test_ac8_corrupt_image_yields_vpr_preprocess_error( + caplog: pytest.LogCaptureFixture, +) -> None: + fdr_client = _make_fdr_client() + strategy = _build_strategy(fdr_client=fdr_client) + frame = NavCameraFrame( + frame_id=4242, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image="not-an-array", + camera_calibration_id="test_cam", + ) + with caplog.at_level(logging.ERROR, logger="test.ultra_vpr"): + with pytest.raises(VprPreprocessError): + strategy.embed_query(frame, _make_calibration()) + assert any( + record.levelno == logging.ERROR + and getattr(record, "kind", None) == "c2.vpr.preprocess_error" + for record in caplog.records + ) + records = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + preprocess_errors = [ + r for r in records if r.kind == "vpr.preprocess_error" + ] + assert len(preprocess_errors) == 1 + + +def test_ac8_wrong_dtype_image_yields_vpr_preprocess_error() -> None: + strategy = _build_strategy() + bad_image = np.zeros((720, 1280, 3), dtype=np.float32) + frame = NavCameraFrame( + frame_id=42, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=bad_image, + camera_calibration_id="test_cam", + ) + with pytest.raises(VprPreprocessError, match=r"uint8"): + strategy.embed_query(frame, _make_calibration()) + + +# --------------------------------------------------------------------------- +# AC-9: Calibration absent / identity -> centre-crop fallback + WARN log +# --------------------------------------------------------------------------- + + +def test_ac9_identity_calibration_falls_back_to_geometric_centre( + caplog: pytest.LogCaptureFixture, +) -> None: + """Identity intrinsics produce ``(cx, cy) == (0, 0)`` which the + preprocessor treats as missing calibration data. + """ + preprocessor_logger = logging.getLogger("test.ultra_vpr.pp") + preprocessor = UltraVprBackbonePreprocessor(logger=preprocessor_logger) + strategy = _build_strategy(preprocessor=preprocessor) + with caplog.at_level(logging.WARNING, logger="test.ultra_vpr.pp"): + query = strategy.embed_query( + _make_frame(), _make_calibration_identity() + ) + warn_records = [ + r + for r in caplog.records + if getattr(r, "kind", None) == "c2.vpr.calibration_missing" + ] + assert len(warn_records) == 1 + # AC-2 still holds with the fallback path + norm = float(np.linalg.norm(np.asarray(query.embedding).astype(np.float32))) + assert norm == pytest.approx(1.0, abs=1e-3) + + +def test_ac9_principal_point_offset_changes_crop_window() -> None: + """The principal-point-aware crop produces a different output than + the geometric-centre crop when the principal point is non-central. + """ + rng = np.random.default_rng(0xABCD) + image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8) + frame = NavCameraFrame( + frame_id=1, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=image, + camera_calibration_id="cam", + ) + pp = UltraVprBackbonePreprocessor() + cal_centre = _make_calibration(cx=640.0, cy=360.0) + cal_offset = _make_calibration(cx=900.0, cy=200.0) + out_centre = pp.preprocess(frame, cal_centre) + out_offset = pp.preprocess(frame, cal_offset) + assert not np.array_equal(out_centre, out_offset) + + +# --------------------------------------------------------------------------- +# AC-10: IndexUnavailableError propagated unchanged from retrieve_topk +# --------------------------------------------------------------------------- + + +def test_ac10_index_unavailable_propagates_unchanged() -> None: + err = IndexUnavailableError("stale handle") + descriptor_index = _FakeDescriptorIndex( + descriptor_dim_value=DESCRIPTOR_DIM, raises=err + ) + strategy = _build_strategy(descriptor_index=descriptor_index) + query = strategy.embed_query(_make_frame(), _make_calibration()) + with pytest.raises(IndexUnavailableError, match=r"stale handle"): + strategy.retrieve_topk(query, k=10) + + +# --------------------------------------------------------------------------- +# AC-11: composition-root wiring + INFO log "c2.vpr.ready" +# --------------------------------------------------------------------------- + + +def test_ac11_create_emits_strategy_ready_info_log( + caplog: pytest.LogCaptureFixture, +) -> None: + runtime = _FakeInferenceRuntime(descriptor_dim=DESCRIPTOR_DIM) + descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM) + fdr_client = _make_fdr_client() + config = _build_config() + logger = logging.getLogger("test.ultra_vpr.create") + + with caplog.at_level(logging.INFO, logger="test.ultra_vpr.create"): + strategy = create( + config, + descriptor_index=descriptor_index, + inference_runtime=runtime, + fdr_client=fdr_client, + clock=_StubClock(), + logger=logger, + ) + + assert isinstance(strategy, UltraVprStrategy) + assert strategy.descriptor_dim() == 512 + ready_logs = [ + r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready" + ] + assert len(ready_logs) == 1 + kv = ready_logs[0].kv # type: ignore[attr-defined] + assert kv["strategy"] == "ultra_vpr" + assert kv["descriptor_dim"] == 512 + + +def test_ac11_non_trt_runtime_rejected_at_create() -> None: + runtime = _FakeInferenceRuntime( + descriptor_dim=DESCRIPTOR_DIM, runtime_label="pytorch_fp16" + ) + config = _build_config() + with pytest.raises(ConfigError, match=r"BUILD_TENSORRT_RUNTIME=ON"): + create( + config, + descriptor_index=_FakeDescriptorIndex( + descriptor_dim_value=DESCRIPTOR_DIM + ), + inference_runtime=runtime, + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + + +def test_ac11_onnx_trt_ep_runtime_accepted_at_create() -> None: + """ONNX-Runtime is the documented fallback (per AZ-337 description).""" + runtime = _FakeInferenceRuntime( + descriptor_dim=DESCRIPTOR_DIM, runtime_label="onnx_trt_ep" + ) + strategy = create( + _build_config(), + descriptor_index=_FakeDescriptorIndex( + descriptor_dim_value=DESCRIPTOR_DIM + ), + inference_runtime=runtime, + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + assert isinstance(strategy, UltraVprStrategy) + + +# --------------------------------------------------------------------------- +# AC-12: WARN log on top-1 distance above threshold (delegated to FaissBridge) +# --------------------------------------------------------------------------- + + +def test_ac12_top1_above_threshold_emits_warn_via_faiss_bridge( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange - corpus returns top-1 distance 0.42 > 0.30 default threshold + descriptor_index = _FakeDescriptorIndex( + descriptor_dim_value=DESCRIPTOR_DIM, + results=[ + ((1, 49.0, 36.0), 0.42), + ((2, 49.001, 36.001), 0.51), + ((3, 49.002, 36.002), 0.65), + ], + ) + strategy = _build_strategy(descriptor_index=descriptor_index) + query = strategy.embed_query(_make_frame(), _make_calibration()) + + with caplog.at_level(logging.WARNING, logger="test.bridge"): + strategy.retrieve_topk(query, k=3) + + warn_records = [ + r + for r in caplog.records + if getattr(r, "kind", None) == "c2.vpr.top1_distance_above_threshold" + ] + assert len(warn_records) == 1 + kv = warn_records[0].kv # type: ignore[attr-defined] + assert kv["distance"] == pytest.approx(0.42) + assert kv["threshold"] == pytest.approx(0.30) + assert kv["backbone_label"] == "ultra_vpr" + + +# --------------------------------------------------------------------------- +# Preprocessor contract +# --------------------------------------------------------------------------- + + +def test_preprocessor_output_shape_and_dtype() -> None: + pp = UltraVprBackbonePreprocessor() + rng = np.random.default_rng(2026) + image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8) + frame = NavCameraFrame( + frame_id=1, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=image, + camera_calibration_id="cam", + ) + out = pp.preprocess(frame, _make_calibration()) + assert out.shape == (1, 3, 384, 384) + assert out.dtype == np.float16 + + +def test_preprocessor_input_shape_is_384x384() -> None: + pp = UltraVprBackbonePreprocessor() + assert pp.input_shape() == (384, 384) + + +def test_preprocessor_protocol_conformance() -> None: + pp = UltraVprBackbonePreprocessor() + assert isinstance(pp, BackbonePreprocessor) + + +def test_preprocessor_accepts_grayscale_input() -> None: + pp = UltraVprBackbonePreprocessor() + gray = np.zeros((512, 512), dtype=np.uint8) + frame = NavCameraFrame( + frame_id=1, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=gray, + camera_calibration_id="cam", + ) + out = pp.preprocess(frame, _make_calibration()) + assert out.shape == (1, 3, 384, 384) + + +def test_preprocessor_mean_std_correct_on_grey_image() -> None: + """A uniform-grey image should produce per-channel ``(grey - mean) / std``.""" + pp = UltraVprBackbonePreprocessor() + grey = np.full((512, 512, 3), 128, dtype=np.uint8) + frame = NavCameraFrame( + frame_id=1, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=grey, + camera_calibration_id="cam", + ) + out = pp.preprocess(frame, _make_calibration()) + mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) + std = np.array([0.229, 0.224, 0.225], dtype=np.float32) + expected = (128.0 / 255.0 - mean) / std + actual_per_channel = ( + out[0].astype(np.float32).reshape(3, -1).mean(axis=1) + ) + np.testing.assert_allclose(actual_per_channel, expected, atol=1e-2) + + +# --------------------------------------------------------------------------- +# Constructor validation +# --------------------------------------------------------------------------- + + +def _make_minimal_strategy_kwargs(*, descriptor_dim: int) -> dict[str, Any]: + """Build kwargs that pass FaissBridge guards. + + The strategy carries its own ``descriptor_dim`` validation; the + bridge has a separate (stricter) ``descriptor_dim > 0`` guard. + Tests that exercise the strategy's own validators MUST use a bridge + with a valid dim. + """ + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = FaissBridge( + descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=512), + descriptor_dim=512, + warn_top1_threshold=0.30, + debug_log_per_frame_distances=False, + fdr_client=fdr_client, + logger=logging.getLogger("test.bridge.guard"), + clock=clock, + ) + return { + "inference_runtime": _FakeInferenceRuntime(), + "engine_handle": _FakeEngineHandle(), + "descriptor_index": _FakeDescriptorIndex(), + "preprocessor": UltraVprBackbonePreprocessor(), + "normaliser": DescriptorNormaliser(), + "faiss_bridge": bridge, + "fdr_client": fdr_client, + "clock": clock, + "logger": logging.getLogger("test.ultra_vpr.guard"), + "descriptor_dim": descriptor_dim, + } + + +def test_constructor_rejects_zero_descriptor_dim() -> None: + with pytest.raises(ValueError, match=r">= 1"): + UltraVprStrategy(**_make_minimal_strategy_kwargs(descriptor_dim=0)) + + +def test_constructor_rejects_negative_descriptor_dim() -> None: + with pytest.raises(ValueError, match=r">= 1"): + UltraVprStrategy(**_make_minimal_strategy_kwargs(descriptor_dim=-5)) + + +# --------------------------------------------------------------------------- +# FDR record emission +# --------------------------------------------------------------------------- + + +def test_embed_query_emits_vpr_embed_query_fdr_record() -> None: + fdr_client = _make_fdr_client() + strategy = _build_strategy(fdr_client=fdr_client) + strategy.embed_query(_make_frame(), _make_calibration()) + records = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + embed_records = [r for r in records if r.kind == "vpr.embed_query"] + assert len(embed_records) == 1 + payload = embed_records[0].payload + assert payload["backbone_label"] == "ultra_vpr" + assert payload["descriptor_dim"] == 512 + assert isinstance(payload["latency_us"], int) + assert payload["latency_us"] > 0 + + +def test_create_does_not_register_pytorch_architecture() -> None: + """UltraVPR uses a TRT engine - no PyTorch architecture registration. + + Verifies the strategy module does NOT expose ``MODEL_NAME`` / + ``architecture_factory`` attributes (which would trigger registration + in the composition root). + """ + import gps_denied_onboard.components.c2_vpr.ultra_vpr as mod + + assert not hasattr(mod, "MODEL_NAME") + assert not hasattr(mod, "architecture_factory")