from engines.inference_engine cimport InferenceEngine
cimport constants_inf

import atexit
import io
import os
import shutil
import tempfile
import zipfile

import numpy as np
from PIL import Image


cdef class CoreMLEngine(InferenceEngine):
    """Inference engine that runs a Core ML model through ``coremltools``."""

    def __init__(self, model_bytes: bytes, batch_size: int = 1, **kwargs):
        """Load the Core ML model and read its image input size.

        Args:
            model_bytes: Zip archive bytes holding a ``.mlpackage`` or
                ``.mlmodel``. Only unpacked when ``model_path`` is not given.
            batch_size: Forwarded to the base class; this engine then forces
                ``self.batch_size`` to 1.
            **kwargs: ``model_path`` may point at an already extracted model,
                in which case ``model_bytes`` is not unpacked.

        Raises:
            ValueError: If the archive contains no ``.mlpackage``/``.mlmodel``.
        """
        super().__init__(model_bytes, batch_size)
        # Imported lazily so the module can be loaded without coremltools.
        import coremltools as ct

        model_path = kwargs.get('model_path')
        if model_path is None:
            model_path = self._extract_from_zip(model_bytes)

        self.model = ct.models.MLModel(
            model_path, compute_units=ct.ComputeUnit.ALL)

        # NOTE(review): assumes the first declared input is the image input.
        spec = self.model.get_spec()
        img_input = spec.description.input[0]
        self.img_width = int(img_input.type.imageType.width)
        self.img_height = int(img_input.type.imageType.height)
        self.batch_size = 1

        constants_inf.log(f'CoreML model: {self.img_width}x{self.img_height}')
        self.engine_name = "coreml"

    @staticmethod
    def get_engine_filename():
        """Return the file name of the zipped Core ML model."""
        return "azaion_coreml.zip"

    @staticmethod
    def _extract_from_zip(model_bytes):
        """Unpack a zipped model into a temp directory and return its path.

        The temporary directory is removed right away when extraction fails
        or no model is found; otherwise its removal is deferred to
        interpreter exit, because the model files have to stay on disk while
        the engine is in use.

        Args:
            model_bytes: Bytes of a zip archive.

        Returns:
            Path of the first top-level entry ending in ``.mlpackage`` or
            ``.mlmodel``.

        Raises:
            ValueError: If no such entry exists in the archive.
        """
        tmpdir = tempfile.mkdtemp()
        try:
            with zipfile.ZipFile(io.BytesIO(model_bytes), 'r') as zf:
                zf.extractall(tmpdir)
            for item in os.listdir(tmpdir):
                if item.endswith(('.mlpackage', '.mlmodel')):
                    model_path = os.path.join(tmpdir, item)
                    break
            else:
                raise ValueError("No .mlpackage or .mlmodel found in zip")
        except Exception:
            # Nothing usable was produced: do not leave the directory behind.
            shutil.rmtree(tmpdir, ignore_errors=True)
            raise
        # mkdtemp() never cleans up after itself; do it at process exit.
        atexit.register(shutil.rmtree, tmpdir, ignore_errors=True)
        return model_path

    cdef tuple get_input_shape(self):
        """Return the model input size as ``(height, width)``."""
        return (self.img_height, self.img_width)

    cdef int get_batch_size(self):
        """Return the batch size, which is always 1 for this engine."""
        return 1

    cdef run(self, input_data):
        """Run the model on the first element of ``input_data``.

        Args:
            input_data: Indexable batch; only ``input_data[0]`` is used.
                Presumably a channels-first float image scaled to [0, 1]
                (it is multiplied by 255 and transposed to HWC) -- TODO
                confirm against the caller.

        Returns:
            A one-element list holding an array of shape ``(1, N, 6)`` whose
            rows are ``x1, y1, x2, y2, confidence, class_id``. When the model
            reports no boxes, ``N`` is 0 and the array is float32.
        """
        cdef int w = self.img_width
        cdef int h = self.img_height

        blob = input_data[0]
        img_array = np.clip(blob * 255.0, 0, 255).astype(np.uint8)
        img_array = np.transpose(img_array, (1, 2, 0))
        pil_img = Image.fromarray(img_array, 'RGB')

        pred = self.model.predict({
            'image': pil_img,
            'iouThreshold': 0.45,
            'confidenceThreshold': 0.25,
        })

        coords = pred.get('coordinates', np.empty((0, 4), dtype=np.float32))
        confs = pred.get('confidence', np.empty((0, 80), dtype=np.float32))

        if coords.size == 0:
            return [np.zeros((1, 0, 6), dtype=np.float32)]

        # Columns are treated as centre-x, centre-y, width, height and scaled
        # by the input size (presumably normalised to [0, 1] -- verify).
        cx, cy, bw, bh = coords[:, 0], coords[:, 1], coords[:, 2], coords[:, 3]
        x1 = (cx - bw / 2) * w
        y1 = (cy - bh / 2) * h
        x2 = (cx + bw / 2) * w
        y2 = (cy + bh / 2) * h

        # Best-scoring class per box and its score.
        class_ids = np.argmax(confs, axis=1).astype(np.float32)
        conf_values = np.max(confs, axis=1)

        dets = np.stack([x1, y1, x2, y2, conf_values, class_ids], axis=1)
        return [dets[np.newaxis, :, :]]