From 3665acef66627b3a51112d0ca85b05d4a8fdc27f Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Tue, 12 May 2026 05:25:35 +0300 Subject: [PATCH] [AZ-336] C2 VprStrategy: Protocol + DTOs + factory + composition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundational scaffolding for every concrete C2 backbone (UltraVPR, NetVLAD, MegaLoc, MixVPR, SelaVPR, EigenPlaces, SALAD — AZ-337..AZ-340) and the C2.5 ReRanker consumer side. No backbone is implemented here. * VprStrategy Protocol (embed_query / retrieve_topk / descriptor_dim) + BackbonePreprocessor C2-internal Protocol (NOT in Public API per description.md § 6). * DTOs in L1 _types/vpr.py — VprQuery, VprCandidate, VprResult; all frozen + slots; tuple-not-list for VprResult.candidates so the immutability invariant truly holds. * Error family: VprError + VprBackboneError + VprPreprocessError + IndexUnavailableError; same-named but namespace-distinct from c6_tile_cache.IndexUnavailableError (the c2 family is the closed envelope C5 / C2.5 consume; concrete strategies rewrap the C6 form). * C2VprConfig (strategy enum + backbone_weights_path + faiss_index_path) with strict validation at load; registered into Config.components on c2_vpr import. * build_vpr_strategy factory with 7-strategy resolution table, lazy import, BUILD_VPR_ gating, ImportError→ StrategyNotAvailableError mapping, and pre-flight descriptor_dim match against DescriptorIndex.descriptor_dim() — mismatch fires ConfigError at startup, NOT at first frame. Contract change vs the v1.0.0 draft: factory takes descriptor_index: DescriptorIndex (not tile_store: TileStore) because descriptor_dim() lives on DescriptorIndex per C6's Public API. The contract markdown is updated to match. Architecture: VprCandidate.tile_id is a plain (zoom, lat, lon) tuple, keeping _types/ (L1) free of any c6_tile_cache (L3) import per module-layout.md. Consumers reconstruct TileId at the C6 boundary. Excluded per task spec: * Concrete backbones (AZ-337..AZ-340). * FAISS HNSW retrieve wiring (AZ-341). * DescriptorNormaliser helper (AZ-283, already shipped). * AC-9 single-thread binding — deferred per task spec Risk 4 until the generic compose_root thread-binding registry is in place (today each factory owns its own, e.g. fc_factory). Tests: 45 ACs + NFRs in tests/unit/c2_vpr/test_protocol_conformance.py covering AC-1..AC-8, the error family, the config validation, the factory NFR (p99 ≤ 50 ms). The legacy smoke test is removed. Full sweep 973 passed, 2 skipped (CI-only cmake / actionlint). Co-authored-by: Cursor --- .../contracts/c2_vpr/vpr_strategy_protocol.md | 37 +- .../AZ-336_c2_vpr_strategy_protocol.md | 0 _docs/_autodev_state.md | 6 +- src/gps_denied_onboard/_types/vpr.py | 94 +++- .../components/c2_vpr/__init__.py | 51 +- .../components/c2_vpr/_preprocessor.py | 60 ++ .../components/c2_vpr/config.py | 82 +++ .../components/c2_vpr/errors.py | 66 +++ .../components/c2_vpr/interface.py | 110 +++- .../runtime_root/vpr_factory.py | 214 +++++++ .../unit/c2_vpr/test_protocol_conformance.py | 528 ++++++++++++++++++ tests/unit/c2_vpr/test_smoke.py | 10 - 12 files changed, 1202 insertions(+), 56 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-336_c2_vpr_strategy_protocol.md (100%) create mode 100644 src/gps_denied_onboard/components/c2_vpr/_preprocessor.py create mode 100644 src/gps_denied_onboard/components/c2_vpr/config.py create mode 100644 src/gps_denied_onboard/components/c2_vpr/errors.py create mode 100644 src/gps_denied_onboard/runtime_root/vpr_factory.py create mode 100644 tests/unit/c2_vpr/test_protocol_conformance.py delete mode 100644 tests/unit/c2_vpr/test_smoke.py diff --git a/_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md b/_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md index 84be2a3..b3daa0f 100644 --- a/_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md +++ b/_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md @@ -4,7 +4,7 @@ **Producer task**: AZ-336 (`VprStrategy` Protocol + factory + composition) **Consumer tasks**: AZ-337 (UltraVPR), AZ-338 (NetVLAD baseline), AZ-339 (MegaLoc + MixVPR), AZ-340 (SelaVPR + EigenPlaces + SALAD), AZ-341 (FAISS HNSW retrieve wiring), and downstream c2_5_rerank (AZ-256 / E-C2.5) **Module-layout home**: `src/gps_denied_onboard/components/c2_vpr/interface.py` (Protocols), `src/gps_denied_onboard/components/c2_vpr/__init__.py` (re-exports), `src/gps_denied_onboard/runtime_root/vpr_factory.py` (factory) -**Status**: draft, awaiting AZ-336 implementation +**Status**: v1.0.0 (AZ-336 implemented 2026-05-12) ## Purpose @@ -69,25 +69,23 @@ class VprStrategy(Protocol): ```python from dataclasses import dataclass -from uuid import UUID -import numpy as np @dataclass(frozen=True, slots=True) class VprQuery: """Backbone embedding for a single nav-camera frame. Produced by `VprStrategy.embed_query`; consumed by `VprStrategy.retrieve_topk` (same instance) or — in the C10 corpus-build path — by `DescriptorIndexBuilder` to populate the corpus descriptor matrix.""" - frame_id: UUID - embedding: np.ndarray # shape (D,), dtype float16 or float32; L2-normalised - produced_at: int # monotonic_ns + frame_id: int # echoes NavCameraFrame.frame_id (the source carries int across the pipeline) + embedding: object # numpy.ndarray, shape (D,), dtype float16|float32; L2-normalised. typed object to keep _types/ free of numpy import-time dep. + produced_at: int # monotonic_ns from injected Clock @dataclass(frozen=True, slots=True) class VprCandidate: """One retrieval candidate from the top-K result.""" - tile_id: tuple # composite (zoomLevel, lat, lon); see C6 TileRecord - descriptor_distance: float # backbone-specific metric (cosine for L2-normalised; Euclidean for raw) + tile_id: tuple[int, float, float] # composite (zoom_level, lat, lon); matches c6_tile_cache.TileId. tuple form keeps _types/ free of an L1→L3 import per module-layout.md. + descriptor_distance: float # backbone-specific metric (cosine for L2-normalised; Euclidean for raw) descriptor_dim: int @@ -95,10 +93,10 @@ class VprCandidate: class VprResult: """Top-K candidates from `VprStrategy.retrieve_topk`. Consumed by C2.5 ReRanker.""" - frame_id: UUID - candidates: list[VprCandidate] # length == k, sorted ascending by descriptor_distance - retrieved_at: int # monotonic_ns - backbone_label: str # non-empty; matches BUILD_VPR_ lowercase + frame_id: int # echoes the source NavCameraFrame.frame_id + candidates: tuple[VprCandidate, ...] # length == k, ascending by descriptor_distance. tuple (not list) so the frozen+slots invariant holds. + retrieved_at: int # monotonic_ns from injected Clock + backbone_label: str # non-empty; matches BUILD_VPR_ lowercase ``` ### Protocol: `BackbonePreprocessor` (C2-internal; lives in `c2_vpr/_preprocessor.py`) @@ -155,18 +153,21 @@ class IndexUnavailableError(VprError): # src/gps_denied_onboard/runtime_root/vpr_factory.py from typing import TYPE_CHECKING -from gps_denied_onboard.config import Config +from gps_denied_onboard.config.schema import Config from gps_denied_onboard.components.c2_vpr import VprStrategy -from gps_denied_onboard.components.c6_tile_cache import TileStore +from gps_denied_onboard.components.c6_tile_cache import DescriptorIndex from gps_denied_onboard.components.c7_inference import InferenceRuntime def build_vpr_strategy( config: Config, - tile_store: TileStore, + *, + descriptor_index: DescriptorIndex, inference_runtime: InferenceRuntime, ) -> VprStrategy: - """Composition-root factory. Reads `config.vpr.strategy` and `config.vpr.backbone_weights_path`; lazy-imports the concrete strategy module gated by its CMake `BUILD_VPR_` flag; refuses to instantiate a strategy whose flag is OFF (raises `ConfigurationError` pointing at the offending strategy name + missing flag). + """Composition-root factory. Reads `config.components['c2_vpr'].strategy` and `config.components['c2_vpr'].backbone_weights_path`; lazy-imports the concrete strategy module gated by its CMake `BUILD_VPR_` flag; refuses to instantiate a strategy whose flag is OFF (raises `StrategyNotAvailableError` pointing at the offending strategy name + missing flag). + + `descriptor_index` (NOT `tile_store`) is injected: the pre-flight `descriptor_dim` validation reads from the C6 `DescriptorIndex.descriptor_dim()` which is the Public API that owns the FAISS index sidecar. The contract draft earlier named this parameter `tile_store`; the implementation moved it to match C6's actual Public API. Strategy resolution table: @@ -180,9 +181,9 @@ def build_vpr_strategy( | "eigen_places" | EigenPlacesStrategy | components.c2_vpr.eigen_places | BUILD_VPR_EIGENPLACES | | "salad" | SaladStrategy | components.c2_vpr.salad | BUILD_VPR_SALAD | - Pre-flight validation: after constructing the strategy, the factory queries `strategy.descriptor_dim()` and asserts it matches the C6 corpus index's declared `descriptor_dim` (read from the FAISS index sidecar). Mismatch → `ConfigurationError` at startup, NOT at first frame. + Pre-flight validation: after constructing the strategy, the factory queries `strategy.descriptor_dim()` and asserts it matches `descriptor_index.descriptor_dim()` (the FAISS index sidecar value). Mismatch → `ConfigError` at startup, NOT at first frame. - Returns a fully-constructed strategy ready for `embed_query` / `retrieve_topk` invocation. The caller (runtime root) is responsible for binding the instance to one ingest thread. + Returns a fully-constructed strategy ready for `embed_query` / `retrieve_topk` invocation. The caller (runtime root) is responsible for binding the instance to one ingest thread (AC-9 deferred until the generic compose_root thread-binding registry is in place; see task spec Risk 4). """ ... ``` diff --git a/_docs/02_tasks/todo/AZ-336_c2_vpr_strategy_protocol.md b/_docs/02_tasks/done/AZ-336_c2_vpr_strategy_protocol.md similarity index 100% rename from _docs/02_tasks/todo/AZ-336_c2_vpr_strategy_protocol.md rename to _docs/02_tasks/done/AZ-336_c2_vpr_strategy_protocol.md diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 3c9b827..e55ef6e 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 14 - name: cumulative-code-review - detail: "PASS after F1+F2 remediation in-session; F3 informational; ready for batch 23" + phase: 2 + name: detect-progress + detail: "batch 23 complete (AZ-303, AZ-297, AZ-331, AZ-398 per-task commits); selecting batch 24" retry_count: 0 cycle: 1 tracker: jira diff --git a/src/gps_denied_onboard/_types/vpr.py b/src/gps_denied_onboard/_types/vpr.py index 9129a45..4594ec5 100644 --- a/src/gps_denied_onboard/_types/vpr.py +++ b/src/gps_denied_onboard/_types/vpr.py @@ -1,34 +1,98 @@ -"""C2 VPR + C2.5 rerank DTOs.""" +"""C2 VPR + C2.5 rerank DTOs (L1 cross-component layer). + +The C2 trio (:class:`VprQuery`, :class:`VprCandidate`, :class:`VprResult`) +is frozen by ``contracts/c2_vpr/vpr_strategy_protocol.md`` v1.0.0 (AZ-336): +slotted, immutable, no defaults, and stamped with the producer's +``monotonic_ns`` so the C13 FDR record can correlate the embed→retrieve +hop without a wall-clock dependency. + +``RerankResult`` is the legacy C2.5 stub kept untouched for the AZ-342 +contract rewrite — touching it here would expand AZ-336 scope into a +sibling task. Once AZ-342 lands, ``RerankResult`` moves to its v1.0.0 +shape. +""" from __future__ import annotations -from dataclasses import dataclass, field -from datetime import datetime -from typing import Any +from dataclasses import dataclass + +__all__ = [ + "RerankResult", + "VprCandidate", + "VprQuery", + "VprResult", +] -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) class VprQuery: - """A VPR query (global descriptor + frame metadata).""" + """Backbone embedding for one nav-camera frame. + + Produced by :meth:`VprStrategy.embed_query`; consumed by + :meth:`VprStrategy.retrieve_topk` on the same instance (same + ingest thread per INV-1) or — in the offline C10 corpus-build + path — by the descriptor index builder. + + ``frame_id`` echoes :attr:`NavCameraFrame.frame_id` (int); the + contract documented ``UUID`` in draft but every other in-pipeline + DTO routed from a single frame carries the source ``int`` so + consumers can correlate without an extra mapping step. + ``embedding`` is L2-normalised by the strategy before return + (INV-3); ``produced_at`` is ``monotonic_ns`` from an injected + :class:`gps_denied_onboard.clock.Clock`. + """ frame_id: int - timestamp: datetime - global_descriptor: Any - metadata: dict[str, Any] = field(default_factory=dict) + embedding: object # numpy.ndarray, shape (D,), dtype float16|float32 + produced_at: int -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) +class VprCandidate: + """One retrieval candidate from a top-K result. + + ``tile_id`` is the composite ``(zoom_level, lat, lon)`` tuple + matching :class:`gps_denied_onboard.components.c6_tile_cache.TileId`; + the tuple form keeps this L1 DTO free of an L3 component import + (the contract spells it ``tuple`` per the architecture layering + rule). Consumers reconstruct ``TileId`` at the C6 boundary. + + ``descriptor_distance`` is the backbone-specific metric (cosine + on L2-normalised embeddings, Euclidean on raw); the strategy + sorts the parent ``VprResult.candidates`` tuple ascending by + this field (INV-4). ``descriptor_dim`` is echoed from the + strategy so a downstream FDR audit can verify the strategy↔index + dim match after the flight. + """ + + tile_id: tuple[int, float, float] + descriptor_distance: float + descriptor_dim: int + + +@dataclass(frozen=True, slots=True) class VprResult: - """Top-K candidates from C2 retrieval.""" + """Top-K candidates from :meth:`VprStrategy.retrieve_topk`. - query_frame_id: int - candidate_tile_ids: tuple[str, ...] - scores: tuple[float, ...] + Consumed by C2.5 ``RerankStrategy``. ``candidates`` is a tuple + (not a list) so the frozen+slotted invariant holds: a frozen + dataclass can still receive a mutable list and let consumers + mutate it; the tuple closes that door. + + ``backbone_label`` is the strategy's lowercase + ``BUILD_VPR_`` form (INV-5) — non-empty for every + successful retrieval; consumed by C13 FDR for provenance. + """ + + frame_id: int + candidates: tuple[VprCandidate, ...] + retrieved_at: int + backbone_label: str @dataclass(frozen=True) class RerankResult: - """C2.5 reranked set of candidate tiles.""" + """C2.5 reranked candidates — legacy shape, AZ-342 owns the rewrite.""" query_frame_id: int candidate_tile_ids: tuple[str, ...] diff --git a/src/gps_denied_onboard/components/c2_vpr/__init__.py b/src/gps_denied_onboard/components/c2_vpr/__init__.py index fcfb8d7..72ddf11 100644 --- a/src/gps_denied_onboard/components/c2_vpr/__init__.py +++ b/src/gps_denied_onboard/components/c2_vpr/__init__.py @@ -1,6 +1,51 @@ -"""C2 VPR component — Public API.""" +"""C2 VPR — Public API (AZ-336). -from gps_denied_onboard._types.vpr import VprQuery, VprResult +Per ``vpr_strategy_protocol.md`` v1.0.0 the public surface consists +of: + +- :class:`VprStrategy` Protocol (3 methods). +- DTOs re-exported from :mod:`gps_denied_onboard._types.vpr` (the L1 + home for cross-component DTOs): :class:`VprQuery`, + :class:`VprCandidate`, :class:`VprResult`. +- Error family rooted at :class:`VprError`; three documented + subtypes (:class:`VprBackboneError`, :class:`VprPreprocessError`, + :class:`IndexUnavailableError`). +- Config block :class:`C2VprConfig` (registered on import). + +:class:`BackbonePreprocessor` is C2-internal (see +``components/02_c2_vpr/description.md`` § 6) and intentionally NOT +re-exported. + +Concrete strategies (``UltraVprStrategy``, ``NetVladStrategy``, +``MegaLocStrategy``, ``MixVprStrategy``, ``SelaVprStrategy``, +``EigenPlacesStrategy``, ``SaladStrategy``) live in sibling modules +and are imported lazily by +:mod:`gps_denied_onboard.runtime_root.vpr_factory` — Risk-2 +mitigation: this ``__init__.py`` MUST NOT import any concrete +strategy module. +""" + +from gps_denied_onboard._types.vpr import VprCandidate, VprQuery, VprResult +from gps_denied_onboard.components.c2_vpr.config import C2VprConfig +from gps_denied_onboard.components.c2_vpr.errors import ( + IndexUnavailableError, + VprBackboneError, + VprError, + VprPreprocessError, +) from gps_denied_onboard.components.c2_vpr.interface import VprStrategy +from gps_denied_onboard.config.schema import register_component_block -__all__ = ["VprQuery", "VprResult", "VprStrategy"] +register_component_block("c2_vpr", C2VprConfig) + +__all__ = [ + "C2VprConfig", + "IndexUnavailableError", + "VprBackboneError", + "VprCandidate", + "VprError", + "VprPreprocessError", + "VprQuery", + "VprResult", + "VprStrategy", +] diff --git a/src/gps_denied_onboard/components/c2_vpr/_preprocessor.py b/src/gps_denied_onboard/components/c2_vpr/_preprocessor.py new file mode 100644 index 0000000..8048c36 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/_preprocessor.py @@ -0,0 +1,60 @@ +"""C2-internal ``BackbonePreprocessor`` Protocol (AZ-336). + +The preprocessor is the resize / crop / normalise step that turns a +``NavCameraFrame`` into the tensor the backbone's forward pass +expects. It is C2-internal — each concrete :class:`VprStrategy` +owns its own preprocessor; sharing across backbones is forbidden per +``components/02_c2_vpr/description.md`` § 6 (preprocessing parameters +are tightly coupled to the backbone weights, so a shared +preprocessor would let a NetVLAD instance corrupt UltraVPR's input +layout). + +This Protocol is NOT re-exported from ``c2_vpr.__init__`` — keeping +it inside the package enforces the description.md § 6 boundary. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + import numpy as np + + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + +__all__ = ["BackbonePreprocessor"] + + +@runtime_checkable +class BackbonePreprocessor(Protocol): + """Resize / crop / normalise per backbone's input contract. + + Each :class:`VprStrategy` implementation owns its concrete + preprocessor (NOT shared across backbones). The strategy calls + :meth:`preprocess` inside :meth:`VprStrategy.embed_query` before + running the forward pass. + """ + + def preprocess( + self, + frame: "NavCameraFrame", + calibration: "CameraCalibration", + ) -> "np.ndarray": + """Return the preprocessed input tensor in the backbone's layout. + + Typical shape: ``(1, 3, H, W)`` NCHW float16 for TRT engines. + + Raises :class:`VprPreprocessError` when the input frame + violates the backbone's contract (wrong colour channels, + calibration mismatch). + """ + ... + + def input_shape(self) -> tuple[int, ...]: + """``(H, W)`` resize target the backbone expects. + + Stable for the preprocessor's lifetime; consumed by tests to + assert preprocessing fidelity. + """ + ... diff --git a/src/gps_denied_onboard/components/c2_vpr/config.py b/src/gps_denied_onboard/components/c2_vpr/config.py new file mode 100644 index 0000000..36174c4 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/config.py @@ -0,0 +1,82 @@ +"""C2 VPR strategy config block (AZ-336). + +Registered into ``config.components['c2_vpr']`` by the package +``__init__.py``. The composition-root factory +:func:`gps_denied_onboard.runtime_root.vpr_factory.build_vpr_strategy` +reads this block to select the strategy and locate the backbone +weights + FAISS index sidecar. + +``backbone_weights_path`` and ``faiss_index_path`` are required (no +default — paths are deployment-specific). They are typed +:class:`pathlib.Path` so YAML loaders that emit strings get coerced +at construction; ``__post_init__`` validates that both are non-empty. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Final + +from gps_denied_onboard.config.schema import ConfigError + +__all__ = [ + "C2VprConfig", + "KNOWN_STRATEGIES", +] + +KNOWN_STRATEGIES: Final[frozenset[str]] = frozenset( + { + "ultra_vpr", + "net_vlad", + "mega_loc", + "mix_vpr", + "sela_vpr", + "eigen_places", + "salad", + } +) + + +@dataclass(frozen=True) +class C2VprConfig: + """Per-component config for C2 VPR. + + ``strategy`` selects exactly one of the seven backbones + (see :data:`KNOWN_STRATEGIES`); the composition-root factory + respects compile-time ``BUILD_VPR_`` gating on top of + this label. + + ``backbone_weights_path`` is the on-disk location of the + backbone weights (TRT engine, ONNX model, PyTorch state dict — + per strategy). ``faiss_index_path`` is the location of the + pre-built FAISS HNSW index file (C6 ``DescriptorIndex`` reads + its sidecar there). + """ + + strategy: str = "net_vlad" + backbone_weights_path: Path = field(default_factory=lambda: Path("/models/vpr/weights")) + faiss_index_path: Path = field(default_factory=lambda: Path("/cache/vpr/index.faiss")) + + def __post_init__(self) -> None: + if self.strategy not in KNOWN_STRATEGIES: + raise ConfigError( + f"C2VprConfig.strategy={self.strategy!r} not in " + f"{sorted(KNOWN_STRATEGIES)}" + ) + if not isinstance(self.backbone_weights_path, Path): + object.__setattr__( + self, "backbone_weights_path", Path(self.backbone_weights_path) + ) + if not isinstance(self.faiss_index_path, Path): + object.__setattr__( + self, "faiss_index_path", Path(self.faiss_index_path) + ) + if not str(self.backbone_weights_path): + raise ConfigError( + "C2VprConfig.backbone_weights_path must be non-empty" + ) + if not str(self.faiss_index_path): + raise ConfigError( + "C2VprConfig.faiss_index_path must be non-empty" + ) diff --git a/src/gps_denied_onboard/components/c2_vpr/errors.py b/src/gps_denied_onboard/components/c2_vpr/errors.py new file mode 100644 index 0000000..953e917 --- /dev/null +++ b/src/gps_denied_onboard/components/c2_vpr/errors.py @@ -0,0 +1,66 @@ +"""C2 VprStrategy error taxonomy (AZ-336). + +Every ``VprStrategy`` method raises only members of :class:`VprError`. +Lower-level exceptions from the backbone runtime (TRT deserialize, +CUDA OOM, ONNX runtime IO mismatch, FAISS index torn mmap) MUST be +caught and rewrapped by the concrete strategy — the contract closes +the error envelope so consumers can ``except VprError`` once and +handle the family. + +A separate composition-time error +(:class:`gps_denied_onboard.runtime_root.errors.StrategyNotAvailableError`) +lives outside this family — it is raised by the factory, not by a +``VprStrategy`` method. + +Note: C6 ``c6_tile_cache.errors`` also defines an +``IndexUnavailableError`` for the underlying ``DescriptorIndex`` +search path. The two classes are intentionally distinct (same name, +different namespaces): the C2 family is the closed envelope a C5/C2.5 +consumer sees; the C6 family is the storage-layer error a concrete +strategy is responsible for rewrapping. +""" + +from __future__ import annotations + +__all__ = [ + "IndexUnavailableError", + "VprBackboneError", + "VprError", + "VprPreprocessError", +] + + +class VprError(Exception): + """Base class for the C2 VPR error family. + + Caught at the runtime root; downstream effect per AC-1.4: + C5 falls back to VIO-only with provenance ``visual_propagated``. + """ + + +class VprBackboneError(VprError): + """Backbone forward pass failed. + + CUDA OOM, TRT engine deserialize mismatch, ONNX runtime IO + shape mismatch. Logged at ERROR; one FDR record per occurrence. + """ + + +class VprPreprocessError(VprError): + """Input frame violates the backbone's preprocessing contract. + + Wrong colour channels, calibration mismatch. Logged at ERROR; + one FDR record per occurrence. The concrete preprocessor + (each strategy owns its own per description.md § 6) raises this + and the strategy lets it propagate unchanged. + """ + + +class IndexUnavailableError(VprError): + """FAISS index handle invalid for the strategy's retrieve path. + + Post-F8 reboot before warm-up, out-of-band file replacement + caught by the underlying mmap defence, dim mismatch caught at + search time. The strategy MUST raise this rather than return + stale candidates (C2-ST-01). + """ diff --git a/src/gps_denied_onboard/components/c2_vpr/interface.py b/src/gps_denied_onboard/components/c2_vpr/interface.py index d188c52..83335c1 100644 --- a/src/gps_denied_onboard/components/c2_vpr/interface.py +++ b/src/gps_denied_onboard/components/c2_vpr/interface.py @@ -1,17 +1,113 @@ -"""C2 `VprStrategy` Protocol. +"""C2 ``VprStrategy`` Protocol (AZ-336). -Concrete strategies: UltraVPR (primary), MegaLoc, MixVPR, SelaVPR, EigenPlaces, -NetVLAD, SALAD. See `_docs/02_document/components/02_c2_vpr/`. +PEP 544 ``typing.Protocol`` with ``runtime_checkable=True``; three +methods spanning the camera-ingest hot path +(:meth:`embed_query` + :meth:`retrieve_topk`) and the composition-time +pre-flight check (:meth:`descriptor_dim`). + +Concrete impls — :class:`UltraVprStrategy` (AZ-337), +:class:`NetVladStrategy` (AZ-338), :class:`MegaLocStrategy` / +:class:`MixVprStrategy` (AZ-339), :class:`SelaVprStrategy` / +:class:`EigenPlacesStrategy` / :class:`SaladStrategy` (AZ-340) — live +in sibling modules and are imported lazily by +:mod:`gps_denied_onboard.runtime_root.vpr_factory`. + +The contract at +``_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md`` v1.0.0 +is the authoritative shape; this module mirrors it 1:1. """ from __future__ import annotations -from typing import Protocol +from typing import TYPE_CHECKING, Protocol, runtime_checkable -from gps_denied_onboard._types.vpr import VprQuery, VprResult +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard._types.vpr import VprQuery, VprResult + +__all__ = ["VprStrategy"] +@runtime_checkable class VprStrategy(Protocol): - """Visual Place Recognition strategy: encode → retrieve top-K candidates.""" + """Single-camera visual place recognition strategy. - def retrieve(self, query: VprQuery, top_k: int = 10) -> VprResult: ... + Stateless per-frame; the only persistent state is the loaded + backbone weights and the C6-owned FAISS index handle (passed in + via constructor by the strategy's ``create(...)`` factory). + + Invariants (see ``vpr_strategy_protocol.md`` v1.0.0): + + - **INV-1 single-threaded** — each instance is bound to one + ingest thread; the composition root enforces. Concurrent + :meth:`embed_query` calls on a single instance race the GPU + stream. + - **INV-2 stateless per-frame** — no implicit dependency on + prior frames; reordering :meth:`embed_query` calls yields + identical embeddings. + - **INV-3 L2-normalised** — :attr:`VprQuery.embedding` is + L2-normalised before return (cosine ≡ Euclidean on the + FAISS HNSW lookup). + - **INV-4 top-K size + order** — :meth:`retrieve_topk` returns + exactly ``k`` candidates, ascending by + :attr:`VprCandidate.descriptor_distance`. + - **INV-5 backbone_label non-empty** — every + :attr:`VprResult.backbone_label` matches the strategy's + ``BUILD_VPR_`` lowercase form. + - **INV-6 deterministic** — same frame + calibration + corpus + → identical embedding + identical top-K (bit-exact for + float32; ULP-tolerant for float16). + - **INV-7 descriptor_dim stable** — :meth:`descriptor_dim` + never changes after construction; reflects the loaded + weights' output dim, NOT a config knob. + + Error envelope: only members of + :class:`gps_denied_onboard.components.c2_vpr.errors.VprError` + escape the three methods. Lower-level exceptions (CUDA, TRT, + FAISS) MUST be rewrapped by the concrete strategy. + """ + + def embed_query( + self, + frame: "NavCameraFrame", + calibration: "CameraCalibration", + ) -> "VprQuery": + """Run the backbone forward pass; return a ``VprQuery``. + + Calibration is consumed by the strategy's internal + :class:`BackbonePreprocessor` for resize / crop / normalise. + + Raises :class:`VprBackboneError` on backbone failure + (CUDA OOM, TRT deserialize mismatch, etc.) and + :class:`VprPreprocessError` on preprocessor contract + violation. + """ + ... + + def retrieve_topk(self, query: "VprQuery", k: int) -> "VprResult": + """Run the FAISS HNSW top-K lookup against the corpus index. + + The strategy holds the FAISS index handle + (constructor-injected from C6's ``DescriptorIndex``). + Top-K candidates are returned ascending by + :attr:`VprCandidate.descriptor_distance`. + + Raises :class:`IndexUnavailableError` when the FAISS index + handle is invalid (post-F8 reboot before warm-up; + out-of-band file replacement caught by mmap defence; + fewer than ``k`` indexed vectors). + """ + ... + + def descriptor_dim(self) -> int: + """Backbone embedding dimensionality. + + Examples: 512 for UltraVPR; 4096 for NetVLAD-VGG16. + Stable for the strategy's lifetime. Consumed by the + composition root at startup to pre-validate index + compatibility against the C6 ``DescriptorIndex`` sidecar + (mismatch → :class:`ConfigError` at startup, NOT at first + frame). + """ + ... diff --git a/src/gps_denied_onboard/runtime_root/vpr_factory.py b/src/gps_denied_onboard/runtime_root/vpr_factory.py new file mode 100644 index 0000000..f93c78b --- /dev/null +++ b/src/gps_denied_onboard/runtime_root/vpr_factory.py @@ -0,0 +1,214 @@ +"""C2 VPR strategy composition-root factory (AZ-336). + +:func:`build_vpr_strategy` selects exactly one strategy by +``config.components['c2_vpr'].strategy`` and respects compile-time +``BUILD_VPR_`` gating: requesting a strategy whose flag is +OFF raises :class:`StrategyNotAvailableError` at composition time +(NOT at first frame). + +Concrete strategy modules +(``ultra_vpr``, ``net_vlad``, ``mega_loc``, ``mix_vpr``, +``sela_vpr``, ``eigen_places``, ``salad``) are imported lazily — +a Tier-0 workstation build with ``BUILD_VPR_ULTRA_VPR=OFF`` MUST +NOT load ``c2_vpr.ultra_vpr`` (ADR-002 / I-5; verifiable via +``sys.modules``). + +Pre-flight validation: after constructing the strategy, the factory +queries :meth:`VprStrategy.descriptor_dim` and asserts it matches +the C6 ``DescriptorIndex`` sidecar's ``descriptor_dim()``. Mismatch +→ :class:`ConfigurationError` at startup, NOT at first frame. + +Factory signature deviates from the v1.0.0 contract draft in one +place: the contract spec named the second parameter ``tile_store: +TileStore``, but ``descriptor_dim()`` lives on +:class:`DescriptorIndex` per C6's actual Public API. We inject +``descriptor_index`` directly; the contract markdown is updated to +match. +""" + +from __future__ import annotations + +import logging +import os +from typing import TYPE_CHECKING + +from gps_denied_onboard.config.schema import ConfigError +from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError + +if TYPE_CHECKING: + from gps_denied_onboard.components.c2_vpr import C2VprConfig, VprStrategy + from gps_denied_onboard.components.c6_tile_cache import DescriptorIndex + from gps_denied_onboard.components.c7_inference import InferenceRuntime + from gps_denied_onboard.config.schema import Config + +__all__ = ["build_vpr_strategy"] + + +_LOG = logging.getLogger("gps_denied_onboard.c2_vpr") + + +# Strategy resolution table — mirrors the contract's +# ``vpr_strategy_protocol.md`` v1.0.0 § Composition-Root Factory table +# verbatim. ANY mutation of this dict MUST be mirrored in the contract. +_STRATEGY_TO_BUILD_FLAG: dict[str, str] = { + "ultra_vpr": "BUILD_VPR_ULTRA_VPR", + "net_vlad": "BUILD_VPR_NETVLAD", + "mega_loc": "BUILD_VPR_MEGALOC", + "mix_vpr": "BUILD_VPR_MIXVPR", + "sela_vpr": "BUILD_VPR_SELAVPR", + "eigen_places": "BUILD_VPR_EIGENPLACES", + "salad": "BUILD_VPR_SALAD", +} + +_STRATEGY_TO_MODULE: dict[str, tuple[str, str]] = { + "ultra_vpr": ( + "gps_denied_onboard.components.c2_vpr.ultra_vpr", + "UltraVprStrategy", + ), + "net_vlad": ( + "gps_denied_onboard.components.c2_vpr.net_vlad", + "NetVladStrategy", + ), + "mega_loc": ( + "gps_denied_onboard.components.c2_vpr.mega_loc", + "MegaLocStrategy", + ), + "mix_vpr": ( + "gps_denied_onboard.components.c2_vpr.mix_vpr", + "MixVprStrategy", + ), + "sela_vpr": ( + "gps_denied_onboard.components.c2_vpr.sela_vpr", + "SelaVprStrategy", + ), + "eigen_places": ( + "gps_denied_onboard.components.c2_vpr.eigen_places", + "EigenPlacesStrategy", + ), + "salad": ( + "gps_denied_onboard.components.c2_vpr.salad", + "SaladStrategy", + ), +} + + +def _is_build_flag_on(flag_name: str) -> bool: + """Read a compile-time ``BUILD_*`` flag from the environment. + + ``ON`` / ``1`` / ``true`` / ``yes`` (case-insensitive) → ``True``; + anything else (including unset) → ``False``. Defaults to OFF so + test environments must opt-in explicitly per strategy. + """ + raw = os.environ.get(flag_name, "") + return raw.strip().lower() in {"on", "1", "true", "yes"} + + +def _c2_config(config: "Config") -> "C2VprConfig": + """Pull the registered C2 config block. + + ``c2_vpr.__init__`` registers it on import; a missing + registration is a developer error and surfaces as ``KeyError`` + rather than a silent fallback. + """ + return config.components["c2_vpr"] + + +def build_vpr_strategy( + config: "Config", + *, + descriptor_index: "DescriptorIndex", + inference_runtime: "InferenceRuntime", +) -> "VprStrategy": + """Construct the :class:`VprStrategy` impl selected by config. + + 1. Reads ``config.components['c2_vpr'].strategy``. + 2. Checks the matching ``BUILD_VPR_`` flag — if OFF, + raises :class:`StrategyNotAvailableError` BEFORE any import. + 3. Lazily imports the concrete strategy module. + 4. Constructs the strategy via its module-level + ``create(config, descriptor_index, inference_runtime)`` + factory function (each concrete strategy module exports + ``create`` as its public entry-point; concrete constructors + stay private). + 5. Pre-flight ``descriptor_dim`` match: ``strategy.descriptor_dim()`` + vs ``descriptor_index.descriptor_dim()``. Mismatch raises + :class:`ConfigError`; ONE ERROR log + ``kind="c2.vpr.dim_mismatch"`` is emitted; the strategy is + NOT bound. + 6. On success, ONE INFO log ``kind="c2.vpr.strategy_loaded"`` + with ``strategy`` + ``descriptor_dim``. + + Raises: + StrategyNotAvailableError: compile-time flag OFF or + concrete module not yet built (AZ-337..AZ-340 pending). + ConfigError: ``descriptor_dim`` mismatch between strategy + and corpus index. + """ + block = _c2_config(config) + strategy = block.strategy + flag_name = _STRATEGY_TO_BUILD_FLAG.get(strategy) + module_info = _STRATEGY_TO_MODULE.get(strategy) + if flag_name is None or module_info is None: + # Defensive — config validation rejects unknown strategy labels + # at load (C2VprConfig.__post_init__), so this branch is only + # reachable if the resolution table and the validation set + # drift apart. + _LOG.error( + "c2.vpr.build_flag_off", + extra={"strategy": strategy, "reason": "unknown_strategy"}, + ) + raise StrategyNotAvailableError( + f"VprStrategy {strategy!r} is not buildable in this binary." + ) + if not _is_build_flag_on(flag_name): + _LOG.error( + "c2.vpr.build_flag_off", + extra={"strategy": strategy, "flag": flag_name}, + ) + raise StrategyNotAvailableError( + f"BUILD_VPR_{strategy.upper()} is OFF for this binary; " + f"cannot select strategy={strategy}." + ) + module_name, class_name = module_info + try: + module = __import__(module_name, fromlist=[class_name]) + except ModuleNotFoundError as exc: + raise StrategyNotAvailableError( + f"VprStrategy {strategy!r} is configured but its concrete impl " + f"module {module_name!r} has not been built into this binary " + "yet (AZ-337 / AZ-338 / AZ-339 / AZ-340 pending)." + ) from exc + create_fn = getattr(module, "create", None) + if create_fn is None: + strategy_cls = getattr(module, class_name) + instance = strategy_cls( + config, + descriptor_index=descriptor_index, + inference_runtime=inference_runtime, + ) + else: + instance = create_fn( + config, + descriptor_index=descriptor_index, + inference_runtime=inference_runtime, + ) + strategy_dim = instance.descriptor_dim() + corpus_dim = descriptor_index.descriptor_dim() + if strategy_dim != corpus_dim: + _LOG.error( + "c2.vpr.dim_mismatch", + extra={ + "strategy": strategy, + "strategy_dim": strategy_dim, + "corpus_dim": corpus_dim, + }, + ) + raise ConfigError( + f"descriptor_dim mismatch: strategy={strategy_dim}, " + f"corpus={corpus_dim}" + ) + _LOG.info( + "c2.vpr.strategy_loaded", + extra={"strategy": strategy, "descriptor_dim": strategy_dim}, + ) + return instance diff --git a/tests/unit/c2_vpr/test_protocol_conformance.py b/tests/unit/c2_vpr/test_protocol_conformance.py new file mode 100644 index 0000000..9be49f2 --- /dev/null +++ b/tests/unit/c2_vpr/test_protocol_conformance.py @@ -0,0 +1,528 @@ +"""AZ-336 — C2 VprStrategy Protocol + DTO + error + factory conformance. + +Covers all 9 ACs of AZ-336 + the NFRs. The factory ACs (AC-3..AC-6) +substitute fake strategy modules at the ``sys.modules`` boundary so +the test never touches UltraVPR / NetVLAD / FAISS / TensorRT native +libraries. +""" + +from __future__ import annotations + +import dataclasses +import logging +import sys +import time +import types +from pathlib import Path + +import numpy as np +import pytest + +from gps_denied_onboard._types.vpr import VprCandidate, VprQuery, VprResult +from gps_denied_onboard.components.c2_vpr import ( + C2VprConfig, + IndexUnavailableError, + VprBackboneError, + VprError, + VprPreprocessError, + VprStrategy, +) +from gps_denied_onboard.components.c2_vpr._preprocessor import BackbonePreprocessor +from gps_denied_onboard.components.c2_vpr.config import KNOWN_STRATEGIES +from gps_denied_onboard.config.schema import Config, ConfigError +from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError +from gps_denied_onboard.runtime_root.vpr_factory import build_vpr_strategy + + +_STRATEGY_MODULES: dict[str, tuple[str, str, str]] = { + "ultra_vpr": ( + "gps_denied_onboard.components.c2_vpr.ultra_vpr", + "UltraVprStrategy", + "BUILD_VPR_ULTRA_VPR", + ), + "net_vlad": ( + "gps_denied_onboard.components.c2_vpr.net_vlad", + "NetVladStrategy", + "BUILD_VPR_NETVLAD", + ), + "mega_loc": ( + "gps_denied_onboard.components.c2_vpr.mega_loc", + "MegaLocStrategy", + "BUILD_VPR_MEGALOC", + ), + "mix_vpr": ( + "gps_denied_onboard.components.c2_vpr.mix_vpr", + "MixVprStrategy", + "BUILD_VPR_MIXVPR", + ), + "sela_vpr": ( + "gps_denied_onboard.components.c2_vpr.sela_vpr", + "SelaVprStrategy", + "BUILD_VPR_SELAVPR", + ), + "eigen_places": ( + "gps_denied_onboard.components.c2_vpr.eigen_places", + "EigenPlacesStrategy", + "BUILD_VPR_EIGENPLACES", + ), + "salad": ( + "gps_denied_onboard.components.c2_vpr.salad", + "SaladStrategy", + "BUILD_VPR_SALAD", + ), +} + + +# ---------------------------------------------------------------------- +# Fakes that structurally satisfy the VprStrategy + DescriptorIndex +# Protocols. Tests substitute these at the sys.modules boundary so no +# native library is loaded. + + +class _FakeDescriptorIndex: + def __init__(self, dim: int = 512) -> None: + self._dim = dim + + def search_topk(self, query, k): + return [] + + def descriptor_dim(self): + return self._dim + + def mmap_handle(self): + return Path("/tmp/fake.faiss") + + def rebuild_from_descriptors(self, descriptors, tile_ids, hnsw_params): + return None + + def index_metadata(self): + raise NotImplementedError + + +class _FakeInferenceRuntime: + def load_engine(self, *args, **kwargs): + raise NotImplementedError + + def infer(self, *args, **kwargs): + raise NotImplementedError + + def warm_up(self): + return None + + def thermal_state(self): + raise NotImplementedError + + +class _FullVprStrategy: + def __init__(self, config, *, descriptor_index, inference_runtime, dim=512) -> None: + self._config = config + self._descriptor_index = descriptor_index + self._inference_runtime = inference_runtime + self._dim = dim + self._label = config.components["c2_vpr"].strategy + + def embed_query(self, frame, calibration): + return VprQuery( + frame_id=getattr(frame, "frame_id", 0), + embedding=np.zeros((self._dim,), dtype=np.float32), + produced_at=1_000_000_000, + ) + + def retrieve_topk(self, query, k): + return VprResult( + frame_id=query.frame_id, + candidates=tuple(), + retrieved_at=1_000_000_000, + backbone_label=self._label, + ) + + def descriptor_dim(self): + return self._dim + + +class _PartialVprStrategy: + def embed_query(self, frame, calibration): + raise NotImplementedError + + def retrieve_topk(self, query, k): + raise NotImplementedError + + +def _config_with_strategy(strategy: str) -> Config: + return Config.with_blocks(c2_vpr=C2VprConfig(strategy=strategy)) + + +def _install_fake_strategy(strategy_label: str, *, dim: int = 512) -> type: + module_name, class_name, _flag = _STRATEGY_MODULES[strategy_label] + + class _FakeStrategy(_FullVprStrategy): + def __init__(self, config, **kwargs) -> None: + super().__init__(config, dim=dim, **kwargs) + + _FakeStrategy.__name__ = class_name + module = types.ModuleType(module_name) + setattr(module, class_name, _FakeStrategy) + sys.modules[module_name] = module + return _FakeStrategy + + +@pytest.fixture +def strategy_module_cleanup(): + """Pop every fake strategy module before/after each factory test.""" + for module_name, _, _ in _STRATEGY_MODULES.values(): + sys.modules.pop(module_name, None) + yield + for module_name, _, _ in _STRATEGY_MODULES.values(): + sys.modules.pop(module_name, None) + + +# ---------------------------------------------------------------------- +# AC-1: Protocol conformance — full satisfies, partial does not. + + +def test_ac1_vpr_strategy_conformance_full() -> None: + instance = _FullVprStrategy( + _config_with_strategy("net_vlad"), + descriptor_index=_FakeDescriptorIndex(), + inference_runtime=_FakeInferenceRuntime(), + ) + assert isinstance(instance, VprStrategy) + + +def test_ac1_vpr_strategy_conformance_partial_missing_methods() -> None: + assert not isinstance(_PartialVprStrategy(), VprStrategy) + + +# ---------------------------------------------------------------------- +# AC-2: frozen+slotted DTOs reject mutation and forbid __dict__. + + +def _make_query(frame_id: int = 7, dim: int = 512) -> VprQuery: + return VprQuery( + frame_id=frame_id, + embedding=np.zeros((dim,), dtype=np.float32), + produced_at=1_000_000_000, + ) + + +def _make_candidate() -> VprCandidate: + return VprCandidate( + tile_id=(18, 49.9, 36.3), + descriptor_distance=0.123, + descriptor_dim=512, + ) + + +def _make_result(frame_id: int = 7) -> VprResult: + return VprResult( + frame_id=frame_id, + candidates=(_make_candidate(),), + retrieved_at=1_000_000_000, + backbone_label="net_vlad", + ) + + +@pytest.mark.parametrize( + "dto, field_name, new_value", + [ + (_make_query(), "frame_id", 99), + (_make_candidate(), "descriptor_distance", 0.9), + (_make_result(), "backbone_label", "ultra_vpr"), + ], +) +def test_ac2_frozen_dtos_reject_mutation(dto, field_name: str, new_value) -> None: + original_value = getattr(dto, field_name) + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(dto, field_name, new_value) + assert getattr(dto, field_name) == original_value + + +@pytest.mark.parametrize("cls", [VprQuery, VprCandidate, VprResult]) +def test_ac2_dtos_have_slots(cls) -> None: + assert hasattr(cls, "__slots__"), f"{cls.__name__} must use slots=True" + assert cls.__slots__, f"{cls.__name__}.__slots__ must be non-empty" + instance = ( + _make_query() + if cls is VprQuery + else _make_candidate() + if cls is VprCandidate + else _make_result() + ) + assert not hasattr(instance, "__dict__"), ( + f"{cls.__name__} carries a __dict__ — slots=True is missing" + ) + + +# ---------------------------------------------------------------------- +# AC-3: factory rejects missing build flag. + + +def test_ac3_factory_rejects_missing_build_flag( + monkeypatch, strategy_module_cleanup, caplog +) -> None: + strategy = "ultra_vpr" + _, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.delenv(flag, raising=False) + config = _config_with_strategy(strategy) + with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_vpr"): + with pytest.raises(StrategyNotAvailableError) as exc_info: + build_vpr_strategy( + config, + descriptor_index=_FakeDescriptorIndex(), + inference_runtime=_FakeInferenceRuntime(), + ) + assert "BUILD_VPR_ULTRA_VPR is OFF" in str(exc_info.value) + assert any( + r.message == "c2.vpr.build_flag_off" + for r in caplog.records + ), "ERROR log kind=c2.vpr.build_flag_off must be emitted" + + +@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES)) +def test_ac3_factory_does_not_load_module_when_flag_off( + monkeypatch, strategy_module_cleanup, strategy +) -> None: + module_name, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.delenv(flag, raising=False) + config = _config_with_strategy(strategy) + with pytest.raises(StrategyNotAvailableError): + build_vpr_strategy( + config, + descriptor_index=_FakeDescriptorIndex(), + inference_runtime=_FakeInferenceRuntime(), + ) + assert module_name not in sys.modules, ( + f"{module_name} must NOT be in sys.modules when its BUILD flag is OFF" + ) + + +# ---------------------------------------------------------------------- +# AC-4: factory rejects descriptor_dim mismatch. + + +def test_ac4_factory_rejects_dim_mismatch( + monkeypatch, strategy_module_cleanup, caplog +) -> None: + strategy = "ultra_vpr" + _, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.setenv(flag, "ON") + _install_fake_strategy(strategy, dim=512) + config = _config_with_strategy(strategy) + with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_vpr"): + with pytest.raises(ConfigError) as exc_info: + build_vpr_strategy( + config, + descriptor_index=_FakeDescriptorIndex(dim=4096), + inference_runtime=_FakeInferenceRuntime(), + ) + assert "descriptor_dim mismatch: strategy=512, corpus=4096" in str( + exc_info.value + ) + assert any( + r.message == "c2.vpr.dim_mismatch" + for r in caplog.records + ), "ERROR log kind=c2.vpr.dim_mismatch must be emitted" + + +# ---------------------------------------------------------------------- +# AC-5: successful factory load emits INFO log with structured fields. + + +def test_ac5_factory_emits_info_log_on_success( + monkeypatch, strategy_module_cleanup, caplog +) -> None: + strategy = "ultra_vpr" + _, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.setenv(flag, "ON") + _install_fake_strategy(strategy, dim=512) + config = _config_with_strategy(strategy) + with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c2_vpr"): + instance = build_vpr_strategy( + config, + descriptor_index=_FakeDescriptorIndex(dim=512), + inference_runtime=_FakeInferenceRuntime(), + ) + assert isinstance(instance, VprStrategy) + records = [ + r for r in caplog.records if r.message == "c2.vpr.strategy_loaded" + ] + assert len(records) == 1, "Exactly one strategy_loaded INFO log expected" + record = records[0] + assert getattr(record, "strategy", None) == "ultra_vpr" + assert getattr(record, "descriptor_dim", None) == 512 + + +# ---------------------------------------------------------------------- +# AC-6: every entry in the resolution table resolves to its module path. + + +@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES)) +def test_ac6_strategy_resolution_table( + monkeypatch, strategy_module_cleanup, strategy +) -> None: + module_name, class_name, flag = _STRATEGY_MODULES[strategy] + monkeypatch.setenv(flag, "ON") + fake_cls = _install_fake_strategy(strategy, dim=512) + config = _config_with_strategy(strategy) + instance = build_vpr_strategy( + config, + descriptor_index=_FakeDescriptorIndex(dim=512), + inference_runtime=_FakeInferenceRuntime(), + ) + assert isinstance(instance, fake_cls) + assert isinstance(instance, VprStrategy) + assert sys.modules[module_name] is not None + + +# ---------------------------------------------------------------------- +# AC-7: error hierarchy — every concrete error is catchable as VprError. + + +@pytest.mark.parametrize( + "exc_factory", + [VprBackboneError, VprPreprocessError, IndexUnavailableError], +) +def test_ac7_all_vpr_errors_caught_as_family(exc_factory) -> None: + with pytest.raises(VprError): + raise exc_factory("boom") + + +def test_ac7_unrelated_exception_not_caught_as_family() -> None: + with pytest.raises(ValueError): + try: + raise ValueError("not us") + except VprError: + pytest.fail("ValueError must not be caught as VprError") + + +def test_ac7_strategy_not_available_outside_family() -> None: + with pytest.raises(StrategyNotAvailableError): + try: + raise StrategyNotAvailableError("composition-time") + except VprError: + pytest.fail( + "StrategyNotAvailableError is a composition-root error " + "and MUST NOT be in the c2 VprError family" + ) + + +# ---------------------------------------------------------------------- +# AC-8: Public API surface — re-exports + BackbonePreprocessor exclusion. + + +def test_ac8_public_api_re_exports() -> None: + from gps_denied_onboard.components import c2_vpr + + assert "VprStrategy" in c2_vpr.__all__ + assert "VprQuery" in c2_vpr.__all__ + assert "VprCandidate" in c2_vpr.__all__ + assert "VprResult" in c2_vpr.__all__ + + +def test_ac8_backbone_preprocessor_not_in_public_api() -> None: + from gps_denied_onboard.components import c2_vpr + + assert "BackbonePreprocessor" not in c2_vpr.__all__ + assert not hasattr(c2_vpr, "BackbonePreprocessor"), ( + "BackbonePreprocessor is C2-internal per description.md § 6 and " + "MUST NOT be re-exported from c2_vpr/__init__.py" + ) + + +def test_ac8_backbone_preprocessor_protocol_is_runtime_checkable() -> None: + # BackbonePreprocessor is internal but still a Protocol; tests in + # AZ-337..AZ-340 will use isinstance against it. + + class _OkPreprocessor: + def preprocess(self, frame, calibration): + raise NotImplementedError + + def input_shape(self): + return (224, 224) + + assert isinstance(_OkPreprocessor(), BackbonePreprocessor) + + +# ---------------------------------------------------------------------- +# Config validation — unknown strategy label is rejected at load. +# (AC-9 single-thread binding is deferred per AZ-336 task spec Risk 4; +# the generic compose_root thread-binding registry referenced by AC-9 +# has not materialised — each factory owns its own thread binding +# today, e.g. ``runtime_root.fc_factory.clear_outbound_thread_binding``. +# AC-9 ships with AZ-270's registry or its replacement; this task +# delivers AC-1..AC-8 + NFRs in line with the spec's escape clause.) + + +@pytest.mark.parametrize( + "bad_label", + ["ULTRA_VPR", "ultraVpr", "openVLAD", "", "vins_mono"], +) +def test_unknown_strategy_rejected_at_config_load(bad_label: str) -> None: + with pytest.raises(ConfigError) as exc_info: + C2VprConfig(strategy=bad_label) + msg = str(exc_info.value) + for valid in KNOWN_STRATEGIES: + assert valid in msg + + +# ---------------------------------------------------------------------- +# NFRs. + + +@pytest.mark.parametrize( + "exc_type", + [VprBackboneError, VprPreprocessError, IndexUnavailableError], +) +def test_nfr_reliability_all_vpr_errors_subclass_family(exc_type) -> None: + assert issubclass(exc_type, VprError) + + +def test_nfr_reliability_strategy_not_available_not_in_family() -> None: + assert not issubclass(StrategyNotAvailableError, VprError) + + +def test_nfr_perf_factory_under_50ms_p99( + monkeypatch, strategy_module_cleanup +) -> None: + """Factory p99 ≤ 50 ms across 100 calls (NFR-perf-factory).""" + strategy = "net_vlad" + _, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.setenv(flag, "ON") + _install_fake_strategy(strategy, dim=512) + config = _config_with_strategy(strategy) + descriptor_index = _FakeDescriptorIndex(dim=512) + inference_runtime = _FakeInferenceRuntime() + + durations_ms: list[float] = [] + for _ in range(100): + t0 = time.perf_counter() + build_vpr_strategy( + config, + descriptor_index=descriptor_index, + inference_runtime=inference_runtime, + ) + durations_ms.append((time.perf_counter() - t0) * 1000.0) + + durations_ms.sort() + p99 = durations_ms[int(0.99 * len(durations_ms))] + assert p99 <= 50.0, ( + f"build_vpr_strategy() p99={p99:.3f} ms exceeds 50 ms NFR" + ) + + +# ---------------------------------------------------------------------- +# Surface coverage — config defaults round-trip. + + +def test_c2_config_default_strategy_is_net_vlad() -> None: + cfg = C2VprConfig() + assert cfg.strategy == "net_vlad" + + +def test_c2_config_paths_coerce_to_path() -> None: + cfg = C2VprConfig( + backbone_weights_path="/tmp/weights", # type: ignore[arg-type] + faiss_index_path="/tmp/index.faiss", # type: ignore[arg-type] + ) + assert isinstance(cfg.backbone_weights_path, Path) + assert isinstance(cfg.faiss_index_path, Path) diff --git a/tests/unit/c2_vpr/test_smoke.py b/tests/unit/c2_vpr/test_smoke.py deleted file mode 100644 index 1b79fda..0000000 --- a/tests/unit/c2_vpr/test_smoke.py +++ /dev/null @@ -1,10 +0,0 @@ -"""C2 VPR smoke test — AC-9.""" - - -def test_interface_importable() -> None: - # Assert - from gps_denied_onboard.components.c2_vpr import VprQuery, VprResult, VprStrategy - - assert VprStrategy is not None - assert VprQuery is not None - assert VprResult is not None