diff --git a/.gitignore b/.gitignore index 566dd17..d52b05a 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ obj *.DotSettings* *.user log*.txt -secured-config \ No newline at end of file +secured-config +build +venv \ No newline at end of file diff --git a/Azaion.AI/README.md b/Azaion.AI/README.md new file mode 100644 index 0000000..635eee7 --- /dev/null +++ b/Azaion.AI/README.md @@ -0,0 +1,64 @@ +

Azaion AI

+ +

+ Azaion AI is a worker written on cython (c-compilable python) which listens to socket and rabbit queue. +It accepts commands om a format: + + - CommandType: Inference / Load + - Filename + +And correspondingly do inference or just load encrypted file from the API. +Results (file or annotations) is putted to the other queue, or the same socket, depending on the command source. +

+ +

Installation

+ +

Install libs

+https://www.python.org/downloads/ + +Windows + +- [Install CUDA](https://developer.nvidia.com/cuda-12-1-0-download-archive) + +Linux +* ``` + sudo apt install nvidia-driver-535 + + wget https://developer.download.nvidia.com/compute/cudnn/9.2.0/local_installers/cudnn-local-repo-ubuntu2204-9.2.0_1.0-1_amd64.deb + sudo dpkg -i cudnn-local-repo-ubuntu2204-9.2.0_1.0-1_amd64.deb + + sudo cp /var/cudnn-local-repo-ubuntu2204-9.2.0/cudnn-*-keyring.gpg /usr/share/keyrings/ + sudo apt-get update + sudo apt-get -y install cudnn nvidia-cuda-toolkit -y + nvcc --version + ``` + +

Install dependencies

+``` +Make sure that your virtual env is installed with links to the global python packages and headers, like this: +python -m venv --system-site-packages venv +This is crucial for the Build because build needs Python.h header and other files. + +python -m pip install --upgrade pip +pip install --upgrade huggingface_hub +pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121 + +pip install git+https://github.com/airockchip/ultralytics_yolov8.git +- or +pip install ultralytics + +pip install cython +pip uninstall -y opencv-python +pip install opencv-python +pip install msgpack + +``` +* fbgemm.dll error (Windows specific) + ``` + copypaste libomp140.x86_64.dll to C:\Windows\System32 + ``` + +

Build

