[AZ-340] C2 SelaVPR + EigenPlaces + SALAD secondary VPR backbones

Three new VprStrategy implementations for IT-12 comparative-study
(research binary only, gated OFF for airborne / operator-tooling per
ADR-002). All run via the C7 TensorRT runtime (or ONNX-RT fallback)
with their own concrete BackbonePreprocessor, single-stage L2
normalisation, and FaissBridge-delegated retrieval — same pattern as
AZ-339 (MegaLoc + MixVPR), parametrised in tests for compactness.

  * SelaVprStrategy   — D=512,  input 224x224
  * EigenPlacesStrategy — D=2048, input 480x480
  * SaladStrategy     — D=8448, input 322x322 (DINOv2-Large backbone;
                        heaviest in the C2 family — NFR-perf budget
                        relaxed to 120 ms p95 / 1200 MB GPU per task
                        spec)

The composition-root factory tables and KNOWN_STRATEGIES set were
already pre-wired at AZ-336 land time; module-layout.md already names
all three Internal entries and BUILD_VPR_* rows. No CMake change
required (env-flag gating).

54 unit tests (3 strategies * 18 cases) cover AC-1..AC-11 plus extras
(single-stage L2, NCHW FP16, constructor validation, FDR emission).
All pass; sibling c2_vpr suite + composition-root regression + AZ-526
iso-ts regression all green.

