add altitude + camera spec component and calc tile size by this

also restrict detections to be no bigger than in classes.json
This commit is contained in:
Oleksandr Bezdieniezhnykh
2025-09-23 01:48:10 +03:00
parent b0e4b467c1
commit fde9a9f418
36 changed files with 715 additions and 222 deletions
+45 -11
View File
@@ -315,18 +315,24 @@ cdef class Inference:
constants_inf.logerror(<str>f'Failed to read image {path}')
continue
original_media_name = Path(<str> path).stem.replace(" ", "")
ground_sampling_distance = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w)
constants_inf.log(<str>f'ground sampling distance: {ground_sampling_distance}')
if img_h <= 1.5 * self.model_height and img_w <= 1.5 * self.model_width:
frame_data.append((frame, original_media_name, f'{original_media_name}_000000'))
else:
res = self.split_to_tiles(frame, path, ai_config.tile_size, ai_config.big_image_tile_overlap_percent)
tile_size = int(constants_inf.METERS_IN_TILE / ground_sampling_distance)
constants_inf.log(<str> f'calc tile size: {tile_size}')
res = self.split_to_tiles(frame, path, tile_size, ai_config.big_image_tile_overlap_percent)
frame_data.extend(res)
if len(frame_data) > self.engine.get_batch_size():
for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()):
self._process_images_inner(cmd, ai_config, chunk)
self._process_images_inner(cmd, ai_config, chunk, ground_sampling_distance)
self.send_detection_status(cmd.client_id)
for chunk in self.split_list_extend(frame_data, self.engine.get_batch_size()):
self._process_images_inner(cmd, ai_config, chunk)
self._process_images_inner(cmd, ai_config, chunk, ground_sampling_distance)
self.send_detection_status(cmd.client_id)
cdef send_detection_status(self, client_id):
@@ -369,7 +375,7 @@ cdef class Inference:
results.append((tile, original_media_name, name))
return results
cdef _process_images_inner(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list frame_data):
cdef _process_images_inner(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list frame_data, double ground_sampling_distance):
cdef list frames, original_media_names, names
cdef Annotation annotation
cdef int i
@@ -381,7 +387,7 @@ cdef class Inference:
list_detections = self.postprocess(outputs, ai_config)
for i in range(len(list_detections)):
annotation = Annotation(names[i], original_media_names[i], 0, list_detections[i])
if self.is_valid_image_annotation(annotation):
if self.is_valid_image_annotation(annotation, ground_sampling_distance, frames[i].shape):
constants_inf.log(<str> f'Detected {annotation}')
_, image = cv2.imencode('.jpg', frames[i])
annotation.image = image.tobytes()
@@ -391,6 +397,7 @@ cdef class Inference:
self.stop_signal = True
cdef remove_tiled_duplicates(self, Annotation annotation):
# Parse tile info from the annotation name
right = annotation.name.rindex('!')
left = annotation.name.index(constants_inf.SPLIT_SUFFIX) + len(constants_inf.SPLIT_SUFFIX)
tile_size_str, x_str, y_str = annotation.name[left:right].split('_')
@@ -398,19 +405,46 @@ cdef class Inference:
x = int(x_str)
y = int(y_str)
# This will be our new, filtered list of detections
cdef list[Detection] unique_detections = []
existing_abs_detections = self._tile_detections.setdefault(annotation.original_media_name, [])
for det in annotation.detections:
# Calculate the absolute position and size of the detection
x1 = det.x * tile_size
y1 = det.y * tile_size
det_abs = Detection(x + x1, y + y1, det.w * tile_size, det.h * tile_size, det.cls, det.confidence)
detections = self._tile_detections.setdefault(annotation.original_media_name, [])
if det_abs in detections:
annotation.detections.remove(det)
else:
detections.append(det_abs)
cdef bint is_valid_image_annotation(self, Annotation annotation):
# If it's not a duplicate, keep it and update the cache
if det_abs not in existing_abs_detections:
unique_detections.append(det)
existing_abs_detections.append(det_abs)
annotation.detections = unique_detections
cdef bint is_valid_image_annotation(self, Annotation annotation, double ground_sampling_distance, frame_shape):
if constants_inf.SPLIT_SUFFIX in annotation.name:
self.remove_tiled_duplicates(annotation)
img_h, img_w, _ = frame_shape
if annotation.detections:
constants_inf.log(<str> f'Initial ann: {annotation}')
cdef list[Detection] valid_detections = []
for det in annotation.detections:
m_w = det.w * img_w * ground_sampling_distance
m_h = det.h * img_h * ground_sampling_distance
max_size = constants_inf.annotations_dict[det.cls].max_object_size_meters
if m_w <= max_size and m_h <= max_size:
valid_detections.append(det)
constants_inf.log(<str> f'Kept ({m_w} {m_h}) <= {max_size}. class: {constants_inf.annotations_dict[det.cls].name}')
else:
constants_inf.log(<str> f'Removed ({m_w} {m_h}) > {max_size}. class: {constants_inf.annotations_dict[det.cls].name}')
# Replace the old list with the new, filtered one
annotation.detections = valid_detections
if not annotation.detections:
return False
return True