Files
loader/e2e/mocks/mock_api/app.py
T
Oleksandr Bezdieniezhnykh d244799f02 [AZ-182][AZ-184][AZ-187] Batch 1
Made-with: Cursor
2026-04-15 07:23:47 +03:00

142 lines
4.3 KiB
Python

import base64
import hashlib
import os
import secrets
import uuid
import jwt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from fastapi import FastAPI, File, Request, UploadFile
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel
# Credentials accepted by the /login handler; each can be overridden via the
# environment.
VALID_EMAIL = os.getenv("MOCK_VALID_EMAIL", "test@azaion.com")
VALID_PASSWORD = os.getenv("MOCK_VALID_PASSWORD", "testpass")

# Secret used to sign the HS256 tokens issued by /login.
JWT_SECRET = os.getenv("MOCK_JWT_SECRET", "e2e-mock-jwt-secret")

# Base URL of the CDN; embedded in the cdn.yaml payload and in the
# /get-update response.
CDN_HOST = os.getenv("MOCK_CDN_HOST", "http://mock-cdn:9000")

# Plaintext of the "cdn.yaml" resource (served encrypted by /resources/get).
# Every entry is terminated by a newline.
CDN_CONFIG_YAML = "".join(
    f"{entry}\n"
    for entry in (
        f"host: {CDN_HOST}",
        "downloader_access_key: minioadmin",
        "downloader_access_secret: minioadmin",
        "uploader_access_key: minioadmin",
        "uploader_access_secret: minioadmin",
    )
)

# In-memory store of uploaded payloads, keyed by "<folder>/<filename>".
# Contents live only as long as the process does.
uploaded_files: dict[str, bytes] = {}

app = FastAPI()
class LoginBody(BaseModel):
    """JSON body accepted by ``POST /login``."""

    # Checked for equality against VALID_EMAIL by the login handler.
    email: str
    # Checked for equality against VALID_PASSWORD by the login handler.
    password: str
class GetUpdateBody(BaseModel):
    """JSON body accepted by ``POST /get-update``.

    Every field has a default, so an empty JSON object is a valid request.
    """

    # Accepted but not read by the handler in this file.
    dev_stage: str = ""
    # Accepted but not read by the handler in this file.
    architecture: str = ""
    # Maps a resource name to the version string the client currently has;
    # the handler looks up the "annotations" entry.
    current_versions: dict[str, str] = {}
