import io
import os

cimport constants

import boto3


cdef class CDNCredentials:
    """Connection settings for an S3-compatible endpoint.

    Holds the endpoint URL plus two separate key pairs: one used for
    downloads and one used for uploads.
    """
    # NOTE(review): no attribute declarations appear in this file; they are
    # presumably declared in the accompanying .pxd -- confirm before adding
    # any `cdef` attribute lines here.

    def __init__(self, host, downloader_access_key, downloader_access_secret,
                 uploader_access_key, uploader_access_secret):
        """Store the endpoint and both key pairs unchanged.

        Args:
            host: Passed to boto3 as ``endpoint_url``.
            downloader_access_key: Access key id for the download client.
            downloader_access_secret: Secret key for the download client.
            uploader_access_key: Access key id for the upload client.
            uploader_access_secret: Secret key for the upload client.
        """
        self.host = host
        self.downloader_access_key = downloader_access_key
        self.downloader_access_secret = downloader_access_secret
        self.uploader_access_key = uploader_access_key
        self.uploader_access_secret = uploader_access_secret


cdef class CDNManager:
    """Upload and download helper built on two boto3 S3 clients.

    One client is created with the downloader key pair and one with the
    uploader key pair; both point at the same endpoint.
    """
    # NOTE(review): `creds`, `download_client` and `upload_client` are
    # presumably declared in the accompanying .pxd -- confirm.

    def __init__(self, CDNCredentials credentials):
        """Create the download and upload clients from ``credentials``."""
        self.creds = credentials
        self.download_client = boto3.client(
            's3',
            endpoint_url=self.creds.host,
            aws_access_key_id=self.creds.downloader_access_key,
            aws_secret_access_key=self.creds.downloader_access_secret,
        )
        self.upload_client = boto3.client(
            's3',
            endpoint_url=self.creds.host,
            aws_access_key_id=self.creds.uploader_access_key,
            aws_secret_access_key=self.creds.uploader_access_secret,
        )

    cdef upload(self, str bucket, str filename, bytes file_bytes):
        """Upload ``file_bytes`` to ``bucket`` under the key ``filename``.

        Returns:
            True when the upload call completes; False when any exception is
            raised (the exception is passed to ``constants.logerror``).
        """
        try:
            # upload_fileobj needs a file-like object, so wrap the raw bytes.
            self.upload_client.upload_fileobj(io.BytesIO(file_bytes), bucket, filename)
            constants.log(f'uploaded {filename} ({len(file_bytes)} bytes) to the {bucket}')
            return True
        except Exception as e:
            # Best-effort API: report the failure and signal it via the result.
            constants.logerror(e)
            return False

    cdef download(self, str folder, str filename):
        """Download the object ``filename`` into the local directory ``folder``.

        ``folder`` is used twice: as the bucket name passed to boto3 and as
        the local directory that receives the file. The directory is created
        if it does not exist.

        Returns:
            True when the download call completes; False when any exception
            is raised (the exception is passed to ``constants.logerror``).
        """
        try:
            os.makedirs(folder, exist_ok=True)
            # Build the destination with the platform separator. A hard-coded
            # backslash only works on Windows; elsewhere it yields a file
            # literally named "folder\filename" outside the target directory.
            local_path = os.path.join(folder, filename)
            self.download_client.download_file(folder, filename, local_path)
            constants.log(f'downloaded {filename} from the {folder}')
            return True
        except Exception as e:
            # Best-effort API: report the failure and signal it via the result.
            constants.logerror(e)
            return False