import mimetypes
from pathlib import Path
from threading import Thread

import cv2
import msgpack
import numpy as np
import pynvml

cimport constants_inf
from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus
from ai_config cimport AIRecognitionConfig
from annotation cimport Detection, Annotation
from remote_command_inf cimport RemoteCommand, CommandType

cdef int tensor_gpu_index


cdef int check_tensor_gpu_index():
    try:
        pynvml.nvmlInit()
        device_count = pynvml.nvmlDeviceGetCount()
        if device_count == 0:
            constants_inf.logerror('No NVIDIA GPUs found.')
            return -1
        for i in range(device_count):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            major, minor = pynvml.nvmlDeviceGetCudaComputeCapability(handle)
            # Require CUDA compute capability 6.1 or newer for TensorRT.
            if major > 6 or (major == 6 and minor >= 1):
                constants_inf.log('found NVIDIA GPU!')
                return i
        constants_inf.logerror("NVIDIA GPU doesn't support TensorRT!")
        return -1
    except pynvml.NVMLError:
        return -1
    finally:
        try:
            pynvml.nvmlShutdown()
        except Exception:
            constants_inf.logerror('Failed to shut down pynvml, most likely because no NVIDIA GPU is present')


tensor_gpu_index = check_tensor_gpu_index()
if tensor_gpu_index > -1:
    from tensorrt_engine import TensorRTEngine
else:
    from onnx_engine import OnnxEngine


cdef class Inference:
    def __init__(self, loader_client, remote_handler):
        self.loader_client = loader_client
        self.remote_handler = remote_handler
        self.stop_signal = False
        self.model_input = None
        self.model_width = 0
        self.model_height = 0
        self.detection_counts = {}
        self.engine = None
        self.is_building_engine = False
        self.ai_availability_status = AIAvailabilityStatus()
        self._converted_model_bytes = None
        self.init_ai()

    cdef bytes get_onnx_engine_bytes(self):
        models_dir = constants_inf.MODELS_FOLDER
        self.ai_availability_status.set_status(AIAvailabilityEnum.DOWNLOADING)
        res = self.loader_client.load_big_small_resource(constants_inf.AI_ONNX_MODEL_FILE, models_dir)
        if res.err is not None:
            raise Exception(res.err)
        return res.data

    cdef convert_and_upload_model(self, bytes onnx_engine_bytes, str engine_filename):
        try:
            self.ai_availability_status.set_status(AIAvailabilityEnum.CONVERTING)
            models_dir = constants_inf.MODELS_FOLDER
            model_bytes = TensorRTEngine.convert_from_onnx(onnx_engine_bytes)
            self.ai_availability_status.set_status(AIAvailabilityEnum.UPLOADING)
            res = self.loader_client.upload_big_small_resource(model_bytes, engine_filename, models_dir)
            if res.err is not None:
                self.ai_availability_status.set_status(
                    AIAvailabilityEnum.WARNING, f"Failed to upload converted model: {res.err}")
            self._converted_model_bytes = model_bytes
            self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
        except Exception as e:
            self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, str(e))
            self._converted_model_bytes = None
        finally:
            self.is_building_engine = False

    cdef init_ai(self):
        constants_inf.log('init AI...')
        try:
            if self.engine is not None:
                return
            if self.is_building_engine:
                return
            if self._converted_model_bytes is not None:
                # A background conversion finished: build the engine from its output.
                try:
                    self.engine = TensorRTEngine(self._converted_model_bytes)
                    self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
                    self.model_height, self.model_width = self.engine.get_input_shape()
                except Exception as e:
                    self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, str(e))
                finally:
                    self._converted_model_bytes = None  # consume the bytes
                return
            models_dir = constants_inf.MODELS_FOLDER
            if tensor_gpu_index > -1:
                try:
                    engine_filename = TensorRTEngine.get_engine_filename(0)
                    self.ai_availability_status.set_status(AIAvailabilityEnum.DOWNLOADING)
                    res = self.loader_client.load_big_small_resource(engine_filename, models_dir)
                    if res.err is not None:
                        raise Exception(res.err)
                    self.engine = TensorRTEngine(res.data)
                    self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
                except Exception as e:
                    # No prebuilt engine available: download the ONNX model and
                    # convert it to TensorRT in a background thread.
                    self.ai_availability_status.set_status(AIAvailabilityEnum.WARNING, str(e))
                    onnx_engine_bytes = self.get_onnx_engine_bytes()
                    self.is_building_engine = True
                    thread = Thread(target=self.convert_and_upload_model,
                                    args=(onnx_engine_bytes, engine_filename))
                    thread.daemon = True
                    thread.start()
                    return
            else:
                self.engine = OnnxEngine(self.get_onnx_engine_bytes())
                self.is_building_engine = False
            self.model_height, self.model_width = self.engine.get_input_shape()
        except Exception as e:
            self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, str(e))
            self.is_building_engine = False
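    # Note on preprocess() below: cv2.dnn.blobFromImage returns a 4-D NCHW blob
    # of shape (1, 3, model_height, model_width), scaled to [0, 1] with the
    # BGR->RGB channel swap, and np.vstack concatenates the per-frame blobs
    # along the batch axis. For example (assumed 640x640 model input), 8 frames
    # yield one (8, 3, 640, 640) float32 blob.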
    cdef preprocess(self, frames):
        blobs = [cv2.dnn.blobFromImage(frame,
                                       scalefactor=1.0 / 255.0,
                                       size=(self.model_width, self.model_height),
                                       mean=(0, 0, 0),
                                       swapRB=True,
                                       crop=False)
                 for frame in frames]
        return np.vstack(blobs)

    cdef postprocess(self, output, ai_config):
        cdef list[Detection] detections = []
        cdef int ann_index
        cdef float x1, y1, x2, y2, conf, cx, cy, w, h
        cdef int class_id
        cdef list[list[Detection]] results = []
        try:
            for ann_index in range(len(output[0])):
                detections.clear()
                for det in output[0][ann_index]:
                    if det[4] == 0:
                        # A confidence of 0 marks the end of the valid detections.
                        break
                    x1 = det[0] / self.model_width
                    y1 = det[1] / self.model_height
                    x2 = det[2] / self.model_width
                    y2 = det[3] / self.model_height
                    conf = round(det[4], 2)
                    class_id = int(det[5])
                    cx = (x1 + x2) / 2
                    cy = (y1 + y2) / 2
                    w = x2 - x1
                    h = y2 - y1
                    if conf >= ai_config.probability_threshold:
                        detections.append(Detection(cx, cy, w, h, class_id, conf))
                filtered_detections = self.remove_overlapping_detections(
                    detections, ai_config.tracking_intersection_threshold)
                results.append(filtered_detections)
            return results
        except Exception as e:
            raise RuntimeError(f"Failed to postprocess: {str(e)}")

    cdef remove_overlapping_detections(self, list[Detection] detections, float intersection_threshold=0.6):
        cdef Detection det1, det2
        filtered_output = []
        filtered_out_indexes = []
        for det1_index in range(len(detections)):
            if det1_index in filtered_out_indexes:
                continue
            det1 = detections[det1_index]
            res = det1_index  # index of the current winner in this overlap group
            for det2_index in range(det1_index + 1, len(detections)):
                det2 = detections[det2_index]
                if det1.overlaps(det2, intersection_threshold):
                    if det1.confidence > det2.confidence or (
                            det1.confidence == det2.confidence and det1.cls < det2.cls):
                        # det1 has higher confidence, or equal confidence and a lower class_id.
                        filtered_out_indexes.append(det2_index)
                    else:
                        filtered_out_indexes.append(res)
                        res = det2_index
            filtered_output.append(detections[res])
            filtered_out_indexes.append(res)
        return filtered_output

    cdef bint is_video(self, str filepath):
        mime_type, _ = mimetypes.guess_type(filepath)
        return mime_type is not None and mime_type.startswith("video")

    cdef split_list_extend(self, lst, chunk_size):
        chunks = [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
        # If the last chunk is smaller than the desired chunk_size,
        # extend it by duplicating its last element.
        last_chunk = chunks[-1]
        if len(last_chunk) < chunk_size:
            last_elem = last_chunk[-1]
            while len(last_chunk) < chunk_size:
                last_chunk.append(last_elem)
        return chunks
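    # Worked example for split_list_extend (assumed values, illustration only):
    #   split_list_extend([a, b, c, d, e], 4) -> [[a, b, c, d], [e, e, e, e]]
    # The short final chunk is padded with copies of its own last element so
    # that every chunk matches the engine batch size.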
    cdef run_inference(self, RemoteCommand cmd):
        cdef list[str] videos = []
        cdef list[str] images = []
        cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_msgpack(cmd.data)
        if ai_config is None:
            raise Exception('ai recognition config is empty')
        self.stop_signal = False
        self.init_ai()
        if self.engine is None:
            constants_inf.log("AI engine not available. Conversion may be in progress. Skipping inference.")
            response = RemoteCommand(CommandType.AI_AVAILABILITY_RESULT,
                                     self.ai_availability_status.serialize())
            self.remote_handler.send(cmd.client_id, response)
            return
        self.detection_counts = {}
        for p in ai_config.paths:
            media_name = Path(p).stem.replace(" ", "")
            self.detection_counts[media_name] = 0
            if self.is_video(p):
                videos.append(p)
            else:
                images.append(p)
        # Process images first: they are faster.
        if len(images) > 0:
            constants_inf.log(f'run inference on {" ".join(images)}...')
            self._process_images(cmd, ai_config, images)
        if len(videos) > 0:
            for v in videos:
                constants_inf.log(f'run inference on {v}...')
                self._process_video(cmd, ai_config, v)

    cdef _process_video(self, RemoteCommand cmd, AIRecognitionConfig ai_config, str video_name):
        cdef int frame_count = 0
        cdef list batch_frames = []
        cdef list[int] batch_timestamps = []
        cdef Annotation annotation
        self._previous_annotation = None
        v_input = cv2.VideoCapture(video_name)
        while v_input.isOpened() and not self.stop_signal:
            ret, frame = v_input.read()
            if not ret or frame is None:
                break
            frame_count += 1
            if frame_count % ai_config.frame_period_recognition == 0:
                batch_frames.append(frame)
                batch_timestamps.append(int(v_input.get(cv2.CAP_PROP_POS_MSEC)))
                if len(batch_frames) == self.engine.get_batch_size():
                    input_blob = self.preprocess(batch_frames)
                    outputs = self.engine.run(input_blob)
                    list_detections = self.postprocess(outputs, ai_config)
                    for i in range(len(list_detections)):
                        detections = list_detections[i]
                        original_media_name = Path(video_name).stem.replace(" ", "")
                        name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}'
                        annotation = Annotation(name, original_media_name, batch_timestamps[i], detections)
                        if self.is_valid_video_annotation(annotation, ai_config):
                            _, image = cv2.imencode('.jpg', batch_frames[i])
                            annotation.image = image.tobytes()
                            self._previous_annotation = annotation
                            self.on_annotation(cmd, annotation)
                    batch_frames.clear()
                    batch_timestamps.clear()
        v_input.release()

    cdef on_annotation(self, RemoteCommand cmd, Annotation annotation):
        cdef RemoteCommand response = RemoteCommand(CommandType.INFERENCE_DATA, annotation.serialize())
        self.remote_handler.send(cmd.client_id, response)

    cdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths):
        cdef list frame_data
        self._tile_detections = {}
        for path in image_paths:
            frame_data = []
            frame = cv2.imread(path)
            if frame is None:
                constants_inf.logerror(f'Failed to read image {path}')
                continue
            img_h, img_w, _ = frame.shape
            original_media_name = Path(path).stem.replace(" ", "")
            if img_h <= 1.5 * self.model_height and img_w <= 1.5 * self.model_width:
                # Small enough to infer in a single pass.
                frame_data.append((frame, original_media_name, f'{original_media_name}_000000'))
            else:
                res = self.split_to_tiles(frame, path, ai_config.tile_size,
                                          ai_config.big_image_tile_overlap_percent)
                frame_data.extend(res)
            # split_list_extend pads the final chunk to a full batch, so this
            # single loop covers both the one-batch and the multi-batch case.
            for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()):
                self._process_images_inner(cmd, ai_config, chunk)
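    # Worked example for split_to_tiles (assumed values, illustration only):
    # with img_w = 3000, tile_size = 1024 and overlap_percent = 25, the stride
    # is int(1024 * 0.75) = 768, giving tile x-origins 0, 768, 1536 and 2304;
    # the tile at 2304 would overrun the image, so it is snapped back to
    # x = 3000 - 1024 = 1976.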
    cdef split_to_tiles(self, frame, path, tile_size, overlap_percent):
        constants_inf.log(f'splitting image {path} to tiles...')
        img_h, img_w, _ = frame.shape
        stride_w = int(tile_size * (1 - overlap_percent / 100))
        stride_h = int(tile_size * (1 - overlap_percent / 100))
        results = []
        original_media_name = Path(path).stem.replace(" ", "")
        for y in range(0, img_h, stride_h):
            for x in range(0, img_w, stride_w):
                x_end = min(x + tile_size, img_w)
                y_end = min(y + tile_size, img_h)
                # Correct x, y for tiles close to the border.
                if x_end - x < tile_size:
                    if img_w - (x - stride_w) <= tile_size:
                        continue  # the previous tile already covered the last gap
                    x = img_w - tile_size
                if y_end - y < tile_size:
                    if img_h - (y - stride_h) <= tile_size:
                        continue  # the previous tile already covered the last gap
                    y = img_h - tile_size
                tile = frame[y:y_end, x:x_end]
                name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{tile_size:04d}_{x:04d}_{y:04d}!_000000'
                results.append((tile, original_media_name, name))
        return results

    cdef _process_images_inner(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list frame_data):
        cdef list frames, original_media_names, names
        cdef Annotation annotation
        cdef int i
        frames, original_media_names, names = map(list, zip(*frame_data))
        input_blob = self.preprocess(frames)
        outputs = self.engine.run(input_blob)
        list_detections = self.postprocess(outputs, ai_config)
        for i in range(len(list_detections)):
            annotation = Annotation(names[i], original_media_names[i], 0, list_detections[i])
            if self.is_valid_image_annotation(annotation):
                constants_inf.log(f'Detected {annotation}')
                _, image = cv2.imencode('.jpg', frames[i])
                annotation.image = image.tobytes()
                self.on_annotation(cmd, annotation)
                self.detection_counts[original_media_names[i]] = \
                    self.detection_counts.get(original_media_names[i], 0) + 1
        # Send the detection status for each original media name.
        for media_name in self.detection_counts.keys():
            try:
                status = {
                    "mn": media_name,
                    "dc": self.detection_counts[media_name]
                }
                self.remote_handler.send(cmd.client_id,
                                         RemoteCommand(CommandType.INFERENCE_STATUS, msgpack.packb(status)))
            except Exception:
                pass

    cdef stop(self):
        self.stop_signal = True

    cdef remove_tiled_duplicates(self, Annotation annotation):
        right = annotation.name.rindex('!')
        left = annotation.name.index(constants_inf.SPLIT_SUFFIX) + len(constants_inf.SPLIT_SUFFIX)
        tile_size_str, x_str, y_str = annotation.name[left:right].split('_')
        tile_size = int(tile_size_str)
        x = int(x_str)
        y = int(y_str)
        # Iterate over a copy: detections may be removed from the list below.
        for det in list(annotation.detections):
            x1 = det.x * tile_size
            y1 = det.y * tile_size
            det_abs = Detection(x + x1, y + y1, det.w * tile_size, det.h * tile_size,
                                det.cls, det.confidence)
            detections = self._tile_detections.setdefault(annotation.original_media_name, [])
            if det_abs in detections:
                # The same detection was already reported by an overlapping tile.
                annotation.detections.remove(det)
            else:
                detections.append(det_abs)

    cdef bint is_valid_image_annotation(self, Annotation annotation):
        if constants_inf.SPLIT_SUFFIX in annotation.name:
            self.remove_tiled_duplicates(annotation)
        if not annotation.detections:
            return False
        return True
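    # is_valid_video_annotation (below) throttles which video frames become
    # annotations: a frame passes if it is the first valid detection, enough
    # time has elapsed since the last annotation, more objects are present than
    # before, an object moved farther than the tracking distance, or an
    # object's confidence rose by at least tracking_probability_increase.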
    cdef bint is_valid_video_annotation(self, Annotation annotation, AIRecognitionConfig ai_config):
        if constants_inf.SPLIT_SUFFIX in annotation.name:
            self.remove_tiled_duplicates(annotation)
        if not annotation.detections:
            return False
        # First valid annotation, always accept.
        if self._previous_annotation is None:
            return True
        # Enough time has passed since the last annotation.
        if annotation.time >= self._previous_annotation.time + (ai_config.frame_recognition_seconds * 1000):
            return True
        # More objects detected than before.
        if len(annotation.detections) > len(self._previous_annotation.detections):
            return True
        cdef:
            Detection current_det, prev_det
            double dx, dy, distance_sq, min_distance_sq
            Detection closest_det
        # Check each detection against the previous frame.
        for current_det in annotation.detections:
            min_distance_sq = 1e18  # initialize with a large value
            closest_det = None
            # Find the closest detection in the previous frame.
            for prev_det in self._previous_annotation.detections:
                dx = current_det.x - prev_det.x
                dy = current_det.y - prev_det.y
                distance_sq = dx * dx + dy * dy
                if distance_sq < min_distance_sq:
                    min_distance_sq = distance_sq
                    closest_det = prev_det
            # Check whether the detection moved beyond the tracking distance.
            dist_px = ai_config.tracking_distance_confidence * self.model_width
            dist_px_sq = dist_px * dist_px
            if min_distance_sq > dist_px_sq:
                return True
            # Check for a probability increase.
            if current_det.confidence >= closest_det.confidence + ai_config.tracking_probability_increase:
                return True
        return False
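# Minimal usage sketch (assumptions: `loader_client`, `remote_handler` and
# `cmd` come from the surrounding application; the methods are cdef, so this
# call pattern applies to Cython-level callers):
#
#     inference = Inference(loader_client, remote_handler)  # loads or builds the engine
#     inference.run_inference(cmd)  # cmd.data carries a msgpack'd AIRecognitionConfig
#     inference.stop()              # sets stop_signal to end video processing early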