mirror of
https://github.com/azaion/detections.git
synced 2026-04-22 06:56:31 +00:00
27f4aceb52
- Updated the `Inference` class to replace the `get_onnx_engine_bytes` method with `download_model`, allowing for dynamic model loading based on a specified filename. - Modified the `convert_and_upload_model` method to accept `source_bytes` instead of `onnx_engine_bytes`, enhancing flexibility in model conversion. - Introduced a new property `engine_name` to the `Inference` class for better access to engine details. - Adjusted the `AIRecognitionConfig` structure to include a new method pointer `from_dict`, improving configuration handling. - Updated various test cases to reflect changes in model paths and timeout settings, ensuring consistency and reliability in testing.
526 lines
23 KiB
Cython
526 lines
23 KiB
Cython
import mimetypes
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
cimport constants_inf
|
|
|
|
from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus
|
|
from annotation cimport Detection, Annotation
|
|
from ai_config cimport AIRecognitionConfig
|
|
from engines.inference_engine cimport InferenceEngine
|
|
from loader_http_client cimport LoaderHttpClient
|
|
from threading import Thread
|
|
from engines import EngineClass
|
|
|
|
|
|
cdef class Inference:
|
|
cdef LoaderHttpClient loader_client
|
|
cdef InferenceEngine engine
|
|
cdef object _annotation_callback
|
|
cdef object _status_callback
|
|
cdef Annotation _previous_annotation
|
|
cdef dict[str, list[Detection]] _tile_detections
|
|
cdef dict[str, int] detection_counts
|
|
cdef AIRecognitionConfig ai_config
|
|
cdef bint stop_signal
|
|
cdef public AIAvailabilityStatus ai_availability_status
|
|
cdef str model_input
|
|
cdef int model_width
|
|
cdef int model_height
|
|
cdef bytes _converted_model_bytes
|
|
cdef bint is_building_engine
|
|
|
|
def __init__(self, loader_client):
|
|
self.loader_client = loader_client
|
|
self._annotation_callback = None
|
|
self._status_callback = None
|
|
self.stop_signal = <bint>False
|
|
self.model_input = <str>None
|
|
self.model_width = 0
|
|
self.model_height = 0
|
|
self.detection_counts = {}
|
|
self.engine = <InferenceEngine>None
|
|
self.is_building_engine = <bint>False
|
|
self.ai_availability_status = AIAvailabilityStatus()
|
|
self._converted_model_bytes = <bytes>None
|
|
self.init_ai()
|
|
|
|
@property
|
|
def is_engine_ready(self):
|
|
return self.engine is not None
|
|
|
|
@property
|
|
def engine_name(self):
|
|
if self.engine is not None:
|
|
return self.engine.engine_name
|
|
return None
|
|
|
|
|
|
cdef bytes download_model(self, str filename):
|
|
models_dir = constants_inf.MODELS_FOLDER
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.DOWNLOADING)
|
|
res = self.loader_client.load_big_small_resource(filename, models_dir)
|
|
if res.err is not None:
|
|
raise Exception(res.err)
|
|
return res.data
|
|
|
|
cdef convert_and_upload_model(self, bytes source_bytes, str engine_filename):
|
|
try:
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.CONVERTING)
|
|
models_dir = constants_inf.MODELS_FOLDER
|
|
model_bytes = EngineClass.convert_from_source(source_bytes)
|
|
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.UPLOADING)
|
|
res = self.loader_client.upload_big_small_resource(model_bytes, engine_filename, models_dir)
|
|
if res.err is not None:
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.WARNING, <str>f"Failed to upload converted model: {res.err}")
|
|
|
|
self._converted_model_bytes = model_bytes
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
|
|
except Exception as e:
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, <str> str(e))
|
|
self._converted_model_bytes = <bytes>None
|
|
finally:
|
|
self.is_building_engine = <bint>False
|
|
|
|
cdef init_ai(self):
|
|
constants_inf.log(<str> 'init AI...')
|
|
try:
|
|
if self.engine is not None:
|
|
return
|
|
if self.is_building_engine:
|
|
return
|
|
|
|
if self._converted_model_bytes is not None:
|
|
try:
|
|
self.engine = EngineClass(self._converted_model_bytes)
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
|
|
self.model_height, self.model_width = self.engine.get_input_shape()
|
|
except Exception as e:
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, <str> str(e))
|
|
finally:
|
|
self._converted_model_bytes = <bytes>None
|
|
return
|
|
|
|
models_dir = constants_inf.MODELS_FOLDER
|
|
engine_filename = EngineClass.get_engine_filename()
|
|
if engine_filename is not None:
|
|
try:
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.DOWNLOADING)
|
|
res = self.loader_client.load_big_small_resource(engine_filename, models_dir)
|
|
if res.err is not None:
|
|
raise Exception(res.err)
|
|
self.engine = EngineClass(res.data)
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
|
|
except Exception as e:
|
|
source_filename = EngineClass.get_source_filename()
|
|
if source_filename is None:
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, <str>f"Pre-built engine not found: {str(e)}")
|
|
return
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.WARNING, <str>str(e))
|
|
source_bytes = self.download_model(source_filename)
|
|
self.is_building_engine = True
|
|
|
|
thread = Thread(target=self.convert_and_upload_model, args=(source_bytes, engine_filename))
|
|
thread.daemon = True
|
|
thread.start()
|
|
return
|
|
else:
|
|
self.engine = EngineClass(<bytes>self.download_model(constants_inf.AI_ONNX_MODEL_FILE))
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED)
|
|
self.is_building_engine = False
|
|
|
|
self.model_height, self.model_width = self.engine.get_input_shape()
|
|
except Exception as e:
|
|
self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, <str>str(e))
|
|
self.is_building_engine = False
|
|
|
|
|
|
cdef preprocess(self, frames):
|
|
blobs = [cv2.dnn.blobFromImage(frame,
|
|
scalefactor=1.0 / 255.0,
|
|
size=(self.model_width, self.model_height),
|
|
mean=(0, 0, 0),
|
|
swapRB=True,
|
|
crop=False)
|
|
for frame in frames]
|
|
return np.vstack(blobs)
|
|
|
|
cdef postprocess(self, output, ai_config):
|
|
cdef list[Detection] detections = []
|
|
cdef int ann_index
|
|
cdef float x1, y1, x2, y2, conf, cx, cy, w, h
|
|
cdef int class_id
|
|
cdef list[list[Detection]] results = []
|
|
try:
|
|
for ann_index in range(len(output[0])):
|
|
detections.clear()
|
|
for det in output[0][ann_index]:
|
|
if det[4] == 0: # if confidence is 0 then valid points are over.
|
|
break
|
|
x1 = det[0] / self.model_width
|
|
y1 = det[1] / self.model_height
|
|
x2 = det[2] / self.model_width
|
|
y2 = det[3] / self.model_height
|
|
conf = round(det[4], 2)
|
|
class_id = int(det[5])
|
|
|
|
x = (x1 + x2) / 2
|
|
y = (y1 + y2) / 2
|
|
w = x2 - x1
|
|
h = y2 - y1
|
|
if conf >= ai_config.probability_threshold:
|
|
detections.append(Detection(x, y, w, h, class_id, conf))
|
|
filtered_detections = self.remove_overlapping_detections(detections, ai_config.tracking_intersection_threshold)
|
|
results.append(filtered_detections)
|
|
return results
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to postprocess: {str(e)}")
|
|
|
|
cdef remove_overlapping_detections(self, list[Detection] detections, float confidence_threshold=0.6):
|
|
cdef Detection det1, det2
|
|
filtered_output = []
|
|
filtered_out_indexes = []
|
|
|
|
for det1_index in range(len(detections)):
|
|
if det1_index in filtered_out_indexes:
|
|
continue
|
|
det1 = detections[det1_index]
|
|
res = det1_index
|
|
for det2_index in range(det1_index + 1, len(detections)):
|
|
det2 = detections[det2_index]
|
|
if det1.overlaps(det2, confidence_threshold):
|
|
if det1.confidence > det2.confidence or (
|
|
det1.confidence == det2.confidence and det1.cls < det2.cls): # det1 has higher confidence or lower class_id
|
|
filtered_out_indexes.append(det2_index)
|
|
else:
|
|
filtered_out_indexes.append(res)
|
|
res = det2_index
|
|
filtered_output.append(detections[res])
|
|
filtered_out_indexes.append(res)
|
|
return filtered_output
|
|
|
|
cdef bint is_video(self, str filepath):
|
|
mime_type, _ = mimetypes.guess_type(<str>filepath)
|
|
return <bint>(mime_type and mime_type.startswith("video"))
|
|
|
|
cdef split_list_extend(self, lst, chunk_size):
|
|
chunks = [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
|
|
|
|
# If the last chunk is smaller than the desired chunk_size, extend it by duplicating its last element.
|
|
last_chunk = chunks[len(chunks) - 1]
|
|
if len(last_chunk) < chunk_size:
|
|
last_elem = last_chunk[len(last_chunk)-1]
|
|
while len(last_chunk) < chunk_size:
|
|
last_chunk.append(last_elem)
|
|
return chunks
|
|
|
|
cpdef run_detect(self, dict config_dict, object annotation_callback, object status_callback=None):
|
|
cdef list[str] videos = []
|
|
cdef list[str] images = []
|
|
cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_dict(config_dict)
|
|
if ai_config is None:
|
|
raise Exception('ai recognition config is empty')
|
|
|
|
self._annotation_callback = annotation_callback
|
|
self._status_callback = status_callback
|
|
self.stop_signal = <bint>False
|
|
self.init_ai()
|
|
if self.engine is None:
|
|
constants_inf.log(<str> "AI engine not available. Conversion may be in progress. Skipping inference.")
|
|
return
|
|
|
|
self.detection_counts = {}
|
|
for p in ai_config.paths:
|
|
media_name = Path(<str>p).stem.replace(" ", "")
|
|
self.detection_counts[media_name] = 0
|
|
if self.is_video(p):
|
|
videos.append(p)
|
|
else:
|
|
images.append(p)
|
|
if len(images) > 0:
|
|
constants_inf.log(<str>f'run inference on {" ".join(images)}...')
|
|
self._process_images(ai_config, images)
|
|
if len(videos) > 0:
|
|
for v in videos:
|
|
constants_inf.log(<str>f'run inference on {v}...')
|
|
self._process_video(ai_config, v)
|
|
|
|
cpdef list detect_single_image(self, bytes image_bytes, dict config_dict):
|
|
cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_dict(config_dict)
|
|
self.init_ai()
|
|
if self.engine is None:
|
|
raise RuntimeError("AI engine not available")
|
|
|
|
img_array = np.frombuffer(image_bytes, dtype=np.uint8)
|
|
frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
|
|
if frame is None:
|
|
raise ValueError("Invalid image data")
|
|
|
|
cdef int batch_size = self.engine.get_batch_size()
|
|
frames = [frame] * batch_size
|
|
input_blob = self.preprocess(frames)
|
|
outputs = self.engine.run(input_blob)
|
|
list_detections = self.postprocess(outputs, ai_config)
|
|
if not list_detections:
|
|
return []
|
|
|
|
cdef list[Detection] detections = list_detections[0]
|
|
if ai_config.focal_length > 0 and ai_config.sensor_width > 0:
|
|
img_h, img_w = frame.shape[0], frame.shape[1]
|
|
gsd = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w)
|
|
detections = [
|
|
d for d in detections
|
|
if d.w * img_w * gsd <= constants_inf.annotations_dict[d.cls].max_object_size_meters
|
|
and d.h * img_h * gsd <= constants_inf.annotations_dict[d.cls].max_object_size_meters
|
|
]
|
|
return detections
|
|
|
|
cdef _process_video(self, AIRecognitionConfig ai_config, str video_name):
|
|
cdef int frame_count = 0
|
|
cdef int batch_count = 0
|
|
cdef list batch_frames = []
|
|
cdef list[long] batch_timestamps = []
|
|
cdef Annotation annotation
|
|
self._previous_annotation = <Annotation>None
|
|
|
|
v_input = cv2.VideoCapture(<str>video_name)
|
|
if not v_input.isOpened():
|
|
constants_inf.logerror(<str>f'Failed to open video: {video_name}')
|
|
return
|
|
total_frames = int(v_input.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
fps = v_input.get(cv2.CAP_PROP_FPS)
|
|
width = int(v_input.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
height = int(v_input.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
constants_inf.log(<str>f'Video: {total_frames} frames, {fps:.1f} fps, {width}x{height}')
|
|
while v_input.isOpened() and not self.stop_signal:
|
|
ret, frame = v_input.read()
|
|
if not ret or frame is None:
|
|
break
|
|
|
|
frame_count += 1
|
|
if frame_count % ai_config.frame_period_recognition == 0:
|
|
batch_frames.append(frame)
|
|
batch_timestamps.append(<long>v_input.get(cv2.CAP_PROP_POS_MSEC))
|
|
|
|
if len(batch_frames) == self.engine.get_batch_size():
|
|
batch_count += 1
|
|
constants_inf.log(<str>f'Video batch {batch_count}: frame {frame_count}/{total_frames} ({frame_count*100//total_frames}%)')
|
|
input_blob = self.preprocess(batch_frames)
|
|
|
|
outputs = self.engine.run(input_blob)
|
|
|
|
list_detections = self.postprocess(outputs, ai_config)
|
|
total_dets = sum(len(d) for d in list_detections)
|
|
if total_dets > 0:
|
|
constants_inf.log(<str>f'Video batch {batch_count}: {total_dets} detections from postprocess')
|
|
for i in range(len(list_detections)):
|
|
detections = list_detections[i]
|
|
|
|
original_media_name = Path(<str>video_name).stem.replace(" ", "")
|
|
name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}'
|
|
annotation = Annotation(name, original_media_name, batch_timestamps[i], detections)
|
|
|
|
if detections:
|
|
valid = self.is_valid_video_annotation(annotation, ai_config)
|
|
constants_inf.log(<str>f'Video frame {name}: {len(detections)} dets, valid={valid}')
|
|
if valid:
|
|
_, image = cv2.imencode('.jpg', batch_frames[i])
|
|
annotation.image = image.tobytes()
|
|
self._previous_annotation = annotation
|
|
self.on_annotation(annotation, frame_count, total_frames)
|
|
else:
|
|
self.is_valid_video_annotation(annotation, ai_config)
|
|
|
|
batch_frames.clear()
|
|
batch_timestamps.clear()
|
|
v_input.release()
|
|
constants_inf.log(<str>f'Video done: {frame_count} frames read, {batch_count} batches processed')
|
|
self.send_detection_status()
|
|
|
|
cdef on_annotation(self, Annotation annotation, int frame_count=0, int total_frames=0):
|
|
self.detection_counts[annotation.original_media_name] = self.detection_counts.get(annotation.original_media_name, 0) + 1
|
|
if self._annotation_callback is not None:
|
|
percent = int(frame_count * 100 / total_frames) if total_frames > 0 else 0
|
|
cb = self._annotation_callback
|
|
cb(annotation, percent)
|
|
|
|
cdef _process_images(self, AIRecognitionConfig ai_config, list[str] image_paths):
|
|
cdef list frame_data
|
|
self._tile_detections = {}
|
|
for path in image_paths:
|
|
frame_data = []
|
|
frame = cv2.imread(<str>path)
|
|
img_h, img_w, _ = frame.shape
|
|
if frame is None:
|
|
constants_inf.logerror(<str>f'Failed to read image {path}')
|
|
continue
|
|
original_media_name = Path(<str> path).stem.replace(" ", "")
|
|
|
|
ground_sampling_distance = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w)
|
|
constants_inf.log(<str>f'ground sampling distance: {ground_sampling_distance}')
|
|
|
|
if img_h <= 1.5 * self.model_height and img_w <= 1.5 * self.model_width:
|
|
frame_data.append((frame, original_media_name, f'{original_media_name}_000000'))
|
|
else:
|
|
tile_size = int(constants_inf.METERS_IN_TILE / ground_sampling_distance)
|
|
constants_inf.log(<str> f'calc tile size: {tile_size}')
|
|
res = self.split_to_tiles(frame, path, tile_size, ai_config.big_image_tile_overlap_percent)
|
|
frame_data.extend(res)
|
|
if len(frame_data) > self.engine.get_batch_size():
|
|
for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()):
|
|
self._process_images_inner(ai_config, chunk, ground_sampling_distance)
|
|
self.send_detection_status()
|
|
|
|
for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()):
|
|
self._process_images_inner(ai_config, chunk, ground_sampling_distance)
|
|
self.send_detection_status()
|
|
|
|
cdef send_detection_status(self):
|
|
if self._status_callback is not None:
|
|
cb = self._status_callback
|
|
for media_name in self.detection_counts.keys():
|
|
cb(media_name, self.detection_counts[media_name])
|
|
self.detection_counts.clear()
|
|
|
|
cdef split_to_tiles(self, frame, path, tile_size, overlap_percent):
|
|
constants_inf.log(<str>f'splitting image {path} to tiles...')
|
|
img_h, img_w, _ = frame.shape
|
|
stride_w = int(tile_size * (1 - overlap_percent / 100))
|
|
stride_h = int(tile_size * (1 - overlap_percent / 100))
|
|
|
|
results = []
|
|
original_media_name = Path(<str> path).stem.replace(" ", "")
|
|
for y in range(0, img_h, stride_h):
|
|
for x in range(0, img_w, stride_w):
|
|
x_end = min(x + tile_size, img_w)
|
|
y_end = min(y + tile_size, img_h)
|
|
|
|
# correct x,y for the close-to-border tiles
|
|
if x_end - x < tile_size:
|
|
if img_w - (x - stride_w) <= tile_size:
|
|
continue # the previous tile already covered the last gap
|
|
x = img_w - tile_size
|
|
if y_end - y < tile_size:
|
|
if img_h - (y - stride_h) <= tile_size:
|
|
continue # the previous tile already covered the last gap
|
|
y = img_h - tile_size
|
|
|
|
tile = frame[y:y_end, x:x_end]
|
|
name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{tile_size:04d}_{x:04d}_{y:04d}!_000000'
|
|
results.append((tile, original_media_name, name))
|
|
return results
|
|
|
|
cdef _process_images_inner(self, AIRecognitionConfig ai_config, list frame_data, double ground_sampling_distance):
|
|
cdef list frames, original_media_names, names
|
|
cdef Annotation annotation
|
|
cdef int i
|
|
frames, original_media_names, names = map(list, zip(*frame_data))
|
|
|
|
input_blob = self.preprocess(frames)
|
|
outputs = self.engine.run(input_blob)
|
|
|
|
list_detections = self.postprocess(outputs, ai_config)
|
|
for i in range(len(list_detections)):
|
|
annotation = Annotation(names[i], original_media_names[i], 0, list_detections[i])
|
|
if self.is_valid_image_annotation(annotation, ground_sampling_distance, frames[i].shape):
|
|
constants_inf.log(<str> f'Detected {annotation}')
|
|
_, image = cv2.imencode('.jpg', frames[i])
|
|
annotation.image = image.tobytes()
|
|
self.on_annotation(annotation)
|
|
|
|
cpdef stop(self):
|
|
self.stop_signal = True
|
|
|
|
cdef remove_tiled_duplicates(self, Annotation annotation):
|
|
right = annotation.name.rindex('!')
|
|
left = annotation.name.index(constants_inf.SPLIT_SUFFIX) + len(constants_inf.SPLIT_SUFFIX)
|
|
tile_size_str, x_str, y_str = annotation.name[left:right].split('_')
|
|
tile_size = int(tile_size_str)
|
|
x = int(x_str)
|
|
y = int(y_str)
|
|
|
|
cdef list[Detection] unique_detections = []
|
|
|
|
existing_abs_detections = self._tile_detections.setdefault(annotation.original_media_name, [])
|
|
|
|
for det in annotation.detections:
|
|
x1 = det.x * tile_size
|
|
y1 = det.y * tile_size
|
|
det_abs = Detection(x + x1, y + y1, det.w * tile_size, det.h * tile_size, det.cls, det.confidence)
|
|
|
|
if det_abs not in existing_abs_detections:
|
|
unique_detections.append(det)
|
|
existing_abs_detections.append(det_abs)
|
|
|
|
annotation.detections = unique_detections
|
|
|
|
cdef bint is_valid_image_annotation(self, Annotation annotation, double ground_sampling_distance, frame_shape):
|
|
if constants_inf.SPLIT_SUFFIX in annotation.name:
|
|
self.remove_tiled_duplicates(annotation)
|
|
img_h, img_w, _ = frame_shape
|
|
if annotation.detections:
|
|
constants_inf.log(<str> f'Initial ann: {annotation}')
|
|
|
|
cdef list[Detection] valid_detections = []
|
|
for det in annotation.detections:
|
|
m_w = det.w * img_w * ground_sampling_distance
|
|
m_h = det.h * img_h * ground_sampling_distance
|
|
max_size = constants_inf.annotations_dict[det.cls].max_object_size_meters
|
|
|
|
if m_w <= max_size and m_h <= max_size:
|
|
valid_detections.append(det)
|
|
constants_inf.log(<str> f'Kept ({m_w} {m_h}) <= {max_size}. class: {constants_inf.annotations_dict[det.cls].name}')
|
|
else:
|
|
constants_inf.log(<str> f'Removed ({m_w} {m_h}) > {max_size}. class: {constants_inf.annotations_dict[det.cls].name}')
|
|
|
|
annotation.detections = valid_detections
|
|
|
|
if not annotation.detections:
|
|
return False
|
|
return True
|
|
|
|
cdef bint is_valid_video_annotation(self, Annotation annotation, AIRecognitionConfig ai_config):
|
|
if constants_inf.SPLIT_SUFFIX in annotation.name:
|
|
self.remove_tiled_duplicates(annotation)
|
|
if not annotation.detections:
|
|
return False
|
|
|
|
if self._previous_annotation is None:
|
|
return True
|
|
|
|
if annotation.time >= self._previous_annotation.time + <long>(ai_config.frame_recognition_seconds * 1000):
|
|
return True
|
|
|
|
if len(annotation.detections) > len(self._previous_annotation.detections):
|
|
return True
|
|
|
|
cdef:
|
|
Detection current_det, prev_det
|
|
double dx, dy, distance_sq, min_distance_sq
|
|
Detection closest_det
|
|
|
|
for current_det in annotation.detections:
|
|
min_distance_sq = 1e18
|
|
closest_det = None
|
|
|
|
for prev_det in self._previous_annotation.detections:
|
|
dx = current_det.x - prev_det.x
|
|
dy = current_det.y - prev_det.y
|
|
distance_sq = dx * dx + dy * dy
|
|
|
|
if distance_sq < min_distance_sq:
|
|
min_distance_sq = distance_sq
|
|
closest_det = prev_det
|
|
|
|
dist_px = ai_config.tracking_distance_confidence * self.model_width
|
|
dist_px_sq = dist_px * dist_px
|
|
if min_distance_sq > dist_px_sq:
|
|
return True
|
|
|
|
if current_det.confidence >= closest_det.confidence + ai_config.tracking_probability_increase:
|
|
return True
|
|
|
|
return False
|