Files
gps-denied-onboard/tests/blackbox/test_satellite_anchor.py
T
Oleksandr Bezdieniezhnykh 5acd14b792 [AZ-234] [AZ-235] [AZ-236] [AZ-237] Add replay tests
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-05 06:24:10 +03:00

124 lines
4.0 KiB
Python

from anchor_verification import AnchorFrame, CandidateTile, GeometryGatedAnchorVerifier
from e2e.replay.harness import SatelliteCacheStub
from satellite_service import (
LocalVprIndexPackage,
LocalVprRetriever,
RelocalizationRequest,
SatelliteSyncBoundary,
VprDescriptorRecord,
)
from shared.contracts import VprCandidate
from tile_manager import GeneratedTileSyncPackage
def test_verified_anchor_includes_retrieval_matching_and_provenance_evidence() -> None:
# Arrange
retriever = LocalVprRetriever()
retriever.load_index(
LocalVprIndexPackage(
package_id="fixture-index",
records=(
VprDescriptorRecord(
chunk_id="chunk-001",
tile_id="tile-001",
descriptor=(1.0, 0.0, 0.0),
footprint={"min_lat": 48.0, "max_lat": 48.1, "min_lon": 37.0, "max_lon": 37.1},
freshness_status="fresh",
),
),
)
)
retrieval = retriever.retrieve(
RelocalizationRequest(
frame_id="frame-001",
image_ref="AD000001.jpg",
trigger_reason="cold_start",
top_k=1,
query_descriptor=(1.0, 0.0, 0.0),
)
)
keypoints = tuple((float(index), float(index % 5)) for index in range(24))
shifted_keypoints = tuple((x + 1.0, y + 1.0) for x, y in keypoints)
verifier = GeometryGatedAnchorVerifier()
# Act
verification = verifier.verify_candidate(
AnchorFrame(frame_id="frame-001", image_ref="AD000001.jpg", keypoints=keypoints),
CandidateTile(
candidate=retrieval.candidates[0],
image_ref="tile-001.cog",
keypoints=shifted_keypoints,
provenance_trusted=True,
),
)
# Assert
assert retrieval.ready is True
assert retrieval.latency_ms is not None
assert verification.decision.accepted is True
assert verification.decision.candidate_id == "chunk-001"
assert verification.decision.inliers >= 20
assert verification.decision.mean_reprojection_error_px <= 3.0
assert verification.homography is not None
assert verification.freshness_status == "fresh"
def test_unsafe_cache_or_low_texture_candidates_never_emit_trusted_anchor() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = AnchorFrame(
frame_id="frame-low-texture",
image_ref="low-texture.jpg",
usable_for_anchor=False,
keypoints=((0.0, 0.0), (1.0, 1.0), (2.0, 2.0), (3.0, 3.0)),
)
candidate = VprCandidate(
chunk_id="chunk-stale",
tile_id="tile-stale",
score=0.9,
footprint={"min_lat": 48.0, "max_lat": 48.1, "min_lon": 37.0, "max_lon": 37.1},
freshness_status="stale",
)
# Act
verification = verifier.verify_candidate(
frame,
CandidateTile(
candidate=candidate,
image_ref="tile-stale.cog",
keypoints=((0.0, 0.0), (1.0, 1.0), (2.0, 2.0), (3.0, 3.0)),
provenance_trusted=False,
),
)
# Assert
assert verification.decision.accepted is False
assert verification.decision.rejection_reason == "frame_not_usable"
def test_flight_mode_missing_cache_does_not_attempt_external_access() -> None:
# Arrange
cache_stub = SatelliteCacheStub()
sync_boundary = SatelliteSyncBoundary()
# Act
cache_response = cache_stub.query_manifest("NFT-SEC-04", "missing")
sync_result = sync_boundary.upload_generated_tiles(
GeneratedTileSyncPackage(
package_ref="generated-empty",
mission_id="mission-001",
manifest_delta=(),
sidecars=(),
),
phase="in_flight",
)
# Assert
assert cache_response["network_fetch_attempted"] is False
assert cache_response["trusted"] is False
assert int(str(cache_response["fixture_size_bytes"])) < int(
str(cache_response["storage_budget_bytes"])
)
assert sync_result.error is not None
assert sync_result.error.cause == "mid_flight_network_blocked"