Files
annotations/Azaion.Inference/inference.pyx
T
Oleksandr Bezdieniezhnykh fde9a9f418 add altitude + camera spec component and calc tile size by this
also restrict detections to be no bigger than in classes.json
2025-09-23 01:48:10 +03:00

501 lines
21 KiB
Cython

import mimetypes
from pathlib import Path
import cv2
import msgpack
import numpy as np
cimport constants_inf
from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus
from annotation cimport Detection, Annotation
from ai_config cimport AIRecognitionConfig
import pynvml
from threading import Thread
from remote_command_inf cimport RemoteCommand, CommandType
cdef int tensor_gpu_index
cdef int check_tensor_gpu_index():
    """Return the index of the first NVIDIA GPU usable for TensorRT, or -1.

    A device qualifies when its CUDA compute capability is at least 6.1.
    -1 is returned when NVML reports no devices, when no device qualifies,
    or when any NVML call fails. NVML is always shut down before returning.
    """
    try:
        pynvml.nvmlInit()
        deviceCount = pynvml.nvmlDeviceGetCount()
        if deviceCount == 0:
            constants_inf.logerror(<str>'No NVIDIA GPUs found.')
            return -1
        for i in range(deviceCount):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            major, minor = pynvml.nvmlDeviceGetCudaComputeCapability(handle)
            if major > 6 or (major == 6 and minor >= 1):
                constants_inf.log(<str>'found NVIDIA GPU!')
                return i
        constants_inf.logerror(<str>'NVIDIA GPU doesnt support TensorRT!')
        return -1
    except pynvml.NVMLError:
        return -1
    finally:
        try:
            pynvml.nvmlShutdown()
        except Exception:
            # Best-effort cleanup: shutdown fails when init never succeeded.
            # Only ordinary exceptions are swallowed so that KeyboardInterrupt
            # and SystemExit still propagate.
            constants_inf.logerror(<str>'Failed to shutdown pynvml cause probably no NVIDIA GPU')
# Probe the GPU once at import time and load only the matching backend:
# ONNX when no suitable GPU was found (negative index), TensorRT otherwise.
tensor_gpu_index = check_tensor_gpu_index()
if tensor_gpu_index < 0:
    from onnx_engine import OnnxEngine
else:
    from tensorrt_engine import TensorRTEngine
cdef class Inference:
def __init__(self, loader_client, remote_handler):
    """Store the collaborators, reset all state and try to bring up the engine.

    loader_client is used to download/upload model resources;
    remote_handler is used to send results back to clients.
    """
    # External collaborators.
    self.remote_handler = remote_handler
    self.loader_client = loader_client

    # Engine state; the model dimensions stay 0 until an engine is loaded.
    self.engine = None
    self.is_building_engine = False
    self._converted_model_bytes = None
    self.ai_availability_status = AIAvailabilityStatus()
    self.model_input = None
    self.model_height = 0
    self.model_width = 0

    # Per-run state.
    self.detection_counts = {}
    self.stop_signal = False

    # Must run last: it relies on everything initialised above.
    self.init_ai()
cdef bytes get_onnx_engine_bytes(self):
    """Download the ONNX model through the loader client and return its bytes.

    Sets the availability status to DOWNLOADING before the request.
    Raises Exception carrying the loader's error when the download fails.
    """
    self.ai_availability_status.set_status(AIAvailabilityEnum.DOWNLOADING)
    download = self.loader_client.load_big_small_resource(
        constants_inf.AI_ONNX_MODEL_FILE,
        constants_inf.MODELS_FOLDER,
    )
    if download.err is None:
        return download.data
    raise Exception(download.err)
