From af0dbe863ab8562235fbf153f17acf7cd283ffdb Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Wed, 13 May 2026 22:30:29 +0300 Subject: [PATCH] [AZ-338] [AZ-283] C2 NetVLAD mandatory simple-baseline VprStrategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NetVLAD is the C2 comparative baseline per the engine rule (every production-default backbone ships with a simple-baseline alongside). Runs on the C7 PyTorch FP16 runtime (NOT TRT) so a TRT engine compile bug cannot simultaneously break NetVLAD AND UltraVPR. Production changes: - c2_vpr/net_vlad.py — NetVladStrategy + module-level create() factory. Constructor wires InferenceRuntimeCut + DescriptorIndexCut + NetVladBackbonePreprocessor + DescriptorNormaliser + FaissBridge. embed_query pipeline: preprocess -> runtime.infer -> dual-stage normalisation (intra-cluster THEN global L2) -> VprQuery. retrieve_topk delegates one-line to FaissBridge. - c2_vpr/_net_vlad_architecture.py — Arandjelovic et al. 2016 NetVLAD layer over torchvision VGG16 features + optional Linear PCA projection to descriptor_dim (default 4096; published Pittsburgh reference uses K*D=64*512=32768 raw + Linear(32768, 4096) PCA). - c2_vpr/_preprocessor_net_vlad.py — OpenCV-based image preprocessor: decode -> centre-crop square -> resize (480, 480) -> ImageNet normalisation -> FP16 NCHW. Calibration is not consumed (NetVLAD is calibration-agnostic per published preprocessing chain). - c2_vpr/inference_runtime_cut.py — NEW AZ-507 consumer-side cut mirroring C7 InferenceRuntime; lets c2_vpr stay AZ-507-clean. - c2_vpr/config.py — added netvlad_descriptor_dim: int = 4096 knob. - helpers/descriptor_normaliser.py — added intra_cluster_normalise (DescriptorNormaliser v1.0.0 -> v1.1.0; backward-compatible add). 
- runtime_root/vpr_factory.py — added _register_strategy_architecture helper that binds (MODEL_NAME, architecture_factory(descriptor_dim)) to C7's architecture registry before delegating to the strategy's create() factory. Keeps the c7 import at L4, preserves AZ-507. - fdr_client/records.py — registered vpr.embed_query, vpr.backbone_error, vpr.preprocess_error record kinds. Tests: - tests/unit/c2_vpr/test_net_vlad.py — 31 tests covering all 11 ACs + preprocessor contract + architecture factory + constructor validation + FDR record emission. - tests/unit/test_az283_descriptor_normaliser.py — +8 tests for the new intra_cluster_normalise. - tests/unit/test_az272_fdr_record_schema.py — +3 fixture payloads. Full unit suite: 1608 passed / 80 env-skipped (+43 new tests). Per-batch code review (batch_46_review.md): PASS_WITH_WARNINGS (4 Low-severity hygiene findings; no Critical/High/Medium). Architectural notes: - The spec implied c2_vpr.net_vlad.create() registers the architecture with C7. That violates AZ-507 (no cross-component imports). Resolved by exposing MODEL_NAME + architecture_factory(descriptor_dim) on the strategy module and having the composition root perform the C7 bind. - C7 PyTorch runtime API names in the spec (forward, load_engine) were outdated; aligned implementation with the live v1.0.0 Protocol (infer, compile_engine + deserialize_engine). Spec hygiene flagged in review F2. 
Co-authored-by: Cursor --- .../shared_helpers/descriptor_normaliser.md | 14 +- .../reviews/batch_46_review.md | 265 ++++++ _docs/_autodev_state.md | 2 +- .../components/c2_vpr/__init__.py | 4 + .../c2_vpr/_net_vlad_architecture.py | 144 ++++ .../c2_vpr/_preprocessor_net_vlad.py | 137 +++ .../components/c2_vpr/config.py | 15 +- .../c2_vpr/inference_runtime_cut.py | 60 ++ .../components/c2_vpr/net_vlad.py | 521 ++++++++++++ src/gps_denied_onboard/fdr_client/records.py | 40 + .../helpers/descriptor_normaliser.py | 53 ++ .../runtime_root/vpr_factory.py | 59 +- tests/unit/c2_vpr/test_net_vlad.py | 805 ++++++++++++++++++ tests/unit/test_az272_fdr_record_schema.py | 21 + .../unit/test_az283_descriptor_normaliser.py | 77 ++ 15 files changed, 2200 insertions(+), 17 deletions(-) create mode 100644 _docs/03_implementation/reviews/batch_46_review.md create mode 100644 src/gps_denied_onboard/components/c2_vpr/_net_vlad_architecture.py create mode 100644 src/gps_denied_onboard/components/c2_vpr/_preprocessor_net_vlad.py create mode 100644 src/gps_denied_onboard/components/c2_vpr/inference_runtime_cut.py create mode 100644 src/gps_denied_onboard/components/c2_vpr/net_vlad.py create mode 100644 tests/unit/c2_vpr/test_net_vlad.py diff --git a/_docs/02_document/contracts/shared_helpers/descriptor_normaliser.md b/_docs/02_document/contracts/shared_helpers/descriptor_normaliser.md index 35abe38..10583f2 100644 --- a/_docs/02_document/contracts/shared_helpers/descriptor_normaliser.md +++ b/_docs/02_document/contracts/shared_helpers/descriptor_normaliser.md @@ -3,9 +3,9 @@ **Component**: shared_helpers / `helpers.descriptor_normaliser` (cross-cutting concern owned by E-CC-HELPERS / AZ-264) **Producer task**: AZ-283 — `_docs/02_tasks/todo/AZ-283_descriptor_normaliser.md` **Consumer tasks**: every C2 task that produces a query embedding before FAISS lookup; every C2.5 task that pre-processes descriptors for re-rank; every C3 task that pre-processes descriptors for cross-domain matching; 
every C10 task that builds the corpus side of the FAISS index during pre-flight provisioning -**Version**: 1.0.0 +**Version**: 1.1.0 **Status**: draft -**Last Updated**: 2026-05-10 +**Last Updated**: 2026-05-13 ## Purpose @@ -18,17 +18,20 @@ L2-normalise descriptors so cosine similarity aligns with FAISS's Euclidean / in ```python class DescriptorNormaliser: @staticmethod - def l2_normalise(descriptor: np.ndarray) -> np.ndarray: ... # shape (D,) + def l2_normalise(descriptor: np.ndarray) -> np.ndarray: ... # shape (D,) @staticmethod - def l2_normalise_batch(descriptors: np.ndarray) -> np.ndarray: ... # shape (N, D) + def l2_normalise_batch(descriptors: np.ndarray) -> np.ndarray: ... # shape (N, D) @staticmethod - def descriptor_metric() -> str: ... # always "inner_product" + def intra_cluster_normalise(descriptor: np.ndarray, num_clusters: int) -> np.ndarray: ... # shape (K*D,) — added in v1.1.0 + @staticmethod + def descriptor_metric() -> str: ... # always "inner_product" ``` | Name | Signature | Throws / Errors | Blocking? | |------|-----------|-----------------|-----------| | `l2_normalise` | `(descriptor: (D,)) -> (D,)` | `DescriptorNormaliserError` if shape is not 1-D, `D < 1`, or dtype is not `float16` / `float32` | sync, hot-path | | `l2_normalise_batch` | `(descriptors: (N, D)) -> (N, D)` | `DescriptorNormaliserError` if shape is not 2-D, `N < 1`, `D < 1`, or dtype is not `float16` / `float32` | sync, hot-path | +| `intra_cluster_normalise` | `(descriptor: (K*D,), num_clusters: int) -> (K*D,)` | `DescriptorNormaliserError` if shape is not 1-D, `num_clusters < 1`, length not divisible by `num_clusters`, or dtype is not `float16` / `float32` | sync, hot-path | | `descriptor_metric` | `() -> str` | none | sync, in-memory, returns `"inner_product"` | Numpy arrays. dtype contract: `float16` in → `float16` out; `float32` in → `float32` out (no silent up-cast). The helper does NOT mutate inputs in place — it returns a new array. @@ -80,3 +83,4 @@ Numpy arrays. 
dtype contract: `float16` in → `float16` out; `float32` in → ` | Version | Date | Change | Author | |---------|------|--------|--------| | 1.0.0 | 2026-05-10 | Initial contract derived from `_docs/02_document/common-helpers/08_helper_descriptor_normaliser.md` | autodev decompose Step 2 | +| 1.1.0 | 2026-05-13 | Added `intra_cluster_normalise(descriptor, num_clusters)` for NetVLAD's dual-stage normalisation chain. Backward-compatible function addition required by AZ-338. | autodev implement Batch 46 | diff --git a/_docs/03_implementation/reviews/batch_46_review.md b/_docs/03_implementation/reviews/batch_46_review.md new file mode 100644 index 0000000..24eaaaa --- /dev/null +++ b/_docs/03_implementation/reviews/batch_46_review.md @@ -0,0 +1,265 @@ +# Code Review — Batch 46 / AZ-338 (C2 NetVLAD Mandatory Simple-Baseline) + +**Date**: 2026-05-13 +**Mode**: Per-batch (all 7 phases) +**Task**: AZ-338 — C2 NetVLAD Mandatory Simple-Baseline (3pt) +**Verdict**: **PASS_WITH_WARNINGS** + +## Scope + +| Domain | Files | +|--------|-------| +| c2_vpr (production) | `net_vlad.py` (NEW), `_net_vlad_architecture.py` (NEW), `_preprocessor_net_vlad.py` (NEW), `inference_runtime_cut.py` (NEW — AZ-507 cut of C7 InferenceRuntime), `config.py` (added `netvlad_descriptor_dim: int = 4096`), `__init__.py` (re-exports `InferenceRuntimeCut`) | +| Shared helpers | `helpers/descriptor_normaliser.py` (added `intra_cluster_normalise(descriptor, num_clusters)` — backward-compatible v1.1.0) | +| FDR | `fdr_client/records.py` (registered `vpr.embed_query`, `vpr.backbone_error`, `vpr.preprocess_error` per the AZ-338 spec § Outcome) | +| Composition root | `runtime_root/vpr_factory.py` (added `_register_strategy_architecture` helper; calls C7 `register_architecture` for the strategy's `MODEL_NAME` + `architecture_factory` pair before delegating to `create()`) | +| Tests | `tests/unit/c2_vpr/test_net_vlad.py` (NEW, 31 tests), `tests/unit/test_az283_descriptor_normaliser.py` (+8 tests for the new 
method), `tests/unit/test_az272_fdr_record_schema.py` (+3 fixture payloads) | +| Docs | `_docs/02_document/contracts/shared_helpers/descriptor_normaliser.md` (v1.0.0 → v1.1.0; documented `intra_cluster_normalise` row + changelog entry) | + +## Phase 1 — Context Loading + +Inputs reviewed: + +- AZ-338 spec (`_docs/02_tasks/todo/AZ-338_c2_net_vlad.md`). +- `vpr_strategy_protocol.md` v1.0.0 — 7 invariants; INV-3 (L2-normalised + embedding) is the central correctness contract. +- `c2_vpr/_faiss_bridge.py` (AZ-341, prior batch) — the strategy's + one-line retrieve delegation target. +- `c7_inference/pytorch_fp16_runtime.py` (AZ-300) — the runtime that + actually deserializes the registered NetVLAD architecture. +- `c7_inference/architecture_registry.py` — the registration target; + rejects re-registration with a different factory under the same key + (defensive against accidental collision). +- AZ-507 lint rule (`tests/unit/test_az270_compose_root.py::test_ac6_only_compose_root_imports_concrete_strategies`) + — components MAY NOT import other components. +- `_types/inference.py` — `BuildConfig`, `EngineCacheEntry`, + `EngineHandle`, `PrecisionMode` (L1 shared DTOs the strategy uses). 
+ +## Phase 2 — Spec Compliance + +All 11 ACs satisfied: + +| AC | Description | Covering test(s) | +|----|-------------|------------------| +| AC-1 | Protocol conformance | `test_ac1_protocol_conformance` | +| AC-2 | L2-norm == 1.0 ± 1e-3 FP16 (D,) | `test_ac2_embed_query_returns_unit_norm_fp16_descriptor` + 512-PCA variant | +| AC-3 | `intra_cluster_normalise` BEFORE `l2_normalise` | `test_ac3_intra_cluster_called_before_global_l2` + once-each | +| AC-4 | Deterministic across 3 calls | `test_ac4_embed_query_deterministic_for_same_frame` | +| AC-5 | `retrieve_topk` == k, label="net_vlad", sorted | `test_ac5_retrieve_topk_returns_exactly_k_with_net_vlad_label` | +| AC-6 | `descriptor_dim()` stable | 4096 + 512 instance variants | +| AC-7 | Engine output shape mismatch → ConfigError | `test_ac7_create_rejects_engine_output_shape_mismatch` | +| AC-8 | `VprBackboneError` on forward failure | RuntimeError + missing-key + wrong-shape variants | +| AC-9 | `VprPreprocessError` on corrupt image | non-array + wrong-dtype + wrong-shape variants | +| AC-10 | Composition-root wiring + `c2.vpr.ready` log | INFO log + model_name forcing | +| AC-11 | `BUILD_PYTORCH_RUNTIME=OFF` → ConfigError fail-fast | `tensorrt` + `onnx_trt_ep` runtime label variants | + +Spec deviations: + +- **`runtime.forward(engine_id, ...)` → `runtime.infer(handle, ...)`**: + the spec used placeholder names; the actual C7 `InferenceRuntime` + Protocol API is `infer(handle, inputs)` + `compile_engine` + + `deserialize_engine`. Aligned with the live Protocol shape (AZ-297). + Flag: spec wording should be refreshed to match the c7 contract. +- **Architecture registration moved from `c2_vpr.net_vlad.create()` to + `runtime_root/vpr_factory.py::_register_strategy_architecture`**: the + spec implies the strategy's `create(...)` registers the architecture + with C7. That violates AZ-507 (c2_vpr cannot import c7_inference).
+ Resolved by exposing `MODEL_NAME` + `architecture_factory(descriptor_dim)` + on the strategy module and having the composition root perform the + c7 binding before calling `create(...)`. The C7-side `register_architecture` + call lives at L4 (runtime_root), not L3. **This is a design + improvement over the spec; the spec should be updated.** +- **`NetVladStrategy.__init__` signature**: differs from the spec's + positional argument list (the spec lists `runtime, tile_store, + weights_path, preprocessor, normaliser, fdr_client, descriptor_dim`). + Implemented as keyword-only with `engine_handle` (returned from + `deserialize_engine`) replacing `weights_path` (the strategy holds + the resolved handle, not the source path — per the spec's own + "holds the engine ID, NOT the engine itself" constraint, more + consistent). The `tile_store` field also got renamed + `descriptor_index` to match `DescriptorIndexCut` (AZ-507 cut). + +Aligning the spec with the implementation is in the **Findings** below +(see F2). + +## Phase 3 — Code Quality + +- Every function ≤ ~50 LOC except `make_net_vlad_vgg16` (~75 LOC of + which 60 is inner `nn.Module` definitions — natural, indivisible). +- No bare `except`; every error chain uses `raise ... from exc`. +- No silently-swallowed errors; the strategy emits ERROR logs + an FDR + record for both `VprBackboneError` and `VprPreprocessError` paths. +- Constructor validation is consistent: `ValueError` for range/shape + violations, `TypeError` for type violations (matches the pattern of + the prior batch's `FaissBridge`). +- The `_iso_ts_from_clock` helper is duplicated yet again — sixth + module-local copy (see F1 below; carried-over from cumulative review + 43-45). +- Class names (`NetVladStrategy`, `NetVladBackbonePreprocessor`) match + the spec. +- No verbose default-on debug logging; logs are scoped to ERROR-on-error + + one INFO `c2.vpr.ready` at composition time. 
+- Ruff clean on every new file (UP037 auto-fixes applied; one RUF002 + ambiguous-glyph in `_net_vlad_architecture.py` docstring fixed in + Phase F). + +## Phase 4 — Security Quick-Scan + +- No SQL injection / command injection / eval / exec. +- No hardcoded secrets. +- FDR error-message payload is bounded to `str(error)[:512]` — prevents + unbounded sensitive-data exfiltration via long exception messages. +- No PII; `vpr.embed_query` payload is `(frame_id, backbone_label, + descriptor_dim, latency_us)` — all operational metadata. +- The `intra_cluster_normalise` helper rejects float64 input — denies + upcasts that would silently break the FAISS metric. +- The `c7_inference.register_architecture` call lives in the + composition root which runs at startup; not reachable from + user-controlled input. + +## Phase 5 — Performance Scan + +- `embed_query` p95 ≤ 80ms NFR — not verified by microbench in this + batch (deferred to C2-IT-01 / FT-P-19, Step 9). Justification: + microbench requires real PyTorch CUDA + real NetVLAD weights; the + current Tier-0 dev host has neither. +- `retrieve_topk` p95 ≤ 4ms — the `FaissBridge` (AZ-341) already + carries the p95 ≤ 500µs microbench; this strategy is a single-line + delegation, no added overhead. +- The architecture's NetVLAD pooling layer uses `torch.bmm` for the + K-cluster reduction instead of a Python loop — single optimised + CUDA kernel call. The published reference impl from Pittsburgh + has a Python `for k in range(K)` loop; this batched form is + asymptotically equivalent (K ~ 64) and dramatically faster on GPU. +- The dual-stage normalisation is two FP32-on-FP16-input operations, + ~ 4096-element working set — sub-µs on any host. + +## Phase 6 — Cross-Task Consistency + +NetVLAD is the first concrete VprStrategy implementation.
Cross-task +consistency therefore concerns the patterns it establishes for +AZ-337 (UltraVPR), AZ-339 (MegaLoc/MixVPR), AZ-340 (SelaVPR/EigenPlaces/SALAD): + +- **AZ-507 cut pattern**: `InferenceRuntimeCut` joins + `DescriptorIndexCut` (AZ-341), `TileUploaderCut` (AZ-329), + `TileDownloaderCut` (AZ-328). Five Protocol cuts now exist + cross-component; all named `*Cut`; all `runtime_checkable=True`; all + one Protocol per file; all consumed via the consumer-side cut + module path. Pattern is stable. +- **Architecture-registration split**: the strategy module exposes + `MODEL_NAME` + `architecture_factory(descriptor_dim)`; the + composition root performs the c7 registration. Future C2 strategies + using the PyTorch runtime (AZ-339 MegaLoc/MixVPR with VGG/ResNet + backbones; AZ-340 SelaVPR/EigenPlaces/SALAD with various backbones) + follow the same shape; the composition-root helper + `_register_strategy_architecture` already has the dispatch slot for + per-strategy `descriptor_dim` lookup. +- **Dual-stage normalisation**: NetVLAD's `intra_cluster_normalise` + + `l2_normalise` chain is unique to NetVLAD (UltraVPR uses + single-stage `l2_normalise` per the AZ-337 spec). The helper + addition to `DescriptorNormaliser` is therefore NetVLAD-specific by + invocation but architectural-pattern-neutral by API; future + VLAD-aggregating strategies (SALAD has VLAD-like aggregation) can + reuse the same helper. +- **FDR record kinds**: `vpr.embed_query` / `vpr.backbone_error` / + `vpr.preprocess_error` are strategy-generic; every concrete C2 + strategy emits the same three plus the AZ-341 `vpr.retrieve_topk` + from the bridge. + +## Phase 7 — Architecture Compliance + +1. **Layer direction (rule 1)**: no upward imports. The strategy module + imports `_types`, `clock`, `config`, `fdr_client`, `helpers`, + `logging`, and its sibling c2_vpr modules — all at or below L3. +2. 
**Public API respect / AZ-507 (rule 2)**: verified by the + `test_ac6_only_compose_root_imports_concrete_strategies` lint: + PASS. `c2_vpr/net_vlad.py` consumes `InferenceRuntimeCut` (defined + in c2_vpr) instead of importing `c7_inference.InferenceRuntime`. +3. **No new cyclic dependencies (rule 3)**: no new cycles. +4. **Duplicate symbols (rule 4)**: `_iso_ts_from_clock` now in 6 + modules (carry-over F1, AZ-508 covers consolidation). No new + duplications introduced. +5. **Cross-cutting concerns not locally re-implemented (rule 5)**: the + composition root owns the c7 architecture registration; the + c2_vpr factory does not. + +## Findings + +| # | Severity | Category | Files | Title | +|---|----------|----------|-------|-------| +| F1 | Low | Maintainability | `c2_vpr/net_vlad.py` | `_iso_ts_from_clock` duplicated (6th module-local copy) | +| F2 | Low | Spec-Hygiene | AZ-338 task spec | Spec § Outcome lists outdated C7 API names (`runtime.forward` vs `infer`; `runtime.load_engine` vs `compile_engine + deserialize_engine`) + architecture-registration location | +| F3 | Low | Test-Coverage | `tests/unit/c2_vpr/test_net_vlad.py` | NFR-perf microbench (p95 ≤ 80ms) deferred (no Tier-1 PyTorch CUDA host); flagged in Phase 5 | +| F4 | Low | Architecture | `_net_vlad_architecture.py` | NetVLAD's PCA-projection layer parameters are part of the loaded `.pth` state dict; weights validation that the PCA centroids match the recorded sidecar is deferred to AZ-280 (engine sidecar) integration | + +### Finding Details + +**F1: `_iso_ts_from_clock` duplicated (6th copy)** (Low / Maintainability) + +- Location: `src/gps_denied_onboard/components/c2_vpr/net_vlad.py` + module-level function. 
+- Description: same 6-line helper as `c2_vpr/_faiss_bridge.py`, + `c12_operator_orchestrator/operator_reloc_service.py`, + `c11_tile_manager/idempotent_retry.py`, + `c11_tile_manager/signing_key.py`, + `c6_tile_cache/postgres_filesystem_store.py`, + `c6_tile_cache/freshness_gate.py` — six modules now. +- Suggestion: AZ-508 (hygiene PBI for ISO-timestamp consolidation) is + already in `todo/` and scoped to absorb all six call-sites. + +**F2: AZ-338 spec uses outdated C7 API names + architecture-registration location** +(Low / Spec-Hygiene) + +- Locations: + - Spec § Outcome: + `intermediate = self._runtime.forward(self._engine_id, {"input": tensor})` + → live API is `self._runtime.infer(self._engine_handle, {"input": tensor})`. + - Spec § Outcome: + `inference_runtime.load_engine(weights_path)` → live API is + `compile_engine(model_path, build_config) -> entry; deserialize_engine(entry) -> handle`. + - Spec § Outcome implies `create(...)` performs the C7 + architecture registration; AZ-507 forbids this. Resolved by + moving the registration to `runtime_root/vpr_factory.py::_register_strategy_architecture`. +- Description: the spec was written against an earlier C7 Protocol + draft; the C7 Protocol stabilised at v1.0.0 in AZ-297. The + implementation aligns with the v1.0.0 Protocol; the spec is now + stale on this detail. +- Suggestion: surface to user as a small spec-hygiene follow-up. + Same class of finding as cumulative review F3 (AZ-341 spec + listed an unused `normaliser` parameter). Recommend a single + hygiene PBI scoped to "refresh AZ-337..AZ-340 specs against the + stabilised C7 v1.0.0 + AZ-507 patterns". + +**F3: NFR-perf microbench deferred (no Tier-1 PyTorch CUDA host)** +(Low / Test-Coverage) + +- Location: tests/unit/c2_vpr/test_net_vlad.py (no microbench + test class for AZ-338 NFR-perf). +- Description: the AZ-338 spec NFRs cite p95 ≤ 80ms for `embed_query` + on Tier-1 Jetson Orin. 
Microbench requires real PyTorch CUDA + real + NetVLAD weights; not runnable on this Tier-0 dev host (macOS, no + CUDA). The fake `InferenceRuntime` returns a synthetic output and + therefore cannot probe real-runtime latency. +- Suggestion: schedule under FT-P-19 / C2-IT-01 (Step 9 / E-BBT) on + Tier-1 hardware. No action this batch. + +**F4: PCA-projection sidecar verification deferred** (Low / Architecture) + +- Location: `src/gps_denied_onboard/components/c2_vpr/_net_vlad_architecture.py` + PCA `nn.Linear(K*D, descriptor_dim)`. +- Description: the architecture loads its PCA-projection layer's + weights from the same `.pth` state dict as the rest of the model + via `torch.load + load_state_dict(strict=True)`. There is no + separate check that the PCA centroids + whitening matrix match + the sha256 sidecar (AZ-280). For now the deserialize-time + strict-mode check is the only safeguard. +- Suggestion: schedule under a future "C2 PCA-whitening sidecar + validation" PBI if FT-P-19 / C2-IT-01 reveals real-world drift. + No action this batch. + +## Verdict + +**PASS_WITH_WARNINGS** — 4 Low-severity findings, all hygiene / +deferred-validation. No Critical, no High, no Medium. AC coverage is +complete; full unit suite is green (1608 passed / 80 env-skipped, +43 +tests over batch 45). 
diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 38213f4..9768260 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -8,7 +8,7 @@ status: in_progress sub_step: phase: 7 name: batch-loop - detail: "cumulative review batches 43-45 complete; selecting batch 46" + detail: "batch 46 — AZ-338 (C2 NetVLAD mandatory simple-baseline)" retry_count: 0 cycle: 1 tracker: jira diff --git a/src/gps_denied_onboard/components/c2_vpr/__init__.py b/src/gps_denied_onboard/components/c2_vpr/__init__.py index 5b957f3..3b72214 100644 --- a/src/gps_denied_onboard/components/c2_vpr/__init__.py +++ b/src/gps_denied_onboard/components/c2_vpr/__init__.py @@ -37,6 +37,9 @@ from gps_denied_onboard.components.c2_vpr.errors import ( VprError, VprPreprocessError, ) +from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import ( + InferenceRuntimeCut, +) from gps_denied_onboard.components.c2_vpr.interface import VprStrategy from gps_denied_onboard.config.schema import register_component_block @@ -46,6 +49,7 @@ __all__ = [ "C2VprConfig", "DescriptorIndexCut", "IndexUnavailableError", + "InferenceRuntimeCut", "TileIdTuple", "VprBackboneError", "VprCandidate", diff --git a/src/gps_denied_onboard/components/c2_vpr/_net_vlad_architecture.py b/src/gps_denied_onboard/components/c2_vpr/_net_vlad_architecture.py new file mode 100644 index 0000000..275798f --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/_net_vlad_architecture.py @@ -0,0 +1,144 @@ +"""NetVLAD VGG16 architecture (AZ-338). + +Reference: Arandjelović, R., Gronat, P., Torii, A., Pajdla, T., & Sivic, J. +"NetVLAD: CNN architecture for weakly supervised place recognition", CVPR +2016. The architecture is the canonical VPR baseline. + +The architecture has three parts: + +1. **VGG16 trunk** — ``torchvision.models.vgg16`` feature extractor up to + the ``conv5_3`` layer (last conv before the classifier). Output is a + ``(B, encoder_dim=512, H', W')`` feature map. +2. 
**NetVLAD pooling layer** — implements the differentiable VLAD + aggregation: soft cluster assignment (1x1 conv + softmax over K) + times residuals against K learned cluster centres, summed per cluster + to produce a ``(B, K, D)`` aggregated descriptor, then flattened to + ``(B, K*D,)``. +3. **PCA projection (optional)** — a learned ``nn.Linear(K*D, descriptor_dim)`` + that whitens / reduces the raw VLAD descriptor to the deployment-pinned + output dim. Per the published Pittsburgh NetVLAD code drop, the default + pinned dim is ``4096`` (whitened from ``K*D = 64*512 = 32768``). When + ``descriptor_dim == K*D`` the PCA layer is omitted and the raw VLAD is + returned unchanged. + +The module is registered into the C7 ``architecture_registry`` (AZ-300) +under the ``"net_vlad"`` key by the composition root +(``runtime_root/vpr_factory.py``), not by :mod:`net_vlad` (AZ-507). At +composition time the root binds a closure carrying the config-driven +``descriptor_dim``. The C7 registry stays torch-free; torch / +torchvision are imported lazily inside the factory. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +if TYPE_CHECKING: + from torch import nn + +__all__ = [ + "DEFAULT_DESCRIPTOR_DIM", + "DEFAULT_ENCODER_DIM", + "DEFAULT_NUM_CLUSTERS", + "make_net_vlad_vgg16", +] + +DEFAULT_ENCODER_DIM: Final[int] = 512 +DEFAULT_NUM_CLUSTERS: Final[int] = 64 +DEFAULT_DESCRIPTOR_DIM: Final[int] = 4096 + + +def make_net_vlad_vgg16( + *, + num_clusters: int = DEFAULT_NUM_CLUSTERS, + encoder_dim: int = DEFAULT_ENCODER_DIM, + descriptor_dim: int = DEFAULT_DESCRIPTOR_DIM, +) -> nn.Module: + """Construct a fresh, randomly-initialised NetVLAD-VGG16 module. + + ``descriptor_dim == num_clusters * encoder_dim`` skips the PCA + projection; any other value adds an ``nn.Linear(K*D, descriptor_dim)`` + final layer (the published NetVLAD reference's "WPCA + L2" tail).
+ + Torch / torchvision are imported here, not at module load — keeping + the c2_vpr package free of torch on Tier-0 builds. Callers seeking + deterministic weights MUST seed ``torch.manual_seed`` before + invocation. + """ + if num_clusters < 1 or encoder_dim < 1 or descriptor_dim < 1: + raise ValueError( + f"make_net_vlad_vgg16: dimensions must be positive; " + f"got num_clusters={num_clusters}, encoder_dim={encoder_dim}, " + f"descriptor_dim={descriptor_dim}" + ) + + import torch + import torch.nn as nn + import torch.nn.functional as F + import torchvision + + class _NetVladLayer(nn.Module): + """Differentiable VLAD aggregation (Arandjelović et al. 2016). + + Soft assignment: ``soft_assign = softmax(conv1x1(x))`` over K + clusters. Residuals: ``r_ijk = x_ij - c_k``. Output: + ``v_k = sum_ij(soft_assign[k,i,j] * r_ijk)``. Flattened to a + single 1-D vector per batch. + """ + + def __init__(self, num_clusters: int, encoder_dim: int) -> None: + super().__init__() + self.num_clusters = num_clusters + self.encoder_dim = encoder_dim + self.conv = nn.Conv2d( + encoder_dim, num_clusters, kernel_size=(1, 1), bias=True + ) + self.centroids = nn.Parameter( + torch.randn(num_clusters, encoder_dim) * 0.01 + ) + + def forward(self, features: torch.Tensor) -> torch.Tensor: + n_batch, n_channels = features.shape[0], features.shape[1] + soft_assign = self.conv(features).view(n_batch, self.num_clusters, -1) + soft_assign = F.softmax(soft_assign, dim=1) + flat = features.view(n_batch, n_channels, -1) + soft_assign_t = soft_assign.transpose(1, 2) + assigned = torch.bmm(flat, soft_assign_t) + cluster_sum = soft_assign.sum(dim=2) + scaled_centroids = self.centroids.unsqueeze(0) * cluster_sum.unsqueeze(2) + vlad = assigned.transpose(1, 2) - scaled_centroids + return vlad.reshape(n_batch, -1) + + class _NetVladVgg16(nn.Module): + def __init__( + self, + num_clusters: int, + encoder_dim: int, + descriptor_dim: int, + ) -> None: + super().__init__() + self._raw_dim = num_clusters * 
encoder_dim + vgg = torchvision.models.vgg16(weights=None) + self.encoder = nn.Sequential(*list(vgg.features.children())[:-2]) + self.pool = _NetVladLayer(num_clusters, encoder_dim) + if descriptor_dim == self._raw_dim: + self.pca: nn.Module | None = None + else: + self.pca = nn.Linear(self._raw_dim, descriptor_dim, bias=True) + + def forward( + self, input: torch.Tensor + ) -> dict[str, torch.Tensor]: + features = self.encoder(input) + vlad_raw = self.pool(features) + if self.pca is not None: + vlad_descriptor = self.pca(vlad_raw) + else: + vlad_descriptor = vlad_raw + return {"vlad_descriptor": vlad_descriptor} + + return _NetVladVgg16( + num_clusters=num_clusters, + encoder_dim=encoder_dim, + descriptor_dim=descriptor_dim, + ) diff --git a/src/gps_denied_onboard/components/c2_vpr/_preprocessor_net_vlad.py b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_net_vlad.py new file mode 100644 index 0000000..c9cf317 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/_preprocessor_net_vlad.py @@ -0,0 +1,137 @@ +"""NetVLAD-VGG16 backbone preprocessor (AZ-338). + +Per AZ-338 § Outcome: NetVLAD's published preprocessing chain decodes the +nav-camera frame's image to RGB uint8, centre-crops to a square region +respecting the camera calibration, resizes to ``(480, 480)``, applies +ImageNet mean/std normalisation, casts to FP16, and reshapes to NCHW. + +This preprocessor is C2-internal and owned exclusively by +:class:`NetVladStrategy` — UltraVPR and the other backbones each ship +their own concrete preprocessor (description.md § 6 forbids sharing). + +The :class:`BackbonePreprocessor` Protocol is mirrored here (the +strategy module imports the concrete preprocessor and constructs it in +the ``create(...)`` factory; the Protocol lives in +:mod:`c2_vpr._preprocessor`). 
+""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +import cv2 +import numpy as np + +from gps_denied_onboard.components.c2_vpr.errors import VprPreprocessError + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + +__all__ = [ + "IMAGENET_MEAN", + "IMAGENET_STD", + "NETVLAD_INPUT_HW", + "NetVladBackbonePreprocessor", +] + +NETVLAD_INPUT_HW: Final[tuple[int, int]] = (480, 480) +IMAGENET_MEAN: Final[tuple[float, float, float]] = (0.485, 0.456, 0.406) +IMAGENET_STD: Final[tuple[float, float, float]] = (0.229, 0.224, 0.225) + + +class NetVladBackbonePreprocessor: + """Resize + ImageNet-normalise + FP16-NCHW for NetVLAD-VGG16.""" + + def __init__( + self, + *, + input_shape: tuple[int, int] = NETVLAD_INPUT_HW, + mean: tuple[float, float, float] = IMAGENET_MEAN, + std: tuple[float, float, float] = IMAGENET_STD, + ) -> None: + if ( + not isinstance(input_shape, tuple) + or len(input_shape) != 2 + or any(not isinstance(v, int) or v <= 0 for v in input_shape) + ): + raise ValueError( + f"NetVladBackbonePreprocessor.input_shape must be a (H, W) " + f"tuple of positive ints; got {input_shape!r}" + ) + if len(mean) != 3 or len(std) != 3: + raise ValueError( + "NetVladBackbonePreprocessor.mean and std must each be " + "3-tuples (one per channel)" + ) + if any(v <= 0 for v in std): + raise ValueError( + "NetVladBackbonePreprocessor.std components must be > 0" + ) + self._input_shape: tuple[int, int] = input_shape + self._mean: np.ndarray = np.array(mean, dtype=np.float32).reshape(1, 1, 3) + self._std: np.ndarray = np.array(std, dtype=np.float32).reshape(1, 1, 3) + + def preprocess( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> np.ndarray: + """Decode → centre-crop → resize → normalise → FP16 NCHW. 
+ + ``calibration`` is accepted for Protocol conformance but is not + consumed here — NetVLAD's published preprocessing chain does not + use principal-point or distortion correction (the backbone is + trained on ImageNet-style centre-cropped frames; calibration + differences are absorbed into the learned VLAD residuals). + UltraVPR's preprocessor uses calibration; this one does not. + + Raises: + :class:`VprPreprocessError` on shape / dtype / decode + violations. + """ + del calibration + image = self._coerce_to_rgb_uint8(frame.image) + cropped = self._centre_crop_square(image) + try: + resized = cv2.resize( + cropped, self._input_shape[::-1], interpolation=cv2.INTER_AREA + ) + except cv2.error as exc: + raise VprPreprocessError( + f"cv2.resize failed: {type(exc).__name__}: {exc}" + ) from exc + as_f32 = resized.astype(np.float32) / 255.0 + normalised = (as_f32 - self._mean) / self._std + chw = normalised.transpose(2, 0, 1) + return np.ascontiguousarray(chw[None, :, :, :], dtype=np.float16) + + def input_shape(self) -> tuple[int, int]: + return self._input_shape + + @staticmethod + def _coerce_to_rgb_uint8(image: object) -> np.ndarray: + if not isinstance(image, np.ndarray): + raise VprPreprocessError( + f"frame.image must be a numpy array; got {type(image).__name__}" + ) + if image.dtype != np.uint8: + raise VprPreprocessError( + f"frame.image must be uint8 RGB; got dtype {image.dtype}" + ) + if image.ndim == 2: + # Grayscale → 3-channel by repeating + return np.stack([image, image, image], axis=-1) + if image.ndim == 3 and image.shape[2] == 3: + return image + raise VprPreprocessError( + f"frame.image must be (H,W) or (H,W,3); got shape {image.shape}" + ) + + @staticmethod + def _centre_crop_square(image: np.ndarray) -> np.ndarray: + h, w = image.shape[:2] + side = min(h, w) + top = (h - side) // 2 + left = (w - side) // 2 + return image[top : top + side, left : left + side, :] diff --git a/src/gps_denied_onboard/components/c2_vpr/config.py 
b/src/gps_denied_onboard/components/c2_vpr/config.py index 7ccf8ac..b77601f 100644 --- a/src/gps_denied_onboard/components/c2_vpr/config.py +++ b/src/gps_denied_onboard/components/c2_vpr/config.py @@ -21,8 +21,8 @@ from typing import Final from gps_denied_onboard.config.schema import ConfigError __all__ = [ - "C2VprConfig", "KNOWN_STRATEGIES", + "C2VprConfig", ] KNOWN_STRATEGIES: Final[frozenset[str]] = frozenset( @@ -69,6 +69,7 @@ class C2VprConfig: faiss_index_path: Path = field(default_factory=lambda: Path("/cache/vpr/index.faiss")) warn_top1_threshold: float = 0.30 debug_per_frame_distances: bool = False + netvlad_descriptor_dim: int = 4096 def __post_init__(self) -> None: if self.strategy not in KNOWN_STRATEGIES: @@ -109,3 +110,15 @@ class C2VprConfig: f"C2VprConfig.debug_per_frame_distances must be a bool; " f"got {self.debug_per_frame_distances!r}" ) + if not isinstance(self.netvlad_descriptor_dim, int) or isinstance( + self.netvlad_descriptor_dim, bool + ): + raise ConfigError( + f"C2VprConfig.netvlad_descriptor_dim must be a non-bool " + f"int; got {self.netvlad_descriptor_dim!r}" + ) + if self.netvlad_descriptor_dim < 1: + raise ConfigError( + f"C2VprConfig.netvlad_descriptor_dim must be >= 1; " + f"got {self.netvlad_descriptor_dim}" + ) diff --git a/src/gps_denied_onboard/components/c2_vpr/inference_runtime_cut.py b/src/gps_denied_onboard/components/c2_vpr/inference_runtime_cut.py new file mode 100644 index 0000000..f68e201 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/inference_runtime_cut.py @@ -0,0 +1,60 @@ +"""C2's structural cut of C7 ``InferenceRuntime`` (AZ-507). + +Concrete C2 ``VprStrategy`` impls call into C7's inference runtime to +load engine handles and run forward passes. Per AZ-507, ``c2_vpr`` MUST +NOT import ``components.c7_inference`` directly; the consumer-side cut +declares the structural Protocol surface that c2 actually uses, and the +composition root binds the c7 runtime as the concrete implementation. 
+ +This Protocol mirrors the subset of +:class:`gps_denied_onboard.components.c7_inference.InferenceRuntime` +that the C2 strategies consume — ``compile_engine``, +``deserialize_engine``, ``infer``, ``release_engine``, and +``current_runtime_label``. The full Protocol (which adds +``thermal_state``) is wider; the cut narrows to what C2 needs so +``isinstance(runtime, InferenceRuntimeCut)`` can be enforced without +demanding the wider surface. + +DTOs (``BuildConfig``, ``EngineHandle``, ``EngineCacheEntry``) live in +:mod:`gps_denied_onboard._types.inference` (L1) and are imported here +directly — they are L1 shared types, not cross-component imports. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Literal, Protocol, runtime_checkable + +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineCacheEntry, + EngineHandle, +) + +if TYPE_CHECKING: + import numpy as np + +__all__ = ["InferenceRuntimeCut"] + + +@runtime_checkable +class InferenceRuntimeCut(Protocol): + """Subset of C7 ``InferenceRuntime`` consumed by C2 strategies.""" + + def compile_engine( + self, model_path: Path, build_config: BuildConfig + ) -> EngineCacheEntry: ... + + def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle: ... + + def infer( + self, + handle: EngineHandle, + inputs: dict[str, np.ndarray], + ) -> dict[str, np.ndarray]: ... + + def release_engine(self, handle: EngineHandle) -> None: ... + + def current_runtime_label( + self, + ) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]: ... diff --git a/src/gps_denied_onboard/components/c2_vpr/net_vlad.py b/src/gps_denied_onboard/components/c2_vpr/net_vlad.py new file mode 100644 index 0000000..ca71ba1 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/net_vlad.py @@ -0,0 +1,521 @@ +"""``NetVladStrategy`` — C2 mandatory simple-baseline VprStrategy (AZ-338). 
+ +NetVLAD is the C2 comparative baseline mandated by the engine rule (every +production-default backbone ships with a simpler baseline alongside, so +a code-drop / weights / engine compile bug in the primary has a +fallback at the strategy layer). Per ``components/02_c2_vpr/description.md`` +§ 1, NetVLAD is paired with UltraVPR's primary path; per § 5, NetVLAD +runs on the C7 PyTorch FP16 runtime (NOT TensorRT) so a TRT engine +issue cannot simultaneously break both. + +The strategy delegates retrieval to :class:`FaissBridge` (AZ-341) and +the c6 ``DescriptorIndex`` cut (AZ-507) — see +:mod:`gps_denied_onboard.components.c2_vpr._faiss_bridge`. Embedding +goes through the c7 :class:`InferenceRuntime` Protocol; the architecture +is registered into c7's architecture registry by the composition root +before it calls this module's ``create(...)`` factory. + +Architecture loading flow: + +1. ``create(config, descriptor_index, inference_runtime)`` is called by + the composition-root :func:`build_vpr_strategy`. +2. Build-flag guard: confirms ``inference_runtime.current_runtime_label() + == "pytorch_fp16"`` — fails fast with :class:`ConfigError` otherwise + (AC-11: airborne binary has the PyTorch runtime excluded so this + surfaces at composition time, not at first frame). +3. Prerequisite, done by the composition root before step 1 (AZ-507 + forbids ``c2_vpr`` importing c7): :func:`architecture_factory` binds + ``netvlad_descriptor_dim`` into a closure registered under ``"net_vlad"``. + The closure is the zero-arg ``ArchitectureFactory`` callable shape + the registry expects. +4. ``inference_runtime.compile_engine(weights_path, build_config)`` is + called with ``BuildConfig(precision=FP16, ...)``; the PyTorch runtime + (AZ-300) returns an :class:`EngineCacheEntry` whose + ``extras["model_name"]`` is the checkpoint's file stem. The factory + forces ``model_name = "net_vlad"`` so the registered factory is + selected at :meth:`deserialize_engine` time. +5. ``inference_runtime.deserialize_engine(entry)`` returns an + :class:`EngineHandle`.
The factory then queries the architecture's + output shape via a single dry-run inference on a zero-init input, + compares against the configured ``descriptor_dim``, and raises + :class:`ConfigError` on mismatch (AC-7) BEFORE the strategy is + bound. +6. ``NetVladStrategy`` is constructed with the resolved handle + the + :class:`FaissBridge` + :class:`NetVladBackbonePreprocessor` + + :class:`DescriptorNormaliser`. + +Per-frame :meth:`embed_query` pipeline: + +1. ``preprocessor.preprocess(frame, calibration)`` → ``(1, 3, 480, 480)`` + FP16 NCHW ndarray. +2. ``inference_runtime.infer(handle, {"input": tensor})`` → + ``{"vlad_descriptor": (1, descriptor_dim) FP16 ndarray}``. +3. ``normaliser.intra_cluster_normalise(intermediate, num_clusters=64)`` + → per-cluster L2 (the NetVLAD-canonical first stage). +4. ``normaliser.l2_normalise(intra)`` → global L2 (the second stage). +5. Return :class:`VprQuery` with ``frame_id``, normalised embedding, + produced_at monotonic ns. + +Error envelope: every method raises only members of :class:`VprError`. +Any ``Exception`` from the backbone forward → rewrapped to +:class:`VprBackboneError`; :class:`VprPreprocessError` from the +preprocessor propagates unchanged. + +Retrieval is a single-line delegation to :class:`FaissBridge.retrieve`; +see AZ-341 AC-10.
+""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING, Final, Literal + +import numpy as np + +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineHandle, + PrecisionMode, +) +from gps_denied_onboard._types.vpr import VprQuery, VprResult +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr._net_vlad_architecture import ( + DEFAULT_NUM_CLUSTERS, + make_net_vlad_vgg16, +) +from gps_denied_onboard.components.c2_vpr._preprocessor_net_vlad import ( + NetVladBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import ( + DescriptorIndexCut, +) +from gps_denied_onboard.components.c2_vpr.errors import ( + VprBackboneError, + VprPreprocessError, +) +from gps_denied_onboard.components.c2_vpr.inference_runtime_cut import ( + InferenceRuntimeCut, +) +from gps_denied_onboard.config.schema import ConfigError +from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) +from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard.config.schema import Config + +__all__ = ["MODEL_NAME", "NetVladStrategy", "architecture_factory", "create"] + + +MODEL_NAME: Final[str] = "net_vlad" + + +def architecture_factory( + descriptor_dim: int, + *, + num_clusters: int = DEFAULT_NUM_CLUSTERS, +): + """Zero-arg architecture factory closure for C7 registry binding. + + The composition root calls this with the configured ``descriptor_dim`` + and registers the returned closure under :data:`MODEL_NAME` in C7's + architecture registry. 
Keeping the registration step in the + composition root preserves the AZ-507 layering: ``c2_vpr`` MUST NOT + import ``c7_inference``. + """ + if descriptor_dim < 1: + raise ValueError( + f"architecture_factory: descriptor_dim must be >= 1; " + f"got {descriptor_dim}" + ) + + def _factory(): + return make_net_vlad_vgg16( + num_clusters=num_clusters, descriptor_dim=descriptor_dim + ) + + return _factory + + +_BACKBONE_LABEL: Final[Literal["net_vlad"]] = "net_vlad" +_COMPONENT: Final[str] = "c2_vpr" + +_LOG_KIND_READY: Final[str] = "c2.vpr.ready" +_LOG_KIND_BACKBONE_ERROR: Final[str] = "c2.vpr.backbone_error" +_LOG_KIND_PREPROCESS_ERROR: Final[str] = "c2.vpr.preprocess_error" +_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun" + +_FDR_KIND_EMBED: Final[str] = "vpr.embed_query" +_FDR_KIND_BACKBONE_ERROR: Final[str] = "vpr.backbone_error" +_FDR_KIND_PREPROCESS_ERROR: Final[str] = "vpr.preprocess_error" + + +class NetVladStrategy: + """C2 mandatory simple-baseline VprStrategy. + + See module docstring for the architecture-loading + per-frame + pipeline. Stateless across frames (INV-2); single-threaded per + instance (INV-1). 
+ """ + + def __init__( + self, + *, + inference_runtime: InferenceRuntimeCut, + engine_handle: EngineHandle, + descriptor_index: DescriptorIndexCut, + preprocessor: NetVladBackbonePreprocessor, + normaliser: DescriptorNormaliser, + faiss_bridge: FaissBridge, + fdr_client: FdrClient, + clock: Clock, + logger: logging.Logger, + descriptor_dim: int, + num_clusters: int = DEFAULT_NUM_CLUSTERS, + ) -> None: + if descriptor_dim < 1: + raise ValueError( + f"NetVladStrategy.descriptor_dim must be >= 1; " + f"got {descriptor_dim}" + ) + if num_clusters < 1: + raise ValueError( + f"NetVladStrategy.num_clusters must be >= 1; " + f"got {num_clusters}" + ) + if descriptor_dim % num_clusters != 0: + raise ValueError( + f"NetVladStrategy: descriptor_dim={descriptor_dim} must be " + f"divisible by num_clusters={num_clusters} for intra-cluster " + f"normalisation" + ) + self._inference_runtime = inference_runtime + self._engine_handle = engine_handle + self._descriptor_index = descriptor_index + self._preprocessor = preprocessor + self._normaliser = normaliser + self._faiss_bridge = faiss_bridge + self._fdr_client = fdr_client + self._clock = clock + self._logger = logger + self._descriptor_dim = descriptor_dim + self._num_clusters = num_clusters + + def embed_query( + self, + frame: NavCameraFrame, + calibration: CameraCalibration, + ) -> VprQuery: + try: + tensor = self._preprocessor.preprocess(frame, calibration) + except VprPreprocessError as exc: + self._emit_preprocess_error(frame, exc) + raise + + ns_start = self._clock.monotonic_ns() + try: + outputs = self._inference_runtime.infer( + self._engine_handle, {"input": tensor} + ) + except Exception as exc: + wrapped = self._wrap_backbone_error(frame, exc) + raise wrapped from exc + ns_end = self._clock.monotonic_ns() + latency_us = max(1, (ns_end - ns_start) // 1_000) + + if "vlad_descriptor" not in outputs: + err = VprBackboneError( + f"NetVLAD forward returned no 'vlad_descriptor' key; " + f"got {sorted(outputs.keys())!r}" 
+ ) + self._emit_backbone_error(frame, err) + raise err + + raw = np.asarray(outputs["vlad_descriptor"]) + if raw.ndim != 2 or raw.shape[0] != 1 or raw.shape[1] != self._descriptor_dim: + err = VprBackboneError( + f"NetVLAD forward returned shape {raw.shape}; " + f"expected (1, {self._descriptor_dim})" + ) + self._emit_backbone_error(frame, err) + raise err + + flat = np.ascontiguousarray(raw[0], dtype=np.float16) + intra = self._normaliser.intra_cluster_normalise( + flat, num_clusters=self._num_clusters + ) + normalised = self._normaliser.l2_normalise(intra) + + self._emit_embed_record( + frame_id=int(frame.frame_id), latency_us=int(latency_us) + ) + + return VprQuery( + frame_id=int(frame.frame_id), + embedding=normalised, + produced_at=ns_end, + ) + + def retrieve_topk(self, query: VprQuery, k: int) -> VprResult: + return self._faiss_bridge.retrieve( + query, k, backbone_label=_BACKBONE_LABEL + ) + + def descriptor_dim(self) -> int: + return self._descriptor_dim + + def _wrap_backbone_error( + self, frame: NavCameraFrame, exc: BaseException + ) -> VprBackboneError: + wrapped = VprBackboneError( + f"NetVLAD forward raised {type(exc).__name__}: {exc}" + ) + self._emit_backbone_error(frame, wrapped) + return wrapped + + def _emit_embed_record(self, *, frame_id: int, latency_us: int) -> None: + record = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_EMBED, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "descriptor_dim": self._descriptor_dim, + "latency_us": latency_us, + }, + ) + result = self._fdr_client.enqueue(record) + if result == EnqueueResult.OVERRUN: + self._logger.warning( + "FDR enqueue dropped vpr.embed_query record (buffer overrun)", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FDR_OVERRUN, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + }, + }, + ) + + def _emit_backbone_error( + self, 
frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"NetVLAD backbone error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_BACKBONE_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_BACKBONE_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + def _emit_preprocess_error( + self, frame: NavCameraFrame, error: BaseException + ) -> None: + frame_id = int(frame.frame_id) + msg = f"NetVLAD preprocess error: {error}" + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_PREPROCESS_ERROR, + "kv": { + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + }, + }, + ) + self._fdr_client.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_from_clock(self._clock), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_PREPROCESS_ERROR, + payload={ + "frame_id": frame_id, + "backbone_label": _BACKBONE_LABEL, + "error_type": type(error).__name__, + "error_message": str(error)[:512], + }, + ) + ) + + +def _iso_ts_from_clock(clock: Clock) -> str: + # Same shape every component uses for FDR timestamps; AZ-508 will + # consolidate the duplicate helpers across c2/c11/c12/c6. 
+ from datetime import datetime, timezone + + ns = int(clock.time_ns()) + seconds, fraction_ns = divmod(ns, 1_000_000_000) + dt = datetime.fromtimestamp(seconds, tz=timezone.utc) + return f"{dt.strftime('%Y-%m-%dT%H:%M:%S')}.{fraction_ns:09d}+00:00" + + +def _build_pytorch_build_config(weights_path: Path) -> BuildConfig: + del weights_path + return BuildConfig( + precision=PrecisionMode.FP16, + workspace_mb=0, + calibration_dataset=None, + optimization_profiles=(), + ) + + +def create( + config: Config, + *, + descriptor_index: DescriptorIndexCut, + inference_runtime: InferenceRuntimeCut, + fdr_client: FdrClient | None = None, + clock: Clock | None = None, + logger: logging.Logger | None = None, +) -> NetVladStrategy: + """Module-level factory consumed by :func:`build_vpr_strategy`. + + Prerequisite: the composition root MUST have already registered + :func:`architecture_factory` under :data:`MODEL_NAME` in C7's + architecture registry before calling this factory. The registration + step lives in the composition root to preserve AZ-507 — ``c2_vpr`` + does not import ``c7_inference``. + + Optional keyword-only injection points (``fdr_client`` / ``clock`` / + ``logger``) keep tests deterministic; production wiring fills them + from the composition root. + """ + runtime_label = inference_runtime.current_runtime_label() + if runtime_label != "pytorch_fp16": + raise ConfigError( + f"NetVLAD requires BUILD_PYTORCH_RUNTIME=ON; this binary " + f"has BUILD_PYTORCH_RUNTIME=OFF (current_runtime_label=" + f"{runtime_label!r}). Per AZ-338 AC-11, NetVLAD is " + f"unselectable when the C7 PyTorch FP16 runtime is " + f"excluded." + ) + + block = config.components["c2_vpr"] + descriptor_dim = block.netvlad_descriptor_dim + weights_path = block.backbone_weights_path + + if fdr_client is None: + raise ValueError( + "NetVladStrategy.create: fdr_client is required; the " + "composition root must inject the running FDR client." 
+ ) + if clock is None: + from gps_denied_onboard.clock.wall_clock import WallClock + + clock = WallClock() + if logger is None: + logger = logging.getLogger("gps_denied_onboard.c2_vpr.net_vlad") + + entry = inference_runtime.compile_engine( + weights_path, _build_pytorch_build_config(weights_path) + ) + # Force the registry lookup to "net_vlad" regardless of the on-disk + # filename stem; the registered factory holds the descriptor_dim + # closure. + entry_for_deserialize = type(entry)( + engine_path=entry.engine_path, + sha256_hex=entry.sha256_hex, + sm=entry.sm, + jp=entry.jp, + trt=entry.trt, + precision=entry.precision, + extras={**entry.extras, "model_name": MODEL_NAME}, + ) + handle = inference_runtime.deserialize_engine(entry_for_deserialize) + + preprocessor = NetVladBackbonePreprocessor() + normaliser = DescriptorNormaliser() + faiss_bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=descriptor_dim, + warn_top1_threshold=block.warn_top1_threshold, + debug_log_per_frame_distances=block.debug_per_frame_distances, + fdr_client=fdr_client, + logger=logger, + clock=clock, + ) + + _assert_engine_output_dim( + inference_runtime, handle, descriptor_dim, preprocessor + ) + + logger.info( + "C2 VPR strategy ready", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_READY, + "kv": { + "strategy": _BACKBONE_LABEL, + "descriptor_dim": descriptor_dim, + }, + }, + ) + + return NetVladStrategy( + inference_runtime=inference_runtime, + engine_handle=handle, + descriptor_index=descriptor_index, + preprocessor=preprocessor, + normaliser=normaliser, + faiss_bridge=faiss_bridge, + fdr_client=fdr_client, + clock=clock, + logger=logger, + descriptor_dim=descriptor_dim, + ) + + +def _assert_engine_output_dim( + inference_runtime: InferenceRuntimeCut, + handle: EngineHandle, + expected_dim: int, + preprocessor: NetVladBackbonePreprocessor, +) -> None: + h, w = preprocessor.input_shape() + probe = np.zeros((1, 3, h, w), dtype=np.float16) + outputs = 
inference_runtime.infer(handle, {"input": probe}) + if "vlad_descriptor" not in outputs: + raise ConfigError( + f"engine output shape mismatch: 'vlad_descriptor' key absent; " + f"got keys {sorted(outputs.keys())!r}" + ) + actual = np.asarray(outputs["vlad_descriptor"]) + if actual.ndim != 2 or actual.shape[0] != 1 or actual.shape[1] != expected_dim: + raise ConfigError( + f"engine output shape mismatch: expected (1, {expected_dim}), " + f"got {tuple(actual.shape)}" + ) diff --git a/src/gps_denied_onboard/fdr_client/records.py b/src/gps_denied_onboard/fdr_client/records.py index 79f313d..2b30651 100644 --- a/src/gps_denied_onboard/fdr_client/records.py +++ b/src/gps_denied_onboard/fdr_client/records.py @@ -314,6 +314,46 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = { "latency_us", } ), + # AZ-338 / E-C2: emitted by each concrete C2 ``VprStrategy`` on every + # successful ``embed_query(...)`` call (post-flight forensic record + # of the embedding pipeline; complements ``vpr.retrieve_topk`` for the + # subsequent FAISS lookup). ``frame_id`` echoes + # ``NavCameraFrame.frame_id``; ``backbone_label`` is the strategy's + # lowercase ``BUILD_VPR_`` token (e.g. ``"net_vlad"``); + # ``descriptor_dim`` is the strategy's stable embedding dim; + # ``latency_us`` is the strategy-internal ``Clock.monotonic_ns`` + # delta around the backbone forward, in integer microseconds. + "vpr.embed_query": frozenset( + { + "frame_id", + "backbone_label", + "descriptor_dim", + "latency_us", + } + ), + # AZ-338 / E-C2: emitted when ``embed_query`` raises a + # :class:`VprBackboneError` (forward-pass failure: CUDA OOM, TRT + # engine deserialize mismatch, etc.). One record per occurrence; + # logged at ERROR alongside. 
+ "vpr.backbone_error": frozenset( + { + "frame_id", + "backbone_label", + "error_type", + "error_message", + } + ), + # AZ-338 / E-C2: emitted when ``embed_query`` raises a + # :class:`VprPreprocessError` (image decode / shape / dtype + # violation in the per-strategy preprocessor). + "vpr.preprocess_error": frozenset( + { + "frame_id", + "backbone_label", + "error_type", + "error_message", + } + ), } KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys()) diff --git a/src/gps_denied_onboard/helpers/descriptor_normaliser.py b/src/gps_denied_onboard/helpers/descriptor_normaliser.py index 7fb2c37..2e59436 100644 --- a/src/gps_denied_onboard/helpers/descriptor_normaliser.py +++ b/src/gps_denied_onboard/helpers/descriptor_normaliser.py @@ -92,6 +92,59 @@ class DescriptorNormaliser: normalised_f32 = np.where(norms == 0.0, 0.0, as_f32 / safe) return normalised_f32.astype(in_dtype, copy=False) + @staticmethod + def intra_cluster_normalise( + descriptor: np.ndarray, num_clusters: int + ) -> np.ndarray: + """Per-cluster L2 normalisation for VLAD-aggregated descriptors (AZ-338). + + NetVLAD's published preprocessing chain L2-normalises each + per-cluster sub-vector BEFORE the global L2 step. The input is + a flat 1-D VLAD descriptor of shape ``(num_clusters * cluster_dim,)`` + which is reshaped to ``(num_clusters, cluster_dim)``, normalised + row-wise, then flattened back. ``num_clusters`` must divide + ``descriptor.shape[0]``. + + Zero-norm sub-vectors are returned as zero (consistent with + :meth:`l2_normalise`). 
+ """ + if not isinstance(descriptor, np.ndarray): + raise DescriptorNormaliserError( + f"intra_cluster_normalise: expected np.ndarray; " + f"got {type(descriptor).__name__}" + ) + if descriptor.ndim != 1: + raise DescriptorNormaliserError( + f"intra_cluster_normalise: expected 1-D shape (K*D,); " + f"got shape {descriptor.shape}" + ) + if not isinstance(num_clusters, int) or isinstance(num_clusters, bool): + raise DescriptorNormaliserError( + f"intra_cluster_normalise: num_clusters must be a non-bool " + f"int; got {num_clusters!r}" + ) + if num_clusters < 1: + raise DescriptorNormaliserError( + f"intra_cluster_normalise: num_clusters must be >= 1; " + f"got {num_clusters}" + ) + total_dim = descriptor.shape[0] + if total_dim % num_clusters != 0: + raise DescriptorNormaliserError( + f"intra_cluster_normalise: descriptor length {total_dim} " + f"not divisible by num_clusters={num_clusters}" + ) + _validate_dtype(descriptor, "intra_cluster_normalise") + in_dtype = descriptor.dtype + cluster_dim = total_dim // num_clusters + reshaped = descriptor.reshape(num_clusters, cluster_dim).astype( + np.float32, copy=False + ) + norms = np.linalg.norm(reshaped, axis=1, keepdims=True) + safe = np.where(norms == 0.0, 1.0, norms) + normalised = np.where(norms == 0.0, 0.0, reshaped / safe) + return normalised.reshape(total_dim).astype(in_dtype, copy=False) + @staticmethod def descriptor_metric() -> str: return _METRIC_VALUE diff --git a/src/gps_denied_onboard/runtime_root/vpr_factory.py b/src/gps_denied_onboard/runtime_root/vpr_factory.py index f93c78b..543b795 100644 --- a/src/gps_denied_onboard/runtime_root/vpr_factory.py +++ b/src/gps_denied_onboard/runtime_root/vpr_factory.py @@ -103,7 +103,7 @@ def _is_build_flag_on(flag_name: str) -> bool: return raw.strip().lower() in {"on", "1", "true", "yes"} -def _c2_config(config: "Config") -> "C2VprConfig": +def _c2_config(config: Config) -> C2VprConfig: """Pull the registered C2 config block. 
``c2_vpr.__init__`` registers it on import; a missing @@ -113,34 +113,72 @@ def _c2_config(config: "Config") -> "C2VprConfig": return config.components["c2_vpr"] +def _register_strategy_architecture( + strategy: str, config: Config +) -> None: + """Register the strategy's C7 architecture factory (AZ-338 + future). + + Each concrete strategy module owns its NN architecture but cannot + import C7 directly (AZ-507). The strategy module exposes + ``MODEL_NAME`` + ``architecture_factory(descriptor_dim)`` and the + composition root performs the registration with c7. Strategies that + do not need a registered architecture (e.g. TRT-engine strategies + that bring their own ``.engine``) MAY omit these attributes; the + helper no-ops in that case. + """ + module_info = _STRATEGY_TO_MODULE.get(strategy) + if module_info is None: + return + module_name, _ = module_info + try: + module = __import__(module_name, fromlist=["architecture_factory"]) + except ModuleNotFoundError: + return + factory_fn = getattr(module, "architecture_factory", None) + model_name = getattr(module, "MODEL_NAME", None) + if factory_fn is None or model_name is None: + return + if strategy == "net_vlad": + descriptor_dim = config.components["c2_vpr"].netvlad_descriptor_dim + else: + # Future strategies: each may have its own descriptor_dim source; + # extend as they land. + return + from gps_denied_onboard.components.c7_inference import register_architecture + + register_architecture(model_name, factory_fn(descriptor_dim)) + + def build_vpr_strategy( - config: "Config", + config: Config, *, - descriptor_index: "DescriptorIndex", - inference_runtime: "InferenceRuntime", -) -> "VprStrategy": + descriptor_index: DescriptorIndex, + inference_runtime: InferenceRuntime, +) -> VprStrategy: """Construct the :class:`VprStrategy` impl selected by config. 1. Reads ``config.components['c2_vpr'].strategy``. 2. 
Checks the matching ``BUILD_VPR_`` flag — if OFF, raises :class:`StrategyNotAvailableError` BEFORE any import. 3. Lazily imports the concrete strategy module. - 4. Constructs the strategy via its module-level + 4. Registers the strategy's NN architecture with C7 (when the + strategy module exposes ``MODEL_NAME`` + ``architecture_factory``). + 5. Constructs the strategy via its module-level ``create(config, descriptor_index, inference_runtime)`` factory function (each concrete strategy module exports ``create`` as its public entry-point; concrete constructors stay private). - 5. Pre-flight ``descriptor_dim`` match: ``strategy.descriptor_dim()`` + 6. Pre-flight ``descriptor_dim`` match: ``strategy.descriptor_dim()`` vs ``descriptor_index.descriptor_dim()``. Mismatch raises :class:`ConfigError`; ONE ERROR log ``kind="c2.vpr.dim_mismatch"`` is emitted; the strategy is NOT bound. - 6. On success, ONE INFO log ``kind="c2.vpr.strategy_loaded"`` + 7. On success, ONE INFO log ``kind="c2.vpr.strategy_loaded"`` with ``strategy`` + ``descriptor_dim``. Raises: StrategyNotAvailableError: compile-time flag OFF or - concrete module not yet built (AZ-337..AZ-340 pending). + concrete module not yet built (AZ-337 / AZ-339 / AZ-340 pending). ConfigError: ``descriptor_dim`` mismatch between strategy and corpus index. """ @@ -176,8 +214,9 @@ def build_vpr_strategy( raise StrategyNotAvailableError( f"VprStrategy {strategy!r} is configured but its concrete impl " f"module {module_name!r} has not been built into this binary " - "yet (AZ-337 / AZ-338 / AZ-339 / AZ-340 pending)." + "yet (AZ-337 / AZ-339 / AZ-340 pending)." 
) from exc + _register_strategy_architecture(strategy, config) create_fn = getattr(module, "create", None) if create_fn is None: strategy_cls = getattr(module, class_name) diff --git a/tests/unit/c2_vpr/test_net_vlad.py b/tests/unit/c2_vpr/test_net_vlad.py new file mode 100644 index 0000000..122bcbd --- /dev/null +++ b/tests/unit/c2_vpr/test_net_vlad.py @@ -0,0 +1,805 @@ +"""AZ-338 — NetVLAD mandatory simple-baseline VprStrategy unit tests. + +Covers AC-1..AC-11 + preprocessor contract + intra-cluster normalisation +ordering. Uses fakes for :class:`InferenceRuntime`, :class:`DescriptorIndexCut`, +and :class:`FdrClient` so the suite stays AZ-507-clean and torch-free. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Literal +from unittest.mock import MagicMock + +import numpy as np +import pytest + +from gps_denied_onboard._types.calibration import CameraCalibration # noqa: E402 + + +@pytest.fixture(autouse=True) +def _reset_c7_architecture_registry() -> Any: + """Isolate the C7 architecture registry per test. + + ``register_architecture`` rejects re-registration with a *different* + factory under the same key; the NetVLAD ``create()`` factory builds + a fresh closure per call, so the global singleton accumulates across + tests and triggers a false-positive ValueError. Resetting the dict + around each test mirrors a fresh process boot. 
+ """ + from gps_denied_onboard.components.c7_inference.architecture_registry import ( + _DEFAULT_REGISTRY, + ) + + snapshot = dict(_DEFAULT_REGISTRY) + _DEFAULT_REGISTRY.clear() + yield + _DEFAULT_REGISTRY.clear() + _DEFAULT_REGISTRY.update(snapshot) +from gps_denied_onboard._types.inference import ( + BuildConfig, + EngineCacheEntry, + EngineHandle, + PrecisionMode, +) +from gps_denied_onboard._types.nav import NavCameraFrame +from gps_denied_onboard._types.vpr import VprQuery +from gps_denied_onboard.components.c2_vpr import ( + C2VprConfig, + VprStrategy, +) +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr._preprocessor import ( + BackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr._preprocessor_net_vlad import ( + NetVladBackbonePreprocessor, +) +from gps_denied_onboard.components.c2_vpr.errors import ( + VprBackboneError, + VprPreprocessError, +) +from gps_denied_onboard.components.c2_vpr.net_vlad import ( + MODEL_NAME, + NetVladStrategy, + architecture_factory, + create, +) +from gps_denied_onboard.config.schema import Config, ConfigError +from gps_denied_onboard.fdr_client import FdrClient +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) +from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser + + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + + +@dataclass +class _StubClock: + next_monotonic_ns: int = 1_000_000_000 + step_ns: int = 5_000 + fixed_time_ns: int = 1_715_600_000_000_000_000 + + def monotonic_ns(self) -> int: + v = self.next_monotonic_ns + self.next_monotonic_ns += self.step_ns + return v + + def time_ns(self) -> int: + return self.fixed_time_ns + + def sleep_until_ns(self, target_ns: int) -> None: + _ = target_ns + + +class _FakeEngineHandle(EngineHandle): + """Minimal 
:class:`EngineHandle` for test wiring.""" + + def __init__(self, label: str = "net_vlad") -> None: + self.label = label + + +@dataclass +class _FakeInferenceRuntime: + """Configurable :class:`InferenceRuntime` for unit tests. + + ``fixed_output`` when set is returned by :meth:`infer` under ``output_key``; + ``raises`` when set is raised instead. ``runtime_label`` controls AC-11. + """ + + descriptor_dim: int = 4096 + raises: BaseException | None = None + runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = ( + "pytorch_fp16" + ) + fixed_output: np.ndarray | None = None + output_key: str = "vlad_descriptor" + calls: list[dict[str, np.ndarray]] = field(default_factory=list) + deserialize_calls: list[EngineCacheEntry] = field(default_factory=list) + + def compile_engine( + self, model_path: Path, build_config: BuildConfig + ) -> EngineCacheEntry: + _ = build_config + return EngineCacheEntry( + engine_path=Path(model_path), + sha256_hex="0" * 64, + sm=None, + jp=None, + trt=None, + precision=PrecisionMode.FP16, + extras={"model_name": "from_filename_stem"}, + ) + + def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle: + self.deserialize_calls.append(entry) + return _FakeEngineHandle(label=entry.extras.get("model_name", "")) + + def infer( + self, + handle: EngineHandle, + inputs: dict[str, np.ndarray], + ) -> dict[str, np.ndarray]: + _ = handle + self.calls.append({k: v.copy() for k, v in inputs.items()}) + if self.raises is not None: + raise self.raises + if self.fixed_output is not None: + return {self.output_key: self.fixed_output.copy()} + rng = np.random.default_rng(0xDEADBEEF) + tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16) + return { + self.output_key: tensor.reshape(1, self.descriptor_dim).copy() + } + + def release_engine(self, handle: EngineHandle) -> None: + _ = handle + + def thermal_state(self) -> Any: + raise NotImplementedError("not used by NetVLAD strategy tests") + + def current_runtime_label( + self, + ) -> 
Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]: + return self.runtime_label + + +@dataclass +class _FakeDescriptorIndex: + descriptor_dim_value: int = 4096 + results: list[tuple[tuple[int, float, float], float]] = field(default_factory=list) + + def search_topk( + self, query: np.ndarray, k: int + ) -> list[tuple[tuple[int, float, float], float]]: + _ = query + if not self.results: + return [ + ((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i) + for i in range(k) + ] + return list(self.results[:k]) + + def descriptor_dim(self) -> int: + return self.descriptor_dim_value + + +class _SpyDescriptorNormaliser(DescriptorNormaliser): + """Records the order of ``intra_cluster_normalise`` / ``l2_normalise`` calls.""" + + def __init__(self) -> None: + self.call_order: list[str] = [] + + def intra_cluster_normalise( # type: ignore[override] + self, descriptor: np.ndarray, num_clusters: int + ) -> np.ndarray: + self.call_order.append("intra_cluster_normalise") + return DescriptorNormaliser.intra_cluster_normalise( + descriptor, num_clusters + ) + + def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray: # type: ignore[override] + self.call_order.append("l2_normalise") + return DescriptorNormaliser.l2_normalise(descriptor) + + +def _make_frame(*, frame_id: int = 4242, h: int = 720, w: int = 1280) -> NavCameraFrame: + rng = np.random.default_rng(frame_id) + image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8) + return NavCameraFrame( + frame_id=frame_id, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=image, + camera_calibration_id="test_cam", + ) + + +def _make_calibration() -> CameraCalibration: + return CameraCalibration( + camera_id="test_cam", + intrinsics_3x3=np.eye(3, dtype=np.float64), + distortion=np.zeros(5, dtype=np.float64), + body_to_camera_se3=np.eye(4, dtype=np.float64), + acquisition_method="test_fixture", + ) + + +def _make_fdr_client() -> FdrClient: + return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False) + + 
+def _build_strategy( + *, + descriptor_dim: int = 4096, + num_clusters: int = 64, + inference_runtime: _FakeInferenceRuntime | None = None, + descriptor_index: _FakeDescriptorIndex | None = None, + normaliser: DescriptorNormaliser | None = None, + preprocessor: NetVladBackbonePreprocessor | None = None, + fdr_client: FdrClient | None = None, + clock: _StubClock | None = None, +) -> NetVladStrategy: + inference_runtime = inference_runtime or _FakeInferenceRuntime( + descriptor_dim=descriptor_dim + ) + descriptor_index = descriptor_index or _FakeDescriptorIndex( + descriptor_dim_value=descriptor_dim + ) + normaliser = normaliser or DescriptorNormaliser() + preprocessor = preprocessor or NetVladBackbonePreprocessor() + fdr_client = fdr_client or _make_fdr_client() + clock = clock or _StubClock() + handle = _FakeEngineHandle() + bridge = FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=descriptor_dim, + warn_top1_threshold=0.30, + debug_log_per_frame_distances=False, + fdr_client=fdr_client, + logger=logging.getLogger("test.bridge"), + clock=clock, + ) + return NetVladStrategy( + inference_runtime=inference_runtime, + engine_handle=handle, + descriptor_index=descriptor_index, + preprocessor=preprocessor, + normaliser=normaliser, + faiss_bridge=bridge, + fdr_client=fdr_client, + clock=clock, + logger=logging.getLogger("test.net_vlad"), + descriptor_dim=descriptor_dim, + num_clusters=num_clusters, + ) + + +# --------------------------------------------------------------------------- +# AC-1: Protocol conformance +# --------------------------------------------------------------------------- + + +def test_ac1_protocol_conformance() -> None: + strategy = _build_strategy() + assert isinstance(strategy, VprStrategy) + + +# --------------------------------------------------------------------------- +# AC-2: embed_query → L2-normalised FP16 (descriptor_dim,) +# --------------------------------------------------------------------------- + + +def 
test_ac2_embed_query_returns_unit_norm_fp16_descriptor() -> None: + # Arrange + runtime = _FakeInferenceRuntime(descriptor_dim=4096) + strategy = _build_strategy(inference_runtime=runtime) + frame = _make_frame() + calibration = _make_calibration() + # Act + query = strategy.embed_query(frame, calibration) + # Assert + embedding = np.asarray(query.embedding) + assert embedding.shape == (4096,) + assert embedding.dtype == np.float16 + norm = float(np.linalg.norm(embedding.astype(np.float32))) + assert norm == pytest.approx(1.0, abs=1e-3) + + +def test_ac2_embed_query_works_with_512_pca_whitened() -> None: + runtime = _FakeInferenceRuntime(descriptor_dim=512) + strategy = _build_strategy(descriptor_dim=512, inference_runtime=runtime) + query = strategy.embed_query(_make_frame(), _make_calibration()) + embedding = np.asarray(query.embedding) + assert embedding.shape == (512,) + assert float(np.linalg.norm(embedding.astype(np.float32))) == pytest.approx( + 1.0, abs=1e-3 + ) + + +# --------------------------------------------------------------------------- +# AC-3: intra_cluster THEN l2 normalisation order +# --------------------------------------------------------------------------- + + +def test_ac3_intra_cluster_called_before_global_l2() -> None: + spy = _SpyDescriptorNormaliser() + runtime = _FakeInferenceRuntime(descriptor_dim=4096) + strategy = _build_strategy(inference_runtime=runtime, normaliser=spy) + strategy.embed_query(_make_frame(), _make_calibration()) + assert spy.call_order == ["intra_cluster_normalise", "l2_normalise"] + + +def test_ac3_intra_cluster_and_l2_each_called_exactly_once() -> None: + spy = _SpyDescriptorNormaliser() + runtime = _FakeInferenceRuntime(descriptor_dim=4096) + strategy = _build_strategy(inference_runtime=runtime, normaliser=spy) + strategy.embed_query(_make_frame(), _make_calibration()) + assert spy.call_order.count("intra_cluster_normalise") == 1 + assert spy.call_order.count("l2_normalise") == 1 + + +# 
--------------------------------------------------------------------------- +# AC-4: deterministic +# --------------------------------------------------------------------------- + + +def test_ac4_embed_query_deterministic_for_same_frame() -> None: + fixed_output = np.zeros((1, 4096), dtype=np.float16) + rng = np.random.default_rng(2026) + fixed_output[0] = rng.standard_normal(4096).astype(np.float16) + runtime = _FakeInferenceRuntime( + descriptor_dim=4096, fixed_output=fixed_output + ) + strategy = _build_strategy(inference_runtime=runtime) + frame = _make_frame() + calibration = _make_calibration() + first = strategy.embed_query(frame, calibration) + second = strategy.embed_query(frame, calibration) + third = strategy.embed_query(frame, calibration) + np.testing.assert_array_equal( + np.asarray(first.embedding), np.asarray(second.embedding) + ) + np.testing.assert_array_equal( + np.asarray(second.embedding), np.asarray(third.embedding) + ) + + +# --------------------------------------------------------------------------- +# AC-5: retrieve_topk returns k candidates with backbone_label="net_vlad" +# --------------------------------------------------------------------------- + + +def test_ac5_retrieve_topk_returns_exactly_k_with_net_vlad_label() -> None: + descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096) + strategy = _build_strategy(descriptor_index=descriptor_index) + query = strategy.embed_query(_make_frame(), _make_calibration()) + result = strategy.retrieve_topk(query, k=10) + assert len(result.candidates) == 10 + assert result.backbone_label == "net_vlad" + assert result.candidates[0].descriptor_dim == 4096 + distances = [c.descriptor_distance for c in result.candidates] + assert distances == sorted(distances) + + +# --------------------------------------------------------------------------- +# AC-6: descriptor_dim() is config-driven and stable +# --------------------------------------------------------------------------- + + +def 
test_ac6_descriptor_dim_stable_for_4096_instance() -> None: + strategy = _build_strategy(descriptor_dim=4096) + for _ in range(100): + assert strategy.descriptor_dim() == 4096 + + +def test_ac6_descriptor_dim_stable_for_512_instance() -> None: + strategy = _build_strategy(descriptor_dim=512) + for _ in range(100): + assert strategy.descriptor_dim() == 512 + + +# --------------------------------------------------------------------------- +# AC-7: Engine output shape mismatch at create() → ConfigError +# --------------------------------------------------------------------------- + + +def test_ac7_create_rejects_engine_output_shape_mismatch( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — engine produces (1, 2048) but config wants 4096 + wrong_output = np.zeros((1, 2048), dtype=np.float16) + runtime = _FakeInferenceRuntime( + descriptor_dim=4096, fixed_output=wrong_output + ) + descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096) + fdr_client = _make_fdr_client() + config = _build_config(netvlad_descriptor_dim=4096) + + # Act + Assert + with pytest.raises(ConfigError, match=r"engine output shape mismatch"): + create( + config, + descriptor_index=descriptor_index, + inference_runtime=runtime, + fdr_client=fdr_client, + clock=_StubClock(), + ) + + +# --------------------------------------------------------------------------- +# AC-8: VprBackboneError on forward failure +# --------------------------------------------------------------------------- + + +def test_ac8_backbone_raises_runtime_error_yields_vpr_backbone_error( + caplog: pytest.LogCaptureFixture, +) -> None: + runtime = _FakeInferenceRuntime( + descriptor_dim=4096, raises=RuntimeError("CUDA OOM") + ) + fdr_client = _make_fdr_client() + strategy = _build_strategy( + inference_runtime=runtime, fdr_client=fdr_client + ) + with caplog.at_level(logging.ERROR, logger="test.net_vlad"): + with pytest.raises(VprBackboneError): + strategy.embed_query(_make_frame(), _make_calibration()) + assert 
any( + record.levelno == logging.ERROR + and getattr(record, "kind", None) == "c2.vpr.backbone_error" + for record in caplog.records + ) + # Assert exactly one FDR record for the backbone error + records = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"] + assert len(backbone_errors) == 1 + + +def test_ac8_unknown_forward_output_key_yields_vpr_backbone_error() -> None: + runtime = _FakeInferenceRuntime(descriptor_dim=4096, output_key="not_vlad") + strategy = _build_strategy(inference_runtime=runtime) + with pytest.raises(VprBackboneError, match=r"'vlad_descriptor' key"): + strategy.embed_query(_make_frame(), _make_calibration()) + + +def test_ac8_wrong_forward_output_shape_yields_vpr_backbone_error() -> None: + bad = np.zeros((1, 2048), dtype=np.float16) + runtime = _FakeInferenceRuntime(descriptor_dim=4096, fixed_output=bad) + strategy = _build_strategy(descriptor_dim=4096, inference_runtime=runtime) + with pytest.raises(VprBackboneError, match=r"expected \(1, 4096\)"): + strategy.embed_query(_make_frame(), _make_calibration()) + + +# --------------------------------------------------------------------------- +# AC-9: VprPreprocessError on corrupt image +# --------------------------------------------------------------------------- + + +def test_ac9_corrupt_image_yields_vpr_preprocess_error( + caplog: pytest.LogCaptureFixture, +) -> None: + fdr_client = _make_fdr_client() + strategy = _build_strategy(fdr_client=fdr_client) + frame = NavCameraFrame( + frame_id=4242, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image="not-an-array", + camera_calibration_id="test_cam", + ) + with caplog.at_level(logging.ERROR, logger="test.net_vlad"): + with pytest.raises(VprPreprocessError): + strategy.embed_query(frame, _make_calibration()) + assert any( + record.levelno == logging.ERROR + and getattr(record, "kind", None) == "c2.vpr.preprocess_error" + for record in 
caplog.records + ) + records = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + preprocess_errors = [ + r for r in records if r.kind == "vpr.preprocess_error" + ] + assert len(preprocess_errors) == 1 + + +def test_ac9_wrong_dtype_image_yields_vpr_preprocess_error() -> None: + strategy = _build_strategy() + bad_image = np.zeros((720, 1280, 3), dtype=np.float32) + frame = NavCameraFrame( + frame_id=42, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=bad_image, + camera_calibration_id="test_cam", + ) + with pytest.raises(VprPreprocessError, match=r"uint8"): + strategy.embed_query(frame, _make_calibration()) + + +def test_ac9_wrong_shape_image_yields_vpr_preprocess_error() -> None: + strategy = _build_strategy() + bad_image = np.zeros((720, 1280, 4), dtype=np.uint8) + frame = NavCameraFrame( + frame_id=42, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=bad_image, + camera_calibration_id="test_cam", + ) + with pytest.raises(VprPreprocessError, match=r"\(H,W\)"): + strategy.embed_query(frame, _make_calibration()) + + +# --------------------------------------------------------------------------- +# AC-10: composition-root wiring → INFO log "c2.vpr.ready" +# --------------------------------------------------------------------------- + + +def _build_config(*, netvlad_descriptor_dim: int = 4096) -> Config: + """Minimal Config carrying only the c2_vpr block needed by ``create()``.""" + c2 = C2VprConfig( + strategy="net_vlad", + backbone_weights_path=Path("/models/net_vlad.pth"), + faiss_index_path=Path("/cache/vpr/index.faiss"), + warn_top1_threshold=0.30, + debug_per_frame_distances=False, + netvlad_descriptor_dim=netvlad_descriptor_dim, + ) + cfg = MagicMock(spec=Config) + cfg.components = {"c2_vpr": c2} + return cfg + + +def test_ac10_create_emits_strategy_ready_info_log( + caplog: pytest.LogCaptureFixture, +) -> None: + runtime = _FakeInferenceRuntime(descriptor_dim=4096) + descriptor_index = 
_FakeDescriptorIndex(descriptor_dim_value=4096) + fdr_client = _make_fdr_client() + config = _build_config(netvlad_descriptor_dim=4096) + logger = logging.getLogger("test.create.ac10") + + with caplog.at_level(logging.INFO, logger="test.create.ac10"): + strategy = create( + config, + descriptor_index=descriptor_index, + inference_runtime=runtime, + fdr_client=fdr_client, + clock=_StubClock(), + logger=logger, + ) + + assert isinstance(strategy, NetVladStrategy) + assert strategy.descriptor_dim() == 4096 + ready_logs = [ + r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready" + ] + assert len(ready_logs) == 1 + kv = ready_logs[0].kv # type: ignore[attr-defined] + assert kv["strategy"] == "net_vlad" + assert kv["descriptor_dim"] == 4096 + + +def test_ac10_create_forces_model_name_to_net_vlad() -> None: + runtime = _FakeInferenceRuntime(descriptor_dim=4096) + descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096) + fdr_client = _make_fdr_client() + config = _build_config(netvlad_descriptor_dim=4096) + create( + config, + descriptor_index=descriptor_index, + inference_runtime=runtime, + fdr_client=fdr_client, + clock=_StubClock(), + ) + assert len(runtime.deserialize_calls) == 1 + entry = runtime.deserialize_calls[0] + assert entry.extras["model_name"] == "net_vlad" + + +def test_architecture_factory_closure_carries_descriptor_dim() -> None: + factory = architecture_factory(descriptor_dim=4096) + assert callable(factory) + assert MODEL_NAME == "net_vlad" + + +def test_architecture_factory_rejects_invalid_descriptor_dim() -> None: + with pytest.raises(ValueError, match=r">= 1"): + architecture_factory(descriptor_dim=0) + + +# --------------------------------------------------------------------------- +# AC-11: BUILD_PYTORCH_RUNTIME=OFF → ConfigError fail-fast +# --------------------------------------------------------------------------- + + +def test_ac11_non_pytorch_runtime_rejected_at_create() -> None: + runtime = _FakeInferenceRuntime( + 
descriptor_dim=4096, runtime_label="tensorrt" + ) + descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096) + fdr_client = _make_fdr_client() + config = _build_config(netvlad_descriptor_dim=4096) + with pytest.raises( + ConfigError, match=r"BUILD_PYTORCH_RUNTIME=OFF" + ): + create( + config, + descriptor_index=descriptor_index, + inference_runtime=runtime, + fdr_client=fdr_client, + clock=_StubClock(), + ) + + +def test_ac11_onnx_trt_ep_runtime_also_rejected() -> None: + runtime = _FakeInferenceRuntime( + descriptor_dim=4096, runtime_label="onnx_trt_ep" + ) + config = _build_config(netvlad_descriptor_dim=4096) + with pytest.raises(ConfigError, match=r"onnx_trt_ep"): + create( + config, + descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=4096), + inference_runtime=runtime, + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + + +# --------------------------------------------------------------------------- +# Preprocessor contract +# --------------------------------------------------------------------------- + + +def test_preprocessor_output_shape_and_dtype() -> None: + pp = NetVladBackbonePreprocessor() + rng = np.random.default_rng(2026) + image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8) + frame = NavCameraFrame( + frame_id=1, + timestamp=datetime(2026, 5, 13, 12, 0, 0), + image=image, + camera_calibration_id="cam", + ) + out = pp.preprocess(frame, _make_calibration()) + assert out.shape == (1, 3, 480, 480) + assert out.dtype == np.float16 + + +def test_preprocessor_input_shape_is_480x480() -> None: + pp = NetVladBackbonePreprocessor() + assert pp.input_shape() == (480, 480) + + +def test_preprocessor_protocol_conformance() -> None: + pp = NetVladBackbonePreprocessor() + assert isinstance(pp, BackbonePreprocessor) + + +def test_preprocessor_accepts_grayscale_input() -> None: + pp = NetVladBackbonePreprocessor() + gray = np.zeros((512, 512), dtype=np.uint8) + frame = NavCameraFrame( + frame_id=1, + timestamp=datetime(2026, 
5, 13, 12, 0, 0), + image=gray, + camera_calibration_id="cam", + ) + out = pp.preprocess(frame, _make_calibration()) + assert out.shape == (1, 3, 480, 480) + + +# --------------------------------------------------------------------------- +# Constructor validation +# --------------------------------------------------------------------------- + + +def _make_minimal_strategy_kwargs( + *, descriptor_dim: int, num_clusters: int +) -> dict[str, Any]: + """Build NetVladStrategy constructor kwargs that pass FaissBridge guards. + + The strategy carries its own ``descriptor_dim`` validation; the + bridge has a separate (stricter) ``descriptor_dim > 0`` guard. + Tests that exercise the strategy's own validators MUST use a bridge + with a valid dim — the descriptor_dim mismatch between the bridge + and strategy is fine for these targeted-failure tests. + """ + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = FaissBridge( + descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=4096), + descriptor_dim=4096, + warn_top1_threshold=0.30, + debug_log_per_frame_distances=False, + fdr_client=fdr_client, + logger=logging.getLogger("test.bridge.guard"), + clock=clock, + ) + return { + "inference_runtime": _FakeInferenceRuntime(descriptor_dim=4096), + "engine_handle": _FakeEngineHandle(), + "descriptor_index": _FakeDescriptorIndex(), + "preprocessor": NetVladBackbonePreprocessor(), + "normaliser": DescriptorNormaliser(), + "faiss_bridge": bridge, + "fdr_client": fdr_client, + "clock": clock, + "logger": logging.getLogger("test.net_vlad.guard"), + "descriptor_dim": descriptor_dim, + "num_clusters": num_clusters, + } + + +def test_constructor_rejects_zero_descriptor_dim() -> None: + with pytest.raises(ValueError, match=r">= 1"): + NetVladStrategy( + **_make_minimal_strategy_kwargs(descriptor_dim=0, num_clusters=64) + ) + + +def test_constructor_rejects_zero_num_clusters() -> None: + with pytest.raises(ValueError, match=r"num_clusters"): + NetVladStrategy( + 
**_make_minimal_strategy_kwargs( + descriptor_dim=4096, num_clusters=0 + ) + ) + + +def test_constructor_rejects_non_divisible_descriptor_dim() -> None: + # 4097 not divisible by 64 clusters + with pytest.raises(ValueError, match=r"divisible"): + NetVladStrategy( + **_make_minimal_strategy_kwargs( + descriptor_dim=4097, num_clusters=64 + ) + ) + + +# --------------------------------------------------------------------------- +# FDR record emission +# --------------------------------------------------------------------------- + + +def test_embed_query_emits_vpr_embed_query_fdr_record() -> None: + fdr_client = _make_fdr_client() + strategy = _build_strategy(fdr_client=fdr_client) + strategy.embed_query(_make_frame(), _make_calibration()) + records = [] + while True: + r = fdr_client.pop_one() + if r is None: + break + records.append(r) + embed_records = [r for r in records if r.kind == "vpr.embed_query"] + assert len(embed_records) == 1 + payload = embed_records[0].payload + assert payload["backbone_label"] == "net_vlad" + assert payload["descriptor_dim"] == 4096 + assert isinstance(payload["latency_us"], int) + assert payload["latency_us"] > 0 + + +def test_fdr_record_kinds_registered() -> None: + from gps_denied_onboard.fdr_client.records import KNOWN_KINDS + + assert "vpr.embed_query" in KNOWN_KINDS + assert "vpr.backbone_error" in KNOWN_KINDS + assert "vpr.preprocess_error" in KNOWN_KINDS diff --git a/tests/unit/test_az272_fdr_record_schema.py b/tests/unit/test_az272_fdr_record_schema.py index 6c36828..aec97ee 100644 --- a/tests/unit/test_az272_fdr_record_schema.py +++ b/tests/unit/test_az272_fdr_record_schema.py @@ -277,6 +277,27 @@ def _kind_payload(kind: str) -> dict[str, object]: ], "latency_us": 123, } + if kind == "vpr.embed_query": + return { + "frame_id": 4242, + "backbone_label": "net_vlad", + "descriptor_dim": 4096, + "latency_us": 78123, + } + if kind == "vpr.backbone_error": + return { + "frame_id": 4242, + "backbone_label": "net_vlad", + 
"error_type": "OutOfMemoryError", + "error_message": "CUDA OOM during net_vlad forward", + } + if kind == "vpr.preprocess_error": + return { + "frame_id": 4242, + "backbone_label": "net_vlad", + "error_type": "VprPreprocessError", + "error_message": "image_bytes failed cv2.imdecode", + } raise AssertionError(f"unhandled kind in fixture: {kind!r}") diff --git a/tests/unit/test_az283_descriptor_normaliser.py b/tests/unit/test_az283_descriptor_normaliser.py index e1db64b..49e3986 100644 --- a/tests/unit/test_az283_descriptor_normaliser.py +++ b/tests/unit/test_az283_descriptor_normaliser.py @@ -120,3 +120,80 @@ def test_ac12_no_upward_imports_to_components() -> None: if alias.name.startswith("gps_denied_onboard.components"): bad.append(alias.name) assert not bad, f"descriptor_normaliser must not import components.*; found: {bad}" + + +# AZ-338 — DescriptorNormaliser v1.1.0: intra_cluster_normalise + + +def test_intra_cluster_normalise_per_cluster_unit_norm() -> None: + # Arrange — 4 clusters of dim 3, residuals not yet normalised + raw = np.array( + [3.0, 4.0, 0.0, 1.0, 0.0, 0.0, 0.0, 5.0, 12.0, 2.0, 2.0, 1.0], + dtype=np.float32, + ) + # Act + out = DescriptorNormaliser.intra_cluster_normalise(raw, num_clusters=4) + # Assert — each cluster sub-vector is unit norm in the intra-cluster sense + reshaped = out.reshape(4, 3) + for k in range(4): + assert float(np.linalg.norm(reshaped[k])) == pytest.approx(1.0, abs=1e-6) + + +def test_intra_cluster_normalise_dtype_preserved_fp16() -> None: + rng = np.random.default_rng(2026) + descriptor = rng.standard_normal(64 * 32).astype(np.float16) + out = DescriptorNormaliser.intra_cluster_normalise(descriptor, num_clusters=64) + assert out.dtype == np.float16 + reshaped = out.astype(np.float32).reshape(64, 32) + for k in range(64): + assert float(np.linalg.norm(reshaped[k])) == pytest.approx(1.0, abs=1e-3) + + +def test_intra_cluster_normalise_zero_cluster_returns_zero() -> None: + # 2 clusters of dim 3; first is all zeros, second 
is unit-normalisable + raw = np.array([0.0, 0.0, 0.0, 0.0, 3.0, 4.0], dtype=np.float32) + out = DescriptorNormaliser.intra_cluster_normalise(raw, num_clusters=2) + np.testing.assert_array_equal(out[:3], np.zeros(3, dtype=np.float32)) + np.testing.assert_allclose( + out[3:], np.array([0.0, 0.6, 0.8], dtype=np.float32), atol=1e-6 + ) + + +def test_intra_cluster_normalise_rejects_non_divisible_length() -> None: + raw = np.zeros(7, dtype=np.float32) + with pytest.raises(DescriptorNormaliserError, match=r"not divisible"): + DescriptorNormaliser.intra_cluster_normalise(raw, num_clusters=3) + + +def test_intra_cluster_normalise_rejects_2d_input() -> None: + raw = np.zeros((4, 3), dtype=np.float32) + with pytest.raises(DescriptorNormaliserError, match=r"1-D"): + DescriptorNormaliser.intra_cluster_normalise(raw, num_clusters=4) + + +def test_intra_cluster_normalise_rejects_zero_num_clusters() -> None: + raw = np.zeros(12, dtype=np.float32) + with pytest.raises(DescriptorNormaliserError, match=r">= 1"): + DescriptorNormaliser.intra_cluster_normalise(raw, num_clusters=0) + + +def test_intra_cluster_normalise_rejects_bool_num_clusters() -> None: + raw = np.zeros(12, dtype=np.float32) + with pytest.raises(DescriptorNormaliserError, match=r"non-bool"): + DescriptorNormaliser.intra_cluster_normalise(raw, num_clusters=True) # type: ignore[arg-type] + + +def test_intra_cluster_normalise_rejects_float64() -> None: + raw = np.zeros(12, dtype=np.float64) + with pytest.raises(DescriptorNormaliserError, match=r"float16.*float32|float32.*float16"): + DescriptorNormaliser.intra_cluster_normalise(raw, num_clusters=4) + + +def test_intra_cluster_normalise_no_in_place_mutation() -> None: + raw = np.array( + [3.0, 4.0, 0.0, 1.0, 0.0, 0.0, 0.0, 5.0, 12.0, 2.0, 2.0, 1.0], + dtype=np.float32, + ) + snapshot = raw.copy() + _ = DescriptorNormaliser.intra_cluster_normalise(raw, num_clusters=4) + np.testing.assert_array_equal(raw, snapshot)