Files
loader/binary_split.py
T
Oleksandr Bezdieniezhnykh 4eaf218f09 Quality cleanup refactoring
Made-with: Cursor
2026-04-13 06:21:26 +03:00

64 lines
1.9 KiB
Python

import hashlib
import subprocess
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Docker image repositories whose "<repository>:<version>" tags are probed by
# check_images_loaded() to decide whether a release is present locally.
API_SERVICES = [
    "azaion/annotations",
    "azaion/flights",
    "azaion/detections",
    "azaion/gps-denied-onboard",
    "azaion/gps-denied-desktop",
    "azaion/autopilot",
    "azaion/ai-training",
]
def download_key_fragment(
    resource_api_url: str,
    token: str,
    *,
    timeout: float = 30.0,
) -> bytes:
    """Fetch the key fragment from the resource API.

    Issues ``GET {resource_api_url}/binary-split/key-fragment`` with the
    token sent as a ``Bearer`` credential in the ``Authorization`` header.

    Args:
        resource_api_url: Base URL of the resource API; the endpoint path is
            appended to it verbatim.
        token: Bearer token placed in the ``Authorization`` header.
        timeout: Value forwarded to ``requests.get(timeout=...)``, in
            seconds. Without it ``requests`` waits indefinitely on a server
            that stops responding.

    Returns:
        The raw response body.

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
        requests.Timeout: The server did not respond within ``timeout``.
    """
    resp = requests.get(
        f"{resource_api_url}/binary-split/key-fragment",
        headers={"Authorization": f"Bearer {token}"},
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.content
def decrypt_archive(encrypted_path: str, key_fragment: bytes, output_path: str):
    """Decrypt an AES-CBC encrypted archive to ``output_path``.

    The AES key is the SHA-256 digest of ``key_fragment``. The first 16 bytes
    of the encrypted file are used as the CBC IV; everything after them is
    treated as ciphertext whose plaintext carries PKCS7 padding for a 128-bit
    block. Data is streamed in 64 KiB reads, so the archive is never held in
    memory as a whole.

    Args:
        encrypted_path: Path of the encrypted input file.
        key_fragment: Bytes hashed with SHA-256 to derive the AES key.
        output_path: Path the decrypted data is written to (opened with
            mode ``"wb"``, so an existing file is overwritten).

    Errors raised by the ``cryptography`` primitives or by file I/O propagate
    to the caller unchanged.
    """
    read_size = 64 * 1024
    key = hashlib.sha256(key_fragment).digest()

    with open(encrypted_path, "rb") as source:
        init_vector = source.read(16)
        # Build the cipher before touching the output file so that a bad IV
        # fails without creating or truncating the destination.
        decryptor = Cipher(
            algorithms.AES(key),
            modes.CBC(init_vector),
            backend=default_backend(),
        ).decryptor()
        unpadder = padding.PKCS7(128).unpadder()

        with open(output_path, "wb") as sink:
            # read() returns b"" at end of file, which ends the iteration.
            for block in iter(lambda: source.read(read_size), b""):
                plain = decryptor.update(block)
                if plain:
                    sink.write(unpadder.update(plain))

            # Flush both stages and emit the remainder as one write.
            tail = unpadder.update(decryptor.finalize())
            tail = tail + unpadder.finalize()
            sink.write(tail)
def docker_load(tar_path: str):
    """Load images from a tar archive via ``docker load -i <tar_path>``.

    Raises:
        subprocess.CalledProcessError: The docker command exited non-zero.
    """
    command = ["docker", "load", "-i", tar_path]
    subprocess.run(command, check=True)
def check_images_loaded(version: str) -> bool:
    """Report whether every service image is present for ``version``.

    Each entry of ``API_SERVICES`` is probed in order with
    ``docker image inspect <service>:<version>``; probing stops at the first
    image whose inspect command exits non-zero.

    Args:
        version: Tag suffix appended to each service name.

    Returns:
        ``True`` if every inspect command exited with status 0, otherwise
        ``False``.
    """

    def _image_present(image_ref: str) -> bool:
        # Output is captured only to keep it off the console; the exit
        # status alone decides the result.
        probe = subprocess.run(
            ["docker", "image", "inspect", image_ref],
            capture_output=True,
        )
        return probe.returncode == 0

    # all() short-circuits, so no further images are probed after a miss.
    return all(_image_present(f"{service}:{version}") for service in API_SERVICES)