"""Tests for security provider selection (TPM vs. legacy) and related wiring.

The tests monkeypatch ``os.path.exists`` and the TPM-related environment
variables so that ``create_security_provider`` / ``should_attempt_tpm`` can be
exercised without real TPM hardware.  The seal/unseal round-trip test talks to
a real TPM simulator and skips itself when none is configured.
"""

import contextlib
import json
import os
import uuid
from pathlib import Path
from unittest.mock import MagicMock

import pytest
import yaml
from loguru import logger

from legacy_security_provider import LegacySecurityProvider
from security import security_decrypt_to
from security_provider import create_security_provider, should_attempt_tpm

# Environment variables removed by the ``clear_security_env`` fixture so the
# host environment cannot influence the provider-selection tests.
_SECURITY_ENV_VARS = (
    "SECURITY_PROVIDER",
    "TSS2_TCTI",
    "TPM2TOOLS_TCTI",
    "TSS2_FAPICONF",
    "TPM2_SIM_HOST",
    "TPM2_SIM_PORT",
)


def _compose_path():
    """Return the path to ``e2e/docker-compose.test.yml``.

    The path is resolved relative to the parent of this file's directory.
    """
    return Path(__file__).resolve().parents[1] / "e2e" / "docker-compose.test.yml"


def _write_swtpm_fapi_config(tmp_path, sim_host, sim_port):
    """Write a FAPI config pointing at an swtpm simulator and return its path.

    Creates the ``user``, ``system/policy`` and ``log`` directories under
    *tmp_path* and stores the JSON config as ``tmp_path / "fapi.json"``.
    """
    (tmp_path / "user").mkdir()
    (tmp_path / "system" / "policy").mkdir(parents=True)
    (tmp_path / "log").mkdir()
    cfg = {
        "profile_name": "P_ECCP256SHA256",
        "profile_dir": "/etc/tpm2-tss/fapi-profiles/",
        "user_dir": str(tmp_path / "user"),
        "system_dir": str(tmp_path / "system"),
        "tcti": f"swtpm:host={sim_host},port={sim_port}",
        "ek_cert_less": "yes",
        "system_pcrs": [],
        "log_dir": str(tmp_path / "log"),
        "firmware_log_file": "/dev/null",
        "ima_log_file": "/dev/null",
    }
    conf_path = tmp_path / "fapi.json"
    conf_path.write_text(json.dumps(cfg), encoding="utf-8")
    return conf_path


@pytest.fixture
def clear_security_env(monkeypatch):
    """Remove every TPM/security-related variable from the environment."""
    for name in _SECURITY_ENV_VARS:
        monkeypatch.delenv(name, raising=False)


def test_ac1_auto_detection_selects_tpm_when_tpm0_present(
    monkeypatch, clear_security_env
):
    """With only ``/dev/tpm0`` reported present, the TPM provider is returned."""
    # Arrange
    monkeypatch.setattr(
        os.path,
        "exists",
        lambda p: str(p) == "/dev/tpm0",
    )
    fake_tpm = MagicMock()
    fake_tpm.kind = "tpm"
    # Imported inside the test, as in the original module.
    import tpm_security_provider as tsp

    monkeypatch.setattr(tsp, "TpmSecurityProvider", lambda: fake_tpm)

    # Act
    provider = create_security_provider()

    # Assert
    assert provider is fake_tpm


def test_ac2_tpm_seal_unseal_roundtrip(tmp_path, monkeypatch):
    """Seal a payload with the TPM provider and unseal it again.

    Skipped unless ``TPM2_SIM_HOST`` or ``TSS2_FAPICONF`` is set, and also
    skipped when the provider cannot be constructed with that configuration.
    """
    # Arrange
    sim_host = os.environ.get("TPM2_SIM_HOST", "")
    sim_port = os.environ.get("TPM2_SIM_PORT", "2321")
    fapi_conf = os.environ.get("TSS2_FAPICONF", "")
    if not fapi_conf and not sim_host:
        pytest.skip(
            "Set TPM2_SIM_HOST or TSS2_FAPICONF for TPM simulator "
            "(e.g. Docker swtpm)"
        )
    if sim_host and not fapi_conf:
        # Only a simulator host was given: generate a throwaway FAPI config.
        conf_path = _write_swtpm_fapi_config(tmp_path, sim_host, sim_port)
        monkeypatch.setenv("TSS2_FAPICONF", str(conf_path))

    from tpm_security_provider import TpmSecurityProvider

    try:
        provider = TpmSecurityProvider()
    except Exception:
        pytest.skip("TPM simulator not reachable with current FAPI config")
    payload = b"azaion-loader-seal-test"
    # Unique object path so repeated runs do not collide.
    path = f"/HS/SRK/az182_{uuid.uuid4().hex}"

    # Act
    try:
        provider.seal(path, payload)
        out = provider.unseal(path)
    finally:
        # Best-effort cleanup: a failed delete must not mask the real outcome.
        with contextlib.suppress(Exception):
            provider._fapi.delete(path)

    # Assert
    assert out == payload


