mirror of
https://github.com/azaion/loader.git
synced 2026-04-22 10:36:32 +00:00
d244799f02
Made-with: Cursor
214 lines
6.3 KiB
Python
214 lines
6.3 KiB
Python
import json
|
|
import os
|
|
import uuid
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
import yaml
|
|
from loguru import logger
|
|
|
|
from legacy_security_provider import LegacySecurityProvider
|
|
from security import security_decrypt_to
|
|
from security_provider import create_security_provider, should_attempt_tpm
|
|
|
|
|
|
def _compose_path():
    """Return the path of the e2e docker-compose test file.

    The path is built from this module's resolved location: the parent of the
    directory containing this file, then ``e2e/docker-compose.test.yml``.
    """
    base_dir = Path(__file__).resolve().parents[1]
    return base_dir.joinpath("e2e", "docker-compose.test.yml")
|
|
|
|
|
|
@pytest.fixture
def clear_security_env(monkeypatch):
    """Remove the provider-selection and TPM-related environment variables.

    Every variable is deleted through ``monkeypatch``; a variable that is not
    currently set is skipped silently (``raising=False``).
    """
    for name in (
        "SECURITY_PROVIDER",
        "TSS2_TCTI",
        "TPM2TOOLS_TCTI",
        "TSS2_FAPICONF",
        "TPM2_SIM_HOST",
        "TPM2_SIM_PORT",
    ):
        monkeypatch.delenv(name, raising=False)
|
|
|
|
|
|
def test_ac1_auto_detection_selects_tpm_when_tpm0_present(
    monkeypatch, clear_security_env
):
    """The factory returns the TPM provider when only ``/dev/tpm0`` exists."""
    # Arrange
    tpm_stub = MagicMock()
    tpm_stub.kind = "tpm"

    def _only_tpm0_exists(p):
        return str(p) == "/dev/tpm0"

    monkeypatch.setattr(os.path, "exists", _only_tpm0_exists)
    import tpm_security_provider as tsp

    # Replace the provider class with a zero-argument factory returning the stub.
    monkeypatch.setattr(tsp, "TpmSecurityProvider", lambda: tpm_stub)

    # Act
    selected = create_security_provider()

    # Assert
    assert selected is tpm_stub
|
|
|
|
|
|
def test_ac2_tpm_seal_unseal_roundtrip(tmp_path, monkeypatch):
    """Seal a payload through the TPM provider and read it back unchanged.

    The test is skipped unless ``TPM2_SIM_HOST`` or ``TSS2_FAPICONF`` is set,
    and also when constructing ``TpmSecurityProvider`` raises. When only the
    simulator host is given, a FAPI config pointing at it is written under
    ``tmp_path`` and exported through ``TSS2_FAPICONF``.
    """
    # Arrange
    env = os.environ
    sim_host = env.get("TPM2_SIM_HOST", "")
    sim_port = env.get("TPM2_SIM_PORT", "2321")
    fapi_conf = env.get("TSS2_FAPICONF", "")
    if not (fapi_conf or sim_host):
        pytest.skip(
            "Set TPM2_SIM_HOST or TSS2_FAPICONF for TPM simulator (e.g. Docker swtpm)"
        )
    # Past the skip above, an empty TSS2_FAPICONF implies TPM2_SIM_HOST is set.
    if not fapi_conf:
        user_dir = tmp_path / "user"
        system_dir = tmp_path / "system"
        log_dir = tmp_path / "log"
        user_dir.mkdir()
        (system_dir / "policy").mkdir(parents=True)
        log_dir.mkdir()
        fapi_settings = {
            "profile_name": "P_ECCP256SHA256",
            "profile_dir": "/etc/tpm2-tss/fapi-profiles/",
            "user_dir": str(user_dir),
            "system_dir": str(system_dir),
            "tcti": f"swtpm:host={sim_host},port={sim_port}",
            "ek_cert_less": "yes",
            "system_pcrs": [],
            "log_dir": str(log_dir),
            "firmware_log_file": "/dev/null",
            "ima_log_file": "/dev/null",
        }
        conf_file = tmp_path / "fapi.json"
        conf_file.write_text(json.dumps(fapi_settings), encoding="utf-8")
        monkeypatch.setenv("TSS2_FAPICONF", str(conf_file))

    from tpm_security_provider import TpmSecurityProvider

    try:
        provider = TpmSecurityProvider()
    except Exception:
        pytest.skip("TPM simulator not reachable with current FAPI config")
    payload = b"azaion-loader-seal-test"
    # A random suffix gives every run its own object path.
    object_path = f"/HS/SRK/az182_{uuid.uuid4().hex}"

    # Act
    try:
        provider.seal(object_path, payload)
        recovered = provider.unseal(object_path)
    finally:
        # Best-effort cleanup: errors while deleting the object are ignored.
        try:
            provider._fapi.delete(object_path)
        except Exception:
            pass

    # Assert
    assert recovered == payload
