[AZ-342] C2.5 ReRankStrategy: Protocol + DTOs + factory + composition

Foundational scaffolding for the InlierCountReRanker (AZ-343) and
the future C3 CrossDomainMatcher consumer (AZ-344). No concrete
re-ranker is implemented here.

* ReRankStrategy Protocol (single rerank(frame, vpr_result, n,
  calibration) -> RerankResult method) with all 8 invariants in the
  docstring — notably INV-8 drop-and-continue (per-candidate failure
  NEVER propagates unless every candidate fails).
* DTOs moved to L1 _types/rerank.py — RerankCandidate, RerankResult;
  frozen+slots; tuple-not-list for RerankResult.candidates; tile_id
  encoded as (zoom_level, lat, lon) tuple to keep _types/ free of any
  c6_tile_cache (L3) import per module-layout.md.
* Error family: RerankError + RerankBackboneError +
  RerankAllCandidatesFailedError. Only RerankAllCandidatesFailedError
  escapes rerank(); RerankBackboneError is caught inside the per-
  candidate loop, logged ERROR, FDR-stamped, candidate dropped.
* C2_5RerankConfig (strategy enum default "inlier_count", top_n int
  default 3) with strict validation at load; registered into
  Config.components on c2_5_rerank import.
* build_rerank_strategy(config, *, tile_store, lightglue_runtime)
  factory: 1-strategy resolution table, lazy import,
  BUILD_RERANK_<variant> gate, ImportError → StrategyNotAvailableError
  mapping. The shared LightGlueRuntime is constructor-injected
  (R14 fix: neither C2.5 nor C3 owns its lifecycle).

Renamed the Protocol from the existing stub "RerankStrategy" to
"ReRankStrategy" to match the contract; updated module-layout.md.
Removed the legacy RerankResult shape from _types/vpr.py — the
v1.0.0 shape lives in _types/rerank.py.

Excluded per task spec:
* Concrete InlierCountReRanker (AZ-343).
* C3 matcher protocol task (AZ-344, next in batch).
* AC-9 single-thread binding + AC-10 LightGlueRuntime identity-share
  between C2.5/C3 — deferred per task spec Risk 3 until the generic
  compose_root thread-binding registry and the C3 factory both land.

