"""Shared pytest fixtures for tests that run against the loader service.

The session-scoped, autouse ``_reset_loader`` fixture restarts the
``system-under-test`` compose service, resets the S3 bucket(s) the tests use
and waits for the service's ``/health`` endpoint before any test runs.
"""

import os
import subprocess
import time
import warnings

import boto3
import pytest
import requests
from botocore.config import Config
from botocore.exceptions import ClientError

COMPOSE_FILE = os.path.join(os.path.dirname(__file__), "docker-compose.test.yml")

# Buckets that are emptied (or created when missing) once per test session.
_BUCKETS = ("models",)
# How long to wait for ``GET /health`` to return 200 after the restart.
_HEALTH_TIMEOUT_S = 30
# Upper bound for the login request so a hung service cannot stall the run.
_LOGIN_TIMEOUT_S = 30


@pytest.fixture(scope="session")
def base_url():
    """Return the loader base URL without a trailing slash.

    Read from ``LOADER_URL``; defaults to ``http://localhost:8080``.
    """
    return os.environ.get("LOADER_URL", "http://localhost:8080").rstrip("/")


def _restart_system_under_test():
    """Restart the ``system-under-test`` service via ``docker compose``."""
    # The exit status is not checked; readiness is established by the
    # subsequent health poll rather than by this command's result.
    subprocess.run(
        ["docker", "compose", "-f", COMPOSE_FILE, "restart", "system-under-test"],
        capture_output=True,
        timeout=30,
    )


def _make_s3_client():
    """Build an S3 client for ``MINIO_URL`` (default ``http://localhost:9000``)."""
    endpoint = os.environ.get("MINIO_URL", "http://localhost:9000")
    return boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )


def _empty_or_create_bucket(s3, bucket):
    """Delete every object in *bucket*, or create the bucket if it is missing."""
    try:
        s3.head_bucket(Bucket=bucket)
    except ClientError:
        s3.create_bucket(Bucket=bucket)
        return

    # Deletion happens outside the ``try`` so that a failure while emptying an
    # existing bucket surfaces as-is instead of triggering ``create_bucket``.
    # The paginator is needed because one ``list_objects_v2`` call returns
    # only a single page of keys.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get("Contents", []):
            s3.delete_object(Bucket=bucket, Key=obj["Key"])


def _wait_until_healthy(base_url, timeout_s=_HEALTH_TIMEOUT_S):
    """Poll ``{base_url}/health`` once per second until it returns 200.

    Returns:
        ``True`` if a 200 response was seen before *timeout_s* elapsed,
        ``False`` otherwise.
    """
    deadline = time.monotonic() + timeout_s
    with requests.Session() as session:
        while time.monotonic() < deadline:
            try:
                response = session.get(f"{base_url}/health", timeout=2)
                if response.status_code == 200:
                    return True
            except requests.RequestException:
                # Connection refused / timed out while the service is still
                # starting; retry until the deadline.
                pass
            time.sleep(1)
    return False


@pytest.fixture(scope="session", autouse=True)
def _reset_loader(base_url):
    """Restart the service, reset the test buckets and wait for readiness.

    Runs once per session before any test. If the service does not report
    healthy in time, a ``RuntimeWarning`` is emitted and the tests proceed.
    """
    _restart_system_under_test()

    s3 = _make_s3_client()
    for bucket in _BUCKETS:
        _empty_or_create_bucket(s3, bucket)

    if not _wait_until_healthy(base_url):
        warnings.warn(
            f"{base_url}/health did not return 200 within "
            f"{_HEALTH_TIMEOUT_S}s; tests may fail because the service "
            "is not ready",
            RuntimeWarning,
        )


@pytest.fixture
def api_client():
    """Yield a fresh ``requests.Session`` that is closed after the test."""
    with requests.Session() as session:
        yield session


@pytest.fixture
def logged_in_client(base_url, api_client):
    """Return ``api_client`` after a successful ``POST /login``.

    Credentials come from ``TEST_EMAIL`` / ``TEST_PASSWORD`` with built-in
    defaults.

    Raises:
        requests.HTTPError: if the login response has an error status.
    """
    email = os.environ.get("TEST_EMAIL", "test@azaion.com")
    password = os.environ.get("TEST_PASSWORD", "testpass")
    response = api_client.post(
        f"{base_url}/login",
        json={"email": email, "password": password},
        timeout=_LOGIN_TIMEOUT_S,
    )
    response.raise_for_status()
    return api_client