"""AZ-338 — NetVLAD mandatory simple-baseline VprStrategy unit tests. Covers AC-1..AC-11 + preprocessor contract + intra-cluster normalisation ordering. Uses fakes for :class:`InferenceRuntime`, :class:`DescriptorIndexCut`, and :class:`FdrClient` so the suite stays AZ-507-clean and torch-free. """ from __future__ import annotations import logging from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Literal from unittest.mock import MagicMock import numpy as np import pytest from gps_denied_onboard._types.calibration import CameraCalibration # noqa: E402 @pytest.fixture(autouse=True) def _reset_c7_architecture_registry() -> Any: """Isolate the C7 architecture registry per test. ``register_architecture`` rejects re-registration with a *different* factory under the same key; the NetVLAD ``create()`` factory builds a fresh closure per call, so the global singleton accumulates across tests and triggers a false-positive ValueError. Resetting the dict around each test mirrors a fresh process boot. 
""" from gps_denied_onboard.components.c7_inference.architecture_registry import ( _DEFAULT_REGISTRY, ) snapshot = dict(_DEFAULT_REGISTRY) _DEFAULT_REGISTRY.clear() yield _DEFAULT_REGISTRY.clear() _DEFAULT_REGISTRY.update(snapshot) from gps_denied_onboard._types.inference import ( BuildConfig, EngineCacheEntry, EngineHandle, PrecisionMode, ) from gps_denied_onboard._types.nav import NavCameraFrame from gps_denied_onboard._types.vpr import VprQuery from gps_denied_onboard.components.c2_vpr import ( C2VprConfig, VprStrategy, ) from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge from gps_denied_onboard.components.c2_vpr._preprocessor import ( BackbonePreprocessor, ) from gps_denied_onboard.components.c2_vpr._preprocessor_net_vlad import ( NetVladBackbonePreprocessor, ) from gps_denied_onboard.components.c2_vpr.errors import ( VprBackboneError, VprPreprocessError, ) from gps_denied_onboard.components.c2_vpr.net_vlad import ( MODEL_NAME, NetVladStrategy, architecture_factory, create, ) from gps_denied_onboard.config.schema import Config, ConfigError from gps_denied_onboard.fdr_client import FdrClient from gps_denied_onboard.fdr_client.records import ( CURRENT_SCHEMA_VERSION, FdrRecord, ) from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser # --------------------------------------------------------------------------- # Fakes # --------------------------------------------------------------------------- @dataclass class _StubClock: next_monotonic_ns: int = 1_000_000_000 step_ns: int = 5_000 fixed_time_ns: int = 1_715_600_000_000_000_000 def monotonic_ns(self) -> int: v = self.next_monotonic_ns self.next_monotonic_ns += self.step_ns return v def time_ns(self) -> int: return self.fixed_time_ns def sleep_until_ns(self, target_ns: int) -> None: _ = target_ns class _FakeEngineHandle(EngineHandle): """Minimal :class:`EngineHandle` for test wiring.""" def __init__(self, label: str = "net_vlad") -> None: self.label = label 
@dataclass
class _FakeInferenceRuntime:
    """Configurable :class:`InferenceRuntime` for unit tests.

    ``fixed_output``, when set, is returned (copied) by :meth:`infer` under
    ``output_key``; otherwise a seeded pseudo-random tensor is produced.
    ``raises``, when set, is raised by :meth:`infer` instead of returning.
    ``runtime_label`` is what :meth:`current_runtime_label` reports and
    controls AC-11. ``calls`` and ``deserialize_calls`` record the arguments
    seen by :meth:`infer` and :meth:`deserialize_engine`.
    """

    descriptor_dim: int = 4096
    raises: BaseException | None = None
    runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = (
        "pytorch_fp16"
    )
    fixed_output: np.ndarray | None = None
    output_key: str = "vlad_descriptor"
    calls: list[dict[str, np.ndarray]] = field(default_factory=list)
    deserialize_calls: list[EngineCacheEntry] = field(default_factory=list)

    def compile_engine(
        self, model_path: Path, build_config: BuildConfig
    ) -> EngineCacheEntry:
        """Return a placeholder cache entry pointing at ``model_path``."""
        _ = build_config
        return EngineCacheEntry(
            engine_path=Path(model_path),
            sha256_hex="0" * 64,
            sm=None,
            jp=None,
            trt=None,
            precision=PrecisionMode.FP16,
            # Deliberately NOT "net_vlad": the AC-10 test checks that
            # ``create()`` overrides this value.
            extras={"model_name": "from_filename_stem"},
        )

    def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle:
        """Record ``entry`` and hand back a fake handle labelled from it."""
        self.deserialize_calls.append(entry)
        return _FakeEngineHandle(label=entry.extras.get("model_name", ""))

    def infer(
        self,
        handle: EngineHandle,
        inputs: dict[str, np.ndarray],
    ) -> dict[str, np.ndarray]:
        """Record a copy of ``inputs`` and return the configured output.

        Raises ``self.raises`` when it is set. Otherwise returns a copy of
        ``fixed_output`` when set, else a float16 tensor of shape
        ``(1, descriptor_dim)`` drawn from a fixed-seed generator.
        """
        _ = handle
        self.calls.append({k: v.copy() for k, v in inputs.items()})
        if self.raises is not None:
            raise self.raises
        if self.fixed_output is not None:
            return {self.output_key: self.fixed_output.copy()}
        # Fresh generator per call with a fixed seed => identical output
        # on every call.
        rng = np.random.default_rng(0xDEADBEEF)
        tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16)
        return {
            self.output_key: tensor.reshape(1, self.descriptor_dim).copy()
        }

    def release_engine(self, handle: EngineHandle) -> None:
        """No-op."""
        _ = handle

    def thermal_state(self) -> Any:
        """Unsupported in this fake; always raises ``NotImplementedError``."""
        raise NotImplementedError("not used by NetVLAD strategy tests")

    def current_runtime_label(
        self,
    ) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]:
        """Return the configured ``runtime_label``."""
        return self.runtime_label


@dataclass
class _FakeDescriptorIndex:
    """Descriptor index fake with canned or synthesised top-k results."""

    descriptor_dim_value: int = 4096
    results: list[tuple[tuple[int, float, float], float]] = field(default_factory=list)

    def search_topk(
        self, query: np.ndarray, k: int
    ) -> list[tuple[tuple[int, float, float], float]]:
        """Return the first ``k`` canned results, or ``k`` synthetic ones.

        When ``results`` is empty, ``k`` entries are synthesised whose second
        tuple element (the value the AC-5 test expects to be sorted
        ascending) grows as ``0.05 + 0.05 * i``.
        """
        _ = query
        if not self.results:
            return [
                ((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i)
                for i in range(k)
            ]
        return list(self.results[:k])

    def descriptor_dim(self) -> int:
        """Return the configured descriptor dimension."""
        return self.descriptor_dim_value


class _SpyDescriptorNormaliser(DescriptorNormaliser):
    """Records the order of ``intra_cluster_normalise`` / ``l2_normalise`` calls."""

    def __init__(self) -> None:
        self.call_order: list[str] = []

    def intra_cluster_normalise(  # type: ignore[override]
        self, descriptor: np.ndarray, num_clusters: int
    ) -> np.ndarray:
        """Log the call, then delegate to the real implementation."""
        self.call_order.append("intra_cluster_normalise")
        return DescriptorNormaliser.intra_cluster_normalise(
            descriptor, num_clusters
        )

    def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray:  # type: ignore[override]
        """Log the call, then delegate to the real implementation."""
        self.call_order.append("l2_normalise")
        return DescriptorNormaliser.l2_normalise(descriptor)


def _make_frame(*, frame_id: int = 4242, h: int = 720, w: int = 1280) -> NavCameraFrame:
    """Build a frame with a reproducible random ``(h, w, 3)`` uint8 image.

    The generator is seeded with ``frame_id``, so equal ids give equal images.
    """
    rng = np.random.default_rng(frame_id)
    image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8)
    return NavCameraFrame(
        frame_id=frame_id,
        timestamp=datetime(2026, 5, 13, 12, 0, 0),
        image=image,
        camera_calibration_id="test_cam",
    )


def _make_calibration() -> CameraCalibration:
    """Build a calibration with identity intrinsics/extrinsics and zero distortion."""
    return CameraCalibration(
        camera_id="test_cam",
        intrinsics_3x3=np.eye(3, dtype=np.float64),
        distortion=np.zeros(5, dtype=np.float64),
        body_to_camera_se3=np.eye(4, dtype=np.float64),
        acquisition_method="test_fixture",
    )


def _make_fdr_client() -> FdrClient:
    """Build an :class:`FdrClient` for producer ``c2_vpr`` with capacity 32."""
    return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False)


def _drain_fdr_records(fdr_client: FdrClient) -> list[Any]:
    """Pop records from ``fdr_client`` until ``pop_one()`` returns ``None``.

    Returns the popped records in pop order.
    """
    records: list[Any] = []
    while True:
        record = fdr_client.pop_one()
        if record is None:
            return records
        records.append(record)


def _build_strategy(
    *,
    descriptor_dim: int = 4096,
    num_clusters: int = 64,
    inference_runtime: _FakeInferenceRuntime | None = None,
    descriptor_index: _FakeDescriptorIndex | None = None,
    normaliser: DescriptorNormaliser | None = None,
    preprocessor: NetVladBackbonePreprocessor | None = None,
    fdr_client: FdrClient | None = None,
    clock: _StubClock | None = None,
) -> NetVladStrategy:
    """Wire a :class:`NetVladStrategy` from fakes, honouring any overrides.

    Every collaborator left as ``None`` is replaced by a default fake/real
    instance sized by ``descriptor_dim``. Defaults are chosen with explicit
    ``is None`` checks so that a caller-supplied collaborator is never
    swapped out merely because it evaluates as falsy.
    """
    if inference_runtime is None:
        inference_runtime = _FakeInferenceRuntime(descriptor_dim=descriptor_dim)
    if descriptor_index is None:
        descriptor_index = _FakeDescriptorIndex(
            descriptor_dim_value=descriptor_dim
        )
    if normaliser is None:
        normaliser = DescriptorNormaliser()
    if preprocessor is None:
        preprocessor = NetVladBackbonePreprocessor()
    if fdr_client is None:
        fdr_client = _make_fdr_client()
    if clock is None:
        clock = _StubClock()
    handle = _FakeEngineHandle()
    # The bridge shares the same index, FDR client and clock as the strategy.
    bridge = FaissBridge(
        descriptor_index=descriptor_index,
        descriptor_dim=descriptor_dim,
        warn_top1_threshold=0.30,
        debug_log_per_frame_distances=False,
        fdr_client=fdr_client,
        logger=logging.getLogger("test.bridge"),
        clock=clock,
    )
    return NetVladStrategy(
        inference_runtime=inference_runtime,
        engine_handle=handle,
        descriptor_index=descriptor_index,
        preprocessor=preprocessor,
        normaliser=normaliser,
        faiss_bridge=bridge,
        fdr_client=fdr_client,
        clock=clock,
        logger=logging.getLogger("test.net_vlad"),
        descriptor_dim=descriptor_dim,
        num_clusters=num_clusters,
    )


# ---------------------------------------------------------------------------
# AC-1: Protocol conformance
# ---------------------------------------------------------------------------


def test_ac1_protocol_conformance() -> None:
    """A built strategy passes ``isinstance(..., VprStrategy)``."""
    strategy = _build_strategy()
    assert isinstance(strategy, VprStrategy)


# ---------------------------------------------------------------------------
# AC-2: embed_query → L2-normalised FP16 (descriptor_dim,)
# ---------------------------------------------------------------------------


def test_ac2_embed_query_returns_unit_norm_fp16_descriptor() -> None:
    """The 4096-d embedding is float16, flat, and has unit L2 norm."""
    # Arrange
    runtime = _FakeInferenceRuntime(descriptor_dim=4096)
    strategy = _build_strategy(inference_runtime=runtime)
    frame = _make_frame()
    calibration = _make_calibration()

    # Act
    query = strategy.embed_query(frame, calibration)

    # Assert
    embedding = np.asarray(query.embedding)
    assert embedding.shape == (4096,)
    assert embedding.dtype == np.float16
    # Norm is computed in float32 to avoid float16 accumulation error.
    norm = float(np.linalg.norm(embedding.astype(np.float32)))
    assert norm == pytest.approx(1.0, abs=1e-3)


def test_ac2_embed_query_works_with_512_pca_whitened() -> None:
    """The same unit-norm contract holds for a 512-d configuration."""
    runtime = _FakeInferenceRuntime(descriptor_dim=512)
    strategy = _build_strategy(descriptor_dim=512, inference_runtime=runtime)
    query = strategy.embed_query(_make_frame(), _make_calibration())
    embedding = np.asarray(query.embedding)
    assert embedding.shape == (512,)
    assert float(np.linalg.norm(embedding.astype(np.float32))) == pytest.approx(
        1.0, abs=1e-3
    )


# ---------------------------------------------------------------------------
# AC-3: intra_cluster THEN l2 normalisation order
# ---------------------------------------------------------------------------


def test_ac3_intra_cluster_called_before_global_l2() -> None:
    """The spy sees intra-cluster normalisation first, then global L2."""
    spy = _SpyDescriptorNormaliser()
    runtime = _FakeInferenceRuntime(descriptor_dim=4096)
    strategy = _build_strategy(inference_runtime=runtime, normaliser=spy)
    strategy.embed_query(_make_frame(), _make_calibration())
    assert spy.call_order == ["intra_cluster_normalise", "l2_normalise"]


def test_ac3_intra_cluster_and_l2_each_called_exactly_once() -> None:
    """Each normalisation step is invoked exactly once per ``embed_query``."""
    spy = _SpyDescriptorNormaliser()
    runtime = _FakeInferenceRuntime(descriptor_dim=4096)
    strategy = _build_strategy(inference_runtime=runtime, normaliser=spy)
    strategy.embed_query(_make_frame(), _make_calibration())
    assert spy.call_order.count("intra_cluster_normalise") == 1
    assert spy.call_order.count("l2_normalise") == 1


# ---------------------------------------------------------------------------
# AC-4: deterministic
# ---------------------------------------------------------------------------


def test_ac4_embed_query_deterministic_for_same_frame() -> None:
    """Three embeddings of the same frame are element-wise identical."""
    fixed_output = np.zeros((1, 4096), dtype=np.float16)
    rng = np.random.default_rng(2026)
    fixed_output[0] = rng.standard_normal(4096).astype(np.float16)
    runtime = _FakeInferenceRuntime(
        descriptor_dim=4096, fixed_output=fixed_output
    )
    strategy = _build_strategy(inference_runtime=runtime)
    frame = _make_frame()
    calibration = _make_calibration()

    first = strategy.embed_query(frame, calibration)
    second = strategy.embed_query(frame, calibration)
    third = strategy.embed_query(frame, calibration)

    np.testing.assert_array_equal(
        np.asarray(first.embedding), np.asarray(second.embedding)
    )
    np.testing.assert_array_equal(
        np.asarray(second.embedding), np.asarray(third.embedding)
    )


# ---------------------------------------------------------------------------
# AC-5: retrieve_topk returns k candidates with backbone_label="net_vlad"
# ---------------------------------------------------------------------------


def test_ac5_retrieve_topk_returns_exactly_k_with_net_vlad_label() -> None:
    """``retrieve_topk(k=10)`` yields 10 labelled candidates, sorted by distance."""
    descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
    strategy = _build_strategy(descriptor_index=descriptor_index)
    query = strategy.embed_query(_make_frame(), _make_calibration())

    result = strategy.retrieve_topk(query, k=10)

    assert len(result.candidates) == 10
    assert result.backbone_label == "net_vlad"
    assert result.candidates[0].descriptor_dim == 4096
    distances = [c.descriptor_distance for c in result.candidates]
    assert distances == sorted(distances)


# ---------------------------------------------------------------------------
# AC-6: descriptor_dim() is config-driven and stable
# ---------------------------------------------------------------------------


def test_ac6_descriptor_dim_stable_for_4096_instance() -> None:
    """``descriptor_dim()`` returns 4096 on each of 100 calls."""
    strategy = _build_strategy(descriptor_dim=4096)
    for _ in range(100):
        assert strategy.descriptor_dim() == 4096


def test_ac6_descriptor_dim_stable_for_512_instance() -> None:
    """``descriptor_dim()`` returns 512 on each of 100 calls."""
    strategy = _build_strategy(descriptor_dim=512)
    for _ in range(100):
        assert strategy.descriptor_dim() == 512


# ---------------------------------------------------------------------------
# AC-7: Engine output shape mismatch at create() → ConfigError
# ---------------------------------------------------------------------------


def test_ac7_create_rejects_engine_output_shape_mismatch(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``create()`` raises ``ConfigError`` when the engine emits the wrong width."""
    # Arrange — engine produces (1, 2048) but config wants 4096
    wrong_output = np.zeros((1, 2048), dtype=np.float16)
    runtime = _FakeInferenceRuntime(
        descriptor_dim=4096, fixed_output=wrong_output
    )
    descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
    fdr_client = _make_fdr_client()
    config = _build_config(netvlad_descriptor_dim=4096)

    # Act + Assert
    with pytest.raises(ConfigError, match=r"engine output shape mismatch"):
        create(
            config,
            descriptor_index=descriptor_index,
            inference_runtime=runtime,
            fdr_client=fdr_client,
            clock=_StubClock(),
        )


