Files
annotations/Azaion.Loader/api_client.pyx
Alex Bezdieniezhnykh def7aad833 add resource check
incorrect pass / hw handling in a loader
2025-06-15 15:01:55 +03:00

200 lines
8.0 KiB
Cython

import json
import os
from http import HTTPStatus
from os import path
from uuid import UUID
import jwt
import requests
cimport constants
import yaml
from requests import HTTPError
from credentials cimport Credentials
from cdn_manager cimport CDNManager, CDNCredentials
from hardware_service cimport HardwareService
from security cimport Security
from user cimport User, RoleEnum
def _json_auth_headers(token):
    """Return the bearer-auth JSON headers sent with every API request."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }


def _api_error_text(response):
    """Build the 'Error <code>: <message>' text for an API error response.

    The API is expected to answer with a JSON object carrying 'ErrorCode' and
    'Message'. When the body is not JSON or lacks those keys, fall back to the
    HTTP status code and the raw body so the caller still gets a usable message
    instead of a secondary KeyError/ValueError.
    """
    try:
        body = response.json()
        return f"Error {body['ErrorCode']}: {body['Message']}"
    except (ValueError, KeyError, TypeError):
        return f"Error {response.status_code}: {response.text}"


cdef class ApiClient:
    """Handles API authentication and downloading/uploading of encrypted resources.

    The client logs in lazily with the stored credentials, keeps the returned
    JWT in ``self.token``, and retries a request once after re-login when the
    API answers 401/403. Large resources are split into a ".small" part served
    by the API and a ".big" part stored on the CDN / local disk.
    """

    def __init__(self, str api_url):
        """Create a client for ``api_url``; no network traffic happens here."""
        self.credentials = None
        self.user = None
        self.token = None
        self.cdn_manager = None
        self.api_url = api_url

    cdef set_credentials(self, Credentials credentials):
        """Store ``credentials`` and, on first use, build the CDN manager.

        The CDN configuration is downloaded through ``load_bytes`` (which needs
        the credentials set above) and parsed as YAML. Once a CDN manager
        exists, later calls only replace the credentials.
        """
        self.credentials = credentials
        if self.cdn_manager is not None:
            return

        yaml_bytes = self.load_bytes(constants.CDN_CONFIG, <str>'')
        yaml_config = yaml.safe_load(yaml_bytes)
        creds = CDNCredentials(yaml_config["host"],
                               yaml_config["downloader_access_key"],
                               yaml_config["downloader_access_secret"],
                               yaml_config["uploader_access_key"],
                               yaml_config["uploader_access_secret"])
        self.cdn_manager = CDNManager(creds)

    cdef login(self):
        """Authenticate against ``{api_url}/login`` and store the token/user.

        Raises:
            Exception: the API answered 409 CONFLICT; the message carries the
                API's ErrorCode and Message.
            requests.HTTPError: any other non-success HTTP status. Previously
                these were swallowed, leaving ``self.token`` as None and every
                later request sending "Bearer None".
        """
        response = requests.post(f"{self.api_url}/login",
                                 json={"email": self.credentials.email, "password": self.credentials.password})
        try:
            response.raise_for_status()
        except HTTPError as e:
            # Log the parsed body when it is JSON, otherwise the raw text.
            try:
                details = str(response.json())
            except ValueError:
                details = response.text
            constants.logerror(details)
            if response.status_code == HTTPStatus.CONFLICT:
                raise Exception(_api_error_text(response)) from e
            raise

        self.set_token(response.json()["token"])

    cdef set_token(self, str token):
        """Store ``token`` and derive ``self.user`` from its JWT claims.

        Raises:
            ValueError: the "nameid" claim is not a valid GUID.
        """
        self.token = token
        # The signature is intentionally not verified here: the claims are only
        # read to populate the local User object.
        # NOTE(review): confirm nothing security-relevant is decided client-side
        # from these unverified claims.
        claims = jwt.decode(token, options={"verify_signature": False})

        try:
            user_id = str(UUID(claims.get("nameid", "")))
        except ValueError as err:
            raise ValueError("Invalid GUID format in claims") from err

        email = claims.get("unique_name", "")
        role_str = claims.get("role", "")
        if role_str == "ApiAdmin":
            role = RoleEnum.ApiAdmin
        elif role_str == "Admin":
            role = RoleEnum.Admin
        elif role_str == "ResourceUploader":
            role = RoleEnum.ResourceUploader
        elif role_str == "Validator":
            role = RoleEnum.Validator
        elif role_str == "Operator":
            role = RoleEnum.Operator
        else:
            # Unknown or missing role claim.
            role = RoleEnum.NONE
        self.user = User(user_id, email, role)

    cdef get_user(self):
        """Return the current user, logging in first if none is cached."""
        if self.user is None:
            self.login()
        return self.user

    cdef upload_file(self, str filename, bytes resource, str folder):
        """Upload ``resource`` as multipart field 'data' to ``/resources/{folder}``.

        Failures are logged, not raised (best-effort, as in the original code).
        NOTE(review): callers cannot tell whether the upload succeeded; confirm
        that this is intended.
        """
        if self.token is None:
            self.login()
        url = f"{self.api_url}/resources/{folder}"
        headers = { "Authorization": f"Bearer {self.token}" }
        files = {'data': (filename, resource)}
        try:
            r = requests.post(url, headers=headers, files=files, allow_redirects=True)
            r.raise_for_status()
            # Log the URL that was actually posted to.
            constants.log(f"Uploaded {filename} to {url} successfully: {r.status_code}.")
        except Exception as e:
            constants.logerror(f"Upload fail: {e}")

    cdef list_files(self, str folder, str search_file):
        """Return the parsed JSON listing of ``folder`` filtered by ``search_file``."""
        # NOTE(review): the filter dict is passed as the request body of a GET
        # (form-encoded, under a JSON Content-Type header). If the API reads
        # "search" from the query string this filter is ignored - verify
        # against the server.
        response = self.request('get', f'{self.api_url}/resources/list/{folder}', {
            "search": search_file
        }, is_stream=False)
        constants.log(<str> f'Get files list by {folder}')
        return response.json()

    cdef check_resource(self):
        """Post the hardware info to ``/resources/check``.

        Returns nothing; ``request`` raises when the API answers 409 or 500.
        """
        cdef str hardware = HardwareService.get_hardware_info()
        payload = json.dumps({ "hardware": hardware }, indent=4)
        self.request('post', f'{self.api_url}/resources/check', payload, is_stream=False)

    cdef load_bytes(self, str filename, str folder):
        """Download ``filename`` from ``folder`` and return its decrypted content.

        The decryption key is derived from the credentials and a hash of the
        hardware info; the same hardware string is sent to the API.
        """
        cdef str hardware = HardwareService.get_hardware_info()
        hw_hash = Security.get_hw_hash(hardware)
        key = Security.get_api_encryption_key(self.credentials, hw_hash)

        payload = json.dumps(
            {
                "password": self.credentials.password,
                "hardware": hardware,
                "fileName": filename
            }, indent=4)
        response = self.request('post', f'{self.api_url}/resources/get/{folder}', payload, is_stream=True)
        try:
            # NOTE(review): raw.read() returns the body without content
            # decoding; this assumes the API does not compress the payload.
            resp_bytes = response.raw.read()
        finally:
            # Streamed responses hold their connection until closed.
            response.close()

        data = Security.decrypt_to(resp_bytes, key)
        constants.log(<str>f'Downloaded file: {filename}, {len(data)} bytes')
        return data

    cdef request(self, str method, str url, object payload, bint is_stream):
        """Send an authenticated request, re-logging in once on 401/403.

        Returns:
            The ``requests`` response. Statuses other than 500 and 409 are
            returned to the caller unchanged.

        Raises:
            Exception: the API answered 500, or 409 (message built from the
                API's ErrorCode/Message).
        """
        if self.token is None:
            self.login()

        response = requests.request(method, url, data=payload,
                                    headers=_json_auth_headers(self.token), stream=is_stream)

        if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN:
            # Release the rejected (possibly streamed) response before retrying
            # with a fresh token.
            response.close()
            self.login()
            response = requests.request(method, url, data=payload,
                                        headers=_json_auth_headers(self.token), stream=is_stream)

        if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
            raise Exception(f'Internal API error! {response.text}')
        if response.status_code == HTTPStatus.CONFLICT:
            raise Exception(_api_error_text(response))
        return response

    cdef load_big_file_cdn(self, str folder, str big_part):
        """Download ``big_part`` from the CDN and return the bytes of the local copy.

        Raises:
            Exception: the CDN manager reported a failed download.
        """
        constants.log(f'downloading file {folder}\\{big_part} from cdn...')
        if not self.cdn_manager.download(folder, big_part):
            raise Exception(f'Cannot download file {folder}\\{big_part} from CDN!')

        with open(path.join(<str> folder, big_part), 'rb') as binary_file:
            encrypted_bytes_big = binary_file.read()
        return encrypted_bytes_big

    cdef load_big_small_resource(self, str resource_name, str folder):
        """Return the decrypted resource assembled from its small and big parts.

        The small part always comes from the API. The big part is taken from the
        local file when it exists and decrypts together with the small part;
        otherwise it is downloaded from the CDN.
        """
        cdef str big_part = f'{resource_name}.big'
        cdef str small_part = f'{resource_name}.small'

        encrypted_bytes_small = self.load_bytes(small_part, folder)
        key = Security.get_resource_encryption_key()
        local_path = path.join(<str> folder, big_part)

        constants.log(f'checking on existence for {folder}\\{big_part}')
        if os.path.exists(local_path):
            with open(local_path, 'rb') as binary_file:
                local_bytes_big = binary_file.read()
            constants.log(f'local file {folder}\\{big_part} is found!')
            try:
                return Security.decrypt_to(encrypted_bytes_small + local_bytes_big, key)
            except Exception:
                # A decrypt failure is treated as a stale local big part: fall
                # through to the CDN. The f-prefix was missing here, so the
                # placeholders were logged literally.
                constants.logerror(f'Local file {folder}\\{big_part} doesnt match with api file, old version')

        remote_bytes_big = self.load_big_file_cdn(folder, big_part)
        return Security.decrypt_to(encrypted_bytes_small + remote_bytes_big, key)

    cdef upload_big_small_resource(self, bytes resource, str resource_name, str folder):
        """Encrypt ``resource``, split it, and publish both parts.

        The small part is the first ``min(SMALL_SIZE_KB * 1024, 30% of the
        encrypted size)`` bytes and goes to the API; the remainder goes to the
        CDN and is also written to the local ``folder``.
        """
        cdef str big_part_name = f'{resource_name}.big'
        cdef str small_part_name = f'{resource_name}.small'
        key = Security.get_resource_encryption_key()

        resource_encrypted = Security.encrypt_to(<bytes>resource, key)
        part_small_size = min(constants.SMALL_SIZE_KB * 1024, int(0.3 * len(resource_encrypted)))
        part_small = resource_encrypted[:part_small_size]  # head of the ciphertext -> API
        part_big = resource_encrypted[part_small_size:]    # remainder -> CDN + local file

        # NOTE(review): the result of cdn_manager.upload is ignored; if it
        # reports failure like download() does, the parts can get out of sync.
        self.cdn_manager.upload(folder, <str>big_part_name, part_big)
        with open(path.join(<str>folder, <str>big_part_name), 'wb') as f:
            f.write(part_big)
        self.upload_file(small_part_name, part_small, folder)