def test_ac3_legacy_when_no_tpm_device_or_tcti(monkeypatch, clear_security_env):
    """Without any TPM device or TCTI setting, the legacy provider is used."""
    # Arrange
    monkeypatch.setattr(os.path, "exists", lambda p: False)

    # Act
    provider = create_security_provider()

    # Assert
    assert provider.kind == "legacy"
    blob = provider.encrypt_to(b"plain", "secret-key")
    decrypted = provider.decrypt_to(blob, "secret-key")
    assert decrypted == b"plain"
    # The provider must agree with the module-level helper.
    assert decrypted == security_decrypt_to(blob, "secret-key")


def test_ac4_env_legacy_overrides_tpm_device(monkeypatch, clear_security_env):
    """``SECURITY_PROVIDER=legacy`` wins even when TPM device nodes exist."""
    # Arrange
    monkeypatch.setenv("SECURITY_PROVIDER", "legacy")
    monkeypatch.setattr(
        os.path,
        "exists",
        lambda p: str(p) in ("/dev/tpm0", "/dev/tpmrm0"),
    )

    # Act
    provider = create_security_provider()

    # Assert
    assert provider.kind == "legacy"


def test_ac5_fapi_failure_falls_back_to_legacy_with_warning(
    monkeypatch, clear_security_env
):
    """A failing TPM provider constructor yields legacy plus a warning log."""
    # Arrange
    monkeypatch.setattr(
        os.path,
        "exists",
        lambda p: str(p) == "/dev/tpm0",
    )
    import tpm_security_provider as tsp

    def _boom(*_a, **_k):
        raise RuntimeError("fapi init failed")

    monkeypatch.setattr(tsp, "TpmSecurityProvider", _boom)
    messages = []

    def _capture(message):
        messages.append(str(message))

    hid = logger.add(_capture, level="WARNING")

    # Act
    try:
        provider = create_security_provider()
    finally:
        # Always detach the sink so it does not leak into other tests.
        logger.remove(hid)

    # Assert
    assert provider.kind == "legacy"
    assert any("TPM security provider failed" in m for m in messages)


def test_ac6_compose_declares_tpm_device_mounts_and_swtpm():
    """The e2e compose file declares TPM device mounts and an swtpm service."""
    # Arrange
    compose_file = _compose_path()
    data = yaml.safe_load(compose_file.read_text(encoding="utf-8"))

    # Assert
    jetson = data["x-tpm-device-mounts-for-jetson"]
    assert "/dev/tpm0" in jetson["devices"]
    assert "/dev/tpmrm0" in jetson["devices"]
    assert "swtpm" in data["services"]
    sut = data["services"]["system-under-test"]
    assert "TSS2_FAPICONF" in sut["environment"]
    assert any("fapi-config" in str(v) for v in sut["volumes"])
    # The swtpm FAPI config lives next to the compose file.
    fapi_file = compose_file.parent / "fapi-config.swtpm.json"
    assert "swtpm:" in fapi_file.read_text(encoding="utf-8")


def test_should_attempt_tpm_respects_device_and_tcti(monkeypatch, clear_security_env):
    """``should_attempt_tpm`` reacts to TCTI, FAPI config and device nodes."""
    # Arrange / Act / Assert
    monkeypatch.setattr(os.path, "exists", lambda p: False)
    assert should_attempt_tpm(os.environ, os.path.exists) is False

    monkeypatch.setenv("TSS2_TCTI", "mssim:host=127.0.0.1,port=2321")
    assert should_attempt_tpm(os.environ, os.path.exists) is True
    monkeypatch.delenv("TSS2_TCTI", raising=False)

    monkeypatch.setenv("TSS2_FAPICONF", "/etc/tpm2-tss/fapi-config.json")
    assert should_attempt_tpm(os.environ, os.path.exists) is True
    monkeypatch.delenv("TSS2_FAPICONF", raising=False)

    monkeypatch.setattr(os.path, "exists", lambda p: str(p) == "/dev/tpmrm0")
    assert should_attempt_tpm(os.environ, os.path.exists) is True


def test_legacy_provider_matches_security_module_helpers():
    """``LegacySecurityProvider`` output decrypts via the ``security`` helper."""
    # Arrange
    leg = LegacySecurityProvider()
    data = b"x" * 500
    key = "k"

    # Act
    enc = leg.encrypt_to(data, key)

    # Assert
    assert security_decrypt_to(enc, key) == data
    assert leg.decrypt_to(enc, key) == data