use cbc encryption decryption - works nice with c#

This commit is contained in:
Alex Bezdieniezhnykh
2025-02-28 00:49:40 +02:00
parent 58839933fc
commit 227d01ba5e
4 changed files with 27 additions and 31 deletions
-2
View File
@@ -15,5 +15,3 @@ cdef class ApiClient:
cdef load_bytes(self, str filename, str folder=*) cdef load_bytes(self, str filename, str folder=*)
cdef upload_file(self, str filename, str folder=*) cdef upload_file(self, str filename, str folder=*)
cdef load_ai_model(self) cdef load_ai_model(self)
cdef load_queue_config(self)
+3 -8
View File
@@ -105,8 +105,8 @@ cdef class ApiClient:
hw_hash = Security.get_hw_hash(hardware) hw_hash = Security.get_hw_hash(hardware)
key = Security.get_api_encryption_key(self.credentials, hw_hash) key = Security.get_api_encryption_key(self.credentials, hw_hash)
stream = BytesIO(response.raw.read()) resp_bytes = response.raw.read()
data = Security.decrypt_to(stream, key) data = Security.decrypt_to(resp_bytes, key)
constants.log(<str>f'Downloaded file: {filename}, {len(data)} bytes') constants.log(<str>f'Downloaded file: {filename}, {len(data)} bytes')
return data return data
@@ -118,9 +118,4 @@ cdef class ApiClient:
encrypted_model_bytes = encrypted_bytes_small + encrypted_bytes_big encrypted_model_bytes = encrypted_bytes_small + encrypted_bytes_big
key = Security.get_model_encryption_key() key = Security.get_model_encryption_key()
model_bytes = Security.decrypt_to(BytesIO(encrypted_model_bytes), key) model_bytes = Security.decrypt_to(encrypted_model_bytes, key)
cdef load_queue_config(self):
return self.load_bytes(constants.QUEUE_CONFIG_FILENAME).decode(encoding='utf-8')
+1 -1
View File
@@ -6,7 +6,7 @@ cdef class Security:
cdef encrypt_to(input_stream, key) cdef encrypt_to(input_stream, key)
@staticmethod @staticmethod
cdef decrypt_to(input_stream, key) cdef decrypt_to(input_bytes, key)
@staticmethod @staticmethod
cdef get_hw_hash(HardwareInfo hardware) cdef get_hw_hash(HardwareInfo hardware)
+23 -20
View File
@@ -6,43 +6,46 @@ from credentials cimport Credentials
from hardware_service cimport HardwareInfo from hardware_service cimport HardwareInfo
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
BUFFER_SIZE = 64 * 1024 # 64 KB BUFFER_SIZE = 64 * 1024 # 64 KB
cdef class Security: cdef class Security:
@staticmethod @staticmethod
cdef encrypt_to(input_stream, key): cdef encrypt_to(input_bytes, key):
cdef bytes aes_key = hashlib.sha256(key.encode('utf-8')).digest() cdef bytes aes_key = hashlib.sha256(key.encode('utf-8')).digest()
iv = os.urandom(16) iv = os.urandom(16)
cipher = Cipher(algorithms.AES(<bytes>aes_key), modes.CFB(iv), backend=default_backend()) cipher = Cipher(algorithms.AES(<bytes> aes_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor() encryptor = cipher.encryptor()
padder = padding.PKCS7(128).padder()
cdef bytearray res = bytearray() padded_plaintext = padder.update(input_bytes) + padder.finalize()
res.extend(iv) ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
while chunk := input_stream.read(BUFFER_SIZE):
encrypted_chunk = encryptor.update(chunk) return iv + ciphertext
res.extend(encrypted_chunk)
res.extend(encryptor.finalize())
return bytes(res)
@staticmethod @staticmethod
cdef decrypt_to(input_stream, key): cdef decrypt_to(ciphertext_with_iv_bytes, key):
cdef bytes aes_key = hashlib.sha256(key.encode('utf-8')).digest() cdef bytes aes_key = hashlib.sha256(key.encode('utf-8')).digest()
cdef bytes iv = input_stream.read(16) iv = ciphertext_with_iv_bytes[:16]
ciphertext_bytes = ciphertext_with_iv_bytes[16:]
cdef cipher = Cipher(algorithms.AES(<bytes>aes_key), modes.CFB(<bytes>iv), backend=default_backend()) cipher = Cipher(algorithms.AES(<bytes>aes_key), modes.CBC(<bytes>iv), backend=default_backend())
cdef decryptor = cipher.decryptor() decryptor = cipher.decryptor()
cdef bytearray res = bytearray() decrypted_padded_bytes = decryptor.update(ciphertext_bytes) + decryptor.finalize()
while chunk := input_stream.read(BUFFER_SIZE):
decrypted_chunk = decryptor.update(chunk)
res.extend(decrypted_chunk)
res.extend(decryptor.finalize())
return bytes(res) # Manual PKCS7 unpadding check and removal
padding_value = decrypted_padded_bytes[-1] # Get the last byte, which indicates padding length
if 1 <= padding_value <= 16: # Valid PKCS7 padding value range for AES-128
padding_length = padding_value
plaintext_bytes = decrypted_padded_bytes[:-padding_length] # Remove padding bytes
else:
plaintext_bytes = decrypted_padded_bytes
return bytes(plaintext_bytes)
@staticmethod @staticmethod
cdef get_hw_hash(HardwareInfo hardware): cdef get_hw_hash(HardwareInfo hardware):