Files
annotations/Azaion.AI/api_client.pyx
T
Alex Bezdieniezhnykh fb11622c32 rewrite inference and file loading to cython
Step 1: can compile
2025-01-15 16:43:56 +02:00

72 lines
2.6 KiB
Cython

# cython: language_level=3
import io
import os
from http import HTTPStatus
import requests
import constants
from hardware_service import HardwareService
from processor_command import FileCommand, CommandType
from security import Security
cdef class ApiClient:
"""Handles API authentication and downloading of the AI model."""
cdef str email
cdef str password
cdef str token
cdef str folder
def __init__(self, str email, str password, str folder):
self.email = email
self.password = password
self.folder = folder
if os.path.exists(constants.TOKEN_FILE):
with open(constants.TOKEN_FILE, "r") as file:
self.token = file.read().strip()
else:
self.token = None
cdef get_encryption_key(self, command: FileCommand, str hardware_hash):
return f'{self.email}-{self.password}-{hardware_hash}-#%@AzaionKey@%#---'
cdef login(self, str email, str password, persist_token:bool=False):
response = requests.post(f"{constants.API_URL}/login", json={"email": email, "password": password})
response.raise_for_status()
self.token = response.json()["token"]
if persist_token:
with open(constants.TOKEN_FILE, 'w') as file:
file.write(self.token)
cdef bytes load_file(self, command: FileCommand, persist_token:bool=False):
hardware_service = HardwareService()
hardware = hardware_service.get_hardware_info()
if self.token is None:
self.login(self.email, self.password, persist_token)
url = f"{constants.API_URL}/resources/get/{self.folder}"
headers = {"Authorization": f"Bearer {self.token}"}
payload = {
"password": self.password,
"hardware": hardware,
"fileName": command.filename
}
response = requests.post(url, json=payload, headers=headers, stream=True)
if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN:
self.login(self.email, self.password, persist_token)
response = requests.post(url, json=payload, headers=headers, stream=True)
key = self.get_encryption_key(command, hardware.hash)
encrypted_stream = io.BytesIO(response.content)
decrypted_stream = io.BytesIO()
Security.decrypt_to(encrypted_stream, decrypted_stream, key)
return decrypted_stream
cdef bytes load_ai_model(self):
file_command = FileCommand(CommandType.LOAD, constants.AI_MODEL_FILE)
return self.load_file(file_command, True)