cdef convert_and_upload_model(self, bytes onnx_engine_bytes, str engine_filename):
    """Convert an ONNX model to a TensorRT engine and upload the result.

    Started on a background thread by init_ai. On success the converted
    bytes are kept in self._converted_model_bytes so that the next init_ai
    call can build the engine from them. A failed upload is not fatal: the
    converted model is still kept for local use. Any other failure sets the
    ERROR status and discards the bytes. is_building_engine is always
    cleared on exit.
    """
    try:
        self.ai_availability_status.set_status(AIAvailabilityEnum.CONVERTING)
        models_dir = constants_inf.MODELS_FOLDER
        model_bytes = TensorRTEngine.convert_from_onnx(onnx_engine_bytes)
        self.ai_availability_status.set_status(AIAvailabilityEnum.UPLOADING)
        res = self.loader_client.upload_big_small_resource(model_bytes, engine_filename, models_dir)
        if res.err is not None:
            # The WARNING status is replaced by ENABLED just below, so also
            # log the failure; otherwise it would never be visible.
            constants_inf.logerror(<str>f"Failed to upload converted model: {res.err}")
            self.ai_availability_status.set_status(AIAvailabilityEnum.WARNING, <str>f"Failed to upload converted model: {res.err}")
        self._converted_model_bytes = model_bytes
        self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
    except Exception as e:
        self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, <str> str(e))
        self._converted_model_bytes = None
    finally:
        self.is_building_engine = False
cdef init_ai(self):
    """Make sure an inference engine is loaded, if one can be right now.

    Does nothing when an engine already exists or a conversion is running.
    Otherwise, in order:
      1. build a TensorRT engine from bytes left by a finished conversion;
      2. on a TensorRT-capable GPU, download the prebuilt engine; if that
         fails, download the ONNX model and convert it on a daemon thread
         (self.engine stays None until a later call picks up the result);
      3. without such a GPU, load the ONNX engine directly.
    Failures are reported through ai_availability_status, never raised.
    """
    constants_inf.log(<str> 'init AI...')
    try:
        if self.engine is not None:
            return
        if self.is_building_engine:
            return

        if self._converted_model_bytes is not None:
            try:
                self.engine = TensorRTEngine(self._converted_model_bytes)
                self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
                self.model_height, self.model_width = self.engine.get_input_shape()
            except Exception as e:
                self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, <str> str(e))
            finally:
                self._converted_model_bytes = None  # Consume the bytes
            return

        models_dir = constants_inf.MODELS_FOLDER
        if tensor_gpu_index > -1:
            # Resolved outside the inner try: the fallback handler below
            # needs this name, so a failure here must go straight to the
            # outer handler instead of surfacing as an unbound local after
            # a wasted ONNX download.
            engine_filename = TensorRTEngine.get_engine_filename(0)
            try:
                self.ai_availability_status.set_status(AIAvailabilityEnum.DOWNLOADING)
                res = self.loader_client.load_big_small_resource(engine_filename, models_dir)
                if res.err is not None:
                    raise Exception(res.err)
                self.engine = TensorRTEngine(res.data)
                self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
            except Exception as e:
                # No usable prebuilt engine: convert from ONNX in the background.
                self.ai_availability_status.set_status(AIAvailabilityEnum.WARNING, <str>str(e))
                onnx_engine_bytes = self.get_onnx_engine_bytes()
                self.is_building_engine = True
                thread = Thread(target=self.convert_and_upload_model, args=(onnx_engine_bytes, engine_filename))
                thread.daemon = True
                thread.start()
                return
        else:
            self.engine = OnnxEngine(<bytes>self.get_onnx_engine_bytes())
            self.is_building_engine = False

        self.model_height, self.model_width = self.engine.get_input_shape()
    except Exception as e:
        self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, <str>str(e))
        self.is_building_engine = False
cdef preprocess(self, frames):
    """Turn a sequence of frames into one stacked network input blob.

    Each frame is passed through cv2.dnn.blobFromImage with the model's
    input size, 1/255 scaling, zero mean, swapRB enabled and no cropping;
    the per-frame blobs are then stacked with np.vstack.
    """
    target_size = (self.model_width, self.model_height)
    blobs = []
    for frame in frames:
        blob = cv2.dnn.blobFromImage(
            frame,
            scalefactor=1.0 / 255.0,
            size=target_size,
            mean=(0, 0, 0),
            swapRB=True,
            crop=False,
        )
        blobs.append(blob)
    return np.vstack(blobs)
