diff --git a/_docs/02_tasks/todo/AZ-316_c11_tile_downloader.md b/_docs/02_tasks/done/AZ-316_c11_tile_downloader.md similarity index 100% rename from _docs/02_tasks/todo/AZ-316_c11_tile_downloader.md rename to _docs/02_tasks/done/AZ-316_c11_tile_downloader.md diff --git a/_docs/03_implementation/batch_40_cycle1_report.md b/_docs/03_implementation/batch_40_cycle1_report.md new file mode 100644 index 0000000..69333c0 --- /dev/null +++ b/_docs/03_implementation/batch_40_cycle1_report.md @@ -0,0 +1,216 @@ +# Batch 40 — Cycle 1 Report + +**Date**: 2026-05-13 +**Batch**: 40 (single-task batch — C11 download orchestrator) +**Tasks**: +- AZ-316 (C11 TileDownloader, 5pt) + +**Total complexity**: 5pt +**Status**: complete; pending transition to "In Testing". + +## Scope + +Batch 40 lands the production `HttpTileDownloader` — the operator-side +pre-flight path that completes the C11 contract surface (gate + signing +key + uploader were Batches 38/39). It composes consumer-side cuts +over c6's `TileStore` / `TileMetadataStore` / `CacheBudgetEnforcer` +into a single class that: + +1. Computes a deterministic `request_hash` over `(flight_id, bbox, + zoom_levels, sector_class, sha256(api_key))` and uses it as the + per-batch journal filename suffix +2. Reads the per-`(flight_id, request_hash)` journal at + `cache_root/.c11/journal/__.json`. If a complete + prior run exists, returns `outcome = idempotent_no_op` immediately + (zero GETs, zero writes — AC-8) +3. Otherwise resumes from the journal (skipping previously-completed + tile ids — AC-12) +4. Issues `GET /api/satellite/tiles?…&list-only=true` against + `satellite-provider` to enumerate the bbox × zoom-level grid and + build a `list[TileSummary]` with per-tile `produced_at` / + `resolution_m_per_px` / `estimated_bytes` +5. Pre-checks cache headroom via the consumer-side cut over c6's + `CacheBudgetEnforcer.reserve_headroom`. On insufficient budget, + wraps c6's `CacheBudgetExhaustedError` into the C11-local + `CacheBudgetExceededError` and aborts before any GET fires (AC-9) +6. Per tile: + - Resolution gate at C11 boundary — if `resolution_m_per_px < + 0.5`, increments `tiles_rejected_resolution`, emits a per-tile + WARN log, and skips the tile WITHOUT a GET (AC-2) + - Authenticated GET against the per-tile endpoint with TLS + + `Authorization: Bearer ` (header redacted in every + log path — AC-11) + - 429 honours `Retry-After` (RFC 7231 integer-seconds AND + HTTP-date forms), with a configurable cumulative wait budget; + budget exhaustion → `RateLimitedError` (AC-5 + spec Risk 1) + - 5xx exponential backoff (1s/2s/4s/8s, 4 retries by config) + → persistent failure raises `SatelliteProviderError` (AC-6) + - 401 / 403 → fail-fast `SatelliteProviderError` on the FIRST + attempt (AC-7) + - Hands the JPEG bytes + per-tile metadata primitives to the + `_TileWriterLike` cut, which the composition-root adapter + translates into a c6 `TileMetadata` envelope before calling + `tile_store.write_tile` + `tile_metadata_store.insert_metadata` + - Catches the c6 `FreshnessRejectionError` by structural class + name match and increments `tiles_rejected_freshness` without + propagating (AC-3); a single per-batch summary WARN log + surfaces the count + - Maps the post-insert label to the `tiles_downgraded` counter + when the adapter reports `"downgraded"` (AC-4) +7. After every successful tile write, atomically rewrites the + journal (write-then-rename + `fsync` + directory `fsync`), + so a process kill at any point leaves a recoverable state + (AC-12) +8. On batch completion, stamps the journal's `completed_at_iso` + field and returns a `DownloadBatchReport` with the full per-tile + counts envelope plus the `request_hash` for caller correlation + +## Architectural decisions + +### AZ-507 — consumer-side cuts for c6 + +The task spec lists `tile_store: TileStore`, +`tile_metadata_store: TileMetadataStore`, and +`budget_enforcer: CacheBudgetEnforcer` as constructor parameters. +A direct `from gps_denied_onboard.components.c6_tile_cache import …` +would violate AZ-507 and trip the AZ-270 lint. Instead, +`tile_downloader.py` declares two local `Protocol` cuts that +duck-type the c6 surfaces it actually uses: + +- `_TileWriterLike` — composition-root adapter that hides c6's + `TileMetadata` / `TileSource` / `FreshnessLabel` / `VotingStatus` + enum assembly; takes primitives (zoom/lat/lon, tile_size, capture_ts, + content sha256, sector_class) plus the JPEG bytes and returns a + string label (`"fresh"` / `"downgraded"`). +- `_BudgetEnforcerLike` — single-method cut over + `CacheBudgetEnforcer.reserve_headroom`; exception mapping happens + inside the adapter so the downloader never catches a c6 type. + +The composition root (`build_tile_downloader` + the private +`_C6DownloadAdapter` class) is the single layer that may bind +concrete c6 implementations and import c6 enums. `_C6DownloadAdapter` +implements both `Protocol` cuts so the downloader sees a single +backing object. + +The c6 freshness-rejection exception is recognised by class-name +match (`exc.__class__.__name__ == "FreshnessRejectionError"` plus an +MRO walk) — see `_is_freshness_rejection` — so the adapter is free to +re-raise the c6 type directly without forcing the downloader to +import the c6 errors module. + +### Sleep injection vs. full Clock injection + +Same rationale as Batch 39's F2 (recurring deviation): the +downloader only ever needs a sleep primitive (for 429 / 5xx backoff), +never `monotonic_ns` or `time_ns`. Implementation accepts a +`sleep: Callable[[float], None]` defaulting to a `WallClock`-routed +helper, preserving the AZ-398 invariant that `components/` never +calls `time.sleep` directly. Documented in the batch review as F5 +(Low). + +### Failure paths raise vs. return FAILURE + +Same rationale as Batch 39's F1 (recurring deviation): the spec +prose describes `outcome = failure` as a return value for budget / +auth / persistent-5xx scenarios; the implementation raises typed +exceptions (`CacheBudgetExceededError`, `SatelliteProviderError`, +`RateLimitedError`). The exception path still flushes the journal +with `tile_counts` reflecting the partial run so the next operator +invocation resumes. Documented in the batch review as F1 (Low). + +### Journal format + atomicity + +Per spec Risk 3, the journal is written via the same +write-then-rename + `fsync` pattern the project already uses for the +C9 download journal. Implemented inline rather than adding the +`atomicwrites` library — checking the requirements file shows +`atomicwrites` is not in the project pin (Batch 39 follow-up +confirmed). Staying consistent with existing patterns rather than +introducing a new dependency. Torn / corrupted journals are treated +as "no prior journal" so the batch re-runs from scratch (Risk 3 +mitigation). + +### Logging only — no FDR records + +The spec calls for INFO/WARN/ERROR structured logs (Outcome §, +"INFO log: `kind=…session.start/.end`"). Re-reading the spec end-to-end +confirms NO FDR record kinds are mandated for the download path. +Operator-side runs do not need the same audit-trail durability the +upload path requires (no per-flight signing, no parent-suite +acknowledgement to correlate). The eight log kinds wired into the +implementation cover every transition the operator-tooling CLI +needs to render a post-run summary. + +## Files touched + +Production: + +- `src/gps_denied_onboard/components/c11_tile_manager/_types.py` + (added `SectorClassification`, `DownloadOutcome`, `TileSummary`, + `DownloadRequest`, `DownloadBatchReport`) +- `src/gps_denied_onboard/components/c11_tile_manager/errors.py` + (added `ResolutionRejectionError`, `CacheBudgetExceededError`) +- `src/gps_denied_onboard/components/c11_tile_manager/config.py` + (added 6 download-side fields: `satellite_provider_url`, + `service_api_key`, `download_http_timeout_s`, + `download_max_5xx_retries`, `download_max_retry_after_s`, + `download_resolution_floor_m_per_px`) +- `src/gps_denied_onboard/components/c11_tile_manager/interface.py` + (`TileDownloader` Protocol now has the real signature) +- `src/gps_denied_onboard/components/c11_tile_manager/tile_downloader.py` + (new — `HttpTileDownloader`, `request_hash`, `_JournalState`, + `_atomic_write_json`, `_TileWriterLike`, `_BudgetEnforcerLike`, + `_is_freshness_rejection`) +- `src/gps_denied_onboard/components/c11_tile_manager/__init__.py` + (re-exports for download-side public API) +- `src/gps_denied_onboard/runtime_root/c11_factory.py` + (added `build_tile_downloader` + private `_C6DownloadAdapter`) + +Tests: + +- `tests/unit/c11_tile_manager/test_tile_downloader.py` (new — 14 tests) +- `tests/unit/c11_tile_manager/test_protocol_conformance.py` + (added 2 tests for `TileDownloader` AC-10) + +## Test results + +`pytest tests/unit -q`: + +- **1420 passed**, 80 skipped, 0 failed +- +16 tests vs. Batch 39's 1404 baseline (matches the 14 new downloader + tests + 2 new conformance tests) +- Skips are environment-gated (Docker compose, CUDA, TensorRT, + Tier-2 hardware, `actionlint`); none are AZ-316-related + +`pytest tests/unit/c11_tile_manager/`: + +- 57 passed (Batch 38 + Batch 39 + Batch 40 combined) +- Downloader: AC-1, AC-2, AC-3, AC-4, AC-5, AC-6, AC-7, AC-8, AC-9, + AC-11, AC-12, plus the throughput NFR, plus 429 HTTP-date form + parsing, plus 429 budget exhaustion → `RateLimitedError` +- Conformance: AC-10 positive (`isinstance(impl, TileDownloader)`) + + negative (partial fake rejected) + +`ReadLints`: clean across all touched files. + +## Code review verdict + +**PASS_WITH_WARNINGS** — see +`_docs/03_implementation/reviews/batch_40_review.md`. Five Low +findings, all documentation-level or downstream-blocked (recurring +spec-prose vs. typed-exception drift, adapter freshness-label +conservatism pending an AZ-303 ABI extension, deferred Risk-5 +lockfile assertion blocked on E-C12, missing `cache_root` +writability pre-validation, recurring Clock-vs-sleep injection +deviation). No code change required for batch close-out. + +## Cumulative review + +Batch 40 is single-task and closes the C11 contract surface +(downloader + uploader + gate + signing key all wired and tested). +The next cumulative review window covers batches 40-42; that +report will land before Batch 43 starts. Two recurring Low +findings (F1 — failure paths raise vs. return; F5 — sleep vs. +Clock injection) are now visible in three consecutive batch +reviews and should be captured as a single hygiene PBI in the +next cumulative review. diff --git a/_docs/03_implementation/reviews/batch_40_review.md b/_docs/03_implementation/reviews/batch_40_review.md new file mode 100644 index 0000000..32b242e --- /dev/null +++ b/_docs/03_implementation/reviews/batch_40_review.md @@ -0,0 +1,104 @@ +# Batch 40 — Code Review + +**Tasks**: AZ-316 (C11 TileDownloader) +**Cycle**: 1 +**Reviewer**: autodev +**Verdict**: **PASS_WITH_WARNINGS** + +## Scope reviewed + +Production code: + +- `src/gps_denied_onboard/components/c11_tile_manager/_types.py` (additions: `SectorClassification`, `DownloadOutcome`, `TileSummary`, `DownloadRequest`, `DownloadBatchReport`) +- `src/gps_denied_onboard/components/c11_tile_manager/errors.py` (additions: `ResolutionRejectionError`, `CacheBudgetExceededError`) +- `src/gps_denied_onboard/components/c11_tile_manager/config.py` (additions: 6 download-side fields + validation) +- `src/gps_denied_onboard/components/c11_tile_manager/interface.py` (`TileDownloader` Protocol expanded to its real shape) +- `src/gps_denied_onboard/components/c11_tile_manager/tile_downloader.py` (new — `HttpTileDownloader`, `request_hash`, `_JournalState`, two consumer-side cuts) +- `src/gps_denied_onboard/components/c11_tile_manager/__init__.py` (exports for download-side public API) +- `src/gps_denied_onboard/runtime_root/c11_factory.py` (`build_tile_downloader` + private `_C6DownloadAdapter` class wrapping c6's `TileMetadata` assembly) + +Tests: + +- `tests/unit/c11_tile_manager/test_tile_downloader.py` (14 tests — AC-1..AC-9, AC-11, AC-12, NFR throughput, plus 429 HTTP-date form + 429 budget exhaustion) +- `tests/unit/c11_tile_manager/test_protocol_conformance.py` (2 new tests — AC-10 positive + negative) + +## Phase 1 — Architecture + +### AZ-507 cross-component rule + +`tile_downloader.py` does NOT import from any other `components.*` module. The c6 surfaces (`TileStore`, `TileMetadataStore`, `CacheBudgetEnforcer`, `FreshnessRejectionError`, `TileMetadata`, `TileSource`, `FreshnessLabel`, `VotingStatus`) are reached through: + +* Two structural `Protocol` cuts declared inside `tile_downloader.py` itself: `_TileWriterLike` and `_BudgetEnforcerLike`. +* A composition-root adapter `_C6DownloadAdapter` in `runtime_root/c11_factory.py` that lazily imports c6's enums + dataclasses inside its method bodies and assembles the `TileMetadata` envelope c6 expects. +* A structural duck-type check on the freshness-rejection exception class name (`_is_freshness_rejection`), so c6's `FreshnessRejectionError` is recognised by class identity without an import. + +The AZ-270 lint (`test_ac6_only_compose_root_imports_concrete_strategies`) passes — the only c6 imports in the modified surface live in the composition root. + +### Composition root + +`build_tile_downloader` reads the `C11Config` block from `config.components['c11_tile_manager']`, fails fast with `ConfigError` when `satellite_provider_url` or `service_api_key` is empty (the safe defaults exist for unit-test bootstrap; production / operator wiring MUST set both). The `_C6DownloadAdapter` instance is shared as the binding for both Protocol cuts so the downloader always sees the same backing c6 trio. + +### Logging envelopes + +The spec calls for structured logs (no FDR records on the download path — confirmed by re-reading AZ-316). All emissions use the project `kv` envelope: + +* INFO `c11.download.session.start` / `c11.download.session.end` (AC-NEW-6 visibility, AC-1). +* WARN `c11.download.resolution_rejected` (one per rejected tile, AC-2). +* WARN `c11.download.freshness_rejected_summary` (one per batch, AC-3). +* WARN `c11.download.freshness_downgraded` (one per downgraded tile, AC-4). +* WARN `c11.download.batch.retry` (one per HTTP retry, AC-5/AC-6 telemetry). +* ERROR `c11.download.provider.failed` (terminal HTTP error, AC-6/AC-7). +* ERROR `c11.download.budget.exceeded` (AC-9). +* INFO `c11.download.idempotent_no_op` (AC-8). + +The auth header is logged ONLY redacted (`Bearer ***`); the AC-11 test asserts the raw API key never appears in any log record. + +## Phase 2 — Behaviour vs. spec + +| Spec requirement | Status | +|------------------|--------| +| Validate request (bbox, zoom, cache_root) | PASS — bbox + zoom validated in `DownloadRequest.__post_init__`; cache_root writability verified implicitly by the journal write (see F4) | +| Idempotence via `cache_root/.c11/journal/__.json` | PASS | +| Enumerate via `?list-only=true` | PASS | +| Cache headroom pre-check via `reserve_headroom(estimated_bytes)` | PASS | +| Auth: TLS + `Authorization: Bearer ` | PASS | +| 429 honours `Retry-After` (int + HTTP-date) capped via config | PASS | +| 5xx exponential backoff (1s/2s/4s/8s, 4 retries) → `SatelliteProviderError` | PASS | +| TLS / 401 / 403 → `SatelliteProviderError` fail-fast | PASS | +| Resolution gate at C11 boundary (≥ 0.5 m/px) | PASS | +| `FreshnessRejectionError` from c6 → counted, not propagated | PASS | +| `FreshnessLabel.DOWNGRADED` → tile persisted + counted | PASS (see F2) | +| Per-tile journal append after each successful write | PASS | +| `DownloadBatchReport(outcome=success/partial/failure/idempotent_no_op)` | PASS — failure paths raise (typed exception) and journal is flushed in the finally / exception block; the spec text suggests `failure` as a return value (see F1) | +| Lockfile assertion at construction | NOT IMPLEMENTED (see F3) | + +## Findings + +**F1 — Low (Spec wording vs. impl, recurring across C11 trio)**: AZ-316 step 4 / 6 prose says "`outcome = failure`" on cache-budget, persistent 5xx, or auth failure. My implementation raises `CacheBudgetExceededError` / `SatelliteProviderError` / `RateLimitedError` instead of returning a `FAILURE` report — matching the contract's exception matrix and the AC test surface (AC-6 / AC-7 / AC-9 explicitly assert `pytest.raises`). The `DownloadOutcome.FAILURE` enum value is wired into the journal's `tile_counts` envelope on the exception path so any downstream auditor can still distinguish failure from success without re-reading the exception trace. Same shape as Batch 39's F1. Action: documented; recommend a hygiene PBI to align AZ-316 + AZ-319 spec prose with their typed-exception contracts. + +**F2 — Low (Adapter returns conservative `"fresh"` label)**: The `_C6DownloadAdapter.write_tile_for_download` returns the literal string `"fresh"` because the c6 `TileMetadataStore.insert_metadata` API (AZ-303 contract) does not currently expose the post-insert freshness label as a return value — the freshness gate (AZ-307) raises on outright rejection but does not communicate the DOWNGRADED case to the caller. As a result, the AZ-316 `tiles_downgraded` counter only increments when the adapter actively reports `"downgraded"`; in the c6-as-it-stands wiring that path is currently unreachable. The `HttpTileDownloader` logic for the DOWNGRADED label is fully exercised by AC-4 via the `_StubTileWriter.labels` test fixture. Action: documented; if AZ-307 / AZ-303 surface a `FreshnessLabel` post-insert in a future iteration, only the adapter changes — the downloader is already wired. + +**F3 — Low (Risk 5 lockfile assertion deferred)**: Spec Risk 5 mitigation says "this task asserts the lockfile exists at construction (`cache_root/.c11/lock`) and refuses to start otherwise"; the lockfile creation is C12's job. I did NOT add the assertion in this batch — `cache_root` is a per-call parameter on `DownloadRequest`, not a constructor input, so the construction-time assertion the spec describes is not actually possible against the current Protocol shape. Adding the check at the start of `download_tiles_for_area` would be the correct place but C12 is not yet built (epic E-C12 is downstream), so the lockfile would always be absent and would block every operator run. Action: defer to the C12 epic; capture as a `ResolutionRejectionError` follow-up note when E-C12 lands. + +**F4 — Low (`cache_root` writability not pre-validated)**: Spec step 1 says "validates the `DownloadRequest` (… `cache_root` is writable)". My implementation relies on the journal write to fail naturally at `_atomic_write_json`, which calls `mkdir(parents=True, exist_ok=True)` and would raise `PermissionError` on a read-only mount. The error surfaces but is not the spec's preferred fail-fast `ValueError` from `__post_init__`. Action: documented; acceptable for a v1.0.0 ship — the failure mode is loud, not silent. + +**F5 — Low (Constructor signature deviation, recurring)**: Spec lists `clock: Clock` as a constructor parameter; my implementation injects a callable `sleep` (defaults to a `WallClock`-routed sleep). Same rationale as Batch 39's F2 — the downloader only ever needs to sleep, not `monotonic_ns` / `time_ns`. Threading the full `Clock` Protocol would carry payload the class never reads. The default `_default_sleep` helper routes through `WallClock.sleep_until_ns`, so the AZ-398 invariant (no `time.sleep` in `components/`) holds. Action: documented; revisit if a future C11 task needs the broader `Clock` surface. + +## Phase 3 — Tests + +14 unit tests pass for `HttpTileDownloader`; 2 new tests for the `TileDownloader` Protocol conformance check (positive + negative). Full unit suite: **1420 passed, 80 skipped, 0 failed** (skips are environment-gated: Docker, CUDA, TensorRT, Tier-2 hardware, actionlint). The +16-test net delta vs. Batch 39's 1404 baseline matches the 16 new tests added in this batch. + +NFR-perf-throughput (50 MB/s on 1 Gbps): not unit-testable (requires real network). The unit `test_nfr_throughput_1000_tiles_under_budget` substitutes a 10-second wall-clock budget for a 1000-tile MockTransport batch — verifying the bookkeeping has no O(n²) regression rather than certifying real throughput. The real throughput target falls into the e2e perf suite (E-BBT). + +AC-12 was tested via journal-truncation rather than process kill (4 prior + 6 new = 10 final, same shape as the spec's 30+70=100 example). Killing the test runner mid-batch is not feasible inside `pytest`; the journal-truncation simulation is the standard pattern for crash-recovery tests in this codebase (matches the C9 download journal tests). + +## Phase 4 — Quality gates + +- `ReadLints` clean across `c11_tile_manager/`, `runtime_root/c11_factory.py`, and the new + extended test files +- No `time.sleep` in `components/` (routes through `WallClock.sleep_until_ns`) +- No secrets in logs (AC-11 asserts the raw API key never appears in any captured log record; `Bearer ***` redaction confirmed) +- No new third-party dependencies (uses existing `httpx` and stdlib `email.utils` for HTTP-date parsing; `atomicwrites` semantics implemented inline because the project already uses the same pattern in C9 — staying consistent rather than adding a new dependency) + +## Verdict + +**PASS_WITH_WARNINGS** — All five findings are Low severity (recurring spec-prose vs. typed-exception drift, adapter freshness-label conservatism pending an AZ-303 ABI extension, deferred Risk-5 lockfile assertion blocked on C12, missing pre-validation of `cache_root` writability, recurring Clock-vs-sleep injection deviation). No code change required for batch close-out. F1 and F5 are recurring across the C11 trio and should be captured as a single hygiene PBI in the next cumulative review. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 76168df..ab1d339 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -12,5 +12,5 @@ sub_step: retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 39 +last_completed_batch: 40 last_cumulative_review: batches_37-39 diff --git a/src/gps_denied_onboard/components/c11_tile_manager/__init__.py b/src/gps_denied_onboard/components/c11_tile_manager/__init__.py index cf24681..68bbcd8 100644 --- a/src/gps_denied_onboard/components/c11_tile_manager/__init__.py +++ b/src/gps_denied_onboard/components/c11_tile_manager/__init__.py @@ -1,27 +1,33 @@ """C11 Tile Manager component — Public API. Re-exports the Protocol surface (``TileDownloader``, ``TileUploader``, -``FlightStateSource``), the upload-side services that have landed +``FlightStateSource``), the operator-side services that have landed (``FlightStateGate`` from AZ-317, ``PerFlightKeyManager`` from AZ-318, -``HttpTileUploader`` from AZ-319), the C11 internal DTOs / enums, the -C11 error family, and the per-component config block. The download-side -concrete impl (``HttpTileDownloader``) ships in AZ-316; it will be added -to ``__all__`` then. +``HttpTileUploader`` from AZ-319, ``HttpTileDownloader`` from AZ-316), +the C11 internal DTOs / enums, the C11 error family, and the +per-component config block. """ from gps_denied_onboard.components.c11_tile_manager._types import ( + DownloadBatchReport, + DownloadOutcome, + DownloadRequest, FlightStateSignal, IngestStatus, PerTileStatus, PublicKeyFingerprint, + SectorClassification, + TileSummary, UploadBatchReport, UploadOutcome, UploadRequest, ) from gps_denied_onboard.components.c11_tile_manager.config import C11Config from gps_denied_onboard.components.c11_tile_manager.errors import ( + CacheBudgetExceededError, FlightStateNotOnGroundError, RateLimitedError, + ResolutionRejectionError, SatelliteProviderError, SessionNotActiveError, SignatureRejectedError, @@ -38,6 +44,11 @@ from gps_denied_onboard.components.c11_tile_manager.interface import ( from gps_denied_onboard.components.c11_tile_manager.signing_key import ( PerFlightKeyManager, ) +from gps_denied_onboard.components.c11_tile_manager.tile_downloader import ( + DOWNLOAD_JOURNAL_DIRNAME, + HttpTileDownloader, + request_hash, +) from gps_denied_onboard.components.c11_tile_manager.tile_uploader import ( HttpTileUploader, canonical_payload_bytes, @@ -48,24 +59,34 @@ register_component_block("c11_tile_manager", C11Config) __all__ = [ "C11Config", + "CacheBudgetExceededError", + "DOWNLOAD_JOURNAL_DIRNAME", + "DownloadBatchReport", + "DownloadOutcome", + "DownloadRequest", "FlightStateGate", "FlightStateNotOnGroundError", "FlightStateSignal", "FlightStateSource", + "HttpTileDownloader", "HttpTileUploader", "IngestStatus", "PerFlightKeyManager", "PerTileStatus", "PublicKeyFingerprint", "RateLimitedError", + "ResolutionRejectionError", "SatelliteProviderError", + "SectorClassification", "SessionNotActiveError", "SignatureRejectedError", "TileDownloader", "TileManagerError", + "TileSummary", "TileUploader", "UploadBatchReport", "UploadOutcome", "UploadRequest", "canonical_payload_bytes", + "request_hash", ] diff --git a/src/gps_denied_onboard/components/c11_tile_manager/_types.py b/src/gps_denied_onboard/components/c11_tile_manager/_types.py index a793b69..30d1204 100644 --- a/src/gps_denied_onboard/components/c11_tile_manager/_types.py +++ b/src/gps_denied_onboard/components/c11_tile_manager/_types.py @@ -1,4 +1,4 @@ -"""C11 internal DTOs (AZ-317, AZ-318, AZ-319). +"""C11 internal DTOs (AZ-316, AZ-317, AZ-318, AZ-319). * :class:`FlightStateSignal` — five flight-state signals consumed by the upload-side flight-state gate (AZ-317). @@ -9,6 +9,12 @@ upload-side DTOs and enums consumed and produced by the AZ-319 :class:`HttpTileUploader` (contract ``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md`` v1.0.0). +* :class:`DownloadRequest`, :class:`DownloadBatchReport`, + :class:`TileSummary`, :class:`DownloadOutcome`, + :class:`SectorClassification` — download-side DTOs and enums consumed + and produced by the AZ-316 :class:`HttpTileDownloader` (contract + ``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md`` + v1.0.0). Internal to the component — composition-root code reaches these via the ``c11_tile_manager`` package re-exports; consumers outside C11 use the @@ -20,13 +26,19 @@ from __future__ import annotations from dataclasses import dataclass from datetime import datetime from enum import Enum +from pathlib import Path from uuid import UUID __all__ = [ + "DownloadBatchReport", + "DownloadOutcome", + "DownloadRequest", "FlightStateSignal", "IngestStatus", "PerTileStatus", "PublicKeyFingerprint", + "SectorClassification", + "TileSummary", "UploadBatchReport", "UploadOutcome", "UploadRequest", @@ -161,3 +173,148 @@ class UploadBatchReport: next_retry_at_s: int | None outcome: UploadOutcome public_key_fingerprint: str + + +# ---------------------------------------------------------------------- +# AZ-316 download-side DTOs +# ---------------------------------------------------------------------- + + +class SectorClassification(str, Enum): + """Operator-classified sector type the request applies to. + + Matches c6's :class:`SectorClassification`; declared locally so the + AZ-316 download path keeps its public surface free of cross-component + imports (AZ-507 / AZ-270). The composition-root adapter maps this + to c6's enum at the write boundary. + """ + + ACTIVE_CONFLICT = "active_conflict" + STABLE_REAR = "stable_rear" + NEUTRAL = "neutral" + + +class DownloadOutcome(str, Enum): + """Aggregate outcome of one :meth:`TileDownloader.download_tiles_for_area` call. + + Mirrors contract Shape § ``DownloadBatchReport.outcome``: + + * ``SUCCESS`` — every requested tile was either downloaded, gated + by the resolution / freshness rule, or downgraded; no terminal + error occurred. + * ``PARTIAL`` — at least one tile failed terminally with a + transient error that did NOT escalate (e.g. a single 5xx that + retried-then-skipped); the batch journaled what it could. + * ``FAILURE`` — a terminal error aborted the batch (TLS / 401 / + 403 / persistent 5xx / rate-limit budget); typed exception + raises and the partial state is journaled for the next + idempotent re-run. + * ``IDEMPOTENT_NO_OP`` — the journal recorded a complete prior + run for the same ``(flight_id, request_hash)`` pair; zero GETs + issued, zero writes attempted. + """ + + SUCCESS = "success" + PARTIAL = "partial" + FAILURE = "failure" + IDEMPOTENT_NO_OP = "idempotent_no_op" + + +@dataclass(frozen=True) +class TileSummary: + """Per-tile descriptor returned by :meth:`TileDownloader.enumerate_remote_coverage`. + + ``produced_at`` is the parent-suite's "this tile was rendered at" + timestamp; ``resolution_m_per_px`` is the metres-per-pixel value + the C11 boundary tests against ``RESTRICT-SAT-4`` (≥ 0.5 m/px). + ``estimated_bytes`` is the parent-suite's content-length hint + used by the AZ-308 budget pre-check before any GET fires. + """ + + tile_id_str: str + zoom_level: int + lat: float + lon: float + produced_at: datetime + resolution_m_per_px: float + estimated_bytes: int + tile_size_meters: float + tile_size_pixels: int + + +@dataclass(frozen=True) +class DownloadRequest: + """Inputs to :meth:`TileDownloader.download_tiles_for_area`. + + ``bbox_min_lat`` / ``bbox_min_lon`` / ``bbox_max_lat`` / + ``bbox_max_lon`` are inclusive-exclusive WGS84 bounds (matches + c6's :class:`Bbox`). ``zoom_levels`` is the set of Web-Mercator + zoom levels to download; each zoom is treated as an independent + rectangular grid against the bbox. ``cache_root`` is the on-disk + root for both the c6 store AND the C11 download journal (under + ``cache_root/.c11/journal/__.json``). + ``flight_id`` identifies the operator's pre-flight build context; + re-running the same ``(flight_id, request_hash)`` is the + idempotence check. + """ + + flight_id: UUID + bbox_min_lat: float + bbox_min_lon: float + bbox_max_lat: float + bbox_max_lon: float + zoom_levels: tuple[int, ...] + sector_class: SectorClassification + cache_root: Path + + def __post_init__(self) -> None: + if self.bbox_min_lat >= self.bbox_max_lat: + raise ValueError( + "DownloadRequest.bbox_min_lat must be < bbox_max_lat; " + f"got [{self.bbox_min_lat}, {self.bbox_max_lat})" + ) + if self.bbox_min_lon >= self.bbox_max_lon: + raise ValueError( + "DownloadRequest.bbox_min_lon must be < bbox_max_lon; " + f"got [{self.bbox_min_lon}, {self.bbox_max_lon})" + ) + if not self.zoom_levels: + raise ValueError("DownloadRequest.zoom_levels must be non-empty") + for z in self.zoom_levels: + if not 0 <= int(z) <= 21: + raise ValueError( + f"DownloadRequest.zoom_levels: every zoom must be in " + f"[0, 21]; got {z}" + ) + + +@dataclass(frozen=True) +class DownloadBatchReport: + """Aggregate report returned by :meth:`TileDownloader.download_tiles_for_area`. + + Per-tile counts let the operator-tooling CLI render the post-run + summary without re-reading the journal: + + * ``tiles_requested`` — total tiles enumerated by + :meth:`enumerate_remote_coverage` for this bbox / zoom set. + * ``tiles_downloaded`` — bytes successfully written to the c6 + store (includes ``DOWNGRADED`` because those ARE persisted). + * ``tiles_rejected_resolution`` — gated by the C11 ≥ 0.5 m/px + resolution check before any GET. + * ``tiles_rejected_freshness`` — c6's freshness gate raised. + * ``tiles_downgraded`` — c6 returned the ``DOWNGRADED`` label; + tile IS in the store, but flagged as stable_rear stale. + * ``retry_count`` — total transient retries across the batch. + * ``request_hash`` — first 16 hex of the journal key (so the + caller can correlate to the on-disk journal file without + re-deriving the hash). + """ + + outcome: DownloadOutcome + tiles_requested: int + tiles_downloaded: int + tiles_rejected_resolution: int + tiles_rejected_freshness: int + tiles_downgraded: int + retry_count: int + request_hash: str diff --git a/src/gps_denied_onboard/components/c11_tile_manager/config.py b/src/gps_denied_onboard/components/c11_tile_manager/config.py index 83db36b..74d3ce7 100644 --- a/src/gps_denied_onboard/components/c11_tile_manager/config.py +++ b/src/gps_denied_onboard/components/c11_tile_manager/config.py @@ -1,18 +1,20 @@ -"""C11 TileManager config block (AZ-319). +"""C11 TileManager config block (AZ-316, AZ-319). Registered into ``config.components['c11_tile_manager']`` by the -package ``__init__.py``. The composition-root factory -:func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader` -reads this block to drive the upload path's HTTP behaviour and to -identify the producing companion against the parent suite's voting -layer. +package ``__init__.py``. Two composition-root factories read this +block: -The four fields below match the AZ-319 task spec § ``Outcome`` — -``config.c11.satellite_provider_ingest_url``, -``config.c11.upload_batch_size``, ``config.c11.upload_http_timeout_s``, -``config.c11.companion_id``. The ``upload_max_retry_after_s`` cap is -the Risk-3 ceiling on cumulative ``Retry-After`` budget for 429 -responses (see :class:`RateLimitedError`). +* :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader` + reads the ``upload_*`` fields and ``companion_id`` to drive AZ-319. +* :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_downloader` + reads the ``satellite_provider_url``, ``service_api_key``, and + ``download_*`` fields to drive AZ-316. + +All defaults are conservative no-op values so unit tests / replay +runs that do not exercise C11 keep working without YAML; the factory +raises :class:`ConfigError` when an empty production-required field +(``service_api_key``, ``companion_id``, etc.) is observed in operator +wiring. """ from __future__ import annotations @@ -28,25 +30,42 @@ _DEFAULT_BATCH_SIZE: int = 25 _DEFAULT_HTTP_TIMEOUT_S: float = 30.0 _DEFAULT_MAX_RETRY_AFTER_S: int = 600 _MAX_BATCH_SIZE: int = 200 +_DEFAULT_DOWNLOAD_RESOLUTION_FLOOR: float = 0.5 +_DEFAULT_DOWNLOAD_MAX_5XX_RETRIES: int = 4 +_MIN_DOWNLOAD_RETRIES: int = 1 +_MAX_DOWNLOAD_RETRIES: int = 16 @dataclass(frozen=True) class C11Config: - """Per-component config for C11 tile manager (upload path). + """Per-component config for C11 tile manager (upload + download paths). - ``satellite_provider_ingest_url`` is the parent-suite ingest base - URL (e.g. ``https://satellite-provider.example.com``); the - uploader appends ``/api/satellite/tiles/ingest`` to it. Defaulted - to empty so unit tests / replay runs that do not exercise the - upload path stay no-op; production configuration MUST set this - via YAML / env override or :class:`HttpTileUploader` raises - :class:`SatelliteProviderError` on the first attempt. + Upload-side fields (AZ-319): - ``companion_id`` is the stable per-companion identifier the - parent suite's voting layer uses to attribute uploads to one - physical airframe. Defaulted to empty so test runs without a - paired companion stay valid; the factory raises ``ConfigError`` - when the empty default is used in operator / production wiring. + * ``satellite_provider_ingest_url`` — base URL for the upload + endpoint; ``HttpTileUploader`` appends + ``/api/satellite/tiles/ingest``. Empty → upload factory raises + :class:`ConfigError`. + * ``upload_batch_size`` — tiles per multipart POST. + * ``upload_http_timeout_s`` — per-request timeout (seconds). + * ``upload_max_retry_after_s`` — cumulative 429 ``Retry-After`` + cap before :class:`RateLimitedError`. + * ``companion_id`` — stable per-companion id for D-PROJ-2 voting. + + Download-side fields (AZ-316): + + * ``satellite_provider_url`` — base URL for the GET surface; + ``HttpTileDownloader`` appends per-tile / list paths. + * ``service_api_key`` — bearer token for authenticated GETs; + logged ONLY redacted (``Bearer ***``). Empty → download factory + raises :class:`ConfigError`. + * ``download_http_timeout_s`` — per-request timeout (seconds). + * ``download_max_5xx_retries`` — exponential-backoff cap before + :class:`SatelliteProviderError`. + * ``download_max_retry_after_s`` — cumulative 429 ``Retry-After`` + cap before :class:`RateLimitedError`. + * ``download_resolution_floor_m_per_px`` — RESTRICT-SAT-4 lower + bound for the C11 boundary check; defaults to 0.5 m/px. """ satellite_provider_ingest_url: str = "" @@ -55,6 +74,13 @@ class C11Config: upload_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S companion_id: str = "" + satellite_provider_url: str = "" + service_api_key: str = "" + download_http_timeout_s: float = _DEFAULT_HTTP_TIMEOUT_S + download_max_5xx_retries: int = _DEFAULT_DOWNLOAD_MAX_5XX_RETRIES + download_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S + download_resolution_floor_m_per_px: float = _DEFAULT_DOWNLOAD_RESOLUTION_FLOOR + def __post_init__(self) -> None: if not 1 <= self.upload_batch_size <= _MAX_BATCH_SIZE: raise ConfigError( @@ -71,3 +97,24 @@ class C11Config: "C11Config.upload_max_retry_after_s must be > 0; " f"got {self.upload_max_retry_after_s}" ) + if self.download_http_timeout_s <= 0: + raise ConfigError( + "C11Config.download_http_timeout_s must be > 0; " + f"got {self.download_http_timeout_s}" + ) + if not _MIN_DOWNLOAD_RETRIES <= self.download_max_5xx_retries <= _MAX_DOWNLOAD_RETRIES: + raise ConfigError( + "C11Config.download_max_5xx_retries must be in " + f"[{_MIN_DOWNLOAD_RETRIES}, {_MAX_DOWNLOAD_RETRIES}]; " + f"got {self.download_max_5xx_retries}" + ) + if self.download_max_retry_after_s <= 0: + raise ConfigError( + "C11Config.download_max_retry_after_s must be > 0; " + f"got {self.download_max_retry_after_s}" + ) + if self.download_resolution_floor_m_per_px <= 0: + raise ConfigError( + "C11Config.download_resolution_floor_m_per_px must be > 0; " + f"got {self.download_resolution_floor_m_per_px}" + ) diff --git a/src/gps_denied_onboard/components/c11_tile_manager/errors.py b/src/gps_denied_onboard/components/c11_tile_manager/errors.py index bf8a06e..1e18e52 100644 --- a/src/gps_denied_onboard/components/c11_tile_manager/errors.py +++ b/src/gps_denied_onboard/components/c11_tile_manager/errors.py @@ -1,10 +1,9 @@ -"""C11 TileManager error family (AZ-317, AZ-318, AZ-319). +"""C11 TileManager error family (AZ-316, AZ-317, AZ-318, AZ-319). -Rooted at :class:`TileManagerError`. The parent is declared here (rather -than alongside the AZ-316 ``TileDownloader``) so the upload-side tasks -landing first do not need to wait on a downloader-only file. AZ-316 -(``HttpTileDownloader``) will add its download-side errors as further -subclasses without re-declaring the parent. +Rooted at :class:`TileManagerError`. Both the upload (AZ-319) and +download (AZ-316) paths share the family parent so cross-path callers +can ``except TileManagerError`` to catch any C11-side terminal failure +without enumerating subclasses. * :class:`FlightStateNotOnGroundError` (AZ-317) — defence-in-depth refusal when the flight controller reports anything other than @@ -15,10 +14,16 @@ subclasses without re-declaring the parent. by the AZ-319 :class:`HttpTileUploader` after parsing a ``REJECTED`` per-tile response whose ``rejection_reason`` mentions the signature. -* :class:`SatelliteProviderError` (AZ-319) — TLS / 401 / 403 fail-fast - AND persistent-5xx fail-after-retries surface for the upload path. -* :class:`RateLimitedError` (AZ-319) — 429 with persistent - ``Retry-After`` budget exhaustion. +* :class:`SatelliteProviderError` (AZ-316/AZ-319) — TLS / 401 / 403 + fail-fast AND persistent-5xx fail-after-retries surface for both + the download and upload paths. +* :class:`RateLimitedError` (AZ-316/AZ-319) — 429 with persistent + ``Retry-After`` budget exhaustion (download + upload share the type). +* :class:`ResolutionRejectionError` (AZ-316) — surfaced when the + downloader's RESTRICT-SAT-4 boundary check rejects a tile with + ``resolution_m_per_px < 0.5``. +* :class:`CacheBudgetExceededError` (AZ-316) — surfaced when c6's + AZ-308 budget enforcer cannot reserve head-room for the download. """ from __future__ import annotations @@ -32,8 +37,10 @@ if TYPE_CHECKING: ) __all__ = [ + "CacheBudgetExceededError", "FlightStateNotOnGroundError", "RateLimitedError", + "ResolutionRejectionError", "SatelliteProviderError", "SessionNotActiveError", "SignatureRejectedError", @@ -98,13 +105,37 @@ class SatelliteProviderError(TileManagerError): class RateLimitedError(TileManagerError): - """``satellite-provider`` ingest endpoint rate-limited the upload. + """``satellite-provider`` rate-limited the request (upload OR download). Raised when the parent suite returns 429 and the cumulative ``Retry-After`` budget exceeds - :attr:`C11Config.upload_max_retry_after_s`. The - AZ-319 :class:`HttpTileUploader` honours the FIRST 429's - ``Retry-After`` (sleep + retry) but escalates to this error after + :attr:`C11Config.upload_max_retry_after_s` (upload path) or + :attr:`C11Config.download_max_retry_after_s` (download path). + Both AZ-319 :class:`HttpTileUploader` and AZ-316 + :class:`HttpTileDownloader` honour the first 429's + ``Retry-After`` (sleep + retry) and escalate to this error after the configured budget so the operator can surface the rate-limit state explicitly. """ + + +class ResolutionRejectionError(TileManagerError): + """A downloaded tile failed the C11 ``RESTRICT-SAT-4`` resolution gate. + + Raised by per-tile validation when ``resolution_m_per_px < 0.5``. + The download path catches this internally and counts the tile in + :attr:`DownloadBatchReport.tiles_rejected_resolution` rather than + aborting the batch — the type stays exported for the operator + tooling to surface in CLI output. + """ + + +class CacheBudgetExceededError(TileManagerError): + """The c6 AZ-308 budget enforcer could not reserve head-room. + + Raised after the C11 download path's pre-check converts c6's + ``CacheBudgetExhaustedError`` into the C11-side type, so callers + only need to ``except TileManagerError`` (or this subclass) to + catch a cache-full failure. The original c6 error is preserved + on ``__cause__``. + """ diff --git a/src/gps_denied_onboard/components/c11_tile_manager/interface.py b/src/gps_denied_onboard/components/c11_tile_manager/interface.py index f9fbe04..3c6a045 100644 --- a/src/gps_denied_onboard/components/c11_tile_manager/interface.py +++ b/src/gps_denied_onboard/components/c11_tile_manager/interface.py @@ -3,7 +3,10 @@ Operator-side ONLY — excluded from airborne via CMake (`BUILD_C11_TILE_MANAGER=OFF`). See `_docs/02_document/components/12_c11_tilemanager/`. -* :class:`TileDownloader` — pre-flight download path (AZ-316, pending). +* :class:`TileDownloader` — pre-flight download path (AZ-316). The + authoritative shape lives in + ``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md`` + v1.0.0 and is mirrored 1:1 here. * :class:`TileUploader` — post-landing upload path (AZ-319) — the authoritative shape lives in ``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md`` @@ -18,14 +21,15 @@ See `_docs/02_document/components/12_c11_tilemanager/`. from __future__ import annotations -from collections.abc import Iterable, Sequence -from pathlib import Path +from collections.abc import Sequence from typing import Any, Protocol, runtime_checkable from uuid import UUID -from gps_denied_onboard._types.tile import TileRecord from gps_denied_onboard.components.c11_tile_manager._types import ( + DownloadBatchReport, + DownloadRequest, FlightStateSignal, + TileSummary, UploadBatchReport, UploadRequest, ) @@ -37,12 +41,27 @@ __all__ = [ ] +@runtime_checkable class TileDownloader(Protocol): - """Pre-flight tile download from `satellite-provider`.""" + """Pre-flight tile download from ``satellite-provider`` (operator-side). - def download( - self, lat_lon_box: tuple[float, float, float, float], zoom: int, output_root: Path - ) -> Iterable[TileRecord]: ... + See ``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md`` + v1.0.0 for invariants I-1 .. I-5 and the per-method error matrix. + The :meth:`enumerate_remote_coverage` return type is + :class:`TileSummary` (DTO declared in C11's ``_types`` so the C12 + consumer never imports c6 to size a download). + """ + + def download_tiles_for_area(self, request: DownloadRequest) -> DownloadBatchReport: ... + + def enumerate_remote_coverage( + self, + bbox_min_lat: float, + bbox_min_lon: float, + bbox_max_lat: float, + bbox_max_lon: float, + zoom_levels: Sequence[int], + ) -> Sequence[TileSummary]: ... @runtime_checkable diff --git a/src/gps_denied_onboard/components/c11_tile_manager/tile_downloader.py b/src/gps_denied_onboard/components/c11_tile_manager/tile_downloader.py new file mode 100644 index 0000000..a408b1d --- /dev/null +++ b/src/gps_denied_onboard/components/c11_tile_manager/tile_downloader.py @@ -0,0 +1,907 @@ +"""C11 ``HttpTileDownloader`` (AZ-316) — concrete :class:`TileDownloader`. + +Operator-side pre-flight download path. Authenticated GETs against +``satellite-provider``, RESTRICT-SAT-4 enforcement at the C11 boundary, +c6 writes via the AZ-303 store + metadata Protocols (which run AZ-307's +freshness gate at insert), AZ-308 cache-headroom pre-check before any +GET fires, and a per-``(flight_id, request_hash)`` journal for +idempotent re-runs. + +Architecture +------------ +The c6 storage surfaces are reached through structural :class:`Protocol` +cuts (:class:`_TileWriterLike`, :class:`_BudgetEnforcerLike`, +:class:`_FreshnessRejectionLike`) defined in this module — never via a +direct ``from gps_denied_onboard.components.c6_tile_cache import …``. +The composition root +(``runtime_root.c11_factory.build_tile_downloader``) is the single +layer that may bind concrete c6 implementations into the constructor. +That adapter handles c6's :class:`TileMetadata` / :class:`TileSource` / +:class:`FreshnessLabel` / :class:`SectorClassification` enums so the +downloader stays free of cross-component imports (AZ-507 / AZ-270). +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import os +import tempfile +from dataclasses import dataclass, field +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime +from pathlib import Path +from typing import Any, Protocol, runtime_checkable +from uuid import UUID + +import httpx + +from gps_denied_onboard.components.c11_tile_manager._types import ( + DownloadBatchReport, + DownloadOutcome, + DownloadRequest, + SectorClassification, + TileSummary, +) +from gps_denied_onboard.components.c11_tile_manager.config import C11Config +from gps_denied_onboard.components.c11_tile_manager.errors import ( + CacheBudgetExceededError, + RateLimitedError, + SatelliteProviderError, +) + +__all__ = [ + "DOWNLOAD_JOURNAL_DIRNAME", + "HttpTileDownloader", + "request_hash", +] + + +_LIST_PATH = "/api/satellite/tiles" +_GET_PATH = "/api/satellite/tiles" +_LIST_QUERY_LIST_ONLY = "list-only" +DOWNLOAD_JOURNAL_DIRNAME = ".c11/journal" +_LOCKFILE_PATH = ".c11/lock" +_DEFAULT_BACKOFF_SCHEDULE_S: tuple[float, ...] = (1.0, 2.0, 4.0, 8.0) + +_COMPONENT = "c11_tile_manager.tile_downloader" +_LOG_KIND_SESSION_START = "c11.download.session.start" +_LOG_KIND_SESSION_END = "c11.download.session.end" +_LOG_KIND_RESOLUTION_REJECT = "c11.download.resolution_rejected" +_LOG_KIND_FRESHNESS_REJECT = "c11.download.freshness_rejected_summary" +_LOG_KIND_FRESHNESS_DOWNGRADED = "c11.download.freshness_downgraded" +_LOG_KIND_RETRY = "c11.download.batch.retry" +_LOG_KIND_PROVIDER_FAIL = "c11.download.provider.failed" +_LOG_KIND_BUDGET_FAIL = "c11.download.budget.exceeded" +_LOG_KIND_IDEMPOTENT = "c11.download.idempotent_no_op" + +_AUTH_HEADER_REDACTED = "Bearer ***" + + +# ---------------------------------------------------------------------- +# Consumer-side cuts over c6 (AZ-507): never imported across components. +# ---------------------------------------------------------------------- + + +@runtime_checkable +class _TileWriterLike(Protocol): + """Composition-root adapter that hides c6's ``TileMetadata`` assembly. + + The downloader hands the adapter the primitives the parent suite + returned (zoom, lat, lon, tile_size, capture_ts, content sha256, + freshness label string, source string) plus the JPEG bytes. The + adapter assembles c6's :class:`TileMetadata` and calls + ``TileStore.write_tile`` + ``TileMetadataStore.insert_metadata`` + inside its own boundary, returning the c6 freshness label as a + plain string (``"fresh"`` / ``"downgraded"`` / etc.) so the + downloader can map it without importing c6's enum. + + Raises :class:`_FreshnessRejectionLike`-compatible exception on + c6 freshness gate refusal; the downloader catches by structural + type to keep the import boundary clean. + """ + + def write_tile_for_download( + self, + *, + tile_blob: bytes, + zoom_level: int, + lat: float, + lon: float, + tile_size_meters: float, + tile_size_pixels: int, + capture_timestamp: datetime, + content_sha256_hex: str, + sector_class: str, + ) -> str: ... + + def tile_already_present( + self, *, zoom_level: int, lat: float, lon: float + ) -> bool: ... + + +@runtime_checkable +class _BudgetEnforcerLike(Protocol): + """Structural cut of c6's :meth:`CacheBudgetEnforcer.reserve_headroom`. + + Returns any object on success; raises any exception on failure + (the downloader maps either path into its own + :class:`CacheBudgetExceededError` envelope so callers do not need + to catch a c6 type). + """ + + def reserve_headroom(self, needed_bytes: int) -> Any: ... + + +# ---------------------------------------------------------------------- +# Request hash + journal helpers +# ---------------------------------------------------------------------- + + +def request_hash( + flight_id: UUID, + bbox_min_lat: float, + bbox_min_lon: float, + bbox_max_lat: float, + bbox_max_lon: float, + zoom_levels: tuple[int, ...], + sector_class: SectorClassification, + service_api_key: str, +) -> str: + """Stable 16-hex digest used as the journal filename suffix. + + Hashes a deterministic concatenation of the request fields plus a + SHA-256 of the service API key (so two operators with different + keys never share a journal). The digest is short enough for human + inspection and long enough for collision resistance within one + cache root. + """ + + api_key_digest = hashlib.sha256(service_api_key.encode("utf-8")).hexdigest() + payload = "|".join( + [ + str(flight_id), + f"{bbox_min_lat:.10f}", + f"{bbox_min_lon:.10f}", + f"{bbox_max_lat:.10f}", + f"{bbox_max_lon:.10f}", + ",".join(str(z) for z in sorted(zoom_levels)), + sector_class.value, + api_key_digest, + ] + ) + return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16] + + +def _journal_path(cache_root: Path, flight_id: UUID, req_hash: str) -> Path: + return cache_root / DOWNLOAD_JOURNAL_DIRNAME / f"{flight_id}__{req_hash}.json" + + +@dataclass +class _JournalState: + """Per-batch journal payload. Persisted as compact JSON via atomicwrite.""" + + flight_id: str + request_hash: str + started_at_iso: str + completed_at_iso: str | None = None + tile_ids_completed: list[str] = field(default_factory=list) + tile_counts: dict[str, int] = field(default_factory=dict) + + def to_json_bytes(self) -> bytes: + return json.dumps( + { + "flight_id": self.flight_id, + "request_hash": self.request_hash, + "started_at_iso": self.started_at_iso, + "completed_at_iso": self.completed_at_iso, + "tile_ids_completed": self.tile_ids_completed, + "tile_counts": self.tile_counts, + }, + sort_keys=True, + separators=(",", ":"), + ).encode("utf-8") + + @classmethod + def from_json_bytes(cls, raw: bytes) -> _JournalState: + decoded = json.loads(raw.decode("utf-8")) + return cls( + flight_id=str(decoded["flight_id"]), + request_hash=str(decoded["request_hash"]), + started_at_iso=str(decoded["started_at_iso"]), + completed_at_iso=decoded.get("completed_at_iso"), + tile_ids_completed=list(decoded.get("tile_ids_completed") or []), + tile_counts=dict(decoded.get("tile_counts") or {}), + ) + + +def _atomic_write_json(path: Path, payload: bytes) -> None: + """Write-then-rename + fsync per the description.md atomicwrites pattern.""" + + path.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_str = tempfile.mkstemp( + prefix=path.name + ".", + suffix=".tmp", + dir=str(path.parent), + ) + tmp = Path(tmp_str) + try: + with os.fdopen(fd, "wb") as fp: + fp.write(payload) + fp.flush() + os.fsync(fp.fileno()) + os.replace(tmp, path) + # Best-effort directory fsync so the rename is durable on + # power-loss; not all filesystems implement directory fsync, + # so failure here is logged-and-ignored at the caller. + dir_fd = os.open(str(path.parent), os.O_RDONLY) + try: + os.fsync(dir_fd) + finally: + os.close(dir_fd) + except Exception: + if tmp.exists(): + tmp.unlink(missing_ok=True) + raise + + +def _read_journal(path: Path) -> _JournalState | None: + if not path.exists(): + return None + try: + return _JournalState.from_json_bytes(path.read_bytes()) + except (json.JSONDecodeError, KeyError, ValueError): + # Risk-3 mitigation: a torn / corrupt journal record is treated + # as "no prior journal" so the batch re-runs from scratch. + return None + + +# ---------------------------------------------------------------------- +# Retry-After parsing (mirrors AZ-319 helper) +# ---------------------------------------------------------------------- + + +def _parse_retry_after(header_value: str | None, max_s: int) -> int: + if header_value is None: + return 0 + raw = header_value.strip() + if not raw: + return 0 + if raw.isdigit(): + return min(int(raw), max_s) + try: + retry_at = parsedate_to_datetime(raw) + except (TypeError, ValueError): + return 0 + now = datetime.now(timezone.utc) + if retry_at.tzinfo is None: + retry_at = retry_at.replace(tzinfo=timezone.utc) + delta = (retry_at - now).total_seconds() + return max(0, min(int(delta), max_s)) + + +def _default_sleep(seconds: float) -> None: + """Production sleep hook routes through :class:`WallClock.sleep_until_ns`. + + Tests inject a no-op or a recorder. Routing through ``WallClock`` + keeps the AZ-398 AC-4 AST scan over ``components/`` from seeing a + bare ``time.sleep``. + """ + + from gps_denied_onboard.clock.wall_clock import WallClock + + clock = WallClock() + clock.sleep_until_ns(clock.monotonic_ns() + int(seconds * 1_000_000_000)) + + +# ---------------------------------------------------------------------- +# Internal session-state container +# ---------------------------------------------------------------------- + + +@dataclass +class _DownloadSession: + """Mutable bookkeeping for one ``download_tiles_for_area`` call.""" + + request: DownloadRequest + journal: _JournalState + journal_path: Path + completed_set: set[str] + tiles_requested: int = 0 + tiles_downloaded: int = 0 + tiles_rejected_resolution: int = 0 + tiles_rejected_freshness: int = 0 + tiles_downgraded: int = 0 + retry_count: int = 0 + rate_limit_budget_used_s: int = 0 + + +# ---------------------------------------------------------------------- +# Concrete downloader +# ---------------------------------------------------------------------- + + +class HttpTileDownloader: + """Concrete :class:`TileDownloader` against ``satellite-provider``'s GET surface. + + All cross-component dependencies (``tile_writer``, + ``budget_enforcer``) are constructor-injected via Protocol cuts; + the composition root binds them to the c6 implementations. The + ``http_client`` is caller-owned: production wiring uses one + long-lived :class:`httpx.Client` per process; tests inject + ``httpx.Client(transport=httpx.MockTransport(...))`` for + deterministic responses. + """ + + def __init__( + self, + *, + http_client: httpx.Client, + tile_writer: _TileWriterLike, + budget_enforcer: _BudgetEnforcerLike, + logger: logging.Logger, + config: C11Config, + sleep: Any = None, + backoff_schedule_s: tuple[float, ...] | None = None, + ) -> None: + self._http_client = http_client + self._tile_writer = tile_writer + self._budget_enforcer = budget_enforcer + self._logger = logger + self._config = config + self._sleep = sleep if sleep is not None else _default_sleep + self._backoff_schedule_s = backoff_schedule_s or _DEFAULT_BACKOFF_SCHEDULE_S + + # ------------------------------------------------------------------ + # Public Protocol surface + # ------------------------------------------------------------------ + + def download_tiles_for_area(self, request: DownloadRequest) -> DownloadBatchReport: + """Idempotence check → enumerate → budget → per-tile GET loop.""" + + req_hash = request_hash( + request.flight_id, + request.bbox_min_lat, + request.bbox_min_lon, + request.bbox_max_lat, + request.bbox_max_lon, + tuple(request.zoom_levels), + request.sector_class, + self._config.service_api_key, + ) + journal_path = _journal_path(request.cache_root, request.flight_id, req_hash) + existing = _read_journal(journal_path) + + if existing is not None and existing.completed_at_iso is not None: + self._logger.info( + "Idempotent re-run detected; zero work scheduled", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_IDEMPOTENT, + "kv": { + "flight_id": str(request.flight_id), + "request_hash": req_hash, + "tile_ids_completed": len(existing.tile_ids_completed), + }, + }, + ) + counts = existing.tile_counts + return DownloadBatchReport( + outcome=DownloadOutcome.IDEMPOTENT_NO_OP, + tiles_requested=int(counts.get("tiles_requested", len(existing.tile_ids_completed))), + tiles_downloaded=int(counts.get("tiles_downloaded", len(existing.tile_ids_completed))), + tiles_rejected_resolution=int(counts.get("tiles_rejected_resolution", 0)), + tiles_rejected_freshness=int(counts.get("tiles_rejected_freshness", 0)), + tiles_downgraded=int(counts.get("tiles_downgraded", 0)), + retry_count=0, + request_hash=req_hash, + ) + + journal = existing or _JournalState( + flight_id=str(request.flight_id), + request_hash=req_hash, + started_at_iso=_iso_now(), + ) + completed_set = set(journal.tile_ids_completed) + + session = _DownloadSession( + request=request, + journal=journal, + journal_path=journal_path, + completed_set=completed_set, + ) + + self._logger.info( + "Pre-flight tile download session started", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_SESSION_START, + "kv": { + "flight_id": str(request.flight_id), + "request_hash": req_hash, + "bbox": [ + request.bbox_min_lat, + request.bbox_min_lon, + request.bbox_max_lat, + request.bbox_max_lon, + ], + "zoom_levels": list(request.zoom_levels), + "sector_class": request.sector_class.value, + "resume_from_journal": completed_set != set(), + "tiles_already_completed": len(completed_set), + }, + }, + ) + + outcome: DownloadOutcome = DownloadOutcome.SUCCESS + try: + summaries = self._enumerate_remote(request) + session.tiles_requested = len(summaries) + self._reserve_budget(summaries, completed_set, session) + for summary in summaries: + if summary.tile_id_str in completed_set: + continue + self._download_one_tile(summary, request, session) + journal.completed_at_iso = _iso_now() + journal.tile_counts = self._counts_dict(session) + _atomic_write_json(journal_path, journal.to_json_bytes()) + return DownloadBatchReport( + outcome=outcome, + tiles_requested=session.tiles_requested, + tiles_downloaded=session.tiles_downloaded, + tiles_rejected_resolution=session.tiles_rejected_resolution, + tiles_rejected_freshness=session.tiles_rejected_freshness, + tiles_downgraded=session.tiles_downgraded, + retry_count=session.retry_count, + request_hash=req_hash, + ) + except ( + SatelliteProviderError, + RateLimitedError, + CacheBudgetExceededError, + ): + outcome = DownloadOutcome.FAILURE + journal.tile_counts = self._counts_dict(session) + _atomic_write_json(journal_path, journal.to_json_bytes()) + raise + except Exception: + outcome = DownloadOutcome.FAILURE + journal.tile_counts = self._counts_dict(session) + _atomic_write_json(journal_path, journal.to_json_bytes()) + raise + finally: + self._logger.info( + "Pre-flight tile download session ended", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_SESSION_END, + "kv": { + "flight_id": str(request.flight_id), + "request_hash": req_hash, + "outcome": outcome.value, + "tiles_requested": session.tiles_requested, + "tiles_downloaded": session.tiles_downloaded, + "tiles_rejected_resolution": session.tiles_rejected_resolution, + "tiles_rejected_freshness": session.tiles_rejected_freshness, + "tiles_downgraded": session.tiles_downgraded, + "retry_count": session.retry_count, + }, + }, + ) + if session.tiles_rejected_freshness: + self._logger.warning( + "Freshness gate rejected tiles in this batch", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FRESHNESS_REJECT, + "kv": { + "flight_id": str(request.flight_id), + "request_hash": req_hash, + "tiles_rejected_freshness": session.tiles_rejected_freshness, + }, + }, + ) + + def enumerate_remote_coverage( + self, + bbox_min_lat: float, + bbox_min_lon: float, + bbox_max_lat: float, + bbox_max_lon: float, + zoom_levels: Any, + ) -> list[TileSummary]: + """Read-only enumeration; issues one ``list-only=true`` GET.""" + + return list( + self._do_enumerate( + bbox_min_lat, + bbox_min_lon, + bbox_max_lat, + bbox_max_lon, + tuple(int(z) for z in zoom_levels), + ) + ) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _enumerate_remote(self, request: DownloadRequest) -> list[TileSummary]: + return list( + self._do_enumerate( + request.bbox_min_lat, + request.bbox_min_lon, + request.bbox_max_lat, + request.bbox_max_lon, + tuple(request.zoom_levels), + ) + ) + + def _do_enumerate( + self, + bbox_min_lat: float, + bbox_min_lon: float, + bbox_max_lat: float, + bbox_max_lon: float, + zoom_levels: tuple[int, ...], + ) -> list[TileSummary]: + params = { + "bbox": f"{bbox_min_lat},{bbox_min_lon},{bbox_max_lat},{bbox_max_lon}", + "zoom": ",".join(str(z) for z in zoom_levels), + _LIST_QUERY_LIST_ONLY: "true", + } + response = self._send_get( + self._config.satellite_provider_url.rstrip("/") + _LIST_PATH, + params=params, + session=None, + ) + try: + body = response.json() + except ValueError as exc: + self._log_provider_failure( + "list_not_json", response.status_code, str(exc) + ) + raise SatelliteProviderError( + "satellite-provider returned non-JSON list-only body" + ) from exc + try: + entries = body["tiles"] + except (KeyError, TypeError) as exc: + self._log_provider_failure( + "list_schema", response.status_code, str(exc) + ) + raise SatelliteProviderError( + "satellite-provider list-only response missing 'tiles'" + ) from exc + + summaries: list[TileSummary] = [] + for entry in entries: + try: + summaries.append( + TileSummary( + tile_id_str=str(entry["tile_id"]), + zoom_level=int(entry["zoom_level"]), + lat=float(entry["lat"]), + lon=float(entry["lon"]), + produced_at=_parse_iso(str(entry["produced_at"])), + resolution_m_per_px=float(entry["resolution_m_per_px"]), + estimated_bytes=int(entry["estimated_bytes"]), + tile_size_meters=float(entry.get("tile_size_meters", 100.0)), + tile_size_pixels=int(entry.get("tile_size_pixels", 256)), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + self._log_provider_failure( + "list_tile_schema", response.status_code, str(exc) + ) + raise SatelliteProviderError( + "satellite-provider list-only entry missing required fields" + ) from exc + return summaries + + def _reserve_budget( + self, + summaries: list[TileSummary], + completed_set: set[str], + session: _DownloadSession, + ) -> None: + remaining_bytes = sum( + int(s.estimated_bytes) + for s in summaries + if s.tile_id_str not in completed_set + ) + if remaining_bytes <= 0: + return + try: + self._budget_enforcer.reserve_headroom(remaining_bytes) + except CacheBudgetExceededError: + self._log_budget_failure(remaining_bytes) + raise + except Exception as exc: + self._log_budget_failure(remaining_bytes, detail=str(exc)) + raise CacheBudgetExceededError( + f"c6 budget enforcer refused {remaining_bytes} bytes " + f"of head-room: {exc}" + ) from exc + + def _download_one_tile( + self, + summary: TileSummary, + request: DownloadRequest, + session: _DownloadSession, + ) -> None: + if summary.resolution_m_per_px < self._config.download_resolution_floor_m_per_px: + session.tiles_rejected_resolution += 1 + self._logger.warning( + "Resolution gate rejected tile", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_RESOLUTION_REJECT, + "kv": { + "flight_id": str(request.flight_id), + "tile_id": summary.tile_id_str, + "resolution_m_per_px": summary.resolution_m_per_px, + "floor_m_per_px": self._config.download_resolution_floor_m_per_px, + }, + }, + ) + return + + ingest_url = ( + self._config.satellite_provider_url.rstrip("/") + + _GET_PATH + + f"/{summary.tile_id_str}" + ) + response = self._send_get(ingest_url, params=None, session=session) + if not response.content: + self._log_provider_failure( + "empty_body", response.status_code, summary.tile_id_str + ) + raise SatelliteProviderError( + f"satellite-provider returned empty body for tile_id=" + f"{summary.tile_id_str}" + ) + tile_blob = response.content + content_sha256_hex = hashlib.sha256(tile_blob).hexdigest() + + produced_at = summary.produced_at + if produced_at.tzinfo is None: + produced_at = produced_at.replace(tzinfo=timezone.utc) + + try: + label = self._tile_writer.write_tile_for_download( + tile_blob=tile_blob, + zoom_level=summary.zoom_level, + lat=summary.lat, + lon=summary.lon, + tile_size_meters=summary.tile_size_meters, + tile_size_pixels=summary.tile_size_pixels, + capture_timestamp=produced_at, + content_sha256_hex=content_sha256_hex, + sector_class=request.sector_class.value, + ) + except Exception as exc: + if _is_freshness_rejection(exc): + session.tiles_rejected_freshness += 1 + return + raise + + if label == "downgraded": + session.tiles_downgraded += 1 + session.tiles_downloaded += 1 + self._logger.warning( + "Freshness label downgraded by c6", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_FRESHNESS_DOWNGRADED, + "kv": { + "flight_id": str(request.flight_id), + "tile_id": summary.tile_id_str, + }, + }, + ) + else: + session.tiles_downloaded += 1 + + session.completed_set.add(summary.tile_id_str) + session.journal.tile_ids_completed = sorted(session.completed_set) + session.journal.tile_counts = self._counts_dict(session) + _atomic_write_json(session.journal_path, session.journal.to_json_bytes()) + + def _send_get( + self, + url: str, + params: dict[str, str] | None, + session: _DownloadSession | None, + ) -> httpx.Response: + """GET with auth header + 429 / 5xx handling.""" + + headers = {"Authorization": f"Bearer {self._config.service_api_key}"} + attempt = 0 + last_error: str | None = None + while True: + attempt += 1 + try: + response = self._http_client.get( + url, + params=params, + headers=headers, + timeout=self._config.download_http_timeout_s, + ) + except ( + httpx.ConnectError, + httpx.ConnectTimeout, + httpx.ReadTimeout, + httpx.WriteError, + httpx.RemoteProtocolError, + ) as exc: + last_error = f"transport:{type(exc).__name__}:{exc}" + if attempt > self._config.download_max_5xx_retries: + self._log_provider_failure("connection_error", None, last_error) + raise SatelliteProviderError( + f"satellite-provider unreachable after " + f"{attempt - 1} retries: {last_error}" + ) from exc + self._sleep_with_log( + self._backoff_for(attempt - 1), last_error, session + ) + continue + + if response.status_code in (401, 403): + self._log_provider_failure( + "auth_failed", response.status_code, "fail-fast" + ) + raise SatelliteProviderError( + f"satellite-provider rejected auth (http_status=" + f"{response.status_code}); fail-fast" + ) + + if response.status_code == 429: + wait_s = _parse_retry_after( + response.headers.get("Retry-After"), + self._config.download_max_retry_after_s + - (session.rate_limit_budget_used_s if session else 0), + ) + if session is not None: + session.rate_limit_budget_used_s += wait_s + if wait_s <= 0 or ( + session is not None + and session.rate_limit_budget_used_s + >= self._config.download_max_retry_after_s + ): + self._log_provider_failure( + "rate_limited", 429, "Retry-After budget exhausted" + ) + raise RateLimitedError( + "satellite-provider rate-limited the download; " + f"cumulative Retry-After budget " + f"{(session.rate_limit_budget_used_s if session else 0)}s " + f"exceeds cap {self._config.download_max_retry_after_s}s" + ) + self._sleep_with_log(wait_s, f"http_429_retry_after={wait_s}", session) + continue + + if response.status_code >= 500: + last_error = f"http_status={response.status_code}" + if attempt > self._config.download_max_5xx_retries: + self._log_provider_failure( + "persistent_5xx", response.status_code, last_error + ) + raise SatelliteProviderError( + f"satellite-provider returned {response.status_code} " + f"after {attempt - 1} retries" + ) + self._sleep_with_log( + self._backoff_for(attempt - 1), last_error, session + ) + continue + + if response.status_code != 200: + self._log_provider_failure( + "unexpected_status", response.status_code, "non-200" + ) + raise SatelliteProviderError( + f"satellite-provider returned unexpected status " + f"{response.status_code} (expected 200)" + ) + return response + + def _backoff_for(self, attempt_idx: int) -> float: + if attempt_idx < 0: + attempt_idx = 0 + if attempt_idx >= len(self._backoff_schedule_s): + attempt_idx = len(self._backoff_schedule_s) - 1 + return self._backoff_schedule_s[attempt_idx] + + def _sleep_with_log( + self, wait_s: float, reason: str, session: _DownloadSession | None + ) -> None: + if session is not None: + session.retry_count += 1 + self._logger.warning( + "Download batch retrying", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_RETRY, + "kv": { + "wait_s": wait_s, + "reason": reason, + "retry_count": session.retry_count if session is not None else None, + }, + }, + ) + self._sleep(wait_s) + + def _log_provider_failure( + self, reason: str, http_status: int | None, detail: str + ) -> None: + self._logger.error( + "Download provider failed", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_PROVIDER_FAIL, + "kv": { + "reason": reason, + "http_status": http_status, + "detail": detail, + "auth_header": _AUTH_HEADER_REDACTED, + }, + }, + ) + + def _log_budget_failure( + self, requested_bytes: int, detail: str | None = None + ) -> None: + self._logger.error( + "Cache-budget pre-check failed", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_BUDGET_FAIL, + "kv": { + "requested_bytes": requested_bytes, + "detail": detail, + }, + }, + ) + + def _counts_dict(self, session: _DownloadSession) -> dict[str, int]: + return { + "tiles_requested": session.tiles_requested, + "tiles_downloaded": session.tiles_downloaded, + "tiles_rejected_resolution": session.tiles_rejected_resolution, + "tiles_rejected_freshness": session.tiles_rejected_freshness, + "tiles_downgraded": session.tiles_downgraded, + } + + +# ---------------------------------------------------------------------- +# Module-level helpers +# ---------------------------------------------------------------------- + + +def _iso_now() -> str: + return datetime.now(timezone.utc).isoformat(timespec="microseconds") + + +def _parse_iso(raw: str) -> datetime: + s = raw.strip() + if s.endswith("Z"): + s = s[:-1] + "+00:00" + return datetime.fromisoformat(s) + + +def _is_freshness_rejection(exc: BaseException) -> bool: + """Structural test: c6 raises ``FreshnessRejectionError``. + + The composition-root adapter is free to re-raise the c6 type + directly; we recognise it by class name to avoid importing the + c6 errors module here. + """ + + if exc.__class__.__name__ == "FreshnessRejectionError": + return True + for base in type(exc).__mro__: + if base.__name__ == "FreshnessRejectionError": + return True + return False diff --git a/src/gps_denied_onboard/runtime_root/c11_factory.py b/src/gps_denied_onboard/runtime_root/c11_factory.py index 51c60fa..db4baa8 100644 --- a/src/gps_denied_onboard/runtime_root/c11_factory.py +++ b/src/gps_denied_onboard/runtime_root/c11_factory.py @@ -1,6 +1,6 @@ -"""C11 TileManager composition-root factories (AZ-317, AZ-318, AZ-319). +"""C11 TileManager composition-root factories (AZ-316, AZ-317, AZ-318, AZ-319). -Wires the upload-side services that have landed: +Wires the operator-side services: * :func:`build_flight_state_gate` (AZ-317) — adapts an injected ``FlightStateSource`` (typically an E-C8 FC adapter wrapper) into @@ -12,10 +12,16 @@ Wires the upload-side services that have landed: key manager, the c6 storage cuts, an :class:`httpx.Client`, and the :class:`C11Config` block into the production :class:`HttpTileUploader`. +* :func:`build_tile_downloader` (AZ-316) — composes the c6 store + + metadata-store + budget-enforcer (wrapped in a single + composition-root adapter that hides c6's :class:`TileMetadata` + assembly), an :class:`httpx.Client`, and the :class:`C11Config` + block into the production :class:`HttpTileDownloader`. Composition root is the ONLY layer permitted to import from ``components.c11_tile_manager`` (per ``module-layout.md`` Rule 9 + -the AZ-270 lint). +the AZ-270 lint). It is also the only layer permitted to bridge the +c6 ↔ c11 boundary (per AZ-507). """ from __future__ import annotations @@ -28,6 +34,7 @@ from gps_denied_onboard.components.c11_tile_manager import ( C11Config, FlightStateGate, FlightStateSource, + HttpTileDownloader, HttpTileUploader, PerFlightKeyManager, ) @@ -42,6 +49,7 @@ if TYPE_CHECKING: __all__ = [ "build_flight_state_gate", "build_per_flight_key_manager", + "build_tile_downloader", "build_tile_uploader", ] @@ -51,6 +59,7 @@ _C11_SIGNING_LOGGER = "c11_tile_manager.signing_key" _C11_SIGNING_PRODUCER_ID = "c11_tile_manager.signing_key" _C11_UPLOADER_LOGGER = "c11_tile_manager.tile_uploader" _C11_UPLOADER_PRODUCER_ID = "c11_tile_manager.tile_uploader" +_C11_DOWNLOADER_LOGGER = "c11_tile_manager.tile_downloader" def build_flight_state_gate(*, source: FlightStateSource) -> FlightStateGate: @@ -145,3 +154,164 @@ def build_tile_uploader( logger=logger, config=block, ) + + +def build_tile_downloader( + config: Config, + *, + http_client: httpx.Client, + tile_store: Any, + tile_metadata_store: Any, + budget_enforcer: Any, + companion_id: str | None = None, +) -> HttpTileDownloader: + """Construct a wired :class:`HttpTileDownloader` (AZ-316). + + Wraps c6's ``TileStore`` + ``TileMetadataStore`` + + ``CacheBudgetEnforcer`` into a single composition-root adapter + that absorbs c6's :class:`TileMetadata` / :class:`TileSource` / + :class:`FreshnessLabel` / :class:`SectorClassification` enums + so the downloader stays free of cross-component imports + (AZ-507 / AZ-270). The c6 surfaces are caller-owned; production + wiring shares the same singletons used by the uploader and the + on-airframe ingest path. + """ + + block = config.components.get("c11_tile_manager") + if block is None: + raise ConfigError( + "build_tile_downloader: config.components['c11_tile_manager'] " + "block is missing — register C11Config and supply YAML" + ) + if not isinstance(block, C11Config): + raise ConfigError( + "build_tile_downloader: config.components['c11_tile_manager'] " + f"must be a C11Config, got {type(block).__name__}" + ) + if not block.satellite_provider_url: + raise ConfigError( + "build_tile_downloader: C11Config.satellite_provider_url " + "must be configured for production / operator wiring" + ) + if not block.service_api_key: + raise ConfigError( + "build_tile_downloader: C11Config.service_api_key must be " + "set; the operator-tooling deploy MUST inject the bearer " + "token via env override" + ) + logger = get_logger(_C11_DOWNLOADER_LOGGER) + adapter = _C6DownloadAdapter( + tile_store=tile_store, + metadata_store=tile_metadata_store, + budget_enforcer=budget_enforcer, + companion_id=companion_id or block.companion_id, + ) + return HttpTileDownloader( + http_client=http_client, + tile_writer=adapter, + budget_enforcer=adapter, + logger=logger, + config=block, + ) + + +class _C6DownloadAdapter: + """Composition-root bridge between AZ-316 and c6 storage. + + Implements both :class:`_TileWriterLike` and + :class:`_BudgetEnforcerLike` Protocol cuts (declared in + ``c11_tile_manager.tile_downloader`` as structural Protocols). + Hides c6's :class:`TileMetadata` / :class:`TileSource` / + :class:`FreshnessLabel` / :class:`SectorClassification` so the + AZ-316 module never imports c6. + """ + + __slots__ = ( + "_tile_store", + "_metadata_store", + "_budget_enforcer", + "_companion_id", + ) + + def __init__( + self, + *, + tile_store: Any, + metadata_store: Any, + budget_enforcer: Any, + companion_id: str, + ) -> None: + self._tile_store = tile_store + self._metadata_store = metadata_store + self._budget_enforcer = budget_enforcer + self._companion_id = companion_id + + def write_tile_for_download( + self, + *, + tile_blob: bytes, + zoom_level: int, + lat: float, + lon: float, + tile_size_meters: float, + tile_size_pixels: int, + capture_timestamp: Any, + content_sha256_hex: str, + sector_class: str, + ) -> str: + from gps_denied_onboard.components.c6_tile_cache._types import ( + FreshnessLabel, + TileId, + TileMetadata, + TileSource, + VotingStatus, + ) + + tid = TileId(zoom_level=int(zoom_level), lat=float(lat), lon=float(lon)) + metadata = TileMetadata( + tile_id=tid, + tile_size_meters=float(tile_size_meters), + tile_size_pixels=int(tile_size_pixels), + capture_timestamp=capture_timestamp, + source=TileSource.GOOGLEMAPS, + content_sha256_hex=content_sha256_hex, + freshness_label=FreshnessLabel.FRESH, + flight_id=None, + companion_id=None, + quality_metadata=None, + voting_status=VotingStatus.TRUSTED, + ) + self._tile_store.write_tile(tile_blob, metadata) + self._metadata_store.insert_metadata(metadata) + # AZ-307 owns the actual freshness label after insert; for the + # download path the simplest contract is "FRESH on first write, + # DOWNGRADED if the row already existed under stable_rear stale + # rules". The c6 store does not currently expose the post-insert + # label as a return value (AZ-303 contract); we return "fresh" + # as the conservative default. A future c6 ABI extension that + # surfaces the label can update this adapter without touching + # the AZ-316 module. + return "fresh" + + def tile_already_present( + self, *, zoom_level: int, lat: float, lon: float + ) -> bool: + from gps_denied_onboard.components.c6_tile_cache._types import TileId + + tid = TileId(zoom_level=int(zoom_level), lat=float(lat), lon=float(lon)) + return bool(self._tile_store.tile_exists(tid)) + + def reserve_headroom(self, needed_bytes: int) -> Any: + from gps_denied_onboard.components.c11_tile_manager.errors import ( + CacheBudgetExceededError, + ) + from gps_denied_onboard.components.c6_tile_cache.errors import ( + CacheBudgetExhaustedError, + ) + + try: + return self._budget_enforcer.reserve_headroom(needed_bytes) + except CacheBudgetExhaustedError as exc: + raise CacheBudgetExceededError( + f"c6 cache budget exhausted: needed {needed_bytes} bytes; {exc}" + ) from exc diff --git a/tests/unit/c11_tile_manager/test_protocol_conformance.py b/tests/unit/c11_tile_manager/test_protocol_conformance.py index 5976459..02411e6 100644 --- a/tests/unit/c11_tile_manager/test_protocol_conformance.py +++ b/tests/unit/c11_tile_manager/test_protocol_conformance.py @@ -1,8 +1,8 @@ -"""AZ-319 AC-12 — `HttpTileUploader` satisfies the `TileUploader` Protocol. +"""C11 protocol conformance — uploader (AZ-319 AC-12) + downloader (AZ-316 AC-10). -Smoke-test that the concrete impl exposes every method the Protocol -requires (positive case) and that a partial fake omitting one of the -three required methods is correctly rejected (negative case). +Smoke-tests that each concrete impl exposes every method its Protocol +requires (positive cases) and that partial fakes omitting one of the +required methods are correctly rejected (negative cases). """ from __future__ import annotations @@ -13,9 +13,13 @@ import httpx from gps_denied_onboard.components.c11_tile_manager import ( C11Config, + HttpTileDownloader, HttpTileUploader, ) -from gps_denied_onboard.components.c11_tile_manager.interface import TileUploader +from gps_denied_onboard.components.c11_tile_manager.interface import ( + TileDownloader, + TileUploader, +) from gps_denied_onboard.fdr_client.fakes import FakeFdrSink _PRODUCER_ID = "c11_tile_manager.tile_uploader" @@ -38,6 +42,13 @@ class _PartialFakeMissingConfirm: return [] +class _PartialDownloaderMissingEnumerate: + """Conformance counterexample: missing ``enumerate_remote_coverage``.""" + + def download_tiles_for_area(self, request: object) -> object: # noqa: ARG002 + return None + + def test_ac12_concrete_uploader_satisfies_protocol() -> None: # Arrange — supply minimal-yet-valid dependencies; the Protocol # check only inspects method names, not their behaviour. @@ -68,3 +79,32 @@ def test_ac12_concrete_uploader_satisfies_protocol() -> None: def test_ac12_partial_fake_is_not_protocol_conformant() -> None: # Assert assert not isinstance(_PartialFakeMissingConfirm(), TileUploader) + + +def test_ac10_concrete_downloader_satisfies_protocol() -> None: + # Arrange + cfg = C11Config( + satellite_provider_url="https://parent-suite.test", + service_api_key="conformance-test-key", + download_http_timeout_s=5.0, + download_max_5xx_retries=4, + download_max_retry_after_s=600, + download_resolution_floor_m_per_px=0.5, + ) + transport = httpx.MockTransport(lambda r: httpx.Response(200, json={"tiles": []})) + downloader = HttpTileDownloader( + http_client=httpx.Client(transport=transport, base_url="https://parent-suite.test"), + tile_writer=object(), # type: ignore[arg-type] + budget_enforcer=object(), # type: ignore[arg-type] + logger=logging.getLogger("test_az316_conformance"), + config=cfg, + sleep=_NullSleep(), + ) + + # Assert + assert isinstance(downloader, TileDownloader) + + +def test_ac10_partial_downloader_is_not_protocol_conformant() -> None: + # Assert + assert not isinstance(_PartialDownloaderMissingEnumerate(), TileDownloader) diff --git a/tests/unit/c11_tile_manager/test_tile_downloader.py b/tests/unit/c11_tile_manager/test_tile_downloader.py new file mode 100644 index 0000000..5ee1898 --- /dev/null +++ b/tests/unit/c11_tile_manager/test_tile_downloader.py @@ -0,0 +1,739 @@ +"""AZ-316 ``HttpTileDownloader`` unit tests. + +Covers AC-1 .. AC-12 plus the throughput NFR from +``_docs/02_tasks/todo/AZ-316_c11_tile_downloader.md``. Uses +:class:`httpx.MockTransport` for deterministic HTTP responses, a +list-backed log handler for log capture, and stub C6 stores so this +suite never depends on AZ-303 / AZ-305 / AZ-307 / AZ-308 internals. +""" + +from __future__ import annotations + +import json +import logging +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any +from uuid import uuid4 + +import httpx +import pytest + +from gps_denied_onboard.components.c11_tile_manager import ( + C11Config, + CacheBudgetExceededError, + DownloadOutcome, + DownloadRequest, + HttpTileDownloader, + RateLimitedError, + SatelliteProviderError, + SectorClassification, + request_hash, +) + + +_BASE_URL = "https://parent-suite.test" +_LIST_PATH = "/api/satellite/tiles" +_API_KEY = "test-api-key-001" + + +# ---------------------------------------------------------------------- +# Stubs / fakes +# ---------------------------------------------------------------------- + + +class _StubFreshnessRejection(Exception): + """Composition-root would surface c6's ``FreshnessRejectionError``; + we mirror it locally so the structural check in the downloader + matches by class name without importing c6. + """ + + +# Re-name the class so the structural check (`__class__.__name__ == +# "FreshnessRejectionError"`) inside the downloader matches. +_StubFreshnessRejection.__name__ = "FreshnessRejectionError" + + +class _StubTileWriter: + """Captures `write_tile_for_download` calls + scripts the freshness label.""" + + def __init__( + self, + *, + labels: dict[str, str] | None = None, + rejected: set[str] | None = None, + ) -> None: + self.labels = labels or {} + self.rejected = rejected or set() + self.write_calls: list[dict[str, Any]] = [] + self.exists_calls: list[tuple[int, float, float]] = [] + + def write_tile_for_download( + self, + *, + tile_blob: bytes, + zoom_level: int, + lat: float, + lon: float, + tile_size_meters: float, + tile_size_pixels: int, + capture_timestamp: datetime, + content_sha256_hex: str, + sector_class: str, + ) -> str: + tid = _tid(zoom_level, lat, lon) + self.write_calls.append( + { + "tile_id": tid, + "tile_blob_len": len(tile_blob), + "content_sha256_hex": content_sha256_hex, + "sector_class": sector_class, + } + ) + if tid in self.rejected: + raise _StubFreshnessRejection(f"freshness rejected {tid}") + return self.labels.get(tid, "fresh") + + def tile_already_present( + self, *, zoom_level: int, lat: float, lon: float + ) -> bool: + self.exists_calls.append((zoom_level, lat, lon)) + return False + + +class _StubBudgetEnforcer: + """Records `reserve_headroom` calls; raises pre-baked exception when set.""" + + def __init__(self, raise_on_call: Exception | None = None) -> None: + self.calls: list[int] = [] + self._raise = raise_on_call + + def reserve_headroom(self, needed_bytes: int) -> object: + self.calls.append(int(needed_bytes)) + if self._raise is not None: + raise self._raise + return object() + + +def _tid(zoom: int, lat: float, lon: float) -> str: + return f"z{int(zoom)}_{float(lat):.6f}_{float(lon):.6f}" + + +def _build_downloader( + *, + transport: httpx.MockTransport, + tile_writer: _StubTileWriter | None = None, + budget_enforcer: _StubBudgetEnforcer | None = None, + config: C11Config | None = None, + sleep_recorder: list[float] | None = None, + backoff_schedule_s: tuple[float, ...] | None = None, +) -> tuple[ + HttpTileDownloader, + list[logging.LogRecord], + _StubTileWriter, + _StubBudgetEnforcer, + list[float], +]: + log_records: list[logging.LogRecord] = [] + + class _Handler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + log_records.append(record) + + logger = logging.getLogger(f"test_az316_{id(log_records)}") + logger.handlers.clear() + logger.addHandler(_Handler()) + logger.setLevel(logging.DEBUG) + logger.propagate = False + + writer = tile_writer or _StubTileWriter() + enforcer = budget_enforcer or _StubBudgetEnforcer() + sleeps = sleep_recorder if sleep_recorder is not None else [] + + cfg = config or C11Config( + satellite_provider_url=_BASE_URL, + service_api_key=_API_KEY, + download_http_timeout_s=5.0, + download_max_5xx_retries=4, + download_max_retry_after_s=600, + download_resolution_floor_m_per_px=0.5, + ) + + client = httpx.Client(transport=transport, base_url=_BASE_URL) + downloader = HttpTileDownloader( + http_client=client, + tile_writer=writer, # type: ignore[arg-type] + budget_enforcer=enforcer, # type: ignore[arg-type] + logger=logger, + config=cfg, + sleep=lambda s: sleeps.append(s), + backoff_schedule_s=backoff_schedule_s, + ) + return downloader, log_records, writer, enforcer, sleeps + + +def _make_request( + *, + flight_id: Any | None = None, + cache_root: Path, + zoom_levels: tuple[int, ...] = (14,), +) -> DownloadRequest: + return DownloadRequest( + flight_id=flight_id or uuid4(), + bbox_min_lat=45.0, + bbox_min_lon=-122.5, + bbox_max_lat=45.5, + bbox_max_lon=-122.0, + zoom_levels=zoom_levels, + sector_class=SectorClassification.STABLE_REAR, + cache_root=cache_root, + ) + + +def _list_response( + tiles: list[dict[str, Any]] | None = None, +) -> httpx.Response: + return httpx.Response(200, json={"tiles": tiles or []}) + + +def _tile_entry( + *, + zoom: int, + lat: float, + lon: float, + resolution_m_per_px: float = 0.5, + estimated_bytes: int = 4096, + produced_at: datetime | None = None, +) -> dict[str, Any]: + produced = produced_at or datetime(2026, 5, 13, 0, 0, 0, tzinfo=timezone.utc) + return { + "tile_id": _tid(zoom, lat, lon), + "zoom_level": zoom, + "lat": lat, + "lon": lon, + "produced_at": produced.isoformat(), + "resolution_m_per_px": resolution_m_per_px, + "estimated_bytes": estimated_bytes, + "tile_size_meters": 100.0, + "tile_size_pixels": 256, + } + + +def _make_route_handler( + *, + list_response: httpx.Response | None = None, + tile_response_factory: Any = None, +) -> Any: + """Route GETs by URL path: list endpoint vs per-tile endpoint.""" + + def _handler(request: httpx.Request) -> httpx.Response: + path = request.url.path + is_list = ( + path.endswith(_LIST_PATH) + and request.url.params.get("list-only") == "true" + ) + if is_list: + return list_response or _list_response() + if tile_response_factory is None: + return httpx.Response(200, content=b"\xff\xd8\xff\xe0fake-jpeg") + return tile_response_factory(request) + + return _handler + + +# ---------------------------------------------------------------------- +# AC-1: 100-tile happy path +# ---------------------------------------------------------------------- + + +def test_ac1_100_tile_happy_path_writes_all(tmp_path: Path) -> None: + # Arrange + tiles = [ + _tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0 - i * 0.001) + for i in range(100) + ] + transport = httpx.MockTransport( + _make_route_handler(list_response=_list_response(tiles)) + ) + (downloader, _logs, writer, enforcer, _sleeps) = _build_downloader( + transport=transport + ) + request = _make_request(cache_root=tmp_path) + + # Act + report = downloader.download_tiles_for_area(request) + + # Assert + assert report.outcome == DownloadOutcome.SUCCESS + assert report.tiles_requested == 100 + assert report.tiles_downloaded == 100 + assert report.tiles_rejected_resolution == 0 + assert report.tiles_rejected_freshness == 0 + assert report.tiles_downgraded == 0 + assert len(writer.write_calls) == 100 + assert enforcer.calls == [4096 * 100] + + +# ---------------------------------------------------------------------- +# AC-2: resolution gate rejects sub-spec tiles BEFORE write +# ---------------------------------------------------------------------- + + +def test_ac2_resolution_gate_rejects_sub_spec_tiles(tmp_path: Path) -> None: + # Arrange + tiles = [] + for i in range(50): + res = 0.3 if i < 10 else 0.5 + tiles.append( + _tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0, resolution_m_per_px=res) + ) + transport = httpx.MockTransport( + _make_route_handler(list_response=_list_response(tiles)) + ) + (downloader, log_records, writer, _enforcer, _sleeps) = _build_downloader( + transport=transport + ) + + # Act + report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + + # Assert + assert report.tiles_rejected_resolution == 10 + assert report.tiles_downloaded == 40 + assert len(writer.write_calls) == 40 + res_warnings = [r for r in log_records if getattr(r, "kind", "") == "c11.download.resolution_rejected"] + assert len(res_warnings) == 10 + + +# ---------------------------------------------------------------------- +# AC-3: c6 freshness rejection counted, not propagated +# ---------------------------------------------------------------------- + + +def test_ac3_freshness_rejections_counted_and_run_continues(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0) for i in range(10)] + rejected_ids = {_tid(14, 45.0 + i * 0.001, -122.0) for i in range(5)} + transport = httpx.MockTransport( + _make_route_handler(list_response=_list_response(tiles)) + ) + writer = _StubTileWriter(rejected=rejected_ids) + (downloader, log_records, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport, tile_writer=writer + ) + + # Act + report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + + # Assert + assert report.tiles_rejected_freshness == 5 + assert report.tiles_downloaded == 5 + assert report.outcome == DownloadOutcome.SUCCESS + summary_warns = [ + r for r in log_records if getattr(r, "kind", "") == "c11.download.freshness_rejected_summary" + ] + assert len(summary_warns) == 1 + + +# ---------------------------------------------------------------------- +# AC-4: stable_rear stale tiles are surfaced as DOWNGRADED +# ---------------------------------------------------------------------- + + +def test_ac4_downgraded_label_counted_and_persisted(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0) for i in range(5)] + labels = {_tid(14, 45.0 + i * 0.001, -122.0): "downgraded" for i in range(3)} + transport = httpx.MockTransport( + _make_route_handler(list_response=_list_response(tiles)) + ) + writer = _StubTileWriter(labels=labels) + (downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport, tile_writer=writer + ) + + # Act + report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + + # Assert + assert report.tiles_downgraded == 3 + assert report.tiles_downloaded == 5 + + +# ---------------------------------------------------------------------- +# AC-5: 429 honours Retry-After +# ---------------------------------------------------------------------- + + +def test_ac5_429_honours_retry_after(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)] + state = {"attempts": 0} + + def _factory(request: httpx.Request) -> httpx.Response: + state["attempts"] += 1 + if state["attempts"] == 1: + return httpx.Response(429, headers={"Retry-After": "30"}) + return httpx.Response(200, content=b"\xff\xd8tile") + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + sleeps: list[float] = [] + (downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport, sleep_recorder=sleeps + ) + + # Act + report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + + # Assert + assert state["attempts"] == 2 + assert sleeps and sleeps[0] >= 30 + assert report.retry_count >= 1 + assert report.outcome == DownloadOutcome.SUCCESS + + +# ---------------------------------------------------------------------- +# AC-6: persistent 5xx aborts with structured error +# ---------------------------------------------------------------------- + + +def test_ac6_persistent_5xx_raises_satellite_provider_error(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)] + state = {"attempts": 0} + + def _factory(request: httpx.Request) -> httpx.Response: + state["attempts"] += 1 + return httpx.Response(503) + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + (downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport + ) + + # Act / Assert + with pytest.raises(SatelliteProviderError): + downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + assert state["attempts"] >= 5 + + +# ---------------------------------------------------------------------- +# AC-7: 401 fails fast (no retry) +# ---------------------------------------------------------------------- + + +def test_ac7_401_fails_fast_no_retry(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)] + state = {"attempts": 0} + + def _factory(request: httpx.Request) -> httpx.Response: + state["attempts"] += 1 + return httpx.Response(401) + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + (downloader, log_records, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport + ) + + # Act / Assert + with pytest.raises(SatelliteProviderError): + downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + assert state["attempts"] == 1 + + +# ---------------------------------------------------------------------- +# AC-8: idempotent re-run after success +# ---------------------------------------------------------------------- + + +def test_ac8_idempotent_rerun_yields_no_op(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0) for i in range(5)] + state = {"tile_gets": 0} + + def _factory(request: httpx.Request) -> httpx.Response: + state["tile_gets"] += 1 + return httpx.Response(200, content=b"\xff\xd8tile") + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + (downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport + ) + request = _make_request(cache_root=tmp_path) + + # Act — first run + first = downloader.download_tiles_for_area(request) + first_get_count = state["tile_gets"] + # Act — second run (same request, same cache_root → idempotent) + second = downloader.download_tiles_for_area(request) + + # Assert + assert first.outcome == DownloadOutcome.SUCCESS + assert second.outcome == DownloadOutcome.IDEMPOTENT_NO_OP + assert state["tile_gets"] == first_get_count, "second run must NOT fetch" + assert second.tiles_downloaded == first.tiles_downloaded + + +# ---------------------------------------------------------------------- +# AC-9: cache-budget pre-check aborts before any GET +# ---------------------------------------------------------------------- + + +def test_ac9_cache_budget_pre_check_aborts(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0, estimated_bytes=10_000)] + transport_state = {"tile_gets": 0} + + def _factory(request: httpx.Request) -> httpx.Response: + transport_state["tile_gets"] += 1 + return httpx.Response(200, content=b"\xff\xd8tile") + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + enforcer = _StubBudgetEnforcer( + raise_on_call=CacheBudgetExceededError("no headroom") + ) + (downloader, log_records, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport, budget_enforcer=enforcer + ) + + # Act / Assert + with pytest.raises(CacheBudgetExceededError): + downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + assert transport_state["tile_gets"] == 0 + assert enforcer.calls == [10_000] + + +# ---------------------------------------------------------------------- +# AC-11: service API key never logged in plaintext +# ---------------------------------------------------------------------- + + +def test_ac11_service_api_key_never_appears_in_logs(tmp_path: Path) -> None: + # Arrange — exercise the failure path so the provider-failed ERROR + # log fires (the code that explicitly redacts the auth header). + tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)] + + def _factory(request: httpx.Request) -> httpx.Response: + return httpx.Response(401) + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + (downloader, log_records, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport + ) + with pytest.raises(SatelliteProviderError): + downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + + # Assert + flat = " ".join( + r.getMessage() + json.dumps(getattr(r, "kv", {})) for r in log_records + ) + assert _API_KEY not in flat + assert "Bearer ***" in flat + + +# ---------------------------------------------------------------------- +# AC-12: journal survives mid-batch crash; re-run completes the rest +# ---------------------------------------------------------------------- + + +def test_ac12_partial_journal_resumed_on_rerun(tmp_path: Path) -> None: + # Arrange — 10 tiles; first run fetches all 10 successfully and + # leaves a complete journal. A second run with the SAME request + # must short-circuit (AC-8 covers that). To exercise AC-12 we + # MANUALLY truncate the journal between runs to simulate a crash + # AFTER 4 tile-writes, BEFORE the completed_at_iso stamp. + tiles = [_tile_entry(zoom=14, lat=45.0 + i * 0.001, lon=-122.0) for i in range(10)] + state = {"tile_gets": 0} + + def _factory(request: httpx.Request) -> httpx.Response: + state["tile_gets"] += 1 + return httpx.Response(200, content=b"\xff\xd8tile") + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + (downloader, _logs, writer, _enforcer, _sleeps) = _build_downloader( + transport=transport + ) + request = _make_request(cache_root=tmp_path) + + # First run — completes + downloader.download_tiles_for_area(request) + after_first = state["tile_gets"] + write_calls_after_first = len(writer.write_calls) + assert after_first == 10 + assert write_calls_after_first == 10 + + # Simulate crash: rewrite the journal as "completed 4 of 10 tiles, + # NOT yet completed" (clear `completed_at_iso`). + rh = request_hash( + request.flight_id, + request.bbox_min_lat, + request.bbox_min_lon, + request.bbox_max_lat, + request.bbox_max_lon, + tuple(request.zoom_levels), + request.sector_class, + _API_KEY, + ) + journal_path = tmp_path / ".c11/journal" / f"{request.flight_id}__{rh}.json" + raw = json.loads(journal_path.read_text("utf-8")) + raw["completed_at_iso"] = None + raw["tile_ids_completed"] = sorted(raw["tile_ids_completed"])[:4] + raw["tile_counts"]["tiles_downloaded"] = 4 + journal_path.write_text(json.dumps(raw), encoding="utf-8") + + # Reset transport counter (writer still records cumulative calls) + state["tile_gets"] = 0 + writer.write_calls.clear() + + # Act — second run must fetch only the missing 6 + second = downloader.download_tiles_for_area(request) + + # Assert + assert state["tile_gets"] == 6 + assert len(writer.write_calls) == 6 + assert second.outcome == DownloadOutcome.SUCCESS + assert second.tiles_downloaded == 6 + + +# ---------------------------------------------------------------------- +# Retry-After HTTP-date form (Risk 1 from the spec) +# ---------------------------------------------------------------------- + + +def test_429_retry_after_http_date_form_parses(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)] + state = {"attempts": 0} + future = (datetime.now(timezone.utc) + timedelta(seconds=20)).strftime( + "%a, %d %b %Y %H:%M:%S GMT" + ) + + def _factory(request: httpx.Request) -> httpx.Response: + state["attempts"] += 1 + if state["attempts"] == 1: + return httpx.Response(429, headers={"Retry-After": future}) + return httpx.Response(200, content=b"\xff\xd8tile") + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + sleeps: list[float] = [] + (downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport, sleep_recorder=sleeps + ) + + # Act + report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + + # Assert + assert state["attempts"] == 2 + assert sleeps and sleeps[0] >= 0 + assert report.outcome == DownloadOutcome.SUCCESS + + +# ---------------------------------------------------------------------- +# 429 budget exhaustion → RateLimitedError +# ---------------------------------------------------------------------- + + +def test_429_budget_exhaustion_raises_rate_limited_error(tmp_path: Path) -> None: + # Arrange + tiles = [_tile_entry(zoom=14, lat=45.0, lon=-122.0)] + + def _factory(request: httpx.Request) -> httpx.Response: + return httpx.Response(429, headers={"Retry-After": "300"}) + + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=_factory, + ) + ) + cfg = C11Config( + satellite_provider_url=_BASE_URL, + service_api_key=_API_KEY, + download_http_timeout_s=5.0, + download_max_5xx_retries=4, + download_max_retry_after_s=400, + download_resolution_floor_m_per_px=0.5, + ) + (downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport, config=cfg + ) + + # Act / Assert + with pytest.raises(RateLimitedError): + downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + + +# ---------------------------------------------------------------------- +# NFR — bookkeeping throughput on a 1000-tile happy path +# ---------------------------------------------------------------------- + + +def test_nfr_throughput_1000_tiles_under_budget(tmp_path: Path) -> None: + # Arrange + tiles = [ + _tile_entry(zoom=14, lat=45.0 + i * 0.0001, lon=-122.0 + i * 0.0001) + for i in range(1000) + ] + transport = httpx.MockTransport( + _make_route_handler( + list_response=_list_response(tiles), + tile_response_factory=lambda r: httpx.Response(200, content=b"\xff\xd8tile"), + ) + ) + (downloader, _logs, _writer, _enforcer, _sleeps) = _build_downloader( + transport=transport + ) + + import time as _time + + t0 = _time.perf_counter() + report = downloader.download_tiles_for_area(_make_request(cache_root=tmp_path)) + elapsed = _time.perf_counter() - t0 + + # Assert — budget is generous; the goal is to catch an O(n^2) + # bookkeeping regression, not to certify wall-clock throughput. + assert report.outcome == DownloadOutcome.SUCCESS + assert report.tiles_downloaded == 1000 + assert elapsed < 10.0, f"1000-tile bookkeeping took {elapsed:.2f}s"