Files
Oleksandr Bezdieniezhnykh af0dbe863a [AZ-338] [AZ-283] C2 NetVLAD mandatory simple-baseline VprStrategy
NetVLAD is the C2 comparative baseline per the engine rule (every
production-default backbone ships with a simple-baseline alongside).
Runs on the C7 PyTorch FP16 runtime (NOT TRT) so a TRT engine compile
bug cannot simultaneously break NetVLAD AND UltraVPR.

Production changes:
- c2_vpr/net_vlad.py — NetVladStrategy + module-level create() factory.
  Constructor wires InferenceRuntimeCut + DescriptorIndexCut +
  NetVladBackbonePreprocessor + DescriptorNormaliser + FaissBridge.
  embed_query pipeline: preprocess -> runtime.infer -> dual-stage
  normalisation (intra-cluster THEN global L2) -> VprQuery.
  retrieve_topk delegates one-line to FaissBridge.
- c2_vpr/_net_vlad_architecture.py — Arandjelovic et al. 2016 NetVLAD
  layer over torchvision VGG16 features + optional Linear PCA
  projection to descriptor_dim (default 4096; published Pittsburgh
  reference uses K*D=64*512=32768 raw + Linear(32768, 4096) PCA).
- c2_vpr/_preprocessor_net_vlad.py — OpenCV-based image preprocessor:
  decode -> centre-crop square -> resize (480, 480) -> ImageNet
  normalisation -> FP16 NCHW. Calibration is not consumed (NetVLAD
  is calibration-agnostic per published preprocessing chain).
- c2_vpr/inference_runtime_cut.py — NEW AZ-507 consumer-side cut
  mirroring C7 InferenceRuntime; lets c2_vpr stay AZ-507-clean.
- c2_vpr/config.py — added netvlad_descriptor_dim: int = 4096 knob.
- helpers/descriptor_normaliser.py — added intra_cluster_normalise
  (DescriptorNormaliser v1.0.0 -> v1.1.0; backward-compatible add).
- runtime_root/vpr_factory.py — added _register_strategy_architecture
  helper that binds (MODEL_NAME, architecture_factory(descriptor_dim))
  to C7's architecture registry before delegating to the strategy's
  create() factory. Keeps the c7 import at L4, preserves AZ-507.
- fdr_client/records.py — registered vpr.embed_query,
  vpr.backbone_error, vpr.preprocess_error record kinds.

Tests:
- tests/unit/c2_vpr/test_net_vlad.py — 31 tests covering all 11 ACs +
  preprocessor contract + architecture factory + constructor
  validation + FDR record emission.
- tests/unit/test_az283_descriptor_normaliser.py — +8 tests for the
  new intra_cluster_normalise.
- tests/unit/test_az272_fdr_record_schema.py — +3 fixture payloads.

Full unit suite: 1608 passed / 80 env-skipped (+43 new tests).
Per-batch code review (batch_46_review.md): PASS_WITH_WARNINGS
(4 Low-severity hygiene findings; no Critical/High/Medium).

Architectural notes:
- The spec implied c2_vpr.net_vlad.create() registers the architecture
  with C7. That violates AZ-507 (no cross-component imports). Resolved
  by exposing MODEL_NAME + architecture_factory(descriptor_dim) on the
  strategy module and having the composition root perform the C7 bind.
- C7 PyTorch runtime API names in the spec (forward, load_engine)
  were outdated; aligned implementation with the live v1.0.0 Protocol
  (infer, compile_engine + deserialize_engine). Spec hygiene flagged
  in review F2.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-13 22:30:29 +03:00

806 lines
28 KiB
Python

