"""Client-side helpers for the Azaion resource API.

Contains the hardware fingerprint (`HWInfo` / `HWService`), the API client
(`Api`) that uploads files and downloads + decrypts them, and a small driver
script at the bottom of the module.
"""
import base64
import hashlib
import json
import os
import random
import subprocess
from http import HTTPStatus
from io import BytesIO

import psutil
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

BUFFER_SIZE = 64 * 1024  # 64 KB
API_URL = "https://api.azaion.com"
REQUEST_TIMEOUT = 20  # seconds; applied to login and resource-download requests


class HWInfo:
    """Hardware description of the local machine plus hash/cipher helpers."""

    def __init__(self, cpu, gpu, memory, mac_address, hw_hash):
        self.cpu = cpu
        self.gpu = gpu
        self.memory = memory
        self.mac_address = mac_address
        self.hash = hw_hash

    def to_json_object(self):
        """Return the hardware description as a JSON-serializable dict."""
        return {
            "CPU": self.cpu,
            "GPU": self.gpu,
            "MacAddress": self.mac_address,
            "Memory": self.memory,
            "Hash": self.hash,
        }

    def __str__(self):
        return f'CPU: {self.cpu}. GPU: {self.gpu}. Memory: {self.memory}. MAC Address: {self.mac_address}'

    @staticmethod
    def _build_cipher(key, iv):
        """Build the AES-CBC cipher whose key is the SHA-256 digest of *key*."""
        aes_key = hashlib.sha256(key.encode('utf-8')).digest()
        return Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())

    @staticmethod
    def encrypt_to(input_stream, key):
        """Encrypt a binary stream and return ``IV + ciphertext``.

        Uses AES-CBC with PKCS7 padding so that the result can be read back
        by `decrypt_to` (previously this used CFB without padding, which
        `decrypt_to` could never decode).

        Args:
            input_stream: binary file-like object exposing ``read(size)``.
            key: passphrase string; the AES key is its SHA-256 digest.

        Returns:
            bytearray: 16 random IV bytes followed by the ciphertext.
        """
        iv = os.urandom(16)
        encryptor = HWInfo._build_cipher(key, iv).encryptor()
        padder = padding.PKCS7(128).padder()  # AES block size is 128 bits (16 bytes)

        res = bytearray(iv)
        while chunk := input_stream.read(BUFFER_SIZE):
            res.extend(encryptor.update(padder.update(chunk)))
        # Flush the final (padded) block before closing the cipher context.
        res.extend(encryptor.update(padder.finalize()))
        res.extend(encryptor.finalize())
        return res

    @staticmethod
    def decrypt_to(input_stream, key):
        """Decrypt a stream laid out as ``IV + AES-CBC ciphertext``.

        Args:
            input_stream: binary file-like object; its first 16 bytes are
                read as the IV, the remainder as ciphertext.
            key: passphrase string; the AES key is its SHA-256 digest.

        Returns:
            bytes: the plaintext with PKCS7 padding removed.

        Raises:
            ValueError: raised by the cryptography library when the IV,
                ciphertext length or padding is invalid.
        """
        iv = input_stream.read(16)
        decryptor = HWInfo._build_cipher(key, iv).decryptor()

        res = bytearray()
        while chunk := input_stream.read(BUFFER_SIZE):
            res.extend(decryptor.update(chunk))
        res.extend(decryptor.finalize())

        unpadder = padding.PKCS7(128).unpadder()  # AES block size is 128 bits (16 bytes)
        return unpadder.update(res) + unpadder.finalize()

    @staticmethod
    def calc_hash(key):
        """Return the base64-encoded SHA-384 digest of the UTF-8 bytes of *key*."""
        digest = hashlib.sha384(key.encode('utf-8')).digest()
        return base64.b64encode(digest).decode('utf-8')