+``` +python setup.py build_ext --inplace +``` \ No newline at end of file diff --git a/Azaion.AI/api_client.pyx b/Azaion.AI/api_client.pyx new file mode 100644 index 0000000..1a59061 --- /dev/null +++ b/Azaion.AI/api_client.pyx @@ -0,0 +1,71 @@ +# cython: language_level=3 +import io +import os +from http import HTTPStatus + +import requests +import constants +from hardware_service import HardwareService +from processor_command import FileCommand, CommandType +from security import Security + +cdef class ApiClient: + """Handles API authentication and downloading of the AI model.""" + cdef str email + cdef str password + cdef str token + cdef str folder + + def __init__(self, str email, str password, str folder): + self.email = email + self.password = password + self.folder = folder + if os.path.exists(constants.TOKEN_FILE): + with open(constants.TOKEN_FILE, "r") as file: + self.token = file.read().strip() + else: + self.token = None + + cdef get_encryption_key(self, command: FileCommand, str hardware_hash): + return f'{self.email}-{self.password}-{hardware_hash}-#%@AzaionKey@%#---' + + cdef login(self, str email, str password, persist_token:bool=False): + response = requests.post(f"{constants.API_URL}/login", json={"email": email, "password": password}) + response.raise_for_status() + self.token = response.json()["token"] + + if persist_token: + with open(constants.TOKEN_FILE, 'w') as file: + file.write(self.token) + + + cdef bytes load_file(self, command: FileCommand, persist_token:bool=False): + hardware_service = HardwareService() + hardware = hardware_service.get_hardware_info() + + if self.token is None: + self.login(self.email, self.password, persist_token) + + url = f"{constants.API_URL}/resources/get/{self.folder}" + headers = {"Authorization": f"Bearer {self.token}"} + payload = { + "password": self.password, + "hardware": hardware, + "fileName": command.filename + } + + response = requests.post(url, json=payload, headers=headers, stream=True) + if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN: + self.login(self.email, self.password, persist_token) + response = requests.post(url, json=payload, headers=headers, stream=True) + + key = self.get_encryption_key(command, hardware.hash) + encrypted_stream = io.BytesIO(response.content) + decrypted_stream = io.BytesIO() + Security.decrypt_to(encrypted_stream, decrypted_stream, key) + return decrypted_stream + + cdef bytes load_ai_model(self): + file_command = FileCommand(CommandType.LOAD, constants.AI_MODEL_FILE) + return self.load_file(file_command, True) + diff --git a/Azaion.AI/constants.pyx b/Azaion.AI/constants.pyx new file mode 100644 index 0000000..d020ef8 --- /dev/null +++ b/Azaion.AI/constants.pyx @@ -0,0 +1,17 @@ +# cython: language_level=3 + +cdef str SOCKET_HOST = "127.0.0.1" # Host for the socket server +cdef int SOCKET_PORT = 9127 # Port for the socket server +cdef int SOCKET_BUFFER_SIZE = 4096 # Buffer size for socket communication + +cdef int QUEUE_MAXSIZE = 1000 # Maximum size of the command queue +cdef str COMMANDS_QUEUE = "azaion-commands" +cdef str ANNOTATIONS_QUEUE = "azaion-annotations" + + +cdef str API_URL = "https://api.azaion.com" # Base URL for the external API +cdef str TOKEN_FILE = "token" + + +cdef str QUEUE_CONFIG_FILENAME = "secured-config.json" +cdef str AI_MODEL_FILE = "azaion.pt" \ No newline at end of file diff --git a/Azaion.AI/hardware_service.pyx b/Azaion.AI/hardware_service.pyx new file mode 100644 index 0000000..c069875 --- /dev/null +++ b/Azaion.AI/hardware_service.pyx @@ -0,0 +1,73 @@ +# cython: language_level=3 +import base64 +import subprocess +from hashlib import sha384 + +cdef class HardwareInfo: + cdef str cpu + cdef str gpu + cdef str memory + cdef str mac_address + cdef str hash + + def __init__(self, str cpu, str gpu, str memory, str mac_address, str hw_hash): + self.cpu = cpu + self.gpu = gpu + self.memory = memory + self.mac_address = mac_address + self.hash = hw_hash + + def __str__(self): + return f'CPU: {self.cpu}. GPU: {self.gpu}. Memory: {self.memory}. MAC Address: {self.mac_address}' + +cdef class HardwareService: + """Handles hardware information retrieval and hash generation.""" + + def __init__(self): + try: + if subprocess.check_output("ver", shell=True).decode('utf-8').startswith("Microsoft"): + self.is_windows = True + else: + self.is_windows = False + except Exception: + self.is_windows = False + + cdef HardwareInfo get_hardware_info(self): + if self.is_windows: + os_command = ( + "wmic CPU get Name /Value && " + "wmic path Win32_VideoController get Name /Value && " + "wmic OS get TotalVisibleMemorySize /Value" + ) + else: + os_command = ( + "/bin/bash -c \" lscpu | grep 'Model name:' | cut -d':' -f2 && " + "lspci | grep VGA | cut -d':' -f3 && " + "free -g | grep Mem: | awk '{print $2}' && \"" + ) + # in case of subprocess error do: + # cdef bytes os_command_bytes = os_command.encode('utf-8') + # and use os_command_bytes + result = subprocess.check_output(os_command, shell=True).decode('utf-8') + lines = [line.strip() for line in result.splitlines() if line.strip()] + + cdef str cpu = lines[0].replace("Name=", "").replace(" ", " ") + cdef str gpu = lines[1].replace("Name=", "").replace(" ", " ") + cdef str memory = lines[2].replace("TotalVisibleMemorySize=", "").replace(" ", " ") + + # Get MAC address + if self.is_windows: + mac_cmd = "getmac" + else: + mac_cmd = "cat /sys/class/net/*/address" + cdef str mac_address = subprocess.check_output(mac_cmd, shell=True, text=True).splitlines()[0].strip() + + cdef str full_hw_str = f'Azaion_{mac_address}_{cpu}_{gpu}' + hw_hash = self.calc_hash(full_hw_str) + return HardwareInfo(cpu, gpu, memory, mac_address, hw_hash) + + cdef str calc_hash(self, str s): + str_bytes = s.encode('utf-8') + hash_bytes = sha384(str_bytes).digest() + cdef str h = base64.b64encode(hash_bytes).decode('utf-8') + return h \ No newline at end of file diff --git a/Azaion.AI/inference.pyx b/Azaion.AI/inference.pyx new file mode 100644 index 0000000..3352520 --- /dev/null +++ b/Azaion.AI/inference.pyx @@ -0,0 +1,90 @@ +# cython: language_level=3 +from ultralytics import YOLO +import mimetypes +import cv2 +from ultralytics.engine.results import Boxes +from processor_command import FileCommand + +cdef class Inference: + """Handles YOLO inference using the AI model.""" + + def __init__(self, model_bytes, on_annotations): + self.model = YOLO(model_bytes) + self.on_annotations = on_annotations + + cdef bint is_video(self, str filepath): + mime_type, _ = mimetypes.guess_type(filepath) + return mime_type and mime_type.startswith("video") + + cdef run_inference(self, cmd: FileCommand, int batch_size=8, int frame_skip=4): + if self.is_video(cmd.filename): + return self._process_video(cmd, batch_size, frame_skip) + else: + return self._process_image(cmd) + + cdef _process_video(self, cmd: FileCommand, int batch_size, int frame_skip): + frame_count = 0 + batch_frame = [] + annotations = [] + v_input = cv2.VideoCapture(cmd.filename) + + while v_input.isOpened(): + ret, frame = v_input.read() + ms = v_input.get(cv2.CAP_PROP_POS_MSEC) + if not ret or frame is None: + break + + frame_count += 1 + if frame_count % frame_skip == 0: + batch_frame.append((frame, ms)) + + if len(batch_frame) == batch_size: + frames = list(map(lambda x: x[0], batch_frame)) + results = self.model.track(frames, persist=True) + + for frame, res in zip(batch_frame, results): + annotation = self.process_detections(int(frame[1]), frame[0], res.boxes) + if len(annotation.detections) > 0: + annotations.append(annotation) + self.on_annotations(cmd, annotations) + batch_frame.clear() + + v_input.release() + + cdef _process_image(self, cmd: FileCommand): + frame = cv2.imread(cmd.filename) + res = self.model.track(frame) + annotation = self.process_detections(0, frame, res[0].boxes) + self.on_annotations(cmd, [annotation]) + + cdef process_detections(self, float time, frame, boxes: Boxes): + detections = [] + for box in boxes: + b = box.xywhn[0].cpu().numpy() + cls = int(box.cls[0].cpu().numpy().item()) + detections.append(Detection(b[0], b[1], b[2], b[3], cls)) + _, encoded_image = cv2.imencode('.jpg', frame[0]) + image_bytes = encoded_image.tobytes() + return Annotation(image_bytes, time, detections) + + +cdef class Detection: + cdef double x + cdef double y + cdef double w + cdef double h + cdef int cls + + def __init__(self, double x, double y, double w, double h, int cls): + self.x = x + self.y = y + self.w = w + self.h = h + self.cls = cls + +cdef class Annotation: + + def __init__(self, image_bytes: bytes, float time, detections: [Detection]): + self.image = image_bytes + self.time = time + self.detections = detections diff --git a/Azaion.AI/libomp140.x86_64.dll b/Azaion.AI/libomp140.x86_64.dll new file mode 100644 index 0000000..c2d6d48 Binary files /dev/null and b/Azaion.AI/libomp140.x86_64.dll differ diff --git a/Azaion.AI/main.pyx b/Azaion.AI/main.pyx new file mode 100644 index 0000000..fd17ded --- /dev/null +++ b/Azaion.AI/main.pyx @@ -0,0 +1,97 @@ +# cython: language_level=3 +import queue +import threading +import constants +from api_client import ApiClient +from inference import Inference, Annotation +from processor_command import FileCommand, CommandType, ProcessorType +from remote_handlers import SocketHandler, RabbitHandler +import argparse + +cdef enum ListenOption: + SOCKET = 1 + QUEUE = 2 + +cdef class ParsedArguments: + cdef ListenOption listen + cdef str email + cdef str password + cdef str folder + cdef bint persist_token + + def __init__(self, ListenOption listen, str email, str password, str folder, bint persist_token): + self.listen = listen + self.email = email + self.password = password + self.folder = folder + self.persist_token = persist_token + +cdef class CommandProcessor: + + def __init__(self, args: ParsedArguments): + self.api_client = ApiClient(args.email, args.password, args.folder) + self.socket_handler = SocketHandler(self.on_message) + self.rabbit_handler = RabbitHandler(self.on_message) + self.command_queue = queue.Queue(maxsize=constants.QUEUE_MAXSIZE) + self.running = True + + def start(self): + threading.Thread(target=self.process_queue, daemon=True).start() + + cdef on_message(self, cmd: FileCommand): + try: + if cmd.command_type == CommandType.INFERENCE: + self.command_queue.put(cmd) + elif cmd.command_type == CommandType.LOAD: + threading.Thread(target=self.process_load, args=[cmd], daemon=True).start() + except Exception as e: + print(f"Error handling client: {e}") + + cdef on_annotations(self, cmd: FileCommand, annotations: [Annotation]): + handler = self.socket_handler if cmd.processor_type == ProcessorType.SOCKET else self.rabbit_handler + handler.send(annotations) + + + cdef process_queue(self): + while self.running: + try: + command = self.command_queue.get() + model = self.api_client.load_ai_model() + Inference(model, self.on_annotations).run_inference(command) + except Exception as e: + print(f"Error processing queue: {e}") + + cdef process_load(self, command: FileCommand): + response = self.api_client.load_file(command) + handler = self.socket_handler if command.processor_type == ProcessorType.SOCKET else self.rabbit_handler + handler.send(response) + + + def stop(self): + self.running = False + +def parse_arguments(): + parser = argparse.ArgumentParser(description="Command Processor") + parser.add_argument("--listen", type=ListenOption, choices=[ListenOption.SOCKET, ListenOption.QUEUE], default=ListenOption.SOCKET, help="socket: Local communication, queue: remote. Default is socket") + parser.add_argument("--email", type=str, default="", help="Email") + parser.add_argument("--pw", type=str, default="", help="Password") + parser.add_argument("--folder", type=str, default="", help="Folder to API inner folder to download file from") + parser.add_argument("--persist_token", type=bool, default=True, help="True for persisting token from API") + cdef args = parser.parse_args() + + cdef ListenOption listen = ListenOption(args.listen) + cdef str email = args.email + cdef str password = args.pw + cdef str folder = args.folder + cdef bint persist_token = args.persist_token + + return ParsedArguments(listen, email, password, folder, persist_token) + +if __name__ == '__main__': + args = parse_arguments() + processor = CommandProcessor(args) + try: + processor.start() + except KeyboardInterrupt: + processor.stop() + diff --git a/Azaion.AI/processor_command.pyx b/Azaion.AI/processor_command.pyx new file mode 100644 index 0000000..e96d7e6 --- /dev/null +++ b/Azaion.AI/processor_command.pyx @@ -0,0 +1,23 @@ +import msgpack + +cdef enum CommandType: + INFERENCE = 1 + LOAD = 2 + +cdef enum ProcessorType: + SOCKET = 1, + RABBIT = 2 + +cdef class FileCommand: + cdef str filename + + def __init__(self, command_type: CommandType, processor_type: ProcessorType, str filename): + self.command_type = command_type + self.processor_type = processor_type + self.filename = filename + + @staticmethod + cdef from_msgpack(bytes data, processor_type: ProcessorType): + unpacked = msgpack.unpackb(data, strict_map_key=False) + return FileCommand(unpacked.get("CommandType"), processor_type, unpacked.get("Filename") + ) diff --git a/Azaion.AI/remote_handlers.pyx b/Azaion.AI/remote_handlers.pyx new file mode 100644 index 0000000..ac1244b --- /dev/null +++ b/Azaion.AI/remote_handlers.pyx @@ -0,0 +1,114 @@ +# cython: language_level=3 +import json +import socket +import struct +import threading + +import msgpack +from msgpack import packb +from rstream import Producer, Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext + +import constants +from api_client import ApiClient +from processor_command import FileCommand, ProcessorType + +cdef class QueueConfig: + cdef str host + cdef int port + cdef str producer_user + cdef str producer_pw + cdef str consumer_user + cdef str consumer_pw + + @staticmethod + cdef QueueConfig from_json(str json_string): + cdef dict config_dict = json.loads(json_string) + cdef QueueConfig config = QueueConfig() + config.Host = config_dict["Host"] + config.Port = config_dict["Port"] + config.ProducerUsername = config_dict["ProducerUsername"] + config.ProducerPassword = config_dict["ProducerPassword"] + config.ConsumerUsername = config_dict["ConsumerUsername"] + config.ConsumerPassword = config_dict["ConsumerPassword"] + return config + +cdef class SocketHandler: + """Handles socket communication with size-prefixed messages.""" + + def __init__(self, on_message): + self.on_message = on_message + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect((constants.SOCKET_HOST, constants.SOCKET_PORT)) + self._socket.listen(1) + + cdef start(self): + threading.Thread(target=self.start, daemon=True).start() + + cdef start_inner(self): + while True: + self._connection, client_address = self._socket.accept() + size_data = self._connection.recv(4) + if not size_data: + raise ConnectionError("Connection closed while reading size prefix.") + data_size = struct.unpack('>I', size_data)[0] + # Read the full message + data = b"" + while len(data) < data_size: + packet = self._socket.recv(data_size - len(data)) + if not packet: + raise ConnectionError("Connection closed while reading data.") + data += packet + cmd = FileCommand.from_msgpack(data, ProcessorType.SOCKET) + self.on_message(cmd) + + async def send(self, object message): + data = msgpack.packb(message) + size_prefix = len(data).to_bytes(4, 'big') + self._connection.sendall(size_prefix + data) + + def close(self): + if self._socket: + self._socket.close() + self._socket = None + +cdef class RabbitHandler: + cdef str hardware_hash + + def __init__(self, config_filename, on_message): + self.on_message = on_message + cdef str config_str = ApiClient().load_file(constants.QUEUE_CONFIG_FILENAME).decode(encoding='utf-8') + self.queue_config = QueueConfig.from_json(config_str) + self.annotation_producer = Producer( + host=self.queue_config.host, + port=self.queue_config.port, + username=self.queue_config.producer_user, + password=self.queue_config.producer_pw + ) + self.command_consumer = Consumer( + host=self.queue_config.host, + port=self.queue_config.port, + username=self.queue_config.consumer_user, + password=self.queue_config.consumer_pw + ) + + cdef start(self): + self.command_consumer.start() + self.command_consumer.subscribe(stream=constants.COMMANDS_QUEUE, callback=self.on_message_inner, + offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None)) # put real offset + + + cdef on_message_inner(self, message: AMQPMessage, message_context: MessageContext): + cdef bytes body = message.body + cmd = FileCommand.from_msgpack(body, ProcessorType.RABBIT) + self.on_message(cmd) + + cpdef send(self, object message): + packed_message = AMQPMessage(body=packb(message)) + self.annotation_producer.send(constants.ANNOTATIONS_QUEUE, packed_message) + + async def close(self): + if self.annotation_producer: + await self.annotation_producer.close() + if self.command_consumer: + await self.command_consumer.close() \ No newline at end of file diff --git a/Azaion.AI/security.pyx b/Azaion.AI/security.pyx new file mode 100644 index 0000000..1b86764 --- /dev/null +++ b/Azaion.AI/security.pyx @@ -0,0 +1,40 @@ +# cython: language_level=3 +import hashlib +import os +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend + +BUFFER_SIZE = 64 * 1024 # 64 KB + +cdef class Security: + + cdef encrypt_to(self, input_stream, output_stream, key): + aes_key = hashlib.sha256(key.encode('utf-8')).digest() + iv = os.urandom(16) + output_stream.write(iv) # Write IV to the output stream + + cipher = Cipher(algorithms.AES(aes_key), modes.CFB(iv), backend=default_backend()) + encryptor = cipher.encryptor() + + # Read and encrypt in chunks + while chunk := input_stream.read(BUFFER_SIZE): + encrypted_data = encryptor.update(chunk) + output_stream.write(encrypted_data) + + final_data = encryptor.finalize() + output_stream.write(final_data) + + cdef decrypt_to(self, input_stream, output_stream, key): + aes_key = hashlib.sha256(key.encode('utf-8')).digest() + iv = input_stream.read(16) # AES block size is 16 bytes + + # Create cipher and decryptor + cipher = Cipher(algorithms.AES(aes_key), modes.CFB(iv), backend=default_backend()) + decryptor = cipher.decryptor() + + while chunk := input_stream.read(BUFFER_SIZE): + decrypted_data = decryptor.update(chunk) + output_stream.write(decrypted_data) + + final_data = decryptor.finalize() + output_stream.write(final_data) \ No newline at end of file diff --git a/Azaion.AI/setup.py b/Azaion.AI/setup.py new file mode 100644 index 0000000..4230875 --- /dev/null +++ b/Azaion.AI/setup.py @@ -0,0 +1,27 @@ +from setuptools import setup, Extension +from Cython.Build import cythonize + +extensions = [ + Extension('main', ['main.pyx']), + Extension('api_client', ['api_client.pyx']), + Extension('constants', ['constants.pyx']), + Extension('hardware_service', ['hardware_service.pyx']), + Extension('inference', ['inference.pyx']), + Extension('processor_command', ['processor_command.pyx']), + Extension('remote_handlers', ['remote_handlers.pyx']), + Extension('security', ['security.pyx']) +] + +setup( + name="azaion.ai", + ext_modules=cythonize( + extensions, + compiler_directives={ + "language_level": 3, + "emit_code_comments" : False, + "binding": True + }, + gdb_debug=True + ), + zip_safe=False +) \ No newline at end of file