"""Unit tests for the AZ-341 ``FaissBridge``.

Exercises AC-1..AC-11 plus the NFR-perf microbenchmark against the bridge
using in-file fakes only (no c6, c7, or live FAISS — c2_vpr stays
AZ-507-clean).
"""

from __future__ import annotations

import logging
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

import numpy as np
import pytest

from gps_denied_onboard._types.vpr import VprQuery
from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge
from gps_denied_onboard.components.c2_vpr.errors import IndexUnavailableError
from gps_denied_onboard.fdr_client import FdrClient
from gps_denied_onboard.fdr_client.records import (
    CURRENT_SCHEMA_VERSION,
    FdrRecord,
)

if TYPE_CHECKING:
    from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
        TileIdTuple,
    )


# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------


@dataclass
class _FakeDescriptorIndex:
    """Scriptable stand-in for :class:`DescriptorIndexCut`.

    :meth:`search_topk` appends every ``(query, k)`` pair to ``calls``. It
    then raises ``raises`` when that field is set; otherwise it returns a
    fresh list holding the canned ``(tile_id_tuple, distance)`` pairs from
    ``results``.
    """

    results: list[tuple[tuple[int, float, float], float]] = field(
        default_factory=list
    )
    raises: BaseException | None = None
    calls: list[tuple[np.ndarray, int]] = field(default_factory=list)

    def search_topk(
        self, query: np.ndarray, k: int
    ) -> list[tuple[tuple[int, float, float], float]]:
        """Record the call, then raise ``raises`` or return the canned list."""
        self.calls.append((query, k))
        failure = self.raises
        if failure is None:
            # Hand back a copy so callers cannot mutate the canned list.
            return [*self.results]
        raise failure


@dataclass
class _StubClock:
    """Deterministic Clock whose ``monotonic_ns`` advances by ``step_ns``."""

    next_monotonic_ns: int = 1_000_000_000
    step_ns: int = 5_000
    fixed_time_ns: int = 1_715_600_000_000_000_000

    def monotonic_ns(self) -> int:
        """Return the current reading and advance it by ``step_ns``."""
        current = self.next_monotonic_ns
        self.next_monotonic_ns = current + self.step_ns
        return current

    def time_ns(self) -> int:
        """Return the constant ``fixed_time_ns``."""
        return self.fixed_time_ns

    def sleep_until_ns(self, target_ns: int) -> None:
        """Do nothing; the argument is accepted and discarded."""
        del target_ns


def _make_fdr_client(*, force_overrun: bool = False) -> FdrClient:
    """Build the ``c2_vpr`` :class:`FdrClient` used by the tests.

    When ``force_overrun`` is true, a filler record is enqueued repeatedly
    until ``enqueue`` stops returning ``"ok"``.
    """
    client = FdrClient(
        producer_id="c2_vpr",
        capacity=8,
        _emit_diag_log=False,
    )
    if not force_overrun:
        return client

    filler_payload = {
        "level": "INFO",
        "component": "test",
        "frame_id": "",
        "kind": "test",
        "msg": "filler",
    }
    filler = FdrRecord(
        schema_version=CURRENT_SCHEMA_VERSION,
        ts="2026-05-13T00:00:00.000000+00:00",
        producer_id=client.producer_id,
        kind="log",
        payload=filler_payload,
    )
    # `FdrClient._buffer` is the SPSC ring; its capacity is rounded
    # up to the next power of two from the constructor argument.
    # Filling to capacity makes the next enqueue return OVERRUN.
    while True:
        if client.enqueue(filler) != "ok":
            break
    return client


def _make_query(*, frame_id: int = 4242, dim: int = 512) -> VprQuery:
    """Return a :class:`VprQuery` whose float32 embedding is ``[1, 0, 0, ...]``."""
    unit_vector = np.zeros(dim, dtype=np.float32)
    unit_vector[0] = 1.0
    return VprQuery(frame_id=frame_id, embedding=unit_vector, produced_at=999)


def _ten_canned_results(
    distances: list[float] | None = None,
) -> list[tuple[tuple[int, float, float], float]]:
    """Pair each distance with a synthetic ``(18, lat, lon)`` tile id.

    Without an argument, ten ascending distances from 0.05 to 0.50 are used.
    """
    if distances is None:
        distances = [
            0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
        ]
    canned: list[tuple[tuple[int, float, float], float]] = []
    for idx, dist in enumerate(distances):
        tile_id = (18, 49.0 + idx * 0.001, 36.0 + idx * 0.001)
        canned.append((tile_id, dist))
    return canned


