initial commit

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-03-25 05:37:10 +02:00
commit 941b8199aa
21 changed files with 861 additions and 0 deletions
+15
View File
@@ -0,0 +1,15 @@
FROM python:3.11-slim
RUN apt-get update && apt-get install -y python3-dev gcc pciutils curl gnupg && \
install -m 0755 -d /etc/apt/keyrings && \
curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc && \
chmod a+r /etc/apt/keyrings/docker.asc && \
echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian $(. /etc/os-release && echo $VERSION_CODENAME) stable" > /etc/apt/sources.list.d/docker.list && \
apt-get update && apt-get install -y docker-ce-cli && \
rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN python setup.py build_ext --inplace
EXPOSE 8080
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]
+3
View File
@@ -0,0 +1,3 @@
# Azaion.Loader
Cython/Python service for model download, binary-split decryption, and local cache management.
+27
View File
@@ -0,0 +1,27 @@
from user cimport User
from credentials cimport Credentials
from cdn_manager cimport CDNManager
cdef class ApiClient:
cdef Credentials credentials
cdef CDNManager cdn_manager
cdef str token, folder, api_url
cdef User user
cpdef set_credentials_from_dict(self, str email, str password)
cdef set_credentials(self, Credentials credentials)
cdef login(self)
cdef set_token(self, str token)
cdef get_user(self)
cdef request(self, str method, str url, object payload, bint is_stream)
cdef list_files(self, str folder, str search_file)
cdef check_resource(self)
cdef load_bytes(self, str filename, str folder)
cdef upload_file(self, str filename, bytes resource, str folder)
cdef load_big_file_cdn(self, str folder, str big_part)
cpdef load_big_small_resource(self, str resource_name, str folder)
cpdef upload_big_small_resource(self, bytes resource, str resource_name, str folder)
cpdef upload_to_cdn(self, str bucket, str filename, bytes file_bytes)
cpdef download_from_cdn(self, str bucket, str filename)
+217
View File
@@ -0,0 +1,217 @@
import json
import os
from http import HTTPStatus
from os import path
from uuid import UUID
import jwt
import requests
cimport constants
import yaml
from requests import HTTPError
from credentials cimport Credentials
from cdn_manager cimport CDNManager, CDNCredentials
from hardware_service cimport HardwareService
from security cimport Security
from user cimport User, RoleEnum
cdef class ApiClient:
def __init__(self, str api_url):
self.credentials = None
self.user = None
self.token = None
self.cdn_manager = None
self.api_url = api_url
cpdef set_credentials_from_dict(self, str email, str password):
self.set_credentials(Credentials(email, password))
cdef set_credentials(self, Credentials credentials):
self.credentials = credentials
if self.cdn_manager is not None:
return
yaml_bytes = self.load_bytes(constants.CDN_CONFIG, <str>'')
yaml_config = yaml.safe_load(yaml_bytes)
creds = CDNCredentials(yaml_config["host"],
yaml_config["downloader_access_key"],
yaml_config["downloader_access_secret"],
yaml_config["uploader_access_key"],
yaml_config["uploader_access_secret"])
self.cdn_manager = CDNManager(creds)
cdef login(self):
response = None
try:
response = requests.post(f"{self.api_url}/login",
json={"email": self.credentials.email, "password": self.credentials.password})
response.raise_for_status()
token = response.json()["token"]
self.set_token(token)
except HTTPError as e:
res = response.json()
constants.logerror(str(res))
if response.status_code == HTTPStatus.CONFLICT:
raise Exception(f"Error {res['ErrorCode']}: {res['Message']}")
cdef set_token(self, str token):
self.token = token
claims = jwt.decode(token, options={"verify_signature": False})
try:
id = str(UUID(claims.get("nameid", "")))
except ValueError:
raise ValueError("Invalid GUID format in claims")
email = claims.get("unique_name", "")
role_str = claims.get("role", "")
if role_str == "ApiAdmin":
role = RoleEnum.ApiAdmin
elif role_str == "Admin":
role = RoleEnum.Admin
elif role_str == "ResourceUploader":
role = RoleEnum.ResourceUploader
elif role_str == "Validator":
role = RoleEnum.Validator
elif role_str == "Operator":
role = RoleEnum.Operator
else:
role = RoleEnum.NONE
self.user = User(id, email, role)
cdef get_user(self):
if self.user is None:
self.login()
return self.user
cdef upload_file(self, str filename, bytes resource, str folder):
if self.token is None:
self.login()
url = f"{self.api_url}/resources/{folder}"
headers = { "Authorization": f"Bearer {self.token}" }
files = {'data': (filename, resource)}
try:
r = requests.post(url, headers=headers, files=files, allow_redirects=True)
r.raise_for_status()
constants.log(f"Uploaded {filename} to {self.api_url}/{folder} successfully: {r.status_code}.")
except Exception as e:
constants.logerror(f"Upload fail: {e}")
cdef list_files(self, str folder, str search_file):
response = self.request('get', f'{self.api_url}/resources/list/{folder}', {
"search": search_file
}, is_stream=False)
constants.log(<str> f'Get files list by {folder}')
return response.json()
cdef check_resource(self):
cdef str hardware = HardwareService.get_hardware_info()
payload = json.dumps({ "hardware": hardware }, indent=4)
response = self.request('post', f'{self.api_url}/resources/check', payload, is_stream=False)
cdef load_bytes(self, str filename, str folder):
cdef str hardware = HardwareService.get_hardware_info()
hw_hash = Security.get_hw_hash(hardware)
key = Security.get_api_encryption_key(self.credentials, hw_hash)
payload = json.dumps(
{
"password": self.credentials.password,
"hardware": hardware,
"fileName": filename
}, indent=4)
response = self.request('post', f'{self.api_url}/resources/get/{folder}', payload, is_stream=True)
resp_bytes = response.raw.read()
data = Security.decrypt_to(resp_bytes, key)
constants.log(<str>f'Downloaded file: {filename}, {len(data)} bytes')
return data
cdef request(self, str method, str url, object payload, bint is_stream):
if self.token is None:
self.login()
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
response = requests.request(method, url, data=payload, headers=headers, stream=is_stream)
if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN:
self.login()
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
response = requests.request(method, url, data=payload, headers=headers, stream=is_stream)
if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
raise Exception(f'Internal API error! {response.text}')
if response.status_code == HTTPStatus.CONFLICT:
res = response.json()
err_code = res['ErrorCode']
err_msg = res['Message']
raise Exception(f"Error {err_code}: {err_msg}")
return response
cdef load_big_file_cdn(self, str folder, str big_part):
constants.log(f'downloading file {folder}/{big_part} from cdn...')
if self.cdn_manager.download(folder, big_part):
with open(path.join(<str> folder, big_part), 'rb') as binary_file:
encrypted_bytes_big = binary_file.read()
return encrypted_bytes_big
else:
raise Exception(f'Cannot download file {folder}/{big_part} from CDN!')
cpdef load_big_small_resource(self, str resource_name, str folder):
cdef str big_part = f'{resource_name}.big'
cdef str small_part = f'{resource_name}.small'
encrypted_bytes_small = self.load_bytes(small_part, folder)
key = Security.get_resource_encryption_key()
constants.log(f'checking on existence for {folder}/{big_part}')
if os.path.exists(os.path.join(<str> folder, big_part)):
with open(path.join(<str> folder, big_part), 'rb') as binary_file:
local_bytes_big = binary_file.read()
constants.log(f'local file {folder}/{big_part} is found!')
try:
resource = Security.decrypt_to(encrypted_bytes_small + local_bytes_big, key)
return resource
except Exception as ex:
constants.logerror(f'Local file {folder}/{big_part} doesnt match with api file, old version')
remote_bytes_big = self.load_big_file_cdn(folder, big_part)
return Security.decrypt_to(encrypted_bytes_small + remote_bytes_big, key)
cpdef upload_big_small_resource(self, bytes resource, str resource_name, str folder):
cdef str big_part_name = f'{resource_name}.big'
cdef str small_part_name = f'{resource_name}.small'
key = Security.get_resource_encryption_key()
resource_encrypted = Security.encrypt_to(<bytes>resource, key)
part_small_size = min(constants.SMALL_SIZE_KB * 1024, int(0.3 * len(resource_encrypted)))
part_small = resource_encrypted[:part_small_size]
part_big = resource_encrypted[part_small_size:]
self.cdn_manager.upload(folder, <str>big_part_name, part_big)
with open(path.join(<str>folder, <str>big_part_name), 'wb') as f:
f.write(part_big)
self.upload_file(small_part_name, part_small, folder)
cpdef upload_to_cdn(self, str bucket, str filename, bytes file_bytes):
if self.cdn_manager is None:
raise Exception("CDN manager not initialized. Call set_credentials first.")
if not self.cdn_manager.upload(bucket, filename, file_bytes):
raise Exception(f"Failed to upload {filename} to CDN bucket {bucket}")
cpdef download_from_cdn(self, str bucket, str filename):
if self.cdn_manager is None:
raise Exception("CDN manager not initialized. Call set_credentials first.")
if not self.cdn_manager.download(bucket, filename):
raise Exception(f"Failed to download {filename} from CDN bucket {bucket}")
local_path = path.join(bucket, filename)
with open(local_path, 'rb') as f:
return f.read()
+69
View File
@@ -0,0 +1,69 @@
import hashlib
import os
import subprocess
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
API_SERVICES = [
"azaion/annotations",
"azaion/flights",
"azaion/detections",
"azaion/gps-denied-onboard",
"azaion/gps-denied-desktop",
"azaion/autopilot",
"azaion/ai-training",
]
def download_key_fragment(resource_api_url: str, token: str) -> bytes:
resp = requests.get(
f"{resource_api_url}/binary-split/key-fragment",
headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
return resp.content
def decrypt_archive(encrypted_path: str, key_fragment: bytes, output_path: str):
aes_key = hashlib.sha256(key_fragment).digest()
with open(encrypted_path, "rb") as f_in:
iv = f_in.read(16)
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
with open(output_path, "wb") as f_out:
while True:
chunk = f_in.read(64 * 1024)
if not chunk:
break
f_out.write(decryptor.update(chunk))
final = decryptor.finalize()
f_out.write(final)
with open(output_path, "rb") as f:
f.seek(-1, 2)
padding_len = f.read(1)[0]
if 1 <= padding_len <= 16:
size = os.path.getsize(output_path) - padding_len
with open(output_path, "r+b") as f:
f.truncate(size)
def docker_load(tar_path: str):
subprocess.run(["docker", "load", "-i", tar_path], check=True)
def check_images_loaded(version: str) -> bool:
for svc in API_SERVICES:
tag = f"{svc}:{version}"
result = subprocess.run(
["docker", "image", "inspect", tag],
capture_output=True,
)
if result.returncode != 0:
return False
return True
+14
View File
@@ -0,0 +1,14 @@
cdef class CDNCredentials:
cdef str host
cdef str downloader_access_key
cdef str downloader_access_secret
cdef str uploader_access_key
cdef str uploader_access_secret
cdef class CDNManager:
cdef CDNCredentials creds
cdef object download_client
cdef object upload_client
cdef upload(self, str bucket, str filename, bytes file_bytes)
cdef download(self, str bucket, str filename)
+44
View File
@@ -0,0 +1,44 @@
import io
import os
cimport constants
import boto3
cdef class CDNCredentials:
def __init__(self, host, downloader_access_key, downloader_access_secret, uploader_access_key, uploader_access_secret):
self.host = host
self.downloader_access_key = downloader_access_key
self.downloader_access_secret = downloader_access_secret
self.uploader_access_key = uploader_access_key
self.uploader_access_secret = uploader_access_secret
cdef class CDNManager:
def __init__(self, CDNCredentials credentials):
self.creds = credentials
self.download_client = boto3.client('s3', endpoint_url=self.creds.host,
aws_access_key_id=self.creds.downloader_access_key,
aws_secret_access_key=self.creds.downloader_access_secret)
self.upload_client = boto3.client('s3', endpoint_url=self.creds.host,
aws_access_key_id=self.creds.uploader_access_key,
aws_secret_access_key=self.creds.uploader_access_secret)
cdef upload(self, str bucket, str filename, bytes file_bytes):
try:
self.upload_client.upload_fileobj(io.BytesIO(file_bytes), bucket, filename)
constants.log(f'uploaded {filename} ({len(file_bytes)} bytes) to the {bucket}')
return True
except Exception as e:
constants.logerror(e)
return False
cdef download(self, str folder, str filename):
try:
os.makedirs(folder, exist_ok=True)
self.download_client.download_file(folder, filename, os.path.join(folder, filename))
constants.log(f'downloaded {filename} from the {folder}')
return True
except Exception as e:
constants.logerror(e)
return False
+18
View File
@@ -0,0 +1,18 @@
cdef str CONFIG_FILE # Port for the zmq
cdef int QUEUE_MAXSIZE # Maximum size of the command queue
cdef str COMMANDS_QUEUE # Name of the commands queue in rabbit
cdef str ANNOTATIONS_QUEUE # Name of the annotations queue in rabbit
cdef str QUEUE_CONFIG_FILENAME # queue config filename to load from api
cdef str AI_ONNX_MODEL_FILE
cdef str CDN_CONFIG
cdef str MODELS_FOLDER
cdef int SMALL_SIZE_KB
cdef log(str log_message)
cdef logerror(str error)
+45
View File
@@ -0,0 +1,45 @@
import sys
import time
from loguru import logger
cdef str CONFIG_FILE = "config.yaml" # Port for the zmq
cdef str QUEUE_CONFIG_FILENAME = "secured-config.json"
cdef str AI_ONNX_MODEL_FILE = "azaion.onnx"
cdef str CDN_CONFIG = "cdn.yaml"
cdef str MODELS_FOLDER = "models"
cdef int SMALL_SIZE_KB = 3
cdef int ALIGNMENT_WIDTH = 32
logger.remove()
log_format = "[{time:HH:mm:ss} {level}] {message}"
logger.add(
sink="Logs/log_loader_{time:YYYYMMDD}.txt",
level="INFO",
format=log_format,
enqueue=True,
rotation="1 day",
retention="30 days",
)
logger.add(
sys.stdout,
level="DEBUG",
format=log_format,
filter=lambda record: record["level"].name in ("INFO", "DEBUG", "SUCCESS"),
colorize=True
)
logger.add(
sys.stderr,
level="WARNING",
format=log_format,
colorize=True
)
cdef log(str log_message):
logger.info(log_message)
cdef logerror(str error):
logger.error(error)
+3
View File
@@ -0,0 +1,3 @@
cdef class Credentials:
cdef public str email
cdef public str password
+9
View File
@@ -0,0 +1,9 @@
cdef class Credentials:
def __init__(self, str email, str password):
self.email = email
self.password = password
def __str__(self):
return f'{self.email}: {self.password}'
+6
View File
@@ -0,0 +1,6 @@
cdef str _CACHED_HW_INFO
cdef class HardwareService:
@staticmethod
cdef str get_hardware_info()
+48
View File
@@ -0,0 +1,48 @@
import os
import subprocess
cimport constants
cdef str _CACHED_HW_INFO = None
cdef class HardwareService:
@staticmethod
cdef str get_hardware_info():
global _CACHED_HW_INFO
if _CACHED_HW_INFO is not None:
constants.log(<str>"Using cached hardware info")
return <str> _CACHED_HW_INFO
if os.name == 'nt': # windows
os_command = (
"powershell -Command \""
"Get-CimInstance -ClassName Win32_Processor | Select-Object -ExpandProperty Name | Write-Output; "
"Get-CimInstance -ClassName Win32_VideoController | Select-Object -ExpandProperty Name | Write-Output; "
"Get-CimInstance -ClassName Win32_OperatingSystem | Select-Object -ExpandProperty TotalVisibleMemorySize | Write-Output; "
"(Get-Disk | Where-Object {$_.IsSystem -eq $true}).SerialNumber"
"\""
)
else:
os_command = (
"lscpu | grep 'Model name:' | cut -d':' -f2 && "
"lspci | grep VGA | cut -d':' -f3 && "
"free -k | awk '/^Mem:/ {print $2}' && "
"cat /sys/block/sda/device/vpd_pg80 2>/dev/null || cat /sys/block/sda/device/serial 2>/dev/null"
)
result = subprocess.check_output(os_command, shell=True).decode('utf-8', errors='ignore')
lines = [line.replace(" ", " ").replace("Name=", "").strip('\x00\x14 \t\n\r\v\f') for line in result.splitlines() if line.strip()]
cdef str cpu = lines[0]
cdef str gpu = lines[1]
# could be multiple gpus
len_lines = len(lines)
cdef str memory = lines[len_lines-2].replace("TotalVisibleMemorySize=", "").replace(" ", " ")
cdef str drive_serial = lines[len_lines-1]
cdef str res = f'CPU: {cpu}. GPU: {gpu}. Memory: {memory}. DriveSerial: {drive_serial}'
constants.log(<str>f'Gathered hardware: {res}')
_CACHED_HW_INFO = res
return res
+187
View File
@@ -0,0 +1,187 @@
import os
import threading
from typing import Optional
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks
from fastapi.responses import Response
from pydantic import BaseModel
from unlock_state import UnlockState
app = FastAPI(title="Azaion.Loader")
RESOURCE_API_URL = os.environ.get("RESOURCE_API_URL", "https://api.azaion.com")
IMAGES_PATH = os.environ.get("IMAGES_PATH", "/opt/azaion/images.enc")
API_VERSION = os.environ.get("API_VERSION", "latest")
api_client = None
def get_api_client():
global api_client
if api_client is None:
from api_client import ApiClient
api_client = ApiClient(RESOURCE_API_URL)
return api_client
class LoginRequest(BaseModel):
email: str
password: str
class LoadRequest(BaseModel):
filename: str
folder: str
class HealthResponse(BaseModel):
status: str
class StatusResponse(BaseModel):
status: str
authenticated: bool
modelCacheDir: str
unlock_state = UnlockState.idle
unlock_error: Optional[str] = None
unlock_lock = threading.Lock()
@app.get("/health")
def health() -> HealthResponse:
return HealthResponse(status="healthy")
@app.get("/status")
def status() -> StatusResponse:
client = get_api_client()
return StatusResponse(
status="healthy",
authenticated=client.token is not None,
modelCacheDir="models",
)
@app.post("/login")
def login(req: LoginRequest):
try:
client = get_api_client()
client.set_credentials_from_dict(req.email, req.password)
return {"status": "ok"}
except Exception as e:
raise HTTPException(status_code=401, detail=str(e))
@app.post("/load/{filename}")
def load_resource(filename: str, req: LoadRequest):
try:
client = get_api_client()
data = client.load_big_small_resource(req.filename, req.folder)
return Response(content=data, media_type="application/octet-stream")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/upload/{filename}")
def upload_resource(
filename: str,
data: UploadFile = File(...),
folder: str = Form("models"),
):
try:
client = get_api_client()
content = data.file.read()
client.upload_big_small_resource(content, filename, folder)
return {"status": "ok"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def _run_unlock(email: str, password: str):
global unlock_state, unlock_error
from binary_split import (
download_key_fragment,
decrypt_archive,
docker_load,
check_images_loaded,
)
try:
if check_images_loaded(API_VERSION):
with unlock_lock:
unlock_state = UnlockState.ready
return
with unlock_lock:
unlock_state = UnlockState.authenticating
client = get_api_client()
client.set_credentials_from_dict(email, password)
client.login()
token = client.token
with unlock_lock:
unlock_state = UnlockState.downloading_key
key_fragment = download_key_fragment(RESOURCE_API_URL, token)
with unlock_lock:
unlock_state = UnlockState.decrypting
tar_path = IMAGES_PATH.replace(".enc", ".tar")
decrypt_archive(IMAGES_PATH, key_fragment, tar_path)
with unlock_lock:
unlock_state = UnlockState.loading_images
docker_load(tar_path)
try:
os.remove(tar_path)
except OSError:
pass
with unlock_lock:
unlock_state = UnlockState.ready
unlock_error = None
except Exception as e:
with unlock_lock:
unlock_state = UnlockState.error
unlock_error = str(e)
@app.post("/unlock")
def unlock(req: LoginRequest, background_tasks: BackgroundTasks):
global unlock_state, unlock_error
with unlock_lock:
if unlock_state == UnlockState.ready:
return {"state": unlock_state.value}
if unlock_state not in (UnlockState.idle, UnlockState.error):
return {"state": unlock_state.value}
if not os.path.exists(IMAGES_PATH):
from binary_split import check_images_loaded
if check_images_loaded(API_VERSION):
with unlock_lock:
unlock_state = UnlockState.ready
return {"state": unlock_state.value}
raise HTTPException(status_code=404, detail="Encrypted archive not found")
with unlock_lock:
unlock_state = UnlockState.authenticating
unlock_error = None
background_tasks.add_task(_run_unlock, req.email, req.password)
return {"state": unlock_state.value}
@app.get("/unlock/status")
def get_unlock_status():
with unlock_lock:
return {"state": unlock_state.value, "error": unlock_error}
+11
View File
@@ -0,0 +1,11 @@
fastapi
uvicorn[standard]
Cython==3.1.3
requests==2.32.4
pyjwt==2.10.1
cryptography==44.0.2
boto3==1.40.9
loguru==0.7.3
pyyaml==6.0.2
psutil==7.0.0
python-multipart
+20
View File
@@ -0,0 +1,20 @@
from credentials cimport Credentials
cdef class Security:
@staticmethod
cdef encrypt_to(input_stream, key)
@staticmethod
cdef decrypt_to(input_bytes, key)
@staticmethod
cdef get_hw_hash(str hardware)
@staticmethod
cdef get_api_encryption_key(Credentials credentials, str hardware_hash)
@staticmethod
cdef get_resource_encryption_key()
@staticmethod
cdef calc_hash(str key)
+68
View File
@@ -0,0 +1,68 @@
import base64
import hashlib
import os
from hashlib import sha384
from credentials cimport Credentials
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
BUFFER_SIZE = 64 * 1024 # 64 KB
cdef class Security:
@staticmethod
cdef encrypt_to(input_bytes, key):
cdef bytes aes_key = hashlib.sha256(key.encode('utf-8')).digest()
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(<bytes> aes_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
padder = padding.PKCS7(128).padder()
padded_plaintext = padder.update(input_bytes) + padder.finalize()
ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
return iv + ciphertext
@staticmethod
cdef decrypt_to(ciphertext_with_iv_bytes, key):
cdef bytes aes_key = hashlib.sha256(key.encode('utf-8')).digest()
iv = ciphertext_with_iv_bytes[:16]
ciphertext_bytes = ciphertext_with_iv_bytes[16:]
cipher = Cipher(algorithms.AES(<bytes>aes_key), modes.CBC(<bytes>iv), backend=default_backend())
decryptor = cipher.decryptor()
decrypted_padded_bytes = decryptor.update(ciphertext_bytes) + decryptor.finalize()
# Manual PKCS7 unpadding check and removal
padding_value = decrypted_padded_bytes[-1] # Get the last byte, which indicates padding length
if 1 <= padding_value <= 16: # Valid PKCS7 padding value range for AES-128
padding_length = padding_value
plaintext_bytes = decrypted_padded_bytes[:-padding_length] # Remove padding bytes
else:
plaintext_bytes = decrypted_padded_bytes
return bytes(plaintext_bytes)
@staticmethod
cdef get_hw_hash(str hardware):
cdef str key = f'Azaion_{hardware}_%$$$)0_'
return Security.calc_hash(key)
@staticmethod
cdef get_api_encryption_key(Credentials creds, str hardware_hash):
cdef str key = f'{creds.email}-{creds.password}-{hardware_hash}-#%@AzaionKey@%#---'
return Security.calc_hash(key)
@staticmethod
cdef get_resource_encryption_key():
cdef str key = '-#%@AzaionKey@%#---234sdfklgvhjbnn'
return Security.calc_hash(key)
@staticmethod
cdef calc_hash(str key):
str_bytes = key.encode('utf-8')
hash_bytes = sha384(str_bytes).digest()
cdef str h = base64.b64encode(hash_bytes).decode('utf-8')
return h
+27
View File
@@ -0,0 +1,27 @@
from setuptools import setup, Extension
from Cython.Build import cythonize
extensions = [
Extension('constants', ['constants.pyx']),
Extension('credentials', ['credentials.pyx']),
Extension('user', ['user.pyx']),
Extension('security', ['security.pyx']),
Extension('hardware_service', ['hardware_service.pyx']),
Extension('cdn_manager', ['cdn_manager.pyx']),
Extension('api_client', ['api_client.pyx']),
]
setup(
name="azaion.loader",
ext_modules=cythonize(
extensions,
compiler_directives={
"language_level": 3,
"emit_code_comments": False,
"binding": True,
'boundscheck': False,
'wraparound': False,
}
),
zip_safe=False
)
+11
View File
@@ -0,0 +1,11 @@
from enum import Enum
class UnlockState(str, Enum):
idle = "idle"
authenticating = "authenticating"
downloading_key = "downloading_key"
decrypting = "decrypting"
loading_images = "loading_images"
ready = "ready"
error = "error"
+13
View File
@@ -0,0 +1,13 @@
cdef enum RoleEnum:
NONE = 0
Operator = 10
Validator = 20
CompanionPC = 30
Admin = 40
ResourceUploader = 50
ApiAdmin = 1000
cdef class User:
cdef public str id
cdef public str email
cdef public RoleEnum role
+6
View File
@@ -0,0 +1,6 @@
cdef class User:
def __init__(self, str id, str email, RoleEnum role):
self.id = id
self.email = email
self.role = role