def _calc_hash(key: str) -> str:
    """Return the Base64 text of the SHA-384 digest of *key*.

    *key* is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha384()
    hasher.update(key.encode("utf-8"))
    encoded = base64.b64encode(hasher.digest())
    return encoded.decode("utf-8")
def _encrypt(plaintext: bytes, key: str) -> bytes:
    """Encrypt *plaintext* with AES in CBC mode and return ``iv + ciphertext``.

    The AES key is the SHA-256 digest of *key* (UTF-8 encoded), i.e. 32 bytes.
    The plaintext is PKCS7-padded to the 128-bit block size, and a fresh
    random 16-byte IV is generated on every call and prepended to the output,
    so two calls with the same arguments produce different bytes.
    """
    # Pad first: CBC only accepts whole 16-byte blocks.
    pkcs7 = padding.PKCS7(128).padder()
    padded_plaintext = pkcs7.update(plaintext) + pkcs7.finalize()

    iv = os.urandom(16)
    derived_key = hashlib.sha256(key.encode("utf-8")).digest()
    aes_cbc = Cipher(
        algorithms.AES(derived_key),
        modes.CBC(iv),
        backend=default_backend(),
    ).encryptor()

    return iv + aes_cbc.update(padded_plaintext) + aes_cbc.finalize()
@app.post("/login")
def login(body: LoginBody):
    """Issue a signed token when the supplied credentials match the mock's.

    On success returns ``{"token": <HS256 JWT>}`` whose claims are a random
    ``nameid``, the caller's email as ``unique_name`` and the role ``Admin``.
    Any other email/password pair yields an HTTP 409 response carrying an
    ``AUTH_FAILED`` error body.
    """
    credentials_ok = body.email == VALID_EMAIL and body.password == VALID_PASSWORD
    if not credentials_ok:
        return JSONResponse(
            content={"ErrorCode": "AUTH_FAILED", "Message": "Invalid credentials"},
            status_code=409,
        )

    claims = {
        "nameid": str(uuid.uuid4()),
        "unique_name": body.email,
        "role": "Admin",
    }
    token = jwt.encode(claims, JWT_SECRET, algorithm="HS256")
    # Some jwt library versions hand back bytes rather than str; normalise so
    # the JSON response always carries text.
    if isinstance(token, bytes):
        token = token.decode("ascii")
    return {"token": token}
@app.post("/resources/get/{folder:path}")
async def resources_get(folder: str, request: Request):
    """Return an encrypted resource as ``application/octet-stream``.

    The JSON request body may carry ``hardware``, ``password`` and
    ``fileName``; each defaults to an empty string when absent. The
    encryption key is derived from the mock's VALID_EMAIL, the supplied
    password and a hash of the hardware string.

    The plaintext that gets encrypted is chosen as follows:

    * ``fileName == "cdn.yaml"``: the CDN configuration YAML;
    * a previously uploaded file stored under ``<folder>/<fileName>`` (or
      just ``<fileName>`` when *folder* is empty): that file's bytes;
    * anything else: 32 zero bytes.
    """
    payload = await request.json()
    hardware = payload.get("hardware", "")
    password = payload.get("password", "")
    filename = payload.get("fileName", "")

    hw_hash = _calc_hash(f"Azaion_{hardware}_%$$$)0_")
    enc_key = _calc_hash(f"{VALID_EMAIL}-{password}-{hw_hash}-#%@AzaionKey@%#---")

    if filename == "cdn.yaml":
        plaintext = CDN_CONFIG_YAML.encode("utf-8")
    else:
        storage_key = f"{folder}/{filename}" if folder else filename
        # Unknown files fall back to a fixed 32-byte zero payload.
        plaintext = uploaded_files.get(storage_key, b"\x00" * 32)

    return Response(
        content=_encrypt(plaintext, enc_key),
        media_type="application/octet-stream",
    )
# NOTE: this fixed-path route must be registered BEFORE the parameterised
# "/resources/{folder}" route below. Routes are matched in declaration order,
# so if the upload route came first, POST /resources/check would be handled as
# an upload to folder "check" and this handler would never run.
@app.post("/resources/check")
async def resources_check(request: Request):
    """Accept any request body and answer with an empty HTTP 200."""
    # The body is read and discarded.
    await request.body()
    return Response(status_code=200)


@app.post("/resources/{folder}")
async def resources_upload(folder: str, data: UploadFile = File(...)):
    """Store an uploaded file in memory under ``<folder>/<filename>``.

    The multipart field must be named ``data``. A later upload with the same
    folder and filename overwrites the earlier one. Returns an empty HTTP 200.
    """
    content = await data.read()
    storage_key = f"{folder}/{data.filename}"
    uploaded_files[storage_key] = content
    return Response(status_code=200)


@app.get("/resources/list/{folder}")
def resources_list(folder: str, search: str = ""):
    """Return an empty list regardless of *folder* and *search*."""
    return []


@app.get("/binary-split/key-fragment")
def binary_split_key_fragment():
    """Return 16 freshly generated random bytes as ``application/octet-stream``."""
    return Response(
        content=secrets.token_bytes(16),
        media_type="application/octet-stream",
    )
@app.post("/get-update")
def get_update(body: GetUpdateBody):
    """Report whether a newer ``annotations`` resource is available.

    Returns an empty list when the client's ``annotations`` version is
    non-empty and not older than ``2026-04-13``; otherwise returns a
    one-element list describing that version. Versions are compared as plain
    strings.
    """
    installed = body.current_versions.get("annotations", "")
    if installed and not installed < "2026-04-13":
        return []

    annotations_update = {
        "resourceName": "annotations",
        "version": "2026-04-13",
        "cdnUrl": f"{CDN_HOST}/fleet/annotations",
        "sha256": "a" * 64,
        "encryptionKey": "mock-fleet-encryption-key",
    }
    return [annotations_update]