import json import subprocess import threading from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from urllib.parse import urlparse import pytest import requests REPO_ROOT = Path(__file__).resolve().parents[1] PROVISION_SCRIPT = REPO_ROOT / "scripts" / "provision_device.sh" class _ProvisionTestState: lock = threading.Lock() users: dict[str, dict] = {} def _read_json_body(handler: BaseHTTPRequestHandler) -> dict: length = int(handler.headers.get("Content-Length", "0")) raw = handler.rfile.read(length) if length else b"{}" return json.loads(raw.decode("utf-8")) def _send_json(handler: BaseHTTPRequestHandler, code: int, payload: dict | None = None): body = b"" if payload is not None: body = json.dumps(payload).encode("utf-8") handler.send_response(code) handler.send_header("Content-Type", "application/json") handler.send_header("Content-Length", str(len(body))) handler.end_headers() if body: handler.wfile.write(body) class _AdminMockHandler(BaseHTTPRequestHandler): def log_message(self, _format, *_args): return def do_POST(self): parsed = urlparse(self.path) if parsed.path != "/users": self.send_error(404) return body = _read_json_body(self) email = body.get("email", "") password = body.get("password", "") role = body.get("role", "") with _ProvisionTestState.lock: if email in _ProvisionTestState.users: _send_json(self, 409, {"detail": "exists"}) return _ProvisionTestState.users[email] = {"password": password, "role": role} _send_json(self, 201, {"email": email, "role": role}) def do_PATCH(self): parsed = urlparse(self.path) if parsed.path != "/users/password": self.send_error(404) return body = _read_json_body(self) email = body.get("email", "") password = body.get("password", "") with _ProvisionTestState.lock: if email not in _ProvisionTestState.users: self.send_error(404) return _ProvisionTestState.users[email]["password"] = password _send_json(self, 200, {"status": "ok"}) def handle_login_post(self): body = _read_json_body(self) email = body.get("email", "") password = body.get("password", "") with _ProvisionTestState.lock: row = _ProvisionTestState.users.get(email) if not row or row["password"] != password or row["role"] != "CompanionPC": _send_json(self, 401, {"detail": "invalid"}) return _send_json(self, 200, {"token": "provision-test-jwt"}) def _handler_factory(): class H(_AdminMockHandler): def do_POST(self): parsed = urlparse(self.path) if parsed.path == "/login": self.handle_login_post() return super().do_POST() return H @pytest.fixture def mock_admin_server(): # Arrange with _ProvisionTestState.lock: _ProvisionTestState.users.clear() server = HTTPServer(("127.0.0.1", 0), _handler_factory()) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() host, port = server.server_address base = f"http://{host}:{port}" yield base server.shutdown() server.server_close() thread.join(timeout=5) def _run_provision(serial: str, api_url: str, rootfs: Path) -> subprocess.CompletedProcess: return subprocess.run( [str(PROVISION_SCRIPT), "--serial", serial, "--api-url", api_url, "--rootfs-dir", str(rootfs)], capture_output=True, text=True, check=False, ) def _parse_device_conf(path: Path) -> dict[str, str]: out: dict[str, str] = {} for line in path.read_text(encoding="utf-8").splitlines(): if "=" not in line: continue key, _, val = line.partition("=") out[key.strip()] = val.strip() return out def test_provision_creates_companionpc_user(mock_admin_server, tmp_path): # Arrange rootfs = tmp_path / "rootfs" serial = "AZJN-0042" expected_email = "azaion-jetson-0042@azaion.com" # Act result = _run_provision(serial, mock_admin_server, rootfs) # Assert assert result.returncode == 0, result.stderr + result.stdout with _ProvisionTestState.lock: row = _ProvisionTestState.users.get(expected_email) assert row is not None assert row["role"] == "CompanionPC" assert len(row["password"]) == 32 def test_provision_writes_device_conf(mock_admin_server, tmp_path): # Arrange rootfs = tmp_path / "rootfs" serial = "AZJN-0042" conf_path = rootfs / "etc" / "azaion" / "device.conf" # Act result = _run_provision(serial, mock_admin_server, rootfs) # Assert assert result.returncode == 0, result.stderr + result.stdout assert conf_path.is_file() data = _parse_device_conf(conf_path) assert data["AZAION_DEVICE_EMAIL"] == "azaion-jetson-0042@azaion.com" assert len(data["AZAION_DEVICE_PASSWORD"]) == 32 assert data["AZAION_DEVICE_PASSWORD"].isalnum() def test_credentials_allow_login_after_provision(mock_admin_server, tmp_path): # Arrange rootfs = tmp_path / "rootfs" serial = "AZJN-0042" conf_path = rootfs / "etc" / "azaion" / "device.conf" # Act prov = _run_provision(serial, mock_admin_server, rootfs) assert prov.returncode == 0, prov.stderr + prov.stdout creds = _parse_device_conf(conf_path) login_resp = requests.post( f"{mock_admin_server}/login", json={"email": creds["AZAION_DEVICE_EMAIL"], "password": creds["AZAION_DEVICE_PASSWORD"]}, timeout=5, ) # Assert assert login_resp.status_code == 200 assert login_resp.json().get("token") == "provision-test-jwt" def test_provision_idempotent_no_duplicate_user(mock_admin_server, tmp_path): # Arrange rootfs = tmp_path / "rootfs" serial = "AZJN-0042" expected_email = "azaion-jetson-0042@azaion.com" # Act first = _run_provision(serial, mock_admin_server, rootfs) second = _run_provision(serial, mock_admin_server, rootfs) # Assert assert first.returncode == 0, first.stderr + first.stdout assert second.returncode == 0, second.stderr + second.stdout with _ProvisionTestState.lock: assert expected_email in _ProvisionTestState.users assert len(_ProvisionTestState.users) == 1 def test_runbook_documents_end_to_end_flow(): # Arrange runbook = REPO_ROOT / "_docs" / "02_document" / "deployment" / "provisioning_runbook.md" text = runbook.read_text(encoding="utf-8") # Act markers = [ "prerequisites" in text.lower(), "provision_device.sh" in text, "device.conf" in text, "POST" in text and "/users" in text, "flash" in text.lower(), "login" in text.lower(), ] # Assert assert runbook.is_file() assert all(markers)