"""AZ-343 — :class:`InlierCountReRanker` acceptance + NFR coverage. Covers AC-1..AC-12 from the task spec at ``_docs/02_tasks/todo/AZ-343_c2_5_inlier_count_reranker.md``. Performance NFR (C2.5-PT-01 p95 ≤ 80 ms for 10 single-pair LightGlue passes against the real TRT engine) is deferred to Step 9 / E-BBT per the task's "Excluded" section — the harness here uses test doubles that bypass real GPU work. """ from __future__ import annotations import logging from dataclasses import dataclass import numpy as np import pytest from gps_denied_onboard._types.matching import CorrespondenceSet, KeypointSet from gps_denied_onboard._types.nav import NavCameraFrame from gps_denied_onboard._types.rerank import RerankResult from gps_denied_onboard._types.vpr import VprCandidate, VprResult from gps_denied_onboard.components.c2_5_rerank import ( C2_5RerankConfig, RerankAllCandidatesFailedError, ReRankStrategy, ) from gps_denied_onboard.components.c2_5_rerank.inlier_based_reranker import ( InlierCountReRanker, create, ) from gps_denied_onboard.components.c6_tile_cache import TilePixelHandle from gps_denied_onboard.components.c6_tile_cache.errors import TileNotFoundError from gps_denied_onboard.config.schema import Config from gps_denied_onboard.fdr_client import FdrRecord from gps_denied_onboard.helpers.feature_extractor import FeatureExtractorError from gps_denied_onboard.helpers.lightglue_runtime import LightGlueRuntimeError # ---------------------------------------------------------------------- # Test doubles @dataclass class _FakeClock: _t: int = 1_700_000_000_000_000_000 def monotonic_ns(self) -> int: self._t += 1 return self._t def time_ns(self) -> int: return self._t def sleep_until_ns(self, target_ns: int) -> None: return None class _FakeTilePixelHandle(TilePixelHandle): """Reusable :class:`TilePixelHandle` — supports multi-shot ``with`` blocks. The buffer is mutable so AC-7 can prove identity (mutation through one ``with`` block must be visible through the next). 
""" def __init__(self, jpeg_bytes: bytes, path): self._buf = bytearray(jpeg_bytes) self._path = path @property def filesystem_path(self): return self._path def __enter__(self) -> memoryview: return memoryview(self._buf) def __exit__(self, exc_type, exc_val, exc_tb) -> None: return None def mutate(self, new_bytes: bytes) -> None: self._buf = bytearray(new_bytes) def _synthesise_jpeg(seed: int) -> bytes: """Produce a deterministic colour JPEG keyed off ``seed``.""" import cv2 rng = np.random.default_rng(seed) image = rng.integers(0, 255, size=(32, 32, 3), dtype=np.uint8) ok, buf = cv2.imencode(".jpg", image) assert ok, "cv2.imencode failed in test fixture" return bytes(buf) class _FakeTileStore: """Returns deterministic handles per ``tile_id``; can be told to fail.""" def __init__(self): from pathlib import Path self._handles: dict[tuple, _FakeTilePixelHandle] = {} self._fail: set[tuple] = set() self._path_base = Path("/tmp/c2_5_rerank_fake") def install(self, tile_id, *, fail: bool = False, jpeg_seed: int | None = None) -> None: if fail: self._fail.add(tile_id) return if jpeg_seed is None: jpeg_seed = hash(tile_id) & 0xFFFF self._handles[tile_id] = _FakeTilePixelHandle( jpeg_bytes=_synthesise_jpeg(jpeg_seed), path=self._path_base / f"{tile_id}.jpg", ) def handle(self, tile_id) -> _FakeTilePixelHandle: return self._handles[tile_id] def read_tile_pixels(self, tile_id): if tile_id in self._fail: raise TileNotFoundError(f"fake: {tile_id} marked as failing") return self._handles[tile_id] def write_tile(self, tile_blob, metadata): raise NotImplementedError def tile_exists(self, tile_id): return tile_id in self._handles def delete_tile(self, tile_id): return self._handles.pop(tile_id, None) is not None class _FakeFeatureExtractor: """Returns a deterministic :class:`KeypointSet` per image; can fail.""" def __init__(self) -> None: self._fail_calls: set[int] = set() self._call_count = 0 def fail_on(self, call_index: int) -> None: self._fail_calls.add(call_index) def 
descriptor_dim(self) -> int: return 256 def extract(self, image_bgr: np.ndarray) -> KeypointSet: idx = self._call_count self._call_count += 1 if idx in self._fail_calls: raise FeatureExtractorError(f"fake extractor failing on call {idx}") return KeypointSet( keypoints=np.zeros((4, 2), dtype=np.float32), descriptors=np.zeros((4, 256), dtype=np.float32), ) class _ProgrammableLightGlue: """Returns the next pre-programmed :class:`CorrespondenceSet`; can raise.""" def __init__(self) -> None: self._calls: list[ tuple[KeypointSet, KeypointSet] ] = [] self._results: list[object] = [] # CorrespondenceSet | Exception def queue_inliers(self, count: int) -> None: self._results.append(_make_correspondence_set(count)) def queue_error(self, exc: BaseException) -> None: self._results.append(exc) def descriptor_dim(self) -> int: return 256 def match(self, features_a: KeypointSet, features_b: KeypointSet) -> CorrespondenceSet: self._calls.append((features_a, features_b)) if not self._results: raise AssertionError( "fake LightGlue ran out of programmed responses; queue more" ) result = self._results.pop(0) if isinstance(result, BaseException): raise result return result def match_batch(self, features_a_list, features_b_list): raise NotImplementedError @property def calls(self) -> list[tuple[KeypointSet, KeypointSet]]: return self._calls class _CapturingFdrClient: def __init__(self) -> None: self.records: list[FdrRecord] = [] def enqueue(self, record: FdrRecord) -> None: self.records.append(record) def _make_correspondence_set(count: int) -> CorrespondenceSet: return CorrespondenceSet( correspondences=np.zeros((count, 4), dtype=np.float32), scores=np.full((count,), 0.5, dtype=np.float32), ) def _make_frame(frame_id: int = 7) -> NavCameraFrame: from datetime import datetime, timezone image = (np.random.default_rng(frame_id).integers(0, 255, (16, 16, 3))).astype( np.uint8 ) return NavCameraFrame( frame_id=frame_id, timestamp=datetime.now(tz=timezone.utc), image=image, 
camera_calibration_id="cam0", ) def _make_vpr_candidate(*, tile_id, distance: float) -> VprCandidate: return VprCandidate(tile_id=tile_id, descriptor_distance=distance, descriptor_dim=256) def _make_vpr_result(*, frame_id: int, candidates: list[VprCandidate]) -> VprResult: return VprResult( frame_id=frame_id, candidates=tuple(candidates), retrieved_at=10, backbone_label="ultra_vpr", ) def _build_reranker( *, tile_store: _FakeTileStore, extractor: _FakeFeatureExtractor, lightglue: _ProgrammableLightGlue, fdr_client=None, top_n: int = 3, debug_per_frame_log: bool = False, ) -> InlierCountReRanker: config = Config.with_blocks( c2_5_rerank=C2_5RerankConfig( strategy="inlier_count", top_n=top_n, debug_per_frame_log=debug_per_frame_log, ) ) return InlierCountReRanker( config=config, tile_store=tile_store, lightglue_runtime=lightglue, feature_extractor=extractor, clock=_FakeClock(), fdr_client=fdr_client, ) def _install_k_candidates( tile_store: _FakeTileStore, *, k: int, distances: list[float] | None = None, fail_indices: set[int] | None = None, ) -> list[VprCandidate]: distances = distances or [0.1 * i for i in range(k)] fail_indices = fail_indices or set() candidates: list[VprCandidate] = [] for i in range(k): tile_id = (18, 49.0 + i * 0.001, 36.0 + i * 0.001) tile_store.install(tile_id, fail=i in fail_indices, jpeg_seed=i) candidates.append(_make_vpr_candidate(tile_id=tile_id, distance=distances[i])) return candidates # ---------------------------------------------------------------------- # Calibration fixture (the strategy ignores it for now — Protocol shape only). 
@pytest.fixture
def calibration():
    """Synthetic identity calibration for camera ``cam0``."""
    from gps_denied_onboard._types.calibration import CameraCalibration

    identity_intrinsics = np.eye(3, dtype=np.float32)
    zero_distortion = np.zeros((5,), dtype=np.float32)
    identity_extrinsics = np.eye(4, dtype=np.float32)
    return CameraCalibration(
        camera_id="cam0",
        intrinsics_3x3=identity_intrinsics,
        distortion=zero_distortion,
        body_to_camera_se3=identity_extrinsics,
        acquisition_method="synthetic",
    )


# ----------------------------------------------------------------------
# AC-1 — Protocol conformance.


def test_ac1_isinstance_rerank_strategy(calibration) -> None:
    """The concrete reranker satisfies :class:`ReRankStrategy` and exposes ``rerank``."""
    # Arrange
    strategy = _build_reranker(
        tile_store=_FakeTileStore(),
        extractor=_FakeFeatureExtractor(),
        lightglue=_ProgrammableLightGlue(),
    )

    # Assert
    assert isinstance(strategy, ReRankStrategy)
    assert hasattr(strategy, "rerank")


# ----------------------------------------------------------------------
# AC-2 — top-N ordering with mixed inlier counts + ties + zeros.


def test_ac2_top_n_ordering_and_tie_break(calibration) -> None:
    """Top-3 is ordered by inlier count; ties resolve to the smaller descriptor distance."""
    # Arrange
    inlier_counts = [412, 198, 287, 153, 287, 0, 65, 412, 89, 234]
    descriptor_distances = [0.1, 0.4, 0.2, 0.3, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(
        store, k=10, distances=descriptor_distances
    )
    matcher = _ProgrammableLightGlue()
    for programmed in inlier_counts:
        matcher.queue_inliers(programmed)
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        top_n=3,
    )
    vpr_result = _make_vpr_result(frame_id=12, candidates=vpr_candidates)

    # Act
    result = strategy.rerank(
        _make_frame(12), vpr_result, n=3, calibration=calibration
    )

    # Assert — (inlier_count, descriptor_distance) per rank.
    expected_top = [(412, 0.1), (412, 0.8), (287, 0.2)]
    assert len(result.candidates) == 3
    for survivor, (want_inliers, want_distance) in zip(result.candidates, expected_top):
        assert survivor.inlier_count == want_inliers
        assert survivor.descriptor_distance == pytest.approx(want_distance)
    # Zero-inlier candidate is dropped; candidates_dropped accounts for it.
    assert result.candidates_dropped >= 1


# ----------------------------------------------------------------------
# AC-3 — drop-and-continue on LightGlue failure.


def test_ac3_drop_and_continue_on_backbone_error(calibration, caplog) -> None:
    """One LightGlue failure is logged + sent to FDR once, and ranking still completes."""
    # Arrange — call index 3 raises, every other call succeeds.
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=10)
    matcher = _ProgrammableLightGlue()
    for idx in range(10):
        if idx != 3:
            matcher.queue_inliers(100 + idx)
            continue
        matcher.queue_error(LightGlueRuntimeError("boom"))
    fdr = _CapturingFdrClient()
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        fdr_client=fdr,
    )
    vpr_result = _make_vpr_result(frame_id=21, candidates=vpr_candidates)

    # Act
    with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_5_rerank"):
        result = strategy.rerank(
            _make_frame(21), vpr_result, n=3, calibration=calibration
        )

    # Assert
    assert len(result.candidates) == 3
    assert result.candidates_dropped >= 1
    error_logs = [
        rec for rec in caplog.records if rec.message == "c2_5.rerank.backbone_error"
    ]
    assert len(error_logs) == 1
    error_kv = getattr(error_logs[0], "kv", {})
    assert error_kv.get("reason") == "lightglue_forward_failed"
    error_fdr = [rec for rec in fdr.records if rec.kind == "rerank.backbone_error"]
    assert len(error_fdr) == 1


# ----------------------------------------------------------------------
# AC-4 — drop-and-continue on TileStore failure.


def test_ac4_drop_and_continue_on_tile_fetch_error(calibration, caplog) -> None:
    """One failing tile fetch is logged + sent to FDR once, and ranking still completes."""
    # Arrange — tile index 6 fails at fetch, so only 9 LightGlue results are needed.
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=10, fail_indices={6})
    matcher = _ProgrammableLightGlue()
    for _ in range(9):
        matcher.queue_inliers(200)
    fdr = _CapturingFdrClient()
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        fdr_client=fdr,
    )
    vpr_result = _make_vpr_result(frame_id=42, candidates=vpr_candidates)

    # Act
    with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_5_rerank"):
        result = strategy.rerank(
            _make_frame(42), vpr_result, n=3, calibration=calibration
        )

    # Assert
    assert len(result.candidates) == 3
    assert result.candidates_dropped >= 1
    fetch_logs = [
        rec for rec in caplog.records if rec.message == "c2_5.rerank.tile_fetch_error"
    ]
    assert len(fetch_logs) == 1
    fetch_fdr = [rec for rec in fdr.records if rec.kind == "rerank.tile_fetch_error"]
    assert len(fetch_fdr) == 1


# ----------------------------------------------------------------------
# AC-5 — zero survivors raises RerankAllCandidatesFailedError.


def test_ac5_zero_survivors_raises(calibration, caplog) -> None:
    """When every LightGlue call fails, rerank raises and reports ``all_failed`` once."""
    # Arrange
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=10)
    matcher = _ProgrammableLightGlue()
    for _ in range(10):
        matcher.queue_error(LightGlueRuntimeError("everything-fails"))
    fdr = _CapturingFdrClient()
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        fdr_client=fdr,
    )
    vpr_result = _make_vpr_result(frame_id=99, candidates=vpr_candidates)

    # Act / Assert
    with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_5_rerank"):
        with pytest.raises(RerankAllCandidatesFailedError):
            strategy.rerank(
                _make_frame(99), vpr_result, n=3, calibration=calibration
            )

    messages = [rec.message for rec in caplog.records]
    assert messages.count("c2_5.rerank.backbone_error") == 10
    assert messages.count("c2_5.rerank.all_failed") == 1
    all_failed_fdr = [rec for rec in fdr.records if rec.kind == "rerank.all_failed"]
    assert len(all_failed_fdr) == 1
    payload = all_failed_fdr[0].payload
    assert payload["candidates_input"] == 10
    assert payload["candidates_dropped"] == 10


# ----------------------------------------------------------------------
# AC-6 — fewer than N survivors → WARN log + partial result.


def test_ac6_fewer_than_n_survivors_warn(calibration, caplog) -> None:
    """With only two survivors for n=3, a partial result plus one WARN record is produced."""
    # Arrange — 8 dropped, 2 succeed.
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=10)
    matcher = _ProgrammableLightGlue()
    # Two succeed, six fail with LightGlueRuntimeError, two return zero inliers.
    success_indices = {0, 5}
    zero_indices = {2, 8}
    for idx in range(10):
        if idx in success_indices:
            matcher.queue_inliers(300 + idx)
            continue
        if idx in zero_indices:
            matcher.queue_inliers(0)
            continue
        matcher.queue_error(LightGlueRuntimeError("bad"))
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
    )
    vpr_result = _make_vpr_result(frame_id=55, candidates=vpr_candidates)

    # Act
    with caplog.at_level(logging.WARNING, logger="gps_denied_onboard.c2_5_rerank"):
        result = strategy.rerank(
            _make_frame(55), vpr_result, n=3, calibration=calibration
        )

    # Assert
    assert len(result.candidates) == 2
    assert result.candidates_dropped == 8
    warn_records = [
        rec
        for rec in caplog.records
        if rec.message == "c2_5.rerank.fewer_than_n_survivors"
    ]
    assert len(warn_records) == 1
    warn_kv = getattr(warn_records[0], "kv", {})
    assert warn_kv.get("requested") == 3
    assert warn_kv.get("returned") == 2
    assert warn_kv.get("dropped") == 8


# ----------------------------------------------------------------------
# AC-7 — tile_pixels_handle is a reference, not a copy.


def test_ac7_tile_pixels_handle_is_reference(calibration) -> None:
    """Each survivor carries the very handle object the tile store returned."""
    # Arrange
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=3)
    matcher = _ProgrammableLightGlue()
    for _ in range(3):
        matcher.queue_inliers(500)
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        top_n=3,
    )
    vpr_result = _make_vpr_result(frame_id=1, candidates=vpr_candidates)

    # Act
    result = strategy.rerank(
        _make_frame(1), vpr_result, n=3, calibration=calibration
    )

    # Assert — identity preservation against the TileStore-returned handle.
    for survivor in result.candidates:
        assert survivor.tile_pixels_handle is store.handle(survivor.tile_id)


# ----------------------------------------------------------------------
# AC-8 — descriptor_distance carried forward unchanged.


def test_ac8_descriptor_distance_preserved(calibration) -> None:
    """The VPR descriptor distance reaches the rerank output bit-for-bit."""
    # Arrange
    store = _FakeTileStore()
    distance = 0.123456789
    vpr_candidates = _install_k_candidates(
        store, k=3, distances=[distance, 0.2, 0.3]
    )
    matcher = _ProgrammableLightGlue()
    for _ in range(3):
        matcher.queue_inliers(700)
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
    )
    vpr_result = _make_vpr_result(frame_id=2, candidates=vpr_candidates)

    # Act
    result = strategy.rerank(
        _make_frame(2), vpr_result, n=3, calibration=calibration
    )

    # Assert — exact equality on purpose: the value must not be recomputed.
    wanted_tile = vpr_candidates[0].tile_id
    first_match = next(
        (c for c in result.candidates if c.tile_id == wanted_tile), None
    )
    assert first_match is not None
    assert first_match.descriptor_distance == distance


# ----------------------------------------------------------------------
# AC-9 — deterministic same-inputs → bit-identical RerankResult.candidates.


def test_ac9_deterministic_candidates(calibration) -> None:
    """Three identical calls yield identical candidates but distinct ``reranked_at``."""
    # Arrange — single reranker instance called three times so the
    # injected clock advances between calls (AC-9: reranked_at MUST
    # differ across calls but candidates MUST NOT).
    counts = [40, 90, 70, 10, 60, 30, 80, 20, 50, 100]
    distances = [0.1 * i for i in range(10)]
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=10, distances=distances)
    matcher = _ProgrammableLightGlue()
    # Same ten counts replayed once per rerank call.
    for programmed in counts * 3:
        matcher.queue_inliers(programmed)
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
    )
    vpr_result = _make_vpr_result(frame_id=314, candidates=vpr_candidates)

    # Act
    runs: list[RerankResult] = []
    for _ in range(3):
        runs.append(
            strategy.rerank(
                _make_frame(314), vpr_result, n=3, calibration=calibration
            )
        )

    # Assert
    triples = [
        tuple(
            (c.tile_id, c.inlier_count, c.descriptor_distance) for c in run.candidates
        )
        for run in runs
    ]
    assert triples[0] == triples[1] == triples[2]
    # reranked_at differs across calls because Clock.monotonic_ns advances.
    first_run, second_run, third_run = runs
    assert first_run.reranked_at != second_run.reranked_at
    assert second_run.reranked_at != third_run.reranked_at


# ----------------------------------------------------------------------
# AC-10 — composition-root wiring via the AZ-342 factory.


def test_ac10_composition_root_wiring(monkeypatch, caplog) -> None:
    """The factory returns a wired :class:`InlierCountReRanker` and logs ``ready`` once."""
    # Arrange — reuse the module already imported at file top so the
    # class identity matches; the factory's lazy import picks it up
    # from sys.modules unchanged.
    monkeypatch.setenv("BUILD_RERANK_INLIER_COUNT", "ON")
    from gps_denied_onboard.runtime_root.rerank_factory import build_rerank_strategy

    rerank_block = C2_5RerankConfig(strategy="inlier_count", top_n=3)
    config = Config.with_blocks(c2_5_rerank=rerank_block)
    store = _FakeTileStore()
    extractor = _FakeFeatureExtractor()
    matcher = _ProgrammableLightGlue()
    clock = _FakeClock()

    # Act
    with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c2_5_rerank"):
        instance = build_rerank_strategy(
            config,
            tile_store=store,
            lightglue_runtime=matcher,
            feature_extractor=extractor,
            clock=clock,
        )

    # Assert
    assert isinstance(instance, InlierCountReRanker)
    assert isinstance(instance, ReRankStrategy)
    assert instance._lightglue_runtime is matcher
    ready_logs = [rec for rec in caplog.records if rec.message == "c2_5.rerank.ready"]
    assert len(ready_logs) == 1
    ready_kv = getattr(ready_logs[0], "kv", {})
    assert ready_kv.get("strategy") == "inlier_count"
    assert ready_kv.get("N") == 3
    assert ready_kv.get("K") == 10


# ----------------------------------------------------------------------
# AC-11 — FDR rerank.frame_done emission per frame.


def test_ac11_frame_done_fdr_emission(calibration) -> None:
    """Exactly one ``rerank.frame_done`` FDR record is emitted, with the expected payload."""
    # Arrange
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=10)
    matcher = _ProgrammableLightGlue()
    successes = [412, 287, 198] + [10] * 7  # top three survive ranking.
    for programmed in successes:
        matcher.queue_inliers(programmed)
    fdr = _CapturingFdrClient()
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        fdr_client=fdr,
    )
    vpr_result = _make_vpr_result(frame_id=77, candidates=vpr_candidates)

    # Act
    result = strategy.rerank(
        _make_frame(77), vpr_result, n=3, calibration=calibration
    )

    # Assert
    frame_done = [rec for rec in fdr.records if rec.kind == "rerank.frame_done"]
    assert len(frame_done) == 1
    payload = frame_done[0].payload
    assert payload["frame_id"] == 77
    assert payload["candidates_input"] == 10
    assert payload["top_inlier_count"] == result.candidates[0].inlier_count


# ----------------------------------------------------------------------
# AC-12 — single-pair LightGlue invocation count.


def test_ac12_single_pair_lightglue_called_exactly_k_times(calibration) -> None:
    """LightGlue runs once per candidate, always against the same query features object."""
    # Arrange
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=10)
    matcher = _ProgrammableLightGlue()
    for idx in range(10):
        matcher.queue_inliers(10 + idx)
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
    )
    vpr_result = _make_vpr_result(frame_id=88, candidates=vpr_candidates)

    # Act
    strategy.rerank(_make_frame(88), vpr_result, n=3, calibration=calibration)

    # Assert
    recorded = matcher.calls
    assert len(recorded) == 10
    first_query = recorded[0][0]
    assert all(query is first_query for query, _ in recorded[1:])


# ----------------------------------------------------------------------
# Mixed drop-and-continue smoke (Risk-1 / Risk-2 coverage).


def test_drop_and_continue_mixed_failures(calibration, caplog) -> None:
    """Tile-fetch, backbone and zero-inlier drops can coexist within one frame."""
    # Arrange — 1 TileFetch failure, 1 LightGlue failure, 2 zero-inliers, 6 succeed.
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=10, fail_indices={2})
    matcher = _ProgrammableLightGlue()
    # Index 2 is dropped at tile fetch; the remaining 9 indices feed LightGlue.
    counts_for_remaining = [50, 75, 25, 100, 0, 80, 90, 0]  # indices 0,1,3,4,5,6,7,8
    for programmed in counts_for_remaining:
        matcher.queue_inliers(programmed)
    # Index 9 — the last LightGlue call — hits a LightGlue error.
    matcher.queue_error(LightGlueRuntimeError("backbone-died"))
    fdr = _CapturingFdrClient()
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        fdr_client=fdr,
    )
    vpr_result = _make_vpr_result(frame_id=66, candidates=vpr_candidates)

    # Act
    with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c2_5_rerank"):
        result = strategy.rerank(
            _make_frame(66), vpr_result, n=3, calibration=calibration
        )

    # Assert
    assert len(result.candidates) == 3
    # 1 tile-fetch + 1 backbone + 2 zero-inliers = 4 drops.
    assert result.candidates_dropped == 4
    messages = [rec.message for rec in caplog.records]
    assert messages.count("c2_5.rerank.backbone_error") == 1
    assert messages.count("c2_5.rerank.tile_fetch_error") == 1
    fdr_kinds = [rec.kind for rec in fdr.records]
    assert "rerank.backbone_error" in fdr_kinds
    assert "rerank.tile_fetch_error" in fdr_kinds
    assert "rerank.frame_done" in fdr_kinds


# ----------------------------------------------------------------------
# Public API — ``InlierCountReRanker`` stays out of c2_5_rerank.__all__ (AC-8).
# NOTE(review): AC-8 in this file is the descriptor_distance test — confirm
# which spec's AC-8 this header refers to.


def test_inlier_count_reranker_not_publicly_re_exported() -> None:
    """The concrete class name is absent from the package's ``__all__``."""
    # Arrange / Act
    from gps_denied_onboard.components import c2_5_rerank

    # Assert
    exported = c2_5_rerank.__all__
    assert "InlierCountReRanker" not in exported