def _build_bridge(
    *,
    descriptor_index: _FakeDescriptorIndex,
    fdr_client: FdrClient,
    clock: _StubClock,
    descriptor_dim: int = 512,
    warn_top1_threshold: float = 0.30,
    debug_log_per_frame_distances: bool = False,
    logger_name: str = "c2.faiss_bridge.test",
) -> FaissBridge:
    """Construct a :class:`FaissBridge` wired to the given fakes.

    The bridge logs through ``logging.getLogger(logger_name)``.
    """
    bridge_logger = logging.getLogger(logger_name)
    return FaissBridge(
        descriptor_index=descriptor_index,
        descriptor_dim=descriptor_dim,
        warn_top1_threshold=warn_top1_threshold,
        debug_log_per_frame_distances=debug_log_per_frame_distances,
        fdr_client=fdr_client,
        logger=bridge_logger,
        clock=clock,
    )


# ---------------------------------------------------------------------------
# AC-1: happy-path retrieve → VprResult + FDR record
# ---------------------------------------------------------------------------


def test_retrieve_happy_path_returns_vpr_result_and_emits_fdr(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-1: a well-formed top-10 yields a ``VprResult`` and one FDR record."""
    # Arrange
    expected_distances = [
        0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
    ]
    index_fake = _FakeDescriptorIndex(results=_ten_canned_results())
    fdr = _make_fdr_client()
    bridge = _build_bridge(
        descriptor_index=index_fake, fdr_client=fdr, clock=_StubClock()
    )
    vpr_query = _make_query(frame_id=4242)

    # Act
    with caplog.at_level(logging.DEBUG, logger="c2.faiss_bridge.test"):
        outcome = bridge.retrieve(vpr_query, k=10, backbone_label="ultra_vpr")

    # Assert — c6 cut was called exactly once with the correct args
    recorded_calls = index_fake.calls
    assert len(recorded_calls) == 1
    embedding_seen, k_seen = recorded_calls[0]
    assert k_seen == 10
    assert embedding_seen is vpr_query.embedding

    # Assert — VprResult shape
    assert outcome.frame_id == 4242
    assert outcome.backbone_label == "ultra_vpr"
    assert len(outcome.candidates) == 10
    got_distances = [cand.descriptor_distance for cand in outcome.candidates]
    assert got_distances == sorted(got_distances)
    assert got_distances[0] == pytest.approx(0.05)
    assert all(cand.descriptor_dim == 512 for cand in outcome.candidates)
    assert outcome.retrieved_at > 0

    # Assert — exactly one FDR record
    emitted = fdr.pop_one()
    assert emitted is not None
    assert emitted.kind == "vpr.retrieve_topk"
    assert emitted.payload["frame_id"] == 4242
    assert emitted.payload["backbone_label"] == "ultra_vpr"
    assert emitted.payload["top10_distances"] == pytest.approx(expected_distances)
    assert isinstance(emitted.payload["latency_us"], int)
    assert emitted.payload["latency_us"] > 0
    assert fdr.pop_one() is None


# ---------------------------------------------------------------------------
# AC-2: INV-4 violation — undersized result → IndexUnavailableError
# ---------------------------------------------------------------------------


def test_retrieve_undersized_corpus_raises_index_unavailable_error(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-2: two candidates for ``k=10`` raise and log an ``undersized`` error."""
    # Arrange — the fake only has two candidates to offer
    two_hits = [((18, 49.0, 36.0), 0.05), ((18, 49.001, 36.001), 0.10)]
    fdr = _make_fdr_client()
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(results=two_hits),
        fdr_client=fdr,
        clock=_StubClock(),
    )
    vpr_query = _make_query()

    # Act + Assert
    error_capture = caplog.at_level(logging.ERROR, logger="c2.faiss_bridge.test")
    with error_capture, pytest.raises(IndexUnavailableError) as exc_info:
        bridge.retrieve(vpr_query, k=10, backbone_label="ultra_vpr")
    assert "corpus returned 2 candidates (expected 10)" in str(exc_info.value)

    # Assert — ERROR log with the invariant_violation kind
    violations = [
        rec
        for rec in caplog.records
        if getattr(rec, "kind", None) == "c2.vpr.invariant_violation"
    ]
    assert len(violations) == 1
    violation = violations[0]
    assert violation.kv["reason"] == "undersized"
    assert violation.kv["returned_count"] == 2
    assert violation.kv["expected_k"] == 10

    # Assert — no FDR record emitted (failure is the corpus, not retrieval)
    assert fdr.pop_one() is None


