Files
ai-training/inference/tensorrt_engine.py
T
2025-04-06 18:45:06 +03:00

94 lines
3.9 KiB
Python

import re
import struct
import subprocess
from pathlib import Path
from typing import List, Tuple
import json
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
from inference.onnx_engine import InferenceEngine
import pycuda.autoinit # required for automatically initialize CUDA, do not remove.
class TensorRTEngine(InferenceEngine):
    """Inference engine that executes a serialized TensorRT engine via PyCUDA.

    The model blob consumed by ``__init__`` is laid out as:

    1. a 4-byte little-endian unsigned integer ``N``;
    2. ``N`` bytes of JSON metadata, which must contain a ``"names"`` key;
    3. the serialized TensorRT engine (the rest of the blob).

    Device buffers are sized assuming float32 tensors.
    """

    # Fallbacks used when the engine reports a dynamic (-1) dimension.
    DEFAULT_INPUT_SIZE = 1280       # input height and width
    DEFAULT_MAX_DETECTIONS = 300    # max detections number
    DEFAULT_DETECTION_SIZE = 6      # x1 y1 x2 y2 conf cls

    def __init__(self, model_bytes: bytes, batch_size: int = 4, **kwargs):
        """Parse the model blob, load the engine and allocate I/O buffers.

        Args:
            model_bytes: Metadata-prefixed engine blob (see class docstring).
            batch_size: Batch size to use when the engine's batch dimension
                is dynamic (-1); a fixed engine batch dimension overrides it.
            **kwargs: Accepted for interface compatibility; unused here.

        Raises:
            RuntimeError: If the metadata cannot be parsed, the engine cannot
                be deserialized, or any other initialization step fails.
        """
        self.batch_size = batch_size
        try:
            logger = trt.Logger(trt.Logger.WARNING)

            metadata_len = struct.unpack("<I", model_bytes[:4])[0]
            try:
                self.metadata = json.loads(model_bytes[4:4 + metadata_len])
            except json.JSONDecodeError as err:
                # Returning here would leave the instance without an engine,
                # context or buffers; fail loudly like every other init error.
                raise RuntimeError("Failed to parse metadata") from err
            self.class_names = self.metadata['names']
            print(f"Model metadata: {json.dumps(self.metadata, indent=2)}")

            engine_data = model_bytes[4 + metadata_len:]
            runtime = trt.Runtime(logger)
            self.engine = runtime.deserialize_cuda_engine(engine_data)
            if self.engine is None:
                raise RuntimeError("Failed to load TensorRT engine!")
            self.context = self.engine.create_execution_context()

            # input
            # NOTE(review): assumes tensor index 0 is the (only) input and
            # that it is laid out as (batch, channels, height, width).
            self.input_name = self.engine.get_tensor_name(0)
            engine_input_shape = self.engine.get_tensor_shape(self.input_name)
            if engine_input_shape[0] != -1:
                self.batch_size = engine_input_shape[0]
            self.input_shape = [
                self.batch_size,
                engine_input_shape[1],  # Channels (usually fixed at 3 for RGB)
                self._resolve_dim(engine_input_shape[2], self.DEFAULT_INPUT_SIZE),  # Height
                self._resolve_dim(engine_input_shape[3], self.DEFAULT_INPUT_SIZE),  # Width
            ]
            self.context.set_input_shape(self.input_name, self.input_shape)
            # Remembered so run() can reject inputs that would overflow d_input.
            self._input_nbytes = trt.volume(self.input_shape) * np.dtype(np.float32).itemsize
            self.d_input = cuda.mem_alloc(self._input_nbytes)

            # output
            # NOTE(review): assumes tensor index 1 is the (only) output with
            # shape (batch, detections, values-per-detection).
            self.output_name = self.engine.get_tensor_name(1)
            engine_output_shape = tuple(self.engine.get_tensor_shape(self.output_name))
            self.output_shape = [
                self.batch_size,
                self._resolve_dim(engine_output_shape[1], self.DEFAULT_MAX_DETECTIONS),
                self._resolve_dim(engine_output_shape[2], self.DEFAULT_DETECTION_SIZE),
            ]
            self.h_output = cuda.pagelocked_empty(tuple(self.output_shape), dtype=np.float32)
            self.d_output = cuda.mem_alloc(self.h_output.nbytes)

            self.stream = cuda.Stream()
        except Exception as e:
            raise RuntimeError(f"Failed to initialize TensorRT engine: {str(e)}") from e

    @staticmethod
    def _resolve_dim(engine_dim: int, fallback: int) -> int:
        """Return ``fallback`` when the engine dimension is dynamic (-1)."""
        return fallback if engine_dim == -1 else engine_dim

    def get_input_shape(self) -> Tuple[int, int]:
        """Return the resolved input ``(height, width)``."""
        return self.input_shape[2], self.input_shape[3]

    def get_batch_size(self) -> int:
        """Return the batch size the buffers were allocated for."""
        return self.batch_size

    def run(self, input_data: np.ndarray) -> List[np.ndarray]:
        """Run one inference pass and return the output tensor.

        Args:
            input_data: Host array to upload as the input tensor. It must not
                be larger than the device input buffer allocated in
                ``__init__``.

        Returns:
            A single-element list holding a float32 array of shape
            ``self.output_shape``. The array is a copy, so it stays valid
            after subsequent calls to ``run``.

        Raises:
            RuntimeError: If the input does not fit the device buffer or any
                CUDA/TensorRT call fails.
        """
        try:
            # The host-to-device copy needs a single contiguous buffer.
            host_input = np.ascontiguousarray(input_data)
            if host_input.nbytes > self._input_nbytes:
                raise ValueError(
                    f"Input of {host_input.nbytes} bytes exceeds the "
                    f"{self._input_nbytes}-byte device input buffer"
                )

            cuda.memcpy_htod_async(self.d_input, host_input, self.stream)
            self.context.set_tensor_address(self.input_name, int(self.d_input))  # input buffer
            self.context.set_tensor_address(self.output_name, int(self.d_output))  # output buffer
            self.context.execute_async_v3(stream_handle=self.stream.handle)
            self.stream.synchronize()

            # The stream is already synchronized, so a blocking copy is fine.
            cuda.memcpy_dtoh(self.h_output, self.d_output)
            # h_output is reused on every call; hand back a copy so earlier
            # results are not overwritten by later inferences.
            output = self.h_output.copy().reshape(self.output_shape)
            return [output]
        except Exception as e:
            raise RuntimeError(f"Failed to run TensorRT inference: {str(e)}") from e