cdef postprocess(self, output, ai_config):
    """Convert raw engine output into one filtered detection list per image.

    output[0][k] is iterated as the rows for the k-th image of the batch.
    Each row is read as (x1, y1, x2, y2, confidence, class_id) with the box
    corners in model-input pixels; a row with confidence 0 ends the valid
    rows. Boxes are converted to centre/size form normalised by the model
    dimensions, rows below ai_config.probability_threshold are dropped, and
    overlapping boxes are reduced via remove_overlapping_detections.

    Raises RuntimeError (chained to the original error) on any failure.
    """
    cdef list[Detection] detections = []
    cdef int ann_index
    cdef float x1, y1, x2, y2, conf, w, h
    cdef int class_id
    cdef list[list[Detection]] results = []
    try:
        for ann_index in range(len(output[0])):
            # Safe to reuse: the list appended to results below is a new one.
            detections.clear()
            for det in output[0][ann_index]:
                if det[4] == 0:  # if confidence is 0 then valid points are over.
                    break
                x1 = det[0] / self.model_width
                y1 = det[1] / self.model_height
                x2 = det[2] / self.model_width
                y2 = det[3] / self.model_height
                conf = round(det[4], 2)
                class_id = int(det[5])
                x = (x1 + x2) / 2
                y = (y1 + y2) / 2
                w = x2 - x1
                h = y2 - y1
                if conf >= ai_config.probability_threshold:
                    detections.append(Detection(x, y, w, h, class_id, conf))
            filtered_detections = self.remove_overlapping_detections(detections, ai_config.tracking_intersection_threshold)
            results.append(filtered_detections)
        return results
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise RuntimeError(f"Failed to postprocess: {str(e)}") from e
cdef remove_overlapping_detections(self, list[Detection] detections, float confidence_threshold=0.6):
    """Keep one detection from each group of overlapping detections.

    Detections are visited in order. Every not-yet-suppressed detection
    anchors a group made of the later, not-yet-suppressed detections that
    overlap it according to Detection.overlaps(other, confidence_threshold).
    The group member with the highest confidence is kept; on equal
    confidence the lower class id wins, and on a full tie the later one
    wins. Despite its name, confidence_threshold is simply the value passed
    to overlaps().

    Returns a new list; the input list is not modified.
    """
    cdef Detection det1, det2, best
    filtered_output = []
    # A set gives O(1) membership tests; only membership is ever needed.
    suppressed = set()
    for det1_index in range(len(detections)):
        if det1_index in suppressed:
            continue
        det1 = detections[det1_index]
        best = det1
        best_index = det1_index
        for det2_index in range(det1_index + 1, len(detections)):
            # A detection that already lost to an earlier one must not be
            # reconsidered, otherwise it could re-enter the output.
            if det2_index in suppressed:
                continue
            det2 = detections[det2_index]
            if not det1.overlaps(det2, confidence_threshold):
                continue
            # Compare against the current winner, not the anchor; otherwise a
            # stronger detection could be evicted by a weaker later one.
            if best.confidence > det2.confidence or (
                    best.confidence == det2.confidence and best.cls < det2.cls):
                suppressed.add(det2_index)
            else:
                suppressed.add(best_index)
                best = det2
                best_index = det2_index
        filtered_output.append(detections[best_index])
        suppressed.add(best_index)
    return filtered_output
cdef bint is_video(self, str filepath):
    """Tell whether the MIME type guessed from the file name is a video type."""
    guessed = mimetypes.guess_type(<str>filepath)[0]
    if not guessed:
        # Unknown extension: treated as "not a video".
        return False
    return guessed.startswith("video")
