Files
annotations/Azaion.Inference/api_client.pyx
T
Alex Bezdieniezhnykh babcbc0fc7 clean postbuild script
clean warnings
2025-04-28 10:20:06 +03:00

150 lines
5.7 KiB
Cython

import json
from http import HTTPStatus
from os import path
from uuid import UUID
import jwt
import requests
cimport constants
import yaml
from cdn_manager cimport CDNManager, CDNCredentials
from hardware_service cimport HardwareService, HardwareInfo
from security cimport Security
from user cimport User, RoleEnum
cdef class ApiClient:
"""Handles API authentication and downloading of the AI model."""
def __init__(self):
self.credentials = None
self.user = None
self.token = None
self.cdn_manager = None
cdef set_credentials(self, Credentials credentials):
self.credentials = credentials
yaml_bytes = self.load_bytes(constants.CDN_CONFIG, <str>'')
yaml_config = yaml.safe_load(yaml_bytes)
creds = CDNCredentials(yaml_config["host"],
yaml_config["downloader_access_key"],
yaml_config["downloader_access_secret"],
yaml_config["uploader_access_key"],
yaml_config["uploader_access_secret"])
self.cdn_manager = CDNManager(creds)
cdef login(self):
response = requests.post(f"{constants.API_URL}/login",
json={"email": self.credentials.email, "password": self.credentials.password})
response.raise_for_status()
token = response.json()["token"]
self.set_token(token)
cdef set_token(self, str token):
self.token = token
claims = jwt.decode(token, options={"verify_signature": False})
try:
id = str(UUID(claims.get("nameid", "")))
except ValueError:
raise ValueError("Invalid GUID format in claims")
email = claims.get("unique_name", "")
role_str = claims.get("role", "")
if role_str == "ApiAdmin":
role = RoleEnum.ApiAdmin
elif role_str == "Admin":
role = RoleEnum.Admin
elif role_str == "ResourceUploader":
role = RoleEnum.ResourceUploader
elif role_str == "Validator":
role = RoleEnum.Validator
elif role_str == "Operator":
role = RoleEnum.Operator
else:
role = RoleEnum.NONE
self.user = User(id, email, role)
cdef get_user(self):
if self.user is None:
self.login()
return self.user
cdef upload_file(self, str filename, bytes resource, str folder):
if self.token is None:
self.login()
url = f"{constants.API_URL}/resources/{folder}"
headers = { "Authorization": f"Bearer {self.token}" }
files = {'data': (filename, resource)}
try:
r = requests.post(url, headers=headers, files=files, allow_redirects=True)
r.raise_for_status()
constants.log(f"Uploaded {filename} to {constants.API_URL}/{folder} successfully: {r.status_code}.")
except Exception as e:
constants.log(f"Upload fail: {e}")
cdef load_bytes(self, str filename, str folder):
hardware_service = HardwareService()
cdef HardwareInfo hardware = hardware_service.get_hardware_info()
if self.token is None:
self.login()
url = f"{constants.API_URL}/resources/get/{folder}"
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
payload = json.dumps(
{
"password": self.credentials.password,
"hardware": hardware.to_json_object(),
"fileName": filename
}, indent=4)
response = requests.post(url, data=payload, headers=headers, stream=True)
if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN:
self.login()
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
response = requests.post(url, data=payload, headers=headers, stream=True)
if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
print('500!')
hw_hash = Security.get_hw_hash(hardware)
key = Security.get_api_encryption_key(self.credentials, hw_hash)
resp_bytes = response.raw.read()
data = Security.decrypt_to(resp_bytes, key)
constants.log(<str>f'Downloaded file: {filename}, {len(data)} bytes')
return data
cdef load_big_small_resource(self, str resource_name, str folder, str key):
cdef str big_part = path.join(<str>folder, f'{resource_name}.big')
cdef str small_part = f'{resource_name}.small'
with open(<str>big_part, 'rb') as binary_file:
encrypted_bytes_big = binary_file.read()
encrypted_bytes_small = self.load_bytes(small_part, folder)
encrypted_bytes = encrypted_bytes_small + encrypted_bytes_big
result = Security.decrypt_to(encrypted_bytes, key)
return result
cdef upload_big_small_resource(self, bytes resource, str resource_name, str folder, str key):
cdef str big_part_name = f'{resource_name}.big'
cdef str small_part_name = f'{resource_name}.small'
resource_encrypted = Security.encrypt_to(<bytes>resource, key)
part_small_size = min(constants.SMALL_SIZE_KB * 1024, int(0.3 * len(resource_encrypted)))
part_small = resource_encrypted[:part_small_size] # slice bytes for part1
part_big = resource_encrypted[part_small_size:]
self.cdn_manager.upload(<str>constants.MODELS_FOLDER, <str>big_part_name, part_big)
with open(path.join(<str>folder, <str>big_part_name), 'wb') as f:
f.write(part_big)
self.upload_file(small_part_name, part_small, constants.MODELS_FOLDER)