# ---------------------------------------------------------------------------
# AC-8: VprBackboneError on forward failure
# ---------------------------------------------------------------------------


def test_ac8_backbone_raises_runtime_error_yields_vpr_backbone_error(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """A runtime failure surfaces as ``VprBackboneError`` + ERROR log + one FDR record."""
    runtime = _FakeInferenceRuntime(
        descriptor_dim=4096, raises=RuntimeError("CUDA OOM")
    )
    fdr_client = _make_fdr_client()
    strategy = _build_strategy(
        inference_runtime=runtime, fdr_client=fdr_client
    )

    with caplog.at_level(logging.ERROR, logger="test.net_vlad"), pytest.raises(
        VprBackboneError
    ):
        strategy.embed_query(_make_frame(), _make_calibration())

    assert any(
        record.levelno == logging.ERROR
        and getattr(record, "kind", None) == "c2.vpr.backbone_error"
        for record in caplog.records
    )
    # Assert exactly one FDR record for the backbone error
    records = _drain_fdr_records(fdr_client)
    backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"]
    assert len(backbone_errors) == 1


def test_ac8_unknown_forward_output_key_yields_vpr_backbone_error() -> None:
    """An output dict lacking ``vlad_descriptor`` raises ``VprBackboneError``."""
    runtime = _FakeInferenceRuntime(descriptor_dim=4096, output_key="not_vlad")
    strategy = _build_strategy(inference_runtime=runtime)
    with pytest.raises(VprBackboneError, match=r"'vlad_descriptor' key"):
        strategy.embed_query(_make_frame(), _make_calibration())


def test_ac8_wrong_forward_output_shape_yields_vpr_backbone_error() -> None:
    """A ``(1, 2048)`` output against a 4096-d strategy raises ``VprBackboneError``."""
    bad = np.zeros((1, 2048), dtype=np.float16)
    runtime = _FakeInferenceRuntime(descriptor_dim=4096, fixed_output=bad)
    strategy = _build_strategy(descriptor_dim=4096, inference_runtime=runtime)
    with pytest.raises(VprBackboneError, match=r"expected \(1, 4096\)"):
        strategy.embed_query(_make_frame(), _make_calibration())


# ---------------------------------------------------------------------------
# AC-9: VprPreprocessError on corrupt image
# ---------------------------------------------------------------------------


def test_ac9_corrupt_image_yields_vpr_preprocess_error(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """A non-array image surfaces as ``VprPreprocessError`` + ERROR log + one FDR record."""
    fdr_client = _make_fdr_client()
    strategy = _build_strategy(fdr_client=fdr_client)
    frame = NavCameraFrame(
        frame_id=4242,
        timestamp=datetime(2026, 5, 13, 12, 0, 0),
        image="not-an-array",
        camera_calibration_id="test_cam",
    )

    with caplog.at_level(logging.ERROR, logger="test.net_vlad"), pytest.raises(
        VprPreprocessError
    ):
        strategy.embed_query(frame, _make_calibration())

    assert any(
        record.levelno == logging.ERROR
        and getattr(record, "kind", None) == "c2.vpr.preprocess_error"
        for record in caplog.records
    )
    records = _drain_fdr_records(fdr_client)
    preprocess_errors = [
        r for r in records if r.kind == "vpr.preprocess_error"
    ]
    assert len(preprocess_errors) == 1


def test_ac9_wrong_dtype_image_yields_vpr_preprocess_error() -> None:
    """A float32 image raises ``VprPreprocessError`` mentioning ``uint8``."""
    strategy = _build_strategy()
    bad_image = np.zeros((720, 1280, 3), dtype=np.float32)
    frame = NavCameraFrame(
        frame_id=42,
        timestamp=datetime(2026, 5, 13, 12, 0, 0),
        image=bad_image,
        camera_calibration_id="test_cam",
    )
    with pytest.raises(VprPreprocessError, match=r"uint8"):
        strategy.embed_query(frame, _make_calibration())


def test_ac9_wrong_shape_image_yields_vpr_preprocess_error() -> None:
    """A 4-channel image raises ``VprPreprocessError`` mentioning ``(H,W)``."""
    strategy = _build_strategy()
    bad_image = np.zeros((720, 1280, 4), dtype=np.uint8)
    frame = NavCameraFrame(
        frame_id=42,
        timestamp=datetime(2026, 5, 13, 12, 0, 0),
        image=bad_image,
        camera_calibration_id="test_cam",
    )
    with pytest.raises(VprPreprocessError, match=r"\(H,W\)"):
        strategy.embed_query(frame, _make_calibration())


# ---------------------------------------------------------------------------
# AC-10: composition-root wiring → INFO log "c2.vpr.ready"
# ---------------------------------------------------------------------------


def _build_config(*, netvlad_descriptor_dim: int = 4096) -> Config:
    """Minimal Config carrying only the c2_vpr block needed by ``create()``."""
    c2 = C2VprConfig(
        strategy="net_vlad",
        backbone_weights_path=Path("/models/net_vlad.pth"),
        faiss_index_path=Path("/cache/vpr/index.faiss"),
        warn_top1_threshold=0.30,
        debug_per_frame_distances=False,
        netvlad_descriptor_dim=netvlad_descriptor_dim,
    )
    # A spec'd mock stands in for the full Config; only ``components`` is set.
    cfg = MagicMock(spec=Config)
    cfg.components = {"c2_vpr": c2}
    return cfg


def test_ac10_create_emits_strategy_ready_info_log(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """``create()`` returns a strategy and logs exactly one ``c2.vpr.ready`` record."""
    runtime = _FakeInferenceRuntime(descriptor_dim=4096)
    descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
    fdr_client = _make_fdr_client()
    config = _build_config(netvlad_descriptor_dim=4096)
    logger = logging.getLogger("test.create.ac10")

    with caplog.at_level(logging.INFO, logger="test.create.ac10"):
        strategy = create(
            config,
            descriptor_index=descriptor_index,
            inference_runtime=runtime,
            fdr_client=fdr_client,
            clock=_StubClock(),
            logger=logger,
        )

    assert isinstance(strategy, NetVladStrategy)
    assert strategy.descriptor_dim() == 4096
    ready_logs = [
        r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready"
    ]
    assert len(ready_logs) == 1
    kv = ready_logs[0].kv  # type: ignore[attr-defined]
    assert kv["strategy"] == "net_vlad"
    assert kv["descriptor_dim"] == 4096


def test_ac10_create_forces_model_name_to_net_vlad() -> None:
    """The entry passed to ``deserialize_engine`` carries ``model_name == "net_vlad"``."""
    runtime = _FakeInferenceRuntime(descriptor_dim=4096)
    descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
    fdr_client = _make_fdr_client()
    config = _build_config(netvlad_descriptor_dim=4096)

    create(
        config,
        descriptor_index=descriptor_index,
        inference_runtime=runtime,
        fdr_client=fdr_client,
        clock=_StubClock(),
    )

    assert len(runtime.deserialize_calls) == 1
    entry = runtime.deserialize_calls[0]
    assert entry.extras["model_name"] == "net_vlad"


def test_architecture_factory_closure_carries_descriptor_dim() -> None:
    """``architecture_factory`` returns a callable; ``MODEL_NAME`` is ``"net_vlad"``."""
    factory = architecture_factory(descriptor_dim=4096)
    assert callable(factory)
    assert MODEL_NAME == "net_vlad"


def test_architecture_factory_rejects_invalid_descriptor_dim() -> None:
    """``descriptor_dim=0`` is rejected with a ``ValueError``."""
    with pytest.raises(ValueError, match=r">= 1"):
        architecture_factory(descriptor_dim=0)


# ---------------------------------------------------------------------------
# AC-11: BUILD_PYTORCH_RUNTIME=OFF → ConfigError fail-fast
# ---------------------------------------------------------------------------


def test_ac11_non_pytorch_runtime_rejected_at_create() -> None:
    """A ``tensorrt`` runtime label makes ``create()`` raise ``ConfigError``."""
    runtime = _FakeInferenceRuntime(
        descriptor_dim=4096, runtime_label="tensorrt"
    )
    descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=4096)
    fdr_client = _make_fdr_client()
    config = _build_config(netvlad_descriptor_dim=4096)

    with pytest.raises(
        ConfigError, match=r"BUILD_PYTORCH_RUNTIME=OFF"
    ):
        create(
            config,
            descriptor_index=descriptor_index,
            inference_runtime=runtime,
            fdr_client=fdr_client,
            clock=_StubClock(),
        )


def test_ac11_onnx_trt_ep_runtime_also_rejected() -> None:
    """An ``onnx_trt_ep`` runtime label makes ``create()`` raise ``ConfigError``."""
    runtime = _FakeInferenceRuntime(
        descriptor_dim=4096, runtime_label="onnx_trt_ep"
    )
    config = _build_config(netvlad_descriptor_dim=4096)
    with pytest.raises(ConfigError, match=r"onnx_trt_ep"):
        create(
            config,
            descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=4096),
            inference_runtime=runtime,
            fdr_client=_make_fdr_client(),
            clock=_StubClock(),
        )


# ---------------------------------------------------------------------------
# Preprocessor contract
# ---------------------------------------------------------------------------


def test_preprocessor_output_shape_and_dtype() -> None:
    """A 720x1280 RGB frame preprocesses to float16 ``(1, 3, 480, 480)``."""
    pp = NetVladBackbonePreprocessor()
    rng = np.random.default_rng(2026)
    image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8)
    frame = NavCameraFrame(
        frame_id=1,
        timestamp=datetime(2026, 5, 13, 12, 0, 0),
        image=image,
        camera_calibration_id="cam",
    )

    out = pp.preprocess(frame, _make_calibration())

    assert out.shape == (1, 3, 480, 480)
    assert out.dtype == np.float16


