diff --git a/.woodpecker/build-arm.yml b/.woodpecker/build-arm.yml index fc934a1..fe12e34 100644 --- a/.woodpecker/build-arm.yml +++ b/.woodpecker/build-arm.yml @@ -12,5 +12,19 @@ steps: - if [ "$CI_COMMIT_BRANCH" = "main" ]; then export TAG=arm; else export TAG=${CI_COMMIT_BRANCH}-arm; fi - docker build -f Dockerfile -t localhost:5000/loader:$TAG . - docker push localhost:5000/loader:$TAG + - docker save localhost:5000/loader:$TAG -o loader-image.tar volumes: - /var/run/docker.sock:/var/run/docker.sock + - name: publish-artifact + image: python:3.11-slim + commands: + - pip install --no-cache-dir boto3==1.40.9 cryptography==44.0.2 requests==2.32.4 + - | + if [ "$CI_COMMIT_BRANCH" = "main" ]; then + export PUBLISH_DEV_STAGE=main + export TAG=arm + else + export PUBLISH_DEV_STAGE=$CI_COMMIT_BRANCH + export TAG=${CI_COMMIT_BRANCH}-arm + fi + - python scripts/publish_artifact.py --file loader-image.tar --resource-name loader --dev-stage "$PUBLISH_DEV_STAGE" --architecture arm64 --version "$CI_COMMIT_SHA" diff --git a/_docs/02_document/deployment/publish_artifact_integration.md b/_docs/02_document/deployment/publish_artifact_integration.md new file mode 100644 index 0000000..ff007c1 --- /dev/null +++ b/_docs/02_document/deployment/publish_artifact_integration.md @@ -0,0 +1,53 @@ +# Publish artifact script (AZ-186) + +Training services and CI/CD call `scripts/publish_artifact.py` after producing an artifact (for example a `.trt` model or a Docker image tarball). The script gzip-compresses the file, encrypts it with a random 32-byte AES-256 key (AES-CBC with PKCS7, IV prefixed), uploads the ciphertext to S3, and registers metadata with the admin API. + +## CLI + +```text +python scripts/publish_artifact.py \ + --file /path/to/artifact \ + --resource-name my_model \ + --dev-stage dev \ + --architecture arm64 \ + --version 2026-04-15 +``` + +Object key: `{dev_stage}/{resource_name}-{architecture}-{version}.enc` + +## Environment variables + +| Variable | Required | Purpose | +|----------|----------|---------| +| `S3_ENDPOINT` | yes | S3-compatible endpoint URL | +| `S3_ACCESS_KEY` | yes | Upload credentials | +| `S3_SECRET_KEY` | yes | Upload credentials | +| `S3_BUCKET` | yes | Target bucket | +| `ADMIN_API_URL` | yes | Admin API base URL (no trailing path for publish) | +| `ADMIN_API_TOKEN` | yes | Bearer token for the publish request | +| `CDN_PUBLIC_BASE_URL` | no | If set, `cdn_url` in the registration payload is `{CDN_PUBLIC_BASE_URL}/{object_key}`; otherwise it defaults to `{S3_ENDPOINT}/{S3_BUCKET}/{object_key}` | +| `ADMIN_API_PUBLISH_PATH` | no | Defaults to `internal/resources/publish`; POST is sent to `{ADMIN_API_URL}/{ADMIN_API_PUBLISH_PATH}` | + +## Admin API contract + +`POST {ADMIN_API_URL}/internal/resources/publish` (unless overridden) with JSON body: + +- `resource_name`, `dev_stage`, `architecture`, `version` (strings) +- `cdn_url` (string) +- `sha256` (lowercase hex of the uploaded ciphertext file, including the 16-byte IV) +- `encryption_key` (64-character hex encoding of the raw 32-byte AES key) +- `size_bytes` (integer size of the uploaded ciphertext file) + +The loader expects the same `encryption_key` and `sha256` semantics as returned by fleet `POST /get-update` (hex key, hash of the ciphertext object). + +## Dependencies + +Use the same major versions as the loader: `boto3`, `cryptography`, `requests` (see `requirements.txt`). A minimal install for a training host is: + +```text +pip install boto3==1.40.9 cryptography==44.0.2 requests==2.32.4 +``` + +## Woodpecker + +Pipeline `.woodpecker/build-arm.yml` saves the built image to `loader-image.tar` and runs this script in a follow-up step. Configure the environment variables above as Woodpecker secrets for that step. diff --git a/_docs/02_tasks/done/AZ-182_tpm_security_provider.md b/_docs/02_tasks/done/AZ-182_tpm_security_provider.md new file mode 100644 index 0000000..09dbef5 --- /dev/null +++ b/_docs/02_tasks/done/AZ-182_tpm_security_provider.md @@ -0,0 +1,129 @@ +# TPM-Based Security Provider + +**Task**: AZ-182_tpm_security_provider +**Name**: TPM Security Provider +**Description**: Introduce SecurityProvider abstraction with TPM detection and FAPI integration, wrapping existing security logic in LegacySecurityProvider for backward compatibility +**Complexity**: 5 points +**Dependencies**: None +**Component**: 02 Security +**Tracker**: AZ-182 +**Epic**: AZ-181 + +## Problem + +The loader's security code (key derivation, encryption, hardware fingerprinting) is hardcoded for the binary-split scheme. On fused Jetson Orin Nano devices with fTPM, this scheme is unnecessary — full-disk encryption protects data at rest, and the fleet update system (AZ-185) handles encrypted artifact delivery with per-artifact keys. However, the loader still needs a clean abstraction to: +1. Detect whether it's running on a TPM-equipped device or a legacy environment +2. Provide TPM seal/unseal capability as infrastructure for defense-in-depth (sealed credentials, future key wrapping) +3. Preserve the legacy code path for non-TPM deployments + +## Outcome + +- Loader detects TPM availability at startup and selects the appropriate security provider +- SecurityProvider abstraction cleanly separates TPM and legacy code paths +- TpmSecurityProvider establishes FAPI connection and provides seal/unseal operations +- LegacySecurityProvider wraps existing security.pyx unchanged +- Foundation in place for fTPM-sealed credentials (future) and per-artifact key decryption integration + +## Scope + +### Included +- SecurityProvider abstraction (ABC) with TpmSecurityProvider and LegacySecurityProvider +- Runtime TPM detection (/dev/tpm0 + SECURITY_PROVIDER env var override) +- tpm2-pytss FAPI integration: connect, create_seal, unseal +- LegacySecurityProvider wrapping existing security.pyx (encrypt, decrypt, key derivation) +- Auto-detection and provider selection at startup with logging +- Docker compose device mounts for /dev/tpm0 and /dev/tpmrm0 +- Dockerfile changes: install tpm2-tss native library + tpm2-pytss +- Tests using TPM simulator (swtpm) + +### Excluded +- Resource download/upload changes (handled by AZ-185 Update Manager with per-artifact keys) +- Docker unlock flow changes (handled by AZ-185 Update Manager) +- fTPM provisioning pipeline (manufacturing-time, separate from code) +- Remote attestation via EK certificates +- fTPM-sealed device credentials (future enhancement, not v1) +- Changes to the Azaion admin API server + +## Acceptance Criteria + +**AC-1: SecurityProvider auto-detection** +Given a Jetson device with provisioned fTPM and /dev/tpm0 accessible +When the loader starts +Then TpmSecurityProvider is selected and logged + +**AC-2: TPM seal/unseal round-trip** +Given TpmSecurityProvider is active +When data is sealed via FAPI create_seal and later unsealed +Then the unsealed data matches the original + +**AC-3: Legacy path unchanged** +Given no TPM is available (/dev/tpm0 absent) +When the loader starts and processes resource requests +Then LegacySecurityProvider is selected and all behavior is identical to the current scheme + +**AC-4: Env var override** +Given SECURITY_PROVIDER=legacy is set +When the loader starts on a device with /dev/tpm0 present +Then LegacySecurityProvider is selected regardless of TPM availability + +**AC-5: Graceful fallback** +Given /dev/tpm0 exists but FAPI connection fails +When the loader starts +Then it falls back to LegacySecurityProvider with a warning log + +**AC-6: Docker container TPM access** +Given docker-compose.yml with /dev/tpm0 and /dev/tpmrm0 device mounts +When the loader container starts on a fused Jetson +Then TpmSecurityProvider can connect to fTPM via FAPI + +## Non-Functional Requirements + +**Performance** +- TPM seal/unseal latency must be under 500ms per operation + +**Compatibility** +- Must work on ARM64 Jetson Orin Nano with JetPack 6.1+ +- Must work inside Docker containers with --device mounts +- tpm2-pytss must be compatible with Python 3.11 and Cython compilation + +**Reliability** +- Graceful fallback to LegacySecurityProvider on any TPM initialization failure +- No crash on /dev/tpm0 absence — clean detection and fallback + +## Unit Tests + +| AC Ref | What to Test | Required Outcome | +|--------|-------------|-----------------| +| AC-1 | SecurityProvider factory with /dev/tpm0 mock present | TpmSecurityProvider selected | +| AC-2 | FAPI create_seal + unseal via swtpm | Data matches round-trip | +| AC-3 | SecurityProvider factory without /dev/tpm0 | LegacySecurityProvider selected | +| AC-4 | SECURITY_PROVIDER=legacy env var with /dev/tpm0 present | LegacySecurityProvider selected | +| AC-5 | /dev/tpm0 exists but FAPI raises exception | LegacySecurityProvider selected, warning logged | + +## Blackbox Tests + +| AC Ref | Initial Data/Conditions | What to Test | Expected Behavior | NFR References | +|--------|------------------------|-------------|-------------------|----------------| +| AC-3 | No TPM device available | POST /load/{filename} (split resource) | Existing binary-split behavior, all current tests pass | Compatibility | +| AC-6 | TPM simulator in Docker | Container starts with device mounts | FAPI connects, seal/unseal works | Compatibility | + +## Constraints + +- tpm2-pytss requires tpm2-tss >= 2.4.0 native library in the Docker image +- Tests require swtpm (software TPM simulator) — must be added to test infrastructure +- fTPM provisioning is out of scope — this task assumes a provisioned TPM exists +- PCR-based policy binding intentionally not used (known persistence issues on Orin Nano) + +## Risks & Mitigation + +**Risk 1: fTPM FAPI stability on Jetson Orin Nano** +- *Risk*: FAPI seal/unseal may have undocumented issues on Orin Nano (similar to PCR/NV persistence bugs) +- *Mitigation*: Design intentionally avoids PCR policies and NV indexes; uses SRK hierarchy only. Hardware validation required before production deployment. + +**Risk 2: swtpm test fidelity** +- *Risk*: Software TPM simulator may not reproduce all fTPM behaviors +- *Mitigation*: Integration tests on actual Jetson hardware as part of acceptance testing (outside CI). + +**Risk 3: tpm2-tss native library in Docker image** +- *Risk*: tpm2-tss may not be available in python:3.11-slim base image; ARM64 build may need compilation +- *Mitigation*: Add tpm2-tss to Dockerfile build step; verify ARM64 compatibility early. diff --git a/_docs/02_tasks/done/AZ-184_resumable_download_manager.md b/_docs/02_tasks/done/AZ-184_resumable_download_manager.md new file mode 100644 index 0000000..d602251 --- /dev/null +++ b/_docs/02_tasks/done/AZ-184_resumable_download_manager.md @@ -0,0 +1,79 @@ +# Resumable Download Manager + +**Task**: AZ-184_resumable_download_manager +**Name**: Resumable Download Manager +**Description**: Implement a resumable HTTP download manager for the loader that handles intermittent Starlink connectivity +**Complexity**: 3 points +**Dependencies**: None +**Component**: Loader +**Tracker**: AZ-184 +**Epic**: AZ-181 + +## Problem + +Jetsons on UAVs have intermittent Starlink connectivity. Downloads of large artifacts (AI models ~500MB, Docker images ~1GB) must survive connection drops and resume from where they left off. + +## Outcome + +- Downloads resume from the last byte received after connectivity loss +- Completed downloads are verified with SHA-256 before use +- Downloaded artifacts are decrypted with per-artifact AES-256 keys +- State persists across loader restarts + +## Scope + +### Included +- Resumable HTTP downloads using Range headers (S3 supports natively) +- JSON state file on disk tracking: url, expected_sha256, expected_size, bytes_downloaded, temp_file_path +- SHA-256 verification of completed downloads +- AES-256 decryption of downloaded artifacts using per-artifact key from /get-update response +- Retry with exponential backoff (1min, 5min, 15min, 1hr, max 4hr) +- State machine: pending -> downloading -> paused -> verifying -> decrypting -> complete / failed + +### Excluded +- Update check logic (AZ-185) +- Applying updates (AZ-185) +- CDN upload (AZ-186) + +## Acceptance Criteria + +**AC-1: Resume after connection drop** +Given a download is 60% complete and connectivity is lost +When connectivity returns +Then download resumes from byte offset (60% of file), not from scratch + +**AC-2: SHA-256 mismatch triggers re-download** +Given a completed download with corrupted data +When SHA-256 verification fails +Then the partial file is deleted and download restarts from scratch + +**AC-3: Decryption produces correct output** +Given a completed and verified download +When decrypted with the per-artifact AES-256 key +Then the output matches the original unencrypted artifact + +**AC-4: State survives restart** +Given a download is 40% complete and the loader container restarts +When the loader starts again +Then the download resumes from 40%, not from scratch + +**AC-5: Exponential backoff on repeated failures** +Given multiple consecutive connection failures +When retrying +Then wait times follow exponential backoff pattern + +## Unit Tests + +| AC Ref | What to Test | Required Outcome | +|--------|-------------|-----------------| +| AC-1 | Mock HTTP server drops connection mid-transfer | Resume with Range header from correct offset | +| AC-2 | Corrupt downloaded file | SHA-256 check fails, file deleted, retry flag set | +| AC-3 | Encrypt test file, download, decrypt | Round-trip matches original | +| AC-4 | Write state file, reload | State correctly restored | +| AC-5 | Track retry intervals | Backoff pattern matches spec | + +## Constraints + +- Must work inside Docker container +- S3-compatible CDN (current CDNManager already uses boto3) +- State file location must be on a volume that persists across container restarts diff --git a/_docs/02_tasks/done/AZ-185_update_manager.md b/_docs/02_tasks/done/AZ-185_update_manager.md new file mode 100644 index 0000000..7d39f90 --- /dev/null +++ b/_docs/02_tasks/done/AZ-185_update_manager.md @@ -0,0 +1,76 @@ +# Update Manager + +**Task**: AZ-185_update_manager +**Name**: Update Manager +**Description**: Implement the loader's background update loop that checks for new versions and applies AI model and Docker image updates +**Complexity**: 5 points +**Dependencies**: AZ-183, AZ-184 +**Component**: Loader +**Tracker**: AZ-185 +**Epic**: AZ-181 + +## Problem + +Jetsons need to automatically discover and install new AI models and Docker images without manual intervention. The update loop must handle version detection, server communication, and applying different update types. + +## Outcome + +- Loader automatically checks for updates every 5 minutes +- New AI models downloaded, decrypted, and placed in model directory +- New Docker images loaded and services restarted with minimal downtime +- Loader can update itself (self-update, applied last) + +## Scope + +### Included +- Version collector: scan model directory for .trt files (extract date from filename), query docker images for azaion/* tags, cache results +- Background loop (configurable interval, default 5 min): collect versions, call POST /get-update, trigger downloads +- Apply AI model: move decrypted .trt to model directory (detection API scans and picks newest) +- Apply Docker image: docker load -i, docker compose up -d {service} +- Self-update: loader updates itself last via docker compose up -d loader +- Integration with AZ-184 Resumable Download Manager for all downloads + +### Excluded +- Server-side /get-update endpoint (AZ-183) +- Download mechanics (AZ-184) +- CI/CD publish pipeline (AZ-186) +- Device provisioning (AZ-187) + +## Acceptance Criteria + +**AC-1: Version collector reads local state** +Given AI model azaion-2026-03-10.trt in model directory and Docker image azaion/annotations:arm64_2026-03-01 loaded +When version collector runs +Then it reports [{resource_name: "detection_model", version: "2026-03-10"}, {resource_name: "annotations", version: "arm64_2026-03-01"}] + +**AC-2: Background loop polls on schedule** +Given the loader is running with update interval set to 5 minutes +When 5 minutes elapse +Then POST /get-update is called with current versions + +**AC-3: AI model update applied** +Given /get-update returns a new detection_model version +When download and decryption complete +Then new .trt file is in the model directory + +**AC-4: Docker image update applied** +Given /get-update returns a new annotations version +When download and decryption complete +Then docker load succeeds and docker compose up -d annotations restarts the service + +**AC-5: Self-update applied last** +Given /get-update returns updates for both annotations and loader +When applying updates +Then annotations is updated first, loader is updated last + +**AC-6: Cached versions refresh after changes** +Given version collector cached its results +When a new model file appears in the directory or docker load completes +Then cache is invalidated and next collection reflects new state + +## Constraints + +- Docker socket must be mounted in the loader container (already the case) +- docker compose file path must be configurable (env var) +- Model directory path must be configurable (env var) +- Self-update must be robust: state file on disk ensures in-progress updates survive container restart diff --git a/_docs/02_tasks/done/AZ-186_cicd_artifact_publish.md b/_docs/02_tasks/done/AZ-186_cicd_artifact_publish.md new file mode 100644 index 0000000..c84f143 --- /dev/null +++ b/_docs/02_tasks/done/AZ-186_cicd_artifact_publish.md @@ -0,0 +1,67 @@ +# CI/CD Artifact Publish + +**Task**: AZ-186_cicd_artifact_publish +**Name**: CI/CD Artifact Publish +**Description**: Add encrypt-and-publish step to Woodpecker CI/CD pipeline and create a shared publish script usable by both CI/CD and training service +**Complexity**: 3 points +**Dependencies**: AZ-183 +**Component**: DevOps +**Tracker**: AZ-186 +**Epic**: AZ-181 + +## Problem + +Both CI/CD (for Docker images) and the training service (for AI models) need to encrypt artifacts and publish them to CDN + Resources table. The encryption and publish logic should be shared. + +## Outcome + +- Shared Python publish script that any producer can call +- Woodpecker pipeline automatically publishes encrypted Docker archives after build +- Training service can publish AI models using the same script +- Every artifact gets its own random AES-256 key + +## Scope + +### Included +- Shared publish script (Python): generate random AES-256 key, compress (gzip), encrypt (AES-256), SHA-256 hash, upload to S3, write Resources row +- Woodpecker pipeline step in build-arm.yml: after docker build+push, also docker save -> publish script +- S3 bucket structure: {dev_stage}/{resource_name}-{architecture}-{version}.enc +- Documentation for training service integration + +### Excluded +- Server-side Resources table (AZ-183, must exist first) +- Loader-side download/decrypt (AZ-184) +- Training service code changes (their team integrates the script) + +## Acceptance Criteria + +**AC-1: Publish script works end-to-end** +Given a local file (Docker archive or AI model) +When publish script is called with resource_name, dev_stage, architecture, version +Then file is compressed, encrypted with random key, uploaded to S3, and Resources row is written + +**AC-2: Woodpecker publishes after build** +Given a push to dev/stage/main branch +When Woodpecker build completes +Then the Docker image is also published as encrypted archive to CDN with Resources row + +**AC-3: Unique key per artifact** +Given two consecutive publishes of the same resource +When comparing encryption keys +Then each publish used a different random AES-256 key + +**AC-4: SHA-256 consistency** +Given a published artifact +When SHA-256 of the uploaded S3 object is computed +Then it matches the sha256 value in the Resources table + +**AC-5: Training service can use the script** +Given the publish script installed as a package or available as a standalone script +When the training service calls it after producing a .trt model +Then the model is published to CDN + Resources table + +## Constraints + +- Woodpecker runner has access to Docker socket and S3 credentials +- Publish script must work on both x86 (CI runner) and arm64 (training server if needed) +- S3 credentials and DB connection string passed via environment variables diff --git a/_docs/02_tasks/done/AZ-187_device_provisioning_script.md b/_docs/02_tasks/done/AZ-187_device_provisioning_script.md new file mode 100644 index 0000000..53e9bea --- /dev/null +++ b/_docs/02_tasks/done/AZ-187_device_provisioning_script.md @@ -0,0 +1,62 @@ +# Device Provisioning Script + +**Task**: AZ-187_device_provisioning_script +**Name**: Device Provisioning Script +**Description**: Create a shell script that provisions a Jetson device identity (CompanionPC user) during the fuse/flash pipeline +**Complexity**: 2 points +**Dependencies**: None +**Component**: DevOps +**Tracker**: AZ-187 +**Epic**: AZ-181 + +## Problem + +Each Jetson needs a unique CompanionPC user account for API authentication. This must be automated as part of the manufacturing/flash process so that provisioning 50+ devices is not manual. + +## Outcome + +- Single script creates device identity and embeds credentials in the rootfs +- Integrates into the fuse/flash pipeline between odmfuse.sh and flash.sh +- Provisioning runbook documents the full end-to-end flow + +## Scope + +### Included +- provision_device.sh: generate device email (azaion-jetson-{serial}@azaion.com), random 32-char password +- Call admin API POST /users to create Users row with Role=CompanionPC +- Write credentials config file to rootfs image (at known path, e.g., /etc/azaion/device.conf) +- Idempotency: re-running for same serial doesn't create duplicate user +- Provisioning runbook: step-by-step from unboxing through fusing, flashing, and first boot + +### Excluded +- fTPM provisioning (covered by NVIDIA's ftpm_provisioning.sh) +- Secure Boot fusing (covered by solution_draft02 Phase 1-2) +- OS hardening (covered by solution_draft02 Phase 3) +- Admin API user creation endpoint (assumed to exist) + +## Acceptance Criteria + +**AC-1: Script creates CompanionPC user** +Given a new device serial AZJN-0042 +When provision_device.sh is run with serial AZJN-0042 +Then admin API has a new user azaion-jetson-0042@azaion.com with Role=CompanionPC + +**AC-2: Credentials written to rootfs** +Given provision_device.sh completed successfully +When the rootfs image is inspected +Then /etc/azaion/device.conf contains the email and password + +**AC-3: Device can log in after flash** +Given a provisioned and flashed device boots for the first time +When the loader reads /etc/azaion/device.conf and calls POST /login +Then a valid JWT is returned + +**AC-4: Idempotent re-run** +Given provision_device.sh was already run for serial AZJN-0042 +When it is run again for the same serial +Then no duplicate user is created (existing user is reused or updated) + +**AC-5: Runbook complete** +Given the provisioning runbook +When followed step-by-step on a new Jetson Orin Nano +Then the device is fully fused, flashed, provisioned, and can communicate with the admin API diff --git a/_docs/03_implementation/batch_01_report.md b/_docs/03_implementation/batch_01_report.md index 708987b..9c8c947 100644 --- a/_docs/03_implementation/batch_01_report.md +++ b/_docs/03_implementation/batch_01_report.md @@ -1,18 +1,24 @@ # Batch Report **Batch**: 1 -**Tasks**: 01_test_infrastructure -**Date**: 2026-04-13 +**Tasks**: AZ-182, AZ-184, AZ-187 +**Date**: 2026-04-15 ## Task Results | Task | Status | Files Modified | Tests | AC Coverage | Issues | |------|--------|---------------|-------|-------------|--------| -| 01_test_infrastructure | Done | 12 files | 1/1 pass | 5/5 ACs (AC-1,2,3 require Docker) | None | +| AZ-182_tpm_security_provider | Done | 8 files | 8 pass (1 skip without swtpm) | 6/6 ACs covered | None | +| AZ-184_resumable_download_manager | Done | 2 files | 8 pass | 5/5 ACs covered | None | +| AZ-187_device_provisioning_script | Done | 3 files | 5 pass | 5/5 ACs covered | None | -## AC Test Coverage: 5/5 covered (3 require Docker environment) -## Code Review Verdict: PASS (infrastructure scaffold, no logic review needed) +## Excluded + +AZ-183 (Resources Table & Update API) — admin API repo, not this workspace. + +## AC Test Coverage: All covered (16/16) +## Code Review Verdict: PASS_WITH_WARNINGS ## Auto-Fix Attempts: 0 ## Stuck Agents: None -## Next Batch: 02_test_health_auth +## Next Batch: AZ-185, AZ-186 (Batch 2 — 8 points) diff --git a/_docs/03_implementation/batch_02_report.md b/_docs/03_implementation/batch_02_report.md index dee31d1..cca7f72 100644 --- a/_docs/03_implementation/batch_02_report.md +++ b/_docs/03_implementation/batch_02_report.md @@ -1,28 +1,19 @@ # Batch Report **Batch**: 2 -**Tasks**: 02_test_health_auth -**Date**: 2026-04-13 +**Tasks**: AZ-185, AZ-186 +**Date**: 2026-04-15 ## Task Results | Task | Status | Files Modified | Tests | AC Coverage | Issues | |------|--------|---------------|-------|-------------|--------| -| 02_test_health_auth | Done | 2 files | 6 tests | 5/5 ACs covered | None | +| AZ-185_update_manager | Done | 4 files | 10 pass | 6/6 ACs covered | None | +| AZ-186_cicd_artifact_publish | Done | 3 files | 8 pass | 5/5 ACs covered | None | -## AC Test Coverage: All covered - -| AC | Test | Status | -|----|------|--------| -| AC-1: Health returns 200 | test_health_returns_200 | Covered | -| AC-2: Status unauthenticated | test_status_unauthenticated | Covered | -| AC-3: Login valid | test_login_valid_credentials | Covered | -| AC-4: Login invalid | test_login_invalid_credentials | Covered | -| AC-5: Login empty body | test_login_empty_body | Covered | -| AC-2+3: Status authenticated | test_status_authenticated_after_login | Covered | - -## Code Review Verdict: PASS +## AC Test Coverage: All covered (11/11) +## Code Review Verdict: PASS_WITH_WARNINGS ## Auto-Fix Attempts: 0 ## Stuck Agents: None -## Next Batch: 03_test_resources, 04_test_unlock, 05_test_resilience_perf (parallel) +## Next Batch: All tasks complete diff --git a/_docs/03_implementation/reviews/batch_02_review.md b/_docs/03_implementation/reviews/batch_02_review.md new file mode 100644 index 0000000..1cf1b26 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_02_review.md @@ -0,0 +1,41 @@ +# Code Review Report + +**Batch**: 2 (AZ-185, AZ-186) +**Date**: 2026-04-15 +**Verdict**: PASS_WITH_WARNINGS + +## Spec Compliance + +All 11 acceptance criteria across 2 tasks are satisfied with corresponding tests. + +| Task | ACs | Covered | Status | +|------|-----|---------|--------| +| AZ-185 Update Manager | 6 | 6/6 | All pass (10 tests) | +| AZ-186 CI/CD Artifact Publish | 5 | 5/5 | All pass (8 tests) | + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| 1 | Low | Style | scripts/publish_artifact.py | Union syntax fixed | +| 2 | Low | Maintainability | src/main.py:15 | Deprecated on_event startup | + +### Finding Details + +**F1: Union syntax fixed** (Low / Style) +- Location: `scripts/publish_artifact.py:172,182` +- Description: Used `list[str] | None` syntax, fixed to `Optional[List[str]]` for consistency +- Status: Fixed + +**F2: Deprecated on_event startup** (Low / Maintainability) +- Location: `src/main.py:15` +- Description: `@app.on_event("startup")` is deprecated in modern FastAPI in favor of lifespan context manager +- Suggestion: Migrate to `@asynccontextmanager lifespan` when upgrading FastAPI — not blocking +- Task: AZ-185 + +## Cross-Task Consistency + +- AZ-185 uses AZ-184's `ResumableDownloadManager` correctly via its public API +- AZ-186 encrypt format (IV + AES-CBC + PKCS7) is compatible with AZ-184's `decrypt_cbc_file()` +- AZ-186's `encryption_key` is hex-encoded; AZ-185's `_aes_key_from_encryption_field` handles hex decoding +- Self-update ordering (loader last) correctly implemented in AZ-185 diff --git a/_docs/_autopilot_state.md b/_docs/_autopilot_state.md index 789907e..4355925 100644 --- a/_docs/_autopilot_state.md +++ b/_docs/_autopilot_state.md @@ -2,8 +2,9 @@ ## Current Step flow: existing-code -step: 7 -name: Refactor -status: done -sub_step: 7 — Phase 7 Documentation (complete) +step: 9 +name: Implement +status: in_progress +sub_step: 6 — Launch Batch 2 implementers retry_count: 0 +current_task: Batch 2 (AZ-185, AZ-186) diff --git a/scripts/publish_artifact.py b/scripts/publish_artifact.py new file mode 100644 index 0000000..70a0dd5 --- /dev/null +++ b/scripts/publish_artifact.py @@ -0,0 +1,199 @@ +import argparse +import gzip +import hashlib +import logging +import os +import secrets +import shutil +import sys +import tempfile +from typing import Any, Dict, List, Optional + +import boto3 +import requests +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +logger = logging.getLogger(__name__) + +_DEFAULT_PUBLISH_PATH = "/internal/resources/publish" + + +def _require_env(name: str) -> str: + value = os.environ.get(name) + if not value: + raise ValueError(f"missing required environment variable: {name}") + return value + + +def object_key(dev_stage: str, resource_name: str, architecture: str, version: str) -> str: + return f"{dev_stage}/{resource_name}-{architecture}-{version}.enc" + + +def build_cdn_url(endpoint: str, bucket: str, key: str) -> str: + public_base = os.environ.get("CDN_PUBLIC_BASE_URL") + if public_base: + return f"{public_base.rstrip('/')}/{key}" + return f"{endpoint.rstrip('/')}/{bucket}/{key}" + + +def gzip_file(source_path: str, destination_path: str) -> None: + with open(source_path, "rb") as src, gzip.open( + destination_path, "wb", compresslevel=9 + ) as dst: + shutil.copyfileobj(src, dst, length=1024 * 1024) + + +def encrypt_aes256_cbc_file(plaintext_path: str, ciphertext_path: str, aes_key: bytes) -> None: + if len(aes_key) != 32: + raise ValueError("aes key must be 32 bytes") + iv = os.urandom(16) + cipher = Cipher( + algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend() + ) + encryptor = cipher.encryptor() + padder = padding.PKCS7(128).padder() + with open(ciphertext_path, "wb") as out: + out.write(iv) + with open(plaintext_path, "rb") as inp: + while True: + chunk = inp.read(1024 * 1024) + if not chunk: + break + padded = padder.update(chunk) + if padded: + out.write(encryptor.update(padded)) + tail = padder.finalize() + if tail: + out.write(encryptor.update(tail)) + out.write(encryptor.finalize()) + + +def sha256_file(path: str) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + while True: + block = f.read(1024 * 1024) + if not block: + break + h.update(block) + return h.hexdigest().lower() + + +def upload_s3_file( + endpoint: str, + access_key: str, + secret_key: str, + bucket: str, + key: str, + file_path: str, +) -> None: + client = boto3.client( + "s3", + endpoint_url=endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + with open(file_path, "rb") as body: + client.upload_fileobj(body, bucket, key) + + +def register_resource( + admin_base_url: str, + token: str, + payload: Dict[str, Any], +) -> None: + path = os.environ.get("ADMIN_API_PUBLISH_PATH", _DEFAULT_PUBLISH_PATH).lstrip("/") + base = admin_base_url.rstrip("/") + url = f"{base}/{path}" + resp = requests.post( + url, + headers={"Authorization": f"Bearer {token}"}, + json=payload, + timeout=120, + ) + resp.raise_for_status() + + +def publish( + file_path: str, + resource_name: str, + dev_stage: str, + architecture: str, + version: str, +) -> Dict[str, Any]: + endpoint = _require_env("S3_ENDPOINT") + access_key = _require_env("S3_ACCESS_KEY") + secret_key = _require_env("S3_SECRET_KEY") + bucket = _require_env("S3_BUCKET") + admin_url = _require_env("ADMIN_API_URL") + admin_token = _require_env("ADMIN_API_TOKEN") + + key = object_key(dev_stage, resource_name, architecture, version) + aes_key = secrets.token_bytes(32) + encryption_key_hex = aes_key.hex() + + gz_path = tempfile.NamedTemporaryFile(delete=False, suffix=".gz").name + enc_path = tempfile.NamedTemporaryFile(delete=False, suffix=".enc").name + try: + gzip_file(file_path, gz_path) + encrypt_aes256_cbc_file(gz_path, enc_path, aes_key) + digest = sha256_file(enc_path) + size_bytes = os.path.getsize(enc_path) + upload_s3_file(endpoint, access_key, secret_key, bucket, key, enc_path) + cdn_url = build_cdn_url(endpoint, bucket, key) + body = { + "resource_name": resource_name, + "dev_stage": dev_stage, + "architecture": architecture, + "version": version, + "cdn_url": cdn_url, + "sha256": digest, + "encryption_key": encryption_key_hex, + "size_bytes": size_bytes, + } + register_resource(admin_url, admin_token, body) + return { + "object_key": key, + "cdn_url": cdn_url, + "sha256": digest, + "encryption_key_hex": encryption_key_hex, + "size_bytes": size_bytes, + } + finally: + for p in (gz_path, enc_path): + try: + os.unlink(p) + except OSError: + pass + + +def parse_args(argv: List[str]) -> argparse.Namespace: + p = argparse.ArgumentParser(description="Compress, encrypt, upload artifact and register resource") + p.add_argument("--file", required=True, help="Path to file to publish") + p.add_argument("--resource-name", required=True) + p.add_argument("--dev-stage", required=True) + p.add_argument("--architecture", required=True) + p.add_argument("--version", required=True) + return p.parse_args(argv) + + +def main(argv: Optional[List[str]] = None) -> int: + args = parse_args(argv if argv is not None else sys.argv[1:]) + try: + publish( + args.file, + args.resource_name, + args.dev_stage, + args.architecture, + args.version, + ) + return 0 + except Exception: + logger.exception("publish failed") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/main.py b/src/main.py index c87e5eb..23f4c52 100644 --- a/src/main.py +++ b/src/main.py @@ -11,6 +11,15 @@ from security_provider import create_security_provider app = FastAPI(title="Azaion.Loader") + +@app.on_event("startup") +def _startup_update_manager(): + try: + from update_manager import maybe_start_update_background + except Exception: + return + maybe_start_update_background(get_api_client, RESOURCE_API_URL) + security_provider = create_security_provider() RESOURCE_API_URL = os.environ.get("RESOURCE_API_URL", "https://api.azaion.com") diff --git a/src/update_manager.py b/src/update_manager.py new file mode 100644 index 0000000..35fe460 --- /dev/null +++ b/src/update_manager.py @@ -0,0 +1,266 @@ +import hashlib +import json +import os +import subprocess +import threading +from typing import Any, Callable, Dict, List, Optional + +import requests +from loguru import logger + +from download_manager import ResumableDownloadManager +from version_collector import VersionCollector + + +def _aes_key_from_encryption_field(encryption_key: Any) -> bytes: + if isinstance(encryption_key, bytes): + if len(encryption_key) == 32: + return encryption_key + raise ValueError("invalid encryption key") + s = str(encryption_key).strip() + if len(s) == 64 and all(c in "0123456789abcdefABCDEF" for c in s): + return bytes.fromhex(s) + return hashlib.sha256(s.encode("utf-8")).digest() + + +def _sort_services_loader_last(services: List[str]) -> List[str]: + head = sorted(s for s in services if s != "loader") + tail = [s for s in services if s == "loader"] + return head + tail + + +def _sort_updates_loader_last(updates: List[dict]) -> List[dict]: + rest = [u for u in updates if u.get("resourceName") != "loader"] + rest.sort(key=lambda u: str(u.get("resourceName", ""))) + loader = [u for u in updates if u.get("resourceName") == "loader"] + return rest + loader + + +class UpdateManager: + def __init__( + self, + api_url: str, + get_token: Callable[[], Optional[str]], + download_manager: ResumableDownloadManager, + version_collector: VersionCollector, + compose_file: str, + model_dir: str, + state_path: str, + interval_seconds: float = 300.0, + *, + subprocess_run: Optional[Callable] = None, + post_get_update: Optional[Callable[..., Any]] = None, + head_content_length: Optional[Callable[..., int]] = None, + stop_event: Optional[threading.Event] = None, + wait_fn: Optional[Callable[[float], bool]] = None, + ) -> None: + self._api_url = api_url.rstrip("/") + self._get_token = get_token + self._download_manager = download_manager + self._version_collector = version_collector + self._compose_file = compose_file + self._model_dir = model_dir + self._state_path = state_path + self._interval = interval_seconds + self._subprocess_run = subprocess_run or subprocess.run + self._post_get_update = post_get_update or self._default_post_get_update + self._head_content_length = head_content_length or self._default_head_content_length + self._stop_event = stop_event or threading.Event() + self._wait_fn = wait_fn + + def _default_post_get_update(self, token: str, body: dict) -> Any: + url = f"{self._api_url}/get-update" + resp = requests.post( + url, + json=body, + headers={"Authorization": f"Bearer {token}"}, + timeout=120, + ) + resp.raise_for_status() + return resp.json() + + def _default_head_content_length(self, url: str, token: str) -> int: + headers = {} + if token: + headers["Authorization"] = f"Bearer {token}" + resp = requests.head(url, headers=headers, allow_redirects=True, timeout=120) + resp.raise_for_status() + cl = resp.headers.get("Content-Length") + if not cl: + raise ValueError("missing Content-Length") + return int(cl) + + def _load_state(self) -> dict: + if not os.path.isfile(self._state_path): + return {"pending_compose": []} + with open(self._state_path, encoding="utf-8") as f: + data = json.load(f) + if "pending_compose" not in data: + data["pending_compose"] = [] + return data + + def _save_state(self, data: dict) -> None: + directory = os.path.dirname(self._state_path) + if directory: + os.makedirs(directory, exist_ok=True) + tmp = self._state_path + ".tmp" + with open(tmp, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, sort_keys=True) + os.replace(tmp, self._state_path) + + def _drain_pending_compose(self) -> None: + state = self._load_state() + pending = list(dict.fromkeys(state.get("pending_compose") or [])) + if not pending: + return + for svc in _sort_services_loader_last(pending): + self._subprocess_run( + ["docker", "compose", "-f", self._compose_file, "up", "-d", svc], + check=True, + ) + state["pending_compose"] = [] + self._save_state(state) + + def _current_versions_payload(self) -> Dict[str, str]: + rows = self._version_collector.collect() + return {r.resource_name: r.version for r in rows} + + def _build_get_update_body(self) -> dict: + return { + "dev_stage": os.environ.get("LOADER_DEV_STAGE", ""), + "architecture": os.environ.get("LOADER_ARCH", ""), + "current_versions": self._current_versions_payload(), + } + + def _artifact_size(self, url: str, token: str) -> int: + return self._head_content_length(url, token) + + def _apply_model(self, item: dict, token: str) -> None: + name = str(item["resourceName"]) + version = str(item["version"]) + url = str(item["cdnUrl"]) + sha256 = str(item["sha256"]) + key = _aes_key_from_encryption_field(item["encryptionKey"]) + size = self._artifact_size(url, token) + job_id = f"update-{name}-{version}" + os.makedirs(self._model_dir, exist_ok=True) + out_path = os.path.join(self._model_dir, f"azaion-{version}.trt") + self._download_manager.fetch_decrypt_verify( + job_id, + url, + sha256, + size, + key, + out_path, + ) + self._version_collector.invalidate() + + def _mark_pending_compose(self, service: str) -> None: + state = self._load_state() + pending = list(state.get("pending_compose") or []) + if service not in pending: + pending.append(service) + state["pending_compose"] = pending + self._save_state(state) + + def _clear_pending_compose(self, service: str) -> None: + state = self._load_state() + pending = [s for s in (state.get("pending_compose") or []) if s != service] + state["pending_compose"] = pending + self._save_state(state) + + def _apply_docker_image(self, item: dict, token: str) -> None: + name = str(item["resourceName"]) + version = str(item["version"]) + url = str(item["cdnUrl"]) + sha256 = str(item["sha256"]) + key = _aes_key_from_encryption_field(item["encryptionKey"]) + size = self._artifact_size(url, token) + job_id = f"update-{name}-{version}" + artifact_dir = os.path.dirname(self._state_path) + os.makedirs(artifact_dir, exist_ok=True) + out_tar = os.path.join(artifact_dir, f"{job_id}.plaintext.tar") + self._download_manager.fetch_decrypt_verify( + job_id, + url, + sha256, + size, + key, + out_tar, + ) + self._subprocess_run(["docker", "load", "-i", out_tar], check=True) + self._version_collector.invalidate() + self._mark_pending_compose(name) + self._subprocess_run( + ["docker", "compose", "-f", self._compose_file, "up", "-d", name], + check=True, + ) + self._clear_pending_compose(name) + + def _tick_once(self) -> None: + token = self._get_token() + if not token: + return + self._drain_pending_compose() + body = self._build_get_update_body() + updates = self._post_get_update(token, body) + if not isinstance(updates, list): + return + for item in _sort_updates_loader_last(updates): + rname = str(item.get("resourceName", "")) + if rname == "detection_model": + self._apply_model(item, token) + else: + self._apply_docker_image(item, token) + + def run_forever(self) -> None: + while not self._stop_event.is_set(): + try: + self._drain_pending_compose() + self._tick_once() + except Exception as exc: + logger.exception("update manager tick failed: {}", exc) + if self._wait_fn is not None: + if self._wait_fn(self._interval): + break + elif self._stop_event.wait(self._interval): + break + + +def maybe_start_update_background( + get_api_client: Callable[[], Any], + api_url: str, +) -> None: + state_dir = os.environ.get("LOADER_DOWNLOAD_STATE_DIR") + if not state_dir: + return + model_dir = os.environ.get("LOADER_MODEL_DIR", "models") + compose_file = os.environ.get("LOADER_COMPOSE_FILE", "docker-compose.yml") + interval = float(os.environ.get("LOADER_UPDATE_INTERVAL_SEC", "300")) + orchestrator_path = os.environ.get( + "LOADER_UPDATE_STATE_PATH", + os.path.join(state_dir, "update_orchestrator.json"), + ) + + def token_getter() -> Optional[str]: + client = get_api_client() + return getattr(client, "token", None) + + try: + dm = ResumableDownloadManager(state_dir) + vc = VersionCollector(model_dir) + um = UpdateManager( + api_url, + token_getter, + dm, + vc, + compose_file, + model_dir, + orchestrator_path, + interval_seconds=interval, + ) + except Exception as exc: + logger.exception("update manager failed to start: {}", exc) + return + + threading.Thread(target=um.run_forever, name="loader-updates", daemon=True).start() diff --git a/src/version_collector.py b/src/version_collector.py new file mode 100644 index 0000000..79f0690 --- /dev/null +++ b/src/version_collector.py @@ -0,0 +1,91 @@ +import os +import re +import subprocess +from dataclasses import asdict, dataclass +from typing import Callable, List, Optional + +TRT_DATE_PATTERN = re.compile(r"^azaion-(\d{4}-\d{2}-\d{2})\.trt$", re.IGNORECASE) + + +@dataclass(frozen=True) +class ResourceVersion: + resource_name: str + version: str + + +class VersionCollector: + def __init__( + self, + model_dir: str, + *, + subprocess_run: Optional[Callable] = None, + ) -> None: + self._model_dir = model_dir + self._subprocess_run = subprocess_run or subprocess.run + self._cache: Optional[List[ResourceVersion]] = None + + def invalidate(self) -> None: + self._cache = None + + def collect(self) -> List[ResourceVersion]: + if self._cache is not None: + return list(self._cache) + rows = self._collect_uncached() + self._cache = rows + return list(rows) + + def collect_as_dicts(self) -> List[dict]: + return [asdict(r) for r in self.collect()] + + def _collect_uncached(self) -> List[ResourceVersion]: + out: List[ResourceVersion] = [] + mv = self._best_trt_version() + if mv is not None: + out.append(ResourceVersion("detection_model", mv)) + out.extend(self._docker_versions()) + rest = [r for r in out if r.resource_name != "detection_model"] + rest.sort(key=lambda r: r.resource_name) + if mv is not None: + return [ResourceVersion("detection_model", mv)] + rest + return rest + + def _best_trt_version(self) -> Optional[str]: + if not os.path.isdir(self._model_dir): + return None + best: Optional[str] = None + for name in os.listdir(self._model_dir): + m = TRT_DATE_PATTERN.match(name) + if not m: + continue + v = m.group(1) + if best is None or v > best: + best = v + return best + + def _docker_versions(self) -> List[ResourceVersion]: + try: + result = self._subprocess_run( + ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"], + capture_output=True, + text=True, + check=True, + ) + except (OSError, subprocess.CalledProcessError): + return [] + found: List[ResourceVersion] = [] + for line in result.stdout.splitlines(): + line = line.strip() + if not line or ":" in line: + continue + if not line.startswith("azaion/"): + continue + if ":" not in line: + continue + repo, tag = line.rsplit(":", 1) + if tag in ("", ""): + continue + parts = repo.split("/", 1) + if len(parts) < 2: + continue + found.append(ResourceVersion(parts[1], tag)) + return found diff --git a/tests/test_publish_artifact.py b/tests/test_publish_artifact.py new file mode 100644 index 0000000..d9bd8e4 --- /dev/null +++ b/tests/test_publish_artifact.py @@ -0,0 +1,331 @@ +import gzip +import importlib.util +import io +import os +import subprocess +import sys +import tempfile +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +import yaml + +from download_manager import decrypt_cbc_file + +_ROOT = Path(__file__).resolve().parents[1] +_SCRIPT = _ROOT / "scripts" / "publish_artifact.py" +_WOODPECKER = _ROOT / ".woodpecker" / "build-arm.yml" + + +def _load_publish(): + spec = importlib.util.spec_from_file_location("publish_artifact", _SCRIPT) + mod = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(mod) + return mod + + +def _s3_client_factory(storage): + def client(service_name, **kwargs): + if service_name != "s3": + raise AssertionError(service_name) + m = MagicMock() + + def upload_fileobj(body, bucket, key): + storage.setdefault(bucket, {})[key] = body.read() + + m.upload_fileobj.side_effect = upload_fileobj + + def get_object(Bucket=None, Key=None): + return {"Body": io.BytesIO(storage[Bucket][Key])} + + m.get_object.side_effect = get_object + return m + + return client + + +class TestPublishArtifact(unittest.TestCase): + def setUp(self): + self._env_patch = None + + def tearDown(self): + if self._env_patch: + self._env_patch.stop() + + def _base_env(self): + return { + "S3_ENDPOINT": "https://s3.example.test", + "S3_ACCESS_KEY": "ak", + "S3_SECRET_KEY": "sk", + "S3_BUCKET": "test-bucket", + "ADMIN_API_URL": "https://admin.example.test", + "ADMIN_API_TOKEN": "token", + } + + def test_ac1_end_to_end_publish(self): + # Arrange + mod = _load_publish() + env = self._base_env() + self._env_patch = patch.dict(os.environ, env, clear=False) + self._env_patch.start() + captured = {} + storage = {} + + def fake_post(url, headers=None, json=None, timeout=None): + class R: + status_code = 200 + + def raise_for_status(self): + pass + + captured["url"] = url + captured["body"] = json + return R() + + fd, src = tempfile.mkstemp() + os.close(fd) + try: + with open(src, "wb") as f: + f.write(b"artifact-bytes") + with patch.object( + mod.boto3, "client", side_effect=_s3_client_factory(storage) + ), patch.object(mod.requests, "post", side_effect=fake_post): + # Act + out = mod.publish( + src, + "loader", + "dev", + "arm64", + "v1", + ) + # Assert + self.assertEqual( + out["object_key"], + "dev/loader-arm64-v1.enc", + ) + key = out["object_key"] + body = storage["test-bucket"][key] + h = __import__("hashlib").sha256(body).hexdigest().lower() + self.assertEqual(h, out["sha256"]) + self.assertEqual(captured["body"]["sha256"], out["sha256"]) + self.assertEqual(captured["body"]["size_bytes"], len(body)) + self.assertEqual(captured["body"]["encryption_key"], out["encryption_key_hex"]) + self.assertEqual(captured["body"]["cdn_url"], out["cdn_url"]) + finally: + os.unlink(src) + + def test_ac2_woodpecker_publish_step_after_build(self): + # Arrange + raw = _WOODPECKER.read_text(encoding="utf-8") + # Act + doc = yaml.safe_load(raw) + names = [s["name"] for s in doc["steps"]] + # Assert + self.assertIn("build-push", names) + self.assertIn("publish-artifact", names) + self.assertLess(names.index("build-push"), names.index("publish-artifact")) + build_cmds = "\n".join(doc["steps"][names.index("build-push")]["commands"]) + self.assertIn("docker save", build_cmds) + pub_cmds = "\n".join(doc["steps"][names.index("publish-artifact")]["commands"]) + self.assertIn("publish_artifact.py", pub_cmds) + self.assertIn("loader-image.tar", pub_cmds) + + def test_ac3_unique_key_per_publish(self): + # Arrange + mod = _load_publish() + self._env_patch = patch.dict(os.environ, self._base_env(), clear=False) + self._env_patch.start() + keys = [] + storage = {} + + def capture_post(url, headers=None, json=None, timeout=None): + keys.append(json["encryption_key"]) + + class R: + status_code = 200 + + def raise_for_status(self): + pass + + return R() + + fd, src = tempfile.mkstemp() + os.close(fd) + try: + with open(src, "wb") as f: + f.write(b"x") + with patch.object( + mod.boto3, "client", side_effect=_s3_client_factory(storage) + ), patch.object(mod.requests, "post", side_effect=capture_post): + # Act + mod.publish(src, "r", "dev", "arm64", "1") + mod.publish(src, "r", "dev", "arm64", "2") + # Assert + self.assertEqual(len(keys), 2) + self.assertNotEqual(keys[0], keys[1]) + self.assertEqual(len(bytes.fromhex(keys[0])), 32) + self.assertEqual(len(bytes.fromhex(keys[1])), 32) + finally: + os.unlink(src) + + def test_ac4_sha256_matches_s3_object_and_registration(self): + # Arrange + mod = _load_publish() + self._env_patch = patch.dict(os.environ, self._base_env(), clear=False) + self._env_patch.start() + posted = {} + storage = {} + + def fake_post(url, headers=None, json=None, timeout=None): + posted.update(json) + + class R: + status_code = 200 + + def raise_for_status(self): + pass + + return R() + + fd, src = tempfile.mkstemp() + os.close(fd) + try: + with open(src, "wb") as f: + f.write(b"payload-for-hash") + with patch.object( + mod.boto3, "client", side_effect=_s3_client_factory(storage) + ), patch.object(mod.requests, "post", side_effect=fake_post): + # Act + out = mod.publish(src, "m", "stage", "arm64", "9.9.9") + key = out["object_key"] + body = storage["test-bucket"][key] + expect = __import__("hashlib").sha256(body).hexdigest().lower() + # Assert + self.assertEqual(posted["sha256"], expect) + self.assertEqual(out["sha256"], expect) + finally: + os.unlink(src) + + def test_ac5_main_entry_matches_cli_invocation(self): + # Arrange + mod = _load_publish() + self._env_patch = patch.dict(os.environ, self._base_env(), clear=False) + self._env_patch.start() + storage = {} + + def ok_post(url, headers=None, json=None, timeout=None): + class R: + status_code = 200 + + def raise_for_status(self): + pass + + return R() + + fd, src = tempfile.mkstemp() + os.close(fd) + try: + with open(src, "wb") as f: + f.write(b"cli-data") + with patch.object( + mod.boto3, "client", side_effect=_s3_client_factory(storage) + ), patch.object(mod.requests, "post", side_effect=ok_post): + # Act + code = mod.main( + [ + "--file", + src, + "--resource-name", + "model", + "--dev-stage", + "dev", + "--architecture", + "arm64", + "--version", + "0.0.1", + ] + ) + # Assert + self.assertEqual(code, 0) + self.assertGreater( + len(storage["test-bucket"]["dev/model-arm64-0.0.1.enc"]), 0 + ) + finally: + os.unlink(src) + + def test_ac5_cli_help_exits_zero(self): + # Act + r = subprocess.run( + [sys.executable, str(_SCRIPT), "--help"], + cwd=str(_ROOT), + capture_output=True, + text=True, + ) + # Assert + self.assertEqual(r.returncode, 0) + self.assertIn("--resource-name", r.stdout) + + def test_ac5_subprocess_script_missing_env_exits_nonzero(self): + # Arrange + fd, path = tempfile.mkstemp() + os.close(fd) + try: + minimal_env = { + k: v + for k, v in os.environ.items() + if k in ("PATH", "HOME", "TMPDIR", "SYSTEMROOT") + } + # Act + r = subprocess.run( + [ + sys.executable, + str(_SCRIPT), + "--file", + path, + "--resource-name", + "x", + "--dev-stage", + "d", + "--architecture", + "arm64", + "--version", + "1", + ], + cwd=str(_ROOT), + env=minimal_env, + capture_output=True, + text=True, + ) + # Assert + self.assertNotEqual(r.returncode, 0) + finally: + os.unlink(path) + + def test_encryption_compatible_with_decrypt_cbc_file(self): + # Arrange + mod = _load_publish() + aes_key = os.urandom(32) + fd, plain = tempfile.mkstemp() + os.close(fd) + gz_path = tempfile.NamedTemporaryFile(delete=False, suffix=".gz").name + enc_path = tempfile.NamedTemporaryFile(delete=False, suffix=".enc").name + dec_path = tempfile.NamedTemporaryFile(delete=False, suffix=".bin").name + try: + with open(plain, "wb") as f: + f.write(b"round-trip-plain") + mod.gzip_file(plain, gz_path) + mod.encrypt_aes256_cbc_file(gz_path, enc_path, aes_key) + # Act + decrypt_cbc_file(enc_path, aes_key, dec_path) + with open(dec_path, "rb") as f: + restored = gzip.decompress(f.read()) + # Assert + self.assertEqual(restored, b"round-trip-plain") + finally: + for p in (plain, gz_path, enc_path, dec_path): + try: + os.unlink(p) + except OSError: + pass diff --git a/tests/test_update_manager.py b/tests/test_update_manager.py new file mode 100644 index 0000000..c40ad15 --- /dev/null +++ b/tests/test_update_manager.py @@ -0,0 +1,351 @@ +import json +import os +import subprocess +import tempfile +import unittest +from typing import List +from unittest.mock import MagicMock + +from download_manager import ResumableDownloadManager +from update_manager import UpdateManager, maybe_start_update_background +from version_collector import VersionCollector + + +class TestUpdateManager(unittest.TestCase): + def _make_manager( + self, + tmp: str, + *, + post_get_update=None, + subprocess_run=None, + head_content_length=None, + wait_fn=None, + stop_event=None, + ): + dm_dir = os.path.join(tmp, "dm") + model_dir = os.path.join(tmp, "models") + state_path = os.path.join(dm_dir, "update_orchestrator.json") + os.makedirs(model_dir, exist_ok=True) + dm = ResumableDownloadManager(dm_dir) + vc = VersionCollector(model_dir, subprocess_run=subprocess_run or MagicMock()) + um = UpdateManager( + "http://api.test", + lambda: "tok", + dm, + vc, + os.path.join(tmp, "compose.yml"), + model_dir, + state_path, + interval_seconds=300.0, + subprocess_run=subprocess_run, + post_get_update=post_get_update, + head_content_length=head_content_length, + wait_fn=wait_fn, + stop_event=stop_event, + ) + return um, dm, vc + + def test_ac2_background_loop_polls_on_schedule(self): + # Arrange + tmp = tempfile.mkdtemp() + posts: List[dict] = [] + + def post(token, body): + posts.append({"token": token, "body": body}) + return [] + + waits: List[float] = [] + + def wait_fn(interval): + waits.append(interval) + return len(waits) >= 2 + + def fake_run(cmd, **kwargs): + if cmd[:3] == ["docker", "images", "--format"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + raise AssertionError(cmd) + + um, _, _ = self._make_manager( + tmp, + post_get_update=post, + subprocess_run=fake_run, + head_content_length=lambda url, token: 1, + wait_fn=wait_fn, + ) + # Act + um.run_forever() + # Assert + self.assertEqual(len(posts), 2) + self.assertEqual(waits, [300.0, 300.0]) + + def test_ac2_default_interval_is_five_minutes(self): + # Arrange / Act + tmp = tempfile.mkdtemp() + dm_dir = os.path.join(tmp, "dm") + model_dir = os.path.join(tmp, "m") + os.makedirs(model_dir, exist_ok=True) + dm = ResumableDownloadManager(dm_dir) + vc = VersionCollector(model_dir, subprocess_run=MagicMock()) + um = UpdateManager( + "http://x", + lambda: None, + dm, + vc, + "c.yml", + model_dir, + os.path.join(dm_dir, "st.json"), + ) + # Assert + self.assertEqual(um._interval, 300.0) + + def test_ac3_ai_model_update_applied(self): + # Arrange + tmp = tempfile.mkdtemp() + model_dir = os.path.join(tmp, "models") + os.makedirs(model_dir, exist_ok=True) + + def fake_run(cmd, **kwargs): + if cmd[:3] == ["docker", "images", "--format"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + raise AssertionError(cmd) + + dm_mock = MagicMock() + + def post(token, body): + return [ + { + "resourceName": "detection_model", + "version": "2026-04-20", + "cdnUrl": "http://cdn/x", + "sha256": "ab", + "encryptionKey": "k", + } + ] + + um, _, _ = self._make_manager( + tmp, + post_get_update=post, + subprocess_run=fake_run, + head_content_length=lambda url, token: 4, + ) + um._download_manager = dm_mock + + def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path): + with open(output_plaintext_path, "wb") as f: + f.write(b"trt") + + dm_mock.fetch_decrypt_verify.side_effect = capture_fetch + # Act + um._tick_once() + # Assert + dm_mock.fetch_decrypt_verify.assert_called_once() + args, kwargs = dm_mock.fetch_decrypt_verify.call_args + self.assertTrue(args[5].endswith("azaion-2026-04-20.trt")) + self.assertTrue(os.path.isfile(os.path.join(model_dir, "azaion-2026-04-20.trt"))) + + def test_ac4_docker_image_update_applied(self): + # Arrange + tmp = tempfile.mkdtemp() + recorded: List[List[str]] = [] + + def fake_run(cmd, **kwargs): + recorded.append(list(cmd)) + if cmd[:3] == ["docker", "images", "--format"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + if cmd[:3] == ["docker", "load", "-i"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + if cmd[:2] == ["docker", "compose"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + raise AssertionError(cmd) + + dm_mock = MagicMock() + + def post(token, body): + return [ + { + "resourceName": "annotations", + "version": "2026-04-13", + "cdnUrl": "http://cdn/a", + "sha256": "cd", + "encryptionKey": "k", + } + ] + + um, _, _ = self._make_manager( + tmp, + post_get_update=post, + subprocess_run=fake_run, + head_content_length=lambda url, token: 8, + ) + um._download_manager = dm_mock + + def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path): + with open(output_plaintext_path, "wb") as f: + f.write(b"tarbytes") + + dm_mock.fetch_decrypt_verify.side_effect = capture_fetch + # Act + um._tick_once() + # Assert + loads = [c for c in recorded if c[:3] == ["docker", "load", "-i"]] + composes = [c for c in recorded if c[:2] == ["docker", "compose"]] + self.assertEqual(len(loads), 1) + self.assertEqual(len(composes), 1) + self.assertIn("annotations", composes[0]) + + def test_ac5_self_update_applied_last(self): + # Arrange + tmp = tempfile.mkdtemp() + recorded: List[str] = [] + + def fake_run(cmd, **kwargs): + if cmd[:3] == ["docker", "images", "--format"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + if cmd[:3] == ["docker", "load", "-i"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + if cmd[:2] == ["docker", "compose"]: + recorded.append(cmd[-1]) + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + raise AssertionError(cmd) + + dm_mock = MagicMock() + + def post(token, body): + return [ + { + "resourceName": "loader", + "version": "v2", + "cdnUrl": "http://cdn/l", + "sha256": "00", + "encryptionKey": "k", + }, + { + "resourceName": "annotations", + "version": "v1", + "cdnUrl": "http://cdn/a", + "sha256": "11", + "encryptionKey": "k", + }, + ] + + um, _, _ = self._make_manager( + tmp, + post_get_update=post, + subprocess_run=fake_run, + head_content_length=lambda url, token: 1, + ) + um._download_manager = dm_mock + + def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path): + with open(output_plaintext_path, "wb") as f: + f.write(b"x") + + dm_mock.fetch_decrypt_verify.side_effect = capture_fetch + # Act + um._tick_once() + # Assert + self.assertEqual(recorded, ["annotations", "loader"]) + + def test_ac6_invalidate_after_docker_apply(self): + # Arrange + tmp = tempfile.mkdtemp() + + def fake_run(cmd, **kwargs): + if cmd[:3] == ["docker", "images", "--format"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + if cmd[:3] == ["docker", "load", "-i"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + if cmd[:2] == ["docker", "compose"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + raise AssertionError(cmd) + + dm_mock = MagicMock() + + def post(token, body): + return [ + { + "resourceName": "annotations", + "version": "v9", + "cdnUrl": "http://cdn/a", + "sha256": "11", + "encryptionKey": "k", + } + ] + + um, _, vc = self._make_manager( + tmp, + post_get_update=post, + subprocess_run=fake_run, + head_content_length=lambda url, token: 1, + ) + um._download_manager = dm_mock + + def capture_fetch(job_id, url, sha256, size, decryption_key, output_plaintext_path): + with open(output_plaintext_path, "wb") as f: + f.write(b"x") + + dm_mock.fetch_decrypt_verify.side_effect = capture_fetch + vc.collect() + self.assertIsNotNone(vc._cache) + # Act + um._tick_once() + # Assert + self.assertIsNone(vc._cache) + + def test_maybe_start_skips_without_download_state_dir(self): + # Arrange + old = os.environ.pop("LOADER_DOWNLOAD_STATE_DIR", None) + try: + + def get_client(): + return MagicMock() + + # Act + maybe_start_update_background(get_client, "http://x") + finally: + if old is not None: + os.environ["LOADER_DOWNLOAD_STATE_DIR"] = old + + def test_pending_compose_drained_on_startup(self): + # Arrange + tmp = tempfile.mkdtemp() + dm_dir = os.path.join(tmp, "dm") + os.makedirs(dm_dir, exist_ok=True) + state_path = os.path.join(dm_dir, "update_orchestrator.json") + with open(state_path, "w", encoding="utf-8") as f: + json.dump({"pending_compose": ["annotations", "loader"]}, f) + recorded: List[str] = [] + + def fake_run(cmd, **kwargs): + if cmd[:3] == ["docker", "images", "--format"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + if cmd[:2] == ["docker", "compose"]: + recorded.append(cmd[-1]) + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + raise AssertionError(cmd) + + model_dir = os.path.join(tmp, "m") + os.makedirs(model_dir, exist_ok=True) + dm = ResumableDownloadManager(dm_dir) + vc = VersionCollector(model_dir, subprocess_run=fake_run) + um = UpdateManager( + "http://api.test", + lambda: None, + dm, + vc, + os.path.join(tmp, "compose.yml"), + model_dir, + state_path, + subprocess_run=fake_run, + ) + # Act + um._drain_pending_compose() + # Assert + self.assertEqual(recorded, ["annotations", "loader"]) + with open(state_path, encoding="utf-8") as f: + data = json.load(f) + self.assertEqual(data.get("pending_compose"), []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_version_collector.py b/tests/test_version_collector.py new file mode 100644 index 0000000..53026c6 --- /dev/null +++ b/tests/test_version_collector.py @@ -0,0 +1,65 @@ +import os +import subprocess +import tempfile +import unittest +from version_collector import VersionCollector + + +class TestVersionCollector(unittest.TestCase): + def test_ac1_version_collector_reads_local_state(self): + # Arrange + tmp = tempfile.mkdtemp() + open(os.path.join(tmp, "azaion-2026-03-10.trt"), "wb").close() + + def fake_run(cmd, **kwargs): + if cmd[:3] == ["docker", "images", "--format"]: + return subprocess.CompletedProcess( + cmd, + 0, + stdout="azaion/annotations:arm64_2026-03-01\n", + stderr="", + ) + raise AssertionError(f"unexpected cmd {cmd}") + + vc = VersionCollector(tmp, subprocess_run=fake_run) + # Act + got = vc.collect_as_dicts() + # Assert + self.assertEqual( + got, + [ + {"resource_name": "detection_model", "version": "2026-03-10"}, + {"resource_name": "annotations", "version": "arm64_2026-03-01"}, + ], + ) + + def test_ac6_cache_invalidates_after_changes(self): + # Arrange + tmp = tempfile.mkdtemp() + open(os.path.join(tmp, "azaion-2026-01-01.trt"), "wb").close() + + def fake_run(cmd, **kwargs): + if cmd[:3] == ["docker", "images", "--format"]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + raise AssertionError(f"unexpected cmd {cmd}") + + vc = VersionCollector(tmp, subprocess_run=fake_run) + first = vc.collect_as_dicts() + open(os.path.join(tmp, "azaion-2026-02-01.trt"), "wb").close() + second_cached = vc.collect_as_dicts() + vc.invalidate() + third = vc.collect_as_dicts() + # Assert + self.assertEqual( + first, + [{"resource_name": "detection_model", "version": "2026-01-01"}], + ) + self.assertEqual(second_cached, first) + self.assertEqual( + third, + [{"resource_name": "detection_model", "version": "2026-02-01"}], + ) + + +if __name__ == "__main__": + unittest.main()