Files
annotations/Azaion.AI/secure_model.pyx
T
Alex Bezdieniezhnykh 62623b7123 add ramdisk, load AI model to ramdisk and start recognition from it
rewrite zmq to DEALER and ROUTER
add GET_USER command to get CurrentUser from Python
all auth is on the python side
inference run and validate annotations on python
2025-01-29 17:45:26 +02:00

105 lines
3.8 KiB
Cython

import os
import platform
import tempfile
from pathlib import Path
from libc.stdio cimport FILE, fopen, fclose, remove
from libc.stdlib cimport free
from libc.string cimport strdup
cdef class SecureModelLoader:
def __cinit__(self, int disk_size_mb=512):
self._disk_size_mb = disk_size_mb
self._ramdisk_path = None
self._temp_file_path = None
cpdef str load_model(self, bytes model_bytes):
"""Public method to load YOLO model securely."""
self._model_bytes = model_bytes
self._create_ramdisk()
self._store_model()
return self._temp_file_path
cdef str _get_ramdisk_path(self):
"""Determine the RAM disk path based on the OS."""
if platform.system() == "Windows":
return "R:\\"
elif platform.system() == "Linux":
return "/mnt/ramdisk"
elif platform.system() == "Darwin":
return "/Volumes/RAMDisk"
else:
raise RuntimeError("Unsupported OS for RAM disk")
cdef void _create_ramdisk(self):
"""Create a RAM disk securely based on the OS."""
system = platform.system()
if system == "Windows":
# Create RAM disk via PowerShell
command = f'powershell -Command "subst R: {tempfile.gettempdir()}"'
if os.system(command) != 0:
raise RuntimeError("Failed to create RAM disk on Windows")
self._ramdisk_path = "R:\\"
elif system == "Linux":
# Use tmpfs for RAM disk
self._ramdisk_path = "/mnt/ramdisk"
if not Path(self._ramdisk_path).exists():
os.mkdir(self._ramdisk_path)
if os.system(f"mount -t tmpfs -o size={self._disk_size_mb}M tmpfs {self._ramdisk_path}") != 0:
raise RuntimeError("Failed to create RAM disk on Linux")
elif system == "Darwin":
# Use hdiutil for macOS RAM disk
block_size = 2048 # 512-byte blocks * 2048 = 1MB
num_blocks = self._disk_size_mb * block_size
result = os.popen(f"hdiutil attach -nomount ram://{num_blocks}").read().strip()
if result:
self._ramdisk_path = "/Volumes/RAMDisk"
os.system(f"diskutil eraseVolume HFS+ RAMDisk {result}")
else:
raise RuntimeError("Failed to create RAM disk on macOS")
cdef void _store_model(self):
"""Write model securely to the RAM disk."""
cdef char* temp_path
cdef FILE* cfile
with tempfile.NamedTemporaryFile(
dir=self._ramdisk_path, suffix='.pt', delete=False
) as tmp_file:
tmp_file.write(self._model_bytes)
self._temp_file_path = tmp_file.name
encoded_path = self._temp_file_path.encode('utf-8')
temp_path = strdup(encoded_path)
with nogil:
cfile = fopen(temp_path, "rb")
if cfile == NULL:
raise IOError(f"Could not open {self._temp_file_path}")
fclose(cfile)
cdef void _cleanup(self):
"""Remove the model file and unmount RAM disk securely."""
cdef char* c_path
if self._temp_file_path:
c_path = strdup(os.fsencode(self._temp_file_path))
with nogil:
remove(c_path)
free(c_path)
self._temp_file_path = None
# Unmount RAM disk based on OS
if self._ramdisk_path:
if platform.system() == "Windows":
os.system("subst R: /D")
elif platform.system() == "Linux":
os.system(f"umount {self._ramdisk_path}")
elif platform.system() == "Darwin":
os.system("hdiutil detach /Volumes/RAMDisk")
self._ramdisk_path = None
def __dealloc__(self):
"""Ensure cleanup when the object is deleted."""
self._cleanup()