import json
import os
from http import HTTPStatus
from io import BytesIO
from uuid import UUID

import jwt
import requests

cimport constants
from hardware_service cimport HardwareService, HardwareInfo
from security cimport Security
from user cimport User, RoleEnum


cdef class ApiClient:
    """Authenticate against the API and download encrypted resources.

    The client keeps the bearer token in ``constants.TOKEN_FILE`` so that it
    can be reused between runs, and transparently logs in again when the
    server rejects the token.
    """

    def __init__(self, str email, str password, str folder):
        """Store the credentials and restore a cached token if one exists.

        Args:
            email: Account e-mail used for ``/login``.
            password: Account password; also sent with resource requests and
                mixed into the decryption key.
            folder: Resource folder appended to ``/resources/get/``.
        """
        self.email = email
        self.password = password
        self.folder = folder
        self.user = None
        self.token = None

        if os.path.exists(constants.TOKEN_FILE):
            with open(constants.TOKEN_FILE, "r") as file:
                cached_token = file.read().strip()
            try:
                self.set_token(cached_token)
            except (jwt.InvalidTokenError, ValueError):
                # An empty or corrupt cache file must not make the client
                # unconstructible: forget it and let login() fetch a fresh
                # token the first time one is needed.
                self.token = None
                self.user = None

    cdef get_encryption_key(self, str hardware_hash):
        """Return the hash used as the decryption key for downloaded resources.

        The key material is the e-mail, the password, ``hardware_hash`` and a
        fixed suffix, passed through ``Security.calc_hash``.
        """
        cdef str key = f'{self.email}-{self.password}-{hardware_hash}-#%@AzaionKey@%#---'
        return Security.calc_hash(key)

    cdef login(self):
        """Log in with e-mail/password, adopt the new token and cache it on disk.

        Raises:
            requests.HTTPError: If the login endpoint answers with an error status.
        """
        response = requests.post(
            f"{constants.API_URL}/login",
            json={"email": self.email, "password": self.password},
        )
        response.raise_for_status()
        token = response.json()["token"]
        self.set_token(token)
        with open(constants.TOKEN_FILE, 'w') as file:
            file.write(token)

    cdef set_token(self, str token):
        """Store ``token`` and rebuild ``self.user`` from its claims.

        Raises:
            ValueError: If the ``nameid`` claim is not a valid GUID.
        """
        self.token = token
        # NOTE: the signature is not verified here; the claims are only read
        # locally to build the User object.
        claims = jwt.decode(token, options={"verify_signature": False})

        try:
            # Round-trip through UUID to validate and normalise the id.
            user_id = str(UUID(claims.get("nameid", "")))
        except ValueError:
            raise ValueError("Invalid GUID format in claims")

        email = claims.get("unique_name", "")

        role_str = claims.get("role", "")
        if role_str == "ApiAdmin":
            role = RoleEnum.ApiAdmin
        elif role_str == "Admin":
            role = RoleEnum.Admin
        elif role_str == "ResourceUploader":
            role = RoleEnum.ResourceUploader
        elif role_str == "Validator":
            role = RoleEnum.Validator
        elif role_str == "Operator":
            role = RoleEnum.Operator
        else:
            # Unknown or missing role claim.
            role = RoleEnum.NONE

        self.user = User(user_id, email, role)

    cdef get_user(self):
        """Return the current user, logging in first if none is known yet."""
        if self.user is None:
            self.login()
        return self.user

    cdef load_bytes(self, str filename):
        """Download ``filename`` from the resource folder and return it decrypted.

        The request is retried once after a fresh login when the server
        answers 401 or 403.

        Raises:
            requests.HTTPError: If the final response has an error status.
        """
        hardware_service = HardwareService()
        cdef HardwareInfo hardware = hardware_service.get_hardware_info()

        if self.token is None:
            self.login()

        url = f"{constants.API_URL}/resources/get/{self.folder}"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }
        payload = json.dumps(
            {
                "password": self.password,
                "hardware": hardware.to_json_object(),
                "fileName": filename
            }, indent=4)

        response = requests.post(url, data=payload, headers=headers, stream=True)
        if response.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
            # With stream=True the unread response holds its connection open;
            # release it before retrying with a fresh token.
            response.close()
            self.login()
            headers["Authorization"] = f"Bearer {self.token}"
            response = requests.post(url, data=payload, headers=headers, stream=True)

        try:
            if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
                print('500!')
            # Never feed an error body to the decryptor: fail with the HTTP
            # error instead, the same way login() does.
            response.raise_for_status()
            encrypted = response.raw.read()
        finally:
            response.close()

        key = self.get_encryption_key(hardware.hash)
        data = Security.decrypt_to(BytesIO(encrypted), key)
        print(f'loaded file: {filename}, {len(data)} bytes')
        return data

    cdef load_ai_model(self):
        """Download and decrypt the file named by ``constants.AI_MODEL_FILE``."""
        return self.load_bytes(constants.AI_MODEL_FILE)

    cdef load_queue_config(self):
        """Download the queue configuration and return it decoded as UTF-8 text."""
        return self.load_bytes(constants.QUEUE_CONFIG_FILENAME).decode(encoding='utf-8')