mirror of
https://github.com/azaion/annotations.git
synced 2026-04-22 22:16:30 +00:00
242 lines
8.3 KiB
Python
242 lines
8.3 KiB
Python
import json
|
|
import subprocess
|
|
import psutil
|
|
import hashlib
|
|
import base64
|
|
import os
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import padding
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
import requests
|
|
from io import BytesIO
|
|
from http import HTTPStatus
|
|
import random
|
|
|
|
|
|
BUFFER_SIZE = 64 * 1024 # 64 KB
|
|
API_URL = "https://api.azaion.com"
|
|
|
|
|
|
class HWInfo:
|
|
def __init__(self, cpu, gpu, memory, mac_address, hw_hash):
|
|
self.cpu = cpu
|
|
self.gpu = gpu
|
|
self.memory = memory
|
|
self.mac_address = mac_address
|
|
self.hash = hw_hash
|
|
|
|
def to_json_object(self):
|
|
return {
|
|
"CPU": self.cpu,
|
|
"GPU": self.gpu,
|
|
"MacAddress": self.mac_address,
|
|
"Memory": self.memory,
|
|
"Hash": self.hash,
|
|
}
|
|
|
|
def __str__(self):
|
|
return f'CPU: {self.cpu}. GPU: {self.gpu}. Memory: {self.memory}. MAC Address: {self.mac_address}'
|
|
|
|
@staticmethod
|
|
def encrypt_to(input_stream, key):
|
|
aes_key = hashlib.sha256(key.encode('utf-8')).digest()
|
|
iv = os.urandom(16)
|
|
|
|
cipher = Cipher(algorithms.AES(aes_key), modes.CFB(iv), backend=default_backend())
|
|
encryptor = cipher.encryptor()
|
|
|
|
res = bytearray()
|
|
res.extend(iv)
|
|
while chunk := input_stream.read(BUFFER_SIZE):
|
|
encrypted_chunk = encryptor.update(chunk)
|
|
res.extend(encrypted_chunk)
|
|
res.extend(encryptor.finalize())
|
|
return res
|
|
|
|
@staticmethod
|
|
def decrypt_to(input_stream, key):
|
|
aes_key = hashlib.sha256(key.encode('utf-8')).digest()
|
|
iv = input_stream.read(16)
|
|
|
|
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
|
|
decryptor = cipher.decryptor()
|
|
|
|
res = bytearray()
|
|
while chunk := input_stream.read(BUFFER_SIZE):
|
|
decrypted_chunk = decryptor.update(chunk)
|
|
res.extend(decrypted_chunk)
|
|
res.extend(decryptor.finalize())
|
|
|
|
unpadder = padding.PKCS7(128).unpadder() # AES block size is 128 bits (16 bytes)
|
|
return unpadder.update(res) + unpadder.finalize()
|
|
|
|
@staticmethod
|
|
def calc_hash(key):
|
|
str_bytes = key.encode('utf-8')
|
|
hash_bytes = hashlib.sha384(str_bytes).digest()
|
|
h = base64.b64encode(hash_bytes).decode('utf-8')
|
|
return h
|
|
|
|
|
|
class HWService:
|
|
def __init__(self):
|
|
try:
|
|
res = subprocess.check_output("ver", shell=True).decode('utf-8')
|
|
if "Microsoft Windows" in res:
|
|
self.is_windows = True
|
|
else:
|
|
self.is_windows = False
|
|
except Exception:
|
|
print('Error during os type checking')
|
|
self.is_windows = False
|
|
|
|
def get_mac_address(self, interface="Ethernet"):
|
|
addresses = psutil.net_if_addrs()
|
|
for interface_name, interface_info in addresses.items():
|
|
if interface_name == interface:
|
|
for addr in interface_info:
|
|
if addr.family == psutil.AF_LINK:
|
|
return addr.address.replace('-', '')
|
|
return None
|
|
|
|
def get_hardware_info(self):
|
|
if self.is_windows:
|
|
os_command = (
|
|
"powershell -Command \""
|
|
"Get-CimInstance -ClassName Win32_Processor | Select-Object -ExpandProperty Name | Write-Output; "
|
|
"Get-CimInstance -ClassName Win32_VideoController | Select-Object -ExpandProperty Name | Write-Output; "
|
|
"Get-CimInstance -ClassName Win32_OperatingSystem | Select-Object -ExpandProperty TotalVisibleMemorySize | Write-Output"
|
|
"\""
|
|
)
|
|
else:
|
|
os_command = (
|
|
"/bin/bash -c \" lscpu | grep 'Model name:' | cut -d':' -f2 && "
|
|
"lspci | grep VGA | cut -d':' -f3 && "
|
|
"free -g | grep Mem: | awk '{print $2}' && \""
|
|
)
|
|
result = subprocess.check_output(os_command, shell=True).decode('utf-8')
|
|
lines = [line.strip() for line in result.splitlines() if line.strip()]
|
|
|
|
cpu = lines[0].replace("Name=", "").replace(" ", " ")
|
|
gpu = lines[1].replace("Name=", "").replace(" ", " ")
|
|
memory = lines[2].replace("TotalVisibleMemorySize=", "").replace(" ", " ")
|
|
mac_address = self.get_mac_address()
|
|
full_hw_str = f'Azaion_{mac_address}_{cpu}_{gpu}'
|
|
|
|
hw_hash = HWInfo.calc_hash(full_hw_str)
|
|
return HWInfo(cpu, gpu, memory, mac_address, hw_hash)
|
|
|
|
class Credentials:
|
|
|
|
def __init__(self, email, password, folder):
|
|
self.email = email
|
|
self.password = password
|
|
self.folder = folder
|
|
|
|
class Api:
|
|
|
|
def __init__(self, credentials):
|
|
self.token = None
|
|
self.credentials = credentials
|
|
|
|
@staticmethod
|
|
def create_file(filename, size_mb=1): # chunk_size_kb is now configurable
|
|
size_bytes = size_mb * 1024 * 1024
|
|
chunk_size = 1024 * 1024 # 1mb chunk size
|
|
bytes_written = 0
|
|
sha256_hash = hashlib.sha256() # init hash
|
|
|
|
with open(filename, 'wb') as f:
|
|
while bytes_written < size_bytes:
|
|
write_size = min(chunk_size, size_bytes - bytes_written)
|
|
random_bytes = os.urandom(write_size)
|
|
f.write(random_bytes)
|
|
bytes_written += write_size
|
|
sha256_hash.update(random_bytes)
|
|
return sha256_hash.hexdigest()
|
|
|
|
def login(self):
|
|
response = requests.post(f"{API_URL}/login",
|
|
json={"email": self.credentials.email, "password": self.credentials.password})
|
|
response.raise_for_status()
|
|
token = response.json()["token"]
|
|
self.token = token
|
|
|
|
@staticmethod
|
|
def get_sha256(data_bytes):
|
|
sha256 = hashlib.sha256()
|
|
sha256.update(data_bytes)
|
|
return sha256.hexdigest()
|
|
|
|
def upload_file(self, filename, folder = None):
|
|
folder = folder or self.credentials.folder
|
|
if self.token is None:
|
|
self.login()
|
|
url = f"{API_URL}/resources/{folder}"
|
|
headers = {"Authorization": f"Bearer {self.token}"}
|
|
files = dict(data=open(filename, 'rb'))
|
|
try:
|
|
r = requests.post(url, headers=headers, files=files, allow_redirects=True)
|
|
r.raise_for_status()
|
|
print(f"Upload success: {r.status_code}")
|
|
except Exception as e:
|
|
print(f"Upload fail: {e}")
|
|
|
|
def get_encryption_key(self, hardware_hash):
|
|
key = f'{self.credentials.email}-{self.credentials.password}-{hardware_hash}-#%@AzaionKey@%#---'
|
|
return HWInfo.calc_hash(key)
|
|
|
|
def load_bytes(self, filename, folder = None):
|
|
folder = folder or self.credentials.folder
|
|
hardware_service = HWService()
|
|
hardware = hardware_service.get_hardware_info()
|
|
|
|
if self.token is None:
|
|
self.login()
|
|
url = f"{API_URL}/resources/get/{folder}"
|
|
headers = {
|
|
"Authorization": f"Bearer {self.token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
payload = json.dumps(
|
|
{
|
|
"password": self.credentials.password,
|
|
"hardware": hardware,
|
|
"fileName": filename
|
|
}, indent=4)
|
|
response = requests.post(url, data=payload, headers=headers, stream=True, timeout=20)
|
|
if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN:
|
|
self.login()
|
|
headers = {
|
|
"Authorization": f"Bearer {self.token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
response = requests.post(url, data=payload, headers=headers, stream=True)
|
|
|
|
if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
|
|
print('500!') #test
|
|
|
|
key = self.get_encryption_key(hardware.hash)
|
|
|
|
stream = BytesIO(response.raw.read())
|
|
data = HWInfo.decrypt_to(stream, key)
|
|
print(f'Downloaded file: {filename}, {len(data)} bytes')
|
|
return data
|
|
|
|
|
|
credentials = Credentials('admin@azaion.com', 'Az@1on1000Odm$n', 'stage')
|
|
api = Api(credentials)
|
|
|
|
file = 'file1'
|
|
sha256_init = Api.create_file(file, size_mb=1)
|
|
api.upload_file(file)
|
|
|
|
file_bytes = api.load_bytes(file)
|
|
|
|
#
|
|
# print(f'received: {len(file_bytes)/1024} kb')
|
|
# sha256_downloaded = Api.get_sha256(file_bytes)
|
|
# print(f'{sha256_init}: sha256 initial file')
|
|
# print(f'{sha256_downloaded}: sha256 downloaded file')
|