mirror of
https://github.com/azaion/loader.git
synced 2026-04-22 09:46:32 +00:00
Add E2E tests, fix bugs
Made-with: Cursor
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
FROM python:3.11-slim
|
||||
WORKDIR /app
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
COPY app.py .
|
||||
EXPOSE 9090
|
||||
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "9090"]
|
||||
@@ -0,0 +1,119 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import os
|
||||
import secrets
|
||||
import uuid
|
||||
|
||||
import jwt
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.primitives import padding
|
||||
from fastapi import FastAPI, File, Request, UploadFile
|
||||
from fastapi.responses import JSONResponse, Response
|
||||
from pydantic import BaseModel
|
||||
|
||||
VALID_EMAIL = os.environ.get("MOCK_VALID_EMAIL", "test@azaion.com")
|
||||
VALID_PASSWORD = os.environ.get("MOCK_VALID_PASSWORD", "testpass")
|
||||
JWT_SECRET = os.environ.get("MOCK_JWT_SECRET", "e2e-mock-jwt-secret")
|
||||
CDN_HOST = os.environ.get("MOCK_CDN_HOST", "http://mock-cdn:9000")
|
||||
|
||||
CDN_CONFIG_YAML = (
|
||||
f"host: {CDN_HOST}\n"
|
||||
"downloader_access_key: minioadmin\n"
|
||||
"downloader_access_secret: minioadmin\n"
|
||||
"uploader_access_key: minioadmin\n"
|
||||
"uploader_access_secret: minioadmin\n"
|
||||
)
|
||||
|
||||
uploaded_files: dict[str, bytes] = {}
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class LoginBody(BaseModel):
|
||||
email: str
|
||||
password: str
|
||||
|
||||
|
||||
def _calc_hash(key: str) -> str:
|
||||
h = hashlib.sha384(key.encode("utf-8")).digest()
|
||||
return base64.b64encode(h).decode("utf-8")
|
||||
|
||||
|
||||
def _encrypt(plaintext: bytes, key: str) -> bytes:
|
||||
aes_key = hashlib.sha256(key.encode("utf-8")).digest()
|
||||
iv = os.urandom(16)
|
||||
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
padder = padding.PKCS7(128).padder()
|
||||
padded = padder.update(plaintext) + padder.finalize()
|
||||
ciphertext = encryptor.update(padded) + encryptor.finalize()
|
||||
return iv + ciphertext
|
||||
|
||||
|
||||
@app.post("/login")
|
||||
def login(body: LoginBody):
|
||||
if body.email == VALID_EMAIL and body.password == VALID_PASSWORD:
|
||||
token = jwt.encode(
|
||||
{
|
||||
"nameid": str(uuid.uuid4()),
|
||||
"unique_name": body.email,
|
||||
"role": "Admin",
|
||||
},
|
||||
JWT_SECRET,
|
||||
algorithm="HS256",
|
||||
)
|
||||
if isinstance(token, bytes):
|
||||
token = token.decode("ascii")
|
||||
return {"token": token}
|
||||
return JSONResponse(
|
||||
status_code=409,
|
||||
content={"ErrorCode": "AUTH_FAILED", "Message": "Invalid credentials"},
|
||||
)
|
||||
|
||||
|
||||
@app.post("/resources/get/{folder:path}")
|
||||
async def resources_get(folder: str, request: Request):
|
||||
body = await request.json()
|
||||
hardware = body.get("hardware", "")
|
||||
password = body.get("password", "")
|
||||
filename = body.get("fileName", "")
|
||||
|
||||
hw_hash = _calc_hash(f"Azaion_{hardware}_%$$$)0_")
|
||||
enc_key = _calc_hash(f"{VALID_EMAIL}-{password}-{hw_hash}-#%@AzaionKey@%#---")
|
||||
|
||||
if filename == "cdn.yaml":
|
||||
encrypted = _encrypt(CDN_CONFIG_YAML.encode("utf-8"), enc_key)
|
||||
return Response(content=encrypted, media_type="application/octet-stream")
|
||||
|
||||
storage_key = f"{folder}/{filename}" if folder else filename
|
||||
if storage_key in uploaded_files:
|
||||
encrypted = _encrypt(uploaded_files[storage_key], enc_key)
|
||||
return Response(content=encrypted, media_type="application/octet-stream")
|
||||
|
||||
encrypted = _encrypt(b"\x00" * 32, enc_key)
|
||||
return Response(content=encrypted, media_type="application/octet-stream")
|
||||
|
||||
|
||||
@app.post("/resources/{folder}")
|
||||
async def resources_upload(folder: str, data: UploadFile = File(...)):
|
||||
content = await data.read()
|
||||
storage_key = f"{folder}/{data.filename}"
|
||||
uploaded_files[storage_key] = content
|
||||
return Response(status_code=200)
|
||||
|
||||
|
||||
@app.get("/resources/list/{folder}")
|
||||
def resources_list(folder: str, search: str = ""):
|
||||
return []
|
||||
|
||||
|
||||
@app.get("/binary-split/key-fragment")
|
||||
def binary_split_key_fragment():
|
||||
return Response(content=secrets.token_bytes(16), media_type="application/octet-stream")
|
||||
|
||||
|
||||
@app.post("/resources/check")
|
||||
async def resources_check(request: Request):
|
||||
await request.body()
|
||||
return Response(status_code=200)
|
||||
@@ -0,0 +1,5 @@
|
||||
fastapi
|
||||
uvicorn
|
||||
pyjwt
|
||||
python-multipart
|
||||
cryptography
|
||||
Reference in New Issue
Block a user