3 Commits

Author SHA1 Message Date
Oleksandr Bezdieniezhnykh 7819ae7a38 [AZ-231] Add anchor verification gates
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 19:02:13 +03:00
Oleksandr Bezdieniezhnykh 07fb9535a9 [AZ-230] Add local VPR retrieval boundary
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 18:49:37 +03:00
Oleksandr Bezdieniezhnykh 087f4dba27 [AZ-228] [AZ-229] Add VIO and satellite sync boundaries
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 18:31:04 +03:00
24 changed files with 1349 additions and 16 deletions
@@ -0,0 +1,39 @@
# Batch Report
**Batch**: 6
**Tasks**: AZ-228_vio_adapter, AZ-229_satellite_service_sync
**Date**: 2026-05-03
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|----------------|-------|-------------|--------|
| AZ-228_vio_adapter | Done | 4 files | Pass | 3/3 ACs covered | None |
| AZ-229_satellite_service_sync | Done | 4 files | Pass | 3/3 ACs covered | None |
## AC Test Coverage: All covered
| AC Ref | Coverage |
|--------|----------|
| AZ-228 AC-1 | `test_valid_synchronized_packet_emits_vio_state` verifies synchronized frame/IMU processing emits a relative VIO state packet. |
| AZ-228 AC-2 | `test_timestamp_mismatch_is_explicit_validation_error` verifies timestamp mismatch is rejected with an explicit error. |
| AZ-228 AC-3 | `test_tracking_loss_degrades_health_without_emitting_absolute_position` verifies health reports degraded tracking state. |
| AZ-229 AC-1 | `test_pre_flight_import_returns_package_for_tile_manager_validation` verifies mission cache packages are exposed for Tile Manager validation. |
| AZ-229 AC-2 | `test_post_flight_upload_records_retryable_failure_for_audit` verifies upload outcomes are auditable and retryable failures retain packages. |
| AZ-229 AC-3 | `test_in_flight_sync_is_blocked_without_calling_network_boundary` verifies in-flight sync is blocked before network/uploader calls. |
## Code Review Verdict: PASS
Review report: `_docs/03_implementation/reviews/batch_06_review.md`
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Verification
- `.venv/bin/python -m black --check src tests e2e/replay` passed.
- `.venv/bin/python -m ruff check src tests e2e/replay` passed.
- `.venv/bin/python -m pytest` passed: 38 tests.
## Next Batch: AZ-230_satellite_service_vpr_retrieval
@@ -0,0 +1,35 @@
# Batch Report
**Batch**: 7
**Tasks**: AZ-230_satellite_service_vpr_retrieval
**Date**: 2026-05-03
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|----------------|-------|-------------|--------|
| AZ-230_satellite_service_vpr_retrieval | Done | 4 files | Pass | 3/3 ACs covered | None |
## AC Test Coverage: All covered
| AC Ref | Coverage |
|--------|----------|
| AZ-230 AC-1 | `test_valid_local_index_load_reports_ready_status` verifies local index loading reports readiness and record count. |
| AZ-230 AC-2 | `test_loaded_index_returns_bounded_candidates_with_freshness` verifies bounded top-K candidate output with tile/chunk IDs, score, footprint, and freshness. |
| AZ-230 AC-3 | `test_missing_index_degrades_with_explicit_no_candidate_result` verifies missing index produces explicit degraded behavior. |
## Code Review Verdict: PASS
Review report: `_docs/03_implementation/reviews/batch_07_review.md`
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Verification
- `.venv/bin/python -m black --check src tests e2e/replay` passed.
- `.venv/bin/python -m ruff check src tests e2e/replay` passed.
- `.venv/bin/python -m pytest` passed: 42 tests.
## Next Batch: AZ-231_anchor_verification_matching
@@ -0,0 +1,35 @@
# Batch Report
**Batch**: 8
**Tasks**: AZ-231_anchor_verification_matching
**Date**: 2026-05-03
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|----------------|-------|-------------|--------|
| AZ-231_anchor_verification_matching | Done | 4 files | Pass | 3/3 ACs covered | None |
## AC Test Coverage: All covered
| AC Ref | Coverage |
|--------|----------|
| AZ-231 AC-1 | `test_candidate_verification_emits_acceptance_evidence` verifies accepted decisions include MRE, inliers, homography, and reason metadata. |
| AZ-231 AC-2 | `test_unsafe_candidate_is_rejected_with_reason` verifies unsafe/stale candidates are rejected without estimated pose. |
| AZ-231 AC-3 | `test_matcher_benchmark_reports_profile_runtime_and_quality_metrics` verifies matcher profile runtime and quality metrics are reportable. |
## Code Review Verdict: PASS
Review report: `_docs/03_implementation/reviews/batch_08_review.md`
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Verification
- `.venv/bin/python -m black --check src tests e2e/replay` passed.
- `.venv/bin/python -m ruff check src tests e2e/replay` passed.
- `.venv/bin/python -m pytest` passed: 45 tests.
## Next Batch: AZ-232_safety_anchor_state_machine
@@ -0,0 +1,61 @@
# Code Review Report
**Batch**: AZ-228_vio_adapter, AZ-229_satellite_service_sync
**Date**: 2026-05-03
**Verdict**: PASS
## Findings
No findings.
## Review Scope
- Task specs:
- `_docs/02_tasks/todo/AZ-228_vio_adapter.md`
- `_docs/02_tasks/todo/AZ-229_satellite_service_sync.md`
- Changed files:
- `src/vio_adapter/__init__.py`
- `src/vio_adapter/interfaces.py`
- `src/vio_adapter/types.py`
- `src/satellite_service/__init__.py`
- `src/satellite_service/interfaces.py`
- `src/satellite_service/types.py`
- `tests/unit/test_vio_adapter.py`
- `tests/unit/test_satellite_service_sync.py`
## Phase Notes
### Spec Compliance
- AZ-228 AC-1 is covered by `test_valid_synchronized_packet_emits_vio_state`.
- AZ-228 AC-2 is covered by `test_timestamp_mismatch_is_explicit_validation_error`.
- AZ-228 AC-3 is covered by `test_tracking_loss_degrades_health_without_emitting_absolute_position`.
- AZ-229 AC-1 is covered by `test_pre_flight_import_returns_package_for_tile_manager_validation`.
- AZ-229 AC-2 is covered by `test_post_flight_upload_records_retryable_failure_for_audit`.
- AZ-229 AC-3 is covered by `test_in_flight_sync_is_blocked_without_calling_network_boundary`.
### Code Quality
The implementation follows the existing Pydantic model style, keeps component logic inside the owning packages, and exposes only public API exports through component `__init__.py` files.
### Security Quick-Scan
No hardcoded secrets, shell execution, deserialization paths, SQL construction, or sensitive credential logging were introduced. The Satellite Service sync boundary explicitly rejects in-flight package exchange before invoking the uploader.
### Performance Scan
No unbounded network path or per-frame heavy retrieval path was introduced. The VIO adapter uses a bounded timestamp-window selection over the provided telemetry samples.
### Cross-Task Consistency
The VIO adapter and Satellite Service sync boundary remain independent batch outputs and share existing DTO/error-envelope conventions.
### Architecture Compliance
Imports respect `_docs/02_document/module-layout.md`: VIO imports only shared public APIs, and Satellite Service imports Tile Manager through the package public API.
## Verification
- `.venv/bin/python -m black --check src tests e2e/replay`
- `.venv/bin/python -m ruff check src tests e2e/replay`
- `.venv/bin/python -m pytest`
@@ -0,0 +1,54 @@
# Code Review Report
**Batch**: AZ-230_satellite_service_vpr_retrieval
**Date**: 2026-05-03
**Verdict**: PASS
## Findings
No findings.
## Review Scope
- Task spec:
- `_docs/02_tasks/todo/AZ-230_satellite_service_vpr_retrieval.md`
- Changed files:
- `src/satellite_service/__init__.py`
- `src/satellite_service/interfaces.py`
- `src/satellite_service/types.py`
- `tests/unit/test_satellite_service_vpr.py`
## Phase Notes
### Spec Compliance
- AZ-230 AC-1 is covered by `test_valid_local_index_load_reports_ready_status`.
- AZ-230 AC-2 is covered by `test_loaded_index_returns_bounded_candidates_with_freshness`.
- AZ-230 AC-3 is covered by `test_missing_index_degrades_with_explicit_no_candidate_result`.
- Descriptor-fidelity gating is covered by `test_descriptor_fidelity_gate_rejects_large_optimized_delta`.
### Code Quality
The implementation follows the existing component pattern: public Pydantic models live in `types.py`, behavior and protocols live in `interfaces.py`, and component exports are centralized in `__init__.py`.
### Security Quick-Scan
No network calls, shell execution, dynamic code execution, hardcoded secrets, or credential logging were introduced. Retrieval only uses local preloaded descriptor records.
### Performance Scan
Candidate scoring is bounded by the loaded local index and request `top_k` is constrained to 50. The implementation does not add a steady-state per-frame retrieval loop.
### Cross-Task Consistency
The retrieval code reuses the Satellite Service sync boundarys offline-only posture and shared `VprCandidate`/`ErrorEnvelope` contracts.
### Architecture Compliance
Imports respect `_docs/02_document/module-layout.md`: Satellite Service imports shared contracts/errors and Tile Manager through public package exports only.
## Verification
- `.venv/bin/python -m black --check src tests e2e/replay`
- `.venv/bin/python -m ruff check src tests e2e/replay`
- `.venv/bin/python -m pytest`
@@ -0,0 +1,53 @@
# Code Review Report
**Batch**: AZ-231_anchor_verification_matching
**Date**: 2026-05-03
**Verdict**: PASS
## Findings
No findings.
## Review Scope
- Task spec:
- `_docs/02_tasks/todo/AZ-231_anchor_verification_matching.md`
- Changed files:
- `src/anchor_verification/__init__.py`
- `src/anchor_verification/interfaces.py`
- `src/anchor_verification/types.py`
- `tests/unit/test_anchor_verification.py`
## Phase Notes
### Spec Compliance
- AZ-231 AC-1 is covered by `test_candidate_verification_emits_acceptance_evidence`.
- AZ-231 AC-2 is covered by `test_unsafe_candidate_is_rejected_with_reason`.
- AZ-231 AC-3 is covered by `test_matcher_benchmark_reports_profile_runtime_and_quality_metrics`.
### Code Quality
The implementation keeps evidence/result models in `types.py`, gate behavior in `interfaces.py`, and public exports in `__init__.py`. The benchmark path computes each verification result once and reports runtime/quality metrics per matcher profile.
### Security Quick-Scan
No network calls, shell execution, dynamic code execution, hardcoded secrets, or credential logging were introduced.
### Performance Scan
Anchor verification is request/trigger oriented and does not add a per-frame learned matcher loop. Benchmark reporting is bounded by the provided evidence tuple.
### Cross-Task Consistency
The verifier consumes `VprCandidate` outputs from Satellite Service and emits shared `AnchorDecision` DTOs for the later safety wrapper task.
### Architecture Compliance
Imports respect `_docs/02_document/module-layout.md`: Anchor Verification imports shared contracts only and does not reach into Satellite Service or Tile Manager internals.
## Verification
- `.venv/bin/python -m black --check src tests e2e/replay`
- `.venv/bin/python -m ruff check src tests e2e/replay`
- `.venv/bin/python -m pytest`
+1 -1
View File
@@ -9,6 +9,6 @@ tracker: jira
sub_step:
phase: 1
name: batch-loop
detail: "batch 5: AZ-226_generated_tile_orthorectification"
detail: "batch 8: AZ-231_anchor_verification_matching"
retry_count: 0
cycle: 1
+23
View File
@@ -1 +1,24 @@
"""Anchor verification component."""
from .interfaces import AnchorVerifier, GeometryGatedAnchorVerifier
from .types import (
AnchorFrame,
AnchorVerificationResult,
GeometryGateConfig,
MatchEvidence,
MatcherBenchmarkReport,
MatcherBenchmarkResult,
MatcherProfile,
)
__all__ = [
"AnchorFrame",
"AnchorVerificationResult",
"AnchorVerifier",
"GeometryGateConfig",
"GeometryGatedAnchorVerifier",
"MatchEvidence",
"MatcherBenchmarkReport",
"MatcherBenchmarkResult",
"MatcherProfile",
]
+83 -2
View File
@@ -1,10 +1,91 @@
"""Public anchor verification interfaces."""
from typing import Any, Protocol
from typing import Protocol
from shared.contracts import AnchorDecision
from .types import (
AnchorFrame,
AnchorVerificationResult,
GeometryGateConfig,
MatchEvidence,
MatcherBenchmarkReport,
MatcherBenchmarkResult,
)
class AnchorVerifier(Protocol):
"""Verifies retrieved candidates against camera observations."""
def verify(self, frame: Any, candidate: Any) -> Any:
def verify(self, frame: AnchorFrame, evidence: MatchEvidence) -> AnchorVerificationResult:
"""Return an anchor decision for one candidate."""
class GeometryGatedAnchorVerifier:
"""Converts matcher evidence into accepted/rejected anchor decisions."""
def __init__(self, gates: GeometryGateConfig | None = None) -> None:
self._gates = gates or GeometryGateConfig()
def verify(self, frame: AnchorFrame, evidence: MatchEvidence) -> AnchorVerificationResult:
accepted, reason = self._classify(frame, evidence)
decision = AnchorDecision(
candidate_id=evidence.candidate.chunk_id,
accepted=accepted,
estimated_pose=self._estimated_pose(evidence) if accepted else None,
inliers=evidence.inliers,
mean_reprojection_error_px=evidence.mean_reprojection_error_px,
rejection_reason=None if accepted else reason,
)
return AnchorVerificationResult(
decision=decision,
matcher_profile=evidence.matcher_profile,
reason=reason,
homography=evidence.homography,
freshness_status=evidence.candidate.freshness_status,
)
def benchmark(
self, frame: AnchorFrame, evidences: tuple[MatchEvidence, ...]
) -> MatcherBenchmarkReport:
results: list[MatcherBenchmarkResult] = []
for evidence in evidences:
verification = self.verify(frame, evidence)
results.append(
MatcherBenchmarkResult(
matcher_profile=evidence.matcher_profile,
runtime_ms=evidence.runtime_ms,
inliers=evidence.inliers,
mean_reprojection_error_px=evidence.mean_reprojection_error_px,
accepted=verification.decision.accepted,
reason=verification.reason,
)
)
return MatcherBenchmarkReport(
results=tuple(results),
)
def _classify(self, frame: AnchorFrame, evidence: MatchEvidence) -> tuple[bool, str]:
if not frame.usable_for_anchor:
return False, "frame_not_usable"
if evidence.candidate.freshness_status != "fresh" or not evidence.provenance_trusted:
return False, "stale_or_untrusted_provenance"
if evidence.homography is None:
return False, "geometry_failure"
if evidence.inliers < self._gates.min_inliers:
return False, "low_inliers"
if evidence.mean_reprojection_error_px > self._gates.max_mean_reprojection_error_px:
return False, "high_mre"
return True, "accepted_geometry"
def _estimated_pose(self, evidence: MatchEvidence) -> dict[str, float]:
footprint = evidence.candidate.footprint
min_lat = footprint.get("min_lat", 0.0)
max_lat = footprint.get("max_lat", min_lat)
min_lon = footprint.get("min_lon", 0.0)
max_lon = footprint.get("max_lon", min_lon)
return {
"latitude_deg": (min_lat + max_lat) / 2.0,
"longitude_deg": (min_lon + max_lon) / 2.0,
"mean_reprojection_error_px": evidence.mean_reprojection_error_px,
}
+54 -3
View File
@@ -1,5 +1,56 @@
"""Public anchor verification type aliases."""
"""Public anchor verification models."""
from typing import Any
from typing import Literal
AnchorDecisionLike = Any
from pydantic import BaseModel, ConfigDict, Field, NonNegativeFloat, NonNegativeInt
from shared.contracts import AnchorDecision, VprCandidate
class AnchorVerificationModel(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
MatcherProfile = Literal["aliked_lightglue", "disk_lightglue", "sift_orb"]
class AnchorFrame(AnchorVerificationModel):
frame_id: str = Field(min_length=1)
image_ref: str = Field(min_length=1)
usable_for_anchor: bool = True
class GeometryGateConfig(AnchorVerificationModel):
min_inliers: NonNegativeInt = 20
max_mean_reprojection_error_px: NonNegativeFloat = 3.0
class MatchEvidence(AnchorVerificationModel):
candidate: VprCandidate
matcher_profile: MatcherProfile
inliers: NonNegativeInt
mean_reprojection_error_px: NonNegativeFloat
homography: dict[str, float] | None = None
runtime_ms: NonNegativeFloat
provenance_trusted: bool = True
class AnchorVerificationResult(AnchorVerificationModel):
decision: AnchorDecision
matcher_profile: MatcherProfile
reason: str = Field(min_length=1)
homography: dict[str, float] | None = None
freshness_status: Literal["fresh", "stale", "rejected"]
class MatcherBenchmarkResult(AnchorVerificationModel):
matcher_profile: MatcherProfile
runtime_ms: NonNegativeFloat
inliers: NonNegativeInt
mean_reprojection_error_px: NonNegativeFloat
accepted: bool
reason: str = Field(min_length=1)
class MatcherBenchmarkReport(AnchorVerificationModel):
results: tuple[MatcherBenchmarkResult, ...] = Field(min_length=1)
+36
View File
@@ -1 +1,37 @@
"""Offline satellite retrieval and synchronization component."""
from .interfaces import LocalVprRetriever, SatelliteService, SatelliteSyncBoundary
from .types import (
DescriptorFidelityReport,
GeneratedTileUploadRecord,
LocalVprIndexPackage,
MissionCacheImportResult,
MissionCachePackage,
RelocalizationRequest,
RuntimePhase,
SatelliteSyncResult,
SatelliteSyncStatus,
UploadOutcome,
VprDescriptorRecord,
VprReadinessReport,
VprRetrievalResult,
)
__all__ = [
"DescriptorFidelityReport",
"GeneratedTileUploadRecord",
"LocalVprIndexPackage",
"LocalVprRetriever",
"MissionCacheImportResult",
"MissionCachePackage",
"RelocalizationRequest",
"RuntimePhase",
"SatelliteService",
"SatelliteSyncBoundary",
"SatelliteSyncResult",
"SatelliteSyncStatus",
"UploadOutcome",
"VprDescriptorRecord",
"VprReadinessReport",
"VprRetrievalResult",
]
+238 -3
View File
@@ -1,13 +1,248 @@
"""Public satellite service interfaces."""
from typing import Any, Protocol
from collections.abc import Callable
from math import sqrt
from typing import Protocol
from shared.contracts import VprCandidate
from shared.errors import ErrorEnvelope
from tile_manager import GeneratedTileSyncPackage
from .types import (
DescriptorFidelityReport,
GeneratedTileUploadRecord,
LocalVprIndexPackage,
MissionCacheImportResult,
MissionCachePackage,
RelocalizationRequest,
RuntimePhase,
SatelliteSyncResult,
SatelliteSyncStatus,
UploadOutcome,
VprReadinessReport,
VprRetrievalResult,
)
class SatelliteService(Protocol):
"""Retrieves offline VPR candidates from mission cache data."""
def load_index(self) -> None:
def load_index(self, package: LocalVprIndexPackage) -> VprReadinessReport:
"""Load the local descriptor index."""
def retrieve(self, frame: Any) -> list[Any]:
def retrieve(self, request: RelocalizationRequest) -> VprRetrievalResult:
"""Return candidate anchor records for one frame."""
class LocalVprRetriever:
"""Triggered local VPR retrieval over preloaded descriptor records."""
def __init__(self) -> None:
self._index: LocalVprIndexPackage | None = None
def load_index(self, package: LocalVprIndexPackage) -> VprReadinessReport:
self._index = package
return VprReadinessReport(
ready=True,
engine=package.engine,
loaded_records=len(package.records),
)
def readiness(self) -> VprReadinessReport:
if self._index is None:
return VprReadinessReport(
ready=False,
engine="cpu_faiss",
loaded_records=0,
error=self._error("local VPR index is not loaded", "index_not_loaded"),
)
return VprReadinessReport(
ready=True,
engine=self._index.engine,
loaded_records=len(self._index.records),
)
def retrieve(self, request: RelocalizationRequest) -> VprRetrievalResult:
readiness = self.readiness()
if not readiness.ready:
return VprRetrievalResult(
ready=False,
degraded=True,
error=readiness.error,
)
assert self._index is not None
query_descriptor = request.query_descriptor or self._extract_descriptor(request.image_ref)
scored = sorted(
(
(self._similarity(query_descriptor, record.descriptor), record)
for record in self._index.records
if record.freshness_status != "rejected"
),
key=lambda item: item[0],
reverse=True,
)
candidates = tuple(
VprCandidate(
chunk_id=record.chunk_id,
tile_id=record.tile_id,
score=score,
footprint=record.footprint,
freshness_status=record.freshness_status,
)
for score, record in scored[: request.top_k]
)
if not candidates:
return VprRetrievalResult(
ready=True,
degraded=True,
error=self._error("local VPR index produced no valid candidates", "no_candidates"),
)
return VprRetrievalResult(ready=True, degraded=False, candidates=candidates)
def verify_descriptor_fidelity(
self,
reference_descriptor: tuple[float, ...],
optimized_descriptor: tuple[float, ...],
max_l2_delta: float,
) -> DescriptorFidelityReport:
observed_delta = self._l2_distance(reference_descriptor, optimized_descriptor)
return DescriptorFidelityReport(
accepted=observed_delta <= max_l2_delta,
observed_l2_delta=observed_delta,
max_l2_delta=max_l2_delta,
)
def _extract_descriptor(self, image_ref: str) -> tuple[float, ...]:
encoded = image_ref.encode("utf-8")
buckets = [0.0, 0.0, 0.0, 0.0]
for index, value in enumerate(encoded):
buckets[index % len(buckets)] += value / 255.0
magnitude = sqrt(sum(value * value for value in buckets)) or 1.0
return tuple(value / magnitude for value in buckets)
def _similarity(
self,
query_descriptor: tuple[float, ...],
record_descriptor: tuple[float, ...],
) -> float:
max_length = max(len(query_descriptor), len(record_descriptor))
padded_query = query_descriptor + (0.0,) * (max_length - len(query_descriptor))
padded_record = record_descriptor + (0.0,) * (max_length - len(record_descriptor))
dot_product = sum(
query_value * record_value
for query_value, record_value in zip(padded_query, padded_record)
)
query_norm = sqrt(sum(value * value for value in padded_query)) or 1.0
record_norm = sqrt(sum(value * value for value in padded_record)) or 1.0
return max(0.0, min(1.0, dot_product / (query_norm * record_norm)))
def _l2_distance(
self,
reference_descriptor: tuple[float, ...],
optimized_descriptor: tuple[float, ...],
) -> float:
max_length = max(len(reference_descriptor), len(optimized_descriptor))
padded_reference = reference_descriptor + (0.0,) * (max_length - len(reference_descriptor))
padded_optimized = optimized_descriptor + (0.0,) * (max_length - len(optimized_descriptor))
return sqrt(
sum(
(reference_value - optimized_value) ** 2
for reference_value, optimized_value in zip(padded_reference, padded_optimized)
)
)
def _error(self, message: str, cause: str) -> ErrorEnvelope:
return ErrorEnvelope(
component="satellite_service",
category="runtime",
message=message,
severity="warning",
retryable=False,
cause=cause,
)
class SatelliteSyncBoundary:
"""Owns pre-flight and post-flight package exchange only."""
def __init__(
self,
uploader: Callable[[GeneratedTileSyncPackage], UploadOutcome] | None = None,
) -> None:
self._uploader = uploader or self._default_uploader
self._imports: dict[str, MissionCachePackage] = {}
self._upload_records: list[GeneratedTileUploadRecord] = []
def import_mission_cache(
self,
package: MissionCachePackage,
phase: RuntimePhase = "pre_flight",
) -> MissionCacheImportResult:
if phase != "pre_flight":
return MissionCacheImportResult(
package_id=package.package_id,
mission_id=package.mission_id,
ready_for_tile_validation=False,
error=self._phase_error("mission cache import", phase),
)
self._imports[package.package_id] = package
return MissionCacheImportResult(
package_id=package.package_id,
mission_id=package.mission_id,
ready_for_tile_validation=True,
manifest_entries=package.manifest_entries,
)
def upload_generated_tiles(
self,
package: GeneratedTileSyncPackage,
phase: RuntimePhase = "post_flight",
) -> SatelliteSyncResult:
if phase != "post_flight":
return SatelliteSyncResult(error=self._phase_error("generated tile upload", phase))
if not package.sidecars:
record = GeneratedTileUploadRecord(
package_ref=package.package_ref,
mission_id=package.mission_id,
status="rejected",
reason="empty_generated_tile_package",
retained_for_retry=False,
)
else:
outcome = self._uploader(package)
record = GeneratedTileUploadRecord(
package_ref=package.package_ref,
mission_id=package.mission_id,
status=outcome,
reason=outcome,
retained_for_retry=outcome == "retryable_failure",
)
self._upload_records.append(record)
return SatelliteSyncResult(upload_record=record)
def status(self) -> SatelliteSyncStatus:
return SatelliteSyncStatus(
imported_package_ids=tuple(self._imports),
upload_records=tuple(self._upload_records),
retry_package_refs=tuple(
record.package_ref for record in self._upload_records if record.retained_for_retry
),
)
def _phase_error(self, operation: str, phase: RuntimePhase) -> ErrorEnvelope:
return ErrorEnvelope(
component="satellite_service",
category="security",
message=f"{operation} is not allowed during {phase}",
severity="warning",
retryable=False,
cause="mid_flight_network_blocked" if phase == "in_flight" else "phase_not_allowed",
)
def _default_uploader(self, package: GeneratedTileSyncPackage) -> UploadOutcome:
return "success"
+90 -3
View File
@@ -1,5 +1,92 @@
"""Public satellite service type aliases."""
"""Public satellite service models."""
from typing import Any
from typing import Literal
VprCandidateLike = Any
from pydantic import BaseModel, ConfigDict, Field, PositiveInt
from shared.contracts import VprCandidate
from shared.errors import ErrorEnvelope
from tile_manager import TileManifestEntry
class SatelliteServiceModel(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
class MissionCachePackage(SatelliteServiceModel):
package_id: str = Field(min_length=1)
mission_id: str = Field(min_length=1)
manifest_entries: tuple[TileManifestEntry, ...] = Field(min_length=1)
class MissionCacheImportResult(SatelliteServiceModel):
package_id: str = Field(min_length=1)
mission_id: str = Field(min_length=1)
ready_for_tile_validation: bool
manifest_entries: tuple[TileManifestEntry, ...] = ()
error: ErrorEnvelope | None = None
class GeneratedTileUploadRecord(SatelliteServiceModel):
package_ref: str = Field(min_length=1)
mission_id: str = Field(min_length=1)
status: Literal["uploaded", "rejected", "retryable_failure"]
reason: str
retained_for_retry: bool
class SatelliteSyncStatus(SatelliteServiceModel):
imported_package_ids: tuple[str, ...]
upload_records: tuple[GeneratedTileUploadRecord, ...]
retry_package_refs: tuple[str, ...]
class SatelliteSyncResult(SatelliteServiceModel):
upload_record: GeneratedTileUploadRecord | None = None
error: ErrorEnvelope | None = None
class VprDescriptorRecord(SatelliteServiceModel):
chunk_id: str = Field(min_length=1)
tile_id: str = Field(min_length=1)
descriptor: tuple[float, ...] = Field(min_length=1)
footprint: dict[str, float]
freshness_status: Literal["fresh", "stale", "rejected"]
class LocalVprIndexPackage(SatelliteServiceModel):
package_id: str = Field(min_length=1)
engine: Literal["cpu_faiss"] = "cpu_faiss"
records: tuple[VprDescriptorRecord, ...] = Field(min_length=1)
class RelocalizationRequest(SatelliteServiceModel):
frame_id: str = Field(min_length=1)
image_ref: str = Field(min_length=1)
trigger_reason: str = Field(min_length=1)
top_k: PositiveInt = Field(le=50)
query_descriptor: tuple[float, ...] | None = None
class VprReadinessReport(SatelliteServiceModel):
ready: bool
engine: Literal["cpu_faiss"]
loaded_records: int = Field(ge=0)
error: ErrorEnvelope | None = None
class VprRetrievalResult(SatelliteServiceModel):
ready: bool
degraded: bool
candidates: tuple[VprCandidate, ...] = ()
error: ErrorEnvelope | None = None
class DescriptorFidelityReport(SatelliteServiceModel):
accepted: bool
observed_l2_delta: float = Field(ge=0.0)
max_l2_delta: float = Field(ge=0.0)
RuntimePhase = Literal["pre_flight", "in_flight", "post_flight"]
UploadOutcome = Literal["success", "retryable_failure", "rejected"]
+14
View File
@@ -1 +1,15 @@
"""Replaceable VIO adapter component."""
from .interfaces import DeterministicVioBackend, LocalVioAdapter, VioAdapter, VioBackend
from .types import VioBackendEstimate, VioHealthReport, VioInputPacket, VioProcessingResult
__all__ = [
"DeterministicVioBackend",
"LocalVioAdapter",
"VioAdapter",
"VioBackend",
"VioBackendEstimate",
"VioHealthReport",
"VioInputPacket",
"VioProcessingResult",
]
+136 -1
View File
@@ -2,6 +2,17 @@
from typing import Any, Protocol
from shared.contracts import VioStatePacket
from shared.errors import ErrorEnvelope
from shared.time_sync import select_time_window
from .types import (
VioBackendEstimate,
VioHealthReport,
VioInputPacket,
VioProcessingResult,
)
class VioAdapter(Protocol):
"""Processes frame and telemetry inputs into relative VIO state."""
@@ -9,5 +20,129 @@ class VioAdapter(Protocol):
def initialize(self) -> None:
"""Initialize adapter resources."""
def process(self, frame: Any, telemetry: Any) -> Any:
def process(self, packet: VioInputPacket) -> VioProcessingResult:
"""Process one synchronized frame/telemetry pair."""
def health(self) -> VioHealthReport:
"""Return current readiness and degradation state."""
class VioBackend(Protocol):
"""Backend-neutral native bridge boundary."""
def initialize(self) -> None:
"""Initialize native backend resources."""
def estimate(self, frame: Any, telemetry_window: tuple[Any, ...]) -> VioBackendEstimate:
"""Return one relative VIO estimate."""
class DeterministicVioBackend:
"""Small deterministic backend used until a native bridge is attached."""
def initialize(self) -> None:
return None
def estimate(self, frame: Any, telemetry_window: tuple[Any, ...]) -> VioBackendEstimate:
quality = float(getattr(frame, "quality", 1.0))
tracking_quality = max(0.0, min(1.0, quality))
return VioBackendEstimate(
timestamp_ns=frame.timestamp_ns,
relative_pose={
"x_m": tracking_quality,
"y_m": 0.0,
"z_m": 0.0,
"yaw_rad": 0.0,
},
velocity_mps=(tracking_quality, 0.0, 0.0),
tracking_quality=tracking_quality,
bias_estimate={"sample_count": float(len(telemetry_window))},
covariance_hint=[
[1.0 / max(tracking_quality, 0.1), 0.0, 0.0],
[0.0, 1.0 / max(tracking_quality, 0.1), 0.0],
[0.0, 0.0, 1.0 / max(tracking_quality, 0.1)],
],
)
class LocalVioAdapter:
"""Backend-neutral adapter that exposes explicit health and mismatch behavior."""
def __init__(
self,
backend: VioBackend | None = None,
timestamp_tolerance_ns: int = 5_000_000,
degraded_quality_threshold: float = 0.35,
) -> None:
self._backend = backend or DeterministicVioBackend()
self._timestamp_tolerance_ns = timestamp_tolerance_ns
self._degraded_quality_threshold = degraded_quality_threshold
self._initialized = False
self._health = VioHealthReport(
initialized=False,
state="not_initialized",
tracking_quality=0.0,
)
def initialize(self) -> None:
self._backend.initialize()
self._initialized = True
self._health = VioHealthReport(
initialized=True,
state="ready",
tracking_quality=1.0,
)
def process(self, packet: VioInputPacket) -> VioProcessingResult:
if not self._initialized:
self.initialize()
telemetry_timestamps = [sample.timestamp_ns for sample in packet.telemetry_samples]
time_window = select_time_window(
packet.frame.timestamp_ns,
telemetry_timestamps,
self._timestamp_tolerance_ns,
)
if not time_window.ok:
error = ErrorEnvelope(
component="vio_adapter",
category="validation",
message="frame and telemetry timestamps are outside the VIO sync window",
severity="warning",
retryable=False,
cause=time_window.violations[0].category,
)
self._health = VioHealthReport(
initialized=True,
state="degraded",
tracking_quality=0.0,
error=error,
)
return VioProcessingResult(health=self._health, error=error)
telemetry_window = tuple(
sample
for sample in packet.telemetry_samples
if sample.timestamp_ns in set(time_window.sample_timestamps_ns)
)
estimate = self._backend.estimate(packet.frame, telemetry_window)
state_packet = VioStatePacket(
timestamp_ns=estimate.timestamp_ns,
relative_pose=estimate.relative_pose,
velocity_mps=estimate.velocity_mps,
bias_estimate=estimate.bias_estimate,
tracking_quality=estimate.tracking_quality,
covariance_hint=estimate.covariance_hint,
)
health_state = (
"degraded" if estimate.tracking_quality < self._degraded_quality_threshold else "ready"
)
self._health = VioHealthReport(
initialized=True,
state=health_state,
tracking_quality=estimate.tracking_quality,
)
return VioProcessingResult(state_packet=state_packet, health=self._health)
def health(self) -> VioHealthReport:
return self._health
+37 -3
View File
@@ -1,5 +1,39 @@
"""Public VIO type aliases."""
"""Public VIO adapter models."""
from typing import Any
from typing import Literal
VioStatePacketLike = Any
from pydantic import BaseModel, ConfigDict, Field, NonNegativeInt
from shared.contracts import FramePacket, TelemetrySample, VioStatePacket
from shared.errors import ErrorEnvelope
class VioAdapterModel(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
class VioInputPacket(VioAdapterModel):
frame: FramePacket
telemetry_samples: tuple[TelemetrySample, ...] = Field(min_length=1)
class VioHealthReport(VioAdapterModel):
initialized: bool
state: Literal["not_initialized", "ready", "degraded", "failed"]
tracking_quality: float = Field(ge=0.0, le=1.0)
error: ErrorEnvelope | None = None
class VioProcessingResult(VioAdapterModel):
state_packet: VioStatePacket | None = None
health: VioHealthReport
error: ErrorEnvelope | None = None
class VioBackendEstimate(VioAdapterModel):
timestamp_ns: NonNegativeInt
relative_pose: dict[str, float]
velocity_mps: tuple[float, float, float]
tracking_quality: float = Field(ge=0.0, le=1.0)
bias_estimate: dict[str, float] | None = None
covariance_hint: list[list[float]] | None = None
+87
View File
@@ -0,0 +1,87 @@
from anchor_verification import AnchorFrame, GeometryGatedAnchorVerifier, MatchEvidence
from shared.contracts import VprCandidate
def _candidate(freshness_status: str = "fresh") -> VprCandidate:
return VprCandidate(
chunk_id="chunk-1",
tile_id="tile-1",
score=0.91,
footprint={"min_lat": 49.0, "max_lat": 49.2, "min_lon": 36.0, "max_lon": 36.2},
freshness_status=freshness_status,
)
def _evidence(**overrides: object) -> MatchEvidence:
payload: dict[str, object] = {
"candidate": _candidate(),
"matcher_profile": "aliked_lightglue",
"inliers": 48,
"mean_reprojection_error_px": 1.4,
"homography": {"h00": 1.0, "h11": 1.0, "h22": 1.0},
"runtime_ms": 72.5,
"provenance_trusted": True,
}
payload.update(overrides)
return MatchEvidence.model_validate(payload)
def test_candidate_verification_emits_acceptance_evidence() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = AnchorFrame(frame_id="frame-1", image_ref="replay/frame-1.jpg")
# Act
result = verifier.verify(frame, _evidence())
# Assert
assert result.decision.accepted is True
assert result.decision.inliers == 48
assert result.decision.mean_reprojection_error_px == 1.4
assert result.reason == "accepted_geometry"
assert result.homography == {"h00": 1.0, "h11": 1.0, "h22": 1.0}
def test_unsafe_candidate_is_rejected_with_reason() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = AnchorFrame(frame_id="frame-1", image_ref="replay/frame-1.jpg")
evidence = _evidence(
candidate=_candidate(freshness_status="stale"),
inliers=6,
mean_reprojection_error_px=8.0,
)
# Act
result = verifier.verify(frame, evidence)
# Assert
assert result.decision.accepted is False
assert result.decision.estimated_pose is None
assert result.decision.rejection_reason == "stale_or_untrusted_provenance"
assert result.reason == "stale_or_untrusted_provenance"
def test_matcher_benchmark_reports_profile_runtime_and_quality_metrics() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = AnchorFrame(frame_id="frame-1", image_ref="replay/frame-1.jpg")
# Act
report = verifier.benchmark(
frame,
(
_evidence(matcher_profile="aliked_lightglue", runtime_ms=72.5),
_evidence(matcher_profile="sift_orb", inliers=12, runtime_ms=18.0),
),
)
# Assert
assert [result.matcher_profile for result in report.results] == [
"aliked_lightglue",
"sift_orb",
]
assert report.results[0].accepted is True
assert report.results[0].runtime_ms == 72.5
assert report.results[1].accepted is False
assert report.results[1].reason == "low_inliers"
+96
View File
@@ -0,0 +1,96 @@
from datetime import datetime, timezone
from satellite_service import MissionCachePackage, SatelliteSyncBoundary
from tile_manager import (
GeneratedTileSidecar,
GeneratedTileSyncPackage,
TileManifestEntry,
)
def _manifest_entry() -> TileManifestEntry:
return TileManifestEntry(
tile_id="tile-1",
chunk_id="chunk-1",
crs="EPSG:3857",
meters_per_pixel=0.3,
capture_date="2026-05-01",
expires_at=datetime(2026, 6, 1, tzinfo=timezone.utc),
content_hash="sha256:tile",
expected_content_hash="sha256:tile",
sidecar_hash="sha256:sidecar",
expected_sidecar_hash="sha256:sidecar",
signature_hash="sig:trusted",
provenance="suite-satellite-service",
footprint={"min_lat": 49.0, "max_lat": 49.1},
descriptor_ref="descriptors/chunk-1.vlad",
)
def _generated_package() -> GeneratedTileSyncPackage:
sidecar = GeneratedTileSidecar(
tile_id="generated-1",
parent_frame_id="frame-1",
parent_covariance_m=2.0,
quality_score=0.8,
trust_level="generated",
provenance="nav-camera-generated",
)
return GeneratedTileSyncPackage(
package_ref="generated/mission-1/sync-package.json",
mission_id="mission-1",
manifest_delta=({"tile_id": "generated-1", "trust_level": "generated"},),
sidecars=(sidecar,),
)
def test_pre_flight_import_returns_package_for_tile_manager_validation() -> None:
# Arrange
boundary = SatelliteSyncBoundary()
package = MissionCachePackage(
package_id="pkg-1",
mission_id="mission-1",
manifest_entries=(_manifest_entry(),),
)
# Act
result = boundary.import_mission_cache(package, phase="pre_flight")
# Assert
assert result.ready_for_tile_validation is True
assert result.manifest_entries[0].tile_id == "tile-1"
assert boundary.status().imported_package_ids == ("pkg-1",)
def test_post_flight_upload_records_retryable_failure_for_audit() -> None:
# Arrange
boundary = SatelliteSyncBoundary(uploader=lambda package: "retryable_failure")
# Act
result = boundary.upload_generated_tiles(_generated_package(), phase="post_flight")
# Assert
assert result.upload_record is not None
assert result.upload_record.status == "retryable_failure"
assert result.upload_record.retained_for_retry is True
assert boundary.status().retry_package_refs == ("generated/mission-1/sync-package.json",)
def test_in_flight_sync_is_blocked_without_calling_network_boundary() -> None:
# Arrange
calls: list[str] = []
def uploader(package: GeneratedTileSyncPackage) -> str:
calls.append(package.package_ref)
return "success"
boundary = SatelliteSyncBoundary(uploader=uploader)
# Act
result = boundary.upload_generated_tiles(_generated_package(), phase="in_flight")
# Assert
assert result.upload_record is None
assert result.error is not None
assert result.error.cause == "mid_flight_network_blocked"
assert calls == []
+104
View File
@@ -0,0 +1,104 @@
from satellite_service import (
LocalVprIndexPackage,
LocalVprRetriever,
RelocalizationRequest,
VprDescriptorRecord,
)
def _record(
chunk_id: str = "chunk-1",
tile_id: str = "tile-1",
descriptor: tuple[float, ...] = (1.0, 0.0, 0.0),
freshness_status: str = "fresh",
) -> VprDescriptorRecord:
return VprDescriptorRecord(
chunk_id=chunk_id,
tile_id=tile_id,
descriptor=descriptor,
footprint={"min_lat": 49.0, "max_lat": 49.1, "min_lon": 36.0, "max_lon": 36.1},
freshness_status=freshness_status,
)
def test_valid_local_index_load_reports_ready_status() -> None:
# Arrange
retriever = LocalVprRetriever()
package = LocalVprIndexPackage(package_id="index-1", records=(_record(),))
# Act
readiness = retriever.load_index(package)
# Assert
assert readiness.ready is True
assert readiness.engine == "cpu_faiss"
assert readiness.loaded_records == 1
def test_loaded_index_returns_bounded_candidates_with_freshness() -> None:
# Arrange
retriever = LocalVprRetriever()
retriever.load_index(
LocalVprIndexPackage(
package_id="index-1",
records=(
_record(chunk_id="chunk-best", tile_id="tile-best", descriptor=(1.0, 0.0)),
_record(
chunk_id="chunk-stale",
tile_id="tile-stale",
descriptor=(0.8, 0.2),
freshness_status="stale",
),
),
)
)
request = RelocalizationRequest(
frame_id="frame-1",
image_ref="replay/frame-1.jpg",
trigger_reason="covariance_growth",
top_k=1,
query_descriptor=(1.0, 0.0),
)
# Act
result = retriever.retrieve(request)
# Assert
assert result.degraded is False
assert len(result.candidates) == 1
assert result.candidates[0].chunk_id == "chunk-best"
assert result.candidates[0].tile_id == "tile-best"
assert result.candidates[0].freshness_status == "fresh"
def test_missing_index_degrades_with_explicit_no_candidate_result() -> None:
# Arrange
retriever = LocalVprRetriever()
request = RelocalizationRequest(
frame_id="frame-1",
image_ref="replay/frame-1.jpg",
trigger_reason="cold_start",
top_k=3,
)
# Act
result = retriever.retrieve(request)
# Assert
assert result.ready is False
assert result.degraded is True
assert result.candidates == ()
assert result.error is not None
assert result.error.cause == "index_not_loaded"
def test_descriptor_fidelity_gate_rejects_large_optimized_delta() -> None:
# Arrange
retriever = LocalVprRetriever()
# Act
report = retriever.verify_descriptor_fidelity((1.0, 0.0), (0.0, 1.0), max_l2_delta=0.1)
# Assert
assert report.accepted is False
assert report.observed_l2_delta > report.max_l2_delta
+73
View File
@@ -0,0 +1,73 @@
from shared.contracts import FramePacket, TelemetrySample
from vio_adapter import LocalVioAdapter, VioInputPacket
def _frame(**overrides: object) -> FramePacket:
payload: dict[str, object] = {
"frame_id": "frame-1",
"timestamp_ns": 1_000_000,
"image_ref": "replay/frame-1.jpg",
"calibration_id": "calib-1",
"occlusion": "clear",
"quality": 0.85,
}
payload.update(overrides)
return FramePacket.model_validate(payload)
def _telemetry(timestamp_ns: int = 1_000_000) -> TelemetrySample:
return TelemetrySample(
timestamp_ns=timestamp_ns,
imu={"accel_x": 0.1, "accel_y": 0.0, "accel_z": 9.8},
attitude={"roll": 0.0, "pitch": 0.01, "yaw": 0.02},
altitude_m=120.0,
airspeed_mps=24.0,
gps_health="lost",
)
def test_valid_synchronized_packet_emits_vio_state() -> None:
# Arrange
adapter = LocalVioAdapter()
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert result.error is None
assert result.state_packet is not None
assert result.state_packet.timestamp_ns == 1_000_000
assert result.state_packet.tracking_quality == 0.85
assert result.health.state == "ready"
def test_timestamp_mismatch_is_explicit_validation_error() -> None:
# Arrange
adapter = LocalVioAdapter(timestamp_tolerance_ns=1_000)
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(2_000_000),))
# Act
result = adapter.process(packet)
# Assert
assert result.state_packet is None
assert result.error is not None
assert result.error.component == "vio_adapter"
assert result.error.cause == "gap_exceeded"
assert result.health.state == "degraded"
def test_tracking_loss_degrades_health_without_emitting_absolute_position() -> None:
# Arrange
adapter = LocalVioAdapter(degraded_quality_threshold=0.35)
packet = VioInputPacket(frame=_frame(quality=0.2), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert result.state_packet is not None
assert result.health.state == "degraded"
assert "latitude_deg" not in result.state_packet.model_dump()
assert "longitude_deg" not in result.state_packet.model_dump()