# ----------------------------------------------------------------------
# Module-level create() is the factory entry-point (Outcome step 5).


def test_create_returns_inlier_count_reranker() -> None:
    """Module-level ``create`` builds an :class:`InlierCountReRanker`."""
    # Arrange
    rerank_block = C2_5RerankConfig(strategy="inlier_count", top_n=3)
    config = Config.with_blocks(c2_5_rerank=rerank_block)

    # Act
    built = create(
        config,
        tile_store=_FakeTileStore(),
        lightglue_runtime=_ProgrammableLightGlue(),
        feature_extractor=_FakeFeatureExtractor(),
        clock=_FakeClock(),
    )

    # Assert
    assert isinstance(built, InlierCountReRanker)
    assert isinstance(built, ReRankStrategy)


# ----------------------------------------------------------------------
# Health: no_input_candidates short-circuit also raises.


def test_zero_input_candidates_short_circuits(calibration) -> None:
    """An empty VPR candidate list raises :class:`RerankAllCandidatesFailedError`."""
    # Arrange
    strategy = _build_reranker(
        tile_store=_FakeTileStore(),
        extractor=_FakeFeatureExtractor(),
        lightglue=_ProgrammableLightGlue(),
    )
    empty_vpr = _make_vpr_result(frame_id=5, candidates=[])
    frame = _make_frame(5)

    # Act / Assert
    with pytest.raises(RerankAllCandidatesFailedError):
        strategy.rerank(frame, empty_vpr, n=3, calibration=calibration)