cdef split_list_extend(self, lst, chunk_size):
    """Split lst into chunks of exactly chunk_size items.

    When the last chunk is short it is padded by repeating its last
    element. An empty lst yields an empty list of chunks. The chunks are
    new lists, so lst itself is never modified.
    """
    if len(lst) == 0:
        # Nothing to split; also avoids indexing into an empty chunk list.
        return []
    chunks = [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
    last_chunk = chunks[len(chunks) - 1]
    missing = chunk_size - len(last_chunk)
    if missing > 0:
        last_chunk.extend([last_chunk[len(last_chunk) - 1]] * missing)
    return chunks
cdef run_inference(self, RemoteCommand cmd):
    """Run detection over every media path listed in the command's config.

    The config is decoded from cmd.data. When no engine is available the
    current availability status is sent back to the client and nothing else
    happens. Otherwise the paths are split into images and videos, the
    per-media detection counters are reset, and images are processed before
    videos.

    Raises Exception when the decoded config is None.
    """
    cdef list[str] videos = []
    cdef list[str] images = []
    cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_msgpack(cmd.data)
    if ai_config is None:
        raise Exception('ai recognition config is empty')

    self.stop_signal = False
    self.init_ai()
    if self.engine is None:
        constants_inf.log(<str> "AI engine not available. Conversion may be in progress. Skipping inference.")
        response = RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, self.ai_availability_status.serialize())
        self.remote_handler.send(cmd.client_id, <RemoteCommand>response)
        return

    self.detection_counts = {}
    for media_path in ai_config.paths:
        counter_key = Path(<str>media_path).stem.replace(" ", "")
        self.detection_counts[counter_key] = 0
        if not self.is_video(media_path):
            images.append(media_path)
        else:
            videos.append(media_path)

    # images first, it's faster
    if images:
        constants_inf.log(<str>f'run inference on {" ".join(images)}...')
        self._process_images(cmd, ai_config, images)
    for video_path in videos:
        constants_inf.log(<str>f'run inference on {video_path}...')
        self._process_video(cmd, ai_config, video_path)
cdef _process_video(self, RemoteCommand cmd, AIRecognitionConfig ai_config, str video_name):
    """Run detection on every N-th frame of a video and report annotations.

    Every ai_config.frame_period_recognition-th frame is collected into a
    batch of self.engine.get_batch_size() frames. A full batch is run through
    preprocess -> engine -> postprocess, and every annotation accepted by
    is_valid_video_annotation is sent with its JPEG-encoded frame. Frames
    left over when the stream ends are flushed in a final batch padded by
    repeating the last frame, the same way split_list_extend pads image
    batches; results for the padding are ignored. Frames pending when
    stop_signal is raised are discarded. The capture is always released,
    and detection statuses are sent once the video is done.
    """
    cdef int frame_count = 0
    cdef list batch_frames = []
    cdef list[int] batch_timestamps = []
    cdef Annotation annotation
    self._previous_annotation = None
    # Depends only on the video, so compute it once.
    original_media_name = Path(<str>video_name).stem.replace(" ", "")
    batch_size = self.engine.get_batch_size()
    v_input = cv2.VideoCapture(<str>video_name)
    try:
        while v_input.isOpened() and not self.stop_signal:
            ret, frame = v_input.read()
            end_of_stream = not ret or frame is None
            if not end_of_stream:
                frame_count += 1
                if frame_count % ai_config.frame_period_recognition == 0:
                    batch_frames.append(frame)
                    batch_timestamps.append(int(v_input.get(cv2.CAP_PROP_POS_MSEC)))

            real_count = len(batch_frames)
            if real_count == batch_size or (end_of_stream and real_count > 0):
                # Pad a trailing partial batch up to the full batch size.
                padded_frames = batch_frames + [batch_frames[real_count - 1]] * (batch_size - real_count)
                input_blob = self.preprocess(padded_frames)
                outputs = self.engine.run(input_blob)
                list_detections = self.postprocess(outputs, ai_config)
                # Only the real frames produce annotations, never the padding.
                for i in range(min(real_count, len(list_detections))):
                    detections = list_detections[i]
                    name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}'
                    annotation = Annotation(name, original_media_name, batch_timestamps[i], detections)
                    if self.is_valid_video_annotation(annotation, ai_config):
                        _, image = cv2.imencode('.jpg', batch_frames[i])
                        annotation.image = image.tobytes()
                        self._previous_annotation = annotation
                        self.on_annotation(cmd, annotation)
                batch_frames.clear()
                batch_timestamps.clear()

            if end_of_stream:
                break
    finally:
        # Release the capture even when inference raises.
        v_input.release()
    self.send_detection_status(cmd.client_id)
