# Source: annotations/Azaion.Inference/inference.pyx
# Snapshot: 2025-03-02 21:32:31 +02:00 (251 lines, 9.8 KiB, Cython)

import json
import mimetypes
import cv2
import numpy as np
import onnxruntime as onnx
cimport constants
from remote_command cimport RemoteCommand
from annotation cimport Detection, Annotation
from ai_config cimport AIRecognitionConfig
cdef class Inference:
    """Runs an ONNX detection model over images and videos and reports annotations.

    The model is fetched lazily through ``api_client.load_ai_model()``; every
    accepted annotation is handed to the ``on_annotation(cmd, annotation)``
    callback supplied at construction time.
    """

    def __init__(self, api_client, on_annotation):
        """Store the collaborators; the ONNX session is created on first use.

        Args:
            api_client: object exposing ``load_ai_model()`` (returns the model bytes).
            on_annotation: callable invoked as ``on_annotation(cmd, annotation)``.
        """
        self.api_client = api_client
        self.on_annotation = on_annotation
        self.stop_signal = False
        self.session = None
        self.model_input = None
        self.model_width = 0
        self.model_height = 0
        self.class_names = None
        # Reset again at the start of every image batch / video.
        self._previous_annotation = None

    def init_ai(self):
        """Load the model, create the ONNX Runtime session and cache its input geometry."""
        # Function-scope import keeps this block self-contained.
        import ast

        model_bytes = self.api_client.load_ai_model()
        self.session = onnx.InferenceSession(
            model_bytes, providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
        )
        model_inputs = self.session.get_inputs()
        self.model_input = model_inputs[0].name
        input_shape = model_inputs[0].shape
        # Assumes the usual NCHW image layout (batch, channels, height, width):
        # axis 2 is the height and axis 3 the width.  The two were previously
        # swapped, which only went unnoticed for square models.
        self.model_height = input_shape[2]
        self.model_width = input_shape[3]
        print(f'AI detection model input: {self.model_input} ({self.model_width}, {self.model_height})')
        model_meta = self.session.get_modelmeta()
        print("Metadata:", model_meta.custom_metadata_map)
        # SECURITY: the metadata comes from a downloaded model, so it must not be
        # passed to eval().  literal_eval accepts only Python literals and raises
        # ValueError/SyntaxError on anything else instead of executing it.
        self.class_names = ast.literal_eval(model_meta.custom_metadata_map["names"])

    cdef preprocess(self, frames):
        """Convert BGR frames into one stacked, normalised RGB blob.

        Each frame is scaled by 1/255, resized to the model input size and
        converted BGR->RGB by ``cv2.dnn.blobFromImage``; the per-frame blobs
        are stacked along the first axis.
        """
        blobs = [cv2.dnn.blobFromImage(frame,
                                       scalefactor=1.0 / 255.0,
                                       size=(self.model_width, self.model_height),
                                       mean=(0, 0, 0),
                                       swapRB=True,
                                       crop=False)
                 for frame in frames]
        return np.vstack(blobs)

    cdef postprocess(self, output, ai_config):
        """Turn raw model output into one filtered list of detections per frame.

        Every row of ``output[0][frame]`` is read as
        ``(x1, y1, x2, y2, confidence, class_id)`` in model-input pixels.
        Rows are consumed until the first one whose confidence is exactly 0.
        Boxes are normalised by the model size, converted to centre/size form,
        thresholded by ``ai_config.probability_threshold`` and de-duplicated
        with ``remove_overlapping_detections``.

        Returns:
            list with one list of ``Detection`` per frame in the batch.
        """
        cdef list[Detection] detections
        cdef int ann_index
        cdef float x1, y1, x2, y2, conf, cx, cy, w, h
        cdef int class_id
        cdef list[list[Detection]] results = []

        for ann_index in range(len(output[0])):
            detections = []
            for det in output[0][ann_index]:
                if det[4] == 0:  # confidence 0 marks the end of the valid rows
                    break
                conf = round(det[4], 2)
                if conf < ai_config.probability_threshold:
                    continue
                x1 = det[0] / self.model_width
                y1 = det[1] / self.model_height
                x2 = det[2] / self.model_width
                y2 = det[3] / self.model_height
                class_id = int(det[5])
                cx = (x1 + x2) / 2
                cy = (y1 + y2) / 2
                w = x2 - x1
                h = y2 - y1
                detections.append(Detection(cx, cy, w, h, class_id, conf))
            results.append(self.remove_overlapping_detections(detections))
        return results

    cdef remove_overlapping_detections(self, list[Detection] detections):
        """Greedy overlap suppression that keeps the strongest box of each cluster.

        Detections are visited from highest to lowest confidence (ties: lower
        class id first, then earlier position).  A detection is kept only when
        it does not overlap any detection that has already been kept.  The
        survivors are returned in their original relative order.

        The previous implementation kept comparing against the first box of a
        cluster even after a stronger one had taken over, so it could discard
        the best box and could also emit the same detection twice.
        """
        cdef Detection det, kept_det
        cdef Py_ssize_t idx, kept_idx
        cdef bint is_suppressed
        cdef list ranked = []
        cdef list kept_indexes = []

        for idx in range(len(detections)):
            det = detections[idx]
            ranked.append((-det.confidence, det.cls, idx))
        ranked.sort()

        for entry in ranked:
            idx = entry[2]
            det = detections[idx]
            is_suppressed = False
            for kept_idx in kept_indexes:
                kept_det = detections[kept_idx]
                if kept_det.overlaps(det):
                    is_suppressed = True
                    break
            if not is_suppressed:
                kept_indexes.append(idx)

        kept_indexes.sort()
        return [detections[idx] for idx in kept_indexes]

    cdef bint is_video(self, str filepath):
        """Return True when the guessed MIME type of ``filepath`` starts with "video"."""
        mime_type, _ = mimetypes.guess_type(<str>filepath)
        return mime_type is not None and mime_type.startswith("video")

    cdef split_list_extend(self, lst, chunk_size):
        """Split ``lst`` into chunks of ``chunk_size``, padding the last chunk.

        A short final chunk is extended by repeating its last element until it
        reaches ``chunk_size``.  An empty ``lst`` yields an empty list (this
        used to raise IndexError).
        """
        chunks = [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
        if not chunks:
            return chunks
        last_chunk = chunks[len(chunks) - 1]
        missing = chunk_size - len(last_chunk)
        if missing > 0:
            last_chunk.extend([last_chunk[len(last_chunk) - 1]] * missing)
        return chunks

    cdef run_inference(self, RemoteCommand cmd):
        """Decode the recognition config from ``cmd`` and process every listed path.

        Paths are split into images and videos by MIME type; images are
        processed first in batches of ``constants.MODEL_BATCH_SIZE``, then
        each video in turn.

        Raises:
            Exception: when the config cannot be decoded from ``cmd.data``.
        """
        cdef list[str] videos = []
        cdef list[str] images = []
        cdef Py_ssize_t start
        cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_msgpack(cmd.data)
        if ai_config is None:
            raise Exception('ai recognition config is empty')

        self.stop_signal = False
        if self.session is None:
            self.init_ai()

        print(ai_config.paths)
        for m in ai_config.paths:
            if self.is_video(m):
                videos.append(m)
            else:
                images.append(m)

        # images first, it's faster
        # Chunks are passed unpadded: _process_images pads the frame batch
        # itself, so the last image is no longer reported several times.
        for start in range(0, len(images), constants.MODEL_BATCH_SIZE):
            chunk = images[start:start + constants.MODEL_BATCH_SIZE]
            print(f'run inference on {" ".join(chunk)}...')
            self._process_images(cmd, ai_config, chunk)

        for v in videos:
            print(f'run inference on {v}...')
            self._process_video(cmd, ai_config, v)

    def _annotate_video_batch(self, RemoteCommand cmd, AIRecognitionConfig ai_config, str video_name,
                              list frames, list timestamps, int real_count):
        """Run the model on ``frames`` and report valid annotations for the first ``real_count``.

        Frames past ``real_count`` are padding added to fill the batch and
        never produce annotations.
        """
        input_blob = self.preprocess(frames)
        outputs = self.session.run(None, {self.model_input: input_blob})
        list_detections = self.postprocess(outputs, ai_config)
        for i in range(min(real_count, len(list_detections))):
            annotation = Annotation(video_name, timestamps[i], list_detections[i])
            if self.is_valid_annotation(annotation, ai_config):
                _, encoded = cv2.imencode('.jpg', frames[i])
                annotation.image = encoded.tobytes()
                self._previous_annotation = annotation
                print(annotation.to_str(self.class_names))
                self.on_annotation(cmd, annotation)

    cdef _process_video(self, RemoteCommand cmd, AIRecognitionConfig ai_config, str video_name):
        """Sample every ``frame_period_recognition``-th frame of a video and annotate it.

        Sampled frames are inferred in batches of ``constants.MODEL_BATCH_SIZE``.
        A partial batch left over at the end of the stream is padded with its
        last frame and processed too (it used to be dropped).  Processing ends
        early, without flushing the tail, when ``stop()`` has been called.
        """
        cdef int frame_count = 0
        cdef int real_count
        cdef list batch_frames = []
        cdef list[int] batch_timestamps = []
        self._previous_annotation = None

        v_input = cv2.VideoCapture(<str>video_name)
        try:
            while v_input.isOpened() and not self.stop_signal:
                ret, frame = v_input.read()
                if not ret or frame is None:
                    break

                frame_count += 1
                if frame_count % ai_config.frame_period_recognition != 0:
                    continue

                batch_frames.append(frame)
                batch_timestamps.append(int(v_input.get(cv2.CAP_PROP_POS_MSEC)))
                if len(batch_frames) == constants.MODEL_BATCH_SIZE:
                    self._annotate_video_batch(cmd, ai_config, video_name,
                                               batch_frames, batch_timestamps, len(batch_frames))
                    batch_frames.clear()
                    batch_timestamps.clear()

            # Flush the sampled frames that did not fill a whole batch.
            if batch_frames and not self.stop_signal:
                real_count = len(batch_frames)
                while len(batch_frames) < constants.MODEL_BATCH_SIZE:
                    batch_frames.append(batch_frames[real_count - 1])
                self._annotate_video_batch(cmd, ai_config, video_name,
                                           batch_frames, batch_timestamps, real_count)
        finally:
            # Release the capture even when inference or the callback raises.
            v_input.release()

    cdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths):
        """Annotate a batch of still images and report one annotation per readable image.

        Images that OpenCV cannot read are skipped with a message instead of
        failing the whole batch.  When fewer than ``constants.MODEL_BATCH_SIZE``
        frames are available the batch is padded with the last frame; padding
        frames never produce annotations.  Image annotations use timestamp 0.
        """
        cdef list frames = []
        cdef list readable_paths = []
        cdef int real_count
        self._previous_annotation = None

        for image_path in image_paths:
            frame = cv2.imread(image_path)
            if frame is None:
                print(f'cannot read image {image_path}, skipping')
                continue
            frames.append(frame)
            readable_paths.append(image_path)

        if not frames:
            return

        real_count = len(frames)
        while len(frames) < constants.MODEL_BATCH_SIZE:
            frames.append(frames[real_count - 1])

        input_blob = self.preprocess(frames)
        outputs = self.session.run(None, {self.model_input: input_blob})
        list_detections = self.postprocess(outputs, ai_config)
        for i in range(min(real_count, len(list_detections))):
            annotation = Annotation(readable_paths[i], 0, list_detections[i])
            _, encoded = cv2.imencode('.jpg', frames[i])
            annotation.image = encoded.tobytes()
            print(annotation.to_str(self.class_names))
            self.on_annotation(cmd, annotation)

    cdef stop(self):
        """Ask a running video loop to finish after the current frame."""
        self.stop_signal = True

    cdef bint is_valid_annotation(self, Annotation annotation, AIRecognitionConfig ai_config):
        """Decide whether ``annotation`` differs enough from the previous one to be reported.

        Accepted when it has detections and any of the following holds:
        there is no previous annotation; at least ``frame_recognition_seconds``
        have passed since the previous one; it has more detections than the
        previous one; some detection is farther from its nearest previous
        detection than the tracking threshold; or some detection's confidence
        rose by at least ``tracking_probability_increase`` over its nearest
        previous detection.
        """
        cdef:
            Detection current_det, prev_det, closest_det
            double dx, dy, distance_sq, min_distance_sq

        # No detections, invalid
        if not annotation.detections:
            return False

        # First valid annotation, always accept
        if self._previous_annotation is None:
            return True

        # Enough time has passed since last annotation
        if annotation.time >= self._previous_annotation.time + <long>(ai_config.frame_recognition_seconds * 1000):
            return True

        # More objects detected than before
        if len(annotation.detections) > len(self._previous_annotation.detections):
            return True

        # Check each detection against the previous annotation
        for current_det in annotation.detections:
            min_distance_sq = 1e18  # larger than any real squared distance
            closest_det = None

            # Find the closest detection in the previous annotation
            for prev_det in self._previous_annotation.detections:
                dx = current_det.x - prev_det.x
                dy = current_det.y - prev_det.y
                distance_sq = dx * dx + dy * dy
                if distance_sq < min_distance_sq:
                    min_distance_sq = distance_sq
                    closest_det = prev_det

            # NOTE(review): a *squared* distance is compared with
            # tracking_distance_confidence - confirm the config value is
            # expressed in squared units.
            if min_distance_sq > ai_config.tracking_distance_confidence:
                return True

            # Check probability increase
            if current_det.confidence >= closest_det.confidence + ai_config.tracking_probability_increase:
                return True

        return False