# ---------------------------------------------------------------------------
# AC-3: INV-4 violation — unordered distances
# ---------------------------------------------------------------------------


def test_retrieve_unordered_distances_raises_index_unavailable_error(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-3: non-ascending distances raise and log an ``unordered`` error."""
    # Arrange — distances out of ascending order at idx 2
    shuffled = [0.05, 0.20, 0.10, 0.15, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50]
    fdr = _make_fdr_client()
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(
            results=_ten_canned_results(distances=shuffled),
        ),
        fdr_client=fdr,
        clock=_StubClock(),
    )
    vpr_query = _make_query()

    # Act + Assert
    error_capture = caplog.at_level(logging.ERROR, logger="c2.faiss_bridge.test")
    with error_capture, pytest.raises(IndexUnavailableError) as exc_info:
        bridge.retrieve(vpr_query, k=10, backbone_label="ultra_vpr")
    assert "unordered distances" in str(exc_info.value)

    violations = [
        rec
        for rec in caplog.records
        if getattr(rec, "kind", None) == "c2.vpr.invariant_violation"
    ]
    assert len(violations) == 1
    assert violations[0].kv["reason"] == "unordered"
    assert fdr.pop_one() is None


# ---------------------------------------------------------------------------
# AC-4: c6 raises IndexUnavailableError → propagates UNCHANGED (no catch)
# ---------------------------------------------------------------------------


def test_retrieve_propagates_index_unavailable_error_unchanged() -> None:
    """AC-4: an error raised by the index reaches the caller as the same object."""
    # Arrange — the fake raises ON the search_topk call (mirroring c6's
    # stale-handle / sidecar / dim-mismatch defence)
    original_error = IndexUnavailableError("stale handle")
    fdr = _make_fdr_client()
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(raises=original_error),
        fdr_client=fdr,
        clock=_StubClock(),
    )
    vpr_query = _make_query()

    # Act + Assert — same exception, NOT wrapped
    with pytest.raises(IndexUnavailableError) as exc_info:
        bridge.retrieve(vpr_query, k=10, backbone_label="ultra_vpr")
    assert exc_info.value is original_error
    assert str(exc_info.value) == "stale handle"

    # Assert — no FDR record (retrieval never completed)
    assert fdr.pop_one() is None


# ---------------------------------------------------------------------------
# AC-5: WARN-threshold trigger when distances[0] > threshold
# ---------------------------------------------------------------------------


def test_retrieve_emits_warn_log_when_top1_above_threshold(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-5: a top-1 distance above the threshold logs exactly one WARN."""
    # Arrange — top-1 distance 0.42 exceeds the 0.30 threshold
    far_distances = [0.42, 0.45, 0.50, 0.55, 0.60, 0.65, 0.70, 0.75, 0.80, 0.85]
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(
            results=_ten_canned_results(distances=far_distances),
        ),
        fdr_client=_make_fdr_client(),
        clock=_StubClock(),
        warn_top1_threshold=0.30,
    )
    vpr_query = _make_query()

    # Act
    with caplog.at_level(logging.WARNING, logger="c2.faiss_bridge.test"):
        outcome = bridge.retrieve(vpr_query, k=10, backbone_label="ultra_vpr")

    # Assert — VprResult still returned
    assert len(outcome.candidates) == 10

    # Assert — exactly ONE WARN log with the expected kind + structured kv
    warnings_seen = [
        rec
        for rec in caplog.records
        if getattr(rec, "kind", None) == "c2.vpr.top1_distance_above_threshold"
    ]
    assert len(warnings_seen) == 1
    warn_kv = warnings_seen[0].kv
    assert warn_kv["distance"] == pytest.approx(0.42)
    assert warn_kv["threshold"] == pytest.approx(0.30)
    assert warn_kv["backbone_label"] == "ultra_vpr"


# ---------------------------------------------------------------------------
# AC-6: WARN-threshold NOT triggered when top-1 below threshold
# ---------------------------------------------------------------------------


