[AZ-325] C10 CacheProvisioner orchestrator

Implements the public top-level F1 build orchestrator for E-C10 per
contract v1.1.0. Composes EngineCompiler (AZ-321), DescriptorBatcher
(AZ-322), and ManifestBuilder (AZ-323) into a single idempotent
operation guarded by an fcntl-backed cache_root/.c10.lock and a
post-build coverage walk.

Adds:
- CacheProvisionerImpl + FilelockFileLockFactory (provisioner.py)
- BuildRequest/BuildReport/BuildOutcome/SectorClassification DTOs +
  FileLockFactory Protocol + replaced placeholder CacheProvisioner
  Protocol with v1.1.0 surface (interface.py)
- C10ProvisionerConfig wired into C10ProvisioningConfig (config.py)
- BuildLockHeldError + ManifestCoverageError (errors.py)
- build_cache_provisioner composition root (c10_factory.py)
- 18 tests covering AC-1..AC-16 + NFR-perf-coverage-walk
- filelock>=3.13,<4.0 (single new third-party dep)

Idempotence (CP-INV-1) reuses AZ-323's _compute_manifest_hash /
_aggregate_tile_hash so the build-identity decision agrees byte-for-
byte with the Manifest's recorded manifest_hash. Coverage rollback
uses a .prev rename snapshot. Diagnostic compile_engines_for_corpus
is lock-free per AC-10.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 05:00:16 +03:00
parent 684ec2601c
commit f7b2e70085
12 changed files with 2329 additions and 21 deletions
@@ -17,6 +17,7 @@ from gps_denied_onboard.components.c10_provisioning.c7_engine_embedder import (
from gps_denied_onboard.components.c10_provisioning.config import (
BackboneConfig,
C10ManifestConfig,
C10ProvisionerConfig,
C10ProvisioningConfig,
SigningMode,
)
@@ -42,14 +43,21 @@ from gps_denied_onboard.components.c10_provisioning.engine_compiler import (
EngineCompileSummary,
)
from gps_denied_onboard.components.c10_provisioning.errors import (
BuildLockHeldError,
C10ProvisioningError,
DescriptorBatchError,
ManifestCoverageError,
ManifestWriteError,
)
from gps_denied_onboard.components.c10_provisioning.interface import (
BackboneEmbedder,
BuildOutcome,
BuildReport,
BuildRequest,
CacheProvisioner,
FileLockFactory,
ManifestSigner,
SectorClassification,
SigningKeyHandle,
)
from gps_denied_onboard.components.c10_provisioning.manifest_builder import (
@@ -69,6 +77,10 @@ from gps_denied_onboard.components.c10_provisioning.manifest_verifier import (
VerifyFailReason,
VerifyOutcome,
)
from gps_denied_onboard.components.c10_provisioning.provisioner import (
CacheProvisionerImpl,
FilelockFileLockFactory,
)
from gps_denied_onboard.config.schema import register_component_block
register_component_block("c10_provisioning", C10ProvisioningConfig)
@@ -80,12 +92,18 @@ __all__ = [
"BackboneEmbedder",
"BackboneSpec",
"BatcherTile",
"BuildLockHeldError",
"BuildOutcome",
"BuildReport",
"BuildRequest",
"C7EngineBackboneEmbedder",
"C10BatcherConfig",
"C10ManifestConfig",
"C10ProvisionerConfig",
"C10ProvisioningConfig",
"C10ProvisioningError",
"CacheProvisioner",
"CacheProvisionerImpl",
"CompileEngineCallable",
"CompileOutcome",
"CorpusFilter",
@@ -99,15 +117,19 @@ __all__ = [
"EngineCompileResult",
"EngineCompileSummary",
"EngineCompiler",
"FileLockFactory",
"FilelockFileLockFactory",
"Manifest",
"ManifestArtifact",
"ManifestBuildInput",
"ManifestBuilder",
"ManifestCoverageError",
"ManifestSigner",
"ManifestVerifier",
"ManifestVerifierImpl",
"ManifestWriteError",
"ProgressEvent",
"SectorClassification",
"SigningKeyHandle",
"SigningMode",
"TileBboxRecord",
@@ -26,6 +26,7 @@ from gps_denied_onboard.config.schema import ConfigError
__all__ = [
"BackboneConfig",
"C10ManifestConfig",
"C10ProvisionerConfig",
"C10ProvisioningConfig",
"SigningMode",
]
@@ -33,6 +34,8 @@ __all__ = [
_DEFAULT_WORKSPACE_MB: int = 4096
_DEFAULT_MANIFEST_SCHEMA_VERSION: str = "1.1"
_DEFAULT_LOCK_TIMEOUT_S: float = 5.0
_DEFAULT_MANIFEST_FILENAME: str = "Manifest.json"
class SigningMode(str, Enum):
@@ -152,6 +155,48 @@ class BackboneConfig:
)
@dataclass(frozen=True)
class C10ProvisionerConfig:
    """Top-level :class:`CacheProvisioner` orchestrator policy (AZ-325).

    Distinct from :class:`C10ProvisioningConfig` (the broader component
    config carrying engine corpus + Manifest signing policy). This
    block holds ONLY the orchestrator's own knobs:

    * ``coverage_strict`` — when ``True`` (default + production),
      orphan files under ``cache_root`` after a SUCCESS build raise
      :class:`ManifestCoverageError` and the build is rolled back to
      the prior-good Manifest. When ``False``, orphans emit a single
      WARN log and the new Manifest is kept. Documented as "for
      forensic builds only" in description.md §7 — CI runs assert
      strict.
    * ``lock_timeout_s`` — non-blocking acquisition timeout for
      ``cache_root/.c10.lock`` (CP-INV-4). Short by default (5 s) so
      a real concurrent invocation surfaces as
      :class:`BuildLockHeldError` quickly rather than a multi-minute
      stall. Must be a finite number strictly greater than zero.
    * ``manifest_filename`` — overrides the on-disk Manifest filename;
      tests use this to verify the orchestrator does not hardcode
      ``Manifest.json`` in path lookups.

    Raises:
        ConfigError: if ``lock_timeout_s`` is not a finite number
            greater than zero, or if ``manifest_filename`` is empty.
    """

    coverage_strict: bool = True
    lock_timeout_s: float = _DEFAULT_LOCK_TIMEOUT_S
    manifest_filename: str = _DEFAULT_MANIFEST_FILENAME

    def __post_init__(self) -> None:
        # The chained comparison is False for NaN and for +inf as well
        # as for zero / negative values. A plain ``<= 0`` test lets NaN
        # and +inf through, and an infinite timeout can never elapse —
        # the opposite of the "fail fast on contention" intent above.
        if not (0 < self.lock_timeout_s < float("inf")):
            raise ConfigError(
                "C10ProvisionerConfig.lock_timeout_s must be > 0 and "
                f"finite; got {self.lock_timeout_s}"
            )
        if not self.manifest_filename:
            raise ConfigError(
                "C10ProvisionerConfig.manifest_filename must be a "
                "non-empty string"
            )
@dataclass(frozen=True)
class C10ProvisioningConfig:
"""Per-component config for C10 cache provisioning.
@@ -170,11 +215,19 @@ class C10ProvisioningConfig:
(signing mode, allowed operator fingerprints, schema version).
Defaulted to dev-mode with no allowlist so unit tests + replay
runs that don't build Manifests stay no-op.
``provisioner`` carries the AZ-325 :class:`CacheProvisioner`
orchestrator policy (coverage_strict, lock timeout, manifest
filename). Defaults to strict + 5-second lock timeout — the
documented production posture.
"""
backbones: tuple[BackboneConfig, ...] = field(default_factory=tuple)
workspace_mb: int = _DEFAULT_WORKSPACE_MB
manifest: C10ManifestConfig = field(default_factory=C10ManifestConfig)
provisioner: C10ProvisionerConfig = field(
default_factory=lambda: C10ProvisionerConfig()
)
def __post_init__(self) -> None:
if self.workspace_mb <= 0:
@@ -1,18 +1,30 @@
"""C10 cache-provisioning error family.
Rooted at :class:`C10ProvisioningError`; today the family contains
:class:`ManifestWriteError` (AZ-323) covering signing-key load failure,
fingerprint-allowlist rejection, and any I/O failure path during
``ManifestBuilder.build_manifest``. AZ-324 / AZ-325 add additional
subtypes (``ManifestVerifierError``, ``ManifestCoverageError``,
``ContentHashMismatchError``) under the same root as they land.
Rooted at :class:`C10ProvisioningError`; the family covers:
* :class:`ManifestWriteError` (AZ-323) — signing-key load failure,
fingerprint-allowlist rejection, atomic-write failure during
:meth:`ManifestBuilder.build_manifest`.
* :class:`DescriptorBatchError` (AZ-322) — CUDA OOM, descriptor-dim
mismatch, FAISS rebuild failure during
:meth:`DescriptorBatcher.populate_descriptors`.
* :class:`BuildLockHeldError` (AZ-325) — another invocation of
:meth:`CacheProvisioner.build_cache_artifacts` already holds the
``cache_root/.c10.lock`` file (CP-INV-4 race-condition guard, see
description.md §7).
* :class:`ManifestCoverageError` (AZ-325) — after a SUCCESS build, an
orphan file under ``cache_root`` is not listed in the new Manifest's
``artifacts`` block (D-C10-3 / CP-INV-3). The orchestrator rolls
back to the prior-good Manifest before re-raising.
"""
from __future__ import annotations
__all__ = [
"BuildLockHeldError",
"C10ProvisioningError",
"DescriptorBatchError",
"ManifestCoverageError",
"ManifestWriteError",
]
@@ -57,3 +69,38 @@ class ManifestWriteError(C10ProvisioningError):
"c10.manifest.build.error"` log payload (set by ``ManifestBuilder``)
carries the discriminator field.
"""
class BuildLockHeldError(C10ProvisioningError):
    """The ``cache_root/.c10.lock`` lockfile could not be acquired in time.

    Signals that a different ``build_cache_artifacts`` invocation owns
    ``cache_root/.c10.lock`` and that the configured ``lock_timeout_s``
    ran out while waiting for it (AZ-325). This is the error surface of
    CP-INV-4: at most one build may run against a given cache root.

    The build that owns the lock keeps running untouched, and the
    lockfile it holds is NOT deleted by the losing caller.

    Operator guidance: the condition is reported through the structured
    ``kind="c10.provision.lock.held"`` ERROR log. Either wait for the
    other build to complete, or ``kill`` the stale process — filelock
    drops the lock automatically when the owning process exits.
    """
class ManifestCoverageError(C10ProvisioningError):
    """The cache contains files the new Manifest does not account for.

    Raised by :class:`CacheProvisioner` (AZ-325) once a build has
    otherwise reached SUCCESS and the strict-mode coverage walk finds
    files under ``cache_root`` that are missing from the new Manifest's
    ``artifacts`` block. This is the enforcement point for
    D-C10-3 / CP-INV-3 (no smuggled artifacts in the takeoff cache).

    Before the exception is re-raised the orchestrator puts the
    prior-good Manifest back in place (``Manifest.json.prev`` is
    renamed to ``Manifest.json``), so callers always observe the
    previous-good cache state rather than a half-applied one. The
    orphan paths are named in the structured
    ``kind="c10.provision.coverage.orphans"`` ERROR log.
    """
@@ -1,40 +1,181 @@
"""C10 Public-API Protocols.
"""C10 Public-API Protocols + top-level orchestrator DTOs.
- :class:`CacheProvisioner` (AZ-325, pending) — pre-flight orchestrator.
- :class:`ManifestSigner` (AZ-323) — Ed25519 detached signing surface
Public surfaces:
* :class:`CacheProvisioner` (AZ-325) — the F1 build-phase orchestrator.
Composes :class:`EngineCompiler` (AZ-321),
:class:`DescriptorBatcher` (AZ-322), and :class:`ManifestBuilder`
(AZ-323) into a single idempotent build pipeline gated by a
filesystem lockfile. See
``_docs/02_document/contracts/c10_provisioning/cache_provisioner.md``.
* :class:`FileLockFactory` (AZ-325) — consumer-side cut over the
``filelock`` package that lets tests inject a deterministic
in-process lock without spawning subprocesses.
* :class:`ManifestSigner` (AZ-323) — Ed25519 detached signing surface
consumed by :class:`ManifestBuilder`.
- :class:`BackboneEmbedder` (AZ-322) — image-batch → descriptor surface
* :class:`BackboneEmbedder` (AZ-322) — image-batch → descriptor surface
consumed by :class:`DescriptorBatcher`. The default impl wraps the
AZ-298 / AZ-299 / AZ-300 ``InferenceRuntime``-produced engine; when
E-C2 (AZ-336+) ships its public embed surface a thin adapter swaps
the impl in via the composition root.
AZ-298 / AZ-299 / AZ-300 ``InferenceRuntime``-produced engine.
Concrete impl: engine compile + descriptors + manifest + content-hash gate. See
`_docs/02_document/components/11_c10_provisioning/`.
The orchestrator + lock-factory DTOs live alongside the Protocol
because the Protocol's signatures reference them; keeping everything
in this single import surface is consistent with how AZ-321 collocates
``CompileEngineCallable`` with its request/result DTOs.
Per the contract document the public ``Bbox`` field is the project's
canonical :class:`gps_denied_onboard._types.geo.BoundingBox` (not a
new ``Bbox`` DTO) — this matches what AZ-323 / AZ-324 already accept
and avoids a redundant adapter layer at the C10/C12 boundary.
"""
from __future__ import annotations
from contextlib import AbstractContextManager
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
from uuid import UUID
from gps_denied_onboard._types.manifests import Manifest
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard._types.inference import EngineCacheEntry
if TYPE_CHECKING:
import numpy as np
__all__ = [
"BackboneEmbedder",
"BuildOutcome",
"BuildReport",
"BuildRequest",
"CacheProvisioner",
"FileLockFactory",
"ManifestSigner",
"SectorClassification",
"SigningKeyHandle",
]
class CacheProvisioner(Protocol):
"""Pre-flight cache provisioning (engine compile + descriptor batch + manifest)."""
class SectorClassification(str, Enum):
"""Operator-set sector classification for a cache build (AZ-325).
def provision(self, flight_id: str, output_root: Path) -> Manifest: ...
Mirrors the C6 enum at the C10 contract surface so
``components/c10_provisioning/*`` never imports
``components.c6_tile_cache``. The string values are identical to
C6's so the composition-root adapters can round-trip via
``.value`` (see :func:`runtime_root.c10_factory.build_cache_provisioner`).
"""
ACTIVE_CONFLICT = "active_conflict"
STABLE_REAR = "stable_rear"
class BuildOutcome(str, Enum):
    """How a single ``build_cache_artifacts`` invocation ended.

    Members are ``str`` subclasses, so each one compares equal to its
    wire value and can be serialised without an explicit ``.value``.
    """

    # A full build ran and produced a new Manifest.
    SUCCESS = "success"
    # The build stopped on a soft failure reported via ``BuildReport``.
    FAILURE = "failure"
    # The existing Manifest already matches the request; nothing was rebuilt.
    IDEMPOTENT_NO_OP = "idempotent_no_op"
@dataclass(frozen=True)
class BuildRequest:
    """Immutable argument bundle for :meth:`CacheProvisioner.build_cache_artifacts`.

    The first six fields are required. ``takeoff_origin`` and
    ``flight_id`` are the optional ADR-010 / AZ-489 pass-through
    fields: when given, they are written into the Manifest body and
    folded into the build-identity hash, so that re-planning a flight
    yields a new cache identity (CP-INV-8 / AC-14 / AC-16).
    """

    # --- build scope ---------------------------------------------------
    bbox: BoundingBox
    zoom_levels: tuple[int, ...]
    sector_class: SectorClassification

    # --- filesystem inputs ----------------------------------------------
    calibration_path: Path
    cache_root: Path
    key_path: Path

    # --- optional flight pass-through (default: not supplied) -----------
    takeoff_origin: LatLonAlt | None = None
    flight_id: UUID | None = None
@dataclass(frozen=True)
class BuildReport:
    """Immutable result of :meth:`CacheProvisioner.build_cache_artifacts`.

    For SUCCESS and IDEMPOTENT_NO_OP outcomes ``manifest_hash`` and
    ``manifest_path`` are set. For FAILURE both are ``None`` and the
    operator-actionable explanation is carried in ``failure_reason``.

    Only soft failures (e.g. empty C6 corpus, non-strict coverage
    drift) end up in a report. Hard errors — ``BuildLockHeldError``,
    ``EngineBuildError``, ``DescriptorBatchError``,
    ``ManifestWriteError``, ``ManifestCoverageError`` — are raised to
    the caller as exceptions rather than being recorded here.
    """

    # Terminal classification of the call.
    outcome: BuildOutcome

    # Work counters for the engine and descriptor phases.
    engines_built: int
    engines_reused: int
    descriptors_generated: int

    # Manifest identity; ``None`` on FAILURE.
    manifest_hash: str | None
    manifest_path: Path | None

    # Human-readable reason; ``None`` unless the outcome is FAILURE.
    failure_reason: str | None

    # Duration of the call in seconds.
    elapsed_s: float
@runtime_checkable
class FileLockFactory(Protocol):
    """Factory of filesystem-lockfile context managers (AZ-325).

    Production wiring uses
    :class:`gps_denied_onboard.components.c10_provisioning.provisioner.FilelockFileLockFactory`,
    which is built on the ``filelock`` package (fcntl flock, so the
    lock is dropped automatically when the process exits — the AC-8
    SIGKILL recovery case). Tests substitute a deterministic
    in-process factory so contention can be asserted without spawning
    subprocesses (AC-5).

    Contract for the object returned by ``try_lock``:

    * ``__enter__`` returns ``None`` once the lock is held, or raises
      :class:`gps_denied_onboard.components.c10_provisioning.errors.BuildLockHeldError`
      when ``timeout_s`` elapses before the lock can be acquired.
    * ``__exit__`` always releases the lock; the orchestrator depends
      on this for the AC-8 lock-released-on-every-exit guarantee.
    """

    def try_lock(
        self,
        path: Path,
        *,
        timeout_s: float,
    ) -> AbstractContextManager[None]:
        """Return a context manager guarding the lockfile at ``path``."""
        ...
@runtime_checkable
class CacheProvisioner(Protocol):
    """Public top-level orchestrator for the C10 F1 build phase.

    An implementation combines :class:`EngineCompiler`,
    :class:`DescriptorBatcher`, and :class:`ManifestBuilder` into one
    idempotent operation with these steps:

    1. Acquire ``cache_root/.c10.lock`` (CP-INV-4).
    2. Query C6 for tiles in scope; empty → ``BuildReport(outcome=FAILURE)``.
    3. Compute the build-identity hash; if it equals the existing
       Manifest's ``manifest_hash`` → ``IDEMPOTENT_NO_OP`` (D-C10-1).
    4. Otherwise run engine compile → descriptor populate → Manifest
       build, snapshotting any prior Manifest to ``Manifest.json.prev``
       so it can be rolled back.
    5. Walk ``cache_root`` and check every shipped file is listed in
       the new Manifest's ``artifacts`` block; orphans → roll back +
       :class:`ManifestCoverageError` (D-C10-3).
    6. Clean up ``Manifest.json.prev``; release the lock.

    ``@runtime_checkable`` lets unit tests assert structural
    conformance of the default impl with ``isinstance`` without
    importing the impl class (CP-TC-10).
    """

    def build_cache_artifacts(self, request: BuildRequest) -> BuildReport:
        """Run the full build transaction described by ``request``."""
        ...

    def compile_engines_for_corpus(
        self,
        request: Any,
    ) -> tuple[EngineCacheEntry, ...]:
        """Compile engines only and return their cache entries."""
        ...
class SigningKeyHandle(Protocol):
@@ -0,0 +1,755 @@
"""C10 ``CacheProvisionerImpl`` — top-level F1 orchestrator (AZ-325).
Composes :class:`EngineCompiler` (AZ-321), :class:`DescriptorBatcher`
(AZ-322), and :class:`ManifestBuilder` (AZ-323) into the public
contract surface specified by
``_docs/02_document/contracts/c10_provisioning/cache_provisioner.md``.
Design highlights:
* CP-INV-4 mutual exclusion is enforced via a ``cache_root/.c10.lock``
filesystem lockfile acquired through the injected
:class:`FileLockFactory`. The default impl uses the ``filelock``
package (fcntl-backed → auto-released on process exit, AC-8 SIGKILL
recovery).
* D-C10-1 idempotence is decided by reading the existing
``Manifest.json``'s recorded ``build.manifest_hash`` and recomputing
the same hash for the new request. Because AZ-323's hash includes
engine + descriptor-index SHA-256 (which are build outputs), the
warm path reads the existing Manifest's listed artifacts to
reconstruct the inputs the AZ-323 helper needs. AC-2 forbids any
call to ``compile_engines_for_corpus`` / ``populate_descriptors`` /
``build_manifest`` on this path; tiles are queried via the C6
metadata store only (cheap) so the predicted engine paths can be
checked against the recorded set.
* D-C10-3 / CP-INV-3 coverage walk runs after a SUCCESS build: every
regular file under ``cache_root`` (excluding the Manifest itself,
its sidecars, the lockfile, and the ``.prev`` rollback) MUST be
listed in the new Manifest's ``artifacts`` block. Orphans → roll
back to the prior-good Manifest and raise
:class:`ManifestCoverageError`.
* Lock release is unconditional (try/finally) on every exit path —
SUCCESS, FAILURE, IDEMPOTENT_NO_OP, ``ManifestCoverageError``, and
any propagated exception from the inner phases. AC-8 verifies this
by re-acquiring the lock after each error path.
Cross-component imports: this module never imports
``components.c6_*`` directly. Tile metadata access goes through the
:class:`TilesByBboxQuery` consumer-side cut already defined in
``manifest_builder.py`` for AZ-323; the composition root
(``runtime_root.c10_factory.build_cache_provisioner``) wires the real
C6 store into the same adapter the AZ-323 builder consumes.
The build-identity hash formula matches AZ-323's
``_compute_manifest_hash`` byte-for-byte; both modules import the
canonical helper (currently a leading-underscore export from
``manifest_builder``). Cumulative-review Finding F1 (carryover from
batches 3133) tracks promoting the helper to a shared
``_build_identity`` module so AZ-323 / AZ-324 / AZ-325 share a single
definition; that hygiene PBI is intentionally deferred — the import
is documented here so a reader sees the intent.
"""
from __future__ import annotations
import hashlib
import logging
from contextlib import AbstractContextManager
from dataclasses import dataclass
from pathlib import Path
import orjson
from filelock import FileLock, Timeout as FileLockTimeout
from gps_denied_onboard._types.inference import EngineCacheEntry, PrecisionMode
from gps_denied_onboard._types.manifests import HostCapabilities
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c10_provisioning.config import (
C10ProvisionerConfig,
)
from gps_denied_onboard.components.c10_provisioning.descriptor_batcher import (
BatcherOutcome,
CorpusFilter,
DescriptorBatcher,
)
from gps_denied_onboard.components.c10_provisioning.engine_compiler import (
BackboneSpec,
EngineCompileRequest,
EngineCompileResult,
EngineCompiler,
CompileOutcome,
)
from gps_denied_onboard.components.c10_provisioning.errors import (
BuildLockHeldError,
ManifestCoverageError,
)
from gps_denied_onboard.components.c10_provisioning.interface import (
BuildOutcome,
BuildReport,
BuildRequest,
FileLockFactory,
)
from gps_denied_onboard.components.c10_provisioning.manifest_builder import (
ManifestBuildInput,
ManifestBuilder,
TileHashRecord,
TilesByBboxQuery,
_aggregate_tile_hash,
_compute_manifest_hash,
)
from gps_denied_onboard.helpers.engine_filename_schema import (
EngineFilenameSchema,
)
__all__ = [
"CacheProvisionerImpl",
"FilelockFileLockFactory",
]
# Prefix shared by every structured-log ``kind`` this module emits
# (e.g. ``c10.provision.lock.acquired``).
_LOG_KIND_PREFIX = "c10.provision"
# Name of the lockfile, resolved directly under ``cache_root``.
_LOCK_FILENAME = ".c10.lock"
# Appended to the Manifest's own suffix to name the rollback snapshot
# (``Manifest.json`` -> ``Manifest.json.prev``).
_MANIFEST_PREV_SUFFIX = ".prev"
# Suffixes of the Manifest sidecar files. Their call sites are not
# visible in this part of the file — presumably ``<manifest>.sha256``
# and ``<manifest>.sig``; TODO confirm.
_MANIFEST_SHA256_SUFFIX = ".sha256"
_MANIFEST_SIG_SUFFIX = ".sig"
# Filenames excluded from the coverage walk because they are the Manifest
# itself, its sidecars, the lockfile, or the rollback snapshot. Compared
# as exact string suffixes against ``Path.name``.
# NOTE(review): a ``frozenset`` is immutable, so this empty module-level
# value cannot be "populated" later; the effective exclusion set is
# presumably built per instance elsewhere — confirm, and drop this
# constant if nothing reads it.
_COVERAGE_EXCLUDED_NAMES: frozenset[str] = frozenset()  # placeholder; see NOTE above
@dataclass(frozen=True)
class _LockGuard(AbstractContextManager["_LockGuard"]):
    """Context manager that maps ``filelock`` timeouts to the contract error.

    The default :class:`FilelockFileLockFactory` hands one of these
    back so callers can unconditionally ``with`` the result: entering
    acquires ``lock`` with ``timeout_s``; an acquisition timeout
    surfaces as :class:`BuildLockHeldError` instead of leaking
    ``filelock.Timeout`` upward; leaving releases the lock.

    The lockfile is deliberately left on disk after release. Unlinking
    a flock-guarded file is racy: a waiter that already opened the old
    inode can lock it after the unlink, while a later arrival creates
    and locks a fresh file at the same path, so two builds would hold
    "the" lock at once (a CP-INV-4 violation). ``filelock`` itself does
    not remove the file on POSIX for the same reason.
    """

    # Underlying lock object; only ``acquire(timeout=...)`` and
    # ``release()`` are called on it.
    lock: FileLock
    # Seconds to wait for the lock before giving up.
    timeout_s: float
    # Lockfile location; used only to build the error message.
    path: Path

    def __enter__(self) -> "_LockGuard":
        """Acquire the lock or raise :class:`BuildLockHeldError` on timeout."""
        try:
            self.lock.acquire(timeout=self.timeout_s)
        except FileLockTimeout as exc:
            raise BuildLockHeldError(
                f"another build holds the lockfile at {self.path}"
            ) from exc
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Release the lock; never suppresses the in-flight exception."""
        # Release only — do NOT unlink the lockfile (see class docstring).
        # A leftover lockfile is harmless: the next acquisition re-uses it.
        self.lock.release()
class FilelockFileLockFactory:
    """Production :class:`FileLockFactory` built on the ``filelock`` package.

    Each call wraps a fresh ``filelock.FileLock`` for the requested
    path. ``FileLock`` is backed by ``fcntl.flock`` on POSIX
    (auto-released on process exit, satisfying the SIGKILL clause of
    AC-8) and by ``msvcrt`` locks on Windows. The timeout is passed
    through to ``acquire(timeout=...)`` by the returned guard, which
    re-raises a timeout as :class:`BuildLockHeldError` per the
    contract.
    """

    def try_lock(
        self, path: Path, *, timeout_s: float
    ) -> AbstractContextManager[None]:
        """Return an un-entered guard for the lockfile at ``path``."""
        backing_lock = FileLock(str(path))
        guard = _LockGuard(lock=backing_lock, timeout_s=timeout_s, path=path)
        return guard
class CacheProvisionerImpl:
"""Default implementation of the :class:`CacheProvisioner` Protocol.
Constructor injection only — no side effects in ``__init__`` other
than naming the structured logger. The composition root assembles
every collaborator and the orchestrator wires them in the order
the contract dictates.
The orchestrator deliberately does NOT cache references to
intermediate state across calls; every ``build_cache_artifacts``
invocation is a fresh transaction guarded by the lockfile.
"""
def __init__(
self,
*,
engine_compiler: EngineCompiler,
descriptor_batcher: DescriptorBatcher,
manifest_builder: ManifestBuilder,
tile_metadata_store: TilesByBboxQuery,
lock_factory: FileLockFactory,
backbones: tuple[BackboneSpec, ...],
host: HostCapabilities,
precision: PrecisionMode,
workspace_mb: int,
logger: logging.Logger,
clock: Clock,
config: C10ProvisionerConfig,
) -> None:
self._engine_compiler = engine_compiler
self._descriptor_batcher = descriptor_batcher
self._manifest_builder = manifest_builder
self._tiles_query = tile_metadata_store
self._lock_factory = lock_factory
self._backbones = backbones
self._host = host
self._precision = precision
self._workspace_mb = workspace_mb
self._log = logger
self._clock = clock
self._config = config
# ------------------------------------------------------------------
# Public surface
# ------------------------------------------------------------------
def build_cache_artifacts(self, request: BuildRequest) -> BuildReport:
    """Run one lock-guarded build transaction for ``request``.

    Steps: create ``cache_root`` if needed, take the
    ``cache_root/.c10.lock`` lock, fetch the in-scope tiles, short-cut
    to FAILURE on an empty corpus or to IDEMPOTENT_NO_OP when the
    existing Manifest already matches, otherwise delegate to
    :meth:`_run_active_build`. The lock is held for everything after
    acquisition and is released by the lock context manager on every
    exit path.

    Args:
        request: scope, paths and optional flight pass-through fields.

    Returns:
        The :class:`BuildReport` for this invocation.

    Raises:
        BuildLockHeldError: the lock could not be acquired within
            ``config.lock_timeout_s``. A structured
            ``c10.provision.lock.held`` ERROR log is emitted first.
        Exception: anything raised by the inner build phases
            propagates unchanged.
    """
    # Function-scope import: the module only imports
    # ``AbstractContextManager`` from contextlib at file level.
    from contextlib import ExitStack

    run_started_ns = self._clock.monotonic_ns()
    manifest_path = request.cache_root / self._config.manifest_filename
    prev_path = manifest_path.with_suffix(
        manifest_path.suffix + _MANIFEST_PREV_SUFFIX
    )
    lock_path = request.cache_root / _LOCK_FILENAME
    # The lockfile lives inside cache_root, so the directory must exist
    # before acquisition is attempted.
    request.cache_root.mkdir(parents=True, exist_ok=True)

    with ExitStack() as held_lock:
        # ExitStack separates "entering the lock" from "running the
        # body", so only an acquisition failure is logged as lock.held.
        try:
            held_lock.enter_context(
                self._lock_factory.try_lock(
                    lock_path, timeout_s=self._config.lock_timeout_s
                )
            )
        except BuildLockHeldError:
            self._log.error(
                f"{_LOG_KIND_PREFIX}.lock.held",
                extra={
                    "kind": f"{_LOG_KIND_PREFIX}.lock.held",
                    "kv": {
                        "path": str(lock_path),
                        "timeout_s": self._config.lock_timeout_s,
                    },
                },
            )
            raise

        self._log.info(
            f"{_LOG_KIND_PREFIX}.lock.acquired",
            extra={
                "kind": f"{_LOG_KIND_PREFIX}.lock.acquired",
                "kv": {"path": str(lock_path)},
            },
        )

        sorted_tiles = self._fetch_sorted_tiles(request)
        if not sorted_tiles:
            return self._build_failure_empty_corpus(request, run_started_ns)

        idempotent_hash = self._check_idempotence(
            request=request,
            manifest_path=manifest_path,
            sorted_tiles=sorted_tiles,
        )
        if idempotent_hash is not None:
            elapsed_s = self._elapsed_s(run_started_ns)
            self._log.info(
                f"{_LOG_KIND_PREFIX}.idempotent.no_op",
                extra={
                    "kind": f"{_LOG_KIND_PREFIX}.idempotent.no_op",
                    "kv": {
                        "manifest_hash": idempotent_hash,
                        "elapsed_s": elapsed_s,
                    },
                },
            )
            return BuildReport(
                outcome=BuildOutcome.IDEMPOTENT_NO_OP,
                engines_built=0,
                engines_reused=0,
                descriptors_generated=0,
                manifest_hash=idempotent_hash,
                manifest_path=manifest_path,
                failure_reason=None,
                elapsed_s=elapsed_s,
            )

        return self._run_active_build(
            request=request,
            manifest_path=manifest_path,
            prev_path=prev_path,
            run_started_ns=run_started_ns,
        )
def compile_engines_for_corpus(
self, request: EngineCompileRequest
) -> tuple[EngineCacheEntry, ...]:
"""Diagnostic-mode passthrough — re-compile engines without touching descriptors / Manifest.
Per CP-TC-11 / AC-10 this is a thin forwarder. It does NOT
acquire the lockfile (the operator runs this for engine-only
re-compile flows after a hardware change, where the orchestrator's
full transaction would be overkill). The return value is the
underlying compiler's ``EngineCompileResult.entry`` projected
as the contract's ``tuple[EngineCacheEntry, ...]``.
"""
results = self._engine_compiler.compile_engines_for_corpus(request)
return tuple(result.entry for result in results)
# ------------------------------------------------------------------
# Internals — active build path
# ------------------------------------------------------------------
def _run_active_build(
    self,
    *,
    request: BuildRequest,
    manifest_path: Path,
    prev_path: Path,
    run_started_ns: int,
) -> BuildReport:
    """Run engine compile → descriptors → Manifest → coverage check.

    The prior Manifest is snapshotted first. From that point until the
    coverage check has passed, ANY exception triggers
    ``_restore_prior_manifest`` before it propagates, so no failure in
    between can leave the prior Manifest stranded in its snapshot
    location. A non-SUCCESS descriptor outcome is a soft failure: the
    prior Manifest is restored and a FAILURE report is returned
    instead of raising.

    Args:
        request: the build request being served.
        manifest_path: final location of the Manifest.
        prev_path: location of the rollback snapshot.
        run_started_ns: monotonic start timestamp used for ``elapsed_s``.

    Returns:
        A SUCCESS report, or a FAILURE report when the descriptor
        phase reports a non-SUCCESS outcome.

    Raises:
        ManifestCoverageError: raised by the coverage check (after
            rollback).
        Exception: any error from the build phases (after rollback).
    """
    prior_existed = self._snapshot_prior_manifest(manifest_path, prev_path)

    # One guarded region covers every step between snapshot and a
    # verified Manifest; rollback therefore cannot be skipped by an
    # error in a helper or by a non-coverage error in the coverage walk.
    try:
        engine_results = self._engine_compiler.compile_engines_for_corpus(
            self._compose_engine_request(request)
        )
        engines_built, engines_reused = self._count_outcomes(engine_results)
        engine_entries = tuple(result.entry for result in engine_results)

        descriptor_report = self._descriptor_batcher.populate_descriptors(
            CorpusFilter(
                bbox=(
                    request.bbox.min_lat_deg,
                    request.bbox.min_lon_deg,
                    request.bbox.max_lat_deg,
                    request.bbox.max_lon_deg,
                ),
                zoom_levels=request.zoom_levels,
                sector_class=request.sector_class.value,
            )
        )
        descriptors_ok = descriptor_report.outcome is BatcherOutcome.SUCCESS

        if descriptors_ok:
            descriptor_index_path = self._derive_descriptor_index_path(
                request
            )
            manifest_artifact = self._manifest_builder.build_manifest(
                ManifestBuildInput(
                    cache_root=request.cache_root,
                    bbox=request.bbox,
                    zoom_levels=request.zoom_levels,
                    sector_class=request.sector_class.value,
                    engine_entries=engine_entries,
                    descriptor_index_path=descriptor_index_path,
                    calibration_path=request.calibration_path,
                    key_path=request.key_path,
                    takeoff_origin=request.takeoff_origin,
                    flight_id=request.flight_id,
                )
            )
            self._verify_coverage(
                cache_root=request.cache_root,
                manifest_path=manifest_path,
                engine_entries=engine_entries,
                descriptor_index_path=descriptor_index_path,
                calibration_path=request.calibration_path,
            )
    except Exception:
        self._restore_prior_manifest(manifest_path, prev_path, prior_existed)
        raise

    if not descriptors_ok:
        # Soft failure: handled outside the guarded region so the
        # restore runs exactly once even if logging below were to fail.
        self._restore_prior_manifest(manifest_path, prev_path, prior_existed)
        elapsed_s = self._elapsed_s(run_started_ns)
        self._log.error(
            f"{_LOG_KIND_PREFIX}.descriptor.failure",
            extra={
                "kind": f"{_LOG_KIND_PREFIX}.descriptor.failure",
                "kv": {
                    "failure_reason": descriptor_report.failure_reason,
                    "elapsed_s": elapsed_s,
                },
            },
        )
        return BuildReport(
            outcome=BuildOutcome.FAILURE,
            engines_built=engines_built,
            engines_reused=engines_reused,
            descriptors_generated=0,
            manifest_hash=None,
            manifest_path=None,
            failure_reason=descriptor_report.failure_reason,
            elapsed_s=elapsed_s,
        )

    # The new Manifest is verified; snapshot cleanup is deliberately
    # outside the guarded region so a cleanup error cannot roll back a
    # good build.
    self._cleanup_prev(prev_path)
    elapsed_s = self._elapsed_s(run_started_ns)
    self._log.info(
        f"{_LOG_KIND_PREFIX}.build.success",
        extra={
            "kind": f"{_LOG_KIND_PREFIX}.build.success",
            "kv": {
                "manifest_hash": manifest_artifact.manifest_hash,
                "engines_built": engines_built,
                "engines_reused": engines_reused,
                "descriptors_generated": descriptor_report.descriptors_generated,
                "elapsed_s": elapsed_s,
            },
        },
    )
    return BuildReport(
        outcome=BuildOutcome.SUCCESS,
        engines_built=engines_built,
        engines_reused=engines_reused,
        descriptors_generated=descriptor_report.descriptors_generated,
        manifest_hash=manifest_artifact.manifest_hash,
        manifest_path=manifest_artifact.manifest_path,
        failure_reason=None,
        elapsed_s=elapsed_s,
    )
# ------------------------------------------------------------------
# Internals — helpers
# ------------------------------------------------------------------
def _fetch_sorted_tiles(
self, request: BuildRequest
) -> tuple[TileHashRecord, ...]:
raw = tuple(
self._tiles_query.query_by_bbox(
bbox=request.bbox,
zoom_levels=request.zoom_levels,
sector_class=request.sector_class.value,
)
)
return tuple(
sorted(raw, key=lambda r: (r.zoom, r.lat, r.lon, r.source))
)
def _build_failure_empty_corpus(
    self, request: BuildRequest, run_started_ns: int
) -> BuildReport:
    """Log and report the "no tiles in scope" soft failure.

    Emits one structured ERROR log describing the requested scope and
    returns a FAILURE report with zeroed counters, no Manifest
    identity, and an operator-actionable ``failure_reason``.
    """
    elapsed_s = self._elapsed_s(run_started_ns)
    log_kind = f"{_LOG_KIND_PREFIX}.empty.corpus"
    scope = request.bbox
    details = {
        "bbox": [
            scope.min_lat_deg,
            scope.min_lon_deg,
            scope.max_lat_deg,
            scope.max_lon_deg,
        ],
        "zoom_levels": list(request.zoom_levels),
        "sector_class": request.sector_class.value,
        "elapsed_s": elapsed_s,
    }
    self._log.error(log_kind, extra={"kind": log_kind, "kv": details})
    return BuildReport(
        outcome=BuildOutcome.FAILURE,
        engines_built=0,
        engines_reused=0,
        descriptors_generated=0,
        manifest_hash=None,
        manifest_path=None,
        failure_reason=(
            "no tiles in C6 for the requested scope; run C11 "
            "TileDownloader first"
        ),
        elapsed_s=elapsed_s,
    )
def _check_idempotence(
self,
*,
request: BuildRequest,
manifest_path: Path,
sorted_tiles: tuple[TileHashRecord, ...],
) -> str | None:
"""Return the existing Manifest's hash if the request is idempotent.
Reads the existing Manifest's recorded artifacts WITHOUT verifying
signatures (AZ-324's job). Reconstructs the engine entries from
the listing, recomputes the build-identity hash with the AZ-323
formula, compares to ``build.manifest_hash``. AC-2 guarantees:
no calls to ``compile_engines_for_corpus``,
``populate_descriptors``, or ``build_manifest`` on this path.
"""
if not manifest_path.exists():
return None
try:
body = orjson.loads(manifest_path.read_bytes())
except (orjson.JSONDecodeError, OSError):
return None
build_block = body.get("build")
if not isinstance(build_block, dict):
return None
existing_hash = build_block.get("manifest_hash")
if not isinstance(existing_hash, str) or len(existing_hash) != 64:
return None
artifacts = body.get("artifacts")
if not isinstance(artifacts, dict):
return None
listed_engines = artifacts.get("engines")
descriptor_index_block = artifacts.get("descriptor_index")
if not isinstance(listed_engines, list):
return None
if not isinstance(descriptor_index_block, dict):
return None
descriptor_index_sha256 = descriptor_index_block.get("sha256")
if not isinstance(descriptor_index_sha256, str):
return None
# Predict the engine paths the new request would produce. If
# any predicted path is missing from the listing, the previous
# cache was built for a different backbone / host / precision —
# not idempotent.
predicted_paths = sorted(
str(self._predict_engine_path(bb, request.cache_root))
for bb in self._backbones
)
listed_path_strs = sorted(
str(e.get("path", ""))
for e in listed_engines
if isinstance(e, dict) and isinstance(e.get("path"), str)
)
if predicted_paths != listed_path_strs:
return None
engine_entries: list[EngineCacheEntry] = []
for entry in listed_engines:
if not isinstance(entry, dict):
return None
path = entry.get("path")
sha = entry.get("sha256")
if not isinstance(path, str) or not isinstance(sha, str):
return None
engine_entries.append(
EngineCacheEntry(
engine_path=Path(path),
sha256_hex=sha,
sm=self._host.sm,
jp=self._host.jetpack,
trt=self._host.trt,
precision=self._precision,
extras={},
)
)
try:
calibration_bytes = request.calibration_path.read_bytes()
except OSError:
return None
calibration_sha256 = hashlib.sha256(calibration_bytes).hexdigest()
tiles_coverage_sha256 = _aggregate_tile_hash(sorted_tiles)
request_hash = _compute_manifest_hash(
engine_entries=tuple(engine_entries),
calibration_sha256=calibration_sha256,
descriptor_index_sha256=descriptor_index_sha256,
tiles_coverage_sha256=tiles_coverage_sha256,
sector_class=request.sector_class.value,
bbox=request.bbox,
zoom_levels=request.zoom_levels,
takeoff_origin=request.takeoff_origin,
flight_id=request.flight_id,
)
if request_hash == existing_hash:
return existing_hash
return None
def _compose_engine_request(
    self, request: BuildRequest
) -> EngineCompileRequest:
    """Assemble the engine-compile request for this build.

    Backbones, precision, host and workspace size come from the
    provisioner itself; the calibration path and cache root are taken
    from ``request``.
    """
    compile_kwargs = {
        "backbones": self._backbones,
        "calibration_path": request.calibration_path,
        "cache_root": request.cache_root,
        "precision": self._precision,
        "host": self._host,
        "workspace_mb": self._workspace_mb,
    }
    return EngineCompileRequest(**compile_kwargs)
def _predict_engine_path(
    self, backbone: BackboneSpec, cache_root: Path
) -> Path:
    """Return the path under ``cache_root`` where ``backbone``'s engine is expected.

    The filename is produced by ``EngineFilenameSchema.build`` from the
    backbone's model name, this provisioner's host triple (sm / jetpack
    / trt) and the ``value`` of its precision.
    """
    host = self._host
    return cache_root / EngineFilenameSchema.build(
        model_name=backbone.model_name,
        sm=host.sm,
        jetpack=host.jetpack,
        trt=host.trt,
        precision=self._precision.value,
    )
def _derive_descriptor_index_path(self, request: BuildRequest) -> Path:
return request.cache_root / "corpus.index"
@staticmethod
def _count_outcomes(
results: tuple[EngineCompileResult, ...],
) -> tuple[int, int]:
built = sum(1 for r in results if r.outcome is CompileOutcome.BUILT)
reused = sum(1 for r in results if r.outcome is CompileOutcome.REUSED)
return built, reused
def _snapshot_prior_manifest(
self, manifest_path: Path, prev_path: Path
) -> bool:
"""Rename existing Manifest to the .prev rollback path. Return True if a prior existed."""
if not manifest_path.exists():
return False
if prev_path.exists():
# Rebuilds aren't stack-able (CP-INV-2 docs); a stale .prev
# from a previous interrupted run is replaced silently.
try:
prev_path.unlink()
except OSError:
pass
manifest_path.rename(prev_path)
return True
def _restore_prior_manifest(
self,
manifest_path: Path,
prev_path: Path,
prior_existed: bool,
) -> None:
"""Roll back to the .prev snapshot. Best-effort cleanup of partial Manifest."""
if manifest_path.exists():
try:
manifest_path.unlink()
except OSError:
# Leave partial Manifest if unlink fails — the verifier
# at takeoff will reject it; the operator sees the
# explicit ERROR log we emit at the call site.
pass
if prior_existed and prev_path.exists():
prev_path.rename(manifest_path)
def _cleanup_prev(self, prev_path: Path) -> None:
if prev_path.exists():
try:
prev_path.unlink()
except OSError as exc:
self._log.warning(
f"{_LOG_KIND_PREFIX}.prev.cleanup",
extra={
"kind": f"{_LOG_KIND_PREFIX}.prev.cleanup",
"kv": {"path": str(prev_path), "reason": str(exc)},
},
)
def _verify_coverage(
    self,
    *,
    cache_root: Path,
    manifest_path: Path,
    engine_entries: tuple[EngineCacheEntry, ...],
    descriptor_index_path: Path,
    calibration_path: Path,
) -> None:
    """Walk ``cache_root`` and ensure no orphan files exist (CP-INV-3).

    A file is an orphan when it is found under ``cache_root`` but is
    neither an engine path, the descriptor index, nor the calibration
    file (all compared after ``resolve()``). Skipped during the walk:
    the Manifest, its sha256 / signature / ``.prev`` companions, the
    lockfile, and every file whose suffix is the SHA-256 sidecar suffix
    (sidecars are paired with their primary by convention per the
    AZ-280 atomic-write contract; only the primary is listed).

    Raises:
        ManifestCoverageError: If orphans exist and
            ``self._config.coverage_strict`` is true. When it is false
            the orphans are only logged as a WARNING.
    """
    manifest_name = manifest_path.name
    skip_names = {manifest_name, _LOCK_FILENAME} | {
        f"{manifest_name}{suffix}"
        for suffix in (
            _MANIFEST_SHA256_SUFFIX,
            _MANIFEST_SIG_SUFFIX,
            _MANIFEST_PREV_SUFFIX,
        )
    }
    listed = {Path(entry.engine_path).resolve() for entry in engine_entries}
    listed |= {descriptor_index_path.resolve(), calibration_path.resolve()}
    found = {
        candidate.resolve()
        for candidate in cache_root.rglob("*")
        if candidate.is_file()
        and candidate.name not in skip_names
        and candidate.suffix != _MANIFEST_SHA256_SUFFIX
    }
    orphans = found - listed
    if not orphans:
        return
    orphan_strs = sorted(str(p) for p in orphans)
    details = {"orphans": orphan_strs, "cache_root": str(cache_root)}
    if not self._config.coverage_strict:
        lenient_kind = f"{_LOG_KIND_PREFIX}.coverage.orphans.lenient"
        self._log.warning(
            lenient_kind, extra={"kind": lenient_kind, "kv": details}
        )
        return
    strict_kind = f"{_LOG_KIND_PREFIX}.coverage.orphans"
    self._log.error(strict_kind, extra={"kind": strict_kind, "kv": details})
    raise ManifestCoverageError(
        "orphan files in cache_root not listed in Manifest: "
        f"{orphan_strs!r}"
    )
def _elapsed_s(self, run_started_ns: int) -> float:
return max(0.0, (self._clock.monotonic_ns() - run_started_ns) / 1e9)