Files
annotations/Azaion.Inference/api_client.pyx
T
dzaitsev d92da6afa4 Errors sending to UI
notifying client of AI model conversion
2025-05-14 12:43:50 +03:00

173 lines
6.5 KiB
Cython

import json
import os
from http import HTTPStatus
from os import path
from uuid import UUID
import jwt
import requests
cimport constants
import yaml
from requests import HTTPError
from cdn_manager cimport CDNManager, CDNCredentials
from hardware_service cimport HardwareService
from security cimport Security
from user cimport User, RoleEnum
cdef class ApiClient:
"""Handles API authentication and downloading of the AI model."""
def __init__(self, str api_url):
self.credentials = None
self.user = None
self.token = None
self.cdn_manager = None
self.api_url = api_url
cdef set_credentials(self, Credentials credentials):
self.credentials = credentials
if self.cdn_manager is not None:
return
yaml_bytes = self.load_bytes(constants.CDN_CONFIG, <str>'')
yaml_config = yaml.safe_load(yaml_bytes)
creds = CDNCredentials(yaml_config["host"],
yaml_config["downloader_access_key"],
yaml_config["downloader_access_secret"],
yaml_config["uploader_access_key"],
yaml_config["uploader_access_secret"])
self.cdn_manager = CDNManager(creds)
cdef login(self):
response = None
try:
response = requests.post(f"{self.api_url}/login",
json={"email": self.credentials.email, "password": self.credentials.password})
response.raise_for_status()
token = response.json()["token"]
self.set_token(token)
except HTTPError as e:
print(response.json())
if response.status_code == HTTPStatus.CONFLICT:
res = response.json()
raise Exception(res['Message'])
cdef set_token(self, str token):
self.token = token
claims = jwt.decode(token, options={"verify_signature": False})
try:
id = str(UUID(claims.get("nameid", "")))
except ValueError:
raise ValueError("Invalid GUID format in claims")
email = claims.get("unique_name", "")
role_str = claims.get("role", "")
if role_str == "ApiAdmin":
role = RoleEnum.ApiAdmin
elif role_str == "Admin":
role = RoleEnum.Admin
elif role_str == "ResourceUploader":
role = RoleEnum.ResourceUploader
elif role_str == "Validator":
role = RoleEnum.Validator
elif role_str == "Operator":
role = RoleEnum.Operator
else:
role = RoleEnum.NONE
self.user = User(id, email, role)
cdef get_user(self):
if self.user is None:
self.login()
return self.user
cdef upload_file(self, str filename, bytes resource, str folder):
if self.token is None:
self.login()
url = f"{self.api_url}/resources/{folder}"
headers = { "Authorization": f"Bearer {self.token}" }
files = {'data': (filename, resource)}
try:
r = requests.post(url, headers=headers, files=files, allow_redirects=True)
r.raise_for_status()
constants.log(f"Uploaded {filename} to {self.api_url}/{folder} successfully: {r.status_code}.")
except Exception as e:
constants.log(f"Upload fail: {e}")
cdef load_bytes(self, str filename, str folder):
hardware_service = HardwareService()
cdef str hardware = hardware_service.get_hardware_info()
if self.token is None:
self.login()
url = f"{self.api_url}/resources/get/{folder}"
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
payload = json.dumps(
{
"password": self.credentials.password,
"hardware": hardware,
"fileName": filename
}, indent=4)
response = requests.post(url, data=payload, headers=headers, stream=True)
if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN:
self.login()
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
response = requests.post(url, data=payload, headers=headers, stream=True)
if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
print('500!')
hw_hash = Security.get_hw_hash(hardware)
key = Security.get_api_encryption_key(self.credentials, hw_hash)
resp_bytes = response.raw.read()
data = Security.decrypt_to(resp_bytes, key)
constants.log(<str>f'Downloaded file: {filename}, {len(data)} bytes')
return data
cdef load_big_small_resource(self, str resource_name, str folder, str key):
cdef str big_part = f'{resource_name}.big'
cdef str small_part = f'{resource_name}.small'
print(f'checking on existence for {folder}\\{big_part}')
if os.path.exists(os.path.join(<str> folder, big_part)):
with open(path.join(<str> folder, big_part), 'rb') as binary_file:
encrypted_bytes_big = binary_file.read()
print(f'local file {folder}\\{big_part} is found!')
else:
print(f'downloading file {folder}\\{big_part} from cdn...')
if self.cdn_manager.download(folder, big_part):
with open(path.join(<str> folder, big_part), 'rb') as binary_file:
encrypted_bytes_big = binary_file.read()
else:
return None
encrypted_bytes_small = self.load_bytes(small_part, folder)
encrypted_bytes = encrypted_bytes_small + encrypted_bytes_big
result = Security.decrypt_to(encrypted_bytes, key)
return result
cdef upload_big_small_resource(self, bytes resource, str resource_name, str folder, str key):
cdef str big_part_name = f'{resource_name}.big'
cdef str small_part_name = f'{resource_name}.small'
resource_encrypted = Security.encrypt_to(<bytes>resource, key)
part_small_size = min(constants.SMALL_SIZE_KB * 1024, int(0.3 * len(resource_encrypted)))
part_small = resource_encrypted[:part_small_size] # slice bytes for part1
part_big = resource_encrypted[part_small_size:]
self.cdn_manager.upload(folder, <str>big_part_name, part_big)
with open(path.join(<str>folder, <str>big_part_name), 'wb') as f:
f.write(part_big)
self.upload_file(small_part_name, part_small, folder)