mirror of
https://github.com/azaion/loader.git
synced 2026-04-22 23:26:32 +00:00
d244799f02
Made-with: Cursor
142 lines
4.3 KiB
Python
142 lines
4.3 KiB
Python
import base64
|
|
import hashlib
|
|
import os
|
|
import secrets
|
|
import uuid
|
|
|
|
import jwt
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.primitives import padding
|
|
from fastapi import FastAPI, File, Request, UploadFile
|
|
from fastapi.responses import JSONResponse, Response
|
|
from pydantic import BaseModel
|
|
|
|
# Settings of the mock server. Each value is read from a MOCK_* environment
# variable and falls back to a fixed test default when the variable is unset.
VALID_EMAIL = os.getenv("MOCK_VALID_EMAIL", "test@azaion.com")
VALID_PASSWORD = os.getenv("MOCK_VALID_PASSWORD", "testpass")
JWT_SECRET = os.getenv("MOCK_JWT_SECRET", "e2e-mock-jwt-secret")
CDN_HOST = os.getenv("MOCK_CDN_HOST", "http://mock-cdn:9000")

# YAML text describing the CDN endpoint and its access keys: one "key: value"
# entry per line, every line (including the last) terminated by "\n".
CDN_CONFIG_YAML = "\n".join(
    [
        f"host: {CDN_HOST}",
        "downloader_access_key: minioadmin",
        "downloader_access_secret: minioadmin",
        "uploader_access_key: minioadmin",
        "uploader_access_secret: minioadmin",
        "",  # produces the newline after the final entry
    ]
)
|
|
|
|
# In-memory store of uploaded file contents. The upload handler writes entries
# under "<folder>/<filename>" and the download handler looks them up by the
# storage key it builds from the requested folder and file name.
uploaded_files: dict[str, bytes] = {}
|
|
|
|
# FastAPI application instance; the route decorators in this module register
# their handlers on it.
app = FastAPI()
|
|
|
|
|
|
class LoginBody(BaseModel):
    """JSON body accepted by ``POST /login``."""

    # Both fields are required; they are compared against VALID_EMAIL and
    # VALID_PASSWORD by the login handler.
    email: str
    password: str
|
|
|
|
|
|
class GetUpdateBody(BaseModel):
    """JSON body accepted by ``POST /get-update``; every field is optional."""

    # Accepted but not read by the get_update handler in this module.
    dev_stage: str = ""
    architecture: str = ""
    # Maps a resource name to the version string the caller currently has;
    # the handler only reads the "annotations" entry.
    current_versions: dict[str, str] = {}
|
|
|
|
|
|
def _calc_hash(key: str) -> str:
|
|
h = hashlib.sha384(key.encode("utf-8")).digest()
|
|
return base64.b64encode(h).decode("utf-8")
|
|
|
|
|
|
def _encrypt(plaintext: bytes, key: str) -> bytes:
    """Encrypt *plaintext* with AES-CBC and return ``iv + ciphertext``.

    The 32-byte AES key is the SHA-256 digest of the UTF-8 encoded *key*.
    A fresh random 16-byte IV is generated on every call and prepended to the
    result, so two calls with the same arguments return different bytes.
    The plaintext is PKCS7-padded to the 128-bit block size before encryption.
    """
    iv = os.urandom(16)
    aes_key = hashlib.sha256(key.encode("utf-8")).digest()

    # Pad first: CBC only accepts input that is a whole number of blocks.
    pkcs7 = padding.PKCS7(128).padder()
    padded = pkcs7.update(plaintext) + pkcs7.finalize()

    aes_cbc = Cipher(
        algorithms.AES(aes_key),
        modes.CBC(iv),
        backend=default_backend(),
    ).encryptor()
    return iv + aes_cbc.update(padded) + aes_cbc.finalize()