cdef on_annotation(self, RemoteCommand cmd, Annotation annotation):
    """Send one annotation to the requesting client and count it for its media."""
    cdef RemoteCommand payload = RemoteCommand(CommandType.INFERENCE_DATA, annotation.serialize())
    self.remote_handler.send(cmd.client_id, payload)

    media_name = annotation.original_media_name
    already_counted = self.detection_counts.get(media_name, 0)
    self.detection_counts[media_name] = already_counted + 1
cdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths):
    """Run detection on each image, tiling the large ones.

    For every readable image the ground sampling distance is computed as
    sensor_width * altitude / (focal_length * image_width). An image no
    larger than 1.5x the model input in both dimensions is processed whole;
    otherwise it is split into square tiles of
    METERS_IN_TILE / ground_sampling_distance pixels. The resulting frames
    are processed exactly once, in batches of the engine's batch size, and
    detection statuses are sent after each batch. Unreadable images are
    logged and skipped.
    """
    cdef list frame_data
    self._tile_detections = {}
    batch_size = self.engine.get_batch_size()
    for path in image_paths:
        frame = cv2.imread(<str>path)
        # Check before touching frame.shape: imread yields None on failure.
        if frame is None:
            constants_inf.logerror(<str>f'Failed to read image {path}')
            continue
        img_h, img_w, _ = frame.shape
        frame_data = []
        original_media_name = Path(<str> path).stem.replace(" ", "")
        ground_sampling_distance = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w)
        constants_inf.log(<str>f'ground sampling distance: {ground_sampling_distance}')
        if img_h <= 1.5 * self.model_height and img_w <= 1.5 * self.model_width:
            frame_data.append((frame, original_media_name, f'{original_media_name}_000000'))
        else:
            tile_size = int(constants_inf.METERS_IN_TILE / ground_sampling_distance)
            constants_inf.log(<str> f'calc tile size: {tile_size}')
            res = self.split_to_tiles(frame, path, tile_size, ai_config.big_image_tile_overlap_percent)
            frame_data.extend(res)

        if not frame_data:
            # Tiling produced nothing; there is nothing to batch.
            continue
        # One pass per image: every frame/tile is inferred exactly once, with
        # the ground sampling distance that belongs to this image.
        for chunk in self.split_list_extend(frame_data, batch_size):
            self._process_images_inner(cmd, ai_config, chunk, ground_sampling_distance)
            self.send_detection_status(cmd.client_id)
cdef send_detection_status(self, client_id):
    """Send one INFERENCE_STATUS message per tracked media, then reset counts.

    Each message is a msgpack-encoded dict with "mn" (media name) and "dc"
    (detection count). Sending is best-effort: a failure for one media is
    logged and does not stop the remaining ones. detection_counts is
    cleared afterwards.
    """
    for media_name, detection_count in self.detection_counts.items():
        try:
            status = {
                "mn": media_name,
                "dc": detection_count
            }
            self.remote_handler.send(client_id, <RemoteCommand>RemoteCommand(CommandType.INFERENCE_STATUS, msgpack.packb(status)))
        except Exception as e:
            # Keep going, but leave a trace instead of swallowing silently.
            constants_inf.logerror(<str>f'Failed to send detection status for {media_name}: {e}')
    self.detection_counts.clear()
