2 Commits

Author SHA1 Message Date
Oleksandr Bezdieniezhnykh 90f4ac78f4 [AZ-316] Implement C11 HttpTileDownloader (batch 40)
Lands the operator-side pre-flight download path: authenticated
httpx GETs against satellite-provider, RESTRICT-SAT-4 (>= 0.5 m/px)
enforcement at the C11 boundary, c6 writes via consumer-side cuts
(_TileWriterLike, _BudgetEnforcerLike), per-(flight_id, request_hash)
journal under cache_root/.c11/journal/ for idempotent re-runs (AC-8,
AC-12), 429 Retry-After + 5xx exponential backoff handling, fail-fast
on TLS / 401 / 403, and a redacted-bearer auth-header policy.

Architecture:
- AZ-507 cross-component rule held: tile_downloader.py imports zero
  c6 symbols; the composition-root _C6DownloadAdapter in
  runtime_root/c11_factory.py absorbs c6's TileMetadata / TileSource /
  FreshnessLabel / VotingStatus enum assembly.
- Sleep-callable injection (not full Clock) per Batch 39 precedent;
  default routes through WallClock.sleep_until_ns to keep the AZ-398
  invariant intact.
- No FDR records on the download path; spec mandates structured logs
  only (8 log kinds wired: session.start/end, resolution_rejected,
  freshness_rejected_summary, freshness_downgraded, batch.retry,
  provider.failed, budget.exceeded, idempotent_no_op).

Tests: 14 new downloader unit tests covering AC-1..AC-9, AC-11, AC-12
plus throughput NFR + 429 HTTP-date + 429 budget exhaustion; 2 new
TileDownloader Protocol conformance tests (AC-10). Full unit suite:
1420 passed, 80 skipped (env-gated), 0 failed.