def test_preprocessor_input_shape_is_480x480() -> None:
    """``input_shape()`` reports ``(480, 480)``."""
    pp = NetVladBackbonePreprocessor()
    assert pp.input_shape() == (480, 480)


def test_preprocessor_protocol_conformance() -> None:
    """The preprocessor passes ``isinstance(..., BackbonePreprocessor)``."""
    pp = NetVladBackbonePreprocessor()
    assert isinstance(pp, BackbonePreprocessor)


def test_preprocessor_accepts_grayscale_input() -> None:
    """A 2-D uint8 image still yields a ``(1, 3, 480, 480)`` tensor."""
    pp = NetVladBackbonePreprocessor()
    gray = np.zeros((512, 512), dtype=np.uint8)
    frame = NavCameraFrame(
        frame_id=1,
        timestamp=datetime(2026, 5, 13, 12, 0, 0),
        image=gray,
        camera_calibration_id="cam",
    )
    out = pp.preprocess(frame, _make_calibration())
    assert out.shape == (1, 3, 480, 480)


# ---------------------------------------------------------------------------
# Constructor validation
# ---------------------------------------------------------------------------


def _make_minimal_strategy_kwargs(
    *, descriptor_dim: int, num_clusters: int
) -> dict[str, Any]:
    """Build NetVladStrategy constructor kwargs that pass FaissBridge guards.

    The strategy carries its own ``descriptor_dim`` validation; the bridge
    has a separate (stricter) ``descriptor_dim > 0`` guard. Tests that
    exercise the strategy's own validators MUST use a bridge with a valid
    dim — the descriptor_dim mismatch between the bridge and strategy is
    fine for these targeted-failure tests.
    """
    fdr_client = _make_fdr_client()
    clock = _StubClock()
    # The bridge is always built with a valid 4096 dim, regardless of the
    # (possibly invalid) ``descriptor_dim`` under test.
    bridge = FaissBridge(
        descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=4096),
        descriptor_dim=4096,
        warn_top1_threshold=0.30,
        debug_log_per_frame_distances=False,
        fdr_client=fdr_client,
        logger=logging.getLogger("test.bridge.guard"),
        clock=clock,
    )
    return {
        "inference_runtime": _FakeInferenceRuntime(descriptor_dim=4096),
        "engine_handle": _FakeEngineHandle(),
        "descriptor_index": _FakeDescriptorIndex(),
        "preprocessor": NetVladBackbonePreprocessor(),
        "normaliser": DescriptorNormaliser(),
        "faiss_bridge": bridge,
        "fdr_client": fdr_client,
        "clock": clock,
        "logger": logging.getLogger("test.net_vlad.guard"),
        "descriptor_dim": descriptor_dim,
        "num_clusters": num_clusters,
    }


