Files
detections/src/loader_http_client.pyx
T
2026-03-31 06:30:22 +03:00

79 lines
2.7 KiB
Cython

import requests
from loguru import logger
HTTP_TIMEOUT = 120
cdef class LoadResult:
def __init__(self, err, data=None):
self.err = err
self.data = data
cdef class LoaderHttpClient:
def __init__(self, base_url: str):
self.base_url = base_url.rstrip("/")
cdef LoadResult load_big_small_resource(self, str filename, str directory):
try:
response = requests.post(
f"{self.base_url}/load/{filename}",
json={"filename": filename, "folder": directory},
stream=True,
timeout=HTTP_TIMEOUT,
)
response.raise_for_status()
return LoadResult(None, response.content)
except Exception as e:
logger.error(f"LoaderHttpClient.load_big_small_resource failed: {e}")
return LoadResult(str(e))
cdef LoadResult upload_big_small_resource(self, bytes content, str filename, str directory):
try:
response = requests.post(
f"{self.base_url}/upload/{filename}",
files={"data": (filename, content)},
data={"folder": directory},
timeout=HTTP_TIMEOUT,
)
response.raise_for_status()
return LoadResult(None)
except Exception as e:
logger.error(f"LoaderHttpClient.upload_big_small_resource failed: {e}")
return LoadResult(str(e))
cpdef object fetch_user_ai_settings(self, str user_id, str bearer_token):
try:
headers = {}
if bearer_token:
headers["Authorization"] = f"Bearer {bearer_token}"
response = requests.get(
f"{self.base_url}/api/users/{user_id}/ai-settings",
headers=headers,
timeout=30,
)
if response.status_code != 200:
return None
return response.json()
except Exception as e:
logger.error(f"LoaderHttpClient.fetch_user_ai_settings failed: {e}")
return None
cpdef object fetch_media_path(self, str media_id, str bearer_token):
try:
headers = {}
if bearer_token:
headers["Authorization"] = f"Bearer {bearer_token}"
response = requests.get(
f"{self.base_url}/api/media/{media_id}",
headers=headers,
timeout=30,
)
if response.status_code != 200:
return None
data = response.json()
return data.get("path")
except Exception as e:
logger.error(f"LoaderHttpClient.fetch_media_path failed: {e}")
return None