[AZ-185][AZ-186] Batch 2

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-04-15 07:32:37 +03:00
parent d244799f02
commit 9a0248af72
18 changed files with 1857 additions and 26 deletions
+14
View File
@@ -12,5 +12,19 @@ steps:
- if [ "$CI_COMMIT_BRANCH" = "main" ]; then export TAG=arm; else export TAG=${CI_COMMIT_BRANCH}-arm; fi
- docker build -f Dockerfile -t localhost:5000/loader:$TAG .
- docker push localhost:5000/loader:$TAG
- docker save localhost:5000/loader:$TAG -o loader-image.tar
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- name: publish-artifact
image: python:3.11-slim
commands:
- pip install --no-cache-dir boto3==1.40.9 cryptography==44.0.2 requests==2.32.4
- |
if [ "$CI_COMMIT_BRANCH" = "main" ]; then
export PUBLISH_DEV_STAGE=main
export TAG=arm
else
export PUBLISH_DEV_STAGE=$CI_COMMIT_BRANCH
export TAG=${CI_COMMIT_BRANCH}-arm
fi
- python scripts/publish_artifact.py --file loader-image.tar --resource-name loader --dev-stage "$PUBLISH_DEV_STAGE" --architecture arm64 --version "$CI_COMMIT_SHA"
@@ -0,0 +1,53 @@
# Publish artifact script (AZ-186)
Training services and CI/CD call `scripts/publish_artifact.py` after producing an artifact (for example a `.trt` model or a Docker image tarball). The script gzip-compresses the file, encrypts it with a random 32-byte AES-256 key (AES-CBC with PKCS7, IV prefixed), uploads the ciphertext to S3, and registers metadata with the admin API.
## CLI
```text
python scripts/publish_artifact.py \
--file /path/to/artifact \
--resource-name my_model \
--dev-stage dev \
--architecture arm64 \
--version 2026-04-15
```
Object key: `{dev_stage}/{resource_name}-{architecture}-{version}.enc`
## Environment variables
| Variable | Required | Purpose |
|----------|----------|---------|
| `S3_ENDPOINT` | yes | S3-compatible endpoint URL |
| `S3_ACCESS_KEY` | yes | Upload credentials |
| `S3_SECRET_KEY` | yes | Upload credentials |
| `S3_BUCKET` | yes | Target bucket |
| `ADMIN_API_URL` | yes | Admin API base URL (no trailing path for publish) |
| `ADMIN_API_TOKEN` | yes | Bearer token for the publish request |
| `CDN_PUBLIC_BASE_URL` | no | If set, `cdn_url` in the registration payload is `{CDN_PUBLIC_BASE_URL}/{object_key}`; otherwise it defaults to `{S3_ENDPOINT}/{S3_BUCKET}/{object_key}` |
| `ADMIN_API_PUBLISH_PATH` | no | Defaults to `internal/resources/publish`; POST is sent to `{ADMIN_API_URL}/{ADMIN_API_PUBLISH_PATH}` |
## Admin API contract
`POST {ADMIN_API_URL}/internal/resources/publish` (unless overridden) with JSON body:
- `resource_name`, `dev_stage`, `architecture`, `version` (strings)
- `cdn_url` (string)
- `sha256` (lowercase hex of the uploaded ciphertext file, including the 16-byte IV)
- `encryption_key` (64-character hex encoding of the raw 32-byte AES key)
- `size_bytes` (integer size of the uploaded ciphertext file)
The loader expects the same `encryption_key` and `sha256` semantics as returned by fleet `POST /get-update` (hex key, hash of the ciphertext object).
## Dependencies
Use the same major versions as the loader: `boto3`, `cryptography`, `requests` (see `requirements.txt`). A minimal install for a training host is:
```text
pip install boto3==1.40.9 cryptography==44.0.2 requests==2.32.4
```
## Woodpecker
Pipeline `.woodpecker/build-arm.yml` saves the built image to `loader-image.tar` and runs this script in a follow-up step. Configure the environment variables above as Woodpecker secrets for that step.
@@ -0,0 +1,129 @@
# TPM-Based Security Provider
**Task**: AZ-182_tpm_security_provider
**Name**: TPM Security Provider
**Description**: Introduce SecurityProvider abstraction with TPM detection and FAPI integration, wrapping existing security logic in LegacySecurityProvider for backward compatibility
**Complexity**: 5 points
**Dependencies**: None
**Component**: 02 Security
**Tracker**: AZ-182
**Epic**: AZ-181
## Problem
The loader's security code (key derivation, encryption, hardware fingerprinting) is hardcoded for the binary-split scheme. On fused Jetson Orin Nano devices with fTPM, this scheme is unnecessary — full-disk encryption protects data at rest, and the fleet update system (AZ-185) handles encrypted artifact delivery with per-artifact keys. However, the loader still needs a clean abstraction to:
1. Detect whether it's running on a TPM-equipped device or a legacy environment
2. Provide TPM seal/unseal capability as infrastructure for defense-in-depth (sealed credentials, future key wrapping)
3. Preserve the legacy code path for non-TPM deployments
## Outcome
- Loader detects TPM availability at startup and selects the appropriate security provider
- SecurityProvider abstraction cleanly separates TPM and legacy code paths
- TpmSecurityProvider establishes FAPI connection and provides seal/unseal operations
- LegacySecurityProvider wraps existing security.pyx unchanged
- Foundation in place for fTPM-sealed credentials (future) and per-artifact key decryption integration
## Scope
### Included
- SecurityProvider abstraction (ABC) with TpmSecurityProvider and LegacySecurityProvider
- Runtime TPM detection (/dev/tpm0 + SECURITY_PROVIDER env var override)
- tpm2-pytss FAPI integration: connect, create_seal, unseal
- LegacySecurityProvider wrapping existing security.pyx (encrypt, decrypt, key derivation)
- Auto-detection and provider selection at startup with logging
- Docker compose device mounts for /dev/tpm0 and /dev/tpmrm0
- Dockerfile changes: install tpm2-tss native library + tpm2-pytss
- Tests using TPM simulator (swtpm)
### Excluded
- Resource download/upload changes (handled by AZ-185 Update Manager with per-artifact keys)
- Docker unlock flow changes (handled by AZ-185 Update Manager)
- fTPM provisioning pipeline (manufacturing-time, separate from code)
- Remote attestation via EK certificates
- fTPM-sealed device credentials (future enhancement, not v1)
- Changes to the Azaion admin API server
## Acceptance Criteria
**AC-1: SecurityProvider auto-detection**
Given a Jetson device with provisioned fTPM and /dev/tpm0 accessible
When the loader starts
Then TpmSecurityProvider is selected and logged
**AC-2: TPM seal/unseal round-trip**
Given TpmSecurityProvider is active
When data is sealed via FAPI create_seal and later unsealed
Then the unsealed data matches the original
**AC-3: Legacy path unchanged**
Given no TPM is available (/dev/tpm0 absent)
When the loader starts and processes resource requests
Then LegacySecurityProvider is selected and all behavior is identical to the current scheme
**AC-4: Env var override**
Given SECURITY_PROVIDER=legacy is set
When the loader starts on a device with /dev/tpm0 present
Then LegacySecurityProvider is selected regardless of TPM availability
**AC-5: Graceful fallback**
Given /dev/tpm0 exists but FAPI connection fails
When the loader starts
Then it falls back to LegacySecurityProvider with a warning log
**AC-6: Docker container TPM access**
Given docker-compose.yml with /dev/tpm0 and /dev/tpmrm0 device mounts
When the loader container starts on a fused Jetson
Then TpmSecurityProvider can connect to fTPM via FAPI
## Non-Functional Requirements
**Performance**
- TPM seal/unseal latency must be under 500ms per operation
**Compatibility**
- Must work on ARM64 Jetson Orin Nano with JetPack 6.1+
- Must work inside Docker containers with --device mounts
- tpm2-pytss must be compatible with Python 3.11 and Cython compilation
**Reliability**
- Graceful fallback to LegacySecurityProvider on any TPM initialization failure
- No crash on /dev/tpm0 absence — clean detection and fallback
## Unit Tests
| AC Ref | What to Test | Required Outcome |
|--------|-------------|-----------------|
| AC-1 | SecurityProvider factory with /dev/tpm0 mock present | TpmSecurityProvider selected |
| AC-2 | FAPI create_seal + unseal via swtpm | Data matches round-trip |
| AC-3 | SecurityProvider factory without /dev/tpm0 | LegacySecurityProvider selected |
| AC-4 | SECURITY_PROVIDER=legacy env var with /dev/tpm0 present | LegacySecurityProvider selected |
| AC-5 | /dev/tpm0 exists but FAPI raises exception | LegacySecurityProvider selected, warning logged |
## Blackbox Tests
| AC Ref | Initial Data/Conditions | What to Test | Expected Behavior | NFR References |
|--------|------------------------|-------------|-------------------|----------------|
| AC-3 | No TPM device available | POST /load/{filename} (split resource) | Existing binary-split behavior, all current tests pass | Compatibility |
| AC-6 | TPM simulator in Docker | Container starts with device mounts | FAPI connects, seal/unseal works | Compatibility |
## Constraints
- tpm2-pytss requires tpm2-tss >= 2.4.0 native library in the Docker image
- Tests require swtpm (software TPM simulator) — must be added to test infrastructure
- fTPM provisioning is out of scope — this task assumes a provisioned TPM exists
- PCR-based policy binding intentionally not used (known persistence issues on Orin Nano)
## Risks & Mitigation
**Risk 1: fTPM FAPI stability on Jetson Orin Nano**
- *Risk*: FAPI seal/unseal may have undocumented issues on Orin Nano (similar to PCR/NV persistence bugs)
- *Mitigation*: Design intentionally avoids PCR policies and NV indexes; uses SRK hierarchy only. Hardware validation required before production deployment.
**Risk 2: swtpm test fidelity**
- *Risk*: Software TPM simulator may not reproduce all fTPM behaviors
- *Mitigation*: Integration tests on actual Jetson hardware as part of acceptance testing (outside CI).
**Risk 3: tpm2-tss native library in Docker image**
- *Risk*: tpm2-tss may not be available in python:3.11-slim base image; ARM64 build may need compilation
- *Mitigation*: Add tpm2-tss to Dockerfile build step; verify ARM64 compatibility early.
@@ -0,0 +1,79 @@
# Resumable Download Manager
**Task**: AZ-184_resumable_download_manager
**Name**: Resumable Download Manager
**Description**: Implement a resumable HTTP download manager for the loader that handles intermittent Starlink connectivity
**Complexity**: 3 points
**Dependencies**: None
**Component**: Loader
**Tracker**: AZ-184
**Epic**: AZ-181
## Problem
Jetsons on UAVs have intermittent Starlink connectivity. Downloads of large artifacts (AI models ~500MB, Docker images ~1GB) must survive connection drops and resume from where they left off.
## Outcome
- Downloads resume from the last byte received after connectivity loss
- Completed downloads are verified with SHA-256 before use
- Downloaded artifacts are decrypted with per-artifact AES-256 keys
- State persists across loader restarts
## Scope
### Included
- Resumable HTTP downloads using Range headers (S3 supports natively)
- JSON state file on disk tracking: url, expected_sha256, expected_size, bytes_downloaded, temp_file_path
- SHA-256 verification of completed downloads
- AES-256 decryption of downloaded artifacts using per-artifact key from /get-update response
- Retry with exponential backoff (1min, 5min, 15min, 1hr, max 4hr)
- State machine: pending -> downloading -> paused -> verifying -> decrypting -> complete / failed
### Excluded
- Update check logic (AZ-185)
- Applying updates (AZ-185)
- CDN upload (AZ-186)
## Acceptance Criteria
**AC-1: Resume after connection drop**
Given a download is 60% complete and connectivity is lost
When connectivity returns
Then download resumes from byte offset (60% of file), not from scratch
**AC-2: SHA-256 mismatch triggers re-download**
Given a completed download with corrupted data
When SHA-256 verification fails
Then the partial file is deleted and download restarts from scratch
**AC-3: Decryption produces correct output**
Given a completed and verified download
When decrypted with the per-artifact AES-256 key
Then the output matches the original unencrypted artifact
**AC-4: State survives restart**
Given a download is 40% complete and the loader container restarts
When the loader starts again
Then the download resumes from 40%, not from scratch
**AC-5: Exponential backoff on repeated failures**
Given multiple consecutive connection failures
When retrying
Then wait times follow exponential backoff pattern
## Unit Tests
| AC Ref | What to Test | Required Outcome |
|--------|-------------|-----------------|
| AC-1 | Mock HTTP server drops connection mid-transfer | Resume with Range header from correct offset |
| AC-2 | Corrupt downloaded file | SHA-256 check fails, file deleted, retry flag set |
| AC-3 | Encrypt test file, download, decrypt | Round-trip matches original |
| AC-4 | Write state file, reload | State correctly restored |
| AC-5 | Track retry intervals | Backoff pattern matches spec |
## Constraints
- Must work inside Docker container
- S3-compatible CDN (current CDNManager already uses boto3)
- State file location must be on a volume that persists across container restarts
@@ -0,0 +1,76 @@
# Update Manager
**Task**: AZ-185_update_manager
**Name**: Update Manager
**Description**: Implement the loader's background update loop that checks for new versions and applies AI model and Docker image updates
**Complexity**: 5 points
**Dependencies**: AZ-183, AZ-184
**Component**: Loader
**Tracker**: AZ-185
**Epic**: AZ-181
## Problem
Jetsons need to automatically discover and install new AI models and Docker images without manual intervention. The update loop must handle version detection, server communication, and applying different update types.
## Outcome
- Loader automatically checks for updates every 5 minutes
- New AI models downloaded, decrypted, and placed in model directory
- New Docker images loaded and services restarted with minimal downtime
- Loader can update itself (self-update, applied last)
## Scope
### Included
- Version collector: scan model directory for .trt files (extract date from filename), query docker images for azaion/* tags, cache results
- Background loop (configurable interval, default 5 min): collect versions, call POST /get-update, trigger downloads
- Apply AI model: move decrypted .trt to model directory (detection API scans and picks newest)
- Apply Docker image: docker load -i, docker compose up -d {service}
- Self-update: loader updates itself last via docker compose up -d loader
- Integration with AZ-184 Resumable Download Manager for all downloads
### Excluded
- Server-side /get-update endpoint (AZ-183)
- Download mechanics (AZ-184)
- CI/CD publish pipeline (AZ-186)
- Device provisioning (AZ-187)
## Acceptance Criteria
**AC-1: Version collector reads local state**
Given AI model azaion-2026-03-10.trt in model directory and Docker image azaion/annotations:arm64_2026-03-01 loaded
When version collector runs
Then it reports [{resource_name: "detection_model", version: "2026-03-10"}, {resource_name: "annotations", version: "arm64_2026-03-01"}]
**AC-2: Background loop polls on schedule**
Given the loader is running with update interval set to 5 minutes
When 5 minutes elapse
Then POST /get-update is called with current versions
**AC-3: AI model update applied**
Given /get-update returns a new detection_model version
When download and decryption complete
Then new .trt file is in the model directory
**AC-4: Docker image update applied**
Given /get-update returns a new annotations version
When download and decryption complete
Then docker load succeeds and docker compose up -d annotations restarts the service
**AC-5: Self-update applied last**
Given /get-update returns updates for both annotations and loader
When applying updates
Then annotations is updated first, loader is updated last
**AC-6: Cached versions refresh after changes**
Given version collector cached its results
When a new model file appears in the directory or docker load completes
Then cache is invalidated and next collection reflects new state
## Constraints
- Docker socket must be mounted in the loader container (already the case)
- docker compose file path must be configurable (env var)
- Model directory path must be configurable (env var)
- Self-update must be robust: state file on disk ensures in-progress updates survive container restart
@@ -0,0 +1,67 @@
# CI/CD Artifact Publish
**Task**: AZ-186_cicd_artifact_publish
**Name**: CI/CD Artifact Publish
**Description**: Add encrypt-and-publish step to Woodpecker CI/CD pipeline and create a shared publish script usable by both CI/CD and training service
**Complexity**: 3 points
**Dependencies**: AZ-183
**Component**: DevOps
**Tracker**: AZ-186
**Epic**: AZ-181
## Problem
Both CI/CD (for Docker images) and the training service (for AI models) need to encrypt artifacts and publish them to CDN + Resources table. The encryption and publish logic should be shared.
## Outcome
- Shared Python publish script that any producer can call
- Woodpecker pipeline automatically publishes encrypted Docker archives after build
- Training service can publish AI models using the same script
- Every artifact gets its own random AES-256 key
## Scope
### Included
- Shared publish script (Python): generate random AES-256 key, compress (gzip), encrypt (AES-256), SHA-256 hash, upload to S3, write Resources row
- Woodpecker pipeline step in build-arm.yml: after docker build+push, also docker save -> publish script
- S3 bucket structure: {dev_stage}/{resource_name}-{architecture}-{version}.enc
- Documentation for training service integration
### Excluded
- Server-side Resources table (AZ-183, must exist first)
- Loader-side download/decrypt (AZ-184)
- Training service code changes (their team integrates the script)
## Acceptance Criteria
**AC-1: Publish script works end-to-end**
Given a local file (Docker archive or AI model)
When publish script is called with resource_name, dev_stage, architecture, version
Then file is compressed, encrypted with random key, uploaded to S3, and Resources row is written
**AC-2: Woodpecker publishes after build**
Given a push to dev/stage/main branch
When Woodpecker build completes
Then the Docker image is also published as encrypted archive to CDN with Resources row
**AC-3: Unique key per artifact**
Given two consecutive publishes of the same resource
When comparing encryption keys
Then each publish used a different random AES-256 key
**AC-4: SHA-256 consistency**
Given a published artifact
When SHA-256 of the uploaded S3 object is computed
Then it matches the sha256 value in the Resources table
**AC-5: Training service can use the script**
Given the publish script installed as a package or available as a standalone script
When the training service calls it after producing a .trt model
Then the model is published to CDN + Resources table
## Constraints
- Woodpecker runner has access to Docker socket and S3 credentials
- Publish script must work on both x86 (CI runner) and arm64 (training server if needed)
- S3 credentials and DB connection string passed via environment variables
@@ -0,0 +1,62 @@
# Device Provisioning Script
**Task**: AZ-187_device_provisioning_script
**Name**: Device Provisioning Script
**Description**: Create a shell script that provisions a Jetson device identity (CompanionPC user) during the fuse/flash pipeline
**Complexity**: 2 points
**Dependencies**: None
**Component**: DevOps
**Tracker**: AZ-187
**Epic**: AZ-181
## Problem
Each Jetson needs a unique CompanionPC user account for API authentication. This must be automated as part of the manufacturing/flash process so that provisioning 50+ devices is not manual.
## Outcome
- Single script creates device identity and embeds credentials in the rootfs
- Integrates into the fuse/flash pipeline between odmfuse.sh and flash.sh
- Provisioning runbook documents the full end-to-end flow
## Scope
### Included
- provision_device.sh: generate device email (azaion-jetson-{serial}@azaion.com), random 32-char password
- Call admin API POST /users to create Users row with Role=CompanionPC
- Write credentials config file to rootfs image (at known path, e.g., /etc/azaion/device.conf)
- Idempotency: re-running for same serial doesn't create duplicate user
- Provisioning runbook: step-by-step from unboxing through fusing, flashing, and first boot
### Excluded
- fTPM provisioning (covered by NVIDIA's ftpm_provisioning.sh)
- Secure Boot fusing (covered by solution_draft02 Phase 1-2)
- OS hardening (covered by solution_draft02 Phase 3)
- Admin API user creation endpoint (assumed to exist)
## Acceptance Criteria
**AC-1: Script creates CompanionPC user**
Given a new device serial AZJN-0042
When provision_device.sh is run with serial AZJN-0042
Then admin API has a new user azaion-jetson-0042@azaion.com with Role=CompanionPC
**AC-2: Credentials written to rootfs**
Given provision_device.sh completed successfully
When the rootfs image is inspected
Then /etc/azaion/device.conf contains the email and password
**AC-3: Device can log in after flash**
Given a provisioned and flashed device boots for the first time
When the loader reads /etc/azaion/device.conf and calls POST /login
Then a valid JWT is returned
**AC-4: Idempotent re-run**
Given provision_device.sh was already run for serial AZJN-0042
When it is run again for the same serial
Then no duplicate user is created (existing user is reused or updated)
**AC-5: Runbook complete**
Given the provisioning runbook
When followed step-by-step on a new Jetson Orin Nano
Then the device is fully fused, flashed, provisioned, and can communicate with the admin API
+12 -6
View File
@@ -1,18 +1,24 @@
# Batch Report
**Batch**: 1
**Tasks**: 01_test_infrastructure
**Date**: 2026-04-13
**Tasks**: AZ-182, AZ-184, AZ-187
**Date**: 2026-04-15
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|---------------|-------|-------------|--------|
| 01_test_infrastructure | Done | 12 files | 1/1 pass | 5/5 ACs (AC-1,2,3 require Docker) | None |
| AZ-182_tpm_security_provider | Done | 8 files | 8 pass (1 skip without swtpm) | 6/6 ACs covered | None |
| AZ-184_resumable_download_manager | Done | 2 files | 8 pass | 5/5 ACs covered | None |
| AZ-187_device_provisioning_script | Done | 3 files | 5 pass | 5/5 ACs covered | None |
## AC Test Coverage: 5/5 covered (3 require Docker environment)
## Code Review Verdict: PASS (infrastructure scaffold, no logic review needed)
## Excluded
AZ-183 (Resources Table & Update API) — admin API repo, not this workspace.
## AC Test Coverage: All covered (16/16)
## Code Review Verdict: PASS_WITH_WARNINGS
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Next Batch: 02_test_health_auth
## Next Batch: AZ-185, AZ-186 (Batch 2 — 8 points)
+7 -16
View File
@@ -1,28 +1,19 @@
# Batch Report
**Batch**: 2
**Tasks**: 02_test_health_auth
**Date**: 2026-04-13
**Tasks**: AZ-185, AZ-186
**Date**: 2026-04-15
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|---------------|-------|-------------|--------|
| 02_test_health_auth | Done | 2 files | 6 tests | 5/5 ACs covered | None |
| AZ-185_update_manager | Done | 4 files | 10 pass | 6/6 ACs covered | None |
| AZ-186_cicd_artifact_publish | Done | 3 files | 8 pass | 5/5 ACs covered | None |
## AC Test Coverage: All covered
| AC | Test | Status |
|----|------|--------|
| AC-1: Health returns 200 | test_health_returns_200 | Covered |
| AC-2: Status unauthenticated | test_status_unauthenticated | Covered |
| AC-3: Login valid | test_login_valid_credentials | Covered |
| AC-4: Login invalid | test_login_invalid_credentials | Covered |
| AC-5: Login empty body | test_login_empty_body | Covered |
| AC-2+3: Status authenticated | test_status_authenticated_after_login | Covered |
## Code Review Verdict: PASS
## AC Test Coverage: All covered (11/11)
## Code Review Verdict: PASS_WITH_WARNINGS
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Next Batch: 03_test_resources, 04_test_unlock, 05_test_resilience_perf (parallel)
## Next Batch: All tasks complete
@@ -0,0 +1,41 @@
# Code Review Report
**Batch**: 2 (AZ-185, AZ-186)
**Date**: 2026-04-15
**Verdict**: PASS_WITH_WARNINGS
## Spec Compliance
All 11 acceptance criteria across 2 tasks are satisfied with corresponding tests.
| Task | ACs | Covered | Status |
|------|-----|---------|--------|
| AZ-185 Update Manager | 6 | 6/6 | All pass (10 tests) |
| AZ-186 CI/CD Artifact Publish | 5 | 5/5 | All pass (8 tests) |
## Findings
| # | Severity | Category | File:Line | Title |
|---|----------|----------|-----------|-------|
| 1 | Low | Style | scripts/publish_artifact.py | Union syntax fixed |
| 2 | Low | Maintainability | src/main.py:15 | Deprecated on_event startup |
### Finding Details
**F1: Union syntax fixed** (Low / Style)
- Location: `scripts/publish_artifact.py:172,182`
- Description: Used `list[str] | None` syntax, fixed to `Optional[List[str]]` for consistency
- Status: Fixed
**F2: Deprecated on_event startup** (Low / Maintainability)
- Location: `src/main.py:15`
- Description: `@app.on_event("startup")` is deprecated in modern FastAPI in favor of lifespan context manager
- Suggestion: Migrate to `@asynccontextmanager lifespan` when upgrading FastAPI — not blocking
- Task: AZ-185
## Cross-Task Consistency
- AZ-185 uses AZ-184's `ResumableDownloadManager` correctly via its public API
- AZ-186 encrypt format (IV + AES-CBC + PKCS7) is compatible with AZ-184's `decrypt_cbc_file()`
- AZ-186's `encryption_key` is hex-encoded; AZ-185's `_aes_key_from_encryption_field` handles hex decoding
- Self-update ordering (loader last) correctly implemented in AZ-185
+5 -4
View File
@@ -2,8 +2,9 @@
## Current Step
flow: existing-code
step: 7
name: Refactor
status: done
sub_step: 7Phase 7 Documentation (complete)
step: 9
name: Implement
status: in_progress
sub_step: 6Launch Batch 2 implementers
retry_count: 0
current_task: Batch 2 (AZ-185, AZ-186)
+199
View File
@@ -0,0 +1,199 @@
import argparse
import gzip
import hashlib
import logging
import os
import secrets
import shutil
import sys
import tempfile
from typing import Any, Dict, List, Optional
import boto3
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
logger = logging.getLogger(__name__)
_DEFAULT_PUBLISH_PATH = "/internal/resources/publish"
def _require_env(name: str) -> str:
value = os.environ.get(name)
if not value:
raise ValueError(f"missing required environment variable: {name}")
return value
def object_key(dev_stage: str, resource_name: str, architecture: str, version: str) -> str:
return f"{dev_stage}/{resource_name}-{architecture}-{version}.enc"
def build_cdn_url(endpoint: str, bucket: str, key: str) -> str:
public_base = os.environ.get("CDN_PUBLIC_BASE_URL")
if public_base:
return f"{public_base.rstrip('/')}/{key}"
return f"{endpoint.rstrip('/')}/{bucket}/{key}"
def gzip_file(source_path: str, destination_path: str) -> None:
with open(source_path, "rb") as src, gzip.open(
destination_path, "wb", compresslevel=9
) as dst:
shutil.copyfileobj(src, dst, length=1024 * 1024)
def encrypt_aes256_cbc_file(plaintext_path: str, ciphertext_path: str, aes_key: bytes) -> None:
if len(aes_key) != 32:
raise ValueError("aes key must be 32 bytes")
iv = os.urandom(16)
cipher = Cipher(
algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend()
)
encryptor = cipher.encryptor()
padder = padding.PKCS7(128).padder()
with open(ciphertext_path, "wb") as out:
out.write(iv)
with open(plaintext_path, "rb") as inp:
while True:
chunk = inp.read(1024 * 1024)
if not chunk:
break
padded = padder.update(chunk)
if padded:
out.write(encryptor.update(padded))
tail = padder.finalize()
if tail:
out.write(encryptor.update(tail))
out.write(encryptor.finalize())
def sha256_file(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
while True:
block = f.read(1024 * 1024)
if not block:
break
h.update(block)
return h.hexdigest().lower()
def upload_s3_file(
endpoint: str,
access_key: str,
secret_key: str,
bucket: str,
key: str,
file_path: str,
) -> None:
client = boto3.client(
"s3",
endpoint_url=endpoint,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
)
with open(file_path, "rb") as body:
client.upload_fileobj(body, bucket, key)
def register_resource(
admin_base_url: str,
token: str,
payload: Dict[str, Any],
) -> None:
path = os.environ.get("ADMIN_API_PUBLISH_PATH", _DEFAULT_PUBLISH_PATH).lstrip("/")
base = admin_base_url.rstrip("/")
url = f"{base}/{path}"
resp = requests.post(
url,
headers={"Authorization": f"Bearer {token}"},
json=payload,
timeout=120,
)
resp.raise_for_status()
def publish(
file_path: str,
resource_name: str,
dev_stage: str,
architecture: str,
version: str,
) -> Dict[str, Any]:
endpoint = _require_env("S3_ENDPOINT")
access_key = _require_env("S3_ACCESS_KEY")
secret_key = _require_env("S3_SECRET_KEY")
bucket = _require_env("S3_BUCKET")
admin_url = _require_env("ADMIN_API_URL")
admin_token = _require_env("ADMIN_API_TOKEN")
key = object_key(dev_stage, resource_name, architecture, version)
aes_key = secrets.token_bytes(32)
encryption_key_hex = aes_key.hex()
gz_path = tempfile.NamedTemporaryFile(delete=False, suffix=".gz").name
enc_path = tempfile.NamedTemporaryFile(delete=False, suffix=".enc").name
try:
gzip_file(file_path, gz_path)
encrypt_aes256_cbc_file(gz_path, enc_path, aes_key)
digest = sha256_file(enc_path)
size_bytes = os.path.getsize(enc_path)
upload_s3_file(endpoint, access_key, secret_key, bucket, key, enc_path)
cdn_url = build_cdn_url(endpoint, bucket, key)
body = {
"resource_name": resource_name,
"dev_stage": dev_stage,
"architecture": architecture,
"version": version,
"cdn_url": cdn_url,
"sha256": digest,
"encryption_key": encryption_key_hex,
"size_bytes": size_bytes,
}
register_resource(admin_url, admin_token, body)
return {
"object_key": key,
"cdn_url": cdn_url,
"sha256": digest,
"encryption_key_hex": encryption_key_hex,
"size_bytes": size_bytes,
}
finally:
for p in (gz_path, enc_path):
try:
os.unlink(p)
except OSError:
pass
def parse_args(argv: List[str]) -> argparse.Namespace:
p = argparse.ArgumentParser(description="Compress, encrypt, upload artifact and register resource")
p.add_argument("--file", required=True, help="Path to file to publish")
p.add_argument("--resource-name", required=True)
p.add_argument("--dev-stage", required=True)
p.add_argument("--architecture", required=True)
p.add_argument("--version", required=True)
return p.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
args = parse_args(argv if argv is not None else sys.argv[1:])
try:
publish(
args.file,
args.resource_name,
args.dev_stage,
args.architecture,
args.version,
)
return 0
except Exception:
logger.exception("publish failed")
return 1
if __name__ == "__main__":
sys.exit(main())
+9
View File
@@ -11,6 +11,15 @@ from security_provider import create_security_provider
app = FastAPI(title="Azaion.Loader")
@app.on_event("startup")
def _startup_update_manager():
try:
from update_manager import maybe_start_update_background
except Exception:
return
maybe_start_update_background(get_api_client, RESOURCE_API_URL)
security_provider = create_security_provider()
RESOURCE_API_URL = os.environ.get("RESOURCE_API_URL", "https://api.azaion.com")
+266
View File
@@ -0,0 +1,266 @@
import hashlib
import json
import os
import subprocess
import threading
from typing import Any, Callable, Dict, List, Optional
import requests
from loguru import logger
from download_manager import ResumableDownloadManager
from version_collector import VersionCollector
def _aes_key_from_encryption_field(encryption_key: Any) -> bytes:
if isinstance(encryption_key, bytes):
if len(encryption_key) == 32:
return encryption_key
raise ValueError("invalid encryption key")
s = str(encryption_key).strip()
if len(s) == 64 and all(c in "0123456789abcdefABCDEF" for c in s):
return bytes.fromhex(s)
return hashlib.sha256(s.encode("utf-8")).digest()
def _sort_services_loader_last(services: List[str]) -> List[str]:
head = sorted(s for s in services if s != "loader")
tail = [s for s in services if s == "loader"]
return head + tail
def _sort_updates_loader_last(updates: List[dict]) -> List[dict]:
rest = [u for u in updates if u.get("resourceName") != "loader"]
rest.sort(key=lambda u: str(u.get("resourceName", "")))
loader = [u for u in updates if u.get("resourceName") == "loader"]
return rest + loader
class UpdateManager:
def __init__(
self,
api_url: str,
get_token: Callable[[], Optional[str]],
download_manager: ResumableDownloadManager,
version_collector: VersionCollector,
compose_file: str,
model_dir: str,
state_path: str,
interval_seconds: float = 300.0,
*,
subprocess_run: Optional[Callable] = None,
post_get_update: Optional[Callable[..., Any]] = None,
head_content_length: Optional[Callable[..., int]] = None,
stop_event: Optional[threading.Event] = None,
wait_fn: Optional[Callable[[float], bool]] = None,
) -> None:
self._api_url = api_url.rstrip("/")
self._get_token = get_token
self._download_manager = download_manager
self._version_collector = version_collector
self._compose_file = compose_file
self._model_dir = model_dir
self._state_path = state_path
self._interval = interval_seconds
self._subprocess_run = subprocess_run or subprocess.run
self._post_get_update = post_get_update or self._default_post_get_update
self._head_content_length = head_content_length or self._default_head_content_length
self._stop_event = stop_event or threading.Event()
self._wait_fn = wait_fn
def _default_post_get_update(self, token: str, body: dict) -> Any:
url = f"{self._api_url}/get-update"
resp = requests.post(
url,
json=body,
headers={"Authorization": f"Bearer {token}"},
timeout=120,
)
resp.raise_for_status()
return resp.json()
def _default_head_content_length(self, url: str, token: str) -> int:
headers = {}
if token:
headers["Authorization"] = f"Bearer {token}"
resp = requests.head(url, headers=headers, allow_redirects=True, timeout=120)
resp.raise_for_status()
cl = resp.headers.get("Content-Length")
if not cl:
raise ValueError("missing Content-Length")
return int(cl)
def _load_state(self) -> dict:
if not os.path.isfile(self._state_path):
return {"pending_compose": []}
with open(self._state_path, encoding="utf-8") as f:
data = json.load(f)
if "pending_compose" not in data:
data["pending_compose"] = []
return data
def _save_state(self, data: dict) -> None:
directory = os.path.dirname(self._state_path)
if directory:
os.makedirs(directory, exist_ok=True)
tmp = self._state_path + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=True)
os.replace(tmp, self._state_path)
def _drain_pending_compose(self) -> None:
state = self._load_state()
pending = list(dict.fromkeys(state.get("pending_compose") or []))
if not pending:
return
for svc in _sort_services_loader_last(pending):
self._subprocess_run(
["docker", "compose", "-f", self._compose_file, "up", "-d", svc],
check=True,
)
state["pending_compose"] = []
self._save_state(state)
def _current_versions_payload(self) -> Dict[str, str]:
rows = self._version_collector.collect()
return {r.resource_name: r.version for r in rows}
def _build_get_update_body(self) -> dict:
return {
"dev_stage": os.environ.get("LOADER_DEV_STAGE", ""),
"architecture": os.environ.get("LOADER_ARCH", ""),
"current_versions": self._current_versions_payload(),
}
def _artifact_size(self, url: str, token: str) -> int:
return self._head_content_length(url, token)
def _apply_model(self, item: dict, token: str) -> None:
name = str(item["resourceName"])
version = str(item["version"])
url = str(item["cdnUrl"])
sha256 = str(item["sha256"])
key = _aes_key_from_encryption_field(item["encryptionKey"])
size = self._artifact_size(url, token)
job_id = f"update-{name}-{version}"
os.makedirs(self._model_dir, exist_ok=True)
out_path = os.path.join(self._model_dir, f"azaion-{version}.trt")
self._download_manager.fetch_decrypt_verify(
job_id,
url,
sha256,
size,
key,
out_path,
)
self._version_collector.invalidate()
def _mark_pending_compose(self, service: str) -> None:
state = self._load_state()
pending = list(state.get("pending_compose") or [])
if service not in pending:
pending.append(service)
state["pending_compose"] = pending
self._save_state(state)
def _clear_pending_compose(self, service: str) -> None:
state = self._load_state()
pending = [s for s in (state.get("pending_compose") or []) if s != service]
state["pending_compose"] = pending
self._save_state(state)
def _apply_docker_image(self, item: dict, token: str) -> None:
name = str(item["resourceName"])
version = str(item["version"])
url = str(item["cdnUrl"])
sha256 = str(item["sha256"])
key = _aes_key_from_encryption_field(item["encryptionKey"])
size = self._artifact_size(url, token)
job_id = f"update-{name}-{version}"
artifact_dir = os.path.dirname(self._state_path)
os.makedirs(artifact_dir, exist_ok=True)
out_tar = os.path.join(artifact_dir, f"{job_id}.plaintext.tar")
self._download_manager.fetch_decrypt_verify(
job_id,
url,
sha256,
size,
key,
out_tar,
)
self._subprocess_run(["docker", "load", "-i", out_tar], check=True)
self._version_collector.invalidate()
self._mark_pending_compose(name)
self._subprocess_run(
["docker", "compose", "-f", self._compose_file, "up", "-d", name],
check=True,
)
self._clear_pending_compose(name)
def _tick_once(self) -> None:
token = self._get_token()
if not token:
return
self._drain_pending_compose()
body = self._build_get_update_body()
updates = self._post_get_update(token, body)
if not isinstance(updates, list):
return
for item in _sort_updates_loader_last(updates):
rname = str(item.get("resourceName", ""))
if rname == "detection_model":
self._apply_model(item, token)
else:
self._apply_docker_image(item, token)
def run_forever(self) -> None:
while not self._stop_event.is_set():
try:
self._drain_pending_compose()
self._tick_once()
except Exception as exc:
logger.exception("update manager tick failed: {}", exc)
if self._wait_fn is not None:
if self._wait_fn(self._interval):
break
elif self._stop_event.wait(self._interval):
break
def maybe_start_update_background(
get_api_client: Callable[[], Any],
api_url: str,
) -> None:
state_dir = os.environ.get("LOADER_DOWNLOAD_STATE_DIR")
if not state_dir:
return
model_dir = os.environ.get("LOADER_MODEL_DIR", "models")
compose_file = os.environ.get("LOADER_COMPOSE_FILE", "docker-compose.yml")
interval = float(os.environ.get("LOADER_UPDATE_INTERVAL_SEC", "300"))
orchestrator_path = os.environ.get(
"LOADER_UPDATE_STATE_PATH",
os.path.join(state_dir, "update_orchestrator.json"),
)
def token_getter() -> Optional[str]:
client = get_api_client()
return getattr(client, "token", None)
try:
dm = ResumableDownloadManager(state_dir)
vc = VersionCollector(model_dir)
um = UpdateManager(
api_url,
token_getter,
dm,
vc,
compose_file,
model_dir,
orchestrator_path,
interval_seconds=interval,
)
except Exception as exc:
logger.exception("update manager failed to start: {}", exc)
return
threading.Thread(target=um.run_forever, name="loader-updates", daemon=True).start()
+91
View File
@@ -0,0 +1,91 @@
import os
import re
import subprocess
from dataclasses import asdict, dataclass
from typing import Callable, List, Optional
TRT_DATE_PATTERN = re.compile(r"^azaion-(\d{4}-\d{2}-\d{2})\.trt$", re.IGNORECASE)
@dataclass(frozen=True)
class ResourceVersion:
resource_name: str
version: str
class VersionCollector:
def __init__(
self,
model_dir: str,
*,
subprocess_run: Optional[Callable] = None,
) -> None:
self._model_dir = model_dir
self._subprocess_run = subprocess_run or subprocess.run
self._cache: Optional[List[ResourceVersion]] = None
def invalidate(self) -> None:
self._cache = None
def collect(self) -> List[ResourceVersion]:
if self._cache is not None:
return list(self._cache)
rows = self._collect_uncached()
self._cache = rows
return list(rows)
def collect_as_dicts(self) -> List[dict]:
return [asdict(r) for r in self.collect()]
def _collect_uncached(self) -> List[ResourceVersion]:
out: List[ResourceVersion] = []
mv = self._best_trt_version()
if mv is not None:
out.append(ResourceVersion("detection_model", mv))
out.extend(self._docker_versions())
rest = [r for r in out if r.resource_name != "detection_model"]
rest.sort(key=lambda r: r.resource_name)
if mv is not None:
return [ResourceVersion("detection_model", mv)] + rest
return rest
def _best_trt_version(self) -> Optional[str]:
if not os.path.isdir(self._model_dir):
return None
best: Optional[str] = None
for name in os.listdir(self._model_dir):
m = TRT_DATE_PATTERN.match(name)
if not m:
continue
v = m.group(1)
if best is None or v > best:
best = v
return best
def _docker_versions(self) -> List[ResourceVersion]:
try:
result = self._subprocess_run(
["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"],
capture_output=True,
text=True,
check=True,
)
except (OSError, subprocess.CalledProcessError):
return []
found: List[ResourceVersion] = []
for line in result.stdout.splitlines():
line = line.strip()
if not line or ":<none>" in line:
continue
if not line.startswith("azaion/"):
continue
if ":" not in line:
continue
repo, tag = line.rsplit(":", 1)
if tag in ("<none>", ""):
continue
parts = repo.split("/", 1)
if len(parts) < 2:
continue
found.append(ResourceVersion(parts[1], tag))
return found
+331
View File
@@ -0,0 +1,331 @@
import gzip
import importlib.util
import io
import os
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
import yaml
from download_manager import decrypt_cbc_file
_ROOT = Path(__file__).resolve().parents[1]
_SCRIPT = _ROOT / "scripts" / "publish_artifact.py"
_WOODPECKER = _ROOT / ".woodpecker" / "build-arm.yml"
def _load_publish():
spec = importlib.util.spec_from_file_location("publish_artifact", _SCRIPT)
mod = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(mod)
return mod
def _s3_client_factory(storage):
def client(service_name, **kwargs):
if service_name != "s3":
raise AssertionError(service_name)
m = MagicMock()
def upload_fileobj(body, bucket, key):
storage.setdefault(bucket, {})[key] = body.read()
m.upload_fileobj.side_effect = upload_fileobj
def get_object(Bucket=None, Key=None):
return {"Body": io.BytesIO(storage[Bucket][Key])}
m.get_object.side_effect = get_object
return m
return client
class TestPublishArtifact(unittest.TestCase):
def setUp(self):
self._env_patch = None
def tearDown(self):
if self._env_patch:
self._env_patch.stop()
def _base_env(self):
return {
"S3_ENDPOINT": "https://s3.example.test",
"S3_ACCESS_KEY": "ak",
"S3_SECRET_KEY": "sk",
"S3_BUCKET": "test-bucket",
"ADMIN_API_URL": "https://admin.example.test",
"ADMIN_API_TOKEN": "token",
}
def test_ac1_end_to_end_publish(self):
# Arrange
mod = _load_publish()
env = self._base_env()
self._env_patch = patch.dict(os.environ, env, clear=False)
self._env_patch.start()
captured = {}
storage = {}
def fake_post(url, headers=None, json=None, timeout=None):
class R:
status_code = 200
def raise_for_status(self):
pass
captured["url"] = url
captured["body"] = json
return R()
fd, src = tempfile.mkstemp()
os.close(fd)
try:
with open(src, "wb") as f:
f.write(b"artifact-bytes")
with patch.object(
mod.boto3, "client", side_effect=_s3_client_factory(storage)
), patch.object(mod.requests, "post", side_effect=fake_post):
# Act
out = mod.publish(
src,
"loader",
"dev",
"arm64",
"v1",
)
# Assert
self.assertEqual(
out["object_key"],
"dev/loader-arm64-v1.enc",
)
key = out["object_key"]
body = storage["test-bucket"][key]
h = __import__("hashlib").sha256(body).hexdigest().lower()
self.assertEqual(h, out["sha256"])
self.assertEqual(captured["body"]["sha256"], out["sha256"])
self.assertEqual(captured["body"]["size_bytes"], len(body))
self.assertEqual(captured["body"]["encryption_key"], out["encryption_key_hex"])
self.assertEqual(captured["body"]["cdn_url"], out["cdn_url"])
finally:
os.unlink(src)
def test_ac2_woodpecker_publish_step_after_build(self):
# Arrange
raw = _WOODPECKER.read_text(encoding="utf-8")
# Act
doc = yaml.safe_load(raw)
names = [s["name"] for s in doc["steps"]]
# Assert
self.assertIn("build-push", names)
self.assertIn("publish-artifact", names)
self.assertLess(names.index("build-push"), names.index("publish-artifact"))
build_cmds = "\n".join(doc["steps"][names.index("build-push")]["commands"])
self.assertIn("docker save", build_cmds)
pub_cmds = "\n".join(doc["steps"][names.index("publish-artifact")]["commands"])
self.assertIn("publish_artifact.py", pub_cmds)
self.assertIn("loader-image.tar", pub_cmds)
def test_ac3_unique_key_per_publish(self):
# Arrange
mod = _load_publish()
self._env_patch = patch.dict(os.environ, self._base_env(), clear=False)
self._env_patch.start()
keys = []
storage = {}
def capture_post(url, headers=None, json=None, timeout=None):
keys.append(json["encryption_key"])
class R:
status_code = 200
def raise_for_status(self):
pass
return R()
fd, src = tempfile.mkstemp()
os.close(fd)
try:
with open(src, "wb") as f:
f.write(b"x")
with patch.object(
mod.boto3, "client", side_effect=_s3_client_factory(storage)
), patch.object(mod.requests, "post", side_effect=capture_post):
# Act
mod.publish(src, "r", "dev", "arm64", "1")
mod.publish(src, "r", "dev", "arm64", "2")
# Assert
self.assertEqual(len(keys), 2)
self.assertNotEqual(keys[0], keys[1])
self.assertEqual(len(bytes.fromhex(keys[0])), 32)
self.assertEqual(len(bytes.fromhex(keys[1])), 32)
finally:
os.unlink(src)
def test_ac4_sha256_matches_s3_object_and_registration(self):
# Arrange
mod = _load_publish()
self._env_patch = patch.dict(os.environ, self._base_env(), clear=False)
self._env_patch.start()
posted = {}
storage = {}
def fake_post(url, headers=None, json=None, timeout=None):
posted.update(json)
class R:
status_code = 200
def raise_for_status(self):
pass
return R()
fd, src = tempfile.mkstemp()
os.close(fd)
try:
with open(src, "wb") as f:
f.write(b"payload-for-hash")
with patch.object(
mod.boto3, "client", side_effect=_s3_client_factory(storage)
), patch.object(mod.requests, "post", side_effect=fake_post):
# Act
out = mod.publish(src, "m", "stage", "arm64", "9.9.9")
key = out["object_key"]
body = storage["test-bucket"][key]
expect = __import__("hashlib").sha256(body).hexdigest().lower()
# Assert
self.assertEqual(posted["sha256"], expect)
self.assertEqual(out["sha256"], expect)
finally:
os.unlink(src)
def test_ac5_main_entry_matches_cli_invocation(self):
# Arrange
mod = _load_publish()
self._env_patch = patch.dict(os.environ, self._base_env(), clear=False)
self._env_patch.start()
storage = {}
def ok_post(url, headers=None, json=None, timeout=None):
class R:
status_code = 200
def raise_for_status(self):
pass
return R()
fd, src = tempfile.mkstemp()
os.close(fd)
try:
with open(src, "wb") as f:
f.write(b"cli-data")
with patch.object(
mod.boto3, "client", side_effect=_s3_client_factory(storage)
), patch.object(mod.requests, "post", side_effect=ok_post):
# Act
code = mod.main(
[
"--file",
src,
"--resource-name",
"model",
"--dev-stage",
"dev",
"--architecture",
"arm64",
"--version",
"0.0.1",
]
)
# Assert
self.assertEqual(code, 0)
self.assertGreater(
len(storage["test-bucket"]["dev/model-arm64-0.0.1.enc"]), 0
)
finally:
os.unlink(src)
def test_ac5_cli_help_exits_zero(self):
# Act
r = subprocess.run(
[sys.executable, str(_SCRIPT), "--help"],
cwd=str(_ROOT),
capture_output=True,
text=True,
)
# Assert
self.assertEqual(r.returncode, 0)
self.assertIn("--resource-name", r.stdout)
def test_ac5_subprocess_script_missing_env_exits_nonzero(self):
# Arrange
fd, path = tempfile.mkstemp()
os.close(fd)
try:
minimal_env = {
k: v
for k, v in os.environ.items()
if k in ("PATH", "HOME", "TMPDIR", "SYSTEMROOT")
}
# Act
r = subprocess.run(
[
sys.executable,
str(_SCRIPT),
"--file",
path,
"--resource-name",
"x",
"--dev-stage",
"d",
"--architecture",
"arm64",
"--version",
"1",
],
cwd=str(_ROOT),
env=minimal_env,
capture_output=True,
text=True,
)
# Assert
self.assertNotEqual(r.returncode, 0)
finally:
os.unlink(path)
def test_encryption_compatible_with_decrypt_cbc_file(self):
# Arrange
mod = _load_publish()
aes_key = os.urandom(32)
fd, plain = tempfile.mkstemp()
os.close(fd)
gz_path = tempfile.NamedTemporaryFile(delete=False, suffix=".gz").name
enc_path = tempfile.NamedTemporaryFile(delete=False, suffix=".enc").name
dec_path = tempfile.NamedTemporaryFile(delete=False, suffix=".bin").name
try:
with open(plain, "wb") as f:
f.write(b"round-trip-plain")
mod.gzip_file(plain, gz_path)
mod.encrypt_aes256_cbc_file(gz_path, enc_path, aes_key)
# Act
decrypt_cbc_file(enc_path, aes_key, dec_path)
with open(dec_path, "rb") as f:
restored = gzip.decompress(f.read())
# Assert
self.assertEqual(restored, b"round-trip-plain")
finally:
for p in (plain, gz_path, enc_path, dec_path):
try:
os.unlink(p)
except OSError:
pass
+351
View File
@@ -0,0 +1,351 @@
import json
import os
import subprocess
import tempfile
import unittest
from typing import List
from unittest.mock import MagicMock
from download_manager import ResumableDownloadManager
from update_manager import UpdateManager, maybe_start_update_background
from version_collector import VersionCollector
class TestUpdateManager(unittest.TestCase):
def _make_manager(
self,
tmp: str,
*,
post_get_update=None,
subprocess_run=None,
head_content_length=None,
wait_fn=None,
stop_event=None,
):
dm_dir = os.path.join(tmp, "dm")
model_dir = os.path.join(tmp, "models")
state_path = os.path.join(dm_dir, "update_orchestrator.json")
os.makedirs(model_dir, exist_ok=True)
dm = ResumableDownloadManager(dm_dir)
vc = VersionCollector(model_dir, subprocess_run=subprocess_run or MagicMock())
um = UpdateManager(
"http://api.test",
lambda: "tok",
dm,
vc,
os.path.join(tmp, "compose.yml"),
model_dir,
state_path,
interval_seconds=300.0,
subprocess_run=subprocess_run,
post_get_update=post_get_update,
head_content_length=head_content_length,
wait_fn=wait_fn,
stop_event=stop_event,
)
return um, dm, vc
def test_ac2_background_loop_polls_on_schedule(self):
# Arrange
tmp = tempfile.mkdtemp()
posts: List[dict] = []
def post(token, body):
posts.append({"token": token, "body": body})
return []
waits: List[float] = []
def wait_fn(interval):
waits.append(interval)
return len(waits) >= 2
def fake_run(cmd, **kwargs):
if cmd[:3] == ["docker", "images", "--format"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise AssertionError(cmd)
um, _, _ = self._make_manager(
tmp,
post_get_update=post,
subprocess_run=fake_run,
head_content_length=lambda url, token: 1,
wait_fn=wait_fn,
)
# Act
um.run_forever()
# Assert
self.assertEqual(len(posts), 2)
self.assertEqual(waits, [300.0, 300.0])
def test_ac2_default_interval_is_five_minutes(self):
# Arrange / Act
tmp = tempfile.mkdtemp()
dm_dir = os.path.join(tmp, "dm")
model_dir = os.path.join(tmp, "m")
os.makedirs(model_dir, exist_ok=True)
dm = ResumableDownloadManager(dm_dir)
vc = VersionCollector(model_dir, subprocess_run=MagicMock())
um = UpdateManager(
"http://x",
lambda: None,
dm,
vc,
"c.yml",
model_dir,
os.path.join(dm_dir, "st.json"),
)
# Assert
self.assertEqual(um._interval, 300.0)
def test_ac3_ai_model_update_applied(self):
# Arrange
tmp = tempfile.mkdtemp()
model_dir = os.path.join(tmp, "models")
os.makedirs(model_dir, exist_ok=True)
def fake_run(cmd, **kwargs):
if cmd[:3] == ["docker", "images", "--format"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise AssertionError(cmd)
dm_mock = MagicMock()
def post(token, body):
return [
{
"resourceName": "detection_model",
"version": "2026-04-20",
"cdnUrl": "http://cdn/x",
"sha256": "ab",
"encryptionKey": "k",
}
]
um, _, _ = self._make_manager(
tmp,
post_get_update=post,
subprocess_run=fake_run,
head_content_length=lambda url, token: 4,
)
um._download_manager = dm_mock
def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
with open(output_plaintext_path, "wb") as f:
f.write(b"trt")
dm_mock.fetch_decrypt_verify.side_effect = capture_fetch
# Act
um._tick_once()
# Assert
dm_mock.fetch_decrypt_verify.assert_called_once()
args, kwargs = dm_mock.fetch_decrypt_verify.call_args
self.assertTrue(args[5].endswith("azaion-2026-04-20.trt"))
self.assertTrue(os.path.isfile(os.path.join(model_dir, "azaion-2026-04-20.trt")))
def test_ac4_docker_image_update_applied(self):
# Arrange
tmp = tempfile.mkdtemp()
recorded: List[List[str]] = []
def fake_run(cmd, **kwargs):
recorded.append(list(cmd))
if cmd[:3] == ["docker", "images", "--format"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if cmd[:3] == ["docker", "load", "-i"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if cmd[:2] == ["docker", "compose"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise AssertionError(cmd)
dm_mock = MagicMock()
def post(token, body):
return [
{
"resourceName": "annotations",
"version": "2026-04-13",
"cdnUrl": "http://cdn/a",
"sha256": "cd",
"encryptionKey": "k",
}
]
um, _, _ = self._make_manager(
tmp,
post_get_update=post,
subprocess_run=fake_run,
head_content_length=lambda url, token: 8,
)
um._download_manager = dm_mock
def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
with open(output_plaintext_path, "wb") as f:
f.write(b"tarbytes")
dm_mock.fetch_decrypt_verify.side_effect = capture_fetch
# Act
um._tick_once()
# Assert
loads = [c for c in recorded if c[:3] == ["docker", "load", "-i"]]
composes = [c for c in recorded if c[:2] == ["docker", "compose"]]
self.assertEqual(len(loads), 1)
self.assertEqual(len(composes), 1)
self.assertIn("annotations", composes[0])
def test_ac5_self_update_applied_last(self):
# Arrange
tmp = tempfile.mkdtemp()
recorded: List[str] = []
def fake_run(cmd, **kwargs):
if cmd[:3] == ["docker", "images", "--format"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if cmd[:3] == ["docker", "load", "-i"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if cmd[:2] == ["docker", "compose"]:
recorded.append(cmd[-1])
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise AssertionError(cmd)
dm_mock = MagicMock()
def post(token, body):
return [
{
"resourceName": "loader",
"version": "v2",
"cdnUrl": "http://cdn/l",
"sha256": "00",
"encryptionKey": "k",
},
{
"resourceName": "annotations",
"version": "v1",
"cdnUrl": "http://cdn/a",
"sha256": "11",
"encryptionKey": "k",
},
]
um, _, _ = self._make_manager(
tmp,
post_get_update=post,
subprocess_run=fake_run,
head_content_length=lambda url, token: 1,
)
um._download_manager = dm_mock
def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
with open(output_plaintext_path, "wb") as f:
f.write(b"x")
dm_mock.fetch_decrypt_verify.side_effect = capture_fetch
# Act
um._tick_once()
# Assert
self.assertEqual(recorded, ["annotations", "loader"])
def test_ac6_invalidate_after_docker_apply(self):
# Arrange
tmp = tempfile.mkdtemp()
def fake_run(cmd, **kwargs):
if cmd[:3] == ["docker", "images", "--format"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if cmd[:3] == ["docker", "load", "-i"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if cmd[:2] == ["docker", "compose"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise AssertionError(cmd)
dm_mock = MagicMock()
def post(token, body):
return [
{
"resourceName": "annotations",
"version": "v9",
"cdnUrl": "http://cdn/a",
"sha256": "11",
"encryptionKey": "k",
}
]
um, _, vc = self._make_manager(
tmp,
post_get_update=post,
subprocess_run=fake_run,
head_content_length=lambda url, token: 1,
)
um._download_manager = dm_mock
def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
with open(output_plaintext_path, "wb") as f:
f.write(b"x")
dm_mock.fetch_decrypt_verify.side_effect = capture_fetch
vc.collect()
self.assertIsNotNone(vc._cache)
# Act
um._tick_once()
# Assert
self.assertIsNone(vc._cache)
def test_maybe_start_skips_without_download_state_dir(self):
# Arrange
old = os.environ.pop("LOADER_DOWNLOAD_STATE_DIR", None)
try:
def get_client():
return MagicMock()
# Act
maybe_start_update_background(get_client, "http://x")
finally:
if old is not None:
os.environ["LOADER_DOWNLOAD_STATE_DIR"] = old
def test_pending_compose_drained_on_startup(self):
# Arrange
tmp = tempfile.mkdtemp()
dm_dir = os.path.join(tmp, "dm")
os.makedirs(dm_dir, exist_ok=True)
state_path = os.path.join(dm_dir, "update_orchestrator.json")
with open(state_path, "w", encoding="utf-8") as f:
json.dump({"pending_compose": ["annotations", "loader"]}, f)
recorded: List[str] = []
def fake_run(cmd, **kwargs):
if cmd[:3] == ["docker", "images", "--format"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if cmd[:2] == ["docker", "compose"]:
recorded.append(cmd[-1])
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise AssertionError(cmd)
model_dir = os.path.join(tmp, "m")
os.makedirs(model_dir, exist_ok=True)
dm = ResumableDownloadManager(dm_dir)
vc = VersionCollector(model_dir, subprocess_run=fake_run)
um = UpdateManager(
"http://api.test",
lambda: None,
dm,
vc,
os.path.join(tmp, "compose.yml"),
model_dir,
state_path,
subprocess_run=fake_run,
)
# Act
um._drain_pending_compose()
# Assert
self.assertEqual(recorded, ["annotations", "loader"])
with open(state_path, encoding="utf-8") as f:
data = json.load(f)
self.assertEqual(data.get("pending_compose"), [])
if __name__ == "__main__":
unittest.main()
+65
View File
@@ -0,0 +1,65 @@
import os
import subprocess
import tempfile
import unittest
from version_collector import VersionCollector
class TestVersionCollector(unittest.TestCase):
def test_ac1_version_collector_reads_local_state(self):
# Arrange
tmp = tempfile.mkdtemp()
open(os.path.join(tmp, "azaion-2026-03-10.trt"), "wb").close()
def fake_run(cmd, **kwargs):
if cmd[:3] == ["docker", "images", "--format"]:
return subprocess.CompletedProcess(
cmd,
0,
stdout="azaion/annotations:arm64_2026-03-01\n",
stderr="",
)
raise AssertionError(f"unexpected cmd {cmd}")
vc = VersionCollector(tmp, subprocess_run=fake_run)
# Act
got = vc.collect_as_dicts()
# Assert
self.assertEqual(
got,
[
{"resource_name": "detection_model", "version": "2026-03-10"},
{"resource_name": "annotations", "version": "arm64_2026-03-01"},
],
)
def test_ac6_cache_invalidates_after_changes(self):
# Arrange
tmp = tempfile.mkdtemp()
open(os.path.join(tmp, "azaion-2026-01-01.trt"), "wb").close()
def fake_run(cmd, **kwargs):
if cmd[:3] == ["docker", "images", "--format"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise AssertionError(f"unexpected cmd {cmd}")
vc = VersionCollector(tmp, subprocess_run=fake_run)
first = vc.collect_as_dicts()
open(os.path.join(tmp, "azaion-2026-02-01.trt"), "wb").close()
second_cached = vc.collect_as_dicts()
vc.invalidate()
third = vc.collect_as_dicts()
# Assert
self.assertEqual(
first,
[{"resource_name": "detection_model", "version": "2026-01-01"}],
)
self.assertEqual(second_cached, first)
self.assertEqual(
third,
[{"resource_name": "detection_model", "version": "2026-02-01"}],
)
if __name__ == "__main__":
unittest.main()