Files
loader/tests/test_publish_artifact.py
T
Oleksandr Bezdieniezhnykh 9a0248af72 [AZ-185][AZ-186] Batch 2
Made-with: Cursor
2026-04-15 07:32:37 +03:00

332 lines
10 KiB
Python

import gzip
import hashlib
import importlib.util
import io
import os
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch

import yaml

from download_manager import decrypt_cbc_file

# Directory one level above the folder that contains this test file; the
# script and the CI pipeline definition are both located relative to it.
_ROOT = Path(__file__).resolve().parents[1]
# The publish script exercised by these tests (loaded by path, and also run
# as a subprocess).
_SCRIPT = _ROOT.joinpath("scripts", "publish_artifact.py")
# Woodpecker pipeline file whose step order is checked by the tests.
_WOODPECKER = _ROOT.joinpath(".woodpecker", "build-arm.yml")
def _load_publish():
    """Load the publish script from ``_SCRIPT`` as a fresh module object.

    The script is imported by file path under the module name
    ``publish_artifact``. Every call executes the file again and returns a
    new module object; the module is not registered in ``sys.modules``.

    Returns:
        The executed module.

    Raises:
        ImportError: If no import spec or loader can be built for ``_SCRIPT``.
    """
    spec = importlib.util.spec_from_file_location("publish_artifact", _SCRIPT)
    # Validate *before* use: spec_from_file_location may return None, and an
    # ``assert`` would be stripped when Python runs with -O.
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load publish_artifact from {_SCRIPT}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
def _s3_client_factory(storage):
    """Build a replacement for ``boto3.client`` backed by an in-memory dict.

    Args:
        storage: Mutable mapping shaped ``{bucket: {key: bytes}}``. Uploads
            are written into it and ``get_object`` reads from it.

    Returns:
        A callable ``client(service_name, **kwargs)`` that returns a
        ``MagicMock`` whose ``upload_fileobj`` and ``get_object`` operate on
        ``storage``. Any service name other than ``"s3"`` raises
        ``AssertionError``.
    """

    def client(service_name, **kwargs):
        if service_name != "s3":
            raise AssertionError(service_name)
        s3 = MagicMock()

        # Parameter names and optional arguments mirror boto3's
        # upload_fileobj so that both positional and keyword call styles
        # work against the fake; the optional arguments are ignored.
        def upload_fileobj(
            Fileobj, Bucket, Key, ExtraArgs=None, Callback=None, Config=None
        ):
            storage.setdefault(Bucket, {})[Key] = Fileobj.read()

        def get_object(Bucket=None, Key=None):
            return {"Body": io.BytesIO(storage[Bucket][Key])}

        s3.upload_fileobj.side_effect = upload_fileobj
        s3.get_object.side_effect = get_object
        return s3

    return client