Tests: AC-1..AC-8 + AC-11 + NFR-perf-factory in
tests/unit/c2_5_rerank/test_protocol_conformance.py. The legacy
smoke test is removed. Full sweep: 997 passed (one pre-existing
flake in test_az296_takeoff_abort, subprocess timing, unrelated to
this commit; passes in isolation).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 05:31:27 +03:00
parent 3665acef66
commit d6756f1855
12 changed files with 871 additions and 54 deletions
@@ -4,7 +4,7 @@
**Producer task**: AZ-342 (`ReRankStrategy` Protocol + factory + composition)
**Consumer tasks**: AZ-343 (`InlierCountReRanker` impl); downstream c3_matcher (epic AZ-257 / E-C3 — TBD at AZ-257 decompose time) which consumes `RerankResult`
**Version**: 1.0.0
**Status**: draft, awaiting AZ-342 implementation
**Status**: v1.0.0 (AZ-342 implemented 2026-05-12)
**Last Updated**: 2026-05-10
**Module-layout home**: `src/gps_denied_onboard/components/c2_5_rerank/interface.py` (Protocol), `src/gps_denied_onboard/components/c2_5_rerank/__init__.py` (re-exports), `src/gps_denied_onboard/runtime_root/rerank_factory.py` (factory)
@@ -75,31 +75,29 @@ class ReRankStrategy(Protocol):
```python
from dataclasses import dataclass
from uuid import UUID
import numpy as np
@dataclass(frozen=True, slots=True)
class RerankCandidate:
"""One re-rank survivor. Carries the C2-stage descriptor_distance forward for FDR provenance plus the new inlier_count from single-pair LightGlue."""
tile_id: tuple # composite (zoomLevel, lat, lon); see C6 TileRecord
inlier_count: int # single-pair LightGlue inliers; > 0 for any survivor
descriptor_distance: float # carried forward from C2's VprCandidate
descriptor_dim: int # carried forward from C2 for sanity assertions
tile_pixels_handle: object # opaque page-cache-backed pixel reference; see C6 TileStore contract
tile_id: tuple[int, float, float] # composite (zoom_level, lat, lon); matches c6_tile_cache.TileId. tuple form keeps _types/ free of an L1→L3 import per module-layout.md.
inlier_count: int # single-pair LightGlue inliers; > 0 for any survivor
descriptor_distance: float # carried forward from C2's VprCandidate
descriptor_dim: int # carried forward from C2 for sanity assertions
tile_pixels_handle: object # opaque page-cache-backed pixel reference; see C6 TileStore contract
@dataclass(frozen=True, slots=True)
class RerankResult:
"""Top-N survivors from `ReRankStrategy.rerank`. Consumed by C3 CrossDomainMatcher."""
frame_id: UUID
candidates: list[RerankCandidate] # 0 < len <= n; sorted descending by inlier_count, ties broken by descriptor_distance ascending
reranked_at: int # monotonic_ns
rerank_label: str # non-empty; matches BUILD_RERANK_<variant> lowercase (e.g., "inlier_count")
candidates_input: int # len(vpr_result.candidates) at entry — for FDR observability
candidates_dropped: int # candidates_input - len(candidates)
frame_id: int # echoes NavCameraFrame.frame_id (int across the pipeline)
candidates: tuple[RerankCandidate, ...] # 0 < len <= n; descending by inlier_count, ties broken by descriptor_distance ascending. tuple (not list) so the frozen+slots invariant holds.
reranked_at: int # monotonic_ns from injected Clock
rerank_label: str # non-empty; matches BUILD_RERANK_<variant> lowercase
candidates_input: int # len(vpr_result.candidates) at entry — for FDR observability
candidates_dropped: int # candidates_input - len(candidates)
```
### Error Hierarchy (in `c2_5_rerank/errors.py`)
+2 -2
View File
@@ -57,8 +57,8 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- **Epic**: AZ-256 (E-C2.5 Rerank)
- **Directory**: `src/gps_denied_onboard/components/c2_5_rerank/`
- **Public API**:
- `__init__.py` (re-exports `RerankStrategy`, `RerankResult`)
- `interface.py` (`RerankStrategy` Protocol)
- `__init__.py` (re-exports `ReRankStrategy`, `RerankResult`, `RerankCandidate`)
- `interface.py` (`ReRankStrategy` Protocol)
- **Internal**:
- `inlier_based_reranker.py` (single-pair LightGlue inlier count K=10→N=3)
- **Owns**: `src/gps_denied_onboard/components/c2_5_rerank/**`, `tests/unit/c2_5_rerank/**`
+67
View File
@@ -0,0 +1,67 @@
"""C2.5 rerank DTOs (L1 cross-component layer; AZ-342).
The two-DTO surface is frozen by
``contracts/c2_5_rerank/rerank_strategy_protocol.md`` v1.0.0:
slotted, immutable, ``produced_at`` stamped with the producer's
``monotonic_ns`` so the C13 FDR record can correlate without a
wall-clock dependency.
:class:`RerankCandidate.tile_id` is a plain
``tuple[int, float, float]`` of ``(zoom_level, lat, lon)`` —
identical encoding to :class:`VprCandidate.tile_id` — keeping the L1
layer free of an L1→L3 import per ``module-layout.md`` (consumers
reconstruct :class:`gps_denied_onboard.components.c6_tile_cache.TileId`
at the C6 boundary).
:class:`RerankCandidate.tile_pixels_handle` is intentionally typed
``object``: C6 owns the actual handle type and the rerank Protocol
treats it as opaque per Invariant 6 (the handle is a reference, NOT
a copy — copying tile pixels would defeat AC-4.1's latency budget).
"""
from __future__ import annotations
from dataclasses import dataclass
__all__ = ["RerankCandidate", "RerankResult"]
@dataclass(frozen=True, slots=True)
class RerankCandidate:
"""One re-rank survivor.
Carries the C2-stage ``descriptor_distance`` + ``descriptor_dim``
forward unchanged (INV-5) so the FDR record retains the full
provenance chain. ``inlier_count`` is the new field produced by
the single-pair LightGlue forward at re-rank time; ``> 0`` for
every survivor.
"""
tile_id: tuple[int, float, float]
inlier_count: int
descriptor_distance: float
descriptor_dim: int
tile_pixels_handle: object
@dataclass(frozen=True, slots=True)
class RerankResult:
"""Top-N survivors from :meth:`ReRankStrategy.rerank`.
Consumed by C3 CrossDomainMatcher. ``candidates`` is a tuple
(not a list) so the frozen+slots invariant truly holds — a frozen
dataclass holding a mutable list lets consumers mutate it; the
tuple closes that door.
``candidates_input`` / ``candidates_dropped`` make the
drop-and-continue accounting (INV-8) observable per-frame so a
post-flight aggregate alert can flag flights whose
``candidates_dropped`` p95 climbs.
"""
frame_id: int
candidates: tuple[RerankCandidate, ...]
reranked_at: int
rerank_label: str
candidates_input: int
candidates_dropped: int
+5 -17
View File
@@ -1,15 +1,13 @@
"""C2 VPR + C2.5 rerank DTOs (L1 cross-component layer).
"""C2 VPR DTOs (L1 cross-component layer; AZ-336).
The C2 trio (:class:`VprQuery`, :class:`VprCandidate`, :class:`VprResult`)
is frozen by ``contracts/c2_vpr/vpr_strategy_protocol.md`` v1.0.0 (AZ-336):
The trio (:class:`VprQuery`, :class:`VprCandidate`, :class:`VprResult`)
is frozen by ``contracts/c2_vpr/vpr_strategy_protocol.md`` v1.0.0:
slotted, immutable, no defaults, and stamped with the producer's
``monotonic_ns`` so the C13 FDR record can correlate the embed→retrieve
hop without a wall-clock dependency.
``RerankResult`` is the legacy C2.5 stub kept untouched for the AZ-342
contract rewrite — touching it here would expand AZ-336 scope into a
sibling task. Once AZ-342 lands, ``RerankResult`` moves to its v1.0.0
shape.
C2.5 rerank DTOs live in :mod:`gps_denied_onboard._types.rerank` (AZ-342);
this module no longer re-exports them.
"""
from __future__ import annotations
@@ -17,7 +15,6 @@ from __future__ import annotations
from dataclasses import dataclass
__all__ = [
"RerankResult",
"VprCandidate",
"VprQuery",
"VprResult",
@@ -88,12 +85,3 @@ class VprResult:
candidates: tuple[VprCandidate, ...]
retrieved_at: int
backbone_label: str
@dataclass(frozen=True)
class RerankResult:
"""C2.5 reranked candidates — legacy shape, AZ-342 owns the rewrite."""
query_frame_id: int
candidate_tile_ids: tuple[str, ...]
inlier_counts: tuple[int, ...]
@@ -1,6 +1,42 @@
"""C2.5 Rerank component — Public API."""
"""C2.5 ReRank — Public API (AZ-342).
from gps_denied_onboard._types.vpr import RerankResult
from gps_denied_onboard.components.c2_5_rerank.interface import RerankStrategy
Per ``rerank_strategy_protocol.md`` v1.0.0 the public surface
consists of:
__all__ = ["RerankResult", "RerankStrategy"]
- :class:`ReRankStrategy` Protocol (one method).
- DTOs re-exported from :mod:`gps_denied_onboard._types.rerank` (the
L1 home for cross-component DTOs): :class:`RerankCandidate`,
:class:`RerankResult`.
- Error family rooted at :class:`RerankError`; two documented
subtypes (:class:`RerankBackboneError`,
:class:`RerankAllCandidatesFailedError`).
- Config block :class:`C2_5RerankConfig` (registered on import).
Concrete strategy (``InlierCountReRanker``, AZ-343) lives in a
sibling module and is imported lazily by
:mod:`gps_denied_onboard.runtime_root.rerank_factory` — Risk-2
mitigation: this ``__init__.py`` MUST NOT import any concrete
strategy module.
"""
from gps_denied_onboard._types.rerank import RerankCandidate, RerankResult
from gps_denied_onboard.components.c2_5_rerank.config import C2_5RerankConfig
from gps_denied_onboard.components.c2_5_rerank.errors import (
RerankAllCandidatesFailedError,
RerankBackboneError,
RerankError,
)
from gps_denied_onboard.components.c2_5_rerank.interface import ReRankStrategy
from gps_denied_onboard.config.schema import register_component_block
register_component_block("c2_5_rerank", C2_5RerankConfig)
__all__ = [
"C2_5RerankConfig",
"ReRankStrategy",
"RerankAllCandidatesFailedError",
"RerankBackboneError",
"RerankCandidate",
"RerankError",
"RerankResult",
]
@@ -0,0 +1,54 @@
"""C2.5 ReRankStrategy config block (AZ-342).
Registered into ``config.components['c2_5_rerank']`` by the package
``__init__.py``. The composition-root factory
:func:`gps_denied_onboard.runtime_root.rerank_factory.build_rerank_strategy`
reads this block to select the strategy and configure the top-N cut.
``top_n`` is the strategy-side cap on the returned
:attr:`RerankResult.candidates` length; the composition root binds
``n`` per-frame from this value (default 3 per the epic's K=10 → N=3
spec).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Final
from gps_denied_onboard.config.schema import ConfigError
__all__ = [
"C2_5RerankConfig",
"KNOWN_STRATEGIES",
]
KNOWN_STRATEGIES: Final[frozenset[str]] = frozenset({"inlier_count"})
@dataclass(frozen=True)
class C2_5RerankConfig:
"""Per-component config for C2.5 ReRank.
``strategy`` selects exactly one of the registered re-rankers
(today only ``inlier_count``); the composition-root factory
respects compile-time ``BUILD_RERANK_<variant>`` gating on top
of this label.
``top_n`` is the per-frame N cap (1..K-1). Default 3 (the epic's
K=10 → N=3 spec).
"""
strategy: str = "inlier_count"
top_n: int = 3
def __post_init__(self) -> None:
if self.strategy not in KNOWN_STRATEGIES:
raise ConfigError(
f"C2_5RerankConfig.strategy={self.strategy!r} not in "
f"{sorted(KNOWN_STRATEGIES)}"
)
if self.top_n < 1:
raise ConfigError(
f"C2_5RerankConfig.top_n must be >= 1; got {self.top_n}"
)
@@ -0,0 +1,56 @@
"""C2.5 ReRankStrategy error taxonomy (AZ-342).
The family is intentionally narrow: a per-candidate failure is the
normal case (drop-and-continue, INV-8) and is signalled via
``candidates_dropped`` in the returned :class:`RerankResult` —
NOT via an exception. An exception escapes ``rerank`` only when
EVERY candidate fails (:class:`RerankAllCandidatesFailedError`)
which is the C5 → VIO-only-fallback trigger per AC-3.5.
:class:`RerankBackboneError` is raised INSIDE the per-candidate loop,
caught by the strategy, logged ERROR, FDR-stamped, and the
candidate is dropped. It is exposed publicly so the per-candidate
log + FDR taxonomy is observable and so future re-rankers using a
different backbone can re-raise the same kind.
``TileFetchError`` is C6-owned
(``c6_tile_cache.errors.TileNotFoundError`` / ``TileFsError``); the
strategy catches it in the per-candidate loop and treats it
identically to :class:`RerankBackboneError`.
"""
from __future__ import annotations
__all__ = [
"RerankAllCandidatesFailedError",
"RerankBackboneError",
"RerankError",
]
class RerankError(Exception):
"""Base class for the C2.5 rerank error family.
Caught at the runtime root only when
:class:`RerankAllCandidatesFailedError` fires; per-candidate
failures stay inside the strategy.
"""
class RerankBackboneError(RerankError):
"""Per-candidate LightGlue forward-pass failure.
CUDA OOM, TRT engine deserialize mismatch. Logged at ERROR; one
FDR record per occurrence; the offending candidate is dropped
from the rerank set; the surrounding ``rerank`` call continues
with the remaining candidates (INV-8).
"""
class RerankAllCandidatesFailedError(RerankError):
"""Zero survivors after the per-candidate loop.
Every candidate's LightGlue or tile fetch failed. Logged at
ERROR; FDR record ``kind=rerank.all_failed``. C5 falls back to
VIO-only with provenance ``visual_propagated`` (AC-3.5).
"""
@@ -1,17 +1,98 @@
"""C2.5 `RerankStrategy` Protocol.
"""C2.5 ``ReRankStrategy`` Protocol (AZ-342).
Default: `InlierBasedReranker` (single-pair LightGlue inlier counter, K=10 → N=3).
See `_docs/02_document/components/03_c2_5_rerank/`.
PEP 544 ``typing.Protocol`` with ``runtime_checkable=True``; a single
``rerank`` method that consumes a C2 :class:`VprResult` and produces
a :class:`RerankResult` ranked by single-pair LightGlue inlier count.
Concrete impl — :class:`InlierCountReRanker` (AZ-343) — lives in a
sibling module and is imported lazily by
:mod:`gps_denied_onboard.runtime_root.rerank_factory`.
The contract at
``_docs/02_document/contracts/c2_5_rerank/rerank_strategy_protocol.md``
v1.0.0 is the authoritative shape; this module mirrors it 1:1.
"""
from __future__ import annotations
from typing import Protocol
from typing import TYPE_CHECKING, Protocol, runtime_checkable
from gps_denied_onboard._types.vpr import RerankResult, VprResult
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard._types.rerank import RerankResult
from gps_denied_onboard._types.vpr import VprResult
__all__ = ["ReRankStrategy"]
class RerankStrategy(Protocol):
"""Re-rank C2's top-K candidates down to N via cross-domain match scoring."""
@runtime_checkable
class ReRankStrategy(Protocol):
"""Single-camera re-rank strategy.
def rerank(self, vpr_result: VprResult, n_keep: int = 3) -> RerankResult: ...
Stateless per-frame; the only persistent state is the
constructor-injected
:class:`gps_denied_onboard.helpers.lightglue_runtime.LightGlueRuntime`
helper handle and the :class:`TileStore` Public API reference.
Invariants (see ``rerank_strategy_protocol.md`` v1.0.0):
- **INV-1 single-threaded** — each instance is bound to one
ingest thread; the shared ``LightGlueRuntime`` requires serial
access. Concurrent :meth:`rerank` calls on a single instance
race the GPU stream.
- **INV-2 stateless per-frame** — same inputs → same surviving
candidates in same order.
- **INV-3 top-N descending by inlier_count** — ties broken
deterministically by ``descriptor_distance`` ascending (the
C2-stage value carried forward).
- **INV-4 candidates length bounded** — ``0 < len <= n`` when
returned (zero raises :class:`RerankAllCandidatesFailedError`);
never exceeds ``n``; never exceeds
``len(vpr_result.candidates)``.
- **INV-5 descriptor_distance carried forward unchanged** — the
C2-stage value is preserved on every survivor for FDR
provenance.
- **INV-6 tile_pixels_handle is a reference, NOT a copy** —
``RerankCandidate.tile_pixels_handle`` is the same handle
returned by ``TileStore.read_tile_pixels`` (page-cache
backed).
- **INV-7 deterministic per tuple** — same ``(frame,
vpr_result, corpus, helper)`` → bit-identical
:class:`RerankResult`.
- **INV-8 drop-and-continue** — a per-candidate exception
NEVER propagates out of :meth:`rerank` unless EVERY candidate
fails. C3 relies on this partial-input tolerance.
Error envelope: only :class:`RerankAllCandidatesFailedError`
escapes :meth:`rerank`; per-candidate
:class:`RerankBackboneError` / ``TileFetchError`` from C6 are
caught inside the loop and turned into dropped candidates +
ERROR logs + per-occurrence FDR records.
"""
def rerank(
self,
frame: "NavCameraFrame",
vpr_result: "VprResult",
n: int,
calibration: "CameraCalibration",
) -> "RerankResult":
"""Re-rank the top-K candidates down to top-N by inlier count.
For each ``candidate`` in ``vpr_result.candidates``:
1. Fetch tile pixels via ``TileStore.read_tile_pixels(candidate.tile_id)``.
2. Run a single-pair LightGlue forward via the shared
:class:`LightGlueRuntime` (frame ↔ tile).
3. Record the inlier count.
Sort candidates descending by inlier count; return the top-N
as a :class:`RerankResult`. Drop-and-continue semantics
apply per INV-8.
Raises:
RerankAllCandidatesFailedError: zero survivors after
the per-candidate loop.
"""
...
@@ -0,0 +1,142 @@
"""C2.5 ReRank strategy composition-root factory (AZ-342).
:func:`build_rerank_strategy` selects exactly one strategy by
``config.components['c2_5_rerank'].strategy`` and respects
compile-time ``BUILD_RERANK_<variant>`` gating: requesting a
strategy whose flag is OFF raises
:class:`StrategyNotAvailableError` at composition time (NOT at
first frame).
The shared :class:`LightGlueRuntime` is constructor-injected — the
factory does NOT own its lifecycle. The runtime root constructs ONE
``LightGlueRuntime`` instance and passes the same reference to both
this factory (C2.5) and the future C3 matcher factory (R14 fix; see
``description.md`` § 6).
Concrete strategy modules are imported lazily — a Tier-0 workstation
build with ``BUILD_RERANK_INLIER_COUNT=OFF`` MUST NOT load
``c2_5_rerank.inlier_based_reranker`` (ADR-002 / I-5; verifiable via
``sys.modules``).
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
if TYPE_CHECKING:
from gps_denied_onboard.components.c2_5_rerank import (
C2_5RerankConfig,
ReRankStrategy,
)
from gps_denied_onboard.components.c6_tile_cache import TileStore
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.helpers.lightglue_runtime import LightGlueRuntime
__all__ = ["build_rerank_strategy"]
_LOG = logging.getLogger("gps_denied_onboard.c2_5_rerank")
# Strategy resolution table — mirrors the contract's
# ``rerank_strategy_protocol.md`` v1.0.0 § Composition-Root Factory
# table verbatim. ANY mutation here MUST be mirrored in the contract.
_STRATEGY_TO_BUILD_FLAG: dict[str, str] = {
"inlier_count": "BUILD_RERANK_INLIER_COUNT",
}
_STRATEGY_TO_MODULE: dict[str, tuple[str, str]] = {
"inlier_count": (
"gps_denied_onboard.components.c2_5_rerank.inlier_based_reranker",
"InlierCountReRanker",
),
}
def _is_build_flag_on(flag_name: str) -> bool:
raw = os.environ.get(flag_name, "")
return raw.strip().lower() in {"on", "1", "true", "yes"}
def _c2_5_config(config: "Config") -> "C2_5RerankConfig":
return config.components["c2_5_rerank"]
def build_rerank_strategy(
config: "Config",
*,
tile_store: "TileStore",
lightglue_runtime: "LightGlueRuntime",
) -> "ReRankStrategy":
"""Construct the :class:`ReRankStrategy` impl selected by config.
1. Reads ``config.components['c2_5_rerank'].strategy``.
2. Checks the matching ``BUILD_RERANK_<variant>`` flag — if OFF,
raises :class:`StrategyNotAvailableError` BEFORE any import.
3. Lazily imports the concrete strategy module.
4. Constructs the strategy via its module-level
``create(config, tile_store, lightglue_runtime)`` factory
function (each concrete strategy module exports ``create`` as
its public entry-point; concrete constructors stay private).
5. Emits ONE INFO log ``kind="c2_5.rerank.strategy_loaded"`` with
structured fields ``{strategy, top_n}``.
Raises:
StrategyNotAvailableError: compile-time flag OFF or
concrete module not yet built (AZ-343 pending).
"""
block = _c2_5_config(config)
strategy = block.strategy
flag_name = _STRATEGY_TO_BUILD_FLAG.get(strategy)
module_info = _STRATEGY_TO_MODULE.get(strategy)
if flag_name is None or module_info is None:
# Defensive — config validation rejects unknown strategy labels
# at load (C2_5RerankConfig.__post_init__).
_LOG.error(
"c2_5.rerank.build_flag_off",
extra={"strategy": strategy, "reason": "unknown_strategy"},
)
raise StrategyNotAvailableError(
f"ReRankStrategy {strategy!r} is not buildable in this binary."
)
if not _is_build_flag_on(flag_name):
_LOG.error(
"c2_5.rerank.build_flag_off",
extra={"strategy": strategy, "flag": flag_name},
)
raise StrategyNotAvailableError(
f"BUILD_RERANK_{strategy.upper()} is OFF for this binary; "
f"cannot select strategy={strategy}."
)
module_name, class_name = module_info
try:
module = __import__(module_name, fromlist=[class_name])
except ModuleNotFoundError as exc:
raise StrategyNotAvailableError(
f"ReRankStrategy {strategy!r} is configured but its concrete impl "
f"module {module_name!r} has not been built into this binary "
"yet (AZ-343 pending)."
) from exc
create_fn = getattr(module, "create", None)
if create_fn is None:
strategy_cls = getattr(module, class_name)
instance = strategy_cls(
config,
tile_store=tile_store,
lightglue_runtime=lightglue_runtime,
)
else:
instance = create_fn(
config,
tile_store=tile_store,
lightglue_runtime=lightglue_runtime,
)
_LOG.info(
"c2_5.rerank.strategy_loaded",
extra={"strategy": strategy, "top_n": block.top_n},
)
return instance
@@ -0,0 +1,404 @@
"""AZ-342 — C2.5 ReRankStrategy Protocol + DTO + error + factory conformance.
Covers AC-1..AC-8 + AC-11 + NFRs. AC-9 (single-thread binding) and
AC-10 (LightGlueRuntime identity-share between C2.5 and C3) are
deferred per the task spec's Risk-4 escape clause — the generic
compose_root thread-binding registry and the cross-factory helper
identity assertion live with AZ-270 and the future C3 protocol task
(AZ-344). Each factory owns its own thread binding today.
"""
from __future__ import annotations
import dataclasses
import logging
import sys
import time
import types
import pytest
from gps_denied_onboard._types.rerank import RerankCandidate, RerankResult
from gps_denied_onboard.components.c2_5_rerank import (
C2_5RerankConfig,
ReRankStrategy,
RerankAllCandidatesFailedError,
RerankBackboneError,
RerankError,
)
from gps_denied_onboard.components.c2_5_rerank.config import KNOWN_STRATEGIES
from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
from gps_denied_onboard.runtime_root.rerank_factory import build_rerank_strategy
_STRATEGY_MODULES: dict[str, tuple[str, str, str]] = {
"inlier_count": (
"gps_denied_onboard.components.c2_5_rerank.inlier_based_reranker",
"InlierCountReRanker",
"BUILD_RERANK_INLIER_COUNT",
),
}
# ----------------------------------------------------------------------
# Fakes that structurally satisfy the ReRankStrategy Protocol.
class _FakeTileStore:
def read_tile_pixels(self, tile_id):
raise NotImplementedError
def write_tile(self, tile_blob, metadata):
raise NotImplementedError
def tile_exists(self, tile_id):
return False
def delete_tile(self, tile_id):
return False
class _FakeLightGlueRuntime:
def descriptor_dim(self):
return 256
def match(self, features_a, features_b):
raise NotImplementedError
def match_batch(self, features_a_list, features_b_list):
raise NotImplementedError
class _FullReRankStrategy:
def __init__(self, config, *, tile_store, lightglue_runtime) -> None:
self._config = config
self._tile_store = tile_store
self._lightglue_runtime = lightglue_runtime
self._label = config.components["c2_5_rerank"].strategy
def rerank(self, frame, vpr_result, n, calibration):
return RerankResult(
frame_id=getattr(frame, "frame_id", 0),
candidates=tuple(),
reranked_at=1_000_000_000,
rerank_label=self._label,
candidates_input=0,
candidates_dropped=0,
)
class _PartialReRankStrategy:
pass
def _config_with_strategy(strategy: str = "inlier_count") -> Config:
return Config.with_blocks(c2_5_rerank=C2_5RerankConfig(strategy=strategy))
def _install_fake_strategy(strategy_label: str) -> type:
module_name, class_name, _flag = _STRATEGY_MODULES[strategy_label]
class _FakeStrategy(_FullReRankStrategy):
pass
_FakeStrategy.__name__ = class_name
module = types.ModuleType(module_name)
setattr(module, class_name, _FakeStrategy)
sys.modules[module_name] = module
return _FakeStrategy
@pytest.fixture
def strategy_module_cleanup():
for module_name, _, _ in _STRATEGY_MODULES.values():
sys.modules.pop(module_name, None)
yield
for module_name, _, _ in _STRATEGY_MODULES.values():
sys.modules.pop(module_name, None)
# ----------------------------------------------------------------------
# AC-1: Protocol conformance.
def test_ac1_rerank_strategy_conformance_full() -> None:
instance = _FullReRankStrategy(
_config_with_strategy(),
tile_store=_FakeTileStore(),
lightglue_runtime=_FakeLightGlueRuntime(),
)
assert isinstance(instance, ReRankStrategy)
def test_ac1_rerank_strategy_conformance_partial_missing_methods() -> None:
assert not isinstance(_PartialReRankStrategy(), ReRankStrategy)
# ----------------------------------------------------------------------
# AC-2: frozen+slotted DTOs.
def _make_candidate() -> RerankCandidate:
return RerankCandidate(
tile_id=(18, 49.9, 36.3),
inlier_count=42,
descriptor_distance=0.123,
descriptor_dim=512,
tile_pixels_handle=object(),
)
def _make_result(frame_id: int = 7) -> RerankResult:
return RerankResult(
frame_id=frame_id,
candidates=(_make_candidate(),),
reranked_at=1_000_000_000,
rerank_label="inlier_count",
candidates_input=10,
candidates_dropped=9,
)
@pytest.mark.parametrize(
"dto, field_name, new_value",
[
(_make_candidate(), "inlier_count", 99),
(_make_result(), "rerank_label", "learned_reranker"),
],
)
def test_ac2_frozen_dtos_reject_mutation(dto, field_name: str, new_value) -> None:
original_value = getattr(dto, field_name)
with pytest.raises(dataclasses.FrozenInstanceError):
setattr(dto, field_name, new_value)
assert getattr(dto, field_name) == original_value
@pytest.mark.parametrize("cls", [RerankCandidate, RerankResult])
def test_ac2_dtos_have_slots(cls) -> None:
assert hasattr(cls, "__slots__")
assert cls.__slots__
instance = _make_candidate() if cls is RerankCandidate else _make_result()
assert not hasattr(instance, "__dict__"), (
f"{cls.__name__} carries a __dict__ — slots=True is missing"
)
# ----------------------------------------------------------------------
# AC-3: factory rejects missing build flag.
def test_ac3_factory_rejects_missing_build_flag(
monkeypatch, strategy_module_cleanup, caplog
) -> None:
strategy = "inlier_count"
_, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.delenv(flag, raising=False)
config = _config_with_strategy(strategy)
with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_5_rerank"):
with pytest.raises(StrategyNotAvailableError) as exc_info:
build_rerank_strategy(
config,
tile_store=_FakeTileStore(),
lightglue_runtime=_FakeLightGlueRuntime(),
)
assert "BUILD_RERANK_INLIER_COUNT is OFF" in str(exc_info.value)
assert any(
r.message == "c2_5.rerank.build_flag_off" for r in caplog.records
)
def test_ac3_factory_does_not_load_module_when_flag_off(
monkeypatch, strategy_module_cleanup
) -> None:
module_name, _, flag = _STRATEGY_MODULES["inlier_count"]
monkeypatch.delenv(flag, raising=False)
config = _config_with_strategy("inlier_count")
with pytest.raises(StrategyNotAvailableError):
build_rerank_strategy(
config,
tile_store=_FakeTileStore(),
lightglue_runtime=_FakeLightGlueRuntime(),
)
assert module_name not in sys.modules
# ----------------------------------------------------------------------
# AC-4: unknown strategy rejected at config-load time.
@pytest.mark.parametrize(
"bad_label",
["INLIER_COUNT", "garbage", "", "learned_reranker"],
)
def test_ac4_unknown_strategy_rejected_at_config_load(bad_label: str) -> None:
with pytest.raises(ConfigError) as exc_info:
C2_5RerankConfig(strategy=bad_label)
msg = str(exc_info.value)
for valid in KNOWN_STRATEGIES:
assert valid in msg
# ----------------------------------------------------------------------
# AC-5: factory emits INFO log on success.
def test_ac5_factory_emits_info_log_on_success(
monkeypatch, strategy_module_cleanup, caplog
) -> None:
strategy = "inlier_count"
_, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.setenv(flag, "ON")
_install_fake_strategy(strategy)
config = _config_with_strategy(strategy)
with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c2_5_rerank"):
instance = build_rerank_strategy(
config,
tile_store=_FakeTileStore(),
lightglue_runtime=_FakeLightGlueRuntime(),
)
assert isinstance(instance, ReRankStrategy)
records = [
r for r in caplog.records if r.message == "c2_5.rerank.strategy_loaded"
]
assert len(records) == 1
record = records[0]
assert getattr(record, "strategy", None) == "inlier_count"
assert getattr(record, "top_n", None) == 3
# ----------------------------------------------------------------------
# AC-6: strategy resolution table.
def test_ac6_strategy_resolution(monkeypatch, strategy_module_cleanup) -> None:
strategy = "inlier_count"
module_name, class_name, flag = _STRATEGY_MODULES[strategy]
monkeypatch.setenv(flag, "ON")
fake_cls = _install_fake_strategy(strategy)
config = _config_with_strategy(strategy)
instance = build_rerank_strategy(
config,
tile_store=_FakeTileStore(),
lightglue_runtime=_FakeLightGlueRuntime(),
)
assert isinstance(instance, fake_cls)
assert isinstance(instance, ReRankStrategy)
assert sys.modules[module_name] is not None
# ----------------------------------------------------------------------
# AC-7: error hierarchy.
@pytest.mark.parametrize(
"exc_factory",
[RerankBackboneError, RerankAllCandidatesFailedError],
)
def test_ac7_all_rerank_errors_caught_as_family(exc_factory) -> None:
with pytest.raises(RerankError):
raise exc_factory("boom")
def test_ac7_strategy_not_available_outside_family() -> None:
with pytest.raises(StrategyNotAvailableError):
try:
raise StrategyNotAvailableError("composition-time")
except RerankError:
pytest.fail(
"StrategyNotAvailableError is a composition-root error "
"and MUST NOT be in the c2.5 RerankError family"
)
# ----------------------------------------------------------------------
# AC-8: Public API re-exports.
def test_ac8_public_api_re_exports() -> None:
from gps_denied_onboard.components import c2_5_rerank
assert "ReRankStrategy" in c2_5_rerank.__all__
assert "RerankResult" in c2_5_rerank.__all__
assert "RerankCandidate" in c2_5_rerank.__all__
def test_ac8_internals_not_in_public_api() -> None:
from gps_denied_onboard.components import c2_5_rerank
# Concrete strategy must not leak into the package re-exports;
# consumers see only the Protocol.
assert "InlierCountReRanker" not in c2_5_rerank.__all__
# ----------------------------------------------------------------------
# AC-11: tile_pixels_handle opaqueness.
def test_ac11_tile_pixels_handle_opaque() -> None:
handle = object()
candidate = RerankCandidate(
tile_id=(18, 49.9, 36.3),
inlier_count=10,
descriptor_distance=0.5,
descriptor_dim=256,
tile_pixels_handle=handle,
)
assert candidate.tile_pixels_handle is handle
# ----------------------------------------------------------------------
# NFRs.
@pytest.mark.parametrize(
"exc_type",
[RerankBackboneError, RerankAllCandidatesFailedError],
)
def test_nfr_reliability_all_rerank_errors_subclass_family(exc_type) -> None:
assert issubclass(exc_type, RerankError)
def test_nfr_reliability_strategy_not_available_not_in_family() -> None:
assert not issubclass(StrategyNotAvailableError, RerankError)
def test_nfr_perf_factory_under_50ms_p99(
monkeypatch, strategy_module_cleanup
) -> None:
strategy = "inlier_count"
_, _, flag = _STRATEGY_MODULES[strategy]
monkeypatch.setenv(flag, "ON")
_install_fake_strategy(strategy)
config = _config_with_strategy(strategy)
tile_store = _FakeTileStore()
lightglue_runtime = _FakeLightGlueRuntime()
durations_ms: list[float] = []
for _ in range(100):
t0 = time.perf_counter()
build_rerank_strategy(
config, tile_store=tile_store, lightglue_runtime=lightglue_runtime
)
durations_ms.append((time.perf_counter() - t0) * 1000.0)
durations_ms.sort()
p99 = durations_ms[int(0.99 * len(durations_ms))]
assert p99 <= 50.0
# ----------------------------------------------------------------------
# Surface coverage — config defaults.
def test_c2_5_config_defaults() -> None:
cfg = C2_5RerankConfig()
assert cfg.strategy == "inlier_count"
assert cfg.top_n == 3
def test_c2_5_config_top_n_validation() -> None:
with pytest.raises(ConfigError):
C2_5RerankConfig(top_n=0)
with pytest.raises(ConfigError):
C2_5RerankConfig(top_n=-3)
-9
View File
@@ -1,9 +0,0 @@
"""C2.5 Rerank smoke test — AC-9."""
def test_interface_importable() -> None:
# Assert
from gps_denied_onboard.components.c2_5_rerank import RerankResult, RerankStrategy
assert RerankStrategy is not None
assert RerankResult is not None