[AZ-304] C6 Postgres schema: additive 0002 migration + UUIDv5

Strictly additive Alembic migration on the AZ-263 baseline (data_model
.md § 6.1 / § 6.3): six new tiles columns (tile_uuid UNIQUE,
location_hash, content_sha256, disk_bytes, accessed_at, uploaded_at),
four new btree indices, one UNIQUE expression index over the
COALESCE-zero-uuid natural key, CHECK widening of
ck_tiles_freshness_status to the AZ-263 + AZ-303 vocabulary UNION,
four NULLable bbox columns on sector_classifications, and a new
tile_freshness_rules table seeded with the two default thresholds.

Pinned UUIDv5 namespace (TILE_NAMESPACE_UUID =
5b8d0c2e-1a4f-4b3a-8c9d-e7f6a3b2c1d0) + derive_tile_id /
derive_location_hash helpers cross-coordinated with
satellite-provider. Migration runner apply_migrations(config) drives
Alembic command.upgrade("head") against the AZ-263 env with one
retry on PG SQLSTATE 40001 and structured INFO logs on apply / no-op.

Contract bump tile_metadata_store.md v1.1.0 -> v1.2.0 adds
TileMetadata.location_hash: UUID | None = None (non-breaking).
module-layout.md updated so c6_tile_cache explicitly Owns
db/migrations/**.

Tier-1 tests: UUIDv5 determinism + locked vectors + DSN resolution +
retry mocked DBAPIError -> 1180 passed, 32 skipped. Tier-2 docker
schema tests gated by @pytest.mark.docker run against the existing
docker-compose.test.yml db service.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 17:05:41 +03:00
parent 21f5a30d09
commit dde838d2cc
15 changed files with 2084 additions and 25 deletions
@@ -0,0 +1,156 @@
-- AZ-304 — Expected post-0002 c6_tile_cache schema (Postgres 16).
--
-- Human-readable canonical DDL for the schema state produced by applying
-- 0001_initial (AZ-263) + 0002_c6_tile_identity_and_lru (AZ-304). The
-- companion test `test_postgres_schema.py` introspects information_schema /
-- pg_indexes / pg_constraint / tile_freshness_rules row contents and
-- asserts each artifact below exists with the documented shape.
--
-- Updating this file without a corresponding migration revision is a
-- Spec-Gap finding (High) at code review.
-- =====================================================================
-- 0001_initial (AZ-263) — unchanged baseline
-- =====================================================================
CREATE TABLE flights (
id UUID PRIMARY KEY,
companion_id TEXT NOT NULL,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
landed_at TIMESTAMPTZ,
metadata JSONB
);
CREATE TABLE sector_classifications (
id BIGSERIAL PRIMARY KEY,
sector_id TEXT NOT NULL UNIQUE,
classification TEXT NOT NULL,
freshness_threshold_days INTEGER NOT NULL,
-- 0002 additive bbox columns (operator-populated pre-flight; NULL pre-population):
min_lat DOUBLE PRECISION,
min_lon DOUBLE PRECISION,
max_lat DOUBLE PRECISION,
max_lon DOUBLE PRECISION
);
CREATE TABLE tiles (
id BIGSERIAL PRIMARY KEY,
-- Canonical columns mirrored from satellite-provider (AZ-263):
zoom_level INTEGER NOT NULL,
tile_x INTEGER NOT NULL,
tile_y INTEGER NOT NULL,
latitude DOUBLE PRECISION NOT NULL,
longitude DOUBLE PRECISION NOT NULL,
tile_size_meters DOUBLE PRECISION NOT NULL,
tile_size_pixels INTEGER NOT NULL,
capture_timestamp TIMESTAMPTZ NOT NULL,
compression TEXT NOT NULL DEFAULT 'jpeg',
crs TEXT NOT NULL DEFAULT 'EPSG:3857',
source TEXT NOT NULL,
-- Additive onboard-only columns (AZ-263):
flight_id UUID REFERENCES flights(id),
companion_id TEXT,
tile_quality_metadata JSONB,
voting_status TEXT,
freshness_status TEXT NOT NULL DEFAULT 'fresh',
signature BYTEA,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- 0002 additive identity / LRU / disk-budget columns:
tile_uuid UUID NOT NULL UNIQUE,
location_hash UUID NOT NULL,
content_sha256 TEXT NOT NULL,
disk_bytes BIGINT NOT NULL,
accessed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
uploaded_at TIMESTAMPTZ,
-- AZ-263 CHECKs (preserved):
CONSTRAINT ck_tiles_zoom CHECK (zoom_level BETWEEN 10 AND 22),
CONSTRAINT ck_tiles_meters CHECK (tile_size_meters > 0),
CONSTRAINT ck_tiles_pixels CHECK (tile_size_pixels > 0),
CONSTRAINT ck_tiles_source CHECK (source IN ('googlemaps','onboard_ingest')),
CONSTRAINT ck_tiles_voting_status CHECK (
voting_status IS NULL OR voting_status IN ('pending','trusted','rejected')
),
-- 0002 widened CHECK (UNION of AZ-263 + AZ-303 vocabularies):
CONSTRAINT ck_tiles_freshness_status CHECK (
freshness_status IN (
'fresh','stale_warn','stale_reject',
'stale_active_conflict','stale_rear','downgraded'
)
),
-- 0002 additive CHECKs:
CONSTRAINT ck_tiles_content_sha256_len CHECK (length(content_sha256) = 64),
CONSTRAINT ck_tiles_disk_bytes_nonneg CHECK (disk_bytes >= 0)
);
-- AZ-263 indices (preserved):
CREATE INDEX ix_tiles_zxy ON tiles (zoom_level, tile_x, tile_y);
CREATE INDEX ix_tiles_lat_lon ON tiles (latitude, longitude);
CREATE INDEX ix_tiles_voting_status_onboard
ON tiles (voting_status)
WHERE source = 'onboard_ingest';
CREATE INDEX ix_tiles_flight_id ON tiles (flight_id);
CREATE INDEX ix_tiles_created_at ON tiles (created_at);
-- 0002 additive indices:
CREATE UNIQUE INDEX idx_tiles_natural_key ON tiles (
zoom_level,
tile_x,
tile_y,
tile_size_meters,
source,
COALESCE(flight_id, '00000000-0000-0000-0000-000000000000'::uuid)
);
CREATE INDEX idx_tiles_location_hash ON tiles (location_hash);
CREATE INDEX idx_tiles_accessed_at ON tiles (accessed_at);
CREATE INDEX idx_tiles_pending_upload ON tiles (uploaded_at)
WHERE source = 'onboard_ingest' AND uploaded_at IS NULL;
CREATE INDEX idx_tiles_flight_captured ON tiles (flight_id, capture_timestamp)
WHERE flight_id IS NOT NULL;
-- Note: an automatic UNIQUE btree on `tiles.tile_uuid` is created by the
-- column-level UNIQUE; PG names it `tiles_tile_uuid_key` (system-generated).
-- The diff test does NOT assert that name; it asserts presence of a UNIQUE
-- index covering exactly `(tile_uuid)`.
CREATE TABLE manifests (
id BIGSERIAL PRIMARY KEY,
manifest_id TEXT NOT NULL UNIQUE,
flight_id UUID NOT NULL REFERENCES flights(id),
content_hash TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
payload JSONB NOT NULL
);
CREATE TABLE engine_cache_entries (
id BIGSERIAL PRIMARY KEY,
engine_path TEXT NOT NULL,
sm_arch TEXT NOT NULL,
jetpack_version TEXT NOT NULL,
tensorrt_version TEXT NOT NULL,
precision TEXT NOT NULL,
content_hash TEXT NOT NULL UNIQUE,
int8_calibration_path TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- =====================================================================
-- 0002_c6_tile_identity_and_lru (AZ-304) — new table
-- =====================================================================
CREATE TABLE tile_freshness_rules (
classification TEXT PRIMARY KEY,
max_age_seconds BIGINT NOT NULL,
action TEXT NOT NULL,
set_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT ck_tfr_action CHECK (action IN ('reject','downgrade')),
CONSTRAINT ck_tfr_max_age_pos CHECK (max_age_seconds > 0)
);
-- Seed rows applied by the 0002 migration (AC-7):
INSERT INTO tile_freshness_rules (classification, max_age_seconds, action) VALUES
('active_conflict', 15552000, 'reject'), -- 6 months × 30 days × 86400 s
('stable_rear', 31104000, 'downgrade'); -- 12 months × 30 days × 86400 s
@@ -0,0 +1,217 @@
"""AZ-304 NFR-reliability-retry + unit tests for the c6_tile_cache migration runner.
These tests stub :mod:`gps_denied_onboard.components.c6_tile_cache.migrations`
internals so the runner's retry / DSN-resolution / error-mapping paths can
be exercised without a real Postgres. The integration suite
(``test_postgres_schema.py``, ``@pytest.mark.docker``) covers the happy
path end-to-end.
"""
from __future__ import annotations
from typing import Any
import pytest
from sqlalchemy.exc import DBAPIError
from gps_denied_onboard.components.c6_tile_cache import migrations as migrations_module
from gps_denied_onboard.components.c6_tile_cache.config import C6TileCacheConfig
from gps_denied_onboard.components.c6_tile_cache.migrations import (
MigrationError,
MigrationResult,
apply_migrations,
)
from gps_denied_onboard.config.schema import Config
def _config(dsn: str = "postgresql://user:pass@host:5432/db") -> Config:
return Config.with_blocks(c6_tile_cache=C6TileCacheConfig(postgres_dsn=dsn))
class _FakeSerializationFailure(Exception):
"""Lightweight stand-in for ``psycopg.errors.SerializationFailure``."""
sqlstate = "40001"
def _make_dbapi_error(orig: Exception) -> DBAPIError:
"""Construct a SQLAlchemy DBAPIError wrapping ``orig`` (matches runtime shape)."""
return DBAPIError(statement="", params=None, orig=orig)
# ----------------------------------------------------------------------
# DSN resolution.
def test_resolve_dsn_prefers_config_over_env(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
monkeypatch.setenv("DB_URL", "postgresql://env-host/db")
cfg = _config("postgresql://cfg-host/db")
# Act
dsn = migrations_module._resolve_dsn(cfg)
# Assert
assert dsn == "postgresql://cfg-host/db"
def test_resolve_dsn_falls_back_to_env(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
monkeypatch.setenv("DB_URL", "postgresql://env-host/db")
cfg = _config("")
# Act
dsn = migrations_module._resolve_dsn(cfg)
# Assert
assert dsn == "postgresql://env-host/db"
def test_resolve_dsn_raises_when_no_source(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
monkeypatch.delenv("DB_URL", raising=False)
cfg = _config("")
# Act + Assert
with pytest.raises(MigrationError, match="no DSN available"):
migrations_module._resolve_dsn(cfg)
def test_to_sqlalchemy_url_rewrites_postgresql_prefix() -> None:
# Act + Assert
assert migrations_module._to_sqlalchemy_url("postgresql://h/db") == "postgresql+psycopg://h/db"
# Already-prefixed URLs pass through unchanged.
assert (
migrations_module._to_sqlalchemy_url("postgresql+psycopg://h/db")
== "postgresql+psycopg://h/db"
)
# ----------------------------------------------------------------------
# NFR-reliability-retry: one SerializationFailure → retry → succeed.
def test_nfr_reliability_retry_once_on_serialization_failure(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange — stub all DB-touching internals so the retry path is the only
# variable. Both iterations see the DB BELOW head pre-upgrade; the first
# upgrade attempt raises SQLSTATE 40001, the second succeeds. Post-upgrade
# revision is read once after the successful attempt.
rev_sequence = iter([None, None, "0002_c6_tile_identity_and_lru"])
upgrade_calls: list[int] = []
def fake_upgrade(_cfg: Any, _target: str) -> None:
upgrade_calls.append(len(upgrade_calls))
if len(upgrade_calls) == 1:
raise _make_dbapi_error(_FakeSerializationFailure("conflict"))
def fake_current_revision(_url: str) -> str | None:
return next(rev_sequence)
monkeypatch.setattr(
"gps_denied_onboard.components.c6_tile_cache.migrations.command.upgrade",
fake_upgrade,
)
monkeypatch.setattr(migrations_module, "_current_revision", fake_current_revision)
monkeypatch.setattr(
migrations_module,
"_head_revision",
lambda _cfg: "0002_c6_tile_identity_and_lru",
)
monkeypatch.setattr(
migrations_module,
"_resolve_applied",
lambda _cfg, _pre, _post: ["0002_c6_tile_identity_and_lru"],
)
# Act
result = apply_migrations(_config())
# Assert
assert isinstance(result, MigrationResult)
assert result.applied == ["0002_c6_tile_identity_and_lru"]
assert result.no_op is False
assert len(upgrade_calls) == 2, "runner must retry on SQLSTATE 40001"
def test_nfr_reliability_terminal_after_two_serialization_failures(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange
def fake_upgrade(_cfg: Any, _target: str) -> None:
raise _make_dbapi_error(_FakeSerializationFailure("persistent conflict"))
monkeypatch.setattr(
"gps_denied_onboard.components.c6_tile_cache.migrations.command.upgrade",
fake_upgrade,
)
monkeypatch.setattr(migrations_module, "_current_revision", lambda _url: None)
monkeypatch.setattr(
migrations_module,
"_head_revision",
lambda _cfg: "0002_c6_tile_identity_and_lru",
)
# Act + Assert
with pytest.raises(MigrationError, match="database error"):
apply_migrations(_config())
def test_nfr_reliability_does_not_retry_on_non_serialization_dbapi_error(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange
upgrade_calls: list[int] = []
class _GenericDbApiOrig(Exception):
sqlstate = "42P01" # "undefined_table" — NOT a serialization conflict
def fake_upgrade(_cfg: Any, _target: str) -> None:
upgrade_calls.append(len(upgrade_calls))
raise _make_dbapi_error(_GenericDbApiOrig("missing relation"))
monkeypatch.setattr(
"gps_denied_onboard.components.c6_tile_cache.migrations.command.upgrade",
fake_upgrade,
)
monkeypatch.setattr(migrations_module, "_current_revision", lambda _url: None)
monkeypatch.setattr(
migrations_module,
"_head_revision",
lambda _cfg: "0002_c6_tile_identity_and_lru",
)
# Act + Assert
with pytest.raises(MigrationError):
apply_migrations(_config())
assert len(upgrade_calls) == 1, "non-40001 errors must fail-fast (no retry)"
def test_apply_migrations_logs_no_op_when_already_at_head(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange
monkeypatch.setattr(
migrations_module, "_current_revision", lambda _url: "0002_c6_tile_identity_and_lru"
)
monkeypatch.setattr(
migrations_module, "_head_revision", lambda _cfg: "0002_c6_tile_identity_and_lru"
)
# Act
result = apply_migrations(_config())
# Assert
assert result == MigrationResult(
applied=[], current_revision="0002_c6_tile_identity_and_lru", no_op=True
)
def test_migration_error_is_not_a_tile_cache_error_member() -> None:
"""`MigrationError` deliberately lives outside the TileCacheError family."""
# Arrange
from gps_denied_onboard.components.c6_tile_cache.errors import TileCacheError
# Act + Assert
assert not issubclass(MigrationError, TileCacheError)
@@ -0,0 +1,678 @@
"""AZ-304 — Schema-shape diff + per-AC integration tests against a real Postgres.
All tests in this module are ``@pytest.mark.docker`` (via the
module-level ``pytestmark``); they are auto-skipped on Tier-1 by
``tests/conftest.py`` so the project-wide unit suite stays hermetic. To
run locally: ``docker compose -f docker-compose.test.yml up -d db && \
GPS_DENIED_TIER=2 DB_URL=postgresql://gps_denied:dev@localhost:5432/gps_denied \
pytest tests/unit/c6_tile_cache/test_postgres_schema.py``.
"""
from __future__ import annotations
import dataclasses
import logging
import os
import time
from collections.abc import Iterator
from pathlib import Path
from uuid import UUID, uuid4
import psycopg
import pytest
from alembic import command
from alembic.config import Config as AlembicConfig
from gps_denied_onboard.components.c6_tile_cache._uuid_namespace import (
TILE_NAMESPACE_UUID,
derive_location_hash,
derive_tile_id,
)
from gps_denied_onboard.components.c6_tile_cache.config import C6TileCacheConfig
from gps_denied_onboard.components.c6_tile_cache.migrations import (
MigrationResult,
apply_migrations,
)
from gps_denied_onboard.config.schema import Config
pytestmark = pytest.mark.docker
_PROJECT_ROOT = Path(__file__).resolve().parents[3]
_ALEMBIC_INI = _PROJECT_ROOT / "alembic.ini"
_ALEMBIC_SCRIPT_LOCATION = _PROJECT_ROOT / "db" / "migrations"
_AZ263_REV = "0001_initial"
_AZ304_REV = "0002_c6_tile_identity_and_lru"
_FRESH_BAG_TABLES = (
"tile_freshness_rules",
"engine_cache_entries",
"manifests",
"tiles",
"sector_classifications",
"flights",
"alembic_version",
)
_AZ263_TILE_COLUMNS = {
"id",
"zoom_level",
"tile_x",
"tile_y",
"latitude",
"longitude",
"tile_size_meters",
"tile_size_pixels",
"capture_timestamp",
"compression",
"crs",
"source",
"flight_id",
"companion_id",
"tile_quality_metadata",
"voting_status",
"freshness_status",
"signature",
"created_at",
"updated_at",
}
_AZ304_TILE_COLUMNS = {
"tile_uuid",
"location_hash",
"content_sha256",
"disk_bytes",
"accessed_at",
"uploaded_at",
}
_AZ304_TILE_INDICES = {
"idx_tiles_natural_key",
"idx_tiles_location_hash",
"idx_tiles_accessed_at",
"idx_tiles_pending_upload",
"idx_tiles_flight_captured",
}
_AZ263_TILE_INDICES = {
"ix_tiles_zxy",
"ix_tiles_lat_lon",
"ix_tiles_voting_status_onboard",
"ix_tiles_flight_id",
"ix_tiles_created_at",
}
def _to_sqlalchemy_url(raw_dsn: str) -> str:
if raw_dsn.startswith("postgresql://"):
return raw_dsn.replace("postgresql://", "postgresql+psycopg://", 1)
return raw_dsn
def _alembic_config(sqlalchemy_url: str) -> AlembicConfig:
cfg = AlembicConfig(str(_ALEMBIC_INI))
cfg.set_main_option("script_location", str(_ALEMBIC_SCRIPT_LOCATION))
cfg.set_main_option("sqlalchemy.url", sqlalchemy_url)
return cfg
def _build_config(dsn: str) -> Config:
block = C6TileCacheConfig(postgres_dsn=dsn)
return Config.with_blocks(c6_tile_cache=block)
def _exec(conn: psycopg.Connection, sql: str, params: tuple[object, ...] | None = None) -> None:
with conn.cursor() as cur:
cur.execute(sql, params or ())
def _fetchone(
conn: psycopg.Connection, sql: str, params: tuple[object, ...] | None = None
) -> tuple[object, ...] | None:
with conn.cursor() as cur:
cur.execute(sql, params or ())
return cur.fetchone()
def _fetchall(
conn: psycopg.Connection, sql: str, params: tuple[object, ...] | None = None
) -> list[tuple[object, ...]]:
with conn.cursor() as cur:
cur.execute(sql, params or ())
return cur.fetchall()
def _column_metadata(conn: psycopg.Connection, table: str) -> dict[str, tuple[object, ...]]:
rows = _fetchall(
conn,
"""
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = %s
""",
(table,),
)
return {str(row[0]): (row[1], row[2], row[3]) for row in rows}
def _index_names(conn: psycopg.Connection, table: str) -> set[str]:
rows = _fetchall(
conn,
"""
SELECT indexname FROM pg_indexes
WHERE schemaname = 'public' AND tablename = %s
""",
(table,),
)
return {str(row[0]) for row in rows}
def _check_constraint_source(conn: psycopg.Connection, name: str) -> str | None:
row = _fetchone(
conn,
"""
SELECT pg_get_constraintdef(c.oid)
FROM pg_constraint c
WHERE c.contype = 'c' AND c.conname = %s
""",
(name,),
)
if row is None:
return None
value = row[0]
return None if value is None else str(value)
@pytest.fixture
def db_url() -> str:
url = os.environ.get("DB_URL")
if not url:
pytest.skip("DB_URL not set — start docker-compose.test.yml `db` service first")
return url
@pytest.fixture
def fresh_db(db_url: str) -> Iterator[str]:
"""Drop all c6 tables + alembic_version; yield the same DSN."""
with psycopg.connect(db_url, autocommit=True) as conn:
tables = ", ".join(_FRESH_BAG_TABLES)
_exec(conn, f"DROP TABLE IF EXISTS {tables} CASCADE")
yield db_url
# Leave the DB dirty after each test; the next test's fresh_db drops again.
@pytest.fixture
def baselined_db(fresh_db: str) -> str:
"""Apply only 0001_initial; return DB at AZ-263 head."""
sa_url = _to_sqlalchemy_url(fresh_db)
cfg = _alembic_config(sa_url)
command.upgrade(cfg, _AZ263_REV)
return fresh_db
# ----------------------------------------------------------------------
# AC-1 / AC-9: apply on AZ-263-baselined DB; AZ-263 columns unchanged.
def test_ac1_apply_creates_additive_artifacts(baselined_db: str) -> None:
# Arrange
config = _build_config(baselined_db)
# Act
result = apply_migrations(config)
# Assert: runner result
assert result.applied == [_AZ304_REV]
assert result.current_revision == _AZ304_REV
assert result.no_op is False
# Assert: schema artifacts
with psycopg.connect(baselined_db) as conn:
tiles_cols = _column_metadata(conn, "tiles")
for col in _AZ304_TILE_COLUMNS:
assert col in tiles_cols, f"tiles.{col} missing post-0002"
for col in _AZ263_TILE_COLUMNS:
assert col in tiles_cols, f"AZ-263 tiles.{col} dropped by 0002 — regression"
tiles_idx = _index_names(conn, "tiles")
for idx in _AZ263_TILE_INDICES | _AZ304_TILE_INDICES:
assert idx in tiles_idx, f"tiles index {idx!r} missing"
sector_cols = _column_metadata(conn, "sector_classifications")
for col in ("min_lat", "min_lon", "max_lat", "max_lon"):
assert col in sector_cols, f"sector_classifications.{col} missing"
rules_rows = _fetchall(
conn, "SELECT classification, max_age_seconds, action FROM tile_freshness_rules"
)
assert len(rules_rows) == 2
def test_ac9_az263_columns_byte_identical(baselined_db: str) -> None:
"""AZ-263 column metadata is byte-identical pre- and post-0002."""
# Arrange: snapshot pre-migration column metadata.
with psycopg.connect(baselined_db) as conn:
before = _column_metadata(conn, "tiles")
before_flights = _column_metadata(conn, "flights")
before_sector = _column_metadata(conn, "sector_classifications")
# Act
apply_migrations(_build_config(baselined_db))
# Assert
with psycopg.connect(baselined_db) as conn:
after = _column_metadata(conn, "tiles")
after_flights = _column_metadata(conn, "flights")
after_sector = _column_metadata(conn, "sector_classifications")
for col, meta in before.items():
assert after.get(col) == meta, f"tiles.{col} drifted: {meta} -> {after.get(col)}"
assert after_flights == before_flights, "flights columns drifted"
# sector_classifications has new NULLable columns; assert the AZ-263 ones survived.
for col, meta in before_sector.items():
assert after_sector.get(col) == meta, f"sector_classifications.{col} drifted"
# ----------------------------------------------------------------------
# AC-2: no-op at head.
def test_ac2_apply_is_noop_at_head(baselined_db: str) -> None:
# Arrange
config = _build_config(baselined_db)
apply_migrations(config)
# Act
second = apply_migrations(config)
# Assert
assert second.applied == []
assert second.no_op is True
assert second.current_revision == _AZ304_REV
# ----------------------------------------------------------------------
# AC-3: widened freshness_status CHECK + new CHECKs exist.
def test_ac3_freshness_check_widened(baselined_db: str) -> None:
# Act
apply_migrations(_build_config(baselined_db))
# Assert
with psycopg.connect(baselined_db) as conn:
defn = _check_constraint_source(conn, "ck_tiles_freshness_status")
assert defn is not None
for value in (
"'fresh'",
"'stale_warn'",
"'stale_reject'",
"'stale_active_conflict'",
"'stale_rear'",
"'downgraded'",
):
assert value in defn, f"widened CHECK missing {value}; got: {defn}"
def test_ac3_new_check_constraints_present(baselined_db: str) -> None:
# Act
apply_migrations(_build_config(baselined_db))
# Assert
with psycopg.connect(baselined_db) as conn:
sha_defn = _check_constraint_source(conn, "ck_tiles_content_sha256_len")
bytes_defn = _check_constraint_source(conn, "ck_tiles_disk_bytes_nonneg")
action_defn = _check_constraint_source(conn, "ck_tfr_action")
assert sha_defn is not None and "length(content_sha256) = 64" in sha_defn
assert bytes_defn is not None and "disk_bytes >= 0" in bytes_defn
assert action_defn is not None and "reject" in action_defn and "downgrade" in action_defn
# ----------------------------------------------------------------------
# AC-4 + AC-4b: natural-key UNIQUE allows per-flight separation, rejects duplicates.
def _insert_tile(
conn: psycopg.Connection,
*,
zoom_level: int,
tile_x: int,
tile_y: int,
source: str,
flight_id: UUID | None,
content_sha256: str,
flight_table_id: UUID | None = None,
) -> None:
"""Direct INSERT used by AC-4/AC-4b tests (no PostgresFilesystemStore yet)."""
tile_uuid = derive_tile_id(zoom_level, tile_x, tile_y, source, flight_id)
location_hash = derive_location_hash(zoom_level, tile_x, tile_y)
_exec(
conn,
"""
INSERT INTO tiles (
zoom_level, tile_x, tile_y, latitude, longitude,
tile_size_meters, tile_size_pixels, capture_timestamp, source,
flight_id,
tile_uuid, location_hash, content_sha256, disk_bytes
) VALUES (
%s, %s, %s, 0.0, 0.0,
256.0, 256, now(), %s,
%s,
%s, %s, %s, 1024
)
""",
(
zoom_level,
tile_x,
tile_y,
source,
flight_table_id if flight_table_id is not None else flight_id,
tile_uuid,
location_hash,
content_sha256,
),
)
def test_ac4_natural_key_allows_different_flights_same_cell(baselined_db: str) -> None:
# Arrange
apply_migrations(_build_config(baselined_db))
flight_a, flight_b = uuid4(), uuid4()
# Act
with psycopg.connect(baselined_db, autocommit=True) as conn:
_exec(
conn,
"INSERT INTO flights (id, companion_id, started_at) VALUES (%s, 'comp', now()), (%s, 'comp', now())",
(flight_a, flight_b),
)
_insert_tile(
conn,
zoom_level=18,
tile_x=10,
tile_y=20,
source="onboard_ingest",
flight_id=flight_a,
content_sha256="a" * 64,
)
_insert_tile(
conn,
zoom_level=18,
tile_x=10,
tile_y=20,
source="onboard_ingest",
flight_id=flight_b,
content_sha256="b" * 64,
)
# Assert
rows = _fetchall(
conn,
"SELECT tile_uuid, location_hash FROM tiles WHERE tile_x=10 AND tile_y=20",
)
assert len(rows) == 2
tile_uuids = {row[0] for row in rows}
location_hashes = {row[1] for row in rows}
assert len(tile_uuids) == 2, "per-flight tile_uuid collision"
assert len(location_hashes) == 1, "location_hash should match across flights"
def test_ac4b_natural_key_rejects_duplicate_flight_insert(baselined_db: str) -> None:
# Arrange
apply_migrations(_build_config(baselined_db))
# Act + Assert
with psycopg.connect(baselined_db, autocommit=True) as conn:
_insert_tile(
conn,
zoom_level=18,
tile_x=30,
tile_y=40,
source="googlemaps",
flight_id=None,
content_sha256="c" * 64,
)
with pytest.raises(psycopg.errors.UniqueViolation):
# Same natural key (both flight_id=NULL → both coalesce to zero UUID).
# Use a different content_sha256 so the rejection comes from the
# natural-key index, not a coincidental UNIQUE elsewhere.
_exec(
conn,
"""
INSERT INTO tiles (
zoom_level, tile_x, tile_y, latitude, longitude,
tile_size_meters, tile_size_pixels, capture_timestamp, source,
tile_uuid, location_hash, content_sha256, disk_bytes
) VALUES (
18, 30, 40, 0.0, 0.0,
256.0, 256, now(), 'googlemaps',
%s, %s, %s, 1024
)
""",
(uuid4(), uuid4(), "d" * 64),
)
# ----------------------------------------------------------------------
# AC-5: widened CHECK accepts all six values; rejects bogus.
@pytest.mark.parametrize(
"freshness_value",
[
"fresh",
"stale_warn",
"stale_reject",
"stale_active_conflict",
"stale_rear",
"downgraded",
],
)
def test_ac5_widened_check_accepts_union_values(baselined_db: str, freshness_value: str) -> None:
# Arrange
apply_migrations(_build_config(baselined_db))
# Act
with psycopg.connect(baselined_db, autocommit=True) as conn:
_exec(
conn,
"""
INSERT INTO tiles (
zoom_level, tile_x, tile_y, latitude, longitude,
tile_size_meters, tile_size_pixels, capture_timestamp, source,
tile_uuid, location_hash, content_sha256, disk_bytes,
freshness_status
) VALUES (
18, 1, 1, 0.0, 0.0,
256.0, 256, now(), 'googlemaps',
%s, %s, %s, 1024,
%s
)
""",
(uuid4(), uuid4(), "e" * 64, freshness_value),
)
# Assert
rows = _fetchall(
conn,
"SELECT freshness_status FROM tiles WHERE freshness_status = %s",
(freshness_value,),
)
assert any(row[0] == freshness_value for row in rows)
def test_ac5_widened_check_rejects_bogus(baselined_db: str) -> None:
# Arrange
apply_migrations(_build_config(baselined_db))
# Act + Assert
with psycopg.connect(baselined_db, autocommit=True) as conn:
with pytest.raises(psycopg.errors.CheckViolation):
_exec(
conn,
"""
INSERT INTO tiles (
zoom_level, tile_x, tile_y, latitude, longitude,
tile_size_meters, tile_size_pixels, capture_timestamp, source,
tile_uuid, location_hash, content_sha256, disk_bytes,
freshness_status
) VALUES (
18, 2, 2, 0.0, 0.0,
256.0, 256, now(), 'googlemaps',
%s, %s, %s, 1024,
'bogus'
)
""",
(uuid4(), uuid4(), "f" * 64),
)
# ----------------------------------------------------------------------
# AC-6: down migration reverses cleanly; subsequent upgrade re-applies.
def test_ac6_downgrade_reverses_cleanly(baselined_db: str) -> None:
# Arrange
apply_migrations(_build_config(baselined_db))
sa_url = _to_sqlalchemy_url(baselined_db)
cfg = _alembic_config(sa_url)
# Act: downgrade one revision
command.downgrade(cfg, "-1")
# Assert: AZ-304 artifacts gone, AZ-263 baseline intact.
with psycopg.connect(baselined_db) as conn:
tiles_cols = _column_metadata(conn, "tiles")
for col in _AZ304_TILE_COLUMNS:
assert col not in tiles_cols, f"tiles.{col} should be dropped after downgrade"
for col in _AZ263_TILE_COLUMNS:
assert col in tiles_cols, f"AZ-263 tiles.{col} dropped by downgrade — regression"
tile_table = _fetchone(
conn,
"SELECT to_regclass('public.tile_freshness_rules')",
)
assert tile_table is not None and tile_table[0] is None
defn = _check_constraint_source(conn, "ck_tiles_freshness_status")
assert defn is not None
# AZ-263 vocabulary only.
assert "'stale_active_conflict'" not in defn
assert "'stale_rear'" not in defn
assert "'downgraded'" not in defn
# Act: re-upgrade
command.upgrade(cfg, "head")
# Assert: clean re-apply
with psycopg.connect(baselined_db) as conn:
tiles_cols = _column_metadata(conn, "tiles")
for col in _AZ304_TILE_COLUMNS:
assert col in tiles_cols
# ----------------------------------------------------------------------
# AC-7: seed rows present with documented values.
def test_ac7_freshness_rules_seeded(baselined_db: str) -> None:
# Act
apply_migrations(_build_config(baselined_db))
# Assert
with psycopg.connect(baselined_db) as conn:
rows = _fetchall(
conn,
"SELECT classification, max_age_seconds, action FROM tile_freshness_rules ORDER BY classification",
)
assert rows == [
("active_conflict", 15552000, "reject"),
("stable_rear", 31104000, "downgrade"),
]
# ----------------------------------------------------------------------
# AC-8: log INFO records carry kind / namespace_uuid.
def test_ac8_apply_logs_kind_applied(baselined_db: str, caplog: pytest.LogCaptureFixture) -> None:
# Arrange
caplog.set_level(logging.INFO, logger="c6_tile_cache.migrations")
# Act
apply_migrations(_build_config(baselined_db))
# Assert
applied_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c6.migration.applied"
]
assert len(applied_records) == 1
kv = getattr(applied_records[0], "kv", {})
assert kv.get("revisions") == [_AZ304_REV]
assert kv.get("namespace_uuid") == str(TILE_NAMESPACE_UUID)
def test_ac8_noop_logs_kind_no_op(baselined_db: str, caplog: pytest.LogCaptureFixture) -> None:
# Arrange
apply_migrations(_build_config(baselined_db)) # first apply
caplog.clear()
caplog.set_level(logging.INFO, logger="c6_tile_cache.migrations")
# Act
apply_migrations(_build_config(baselined_db)) # second = no-op
# Assert
noop_records = [r for r in caplog.records if getattr(r, "kind", None) == "c6.migration.no_op"]
assert len(noop_records) == 1
kv = getattr(noop_records[0], "kv", {})
assert kv.get("current_revision") == _AZ304_REV
assert kv.get("namespace_uuid") == str(TILE_NAMESPACE_UUID)
# ----------------------------------------------------------------------
# NFR-perf-apply / NFR-perf-noop: timing budgets.
def test_nfr_perf_apply_under_5s(baselined_db: str) -> None:
# Arrange
config = _build_config(baselined_db)
# Act
t0 = time.perf_counter()
apply_migrations(config)
elapsed = time.perf_counter() - t0
# Assert
assert elapsed < 5.0, f"apply took {elapsed:.3f}s (>5s budget)"
def test_nfr_perf_noop_under_100ms(baselined_db: str) -> None:
# Arrange
config = _build_config(baselined_db)
apply_migrations(config)
# Act
t0 = time.perf_counter()
apply_migrations(config)
elapsed = time.perf_counter() - t0
# Assert
assert elapsed < 0.100, f"no-op took {elapsed * 1000:.1f}ms (>100ms budget)"
# ----------------------------------------------------------------------
# Smoke: MigrationResult is frozen.
def test_migration_result_is_frozen() -> None:
# Arrange
result = MigrationResult(applied=["x"], current_revision="x", no_op=False)
# Act + Assert
with pytest.raises((dataclasses.FrozenInstanceError, AttributeError)):
result.no_op = True # type: ignore[misc]
# AC-12 (`TileMetadata.location_hash` default = None) is covered in the
# AZ-303 protocol-conformance suite (`test_protocol_conformance.py`); no
# Postgres needed.
@@ -0,0 +1,177 @@
"""AZ-304 AC-10 / AC-11 — UUIDv5 namespace determinism + cross-repo coordination.
The expected UUIDs locked below are the *cross-repo coordination
evidence* between ``gps-denied-onboard`` (Python ``uuid.uuid5``) and
``satellite-provider`` (C# Guid.NewGuid v5 implementation per
``AZ-TBD_tile_identity_uuidv5_bulk_list``). Both sides MUST emit
byte-identical UUIDs for these input vectors; changing any expected
value here without a coordinated cross-workspace release breaks the
correlation key joining the two systems.
"""
from __future__ import annotations
from uuid import UUID
import pytest
from gps_denied_onboard.components.c6_tile_cache._types import TileSource
from gps_denied_onboard.components.c6_tile_cache._uuid_namespace import (
TILE_NAMESPACE_UUID,
derive_location_hash,
derive_tile_id,
)
_EXPECTED_NAMESPACE = UUID("5b8d0c2e-1a4f-4b3a-8c9d-e7f6a3b2c1d0")
# AC-10 — five locked tile-id vectors. (z, x, y, source, flight_id) -> expected uuidv5.
_TILE_ID_VECTORS: list[tuple[int, int, int, str, str | None, str]] = [
(18, 72346, 46342, "googlemaps", None, "6f49531b-1351-55ba-b733-66d3f1fca1a5"),
(
18,
72346,
46342,
"onboard_ingest",
"11111111-2222-3333-4444-555555555555",
"c7f6eda4-3b95-5818-a0b7-1aa8cbb5aa95",
),
(10, 300, 200, "googlemaps", None, "3604dd59-1018-5889-97dc-ba5635761ac5"),
(
21,
999999,
999999,
"onboard_ingest",
"aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"36c6e0c0-54a1-56ab-9dfd-a4f8f184cb22",
),
(15, 1000, 2000, "googlemaps", None, "955df722-8d4e-5375-8a23-4f45dc16fef1"),
]
# AC-11 — four locked location-hash vectors. (z, x, y) -> expected uuidv5.
_LOCATION_HASH_VECTORS: list[tuple[int, int, int, str]] = [
(18, 72346, 46342, "e95c7edb-550e-58eb-8f94-3056f73a57d3"),
(10, 300, 200, "76aa22b7-fd8e-5089-8b20-c45fb4a0f5e8"),
(21, 999999, 999999, "4337a27e-f118-524f-8d74-82cf9295c632"),
(15, 1000, 2000, "0501b2fc-0fc8-5330-a407-c7ccbf1fb9c7"),
]
# ----------------------------------------------------------------------
# AC-10: namespace + derivation are deterministic and locked.
def test_ac10_namespace_uuid_locked() -> None:
# Assert
assert TILE_NAMESPACE_UUID == _EXPECTED_NAMESPACE
assert TILE_NAMESPACE_UUID.version == 4 # the namespace itself was a one-time UUIDv4 pick
assert str(TILE_NAMESPACE_UUID) == "5b8d0c2e-1a4f-4b3a-8c9d-e7f6a3b2c1d0"
@pytest.mark.parametrize("z,x,y,source,flight_id,expected", _TILE_ID_VECTORS)
def test_ac10_derive_tile_id_locked_vectors(
z: int, x: int, y: int, source: str, flight_id: str | None, expected: str
) -> None:
# Act
result = derive_tile_id(z, x, y, source, flight_id)
# Assert
assert result == UUID(expected)
assert result.version == 5
@pytest.mark.parametrize("z,x,y,source,flight_id,expected", _TILE_ID_VECTORS)
def test_ac10_derive_tile_id_idempotent_on_second_call(
z: int, x: int, y: int, source: str, flight_id: str | None, expected: str
) -> None:
# Act
first = derive_tile_id(z, x, y, source, flight_id)
second = derive_tile_id(z, x, y, source, flight_id)
# Assert
assert first == second == UUID(expected)
def test_ac10_derive_tile_id_accepts_tile_source_enum() -> None:
"""Passing a `TileSource` enum yields the same UUID as its string value."""
# Act
from_enum = derive_tile_id(18, 72346, 46342, TileSource.GOOGLEMAPS, None)
from_str = derive_tile_id(18, 72346, 46342, "googlemaps", None)
# Assert
assert from_enum == from_str
assert from_enum == UUID("6f49531b-1351-55ba-b733-66d3f1fca1a5")
def test_ac10_derive_tile_id_accepts_uuid_flight_id() -> None:
"""Passing a `UUID` instance for ``flight_id`` matches the string form."""
# Arrange
flight_uuid_str = "11111111-2222-3333-4444-555555555555"
# Act
from_uuid = derive_tile_id(18, 72346, 46342, "onboard_ingest", UUID(flight_uuid_str))
from_str = derive_tile_id(18, 72346, 46342, "onboard_ingest", flight_uuid_str)
# Assert
assert from_uuid == from_str
assert from_uuid == UUID("c7f6eda4-3b95-5818-a0b7-1aa8cbb5aa95")
def test_ac10_derive_tile_id_rejects_unknown_source_type() -> None:
# Act + Assert
with pytest.raises(TypeError, match="source must be"):
derive_tile_id(18, 0, 0, 123, None)
def test_ac10_derive_tile_id_rejects_unknown_flight_id_type() -> None:
# Act + Assert
with pytest.raises(TypeError, match="flight_id must be"):
derive_tile_id(18, 0, 0, "googlemaps", 12345)
def test_ac10_derive_tile_id_rejects_malformed_flight_id_string() -> None:
# Act + Assert
with pytest.raises(ValueError):
derive_tile_id(18, 0, 0, "googlemaps", "not-a-uuid")
# ----------------------------------------------------------------------
# AC-11: location_hash is invariant across source / flight_id.
@pytest.mark.parametrize("z,x,y,expected", _LOCATION_HASH_VECTORS)
def test_ac11_derive_location_hash_locked_vectors(z: int, x: int, y: int, expected: str) -> None:
# Act
result = derive_location_hash(z, x, y)
# Assert
assert result == UUID(expected)
assert result.version == 5
def test_ac11_location_hash_invariant_across_source_and_flight() -> None:
"""Same (z, x, y) yields the same location_hash for any source + flight_id."""
# Arrange
cell = (18, 72346, 46342)
cell_lh = derive_location_hash(*cell)
# Act + Assert — `derive_tile_id` produces _different_ tile UUIDs for
# different source/flight combos but the cell-bag is the same; this is
# explicitly tested via the locked vectors above. Here we only need to
# confirm that `location_hash` NEVER depends on these inputs.
for _source in ("googlemaps", "onboard_ingest"):
for _flight_id in (None, "11111111-2222-3333-4444-555555555555"):
assert derive_location_hash(*cell) == cell_lh
def test_ac11_different_cells_yield_different_location_hashes() -> None:
# Act
lh1 = derive_location_hash(18, 72346, 46342)
lh2 = derive_location_hash(18, 72346, 46343) # neighbour cell
lh3 = derive_location_hash(19, 72346, 46342) # same x,y at different zoom
# Assert
assert lh1 != lh2
assert lh1 != lh3
assert lh2 != lh3
+9 -2
View File
@@ -20,7 +20,14 @@ REPO_ROOT = Path(__file__).resolve().parents[2]
MIGRATION_BODY = (REPO_ROOT / "db" / "migrations" / "versions" / "0001_initial.py").read_text()
def test_head_revision_is_0001_initial() -> None:
def test_head_revision_matches_latest_migration() -> None:
"""Asserts the Alembic head tracks the latest migration on disk.
AZ-263 originally pinned this to ``0001_initial``; AZ-304 advanced the head
to ``0002_c6_tile_identity_and_lru`` (additive on AZ-263 — see
``_docs/02_tasks/todo/AZ-304_c6_postgres_schema.md``). Future migrations
update this assertion in lockstep with the new head.
"""
# Arrange
cwd = os.getcwd()
os.chdir(REPO_ROOT)
@@ -33,7 +40,7 @@ def test_head_revision_is_0001_initial() -> None:
os.chdir(cwd)
# Assert
assert list(heads) == ["0001_initial"], f"unexpected heads: {heads}"
assert list(heads) == ["0002_c6_tile_identity_and_lru"], f"unexpected heads: {heads}"
@pytest.mark.parametrize(