[AZ-339] C2 MegaLoc + MixVPR secondary VPR backbones

Adds two research-only VprStrategy implementations for the IT-12
comparative-study matrix. MegaLocStrategy (D=2048, 322x322) and
MixVprStrategy (D=4096, 320x320), both via C7 TensorRT FP16 with
their own concrete BackbonePreprocessor. Single-stage global L2
normalisation; retrieval delegated to FaissBridge; FDR records +
structured logs identical to UltraVPR. BUILD_VPR_MEGALOC and
BUILD_VPR_MIXVPR ON for research/replay-cli only, OFF for airborne
and operator-tooling (fail-fast at composition root via existing
AZ-336 factory). Uses helpers.iso_ts_from_clock from day 1 — no
new timestamp helper duplicates introduced.

36 parametrised AC tests + 25 protocol-conformance + 18 helper
regression tests pass; 1690 / 1690 unit tests pass (excluding 1
pre-existing flaky cold-start subprocess test in c12). Verdict:
PASS_WITH_WARNINGS — one Medium follow-on (AZ-527 to consolidate
4-way _assert_engine_output_dim) + one Low AC wording drift.

Co-authored-by: Cursor <cursoragent@cursor.com>
Oleksandr Bezdieniezhnykh
2026-05-13 23:52:54 +03:00
parent 5dfd9a577e
commit 0d65ff4705
9 changed files with 2283 additions and 1 deletions
@@ -0,0 +1,63 @@
# Batch 50 — Implementation Report (Cycle 1)
**Tasks**: AZ-339 (C2 MegaLoc + MixVPR Secondary Backbones — Research-only)
**Date**: 2026-05-13
**Cycle**: 1
**Status**: COMPLETE (review verdict: PASS_WITH_WARNINGS, one Medium + one Low finding)
## What was done
Added two secondary `VprStrategy` implementations for IT-12 comparative-study: `MegaLocStrategy` (D=2048, 322×322 input) and `MixVprStrategy` (D=4096, 320×320 input). Both run via the C7 TensorRT runtime (or ONNX-Runtime fallback), apply ImageNet mean/std preprocessing + single-stage L2 normalisation, and delegate retrieval to `FaissBridge`. Both are gated OFF for airborne and operator-tooling per ADR-002 — `BUILD_VPR_MEGALOC` and `BUILD_VPR_MIXVPR` ON only for the research binary and replay-cli.
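To make "single-stage L2 normalisation" concrete, here is a minimal sketch using plain Python lists instead of the FP16 arrays the strategies actually handle. The function name `l2_normalise` and the zero-vector guard are assumptions for illustration; the project's real helper lives in `helpers.descriptor_normaliser` and is not shown in this commit.

```python
import math


def l2_normalise(vec: list, eps: float = 1e-12) -> list:
    """Scale vec to unit Euclidean length in one global pass.

    "Single-stage" means there is no per-cluster normalisation step
    beforehand (unlike a NetVLAD-style descriptor).
    """
    norm = math.sqrt(sum(v * v for v in vec))
    if norm < eps:
        # Assumed behaviour: refuse to normalise a degenerate descriptor.
        raise ValueError("cannot normalise a zero-length descriptor")
    return [v / norm for v in vec]


raw = [3.0, 4.0]
unit = l2_normalise(raw)  # direction preserved, length scaled to 1
```

After this step, inner products between descriptors equal cosine similarity, which is the usual reason for normalising before a nearest-neighbour lookup.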
### Files added (5)
| File | Purpose |
|------|---------|
| `src/gps_denied_onboard/components/c2_vpr/mega_loc.py` | `MegaLocStrategy` class + `create()` factory + `_assert_engine_output_dim` helper |
| `src/gps_denied_onboard/components/c2_vpr/_preprocessor_mega_loc.py` | `MegaLocBackbonePreprocessor` (centre-crop + 322×322 resize + ImageNet normalise + FP16 NCHW) |
| `src/gps_denied_onboard/components/c2_vpr/mix_vpr.py` | `MixVprStrategy` class + `create()` factory + `_assert_engine_output_dim` helper |
| `src/gps_denied_onboard/components/c2_vpr/_preprocessor_mix_vpr.py` | `MixVprBackbonePreprocessor` (centre-crop + 320×320 resize + ImageNet normalise + FP16 NCHW) |
| `tests/unit/c2_vpr/test_az339_mega_loc_mix_vpr.py` | 36 parametrised AC tests across both strategies |
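
The centre-crop step both preprocessors perform can be illustrated in isolation. The sketch below mirrors the clamping arithmetic of `_centre_crop_around_principal_point` from this commit, but the standalone function `crop_window` and its return shape are hypothetical, introduced only for this example.

```python
def crop_window(w: int, h: int, cx: float, cy: float) -> tuple:
    """Return (left, top, side) of the largest square crop that sits as
    close to the principal point (cx, cy) as the image bounds allow."""
    side = min(h, w)
    half = side // 2
    # Clamp so the square never leaves the image on either axis.
    left = round(max(0.0, min(float(w - side), cx - half)))
    top = round(max(0.0, min(float(h - side), cy - half)))
    return left, top, side


# Principal point at the geometric centre of a 640x480 frame.
centred = crop_window(640, 480, 320.0, 240.0)
# Principal point far to the right: the window is clamped to the edge.
shifted = crop_window(640, 480, 600.0, 240.0)
```

Because the crop side is always `min(h, w)`, only the longer axis has any slack; the clamp matters when the calibrated principal point is well off-centre.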
### Files changed
- _None._ The composition-root factory (`runtime_root/vpr_factory.py`) was already wired for `mega_loc` and `mix_vpr` strategy names at AZ-336 land time — `_STRATEGY_TO_BUILD_FLAG` and `_STRATEGY_TO_MODULE` tables already include the rows. The `KNOWN_STRATEGIES` frozenset in `c2_vpr/config.py` already includes both. The `module-layout.md` `Component: c2_vpr` § Internal list already names `mega_loc.py` and `mix_vpr.py` (pre-declared by AZ-336). No CMake change required — `BUILD_VPR_*` gating is environment-variable-based per `_is_build_flag_on` in `vpr_factory.py`.
## AC coverage
All 11 ACs verified per strategy via the parametrised test suite. See `_docs/03_implementation/reviews/batch_50_review.md` § Phase 2 for the AC ↔ test mapping table.
| AC | Status | Notes |
|----|--------|-------|
| AC-1..AC-9 + AC-11 | PASS | Each AC parametrised over both strategies (36 test cases total) |
| AC-10 | PASS with drift | Implementation raises `StrategyNotAvailableError` (env-flag OFF path) and `ConfigError` (runtime-label mismatch path); the spec literally names `ConfigurationError`. Mirrors the established AZ-337 / AZ-338 precedent. Logged as Low finding F2. |
## Test results
- `tests/unit/c2_vpr/test_az339_mega_loc_mix_vpr.py`: **36 / 36 PASS**.
- `tests/unit/c2_vpr/test_protocol_conformance.py`: **25 / 25 PASS** (auto-extends across all 7 strategies; the two new ones are picked up by the parametrised `_STRATEGY_MODULES` table without test changes).
- `tests/unit/c2_vpr/` (full directory: faiss_bridge + net_vlad + ultra_vpr + new AZ-339 file): **126 / 126 PASS**.
- `tests/unit/test_az508_iso_timestamps.py`: **18 / 18 PASS** (AZ-526 regression guard confirms no new `_iso_ts_from_clock` duplicates introduced by the AZ-339 strategies).
- `tests/unit/test_az270_compose_root.py`: **8 / 8 PASS**.
- `tests/unit/test_az272_fdr_record_schema.py`: **33 / 33 PASS** (unmodified; the new strategies emit FDR records that match the existing schema).
- Full unit suite: **1690 passed, 80 skipped (TRT/CUDA/actionlint), 1 pre-existing failure** (`test_cold_start_under_500ms_p99` — subprocess timeout on cold-start latency budget, unrelated; confirmed by stashing AZ-339 changes and re-running).
- `ruff check` on all 5 new files — clean.
## Architectural decisions
1. **Single parametrised test file `test_az339_mega_loc_mix_vpr.py`** — rather than two near-identical files mirroring `test_ultra_vpr.py` / `test_net_vlad.py`. The two strategies share byte-identical behavioural contracts (same Protocol, same FDR record kinds, same log kinds, same error envelope) and differ only on three values (`DESCRIPTOR_DIM`, `_BACKBONE_LABEL`, preprocessor `input_shape()`). A parametrised approach keeps any future drift visible at the assertion level and reduces the test surface from ~1500 lines (two copies of test_ultra_vpr.py) to ~700 lines.
2. **Preprocessor duplication preserved** (mega_loc vs mix_vpr vs ultra_vpr) — per `components/02_c2_vpr/description.md` § 6 and the task spec § Constraints. Each preprocessor owns its own input-shape constants so a future code drop can change a backbone's preprocessing without coupling other strategies' weights-versions.
3. **`_assert_engine_output_dim` duplicated, NOT extracted** — see Spec Drift / Review Finding F1 below. The cleaner path is a dedicated AZ-527 hygiene PBI mirroring AZ-508 → AZ-526.
4. **`iso_ts_from_clock` imported from the AZ-526 helper from day 1** — neither new strategy introduces a local `_iso_ts_from_clock` body. The AZ-526 regression guard test confirms this.
5. **Runtime-label guard placed inside `create()`** (not in `__init__`) — runtime selection is a composition-time concern; once the strategy is constructed it's expected to work. Matches the UltraVPR / NetVLAD precedent.
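
Decision 1 can be pictured as a small spec table that drives identical assertions for both strategies. The sketch below is an assumption about shape only: the review names a `_StrategySpec` type, but its fields are not shown, so `StrategySpec`, its attributes, and `expected_tensor_shapes` are hypothetical. The dimension and input-size values come from this report.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StrategySpec:
    """The three values on which the two strategies differ."""
    label: str
    descriptor_dim: int
    input_hw: tuple


SPECS = (
    StrategySpec("mega_loc", 2048, (322, 322)),
    StrategySpec("mix_vpr", 4096, (320, 320)),
)


def expected_tensor_shapes(spec: StrategySpec) -> tuple:
    """Return the NCHW input shape and the (1, D) engine output shape."""
    h, w = spec.input_hw
    return (1, 3, h, w), (1, spec.descriptor_dim)


# One assertion body, evaluated once per spec.
shapes = {spec.label: expected_tensor_shapes(spec) for spec in SPECS}
```

In a pytest suite the same tuple would typically be passed to `pytest.mark.parametrize`, so each spec shows up as its own test ID such as `[mega_loc]` and `[mix_vpr]`.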
## Spec drift noted (carried into review F2)
AZ-339 § AC-10 literally specifies `ConfigurationError` for the build-flag-OFF case. The existing AZ-336 composition-root factory raises `StrategyNotAvailableError` for this case (per its own contract and test coverage at `test_protocol_conformance.py:268-274`). The strategy module's own runtime-label guard raises `ConfigError` for the related "wrong C7 runtime" case. AZ-337 (UltraVPR) and AZ-338 (NetVLAD) followed this same pattern; AZ-339 mirrors them. AC-10 wording should be amended in a future spec pass; no code change required.
## Cumulative review obligation
This batch is mid-window (batch 50, next cumulative review at batch 51 / batches 49-51). The new finding F1 (`_assert_engine_output_dim` 4-way duplication) will surface in that cumulative review, and AZ-527 (the planned hygiene PBI) will close it. The AZ-526 regression guard test confirmed that AZ-526's F1 + F3 closures did not regress in AZ-339.
## Follow-on PBI
**AZ-527** (Hygiene — consolidate `_assert_engine_output_dim` into a c2-internal helper). 2 points. Depends on AZ-339. To be created and prioritised as Batch 51 or 52.
@@ -0,0 +1,104 @@
# Code Review Report
**Batch**: 50 — AZ-339 (C2 MegaLoc + MixVPR Secondary Backbones)
**Date**: 2026-05-13
**Verdict**: PASS_WITH_WARNINGS
## Findings
| # | Severity | Category | File:Line | Title |
|---|----------|----------|-----------|-------|
| 1 | Medium | Maintainability/Architecture | `c2_vpr/mega_loc.py:438`, `mix_vpr.py:432`, `ultra_vpr.py:432`, `net_vlad.py:494` | `_assert_engine_output_dim` now 4-way duplicated — schedule AZ-527 |
| 2 | Low | Scope | AZ-339 task spec § AC-10 | AC-10 names `ConfigurationError`; precedent + impl raise `StrategyNotAvailableError` / `ConfigError` |
### Finding Details
**F1: `_assert_engine_output_dim` now 4-way duplicated** (Medium / Maintainability + Architecture)
- Locations:
- `src/gps_denied_onboard/components/c2_vpr/ultra_vpr.py:432`
- `src/gps_denied_onboard/components/c2_vpr/net_vlad.py:494`
- `src/gps_denied_onboard/components/c2_vpr/mega_loc.py:438`
- `src/gps_denied_onboard/components/c2_vpr/mix_vpr.py:432`
- Description: Each strategy module ships a near-identical ~22-line `_assert_engine_output_dim(inference_runtime, handle, preprocessor)` helper. Bodies vary only on three values: `_OUTPUT_KEY` (always `"embedding"` for mega_loc / mix_vpr / ultra_vpr; `"vlad_descriptor"` for net_vlad), `DESCRIPTOR_DIM` (per-strategy constant), and `preprocessor.input_shape()`. Same drift signature as AZ-508 → AZ-526 (`_iso_ts_now` / `_iso_ts_from_clock`).
- The cumulative review (batches 46-48) flagged this duplication as F2 and recommended deferring "until a third VPR strategy joins (AZ-339 batch)". That trigger has fired.
- Suggestion: Create AZ-527 (Hygiene — consolidate `_assert_engine_output_dim` into a c2-internal helper). Signature: `_assert_engine_output_dim(inference_runtime, handle, *, expected_dim, output_key, input_shape)`. 2 points; depends on AZ-339.
- Inline comments in the new mega_loc.py and mix_vpr.py already cite AZ-527 as the planned follow-on so the duplication is intentional, not accidental.
- Task: AZ-339 (carries forward from cumulative-46-48 F2).
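
One possible shape for the consolidated helper proposed in F1 is sketched below. The keyword-only signature is the one suggested above; everything else is assumed. In particular, probing the engine with a dummy input is a guess at how the existing helpers check the shape, and `ConfigError`, `Tensor`, and `FakeRuntime` are stand-ins defined here so the example runs without the project or NumPy.

```python
from typing import NamedTuple


class ConfigError(Exception):
    """Stand-in for the project's ConfigError."""


class Tensor(NamedTuple):
    """Placeholder for an FP16 ndarray; only the shape matters here."""
    shape: tuple
    dtype: str = "float16"


def assert_engine_output_dim(
    inference_runtime, handle, *, expected_dim, output_key, input_shape
):
    """Raise ConfigError unless the engine emits a (1, expected_dim) output."""
    h, w = input_shape
    probe = Tensor(shape=(1, 3, h, w))
    outputs = inference_runtime.infer(handle, {"input": probe})
    if output_key not in outputs:
        raise ConfigError(
            f"engine output is missing key {output_key!r}; got {sorted(outputs)}"
        )
    actual = tuple(outputs[output_key].shape)
    if actual != (1, expected_dim):
        raise ConfigError(
            f"engine output {output_key!r} has shape {actual}; "
            f"expected (1, {expected_dim})"
        )


class FakeRuntime:
    """Test double that always returns an 'embedding' of a fixed width."""

    def __init__(self, dim: int) -> None:
        self._dim = dim

    def infer(self, handle, inputs):
        return {"embedding": Tensor(shape=(1, self._dim))}


# Passes silently: a 2048-d engine checked against MegaLoc's constants.
assert_engine_output_dim(
    FakeRuntime(2048), None,
    expected_dim=2048, output_key="embedding", input_shape=(322, 322),
)
```

With the three varying values passed as keyword arguments, each strategy's `create()` would reduce to a single call and the four module-local copies could be deleted.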
**F2: AC-10 names `ConfigurationError` but precedent / implementation raise `StrategyNotAvailableError` / `ConfigError`** (Low / Scope)
- Location: `_docs/02_tasks/todo/AZ-339_c2_megaloc_mixvpr.md` § AC-10.
- Description: AC-10 literally reads "`ConfigurationError` is raised at composition-root time with message containing the missing flag; the binary refuses to start (fail-fast per AZ-336 factory's lazy-import → ImportError → `ConfigurationError` mapping)". The existing AZ-336 factory (`build_vpr_strategy`) raises **`StrategyNotAvailableError`** for the `BUILD_VPR_<X>=OFF` case (verified via `tests/unit/c2_vpr/test_protocol_conformance.py:268-274` for UltraVPR; same pattern auto-extends to MegaLoc / MixVPR via the parametrized `_STRATEGY_MODULES` table). `StrategyNotAvailableError` is a `RuntimeError` subclass, NOT a `ConfigError`. AZ-337 / AZ-338 followed this precedent; AZ-339 does the same. The strategy module's own runtime-label guard raises `ConfigError` (the "wrong C7 runtime label" case), which satisfies AC-10's *spirit* of "composition-time fail-fast".
- Implementation choice: mirrored the existing precedent.
- Suggestion: amend AC-10 to read "`StrategyNotAvailableError` (for BUILD flag OFF) or `ConfigError` (for runtime-label mismatch) at composition-root time, with a message naming the missing flag or runtime". Recorded as drift; no code change required.
- Task: AZ-339.
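
The practical consequence of F2 is that a caller catching only one of the two error types misses the other. The sketch below illustrates this with stand-in classes; `StrategyNotAvailableError` deriving from `RuntimeError` is stated in the finding, while `ConfigError` deriving directly from `Exception` and the `compose` wrapper are assumptions for illustration.

```python
class ConfigError(Exception):
    """Stand-in; the real base class is not shown in this review."""


class StrategyNotAvailableError(RuntimeError):
    """Stand-in; a RuntimeError subclass, NOT a ConfigError."""


def compose(build) -> str:
    """Run a composition step and report either fail-fast error uniformly."""
    try:
        build()
    except (StrategyNotAvailableError, ConfigError) as exc:
        return f"refusing to start: {type(exc).__name__}: {exc}"
    return "started"


def flag_off():
    raise StrategyNotAvailableError("BUILD_VPR_MEGALOC is OFF")


def wrong_runtime():
    raise ConfigError("mega_loc requires the TensorRT runtime")


flag_off_result = compose(flag_off)
wrong_runtime_result = compose(wrong_runtime)
```

Both paths refuse to start, which is the fail-fast behaviour AC-10 is after; only the exception type named in the spec differs.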
## Phase Summary
### Phase 1 — Context Loading
Read AZ-339 task spec (208 lines, AC-1..AC-11 per strategy + NFRs), the AZ-337 UltraVPR + AZ-338 NetVLAD precedents (`ultra_vpr.py`, `net_vlad.py`, `_preprocessor_ultra_vpr.py`), the AZ-336 composition-root factory (`vpr_factory.py`), the AZ-336 C2VprConfig + KNOWN_STRATEGIES, and the `cumulative_review_batches_46-48_cycle1_report.md` F2 finding. Mapped 5 new files (2 strategy, 2 preprocessor, 1 test) to AZ-339.
### Phase 2 — Spec Compliance
| AC | Verified by | Status |
|----|-------------|--------|
| AC-1 (Protocol conformance) | `test_ac1_protocol_conformance[mega_loc]`, `[mix_vpr]` | PASS |
| AC-2 (L2-norm FP16 correct dim) | `test_ac2_embed_query_returns_unit_norm_fp16_correct_dim[*]`, `test_ac2_single_stage_l2_no_intra_cluster_call[*]` | PASS |
| AC-3 (deterministic embeddings) | `test_ac3_embed_query_deterministic_for_same_frame[*]` | PASS |
| AC-4 (retrieve_topk k + label) | `test_ac4_retrieve_topk_returns_exactly_k_with_correct_label[*]` | PASS |
| AC-5 (descriptor_dim stable) | `test_ac5_descriptor_dim_stable[*]` | PASS |
| AC-6 (engine shape mismatch → ConfigError at create) | `test_ac6_create_rejects_engine_output_shape_mismatch[*]`, `test_ac6_create_rejects_missing_embedding_key[*]` | PASS |
| AC-7 (VprBackboneError on forward failure) | `test_ac7_runtime_error_yields_vpr_backbone_error[*]`, `test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error[*]` | PASS |
| AC-8 (VprPreprocessError on corrupt image) | `test_ac8_corrupt_image_yields_vpr_preprocess_error[*]` | PASS |
| AC-9 (compose wiring + INFO ready log) | `test_ac9_create_emits_ready_log_with_correct_label_and_dim[*]` | PASS |
| AC-10 (build-flag exclusion fail-fast) | `test_ac10_runtime_label_mismatch_raises_config_error[*]` + `tests/unit/c2_vpr/test_protocol_conformance.py` parametrised over `_STRATEGY_MODULES` (auto-covers mega_loc + mix_vpr) | PASS with F2 wording drift |
| AC-11 (preprocessor input shape) | `test_ac11_preprocessor_input_shape[*]`, `test_preprocess_output_is_nchw_fp16[*]` | PASS |
36 / 36 tests in `test_az339_mega_loc_mix_vpr.py` pass; 25 / 25 in `test_protocol_conformance.py` pass (now auto-covering the two new strategies via the existing parametrised module-import table).
### Phase 3 — Code Quality
- **SRP**: Strategy class = embed + retrieve via injected dependencies. Preprocessor class = decode + crop + resize + normalise. Each error handler is a separate helper method. Factory `create()` is wiring-only.
- **Error handling**: Every failure path emits a structured ERROR log AND an FDR record before raising. Errors are explicitly re-raised; no swallowed exceptions.
- **Naming**: Consistent with the UltraVPR precedent — `_BACKBONE_LABEL`, `_OUTPUT_KEY`, `_LOG_KIND_*`, `_FDR_KIND_*`, `_assert_engine_output_dim`. `DESCRIPTOR_DIM` is module-level Final per strategy (2048 / 4096), matching the AZ-337 / AZ-338 pattern.
- **Complexity**: Strategy class ~310 lines (incl. error handlers); `embed_query` ~55 lines (marginally over the 50-line guidance; same shape as UltraVPR). Cyclomatic complexity low.
- **DRY**: Strategy-pair duplication (mega_loc vs mix_vpr) is **intentional** per the task spec § Constraints: "Each strategy ships its own concrete preprocessor — preprocessing parameters per upstream code drop … sharing would couple weights-versions across strategies and let one strategy's upgrade silently break another's preprocessing." `_assert_engine_output_dim` duplication is not mandated by the spec; it is a known, deferred cleanup tracked as F1.
- **Test quality**: AAA pattern with explicit markers. Parametrised across `_StrategySpec` to keep cross-strategy assertions semantically identical. Each AC has at least one parametrised test plus targeted variants for failure modes.
- **Dead code**: None introduced. `Literal` import in strategy modules is used by `_BACKBONE_LABEL: Final[Literal["mega_loc"]]` / `["mix_vpr"]` annotations.
### Phase 4 — Security Quick-Scan
- No SQL, no shell, no `eval` / `exec`, no dynamic deserialisation.
- `cv2.resize` is the only third-party call; inputs are typed `np.ndarray` and validated for dtype / ndim / shape upstream.
- `error_message[:512]` truncation prevents pathological log-line / FDR-payload growth on a long backbone error.
- No hardcoded secrets, API keys, or paths beyond test-fixture placeholders (`/models/mega_loc.trt`, `/cache/vpr/index.faiss`).
- Image inputs are byte-bounded (`uint8` only); rejection paths emit `VprPreprocessError` not raw `cv2.error`.
### Phase 5 — Performance Scan
- Construction is O(1) (no GPU ops in `__init__` per the task spec § Constraints).
- `embed_query` is O(H·W) for decode / resize / normalise — same algorithmic cost as UltraVPR. The 2048-d / 4096-d FP16 embedding is allocated once per frame.
- No N+1 patterns, no unbounded fetching.
- One FDR-record allocation per frame on the success path — same per-frame allocation cost as UltraVPR; sits well below the bounded `capacity` of the FdrClient ring.
- NFR-perf budgets (MegaLoc ≤ 80 ms p95, MixVPR ≤ 100 ms p95) are research-side guidance per the task spec § NFR; not engine-rule-binding. Cannot be measured in unit tests; deferred to Step 9 / E-BBT against the real engines per the task spec § Risks 1 + 4.
### Phase 6 — Cross-Task Consistency
Single-batch with two strategies — they were implemented in lockstep, share the same factory `create()` shape, the same error envelope, the same FDR record kinds (`vpr.embed_query`, `vpr.backbone_error`, `vpr.preprocess_error`), and the same log kinds. The parametrised test surface verifies behavioural equivalence directly.
### Phase 7 — Architecture Compliance
- **Layer direction**: c2_vpr modules import from `_types` (L1), `clock` (L1), `helpers.descriptor_normaliser` (L1), `helpers.iso_timestamps` (L1), `config.schema` (L1), `fdr_client` (L2), and internal `c2_vpr` modules. No upward imports. **PASS.**
- **Public API respect**: The strategies do NOT import from `c6_tile_cache` or `c7_inference` directly — they use the consumer-side cuts (`DescriptorIndexCut`, `InferenceRuntimeCut`) defined locally in c2_vpr per AZ-507. **PASS.**
- **No new cyclic dependencies**: New modules sit in c2_vpr leaf positions; no incoming imports from c2_5 / c3 / runtime_root that didn't already exist for ultra_vpr / net_vlad. **PASS.**
- **Duplicate symbols**: F1 (above) — `_assert_engine_output_dim` is the only new duplication. Strategy class names are unique (`MegaLocStrategy`, `MixVprStrategy`). Preprocessor class names are unique. Constants (`DESCRIPTOR_DIM`, `_BACKBONE_LABEL`) are module-scoped and intentionally per-strategy.
- **Cross-cutting concerns not locally re-implemented**: The new strategies import `iso_ts_from_clock` from `gps_denied_onboard.helpers.iso_timestamps` — they do NOT re-introduce a local `_iso_ts_from_clock` body (verified by `test_ac4_az526_no_module_level_iso_ts_from_clock_outside_helper` continuing to pass post-AZ-339). **PASS.** AZ-526's regression guard worked exactly as designed.
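
A regression guard of this kind can be built on the standard-library `ast` module. The sketch below is one plausible approach, not the actual body of `test_ac4_az526_no_module_level_iso_ts_from_clock_outside_helper`, which is not shown in this commit; `module_level_defs` and the two sample sources are hypothetical.

```python
import ast


def module_level_defs(source: str, name: str) -> list:
    """Return the line numbers of top-level functions called `name`."""
    tree = ast.parse(source)
    return [
        node.lineno
        for node in tree.body  # top-level statements only
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        and node.name == name
    ]


# A module that imports the shared helper: nothing to flag.
clean = "from helpers.iso_timestamps import iso_ts_from_clock\n"
# A module that re-introduces a local copy: flagged at line 1.
dirty = "def _iso_ts_from_clock(clock):\n    return clock.now().isoformat()\n"

clean_hits = module_level_defs(clean, "_iso_ts_from_clock")
dirty_hits = module_level_defs(dirty, "_iso_ts_from_clock")
```

A real guard would walk every strategy module on disk and fail when any file other than the shared helper reports a hit.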
## Pre-existing failure noted (not blocking)
`tests/unit/c12_operator_orchestrator/test_cli_console_script.py::TestConsoleScript::test_cold_start_under_500ms_p99` — fails on this dev laptop with a `subprocess.TimeoutExpired` after 5 seconds when running `operator-orchestrator --help`. Confirmed pre-existing by stashing the AZ-339 changes, running the test against the prior commit `5dfd9a5` (AZ-526), and observing the same failure. Cold-start latency depends on local Python interpreter startup + import time and is unrelated to this batch. Not blocking; logged here for traceability.
## Verdict Rationale
One Medium finding (F1: `_assert_engine_output_dim` 4-way duplication, planned for AZ-527) and one Low finding (F2: AC-10 wording drift, mirroring established AZ-337 / AZ-338 precedent). No Critical, no High. Verdict: **PASS_WITH_WARNINGS**.
+1 -1
@@ -12,5 +12,5 @@ sub_step:
 retry_count: 0
 cycle: 1
 tracker: jira
-last_completed_batch: 49
+last_completed_batch: 50
 last_cumulative_review: batches_46-48
@@ -0,0 +1,199 @@
"""MegaLoc backbone preprocessor (AZ-339).
MegaLoc's published preprocessing chain (per the research code drop):
decode the nav-camera frame's image to RGB uint8, centre-crop to a
square region respecting the camera calibration's principal point (or
geometric centre + WARN log when calibration is absent), resize to
``(322, 322)``, apply ImageNet mean/std normalisation, cast to FP16,
reshape to NCHW.
Differences from :class:`UltraVprBackbonePreprocessor`:
- 322x322 input shape (vs UltraVPR's 384x384, MixVPR's 320x320).
- Same calibration-aware centre-crop and ImageNet mean/std — these
upstream conventions happen to align with UltraVPR but are NOT a
shared dependency: the centre-crop logic is duplicated here per
``components/02_c2_vpr/description.md`` § 6 so a future MegaLoc
code drop can change its preprocessing without coupling other
strategies' weights-versions.
This preprocessor is C2-internal and owned exclusively by
:class:`MegaLocStrategy` — sharing across backbones is forbidden per
``components/02_c2_vpr/description.md`` § 6.
"""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Final

import cv2
import numpy as np

from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError

if TYPE_CHECKING:
    from gps_denied_onboard._types.calibration import CameraCalibration
    from gps_denied_onboard._types.nav import NavCameraFrame

__all__ = [
    "IMAGENET_MEAN",
    "IMAGENET_STD",
    "MEGA_LOC_INPUT_HW",
    "MegaLocBackbonePreprocessor",
]

MEGA_LOC_INPUT_HW: Final[tuple[int, int]] = (322, 322)
IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406)
IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225)

_COMPONENT: Final[str] = "c2_vpr"
_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing"
class MegaLocBackbonePreprocessor:
    """Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW."""

    def __init__(
        self,
        *,
        input_shape: tuple[int, int] = MEGA_LOC_INPUT_HW,
        mean: tuple[float, float, float] = IMAGENET_MEAN,
        std: tuple[float, float, float] = IMAGENET_STD,
        logger: logging.Logger | None = None,
    ) -> None:
        if (
            not isinstance(input_shape, tuple)
            or len(input_shape) != 2
            or any(not isinstance(v, int) or v <= 0 for v in input_shape)
        ):
            raise ValueError(
                f"MegaLocBackbonePreprocessor.input_shape must be a (H, W) "
                f"tuple of positive ints; got {input_shape!r}"
            )
        if len(mean) != 3 or len(std) != 3:
            raise ValueError(
                "MegaLocBackbonePreprocessor.mean and std must each be "
                "3-tuples (one per channel)"
            )
        if any(v <= 0 for v in std):
            raise ValueError(
                "MegaLocBackbonePreprocessor.std components must be > 0"
            )
        self._input_shape: tuple[int, int] = input_shape
        self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3)
        self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3)
        self._logger: logging.Logger = (
            logger
            if logger is not None
            else logging.getLogger("gps_denied_onboard.c2_vpr.mega_loc")
        )
    def preprocess(
        self,
        frame: NavCameraFrame,
        calibration: CameraCalibration,
    ) -> np.ndarray:
        """Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW.

        Calibration handling mirrors UltraVPR (description.md § 6: same
        upstream convention, duplicated not shared): when calibration is
        absent or its principal point cannot be extracted from
        ``intrinsics_3x3``, fall back to the image's geometric centre
        and emit ONE WARN log per call with
        ``kind="c2.vpr.calibration_missing"``.
        """
        image = self._coerce_to_rgb_uint8(frame.image)
        cropped = self._centre_crop_around_principal_point(
            image, calibration, frame_id=frame.frame_id
        )
        target_h, target_w = self._input_shape
        in_h, in_w = cropped.shape[:2]
        interp = (
            cv2.INTER_AREA
            if (in_h > target_h or in_w > target_w)
            else cv2.INTER_CUBIC
        )
        try:
            resized = cv2.resize(
                cropped, (target_w, target_h), interpolation=interp
            )
        except cv2.error as exc:
            raise VprPreprocessError(
                f"cv2.resize failed: {type(exc).__name__}: {exc}"
            ) from exc
        as_f32 = resized.astype(np.float32) / 255.0
        normalised = (as_f32 - self._mean) / self._std
        chw = normalised.transpose(2, 0, 1)
        return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16)

    def input_shape(self) -> tuple[int, int]:
        return self._input_shape
    @staticmethod
    def _coerce_to_rgb_uint8(image: object) -> np.ndarray:
        if not isinstance(image, np.ndarray):
            raise VprPreprocessError(
                f"frame.image must be a numpy array; got {type(image).__name__}"
            )
        if image.dtype != np.uint8:
            raise VprPreprocessError(
                f"frame.image must be uint8 RGB; got dtype {image.dtype}"
            )
        if image.ndim == 2:
            return np.stack([image, image, image], axis=-1)
        if image.ndim == 3 and image.shape[2] == 3:
            return image
        raise VprPreprocessError(
            f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}"
        )
    def _centre_crop_around_principal_point(
        self,
        image: np.ndarray,
        calibration: CameraCalibration | None,
        *,
        frame_id: int,
    ) -> np.ndarray:
        h, w = image.shape[:2]
        side = min(h, w)
        cx_cy = self._extract_principal_point(calibration)
        if cx_cy is None:
            self._logger.warning(
                "MegaLoc calibration unusable; centre-cropping around "
                "geometric centre",
                extra={
                    "component": _COMPONENT,
                    "kind": _LOG_KIND_CALIBRATION_MISSING,
                    "kv": {"frame_id": int(frame_id)},
                },
            )
            cx = w / 2.0
            cy = h / 2.0
        else:
            cx, cy = cx_cy
        half = side // 2
        left = round(max(0.0, min(float(w - side), cx - half)))
        top = round(max(0.0, min(float(h - side), cy - half)))
        return image[top : top + side, left : left + side, :]
    @staticmethod
    def _extract_principal_point(
        calibration: CameraCalibration | None,
    ) -> tuple[float, float] | None:
        if calibration is None:
            return None
        intrinsics = getattr(calibration, "intrinsics_3x3", None)
        if intrinsics is None:
            return None
        try:
            arr = np.asarray(intrinsics, dtype=np.float64)
        except (TypeError, ValueError):
            return None
        if arr.shape != (3, 3):
            return None
        cx = float(arr[0, 2])
        cy = float(arr[1, 2])
        if cx == 0.0 and cy == 0.0:
            return None
        return cx, cy
@@ -0,0 +1,200 @@
"""MixVPR backbone preprocessor (AZ-339).
MixVPR's published preprocessing chain (per the research code drop):
decode the nav-camera frame's image to RGB uint8, centre-crop to a
square region respecting the camera calibration's principal point (or
geometric centre + WARN log when calibration is absent), resize to
``(320, 320)``, apply ImageNet mean/std normalisation, cast to FP16,
reshape to NCHW.
Differences from :class:`MegaLocBackbonePreprocessor` /
:class:`UltraVprBackbonePreprocessor`:
- 320x320 input shape (vs MegaLoc's 322x322, UltraVPR's 384x384).
- Same calibration-aware centre-crop and ImageNet mean/std — these
upstream conventions happen to align with UltraVPR / MegaLoc but
are NOT a shared dependency: the centre-crop logic is duplicated
here per ``components/02_c2_vpr/description.md`` § 6 so a future
MixVPR code drop can change its preprocessing without coupling
other strategies' weights-versions.
This preprocessor is C2-internal and owned exclusively by
:class:`MixVprStrategy` — sharing across backbones is forbidden per
``components/02_c2_vpr/description.md`` § 6.
"""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Final

import cv2
import numpy as np

from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError

if TYPE_CHECKING:
    from gps_denied_onboard._types.calibration import CameraCalibration
    from gps_denied_onboard._types.nav import NavCameraFrame

__all__ = [
    "IMAGENET_MEAN",
    "IMAGENET_STD",
    "MIX_VPR_INPUT_HW",
    "MixVprBackbonePreprocessor",
]

MIX_VPR_INPUT_HW: Final[tuple[int, int]] = (320, 320)
IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406)
IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225)

_COMPONENT: Final[str] = "c2_vpr"
_LOG_KIND_CALIBRATION_MISSING: Final[str] = "c2.vpr.calibration_missing"
class MixVprBackbonePreprocessor:
    """Centre-crop (principal-point-aware) + resize + ImageNet-normalise + FP16 NCHW."""

    def __init__(
        self,
        *,
        input_shape: tuple[int, int] = MIX_VPR_INPUT_HW,
        mean: tuple[float, float, float] = IMAGENET_MEAN,
        std: tuple[float, float, float] = IMAGENET_STD,
        logger: logging.Logger | None = None,
    ) -> None:
        if (
            not isinstance(input_shape, tuple)
            or len(input_shape) != 2
            or any(not isinstance(v, int) or v <= 0 for v in input_shape)
        ):
            raise ValueError(
                f"MixVprBackbonePreprocessor.input_shape must be a (H, W) "
                f"tuple of positive ints; got {input_shape!r}"
            )
        if len(mean) != 3 or len(std) != 3:
            raise ValueError(
                "MixVprBackbonePreprocessor.mean and std must each be "
                "3-tuples (one per channel)"
            )
        if any(v <= 0 for v in std):
            raise ValueError(
                "MixVprBackbonePreprocessor.std components must be > 0"
            )
        self._input_shape: tuple[int, int] = input_shape
        self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3)
        self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3)
        self._logger: logging.Logger = (
            logger
            if logger is not None
            else logging.getLogger("gps_denied_onboard.c2_vpr.mix_vpr")
        )
    def preprocess(
        self,
        frame: NavCameraFrame,
        calibration: CameraCalibration,
    ) -> np.ndarray:
        """Decode -> centre-crop (principal-point-aware) -> resize -> normalise -> FP16 NCHW.

        Calibration handling mirrors UltraVPR (description.md § 6: same
        upstream convention, duplicated not shared): when calibration is
        absent or its principal point cannot be extracted from
        ``intrinsics_3x3``, fall back to the image's geometric centre
        and emit ONE WARN log per call with
        ``kind="c2.vpr.calibration_missing"``.
        """
        image = self._coerce_to_rgb_uint8(frame.image)
        cropped = self._centre_crop_around_principal_point(
            image, calibration, frame_id=frame.frame_id
        )
        target_h, target_w = self._input_shape
        in_h, in_w = cropped.shape[:2]
        interp = (
            cv2.INTER_AREA
            if (in_h > target_h or in_w > target_w)
            else cv2.INTER_CUBIC
        )
        try:
            resized = cv2.resize(
                cropped, (target_w, target_h), interpolation=interp
            )
        except cv2.error as exc:
            raise VprPreprocessError(
                f"cv2.resize failed: {type(exc).__name__}: {exc}"
            ) from exc
        as_f32 = resized.astype(np.float32) / 255.0
        normalised = (as_f32 - self._mean) / self._std
        chw = normalised.transpose(2, 0, 1)
        return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16)

    def input_shape(self) -> tuple[int, int]:
        return self._input_shape
    @staticmethod
    def _coerce_to_rgb_uint8(image: object) -> np.ndarray:
        if not isinstance(image, np.ndarray):
            raise VprPreprocessError(
                f"frame.image must be a numpy array; got {type(image).__name__}"
            )
        if image.dtype != np.uint8:
            raise VprPreprocessError(
                f"frame.image must be uint8 RGB; got dtype {image.dtype}"
            )
        if image.ndim == 2:
            return np.stack([image, image, image], axis=-1)
        if image.ndim == 3 and image.shape[2] == 3:
            return image
        raise VprPreprocessError(
            f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}"
        )
    def _centre_crop_around_principal_point(
        self,
        image: np.ndarray,
        calibration: CameraCalibration | None,
        *,
        frame_id: int,
    ) -> np.ndarray:
        h, w = image.shape[:2]
        side = min(h, w)
        cx_cy = self._extract_principal_point(calibration)
        if cx_cy is None:
            self._logger.warning(
                "MixVPR calibration unusable; centre-cropping around "
                "geometric centre",
                extra={
                    "component": _COMPONENT,
                    "kind": _LOG_KIND_CALIBRATION_MISSING,
                    "kv": {"frame_id": int(frame_id)},
                },
            )
            cx = w / 2.0
            cy = h / 2.0
        else:
            cx, cy = cx_cy
        half = side // 2
        left = round(max(0.0, min(float(w - side), cx - half)))
        top = round(max(0.0, min(float(h - side), cy - half)))
        return image[top : top + side, left : left + side, :]
    @staticmethod
    def _extract_principal_point(
        calibration: CameraCalibration | None,
    ) -> tuple[float, float] | None:
        if calibration is None:
            return None
        intrinsics = getattr(calibration, "intrinsics_3x3", None)
        if intrinsics is None:
            return None
        try:
            arr = np.asarray(intrinsics, dtype=np.float64)
        except (TypeError, ValueError):
            return None
        if arr.shape != (3, 3):
            return None
        cx = float(arr[0, 2])
        cy = float(arr[1, 2])
        if cx == 0.0 and cy == 0.0:
            return None
        return cx, cy
@@ -0,0 +1,451 @@
"""``MegaLocStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-339).
MegaLoc is one of two secondary backbones (alongside :class:`MixVprStrategy`)
shipped exclusively in the research binary for the IT-12 comparative-study
matrix (``components/02_c2_vpr/description.md`` § 1 + § 5). Per ADR-002,
``BUILD_VPR_MEGALOC`` is ON for the research binary and replay-cli, OFF
for the airborne and operator-tooling binaries — selecting ``mega_loc``
on a binary without the flag fails fast at composition-root time via
:class:`StrategyNotAvailableError` (not at first frame).
The strategy runs on the C7 TensorRT runtime (AZ-298), or the ONNX-Runtime
fallback (AZ-299), via the local :class:`InferenceRuntimeCut` (AZ-507).
Engine output key is ``"embedding"`` and the strategy applies single-stage
global L2 normalisation (no NetVLAD-style intra-cluster step). Retrieval
delegates to :class:`FaissBridge` (AZ-341).
Architecture-registry differences from :class:`NetVladStrategy`:
MegaLoc consumes a pre-compiled ``.trt`` engine produced by C10's engine
compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register, so
the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``.
:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture`
no-ops for this strategy.
Engine load happens in :func:`create` (NOT at first frame) so the
engine-output-shape assertion (AC-6) surfaces at startup, not after
takeoff.
Per-frame :meth:`embed_query` pipeline:
1. ``preprocessor.preprocess(frame, calibration)`` ->
``(1, 3, 322, 322)`` FP16 NCHW ndarray.
2. ``inference_runtime.infer(handle, {"input": tensor})`` ->
``{"embedding": (1, 2048) FP16 ndarray}``.
3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage).
4. Return :class:`VprQuery` with ``frame_id``, normalised embedding,
produced_at monotonic ns.
Error envelope: every method raises only members of :class:`VprError`.
Any ``Exception`` from the backbone forward is rewrapped as
:class:`VprBackboneError` (original chained via ``raise ... from``);
:class:`VprPreprocessError` from the preprocessor propagates unchanged.
Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`;
see AZ-341 AC-10.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final, Literal
import numpy as np
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.vpr import VprQuery, VprResult
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_mega_loc import (
MegaLocBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
DescriptorIndexCut,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import (
InferenceRuntimeCut,
)
from gps_denied_onboard.config.schema import ConfigError
from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
from gps_denied_onboard.helpers.iso_timestamps import (
iso_ts_from_clock as _iso_ts_from_clock,
)
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.config.schema import Config
__all__ = ["DESCRIPTOR_DIM", "MegaLocStrategy", "create"]
# MegaLoc's published embedding dimension (D=2048) per the upstream
# research code drop. Engine output shape is asserted at create() time
# against this constant — changing it would silently break AC-2 /
# AC-4 / AC-5 / AC-6.
DESCRIPTOR_DIM: Final[int] = 2048
_BACKBONE_LABEL: Final[Literal["mega_loc"]] = "mega_loc"
_COMPONENT: Final[str] = "c2_vpr"
_OUTPUT_KEY: Final[str] = "embedding"
_ENGINE_INPUT_KEY: Final[str] = "input"
_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset(
{"tensorrt", "onnx_trt_ep"}
)
_LOG_KIND_READY: Final[str] = "c2.vpr.ready"
_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error"
_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun"
_FDR_KIND_EMBED: Final[str] = "vpr.embed_query"
_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error"
_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error"
class MegaLocStrategy:
"""C2 secondary VprStrategy backed by a TRT MegaLoc engine.
See module docstring for the engine-loading + per-frame pipeline.
Stateless across frames (INV-2); single-threaded per instance
(INV-1, per AZ-336).
"""
def __init__(
self,
*,
inference_runtime: InferenceRuntimeCut,
engine_handle: EngineHandle,
descriptor_index: DescriptorIndexCut,
preprocessor: MegaLocBackbonePreprocessor,
normaliser: DescriptorNormaliser,
faiss_bridge: FaissBridge,
fdr_client: FdrClient,
clock: Clock,
logger: logging.Logger,
descriptor_dim: int = DESCRIPTOR_DIM,
) -> None:
if descriptor_dim < 1:
raise ValueError(
f"MegaLocStrategy.descriptor_dim must be >= 1; "
f"got {descriptor_dim}"
)
self._inference_runtime = inference_runtime
self._engine_handle = engine_handle
self._descriptor_index = descriptor_index
self._preprocessor = preprocessor
self._normaliser = normaliser
self._faiss_bridge = faiss_bridge
self._fdr_client = fdr_client
self._clock = clock
self._logger = logger
self._descriptor_dim = descriptor_dim
def embed_query(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> VprQuery:
try:
tensor = self._preprocessor.preprocess(frame, calibration)
except VprPreprocessError as exc:
self._emit_preprocess_error(frame, exc)
raise
ns_start = self._clock.monotonic_ns()
try:
outputs = self._inference_runtime.infer(
self._engine_handle, {_ENGINE_INPUT_KEY: tensor}
)
except Exception as exc:
wrapped = self._wrap_backbone_error(frame, exc)
raise wrapped from exc
ns_end = self._clock.monotonic_ns()
# Integer microseconds, floored at 1 so a sub-microsecond forward never logs 0.
latency_us = max(1, (ns_end - ns_start) // 1_000)
if _OUTPUT_KEY not in outputs:
err = VprBackboneError(
f"MegaLoc forward returned no {_OUTPUT_KEY!r} key; "
f"got {sorted(outputs.keys())!r}"
)
self._emit_backbone_error(frame, err)
raise err
raw = np.asarray(outputs[_OUTPUT_KEY])
if (
raw.ndim != 2
or raw.shape[0] != 1
or raw.shape[1] != self._descriptor_dim
):
err = VprBackboneError(
f"MegaLoc forward returned shape {raw.shape}; "
f"expected (1, {self._descriptor_dim})"
)
self._emit_backbone_error(frame, err)
raise err
flat = np.ascontiguousarray(raw[0], dtype=np.float16)
normalised = self._normaliser.l2_normalise(flat)
self._emit_embed_record(
frame_id=int(frame.frame_id), latency_us=int(latency_us)
)
return VprQuery(
frame_id=int(frame.frame_id),
embedding=normalised,
produced_at=ns_end,
)
def retrieve_topk(self, query: VprQuery, k: int) -> VprResult:
return self._faiss_bridge.retrieve(
query, k, backbone_label=_BACKBONE_LABEL
)
def descriptor_dim(self) -> int:
return self._descriptor_dim
def _wrap_backbone_error(
self, frame: NavCameraFrame, exc: BaseException
) -> VprBackboneError:
wrapped = VprBackboneError(
f"MegaLoc forward raised {type(exc).__name__}: {exc}"
)
self._emit_backbone_error(frame, wrapped)
return wrapped
def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_EMBED,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"descriptor_dim": self._descriptor_dim,
"latency_us": latency_us,
},
)
result = self._fdr_client.enqueue(record)
if result == EnqueueResult.OVERRUN:
self._logger.warning(
"FDR enqueue dropped vpr.embed_query record (buffer overrun)",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FDR_OVERRUN,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
},
},
)
def _emit_backbone_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"MegaLoc backbone error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_BACKBONE_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_BACKBONE_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _emit_preprocess_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"MegaLoc preprocess error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PREPROCESS_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_PREPROCESS_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _build_trt_build_config() -> BuildConfig:
return BuildConfig(
precision=PrecisionMode.FP16,
workspace_mb=0,
calibration_dataset=None,
optimization_profiles=(),
)
def create(
config: Config,
*,
descriptor_index: DescriptorIndexCut,
inference_runtime: InferenceRuntimeCut,
fdr_client: FdrClient | None = None,
clock: Clock | None = None,
logger: logging.Logger | None = None,
) -> MegaLocStrategy:
"""Module-level factory consumed by :func:`build_vpr_strategy`.
MegaLoc is unselectable when the C7 TRT / ONNX-RT runtimes are
excluded — ``current_runtime_label()`` MUST be one of
``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected
with :class:`ConfigError` at composition time.
Engine output shape is asserted at create time via a single
dry-run inference on a zero-init input; mismatch raises
:class:`ConfigError` BEFORE the strategy is bound (AC-6).
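Keyword-only injection points keep tests deterministic; production
wiring fills them from the composition root. ``clock`` and ``logger``
are optional and default to :class:`WallClock` and a module logger.
``fdr_client`` is required despite its ``None`` default: omitting it
raises :class:`ValueError`.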
"""
runtime_label = inference_runtime.current_runtime_label()
if runtime_label not in _ALLOWED_RUNTIME_LABELS:
raise ConfigError(
f"MegaLoc requires BUILD_TENSORRT_RUNTIME=ON (or "
f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary "
f"has runtime_label={runtime_label!r}."
)
block = config.components["c2_vpr"]
weights_path = block.backbone_weights_path
if fdr_client is None:
raise ValueError(
"MegaLocStrategy.create: fdr_client is required; the "
"composition root must inject the running FDR client."
)
if clock is None:
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
if logger is None:
logger = logging.getLogger("gps_denied_onboard.c2_vpr.mega_loc")
entry = inference_runtime.compile_engine(
weights_path, _build_trt_build_config()
)
handle = inference_runtime.deserialize_engine(entry)
preprocessor = MegaLocBackbonePreprocessor(logger=logger)
normaliser = DescriptorNormaliser()
faiss_bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=DESCRIPTOR_DIM,
warn_top1_threshold=block.warn_top1_threshold,
debug_log_per_frame_distances=block.debug_per_frame_distances,
fdr_client=fdr_client,
logger=logger,
clock=clock,
)
_assert_engine_output_dim(inference_runtime, handle, preprocessor)
logger.info(
"C2 VPR strategy ready",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_READY,
"kv": {
"strategy": _BACKBONE_LABEL,
"descriptor_dim": DESCRIPTOR_DIM,
},
},
)
return MegaLocStrategy(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=normaliser,
faiss_bridge=faiss_bridge,
fdr_client=fdr_client,
clock=clock,
logger=logger,
descriptor_dim=DESCRIPTOR_DIM,
)
def _assert_engine_output_dim(
inference_runtime: InferenceRuntimeCut,
handle: EngineHandle,
preprocessor: MegaLocBackbonePreprocessor,
) -> None:
# This helper is duplicated four ways (ultra_vpr / net_vlad /
# mega_loc / mix_vpr); AZ-527, a hygiene PBI sized in parallel with
# the AZ-339 landing, will consolidate the copies. The duplication is
# intentional for now: extracting it earlier would expand AZ-339's
# scope past the two new strategies.
h, w = preprocessor.input_shape()
probe = np.zeros((1, 3, h, w), dtype=np.float16)
outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe})
if _OUTPUT_KEY not in outputs:
raise ConfigError(
f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; "
f"got keys {sorted(outputs.keys())!r}"
)
actual = np.asarray(outputs[_OUTPUT_KEY])
if (
actual.ndim != 2
or actual.shape[0] != 1
or actual.shape[1] != DESCRIPTOR_DIM
):
raise ConfigError(
f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), "
f"got {tuple(actual.shape)}"
)
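Both strategies pass the raw engine output through a single global L2 step. `DescriptorNormaliser` is not part of this diff, so the following is only a sketch of what single-stage L2 normalisation means. It assumes the norm is accumulated in FP32 and that an all-zero vector is returned unchanged; `l2_normalise_sketch` is a hypothetical stand-in, not the project helper.

```python
import numpy as np


def l2_normalise_sketch(vec: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Scale a 1-D descriptor to unit L2 norm (assumed behaviour, not the real helper)."""
    as_f32 = vec.astype(np.float32)  # accumulate in FP32; FP16 sums of squares can overflow
    norm = float(np.linalg.norm(as_f32))
    if norm < eps:
        return as_f32  # assumption: leave an all-zero descriptor untouched
    return as_f32 / norm


# Mimic the (1, 2048) FP16 engine output, then drop the batch axis as embed_query does.
raw = np.random.default_rng(0).standard_normal((1, 2048)).astype(np.float16)
unit = l2_normalise_sketch(raw[0])
```

Unit-norm descriptors make inner-product and Euclidean rankings agree, which is why one global step is enough before handing the vector to the retrieval bridge.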
@@ -0,0 +1,454 @@
"""``MixVprStrategy`` — C2 secondary VprStrategy for IT-12 (AZ-339).
MixVPR is the second of two secondary backbones (alongside
:class:`MegaLocStrategy`) shipped exclusively in the research binary
for the IT-12 comparative-study matrix (``components/02_c2_vpr/
description.md`` § 1 + § 5). Per ADR-002, ``BUILD_VPR_MIXVPR`` is ON
for the research binary and replay-cli, OFF for the airborne and
operator-tooling binaries — selecting ``mix_vpr`` on a binary without
the flag fails fast at composition-root time via
:class:`StrategyNotAvailableError` (not at first frame).
The strategy runs on the C7 TensorRT runtime (AZ-298), or the ONNX-Runtime
fallback (AZ-299), via the local :class:`InferenceRuntimeCut` (AZ-507).
Engine output key is ``"embedding"`` and the strategy applies single-stage
global L2 normalisation (no NetVLAD-style intra-cluster step). Retrieval
delegates to :class:`FaissBridge` (AZ-341).
Architecture-registry differences from :class:`NetVladStrategy`:
MixVPR consumes a pre-compiled ``.trt`` engine produced by C10's engine
compiler (AZ-321) — there is no PyTorch ``nn.Module`` to register, so
the module does NOT expose ``MODEL_NAME`` / ``architecture_factory``.
:func:`gps_denied_onboard.runtime_root.vpr_factory._register_strategy_architecture`
no-ops for this strategy.
Engine load happens in :func:`create` (NOT at first frame) so the
engine-output-shape assertion (AC-6) surfaces at startup, not after
takeoff.
Per-frame :meth:`embed_query` pipeline:
1. ``preprocessor.preprocess(frame, calibration)`` ->
``(1, 3, 320, 320)`` FP16 NCHW ndarray.
2. ``inference_runtime.infer(handle, {"input": tensor})`` ->
``{"embedding": (1, 4096) FP16 ndarray}``.
3. ``normaliser.l2_normalise(raw[0])`` -> global L2 (single-stage).
4. Return :class:`VprQuery` with ``frame_id``, normalised embedding,
produced_at monotonic ns.
Error envelope: every method raises only members of :class:`VprError`.
Any ``Exception`` from the backbone forward is rewrapped as
:class:`VprBackboneError` (original chained via ``raise ... from``);
:class:`VprPreprocessError` from the preprocessor propagates unchanged.
Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`;
see AZ-341 AC-10.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final, Literal
import numpy as np
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.vpr import VprQuery, VprResult
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_mix_vpr import (
MixVprBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
DescriptorIndexCut,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import (
InferenceRuntimeCut,
)
from gps_denied_onboard.config.schema import ConfigError
from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
from gps_denied_onboard.helpers.iso_timestamps import (
iso_ts_from_clock as _iso_ts_from_clock,
)
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.config.schema import Config
__all__ = ["DESCRIPTOR_DIM", "MixVprStrategy", "create"]
# MixVPR's published embedding dimension (D=4096) per the upstream
# research code drop. The 4096-d output is the largest VPR descriptor
# the project carries; the matching FAISS HNSW corpus has correspondingly
# higher RAM cost (researchers must rebuild the corpus when swapping
# between MixVPR and any non-4096 backbone — see AZ-336 pre-flight
# dim-mismatch check). Engine output shape is asserted at create() time.
DESCRIPTOR_DIM: Final[int] = 4096
_BACKBONE_LABEL: Final[Literal["mix_vpr"]] = "mix_vpr"
_COMPONENT: Final[str] = "c2_vpr"
_OUTPUT_KEY: Final[str] = "embedding"
_ENGINE_INPUT_KEY: Final[str] = "input"
_ALLOWED_RUNTIME_LABELS: Final[frozenset[str]] = frozenset(
{"tensorrt", "onnx_trt_ep"}
)
_LOG_KIND_READY: Final[str] = "c2.vpr.ready"
_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error"
_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun"
_FDR_KIND_EMBED: Final[str] = "vpr.embed_query"
_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error"
_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error"
class MixVprStrategy:
"""C2 secondary VprStrategy backed by a TRT MixVPR engine.
See module docstring for the engine-loading + per-frame pipeline.
Stateless across frames (INV-2); single-threaded per instance
(INV-1, per AZ-336).
"""
def __init__(
self,
*,
inference_runtime: InferenceRuntimeCut,
engine_handle: EngineHandle,
descriptor_index: DescriptorIndexCut,
preprocessor: MixVprBackbonePreprocessor,
normaliser: DescriptorNormaliser,
faiss_bridge: FaissBridge,
fdr_client: FdrClient,
clock: Clock,
logger: logging.Logger,
descriptor_dim: int = DESCRIPTOR_DIM,
) -> None:
if descriptor_dim < 1:
raise ValueError(
f"MixVprStrategy.descriptor_dim must be >= 1; "
f"got {descriptor_dim}"
)
self._inference_runtime = inference_runtime
self._engine_handle = engine_handle
self._descriptor_index = descriptor_index
self._preprocessor = preprocessor
self._normaliser = normaliser
self._faiss_bridge = faiss_bridge
self._fdr_client = fdr_client
self._clock = clock
self._logger = logger
self._descriptor_dim = descriptor_dim
def embed_query(
self,
frame: NavCameraFrame,
calibration: CameraCalibration,
) -> VprQuery:
try:
tensor = self._preprocessor.preprocess(frame, calibration)
except VprPreprocessError as exc:
self._emit_preprocess_error(frame, exc)
raise
ns_start = self._clock.monotonic_ns()
try:
outputs = self._inference_runtime.infer(
self._engine_handle, {_ENGINE_INPUT_KEY: tensor}
)
except Exception as exc:
wrapped = self._wrap_backbone_error(frame, exc)
raise wrapped from exc
ns_end = self._clock.monotonic_ns()
# Integer microseconds, floored at 1 so a sub-microsecond forward never logs 0.
latency_us = max(1, (ns_end - ns_start) // 1_000)
if _OUTPUT_KEY not in outputs:
err = VprBackboneError(
f"MixVPR forward returned no {_OUTPUT_KEY!r} key; "
f"got {sorted(outputs.keys())!r}"
)
self._emit_backbone_error(frame, err)
raise err
raw = np.asarray(outputs[_OUTPUT_KEY])
if (
raw.ndim != 2
or raw.shape[0] != 1
or raw.shape[1] != self._descriptor_dim
):
err = VprBackboneError(
f"MixVPR forward returned shape {raw.shape}; "
f"expected (1, {self._descriptor_dim})"
)
self._emit_backbone_error(frame, err)
raise err
flat = np.ascontiguousarray(raw[0], dtype=np.float16)
normalised = self._normaliser.l2_normalise(flat)
self._emit_embed_record(
frame_id=int(frame.frame_id), latency_us=int(latency_us)
)
return VprQuery(
frame_id=int(frame.frame_id),
embedding=normalised,
produced_at=ns_end,
)
def retrieve_topk(self, query: VprQuery, k: int) -> VprResult:
return self._faiss_bridge.retrieve(
query, k, backbone_label=_BACKBONE_LABEL
)
def descriptor_dim(self) -> int:
return self._descriptor_dim
def _wrap_backbone_error(
self, frame: NavCameraFrame, exc: BaseException
) -> VprBackboneError:
wrapped = VprBackboneError(
f"MixVPR forward raised {type(exc).__name__}: {exc}"
)
self._emit_backbone_error(frame, wrapped)
return wrapped
def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_EMBED,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"descriptor_dim": self._descriptor_dim,
"latency_us": latency_us,
},
)
result = self._fdr_client.enqueue(record)
if result == EnqueueResult.OVERRUN:
self._logger.warning(
"FDR enqueue dropped vpr.embed_query record (buffer overrun)",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FDR_OVERRUN,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
},
},
)
def _emit_backbone_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"MixVPR backbone error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_BACKBONE_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_BACKBONE_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _emit_preprocess_error(
self, frame: NavCameraFrame, error: BaseException
) -> None:
frame_id = int(frame.frame_id)
msg = f"MixVPR preprocess error: {error}"
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PREPROCESS_ERROR,
"kv": {
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
},
},
)
self._fdr_client.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_from_clock(self._clock),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_PREPROCESS_ERROR,
payload={
"frame_id": frame_id,
"backbone_label": _BACKBONE_LABEL,
"error_type": type(error).__name__,
"error_message": str(error)[:512],
},
)
)
def _build_trt_build_config() -> BuildConfig:
return BuildConfig(
precision=PrecisionMode.FP16,
workspace_mb=0,
calibration_dataset=None,
optimization_profiles=(),
)
def create(
config: Config,
*,
descriptor_index: DescriptorIndexCut,
inference_runtime: InferenceRuntimeCut,
fdr_client: FdrClient | None = None,
clock: Clock | None = None,
logger: logging.Logger | None = None,
) -> MixVprStrategy:
"""Module-level factory consumed by :func:`build_vpr_strategy`.
MixVPR is unselectable when the C7 TRT / ONNX-RT runtimes are
excluded — ``current_runtime_label()`` MUST be one of
``{"tensorrt", "onnx_trt_ep"}``; ``"pytorch_fp16"`` is rejected
with :class:`ConfigError` at composition time.
Engine output shape is asserted at create time via a single
dry-run inference on a zero-init input; mismatch raises
:class:`ConfigError` BEFORE the strategy is bound (AC-6).
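Keyword-only injection points keep tests deterministic; production
wiring fills them from the composition root. ``clock`` and ``logger``
are optional and default to :class:`WallClock` and a module logger.
``fdr_client`` is required despite its ``None`` default: omitting it
raises :class:`ValueError`.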
"""
runtime_label = inference_runtime.current_runtime_label()
if runtime_label not in _ALLOWED_RUNTIME_LABELS:
raise ConfigError(
f"MixVPR requires BUILD_TENSORRT_RUNTIME=ON (or "
f"BUILD_ONNX_TRT_EP_RUNTIME=ON as fallback); this binary "
f"has runtime_label={runtime_label!r}."
)
block = config.components["c2_vpr"]
weights_path = block.backbone_weights_path
if fdr_client is None:
raise ValueError(
"MixVprStrategy.create: fdr_client is required; the "
"composition root must inject the running FDR client."
)
if clock is None:
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
if logger is None:
logger = logging.getLogger("gps_denied_onboard.c2_vpr.mix_vpr")
entry = inference_runtime.compile_engine(
weights_path, _build_trt_build_config()
)
handle = inference_runtime.deserialize_engine(entry)
preprocessor = MixVprBackbonePreprocessor(logger=logger)
normaliser = DescriptorNormaliser()
faiss_bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=DESCRIPTOR_DIM,
warn_top1_threshold=block.warn_top1_threshold,
debug_log_per_frame_distances=block.debug_per_frame_distances,
fdr_client=fdr_client,
logger=logger,
clock=clock,
)
_assert_engine_output_dim(inference_runtime, handle, preprocessor)
logger.info(
"C2 VPR strategy ready",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_READY,
"kv": {
"strategy": _BACKBONE_LABEL,
"descriptor_dim": DESCRIPTOR_DIM,
},
},
)
return MixVprStrategy(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=normaliser,
faiss_bridge=faiss_bridge,
fdr_client=fdr_client,
clock=clock,
logger=logger,
descriptor_dim=DESCRIPTOR_DIM,
)
def _assert_engine_output_dim(
inference_runtime: InferenceRuntimeCut,
handle: EngineHandle,
preprocessor: MixVprBackbonePreprocessor,
) -> None:
# This helper is duplicated four ways (ultra_vpr / net_vlad /
# mega_loc / mix_vpr); AZ-527, a hygiene PBI sized in parallel with
# the AZ-339 landing, will consolidate the copies. The duplication is
# intentional for now: extracting it earlier would expand AZ-339's
# scope past the two new strategies.
h, w = preprocessor.input_shape()
probe = np.zeros((1, 3, h, w), dtype=np.float16)
outputs = inference_runtime.infer(handle, {_ENGINE_INPUT_KEY: probe})
if _OUTPUT_KEY not in outputs:
raise ConfigError(
f"engine output shape mismatch: {_OUTPUT_KEY!r} key absent; "
f"got keys {sorted(outputs.keys())!r}"
)
actual = np.asarray(outputs[_OUTPUT_KEY])
if (
actual.ndim != 2
or actual.shape[0] != 1
or actual.shape[1] != DESCRIPTOR_DIM
):
raise ConfigError(
f"engine output shape mismatch: expected (1, {DESCRIPTOR_DIM}), "
f"got {tuple(actual.shape)}"
)
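The comment in `_assert_engine_output_dim` notes that AZ-527 is to consolidate the four copies of that helper. One possible shape for a shared version is sketched below, purely as an illustration: the function name, its parameters, the callable-based runtime binding, and the use of `ValueError` in place of the project's `ConfigError` are all assumptions made so the example runs standalone.

```python
from typing import Callable, Mapping

import numpy as np

InferFn = Callable[[Mapping[str, np.ndarray]], Mapping[str, np.ndarray]]


def assert_output_dim(
    infer: InferFn,
    *,
    input_hw: tuple[int, int],
    expected_dim: int,
    input_key: str = "input",
    output_key: str = "embedding",
) -> None:
    """Run one zero-filled probe and check the output is (1, expected_dim)."""
    h, w = input_hw
    probe = np.zeros((1, 3, h, w), dtype=np.float16)
    outputs = infer({input_key: probe})
    if output_key not in outputs:
        raise ValueError(
            f"engine output shape mismatch: {output_key!r} key absent; "
            f"got keys {sorted(outputs.keys())!r}"
        )
    actual = np.asarray(outputs[output_key])
    if actual.shape != (1, expected_dim):
        raise ValueError(
            f"engine output shape mismatch: expected (1, {expected_dim}), "
            f"got {tuple(actual.shape)}"
        )


def fake_infer(inputs: Mapping[str, np.ndarray]) -> dict[str, np.ndarray]:
    # Stand-in for a runtime already bound to one engine handle; always emits 4096-d.
    return {"embedding": np.zeros((1, 4096), dtype=np.float16)}


# Passes silently: the fake engine matches the MixVPR-sized expectation.
assert_output_dim(fake_infer, input_hw=(320, 320), expected_dim=4096)
```

Passing the expected dimension and input shape as arguments, instead of reading a module-level constant, is what would let a single helper serve all four strategies.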
@@ -0,0 +1,811 @@
"""AZ-339 — MegaLoc + MixVPR secondary VprStrategy unit tests.
Covers AC-1..AC-11 for both strategies. Parametrised across the two
strategies so the test surface stays compact (one test per AC times
two strategies) and any drift between the two implementations is
visible at the assertion level.
Uses fakes for :class:`InferenceRuntimeCut`, :class:`DescriptorIndexCut`,
and :class:`FdrClient` so the suite stays AZ-507-clean and TRT-free
(mirrors the precedent in ``test_ultra_vpr.py``).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Literal
from unittest.mock import MagicMock
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineCacheEntry,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.components.c2_vpr import (
C2VprConfig,
VprStrategy,
)
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor_mega_loc import (
MegaLocBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr._preprocessor_mix_vpr import (
MixVprBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.mega_loc import (
DESCRIPTOR_DIM as MEGA_LOC_DIM,
)
from gps_denied_onboard.components.c2_vpr.mega_loc import (
MegaLocStrategy,
)
from gps_denied_onboard.components.c2_vpr.mega_loc import (
create as create_mega_loc,
)
from gps_denied_onboard.components.c2_vpr.mix_vpr import (
DESCRIPTOR_DIM as MIX_VPR_DIM,
)
from gps_denied_onboard.components.c2_vpr.mix_vpr import (
MixVprStrategy,
)
from gps_denied_onboard.components.c2_vpr.mix_vpr import (
create as create_mix_vpr,
)
from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.fdr_client import FdrClient
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
# ---------------------------------------------------------------------------
# Parametrisation: each strategy + its bound constants
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class _StrategySpec:
name: str
strategy_cls: type
create_fn: Any
preprocessor_cls: type
descriptor_dim: int
backbone_label: str
input_hw: tuple[int, int]
_SPECS: list[_StrategySpec] = [
_StrategySpec(
name="mega_loc",
strategy_cls=MegaLocStrategy,
create_fn=create_mega_loc,
preprocessor_cls=MegaLocBackbonePreprocessor,
descriptor_dim=MEGA_LOC_DIM,
backbone_label="mega_loc",
input_hw=(322, 322),
),
_StrategySpec(
name="mix_vpr",
strategy_cls=MixVprStrategy,
create_fn=create_mix_vpr,
preprocessor_cls=MixVprBackbonePreprocessor,
descriptor_dim=MIX_VPR_DIM,
backbone_label="mix_vpr",
input_hw=(320, 320),
),
]
@pytest.fixture(params=_SPECS, ids=[s.name for s in _SPECS])
def spec(request: pytest.FixtureRequest) -> _StrategySpec:
return request.param
# ---------------------------------------------------------------------------
# Fakes (mirroring the shape of test_ultra_vpr.py)
# ---------------------------------------------------------------------------
@dataclass
class _StubClock:
next_monotonic_ns: int = 1_000_000_000
step_ns: int = 5_000
fixed_time_ns: int = 1_715_600_000_000_000_000
def monotonic_ns(self) -> int:
v = self.next_monotonic_ns
self.next_monotonic_ns += self.step_ns
return v
def time_ns(self) -> int:
return self.fixed_time_ns
def sleep_until_ns(self, target_ns: int) -> None:
_ = target_ns
class _FakeEngineHandle(EngineHandle):
def __init__(self, label: str) -> None:
self.label = label
@dataclass
class _FakeInferenceRuntime:
descriptor_dim: int = 2048
raises: BaseException | None = None
runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = (
"tensorrt"
)
fixed_output: np.ndarray | None = None
output_key: str = "embedding"
calls: list[dict[str, np.ndarray]] = field(default_factory=list)
deserialize_calls: list[EngineCacheEntry] = field(default_factory=list)
model_name: str = "mega_loc"
def compile_engine(
self, model_path: Path, build_config: BuildConfig
) -> EngineCacheEntry:
_ = build_config
return EngineCacheEntry(
engine_path=Path(model_path),
sha256_hex="0" * 64,
sm=None,
jp=None,
trt=None,
precision=PrecisionMode.FP16,
extras={"model_name": self.model_name},
)
def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle:
self.deserialize_calls.append(entry)
return _FakeEngineHandle(label=entry.extras.get("model_name", ""))
def infer(
self, handle: EngineHandle, inputs: dict[str, np.ndarray]
) -> dict[str, np.ndarray]:
_ = handle
self.calls.append({k: v.copy() for k, v in inputs.items()})
if self.raises is not None:
raise self.raises
if self.fixed_output is not None:
return {self.output_key: self.fixed_output.copy()}
rng = np.random.default_rng(0xCAFEBABE)
tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16)
return {
self.output_key: tensor.reshape(1, self.descriptor_dim).copy()
}
def release_engine(self, handle: EngineHandle) -> None:
_ = handle
def current_runtime_label(
self,
) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]:
return self.runtime_label
@dataclass
class _FakeDescriptorIndex:
descriptor_dim_value: int = 2048
results: list[tuple[tuple[int, float, float], float]] = field(
default_factory=list
)
raises: BaseException | None = None
def search_topk(
self, query: np.ndarray, k: int
) -> list[tuple[tuple[int, float, float], float]]:
_ = query
if self.raises is not None:
raise self.raises
if not self.results:
return [
((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i)
for i in range(k)
]
return list(self.results[:k])
def descriptor_dim(self) -> int:
return self.descriptor_dim_value


# ---------------------------------------------------------------------------
# Fixture helpers
# ---------------------------------------------------------------------------


def _make_frame(*, frame_id: int = 4242, h: int = 720, w: int = 1280) -> NavCameraFrame:
    rng = np.random.default_rng(frame_id)
    image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8)
    return NavCameraFrame(
        frame_id=frame_id,
        timestamp=datetime(2026, 5, 13, 12, 0, 0),
        image=image,
        camera_calibration_id="test_cam",
    )


def _make_calibration(*, cx: float = 640.0, cy: float = 360.0) -> CameraCalibration:
    intrinsics = np.array(
        [
            [1000.0, 0.0, cx],
            [0.0, 1000.0, cy],
            [0.0, 0.0, 1.0],
        ],
        dtype=np.float64,
    )
    return CameraCalibration(
        camera_id="test_cam",
        intrinsics_3x3=intrinsics,
        distortion=np.zeros(5, dtype=np.float64),
        body_to_camera_se3=np.eye(4, dtype=np.float64),
        acquisition_method="test_fixture",
    )


def _make_fdr_client() -> FdrClient:
    return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False)
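

# NOTE (editorial sketch, not part of the original change): several tests in
# this module drain the FDR client with the same `while True: pop_one()` loop.
# That loop could be factored into a helper like the one below.
# `_drain_fdr_records` and `_SupportsPopOne` are hypothetical names; the only
# assumption about the client is a `pop_one()` method that returns None once
# the buffer is empty, which is how the tests here already use FdrClient.
from typing import Any, Protocol


class _SupportsPopOne(Protocol):
    def pop_one(self) -> Any: ...


def _drain_fdr_records(client: _SupportsPopOne, *, limit: int = 1024) -> list[Any]:
    """Pop records until the client reports empty or `limit` pops were made."""
    records: list[Any] = []
    # A bounded loop keeps a misbehaving fake from hanging the test run.
    for _ in range(limit):
        record = client.pop_one()
        if record is None:
            break
        records.append(record)
    return records


# Possible usage inside a test, under the same assumption:
#     records = _drain_fdr_records(fdr_client)
#     assert sum(r.kind == "vpr.backbone_error" for r in records) == 1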


def _build_strategy(
    spec: _StrategySpec,
    *,
    inference_runtime: _FakeInferenceRuntime | None = None,
    descriptor_index: _FakeDescriptorIndex | None = None,
    preprocessor: Any = None,
    fdr_client: FdrClient | None = None,
    clock: _StubClock | None = None,
    descriptor_dim: int | None = None,
) -> Any:
    dim = spec.descriptor_dim if descriptor_dim is None else descriptor_dim
    inference_runtime = inference_runtime or _FakeInferenceRuntime(
        descriptor_dim=dim, model_name=spec.name
    )
    descriptor_index = descriptor_index or _FakeDescriptorIndex(
        descriptor_dim_value=dim
    )
    preprocessor = preprocessor or spec.preprocessor_cls()
    fdr_client = fdr_client or _make_fdr_client()
    clock = clock or _StubClock()
    handle = _FakeEngineHandle(label=spec.name)
    bridge = FaissBridge(
        descriptor_index=descriptor_index,
        descriptor_dim=dim,
        warn_top1_threshold=0.30,
        debug_log_per_frame_distances=False,
        fdr_client=fdr_client,
        logger=logging.getLogger(f"test.{spec.name}.bridge"),
        clock=clock,
    )
    return spec.strategy_cls(
        inference_runtime=inference_runtime,
        engine_handle=handle,
        descriptor_index=descriptor_index,
        preprocessor=preprocessor,
        normaliser=DescriptorNormaliser(),
        faiss_bridge=bridge,
        fdr_client=fdr_client,
        clock=clock,
        logger=logging.getLogger(f"test.{spec.name}"),
        descriptor_dim=dim,
    )


def _build_config(strategy_name: str) -> Config:
    c2 = C2VprConfig(
        strategy=strategy_name,
        backbone_weights_path=Path(f"/models/{strategy_name}.trt"),
        faiss_index_path=Path("/cache/vpr/index.faiss"),
        warn_top1_threshold=0.30,
        debug_per_frame_distances=False,
    )
    cfg = MagicMock(spec=Config)
    cfg.components = {"c2_vpr": c2}
    return cfg


# ---------------------------------------------------------------------------
# AC-1: Protocol conformance
# ---------------------------------------------------------------------------


def test_ac1_protocol_conformance(spec: _StrategySpec) -> None:
    strategy = _build_strategy(spec)
    assert isinstance(strategy, VprStrategy)


# ---------------------------------------------------------------------------
# AC-2: embed_query → L2-normalised FP16 embedding of correct dim
# ---------------------------------------------------------------------------


def test_ac2_embed_query_returns_unit_norm_fp16_correct_dim(
    spec: _StrategySpec,
) -> None:
    # Arrange
    strategy = _build_strategy(spec)
    frame = _make_frame()
    calibration = _make_calibration()

    # Act
    query = strategy.embed_query(frame, calibration)

    # Assert
    embedding = np.asarray(query.embedding)
    assert embedding.shape == (spec.descriptor_dim,)
    assert embedding.dtype == np.float16
    norm = float(np.linalg.norm(embedding.astype(np.float32)))
    assert norm == pytest.approx(1.0, abs=1e-3)


def test_ac2_single_stage_l2_no_intra_cluster_call(
    spec: _StrategySpec,
) -> None:
    """Secondary backbones use single-stage L2 (no NetVLAD-style intra-cluster step)."""
    # Arrange
    calls: list[str] = []

    class _SpyNormaliser(DescriptorNormaliser):
        def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray:  # type: ignore[override]
            calls.append("l2_normalise")
            return DescriptorNormaliser.l2_normalise(descriptor)

        def intra_cluster_normalise(  # type: ignore[override]
            self, descriptor: np.ndarray, num_clusters: int
        ) -> np.ndarray:
            calls.append("intra_cluster_normalise")
            return DescriptorNormaliser.intra_cluster_normalise(
                descriptor, num_clusters
            )

    # Build manually to inject the spy
    inference_runtime = _FakeInferenceRuntime(descriptor_dim=spec.descriptor_dim)
    descriptor_index = _FakeDescriptorIndex(
        descriptor_dim_value=spec.descriptor_dim
    )
    fdr_client = _make_fdr_client()
    clock = _StubClock()
    bridge = FaissBridge(
        descriptor_index=descriptor_index,
        descriptor_dim=spec.descriptor_dim,
        warn_top1_threshold=0.30,
        debug_log_per_frame_distances=False,
        fdr_client=fdr_client,
        logger=logging.getLogger(f"test.{spec.name}.bridge"),
        clock=clock,
    )
    strategy = spec.strategy_cls(
        inference_runtime=inference_runtime,
        engine_handle=_FakeEngineHandle(spec.name),
        descriptor_index=descriptor_index,
        preprocessor=spec.preprocessor_cls(),
        normaliser=_SpyNormaliser(),
        faiss_bridge=bridge,
        fdr_client=fdr_client,
        clock=clock,
        logger=logging.getLogger(f"test.{spec.name}"),
        descriptor_dim=spec.descriptor_dim,
    )

    # Act
    strategy.embed_query(_make_frame(), _make_calibration())

    # Assert
    assert "intra_cluster_normalise" not in calls
    assert calls == ["l2_normalise"]
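

# NOTE (editorial sketch, not part of the original change): "single-stage L2"
# in the AC-2 tests above means a single division by the Euclidean norm, with
# no per-cluster pass beforehand. `_reference_l2_normalise` is a hypothetical
# pure-Python reference written only for illustration; it is NOT the project's
# DescriptorNormaliser, and it works in float64 rather than FP16. The
# abs=1e-3 tolerance in the unit-norm assertion presumably absorbs the
# rounding introduced when the real embedding is stored as FP16.
import math
from collections.abc import Sequence


def _reference_l2_normalise(values: Sequence[float]) -> list[float]:
    """Scale `values` so that their Euclidean norm is 1.0."""
    norm = math.sqrt(sum(v * v for v in values))
    if norm == 0.0:
        # An all-zero descriptor has no direction, so it cannot be normalised.
        raise ValueError("cannot L2-normalise an all-zero descriptor")
    return [v / norm for v in values]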


# ---------------------------------------------------------------------------
# AC-3: deterministic embeddings
# ---------------------------------------------------------------------------


def test_ac3_embed_query_deterministic_for_same_frame(
    spec: _StrategySpec,
) -> None:
    # Arrange
    rng = np.random.default_rng(2026)
    fixed = rng.standard_normal(spec.descriptor_dim).astype(np.float16)
    fixed = fixed.reshape(1, spec.descriptor_dim)
    runtime = _FakeInferenceRuntime(
        descriptor_dim=spec.descriptor_dim, fixed_output=fixed
    )
    strategy = _build_strategy(spec, inference_runtime=runtime)
    frame = _make_frame()
    calibration = _make_calibration()

    # Act
    first = strategy.embed_query(frame, calibration)
    second = strategy.embed_query(frame, calibration)
    third = strategy.embed_query(frame, calibration)

    # Assert
    np.testing.assert_array_equal(
        np.asarray(first.embedding), np.asarray(second.embedding)
    )
    np.testing.assert_array_equal(
        np.asarray(second.embedding), np.asarray(third.embedding)
    )


# ---------------------------------------------------------------------------
# AC-4: retrieve_topk returns k candidates with correct backbone_label
# ---------------------------------------------------------------------------


def test_ac4_retrieve_topk_returns_exactly_k_with_correct_label(
    spec: _StrategySpec,
) -> None:
    # Arrange
    descriptor_index = _FakeDescriptorIndex(
        descriptor_dim_value=spec.descriptor_dim
    )
    strategy = _build_strategy(spec, descriptor_index=descriptor_index)

    # Act
    query = strategy.embed_query(_make_frame(), _make_calibration())
    result = strategy.retrieve_topk(query, k=10)

    # Assert
    assert len(result.candidates) == 10
    assert result.backbone_label == spec.backbone_label
    assert result.candidates[0].descriptor_dim == spec.descriptor_dim
    distances = [c.descriptor_distance for c in result.candidates]
    assert distances == sorted(distances)


# ---------------------------------------------------------------------------
# AC-5: descriptor_dim() is stable
# ---------------------------------------------------------------------------


def test_ac5_descriptor_dim_stable(spec: _StrategySpec) -> None:
    # Arrange
    strategy = _build_strategy(spec)

    # Act / Assert
    for _ in range(100):
        assert strategy.descriptor_dim() == spec.descriptor_dim


# ---------------------------------------------------------------------------
# AC-6: Engine output shape mismatch → ConfigError at create()
# ---------------------------------------------------------------------------


def test_ac6_create_rejects_engine_output_shape_mismatch(
    spec: _StrategySpec,
) -> None:
    # Arrange — engine produces (1, 100), expected (1, spec.descriptor_dim)
    wrong = np.zeros((1, 100), dtype=np.float16)
    runtime = _FakeInferenceRuntime(
        descriptor_dim=spec.descriptor_dim,
        fixed_output=wrong,
        model_name=spec.name,
    )
    descriptor_index = _FakeDescriptorIndex(
        descriptor_dim_value=spec.descriptor_dim
    )

    # Act + Assert
    with pytest.raises(ConfigError, match=r"engine output shape mismatch"):
        spec.create_fn(
            _build_config(spec.name),
            descriptor_index=descriptor_index,
            inference_runtime=runtime,
            fdr_client=_make_fdr_client(),
            clock=_StubClock(),
        )


def test_ac6_create_rejects_missing_embedding_key(
    spec: _StrategySpec,
) -> None:
    # Arrange
    runtime = _FakeInferenceRuntime(
        descriptor_dim=spec.descriptor_dim,
        output_key="wrong_key",
        model_name=spec.name,
    )

    # Act + Assert
    with pytest.raises(ConfigError, match=r"'embedding' key absent"):
        spec.create_fn(
            _build_config(spec.name),
            descriptor_index=_FakeDescriptorIndex(
                descriptor_dim_value=spec.descriptor_dim
            ),
            inference_runtime=runtime,
            fdr_client=_make_fdr_client(),
            clock=_StubClock(),
        )


# ---------------------------------------------------------------------------
# AC-7: VprBackboneError on forward-pass failure
# ---------------------------------------------------------------------------


def test_ac7_runtime_error_yields_vpr_backbone_error(
    spec: _StrategySpec, caplog: pytest.LogCaptureFixture
) -> None:
    # Arrange
    runtime = _FakeInferenceRuntime(
        descriptor_dim=spec.descriptor_dim, raises=RuntimeError("CUDA OOM")
    )
    fdr_client = _make_fdr_client()
    strategy = _build_strategy(
        spec, inference_runtime=runtime, fdr_client=fdr_client
    )

    # Act
    with caplog.at_level(logging.ERROR, logger=f"test.{spec.name}"):
        with pytest.raises(VprBackboneError):
            strategy.embed_query(_make_frame(), _make_calibration())

    # Assert
    assert any(
        record.levelno == logging.ERROR
        and getattr(record, "kind", None) == "c2.vpr.backbone_error"
        for record in caplog.records
    )
    records: list[Any] = []
    while True:
        r = fdr_client.pop_one()
        if r is None:
            break
        records.append(r)
    backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"]
    assert len(backbone_errors) == 1


def test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error(
    spec: _StrategySpec,
) -> None:
    # Arrange
    bad = np.zeros((1, 100), dtype=np.float16)
    runtime = _FakeInferenceRuntime(
        descriptor_dim=spec.descriptor_dim, fixed_output=bad
    )
    strategy = _build_strategy(spec, inference_runtime=runtime)

    # Act + Assert
    with pytest.raises(
        VprBackboneError, match=rf"expected \(1, {spec.descriptor_dim}\)"
    ):
        strategy.embed_query(_make_frame(), _make_calibration())


# ---------------------------------------------------------------------------
# AC-8: VprPreprocessError on corrupt image bytes
# ---------------------------------------------------------------------------


def test_ac8_corrupt_image_yields_vpr_preprocess_error(
    spec: _StrategySpec, caplog: pytest.LogCaptureFixture
) -> None:
    # Arrange
    fdr_client = _make_fdr_client()
    strategy = _build_strategy(spec, fdr_client=fdr_client)
    frame = NavCameraFrame(
        frame_id=4242,
        timestamp=datetime(2026, 5, 13, 12, 0, 0),
        image="not-an-array",
        camera_calibration_id="test_cam",
    )

    # Act
    with caplog.at_level(logging.ERROR, logger=f"test.{spec.name}"):
        with pytest.raises(VprPreprocessError):
            strategy.embed_query(frame, _make_calibration())

    # Assert
    assert any(
        record.levelno == logging.ERROR
        and getattr(record, "kind", None) == "c2.vpr.preprocess_error"
        for record in caplog.records
    )
    records: list[Any] = []
    while True:
        r = fdr_client.pop_one()
        if r is None:
            break
        records.append(r)
    preprocess_errors = [
        r for r in records if r.kind == "vpr.preprocess_error"
    ]
    assert len(preprocess_errors) == 1


# ---------------------------------------------------------------------------
# AC-9: Composition-root wiring + INFO "c2.vpr.ready" log emitted
# ---------------------------------------------------------------------------


def test_ac9_create_emits_ready_log_with_correct_label_and_dim(
    spec: _StrategySpec, caplog: pytest.LogCaptureFixture
) -> None:
    # Arrange
    logger_name = f"gps_denied_onboard.c2_vpr.{spec.name}"
    runtime = _FakeInferenceRuntime(
        descriptor_dim=spec.descriptor_dim, model_name=spec.name
    )
    descriptor_index = _FakeDescriptorIndex(
        descriptor_dim_value=spec.descriptor_dim
    )

    # Act
    with caplog.at_level(logging.INFO, logger=logger_name):
        strategy = spec.create_fn(
            _build_config(spec.name),
            descriptor_index=descriptor_index,
            inference_runtime=runtime,
            fdr_client=_make_fdr_client(),
            clock=_StubClock(),
        )

    # Assert
    assert isinstance(strategy, spec.strategy_cls)
    ready_records = [
        r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready"
    ]
    assert len(ready_records) == 1
    kv = getattr(ready_records[0], "kv", {})
    assert kv == {
        "strategy": spec.backbone_label,
        "descriptor_dim": spec.descriptor_dim,
    }


# ---------------------------------------------------------------------------
# AC-10: Build-flag exclusion → composition-time fail-fast
# ---------------------------------------------------------------------------


def test_ac10_runtime_label_mismatch_raises_config_error(
    spec: _StrategySpec,
) -> None:
    """Selecting a secondary backbone on a binary built without the
    TRT / ONNX-RT runtimes fails fast at create-time.

    Note: AC-10 of the task spec literally names ``ConfigurationError``;
    the existing factory contract (AZ-336) raises
    ``StrategyNotAvailableError`` via the BUILD_VPR_* env-flag check
    BEFORE create() is reached, but the strategy module's own runtime
    label guard surfaces a ``ConfigError`` for the same intent
    (wrong runtime). Both are composition-time fail-fast errors.
    """
    # Arrange
    runtime = _FakeInferenceRuntime(
        descriptor_dim=spec.descriptor_dim,
        runtime_label="pytorch_fp16",
        model_name=spec.name,
    )

    # Act + Assert
    with pytest.raises(ConfigError, match=r"BUILD_TENSORRT_RUNTIME"):
        spec.create_fn(
            _build_config(spec.name),
            descriptor_index=_FakeDescriptorIndex(
                descriptor_dim_value=spec.descriptor_dim
            ),
            inference_runtime=runtime,
            fdr_client=_make_fdr_client(),
            clock=_StubClock(),
        )


# ---------------------------------------------------------------------------
# AC-11: Preprocessor input shape
# ---------------------------------------------------------------------------


def test_ac11_preprocessor_input_shape(spec: _StrategySpec) -> None:
    # Arrange
    preprocessor = spec.preprocessor_cls()

    # Act + Assert
    assert preprocessor.input_shape() == spec.input_hw


def test_preprocess_output_is_nchw_fp16(spec: _StrategySpec) -> None:
    # Arrange
    preprocessor = spec.preprocessor_cls()
    frame = _make_frame()
    calibration = _make_calibration()

    # Act
    tensor = preprocessor.preprocess(frame, calibration)

    # Assert
    h, w = spec.input_hw
    assert tensor.shape == (1, 3, h, w)
    assert tensor.dtype == np.float16


# ---------------------------------------------------------------------------
# Constructor validation
# ---------------------------------------------------------------------------


def test_constructor_rejects_zero_descriptor_dim(spec: _StrategySpec) -> None:
    # Arrange (skip _build_strategy to bypass FaissBridge's own validation)
    fdr_client = _make_fdr_client()
    clock = _StubClock()
    descriptor_index = _FakeDescriptorIndex(
        descriptor_dim_value=spec.descriptor_dim
    )
    bridge = FaissBridge(
        descriptor_index=descriptor_index,
        descriptor_dim=spec.descriptor_dim,
        warn_top1_threshold=0.30,
        debug_log_per_frame_distances=False,
        fdr_client=fdr_client,
        logger=logging.getLogger(f"test.{spec.name}.bridge"),
        clock=clock,
    )

    # Act + Assert
    with pytest.raises(ValueError, match=r"descriptor_dim must be >= 1"):
        spec.strategy_cls(
            inference_runtime=_FakeInferenceRuntime(
                descriptor_dim=spec.descriptor_dim, model_name=spec.name
            ),
            engine_handle=_FakeEngineHandle(spec.name),
            descriptor_index=descriptor_index,
            preprocessor=spec.preprocessor_cls(),
            normaliser=DescriptorNormaliser(),
            faiss_bridge=bridge,
            fdr_client=fdr_client,
            clock=clock,
            logger=logging.getLogger(f"test.{spec.name}"),
            descriptor_dim=0,
        )


def test_create_requires_fdr_client(spec: _StrategySpec) -> None:
    # Arrange + Act + Assert
    with pytest.raises(ValueError, match=r"fdr_client is required"):
        spec.create_fn(
            _build_config(spec.name),
            descriptor_index=_FakeDescriptorIndex(
                descriptor_dim_value=spec.descriptor_dim
            ),
            inference_runtime=_FakeInferenceRuntime(
                descriptor_dim=spec.descriptor_dim, model_name=spec.name
            ),
            fdr_client=None,
            clock=_StubClock(),
        )


# ---------------------------------------------------------------------------
# FDR emission on success path
# ---------------------------------------------------------------------------


def test_embed_query_emits_fdr_record(spec: _StrategySpec) -> None:
    # Arrange
    fdr_client = _make_fdr_client()
    strategy = _build_strategy(spec, fdr_client=fdr_client)

    # Act
    strategy.embed_query(_make_frame(), _make_calibration())

    # Assert
    records: list[Any] = []
    while True:
        r = fdr_client.pop_one()
        if r is None:
            break
        records.append(r)
    embed = [r for r in records if r.kind == "vpr.embed_query"]
    assert len(embed) == 1
    payload = embed[0].payload
    assert payload["backbone_label"] == spec.backbone_label
    assert payload["descriptor_dim"] == spec.descriptor_dim
    assert payload["latency_us"] >= 1