mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 18:41:13 +00:00
[AZ-230] Add local VPR retrieval boundary
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,104 @@
|
||||
from satellite_service import (
|
||||
LocalVprIndexPackage,
|
||||
LocalVprRetriever,
|
||||
RelocalizationRequest,
|
||||
VprDescriptorRecord,
|
||||
)
|
||||
|
||||
|
||||
def _record(
|
||||
chunk_id: str = "chunk-1",
|
||||
tile_id: str = "tile-1",
|
||||
descriptor: tuple[float, ...] = (1.0, 0.0, 0.0),
|
||||
freshness_status: str = "fresh",
|
||||
) -> VprDescriptorRecord:
|
||||
return VprDescriptorRecord(
|
||||
chunk_id=chunk_id,
|
||||
tile_id=tile_id,
|
||||
descriptor=descriptor,
|
||||
footprint={"min_lat": 49.0, "max_lat": 49.1, "min_lon": 36.0, "max_lon": 36.1},
|
||||
freshness_status=freshness_status,
|
||||
)
|
||||
|
||||
|
||||
def test_valid_local_index_load_reports_ready_status() -> None:
|
||||
# Arrange
|
||||
retriever = LocalVprRetriever()
|
||||
package = LocalVprIndexPackage(package_id="index-1", records=(_record(),))
|
||||
|
||||
# Act
|
||||
readiness = retriever.load_index(package)
|
||||
|
||||
# Assert
|
||||
assert readiness.ready is True
|
||||
assert readiness.engine == "cpu_faiss"
|
||||
assert readiness.loaded_records == 1
|
||||
|
||||
|
||||
def test_loaded_index_returns_bounded_candidates_with_freshness() -> None:
|
||||
# Arrange
|
||||
retriever = LocalVprRetriever()
|
||||
retriever.load_index(
|
||||
LocalVprIndexPackage(
|
||||
package_id="index-1",
|
||||
records=(
|
||||
_record(chunk_id="chunk-best", tile_id="tile-best", descriptor=(1.0, 0.0)),
|
||||
_record(
|
||||
chunk_id="chunk-stale",
|
||||
tile_id="tile-stale",
|
||||
descriptor=(0.8, 0.2),
|
||||
freshness_status="stale",
|
||||
),
|
||||
),
|
||||
)
|
||||
)
|
||||
request = RelocalizationRequest(
|
||||
frame_id="frame-1",
|
||||
image_ref="replay/frame-1.jpg",
|
||||
trigger_reason="covariance_growth",
|
||||
top_k=1,
|
||||
query_descriptor=(1.0, 0.0),
|
||||
)
|
||||
|
||||
# Act
|
||||
result = retriever.retrieve(request)
|
||||
|
||||
# Assert
|
||||
assert result.degraded is False
|
||||
assert len(result.candidates) == 1
|
||||
assert result.candidates[0].chunk_id == "chunk-best"
|
||||
assert result.candidates[0].tile_id == "tile-best"
|
||||
assert result.candidates[0].freshness_status == "fresh"
|
||||
|
||||
|
||||
def test_missing_index_degrades_with_explicit_no_candidate_result() -> None:
|
||||
# Arrange
|
||||
retriever = LocalVprRetriever()
|
||||
request = RelocalizationRequest(
|
||||
frame_id="frame-1",
|
||||
image_ref="replay/frame-1.jpg",
|
||||
trigger_reason="cold_start",
|
||||
top_k=3,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = retriever.retrieve(request)
|
||||
|
||||
# Assert
|
||||
assert result.ready is False
|
||||
assert result.degraded is True
|
||||
assert result.candidates == ()
|
||||
assert result.error is not None
|
||||
assert result.error.cause == "index_not_loaded"
|
||||
|
||||
|
||||
def test_descriptor_fidelity_gate_rejects_large_optimized_delta() -> None:
|
||||
# Arrange
|
||||
retriever = LocalVprRetriever()
|
||||
|
||||
# Act
|
||||
report = retriever.verify_descriptor_fidelity((1.0, 0.0), (0.0, 1.0), max_l2_delta=0.1)
|
||||
|
||||
# Assert
|
||||
assert report.accepted is False
|
||||
assert report.observed_l2_delta > report.max_l2_delta
|
||||
Reference in New Issue
Block a user