"""C10 ``CacheProvisionerImpl`` — top-level F1 orchestrator (AZ-325). Composes :class:`EngineCompiler` (AZ-321), :class:`DescriptorBatcher` (AZ-322), and :class:`ManifestBuilder` (AZ-323) into the public contract surface specified by ``_docs/02_document/contracts/c10_provisioning/cache_provisioner.md``. Design highlights: * CP-INV-4 mutual exclusion is enforced via a ``cache_root/.c10.lock`` filesystem lockfile acquired through the injected :class:`FileLockFactory`. The default impl uses the ``filelock`` package (fcntl-backed → auto-released on process exit, AC-8 SIGKILL recovery). * D-C10-1 idempotence is decided by reading the existing ``Manifest.json``'s recorded ``build.manifest_hash`` and recomputing the same hash for the new request. Because AZ-323's hash includes engine + descriptor-index SHA-256 (which are build outputs), the warm path reads the existing Manifest's listed artifacts to reconstruct the inputs the AZ-323 helper needs. AC-2 forbids any call to ``compile_engines_for_corpus`` / ``populate_descriptors`` / ``build_manifest`` on this path; tiles are queried via the C6 metadata store only (cheap) so the predicted engine paths can be checked against the recorded set. * D-C10-3 / CP-INV-3 coverage walk runs after a SUCCESS build: every regular file under ``cache_root`` (excluding the Manifest itself, its sidecars, the lockfile, and the ``.prev`` rollback) MUST be listed in the new Manifest's ``artifacts`` block. Orphans → roll back to the prior-good Manifest and raise :class:`ManifestCoverageError`. * Lock release is unconditional (try/finally) on every exit path — SUCCESS, FAILURE, IDEMPOTENT_NO_OP, ``ManifestCoverageError``, and any propagated exception from the inner phases. AC-8 verifies this by re-acquiring the lock after each error path. Cross-component imports: this module never imports ``components.c6_*`` directly. Tile metadata access goes through the :class:`TilesByBboxQuery` consumer-side cut already defined in ``manifest_builder.py`` for AZ-323; the composition root (``runtime_root.c10_factory.build_cache_provisioner``) wires the real C6 store into the same adapter the AZ-323 builder consumes. The build-identity hash formula matches AZ-323's emitted ``build.manifest_hash`` byte-for-byte. AZ-323 / AZ-324 / AZ-325 all share a single definition by importing :func:`aggregate_tile_hash` and :func:`compute_manifest_hash` from ``components.c10_provisioning._canonical_hash``. Resolves cumulative- review Finding F1 (batches 34–36) — the verifier and provisioner used to import leading-underscore privates from ``manifest_builder``. """ from __future__ import annotations import hashlib import logging from contextlib import AbstractContextManager from dataclasses import dataclass from pathlib import Path import orjson from filelock import FileLock, Timeout as FileLockTimeout from gps_denied_onboard._types.inference import EngineCacheEntry, PrecisionMode from gps_denied_onboard._types.manifests import HostCapabilities from gps_denied_onboard.clock import Clock from gps_denied_onboard.components.c10_provisioning.config import ( C10ProvisionerConfig, ) from gps_denied_onboard.components.c10_provisioning.descriptor_batcher import ( BatcherOutcome, CorpusFilter, DescriptorBatcher, ) from gps_denied_onboard.components.c10_provisioning.engine_compiler import ( BackboneSpec, EngineCompileRequest, EngineCompileResult, EngineCompiler, CompileOutcome, ) from gps_denied_onboard.components.c10_provisioning.errors import ( BuildLockHeldError, ManifestCoverageError, ) from gps_denied_onboard.components.c10_provisioning.interface import ( BuildOutcome, BuildReport, BuildRequest, FileLockFactory, ) from gps_denied_onboard.components.c10_provisioning._canonical_hash import ( TileHashRecord, aggregate_tile_hash, compute_manifest_hash, ) from gps_denied_onboard.components.c10_provisioning.manifest_builder import ( ManifestBuildInput, ManifestBuilder, TilesByBboxQuery, ) from gps_denied_onboard.helpers.engine_filename_schema import ( EngineFilenameSchema, ) __all__ = [ "CacheProvisionerImpl", "FilelockFileLockFactory", ] _LOG_KIND_PREFIX = "c10.provision" _LOCK_FILENAME = ".c10.lock" _MANIFEST_PREV_SUFFIX = ".prev" _MANIFEST_SHA256_SUFFIX = ".sha256" _MANIFEST_SIG_SUFFIX = ".sig" # Filenames excluded from the coverage walk because they are the Manifest # itself, its sidecars, the lockfile, or the rollback snapshot. Compared # as exact string suffixes against ``Path.name``. _COVERAGE_EXCLUDED_NAMES: frozenset[str] = frozenset() # populated at construction @dataclass(frozen=True) class _LockGuard(AbstractContextManager["_LockGuard"]): """Context-manager wrapper that re-raises the contract's typed error. The default :class:`FilelockFileLockFactory` returns one of these so callers can unconditionally ``with`` the result; an acquisition timeout raises :class:`BuildLockHeldError` instead of leaking ``filelock.Timeout`` upward. """ lock: FileLock timeout_s: float path: Path def __enter__(self) -> "_LockGuard": try: self.lock.acquire(timeout=self.timeout_s) except FileLockTimeout as exc: raise BuildLockHeldError( f"another build holds the lockfile at {self.path}" ) from exc return self def __exit__(self, exc_type, exc, tb) -> None: try: self.lock.release() finally: # Best-effort lockfile removal so the cache_root listing # is clean after a successful build. ``filelock`` itself # does not delete the file; the SIGKILL-safety guarantee # is at the fcntl-flock layer (kernel releases the # advisory lock on process exit even if the file # persists). try: self.path.unlink() except FileNotFoundError: pass except OSError as exc_unlink: # Cleanup failure is non-fatal — the lock has been # released; leftover lockfile bytes are harmless on # the next acquisition (filelock re-uses the file). # Surface at WARN so operators see persistent # filesystem permission issues. logging.getLogger("c10_provisioning.lock").warning( f"{_LOG_KIND_PREFIX}.lock.cleanup", extra={ "kind": f"{_LOG_KIND_PREFIX}.lock.cleanup", "kv": {"path": str(self.path), "reason": str(exc_unlink)}, }, ) class FilelockFileLockFactory: """Default :class:`FileLockFactory` impl using the ``filelock`` package. Uses ``filelock.FileLock`` which wraps ``fcntl.flock`` on POSIX (auto-released on process exit, satisfying the SIGKILL clause of AC-8) and ``msvcrt`` locks on Windows. The non-blocking timeout is forwarded to ``acquire(timeout=...)``; on timeout the wrapper re-raises as :class:`BuildLockHeldError` per the contract. """ def try_lock( self, path: Path, *, timeout_s: float ) -> AbstractContextManager[None]: return _LockGuard( lock=FileLock(str(path)), timeout_s=timeout_s, path=path, ) class CacheProvisionerImpl: """Default implementation of the :class:`CacheProvisioner` Protocol. Constructor injection only — no side effects in ``__init__`` other than naming the structured logger. The composition root assembles every collaborator and the orchestrator wires them in the order the contract dictates. The orchestrator deliberately does NOT cache references to intermediate state across calls; every ``build_cache_artifacts`` invocation is a fresh transaction guarded by the lockfile. """ def __init__( self, *, engine_compiler: EngineCompiler, descriptor_batcher: DescriptorBatcher, manifest_builder: ManifestBuilder, tile_metadata_store: TilesByBboxQuery, lock_factory: FileLockFactory, backbones: tuple[BackboneSpec, ...], host: HostCapabilities, precision: PrecisionMode, workspace_mb: int, logger: logging.Logger, clock: Clock, config: C10ProvisionerConfig, ) -> None: self._engine_compiler = engine_compiler self._descriptor_batcher = descriptor_batcher self._manifest_builder = manifest_builder self._tiles_query = tile_metadata_store self._lock_factory = lock_factory self._backbones = backbones self._host = host self._precision = precision self._workspace_mb = workspace_mb self._log = logger self._clock = clock self._config = config # ------------------------------------------------------------------ # Public surface # ------------------------------------------------------------------ def build_cache_artifacts(self, request: BuildRequest) -> BuildReport: run_started_ns = self._clock.monotonic_ns() manifest_path = request.cache_root / self._config.manifest_filename prev_path = manifest_path.with_suffix( manifest_path.suffix + _MANIFEST_PREV_SUFFIX ) lock_path = request.cache_root / _LOCK_FILENAME request.cache_root.mkdir(parents=True, exist_ok=True) with self._lock_factory.try_lock( lock_path, timeout_s=self._config.lock_timeout_s ): self._log.info( f"{_LOG_KIND_PREFIX}.lock.acquired", extra={ "kind": f"{_LOG_KIND_PREFIX}.lock.acquired", "kv": {"path": str(lock_path)}, }, ) sorted_tiles = self._fetch_sorted_tiles(request) if not sorted_tiles: return self._build_failure_empty_corpus(request, run_started_ns) idempotent_hash = self._check_idempotence( request=request, manifest_path=manifest_path, sorted_tiles=sorted_tiles, ) if idempotent_hash is not None: elapsed_s = self._elapsed_s(run_started_ns) self._log.info( f"{_LOG_KIND_PREFIX}.idempotent.no_op", extra={ "kind": f"{_LOG_KIND_PREFIX}.idempotent.no_op", "kv": { "manifest_hash": idempotent_hash, "elapsed_s": elapsed_s, }, }, ) return BuildReport( outcome=BuildOutcome.IDEMPOTENT_NO_OP, engines_built=0, engines_reused=0, descriptors_generated=0, manifest_hash=idempotent_hash, manifest_path=manifest_path, failure_reason=None, elapsed_s=elapsed_s, ) return self._run_active_build( request=request, manifest_path=manifest_path, prev_path=prev_path, run_started_ns=run_started_ns, ) def compile_engines_for_corpus( self, request: EngineCompileRequest ) -> tuple[EngineCacheEntry, ...]: """Diagnostic-mode passthrough — re-compile engines without touching descriptors / Manifest. Per CP-TC-11 / AC-10 this is a thin forwarder. It does NOT acquire the lockfile (the operator runs this for engine-only re-compile flows after a hardware change, where the orchestrator's full transaction would be overkill). The return value is the underlying compiler's ``EngineCompileResult.entry`` projected as the contract's ``tuple[EngineCacheEntry, ...]``. """ results = self._engine_compiler.compile_engines_for_corpus(request) return tuple(result.entry for result in results) # ------------------------------------------------------------------ # Internals — active build path # ------------------------------------------------------------------ def _run_active_build( self, *, request: BuildRequest, manifest_path: Path, prev_path: Path, run_started_ns: int, ) -> BuildReport: prior_existed = self._snapshot_prior_manifest(manifest_path, prev_path) try: engine_results = self._engine_compiler.compile_engines_for_corpus( self._compose_engine_request(request) ) except Exception: self._restore_prior_manifest(manifest_path, prev_path, prior_existed) raise engines_built, engines_reused = self._count_outcomes(engine_results) engine_entries = tuple(result.entry for result in engine_results) try: descriptor_report = self._descriptor_batcher.populate_descriptors( CorpusFilter( bbox=( request.bbox.min_lat_deg, request.bbox.min_lon_deg, request.bbox.max_lat_deg, request.bbox.max_lon_deg, ), zoom_levels=request.zoom_levels, sector_class=request.sector_class.value, ) ) except Exception: self._restore_prior_manifest(manifest_path, prev_path, prior_existed) raise if descriptor_report.outcome is not BatcherOutcome.SUCCESS: self._restore_prior_manifest(manifest_path, prev_path, prior_existed) elapsed_s = self._elapsed_s(run_started_ns) self._log.error( f"{_LOG_KIND_PREFIX}.descriptor.failure", extra={ "kind": f"{_LOG_KIND_PREFIX}.descriptor.failure", "kv": { "failure_reason": descriptor_report.failure_reason, "elapsed_s": elapsed_s, }, }, ) return BuildReport( outcome=BuildOutcome.FAILURE, engines_built=engines_built, engines_reused=engines_reused, descriptors_generated=0, manifest_hash=None, manifest_path=None, failure_reason=descriptor_report.failure_reason, elapsed_s=elapsed_s, ) descriptor_index_path = self._derive_descriptor_index_path(request) try: manifest_artifact = self._manifest_builder.build_manifest( ManifestBuildInput( cache_root=request.cache_root, bbox=request.bbox, zoom_levels=request.zoom_levels, sector_class=request.sector_class.value, engine_entries=engine_entries, descriptor_index_path=descriptor_index_path, calibration_path=request.calibration_path, key_path=request.key_path, takeoff_origin=request.takeoff_origin, flight_id=request.flight_id, ) ) except Exception: self._restore_prior_manifest(manifest_path, prev_path, prior_existed) raise try: self._verify_coverage( cache_root=request.cache_root, manifest_path=manifest_path, engine_entries=engine_entries, descriptor_index_path=descriptor_index_path, calibration_path=request.calibration_path, ) except ManifestCoverageError: self._restore_prior_manifest(manifest_path, prev_path, prior_existed) raise self._cleanup_prev(prev_path) elapsed_s = self._elapsed_s(run_started_ns) self._log.info( f"{_LOG_KIND_PREFIX}.build.success", extra={ "kind": f"{_LOG_KIND_PREFIX}.build.success", "kv": { "manifest_hash": manifest_artifact.manifest_hash, "engines_built": engines_built, "engines_reused": engines_reused, "descriptors_generated": descriptor_report.descriptors_generated, "elapsed_s": elapsed_s, }, }, ) return BuildReport( outcome=BuildOutcome.SUCCESS, engines_built=engines_built, engines_reused=engines_reused, descriptors_generated=descriptor_report.descriptors_generated, manifest_hash=manifest_artifact.manifest_hash, manifest_path=manifest_artifact.manifest_path, failure_reason=None, elapsed_s=elapsed_s, ) # ------------------------------------------------------------------ # Internals — helpers # ------------------------------------------------------------------ def _fetch_sorted_tiles( self, request: BuildRequest ) -> tuple[TileHashRecord, ...]: raw = tuple( self._tiles_query.query_by_bbox( bbox=request.bbox, zoom_levels=request.zoom_levels, sector_class=request.sector_class.value, ) ) return tuple( sorted(raw, key=lambda r: (r.zoom, r.lat, r.lon, r.source)) ) def _build_failure_empty_corpus( self, request: BuildRequest, run_started_ns: int ) -> BuildReport: elapsed_s = self._elapsed_s(run_started_ns) reason = ( "no tiles in C6 for the requested scope; run C11 " "TileDownloader first" ) self._log.error( f"{_LOG_KIND_PREFIX}.empty.corpus", extra={ "kind": f"{_LOG_KIND_PREFIX}.empty.corpus", "kv": { "bbox": [ request.bbox.min_lat_deg, request.bbox.min_lon_deg, request.bbox.max_lat_deg, request.bbox.max_lon_deg, ], "zoom_levels": list(request.zoom_levels), "sector_class": request.sector_class.value, "elapsed_s": elapsed_s, }, }, ) return BuildReport( outcome=BuildOutcome.FAILURE, engines_built=0, engines_reused=0, descriptors_generated=0, manifest_hash=None, manifest_path=None, failure_reason=reason, elapsed_s=elapsed_s, ) def _check_idempotence( self, *, request: BuildRequest, manifest_path: Path, sorted_tiles: tuple[TileHashRecord, ...], ) -> str | None: """Return the existing Manifest's hash if the request is idempotent. Reads the existing Manifest's recorded artifacts WITHOUT verifying signatures (AZ-324's job). Reconstructs the engine entries from the listing, recomputes the build-identity hash with the AZ-323 formula, compares to ``build.manifest_hash``. AC-2 guarantees: no calls to ``compile_engines_for_corpus``, ``populate_descriptors``, or ``build_manifest`` on this path. """ if not manifest_path.exists(): return None try: body = orjson.loads(manifest_path.read_bytes()) except (orjson.JSONDecodeError, OSError): return None build_block = body.get("build") if not isinstance(build_block, dict): return None existing_hash = build_block.get("manifest_hash") if not isinstance(existing_hash, str) or len(existing_hash) != 64: return None artifacts = body.get("artifacts") if not isinstance(artifacts, dict): return None listed_engines = artifacts.get("engines") descriptor_index_block = artifacts.get("descriptor_index") if not isinstance(listed_engines, list): return None if not isinstance(descriptor_index_block, dict): return None descriptor_index_sha256 = descriptor_index_block.get("sha256") if not isinstance(descriptor_index_sha256, str): return None # Predict the engine paths the new request would produce. If # any predicted path is missing from the listing, the previous # cache was built for a different backbone / host / precision — # not idempotent. predicted_paths = sorted( str(self._predict_engine_path(bb, request.cache_root)) for bb in self._backbones ) listed_path_strs = sorted( str(e.get("path", "")) for e in listed_engines if isinstance(e, dict) and isinstance(e.get("path"), str) ) if predicted_paths != listed_path_strs: return None engine_entries: list[EngineCacheEntry] = [] for entry in listed_engines: if not isinstance(entry, dict): return None path = entry.get("path") sha = entry.get("sha256") if not isinstance(path, str) or not isinstance(sha, str): return None engine_entries.append( EngineCacheEntry( engine_path=Path(path), sha256_hex=sha, sm=self._host.sm, jp=self._host.jetpack, trt=self._host.trt, precision=self._precision, extras={}, ) ) try: calibration_bytes = request.calibration_path.read_bytes() except OSError: return None calibration_sha256 = hashlib.sha256(calibration_bytes).hexdigest() tiles_coverage_sha256 = aggregate_tile_hash(sorted_tiles) request_hash = compute_manifest_hash( engine_entries=tuple(engine_entries), calibration_sha256=calibration_sha256, descriptor_index_sha256=descriptor_index_sha256, tiles_coverage_sha256=tiles_coverage_sha256, sector_class=request.sector_class.value, bbox=request.bbox, zoom_levels=request.zoom_levels, takeoff_origin=request.takeoff_origin, flight_id=request.flight_id, ) if request_hash == existing_hash: return existing_hash return None def _compose_engine_request( self, request: BuildRequest ) -> EngineCompileRequest: return EngineCompileRequest( backbones=self._backbones, calibration_path=request.calibration_path, cache_root=request.cache_root, precision=self._precision, host=self._host, workspace_mb=self._workspace_mb, ) def _predict_engine_path( self, backbone: BackboneSpec, cache_root: Path ) -> Path: filename = EngineFilenameSchema.build( model_name=backbone.model_name, sm=self._host.sm, jetpack=self._host.jetpack, trt=self._host.trt, precision=self._precision.value, ) return cache_root / filename def _derive_descriptor_index_path(self, request: BuildRequest) -> Path: return request.cache_root / "corpus.index" @staticmethod def _count_outcomes( results: tuple[EngineCompileResult, ...], ) -> tuple[int, int]: built = sum(1 for r in results if r.outcome is CompileOutcome.BUILT) reused = sum(1 for r in results if r.outcome is CompileOutcome.REUSED) return built, reused def _snapshot_prior_manifest( self, manifest_path: Path, prev_path: Path ) -> bool: """Rename existing Manifest to the .prev rollback path. Return True if a prior existed.""" if not manifest_path.exists(): return False if prev_path.exists(): # Rebuilds aren't stack-able (CP-INV-2 docs); a stale .prev # from a previous interrupted run is replaced silently. try: prev_path.unlink() except OSError: pass manifest_path.rename(prev_path) return True def _restore_prior_manifest( self, manifest_path: Path, prev_path: Path, prior_existed: bool, ) -> None: """Roll back to the .prev snapshot. Best-effort cleanup of partial Manifest.""" if manifest_path.exists(): try: manifest_path.unlink() except OSError: # Leave partial Manifest if unlink fails — the verifier # at takeoff will reject it; the operator sees the # explicit ERROR log we emit at the call site. pass if prior_existed and prev_path.exists(): prev_path.rename(manifest_path) def _cleanup_prev(self, prev_path: Path) -> None: if prev_path.exists(): try: prev_path.unlink() except OSError as exc: self._log.warning( f"{_LOG_KIND_PREFIX}.prev.cleanup", extra={ "kind": f"{_LOG_KIND_PREFIX}.prev.cleanup", "kv": {"path": str(prev_path), "reason": str(exc)}, }, ) def _verify_coverage( self, *, cache_root: Path, manifest_path: Path, engine_entries: tuple[EngineCacheEntry, ...], descriptor_index_path: Path, calibration_path: Path, ) -> None: """Walk ``cache_root`` and ensure no orphan files exist (CP-INV-3). Excludes the Manifest itself, its sidecars, the lockfile, the ``.prev`` rollback, and any ``.sha256`` sidecar (the helper atomic-write contract pairs each primary file with a sidecar of the same name + ``.sha256`` suffix; the listing in the Manifest references only the primary). """ manifest_filename = manifest_path.name excluded_names = { manifest_filename, f"{manifest_filename}{_MANIFEST_SHA256_SUFFIX}", f"{manifest_filename}{_MANIFEST_SIG_SUFFIX}", f"{manifest_filename}{_MANIFEST_PREV_SUFFIX}", _LOCK_FILENAME, } expected_paths: set[Path] = set() for entry in engine_entries: expected_paths.add(Path(entry.engine_path).resolve()) expected_paths.add(descriptor_index_path.resolve()) expected_paths.add(calibration_path.resolve()) walked: set[Path] = set() for path in cache_root.rglob("*"): if not path.is_file(): continue if path.name in excluded_names: continue if path.suffix == _MANIFEST_SHA256_SUFFIX: # SHA-256 sidecar is implicit per AZ-280 atomic-write # contract — the primary file is what the Manifest # lists; the sidecar is paired by convention. continue walked.add(path.resolve()) orphans = walked - expected_paths if not orphans: return if self._config.coverage_strict: self._log.error( f"{_LOG_KIND_PREFIX}.coverage.orphans", extra={ "kind": f"{_LOG_KIND_PREFIX}.coverage.orphans", "kv": { "orphans": sorted(str(p) for p in orphans), "cache_root": str(cache_root), }, }, ) raise ManifestCoverageError( "orphan files in cache_root not listed in Manifest: " f"{sorted(str(p) for p in orphans)!r}" ) self._log.warning( f"{_LOG_KIND_PREFIX}.coverage.orphans.lenient", extra={ "kind": f"{_LOG_KIND_PREFIX}.coverage.orphans.lenient", "kv": { "orphans": sorted(str(p) for p in orphans), "cache_root": str(cache_root), }, }, ) def _elapsed_s(self, run_started_ns: int) -> float: return max(0.0, (self._clock.monotonic_ns() - run_started_ns) / 1e9)