def test_retrieve_does_not_emit_warn_when_top1_below_threshold(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-6: a top-1 distance under the threshold logs no WARN record."""
    # Arrange — top-1 distance 0.15 sits below the 0.30 threshold
    near_distances = [0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.55, 0.60]
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(
            results=_ten_canned_results(distances=near_distances),
        ),
        fdr_client=_make_fdr_client(),
        clock=_StubClock(),
        warn_top1_threshold=0.30,
    )
    vpr_query = _make_query()

    # Act
    with caplog.at_level(logging.WARNING, logger="c2.faiss_bridge.test"):
        bridge.retrieve(vpr_query, k=10, backbone_label="ultra_vpr")

    # Assert — no WARN log
    warn_kind = "c2.vpr.top1_distance_above_threshold"
    warnings_seen = [
        rec for rec in caplog.records if getattr(rec, "kind", None) == warn_kind
    ]
    assert warnings_seen == []


# ---------------------------------------------------------------------------
# AC-7: DEBUG per-frame distances ON → DEBUG log emitted
# ---------------------------------------------------------------------------


def test_retrieve_emits_debug_log_when_per_frame_distances_on(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-7: with the debug flag on, one ``frame_distances`` record is logged."""
    # Arrange
    expected_distances = [
        0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
    ]
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()),
        fdr_client=_make_fdr_client(),
        clock=_StubClock(),
        debug_log_per_frame_distances=True,
    )
    vpr_query = _make_query(frame_id=9999)

    # Act
    with caplog.at_level(logging.DEBUG, logger="c2.faiss_bridge.test"):
        bridge.retrieve(vpr_query, k=10, backbone_label="net_vlad")

    # Assert — exactly one DEBUG record
    debug_kind = "c2.vpr.frame_distances"
    debug_seen = [
        rec for rec in caplog.records if getattr(rec, "kind", None) == debug_kind
    ]
    assert len(debug_seen) == 1
    debug_kv = debug_seen[0].kv
    assert debug_kv["frame_id"] == 9999
    assert debug_kv["top10_distances"] == pytest.approx(expected_distances)
# ---------------------------------------------------------------------------
# AC-8: DEBUG per-frame distances OFF (default) → no DEBUG log
# ---------------------------------------------------------------------------