"""AZ-338 — NetVLAD mandatory simple-baseline VprStrategy unit tests.
Covers AC-1..AC-11 + preprocessor contract + intra-cluster normalisation
ordering. Uses fakes for :class:`InferenceRuntime`, :class:`DescriptorIndexCut`,
and :class:`FdrClient` so the suite stays AZ-507-clean and torch-free.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Literal
from unittest.mock import MagicMock
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration # noqa: E402
@pytest.fixture(autouse=True)
def _reset_c7_architecture_registry() -> Any:
"""Isolate the C7 architecture registry per test.
``register_architecture`` rejects re-registration with a *different*
factory under the same key; the NetVLAD ``create()`` factory builds
a fresh closure per call, so the global singleton accumulates across
tests and triggers a false-positive ValueError. Resetting the dict
around each test mirrors a fresh process boot.
"""
from gps_denied_onboard.components.c7_inference.architecture_registry import (
_DEFAULT_REGISTRY,
)
snapshot = dict(_DEFAULT_REGISTRY)
_DEFAULT_REGISTRY.clear()
yield
_DEFAULT_REGISTRY.clear()
_DEFAULT_REGISTRY.update(snapshot)
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineCacheEntry,
EngineHandle,
PrecisionMode,
)
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard._types.vpr import VprQuery
from gps_denied_onboard.components.c2_vpr import (
C2VprConfig,
VprStrategy,
)
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr._preprocessor import (
BackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr._preprocessor_net_vlad import (
NetVladBackbonePreprocessor,
)
from gps_denied_onboard.components.c2_vpr.errors import (
VprBackboneError,
VprPreprocessError,
)
from gps_denied_onboard.components.c2_vpr.net_vlad import (
MODEL_NAME,
NetVladStrategy,
architecture_factory,
create,
)
from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.fdr_client import FdrClient
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
@dataclass
class _StubClock:
next_monotonic_ns: int = 1_000_000_000
step_ns: int = 5_000
fixed_time_ns: int = 1_715_600_000_000_000_000
def monotonic_ns(self) -> int:
v = self.next_monotonic_ns
self.next_monotonic_ns += self.step_ns
return v
def time_ns(self) -> int:
return self.fixed_time_ns
def sleep_until_ns(self, target_ns: int) -> None:
_ = target_ns
class _FakeEngineHandle(EngineHandle):
"""Minimal :class:`EngineHandle` for test wiring."""
def __init__(self, label: str = "net_vlad") -> None:
self.label = label
@dataclass
class _FakeInferenceRuntime:
"""Configurable :class:`InferenceRuntime` for unit tests.
``forward_output`` is the dict returned by :meth:`infer`; ``raises``
when set is raised instead. ``runtime_label`` controls AC-11.
"""
descriptor_dim: int = 4096
raises: BaseException | None = None
runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = (
"pytorch_fp16"
)
fixed_output: np.ndarray | None = None
output_key: str = "vlad_descriptor"
calls: list[dict[str, np.ndarray]] = field(default_factory=list)
deserialize_calls: list[EngineCacheEntry] = field(default_factory=list)
def compile_engine(
self, model_path: Path, build_config: BuildConfig
) -> EngineCacheEntry:
_ = build_config
return EngineCacheEntry(
engine_path=Path(model_path),
sha256_hex="0" * 64,
sm=None,
jp=None,
trt=None,
precision=PrecisionMode.FP16,
extras={"model_name": "from_filename_stem"},
)
def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle:
self.deserialize_calls.append(entry)
return _FakeEngineHandle(label=entry.extras.get("model_name", ""))
def infer(
self,
handle: EngineHandle,
inputs: dict[str, np.ndarray],
) -> dict[str, np.ndarray]:
_ = handle
self.calls.append({k: v.copy() for k, v in inputs.items()})
if self.raises is not None:
raise self.raises
if self.fixed_output is not None:
return {self.output_key: self.fixed_output.copy()}
rng = np.random.default_rng(0xDEADBEEF)
tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16)
return {
self.output_key: tensor.reshape(1, self.descriptor_dim).copy()
}
def release_engine(self, handle: EngineHandle) -> None:
_ = handle
def thermal_state(self) -> Any:
raise NotImplementedError("not used by NetVLAD strategy tests")
def current_runtime_label(
self,
) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]:
return self.runtime_label
@dataclass
class _FakeDescriptorIndex:
descriptor_dim_value: int = 4096
results: list[tuple[tuple[int, float, float], float]] = field(default_factory=list)
def search_topk(
self, query: np.ndarray, k: int
) -> list[tuple[tuple[int, float, float], float]]:
_ = query
if not self.results:
return [
((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i)
for i in range(k)
]
return list(self.results[:k])
def descriptor_dim(self) -> int:
return self.descriptor_dim_value
class _SpyDescriptorNormaliser(DescriptorNormaliser):
"""Records the order of ``intra_cluster_normalise`` / ``l2_normalise`` calls."""
def __init__(self) -> None:
self.call_order: list[str] = []
def intra_cluster_normalise( # type: ignore[override]
self, descriptor: np.ndarray, num_clusters: int
) -> np.ndarray:
self.call_order.append("intra_cluster_normalise")
return DescriptorNormaliser.intra_cluster_normalise(
descriptor, num_clusters
)
def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray: # type: ignore[override]
self.call_order.append("l2_normalise")
return DescriptorNormaliser.l2_normalise(descriptor)
def _make_frame(*, frame_id: int = 4242, h: int = 720, w: int = 1280) -> NavCameraFrame:
rng = np.random.default_rng(frame_id)
image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8)
return NavCameraFrame(
frame_id=frame_id,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=image,
camera_calibration_id="test_cam",
)
def _make_calibration() -> CameraCalibration:
return CameraCalibration(
camera_id="test_cam",
intrinsics_3x3=np.eye(3, dtype=np.float64),
distortion=np.zeros(5, dtype=np.float64),
body_to_camera_se3=np.eye(4, dtype=np.float64),
acquisition_method="test_fixture",
)
def _make_fdr_client() -> FdrClient:
return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False)
def _build_strategy(
*,
descriptor_dim: int = 4096,
num_clusters: int = 64,
inference_runtime: _FakeInferenceRuntime | None = None,
descriptor_index: _FakeDescriptorIndex | None = None,
normaliser: DescriptorNormaliser | None = None,
preprocessor: NetVladBackbonePreprocessor | None = None,
fdr_client: FdrClient | None = None,
clock: _StubClock | None = None,
) -> NetVladStrategy:
inference_runtime = inference_runtime or _FakeInferenceRuntime(
descriptor_dim=descriptor_dim
)
descriptor_index = descriptor_index or _FakeDescriptorIndex(
descriptor_dim_value=descriptor_dim
)
normaliser = normaliser or DescriptorNormaliser()
preprocessor = preprocessor or NetVladBackbonePreprocessor()
fdr_client = fdr_client or _make_fdr_client()
clock = clock or _StubClock()
handle = _FakeEngineHandle()
bridge = FaissBridge(
descriptor_index=descriptor_index,
descriptor_dim=descriptor_dim,
warn_top1_threshold=0.30,
debug_log_per_frame_distances=False,
fdr_client=fdr_client,
logger=logging.getLogger("test.bridge"),
clock=clock,
)
return NetVladStrategy(
inference_runtime=inference_runtime,
engine_handle=handle,
descriptor_index=descriptor_index,
preprocessor=preprocessor,
normaliser=normaliser,
faiss_bridge=bridge,
fdr_client=fdr_client,
clock=clock,
logger=logging.getLogger("test.net_vlad"),
descriptor_dim=descriptor_dim,
num_clusters=num_clusters,
)
# ---------------------------------------------------------------------------
# AC-1: Protocol conformance
# ---------------------------------------------------------------------------
def test_ac1_protocol_conformance() -> None:
strategy = _build_strategy()
assert isinstance(strategy, VprStrategy)
# ---------------------------------------------------------------------------
# AC-2: embed_query → L2-normalised FP16 (descriptor_dim,)
# ---------------------------------------------------------------------------
def test_ac2_embed_query_returns_unit_norm_fp16_descriptor() -> None:
# Arrange
runtime = _FakeInferenceRuntime(descriptor_dim=4096)
strategy = _build_strategy(inference_runtime=runtime)
frame = _make_frame()
calibration = _make_calibration()
# Act
query = strategy.embed_query(frame, calibration)
# Assert
embedding = np.asarray(query.embedding)
assert embedding.shape == (4096,)
assert embedding.dtype == np.float16
norm = float(np.linalg.norm(embedding.astype(np.float32)))
assert norm == pytest.approx(1.0, abs=1e-3)
def test_ac2_embed_query_works_with_512_pca_whitened() -> None:
runtime = _FakeInferenceRuntime(descriptor_dim=512)
strategy = _build_strategy(descriptor_dim=512, inference_runtime=runtime)
query = strategy.embed_query(_make_frame(), _make_calibration())
embedding = np.asarray(query.embedding)
assert embedding.shape == (512,)
assert float(np.linalg.norm(embedding.astype(np.float32))) == pytest.approx(
1.0, abs=1e-3
)
# ---------------------------------------------------------------------------
# AC-3: intra_cluster THEN l2 normalisation order
# ---------------------------------------------------------------------------
def test_ac3_intra_cluster_called_before_global_l2() -> None:
spy = _SpyDescriptorNormaliser()
runtime = _FakeInferenceRuntime(descriptor_dim=4096)
strategy = _build_strategy(inference_runtime=runtime, normaliser=spy)
strategy.embed_query(_make_frame(), _make_calibration())
assert spy.call_order == ["intra_cluster_normalise", "l2_normalise"]
def test_ac3_intra_cluster_and_l2_each_called_exactly_once() -> None:
spy = _SpyDescriptorNormaliser()
runtime = _FakeInferenceRuntime(descriptor_dim=4096)
strategy = _build_strategy(inference_runtime=runtime, normaliser=spy)
strategy.embed_query(_make_frame(), _make_calibration())
assert spy.call_order.count("intra_cluster_normalise") == 1
assert spy.call_order.count("l2_normalise") == 1
# ---------------------------------------------------------------------------
# AC-4: deterministic
# ---------------------------------------------------------------------------
def test_ac4_embed_query_deterministic_for_same_frame() -> None:
fixed_output = np.zeros((1, 4096), dtype=np.float16)
rng = np.random.default_rng(2026)
fixed_output[0] = rng.standard_normal(4096).astype(np.float16)
runtime = _FakeInferenceRuntime(
descriptor_dim=4096, fixed_output=fixed_output
)
strategy = _build_strategy(inference_runtime=runtime)
frame = _make_frame()
calibration = _make_calibration()
first = strategy.embed_query(frame, calibration)
second = strategy.embed_query(frame, calibration)
third = strategy.embed_query(frame, calibration)
np.testing.assert_array_equal(
np.asarray(first.embedding), np.asarray(second.embedding)
)
np.testing.assert_array_equal(
np.asarray(second.embedding), np.asarray(third.embedding)
)
# ---------------------------------------------------------------------------
# AC-5: retrieve_topk returns k candidates with backbone_label="net_vlad"
# ---------------------------------------------------------------------------
def test_ac5_retrieve_topk_returns_exactly_k_with_net_vlad_label() -> None:
descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
strategy = _build_strategy(descriptor_index=descriptor_index)
query = strategy.embed_query(_make_frame(), _make_calibration())
result = strategy.retrieve_topk(query, k=10)
assert len(result.candidates) == 10
assert result.backbone_label == "net_vlad"
assert result.candidates[0].descriptor_dim == 4096
distances = [c.descriptor_distance for c in result.candidates]
assert distances == sorted(distances)
# ---------------------------------------------------------------------------
# AC-6: descriptor_dim() is config-driven and stable
# ---------------------------------------------------------------------------
def test_ac6_descriptor_dim_stable_for_4096_instance() -> None:
strategy = _build_strategy(descriptor_dim=4096)
for _ in range(100):
assert strategy.descriptor_dim() == 4096
def test_ac6_descriptor_dim_stable_for_512_instance() -> None:
strategy = _build_strategy(descriptor_dim=512)
for _ in range(100):
assert strategy.descriptor_dim() == 512
# ---------------------------------------------------------------------------
# AC-7: Engine output shape mismatch at create() → ConfigError
# ---------------------------------------------------------------------------
def test_ac7_create_rejects_engine_output_shape_mismatch(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange — engine produces (1, 2048) but config wants 4096
wrong_output = np.zeros((1, 2048), dtype=np.float16)
runtime = _FakeInferenceRuntime(
descriptor_dim=4096, fixed_output=wrong_output
)
descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
fdr_client = _make_fdr_client()
config = _build_config(netvlad_descriptor_dim=4096)
# Act + Assert
with pytest.raises(ConfigError, match=r"engine output shape mismatch"):
create(
config,
descriptor_index=descriptor_index,
inference_runtime=runtime,
fdr_client=fdr_client,
clock=_StubClock(),
)
# ---------------------------------------------------------------------------
# AC-8: VprBackboneError on forward failure
# ---------------------------------------------------------------------------
def test_ac8_backbone_raises_runtime_error_yields_vpr_backbone_error(
caplog: pytest.LogCaptureFixture,
) -> None:
runtime = _FakeInferenceRuntime(
descriptor_dim=4096, raises=RuntimeError("CUDA OOM")
)
fdr_client = _make_fdr_client()
strategy = _build_strategy(
inference_runtime=runtime, fdr_client=fdr_client
)
with caplog.at_level(logging.ERROR, logger="test.net_vlad"):
with pytest.raises(VprBackboneError):
strategy.embed_query(_make_frame(), _make_calibration())
assert any(
record.levelno == logging.ERROR
and getattr(record, "kind", None) == "c2.vpr.backbone_error"
for record in caplog.records
)
# Assert exactly one FDR record for the backbone error
records = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"]
assert len(backbone_errors) == 1
def test_ac8_unknown_forward_output_key_yields_vpr_backbone_error() -> None:
runtime = _FakeInferenceRuntime(descriptor_dim=4096, output_key="not_vlad")
strategy = _build_strategy(inference_runtime=runtime)
with pytest.raises(VprBackboneError, match=r"'vlad_descriptor' key"):
strategy.embed_query(_make_frame(), _make_calibration())
def test_ac8_wrong_forward_output_shape_yields_vpr_backbone_error() -> None:
bad = np.zeros((1, 2048), dtype=np.float16)
runtime = _FakeInferenceRuntime(descriptor_dim=4096, fixed_output=bad)
strategy = _build_strategy(descriptor_dim=4096, inference_runtime=runtime)
with pytest.raises(VprBackboneError, match=r"expected \(1, 4096\)"):
strategy.embed_query(_make_frame(), _make_calibration())
# ---------------------------------------------------------------------------
# AC-9: VprPreprocessError on corrupt image
# ---------------------------------------------------------------------------
def test_ac9_corrupt_image_yields_vpr_preprocess_error(
caplog: pytest.LogCaptureFixture,
) -> None:
fdr_client = _make_fdr_client()
strategy = _build_strategy(fdr_client=fdr_client)
frame = NavCameraFrame(
frame_id=4242,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image="not-an-array",
camera_calibration_id="test_cam",
)
with caplog.at_level(logging.ERROR, logger="test.net_vlad"):
with pytest.raises(VprPreprocessError):
strategy.embed_query(frame, _make_calibration())
assert any(
record.levelno == logging.ERROR
and getattr(record, "kind", None) == "c2.vpr.preprocess_error"
for record in caplog.records
)
records = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
preprocess_errors = [
r for r in records if r.kind == "vpr.preprocess_error"
]
assert len(preprocess_errors) == 1
def test_ac9_wrong_dtype_image_yields_vpr_preprocess_error() -> None:
strategy = _build_strategy()
bad_image = np.zeros((720, 1280, 3), dtype=np.float32)
frame = NavCameraFrame(
frame_id=42,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=bad_image,
camera_calibration_id="test_cam",
)
with pytest.raises(VprPreprocessError, match=r"uint8"):
strategy.embed_query(frame, _make_calibration())
def test_ac9_wrong_shape_image_yields_vpr_preprocess_error() -> None:
strategy = _build_strategy()
bad_image = np.zeros((720, 1280, 4), dtype=np.uint8)
frame = NavCameraFrame(
frame_id=42,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=bad_image,
camera_calibration_id="test_cam",
)
with pytest.raises(VprPreprocessError, match=r"\(H,W\)"):
strategy.embed_query(frame, _make_calibration())
# ---------------------------------------------------------------------------
# AC-10: composition-root wiring → INFO log "c2.vpr.ready"
# ---------------------------------------------------------------------------
def _build_config(*, netvlad_descriptor_dim: int = 4096) -> Config:
"""Minimal Config carrying only the c2_vpr block needed by ``create()``."""
c2 = C2VprConfig(
strategy="net_vlad",
backbone_weights_path=Path("/models/net_vlad.pth"),
faiss_index_path=Path("/cache/vpr/index.faiss"),
warn_top1_threshold=0.30,
debug_per_frame_distances=False,
netvlad_descriptor_dim=netvlad_descriptor_dim,
)
cfg = MagicMock(spec=Config)
cfg.components = {"c2_vpr": c2}
return cfg
def test_ac10_create_emits_strategy_ready_info_log(
caplog: pytest.LogCaptureFixture,
) -> None:
runtime = _FakeInferenceRuntime(descriptor_dim=4096)
descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
fdr_client = _make_fdr_client()
config = _build_config(netvlad_descriptor_dim=4096)
logger = logging.getLogger("test.create.ac10")
with caplog.at_level(logging.INFO, logger="test.create.ac10"):
strategy = create(
config,
descriptor_index=descriptor_index,
inference_runtime=runtime,
fdr_client=fdr_client,
clock=_StubClock(),
logger=logger,
)
assert isinstance(strategy, NetVladStrategy)
assert strategy.descriptor_dim() == 4096
ready_logs = [
r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready"
]
assert len(ready_logs) == 1
kv = ready_logs[0].kv # type: ignore[attr-defined]
assert kv["strategy"] == "net_vlad"
assert kv["descriptor_dim"] == 4096
def test_ac10_create_forces_model_name_to_net_vlad() -> None:
runtime = _FakeInferenceRuntime(descriptor_dim=4096)
descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
fdr_client = _make_fdr_client()
config = _build_config(netvlad_descriptor_dim=4096)
create(
config,
descriptor_index=descriptor_index,
inference_runtime=runtime,
fdr_client=fdr_client,
clock=_StubClock(),
)
assert len(runtime.deserialize_calls) == 1
entry = runtime.deserialize_calls[0]
assert entry.extras["model_name"] == "net_vlad"
def test_architecture_factory_closure_carries_descriptor_dim() -> None:
factory = architecture_factory(descriptor_dim=4096)
assert callable(factory)
assert MODEL_NAME == "net_vlad"
def test_architecture_factory_rejects_invalid_descriptor_dim() -> None:
with pytest.raises(ValueError, match=r">= 1"):
architecture_factory(descriptor_dim=0)
# ---------------------------------------------------------------------------
# AC-11: BUILD_PYTORCH_RUNTIME=OFF → ConfigError fail-fast
# ---------------------------------------------------------------------------
def test_ac11_non_pytorch_runtime_rejected_at_create() -> None:
runtime = _FakeInferenceRuntime(
descriptor_dim=4096, runtime_label="tensorrt"
)
descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
fdr_client = _make_fdr_client()
config = _build_config(netvlad_descriptor_dim=4096)
with pytest.raises(
ConfigError, match=r"BUILD_PYTORCH_RUNTIME=OFF"
):
create(
config,
descriptor_index=descriptor_index,
inference_runtime=runtime,
fdr_client=fdr_client,
clock=_StubClock(),
)
def test_ac11_onnx_trt_ep_runtime_also_rejected() -> None:
runtime = _FakeInferenceRuntime(
descriptor_dim=4096, runtime_label="onnx_trt_ep"
)
config = _build_config(netvlad_descriptor_dim=4096)
with pytest.raises(ConfigError, match=r"onnx_trt_ep"):
create(
config,
descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=4096),
inference_runtime=runtime,
fdr_client=_make_fdr_client(),
clock=_StubClock(),
)
# ---------------------------------------------------------------------------
# Preprocessor contract
# ---------------------------------------------------------------------------
def test_preprocessor_output_shape_and_dtype() -> None:
pp = NetVladBackbonePreprocessor()
rng = np.random.default_rng(2026)
image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8)
frame = NavCameraFrame(
frame_id=1,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=image,
camera_calibration_id="cam",
)
out = pp.preprocess(frame, _make_calibration())
assert out.shape == (1, 3, 480, 480)
assert out.dtype == np.float16
def test_preprocessor_input_shape_is_480x480() -> None:
pp = NetVladBackbonePreprocessor()
assert pp.input_shape() == (480, 480)
def test_preprocessor_protocol_conformance() -> None:
pp = NetVladBackbonePreprocessor()
assert isinstance(pp, BackbonePreprocessor)
def test_preprocessor_accepts_grayscale_input() -> None:
pp = NetVladBackbonePreprocessor()
gray = np.zeros((512, 512), dtype=np.uint8)
frame = NavCameraFrame(
frame_id=1,
timestamp=datetime(2026, 5, 13, 12, 0, 0),
image=gray,
camera_calibration_id="cam",
)
out = pp.preprocess(frame, _make_calibration())
assert out.shape == (1, 3, 480, 480)
# ---------------------------------------------------------------------------
# Constructor validation
# ---------------------------------------------------------------------------
def _make_minimal_strategy_kwargs(
*, descriptor_dim: int, num_clusters: int
) -> dict[str, Any]:
"""Build NetVladStrategy constructor kwargs that pass FaissBridge guards.
The strategy carries its own ``descriptor_dim`` validation; the
bridge has a separate (stricter) ``descriptor_dim > 0`` guard.
Tests that exercise the strategy's own validators MUST use a bridge
with a valid dim — the descriptor_dim mismatch between the bridge
and strategy is fine for these targeted-failure tests.
"""
fdr_client = _make_fdr_client()
clock = _StubClock()
bridge = FaissBridge(
descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=4096),
descriptor_dim=4096,
warn_top1_threshold=0.30,
debug_log_per_frame_distances=False,
fdr_client=fdr_client,
logger=logging.getLogger("test.bridge.guard"),
clock=clock,
)
return {
"inference_runtime": _FakeInferenceRuntime(descriptor_dim=4096),
"engine_handle": _FakeEngineHandle(),
"descriptor_index": _FakeDescriptorIndex(),
"preprocessor": NetVladBackbonePreprocessor(),
"normaliser": DescriptorNormaliser(),
"faiss_bridge": bridge,
"fdr_client": fdr_client,
"clock": clock,
"logger": logging.getLogger("test.net_vlad.guard"),
"descriptor_dim": descriptor_dim,
"num_clusters": num_clusters,
}
def test_constructor_rejects_zero_descriptor_dim() -> None:
with pytest.raises(ValueError, match=r">= 1"):
NetVladStrategy(
**_make_minimal_strategy_kwargs(descriptor_dim=0, num_clusters=64)
)
def test_constructor_rejects_zero_num_clusters() -> None:
with pytest.raises(ValueError, match=r"num_clusters"):
NetVladStrategy(
**_make_minimal_strategy_kwargs(
descriptor_dim=4096, num_clusters=0
)
)
def test_constructor_rejects_non_divisible_descriptor_dim() -> None:
# 4097 not divisible by 64 clusters
with pytest.raises(ValueError, match=r"divisible"):
NetVladStrategy(
**_make_minimal_strategy_kwargs(
descriptor_dim=4097, num_clusters=64
)
)
# ---------------------------------------------------------------------------
# FDR record emission
# ---------------------------------------------------------------------------
def test_embed_query_emits_vpr_embed_query_fdr_record() -> None:
fdr_client = _make_fdr_client()
strategy = _build_strategy(fdr_client=fdr_client)
strategy.embed_query(_make_frame(), _make_calibration())
records = []
while True:
r = fdr_client.pop_one()
if r is None:
break
records.append(r)
embed_records = [r for r in records if r.kind == "vpr.embed_query"]
assert len(embed_records) == 1
payload = embed_records[0].payload
assert payload["backbone_label"] == "net_vlad"
assert payload["descriptor_dim"] == 4096
assert isinstance(payload["latency_us"], int)
assert payload["latency_us"] > 0
def test_fdr_record_kinds_registered() -> None:
from gps_denied_onboard.fdr_client.records import KNOWN_KINDS
assert "vpr.embed_query" in KNOWN_KINDS
assert "vpr.backbone_error" in KNOWN_KINDS
assert "vpr.preprocess_error" in KNOWN_KINDS