"""Unit tests for ``UpdateManager`` and ``maybe_start_update_background``.

All external effects (HTTP polling, docker invocations, downloads) are
replaced with in-process fakes, and every test works in its own temporary
directory that is removed when the test finishes.
"""

import json
import os
import shutil
import subprocess
import tempfile
import unittest
from typing import List
from unittest.mock import MagicMock

from download_manager import ResumableDownloadManager
from update_manager import UpdateManager, maybe_start_update_background
from version_collector import VersionCollector


def _ok(cmd):
    """Return a successful ``CompletedProcess`` for *cmd* with empty output."""
    return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")


def _fetch_writing(payload: bytes):
    """Build a ``fetch_decrypt_verify`` stand-in that writes *payload*.

    The returned callable keeps the positional parameter order the tests
    rely on (the output path is the sixth positional argument).
    """

    def fake_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
        with open(output_plaintext_path, "wb") as f:
            f.write(payload)

    return fake_fetch


class TestUpdateManager(unittest.TestCase):
    """Behavioural tests for the update orchestration loop.

    The ``acN`` prefixes in test names presumably map to acceptance
    criteria of the update feature -- TODO confirm against the spec.
    """

    def _make_tmp_dir(self) -> str:
        """Create a temporary directory that is deleted after the test.

        ``tempfile.mkdtemp`` never removes what it creates, so the removal is
        registered as a cleanup. Errors are ignored because cleanup is
        best-effort and must not turn a passing test into a failure.
        """
        tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmp, ignore_errors=True)
        return tmp

    def _make_manager(
        self,
        tmp: str,
        *,
        post_get_update=None,
        subprocess_run=None,
        head_content_length=None,
        wait_fn=None,
        stop_event=None,
    ):
        """Build an ``UpdateManager`` rooted at *tmp* with injectable fakes.

        Creates ``<tmp>/models`` and uses ``<tmp>/dm`` for download state.
        The version collector falls back to a ``MagicMock`` runner when no
        ``subprocess_run`` is supplied; the manager receives the value as-is.

        Returns:
            ``(update_manager, download_manager, version_collector)``.
        """
        dm_dir = os.path.join(tmp, "dm")
        model_dir = os.path.join(tmp, "models")
        state_path = os.path.join(dm_dir, "update_orchestrator.json")
        os.makedirs(model_dir, exist_ok=True)
        dm = ResumableDownloadManager(dm_dir)
        vc = VersionCollector(model_dir, subprocess_run=subprocess_run or MagicMock())
        um = UpdateManager(
            "http://api.test",
            lambda: "tok",
            dm,
            vc,
            os.path.join(tmp, "compose.yml"),
            model_dir,
            state_path,
            interval_seconds=300.0,
            subprocess_run=subprocess_run,
            post_get_update=post_get_update,
            head_content_length=head_content_length,
            wait_fn=wait_fn,
            stop_event=stop_event,
        )
        return um, dm, vc

    def test_ac2_background_loop_polls_on_schedule(self):
        """``run_forever`` polls once per 300 s wait until ``wait_fn`` is truthy."""
        # Arrange
        tmp = self._make_tmp_dir()
        posts: List[dict] = []

        def post(token, body):
            posts.append({"token": token, "body": body})
            return []

        waits: List[float] = []

        def wait_fn(interval):
            # A truthy return on the second wait is what ends the loop here.
            waits.append(interval)
            return len(waits) >= 2

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return _ok(cmd)
            raise AssertionError(cmd)

        um, _, _ = self._make_manager(
            tmp,
            post_get_update=post,
            subprocess_run=fake_run,
            head_content_length=lambda url, token: 1,
            wait_fn=wait_fn,
        )

        # Act
        um.run_forever()

        # Assert
        self.assertEqual(len(posts), 2)
        self.assertEqual(waits, [300.0, 300.0])

    def test_ac2_default_interval_is_five_minutes(self):
        """Without ``interval_seconds`` the manager's interval is 300 seconds."""
        # Arrange / Act
        tmp = self._make_tmp_dir()
        dm_dir = os.path.join(tmp, "dm")
        model_dir = os.path.join(tmp, "m")
        os.makedirs(model_dir, exist_ok=True)
        dm = ResumableDownloadManager(dm_dir)
        vc = VersionCollector(model_dir, subprocess_run=MagicMock())
        # Constructed directly (not via _make_manager) so defaults apply.
        um = UpdateManager(
            "http://x",
            lambda: None,
            dm,
            vc,
            "c.yml",
            model_dir,
            os.path.join(dm_dir, "st.json"),
        )

        # Assert
        self.assertEqual(um._interval, 300.0)

    def test_ac3_ai_model_update_applied(self):
        """A ``detection_model`` update lands as ``azaion-<version>.trt``."""
        # Arrange
        tmp = self._make_tmp_dir()
        model_dir = os.path.join(tmp, "models")
        os.makedirs(model_dir, exist_ok=True)

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return _ok(cmd)
            raise AssertionError(cmd)

        dm_mock = MagicMock()

        def post(token, body):
            return [
                {
                    "resourceName": "detection_model",
                    "version": "2026-04-20",
                    "cdnUrl": "http://cdn/x",
                    "sha256": "ab",
                    "encryptionKey": "k",
                }
            ]

        um, _, _ = self._make_manager(
            tmp,
            post_get_update=post,
            subprocess_run=fake_run,
            head_content_length=lambda url, token: 4,
        )
        um._download_manager = dm_mock
        dm_mock.fetch_decrypt_verify.side_effect = _fetch_writing(b"trt")

        # Act
        um._tick_once()

        # Assert
        dm_mock.fetch_decrypt_verify.assert_called_once()
        args, kwargs = dm_mock.fetch_decrypt_verify.call_args
        # The sixth positional argument is the plaintext output path.
        self.assertTrue(args[5].endswith("azaion-2026-04-20.trt"))
        self.assertTrue(os.path.isfile(os.path.join(model_dir, "azaion-2026-04-20.trt")))

    def test_ac4_docker_image_update_applied(self):
        """An image update triggers one ``docker load`` and one ``docker compose``."""
        # Arrange
        tmp = self._make_tmp_dir()
        recorded: List[List[str]] = []

        def fake_run(cmd, **kwargs):
            recorded.append(list(cmd))
            if cmd[:3] == ["docker", "images", "--format"]:
                return _ok(cmd)
            if cmd[:3] == ["docker", "load", "-i"]:
                return _ok(cmd)
            if cmd[:2] == ["docker", "compose"]:
                return _ok(cmd)
            raise AssertionError(cmd)

        dm_mock = MagicMock()

        def post(token, body):
            return [
                {
                    "resourceName": "annotations",
                    "version": "2026-04-13",
                    "cdnUrl": "http://cdn/a",
                    "sha256": "cd",
                    "encryptionKey": "k",
                }
            ]

        um, _, _ = self._make_manager(
            tmp,
            post_get_update=post,
            subprocess_run=fake_run,
            head_content_length=lambda url, token: 8,
        )
        um._download_manager = dm_mock
        dm_mock.fetch_decrypt_verify.side_effect = _fetch_writing(b"tarbytes")

        # Act
        um._tick_once()

        # Assert
        loads = [c for c in recorded if c[:3] == ["docker", "load", "-i"]]
        composes = [c for c in recorded if c[:2] == ["docker", "compose"]]
        self.assertEqual(len(loads), 1)
        self.assertEqual(len(composes), 1)
        self.assertIn("annotations", composes[0])

    def test_ac5_self_update_applied_last(self):
        """``loader`` is composed after ``annotations`` even when listed first."""
        # Arrange
        tmp = self._make_tmp_dir()
        recorded: List[str] = []

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return _ok(cmd)
            if cmd[:3] == ["docker", "load", "-i"]:
                return _ok(cmd)
            if cmd[:2] == ["docker", "compose"]:
                # Last argv element is presumably the service name -- the
                # expected values below are resource names.
                recorded.append(cmd[-1])
                return _ok(cmd)
            raise AssertionError(cmd)

        dm_mock = MagicMock()

        def post(token, body):
            return [
                {
                    "resourceName": "loader",
                    "version": "v2",
                    "cdnUrl": "http://cdn/l",
                    "sha256": "00",
                    "encryptionKey": "k",
                },
                {
                    "resourceName": "annotations",
                    "version": "v1",
                    "cdnUrl": "http://cdn/a",
                    "sha256": "11",
                    "encryptionKey": "k",
                },
            ]

        um, _, _ = self._make_manager(
            tmp,
            post_get_update=post,
            subprocess_run=fake_run,
            head_content_length=lambda url, token: 1,
        )
        um._download_manager = dm_mock
        dm_mock.fetch_decrypt_verify.side_effect = _fetch_writing(b"x")

        # Act
        um._tick_once()

        # Assert
        self.assertEqual(recorded, ["annotations", "loader"])

    def test_ac6_invalidate_after_docker_apply(self):
        """Applying a docker update clears the version collector's cache."""
        # Arrange
        tmp = self._make_tmp_dir()

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return _ok(cmd)
            if cmd[:3] == ["docker", "load", "-i"]:
                return _ok(cmd)
            if cmd[:2] == ["docker", "compose"]:
                return _ok(cmd)
            raise AssertionError(cmd)

        dm_mock = MagicMock()

        def post(token, body):
            return [
                {
                    "resourceName": "annotations",
                    "version": "v9",
                    "cdnUrl": "http://cdn/a",
                    "sha256": "11",
                    "encryptionKey": "k",
                }
            ]

        um, _, vc = self._make_manager(
            tmp,
            post_get_update=post,
            subprocess_run=fake_run,
            head_content_length=lambda url, token: 1,
        )
        um._download_manager = dm_mock
        dm_mock.fetch_decrypt_verify.side_effect = _fetch_writing(b"x")
        # Populate the cache first so the later None check is meaningful.
        vc.collect()
        self.assertIsNotNone(vc._cache)

        # Act
        um._tick_once()

        # Assert
        self.assertIsNone(vc._cache)

    def test_maybe_start_skips_without_download_state_dir(self):
        """The starter does not raise when LOADER_DOWNLOAD_STATE_DIR is unset.

        NOTE(review): there is no assertion on the outcome; the test only
        verifies that the call completes. Consider asserting the return
        value once its contract is confirmed.
        """
        # Arrange
        old = os.environ.pop("LOADER_DOWNLOAD_STATE_DIR", None)
        try:

            def get_client():
                return MagicMock()

            # Act
            maybe_start_update_background(get_client, "http://x")
        finally:
            # Restore the caller's environment regardless of the outcome.
            if old is not None:
                os.environ["LOADER_DOWNLOAD_STATE_DIR"] = old

    def test_pending_compose_drained_on_startup(self):
        """Persisted ``pending_compose`` entries are composed in order and cleared."""
        # Arrange
        tmp = self._make_tmp_dir()
        dm_dir = os.path.join(tmp, "dm")
        os.makedirs(dm_dir, exist_ok=True)
        state_path = os.path.join(dm_dir, "update_orchestrator.json")
        with open(state_path, "w", encoding="utf-8") as f:
            json.dump({"pending_compose": ["annotations", "loader"]}, f)
        recorded: List[str] = []

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return _ok(cmd)
            if cmd[:2] == ["docker", "compose"]:
                recorded.append(cmd[-1])
                return _ok(cmd)
            raise AssertionError(cmd)

        model_dir = os.path.join(tmp, "m")
        os.makedirs(model_dir, exist_ok=True)
        dm = ResumableDownloadManager(dm_dir)
        vc = VersionCollector(model_dir, subprocess_run=fake_run)
        um = UpdateManager(
            "http://api.test",
            lambda: None,
            dm,
            vc,
            os.path.join(tmp, "compose.yml"),
            model_dir,
            state_path,
            subprocess_run=fake_run,
        )

        # Act
        um._drain_pending_compose()

        # Assert
        self.assertEqual(recorded, ["annotations", "loader"])
        with open(state_path, encoding="utf-8") as f:
            data = json.load(f)
        self.assertEqual(data.get("pending_compose"), [])


if __name__ == "__main__":
    unittest.main()