import json from http import HTTPStatus from uuid import UUID import jwt import requests cimport constants from hardware_service cimport HardwareService, HardwareInfo from security cimport Security from io import BytesIO from user cimport User, RoleEnum cdef class ApiClient: """Handles API authentication and downloading of the AI model.""" def __init__(self): self.credentials = None self.user = None self.token = None cdef set_credentials(self, Credentials credentials): self.credentials = credentials cdef login(self): response = requests.post(f"{constants.API_URL}/login", json={"email": self.credentials.email, "password": self.credentials.password}) response.raise_for_status() token = response.json()["token"] self.set_token(token) cdef set_token(self, str token): self.token = token claims = jwt.decode(token, options={"verify_signature": False}) try: id = str(UUID(claims.get("nameid", ""))) except ValueError: raise ValueError("Invalid GUID format in claims") email = claims.get("unique_name", "") role_str = claims.get("role", "") if role_str == "ApiAdmin": role = RoleEnum.ApiAdmin elif role_str == "Admin": role = RoleEnum.Admin elif role_str == "ResourceUploader": role = RoleEnum.ResourceUploader elif role_str == "Validator": role = RoleEnum.Validator elif role_str == "Operator": role = RoleEnum.Operator else: role = RoleEnum.NONE self.user = User(id, email, role) cdef get_user(self): if self.user is None: self.login() return self.user cdef upload_file(self, str filename, str folder=None): folder = folder or self.credentials.folder if self.token is None: self.login() url = f"{constants.API_URL}/resources/{folder}" headers = { "Authorization": f"Bearer {self.token}" } files = dict(data=open(filename, 'rb')) try: r = requests.post(url, headers=headers, files=files, allow_redirects=True) r.raise_for_status() print(f"Upload success: {r.status_code}") except Exception as e: print(f"Upload fail: {e}") cdef load_bytes(self, str filename, str folder=None): folder = folder or self.credentials.folder hardware_service = HardwareService() cdef HardwareInfo hardware = hardware_service.get_hardware_info() if self.token is None: self.login() url = f"{constants.API_URL}/resources/get/{folder}" headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } payload = json.dumps( { "password": self.credentials.password, "hardware": hardware.to_json_object(), "fileName": filename }, indent=4) response = requests.post(url, data=payload, headers=headers, stream=True) if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN: self.login() headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } response = requests.post(url, data=payload, headers=headers, stream=True) if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR: print('500!') hw_hash = Security.get_hw_hash(hardware) key = Security.get_api_encryption_key(self.credentials, hw_hash) resp_bytes = response.raw.read() data = Security.decrypt_to(resp_bytes, key) constants.log(f'Downloaded file: {filename}, {len(data)} bytes') return data cdef load_ai_model(self, bint is_tensor=False): if is_tensor: big_file = constants.AI_TENSOR_MODEL_FILE_BIG small_file = constants.AI_TENSOR_MODEL_FILE_SMALL else: big_file = constants.AI_ONNX_MODEL_FILE_BIG small_file = constants.AI_ONNX_MODEL_FILE_SMALL with open(big_file, 'rb') as binary_file: encrypted_bytes_big = binary_file.read() print('read encrypted big file') print(f'small file: {small_file}') encrypted_bytes_small = self.load_bytes(small_file) print('read encrypted small file') encrypted_model_bytes = encrypted_bytes_small + encrypted_bytes_big key = Security.get_model_encryption_key() model_bytes = Security.decrypt_to(encrypted_model_bytes, key) return model_bytes