Files
ai-training/src/api_client.py
T
Oleksandr Bezdieniezhnykh c20018745b Add core functionality for API client, CDN management, and data augmentation
- Introduced `ApiClient` for handling API interactions, including file uploads and downloads.
- Implemented `CDNManager` for managing CDN operations with AWS S3.
- Added `Augmentator` class for image augmentation, including bounding box corrections and transformations.
- Created utility functions for annotation conversion and dataset visualization.
- Established a new rules file for sound notifications during human input requests.

These additions enhance the system's capabilities for data handling and user interaction, laying the groundwork for future features.

Simplify autopilot state file to minimal current-step pointer; add execution safety rule to cursor-meta; remove Completed Steps/Key Decisions/Retry Log/Blockers from state template and all references.
2026-03-28 00:12:54 +02:00

128 lines
4.7 KiB
Python

import io
import json
import os
from http import HTTPStatus
from os import path
import requests
import yaml
import constants
from cdn_manager import CDNCredentials, CDNManager
from hardware_service import get_hardware_info
from security import Security
class ApiCredentials:
def __init__(self, url, email, password):
self.url = url
self.email = email
self.password = password
class ApiClient:
def __init__(self):
self.token = None
with open(constants.CONFIG_FILE, "r") as f:
config_dict = yaml.safe_load(f)
api_c = config_dict["api"]
self.credentials = ApiCredentials(api_c["url"],
api_c["email"],
api_c["password"])
yaml_bytes = self.load_bytes(constants.CDN_CONFIG, '')
data = yaml.safe_load(io.BytesIO(yaml_bytes))
creds = CDNCredentials(data["host"],
data["downloader_access_key"],
data["downloader_access_secret"],
data["uploader_access_key"],
data["uploader_access_secret"])
self.cdn_manager = CDNManager(creds)
def login(self):
response = requests.post(f'{self.credentials.url}/login',
json={"email": self.credentials.email, "password": self.credentials.password})
response.raise_for_status()
token = response.json()["token"]
self.token = token
def upload_file(self, filename: str, file_bytes: bytearray, folder):
if self.token is None:
self.login()
url = f"{self.credentials.url}/resources/{folder}"
headers = {"Authorization": f"Bearer {self.token}"}
files = {'data': (filename, io.BytesIO(file_bytes))}
try:
r = requests.post(url, headers=headers, files=files, allow_redirects=True)
r.raise_for_status()
print(f"Upload {len(file_bytes)} bytes ({filename}) to {self.credentials.url}. Result: {r.status_code}")
except Exception as e:
print(f"Upload fail: {e}")
def load_bytes(self, filename, folder):
hardware_str = get_hardware_info()
if self.token is None:
self.login()
url = f"{self.credentials.url}/resources/get/{folder}"
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
payload = json.dumps(
{
"password": self.credentials.password,
"hardware": hardware_str,
"fileName": filename
}, indent=4)
response = requests.post(url, data=payload, headers=headers, stream=True)
if response.status_code == HTTPStatus.UNAUTHORIZED or response.status_code == HTTPStatus.FORBIDDEN:
self.login()
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
response = requests.post(url, data=payload, headers=headers, stream=True)
if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
print('500!')
hw_hash = Security.get_hw_hash(hardware_str)
key = Security.get_api_encryption_key(self.credentials, hw_hash)
resp_bytes = response.raw.read()
data = Security.decrypt_to(resp_bytes, key)
print(f'Downloaded file: {filename}, {len(data)} bytes')
return data
def load_big_small_resource(self, resource_name, folder, key):
big_part = path.join(folder, f'{resource_name}.big')
small_part = f'{resource_name}.small'
with open(big_part, 'rb') as binary_file:
encrypted_bytes_big = binary_file.read()
encrypted_bytes_small = self.load_bytes(small_part, folder)
encrypted_bytes = encrypted_bytes_small + encrypted_bytes_big
result = Security.decrypt_to(encrypted_bytes, key)
return result
def upload_big_small_resource(self, resource, resource_name, folder, key):
big_part_name = f'{resource_name}.big'
small_part_name = f'{resource_name}.small'
resource_encrypted = Security.encrypt_to(resource, key)
part_small_size = min(constants.SMALL_SIZE_KB * 1024, int(0.2 * len(resource_encrypted)))
part_small = resource_encrypted[:part_small_size] # slice bytes for part1
part_big = resource_encrypted[part_small_size:]
self.cdn_manager.upload(constants.MODELS_FOLDER, big_part_name, part_big)
os.makedirs(folder, exist_ok=True)
with open(path.join(folder, big_part_name), 'wb') as f:
f.write(part_big)
self.upload_file(small_part_name, part_small, constants.MODELS_FOLDER)