mirror of
https://github.com/azaion/annotations.git
synced 2026-04-22 22:36:31 +00:00
28069f63f9
This reverts commit cf01e5d952.
291 lines
11 KiB
Cython
291 lines
11 KiB
Cython
import json
|
|
import mimetypes
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
cimport constants
|
|
from remote_command cimport RemoteCommand
|
|
from annotation cimport Detection, Annotation
|
|
from ai_config cimport AIRecognitionConfig
|
|
from hardware_service cimport HardwareService
|
|
from security cimport Security
|
|
|
|
if HardwareService.has_nvidia_gpu():
|
|
from tensorrt_engine import TensorRTEngine
|
|
else:
|
|
from onnx_engine import OnnxEngine
|
|
|
|
|
|
cdef class Inference:
|
|
def __init__(self, api_client, on_annotation):
    """Store the collaborators and reset all inference state.

    Args:
        api_client: object used elsewhere in this class to load and upload
            model resources.
        on_annotation: callable invoked as ``on_annotation(cmd, annotation)``
            for every annotation produced.
    """
    # Collaborators supplied by the caller.
    self.on_annotation = on_annotation
    self.api_client = api_client

    # Engine state: nothing is loaded until init_ai() runs.
    self.engine = None
    self.is_building_engine = False
    self.model_input = None
    self.model_width = self.model_height = 0

    # Cooperative cancellation flag, polled by the video loop.
    self.stop_signal = False
|
|
|
|
cdef build_tensor_engine(self):
    """Build and upload the TensorRT engine if it is not available locally.

    Does nothing on machines without an NVIDIA GPU. Otherwise checks whether
    ``<engine_filename>.big`` exists in the models folder; if it does not,
    the ONNX model is loaded through the API client, converted with
    ``TensorRTEngine.convert_from_onnx`` and the result is uploaded.

    ``is_building_engine`` is True for the whole duration of the build and is
    always reset afterwards, even when the build fails, because ``init_ai``
    waits on that flag before loading the engine.
    """
    if not HardwareService.has_nvidia_gpu():
        return

    engine_filename = TensorRTEngine.get_engine_filename(0)
    key = Security.get_model_encryption_key()
    models_dir = constants.MODELS_FOLDER

    if os.path.exists(os.path.join(<str> models_dir, f'{engine_filename}.big')):
        print('tensor rt engine is here, no need to build')
        return

    # TODO: Check cdn on engine exists, if there is, download
    self.is_building_engine = True
    try:
        time.sleep(8)  # prevent simultaneously loading dll and models
        onnx_model = self.api_client.load_big_small_resource(constants.AI_ONNX_MODEL_FILE, models_dir, key)
        model_bytes = TensorRTEngine.convert_from_onnx(onnx_model)
        self.api_client.upload_big_small_resource(model_bytes, <str> engine_filename, models_dir, key)
        print('uploaded ')
    finally:
        # Without this reset a failed build would leave the flag set and
        # init_ai() would sleep-loop forever waiting for it to clear.
        self.is_building_engine = False
|
|
|
|
|
|
cdef init_ai(self):
    """Lazily create the inference engine and cache its input size.

    A no-op when an engine already exists. On NVIDIA hardware this waits
    until ``is_building_engine`` is cleared and then loads the TensorRT
    engine; otherwise the ONNX model is loaded into an ``OnnxEngine``.
    """
    if self.engine is not None:
        return

    use_tensorrt = HardwareService.has_nvidia_gpu()
    encryption_key = Security.get_model_encryption_key()
    folder = constants.MODELS_FOLDER

    if not use_tensorrt:
        onnx_bytes = self.api_client.load_big_small_resource(constants.AI_ONNX_MODEL_FILE, folder, encryption_key)
        self.engine = OnnxEngine(onnx_bytes)
    else:
        # Poll once per second until a concurrent engine build has finished.
        while self.is_building_engine:
            time.sleep(1)
        trt_filename = TensorRTEngine.get_engine_filename(0)
        trt_bytes = self.api_client.load_big_small_resource(trt_filename, folder, encryption_key)
        self.engine = TensorRTEngine(trt_bytes)

    # The engine reports its input shape as (height, width).
    input_shape = self.engine.get_input_shape()
    self.model_height, self.model_width = input_shape
