Files
gps-denied-onboard/tests/unit/c6_tile_cache/test_protocol_conformance.py
T
Oleksandr Bezdieniezhnykh a06b107fc3 [AZ-320] Add C11 IdempotentRetryTileUploader decorator
Wraps HttpTileUploader (AZ-319) with two bounded retry budgets:

- In-call (per-batch) — re-invokes inner on PARTIAL outcome up to
  `max_in_call_retries` times with capped exponential backoff
  (`min(base ** attempt_number, cap)`). On exhaustion: surfaces an
  operator hint via `next_retry_at_s = now + backoff_cap_s`.
- Per-tile (cross-call) — atomically increments c6's
  `tiles.upload_attempts` counter for every rejection; once a tile
  hits `max_per_tile_attempts` it is forward-only transitioned to
  `voting_status = upload_giveup` (excluded from `pending_uploads`).
  Each transition emits FDR `kind="c11.upload.giveup"` plus an
  ERROR log.

C6 contract changes (AZ-303 v1.3.0):
- VotingStatus.UPLOAD_GIVEUP added (forward-only from PENDING/TRUSTED).
- TileMetadataStore.increment_upload_attempts(tile_id) -> int added
  with NotImplementedError default for backwards-compat.
- Migration 0003_c11_upload_attempts: additive column +
  widened ck_tiles_voting_status (preserves IS NULL clause).

C11 wiring:
- C11RetryConfig + disable_retry_decorator on C11Config.
- build_tile_uploader wraps in decorator by default; bypass flag
  returns the bare HttpTileUploader. New `clock` keyword.

Cross-component isolation honoured (AZ-507): the decorator declares
`_RetryMetadataStoreLike` Protocol cut over c6's TileMetadataStore
and references `UPLOAD_GIVEUP` via a local string constant — no c6
imports.

Tests: 13 decorator + 1 conformance + 2 factory bypass + AC-6 enum
update + alembic head bump + AZ-272 schema fixture. 238 passed across
c11/c6/fdr suites; pre-existing perf microbenches unrelated.

Code review: PASS_WITH_WARNINGS (5 Low/Informational findings,
docs-level or downstream-CI-blocked). See
_docs/03_implementation/reviews/batch_41_review.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-13 08:48:53 +03:00

665 lines
22 KiB
Python

