Files
loader/scripts/publish_artifact.py
T
Oleksandr Bezdieniezhnykh 9a0248af72 [AZ-185][AZ-186] Batch 2
Made-with: Cursor
2026-04-15 07:32:37 +03:00

200 lines
5.8 KiB
Python

import argparse
import gzip
import hashlib
import logging
import os
import secrets
import shutil
import sys
import tempfile
from typing import Any, Dict, List, Optional
import boto3
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Module-level logger named after this module; main() uses it to report failures.
logger = logging.getLogger(__name__)
# Path appended to the admin API base URL by register_resource() when the
# ADMIN_API_PUBLISH_PATH environment variable is not set.
_DEFAULT_PUBLISH_PATH = "/internal/resources/publish"
def _require_env(name: str) -> str:
    """Return the value of the environment variable *name*.

    Raises:
        ValueError: If the variable is unset or set to an empty string.
    """
    found = os.environ.get(name)
    if found:
        return found
    raise ValueError(f"missing required environment variable: {name}")
def object_key(dev_stage: str, resource_name: str, architecture: str, version: str) -> str:
    """Build the storage object key for an encrypted artifact.

    The key has the form
    ``<dev_stage>/<resource_name>-<architecture>-<version>.enc``.
    """
    file_name = f"{resource_name}-{architecture}-{version}.enc"
    return f"{dev_stage}/{file_name}"
def build_cdn_url(endpoint: str, bucket: str, key: str) -> str:
    """Return the download URL for the object stored under *key*.

    When the ``CDN_PUBLIC_BASE_URL`` environment variable is set to a
    non-empty value, the URL is ``<base>/<key>``. Otherwise it falls back to
    ``<endpoint>/<bucket>/<key>``. Trailing slashes on the base or endpoint
    are stripped before joining.
    """
    cdn_base = os.environ.get("CDN_PUBLIC_BASE_URL")
    if not cdn_base:
        # No public CDN configured: address the object via the S3 endpoint.
        s3_root = endpoint.rstrip("/")
        return f"{s3_root}/{bucket}/{key}"
    cdn_root = cdn_base.rstrip("/")
    return f"{cdn_root}/{key}"
def gzip_file(source_path: str, destination_path: str) -> None:
    """Gzip-compress *source_path* into *destination_path*.

    The source is streamed in 1 MiB chunks at compression level 9, so the
    whole file is never held in memory.
    """
    chunk_size = 1024 * 1024
    with open(source_path, "rb") as raw:
        with gzip.open(destination_path, "wb", compresslevel=9) as packed:
            shutil.copyfileobj(raw, packed, length=chunk_size)
def encrypt_aes256_cbc_file(plaintext_path: str, ciphertext_path: str, aes_key: bytes) -> None:
    """Encrypt a file with AES in CBC mode using PKCS7 padding.

    The output file consists of a freshly generated 16-byte IV followed by
    the ciphertext. The plaintext is processed in 1 MiB chunks.

    Raises:
        ValueError: If *aes_key* is not exactly 32 bytes long.
    """
    if len(aes_key) != 32:
        raise ValueError("aes key must be 32 bytes")

    chunk_size = 1024 * 1024
    iv = os.urandom(16)
    encryptor = Cipher(
        algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend()
    ).encryptor()
    padder = padding.PKCS7(128).padder()

    with open(ciphertext_path, "wb") as sink:
        # The IV is stored in the clear at the start of the output file.
        sink.write(iv)
        with open(plaintext_path, "rb") as source:
            # read() returns b"" at end of file, which ends the iteration.
            for chunk in iter(lambda: source.read(chunk_size), b""):
                ready = padder.update(chunk)
                if ready:
                    sink.write(encryptor.update(ready))
        # Emit the padding block, then whatever the encryptor still holds.
        final_block = padder.finalize()
        if final_block:
            sink.write(encryptor.update(final_block))
        sink.write(encryptor.finalize())