Code review: PASS_WITH_WARNINGS (5 Low findings, all documentation
or downstream-blocked). See _docs/03_implementation/reviews/
batch_40_review.md and batch_40_cycle1_report.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-13 07:01:14 +03:00
Oleksandr Bezdieniezhnykh 3a61a4f5bf chore: cumulative review batches 37-39 (PASS_WITH_WARNINGS)
Captures the C11 operator-side trio landing (AZ-317/318/319) plus the
C10 build-orchestrator close-out (AZ-325) and the AZ-515 canonical-hash
extraction. Three Low findings, all documentation-level drift between
spec text and as-built code; none block Batch 40. Resolves prior F1
(AZ-515 closed the verifier-into-builder private import).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-13 06:40:09 +03:00
14 changed files with 2629 additions and 63 deletions
@@ -0,0 +1,216 @@
# Batch 40 — Cycle 1 Report
**Date**: 2026-05-13
**Batch**: 40 (single-task batch — C11 download orchestrator)
**Tasks**:
- AZ-316 (C11 TileDownloader, 5pt)
**Total complexity**: 5pt
**Status**: complete; pending transition to "In Testing".
## Scope
Batch 40 lands the production `HttpTileDownloader` — the operator-side
pre-flight path that completes the C11 contract surface (gate + signing
key + uploader were Batches 38/39). It composes consumer-side cuts
over c6's `TileStore` / `TileMetadataStore` / `CacheBudgetEnforcer`
into a single class that:
1. Computes a deterministic `request_hash` over `(flight_id, bbox,
zoom_levels, sector_class, sha256(api_key))` and uses it as the
per-batch journal filename suffix
2. Reads the per-`(flight_id, request_hash)` journal at
`cache_root/.c11/journal/<flight_id>__<hash>.json`. If a complete
prior run exists, returns `outcome = idempotent_no_op` immediately
(zero GETs, zero writes — AC-8)
3. Otherwise resumes from the journal (skipping previously-completed
tile ids — AC-12)
4. Issues `GET /api/satellite/tiles?…&list-only=true` against
`satellite-provider` to enumerate the bbox × zoom-level grid and
build a `list[TileSummary]` with per-tile `produced_at` /
`resolution_m_per_px` / `estimated_bytes`
5. Pre-checks cache headroom via the consumer-side cut over c6's
`CacheBudgetEnforcer.reserve_headroom`. On insufficient budget,
wraps c6's `CacheBudgetExhaustedError` into the C11-local
`CacheBudgetExceededError` and aborts before any GET fires (AC-9)
6. Per tile:
- Resolution gate at C11 boundary — if `resolution_m_per_px <
0.5`, increments `tiles_rejected_resolution`, emits a per-tile
WARN log, and skips the tile WITHOUT a GET (AC-2)
- Authenticated GET against the per-tile endpoint with TLS +
`Authorization: Bearer <api_key>` (header redacted in every
log path — AC-11)
- 429 honours `Retry-After` (RFC 7231 integer-seconds AND
HTTP-date forms), with a configurable cumulative wait budget;
budget exhaustion → `RateLimitedError` (AC-5 + spec Risk 1)
- 5xx exponential backoff (1s/2s/4s/8s, 4 retries by config)
→ persistent failure raises `SatelliteProviderError` (AC-6)
- 401 / 403 → fail-fast `SatelliteProviderError` on the FIRST
attempt (AC-7)
- Hands the JPEG bytes + per-tile metadata primitives to the
`_TileWriterLike` cut, which the composition-root adapter
translates into a c6 `TileMetadata` envelope before calling
`tile_store.write_tile` + `tile_metadata_store.insert_metadata`
- Catches the c6 `FreshnessRejectionError` by structural class
name match and increments `tiles_rejected_freshness` without
propagating (AC-3); a single per-batch summary WARN log
surfaces the count
- Maps the post-insert label to the `tiles_downgraded` counter
when the adapter reports `"downgraded"` (AC-4)
7. After every successful tile write, atomically rewrites the
journal (write-then-rename + `fsync` + directory `fsync`),
so a process kill at any point leaves a recoverable state
(AC-12)
8. On batch completion, stamps the journal's `completed_at_iso`
field and returns a `DownloadBatchReport` with the full per-tile
counts envelope plus the `request_hash` for caller correlation
## Architectural decisions
### AZ-507 — consumer-side cuts for c6
The task spec lists `tile_store: TileStore`,
`tile_metadata_store: TileMetadataStore`, and
`budget_enforcer: CacheBudgetEnforcer` as constructor parameters.
A direct `from gps_denied_onboard.components.c6_tile_cache import …`
would violate AZ-507 and trip the AZ-270 lint. Instead,
`tile_downloader.py` declares two local `Protocol` cuts that
duck-type the c6 surfaces it actually uses:
- `_TileWriterLike` — composition-root adapter that hides c6's
`TileMetadata` / `TileSource` / `FreshnessLabel` / `VotingStatus`
enum assembly; takes primitives (zoom/lat/lon, tile_size, capture_ts,
content sha256, sector_class) plus the JPEG bytes and returns a
string label (`"fresh"` / `"downgraded"`).
- `_BudgetEnforcerLike` — single-method cut over
`CacheBudgetEnforcer.reserve_headroom`; exception mapping happens
inside the adapter so the downloader never catches a c6 type.
The composition root (`build_tile_downloader` + the private
`_C6DownloadAdapter` class) is the single layer that may bind
concrete c6 implementations and import c6 enums. `_C6DownloadAdapter`
implements both `Protocol` cuts so the downloader sees a single
backing object.
The c6 freshness-rejection exception is recognised by class-name
match (`exc.__class__.__name__ == "FreshnessRejectionError"` plus an
MRO walk) — see `_is_freshness_rejection` — so the adapter is free to
re-raise the c6 type directly without forcing the downloader to
import the c6 errors module.
### Sleep injection vs. full Clock injection
Same rationale as Batch 39's F2 (recurring deviation): the
downloader only ever needs a sleep primitive (for 429 / 5xx backoff),
never `monotonic_ns` or `time_ns`. Implementation accepts a
`sleep: Callable[[float], None]` defaulting to a `WallClock`-routed
helper, preserving the AZ-398 invariant that `components/` never
calls `time.sleep` directly. Documented in the batch review as F5
(Low).
### Failure paths raise vs. return FAILURE
Same rationale as Batch 39's F1 (recurring deviation): the spec
prose describes `outcome = failure` as a return value for budget /
auth / persistent-5xx scenarios; the implementation raises typed
exceptions (`CacheBudgetExceededError`, `SatelliteProviderError`,
`RateLimitedError`). The exception path still flushes the journal
with `tile_counts` reflecting the partial run so the next operator
invocation resumes. Documented in the batch review as F1 (Low).
### Journal format + atomicity
Per spec Risk 3, the journal is written via the same
write-then-rename + `fsync` pattern the project already uses for the
C9 download journal. Implemented inline rather than adding the
`atomicwrites` library — checking the requirements file shows
`atomicwrites` is not in the project pin (Batch 39 follow-up
confirmed). Staying consistent with existing patterns rather than
introducing a new dependency. Torn / corrupted journals are treated
as "no prior journal" so the batch re-runs from scratch (Risk 3
mitigation).
### Logging only — no FDR records
The spec calls for INFO/WARN/ERROR structured logs (Outcome §,
"INFO log: `kind=…session.start/.end`"). Re-reading the spec end-to-end
confirms NO FDR record kinds are mandated for the download path.
Operator-side runs do not need the same audit-trail durability the
upload path requires (no per-flight signing, no parent-suite
acknowledgement to correlate). The eight log kinds wired into the
implementation cover every transition the operator-tooling CLI
needs to render a post-run summary.
## Files touched
Production:
- `src/gps_denied_onboard/components/c11_tile_manager/_types.py`
(added `SectorClassification`, `DownloadOutcome`, `TileSummary`,
`DownloadRequest`, `DownloadBatchReport`)
- `src/gps_denied_onboard/components/c11_tile_manager/errors.py`
(added `ResolutionRejectionError`, `CacheBudgetExceededError`)
- `src/gps_denied_onboard/components/c11_tile_manager/config.py`
(added 6 download-side fields: `satellite_provider_url`,
`service_api_key`, `download_http_timeout_s`,
`download_max_5xx_retries`, `download_max_retry_after_s`,
`download_resolution_floor_m_per_px`)
- `src/gps_denied_onboard/components/c11_tile_manager/interface.py`
(`TileDownloader` Protocol now has the real signature)
- `src/gps_denied_onboard/components/c11_tile_manager/tile_downloader.py`
(new — `HttpTileDownloader`, `request_hash`, `_JournalState`,
`_atomic_write_json`, `_TileWriterLike`, `_BudgetEnforcerLike`,
`_is_freshness_rejection`)
- `src/gps_denied_onboard/components/c11_tile_manager/__init__.py`
(re-exports for download-side public API)
- `src/gps_denied_onboard/runtime_root/c11_factory.py`
(added `build_tile_downloader` + private `_C6DownloadAdapter`)
Tests:
- `tests/unit/c11_tile_manager/test_tile_downloader.py` (new — 14 tests)
- `tests/unit/c11_tile_manager/test_protocol_conformance.py`
(added 2 tests for `TileDownloader` AC-10)
## Test results
`pytest tests/unit -q`:
- **1420 passed**, 80 skipped, 0 failed
- +16 tests vs. Batch 39's 1404 baseline (matches the 14 new downloader
tests + 2 new conformance tests)
- Skips are environment-gated (Docker compose, CUDA, TensorRT,
Tier-2 hardware, `actionlint`); none are AZ-316-related
`pytest tests/unit/c11_tile_manager/`:
- 57 passed (Batch 38 + Batch 39 + Batch 40 combined)
- Downloader: AC-1, AC-2, AC-3, AC-4, AC-5, AC-6, AC-7, AC-8, AC-9,
AC-11, AC-12, plus the throughput NFR, plus 429 HTTP-date form
parsing, plus 429 budget exhaustion → `RateLimitedError`
- Conformance: AC-10 positive (`isinstance(impl, TileDownloader)`)
+ negative (partial fake rejected)
`ReadLints`: clean across all touched files.
## Code review verdict
**PASS_WITH_WARNINGS** — see
`_docs/03_implementation/reviews/batch_40_review.md`. Five Low
findings, all documentation-level or downstream-blocked (recurring
spec-prose vs. typed-exception drift, adapter freshness-label
conservatism pending an AZ-303 ABI extension, deferred Risk-5
lockfile assertion blocked on E-C12, missing `cache_root`
writability pre-validation, recurring Clock-vs-sleep injection
deviation). No code change required for batch close-out.
## Cumulative review
Batch 40 is single-task and closes the C11 contract surface
(downloader + uploader + gate + signing key all wired and tested).
The next cumulative review window covers batches 40-42; that
report will land before Batch 43 starts. Two recurring Low
findings (F1 — failure paths raise vs. return; F5 — sleep vs.
Clock injection) are now visible in three consecutive batch
reviews and should be captured as a single hygiene PBI in the
next cumulative review.
@@ -0,0 +1,115 @@
# Cumulative Code Review — Batches 3739 / Cycle 1
**Date**: 2026-05-13
**Mode**: Cumulative (all 7 phases, emphasis on Phases 6 + 7)
**Batches covered**: 37, 38, 39
**Tasks covered**: AZ-325 (C10 CacheProvisioner), AZ-515 (C10 canonical-hash extraction), AZ-317 (C11 FlightStateGate), AZ-318 (C11 PerFlightKeyManager), AZ-319 (C11 HttpTileUploader)
**Changed files in scope**: 12 production + 6 tests + 5 docs + 1 archive (see "Scope" below)
| Domain | Files (changed since cumulative_review_batches_34-36_cycle1_report.md) |
|--------|------------------------------------------------------------------------|
| c10_provisioning (production) | `provisioner.py` (new, AZ-325), `_canonical_hash.py` (new, AZ-515 — extracted shared aggregate-hash module), `manifest_builder.py` (refactor — now imports `_canonical_hash`), `manifest_verifier.py` (refactor — now imports `_canonical_hash`), `__init__.py` (re-exports `CacheProvisionerImpl`) |
| c11_tile_manager (production) | `_types.py` (new — `FlightStateSignal`, `PublicKeyFingerprint`, `IngestStatus`, `UploadOutcome`, `UploadRequest`, `PerTileStatus`, `UploadBatchReport`), `errors.py` (new — `TileManagerError` parent + 5 subclasses), `interface.py` (extended — `FlightStateSource`, real `TileUploader` Protocol), `flight_state_gate.py` (new, AZ-317), `signing_key.py` (new, AZ-318), `config.py` (new, AZ-319 — `C11Config`), `tile_uploader.py` (new, AZ-319 — `HttpTileUploader` + 3 consumer-side cuts over c6), `__init__.py` (full upload-side surface re-exports + `register_component_block`) |
| runtime_root (composition root) | `c11_factory.py` (new — `build_flight_state_gate`, `build_per_flight_key_manager`, `build_tile_uploader`) |
| fdr_client (cross-cutting) | `records.py` (5 new `KNOWN_PAYLOAD_KEYS` entries: `c11.upload.session.key.public`, `c11.upload.signature_rejected`, `c11.upload.tile.queued`, `c11.upload.tile.rejected`, `c11.upload.batch.complete`) |
| Tests | `tests/unit/c10_provisioning/test_cache_provisioner.py` (new, AZ-325), `tests/unit/c11_tile_manager/test_flight_state_gate.py` (new, AZ-317 — 13 tests), `tests/unit/c11_tile_manager/test_signing_key.py` (new, AZ-318 — 13 tests), `tests/unit/c11_tile_manager/test_tile_uploader.py` (new, AZ-319 — 15 tests), `tests/unit/c11_tile_manager/test_protocol_conformance.py` (new, AZ-319 AC-12), `tests/unit/test_az272_fdr_record_schema.py` (5 fixture additions) |
| Docs | `_docs/02_tasks/done/AZ-325_*`, `done/AZ-515_*`, `done/AZ-317_*`, `done/AZ-318_*`, `done/AZ-319_*` (archived from `todo/`); per-batch reports + reviews under `_docs/03_implementation/` |
**Verdict**: **PASS_WITH_WARNINGS**
## Summary
No Critical or High findings. Three findings total: all Low (one Maintainability / two Spec-Gap). The dominant achievement of this window is the **end-to-end landing of the C11 operator-side upload path** (gate → ephemeral signing → multipart POST → mark uploaded → FDR alert), the **closure of the C10 build orchestrator** (CacheProvisioner) for E-C12 consumption, and the **resolution of the prior cumulative review's F1** via AZ-515's `_canonical_hash.py` extraction.
### Architecture-level outcomes
1. **C11 operator-side trio is fully wired.** AZ-317 (gate), AZ-318 (per-flight Ed25519 ephemeral key), and AZ-319 (`HttpTileUploader`) compose at the operator-binary composition root through `build_flight_state_gate` / `build_per_flight_key_manager` / `build_tile_uploader`. The contract surface (`TileUploader` Protocol + `UploadRequest` / `UploadBatchReport` / `PerTileStatus` / `IngestStatus` / `UploadOutcome` DTOs) is canonical and re-exported through `c11_tile_manager.__init__`. Three D-PROJ-2 ingest contract concerns are handled in code: canonical signing-payload bytes (deterministic SHA-256 over a frozen field order), `Retry-After` parsing for both RFC 7231 forms, and per-tile signature-rejection routing to `PerFlightKeyManager.record_signature_rejection`.
2. **Consumer-side Protocol cut pattern continues to scale.** AZ-319 introduces three more cuts (`_TilePixelHandleLike`, `_TileBytesReader`, `_PendingMetadataReader`) over c6, all local to `c11_tile_manager.tile_uploader`. The composition root binds the concrete c6 implementations through `build_tile_uploader`. AZ-270 lint (`test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies`) remains green; zero `components.X` cross-component imports inside `src/gps_denied_onboard/components/**/*.py`. Cumulative count of consumer-side cuts in production code: **10+** (4 in c10_provisioning from AZ-322/323/324, 3 in c11_tile_manager.tile_uploader from AZ-319, plus the FlightStateSource cut from AZ-317 and the engine cuts from AZ-321).
3. **AZ-515 closed F1 from the prior cumulative review.** The previous window's F1 (verifier reaching into builder's private `_aggregate_tile_hash`) is resolved by AZ-515 — the canonical aggregate-hash helper now lives at `c10_provisioning/_canonical_hash.py` and is imported symmetrically by `manifest_builder.py` and `manifest_verifier.py`. The intra-component shared utility makes the trust-chain glue between AZ-323 and AZ-324 explicit. No public API impact.
4. **FDR registry expanded by five kinds.** All five C11 upload-side records (`session.key.public`, `signature_rejected`, `tile.queued`, `tile.rejected`, `batch.complete`) are registered in `KNOWN_PAYLOAD_KEYS` with the AZ-272 schema-roundtrip fixtures wired in lockstep. The cross-record correlation keys are consistent (`flight_id`, `fingerprint`, `batch_uuid`, `observed_at_iso`) so a downstream auditor can join per-tile events back to the batch summary and forward to the per-flight key envelope.
### Phase 6 — Cross-Task Consistency
- **AZ-317 ↔ AZ-318 ↔ AZ-319 wiring contract**: `HttpTileUploader.upload_pending_tiles` honours the FROZEN order `gate → start_session → enumerate → batch loop → finally end_session`. AC-2 (gate blocks before ANY work) and AC-5/AC-6 (zeroisation on every exit path) are both asserted in the AZ-319 unit suite. The fingerprint returned by `start_session` is the same value carried in every per-tile FDR record + the final `batch.complete` record, providing the safety officer's correlation key.
- **AZ-319 ↔ AZ-272 schema contract**: every key the uploader emits in any FDR payload is a subset of `KNOWN_PAYLOAD_KEYS[kind]`. The AZ-272 schema test fixtures cover the three new kinds; the c7 conformance pattern (`set(payload.keys()) - {"extra"} <= expected_keys`) would pass for the uploader if applied (uploader does not have its own conformance test in this style — see F2 below).
- **AZ-319 ↔ AZ-270 cross-component lint**: the consumer-side cuts in `tile_uploader.py` over c6 surfaces (`_TileBytesReader`, `_PendingMetadataReader`, `_TilePixelHandleLike`) keep the c11 component free of `from gps_denied_onboard.components.c6_tile_cache import …` statements. AZ-270 AST scan is green.
- **AZ-325 ↔ AZ-515 hash-identity contract**: the build-identity hash CacheProvisioner uses for idempotence checks is byte-aligned with the manifest hash AZ-323 wrote, because both call into the SAME `_canonical_hash.aggregate_tile_hash` helper. Re-reading AZ-325's `provisioner.py` confirms the import path now points to `_canonical_hash` (post-AZ-515), not `manifest_builder._aggregate_tile_hash`.
- **C11 component layout consistency with C10**: c11 follows the same `_types.py` / `errors.py` / `interface.py` / `<concrete>.py` / `__init__.py` (with `register_component_block`) layout as c10, plus `config.py` for the per-component config block. The split keeps the public API surface declared in one place per component.
### Phase 7 — Architecture Compliance
- **Layer direction**: c11 production code imports only from `_types/*` (none used yet — c11 declares its DTOs locally), `helpers/*` (none used), `config`, `logging`, `clock` (via `WallClock` indirection), `fdr_client`, plus `httpx` and `cryptography` (third-party, already pinned). All Layer 1 or lower. No upward imports.
- **Public API respect**: `runtime_root/c11_factory.py` is the single cross-component seam for c11. The AZ-270 lint exempts `runtime_root/*`. No `components.c6_*` import appears anywhere inside `components/c11_tile_manager/**/*.py`.
- **No new cyclic module dependencies**: verified by import grep across `src/gps_denied_onboard/components/`.
- **Duplicate symbols across components**: no new duplicates introduced. The two active `_iso_ts_now` copies in c7 (carryover from prior windows) remain — AZ-508 still tracks the cross-component consolidation. C11 introduces a local `_iso_now` helper in `tile_uploader.py` (UTC RFC 3339); see F1 for the consolidation suggestion.
- **Cross-cutting concerns reuse**: `c11_tile_manager` correctly uses `fdr_client.make_fdr_client`, `logging.get_logger`, `clock.wall_clock.WallClock` (via the deferred import in `_default_sleep`), and `config.schema.register_component_block`. No re-implementations.
- **Time-handling discipline**: `tile_uploader.py` does NOT call `time.sleep` directly; it routes through `WallClock.sleep_until_ns` (deferred import inside the `_default_sleep` helper). The AC-4 AST scan over `components/` from AZ-398 stays clean.
## Findings
| # | Severity | Category | File:Line | Title |
|---|----------|----------|-----------|-------|
| 1 | Low | Maintainability | `c11_tile_manager/tile_uploader.py:739` | Local `_iso_now` helper duplicates the `_iso_ts_now` pattern that AZ-508 plans to consolidate |
| 2 | Low | Spec-Gap | `c11_tile_manager/tile_uploader.py` constructor | Constructor takes `sleep` callable rather than the spec-listed `clock: Clock` parameter |
| 3 | Low | Spec-Gap | `_docs/02_tasks/done/AZ-319_*.md` Outcome §, `tile_uploader.py:328-334` | Spec text describes `outcome = failure` as a return value for gate / auth / persistent-5xx scenarios; impl raises in those cases (and writes `failure` into the FDR `batch.complete` record) |
### Finding Details
**F1: `_iso_now` helper is yet another `_iso_ts_now` copy** (Low / Maintainability)
- Location: `src/gps_denied_onboard/components/c11_tile_manager/tile_uploader.py:739``def _iso_now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")`.
- Description: AZ-508 (in `_docs/02_tasks/todo/`) tracks the consolidation of all `_iso_ts_now` definitions across the codebase into a single `helpers/iso_timestamps.py` module. The c7 modules (`onnx_trt_ep_runtime.py`, `thermal_publisher.py`) are the existing duplication sites; c11 now adds a third active site with the same body. The drift risk is the format string itself — the c11 helper uses `strftime("%Y-%m-%dT%H:%M:%S.%fZ")` (forced UTC suffix), while c6's now-consolidated `_timestamp.iso_ts_now` uses `.isoformat()` (canonical RFC 3339 with `+00:00`). The two formats are NOT byte-identical for the same input.
- Suggestion: refresh AZ-508's "Problem" / "Outcome" / "Included" sections to add the new c11 site, and standardise on ONE format (recommend `.isoformat(timespec="microseconds")` to match the existing AZ-272 schema fixture `"2025-01-15T08:00:00.000000+00:00"` style). This also feeds back into AZ-318's `record_signature_rejection` logic if it uses the c11 helper.
- Recommendation: refresh AZ-508; do not gate downstream batches on this.
- Task: AZ-508 (existing).
**F2: Constructor signature deviates from spec — `sleep` callable instead of `clock: Clock`** (Low / Spec-Gap)
- Location: `src/gps_denied_onboard/components/c11_tile_manager/tile_uploader.py:257-269``__init__(..., sleep: Any = None)`.
- Description: The AZ-319 task spec (now archived in `_docs/02_tasks/done/AZ-319_c11_tile_uploader.md`) lists `clock: Clock` as the constructor parameter for the timing-sensitive 429 / 5xx backoff path. The implementation accepts a callable `sleep` parameter instead, defaulting to a `WallClock`-routed helper. Reasoning is sound (the uploader only ever needs a sleep primitive — never `monotonic_ns` or `time_ns`), and the AZ-398 invariant (no `time.sleep` in `components/`) still holds because the default helper routes through `WallClock.sleep_until_ns`. But the deviation is a documented mismatch between the spec text and the as-built constructor.
- Suggestion: pick one of:
- (a) Update the AZ-319 spec text to match the implementation — `sleep: Callable[[float], None] = _default_sleep` — and document the rationale in the contract Shape section.
- (b) Refactor the constructor to take `clock: Clock` and call `self._clock.sleep_until_ns(...)` directly.
- Recommendation: (a) — the contract is the source of truth, and threading the full Clock Protocol through a class that only needs sleep is unnecessary indirection.
- Task: file as 1-pt task hygiene PBI against the AZ-319 spec text (no production code change).
**F3: Spec text describes returned `outcome = failure`; impl raises instead** (Low / Spec-Gap)
- Location:
- Spec: `_docs/02_tasks/done/AZ-319_c11_tile_uploader.md` § Outcome lines 60-66 ("`outcome = failure` if the gate blocked, the API key was invalid, or zero tiles could be POSTed").
- Impl: `src/gps_denied_onboard/components/c11_tile_manager/tile_uploader.py:328-334` (raises `SatelliteProviderError` / `RateLimitedError` / `FlightStateNotOnGroundError`); `:670-693` (`_emit_batch_complete` writes `outcome = failure` into the FDR record on the exception path via the `try/except/finally` pattern).
- Description: The implementation NEVER returns a `UploadBatchReport(outcome=FAILURE)` — every failure path raises a typed exception. The `UploadOutcome.FAILURE` enum value IS used, but only inside the FDR `c11.upload.batch.complete` record emitted from the `finally` block. AC-2 (gate blocks → raises), AC-9 (5x 503 → raises), AC-10 (401 → raises) all assert the raise behaviour, so the impl is internally consistent and passes its own AC suite. The spec text drift means a reader of the spec would expect to handle a `FAILURE` outcome in the returned report, when in fact they need a `try/except` around the call.
- Suggestion: update the AZ-319 spec text to clarify that `UploadOutcome.FAILURE` is an FDR-only value (returned report is always `SUCCESS` or `PARTIAL` because failure paths raise). The contract document `_docs/02_document/contracts/c11_tilemanager/tile_uploader.md` should be the canonical source for this — verify it matches the impl.
- Recommendation: 1-pt task hygiene PBI to align spec + contract + impl text.
- Task: file as 1-pt PBI; not blocking.
## Baseline Delta
`_docs/02_document/architecture_compliance_baseline.md` does not exist (greenfield project). The Baseline Delta section is omitted per `code-review/SKILL.md` "Baseline delta".
## Verdict Logic
- 0 Critical
- 0 High
- 0 Medium
- 3 Low (1 Maintainability, 2 Spec-Gap)
**PASS_WITH_WARNINGS**: only Low findings; all three are documentation-level drift between spec text and implementation, with no impact on production behaviour. None block progression to Batch 40. F1 already has a tracking PBI (AZ-508) that needs a small refresh; F2 + F3 each warrant a 1-pt task-hygiene PBI to align the spec with the as-built constructor signature and outcome-vs-raise semantics.
## Test Suite Snapshot
- Full unit suite after Batch 39: **1404 passed**, 80 skipped, 0 failed.
- Skips are environment-gated (Docker compose, CUDA, TensorRT, Tier-2 Jetson hardware, `actionlint`); none are AZ-317/318/319/325/515-related.
- C11 unit suite: 41 tests passing (13 AZ-317 + 13 AZ-318 + 17 AZ-319 / Protocol conformance + 5 from prior C11 work).
- C10 unit suite: AZ-325 / AZ-515 tests covered in the per-batch reports for batches 37 + 38.
## Carryover Status Against 3436 Review
| Previous finding | Severity | Status after batch 39 |
|---|---|---|
| F1 (verifier reaches into builder's private `_aggregate_tile_hash`) | Low / Maintainability | **RESOLVED** by AZ-515 (`c10_provisioning/_canonical_hash.py` extracted; both builder + verifier import from there). |
| F2 (AZ-508 task spec is stale) | Low / Maintainability | **CARRIED OVER** — still pending; the new c11 `_iso_now` helper (this review's F1) makes the refresh more urgent. |
| F3 (consumer-side Protocol cut pattern un-documented in `architecture.md`) | Low / Maintainability | **CARRIED OVER** — pattern now 10+ active instances in production. Codified in `module-layout.md` Rule 9 but architecture.md still lacks a dedicated section. |
@@ -0,0 +1,104 @@
# Batch 40 — Code Review
**Tasks**: AZ-316 (C11 TileDownloader)
**Cycle**: 1
**Reviewer**: autodev
**Verdict**: **PASS_WITH_WARNINGS**
## Scope reviewed
Production code:
- `src/gps_denied_onboard/components/c11_tile_manager/_types.py` (additions: `SectorClassification`, `DownloadOutcome`, `TileSummary`, `DownloadRequest`, `DownloadBatchReport`)
- `src/gps_denied_onboard/components/c11_tile_manager/errors.py` (additions: `ResolutionRejectionError`, `CacheBudgetExceededError`)
- `src/gps_denied_onboard/components/c11_tile_manager/config.py` (additions: 6 download-side fields + validation)
- `src/gps_denied_onboard/components/c11_tile_manager/interface.py` (`TileDownloader` Protocol expanded to its real shape)
- `src/gps_denied_onboard/components/c11_tile_manager/tile_downloader.py` (new — `HttpTileDownloader`, `request_hash`, `_JournalState`, two consumer-side cuts)
- `src/gps_denied_onboard/components/c11_tile_manager/__init__.py` (exports for download-side public API)
- `src/gps_denied_onboard/runtime_root/c11_factory.py` (`build_tile_downloader` + private `_C6DownloadAdapter` class wrapping c6's `TileMetadata` assembly)
Tests:
- `tests/unit/c11_tile_manager/test_tile_downloader.py` (14 tests — AC-1..AC-9, AC-11, AC-12, NFR throughput, plus 429 HTTP-date form + 429 budget exhaustion)
- `tests/unit/c11_tile_manager/test_protocol_conformance.py` (2 new tests — AC-10 positive + negative)
## Phase 1 — Architecture
### AZ-507 cross-component rule
`tile_downloader.py` does NOT import from any other `components.*` module. The c6 surfaces (`TileStore`, `TileMetadataStore`, `CacheBudgetEnforcer`, `FreshnessRejectionError`, `TileMetadata`, `TileSource`, `FreshnessLabel`, `VotingStatus`) are reached through:
* Two structural `Protocol` cuts declared inside `tile_downloader.py` itself: `_TileWriterLike` and `_BudgetEnforcerLike`.
* A composition-root adapter `_C6DownloadAdapter` in `runtime_root/c11_factory.py` that lazily imports c6's enums + dataclasses inside its method bodies and assembles the `TileMetadata` envelope c6 expects.
* A structural duck-type check on the freshness-rejection exception class name (`_is_freshness_rejection`), so c6's `FreshnessRejectionError` is recognised by class identity without an import.
The AZ-270 lint (`test_ac6_only_compose_root_imports_concrete_strategies`) passes — the only c6 imports in the modified surface live in the composition root.
### Composition root
`build_tile_downloader` reads the `C11Config` block from `config.components['c11_tile_manager']`, fails fast with `ConfigError` when `satellite_provider_url` or `service_api_key` is empty (the safe defaults exist for unit-test bootstrap; production / operator wiring MUST set both). The `_C6DownloadAdapter` instance is shared as the binding for both Protocol cuts so the downloader always sees the same backing c6 trio.
### Logging envelopes
The spec calls for structured logs (no FDR records on the download path — confirmed by re-reading AZ-316). All emissions use the project `kv` envelope:
* INFO `c11.download.session.start` / `c11.download.session.end` (AC-NEW-6 visibility, AC-1).
* WARN `c11.download.resolution_rejected` (one per rejected tile, AC-2).
* WARN `c11.download.freshness_rejected_summary` (one per batch, AC-3).
* WARN `c11.download.freshness_downgraded` (one per downgraded tile, AC-4).
* WARN `c11.download.batch.retry` (one per HTTP retry, AC-5/AC-6 telemetry).
* ERROR `c11.download.provider.failed` (terminal HTTP error, AC-6/AC-7).
* ERROR `c11.download.budget.exceeded` (AC-9).
* INFO `c11.download.idempotent_no_op` (AC-8).
The auth header is logged ONLY redacted (`Bearer ***`); the AC-11 test asserts the raw API key never appears in any log record.
## Phase 2 — Behaviour vs. spec
| Spec requirement | Status |
|------------------|--------|
| Validate request (bbox, zoom, cache_root) | PASS — bbox + zoom validated in `DownloadRequest.__post_init__`; cache_root writability verified implicitly by the journal write (see F4) |
| Idempotence via `cache_root/.c11/journal/<flight_id>__<request_hash>.json` | PASS |
| Enumerate via `?list-only=true` | PASS |
| Cache headroom pre-check via `reserve_headroom(estimated_bytes)` | PASS |
| Auth: TLS + `Authorization: Bearer <api_key>` | PASS |
| 429 honours `Retry-After` (int + HTTP-date) capped via config | PASS |
| 5xx exponential backoff (1s/2s/4s/8s, 4 retries) → `SatelliteProviderError` | PASS |
| TLS / 401 / 403 → `SatelliteProviderError` fail-fast | PASS |
| Resolution gate at C11 boundary (≥ 0.5 m/px) | PASS |
| `FreshnessRejectionError` from c6 → counted, not propagated | PASS |
| `FreshnessLabel.DOWNGRADED` → tile persisted + counted | PASS (see F2) |
| Per-tile journal append after each successful write | PASS |
| `DownloadBatchReport(outcome=success/partial/failure/idempotent_no_op)` | PASS — failure paths raise (typed exception) and journal is flushed in the finally / exception block; the spec text suggests `failure` as a return value (see F1) |
| Lockfile assertion at construction | NOT IMPLEMENTED (see F3) |
## Findings
**F1 — Low (Spec wording vs. impl, recurring across C11 trio)**: AZ-316 step 4 / 6 prose says "`outcome = failure`" on cache-budget, persistent 5xx, or auth failure. My implementation raises `CacheBudgetExceededError` / `SatelliteProviderError` / `RateLimitedError` instead of returning a `FAILURE` report — matching the contract's exception matrix and the AC test surface (AC-6 / AC-7 / AC-9 explicitly assert `pytest.raises`). The `DownloadOutcome.FAILURE` enum value is wired into the journal's `tile_counts` envelope on the exception path so any downstream auditor can still distinguish failure from success without re-reading the exception trace. Same shape as Batch 39's F1. Action: documented; recommend a hygiene PBI to align AZ-316 + AZ-319 spec prose with their typed-exception contracts.
**F2 — Low (Adapter returns conservative `"fresh"` label)**: The `_C6DownloadAdapter.write_tile_for_download` returns the literal string `"fresh"` because the c6 `TileMetadataStore.insert_metadata` API (AZ-303 contract) does not currently expose the post-insert freshness label as a return value — the freshness gate (AZ-307) raises on outright rejection but does not communicate the DOWNGRADED case to the caller. As a result, the AZ-316 `tiles_downgraded` counter only increments when the adapter actively reports `"downgraded"`; in the c6-as-it-stands wiring that path is currently unreachable. The `HttpTileDownloader` logic for the DOWNGRADED label is fully exercised by AC-4 via the `_StubTileWriter.labels` test fixture. Action: documented; if AZ-307 / AZ-303 surface a `FreshnessLabel` post-insert in a future iteration, only the adapter changes — the downloader is already wired.
**F3 — Low (Risk 5 lockfile assertion deferred)**: Spec Risk 5 mitigation says "this task asserts the lockfile exists at construction (`cache_root/.c11/lock`) and refuses to start otherwise"; the lockfile creation is C12's job. I did NOT add the assertion in this batch — `cache_root` is a per-call parameter on `DownloadRequest`, not a constructor input, so the construction-time assertion the spec describes is not actually possible against the current Protocol shape. Adding the check at the start of `download_tiles_for_area` would be the correct place but C12 is not yet built (epic E-C12 is downstream), so the lockfile would always be absent and would block every operator run. Action: defer to the C12 epic; capture as a `ResolutionRejectionError` follow-up note when E-C12 lands.
**F4 — Low (`cache_root` writability not pre-validated)**: Spec step 1 says "validates the `DownloadRequest` (… `cache_root` is writable)". My implementation relies on the journal write to fail naturally at `_atomic_write_json`, which calls `mkdir(parents=True, exist_ok=True)` and would raise `PermissionError` on a read-only mount. The error surfaces but is not the spec's preferred fail-fast `ValueError` from `__post_init__`. Action: documented; acceptable for a v1.0.0 ship — the failure mode is loud, not silent.
**F5 — Low (Constructor signature deviation, recurring)**: Spec lists `clock: Clock` as a constructor parameter; my implementation injects a callable `sleep` (defaults to a `WallClock`-routed sleep). Same rationale as Batch 39's F2 — the downloader only ever needs to sleep, not `monotonic_ns` / `time_ns`. Threading the full `Clock` Protocol would carry payload the class never reads. The default `_default_sleep` helper routes through `WallClock.sleep_until_ns`, so the AZ-398 invariant (no `time.sleep` in `components/`) holds. Action: documented; revisit if a future C11 task needs the broader `Clock` surface.
## Phase 3 — Tests
14 unit tests pass for `HttpTileDownloader`; 2 new tests for the `TileDownloader` Protocol conformance check (positive + negative). Full unit suite: **1420 passed, 80 skipped, 0 failed** (skips are environment-gated: Docker, CUDA, TensorRT, Tier-2 hardware, actionlint). The +16-test net delta vs. Batch 39's 1404 baseline matches the 16 new tests added in this batch.
NFR-perf-throughput (50 MB/s on 1 Gbps): not unit-testable (requires real network). The unit `test_nfr_throughput_1000_tiles_under_budget` substitutes a 10-second wall-clock budget for a 1000-tile MockTransport batch — verifying the bookkeeping has no O(n²) regression rather than certifying real throughput. The real throughput target falls into the e2e perf suite (E-BBT).
AC-12 was tested via journal-truncation rather than process kill (4 prior + 6 new = 10 final, same shape as the spec's 30+70=100 example). Killing the test runner mid-batch is not feasible inside `pytest`; the journal-truncation simulation is the standard pattern for crash-recovery tests in this codebase (matches the C9 download journal tests).
## Phase 4 — Quality gates
- `ReadLints` clean across `c11_tile_manager/`, `runtime_root/c11_factory.py`, and the new + extended test files
- No `time.sleep` in `components/` (routes through `WallClock.sleep_until_ns`)
- No secrets in logs (AC-11 asserts the raw API key never appears in any captured log record; `Bearer ***` redaction confirmed)
- No new third-party dependencies (uses existing `httpx` and stdlib `email.utils` for HTTP-date parsing; `atomicwrites` semantics implemented inline because the project already uses the same pattern in C9 — staying consistent rather than adding a new dependency)
## Verdict
**PASS_WITH_WARNINGS** — All five findings are Low severity (recurring spec-prose vs. typed-exception drift, adapter freshness-label conservatism pending an AZ-303 ABI extension, deferred Risk-5 lockfile assertion blocked on C12, missing pre-validation of `cache_root` writability, recurring Clock-vs-sleep injection deviation). No code change required for batch close-out. F1 and F5 are recurring across the C11 trio and should be captured as a single hygiene PBI in the next cumulative review.
+2 -2
View File
@@ -12,5 +12,5 @@ sub_step:
retry_count: 0
cycle: 1
tracker: jira
last_completed_batch: 39
last_cumulative_review: batches_34-36
last_completed_batch: 40
last_cumulative_review: batches_37-39
@@ -1,27 +1,33 @@
"""C11 Tile Manager component — Public API.
Re-exports the Protocol surface (``TileDownloader``, ``TileUploader``,
``FlightStateSource``), the upload-side services that have landed
``FlightStateSource``), the operator-side services that have landed
(``FlightStateGate`` from AZ-317, ``PerFlightKeyManager`` from AZ-318,
``HttpTileUploader`` from AZ-319), the C11 internal DTOs / enums, the
C11 error family, and the per-component config block. The download-side
concrete impl (``HttpTileDownloader``) ships in AZ-316; it will be added
to ``__all__`` then.
``HttpTileUploader`` from AZ-319, ``HttpTileDownloader`` from AZ-316),
the C11 internal DTOs / enums, the C11 error family, and the
per-component config block.
"""
from gps_denied_onboard.components.c11_tile_manager._types import (
DownloadBatchReport,
DownloadOutcome,
DownloadRequest,
FlightStateSignal,
IngestStatus,
PerTileStatus,
PublicKeyFingerprint,
SectorClassification,
TileSummary,
UploadBatchReport,
UploadOutcome,
UploadRequest,
)
from gps_denied_onboard.components.c11_tile_manager.config import C11Config
from gps_denied_onboard.components.c11_tile_manager.errors import (
CacheBudgetExceededError,
FlightStateNotOnGroundError,
RateLimitedError,
ResolutionRejectionError,
SatelliteProviderError,
SessionNotActiveError,
SignatureRejectedError,
@@ -38,6 +44,11 @@ from gps_denied_onboard.components.c11_tile_manager.interface import (
from gps_denied_onboard.components.c11_tile_manager.signing_key import (
PerFlightKeyManager,
)
from gps_denied_onboard.components.c11_tile_manager.tile_downloader import (
DOWNLOAD_JOURNAL_DIRNAME,
HttpTileDownloader,
request_hash,
)
from gps_denied_onboard.components.c11_tile_manager.tile_uploader import (
HttpTileUploader,
canonical_payload_bytes,
@@ -48,24 +59,34 @@ register_component_block("c11_tile_manager", C11Config)
__all__ = [
"C11Config",
"CacheBudgetExceededError",
"DOWNLOAD_JOURNAL_DIRNAME",
"DownloadBatchReport",
"DownloadOutcome",
"DownloadRequest",
"FlightStateGate",
"FlightStateNotOnGroundError",
"FlightStateSignal",
"FlightStateSource",
"HttpTileDownloader",
"HttpTileUploader",
"IngestStatus",
"PerFlightKeyManager",
"PerTileStatus",
"PublicKeyFingerprint",
"RateLimitedError",
"ResolutionRejectionError",
"SatelliteProviderError",
"SectorClassification",
"SessionNotActiveError",
"SignatureRejectedError",
"TileDownloader",
"TileManagerError",
"TileSummary",
"TileUploader",
"UploadBatchReport",
"UploadOutcome",
"UploadRequest",
"canonical_payload_bytes",
"request_hash",
]
@@ -1,4 +1,4 @@
"""C11 internal DTOs (AZ-317, AZ-318, AZ-319).
"""C11 internal DTOs (AZ-316, AZ-317, AZ-318, AZ-319).
* :class:`FlightStateSignal` — five flight-state signals consumed by the
upload-side flight-state gate (AZ-317).
@@ -9,6 +9,12 @@
upload-side DTOs and enums consumed and produced by the AZ-319
:class:`HttpTileUploader` (contract
``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md`` v1.0.0).
* :class:`DownloadRequest`, :class:`DownloadBatchReport`,
:class:`TileSummary`, :class:`DownloadOutcome`,
:class:`SectorClassification` — download-side DTOs and enums consumed
and produced by the AZ-316 :class:`HttpTileDownloader` (contract
``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md``
v1.0.0).
Internal to the component — composition-root code reaches these via the
``c11_tile_manager`` package re-exports; consumers outside C11 use the
@@ -20,13 +26,19 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from uuid import UUID
__all__ = [
"DownloadBatchReport",
"DownloadOutcome",
"DownloadRequest",
"FlightStateSignal",
"IngestStatus",
"PerTileStatus",
"PublicKeyFingerprint",
"SectorClassification",
"TileSummary",
"UploadBatchReport",
"UploadOutcome",
"UploadRequest",
@@ -161,3 +173,148 @@ class UploadBatchReport:
next_retry_at_s: int | None
outcome: UploadOutcome
public_key_fingerprint: str
# ----------------------------------------------------------------------
# AZ-316 download-side DTOs
# ----------------------------------------------------------------------
class SectorClassification(str, Enum):
"""Operator-classified sector type the request applies to.
Matches c6's :class:`SectorClassification`; declared locally so the
AZ-316 download path keeps its public surface free of cross-component
imports (AZ-507 / AZ-270). The composition-root adapter maps this
to c6's enum at the write boundary.
"""
ACTIVE_CONFLICT = "active_conflict"
STABLE_REAR = "stable_rear"
NEUTRAL = "neutral"
class DownloadOutcome(str, Enum):
"""Aggregate outcome of one :meth:`TileDownloader.download_tiles_for_area` call.
Mirrors contract Shape § ``DownloadBatchReport.outcome``:
* ``SUCCESS`` — every requested tile was either downloaded, gated
by the resolution / freshness rule, or downgraded; no terminal
error occurred.
* ``PARTIAL`` — at least one tile failed terminally with a
transient error that did NOT escalate (e.g. a single 5xx that
retried-then-skipped); the batch journaled what it could.
* ``FAILURE`` — a terminal error aborted the batch (TLS / 401 /
403 / persistent 5xx / rate-limit budget); typed exception
raises and the partial state is journaled for the next
idempotent re-run.
* ``IDEMPOTENT_NO_OP`` — the journal recorded a complete prior
run for the same ``(flight_id, request_hash)`` pair; zero GETs
issued, zero writes attempted.
"""
SUCCESS = "success"
PARTIAL = "partial"
FAILURE = "failure"
IDEMPOTENT_NO_OP = "idempotent_no_op"
@dataclass(frozen=True)
class TileSummary:
"""Per-tile descriptor returned by :meth:`TileDownloader.enumerate_remote_coverage`.
``produced_at`` is the parent-suite's "this tile was rendered at"
timestamp; ``resolution_m_per_px`` is the metres-per-pixel value
the C11 boundary tests against ``RESTRICT-SAT-4`` (≥ 0.5 m/px).
``estimated_bytes`` is the parent-suite's content-length hint
used by the AZ-308 budget pre-check before any GET fires.
"""
tile_id_str: str
zoom_level: int
lat: float
lon: float
produced_at: datetime
resolution_m_per_px: float
estimated_bytes: int
tile_size_meters: float
tile_size_pixels: int
@dataclass(frozen=True)
class DownloadRequest:
"""Inputs to :meth:`TileDownloader.download_tiles_for_area`.
``bbox_min_lat`` / ``bbox_min_lon`` / ``bbox_max_lat`` /
``bbox_max_lon`` are inclusive-exclusive WGS84 bounds (matches
c6's :class:`Bbox`). ``zoom_levels`` is the set of Web-Mercator
zoom levels to download; each zoom is treated as an independent
rectangular grid against the bbox. ``cache_root`` is the on-disk
root for both the c6 store AND the C11 download journal (under
``cache_root/.c11/journal/<flight_id>__<request_hash>.json``).
``flight_id`` identifies the operator's pre-flight build context;
re-running the same ``(flight_id, request_hash)`` is the
idempotence check.
"""
flight_id: UUID
bbox_min_lat: float
bbox_min_lon: float
bbox_max_lat: float
bbox_max_lon: float
zoom_levels: tuple[int, ...]
sector_class: SectorClassification
cache_root: Path
def __post_init__(self) -> None:
if self.bbox_min_lat >= self.bbox_max_lat:
raise ValueError(
"DownloadRequest.bbox_min_lat must be < bbox_max_lat; "
f"got [{self.bbox_min_lat}, {self.bbox_max_lat})"
)
if self.bbox_min_lon >= self.bbox_max_lon:
raise ValueError(
"DownloadRequest.bbox_min_lon must be < bbox_max_lon; "
f"got [{self.bbox_min_lon}, {self.bbox_max_lon})"
)
if not self.zoom_levels:
raise ValueError("DownloadRequest.zoom_levels must be non-empty")
for z in self.zoom_levels:
if not 0 <= int(z) <= 21:
raise ValueError(
f"DownloadRequest.zoom_levels: every zoom must be in "
f"[0, 21]; got {z}"
)
@dataclass(frozen=True)
class DownloadBatchReport:
"""Aggregate report returned by :meth:`TileDownloader.download_tiles_for_area`.
Per-tile counts let the operator-tooling CLI render the post-run
summary without re-reading the journal:
* ``tiles_requested`` — total tiles enumerated by
:meth:`enumerate_remote_coverage` for this bbox / zoom set.
* ``tiles_downloaded`` — bytes successfully written to the c6
store (includes ``DOWNGRADED`` because those ARE persisted).
* ``tiles_rejected_resolution`` — gated by the C11 ≥ 0.5 m/px
resolution check before any GET.
* ``tiles_rejected_freshness`` — c6's freshness gate raised.
* ``tiles_downgraded`` — c6 returned the ``DOWNGRADED`` label;
tile IS in the store, but flagged as stable_rear stale.
* ``retry_count`` — total transient retries across the batch.
* ``request_hash`` — first 16 hex of the journal key (so the
caller can correlate to the on-disk journal file without
re-deriving the hash).
"""
outcome: DownloadOutcome
tiles_requested: int
tiles_downloaded: int
tiles_rejected_resolution: int
tiles_rejected_freshness: int
tiles_downgraded: int
retry_count: int
request_hash: str
@@ -1,18 +1,20 @@
"""C11 TileManager config block (AZ-319).
"""C11 TileManager config block (AZ-316, AZ-319).
Registered into ``config.components['c11_tile_manager']`` by the
package ``__init__.py``. The composition-root factory
:func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader`
reads this block to drive the upload path's HTTP behaviour and to
identify the producing companion against the parent suite's voting
layer.
package ``__init__.py``. Two composition-root factories read this
block:
The four fields below match the AZ-319 task spec § ``Outcome`` —
``config.c11.satellite_provider_ingest_url``,
``config.c11.upload_batch_size``, ``config.c11.upload_http_timeout_s``,
``config.c11.companion_id``. The ``upload_max_retry_after_s`` cap is
the Risk-3 ceiling on cumulative ``Retry-After`` budget for 429
responses (see :class:`RateLimitedError`).
* :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader`
reads the ``upload_*`` fields and ``companion_id`` to drive AZ-319.
* :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_downloader`
reads the ``satellite_provider_url``, ``service_api_key``, and
``download_*`` fields to drive AZ-316.
All defaults are conservative no-op values so unit tests / replay
runs that do not exercise C11 keep working without YAML; the factory
raises :class:`ConfigError` when an empty production-required field
(``service_api_key``, ``companion_id``, etc.) is observed in operator
wiring.
"""
from __future__ import annotations
@@ -28,25 +30,42 @@ _DEFAULT_BATCH_SIZE: int = 25
_DEFAULT_HTTP_TIMEOUT_S: float = 30.0
_DEFAULT_MAX_RETRY_AFTER_S: int = 600
_MAX_BATCH_SIZE: int = 200
_DEFAULT_DOWNLOAD_RESOLUTION_FLOOR: float = 0.5
_DEFAULT_DOWNLOAD_MAX_5XX_RETRIES: int = 4
_MIN_DOWNLOAD_RETRIES: int = 1
_MAX_DOWNLOAD_RETRIES: int = 16
@dataclass(frozen=True)
class C11Config:
"""Per-component config for C11 tile manager (upload path).
"""Per-component config for C11 tile manager (upload + download paths).
``satellite_provider_ingest_url`` is the parent-suite ingest base
URL (e.g. ``https://satellite-provider.example.com``); the
uploader appends ``/api/satellite/tiles/ingest`` to it. Defaulted
to empty so unit tests / replay runs that do not exercise the
upload path stay no-op; production configuration MUST set this
via YAML / env override or :class:`HttpTileUploader` raises
:class:`SatelliteProviderError` on the first attempt.
Upload-side fields (AZ-319):
``companion_id`` is the stable per-companion identifier the
parent suite's voting layer uses to attribute uploads to one
physical airframe. Defaulted to empty so test runs without a
paired companion stay valid; the factory raises ``ConfigError``
when the empty default is used in operator / production wiring.
* ``satellite_provider_ingest_url`` — base URL for the upload
endpoint; ``HttpTileUploader`` appends
``/api/satellite/tiles/ingest``. Empty → upload factory raises
:class:`ConfigError`.
* ``upload_batch_size`` — tiles per multipart POST.
* ``upload_http_timeout_s`` — per-request timeout (seconds).
* ``upload_max_retry_after_s`` — cumulative 429 ``Retry-After``
cap before :class:`RateLimitedError`.
* ``companion_id`` — stable per-companion id for D-PROJ-2 voting.
Download-side fields (AZ-316):
* ``satellite_provider_url`` — base URL for the GET surface;
``HttpTileDownloader`` appends per-tile / list paths.
* ``service_api_key`` — bearer token for authenticated GETs;
logged ONLY redacted (``Bearer ***``). Empty → download factory
raises :class:`ConfigError`.
* ``download_http_timeout_s`` — per-request timeout (seconds).
* ``download_max_5xx_retries`` — exponential-backoff cap before
:class:`SatelliteProviderError`.
* ``download_max_retry_after_s`` — cumulative 429 ``Retry-After``
cap before :class:`RateLimitedError`.
* ``download_resolution_floor_m_per_px`` — RESTRICT-SAT-4 lower
bound for the C11 boundary check; defaults to 0.5 m/px.
"""
satellite_provider_ingest_url: str = ""
@@ -55,6 +74,13 @@ class C11Config:
upload_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S
companion_id: str = ""
satellite_provider_url: str = ""
service_api_key: str = ""
download_http_timeout_s: float = _DEFAULT_HTTP_TIMEOUT_S
download_max_5xx_retries: int = _DEFAULT_DOWNLOAD_MAX_5XX_RETRIES
download_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S
download_resolution_floor_m_per_px: float = _DEFAULT_DOWNLOAD_RESOLUTION_FLOOR
def __post_init__(self) -> None:
if not 1 <= self.upload_batch_size <= _MAX_BATCH_SIZE:
raise ConfigError(
@@ -71,3 +97,24 @@ class C11Config:
"C11Config.upload_max_retry_after_s must be > 0; "
f"got {self.upload_max_retry_after_s}"
)
if self.download_http_timeout_s <= 0:
raise ConfigError(
"C11Config.download_http_timeout_s must be > 0; "
f"got {self.download_http_timeout_s}"
)
if not _MIN_DOWNLOAD_RETRIES <= self.download_max_5xx_retries <= _MAX_DOWNLOAD_RETRIES:
raise ConfigError(
"C11Config.download_max_5xx_retries must be in "
f"[{_MIN_DOWNLOAD_RETRIES}, {_MAX_DOWNLOAD_RETRIES}]; "
f"got {self.download_max_5xx_retries}"
)
if self.download_max_retry_after_s <= 0:
raise ConfigError(
"C11Config.download_max_retry_after_s must be > 0; "
f"got {self.download_max_retry_after_s}"
)
if self.download_resolution_floor_m_per_px <= 0:
raise ConfigError(
"C11Config.download_resolution_floor_m_per_px must be > 0; "
f"got {self.download_resolution_floor_m_per_px}"
)
@@ -1,10 +1,9 @@
"""C11 TileManager error family (AZ-317, AZ-318, AZ-319).
"""C11 TileManager error family (AZ-316, AZ-317, AZ-318, AZ-319).
Rooted at :class:`TileManagerError`. The parent is declared here (rather
than alongside the AZ-316 ``TileDownloader``) so the upload-side tasks
landing first do not need to wait on a downloader-only file. AZ-316
(``HttpTileDownloader``) will add its download-side errors as further
subclasses without re-declaring the parent.
Rooted at :class:`TileManagerError`. Both the upload (AZ-319) and
download (AZ-316) paths share the family parent so cross-path callers
can ``except TileManagerError`` to catch any C11-side terminal failure
without enumerating subclasses.
* :class:`FlightStateNotOnGroundError` (AZ-317) — defence-in-depth
refusal when the flight controller reports anything other than
@@ -15,10 +14,16 @@ subclasses without re-declaring the parent.
by the AZ-319 :class:`HttpTileUploader` after parsing a
``REJECTED`` per-tile response whose ``rejection_reason`` mentions
the signature.
* :class:`SatelliteProviderError` (AZ-319) — TLS / 401 / 403 fail-fast
AND persistent-5xx fail-after-retries surface for the upload path.
* :class:`RateLimitedError` (AZ-319) — 429 with persistent
``Retry-After`` budget exhaustion.
* :class:`SatelliteProviderError` (AZ-316/AZ-319) — TLS / 401 / 403
fail-fast AND persistent-5xx fail-after-retries surface for both
the download and upload paths.
* :class:`RateLimitedError` (AZ-316/AZ-319) — 429 with persistent
``Retry-After`` budget exhaustion (download + upload share the type).
* :class:`ResolutionRejectionError` (AZ-316) — surfaced when the
downloader's RESTRICT-SAT-4 boundary check rejects a tile with
``resolution_m_per_px < 0.5``.
* :class:`CacheBudgetExceededError` (AZ-316) — surfaced when c6's
AZ-308 budget enforcer cannot reserve head-room for the download.
"""
from __future__ import annotations
@@ -32,8 +37,10 @@ if TYPE_CHECKING:
)
__all__ = [
"CacheBudgetExceededError",
"FlightStateNotOnGroundError",
"RateLimitedError",
"ResolutionRejectionError",
"SatelliteProviderError",
"SessionNotActiveError",
"SignatureRejectedError",
@@ -98,13 +105,37 @@ class SatelliteProviderError(TileManagerError):
class RateLimitedError(TileManagerError):
"""``satellite-provider`` ingest endpoint rate-limited the upload.
"""``satellite-provider`` rate-limited the request (upload OR download).
Raised when the parent suite returns 429 and the cumulative
``Retry-After`` budget exceeds
:attr:`C11Config.upload_max_retry_after_s`. The
AZ-319 :class:`HttpTileUploader` honours the FIRST 429's
``Retry-After`` (sleep + retry) but escalates to this error after
:attr:`C11Config.upload_max_retry_after_s` (upload path) or
:attr:`C11Config.download_max_retry_after_s` (download path).
Both AZ-319 :class:`HttpTileUploader` and AZ-316
:class:`HttpTileDownloader` honour the first 429's
``Retry-After`` (sleep + retry) and escalate to this error after
the configured budget so the operator can surface the rate-limit
state explicitly.
"""
class ResolutionRejectionError(TileManagerError):
"""A downloaded tile failed the C11 ``RESTRICT-SAT-4`` resolution gate.
Raised by per-tile validation when ``resolution_m_per_px < 0.5``.
The download path catches this internally and counts the tile in
:attr:`DownloadBatchReport.tiles_rejected_resolution` rather than
aborting the batch — the type stays exported for the operator
tooling to surface in CLI output.
"""
class CacheBudgetExceededError(TileManagerError):
"""The c6 AZ-308 budget enforcer could not reserve head-room.
Raised after the C11 download path's pre-check converts c6's
``CacheBudgetExhaustedError`` into the C11-side type, so callers
only need to ``except TileManagerError`` (or this subclass) to
catch a cache-full failure. The original c6 error is preserved
on ``__cause__``.
"""
@@ -3,7 +3,10 @@
Operator-side ONLY — excluded from airborne via CMake (`BUILD_C11_TILE_MANAGER=OFF`).
See `_docs/02_document/components/12_c11_tilemanager/`.
* :class:`TileDownloader` — pre-flight download path (AZ-316, pending).
* :class:`TileDownloader` — pre-flight download path (AZ-316). The
authoritative shape lives in
``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md``
v1.0.0 and is mirrored 1:1 here.
* :class:`TileUploader` — post-landing upload path (AZ-319) — the
authoritative shape lives in
``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md``
@@ -18,14 +21,15 @@ See `_docs/02_document/components/12_c11_tilemanager/`.
from __future__ import annotations
from collections.abc import Iterable, Sequence
from pathlib import Path
from collections.abc import Sequence
from typing import Any, Protocol, runtime_checkable
from uuid import UUID
from gps_denied_onboard._types.tile import TileRecord
from gps_denied_onboard.components.c11_tile_manager._types import (
DownloadBatchReport,
DownloadRequest,
FlightStateSignal,
TileSummary,
UploadBatchReport,
UploadRequest,
)
@@ -37,12 +41,27 @@ __all__ = [
]
@runtime_checkable
class TileDownloader(Protocol):
"""Pre-flight tile download from `satellite-provider`."""
"""Pre-flight tile download from ``satellite-provider`` (operator-side).
def download(
self, lat_lon_box: tuple[float, float, float, float], zoom: int, output_root: Path
) -> Iterable[TileRecord]: ...
See ``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md``
v1.0.0 for invariants I-1 .. I-5 and the per-method error matrix.
The :meth:`enumerate_remote_coverage` return type is
:class:`TileSummary` (DTO declared in C11's ``_types`` so the C12
consumer never imports c6 to size a download).
"""
def download_tiles_for_area(self, request: DownloadRequest) -> DownloadBatchReport: ...
def enumerate_remote_coverage(
self,
bbox_min_lat: float,
bbox_min_lon: float,
bbox_max_lat: float,
bbox_max_lon: float,
zoom_levels: Sequence[int],
) -> Sequence[TileSummary]: ...
@runtime_checkable
@@ -0,0 +1,907 @@
"""C11 ``HttpTileDownloader`` (AZ-316) — concrete :class:`TileDownloader`.
Operator-side pre-flight download path. Authenticated GETs against
``satellite-provider``, RESTRICT-SAT-4 enforcement at the C11 boundary,
c6 writes via the AZ-303 store + metadata Protocols (which run AZ-307's
freshness gate at insert), AZ-308 cache-headroom pre-check before any
GET fires, and a per-``(flight_id, request_hash)`` journal for
idempotent re-runs.
Architecture
------------
The c6 storage surfaces are reached through structural :class:`Protocol`
cuts (:class:`_TileWriterLike`, :class:`_BudgetEnforcerLike`,
:class:`_FreshnessRejectionLike`) defined in this module — never via a
direct ``from gps_denied_onboard.components.c6_tile_cache import …``.
The composition root
(``runtime_root.c11_factory.build_tile_downloader``) is the single
layer that may bind concrete c6 implementations into the constructor.
That adapter handles c6's :class:`TileMetadata` / :class:`TileSource` /
:class:`FreshnessLabel` / :class:`SectorClassification` enums so the
downloader stays free of cross-component imports (AZ-507 / AZ-270).
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import tempfile
from dataclasses import dataclass, field
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
from uuid import UUID
import httpx
from gps_denied_onboard.components.c11_tile_manager._types import (
DownloadBatchReport,
DownloadOutcome,
DownloadRequest,
SectorClassification,
TileSummary,
)
from gps_denied_onboard.components.c11_tile_manager.config import C11Config
from gps_denied_onboard.components.c11_tile_manager.errors import (
CacheBudgetExceededError,
RateLimitedError,
SatelliteProviderError,
)
__all__ = [
"DOWNLOAD_JOURNAL_DIRNAME",
"HttpTileDownloader",
"request_hash",
]
_LIST_PATH = "/api/satellite/tiles"
_GET_PATH = "/api/satellite/tiles"
_LIST_QUERY_LIST_ONLY = "list-only"
DOWNLOAD_JOURNAL_DIRNAME = ".c11/journal"
_LOCKFILE_PATH = ".c11/lock"
_DEFAULT_BACKOFF_SCHEDULE_S: tuple[float, ...] = (1.0, 2.0, 4.0, 8.0)
_COMPONENT = "c11_tile_manager.tile_downloader"
_LOG_KIND_SESSION_START = "c11.download.session.start"
_LOG_KIND_SESSION_END = "c11.download.session.end"
_LOG_KIND_RESOLUTION_REJECT = "c11.download.resolution_rejected"
_LOG_KIND_FRESHNESS_REJECT = "c11.download.freshness_rejected_summary"
_LOG_KIND_FRESHNESS_DOWNGRADED = "c11.download.freshness_downgraded"
_LOG_KIND_RETRY = "c11.download.batch.retry"
_LOG_KIND_PROVIDER_FAIL = "c11.download.provider.failed"
_LOG_KIND_BUDGET_FAIL = "c11.download.budget.exceeded"
_LOG_KIND_IDEMPOTENT = "c11.download.idempotent_no_op"
_AUTH_HEADER_REDACTED = "Bearer ***"
# ----------------------------------------------------------------------
# Consumer-side cuts over c6 (AZ-507): never imported across components.
# ----------------------------------------------------------------------
@runtime_checkable
class _TileWriterLike(Protocol):
"""Composition-root adapter that hides c6's ``TileMetadata`` assembly.
The downloader hands the adapter the primitives the parent suite
returned (zoom, lat, lon, tile_size, capture_ts, content sha256,
freshness label string, source string) plus the JPEG bytes. The
adapter assembles c6's :class:`TileMetadata` and calls
``TileStore.write_tile`` + ``TileMetadataStore.insert_metadata``
inside its own boundary, returning the c6 freshness label as a
plain string (``"fresh"`` / ``"downgraded"`` / etc.) so the
downloader can map it without importing c6's enum.
Raises :class:`_FreshnessRejectionLike`-compatible exception on
c6 freshness gate refusal; the downloader catches by structural
type to keep the import boundary clean.
"""
def write_tile_for_download(
self,
*,
tile_blob: bytes,
zoom_level: int,
lat: float,
lon: float,
tile_size_meters: float,
tile_size_pixels: int,
capture_timestamp: datetime,
content_sha256_hex: str,
sector_class: str,
) -> str: ...
def tile_already_present(
self, *, zoom_level: int, lat: float, lon: float
) -> bool: ...
@runtime_checkable
class _BudgetEnforcerLike(Protocol):
"""Structural cut of c6's :meth:`CacheBudgetEnforcer.reserve_headroom`.
Returns any object on success; raises any exception on failure
(the downloader maps either path into its own
:class:`CacheBudgetExceededError` envelope so callers do not need
to catch a c6 type).
"""
def reserve_headroom(self, needed_bytes: int) -> Any: ...
# ----------------------------------------------------------------------
# Request hash + journal helpers
# ----------------------------------------------------------------------
def request_hash(
flight_id: UUID,
bbox_min_lat: float,
bbox_min_lon: float,
bbox_max_lat: float,
bbox_max_lon: float,
zoom_levels: tuple[int, ...],
sector_class: SectorClassification,
service_api_key: str,
) -> str:
"""Stable 16-hex digest used as the journal filename suffix.
Hashes a deterministic concatenation of the request fields plus a
SHA-256 of the service API key (so two operators with different
keys never share a journal). The digest is short enough for human
inspection and long enough for collision resistance within one
cache root.
"""
api_key_digest = hashlib.sha256(service_api_key.encode("utf-8")).hexdigest()
payload = "|".join(
[
str(flight_id),
f"{bbox_min_lat:.10f}",
f"{bbox_min_lon:.10f}",
f"{bbox_max_lat:.10f}",
f"{bbox_max_lon:.10f}",
",".join(str(z) for z in sorted(zoom_levels)),
sector_class.value,
api_key_digest,
]
)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
def _journal_path(cache_root: Path, flight_id: UUID, req_hash: str) -> Path:
return cache_root / DOWNLOAD_JOURNAL_DIRNAME / f"{flight_id}__{req_hash}.json"
@dataclass
class _JournalState:
"""Per-batch journal payload. Persisted as compact JSON via atomicwrite."""
flight_id: str
request_hash: str
started_at_iso: str
completed_at_iso: str | None = None
tile_ids_completed: list[str] = field(default_factory=list)
tile_counts: dict[str, int] = field(default_factory=dict)
def to_json_bytes(self) -> bytes:
return json.dumps(
{
"flight_id": self.flight_id,
"request_hash": self.request_hash,
"started_at_iso": self.started_at_iso,
"completed_at_iso": self.completed_at_iso,
"tile_ids_completed": self.tile_ids_completed,
"tile_counts": self.tile_counts,
},
sort_keys=True,
separators=(",", ":"),
).encode("utf-8")
@classmethod
def from_json_bytes(cls, raw: bytes) -> _JournalState:
decoded = json.loads(raw.decode("utf-8"))
return cls(
flight_id=str(decoded["flight_id"]),
request_hash=str(decoded["request_hash"]),
started_at_iso=str(decoded["started_at_iso"]),
completed_at_iso=decoded.get("completed_at_iso"),
tile_ids_completed=list(decoded.get("tile_ids_completed") or []),
tile_counts=dict(decoded.get("tile_counts") or {}),
)
def _atomic_write_json(path: Path, payload: bytes) -> None:
"""Write-then-rename + fsync per the description.md atomicwrites pattern."""
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_str = tempfile.mkstemp(
prefix=path.name + ".",
suffix=".tmp",
dir=str(path.parent),
)
tmp = Path(tmp_str)
try:
with os.fdopen(fd, "wb") as fp:
fp.write(payload)
fp.flush()
os.fsync(fp.fileno())
os.replace(tmp, path)
# Best-effort directory fsync so the rename is durable on
# power-loss; not all filesystems implement directory fsync,
# so failure here is logged-and-ignored at the caller.
dir_fd = os.open(str(path.parent), os.O_RDONLY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
except Exception:
if tmp.exists():
tmp.unlink(missing_ok=True)
raise
def _read_journal(path: Path) -> _JournalState | None:
if not path.exists():
return None
try:
return _JournalState.from_json_bytes(path.read_bytes())
except (json.JSONDecodeError, KeyError, ValueError):
# Risk-3 mitigation: a torn / corrupt journal record is treated
# as "no prior journal" so the batch re-runs from scratch.
return None
# ----------------------------------------------------------------------
# Retry-After parsing (mirrors AZ-319 helper)
# ----------------------------------------------------------------------
def _parse_retry_after(header_value: str | None, max_s: int) -> int:
if header_value is None:
return 0
raw = header_value.strip()
if not raw:
return 0
if raw.isdigit():
return min(int(raw), max_s)
try:
retry_at = parsedate_to_datetime(raw)
except (TypeError, ValueError):
return 0
now = datetime.now(timezone.utc)
if retry_at.tzinfo is None:
retry_at = retry_at.replace(tzinfo=timezone.utc)
delta = (retry_at - now).total_seconds()
return max(0, min(int(delta), max_s))
def _default_sleep(seconds: float) -> None:
"""Production sleep hook routes through :class:`WallClock.sleep_until_ns`.
Tests inject a no-op or a recorder. Routing through ``WallClock``
keeps the AZ-398 AC-4 AST scan over ``components/`` from seeing a
bare ``time.sleep``.
"""
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
clock.sleep_until_ns(clock.monotonic_ns() + int(seconds * 1_000_000_000))
# ----------------------------------------------------------------------
# Internal session-state container
# ----------------------------------------------------------------------
@dataclass
class _DownloadSession:
"""Mutable bookkeeping for one ``download_tiles_for_area`` call."""
request: DownloadRequest
journal: _JournalState
journal_path: Path
completed_set: set[str]
tiles_requested: int = 0
tiles_downloaded: int = 0
tiles_rejected_resolution: int = 0
tiles_rejected_freshness: int = 0
tiles_downgraded: int = 0
retry_count: int = 0
rate_limit_budget_used_s: int = 0
# ----------------------------------------------------------------------
# Concrete downloader
# ----------------------------------------------------------------------
class HttpTileDownloader:
"""Concrete :class:`TileDownloader` against ``satellite-provider``'s GET surface.
All cross-component dependencies (``tile_writer``,
``budget_enforcer``) are constructor-injected via Protocol cuts;
the composition root binds them to the c6 implementations. The
``http_client`` is caller-owned: production wiring uses one
long-lived :class:`httpx.Client` per process; tests inject
``httpx.Client(transport=httpx.MockTransport(...))`` for
deterministic responses.
"""
def __init__(
self,
*,
http_client: httpx.Client,
tile_writer: _TileWriterLike,
budget_enforcer: _BudgetEnforcerLike,
logger: logging.Logger,
config: C11Config,
sleep: Any = None,
backoff_schedule_s: tuple[float, ...] | None = None,
) -> None:
self._http_client = http_client
self._tile_writer = tile_writer
self._budget_enforcer = budget_enforcer
self._logger = logger
self._config = config
self._sleep = sleep if sleep is not None else _default_sleep
self._backoff_schedule_s = backoff_schedule_s or _DEFAULT_BACKOFF_SCHEDULE_S
# ------------------------------------------------------------------
# Public Protocol surface
# ------------------------------------------------------------------
def download_tiles_for_area(self, request: DownloadRequest) -> DownloadBatchReport:
"""Idempotence check → enumerate → budget → per-tile GET loop."""
req_hash = request_hash(
request.flight_id,
request.bbox_min_lat,
request.bbox_min_lon,
request.bbox_max_lat,
request.bbox_max_lon,
tuple(request.zoom_levels),
request.sector_class,
self._config.service_api_key,
)
journal_path = _journal_path(request.cache_root, request.flight_id, req_hash)
existing = _read_journal(journal_path)
if existing is not None and existing.completed_at_iso is not None:
self._logger.info(
"Idempotent re-run detected; zero work scheduled",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_IDEMPOTENT,
"kv": {
"flight_id": str(request.flight_id),
"request_hash": req_hash,
"tile_ids_completed": len(existing.tile_ids_completed),
},
},
)
counts = existing.tile_counts
return DownloadBatchReport(
outcome=DownloadOutcome.IDEMPOTENT_NO_OP,
tiles_requested=int(counts.get("tiles_requested", len(existing.tile_ids_completed))),
tiles_downloaded=int(counts.get("tiles_downloaded", len(existing.tile_ids_completed))),
tiles_rejected_resolution=int(counts.get("tiles_rejected_resolution", 0)),
tiles_rejected_freshness=int(counts.get("tiles_rejected_freshness", 0)),
tiles_downgraded=int(counts.get("tiles_downgraded", 0)),
retry_count=0,
request_hash=req_hash,
)
journal = existing or _JournalState(
flight_id=str(request.flight_id),
request_hash=req_hash,
started_at_iso=_iso_now(),
)
completed_set = set(journal.tile_ids_completed)
session = _DownloadSession(
request=request,
journal=journal,
journal_path=journal_path,
completed_set=completed_set,
)
self._logger.info(
"Pre-flight tile download session started",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_SESSION_START,
"kv": {
"flight_id": str(request.flight_id),
"request_hash": req_hash,
"bbox": [
request.bbox_min_lat,
request.bbox_min_lon,
request.bbox_max_lat,
request.bbox_max_lon,
],
"zoom_levels": list(request.zoom_levels),
"sector_class": request.sector_class.value,
"resume_from_journal": completed_set != set(),
"tiles_already_completed": len(completed_set),
},
},
)
outcome: DownloadOutcome = DownloadOutcome.SUCCESS
try:
summaries = self._enumerate_remote(request)
session.tiles_requested = len(summaries)
self._reserve_budget(summaries, completed_set, session)
for summary in summaries:
if summary.tile_id_str in completed_set:
continue
self._download_one_tile(summary, request, session)
journal.completed_at_iso = _iso_now()
journal.tile_counts = self._counts_dict(session)
_atomic_write_json(journal_path, journal.to_json_bytes())
return DownloadBatchReport(
outcome=outcome,
tiles_requested=session.tiles_requested,
tiles_downloaded=session.tiles_downloaded,
tiles_rejected_resolution=session.tiles_rejected_resolution,
tiles_rejected_freshness=session.tiles_rejected_freshness,
tiles_downgraded=session.tiles_downgraded,
retry_count=session.retry_count,
request_hash=req_hash,
)
except (
SatelliteProviderError,
RateLimitedError,
CacheBudgetExceededError,
):
outcome = DownloadOutcome.FAILURE
journal.tile_counts = self._counts_dict(session)
_atomic_write_json(journal_path, journal.to_json_bytes())
raise
except Exception:
outcome = DownloadOutcome.FAILURE
journal.tile_counts = self._counts_dict(session)
_atomic_write_json(journal_path, journal.to_json_bytes())
raise
finally:
self._logger.info(
"Pre-flight tile download session ended",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_SESSION_END,
"kv": {
"flight_id": str(request.flight_id),
"request_hash": req_hash,
"outcome": outcome.value,
"tiles_requested": session.tiles_requested,
"tiles_downloaded": session.tiles_downloaded,
"tiles_rejected_resolution": session.tiles_rejected_resolution,
"tiles_rejected_freshness": session.tiles_rejected_freshness,
"tiles_downgraded": session.tiles_downgraded,
"retry_count": session.retry_count,
},
},
)
if session.tiles_rejected_freshness:
self._logger.warning(
"Freshness gate rejected tiles in this batch",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FRESHNESS_REJECT,
"kv": {
"flight_id": str(request.flight_id),
"request_hash": req_hash,
"tiles_rejected_freshness": session.tiles_rejected_freshness,
},
},
)
def enumerate_remote_coverage(
self,
bbox_min_lat: float,
bbox_min_lon: float,
bbox_max_lat: float,
bbox_max_lon: float,
zoom_levels: Any,
) -> list[TileSummary]:
"""Read-only enumeration; issues one ``list-only=true`` GET."""
return list(
self._do_enumerate(
bbox_min_lat,
bbox_min_lon,
bbox_max_lat,
bbox_max_lon,
tuple(int(z) for z in zoom_levels),
)
)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _enumerate_remote(self, request: DownloadRequest) -> list[TileSummary]:
return list(
self._do_enumerate(
request.bbox_min_lat,
request.bbox_min_lon,
request.bbox_max_lat,
request.bbox_max_lon,
tuple(request.zoom_levels),
)
)
def _do_enumerate(
self,
bbox_min_lat: float,
bbox_min_lon: float,
bbox_max_lat: float,
bbox_max_lon: float,
zoom_levels: tuple[int, ...],
) -> list[TileSummary]:
params = {
"bbox": f"{bbox_min_lat},{bbox_min_lon},{bbox_max_lat},{bbox_max_lon}",
"zoom": ",".join(str(z) for z in zoom_levels),
_LIST_QUERY_LIST_ONLY: "true",
}
response = self._send_get(
self._config.satellite_provider_url.rstrip("/") + _LIST_PATH,
params=params,
session=None,
)
try:
body = response.json()
except ValueError as exc:
self._log_provider_failure(
"list_not_json", response.status_code, str(exc)
)
raise SatelliteProviderError(
"satellite-provider returned non-JSON list-only body"
) from exc
try:
entries = body["tiles"]
except (KeyError, TypeError) as exc:
self._log_provider_failure(
"list_schema", response.status_code, str(exc)
)
raise SatelliteProviderError(
"satellite-provider list-only response missing 'tiles'"
) from exc
summaries: list[TileSummary] = []
for entry in entries:
try:
summaries.append(
TileSummary(
tile_id_str=str(entry["tile_id"]),
zoom_level=int(entry["zoom_level"]),
lat=float(entry["lat"]),
lon=float(entry["lon"]),
produced_at=_parse_iso(str(entry["produced_at"])),
resolution_m_per_px=float(entry["resolution_m_per_px"]),
estimated_bytes=int(entry["estimated_bytes"]),
tile_size_meters=float(entry.get("tile_size_meters", 100.0)),
tile_size_pixels=int(entry.get("tile_size_pixels", 256)),
)
)
except (KeyError, TypeError, ValueError) as exc:
self._log_provider_failure(
"list_tile_schema", response.status_code, str(exc)
)
raise SatelliteProviderError(
"satellite-provider list-only entry missing required fields"
) from exc
return summaries
def _reserve_budget(
self,
summaries: list[TileSummary],
completed_set: set[str],
session: _DownloadSession,
) -> None:
remaining_bytes = sum(
int(s.estimated_bytes)
for s in summaries
if s.tile_id_str not in completed_set
)
if remaining_bytes <= 0:
return
try:
self._budget_enforcer.reserve_headroom(remaining_bytes)
except CacheBudgetExceededError:
self._log_budget_failure(remaining_bytes)
raise
except Exception as exc:
self._log_budget_failure(remaining_bytes, detail=str(exc))
raise CacheBudgetExceededError(
f"c6 budget enforcer refused {remaining_bytes} bytes "
f"of head-room: {exc}"
) from exc
def _download_one_tile(
self,
summary: TileSummary,
request: DownloadRequest,
session: _DownloadSession,
) -> None:
if summary.resolution_m_per_px < self._config.download_resolution_floor_m_per_px:
session.tiles_rejected_resolution += 1
self._logger.warning(
"Resolution gate rejected tile",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_RESOLUTION_REJECT,
"kv": {
"flight_id": str(request.flight_id),
"tile_id": summary.tile_id_str,
"resolution_m_per_px": summary.resolution_m_per_px,
"floor_m_per_px": self._config.download_resolution_floor_m_per_px,
},
},
)
return
ingest_url = (
self._config.satellite_provider_url.rstrip("/")
+ _GET_PATH
+ f"/{summary.tile_id_str}"
)
response = self._send_get(ingest_url, params=None, session=session)
if not response.content:
self._log_provider_failure(
"empty_body", response.status_code, summary.tile_id_str
)
raise SatelliteProviderError(
f"satellite-provider returned empty body for tile_id="
f"{summary.tile_id_str}"
)
tile_blob = response.content
content_sha256_hex = hashlib.sha256(tile_blob).hexdigest()
produced_at = summary.produced_at
if produced_at.tzinfo is None:
produced_at = produced_at.replace(tzinfo=timezone.utc)
try:
label = self._tile_writer.write_tile_for_download(
tile_blob=tile_blob,
zoom_level=summary.zoom_level,
lat=summary.lat,
lon=summary.lon,
tile_size_meters=summary.tile_size_meters,
tile_size_pixels=summary.tile_size_pixels,
capture_timestamp=produced_at,
content_sha256_hex=content_sha256_hex,
sector_class=request.sector_class.value,
)
except Exception as exc:
if _is_freshness_rejection(exc):
session.tiles_rejected_freshness += 1
return
raise
if label == "downgraded":
session.tiles_downgraded += 1
session.tiles_downloaded += 1
self._logger.warning(
"Freshness label downgraded by c6",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FRESHNESS_DOWNGRADED,
"kv": {
"flight_id": str(request.flight_id),
"tile_id": summary.tile_id_str,
},
},
)
else:
session.tiles_downloaded += 1
session.completed_set.add(summary.tile_id_str)
session.journal.tile_ids_completed = sorted(session.completed_set)
session.journal.tile_counts = self._counts_dict(session)
_atomic_write_json(session.journal_path, session.journal.to_json_bytes())
def _send_get(
self,
url: str,
params: dict[str, str] | None,
session: _DownloadSession | None,
) -> httpx.Response:
"""GET with auth header + 429 / 5xx handling."""
headers = {"Authorization": f"Bearer {self._config.service_api_key}"}
attempt = 0
last_error: str | None = None
while True:
attempt += 1
try:
response = self._http_client.get(
url,
params=params,
headers=headers,
timeout=self._config.download_http_timeout_s,
)
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.WriteError,
httpx.RemoteProtocolError,
) as exc:
last_error = f"transport:{type(exc).__name__}:{exc}"
if attempt > self._config.download_max_5xx_retries:
self._log_provider_failure("connection_error", None, last_error)
raise SatelliteProviderError(
f"satellite-provider unreachable after "
f"{attempt - 1} retries: {last_error}"
) from exc
self._sleep_with_log(
self._backoff_for(attempt - 1), last_error, session
)
continue
if response.status_code in (401, 403):
self._log_provider_failure(
"auth_failed", response.status_code, "fail-fast"
)
raise SatelliteProviderError(
f"satellite-provider rejected auth (http_status="
f"{response.status_code}); fail-fast"
)
if response.status_code == 429:
wait_s = _parse_retry_after(
response.headers.get("Retry-After"),
self._config.download_max_retry_after_s
- (session.rate_limit_budget_used_s if session else 0),
)
if session is not None:
session.rate_limit_budget_used_s += wait_s
if wait_s <= 0 or (
session is not None
and session.rate_limit_budget_used_s
>= self._config.download_max_retry_after_s
):
self._log_provider_failure(
"rate_limited", 429, "Retry-After budget exhausted"
)
raise RateLimitedError(
"satellite-provider rate-limited the download; "
f"cumulative Retry-After budget "
f"{(session.rate_limit_budget_used_s if session else 0)}s "
f"exceeds cap {self._config.download_max_retry_after_s}s"
)
self._sleep_with_log(wait_s, f"http_429_retry_after={wait_s}", session)
continue
if response.status_code >= 500:
last_error = f"http_status={response.status_code}"
if attempt > self._config.download_max_5xx_retries:
self._log_provider_failure(
"persistent_5xx", response.status_code, last_error
)
raise SatelliteProviderError(
f"satellite-provider returned {response.status_code} "
f"after {attempt - 1} retries"
)
self._sleep_with_log(
self._backoff_for(attempt - 1), last_error, session
)
continue
if response.status_code != 200:
self._log_provider_failure(
"unexpected_status", response.status_code, "non-200"
)
raise SatelliteProviderError(
f"satellite-provider returned unexpected status "
f"{response.status_code} (expected 200)"
)
return response
def _backoff_for(self, attempt_idx: int) -> float:
if attempt_idx < 0:
attempt_idx = 0
if attempt_idx >= len(self._backoff_schedule_s):
attempt_idx = len(self._backoff_schedule_s) - 1
return self._backoff_schedule_s[attempt_idx]
def _sleep_with_log(
self, wait_s: float, reason: str, session: _DownloadSession | None
) -> None:
if session is not None:
session.retry_count += 1
self._logger.warning(
"Download batch retrying",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_RETRY,
"kv": {
"wait_s": wait_s,
"reason": reason,
"retry_count": session.retry_count if session is not None else None,
},
},
)
self._sleep(wait_s)
def _log_provider_failure(
self, reason: str, http_status: int | None, detail: str
) -> None:
self._logger.error(
"Download provider failed",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PROVIDER_FAIL,
"kv": {
"reason": reason,
"http_status": http_status,
"detail": detail,
"auth_header": _AUTH_HEADER_REDACTED,
},
},
)
def _log_budget_failure(
self, requested_bytes: int, detail: str | None = None
) -> None:
self._logger.error(
"Cache-budget pre-check failed",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_BUDGET_FAIL,
"kv": {
"requested_bytes": requested_bytes,
"detail": detail,
},
},
)
def _counts_dict(self, session: _DownloadSession) -> dict[str, int]:
return {
"tiles_requested": session.tiles_requested,
"tiles_downloaded": session.tiles_downloaded,
"tiles_rejected_resolution": session.tiles_rejected_resolution,
"tiles_rejected_freshness": session.tiles_rejected_freshness,
"tiles_downgraded": session.tiles_downgraded,
}
# ----------------------------------------------------------------------
# Module-level helpers
# ----------------------------------------------------------------------
def _iso_now() -> str:
return datetime.now(timezone.utc).isoformat(timespec="microseconds")
def _parse_iso(raw: str) -> datetime:
s = raw.strip()
if s.endswith("Z"):
s = s[:-1] + "+00:00"
return datetime.fromisoformat(s)
def _is_freshness_rejection(exc: BaseException) -> bool:
"""Structural test: c6 raises ``FreshnessRejectionError``.
The composition-root adapter is free to re-raise the c6 type
directly; we recognise it by class name to avoid importing the
c6 errors module here.
"""
if exc.__class__.__name__ == "FreshnessRejectionError":
return True
for base in type(exc).__mro__:
if base.__name__ == "FreshnessRejectionError":
return True
return False
@@ -1,6 +1,6 @@
"""C11 TileManager composition-root factories (AZ-317, AZ-318, AZ-319).
"""C11 TileManager composition-root factories (AZ-316, AZ-317, AZ-318, AZ-319).
Wires the upload-side services that have landed:
Wires the operator-side services:
* :func:`build_flight_state_gate` (AZ-317) — adapts an injected
``FlightStateSource`` (typically an E-C8 FC adapter wrapper) into
@@ -12,10 +12,16 @@ Wires the upload-side services that have landed:
key manager, the c6 storage cuts, an :class:`httpx.Client`, and
the :class:`C11Config` block into the production
:class:`HttpTileUploader`.
* :func:`build_tile_downloader` (AZ-316) — composes the c6 store +
metadata-store + budget-enforcer (wrapped in a single
composition-root adapter that hides c6's :class:`TileMetadata`
assembly), an :class:`httpx.Client`, and the :class:`C11Config`
block into the production :class:`HttpTileDownloader`.
Composition root is the ONLY layer permitted to import from
``components.c11_tile_manager`` (per ``module-layout.md`` Rule 9 +
the AZ-270 lint).
the AZ-270 lint). It is also the only layer permitted to bridge the
c6 ↔ c11 boundary (per AZ-507).
"""
from __future__ import annotations
@@ -28,6 +34,7 @@ from gps_denied_onboard.components.c11_tile_manager import (
C11Config,
FlightStateGate,
FlightStateSource,
HttpTileDownloader,
HttpTileUploader,
PerFlightKeyManager,
)
@@ -42,6 +49,7 @@ if TYPE_CHECKING:
__all__ = [
"build_flight_state_gate",
"build_per_flight_key_manager",
"build_tile_downloader",
"build_tile_uploader",
]
@@ -51,6 +59,7 @@ _C11_SIGNING_LOGGER = "c11_tile_manager.signing_key"
_C11_SIGNING_PRODUCER_ID = "c11_tile_manager.signing_key"
_C11_UPLOADER_LOGGER = "c11_tile_manager.tile_uploader"
_C11_UPLOADER_PRODUCER_ID = "c11_tile_manager.tile_uploader"
_C11_DOWNLOADER_LOGGER = "c11_tile_manager.tile_downloader"
def build_flight_state_gate(*, source: FlightStateSource) -> FlightStateGate:
@@ -145,3 +154,164 @@ def build_tile_uploader(
logger=logger,
config=block,
)
def build_tile_downloader(
config: Config,
*,
http_client: httpx.Client,
tile_store: Any,
tile_metadata_store: Any,
budget_enforcer: Any,
companion_id: str | None = None,
) -> HttpTileDownloader:
"""Construct a wired :class:`HttpTileDownloader` (AZ-316).
Wraps c6's ``TileStore`` + ``TileMetadataStore`` +
``CacheBudgetEnforcer`` into a single composition-root adapter
that absorbs c6's :class:`TileMetadata` / :class:`TileSource` /
:class:`FreshnessLabel` / :class:`SectorClassification` enums
so the downloader stays free of cross-component imports
(AZ-507 / AZ-270). The c6 surfaces are caller-owned; production
wiring shares the same singletons used by the uploader and the
on-airframe ingest path.
"""
block = config.components.get("c11_tile_manager")
if block is None:
raise ConfigError(
"build_tile_downloader: config.components['c11_tile_manager'] "
"block is missing — register C11Config and supply YAML"
)
if not isinstance(block, C11Config):
raise ConfigError(
"build_tile_downloader: config.components['c11_tile_manager'] "
f"must be a C11Config, got {type(block).__name__}"
)
if not block.satellite_provider_url:
raise ConfigError(
"build_tile_downloader: C11Config.satellite_provider_url "
"must be configured for production / operator wiring"
)
if not block.service_api_key:
raise ConfigError(
"build_tile_downloader: C11Config.service_api_key must be "
"set; the operator-tooling deploy MUST inject the bearer "
"token via env override"
)
logger = get_logger(_C11_DOWNLOADER_LOGGER)
adapter = _C6DownloadAdapter(
tile_store=tile_store,
metadata_store=tile_metadata_store,
budget_enforcer=budget_enforcer,
companion_id=companion_id or block.companion_id,
)
return HttpTileDownloader(
http_client=http_client,
tile_writer=adapter,
budget_enforcer=adapter,
logger=logger,
config=block,
)
class _C6DownloadAdapter:
"""Composition-root bridge between AZ-316 and c6 storage.
Implements both :class:`_TileWriterLike` and
:class:`_BudgetEnforcerLike` Protocol cuts (declared in
``c11_tile_manager.tile_downloader`` as structural Protocols).
Hides c6's :class:`TileMetadata` / :class:`TileSource` /
:class:`FreshnessLabel` / :class:`SectorClassification` so the
AZ-316 module never imports c6.
"""
__slots__ = (
"_tile_store",
"_metadata_store",
"_budget_enforcer",
"_companion_id",
)
def __init__(
self,
*,
tile_store: Any,
metadata_store: Any,
budget_enforcer: Any,
companion_id: str,
) -> None:
self._tile_store = tile_store
self._metadata_store = metadata_store
self._budget_enforcer = budget_enforcer
self._companion_id = companion_id
def write_tile_for_download(
self,
*,
tile_blob: bytes,
zoom_level: int,
lat: float,
lon: float,
tile_size_meters: float,
tile_size_pixels: int,
capture_timestamp: Any,
content_sha256_hex: str,
sector_class: str,
) -> str:
from gps_denied_onboard.components.c6_tile_cache._types import (
FreshnessLabel,
TileId,
TileMetadata,
TileSource,
VotingStatus,
)
tid = TileId(zoom_level=int(zoom_level), lat=float(lat), lon=float(lon))
metadata = TileMetadata(
tile_id=tid,
tile_size_meters=float(tile_size_meters),
tile_size_pixels=int(tile_size_pixels),
capture_timestamp=capture_timestamp,
source=TileSource.GOOGLEMAPS,
content_sha256_hex=content_sha256_hex,
freshness_label=FreshnessLabel.FRESH,
flight_id=None,
companion_id=None,
quality_metadata=None,
voting_status=VotingStatus.TRUSTED,
)
self._tile_store.write_tile(tile_blob, metadata)
self._metadata_store.insert_metadata(metadata)
# AZ-307 owns the actual freshness label after insert; for the
# download path the simplest contract is "FRESH on first write,
# DOWNGRADED if the row already existed under stable_rear stale
# rules". The c6 store does not currently expose the post-insert
# label as a return value (AZ-303 contract); we return "fresh"
# as the conservative default. A future c6 ABI extension that
# surfaces the label can update this adapter without touching
# the AZ-316 module.
return "fresh"
def tile_already_present(
self, *, zoom_level: int, lat: float, lon: float
) -> bool:
from gps_denied_onboard.components.c6_tile_cache._types import TileId
tid = TileId(zoom_level=int(zoom_level), lat=float(lat), lon=float(lon))
return bool(self._tile_store.tile_exists(tid))
def reserve_headroom(self, needed_bytes: int) -> Any:
from gps_denied_onboard.components.c11_tile_manager.errors import (
CacheBudgetExceededError,
)
from gps_denied_onboard.components.c6_tile_cache.errors import (
CacheBudgetExhaustedError,
)
try:
return self._budget_enforcer.reserve_headroom(needed_bytes)
except CacheBudgetExhaustedError as exc:
raise CacheBudgetExceededError(
f"c6 cache budget exhausted: needed {needed_bytes} bytes; {exc}"
) from exc
@@ -1,8 +1,8 @@
"""AZ-319 AC-12 — `HttpTileUploader` satisfies the `TileUploader` Protocol.
"""C11 protocol conformance — uploader (AZ-319 AC-12) + downloader (AZ-316 AC-10).
Smoke-test that the concrete impl exposes every method the Protocol
requires (positive case) and that a partial fake omitting one of the
three required methods is correctly rejected (negative case).
Smoke-tests that each concrete impl exposes every method its Protocol
requires (positive cases) and that partial fakes omitting one of the
required methods are correctly rejected (negative cases).
"""
from __future__ import annotations
@@ -13,9 +13,13 @@ import httpx
from gps_denied_onboard.components.c11_tile_manager import (
C11Config,
HttpTileDownloader,
HttpTileUploader,
)
from gps_denied_onboard.components.c11_tile_manager.interface import TileUploader
from gps_denied_onboard.components.c11_tile_manager.interface import (
TileDownloader,
TileUploader,
)
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
_PRODUCER_ID = "c11_tile_manager.tile_uploader"
@@ -38,6 +42,13 @@ class _PartialFakeMissingConfirm:
return []
class _PartialDownloaderMissingEnumerate:
"""Conformance counterexample: missing ``enumerate_remote_coverage``."""
def download_tiles_for_area(self, request: object) -> object: # noqa: ARG002
return None
def test_ac12_concrete_uploader_satisfies_protocol() -> None:
# Arrange — supply minimal-yet-valid dependencies; the Protocol
# check only inspects method names, not their behaviour.
@@ -68,3 +79,32 @@ def test_ac12_concrete_uploader_satisfies_protocol() -> None:
def test_ac12_partial_fake_is_not_protocol_conformant() -> None:
# Assert
assert not isinstance(_PartialFakeMissingConfirm(), TileUploader)
def test_ac10_concrete_downloader_satisfies_protocol() -> None:
# Arrange
cfg = C11Config(
satellite_provider_url="https://parent-suite.test",
service_api_key="conformance-test-key",
download_http_timeout_s=5.0,
download_max_5xx_retries=4,
download_max_retry_after_s=600,
download_resolution_floor_m_per_px=0.5,
)
transport = httpx.MockTransport(lambda r: httpx.Response(200, json={"tiles": []}))
downloader = HttpTileDownloader(
http_client=httpx.Client(transport=transport, base_url="https://parent-suite.test"),
tile_writer=object(), # type: ignore[arg-type]
budget_enforcer=object(), # type: ignore[arg-type]
logger=logging.getLogger("test_az316_conformance"),
config=cfg,
sleep=_NullSleep(),
)
# Assert
assert isinstance(downloader, TileDownloader)
def test_ac10_partial_downloader_is_not_protocol_conformant() -> None:
# Assert
assert not isinstance(_PartialDownloaderMissingEnumerate(), TileDownloader)
@@ -0,0 +1,739 @@
"""AZ-316 ``HttpTileDownloader`` unit tests.
Covers AC-1 .. AC-12 plus the throughput NFR from
``_docs/02_tasks/todo/AZ-316_c11_tile_downloader.md``. Uses
:class:`httpx.MockTransport` for deterministic HTTP responses, a
list-backed log handler for log capture, and stub C6 stores so this
suite never depends on AZ-303 / AZ-305 / AZ-307 / AZ-308 internals.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
import httpx
import pytest
from gps_denied_onboard.components.c11_tile_manager import (
C11Config,
CacheBudgetExceededError,
DownloadOutcome,
DownloadRequest,
HttpTileDownloader,
RateLimitedError,
SatelliteProviderError,
SectorClassification,
request_hash,
)
_BASE_URL = "https://parent-suite.test"
_LIST_PATH = "/api/satellite/tiles"
_API_KEY = "test-api-key-001"
# ----------------------------------------------------------------------
# Stubs / fakes
# ----------------------------------------------------------------------
class _StubFreshnessRejection(Exception):
"""Composition-root would surface c6's ``FreshnessRejectionError``;
we mirror it locally so the structural check in the downloader
matches by class name without importing c6.
"""
# Re-name the class so the structural check (`__class__.__name__ ==
# "FreshnessRejectionError"`) inside the downloader matches.
_StubFreshnessRejection.__name__ = "FreshnessRejectionError"
class _StubTileWriter:
"""Captures `write_tile_for_download` calls + scripts the freshness label."""
def __init__(
self,
*,
labels: dict[str, str] | None = None,
rejected: set[str] | None = None,
) -> None:
self.labels = labels or {}
self.rejected = rejected or set()
self.write_calls: list[dict[str, Any]] = []
self.exists_calls: list[tuple[int, float, float]] = []
def write_tile_for_download(
self,
*,
tile_blob: bytes,
zoom_level: int,
lat: float,
lon: float,
tile_size_meters: float,
tile_size_pixels: int,
capture_timestamp: datetime,
content_sha256_hex: str,
sector_class: str,
) -> str:
tid = _tid(zoom_level, lat, lon)
self.write_calls.append(
{
"tile_id": tid,
"tile_blob_len": len(tile_blob),
"content_sha256_hex": content_sha256_hex,
"sector_class": sector_class,
}
)
if tid in self.rejected:
raise _StubFreshnessRejection(f"freshness rejected {tid}")
return self.labels.get(tid, "fresh")
def tile_already_present(
self, *, zoom_level: int, lat: float, lon: float
) -> bool:
self.exists_calls.append((zoom_level, lat, lon))
return False
class _StubBudgetEnforcer:
"""Records `reserve_headroom` calls; raises pre-baked exception when set."""
def __init__(self, raise_on_call: Exception | None = None) -> None:
self.calls: list[int] = []
self._raise = raise_on_call
def reserve_headroom(self, needed_bytes: int) -> object:
self.calls.append(int(needed_bytes))
if self._raise is not None:
raise self._raise
return object()
def _tid(zoom: int, lat: float, lon: float) -> str:
return f"z{int(zoom)}_{float(lat):.6f}_{float(lon):.6f}"
def _build_downloader(
*,
transport: httpx.MockTransport,
tile_writer: _StubTileWriter | None = None,
budget_enforcer: _StubBudgetEnforcer | None = None,
config: C11Config | None = None,
sleep_recorder: list[float] | None = None,
backoff_schedule_s: tuple[float, ...] | None = None,
) -> tuple[
HttpTileDownloader,
list[logging.LogRecord],
_StubTileWriter,
_StubBudgetEnforcer,
list[float],
]:
log_records: list[logging.LogRecord] = []
class _Handler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
log_records.append(record)
logger = logging.getLogger(f"test_az316_{id(log_records)}")
logger.handlers.clear()
logger.addHandler(_Handler())
logger.setLevel(logging.DEBUG)
logger.propagate = False
writer = tile_writer or _StubTileWriter()
enforcer = budget_enforcer or _StubBudgetEnforcer()
sleeps = sleep_recorder if sleep_recorder is not None else []
cfg = config or C11Config(
satellite_provider_url=_BASE_URL,
service_api_key=_API_KEY,
download_http_timeout_s=5.0,
download_max_5xx_retries=4,
download_max_retry_after_s=600,
download_resolution_floor_m_per_px=0.5,
)
client = httpx.Client(transport=transport, base_url=_BASE_URL)
downloader = HttpTileDownloader(
http_client=client,
tile_writer=writer, # type: ignore[arg-type]
budget_enforcer=enforcer, # type: ignore[arg-type]
logger=logger,
config=cfg,
sleep=lambda s: sleeps.append(s),
backoff_schedule_s=backoff_schedule_s,
)
return downloader, log_records, writer, enforcer, sleeps
def _make_request(
*,
flight_id: Any | None = None,
cache_root: Path,
zoom_levels: tuple[int, ...] = (14,),
) -> DownloadRequest:
return DownloadRequest(
flight_id=flight_id or uuid4(),
bbox_min_lat=45.0,
bbox_min_lon=-122.5,
bbox_max_lat=45.5,
bbox_max_lon=-122.0,
zoom_levels=zoom_levels,
sector_class=SectorClassification.STABLE_REAR,
cache_root=cache_root,
)
def _list_response(
tiles: list[dict[str, Any]] | None = None,
) -> httpx.Response:
return httpx.Response(200, json={"tiles": tiles or []})
def _tile_entry(
*,
zoom: int,
lat: float,
lon: float,
resolution_m_per_px: float = 0.5,
estimated_bytes: int = 4096,
produced_at: datetime | None = None,
) -> dict[str, Any]:
produced = produced_at or datetime(2026, 5, 13, 0, 0, 0, tzinfo=timezone.utc)
return {
"tile_id": _tid(zoom, lat, lon),
"zoom_level": zoom,
"lat": lat,
"lon": lon,
"produced_at": produced.isoformat(),
"resolution_m_per_px": resolution_m_per_px,
"estimated_bytes": estimated_bytes,
"tile_size_meters": 100.0,
"tile_size_pixels": 256,
}
def _make_route_handler(
*,
list_response: httpx.Response | None = None,
tile_response_factory: Any = None,
) -> Any:
"""Route GETs by URL path: list endpoint vs per-tile endpoint."""
def _handler(request: httpx.Request) -> httpx.Response:
path = request.url.path
is_list = (
path.endswith(_LIST_PATH)
and request.url.params.get("list-only") == "true"
)
if is_list:
return list_response or _list_response()
if tile_response_factory is None:
return httpx.Response(200, content=b"\xff\xd8\xff\xe0fake-jpeg")
return tile_response_factory(request)
return _handler
# ----------------------------------------------------------------------
# AC-1: 100-tile happy path
# ----------------------------------------------------------------------
def test_ac1_100_tile_happy_path_writes_all(tmp_path: Path) -> None:
# Arrange
tiles = [
_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0 - i * 0.001)
for i in range(100)
]
transport = httpx.MockTransport(
_make_route_handler(list_response=_list_response(tiles))
)
(downloader, _logs, writer, enforcer, _sleeps) = _build_downloader(
transport=transport
)
request = _make_request(cache_root=tmp_path)
# Act
report = downloader.download_tiles_for_area(request)
# Assert
assert report.outcome == DownloadOutcome.SUCCESS
assert report.tiles_requested == 100
assert report.tiles_downloaded == 100
assert report.tiles_rejected_resolution == 0
assert report.tiles_rejected_freshness == 0
assert report.tiles_downgraded == 0
assert len(writer.write_calls) == 100
assert enforcer.calls == [4096 * 100]
# ----------------------------------------------------------------------
# AC-2: resolution gate rejects sub-spec tiles BEFORE write
# ----------------------------------------------------------------------
def test_ac2_resolution_gate_rejects_sub_spec_tiles(tmp_path: Path) -> None:
# Arrange
tiles = []
for i in range(50):
res = 0.3 if i < 10 else 0.5
tiles.append(
_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0, resolution_m_per_px=res)
)
transport = httpx.MockTransport(
_make_route_handler(list_response=_list_response(tiles))
)
(downloader, log_records, writer, _enforcer, _sleeps) = _build_downloader(
transport=transport
)
# Act
report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
# Assert
assert report.tiles_rejected_resolution == 10
assert report.tiles_downloaded == 40
assert len(writer.write_calls) == 40
res_warnings = [r for r in log_records if getattr(r, "kind", "") == "c11.download.resolution_rejected"]
assert len(res_warnings) == 10
# ----------------------------------------------------------------------
# AC-3: c6 freshness rejection counted, not propagated
# ----------------------------------------------------------------------
def test_ac3_freshness_rejections_counted_and_run_continues(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0) for i in range(10)]
rejected_ids = {_tid(14, 45.0 + i * 0.001, -122.0) for i in range(5)}
transport = httpx.MockTransport(
_make_route_handler(list_response=_list_response(tiles))
)
writer = _StubTileWriter(rejected=rejected_ids)
(downloader, log_records, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport, tile_writer=writer
)
# Act
report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
# Assert
assert report.tiles_rejected_freshness == 5
assert report.tiles_downloaded == 5
assert report.outcome == DownloadOutcome.SUCCESS
summary_warns = [
r for r in log_records if getattr(r, "kind", "") == "c11.download.freshness_rejected_summary"
]
assert len(summary_warns) == 1
# ----------------------------------------------------------------------
# AC-4: stable_rear stale tiles are surfaced as DOWNGRADED
# ----------------------------------------------------------------------
def test_ac4_downgraded_label_counted_and_persisted(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0) for i in range(5)]
labels = {_tid(14, 45.0 + i * 0.001, -122.0): "downgraded" for i in range(3)}
transport = httpx.MockTransport(
_make_route_handler(list_response=_list_response(tiles))
)
writer = _StubTileWriter(labels=labels)
(downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport, tile_writer=writer
)
# Act
report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
# Assert
assert report.tiles_downgraded == 3
assert report.tiles_downloaded == 5
# ----------------------------------------------------------------------
# AC-5: 429 honours Retry-After
# ----------------------------------------------------------------------
def test_ac5_429_honours_retry_after(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)]
state = {"attempts": 0}
def _factory(request: httpx.Request) -> httpx.Response:
state["attempts"] += 1
if state["attempts"] == 1:
return httpx.Response(429, headers={"Retry-After": "30"})
return httpx.Response(200, content=b"\xff\xd8tile")
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
sleeps: list[float] = []
(downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport, sleep_recorder=sleeps
)
# Act
report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
# Assert
assert state["attempts"] == 2
assert sleeps and sleeps[0] >= 30
assert report.retry_count >= 1
assert report.outcome == DownloadOutcome.SUCCESS
# ----------------------------------------------------------------------
# AC-6: persistent 5xx aborts with structured error
# ----------------------------------------------------------------------
def test_ac6_persistent_5xx_raises_satellite_provider_error(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)]
state = {"attempts": 0}
def _factory(request: httpx.Request) -> httpx.Response:
state["attempts"] += 1
return httpx.Response(503)
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
(downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport
)
# Act / Assert
with pytest.raises(SatelliteProviderError):
downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
assert state["attempts"] >= 5
# ----------------------------------------------------------------------
# AC-7: 401 fails fast (no retry)
# ----------------------------------------------------------------------
def test_ac7_401_fails_fast_no_retry(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)]
state = {"attempts": 0}
def _factory(request: httpx.Request) -> httpx.Response:
state["attempts"] += 1
return httpx.Response(401)
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
(downloader, log_records, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport
)
# Act / Assert
with pytest.raises(SatelliteProviderError):
downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
assert state["attempts"] == 1
# ----------------------------------------------------------------------
# AC-8: idempotent re-run after success
# ----------------------------------------------------------------------
def test_ac8_idempotent_rerun_yields_no_op(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0) for i in range(5)]
state = {"tile_gets": 0}
def _factory(request: httpx.Request) -> httpx.Response:
state["tile_gets"] += 1
return httpx.Response(200, content=b"\xff\xd8tile")
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
(downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport
)
request = _make_request(cache_root=tmp_path)
# Act — first run
first = downloader.download_tiles_for_area(request)
first_get_count = state["tile_gets"]
# Act — second run (same request, same cache_root → idempotent)
second = downloader.download_tiles_for_area(request)
# Assert
assert first.outcome == DownloadOutcome.SUCCESS
assert second.outcome == DownloadOutcome.IDEMPOTENT_NO_OP
assert state["tile_gets"] == first_get_count, "second run must NOT fetch"
assert second.tiles_downloaded == first.tiles_downloaded
# ----------------------------------------------------------------------
# AC-9: cache-budget pre-check aborts before any GET
# ----------------------------------------------------------------------
def test_ac9_cache_budget_pre_check_aborts(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0, estimated_bytes=10_000)]
transport_state = {"tile_gets": 0}
def _factory(request: httpx.Request) -> httpx.Response:
transport_state["tile_gets"] += 1
return httpx.Response(200, content=b"\xff\xd8tile")
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
enforcer = _StubBudgetEnforcer(
raise_on_call=CacheBudgetExceededError("no headroom")
)
(downloader, log_records, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport, budget_enforcer=enforcer
)
# Act / Assert
with pytest.raises(CacheBudgetExceededError):
downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
assert transport_state["tile_gets"] == 0
assert enforcer.calls == [10_000]
# ----------------------------------------------------------------------
# AC-11: service API key never logged in plaintext
# ----------------------------------------------------------------------
def test_ac11_service_api_key_never_appears_in_logs(tmp_path: Path) -> None:
# Arrange — exercise the failure path so the provider-failed ERROR
# log fires (the code that explicitly redacts the auth header).
tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)]
def _factory(request: httpx.Request) -> httpx.Response:
return httpx.Response(401)
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
(downloader, log_records, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport
)
with pytest.raises(SatelliteProviderError):
downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
# Assert
flat = " ".join(
r.getMessage() + json.dumps(getattr(r, "kv", {})) for r in log_records
)
assert _API_KEY not in flat
assert "Bearer ***" in flat
# ----------------------------------------------------------------------
# AC-12: journal survives mid-batch crash; re-run completes the rest
# ----------------------------------------------------------------------
def test_ac12_partial_journal_resumed_on_rerun(tmp_path: Path) -> None:
# Arrange — 10 tiles; first run fetches all 10 successfully and
# leaves a complete journal. A second run with the SAME request
# must short-circuit (AC-8 covers that). To exercise AC-12 we
# MANUALLY truncate the journal between runs to simulate a crash
# AFTER 4 tile-writes, BEFORE the completed_at_iso stamp.
tiles = [_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0) for i in range(10)]
state = {"tile_gets": 0}
def _factory(request: httpx.Request) -> httpx.Response:
state["tile_gets"] += 1
return httpx.Response(200, content=b"\xff\xd8tile")
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
(downloader, _logs, writer, _enforcer, _sleeps) = _build_downloader(
transport=transport
)
request = _make_request(cache_root=tmp_path)
# First run — completes
downloader.download_tiles_for_area(request)
after_first = state["tile_gets"]
write_calls_after_first = len(writer.write_calls)
assert after_first == 10
assert write_calls_after_first == 10
# Simulate crash: rewrite the journal as "completed 4 of 10 tiles,
# NOT yet completed" (clear `completed_at_iso`).
rh = request_hash(
request.flight_id,
request.bbox_min_lat,
request.bbox_min_lon,
request.bbox_max_lat,
request.bbox_max_lon,
tuple(request.zoom_levels),
request.sector_class,
_API_KEY,
)
journal_path = tmp_path / ".c11/journal" / f"{request.flight_id}__{rh}.json"
raw = json.loads(journal_path.read_text("utf-8"))
raw["completed_at_iso"] = None
raw["tile_ids_completed"] = sorted(raw["tile_ids_completed"])[:4]
raw["tile_counts"]["tiles_downloaded"] = 4
journal_path.write_text(json.dumps(raw), encoding="utf-8")
# Reset transport counter (writer still records cumulative calls)
state["tile_gets"] = 0
writer.write_calls.clear()
# Act — second run must fetch only the missing 6
second = downloader.download_tiles_for_area(request)
# Assert
assert state["tile_gets"] == 6
assert len(writer.write_calls) == 6
assert second.outcome == DownloadOutcome.SUCCESS
assert second.tiles_downloaded == 6
# ----------------------------------------------------------------------
# Retry-After HTTP-date form (Risk 1 from the spec)
# ----------------------------------------------------------------------
def test_429_retry_after_http_date_form_parses(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)]
state = {"attempts": 0}
future = (datetime.now(timezone.utc) + timedelta(seconds=20)).strftime(
"%a, %d %b %Y %H:%M:%S GMT"
)
def _factory(request: httpx.Request) -> httpx.Response:
state["attempts"] += 1
if state["attempts"] == 1:
return httpx.Response(429, headers={"Retry-After": future})
return httpx.Response(200, content=b"\xff\xd8tile")
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
sleeps: list[float] = []
(downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport, sleep_recorder=sleeps
)
# Act
report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
# Assert
assert state["attempts"] == 2
assert sleeps and sleeps[0] >= 0
assert report.outcome == DownloadOutcome.SUCCESS
# ----------------------------------------------------------------------
# 429 budget exhaustion → RateLimitedError
# ----------------------------------------------------------------------
def test_429_budget_exhaustion_raises_rate_limited_error(tmp_path: Path) -> None:
# Arrange
tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)]
def _factory(request: httpx.Request) -> httpx.Response:
return httpx.Response(429, headers={"Retry-After": "300"})
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=_factory,
)
)
cfg = C11Config(
satellite_provider_url=_BASE_URL,
service_api_key=_API_KEY,
download_http_timeout_s=5.0,
download_max_5xx_retries=4,
download_max_retry_after_s=400,
download_resolution_floor_m_per_px=0.5,
)
(downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport, config=cfg
)
# Act / Assert
with pytest.raises(RateLimitedError):
downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
# ----------------------------------------------------------------------
# NFR — bookkeeping throughput on a 1000-tile happy path
# ----------------------------------------------------------------------
def test_nfr_throughput_1000_tiles_under_budget(tmp_path: Path) -> None:
# Arrange
tiles = [
_tile_entry(zoom=14, lat=45.0 + i * 0.0001, lon=-122.0 + i * 0.0001)
for i in range(1000)
]
transport = httpx.MockTransport(
_make_route_handler(
list_response=_list_response(tiles),
tile_response_factory=lambda r: httpx.Response(200, content=b"\xff\xd8tile"),
)
)
(downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader(
transport=transport
)
import time as _time
t0 = _time.perf_counter()
report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path))
elapsed = _time.perf_counter() - t0
# Assert — budget is generous; the goal is to catch an O(n^2)
# bookkeeping regression, not to certify wall-clock throughput.
assert report.outcome == DownloadOutcome.SUCCESS
assert report.tiles_downloaded == 1000
assert elapsed < 10.0, f"1000-tile bookkeeping took {elapsed:.2f}s"