from pathlib import Path
from typing import List, Tuple
import json

import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit  # noqa: F401 -- required to automatically initialize CUDA, do not remove.

from onnx_engine import InferenceEngine


class TensorRTEngine(InferenceEngine):
    """Inference engine backed by a serialized TensorRT engine file.

    The engine file is expected to begin with a 4-byte little-endian signed
    length prefix, followed by that many bytes of JSON metadata, followed by
    the raw serialized engine (this matches the Ultralytics YOLO
    ``format=engine`` export layout -- TODO confirm for other exporters).

    Assumes the engine has exactly one input tensor (index 0, NCHW float32)
    and one output tensor (index 1) -- verify against the exported model.
    """

    def __init__(self, model_path: str, batch_size: int = 4, **kwargs):
        """Load the engine, resolve dynamic shapes and allocate device buffers.

        Args:
            model_path: Path to the serialized engine file (metadata-prefixed).
            batch_size: Batch size to use when the engine's batch dim is dynamic.
            **kwargs: Ignored; accepted for interface compatibility.

        Raises:
            RuntimeError: If the engine cannot be read, deserialized, or set up.
        """
        self.model_path = model_path
        self.batch_size = batch_size
        try:
            logger = trt.Logger(trt.Logger.WARNING)
            with open(model_path, 'rb') as f:
                # Length-prefixed JSON metadata precedes the engine bytes.
                metadata_len = int.from_bytes(f.read(4), byteorder='little', signed=True)
                metadata_bytes = f.read(metadata_len)
                try:
                    self.metadata = json.loads(metadata_bytes)
                    print(f"Model metadata: {json.dumps(self.metadata, indent=2)}")
                except json.JSONDecodeError:
                    # Best effort: a missing/corrupt header must not block loading.
                    print(f"Failed to parse metadata: {metadata_bytes}")
                    self.metadata = {}
                engine_data = f.read()

            runtime = trt.Runtime(logger)
            self.engine = runtime.deserialize_cuda_engine(engine_data)
            if self.engine is None:
                raise RuntimeError(f"Failed to load TensorRT engine from {model_path}")
            self.context = self.engine.create_execution_context()

            # --- input ---
            # A -1 in any engine dim means "dynamic"; substitute concrete values.
            self.input_name = self.engine.get_tensor_name(0)
            engine_input_shape = self.engine.get_tensor_shape(self.input_name)
            self.input_shape = [
                batch_size if engine_input_shape[0] == -1 else engine_input_shape[0],
                engine_input_shape[1],  # Channels (usually fixed at 3 for RGB)
                1280 if engine_input_shape[2] == -1 else engine_input_shape[2],  # Height
                1280 if engine_input_shape[3] == -1 else engine_input_shape[3]   # Width
            ]
            self.context.set_input_shape(self.input_name, self.input_shape)
            input_size = trt.volume(self.input_shape) * np.dtype(np.float32).itemsize
            self.d_input = cuda.mem_alloc(input_size)

            # --- output ---
            self.output_name = self.engine.get_tensor_name(1)
            engine_output_shape = tuple(self.engine.get_tensor_shape(self.output_name))
            self.output_shape = [
                # BUGFIX: test the *engine's* output batch dim for -1; the old
                # code tested self.input_shape[0], which is already resolved
                # and can never be -1 at this point.
                self.input_shape[0] if engine_output_shape[0] == -1 else engine_output_shape[0],
                300 if engine_output_shape[1] == -1 else engine_output_shape[1],  # max detections number
                6 if engine_output_shape[2] == -1 else engine_output_shape[2]     # x1 y1 x2 y2 conf cls
            ]
            # Pagelocked host buffer enables fast async device-to-host copies.
            self.h_output = cuda.pagelocked_empty(tuple(self.output_shape), dtype=np.float32)
            self.d_output = cuda.mem_alloc(self.h_output.nbytes)

            self.stream = cuda.Stream()
        except Exception as e:
            # Chain the original traceback so setup failures stay debuggable.
            raise RuntimeError(f"Failed to initialize TensorRT engine: {str(e)}") from e

    def get_input_shape(self) -> Tuple[int, int]:
        """Return the (height, width) the engine expects for each image."""
        return self.input_shape[2], self.input_shape[3]

    def get_batch_size(self) -> int:
        """Return the batch size this engine instance was configured with."""
        return self.batch_size

    def run(self, input_data: np.ndarray) -> List[np.ndarray]:
        """Run one inference pass.

        Args:
            input_data: Preprocessed batch matching ``self.input_shape``
                (NCHW float32). Coerced to a contiguous float32 array if needed.

        Returns:
            Single-element list with the detections array of shape
            ``self.output_shape``.

            NOTE(review): the returned array is a view over the reused
            pagelocked host buffer -- a subsequent ``run()`` overwrites it.
            Copy it if you need to keep results across calls.

        Raises:
            RuntimeError: If any CUDA/TensorRT step fails.
        """
        try:
            # memcpy requires a contiguous buffer of the dtype the engine expects.
            input_data = np.ascontiguousarray(input_data, dtype=np.float32)
            cuda.memcpy_htod_async(self.d_input, input_data, self.stream)
            self.context.set_tensor_address(self.input_name, int(self.d_input))    # input buffer
            self.context.set_tensor_address(self.output_name, int(self.d_output))  # output buffer
            self.context.execute_async_v3(stream_handle=self.stream.handle)
            # Enqueue the device-to-host copy on the same stream, then wait once.
            cuda.memcpy_dtoh_async(self.h_output, self.d_output, self.stream)
            self.stream.synchronize()

            output = self.h_output.reshape(self.output_shape)
            return [output]
        except Exception as e:
            raise RuntimeError(f"Failed to run TensorRT inference: {str(e)}") from e