Errors sending to UI

notifying client of AI model conversion
This commit is contained in:
dzaitsev
2025-05-07 17:32:29 +03:00
committed by Alex Bezdieniezhnykh
42 changed files with 630 additions and 363 deletions
+32 -10
View File
@@ -1,4 +1,5 @@
import json
import os
from http import HTTPStatus
from os import path
from uuid import UUID
@@ -6,6 +7,7 @@ import jwt
import requests
cimport constants
import yaml
from requests import HTTPError
from cdn_manager cimport CDNManager, CDNCredentials
from hardware_service cimport HardwareService
@@ -23,6 +25,9 @@ cdef class ApiClient:
cdef set_credentials(self, Credentials credentials):
self.credentials = credentials
if self.cdn_manager is not None:
return
yaml_bytes = self.load_bytes(constants.CDN_CONFIG, <str>'')
yaml_config = yaml.safe_load(yaml_bytes)
creds = CDNCredentials(yaml_config["host"],
@@ -34,11 +39,18 @@ cdef class ApiClient:
self.cdn_manager = CDNManager(creds)
cdef login(self):
response = requests.post(f"{self.api_url}/login",
json={"email": self.credentials.email, "password": self.credentials.password})
response.raise_for_status()
token = response.json()["token"]
self.set_token(token)
response = None
try:
response = requests.post(f"{self.api_url}/login",
json={"email": self.credentials.email, "password": self.credentials.password})
response.raise_for_status()
token = response.json()["token"]
self.set_token(token)
except HTTPError as e:
print(response.json())
if response.status_code == HTTPStatus.CONFLICT:
res = response.json()
raise Exception(res['Message'])
cdef set_token(self, str token):
@@ -123,11 +135,21 @@ cdef class ApiClient:
return data
cdef load_big_small_resource(self, str resource_name, str folder, str key):
cdef str big_part = path.join(<str>folder, f'{resource_name}.big')
cdef str big_part = f'{resource_name}.big'
cdef str small_part = f'{resource_name}.small'
with open(<str>big_part, 'rb') as binary_file:
encrypted_bytes_big = binary_file.read()
print(f'checking on existence for {folder}\\{big_part}')
if os.path.exists(os.path.join(<str> folder, big_part)):
with open(path.join(<str> folder, big_part), 'rb') as binary_file:
encrypted_bytes_big = binary_file.read()
print(f'local file {folder}\\{big_part} is found!')
else:
print(f'downloading file {folder}\\{big_part} from cdn...')
if self.cdn_manager.download(folder, big_part):
with open(path.join(<str> folder, big_part), 'rb') as binary_file:
encrypted_bytes_big = binary_file.read()
else:
return None
encrypted_bytes_small = self.load_bytes(small_part, folder)
@@ -145,7 +167,7 @@ cdef class ApiClient:
part_big = resource_encrypted[part_small_size:]
self.cdn_manager.upload(<str>constants.MODELS_FOLDER, <str>big_part_name, part_big)
self.cdn_manager.upload(folder, <str>big_part_name, part_big)
with open(path.join(<str>folder, <str>big_part_name), 'wb') as f:
f.write(part_big)
self.upload_file(small_part_name, part_small, constants.MODELS_FOLDER)
self.upload_file(small_part_name, part_small, folder)
+3 -3
View File
@@ -31,10 +31,10 @@ cdef class CDNManager:
print(e)
return False
cdef download(self, str bucket, str filename):
cdef download(self, str folder, str filename):
try:
self.download_client.download_file(bucket, filename, filename)
print(f'downloaded {filename} from the {bucket} to current folder')
self.download_client.download_file(folder, filename, f'{folder}\\{filename}')
print(f'downloaded {filename} from the {folder} to current folder')
return True
except Exception as e:
print(e)
-2
View File
@@ -13,7 +13,5 @@ cdef str MODELS_FOLDER
cdef int SMALL_SIZE_KB
cdef bytes DONE_SIGNAL
cdef log(str log_message, bytes client_id=*)
+1 -1
View File
@@ -16,7 +16,7 @@ cdef class Inference:
cdef int model_width
cdef int model_height
cdef build_tensor_engine(self)
cdef build_tensor_engine(self, object updater_callback)
cdef init_ai(self)
cdef bint is_building_engine
cdef bint is_video(self, str filepath)
+16 -13
View File
@@ -32,7 +32,7 @@ cdef class Inference:
self.engine = None
self.is_building_engine = False
cdef build_tensor_engine(self):
cdef build_tensor_engine(self, object updater_callback):
is_nvidia = HardwareService.has_nvidia_gpu()
if not is_nvidia:
return
@@ -40,18 +40,22 @@ cdef class Inference:
engine_filename = TensorRTEngine.get_engine_filename(0)
key = Security.get_model_encryption_key()
models_dir = constants.MODELS_FOLDER
if not os.path.exists(os.path.join(<str> models_dir, f'{engine_filename}.big')):
#TODO: Check cdn on engine exists, if there is, download
self.is_building_engine = True
time.sleep(8) # prevent simultaneously loading dll and models
onnx_model = self.api_client.load_big_small_resource(constants.AI_ONNX_MODEL_FILE, models_dir, key)
model_bytes = TensorRTEngine.convert_from_onnx(onnx_model)
self.api_client.upload_big_small_resource(model_bytes, <str> engine_filename, models_dir, key)
print('uploaded ')
self.is_building_engine = False
else:
print('tensor rt engine is here, no need to build')
self.is_building_engine = True
updater_callback('downloading')
if self.api_client.load_big_small_resource(engine_filename, models_dir, key):
print('tensor rt engine is here, no need to build')
self.is_building_engine = False
return
# time.sleep(8) # prevent simultaneously loading dll and models
updater_callback('converting')
onnx_model = self.api_client.load_big_small_resource(constants.AI_ONNX_MODEL_FILE, models_dir, key)
model_bytes = TensorRTEngine.convert_from_onnx(onnx_model)
updater_callback('uploading')
self.api_client.upload_big_small_resource(model_bytes, <str> engine_filename, models_dir, key)
print(f'uploaded {engine_filename} to CDN and API')
self.is_building_engine = False
cdef init_ai(self):
if self.engine is not None:
@@ -161,7 +165,6 @@ cdef class Inference:
self.stop_signal = False
self.init_ai()
print(ai_config.paths)
for m in ai_config.paths:
if self.is_video(m):
videos.append(m)
+23 -10
View File
@@ -34,7 +34,8 @@ cdef class CommandProcessor:
try:
command = self.inference_queue.get(timeout=0.5)
self.inference.run_inference(command)
self.remote_handler.send(command.client_id, <bytes>'DONE'.encode('utf-8'))
end_inference_command = RemoteCommand(CommandType.INFERENCE_DATA, None, 'DONE')
self.remote_handler.send(command.client_id, end_inference_command.serialize())
except queue.Empty:
continue
except Exception as e:
@@ -44,11 +45,13 @@ cdef class CommandProcessor:
cdef on_command(self, RemoteCommand command):
try:
if command.command_type == CommandType.LOGIN:
self.login(command)
self.api_client.set_credentials(Credentials.from_msgpack(command.data))
elif command.command_type == CommandType.LOAD:
self.load_file(command)
elif command.command_type == CommandType.INFERENCE:
self.inference_queue.put(command)
elif command.command_type == CommandType.AI_AVAILABILITY_CHECK:
self.build_tensor_engine(command.client_id)
elif command.command_type == CommandType.STOP_INFERENCE:
self.inference.stop()
elif command.command_type == CommandType.EXIT:
@@ -59,18 +62,28 @@ cdef class CommandProcessor:
except Exception as e:
print(f"Error handling client: {e}")
cdef login(self, RemoteCommand command):
self.api_client.set_credentials(Credentials.from_msgpack(command.data))
Thread(target=self.inference.build_tensor_engine).start() # build AI engine in non-blocking thread
cdef build_tensor_engine(self, client_id):
self.inference.build_tensor_engine(lambda status: self.build_tensor_status_updater(client_id, status))
self.remote_handler.send(client_id, RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, None, 'enabled').serialize())
cdef build_tensor_status_updater(self, bytes client_id, str status):
self.remote_handler.send(client_id, RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, None, status).serialize())
cdef load_file(self, RemoteCommand command):
cdef FileData file_data = FileData.from_msgpack(command.data)
response = self.api_client.load_bytes(file_data.filename, file_data.folder)
self.remote_handler.send(command.client_id, response)
cdef RemoteCommand response
cdef FileData file_data
cdef bytes file_bytes
try:
file_data = FileData.from_msgpack(command.data)
file_bytes = self.api_client.load_bytes(file_data.filename, file_data.folder)
response = RemoteCommand(CommandType.DATA_BYTES, file_bytes)
except Exception as e:
response = RemoteCommand(CommandType.DATA_BYTES, None, str(e))
self.remote_handler.send(command.client_id, response.serialize())
cdef on_annotation(self, RemoteCommand cmd, Annotation annotation):
data = annotation.serialize()
self.remote_handler.send(cmd.client_id, data)
cdef RemoteCommand response = RemoteCommand(CommandType.INFERENCE_DATA, annotation.serialize())
self.remote_handler.send(cmd.client_id, response.serialize())
def stop(self):
self.inference.stop()
+6
View File
@@ -1,13 +1,19 @@
cdef enum CommandType:
LOGIN = 10
LOAD = 20
DATA_BYTES = 25
INFERENCE = 30
INFERENCE_DATA = 35
STOP_INFERENCE = 40
AI_AVAILABILITY_CHECK = 80,
AI_AVAILABILITY_RESULT = 85,
ERROR = 90
EXIT = 100
cdef class RemoteCommand:
cdef public bytes client_id
cdef CommandType command_type
cdef str message
cdef bytes data
@staticmethod
+10 -3
View File
@@ -1,16 +1,22 @@
import msgpack
cdef class RemoteCommand:
def __init__(self, CommandType command_type, bytes data):
def __init__(self, CommandType command_type, bytes data, str message=None):
self.command_type = command_type
self.data = data
self.message = message
def __str__(self):
command_type_names = {
10: "LOGIN",
20: "LOAD",
25: "DATA_BYTES",
30: "INFERENCE",
35: "INFERENCE_DATA",
40: "STOP_INFERENCE",
80: "AI_AVAILABILITY_CHECK",
85: "AI_AVAILABILITY_RESULT",
90: "ERROR",
100: "EXIT"
}
data_str = f'{len(self.data)} bytes' if self.data else ''
@@ -19,10 +25,11 @@ cdef class RemoteCommand:
@staticmethod
cdef from_msgpack(bytes data):
unpacked = msgpack.unpackb(data, strict_map_key=False)
return RemoteCommand(unpacked.get("CommandType"), unpacked.get("Data"))
return RemoteCommand(unpacked.get("CommandType"), unpacked.get("Data"), unpacked.get("Message"))
cdef bytes serialize(self):
return msgpack.packb({
"CommandType": self.command_type,
"Data": self.data
"Data": self.data,
"Message": self.message
})
+6 -4
View File
@@ -70,10 +70,12 @@ cdef class RemoteCommandHandler:
worker_socket.close()
cdef send(self, bytes client_id, bytes data):
with self._context.socket(zmq.DEALER) as socket:
socket.connect("inproc://backend")
socket.send_multipart([client_id, data])
# constants.log(<str>f'Sent {len(data)} bytes.', client_id)
self._router.send_multipart([client_id, data])
# with self._context.socket(zmq.DEALER) as socket:
# socket.connect("inproc://backend")
# socket.send_multipart([client_id, data])
# # constants.log(<str>f'Sent {len(data)} bytes.', client_id)
cdef stop(self):
self._shutdown_event.set()