import json
import mimetypes
import os
import subprocess
import sys
import time

import cv2
import numpy as np

cimport constants
from remote_command cimport RemoteCommand
from annotation cimport Detection, Annotation
from ai_config cimport AIRecognitionConfig
from hardware_service cimport HardwareService
from security cimport Security

# Only the engine matching the available hardware is imported.
if HardwareService.has_nvidia_gpu():
    from tensorrt_engine import TensorRTEngine
else:
    from onnx_engine import OnnxEngine


cdef class Inference:
    """Run object detection over images and videos and report annotations.

    The engine is a ``TensorRTEngine`` when ``HardwareService.has_nvidia_gpu()``
    is true and an ``OnnxEngine`` otherwise. Every produced annotation is
    handed to the ``on_annotation`` callback as ``on_annotation(cmd, annotation)``.
    """

    def __init__(self, api_client, on_annotation):
        """Store collaborators and reset the engine state.

        Args:
            api_client: object providing ``load_big_small_resource`` and
                ``upload_big_small_resource``.
            on_annotation: callable invoked as ``on_annotation(cmd, annotation)``.
        """
        self.api_client = api_client
        self.on_annotation = on_annotation
        self.stop_signal = False
        self.model_input = None
        self.model_width = 0
        self.model_height = 0
        self.engine = None
        self.is_building_engine = False

    cdef build_tensor_engine(self):
        """Build and upload the TensorRT engine if its ``.big`` file is missing.

        Does nothing when no NVIDIA GPU is reported. While the engine is being
        built ``is_building_engine`` is True; ``init_ai`` waits on that flag.
        """
        is_nvidia = HardwareService.has_nvidia_gpu()
        if not is_nvidia:
            return

        engine_filename = TensorRTEngine.get_engine_filename(0)
        key = Security.get_model_encryption_key()
        models_dir = constants.MODELS_FOLDER

        if os.path.exists(os.path.join(models_dir, f'{engine_filename}.big')):
            print('tensor rt engine is here, no need to build')
            return

        # TODO: Check cdn on engine exists, if there is, download
        self.is_building_engine = True
        try:
            time.sleep(8)  # prevent simultaneously loading dll and models
            onnx_model = self.api_client.load_big_small_resource(constants.AI_ONNX_MODEL_FILE, models_dir, key)
            model_bytes = TensorRTEngine.convert_from_onnx(onnx_model)
            self.api_client.upload_big_small_resource(model_bytes, engine_filename, models_dir, key)
            print('uploaded ')
        finally:
            # Always clear the flag: init_ai() polls it in a loop and would
            # otherwise wait forever if the build or the upload failed.
            self.is_building_engine = False

    cdef init_ai(self):
        """Create the inference engine once and cache the model input size.

        Subsequent calls are no-ops. On NVIDIA hardware this blocks (polling
        once per second) until ``is_building_engine`` is False.
        """
        if self.engine is not None:
            return

        is_nvidia = HardwareService.has_nvidia_gpu()
        key = Security.get_model_encryption_key()
        models_dir = constants.MODELS_FOLDER

        if is_nvidia:
            while self.is_building_engine:
                time.sleep(1)
            engine_filename = TensorRTEngine.get_engine_filename(0)
            model_bytes = self.api_client.load_big_small_resource(engine_filename, models_dir, key)
            self.engine = TensorRTEngine(model_bytes)
        else:
            model_bytes = self.api_client.load_big_small_resource(constants.AI_ONNX_MODEL_FILE, models_dir, key)
            self.engine = OnnxEngine(model_bytes)

        self.model_height, self.model_width = self.engine.get_input_shape()

    cdef preprocess(self, frames):
        """Convert frames to blobs of the model input size and stack them.

        Each frame is scaled by 1/255, resized to (model_width, model_height)
        without cropping and has its R and B channels swapped.
        """
        blobs = [
            cv2.dnn.blobFromImage(frame,
                                  scalefactor=1.0 / 255.0,
                                  size=(self.model_width, self.model_height),
                                  mean=(0, 0, 0),
                                  swapRB=True,
                                  crop=False)
            for frame in frames
        ]
        return np.vstack(blobs)

    cdef postprocess(self, output, ai_config):
        """Turn raw engine output into one filtered detection list per batch item.

        Each row of ``output[0][i]`` is read as
        ``(x1, y1, x2, y2, confidence, class_id)``; corner coordinates are
        divided by the model input size and converted to centre/size form.
        Rows are read until the first one whose confidence is 0. Detections
        below ``ai_config.probability_threshold`` are dropped and the rest go
        through ``remove_overlapping_detections``.

        Raises:
            RuntimeError: if anything fails while parsing the output.
        """
        cdef list[Detection] detections = []
        cdef int ann_index
        cdef float x1, y1, x2, y2, conf, cx, cy, w, h
        cdef int class_id
        cdef list[list[Detection]] results = []
        try:
            for ann_index in range(len(output[0])):
                detections.clear()
                for det in output[0][ann_index]:
                    if det[4] == 0:  # if confidence is 0 then valid points are over.
                        break
                    x1 = det[0] / self.model_width
                    y1 = det[1] / self.model_height
                    x2 = det[2] / self.model_width
                    y2 = det[3] / self.model_height
                    conf = round(det[4], 2)
                    class_id = int(det[5])

                    cx = (x1 + x2) / 2
                    cy = (y1 + y2) / 2
                    w = x2 - x1
                    h = y2 - y1
                    if conf >= ai_config.probability_threshold:
                        detections.append(Detection(cx, cy, w, h, class_id, conf))
                # remove_overlapping_detections returns a new list, so reusing
                # (and clearing) `detections` for the next item is safe.
                filtered_detections = self.remove_overlapping_detections(detections)
                results.append(filtered_detections)
            return results
        except Exception as e:
            raise RuntimeError(f"Failed to postprocess: {str(e)}") from e

    cdef remove_overlapping_detections(self, list[Detection] detections):
        """Return a new list with overlapping detections collapsed.

        For every not-yet-discarded detection ``det1`` each later detection
        that overlaps it is compared with ``det1``: the one with the higher
        confidence wins, ties go to the lower ``cls``. The current winner is
        appended to the result once all later detections were visited.
        """
        cdef Detection det1, det2
        filtered_output = []
        # A set gives O(1) membership tests; only `in` and insertion are used.
        filtered_out_indexes = set()

        for det1_index in range(len(detections)):
            if det1_index in filtered_out_indexes:
                continue
            det1 = detections[det1_index]
            res = det1_index
            for det2_index in range(det1_index + 1, len(detections)):
                det2 = detections[det2_index]
                if det1.overlaps(det2):
                    if det1.confidence > det2.confidence or (
                            det1.confidence == det2.confidence and det1.cls < det2.cls):
                        # det1 has higher confidence or lower class_id
                        filtered_out_indexes.add(det2_index)
                    else:
                        filtered_out_indexes.add(res)
                        res = det2_index
            filtered_output.append(detections[res])
            filtered_out_indexes.add(res)
        return filtered_output

    cdef bint is_video(self, str filepath):
        """Return True when the guessed MIME type of ``filepath`` starts with "video"."""
        mime_type, _ = mimetypes.guess_type(filepath)
        return mime_type is not None and mime_type.startswith("video")

    cdef split_list_extend(self, lst, chunk_size):
        """Split ``lst`` into chunks of exactly ``chunk_size`` elements.

        If the last chunk is smaller than ``chunk_size`` it is extended by
        duplicating its last element. An empty ``lst`` yields an empty list.
        """
        chunks = [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
        if not chunks:
            return chunks

        last_chunk = chunks[-1]
        if len(last_chunk) < chunk_size:
            last_elem = last_chunk[-1]
            while len(last_chunk) < chunk_size:
                last_chunk.append(last_elem)
        return chunks

    cdef run_inference(self, RemoteCommand cmd):
        """Run detection on every path listed in the command's AI config.

        Paths are split into videos and images by ``is_video``; images are
        processed first, in batches of the engine batch size, then videos one
        by one. Processing stops early once ``stop()`` has been called.

        Raises:
            Exception: if the config cannot be decoded from ``cmd.data``.
        """
        cdef list[str] videos = []
        cdef list[str] images = []
        cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_msgpack(cmd.data)
        if ai_config is None:
            raise Exception('ai recognition config is empty')

        self.stop_signal = False
        self.init_ai()

        print(ai_config.paths)
        for m in ai_config.paths:
            if self.is_video(m):
                videos.append(m)
            else:
                images.append(m)

        # images first, it's faster
        if images:
            for chunk in self.split_list_extend(images, self.engine.get_batch_size()):
                if self.stop_signal:
                    break
                print(f'run inference on {" ".join(chunk)}...')
                self._process_images(cmd, ai_config, chunk)

        for v in videos:
            if self.stop_signal:
                break
            print(f'run inference on {v}...')
            self._process_video(cmd, ai_config, v)

    cdef _process_video(self, RemoteCommand cmd, AIRecognitionConfig ai_config, str video_name):
        """Detect objects on every N-th frame of a video and report valid annotations.

        Every ``ai_config.frame_period_recognition``-th frame is collected;
        once a full engine batch is available it is run through the engine.
        Only annotations accepted by ``is_valid_annotation`` are JPEG-encoded
        and passed to ``on_annotation``.

        NOTE(review): frames left in an incomplete batch when the video ends
        (or when stop is requested) are not processed.
        """
        cdef int frame_count = 0
        cdef list batch_frames = []
        cdef list[int] batch_timestamps = []
        self._previous_annotation = None

        batch_size = self.engine.get_batch_size()
        v_input = cv2.VideoCapture(video_name)
        try:
            while v_input.isOpened() and not self.stop_signal:
                ret, frame = v_input.read()
                if not ret or frame is None:
                    break

                frame_count += 1
                if frame_count % ai_config.frame_period_recognition == 0:
                    batch_frames.append(frame)
                    batch_timestamps.append(int(v_input.get(cv2.CAP_PROP_POS_MSEC)))

                if len(batch_frames) == batch_size:
                    input_blob = self.preprocess(batch_frames)
                    outputs = self.engine.run(input_blob)
                    list_detections = self.postprocess(outputs, ai_config)
                    for i, detections in enumerate(list_detections):
                        annotation = Annotation(video_name, batch_timestamps[i], detections)
                        if self.is_valid_annotation(annotation, ai_config):
                            _, image = cv2.imencode('.jpg', batch_frames[i])
                            annotation.image = image.tobytes()
                            self._previous_annotation = annotation

                            print(annotation)
                            self.on_annotation(cmd, annotation)

                    batch_frames.clear()
                    batch_timestamps.clear()
        finally:
            # Release the capture even when inference or the callback raises.
            v_input.release()

    cdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths):
        """Detect objects on one batch of images and report every annotation.

        Unlike the video path, annotations are not filtered through
        ``is_valid_annotation``; each result is JPEG-encoded and passed to
        ``on_annotation`` with a timestamp of 0.

        NOTE(review): ``cv2.imread`` results are not checked, and paths that
        ``split_list_extend`` duplicated to fill the batch are reported again.
        """
        cdef list frames = []
        cdef list timestamps = []
        self._previous_annotation = None

        for image_path in image_paths:
            frames.append(cv2.imread(image_path))
            timestamps.append(0)

        input_blob = self.preprocess(frames)
        outputs = self.engine.run(input_blob)
        list_detections = self.postprocess(outputs, ai_config)
        for i, detections in enumerate(list_detections):
            annotation = Annotation(image_paths[i], timestamps[i], detections)
            _, encoded = cv2.imencode('.jpg', frames[i])
            annotation.image = encoded.tobytes()
            self.on_annotation(cmd, annotation)

    cdef stop(self):
        """Request that the running inference stops."""
        self.stop_signal = True

    cdef bint is_valid_annotation(self, Annotation annotation, AIRecognitionConfig ai_config):
        """Decide whether a video annotation differs enough from the previous one.

        Returns False when the annotation has no detections. Otherwise returns
        True when any of these hold: there is no previous annotation; at least
        ``frame_recognition_seconds`` passed since it; more objects were
        detected than before; some detection's squared distance to its closest
        previous detection exceeds ``tracking_distance_confidence``; or its
        confidence rose by at least ``tracking_probability_increase``.
        """
        cdef:
            Detection current_det, prev_det
            double dx, dy, distance_sq, min_distance_sq
            Detection closest_det

        # No detections, invalid
        if not annotation.detections:
            return False

        # First valid annotation, always accept
        if self._previous_annotation is None:
            return True

        # Enough time has passed since last annotation
        if annotation.time >= self._previous_annotation.time + (ai_config.frame_recognition_seconds * 1000):
            return True

        # More objects detected than before
        if len(annotation.detections) > len(self._previous_annotation.detections):
            return True

        # Check each detection against previous frame
        for current_det in annotation.detections:
            min_distance_sq = 1e18  # Initialize with large value
            closest_det = None

            # Find the closest detection in previous frame
            for prev_det in self._previous_annotation.detections:
                dx = current_det.x - prev_det.x
                dy = current_det.y - prev_det.y
                distance_sq = dx * dx + dy * dy

                if distance_sq < min_distance_sq:
                    min_distance_sq = distance_sq
                    closest_det = prev_det

            # Nothing to track against, or beyond tracking distance.
            # The None check keeps closest_det from being dereferenced below.
            if closest_det is None or min_distance_sq > ai_config.tracking_distance_confidence:
                return True

            # Check probability increase
            if current_det.confidence >= closest_det.confidence + ai_config.tracking_probability_increase:
                return True

        return False