import io import threading import av import cv2 import numpy as np cimport constants_inf from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus from annotation cimport Detection, Annotation from ai_config cimport AIRecognitionConfig from engines.inference_engine cimport InferenceEngine from loader_http_client cimport LoaderHttpClient from threading import Thread from engines import engine_factory def ai_config_from_dict(dict data): return AIRecognitionConfig.from_dict(data) def _write_video_bytes_to_path(str path, bytes data, object done_event): try: with open(path, 'wb') as f: f.write(data) finally: done_event.set() cdef class Inference: cdef LoaderHttpClient loader_client cdef InferenceEngine engine cdef object _annotation_callback cdef object _status_callback cdef Annotation _previous_annotation cdef dict[str, list[Detection]] _tile_detections cdef dict[str, int] detection_counts cdef AIRecognitionConfig ai_config cdef bint stop_signal cdef public AIAvailabilityStatus ai_availability_status cdef str model_input cdef bytes _converted_model_bytes cdef bint is_building_engine def __init__(self, loader_client): self.loader_client = loader_client self._annotation_callback = None self._status_callback = None self.stop_signal = False self.model_input = None self.detection_counts = {} self.engine = None self.is_building_engine = False self.ai_availability_status = AIAvailabilityStatus() self._converted_model_bytes = None self.init_ai() @property def is_engine_ready(self): return self.engine is not None @property def engine_name(self): if self.engine is not None: return self.engine.engine_name return None cdef _build_engine_async(self, bytes source_bytes, str models_dir): try: self.ai_availability_status.set_status(AIAvailabilityEnum.CONVERTING) engine_bytes = engine_factory.build_and_cache(source_bytes, self.loader_client, models_dir) self._converted_model_bytes = engine_bytes self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED) except Exception as e: self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, str(e)) self._converted_model_bytes = None finally: self.is_building_engine = False cdef init_ai(self): constants_inf.log( 'init AI...') try: if self.engine is not None: return if self.is_building_engine: return if self._converted_model_bytes is not None: try: self.engine = engine_factory.create(self._converted_model_bytes) self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED) except Exception as e: self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, str(e)) finally: self._converted_model_bytes = None return models_dir = constants_inf.MODELS_FOLDER self.ai_availability_status.set_status(AIAvailabilityEnum.DOWNLOADING) engine = engine_factory.load_engine(self.loader_client, models_dir) if engine is not None: self.engine = engine self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED) return source_filename = engine_factory.get_source_filename() if source_filename is None: self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, "No engine available and no source to build from") return source_bytes = engine_factory.load_source(self.loader_client, models_dir) if engine_factory.has_build_step: self.ai_availability_status.set_status(AIAvailabilityEnum.WARNING, "Cached engine not found, converting from source") self.is_building_engine = True thread = Thread(target=self._build_engine_async, args=(source_bytes, models_dir)) thread.daemon = True thread.start() else: self.engine = engine_factory.create(source_bytes) self.ai_availability_status.set_status(AIAvailabilityEnum.ENABLED) except Exception as e: self.ai_availability_status.set_status(AIAvailabilityEnum.ERROR, str(e)) self.is_building_engine = False cpdef run_detect_image(self, bytes image_bytes, AIRecognitionConfig ai_config, str media_name, object annotation_callback, object status_callback=None): cdef list all_frame_data = [] cdef str original_media_name self._annotation_callback = annotation_callback self._status_callback = status_callback self.stop_signal = False self.init_ai() if self.engine is None: constants_inf.log( "AI engine not available. Conversion may be in progress. Skipping inference.") return if not image_bytes: return frame = cv2.imdecode(np.frombuffer(image_bytes, dtype=np.uint8), cv2.IMREAD_COLOR) if frame is None: constants_inf.logerror('Failed to decode image bytes') return original_media_name = media_name.replace(" ", "") self.detection_counts = {} self.detection_counts[original_media_name] = 0 self._tile_detections = {} self._append_image_frame_entries(ai_config, all_frame_data, frame, original_media_name) self._finalize_image_inference(ai_config, all_frame_data) cpdef run_detect_video(self, bytes video_bytes, AIRecognitionConfig ai_config, str media_name, str save_path, object annotation_callback, object status_callback=None): cdef str original_media_name self._annotation_callback = annotation_callback self._status_callback = status_callback self.stop_signal = False self.init_ai() if self.engine is None: constants_inf.log( "AI engine not available. Conversion may be in progress. Skipping inference.") return if not video_bytes: return original_media_name = media_name.replace(" ", "") self.detection_counts = {} self.detection_counts[original_media_name] = 0 writer_done = threading.Event() wt = threading.Thread( target=_write_video_bytes_to_path, args=(save_path, video_bytes, writer_done), daemon=True, ) wt.start() try: bio = io.BytesIO(video_bytes) container = av.open(bio) try: self._process_video_pyav(ai_config, original_media_name, container) finally: container.close() finally: writer_done.wait() wt.join(timeout=3600) cpdef run_detect_video_stream(self, object readable, AIRecognitionConfig ai_config, str media_name, object annotation_callback, object status_callback=None): cdef str original_media_name self._annotation_callback = annotation_callback self._status_callback = status_callback self.stop_signal = False self.init_ai() if self.engine is None: constants_inf.log( "AI engine not available. Conversion may be in progress. Skipping inference.") return original_media_name = media_name.replace(" ", "") self.detection_counts = {} self.detection_counts[original_media_name] = 0 container = av.open(readable) try: self._process_video_pyav(ai_config, original_media_name, container) finally: container.close() cdef _process_video_pyav(self, AIRecognitionConfig ai_config, str original_media_name, object container): cdef int frame_count = 0 cdef int batch_count = 0 cdef list batch_frames = [] cdef list[long] batch_timestamps = [] cdef int model_h, model_w cdef int total_frames cdef int tf cdef double duration_sec cdef double fps self._previous_annotation = None model_h, model_w = self.engine.get_input_shape() streams = container.streams.video if not streams: constants_inf.logerror('No video stream in container') self.send_detection_status() return vstream = streams[0] total_frames = 0 if vstream.frames is not None and int(vstream.frames) > 0: total_frames = int(vstream.frames) else: duration_sec = 0.0 if vstream.duration is not None and vstream.time_base is not None: duration_sec = float(vstream.duration * vstream.time_base) fps = 25.0 if vstream.average_rate is not None: fps = float(vstream.average_rate) if duration_sec > 0: total_frames = int(duration_sec * fps) if total_frames < 1: total_frames = 1 tf = total_frames constants_inf.log(f'Video (PyAV): ~{tf} frames est, {vstream.width}x{vstream.height}') cdef int effective_batch = min(self.engine.max_batch_size, ai_config.model_batch_size) if effective_batch < 1: effective_batch = 1 for av_frame in container.decode(vstream): if self.stop_signal: break frame_count += 1 arr = av_frame.to_ndarray(format='bgr24') if frame_count % ai_config.frame_period_recognition == 0: ts_ms = 0 if av_frame.time is not None: ts_ms = int(av_frame.time * 1000) elif av_frame.pts is not None and vstream.time_base is not None: ts_ms = int(float(av_frame.pts) * float(vstream.time_base) * 1000) batch_frames.append(arr) batch_timestamps.append(ts_ms) if len(batch_frames) >= effective_batch: batch_count += 1 tf = total_frames if total_frames > 0 else max(frame_count, 1) constants_inf.log(f'Video batch {batch_count}: frame {frame_count}/{tf} ({frame_count*100//tf}%)') last_ts = batch_timestamps[len(batch_timestamps) - 1] if batch_timestamps else 0 self._process_video_batch(ai_config, batch_frames, batch_timestamps, original_media_name, frame_count, tf, model_w) if self._annotation_callback is not None: pann = Annotation(original_media_name, original_media_name, last_ts, []) cb = self._annotation_callback cb(pann, int(frame_count * 100 / tf)) batch_frames = [] batch_timestamps = [] if batch_frames: batch_count += 1 tf = total_frames if total_frames > 0 else max(frame_count, 1) constants_inf.log(f'Video batch {batch_count} (flush): {len(batch_frames)} remaining frames') last_ts = batch_timestamps[len(batch_timestamps) - 1] if batch_timestamps else 0 self._process_video_batch(ai_config, batch_frames, batch_timestamps, original_media_name, frame_count, tf, model_w) if self._annotation_callback is not None: pann = Annotation(original_media_name, original_media_name, last_ts, []) cb = self._annotation_callback cb(pann, 100) constants_inf.log(f'Video done: {frame_count} frames read, {batch_count} batches processed') self.send_detection_status() cdef _process_video_batch(self, AIRecognitionConfig ai_config, list batch_frames, list batch_timestamps, str original_media_name, int frame_count, int total_frames, int model_w): cdef Annotation annotation list_detections = self.engine.process_frames(batch_frames, ai_config) total_dets = sum(len(d) for d in list_detections) if total_dets > 0: constants_inf.log(f'Video batch: {total_dets} detections from postprocess') for i in range(len(list_detections)): detections = list_detections[i] name = f'{original_media_name}_{constants_inf.format_time(batch_timestamps[i])}' annotation = Annotation(name, original_media_name, batch_timestamps[i], detections) if detections: valid = self.is_valid_video_annotation(annotation, ai_config, model_w) constants_inf.log(f'Video frame {name}: {len(detections)} dets, valid={valid}') if valid: _, image = cv2.imencode('.jpg', batch_frames[i]) annotation.image = image.tobytes() self._previous_annotation = annotation self.on_annotation(annotation, frame_count, total_frames) else: self.is_valid_video_annotation(annotation, ai_config, model_w) cdef on_annotation(self, Annotation annotation, int frame_count=0, int total_frames=0): self.detection_counts[annotation.original_media_name] = self.detection_counts.get(annotation.original_media_name, 0) + 1 if self._annotation_callback is not None: percent = int(frame_count * 100 / total_frames) if total_frames > 0 else 0 cb = self._annotation_callback cb(annotation, percent) cdef _append_image_frame_entries(self, AIRecognitionConfig ai_config, list all_frame_data, frame, str original_media_name): cdef double ground_sampling_distance cdef int model_h, model_w cdef int img_h, img_w model_h, model_w = self.engine.get_input_shape() img_h, img_w, _ = frame.shape ground_sampling_distance = ai_config.sensor_width * ai_config.altitude / (ai_config.focal_length * img_w) constants_inf.log(f'ground sampling distance: {ground_sampling_distance}') if img_h <= 1.5 * model_h and img_w <= 1.5 * model_w: all_frame_data.append((frame, original_media_name, f'{original_media_name}_000000', ground_sampling_distance)) else: tile_size = int(constants_inf.METERS_IN_TILE / ground_sampling_distance) constants_inf.log( f'calc tile size: {tile_size}') res = self.split_to_tiles(frame, original_media_name, tile_size, ai_config.big_image_tile_overlap_percent) for tile_frame, omn, tile_name in res: all_frame_data.append((tile_frame, omn, tile_name, ground_sampling_distance)) cdef _finalize_image_inference(self, AIRecognitionConfig ai_config, list all_frame_data): if not all_frame_data: return frames = [fd[0] for fd in all_frame_data] all_dets = self.engine.process_frames(frames, ai_config) for i in range(len(all_dets)): frame_entry = all_frame_data[i] f = frame_entry[0] original_media_name = frame_entry[1] name = frame_entry[2] gsd = frame_entry[3] annotation = Annotation(name, original_media_name, 0, all_dets[i]) if self.is_valid_image_annotation(annotation, gsd, f.shape): constants_inf.log( f'Detected {annotation}') _, image = cv2.imencode('.jpg', f) annotation.image = image.tobytes() self.on_annotation(annotation) self.send_detection_status() cdef send_detection_status(self): if self._status_callback is not None: cb = self._status_callback for media_name in self.detection_counts.keys(): cb(media_name, self.detection_counts[media_name]) self.detection_counts.clear() cdef split_to_tiles(self, frame, str media_stem, tile_size, overlap_percent): constants_inf.log(f'splitting image {media_stem} to tiles...') img_h, img_w, _ = frame.shape stride_w = int(tile_size * (1 - overlap_percent / 100)) stride_h = int(tile_size * (1 - overlap_percent / 100)) results = [] original_media_name = media_stem for y in range(0, img_h, stride_h): for x in range(0, img_w, stride_w): x_end = min(x + tile_size, img_w) y_end = min(y + tile_size, img_h) if x_end - x < tile_size: if img_w - (x - stride_w) <= tile_size: continue x = img_w - tile_size if y_end - y < tile_size: if img_h - (y - stride_h) <= tile_size: continue y = img_h - tile_size tile = frame[y:y_end, x:x_end] name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{tile_size:04d}_{x:04d}_{y:04d}!_000000' results.append((tile, original_media_name, name)) return results cpdef stop(self): self.stop_signal = True cdef remove_tiled_duplicates(self, Annotation annotation): right = annotation.name.rindex('!') left = annotation.name.index(constants_inf.SPLIT_SUFFIX) + len(constants_inf.SPLIT_SUFFIX) tile_size_str, x_str, y_str = annotation.name[left:right].split('_') tile_size = int(tile_size_str) x = int(x_str) y = int(y_str) cdef list[Detection] unique_detections = [] existing_abs_detections = self._tile_detections.setdefault(annotation.original_media_name, []) for det in annotation.detections: x1 = det.x * tile_size y1 = det.y * tile_size det_abs = Detection(x + x1, y + y1, det.w * tile_size, det.h * tile_size, det.cls, det.confidence) if det_abs not in existing_abs_detections: unique_detections.append(det) existing_abs_detections.append(det_abs) annotation.detections = unique_detections cdef bint is_valid_image_annotation(self, Annotation annotation, double ground_sampling_distance, frame_shape): if constants_inf.SPLIT_SUFFIX in annotation.name: self.remove_tiled_duplicates(annotation) img_h, img_w, _ = frame_shape if annotation.detections: constants_inf.log( f'Initial ann: {annotation}') cdef list[Detection] valid_detections = [] for det in annotation.detections: m_w = det.w * img_w * ground_sampling_distance m_h = det.h * img_h * ground_sampling_distance max_size = constants_inf.annotations_dict[det.cls].max_object_size_meters if m_w <= max_size and m_h <= max_size: valid_detections.append(det) constants_inf.log( f'Kept ({m_w} {m_h}) <= {max_size}. class: {constants_inf.annotations_dict[det.cls].name}') else: constants_inf.log( f'Removed ({m_w} {m_h}) > {max_size}. class: {constants_inf.annotations_dict[det.cls].name}') annotation.detections = valid_detections if not annotation.detections: return False return True cdef bint is_valid_video_annotation(self, Annotation annotation, AIRecognitionConfig ai_config, int model_w): if constants_inf.SPLIT_SUFFIX in annotation.name: self.remove_tiled_duplicates(annotation) if not annotation.detections: return False if self._previous_annotation is None: return True if annotation.time >= self._previous_annotation.time + (ai_config.frame_recognition_seconds * 1000): return True if len(annotation.detections) > len(self._previous_annotation.detections): return True cdef: Detection current_det, prev_det double dx, dy, distance_sq, min_distance_sq Detection closest_det for current_det in annotation.detections: min_distance_sq = 1e18 closest_det = None for prev_det in self._previous_annotation.detections: dx = current_det.x - prev_det.x dy = current_det.y - prev_det.y distance_sq = dx * dx + dy * dy if distance_sq < min_distance_sq: min_distance_sq = distance_sq closest_det = prev_det dist_px = ai_config.tracking_distance_confidence * model_w dist_px_sq = dist_px * dist_px if min_distance_sq > dist_px_sq: return True if current_det.confidence >= closest_det.confidence + ai_config.tracking_probability_increase: return True return False