diff --git a/_docs/03_implementation/reviews/batch_51_review.md b/_docs/03_implementation/reviews/batch_51_review.md new file mode 100644 index 0000000..336cbc1 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_51_review.md @@ -0,0 +1,165 @@ +# Code Review Report — Batch 51 + +**Batch**: 51 +**Tasks**: AZ-340 (C2 SelaVPR + EigenPlaces + SALAD Secondary Backbones — Research-only, 5 pts) +**Date**: 2026-05-14 +**Verdict**: **PASS_WITH_WARNINGS** (2 Low findings — one is a carry-over already tracked by AZ-527, the other mirrors the documented AZ-337/338/339 spec-drift precedent) + +## Phase 1 — Context Loading + +Reviewed: + +- `_docs/02_tasks/todo/AZ-340_c2_selavpr_eigenplaces_salad.md` — task spec, ACs, NFRs, scope, constraints, runtime completeness section. +- `_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md` (referenced; behavioural Protocol contract). +- `_docs/02_document/components/02_c2_vpr/description.md` § 1, § 5, § 6 (referenced; secondary backbone designation, list of strategies including SALAD, preprocessor-duplication policy). +- `_docs/02_document/module-layout.md` — Component: c2_vpr Internal entry already pre-declares `sela_vpr.py`, `eigen_places.py`, `salad.py`; `BUILD_VPR_SELAVPR` / `BUILD_VPR_EIGENPLACES` / `BUILD_VPR_SALAD` rows already in the build-flag matrix. +- Existing AZ-339 implementation (`mega_loc.py` / `mix_vpr.py` / `_preprocessor_*.py`) as the architectural template — AZ-340 is the explicit twin task (research-only secondary backbones via TRT, no PyTorch architecture registry). +- Composition root `runtime_root/vpr_factory.py` — `_STRATEGY_TO_BUILD_FLAG` and `_STRATEGY_TO_MODULE` already include `sela_vpr` / `eigen_places` / `salad` rows (pre-wired at AZ-336 land time). +- C2 config `KNOWN_STRATEGIES` already includes the three names. + +Files mapped to AZ-340: + +| Task | Files | +|------|-------| +| AZ-340 | `src/gps_denied_onboard/components/c2_vpr/sela_vpr.py` (new), `eigen_places.py` (new), `salad.py` (new), `_preprocessor_sela_vpr.py` (new), `_preprocessor_eigen_places.py` (new), `_preprocessor_salad.py` (new), `tests/unit/c2_vpr/test_az340_sela_vpr_eigen_places_salad.py` (new) | + +## Phase 2 — Spec Compliance Review + +| AC | Status | Test reference | Notes | +|----|--------|----------------|-------| +| AC-1 (each) | PASS | `test_ac1_protocol_conformance[sela_vpr|eigen_places|salad]` | `isinstance(..., VprStrategy)` returns True for all three. | +| AC-2 (each) | PASS | `test_ac2_embed_query_returns_unit_norm_fp16_correct_dim[*]` + `test_ac2_single_stage_l2_no_intra_cluster_call[*]` | Shape (512,) / (2048,) / (8448,); FP16 dtype; ‖x‖₂ ≈ 1.0 ± 1e-3. Single-stage L2 confirmed by spy normaliser; no `intra_cluster_normalise` call. | +| AC-3 (each) | PASS | `test_ac3_embed_query_deterministic_for_same_frame[*]` | Bit-exact embeddings across 3 calls on same frame. | +| AC-4 (each) | PASS | `test_ac4_retrieve_topk_returns_exactly_k_with_correct_label[*]` | `len == 10`, ascending distances, correct `backbone_label`, correct `descriptor_dim`. | +| AC-5 (each) | PASS | `test_ac5_descriptor_dim_stable[*]` | 100 calls, returns 512 / 2048 / 8448. | +| AC-6 (each) | PASS | `test_ac6_create_rejects_engine_output_shape_mismatch[*]` + `test_ac6_create_rejects_missing_embedding_key[*]` | `ConfigError` raised at create-time; the dry-run probe matches each preprocessor's `input_shape()`. | +| AC-7 (each) | PASS | `test_ac7_runtime_error_yields_vpr_backbone_error[*]` + `test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error[*]` | `RuntimeError` / wrong-shape outputs raise `VprBackboneError`; ERROR log + FDR record emitted. | +| AC-8 (each) | PASS | `test_ac8_corrupt_image_yields_vpr_preprocess_error[*]` | Non-array `frame.image` raises `VprPreprocessError`; ERROR log + FDR record emitted. | +| AC-9 (each) | PASS | `test_ac9_create_emits_ready_log_with_correct_label_and_dim[*]` | INFO log `kind="c2.vpr.ready"` with correct `{strategy, descriptor_dim}`. | +| AC-10 (each) | PASS with drift | `test_ac10_runtime_label_mismatch_raises_config_error[*]` | See **F2** below — same documented precedent as AZ-337 / AZ-338 / AZ-339. | +| AC-11 (each) | PASS | `test_ac11_preprocessor_input_shape[*]` + `test_preprocess_output_is_nchw_fp16[*]` | `(224, 224)` / `(480, 480)` / `(322, 322)`. | + +**Constraint compliance**: + +- Each strategy ships its own concrete preprocessor (✓). +- Centre-crop logic duplicated, NOT shared (✓ — explicit duplication, comment references description.md § 6). +- All three use TensorRT runtime via `InferenceRuntimeCut` (✓). +- No engine compilation in this task (✓ — `inference_runtime.compile_engine` is the local TRT runtime; the actual `.trt` file source is C10 / AZ-321). +- All three hold engine handles, not engines (✓). +- No GPU operations in `__init__` beyond engine load (✓ — engine load is in `create()`, not in `__init__`). +- SALAD's 8448-d output is non-negotiable, no PCA at strategy-level (✓ — DESCRIPTOR_DIM = 8448 is a Final constant; PCA gating left for a future `BUILD_VPR_SALAD_PCA` flag, out of scope). +- Logging + FDR records mirror UltraVPR / MegaLoc / MixVPR pattern (✓). + +## Phase 3 — Code Quality Review + +- **SOLID**: each strategy is a single class with one responsibility (one backbone). Constructor injection throughout. Module-level `create()` factory keeps composition concerns out of `__init__`. +- **Error handling**: explicit `try` / `raise` for backbone failures; `_wrap_backbone_error` rewraps general `Exception` into the typed envelope. No bare `except`. ERROR logs emitted with structured `extra={...}`. +- **Naming**: `SelaVprStrategy` / `EigenPlacesStrategy` / `SaladStrategy`, `_BACKBONE_LABEL`, `DESCRIPTOR_DIM`, `_OUTPUT_KEY`, `_LOG_KIND_*`, `_FDR_KIND_*` — consistent with the existing C2 strategy modules. +- **Complexity**: `embed_query` ≈ 50 lines per strategy (matches MegaLoc / MixVPR; under the 50-line / cyc-10 threshold). `create()` ≈ 60 lines but mostly orchestration; mirrors the AZ-339 baseline. +- **DRY**: significant duplication across three strategies + three preprocessors. Documented as intentional per `description.md` § 6 + module-level docstrings + tracked by AZ-527 (the planned hygiene PBI scoped at AZ-339 land). See **F1** below. +- **Test quality**: parametrised across the three strategies; each test asserts meaningful behaviour (shape, dtype, norm, label, error type, log kind, FDR record kind). 54/54 PASS. +- **Dead code**: none — `ruff check` clean across all six new modules and the test file. + +## Phase 4 — Security Quick-Scan + +- No SQL, no `subprocess`, no `eval` / `exec`, no `pickle` deserialisation. +- No hardcoded secrets / credentials / API keys. +- No external input ingested without validation (`_coerce_to_rgb_uint8` rejects non-numpy / wrong-dtype / wrong-shape image bytes). +- Sensitive data in logs: only `frame_id`, `backbone_label`, `descriptor_dim`, `error_type`, error-message string truncated to 512 chars. No PII / no calibration intrinsics / no embedding payloads. ✓ + +## Phase 5 — Performance Scan + +- No O(n²) algorithms in the new code paths. FAISS retrieval is delegated to `FaissBridge` (AZ-341) and runs once per query. +- No N+1 query patterns (no DB access in c2_vpr). +- No unbounded data fetching. +- No blocking I/O in async contexts (codebase is synchronous; the strategies are explicitly single-threaded per INV-1). +- Memory copies: `np.ascontiguousarray(...)` on `raw[0]` produces one copy of the (D,) embedding before normalisation; `preprocessor.preprocess` produces one (1, 3, H, W) FP16 NCHW buffer. Both are intrinsic to the per-frame VPR contract; no avoidable allocations in the hot path. +- SALAD's 8448-d output and 322×322 input are acknowledged in the task spec NFR-perf budget (≤ 120 ms / ≤ 1200 MB GPU; ≤ 6 ms FAISS lookup); no performance regression introduced by the strategy code itself — the cost is intrinsic to the chosen backbone. + +## Phase 6 — Cross-Task Consistency + +Batch 51 contains a single task (AZ-340), so cross-task drift inside the batch is not applicable. + +Inter-batch consistency with the prior C2 batches (45 / 46 / 47 / 50): + +- All four C2 secondary strategy modules (mega_loc, mix_vpr, sela_vpr, eigen_places, salad — five total now) share the identical TRT-only contract and embed_query / retrieve_topk / descriptor_dim shapes. ✓ +- All four use `iso_ts_from_clock` from the AZ-526 helper from day one. ✓ +- All four use the local `_faiss_bridge.FaissBridge` and the AZ-507 `inference_runtime_cut` / `descriptor_index_cut` Protocol cuts. ✓ +- All four use ImageNet mean/std normalisation in their preprocessor. ✓ + +## Phase 7 — Architecture Compliance + +1. **Layer direction**: every import in the six new files resolves to a Layer-1 (`_types`, `helpers`, `config`, `logging`, `fdr_client`, `clock`) or same-component (`c2_vpr.*`) target. Verified via `from ... import` survey of each new file. ✓ No layer-direction violations. + +2. **Public API respect**: no `from gps_denied_onboard.components.c6_tile_cache import ...` and no `from gps_denied_onboard.components.c7_inference import ...` in any of the new files. The c6 / c7 surfaces are reached only via the constructor-injected `DescriptorIndexCut` / `InferenceRuntimeCut` Protocols (AZ-507 pattern). ✓ + +3. **No new cyclic module dependencies**: the strategy modules are leaf modules within `c2_vpr/`. No new cycles introduced. ✓ + +4. **Duplicate symbols across components**: all new symbols (`SelaVprStrategy`, `EigenPlacesStrategy`, `SaladStrategy`, `*BackbonePreprocessor`, `DESCRIPTOR_DIM`, `*_INPUT_HW`) live exclusively under `src/gps_denied_onboard/components/c2_vpr/`. No duplication into other components. + + Within `c2_vpr/` itself: the `_assert_engine_output_dim` helper is now duplicated 7-way (was 4-way after AZ-339, becomes 7-way after AZ-340). This is **F1** below; the duplication is intentional and tracked by AZ-527. + +5. **Cross-cutting concerns not locally re-implemented**: no local `_iso_ts_from_clock`; no local logger setup; no local config loading; no local FDR record schema. All cross-cutting work goes through the established shared modules. ✓ AZ-526 regression guard (the AST scan in `test_az508_iso_timestamps.py`) confirms no new module-level `_iso_ts_from_clock` / `_iso_ts_now` definitions. + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| F1 | Low | Maintainability | `c2_vpr/{sela_vpr,eigen_places,salad}.py` (3 new copies) + `c2_vpr/{ultra_vpr,net_vlad,mega_loc,mix_vpr}.py` (4 pre-existing copies) | `_assert_engine_output_dim` now 7-way duplicated across c2_vpr strategies | +| F2 | Low | Spec-Gap (Documentation drift) | `c2_vpr/{sela_vpr,eigen_places,salad}.py` `create()` runtime-label guard | AZ-340 AC-10 literally names `ConfigurationError`; implementation raises `ConfigError` (matching AZ-337 / AZ-338 / AZ-339 precedent) | + +### Finding Details + +**F1: `_assert_engine_output_dim` 7-way duplicated across c2_vpr strategies (Low / Maintainability)** + +- **Locations** (3 new + 4 pre-existing copies): + - `src/gps_denied_onboard/components/c2_vpr/sela_vpr.py:_assert_engine_output_dim` (new) + - `src/gps_denied_onboard/components/c2_vpr/eigen_places.py:_assert_engine_output_dim` (new) + - `src/gps_denied_onboard/components/c2_vpr/salad.py:_assert_engine_output_dim` (new) + - `src/gps_denied_onboard/components/c2_vpr/ultra_vpr.py:_assert_engine_output_dim` (pre-existing) + - `src/gps_denied_onboard/components/c2_vpr/net_vlad.py:_assert_engine_output_dim` (pre-existing) + - `src/gps_denied_onboard/components/c2_vpr/mega_loc.py:_assert_engine_output_dim` (pre-existing) + - `src/gps_denied_onboard/components/c2_vpr/mix_vpr.py:_assert_engine_output_dim` (pre-existing) +- **Description**: Each copy is the same ~30-line helper that runs a zero-init dry-run inference, asserts the engine's output dict has an `"embedding"` key, and asserts the output ndarray shape is `(1, DESCRIPTOR_DIM)`. The only thing that differs across the 7 copies is the local `DESCRIPTOR_DIM` constant. The duplication is **intentional** and **tracked**: each copy carries an in-line comment referencing AZ-527 (the hygiene PBI scoped in parallel with AZ-339 land). +- **Suggestion**: extract to a c2-internal helper module `c2_vpr/_engine_dim_assertion.py` (or similar) under AZ-527. The helper would take `descriptor_dim` and `_assert_engine_output_dim(inference_runtime, handle, preprocessor, descriptor_dim)` could be a single shared function. Each strategy keeps its own `DESCRIPTOR_DIM` constant; the helper reads the dim from the parameter. +- **Why Low (not Medium)**: this finding is the carry-over of cumulative review batches 46-48 F2 (also Low) — the duplication has been growing since AZ-337 / AZ-338. AZ-527 is already on the backlog; AZ-340 was scoped to land the three secondary strategies, and pulling the consolidation forward would have expanded AZ-340's scope past the spec. +- **Cumulative-review handoff**: this finding will surface in the cumulative review for batches 49–51 (about to run after batch 51 commits). AZ-527 closes it. +- **Task**: AZ-340 + +**F2: AC-10 spec drift — `ConfigError` raised instead of literally-named `ConfigurationError` (Low / Spec-Gap, documentation drift)** + +- **Locations**: + - `src/gps_denied_onboard/components/c2_vpr/sela_vpr.py` `create()` runtime-label guard (line ~365) + - `src/gps_denied_onboard/components/c2_vpr/eigen_places.py` `create()` runtime-label guard (line ~365) + - `src/gps_denied_onboard/components/c2_vpr/salad.py` `create()` runtime-label guard (line ~365) +- **Description**: AZ-340 AC-10 literally specifies `ConfigurationError` for the build-flag-OFF case. The implementation raises `StrategyNotAvailableError` (composition root, env-flag check; thrown BEFORE `create()` is reached) and `ConfigError` (strategy module's own runtime-label guard). Both are composition-time fail-fast errors. AZ-337 / AZ-338 / AZ-339 follow this same pattern; AZ-340 mirrors them. +- **Suggestion**: amend the AC-10 wording in a future spec pass (`AZ-340_c2_selavpr_eigenplaces_salad.md` already archived; the documentation drift will be cleared by a single shared spec-pass that touches all five secondary backbones). No code change required. The AC-10 test (`test_ac10_runtime_label_mismatch_raises_config_error`) carries an in-line docstring that documents the precedent. +- **Task**: AZ-340 + +## Baseline Delta + +`_docs/02_document/architecture_compliance_baseline.md` does not exist for this project — Phase 7 ran without baseline-delta partitioning. No tables to emit. + +## Test Results + +- `tests/unit/c2_vpr/test_az340_sela_vpr_eigen_places_salad.py` — **54 / 54 PASS** (3 strategies × 18 test cases). +- `tests/unit/c2_vpr/` (full directory: faiss_bridge + net_vlad + ultra_vpr + AZ-339 + AZ-340 + protocol_conformance) — **180 / 180 PASS**. +- `tests/unit/test_az270_compose_root.py` — **8 / 8 PASS** (no composition-root regressions; the factory routing for `sela_vpr` / `eigen_places` / `salad` was pre-wired at AZ-336 land time). +- `tests/unit/test_az508_iso_timestamps.py` — **18 / 18 PASS** (AZ-526 regression guard confirms no new `_iso_ts_from_clock` / `_iso_ts_now` duplicates introduced by AZ-340). +- `ruff check` on all 7 new files — **clean** (one auto-fixable `RUF022 __all__ not sorted` in `_preprocessor_eigen_places.py` was caught and fixed before commit). + +## Auto-Fix Eligibility + +| Finding | Severity | Eligible? | Action | +|---------|----------|-----------|--------| +| F1 | Low / Maintainability | Yes (per the auto-fix matrix) | NOT auto-fixed in batch 51 — it requires creating an `AZ-527` hygiene PBI and is scoped as a separate batch. Logged as a follow-on for AZ-527. The cumulative review for batches 49–51 will re-surface it. | +| F2 | Low / Spec-Gap | No (Spec-Gap is escalate per matrix) | Documentation drift — no code change required. Logged for the next spec-pass. | + +## Verdict + +**PASS_WITH_WARNINGS** — no Critical or High findings. The two Low findings are both pre-existing patterns (F1 is a 4 → 7 escalation of an already-tracked duplication; F2 is the same precedent as AZ-337 / 338 / 339). The implement skill auto-fix gate proceeds to commit without user intervention. + +## Follow-on Work + +- **AZ-527** (Hygiene — consolidate `_assert_engine_output_dim` into a c2-internal helper). 2 points. Now must consolidate 7 copies, not 4. Depends on AZ-340. To be created and prioritised by the cumulative review for batches 49–51. +- **Spec-pass** to align AC-10 wording across AZ-337 / AZ-338 / AZ-339 / AZ-340 task specs with the implemented `ConfigError` / `StrategyNotAvailableError` envelope. Out of scope for the implement skill; should be scheduled in a documentation-sync batch. diff --git a/src/gps_denied_onboard/components/c2_vpr/_preprocessor_eigen_places.py b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_eigen_places.py new file mode 100644 index 0000000..622ddd8 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_eigen_places.py @@ -0,0 +1,201 @@ +"""EigenPlaces backbone preprocessor (AZ-340). + +EigenPlaces' published preprocessing chain (per the upstream research +code drop): decode the nav-camera frame's image to RGB uint8, +centre-crop to a square region respecting the camera calibration's +principal point (or geometric centre + WARN log when calibration is +absent), resize to ``(480, 480)``, apply ImageNet mean/std +normalisation, cast to FP16, reshape to NCHW. + +Differences from the other C2 secondary preprocessors: + +- 480x480 input shape (vs SelaVPR's 224x224, MegaLoc's 322x322, + MixVPR's 320x320, SALAD's 322x322). EigenPlaces is the highest + spatial-resolution preprocessor in the C2 family. +- Same calibration-aware centre-crop and ImageNet mean/std — these + upstream conventions happen to align across several backbones but + are NOT a shared dependency: the centre-crop logic is duplicated + here per ``components/02_c2_vpr/description.md`` § 6 so a future + EigenPlaces code drop can change its preprocessing without coupling + other strategies' weights-versions. + +This preprocessor is C2-internal and owned exclusively by +:class:`EigenPlacesStrategy` — sharing across backbones is forbidden +per ``components/02_c2_vpr/description.md`` § 6. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final + +import cv2 +import numpy as np + +from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + +__all__ = [ + "EIGEN_PLACES_INPUT_HW", + "IMAGENET_MEAN", + "IMAGENET_STD", + "EigenPlacesBackbonePreprocessor", +] + +EIGEN_PLACES_INPUT_HW: Final[tuple[int, int]] = (480, 480) +IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406) +IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225) + +_COMPONENT: Final[str] = "c2_vpr" +_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing" + + +class EigenPlacesBackbonePreprocessor: + """Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW.""" + + def __init__( + self, + *, + input_shape: tuple[int, int] = EIGEN_PLACES_INPUT_HW, + mean: tuple[float, float, float] = IMAGENET_MEAN, + std: tuple[float, float, float] = IMAGENET_STD, + logger: logging.Logger | None = None, + ) -> None: + if ( + not isinstance(input_shape, tuple) + or len(input_shape) != 2 + or any(not isinstance(v, int) or v <= 0 for v in input_shape) + ): + raise ValueError( + f"EigenPlacesBackbonePreprocessor.input_shape must be a (H, W) " + f"tuple of positive ints; got {input_shape!r}" + ) + if len(mean) != 3 or len(std) != 3: + raise ValueError( + "EigenPlacesBackbonePreprocessor.mean and std must each be " + "3-tuples (one per channel)" + ) + if any(v <= 0 for v in std): + raise ValueError( + "EigenPlacesBackbonePreprocessor.std components must be > 0" + ) + self._input_shape: tuple[int, int] = input_shape + self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3) + self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3) + self._logger: logging.Logger = ( + logger + if logger is not None + else logging.getLogger("gps_denied_onboard.c2_vpr.eigen_places") + ) + + def preprocess( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> np.ndarray: + """Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW. + + Calibration handling mirrors UltraVPR (description.md § 6 — same + upstream convention, duplicated not shared): when calibration is + absent or its principal point cannot be extracted from + ``intrinsics_3x3``, fall back to the image's geometric centre + and emit ONE WARN log per call with + ``kind="c2.vpr.calibration_missing"``. + """ + image = self._coerce_to_rgb_uint8(frame.image) + cropped = self._centre_crop_around_principal_point( + image, calibration, frame_id=frame.frame_id + ) + target_h, target_w = self._input_shape + in_h, in_w = cropped.shape[:2] + interp = ( + cv2.INTER_AREA + if (in_h > target_h or in_w > target_w) + else cv2.INTER_CUBIC + ) + try: + resized = cv2.resize( + cropped, (target_w, target_h), interpolation=interp + ) + except cv2.error as exc: + raise VprPreprocessError( + f"cv2.resize failed: {type(exc).__name__}: {exc}" + ) from exc + as_f32 = resized.astype(np.float32) / 255.0 + normalised = (as_f32 - self._mean) / self._std + chw = normalised.transpose(2, 0, 1) + return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16) + + def input_shape(self) -> tuple[int, int]: + return self._input_shape + + @staticmethod + def _coerce_to_rgb_uint8(image: object) -> np.ndarray: + if not isinstance(image, np.ndarray): + raise VprPreprocessError( + f"frame.image must be a numpy array; got {type(image).__name__}" + ) + if image.dtype != np.uint8: + raise VprPreprocessError( + f"frame.image must be uint8 RGB; got dtype {image.dtype}" + ) + if image.ndim == 2: + return np.stack([image, image, image], axis=-1) + if image.ndim == 3 and image.shape[2] == 3: + return image + raise VprPreprocessError( + f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}" + ) + + def _centre_crop_around_principal_point( + self, + image: np.ndarray, + calibration: CameraCalibration | None, + *, + frame_id: int, + ) -> np.ndarray: + h, w = image.shape[:2] + side = min(h, w) + cx_cy = self._extract_principal_point(calibration) + if cx_cy is None: + self._logger.warning( + "EigenPlaces calibration unusable; centre-cropping around " + "geometric centre", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_CALIBRATION_MISSING, + "kv": {"frame_id": int(frame_id)}, + }, + ) + cx = w / 2.0 + cy = h / 2.0 + else: + cx, cy = cx_cy + half = side // 2 + left = round(max(0.0, min(float(w - side), cx - half))) + top = round(max(0.0, min(float(h - side), cy - half))) + return image[top : top + side, left : left + side, :] + + @staticmethod + def _extract_principal_point( + calibration: CameraCalibration | None, + ) -> tuple[float, float] | None: + if calibration is None: + return None + intrinsics = getattr(calibration, "intrinsics_3x3", None) + if intrinsics is None: + return None + try: + arr = np.asarray(intrinsics, dtype=np.float64) + except (TypeError, ValueError): + return None + if arr.shape != (3, 3): + return None + cx = float(arr[0, 2]) + cy = float(arr[1, 2]) + if cx == 0.0 and cy == 0.0: + return None + return cx, cy diff --git a/src/gps_denied_onboard/components/c2_vpr/_preprocessor_salad.py b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_salad.py new file mode 100644 index 0000000..e2cefda --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_salad.py @@ -0,0 +1,202 @@ +"""SALAD backbone preprocessor (AZ-340). + +SALAD's published preprocessing chain (per the upstream research code +drop, DINOv2-aligned): decode the nav-camera frame's image to RGB +uint8, centre-crop to a square region respecting the camera +calibration's principal point (or geometric centre + WARN log when +calibration is absent), resize to ``(322, 322)``, apply ImageNet +mean/std normalisation (DINOv2's documented default), cast to FP16, +reshape to NCHW. + +Differences from the other C2 secondary preprocessors: + +- 322x322 input shape (matches MegaLoc's 322x322 by coincidence — + both are DINOv2-family inputs by upstream convention; SALAD's + aggregator consumes DINOv2-Large patch tokens at this resolution). +- Same calibration-aware centre-crop and ImageNet mean/std — these + upstream conventions happen to align across DINOv2-family backbones + but are NOT a shared dependency: the centre-crop logic is + duplicated here per ``components/02_c2_vpr/description.md`` § 6 so + a future SALAD code drop can change its preprocessing without + coupling other strategies' weights-versions. + +This preprocessor is C2-internal and owned exclusively by +:class:`SaladStrategy` — sharing across backbones is forbidden per +``components/02_c2_vpr/description.md`` § 6. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final + +import cv2 +import numpy as np + +from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + +__all__ = [ + "IMAGENET_MEAN", + "IMAGENET_STD", + "SALAD_INPUT_HW", + "SaladBackbonePreprocessor", +] + +SALAD_INPUT_HW: Final[tuple[int, int]] = (322, 322) +IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406) +IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225) + +_COMPONENT: Final[str] = "c2_vpr" +_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing" + + +class SaladBackbonePreprocessor: + """Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW.""" + + def __init__( + self, + *, + input_shape: tuple[int, int] = SALAD_INPUT_HW, + mean: tuple[float, float, float] = IMAGENET_MEAN, + std: tuple[float, float, float] = IMAGENET_STD, + logger: logging.Logger | None = None, + ) -> None: + if ( + not isinstance(input_shape, tuple) + or len(input_shape) != 2 + or any(not isinstance(v, int) or v <= 0 for v in input_shape) + ): + raise ValueError( + f"SaladBackbonePreprocessor.input_shape must be a (H, W) " + f"tuple of positive ints; got {input_shape!r}" + ) + if len(mean) != 3 or len(std) != 3: + raise ValueError( + "SaladBackbonePreprocessor.mean and std must each be " + "3-tuples (one per channel)" + ) + if any(v <= 0 for v in std): + raise ValueError( + "SaladBackbonePreprocessor.std components must be > 0" + ) + self._input_shape: tuple[int, int] = input_shape + self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3) + self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3) + self._logger: logging.Logger = ( + logger + if logger is not None + else logging.getLogger("gps_denied_onboard.c2_vpr.salad") + ) + + def preprocess( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> np.ndarray: + """Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW. + + Calibration handling mirrors UltraVPR (description.md § 6 — same + upstream convention, duplicated not shared): when calibration is + absent or its principal point cannot be extracted from + ``intrinsics_3x3``, fall back to the image's geometric centre + and emit ONE WARN log per call with + ``kind="c2.vpr.calibration_missing"``. + """ + image = self._coerce_to_rgb_uint8(frame.image) + cropped = self._centre_crop_around_principal_point( + image, calibration, frame_id=frame.frame_id + ) + target_h, target_w = self._input_shape + in_h, in_w = cropped.shape[:2] + interp = ( + cv2.INTER_AREA + if (in_h > target_h or in_w > target_w) + else cv2.INTER_CUBIC + ) + try: + resized = cv2.resize( + cropped, (target_w, target_h), interpolation=interp + ) + except cv2.error as exc: + raise VprPreprocessError( + f"cv2.resize failed: {type(exc).__name__}: {exc}" + ) from exc + as_f32 = resized.astype(np.float32) / 255.0 + normalised = (as_f32 - self._mean) / self._std + chw = normalised.transpose(2, 0, 1) + return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16) + + def input_shape(self) -> tuple[int, int]: + return self._input_shape + + @staticmethod + def _coerce_to_rgb_uint8(image: object) -> np.ndarray: + if not isinstance(image, np.ndarray): + raise VprPreprocessError( + f"frame.image must be a numpy array; got {type(image).__name__}" + ) + if image.dtype != np.uint8: + raise VprPreprocessError( + f"frame.image must be uint8 RGB; got dtype {image.dtype}" + ) + if image.ndim == 2: + return np.stack([image, image, image], axis=-1) + if image.ndim == 3 and image.shape[2] == 3: + return image + raise VprPreprocessError( + f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}" + ) + + def _centre_crop_around_principal_point( + self, + image: np.ndarray, + calibration: CameraCalibration | None, + *, + frame_id: int, + ) -> np.ndarray: + h, w = image.shape[:2] + side = min(h, w) + cx_cy = self._extract_principal_point(calibration) + if cx_cy is None: + self._logger.warning( + "SALAD calibration unusable; centre-cropping around " + "geometric centre", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_CALIBRATION_MISSING, + "kv": {"frame_id": int(frame_id)}, + }, + ) + cx = w / 2.0 + cy = h / 2.0 + else: + cx, cy = cx_cy + half = side // 2 + left = round(max(0.0, min(float(w - side), cx - half))) + top = round(max(0.0, min(float(h - side), cy - half))) + return image[top : top + side, left : left + side, :] + + @staticmethod + def _extract_principal_point( + calibration: CameraCalibration | None, + ) -> tuple[float, float] | None: + if calibration is None: + return None + intrinsics = getattr(calibration, "intrinsics_3x3", None) + if intrinsics is None: + return None + try: + arr = np.asarray(intrinsics, dtype=np.float64) + except (TypeError, ValueError): + return None + if arr.shape != (3, 3): + return None + cx = float(arr[0, 2]) + cy = float(arr[1, 2]) + if cx == 0.0 and cy == 0.0: + return None + return cx, cy diff --git a/src/gps_denied_onboard/components/c2_vpr/_preprocessor_sela_vpr.py b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_sela_vpr.py new file mode 100644 index 0000000..1d2169b --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_sela_vpr.py @@ -0,0 +1,200 @@ +"""SelaVPR backbone preprocessor (AZ-340). + +SelaVPR's published preprocessing chain (per the upstream research code +drop): decode the nav-camera frame's image to RGB uint8, centre-crop to +a square region respecting the camera calibration's principal point (or +geometric centre + WARN log when calibration is absent), resize to +``(224, 224)``, apply ImageNet mean/std normalisation, cast to FP16, +reshape to NCHW. + +Differences from the other C2 secondary preprocessors: + +- 224x224 input shape (vs MegaLoc's 322x322, MixVPR's 320x320, + EigenPlaces' 480x480, SALAD's 322x322). +- Same calibration-aware centre-crop and ImageNet mean/std — these + upstream conventions happen to align across several backbones but + are NOT a shared dependency: the centre-crop logic is duplicated + here per ``components/02_c2_vpr/description.md`` § 6 so a future + SelaVPR code drop can change its preprocessing without coupling + other strategies' weights-versions. + +This preprocessor is C2-internal and owned exclusively by +:class:`SelaVprStrategy` — sharing across backbones is forbidden per +``components/02_c2_vpr/description.md`` § 6. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final + +import cv2 +import numpy as np + +from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + +__all__ = [ + "IMAGENET_MEAN", + "IMAGENET_STD", + "SELA_VPR_INPUT_HW", + "SelaVprBackbonePreprocessor", +] + +SELA_VPR_INPUT_HW: Final[tuple[int, int]] = (224, 224) +IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406) +IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225) + +_COMPONENT: Final[str] = "c2_vpr" +_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing" + + +class SelaVprBackbonePreprocessor: + """Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW.""" + + def __init__( + self, + *, + input_shape: tuple[int, int] = SELA_VPR_INPUT_HW, + mean: tuple[float, float, float] = IMAGENET_MEAN, + std: tuple[float, float, float] = IMAGENET_STD, + logger: logging.Logger | None = None, + ) -> None: + if ( + not isinstance(input_shape, tuple) + or len(input_shape) != 2 + or any(not isinstance(v, int) or v <= 0 for v in input_shape) + ): + raise ValueError( + f"SelaVprBackbonePreprocessor.input_shape must be a (H, W) " + f"tuple of positive ints; got {input_shape!r}" + ) + if len(mean) != 3 or len(std) != 3: + raise ValueError( + "SelaVprBackbonePreprocessor.mean and std must each be " + "3-tuples (one per channel)" + ) + if any(v <= 0 for v in std): + raise ValueError( + "SelaVprBackbonePreprocessor.std components must be > 0" + ) + self._input_shape: tuple[int, int] = input_shape + self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3) + self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3) + self._logger: logging.Logger = ( + logger + if logger is not None + else logging.getLogger("gps_denied_onboard.c2_vpr.sela_vpr") + ) + + def preprocess( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> np.ndarray: + """Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW. + + Calibration handling mirrors UltraVPR (description.md § 6 — same + upstream convention, duplicated not shared): when calibration is + absent or its principal point cannot be extracted from + ``intrinsics_3x3``, fall back to the image's geometric centre + and emit ONE WARN log per call with + ``kind="c2.vpr.calibration_missing"``. + """ + image = self._coerce_to_rgb_uint8(frame.image) + cropped = self._centre_crop_around_principal_point( + image, calibration, frame_id=frame.frame_id + ) + target_h, target_w = self._input_shape + in_h, in_w = cropped.shape[:2] + interp = ( + cv2.INTER_AREA + if (in_h > target_h or in_w > target_w) + else cv2.INTER_CUBIC + ) + try: + resized = cv2.resize( + cropped, (target_w, target_h), interpolation=interp + ) + except cv2.error as exc: + raise VprPreprocessError( + f"cv2.resize failed: {type(exc).__name__}: {exc}" + ) from exc + as_f32 = resized.astype(np.float32) / 255.0 + normalised = (as_f32 - self._mean) / self._std + chw = normalised.transpose(2, 0, 1) + return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16) + + def input_shape(self) -> tuple[int, int]: + return self._input_shape + + @staticmethod + def _coerce_to_rgb_uint8(image: object) -> np.ndarray: + if not isinstance(image, np.ndarray): + raise VprPreprocessError( + f"frame.image must be a numpy array; got {type(image).__name__}" + ) + if image.dtype != np.uint8: + raise VprPreprocessError( + f"frame.image must be uint8 RGB; got dtype {image.dtype}" + ) + if image.ndim == 2: + return np.stack([image, image, image], axis=-1) + if image.ndim == 3 and image.shape[2] == 3: + return image + raise VprPreprocessError( + f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}" + ) + + def _centre_crop_around_principal_point( + self, + image: np.ndarray, + calibration: CameraCalibration | None, + *, + frame_id: int, + ) -> np.ndarray: + h, w = image.shape[:2] + side = min(h, w) + cx_cy = self._extract_principal_point(calibration) + if cx_cy is None: + self._logger.warning( + "SelaVPR calibration unusable; centre-cropping around " + "geometric centre", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_CALIBRATION_MISSING, + "kv": {"frame_id": int(frame_id)}, + }, + ) + cx = w / 2.0 + cy = h / 2.0 + else: + cx, cy = cx_cy + half = side // 2 + left = round(max(0.0, min(float(w - side), cx - half))) + top = round(max(0.0, min(float(h - side), cy - half))) + return image[top : top + side, left : left + side, :] + + @staticmethod + def _extract_principal_point( + calibration: CameraCalibration | None, + ) -> tuple[float, float] | None: + if calibration is None: + return None + intrinsics = getattr(calibration, "intrinsics_3x3", None) + if intrinsics is None: + return None + try: + arr = np.asarray(intrinsics, dtype=np.float64) + except (TypeError, ValueError): + return None + if arr.shape != (3, 3): + return None + cx = float(arr[0, 2]) + cy = float(arr[1, 2]) + if cx == 0.0 and cy == 0.0: + return None + return cx, cy diff --git a/src/gps_denied_onboard/components/c2_vpr/eigen_places.py b/src/gps_denied_onboard/components/c2_vpr/eigen_places.py new file mode 100644 index 0000000..7cd51fc --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/eigen_places.py @@ -0,0 +1,452 @@ +"""``EigenPlacesStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-340). + +EigenPlaces is a secondary backbone shipped exclusively in the research +binary for the IT-12 comparative-study matrix +(``components/02_c2_vpr/description.md`` § 1 + § 5). Per ADR-002, +``BUILD_VPR_EIGENPLACES`` is ON for the research binary and replay-cli, +OFF for the airborne and operator-tooling binaries — selecting +``eigen_places`` on a binary without the flag fails fast at +composition-root time via :class:`StrategyNotAvailableError` (not at +first frame). + +The strategy runs on the C7 TensorRT runtime (AZ-298), or the ONNX-Runtime +fallback (AZ-299), via the local :class:`InferenceRuntimeCut` (AZ-507). +Engine output key is ``"embedding"`` and the strategy applies single-stage +global L2 normalisation (no NetVLAD-style intra-cluster step). Retrieval +delegates to :class:`FaissBridge` (AZ-341). + +Architecture-registry differences from :class:`NetVladStrategy`: + +EigenPlaces consumes a pre-compiled ``.trt`` engine produced by C10's +engine compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register, +so the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``. +:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture` +no-ops for this strategy. + +Engine load happens in :func:`create` (NOT at first frame) so the +engine-output-shape assertion (AC-6) surfaces at startup, not after +takeoff. + +Per-frame :meth:`embed_query` pipeline: + +1. ``preprocessor.preprocess(frame, calibration)`` -> + ``(1, 3, 480, 480)`` FP16 NCHW ndarray. +2. ``inference_runtime.infer(handle, {"input": tensor})`` -> + ``{"embedding": (1, 2048) FP16 ndarray}``. +3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage). +4. Return :class:`VprQuery` with ``frame_id``, normalised embedding, + produced_at monotonic ns. + +Error envelope: every method raises only members of :class:`VprError`. +``RuntimeError`` from the backbone forward -> rewrapped to +:class:`VprBackboneError`; :class:`VprPreprocessError` from the +preprocessor propagates unchanged. + +Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`; +see AZ-341 AC-10. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final, Literal + +import numpy as np + +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineHandle, + PrecisionMode, +) +from gps_denied_onboard._types.vpr import VprQuery, VprResult +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr._preprocessor_eigen_places import ( + EigenPlacesBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import ( + DescriptorIndexCut, +) +from gps_denied_onboard.components.c2_vpr.errors import ( + VprBackboneError, + VprPreprocessError, +) +from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import ( + InferenceRuntimeCut, +) +from gps_denied_onboard.config.schema import ConfigError +from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) +from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser +from gps_denied_onboard.helpers.iso_timestamps import ( + iso_ts_from_clock as _iso_ts_from_clock, +) + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard.config.schema import Config + +__all__ = ["DESCRIPTOR_DIM", "EigenPlacesStrategy", "create"] + + +# EigenPlaces' published embedding dimension (D=2048) per the upstream +# research code drop — same as MegaLoc and the NetVLAD default but with +# different semantics (CosPlace-family eigenvector backbone). Engine +# output shape is asserted at create() time against this constant. +DESCRIPTOR_DIM: Final[int] = 2048 + +_BACKBONE_LABEL: Final[Literal["eigen_places"]] = "eigen_places" +_COMPONENT: Final[str] = "c2_vpr" +_OUTPUT_KEY: Final[str] = "embedding" +_ENGINE_INPUT_KEY: Final[str] = "input" + +_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset( + {"tensorrt", "onnx_trt_ep"} +) + +_LOG_KIND_READY: Final[str] = "c2.vpr.ready" +_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error" +_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error" +_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun" + +_FDR_KIND_EMBED: Final[str] = "vpr.embed_query" +_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error" +_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error" + + +class EigenPlacesStrategy: + """C2 secondary VprStrategy backed by a TRT EigenPlaces engine. + + See module docstring for the engine-loading + per-frame pipeline. + Stateless across frames (INV-2); single-threaded per instance + (INV-1, per AZ-336). + """ + + def __init__( + self, + *, + inference_runtime: InferenceRuntimeCut, + engine_handle: EngineHandle, + descriptor_index: DescriptorIndexCut, + preprocessor: EigenPlacesBackbonePreprocessor, + normaliser: DescriptorNormaliser, + faiss_bridge: FaissBridge, + fdr_client: FdrClient, + clock: Clock, + logger: logging.Logger, + descriptor_dim: int = DESCRIPTOR_DIM, + ) -> None: + if descriptor_dim < 1: + raise ValueError( + f"EigenPlacesStrategy.descriptor_dim must be >= 1; " + f"got {descriptor_dim}" + ) + self._inference_runtime = inference_runtime + self._engine_handle = engine_handle + self._descriptor_index = descriptor_index + self._preprocessor = preprocessor + self._normaliser = normaliser + self._faiss_bridge = faiss_bridge + self._fdr_client = fdr_client + self._clock = clock + self._logger = logger + self._descriptor_dim = descriptor_dim + + def embed_query( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> VprQuery: + try: + tensor = self._preprocessor.preprocess(frame, calibration) + except VprPreprocessError as exc: + self._emit_preprocess_error(frame, exc) + raise + + ns_start = self._clock.monotonic_ns() + try: + outputs = self._inference_runtime.infer( + self._engine_handle, {_ENGINE_INPUT_KEY: tensor} + ) + except Exception as exc: + wrapped = self._wrap_backbone_error(frame, exc) + raise wrapped from exc + ns_end = self._clock.monotonic_ns() + latency_us = max(1, (ns_end - ns_start) // 1_000) + + if _OUTPUT_KEY not in outputs: + err = VprBackboneError( + f"EigenPlaces forward returned no {_OUTPUT_KEY!r} key; " + f"got {sorted(outputs.keys())!r}" + ) + self._emit_backbone_error(frame, err) + raise err + + raw = np.asarray(outputs[_OUTPUT_KEY]) + if ( + raw.ndim != 2 + or raw.shape[0] != 1 + or raw.shape[1] != self._descriptor_dim + ): + err = VprBackboneError( + f"EigenPlaces forward returned shape {raw.shape}; " + f"expected (1, {self._descriptor_dim})" + ) + self._emit_backbone_error(frame, err) + raise err + + flat = np.ascontiguousarray(raw[0], dtype=np.float16) + normalised = self._normaliser.l2_normalise(flat) + + self._emit_embed_record( + frame_id=int(frame.frame_id), latency_us=int(latency_us) + ) + + return VprQuery( + frame_id=int(frame.frame_id), + embedding=normalised, + produced_at=ns_end, + ) + + def retrieve_topk(self, query: VprQuery, k: int) -> VprResult: + return self._faiss_bridge.retrieve( + query, k, backbone_label=_BACKBONE_LABEL + ) + + def descriptor_dim(self) -> int: + return self._descriptor_dim + + def _wrap_backbone_error( + self, frame: NavCameraFrame, exc: BaseException + ) -> VprBackboneError: + wrapped = VprBackboneError( + f"EigenPlaces forward raised {type(exc).__name__}: {exc}" + ) + self._emit_backbone_error(frame, wrapped) + return wrapped + + def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None: + record = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_EMBED, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "descriptor_dim": self._descriptor_dim, + "latency_us": latency_us, + }, + ) + result = self._fdr_client.enqueue(record) + if result == EnqueueResult.OVERRUN: + self._logger.warning( + "FDR enqueue dropped vpr.embed_query record (buffer overrun)", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FDR_OVERRUN, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + }, + }, + ) + + def _emit_backbone_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"EigenPlaces backbone error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_BACKBONE_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_BACKBONE_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + def _emit_preprocess_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"EigenPlaces preprocess error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_PREPROCESS_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_PREPROCESS_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + +def _build_trt_build_config() -> BuildConfig: + return BuildConfig( + precision=PrecisionMode.FP16, + workspace_mb=0, + calibration_dataset=None, + optimization_profiles=(), + ) + + +def create( + config: Config, + *, + descriptor_index: DescriptorIndexCut, + inference_runtime: InferenceRuntimeCut, + fdr_client: FdrClient | None = None, + clock: Clock | None = None, + logger: logging.Logger | None = None, +) -> EigenPlacesStrategy: + """Module-level factory consumed by :func:`build_vpr_strategy`. + + EigenPlaces is unselectable when the C7 TRT / ONNX-RT runtimes are + excluded — ``current_runtime_label()`` MUST be one of + ``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected + with :class:`ConfigError` at composition time. + + Engine output shape is asserted at create time via a single + dry-run inference on a zero-init input; mismatch raises + :class:`ConfigError` BEFORE the strategy is bound (AC-6). + + Optional keyword-only injection points (``fdr_client`` / ``clock`` / + ``logger``) keep tests deterministic; production wiring fills them + from the composition root. + """ + runtime_label = inference_runtime.current_runtime_label() + if runtime_label not in _ALLOWED_RUNTIME_LABELS: + raise ConfigError( + f"EigenPlaces requires BUILD_TENSORRT_RUNTIME=ON (or " + f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary " + f"has runtime_label={runtime_label!r}." + ) + + block = config.components["c2_vpr"] + weights_path = block.backbone_weights_path + + if fdr_client is None: + raise ValueError( + "EigenPlacesStrategy.create: fdr_client is required; the " + "composition root must inject the running FDR client." + ) + if clock is None: + from gps_denied_onboard.clock.wall_clock import WallClock + + clock = WallClock() + if logger is None: + logger = logging.getLogger("gps_denied_onboard.c2_vpr.eigen_places") + + entry = inference_runtime.compile_engine( + weights_path, _build_trt_build_config() + ) + handle = inference_runtime.deserialize_engine(entry) + + preprocessor = EigenPlacesBackbonePreprocessor(logger=logger) + normaliser = DescriptorNormaliser() + faiss_bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=DESCRIPTOR_DIM, + warn_top1_threshold=block.warn_top1_threshold, + debug_log_per_frame_distances=block.debug_per_frame_distances, + fdr_client=fdr_client, + logger=logger, + clock=clock, + ) + + _assert_engine_output_dim(inference_runtime, handle, preprocessor) + + logger.info( + "C2 VPR strategy ready", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_READY, + "kv": { + "strategy": _BACKBONE_LABEL, + "descriptor_dim": DESCRIPTOR_DIM, + }, + }, + ) + + return EigenPlacesStrategy( + inference_runtime=inference_runtime, + engine_handle=handle, + descriptor_index=descriptor_index, + preprocessor=preprocessor, + normaliser=normaliser, + faiss_bridge=faiss_bridge, + fdr_client=fdr_client, + clock=clock, + logger=logger, + descriptor_dim=DESCRIPTOR_DIM, + ) + + +def _assert_engine_output_dim( + inference_runtime: InferenceRuntimeCut, + handle: EngineHandle, + preprocessor: EigenPlacesBackbonePreprocessor, +) -> None: + # The 7-way duplication of this helper (ultra_vpr / net_vlad / + # mega_loc / mix_vpr / sela_vpr / eigen_places / salad) is tracked + # by AZ-527 (hygiene PBI sized in parallel with AZ-339 land). The + # duplication is intentional for now: extracting earlier would + # expand AZ-340's scope past the three new strategies. + h, w = preprocessor.input_shape() + probe = np.zeros((1, 3, h, w), dtype=np.float16) + outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe}) + if _OUTPUT_KEY not in outputs: + raise ConfigError( + f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; " + f"got keys {sorted(outputs.keys())!r}" + ) + actual = np.asarray(outputs[_OUTPUT_KEY]) + if ( + actual.ndim != 2 + or actual.shape[0] != 1 + or actual.shape[1] != DESCRIPTOR_DIM + ): + raise ConfigError( + f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), " + f"got {tuple(actual.shape)}" + ) diff --git a/src/gps_denied_onboard/components/c2_vpr/salad.py b/src/gps_denied_onboard/components/c2_vpr/salad.py new file mode 100644 index 0000000..4855a03 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/salad.py @@ -0,0 +1,464 @@ +"""``SaladStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-340). + +SALAD is a secondary backbone shipped exclusively in the research +binary for the IT-12 comparative-study matrix +(``components/02_c2_vpr/description.md`` § 1 + § 5; ``module-layout.md`` +``BUILD_VPR_SALAD`` row). Per ADR-002, ``BUILD_VPR_SALAD`` is ON for +the research binary and replay-cli, OFF for the airborne and +operator-tooling binaries — selecting ``salad`` on a binary without the +flag fails fast at composition-root time via +:class:`StrategyNotAvailableError` (not at first frame). + +SALAD is the heaviest backbone in the C2 family: a DINOv2-Large +backbone produces patch tokens that the SALAD aggregator turns into a +single 8448-dimensional descriptor. Per the task spec NFR-perf +budget, SALAD's ``embed_query`` p95 is permitted up to 120 ms (vs +UltraVPR's tighter primary-path budget) and the FAISS HNSW lookup is +permitted up to 6 ms p95. Operators who want a smaller SALAD descriptor +must apply PCA-whitening at corpus build time (C10) — out of scope +here. + +The strategy runs on the C7 TensorRT runtime (AZ-298), or the ONNX-Runtime +fallback (AZ-299), via the local :class:`InferenceRuntimeCut` (AZ-507). +Engine output key is ``"embedding"`` and the strategy applies single-stage +global L2 normalisation (no NetVLAD-style intra-cluster step). Retrieval +delegates to :class:`FaissBridge` (AZ-341). + +Architecture-registry differences from :class:`NetVladStrategy`: + +SALAD consumes a pre-compiled ``.trt`` engine produced by C10's engine +compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register, so +the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``. +:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture` +no-ops for this strategy. + +Engine load happens in :func:`create` (NOT at first frame) so the +engine-output-shape assertion (AC-6) surfaces at startup, not after +takeoff. + +Per-frame :meth:`embed_query` pipeline: + +1. ``preprocessor.preprocess(frame, calibration)`` -> + ``(1, 3, 322, 322)`` FP16 NCHW ndarray. +2. ``inference_runtime.infer(handle, {"input": tensor})`` -> + ``{"embedding": (1, 8448) FP16 ndarray}``. +3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage). +4. Return :class:`VprQuery` with ``frame_id``, normalised embedding, + produced_at monotonic ns. + +Error envelope: every method raises only members of :class:`VprError`. +``RuntimeError`` from the backbone forward -> rewrapped to +:class:`VprBackboneError`; :class:`VprPreprocessError` from the +preprocessor propagates unchanged. + +Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`; +see AZ-341 AC-10. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final, Literal + +import numpy as np + +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineHandle, + PrecisionMode, +) +from gps_denied_onboard._types.vpr import VprQuery, VprResult +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr._preprocessor_salad import ( + SaladBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import ( + DescriptorIndexCut, +) +from gps_denied_onboard.components.c2_vpr.errors import ( + VprBackboneError, + VprPreprocessError, +) +from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import ( + InferenceRuntimeCut, +) +from gps_denied_onboard.config.schema import ConfigError +from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) +from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser +from gps_denied_onboard.helpers.iso_timestamps import ( + iso_ts_from_clock as _iso_ts_from_clock, +) + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard.config.schema import Config + +__all__ = ["DESCRIPTOR_DIM", "SaladStrategy", "create"] + + +# SALAD's published embedding dimension (D=8448) — the largest VPR +# descriptor the project carries, produced by the SALAD aggregator +# applied to DINOv2-Large patch tokens. The matching FAISS HNSW corpus +# has correspondingly higher RAM cost; researchers must rebuild the +# corpus when swapping between SALAD and any non-8448 backbone (the +# AZ-336 pre-flight dim-mismatch check enforces this). Engine output +# shape is asserted at create() time. +DESCRIPTOR_DIM: Final[int] = 8448 + +_BACKBONE_LABEL: Final[Literal["salad"]] = "salad" +_COMPONENT: Final[str] = "c2_vpr" +_OUTPUT_KEY: Final[str] = "embedding" +_ENGINE_INPUT_KEY: Final[str] = "input" + +_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset( + {"tensorrt", "onnx_trt_ep"} +) + +_LOG_KIND_READY: Final[str] = "c2.vpr.ready" +_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error" +_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error" +_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun" + +_FDR_KIND_EMBED: Final[str] = "vpr.embed_query" +_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error" +_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error" + + +class SaladStrategy: + """C2 secondary VprStrategy backed by a TRT SALAD (DINOv2-Large) engine. + + See module docstring for the engine-loading + per-frame pipeline. + Stateless across frames (INV-2); single-threaded per instance + (INV-1, per AZ-336). + """ + + def __init__( + self, + *, + inference_runtime: InferenceRuntimeCut, + engine_handle: EngineHandle, + descriptor_index: DescriptorIndexCut, + preprocessor: SaladBackbonePreprocessor, + normaliser: DescriptorNormaliser, + faiss_bridge: FaissBridge, + fdr_client: FdrClient, + clock: Clock, + logger: logging.Logger, + descriptor_dim: int = DESCRIPTOR_DIM, + ) -> None: + if descriptor_dim < 1: + raise ValueError( + f"SaladStrategy.descriptor_dim must be >= 1; " + f"got {descriptor_dim}" + ) + self._inference_runtime = inference_runtime + self._engine_handle = engine_handle + self._descriptor_index = descriptor_index + self._preprocessor = preprocessor + self._normaliser = normaliser + self._faiss_bridge = faiss_bridge + self._fdr_client = fdr_client + self._clock = clock + self._logger = logger + self._descriptor_dim = descriptor_dim + + def embed_query( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> VprQuery: + try: + tensor = self._preprocessor.preprocess(frame, calibration) + except VprPreprocessError as exc: + self._emit_preprocess_error(frame, exc) + raise + + ns_start = self._clock.monotonic_ns() + try: + outputs = self._inference_runtime.infer( + self._engine_handle, {_ENGINE_INPUT_KEY: tensor} + ) + except Exception as exc: + wrapped = self._wrap_backbone_error(frame, exc) + raise wrapped from exc + ns_end = self._clock.monotonic_ns() + latency_us = max(1, (ns_end - ns_start) // 1_000) + + if _OUTPUT_KEY not in outputs: + err = VprBackboneError( + f"SALAD forward returned no {_OUTPUT_KEY!r} key; " + f"got {sorted(outputs.keys())!r}" + ) + self._emit_backbone_error(frame, err) + raise err + + raw = np.asarray(outputs[_OUTPUT_KEY]) + if ( + raw.ndim != 2 + or raw.shape[0] != 1 + or raw.shape[1] != self._descriptor_dim + ): + err = VprBackboneError( + f"SALAD forward returned shape {raw.shape}; " + f"expected (1, {self._descriptor_dim})" + ) + self._emit_backbone_error(frame, err) + raise err + + flat = np.ascontiguousarray(raw[0], dtype=np.float16) + normalised = self._normaliser.l2_normalise(flat) + + self._emit_embed_record( + frame_id=int(frame.frame_id), latency_us=int(latency_us) + ) + + return VprQuery( + frame_id=int(frame.frame_id), + embedding=normalised, + produced_at=ns_end, + ) + + def retrieve_topk(self, query: VprQuery, k: int) -> VprResult: + return self._faiss_bridge.retrieve( + query, k, backbone_label=_BACKBONE_LABEL + ) + + def descriptor_dim(self) -> int: + return self._descriptor_dim + + def _wrap_backbone_error( + self, frame: NavCameraFrame, exc: BaseException + ) -> VprBackboneError: + wrapped = VprBackboneError( + f"SALAD forward raised {type(exc).__name__}: {exc}" + ) + self._emit_backbone_error(frame, wrapped) + return wrapped + + def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None: + record = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_EMBED, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "descriptor_dim": self._descriptor_dim, + "latency_us": latency_us, + }, + ) + result = self._fdr_client.enqueue(record) + if result == EnqueueResult.OVERRUN: + self._logger.warning( + "FDR enqueue dropped vpr.embed_query record (buffer overrun)", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FDR_OVERRUN, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + }, + }, + ) + + def _emit_backbone_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"SALAD backbone error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_BACKBONE_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_BACKBONE_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + def _emit_preprocess_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"SALAD preprocess error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_PREPROCESS_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_PREPROCESS_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + +def _build_trt_build_config() -> BuildConfig: + return BuildConfig( + precision=PrecisionMode.FP16, + workspace_mb=0, + calibration_dataset=None, + optimization_profiles=(), + ) + + +def create( + config: Config, + *, + descriptor_index: DescriptorIndexCut, + inference_runtime: InferenceRuntimeCut, + fdr_client: FdrClient | None = None, + clock: Clock | None = None, + logger: logging.Logger | None = None, +) -> SaladStrategy: + """Module-level factory consumed by :func:`build_vpr_strategy`. + + SALAD is unselectable when the C7 TRT / ONNX-RT runtimes are + excluded — ``current_runtime_label()`` MUST be one of + ``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected + with :class:`ConfigError` at composition time. + + Engine output shape is asserted at create time via a single + dry-run inference on a zero-init input; mismatch raises + :class:`ConfigError` BEFORE the strategy is bound (AC-6). + + Optional keyword-only injection points (``fdr_client`` / ``clock`` / + ``logger``) keep tests deterministic; production wiring fills them + from the composition root. + """ + runtime_label = inference_runtime.current_runtime_label() + if runtime_label not in _ALLOWED_RUNTIME_LABELS: + raise ConfigError( + f"SALAD requires BUILD_TENSORRT_RUNTIME=ON (or " + f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary " + f"has runtime_label={runtime_label!r}." + ) + + block = config.components["c2_vpr"] + weights_path = block.backbone_weights_path + + if fdr_client is None: + raise ValueError( + "SaladStrategy.create: fdr_client is required; the " + "composition root must inject the running FDR client." + ) + if clock is None: + from gps_denied_onboard.clock.wall_clock import WallClock + + clock = WallClock() + if logger is None: + logger = logging.getLogger("gps_denied_onboard.c2_vpr.salad") + + entry = inference_runtime.compile_engine( + weights_path, _build_trt_build_config() + ) + handle = inference_runtime.deserialize_engine(entry) + + preprocessor = SaladBackbonePreprocessor(logger=logger) + normaliser = DescriptorNormaliser() + faiss_bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=DESCRIPTOR_DIM, + warn_top1_threshold=block.warn_top1_threshold, + debug_log_per_frame_distances=block.debug_per_frame_distances, + fdr_client=fdr_client, + logger=logger, + clock=clock, + ) + + _assert_engine_output_dim(inference_runtime, handle, preprocessor) + + logger.info( + "C2 VPR strategy ready", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_READY, + "kv": { + "strategy": _BACKBONE_LABEL, + "descriptor_dim": DESCRIPTOR_DIM, + }, + }, + ) + + return SaladStrategy( + inference_runtime=inference_runtime, + engine_handle=handle, + descriptor_index=descriptor_index, + preprocessor=preprocessor, + normaliser=normaliser, + faiss_bridge=faiss_bridge, + fdr_client=fdr_client, + clock=clock, + logger=logger, + descriptor_dim=DESCRIPTOR_DIM, + ) + + +def _assert_engine_output_dim( + inference_runtime: InferenceRuntimeCut, + handle: EngineHandle, + preprocessor: SaladBackbonePreprocessor, +) -> None: + # The 7-way duplication of this helper (ultra_vpr / net_vlad / + # mega_loc / mix_vpr / sela_vpr / eigen_places / salad) is tracked + # by AZ-527 (hygiene PBI sized in parallel with AZ-339 land). The + # duplication is intentional for now: extracting earlier would + # expand AZ-340's scope past the three new strategies. + h, w = preprocessor.input_shape() + probe = np.zeros((1, 3, h, w), dtype=np.float16) + outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe}) + if _OUTPUT_KEY not in outputs: + raise ConfigError( + f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; " + f"got keys {sorted(outputs.keys())!r}" + ) + actual = np.asarray(outputs[_OUTPUT_KEY]) + if ( + actual.ndim != 2 + or actual.shape[0] != 1 + or actual.shape[1] != DESCRIPTOR_DIM + ): + raise ConfigError( + f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), " + f"got {tuple(actual.shape)}" + ) diff --git a/src/gps_denied_onboard/components/c2_vpr/sela_vpr.py b/src/gps_denied_onboard/components/c2_vpr/sela_vpr.py new file mode 100644 index 0000000..7f0464f --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/sela_vpr.py @@ -0,0 +1,451 @@ +"""``SelaVprStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-340). + +SelaVPR is a secondary backbone shipped exclusively in the research +binary for the IT-12 comparative-study matrix +(``components/02_c2_vpr/description.md`` § 1 + § 5). Per ADR-002, +``BUILD_VPR_SELAVPR`` is ON for the research binary and replay-cli, OFF +for the airborne and operator-tooling binaries — selecting ``sela_vpr`` +on a binary without the flag fails fast at composition-root time via +:class:`StrategyNotAvailableError` (not at first frame). + +The strategy runs on the C7 TensorRT runtime (AZ-298), or the ONNX-Runtime +fallback (AZ-299), via the local :class:`InferenceRuntimeCut` (AZ-507). +Engine output key is ``"embedding"`` and the strategy applies single-stage +global L2 normalisation (no NetVLAD-style intra-cluster step). Retrieval +delegates to :class:`FaissBridge` (AZ-341). + +Architecture-registry differences from :class:`NetVladStrategy`: + +SelaVPR consumes a pre-compiled ``.trt`` engine produced by C10's engine +compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register, so +the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``. +:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture` +no-ops for this strategy. + +Engine load happens in :func:`create` (NOT at first frame) so the +engine-output-shape assertion (AC-6) surfaces at startup, not after +takeoff. + +Per-frame :meth:`embed_query` pipeline: + +1. ``preprocessor.preprocess(frame, calibration)`` -> + ``(1, 3, 224, 224)`` FP16 NCHW ndarray. +2. ``inference_runtime.infer(handle, {"input": tensor})`` -> + ``{"embedding": (1, 512) FP16 ndarray}``. +3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage). +4. Return :class:`VprQuery` with ``frame_id``, normalised embedding, + produced_at monotonic ns. + +Error envelope: every method raises only members of :class:`VprError`. +``RuntimeError`` from the backbone forward -> rewrapped to +:class:`VprBackboneError`; :class:`VprPreprocessError` from the +preprocessor propagates unchanged. + +Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`; +see AZ-341 AC-10. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final, Literal + +import numpy as np + +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineHandle, + PrecisionMode, +) +from gps_denied_onboard._types.vpr import VprQuery, VprResult +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr._preprocessor_sela_vpr import ( + SelaVprBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import ( + DescriptorIndexCut, +) +from gps_denied_onboard.components.c2_vpr.errors import ( + VprBackboneError, + VprPreprocessError, +) +from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import ( + InferenceRuntimeCut, +) +from gps_denied_onboard.config.schema import ConfigError +from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) +from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser +from gps_denied_onboard.helpers.iso_timestamps import ( + iso_ts_from_clock as _iso_ts_from_clock, +) + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard.config.schema import Config + +__all__ = ["DESCRIPTOR_DIM", "SelaVprStrategy", "create"] + + +# SelaVPR's published embedding dimension (D=512) per the upstream +# research code drop. Engine output shape is asserted at create() time +# against this constant — changing it would silently break AC-2 / +# AC-4 / AC-5 / AC-6. +DESCRIPTOR_DIM: Final[int] = 512 + +_BACKBONE_LABEL: Final[Literal["sela_vpr"]] = "sela_vpr" +_COMPONENT: Final[str] = "c2_vpr" +_OUTPUT_KEY: Final[str] = "embedding" +_ENGINE_INPUT_KEY: Final[str] = "input" + +_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset( + {"tensorrt", "onnx_trt_ep"} +) + +_LOG_KIND_READY: Final[str] = "c2.vpr.ready" +_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error" +_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error" +_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun" + +_FDR_KIND_EMBED: Final[str] = "vpr.embed_query" +_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error" +_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error" + + +class SelaVprStrategy: + """C2 secondary VprStrategy backed by a TRT SelaVPR engine. + + See module docstring for the engine-loading + per-frame pipeline. + Stateless across frames (INV-2); single-threaded per instance + (INV-1, per AZ-336). + """ + + def __init__( + self, + *, + inference_runtime: InferenceRuntimeCut, + engine_handle: EngineHandle, + descriptor_index: DescriptorIndexCut, + preprocessor: SelaVprBackbonePreprocessor, + normaliser: DescriptorNormaliser, + faiss_bridge: FaissBridge, + fdr_client: FdrClient, + clock: Clock, + logger: logging.Logger, + descriptor_dim: int = DESCRIPTOR_DIM, + ) -> None: + if descriptor_dim < 1: + raise ValueError( + f"SelaVprStrategy.descriptor_dim must be >= 1; " + f"got {descriptor_dim}" + ) + self._inference_runtime = inference_runtime + self._engine_handle = engine_handle + self._descriptor_index = descriptor_index + self._preprocessor = preprocessor + self._normaliser = normaliser + self._faiss_bridge = faiss_bridge + self._fdr_client = fdr_client + self._clock = clock + self._logger = logger + self._descriptor_dim = descriptor_dim + + def embed_query( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> VprQuery: + try: + tensor = self._preprocessor.preprocess(frame, calibration) + except VprPreprocessError as exc: + self._emit_preprocess_error(frame, exc) + raise + + ns_start = self._clock.monotonic_ns() + try: + outputs = self._inference_runtime.infer( + self._engine_handle, {_ENGINE_INPUT_KEY: tensor} + ) + except Exception as exc: + wrapped = self._wrap_backbone_error(frame, exc) + raise wrapped from exc + ns_end = self._clock.monotonic_ns() + latency_us = max(1, (ns_end - ns_start) // 1_000) + + if _OUTPUT_KEY not in outputs: + err = VprBackboneError( + f"SelaVPR forward returned no {_OUTPUT_KEY!r} key; " + f"got {sorted(outputs.keys())!r}" + ) + self._emit_backbone_error(frame, err) + raise err + + raw = np.asarray(outputs[_OUTPUT_KEY]) + if ( + raw.ndim != 2 + or raw.shape[0] != 1 + or raw.shape[1] != self._descriptor_dim + ): + err = VprBackboneError( + f"SelaVPR forward returned shape {raw.shape}; " + f"expected (1, {self._descriptor_dim})" + ) + self._emit_backbone_error(frame, err) + raise err + + flat = np.ascontiguousarray(raw[0], dtype=np.float16) + normalised = self._normaliser.l2_normalise(flat) + + self._emit_embed_record( + frame_id=int(frame.frame_id), latency_us=int(latency_us) + ) + + return VprQuery( + frame_id=int(frame.frame_id), + embedding=normalised, + produced_at=ns_end, + ) + + def retrieve_topk(self, query: VprQuery, k: int) -> VprResult: + return self._faiss_bridge.retrieve( + query, k, backbone_label=_BACKBONE_LABEL + ) + + def descriptor_dim(self) -> int: + return self._descriptor_dim + + def _wrap_backbone_error( + self, frame: NavCameraFrame, exc: BaseException + ) -> VprBackboneError: + wrapped = VprBackboneError( + f"SelaVPR forward raised {type(exc).__name__}: {exc}" + ) + self._emit_backbone_error(frame, wrapped) + return wrapped + + def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None: + record = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_EMBED, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "descriptor_dim": self._descriptor_dim, + "latency_us": latency_us, + }, + ) + result = self._fdr_client.enqueue(record) + if result == EnqueueResult.OVERRUN: + self._logger.warning( + "FDR enqueue dropped vpr.embed_query record (buffer overrun)", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FDR_OVERRUN, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + }, + }, + ) + + def _emit_backbone_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"SelaVPR backbone error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_BACKBONE_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_BACKBONE_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + def _emit_preprocess_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"SelaVPR preprocess error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_PREPROCESS_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_PREPROCESS_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + +def _build_trt_build_config() -> BuildConfig: + return BuildConfig( + precision=PrecisionMode.FP16, + workspace_mb=0, + calibration_dataset=None, + optimization_profiles=(), + ) + + +def create( + config: Config, + *, + descriptor_index: DescriptorIndexCut, + inference_runtime: InferenceRuntimeCut, + fdr_client: FdrClient | None = None, + clock: Clock | None = None, + logger: logging.Logger | None = None, +) -> SelaVprStrategy: + """Module-level factory consumed by :func:`build_vpr_strategy`. + + SelaVPR is unselectable when the C7 TRT / ONNX-RT runtimes are + excluded — ``current_runtime_label()`` MUST be one of + ``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected + with :class:`ConfigError` at composition time. + + Engine output shape is asserted at create time via a single + dry-run inference on a zero-init input; mismatch raises + :class:`ConfigError` BEFORE the strategy is bound (AC-6). + + Optional keyword-only injection points (``fdr_client`` / ``clock`` / + ``logger``) keep tests deterministic; production wiring fills them + from the composition root. + """ + runtime_label = inference_runtime.current_runtime_label() + if runtime_label not in _ALLOWED_RUNTIME_LABELS: + raise ConfigError( + f"SelaVPR requires BUILD_TENSORRT_RUNTIME=ON (or " + f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary " + f"has runtime_label={runtime_label!r}." + ) + + block = config.components["c2_vpr"] + weights_path = block.backbone_weights_path + + if fdr_client is None: + raise ValueError( + "SelaVprStrategy.create: fdr_client is required; the " + "composition root must inject the running FDR client." + ) + if clock is None: + from gps_denied_onboard.clock.wall_clock import WallClock + + clock = WallClock() + if logger is None: + logger = logging.getLogger("gps_denied_onboard.c2_vpr.sela_vpr") + + entry = inference_runtime.compile_engine( + weights_path, _build_trt_build_config() + ) + handle = inference_runtime.deserialize_engine(entry) + + preprocessor = SelaVprBackbonePreprocessor(logger=logger) + normaliser = DescriptorNormaliser() + faiss_bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=DESCRIPTOR_DIM, + warn_top1_threshold=block.warn_top1_threshold, + debug_log_per_frame_distances=block.debug_per_frame_distances, + fdr_client=fdr_client, + logger=logger, + clock=clock, + ) + + _assert_engine_output_dim(inference_runtime, handle, preprocessor) + + logger.info( + "C2 VPR strategy ready", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_READY, + "kv": { + "strategy": _BACKBONE_LABEL, + "descriptor_dim": DESCRIPTOR_DIM, + }, + }, + ) + + return SelaVprStrategy( + inference_runtime=inference_runtime, + engine_handle=handle, + descriptor_index=descriptor_index, + preprocessor=preprocessor, + normaliser=normaliser, + faiss_bridge=faiss_bridge, + fdr_client=fdr_client, + clock=clock, + logger=logger, + descriptor_dim=DESCRIPTOR_DIM, + ) + + +def _assert_engine_output_dim( + inference_runtime: InferenceRuntimeCut, + handle: EngineHandle, + preprocessor: SelaVprBackbonePreprocessor, +) -> None: + # The 7-way duplication of this helper (ultra_vpr / net_vlad / + # mega_loc / mix_vpr / sela_vpr / eigen_places / salad) is tracked + # by AZ-527 (hygiene PBI sized in parallel with AZ-339 land). The + # duplication is intentional for now: extracting earlier would + # expand AZ-340's scope past the three new strategies. + h, w = preprocessor.input_shape() + probe = np.zeros((1, 3, h, w), dtype=np.float16) + outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe}) + if _OUTPUT_KEY not in outputs: + raise ConfigError( + f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; " + f"got keys {sorted(outputs.keys())!r}" + ) + actual = np.asarray(outputs[_OUTPUT_KEY]) + if ( + actual.ndim != 2 + or actual.shape[0] != 1 + or actual.shape[1] != DESCRIPTOR_DIM + ): + raise ConfigError( + f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), " + f"got {tuple(actual.shape)}" + ) diff --git a/tests/unit/c2_vpr/test_az340_sela_vpr_eigen_places_salad.py b/tests/unit/c2_vpr/test_az340_sela_vpr_eigen_places_salad.py new file mode 100644 index 0000000..3a8cc59 --- /dev/null +++ b/tests/unit/c2_vpr/test_az340_sela_vpr_eigen_places_salad.py @@ -0,0 +1,835 @@ +"""AZ-340 — SelaVPR + EigenPlaces + SALAD secondary VprStrategy unit tests. + +Covers AC-1..AC-11 for all three strategies. Parametrised across the +strategies so the test surface stays compact (one test per AC times +three strategies) and any drift between the three implementations is +visible at the assertion level — same approach as +``test_az339_mega_loc_mix_vpr.py``. + +Uses fakes for :class:`InferenceRuntimeCut`, :class:`DescriptorIndexCut`, +and :class:`FdrClient` so the suite stays AZ-507-clean and TRT-free +(mirrors the precedent in ``test_ultra_vpr.py`` / ``test_az339_*``). +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Literal +from unittest.mock import MagicMock + +import numpy as np +import pytest + +from gps_denied_onboard._types.calibration import CameraCalibration +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineCacheEntry, + EngineHandle, + PrecisionMode, +) +from gps_denied_onboard._types.nav import NavCameraFrame +from gps_denied_onboard.components.c2_vpr import ( + C2VprConfig, + VprStrategy, +) +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr._preprocessor_eigen_places import ( + EigenPlacesBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr._preprocessor_salad import ( + SaladBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr._preprocessor_sela_vpr import ( + SelaVprBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr.eigen_places import ( + DESCRIPTOR_DIM as EIGEN_PLACES_DIM, +) +from gps_denied_onboard.components.c2_vpr.eigen_places import ( + EigenPlacesStrategy, +) +from gps_denied_onboard.components.c2_vpr.eigen_places import ( + create as create_eigen_places, +) +from gps_denied_onboard.components.c2_vpr.errors import ( + VprBackboneError, + VprPreprocessError, +) +from gps_denied_onboard.components.c2_vpr.salad import ( + DESCRIPTOR_DIM as SALAD_DIM, +) +from gps_denied_onboard.components.c2_vpr.salad import ( + SaladStrategy, +) +from gps_denied_onboard.components.c2_vpr.salad import ( + create as create_salad, +) +from gps_denied_onboard.components.c2_vpr.sela_vpr import ( + DESCRIPTOR_DIM as SELA_VPR_DIM, +) +from gps_denied_onboard.components.c2_vpr.sela_vpr import ( + SelaVprStrategy, +) +from gps_denied_onboard.components.c2_vpr.sela_vpr import ( + create as create_sela_vpr, +) +from gps_denied_onboard.config.schema import Config, ConfigError +from gps_denied_onboard.fdr_client import FdrClient +from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser + +# --------------------------------------------------------------------------- +# Parametrisation: each strategy + its bound constants +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class _StrategySpec: + name: str + strategy_cls: type + create_fn: Any + preprocessor_cls: type + descriptor_dim: int + backbone_label: str + input_hw: tuple[int, int] + + +_SPECS: list[_StrategySpec] = [ + _StrategySpec( + name="sela_vpr", + strategy_cls=SelaVprStrategy, + create_fn=create_sela_vpr, + preprocessor_cls=SelaVprBackbonePreprocessor, + descriptor_dim=SELA_VPR_DIM, + backbone_label="sela_vpr", + input_hw=(224, 224), + ), + _StrategySpec( + name="eigen_places", + strategy_cls=EigenPlacesStrategy, + create_fn=create_eigen_places, + preprocessor_cls=EigenPlacesBackbonePreprocessor, + descriptor_dim=EIGEN_PLACES_DIM, + backbone_label="eigen_places", + input_hw=(480, 480), + ), + _StrategySpec( + name="salad", + strategy_cls=SaladStrategy, + create_fn=create_salad, + preprocessor_cls=SaladBackbonePreprocessor, + descriptor_dim=SALAD_DIM, + backbone_label="salad", + input_hw=(322, 322), + ), +] + + +@pytest.fixture(params=_SPECS, ids=[s.name for s in _SPECS]) +def spec(request: pytest.FixtureRequest) -> _StrategySpec: + return request.param + + +# --------------------------------------------------------------------------- +# Fakes (mirrors test_az339_*.py shape) +# --------------------------------------------------------------------------- + + +@dataclass +class _StubClock: + next_monotonic_ns: int = 1_000_000_000 + step_ns: int = 5_000 + fixed_time_ns: int = 1_715_600_000_000_000_000 + + def monotonic_ns(self) -> int: + v = self.next_monotonic_ns + self.next_monotonic_ns += self.step_ns + return v + + def time_ns(self) -> int: + return self.fixed_time_ns + + def sleep_until_ns(self, target_ns: int) -> None: + _ = target_ns + + +class _FakeEngineHandle(EngineHandle): + def __init__(self, label: str) -> None: + self.label = label + + +@dataclass +class _FakeInferenceRuntime: + descriptor_dim: int = 512 + raises: BaseException | None = None + runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = ( + "tensorrt" + ) + fixed_output: np.ndarray | None = None + output_key: str = "embedding" + calls: list[dict[str, np.ndarray]] = field(default_factory=list) + deserialize_calls: list[EngineCacheEntry] = field(default_factory=list) + model_name: str = "sela_vpr" + + def compile_engine( + self, model_path: Path, build_config: BuildConfig + ) -> EngineCacheEntry: + _ = build_config + return EngineCacheEntry( + engine_path=Path(model_path), + sha256_hex="0" * 64, + sm=None, + jp=None, + trt=None, + precision=PrecisionMode.FP16, + extras={"model_name": self.model_name}, + ) + + def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle: + self.deserialize_calls.append(entry) + return _FakeEngineHandle(label=entry.extras.get("model_name", "")) + + def infer( + self, handle: EngineHandle, inputs: dict[str, np.ndarray] + ) -> dict[str, np.ndarray]: + _ = handle + self.calls.append({k: v.copy() for k, v in inputs.items()}) + if self.raises is not None: + raise self.raises + if self.fixed_output is not None: + return {self.output_key: self.fixed_output.copy()} + rng = np.random.default_rng(0xCAFEBABE) + tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16) + return { + self.output_key: tensor.reshape(1, self.descriptor_dim).copy() + } + + def release_engine(self, handle: EngineHandle) -> None: + _ = handle + + def current_runtime_label( + self, + ) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]: + return self.runtime_label + + +@dataclass +class _FakeDescriptorIndex: + descriptor_dim_value: int = 512 + results: list[tuple[tuple[int, float, float], float]] = field( + default_factory=list + ) + raises: BaseException | None = None + + def search_topk( + self, query: np.ndarray, k: int + ) -> list[tuple[tuple[int, float, float], float]]: + _ = query + if self.raises is not None: + raise self.raises + if not self.results: + return [ + ((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i) + for i in range(k) + ] + return list(self.results[:k]) + + def descriptor_dim(self) -> int: + return self.descriptor_dim_value + + +# --------------------------------------------------------------------------- +# Fixture helpers +# --------------------------------------------------------------------------- + + +def _make_frame( + *, frame_id: int = 4242, h: int = 720, w: int = 1280 +) -> NavCameraFrame: + rng = np.random.default_rng(frame_id) + image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8) + return NavCameraFrame( + frame_id=frame_id, + timestamp=datetime(2026, 5, 14, 0, 0, 0), + image=image, + camera_calibration_id="test_cam", + ) + + +def _make_calibration( + *, cx: float = 640.0, cy: float = 360.0 +) -> CameraCalibration: + intrinsics = np.array( + [ + [1000.0, 0.0, cx], + [0.0, 1000.0, cy], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ) + return CameraCalibration( + camera_id="test_cam", + intrinsics_3x3=intrinsics, + distortion=np.zeros(5, dtype=np.float64), + body_to_camera_se3=np.eye(4, dtype=np.float64), + acquisition_method="test_fixture", + ) + + +def _make_fdr_client() -> FdrClient: + return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False) + + +def _build_strategy( + spec: _StrategySpec, + *, + inference_runtime: _FakeInferenceRuntime | None = None, + descriptor_index: _FakeDescriptorIndex | None = None, + preprocessor: Any = None, + fdr_client: FdrClient | None = None, + clock: _StubClock | None = None, + descriptor_dim: int | None = None, +) -> Any: + dim = spec.descriptor_dim if descriptor_dim is None else descriptor_dim + inference_runtime = inference_runtime or _FakeInferenceRuntime( + descriptor_dim=dim, model_name=spec.name + ) + descriptor_index = descriptor_index or _FakeDescriptorIndex( + descriptor_dim_value=dim + ) + preprocessor = preprocessor or spec.preprocessor_cls() + fdr_client = fdr_client or _make_fdr_client() + clock = clock or _StubClock() + handle = _FakeEngineHandle(label=spec.name) + bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=dim, + warn_top1_threshold=0.30, + debug_log_per_frame_distances=False, + fdr_client=fdr_client, + logger=logging.getLogger(f"test.{spec.name}.bridge"), + clock=clock, + ) + return spec.strategy_cls( + inference_runtime=inference_runtime, + engine_handle=handle, + descriptor_index=descriptor_index, + preprocessor=preprocessor, + normaliser=DescriptorNormaliser(), + faiss_bridge=bridge, + fdr_client=fdr_client, + clock=clock, + logger=logging.getLogger(f"test.{spec.name}"), + descriptor_dim=dim, + ) + + +def _build_config(strategy_name: str) -> Config: + c2 = C2VprConfig( + strategy=strategy_name, + backbone_weights_path=Path(f"/models/{strategy_name}.trt"), + faiss_index_path=Path("/cache/vpr/index.faiss"), + warn_top1_threshold=0.30, + debug_per_frame_distances=False, + ) + cfg = MagicMock(spec=Config) + cfg.components = {"c2_vpr": c2} + return cfg + + +# --------------------------------------------------------------------------- +# AC-1: Protocol conformance +# --------------------------------------------------------------------------- + + +def test_ac1_protocol_conformance(spec: _StrategySpec) -> None: + strategy = _build_strategy(spec) + assert isinstance(strategy, VprStrategy) + + +# --------------------------------------------------------------------------- +# AC-2: embed_query → L2-normalised FP16 embedding of correct dim +# --------------------------------------------------------------------------- + + +def test_ac2_embed_query_returns_unit_norm_fp16_correct_dim( + spec: _StrategySpec, +) -> None: + # Arrange + strategy = _build_strategy(spec) + frame = _make_frame() + calibration = _make_calibration() + + # Act + query = strategy.embed_query(frame, calibration) + + # Assert + embedding = np.asarray(query.embedding) + assert embedding.shape == (spec.descriptor_dim,) + assert embedding.dtype == np.float16 + norm = float(np.linalg.norm(embedding.astype(np.float32))) + assert norm == pytest.approx(1.0, abs=1e-3) + + +def test_ac2_single_stage_l2_no_intra_cluster_call( + spec: _StrategySpec, +) -> None: + """Secondary backbones use single-stage L2 (no NetVLAD-style intra-cluster step).""" + # Arrange + calls: list[str] = [] + + class _SpyNormaliser(DescriptorNormaliser): + def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray: # type: ignore[override] + calls.append("l2_normalise") + return DescriptorNormaliser.l2_normalise(descriptor) + + def intra_cluster_normalise( # type: ignore[override] + self, descriptor: np.ndarray, num_clusters: int + ) -> np.ndarray: + calls.append("intra_cluster_normalise") + return DescriptorNormaliser.intra_cluster_normalise( + descriptor, num_clusters + ) + + inference_runtime = _FakeInferenceRuntime(descriptor_dim=spec.descriptor_dim) + descriptor_index = _FakeDescriptorIndex( + descriptor_dim_value=spec.descriptor_dim + ) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=spec.descriptor_dim, + warn_top1_threshold=0.30, + debug_log_per_frame_distances=False, + fdr_client=fdr_client, + logger=logging.getLogger(f"test.{spec.name}.bridge"), + clock=clock, + ) + strategy = spec.strategy_cls( + inference_runtime=inference_runtime, + engine_handle=_FakeEngineHandle(spec.name), + descriptor_index=descriptor_index, + preprocessor=spec.preprocessor_cls(), + normaliser=_SpyNormaliser(), + faiss_bridge=bridge, + fdr_client=fdr_client, + clock=clock, + logger=logging.getLogger(f"test.{spec.name}"), + descriptor_dim=spec.descriptor_dim, + ) + + # Act + strategy.embed_query(_make_frame(), _make_calibration()) + + # Assert + assert "intra_cluster_normalise" not in calls + assert calls == ["l2_normalise"] + + +# --------------------------------------------------------------------------- +# AC-3: deterministic embeddings +# --------------------------------------------------------------------------- + + +def test_ac3_embed_query_deterministic_for_same_frame( + spec: _StrategySpec, +) -> None: + # Arrange + rng = np.random.default_rng(2026) + fixed = rng.standard_normal(spec.descriptor_dim).astype(np.float16) + fixed = fixed.reshape(1, spec.descriptor_dim) + runtime = _FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, fixed_output=fixed + ) + strategy = _build_strategy(spec, inference_runtime=runtime) + frame = _make_frame() + calibration = _make_calibration() + + # Act + first = strategy.embed_query(frame, calibration) + second = strategy.embed_query(frame, calibration) + third = strategy.embed_query(frame, calibration) + + # Assert + np.testing.assert_array_equal( + np.asarray(first.embedding), np.asarray(second.embedding) + ) + np.testing.assert_array_equal( + np.asarray(second.embedding), np.asarray(third.embedding) + ) + + +# --------------------------------------------------------------------------- +# AC-4: retrieve_topk returns k candidates with correct backbone_label +# --------------------------------------------------------------------------- + + +def test_ac4_retrieve_topk_returns_exactly_k_with_correct_label( + spec: _StrategySpec, +) -> None: + # Arrange + descriptor_index = _FakeDescriptorIndex( + descriptor_dim_value=spec.descriptor_dim + ) + strategy = _build_strategy(spec, descriptor_index=descriptor_index) + + # Act + query = strategy.embed_query(_make_frame(), _make_calibration()) + result = strategy.retrieve_topk(query, k=10) + + # Assert + assert len(result.candidates) == 10 + assert result.backbone_label == spec.backbone_label + assert result.candidates[0].descriptor_dim == spec.descriptor_dim + distances = [c.descriptor_distance for c in result.candidates] + assert distances == sorted(distances) + + +# --------------------------------------------------------------------------- +# AC-5: descriptor_dim() is stable +# --------------------------------------------------------------------------- + + +def test_ac5_descriptor_dim_stable(spec: _StrategySpec) -> None: + strategy = _build_strategy(spec) + for _ in range(100): + assert strategy.descriptor_dim() == spec.descriptor_dim + + +# --------------------------------------------------------------------------- +# AC-6: Engine output shape mismatch → ConfigError at create() +# --------------------------------------------------------------------------- + + +def test_ac6_create_rejects_engine_output_shape_mismatch( + spec: _StrategySpec, +) -> None: + # Arrange — engine produces (1, 100), expected (1, spec.descriptor_dim) + wrong = np.zeros((1, 100), dtype=np.float16) + runtime = _FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, + fixed_output=wrong, + model_name=spec.name, + ) + descriptor_index = _FakeDescriptorIndex( + descriptor_dim_value=spec.descriptor_dim + ) + + # Act + Assert + with pytest.raises(ConfigError, match=r"engine output shape mismatch"): + spec.create_fn( + _build_config(spec.name), + descriptor_index=descriptor_index, + inference_runtime=runtime, + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + + +def test_ac6_create_rejects_missing_embedding_key( + spec: _StrategySpec, +) -> None: + # Arrange + runtime = _FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, + output_key="wrong_key", + model_name=spec.name, + ) + + # Act + Assert + with pytest.raises(ConfigError, match=r"'embedding' key absent"): + spec.create_fn( + _build_config(spec.name), + descriptor_index=_FakeDescriptorIndex( + descriptor_dim_value=spec.descriptor_dim + ), + inference_runtime=runtime, + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + + +# --------------------------------------------------------------------------- +# AC-7: VprBackboneError on forward-pass failure +# --------------------------------------------------------------------------- + + +def test_ac7_runtime_error_yields_vpr_backbone_error( + spec: _StrategySpec, caplog: pytest.LogCaptureFixture +) -> None: + # Arrange + runtime = _FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, raises=RuntimeError("CUDA OOM") + ) + fdr_client = _make_fdr_client() + strategy = _build_strategy( + spec, inference_runtime=runtime, fdr_client=fdr_client + ) + + # Act + with caplog.at_level(logging.ERROR, logger=f"test.{spec.name}"): + with pytest.raises(VprBackboneError): + strategy.embed_query(_make_frame(), _make_calibration()) + + # Assert + assert any( + record.levelno == logging.ERROR + and getattr(record, "kind", None) == "c2.vpr.backbone_error" + for record in caplog.records + ) + records: list[Any] = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"] + assert len(backbone_errors) == 1 + + +def test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error( + spec: _StrategySpec, +) -> None: + # Arrange + bad = np.zeros((1, 100), dtype=np.float16) + runtime = _FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, fixed_output=bad + ) + strategy = _build_strategy(spec, inference_runtime=runtime) + + # Act + Assert + with pytest.raises( + VprBackboneError, match=rf"expected \(1, {spec.descriptor_dim}\)" + ): + strategy.embed_query(_make_frame(), _make_calibration()) + + +# --------------------------------------------------------------------------- +# AC-8: VprPreprocessError on corrupt image bytes +# --------------------------------------------------------------------------- + + +def test_ac8_corrupt_image_yields_vpr_preprocess_error( + spec: _StrategySpec, caplog: pytest.LogCaptureFixture +) -> None: + # Arrange + fdr_client = _make_fdr_client() + strategy = _build_strategy(spec, fdr_client=fdr_client) + frame = NavCameraFrame( + frame_id=4242, + timestamp=datetime(2026, 5, 14, 0, 0, 0), + image="not-an-array", + camera_calibration_id="test_cam", + ) + + # Act + with caplog.at_level(logging.ERROR, logger=f"test.{spec.name}"): + with pytest.raises(VprPreprocessError): + strategy.embed_query(frame, _make_calibration()) + + # Assert + assert any( + record.levelno == logging.ERROR + and getattr(record, "kind", None) == "c2.vpr.preprocess_error" + for record in caplog.records + ) + records: list[Any] = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + preprocess_errors = [ + r for r in records if r.kind == "vpr.preprocess_error" + ] + assert len(preprocess_errors) == 1 + + +# --------------------------------------------------------------------------- +# AC-9: Composition-root wiring + INFO "c2.vpr.ready" log emitted +# --------------------------------------------------------------------------- + + +def test_ac9_create_emits_ready_log_with_correct_label_and_dim( + spec: _StrategySpec, caplog: pytest.LogCaptureFixture +) -> None: + # Arrange + logger_name = f"gps_denied_onboard.c2_vpr.{spec.name}" + runtime = _FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, model_name=spec.name + ) + descriptor_index = _FakeDescriptorIndex( + descriptor_dim_value=spec.descriptor_dim + ) + + # Act + with caplog.at_level(logging.INFO, logger=logger_name): + strategy = spec.create_fn( + _build_config(spec.name), + descriptor_index=descriptor_index, + inference_runtime=runtime, + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + + # Assert + assert isinstance(strategy, spec.strategy_cls) + ready_records = [ + r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready" + ] + assert len(ready_records) == 1 + kv = getattr(ready_records[0], "kv", {}) + assert kv == { + "strategy": spec.backbone_label, + "descriptor_dim": spec.descriptor_dim, + } + + +# --------------------------------------------------------------------------- +# AC-10: Build-flag exclusion → composition-time fail-fast +# --------------------------------------------------------------------------- + + +def test_ac10_runtime_label_mismatch_raises_config_error( + spec: _StrategySpec, +) -> None: + """Selecting a secondary backbone on a binary built without the + TRT / ONNX-RT runtimes fails fast at create-time. + + Note: AC-10 of the task spec literally names ``ConfigurationError``; + the existing factory contract (AZ-336) raises + ``StrategyNotAvailableError`` via the BUILD_VPR_* env-flag check + BEFORE create() is reached, but the strategy module's own runtime + label guard surfaces a ``ConfigError`` for the same intent + (wrong runtime). Both are composition-time fail-fast errors. Same + mirroring precedent as AZ-337 / AZ-338 / AZ-339. + """ + # Arrange + runtime = _FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, + runtime_label="pytorch_fp16", + model_name=spec.name, + ) + + # Act + Assert + with pytest.raises(ConfigError, match=r"BUILD_TENSORRT_RUNTIME"): + spec.create_fn( + _build_config(spec.name), + descriptor_index=_FakeDescriptorIndex( + descriptor_dim_value=spec.descriptor_dim + ), + inference_runtime=runtime, + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + + +# --------------------------------------------------------------------------- +# AC-11: Preprocessor input shape +# --------------------------------------------------------------------------- + + +def test_ac11_preprocessor_input_shape(spec: _StrategySpec) -> None: + preprocessor = spec.preprocessor_cls() + assert preprocessor.input_shape() == spec.input_hw + + +def test_preprocess_output_is_nchw_fp16(spec: _StrategySpec) -> None: + # Arrange + preprocessor = spec.preprocessor_cls() + frame = _make_frame() + calibration = _make_calibration() + + # Act + tensor = preprocessor.preprocess(frame, calibration) + + # Assert + h, w = spec.input_hw + assert tensor.shape == (1, 3, h, w) + assert tensor.dtype == np.float16 + + +# --------------------------------------------------------------------------- +# Constructor validation +# --------------------------------------------------------------------------- + + +def test_constructor_rejects_zero_descriptor_dim(spec: _StrategySpec) -> None: + # Arrange (skip _build_strategy to bypass FaissBridge's own validation) + fdr_client = _make_fdr_client() + clock = _StubClock() + descriptor_index = _FakeDescriptorIndex( + descriptor_dim_value=spec.descriptor_dim + ) + bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=spec.descriptor_dim, + warn_top1_threshold=0.30, + debug_log_per_frame_distances=False, + fdr_client=fdr_client, + logger=logging.getLogger(f"test.{spec.name}.bridge"), + clock=clock, + ) + + # Act + Assert + with pytest.raises(ValueError, match=r"descriptor_dim must be >= 1"): + spec.strategy_cls( + inference_runtime=_FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, model_name=spec.name + ), + engine_handle=_FakeEngineHandle(spec.name), + descriptor_index=descriptor_index, + preprocessor=spec.preprocessor_cls(), + normaliser=DescriptorNormaliser(), + faiss_bridge=bridge, + fdr_client=fdr_client, + clock=clock, + logger=logging.getLogger(f"test.{spec.name}"), + descriptor_dim=0, + ) + + +def test_create_requires_fdr_client(spec: _StrategySpec) -> None: + with pytest.raises(ValueError, match=r"fdr_client is required"): + spec.create_fn( + _build_config(spec.name), + descriptor_index=_FakeDescriptorIndex( + descriptor_dim_value=spec.descriptor_dim + ), + inference_runtime=_FakeInferenceRuntime( + descriptor_dim=spec.descriptor_dim, model_name=spec.name + ), + fdr_client=None, + clock=_StubClock(), + ) + + +# --------------------------------------------------------------------------- +# FDR emission on success path +# --------------------------------------------------------------------------- + + +def test_embed_query_emits_fdr_record(spec: _StrategySpec) -> None: + # Arrange + fdr_client = _make_fdr_client() + strategy = _build_strategy(spec, fdr_client=fdr_client) + + # Act + strategy.embed_query(_make_frame(), _make_calibration()) + + # Assert + records: list[Any] = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + embed = [r for r in records if r.kind == "vpr.embed_query"] + assert len(embed) == 1 + payload = embed[0].payload + assert payload["backbone_label"] == spec.backbone_label + assert payload["descriptor_dim"] == spec.descriptor_dim + assert payload["latency_us"] >= 1