Files
Oleksandr Bezdieniezhnykh dde838d2cc [AZ-304] C6 Postgres schema: additive 0002 migration + UUIDv5
Strictly additive Alembic migration on the AZ-263 baseline
(data_model.md § 6.1 / § 6.3): six new tiles columns (tile_uuid UNIQUE,
location_hash, content_sha256, disk_bytes, accessed_at, uploaded_at),
four new btree indices, one UNIQUE expression index over the
COALESCE-zero-uuid natural key, CHECK widening of
ck_tiles_freshness_status to the AZ-263 + AZ-303 vocabulary UNION,
four NULLable bbox columns on sector_classifications, and a new
tile_freshness_rules table seeded with the two default thresholds.

Pinned UUIDv5 namespace (TILE_NAMESPACE_UUID =
5b8d0c2e-1a4f-4b3a-8c9d-e7f6a3b2c1d0) + derive_tile_id /
derive_location_hash helpers cross-coordinated with
satellite-provider. Migration runner apply_migrations(config) drives
Alembic command.upgrade("head") against the AZ-263 env with one
retry on PG SQLSTATE 40001 and structured INFO logs on apply / no-op.

Contract bump tile_metadata_store.md v1.1.0 -> v1.2.0 adds
TileMetadata.location_hash: UUID | None = None (non-breaking).
module-layout.md updated so c6_tile_cache explicitly Owns
db/migrations/**.

Tier-1 tests: UUIDv5 determinism + locked vectors + DSN resolution +
retry mocked DBAPIError -> 1180 passed, 32 skipped. Tier-2 docker
schema tests gated by @pytest.mark.docker run against the existing
docker-compose.test.yml db service.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 17:05:41 +03:00

679 lines
21 KiB
Python

"""AZ-304 — Schema-shape diff + per-AC integration tests against a real Postgres.
All tests in this module are ``@pytest.mark.docker`` (via the
module-level ``pytestmark``); they are auto-skipped on Tier-1 by
``tests/conftest.py`` so the project-wide unit suite stays hermetic. To
run locally: ``docker compose -f docker-compose.test.yml up -d db && \
GPS_DENIED_TIER=2 DB_URL=postgresql://gps_denied:dev@localhost:5432/gps_denied \
pytest tests/unit/c6_tile_cache/test_postgres_schema.py``.
"""
from __future__ import annotations
import dataclasses
import logging
import os
import time
from collections.abc import Iterator
from pathlib import Path
from uuid import UUID, uuid4
import psycopg
import pytest
from alembic import command
from alembic.config import Config as AlembicConfig
from gps_denied_onboard.components.c6_tile_cache._uuid_namespace import (
TILE_NAMESPACE_UUID,
derive_location_hash,
derive_tile_id,
)
from gps_denied_onboard.components.c6_tile_cache.config import C6TileCacheConfig
from gps_denied_onboard.components.c6_tile_cache.migrations import (
MigrationResult,
apply_migrations,
)
from gps_denied_onboard.config.schema import Config
# Module-level mark: every test in this file is a docker (Tier-2) test,
# as described in the module docstring.
pytestmark = pytest.mark.docker
# Fourth ancestor directory of this file's resolved path; alembic.ini and
# db/migrations are looked up beneath it.
_PROJECT_ROOT = Path(__file__).resolve().parents[3]
_ALEMBIC_INI = _PROJECT_ROOT / "alembic.ini"
_ALEMBIC_SCRIPT_LOCATION = _PROJECT_ROOT / "db" / "migrations"
# Alembic revision identifiers: the AZ-263 baseline and the AZ-304 revision
# exercised by these tests.
_AZ263_REV = "0001_initial"
_AZ304_REV = "0002_c6_tile_identity_and_lru"
# Tables removed by the ``fresh_db`` fixture in a single
# ``DROP TABLE IF EXISTS ... CASCADE`` statement.
_FRESH_BAG_TABLES = (
    "tile_freshness_rules",
    "engine_cache_entries",
    "manifests",
    "tiles",
    "sector_classifications",
    "flights",
    "alembic_version",
)
# ``tiles`` columns the tests assert are still present after the upgrade
# (AC-1) and after the downgrade (AC-6).
_AZ263_TILE_COLUMNS = {
    "id",
    "zoom_level",
    "tile_x",
    "tile_y",
    "latitude",
    "longitude",
    "tile_size_meters",
    "tile_size_pixels",
    "capture_timestamp",
    "compression",
    "crs",
    "source",
    "flight_id",
    "companion_id",
    "tile_quality_metadata",
    "voting_status",
    "freshness_status",
    "signature",
    "created_at",
    "updated_at",
}
# ``tiles`` columns the tests assert are present after the upgrade and
# absent again after the downgrade.
_AZ304_TILE_COLUMNS = {
    "tile_uuid",
    "location_hash",
    "content_sha256",
    "disk_bytes",
    "accessed_at",
    "uploaded_at",
}
# Index names asserted to exist on ``tiles`` after the upgrade (AC-1).
_AZ304_TILE_INDICES = {
    "idx_tiles_natural_key",
    "idx_tiles_location_hash",
    "idx_tiles_accessed_at",
    "idx_tiles_pending_upload",
    "idx_tiles_flight_captured",
}
# Baseline index names that AC-1 also asserts are still on ``tiles``.
_AZ263_TILE_INDICES = {
    "ix_tiles_zxy",
    "ix_tiles_lat_lon",
    "ix_tiles_voting_status_onboard",
    "ix_tiles_flight_id",
    "ix_tiles_created_at",
}
def _to_sqlalchemy_url(raw_dsn: str) -> str:
    """Return *raw_dsn* with its scheme rewritten to ``postgresql+psycopg://``.

    Both libpq URI spellings are recognised: ``postgresql://`` and the
    ``postgres://`` alias. The alias is accepted by ``psycopg.connect`` but
    is not a dialect name SQLAlchemy knows, so it has to be rewritten too
    before the URL is handed to Alembic. Only the leading scheme is
    replaced; any other DSN (already driver-qualified, another dialect,
    empty string) is returned unchanged.
    """
    for scheme in ("postgresql://", "postgres://"):
        if raw_dsn.startswith(scheme):
            return "postgresql+psycopg://" + raw_dsn[len(scheme):]
    return raw_dsn
def _alembic_config(sqlalchemy_url: str) -> AlembicConfig:
    """Build an Alembic config for this repo's ini file, script dir and URL.

    Args:
        sqlalchemy_url: Database URL stored as the ``sqlalchemy.url`` main
            option. Pass it raw (unescaped); escaping happens here.

    Returns:
        An ``AlembicConfig`` loaded from ``_ALEMBIC_INI`` with
        ``script_location`` pointing at ``_ALEMBIC_SCRIPT_LOCATION``.
    """
    cfg = AlembicConfig(str(_ALEMBIC_INI))
    cfg.set_main_option("script_location", str(_ALEMBIC_SCRIPT_LOCATION))
    # set_main_option stores values through ConfigParser interpolation, where
    # a bare "%" is syntax. Percent-encoded credentials (e.g. "%40") would
    # raise ValueError unless every "%" is doubled; reading the option back
    # through the config un-escapes it again.
    cfg.set_main_option("sqlalchemy.url", sqlalchemy_url.replace("%", "%%"))
    return cfg
def _build_config(dsn: str) -> Config:
    """Return a ``Config`` whose ``c6_tile_cache`` block uses *dsn* as its Postgres DSN."""
    return Config.with_blocks(c6_tile_cache=C6TileCacheConfig(postgres_dsn=dsn))
def _exec(conn: psycopg.Connection, sql: str, params: tuple[object, ...] | None = None) -> None:
    """Run *sql* on a short-lived cursor of *conn*, discarding any result.

    A missing or empty *params* is passed to the cursor as ``()``.
    """
    bound = params if params else ()
    with conn.cursor() as cursor:
        cursor.execute(sql, bound)
def _fetchone(
    conn: psycopg.Connection, sql: str, params: tuple[object, ...] | None = None
) -> tuple[object, ...] | None:
    """Run *sql* on a short-lived cursor and return whatever ``fetchone()`` yields.

    A missing or empty *params* is passed to the cursor as ``()``.
    """
    bound = params if params else ()
    with conn.cursor() as cursor:
        cursor.execute(sql, bound)
        first_row = cursor.fetchone()
    return first_row
def _fetchall(
    conn: psycopg.Connection, sql: str, params: tuple[object, ...] | None = None
) -> list[tuple[object, ...]]:
    """Run *sql* on a short-lived cursor and return whatever ``fetchall()`` yields.

    A missing or empty *params* is passed to the cursor as ``()``.
    """
    bound = params if params else ()
    with conn.cursor() as cursor:
        cursor.execute(sql, bound)
        all_rows = cursor.fetchall()
    return all_rows
def _column_metadata(conn: psycopg.Connection, table: str) -> dict[str, tuple[object, ...]]:
    """Map each column of ``public.<table>`` to ``(data_type, is_nullable, column_default)``.

    The values come straight from ``information_schema.columns``; a table
    with no matching rows yields an empty dict.
    """
    query = (
        "\n"
        "SELECT column_name, data_type, is_nullable, column_default\n"
        "FROM information_schema.columns\n"
        "WHERE table_schema = 'public' AND table_name = %s\n"
    )
    metadata: dict[str, tuple[object, ...]] = {}
    for record in _fetchall(conn, query, (table,)):
        metadata[str(record[0])] = (record[1], record[2], record[3])
    return metadata
def _index_names(conn: psycopg.Connection, table: str) -> set[str]:
    """Return the names of all indexes ``pg_indexes`` lists for ``public.<table>``."""
    query = (
        "\n"
        "SELECT indexname FROM pg_indexes\n"
        "WHERE schemaname = 'public' AND tablename = %s\n"
    )
    names: set[str] = set()
    for record in _fetchall(conn, query, (table,)):
        names.add(str(record[0]))
    return names
def _check_constraint_source(conn: psycopg.Connection, name: str) -> str | None:
    """Return ``pg_get_constraintdef`` text for the CHECK constraint called *name*.

    Returns ``None`` when no CHECK constraint with that name is found or
    when the definition comes back NULL.
    """
    query = (
        "\n"
        "SELECT pg_get_constraintdef(c.oid)\n"
        "FROM pg_constraint c\n"
        "WHERE c.contype = 'c' AND c.conname = %s\n"
    )
    record = _fetchone(conn, query, (name,))
    if record is None or record[0] is None:
        return None
    return str(record[0])
@pytest.fixture
def db_url() -> str:
    """Provide the DSN from the ``DB_URL`` environment variable, skipping when unset or empty."""
    dsn = os.environ.get("DB_URL", "")
    if dsn:
        return dsn
    pytest.skip("DB_URL not set — start docker-compose.test.yml `db` service first")
@pytest.fixture
def fresh_db(db_url: str) -> Iterator[str]:
    """Drop every table named in ``_FRESH_BAG_TABLES``, then yield the unchanged DSN."""
    drop_statement = f"DROP TABLE IF EXISTS {', '.join(_FRESH_BAG_TABLES)} CASCADE"
    with psycopg.connect(db_url, autocommit=True) as conn:
        _exec(conn, drop_statement)
    yield db_url
    # No teardown on purpose: the next test's fresh_db performs the drop again.
@pytest.fixture
def baselined_db(fresh_db: str) -> str:
    """Upgrade the freshly-dropped database to ``_AZ263_REV`` only and return its DSN."""
    baseline_cfg = _alembic_config(_to_sqlalchemy_url(fresh_db))
    command.upgrade(baseline_cfg, _AZ263_REV)
    return fresh_db
# ----------------------------------------------------------------------
# AC-1 / AC-9: apply on AZ-263-baselined DB; AZ-263 columns unchanged.
def test_ac1_apply_creates_additive_artifacts(baselined_db: str) -> None:
    """Applying on the baseline reports the AZ-304 revision and leaves old + new artifacts in place."""
    # Arrange + Act
    outcome = apply_migrations(_build_config(baselined_db))

    # Assert: runner result
    assert outcome.applied == [_AZ304_REV]
    assert outcome.current_revision == _AZ304_REV
    assert outcome.no_op is False

    # Assert: schema artifacts
    with psycopg.connect(baselined_db) as conn:
        tiles_cols = _column_metadata(conn, "tiles")
        for col in _AZ304_TILE_COLUMNS:
            assert col in tiles_cols, f"tiles.{col} missing post-0002"
        for col in _AZ263_TILE_COLUMNS:
            assert col in tiles_cols, f"AZ-263 tiles.{col} dropped by 0002 — regression"

        tiles_idx = _index_names(conn, "tiles")
        expected_indices = _AZ263_TILE_INDICES | _AZ304_TILE_INDICES
        for idx in expected_indices:
            assert idx in tiles_idx, f"tiles index {idx!r} missing"

        sector_cols = _column_metadata(conn, "sector_classifications")
        bbox_columns = ("min_lat", "min_lon", "max_lat", "max_lon")
        for col in bbox_columns:
            assert col in sector_cols, f"sector_classifications.{col} missing"

        seeded = _fetchall(
            conn, "SELECT classification, max_age_seconds, action FROM tile_freshness_rules"
        )
        assert len(seeded) == 2
def test_ac9_az263_columns_byte_identical(baselined_db: str) -> None:
    """Column metadata that existed before 0002 compares equal after it is applied."""

    def snapshot() -> list[dict[str, tuple[object, ...]]]:
        # One connection per snapshot; tables are read in a fixed order.
        with psycopg.connect(baselined_db) as conn:
            return [
                _column_metadata(conn, table)
                for table in ("tiles", "flights", "sector_classifications")
            ]

    # Arrange: snapshot pre-migration column metadata.
    before, before_flights, before_sector = snapshot()

    # Act
    apply_migrations(_build_config(baselined_db))

    # Assert
    after, after_flights, after_sector = snapshot()
    for col, meta in before.items():
        assert after.get(col) == meta, f"tiles.{col} drifted: {meta} -> {after.get(col)}"
    assert after_flights == before_flights, "flights columns drifted"
    # sector_classifications gains columns in 0002, so only the pre-existing
    # ones are compared.
    for col, meta in before_sector.items():
        assert after_sector.get(col) == meta, f"sector_classifications.{col} drifted"
# ----------------------------------------------------------------------
# AC-2: no-op at head.
def test_ac2_apply_is_noop_at_head(baselined_db: str) -> None:
    """A second ``apply_migrations`` call on the same config applies nothing."""
    # Arrange: first call brings the database to head.
    shared_config = _build_config(baselined_db)
    apply_migrations(shared_config)

    # Act
    rerun = apply_migrations(shared_config)

    # Assert
    assert rerun.applied == []
    assert rerun.no_op is True
    assert rerun.current_revision == _AZ304_REV
# ----------------------------------------------------------------------
# AC-3: widened freshness_status CHECK + new CHECKs exist.
def test_ac3_freshness_check_widened(baselined_db: str) -> None:
    """After 0002, ``ck_tiles_freshness_status`` mentions all six quoted status values."""
    expected_literals = (
        "'fresh'",
        "'stale_warn'",
        "'stale_reject'",
        "'stale_active_conflict'",
        "'stale_rear'",
        "'downgraded'",
    )

    # Act
    apply_migrations(_build_config(baselined_db))

    # Assert
    with psycopg.connect(baselined_db) as conn:
        defn = _check_constraint_source(conn, "ck_tiles_freshness_status")
    assert defn is not None
    for value in expected_literals:
        assert value in defn, f"widened CHECK missing {value}; got: {defn}"
def test_ac3_new_check_constraints_present(baselined_db: str) -> None:
    """The three CHECK constraints added by 0002 exist and contain the expected fragments."""
    # Constraint name -> substrings its definition must contain.
    expectations = (
        ("ck_tiles_content_sha256_len", ("length(content_sha256) = 64",)),
        ("ck_tiles_disk_bytes_nonneg", ("disk_bytes >= 0",)),
        ("ck_tfr_action", ("reject", "downgrade")),
    )

    # Act
    apply_migrations(_build_config(baselined_db))

    # Assert
    with psycopg.connect(baselined_db) as conn:
        definitions = [_check_constraint_source(conn, name) for name, _ in expectations]
    for (_, fragments), definition in zip(expectations, definitions):
        assert definition is not None and all(piece in definition for piece in fragments)
# ----------------------------------------------------------------------
# AC-4 + AC-4b: natural-key UNIQUE allows per-flight separation, rejects duplicates.
def _insert_tile(
    conn: psycopg.Connection,
    *,
    zoom_level: int,
    tile_x: int,
    tile_y: int,
    source: str,
    flight_id: UUID | None,
    content_sha256: str,
    flight_table_id: UUID | None = None,
) -> None:
    """Insert one ``tiles`` row with plain SQL for the natural-key tests.

    ``tile_uuid`` comes from ``derive_tile_id`` and ``location_hash`` from
    ``derive_location_hash``. The ``flight_id`` column stores
    *flight_table_id* when one is given, otherwise *flight_id*. Every
    other column written here is a fixed literal in the statement.
    """
    insert_sql = (
        "\n"
        "INSERT INTO tiles (\n"
        "zoom_level, tile_x, tile_y, latitude, longitude,\n"
        "tile_size_meters, tile_size_pixels, capture_timestamp, source,\n"
        "flight_id,\n"
        "tile_uuid, location_hash, content_sha256, disk_bytes\n"
        ") VALUES (\n"
        "%s, %s, %s, 0.0, 0.0,\n"
        "256.0, 256, now(), %s,\n"
        "%s,\n"
        "%s, %s, %s, 1024\n"
        ")\n"
    )
    derived_uuid = derive_tile_id(zoom_level, tile_x, tile_y, source, flight_id)
    derived_hash = derive_location_hash(zoom_level, tile_x, tile_y)
    stored_flight = flight_id if flight_table_id is None else flight_table_id
    row_values = (
        zoom_level,
        tile_x,
        tile_y,
        source,
        stored_flight,
        derived_uuid,
        derived_hash,
        content_sha256,
    )
    _exec(conn, insert_sql, row_values)
def test_ac4_natural_key_allows_different_flights_same_cell(baselined_db: str) -> None:
    """Two flights can each store a tile for the same z/x/y cell and source."""
    # Arrange
    apply_migrations(_build_config(baselined_db))
    flight_a, flight_b = uuid4(), uuid4()
    per_flight_payload = ((flight_a, "a" * 64), (flight_b, "b" * 64))

    # Act
    with psycopg.connect(baselined_db, autocommit=True) as conn:
        _exec(
            conn,
            "INSERT INTO flights (id, companion_id, started_at) VALUES (%s, 'comp', now()), (%s, 'comp', now())",
            (flight_a, flight_b),
        )
        for flight, digest in per_flight_payload:
            _insert_tile(
                conn,
                zoom_level=18,
                tile_x=10,
                tile_y=20,
                source="onboard_ingest",
                flight_id=flight,
                content_sha256=digest,
            )

        # Assert
        rows = _fetchall(
            conn,
            "SELECT tile_uuid, location_hash FROM tiles WHERE tile_x=10 AND tile_y=20",
        )
        assert len(rows) == 2
        distinct_uuids = {uuid_value for uuid_value, _ in rows}
        distinct_hashes = {hash_value for _, hash_value in rows}
        assert len(distinct_uuids) == 2, "per-flight tile_uuid collision"
        assert len(distinct_hashes) == 1, "location_hash should match across flights"
def test_ac4b_natural_key_rejects_duplicate_flight_insert(baselined_db: str) -> None:
    """A second row with the same z/x/y/source and NULL flight raises ``UniqueViolation``."""
    duplicate_sql = (
        "\n"
        "INSERT INTO tiles (\n"
        "zoom_level, tile_x, tile_y, latitude, longitude,\n"
        "tile_size_meters, tile_size_pixels, capture_timestamp, source,\n"
        "tile_uuid, location_hash, content_sha256, disk_bytes\n"
        ") VALUES (\n"
        "18, 30, 40, 0.0, 0.0,\n"
        "256.0, 256, now(), 'googlemaps',\n"
        "%s, %s, %s, 1024\n"
        ")\n"
    )

    # Arrange
    apply_migrations(_build_config(baselined_db))

    # Act + Assert
    with psycopg.connect(baselined_db, autocommit=True) as conn:
        _insert_tile(
            conn,
            zoom_level=18,
            tile_x=30,
            tile_y=40,
            source="googlemaps",
            flight_id=None,
            content_sha256="c" * 64,
        )
        with pytest.raises(psycopg.errors.UniqueViolation):
            # Same natural key as the first row (both have flight_id NULL).
            # tile_uuid, location_hash and content_sha256 all differ from the
            # first row, so the violation has to come from the natural-key
            # index rather than another UNIQUE constraint.
            _exec(conn, duplicate_sql, (uuid4(), uuid4(), "d" * 64))
# ----------------------------------------------------------------------
# AC-5: widened CHECK accepts all six values; rejects bogus.
@pytest.mark.parametrize(
    "freshness_value",
    [
        "fresh",
        "stale_warn",
        "stale_reject",
        "stale_active_conflict",
        "stale_rear",
        "downgraded",
    ],
)
def test_ac5_widened_check_accepts_union_values(baselined_db: str, freshness_value: str) -> None:
    """A tile row carrying *freshness_value* can be inserted and read back."""
    insert_sql = (
        "\n"
        "INSERT INTO tiles (\n"
        "zoom_level, tile_x, tile_y, latitude, longitude,\n"
        "tile_size_meters, tile_size_pixels, capture_timestamp, source,\n"
        "tile_uuid, location_hash, content_sha256, disk_bytes,\n"
        "freshness_status\n"
        ") VALUES (\n"
        "18, 1, 1, 0.0, 0.0,\n"
        "256.0, 256, now(), 'googlemaps',\n"
        "%s, %s, %s, 1024,\n"
        "%s\n"
        ")\n"
    )

    # Arrange
    apply_migrations(_build_config(baselined_db))

    # Act
    with psycopg.connect(baselined_db, autocommit=True) as conn:
        _exec(conn, insert_sql, (uuid4(), uuid4(), "e" * 64, freshness_value))

        # Assert
        stored = _fetchall(
            conn,
            "SELECT freshness_status FROM tiles WHERE freshness_status = %s",
            (freshness_value,),
        )
        assert any(record[0] == freshness_value for record in stored)
def test_ac5_widened_check_rejects_bogus(baselined_db: str) -> None:
    """Inserting ``freshness_status = 'bogus'`` raises ``CheckViolation``."""
    bogus_sql = (
        "\n"
        "INSERT INTO tiles (\n"
        "zoom_level, tile_x, tile_y, latitude, longitude,\n"
        "tile_size_meters, tile_size_pixels, capture_timestamp, source,\n"
        "tile_uuid, location_hash, content_sha256, disk_bytes,\n"
        "freshness_status\n"
        ") VALUES (\n"
        "18, 2, 2, 0.0, 0.0,\n"
        "256.0, 256, now(), 'googlemaps',\n"
        "%s, %s, %s, 1024,\n"
        "'bogus'\n"
        ")\n"
    )

    # Arrange
    apply_migrations(_build_config(baselined_db))

    # Act + Assert
    with psycopg.connect(baselined_db, autocommit=True) as conn:
        with pytest.raises(psycopg.errors.CheckViolation):
            _exec(conn, bogus_sql, (uuid4(), uuid4(), "f" * 64))
# ----------------------------------------------------------------------
# AC-6: down migration reverses cleanly; subsequent upgrade re-applies.
def test_ac6_downgrade_reverses_cleanly(baselined_db: str) -> None:
    """Downgrading one step removes the 0002 artifacts; upgrading to head restores the columns."""
    # Arrange
    apply_migrations(_build_config(baselined_db))
    alembic_cfg = _alembic_config(_to_sqlalchemy_url(baselined_db))

    # Act: downgrade one revision
    command.downgrade(alembic_cfg, "-1")

    # Assert: AZ-304 artifacts gone, AZ-263 baseline intact.
    with psycopg.connect(baselined_db) as conn:
        tiles_cols = _column_metadata(conn, "tiles")
        for col in _AZ304_TILE_COLUMNS:
            assert col not in tiles_cols, f"tiles.{col} should be dropped after downgrade"
        for col in _AZ263_TILE_COLUMNS:
            assert col in tiles_cols, f"AZ-263 tiles.{col} dropped by downgrade — regression"

        # to_regclass yields NULL when the relation does not exist.
        rules_regclass = _fetchone(
            conn,
            "SELECT to_regclass('public.tile_freshness_rules')",
        )
        assert rules_regclass is not None and rules_regclass[0] is None

        defn = _check_constraint_source(conn, "ck_tiles_freshness_status")
        assert defn is not None
        # The three values asserted absent here are among those
        # test_ac3_freshness_check_widened expects after the upgrade.
        assert "'stale_active_conflict'" not in defn
        assert "'stale_rear'" not in defn
        assert "'downgraded'" not in defn

    # Act: re-upgrade
    command.upgrade(alembic_cfg, "head")

    # Assert: clean re-apply
    with psycopg.connect(baselined_db) as conn:
        restored_cols = _column_metadata(conn, "tiles")
        for col in _AZ304_TILE_COLUMNS:
            assert col in restored_cols
# ----------------------------------------------------------------------
# AC-7: seed rows present with documented values.
def test_ac7_freshness_rules_seeded(baselined_db: str) -> None:
    """``tile_freshness_rules`` holds exactly the two expected seed rows after apply."""
    expected_rules = [
        ("active_conflict", 15552000, "reject"),
        ("stable_rear", 31104000, "downgrade"),
    ]

    # Act
    apply_migrations(_build_config(baselined_db))

    # Assert
    with psycopg.connect(baselined_db) as conn:
        seeded = _fetchall(
            conn,
            "SELECT classification, max_age_seconds, action FROM tile_freshness_rules ORDER BY classification",
        )
    assert seeded == expected_rules
# ----------------------------------------------------------------------
# AC-8: log INFO records carry kind / namespace_uuid.
def test_ac8_apply_logs_kind_applied(baselined_db: str, caplog: pytest.LogCaptureFixture) -> None:
    """Applying emits exactly one record of kind ``c6.migration.applied`` with the expected ``kv``."""
    # Arrange
    caplog.set_level(logging.INFO, logger="c6_tile_cache.migrations")

    # Act
    apply_migrations(_build_config(baselined_db))

    # Assert
    matching = []
    for record in caplog.records:
        if getattr(record, "kind", None) == "c6.migration.applied":
            matching.append(record)
    assert len(matching) == 1
    payload = getattr(matching[0], "kv", {})
    assert payload.get("revisions") == [_AZ304_REV]
    assert payload.get("namespace_uuid") == str(TILE_NAMESPACE_UUID)
def test_ac8_noop_logs_kind_no_op(baselined_db: str, caplog: pytest.LogCaptureFixture) -> None:
    """A second apply emits exactly one record of kind ``c6.migration.no_op`` with the expected ``kv``."""
    # Arrange: the first apply brings the DB to head; its records are discarded.
    apply_migrations(_build_config(baselined_db))
    caplog.clear()
    caplog.set_level(logging.INFO, logger="c6_tile_cache.migrations")

    # Act: the second apply has nothing left to do.
    apply_migrations(_build_config(baselined_db))

    # Assert
    matching = []
    for record in caplog.records:
        if getattr(record, "kind", None) == "c6.migration.no_op":
            matching.append(record)
    assert len(matching) == 1
    payload = getattr(matching[0], "kv", {})
    assert payload.get("current_revision") == _AZ304_REV
    assert payload.get("namespace_uuid") == str(TILE_NAMESPACE_UUID)
# ----------------------------------------------------------------------
# NFR-perf-apply / NFR-perf-noop: timing budgets.
def test_nfr_perf_apply_under_5s(baselined_db: str) -> None:
    """One apply on the baselined database finishes in under five seconds of wall time."""
    # Arrange: build the config outside the timed region.
    config = _build_config(baselined_db)

    # Act
    started = time.perf_counter()
    apply_migrations(config)
    finished = time.perf_counter()
    elapsed = finished - started

    # Assert
    assert elapsed < 5.0, f"apply took {elapsed:.3f}s (>5s budget)"
def test_nfr_perf_noop_under_100ms(baselined_db: str) -> None:
    """A repeat apply at head finishes in under 100 ms of wall time."""
    # Arrange: the first, untimed apply brings the database to head.
    config = _build_config(baselined_db)
    apply_migrations(config)

    # Act
    started = time.perf_counter()
    apply_migrations(config)
    finished = time.perf_counter()
    elapsed = finished - started

    # Assert
    assert elapsed < 0.100, f"no-op took {elapsed * 1000:.1f}ms (>100ms budget)"
# ----------------------------------------------------------------------
# Smoke: MigrationResult is frozen.
def test_migration_result_is_frozen() -> None:
    """Assigning to a ``MigrationResult`` field raises instead of mutating the instance."""
    # Arrange
    frozen_result = MigrationResult(applied=["x"], current_revision="x", no_op=False)

    # Act + Assert
    expected_errors = (dataclasses.FrozenInstanceError, AttributeError)
    with pytest.raises(expected_errors):
        setattr(frozen_result, "no_op", True)
# AC-12 (`TileMetadata.location_hash` default = None) is covered in the
# AZ-303 protocol-conformance suite (`test_protocol_conformance.py`); no
# Postgres needed.