import mimetypes
import time

import cv2
import numpy as np
import onnxruntime as onnx

from remote_command cimport RemoteCommand
from annotation cimport Detection, Annotation
from ai_config cimport AIRecognitionConfig


cdef class Inference:
    """Runs a YOLO-style ONNX detection model over images and videos.

    Frames are preprocessed to the model's input size, run through
    onnxruntime, and post-processed (score filter + NMS) into Detection
    objects; accepted results are wrapped in an Annotation and delivered
    through the `on_annotation` callback.
    """

    def __init__(self, model_bytes, on_annotation):
        """
        model_bytes   -- serialized ONNX model handed to onnxruntime.
        on_annotation -- callback invoked with each produced Annotation.
        """
        self.stop_signal = False
        self.session = onnx.InferenceSession(
            model_bytes,
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
        )
        self.on_annotation = on_annotation
        # Placeholder config; run_inference() replaces it from the command payload.
        self.ai_config = AIRecognitionConfig(4, 2, 0.25, 0.15, 15, 0.8, b'')

        model_inputs = self.session.get_inputs()
        self.model_input = model_inputs[0].name
        input_shape = model_inputs[0].shape
        # ONNX image models conventionally use NCHW layout: index 2 is
        # height, index 3 is width.  The original assignment was swapped,
        # which only went unnoticed because square inputs are common.
        self.model_height = input_shape[2]
        self.model_width = input_shape[3]
        print(f'AI detection model input: {self.model_input} ({self.model_width}, {self.model_height})')
        model_meta = self.session.get_modelmeta()
        print("Metadata:", model_meta.custom_metadata_map)

    cdef preprocess(self, frame):
        """Convert a BGR frame to the model's NCHW float32 input tensor.

        Returns a (1, 3, H, W) array scaled to [0, 1].
        """
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # cv2.resize takes (width, height).
        img = cv2.resize(img, (self.model_width, self.model_height))
        image_data = np.array(img) / 255.0
        image_data = np.transpose(image_data, (2, 0, 1))  # HWC -> CHW
        image_data = np.expand_dims(image_data, axis=0).astype(np.float32)
        return image_data

    cdef postprocess(self, output, int img_width, int img_height):
        """Decode raw model output into a list of Detection objects.

        Assumes each row is [cx, cy, w, h, class_scores...] in model-input
        coordinates (YOLO layout); boxes are scaled back to the original
        image size, score-filtered, then deduplicated with NMS.
        """
        outputs = np.transpose(np.squeeze(output[0]))
        rows = outputs.shape[0]
        boxes = []
        scores = []
        class_ids = []
        # Scale factors from model-input space back to the source image.
        x_factor = img_width / self.model_width
        y_factor = img_height / self.model_height
        for i in range(rows):
            classes_scores = outputs[i][4:]
            max_score = np.amax(classes_scores)
            if max_score >= self.ai_config.probability_threshold:
                class_id = np.argmax(classes_scores)
                x, y, w, h = outputs[i][0], outputs[i][1], outputs[i][2], outputs[i][3]
                # Center-format box -> top-left corner + size, in image pixels.
                left = int((x - w / 2) * x_factor)
                top = int((y - h / 2) * y_factor)
                width = int(w * x_factor)
                height = int(h * y_factor)
                class_ids.append(class_id)
                scores.append(max_score)
                boxes.append([left, top, width, height])
        # NOTE(review): the 0.45 IoU threshold is hard-coded while the score
        # threshold comes from ai_config — consider making it configurable too.
        indices = cv2.dnn.NMSBoxes(boxes, scores,
                                   self.ai_config.probability_threshold, 0.45)
        detections = []
        for i in indices:
            x, y, w, h = boxes[i]
            detections.append(Detection(x, y, w, h, class_ids[i], scores[i]))
        return detections

    cdef bint is_video(self, str filepath):
        """Return True when the file's guessed MIME type is video/*."""
        mime_type, _ = mimetypes.guess_type(filepath)
        # guess_type returns None for unknown types; be explicit about that.
        return mime_type is not None and mime_type.startswith("video")

    cdef run_inference(self, RemoteCommand cmd, int batch_size=8):
        """Entry point: refresh config from the command and dispatch by media type."""
        print('run inference..')
        self.ai_config = AIRecognitionConfig.from_msgpack(cmd.data)
        self.stop_signal = False
        if self.is_video(cmd.filename):
            self._process_video(cmd, batch_size)
        else:
            self._process_image(cmd)

    cdef _process_video(self, RemoteCommand cmd, int batch_size):
        """Decode the video and run detection on every Nth frame.

        N = ai_config.frame_period_recognition.  Honors stop(): the loop
        exits as soon as stop_signal is set (the original never checked it,
        so stop() had no effect).
        """
        frame_count = 0
        self._previous_annotation = None
        self.start_video_time = time.time()
        v_input = cv2.VideoCapture(cmd.filename)
        try:
            while v_input.isOpened() and not self.stop_signal:
                ret, frame = v_input.read()
                if not ret or frame is None:
                    break
                frame_count += 1
                if frame_count % self.ai_config.frame_period_recognition == 0:
                    ms = int(v_input.get(cv2.CAP_PROP_POS_MSEC))
                    annotation = self.detect_frame(frame, ms)
                    if annotation is not None:
                        self._previous_annotation = annotation
                        # NOTE(review): the image path calls
                        # on_annotation(cmd, annotation) — one of the two
                        # signatures must be wrong; confirm with the callback's
                        # owner before changing either.
                        self.on_annotation(annotation)
        finally:
            # The original leaked the capture handle; always release it.
            v_input.release()

    cdef detect_frame(self, frame, long timestamp_ms):
        """Run one detection pass over `frame` taken at `timestamp_ms`.

        Returns an Annotation (with JPEG bytes attached) when detections
        pass is_valid_annotation(), otherwise None.

        The original parameter was named `time`, shadowing the `time`
        module — every time.time() call below raised; renamed to fix.
        """
        cdef Annotation annotation
        img_height, img_width = frame.shape[:2]
        start_time = time.time()
        img_data = self.preprocess(frame)
        preprocess_time = time.time()
        outputs = self.session.run(None, {self.model_input: img_data})
        inference_time = time.time()
        detections = self.postprocess(outputs, img_width, img_height)
        postprocess_time = time.time()
        print(f'video time, ms: {timestamp_ms / 1000:.3f}. total time, s : {postprocess_time - self.start_video_time:.3f} '
              f'preprocess time: {preprocess_time - start_time:.3f}, inference time: {inference_time - preprocess_time:.3f},'
              f' postprocess time: {postprocess_time - inference_time:.3f}, total time: {postprocess_time - start_time:.3f}')
        if len(detections) > 0:
            annotation = Annotation(frame, timestamp_ms, detections)
            if self.is_valid_annotation(annotation):
                _, image = cv2.imencode('.jpg', frame)
                annotation.image = image.tobytes()
                return annotation
        return None

    cdef _process_image(self, RemoteCommand cmd):
        """Run detection on a single still image and always emit an annotation."""
        self._previous_annotation = None
        # detect_frame() logs elapsed time relative to this; the original
        # never set it on the image path, so it was stale or unset here.
        self.start_video_time = time.time()
        frame = cv2.imread(cmd.filename)
        annotation = self.detect_frame(frame, 0)
        if annotation is None:
            # No valid detections: still deliver an annotation carrying the image.
            _, image = cv2.imencode('.jpg', frame)
            # The original passed the `time` module here by mistake; a still
            # image's timestamp is 0, matching the detect_frame call above.
            annotation = Annotation(frame, 0, [])
            annotation.image = image.tobytes()
        self.on_annotation(cmd, annotation)

    cdef stop(self):
        """Request that the video-processing loop stop after the current frame."""
        self.stop_signal = True

    cdef bint is_valid_annotation(self, Annotation annotation):
        """Decide whether an annotation is novel enough to emit.

        Accepts when: it is the first annotation, enough video time has
        passed since the last accepted one, more objects are present, an
        object moved beyond the tracking distance, or an object's
        confidence rose by the configured margin.  Otherwise rejects.
        """
        # No detections, invalid
        if not annotation.detections:
            return False
        # First valid annotation, always accept
        if self._previous_annotation is None:
            return True
        # Enough video time has passed since last annotation
        # (frame_recognition_seconds is in seconds; annotation.time in ms)
        if annotation.time >= self._previous_annotation.time + (self.ai_config.frame_recognition_seconds * 1000):
            return True
        # More objects detected than before
        if len(annotation.detections) > len(self._previous_annotation.detections):
            return True
        cdef:
            Detection current_det, prev_det
            double dx, dy, distance_sq, min_distance_sq
            Detection closest_det
        # Check each detection against previous frame
        for current_det in annotation.detections:
            min_distance_sq = 1e18  # Initialize with large value
            closest_det = None
            # Find the closest detection in previous frame
            for prev_det in self._previous_annotation.detections:
                dx = current_det.x - prev_det.x
                dy = current_det.y - prev_det.y
                distance_sq = dx * dx + dy * dy
                if distance_sq < min_distance_sq:
                    min_distance_sq = distance_sq
                    closest_det = prev_det
            # Check if beyond tracking distance
            # NOTE(review): this compares a SQUARED distance against
            # tracking_distance_confidence — if the config stores a linear
            # pixel distance it should be squared here; confirm the unit.
            if min_distance_sq > self.ai_config.tracking_distance_confidence:
                return True
            # Check probability increase
            if current_det.confidence >= closest_det.confidence + self.ai_config.tracking_probability_increase:
                return True
        return False