import json import os from http import HTTPStatus from os import path from uuid import UUID import jwt import requests cimport constants import yaml from requests import HTTPError from credentials cimport Credentials from cdn_manager cimport CDNManager, CDNCredentials from hardware_service cimport HardwareService from security cimport Security from user cimport User, RoleEnum cdef class ApiClient: """Handles API authentication and downloading of the AI model.""" def __init__(self, str api_url): self.credentials = None self.user = None self.token = None self.cdn_manager = None self.api_url = api_url cdef set_credentials(self, Credentials credentials): self.credentials = credentials if self.cdn_manager is not None: return yaml_bytes = self.load_bytes(constants.CDN_CONFIG, '') yaml_config = yaml.safe_load(yaml_bytes) creds = CDNCredentials(yaml_config["host"], yaml_config["downloader_access_key"], yaml_config["downloader_access_secret"], yaml_config["uploader_access_key"], yaml_config["uploader_access_secret"]) self.cdn_manager = CDNManager(creds) cdef login(self): response = None try: response = requests.post(f"{self.api_url}/login", json={"email": self.credentials.email, "password": self.credentials.password}) response.raise_for_status() token = response.json()["token"] self.set_token(token) except HTTPError as e: res = response.json() constants.logerror(str(res)) if response.status_code == HTTPStatus.CONFLICT: raise Exception(f"Error {res['ErrorCode']}: {res['Message']}") cdef set_token(self, str token): self.token = token claims = jwt.decode(token, options={"verify_signature": False}) try: id = str(UUID(claims.get("nameid", ""))) except ValueError: raise ValueError("Invalid GUID format in claims") email = claims.get("unique_name", "") role_str = claims.get("role", "") if role_str == "ApiAdmin": role = RoleEnum.ApiAdmin elif role_str == "Admin": role = RoleEnum.Admin elif role_str == "ResourceUploader": role = RoleEnum.ResourceUploader elif role_str == "Validator": role = RoleEnum.Validator elif role_str == "Operator": role = RoleEnum.Operator else: role = RoleEnum.NONE self.user = User(id, email, role) cdef get_user(self): if self.user is None: self.login() return self.user cdef upload_file(self, str filename, bytes resource, str folder): if self.token is None: self.login() url = f"{self.api_url}/resources/{folder}" headers = { "Authorization": f"Bearer {self.token}" } files = {'data': (filename, resource)} try: r = requests.post(url, headers=headers, files=files, allow_redirects=True) r.raise_for_status() constants.log(f"Uploaded {filename} to {self.api_url}/{folder} successfully: {r.status_code}.") except Exception as e: constants.logerror(f"Upload fail: {e}") cdef list_files(self, str folder, str search_file): response = self.request('get', f'{self.api_url}/resources/list/{folder}', { "search": search_file }, is_stream=False) constants.log( f'Get files list by {folder}') return response.json() cdef check_resource(self): cdef str hardware = HardwareService.get_hardware_info() payload = json.dumps({ "hardware": hardware }, indent=4) response = self.request('post', f'{self.api_url}/resources/check', payload, is_stream=False) cdef load_bytes(self, str filename, str folder): cdef str hardware = HardwareService.get_hardware_info() hw_hash = Security.get_hw_hash(hardware) key = Security.get_api_encryption_key(self.credentials, hw_hash) payload = json.dumps( { "password": self.credentials.password, "hardware": hardware, "fileName": filename }, indent=4) response = self.request('post', f'{self.api_url}/resources/get/{folder}', payload, is_stream=True) resp_bytes = response.raw.read() data = Security.decrypt_to(resp_bytes, key) constants.log(f'Downloaded file: {filename}, {len(data)} bytes') return data cdef request(self, str method, str url, object payload, bint is_stream): if self.token is None: self.login() headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } response = requests.request(method, url, data=payload, headers=headers, stream=is_stream) if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN: self.login() headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } response = requests.request(method, url, data=payload, headers=headers, stream=is_stream) if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR: raise Exception(f'Internal API error! {response.text}') if response.status_code == HTTPStatus.CONFLICT: res = response.json() err_code = res['ErrorCode'] err_msg = res['Message'] raise Exception(f"Error {err_code}: {err_msg}") return response cdef load_big_file_cdn(self, str folder, str big_part): constants.log(f'downloading file {folder}\\{big_part} from cdn...') if self.cdn_manager.download(folder, big_part): with open(path.join( folder, big_part), 'rb') as binary_file: encrypted_bytes_big = binary_file.read() return encrypted_bytes_big else: raise Exception(f'Cannot download file {folder}\\{big_part} from CDN!') cdef load_big_small_resource(self, str resource_name, str folder): cdef str big_part = f'{resource_name}.big' cdef str small_part = f'{resource_name}.small' encrypted_bytes_small = self.load_bytes(small_part, folder) key = Security.get_resource_encryption_key() constants.log(f'checking on existence for {folder}\\{big_part}') if os.path.exists(os.path.join( folder, big_part)): with open(path.join( folder, big_part), 'rb') as binary_file: local_bytes_big = binary_file.read() constants.log(f'local file {folder}\\{big_part} is found!') try: resource = Security.decrypt_to(encrypted_bytes_small + local_bytes_big, key) return resource except Exception as ex: constants.logerror('Local file {folder}\\{big_part} doesnt match with api file, old version') remote_bytes_big = self.load_big_file_cdn(folder, big_part) return Security.decrypt_to(encrypted_bytes_small + remote_bytes_big, key) cdef upload_big_small_resource(self, bytes resource, str resource_name, str folder): cdef str big_part_name = f'{resource_name}.big' cdef str small_part_name = f'{resource_name}.small' key = Security.get_resource_encryption_key() resource_encrypted = Security.encrypt_to(resource, key) part_small_size = min(constants.SMALL_SIZE_KB * 1024, int(0.3 * len(resource_encrypted))) part_small = resource_encrypted[:part_small_size] # slice bytes for part1 part_big = resource_encrypted[part_small_size:] self.cdn_manager.upload(folder, big_part_name, part_big) with open(path.join(folder, big_part_name), 'wb') as f: f.write(part_big) self.upload_file(small_part_name, part_small, folder)