cdef split_to_tiles(self, frame, path, tile_size, overlap_percent):
    """Cut an image into overlapping square tiles.

    Tiles start every tile_size * (1 - overlap_percent / 100) pixels. A tile
    that would cross the right or bottom border is shifted back so that it
    ends exactly on the border, and it is skipped when the previous tile
    already reached that border. When the image is smaller than tile_size
    in a dimension, a single tile clamped to the image is produced for that
    dimension.

    Returns a list of (tile, original_media_name, name) tuples; name encodes
    the tile size and the tile's top-left x/y offset.
    Raises ValueError when tile_size or the resulting stride is below 1.
    """
    constants_inf.log(<str>f'splitting image {path} to tiles...')
    img_h, img_w, _ = frame.shape
    stride = int(tile_size * (1 - overlap_percent / 100))
    if tile_size < 1 or stride < 1:
        # range() would reject a zero step anyway; fail with a clear message.
        raise ValueError(f'invalid tiling: tile_size={tile_size}, overlap_percent={overlap_percent}')

    results = []
    original_media_name = Path(<str> path).stem.replace(" ", "")
    for row_start in range(0, img_h, stride):
        y = row_start
        y_end = min(y + tile_size, img_h)
        # correct y for the close-to-border tiles
        if y_end - y < tile_size:
            if y > 0 and img_h - (y - stride) <= tile_size:
                continue  # the previous row already covered the last gap
            y = max(0, img_h - tile_size)  # never a negative offset
        for col_start in range(0, img_w, stride):
            x = col_start
            x_end = min(x + tile_size, img_w)
            # correct x for the close-to-border tiles
            if x_end - x < tile_size:
                if x > 0 and img_w - (x - stride) <= tile_size:
                    continue  # the previous tile already covered the last gap
                x = max(0, img_w - tile_size)  # never a negative offset

            tile = frame[y:y_end, x:x_end]
            name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{tile_size:04d}_{x:04d}_{y:04d}!_000000'
            results.append((tile, original_media_name, name))
    return results
cdef _process_images_inner(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list frame_data, double ground_sampling_distance):
    """Run one batch of (frame, original_media_name, name) entries.

    The frames go through preprocess -> engine -> postprocess. Every
    annotation accepted by is_valid_image_annotation is sent together with
    its JPEG-encoded frame. Entries whose name was already handled in this
    batch are skipped: a batch may be padded with copies of its last entry,
    and those copies must not produce duplicate annotations.
    """
    cdef list frames, original_media_names, names
    cdef Annotation annotation
    cdef int i
    if not frame_data:
        # zip(*[]) yields nothing and the unpacking below would fail.
        return
    frames, original_media_names, names = map(list, zip(*frame_data))
    input_blob = self.preprocess(frames)
    outputs = self.engine.run(input_blob)
    list_detections = self.postprocess(outputs, ai_config)
    seen_names = set()
    for i in range(min(len(list_detections), len(names))):
        if names[i] in seen_names:
            continue  # padding copy of an entry already processed
        seen_names.add(names[i])
        annotation = Annotation(names[i], original_media_names[i], 0, list_detections[i])
        if self.is_valid_image_annotation(annotation, ground_sampling_distance, frames[i].shape):
            constants_inf.log(<str> f'Detected {annotation}')
            _, image = cv2.imencode('.jpg', frames[i])
            annotation.image = image.tobytes()
            self.on_annotation(cmd, annotation)
cdef stop(self):
    """Raise the stop flag; the video frame loop checks it on every iteration."""
    self.stop_signal = True
cdef remove_tiled_duplicates(self, Annotation annotation):
    """Drop detections of a tile that were already seen for the same image.

    The tile size and the tile's x/y offset are parsed from the annotation
    name (the '_'-separated part between SPLIT_SUFFIX and the last '!').
    Each detection is rebuilt in absolute terms (offset + position scaled by
    the tile size) and compared, via Detection equality, with those cached
    per original media name in self._tile_detections. New ones are kept and
    cached; annotation.detections is replaced with the kept list.
    """
    cdef list[Detection] kept = []
    suffix = constants_inf.SPLIT_SUFFIX

    # Tile geometry is encoded in the name as "<size>_<x>_<y>".
    closing = annotation.name.rindex('!')
    opening = annotation.name.index(suffix) + len(suffix)
    size_part, x_part, y_part = annotation.name[opening:closing].split('_')
    tile_size = int(size_part)
    offset_x = int(x_part)
    offset_y = int(y_part)

    seen = self._tile_detections.setdefault(annotation.original_media_name, [])
    for det in annotation.detections:
        absolute = Detection(
            offset_x + det.x * tile_size,
            offset_y + det.y * tile_size,
            det.w * tile_size,
            det.h * tile_size,
            det.cls,
            det.confidence,
        )
        if absolute in seen:
            continue
        kept.append(det)
        seen.append(absolute)

    annotation.detections = kept
