mirror of
https://github.com/azaion/loader.git
synced 2026-04-22 06:56:31 +00:00
9a0248af72
Made-with: Cursor
332 lines
10 KiB
Python
332 lines
10 KiB
Python
import gzip
|
|
import importlib.util
|
|
import io
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import yaml
|
|
|
|
from download_manager import decrypt_cbc_file
|
|
|
|
# Directory one level above the folder that contains this test file.
_ROOT = Path(__file__).resolve().parent.parent
# Script exercised by these tests; it is loaded by path, not imported by name.
_SCRIPT = _ROOT.joinpath("scripts", "publish_artifact.py")
# Pipeline definition inspected by the step-ordering test.
_WOODPECKER = _ROOT.joinpath(".woodpecker", "build-arm.yml")
|
|
|
|
|
|
def _load_publish():
    """Load the script at ``_SCRIPT`` as a module named ``publish_artifact``.

    A new module object is created and executed on every call; this function
    does not insert it into ``sys.modules``.

    Returns:
        The freshly executed module object.
    """
    module_spec = importlib.util.spec_from_file_location("publish_artifact", _SCRIPT)
    module = importlib.util.module_from_spec(module_spec)
    loader = module_spec.loader
    # The loader attribute is optional on a spec; fail loudly if it is absent.
    assert loader is not None
    loader.exec_module(module)
    return module
|
|
|
|
|
|
def _s3_client_factory(storage):
    """Build a stand-in for ``boto3.client`` backed by an in-memory dict.

    Args:
        storage: Mutable mapping of ``bucket -> {key: bytes}``. Uploads are
            written into it and ``get_object`` reads back from it.

    Returns:
        A callable ``client(service_name, **kwargs)`` that returns a
        ``MagicMock`` whose ``upload_fileobj`` and ``get_object`` operate on
        ``storage``. Any service other than ``"s3"`` raises ``AssertionError``.
    """

    def _store_upload(body, bucket, key):
        # Read the payload first so a failing read leaves storage untouched.
        payload = body.read()
        storage.setdefault(bucket, {})[key] = payload

    def _fetch_object(Bucket=None, Key=None):
        stored = storage[Bucket][Key]
        return {"Body": io.BytesIO(stored)}

    def client(service_name, **kwargs):
        if service_name != "s3":
            raise AssertionError(service_name)
        fake_s3 = MagicMock()
        fake_s3.upload_fileobj.side_effect = _store_upload
        fake_s3.get_object.side_effect = _fetch_object
        return fake_s3

    return client
