diff --git a/_docs/03_implementation/reviews/batch_45_review.md b/_docs/03_implementation/reviews/batch_45_review.md new file mode 100644 index 0000000..fdbda47 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_45_review.md @@ -0,0 +1,213 @@ +# Batch 45 — Code Review + +**Tasks**: AZ-341 (C2 FAISS HNSW Retrieve Wiring) +**Cycle**: 1 +**Reviewer**: autodev +**Verdict**: **PASS_WITH_WARNINGS** + +## Scope reviewed + +Production code: + +- `src/gps_denied_onboard/components/c2_vpr/_faiss_bridge.py` (NEW — + `FaissBridge` class with constructor validation, `retrieve(...)` + method, INV-4 defensive check, WARN-threshold + DEBUG-flag log + emission, FDR record + latency capture). +- `src/gps_denied_onboard/components/c2_vpr/descriptor_index_cut.py` + (NEW — `DescriptorIndexCut` Protocol mirroring c6 + `DescriptorIndex.search_topk`; AZ-507 consumer-side structural cut + so c2_vpr remains c6-import-free; defines the local `TileIdTuple` + alias matching `VprCandidate.tile_id`'s composite identity). +- `src/gps_denied_onboard/components/c2_vpr/config.py` (added + `warn_top1_threshold: float = 0.30` and + `debug_per_frame_distances: bool = False` config knobs + their + validators in `__post_init__`). +- `src/gps_denied_onboard/components/c2_vpr/__init__.py` (re-exports + `DescriptorIndexCut` and `TileIdTuple` for the composition root's + c6 adapter; deliberately does NOT re-export `FaissBridge` per the + internal/`_underscore` convention — each strategy injects the + bridge via its own `create(...)` factory). +- `src/gps_denied_onboard/fdr_client/records.py` (registered + `vpr.retrieve_topk` in `KNOWN_PAYLOAD_KEYS` with payload field set + `{frame_id, backbone_label, top10_distances, latency_us}`). + +Tests: + +- `tests/unit/c2_vpr/test_faiss_bridge.py` (NEW — 22 tests covering + AC-1..AC-11 + NFR-perf microbench + constructor validation + + retrieve-argument validation). +- `tests/unit/test_az272_fdr_record_schema.py` (added a fixture + payload for `vpr.retrieve_topk` so the schema roundtrip suite + covers the new record kind alongside `c12.reloc.requested`). + +## Phase 1 — Context loading + +Inputs read: + +- Task spec: `_docs/02_tasks/todo/AZ-341_c2_faiss_retrieve_wiring.md` +- Contracts: `_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md`, + `_docs/02_document/contracts/c6_tile_cache/descriptor_index.md`, + `_docs/02_document/contracts/c6_tile_cache/tile_metadata_store.md`. +- Module layout: `_docs/02_document/module-layout.md` (c2_vpr + Per-Component Mapping; AZ-507 cross-component rule). +- Existing surfaces: `c2_vpr/{interface,config,errors,__init__}.py`, + `c6_tile_cache/{interface,errors,_types}.py`, `_types/vpr.py`, + `fdr_client/{client,records}.py`, `clock/interface.py`, + `c12_operator_orchestrator/operator_reloc_service.py` (pattern + reference for FdrClient + Clock + logger composition). + +## Phase 2 — Spec compliance + +AC coverage map (every AC has at least one covering test): + +| AC | Test | +|----|------| +| AC-1 happy-path retrieve | `test_retrieve_happy_path_returns_vpr_result_and_emits_fdr` | +| AC-2 undersized → IndexUnavailableError | `test_retrieve_undersized_corpus_raises_index_unavailable_error` | +| AC-3 unordered → IndexUnavailableError | `test_retrieve_unordered_distances_raises_index_unavailable_error` | +| AC-4 c6 error propagates unchanged | `test_retrieve_propagates_index_unavailable_error_unchanged` | +| AC-5 WARN threshold trigger | `test_retrieve_emits_warn_log_when_top1_above_threshold` | +| AC-6 WARN not triggered when below | `test_retrieve_does_not_emit_warn_when_top1_below_threshold` | +| AC-7 DEBUG on → log | `test_retrieve_emits_debug_log_when_per_frame_distances_on` | +| AC-8 DEBUG off → no log | `test_retrieve_does_not_emit_debug_log_when_per_frame_distances_off` | +| AC-9 FDR record fields | `test_retrieve_fdr_record_fields_are_populated_with_positive_latency` | +| AC-10 retrieve_topk body is one line | `test_every_concrete_strategy_retrieve_topk_body_is_one_return_statement` | +| AC-11 per-strategy descriptor_dim | `test_descriptor_dim_carried_through_to_each_candidate` | +| NFR-perf p95 ≤ 0.5 ms | `test_bridge_retrieve_overhead_p95_under_500us` | + +Contract verification: + +- `vpr_strategy_protocol.md` — the bridge returns ``VprResult`` with + exactly ``k`` candidates sorted ascending (INV-4), populates + ``backbone_label`` non-empty (INV-5), and propagates + ``IndexUnavailableError`` rather than returning stale candidates + (C2-ST-01). All satisfied. +- `descriptor_index.md` — the c6 surface returns ``≤ k`` results + (Invariant I-2) and ``IndexUnavailableError`` on stale handle / dim + mismatch. The bridge enforces a stricter operational invariant + (== k) on top, which is the task-spec's explicit defended-in-depth + intent (see Constraints § "The defensive INV-4 check is + mandatory"). The operational/structural distinction is documented + in `_faiss_bridge.py`'s module docstring. + +## Phase 3 — Code quality + +- **SRP**: `FaissBridge` has exactly one job — share the retrieve + plumbing across strategies. `DescriptorIndexCut` has one job — + cut c6 imports per AZ-507. +- **Constructor validation**: every argument is type- and + range-checked at construction; `ValueError` for value violations, + `TypeError` for type violations. +- **Error handling**: no bare `except`; only catches nothing (lets + c6 errors propagate per AC-4); raises `IndexUnavailableError` from + c2_vpr's family for the bridge's own INV-4 violations. +- **Naming**: `_verify_invariants` / `_log_invariant_violation` / + `_emit_fdr_record` read clearly from the call site. +- **Complexity**: `retrieve(...)` is ~30 lines including blank + lines; `_verify_invariants` is ~22 lines. All under 50. +- **Test quality**: every test follows Arrange / Act / Assert with + explicit comments; assertions check the actual record kind, the + WARN log's structured `kv`, and the FDR payload by key — not + just "no exception thrown". +- **Dead code**: none. +- **No verbose debug logging by default**: DEBUG per-frame distances + is OFF by default (AC-8) — only ON when explicitly enabled by + config; the WARN threshold is config-driven, not hard-coded. + +## Phase 4 — Security quick-scan + +- No SQL, no `subprocess`, no `eval` / `exec`. +- No hardcoded secrets. +- No user input is interpolated into log messages or FDR records + beyond integer `frame_id` and string `backbone_label` already + validated upstream. +- Sensitive data: the FDR record carries `top10_distances` (raw + floats) — not PII, not signing material. Safe to persist. + +## Phase 5 — Performance scan + +- `retrieve` is O(k) for the INV-4 sorted check + O(k) for + candidate construction; k=10 typical. +- No N+1 / unbounded fetch / blocking I/O / async-context issues. +- FDR enqueue is non-blocking (SPSC ring, drop-oldest on overrun + delegated to the AZ-274 overrun policy). +- Microbench `test_bridge_retrieve_overhead_p95_under_500us` + exercises 1000 calls with a wall-clock-backed `Clock` and asserts + p95 ≤ 500 µs — passes. + +## Phase 6 — Cross-task consistency + +Single-task batch (AZ-341 only). N/A. + +## Phase 7 — Architecture compliance + +- **Layer direction**: c2_vpr (Layer 3) imports only from + `_types.vpr` (L1), `clock` (L1), `config` (L1), `logging` (L1), + `fdr_client` (L1), and its own component module + (`descriptor_index_cut`). No upward imports. ✓ +- **Public API respect**: ZERO direct imports of + `gps_denied_onboard.components.c6_tile_cache.*` in any c2_vpr + source file — the bridge consumes `DescriptorIndexCut` per the + AZ-507 rule. +- **New cyclic dependencies**: none introduced. +- **Duplicate symbols**: `IndexUnavailableError` already exists in + both `c2_vpr.errors` and `c6_tile_cache.errors` (pre-existing, + documented in both modules' docstrings — intentional namespace + duplication for the closed-envelope rewrap pattern). +- **Cross-cutting concerns**: `_iso_ts_from_clock` is duplicated + across `c2_vpr/_faiss_bridge.py`, `c5_state`, `c11_tile_manager`, + and `c12_operator_orchestrator/operator_reloc_service.py`. See + Finding F1 below. + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------------|------------------------------------------------------|-------| +| 1 | Low | Maintainability | `c2_vpr/_faiss_bridge.py`:`_iso_ts_from_clock` | Duplicated `_iso_ts_from_clock` helper across c2/c5/c11/c12 | +| 2 | Low | Spec-Gap | `_docs/02_tasks/todo/AZ-341_c2_faiss_retrieve_wiring.md` | Task spec's constructor lists `normaliser: DescriptorNormaliser` but body never calls it; bridge correctly omits the parameter | + +### Finding Details + +**F1: Duplicated `_iso_ts_from_clock` helper** (Low / Maintainability) + +- Location: `c2_vpr/_faiss_bridge.py` `_iso_ts_from_clock` +- Description: This six-line helper converts `clock.time_ns()` to an + RFC 3339 UTC ISO string with nanosecond precision. The same body + appears inline in `c12_operator_orchestrator/operator_reloc_service.py`, + `c11_tile_manager`, and `c5_state`. Factoring it into a shared + helper (e.g. `helpers.iso_ts_from_clock`) would let every FDR + producer share one definition. +- Suggestion: defer to a cross-cutting hygiene PBI (AZ-508 + "ISO timestamps consolidation" is already in `todo/` for exactly + this concern). No action this batch. +- Task: AZ-341 + +**F2: Spec lists unused `normaliser` constructor parameter** (Low / Spec-Gap) + +- Location: `_docs/02_tasks/todo/AZ-341_c2_faiss_retrieve_wiring.md` + § Outcome (Constructor) +- Description: The task-spec constructor signature names a + `normaliser: DescriptorNormaliser` parameter, but the `retrieve(...)` + body uses neither L2 nor intra-cluster normalisation — the + embedding arrives already L2-normalised because every concrete + `VprStrategy.embed_query` normalises before returning the + `VprQuery` (per `VprStrategy` INV-3). Including the parameter + would be dead weight on every strategy's `create(...)` factory + with no behavioural effect. +- Suggestion: surface to the user; if the spec genuinely intended + defensive re-normalisation in the bridge, add it explicitly and + document why (it conflicts with INV-3 of the producing strategy). + Otherwise update the spec to drop the parameter. The + implementation matches the literal AC list either way. +- Task: AZ-341 + +## Verdict + +PASS_WITH_WARNINGS — two Low-severity findings, both pre-existing +or stylistic. No Critical, no High, no Medium. The implementation +satisfies every AC, every NFR, the AZ-507 cross-component rule, and +the existing C2 contract surface. + +The implement skill's Auto-Fix gate accepts this verdict without +intervention; both findings carry over to the cumulative review +(batches 43-45) where they may be batched with similar items. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index f2c0736..5cfa065 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -4,11 +4,11 @@ flow: greenfield step: 7 name: Implement -status: not_started +status: in_progress sub_step: - phase: 0 - name: awaiting-invocation - detail: "" + phase: 7 + name: batch-loop + detail: "batch 45 — AZ-341 (C2 FAISS retrieve wiring)" retry_count: 0 cycle: 1 tracker: jira diff --git a/src/gps_denied_onboard/components/c2_vpr/__init__.py b/src/gps_denied_onboard/components/c2_vpr/__init__.py index 72ddf11..5b957f3 100644 --- a/src/gps_denied_onboard/components/c2_vpr/__init__.py +++ b/src/gps_denied_onboard/components/c2_vpr/__init__.py @@ -27,6 +27,10 @@ strategy module. from gps_denied_onboard._types.vpr import VprCandidate, VprQuery, VprResult from gps_denied_onboard.components.c2_vpr.config import C2VprConfig +from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import ( + DescriptorIndexCut, + TileIdTuple, +) from gps_denied_onboard.components.c2_vpr.errors import ( IndexUnavailableError, VprBackboneError, @@ -40,7 +44,9 @@ register_component_block("c2_vpr", C2VprConfig) __all__ = [ "C2VprConfig", + "DescriptorIndexCut", "IndexUnavailableError", + "TileIdTuple", "VprBackboneError", "VprCandidate", "VprError", diff --git a/src/gps_denied_onboard/components/c2_vpr/_faiss_bridge.py b/src/gps_denied_onboard/components/c2_vpr/_faiss_bridge.py new file mode 100644 index 0000000..dd134d0 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/_faiss_bridge.py @@ -0,0 +1,317 @@ +"""C2 ``FaissBridge`` — shared retrieve_topk plumbing (AZ-341). + +Every concrete C2 ``VprStrategy`` (UltraVPR, NetVLAD, MegaLoc, MixVPR, +SelaVPR, EigenPlaces, SALAD) shares the same retrieval pipeline: +forward a ``VprQuery.embedding`` into c6's ``DescriptorIndex.search_topk``, +build a ``VprResult`` from the returned ``(tile_id_tuple, distance)`` +pairs, apply the WARN-threshold check on ``distances[0]``, optionally +emit a DEBUG per-frame distances log, and record one +``vpr.retrieve_topk`` FDR record per call for post-flight forensics. + +Centralising that plumbing in :class:`FaissBridge`: + +1. Keeps every strategy's ``retrieve_topk`` body to a single + delegation line (AZ-341 AC-10). +2. Provides ONE place for the defended-in-depth INV-4 check + (exactly ``k`` candidates, distance-ascending) — c6's + ``DescriptorIndex.search_topk`` documents ``≤ k`` results, but + the bridge requires ``== k`` as an operational invariant + (a partially-built corpus is an :class:`IndexUnavailableError` + condition, not a soft "do your best" path). +3. Concentrates WARN-threshold and DEBUG-flag config in one + touchpoint so operators tune one knob, not seven copies. +4. Preserves c2_vpr's structural cut to c6 (AZ-507) — the bridge + consumes :class:`DescriptorIndexCut`, never c6 directly. + +Error propagation: per the AZ-341 task spec § Constraints, the bridge +DOES NOT catch the underlying :class:`IndexUnavailableError` raised by +the cut — c6's stale-handle defence (mmap inode + sidecar) is the +authoritative producer; the bridge's role is propagation. The bridge +DOES raise its own :class:`IndexUnavailableError` for INV-4 violations +(undersized or unordered c6 returns) so consumers receive a single, +uniformly-typed error envelope from a retrieval failure. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Final + +from gps_denied_onboard._types.vpr import VprCandidate, VprQuery, VprResult +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import ( + DescriptorIndexCut, +) +from gps_denied_onboard.components.c2_vpr.errors import IndexUnavailableError +from gps_denied_onboard.fdr_client import EnqueueResult, FdrClient +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) + +if TYPE_CHECKING: + pass + +__all__ = ["FaissBridge"] + + +_COMPONENT: Final[str] = "c2_vpr" + +_FDR_KIND_RETRIEVE: Final[str] = "vpr.retrieve_topk" +_LOG_KIND_INVARIANT_VIOLATION: Final[str] = "c2.vpr.invariant_violation" +_LOG_KIND_TOP1_WARN: Final[str] = "c2.vpr.top1_distance_above_threshold" +_LOG_KIND_FRAME_DISTANCES: Final[str] = "c2.vpr.frame_distances" +_LOG_KIND_FDR_OVERRUN: Final[str] = "c2.vpr.fdr_overrun" + + +class FaissBridge: + """Shared retrieve_topk plumbing for every C2 ``VprStrategy``. + + Stateless across calls — every public-method invocation is + self-contained; the bridge holds only configuration (thresholds, + debug flag, descriptor_dim) and injected collaborators (the c6 + cut, the FDR client, the logger, the clock). Per-strategy + composition is the responsibility of each strategy's + ``create(...)`` factory: it constructs a bridge with that + strategy's ``descriptor_dim`` and ``backbone_label`` is passed + per call (the same bridge instance MUST NOT be shared across + strategies — see :attr:`_descriptor_dim` invariant). + """ + + def __init__( + self, + *, + descriptor_index: DescriptorIndexCut, + descriptor_dim: int, + warn_top1_threshold: float, + debug_log_per_frame_distances: bool, + fdr_client: FdrClient, + logger: logging.Logger, + clock: Clock, + ) -> None: + if not isinstance(descriptor_dim, int) or isinstance(descriptor_dim, bool): + raise TypeError( + f"FaissBridge.descriptor_dim must be a non-bool int; " + f"got {descriptor_dim!r}" + ) + if descriptor_dim <= 0: + raise ValueError( + f"FaissBridge.descriptor_dim must be > 0; got {descriptor_dim}" + ) + if not isinstance(warn_top1_threshold, (int, float)) or isinstance( + warn_top1_threshold, bool + ): + raise TypeError( + f"FaissBridge.warn_top1_threshold must be a float; " + f"got {warn_top1_threshold!r}" + ) + if warn_top1_threshold < 0: + raise ValueError( + f"FaissBridge.warn_top1_threshold must be >= 0; " + f"got {warn_top1_threshold}" + ) + if not isinstance(debug_log_per_frame_distances, bool): + raise TypeError( + f"FaissBridge.debug_log_per_frame_distances must be a bool; " + f"got {debug_log_per_frame_distances!r}" + ) + + self._descriptor_index: DescriptorIndexCut = descriptor_index + self._descriptor_dim: int = descriptor_dim + self._warn_top1_threshold: float = float(warn_top1_threshold) + self._debug_log_per_frame_distances: bool = debug_log_per_frame_distances + self._fdr_client: FdrClient = fdr_client + self._logger: logging.Logger = logger + self._clock: Clock = clock + + def retrieve( + self, query: VprQuery, k: int, *, backbone_label: str + ) -> VprResult: + """Run the c6 top-K lookup; build a ``VprResult``. + + See class docstring for the full pipeline. The single positional + argument shape (``query``, ``k``) plus keyword-only + ``backbone_label`` matches every strategy's one-line + delegation contract (AC-10). + """ + if not isinstance(k, int) or isinstance(k, bool): + raise TypeError(f"FaissBridge.retrieve: k must be a non-bool int; got {k!r}") + if k <= 0: + raise ValueError(f"FaissBridge.retrieve: k must be > 0; got {k}") + if not isinstance(backbone_label, str) or not backbone_label: + raise ValueError( + f"FaissBridge.retrieve: backbone_label must be a non-empty " + f"string; got {backbone_label!r}" + ) + + # NB: ``descriptor_index.search_topk`` may raise + # ``IndexUnavailableError`` (c6's stale-handle / sidecar + # defence). The bridge intentionally lets that propagate + # unchanged — AC-4. No try/except. + ns_start = self._clock.monotonic_ns() + c6_result = self._descriptor_index.search_topk(query.embedding, k) + ns_end = self._clock.monotonic_ns() + latency_us = max(1, (ns_end - ns_start) // 1_000) + + self._verify_invariants(c6_result, k, backbone_label) + + candidates = tuple( + VprCandidate( + tile_id=tile_id_tuple, + descriptor_distance=float(distance), + descriptor_dim=self._descriptor_dim, + ) + for tile_id_tuple, distance in c6_result + ) + distances = [float(distance) for _, distance in c6_result] + + result = VprResult( + frame_id=query.frame_id, + candidates=candidates, + retrieved_at=ns_end, + backbone_label=backbone_label, + ) + + if distances[0] > self._warn_top1_threshold: + self._logger.warning( + "VPR top-1 distance above threshold", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_TOP1_WARN, + "kv": { + "distance": distances[0], + "threshold": self._warn_top1_threshold, + "backbone_label": backbone_label, + }, + }, + ) + + if self._debug_log_per_frame_distances: + self._logger.debug( + "VPR per-frame top-K distances", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FRAME_DISTANCES, + "kv": { + "frame_id": query.frame_id, + "top10_distances": distances, + }, + }, + ) + + self._emit_fdr_record( + frame_id=query.frame_id, + backbone_label=backbone_label, + distances=distances, + latency_us=latency_us, + ) + + return result + + def _verify_invariants( + self, + c6_result: list[tuple[tuple[int, float, float], float]], + k: int, + backbone_label: str, + ) -> None: + """Enforce INV-4: exactly ``k`` results, distance-ascending.""" + n = len(c6_result) + if n != k: + msg = ( + f"corpus returned {n} candidates (expected {k}) " + f"for backbone={backbone_label!r}" + ) + self._log_invariant_violation( + msg=msg, + reason="undersized", + k=k, + returned_count=n, + backbone_label=backbone_label, + ) + raise IndexUnavailableError(msg) + + for i in range(1, n): + if c6_result[i][1] < c6_result[i - 1][1]: + distances_snapshot = [d for _, d in c6_result] + msg = ( + f"corpus returned unordered distances " + f"{distances_snapshot} for backbone={backbone_label!r}" + ) + self._log_invariant_violation( + msg=msg, + reason="unordered", + k=k, + returned_count=n, + backbone_label=backbone_label, + ) + raise IndexUnavailableError(msg) + + def _log_invariant_violation( + self, + *, + msg: str, + reason: str, + k: int, + returned_count: int, + backbone_label: str, + ) -> None: + self._logger.error( + msg, + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_INVARIANT_VIOLATION, + "kv": { + "reason": reason, + "expected_k": k, + "returned_count": returned_count, + "backbone_label": backbone_label, + }, + }, + ) + + def _emit_fdr_record( + self, + *, + frame_id: int, + backbone_label: str, + distances: list[float], + latency_us: int, + ) -> None: + record = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=self._iso_ts_from_clock(), + producer_id=self._fdr_client.producer_id, + kind=_FDR_KIND_RETRIEVE, + payload={ + "frame_id": frame_id, + "backbone_label": backbone_label, + "top10_distances": list(distances), + "latency_us": latency_us, + }, + ) + result = self._fdr_client.enqueue(record) + if result == EnqueueResult.OVERRUN: + self._logger.warning( + "FDR enqueue dropped vpr.retrieve_topk record (buffer overrun)", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FDR_OVERRUN, + "kv": { + "frame_id": frame_id, + "backbone_label": backbone_label, + }, + }, + ) + + def _iso_ts_from_clock(self) -> str: + # Inlined here rather than importing a shared helper because every + # other component that emits FDR records (c12, c11, c5) defines + # the same one-liner locally — see meta-rule "Critical Thinking": + # the duplication is intentional and trivial, factoring it would + # add an L1 import every component already avoids. + from datetime import datetime, timezone + + ns = int(self._clock.time_ns()) + seconds, fraction_ns = divmod(ns, 1_000_000_000) + dt = datetime.fromtimestamp(seconds, tz=timezone.utc) + return f"{dt.strftime('%Y-%m-%dT%H:%M:%S')}.{fraction_ns:09d}+00:00" diff --git a/src/gps_denied_onboard/components/c2_vpr/config.py b/src/gps_denied_onboard/components/c2_vpr/config.py index 36174c4..7ccf8ac 100644 --- a/src/gps_denied_onboard/components/c2_vpr/config.py +++ b/src/gps_denied_onboard/components/c2_vpr/config.py @@ -52,11 +52,23 @@ class C2VprConfig: per strategy). ``faiss_index_path`` is the location of the pre-built FAISS HNSW index file (C6 ``DescriptorIndex`` reads its sidecar there). + + ``warn_top1_threshold`` and ``debug_per_frame_distances`` belong + to the ``FaissBridge`` (AZ-341): the bridge emits ONE WARN log + when ``distances[0] > warn_top1_threshold`` (operator-tunable + suspicious-frame signal; default 0.30 is a conservative + placeholder pending FT-P-19 telemetry per the task spec's Risk + 3), and emits ONE DEBUG log per frame carrying the full + ``top10_distances`` vector when + ``debug_per_frame_distances`` is true (default OFF — ~2.6M log + lines per 24h flight if always-on; per-task-spec Risk 2). """ strategy: str = "net_vlad" backbone_weights_path: Path = field(default_factory=lambda: Path("/models/vpr/weights")) faiss_index_path: Path = field(default_factory=lambda: Path("/cache/vpr/index.faiss")) + warn_top1_threshold: float = 0.30 + debug_per_frame_distances: bool = False def __post_init__(self) -> None: if self.strategy not in KNOWN_STRATEGIES: @@ -80,3 +92,20 @@ class C2VprConfig: raise ConfigError( "C2VprConfig.faiss_index_path must be non-empty" ) + if not isinstance(self.warn_top1_threshold, (int, float)) or isinstance( + self.warn_top1_threshold, bool + ): + raise ConfigError( + f"C2VprConfig.warn_top1_threshold must be a float; " + f"got {self.warn_top1_threshold!r}" + ) + if self.warn_top1_threshold < 0: + raise ConfigError( + f"C2VprConfig.warn_top1_threshold must be >= 0; " + f"got {self.warn_top1_threshold}" + ) + if not isinstance(self.debug_per_frame_distances, bool): + raise ConfigError( + f"C2VprConfig.debug_per_frame_distances must be a bool; " + f"got {self.debug_per_frame_distances!r}" + ) diff --git a/src/gps_denied_onboard/components/c2_vpr/descriptor_index_cut.py b/src/gps_denied_onboard/components/c2_vpr/descriptor_index_cut.py new file mode 100644 index 0000000..c81b3ba --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/descriptor_index_cut.py @@ -0,0 +1,55 @@ +"""C2 consumer-side structural cut of c6 ``DescriptorIndex`` (AZ-507 + AZ-341). + +The AZ-507 cross-component rule (see ``_docs/02_document/module-layout.md`` +"Per-Component Mapping → c2_vpr") forbids ``c2_vpr/*.py`` from importing +``components.c6_tile_cache`` directly. The :class:`FaissBridge` needs the +FAISS top-K search surface to drive every concrete ``VprStrategy``'s +``retrieve_topk``, so we declare a local ``runtime_checkable`` Protocol +that mirrors the shape of c6's +:class:`gps_denied_onboard.components.c6_tile_cache.interface.DescriptorIndex.search_topk` +return signature. + +The composition root wires the concrete c6 ``DescriptorIndex`` in via a +thin adapter that translates each ``c6.TileId`` dataclass into the +``(zoom_level, lat, lon)`` tuple form used by ``VprCandidate.tile_id`` +(see ``_types.vpr.VprCandidate`` — L1 DTOs keep the tuple form to avoid +importing the L3 c6 type). Unit tests inject a fake that returns tuples +directly, so the bridge — and these tests — never touch c6 either. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + import numpy as np + +__all__ = ["DescriptorIndexCut", "TileIdTuple"] + + +# (zoom_level, lat_deg, lon_deg) — matches ``VprCandidate.tile_id``'s +# documented composite identity (see ``_types/vpr.py`` `VprCandidate`). +# Defined here so the cut is import-self-contained. +TileIdTuple = tuple[int, float, float] + + +@runtime_checkable +class DescriptorIndexCut(Protocol): + """Single-method consumer-side cut of c6 ``DescriptorIndex.search_topk``. + + The bridge constructs a ``(D,)`` ``float32`` C-contiguous query + vector and the composition-root wiring forwards it to c6's real + :meth:`DescriptorIndex.search_topk`. The adapter translates each + returned ``c6.TileId`` dataclass into a :data:`TileIdTuple` before + handing the list back; this keeps c2_vpr free of every c6 import. + + The return shape mirrors c6's contract verbatim except for the + tuple-form ``TileId``: a list of ``(tile_id_tuple, distance)`` + pairs, distance-ascending, length ≤ ``k``. The bridge applies a + stricter operational invariant on top (exactly ``k`` results, + sorted ascending) — see AZ-341 task spec § INV-4. + """ + + def search_topk( + self, query: "np.ndarray", k: int + ) -> list[tuple[TileIdTuple, float]]: ... diff --git a/src/gps_denied_onboard/fdr_client/records.py b/src/gps_denied_onboard/fdr_client/records.py index cf078cf..79f313d 100644 --- a/src/gps_denied_onboard/fdr_client/records.py +++ b/src/gps_denied_onboard/fdr_client/records.py @@ -296,6 +296,24 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = { "ts_monotonic_ns", } ), + # AZ-341 / E-C2: emitted by the c2_vpr FaissBridge on every + # successful ``retrieve(...)`` call (post-flight retrieval + # provenance for forensic A/B against tile_match outcomes). + # ``frame_id`` echoes ``VprQuery.frame_id``; ``backbone_label`` is + # the strategy's lowercase ``BUILD_VPR_`` token (e.g. + # ``"ultra_vpr"`` / ``"net_vlad"``); ``top10_distances`` is the + # full ascending-sorted distance vector (≤10 floats — the bridge + # caps at top-K=10 today); ``latency_us`` is the bridge-internal + # ``Clock.monotonic_ns`` delta around the c6 search, in integer + # microseconds. + "vpr.retrieve_topk": frozenset( + { + "frame_id", + "backbone_label", + "top10_distances", + "latency_us", + } + ), } KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys()) diff --git a/tests/unit/c2_vpr/test_faiss_bridge.py b/tests/unit/c2_vpr/test_faiss_bridge.py new file mode 100644 index 0000000..65df34e --- /dev/null +++ b/tests/unit/c2_vpr/test_faiss_bridge.py @@ -0,0 +1,676 @@ +"""AZ-341 FaissBridge unit tests. + +Covers AC-1..AC-11 + the NFR-perf microbench against the bridge using +fakes (no c6, c7, or live FAISS — c2_vpr stays AZ-507-clean). +""" + +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +import numpy as np +import pytest + +from gps_denied_onboard._types.vpr import VprQuery +from gps_denied_onboard.components.c2_vpr._faiss_bridge import FaissBridge +from gps_denied_onboard.components.c2_vpr.errors import IndexUnavailableError +from gps_denied_onboard.fdr_client import FdrClient +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) + +if TYPE_CHECKING: + from gps_denied_onboard.components.c2_vpr.descriptor_index_cut import ( + TileIdTuple, + ) + + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + + +@dataclass +class _FakeDescriptorIndex: + """Configurable :class:`DescriptorIndexCut` for the unit tests. + + ``results`` is the canned ``(tile_id_tuple, distance)`` list the + fake returns from :meth:`search_topk`; ``raises``, when set, is + raised instead. ``calls`` records every ``(query, k)`` invocation. + """ + + results: list[tuple[tuple[int, float, float], float]] = field(default_factory=list) + raises: BaseException | None = None + calls: list[tuple[np.ndarray, int]] = field(default_factory=list) + + def search_topk( + self, query: np.ndarray, k: int + ) -> list[tuple[tuple[int, float, float], float]]: + self.calls.append((query, k)) + if self.raises is not None: + raise self.raises + return list(self.results) + + +@dataclass +class _StubClock: + """Deterministic Clock — ``monotonic_ns`` increments by ``step_ns``.""" + + next_monotonic_ns: int = 1_000_000_000 + step_ns: int = 5_000 + fixed_time_ns: int = 1_715_600_000_000_000_000 + + def monotonic_ns(self) -> int: + v = self.next_monotonic_ns + self.next_monotonic_ns += self.step_ns + return v + + def time_ns(self) -> int: + return self.fixed_time_ns + + def sleep_until_ns(self, target_ns: int) -> None: + _ = target_ns + + +def _make_fdr_client(*, force_overrun: bool = False) -> FdrClient: + client = FdrClient( + producer_id="c2_vpr", + capacity=8, + _emit_diag_log=False, + ) + if force_overrun: + filler = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts="2026-05-13T00:00:00.000000+00:00", + producer_id=client.producer_id, + kind="log", + payload={ + "level": "INFO", + "component": "test", + "frame_id": "", + "kind": "test", + "msg": "filler", + }, + ) + # `FdrClient._buffer` is the SPSC ring; its capacity is rounded + # up to the next power of two from the constructor argument. + # Filling to capacity makes the next enqueue return OVERRUN. + while client.enqueue(filler) == "ok": + pass + return client + + +def _make_query(*, frame_id: int = 4242, dim: int = 512) -> VprQuery: + embedding = np.zeros((dim,), dtype=np.float32) + embedding[0] = 1.0 + return VprQuery(frame_id=frame_id, embedding=embedding, produced_at=999) + + +def _ten_canned_results( + distances: list[float] | None = None, +) -> list[tuple[tuple[int, float, float], float]]: + distances = distances if distances is not None else [ + 0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, + ] + return [ + ((18, 49.0 + i * 0.001, 36.0 + i * 0.001), d) + for i, d in enumerate(distances) + ] + + +def _build_bridge( + *, + descriptor_index: _FakeDescriptorIndex, + fdr_client: FdrClient, + clock: _StubClock, + descriptor_dim: int = 512, + warn_top1_threshold: float = 0.30, + debug_log_per_frame_distances: bool = False, + logger_name: str = "c2.faiss_bridge.test", +) -> FaissBridge: + return FaissBridge( + descriptor_index=descriptor_index, + descriptor_dim=descriptor_dim, + warn_top1_threshold=warn_top1_threshold, + debug_log_per_frame_distances=debug_log_per_frame_distances, + fdr_client=fdr_client, + logger=logging.getLogger(logger_name), + clock=clock, + ) + + +# --------------------------------------------------------------------------- +# AC-1: happy-path retrieve → VprResult + FDR record +# --------------------------------------------------------------------------- + + +def test_retrieve_happy_path_returns_vpr_result_and_emits_fdr( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + descriptor_index = _FakeDescriptorIndex(results=_ten_canned_results()) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, fdr_client=fdr_client, clock=clock + ) + query = _make_query(frame_id=4242) + + # Act + with caplog.at_level(logging.DEBUG, logger="c2.faiss_bridge.test"): + result = bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + + # Assert — c6 cut was called exactly once with the correct args + assert len(descriptor_index.calls) == 1 + sent_query, sent_k = descriptor_index.calls[0] + assert sent_k == 10 + assert sent_query is query.embedding + + # Assert — VprResult shape + assert result.frame_id == 4242 + assert result.backbone_label == "ultra_vpr" + assert len(result.candidates) == 10 + candidate_distances = [c.descriptor_distance for c in result.candidates] + assert candidate_distances == sorted(candidate_distances) + assert candidate_distances[0] == pytest.approx(0.05) + assert all(c.descriptor_dim == 512 for c in result.candidates) + assert result.retrieved_at > 0 + + # Assert — exactly one FDR record + record = fdr_client.pop_one() + assert record is not None + assert record.kind == "vpr.retrieve_topk" + assert record.payload["frame_id"] == 4242 + assert record.payload["backbone_label"] == "ultra_vpr" + assert record.payload["top10_distances"] == pytest.approx( + [0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50] + ) + assert isinstance(record.payload["latency_us"], int) + assert record.payload["latency_us"] > 0 + assert fdr_client.pop_one() is None + + +# --------------------------------------------------------------------------- +# AC-2: INV-4 violation — undersized result → IndexUnavailableError +# --------------------------------------------------------------------------- + + +def test_retrieve_undersized_corpus_raises_index_unavailable_error( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + descriptor_index = _FakeDescriptorIndex( + results=[((18, 49.0, 36.0), 0.05), ((18, 49.001, 36.001), 0.10)], + ) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, fdr_client=fdr_client, clock=clock + ) + query = _make_query() + + # Act + Assert + with caplog.at_level(logging.ERROR, logger="c2.faiss_bridge.test"): + with pytest.raises(IndexUnavailableError) as exc_info: + bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + assert "corpus returned 2 candidates (expected 10)" in str(exc_info.value) + + # Assert — ERROR log with the invariant_violation kind + err_records = [ + r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.invariant_violation" + ] + assert len(err_records) == 1 + assert err_records[0].kv["reason"] == "undersized" + assert err_records[0].kv["returned_count"] == 2 + assert err_records[0].kv["expected_k"] == 10 + + # Assert — no FDR record emitted (failure is the corpus, not retrieval) + assert fdr_client.pop_one() is None + + +# --------------------------------------------------------------------------- +# AC-3: INV-4 violation — unordered distances +# --------------------------------------------------------------------------- + + +def test_retrieve_unordered_distances_raises_index_unavailable_error( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange — distances out of ascending order at idx 2 + bad_distances = [0.05, 0.20, 0.10, 0.15, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50] + descriptor_index = _FakeDescriptorIndex( + results=_ten_canned_results(distances=bad_distances), + ) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, fdr_client=fdr_client, clock=clock + ) + query = _make_query() + + # Act + Assert + with caplog.at_level(logging.ERROR, logger="c2.faiss_bridge.test"): + with pytest.raises(IndexUnavailableError) as exc_info: + bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + assert "unordered distances" in str(exc_info.value) + + err_records = [ + r for r in caplog.records if getattr(r, "kind", None) == "c2.vpr.invariant_violation" + ] + assert len(err_records) == 1 + assert err_records[0].kv["reason"] == "unordered" + + assert fdr_client.pop_one() is None + + +# --------------------------------------------------------------------------- +# AC-4: c6 raises IndexUnavailableError → propagates UNCHANGED (no catch) +# --------------------------------------------------------------------------- + + +def test_retrieve_propagates_index_unavailable_error_unchanged() -> None: + # Arrange — the fake raises ON the search_topk call (mirroring c6's + # stale-handle / sidecar / dim-mismatch defence) + inner = IndexUnavailableError("stale handle") + descriptor_index = _FakeDescriptorIndex(raises=inner) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, fdr_client=fdr_client, clock=clock + ) + query = _make_query() + + # Act + Assert — same exception, NOT wrapped + with pytest.raises(IndexUnavailableError) as exc_info: + bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + assert exc_info.value is inner + assert str(exc_info.value) == "stale handle" + + # Assert — no FDR record (retrieval never completed) + assert fdr_client.pop_one() is None + + +# --------------------------------------------------------------------------- +# AC-5: WARN-threshold trigger when distances[0] > threshold +# --------------------------------------------------------------------------- + + +def test_retrieve_emits_warn_log_when_top1_above_threshold( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + distances = [0.42, 0.45, 0.50, 0.55, 0.60, 0.65, 0.70, 0.75, 0.80, 0.85] + descriptor_index = _FakeDescriptorIndex( + results=_ten_canned_results(distances=distances), + ) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, + fdr_client=fdr_client, + clock=clock, + warn_top1_threshold=0.30, + ) + query = _make_query() + + # Act + with caplog.at_level(logging.WARNING, logger="c2.faiss_bridge.test"): + result = bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + + # Assert — VprResult still returned + assert len(result.candidates) == 10 + + # Assert — exactly ONE WARN log with the expected kind + structured kv + warn_records = [ + r for r in caplog.records + if getattr(r, "kind", None) == "c2.vpr.top1_distance_above_threshold" + ] + assert len(warn_records) == 1 + kv = warn_records[0].kv + assert kv["distance"] == pytest.approx(0.42) + assert kv["threshold"] == pytest.approx(0.30) + assert kv["backbone_label"] == "ultra_vpr" + + +# --------------------------------------------------------------------------- +# AC-6: WARN-threshold NOT triggered when top-1 below threshold +# --------------------------------------------------------------------------- + + +def test_retrieve_does_not_emit_warn_when_top1_below_threshold( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + distances = [0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.55, 0.60] + descriptor_index = _FakeDescriptorIndex( + results=_ten_canned_results(distances=distances), + ) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, + fdr_client=fdr_client, + clock=clock, + warn_top1_threshold=0.30, + ) + query = _make_query() + + # Act + with caplog.at_level(logging.WARNING, logger="c2.faiss_bridge.test"): + bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + + # Assert — no WARN log + warn_records = [ + r for r in caplog.records + if getattr(r, "kind", None) == "c2.vpr.top1_distance_above_threshold" + ] + assert warn_records == [] + + +# --------------------------------------------------------------------------- +# AC-7: DEBUG per-frame distances ON → DEBUG log emitted +# --------------------------------------------------------------------------- + + +def test_retrieve_emits_debug_log_when_per_frame_distances_on( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + descriptor_index = _FakeDescriptorIndex(results=_ten_canned_results()) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, + fdr_client=fdr_client, + clock=clock, + debug_log_per_frame_distances=True, + ) + query = _make_query(frame_id=9999) + + # Act + with caplog.at_level(logging.DEBUG, logger="c2.faiss_bridge.test"): + bridge.retrieve(query, k=10, backbone_label="net_vlad") + + # Assert — exactly one DEBUG record + debug_records = [ + r for r in caplog.records + if getattr(r, "kind", None) == "c2.vpr.frame_distances" + ] + assert len(debug_records) == 1 + kv = debug_records[0].kv + assert kv["frame_id"] == 9999 + assert kv["top10_distances"] == pytest.approx( + [0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50] + ) + + +# --------------------------------------------------------------------------- +# AC-8: DEBUG per-frame distances OFF (default) → no DEBUG log +# --------------------------------------------------------------------------- + + +def test_retrieve_does_not_emit_debug_log_when_per_frame_distances_off( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + descriptor_index = _FakeDescriptorIndex(results=_ten_canned_results()) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, fdr_client=fdr_client, clock=clock + ) + query = _make_query() + + # Act + with caplog.at_level(logging.DEBUG, logger="c2.faiss_bridge.test"): + bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + + # Assert + debug_records = [ + r for r in caplog.records + if getattr(r, "kind", None) == "c2.vpr.frame_distances" + ] + assert debug_records == [] + + +# --------------------------------------------------------------------------- +# AC-9: FDR record carries {frame_id, backbone_label, top10_distances, latency_us > 0} +# --------------------------------------------------------------------------- + + +def test_retrieve_fdr_record_fields_are_populated_with_positive_latency() -> None: + # Arrange + descriptor_index = _FakeDescriptorIndex(results=_ten_canned_results()) + fdr_client = _make_fdr_client() + clock = _StubClock(step_ns=7_000) + bridge = _build_bridge( + descriptor_index=descriptor_index, fdr_client=fdr_client, clock=clock + ) + query = _make_query(frame_id=12345) + + # Act + bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + + # Assert + record = fdr_client.pop_one() + assert record is not None + assert record.kind == "vpr.retrieve_topk" + payload = record.payload + assert set(payload.keys()) == { + "frame_id", "backbone_label", "top10_distances", "latency_us", + } + assert payload["frame_id"] == 12345 + assert payload["backbone_label"] == "ultra_vpr" + assert payload["top10_distances"] == pytest.approx( + [0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50] + ) + assert payload["latency_us"] > 0 + + +# --------------------------------------------------------------------------- +# AC-10: every concrete `VprStrategy.retrieve_topk` body is one return statement +# --------------------------------------------------------------------------- + + +def test_every_concrete_strategy_retrieve_topk_body_is_one_return_statement() -> None: + # Arrange — discover every concrete `VprStrategy` subclass via AST + # inspection of `c2_vpr/*.py` modules whose filename matches a + # ``KNOWN_STRATEGIES`` member. Strategies that don't exist yet + # (AZ-337..AZ-340) trivially pass this check. + import ast + import pathlib + + from gps_denied_onboard.components.c2_vpr.config import KNOWN_STRATEGIES + + component_dir = pathlib.Path(__file__).resolve().parents[3] / ( + "src/gps_denied_onboard/components/c2_vpr" + ) + + strategy_files = sorted( + p for p in component_dir.iterdir() + if p.is_file() and p.suffix == ".py" and p.stem in KNOWN_STRATEGIES + ) + + # If no strategy files exist yet, the check still passes — AC-10 is + # forward-looking; AZ-337/338/339/340 will be required to satisfy it + # when they ship. + for strategy_file in strategy_files: + tree = ast.parse(strategy_file.read_text(encoding="utf-8")) + # Find every class that defines a ``retrieve_topk`` method + for node in ast.walk(tree): + if not isinstance(node, ast.ClassDef): + continue + for member in node.body: + if not isinstance(member, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + if member.name != "retrieve_topk": + continue + # Drop optional docstring (Expr/Constant string) from + # the body before counting statements. + body = list(member.body) + if ( + body + and isinstance(body[0], ast.Expr) + and isinstance(body[0].value, ast.Constant) + and isinstance(body[0].value.value, str) + ): + body = body[1:] + # Assert + assert len(body) == 1, ( + f"{strategy_file.name}::{node.name}.retrieve_topk " + f"must have exactly one statement after docstring; " + f"got {len(body)}" + ) + assert isinstance(body[0], ast.Return), ( + f"{strategy_file.name}::{node.name}.retrieve_topk " + f"single statement must be `return ...`; got " + f"{type(body[0]).__name__}" + ) + + +# --------------------------------------------------------------------------- +# AC-11: per-strategy `descriptor_dim` carried through to candidates +# --------------------------------------------------------------------------- + + +def test_descriptor_dim_carried_through_to_each_candidate() -> None: + # Arrange — two bridges with different dims + descriptor_index = _FakeDescriptorIndex(results=_ten_canned_results()) + fdr_client = _make_fdr_client() + clock = _StubClock() + bridge_ultra = _build_bridge( + descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()), + fdr_client=_make_fdr_client(), + clock=_StubClock(), + descriptor_dim=512, + ) + bridge_netvlad = _build_bridge( + descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()), + fdr_client=_make_fdr_client(), + clock=_StubClock(), + descriptor_dim=4096, + ) + + # Act + ultra = bridge_ultra.retrieve( + _make_query(dim=512), k=10, backbone_label="ultra_vpr" + ) + netvlad = bridge_netvlad.retrieve( + _make_query(dim=4096), k=10, backbone_label="net_vlad" + ) + + # Assert + assert all(c.descriptor_dim == 512 for c in ultra.candidates) + assert all(c.descriptor_dim == 4096 for c in netvlad.candidates) + + +# --------------------------------------------------------------------------- +# NFR-perf: bridge.retrieve overhead p95 ≤ 0.5 ms (excluding c6 time) +# --------------------------------------------------------------------------- + + +def test_bridge_retrieve_overhead_p95_under_500us() -> None: + # Arrange + descriptor_index = _FakeDescriptorIndex(results=_ten_canned_results()) + fdr_client = _make_fdr_client() + # WallClock-equivalent fake — uses time.monotonic_ns so the + # measured latency is realistic, not stub-step driven. + @dataclass + class _WallStubClock: + fixed_time_ns: int = 1_715_600_000_000_000_000 + + def monotonic_ns(self) -> int: + return time.monotonic_ns() + + def time_ns(self) -> int: + return self.fixed_time_ns + + def sleep_until_ns(self, target_ns: int) -> None: + _ = target_ns + + clock = _WallStubClock() + bridge = _build_bridge( + descriptor_index=descriptor_index, fdr_client=fdr_client, clock=clock + ) + query = _make_query() + + n = 1000 + timings_ns: list[int] = [] + + # Act — measure outside the bridge so we capture wrapper + INV-4 + + # candidate construction + FDR enqueue + log emission overhead. + for _ in range(n): + # Drain the FDR queue so the next enqueue does not OVERRUN. + while fdr_client.pop_one() is not None: + pass + t0 = time.monotonic_ns() + bridge.retrieve(query, k=10, backbone_label="ultra_vpr") + t1 = time.monotonic_ns() + timings_ns.append(t1 - t0) + + # Assert — p95 ≤ 500 µs + timings_ns.sort() + p95_us = timings_ns[int(n * 0.95)] / 1_000 + assert p95_us <= 500.0, f"bridge.retrieve p95 = {p95_us:.1f}µs > 500.0µs" + + +# --------------------------------------------------------------------------- +# Constructor validation — descriptor_dim, threshold, debug flag types +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "kwargs, match", + [ + ({"descriptor_dim": "512"}, "descriptor_dim must be a non-bool int"), + ({"descriptor_dim": True}, "descriptor_dim must be a non-bool int"), + ({"descriptor_dim": 0}, "descriptor_dim must be > 0"), + ({"descriptor_dim": -1}, "descriptor_dim must be > 0"), + ({"warn_top1_threshold": "0.5"}, "warn_top1_threshold must be a float"), + ({"warn_top1_threshold": True}, "warn_top1_threshold must be a float"), + ({"warn_top1_threshold": -0.01}, "warn_top1_threshold must be >= 0"), + ({"debug_log_per_frame_distances": 1}, "debug_log_per_frame_distances must be a bool"), + ], +) +def test_constructor_rejects_invalid_arguments( + kwargs: dict[str, object], match: str +) -> None: + base = { + "descriptor_index": _FakeDescriptorIndex(results=_ten_canned_results()), + "descriptor_dim": 512, + "warn_top1_threshold": 0.30, + "debug_log_per_frame_distances": False, + "fdr_client": _make_fdr_client(), + "logger": logging.getLogger("c2.faiss_bridge.test"), + "clock": _StubClock(), + } + base.update(kwargs) + expected_error: type[BaseException] = ( + ValueError + if match.startswith(("descriptor_dim must be > 0", "warn_top1_threshold must be >= 0")) + else TypeError + ) + with pytest.raises(expected_error, match=match): + FaissBridge(**base) # type: ignore[arg-type] + + +def test_retrieve_rejects_non_positive_k_and_empty_backbone_label() -> None: + # Arrange + bridge = _build_bridge( + descriptor_index=_FakeDescriptorIndex(results=_ten_canned_results()), + fdr_client=_make_fdr_client(), + clock=_StubClock(), + ) + query = _make_query() + + # Act + Assert + with pytest.raises(ValueError, match="k must be > 0"): + bridge.retrieve(query, k=0, backbone_label="ultra_vpr") + with pytest.raises(TypeError, match="k must be a non-bool int"): + bridge.retrieve(query, k=True, backbone_label="ultra_vpr") # type: ignore[arg-type] + with pytest.raises(ValueError, match="backbone_label must be a non-empty"): + bridge.retrieve(query, k=10, backbone_label="") diff --git a/tests/unit/test_az272_fdr_record_schema.py b/tests/unit/test_az272_fdr_record_schema.py index 428d261..6c36828 100644 --- a/tests/unit/test_az272_fdr_record_schema.py +++ b/tests/unit/test_az272_fdr_record_schema.py @@ -268,6 +268,15 @@ def _kind_payload(kind: str) -> dict[str, object]: "failure_reason": None, "ts_monotonic_ns": 1_234_567_890_123, } + if kind == "vpr.retrieve_topk": + return { + "frame_id": 4242, + "backbone_label": "ultra_vpr", + "top10_distances": [ + 0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, + ], + "latency_us": 123, + } raise AssertionError(f"unhandled kind in fixture: {kind!r}")