mirror of
https://github.com/azaion/ai-training.git
synced 2026-04-22 22:26:36 +00:00
148 lines
5.9 KiB
Python
148 lines
5.9 KiB
Python
import re
|
|
import struct
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import List, Tuple
|
|
import json
|
|
import numpy as np
|
|
import tensorrt as trt
|
|
import pycuda.driver as cuda
|
|
from inference.onnx_engine import InferenceEngine
|
|
# required for automatically initialize CUDA, do not remove.
|
|
import pycuda.autoinit
|
|
import pynvml
|
|
|
|
|
|
class TensorRTEngine(InferenceEngine):
|
|
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
|
|
|
|
def __init__(self, model_bytes: bytes, **kwargs):
|
|
try:
|
|
# metadata_len = struct.unpack("<I", model_bytes[:4])[0]
|
|
# try:
|
|
# self.metadata = json.loads(model_bytes[4:4 + metadata_len])
|
|
# self.class_names = self.metadata['names']
|
|
# print(f"Model metadata: {json.dumps(self.metadata, indent=2)}")
|
|
# except json.JSONDecodeError as err:
|
|
# print(f"Failed to parse metadata")
|
|
# return
|
|
# engine_data = model_bytes[4 + metadata_len:]
|
|
|
|
runtime = trt.Runtime(self.TRT_LOGGER)
|
|
self.engine = runtime.deserialize_cuda_engine(model_bytes)
|
|
|
|
if self.engine is None:
|
|
raise RuntimeError(f"Failed to load TensorRT engine!")
|
|
|
|
self.context = self.engine.create_execution_context()
|
|
|
|
# input
|
|
self.input_name = self.engine.get_tensor_name(0)
|
|
engine_input_shape = self.engine.get_tensor_shape(self.input_name)
|
|
if engine_input_shape[0] != -1:
|
|
self.batch_size = engine_input_shape[0]
|
|
self.input_shape = [
|
|
self.batch_size,
|
|
engine_input_shape[1], # Channels (usually fixed at 3 for RGB)
|
|
1280 if engine_input_shape[2] == -1 else engine_input_shape[2], # Height
|
|
1280 if engine_input_shape[3] == -1 else engine_input_shape[3] # Width
|
|
]
|
|
self.context.set_input_shape(self.input_name, self.input_shape)
|
|
input_size = trt.volume(self.input_shape) * np.dtype(np.float32).itemsize
|
|
self.d_input = cuda.mem_alloc(input_size)
|
|
|
|
# output
|
|
self.output_name = self.engine.get_tensor_name(1)
|
|
engine_output_shape = tuple(self.engine.get_tensor_shape(self.output_name))
|
|
self.output_shape = [
|
|
4 if self.input_shape[0] == -1 else self.input_shape[0], # by default, batch size is 4
|
|
300 if engine_output_shape[1] == -1 else engine_output_shape[1], # max detections number
|
|
6 if engine_output_shape[2] == -1 else engine_output_shape[2] # x1 y1 x2 y2 conf cls
|
|
]
|
|
self.h_output = cuda.pagelocked_empty(tuple(self.output_shape), dtype=np.float32)
|
|
self.d_output = cuda.mem_alloc(self.h_output.nbytes)
|
|
|
|
self.stream = cuda.Stream()
|
|
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to initialize TensorRT engine: {str(e)}")
|
|
|
|
def get_input_shape(self) -> Tuple[int, int]:
|
|
return self.input_shape[2], self.input_shape[3]
|
|
|
|
def get_batch_size(self) -> int:
|
|
return self.batch_size
|
|
|
|
@staticmethod
|
|
def get_gpu_memory_bytes(device_id=0) -> int:
|
|
total_memory = None
|
|
try:
|
|
pynvml.nvmlInit()
|
|
handle = pynvml.nvmlDeviceGetHandleByIndex(device_id)
|
|
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
|
|
total_memory = mem_info.total
|
|
except pynvml.NVMLError:
|
|
total_memory = None
|
|
finally:
|
|
try:
|
|
pynvml.nvmlShutdown()
|
|
except pynvml.NVMLError:
|
|
pass
|
|
return 2 * 1024 * 1024 * 1024 if total_memory is None else total_memory # default 2 Gb
|
|
|
|
@staticmethod
|
|
def get_engine_filename(device_id=0) -> str | None:
|
|
try:
|
|
device = cuda.Device(device_id)
|
|
sm_count = device.multiprocessor_count
|
|
cc_major, cc_minor = device.compute_capability()
|
|
return f"azaion.cc_{cc_major}.{cc_minor}_sm_{sm_count}.engine"
|
|
except Exception:
|
|
return None
|
|
|
|
@staticmethod
|
|
def convert_from_onnx(onnx_model: bytes) -> bytes | None:
|
|
workspace_bytes = int(TensorRTEngine.get_gpu_memory_bytes() * 0.9)
|
|
|
|
explicit_batch_flag = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
|
|
|
|
with trt.Builder(TensorRTEngine.TRT_LOGGER) as builder, \
|
|
builder.create_network(explicit_batch_flag) as network, \
|
|
trt.OnnxParser(network, TensorRTEngine.TRT_LOGGER) as parser, \
|
|
builder.create_builder_config() as config:
|
|
|
|
config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace_bytes)
|
|
|
|
if not parser.parse(onnx_model):
|
|
return None
|
|
|
|
if builder.platform_has_fast_fp16:
|
|
print('Converting to supported fp16')
|
|
config.set_flag(trt.BuilderFlag.FP16)
|
|
else:
|
|
print('Converting to supported fp32. (fp16 is not supported)')
|
|
plan = builder.build_serialized_network(network, config)
|
|
|
|
if plan is None:
|
|
print('Conversion failed.')
|
|
return None
|
|
|
|
return bytes(plan)
|
|
|
|
def run(self, input_data: np.ndarray) -> List[np.ndarray]:
|
|
try:
|
|
cuda.memcpy_htod_async(self.d_input, input_data, self.stream)
|
|
self.context.set_tensor_address(self.input_name, int(self.d_input)) # input buffer
|
|
self.context.set_tensor_address(self.output_name, int(self.d_output)) # output buffer
|
|
|
|
self.context.execute_async_v3(stream_handle=self.stream.handle)
|
|
self.stream.synchronize()
|
|
|
|
# Fix: Remove the stream parameter from memcpy_dtoh
|
|
cuda.memcpy_dtoh(self.h_output, self.d_output)
|
|
|
|
output = self.h_output.reshape(self.output_shape)
|
|
return [output]
|
|
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to run TensorRT inference: {str(e)}") |