import json import os from http import HTTPStatus from uuid import UUID import jwt import requests cimport constants from hardware_service cimport HardwareService, HardwareInfo from security cimport Security from io import BytesIO from user cimport User, RoleEnum from file_data cimport FileData cdef class ApiClient: """Handles API authentication and downloading of the AI model.""" def __init__(self): self.credentials = None self.user = None self.token = None cdef get_encryption_key(self, str hardware_hash): cdef str key = f'{self.credentials.email}-{self.credentials.password}-{hardware_hash}-#%@AzaionKey@%#---' return Security.calc_hash(key) cdef login(self): response = requests.post(f"{constants.API_URL}/login", json={"email": self.credentials.email, "password": self.credentials.password}) response.raise_for_status() token = response.json()["token"] self.set_token(token) cdef set_token(self, str token): self.token = token claims = jwt.decode(token, options={"verify_signature": False}) try: id = str(UUID(claims.get("nameid", ""))) except ValueError: raise ValueError("Invalid GUID format in claims") email = claims.get("unique_name", "") role_str = claims.get("role", "") if role_str == "ApiAdmin": role = RoleEnum.ApiAdmin elif role_str == "Admin": role = RoleEnum.Admin elif role_str == "ResourceUploader": role = RoleEnum.ResourceUploader elif role_str == "Validator": role = RoleEnum.Validator elif role_str == "Operator": role = RoleEnum.Operator else: role = RoleEnum.NONE self.user = User(id, email, role) cdef get_user(self): if self.user is None: self.login() return self.user cdef load_bytes(self, str filename, str folder=None): folder = folder or self.credentials.folder hardware_service = HardwareService() cdef HardwareInfo hardware = hardware_service.get_hardware_info() if self.token is None: self.login() url = f"{constants.API_URL}/resources/get/{folder}" headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } payload = json.dumps( { "password": self.credentials.password, "hardware": hardware.to_json_object(), "fileName": filename }, indent=4) response = requests.post(url, data=payload, headers=headers, stream=True) if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN: self.login() headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } response = requests.post(url, data=payload, headers=headers, stream=True) if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR: print('500!') key = self.get_encryption_key(hardware.hash) stream = BytesIO(response.raw.read()) data = Security.decrypt_to(stream, key) print(f'loaded file: {filename}, {len(data)} bytes') return data cdef load_ai_model(self): return self.load_bytes(constants.AI_MODEL_FILE) cdef load_queue_config(self): return self.load_bytes(constants.QUEUE_CONFIG_FILENAME).decode(encoding='utf-8')