Files
loader/binary_split.py
T
Oleksandr Bezdieniezhnykh 941b8199aa initial commit
Made-with: Cursor
2026-03-25 05:37:10 +02:00

70 lines
1.9 KiB
Python

import hashlib
import os
import subprocess
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Image repository names; check_images_loaded() inspects "<name>:<version>"
# for each entry, in this order.
API_SERVICES = [
    "azaion/annotations",
    "azaion/flights",
    "azaion/detections",
    "azaion/gps-denied-onboard",
    "azaion/gps-denied-desktop",
    "azaion/autopilot",
    "azaion/ai-training",
]
def download_key_fragment(resource_api_url: str, token: str, timeout: float = 30.0) -> bytes:
    """Download the key fragment from the resource API.

    Issues a GET request to ``{resource_api_url}/binary-split/key-fragment``
    with ``token`` sent as a Bearer credential in the Authorization header.

    Args:
        resource_api_url: Base URL of the resource API; the endpoint path is
            appended to it verbatim.
        token: Bearer token placed in the Authorization header.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` waits indefinitely on a stalled connection.

    Returns:
        The raw response body.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (via ``raise_for_status``).
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    resp = requests.get(
        f"{resource_api_url}/binary-split/key-fragment",
        headers={"Authorization": f"Bearer {token}"},
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.content
def decrypt_archive(encrypted_path: str, key_fragment: bytes, output_path: str):
    """Decrypt an AES-CBC encrypted file into ``output_path``.

    The AES key is the SHA-256 digest of ``key_fragment``. The first 16 bytes
    of the encrypted file are used as the IV; the rest is decrypted in 64 KiB
    chunks so the whole archive never has to fit in memory.

    After decryption the last byte of the output is read as a padding length:
    when it is in the range 1..16, that many trailing bytes are removed.
    Otherwise the output is left untouched. If the decrypted output is empty
    (the input held nothing beyond the IV), there is no padding byte to read
    and the output file is left empty.

    Args:
        encrypted_path: Path of the encrypted input file (IV + ciphertext).
        key_fragment: Bytes hashed with SHA-256 to obtain the AES key.
        output_path: Path the decrypted data is written to (overwritten).
    """
    aes_key = hashlib.sha256(key_fragment).digest()
    with open(encrypted_path, "rb") as f_in:
        iv = f_in.read(16)
        cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        # "w+b" lets the padding byte be read back and the file truncated
        # through the same handle instead of reopening the output twice.
        with open(output_path, "w+b") as f_out:
            for chunk in iter(lambda: f_in.read(64 * 1024), b""):
                f_out.write(decryptor.update(chunk))
            f_out.write(decryptor.finalize())

            size = f_out.tell()
            if size == 0:
                # Seeking to -1 from the end of an empty file raises OSError.
                return

            f_out.seek(-1, os.SEEK_END)
            padding_len = f_out.read(1)[0]
            if 1 <= padding_len <= 16:
                f_out.truncate(size - padding_len)
def docker_load(tar_path: str):
    """Run ``docker load -i`` on the archive at ``tar_path``.

    The command is executed with ``check=True``, so a non-zero exit status
    raises ``subprocess.CalledProcessError``.
    """
    command = ["docker", "load", "-i", tar_path]
    subprocess.run(command, check=True)
def check_images_loaded(version: str) -> bool:
    """Report whether every service image is present for ``version``.

    Each entry of ``API_SERVICES`` is combined with ``version`` into a
    ``name:version`` reference and checked with ``docker image inspect``.
    Checking stops at the first reference whose inspect command exits with a
    non-zero status.

    Returns:
        True if every inspect call exited with status 0, otherwise False.
    """

    def _is_present(image_ref: str) -> bool:
        # Output is captured only to keep it off the console; the exit
        # status alone decides the result.
        proc = subprocess.run(
            ["docker", "image", "inspect", image_ref],
            capture_output=True,
        )
        return proc.returncode == 0

    # all() short-circuits, so no further images are inspected after a miss.
    return all(_is_present(f"{name}:{version}") for name in API_SERVICES)