import io
import json
from http import HTTPStatus

import requests

import constants
from hardware_service import get_hardware_info
from security import Security

# Response statuses treated as "the bearer token was rejected": the client
# logs in again and repeats the request once.
_AUTH_FAILURE_STATUSES = (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN)


class ApiCredentials:
    """Connection settings used by :class:`Api`."""

    def __init__(self, url, email, password, folder):
        # Base URL of the service; endpoint paths are appended to it.
        self.url = url
        # Email / password pair sent to the ``/login`` endpoint.
        self.email = email
        self.password = password
        # Default remote folder for uploads and downloads.
        self.folder = folder


class Api:
    """Client for the remote resource service.

    Obtains a bearer token on demand, uploads files, and downloads and
    decrypts files.
    """

    def __init__(self, credentials):
        # Bearer token; ``None`` until the first successful ``login()``.
        self.token = None
        self.credentials = credentials

    def login(self):
        """Authenticate with the configured email/password and store the token.

        Raises:
            requests.HTTPError: if the login endpoint answers with 4xx/5xx.
            KeyError: if the JSON response has no ``"token"`` field.
        """
        response = requests.post(
            f'{self.credentials.url}/login',
            json={
                "email": self.credentials.email,
                "password": self.credentials.password,
            },
        )
        response.raise_for_status()
        self.token = response.json()["token"]

    def _auth_headers(self, json_body=False):
        """Build request headers carrying the current bearer token."""
        headers = {"Authorization": f"Bearer {self.token}"}
        if json_body:
            headers["Content-Type"] = "application/json"
        return headers

    def _post_upload(self, url, filename, file_bytes):
        """Send one multipart upload request and return the response."""
        # A fresh BytesIO per attempt: the previous one is consumed by the
        # time a retry happens.
        files = {'data': (filename, io.BytesIO(file_bytes))}
        return requests.post(
            url,
            headers=self._auth_headers(),
            files=files,
            allow_redirects=True,
        )

    def upload_file(self, filename: str, file_bytes: bytearray):
        """Upload ``file_bytes`` as ``filename`` into the configured folder.

        Best-effort: a failed upload is reported on stdout and not raised.
        If the server rejects the token (401/403), the client logs in again
        and retries once, so an expired token does not make every later
        upload fail.

        Args:
            filename: name sent for the uploaded file.
            file_bytes: file content.

        Raises:
            requests.HTTPError: only from the initial ``login()`` performed
                when no token is stored yet.
        """
        folder = self.credentials.folder
        if self.token is None:
            self.login()

        url = f"{self.credentials.url}/resources/{folder}"
        try:
            r = self._post_upload(url, filename, file_bytes)
            if r.status_code in _AUTH_FAILURE_STATUSES:
                self.login()
                r = self._post_upload(url, filename, file_bytes)
            r.raise_for_status()
            print(
                f"Upload {len(file_bytes)} bytes ({filename}) "
                f"to {self.credentials.url}. Result: {r.status_code}"
            )
        except Exception as e:
            # Deliberately broad: uploading must never crash the caller.
            print(f"Upload fail: {e}")

    def _post_download(self, url, payload):
        """Send one streamed download request and return the response."""
        return requests.post(
            url,
            data=payload,
            headers=self._auth_headers(json_body=True),
            stream=True,
        )

    def load_bytes(self, filename, folder = None):
        """Download ``filename`` from the service and return it decrypted.

        The request carries the account password, the local hardware
        description and the file name. The response body is decrypted with
        the key returned by ``Security.get_api_encryption_key`` for these
        credentials and the hardware hash.

        Args:
            filename: name of the remote file.
            folder: remote folder; falls back to the configured folder when
                omitted or falsy.

        Returns:
            The value returned by ``Security.decrypt_to`` for the downloaded
            bytes.

        Raises:
            requests.HTTPError: if the server still answers with 4xx/5xx
                after one re-login retry. Previously the error body was
                passed on to decryption as if it were the file.
        """
        folder = folder or self.credentials.folder
        hardware = get_hardware_info()
        if self.token is None:
            self.login()

        url = f"{self.credentials.url}/resources/get/{folder}"
        payload = json.dumps(
            {
                "password": self.credentials.password,
                "hardware": hardware.to_json_object(),
                "fileName": filename
            },
            indent=4)

        response = self._post_download(url, payload)
        if response.status_code in _AUTH_FAILURE_STATUSES:
            # Token rejected: release the streamed connection, refresh the
            # token and repeat the request once.
            response.close()
            self.login()
            response = self._post_download(url, payload)

        try:
            if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
                print('500!')
            # Never try to decrypt an error page.
            response.raise_for_status()
            resp_bytes = response.raw.read()
        finally:
            # stream=True keeps the connection checked out until closed.
            response.close()

        hw_hash = Security.get_hw_hash(hardware)
        key = Security.get_api_encryption_key(self.credentials, hw_hash)
        data = Security.decrypt_to(resp_bytes, key)
        print(f'Downloaded file: {filename}, {len(data)} bytes')
        return data

    def load_resource(self, big_part, small_part):
        """Assemble and decrypt a resource stored in two parts.

        The large part is read from the local file ``big_part``; the small
        part is fetched with ``load_bytes(small_part)``. The two are joined
        as small + big and decrypted with the key from
        ``Security.get_model_encryption_key``.

        Args:
            big_part: path of the local file holding the large part.
            small_part: remote file name of the small part.

        Returns:
            The value returned by ``Security.decrypt_to`` for the joined
            bytes.
        """
        with open(big_part, 'rb') as binary_file:
            encrypted_bytes_big = binary_file.read()

        encrypted_bytes_small = self.load_bytes(small_part)
        encrypted_model_bytes = encrypted_bytes_small + encrypted_bytes_big

        key = Security.get_model_encryption_key()
        model_bytes = Security.decrypt_to(encrypted_model_bytes, key)
        return model_bytes