|
|
|
|
|
|
@app.post("/login")
def login(body: LoginBody):
    """Issue a JWT when the posted credentials match the configured pair.

    On a match the response is ``{"token": <HS256 JWT signed with
    JWT_SECRET>}``; the token carries a random ``nameid``, the posted email as
    ``unique_name`` and the role ``Admin``. Any other credentials produce a
    409 response with an ``AUTH_FAILED`` error body.
    """
    credentials_ok = (
        body.email == VALID_EMAIL and body.password == VALID_PASSWORD
    )
    if not credentials_ok:
        return JSONResponse(
            status_code=409,
            content={"ErrorCode": "AUTH_FAILED", "Message": "Invalid credentials"},
        )

    claims = {
        "nameid": str(uuid.uuid4()),
        "unique_name": body.email,
        "role": "Admin",
    }
    token = jwt.encode(claims, JWT_SECRET, algorithm="HS256")
    # jwt.encode may hand back bytes (PyJWT releases before 2.0 did); the JSON
    # response needs text.
    if isinstance(token, bytes):
        token = token.decode("ascii")
    return {"token": token}
|
|
|
|
|
|
@app.post("/resources/get/{folder:path}")
async def resources_get(folder: str, request: Request):
    """Return the requested resource, encrypted for the calling client.

    The JSON body may carry ``hardware``, ``password`` and ``fileName``
    (each defaults to ``""``). The encryption key is derived from
    VALID_EMAIL, the posted password and a hash of the posted hardware string.

    The plaintext that gets encrypted is chosen as follows:
    * ``fileName == "cdn.yaml"``: the CDN configuration text;
    * a previously uploaded file stored under ``<folder>/<fileName>`` (or
      just ``<fileName>`` when *folder* is empty);
    * otherwise 32 zero bytes.
    """
    payload = await request.json()
    hardware = payload.get("hardware", "")
    password = payload.get("password", "")
    filename = payload.get("fileName", "")

    hw_hash = _calc_hash(f"Azaion_{hardware}_%$$$)0_")
    enc_key = _calc_hash(f"{VALID_EMAIL}-{password}-{hw_hash}-#%@AzaionKey@%#---")

    if filename == "cdn.yaml":
        plaintext = CDN_CONFIG_YAML.encode("utf-8")
    else:
        storage_key = f"{folder}/{filename}" if folder else filename
        # Unknown files still get a well-formed encrypted reply.
        plaintext = uploaded_files.get(storage_key, b"\x00" * 32)

    return Response(
        content=_encrypt(plaintext, enc_key),
        media_type="application/octet-stream",
    )
|
|
|
|
|
|
# NOTE: this fixed-path route must be registered BEFORE the parameterised
# POST "/resources/{folder}" route below. Routes are matched in registration
# order, so if the upload route came first it would capture
# POST /resources/check with folder="check" and this handler would never run.
@app.post("/resources/check")
async def resources_check(request: Request):
    """Accept any request body and reply with an empty 200 response."""
    # The body is read and discarded; its content is not inspected.
    await request.body()
    return Response(status_code=200)


@app.post("/resources/{folder}")
async def resources_upload(folder: str, data: UploadFile = File(...)):
    """Store an uploaded file in memory and reply with an empty 200 response.

    The file content is kept in ``uploaded_files`` under the key
    ``"<folder>/<uploaded file name>"``; a later upload with the same key
    replaces the earlier content.
    """
    content = await data.read()
    storage_key = f"{folder}/{data.filename}"
    uploaded_files[storage_key] = content
    return Response(status_code=200)


@app.get("/resources/list/{folder}")
def resources_list(folder: str, search: str = ""):
    """Return an empty listing; *folder* and *search* are accepted but unused."""
    return []


@app.get("/binary-split/key-fragment")
def binary_split_key_fragment():
    """Return 16 freshly generated random bytes as an octet-stream body."""
    return Response(
        content=secrets.token_bytes(16),
        media_type="application/octet-stream",
    )
|
|
|
|
|
|
@app.post("/get-update")
def get_update(body: GetUpdateBody):
    """Report the available ``annotations`` update, if the caller needs it.

    The caller's version is ``body.current_versions["annotations"]``. When it
    is missing, empty, or sorts (as a string) before ``"2026-04-13"``, a
    one-element list describing that version is returned; otherwise the list
    is empty.
    """
    latest = "2026-04-13"
    installed = body.current_versions.get("annotations", "")

    # Version strings are compared lexicographically.
    if installed and installed >= latest:
        return []

    update = {
        "resourceName": "annotations",
        "version": latest,
        "cdnUrl": f"{CDN_HOST}/fleet/annotations",
        "sha256": "a" * 64,
        "encryptionKey": "mock-fleet-encryption-key",
    }
    return [update]
|