Files
loader/e2e/tests/test_zz_resilience.py
T
Oleksandr Bezdieniezhnykh 8f7deb3fca Add E2E tests, fix bugs
Made-with: Cursor
2026-04-13 05:17:48 +03:00

73 lines
2.0 KiB
Python

import os
import subprocess
import time
# Compose file driving the E2E stack; it sits one directory above this module.
COMPOSE_FILE = os.path.join(
    os.path.dirname(__file__),
    os.pardir,
    "docker-compose.test.yml",
)
def _compose(*args: str) -> subprocess.CompletedProcess:
    """Run ``docker compose -f COMPOSE_FILE <args>`` and return its result.

    Args:
        *args: Sub-command and arguments appended after the ``-f`` option,
            e.g. ``"stop", "mock-cdn"``.

    Returns:
        The ``subprocess.CompletedProcess`` of the invocation, with stdout and
        stderr captured. The exit status is NOT checked here; callers that
        care should inspect ``returncode``.

    Raises:
        subprocess.TimeoutExpired: If the command runs longer than 30 seconds.
    """
    # Return the result instead of dropping it so callers can see failures.
    return subprocess.run(
        ["docker", "compose", "-f", COMPOSE_FILE, *args],
        capture_output=True,
        timeout=30,
    )
def test_download_when_cdn_unavailable(base_url, logged_in_client):
    """Check that ``/load/nocache`` does not answer 200 while ``mock-cdn`` is stopped.

    The ``mock-cdn`` compose service is stopped, a POST is sent to
    ``{base_url}/load/nocache``, and the test asserts the status code is not
    200. A request that raises is treated as status ``0`` and therefore also
    passes. The service is started again afterwards in all cases.

    Args:
        base_url: Base URL prefix for the endpoints under test (fixture).
        logged_in_client: HTTP client fixture exposing ``post``; presumably
            already authenticated -- confirm against the fixture definition.
    """
    try:
        # Arrange
        # Kept inside the try: if stopping raises after the container has
        # already gone down, the finally block must still bring it back so
        # later tests do not run against a stopped CDN.
        _compose("stop", "mock-cdn")
        time.sleep(1)

        # Act
        try:
            response = logged_in_client.post(
                f"{base_url}/load/nocache",
                json={"filename": "nocache", "folder": "models"},
                timeout=15,
            )
            status = response.status_code
        except Exception:
            # Deliberately broad: any transport failure (refused connection,
            # timeout, ...) is an acceptable "not successful" outcome.
            status = 0

        # Assert
        assert status != 200, "download unexpectedly succeeded with mock-cdn stopped"
    finally:
        _compose("start", "mock-cdn")
        # Fixed pause after restart; presumably lets the service come back up
        # before the next test -- TODO confirm it is long enough.
        time.sleep(3)
def _exec_in_sut(script):
    """Run *script* via ``bash -c`` inside the ``system-under-test`` service.

    Output is captured and the exit status is not checked. The call raises
    ``subprocess.TimeoutExpired`` if it takes longer than 15 seconds.
    """
    return subprocess.run(
        ["docker", "compose", "-f", COMPOSE_FILE, "exec", "system-under-test",
         "bash", "-c", script],
        capture_output=True, timeout=15,
    )


def test_unlock_with_corrupt_archive(base_url, api_client):
    """Check that ``/unlock`` ends in the ``error`` state for a garbage archive.

    Writes 1 KiB of random bytes to ``/tmp/test.enc`` inside the
    ``system-under-test`` container, POSTs credentials to ``/unlock``, then
    polls ``/unlock/status`` every 0.5 s for up to 30 s until the reported
    state is ``"error"`` or ``"ready"``. The test passes only if the final
    state is ``"error"`` with a non-null ``error`` field.

    Args:
        base_url: Base URL prefix for the endpoints under test (fixture).
        api_client: HTTP client fixture exposing ``post`` and ``get``.
    """
    payload = {"email": "test@azaion.com", "password": "testpass"}
    try:
        # Arrange
        # Done inside the try so the cleanup below also runs when this step
        # fails or times out after the file has already been written.
        _exec_in_sut("dd if=/dev/urandom of=/tmp/test.enc bs=1024 count=1 2>/dev/null")

        # Act
        response = api_client.post(f"{base_url}/unlock", json=payload)
        assert response.status_code == 200

        deadline = time.monotonic() + 30
        body = None
        while time.monotonic() < deadline:
            status_response = api_client.get(f"{base_url}/unlock/status")
            body = status_response.json()
            if body["state"] in ("error", "ready"):
                break
            time.sleep(0.5)

        # Assert
        assert body is not None
        assert body["state"] == "error", f"unexpected final unlock status: {body!r}"
        assert body["error"] is not None
    finally:
        # /tmp/test.tar is removed as well; presumably an artifact the unlock
        # flow derives from test.enc -- verify against the service code.
        _exec_in_sut("rm -f /tmp/test.enc /tmp/test.tar")