|
|
|
|
cdef preprocess(self, frames):
    """Turn a list of frames into one stacked network input.

    Every frame goes through ``cv2.dnn.blobFromImage`` (scaled by 1/255,
    resized to the model width/height without cropping, R and B channels
    swapped, zero mean) and the per-frame blobs are stacked with
    ``np.vstack``.
    """
    target_size = (self.model_width, self.model_height)
    stacked = []
    for single_frame in frames:
        stacked.append(
            cv2.dnn.blobFromImage(
                single_frame,
                scalefactor=1.0 / 255.0,
                size=target_size,
                mean=(0, 0, 0),
                swapRB=True,
                crop=False,
            )
        )
    return np.vstack(stacked)
|
|
|
|
cdef postprocess(self, output, ai_config):
    """Convert raw engine output into filtered detections, one list per image.

    ``output[0]`` is iterated per batch entry; each entry is a sequence of
    rows read as ``[x1, y1, x2, y2, confidence, class_id]``. Rows are read
    until the first one whose confidence equals 0. Corner coordinates are
    divided by the model width/height and converted to centre/size form.
    Rows whose rounded confidence is below ``ai_config.probability_threshold``
    are dropped, then overlapping detections are removed.

    Returns:
        A list with one list of ``Detection`` per batch entry.

    Raises:
        RuntimeError: if anything fails while decoding the output; the
            original exception is attached as the cause.
    """
    cdef list[Detection] detections
    cdef float x1, y1, x2, y2, conf, cx, cy, w, h
    cdef int class_id
    cdef list[list[Detection]] results = []
    try:
        for image_rows in output[0]:
            # Fresh list per image so results never share a scratch buffer.
            detections = []
            for det in image_rows:
                if det[4] == 0:  # if confidence is 0 then valid points are over.
                    break
                conf = round(det[4], 2)
                if conf >= ai_config.probability_threshold:
                    x1 = det[0] / self.model_width
                    y1 = det[1] / self.model_height
                    x2 = det[2] / self.model_width
                    y2 = det[3] / self.model_height
                    class_id = int(det[5])

                    # Corner form -> centre + size.
                    cx = (x1 + x2) / 2
                    cy = (y1 + y2) / 2
                    w = x2 - x1
                    h = y2 - y1
                    detections.append(Detection(cx, cy, w, h, class_id, conf))
            results.append(self.remove_overlapping_detections(detections))
        return results
    except Exception as e:
        raise RuntimeError(f"Failed to postprocess: {str(e)}") from e
|
|
|
|
cdef remove_overlapping_detections(self, list[Detection] detections):
    """Keep a single detection out of every group of overlapping ones.

    Detections are visited in order. Each not-yet-suppressed detection acts
    as an anchor; among the anchor and all later, not-yet-suppressed
    detections that overlap it, only the best one is kept. "Best" means
    higher confidence, or, at equal confidence, the lower class id (when
    both are equal the later detection wins).

    Each candidate is compared with the current best of the group rather
    than with the anchor, so a weaker detection can never displace a
    stronger one that already won, and detections suppressed by an earlier
    group are not reconsidered.

    Returns:
        A new list with the surviving detections; the input is not modified.
    """
    cdef Detection det1, det2, best
    filtered_output = []
    suppressed = set()  # indexes already kept or discarded

    for det1_index in range(len(detections)):
        if det1_index in suppressed:
            continue
        det1 = detections[det1_index]
        best = det1
        best_index = det1_index
        for det2_index in range(det1_index + 1, len(detections)):
            if det2_index in suppressed:
                continue
            det2 = detections[det2_index]
            if not det1.overlaps(det2):
                continue
            if best.confidence > det2.confidence or (
                    best.confidence == det2.confidence and best.cls < det2.cls):
                # current best has higher confidence or lower class_id
                suppressed.add(det2_index)
            else:
                suppressed.add(best_index)
                best_index = det2_index
                best = det2
        filtered_output.append(best)
        suppressed.add(best_index)
    return filtered_output
|
|
|
|
cdef bint is_video(self, str filepath):
    """Return True when the MIME type guessed from the path is a video type."""
    guessed = mimetypes.guess_type(<str>filepath)[0]
    if not guessed:
        # Unknown extension: no MIME type could be guessed.
        return False
    return guessed.startswith("video")
