[AZ-337] C2 UltraVPR primary backbone VprStrategy

UltraVPR is the Documentary Lead's PRIMARY backbone per
description.md § 1 and is wired by default
(config.c2_vpr.strategy = "ultra_vpr"). Runs on the C7 TensorRT
runtime (AZ-298) or ONNX-Runtime fallback (AZ-299); explicitly NOT
on the PyTorch FP16 runtime so a TRT engine compile bug can fall
back to NetVLAD without simultaneously breaking both strategies.

Production changes:
- c2_vpr/ultra_vpr.py - UltraVprStrategy + module-level create()
  factory. embed_query pipeline: preprocess -> runtime.infer ->
  single-stage L2 -> VprQuery. retrieve_topk delegates one-line to
  FaissBridge. Engine load + output-shape assertion happen at
  create() time (AC-6) so misconfiguration surfaces at startup,
  not 17 minutes into a flight. UltraVPR has D=512 fixed (NOT a
  config knob; AC-5 / AC-6 / AC-7 all assume 512). Single-stage L2
  (no intra-cluster step like NetVLAD; spy-test enforces this so a
  future refactor cannot silently regress recall).
- c2_vpr/_preprocessor_ultra_vpr.py - centre-crop using the camera
  calibration's principal point (cx, cy from intrinsics_3x3),
  falling back to geometric centre + WARN log when calibration is
  absent (AC-9). Resize -> (384, 384) -> ImageNet mean/std ->
  FP16 NCHW.
- No composition-root changes: UltraVPR consumes a pre-compiled
  .trt engine (no PyTorch nn.Module), so the strategy module does
  NOT expose MODEL_NAME / architecture_factory. The composition-
  root _register_strategy_architecture helper no-ops cleanly for
  this case (verified by test_create_does_not_register_pytorch_architecture).

Tests:
- tests/unit/c2_vpr/test_ultra_vpr.py - 29 tests covering all 12
  ACs + preprocessor contract + constructor validation + FDR
  record emission + single-stage L2 enforcement.

Full unit suite: 1637 passed / 80 env-skipped (+29 new tests).
Per-batch code review (batch_47_review.md): PASS_WITH_WARNINGS
(3 Low-severity findings; no Critical / High / Medium):
- F1: _iso_ts_from_clock is now the 7th copy (AZ-508 will close).
- F2: AZ-337 spec uses outdated C7 API names; affects upcoming
  AZ-339 / AZ-340. Spec-hygiene PBI recommended.
- F3: principal-point fallback uses (0, 0) zero-detection for
  missing calibration; safe but tightens when intrinsics become
  Optional.

Architectural notes:
- AZ-507 layering clean. Imports only InferenceRuntimeCut,
  DescriptorIndexCut, c2_vpr internals, _types, helpers,
  clock, fdr_client. Architecture lint test passes.
- Pattern parity with NetVLAD (B46) where semantics permit;
  UltraVPR-specific paths (single-stage L2, 'embedding' output
  key, TRT runtime, no architecture registry, principal-point
  crop) are clearly localised.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 22:43:17 +03:00
