"""Compress, encrypt, and upload an artifact, then register it with the admin API.

Pipeline implemented by :func:`publish`:

1. gzip the input file,
2. encrypt the gzip output with AES-256-CBC (random key and IV, PKCS7 padding),
3. upload the ciphertext to an S3-compatible endpoint,
4. POST the resource metadata (including the hex-encoded key) to the admin API.
"""

import argparse
import gzip
import hashlib
import logging
import os
import secrets
import shutil
import sys
import tempfile
from typing import Any, Dict, List, Optional

import boto3
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

logger = logging.getLogger(__name__)

_DEFAULT_PUBLISH_PATH = "/internal/resources/publish"

# Buffer size used for every streaming read/copy in this module (1 MiB).
_CHUNK_SIZE = 1024 * 1024


def _require_env(name: str) -> str:
    """Return the value of environment variable *name*.

    Raises:
        ValueError: if the variable is unset or empty.
    """
    value = os.environ.get(name)
    if not value:
        raise ValueError(f"missing required environment variable: {name}")
    return value


def object_key(dev_stage: str, resource_name: str, architecture: str, version: str) -> str:
    """Build the bucket object key for an encrypted artifact.

    The key has the form ``<dev_stage>/<resource_name>-<architecture>-<version>.enc``.
    """
    return f"{dev_stage}/{resource_name}-{architecture}-{version}.enc"


def build_cdn_url(endpoint: str, bucket: str, key: str) -> str:
    """Return the download URL for *key*.

    If the ``CDN_PUBLIC_BASE_URL`` environment variable is set and non-empty,
    the URL is ``<CDN_PUBLIC_BASE_URL>/<key>``; otherwise it falls back to
    ``<endpoint>/<bucket>/<key>``. Trailing slashes on the base are stripped.
    """
    public_base = os.environ.get("CDN_PUBLIC_BASE_URL")
    if public_base:
        return f"{public_base.rstrip('/')}/{key}"
    return f"{endpoint.rstrip('/')}/{bucket}/{key}"


def gzip_file(source_path: str, destination_path: str) -> None:
    """Gzip *source_path* into *destination_path* at compression level 9."""
    with open(source_path, "rb") as src, gzip.open(
        destination_path, "wb", compresslevel=9
    ) as dst:
        shutil.copyfileobj(src, dst, length=_CHUNK_SIZE)


def encrypt_aes256_cbc_file(plaintext_path: str, ciphertext_path: str, aes_key: bytes) -> None:
    """Encrypt a file with AES-256-CBC and PKCS7 padding.

    The output file layout is ``IV (16 random bytes) || ciphertext``. The input
    is streamed in chunks so the whole file is never held in memory.

    Raises:
        ValueError: if *aes_key* is not exactly 32 bytes long.
    """
    if len(aes_key) != 32:
        raise ValueError("aes key must be 32 bytes")

    iv = os.urandom(16)
    cipher = Cipher(
        algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend()
    )
    encryptor = cipher.encryptor()
    padder = padding.PKCS7(128).padder()

    with open(ciphertext_path, "wb") as out:
        # The IV is stored in the clear as the first 16 bytes of the output.
        out.write(iv)
        with open(plaintext_path, "rb") as inp:
            for chunk in iter(lambda: inp.read(_CHUNK_SIZE), b""):
                # padder.update() may return b"" while it buffers a partial
                # block; encrypting/writing an empty byte string is a no-op.
                out.write(encryptor.update(padder.update(chunk)))
        out.write(encryptor.update(padder.finalize()))
        out.write(encryptor.finalize())


def sha256_file(path: str) -> str:
    """Return the SHA-256 digest of the file at *path* as lowercase hex."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(_CHUNK_SIZE), b""):
            h.update(block)
    # hexdigest() already yields lowercase hexadecimal digits.
    return h.hexdigest()


def upload_s3_file(
    endpoint: str,
    access_key: str,
    secret_key: str,
    bucket: str,
    key: str,
    file_path: str,
) -> None:
    """Upload *file_path* to *bucket* under *key* via the S3 API at *endpoint*."""
    client = boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    with open(file_path, "rb") as body:
        client.upload_fileobj(body, bucket, key)


def register_resource(
    admin_base_url: str,
    token: str,
    payload: Dict[str, Any],
) -> None:
    """POST *payload* as JSON to the admin API publish endpoint.

    The endpoint path comes from the ``ADMIN_API_PUBLISH_PATH`` environment
    variable, defaulting to ``/internal/resources/publish``. The request uses
    bearer-token authentication and a 120 second timeout; a non-success HTTP
    status raises via ``Response.raise_for_status()``.
    """
    # Treat an empty override like an unset one so the request never targets
    # the bare base URL by accident.
    publish_path = os.environ.get("ADMIN_API_PUBLISH_PATH") or _DEFAULT_PUBLISH_PATH
    url = f"{admin_base_url.rstrip('/')}/{publish_path.lstrip('/')}"
    resp = requests.post(
        url,
        headers={"Authorization": f"Bearer {token}"},
        json=payload,
        timeout=120,
    )
    resp.raise_for_status()


def _new_temp_path(suffix: str) -> str:
    """Create an empty temporary file and return its path with no handle left open."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    # Only the path is needed; close the descriptor right away so it is not
    # leaked and the file can be reopened by name on every platform.
    os.close(fd)
    return path


