splitting python complete

This commit is contained in:
Oleksandr Bezdieniezhnykh
2025-08-12 14:48:56 +03:00
parent fc6e5db795
commit ad782bcbaa
31 changed files with 834 additions and 369 deletions
+87 -42
View File
@@ -1,5 +1,7 @@
import mimetypes
import time
from pathlib import Path
import cv2
import numpy as np
cimport constants_inf
@@ -54,6 +56,8 @@ cdef class Inference:
self.model_input = None
self.model_width = 0
self.model_height = 0
self.tile_width = 0
self.tile_height = 0
self.engine = None
self.is_building_engine = False
@@ -93,7 +97,7 @@ cdef class Inference:
except Exception as e:
updater_callback(f'Error. {str(e)}')
cdef init_ai(self):
cpdef init_ai(self):
if self.engine is not None:
return
@@ -114,6 +118,8 @@ cdef class Inference:
self.engine = OnnxEngine(res.data)
self.model_height, self.model_width = self.engine.get_input_shape()
self.tile_width = self.model_width
self.tile_height = self.model_height
cdef preprocess(self, frames):
blobs = [cv2.dnn.blobFromImage(frame,
@@ -211,11 +217,11 @@ cdef class Inference:
images.append(m)
# images first, it's faster
if len(images) > 0:
constants_inf.log(f'run inference on {" ".join(images)}...')
constants_inf.log(<str>f'run inference on {" ".join(images)}...')
self._process_images(cmd, ai_config, images)
if len(videos) > 0:
for v in videos:
constants_inf.log(f'run inference on {v}...')
constants_inf.log(<str>f'run inference on {v}...')
self._process_video(cmd, ai_config, v)
@@ -223,8 +229,10 @@ cdef class Inference:
cdef int frame_count = 0
cdef list batch_frames = []
cdef list[int] batch_timestamps = []
cdef Annotation annotation
self._previous_annotation = None
v_input = cv2.VideoCapture(<str>video_name)
while v_input.isOpened() and not self.stop_signal:
ret, frame = v_input.read()
@@ -244,8 +252,12 @@ cdef class Inference:
list_detections = self.postprocess(outputs, ai_config)
for i in range(len(list_detections)):
detections = list_detections[i]
annotation = Annotation(video_name, batch_timestamps[i], detections)
if self.is_valid_annotation(annotation, ai_config):
original_media_name = Path(<str>video_name).stem.replace(" ", "")
name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}'
annotation = Annotation(name, original_media_name, batch_timestamps[i], detections)
if self.is_valid_video_annotation(annotation, ai_config):
_, image = cv2.imencode('.jpg', batch_frames[i])
annotation.image = image.tobytes()
self._previous_annotation = annotation
@@ -256,71 +268,104 @@ cdef class Inference:
v_input.release()
cpdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths):
cdef list frame_data = []
cdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths):
cdef list frame_data
self._tile_detections = {}
for path in image_paths:
frame_data = []
frame = cv2.imread(<str>path)
img_h, img_w, _ = frame.shape
if frame is None:
constants_inf.logerror(<str>f'Failed to read image {path}')
continue
img_h, img_w, _ = frame.shape
original_media_name = Path(<str> path).stem.replace(" ", "")
if img_h <= 1.5 * self.model_height and img_w <= 1.5 * self.model_width:
frame_data.append((frame, path))
frame_data.append((frame, original_media_name, f'{original_media_name}_000000'))
else:
(split_frames, split_pats) = self.split_to_tiles(frame, path, img_w, img_h, ai_config.big_image_tile_overlap_percent)
frame_data.extend(zip(split_frames, split_pats))
res = self.split_to_tiles(frame, path, ai_config.big_image_tile_overlap_percent)
frame_data.extend(res)
if len(frame_data) > self.engine.get_batch_size():
for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()):
self._process_images_inner(cmd, ai_config, chunk)
for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()):
self._process_images_inner(cmd, ai_config, chunk)
cpdef split_to_tiles(self, frame, path, img_w, img_h, overlap_percent):
stride_w = self.model_width * (1 - overlap_percent / 100)
stride_h = self.model_height * (1 - overlap_percent / 100)
n_tiles_x = int(np.ceil((img_w - self.model_width) / stride_w)) + 1
n_tiles_y = int(np.ceil((img_h - self.model_height) / stride_h)) + 1
cpdef split_to_tiles(self, frame, path, overlap_percent):
constants_inf.log(<str>f'splitting image {path} to tiles...')
img_h, img_w, _ = frame.shape
stride_w = int(self.tile_width * (1 - overlap_percent / 100))
stride_h = int(self.tile_height * (1 - overlap_percent / 100))
results = []
for y_idx in range(n_tiles_y):
for x_idx in range(n_tiles_x):
y_start = y_idx * stride_w
x_start = x_idx * stride_h
original_media_name = Path(<str> path).stem.replace(" ", "")
for y in range(0, img_h, stride_h):
for x in range(0, img_w, stride_w):
x_end = min(x + self.tile_width, img_w)
y_end = min(y + self.tile_height, img_h)
# Ensure the tile doesn't go out of bounds
y_end = min(y_start + self.model_width, img_h)
x_end = min(x_start + self.model_height, img_w)
# correct x,y for the close-to-border tiles
if x_end - x < self.tile_width:
if img_w - (x - stride_w) <= self.tile_width:
continue # the previous tile already covered the last gap
x = img_w - self.tile_width
if y_end - y < self.tile_height:
if img_h - (y - stride_h) <= self.tile_height:
continue # the previous tile already covered the last gap
y = img_h - self.tile_height
# We need to re-calculate start if we are at the edge to get a full 1280x1280 tile
if y_end == img_h:
y_start = img_h - self.model_height
if x_end == img_w:
x_start = img_w - self.model_width
tile = frame[y_start:y_end, x_start:x_end]
name = path.stem + f'.tile_{x_start}_{y_start}' + path.suffix
results.append((tile, name))
tile = frame[y:y_end, x:x_end]
name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{x:04d}_{y:04d}!_000000'
results.append((tile, original_media_name, name))
return results
cpdef _process_images_inner(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list frame_data):
frames = [frame for frame, _ in frame_data]
cdef _process_images_inner(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list frame_data):
cdef list frames, original_media_names, names
cdef Annotation annotation
frames, original_media_names, names = map(list, zip(*frame_data))
input_blob = self.preprocess(frames)
outputs = self.engine.run(input_blob)
list_detections = self.postprocess(outputs, ai_config)
for i in range(len(list_detections)):
detections = list_detections[i]
annotation = Annotation(frame_data[i][1], 0, detections)
_, image = cv2.imencode('.jpg', frames[i])
annotation.image = image.tobytes()
self.on_annotation(cmd, annotation)
annotation = Annotation(names[i], original_media_names[i], 0, list_detections[i])
if self.is_valid_image_annotation(annotation):
_, image = cv2.imencode('.jpg', frames[i])
annotation.image = image.tobytes()
self.on_annotation(cmd, annotation)
cdef stop(self):
self.stop_signal = True
cdef bint is_valid_annotation(self, Annotation annotation, AIRecognitionConfig ai_config):
# No detections, invalid
cdef remove_tiled_duplicates(self, Annotation annotation):
right = annotation.name.rindex('!')
left = annotation.name.index(constants_inf.SPLIT_SUFFIX) + len(constants_inf.SPLIT_SUFFIX)
x_str, y_str = annotation.name[left:right].split('_')
x = int(x_str)
y = int(y_str)
for det in annotation.detections:
x1 = det.x * self.tile_width
y1 = det.y * self.tile_height
det_abs = Detection(x + x1, y + y1, det.w * self.tile_width, det.h * self.tile_height, det.cls, det.confidence)
detections = self._tile_detections.setdefault(annotation.original_media_name, [])
if det_abs in detections:
annotation.detections.remove(det)
else:
detections.append(det_abs)
cdef bint is_valid_image_annotation(self, Annotation annotation):
if constants_inf.SPLIT_SUFFIX in annotation.name:
self.remove_tiled_duplicates(annotation)
if not annotation.detections:
return False
return True
cdef bint is_valid_video_annotation(self, Annotation annotation, AIRecognitionConfig ai_config):
if constants_inf.SPLIT_SUFFIX in annotation.name:
self.remove_tiled_duplicates(annotation)
if not annotation.detections:
return False