class HWService:
    """Collects CPU / GPU / memory / MAC information from the local machine."""

    def __init__(self):
        # os.name is "nt" on Windows; this avoids spawning a shell for `ver`,
        # which always failed (and printed an error) on non-Windows hosts.
        self.is_windows = os.name == "nt"

    def get_mac_address(self, interface="Ethernet"):
        """Return the link-layer address of *interface* with dashes removed.

        Returns:
            str | None: the address, or None when the interface does not
            exist or has no AF_LINK entry.
        """
        for addr in psutil.net_if_addrs().get(interface, []):
            if addr.family == psutil.AF_LINK:
                return addr.address.replace('-', '')
        return None

    @staticmethod
    def _clean_value(line, label):
        """Strip a ``label`` prefix marker from one line of command output."""
        # NOTE(review): the second replace swaps a single space for a single
        # space, i.e. it is a no-op as written. It was probably meant to
        # collapse double spaces; kept unchanged because the CPU/GPU strings
        # feed the hardware hash -- confirm before altering.
        return line.replace(label, "").replace(" ", " ")

    def get_hardware_info(self):
        """Query the OS for CPU, GPU and memory and build an `HWInfo`.

        Returns:
            HWInfo: includes a hash computed from MAC address, CPU and GPU.

        Raises:
            subprocess.CalledProcessError: when the OS command fails.
            IndexError: when the command yields fewer than three non-empty
                lines (CPU, GPU, memory).
        """
        if self.is_windows:
            os_command = (
                "powershell -Command \""
                "Get-CimInstance -ClassName Win32_Processor | Select-Object -ExpandProperty Name | Write-Output; "
                "Get-CimInstance -ClassName Win32_VideoController | Select-Object -ExpandProperty Name | Write-Output; "
                "Get-CimInstance -ClassName Win32_OperatingSystem | Select-Object -ExpandProperty TotalVisibleMemorySize | Write-Output"
                "\""
            )
            raw_output = subprocess.check_output(os_command, shell=True)
        else:
            # Passed to bash as a single argv element: no outer shell is
            # involved, so awk's `$2` reaches awk untouched. The dangling
            # trailing `&&` (a bash syntax error) has been removed.
            script = (
                " lscpu | grep 'Model name:' | cut -d':' -f2 && "
                "lspci | grep VGA | cut -d':' -f3 && "
                "free -g | grep Mem: | awk '{print $2}'"
            )
            raw_output = subprocess.check_output(["/bin/bash", "-c", script])

        result = raw_output.decode('utf-8')
        lines = [line.strip() for line in result.splitlines() if line.strip()]
        if len(lines) < 3:
            raise IndexError(
                f"expected 3 lines of hardware info (CPU, GPU, memory), got {len(lines)}"
            )

        cpu = self._clean_value(lines[0], "Name=")
        gpu = self._clean_value(lines[1], "Name=")
        memory = self._clean_value(lines[2], "TotalVisibleMemorySize=")
        mac_address = self.get_mac_address()

        full_hw_str = f'Azaion_{mac_address}_{cpu}_{gpu}'
        hw_hash = HWInfo.calc_hash(full_hw_str)
        return HWInfo(cpu, gpu, memory, mac_address, hw_hash)


class Credentials:
    """Login e-mail, password and default resource folder for `Api`."""

    def __init__(self, email, password, folder):
        self.email = email
        self.password = password
        self.folder = folder


