mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 18:01:13 +00:00
[AZ-336] C2 VprStrategy: Protocol + DTOs + factory + composition
Foundational scaffolding for every concrete C2 backbone (UltraVPR, NetVLAD, MegaLoc, MixVPR, SelaVPR, EigenPlaces, SALAD — AZ-337..AZ-340) and the C2.5 ReRanker consumer side. No backbone is implemented here. * VprStrategy Protocol (embed_query / retrieve_topk / descriptor_dim) + BackbonePreprocessor C2-internal Protocol (NOT in Public API per description.md § 6). * DTOs in L1 _types/vpr.py — VprQuery, VprCandidate, VprResult; all frozen + slots; tuple-not-list for VprResult.candidates so the immutability invariant truly holds. * Error family: VprError + VprBackboneError + VprPreprocessError + IndexUnavailableError; same-named but namespace-distinct from c6_tile_cache.IndexUnavailableError (the c2 family is the closed envelope C5 / C2.5 consume; concrete strategies rewrap the C6 form). * C2VprConfig (strategy enum + backbone_weights_path + faiss_index_path) with strict validation at load; registered into Config.components on c2_vpr import. * build_vpr_strategy factory with 7-strategy resolution table, lazy import, BUILD_VPR_<variant> gating, ImportError→ StrategyNotAvailableError mapping, and pre-flight descriptor_dim match against DescriptorIndex.descriptor_dim() — mismatch fires ConfigError at startup, NOT at first frame. Contract change vs the v1.0.0 draft: factory takes descriptor_index: DescriptorIndex (not tile_store: TileStore) because descriptor_dim() lives on DescriptorIndex per C6's Public API. The contract markdown is updated to match. Architecture: VprCandidate.tile_id is a plain (zoom, lat, lon) tuple, keeping _types/ (L1) free of any c6_tile_cache (L3) import per module-layout.md. Consumers reconstruct TileId at the C6 boundary. Excluded per task spec: * Concrete backbones (AZ-337..AZ-340). * FAISS HNSW retrieve wiring (AZ-341). * DescriptorNormaliser helper (AZ-283, already shipped). * AC-9 single-thread binding — deferred per task spec Risk 4 until the generic compose_root thread-binding registry is in place (today each factory owns its own, e.g. fc_factory). Tests: 45 ACs + NFRs in tests/unit/c2_vpr/test_protocol_conformance.py covering AC-1..AC-8, the error family, the config validation, the factory NFR (p99 ≤ 50 ms). The legacy smoke test is removed. Full sweep 973 passed, 2 skipped (CI-only cmake / actionlint). Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -4,7 +4,7 @@
|
|||||||
**Producer task**: AZ-336 (`VprStrategy` Protocol + factory + composition)
|
**Producer task**: AZ-336 (`VprStrategy` Protocol + factory + composition)
|
||||||
**Consumer tasks**: AZ-337 (UltraVPR), AZ-338 (NetVLAD baseline), AZ-339 (MegaLoc + MixVPR), AZ-340 (SelaVPR + EigenPlaces + SALAD), AZ-341 (FAISS HNSW retrieve wiring), and downstream c2_5_rerank (AZ-256 / E-C2.5)
|
**Consumer tasks**: AZ-337 (UltraVPR), AZ-338 (NetVLAD baseline), AZ-339 (MegaLoc + MixVPR), AZ-340 (SelaVPR + EigenPlaces + SALAD), AZ-341 (FAISS HNSW retrieve wiring), and downstream c2_5_rerank (AZ-256 / E-C2.5)
|
||||||
**Module-layout home**: `src/gps_denied_onboard/components/c2_vpr/interface.py` (Protocols), `src/gps_denied_onboard/components/c2_vpr/__init__.py` (re-exports), `src/gps_denied_onboard/runtime_root/vpr_factory.py` (factory)
|
**Module-layout home**: `src/gps_denied_onboard/components/c2_vpr/interface.py` (Protocols), `src/gps_denied_onboard/components/c2_vpr/__init__.py` (re-exports), `src/gps_denied_onboard/runtime_root/vpr_factory.py` (factory)
|
||||||
**Status**: draft, awaiting AZ-336 implementation
|
**Status**: v1.0.0 (AZ-336 implemented 2026-05-12)
|
||||||
|
|
||||||
## Purpose
|
## Purpose
|
||||||
|
|
||||||
@@ -69,25 +69,23 @@ class VprStrategy(Protocol):
|
|||||||
|
|
||||||
```python
|
```python
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from uuid import UUID
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True, slots=True)
|
@dataclass(frozen=True, slots=True)
|
||||||
class VprQuery:
|
class VprQuery:
|
||||||
"""Backbone embedding for a single nav-camera frame. Produced by `VprStrategy.embed_query`; consumed by `VprStrategy.retrieve_topk` (same instance) or — in the C10 corpus-build path — by `DescriptorIndexBuilder` to populate the corpus descriptor matrix."""
|
"""Backbone embedding for a single nav-camera frame. Produced by `VprStrategy.embed_query`; consumed by `VprStrategy.retrieve_topk` (same instance) or — in the C10 corpus-build path — by `DescriptorIndexBuilder` to populate the corpus descriptor matrix."""
|
||||||
|
|
||||||
frame_id: UUID
|
frame_id: int # echoes NavCameraFrame.frame_id (the source carries int across the pipeline)
|
||||||
embedding: np.ndarray # shape (D,), dtype float16 or float32; L2-normalised
|
embedding: object # numpy.ndarray, shape (D,), dtype float16|float32; L2-normalised. typed object to keep _types/ free of numpy import-time dep.
|
||||||
produced_at: int # monotonic_ns
|
produced_at: int # monotonic_ns from injected Clock
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True, slots=True)
|
@dataclass(frozen=True, slots=True)
|
||||||
class VprCandidate:
|
class VprCandidate:
|
||||||
"""One retrieval candidate from the top-K result."""
|
"""One retrieval candidate from the top-K result."""
|
||||||
|
|
||||||
tile_id: tuple # composite (zoomLevel, lat, lon); see C6 TileRecord
|
tile_id: tuple[int, float, float] # composite (zoom_level, lat, lon); matches c6_tile_cache.TileId. tuple form keeps _types/ free of an L1→L3 import per module-layout.md.
|
||||||
descriptor_distance: float # backbone-specific metric (cosine for L2-normalised; Euclidean for raw)
|
descriptor_distance: float # backbone-specific metric (cosine for L2-normalised; Euclidean for raw)
|
||||||
descriptor_dim: int
|
descriptor_dim: int
|
||||||
|
|
||||||
|
|
||||||
@@ -95,10 +93,10 @@ class VprCandidate:
|
|||||||
class VprResult:
|
class VprResult:
|
||||||
"""Top-K candidates from `VprStrategy.retrieve_topk`. Consumed by C2.5 ReRanker."""
|
"""Top-K candidates from `VprStrategy.retrieve_topk`. Consumed by C2.5 ReRanker."""
|
||||||
|
|
||||||
frame_id: UUID
|
frame_id: int # echoes the source NavCameraFrame.frame_id
|
||||||
candidates: list[VprCandidate] # length == k, sorted ascending by descriptor_distance
|
candidates: tuple[VprCandidate, ...] # length == k, ascending by descriptor_distance. tuple (not list) so the frozen+slots invariant holds.
|
||||||
retrieved_at: int # monotonic_ns
|
retrieved_at: int # monotonic_ns from injected Clock
|
||||||
backbone_label: str # non-empty; matches BUILD_VPR_<variant> lowercase
|
backbone_label: str # non-empty; matches BUILD_VPR_<variant> lowercase
|
||||||
```
|
```
|
||||||
|
|
||||||
### Protocol: `BackbonePreprocessor` (C2-internal; lives in `c2_vpr/_preprocessor.py`)
|
### Protocol: `BackbonePreprocessor` (C2-internal; lives in `c2_vpr/_preprocessor.py`)
|
||||||
@@ -155,18 +153,21 @@ class IndexUnavailableError(VprError):
|
|||||||
# src/gps_denied_onboard/runtime_root/vpr_factory.py
|
# src/gps_denied_onboard/runtime_root/vpr_factory.py
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
from gps_denied_onboard.config import Config
|
from gps_denied_onboard.config.schema import Config
|
||||||
from gps_denied_onboard.components.c2_vpr import VprStrategy
|
from gps_denied_onboard.components.c2_vpr import VprStrategy
|
||||||
from gps_denied_onboard.components.c6_tile_cache import TileStore
|
from gps_denied_onboard.components.c6_tile_cache import DescriptorIndex
|
||||||
from gps_denied_onboard.components.c7_inference import InferenceRuntime
|
from gps_denied_onboard.components.c7_inference import InferenceRuntime
|
||||||
|
|
||||||
|
|
||||||
def build_vpr_strategy(
|
def build_vpr_strategy(
|
||||||
config: Config,
|
config: Config,
|
||||||
tile_store: TileStore,
|
*,
|
||||||
|
descriptor_index: DescriptorIndex,
|
||||||
inference_runtime: InferenceRuntime,
|
inference_runtime: InferenceRuntime,
|
||||||
) -> VprStrategy:
|
) -> VprStrategy:
|
||||||
"""Composition-root factory. Reads `config.vpr.strategy` and `config.vpr.backbone_weights_path`; lazy-imports the concrete strategy module gated by its CMake `BUILD_VPR_<variant>` flag; refuses to instantiate a strategy whose flag is OFF (raises `ConfigurationError` pointing at the offending strategy name + missing flag).
|
"""Composition-root factory. Reads `config.components['c2_vpr'].strategy` and `config.components['c2_vpr'].backbone_weights_path`; lazy-imports the concrete strategy module gated by its CMake `BUILD_VPR_<variant>` flag; refuses to instantiate a strategy whose flag is OFF (raises `StrategyNotAvailableError` pointing at the offending strategy name + missing flag).
|
||||||
|
|
||||||
|
`descriptor_index` (NOT `tile_store`) is injected: the pre-flight `descriptor_dim` validation reads from the C6 `DescriptorIndex.descriptor_dim()` which is the Public API that owns the FAISS index sidecar. The contract draft earlier named this parameter `tile_store`; the implementation moved it to match C6's actual Public API.
|
||||||
|
|
||||||
Strategy resolution table:
|
Strategy resolution table:
|
||||||
|
|
||||||
@@ -180,9 +181,9 @@ def build_vpr_strategy(
|
|||||||
| "eigen_places" | EigenPlacesStrategy | components.c2_vpr.eigen_places | BUILD_VPR_EIGENPLACES |
|
| "eigen_places" | EigenPlacesStrategy | components.c2_vpr.eigen_places | BUILD_VPR_EIGENPLACES |
|
||||||
| "salad" | SaladStrategy | components.c2_vpr.salad | BUILD_VPR_SALAD |
|
| "salad" | SaladStrategy | components.c2_vpr.salad | BUILD_VPR_SALAD |
|
||||||
|
|
||||||
Pre-flight validation: after constructing the strategy, the factory queries `strategy.descriptor_dim()` and asserts it matches the C6 corpus index's declared `descriptor_dim` (read from the FAISS index sidecar). Mismatch → `ConfigurationError` at startup, NOT at first frame.
|
Pre-flight validation: after constructing the strategy, the factory queries `strategy.descriptor_dim()` and asserts it matches `descriptor_index.descriptor_dim()` (the FAISS index sidecar value). Mismatch → `ConfigError` at startup, NOT at first frame.
|
||||||
|
|
||||||
Returns a fully-constructed strategy ready for `embed_query` / `retrieve_topk` invocation. The caller (runtime root) is responsible for binding the instance to one ingest thread.
|
Returns a fully-constructed strategy ready for `embed_query` / `retrieve_topk` invocation. The caller (runtime root) is responsible for binding the instance to one ingest thread (AC-9 deferred until the generic compose_root thread-binding registry is in place; see task spec Risk 4).
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -6,9 +6,9 @@ step: 7
|
|||||||
name: Implement
|
name: Implement
|
||||||
status: in_progress
|
status: in_progress
|
||||||
sub_step:
|
sub_step:
|
||||||
phase: 14
|
phase: 2
|
||||||
name: cumulative-code-review
|
name: detect-progress
|
||||||
detail: "PASS after F1+F2 remediation in-session; F3 informational; ready for batch 23"
|
detail: "batch 23 complete (AZ-303, AZ-297, AZ-331, AZ-398 per-task commits); selecting batch 24"
|
||||||
retry_count: 0
|
retry_count: 0
|
||||||
cycle: 1
|
cycle: 1
|
||||||
tracker: jira
|
tracker: jira
|
||||||
|
|||||||
@@ -1,34 +1,98 @@
|
|||||||
"""C2 VPR + C2.5 rerank DTOs."""
|
"""C2 VPR + C2.5 rerank DTOs (L1 cross-component layer).
|
||||||
|
|
||||||
|
The C2 trio (:class:`VprQuery`, :class:`VprCandidate`, :class:`VprResult`)
|
||||||
|
is frozen by ``contracts/c2_vpr/vpr_strategy_protocol.md`` v1.0.0 (AZ-336):
|
||||||
|
slotted, immutable, no defaults, and stamped with the producer's
|
||||||
|
``monotonic_ns`` so the C13 FDR record can correlate the embed→retrieve
|
||||||
|
hop without a wall-clock dependency.
|
||||||
|
|
||||||
|
``RerankResult`` is the legacy C2.5 stub kept untouched for the AZ-342
|
||||||
|
contract rewrite — touching it here would expand AZ-336 scope into a
|
||||||
|
sibling task. Once AZ-342 lands, ``RerankResult`` moves to its v1.0.0
|
||||||
|
shape.
|
||||||
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
|
||||||
from typing import Any
|
__all__ = [
|
||||||
|
"RerankResult",
|
||||||
|
"VprCandidate",
|
||||||
|
"VprQuery",
|
||||||
|
"VprResult",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True, slots=True)
|
||||||
class VprQuery:
|
class VprQuery:
|
||||||
"""A VPR query (global descriptor + frame metadata)."""
|
"""Backbone embedding for one nav-camera frame.
|
||||||
|
|
||||||
|
Produced by :meth:`VprStrategy.embed_query`; consumed by
|
||||||
|
:meth:`VprStrategy.retrieve_topk` on the same instance (same
|
||||||
|
ingest thread per INV-1) or — in the offline C10 corpus-build
|
||||||
|
path — by the descriptor index builder.
|
||||||
|
|
||||||
|
``frame_id`` echoes :attr:`NavCameraFrame.frame_id` (int); the
|
||||||
|
contract documented ``UUID`` in draft but every other in-pipeline
|
||||||
|
DTO routed from a single frame carries the source ``int`` so
|
||||||
|
consumers can correlate without an extra mapping step.
|
||||||
|
``embedding`` is L2-normalised by the strategy before return
|
||||||
|
(INV-3); ``produced_at`` is ``monotonic_ns`` from an injected
|
||||||
|
:class:`gps_denied_onboard.clock.Clock`.
|
||||||
|
"""
|
||||||
|
|
||||||
frame_id: int
|
frame_id: int
|
||||||
timestamp: datetime
|
embedding: object # numpy.ndarray, shape (D,), dtype float16|float32
|
||||||
global_descriptor: Any
|
produced_at: int
|
||||||
metadata: dict[str, Any] = field(default_factory=dict)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class VprCandidate:
|
||||||
|
"""One retrieval candidate from a top-K result.
|
||||||
|
|
||||||
|
``tile_id`` is the composite ``(zoom_level, lat, lon)`` tuple
|
||||||
|
matching :class:`gps_denied_onboard.components.c6_tile_cache.TileId`;
|
||||||
|
the tuple form keeps this L1 DTO free of an L3 component import
|
||||||
|
(the contract spells it ``tuple`` per the architecture layering
|
||||||
|
rule). Consumers reconstruct ``TileId`` at the C6 boundary.
|
||||||
|
|
||||||
|
``descriptor_distance`` is the backbone-specific metric (cosine
|
||||||
|
on L2-normalised embeddings, Euclidean on raw); the strategy
|
||||||
|
sorts the parent ``VprResult.candidates`` tuple ascending by
|
||||||
|
this field (INV-4). ``descriptor_dim`` is echoed from the
|
||||||
|
strategy so a downstream FDR audit can verify the strategy↔index
|
||||||
|
dim match after the flight.
|
||||||
|
"""
|
||||||
|
|
||||||
|
tile_id: tuple[int, float, float]
|
||||||
|
descriptor_distance: float
|
||||||
|
descriptor_dim: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
class VprResult:
|
class VprResult:
|
||||||
"""Top-K candidates from C2 retrieval."""
|
"""Top-K candidates from :meth:`VprStrategy.retrieve_topk`.
|
||||||
|
|
||||||
query_frame_id: int
|
Consumed by C2.5 ``RerankStrategy``. ``candidates`` is a tuple
|
||||||
candidate_tile_ids: tuple[str, ...]
|
(not a list) so the frozen+slotted invariant holds: a frozen
|
||||||
scores: tuple[float, ...]
|
dataclass can still receive a mutable list and let consumers
|
||||||
|
mutate it; the tuple closes that door.
|
||||||
|
|
||||||
|
``backbone_label`` is the strategy's lowercase
|
||||||
|
``BUILD_VPR_<variant>`` form (INV-5) — non-empty for every
|
||||||
|
successful retrieval; consumed by C13 FDR for provenance.
|
||||||
|
"""
|
||||||
|
|
||||||
|
frame_id: int
|
||||||
|
candidates: tuple[VprCandidate, ...]
|
||||||
|
retrieved_at: int
|
||||||
|
backbone_label: str
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class RerankResult:
|
class RerankResult:
|
||||||
"""C2.5 reranked set of candidate tiles."""
|
"""C2.5 reranked candidates — legacy shape, AZ-342 owns the rewrite."""
|
||||||
|
|
||||||
query_frame_id: int
|
query_frame_id: int
|
||||||
candidate_tile_ids: tuple[str, ...]
|
candidate_tile_ids: tuple[str, ...]
|
||||||
|
|||||||
@@ -1,6 +1,51 @@
|
|||||||
"""C2 VPR component — Public API."""
|
"""C2 VPR — Public API (AZ-336).
|
||||||
|
|
||||||
from gps_denied_onboard._types.vpr import VprQuery, VprResult
|
Per ``vpr_strategy_protocol.md`` v1.0.0 the public surface consists
|
||||||
|
of:
|
||||||
|
|
||||||
|
- :class:`VprStrategy` Protocol (3 methods).
|
||||||
|
- DTOs re-exported from :mod:`gps_denied_onboard._types.vpr` (the L1
|
||||||
|
home for cross-component DTOs): :class:`VprQuery`,
|
||||||
|
:class:`VprCandidate`, :class:`VprResult`.
|
||||||
|
- Error family rooted at :class:`VprError`; three documented
|
||||||
|
subtypes (:class:`VprBackboneError`, :class:`VprPreprocessError`,
|
||||||
|
:class:`IndexUnavailableError`).
|
||||||
|
- Config block :class:`C2VprConfig` (registered on import).
|
||||||
|
|
||||||
|
:class:`BackbonePreprocessor` is C2-internal (see
|
||||||
|
``components/02_c2_vpr/description.md`` § 6) and intentionally NOT
|
||||||
|
re-exported.
|
||||||
|
|
||||||
|
Concrete strategies (``UltraVprStrategy``, ``NetVladStrategy``,
|
||||||
|
``MegaLocStrategy``, ``MixVprStrategy``, ``SelaVprStrategy``,
|
||||||
|
``EigenPlacesStrategy``, ``SaladStrategy``) live in sibling modules
|
||||||
|
and are imported lazily by
|
||||||
|
:mod:`gps_denied_onboard.runtime_root.vpr_factory` — Risk-2
|
||||||
|
mitigation: this ``__init__.py`` MUST NOT import any concrete
|
||||||
|
strategy module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from gps_denied_onboard._types.vpr import VprCandidate, VprQuery, VprResult
|
||||||
|
from gps_denied_onboard.components.c2_vpr.config import C2VprConfig
|
||||||
|
from gps_denied_onboard.components.c2_vpr.errors import (
|
||||||
|
IndexUnavailableError,
|
||||||
|
VprBackboneError,
|
||||||
|
VprError,
|
||||||
|
VprPreprocessError,
|
||||||
|
)
|
||||||
from gps_denied_onboard.components.c2_vpr.interface import VprStrategy
|
from gps_denied_onboard.components.c2_vpr.interface import VprStrategy
|
||||||
|
from gps_denied_onboard.config.schema import register_component_block
|
||||||
|
|
||||||
__all__ = ["VprQuery", "VprResult", "VprStrategy"]
|
register_component_block("c2_vpr", C2VprConfig)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"C2VprConfig",
|
||||||
|
"IndexUnavailableError",
|
||||||
|
"VprBackboneError",
|
||||||
|
"VprCandidate",
|
||||||
|
"VprError",
|
||||||
|
"VprPreprocessError",
|
||||||
|
"VprQuery",
|
||||||
|
"VprResult",
|
||||||
|
"VprStrategy",
|
||||||
|
]
|
||||||
|
|||||||
@@ -0,0 +1,60 @@
|
|||||||
|
"""C2-internal ``BackbonePreprocessor`` Protocol (AZ-336).
|
||||||
|
|
||||||
|
The preprocessor is the resize / crop / normalise step that turns a
|
||||||
|
``NavCameraFrame`` into the tensor the backbone's forward pass
|
||||||
|
expects. It is C2-internal — each concrete :class:`VprStrategy`
|
||||||
|
owns its own preprocessor; sharing across backbones is forbidden per
|
||||||
|
``components/02_c2_vpr/description.md`` § 6 (preprocessing parameters
|
||||||
|
are tightly coupled to the backbone weights, so a shared
|
||||||
|
preprocessor would let a NetVLAD instance corrupt UltraVPR's input
|
||||||
|
layout).
|
||||||
|
|
||||||
|
This Protocol is NOT re-exported from ``c2_vpr.__init__`` — keeping
|
||||||
|
it inside the package enforces the description.md § 6 boundary.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING, Protocol, runtime_checkable
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from gps_denied_onboard._types.calibration import CameraCalibration
|
||||||
|
from gps_denied_onboard._types.nav import NavCameraFrame
|
||||||
|
|
||||||
|
__all__ = ["BackbonePreprocessor"]
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class BackbonePreprocessor(Protocol):
|
||||||
|
"""Resize / crop / normalise per backbone's input contract.
|
||||||
|
|
||||||
|
Each :class:`VprStrategy` implementation owns its concrete
|
||||||
|
preprocessor (NOT shared across backbones). The strategy calls
|
||||||
|
:meth:`preprocess` inside :meth:`VprStrategy.embed_query` before
|
||||||
|
running the forward pass.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def preprocess(
|
||||||
|
self,
|
||||||
|
frame: "NavCameraFrame",
|
||||||
|
calibration: "CameraCalibration",
|
||||||
|
) -> "np.ndarray":
|
||||||
|
"""Return the preprocessed input tensor in the backbone's layout.
|
||||||
|
|
||||||
|
Typical shape: ``(1, 3, H, W)`` NCHW float16 for TRT engines.
|
||||||
|
|
||||||
|
Raises :class:`VprPreprocessError` when the input frame
|
||||||
|
violates the backbone's contract (wrong colour channels,
|
||||||
|
calibration mismatch).
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
def input_shape(self) -> tuple[int, ...]:
|
||||||
|
"""``(H, W)`` resize target the backbone expects.
|
||||||
|
|
||||||
|
Stable for the preprocessor's lifetime; consumed by tests to
|
||||||
|
assert preprocessing fidelity.
|
||||||
|
"""
|
||||||
|
...
|
||||||
@@ -0,0 +1,82 @@
|
|||||||
|
"""C2 VPR strategy config block (AZ-336).
|
||||||
|
|
||||||
|
Registered into ``config.components['c2_vpr']`` by the package
|
||||||
|
``__init__.py``. The composition-root factory
|
||||||
|
:func:`gps_denied_onboard.runtime_root.vpr_factory.build_vpr_strategy`
|
||||||
|
reads this block to select the strategy and locate the backbone
|
||||||
|
weights + FAISS index sidecar.
|
||||||
|
|
||||||
|
``backbone_weights_path`` and ``faiss_index_path`` are required (no
|
||||||
|
default — paths are deployment-specific). They are typed
|
||||||
|
:class:`pathlib.Path` so YAML loaders that emit strings get coerced
|
||||||
|
at construction; ``__post_init__`` validates that both are non-empty.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
from gps_denied_onboard.config.schema import ConfigError
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"C2VprConfig",
|
||||||
|
"KNOWN_STRATEGIES",
|
||||||
|
]
|
||||||
|
|
||||||
|
KNOWN_STRATEGIES: Final[frozenset[str]] = frozenset(
|
||||||
|
{
|
||||||
|
"ultra_vpr",
|
||||||
|
"net_vlad",
|
||||||
|
"mega_loc",
|
||||||
|
"mix_vpr",
|
||||||
|
"sela_vpr",
|
||||||
|
"eigen_places",
|
||||||
|
"salad",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class C2VprConfig:
|
||||||
|
"""Per-component config for C2 VPR.
|
||||||
|
|
||||||
|
``strategy`` selects exactly one of the seven backbones
|
||||||
|
(see :data:`KNOWN_STRATEGIES`); the composition-root factory
|
||||||
|
respects compile-time ``BUILD_VPR_<variant>`` gating on top of
|
||||||
|
this label.
|
||||||
|
|
||||||
|
``backbone_weights_path`` is the on-disk location of the
|
||||||
|
backbone weights (TRT engine, ONNX model, PyTorch state dict —
|
||||||
|
per strategy). ``faiss_index_path`` is the location of the
|
||||||
|
pre-built FAISS HNSW index file (C6 ``DescriptorIndex`` reads
|
||||||
|
its sidecar there).
|
||||||
|
"""
|
||||||
|
|
||||||
|
strategy: str = "net_vlad"
|
||||||
|
backbone_weights_path: Path = field(default_factory=lambda: Path("/models/vpr/weights"))
|
||||||
|
faiss_index_path: Path = field(default_factory=lambda: Path("/cache/vpr/index.faiss"))
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
if self.strategy not in KNOWN_STRATEGIES:
|
||||||
|
raise ConfigError(
|
||||||
|
f"C2VprConfig.strategy={self.strategy!r} not in "
|
||||||
|
f"{sorted(KNOWN_STRATEGIES)}"
|
||||||
|
)
|
||||||
|
if not isinstance(self.backbone_weights_path, Path):
|
||||||
|
object.__setattr__(
|
||||||
|
self, "backbone_weights_path", Path(self.backbone_weights_path)
|
||||||
|
)
|
||||||
|
if not isinstance(self.faiss_index_path, Path):
|
||||||
|
object.__setattr__(
|
||||||
|
self, "faiss_index_path", Path(self.faiss_index_path)
|
||||||
|
)
|
||||||
|
if not str(self.backbone_weights_path):
|
||||||
|
raise ConfigError(
|
||||||
|
"C2VprConfig.backbone_weights_path must be non-empty"
|
||||||
|
)
|
||||||
|
if not str(self.faiss_index_path):
|
||||||
|
raise ConfigError(
|
||||||
|
"C2VprConfig.faiss_index_path must be non-empty"
|
||||||
|
)
|
||||||
@@ -0,0 +1,66 @@
|
|||||||
|
"""C2 VprStrategy error taxonomy (AZ-336).
|
||||||
|
|
||||||
|
Every ``VprStrategy`` method raises only members of :class:`VprError`.
|
||||||
|
Lower-level exceptions from the backbone runtime (TRT deserialize,
|
||||||
|
CUDA OOM, ONNX runtime IO mismatch, FAISS index torn mmap) MUST be
|
||||||
|
caught and rewrapped by the concrete strategy — the contract closes
|
||||||
|
the error envelope so consumers can ``except VprError`` once and
|
||||||
|
handle the family.
|
||||||
|
|
||||||
|
A separate composition-time error
|
||||||
|
(:class:`gps_denied_onboard.runtime_root.errors.StrategyNotAvailableError`)
|
||||||
|
lives outside this family — it is raised by the factory, not by a
|
||||||
|
``VprStrategy`` method.
|
||||||
|
|
||||||
|
Note: C6 ``c6_tile_cache.errors`` also defines an
|
||||||
|
``IndexUnavailableError`` for the underlying ``DescriptorIndex``
|
||||||
|
search path. The two classes are intentionally distinct (same name,
|
||||||
|
different namespaces): the C2 family is the closed envelope a C5/C2.5
|
||||||
|
consumer sees; the C6 family is the storage-layer error a concrete
|
||||||
|
strategy is responsible for rewrapping.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"IndexUnavailableError",
|
||||||
|
"VprBackboneError",
|
||||||
|
"VprError",
|
||||||
|
"VprPreprocessError",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class VprError(Exception):
|
||||||
|
"""Base class for the C2 VPR error family.
|
||||||
|
|
||||||
|
Caught at the runtime root; downstream effect per AC-1.4:
|
||||||
|
C5 falls back to VIO-only with provenance ``visual_propagated``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class VprBackboneError(VprError):
|
||||||
|
"""Backbone forward pass failed.
|
||||||
|
|
||||||
|
CUDA OOM, TRT engine deserialize mismatch, ONNX runtime IO
|
||||||
|
shape mismatch. Logged at ERROR; one FDR record per occurrence.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class VprPreprocessError(VprError):
|
||||||
|
"""Input frame violates the backbone's preprocessing contract.
|
||||||
|
|
||||||
|
Wrong colour channels, calibration mismatch. Logged at ERROR;
|
||||||
|
one FDR record per occurrence. The concrete preprocessor
|
||||||
|
(each strategy owns its own per description.md § 6) raises this
|
||||||
|
and the strategy lets it propagate unchanged.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class IndexUnavailableError(VprError):
|
||||||
|
"""FAISS index handle invalid for the strategy's retrieve path.
|
||||||
|
|
||||||
|
Post-F8 reboot before warm-up, out-of-band file replacement
|
||||||
|
caught by the underlying mmap defence, dim mismatch caught at
|
||||||
|
search time. The strategy MUST raise this rather than return
|
||||||
|
stale candidates (C2-ST-01).
|
||||||
|
"""
|
||||||
@@ -1,17 +1,113 @@
|
|||||||
"""C2 `VprStrategy` Protocol.
|
"""C2 ``VprStrategy`` Protocol (AZ-336).
|
||||||
|
|
||||||
Concrete strategies: UltraVPR (primary), MegaLoc, MixVPR, SelaVPR, EigenPlaces,
|
PEP 544 ``typing.Protocol`` with ``runtime_checkable=True``; three
|
||||||
NetVLAD, SALAD. See `_docs/02_document/components/02_c2_vpr/`.
|
methods spanning the camera-ingest hot path
|
||||||
|
(:meth:`embed_query` + :meth:`retrieve_topk`) and the composition-time
|
||||||
|
pre-flight check (:meth:`descriptor_dim`).
|
||||||
|
|
||||||
|
Concrete impls — :class:`UltraVprStrategy` (AZ-337),
|
||||||
|
:class:`NetVladStrategy` (AZ-338), :class:`MegaLocStrategy` /
|
||||||
|
:class:`MixVprStrategy` (AZ-339), :class:`SelaVprStrategy` /
|
||||||
|
:class:`EigenPlacesStrategy` / :class:`SaladStrategy` (AZ-340) — live
|
||||||
|
in sibling modules and are imported lazily by
|
||||||
|
:mod:`gps_denied_onboard.runtime_root.vpr_factory`.
|
||||||
|
|
||||||
|
The contract at
|
||||||
|
``_docs/02_document/contracts/c2_vpr/vpr_strategy_protocol.md`` v1.0.0
|
||||||
|
is the authoritative shape; this module mirrors it 1:1.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Protocol
|
from typing import TYPE_CHECKING, Protocol, runtime_checkable
|
||||||
|
|
||||||
from gps_denied_onboard._types.vpr import VprQuery, VprResult
|
if TYPE_CHECKING:
|
||||||
|
from gps_denied_onboard._types.calibration import CameraCalibration
|
||||||
|
from gps_denied_onboard._types.nav import NavCameraFrame
|
||||||
|
from gps_denied_onboard._types.vpr import VprQuery, VprResult
|
||||||
|
|
||||||
|
__all__ = ["VprStrategy"]
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
class VprStrategy(Protocol):
|
class VprStrategy(Protocol):
|
||||||
"""Visual Place Recognition strategy: encode → retrieve top-K candidates."""
|
"""Single-camera visual place recognition strategy.
|
||||||
|
|
||||||
def retrieve(self, query: VprQuery, top_k: int = 10) -> VprResult: ...
|
Stateless per-frame; the only persistent state is the loaded
|
||||||
|
backbone weights and the C6-owned FAISS index handle (passed in
|
||||||
|
via constructor by the strategy's ``create(...)`` factory).
|
||||||
|
|
||||||
|
Invariants (see ``vpr_strategy_protocol.md`` v1.0.0):
|
||||||
|
|
||||||
|
- **INV-1 single-threaded** — each instance is bound to one
|
||||||
|
ingest thread; the composition root enforces. Concurrent
|
||||||
|
:meth:`embed_query` calls on a single instance race the GPU
|
||||||
|
stream.
|
||||||
|
- **INV-2 stateless per-frame** — no implicit dependency on
|
||||||
|
prior frames; reordering :meth:`embed_query` calls yields
|
||||||
|
identical embeddings.
|
||||||
|
- **INV-3 L2-normalised** — :attr:`VprQuery.embedding` is
|
||||||
|
L2-normalised before return (cosine ≡ Euclidean on the
|
||||||
|
FAISS HNSW lookup).
|
||||||
|
- **INV-4 top-K size + order** — :meth:`retrieve_topk` returns
|
||||||
|
exactly ``k`` candidates, ascending by
|
||||||
|
:attr:`VprCandidate.descriptor_distance`.
|
||||||
|
- **INV-5 backbone_label non-empty** — every
|
||||||
|
:attr:`VprResult.backbone_label` matches the strategy's
|
||||||
|
``BUILD_VPR_<variant>`` lowercase form.
|
||||||
|
- **INV-6 deterministic** — same frame + calibration + corpus
|
||||||
|
→ identical embedding + identical top-K (bit-exact for
|
||||||
|
float32; ULP-tolerant for float16).
|
||||||
|
- **INV-7 descriptor_dim stable** — :meth:`descriptor_dim`
|
||||||
|
never changes after construction; reflects the loaded
|
||||||
|
weights' output dim, NOT a config knob.
|
||||||
|
|
||||||
|
Error envelope: only members of
|
||||||
|
:class:`gps_denied_onboard.components.c2_vpr.errors.VprError`
|
||||||
|
escape the three methods. Lower-level exceptions (CUDA, TRT,
|
||||||
|
FAISS) MUST be rewrapped by the concrete strategy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def embed_query(
|
||||||
|
self,
|
||||||
|
frame: "NavCameraFrame",
|
||||||
|
calibration: "CameraCalibration",
|
||||||
|
) -> "VprQuery":
|
||||||
|
"""Run the backbone forward pass; return a ``VprQuery``.
|
||||||
|
|
||||||
|
Calibration is consumed by the strategy's internal
|
||||||
|
:class:`BackbonePreprocessor` for resize / crop / normalise.
|
||||||
|
|
||||||
|
Raises :class:`VprBackboneError` on backbone failure
|
||||||
|
(CUDA OOM, TRT deserialize mismatch, etc.) and
|
||||||
|
:class:`VprPreprocessError` on preprocessor contract
|
||||||
|
violation.
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
def retrieve_topk(self, query: "VprQuery", k: int) -> "VprResult":
|
||||||
|
"""Run the FAISS HNSW top-K lookup against the corpus index.
|
||||||
|
|
||||||
|
The strategy holds the FAISS index handle
|
||||||
|
(constructor-injected from C6's ``DescriptorIndex``).
|
||||||
|
Top-K candidates are returned ascending by
|
||||||
|
:attr:`VprCandidate.descriptor_distance`.
|
||||||
|
|
||||||
|
Raises :class:`IndexUnavailableError` when the FAISS index
|
||||||
|
handle is invalid (post-F8 reboot before warm-up;
|
||||||
|
out-of-band file replacement caught by mmap defence;
|
||||||
|
fewer than ``k`` indexed vectors).
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
def descriptor_dim(self) -> int:
|
||||||
|
"""Backbone embedding dimensionality.
|
||||||
|
|
||||||
|
Examples: 512 for UltraVPR; 4096 for NetVLAD-VGG16.
|
||||||
|
Stable for the strategy's lifetime. Consumed by the
|
||||||
|
composition root at startup to pre-validate index
|
||||||
|
compatibility against the C6 ``DescriptorIndex`` sidecar
|
||||||
|
(mismatch → :class:`ConfigError` at startup, NOT at first
|
||||||
|
frame).
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|||||||
@@ -0,0 +1,214 @@
|
|||||||
|
"""C2 VPR strategy composition-root factory (AZ-336).
|
||||||
|
|
||||||
|
:func:`build_vpr_strategy` selects exactly one strategy by
|
||||||
|
``config.components['c2_vpr'].strategy`` and respects compile-time
|
||||||
|
``BUILD_VPR_<variant>`` gating: requesting a strategy whose flag is
|
||||||
|
OFF raises :class:`StrategyNotAvailableError` at composition time
|
||||||
|
(NOT at first frame).
|
||||||
|
|
||||||
|
Concrete strategy modules
|
||||||
|
(``ultra_vpr``, ``net_vlad``, ``mega_loc``, ``mix_vpr``,
|
||||||
|
``sela_vpr``, ``eigen_places``, ``salad``) are imported lazily —
|
||||||
|
a Tier-0 workstation build with ``BUILD_VPR_ULTRA_VPR=OFF`` MUST
|
||||||
|
NOT load ``c2_vpr.ultra_vpr`` (ADR-002 / I-5; verifiable via
|
||||||
|
``sys.modules``).
|
||||||
|
|
||||||
|
Pre-flight validation: after constructing the strategy, the factory
|
||||||
|
queries :meth:`VprStrategy.descriptor_dim` and asserts it matches
|
||||||
|
the C6 ``DescriptorIndex`` sidecar's ``descriptor_dim()``. Mismatch
|
||||||
|
→ :class:`ConfigurationError` at startup, NOT at first frame.
|
||||||
|
|
||||||
|
Factory signature deviates from the v1.0.0 contract draft in one
|
||||||
|
place: the contract spec named the second parameter ``tile_store:
|
||||||
|
TileStore``, but ``descriptor_dim()`` lives on
|
||||||
|
:class:`DescriptorIndex` per C6's actual Public API. We inject
|
||||||
|
``descriptor_index`` directly; the contract markdown is updated to
|
||||||
|
match.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from gps_denied_onboard.config.schema import ConfigError
|
||||||
|
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from gps_denied_onboard.components.c2_vpr import C2VprConfig, VprStrategy
|
||||||
|
from gps_denied_onboard.components.c6_tile_cache import DescriptorIndex
|
||||||
|
from gps_denied_onboard.components.c7_inference import InferenceRuntime
|
||||||
|
from gps_denied_onboard.config.schema import Config
|
||||||
|
|
||||||
|
__all__ = ["build_vpr_strategy"]
|
||||||
|
|
||||||
|
|
||||||
|
_LOG = logging.getLogger("gps_denied_onboard.c2_vpr")
|
||||||
|
|
||||||
|
|
||||||
|
# Strategy resolution table — mirrors the contract's
|
||||||
|
# ``vpr_strategy_protocol.md`` v1.0.0 § Composition-Root Factory table
|
||||||
|
# verbatim. ANY mutation of this dict MUST be mirrored in the contract.
|
||||||
|
_STRATEGY_TO_BUILD_FLAG: dict[str, str] = {
|
||||||
|
"ultra_vpr": "BUILD_VPR_ULTRA_VPR",
|
||||||
|
"net_vlad": "BUILD_VPR_NETVLAD",
|
||||||
|
"mega_loc": "BUILD_VPR_MEGALOC",
|
||||||
|
"mix_vpr": "BUILD_VPR_MIXVPR",
|
||||||
|
"sela_vpr": "BUILD_VPR_SELAVPR",
|
||||||
|
"eigen_places": "BUILD_VPR_EIGENPLACES",
|
||||||
|
"salad": "BUILD_VPR_SALAD",
|
||||||
|
}
|
||||||
|
|
||||||
|
_STRATEGY_TO_MODULE: dict[str, tuple[str, str]] = {
|
||||||
|
"ultra_vpr": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.ultra_vpr",
|
||||||
|
"UltraVprStrategy",
|
||||||
|
),
|
||||||
|
"net_vlad": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.net_vlad",
|
||||||
|
"NetVladStrategy",
|
||||||
|
),
|
||||||
|
"mega_loc": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.mega_loc",
|
||||||
|
"MegaLocStrategy",
|
||||||
|
),
|
||||||
|
"mix_vpr": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.mix_vpr",
|
||||||
|
"MixVprStrategy",
|
||||||
|
),
|
||||||
|
"sela_vpr": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.sela_vpr",
|
||||||
|
"SelaVprStrategy",
|
||||||
|
),
|
||||||
|
"eigen_places": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.eigen_places",
|
||||||
|
"EigenPlacesStrategy",
|
||||||
|
),
|
||||||
|
"salad": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.salad",
|
||||||
|
"SaladStrategy",
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _is_build_flag_on(flag_name: str) -> bool:
|
||||||
|
"""Read a compile-time ``BUILD_*`` flag from the environment.
|
||||||
|
|
||||||
|
``ON`` / ``1`` / ``true`` / ``yes`` (case-insensitive) → ``True``;
|
||||||
|
anything else (including unset) → ``False``. Defaults to OFF so
|
||||||
|
test environments must opt-in explicitly per strategy.
|
||||||
|
"""
|
||||||
|
raw = os.environ.get(flag_name, "")
|
||||||
|
return raw.strip().lower() in {"on", "1", "true", "yes"}
|
||||||
|
|
||||||
|
|
||||||
|
def _c2_config(config: "Config") -> "C2VprConfig":
|
||||||
|
"""Pull the registered C2 config block.
|
||||||
|
|
||||||
|
``c2_vpr.__init__`` registers it on import; a missing
|
||||||
|
registration is a developer error and surfaces as ``KeyError``
|
||||||
|
rather than a silent fallback.
|
||||||
|
"""
|
||||||
|
return config.components["c2_vpr"]
|
||||||
|
|
||||||
|
|
||||||
|
def build_vpr_strategy(
|
||||||
|
config: "Config",
|
||||||
|
*,
|
||||||
|
descriptor_index: "DescriptorIndex",
|
||||||
|
inference_runtime: "InferenceRuntime",
|
||||||
|
) -> "VprStrategy":
|
||||||
|
"""Construct the :class:`VprStrategy` impl selected by config.
|
||||||
|
|
||||||
|
1. Reads ``config.components['c2_vpr'].strategy``.
|
||||||
|
2. Checks the matching ``BUILD_VPR_<variant>`` flag — if OFF,
|
||||||
|
raises :class:`StrategyNotAvailableError` BEFORE any import.
|
||||||
|
3. Lazily imports the concrete strategy module.
|
||||||
|
4. Constructs the strategy via its module-level
|
||||||
|
``create(config, descriptor_index, inference_runtime)``
|
||||||
|
factory function (each concrete strategy module exports
|
||||||
|
``create`` as its public entry-point; concrete constructors
|
||||||
|
stay private).
|
||||||
|
5. Pre-flight ``descriptor_dim`` match: ``strategy.descriptor_dim()``
|
||||||
|
vs ``descriptor_index.descriptor_dim()``. Mismatch raises
|
||||||
|
:class:`ConfigError`; ONE ERROR log
|
||||||
|
``kind="c2.vpr.dim_mismatch"`` is emitted; the strategy is
|
||||||
|
NOT bound.
|
||||||
|
6. On success, ONE INFO log ``kind="c2.vpr.strategy_loaded"``
|
||||||
|
with ``strategy`` + ``descriptor_dim``.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
StrategyNotAvailableError: compile-time flag OFF or
|
||||||
|
concrete module not yet built (AZ-337..AZ-340 pending).
|
||||||
|
ConfigError: ``descriptor_dim`` mismatch between strategy
|
||||||
|
and corpus index.
|
||||||
|
"""
|
||||||
|
block = _c2_config(config)
|
||||||
|
strategy = block.strategy
|
||||||
|
flag_name = _STRATEGY_TO_BUILD_FLAG.get(strategy)
|
||||||
|
module_info = _STRATEGY_TO_MODULE.get(strategy)
|
||||||
|
if flag_name is None or module_info is None:
|
||||||
|
# Defensive — config validation rejects unknown strategy labels
|
||||||
|
# at load (C2VprConfig.__post_init__), so this branch is only
|
||||||
|
# reachable if the resolution table and the validation set
|
||||||
|
# drift apart.
|
||||||
|
_LOG.error(
|
||||||
|
"c2.vpr.build_flag_off",
|
||||||
|
extra={"strategy": strategy, "reason": "unknown_strategy"},
|
||||||
|
)
|
||||||
|
raise StrategyNotAvailableError(
|
||||||
|
f"VprStrategy {strategy!r} is not buildable in this binary."
|
||||||
|
)
|
||||||
|
if not _is_build_flag_on(flag_name):
|
||||||
|
_LOG.error(
|
||||||
|
"c2.vpr.build_flag_off",
|
||||||
|
extra={"strategy": strategy, "flag": flag_name},
|
||||||
|
)
|
||||||
|
raise StrategyNotAvailableError(
|
||||||
|
f"BUILD_VPR_{strategy.upper()} is OFF for this binary; "
|
||||||
|
f"cannot select strategy={strategy}."
|
||||||
|
)
|
||||||
|
module_name, class_name = module_info
|
||||||
|
try:
|
||||||
|
module = __import__(module_name, fromlist=[class_name])
|
||||||
|
except ModuleNotFoundError as exc:
|
||||||
|
raise StrategyNotAvailableError(
|
||||||
|
f"VprStrategy {strategy!r} is configured but its concrete impl "
|
||||||
|
f"module {module_name!r} has not been built into this binary "
|
||||||
|
"yet (AZ-337 / AZ-338 / AZ-339 / AZ-340 pending)."
|
||||||
|
) from exc
|
||||||
|
create_fn = getattr(module, "create", None)
|
||||||
|
if create_fn is None:
|
||||||
|
strategy_cls = getattr(module, class_name)
|
||||||
|
instance = strategy_cls(
|
||||||
|
config,
|
||||||
|
descriptor_index=descriptor_index,
|
||||||
|
inference_runtime=inference_runtime,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
instance = create_fn(
|
||||||
|
config,
|
||||||
|
descriptor_index=descriptor_index,
|
||||||
|
inference_runtime=inference_runtime,
|
||||||
|
)
|
||||||
|
strategy_dim = instance.descriptor_dim()
|
||||||
|
corpus_dim = descriptor_index.descriptor_dim()
|
||||||
|
if strategy_dim != corpus_dim:
|
||||||
|
_LOG.error(
|
||||||
|
"c2.vpr.dim_mismatch",
|
||||||
|
extra={
|
||||||
|
"strategy": strategy,
|
||||||
|
"strategy_dim": strategy_dim,
|
||||||
|
"corpus_dim": corpus_dim,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
raise ConfigError(
|
||||||
|
f"descriptor_dim mismatch: strategy={strategy_dim}, "
|
||||||
|
f"corpus={corpus_dim}"
|
||||||
|
)
|
||||||
|
_LOG.info(
|
||||||
|
"c2.vpr.strategy_loaded",
|
||||||
|
extra={"strategy": strategy, "descriptor_dim": strategy_dim},
|
||||||
|
)
|
||||||
|
return instance
|
||||||
@@ -0,0 +1,528 @@
|
|||||||
|
"""AZ-336 — C2 VprStrategy Protocol + DTO + error + factory conformance.
|
||||||
|
|
||||||
|
Covers all 9 ACs of AZ-336 + the NFRs. The factory ACs (AC-3..AC-6)
|
||||||
|
substitute fake strategy modules at the ``sys.modules`` boundary so
|
||||||
|
the test never touches UltraVPR / NetVLAD / FAISS / TensorRT native
|
||||||
|
libraries.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import dataclasses
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import types
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gps_denied_onboard._types.vpr import VprCandidate, VprQuery, VprResult
|
||||||
|
from gps_denied_onboard.components.c2_vpr import (
|
||||||
|
C2VprConfig,
|
||||||
|
IndexUnavailableError,
|
||||||
|
VprBackboneError,
|
||||||
|
VprError,
|
||||||
|
VprPreprocessError,
|
||||||
|
VprStrategy,
|
||||||
|
)
|
||||||
|
from gps_denied_onboard.components.c2_vpr._preprocessor import BackbonePreprocessor
|
||||||
|
from gps_denied_onboard.components.c2_vpr.config import KNOWN_STRATEGIES
|
||||||
|
from gps_denied_onboard.config.schema import Config, ConfigError
|
||||||
|
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
|
||||||
|
from gps_denied_onboard.runtime_root.vpr_factory import build_vpr_strategy
|
||||||
|
|
||||||
|
|
||||||
|
_STRATEGY_MODULES: dict[str, tuple[str, str, str]] = {
|
||||||
|
"ultra_vpr": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.ultra_vpr",
|
||||||
|
"UltraVprStrategy",
|
||||||
|
"BUILD_VPR_ULTRA_VPR",
|
||||||
|
),
|
||||||
|
"net_vlad": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.net_vlad",
|
||||||
|
"NetVladStrategy",
|
||||||
|
"BUILD_VPR_NETVLAD",
|
||||||
|
),
|
||||||
|
"mega_loc": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.mega_loc",
|
||||||
|
"MegaLocStrategy",
|
||||||
|
"BUILD_VPR_MEGALOC",
|
||||||
|
),
|
||||||
|
"mix_vpr": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.mix_vpr",
|
||||||
|
"MixVprStrategy",
|
||||||
|
"BUILD_VPR_MIXVPR",
|
||||||
|
),
|
||||||
|
"sela_vpr": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.sela_vpr",
|
||||||
|
"SelaVprStrategy",
|
||||||
|
"BUILD_VPR_SELAVPR",
|
||||||
|
),
|
||||||
|
"eigen_places": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.eigen_places",
|
||||||
|
"EigenPlacesStrategy",
|
||||||
|
"BUILD_VPR_EIGENPLACES",
|
||||||
|
),
|
||||||
|
"salad": (
|
||||||
|
"gps_denied_onboard.components.c2_vpr.salad",
|
||||||
|
"SaladStrategy",
|
||||||
|
"BUILD_VPR_SALAD",
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Fakes that structurally satisfy the VprStrategy + DescriptorIndex
|
||||||
|
# Protocols. Tests substitute these at the sys.modules boundary so no
|
||||||
|
# native library is loaded.
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeDescriptorIndex:
|
||||||
|
def __init__(self, dim: int = 512) -> None:
|
||||||
|
self._dim = dim
|
||||||
|
|
||||||
|
def search_topk(self, query, k):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def descriptor_dim(self):
|
||||||
|
return self._dim
|
||||||
|
|
||||||
|
def mmap_handle(self):
|
||||||
|
return Path("/tmp/fake.faiss")
|
||||||
|
|
||||||
|
def rebuild_from_descriptors(self, descriptors, tile_ids, hnsw_params):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def index_metadata(self):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeInferenceRuntime:
|
||||||
|
def load_engine(self, *args, **kwargs):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def infer(self, *args, **kwargs):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def warm_up(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def thermal_state(self):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class _FullVprStrategy:
|
||||||
|
def __init__(self, config, *, descriptor_index, inference_runtime, dim=512) -> None:
|
||||||
|
self._config = config
|
||||||
|
self._descriptor_index = descriptor_index
|
||||||
|
self._inference_runtime = inference_runtime
|
||||||
|
self._dim = dim
|
||||||
|
self._label = config.components["c2_vpr"].strategy
|
||||||
|
|
||||||
|
def embed_query(self, frame, calibration):
|
||||||
|
return VprQuery(
|
||||||
|
frame_id=getattr(frame, "frame_id", 0),
|
||||||
|
embedding=np.zeros((self._dim,), dtype=np.float32),
|
||||||
|
produced_at=1_000_000_000,
|
||||||
|
)
|
||||||
|
|
||||||
|
def retrieve_topk(self, query, k):
|
||||||
|
return VprResult(
|
||||||
|
frame_id=query.frame_id,
|
||||||
|
candidates=tuple(),
|
||||||
|
retrieved_at=1_000_000_000,
|
||||||
|
backbone_label=self._label,
|
||||||
|
)
|
||||||
|
|
||||||
|
def descriptor_dim(self):
|
||||||
|
return self._dim
|
||||||
|
|
||||||
|
|
||||||
|
class _PartialVprStrategy:
|
||||||
|
def embed_query(self, frame, calibration):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def retrieve_topk(self, query, k):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
def _config_with_strategy(strategy: str) -> Config:
|
||||||
|
return Config.with_blocks(c2_vpr=C2VprConfig(strategy=strategy))
|
||||||
|
|
||||||
|
|
||||||
|
def _install_fake_strategy(strategy_label: str, *, dim: int = 512) -> type:
|
||||||
|
module_name, class_name, _flag = _STRATEGY_MODULES[strategy_label]
|
||||||
|
|
||||||
|
class _FakeStrategy(_FullVprStrategy):
|
||||||
|
def __init__(self, config, **kwargs) -> None:
|
||||||
|
super().__init__(config, dim=dim, **kwargs)
|
||||||
|
|
||||||
|
_FakeStrategy.__name__ = class_name
|
||||||
|
module = types.ModuleType(module_name)
|
||||||
|
setattr(module, class_name, _FakeStrategy)
|
||||||
|
sys.modules[module_name] = module
|
||||||
|
return _FakeStrategy
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def strategy_module_cleanup():
|
||||||
|
"""Pop every fake strategy module before/after each factory test."""
|
||||||
|
for module_name, _, _ in _STRATEGY_MODULES.values():
|
||||||
|
sys.modules.pop(module_name, None)
|
||||||
|
yield
|
||||||
|
for module_name, _, _ in _STRATEGY_MODULES.values():
|
||||||
|
sys.modules.pop(module_name, None)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-1: Protocol conformance — full satisfies, partial does not.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac1_vpr_strategy_conformance_full() -> None:
|
||||||
|
instance = _FullVprStrategy(
|
||||||
|
_config_with_strategy("net_vlad"),
|
||||||
|
descriptor_index=_FakeDescriptorIndex(),
|
||||||
|
inference_runtime=_FakeInferenceRuntime(),
|
||||||
|
)
|
||||||
|
assert isinstance(instance, VprStrategy)
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac1_vpr_strategy_conformance_partial_missing_methods() -> None:
|
||||||
|
assert not isinstance(_PartialVprStrategy(), VprStrategy)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-2: frozen+slotted DTOs reject mutation and forbid __dict__.
|
||||||
|
|
||||||
|
|
||||||
|
def _make_query(frame_id: int = 7, dim: int = 512) -> VprQuery:
|
||||||
|
return VprQuery(
|
||||||
|
frame_id=frame_id,
|
||||||
|
embedding=np.zeros((dim,), dtype=np.float32),
|
||||||
|
produced_at=1_000_000_000,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_candidate() -> VprCandidate:
|
||||||
|
return VprCandidate(
|
||||||
|
tile_id=(18, 49.9, 36.3),
|
||||||
|
descriptor_distance=0.123,
|
||||||
|
descriptor_dim=512,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_result(frame_id: int = 7) -> VprResult:
|
||||||
|
return VprResult(
|
||||||
|
frame_id=frame_id,
|
||||||
|
candidates=(_make_candidate(),),
|
||||||
|
retrieved_at=1_000_000_000,
|
||||||
|
backbone_label="net_vlad",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"dto, field_name, new_value",
|
||||||
|
[
|
||||||
|
(_make_query(), "frame_id", 99),
|
||||||
|
(_make_candidate(), "descriptor_distance", 0.9),
|
||||||
|
(_make_result(), "backbone_label", "ultra_vpr"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_ac2_frozen_dtos_reject_mutation(dto, field_name: str, new_value) -> None:
|
||||||
|
original_value = getattr(dto, field_name)
|
||||||
|
with pytest.raises(dataclasses.FrozenInstanceError):
|
||||||
|
setattr(dto, field_name, new_value)
|
||||||
|
assert getattr(dto, field_name) == original_value
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cls", [VprQuery, VprCandidate, VprResult])
|
||||||
|
def test_ac2_dtos_have_slots(cls) -> None:
|
||||||
|
assert hasattr(cls, "__slots__"), f"{cls.__name__} must use slots=True"
|
||||||
|
assert cls.__slots__, f"{cls.__name__}.__slots__ must be non-empty"
|
||||||
|
instance = (
|
||||||
|
_make_query()
|
||||||
|
if cls is VprQuery
|
||||||
|
else _make_candidate()
|
||||||
|
if cls is VprCandidate
|
||||||
|
else _make_result()
|
||||||
|
)
|
||||||
|
assert not hasattr(instance, "__dict__"), (
|
||||||
|
f"{cls.__name__} carries a __dict__ — slots=True is missing"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-3: factory rejects missing build flag.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac3_factory_rejects_missing_build_flag(
|
||||||
|
monkeypatch, strategy_module_cleanup, caplog
|
||||||
|
) -> None:
|
||||||
|
strategy = "ultra_vpr"
|
||||||
|
_, _, flag = _STRATEGY_MODULES[strategy]
|
||||||
|
monkeypatch.delenv(flag, raising=False)
|
||||||
|
config = _config_with_strategy(strategy)
|
||||||
|
with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_vpr"):
|
||||||
|
with pytest.raises(StrategyNotAvailableError) as exc_info:
|
||||||
|
build_vpr_strategy(
|
||||||
|
config,
|
||||||
|
descriptor_index=_FakeDescriptorIndex(),
|
||||||
|
inference_runtime=_FakeInferenceRuntime(),
|
||||||
|
)
|
||||||
|
assert "BUILD_VPR_ULTRA_VPR is OFF" in str(exc_info.value)
|
||||||
|
assert any(
|
||||||
|
r.message == "c2.vpr.build_flag_off"
|
||||||
|
for r in caplog.records
|
||||||
|
), "ERROR log kind=c2.vpr.build_flag_off must be emitted"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
|
||||||
|
def test_ac3_factory_does_not_load_module_when_flag_off(
|
||||||
|
monkeypatch, strategy_module_cleanup, strategy
|
||||||
|
) -> None:
|
||||||
|
module_name, _, flag = _STRATEGY_MODULES[strategy]
|
||||||
|
monkeypatch.delenv(flag, raising=False)
|
||||||
|
config = _config_with_strategy(strategy)
|
||||||
|
with pytest.raises(StrategyNotAvailableError):
|
||||||
|
build_vpr_strategy(
|
||||||
|
config,
|
||||||
|
descriptor_index=_FakeDescriptorIndex(),
|
||||||
|
inference_runtime=_FakeInferenceRuntime(),
|
||||||
|
)
|
||||||
|
assert module_name not in sys.modules, (
|
||||||
|
f"{module_name} must NOT be in sys.modules when its BUILD flag is OFF"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-4: factory rejects descriptor_dim mismatch.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac4_factory_rejects_dim_mismatch(
|
||||||
|
monkeypatch, strategy_module_cleanup, caplog
|
||||||
|
) -> None:
|
||||||
|
strategy = "ultra_vpr"
|
||||||
|
_, _, flag = _STRATEGY_MODULES[strategy]
|
||||||
|
monkeypatch.setenv(flag, "ON")
|
||||||
|
_install_fake_strategy(strategy, dim=512)
|
||||||
|
config = _config_with_strategy(strategy)
|
||||||
|
with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_vpr"):
|
||||||
|
with pytest.raises(ConfigError) as exc_info:
|
||||||
|
build_vpr_strategy(
|
||||||
|
config,
|
||||||
|
descriptor_index=_FakeDescriptorIndex(dim=4096),
|
||||||
|
inference_runtime=_FakeInferenceRuntime(),
|
||||||
|
)
|
||||||
|
assert "descriptor_dim mismatch: strategy=512, corpus=4096" in str(
|
||||||
|
exc_info.value
|
||||||
|
)
|
||||||
|
assert any(
|
||||||
|
r.message == "c2.vpr.dim_mismatch"
|
||||||
|
for r in caplog.records
|
||||||
|
), "ERROR log kind=c2.vpr.dim_mismatch must be emitted"
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-5: successful factory load emits INFO log with structured fields.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac5_factory_emits_info_log_on_success(
|
||||||
|
monkeypatch, strategy_module_cleanup, caplog
|
||||||
|
) -> None:
|
||||||
|
strategy = "ultra_vpr"
|
||||||
|
_, _, flag = _STRATEGY_MODULES[strategy]
|
||||||
|
monkeypatch.setenv(flag, "ON")
|
||||||
|
_install_fake_strategy(strategy, dim=512)
|
||||||
|
config = _config_with_strategy(strategy)
|
||||||
|
with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c2_vpr"):
|
||||||
|
instance = build_vpr_strategy(
|
||||||
|
config,
|
||||||
|
descriptor_index=_FakeDescriptorIndex(dim=512),
|
||||||
|
inference_runtime=_FakeInferenceRuntime(),
|
||||||
|
)
|
||||||
|
assert isinstance(instance, VprStrategy)
|
||||||
|
records = [
|
||||||
|
r for r in caplog.records if r.message == "c2.vpr.strategy_loaded"
|
||||||
|
]
|
||||||
|
assert len(records) == 1, "Exactly one strategy_loaded INFO log expected"
|
||||||
|
record = records[0]
|
||||||
|
assert getattr(record, "strategy", None) == "ultra_vpr"
|
||||||
|
assert getattr(record, "descriptor_dim", None) == 512
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-6: every entry in the resolution table resolves to its module path.
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES))
|
||||||
|
def test_ac6_strategy_resolution_table(
|
||||||
|
monkeypatch, strategy_module_cleanup, strategy
|
||||||
|
) -> None:
|
||||||
|
module_name, class_name, flag = _STRATEGY_MODULES[strategy]
|
||||||
|
monkeypatch.setenv(flag, "ON")
|
||||||
|
fake_cls = _install_fake_strategy(strategy, dim=512)
|
||||||
|
config = _config_with_strategy(strategy)
|
||||||
|
instance = build_vpr_strategy(
|
||||||
|
config,
|
||||||
|
descriptor_index=_FakeDescriptorIndex(dim=512),
|
||||||
|
inference_runtime=_FakeInferenceRuntime(),
|
||||||
|
)
|
||||||
|
assert isinstance(instance, fake_cls)
|
||||||
|
assert isinstance(instance, VprStrategy)
|
||||||
|
assert sys.modules[module_name] is not None
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-7: error hierarchy — every concrete error is catchable as VprError.
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"exc_factory",
|
||||||
|
[VprBackboneError, VprPreprocessError, IndexUnavailableError],
|
||||||
|
)
|
||||||
|
def test_ac7_all_vpr_errors_caught_as_family(exc_factory) -> None:
|
||||||
|
with pytest.raises(VprError):
|
||||||
|
raise exc_factory("boom")
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac7_unrelated_exception_not_caught_as_family() -> None:
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
try:
|
||||||
|
raise ValueError("not us")
|
||||||
|
except VprError:
|
||||||
|
pytest.fail("ValueError must not be caught as VprError")
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac7_strategy_not_available_outside_family() -> None:
|
||||||
|
with pytest.raises(StrategyNotAvailableError):
|
||||||
|
try:
|
||||||
|
raise StrategyNotAvailableError("composition-time")
|
||||||
|
except VprError:
|
||||||
|
pytest.fail(
|
||||||
|
"StrategyNotAvailableError is a composition-root error "
|
||||||
|
"and MUST NOT be in the c2 VprError family"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-8: Public API surface — re-exports + BackbonePreprocessor exclusion.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac8_public_api_re_exports() -> None:
|
||||||
|
from gps_denied_onboard.components import c2_vpr
|
||||||
|
|
||||||
|
assert "VprStrategy" in c2_vpr.__all__
|
||||||
|
assert "VprQuery" in c2_vpr.__all__
|
||||||
|
assert "VprCandidate" in c2_vpr.__all__
|
||||||
|
assert "VprResult" in c2_vpr.__all__
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac8_backbone_preprocessor_not_in_public_api() -> None:
|
||||||
|
from gps_denied_onboard.components import c2_vpr
|
||||||
|
|
||||||
|
assert "BackbonePreprocessor" not in c2_vpr.__all__
|
||||||
|
assert not hasattr(c2_vpr, "BackbonePreprocessor"), (
|
||||||
|
"BackbonePreprocessor is C2-internal per description.md § 6 and "
|
||||||
|
"MUST NOT be re-exported from c2_vpr/__init__.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac8_backbone_preprocessor_protocol_is_runtime_checkable() -> None:
|
||||||
|
# BackbonePreprocessor is internal but still a Protocol; tests in
|
||||||
|
# AZ-337..AZ-340 will use isinstance against it.
|
||||||
|
|
||||||
|
class _OkPreprocessor:
|
||||||
|
def preprocess(self, frame, calibration):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def input_shape(self):
|
||||||
|
return (224, 224)
|
||||||
|
|
||||||
|
assert isinstance(_OkPreprocessor(), BackbonePreprocessor)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Config validation — unknown strategy label is rejected at load.
|
||||||
|
# (AC-9 single-thread binding is deferred per AZ-336 task spec Risk 4;
|
||||||
|
# the generic compose_root thread-binding registry referenced by AC-9
|
||||||
|
# has not materialised — each factory owns its own thread binding
|
||||||
|
# today, e.g. ``runtime_root.fc_factory.clear_outbound_thread_binding``.
|
||||||
|
# AC-9 ships with AZ-270's registry or its replacement; this task
|
||||||
|
# delivers AC-1..AC-8 + NFRs in line with the spec's escape clause.)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"bad_label",
|
||||||
|
["ULTRA_VPR", "ultraVpr", "openVLAD", "", "vins_mono"],
|
||||||
|
)
|
||||||
|
def test_unknown_strategy_rejected_at_config_load(bad_label: str) -> None:
|
||||||
|
with pytest.raises(ConfigError) as exc_info:
|
||||||
|
C2VprConfig(strategy=bad_label)
|
||||||
|
msg = str(exc_info.value)
|
||||||
|
for valid in KNOWN_STRATEGIES:
|
||||||
|
assert valid in msg
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# NFRs.
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"exc_type",
|
||||||
|
[VprBackboneError, VprPreprocessError, IndexUnavailableError],
|
||||||
|
)
|
||||||
|
def test_nfr_reliability_all_vpr_errors_subclass_family(exc_type) -> None:
|
||||||
|
assert issubclass(exc_type, VprError)
|
||||||
|
|
||||||
|
|
||||||
|
def test_nfr_reliability_strategy_not_available_not_in_family() -> None:
|
||||||
|
assert not issubclass(StrategyNotAvailableError, VprError)
|
||||||
|
|
||||||
|
|
||||||
|
def test_nfr_perf_factory_under_50ms_p99(
|
||||||
|
monkeypatch, strategy_module_cleanup
|
||||||
|
) -> None:
|
||||||
|
"""Factory p99 ≤ 50 ms across 100 calls (NFR-perf-factory)."""
|
||||||
|
strategy = "net_vlad"
|
||||||
|
_, _, flag = _STRATEGY_MODULES[strategy]
|
||||||
|
monkeypatch.setenv(flag, "ON")
|
||||||
|
_install_fake_strategy(strategy, dim=512)
|
||||||
|
config = _config_with_strategy(strategy)
|
||||||
|
descriptor_index = _FakeDescriptorIndex(dim=512)
|
||||||
|
inference_runtime = _FakeInferenceRuntime()
|
||||||
|
|
||||||
|
durations_ms: list[float] = []
|
||||||
|
for _ in range(100):
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
build_vpr_strategy(
|
||||||
|
config,
|
||||||
|
descriptor_index=descriptor_index,
|
||||||
|
inference_runtime=inference_runtime,
|
||||||
|
)
|
||||||
|
durations_ms.append((time.perf_counter() - t0) * 1000.0)
|
||||||
|
|
||||||
|
durations_ms.sort()
|
||||||
|
p99 = durations_ms[int(0.99 * len(durations_ms))]
|
||||||
|
assert p99 <= 50.0, (
|
||||||
|
f"build_vpr_strategy() p99={p99:.3f} ms exceeds 50 ms NFR"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Surface coverage — config defaults round-trip.
|
||||||
|
|
||||||
|
|
||||||
|
def test_c2_config_default_strategy_is_net_vlad() -> None:
|
||||||
|
cfg = C2VprConfig()
|
||||||
|
assert cfg.strategy == "net_vlad"
|
||||||
|
|
||||||
|
|
||||||
|
def test_c2_config_paths_coerce_to_path() -> None:
|
||||||
|
cfg = C2VprConfig(
|
||||||
|
backbone_weights_path="/tmp/weights", # type: ignore[arg-type]
|
||||||
|
faiss_index_path="/tmp/index.faiss", # type: ignore[arg-type]
|
||||||
|
)
|
||||||
|
assert isinstance(cfg.backbone_weights_path, Path)
|
||||||
|
assert isinstance(cfg.faiss_index_path, Path)
|
||||||
@@ -1,10 +0,0 @@
|
|||||||
"""C2 VPR smoke test — AC-9."""
|
|
||||||
|
|
||||||
|
|
||||||
def test_interface_importable() -> None:
|
|
||||||
# Assert
|
|
||||||
from gps_denied_onboard.components.c2_vpr import VprQuery, VprResult, VprStrategy
|
|
||||||
|
|
||||||
assert VprStrategy is not None
|
|
||||||
assert VprQuery is not None
|
|
||||||
assert VprResult is not None
|
|
||||||
Reference in New Issue
Block a user