def test_retrieve_does_not_emit_debug_log_when_per_frame_distances_off(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-8: with the debug flag left at its default, no DEBUG record appears."""
    # Arrange
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()),
        fdr_client=_make_fdr_client(),
        clock=_StubClock(),
    )
    vpr_query = _make_query()

    # Act
    with caplog.at_level(logging.DEBUG, logger="c2.faiss_bridge.test"):
        bridge.retrieve(vpr_query, k=10, backbone_label="ultra_vpr")

    # Assert
    debug_kind = "c2.vpr.frame_distances"
    debug_seen = [
        rec for rec in caplog.records if getattr(rec, "kind", None) == debug_kind
    ]
    assert debug_seen == []


# ---------------------------------------------------------------------------
# AC-9: FDR record carries {frame_id, backbone_label, top10_distances, latency_us > 0}
# ---------------------------------------------------------------------------


def test_retrieve_fdr_record_fields_are_populated_with_positive_latency() -> None:
    """AC-9: the FDR payload has exactly the four expected keys, latency > 0."""
    # Arrange
    expected_keys = {
        "frame_id",
        "backbone_label",
        "top10_distances",
        "latency_us",
    }
    expected_distances = [
        0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
    ]
    fdr = _make_fdr_client()
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()),
        fdr_client=fdr,
        clock=_StubClock(step_ns=7_000),
    )
    vpr_query = _make_query(frame_id=12345)

    # Act
    bridge.retrieve(vpr_query, k=10, backbone_label="ultra_vpr")

    # Assert
    emitted = fdr.pop_one()
    assert emitted is not None
    assert emitted.kind == "vpr.retrieve_topk"
    payload = emitted.payload
    assert set(payload.keys()) == expected_keys
    assert payload["frame_id"] == 12345
    assert payload["backbone_label"] == "ultra_vpr"
    assert payload["top10_distances"] == pytest.approx(expected_distances)
    assert payload["latency_us"] > 0
# ---------------------------------------------------------------------------
# AC-10: every concrete `VprStrategy.retrieve_topk` body is one return statement
# ---------------------------------------------------------------------------


def test_every_concrete_strategy_retrieve_topk_body_is_one_return_statement() -> None:
    """AC-10: each strategy's ``retrieve_topk`` is a single ``return``.

    Parses every ``c2_vpr/*.py`` file whose stem is in ``KNOWN_STRATEGIES``
    and checks every class-level ``retrieve_topk`` (sync or async). After an
    optional leading docstring, the body must be exactly one ``return``.
    """
    # Arrange — discover every concrete `VprStrategy` subclass via AST
    # inspection of `c2_vpr/*.py` modules whose filename matches a
    # ``KNOWN_STRATEGIES`` member. Strategies that don't exist yet
    # (AZ-337..AZ-340) trivially pass this check.
    import ast
    import pathlib

    from gps_denied_onboard.components.c2_vpr.config import KNOWN_STRATEGIES

    component_dir = pathlib.Path(__file__).resolve().parents[3] / (
        "src/gps_denied_onboard/components/c2_vpr"
    )
    # `iterdir()` would raise on a missing directory anyway; asserting first
    # turns a mis-resolved path into a readable failure message.
    assert component_dir.is_dir(), f"c2_vpr component dir not found: {component_dir}"
    strategy_files = sorted(
        p
        for p in component_dir.iterdir()
        if p.is_file() and p.suffix == ".py" and p.stem in KNOWN_STRATEGIES
    )
    # If no strategy files exist yet, the check still passes — AC-10 is
    # forward-looking; AZ-337/338/339/340 will be required to satisfy it
    # when they ship.
    for strategy_file in strategy_files:
        tree = ast.parse(strategy_file.read_text(encoding="utf-8"))
        # Find every class that defines a ``retrieve_topk`` method
        for node in ast.walk(tree):
            if not isinstance(node, ast.ClassDef):
                continue
            for member in node.body:
                if not isinstance(member, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    continue
                if member.name != "retrieve_topk":
                    continue
                # Drop optional docstring (Expr/Constant string) from
                # the body before counting statements.
                body = list(member.body)
                if (
                    body
                    and isinstance(body[0], ast.Expr)
                    and isinstance(body[0].value, ast.Constant)
                    and isinstance(body[0].value.value, str)
                ):
                    body = body[1:]
                # Assert
                assert len(body) == 1, (
                    f"{strategy_file.name}::{node.name}.retrieve_topk "
                    f"must have exactly one statement after docstring; "
                    f"got {len(body)}"
                )
                assert isinstance(body[0], ast.Return), (
                    f"{strategy_file.name}::{node.name}.retrieve_topk "
                    f"single statement must be `return ...`; got "
                    f"{type(body[0]).__name__}"
                )


# ---------------------------------------------------------------------------
# AC-11: per-strategy `descriptor_dim` carried through to candidates
# ---------------------------------------------------------------------------


def test_descriptor_dim_carried_through_to_each_candidate() -> None:
    """AC-11: each candidate reports the ``descriptor_dim`` of its bridge."""
    # Arrange — two bridges with different dims, each wired to its own
    # fresh fakes so neither retrieval can influence the other.
    bridge_ultra = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()),
        fdr_client=_make_fdr_client(),
        clock=_StubClock(),
        descriptor_dim=512,
    )
    bridge_netvlad = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()),
        fdr_client=_make_fdr_client(),
        clock=_StubClock(),
        descriptor_dim=4096,
    )

    # Act
    ultra = bridge_ultra.retrieve(
        _make_query(dim=512), k=10, backbone_label="ultra_vpr"
    )
    netvlad = bridge_netvlad.retrieve(
        _make_query(dim=4096), k=10, backbone_label="net_vlad"
    )

    # Assert
    assert all(c.descriptor_dim == 512 for c in ultra.candidates)
    assert all(c.descriptor_dim == 4096 for c in netvlad.candidates)


# ---------------------------------------------------------------------------
# NFR-perf: bridge.retrieve overhead p95 ≤ 0.5 ms (excluding c6 time)
# ---------------------------------------------------------------------------


def test_bridge_retrieve_overhead_p95_under_500us() -> None:
    """NFR-perf: p95 of 1000 timed ``retrieve`` calls stays at or under 500 µs.

    The descriptor index is a fake, so the measured time is the bridge's own
    overhead. The FDR queue is drained before each timed call.
    """
    # Arrange
    descriptor_index = _FakeDescriptorIndex(results=_ten_canned_results())
    fdr_client = _make_fdr_client()

    # WallClock-equivalent fake — uses time.monotonic_ns so the
    # measured latency is realistic, not stub-step driven.
    @dataclass
    class _WallStubClock:
        fixed_time_ns: int = 1_715_600_000_000_000_000

        def monotonic_ns(self) -> int:
            return time.monotonic_ns()

        def time_ns(self) -> int:
            return self.fixed_time_ns

        def sleep_until_ns(self, target_ns: int) -> None:
            _ = target_ns

    clock = _WallStubClock()
    bridge = _build_bridge(
        descriptor_index=descriptor_index, fdr_client=fdr_client, clock=clock
    )
    query = _make_query()
    n = 1000
    timings_ns: list[int] = []

    # Act — measure outside the bridge so we capture wrapper + INV-4 +
    # candidate construction + FDR enqueue + log emission overhead.
    for _ in range(n):
        # Drain the FDR queue so the next enqueue does not OVERRUN.
        while fdr_client.pop_one() is not None:
            pass
        t0 = time.monotonic_ns()
        bridge.retrieve(query, k=10, backbone_label="ultra_vpr")
        t1 = time.monotonic_ns()
        timings_ns.append(t1 - t0)

    # Assert — p95 ≤ 500 µs
    timings_ns.sort()
    p95_us = timings_ns[int(n * 0.95)] / 1_000
    assert p95_us <= 500.0, f"bridge.retrieve p95 = {p95_us:.1f}µs > 500.0µs"


# ---------------------------------------------------------------------------
# Constructor validation — descriptor_dim, threshold, debug flag types
# ---------------------------------------------------------------------------


@pytest.mark.parametrize(
    "kwargs, match",
    [
        ({"descriptor_dim": "512"}, "descriptor_dim must be a non-bool int"),
        ({"descriptor_dim": True}, "descriptor_dim must be a non-bool int"),
        ({"descriptor_dim": 0}, "descriptor_dim must be > 0"),
        ({"descriptor_dim": -1}, "descriptor_dim must be > 0"),
        ({"warn_top1_threshold": "0.5"}, "warn_top1_threshold must be a float"),
        ({"warn_top1_threshold": True}, "warn_top1_threshold must be a float"),
        ({"warn_top1_threshold": -0.01}, "warn_top1_threshold must be >= 0"),
        (
            {"debug_log_per_frame_distances": 1},
            "debug_log_per_frame_distances must be a bool",
        ),
    ],
)
def test_constructor_rejects_invalid_arguments(
    kwargs: dict[str, object], match: str
) -> None:
    """Each invalid constructor argument raises with the expected message.

    Range violations (``> 0`` / ``>= 0`` messages) are expected as
    ``ValueError``; every other case is expected as ``TypeError``.
    """
    base = {
        "descriptor_index": _FakeDescriptorIndex(results=_ten_canned_results()),
        "descriptor_dim": 512,
        "warn_top1_threshold": 0.30,
        "debug_log_per_frame_distances": False,
        "fdr_client": _make_fdr_client(),
        "logger": logging.getLogger("c2.faiss_bridge.test"),
        "clock": _StubClock(),
    }
    base.update(kwargs)
    # The exception type is derived from the message: the two range-check
    # messages map to ValueError, all type-check messages to TypeError.
    expected_error: type[BaseException] = (
        ValueError
        if match.startswith(
            ("descriptor_dim must be > 0", "warn_top1_threshold must be >= 0")
        )
        else TypeError
    )
    with pytest.raises(expected_error, match=match):
        FaissBridge(**base)  # type: ignore[arg-type]


def test_retrieve_rejects_non_positive_k_and_empty_backbone_label() -> None:
    """``retrieve`` rejects ``k=0``, a bool ``k``, and an empty backbone label."""
    # Arrange
    bridge = _build_bridge(
        descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()),
        fdr_client=_make_fdr_client(),
        clock=_StubClock(),
    )
    query = _make_query()

    # Act + Assert
    with pytest.raises(ValueError, match="k must be > 0"):
        bridge.retrieve(query, k=0, backbone_label="ultra_vpr")
    with pytest.raises(TypeError, match="k must be a non-bool int"):
        bridge.retrieve(query, k=True, backbone_label="ultra_vpr")  # type: ignore[arg-type]
    with pytest.raises(ValueError, match="backbone_label must be a non-empty"):
        bridge.retrieve(query, k=10, backbone_label="")