|
|
|
|
|
|
class TestPublishArtifact(unittest.TestCase):
    """Tests for ``scripts/publish_artifact.py``.

    S3 access and the HTTP registration call are replaced with in-memory
    fakes, so the ``publish``/``main`` tests touch only local temp files.
    The two subprocess tests run the real script with ``--help`` and with a
    stripped-down environment.
    """

    class _OkResponse:
        """Minimal successful HTTP response: status 200, no-op status check."""

        status_code = 200

        def raise_for_status(self):
            pass

    def setUp(self):
        self._env_patch = None

    def tearDown(self):
        if self._env_patch:
            self._env_patch.stop()

    def _base_env(self):
        """Return the environment variables set for the in-process tests."""
        return {
            "S3_ENDPOINT": "https://s3.example.test",
            "S3_ACCESS_KEY": "ak",
            "S3_SECRET_KEY": "sk",
            "S3_BUCKET": "test-bucket",
            "ADMIN_API_URL": "https://admin.example.test",
            "ADMIN_API_TOKEN": "token",
        }

    def _start_env(self):
        """Overlay ``_base_env()`` on ``os.environ``; ``tearDown`` undoes it."""
        self._env_patch = patch.dict(os.environ, self._base_env(), clear=False)
        self._env_patch.start()

    def _temp_file(self, data=b"", suffix="", ignore_cleanup_errors=False):
        """Create a closed temp file holding ``data`` and return its path.

        The file is removed when the test finishes. The descriptor returned
        by ``mkstemp`` is always closed here, so no handle outlives the call.

        Args:
            data: Bytes written to the file (empty by default).
            suffix: File-name suffix passed to ``tempfile.mkstemp``.
            ignore_cleanup_errors: When true, an ``OSError`` raised while
                deleting the file is swallowed; otherwise it fails the test.
        """
        fd, path = tempfile.mkstemp(suffix=suffix)
        self.addCleanup(self._remove_file, path, ignore_cleanup_errors)
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        return path

    @staticmethod
    def _remove_file(path, ignore_errors):
        """Delete ``path``; re-raise ``OSError`` unless ``ignore_errors``."""
        try:
            os.unlink(path)
        except OSError:
            if not ignore_errors:
                raise

    def _fake_post(self, on_call=None):
        """Return a replacement for ``requests.post`` that always succeeds.

        Args:
            on_call: Optional ``callable(url, json_body)`` invoked on every
                request so a test can record what was sent.
        """

        def post(url, headers=None, json=None, timeout=None):
            if on_call is not None:
                on_call(url, json)
            return self._OkResponse()

        return post

    @staticmethod
    def _sha256_hex(data):
        """Return the SHA-256 hex digest of ``data``."""
        # Imported here because hashlib is not among the module-level imports.
        import hashlib

        return hashlib.sha256(data).hexdigest()

    def test_ac1_end_to_end_publish(self):
        """publish() stores the object and registers matching metadata."""
        # Arrange
        mod = _load_publish()
        self._start_env()
        captured = {}
        storage = {}

        def record(url, body):
            captured["url"] = url
            captured["body"] = body

        src = self._temp_file(b"artifact-bytes")
        with patch.object(
            mod.boto3, "client", side_effect=_s3_client_factory(storage)
        ), patch.object(mod.requests, "post", side_effect=self._fake_post(record)):
            # Act
            out = mod.publish(
                src,
                "loader",
                "dev",
                "arm64",
                "v1",
            )
        # Assert
        self.assertEqual(
            out["object_key"],
            "dev/loader-arm64-v1.enc",
        )
        key = out["object_key"]
        body = storage["test-bucket"][key]
        self.assertEqual(self._sha256_hex(body), out["sha256"])
        self.assertEqual(captured["body"]["sha256"], out["sha256"])
        self.assertEqual(captured["body"]["size_bytes"], len(body))
        self.assertEqual(captured["body"]["encryption_key"], out["encryption_key_hex"])
        self.assertEqual(captured["body"]["cdn_url"], out["cdn_url"])

    def test_ac2_woodpecker_publish_step_after_build(self):
        """The pipeline runs ``publish-artifact`` after ``build-push``."""
        # Arrange
        raw = _WOODPECKER.read_text(encoding="utf-8")
        # Act
        doc = yaml.safe_load(raw)
        names = [s["name"] for s in doc["steps"]]
        # Assert
        self.assertIn("build-push", names)
        self.assertIn("publish-artifact", names)
        self.assertLess(names.index("build-push"), names.index("publish-artifact"))
        build_cmds = "\n".join(doc["steps"][names.index("build-push")]["commands"])
        self.assertIn("docker save", build_cmds)
        pub_cmds = "\n".join(doc["steps"][names.index("publish-artifact")]["commands"])
        self.assertIn("publish_artifact.py", pub_cmds)
        self.assertIn("loader-image.tar", pub_cmds)

    def test_ac3_unique_key_per_publish(self):
        """Two publishes register two different 32-byte encryption keys."""
        # Arrange
        mod = _load_publish()
        self._start_env()
        keys = []
        storage = {}

        def record(url, body):
            keys.append(body["encryption_key"])

        src = self._temp_file(b"x")
        with patch.object(
            mod.boto3, "client", side_effect=_s3_client_factory(storage)
        ), patch.object(mod.requests, "post", side_effect=self._fake_post(record)):
            # Act
            mod.publish(src, "r", "dev", "arm64", "1")
            mod.publish(src, "r", "dev", "arm64", "2")
        # Assert
        self.assertEqual(len(keys), 2)
        self.assertNotEqual(keys[0], keys[1])
        self.assertEqual(len(bytes.fromhex(keys[0])), 32)
        self.assertEqual(len(bytes.fromhex(keys[1])), 32)

    def test_ac4_sha256_matches_s3_object_and_registration(self):
        """The registered and returned SHA-256 equal the stored object's hash."""
        # Arrange
        mod = _load_publish()
        self._start_env()
        posted = {}
        storage = {}

        def record(url, body):
            posted.update(body)

        src = self._temp_file(b"payload-for-hash")
        with patch.object(
            mod.boto3, "client", side_effect=_s3_client_factory(storage)
        ), patch.object(mod.requests, "post", side_effect=self._fake_post(record)):
            # Act
            out = mod.publish(src, "m", "stage", "arm64", "9.9.9")
        key = out["object_key"]
        body = storage["test-bucket"][key]
        expect = self._sha256_hex(body)
        # Assert
        self.assertEqual(posted["sha256"], expect)
        self.assertEqual(out["sha256"], expect)

    def test_ac5_main_entry_matches_cli_invocation(self):
        """main() with CLI-style arguments returns 0 and uploads the object."""
        # Arrange
        mod = _load_publish()
        self._start_env()
        storage = {}
        src = self._temp_file(b"cli-data")
        with patch.object(
            mod.boto3, "client", side_effect=_s3_client_factory(storage)
        ), patch.object(mod.requests, "post", side_effect=self._fake_post()):
            # Act
            code = mod.main(
                [
                    "--file",
                    src,
                    "--resource-name",
                    "model",
                    "--dev-stage",
                    "dev",
                    "--architecture",
                    "arm64",
                    "--version",
                    "0.0.1",
                ]
            )
        # Assert
        self.assertEqual(code, 0)
        self.assertGreater(
            len(storage["test-bucket"]["dev/model-arm64-0.0.1.enc"]), 0
        )

    def test_ac5_cli_help_exits_zero(self):
        """Running the script with ``--help`` exits 0 and lists its options."""
        # Act
        r = subprocess.run(
            [sys.executable, str(_SCRIPT), "--help"],
            cwd=str(_ROOT),
            capture_output=True,
            text=True,
        )
        # Assert
        self.assertEqual(r.returncode, 0)
        self.assertIn("--resource-name", r.stdout)

    def test_ac5_subprocess_script_missing_env_exits_nonzero(self):
        """Without the publish settings in the environment the script fails."""
        # Arrange
        path = self._temp_file()
        # Keep only what the interpreter needs to start; drop everything else.
        minimal_env = {
            k: v
            for k, v in os.environ.items()
            if k in ("PATH", "HOME", "TMPDIR", "SYSTEMROOT")
        }
        # Act
        r = subprocess.run(
            [
                sys.executable,
                str(_SCRIPT),
                "--file",
                path,
                "--resource-name",
                "x",
                "--dev-stage",
                "d",
                "--architecture",
                "arm64",
                "--version",
                "1",
            ],
            cwd=str(_ROOT),
            env=minimal_env,
            capture_output=True,
            text=True,
        )
        # Assert
        self.assertNotEqual(r.returncode, 0)

    def test_encryption_compatible_with_decrypt_cbc_file(self):
        """gzip + encrypt from the script round-trips through decrypt_cbc_file."""
        # Arrange
        mod = _load_publish()
        aes_key = os.urandom(32)
        # Output files are pre-created (and closed) so only their paths are
        # handed to the functions under test; cleanup here is best-effort.
        plain = self._temp_file(b"round-trip-plain", ignore_cleanup_errors=True)
        gz_path = self._temp_file(suffix=".gz", ignore_cleanup_errors=True)
        enc_path = self._temp_file(suffix=".enc", ignore_cleanup_errors=True)
        dec_path = self._temp_file(suffix=".bin", ignore_cleanup_errors=True)
        mod.gzip_file(plain, gz_path)
        mod.encrypt_aes256_cbc_file(gz_path, enc_path, aes_key)
        # Act
        decrypt_cbc_file(enc_path, aes_key, dec_path)
        with open(dec_path, "rb") as f:
            restored = gzip.decompress(f.read())
        # Assert
        self.assertEqual(restored, b"round-trip-plain")
|