from inference_engine cimport InferenceEngine import tensorrt as trt import pycuda.driver as cuda import pycuda.autoinit # required for automatically initialize CUDA, do not remove. import pynvml import numpy as np cimport constants_inf cdef class TensorRTEngine(InferenceEngine): def __init__(self, model_bytes: bytes, batch_size: int = 4, **kwargs): super().__init__(model_bytes, batch_size) try: logger = trt.Logger(trt.Logger.WARNING) runtime = trt.Runtime(logger) engine = runtime.deserialize_cuda_engine(model_bytes) if engine is None: raise RuntimeError(f"Failed to load TensorRT engine from bytes") self.context = engine.create_execution_context() # input self.input_name = engine.get_tensor_name(0) engine_input_shape = engine.get_tensor_shape(self.input_name) if engine_input_shape[0] != -1: self.batch_size = engine_input_shape[0] else: self.batch_size = batch_size self.input_shape = [ self.batch_size, engine_input_shape[1], # Channels (usually fixed at 3 for RGB) 1280 if engine_input_shape[2] == -1 else engine_input_shape[2], # Height 1280 if engine_input_shape[3] == -1 else engine_input_shape[3] # Width ] self.context.set_input_shape(self.input_name, self.input_shape) input_size = trt.volume(self.input_shape) * np.dtype(np.float32).itemsize self.d_input = cuda.mem_alloc(input_size) # output self.output_name = engine.get_tensor_name(1) engine_output_shape = tuple(engine.get_tensor_shape(self.output_name)) self.output_shape = [ self.batch_size, 300 if engine_output_shape[1] == -1 else engine_output_shape[1], # max detections number 6 if engine_output_shape[2] == -1 else engine_output_shape[2] # x1 y1 x2 y2 conf cls ] self.h_output = cuda.pagelocked_empty(tuple(self.output_shape), dtype=np.float32) self.d_output = cuda.mem_alloc(self.h_output.nbytes) self.stream = cuda.Stream() except Exception as e: raise RuntimeError(f"Failed to initialize TensorRT engine: {str(e)}") @staticmethod def get_gpu_memory_bytes(int device_id): total_memory = None try: pynvml.nvmlInit() handle = pynvml.nvmlDeviceGetHandleByIndex(device_id) mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle) total_memory = mem_info.total except pynvml.NVMLError: total_memory = None finally: try: pynvml.nvmlShutdown() except pynvml.NVMLError: pass return 2 * 1024 * 1024 * 1024 if total_memory is None else total_memory # default 2 Gb @staticmethod def get_engine_filename(int device_id): try: device = cuda.Device(device_id) sm_count = device.multiprocessor_count cc_major, cc_minor = device.compute_capability() return f"azaion.cc_{cc_major}.{cc_minor}_sm_{sm_count}.engine" except Exception: return None @staticmethod def convert_from_onnx(bytes onnx_model): workspace_bytes = int(TensorRTEngine.get_gpu_memory_bytes(0) * 0.9) explicit_batch_flag = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) trt_logger = trt.Logger(trt.Logger.WARNING) with trt.Builder(trt_logger) as builder, \ builder.create_network(explicit_batch_flag) as network, \ trt.OnnxParser(network, trt_logger) as parser, \ builder.create_builder_config() as config: config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace_bytes) if not parser.parse(onnx_model): return None if builder.platform_has_fast_fp16: constants_inf.log('Converting to supported fp16') config.set_flag(trt.BuilderFlag.FP16) else: constants_inf.log('Converting to supported fp32. (fp16 is not supported)') plan = builder.build_serialized_network(network, config) if plan is None: constants_inf.logerror('Conversion failed.') return None constants_inf.log('conversion done!') return bytes(plan) cpdef tuple get_input_shape(self): return self.input_shape[2], self.input_shape[3] cpdef int get_batch_size(self): return self.batch_size cpdef run(self, input_data): try: cuda.memcpy_htod_async(self.d_input, input_data, self.stream) self.context.set_tensor_address(self.input_name, int(self.d_input)) # input buffer self.context.set_tensor_address(self.output_name, int(self.d_output)) # output buffer self.context.execute_async_v3(stream_handle=self.stream.handle) self.stream.synchronize() # Fix: Remove the stream parameter from memcpy_dtoh cuda.memcpy_dtoh(self.h_output, self.d_output) output = self.h_output.reshape(self.output_shape) return [output] except Exception as e: raise RuntimeError(f"Failed to run TensorRT inference: {str(e)}")