Files
annotations/Azaion.Inference/inference_engine.pyx
T
2025-04-24 16:30:21 +03:00

178 lines
6.8 KiB
Cython

import json
import struct
from typing import List, Tuple
import numpy as np
import onnxruntime as onnx
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit # required for automatically initialize CUDA, do not remove.
import pynvml
cdef class InferenceEngine:
def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs):
self.batch_size = batch_size
cdef tuple get_input_shape(self):
raise NotImplementedError("Subclass must implement get_input_shape")
cdef int get_batch_size(self):
return self.batch_size
cpdef run(self, input_data):
raise NotImplementedError("Subclass must implement run")
cdef class OnnxEngine(InferenceEngine):
def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs):
super().__init__(model_bytes, batch_size)
self.session = onnx.InferenceSession(model_bytes, providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
self.model_inputs = self.session.get_inputs()
self.input_name = self.model_inputs[0].name
self.input_shape = self.model_inputs[0].shape
self.batch_size = self.input_shape[0] if self.input_shape[0] != -1 else batch_size
print(f'AI detection model input: {self.model_inputs} {self.input_shape}')
model_meta = self.session.get_modelmeta()
print("Metadata:", model_meta.custom_metadata_map)
cdef tuple get_input_shape(self):
shape = self.input_shape
return shape[2], shape[3]
cdef int get_batch_size(self):
return self.batch_size
cpdef run(self, input_data):
return self.session.run(None, {self.input_name: input_data})
cdef class TensorRTEngine(InferenceEngine):
def __init__(self, model_bytes: bytes, batch_size: int = 4, **kwargs):
super().__init__(model_bytes, batch_size)
print('Enter init TensorRT')
try:
logger = trt.Logger(trt.Logger.WARNING)
runtime = trt.Runtime(logger)
engine = runtime.deserialize_cuda_engine(model_bytes)
if engine is None:
raise RuntimeError(f"Failed to load TensorRT engine from bytes")
self.context = engine.create_execution_context()
# input
self.input_name = engine.get_tensor_name(0)
engine_input_shape = engine.get_tensor_shape(self.input_name)
if engine_input_shape[0] != -1:
self.batch_size = engine_input_shape[0]
else:
self.batch_size = batch_size
self.input_shape = [
self.batch_size,
engine_input_shape[1], # Channels (usually fixed at 3 for RGB)
1280 if engine_input_shape[2] == -1 else engine_input_shape[2], # Height
1280 if engine_input_shape[3] == -1 else engine_input_shape[3] # Width
]
self.context.set_input_shape(self.input_name, self.input_shape)
input_size = trt.volume(self.input_shape) * np.dtype(np.float32).itemsize
self.d_input = cuda.mem_alloc(input_size)
# output
self.output_name = engine.get_tensor_name(1)
engine_output_shape = tuple(engine.get_tensor_shape(self.output_name))
self.output_shape = [
self.batch_size,
300 if engine_output_shape[1] == -1 else engine_output_shape[1], # max detections number
6 if engine_output_shape[2] == -1 else engine_output_shape[2] # x1 y1 x2 y2 conf cls
]
self.h_output = cuda.pagelocked_empty(tuple(self.output_shape), dtype=np.float32)
self.d_output = cuda.mem_alloc(self.h_output.nbytes)
self.stream = cuda.Stream()
except Exception as e:
raise RuntimeError(f"Failed to initialize TensorRT engine: {str(e)}")
@staticmethod
cdef unsigned long long get_gpu_memory_bytes(device_id=0):
total_memory = None
try:
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(device_id)
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
total_memory = mem_info.total
except pynvml.NVMLError:
total_memory = None
finally:
try:
pynvml.nvmlShutdown()
except pynvml.NVMLError:
pass
return 2 * 1024 * 1024 * 1024 if total_memory is None else total_memory # default 2 Gb
@staticmethod
cdef str get_engine_filename(device_id=0):
try:
device = cuda.Device(device_id)
sm_count = device.multiprocessor_count
cc_major, cc_minor = device.compute_capability()
return f"azaion.cc_{cc_major}.{cc_minor}_sm_{sm_count}.engine"
except Exception:
return None
@staticmethod
cdef bytes convert_from_onnx(bytes onnx_model):
cdef unsigned long long workspace_bytes = int(TensorRTEngine.get_gpu_memory_bytes() * 0.9)
explicit_batch_flag = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
trt_logger = trt.Logger(trt.Logger.WARNING)
with trt.Builder(trt_logger) as builder, \
builder.create_network(explicit_batch_flag) as network, \
trt.OnnxParser(network, trt_logger) as parser, \
builder.create_builder_config() as config:
config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace_bytes)
if not parser.parse(onnx_model):
return None
if builder.platform_has_fast_fp16:
print('Converting to supported fp16')
config.set_flag(trt.BuilderFlag.FP16)
else:
print('Converting to supported fp32. (fp16 is not supported)')
plan = builder.build_serialized_network(network, config)
if plan is None:
print('Conversion failed.')
return None
print('conversion done!')
return bytes(plan)
cdef tuple get_input_shape(self):
return self.input_shape[2], self.input_shape[3]
cdef int get_batch_size(self):
return self.batch_size
cpdef run(self, input_data):
try:
cuda.memcpy_htod_async(self.d_input, input_data, self.stream)
self.context.set_tensor_address(self.input_name, int(self.d_input)) # input buffer
self.context.set_tensor_address(self.output_name, int(self.d_output)) # output buffer
self.context.execute_async_v3(stream_handle=self.stream.handle)
self.stream.synchronize()
# Fix: Remove the stream parameter from memcpy_dtoh
cuda.memcpy_dtoh(self.h_output, self.d_output)
output = self.h_output.reshape(self.output_shape)
return [output]
except Exception as e:
raise RuntimeError(f"Failed to run TensorRT inference: {str(e)}")