from engines.inference_engine cimport InferenceEngine
import tensorrt as trt  # pyright: ignore[reportMissingImports]
import pycuda.driver as cuda  # pyright: ignore[reportMissingImports]
import pycuda.autoinit  # pyright: ignore[reportMissingImports]
import pynvml
import numpy as np
import os
cimport constants_inf

# Fraction of the reported GPU memory used when estimating the max batch size.
GPU_MEMORY_FRACTION = 0.8


class _CacheCalibrator(trt.IInt8EntropyCalibrator2):
    """INT8 calibrator that only replays an existing calibration cache.

    It never supplies calibration batches (``get_batch`` returns ``None``)
    and never writes a cache back; the cache file at ``path`` is the only
    source of calibration data.
    """

    def __init__(self, path):
        super().__init__()
        self._path = path

    def get_batch_size(self):
        return 1

    def get_batch(self, names):
        # No calibration data is provided by this calibrator.
        return None

    def read_calibration_cache(self):
        with open(self._path, 'rb') as f:
            return f.read()

    def write_calibration_cache(self, cache):
        # Intentionally a no-op: the cache file is treated as read-only.
        pass


cdef class TensorRTEngine(InferenceEngine):
    """Inference engine backed by a serialized TensorRT plan."""

    def __init__(self, model_bytes: bytes, max_batch_size: int = 8, **kwargs):
        """Deserialize the plan and allocate the device/host I/O buffers.

        Args:
            model_bytes: Serialized TensorRT engine.
            max_batch_size: Forwarded to ``InferenceEngine.__init__``; the
                effective value is then replaced by the engine's static batch
                dimension, or by a GPU-memory estimate when that dimension is
                dynamic (-1).
            **kwargs: Accepted and ignored.

        Raises:
            RuntimeError: If deserialization or any buffer setup step fails.
        """
        InferenceEngine.__init__(self, model_bytes, max_batch_size, engine_name="tensorrt")
        try:
            logger = trt.Logger(trt.Logger.WARNING)
            runtime = trt.Runtime(logger)
            engine = runtime.deserialize_cuda_engine(model_bytes)
            if engine is None:
                raise RuntimeError("Failed to load TensorRT engine from bytes")

            self.context = engine.create_execution_context()

            # Tensor 0 is taken as the input, tensor 1 as the output.
            self.input_name = engine.get_tensor_name(0)
            engine_input_shape = engine.get_tensor_shape(self.input_name)

            # Dynamic (-1) spatial dimensions fall back to 1280.
            C = engine_input_shape[1]
            H = 1280 if engine_input_shape[2] == -1 else engine_input_shape[2]
            W = 1280 if engine_input_shape[3] == -1 else engine_input_shape[3]

            if engine_input_shape[0] == -1:
                gpu_mem = TensorRTEngine.get_gpu_memory_bytes(0)
                self.max_batch_size = TensorRTEngine.calculate_max_batch_size(gpu_mem, H, W)
            else:
                self.max_batch_size = engine_input_shape[0]

            self.input_shape = [self.max_batch_size, C, H, W]
            self.context.set_input_shape(self.input_name, self.input_shape)

            # Device input buffer is sized for the largest batch, as float32.
            input_size = trt.volume(self.input_shape) * np.dtype(np.float32).itemsize
            self.d_input = cuda.mem_alloc(input_size)

            self.output_name = engine.get_tensor_name(1)
            engine_output_shape = tuple(engine.get_tensor_shape(self.output_name))
            # Dynamic (-1) output dimensions fall back to 300 x 6.
            self.output_shape = [
                self.max_batch_size,
                300 if engine_output_shape[1] == -1 else engine_output_shape[1],
                6 if engine_output_shape[2] == -1 else engine_output_shape[2],
            ]
            self.h_output = cuda.pagelocked_empty(tuple(self.output_shape), dtype=np.float32)
            self.d_output = cuda.mem_alloc(self.h_output.nbytes)

            self.stream = cuda.Stream()
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(f"Failed to initialize TensorRT engine: {str(e)}") from e

    @staticmethod
    def calculate_max_batch_size(gpu_memory_bytes, int input_h, int input_w):
        """Estimate a batch size from GPU memory and input resolution.

        The result is at least 1 and at most 32.
        """
        # NOTE(review): presumably 3 channels x 4-byte float32 per pixel; the
        # channel count is hard-coded rather than taken from the engine.
        frame_input_bytes = 3 * input_h * input_w * 4
        # Heuristic: per-frame footprint is taken as 12x the raw input size.
        estimated_per_frame = frame_input_bytes * 12
        available = gpu_memory_bytes * GPU_MEMORY_FRACTION
        calculated = max(1, int(available / estimated_per_frame))
        return min(calculated, 32)

    @staticmethod
    def get_gpu_memory_bytes(int device_id):
        """Return total memory of GPU ``device_id`` in bytes via NVML.

        Falls back to 2 GiB when NVML cannot provide the value.
        """
        total_memory = None
        try:
            pynvml.nvmlInit()
            handle = pynvml.nvmlDeviceGetHandleByIndex(device_id)
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            total_memory = mem_info.total
        except pynvml.NVMLError:
            total_memory = None
        finally:
            # Best-effort shutdown; it may fail if nvmlInit itself failed.
            try:
                pynvml.nvmlShutdown()
            except pynvml.NVMLError:
                pass
        return 2 * 1024 * 1024 * 1024 if total_memory is None else total_memory

    @staticmethod
    def get_engine_filename(str precision="fp16"):
        """Build an engine file name from the GPU's compute capability and SM count.

        Returns:
            ``azaion.cc_<major>.<minor>_sm_<count>[.int8].engine``, or ``None``
            if the device cannot be queried.
        """
        try:
            from engines import tensor_gpu_index
            device = cuda.Device(max(tensor_gpu_index, 0))
            sm_count = device.multiprocessor_count
            cc_major, cc_minor = device.compute_capability()
            base = f"azaion.cc_{cc_major}.{cc_minor}_sm_{sm_count}"
            if precision == "int8":
                return f"{base}.int8.engine"
            return f"{base}.engine"
        except Exception:
            return None

    @staticmethod
    def convert_from_source(bytes onnx_model, str calib_cache_path=None, bint force_static_input=False):
        """Build a serialized TensorRT plan from an ONNX model.

        Args:
            onnx_model: Serialized ONNX model.
            calib_cache_path: Optional INT8 calibration cache; INT8 is enabled
                only when this path points to an existing file.
            force_static_input: When true, the model is passed through
                ``prepare_for_tensorrt`` (best effort) and the network input
                is pinned to a static ``(1, C, H, W)`` shape.

        Returns:
            The plan as ``bytes``, or ``None`` when parsing or building fails.
        """
        gpu_mem = TensorRTEngine.get_gpu_memory_bytes(0)
        workspace_bytes = int(gpu_mem * 0.9)

        explicit_batch_flag = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
        trt_logger = trt.Logger(trt.Logger.WARNING)

        if force_static_input:
            # Best effort: on failure the unmodified model is still converted.
            try:
                from engines.onnx_tensorrt_compat import prepare_for_tensorrt
                onnx_model = prepare_for_tensorrt(onnx_model)
                constants_inf.log('Prepared ONNX model for TensorRT static Jetson build')
            except Exception as e:
                constants_inf.logerror(f'ONNX TensorRT compatibility preparation failed: {str(e)}')

        with trt.Builder(trt_logger) as builder, \
                builder.create_network(explicit_batch_flag) as network, \
                trt.OnnxParser(network, trt_logger) as parser, \
                builder.create_builder_config() as config:

            config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace_bytes)

            if not parser.parse(onnx_model):
                for i in range(parser.num_errors):
                    constants_inf.logerror(f'TensorRT ONNX parser error: {parser.get_error(i)}')
                return None

            input_tensor = network.get_input(0)
            shape = input_tensor.shape
            C = shape[1]
            # NOTE(review): static H/W smaller than 1280 are raised to 1280
            # here; confirm this is intended for the optimization-profile path.
            H = max(shape[2], 1280) if shape[2] != -1 else 1280
            W = max(shape[3], 1280) if shape[3] != -1 else 1280

            if force_static_input:
                input_tensor.shape = (1, C, H, W)
            elif shape[0] == -1 or shape[2] == -1 or shape[3] == -1:
                # Dynamic input: one profile covering batch 1..max_batch, with
                # the optimum set to the maximum.
                max_batch = TensorRTEngine.calculate_max_batch_size(gpu_mem, H, W)
                profile = builder.create_optimization_profile()
                profile.set_shape(
                    input_tensor.name,
                    (1, C, H, W),
                    (max_batch, C, H, W),
                    (max_batch, C, H, W),
                )
                config.add_optimization_profile(profile)

            use_int8 = calib_cache_path is not None and os.path.isfile(calib_cache_path)
            if use_int8:
                constants_inf.log('Converting to INT8 with calibration cache')
                calibrator = _CacheCalibrator(calib_cache_path)
                config.set_flag(trt.BuilderFlag.INT8)
                if builder.platform_has_fast_fp16:
                    config.set_flag(trt.BuilderFlag.FP16)
                config.int8_calibrator = calibrator
            elif builder.platform_has_fast_fp16:
                constants_inf.log('Converting to supported fp16')
                config.set_flag(trt.BuilderFlag.FP16)
            else:
                constants_inf.log('Converting to supported fp32. (fp16 is not supported)')

            plan = builder.build_serialized_network(network, config)
            if plan is None:
                constants_inf.logerror('Conversion failed.')
                return None

            constants_inf.log('conversion done!')
            return bytes(plan)

    cdef tuple get_input_shape(self):
        """Return the configured input resolution as ``(H, W)``."""
        return (self.input_shape[2], self.input_shape[3])

    cdef run(self, input_data):
        """Run inference on one batch.

        Args:
            input_data: Array whose first dimension is the batch size; it is
                converted to a C-contiguous float32 array if it is not one
                already. The batch must not exceed the allocated maximum.

        Returns:
            A one-element list holding an array of shape
            ``(batch, output_shape[1], output_shape[2])``. The array is a view
            of the engine's reusable host buffer, so the next ``run`` call
            overwrites it.

        Raises:
            RuntimeError: On any failure, including an oversized batch.
        """
        try:
            # The device buffer is sized for float32, and the raw copy needs a
            # contiguous buffer; this is a no-op for already-conforming input.
            host_input = np.ascontiguousarray(input_data, dtype=np.float32)
            actual_batch = host_input.shape[0]

            # d_input was allocated for input_shape[0] frames; a larger batch
            # would write past the end of the device buffer.
            if actual_batch > self.input_shape[0]:
                raise ValueError(
                    f"Batch size {actual_batch} exceeds maximum {self.input_shape[0]}"
                )

            # Always bind the shape: the context keeps the shape of the
            # previous call, so skipping this when the batch equals the
            # maximum would reuse a stale, smaller shape.
            actual_shape = [actual_batch, self.input_shape[1], self.input_shape[2], self.input_shape[3]]
            if not self.context.set_input_shape(self.input_name, actual_shape):
                raise RuntimeError(f"TensorRT rejected input shape {actual_shape}")

            cuda.memcpy_htod_async(self.d_input, host_input, self.stream)
            self.context.set_tensor_address(self.input_name, int(self.d_input))
            self.context.set_tensor_address(self.output_name, int(self.d_output))
            if not self.context.execute_async_v3(stream_handle=self.stream.handle):
                raise RuntimeError("TensorRT failed to enqueue inference")
            self.stream.synchronize()

            cuda.memcpy_dtoh(self.h_output, self.d_output)
            output_shape = [actual_batch, self.output_shape[1], self.output_shape[2]]
            output = self.h_output[:actual_batch].reshape(output_shape)
            return [output]
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(f"Failed to run TensorRT inference: {str(e)}") from e