cdef bint is_valid_image_annotation(self, Annotation annotation, double ground_sampling_distance, frame_shape):
    """Filter an image annotation in place and tell whether anything is left.

    For tiled images, detections already reported by other tiles are removed
    first. Each remaining detection is then converted to metres (normalised
    size * frame size * ground_sampling_distance) and dropped when its width
    or height exceeds the max_object_size_meters of its class.
    Returns True when at least one detection survives.
    """
    cdef list[Detection] kept = []
    if constants_inf.SPLIT_SUFFIX in annotation.name:
        self.remove_tiled_duplicates(annotation)

    img_h, img_w, _ = frame_shape
    if annotation.detections:
        constants_inf.log(<str> f'Initial ann: {annotation}')

    for det in annotation.detections:
        m_w = det.w * img_w * ground_sampling_distance
        m_h = det.h * img_h * ground_sampling_distance
        class_info = constants_inf.annotations_dict[det.cls]
        max_size = class_info.max_object_size_meters
        if m_w > max_size or m_h > max_size:
            constants_inf.log(<str> f'Removed ({m_w} {m_h}) > {max_size}. class: {class_info.name}')
            continue
        kept.append(det)
        constants_inf.log(<str> f'Kept ({m_w} {m_h}) <= {max_size}. class: {class_info.name}')

    # Replace the old list with the filtered one.
    annotation.detections = kept
    return True if annotation.detections else False
cdef bint is_valid_video_annotation(self, Annotation annotation, AIRecognitionConfig ai_config):
    """Decide whether a video annotation is worth reporting.

    An annotation without detections is rejected. Otherwise it is accepted
    when any of the following holds:
      * there is no previously accepted annotation;
      * at least ai_config.frame_recognition_seconds passed since it;
      * it has more detections than the previous one;
      * some detection is farther than
        tracking_distance_confidence * model_width pixels from its closest
        previous detection;
      * some detection's confidence exceeds that of its closest previous
        detection by at least tracking_probability_increase.
    """
    cdef:
        Detection current_det, prev_det
        double dx, dy, distance_sq, min_distance_sq
        double max_dist_px, max_dist_px_sq
        Detection closest_det

    if constants_inf.SPLIT_SUFFIX in annotation.name:
        self.remove_tiled_duplicates(annotation)
    if not annotation.detections:
        return False

    # First valid annotation, always accept
    if self._previous_annotation is None:
        return True

    # Enough time has passed since last annotation
    if annotation.time >= self._previous_annotation.time + <long>(ai_config.frame_recognition_seconds * 1000):
        return True

    # More objects detected than before
    if len(annotation.detections) > len(self._previous_annotation.detections):
        return True

    # Loop-invariant tracking radius, in model-input pixels.
    max_dist_px = ai_config.tracking_distance_confidence * self.model_width
    max_dist_px_sq = max_dist_px * max_dist_px

    # Check each detection against previous frame
    for current_det in annotation.detections:
        min_distance_sq = 1e18  # Initialize with large value
        closest_det = None

        # Find the closest detection in previous frame
        for prev_det in self._previous_annotation.detections:
            # postprocess produces positions normalised by the model size;
            # scale back to pixels so the distance and the pixel threshold
            # use the same unit.
            dx = (current_det.x - prev_det.x) * self.model_width
            dy = (current_det.y - prev_det.y) * self.model_height
            distance_sq = dx * dx + dy * dy
            if distance_sq < min_distance_sq:
                min_distance_sq = distance_sq
                closest_det = prev_det

        # Check if beyond tracking distance (also true when nothing was close)
        if min_distance_sq > max_dist_px_sq:
            return True

        # Check probability increase
        if current_det.confidence >= closest_det.confidence + ai_config.tracking_probability_increase:
            return True

    return False