# ----------------------------------------------------------------------
# DEBUG gating — per-frame frame_done DEBUG only fires when configured on.


def test_debug_per_frame_log_gated_off_by_default(calibration, caplog) -> None:
    """With ``debug_per_frame_log=False`` no ``frame_done`` log record is captured."""
    # Arrange
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=3)
    matcher = _ProgrammableLightGlue()
    for _ in range(3):
        matcher.queue_inliers(100)
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        debug_per_frame_log=False,
    )
    vpr_result = _make_vpr_result(frame_id=33, candidates=vpr_candidates)

    # Act
    with caplog.at_level(logging.DEBUG, logger="gps_denied_onboard.c2_5_rerank"):
        strategy.rerank(_make_frame(33), vpr_result, n=3, calibration=calibration)

    # Assert
    frame_done_logs = [
        rec for rec in caplog.records if rec.message == "c2_5.rerank.frame_done"
    ]
    assert frame_done_logs == []


def test_debug_per_frame_log_emits_when_enabled(calibration, caplog) -> None:
    """With ``debug_per_frame_log=True`` exactly one ``frame_done`` log record is captured."""
    # Arrange
    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=3)
    matcher = _ProgrammableLightGlue()
    for _ in range(3):
        matcher.queue_inliers(100)
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        debug_per_frame_log=True,
    )
    vpr_result = _make_vpr_result(frame_id=34, candidates=vpr_candidates)

    # Act
    with caplog.at_level(logging.DEBUG, logger="gps_denied_onboard.c2_5_rerank"):
        strategy.rerank(_make_frame(34), vpr_result, n=3, calibration=calibration)

    # Assert
    frame_done_logs = [
        rec for rec in caplog.records if rec.message == "c2_5.rerank.frame_done"
    ]
    assert len(frame_done_logs) == 1


# ----------------------------------------------------------------------
# FDR enqueue failures must NEVER promote to drop events (observability-only).


def test_fdr_enqueue_failure_is_swallowed(calibration) -> None:
    """A raising FDR client does not stop rerank from returning all three candidates."""

    # Arrange
    class _BrokenFdr:
        """FDR double whose ``enqueue`` always raises."""

        def enqueue(self, record):
            raise RuntimeError("queue broken")

    store = _FakeTileStore()
    vpr_candidates = _install_k_candidates(store, k=3)
    matcher = _ProgrammableLightGlue()
    for _ in range(3):
        matcher.queue_inliers(100)
    strategy = _build_reranker(
        tile_store=store,
        extractor=_FakeFeatureExtractor(),
        lightglue=matcher,
        fdr_client=_BrokenFdr(),
    )
    vpr_result = _make_vpr_result(frame_id=99, candidates=vpr_candidates)

    # Act
    result = strategy.rerank(
        _make_frame(99), vpr_result, n=3, calibration=calibration
    )

    # Assert
    assert len(result.candidates) == 3