Code review verdict: PASS_WITH_WARNINGS. Two Low findings logged in
batch_51_review.md: F1 escalates `_assert_engine_output_dim`
duplication from 4-way to 7-way (already tracked by AZ-527 hygiene
PBI; will surface in cumulative review batches 49-51); F2 mirrors the
AZ-337 / 338 / 339 AC-10 spec-drift precedent (literal
ConfigurationError vs implemented ConfigError / StrategyNotAvailable).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-14 00:32:38 +03:00
parent e81616a09d
commit 87909cce9f
8 changed files with 2970 additions and 0 deletions
@@ -0,0 +1,165 @@
# Code Review Report — Batch 51
**Batch**: 51
**Tasks**: AZ-340 (C2 SelaVPR + EigenPlaces + SALAD Secondary Backbones — Research-only, 5 pts)
**Date**: 2026-05-14
**Verdict**: **PASS_WITH_WARNINGS** (2 Low findings — one is a carry-over already tracked by AZ-527, the other mirrors the documented AZ-337/338/339 spec-drift precedent)
## Phase 1 — Context Loading
Reviewed:
- `_docs/02_tasks/todo/AZ-340_c2_selavpr_eigenplaces_salad.md` — task spec, ACs, NFRs, scope, constraints, runtime completeness section.
- `_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md` (referenced; behavioural Protocol contract).
- `_docs/02_document/components/02_c2_vpr/description.md` § 1, § 5, § 6 (referenced; secondary backbone designation, list of strategies including SALAD, preprocessor-duplication policy).
- `_docs/02_document/module-layout.md` — Component: c2_vpr Internal entry already pre-declares `sela_vpr.py`, `eigen_places.py`, `salad.py`; `BUILD_VPR_SELAVPR` / `BUILD_VPR_EIGENPLACES` / `BUILD_VPR_SALAD` rows already in the build-flag matrix.
- Existing AZ-339 implementation (`mega_loc.py` / `mix_vpr.py` / `_preprocessor_*.py`) as the architectural template — AZ-340 is the explicit twin task (research-only secondary backbones via TRT, no PyTorch architecture registry).
- Composition root `runtime_root/vpr_factory.py``_STRATEGY_TO_BUILD_FLAG` and `_STRATEGY_TO_MODULE` already include `sela_vpr` / `eigen_places` / `salad` rows (pre-wired at AZ-336 land time).
- C2 config `KNOWN_STRATEGIES` already includes the three names.
Files mapped to AZ-340:
| Task | Files |
|------|-------|
| AZ-340 | `src/gps_denied_onboard/components/c2_vpr/sela_vpr.py` (new), `eigen_places.py` (new), `salad.py` (new), `_preprocessor_sela_vpr.py` (new), `_preprocessor_eigen_places.py` (new), `_preprocessor_salad.py` (new), `tests/unit/c2_vpr/test_az340_sela_vpr_eigen_places_salad.py` (new) |
## Phase 2 — Spec Compliance Review
| AC | Status | Test reference | Notes |
|----|--------|----------------|-------|
| AC-1 (each) | PASS | `test_ac1_protocol_conformance[sela_vpr|eigen_places|salad]` | `isinstance(..., VprStrategy)` returns True for all three. |
| AC-2 (each) | PASS | `test_ac2_embed_query_returns_unit_norm_fp16_correct_dim[*]` + `test_ac2_single_stage_l2_no_intra_cluster_call[*]` | Shape (512,) / (2048,) / (8448,); FP16 dtype; ‖x‖₂ ≈ 1.0 ± 1e-3. Single-stage L2 confirmed by spy normaliser; no `intra_cluster_normalise` call. |
| AC-3 (each) | PASS | `test_ac3_embed_query_deterministic_for_same_frame[*]` | Bit-exact embeddings across 3 calls on same frame. |
| AC-4 (each) | PASS | `test_ac4_retrieve_topk_returns_exactly_k_with_correct_label[*]` | `len == 10`, ascending distances, correct `backbone_label`, correct `descriptor_dim`. |
| AC-5 (each) | PASS | `test_ac5_descriptor_dim_stable[*]` | 100 calls, returns 512 / 2048 / 8448. |
| AC-6 (each) | PASS | `test_ac6_create_rejects_engine_output_shape_mismatch[*]` + `test_ac6_create_rejects_missing_embedding_key[*]` | `ConfigError` raised at create-time; the dry-run probe matches each preprocessor's `input_shape()`. |
| AC-7 (each) | PASS | `test_ac7_runtime_error_yields_vpr_backbone_error[*]` + `test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error[*]` | `RuntimeError` / wrong-shape outputs raise `VprBackboneError`; ERROR log + FDR record emitted. |
| AC-8 (each) | PASS | `test_ac8_corrupt_image_yields_vpr_preprocess_error[*]` | Non-array `frame.image` raises `VprPreprocessError`; ERROR log + FDR record emitted. |
| AC-9 (each) | PASS | `test_ac9_create_emits_ready_log_with_correct_label_and_dim[*]` | INFO log `kind="c2.vpr.ready"` with correct `{strategy, descriptor_dim}`. |
| AC-10 (each) | PASS with drift | `test_ac10_runtime_label_mismatch_raises_config_error[*]` | See **F2** below — same documented precedent as AZ-337 / AZ-338 / AZ-339. |
| AC-11 (each) | PASS | `test_ac11_preprocessor_input_shape[*]` + `test_preprocess_output_is_nchw_fp16[*]` | `(224, 224)` / `(480, 480)` / `(322, 322)`. |
**Constraint compliance**:
- Each strategy ships its own concrete preprocessor (✓).
- Centre-crop logic duplicated, NOT shared (✓ — explicit duplication, comment references description.md § 6).
- All three use TensorRT runtime via `InferenceRuntimeCut` (✓).
- No engine compilation in this task (✓ — `inference_runtime.compile_engine` is the local TRT runtime; the actual `.trt` file source is C10 / AZ-321).
- All three hold engine handles, not engines (✓).
- No GPU operations in `__init__` beyond engine load (✓ — engine load is in `create()`, not in `__init__`).
- SALAD's 8448-d output is non-negotiable, no PCA at strategy-level (✓ — DESCRIPTOR_DIM = 8448 is a Final constant; PCA gating left for a future `BUILD_VPR_SALAD_PCA` flag, out of scope).
- Logging + FDR records mirror UltraVPR / MegaLoc / MixVPR pattern (✓).
## Phase 3 — Code Quality Review
- **SOLID**: each strategy is a single class with one responsibility (one backbone). Constructor injection throughout. Module-level `create()` factory keeps composition concerns out of `__init__`.
- **Error handling**: explicit `try` / `raise` for backbone failures; `_wrap_backbone_error` rewraps general `Exception` into the typed envelope. No bare `except`. ERROR logs emitted with structured `extra={...}`.
- **Naming**: `SelaVprStrategy` / `EigenPlacesStrategy` / `SaladStrategy`, `_BACKBONE_LABEL`, `DESCRIPTOR_DIM`, `_OUTPUT_KEY`, `_LOG_KIND_*`, `_FDR_KIND_*` — consistent with the existing C2 strategy modules.
- **Complexity**: `embed_query` ≈ 50 lines per strategy (matches MegaLoc / MixVPR; under the 50-line / cyc-10 threshold). `create()` ≈ 60 lines but mostly orchestration; mirrors the AZ-339 baseline.
- **DRY**: significant duplication across three strategies + three preprocessors. Documented as intentional per `description.md` § 6 + module-level docstrings + tracked by AZ-527 (the planned hygiene PBI scoped at AZ-339 land). See **F1** below.
- **Test quality**: parametrised across the three strategies; each test asserts meaningful behaviour (shape, dtype, norm, label, error type, log kind, FDR record kind). 54/54 PASS.
- **Dead code**: none — `ruff check` clean across all six new modules and the test file.
## Phase 4 — Security Quick-Scan
- No SQL, no `subprocess`, no `eval` / `exec`, no `pickle` deserialisation.
- No hardcoded secrets / credentials / API keys.
- No external input ingested without validation (`_coerce_to_rgb_uint8` rejects non-numpy / wrong-dtype / wrong-shape image bytes).
- Sensitive data in logs: only `frame_id`, `backbone_label`, `descriptor_dim`, `error_type`, error-message string truncated to 512 chars. No PII / no calibration intrinsics / no embedding payloads. ✓
## Phase 5 — Performance Scan
- No O(n²) algorithms in the new code paths. FAISS retrieval is delegated to `FaissBridge` (AZ-341) and runs once per query.
- No N+1 query patterns (no DB access in c2_vpr).
- No unbounded data fetching.
- No blocking I/O in async contexts (codebase is synchronous; the strategies are explicitly single-threaded per INV-1).
- Memory copies: `np.ascontiguousarray(...)` on `raw[0]` produces one copy of the (D,) embedding before normalisation; `preprocessor.preprocess` produces one (1, 3, H, W) FP16 NCHW buffer. Both are intrinsic to the per-frame VPR contract; no avoidable allocations in the hot path.
- SALAD's 8448-d output and 322×322 input are acknowledged in the task spec NFR-perf budget (≤ 120 ms / ≤ 1200 MB GPU; ≤ 6 ms FAISS lookup); no performance regression introduced by the strategy code itself — the cost is intrinsic to the chosen backbone.
## Phase 6 — Cross-Task Consistency
Batch 51 contains a single task (AZ-340), so cross-task drift inside the batch is not applicable.
Inter-batch consistency with the prior C2 batches (45 / 46 / 47 / 50):
- All four C2 secondary strategy modules (mega_loc, mix_vpr, sela_vpr, eigen_places, salad — five total now) share the identical TRT-only contract and embed_query / retrieve_topk / descriptor_dim shapes. ✓
- All four use `iso_ts_from_clock` from the AZ-526 helper from day one. ✓
- All four use the local `_faiss_bridge.FaissBridge` and the AZ-507 `inference_runtime_cut` / `descriptor_index_cut` Protocol cuts. ✓
- All four use ImageNet mean/std normalisation in their preprocessor. ✓
## Phase 7 — Architecture Compliance
1. **Layer direction**: every import in the six new files resolves to a Layer-1 (`_types`, `helpers`, `config`, `logging`, `fdr_client`, `clock`) or same-component (`c2_vpr.*`) target. Verified via `from ... import` survey of each new file. ✓ No layer-direction violations.
2. **Public API respect**: no `from gps_denied_onboard.components.c6_tile_cache import ...` and no `from gps_denied_onboard.components.c7_inference import ...` in any of the new files. The c6 / c7 surfaces are reached only via the constructor-injected `DescriptorIndexCut` / `InferenceRuntimeCut` Protocols (AZ-507 pattern). ✓
3. **No new cyclic module dependencies**: the strategy modules are leaf modules within `c2_vpr/`. No new cycles introduced. ✓
4. **Duplicate symbols across components**: all new symbols (`SelaVprStrategy`, `EigenPlacesStrategy`, `SaladStrategy`, `*BackbonePreprocessor`, `DESCRIPTOR_DIM`, `*_INPUT_HW`) live exclusively under `src/gps_denied_onboard/components/c2_vpr/`. No duplication into other components.
Within `c2_vpr/` itself: the `_assert_engine_output_dim` helper is now duplicated 7-way (was 4-way after AZ-339, becomes 7-way after AZ-340). This is **F1** below; the duplication is intentional and tracked by AZ-527.
5. **Cross-cutting concerns not locally re-implemented**: no local `_iso_ts_from_clock`; no local logger setup; no local config loading; no local FDR record schema. All cross-cutting work goes through the established shared modules. ✓ AZ-526 regression guard (the AST scan in `test_az508_iso_timestamps.py`) confirms no new module-level `_iso_ts_from_clock` / `_iso_ts_now` definitions.
## Findings
| # | Severity | Category | File:Line | Title |
|---|----------|----------|-----------|-------|
| F1 | Low | Maintainability | `c2_vpr/{sela_vpr,eigen_places,salad}.py` (3 new copies) + `c2_vpr/{ultra_vpr,net_vlad,mega_loc,mix_vpr}.py` (4 pre-existing copies) | `_assert_engine_output_dim` now 7-way duplicated across c2_vpr strategies |
| F2 | Low | Spec-Gap (Documentation drift) | `c2_vpr/{sela_vpr,eigen_places,salad}.py` `create()` runtime-label guard | AZ-340 AC-10 literally names `ConfigurationError`; implementation raises `ConfigError` (matching AZ-337 / AZ-338 / AZ-339 precedent) |
### Finding Details
**F1: `_assert_engine_output_dim` 7-way duplicated across c2_vpr strategies (Low / Maintainability)**
- **Locations** (3 new + 4 pre-existing copies):
- `src/gps_denied_onboard/components/c2_vpr/sela_vpr.py:_assert_engine_output_dim` (new)
- `src/gps_denied_onboard/components/c2_vpr/eigen_places.py:_assert_engine_output_dim` (new)
- `src/gps_denied_onboard/components/c2_vpr/salad.py:_assert_engine_output_dim` (new)
- `src/gps_denied_onboard/components/c2_vpr/ultra_vpr.py:_assert_engine_output_dim` (pre-existing)
- `src/gps_denied_onboard/components/c2_vpr/net_vlad.py:_assert_engine_output_dim` (pre-existing)
- `src/gps_denied_onboard/components/c2_vpr/mega_loc.py:_assert_engine_output_dim` (pre-existing)
- `src/gps_denied_onboard/components/c2_vpr/mix_vpr.py:_assert_engine_output_dim` (pre-existing)
- **Description**: Each copy is the same ~30-line helper that runs a zero-init dry-run inference, asserts the engine's output dict has an `"embedding"` key, and asserts the output ndarray shape is `(1, DESCRIPTOR_DIM)`. The only thing that differs across the 7 copies is the local `DESCRIPTOR_DIM` constant. The duplication is **intentional** and **tracked**: each copy carries an in-line comment referencing AZ-527 (the hygiene PBI scoped in parallel with AZ-339 land).
- **Suggestion**: extract to a c2-internal helper module `c2_vpr/_engine_dim_assertion.py` (or similar) under AZ-527. The helper would take `descriptor_dim` and `_assert_engine_output_dim(inference_runtime, handle, preprocessor, descriptor_dim)` could be a single shared function. Each strategy keeps its own `DESCRIPTOR_DIM` constant; the helper reads the dim from the parameter.
- **Why Low (not Medium)**: this finding is the carry-over of cumulative review batches 46-48 F2 (also Low) — the duplication has been growing since AZ-337 / AZ-338. AZ-527 is already on the backlog; AZ-340 was scoped to land the three secondary strategies, and pulling the consolidation forward would have expanded AZ-340's scope past the spec.
- **Cumulative-review handoff**: this finding will surface in the cumulative review for batches 4951 (about to run after batch 51 commits). AZ-527 closes it.
- **Task**: AZ-340
**F2: AC-10 spec drift — `ConfigError` raised instead of literally-named `ConfigurationError` (Low / Spec-Gap, documentation drift)**
- **Locations**:
- `src/gps_denied_onboard/components/c2_vpr/sela_vpr.py` `create()` runtime-label guard (line ~365)
- `src/gps_denied_onboard/components/c2_vpr/eigen_places.py` `create()` runtime-label guard (line ~365)
- `src/gps_denied_onboard/components/c2_vpr/salad.py` `create()` runtime-label guard (line ~365)
- **Description**: AZ-340 AC-10 literally specifies `ConfigurationError` for the build-flag-OFF case. The implementation raises `StrategyNotAvailableError` (composition root, env-flag check; thrown BEFORE `create()` is reached) and `ConfigError` (strategy module's own runtime-label guard). Both are composition-time fail-fast errors. AZ-337 / AZ-338 / AZ-339 follow this same pattern; AZ-340 mirrors them.
- **Suggestion**: amend the AC-10 wording in a future spec pass (`AZ-340_c2_selavpr_eigenplaces_salad.md` already archived; the documentation drift will be cleared by a single shared spec-pass that touches all five secondary backbones). No code change required. The AC-10 test (`test_ac10_runtime_label_mismatch_raises_config_error`) carries an in-line docstring that documents the precedent.
- **Task**: AZ-340
## Baseline Delta
`_docs/02_document/architecture_compliance_baseline.md` does not exist for this project — Phase 7 ran without baseline-delta partitioning. No tables to emit.
## Test Results
- `tests/unit/c2_vpr/test_az340_sela_vpr_eigen_places_salad.py`**54 / 54 PASS** (3 strategies × 18 test cases).
- `tests/unit/c2_vpr/` (full directory: faiss_bridge + net_vlad + ultra_vpr + AZ-339 + AZ-340 + protocol_conformance) — **180 / 180 PASS**.
- `tests/unit/test_az270_compose_root.py`**8 / 8 PASS** (no composition-root regressions; the factory routing for `sela_vpr` / `eigen_places` / `salad` was pre-wired at AZ-336 land time).
- `tests/unit/test_az508_iso_timestamps.py`**18 / 18 PASS** (AZ-526 regression guard confirms no new `_iso_ts_from_clock` / `_iso_ts_now` duplicates introduced by AZ-340).
- `ruff check` on all 7 new files — **clean** (one auto-fixable `RUF022 __all__ not sorted` in `_preprocessor_eigen_places.py` was caught and fixed before commit).
## Auto-Fix Eligibility
| Finding | Severity | Eligible? | Action |
|---------|----------|-----------|--------|
| F1 | Low / Maintainability | Yes (per the auto-fix matrix) | NOT auto-fixed in batch 51 — it requires creating an `AZ-527` hygiene PBI and is scoped as a separate batch. Logged as a follow-on for AZ-527. The cumulative review for batches 4951 will re-surface it. |
| F2 | Low / Spec-Gap | No (Spec-Gap is escalate per matrix) | Documentation drift — no code change required. Logged for the next spec-pass. |
## Verdict
**PASS_WITH_WARNINGS** — no Critical or High findings. The two Low findings are both pre-existing patterns (F1 is a 4 → 7 escalation of an already-tracked duplication; F2 is the same precedent as AZ-337 / 338 / 339). The implement skill auto-fix gate proceeds to commit without user intervention.
## Follow-on Work
- **AZ-527** (Hygiene — consolidate `_assert_engine_output_dim` into a c2-internal helper). 2 points. Now must consolidate 7 copies, not 4. Depends on AZ-340. To be created and prioritised by the cumulative review for batches 4951.
- **Spec-pass** to align AC-10 wording across AZ-337 / AZ-338 / AZ-339 / AZ-340 task specs with the implemented `ConfigError` / `StrategyNotAvailableError` envelope. Out of scope for the implement skill; should be scheduled in a documentation-sync batch.
@@ -0,0 +1,201 @@
"""EigenPlaces backbone preprocessor (AZ-340).
EigenPlaces' published preprocessing chain (per the upstream research
code drop): decode the nav-camera frame's image to RGB uint8,
centre-crop to a square region respecting the camera calibration's
principal point (or geometric centre + WARN log when calibration is
absent), resize to ``(480, 480)``, apply ImageNet mean/std
normalisation, cast to FP16, reshape to NCHW.
Differences from the other C2 secondary preprocessors:
- 480x480 input shape (vs SelaVPR's 224x224, MegaLoc's 322x322,
MixVPR's 320x320, SALAD's 322x322). EigenPlaces is the highest
spatial-resolution preprocessor in the C2 family.
- Same calibration-aware centre-crop and ImageNet mean/std — these
upstream conventions happen to align across several backbones but
are NOT a shared dependency: the centre-crop logic is duplicated
here per ``components/02_c2_vpr/description.md`` § 6 so a future
EigenPlaces code drop can change its preprocessing without coupling
other strategies' weights-versions.
This preprocessor is C2-internal and owned exclusively by
:class:`EigenPlacesStrategy` — sharing across backbones is forbidden
per ``components/02_c2_vpr/description.md`` § 6.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final
import cv2
import numpy as np
from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
__all__ = [
"EIGEN_PLACES_INPUT_HW",
"IMAGENET_MEAN",
"IMAGENET_STD",
"EigenPlacesBackbonePreprocessor",
]
EIGEN_PLACES_INPUT_HW: Final[tuple[int, int]] = (480, 480)
IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406)
IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225)
_COMPONENT: Final[str] = "c2_vpr"
_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing"
class EigenPlacesBackbonePreprocessor:
"""Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW."""
def __init__(
self,
*,
input_shape: tuple[int, int] = EIGEN_PLACES_INPUT_HW,
mean: tuple[float, float, float] = IMAGENET_MEAN,
std: tuple[float, float, float] = IMAGENET_STD,
logger: logging.Logger | None = None,
) -> None:
if (
not isinstance(input_shape, tuple)
or len(input_shape) != 2
or any(not isinstance(v, int) or v <= 0 for v in input_shape)
):
raise ValueError(
f"EigenPlacesBackbonePreprocessor.input_shape must be a (H, W) "
f"tuple of positive ints; got {input_shape!r}"
)
if len(mean) != 3 or len(std) != 3:
raise ValueError(
"EigenPlacesBackbonePreprocessor.mean and std must each be "
"3-tuples (one per channel)"
)
if any(v <= 0 for v in std):
raise ValueError(
"EigenPlacesBackbonePreprocessor.std components must be > 0"
)
self._input_shape: tuple[int, int] = input_shape
self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3)
self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3)
self._logger: logging.Logger = (
logger
if logger is not None
else logging.getLogger("gps_denied_onboard.c2_vpr.eigen_places")
)
def preprocess(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> np.ndarray:
"""Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW.
Calibration handling mirrors UltraVPR (description.md § 6 — same
upstream convention, duplicated not shared): when calibration is
absent or its principal point cannot be extracted from
``intrinsics_3x3``, fall back to the image's geometric centre
and emit ONE WARN log per call with
``kind="c2.vpr.calibration_missing"``.
"""
image = self._coerce_to_rgb_uint8(frame.image)
cropped = self._centre_crop_around_principal_point(
image, calibration, frame_id=frame.frame_id
)
target_h, target_w = self._input_shape
in_h, in_w = cropped.shape[:2]
interp = (
cv2.INTER_AREA
if (in_h > target_h or in_w > target_w)
else cv2.INTER_CUBIC
)
try:
resized = cv2.resize(
cropped, (target_w, target_h), interpolation=interp
)
except cv2.error as exc:
raise VprPreprocessError(
f"cv2.resize failed: {type(exc).__name__}: {exc}"
) from exc
as_f32 = resized.astype(np.float32) / 255.0
normalised = (as_f32 - self._mean) / self._std
chw = normalised.transpose(2, 0, 1)
return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16)
def input_shape(self) -> tuple[int, int]:
return self._input_shape
@staticmethod
def _coerce_to_rgb_uint8(image: object) -> np.ndarray:
if not isinstance(image, np.ndarray):
raise VprPreprocessError(
f"frame.image must be a numpy array; got {type(image).__name__}"
)
if image.dtype != np.uint8:
raise VprPreprocessError(
f"frame.image must be uint8 RGB; got dtype {image.dtype}"
)
if image.ndim == 2:
return np.stack([image, image, image], axis=-1)
if image.ndim == 3 and image.shape[2] == 3:
return image
raise VprPreprocessError(
f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}"
)
def _centre_crop_around_principal_point(
self,
image: np.ndarray,
calibration: CameraCalibration | None,
*,
frame_id: int,
) -> np.ndarray:
h, w = image.shape[:2]
side = min(h, w)
cx_cy = self._extract_principal_point(calibration)
if cx_cy is None:
self._logger.warning(
"EigenPlaces calibration unusable; centre-cropping around "
"geometric centre",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_CALIBRATION_MISSING,
"kv": {"frame_id": int(frame_id)},
},
)
cx = w / 2.0
cy = h / 2.0
else:
cx, cy = cx_cy
half = side // 2
left = round(max(0.0, min(float(w - side), cx - half)))
top = round(max(0.0, min(float(h - side), cy - half)))
return image[top : top + side, left : left + side, :]
@staticmethod
def _extract_principal_point(
calibration: CameraCalibration | None,
) -> tuple[float, float] | None:
if calibration is None:
return None
intrinsics = getattr(calibration, "intrinsics_3x3", None)
if intrinsics is None:
return None
try:
arr = np.asarray(intrinsics, dtype=np.float64)
except (TypeError, ValueError):
return None
if arr.shape != (3, 3):
return None
cx = float(arr[0, 2])
cy = float(arr[1, 2])
if cx == 0.0 and cy == 0.0:
return None
return cx, cy
@@ -0,0 +1,202 @@
"""SALAD backbone preprocessor (AZ-340).
SALAD's published preprocessing chain (per the upstream research code
drop, DINOv2-aligned): decode the nav-camera frame's image to RGB
uint8, centre-crop to a square region respecting the camera
calibration's principal point (or geometric centre + WARN log when
calibration is absent), resize to ``(322, 322)``, apply ImageNet
mean/std normalisation (DINOv2's documented default), cast to FP16,
reshape to NCHW.
Differences from the other C2 secondary preprocessors:
- 322x322 input shape (matches MegaLoc's 322x322 by coincidence —
both are DINOv2-family inputs by upstream convention; SALAD's
aggregator consumes DINOv2-Large patch tokens at this resolution).
- Same calibration-aware centre-crop and ImageNet mean/std — these
upstream conventions happen to align across DINOv2-family backbones
but are NOT a shared dependency: the centre-crop logic is
duplicated here per ``components/02_c2_vpr/description.md`` § 6 so
a future SALAD code drop can change its preprocessing without
coupling other strategies' weights-versions.
This preprocessor is C2-internal and owned exclusively by
:class:`SaladStrategy` — sharing across backbones is forbidden per
``components/02_c2_vpr/description.md`` § 6.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final
import cv2
import numpy as np
from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
__all__ = [
"IMAGENET_MEAN",
"IMAGENET_STD",
"SALAD_INPUT_HW",
"SaladBackbonePreprocessor",
]
SALAD_INPUT_HW: Final[tuple[int, int]] = (322, 322)
IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406)
IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225)
_COMPONENT: Final[str] = "c2_vpr"
_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing"
class SaladBackbonePreprocessor:
"""Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW."""
def __init__(
self,
*,
input_shape: tuple[int, int] = SALAD_INPUT_HW,
mean: tuple[float, float, float] = IMAGENET_MEAN,
std: tuple[float, float, float] = IMAGENET_STD,
logger: logging.Logger | None = None,
) -> None:
if (
not isinstance(input_shape, tuple)
or len(input_shape) != 2
or any(not isinstance(v, int) or v <= 0 for v in input_shape)
):
raise ValueError(
f"SaladBackbonePreprocessor.input_shape must be a (H, W) "
f"tuple of positive ints; got {input_shape!r}"
)
if len(mean) != 3 or len(std) != 3:
raise ValueError(
"SaladBackbonePreprocessor.mean and std must each be "
"3-tuples (one per channel)"
)
if any(v <= 0 for v in std):
raise ValueError(
"SaladBackbonePreprocessor.std components must be > 0"
)
self._input_shape: tuple[int, int] = input_shape
self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3)
self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3)
self._logger: logging.Logger = (
logger
if logger is not None
else logging.getLogger("gps_denied_onboard.c2_vpr.salad")
)
def preprocess(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> np.ndarray:
"""Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW.
Calibration handling mirrors UltraVPR (description.md § 6 — same
upstream convention, duplicated not shared): when calibration is
absent or its principal point cannot be extracted from
``intrinsics_3x3``, fall back to the image's geometric centre
and emit ONE WARN log per call with
``kind="c2.vpr.calibration_missing"``.
"""
image = self._coerce_to_rgb_uint8(frame.image)
cropped = self._centre_crop_around_principal_point(
image, calibration, frame_id=frame.frame_id
)
target_h, target_w = self._input_shape
in_h, in_w = cropped.shape[:2]
interp = (
cv2.INTER_AREA
if (in_h > target_h or in_w > target_w)
else cv2.INTER_CUBIC
)
try:
resized = cv2.resize(
cropped, (target_w, target_h), interpolation=interp
)
except cv2.error as exc:
raise VprPreprocessError(
f"cv2.resize failed: {type(exc).__name__}: {exc}"
) from exc
as_f32 = resized.astype(np.float32) / 255.0
normalised = (as_f32 - self._mean) / self._std
chw = normalised.transpose(2, 0, 1)
return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16)
def input_shape(self) -> tuple[int, int]:
return self._input_shape
@staticmethod
def _coerce_to_rgb_uint8(image: object) -> np.ndarray:
if not isinstance(image, np.ndarray):
raise VprPreprocessError(
f"frame.image must be a numpy array; got {type(image).__name__}"
)
if image.dtype != np.uint8:
raise VprPreprocessError(
f"frame.image must be uint8 RGB; got dtype {image.dtype}"
)
if image.ndim == 2:
return np.stack([image, image, image], axis=-1)
if image.ndim == 3 and image.shape[2] == 3:
return image
raise VprPreprocessError(
f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}"
)
def _centre_crop_around_principal_point(
self,
image: np.ndarray,
calibration: CameraCalibration | None,
*,
frame_id: int,
) -> np.ndarray:
h, w = image.shape[:2]
side = min(h, w)
cx_cy = self._extract_principal_point(calibration)
if cx_cy is None:
self._logger.warning(
"SALAD calibration unusable; centre-cropping around "
"geometric centre",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_CALIBRATION_MISSING,
"kv": {"frame_id": int(frame_id)},
},
)
cx = w / 2.0
cy = h / 2.0
else:
cx, cy = cx_cy
half = side // 2
left = round(max(0.0, min(float(w - side), cx - half)))
top = round(max(0.0, min(float(h - side), cy - half)))
return image[top : top + side, left : left + side, :]
@staticmethod
def _extract_principal_point(
calibration: CameraCalibration | None,
) -> tuple[float, float] | None:
if calibration is None:
return None
intrinsics = getattr(calibration, "intrinsics_3x3", None)
if intrinsics is None:
return None
try:
arr = np.asarray(intrinsics, dtype=np.float64)
except (TypeError, ValueError):
return None
if arr.shape != (3, 3):
return None
cx = float(arr[0, 2])
cy = float(arr[1, 2])
if cx == 0.0 and cy == 0.0:
return None
return cx, cy
@@ -0,0 +1,200 @@
"""SelaVPR backbone preprocessor (AZ-340).
SelaVPR's published preprocessing chain (per the upstream research code
drop): decode the nav-camera frame's image to RGB uint8, centre-crop to
a square region respecting the camera calibration's principal point (or
geometric centre + WARN log when calibration is absent), resize to
``(224, 224)``, apply ImageNet mean/std normalisation, cast to FP16,
reshape to NCHW.
Differences from the other C2 secondary preprocessors:
- 224x224 input shape (vs MegaLoc's 322x322, MixVPR's 320x320,
EigenPlaces' 480x480, SALAD's 322x322).
- Same calibration-aware centre-crop and ImageNet mean/std — these
upstream conventions happen to align across several backbones but
are NOT a shared dependency: the centre-crop logic is duplicated
here per ``components/02_c2_vpr/description.md`` § 6 so a future
SelaVPR code drop can change its preprocessing without coupling
other strategies' weights-versions.
This preprocessor is C2-internal and owned exclusively by
:class:`SelaVprStrategy` — sharing across backbones is forbidden per
``components/02_c2_vpr/description.md`` § 6.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final
import cv2
import numpy as np
from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
__all__ = [
"IMAGENET_MEAN",
"IMAGENET_STD",
"SELA_VPR_INPUT_HW",
"SelaVprBackbonePreprocessor",
]
SELA_VPR_INPUT_HW: Final[tuple[int, int]] = (224, 224)
IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406)
IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225)
_COMPONENT: Final[str] = "c2_vpr"
_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing"
class SelaVprBackbonePreprocessor:
"""Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW."""
def __init__(
self,
*,
input_shape: tuple[int, int] = SELA_VPR_INPUT_HW,
mean: tuple[float, float, float] = IMAGENET_MEAN,
std: tuple[float, float, float] = IMAGENET_STD,
logger: logging.Logger | None = None,
) -> None:
if (
not isinstance(input_shape, tuple)
or len(input_shape) != 2
or any(not isinstance(v, int) or v <= 0 for v in input_shape)
):
raise ValueError(
f"SelaVprBackbonePreprocessor.input_shape must be a (H, W) "
f"tuple of positive ints; got {input_shape!r}"
)
if len(mean) != 3 or len(std) != 3:
raise ValueError(
"SelaVprBackbonePreprocessor.mean and std must each be "
"3-tuples (one per channel)"
)
if any(v <= 0 for v in std):
raise ValueError(
"SelaVprBackbonePreprocessor.std components must be > 0"
)
self._input_shape: tuple[int, int] = input_shape
self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3)
self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3)
self._logger: logging.Logger = (
logger
if logger is not None
else logging.getLogger("gps_denied_onboard.c2_vpr.sela_vpr")
)
def preprocess(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> np.ndarray:
"""Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW.
Calibration handling mirrors UltraVPR (description.md § 6 — same
upstream convention, duplicated not shared): when calibration is
absent or its principal point cannot be extracted from
``intrinsics_3x3``, fall back to the image's geometric centre
and emit ONE WARN log per call with
``kind="c2.vpr.calibration_missing"``.
"""
image = self._coerce_to_rgb_uint8(frame.image)
cropped = self._centre_crop_around_principal_point(
image, calibration, frame_id=frame.frame_id
)
target_h, target_w = self._input_shape
in_h, in_w = cropped.shape[:2]
interp = (
cv2.INTER_AREA
if (in_h > target_h or in_w > target_w)
else cv2.INTER_CUBIC
)
try:
resized = cv2.resize(
cropped, (target_w, target_h), interpolation=interp
)
except cv2.error as exc:
raise VprPreprocessError(
f"cv2.resize failed: {type(exc).__name__}: {exc}"
) from exc
as_f32 = resized.astype(np.float32) / 255.0
normalised = (as_f32 - self._mean) / self._std
chw = normalised.transpose(2, 0, 1)
return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16)
def input_shape(self) -> tuple[int, int]:
return self._input_shape
@staticmethod
def _coerce_to_rgb_uint8(image: object) -> np.ndarray:
if not isinstance(image, np.ndarray):
raise VprPreprocessError(
f"frame.image must be a numpy array; got {type(image).__name__}"
)
if image.dtype != np.uint8:
raise VprPreprocessError(
f"frame.image must be uint8 RGB; got dtype {image.dtype}"
)
if image.ndim == 2:
return np.stack([image, image, image], axis=-1)
if image.ndim == 3 and image.shape[2] == 3:
return image
raise VprPreprocessError(
f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}"
)
def _centre_crop_around_principal_point(
self,
image: np.ndarray,
calibration: CameraCalibration | None,
*,
frame_id: int,
) -> np.ndarray:
h, w = image.shape[:2]
side = min(h, w)
cx_cy = self._extract_principal_point(calibration)
if cx_cy is None:
self._logger.warning(
"SelaVPR calibration unusable; centre-cropping around "
"geometric centre",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_CALIBRATION_MISSING,
"kv": {"frame_id": int(frame_id)},
},
)
cx = w / 2.0
cy = h / 2.0
else:
cx, cy = cx_cy
half = side // 2
left = round(max(0.0, min(float(w - side), cx - half)))
top = round(max(0.0, min(float(h - side), cy - half)))
return image[top : top + side, left : left + side, :]
@staticmethod
def _extract_principal_point(
calibration: CameraCalibration | None,
) -> tuple[float, float] | None:
if calibration is None:
return None
intrinsics = getattr(calibration, "intrinsics_3x3", None)
if intrinsics is None:
return None
try:
arr = np.asarray(intrinsics, dtype=np.float64)
except (TypeError, ValueError):
return None
if arr.shape != (3, 3):
return None
cx = float(arr[0, 2])
cy = float(arr[1, 2])
if cx == 0.0 and cy == 0.0:
return None
return cx, cy
@@ -0,0 +1,452 @@
"""``EigenPlacesStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-340).
EigenPlaces is a secondary backbone shipped exclusively in the research
binary for the IT-12 comparative-study matrix
(``components/02_c2_vpr/description.md`` § 1 + § 5). Per ADR-002,
``BUILD_VPR_EIGENPLACES`` is ON for the research binary and replay-cli,
OFF for the airborne and operator-tooling binaries — selecting
``eigen_places`` on a binary without the flag fails fast at
composition-root time via :class:`StrategyNotAvailableError` (not at
first frame).
The strategy runs on the C7 TensorRT runtime (AZ-298), or the ONNX-Runtime
fallback (AZ-299), via the local :class:`InferenceRuntimeCut` (AZ-507).
Engine output key is ``"embedding"`` and the strategy applies single-stage
global L2 normalisation (no NetVLAD-style intra-cluster step). Retrieval
delegates to :class:`FaissBridge` (AZ-341).
Architecture-registry differences from :class:`NetVladStrategy`:
EigenPlaces consumes a pre-compiled ``.trt`` engine produced by C10's
engine compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register,
so the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``.
:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture`
no-ops for this strategy.
Engine load happens in :func:`create` (NOT at first frame) so the
engine-output-shape assertion (AC-6) surfaces at startup, not after
takeoff.
Per-frame :meth:`embed_query` pipeline:
1. ``preprocessor.preprocess(frame, calibration)`` ->
``(1, 3, 480, 480)`` FP16 NCHW ndarray.
2. ``inference_runtime.infer(handle, {"input": tensor})`` ->
``{"embedding": (1, 2048) FP16 ndarray}``.
3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage).
4. Return :class:`VprQuery` with ``frame_id``, normalised embedding,
produced_at monotonic ns.
Error envelope: every method raises only members of :class:`VprError`.
``RuntimeError`` from the backbone forward -> rewrapped to
:class:`VprBackboneError`; :class:`VprPreprocessError` from the
preprocessor propagates unchanged.
Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`;
see AZ-341 AC-10.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final, Literal
import numpy as np
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.vpr import VprQuery, VprResult
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_eigen_places import (
EigenPlacesBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
DescriptorIndexCut,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import (
InferenceRuntimeCut,
)
from gps_denied_onboard.config.schema import ConfigError
from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
from gps_denied_onboard.helpers.iso_timestamps import (
iso_ts_from_clock as _iso_ts_from_clock,
)
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.config.schema import Config
__all__ = ["DESCRIPTOR_DIM", "EigenPlacesStrategy", "create"]
# EigenPlaces' published embedding dimension (D=2048) per the upstream
# research code drop — same as MegaLoc and the NetVLAD default but with
# different semantics (CosPlace-family eigenvector backbone). Engine
# output shape is asserted at create() time against this constant.
DESCRIPTOR_DIM: Final[int] = 2048
_BACKBONE_LABEL: Final[Literal["eigen_places"]] = "eigen_places"
_COMPONENT: Final[str] = "c2_vpr"
_OUTPUT_KEY: Final[str] = "embedding"
_ENGINE_INPUT_KEY: Final[str] = "input"
_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset(
{"tensorrt", "onnx_trt_ep"}
)
_LOG_KIND_READY: Final[str] = "c2.vpr.ready"
_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error"
_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun"
_FDR_KIND_EMBED: Final[str] = "vpr.embed_query"
_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error"
_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error"
class EigenPlacesStrategy:
"""C2 secondary VprStrategy backed by a TRT EigenPlaces engine.
See module docstring for the engine-loading + per-frame pipeline.
Stateless across frames (INV-2); single-threaded per instance
(INV-1, per AZ-336).
"""
def __init__(
self,
*,
inference_runtime: InferenceRuntimeCut,
engine_handle: EngineHandle,
descriptor_index: DescriptorIndexCut,
preprocessor: EigenPlacesBackbonePreprocessor,
normaliser: DescriptorNormaliser,
faiss_bridge: FaissBridge,
fdr_client: FdrClient,
clock: Clock,
logger: logging.Logger,
descriptor_dim: int = DESCRIPTOR_DIM,
) -> None:
if descriptor_dim < 1:
raise ValueError(
f"EigenPlacesStrategy.descriptor_dim must be >= 1; "
f"got {descriptor_dim}"
)
self._inference_runtime = inference_runtime
self._engine_handle = engine_handle
self._descriptor_index = descriptor_index
self._preprocessor = preprocessor
self._normaliser = normaliser
self._faiss_bridge = faiss_bridge
self._fdr_client = fdr_client
self._clock = clock
self._logger = logger
self._descriptor_dim = descriptor_dim
def embed_query(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> VprQuery:
try:
tensor = self._preprocessor.preprocess(frame, calibration)
except VprPreprocessError as exc:
self._emit_preprocess_error(frame, exc)
raise
ns_start = self._clock.monotonic_ns()
try:
outputs = self._inference_runtime.infer(
self._engine_handle, {_ENGINE_INPUT_KEY: tensor}
)
except Exception as exc:
wrapped = self._wrap_backbone_error(frame, exc)
raise wrapped from exc
ns_end = self._clock.monotonic_ns()
latency_us = max(1, (ns_end - ns_start) // 1_000)
if _OUTPUT_KEY not in outputs:
err = VprBackboneError(
f"EigenPlaces forward returned no {_OUTPUT_KEY!r} key; "
f"got {sorted(outputs.keys())!r}"
)
self._emit_backbone_error(frame, err)
raise err
raw = np.asarray(outputs[_OUTPUT_KEY])
if (
raw.ndim != 2
or raw.shape[0] != 1
or raw.shape[1] != self._descriptor_dim
):
err = VprBackboneError(
f"EigenPlaces forward returned shape {raw.shape}; "
f"expected (1, {self._descriptor_dim})"
)
self._emit_backbone_error(frame, err)
raise err
flat = np.ascontiguousarray(raw[0], dtype=np.float16)
normalised = self._normaliser.l2_normalise(flat)
self._emit_embed_record(
frame_id=int(frame.frame_id), latency_us=int(latency_us)
)
return VprQuery(
frame_id=int(frame.frame_id),
embedding=normalised,
produced_at=ns_end,
)
def retrieve_topk(self, query: VprQuery, k: int) -> VprResult:
return self._faiss_bridge.retrieve(
query, k, backbone_label=_BACKBONE_LABEL
)
def descriptor_dim(self) -> int:
return self._descriptor_dim
def _wrap_backbone_error(
self, frame: NavCameraFrame, exc: BaseException
) -> VprBackboneError:
wrapped = VprBackboneError(
f"EigenPlaces forward raised {type(exc).__name__}: {exc}"
)
self._emit_backbone_error(frame, wrapped)
return wrapped
def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_EMBED,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"descriptor_dim": self._descriptor_dim,
"latency_us": latency_us,
},
)
result = self._fdr_client.enqueue(record)
if result == EnqueueResult.OVERRUN:
self._logger.warning(
"FDR enqueue dropped vpr.embed_query record (buffer overrun)",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FDR_OVERRUN,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
},
},
)
def _emit_backbone_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"EigenPlaces backbone error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_BACKBONE_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_BACKBONE_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _emit_preprocess_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"EigenPlaces preprocess error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PREPROCESS_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_PREPROCESS_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _build_trt_build_config() -> BuildConfig:
return BuildConfig(
precision=PrecisionMode.FP16,
workspace_mb=0,
calibration_dataset=None,
optimization_profiles=(),
)
def create(
config: Config,
*,
descriptor_index: DescriptorIndexCut,
inference_runtime: InferenceRuntimeCut,
fdr_client: FdrClient | None = None,
clock: Clock | None = None,
logger: logging.Logger | None = None,
) -> EigenPlacesStrategy:
"""Module-level factory consumed by :func:`build_vpr_strategy`.
EigenPlaces is unselectable when the C7 TRT / ONNX-RT runtimes are
excluded — ``current_runtime_label()`` MUST be one of
``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected
with :class:`ConfigError` at composition time.
Engine output shape is asserted at create time via a single
dry-run inference on a zero-init input; mismatch raises
:class:`ConfigError` BEFORE the strategy is bound (AC-6).
Optional keyword-only injection points (``fdr_client`` / ``clock`` /
``logger``) keep tests deterministic; production wiring fills them
from the composition root.
"""
runtime_label = inference_runtime.current_runtime_label()
if runtime_label not in _ALLOWED_RUNTIME_LABELS:
raise ConfigError(
f"EigenPlaces requires BUILD_TENSORRT_RUNTIME=ON (or "
f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary "
f"has runtime_label={runtime_label!r}."
)
block = config.components["c2_vpr"]
weights_path = block.backbone_weights_path
if fdr_client is None:
raise ValueError(
"EigenPlacesStrategy.create: fdr_client is required; the "
"composition root must inject the running FDR client."
)
if clock is None:
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
if logger is None:
logger = logging.getLogger("gps_denied_onboard.c2_vpr.eigen_places")
entry = inference_runtime.compile_engine(
weights_path, _build_trt_build_config()
)
handle = inference_runtime.deserialize_engine(entry)
preprocessor = EigenPlacesBackbonePreprocessor(logger=logger)
normaliser = DescriptorNormaliser()
faiss_bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=DESCRIPTOR_DIM,
warn_top1_threshold=block.warn_top1_threshold,
debug_log_per_frame_distances=block.debug_per_frame_distances,
fdr_client=fdr_client,
logger=logger,
clock=clock,
)
_assert_engine_output_dim(inference_runtime, handle, preprocessor)
logger.info(
"C2 VPR strategy ready",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_READY,
"kv": {
"strategy": _BACKBONE_LABEL,
"descriptor_dim": DESCRIPTOR_DIM,
},
},
)
return EigenPlacesStrategy(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=normaliser,
faiss_bridge=faiss_bridge,
fdr_client=fdr_client,
clock=clock,
logger=logger,
descriptor_dim=DESCRIPTOR_DIM,
)
def _assert_engine_output_dim(
inference_runtime: InferenceRuntimeCut,
handle: EngineHandle,
preprocessor: EigenPlacesBackbonePreprocessor,
) -> None:
# The 7-way duplication of this helper (ultra_vpr / net_vlad /
# mega_loc / mix_vpr / sela_vpr / eigen_places / salad) is tracked
# by AZ-527 (hygiene PBI sized in parallel with AZ-339 land). The
# duplication is intentional for now: extracting earlier would
# expand AZ-340's scope past the three new strategies.
h, w = preprocessor.input_shape()
probe = np.zeros((1, 3, h, w), dtype=np.float16)
outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe})
if _OUTPUT_KEY not in outputs:
raise ConfigError(
f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; "
f"got keys {sorted(outputs.keys())!r}"
)
actual = np.asarray(outputs[_OUTPUT_KEY])
if (
actual.ndim != 2
or actual.shape[0] != 1
or actual.shape[1] != DESCRIPTOR_DIM
):
raise ConfigError(
f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), "
f"got {tuple(actual.shape)}"
)
@@ -0,0 +1,464 @@
"""``SaladStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-340).
SALAD is a secondary backbone shipped exclusively in the research
binary for the IT-12 comparative-study matrix
(``components/02_c2_vpr/description.md`` § 1 + § 5; ``module-layout.md``
``BUILD_VPR_SALAD`` row). Per ADR-002, ``BUILD_VPR_SALAD`` is ON for
the research binary and replay-cli, OFF for the airborne and
operator-tooling binaries — selecting ``salad`` on a binary without the
flag fails fast at composition-root time via
:class:`StrategyNotAvailableError` (not at first frame).
SALAD is the heaviest backbone in the C2 family: a DINOv2-Large
backbone produces patch tokens that the SALAD aggregator turns into a
single 8448-dimensional descriptor. Per the task spec NFR-perf
budget, SALAD's ``embed_query`` p95 is permitted up to 120 ms (vs
UltraVPR's tighter primary-path budget) and the FAISS HNSW lookup is
permitted up to 6 ms p95. Operators who want a smaller SALAD descriptor
must apply PCA-whitening at corpus build time (C10) — out of scope
here.
The strategy runs on the C7 TensorRT runtime (AZ-298), or the ONNX-Runtime
fallback (AZ-299), via the local :class:`InferenceRuntimeCut` (AZ-507).
Engine output key is ``"embedding"`` and the strategy applies single-stage
global L2 normalisation (no NetVLAD-style intra-cluster step). Retrieval
delegates to :class:`FaissBridge` (AZ-341).
Architecture-registry differences from :class:`NetVladStrategy`:
SALAD consumes a pre-compiled ``.trt`` engine produced by C10's engine
compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register, so
the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``.
:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture`
no-ops for this strategy.
Engine load happens in :func:`create` (NOT at first frame) so the
engine-output-shape assertion (AC-6) surfaces at startup, not after
takeoff.
Per-frame :meth:`embed_query` pipeline:
1. ``preprocessor.preprocess(frame, calibration)`` ->
``(1, 3, 322, 322)`` FP16 NCHW ndarray.
2. ``inference_runtime.infer(handle, {"input": tensor})`` ->
``{"embedding": (1, 8448) FP16 ndarray}``.
3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage).
4. Return :class:`VprQuery` with ``frame_id``, normalised embedding,
produced_at monotonic ns.
Error envelope: every method raises only members of :class:`VprError`.
``RuntimeError`` from the backbone forward -> rewrapped to
:class:`VprBackboneError`; :class:`VprPreprocessError` from the
preprocessor propagates unchanged.
Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`;
see AZ-341 AC-10.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final, Literal
import numpy as np
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.vpr import VprQuery, VprResult
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_salad import (
SaladBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
DescriptorIndexCut,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import (
InferenceRuntimeCut,
)
from gps_denied_onboard.config.schema import ConfigError
from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
from gps_denied_onboard.helpers.iso_timestamps import (
iso_ts_from_clock as _iso_ts_from_clock,
)
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.config.schema import Config
__all__ = ["DESCRIPTOR_DIM", "SaladStrategy", "create"]
# SALAD's published embedding dimension (D=8448) — the largest VPR
# descriptor the project carries, produced by the SALAD aggregator
# applied to DINOv2-Large patch tokens. The matching FAISS HNSW corpus
# has correspondingly higher RAM cost; researchers must rebuild the
# corpus when swapping between SALAD and any non-8448 backbone (the
# AZ-336 pre-flight dim-mismatch check enforces this). Engine output
# shape is asserted at create() time.
DESCRIPTOR_DIM: Final[int] = 8448
_BACKBONE_LABEL: Final[Literal["salad"]] = "salad"
_COMPONENT: Final[str] = "c2_vpr"
_OUTPUT_KEY: Final[str] = "embedding"
_ENGINE_INPUT_KEY: Final[str] = "input"
_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset(
{"tensorrt", "onnx_trt_ep"}
)
_LOG_KIND_READY: Final[str] = "c2.vpr.ready"
_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error"
_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun"
_FDR_KIND_EMBED: Final[str] = "vpr.embed_query"
_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error"
_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error"
class SaladStrategy:
"""C2 secondary VprStrategy backed by a TRT SALAD (DINOv2-Large) engine.
See module docstring for the engine-loading + per-frame pipeline.
Stateless across frames (INV-2); single-threaded per instance
(INV-1, per AZ-336).
"""
def __init__(
self,
*,
inference_runtime: InferenceRuntimeCut,
engine_handle: EngineHandle,
descriptor_index: DescriptorIndexCut,
preprocessor: SaladBackbonePreprocessor,
normaliser: DescriptorNormaliser,
faiss_bridge: FaissBridge,
fdr_client: FdrClient,
clock: Clock,
logger: logging.Logger,
descriptor_dim: int = DESCRIPTOR_DIM,
) -> None:
if descriptor_dim < 1:
raise ValueError(
f"SaladStrategy.descriptor_dim must be >= 1; "
f"got {descriptor_dim}"
)
self._inference_runtime = inference_runtime
self._engine_handle = engine_handle
self._descriptor_index = descriptor_index
self._preprocessor = preprocessor
self._normaliser = normaliser
self._faiss_bridge = faiss_bridge
self._fdr_client = fdr_client
self._clock = clock
self._logger = logger
self._descriptor_dim = descriptor_dim
def embed_query(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> VprQuery:
try:
tensor = self._preprocessor.preprocess(frame, calibration)
except VprPreprocessError as exc:
self._emit_preprocess_error(frame, exc)
raise
ns_start = self._clock.monotonic_ns()
try:
outputs = self._inference_runtime.infer(
self._engine_handle, {_ENGINE_INPUT_KEY: tensor}
)
except Exception as exc:
wrapped = self._wrap_backbone_error(frame, exc)
raise wrapped from exc
ns_end = self._clock.monotonic_ns()
latency_us = max(1, (ns_end - ns_start) // 1_000)
if _OUTPUT_KEY not in outputs:
err = VprBackboneError(
f"SALAD forward returned no {_OUTPUT_KEY!r} key; "
f"got {sorted(outputs.keys())!r}"
)
self._emit_backbone_error(frame, err)
raise err
raw = np.asarray(outputs[_OUTPUT_KEY])
if (
raw.ndim != 2
or raw.shape[0] != 1
or raw.shape[1] != self._descriptor_dim
):
err = VprBackboneError(
f"SALAD forward returned shape {raw.shape}; "
f"expected (1, {self._descriptor_dim})"
)
self._emit_backbone_error(frame, err)
raise err
flat = np.ascontiguousarray(raw[0], dtype=np.float16)
normalised = self._normaliser.l2_normalise(flat)
self._emit_embed_record(
frame_id=int(frame.frame_id), latency_us=int(latency_us)
)
return VprQuery(
frame_id=int(frame.frame_id),
embedding=normalised,
produced_at=ns_end,
)
def retrieve_topk(self, query: VprQuery, k: int) -> VprResult:
return self._faiss_bridge.retrieve(
query, k, backbone_label=_BACKBONE_LABEL
)
def descriptor_dim(self) -> int:
return self._descriptor_dim
def _wrap_backbone_error(
self, frame: NavCameraFrame, exc: BaseException
) -> VprBackboneError:
wrapped = VprBackboneError(
f"SALAD forward raised {type(exc).__name__}: {exc}"
)
self._emit_backbone_error(frame, wrapped)
return wrapped
def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_EMBED,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"descriptor_dim": self._descriptor_dim,
"latency_us": latency_us,
},
)
result = self._fdr_client.enqueue(record)
if result == EnqueueResult.OVERRUN:
self._logger.warning(
"FDR enqueue dropped vpr.embed_query record (buffer overrun)",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FDR_OVERRUN,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
},
},
)
def _emit_backbone_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"SALAD backbone error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_BACKBONE_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_BACKBONE_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _emit_preprocess_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"SALAD preprocess error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PREPROCESS_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_PREPROCESS_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _build_trt_build_config() -> BuildConfig:
return BuildConfig(
precision=PrecisionMode.FP16,
workspace_mb=0,
calibration_dataset=None,
optimization_profiles=(),
)
def create(
config: Config,
*,
descriptor_index: DescriptorIndexCut,
inference_runtime: InferenceRuntimeCut,
fdr_client: FdrClient | None = None,
clock: Clock | None = None,
logger: logging.Logger | None = None,
) -> SaladStrategy:
"""Module-level factory consumed by :func:`build_vpr_strategy`.
SALAD is unselectable when the C7 TRT / ONNX-RT runtimes are
excluded — ``current_runtime_label()`` MUST be one of
``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected
with :class:`ConfigError` at composition time.
Engine output shape is asserted at create time via a single
dry-run inference on a zero-init input; mismatch raises
:class:`ConfigError` BEFORE the strategy is bound (AC-6).
Optional keyword-only injection points (``fdr_client`` / ``clock`` /
``logger``) keep tests deterministic; production wiring fills them
from the composition root.
"""
runtime_label = inference_runtime.current_runtime_label()
if runtime_label not in _ALLOWED_RUNTIME_LABELS:
raise ConfigError(
f"SALAD requires BUILD_TENSORRT_RUNTIME=ON (or "
f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary "
f"has runtime_label={runtime_label!r}."
)
block = config.components["c2_vpr"]
weights_path = block.backbone_weights_path
if fdr_client is None:
raise ValueError(
"SaladStrategy.create: fdr_client is required; the "
"composition root must inject the running FDR client."
)
if clock is None:
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
if logger is None:
logger = logging.getLogger("gps_denied_onboard.c2_vpr.salad")
entry = inference_runtime.compile_engine(
weights_path, _build_trt_build_config()
)
handle = inference_runtime.deserialize_engine(entry)
preprocessor = SaladBackbonePreprocessor(logger=logger)
normaliser = DescriptorNormaliser()
faiss_bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=DESCRIPTOR_DIM,
warn_top1_threshold=block.warn_top1_threshold,
debug_log_per_frame_distances=block.debug_per_frame_distances,
fdr_client=fdr_client,
logger=logger,
clock=clock,
)
_assert_engine_output_dim(inference_runtime, handle, preprocessor)
logger.info(
"C2 VPR strategy ready",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_READY,
"kv": {
"strategy": _BACKBONE_LABEL,
"descriptor_dim": DESCRIPTOR_DIM,
},
},
)
return SaladStrategy(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=normaliser,
faiss_bridge=faiss_bridge,
fdr_client=fdr_client,
clock=clock,
logger=logger,
descriptor_dim=DESCRIPTOR_DIM,
)
def _assert_engine_output_dim(
inference_runtime: InferenceRuntimeCut,
handle: EngineHandle,
preprocessor: SaladBackbonePreprocessor,
) -> None:
# The 7-way duplication of this helper (ultra_vpr / net_vlad /
# mega_loc / mix_vpr / sela_vpr / eigen_places / salad) is tracked
# by AZ-527 (hygiene PBI sized in parallel with AZ-339 land). The
# duplication is intentional for now: extracting earlier would
# expand AZ-340's scope past the three new strategies.
h, w = preprocessor.input_shape()
probe = np.zeros((1, 3, h, w), dtype=np.float16)
outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe})
if _OUTPUT_KEY not in outputs:
raise ConfigError(
f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; "
f"got keys {sorted(outputs.keys())!r}"
)
actual = np.asarray(outputs[_OUTPUT_KEY])
if (
actual.ndim != 2
or actual.shape[0] != 1
or actual.shape[1] != DESCRIPTOR_DIM
):
raise ConfigError(
f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), "
f"got {tuple(actual.shape)}"
)
@@ -0,0 +1,451 @@
"""``SelaVprStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-340).
SelaVPR is a secondary backbone shipped exclusively in the research
binary for the IT-12 comparative-study matrix
(``components/02_c2_vpr/description.md`` § 1 + § 5). Per ADR-002,
``BUILD_VPR_SELAVPR`` is ON for the research binary and replay-cli, OFF
for the airborne and operator-tooling binaries — selecting ``sela_vpr``
on a binary without the flag fails fast at composition-root time via
:class:`StrategyNotAvailableError` (not at first frame).
The strategy runs on the C7 TensorRT runtime (AZ-298), or the ONNX-Runtime
fallback (AZ-299), via the local :class:`InferenceRuntimeCut` (AZ-507).
Engine output key is ``"embedding"`` and the strategy applies single-stage
global L2 normalisation (no NetVLAD-style intra-cluster step). Retrieval
delegates to :class:`FaissBridge` (AZ-341).
Architecture-registry differences from :class:`NetVladStrategy`:
SelaVPR consumes a pre-compiled ``.trt`` engine produced by C10's engine
compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register, so
the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``.
:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture`
no-ops for this strategy.
Engine load happens in :func:`create` (NOT at first frame) so the
engine-output-shape assertion (AC-6) surfaces at startup, not after
takeoff.
Per-frame :meth:`embed_query` pipeline:
1. ``preprocessor.preprocess(frame, calibration)`` ->
``(1, 3, 224, 224)`` FP16 NCHW ndarray.
2. ``inference_runtime.infer(handle, {"input": tensor})`` ->
``{"embedding": (1, 512) FP16 ndarray}``.
3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage).
4. Return :class:`VprQuery` with ``frame_id``, normalised embedding,
produced_at monotonic ns.
Error envelope: every method raises only members of :class:`VprError`.
``RuntimeError`` from the backbone forward -> rewrapped to
:class:`VprBackboneError`; :class:`VprPreprocessError` from the
preprocessor propagates unchanged.
Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`;
see AZ-341 AC-10.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final, Literal
import numpy as np
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.vpr import VprQuery, VprResult
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_sela_vpr import (
SelaVprBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
DescriptorIndexCut,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import (
InferenceRuntimeCut,
)
from gps_denied_onboard.config.schema import ConfigError
from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
from gps_denied_onboard.helpers.iso_timestamps import (
iso_ts_from_clock as _iso_ts_from_clock,
)
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.config.schema import Config
__all__ = ["DESCRIPTOR_DIM", "SelaVprStrategy", "create"]
# SelaVPR's published embedding dimension (D=512) per the upstream
# research code drop. Engine output shape is asserted at create() time
# against this constant — changing it would silently break AC-2 /
# AC-4 / AC-5 / AC-6.
DESCRIPTOR_DIM: Final[int] = 512
_BACKBONE_LABEL: Final[Literal["sela_vpr"]] = "sela_vpr"
_COMPONENT: Final[str] = "c2_vpr"
_OUTPUT_KEY: Final[str] = "embedding"
_ENGINE_INPUT_KEY: Final[str] = "input"
_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset(
{"tensorrt", "onnx_trt_ep"}
)
_LOG_KIND_READY: Final[str] = "c2.vpr.ready"
_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error"
_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun"
_FDR_KIND_EMBED: Final[str] = "vpr.embed_query"
_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error"
_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error"
class SelaVprStrategy:
"""C2 secondary VprStrategy backed by a TRT SelaVPR engine.
See module docstring for the engine-loading + per-frame pipeline.
Stateless across frames (INV-2); single-threaded per instance
(INV-1, per AZ-336).
"""
def __init__(
self,
*,
inference_runtime: InferenceRuntimeCut,
engine_handle: EngineHandle,
descriptor_index: DescriptorIndexCut,
preprocessor: SelaVprBackbonePreprocessor,
normaliser: DescriptorNormaliser,
faiss_bridge: FaissBridge,
fdr_client: FdrClient,
clock: Clock,
logger: logging.Logger,
descriptor_dim: int = DESCRIPTOR_DIM,
) -> None:
if descriptor_dim < 1:
raise ValueError(
f"SelaVprStrategy.descriptor_dim must be >= 1; "
f"got {descriptor_dim}"
)
self._inference_runtime = inference_runtime
self._engine_handle = engine_handle
self._descriptor_index = descriptor_index
self._preprocessor = preprocessor
self._normaliser = normaliser
self._faiss_bridge = faiss_bridge
self._fdr_client = fdr_client
self._clock = clock
self._logger = logger
self._descriptor_dim = descriptor_dim
def embed_query(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> VprQuery:
try:
tensor = self._preprocessor.preprocess(frame, calibration)
except VprPreprocessError as exc:
self._emit_preprocess_error(frame, exc)
raise
ns_start = self._clock.monotonic_ns()
try:
outputs = self._inference_runtime.infer(
self._engine_handle, {_ENGINE_INPUT_KEY: tensor}
)
except Exception as exc:
wrapped = self._wrap_backbone_error(frame, exc)
raise wrapped from exc
ns_end = self._clock.monotonic_ns()
latency_us = max(1, (ns_end - ns_start) // 1_000)
if _OUTPUT_KEY not in outputs:
err = VprBackboneError(
f"SelaVPR forward returned no {_OUTPUT_KEY!r} key; "
f"got {sorted(outputs.keys())!r}"
)
self._emit_backbone_error(frame, err)
raise err
raw = np.asarray(outputs[_OUTPUT_KEY])
if (
raw.ndim != 2
or raw.shape[0] != 1
or raw.shape[1] != self._descriptor_dim
):
err = VprBackboneError(
f"SelaVPR forward returned shape {raw.shape}; "
f"expected (1, {self._descriptor_dim})"
)
self._emit_backbone_error(frame, err)
raise err
flat = np.ascontiguousarray(raw[0], dtype=np.float16)
normalised = self._normaliser.l2_normalise(flat)
self._emit_embed_record(
frame_id=int(frame.frame_id), latency_us=int(latency_us)
)
return VprQuery(
frame_id=int(frame.frame_id),
embedding=normalised,
produced_at=ns_end,
)
def retrieve_topk(self, query: VprQuery, k: int) -> VprResult:
return self._faiss_bridge.retrieve(
query, k, backbone_label=_BACKBONE_LABEL
)
def descriptor_dim(self) -> int:
return self._descriptor_dim
def _wrap_backbone_error(
self, frame: NavCameraFrame, exc: BaseException
) -> VprBackboneError:
wrapped = VprBackboneError(
f"SelaVPR forward raised {type(exc).__name__}: {exc}"
)
self._emit_backbone_error(frame, wrapped)
return wrapped
def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_EMBED,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"descriptor_dim": self._descriptor_dim,
"latency_us": latency_us,
},
)
result = self._fdr_client.enqueue(record)
if result == EnqueueResult.OVERRUN:
self._logger.warning(
"FDR enqueue dropped vpr.embed_query record (buffer overrun)",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FDR_OVERRUN,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
},
},
)
def _emit_backbone_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"SelaVPR backbone error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_BACKBONE_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_BACKBONE_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _emit_preprocess_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"SelaVPR preprocess error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PREPROCESS_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_PREPROCESS_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _build_trt_build_config() -> BuildConfig:
return BuildConfig(
precision=PrecisionMode.FP16,
workspace_mb=0,
calibration_dataset=None,
optimization_profiles=(),
)
def create(
config: Config,
*,
descriptor_index: DescriptorIndexCut,
inference_runtime: InferenceRuntimeCut,
fdr_client: FdrClient | None = None,
clock: Clock | None = None,
logger: logging.Logger | None = None,
) -> SelaVprStrategy:
"""Module-level factory consumed by :func:`build_vpr_strategy`.
SelaVPR is unselectable when the C7 TRT / ONNX-RT runtimes are
excluded — ``current_runtime_label()`` MUST be one of
``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected
with :class:`ConfigError` at composition time.
Engine output shape is asserted at create time via a single
dry-run inference on a zero-init input; mismatch raises
:class:`ConfigError` BEFORE the strategy is bound (AC-6).
Optional keyword-only injection points (``fdr_client`` / ``clock`` /
``logger``) keep tests deterministic; production wiring fills them
from the composition root.
"""
runtime_label = inference_runtime.current_runtime_label()
if runtime_label not in _ALLOWED_RUNTIME_LABELS:
raise ConfigError(
f"SelaVPR requires BUILD_TENSORRT_RUNTIME=ON (or "
f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary "
f"has runtime_label={runtime_label!r}."
)
block = config.components["c2_vpr"]
weights_path = block.backbone_weights_path
if fdr_client is None:
raise ValueError(
"SelaVprStrategy.create: fdr_client is required; the "
"composition root must inject the running FDR client."
)
if clock is None:
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
if logger is None:
logger = logging.getLogger("gps_denied_onboard.c2_vpr.sela_vpr")
entry = inference_runtime.compile_engine(
weights_path, _build_trt_build_config()
)
handle = inference_runtime.deserialize_engine(entry)
preprocessor = SelaVprBackbonePreprocessor(logger=logger)
normaliser = DescriptorNormaliser()
faiss_bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=DESCRIPTOR_DIM,
warn_top1_threshold=block.warn_top1_threshold,
debug_log_per_frame_distances=block.debug_per_frame_distances,
fdr_client=fdr_client,
logger=logger,
clock=clock,
)
_assert_engine_output_dim(inference_runtime, handle, preprocessor)
logger.info(
"C2 VPR strategy ready",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_READY,
"kv": {
"strategy": _BACKBONE_LABEL,
"descriptor_dim": DESCRIPTOR_DIM,
},
},
)
return SelaVprStrategy(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=normaliser,
faiss_bridge=faiss_bridge,
fdr_client=fdr_client,
clock=clock,
logger=logger,
descriptor_dim=DESCRIPTOR_DIM,
)
def _assert_engine_output_dim(
inference_runtime: InferenceRuntimeCut,
handle: EngineHandle,
preprocessor: SelaVprBackbonePreprocessor,
) -> None:
# The 7-way duplication of this helper (ultra_vpr / net_vlad /
# mega_loc / mix_vpr / sela_vpr / eigen_places / salad) is tracked
# by AZ-527 (hygiene PBI sized in parallel with AZ-339 land). The
# duplication is intentional for now: extracting earlier would
# expand AZ-340's scope past the three new strategies.
h, w = preprocessor.input_shape()
probe = np.zeros((1, 3, h, w), dtype=np.float16)
outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe})
if _OUTPUT_KEY not in outputs:
raise ConfigError(
f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; "
f"got keys {sorted(outputs.keys())!r}"
)
actual = np.asarray(outputs[_OUTPUT_KEY])
if (
actual.ndim != 2
or actual.shape[0] != 1
or actual.shape[1] != DESCRIPTOR_DIM
):
raise ConfigError(
f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), "
f"got {tuple(actual.shape)}"
)
@@ -0,0 +1,835 @@
"""AZ-340 — SelaVPR + EigenPlaces + SALAD secondary VprStrategy unit tests.
Covers AC-1..AC-11 for all three strategies. Parametrised across the
strategies so the test surface stays compact (one test per AC times
three strategies) and any drift between the three implementations is
visible at the assertion level — same approach as
``test_az339_mega_loc_mix_vpr.py``.
Uses fakes for :class:`InferenceRuntimeCut`, :class:`DescriptorIndexCut`,
and :class:`FdrClient` so the suite stays AZ-507-clean and TRT-free
(mirrors the precedent in ``test_ultra_vpr.py`` / ``test_az339_*``).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Literal
from unittest.mock import MagicMock
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineCacheEntry,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.components.c2_vpr import (
C2VprConfig,
VprStrategy,
)
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_eigen_places import (
EigenPlacesBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr._preprocessor_salad import (
SaladBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr._preprocessor_sela_vpr import (
SelaVprBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.eigen_places import (
DESCRIPTOR_DIM as EIGEN_PLACES_DIM,
)
from gps_denied_onboard.components.c2_vpr.eigen_places import (
EigenPlacesStrategy,
)
from gps_denied_onboard.components.c2_vpr.eigen_places import (
create as create_eigen_places,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.salad import (
DESCRIPTOR_DIM as SALAD_DIM,
)
from gps_denied_onboard.components.c2_vpr.salad import (
SaladStrategy,
)
from gps_denied_onboard.components.c2_vpr.salad import (
create as create_salad,
)
from gps_denied_onboard.components.c2_vpr.sela_vpr import (
DESCRIPTOR_DIM as SELA_VPR_DIM,
)
from gps_denied_onboard.components.c2_vpr.sela_vpr import (
SelaVprStrategy,
)
from gps_denied_onboard.components.c2_vpr.sela_vpr import (
create as create_sela_vpr,
)
from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.fdr_client import FdrClient
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
# ---------------------------------------------------------------------------
# Parametrisation: each strategy + its bound constants
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class _StrategySpec:
name: str
strategy_cls: type
create_fn: Any
preprocessor_cls: type
descriptor_dim: int
backbone_label: str
input_hw: tuple[int, int]
_SPECS: list[_StrategySpec] = [
_StrategySpec(
name="sela_vpr",
strategy_cls=SelaVprStrategy,
create_fn=create_sela_vpr,
preprocessor_cls=SelaVprBackbonePreprocessor,
descriptor_dim=SELA_VPR_DIM,
backbone_label="sela_vpr",
input_hw=(224, 224),
),
_StrategySpec(
name="eigen_places",
strategy_cls=EigenPlacesStrategy,
create_fn=create_eigen_places,
preprocessor_cls=EigenPlacesBackbonePreprocessor,
descriptor_dim=EIGEN_PLACES_DIM,
backbone_label="eigen_places",
input_hw=(480, 480),
),
_StrategySpec(
name="salad",
strategy_cls=SaladStrategy,
create_fn=create_salad,
preprocessor_cls=SaladBackbonePreprocessor,
descriptor_dim=SALAD_DIM,
backbone_label="salad",
input_hw=(322, 322),
),
]
@pytest.fixture(params=_SPECS, ids=[s.name for s in _SPECS])
def spec(request: pytest.FixtureRequest) -> _StrategySpec:
return request.param
# ---------------------------------------------------------------------------
# Fakes (mirrors test_az339_*.py shape)
# ---------------------------------------------------------------------------
@dataclass
class _StubClock:
next_monotonic_ns: int = 1_000_000_000
step_ns: int = 5_000
fixed_time_ns: int = 1_715_600_000_000_000_000
def monotonic_ns(self) -> int:
v = self.next_monotonic_ns
self.next_monotonic_ns += self.step_ns
return v
def time_ns(self) -> int:
return self.fixed_time_ns
def sleep_until_ns(self, target_ns: int) -> None:
_ = target_ns
class _FakeEngineHandle(EngineHandle):
def __init__(self, label: str) -> None:
self.label = label
@dataclass
class _FakeInferenceRuntime:
descriptor_dim: int = 512
raises: BaseException | None = None
runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = (
"tensorrt"
)
fixed_output: np.ndarray | None = None
output_key: str = "embedding"
calls: list[dict[str, np.ndarray]] = field(default_factory=list)
deserialize_calls: list[EngineCacheEntry] = field(default_factory=list)
model_name: str = "sela_vpr"
def compile_engine(
self, model_path: Path, build_config: BuildConfig
) -> EngineCacheEntry:
_ = build_config
return EngineCacheEntry(
engine_path=Path(model_path),
sha256_hex="0" * 64,
sm=None,
jp=None,
trt=None,
precision=PrecisionMode.FP16,
extras={"model_name": self.model_name},
)
def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle:
self.deserialize_calls.append(entry)
return _FakeEngineHandle(label=entry.extras.get("model_name", ""))
def infer(
self, handle: EngineHandle, inputs: dict[str, np.ndarray]
) -> dict[str, np.ndarray]:
_ = handle
self.calls.append({k: v.copy() for k, v in inputs.items()})
if self.raises is not None:
raise self.raises
if self.fixed_output is not None:
return {self.output_key: self.fixed_output.copy()}
rng = np.random.default_rng(0xCAFEBABE)
tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16)
return {
self.output_key: tensor.reshape(1, self.descriptor_dim).copy()
}
def release_engine(self, handle: EngineHandle) -> None:
_ = handle
def current_runtime_label(
self,
) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]:
return self.runtime_label
@dataclass
class _FakeDescriptorIndex:
descriptor_dim_value: int = 512
results: list[tuple[tuple[int, float, float], float]] = field(
default_factory=list
)
raises: BaseException | None = None
def search_topk(
self, query: np.ndarray, k: int
) -> list[tuple[tuple[int, float, float], float]]:
_ = query
if self.raises is not None:
raise self.raises
if not self.results:
return [
((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i)
for i in range(k)
]
return list(self.results[:k])
def descriptor_dim(self) -> int:
return self.descriptor_dim_value
# ---------------------------------------------------------------------------
# Fixture helpers
# ---------------------------------------------------------------------------
def _make_frame(
*, frame_id: int = 4242, h: int = 720, w: int = 1280
) -> NavCameraFrame:
rng = np.random.default_rng(frame_id)
image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8)
return NavCameraFrame(
frame_id=frame_id,
timestamp=datetime(2026, 5, 14, 0, 0, 0),
image=image,
camera_calibration_id="test_cam",
)
def _make_calibration(
*, cx: float = 640.0, cy: float = 360.0
) -> CameraCalibration:
intrinsics = np.array(
[
[1000.0, 0.0, cx],
[0.0, 1000.0, cy],
[0.0, 0.0, 1.0],
],
dtype=np.float64,
)
return CameraCalibration(
camera_id="test_cam",
intrinsics_3x3=intrinsics,
distortion=np.zeros(5, dtype=np.float64),
body_to_camera_se3=np.eye(4, dtype=np.float64),
acquisition_method="test_fixture",
)
def _make_fdr_client() -> FdrClient:
return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False)
def _build_strategy(
spec: _StrategySpec,
*,
inference_runtime: _FakeInferenceRuntime | None = None,
descriptor_index: _FakeDescriptorIndex | None = None,
preprocessor: Any = None,
fdr_client: FdrClient | None = None,
clock: _StubClock | None = None,
descriptor_dim: int | None = None,
) -> Any:
dim = spec.descriptor_dim if descriptor_dim is None else descriptor_dim
inference_runtime = inference_runtime or _FakeInferenceRuntime(
descriptor_dim=dim, model_name=spec.name
)
descriptor_index = descriptor_index or _FakeDescriptorIndex(
descriptor_dim_value=dim
)
preprocessor = preprocessor or spec.preprocessor_cls()
fdr_client = fdr_client or _make_fdr_client()
clock = clock or _StubClock()
handle = _FakeEngineHandle(label=spec.name)
bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=dim,
warn_top1_threshold=0.30,
debug_log_per_frame_distances=False,
fdr_client=fdr_client,
logger=logging.getLogger(f"test.{spec.name}.bridge"),
clock=clock,
)
return spec.strategy_cls(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=DescriptorNormaliser(),
faiss_bridge=bridge,
fdr_client=fdr_client,
clock=clock,
logger=logging.getLogger(f"test.{spec.name}"),
descriptor_dim=dim,
)
def _build_config(strategy_name: str) -> Config:
c2 = C2VprConfig(
strategy=strategy_name,
backbone_weights_path=Path(f"/models/{strategy_name}.trt"),
faiss_index_path=Path("/cache/vpr/index.faiss"),
warn_top1_threshold=0.30,
debug_per_frame_distances=False,
)
cfg = MagicMock(spec=Config)
cfg.components = {"c2_vpr": c2}
return cfg
# ---------------------------------------------------------------------------
# AC-1: Protocol conformance
# ---------------------------------------------------------------------------
def test_ac1_protocol_conformance(spec: _StrategySpec) -> None:
strategy = _build_strategy(spec)
assert isinstance(strategy, VprStrategy)
# ---------------------------------------------------------------------------
# AC-2: embed_query → L2-normalised FP16 embedding of correct dim
# ---------------------------------------------------------------------------
def test_ac2_embed_query_returns_unit_norm_fp16_correct_dim(
spec: _StrategySpec,
) -> None:
# Arrange
strategy = _build_strategy(spec)
frame = _make_frame()
calibration = _make_calibration()
# Act
query = strategy.embed_query(frame, calibration)
# Assert
embedding = np.asarray(query.embedding)
assert embedding.shape == (spec.descriptor_dim,)
assert embedding.dtype == np.float16
norm = float(np.linalg.norm(embedding.astype(np.float32)))
assert norm == pytest.approx(1.0, abs=1e-3)
def test_ac2_single_stage_l2_no_intra_cluster_call(
spec: _StrategySpec,
) -> None:
"""Secondary backbones use single-stage L2 (no NetVLAD-style intra-cluster step)."""
# Arrange
calls: list[str] = []
class _SpyNormaliser(DescriptorNormaliser):
def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray: # type: ignore[override]
calls.append("l2_normalise")
return DescriptorNormaliser.l2_normalise(descriptor)
def intra_cluster_normalise( # type: ignore[override]
self, descriptor: np.ndarray, num_clusters: int
) -> np.ndarray:
calls.append("intra_cluster_normalise")
return DescriptorNormaliser.intra_cluster_normalise(
descriptor, num_clusters
)
inference_runtime = _FakeInferenceRuntime(descriptor_dim=spec.descriptor_dim)
descriptor_index = _FakeDescriptorIndex(
descriptor_dim_value=spec.descriptor_dim
)
fdr_client = _make_fdr_client()
clock = _StubClock()
bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=spec.descriptor_dim,
warn_top1_threshold=0.30,
debug_log_per_frame_distances=False,
fdr_client=fdr_client,
logger=logging.getLogger(f"test.{spec.name}.bridge"),
clock=clock,
)
strategy = spec.strategy_cls(
inference_runtime=inference_runtime,
engine_handle=_FakeEngineHandle(spec.name),
descriptor_index=descriptor_index,
preprocessor=spec.preprocessor_cls(),
normaliser=_SpyNormaliser(),
faiss_bridge=bridge,
fdr_client=fdr_client,
clock=clock,
logger=logging.getLogger(f"test.{spec.name}"),
descriptor_dim=spec.descriptor_dim,
)
# Act
strategy.embed_query(_make_frame(), _make_calibration())
# Assert
assert "intra_cluster_normalise" not in calls
assert calls == ["l2_normalise"]
# ---------------------------------------------------------------------------
# AC-3: deterministic embeddings
# ---------------------------------------------------------------------------
def test_ac3_embed_query_deterministic_for_same_frame(
spec: _StrategySpec,
) -> None:
# Arrange
rng = np.random.default_rng(2026)
fixed = rng.standard_normal(spec.descriptor_dim).astype(np.float16)
fixed = fixed.reshape(1, spec.descriptor_dim)
runtime = _FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim, fixed_output=fixed
)
strategy = _build_strategy(spec, inference_runtime=runtime)
frame = _make_frame()
calibration = _make_calibration()
# Act
first = strategy.embed_query(frame, calibration)
second = strategy.embed_query(frame, calibration)
third = strategy.embed_query(frame, calibration)
# Assert
np.testing.assert_array_equal(
np.asarray(first.embedding), np.asarray(second.embedding)
)
np.testing.assert_array_equal(
np.asarray(second.embedding), np.asarray(third.embedding)
)
# ---------------------------------------------------------------------------
# AC-4: retrieve_topk returns k candidates with correct backbone_label
# ---------------------------------------------------------------------------
def test_ac4_retrieve_topk_returns_exactly_k_with_correct_label(
spec: _StrategySpec,
) -> None:
# Arrange
descriptor_index = _FakeDescriptorIndex(
descriptor_dim_value=spec.descriptor_dim
)
strategy = _build_strategy(spec, descriptor_index=descriptor_index)
# Act
query = strategy.embed_query(_make_frame(), _make_calibration())
result = strategy.retrieve_topk(query, k=10)
# Assert
assert len(result.candidates) == 10
assert result.backbone_label == spec.backbone_label
assert result.candidates[0].descriptor_dim == spec.descriptor_dim
distances = [c.descriptor_distance for c in result.candidates]
assert distances == sorted(distances)
# ---------------------------------------------------------------------------
# AC-5: descriptor_dim() is stable
# ---------------------------------------------------------------------------
def test_ac5_descriptor_dim_stable(spec: _StrategySpec) -> None:
strategy = _build_strategy(spec)
for _ in range(100):
assert strategy.descriptor_dim() == spec.descriptor_dim
# ---------------------------------------------------------------------------
# AC-6: Engine output shape mismatch → ConfigError at create()
# ---------------------------------------------------------------------------
def test_ac6_create_rejects_engine_output_shape_mismatch(
spec: _StrategySpec,
) -> None:
# Arrange — engine produces (1, 100), expected (1, spec.descriptor_dim)
wrong = np.zeros((1, 100), dtype=np.float16)
runtime = _FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim,
fixed_output=wrong,
model_name=spec.name,
)
descriptor_index = _FakeDescriptorIndex(
descriptor_dim_value=spec.descriptor_dim
)
# Act + Assert
with pytest.raises(ConfigError, match=r"engine output shape mismatch"):
spec.create_fn(
_build_config(spec.name),
descriptor_index=descriptor_index,
inference_runtime=runtime,
fdr_client=_make_fdr_client(),
clock=_StubClock(),
)
def test_ac6_create_rejects_missing_embedding_key(
spec: _StrategySpec,
) -> None:
# Arrange
runtime = _FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim,
output_key="wrong_key",
model_name=spec.name,
)
# Act + Assert
with pytest.raises(ConfigError, match=r"'embedding' key absent"):
spec.create_fn(
_build_config(spec.name),
descriptor_index=_FakeDescriptorIndex(
descriptor_dim_value=spec.descriptor_dim
),
inference_runtime=runtime,
fdr_client=_make_fdr_client(),
clock=_StubClock(),
)
# ---------------------------------------------------------------------------
# AC-7: VprBackboneError on forward-pass failure
# ---------------------------------------------------------------------------
def test_ac7_runtime_error_yields_vpr_backbone_error(
spec: _StrategySpec, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange
runtime = _FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim, raises=RuntimeError("CUDA OOM")
)
fdr_client = _make_fdr_client()
strategy = _build_strategy(
spec, inference_runtime=runtime, fdr_client=fdr_client
)
# Act
with caplog.at_level(logging.ERROR, logger=f"test.{spec.name}"):
with pytest.raises(VprBackboneError):
strategy.embed_query(_make_frame(), _make_calibration())
# Assert
assert any(
record.levelno == logging.ERROR
and getattr(record, "kind", None) == "c2.vpr.backbone_error"
for record in caplog.records
)
records: list[Any] = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"]
assert len(backbone_errors) == 1
def test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error(
spec: _StrategySpec,
) -> None:
# Arrange
bad = np.zeros((1, 100), dtype=np.float16)
runtime = _FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim, fixed_output=bad
)
strategy = _build_strategy(spec, inference_runtime=runtime)
# Act + Assert
with pytest.raises(
VprBackboneError, match=rf"expected \(1, {spec.descriptor_dim}\)"
):
strategy.embed_query(_make_frame(), _make_calibration())
# ---------------------------------------------------------------------------
# AC-8: VprPreprocessError on corrupt image bytes
# ---------------------------------------------------------------------------
def test_ac8_corrupt_image_yields_vpr_preprocess_error(
spec: _StrategySpec, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange
fdr_client = _make_fdr_client()
strategy = _build_strategy(spec, fdr_client=fdr_client)
frame = NavCameraFrame(
frame_id=4242,
timestamp=datetime(2026, 5, 14, 0, 0, 0),
image="not-an-array",
camera_calibration_id="test_cam",
)
# Act
with caplog.at_level(logging.ERROR, logger=f"test.{spec.name}"):
with pytest.raises(VprPreprocessError):
strategy.embed_query(frame, _make_calibration())
# Assert
assert any(
record.levelno == logging.ERROR
and getattr(record, "kind", None) == "c2.vpr.preprocess_error"
for record in caplog.records
)
records: list[Any] = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
preprocess_errors = [
r for r in records if r.kind == "vpr.preprocess_error"
]
assert len(preprocess_errors) == 1
# ---------------------------------------------------------------------------
# AC-9: Composition-root wiring + INFO "c2.vpr.ready" log emitted
# ---------------------------------------------------------------------------
def test_ac9_create_emits_ready_log_with_correct_label_and_dim(
spec: _StrategySpec, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange
logger_name = f"gps_denied_onboard.c2_vpr.{spec.name}"
runtime = _FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim, model_name=spec.name
)
descriptor_index = _FakeDescriptorIndex(
descriptor_dim_value=spec.descriptor_dim
)
# Act
with caplog.at_level(logging.INFO, logger=logger_name):
strategy = spec.create_fn(
_build_config(spec.name),
descriptor_index=descriptor_index,
inference_runtime=runtime,
fdr_client=_make_fdr_client(),
clock=_StubClock(),
)
# Assert
assert isinstance(strategy, spec.strategy_cls)
ready_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready"
]
assert len(ready_records) == 1
kv = getattr(ready_records[0], "kv", {})
assert kv == {
"strategy": spec.backbone_label,
"descriptor_dim": spec.descriptor_dim,
}
# ---------------------------------------------------------------------------
# AC-10: Build-flag exclusion → composition-time fail-fast
# ---------------------------------------------------------------------------
def test_ac10_runtime_label_mismatch_raises_config_error(
spec: _StrategySpec,
) -> None:
"""Selecting a secondary backbone on a binary built without the
TRT / ONNX-RT runtimes fails fast at create-time.
Note: AC-10 of the task spec literally names ``ConfigurationError``;
the existing factory contract (AZ-336) raises
``StrategyNotAvailableError`` via the BUILD_VPR_* env-flag check
BEFORE create() is reached, but the strategy module's own runtime
label guard surfaces a ``ConfigError`` for the same intent
(wrong runtime). Both are composition-time fail-fast errors. Same
mirroring precedent as AZ-337 / AZ-338 / AZ-339.
"""
# Arrange
runtime = _FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim,
runtime_label="pytorch_fp16",
model_name=spec.name,
)
# Act + Assert
with pytest.raises(ConfigError, match=r"BUILD_TENSORRT_RUNTIME"):
spec.create_fn(
_build_config(spec.name),
descriptor_index=_FakeDescriptorIndex(
descriptor_dim_value=spec.descriptor_dim
),
inference_runtime=runtime,
fdr_client=_make_fdr_client(),
clock=_StubClock(),
)
# ---------------------------------------------------------------------------
# AC-11: Preprocessor input shape
# ---------------------------------------------------------------------------
def test_ac11_preprocessor_input_shape(spec: _StrategySpec) -> None:
preprocessor = spec.preprocessor_cls()
assert preprocessor.input_shape() == spec.input_hw
def test_preprocess_output_is_nchw_fp16(spec: _StrategySpec) -> None:
# Arrange
preprocessor = spec.preprocessor_cls()
frame = _make_frame()
calibration = _make_calibration()
# Act
tensor = preprocessor.preprocess(frame, calibration)
# Assert
h, w = spec.input_hw
assert tensor.shape == (1, 3, h, w)
assert tensor.dtype == np.float16
# ---------------------------------------------------------------------------
# Constructor validation
# ---------------------------------------------------------------------------
def test_constructor_rejects_zero_descriptor_dim(spec: _StrategySpec) -> None:
# Arrange (skip _build_strategy to bypass FaissBridge's own validation)
fdr_client = _make_fdr_client()
clock = _StubClock()
descriptor_index = _FakeDescriptorIndex(
descriptor_dim_value=spec.descriptor_dim
)
bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=spec.descriptor_dim,
warn_top1_threshold=0.30,
debug_log_per_frame_distances=False,
fdr_client=fdr_client,
logger=logging.getLogger(f"test.{spec.name}.bridge"),
clock=clock,
)
# Act + Assert
with pytest.raises(ValueError, match=r"descriptor_dim must be >= 1"):
spec.strategy_cls(
inference_runtime=_FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim, model_name=spec.name
),
engine_handle=_FakeEngineHandle(spec.name),
descriptor_index=descriptor_index,
preprocessor=spec.preprocessor_cls(),
normaliser=DescriptorNormaliser(),
faiss_bridge=bridge,
fdr_client=fdr_client,
clock=clock,
logger=logging.getLogger(f"test.{spec.name}"),
descriptor_dim=0,
)
def test_create_requires_fdr_client(spec: _StrategySpec) -> None:
with pytest.raises(ValueError, match=r"fdr_client is required"):
spec.create_fn(
_build_config(spec.name),
descriptor_index=_FakeDescriptorIndex(
descriptor_dim_value=spec.descriptor_dim
),
inference_runtime=_FakeInferenceRuntime(
descriptor_dim=spec.descriptor_dim, model_name=spec.name
),
fdr_client=None,
clock=_StubClock(),
)
# ---------------------------------------------------------------------------
# FDR emission on success path
# ---------------------------------------------------------------------------
def test_embed_query_emits_fdr_record(spec: _StrategySpec) -> None:
# Arrange
fdr_client = _make_fdr_client()
strategy = _build_strategy(spec, fdr_client=fdr_client)
# Act
strategy.embed_query(_make_frame(), _make_calibration())
# Assert
records: list[Any] = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
embed = [r for r in records if r.kind == "vpr.embed_query"]
assert len(embed) == 1
payload = embed[0].payload
assert payload["backbone_label"] == spec.backbone_label
assert payload["descriptor_dim"] == spec.descriptor_dim
assert payload["latency_us"] >= 1