class Api:
    """Minimal client for the login / upload / download endpoints under API_URL."""

    def __init__(self, credentials):
        self.token = None  # bearer token, obtained lazily by login()
        self.credentials = credentials

    @staticmethod
    def create_file(filename, size_mb=1):
        """Write *size_mb* MiB of random bytes to *filename*.

        Returns:
            str: hex SHA-256 digest of the bytes written.
        """
        # int() so that a fractional size_mb does not hand a float to os.urandom.
        size_bytes = int(size_mb * 1024 * 1024)
        chunk_size = 1024 * 1024  # write in 1 MiB chunks
        bytes_written = 0
        sha256_hash = hashlib.sha256()

        with open(filename, 'wb') as f:
            while bytes_written < size_bytes:
                write_size = min(chunk_size, size_bytes - bytes_written)
                random_bytes = os.urandom(write_size)
                f.write(random_bytes)
                sha256_hash.update(random_bytes)
                bytes_written += write_size
        return sha256_hash.hexdigest()

    def login(self):
        """POST the credentials to ``/login`` and store the returned token.

        Raises:
            requests.HTTPError: when the server answers with an error status.
        """
        response = requests.post(
            f"{API_URL}/login",
            json={"email": self.credentials.email, "password": self.credentials.password},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        self.token = response.json()["token"]

    @staticmethod
    def get_sha256(data_bytes):
        """Return the hex SHA-256 digest of *data_bytes*."""
        return hashlib.sha256(data_bytes).hexdigest()

    def upload_file(self, filename, folder=None):
        """Upload *filename* to ``/resources/{folder}``.

        Best effort: request failures are printed, not raised. *folder*
        defaults to the folder stored in the credentials.
        """
        folder = folder or self.credentials.folder
        if self.token is None:
            self.login()

        url = f"{API_URL}/resources/{folder}"
        headers = {"Authorization": f"Bearer {self.token}"}
        # The context manager closes the handle even when the upload fails.
        with open(filename, 'rb') as data_file:
            files = dict(data=data_file)
            try:
                r = requests.post(url, headers=headers, files=files, allow_redirects=True)
                r.raise_for_status()
                print(f"Upload success: {r.status_code}")
            except (requests.RequestException, OSError) as e:
                print(f"Upload fail: {e}")

    def get_encryption_key(self, hardware_hash):
        """Derive the file-encryption passphrase from credentials and hardware hash."""
        key = f'{self.credentials.email}-{self.credentials.password}-{hardware_hash}-#%@AzaionKey@%#---'
        return HWInfo.calc_hash(key)

    def _post_resource_request(self, url, payload):
        """POST *payload* (a JSON string) to *url* using the current token."""
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }
        return requests.post(url, data=payload, headers=headers, stream=True, timeout=REQUEST_TIMEOUT)

    def load_bytes(self, filename, folder=None):
        """Download *filename* from ``/resources/get/{folder}`` and decrypt it.

        Logs in first when no token is held, and logs in again and retries
        once when the server answers 401 or 403.

        Returns:
            bytes: the decrypted file content.

        Raises:
            requests.HTTPError: when the final response has an error status.
        """
        folder = folder or self.credentials.folder
        hardware = HWService().get_hardware_info()

        if self.token is None:
            self.login()
        url = f"{API_URL}/resources/get/{folder}"

        payload = json.dumps(
            {
                "password": self.credentials.password,
                # HWInfo is not JSON-serializable; send its dict form.
                "hardware": hardware.to_json_object(),
                "fileName": filename
            }, indent=4)

        response = self._post_resource_request(url, payload)
        if response.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
            # Token rejected: obtain a fresh one and retry once.
            self.login()
            response = self._post_resource_request(url, payload)

        if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
            print('500!')
        # Do not try to decrypt an error body.
        response.raise_for_status()

        key = self.get_encryption_key(hardware.hash)
        stream = BytesIO(response.raw.read())
        data = HWInfo.decrypt_to(stream, key)
        print(f'Downloaded file: {filename}, {len(data)} bytes')
        return data


# Driver script: create a random file, upload it, then download it back.
# NOTE(review): this runs on import and embeds credentials in source; consider
# an `if __name__ == "__main__":` guard and reading the secrets from the
# environment.
credentials = Credentials('admin@azaion.com', 'Az@1on1000Odm$n', 'stage')
api = Api(credentials)
file = 'file1'
sha256_init = Api.create_file(file, size_mb=1)
api.upload_file(file)
file_bytes = api.load_bytes(file)
#
# print(f'received: {len(file_bytes)/1024} kb')
# sha256_downloaded = Api.get_sha256(file_bytes)
# print(f'{sha256_init}: sha256 initial file')
# print(f'{sha256_downloaded}: sha256 downloaded file')