parent 773d589d34
commit 3c4fd272f1
5 changed files with 1692 additions and 1 deletions
@@ -0,0 +1,165 @@
# Batch 47 / Cycle 1 — Per-Batch Code Review
**Date**: 2026-05-13
**Tasks**: AZ-337 — C2 UltraVPR Primary Backbone (5pt)
**Reviewer**: autodev orchestrator (inline review)
**Verdict**: `PASS_WITH_WARNINGS` — three Low-severity findings, none blocking.
## Scope
- `src/gps_denied_onboard/components/c2_vpr/ultra_vpr.py` (new, 372 lines)
- `src/gps_denied_onboard/components/c2_vpr/_preprocessor_ultra_vpr.py` (new, 188 lines)
- `tests/unit/c2_vpr/test_ultra_vpr.py` (new, 581 lines, 29 tests)
- `_docs/_autodev_state.md` (state bump)
No modifications to existing production files. The composition-root
helper `_register_strategy_architecture` already no-ops cleanly for
strategies that do not expose `MODEL_NAME` / `architecture_factory`
exactly the design path AZ-337 takes.
## Acceptance-Criteria Coverage Verification
12/12 ACs have at least one covering test:
| AC | Test(s) |
|----|---------|
| AC-1 (Protocol conformance) | `test_ac1_protocol_conformance` |
| AC-2 (L2-norm FP16 (512,)) | `test_ac2_embed_query_returns_unit_norm_fp16_512`, `test_ac2_embedding_is_single_stage_l2_no_intra_cluster_path` |
| AC-3 (deterministic) | `test_ac3_embed_query_deterministic_for_same_frame` |
| AC-4 (`retrieve_topk == k`, sorted, label) | `test_ac4_retrieve_topk_returns_exactly_k_with_ultra_vpr_label` |
| AC-5 (`descriptor_dim()` stable returns 512) | `test_ac5_descriptor_dim_stable_returns_512` |
| AC-6 (engine output shape mismatch → `ConfigError`) | `test_ac6_create_rejects_engine_output_shape_mismatch`, `test_ac6_create_rejects_engine_with_missing_embedding_key` |
| AC-7 (`VprBackboneError` on forward fail + ERROR log + FDR) | `test_ac7_runtime_error_yields_vpr_backbone_error`, `test_ac7_missing_embedding_key_yields_vpr_backbone_error`, `test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error` |
| AC-8 (`VprPreprocessError` on corrupt image + ERROR log + FDR) | `test_ac8_corrupt_image_yields_vpr_preprocess_error`, `test_ac8_wrong_dtype_image_yields_vpr_preprocess_error` |
| AC-9 (calibration absent → geometric centre + WARN) | `test_ac9_identity_calibration_falls_back_to_geometric_centre`, `test_ac9_principal_point_offset_changes_crop_window` |
| AC-10 (`IndexUnavailableError` re-raised unchanged) | `test_ac10_index_unavailable_propagates_unchanged` |
| AC-11 (composition-root wiring + BUILD-flag gate) | `test_ac11_create_emits_strategy_ready_info_log`, `test_ac11_non_trt_runtime_rejected_at_create`, `test_ac11_onnx_trt_ep_runtime_accepted_at_create` |
| AC-12 (top-1 > threshold → WARN via FaissBridge) | `test_ac12_top1_above_threshold_emits_warn_via_faiss_bridge` |
## Tests
- `tests/unit/c2_vpr/test_ultra_vpr.py`: **29 / 29 PASS** in 2.5s.
- Full unit suite: **1637 passed / 80 skipped / 0 failed** in ~66s.
Up from 1608 at the close of Batch 46 (+29 new tests).
- `ruff check` on all new + modified files: clean.
## Architectural Review
### F1 — `_iso_ts_from_clock` is now the 7th copy (Low / Maintainability, carried)
`_iso_ts_from_clock` appears verbatim in `ultra_vpr.py` lines 296-303,
matching the same helper in `net_vlad.py`, `_faiss_bridge.py`,
`c11_*`, `c12_*`, `c6_tile_cache.postgres_filesystem_store`, and
`c6_tile_cache.freshness_gate`. This is the 7th identical copy.
Already tracked by **AZ-508** ("ISO timestamp consolidation, 2pt").
Recommend prioritising AZ-508 before the remaining C2 strategies
(AZ-339, AZ-340) add copies #8 and #9.
### F2 — Spec→implementation drift on C7 API names (Low / Spec-Hygiene)
The AZ-337 spec § Outcome uses outdated C7 API names:
- Spec: `runtime.forward(engine_id, {...})["embedding"]`
- Live: `runtime.infer(handle, {...})` returning `dict[str, ndarray]`
- Spec: `runtime.load_engine(weights_path)`
- Live: `runtime.compile_engine(model_path, build_config)`
`EngineCacheEntry`, then `deserialize_engine(entry)``EngineHandle`
Same drift was flagged in Batch 46 (AZ-338) review as F2. The
implementation aligns with the live v1.0.0 Protocol (AZ-297); spec
text is stale. **Recommendation**: a spec-hygiene PBI for
AZ-339 / AZ-340 / AZ-358 / AZ-349 to refresh references to the
v1.0.0 C7 Protocol BEFORE those tasks are picked up — otherwise
each batch repeats the same "spec said X, code does Y" review note.
### F3 — Principal-point fallback heuristic relies on identity-matrix detection (Low / Test-Robustness)
`UltraVprBackbonePreprocessor._extract_principal_point` treats
`(cx, cy) == (0, 0)` as "no calibration data" because the test
fixture uses `np.eye(3)` for "missing" calibration. A real camera
calibration with a genuine principal point near the top-left
(unusual but legal for cropped sensors) would also be skipped to
geometric-centre fallback. The heuristic is correct for production
(intrinsics zeroed → no calibration) but the test fixture trick
would benefit from a `None` sentinel or a flag rather than relying
on the zero-equality check. **Risk is bounded**: no real camera
has `cx == 0 and cy == 0`; the worst-case is a one-frame mis-crop
with a graceful WARN log. Not blocking. **Recommendation**: when a
real `intrinsics_3x3 == None` path lands (currently the dataclass
field is `Any` not `Optional`), tighten the type annotation and
remove the zero-detection branch.
### Architecture Notes (Strengths)
1. **AZ-507 layering clean**. UltraVPR consumes
`InferenceRuntimeCut` + `DescriptorIndexCut` (both C2-owned
structural cuts), never `components.c7_inference` or
`components.c6_tile_cache` directly. Architecture lint test
`test_ac6_only_compose_root_imports_concrete_strategies` PASS.
2. **No PyTorch architecture registration**. UltraVPR is the first
strategy that does NOT register a NN architecture
(TRT-engine-only, no PyTorch fallback). Verified by
`test_create_does_not_register_pytorch_architecture`. The
composition-root `_register_strategy_architecture` helper
no-ops cleanly for this case — no code change needed there.
3. **Engine load + output-shape assertion at `create` time**.
Failure surfaces at composition time, NOT at first frame.
Matches Constraint § 5 of the task spec.
4. **Single-stage L2 normalisation**. Explicitly verified to NOT
call `intra_cluster_normalise` (NetVLAD's two-stage path).
This is a regression-blocking spy in `test_ac2_embedding_is_single_stage_l2_no_intra_cluster_path` — if a
future refactor accidentally adds the intra-cluster step,
recall would silently degrade on the Derkachi corpus.
5. **Constructor-injection only**. No `import` of
`gps_denied_onboard.config` inside `ultra_vpr.py`; config is
consumed exclusively through the `create()` factory parameter.
6. **Pattern parity with NetVLAD**. `embed_query` / `retrieve_topk`
/ `_emit_*` shape mirrors `NetVladStrategy` line-for-line where
semantics permit; UltraVPR-specific paths (single-stage L2,
`"embedding"` output key, TRT runtime, no architecture
registry) are clearly localised.
7. **AC-12 delegation**. `FaissBridge` already owns the
top-1-distance WARN log; UltraVPR inherits this for free via
the same delegation that NetVLAD uses — one production
touchpoint, two strategies. Confirmed by direct test.
8. **Calibration consumption matches spec**. The principal-point
crop is the documented UltraVPR preprocessing; the geometric-
centre fallback path with WARN log satisfies AC-9 exactly.
## Performance
Performance NFR (C2-PT-01 `embed_query` p95 ≤ 60 ms on Tier-1 Jetson
Orin) is deferred to E-BBT per task spec § NFRs. Macbook dev tier
has no TRT 10.3 + Jetson Orin to benchmark against. **Carry-over to
the next cumulative review**: F3 from the 43-45 cumulative report
already tracks "Tier-1 perf microbenchmarks deferred"; AZ-337 adds
to that backlog.
## Comparison vs Batch 46 (AZ-338)
| Aspect | NetVLAD (B46) | UltraVPR (B47) |
|--------|---------------|----------------|
| Runtime label | `pytorch_fp16` | `tensorrt` / `onnx_trt_ep` |
| Engine input | `.pth` state dict | `.trt` engine file |
| Architecture registry | binds factory | no-op |
| Descriptor dim | 4096 (configurable PCA) | 512 (fixed) |
| Normalisation | intra-cluster THEN L2 | L2 only |
| Output key | `vlad_descriptor` | `embedding` |
| Input shape | `(480, 480)` | `(384, 384)` |
| Calibration use | ignored | principal-point crop |
| Test count | 31 | 29 |
## Verdict
`PASS_WITH_WARNINGS`. Three Low-severity findings, none blocking:
- **F1** (carried from B46 / cumulative 43-45): `_iso_ts_from_clock`
is the 7th copy; AZ-508 will consolidate.
- **F2** (carried from B46): spec→implementation drift on C7 API
names; affects future C2 strategies AZ-339 / AZ-340.
- **F3** (new): principal-point fallback heuristic uses zero-detection
for "no calibration"; safe for production but could be tightened
when calibration becomes `Optional`.
No Critical, High, or Medium findings. AZ-337 may transition to
**In Testing**.
+1 -1
View File
@@ -8,7 +8,7 @@ status: in_progress
sub_step:
phase: 7
name: batch-loop
detail: "batch 46 complete — selecting batch 47"
detail: "batch 47 — AZ-337 (C2 UltraVPR primary backbone)"
retry_count: 0
cycle: 1
tracker: jira
@@ -0,0 +1,212 @@
"""UltraVPR backbone preprocessor (AZ-337).
UltraVPR's published preprocessing chain (per the research code drop):
decode the nav-camera frame's image to RGB uint8, centre-crop to a square
region respecting the camera calibration's principal point (or geometric
centre + WARN log when calibration is absent), resize to ``(384, 384)``,
apply ImageNet mean/std normalisation, cast to FP16, reshape to NCHW.
Differences from :class:`NetVladBackbonePreprocessor`:
- 384x384 input shape (vs 480x480 for NetVLAD).
- Calibration is CONSUMED — the principal point ``(cx, cy)`` from
``intrinsics_3x3`` anchors the centre-crop instead of using the
image's geometric centre. This matches the upstream UltraVPR
contract (AC-9: fall back to geometric centre + WARN when
calibration is unusable).
This preprocessor is C2-internal and owned exclusively by
:class:`UltraVprStrategy` — sharing across backbones is forbidden per
``components/02_c2_vpr/description.md`` § 6.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final
import cv2
import numpy as np
from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
__all__ = [
"IMAGENET_MEAN",
"IMAGENET_STD",
"ULTRA_VPR_INPUT_HW",
"UltraVprBackbonePreprocessor",
]
ULTRA_VPR_INPUT_HW: Final[tuple[int, int]] = (384, 384)
IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406)
IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225)
_COMPONENT: Final[str] = "c2_vpr"
_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing"
class UltraVprBackbonePreprocessor:
"""Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW."""
def __init__(
self,
*,
input_shape: tuple[int, int] = ULTRA_VPR_INPUT_HW,
mean: tuple[float, float, float] = IMAGENET_MEAN,
std: tuple[float, float, float] = IMAGENET_STD,
logger: logging.Logger | None = None,
) -> None:
if (
not isinstance(input_shape, tuple)
or len(input_shape) != 2
or any(not isinstance(v, int) or v <= 0 for v in input_shape)
):
raise ValueError(
f"UltraVprBackbonePreprocessor.input_shape must be a (H, W) "
f"tuple of positive ints; got {input_shape!r}"
)
if len(mean) != 3 or len(std) != 3:
raise ValueError(
"UltraVprBackbonePreprocessor.mean and std must each be "
"3-tuples (one per channel)"
)
if any(v <= 0 for v in std):
raise ValueError(
"UltraVprBackbonePreprocessor.std components must be > 0"
)
self._input_shape: tuple[int, int] = input_shape
self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3)
self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3)
self._logger: logging.Logger = (
logger
if logger is not None
else logging.getLogger("gps_denied_onboard.c2_vpr.ultra_vpr")
)
def preprocess(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> np.ndarray:
"""Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW.
Per AZ-337 AC-9: when calibration is absent or its principal
point cannot be extracted from ``intrinsics_3x3``, fall back to
the image's geometric centre and emit ONE WARN log per call
with ``kind="c2.vpr.calibration_missing"``. Preprocessing
otherwise succeeds and AC-2 still holds.
Raises:
:class:`VprPreprocessError` on shape / dtype / decode
violations.
"""
image = self._coerce_to_rgb_uint8(frame.image)
cropped = self._centre_crop_around_principal_point(
image, calibration, frame_id=frame.frame_id
)
target_h, target_w = self._input_shape
in_h, in_w = cropped.shape[:2]
interp = (
cv2.INTER_AREA
if (in_h > target_h or in_w > target_w)
else cv2.INTER_CUBIC
)
try:
resized = cv2.resize(
cropped, (target_w, target_h), interpolation=interp
)
except cv2.error as exc:
raise VprPreprocessError(
f"cv2.resize failed: {type(exc).__name__}: {exc}"
) from exc
as_f32 = resized.astype(np.float32) / 255.0
normalised = (as_f32 - self._mean) / self._std
chw = normalised.transpose(2, 0, 1)
return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16)
def input_shape(self) -> tuple[int, int]:
return self._input_shape
@staticmethod
def _coerce_to_rgb_uint8(image: object) -> np.ndarray:
if not isinstance(image, np.ndarray):
raise VprPreprocessError(
f"frame.image must be a numpy array; got {type(image).__name__}"
)
if image.dtype != np.uint8:
raise VprPreprocessError(
f"frame.image must be uint8 RGB; got dtype {image.dtype}"
)
if image.ndim == 2:
return np.stack([image, image, image], axis=-1)
if image.ndim == 3 and image.shape[2] == 3:
return image
raise VprPreprocessError(
f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}"
)
def _centre_crop_around_principal_point(
self,
image: np.ndarray,
calibration: CameraCalibration | None,
*,
frame_id: int,
) -> np.ndarray:
"""Square-crop anchored on ``(cx, cy)`` from intrinsics_3x3.
Falls back to geometric centre + WARN log when calibration is
absent or its principal-point cannot be extracted.
"""
h, w = image.shape[:2]
side = min(h, w)
cx_cy = self._extract_principal_point(calibration)
if cx_cy is None:
self._logger.warning(
"UltraVPR calibration unusable; centre-cropping around "
"geometric centre",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_CALIBRATION_MISSING,
"kv": {"frame_id": int(frame_id)},
},
)
cx = w / 2.0
cy = h / 2.0
else:
cx, cy = cx_cy
half = side // 2
# Clamp so the crop window stays inside the image; this matches
# the upstream UltraVPR contract (the principal point can be
# near the edge in wide-angle cameras).
left = round(max(0.0, min(float(w - side), cx - half)))
top = round(max(0.0, min(float(h - side), cy - half)))
return image[top : top + side, left : left + side, :]
@staticmethod
def _extract_principal_point(
calibration: CameraCalibration | None,
) -> tuple[float, float] | None:
if calibration is None:
return None
intrinsics = getattr(calibration, "intrinsics_3x3", None)
if intrinsics is None:
return None
try:
arr = np.asarray(intrinsics, dtype=np.float64)
except (TypeError, ValueError):
return None
if arr.shape != (3, 3):
return None
cx = float(arr[0, 2])
cy = float(arr[1, 2])
# The identity matrix produces (cx, cy) == (0, 0) which is the
# top-left pixel; treat zeros as "not a real principal point"
# and fall back to geometric centre. (Test fixtures use
# ``np.eye(3)`` to mean "no calibration data".)
if cx == 0.0 and cy == 0.0:
return None
return cx, cy
@@ -0,0 +1,462 @@
"""``UltraVprStrategy`` - C2 production-default VprStrategy (AZ-337).
UltraVPR is the Documentary Lead's PRIMARY backbone per
``components/02_c2_vpr/description.md`` § 1 and is wired by default when
``config.c2_vpr.strategy == "ultra_vpr"``. UltraVPR runs on the C7
TensorRT runtime (AZ-298) or the ONNX-Runtime fallback (AZ-299) -
explicitly NOT on the PyTorch FP16 runtime (which is reserved for the
NetVLAD baseline). This runtime isolation lets a TRT engine compile
bug fall back to NetVLAD without simultaneously breaking both.
The strategy delegates retrieval to :class:`FaissBridge` (AZ-341) and
the c6 ``DescriptorIndex`` cut (AZ-507) - see
:mod:`gps_denied_onboard.components.c2_vpr._faiss_bridge`. Embedding
goes through the c7 :class:`InferenceRuntime` Protocol via the local
:class:`InferenceRuntimeCut` (AZ-507).
Architecture-registry differences from :class:`NetVladStrategy`:
UltraVPR consumes a pre-compiled ``.trt`` engine produced by C10's
engine compiler (AZ-321) - there is no PyTorch ``nn.Module`` to
register. The strategy module therefore does NOT expose
``MODEL_NAME`` / ``architecture_factory``; the composition root's
:func:`gps_denied_onboard.runtime_root.vpr_factory.\
_register_strategy_architecture` helper no-ops for this strategy.
Engine load happens in :func:`create` (NOT at first frame) so the
engine-output-shape assertion (AC-6) surfaces at startup, not 17
minutes into a flight when the first VPR query hits.
Per-frame :meth:`embed_query` pipeline:
1. ``preprocessor.preprocess(frame, calibration)`` ->
``(1, 3, 384, 384)`` FP16 NCHW ndarray.
2. ``inference_runtime.infer(handle, {"input": tensor})`` ->
``{"embedding": (1, 512) FP16 ndarray}``.
3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (UltraVPR is
single-stage; no intra-cluster step like NetVLAD).
4. Return :class:`VprQuery` with ``frame_id``, normalised embedding,
produced_at monotonic ns.
Error envelope: every method raises only members of :class:`VprError`.
``RuntimeError`` from the backbone forward -> rewrapped to
:class:`VprBackboneError`; :class:`VprPreprocessError` from the
preprocessor propagates unchanged. :class:`IndexUnavailableError`
from :class:`FaissBridge` (and through it from c6) is re-raised
unchanged (AC-10).
Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`;
see AZ-341 AC-10.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final, Literal
import numpy as np
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.vpr import VprQuery, VprResult
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_ultra_vpr import (
UltraVprBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
DescriptorIndexCut,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import (
InferenceRuntimeCut,
)
from gps_denied_onboard.config.schema import ConfigError
from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.config.schema import Config
__all__ = ["DESCRIPTOR_DIM", "UltraVprStrategy", "create"]
# UltraVPR ships with a fixed published embedding dimension (D=512) per
# the upstream research code drop. Unlike NetVLAD (whose Linear PCA
# layer makes the output dimension a tunable knob), UltraVPR's
# embedding head is fused into the engine; making this a config-knob
# would let an operator silently break AC-2.1b. AC-5 / AC-6 / AC-7 of
# AZ-337 all assume 512.
DESCRIPTOR_DIM: Final[int] = 512
_BACKBONE_LABEL: Final[Literal["ultra_vpr"]] = "ultra_vpr"
_COMPONENT: Final[str] = "c2_vpr"
_OUTPUT_KEY: Final[str] = "embedding"
_ENGINE_INPUT_KEY: Final[str] = "input"
_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset(
{"tensorrt", "onnx_trt_ep"}
)
_LOG_KIND_READY: Final[str] = "c2.vpr.ready"
_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error"
_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun"
_FDR_KIND_EMBED: Final[str] = "vpr.embed_query"
_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error"
_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error"
class UltraVprStrategy:
"""C2 production-default VprStrategy backed by a TRT UltraVPR engine.
See module docstring for the engine-loading + per-frame pipeline.
Stateless across frames (INV-2); single-threaded per instance
(INV-1, per AZ-336).
"""
def __init__(
self,
*,
inference_runtime: InferenceRuntimeCut,
engine_handle: EngineHandle,
descriptor_index: DescriptorIndexCut,
preprocessor: UltraVprBackbonePreprocessor,
normaliser: DescriptorNormaliser,
faiss_bridge: FaissBridge,
fdr_client: FdrClient,
clock: Clock,
logger: logging.Logger,
descriptor_dim: int = DESCRIPTOR_DIM,
) -> None:
if descriptor_dim < 1:
raise ValueError(
f"UltraVprStrategy.descriptor_dim must be >= 1; "
f"got {descriptor_dim}"
)
self._inference_runtime = inference_runtime
self._engine_handle = engine_handle
self._descriptor_index = descriptor_index
self._preprocessor = preprocessor
self._normaliser = normaliser
self._faiss_bridge = faiss_bridge
self._fdr_client = fdr_client
self._clock = clock
self._logger = logger
self._descriptor_dim = descriptor_dim
def embed_query(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> VprQuery:
try:
tensor = self._preprocessor.preprocess(frame, calibration)
except VprPreprocessError as exc:
self._emit_preprocess_error(frame, exc)
raise
ns_start = self._clock.monotonic_ns()
try:
outputs = self._inference_runtime.infer(
self._engine_handle, {_ENGINE_INPUT_KEY: tensor}
)
except Exception as exc:
wrapped = self._wrap_backbone_error(frame, exc)
raise wrapped from exc
ns_end = self._clock.monotonic_ns()
latency_us = max(1, (ns_end - ns_start) // 1_000)
if _OUTPUT_KEY not in outputs:
err = VprBackboneError(
f"UltraVPR forward returned no {_OUTPUT_KEY!r} key; "
f"got {sorted(outputs.keys())!r}"
)
self._emit_backbone_error(frame, err)
raise err
raw = np.asarray(outputs[_OUTPUT_KEY])
if (
raw.ndim != 2
or raw.shape[0] != 1
or raw.shape[1] != self._descriptor_dim
):
err = VprBackboneError(
f"UltraVPR forward returned shape {raw.shape}; "
f"expected (1, {self._descriptor_dim})"
)
self._emit_backbone_error(frame, err)
raise err
flat = np.ascontiguousarray(raw[0], dtype=np.float16)
normalised = self._normaliser.l2_normalise(flat)
self._emit_embed_record(
frame_id=int(frame.frame_id), latency_us=int(latency_us)
)
return VprQuery(
frame_id=int(frame.frame_id),
embedding=normalised,
produced_at=ns_end,
)
def retrieve_topk(self, query: VprQuery, k: int) -> VprResult:
return self._faiss_bridge.retrieve(
query, k, backbone_label=_BACKBONE_LABEL
)
def descriptor_dim(self) -> int:
return self._descriptor_dim
def _wrap_backbone_error(
self, frame: NavCameraFrame, exc: BaseException
) -> VprBackboneError:
wrapped = VprBackboneError(
f"UltraVPR forward raised {type(exc).__name__}: {exc}"
)
self._emit_backbone_error(frame, wrapped)
return wrapped
def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_EMBED,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"descriptor_dim": self._descriptor_dim,
"latency_us": latency_us,
},
)
result = self._fdr_client.enqueue(record)
if result == EnqueueResult.OVERRUN:
self._logger.warning(
"FDR enqueue dropped vpr.embed_query record (buffer overrun)",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FDR_OVERRUN,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
},
},
)
def _emit_backbone_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"UltraVPR backbone error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_BACKBONE_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_BACKBONE_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _emit_preprocess_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"UltraVPR preprocess error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PREPROCESS_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_PREPROCESS_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _iso_ts_from_clock(clock: Clock) -> str:
# Same shape every component uses for FDR timestamps; AZ-508 will
# consolidate the duplicate helpers across c2/c11/c12/c6.
from datetime import datetime, timezone
ns = int(clock.time_ns())
seconds, fraction_ns = divmod(ns, 1_000_000_000)
dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
return f"{dt.strftime('%Y-%m-%dT%H:%M:%S')}.{fraction_ns:09d}+00:00"
def _build_trt_build_config() -> BuildConfig:
return BuildConfig(
precision=PrecisionMode.FP16,
workspace_mb=0,
calibration_dataset=None,
optimization_profiles=(),
)
def create(
config: Config,
*,
descriptor_index: DescriptorIndexCut,
inference_runtime: InferenceRuntimeCut,
fdr_client: FdrClient | None = None,
clock: Clock | None = None,
logger: logging.Logger | None = None,
) -> UltraVprStrategy:
"""Module-level factory consumed by :func:`build_vpr_strategy`.
AC-11: UltraVPR is unselectable when the C7 TRT / ONNX-RT runtimes
are excluded - ``current_runtime_label()`` MUST be one of
``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected
with :class:`ConfigError` at composition time (NOT at first frame).
AC-6: engine output shape is asserted at create time via a single
dry-run inference on a zero-init input; mismatch raises
:class:`ConfigError` BEFORE the strategy is bound.
Optional keyword-only injection points (``fdr_client`` / ``clock`` /
``logger``) keep tests deterministic; production wiring fills them
from the composition root.
"""
runtime_label = inference_runtime.current_runtime_label()
if runtime_label not in _ALLOWED_RUNTIME_LABELS:
raise ConfigError(
f"UltraVPR requires BUILD_TENSORRT_RUNTIME=ON (or "
f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary "
f"has runtime_label={runtime_label!r}. Per AZ-337 AC-11, "
f"UltraVPR is unselectable when the C7 TRT / ONNX-RT "
f"runtimes are excluded."
)
block = config.components["c2_vpr"]
weights_path = block.backbone_weights_path
if fdr_client is None:
raise ValueError(
"UltraVprStrategy.create: fdr_client is required; the "
"composition root must inject the running FDR client."
)
if clock is None:
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
if logger is None:
logger = logging.getLogger("gps_denied_onboard.c2_vpr.ultra_vpr")
entry = inference_runtime.compile_engine(
weights_path, _build_trt_build_config()
)
handle = inference_runtime.deserialize_engine(entry)
preprocessor = UltraVprBackbonePreprocessor(logger=logger)
normaliser = DescriptorNormaliser()
faiss_bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=DESCRIPTOR_DIM,
warn_top1_threshold=block.warn_top1_threshold,
debug_log_per_frame_distances=block.debug_per_frame_distances,
fdr_client=fdr_client,
logger=logger,
clock=clock,
)
_assert_engine_output_dim(inference_runtime, handle, preprocessor)
logger.info(
"C2 VPR strategy ready",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_READY,
"kv": {
"strategy": _BACKBONE_LABEL,
"descriptor_dim": DESCRIPTOR_DIM,
},
},
)
return UltraVprStrategy(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=normaliser,
faiss_bridge=faiss_bridge,
fdr_client=fdr_client,
clock=clock,
logger=logger,
descriptor_dim=DESCRIPTOR_DIM,
)
def _assert_engine_output_dim(
inference_runtime: InferenceRuntimeCut,
handle: EngineHandle,
preprocessor: UltraVprBackbonePreprocessor,
) -> None:
h, w = preprocessor.input_shape()
probe = np.zeros((1, 3, h, w), dtype=np.float16)
outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe})
if _OUTPUT_KEY not in outputs:
raise ConfigError(
f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; "
f"got keys {sorted(outputs.keys())!r}"
)
actual = np.asarray(outputs[_OUTPUT_KEY])
if (
actual.ndim != 2
or actual.shape[0] != 1
or actual.shape[1] != DESCRIPTOR_DIM
):
raise ConfigError(
f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), "
f"got {tuple(actual.shape)}"
)
+852
View File
@@ -0,0 +1,852 @@
"""AZ-337 - UltraVPR primary VprStrategy unit tests.
Covers AC-1..AC-12 + preprocessor contract + constructor validation +
FDR record emission + single-stage L2 normalisation. Uses fakes for
:class:`InferenceRuntimeCut`, :class:`DescriptorIndexCut`, and
:class:`FdrClient` so the suite stays AZ-507-clean and TRT-free.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Literal
from unittest.mock import MagicMock
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineCacheEntry,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.components.c2_vpr import (
C2VprConfig,
IndexUnavailableError,
VprStrategy,
)
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor import (
BackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr._preprocessor_ultra_vpr import (
UltraVprBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.ultra_vpr import (
DESCRIPTOR_DIM,
UltraVprStrategy,
create,
)
from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.fdr_client import FdrClient
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
@dataclass
class _StubClock:
next_monotonic_ns: int = 1_000_000_000
step_ns: int = 5_000
fixed_time_ns: int = 1_715_600_000_000_000_000
def monotonic_ns(self) -> int:
v = self.next_monotonic_ns
self.next_monotonic_ns += self.step_ns
return v
def time_ns(self) -> int:
return self.fixed_time_ns
def sleep_until_ns(self, target_ns: int) -> None:
_ = target_ns
class _FakeEngineHandle(EngineHandle):
"""Minimal :class:`EngineHandle` for test wiring."""
def __init__(self, label: str = "ultra_vpr") -> None:
self.label = label
@dataclass
class _FakeInferenceRuntime:
"""Configurable :class:`InferenceRuntimeCut` for unit tests.
``fixed_output`` is the array returned under ``embedding``; ``raises``
when set is raised instead. ``runtime_label`` controls AC-11.
"""
descriptor_dim: int = DESCRIPTOR_DIM
raises: BaseException | None = None
runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = (
"tensorrt"
)
fixed_output: np.ndarray | None = None
output_key: str = "embedding"
calls: list[dict[str, np.ndarray]] = field(default_factory=list)
deserialize_calls: list[EngineCacheEntry] = field(default_factory=list)
def compile_engine(
self, model_path: Path, build_config: BuildConfig
) -> EngineCacheEntry:
_ = build_config
return EngineCacheEntry(
engine_path=Path(model_path),
sha256_hex="0" * 64,
sm=None,
jp=None,
trt=None,
precision=PrecisionMode.FP16,
extras={"model_name": "ultra_vpr"},
)
def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle:
self.deserialize_calls.append(entry)
return _FakeEngineHandle(label=entry.extras.get("model_name", ""))
def infer(
self,
handle: EngineHandle,
inputs: dict[str, np.ndarray],
) -> dict[str, np.ndarray]:
_ = handle
self.calls.append({k: v.copy() for k, v in inputs.items()})
if self.raises is not None:
raise self.raises
if self.fixed_output is not None:
return {self.output_key: self.fixed_output.copy()}
rng = np.random.default_rng(0xCAFEBABE)
tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16)
return {
self.output_key: tensor.reshape(1, self.descriptor_dim).copy()
}
def release_engine(self, handle: EngineHandle) -> None:
_ = handle
def current_runtime_label(
self,
) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]:
return self.runtime_label
@dataclass
class _FakeDescriptorIndex:
descriptor_dim_value: int = DESCRIPTOR_DIM
results: list[tuple[tuple[int, float, float], float]] = field(
default_factory=list
)
raises: BaseException | None = None
def search_topk(
self, query: np.ndarray, k: int
) -> list[tuple[tuple[int, float, float], float]]:
_ = query
if self.raises is not None:
raise self.raises
if not self.results:
return [
((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i)
for i in range(k)
]
return list(self.results[:k])
def descriptor_dim(self) -> int:
return self.descriptor_dim_value
def _make_frame(*, frame_id: int = 4242, h: int = 720, w: int = 1280) -> NavCameraFrame:
rng = np.random.default_rng(frame_id)
image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8)
return NavCameraFrame(
frame_id=frame_id,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=image,
camera_calibration_id="test_cam",
)
def _make_calibration(*, cx: float = 640.0, cy: float = 360.0) -> CameraCalibration:
"""Return a calibration with a non-trivial principal point.
The identity matrix used elsewhere in the tests collapses to
``(cx, cy) == (0, 0)`` which the preprocessor treats as
"no calibration data" - here we set explicit values to exercise
the principal-point-aware crop path.
"""
intrinsics = np.array(
[
[1000.0, 0.0, cx],
[0.0, 1000.0, cy],
[0.0, 0.0, 1.0],
],
dtype=np.float64,
)
return CameraCalibration(
camera_id="test_cam",
intrinsics_3x3=intrinsics,
distortion=np.zeros(5, dtype=np.float64),
body_to_camera_se3=np.eye(4, dtype=np.float64),
acquisition_method="test_fixture",
)
def _make_calibration_identity() -> CameraCalibration:
"""Identity intrinsics - principal point collapses to (0, 0)."""
return CameraCalibration(
camera_id="test_cam",
intrinsics_3x3=np.eye(3, dtype=np.float64),
distortion=np.zeros(5, dtype=np.float64),
body_to_camera_se3=np.eye(4, dtype=np.float64),
acquisition_method="test_fixture",
)
def _make_fdr_client() -> FdrClient:
return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False)
def _build_strategy(
*,
inference_runtime: _FakeInferenceRuntime | None = None,
descriptor_index: _FakeDescriptorIndex | None = None,
normaliser: DescriptorNormaliser | None = None,
preprocessor: UltraVprBackbonePreprocessor | None = None,
fdr_client: FdrClient | None = None,
clock: _StubClock | None = None,
descriptor_dim: int = DESCRIPTOR_DIM,
) -> UltraVprStrategy:
inference_runtime = inference_runtime or _FakeInferenceRuntime(
descriptor_dim=descriptor_dim
)
descriptor_index = descriptor_index or _FakeDescriptorIndex(
descriptor_dim_value=descriptor_dim
)
normaliser = normaliser or DescriptorNormaliser()
preprocessor = preprocessor or UltraVprBackbonePreprocessor()
fdr_client = fdr_client or _make_fdr_client()
clock = clock or _StubClock()
handle = _FakeEngineHandle()
bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=descriptor_dim,
warn_top1_threshold=0.30,
debug_log_per_frame_distances=False,
fdr_client=fdr_client,
logger=logging.getLogger("test.bridge"),
clock=clock,
)
return UltraVprStrategy(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=normaliser,
faiss_bridge=bridge,
fdr_client=fdr_client,
clock=clock,
logger=logging.getLogger("test.ultra_vpr"),
descriptor_dim=descriptor_dim,
)
def _build_config() -> Config:
"""Minimal Config carrying only the c2_vpr block needed by ``create()``."""
c2 = C2VprConfig(
strategy="ultra_vpr",
backbone_weights_path=Path("/models/ultra_vpr.trt"),
faiss_index_path=Path("/cache/vpr/index.faiss"),
warn_top1_threshold=0.30,
debug_per_frame_distances=False,
)
cfg = MagicMock(spec=Config)
cfg.components = {"c2_vpr": c2}
return cfg
# ---------------------------------------------------------------------------
# AC-1: Protocol conformance
# ---------------------------------------------------------------------------
def test_ac1_protocol_conformance() -> None:
strategy = _build_strategy()
assert isinstance(strategy, VprStrategy)
# ---------------------------------------------------------------------------
# AC-2: embed_query produces L2-normalised FP16 (512,) embedding
# ---------------------------------------------------------------------------
def test_ac2_embed_query_returns_unit_norm_fp16_512() -> None:
# Arrange
runtime = _FakeInferenceRuntime(descriptor_dim=DESCRIPTOR_DIM)
strategy = _build_strategy(inference_runtime=runtime)
frame = _make_frame()
calibration = _make_calibration()
# Act
query = strategy.embed_query(frame, calibration)
# Assert
embedding = np.asarray(query.embedding)
assert embedding.shape == (DESCRIPTOR_DIM,)
assert embedding.dtype == np.float16
norm = float(np.linalg.norm(embedding.astype(np.float32)))
assert norm == pytest.approx(1.0, abs=1e-3)
def test_ac2_embedding_is_single_stage_l2_no_intra_cluster_path() -> None:
"""UltraVPR is single-stage L2 (unlike NetVLAD's two-stage chain).
Calling :meth:`DescriptorNormaliser.intra_cluster_normalise` would
be a bug; verify the strategy never invokes it.
"""
calls: list[str] = []
class _SpyNormaliser(DescriptorNormaliser):
def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray: # type: ignore[override]
calls.append("l2_normalise")
return DescriptorNormaliser.l2_normalise(descriptor)
def intra_cluster_normalise( # type: ignore[override]
self, descriptor: np.ndarray, num_clusters: int
) -> np.ndarray:
calls.append("intra_cluster_normalise")
return DescriptorNormaliser.intra_cluster_normalise(
descriptor, num_clusters
)
spy = _SpyNormaliser()
strategy = _build_strategy(normaliser=spy)
strategy.embed_query(_make_frame(), _make_calibration())
assert "intra_cluster_normalise" not in calls
assert calls == ["l2_normalise"]
# ---------------------------------------------------------------------------
# AC-3: embed_query is deterministic
# ---------------------------------------------------------------------------
def test_ac3_embed_query_deterministic_for_same_frame() -> None:
fixed = np.zeros((1, DESCRIPTOR_DIM), dtype=np.float16)
rng = np.random.default_rng(2026)
fixed[0] = rng.standard_normal(DESCRIPTOR_DIM).astype(np.float16)
runtime = _FakeInferenceRuntime(
descriptor_dim=DESCRIPTOR_DIM, fixed_output=fixed
)
strategy = _build_strategy(inference_runtime=runtime)
frame = _make_frame()
calibration = _make_calibration()
first = strategy.embed_query(frame, calibration)
second = strategy.embed_query(frame, calibration)
third = strategy.embed_query(frame, calibration)
np.testing.assert_array_equal(
np.asarray(first.embedding), np.asarray(second.embedding)
)
np.testing.assert_array_equal(
np.asarray(second.embedding), np.asarray(third.embedding)
)
# ---------------------------------------------------------------------------
# AC-4: retrieve_topk returns exactly k candidates sorted ascending
# ---------------------------------------------------------------------------
def test_ac4_retrieve_topk_returns_exactly_k_with_ultra_vpr_label() -> None:
descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM)
strategy = _build_strategy(descriptor_index=descriptor_index)
query = strategy.embed_query(_make_frame(), _make_calibration())
result = strategy.retrieve_topk(query, k=10)
assert len(result.candidates) == 10
assert result.backbone_label == "ultra_vpr"
assert result.candidates[0].descriptor_dim == DESCRIPTOR_DIM
distances = [c.descriptor_distance for c in result.candidates]
assert distances == sorted(distances)
# ---------------------------------------------------------------------------
# AC-5: descriptor_dim() is stable and returns 512
# ---------------------------------------------------------------------------
def test_ac5_descriptor_dim_stable_returns_512() -> None:
strategy = _build_strategy()
for _ in range(100):
assert strategy.descriptor_dim() == 512
# ---------------------------------------------------------------------------
# AC-6: Engine output shape mismatch at create() -> ConfigError
# ---------------------------------------------------------------------------
def test_ac6_create_rejects_engine_output_shape_mismatch() -> None:
# Arrange - engine produces (1, 256), expected (1, 512)
wrong = np.zeros((1, 256), dtype=np.float16)
runtime = _FakeInferenceRuntime(
descriptor_dim=DESCRIPTOR_DIM, fixed_output=wrong
)
descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM)
fdr_client = _make_fdr_client()
config = _build_config()
# Act + Assert
with pytest.raises(
ConfigError, match=r"engine output shape mismatch.*\(1, 512\).*\(1, 256\)"
):
create(
config,
descriptor_index=descriptor_index,
inference_runtime=runtime,
fdr_client=fdr_client,
clock=_StubClock(),
)
def test_ac6_create_rejects_engine_with_missing_embedding_key() -> None:
runtime = _FakeInferenceRuntime(
descriptor_dim=DESCRIPTOR_DIM, output_key="wrong_key"
)
with pytest.raises(ConfigError, match=r"'embedding' key absent"):
create(
_build_config(),
descriptor_index=_FakeDescriptorIndex(
descriptor_dim_value=DESCRIPTOR_DIM
),
inference_runtime=runtime,
fdr_client=_make_fdr_client(),
clock=_StubClock(),
)
# ---------------------------------------------------------------------------
# AC-7: VprBackboneError on forward-pass failure
# ---------------------------------------------------------------------------
def test_ac7_runtime_error_yields_vpr_backbone_error(
caplog: pytest.LogCaptureFixture,
) -> None:
runtime = _FakeInferenceRuntime(
descriptor_dim=DESCRIPTOR_DIM, raises=RuntimeError("CUDA OOM")
)
fdr_client = _make_fdr_client()
strategy = _build_strategy(
inference_runtime=runtime, fdr_client=fdr_client
)
with caplog.at_level(logging.ERROR, logger="test.ultra_vpr"):
with pytest.raises(VprBackboneError):
strategy.embed_query(_make_frame(), _make_calibration())
assert any(
record.levelno == logging.ERROR
and getattr(record, "kind", None) == "c2.vpr.backbone_error"
for record in caplog.records
)
records = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"]
assert len(backbone_errors) == 1
def test_ac7_missing_embedding_key_yields_vpr_backbone_error() -> None:
runtime = _FakeInferenceRuntime(
descriptor_dim=DESCRIPTOR_DIM, output_key="not_embedding"
)
strategy = _build_strategy(inference_runtime=runtime)
with pytest.raises(VprBackboneError, match=r"'embedding' key"):
strategy.embed_query(_make_frame(), _make_calibration())
def test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error() -> None:
bad = np.zeros((1, 256), dtype=np.float16)
runtime = _FakeInferenceRuntime(
descriptor_dim=DESCRIPTOR_DIM, fixed_output=bad
)
strategy = _build_strategy(inference_runtime=runtime)
with pytest.raises(VprBackboneError, match=r"expected \(1, 512\)"):
strategy.embed_query(_make_frame(), _make_calibration())
# ---------------------------------------------------------------------------
# AC-8: VprPreprocessError on corrupt image bytes
# ---------------------------------------------------------------------------
def test_ac8_corrupt_image_yields_vpr_preprocess_error(
caplog: pytest.LogCaptureFixture,
) -> None:
fdr_client = _make_fdr_client()
strategy = _build_strategy(fdr_client=fdr_client)
frame = NavCameraFrame(
frame_id=4242,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image="not-an-array",
camera_calibration_id="test_cam",
)
with caplog.at_level(logging.ERROR, logger="test.ultra_vpr"):
with pytest.raises(VprPreprocessError):
strategy.embed_query(frame, _make_calibration())
assert any(
record.levelno == logging.ERROR
and getattr(record, "kind", None) == "c2.vpr.preprocess_error"
for record in caplog.records
)
records = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
preprocess_errors = [
r for r in records if r.kind == "vpr.preprocess_error"
]
assert len(preprocess_errors) == 1
def test_ac8_wrong_dtype_image_yields_vpr_preprocess_error() -> None:
strategy = _build_strategy()
bad_image = np.zeros((720, 1280, 3), dtype=np.float32)
frame = NavCameraFrame(
frame_id=42,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=bad_image,
camera_calibration_id="test_cam",
)
with pytest.raises(VprPreprocessError, match=r"uint8"):
strategy.embed_query(frame, _make_calibration())
# ---------------------------------------------------------------------------
# AC-9: Calibration absent / identity -> centre-crop fallback + WARN log
# ---------------------------------------------------------------------------
def test_ac9_identity_calibration_falls_back_to_geometric_centre(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Identity intrinsics produce ``(cx, cy) == (0, 0)`` which the
preprocessor treats as missing calibration data.
"""
preprocessor_logger = logging.getLogger("test.ultra_vpr.pp")
preprocessor = UltraVprBackbonePreprocessor(logger=preprocessor_logger)
strategy = _build_strategy(preprocessor=preprocessor)
with caplog.at_level(logging.WARNING, logger="test.ultra_vpr.pp"):
query = strategy.embed_query(
_make_frame(), _make_calibration_identity()
)
warn_records = [
r
for r in caplog.records
if getattr(r, "kind", None) == "c2.vpr.calibration_missing"
]
assert len(warn_records) == 1
# AC-2 still holds with the fallback path
norm = float(np.linalg.norm(np.asarray(query.embedding).astype(np.float32)))
assert norm == pytest.approx(1.0, abs=1e-3)
def test_ac9_principal_point_offset_changes_crop_window() -> None:
"""The principal-point-aware crop produces a different output than
the geometric-centre crop when the principal point is non-central.
"""
rng = np.random.default_rng(0xABCD)
image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8)
frame = NavCameraFrame(
frame_id=1,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=image,
camera_calibration_id="cam",
)
pp = UltraVprBackbonePreprocessor()
cal_centre = _make_calibration(cx=640.0, cy=360.0)
cal_offset = _make_calibration(cx=900.0, cy=200.0)
out_centre = pp.preprocess(frame, cal_centre)
out_offset = pp.preprocess(frame, cal_offset)
assert not np.array_equal(out_centre, out_offset)
# ---------------------------------------------------------------------------
# AC-10: IndexUnavailableError propagated unchanged from retrieve_topk
# ---------------------------------------------------------------------------
def test_ac10_index_unavailable_propagates_unchanged() -> None:
err = IndexUnavailableError("stale handle")
descriptor_index = _FakeDescriptorIndex(
descriptor_dim_value=DESCRIPTOR_DIM, raises=err
)
strategy = _build_strategy(descriptor_index=descriptor_index)
query = strategy.embed_query(_make_frame(), _make_calibration())
with pytest.raises(IndexUnavailableError, match=r"stale handle"):
strategy.retrieve_topk(query, k=10)
# ---------------------------------------------------------------------------
# AC-11: composition-root wiring + INFO log "c2.vpr.ready"
# ---------------------------------------------------------------------------
def test_ac11_create_emits_strategy_ready_info_log(
caplog: pytest.LogCaptureFixture,
) -> None:
runtime = _FakeInferenceRuntime(descriptor_dim=DESCRIPTOR_DIM)
descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM)
fdr_client = _make_fdr_client()
config = _build_config()
logger = logging.getLogger("test.ultra_vpr.create")
with caplog.at_level(logging.INFO, logger="test.ultra_vpr.create"):
strategy = create(
config,
descriptor_index=descriptor_index,
inference_runtime=runtime,
fdr_client=fdr_client,
clock=_StubClock(),
logger=logger,
)
assert isinstance(strategy, UltraVprStrategy)
assert strategy.descriptor_dim() == 512
ready_logs = [
r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready"
]
assert len(ready_logs) == 1
kv = ready_logs[0].kv # type: ignore[attr-defined]
assert kv["strategy"] == "ultra_vpr"
assert kv["descriptor_dim"] == 512
def test_ac11_non_trt_runtime_rejected_at_create() -> None:
runtime = _FakeInferenceRuntime(
descriptor_dim=DESCRIPTOR_DIM, runtime_label="pytorch_fp16"
)
config = _build_config()
with pytest.raises(ConfigError, match=r"BUILD_TENSORRT_RUNTIME=ON"):
create(
config,
descriptor_index=_FakeDescriptorIndex(
descriptor_dim_value=DESCRIPTOR_DIM
),
inference_runtime=runtime,
fdr_client=_make_fdr_client(),
clock=_StubClock(),
)
def test_ac11_onnx_trt_ep_runtime_accepted_at_create() -> None:
"""ONNX-Runtime is the documented fallback (per AZ-337 description)."""
runtime = _FakeInferenceRuntime(
descriptor_dim=DESCRIPTOR_DIM, runtime_label="onnx_trt_ep"
)
strategy = create(
_build_config(),
descriptor_index=_FakeDescriptorIndex(
descriptor_dim_value=DESCRIPTOR_DIM
),
inference_runtime=runtime,
fdr_client=_make_fdr_client(),
clock=_StubClock(),
)
assert isinstance(strategy, UltraVprStrategy)
# ---------------------------------------------------------------------------
# AC-12: WARN log on top-1 distance above threshold (delegated to FaissBridge)
# ---------------------------------------------------------------------------
def test_ac12_top1_above_threshold_emits_warn_via_faiss_bridge(
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange - corpus returns top-1 distance 0.42 > 0.30 default threshold
descriptor_index = _FakeDescriptorIndex(
descriptor_dim_value=DESCRIPTOR_DIM,
results=[
((1, 49.0, 36.0), 0.42),
((2, 49.001, 36.001), 0.51),
((3, 49.002, 36.002), 0.65),
],
)
strategy = _build_strategy(descriptor_index=descriptor_index)
query = strategy.embed_query(_make_frame(), _make_calibration())
with caplog.at_level(logging.WARNING, logger="test.bridge"):
strategy.retrieve_topk(query, k=3)
warn_records = [
r
for r in caplog.records
if getattr(r, "kind", None) == "c2.vpr.top1_distance_above_threshold"
]
assert len(warn_records) == 1
kv = warn_records[0].kv # type: ignore[attr-defined]
assert kv["distance"] == pytest.approx(0.42)
assert kv["threshold"] == pytest.approx(0.30)
assert kv["backbone_label"] == "ultra_vpr"
# ---------------------------------------------------------------------------
# Preprocessor contract
# ---------------------------------------------------------------------------
def test_preprocessor_output_shape_and_dtype() -> None:
pp = UltraVprBackbonePreprocessor()
rng = np.random.default_rng(2026)
image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8)
frame = NavCameraFrame(
frame_id=1,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=image,
camera_calibration_id="cam",
)
out = pp.preprocess(frame, _make_calibration())
assert out.shape == (1, 3, 384, 384)
assert out.dtype == np.float16
def test_preprocessor_input_shape_is_384x384() -> None:
pp = UltraVprBackbonePreprocessor()
assert pp.input_shape() == (384, 384)
def test_preprocessor_protocol_conformance() -> None:
pp = UltraVprBackbonePreprocessor()
assert isinstance(pp, BackbonePreprocessor)
def test_preprocessor_accepts_grayscale_input() -> None:
pp = UltraVprBackbonePreprocessor()
gray = np.zeros((512, 512), dtype=np.uint8)
frame = NavCameraFrame(
frame_id=1,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=gray,
camera_calibration_id="cam",
)
out = pp.preprocess(frame, _make_calibration())
assert out.shape == (1, 3, 384, 384)
def test_preprocessor_mean_std_correct_on_grey_image() -> None:
"""A uniform-grey image should produce per-channel ``(grey - mean) / std``."""
pp = UltraVprBackbonePreprocessor()
grey = np.full((512, 512, 3), 128, dtype=np.uint8)
frame = NavCameraFrame(
frame_id=1,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=grey,
camera_calibration_id="cam",
)
out = pp.preprocess(frame, _make_calibration())
mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
expected = (128.0 / 255.0 - mean) / std
actual_per_channel = (
out[0].astype(np.float32).reshape(3, -1).mean(axis=1)
)
np.testing.assert_allclose(actual_per_channel, expected, atol=1e-2)
# ---------------------------------------------------------------------------
# Constructor validation
# ---------------------------------------------------------------------------
def _make_minimal_strategy_kwargs(*, descriptor_dim: int) -> dict[str, Any]:
"""Build kwargs that pass FaissBridge guards.
The strategy carries its own ``descriptor_dim`` validation; the
bridge has a separate (stricter) ``descriptor_dim > 0`` guard.
Tests that exercise the strategy's own validators MUST use a bridge
with a valid dim.
"""
fdr_client = _make_fdr_client()
clock = _StubClock()
bridge = FaissBridge(
descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=512),
descriptor_dim=512,
warn_top1_threshold=0.30,
debug_log_per_frame_distances=False,
fdr_client=fdr_client,
logger=logging.getLogger("test.bridge.guard"),
clock=clock,
)
return {
"inference_runtime": _FakeInferenceRuntime(),
"engine_handle": _FakeEngineHandle(),
"descriptor_index": _FakeDescriptorIndex(),
"preprocessor": UltraVprBackbonePreprocessor(),
"normaliser": DescriptorNormaliser(),
"faiss_bridge": bridge,
"fdr_client": fdr_client,
"clock": clock,
"logger": logging.getLogger("test.ultra_vpr.guard"),
"descriptor_dim": descriptor_dim,
}
def test_constructor_rejects_zero_descriptor_dim() -> None:
with pytest.raises(ValueError, match=r">= 1"):
UltraVprStrategy(**_make_minimal_strategy_kwargs(descriptor_dim=0))
def test_constructor_rejects_negative_descriptor_dim() -> None:
with pytest.raises(ValueError, match=r">= 1"):
UltraVprStrategy(**_make_minimal_strategy_kwargs(descriptor_dim=-5))
# ---------------------------------------------------------------------------
# FDR record emission
# ---------------------------------------------------------------------------
def test_embed_query_emits_vpr_embed_query_fdr_record() -> None:
fdr_client = _make_fdr_client()
strategy = _build_strategy(fdr_client=fdr_client)
strategy.embed_query(_make_frame(), _make_calibration())
records = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
embed_records = [r for r in records if r.kind == "vpr.embed_query"]
assert len(embed_records) == 1
payload = embed_records[0].payload
assert payload["backbone_label"] == "ultra_vpr"
assert payload["descriptor_dim"] == 512
assert isinstance(payload["latency_us"], int)
assert payload["latency_us"] > 0
def test_create_does_not_register_pytorch_architecture() -> None:
"""UltraVPR uses a TRT engine - no PyTorch architecture registration.
Verifies the strategy module does NOT expose ``MODEL_NAME`` /
``architecture_factory`` attributes (which would trigger registration
in the composition root).
"""
import gps_denied_onboard.components.c2_vpr.ultra_vpr as mod
assert not hasattr(mod, "MODEL_NAME")
assert not hasattr(mod, "architecture_factory")