def publish(
    file_path: str,
    resource_name: str,
    dev_stage: str,
    architecture: str,
    version: str,
) -> Dict[str, Any]:
    """Compress, encrypt, upload, and register a single artifact.

    Required environment variables: ``S3_ENDPOINT``, ``S3_ACCESS_KEY``,
    ``S3_SECRET_KEY``, ``S3_BUCKET``, ``ADMIN_API_URL``, ``ADMIN_API_TOKEN``.

    A fresh random 32-byte AES key is generated for every call. The SHA-256
    digest and size that are reported describe the encrypted file (the bytes
    that are uploaded), not the original input.

    Returns:
        A dict with keys ``object_key``, ``cdn_url``, ``sha256``,
        ``encryption_key_hex`` and ``size_bytes``.

    Raises:
        ValueError: if a required environment variable is missing or empty.
        Exception: errors from file I/O, the S3 upload, or the admin API
            request propagate to the caller unchanged.
    """
    endpoint = _require_env("S3_ENDPOINT")
    access_key = _require_env("S3_ACCESS_KEY")
    secret_key = _require_env("S3_SECRET_KEY")
    bucket = _require_env("S3_BUCKET")
    admin_url = _require_env("ADMIN_API_URL")
    admin_token = _require_env("ADMIN_API_TOKEN")

    key = object_key(dev_stage, resource_name, architecture, version)
    aes_key = secrets.token_bytes(32)
    encryption_key_hex = aes_key.hex()

    # Track every temp file as soon as it exists so the cleanup below also
    # covers the case where creating the second file fails.
    temp_paths: List[str] = []
    try:
        gz_path = _new_temp_path(".gz")
        temp_paths.append(gz_path)
        enc_path = _new_temp_path(".enc")
        temp_paths.append(enc_path)

        gzip_file(file_path, gz_path)
        encrypt_aes256_cbc_file(gz_path, enc_path, aes_key)
        digest = sha256_file(enc_path)
        size_bytes = os.path.getsize(enc_path)

        upload_s3_file(endpoint, access_key, secret_key, bucket, key, enc_path)
        cdn_url = build_cdn_url(endpoint, bucket, key)

        body = {
            "resource_name": resource_name,
            "dev_stage": dev_stage,
            "architecture": architecture,
            "version": version,
            "cdn_url": cdn_url,
            "sha256": digest,
            "encryption_key": encryption_key_hex,
            "size_bytes": size_bytes,
        }
        register_resource(admin_url, admin_token, body)

        return {
            "object_key": key,
            "cdn_url": cdn_url,
            "sha256": digest,
            "encryption_key_hex": encryption_key_hex,
            "size_bytes": size_bytes,
        }
    finally:
        # Best-effort cleanup: a failed unlink must not mask the real outcome
        # of the publish, but it should not go unnoticed either.
        for path in temp_paths:
            try:
                os.unlink(path)
            except OSError:
                logger.warning(
                    "could not remove temporary file %s", path, exc_info=True
                )


def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments in *argv* (program name excluded)."""
    p = argparse.ArgumentParser(description="Compress, encrypt, upload artifact and register resource")
    p.add_argument("--file", required=True, help="Path to file to publish")
    p.add_argument("--resource-name", required=True)
    p.add_argument("--dev-stage", required=True)
    p.add_argument("--architecture", required=True)
    p.add_argument("--version", required=True)
    return p.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> int:
    """Run the publish pipeline from the command line.

    Args:
        argv: argument list to parse; defaults to ``sys.argv[1:]`` when None.

    Returns:
        0 on success, 1 if publishing raised an exception (which is logged
        with its traceback).
    """
    args = parse_args(argv if argv is not None else sys.argv[1:])
    try:
        publish(
            args.file,
            args.resource_name,
            args.dev_stage,
            args.architecture,
            args.version,
        )
    except Exception:
        # Top-level boundary: log with traceback and map to an exit status.
        logger.exception("publish failed")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())