|
|
|
|
cdef split_list_extend(self, lst, chunk_size):
    """Split ``lst`` into chunks of exactly ``chunk_size`` elements.

    If the last chunk is shorter than ``chunk_size`` it is padded by
    repeating its last element. The input list is not modified.

    Args:
        lst: list to split.
        chunk_size: required size of every chunk; must be at least 1.

    Returns:
        A list of lists, each of length ``chunk_size``; an empty list when
        ``lst`` is empty.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f'chunk_size must be >= 1, got {chunk_size}')
    if not lst:
        # Nothing to split; there is no last chunk to pad.
        return []

    chunks = [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

    # Slicing produced fresh lists, so padding the tail cannot touch ``lst``.
    last_chunk = chunks[-1]
    missing = chunk_size - len(last_chunk)
    if missing > 0:
        last_chunk.extend([last_chunk[-1]] * missing)
    return chunks
|
|
|
|
cdef run_inference(self, RemoteCommand cmd):
    """Run detection on every media path listed in the command's config.

    The config is decoded from ``cmd.data``. Paths are split into videos and
    images by ``is_video``; images are processed first in engine-sized
    batches, then each video is processed in turn.

    Raises:
        Exception: if no config could be decoded from the command.
    """
    cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_msgpack(cmd.data)
    cdef list[str] images = []
    cdef list[str] videos = []

    if ai_config is None:
        raise Exception('ai recognition config is empty')

    self.stop_signal = False
    self.init_ai()

    print(ai_config.paths)
    for media_path in ai_config.paths:
        if self.is_video(media_path):
            videos.append(media_path)
        else:
            images.append(media_path)

    # images first, it's faster
    if images:
        batch_size = self.engine.get_batch_size()
        for image_batch in self.split_list_extend(images, batch_size):
            print(f'run inference on {" ".join(image_batch)}...')
            self._process_images(cmd, ai_config, image_batch)

    for video_path in videos:
        print(f'run inference on {video_path}...')
        self._process_video(cmd, ai_config, video_path)
|
|
|
|
|
|
cdef _process_video(self, RemoteCommand cmd, AIRecognitionConfig ai_config, str video_name):
    """Run detection on a video and report the annotations that pass validation.

    Every ``ai_config.frame_period_recognition``-th frame is collected into
    a batch of ``engine.get_batch_size()`` frames. Each full batch is
    preprocessed, run through the engine and postprocessed; an annotation is
    reported through ``on_annotation`` only when ``is_valid_annotation``
    accepts it.

    When the stream ends with a partially filled batch, the batch is padded
    by repeating its last frame so the engine still receives a full batch,
    and only the results of the real frames are reported. Previously those
    trailing frames were dropped, so short videos could yield nothing.

    The loop stops early, without flushing, when ``stop_signal`` is set.
    The capture is always released, including when processing raises.
    """
    cdef int frame_count = 0
    cdef list batch_frames = []
    cdef list[int] batch_timestamps = []
    self._previous_annotation = None

    v_input = cv2.VideoCapture(<str>video_name)
    try:
        while v_input.isOpened() and not self.stop_signal:
            ret, frame = v_input.read()
            end_of_stream = not ret or frame is None

            if not end_of_stream:
                frame_count += 1
                if frame_count % ai_config.frame_period_recognition == 0:
                    batch_frames.append(frame)
                    batch_timestamps.append(int(v_input.get(cv2.CAP_PROP_POS_MSEC)))

            # Number of real (non-padding) frames currently in the batch.
            valid_count = len(batch_frames)
            batch_size = self.engine.get_batch_size()

            if end_of_stream and 0 < valid_count < batch_size:
                # Fill the tail with copies of the last frame; their results
                # are ignored below.
                padding = batch_size - valid_count
                batch_frames.extend([batch_frames[valid_count - 1]] * padding)
                batch_timestamps.extend([batch_timestamps[valid_count - 1]] * padding)

            if valid_count > 0 and len(batch_frames) == batch_size:
                input_blob = self.preprocess(batch_frames)
                outputs = self.engine.run(input_blob)
                list_detections = self.postprocess(outputs, ai_config)

                for i in range(min(valid_count, len(list_detections))):
                    detections = list_detections[i]
                    annotation = Annotation(video_name, batch_timestamps[i], detections)
                    if self.is_valid_annotation(annotation, ai_config):
                        _, image = cv2.imencode('.jpg', batch_frames[i])
                        annotation.image = image.tobytes()
                        self._previous_annotation = annotation

                        print(annotation)
                        self.on_annotation(cmd, annotation)

                batch_frames.clear()
                batch_timestamps.clear()

            if end_of_stream:
                break
    finally:
        v_input.release()
|
|
|
|
|
|
cdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths):
    """Run detection on one batch of image files and report one annotation per image.

    ``image_paths`` is expected to already have the engine's batch size; the
    caller pads the last batch by repeating its final path. Two problems of
    the straightforward implementation are handled here:

    * An image that ``cv2.imread`` cannot read (it returns ``None``) no
      longer breaks the whole batch: its slot is filled with another
      readable frame so the batch keeps its size, and no annotation is
      reported for it.
    * A path that occurs more than once in the batch (padding) is reported
      only once.

    Every reported annotation has timestamp 0 and carries the image
    re-encoded as JPEG.
    """
    cdef list frames = []
    cdef list timestamps = []
    self._previous_annotation = None

    placeholder = None  # first readable frame, used to fill unreadable slots
    for path in image_paths:
        frame = cv2.imread(path)
        if frame is None:
            print(f'cannot read image {path}, skipping')
        elif placeholder is None:
            placeholder = frame
        frames.append(frame)
        timestamps.append(0)

    if placeholder is None:
        # Not a single readable image: nothing to run.
        return

    model_frames = [placeholder if f is None else f for f in frames]
    input_blob = self.preprocess(model_frames)
    outputs = self.engine.run(input_blob)
    list_detections = self.postprocess(outputs, ai_config)

    reported = set()
    for i in range(len(list_detections)):
        path = image_paths[i]
        if frames[i] is None or path in reported:
            continue
        reported.add(path)

        annotation = Annotation(path, timestamps[i], list_detections[i])
        _, encoded = cv2.imencode('.jpg', frames[i])
        annotation.image = encoded.tobytes()
        self.on_annotation(cmd, annotation)
|
|
|
|
|
|
cdef stop(self):
    """Request cancellation; the video loop checks this flag before each frame read."""
    self.stop_signal = True
|
|
|
|
cdef bint is_valid_annotation(self, Annotation annotation, AIRecognitionConfig ai_config):
    """Decide whether ``annotation`` differs enough from the last accepted one.

    Returns False when the annotation has no detections. Otherwise returns
    True when any of the following holds:

    * there is no previously accepted annotation;
    * at least ``ai_config.frame_recognition_seconds`` seconds (compared in
      milliseconds) passed since the previous annotation's time;
    * it has more detections than the previous annotation;
    * some detection is farther from its nearest previous detection than
      ``ai_config.tracking_distance_confidence``;
    * some detection's confidence exceeds that of its nearest previous
      detection by at least ``ai_config.tracking_probability_increase``.
    """
    cdef:
        Annotation previous = self._previous_annotation
        Detection candidate, earlier, nearest
        double delta_x, delta_y, dist_sq, nearest_dist_sq

    # Nothing detected: never worth reporting.
    if not annotation.detections:
        return False

    # The first annotation with detections is always accepted.
    if previous is None:
        return True

    # Enough time elapsed, or more objects than last time.
    if (annotation.time >= previous.time + <long>(ai_config.frame_recognition_seconds * 1000)
            or len(annotation.detections) > len(previous.detections)):
        return True

    for candidate in annotation.detections:
        # Linear search for the nearest detection of the previous annotation.
        nearest = None
        nearest_dist_sq = 1e18  # larger than any distance the search is expected to see
        for earlier in previous.detections:
            delta_x = candidate.x - earlier.x
            delta_y = candidate.y - earlier.y
            dist_sq = delta_x * delta_x + delta_y * delta_y
            if dist_sq < nearest_dist_sq:
                nearest_dist_sq = dist_sq
                nearest = earlier

        # NOTE(review): a *squared* distance is compared with the threshold;
        # confirm tracking_distance_confidence is expressed in squared units.
        if nearest_dist_sq > ai_config.tracking_distance_confidence:
            return True

        # Same object as before, but noticeably more confident now.
        if candidate.confidence >= nearest.confidence + ai_config.tracking_probability_increase:
            return True

    return False
|