class TestPublishArtifact(unittest.TestCase):
    """Tests for ``scripts/publish_artifact.py``.

    In-process tests replace ``boto3.client`` and ``requests.post`` on the
    loaded script module with in-memory fakes; the CLI tests run the script
    in a subprocess.
    """

    class _OkResponse:
        """Minimal stand-in for a successful ``requests`` response."""

        status_code = 200

        def raise_for_status(self):
            pass

    def setUp(self):
        self._env_patch = None

    def tearDown(self):
        if self._env_patch is not None:
            self._env_patch.stop()

    def _base_env(self):
        """Return the environment variables set for in-process publish runs."""
        return {
            "S3_ENDPOINT": "https://s3.example.test",
            "S3_ACCESS_KEY": "ak",
            "S3_SECRET_KEY": "sk",
            "S3_BUCKET": "test-bucket",
            "ADMIN_API_URL": "https://admin.example.test",
            "ADMIN_API_TOKEN": "token",
        }

    def _start_env_patch(self):
        """Overlay ``_base_env()`` on ``os.environ``; undone in ``tearDown``."""
        self._env_patch = patch.dict(os.environ, self._base_env(), clear=False)
        self._env_patch.start()

    @staticmethod
    def _remove_quietly(path):
        """Delete ``path``; cleanup is best-effort, so OS errors are ignored."""
        try:
            os.unlink(path)
        except OSError:
            pass

    def _make_temp_file(self, content=b"", suffix=""):
        """Create a closed temp file holding ``content`` and return its path.

        The file is removed during test cleanup. The descriptor returned by
        ``mkstemp`` is always closed here, so no open handle outlives the
        call.
        """
        fd, path = tempfile.mkstemp(suffix=suffix)
        self.addCleanup(self._remove_quietly, path)
        with os.fdopen(fd, "wb") as handle:
            handle.write(content)
        return path

    def _patch_backends(self, mod, storage, post):
        """Replace ``mod.boto3.client`` and ``mod.requests.post`` with fakes.

        Args:
            mod: Module returned by ``_load_publish()``.
            storage: Dict that receives the uploaded S3 objects.
            post: Callable used in place of ``requests.post``.

        The patches stay active until test cleanup.
        """
        replacements = (
            (mod.boto3, "client", _s3_client_factory(storage)),
            (mod.requests, "post", post),
        )
        for owner, attr, fake in replacements:
            patcher = patch.object(owner, attr, side_effect=fake)
            patcher.start()
            self.addCleanup(patcher.stop)

    def test_ac1_end_to_end_publish(self):
        # Arrange
        mod = _load_publish()
        self._start_env_patch()
        captured = {}
        storage = {}

        def fake_post(url, headers=None, json=None, timeout=None):
            captured["url"] = url
            captured["body"] = json
            return self._OkResponse()

        src = self._make_temp_file(b"artifact-bytes")
        self._patch_backends(mod, storage, fake_post)

        # Act
        out = mod.publish(src, "loader", "dev", "arm64", "v1")

        # Assert
        self.assertEqual(out["object_key"], "dev/loader-arm64-v1.enc")
        body = storage["test-bucket"][out["object_key"]]
        digest = hashlib.sha256(body).hexdigest()
        self.assertEqual(digest, out["sha256"])
        registered = captured["body"]
        self.assertEqual(registered["sha256"], out["sha256"])
        self.assertEqual(registered["size_bytes"], len(body))
        self.assertEqual(registered["encryption_key"], out["encryption_key_hex"])
        self.assertEqual(registered["cdn_url"], out["cdn_url"])

    def test_ac2_woodpecker_publish_step_after_build(self):
        # Arrange
        raw = _WOODPECKER.read_text(encoding="utf-8")

        # Act
        doc = yaml.safe_load(raw)
        steps = doc["steps"]
        names = [step["name"] for step in steps]

        # Assert
        self.assertIn("build-push", names)
        self.assertIn("publish-artifact", names)
        build_idx = names.index("build-push")
        publish_idx = names.index("publish-artifact")
        self.assertLess(build_idx, publish_idx)
        build_cmds = "\n".join(steps[build_idx]["commands"])
        self.assertIn("docker save", build_cmds)
        pub_cmds = "\n".join(steps[publish_idx]["commands"])
        self.assertIn("publish_artifact.py", pub_cmds)
        self.assertIn("loader-image.tar", pub_cmds)

    def test_ac3_unique_key_per_publish(self):
        # Arrange
        mod = _load_publish()
        self._start_env_patch()
        keys = []
        storage = {}

        def capture_post(url, headers=None, json=None, timeout=None):
            keys.append(json["encryption_key"])
            return self._OkResponse()

        src = self._make_temp_file(b"x")
        self._patch_backends(mod, storage, capture_post)

        # Act
        mod.publish(src, "r", "dev", "arm64", "1")
        mod.publish(src, "r", "dev", "arm64", "2")

        # Assert
        self.assertEqual(len(keys), 2)
        self.assertNotEqual(keys[0], keys[1])
        # Each registered key must decode from hex to 32 bytes.
        self.assertEqual(len(bytes.fromhex(keys[0])), 32)
        self.assertEqual(len(bytes.fromhex(keys[1])), 32)

    def test_ac4_sha256_matches_s3_object_and_registration(self):
        # Arrange
        mod = _load_publish()
        self._start_env_patch()
        posted = {}
        storage = {}

        def fake_post(url, headers=None, json=None, timeout=None):
            posted.update(json)
            return self._OkResponse()

        src = self._make_temp_file(b"payload-for-hash")
        self._patch_backends(mod, storage, fake_post)

        # Act
        out = mod.publish(src, "m", "stage", "arm64", "9.9.9")
        body = storage["test-bucket"][out["object_key"]]
        expect = hashlib.sha256(body).hexdigest()

        # Assert
        self.assertEqual(posted["sha256"], expect)
        self.assertEqual(out["sha256"], expect)

    def test_ac5_main_entry_matches_cli_invocation(self):
        # Arrange
        mod = _load_publish()
        self._start_env_patch()
        storage = {}

        def ok_post(url, headers=None, json=None, timeout=None):
            return self._OkResponse()

        src = self._make_temp_file(b"cli-data")
        self._patch_backends(mod, storage, ok_post)

        # Act
        code = mod.main(
            [
                "--file",
                src,
                "--resource-name",
                "model",
                "--dev-stage",
                "dev",
                "--architecture",
                "arm64",
                "--version",
                "0.0.1",
            ]
        )

        # Assert
        self.assertEqual(code, 0)
        self.assertGreater(
            len(storage["test-bucket"]["dev/model-arm64-0.0.1.enc"]), 0
        )

    def test_ac5_cli_help_exits_zero(self):
        # Act
        r = subprocess.run(
            [sys.executable, str(_SCRIPT), "--help"],
            cwd=str(_ROOT),
            capture_output=True,
            text=True,
        )

        # Assert
        self.assertEqual(r.returncode, 0)
        self.assertIn("--resource-name", r.stdout)

    def test_ac5_subprocess_script_missing_env_exits_nonzero(self):
        # Arrange
        path = self._make_temp_file()
        # Only these variables are forwarded, so none of the S3_* / ADMIN_*
        # settings from _base_env() reach the child process.
        passthrough = ("PATH", "HOME", "TMPDIR", "SYSTEMROOT")
        minimal_env = {k: v for k, v in os.environ.items() if k in passthrough}

        # Act
        r = subprocess.run(
            [
                sys.executable,
                str(_SCRIPT),
                "--file",
                path,
                "--resource-name",
                "x",
                "--dev-stage",
                "d",
                "--architecture",
                "arm64",
                "--version",
                "1",
            ],
            cwd=str(_ROOT),
            env=minimal_env,
            capture_output=True,
            text=True,
        )

        # Assert
        # NOTE(review): any non-zero exit passes here, including an unrelated
        # crash -- consider also asserting on r.stderr.
        self.assertNotEqual(r.returncode, 0)

    def test_encryption_compatible_with_decrypt_cbc_file(self):
        # Arrange
        mod = _load_publish()
        aes_key = os.urandom(32)
        plain = self._make_temp_file(b"round-trip-plain")
        # Pre-created, closed placeholder files for each pipeline stage.
        gz_path = self._make_temp_file(suffix=".gz")
        enc_path = self._make_temp_file(suffix=".enc")
        dec_path = self._make_temp_file(suffix=".bin")
        mod.gzip_file(plain, gz_path)
        mod.encrypt_aes256_cbc_file(gz_path, enc_path, aes_key)

        # Act
        decrypt_cbc_file(enc_path, aes_key, dec_path)
        with open(dec_path, "rb") as f:
            restored = gzip.decompress(f.read())

        # Assert
        self.assertEqual(restored, b"round-trip-plain")