"""Atomic-write + SHA-256 sidecar helper (D-C10-3 / E-CC-HELPERS / AZ-280).

Implements the `sha256_sidecar` contract v1.0.0 at
`_docs/02_document/contracts/shared_helpers/sha256_sidecar.md`.

Stateless static-only design (per coderule § static methods are
appropriate only for pure, self-contained computations and well-bounded
I/O). Atomic write is implemented via ``atomicwrites.atomic_write`` which
uses the temp-file -> ``os.replace`` pattern. Verification recomputes the
digest from the file's bytes; the sidecar value is consulted only as the
"expected" side of the equality check.
"""

from __future__ import annotations

import hashlib
from pathlib import Path

from atomicwrites import atomic_write

__all__ = ["Sha256Sidecar", "Sha256SidecarError"]

_SIDECAR_SUFFIX = ".sha256"
_DIGEST_BYTES = 32  # SHA-256
_DIGEST_HEX_LEN = _DIGEST_BYTES * 2

# Characters allowed to reach the lowercase check. Upper-case hex is kept in
# the set so it is reported as a casing problem, not as "not a hex digest".
_HEX_CHARS = frozenset("0123456789abcdefABCDEF")

# Read size used when stream-hashing a payload from disk (1 MiB).
_READ_CHUNK_BYTES = 1024 * 1024


class Sha256SidecarError(RuntimeError):
    """Raised by `Sha256Sidecar` on any sidecar / atomicity / aggregate failure.

    Wraps the underlying `OSError` (or `ValueError`) so callers only ever
    handle one exception hierarchy from the helper.
    """


def _sidecar_path(payload_path: Path) -> Path:
    """Return ``<payload_path>.sha256`` — always appended verbatim to the full path string.

    `Path.with_suffix` would re-interpret an existing extension; we want a
    pure append so ``manifest`` -> ``manifest.sha256`` and
    ``engine.engine`` -> ``engine.engine.sha256``.
    """
    return Path(str(payload_path) + _SIDECAR_SUFFIX)


def _digest_bytes(payload: bytes) -> str:
    """Return the lowercase SHA-256 hex digest of an in-memory payload."""
    return hashlib.sha256(payload).hexdigest()


