mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 23:31:13 +00:00
[AZ-231] Add anchor verification gates
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -1 +1,24 @@
|
||||
"""Anchor verification component."""
|
||||
|
||||
from .interfaces import AnchorVerifier, GeometryGatedAnchorVerifier
|
||||
from .types import (
|
||||
AnchorFrame,
|
||||
AnchorVerificationResult,
|
||||
GeometryGateConfig,
|
||||
MatchEvidence,
|
||||
MatcherBenchmarkReport,
|
||||
MatcherBenchmarkResult,
|
||||
MatcherProfile,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"AnchorFrame",
|
||||
"AnchorVerificationResult",
|
||||
"AnchorVerifier",
|
||||
"GeometryGateConfig",
|
||||
"GeometryGatedAnchorVerifier",
|
||||
"MatchEvidence",
|
||||
"MatcherBenchmarkReport",
|
||||
"MatcherBenchmarkResult",
|
||||
"MatcherProfile",
|
||||
]
|
||||
|
||||
@@ -1,10 +1,91 @@
|
||||
"""Public anchor verification interfaces."""
|
||||
|
||||
from typing import Any, Protocol
|
||||
from typing import Protocol
|
||||
|
||||
from shared.contracts import AnchorDecision
|
||||
|
||||
from .types import (
|
||||
AnchorFrame,
|
||||
AnchorVerificationResult,
|
||||
GeometryGateConfig,
|
||||
MatchEvidence,
|
||||
MatcherBenchmarkReport,
|
||||
MatcherBenchmarkResult,
|
||||
)
|
||||
|
||||
|
||||
class AnchorVerifier(Protocol):
|
||||
"""Verifies retrieved candidates against camera observations."""
|
||||
|
||||
def verify(self, frame: Any, candidate: Any) -> Any:
|
||||
def verify(self, frame: AnchorFrame, evidence: MatchEvidence) -> AnchorVerificationResult:
|
||||
"""Return an anchor decision for one candidate."""
|
||||
|
||||
|
||||
class GeometryGatedAnchorVerifier:
|
||||
"""Converts matcher evidence into accepted/rejected anchor decisions."""
|
||||
|
||||
def __init__(self, gates: GeometryGateConfig | None = None) -> None:
|
||||
self._gates = gates or GeometryGateConfig()
|
||||
|
||||
def verify(self, frame: AnchorFrame, evidence: MatchEvidence) -> AnchorVerificationResult:
|
||||
accepted, reason = self._classify(frame, evidence)
|
||||
decision = AnchorDecision(
|
||||
candidate_id=evidence.candidate.chunk_id,
|
||||
accepted=accepted,
|
||||
estimated_pose=self._estimated_pose(evidence) if accepted else None,
|
||||
inliers=evidence.inliers,
|
||||
mean_reprojection_error_px=evidence.mean_reprojection_error_px,
|
||||
rejection_reason=None if accepted else reason,
|
||||
)
|
||||
return AnchorVerificationResult(
|
||||
decision=decision,
|
||||
matcher_profile=evidence.matcher_profile,
|
||||
reason=reason,
|
||||
homography=evidence.homography,
|
||||
freshness_status=evidence.candidate.freshness_status,
|
||||
)
|
||||
|
||||
def benchmark(
|
||||
self, frame: AnchorFrame, evidences: tuple[MatchEvidence, ...]
|
||||
) -> MatcherBenchmarkReport:
|
||||
results: list[MatcherBenchmarkResult] = []
|
||||
for evidence in evidences:
|
||||
verification = self.verify(frame, evidence)
|
||||
results.append(
|
||||
MatcherBenchmarkResult(
|
||||
matcher_profile=evidence.matcher_profile,
|
||||
runtime_ms=evidence.runtime_ms,
|
||||
inliers=evidence.inliers,
|
||||
mean_reprojection_error_px=evidence.mean_reprojection_error_px,
|
||||
accepted=verification.decision.accepted,
|
||||
reason=verification.reason,
|
||||
)
|
||||
)
|
||||
return MatcherBenchmarkReport(
|
||||
results=tuple(results),
|
||||
)
|
||||
|
||||
def _classify(self, frame: AnchorFrame, evidence: MatchEvidence) -> tuple[bool, str]:
|
||||
if not frame.usable_for_anchor:
|
||||
return False, "frame_not_usable"
|
||||
if evidence.candidate.freshness_status != "fresh" or not evidence.provenance_trusted:
|
||||
return False, "stale_or_untrusted_provenance"
|
||||
if evidence.homography is None:
|
||||
return False, "geometry_failure"
|
||||
if evidence.inliers < self._gates.min_inliers:
|
||||
return False, "low_inliers"
|
||||
if evidence.mean_reprojection_error_px > self._gates.max_mean_reprojection_error_px:
|
||||
return False, "high_mre"
|
||||
return True, "accepted_geometry"
|
||||
|
||||
def _estimated_pose(self, evidence: MatchEvidence) -> dict[str, float]:
|
||||
footprint = evidence.candidate.footprint
|
||||
min_lat = footprint.get("min_lat", 0.0)
|
||||
max_lat = footprint.get("max_lat", min_lat)
|
||||
min_lon = footprint.get("min_lon", 0.0)
|
||||
max_lon = footprint.get("max_lon", min_lon)
|
||||
return {
|
||||
"latitude_deg": (min_lat + max_lat) / 2.0,
|
||||
"longitude_deg": (min_lon + max_lon) / 2.0,
|
||||
"mean_reprojection_error_px": evidence.mean_reprojection_error_px,
|
||||
}
|
||||
|
||||
@@ -1,5 +1,56 @@
|
||||
"""Public anchor verification type aliases."""
|
||||
"""Public anchor verification models."""
|
||||
|
||||
from typing import Any
|
||||
from typing import Literal
|
||||
|
||||
AnchorDecisionLike = Any
|
||||
from pydantic import BaseModel, ConfigDict, Field, NonNegativeFloat, NonNegativeInt
|
||||
|
||||
from shared.contracts import AnchorDecision, VprCandidate
|
||||
|
||||
|
||||
class AnchorVerificationModel(BaseModel):
|
||||
model_config = ConfigDict(extra="forbid", frozen=True)
|
||||
|
||||
|
||||
MatcherProfile = Literal["aliked_lightglue", "disk_lightglue", "sift_orb"]
|
||||
|
||||
|
||||
class AnchorFrame(AnchorVerificationModel):
|
||||
frame_id: str = Field(min_length=1)
|
||||
image_ref: str = Field(min_length=1)
|
||||
usable_for_anchor: bool = True
|
||||
|
||||
|
||||
class GeometryGateConfig(AnchorVerificationModel):
|
||||
min_inliers: NonNegativeInt = 20
|
||||
max_mean_reprojection_error_px: NonNegativeFloat = 3.0
|
||||
|
||||
|
||||
class MatchEvidence(AnchorVerificationModel):
|
||||
candidate: VprCandidate
|
||||
matcher_profile: MatcherProfile
|
||||
inliers: NonNegativeInt
|
||||
mean_reprojection_error_px: NonNegativeFloat
|
||||
homography: dict[str, float] | None = None
|
||||
runtime_ms: NonNegativeFloat
|
||||
provenance_trusted: bool = True
|
||||
|
||||
|
||||
class AnchorVerificationResult(AnchorVerificationModel):
|
||||
decision: AnchorDecision
|
||||
matcher_profile: MatcherProfile
|
||||
reason: str = Field(min_length=1)
|
||||
homography: dict[str, float] | None = None
|
||||
freshness_status: Literal["fresh", "stale", "rejected"]
|
||||
|
||||
|
||||
class MatcherBenchmarkResult(AnchorVerificationModel):
|
||||
matcher_profile: MatcherProfile
|
||||
runtime_ms: NonNegativeFloat
|
||||
inliers: NonNegativeInt
|
||||
mean_reprojection_error_px: NonNegativeFloat
|
||||
accepted: bool
|
||||
reason: str = Field(min_length=1)
|
||||
|
||||
|
||||
class MatcherBenchmarkReport(AnchorVerificationModel):
|
||||
results: tuple[MatcherBenchmarkResult, ...] = Field(min_length=1)
|
||||
|
||||
Reference in New Issue
Block a user