"""AZ-337 - UltraVPR primary VprStrategy unit tests. Covers AC-1..AC-12 + preprocessor contract + constructor validation + FDR record emission + single-stage L2 normalisation. Uses fakes for :class:`InferenceRuntimeCut`, :class:`DescriptorIndexCut`, and :class:`FdrClient` so the suite stays AZ-507-clean and TRT-free. """ from __future__ import annotations import logging from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Literal from unittest.mock import MagicMock import numpy as np import pytest from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.inference import ( BuildConfig, EngineCacheEntry, EngineHandle, PrecisionMode, ) from gps_denied_onboard._types.nav import NavCameraFrame from gps_denied_onboard.components.c2_vpr import ( C2VprConfig, IndexUnavailableError, VprStrategy, ) from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge from gps_denied_onboard.components.c2_vpr._preprocessor import ( BackbonePreprocessor, ) from gps_denied_onboard.components.c2_vpr._preprocessor_ultra_vpr import ( UltraVprBackbonePreprocessor, ) from gps_denied_onboard.components.c2_vpr.errors import ( VprBackboneError, VprPreprocessError, ) from gps_denied_onboard.components.c2_vpr.ultra_vpr import ( DESCRIPTOR_DIM, UltraVprStrategy, create, ) from gps_denied_onboard.config.schema import Config, ConfigError from gps_denied_onboard.fdr_client import FdrClient from gps_denied_onboard.helpers.descriptor_normaliser import DescriptorNormaliser # --------------------------------------------------------------------------- # Fakes # --------------------------------------------------------------------------- @dataclass class _StubClock: next_monotonic_ns: int = 1_000_000_000 step_ns: int = 5_000 fixed_time_ns: int = 1_715_600_000_000_000_000 def monotonic_ns(self) -> int: v = self.next_monotonic_ns self.next_monotonic_ns += self.step_ns return v def time_ns(self) -> int: return self.fixed_time_ns def sleep_until_ns(self, target_ns: int) -> None: _ = target_ns class _FakeEngineHandle(EngineHandle): """Minimal :class:`EngineHandle` for test wiring.""" def __init__(self, label: str = "ultra_vpr") -> None: self.label = label @dataclass class _FakeInferenceRuntime: """Configurable :class:`InferenceRuntimeCut` for unit tests. ``fixed_output`` is the array returned under ``embedding``; ``raises`` when set is raised instead. ``runtime_label`` controls AC-11. """ descriptor_dim: int = DESCRIPTOR_DIM raises: BaseException | None = None runtime_label: Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"] = ( "tensorrt" ) fixed_output: np.ndarray | None = None output_key: str = "embedding" calls: list[dict[str, np.ndarray]] = field(default_factory=list) deserialize_calls: list[EngineCacheEntry] = field(default_factory=list) def compile_engine( self, model_path: Path, build_config: BuildConfig ) -> EngineCacheEntry: _ = build_config return EngineCacheEntry( engine_path=Path(model_path), sha256_hex="0" * 64, sm=None, jp=None, trt=None, precision=PrecisionMode.FP16, extras={"model_name": "ultra_vpr"}, ) def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle: self.deserialize_calls.append(entry) return _FakeEngineHandle(label=entry.extras.get("model_name", "")) def infer( self, handle: EngineHandle, inputs: dict[str, np.ndarray], ) -> dict[str, np.ndarray]: _ = handle self.calls.append({k: v.copy() for k, v in inputs.items()}) if self.raises is not None: raise self.raises if self.fixed_output is not None: return {self.output_key: self.fixed_output.copy()} rng = np.random.default_rng(0xCAFEBABE) tensor = rng.standard_normal(self.descriptor_dim).astype(np.float16) return { self.output_key: tensor.reshape(1, self.descriptor_dim).copy() } def release_engine(self, handle: EngineHandle) -> None: _ = handle def current_runtime_label( self, ) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]: return self.runtime_label @dataclass class _FakeDescriptorIndex: descriptor_dim_value: int = DESCRIPTOR_DIM results: list[tuple[tuple[int, float, float], float]] = field( default_factory=list ) raises: BaseException | None = None def search_topk( self, query: np.ndarray, k: int ) -> list[tuple[tuple[int, float, float], float]]: _ = query if self.raises is not None: raise self.raises if not self.results: return [ ((18, 49.0 + i * 0.001, 36.0 + i * 0.001), 0.05 + 0.05 * i) for i in range(k) ] return list(self.results[:k]) def descriptor_dim(self) -> int: return self.descriptor_dim_value def _make_frame(*, frame_id: int = 4242, h: int = 720, w: int = 1280) -> NavCameraFrame: rng = np.random.default_rng(frame_id) image = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8) return NavCameraFrame( frame_id=frame_id, timestamp=datetime(2026, 5, 13, 12, 0, 0), image=image, camera_calibration_id="test_cam", ) def _make_calibration(*, cx: float = 640.0, cy: float = 360.0) -> CameraCalibration: """Return a calibration with a non-trivial principal point. The identity matrix used elsewhere in the tests collapses to ``(cx, cy) == (0, 0)`` which the preprocessor treats as "no calibration data" - here we set explicit values to exercise the principal-point-aware crop path. """ intrinsics = np.array( [ [1000.0, 0.0, cx], [0.0, 1000.0, cy], [0.0, 0.0, 1.0], ], dtype=np.float64, ) return CameraCalibration( camera_id="test_cam", intrinsics_3x3=intrinsics, distortion=np.zeros(5, dtype=np.float64), body_to_camera_se3=np.eye(4, dtype=np.float64), acquisition_method="test_fixture", ) def _make_calibration_identity() -> CameraCalibration: """Identity intrinsics - principal point collapses to (0, 0).""" return CameraCalibration( camera_id="test_cam", intrinsics_3x3=np.eye(3, dtype=np.float64), distortion=np.zeros(5, dtype=np.float64), body_to_camera_se3=np.eye(4, dtype=np.float64), acquisition_method="test_fixture", ) def _make_fdr_client() -> FdrClient: return FdrClient(producer_id="c2_vpr", capacity=32, _emit_diag_log=False) def _build_strategy( *, inference_runtime: _FakeInferenceRuntime | None = None, descriptor_index: _FakeDescriptorIndex | None = None, normaliser: DescriptorNormaliser | None = None, preprocessor: UltraVprBackbonePreprocessor | None = None, fdr_client: FdrClient | None = None, clock: _StubClock | None = None, descriptor_dim: int = DESCRIPTOR_DIM, ) -> UltraVprStrategy: inference_runtime = inference_runtime or _FakeInferenceRuntime( descriptor_dim=descriptor_dim ) descriptor_index = descriptor_index or _FakeDescriptorIndex( descriptor_dim_value=descriptor_dim ) normaliser = normaliser or DescriptorNormaliser() preprocessor = preprocessor or UltraVprBackbonePreprocessor() fdr_client = fdr_client or _make_fdr_client() clock = clock or _StubClock() handle = _FakeEngineHandle() bridge = FaissBridge( descriptor_index=descriptor_index, descriptor_dim=descriptor_dim, warn_top1_threshold=0.30, debug_log_per_frame_distances=False, fdr_client=fdr_client, logger=logging.getLogger("test.bridge"), clock=clock, ) return UltraVprStrategy( inference_runtime=inference_runtime, engine_handle=handle, descriptor_index=descriptor_index, preprocessor=preprocessor, normaliser=normaliser, faiss_bridge=bridge, fdr_client=fdr_client, clock=clock, logger=logging.getLogger("test.ultra_vpr"), descriptor_dim=descriptor_dim, ) def _build_config() -> Config: """Minimal Config carrying only the c2_vpr block needed by ``create()``.""" c2 = C2VprConfig( strategy="ultra_vpr", backbone_weights_path=Path("/models/ultra_vpr.trt"), faiss_index_path=Path("/cache/vpr/index.faiss"), warn_top1_threshold=0.30, debug_per_frame_distances=False, ) cfg = MagicMock(spec=Config) cfg.components = {"c2_vpr": c2} return cfg # --------------------------------------------------------------------------- # AC-1: Protocol conformance # --------------------------------------------------------------------------- def test_ac1_protocol_conformance() -> None: strategy = _build_strategy() assert isinstance(strategy, VprStrategy) # --------------------------------------------------------------------------- # AC-2: embed_query produces L2-normalised FP16 (512,) embedding # --------------------------------------------------------------------------- def test_ac2_embed_query_returns_unit_norm_fp16_512() -> None: # Arrange runtime = _FakeInferenceRuntime(descriptor_dim=DESCRIPTOR_DIM) strategy = _build_strategy(inference_runtime=runtime) frame = _make_frame() calibration = _make_calibration() # Act query = strategy.embed_query(frame, calibration) # Assert embedding = np.asarray(query.embedding) assert embedding.shape == (DESCRIPTOR_DIM,) assert embedding.dtype == np.float16 norm = float(np.linalg.norm(embedding.astype(np.float32))) assert norm == pytest.approx(1.0, abs=1e-3) def test_ac2_embedding_is_single_stage_l2_no_intra_cluster_path() -> None: """UltraVPR is single-stage L2 (unlike NetVLAD's two-stage chain). Calling :meth:`DescriptorNormaliser.intra_cluster_normalise` would be a bug; verify the strategy never invokes it. """ calls: list[str] = [] class _SpyNormaliser(DescriptorNormaliser): def l2_normalise(self, descriptor: np.ndarray) -> np.ndarray: # type: ignore[override] calls.append("l2_normalise") return DescriptorNormaliser.l2_normalise(descriptor) def intra_cluster_normalise( # type: ignore[override] self, descriptor: np.ndarray, num_clusters: int ) -> np.ndarray: calls.append("intra_cluster_normalise") return DescriptorNormaliser.intra_cluster_normalise( descriptor, num_clusters ) spy = _SpyNormaliser() strategy = _build_strategy(normaliser=spy) strategy.embed_query(_make_frame(), _make_calibration()) assert "intra_cluster_normalise" not in calls assert calls == ["l2_normalise"] # --------------------------------------------------------------------------- # AC-3: embed_query is deterministic # --------------------------------------------------------------------------- def test_ac3_embed_query_deterministic_for_same_frame() -> None: fixed = np.zeros((1, DESCRIPTOR_DIM), dtype=np.float16) rng = np.random.default_rng(2026) fixed[0] = rng.standard_normal(DESCRIPTOR_DIM).astype(np.float16) runtime = _FakeInferenceRuntime( descriptor_dim=DESCRIPTOR_DIM, fixed_output=fixed ) strategy = _build_strategy(inference_runtime=runtime) frame = _make_frame() calibration = _make_calibration() first = strategy.embed_query(frame, calibration) second = strategy.embed_query(frame, calibration) third = strategy.embed_query(frame, calibration) np.testing.assert_array_equal( np.asarray(first.embedding), np.asarray(second.embedding) ) np.testing.assert_array_equal( np.asarray(second.embedding), np.asarray(third.embedding) ) # --------------------------------------------------------------------------- # AC-4: retrieve_topk returns exactly k candidates sorted ascending # --------------------------------------------------------------------------- def test_ac4_retrieve_topk_returns_exactly_k_with_ultra_vpr_label() -> None: descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM) strategy = _build_strategy(descriptor_index=descriptor_index) query = strategy.embed_query(_make_frame(), _make_calibration()) result = strategy.retrieve_topk(query, k=10) assert len(result.candidates) == 10 assert result.backbone_label == "ultra_vpr" assert result.candidates[0].descriptor_dim == DESCRIPTOR_DIM distances = [c.descriptor_distance for c in result.candidates] assert distances == sorted(distances) # --------------------------------------------------------------------------- # AC-5: descriptor_dim() is stable and returns 512 # --------------------------------------------------------------------------- def test_ac5_descriptor_dim_stable_returns_512() -> None: strategy = _build_strategy() for _ in range(100): assert strategy.descriptor_dim() == 512 # --------------------------------------------------------------------------- # AC-6: Engine output shape mismatch at create() -> ConfigError # --------------------------------------------------------------------------- def test_ac6_create_rejects_engine_output_shape_mismatch() -> None: # Arrange - engine produces (1, 256), expected (1, 512) wrong = np.zeros((1, 256), dtype=np.float16) runtime = _FakeInferenceRuntime( descriptor_dim=DESCRIPTOR_DIM, fixed_output=wrong ) descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM) fdr_client = _make_fdr_client() config = _build_config() # Act + Assert with pytest.raises( ConfigError, match=r"engine output shape mismatch.*\(1, 512\).*\(1, 256\)" ): create( config, descriptor_index=descriptor_index, inference_runtime=runtime, fdr_client=fdr_client, clock=_StubClock(), ) def test_ac6_create_rejects_engine_with_missing_embedding_key() -> None: runtime = _FakeInferenceRuntime( descriptor_dim=DESCRIPTOR_DIM, output_key="wrong_key" ) with pytest.raises(ConfigError, match=r"'embedding' key absent"): create( _build_config(), descriptor_index=_FakeDescriptorIndex( descriptor_dim_value=DESCRIPTOR_DIM ), inference_runtime=runtime, fdr_client=_make_fdr_client(), clock=_StubClock(), ) # --------------------------------------------------------------------------- # AC-7: VprBackboneError on forward-pass failure # --------------------------------------------------------------------------- def test_ac7_runtime_error_yields_vpr_backbone_error( caplog: pytest.LogCaptureFixture, ) -> None: runtime = _FakeInferenceRuntime( descriptor_dim=DESCRIPTOR_DIM, raises=RuntimeError("CUDA OOM") ) fdr_client = _make_fdr_client() strategy = _build_strategy( inference_runtime=runtime, fdr_client=fdr_client ) with caplog.at_level(logging.ERROR, logger="test.ultra_vpr"): with pytest.raises(VprBackboneError): strategy.embed_query(_make_frame(), _make_calibration()) assert any( record.levelno == logging.ERROR and getattr(record, "kind", None) == "c2.vpr.backbone_error" for record in caplog.records ) records = [] while True: r = fdr_client.pop_one() if r is None: break records.append(r) backbone_errors = [r for r in records if r.kind == "vpr.backbone_error"] assert len(backbone_errors) == 1 def test_ac7_missing_embedding_key_yields_vpr_backbone_error() -> None: runtime = _FakeInferenceRuntime( descriptor_dim=DESCRIPTOR_DIM, output_key="not_embedding" ) strategy = _build_strategy(inference_runtime=runtime) with pytest.raises(VprBackboneError, match=r"'embedding' key"): strategy.embed_query(_make_frame(), _make_calibration()) def test_ac7_wrong_forward_output_shape_yields_vpr_backbone_error() -> None: bad = np.zeros((1, 256), dtype=np.float16) runtime = _FakeInferenceRuntime( descriptor_dim=DESCRIPTOR_DIM, fixed_output=bad ) strategy = _build_strategy(inference_runtime=runtime) with pytest.raises(VprBackboneError, match=r"expected \(1, 512\)"): strategy.embed_query(_make_frame(), _make_calibration()) # --------------------------------------------------------------------------- # AC-8: VprPreprocessError on corrupt image bytes # --------------------------------------------------------------------------- def test_ac8_corrupt_image_yields_vpr_preprocess_error( caplog: pytest.LogCaptureFixture, ) -> None: fdr_client = _make_fdr_client() strategy = _build_strategy(fdr_client=fdr_client) frame = NavCameraFrame( frame_id=4242, timestamp=datetime(2026, 5, 13, 12, 0, 0), image="not-an-array", camera_calibration_id="test_cam", ) with caplog.at_level(logging.ERROR, logger="test.ultra_vpr"): with pytest.raises(VprPreprocessError): strategy.embed_query(frame, _make_calibration()) assert any( record.levelno == logging.ERROR and getattr(record, "kind", None) == "c2.vpr.preprocess_error" for record in caplog.records ) records = [] while True: r = fdr_client.pop_one() if r is None: break records.append(r) preprocess_errors = [ r for r in records if r.kind == "vpr.preprocess_error" ] assert len(preprocess_errors) == 1 def test_ac8_wrong_dtype_image_yields_vpr_preprocess_error() -> None: strategy = _build_strategy() bad_image = np.zeros((720, 1280, 3), dtype=np.float32) frame = NavCameraFrame( frame_id=42, timestamp=datetime(2026, 5, 13, 12, 0, 0), image=bad_image, camera_calibration_id="test_cam", ) with pytest.raises(VprPreprocessError, match=r"uint8"): strategy.embed_query(frame, _make_calibration()) # --------------------------------------------------------------------------- # AC-9: Calibration absent / identity -> centre-crop fallback + WARN log # --------------------------------------------------------------------------- def test_ac9_identity_calibration_falls_back_to_geometric_centre( caplog: pytest.LogCaptureFixture, ) -> None: """Identity intrinsics produce ``(cx, cy) == (0, 0)`` which the preprocessor treats as missing calibration data. """ preprocessor_logger = logging.getLogger("test.ultra_vpr.pp") preprocessor = UltraVprBackbonePreprocessor(logger=preprocessor_logger) strategy = _build_strategy(preprocessor=preprocessor) with caplog.at_level(logging.WARNING, logger="test.ultra_vpr.pp"): query = strategy.embed_query( _make_frame(), _make_calibration_identity() ) warn_records = [ r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.calibration_missing" ] assert len(warn_records) == 1 # AC-2 still holds with the fallback path norm = float(np.linalg.norm(np.asarray(query.embedding).astype(np.float32))) assert norm == pytest.approx(1.0, abs=1e-3) def test_ac9_principal_point_offset_changes_crop_window() -> None: """The principal-point-aware crop produces a different output than the geometric-centre crop when the principal point is non-central. """ rng = np.random.default_rng(0xABCD) image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8) frame = NavCameraFrame( frame_id=1, timestamp=datetime(2026, 5, 13, 12, 0, 0), image=image, camera_calibration_id="cam", ) pp = UltraVprBackbonePreprocessor() cal_centre = _make_calibration(cx=640.0, cy=360.0) cal_offset = _make_calibration(cx=900.0, cy=200.0) out_centre = pp.preprocess(frame, cal_centre) out_offset = pp.preprocess(frame, cal_offset) assert not np.array_equal(out_centre, out_offset) # --------------------------------------------------------------------------- # AC-10: IndexUnavailableError propagated unchanged from retrieve_topk # --------------------------------------------------------------------------- def test_ac10_index_unavailable_propagates_unchanged() -> None: err = IndexUnavailableError("stale handle") descriptor_index = _FakeDescriptorIndex( descriptor_dim_value=DESCRIPTOR_DIM, raises=err ) strategy = _build_strategy(descriptor_index=descriptor_index) query = strategy.embed_query(_make_frame(), _make_calibration()) with pytest.raises(IndexUnavailableError, match=r"stale handle"): strategy.retrieve_topk(query, k=10) # --------------------------------------------------------------------------- # AC-11: composition-root wiring + INFO log "c2.vpr.ready" # --------------------------------------------------------------------------- def test_ac11_create_emits_strategy_ready_info_log( caplog: pytest.LogCaptureFixture, ) -> None: runtime = _FakeInferenceRuntime(descriptor_dim=DESCRIPTOR_DIM) descriptor_index = _FakeDescriptorIndex(descriptor_dim_value=DESCRIPTOR_DIM) fdr_client = _make_fdr_client() config = _build_config() logger = logging.getLogger("test.ultra_vpr.create") with caplog.at_level(logging.INFO, logger="test.ultra_vpr.create"): strategy = create( config, descriptor_index=descriptor_index, inference_runtime=runtime, fdr_client=fdr_client, clock=_StubClock(), logger=logger, ) assert isinstance(strategy, UltraVprStrategy) assert strategy.descriptor_dim() == 512 ready_logs = [ r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.ready" ] assert len(ready_logs) == 1 kv = ready_logs[0].kv # type: ignore[attr-defined] assert kv["strategy"] == "ultra_vpr" assert kv["descriptor_dim"] == 512 def test_ac11_non_trt_runtime_rejected_at_create() -> None: runtime = _FakeInferenceRuntime( descriptor_dim=DESCRIPTOR_DIM, runtime_label="pytorch_fp16" ) config = _build_config() with pytest.raises(ConfigError, match=r"BUILD_TENSORRT_RUNTIME=ON"): create( config, descriptor_index=_FakeDescriptorIndex( descriptor_dim_value=DESCRIPTOR_DIM ), inference_runtime=runtime, fdr_client=_make_fdr_client(), clock=_StubClock(), ) def test_ac11_onnx_trt_ep_runtime_accepted_at_create() -> None: """ONNX-Runtime is the documented fallback (per AZ-337 description).""" runtime = _FakeInferenceRuntime( descriptor_dim=DESCRIPTOR_DIM, runtime_label="onnx_trt_ep" ) strategy = create( _build_config(), descriptor_index=_FakeDescriptorIndex( descriptor_dim_value=DESCRIPTOR_DIM ), inference_runtime=runtime, fdr_client=_make_fdr_client(), clock=_StubClock(), ) assert isinstance(strategy, UltraVprStrategy) # --------------------------------------------------------------------------- # AC-12: WARN log on top-1 distance above threshold (delegated to FaissBridge) # --------------------------------------------------------------------------- def test_ac12_top1_above_threshold_emits_warn_via_faiss_bridge( caplog: pytest.LogCaptureFixture, ) -> None: # Arrange - corpus returns top-1 distance 0.42 > 0.30 default threshold descriptor_index = _FakeDescriptorIndex( descriptor_dim_value=DESCRIPTOR_DIM, results=[ ((1, 49.0, 36.0), 0.42), ((2, 49.001, 36.001), 0.51), ((3, 49.002, 36.002), 0.65), ], ) strategy = _build_strategy(descriptor_index=descriptor_index) query = strategy.embed_query(_make_frame(), _make_calibration()) with caplog.at_level(logging.WARNING, logger="test.bridge"): strategy.retrieve_topk(query, k=3) warn_records = [ r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.top1_distance_above_threshold" ] assert len(warn_records) == 1 kv = warn_records[0].kv # type: ignore[attr-defined] assert kv["distance"] == pytest.approx(0.42) assert kv["threshold"] == pytest.approx(0.30) assert kv["backbone_label"] == "ultra_vpr" # --------------------------------------------------------------------------- # Preprocessor contract # --------------------------------------------------------------------------- def test_preprocessor_output_shape_and_dtype() -> None: pp = UltraVprBackbonePreprocessor() rng = np.random.default_rng(2026) image = rng.integers(0, 256, size=(720, 1280, 3), dtype=np.uint8) frame = NavCameraFrame( frame_id=1, timestamp=datetime(2026, 5, 13, 12, 0, 0), image=image, camera_calibration_id="cam", ) out = pp.preprocess(frame, _make_calibration()) assert out.shape == (1, 3, 384, 384) assert out.dtype == np.float16 def test_preprocessor_input_shape_is_384x384() -> None: pp = UltraVprBackbonePreprocessor() assert pp.input_shape() == (384, 384) def test_preprocessor_protocol_conformance() -> None: pp = UltraVprBackbonePreprocessor() assert isinstance(pp, BackbonePreprocessor) def test_preprocessor_accepts_grayscale_input() -> None: pp = UltraVprBackbonePreprocessor() gray = np.zeros((512, 512), dtype=np.uint8) frame = NavCameraFrame( frame_id=1, timestamp=datetime(2026, 5, 13, 12, 0, 0), image=gray, camera_calibration_id="cam", ) out = pp.preprocess(frame, _make_calibration()) assert out.shape == (1, 3, 384, 384) def test_preprocessor_mean_std_correct_on_grey_image() -> None: """A uniform-grey image should produce per-channel ``(grey - mean) / std``.""" pp = UltraVprBackbonePreprocessor() grey = np.full((512, 512, 3), 128, dtype=np.uint8) frame = NavCameraFrame( frame_id=1, timestamp=datetime(2026, 5, 13, 12, 0, 0), image=grey, camera_calibration_id="cam", ) out = pp.preprocess(frame, _make_calibration()) mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) std = np.array([0.229, 0.224, 0.225], dtype=np.float32) expected = (128.0 / 255.0 - mean) / std actual_per_channel = ( out[0].astype(np.float32).reshape(3, -1).mean(axis=1) ) np.testing.assert_allclose(actual_per_channel, expected, atol=1e-2) # --------------------------------------------------------------------------- # Constructor validation # --------------------------------------------------------------------------- def _make_minimal_strategy_kwargs(*, descriptor_dim: int) -> dict[str, Any]: """Build kwargs that pass FaissBridge guards. The strategy carries its own ``descriptor_dim`` validation; the bridge has a separate (stricter) ``descriptor_dim > 0`` guard. Tests that exercise the strategy's own validators MUST use a bridge with a valid dim. """ fdr_client = _make_fdr_client() clock = _StubClock() bridge = FaissBridge( descriptor_index=_FakeDescriptorIndex(descriptor_dim_value=512), descriptor_dim=512, warn_top1_threshold=0.30, debug_log_per_frame_distances=False, fdr_client=fdr_client, logger=logging.getLogger("test.bridge.guard"), clock=clock, ) return { "inference_runtime": _FakeInferenceRuntime(), "engine_handle": _FakeEngineHandle(), "descriptor_index": _FakeDescriptorIndex(), "preprocessor": UltraVprBackbonePreprocessor(), "normaliser": DescriptorNormaliser(), "faiss_bridge": bridge, "fdr_client": fdr_client, "clock": clock, "logger": logging.getLogger("test.ultra_vpr.guard"), "descriptor_dim": descriptor_dim, } def test_constructor_rejects_zero_descriptor_dim() -> None: with pytest.raises(ValueError, match=r">= 1"): UltraVprStrategy(**_make_minimal_strategy_kwargs(descriptor_dim=0)) def test_constructor_rejects_negative_descriptor_dim() -> None: with pytest.raises(ValueError, match=r">= 1"): UltraVprStrategy(**_make_minimal_strategy_kwargs(descriptor_dim=-5)) # --------------------------------------------------------------------------- # FDR record emission # --------------------------------------------------------------------------- def test_embed_query_emits_vpr_embed_query_fdr_record() -> None: fdr_client = _make_fdr_client() strategy = _build_strategy(fdr_client=fdr_client) strategy.embed_query(_make_frame(), _make_calibration()) records = [] while True: r = fdr_client.pop_one() if r is None: break records.append(r) embed_records = [r for r in records if r.kind == "vpr.embed_query"] assert len(embed_records) == 1 payload = embed_records[0].payload assert payload["backbone_label"] == "ultra_vpr" assert payload["descriptor_dim"] == 512 assert isinstance(payload["latency_us"], int) assert payload["latency_us"] > 0 def test_create_does_not_register_pytorch_architecture() -> None: """UltraVPR uses a TRT engine - no PyTorch architecture registration. Verifies the strategy module does NOT expose ``MODEL_NAME`` / ``architecture_factory`` attributes (which would trigger registration in the composition root). """ import gps_denied_onboard.components.c2_vpr.ultra_vpr as mod assert not hasattr(mod, "MODEL_NAME") assert not hasattr(mod, "architecture_factory")