def _digest_file(payload_path: Path) -> str:
    """Stream-hash a file from disk so we never trust the in-memory copy.

    The file is read in fixed-size chunks, so memory use does not grow with
    the payload size. Any `OSError` from opening or reading propagates to
    the caller, which is responsible for wrapping it.
    """
    hasher = hashlib.sha256()
    with payload_path.open("rb") as fh:
        # iter(callable, sentinel) stops at the first empty read, i.e. EOF.
        for chunk in iter(lambda: fh.read(_READ_CHUNK_BYTES), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def _validate_sidecar_text(sidecar_text: str) -> str:
    """Return `sidecar_text` unchanged if it is a well-formed digest.

    A well-formed sidecar is exactly 64 lowercase ASCII hex characters with
    nothing else around them (no whitespace, no trailing newline).

    Raises:
        Sha256SidecarError: if the length is wrong, if any character is not
            an ASCII hex digit, or if the digest is not lowercase.
    """
    if len(sidecar_text) != _DIGEST_HEX_LEN:
        raise Sha256SidecarError(
            f"malformed sidecar: expected exactly {_DIGEST_HEX_LEN} hex chars, "
            f"got {len(sidecar_text)} bytes (content: {sidecar_text!r})"
        )
    # A character-set check is used instead of `int(text, 16)`: `int` also
    # accepts a "0x" prefix, a leading sign, underscores, surrounding
    # whitespace and non-ASCII digits, all of which would let a malformed
    # 64-character sidecar pass as valid.
    if not _HEX_CHARS.issuperset(sidecar_text):
        raise Sha256SidecarError(
            f"malformed sidecar: not a hex digest ({sidecar_text!r})"
        )
    if sidecar_text.lower() != sidecar_text:
        raise Sha256SidecarError(
            f"malformed sidecar: hex digest must be lowercase ({sidecar_text!r})"
        )
    return sidecar_text


class Sha256Sidecar:
    """Atomic-write + SHA-256 sidecar facade.

    Static-only by design — no per-call state is meaningful. Atomicity and
    verification invariants are documented at the contract level.
    """

    @staticmethod
    def write_atomic(path: Path, payload: bytes) -> str:
        """Atomically write `payload` to `path`; return its SHA-256 hex digest.

        The digest is computed from the in-memory `payload` before writing.
        An existing file at `path` is overwritten.

        Raises:
            Sha256SidecarError: if the write fails with an `OSError`.
        """
        digest = _digest_bytes(payload)
        try:
            with atomic_write(str(path), mode="wb", overwrite=True) as fh:
                fh.write(payload)
        except OSError as exc:
            raise Sha256SidecarError(f"write_atomic: failed to write {path}: {exc}") from exc
        return digest

    @staticmethod
    def write_atomic_and_sidecar(path: Path, payload: bytes) -> str:
        """Atomically write `payload` and its `.sha256` sidecar.

        Both writes go through the temp-file + rename atomic-write pattern.
        The payload is written first, then the sidecar; the sidecar contains
        only the hex digest, with no trailing newline.

        Returns:
            The hex digest that was written.

        Raises:
            Sha256SidecarError: if either write fails with an `OSError`.
        """
        digest = Sha256Sidecar.write_atomic(path, payload)
        sidecar = _sidecar_path(path)
        try:
            with atomic_write(str(sidecar), mode="w", overwrite=True) as fh:
                fh.write(digest)
        except OSError as exc:
            raise Sha256SidecarError(
                f"write_atomic_and_sidecar: failed to write sidecar at {sidecar}: {exc}"
            ) from exc
        return digest

    @staticmethod
    def verify(path: Path) -> bool:
        """Recompute the on-disk SHA-256 and compare with the sidecar.

        Returns False if `path` is missing entirely (a missing artifact is
        "not verifiable" rather than an error in the verification contract —
        callers can branch on `path.exists()` first if they need to
        distinguish). Otherwise returns whether the recomputed digest equals
        the sidecar's digest.

        Raises:
            Sha256SidecarError: if `path` exists but the sidecar is missing,
                unreadable, not decodable as text, or malformed; or if the
                payload cannot be read.
        """
        if not path.exists():
            return False

        sidecar = _sidecar_path(path)
        if not sidecar.exists():
            raise Sha256SidecarError(f"verify: sidecar missing for {path} (expected at {sidecar})")

        try:
            # The encoding is pinned so the result does not depend on the
            # process locale; a valid digest is pure ASCII either way.
            sidecar_text = sidecar.read_text(encoding="utf-8")
        except OSError as exc:
            raise Sha256SidecarError(f"verify: cannot read sidecar at {sidecar}: {exc}") from exc
        except UnicodeDecodeError as exc:
            # UnicodeDecodeError is a ValueError, not an OSError; wrap it so
            # callers still only see Sha256SidecarError from this helper.
            raise Sha256SidecarError(
                f"verify: malformed sidecar at {sidecar}: not valid text: {exc}"
            ) from exc

        expected = _validate_sidecar_text(sidecar_text)

        try:
            actual = _digest_file(path)
        except OSError as exc:
            raise Sha256SidecarError(f"verify: cannot read payload at {path}: {exc}") from exc
        return actual == expected

    @staticmethod
    def aggregate_hash(paths: list[Path]) -> str:
        """Order-deterministic SHA-256 over many files (Manifest aggregate).

        Inputs are sorted by full path (case-sensitive) before hashing, so
        two runs over the same set produce byte-equal digests. The aggregate
        is the SHA-256 of the concatenation of ``<name>\\0<hex digest>\\n``
        lines, where ``<name>`` is the file's final path component encoded
        as UTF-8 and ``<hex digest>`` is the SHA-256 of the file's bytes.

        An empty `paths` list yields the SHA-256 of the empty byte string.

        Raises:
            Sha256SidecarError: if any path does not exist or cannot be read.
        """
        hasher = hashlib.sha256()
        # NOTE(review): ordering uses the full path while only `path.name` is
        # hashed, so the same file names under different directory layouts
        # may be fed in a different order — confirm this is intended.
        for path in sorted(paths, key=str):
            if not path.exists():
                raise Sha256SidecarError(f"aggregate_hash: missing path in input: {path}")
            try:
                digest = _digest_file(path)
            except OSError as exc:
                raise Sha256SidecarError(f"aggregate_hash: cannot read {path}: {exc}") from exc
            hasher.update(path.name.encode("utf-8"))
            hasher.update(b"\0")
            hasher.update(digest.encode("ascii"))
            hasher.update(b"\n")
        return hasher.hexdigest()


# Public constant for callers that need to spell the sidecar suffix
# explicitly (e.g. takeoff-load verifier listing).
SIDECAR_SUFFIX = _SIDECAR_SUFFIX