[AZ-308] c6 CacheBudgetEnforcer: 10 GB hard cap + LRU sweep

CacheBudgetEnforcer.reserve_headroom(needed_bytes) returns immediately
when total_disk_bytes() + needed_bytes <= budget, otherwise iterates
lru_candidates in eviction_batch_size batches, deletes via delete_tile,
emits one INFO log per evicted tile (c6.evicted) and one FDR record per
eviction batch (c6.eviction_batch, evicted_tile_ids capped to 5).
Raises CacheBudgetExhaustedError AFTER a full sweep if the budget
cannot be met. BudgetEnforcedTileStore decorates a TileStore so the
policy stays separable from PostgresFilesystemStore. Composition root
in storage_factory.build_tile_store wires the wrapper unconditionally.

PostgresFilesystemStore now accepts lru_clock: Clock | None = None;
when set, read_tile_pixels calls record_lru_access(tile_id, now) so
eviction picks the right LRU candidates. Production wiring injects
WallClock(); AZ-305 unit tests still construct without the clock and
keep their pass-through semantics. Contract tile_store.md bumped to
v1.1.0 to add CacheBudgetExhaustedError to the TileCacheError family;
shared FDR schema bumped to v1.3.0 for the new c6.eviction_batch kind.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 20:37:41 +03:00
parent 39ff47087f
commit d571ca25f9
13 changed files with 1588 additions and 29 deletions
@@ -0,0 +1,774 @@
"""AZ-308 — ``CacheBudgetEnforcer`` + ``BudgetEnforcedTileStore`` tests.
Most ACs are exercised against an in-memory fake ``TileMetadataStore``
that satisfies the AZ-303 Protocol so the unit tests run on Tier-1.
The few ACs that genuinely need a real Postgres + filesystem layout
(``AC-6`` decorator+write_tile end-to-end, ``AC-8`` LRU clock wired
into ``read_tile_pixels``, ``AC-10`` synthetic 10 GiB fill) carry
``@pytest.mark.docker`` and are auto-skipped on Tier-1.
To run the docker tests locally::
docker compose -f docker-compose.test.yml up -d db
GPS_DENIED_TIER=2 DB_URL=postgresql://gps_denied:dev@localhost:55432/gps_denied \\
pytest tests/unit/c6_tile_cache/test_cache_budget_enforcer.py
"""
from __future__ import annotations
import hashlib
import logging
import os
import time
from collections.abc import Iterator
from datetime import datetime, timedelta, timezone
from pathlib import Path
import psycopg
import pytest
from psycopg_pool import ConnectionPool
from gps_denied_onboard.components.c6_tile_cache._types import (
FreshnessLabel,
TileId,
TileMetadata,
TileMetadataPersistent,
TileSource,
VotingStatus,
)
from gps_denied_onboard.components.c6_tile_cache.cache_budget_enforcer import (
BudgetEnforcedTileStore,
CacheBudgetEnforcer,
EvictionResult,
)
from gps_denied_onboard.components.c6_tile_cache.config import C6TileCacheConfig
from gps_denied_onboard.components.c6_tile_cache.errors import (
CacheBudgetExhaustedError,
ContentHashMismatchError,
TileFsError,
TileMetadataError,
)
from gps_denied_onboard.components.c6_tile_cache.migrations import apply_migrations
from gps_denied_onboard.components.c6_tile_cache.postgres_filesystem_store import (
PostgresFilesystemStore,
)
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
from gps_denied_onboard.logging import get_logger
_docker = pytest.mark.docker
_NS_PER_S = 1_000_000_000
# ----------------------------------------------------------------------
# Test doubles
# ----------------------------------------------------------------------
class _FakeClock:
def __init__(self, now_dt: datetime) -> None:
self._now_ns = int(now_dt.timestamp() * _NS_PER_S)
def monotonic_ns(self) -> int:
return self._now_ns
def time_ns(self) -> int:
return self._now_ns
def sleep_until_ns(self, target_ns: int) -> None:
if target_ns > self._now_ns:
self._now_ns = target_ns
class _FakeStore:
"""In-memory ``TileMetadataStore`` + ``TileStore`` for unit tests.
Tracks LRU order via insertion order; ``delete_tile`` is idempotent;
counters on every Protocol method let tests assert AC-4 / AC-9 query
discipline without round-tripping a real DB.
"""
def __init__(
self,
*,
rows: list[TileMetadataPersistent] | None = None,
delete_returns: dict[TileId, bool] | None = None,
delete_raises: dict[TileId, BaseException] | None = None,
) -> None:
self._rows: list[TileMetadataPersistent] = list(rows or [])
self._delete_returns = delete_returns or {}
self._delete_raises = delete_raises or {}
self.lru_calls: list[int] = []
self.delete_calls: list[TileId] = []
self.total_disk_bytes_calls = 0
def total_disk_bytes(self) -> int:
self.total_disk_bytes_calls += 1
return sum(p.disk_bytes for p in self._rows)
def lru_candidates(self, *, max_count: int) -> list[TileMetadataPersistent]:
self.lru_calls.append(max_count)
return list(self._rows[:max_count])
def delete_tile(self, tile_id: TileId) -> bool:
self.delete_calls.append(tile_id)
if tile_id in self._delete_raises:
raise self._delete_raises[tile_id]
self._rows = [p for p in self._rows if p.metadata.tile_id != tile_id]
return self._delete_returns.get(tile_id, True)
def _persistent_row(
*,
tile_id_seed: tuple[int, float, float],
disk_bytes: int,
accessed_at: datetime,
) -> TileMetadataPersistent:
zoom, lat, lon = tile_id_seed
tile_id = TileId(zoom_level=zoom, lat=lat, lon=lon)
blob_hash = hashlib.sha256(f"{tile_id_seed}-{disk_bytes}".encode()).hexdigest()
md = TileMetadata(
tile_id=tile_id,
tile_size_meters=256.0,
tile_size_pixels=256,
capture_timestamp=datetime(2026, 5, 12, tzinfo=timezone.utc),
source=TileSource.GOOGLEMAPS,
content_sha256_hex=blob_hash,
freshness_label=FreshnessLabel.FRESH,
flight_id=None,
companion_id=None,
quality_metadata=None,
voting_status=VotingStatus.TRUSTED,
)
return TileMetadataPersistent(
metadata=md,
accessed_at=accessed_at,
uploaded_at=None,
disk_bytes=disk_bytes,
)
def _build_enforcer(
store: _FakeStore,
sink: FakeFdrSink,
*,
budget_bytes: int,
eviction_batch_size: int = 32,
) -> CacheBudgetEnforcer:
return CacheBudgetEnforcer(
store=store, # type: ignore[arg-type]
fdr_client=sink, # type: ignore[arg-type]
logger=get_logger("c6_tile_cache.budget.test"),
budget_bytes=budget_bytes,
eviction_batch_size=eviction_batch_size,
)
@pytest.fixture
def fake_sink() -> FakeFdrSink:
return FakeFdrSink(producer_id="c6_tile_cache.budget", capacity=256)
@pytest.fixture
def now_dt() -> datetime:
return datetime(2026, 5, 12, 12, 0, 0, tzinfo=timezone.utc)
# ======================================================================
# Non-docker unit tests
# ======================================================================
def test_construction_emits_loaded_log_with_disk_bytes_snapshot(
fake_sink: FakeFdrSink, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange
store = _FakeStore(
rows=[
_persistent_row(
tile_id_seed=(18, 49.94, 36.31),
disk_bytes=1_000_000,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc),
)
]
)
# Act
with caplog.at_level(logging.INFO, logger="c6_tile_cache.budget.test"):
_build_enforcer(store, fake_sink, budget_bytes=10 * 1024**3)
# Assert
loaded = [rec for rec in caplog.records if getattr(rec, "kind", "") == "c6.budget.loaded"]
assert len(loaded) == 1
kv = loaded[0].kv # type: ignore[attr-defined]
assert kv["budget_bytes"] == 10 * 1024**3
assert kv["current_disk_bytes"] == 1_000_000
assert kv["headroom_bytes"] == 10 * 1024**3 - 1_000_000
def test_ac12_construction_warns_when_over_budget(
fake_sink: FakeFdrSink, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange — prior flight ended over the cap.
store = _FakeStore(
rows=[
_persistent_row(
tile_id_seed=(18, 49.94, 36.31),
disk_bytes=200,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc),
)
]
)
# Act
with caplog.at_level(logging.WARNING, logger="c6_tile_cache.budget.test"):
_build_enforcer(store, fake_sink, budget_bytes=100)
# Assert
warn = [
rec
for rec in caplog.records
if getattr(rec, "kind", "") == "c6.budget.over_budget_at_construction"
]
assert len(warn) == 1
assert warn[0].kv["overage_bytes"] == 100 # type: ignore[attr-defined]
def test_construction_rejects_non_positive_budget(fake_sink: FakeFdrSink) -> None:
# Arrange
store = _FakeStore()
# Act + Assert
with pytest.raises(TileMetadataError, match="budget_bytes must be > 0"):
_build_enforcer(store, fake_sink, budget_bytes=0)
def test_construction_rejects_non_positive_batch_size(fake_sink: FakeFdrSink) -> None:
# Arrange
store = _FakeStore()
# Act + Assert
with pytest.raises(TileMetadataError, match="eviction_batch_size must be > 0"):
_build_enforcer(store, fake_sink, budget_bytes=1024, eviction_batch_size=0)
def test_ac1_no_eviction_fast_path(fake_sink: FakeFdrSink) -> None:
# Arrange — 10 GB budget, 1 GB used, 10 MB needed → trivially fits.
store = _FakeStore(
rows=[
_persistent_row(
tile_id_seed=(18, 49.94, 36.31),
disk_bytes=1 * 1024**3,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc),
)
]
)
enforcer = _build_enforcer(store, fake_sink, budget_bytes=10 * 1024**3)
fake_sink.records.clear()
# Act
result = enforcer.reserve_headroom(10 * 1024 * 1024)
# Assert
assert result == EvictionResult(evicted=[], freed_bytes=0)
assert store.lru_calls == [] # AC-1: no lru_candidates call on fast path
eviction_records = [r for r in fake_sink.records if r.kind == "c6.eviction_batch"]
assert eviction_records == []
def test_ac2_single_tile_eviction_frees_enough(
fake_sink: FakeFdrSink, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange — 10 GB budget, 9.99 GB used → 10 MB head-room.
# One LRU candidate of 50 MB; we ask for 30 MB more.
budget = 10 * 1024**3
used = budget - 10 * 1024 * 1024
fill = _persistent_row(
tile_id_seed=(18, 49.94, 36.31),
disk_bytes=used - 50 * 1024 * 1024,
accessed_at=datetime(2026, 4, 1, tzinfo=timezone.utc),
)
candidate = _persistent_row(
tile_id_seed=(18, 49.95, 36.32),
disk_bytes=50 * 1024 * 1024,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc),
)
# _FakeStore.lru_candidates returns insertion order → candidate is the
# LRU pick because it's first in the list.
store = _FakeStore(rows=[candidate, fill])
enforcer = _build_enforcer(store, fake_sink, budget_bytes=budget)
fake_sink.records.clear()
# Act
with caplog.at_level(logging.INFO, logger="c6_tile_cache.budget.test"):
result = enforcer.reserve_headroom(30 * 1024 * 1024)
# Assert
assert [md.tile_id for md in result.evicted] == [candidate.metadata.tile_id]
assert result.freed_bytes == 50 * 1024 * 1024
info_logs = [rec for rec in caplog.records if getattr(rec, "kind", "") == "c6.evicted"]
assert len(info_logs) == 1
eviction_records = [r for r in fake_sink.records if r.kind == "c6.eviction_batch"]
assert len(eviction_records) == 1
assert eviction_records[0].payload["evicted_count"] == 1
assert eviction_records[0].payload["freed_bytes"] == 50 * 1024 * 1024
def test_ac3_multi_tile_eviction_iterates_until_target(fake_sink: FakeFdrSink) -> None:
# Arrange — 10 candidates of 5 MB each; need to free 30 MB.
candidates = [
_persistent_row(
tile_id_seed=(18, 49.0 + i * 0.001, 36.0),
disk_bytes=5 * 1024 * 1024,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc) + timedelta(minutes=i),
)
for i in range(10)
]
# Budget configured to be exactly current → 30 MB shortfall on a 30 MB ask.
used = sum(c.disk_bytes for c in candidates)
store = _FakeStore(rows=list(candidates))
enforcer = _build_enforcer(store, fake_sink, budget_bytes=used)
# Act
result = enforcer.reserve_headroom(30 * 1024 * 1024)
# Assert — exactly 6 evictions (6 of 5 MB = 30 MB shortfall).
assert len(result.evicted) == 6
assert result.freed_bytes == 30 * 1024 * 1024
# The 7th onwards are still in the fake store (i.e. not evicted).
assert len(store.delete_calls) == 6
def test_ac4_eviction_batches_respect_batch_size(fake_sink: FakeFdrSink) -> None:
# Arrange — 100 candidates of 1 MB each; batch size 32; need 50 MB.
candidates = [
_persistent_row(
tile_id_seed=(18, 49.0 + i * 0.001, 36.0),
disk_bytes=1 * 1024 * 1024,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc) + timedelta(minutes=i),
)
for i in range(100)
]
used = sum(c.disk_bytes for c in candidates)
store = _FakeStore(rows=list(candidates))
enforcer = _build_enforcer(store, fake_sink, budget_bytes=used, eviction_batch_size=32)
# Act
result = enforcer.reserve_headroom(50 * 1024 * 1024)
# Assert
assert result.freed_bytes == 50 * 1024 * 1024
# lru_candidates must be called with max_count=32 each time.
assert all(call == 32 for call in store.lru_calls)
# Two SELECTs cover candidates [0..31] (32) + [32..49] (18 needed, 32 returned).
assert len(store.lru_calls) == 2
def test_ac5_insufficient_candidates_raises_after_full_sweep(fake_sink: FakeFdrSink) -> None:
# Arrange — only 100 MB worth of candidates exist; we ask for 1 GB.
candidates = [
_persistent_row(
tile_id_seed=(18, 49.0 + i * 0.001, 36.0),
disk_bytes=10 * 1024 * 1024,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc) + timedelta(minutes=i),
)
for i in range(10)
]
used = sum(c.disk_bytes for c in candidates)
store = _FakeStore(rows=list(candidates))
enforcer = _build_enforcer(store, fake_sink, budget_bytes=used, eviction_batch_size=32)
# Act + Assert
with pytest.raises(CacheBudgetExhaustedError) as excinfo:
enforcer.reserve_headroom(1 * 1024**3)
# All candidates evicted before the raise (partial-eviction principle).
assert excinfo.value.evicted_count == 10
assert excinfo.value.needed_bytes == 1 * 1024**3
# No candidates remain → next total_disk_bytes would be 0.
assert len(store.delete_calls) == 10
def test_ac7_decorator_propagates_wrapped_errors(fake_sink: FakeFdrSink) -> None:
# Arrange
store = _FakeStore()
enforcer = _build_enforcer(store, fake_sink, budget_bytes=10 * 1024**3)
class _RaisingStore:
def read_tile_pixels(self, _tile_id: TileId) -> object:
raise AssertionError("not exercised here")
def write_tile(self, _tile_blob: bytes, _metadata: TileMetadata) -> None:
raise ContentHashMismatchError("declared a..a, computed 0..0")
def tile_exists(self, _tile_id: TileId) -> bool:
return False
def delete_tile(self, _tile_id: TileId) -> bool:
return False
wrapper = BudgetEnforcedTileStore(wrapped=_RaisingStore(), enforcer=enforcer)
blob = b"\xff\xd8" + b"\x00" * 16
md = TileMetadata(
tile_id=TileId(zoom_level=18, lat=49.94, lon=36.31),
tile_size_meters=256.0,
tile_size_pixels=256,
capture_timestamp=datetime(2026, 5, 12, tzinfo=timezone.utc),
source=TileSource.GOOGLEMAPS,
content_sha256_hex=hashlib.sha256(blob).hexdigest(),
freshness_label=FreshnessLabel.FRESH,
flight_id=None,
companion_id=None,
quality_metadata=None,
voting_status=VotingStatus.TRUSTED,
)
# Act + Assert — decorator does NOT rewrap the underlying error.
with pytest.raises(ContentHashMismatchError, match=r"declared a\.\.a"):
wrapper.write_tile(blob, md)
def test_ac9_no_evict_path_uses_single_select(fake_sink: FakeFdrSink) -> None:
# Arrange — head-room exists; reserve_headroom should ONLY hit total_disk_bytes.
store = _FakeStore(
rows=[
_persistent_row(
tile_id_seed=(18, 49.94, 36.31),
disk_bytes=1024,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc),
)
]
)
enforcer = _build_enforcer(store, fake_sink, budget_bytes=10 * 1024**3)
# Reset the counter so we ignore the construction-time read.
store.total_disk_bytes_calls = 0
# Act
enforcer.reserve_headroom(1024)
# Assert
assert store.total_disk_bytes_calls == 1
assert store.lru_calls == []
assert store.delete_calls == []
def test_ac11_fdr_eviction_batch_payload_caps_tile_ids_at_5(
fake_sink: FakeFdrSink,
) -> None:
# Arrange — 100 candidates of 1 MB each; force 100 evictions.
candidates = [
_persistent_row(
tile_id_seed=(18, 49.0 + i * 0.0001, 36.0),
disk_bytes=1 * 1024 * 1024,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc) + timedelta(minutes=i),
)
for i in range(100)
]
used = sum(c.disk_bytes for c in candidates)
store = _FakeStore(rows=list(candidates))
enforcer = _build_enforcer(store, fake_sink, budget_bytes=used, eviction_batch_size=32)
fake_sink.records.clear()
# Act — force ~100 evictions worth of free.
enforcer.reserve_headroom(100 * 1024 * 1024)
# Assert
eviction_records = [r for r in fake_sink.records if r.kind == "c6.eviction_batch"]
assert len(eviction_records) == 1
payload = eviction_records[0].payload
assert payload["evicted_count"] == 100
assert len(payload["evicted_tile_ids"]) == 5 # bounded
def test_reliability_delete_returns_false_logs_and_continues(
fake_sink: FakeFdrSink, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange — first candidate raced away; second candidate is real.
raced = _persistent_row(
tile_id_seed=(18, 49.94, 36.31),
disk_bytes=10 * 1024 * 1024,
accessed_at=datetime(2026, 4, 1, tzinfo=timezone.utc),
)
second = _persistent_row(
tile_id_seed=(18, 49.95, 36.32),
disk_bytes=10 * 1024 * 1024,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc),
)
store = _FakeStore(
rows=[raced, second],
delete_returns={raced.metadata.tile_id: False},
)
enforcer = _build_enforcer(store, fake_sink, budget_bytes=raced.disk_bytes + second.disk_bytes)
# Act
with caplog.at_level(logging.INFO, logger="c6_tile_cache.budget.test"):
result = enforcer.reserve_headroom(15 * 1024 * 1024)
# Assert — both are counted as freed (spec § Exclusions).
already_gone = [
rec for rec in caplog.records if getattr(rec, "kind", "") == "c6.evict.already_gone"
]
assert len(already_gone) == 1
assert result.freed_bytes == 20 * 1024 * 1024
assert len(store.delete_calls) == 2
def test_reliability_delete_raises_tile_fs_error_logs_and_continues(
fake_sink: FakeFdrSink, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange — delete raises TileFsError but row delete succeeded under it.
candidate = _persistent_row(
tile_id_seed=(18, 49.94, 36.31),
disk_bytes=20 * 1024 * 1024,
accessed_at=datetime(2026, 4, 1, tzinfo=timezone.utc),
)
store = _FakeStore(
rows=[candidate],
delete_raises={candidate.metadata.tile_id: TileFsError("unlink failed")},
)
enforcer = _build_enforcer(store, fake_sink, budget_bytes=candidate.disk_bytes)
# Act
with caplog.at_level(logging.WARNING, logger="c6_tile_cache.budget.test"):
result = enforcer.reserve_headroom(10 * 1024 * 1024)
# Assert
fs_errors = [rec for rec in caplog.records if getattr(rec, "kind", "") == "c6.evict.fs_error"]
assert len(fs_errors) == 1
assert result.freed_bytes == candidate.disk_bytes
def test_nfr_perf_no_evict_path_p99_under_5ms(fake_sink: FakeFdrSink) -> None:
# Arrange — head-room exists.
store = _FakeStore(
rows=[
_persistent_row(
tile_id_seed=(18, 49.94, 36.31),
disk_bytes=1024,
accessed_at=datetime(2026, 5, 1, tzinfo=timezone.utc),
)
]
)
enforcer = _build_enforcer(store, fake_sink, budget_bytes=10 * 1024**3)
durations_us: list[float] = []
# Act — 1000 reps is enough for a stable p99 on the no-DB path.
for _ in range(1000):
t0 = time.perf_counter()
enforcer.reserve_headroom(1024)
durations_us.append((time.perf_counter() - t0) * 1_000_000.0)
# Assert — relaxed 5 ms ceiling matches the AZ-308 NFR text.
durations_us.sort()
p99 = durations_us[int(0.99 * len(durations_us))]
assert p99 < 5_000.0, f"reserve_headroom p99={p99:.1f} us exceeds 5 ms ceiling"
# ======================================================================
# Docker integration tests (real Postgres + filesystem)
# ======================================================================
@pytest.fixture
def db_url() -> str:
url = os.environ.get("DB_URL")
if not url:
pytest.skip("DB_URL not set — start docker-compose.test.yml `db` service first")
return url
@pytest.fixture
def fresh_head_db(db_url: str) -> Iterator[str]:
tables = ", ".join(
(
"tile_freshness_rules",
"engine_cache_entries",
"manifests",
"tiles",
"sector_classifications",
"flights",
"alembic_version",
)
)
with psycopg.connect(db_url, autocommit=True) as conn:
with conn.cursor() as cur:
cur.execute(f"DROP TABLE IF EXISTS {tables} CASCADE")
block = C6TileCacheConfig(postgres_dsn=db_url)
apply_migrations(Config.with_blocks(c6_tile_cache=block))
yield db_url
@pytest.fixture
def pool(fresh_head_db: str) -> Iterator[ConnectionPool]:
p = ConnectionPool(
fresh_head_db, min_size=1, max_size=4, open=True, kwargs={"autocommit": False}
)
yield p
p.close()
@pytest.fixture
def real_store(
pool: ConnectionPool, tmp_path: Path, fake_sink: FakeFdrSink
) -> PostgresFilesystemStore:
from gps_denied_onboard.clock.wall_clock import WallClock
return PostgresFilesystemStore(
root_dir=tmp_path,
postgres_pool=pool,
sha256_sidecar=Sha256Sidecar,
wgs_converter=WgsConverter,
fdr_client=fake_sink, # type: ignore[arg-type]
logger=get_logger("c6_tile_cache.store.test"),
lru_clock=WallClock(),
)
@pytest.fixture
def future_clock_store(
pool: ConnectionPool, tmp_path: Path, fake_sink: FakeFdrSink
) -> PostgresFilesystemStore:
"""Store wired with a deterministic far-future clock for LRU tests.
Wall-clock parity between the host (Python) and the Postgres container
is not always tight on macOS/Colima — a sub-second skew can leave the
AZ-305 ``DEFAULT now()`` ``accessed_at`` after the host's
:meth:`time.time_ns`, so a real ``record_lru_access`` UPDATE with the
host's wall clock loses to ``GREATEST(accessed_at, %s)``. Pinning the
clock to a far-future timestamp removes that flakiness without
changing the production wiring (which uses ``WallClock``).
"""
return PostgresFilesystemStore(
root_dir=tmp_path,
postgres_pool=pool,
sha256_sidecar=Sha256Sidecar,
wgs_converter=WgsConverter,
fdr_client=fake_sink, # type: ignore[arg-type]
logger=get_logger("c6_tile_cache.store.test"),
lru_clock=_FakeClock(datetime(2099, 1, 1, tzinfo=timezone.utc)),
)
def _make_tile_blob(content: str) -> bytes:
return b"\xff\xd8\xff\xe0" + content.encode("ascii") + b"\x00" * 256 + b"\xff\xd9"
def _metadata_for(
blob: bytes,
*,
lat: float = 49.94,
lon: float = 36.31,
capture_timestamp: datetime | None = None,
) -> TileMetadata:
return TileMetadata(
tile_id=TileId(zoom_level=18, lat=lat, lon=lon),
tile_size_meters=256.0,
tile_size_pixels=256,
capture_timestamp=capture_timestamp or datetime(2026, 5, 12, tzinfo=timezone.utc),
source=TileSource.GOOGLEMAPS,
content_sha256_hex=hashlib.sha256(blob).hexdigest(),
freshness_label=FreshnessLabel.FRESH,
flight_id=None,
companion_id=None,
quality_metadata=None,
voting_status=VotingStatus.TRUSTED,
)
@_docker
def test_ac6_decorator_evicts_then_writes(
real_store: PostgresFilesystemStore, fake_sink: FakeFdrSink
) -> None:
# Arrange — fill cache with one larger tile, then construct an
# enforcer with a budget tight enough that the next write triggers
# eviction. Picking lat/lon coordinates with distinct tile cells.
seed_blob = _make_tile_blob("seed-tile")
seed_md = _metadata_for(seed_blob, lat=49.94, lon=36.31)
real_store.write_tile(seed_blob, seed_md)
seed_disk_bytes = real_store.total_disk_bytes()
# Budget = current disk bytes + 64 B (just barely room for nothing else).
enforcer = _build_enforcer(real_store, fake_sink, budget_bytes=seed_disk_bytes + 64)
wrapper = BudgetEnforcedTileStore(wrapped=real_store, enforcer=enforcer)
# Wait a tick + bump LRU on the seed so eviction picks it (this is
# the only candidate anyway, but exercising the read path proves the
# AC-8 LRU update fires).
handle = real_store.read_tile_pixels(seed_md.tile_id)
with handle:
pass
new_blob = _make_tile_blob("post-eviction-tile")
new_md = _metadata_for(new_blob, lat=50.0, lon=37.0)
# Act
wrapper.write_tile(new_blob, new_md)
# Assert
# Seed tile evicted, new tile present.
assert real_store.tile_exists(seed_md.tile_id) is False
assert real_store.tile_exists(new_md.tile_id) is True
# FDR batch emitted with trigger_tile_id pointing at the new tile.
eviction = [r for r in fake_sink.records if r.kind == "c6.eviction_batch"]
assert len(eviction) == 1
assert eviction[0].payload["trigger_tile_id"] == str(new_md.tile_id)
@_docker
def test_ac8_read_tile_pixels_updates_lru_clock(
future_clock_store: PostgresFilesystemStore,
) -> None:
# Arrange — both tiles get DEFAULT now() at INSERT. Then read A; the
# far-future fake clock guarantees A.accessed_at is bumped above
# B.accessed_at regardless of host/container clock skew.
blob_a = _make_tile_blob("ac8-a")
md_a = _metadata_for(blob_a, lat=49.94, lon=36.31)
blob_b = _make_tile_blob("ac8-b")
md_b = _metadata_for(blob_b, lat=50.0, lon=37.0)
future_clock_store.write_tile(blob_a, md_a)
future_clock_store.write_tile(blob_b, md_b)
handle = future_clock_store.read_tile_pixels(md_a.tile_id)
with handle:
pass
# Act
candidates = future_clock_store.lru_candidates(max_count=2)
# Assert — after the LRU-clock-driven read, A is now most-recently
# accessed and B is the oldest candidate.
assert candidates[0].metadata.tile_id == md_b.tile_id
assert candidates[1].metadata.tile_id == md_a.tile_id
@_docker
def test_ac10_synthetic_fill_keeps_disk_under_cap(
real_store: PostgresFilesystemStore, fake_sink: FakeFdrSink
) -> None:
# Arrange — pick a small synthetic cap so the fill is fast.
# 5 tiles of ~256 B each → 1.5 KB used. Cap at 1 KB → forces eviction.
seed_blobs: list[bytes] = []
seed_mds: list[TileMetadata] = []
for i in range(5):
b = _make_tile_blob(f"ac10-fill-{i}")
seed_blobs.append(b)
seed_mds.append(_metadata_for(b, lat=49.94 + i * 0.001, lon=36.31))
real_store.write_tile(b, seed_mds[-1])
current = real_store.total_disk_bytes()
cap = current # budget == used → next write evicts oldest tile.
enforcer = _build_enforcer(real_store, fake_sink, budget_bytes=cap)
wrapper = BudgetEnforcedTileStore(wrapped=real_store, enforcer=enforcer)
# Act — insert 5 more tiles; every write should keep disk <= cap.
fake_sink.records.clear()
for i in range(5, 10):
b = _make_tile_blob(f"ac10-overflow-{i}")
md = _metadata_for(b, lat=50.0 + i * 0.001, lon=37.0)
wrapper.write_tile(b, md)
assert real_store.total_disk_bytes() <= cap, (
f"iteration {i}: disk={real_store.total_disk_bytes()} cap={cap}"
)
# Assert — at least one eviction FDR record was emitted.
eviction_records = [r for r in fake_sink.records if r.kind == "c6.eviction_batch"]
assert eviction_records, "expected at least one c6.eviction_batch record"
@@ -56,13 +56,9 @@ from gps_denied_onboard.runtime_root.storage_factory import (
build_tile_store,
)
_CONTRACT_DIR = Path(__file__).resolve().parents[3] / (
"_docs/02_document/contracts/c6_tile_cache"
)
_CONTRACT_DIR = Path(__file__).resolve().parents[3] / ("_docs/02_document/contracts/c6_tile_cache")
_FAKE_IMPL_MODULE = "gps_denied_onboard.components.c6_tile_cache.faiss_descriptor_index"
_FAKE_STORE_MODULE = (
"gps_denied_onboard.components.c6_tile_cache.postgres_filesystem_store"
)
_FAKE_STORE_MODULE = "gps_denied_onboard.components.c6_tile_cache.postgres_filesystem_store"
def _valid_tile_id(zoom: int = 18, lat: float = 49.94, lon: float = 36.31) -> TileId:
@@ -320,6 +316,15 @@ def _install_fake_postgres_store_module() -> type:
# preserves the single-config-arg shape via this classmethod.
return cls(config)
# AZ-308: ``build_tile_store`` now wraps the store in a
# ``BudgetEnforcedTileStore`` whose constructor reads
# ``total_disk_bytes`` for the AC-12 startup log. Override the
# ``_FullTileMetadataStore`` NotImplementedError stub with a
# working zero-byte response so the factory can construct the
# wrapper without touching a real DB.
def total_disk_bytes(self) -> int:
return 0
fake_module = types.ModuleType(_FAKE_STORE_MODULE)
fake_module.PostgresFilesystemStore = _FakePostgresFilesystemStore # type: ignore[attr-defined]
sys.modules[_FAKE_STORE_MODULE] = fake_module
@@ -349,11 +354,21 @@ def test_ac5_build_descriptor_index_flag_off_raises_no_import(
def test_ac4_build_tile_store_returns_protocol_impl(store_module_cleanup) -> None:
# AZ-308: ``build_tile_store`` now returns a ``BudgetEnforcedTileStore``
# decorator wrapping the inner :class:`TileStore` impl. The decorator
# implements the Protocol surface; the wrapped instance is reachable
# via the private ``_wrapped`` attribute for tests that need to
# introspect the inner store.
from gps_denied_onboard.components.c6_tile_cache.cache_budget_enforcer import (
BudgetEnforcedTileStore,
)
fake_cls = _install_fake_postgres_store_module()
config = _config_with_c6()
store = build_tile_store(config)
assert isinstance(store, fake_cls)
assert isinstance(store, BudgetEnforcedTileStore)
assert isinstance(store, TileStore)
assert isinstance(store._wrapped, fake_cls) # type: ignore[attr-defined]
def test_ac4_build_tile_metadata_store_returns_protocol_impl(
@@ -366,9 +381,7 @@ def test_ac4_build_tile_metadata_store_returns_protocol_impl(
assert isinstance(md, TileMetadataStore)
def test_ac5_tile_store_runtime_module_missing_raises(
store_module_cleanup, monkeypatch
) -> None:
def test_ac5_tile_store_runtime_module_missing_raises(store_module_cleanup, monkeypatch) -> None:
"""AC-5 historical name; after AZ-305 the impl module always exists, so
"missing" is exercised by deleting it from ``sys.modules`` AND making
``importlib`` refuse the import. We patch the module-level lazy import
@@ -378,14 +391,18 @@ def test_ac5_tile_store_runtime_module_missing_raises(
config = _config_with_c6()
import gps_denied_onboard.runtime_root.storage_factory as factory_mod
real_import = __builtins__["__import__"] if isinstance(__builtins__, dict) else __builtins__.__import__
real_import = (
__builtins__["__import__"] if isinstance(__builtins__, dict) else __builtins__.__import__
)
def _block_postgres_import(name, *args, **kwargs):
if name.endswith("postgres_filesystem_store"):
raise ModuleNotFoundError(name)
return real_import(name, *args, **kwargs)
monkeypatch.setattr(factory_mod, "__builtins__", {"__import__": _block_postgres_import}, raising=False)
monkeypatch.setattr(
factory_mod, "__builtins__", {"__import__": _block_postgres_import}, raising=False
)
monkeypatch.setitem(sys.modules, _FAKE_STORE_MODULE, None) # type: ignore[arg-type]
with pytest.raises(RuntimeNotAvailableError) as exc_info:
build_tile_store(config)
@@ -428,9 +445,7 @@ def test_ac6_unknown_metadata_runtime_rejected() -> None:
({"zoom_level": 18, "lat": 0.0, "lon": -200.0}, "lon"),
],
)
def test_ac7_tile_id_rejects_bad_input(
kwargs: dict[str, float], offending_field: str
) -> None:
def test_ac7_tile_id_rejects_bad_input(kwargs: dict[str, float], offending_field: str) -> None:
with pytest.raises(ValueError) as exc_info:
TileId(**kwargs) # type: ignore[arg-type]
assert offending_field in str(exc_info.value)
@@ -504,9 +519,7 @@ def _methods_from_contract(contract_file: Path) -> set[str]:
def _protocol_methods(proto: type) -> set[str]:
"""Reflect over a Protocol's method names."""
return {
name
for name in dir(proto)
if not name.startswith("_") and callable(getattr(proto, name))
name for name in dir(proto) if not name.startswith("_") and callable(getattr(proto, name))
}
@@ -518,9 +531,7 @@ def _protocol_methods(proto: type) -> set[str]:
("descriptor_index.md", DescriptorIndex),
],
)
def test_ac9_contract_methods_match_protocol(
contract_filename: str, proto: type
) -> None:
def test_ac9_contract_methods_match_protocol(contract_filename: str, proto: type) -> None:
contract_path = _CONTRACT_DIR / contract_filename
contract_methods = _methods_from_contract(contract_path)
protocol_methods = _protocol_methods(proto)