[AZ-182][AZ-184][AZ-187] Batch 1

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-04-15 07:23:47 +03:00
parent 765d3d32c1
commit d244799f02
22 changed files with 1622 additions and 16 deletions
+224
View File
@@ -0,0 +1,224 @@
import json
import subprocess
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import urlparse
import pytest
import requests
REPO_ROOT = Path(__file__).resolve().parents[1]
PROVISION_SCRIPT = REPO_ROOT / "scripts" / "provision_device.sh"
class _ProvisionTestState:
lock = threading.Lock()
users: dict[str, dict] = {}
def _read_json_body(handler: BaseHTTPRequestHandler) -> dict:
length = int(handler.headers.get("Content-Length", "0"))
raw = handler.rfile.read(length) if length else b"{}"
return json.loads(raw.decode("utf-8"))
def _send_json(handler: BaseHTTPRequestHandler, code: int, payload: dict | None = None):
body = b""
if payload is not None:
body = json.dumps(payload).encode("utf-8")
handler.send_response(code)
handler.send_header("Content-Type", "application/json")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
if body:
handler.wfile.write(body)
class _AdminMockHandler(BaseHTTPRequestHandler):
def log_message(self, _format, *_args):
return
def do_POST(self):
parsed = urlparse(self.path)
if parsed.path != "/users":
self.send_error(404)
return
body = _read_json_body(self)
email = body.get("email", "")
password = body.get("password", "")
role = body.get("role", "")
with _ProvisionTestState.lock:
if email in _ProvisionTestState.users:
_send_json(self, 409, {"detail": "exists"})
return
_ProvisionTestState.users[email] = {"password": password, "role": role}
_send_json(self, 201, {"email": email, "role": role})
def do_PATCH(self):
parsed = urlparse(self.path)
if parsed.path != "/users/password":
self.send_error(404)
return
body = _read_json_body(self)
email = body.get("email", "")
password = body.get("password", "")
with _ProvisionTestState.lock:
if email not in _ProvisionTestState.users:
self.send_error(404)
return
_ProvisionTestState.users[email]["password"] = password
_send_json(self, 200, {"status": "ok"})
def handle_login_post(self):
body = _read_json_body(self)
email = body.get("email", "")
password = body.get("password", "")
with _ProvisionTestState.lock:
row = _ProvisionTestState.users.get(email)
if not row or row["password"] != password or row["role"] != "CompanionPC":
_send_json(self, 401, {"detail": "invalid"})
return
_send_json(self, 200, {"token": "provision-test-jwt"})
def _handler_factory():
class H(_AdminMockHandler):
def do_POST(self):
parsed = urlparse(self.path)
if parsed.path == "/login":
self.handle_login_post()
return
super().do_POST()
return H
@pytest.fixture
def mock_admin_server():
# Arrange
with _ProvisionTestState.lock:
_ProvisionTestState.users.clear()
server = HTTPServer(("127.0.0.1", 0), _handler_factory())
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
host, port = server.server_address
base = f"http://{host}:{port}"
yield base
server.shutdown()
server.server_close()
thread.join(timeout=5)
def _run_provision(serial: str, api_url: str, rootfs: Path) -> subprocess.CompletedProcess:
return subprocess.run(
[str(PROVISION_SCRIPT), "--serial", serial, "--api-url", api_url, "--rootfs-dir", str(rootfs)],
capture_output=True,
text=True,
check=False,
)
def _parse_device_conf(path: Path) -> dict[str, str]:
out: dict[str, str] = {}
for line in path.read_text(encoding="utf-8").splitlines():
if "=" not in line:
continue
key, _, val = line.partition("=")
out[key.strip()] = val.strip()
return out
def test_provision_creates_companionpc_user(mock_admin_server, tmp_path):
# Arrange
rootfs = tmp_path / "rootfs"
serial = "AZJN-0042"
expected_email = "azaion-jetson-0042@azaion.com"
# Act
result = _run_provision(serial, mock_admin_server, rootfs)
# Assert
assert result.returncode == 0, result.stderr + result.stdout
with _ProvisionTestState.lock:
row = _ProvisionTestState.users.get(expected_email)
assert row is not None
assert row["role"] == "CompanionPC"
assert len(row["password"]) == 32
def test_provision_writes_device_conf(mock_admin_server, tmp_path):
# Arrange
rootfs = tmp_path / "rootfs"
serial = "AZJN-0042"
conf_path = rootfs / "etc" / "azaion" / "device.conf"
# Act
result = _run_provision(serial, mock_admin_server, rootfs)
# Assert
assert result.returncode == 0, result.stderr + result.stdout
assert conf_path.is_file()
data = _parse_device_conf(conf_path)
assert data["AZAION_DEVICE_EMAIL"] == "azaion-jetson-0042@azaion.com"
assert len(data["AZAION_DEVICE_PASSWORD"]) == 32
assert data["AZAION_DEVICE_PASSWORD"].isalnum()
def test_credentials_allow_login_after_provision(mock_admin_server, tmp_path):
# Arrange
rootfs = tmp_path / "rootfs"
serial = "AZJN-0042"
conf_path = rootfs / "etc" / "azaion" / "device.conf"
# Act
prov = _run_provision(serial, mock_admin_server, rootfs)
assert prov.returncode == 0, prov.stderr + prov.stdout
creds = _parse_device_conf(conf_path)
login_resp = requests.post(
f"{mock_admin_server}/login",
json={"email": creds["AZAION_DEVICE_EMAIL"], "password": creds["AZAION_DEVICE_PASSWORD"]},
timeout=5,
)
# Assert
assert login_resp.status_code == 200
assert login_resp.json().get("token") == "provision-test-jwt"
def test_provision_idempotent_no_duplicate_user(mock_admin_server, tmp_path):
# Arrange
rootfs = tmp_path / "rootfs"
serial = "AZJN-0042"
expected_email = "azaion-jetson-0042@azaion.com"
# Act
first = _run_provision(serial, mock_admin_server, rootfs)
second = _run_provision(serial, mock_admin_server, rootfs)
# Assert
assert first.returncode == 0, first.stderr + first.stdout
assert second.returncode == 0, second.stderr + second.stdout
with _ProvisionTestState.lock:
assert expected_email in _ProvisionTestState.users
assert len(_ProvisionTestState.users) == 1
def test_runbook_documents_end_to_end_flow():
# Arrange
runbook = REPO_ROOT / "_docs" / "02_document" / "deployment" / "provisioning_runbook.md"
text = runbook.read_text(encoding="utf-8")
# Act
markers = [
"prerequisites" in text.lower(),
"provision_device.sh" in text,
"device.conf" in text,
"POST" in text and "/users" in text,
"flash" in text.lower(),
"login" in text.lower(),
]
# Assert
assert runbook.is_file()
assert all(markers)