"""AZ-303 — C6 storage Protocol + DTO + error + factory conformance.
Covers all 10 ACs of AZ-303 (see ``_docs/02_tasks/todo/AZ-303_...``).
The factory ACs (AC-4 / AC-5) substitute lazy-importable fake impl
modules at ``sys.modules`` boundaries so the test never touches FAISS
or psycopg2.
"""
from __future__ import annotations
import dataclasses
import re
import sys
import types
from datetime import datetime, timezone
from pathlib import Path
import pytest
from gps_denied_onboard.components.c6_tile_cache import (
Bbox,
C6TileCacheConfig,
ContentHashMismatchError,
DescriptorIndex,
FreshnessLabel,
FreshnessRejectionError,
HnswParams,
IndexBuildError,
IndexMetadata,
IndexUnavailableError,
SectorBoundary,
SectorClassification,
TileCacheError,
TileFsError,
TileId,
TileMetadata,
TileMetadataError,
TileMetadataPersistent,
TileMetadataStore,
TileNotFoundError,
TilePixelHandle,
TileQualityMetadata,
TileSource,
TileStore,
VotingStatus,
)
from gps_denied_onboard.components.c6_tile_cache.config import (
KNOWN_DESCRIPTOR_INDEX_RUNTIMES,
)
from gps_denied_onboard.config.schema import Config, ConfigError
from gps_denied_onboard.runtime_root.errors import RuntimeNotAvailableError
from gps_denied_onboard.runtime_root.storage_factory import (
build_descriptor_index,
build_tile_metadata_store,
build_tile_store,
)
_CONTRACT_DIR = Path(__file__).resolve().parents[3] / ("_docs/02_document/contracts/c6_tile_cache")
_FAKE_IMPL_MODULE = "gps_denied_onboard.components.c6_tile_cache.faiss_descriptor_index"
_FAKE_STORE_MODULE = "gps_denied_onboard.components.c6_tile_cache.postgres_filesystem_store"
def _valid_tile_id(zoom: int = 18, lat: float = 49.94, lon: float = 36.31) -> TileId:
return TileId(zoom_level=zoom, lat=lat, lon=lon)
def _valid_tile_metadata() -> TileMetadata:
return TileMetadata(
tile_id=_valid_tile_id(),
tile_size_meters=256.0,
tile_size_pixels=256,
capture_timestamp=datetime(2026, 1, 1, tzinfo=timezone.utc),
source=TileSource.GOOGLEMAPS,
content_sha256_hex="a" * 64,
freshness_label=FreshnessLabel.FRESH,
flight_id=None,
companion_id=None,
quality_metadata=None,
voting_status=VotingStatus.TRUSTED,
)
def _config_with_c6(overrides: dict[str, object] | None = None) -> Config:
overrides = overrides or {}
block = C6TileCacheConfig(**overrides) # type: ignore[arg-type]
return Config.with_blocks(c6_tile_cache=block)
# ----------------------------------------------------------------------
# AC-1: three Protocols are conformance-checkable.
class _FullTileStore:
def read_tile_pixels(self, tile_id):
raise NotImplementedError
def write_tile(self, tile_blob, metadata):
raise NotImplementedError
def tile_exists(self, tile_id):
raise NotImplementedError
def delete_tile(self, tile_id):
raise NotImplementedError
class _PartialTileStore:
def read_tile_pixels(self, tile_id):
raise NotImplementedError
def write_tile(self, tile_blob, metadata):
raise NotImplementedError
def tile_exists(self, tile_id):
raise NotImplementedError
class _FullTileMetadataStore:
def query_by_bbox(self, bbox, zoom, *, voting_filter=None, source_filter=None):
raise NotImplementedError
def insert_metadata(self, metadata):
raise NotImplementedError
def update_voting_status(self, tile_id, status):
raise NotImplementedError
def mark_uploaded(self, tile_id, uploaded_at):
raise NotImplementedError
def pending_uploads(self):
raise NotImplementedError
def record_lru_access(self, tile_id, accessed_at):
raise NotImplementedError
def lru_candidates(self, *, max_count):
raise NotImplementedError
def total_disk_bytes(self):
raise NotImplementedError
def get_by_id(self, tile_id):
raise NotImplementedError
def increment_upload_attempts(self, tile_id):
raise NotImplementedError
class _PartialTileMetadataStore:
def query_by_bbox(self, bbox, zoom, *, voting_filter=None, source_filter=None):
raise NotImplementedError
class _FullDescriptorIndex:
def search_topk(self, query, k):
raise NotImplementedError
def descriptor_dim(self):
raise NotImplementedError
def mmap_handle(self):
raise NotImplementedError
def rebuild_from_descriptors(self, descriptors, tile_ids, hnsw_params):
raise NotImplementedError
def index_metadata(self):
raise NotImplementedError
class _PartialDescriptorIndex:
def search_topk(self, query, k):
raise NotImplementedError
def descriptor_dim(self):
raise NotImplementedError
def test_ac1_tile_store_conformance_full() -> None:
assert isinstance(_FullTileStore(), TileStore)
def test_ac1_tile_store_conformance_partial_missing_delete() -> None:
assert not isinstance(_PartialTileStore(), TileStore)
def test_ac1_tile_metadata_store_conformance_full() -> None:
assert isinstance(_FullTileMetadataStore(), TileMetadataStore)
def test_ac1_tile_metadata_store_conformance_partial() -> None:
assert not isinstance(_PartialTileMetadataStore(), TileMetadataStore)
def test_ac1_descriptor_index_conformance_full() -> None:
assert isinstance(_FullDescriptorIndex(), DescriptorIndex)
def test_ac1_descriptor_index_conformance_partial_missing_metadata() -> None:
assert not isinstance(_PartialDescriptorIndex(), DescriptorIndex)
# ----------------------------------------------------------------------
# AC-2: frozen DTOs reject mutation.
@pytest.mark.parametrize(
"dto, field_name, new_value",
[
(_valid_tile_id(), "lat", 0.0),
(_valid_tile_metadata(), "tile_size_meters", 9999.0),
(Bbox(min_lat=0.0, min_lon=0.0, max_lat=1.0, max_lon=1.0), "min_lat", 5.0),
(HnswParams(), "m", 64),
(
TileQualityMetadata(
estimator_label="satellite_anchored",
covariance_2x2=((0.1, 0.0), (0.0, 0.1)),
last_anchor_age_ms=100,
mre_px=0.5,
imu_bias_norm=0.01,
),
"mre_px",
1.0,
),
],
)
def test_ac2_frozen_dtos_reject_mutation(dto, field_name: str, new_value) -> None:
original_value = getattr(dto, field_name)
with pytest.raises(dataclasses.FrozenInstanceError):
setattr(dto, field_name, new_value)
assert getattr(dto, field_name) == original_value
# ----------------------------------------------------------------------
# AC-3: error hierarchy catchable as a single family.
@pytest.mark.parametrize(
"exc_factory",
[
TileNotFoundError,
TileFsError,
TileMetadataError,
ContentHashMismatchError,
FreshnessRejectionError,
IndexUnavailableError,
],
)
def test_ac3_all_runtime_errors_caught_as_family(exc_factory) -> None:
with pytest.raises(TileCacheError):
raise exc_factory("boom")
def test_ac3_unrelated_exception_not_caught_as_family() -> None:
with pytest.raises(ValueError):
try:
raise ValueError("not us")
except TileCacheError:
pytest.fail("ValueError must not be caught as TileCacheError")
def test_ac3_index_build_error_outside_family() -> None:
with pytest.raises(IndexBuildError):
try:
raise IndexBuildError("offline only")
except TileCacheError:
pytest.fail("IndexBuildError must NOT be in the TileCacheError family")
# ----------------------------------------------------------------------
# AC-4 + AC-5: factory honours config + BUILD flag gate.
@pytest.fixture
def faiss_module_cleanup():
"""Ensure no residual fake FAISS impl module leaks between tests."""
sys.modules.pop(_FAKE_IMPL_MODULE, None)
yield
sys.modules.pop(_FAKE_IMPL_MODULE, None)
@pytest.fixture
def store_module_cleanup():
sys.modules.pop(_FAKE_STORE_MODULE, None)
yield
sys.modules.pop(_FAKE_STORE_MODULE, None)
def _install_fake_faiss_impl_module() -> type:
"""Install a fake ``faiss_descriptor_index`` module in ``sys.modules``.
The fake's ``FaissDescriptorIndex`` class structurally satisfies the
:class:`DescriptorIndex` Protocol. We attach the class via
``types.ModuleType`` so the factory's lazy import succeeds.
"""
class _FakeFaissDescriptorIndex(_FullDescriptorIndex):
def __init__(self, config: Config) -> None:
self.config = config
@classmethod
def from_config(cls, config: Config) -> _FakeFaissDescriptorIndex:
# AZ-306: factory now dispatches via from_config so the production
# impl can wire its Sha256Sidecar / logger / warmup query without
# the runtime_root touching them. Mirror PostgresFilesystemStore.
return cls(config)
fake_module = types.ModuleType(_FAKE_IMPL_MODULE)
fake_module.FaissDescriptorIndex = _FakeFaissDescriptorIndex # type: ignore[attr-defined]
sys.modules[_FAKE_IMPL_MODULE] = fake_module
return _FakeFaissDescriptorIndex
def _install_fake_postgres_store_module() -> type:
class _FakePostgresFilesystemStore(_FullTileStore, _FullTileMetadataStore):
def __init__(self, config: Config) -> None:
self.config = config
@classmethod
def from_config(cls, config: Config) -> _FakePostgresFilesystemStore:
# AZ-305: factories now dispatch via from_config so the production
# impl can wire its ConnectionPool / FdrClient / helpers without
# the runtime_root opening a connection of its own. The test fake
# preserves the single-config-arg shape via this classmethod.
return cls(config)
# AZ-308: ``build_tile_store`` now wraps the store in a
# ``BudgetEnforcedTileStore`` whose constructor reads
# ``total_disk_bytes`` for the AC-12 startup log. Override the
# ``_FullTileMetadataStore`` NotImplementedError stub with a
# working zero-byte response so the factory can construct the
# wrapper without touching a real DB.
def total_disk_bytes(self) -> int:
return 0
fake_module = types.ModuleType(_FAKE_STORE_MODULE)
fake_module.PostgresFilesystemStore = _FakePostgresFilesystemStore # type: ignore[attr-defined]
sys.modules[_FAKE_STORE_MODULE] = fake_module
return _FakePostgresFilesystemStore
def test_ac4_build_descriptor_index_returns_protocol_impl(
monkeypatch, faiss_module_cleanup
) -> None:
monkeypatch.setenv("BUILD_FAISS_INDEX", "ON")
fake_cls = _install_fake_faiss_impl_module()
config = _config_with_c6()
handle = build_descriptor_index(config)
assert isinstance(handle, fake_cls)
assert isinstance(handle, DescriptorIndex)
def test_ac5_build_descriptor_index_flag_off_raises_no_import(
monkeypatch, faiss_module_cleanup
) -> None:
monkeypatch.delenv("BUILD_FAISS_INDEX", raising=False)
config = _config_with_c6()
with pytest.raises(RuntimeNotAvailableError) as exc_info:
build_descriptor_index(config)
assert "faiss_hnsw" in str(exc_info.value)
assert _FAKE_IMPL_MODULE not in sys.modules
def test_ac4_build_tile_store_returns_protocol_impl(store_module_cleanup) -> None:
# AZ-308: ``build_tile_store`` now returns a ``BudgetEnforcedTileStore``
# decorator wrapping the inner :class:`TileStore` impl. The decorator
# implements the Protocol surface; the wrapped instance is reachable
# via the private ``_wrapped`` attribute for tests that need to
# introspect the inner store.
from gps_denied_onboard.components.c6_tile_cache.cache_budget_enforcer import (
BudgetEnforcedTileStore,
)
fake_cls = _install_fake_postgres_store_module()
config = _config_with_c6()
store = build_tile_store(config)
assert isinstance(store, BudgetEnforcedTileStore)
assert isinstance(store, TileStore)
assert isinstance(store._wrapped, fake_cls) # type: ignore[attr-defined]
def test_ac4_build_tile_metadata_store_returns_protocol_impl(
store_module_cleanup,
) -> None:
fake_cls = _install_fake_postgres_store_module()
config = _config_with_c6()
md = build_tile_metadata_store(config)
assert isinstance(md, fake_cls)
assert isinstance(md, TileMetadataStore)
def test_ac5_tile_store_runtime_module_missing_raises(store_module_cleanup, monkeypatch) -> None:
"""AC-5 historical name; after AZ-305 the impl module always exists, so
"missing" is exercised by deleting it from ``sys.modules`` AND making
``importlib`` refuse the import. We patch the module-level lazy import
site to ``raise ModuleNotFoundError`` so the factory hits the same
documented branch.
"""
config = _config_with_c6()
import gps_denied_onboard.runtime_root.storage_factory as factory_mod
real_import = (
__builtins__["__import__"] if isinstance(__builtins__, dict) else __builtins__.__import__
)
def _block_postgres_import(name, *args, **kwargs):
if name.endswith("postgres_filesystem_store"):
raise ModuleNotFoundError(name)
return real_import(name, *args, **kwargs)
monkeypatch.setattr(
factory_mod, "__builtins__", {"__import__": _block_postgres_import}, raising=False
)
monkeypatch.setitem(sys.modules, _FAKE_STORE_MODULE, None) # type: ignore[arg-type]
with pytest.raises(RuntimeNotAvailableError) as exc_info:
build_tile_store(config)
assert "postgres_filesystem" in str(exc_info.value)
# ----------------------------------------------------------------------
# AC-6: unknown runtime label rejected at config load.
def test_ac6_unknown_descriptor_index_runtime_rejected() -> None:
with pytest.raises(ConfigError) as exc_info:
C6TileCacheConfig(descriptor_index_runtime="scann")
msg = str(exc_info.value)
assert "scann" in msg
for valid in KNOWN_DESCRIPTOR_INDEX_RUNTIMES:
assert valid in msg
def test_ac6_unknown_store_runtime_rejected() -> None:
with pytest.raises(ConfigError):
C6TileCacheConfig(store_runtime="sqlite_filesystem")
def test_ac6_unknown_metadata_runtime_rejected() -> None:
with pytest.raises(ConfigError):
C6TileCacheConfig(metadata_runtime="sqlite_filesystem")
# ----------------------------------------------------------------------
# AC-7: constructor-time validation rejects bad input.
@pytest.mark.parametrize(
"kwargs, offending_field",
[
({"zoom_level": 22, "lat": 0.0, "lon": 0.0}, "zoom_level"),
({"zoom_level": -1, "lat": 0.0, "lon": 0.0}, "zoom_level"),
({"zoom_level": 18, "lat": 100.0, "lon": 0.0}, "lat"),
({"zoom_level": 18, "lat": 0.0, "lon": -200.0}, "lon"),
],
)
def test_ac7_tile_id_rejects_bad_input(kwargs: dict[str, float], offending_field: str) -> None:
with pytest.raises(ValueError) as exc_info:
TileId(**kwargs) # type: ignore[arg-type]
assert offending_field in str(exc_info.value)
@pytest.mark.parametrize(
"kwargs",
[
{"min_lat": 10.0, "min_lon": 0.0, "max_lat": 5.0, "max_lon": 10.0},
{"min_lat": 0.0, "min_lon": 10.0, "max_lat": 5.0, "max_lon": 5.0},
{"min_lat": 5.0, "min_lon": 5.0, "max_lat": 5.0, "max_lon": 10.0},
],
)
def test_ac7_bbox_rejects_inverted_or_degenerate(kwargs: dict[str, float]) -> None:
with pytest.raises(ValueError):
Bbox(**kwargs) # type: ignore[arg-type]
# ----------------------------------------------------------------------
# AC-8: TilePixelHandle is read-only by contract.
class _BytesTilePixelHandle(TilePixelHandle):
"""In-memory fake used by tests; mirrors the mmap impl's read-only contract."""
def __init__(self, blob: bytes, path: Path) -> None:
self._blob = blob
self._path = path
self._view: memoryview | None = None
@property
def filesystem_path(self) -> Path:
return self._path
def __enter__(self) -> memoryview:
self._view = memoryview(self._blob)
return self._view
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
if self._view is not None:
self._view.release()
self._view = None
def test_ac8_tile_pixel_handle_returns_read_only_memoryview(tmp_path: Path) -> None:
blob = b"\xff\xd8\xff" + b"\x00" * 100 # JPEG SOI + filler
handle_path = tmp_path / "fake.jpg"
handle = _BytesTilePixelHandle(blob, handle_path)
with handle as memview:
with pytest.raises(TypeError):
memview[0] = 0xFF # type: ignore[index]
assert bytes(memview[:3]) == b"\xff\xd8\xff"
# ----------------------------------------------------------------------
# AC-9: contract files match Protocol shapes.
_METHOD_TABLE_RE = re.compile(r"^\|\s*`(?P<name>[a-z_][a-z0-9_]*)`\s*\|", re.MULTILINE)
def _methods_from_contract(contract_file: Path) -> set[str]:
"""Pull every backtick-quoted method name from the ``## Shape`` table."""
text = contract_file.read_text(encoding="utf-8")
shape_start = text.index("## Shape")
next_section = text.find("\n## ", shape_start + len("## Shape"))
shape_section = text[shape_start:next_section] if next_section != -1 else text[shape_start:]
return {m.group("name") for m in _METHOD_TABLE_RE.finditer(shape_section)}
def _protocol_methods(proto: type) -> set[str]:
"""Reflect over a Protocol's method names."""
return {
name for name in dir(proto) if not name.startswith("_") and callable(getattr(proto, name))
}
@pytest.mark.parametrize(
"contract_filename, proto",
[
("tile_store.md", TileStore),
("tile_metadata_store.md", TileMetadataStore),
("descriptor_index.md", DescriptorIndex),
],
)
def test_ac9_contract_methods_match_protocol(contract_filename: str, proto: type) -> None:
contract_path = _CONTRACT_DIR / contract_filename
contract_methods = _methods_from_contract(contract_path)
protocol_methods = _protocol_methods(proto)
missing_in_protocol = contract_methods - protocol_methods
missing_in_contract = protocol_methods - contract_methods
assert not missing_in_protocol, (
f"{contract_filename}: methods declared in contract but missing from "
f"Protocol: {sorted(missing_in_protocol)}"
)
assert not missing_in_contract, (
f"{contract_filename}: methods present on Protocol but missing from "
f"contract: {sorted(missing_in_contract)}"
)
# ----------------------------------------------------------------------
# AC-10: VotingStatus surface.
def test_ac10_voting_status_has_documented_states_only() -> None:
# AC-6 (AZ-320): ``upload_giveup`` is the new terminal state set by
# the C11 retry decorator after a tile exhausts its per-tile retry
# budget. Forward-only — see tile_metadata_store.md v1.3.0 I-8.
assert {v.value for v in VotingStatus} == {
"pending",
"trusted",
"rejected",
"upload_giveup",
}
assert VotingStatus.PENDING.value == "pending"
assert VotingStatus.TRUSTED.value == "trusted"
assert VotingStatus.REJECTED.value == "rejected"
assert VotingStatus.UPLOAD_GIVEUP.value == "upload_giveup"
# ----------------------------------------------------------------------
# NFR-reliability-error-family + smoke surface tests.
@pytest.mark.parametrize(
"exc_type",
[
TileNotFoundError,
TileFsError,
TileMetadataError,
ContentHashMismatchError,
FreshnessRejectionError,
IndexUnavailableError,
],
)
def test_nfr_reliability_all_runtime_errors_subclass_family(exc_type) -> None:
assert issubclass(exc_type, TileCacheError)
def test_nfr_reliability_index_build_error_not_in_family() -> None:
assert not issubclass(IndexBuildError, TileCacheError)
def test_sector_classification_enum_surface() -> None:
assert {v.value for v in SectorClassification} == {
"active_conflict",
"stable_rear",
}
def test_sector_boundary_dto_constructs_and_freezes() -> None:
bbox = Bbox(min_lat=0.0, min_lon=0.0, max_lat=1.0, max_lon=1.0)
sector = SectorBoundary(bbox=bbox, classification=SectorClassification.ACTIVE_CONFLICT)
with pytest.raises(dataclasses.FrozenInstanceError):
sector.classification = SectorClassification.STABLE_REAR # type: ignore[misc]
def test_tile_metadata_persistent_dto_constructs_and_freezes() -> None:
md = _valid_tile_metadata()
persistent = TileMetadataPersistent(
metadata=md,
accessed_at=datetime(2026, 1, 2, tzinfo=timezone.utc),
uploaded_at=None,
disk_bytes=12345,
)
with pytest.raises(dataclasses.FrozenInstanceError):
persistent.disk_bytes = 0 # type: ignore[misc]
def test_nfr_perf_build_factories_under_50ms_p99(
monkeypatch, faiss_module_cleanup, store_module_cleanup
) -> None:
"""Factory triple p99 ≤ 50 ms across 1000 calls (NFR-perf-factory)."""
import time
monkeypatch.setenv("BUILD_FAISS_INDEX", "ON")
_install_fake_faiss_impl_module()
_install_fake_postgres_store_module()
config = _config_with_c6()
durations_ms: list[float] = []
for _ in range(1000):
for factory in (build_tile_store, build_tile_metadata_store, build_descriptor_index):
t0 = time.perf_counter()
factory(config)
durations_ms.append((time.perf_counter() - t0) * 1000.0)
durations_ms.sort()
p99 = durations_ms[int(0.99 * len(durations_ms))]
assert p99 <= 50.0, f"build_*() p99={p99:.3f} ms exceeds 50 ms NFR"
def test_index_metadata_dto_constructs_and_freezes(tmp_path: Path) -> None:
md = IndexMetadata(
descriptor_dim=512,
n_vectors=10_000,
backbone_label="ultra_vpr_v0",
backbone_sha256_hex="b" * 64,
built_at=datetime(2026, 1, 3, tzinfo=timezone.utc),
hnsw_params=HnswParams(),
sidecar_sha256_hex="c" * 64,
file_path=tmp_path / "tiles.index",
)
with pytest.raises(dataclasses.FrozenInstanceError):
md.n_vectors = 0 # type: ignore[misc]