mirror of https://github.com/azaion/loader.git
synced 2026-04-22 22:46:32 +00:00

[AZ-185][AZ-186] Batch 2

Made-with: Cursor
@@ -12,5 +12,19 @@ steps:
      - if [ "$CI_COMMIT_BRANCH" = "main" ]; then export TAG=arm; else export TAG=${CI_COMMIT_BRANCH}-arm; fi
      - docker build -f Dockerfile -t localhost:5000/loader:$TAG .
      - docker push localhost:5000/loader:$TAG
      - docker save localhost:5000/loader:$TAG -o loader-image.tar
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  - name: publish-artifact
    image: python:3.11-slim
    commands:
      - pip install --no-cache-dir boto3==1.40.9 cryptography==44.0.2 requests==2.32.4
      - |
        if [ "$CI_COMMIT_BRANCH" = "main" ]; then
          export PUBLISH_DEV_STAGE=main
          export TAG=arm
        else
          export PUBLISH_DEV_STAGE=$CI_COMMIT_BRANCH
          export TAG=${CI_COMMIT_BRANCH}-arm
        fi
      - python scripts/publish_artifact.py --file loader-image.tar --resource-name loader --dev-stage "$PUBLISH_DEV_STAGE" --architecture arm64 --version "$CI_COMMIT_SHA"
@@ -0,0 +1,53 @@
# Publish artifact script (AZ-186)

Training services and CI/CD call `scripts/publish_artifact.py` after producing an artifact (for example a `.trt` model or a Docker image tarball). The script gzip-compresses the file, encrypts it with a random 32-byte AES-256 key (AES-CBC with PKCS7 padding, IV prefixed to the ciphertext), uploads the ciphertext to S3, and registers metadata with the admin API.

## CLI

```text
python scripts/publish_artifact.py \
  --file /path/to/artifact \
  --resource-name my_model \
  --dev-stage dev \
  --architecture arm64 \
  --version 2026-04-15
```

Object key: `{dev_stage}/{resource_name}-{architecture}-{version}.enc`

## Environment variables

| Variable | Required | Purpose |
|----------|----------|---------|
| `S3_ENDPOINT` | yes | S3-compatible endpoint URL |
| `S3_ACCESS_KEY` | yes | Upload credentials |
| `S3_SECRET_KEY` | yes | Upload credentials |
| `S3_BUCKET` | yes | Target bucket |
| `ADMIN_API_URL` | yes | Admin API base URL (no trailing path for publish) |
| `ADMIN_API_TOKEN` | yes | Bearer token for the publish request |
| `CDN_PUBLIC_BASE_URL` | no | If set, `cdn_url` in the registration payload is `{CDN_PUBLIC_BASE_URL}/{object_key}`; otherwise it defaults to `{S3_ENDPOINT}/{S3_BUCKET}/{object_key}` |
| `ADMIN_API_PUBLISH_PATH` | no | Defaults to `internal/resources/publish`; POST is sent to `{ADMIN_API_URL}/{ADMIN_API_PUBLISH_PATH}` |

## Admin API contract

`POST {ADMIN_API_URL}/internal/resources/publish` (unless overridden) with JSON body:

- `resource_name`, `dev_stage`, `architecture`, `version` (strings)
- `cdn_url` (string)
- `sha256` (lowercase hex of the uploaded ciphertext file, including the 16-byte IV)
- `encryption_key` (64-character hex encoding of the raw 32-byte AES key)
- `size_bytes` (integer size of the uploaded ciphertext file)
The loader expects the same `encryption_key` and `sha256` semantics as returned by fleet `POST /get-update` (hex key, hash of the ciphertext object).
## Dependencies

Use the same major versions as the loader: `boto3`, `cryptography`, `requests` (see `requirements.txt`). A minimal install for a training host is:

```text
pip install boto3==1.40.9 cryptography==44.0.2 requests==2.32.4
```

## Woodpecker

Pipeline `.woodpecker/build-arm.yml` saves the built image to `loader-image.tar` and runs this script in a follow-up step. Configure the environment variables above as Woodpecker secrets for that step.
@@ -0,0 +1,129 @@
# TPM-Based Security Provider

**Task**: AZ-182_tpm_security_provider
**Name**: TPM Security Provider
**Description**: Introduce a SecurityProvider abstraction with TPM detection and FAPI integration, wrapping existing security logic in LegacySecurityProvider for backward compatibility
**Complexity**: 5 points
**Dependencies**: None
**Component**: 02 Security
**Tracker**: AZ-182
**Epic**: AZ-181

## Problem

The loader's security code (key derivation, encryption, hardware fingerprinting) is hardcoded for the binary-split scheme. On fused Jetson Orin Nano devices with fTPM, this scheme is unnecessary — full-disk encryption protects data at rest, and the fleet update system (AZ-185) handles encrypted artifact delivery with per-artifact keys. However, the loader still needs a clean abstraction to:

1. Detect whether it is running on a TPM-equipped device or a legacy environment
2. Provide TPM seal/unseal capability as infrastructure for defense-in-depth (sealed credentials, future key wrapping)
3. Preserve the legacy code path for non-TPM deployments

## Outcome

- Loader detects TPM availability at startup and selects the appropriate security provider
- SecurityProvider abstraction cleanly separates TPM and legacy code paths
- TpmSecurityProvider establishes a FAPI connection and provides seal/unseal operations
- LegacySecurityProvider wraps existing security.pyx unchanged
- Foundation in place for fTPM-sealed credentials (future) and per-artifact key decryption integration

## Scope

### Included

- SecurityProvider abstraction (ABC) with TpmSecurityProvider and LegacySecurityProvider
- Runtime TPM detection (/dev/tpm0 probe + SECURITY_PROVIDER env var override)
- tpm2-pytss FAPI integration: connect, create_seal, unseal
- LegacySecurityProvider wrapping existing security.pyx (encrypt, decrypt, key derivation)
- Auto-detection and provider selection at startup with logging
- Docker compose device mounts for /dev/tpm0 and /dev/tpmrm0
- Dockerfile changes: install the tpm2-tss native library + tpm2-pytss
- Tests using a TPM simulator (swtpm)
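The detection rules above (env var override first, then a /dev/tpm0 probe, with graceful fallback on any TPM failure) can be sketched as follows. The provider class names match the spec, but the factory itself is illustrative; it returns plain strings to stay dependency-free.

```python
import logging
import os
from pathlib import Path

logger = logging.getLogger(__name__)


def select_security_provider() -> str:
    """Illustrative provider selection: SECURITY_PROVIDER wins,
    otherwise probe /dev/tpm0 and fall back to legacy on failure."""
    override = os.environ.get("SECURITY_PROVIDER", "").lower()
    if override == "legacy":
        logger.info("SECURITY_PROVIDER=legacy set; using LegacySecurityProvider")
        return "legacy"
    if not Path("/dev/tpm0").exists():
        logger.info("/dev/tpm0 absent; using LegacySecurityProvider")
        return "legacy"
    try:
        # Real code would establish the tpm2-pytss FAPI connection here
        # and return a TpmSecurityProvider instance.
        return "tpm"
    except Exception as exc:  # AC-5: graceful fallback with a warning log
        logger.warning("TPM init failed (%s); falling back to legacy", exc)
        return "legacy"
```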
### Excluded

- Resource download/upload changes (handled by AZ-185 Update Manager with per-artifact keys)
- Docker unlock flow changes (handled by AZ-185 Update Manager)
- fTPM provisioning pipeline (manufacturing-time, separate from code)
- Remote attestation via EK certificates
- fTPM-sealed device credentials (future enhancement, not v1)
- Changes to the Azaion admin API server

## Acceptance Criteria

**AC-1: SecurityProvider auto-detection**
Given a Jetson device with provisioned fTPM and /dev/tpm0 accessible
When the loader starts
Then TpmSecurityProvider is selected and logged

**AC-2: TPM seal/unseal round-trip**
Given TpmSecurityProvider is active
When data is sealed via FAPI create_seal and later unsealed
Then the unsealed data matches the original

**AC-3: Legacy path unchanged**
Given no TPM is available (/dev/tpm0 absent)
When the loader starts and processes resource requests
Then LegacySecurityProvider is selected and all behavior is identical to the current scheme

**AC-4: Env var override**
Given SECURITY_PROVIDER=legacy is set
When the loader starts on a device with /dev/tpm0 present
Then LegacySecurityProvider is selected regardless of TPM availability

**AC-5: Graceful fallback**
Given /dev/tpm0 exists but the FAPI connection fails
When the loader starts
Then it falls back to LegacySecurityProvider with a warning log

**AC-6: Docker container TPM access**
Given docker-compose.yml with /dev/tpm0 and /dev/tpmrm0 device mounts
When the loader container starts on a fused Jetson
Then TpmSecurityProvider can connect to the fTPM via FAPI

## Non-Functional Requirements

**Performance**
- TPM seal/unseal latency must be under 500 ms per operation

**Compatibility**
- Must work on ARM64 Jetson Orin Nano with JetPack 6.1+
- Must work inside Docker containers with --device mounts
- tpm2-pytss must be compatible with Python 3.11 and Cython compilation

**Reliability**
- Graceful fallback to LegacySecurityProvider on any TPM initialization failure
- No crash on /dev/tpm0 absence — clean detection and fallback

## Unit Tests

| AC Ref | What to Test | Required Outcome |
|--------|-------------|-----------------|
| AC-1 | SecurityProvider factory with /dev/tpm0 mock present | TpmSecurityProvider selected |
| AC-2 | FAPI create_seal + unseal via swtpm | Data matches round-trip |
| AC-3 | SecurityProvider factory without /dev/tpm0 | LegacySecurityProvider selected |
| AC-4 | SECURITY_PROVIDER=legacy env var with /dev/tpm0 present | LegacySecurityProvider selected |
| AC-5 | /dev/tpm0 exists but FAPI raises exception | LegacySecurityProvider selected, warning logged |

## Blackbox Tests

| AC Ref | Initial Data/Conditions | What to Test | Expected Behavior | NFR References |
|--------|------------------------|-------------|-------------------|----------------|
| AC-3 | No TPM device available | POST /load/(unknown) (split resource) | Existing binary-split behavior, all current tests pass | Compatibility |
| AC-6 | TPM simulator in Docker | Container starts with device mounts | FAPI connects, seal/unseal works | Compatibility |

## Constraints

- tpm2-pytss requires the tpm2-tss >= 2.4.0 native library in the Docker image
- Tests require swtpm (software TPM simulator) — must be added to test infrastructure
- fTPM provisioning is out of scope — this task assumes a provisioned TPM exists
- PCR-based policy binding is intentionally not used (known persistence issues on Orin Nano)

## Risks & Mitigation

**Risk 1: fTPM FAPI stability on Jetson Orin Nano**
- *Risk*: FAPI seal/unseal may have undocumented issues on Orin Nano (similar to the PCR/NV persistence bugs)
- *Mitigation*: The design intentionally avoids PCR policies and NV indexes; it uses the SRK hierarchy only. Hardware validation is required before production deployment.

**Risk 2: swtpm test fidelity**
- *Risk*: The software TPM simulator may not reproduce all fTPM behaviors
- *Mitigation*: Integration tests on actual Jetson hardware as part of acceptance testing (outside CI).

**Risk 3: tpm2-tss native library in Docker image**
- *Risk*: tpm2-tss may not be available in the python:3.11-slim base image; the ARM64 build may need compilation
- *Mitigation*: Add tpm2-tss to the Dockerfile build step; verify ARM64 compatibility early.
@@ -0,0 +1,79 @@
# Resumable Download Manager

**Task**: AZ-184_resumable_download_manager
**Name**: Resumable Download Manager
**Description**: Implement a resumable HTTP download manager for the loader that handles intermittent Starlink connectivity
**Complexity**: 3 points
**Dependencies**: None
**Component**: Loader
**Tracker**: AZ-184
**Epic**: AZ-181

## Problem

Jetsons on UAVs have intermittent Starlink connectivity. Downloads of large artifacts (AI models ~500 MB, Docker images ~1 GB) must survive connection drops and resume from where they left off.

## Outcome

- Downloads resume from the last byte received after connectivity loss
- Completed downloads are verified with SHA-256 before use
- Downloaded artifacts are decrypted with per-artifact AES-256 keys
- State persists across loader restarts

## Scope

### Included

- Resumable HTTP downloads using Range headers (S3 supports these natively)
- JSON state file on disk tracking: url, expected_sha256, expected_size, bytes_downloaded, temp_file_path
- SHA-256 verification of completed downloads
- AES-256 decryption of downloaded artifacts using the per-artifact key from the /get-update response
- Retry with exponential backoff (1 min, 5 min, 15 min, 1 hr, max 4 hr)
- State machine: pending -> downloading -> paused -> verifying -> decrypting -> complete / failed
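The resume and backoff mechanics above can be sketched with two small helpers. The state-file field names follow the spec; the function names and five-step backoff table are illustrative, matching the schedule 1 min, 5 min, 15 min, 1 hr, capped at 4 hr.

```python
import json
from pathlib import Path

# Backoff schedule from the spec: 1 min, 5 min, 15 min, 1 hr, then capped at 4 hr.
BACKOFF_SECONDS = [60, 300, 900, 3600, 14400]


def backoff_delay(attempt: int) -> int:
    """Seconds to wait before retry number `attempt` (0-based), capped at 4 hours."""
    return BACKOFF_SECONDS[min(attempt, len(BACKOFF_SECONDS) - 1)]


def resume_headers(state_path: str) -> dict:
    """Build the HTTP Range header for resuming from the persisted byte offset.

    The state file carries the fields listed in the spec: url, expected_sha256,
    expected_size, bytes_downloaded, temp_file_path.
    """
    state = json.loads(Path(state_path).read_text())
    return {"Range": f"bytes={state['bytes_downloaded']}-"}
```

A `bytes=N-` Range request asks the server for everything from offset N onward, which is why the partial temp file can simply be appended to on resume.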
### Excluded

- Update check logic (AZ-185)
- Applying updates (AZ-185)
- CDN upload (AZ-186)

## Acceptance Criteria

**AC-1: Resume after connection drop**
Given a download is 60% complete and connectivity is lost
When connectivity returns
Then the download resumes from the byte offset (60% of the file), not from scratch

**AC-2: SHA-256 mismatch triggers re-download**
Given a completed download with corrupted data
When SHA-256 verification fails
Then the partial file is deleted and the download restarts from scratch

**AC-3: Decryption produces correct output**
Given a completed and verified download
When decrypted with the per-artifact AES-256 key
Then the output matches the original unencrypted artifact

**AC-4: State survives restart**
Given a download is 40% complete and the loader container restarts
When the loader starts again
Then the download resumes from 40%, not from scratch

**AC-5: Exponential backoff on repeated failures**
Given multiple consecutive connection failures
When retrying
Then wait times follow the exponential backoff pattern

## Unit Tests

| AC Ref | What to Test | Required Outcome |
|--------|-------------|-----------------|
| AC-1 | Mock HTTP server drops connection mid-transfer | Resume with Range header from correct offset |
| AC-2 | Corrupt downloaded file | SHA-256 check fails, file deleted, retry flag set |
| AC-3 | Encrypt test file, download, decrypt | Round-trip matches original |
| AC-4 | Write state file, reload | State correctly restored |
| AC-5 | Track retry intervals | Backoff pattern matches spec |

## Constraints

- Must work inside a Docker container
- S3-compatible CDN (the current CDNManager already uses boto3)
- State file location must be on a volume that persists across container restarts
@@ -0,0 +1,76 @@
# Update Manager

**Task**: AZ-185_update_manager
**Name**: Update Manager
**Description**: Implement the loader's background update loop that checks for new versions and applies AI model and Docker image updates
**Complexity**: 5 points
**Dependencies**: AZ-183, AZ-184
**Component**: Loader
**Tracker**: AZ-185
**Epic**: AZ-181

## Problem

Jetsons need to automatically discover and install new AI models and Docker images without manual intervention. The update loop must handle version detection, server communication, and applying different update types.

## Outcome

- Loader automatically checks for updates every 5 minutes
- New AI models are downloaded, decrypted, and placed in the model directory
- New Docker images are loaded and services restarted with minimal downtime
- Loader can update itself (self-update, applied last)

## Scope

### Included

- Version collector: scan the model directory for .trt files (extracting the date from the filename), query docker images for azaion/* tags, cache results
- Background loop (configurable interval, default 5 min): collect versions, call POST /get-update, trigger downloads
- Apply AI model: move the decrypted .trt into the model directory (the detection API scans and picks the newest)
- Apply Docker image: docker load -i, docker compose up -d {service}
- Self-update: the loader updates itself last via docker compose up -d loader
- Integration with the AZ-184 Resumable Download Manager for all downloads
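The version collector's filename convention can be sketched as follows. The `azaion-YYYY-MM-DD.trt` pattern is taken from the AC-1 example; the regex and function name are illustrative, and a real collector would also query Docker for azaion/* image tags.

```python
import re
from pathlib import Path

# AC-1 example: a file azaion-2026-03-10.trt reports version "2026-03-10".
_TRT_VERSION = re.compile(r"azaion-(\d{4}-\d{2}-\d{2})\.trt$")


def collect_model_versions(model_dir: str) -> list:
    """Scan the model directory for .trt files and report the newest
    detection model version (ISO dates sort correctly as strings)."""
    versions = []
    for f in Path(model_dir).glob("*.trt"):
        m = _TRT_VERSION.search(f.name)
        if m:
            versions.append(m.group(1))
    if not versions:
        return []
    return [{"resource_name": "detection_model", "version": max(versions)}]
```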
### Excluded

- Server-side /get-update endpoint (AZ-183)
- Download mechanics (AZ-184)
- CI/CD publish pipeline (AZ-186)
- Device provisioning (AZ-187)

## Acceptance Criteria

**AC-1: Version collector reads local state**
Given AI model azaion-2026-03-10.trt in the model directory and Docker image azaion/annotations:arm64_2026-03-01 loaded
When the version collector runs
Then it reports [{resource_name: "detection_model", version: "2026-03-10"}, {resource_name: "annotations", version: "arm64_2026-03-01"}]

**AC-2: Background loop polls on schedule**
Given the loader is running with the update interval set to 5 minutes
When 5 minutes elapse
Then POST /get-update is called with the current versions

**AC-3: AI model update applied**
Given /get-update returns a new detection_model version
When download and decryption complete
Then the new .trt file is in the model directory

**AC-4: Docker image update applied**
Given /get-update returns a new annotations version
When download and decryption complete
Then docker load succeeds and docker compose up -d annotations restarts the service

**AC-5: Self-update applied last**
Given /get-update returns updates for both annotations and loader
When applying updates
Then annotations is updated first and loader is updated last

**AC-6: Cached versions refresh after changes**
Given the version collector cached its results
When a new model file appears in the directory or docker load completes
Then the cache is invalidated and the next collection reflects the new state

## Constraints

- The Docker socket must be mounted in the loader container (already the case)
- The docker compose file path must be configurable (env var)
- The model directory path must be configurable (env var)
- Self-update must be robust: a state file on disk ensures in-progress updates survive container restart
@@ -0,0 +1,67 @@
# CI/CD Artifact Publish

**Task**: AZ-186_cicd_artifact_publish
**Name**: CI/CD Artifact Publish
**Description**: Add an encrypt-and-publish step to the Woodpecker CI/CD pipeline and create a shared publish script usable by both CI/CD and the training service
**Complexity**: 3 points
**Dependencies**: AZ-183
**Component**: DevOps
**Tracker**: AZ-186
**Epic**: AZ-181

## Problem

Both CI/CD (for Docker images) and the training service (for AI models) need to encrypt artifacts and publish them to the CDN + Resources table. The encryption and publish logic should be shared.

## Outcome

- Shared Python publish script that any producer can call
- Woodpecker pipeline automatically publishes encrypted Docker archives after build
- Training service can publish AI models using the same script
- Every artifact gets its own random AES-256 key

## Scope

### Included

- Shared publish script (Python): generate a random AES-256 key, compress (gzip), encrypt (AES-256), SHA-256 hash, upload to S3, write a Resources row
- Woodpecker pipeline step in build-arm.yml: after docker build+push, also docker save -> publish script
- S3 bucket structure: {dev_stage}/{resource_name}-{architecture}-{version}.enc
- Documentation for training service integration

### Excluded

- Server-side Resources table (AZ-183, must exist first)
- Loader-side download/decrypt (AZ-184)
- Training service code changes (their team integrates the script)

## Acceptance Criteria

**AC-1: Publish script works end-to-end**
Given a local file (Docker archive or AI model)
When the publish script is called with resource_name, dev_stage, architecture, version
Then the file is compressed, encrypted with a random key, uploaded to S3, and a Resources row is written

**AC-2: Woodpecker publishes after build**
Given a push to the dev/stage/main branch
When the Woodpecker build completes
Then the Docker image is also published as an encrypted archive to the CDN with a Resources row

**AC-3: Unique key per artifact**
Given two consecutive publishes of the same resource
When comparing encryption keys
Then each publish used a different random AES-256 key

**AC-4: SHA-256 consistency**
Given a published artifact
When the SHA-256 of the uploaded S3 object is computed
Then it matches the sha256 value in the Resources table

**AC-5: Training service can use the script**
Given the publish script installed as a package or available as a standalone script
When the training service calls it after producing a .trt model
Then the model is published to the CDN + Resources table

## Constraints

- The Woodpecker runner has access to the Docker socket and S3 credentials
- The publish script must work on both x86 (CI runner) and arm64 (the training server if needed)
- S3 credentials and the DB connection string are passed via environment variables
@@ -0,0 +1,62 @@
# Device Provisioning Script

**Task**: AZ-187_device_provisioning_script
**Name**: Device Provisioning Script
**Description**: Create a shell script that provisions a Jetson device identity (CompanionPC user) during the fuse/flash pipeline
**Complexity**: 2 points
**Dependencies**: None
**Component**: DevOps
**Tracker**: AZ-187
**Epic**: AZ-181

## Problem

Each Jetson needs a unique CompanionPC user account for API authentication. This must be automated as part of the manufacturing/flash process so that provisioning 50+ devices is not manual.

## Outcome

- A single script creates the device identity and embeds credentials in the rootfs
- Integrates into the fuse/flash pipeline between odmfuse.sh and flash.sh
- A provisioning runbook documents the full end-to-end flow

## Scope

### Included

- provision_device.sh: generate the device email (azaion-jetson-{serial}@azaion.com) and a random 32-char password
- Call admin API POST /users to create a Users row with Role=CompanionPC
- Write a credentials config file into the rootfs image (at a known path, e.g., /etc/azaion/device.conf)
- Idempotency: re-running for the same serial doesn't create a duplicate user
- Provisioning runbook: step-by-step from unboxing through fusing, flashing, and first boot
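The identity-generation step can be sketched as below. The email pattern follows the Included list (azaion-jetson-{serial}@azaion.com); note that AC-1 shows a shortened serial in the address, so the real script may normalize the serial first. The password-generation approach is an assumption; any CSPRNG producing 32 characters satisfies the requirement, and the actual deliverable is a shell script, so this Python form is purely illustrative.

```python
import secrets
import string

_ALPHABET = string.ascii_letters + string.digits


def device_identity(serial: str) -> dict:
    """Generate the CompanionPC identity for one Jetson: a deterministic
    email derived from the serial and a random 32-character password.

    Illustrative sketch; the serial is used verbatim here, while the real
    provision_device.sh may normalize it (compare AC-1).
    """
    return {
        "email": f"azaion-jetson-{serial}@azaion.com",
        "password": "".join(secrets.choice(_ALPHABET) for _ in range(32)),
    }
```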
### Excluded

- fTPM provisioning (covered by NVIDIA's ftpm_provisioning.sh)
- Secure Boot fusing (covered by solution_draft02 Phase 1-2)
- OS hardening (covered by solution_draft02 Phase 3)
- Admin API user creation endpoint (assumed to exist)

## Acceptance Criteria

**AC-1: Script creates CompanionPC user**
Given a new device serial AZJN-0042
When provision_device.sh is run with serial AZJN-0042
Then the admin API has a new user azaion-jetson-0042@azaion.com with Role=CompanionPC

**AC-2: Credentials written to rootfs**
Given provision_device.sh completed successfully
When the rootfs image is inspected
Then /etc/azaion/device.conf contains the email and password

**AC-3: Device can log in after flash**
Given a provisioned and flashed device boots for the first time
When the loader reads /etc/azaion/device.conf and calls POST /login
Then a valid JWT is returned

**AC-4: Idempotent re-run**
Given provision_device.sh was already run for serial AZJN-0042
When it is run again for the same serial
Then no duplicate user is created (the existing user is reused or updated)

**AC-5: Runbook complete**
Given the provisioning runbook
When followed step-by-step on a new Jetson Orin Nano
Then the device is fully fused, flashed, provisioned, and able to communicate with the admin API
@@ -1,18 +1,24 @@
|
||||
# Batch Report
|
||||
|
||||
**Batch**: 1
|
||||
**Tasks**: 01_test_infrastructure
|
||||
**Date**: 2026-04-13
|
||||
**Tasks**: AZ-182, AZ-184, AZ-187
|
||||
**Date**: 2026-04-15
|
||||
|
||||
## Task Results
|
||||
|
||||
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|
||||
|------|--------|---------------|-------|-------------|--------|
|
||||
| 01_test_infrastructure | Done | 12 files | 1/1 pass | 5/5 ACs (AC-1,2,3 require Docker) | None |
|
||||
| AZ-182_tpm_security_provider | Done | 8 files | 8 pass (1 skip without swtpm) | 6/6 ACs covered | None |
|
||||
| AZ-184_resumable_download_manager | Done | 2 files | 8 pass | 5/5 ACs covered | None |
|
||||
| AZ-187_device_provisioning_script | Done | 3 files | 5 pass | 5/5 ACs covered | None |
|
||||
|
||||
## AC Test Coverage: 5/5 covered (3 require Docker environment)
|
||||
## Code Review Verdict: PASS (infrastructure scaffold, no logic review needed)
|
||||
## Excluded
|
||||
|
||||
AZ-183 (Resources Table & Update API) — admin API repo, not this workspace.
|
||||
|
||||
## AC Test Coverage: All covered (16/16)
|
||||
## Code Review Verdict: PASS_WITH_WARNINGS
|
||||
## Auto-Fix Attempts: 0
|
||||
## Stuck Agents: None
|
||||
|
||||
## Next Batch: 02_test_health_auth
|
||||
## Next Batch: AZ-185, AZ-186 (Batch 2 — 8 points)
|
||||
|
||||
@@ -1,28 +1,19 @@
|
||||
# Batch Report
|
||||
|
||||
**Batch**: 2
|
||||
**Tasks**: 02_test_health_auth
|
||||
**Date**: 2026-04-13
|
||||
**Tasks**: AZ-185, AZ-186
|
||||
**Date**: 2026-04-15
|
||||
|
||||
## Task Results
|
||||
|
||||
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|
||||
|------|--------|---------------|-------|-------------|--------|
|
||||
| 02_test_health_auth | Done | 2 files | 6 tests | 5/5 ACs covered | None |
|
||||
| AZ-185_update_manager | Done | 4 files | 10 pass | 6/6 ACs covered | None |
|
||||
| AZ-186_cicd_artifact_publish | Done | 3 files | 8 pass | 5/5 ACs covered | None |
|
||||
|
||||
## AC Test Coverage: All covered
|
||||
|
||||
| AC | Test | Status |
|
||||
|----|------|--------|
|
||||
| AC-1: Health returns 200 | test_health_returns_200 | Covered |
|
||||
| AC-2: Status unauthenticated | test_status_unauthenticated | Covered |
|
||||
| AC-3: Login valid | test_login_valid_credentials | Covered |
|
||||
| AC-4: Login invalid | test_login_invalid_credentials | Covered |
|
||||
| AC-5: Login empty body | test_login_empty_body | Covered |
|
||||
| AC-2+3: Status authenticated | test_status_authenticated_after_login | Covered |
|
||||
|
||||
## Code Review Verdict: PASS
|
||||
## AC Test Coverage: All covered (11/11)
|
||||
## Code Review Verdict: PASS_WITH_WARNINGS
|
||||
## Auto-Fix Attempts: 0
|
||||
## Stuck Agents: None
|
||||
|
||||
## Next Batch: 03_test_resources, 04_test_unlock, 05_test_resilience_perf (parallel)
|
||||
## Next Batch: All tasks complete
|
||||
|
||||
@@ -0,0 +1,41 @@
# Code Review Report

**Batch**: 2 (AZ-185, AZ-186)
**Date**: 2026-04-15
**Verdict**: PASS_WITH_WARNINGS

## Spec Compliance

All 11 acceptance criteria across the 2 tasks are satisfied with corresponding tests.

| Task | ACs | Covered | Status |
|------|-----|---------|--------|
| AZ-185 Update Manager | 6 | 6/6 | All pass (10 tests) |
| AZ-186 CI/CD Artifact Publish | 5 | 5/5 | All pass (8 tests) |

## Findings

| # | Severity | Category | File:Line | Title |
|---|----------|----------|-----------|-------|
| 1 | Low | Style | scripts/publish_artifact.py | Union syntax fixed |
| 2 | Low | Maintainability | src/main.py:15 | Deprecated on_event startup |

### Finding Details

**F1: Union syntax fixed** (Low / Style)
- Location: `scripts/publish_artifact.py:172,182`
- Description: Used `list[str] | None` syntax; fixed to `Optional[List[str]]` for consistency
- Status: Fixed

**F2: Deprecated on_event startup** (Low / Maintainability)
- Location: `src/main.py:15`
- Description: `@app.on_event("startup")` is deprecated in modern FastAPI in favor of the lifespan context manager
- Suggestion: Migrate to an `@asynccontextmanager` lifespan when upgrading FastAPI — not blocking
- Task: AZ-185

## Cross-Task Consistency

- AZ-185 uses AZ-184's `ResumableDownloadManager` correctly via its public API
- AZ-186's encrypt format (IV + AES-CBC + PKCS7) is compatible with AZ-184's `decrypt_cbc_file()`
- AZ-186's `encryption_key` is hex-encoded; AZ-185's `_aes_key_from_encryption_field` handles hex decoding
- Self-update ordering (loader last) is correctly implemented in AZ-185
@@ -2,8 +2,9 @@

## Current Step
flow: existing-code
step: 7
name: Refactor
status: done
sub_step: 7 — Phase 7 Documentation (complete)
step: 9
name: Implement
status: in_progress
sub_step: 6 — Launch Batch 2 implementers
retry_count: 0
current_task: Batch 2 (AZ-185, AZ-186)
@@ -0,0 +1,199 @@
import argparse
import gzip
import hashlib
import logging
import os
import secrets
import shutil
import sys
import tempfile
from typing import Any, Dict, List, Optional

import boto3
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

logger = logging.getLogger(__name__)

_DEFAULT_PUBLISH_PATH = "/internal/resources/publish"


def _require_env(name: str) -> str:
    value = os.environ.get(name)
    if not value:
        raise ValueError(f"missing required environment variable: {name}")
    return value


def object_key(dev_stage: str, resource_name: str, architecture: str, version: str) -> str:
    return f"{dev_stage}/{resource_name}-{architecture}-{version}.enc"


def build_cdn_url(endpoint: str, bucket: str, key: str) -> str:
    public_base = os.environ.get("CDN_PUBLIC_BASE_URL")
    if public_base:
        return f"{public_base.rstrip('/')}/{key}"
    return f"{endpoint.rstrip('/')}/{bucket}/{key}"


def gzip_file(source_path: str, destination_path: str) -> None:
    with open(source_path, "rb") as src, gzip.open(
        destination_path, "wb", compresslevel=9
    ) as dst:
        shutil.copyfileobj(src, dst, length=1024 * 1024)


def encrypt_aes256_cbc_file(plaintext_path: str, ciphertext_path: str, aes_key: bytes) -> None:
    if len(aes_key) != 32:
        raise ValueError("aes key must be 32 bytes")
    iv = os.urandom(16)
    cipher = Cipher(
        algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend()
    )
    encryptor = cipher.encryptor()
    padder = padding.PKCS7(128).padder()
    with open(ciphertext_path, "wb") as out:
        out.write(iv)
        with open(plaintext_path, "rb") as inp:
            while True:
                chunk = inp.read(1024 * 1024)
                if not chunk:
                    break
                padded = padder.update(chunk)
                if padded:
                    out.write(encryptor.update(padded))
        tail = padder.finalize()
        if tail:
            out.write(encryptor.update(tail))
        out.write(encryptor.finalize())


def sha256_file(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while True:
            block = f.read(1024 * 1024)
            if not block:
                break
            h.update(block)
    return h.hexdigest().lower()


def upload_s3_file(
    endpoint: str,
    access_key: str,
    secret_key: str,
    bucket: str,
    key: str,
    file_path: str,
) -> None:
    client = boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    with open(file_path, "rb") as body:
        client.upload_fileobj(body, bucket, key)


def register_resource(
    admin_base_url: str,
    token: str,
    payload: Dict[str, Any],
) -> None:
    path = os.environ.get("ADMIN_API_PUBLISH_PATH", _DEFAULT_PUBLISH_PATH).lstrip("/")
    base = admin_base_url.rstrip("/")
    url = f"{base}/{path}"
    resp = requests.post(
        url,
        headers={"Authorization": f"Bearer {token}"},
        json=payload,
        timeout=120,
    )
    resp.raise_for_status()


def publish(
    file_path: str,
    resource_name: str,
    dev_stage: str,
    architecture: str,
    version: str,
) -> Dict[str, Any]:
    endpoint = _require_env("S3_ENDPOINT")
    access_key = _require_env("S3_ACCESS_KEY")
    secret_key = _require_env("S3_SECRET_KEY")
    bucket = _require_env("S3_BUCKET")
    admin_url = _require_env("ADMIN_API_URL")
    admin_token = _require_env("ADMIN_API_TOKEN")

    key = object_key(dev_stage, resource_name, architecture, version)
    aes_key = secrets.token_bytes(32)
    encryption_key_hex = aes_key.hex()
|
||||
|
||||
gz_path = tempfile.NamedTemporaryFile(delete=False, suffix=".gz").name
|
||||
enc_path = tempfile.NamedTemporaryFile(delete=False, suffix=".enc").name
|
||||
try:
|
||||
gzip_file(file_path, gz_path)
|
||||
encrypt_aes256_cbc_file(gz_path, enc_path, aes_key)
|
||||
digest = sha256_file(enc_path)
|
||||
size_bytes = os.path.getsize(enc_path)
|
||||
upload_s3_file(endpoint, access_key, secret_key, bucket, key, enc_path)
|
||||
cdn_url = build_cdn_url(endpoint, bucket, key)
|
||||
body = {
|
||||
"resource_name": resource_name,
|
||||
"dev_stage": dev_stage,
|
||||
"architecture": architecture,
|
||||
"version": version,
|
||||
"cdn_url": cdn_url,
|
||||
"sha256": digest,
|
||||
"encryption_key": encryption_key_hex,
|
||||
"size_bytes": size_bytes,
|
||||
}
|
||||
register_resource(admin_url, admin_token, body)
|
||||
return {
|
||||
"object_key": key,
|
||||
"cdn_url": cdn_url,
|
||||
"sha256": digest,
|
||||
"encryption_key_hex": encryption_key_hex,
|
||||
"size_bytes": size_bytes,
|
||||
}
|
||||
finally:
|
||||
for p in (gz_path, enc_path):
|
||||
try:
|
||||
os.unlink(p)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def parse_args(argv: List[str]) -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description="Compress, encrypt, upload artifact and register resource")
|
||||
p.add_argument("--file", required=True, help="Path to file to publish")
|
||||
p.add_argument("--resource-name", required=True)
|
||||
p.add_argument("--dev-stage", required=True)
|
||||
p.add_argument("--architecture", required=True)
|
||||
p.add_argument("--version", required=True)
|
||||
return p.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv: Optional[List[str]] = None) -> int:
|
||||
args = parse_args(argv if argv is not None else sys.argv[1:])
|
||||
try:
|
||||
publish(
|
||||
args.file,
|
||||
args.resource_name,
|
||||
args.dev_stage,
|
||||
args.architecture,
|
||||
args.version,
|
||||
)
|
||||
return 0
|
||||
except Exception:
|
||||
logger.exception("publish failed")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
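For consumers of these artifacts, the inverse transform is the mirror image of `encrypt_aes256_cbc_file`: read the 16-byte IV prefix, AES-256-CBC-decrypt, strip the PKCS7 padding, then gunzip. The in-memory `encrypt_blob`/`decrypt_blob` helpers below are an illustrative sketch of that layout, not functions from this repository:

```python
# Sketch of the artifact layout produced by publish_artifact.py:
# [16-byte IV] + AES-256-CBC(PKCS7-padded gzip stream).
# encrypt_blob/decrypt_blob are hypothetical in-memory stand-ins.
import gzip
import os

from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


def encrypt_blob(plaintext: bytes, aes_key: bytes) -> bytes:
    iv = os.urandom(16)
    padder = padding.PKCS7(128).padder()
    padded = padder.update(plaintext) + padder.finalize()
    enc = Cipher(algorithms.AES(aes_key), modes.CBC(iv)).encryptor()
    # IV is prepended so the consumer needs only the key.
    return iv + enc.update(padded) + enc.finalize()


def decrypt_blob(blob: bytes, aes_key: bytes) -> bytes:
    iv, ciphertext = blob[:16], blob[16:]
    dec = Cipher(algorithms.AES(aes_key), modes.CBC(iv)).decryptor()
    padded = dec.update(ciphertext) + dec.finalize()
    unpadder = padding.PKCS7(128).unpadder()
    return unpadder.update(padded) + unpadder.finalize()


key = os.urandom(32)
blob = encrypt_blob(gzip.compress(b"artifact-bytes"), key)
restored = gzip.decompress(decrypt_blob(blob, key))
```

A decrypted blob is still a gzip stream, so `gzip.decompress` (or the repo's `decrypt_cbc_file` plus a gunzip step) recovers the original artifact bytes.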
@@ -11,6 +11,15 @@ from security_provider import create_security_provider

app = FastAPI(title="Azaion.Loader")


@app.on_event("startup")
def _startup_update_manager():
    try:
        from update_manager import maybe_start_update_background
    except Exception:
        return
    maybe_start_update_background(get_api_client, RESOURCE_API_URL)

security_provider = create_security_provider()

RESOURCE_API_URL = os.environ.get("RESOURCE_API_URL", "https://api.azaion.com")

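The startup hook above tolerates a missing `update_manager` module via a guarded import, so the app still boots when the optional updater is absent. A minimal sketch of that pattern (the module name below is a deliberately nonexistent placeholder):

```python
# Guarded-import pattern from the startup hook: if the optional module cannot
# be imported, the feature is skipped instead of crashing application startup.
# "definitely_missing_module" is a placeholder, not a real dependency.
def start_optional_feature() -> str:
    try:
        import definitely_missing_module  # noqa: F401
    except Exception:
        return "skipped"
    return "started"


result = start_optional_feature()
```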
@@ -0,0 +1,266 @@
import hashlib
import json
import os
import subprocess
import threading
from typing import Any, Callable, Dict, List, Optional

import requests
from loguru import logger

from download_manager import ResumableDownloadManager
from version_collector import VersionCollector


def _aes_key_from_encryption_field(encryption_key: Any) -> bytes:
    if isinstance(encryption_key, bytes):
        if len(encryption_key) == 32:
            return encryption_key
        raise ValueError("invalid encryption key")
    s = str(encryption_key).strip()
    if len(s) == 64 and all(c in "0123456789abcdefABCDEF" for c in s):
        return bytes.fromhex(s)
    return hashlib.sha256(s.encode("utf-8")).digest()


def _sort_services_loader_last(services: List[str]) -> List[str]:
    head = sorted(s for s in services if s != "loader")
    tail = [s for s in services if s == "loader"]
    return head + tail


def _sort_updates_loader_last(updates: List[dict]) -> List[dict]:
    rest = [u for u in updates if u.get("resourceName") != "loader"]
    rest.sort(key=lambda u: str(u.get("resourceName", "")))
    loader = [u for u in updates if u.get("resourceName") == "loader"]
    return rest + loader


class UpdateManager:
    def __init__(
        self,
        api_url: str,
        get_token: Callable[[], Optional[str]],
        download_manager: ResumableDownloadManager,
        version_collector: VersionCollector,
        compose_file: str,
        model_dir: str,
        state_path: str,
        interval_seconds: float = 300.0,
        *,
        subprocess_run: Optional[Callable] = None,
        post_get_update: Optional[Callable[..., Any]] = None,
        head_content_length: Optional[Callable[..., int]] = None,
        stop_event: Optional[threading.Event] = None,
        wait_fn: Optional[Callable[[float], bool]] = None,
    ) -> None:
        self._api_url = api_url.rstrip("/")
        self._get_token = get_token
        self._download_manager = download_manager
        self._version_collector = version_collector
        self._compose_file = compose_file
        self._model_dir = model_dir
        self._state_path = state_path
        self._interval = interval_seconds
        self._subprocess_run = subprocess_run or subprocess.run
        self._post_get_update = post_get_update or self._default_post_get_update
        self._head_content_length = head_content_length or self._default_head_content_length
        self._stop_event = stop_event or threading.Event()
        self._wait_fn = wait_fn

    def _default_post_get_update(self, token: str, body: dict) -> Any:
        url = f"{self._api_url}/get-update"
        resp = requests.post(
            url,
            json=body,
            headers={"Authorization": f"Bearer {token}"},
            timeout=120,
        )
        resp.raise_for_status()
        return resp.json()

    def _default_head_content_length(self, url: str, token: str) -> int:
        headers = {}
        if token:
            headers["Authorization"] = f"Bearer {token}"
        resp = requests.head(url, headers=headers, allow_redirects=True, timeout=120)
        resp.raise_for_status()
        cl = resp.headers.get("Content-Length")
        if not cl:
            raise ValueError("missing Content-Length")
        return int(cl)

    def _load_state(self) -> dict:
        if not os.path.isfile(self._state_path):
            return {"pending_compose": []}
        with open(self._state_path, encoding="utf-8") as f:
            data = json.load(f)
        if "pending_compose" not in data:
            data["pending_compose"] = []
        return data

    def _save_state(self, data: dict) -> None:
        directory = os.path.dirname(self._state_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        tmp = self._state_path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, sort_keys=True)
        os.replace(tmp, self._state_path)

    def _drain_pending_compose(self) -> None:
        state = self._load_state()
        pending = list(dict.fromkeys(state.get("pending_compose") or []))
        if not pending:
            return
        for svc in _sort_services_loader_last(pending):
            self._subprocess_run(
                ["docker", "compose", "-f", self._compose_file, "up", "-d", svc],
                check=True,
            )
        state["pending_compose"] = []
        self._save_state(state)

    def _current_versions_payload(self) -> Dict[str, str]:
        rows = self._version_collector.collect()
        return {r.resource_name: r.version for r in rows}

    def _build_get_update_body(self) -> dict:
        return {
            "dev_stage": os.environ.get("LOADER_DEV_STAGE", ""),
            "architecture": os.environ.get("LOADER_ARCH", ""),
            "current_versions": self._current_versions_payload(),
        }

    def _artifact_size(self, url: str, token: str) -> int:
        return self._head_content_length(url, token)

    def _apply_model(self, item: dict, token: str) -> None:
        name = str(item["resourceName"])
        version = str(item["version"])
        url = str(item["cdnUrl"])
        sha256 = str(item["sha256"])
        key = _aes_key_from_encryption_field(item["encryptionKey"])
        size = self._artifact_size(url, token)
        job_id = f"update-{name}-{version}"
        os.makedirs(self._model_dir, exist_ok=True)
        out_path = os.path.join(self._model_dir, f"azaion-{version}.trt")
        self._download_manager.fetch_decrypt_verify(
            job_id,
            url,
            sha256,
            size,
            key,
            out_path,
        )
        self._version_collector.invalidate()

    def _mark_pending_compose(self, service: str) -> None:
        state = self._load_state()
        pending = list(state.get("pending_compose") or [])
        if service not in pending:
            pending.append(service)
        state["pending_compose"] = pending
        self._save_state(state)

    def _clear_pending_compose(self, service: str) -> None:
        state = self._load_state()
        pending = [s for s in (state.get("pending_compose") or []) if s != service]
        state["pending_compose"] = pending
        self._save_state(state)

    def _apply_docker_image(self, item: dict, token: str) -> None:
        name = str(item["resourceName"])
        version = str(item["version"])
        url = str(item["cdnUrl"])
        sha256 = str(item["sha256"])
        key = _aes_key_from_encryption_field(item["encryptionKey"])
        size = self._artifact_size(url, token)
        job_id = f"update-{name}-{version}"
        artifact_dir = os.path.dirname(self._state_path)
        os.makedirs(artifact_dir, exist_ok=True)
        out_tar = os.path.join(artifact_dir, f"{job_id}.plaintext.tar")
        self._download_manager.fetch_decrypt_verify(
            job_id,
            url,
            sha256,
            size,
            key,
            out_tar,
        )
        self._subprocess_run(["docker", "load", "-i", out_tar], check=True)
        self._version_collector.invalidate()
        self._mark_pending_compose(name)
        self._subprocess_run(
            ["docker", "compose", "-f", self._compose_file, "up", "-d", name],
            check=True,
        )
        self._clear_pending_compose(name)

    def _tick_once(self) -> None:
        token = self._get_token()
        if not token:
            return
        self._drain_pending_compose()
        body = self._build_get_update_body()
        updates = self._post_get_update(token, body)
        if not isinstance(updates, list):
            return
        for item in _sort_updates_loader_last(updates):
            rname = str(item.get("resourceName", ""))
            if rname == "detection_model":
                self._apply_model(item, token)
            else:
                self._apply_docker_image(item, token)

    def run_forever(self) -> None:
        while not self._stop_event.is_set():
            try:
                self._drain_pending_compose()
                self._tick_once()
            except Exception as exc:
                logger.exception("update manager tick failed: {}", exc)
            if self._wait_fn is not None:
                if self._wait_fn(self._interval):
                    break
            elif self._stop_event.wait(self._interval):
                break


def maybe_start_update_background(
    get_api_client: Callable[[], Any],
    api_url: str,
) -> None:
    state_dir = os.environ.get("LOADER_DOWNLOAD_STATE_DIR")
    if not state_dir:
        return
    model_dir = os.environ.get("LOADER_MODEL_DIR", "models")
    compose_file = os.environ.get("LOADER_COMPOSE_FILE", "docker-compose.yml")
    interval = float(os.environ.get("LOADER_UPDATE_INTERVAL_SEC", "300"))
    orchestrator_path = os.environ.get(
        "LOADER_UPDATE_STATE_PATH",
        os.path.join(state_dir, "update_orchestrator.json"),
    )

    def token_getter() -> Optional[str]:
        client = get_api_client()
        return getattr(client, "token", None)

    try:
        dm = ResumableDownloadManager(state_dir)
        vc = VersionCollector(model_dir)
        um = UpdateManager(
            api_url,
            token_getter,
            dm,
            vc,
            compose_file,
            model_dir,
            orchestrator_path,
            interval_seconds=interval,
        )
    except Exception as exc:
        logger.exception("update manager failed to start: {}", exc)
        return

    threading.Thread(target=um.run_forever, name="loader-updates", daemon=True).start()

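The ordering helpers in update_manager.py ensure the loader applies its own update only after every other resource, so restarting the loader container cannot interrupt pending work. A self-contained sketch of that "loader last" ordering (the sample payload is made up for illustration):

```python
# Sketch of the update ordering used by UpdateManager: non-loader resources
# are applied in alphabetical order, and "loader" is always moved to the end.
from typing import List


def sort_updates_loader_last(updates: List[dict]) -> List[dict]:
    rest = sorted(
        (u for u in updates if u.get("resourceName") != "loader"),
        key=lambda u: str(u.get("resourceName", "")),
    )
    loader = [u for u in updates if u.get("resourceName") == "loader"]
    return rest + loader


order = [
    u["resourceName"]
    for u in sort_updates_loader_last(
        [
            {"resourceName": "loader"},
            {"resourceName": "detection_model"},
            {"resourceName": "annotations"},
        ]
    )
]
```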
@@ -0,0 +1,91 @@
import os
import re
import subprocess
from dataclasses import asdict, dataclass
from typing import Callable, List, Optional

TRT_DATE_PATTERN = re.compile(r"^azaion-(\d{4}-\d{2}-\d{2})\.trt$", re.IGNORECASE)


@dataclass(frozen=True)
class ResourceVersion:
    resource_name: str
    version: str


class VersionCollector:
    def __init__(
        self,
        model_dir: str,
        *,
        subprocess_run: Optional[Callable] = None,
    ) -> None:
        self._model_dir = model_dir
        self._subprocess_run = subprocess_run or subprocess.run
        self._cache: Optional[List[ResourceVersion]] = None

    def invalidate(self) -> None:
        self._cache = None

    def collect(self) -> List[ResourceVersion]:
        if self._cache is not None:
            return list(self._cache)
        rows = self._collect_uncached()
        self._cache = rows
        return list(rows)

    def collect_as_dicts(self) -> List[dict]:
        return [asdict(r) for r in self.collect()]

    def _collect_uncached(self) -> List[ResourceVersion]:
        out: List[ResourceVersion] = []
        mv = self._best_trt_version()
        if mv is not None:
            out.append(ResourceVersion("detection_model", mv))
        out.extend(self._docker_versions())
        rest = [r for r in out if r.resource_name != "detection_model"]
        rest.sort(key=lambda r: r.resource_name)
        if mv is not None:
            return [ResourceVersion("detection_model", mv)] + rest
        return rest

    def _best_trt_version(self) -> Optional[str]:
        if not os.path.isdir(self._model_dir):
            return None
        best: Optional[str] = None
        for name in os.listdir(self._model_dir):
            m = TRT_DATE_PATTERN.match(name)
            if not m:
                continue
            v = m.group(1)
            if best is None or v > best:
                best = v
        return best

    def _docker_versions(self) -> List[ResourceVersion]:
        try:
            result = self._subprocess_run(
                ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"],
                capture_output=True,
                text=True,
                check=True,
            )
        except (OSError, subprocess.CalledProcessError):
            return []
        found: List[ResourceVersion] = []
        for line in result.stdout.splitlines():
            line = line.strip()
            if not line or ":<none>" in line:
                continue
            if not line.startswith("azaion/"):
                continue
            if ":" not in line:
                continue
            repo, tag = line.rsplit(":", 1)
            if tag in ("<none>", ""):
                continue
            parts = repo.split("/", 1)
            if len(parts) < 2:
                continue
            found.append(ResourceVersion(parts[1], tag))
        return found

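The model-version comparison in version_collector.py works because `azaion-YYYY-MM-DD.trt` names carry ISO-8601 dates, which sort correctly as plain strings. A small sketch of that selection logic over hypothetical filenames:

```python
# Sketch of VersionCollector's newest-model selection: ISO dates compare
# lexicographically, so a plain max over the matched groups picks the latest.
import re

TRT_DATE_PATTERN = re.compile(r"^azaion-(\d{4}-\d{2}-\d{2})\.trt$", re.IGNORECASE)


def best_trt_version(filenames):
    dates = [m.group(1) for m in map(TRT_DATE_PATTERN.match, filenames) if m]
    return max(dates, default=None)


best = best_trt_version(
    ["azaion-2026-04-13.trt", "azaion-2026-04-20.trt", "notes.txt"]
)
```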
@@ -0,0 +1,331 @@
import gzip
import hashlib
import importlib.util
import io
import os
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch

import yaml

from download_manager import decrypt_cbc_file

_ROOT = Path(__file__).resolve().parents[1]
_SCRIPT = _ROOT / "scripts" / "publish_artifact.py"
_WOODPECKER = _ROOT / ".woodpecker" / "build-arm.yml"


def _load_publish():
    spec = importlib.util.spec_from_file_location("publish_artifact", _SCRIPT)
    mod = importlib.util.module_from_spec(spec)
    assert spec.loader is not None
    spec.loader.exec_module(mod)
    return mod


def _s3_client_factory(storage):
    def client(service_name, **kwargs):
        if service_name != "s3":
            raise AssertionError(service_name)
        m = MagicMock()

        def upload_fileobj(body, bucket, key):
            storage.setdefault(bucket, {})[key] = body.read()

        m.upload_fileobj.side_effect = upload_fileobj

        def get_object(Bucket=None, Key=None):
            return {"Body": io.BytesIO(storage[Bucket][Key])}

        m.get_object.side_effect = get_object
        return m

    return client


class TestPublishArtifact(unittest.TestCase):
    def setUp(self):
        self._env_patch = None

    def tearDown(self):
        if self._env_patch:
            self._env_patch.stop()

    def _base_env(self):
        return {
            "S3_ENDPOINT": "https://s3.example.test",
            "S3_ACCESS_KEY": "ak",
            "S3_SECRET_KEY": "sk",
            "S3_BUCKET": "test-bucket",
            "ADMIN_API_URL": "https://admin.example.test",
            "ADMIN_API_TOKEN": "token",
        }

    def test_ac1_end_to_end_publish(self):
        # Arrange
        mod = _load_publish()
        env = self._base_env()
        self._env_patch = patch.dict(os.environ, env, clear=False)
        self._env_patch.start()
        captured = {}
        storage = {}

        def fake_post(url, headers=None, json=None, timeout=None):
            class R:
                status_code = 200

                def raise_for_status(self):
                    pass

            captured["url"] = url
            captured["body"] = json
            return R()

        fd, src = tempfile.mkstemp()
        os.close(fd)
        try:
            with open(src, "wb") as f:
                f.write(b"artifact-bytes")
            with patch.object(
                mod.boto3, "client", side_effect=_s3_client_factory(storage)
            ), patch.object(mod.requests, "post", side_effect=fake_post):
                # Act
                out = mod.publish(
                    src,
                    "loader",
                    "dev",
                    "arm64",
                    "v1",
                )
                # Assert
                self.assertEqual(
                    out["object_key"],
                    "dev/loader-arm64-v1.enc",
                )
                key = out["object_key"]
                body = storage["test-bucket"][key]
                h = hashlib.sha256(body).hexdigest().lower()
                self.assertEqual(h, out["sha256"])
                self.assertEqual(captured["body"]["sha256"], out["sha256"])
                self.assertEqual(captured["body"]["size_bytes"], len(body))
                self.assertEqual(captured["body"]["encryption_key"], out["encryption_key_hex"])
                self.assertEqual(captured["body"]["cdn_url"], out["cdn_url"])
        finally:
            os.unlink(src)

    def test_ac2_woodpecker_publish_step_after_build(self):
        # Arrange
        raw = _WOODPECKER.read_text(encoding="utf-8")
        # Act
        doc = yaml.safe_load(raw)
        names = [s["name"] for s in doc["steps"]]
        # Assert
        self.assertIn("build-push", names)
        self.assertIn("publish-artifact", names)
        self.assertLess(names.index("build-push"), names.index("publish-artifact"))
        build_cmds = "\n".join(doc["steps"][names.index("build-push")]["commands"])
        self.assertIn("docker save", build_cmds)
        pub_cmds = "\n".join(doc["steps"][names.index("publish-artifact")]["commands"])
        self.assertIn("publish_artifact.py", pub_cmds)
        self.assertIn("loader-image.tar", pub_cmds)

    def test_ac3_unique_key_per_publish(self):
        # Arrange
        mod = _load_publish()
        self._env_patch = patch.dict(os.environ, self._base_env(), clear=False)
        self._env_patch.start()
        keys = []
        storage = {}

        def capture_post(url, headers=None, json=None, timeout=None):
            keys.append(json["encryption_key"])

            class R:
                status_code = 200

                def raise_for_status(self):
                    pass

            return R()

        fd, src = tempfile.mkstemp()
        os.close(fd)
        try:
            with open(src, "wb") as f:
                f.write(b"x")
            with patch.object(
                mod.boto3, "client", side_effect=_s3_client_factory(storage)
            ), patch.object(mod.requests, "post", side_effect=capture_post):
                # Act
                mod.publish(src, "r", "dev", "arm64", "1")
                mod.publish(src, "r", "dev", "arm64", "2")
                # Assert
                self.assertEqual(len(keys), 2)
                self.assertNotEqual(keys[0], keys[1])
                self.assertEqual(len(bytes.fromhex(keys[0])), 32)
                self.assertEqual(len(bytes.fromhex(keys[1])), 32)
        finally:
            os.unlink(src)

    def test_ac4_sha256_matches_s3_object_and_registration(self):
        # Arrange
        mod = _load_publish()
        self._env_patch = patch.dict(os.environ, self._base_env(), clear=False)
        self._env_patch.start()
        posted = {}
        storage = {}

        def fake_post(url, headers=None, json=None, timeout=None):
            posted.update(json)

            class R:
                status_code = 200

                def raise_for_status(self):
                    pass

            return R()

        fd, src = tempfile.mkstemp()
        os.close(fd)
        try:
            with open(src, "wb") as f:
                f.write(b"payload-for-hash")
            with patch.object(
                mod.boto3, "client", side_effect=_s3_client_factory(storage)
            ), patch.object(mod.requests, "post", side_effect=fake_post):
                # Act
                out = mod.publish(src, "m", "stage", "arm64", "9.9.9")
                key = out["object_key"]
                body = storage["test-bucket"][key]
                expect = hashlib.sha256(body).hexdigest().lower()
                # Assert
                self.assertEqual(posted["sha256"], expect)
                self.assertEqual(out["sha256"], expect)
        finally:
            os.unlink(src)

    def test_ac5_main_entry_matches_cli_invocation(self):
        # Arrange
        mod = _load_publish()
        self._env_patch = patch.dict(os.environ, self._base_env(), clear=False)
        self._env_patch.start()
        storage = {}

        def ok_post(url, headers=None, json=None, timeout=None):
            class R:
                status_code = 200

                def raise_for_status(self):
                    pass

            return R()

        fd, src = tempfile.mkstemp()
        os.close(fd)
        try:
            with open(src, "wb") as f:
                f.write(b"cli-data")
            with patch.object(
                mod.boto3, "client", side_effect=_s3_client_factory(storage)
            ), patch.object(mod.requests, "post", side_effect=ok_post):
                # Act
                code = mod.main(
                    [
                        "--file",
                        src,
                        "--resource-name",
                        "model",
                        "--dev-stage",
                        "dev",
                        "--architecture",
                        "arm64",
                        "--version",
                        "0.0.1",
                    ]
                )
                # Assert
                self.assertEqual(code, 0)
                self.assertGreater(
                    len(storage["test-bucket"]["dev/model-arm64-0.0.1.enc"]), 0
                )
        finally:
            os.unlink(src)

    def test_ac5_cli_help_exits_zero(self):
        # Act
        r = subprocess.run(
            [sys.executable, str(_SCRIPT), "--help"],
            cwd=str(_ROOT),
            capture_output=True,
            text=True,
        )
        # Assert
        self.assertEqual(r.returncode, 0)
        self.assertIn("--resource-name", r.stdout)

    def test_ac5_subprocess_script_missing_env_exits_nonzero(self):
        # Arrange
        fd, path = tempfile.mkstemp()
        os.close(fd)
        try:
            minimal_env = {
                k: v
                for k, v in os.environ.items()
                if k in ("PATH", "HOME", "TMPDIR", "SYSTEMROOT")
            }
            # Act
            r = subprocess.run(
                [
                    sys.executable,
                    str(_SCRIPT),
                    "--file",
                    path,
                    "--resource-name",
                    "x",
                    "--dev-stage",
                    "d",
                    "--architecture",
                    "arm64",
                    "--version",
                    "1",
                ],
                cwd=str(_ROOT),
                env=minimal_env,
                capture_output=True,
                text=True,
            )
            # Assert
            self.assertNotEqual(r.returncode, 0)
        finally:
            os.unlink(path)

    def test_encryption_compatible_with_decrypt_cbc_file(self):
        # Arrange
        mod = _load_publish()
        aes_key = os.urandom(32)
        fd, plain = tempfile.mkstemp()
        os.close(fd)
        gz_path = tempfile.NamedTemporaryFile(delete=False, suffix=".gz").name
        enc_path = tempfile.NamedTemporaryFile(delete=False, suffix=".enc").name
        dec_path = tempfile.NamedTemporaryFile(delete=False, suffix=".bin").name
        try:
            with open(plain, "wb") as f:
                f.write(b"round-trip-plain")
            mod.gzip_file(plain, gz_path)
            mod.encrypt_aes256_cbc_file(gz_path, enc_path, aes_key)
            # Act
            decrypt_cbc_file(enc_path, aes_key, dec_path)
            with open(dec_path, "rb") as f:
                restored = gzip.decompress(f.read())
            # Assert
            self.assertEqual(restored, b"round-trip-plain")
        finally:
            for p in (plain, gz_path, enc_path, dec_path):
                try:
                    os.unlink(p)
                except OSError:
                    pass

@@ -0,0 +1,351 @@
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import unittest
|
||||
from typing import List
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from download_manager import ResumableDownloadManager
|
||||
from update_manager import UpdateManager, maybe_start_update_background
|
||||
from version_collector import VersionCollector
|
||||
|
||||
|
||||
class TestUpdateManager(unittest.TestCase):
|
||||
def _make_manager(
|
||||
self,
|
||||
tmp: str,
|
||||
*,
|
||||
post_get_update=None,
|
||||
subprocess_run=None,
|
||||
head_content_length=None,
|
||||
wait_fn=None,
|
||||
stop_event=None,
|
||||
):
|
||||
dm_dir = os.path.join(tmp, "dm")
|
||||
model_dir = os.path.join(tmp, "models")
|
||||
state_path = os.path.join(dm_dir, "update_orchestrator.json")
|
||||
os.makedirs(model_dir, exist_ok=True)
|
||||
dm = ResumableDownloadManager(dm_dir)
|
||||
vc = VersionCollector(model_dir, subprocess_run=subprocess_run or MagicMock())
|
||||
um = UpdateManager(
|
||||
"http://api.test",
|
||||
lambda: "tok",
|
||||
dm,
|
||||
vc,
|
||||
os.path.join(tmp, "compose.yml"),
|
||||
model_dir,
|
||||
state_path,
|
||||
interval_seconds=300.0,
|
||||
subprocess_run=subprocess_run,
|
||||
post_get_update=post_get_update,
|
||||
head_content_length=head_content_length,
|
||||
wait_fn=wait_fn,
|
||||
stop_event=stop_event,
|
||||
)
|
||||
return um, dm, vc
|
||||
|
||||
def test_ac2_background_loop_polls_on_schedule(self):
|
||||
# Arrange
|
||||
tmp = tempfile.mkdtemp()
|
||||
posts: List[dict] = []
|
||||
|
||||
def post(token, body):
|
||||
posts.append({"token": token, "body": body})
|
||||
return []
|
||||
|
||||
waits: List[float] = []
|
||||
|
||||
def wait_fn(interval):
|
||||
waits.append(interval)
|
||||
return len(waits) >= 2
|
||||
|
||||
def fake_run(cmd, **kwargs):
|
||||
if cmd[:3] == ["docker", "images", "--format"]:
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||||
raise AssertionError(cmd)
|
||||
|
||||
um, _, _ = self._make_manager(
|
||||
tmp,
|
||||
post_get_update=post,
|
||||
subprocess_run=fake_run,
|
||||
head_content_length=lambda url, token: 1,
|
||||
wait_fn=wait_fn,
|
||||
)
|
||||
# Act
|
||||
um.run_forever()
|
||||
# Assert
|
||||
self.assertEqual(len(posts), 2)
|
||||
self.assertEqual(waits, [300.0, 300.0])
|
||||
|
||||
def test_ac2_default_interval_is_five_minutes(self):
|
||||
# Arrange / Act
|
||||
tmp = tempfile.mkdtemp()
|
||||
dm_dir = os.path.join(tmp, "dm")
|
||||
model_dir = os.path.join(tmp, "m")
|
||||
os.makedirs(model_dir, exist_ok=True)
|
||||
dm = ResumableDownloadManager(dm_dir)
|
||||
vc = VersionCollector(model_dir, subprocess_run=MagicMock())
|
||||
um = UpdateManager(
|
||||
"http://x",
|
||||
lambda: None,
|
||||
dm,
|
||||
vc,
|
||||
"c.yml",
|
||||
model_dir,
|
||||
os.path.join(dm_dir, "st.json"),
|
||||
)
|
||||
# Assert
|
||||
self.assertEqual(um._interval, 300.0)
|
||||
|
||||
def test_ac3_ai_model_update_applied(self):
|
||||
# Arrange
|
||||
tmp = tempfile.mkdtemp()
|
||||
model_dir = os.path.join(tmp, "models")
|
||||
os.makedirs(model_dir, exist_ok=True)
|
||||
|
||||
def fake_run(cmd, **kwargs):
|
||||
if cmd[:3] == ["docker", "images", "--format"]:
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||||
raise AssertionError(cmd)
|
||||
|
||||
dm_mock = MagicMock()
|
||||
|
||||
def post(token, body):
|
||||
return [
|
||||
{
|
||||
"resourceName": "detection_model",
|
||||
"version": "2026-04-20",
|
||||
"cdnUrl": "http://cdn/x",
|
||||
"sha256": "ab",
|
||||
"encryptionKey": "k",
|
||||
}
|
||||
]
|
||||
|
||||
um, _, _ = self._make_manager(
|
||||
tmp,
|
||||
post_get_update=post,
|
||||
subprocess_run=fake_run,
|
||||
head_content_length=lambda url, token: 4,
|
||||
)
|
||||
um._download_manager = dm_mock
|
||||
|
||||
def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
|
||||
with open(output_plaintext_path, "wb") as f:
|
||||
f.write(b"trt")
|
||||
|
||||
dm_mock.fetch_decrypt_verify.side_effect = capture_fetch
|
||||
# Act
|
||||
um._tick_once()
|
||||
# Assert
|
||||
dm_mock.fetch_decrypt_verify.assert_called_once()
|
||||
args, kwargs = dm_mock.fetch_decrypt_verify.call_args
|
||||
self.assertTrue(args[5].endswith("azaion-2026-04-20.trt"))
|
||||
self.assertTrue(os.path.isfile(os.path.join(model_dir, "azaion-2026-04-20.trt")))
|
||||
|
||||
    def test_ac4_docker_image_update_applied(self):
        # Arrange
        tmp = tempfile.mkdtemp()
        recorded: List[List[str]] = []

        def fake_run(cmd, **kwargs):
            recorded.append(list(cmd))
            if cmd[:3] == ["docker", "images", "--format"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            if cmd[:3] == ["docker", "load", "-i"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            if cmd[:2] == ["docker", "compose"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            raise AssertionError(cmd)

        dm_mock = MagicMock()

        def post(token, body):
            return [
                {
                    "resourceName": "annotations",
                    "version": "2026-04-13",
                    "cdnUrl": "http://cdn/a",
                    "sha256": "cd",
                    "encryptionKey": "k",
                }
            ]

        um, _, _ = self._make_manager(
            tmp,
            post_get_update=post,
            subprocess_run=fake_run,
            head_content_length=lambda url, token: 8,
        )
        um._download_manager = dm_mock

        def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
            with open(output_plaintext_path, "wb") as f:
                f.write(b"tarbytes")

        dm_mock.fetch_decrypt_verify.side_effect = capture_fetch
        # Act
        um._tick_once()
        # Assert
        loads = [c for c in recorded if c[:3] == ["docker", "load", "-i"]]
        composes = [c for c in recorded if c[:2] == ["docker", "compose"]]
        self.assertEqual(len(loads), 1)
        self.assertEqual(len(composes), 1)
        self.assertIn("annotations", composes[0])

    def test_ac5_self_update_applied_last(self):
        # Arrange
        tmp = tempfile.mkdtemp()
        recorded: List[str] = []

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            if cmd[:3] == ["docker", "load", "-i"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            if cmd[:2] == ["docker", "compose"]:
                recorded.append(cmd[-1])
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            raise AssertionError(cmd)

        dm_mock = MagicMock()

        def post(token, body):
            return [
                {
                    "resourceName": "loader",
                    "version": "v2",
                    "cdnUrl": "http://cdn/l",
                    "sha256": "00",
                    "encryptionKey": "k",
                },
                {
                    "resourceName": "annotations",
                    "version": "v1",
                    "cdnUrl": "http://cdn/a",
                    "sha256": "11",
                    "encryptionKey": "k",
                },
            ]

        um, _, _ = self._make_manager(
            tmp,
            post_get_update=post,
            subprocess_run=fake_run,
            head_content_length=lambda url, token: 1,
        )
        um._download_manager = dm_mock

        def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
            with open(output_plaintext_path, "wb") as f:
                f.write(b"x")

        dm_mock.fetch_decrypt_verify.side_effect = capture_fetch
        # Act
        um._tick_once()
        # Assert
        self.assertEqual(recorded, ["annotations", "loader"])

    def test_ac6_invalidate_after_docker_apply(self):
        # Arrange
        tmp = tempfile.mkdtemp()

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            if cmd[:3] == ["docker", "load", "-i"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            if cmd[:2] == ["docker", "compose"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            raise AssertionError(cmd)

        dm_mock = MagicMock()

        def post(token, body):
            return [
                {
                    "resourceName": "annotations",
                    "version": "v9",
                    "cdnUrl": "http://cdn/a",
                    "sha256": "11",
                    "encryptionKey": "k",
                }
            ]

        um, _, vc = self._make_manager(
            tmp,
            post_get_update=post,
            subprocess_run=fake_run,
            head_content_length=lambda url, token: 1,
        )
        um._download_manager = dm_mock

        def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path):
            with open(output_plaintext_path, "wb") as f:
                f.write(b"x")

        dm_mock.fetch_decrypt_verify.side_effect = capture_fetch
        vc.collect()
        self.assertIsNotNone(vc._cache)
        # Act
        um._tick_once()
        # Assert
        self.assertIsNone(vc._cache)

    def test_maybe_start_skips_without_download_state_dir(self):
        # Arrange
        old = os.environ.pop("LOADER_DOWNLOAD_STATE_DIR", None)
        try:

            def get_client():
                return MagicMock()

            # Act
            maybe_start_update_background(get_client, "http://x")
        finally:
            if old is not None:
                os.environ["LOADER_DOWNLOAD_STATE_DIR"] = old

    def test_pending_compose_drained_on_startup(self):
        # Arrange
        tmp = tempfile.mkdtemp()
        dm_dir = os.path.join(tmp, "dm")
        os.makedirs(dm_dir, exist_ok=True)
        state_path = os.path.join(dm_dir, "update_orchestrator.json")
        with open(state_path, "w", encoding="utf-8") as f:
            json.dump({"pending_compose": ["annotations", "loader"]}, f)
        recorded: List[str] = []

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            if cmd[:2] == ["docker", "compose"]:
                recorded.append(cmd[-1])
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            raise AssertionError(cmd)

        model_dir = os.path.join(tmp, "m")
        os.makedirs(model_dir, exist_ok=True)
        dm = ResumableDownloadManager(dm_dir)
        vc = VersionCollector(model_dir, subprocess_run=fake_run)
        um = UpdateManager(
            "http://api.test",
            lambda: None,
            dm,
            vc,
            os.path.join(tmp, "compose.yml"),
            model_dir,
            state_path,
            subprocess_run=fake_run,
        )
        # Act
        um._drain_pending_compose()
        # Assert
        self.assertEqual(recorded, ["annotations", "loader"])
        with open(state_path, encoding="utf-8") as f:
            data = json.load(f)
        self.assertEqual(data.get("pending_compose"), [])


if __name__ == "__main__":
    unittest.main()
@@ -0,0 +1,65 @@
import os
import subprocess
import tempfile
import unittest

from version_collector import VersionCollector


class TestVersionCollector(unittest.TestCase):
    def test_ac1_version_collector_reads_local_state(self):
        # Arrange
        tmp = tempfile.mkdtemp()
        open(os.path.join(tmp, "azaion-2026-03-10.trt"), "wb").close()

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return subprocess.CompletedProcess(
                    cmd,
                    0,
                    stdout="azaion/annotations:arm64_2026-03-01\n",
                    stderr="",
                )
            raise AssertionError(f"unexpected cmd {cmd}")

        vc = VersionCollector(tmp, subprocess_run=fake_run)
        # Act
        got = vc.collect_as_dicts()
        # Assert
        self.assertEqual(
            got,
            [
                {"resource_name": "detection_model", "version": "2026-03-10"},
                {"resource_name": "annotations", "version": "arm64_2026-03-01"},
            ],
        )

    def test_ac6_cache_invalidates_after_changes(self):
        # Arrange
        tmp = tempfile.mkdtemp()
        open(os.path.join(tmp, "azaion-2026-01-01.trt"), "wb").close()

        def fake_run(cmd, **kwargs):
            if cmd[:3] == ["docker", "images", "--format"]:
                return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
            raise AssertionError(f"unexpected cmd {cmd}")

        vc = VersionCollector(tmp, subprocess_run=fake_run)
        first = vc.collect_as_dicts()
        open(os.path.join(tmp, "azaion-2026-02-01.trt"), "wb").close()
        second_cached = vc.collect_as_dicts()
        vc.invalidate()
        third = vc.collect_as_dicts()
        # Assert
        self.assertEqual(
            first,
            [{"resource_name": "detection_model", "version": "2026-01-01"}],
        )
        self.assertEqual(second_cached, first)
        self.assertEqual(
            third,
            [{"resource_name": "detection_model", "version": "2026-02-01"}],
        )


if __name__ == "__main__":
    unittest.main()