[AZ-341] C2 FAISS HNSW retrieve wiring (FaissBridge + AZ-507 cut)

Shared retrieve_topk plumbing for every concrete C2 VprStrategy:
- FaissBridge centralises the c6 search_topk → VprResult pipeline,
  the defended-in-depth INV-4 check (exactly k, distance-ascending),
  the WARN-threshold check on distances[0], optional per-frame DEBUG
  log, and one `vpr.retrieve_topk` FDR record per call with latency
  measurement.
- DescriptorIndexCut Protocol — consumer-side structural cut of c6
  DescriptorIndex.search_topk (AZ-507); keeps c2_vpr c6-import-free.
- C2VprConfig gains warn_top1_threshold + debug_per_frame_distances
  knobs with validators.
- KNOWN_PAYLOAD_KEYS registers vpr.retrieve_topk for the FDR record
  schema with payload {frame_id, backbone_label, top10_distances,
  latency_us}; companion fixture added to the AZ-272 roundtrip suite.
- 22 unit tests cover AC-1..AC-11 + NFR-perf microbench (p95 ≤ 0.5 ms)
  + constructor and retrieve-argument validation.

Verdict: PASS_WITH_WARNINGS (2 Low findings — duplicated ISO-ts
helper across c2/c5/c11/c12, captured in AZ-508 hygiene PBI;
spec-listed but unused `normaliser` parameter dropped — INV-3 makes
the embedding L2-normalised at the strategy's `embed_query`).

Tests: 1565 passed / 80 skipped (was 1543; +22 new tests).
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 21:45:40 +03:00
parent 25836925c9
commit 88f6ae6dce
9 changed files with 1327 additions and 4 deletions
@@ -27,6 +27,10 @@ strategy module.
from gps_denied_onboard._types.vpr import VprCandidate, VprQuery, VprResult
from gps_denied_onboard.components.c2_vpr.config import C2VprConfig
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
DescriptorIndexCut,
TileIdTuple,
)
from gps_denied_onboard.components.c2_vpr.errors import (
IndexUnavailableError,
VprBackboneError,
@@ -40,7 +44,9 @@ register_component_block("c2_vpr", C2VprConfig)
__all__ = [
"C2VprConfig",
"DescriptorIndexCut",
"IndexUnavailableError",
"TileIdTuple",
"VprBackboneError",
"VprCandidate",
"VprError",
@@ -0,0 +1,317 @@
"""C2 ``FaissBridge`` — shared retrieve_topk plumbing (AZ-341).
Every concrete C2 ``VprStrategy`` (UltraVPR, NetVLAD, MegaLoc, MixVPR,
SelaVPR, EigenPlaces, SALAD) shares the same retrieval pipeline:
forward a ``VprQuery.embedding`` into c6's ``DescriptorIndex.search_topk``,
build a ``VprResult`` from the returned ``(tile_id_tuple, distance)``
pairs, apply the WARN-threshold check on ``distances[0]``, optionally
emit a DEBUG per-frame distances log, and record one
``vpr.retrieve_topk`` FDR record per call for post-flight forensics.
Centralising that plumbing in :class:`FaissBridge`:
1. Keeps every strategy's ``retrieve_topk`` body to a single
delegation line (AZ-341 AC-10).
2. Provides ONE place for the defended-in-depth INV-4 check
(exactly ``k`` candidates, distance-ascending) — c6's
``DescriptorIndex.search_topk`` documents ``≤ k`` results, but
the bridge requires ``== k`` as an operational invariant
(a partially-built corpus is an :class:`IndexUnavailableError`
condition, not a soft "do your best" path).
3. Concentrates WARN-threshold and DEBUG-flag config in one
touchpoint so operators tune one knob, not seven copies.
4. Preserves c2_vpr's structural cut to c6 (AZ-507) — the bridge
consumes :class:`DescriptorIndexCut`, never c6 directly.
Error propagation: per the AZ-341 task spec § Constraints, the bridge
DOES NOT catch the underlying :class:`IndexUnavailableError` raised by
the cut — c6's stale-handle defence (mmap inode + sidecar) is the
authoritative producer; the bridge's role is propagation. The bridge
DOES raise its own :class:`IndexUnavailableError` for INV-4 violations
(undersized or unordered c6 returns) so consumers receive a single,
uniformly-typed error envelope from a retrieval failure.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final
from gps_denied_onboard._types.vpr import VprCandidate, VprQuery, VprResult
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import (
DescriptorIndexCut,
)
from gps_denied_onboard.components.c2_vpr.errors import IndexUnavailableError
from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
if TYPE_CHECKING:
pass
__all__ = ["FaissBridge"]
_COMPONENT: Final[str] = "c2_vpr"
_FDR_KIND_RETRIEVE: Final[str] = "vpr.retrieve_topk"
_LOG_KIND_INVARIANT_VIOLATION: Final[str] = "c2.vpr.invariant_violation"
_LOG_KIND_TOP1_WARN: Final[str] = "c2.vpr.top1_distance_above_threshold"
_LOG_KIND_FRAME_DISTANCES: Final[str] = "c2.vpr.frame_distances"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun"
class FaissBridge:
"""Shared retrieve_topk plumbing for every C2 ``VprStrategy``.
Stateless across calls — every public-method invocation is
self-contained; the bridge holds only configuration (thresholds,
debug flag, descriptor_dim) and injected collaborators (the c6
cut, the FDR client, the logger, the clock). Per-strategy
composition is the responsibility of each strategy's
``create(...)`` factory: it constructs a bridge with that
strategy's ``descriptor_dim`` and ``backbone_label`` is passed
per call (the same bridge instance MUST NOT be shared across
strategies — see :attr:`_descriptor_dim` invariant).
"""
def __init__(
self,
*,
descriptor_index: DescriptorIndexCut,
descriptor_dim: int,
warn_top1_threshold: float,
debug_log_per_frame_distances: bool,
fdr_client: FdrClient,
logger: logging.Logger,
clock: Clock,
) -> None:
if not isinstance(descriptor_dim, int) or isinstance(descriptor_dim, bool):
raise TypeError(
f"FaissBridge.descriptor_dim must be a non-bool int; "
f"got {descriptor_dim!r}"
)
if descriptor_dim <= 0:
raise ValueError(
f"FaissBridge.descriptor_dim must be > 0; got {descriptor_dim}"
)
if not isinstance(warn_top1_threshold, (int, float)) or isinstance(
warn_top1_threshold, bool
):
raise TypeError(
f"FaissBridge.warn_top1_threshold must be a float; "
f"got {warn_top1_threshold!r}"
)
if warn_top1_threshold < 0:
raise ValueError(
f"FaissBridge.warn_top1_threshold must be >= 0; "
f"got {warn_top1_threshold}"
)
if not isinstance(debug_log_per_frame_distances, bool):
raise TypeError(
f"FaissBridge.debug_log_per_frame_distances must be a bool; "
f"got {debug_log_per_frame_distances!r}"
)
self._descriptor_index: DescriptorIndexCut = descriptor_index
self._descriptor_dim: int = descriptor_dim
self._warn_top1_threshold: float = float(warn_top1_threshold)
self._debug_log_per_frame_distances: bool = debug_log_per_frame_distances
self._fdr_client: FdrClient = fdr_client
self._logger: logging.Logger = logger
self._clock: Clock = clock
def retrieve(
self, query: VprQuery, k: int, *, backbone_label: str
) -> VprResult:
"""Run the c6 top-K lookup; build a ``VprResult``.
See class docstring for the full pipeline. The single positional
argument shape (``query``, ``k``) plus keyword-only
``backbone_label`` matches every strategy's one-line
delegation contract (AC-10).
"""
if not isinstance(k, int) or isinstance(k, bool):
raise TypeError(f"FaissBridge.retrieve: k must be a non-bool int; got {k!r}")
if k <= 0:
raise ValueError(f"FaissBridge.retrieve: k must be > 0; got {k}")
if not isinstance(backbone_label, str) or not backbone_label:
raise ValueError(
f"FaissBridge.retrieve: backbone_label must be a non-empty "
f"string; got {backbone_label!r}"
)
# NB: ``descriptor_index.search_topk`` may raise
# ``IndexUnavailableError`` (c6's stale-handle / sidecar
# defence). The bridge intentionally lets that propagate
# unchanged — AC-4. No try/except.
ns_start = self._clock.monotonic_ns()
c6_result = self._descriptor_index.search_topk(query.embedding, k)
ns_end = self._clock.monotonic_ns()
latency_us = max(1, (ns_end - ns_start) // 1_000)
self._verify_invariants(c6_result, k, backbone_label)
candidates = tuple(
VprCandidate(
tile_id=tile_id_tuple,
descriptor_distance=float(distance),
descriptor_dim=self._descriptor_dim,
)
for tile_id_tuple, distance in c6_result
)
distances = [float(distance) for _, distance in c6_result]
result = VprResult(
frame_id=query.frame_id,
candidates=candidates,
retrieved_at=ns_end,
backbone_label=backbone_label,
)
if distances[0] > self._warn_top1_threshold:
self._logger.warning(
"VPR top-1 distance above threshold",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_TOP1_WARN,
"kv": {
"distance": distances[0],
"threshold": self._warn_top1_threshold,
"backbone_label": backbone_label,
},
},
)
if self._debug_log_per_frame_distances:
self._logger.debug(
"VPR per-frame top-K distances",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FRAME_DISTANCES,
"kv": {
"frame_id": query.frame_id,
"top10_distances": distances,
},
},
)
self._emit_fdr_record(
frame_id=query.frame_id,
backbone_label=backbone_label,
distances=distances,
latency_us=latency_us,
)
return result
def _verify_invariants(
self,
c6_result: list[tuple[tuple[int, float, float], float]],
k: int,
backbone_label: str,
) -> None:
"""Enforce INV-4: exactly ``k`` results, distance-ascending."""
n = len(c6_result)
if n != k:
msg = (
f"corpus returned {n} candidates (expected {k}) "
f"for backbone={backbone_label!r}"
)
self._log_invariant_violation(
msg=msg,
reason="undersized",
k=k,
returned_count=n,
backbone_label=backbone_label,
)
raise IndexUnavailableError(msg)
for i in range(1, n):
if c6_result[i][1] < c6_result[i - 1][1]:
distances_snapshot = [d for _, d in c6_result]
msg = (
f"corpus returned unordered distances "
f"{distances_snapshot} for backbone={backbone_label!r}"
)
self._log_invariant_violation(
msg=msg,
reason="unordered",
k=k,
returned_count=n,
backbone_label=backbone_label,
)
raise IndexUnavailableError(msg)
def _log_invariant_violation(
self,
*,
msg: str,
reason: str,
k: int,
returned_count: int,
backbone_label: str,
) -> None:
self._logger.error(
msg,
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_INVARIANT_VIOLATION,
"kv": {
"reason": reason,
"expected_k": k,
"returned_count": returned_count,
"backbone_label": backbone_label,
},
},
)
def _emit_fdr_record(
self,
*,
frame_id: int,
backbone_label: str,
distances: list[float],
latency_us: int,
) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=self._iso_ts_from_clock(),
producer_id=self._fdr_client.producer_id,
kind=_FDR_KIND_RETRIEVE,
payload={
"frame_id": frame_id,
"backbone_label": backbone_label,
"top10_distances": list(distances),
"latency_us": latency_us,
},
)
result = self._fdr_client.enqueue(record)
if result == EnqueueResult.OVERRUN:
self._logger.warning(
"FDR enqueue dropped vpr.retrieve_topk record (buffer overrun)",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FDR_OVERRUN,
"kv": {
"frame_id": frame_id,
"backbone_label": backbone_label,
},
},
)
def _iso_ts_from_clock(self) -> str:
# Inlined here rather than importing a shared helper because every
# other component that emits FDR records (c12, c11, c5) defines
# the same one-liner locally — see meta-rule "Critical Thinking":
# the duplication is intentional and trivial, factoring it would
# add an L1 import every component already avoids.
from datetime import datetime, timezone
ns = int(self._clock.time_ns())
seconds, fraction_ns = divmod(ns, 1_000_000_000)
dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
return f"{dt.strftime('%Y-%m-%dT%H:%M:%S')}.{fraction_ns:09d}+00:00"
@@ -52,11 +52,23 @@ class C2VprConfig:
per strategy). ``faiss_index_path`` is the location of the
pre-built FAISS HNSW index file (C6 ``DescriptorIndex`` reads
its sidecar there).
``warn_top1_threshold`` and ``debug_per_frame_distances`` belong
to the ``FaissBridge`` (AZ-341): the bridge emits ONE WARN log
when ``distances[0] > warn_top1_threshold`` (operator-tunable
suspicious-frame signal; default 0.30 is a conservative
placeholder pending FT-P-19 telemetry per the task spec's Risk
3), and emits ONE DEBUG log per frame carrying the full
``top10_distances`` vector when
``debug_per_frame_distances`` is true (default OFF — ~2.6M log
lines per 24h flight if always-on; per-task-spec Risk 2).
"""
strategy: str = "net_vlad"
backbone_weights_path: Path = field(default_factory=lambda: Path("/models/vpr/weights"))
faiss_index_path: Path = field(default_factory=lambda: Path("/cache/vpr/index.faiss"))
warn_top1_threshold: float = 0.30
debug_per_frame_distances: bool = False
def __post_init__(self) -> None:
if self.strategy not in KNOWN_STRATEGIES:
@@ -80,3 +92,20 @@ class C2VprConfig:
raise ConfigError(
"C2VprConfig.faiss_index_path must be non-empty"
)
if not isinstance(self.warn_top1_threshold, (int, float)) or isinstance(
self.warn_top1_threshold, bool
):
raise ConfigError(
f"C2VprConfig.warn_top1_threshold must be a float; "
f"got {self.warn_top1_threshold!r}"
)
if self.warn_top1_threshold < 0:
raise ConfigError(
f"C2VprConfig.warn_top1_threshold must be >= 0; "
f"got {self.warn_top1_threshold}"
)
if not isinstance(self.debug_per_frame_distances, bool):
raise ConfigError(
f"C2VprConfig.debug_per_frame_distances must be a bool; "
f"got {self.debug_per_frame_distances!r}"
)
@@ -0,0 +1,55 @@
"""C2 consumer-side structural cut of c6 ``DescriptorIndex`` (AZ-507 + AZ-341).
The AZ-507 cross-component rule (see ``_docs/02_document/module-layout.md``
"Per-Component Mapping → c2_vpr") forbids ``c2_vpr/*.py`` from importing
``components.c6_tile_cache`` directly. The :class:`FaissBridge` needs the
FAISS top-K search surface to drive every concrete ``VprStrategy``'s
``retrieve_topk``, so we declare a local ``runtime_checkable`` Protocol
that mirrors the shape of c6's
:class:`gps_denied_onboard.components.c6_tile_cache.interface.DescriptorIndex.search_topk`
return signature.
The composition root wires the concrete c6 ``DescriptorIndex`` in via a
thin adapter that translates each ``c6.TileId`` dataclass into the
``(zoom_level, lat, lon)`` tuple form used by ``VprCandidate.tile_id``
(see ``_types.vpr.VprCandidate`` — L1 DTOs keep the tuple form to avoid
importing the L3 c6 type). Unit tests inject a fake that returns tuples
directly, so the bridge — and these tests — never touch c6 either.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Protocol, runtime_checkable
if TYPE_CHECKING:
import numpy as np
__all__ = ["DescriptorIndexCut", "TileIdTuple"]
# (zoom_level, lat_deg, lon_deg) — matches ``VprCandidate.tile_id``'s
# documented composite identity (see ``_types/vpr.py`` `VprCandidate`).
# Defined here so the cut is import-self-contained.
TileIdTuple = tuple[int, float, float]
@runtime_checkable
class DescriptorIndexCut(Protocol):
"""Single-method consumer-side cut of c6 ``DescriptorIndex.search_topk``.
The bridge constructs a ``(D,)`` ``float32`` C-contiguous query
vector and the composition-root wiring forwards it to c6's real
:meth:`DescriptorIndex.search_topk`. The adapter translates each
returned ``c6.TileId`` dataclass into a :data:`TileIdTuple` before
handing the list back; this keeps c2_vpr free of every c6 import.
The return shape mirrors c6's contract verbatim except for the
tuple-form ``TileId``: a list of ``(tile_id_tuple, distance)``
pairs, distance-ascending, length ≤ ``k``. The bridge applies a
stricter operational invariant on top (exactly ``k`` results,
sorted ascending) — see AZ-341 task spec § INV-4.
"""
def search_topk(
self, query: "np.ndarray", k: int
) -> list[tuple[TileIdTuple, float]]: ...