def sha256_file(path: str) -> str:
    """Return the lowercase hex SHA-256 digest of the file at *path*.

    The file is read in 1 MiB chunks.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # read() returns b"" at end of file, which ends the iteration.
        for piece in iter(lambda: stream.read(1024 * 1024), b""):
            digest.update(piece)
    return digest.hexdigest().lower()
def upload_s3_file(
    endpoint: str,
    access_key: str,
    secret_key: str,
    bucket: str,
    key: str,
    file_path: str,
) -> None:
    """Upload the file at *file_path* to *bucket* under object name *key*.

    A new boto3 S3 client is created for each call, pointed at *endpoint*
    and authenticated with the given access/secret key pair.
    """
    credentials = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
    }
    s3 = boto3.client("s3", endpoint_url=endpoint, **credentials)
    with open(file_path, "rb") as stream:
        s3.upload_fileobj(stream, bucket, key)
def register_resource(
    admin_base_url: str,
    token: str,
    payload: Dict[str, Any],
) -> None:
    """POST *payload* as JSON to the admin API publish endpoint.

    The endpoint path comes from the ``ADMIN_API_PUBLISH_PATH`` environment
    variable, defaulting to ``_DEFAULT_PUBLISH_PATH``. It is joined to
    *admin_base_url* with exactly one slash. The request carries *token* as
    a Bearer token and uses a timeout of 120.

    Raises:
        Whatever ``raise_for_status()`` raises for an error response, plus
        any exception raised by ``requests.post`` itself.
    """
    publish_path = os.environ.get("ADMIN_API_PUBLISH_PATH", _DEFAULT_PUBLISH_PATH)
    relative = publish_path.lstrip("/")
    url = "/".join((admin_base_url.rstrip("/"), relative))
    auth_headers = {"Authorization": f"Bearer {token}"}
    response = requests.post(url, headers=auth_headers, json=payload, timeout=120)
    response.raise_for_status()
def publish(
    file_path: str,
    resource_name: str,
    dev_stage: str,
    architecture: str,
    version: str,
) -> Dict[str, Any]:
    """Compress, encrypt, upload an artifact and register it with the admin API.

    Steps, in order:
      1. gzip *file_path* into a temporary file;
      2. encrypt the gzip output with a freshly generated 32-byte AES key;
      3. compute the SHA-256 digest and size of the encrypted file;
      4. upload the encrypted file to the S3 bucket;
      5. register the resource (including the hex-encoded key) via the
         admin API.

    Configuration is read from the environment variables ``S3_ENDPOINT``,
    ``S3_ACCESS_KEY``, ``S3_SECRET_KEY``, ``S3_BUCKET``, ``ADMIN_API_URL``
    and ``ADMIN_API_TOKEN``. All of them are required.

    Returns:
        A dict with the keys ``object_key``, ``cdn_url``, ``sha256``,
        ``encryption_key_hex`` and ``size_bytes``. The value under
        ``encryption_key_hex`` is the AES key and must be treated as a
        secret.

    Raises:
        ValueError: If a required environment variable is missing or empty.
        Exception: Any error from compression, encryption, upload or
            registration propagates unchanged. Temporary files are removed
            in every case.
    """
    endpoint = _require_env("S3_ENDPOINT")
    access_key = _require_env("S3_ACCESS_KEY")
    secret_key = _require_env("S3_SECRET_KEY")
    bucket = _require_env("S3_BUCKET")
    admin_url = _require_env("ADMIN_API_URL")
    admin_token = _require_env("ADMIN_API_TOKEN")

    key = object_key(dev_stage, resource_name, architecture, version)
    aes_key = secrets.token_bytes(32)
    encryption_key_hex = aes_key.hex()

    # Every temp path is recorded as soon as it exists, so the ``finally``
    # clause removes it even when a later step (including creation of the
    # second temp file) fails.
    temp_paths: List[str] = []
    try:
        for suffix in (".gz", ".enc"):
            # mkstemp returns an open descriptor; close it right away because
            # the helpers below reopen the file by path.
            fd, temp_path = tempfile.mkstemp(suffix=suffix)
            temp_paths.append(temp_path)
            os.close(fd)
        gz_path, enc_path = temp_paths

        gzip_file(file_path, gz_path)
        encrypt_aes256_cbc_file(gz_path, enc_path, aes_key)
        digest = sha256_file(enc_path)
        size_bytes = os.path.getsize(enc_path)
        upload_s3_file(endpoint, access_key, secret_key, bucket, key, enc_path)
        cdn_url = build_cdn_url(endpoint, bucket, key)
        body = {
            "resource_name": resource_name,
            "dev_stage": dev_stage,
            "architecture": architecture,
            "version": version,
            "cdn_url": cdn_url,
            "sha256": digest,
            "encryption_key": encryption_key_hex,
            "size_bytes": size_bytes,
        }
        register_resource(admin_url, admin_token, body)
        return {
            "object_key": key,
            "cdn_url": cdn_url,
            "sha256": digest,
            "encryption_key_hex": encryption_key_hex,
            "size_bytes": size_bytes,
        }
    finally:
        # Best-effort cleanup: never mask the primary error, but do not hide
        # a failed removal either, because the .gz file holds the
        # unencrypted artifact.
        for leftover in temp_paths:
            try:
                os.unlink(leftover)
            except FileNotFoundError:
                pass
            except OSError:
                logger.warning(
                    "could not remove temporary file %s", leftover, exc_info=True
                )
def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments in *argv*.

    All five options (``--file``, ``--resource-name``, ``--dev-stage``,
    ``--architecture``, ``--version``) are required.
    """
    parser = argparse.ArgumentParser(
        description="Compress, encrypt, upload artifact and register resource"
    )
    parser.add_argument("--file", required=True, help="Path to file to publish")
    # The remaining options share the same shape: required, no help text.
    for flag in ("--resource-name", "--dev-stage", "--architecture", "--version"):
        parser.add_argument(flag, required=True)
    return parser.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
    """Run the publish command and return a process exit code.

    Args:
        argv: Command-line arguments without the program name. When
            ``None``, ``sys.argv[1:]`` is used.

    Returns:
        ``0`` when ``publish`` completes, ``1`` when it raises. The
        exception is logged with its traceback.
    """
    cli_args = sys.argv[1:] if argv is None else argv
    args = parse_args(cli_args)
    try:
        publish(
            args.file,
            args.resource_name,
            args.dev_stage,
            args.architecture,
            args.version,
        )
    except Exception:
        # Top-level boundary: log the failure and convert it to an exit code.
        logger.exception("publish failed")
        return 1
    return 0
# Script entry point: exit the process with the code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())