[AZ-173] [AZ-174] Stream-based detection API and DB-driven AI config

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-03-31 06:30:22 +03:00
parent 6547c5903a
commit 6c24d09eab
15 changed files with 562 additions and 105 deletions
+176 -37
View File
@@ -1,7 +1,11 @@
import io
import mimetypes
import threading
from pathlib import Path
import av
import cv2
import numpy as np
cimport constants_inf
from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus
@@ -13,6 +17,18 @@ from threading import Thread
from engines import EngineClass
def ai_config_from_dict(dict data):
return AIRecognitionConfig.from_dict(data)
def _write_video_bytes_to_path(str path, bytes data, object done_event):
try:
with open(path, 'wb') as f:
f.write(data)
finally:
done_event.set()
cdef class Inference:
cdef LoaderHttpClient loader_client
cdef InferenceEngine engine
@@ -135,6 +151,7 @@ cdef class Inference:
cpdef run_detect(self, dict config_dict, object annotation_callback, object status_callback=None):
cdef list[str] videos = []
cdef list[str] images = []
cdef object media_paths = config_dict.get("paths", [])
cdef AIRecognitionConfig ai_config = AIRecognitionConfig.from_dict(config_dict)
if ai_config is None:
raise Exception('ai recognition config is empty')
@@ -148,7 +165,7 @@ cdef class Inference:
return
self.detection_counts = {}
for p in ai_config.paths:
for p in media_paths:
media_name = Path(<str>p).stem.replace(" ", "")
self.detection_counts[media_name] = 0
if self.is_video(p):
@@ -163,22 +180,147 @@ cdef class Inference:
constants_inf.log(<str>f'run inference on {v}...')
self._process_video(ai_config, v)
cpdef run_detect_image(self, bytes image_bytes, AIRecognitionConfig ai_config, str media_name,
object annotation_callback, object status_callback=None):
cdef list all_frame_data = []
cdef str original_media_name
self._annotation_callback = annotation_callback
self._status_callback = status_callback
self.stop_signal = <bint>False
self.init_ai()
if self.engine is None:
constants_inf.log(<str> "AI engine not available. Conversion may be in progress. Skipping inference.")
return
if not image_bytes:
return
frame = cv2.imdecode(np.frombuffer(image_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
if frame is None:
constants_inf.logerror(<str>'Failed to decode image bytes')
return
original_media_name = media_name.replace(" ", "")
self.detection_counts = {}
self.detection_counts[original_media_name] = 0
self._tile_detections = {}
self._append_image_frame_entries(ai_config, all_frame_data, frame, original_media_name)
self._finalize_image_inference(ai_config, all_frame_data)
cpdef run_detect_video(self, bytes video_bytes, AIRecognitionConfig ai_config, str media_name, str save_path,
object annotation_callback, object status_callback=None):
cdef str original_media_name
self._annotation_callback = annotation_callback
self._status_callback = status_callback
self.stop_signal = <bint>False
self.init_ai()
if self.engine is None:
constants_inf.log(<str> "AI engine not available. Conversion may be in progress. Skipping inference.")
return
if not video_bytes:
return
original_media_name = media_name.replace(" ", "")
self.detection_counts = {}
self.detection_counts[original_media_name] = 0
writer_done = threading.Event()
wt = threading.Thread(
target=_write_video_bytes_to_path,
args=(save_path, video_bytes, writer_done),
daemon=True,
)
wt.start()
try:
bio = io.BytesIO(video_bytes)
container = av.open(bio)
try:
self._process_video_pyav(ai_config, original_media_name, container)
finally:
container.close()
finally:
writer_done.wait()
wt.join(timeout=3600)
cdef _process_video_pyav(self, AIRecognitionConfig ai_config, str original_media_name, object container):
cdef int frame_count = 0
cdef int batch_count = 0
cdef list batch_frames = []
cdef list[long] batch_timestamps = []
cdef int model_h, model_w
cdef int total_frames
cdef int tf
cdef double duration_sec
cdef double fps
self._previous_annotation = <Annotation>None
model_h, model_w = self.engine.get_input_shape()
streams = container.streams.video
if not streams:
constants_inf.logerror(<str>'No video stream in container')
self.send_detection_status()
return
vstream = streams[0]
total_frames = 0
if vstream.frames is not None and int(vstream.frames) > 0:
total_frames = int(vstream.frames)
else:
duration_sec = 0.0
if vstream.duration is not None and vstream.time_base is not None:
duration_sec = float(vstream.duration * vstream.time_base)
fps = 25.0
if vstream.average_rate is not None:
fps = float(vstream.average_rate)
if duration_sec > 0:
total_frames = int(duration_sec * fps)
if total_frames < 1:
total_frames = 1
tf = total_frames
constants_inf.log(<str>f'Video (PyAV): ~{tf} frames est, {vstream.width}x{vstream.height}')
cdef int effective_batch = min(self.engine.max_batch_size, ai_config.model_batch_size)
if effective_batch < 1:
effective_batch = 1
for av_frame in container.decode(vstream):
if self.stop_signal:
break
frame_count += 1
arr = av_frame.to_ndarray(format='bgr24')
if frame_count % ai_config.frame_period_recognition == 0:
ts_ms = 0
if av_frame.time is not None:
ts_ms = int(av_frame.time * 1000)
elif av_frame.pts is not None and vstream.time_base is not None:
ts_ms = int(float(av_frame.pts) * float(vstream.time_base) * 1000)
batch_frames.append(arr)
batch_timestamps.append(<long>ts_ms)
if len(batch_frames) >= effective_batch:
batch_count += 1
tf = total_frames if total_frames > 0 else max(frame_count, 1)
constants_inf.log(<str>f'Video batch {batch_count}: frame {frame_count}/{tf} ({frame_count*100//tf}%)')
self._process_video_batch(ai_config, batch_frames, batch_timestamps, original_media_name, frame_count, tf, model_w)
batch_frames = []
batch_timestamps = []
if batch_frames:
batch_count += 1
tf = total_frames if total_frames > 0 else max(frame_count, 1)
constants_inf.log(<str>f'Video batch {batch_count} (flush): {len(batch_frames)} remaining frames')
self._process_video_batch(ai_config, batch_frames, batch_timestamps, original_media_name, frame_count, tf, model_w)
constants_inf.log(<str>f'Video done: {frame_count} frames read, {batch_count} batches processed')
self.send_detection_status()
cdef _process_video(self, AIRecognitionConfig ai_config, str video_name):
cdef int frame_count = 0
cdef int batch_count = 0
cdef list batch_frames = []
cdef list[long] batch_timestamps = []
cdef Annotation annotation
cdef int model_h, model_w
cdef str original_media_name
self._previous_annotation = <Annotation>None
model_h, model_w = self.engine.get_input_shape()
original_media_name = Path(<str>video_name).stem.replace(" ", "")
v_input = cv2.VideoCapture(<str>video_name)
if not v_input.isOpened():
constants_inf.logerror(<str>f'Failed to open video: {video_name}')
return
total_frames = int(v_input.get(cv2.CAP_PROP_FRAME_COUNT))
if total_frames < 1:
total_frames = 1
fps = v_input.get(cv2.CAP_PROP_FPS)
width = int(v_input.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(v_input.get(cv2.CAP_PROP_FRAME_HEIGHT))
@@ -201,21 +343,21 @@ cdef class Inference:
if len(batch_frames) >= effective_batch:
batch_count += 1
constants_inf.log(<str>f'Video batch {batch_count}: frame {frame_count}/{total_frames} ({frame_count*100//total_frames}%)')
self._process_video_batch(ai_config, batch_frames, batch_timestamps, video_name, frame_count, total_frames, model_w)
self._process_video_batch(ai_config, batch_frames, batch_timestamps, original_media_name, frame_count, total_frames, model_w)
batch_frames = []
batch_timestamps = []
if batch_frames:
batch_count += 1
constants_inf.log(<str>f'Video batch {batch_count} (flush): {len(batch_frames)} remaining frames')
self._process_video_batch(ai_config, batch_frames, batch_timestamps, video_name, frame_count, total_frames, model_w)
self._process_video_batch(ai_config, batch_frames, batch_timestamps, original_media_name, frame_count, total_frames, model_w)
v_input.release()
constants_inf.log(<str>f'Video done: {frame_count} frames read, {batch_count} batches processed')
self.send_detection_status()
cdef _process_video_batch(self, AIRecognitionConfig ai_config, list batch_frames,
list batch_timestamps, str video_name,
list batch_timestamps, str original_media_name,
int frame_count, int total_frames, int model_w):
cdef Annotation annotation
list_detections = self.engine.process_frames(batch_frames, ai_config)
@@ -225,7 +367,6 @@ cdef class Inference:
for i in range(len(list_detections)):
detections = list_detections[i]
original_media_name = Path(<str>video_name).stem.replace(" ", "")
name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}'
annotation = Annotation(name, original_media_name, batch_timestamps[i], detections)
@@ -247,56 +388,54 @@ cdef class Inference:
cb = self._annotation_callback
cb(annotation, percent)
cdef _process_images(self, AIRecognitionConfig ai_config, list[str] image_paths):
cdef list all_frame_data = []
cdef _append_image_frame_entries(self, AIRecognitionConfig ai_config, list all_frame_data, frame, str original_media_name):
cdef double ground_sampling_distance
cdef int model_h, model_w
cdef int img_h, img_w
model_h, model_w = self.engine.get_input_shape()
self._tile_detections = {}
for path in image_paths:
frame = cv2.imread(<str>path)
if frame is None:
constants_inf.logerror(<str>f'Failed to read image {path}')
continue
img_h, img_w, _ = frame.shape
original_media_name = Path(<str> path).stem.replace(" ", "")
ground_sampling_distance = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w)
constants_inf.log(<str>f'ground sampling distance: {ground_sampling_distance}')
if img_h <= 1.5 * model_h and img_w <= 1.5 * model_w:
all_frame_data.append((frame, original_media_name, f'{original_media_name}_000000', ground_sampling_distance))
else:
tile_size = int(constants_inf.METERS_IN_TILE / ground_sampling_distance)
constants_inf.log(<str> f'calc tile size: {tile_size}')
res = self.split_to_tiles(frame, path, tile_size, ai_config.big_image_tile_overlap_percent)
for tile_frame, omn, tile_name in res:
all_frame_data.append((tile_frame, omn, tile_name, ground_sampling_distance))
img_h, img_w, _ = frame.shape
ground_sampling_distance = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w)
constants_inf.log(<str>f'ground sampling distance: {ground_sampling_distance}')
if img_h <= 1.5 * model_h and img_w <= 1.5 * model_w:
all_frame_data.append((frame, original_media_name, f'{original_media_name}_000000', ground_sampling_distance))
else:
tile_size = int(constants_inf.METERS_IN_TILE / ground_sampling_distance)
constants_inf.log(<str> f'calc tile size: {tile_size}')
res = self.split_to_tiles(frame, original_media_name, tile_size, ai_config.big_image_tile_overlap_percent)
for tile_frame, omn, tile_name in res:
all_frame_data.append((tile_frame, omn, tile_name, ground_sampling_distance))
cdef _finalize_image_inference(self, AIRecognitionConfig ai_config, list all_frame_data):
if not all_frame_data:
return
frames = [fd[0] for fd in all_frame_data]
all_dets = self.engine.process_frames(frames, ai_config)
for i in range(len(all_dets)):
frame_entry = all_frame_data[i]
f = frame_entry[0]
original_media_name = frame_entry[1]
name = frame_entry[2]
gsd = frame_entry[3]
annotation = Annotation(name, original_media_name, 0, all_dets[i])
if self.is_valid_image_annotation(annotation, gsd, f.shape):
constants_inf.log(<str> f'Detected {annotation}')
_, image = cv2.imencode('.jpg', f)
annotation.image = image.tobytes()
self.on_annotation(annotation)
self.send_detection_status()
cdef _process_images(self, AIRecognitionConfig ai_config, list[str] image_paths):
cdef list all_frame_data = []
self._tile_detections = {}
for path in image_paths:
frame = cv2.imread(<str>path)
if frame is None:
constants_inf.logerror(<str>f'Failed to read image {path}')
continue
original_media_name = Path(<str> path).stem.replace(" ", "")
self._append_image_frame_entries(ai_config, all_frame_data, frame, original_media_name)
self._finalize_image_inference(ai_config, all_frame_data)
cdef send_detection_status(self):
if self._status_callback is not None:
cb = self._status_callback
@@ -304,14 +443,14 @@ cdef class Inference:
cb(media_name, self.detection_counts[media_name])
self.detection_counts.clear()
cdef split_to_tiles(self, frame, path, tile_size, overlap_percent):
constants_inf.log(<str>f'splitting image {path} to tiles...')
cdef split_to_tiles(self, frame, str media_stem, tile_size, overlap_percent):
constants_inf.log(<str>f'splitting image {media_stem} to tiles...')
img_h, img_w, _ = frame.shape
stride_w = int(tile_size * (1 - overlap_percent / 100))
stride_h = int(tile_size * (1 - overlap_percent / 100))
results = []
original_media_name = Path(<str> path).stem.replace(" ", "")
original_media_name = media_stem
for y in range(0, img_h, stride_h):
for x in range(0, img_w, stride_w):
x_end = min(x + tile_size, img_w)