mirror of
https://github.com/azaion/loader.git
synced 2026-04-22 06:56:31 +00:00
[AZ-187] Rules & cleanup
Made-with: Cursor
This commit is contained in:
@@ -1,224 +0,0 @@
|
||||
import json
|
||||
import subprocess
|
||||
import threading
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
PROVISION_SCRIPT = REPO_ROOT / "scripts" / "provision_device.sh"
|
||||
|
||||
|
||||
class _ProvisionTestState:
|
||||
lock = threading.Lock()
|
||||
users: dict[str, dict] = {}
|
||||
|
||||
|
||||
def _read_json_body(handler: BaseHTTPRequestHandler) -> dict:
|
||||
length = int(handler.headers.get("Content-Length", "0"))
|
||||
raw = handler.rfile.read(length) if length else b"{}"
|
||||
return json.loads(raw.decode("utf-8"))
|
||||
|
||||
|
||||
def _send_json(handler: BaseHTTPRequestHandler, code: int, payload: dict | None = None):
|
||||
body = b""
|
||||
if payload is not None:
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
handler.send_response(code)
|
||||
handler.send_header("Content-Type", "application/json")
|
||||
handler.send_header("Content-Length", str(len(body)))
|
||||
handler.end_headers()
|
||||
if body:
|
||||
handler.wfile.write(body)
|
||||
|
||||
|
||||
class _AdminMockHandler(BaseHTTPRequestHandler):
|
||||
def log_message(self, _format, *_args):
|
||||
return
|
||||
|
||||
def do_POST(self):
|
||||
parsed = urlparse(self.path)
|
||||
if parsed.path != "/users":
|
||||
self.send_error(404)
|
||||
return
|
||||
body = _read_json_body(self)
|
||||
email = body.get("email", "")
|
||||
password = body.get("password", "")
|
||||
role = body.get("role", "")
|
||||
with _ProvisionTestState.lock:
|
||||
if email in _ProvisionTestState.users:
|
||||
_send_json(self, 409, {"detail": "exists"})
|
||||
return
|
||||
_ProvisionTestState.users[email] = {"password": password, "role": role}
|
||||
_send_json(self, 201, {"email": email, "role": role})
|
||||
|
||||
def do_PATCH(self):
|
||||
parsed = urlparse(self.path)
|
||||
if parsed.path != "/users/password":
|
||||
self.send_error(404)
|
||||
return
|
||||
body = _read_json_body(self)
|
||||
email = body.get("email", "")
|
||||
password = body.get("password", "")
|
||||
with _ProvisionTestState.lock:
|
||||
if email not in _ProvisionTestState.users:
|
||||
self.send_error(404)
|
||||
return
|
||||
_ProvisionTestState.users[email]["password"] = password
|
||||
_send_json(self, 200, {"status": "ok"})
|
||||
|
||||
def handle_login_post(self):
|
||||
body = _read_json_body(self)
|
||||
email = body.get("email", "")
|
||||
password = body.get("password", "")
|
||||
with _ProvisionTestState.lock:
|
||||
row = _ProvisionTestState.users.get(email)
|
||||
if not row or row["password"] != password or row["role"] != "CompanionPC":
|
||||
_send_json(self, 401, {"detail": "invalid"})
|
||||
return
|
||||
_send_json(self, 200, {"token": "provision-test-jwt"})
|
||||
|
||||
|
||||
def _handler_factory():
|
||||
class H(_AdminMockHandler):
|
||||
def do_POST(self):
|
||||
parsed = urlparse(self.path)
|
||||
if parsed.path == "/login":
|
||||
self.handle_login_post()
|
||||
return
|
||||
super().do_POST()
|
||||
|
||||
return H
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_admin_server():
|
||||
# Arrange
|
||||
with _ProvisionTestState.lock:
|
||||
_ProvisionTestState.users.clear()
|
||||
server = HTTPServer(("127.0.0.1", 0), _handler_factory())
|
||||
thread = threading.Thread(target=server.serve_forever, daemon=True)
|
||||
thread.start()
|
||||
host, port = server.server_address
|
||||
base = f"http://{host}:{port}"
|
||||
yield base
|
||||
server.shutdown()
|
||||
server.server_close()
|
||||
thread.join(timeout=5)
|
||||
|
||||
|
||||
def _run_provision(serial: str, api_url: str, rootfs: Path) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(
|
||||
[str(PROVISION_SCRIPT), "--serial", serial, "--api-url", api_url, "--rootfs-dir", str(rootfs)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
|
||||
def _parse_device_conf(path: Path) -> dict[str, str]:
|
||||
out: dict[str, str] = {}
|
||||
for line in path.read_text(encoding="utf-8").splitlines():
|
||||
if "=" not in line:
|
||||
continue
|
||||
key, _, val = line.partition("=")
|
||||
out[key.strip()] = val.strip()
|
||||
return out
|
||||
|
||||
|
||||
def test_provision_creates_companionpc_user(mock_admin_server, tmp_path):
|
||||
# Arrange
|
||||
rootfs = tmp_path / "rootfs"
|
||||
serial = "AZJN-0042"
|
||||
expected_email = "azaion-jetson-0042@azaion.com"
|
||||
|
||||
# Act
|
||||
result = _run_provision(serial, mock_admin_server, rootfs)
|
||||
|
||||
# Assert
|
||||
assert result.returncode == 0, result.stderr + result.stdout
|
||||
with _ProvisionTestState.lock:
|
||||
row = _ProvisionTestState.users.get(expected_email)
|
||||
assert row is not None
|
||||
assert row["role"] == "CompanionPC"
|
||||
assert len(row["password"]) == 32
|
||||
|
||||
|
||||
def test_provision_writes_device_conf(mock_admin_server, tmp_path):
|
||||
# Arrange
|
||||
rootfs = tmp_path / "rootfs"
|
||||
serial = "AZJN-0042"
|
||||
conf_path = rootfs / "etc" / "azaion" / "device.conf"
|
||||
|
||||
# Act
|
||||
result = _run_provision(serial, mock_admin_server, rootfs)
|
||||
|
||||
# Assert
|
||||
assert result.returncode == 0, result.stderr + result.stdout
|
||||
assert conf_path.is_file()
|
||||
data = _parse_device_conf(conf_path)
|
||||
assert data["AZAION_DEVICE_EMAIL"] == "azaion-jetson-0042@azaion.com"
|
||||
assert len(data["AZAION_DEVICE_PASSWORD"]) == 32
|
||||
assert data["AZAION_DEVICE_PASSWORD"].isalnum()
|
||||
|
||||
|
||||
def test_credentials_allow_login_after_provision(mock_admin_server, tmp_path):
|
||||
# Arrange
|
||||
rootfs = tmp_path / "rootfs"
|
||||
serial = "AZJN-0042"
|
||||
conf_path = rootfs / "etc" / "azaion" / "device.conf"
|
||||
|
||||
# Act
|
||||
prov = _run_provision(serial, mock_admin_server, rootfs)
|
||||
assert prov.returncode == 0, prov.stderr + prov.stdout
|
||||
creds = _parse_device_conf(conf_path)
|
||||
login_resp = requests.post(
|
||||
f"{mock_admin_server}/login",
|
||||
json={"email": creds["AZAION_DEVICE_EMAIL"], "password": creds["AZAION_DEVICE_PASSWORD"]},
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert login_resp.status_code == 200
|
||||
assert login_resp.json().get("token") == "provision-test-jwt"
|
||||
|
||||
|
||||
def test_provision_idempotent_no_duplicate_user(mock_admin_server, tmp_path):
|
||||
# Arrange
|
||||
rootfs = tmp_path / "rootfs"
|
||||
serial = "AZJN-0042"
|
||||
expected_email = "azaion-jetson-0042@azaion.com"
|
||||
|
||||
# Act
|
||||
first = _run_provision(serial, mock_admin_server, rootfs)
|
||||
second = _run_provision(serial, mock_admin_server, rootfs)
|
||||
|
||||
# Assert
|
||||
assert first.returncode == 0, first.stderr + first.stdout
|
||||
assert second.returncode == 0, second.stderr + second.stdout
|
||||
with _ProvisionTestState.lock:
|
||||
assert expected_email in _ProvisionTestState.users
|
||||
assert len(_ProvisionTestState.users) == 1
|
||||
|
||||
|
||||
def test_runbook_documents_end_to_end_flow():
|
||||
# Arrange
|
||||
runbook = REPO_ROOT / "_docs" / "02_document" / "deployment" / "provisioning_runbook.md"
|
||||
text = runbook.read_text(encoding="utf-8")
|
||||
|
||||
# Act
|
||||
markers = [
|
||||
"prerequisites" in text.lower(),
|
||||
"provision_device.sh" in text,
|
||||
"device.conf" in text,
|
||||
"POST" in text and "/users" in text,
|
||||
"flash" in text.lower(),
|
||||
"login" in text.lower(),
|
||||
]
|
||||
|
||||
# Assert
|
||||
assert runbook.is_file()
|
||||
assert all(markers)
|
||||
Reference in New Issue
Block a user