def test_constructor_rejects_zero_descriptor_dim() -> None:
    """``descriptor_dim=0`` is rejected with a ``ValueError``."""
    with pytest.raises(ValueError, match=r">= 1"):
        NetVladStrategy(
            **_make_minimal_strategy_kwargs(descriptor_dim=0, num_clusters=64)
        )


def test_constructor_rejects_zero_num_clusters() -> None:
    """``num_clusters=0`` is rejected with a ``ValueError``."""
    with pytest.raises(ValueError, match=r"num_clusters"):
        NetVladStrategy(
            **_make_minimal_strategy_kwargs(
                descriptor_dim=4096, num_clusters=0
            )
        )


def test_constructor_rejects_non_divisible_descriptor_dim() -> None:
    """A ``descriptor_dim`` not divisible by ``num_clusters`` is rejected."""
    # 4097 not divisible by 64 clusters
    with pytest.raises(ValueError, match=r"divisible"):
        NetVladStrategy(
            **_make_minimal_strategy_kwargs(
                descriptor_dim=4097, num_clusters=64
            )
        )


# ---------------------------------------------------------------------------
# FDR record emission
# ---------------------------------------------------------------------------


def test_embed_query_emits_vpr_embed_query_fdr_record() -> None:
    """One ``vpr.embed_query`` FDR record is emitted with the expected payload."""
    fdr_client = _make_fdr_client()
    strategy = _build_strategy(fdr_client=fdr_client)
    strategy.embed_query(_make_frame(), _make_calibration())

    records = _drain_fdr_records(fdr_client)
    embed_records = [r for r in records if r.kind == "vpr.embed_query"]
    assert len(embed_records) == 1
    payload = embed_records[0].payload
    assert payload["backbone_label"] == "net_vlad"
    assert payload["descriptor_dim"] == 4096
    assert isinstance(payload["latency_us"], int)
    assert payload["latency_us"] > 0


def test_fdr_record_kinds_registered() -> None:
    """All three VPR record kinds are present in ``KNOWN_KINDS``."""
    from gps_denied_onboard.fdr_client.records import KNOWN_KINDS

    assert "vpr.embed_query" in KNOWN_KINDS
    assert "vpr.backbone_error" in KNOWN_KINDS
    assert "vpr.preprocess_error" in KNOWN_KINDS