|
|
|
|
|
|
def test_ac3_legacy_when_no_tpm_device_or_tcti(monkeypatch, clear_security_env):
    """Without any existing path, the factory yields a working legacy provider.

    Also checks that the provider's decryption agrees with
    ``security_decrypt_to`` for the same blob and key.
    """
    # Arrange
    def _nothing_exists(p):
        return False

    monkeypatch.setattr(os.path, "exists", _nothing_exists)
    key = "secret-key"
    plaintext = b"plain"

    # Act
    provider = create_security_provider()

    # Assert
    assert provider.kind == "legacy"
    blob = provider.encrypt_to(plaintext, key)
    assert provider.decrypt_to(blob, key) == plaintext
    via_provider = provider.decrypt_to(blob, key)
    assert via_provider == security_decrypt_to(blob, key)
|
|
|
|
|
|
def test_ac4_env_legacy_overrides_tpm_device(monkeypatch, clear_security_env):
    """``SECURITY_PROVIDER=legacy`` wins even when TPM device nodes exist."""
    # Arrange
    tpm_nodes = ("/dev/tpm0", "/dev/tpmrm0")

    def _tpm_nodes_exist(p):
        return str(p) in tpm_nodes

    monkeypatch.setenv("SECURITY_PROVIDER", "legacy")
    monkeypatch.setattr(os.path, "exists", _tpm_nodes_exist)

    # Act
    selected = create_security_provider()

    # Assert
    assert selected.kind == "legacy"
|
|
|
|
|
|
def test_ac5_fapi_failure_falls_back_to_legacy_with_warning(
    monkeypatch, clear_security_env
):
    """A failing TPM provider constructor falls back to legacy and logs a warning."""
    # Arrange
    def _only_tpm0_exists(p):
        return str(p) == "/dev/tpm0"

    monkeypatch.setattr(os.path, "exists", _only_tpm0_exists)
    import tpm_security_provider as tsp

    def _failing_constructor(*_args, **_kwargs):
        raise RuntimeError("fapi init failed")

    monkeypatch.setattr(tsp, "TpmSecurityProvider", _failing_constructor)
    captured = []

    def _sink(message):
        captured.append(str(message))

    # Collect records at WARNING level and above for the duration of the call.
    sink_id = logger.add(_sink, level="WARNING")

    # Act
    try:
        selected = create_security_provider()
    finally:
        logger.remove(sink_id)

    # Assert
    assert selected.kind == "legacy"
    assert any("TPM security provider failed" in text for text in captured)
|
|
|
|
|
|
def test_ac6_compose_declares_tpm_device_mounts_and_swtpm():
    """The e2e compose file declares the TPM device mounts and swtpm wiring.

    Checks the Jetson device-mount extension block, the ``swtpm`` service, the
    ``system-under-test`` environment and volumes, and that the sibling
    ``fapi-config.swtpm.json`` file mentions ``swtpm:``.
    """
    # Arrange
    compose_file = _compose_path()
    compose = yaml.safe_load(compose_file.read_text(encoding="utf-8"))

    # Assert
    jetson_devices = compose["x-tpm-device-mounts-for-jetson"]["devices"]
    assert "/dev/tpm0" in jetson_devices
    assert "/dev/tpmrm0" in jetson_devices
    services = compose["services"]
    assert "swtpm" in services
    sut = services["system-under-test"]
    assert "TSS2_FAPICONF" in sut["environment"]
    assert any("fapi-config" in str(volume) for volume in sut["volumes"])
    # The FAPI config sits in the same e2e directory as the compose file.
    fapi_file = compose_file.with_name("fapi-config.swtpm.json")
    assert "swtpm:" in fapi_file.read_text(encoding="utf-8")
|
|
|
|
|
|
def test_should_attempt_tpm_respects_device_and_tcti(monkeypatch, clear_security_env):
    """``should_attempt_tpm`` reacts to TCTI, FAPI config and device presence.

    Expected ``False`` with nothing set; ``True`` when ``TSS2_TCTI`` is set,
    when ``TSS2_FAPICONF`` is set, or when ``/dev/tpmrm0`` exists.
    """
    # Arrange / Act / Assert
    def _decision():
        # Read os.path.exists at call time so the current patch is the one used.
        return should_attempt_tpm(os.environ, os.path.exists)

    def _nothing_exists(p):
        return False

    def _only_tpmrm0_exists(p):
        return str(p) == "/dev/tpmrm0"

    monkeypatch.setattr(os.path, "exists", _nothing_exists)
    assert _decision() is False

    monkeypatch.setenv("TSS2_TCTI", "mssim:host=127.0.0.1,port=2321")
    assert _decision() is True
    monkeypatch.delenv("TSS2_TCTI", raising=False)

    monkeypatch.setenv("TSS2_FAPICONF", "/etc/tpm2-tss/fapi-config.json")
    assert _decision() is True
    monkeypatch.delenv("TSS2_FAPICONF", raising=False)

    monkeypatch.setattr(os.path, "exists", _only_tpmrm0_exists)
    assert _decision() is True
|
|
|
|
|
|
def test_legacy_provider_matches_security_module_helpers():
    """Data encrypted by ``LegacySecurityProvider`` decrypts via both code paths.

    The ciphertext must decrypt to the original bytes through
    ``security_decrypt_to`` and through the provider's own ``decrypt_to``.
    """
    # Arrange
    legacy = LegacySecurityProvider()
    plaintext = b"x" * 500
    secret = "k"

    # Act
    ciphertext = legacy.encrypt_to(plaintext, secret)

    # Assert
    assert security_decrypt_to(ciphertext, secret) == plaintext
    assert legacy.decrypt_to(ciphertext, secret) == plaintext
|