[AZ-316] Implement C11 HttpTileDownloader (batch 40)

Lands the operator-side pre-flight download path: authenticated
httpx GETs against satellite-provider, RESTRICT-SAT-4 (>= 0.5 m/px)
enforcement at the C11 boundary, c6 writes via consumer-side cuts
(_TileWriterLike, _BudgetEnforcerLike), per-(flight_id, request_hash)
journal under cache_root/.c11/journal/ for idempotent re-runs (AC-8,
AC-12), 429 Retry-After + 5xx exponential backoff handling, fail-fast
on TLS / 401 / 403, and a redacted-bearer auth-header policy.

Architecture:
- AZ-507 cross-component rule held: tile_downloader.py imports zero
  c6 symbols; the composition-root _C6DownloadAdapter in
  runtime_root/c11_factory.py absorbs c6's TileMetadata / TileSource /
  FreshnessLabel / VotingStatus enum assembly.
- Sleep-callable injection (not full Clock) per Batch 39 precedent;
  default routes through WallClock.sleep_until_ns to keep the AZ-398
  invariant intact.
- No FDR records on the download path; spec mandates structured logs
  only (8 log kinds wired: session.start/end, resolution_rejected,
  freshness_rejected_summary, freshness_downgraded, batch.retry,
  provider.failed, budget.exceeded, idempotent_no_op).

Tests: 14 new downloader unit tests covering AC-1..AC-9, AC-11, AC-12
plus throughput NFR + 429 HTTP-date + 429 budget exhaustion; 2 new
TileDownloader Protocol conformance tests (AC-10). Full unit suite:
1420 passed, 80 skipped (env-gated), 0 failed.

Code review: PASS_WITH_WARNINGS (5 Low findings, all documentation
or downstream-blocked). See _docs/03_implementation/reviews/
batch_40_review.md and batch_40_cycle1_report.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 07:01:14 +03:00
parent 3a61a4f5bf
commit 90f4ac78f4
13 changed files with 2513 additions and 62 deletions
@@ -1,27 +1,33 @@
"""C11 Tile Manager component — Public API.
Re-exports the Protocol surface (``TileDownloader``, ``TileUploader``,
``FlightStateSource``), the upload-side services that have landed
``FlightStateSource``), the operator-side services that have landed
(``FlightStateGate`` from AZ-317, ``PerFlightKeyManager`` from AZ-318,
``HttpTileUploader`` from AZ-319), the C11 internal DTOs / enums, the
C11 error family, and the per-component config block. The download-side
concrete impl (``HttpTileDownloader``) ships in AZ-316; it will be added
to ``__all__`` then.
``HttpTileUploader`` from AZ-319, ``HttpTileDownloader`` from AZ-316),
the C11 internal DTOs / enums, the C11 error family, and the
per-component config block.
"""
from gps_denied_onboard.components.c11_tile_manager._types import (
DownloadBatchReport,
DownloadOutcome,
DownloadRequest,
FlightStateSignal,
IngestStatus,
PerTileStatus,
PublicKeyFingerprint,
SectorClassification,
TileSummary,
UploadBatchReport,
UploadOutcome,
UploadRequest,
)
from gps_denied_onboard.components.c11_tile_manager.config import C11Config
from gps_denied_onboard.components.c11_tile_manager.errors import (
CacheBudgetExceededError,
FlightStateNotOnGroundError,
RateLimitedError,
ResolutionRejectionError,
SatelliteProviderError,
SessionNotActiveError,
SignatureRejectedError,
@@ -38,6 +44,11 @@ from gps_denied_onboard.components.c11_tile_manager.interface import (
from gps_denied_onboard.components.c11_tile_manager.signing_key import (
PerFlightKeyManager,
)
from gps_denied_onboard.components.c11_tile_manager.tile_downloader import (
DOWNLOAD_JOURNAL_DIRNAME,
HttpTileDownloader,
request_hash,
)
from gps_denied_onboard.components.c11_tile_manager.tile_uploader import (
HttpTileUploader,
canonical_payload_bytes,
@@ -48,24 +59,34 @@ register_component_block("c11_tile_manager", C11Config)
__all__ = [
"C11Config",
"CacheBudgetExceededError",
"DOWNLOAD_JOURNAL_DIRNAME",
"DownloadBatchReport",
"DownloadOutcome",
"DownloadRequest",
"FlightStateGate",
"FlightStateNotOnGroundError",
"FlightStateSignal",
"FlightStateSource",
"HttpTileDownloader",
"HttpTileUploader",
"IngestStatus",
"PerFlightKeyManager",
"PerTileStatus",
"PublicKeyFingerprint",
"RateLimitedError",
"ResolutionRejectionError",
"SatelliteProviderError",
"SectorClassification",
"SessionNotActiveError",
"SignatureRejectedError",
"TileDownloader",
"TileManagerError",
"TileSummary",
"TileUploader",
"UploadBatchReport",
"UploadOutcome",
"UploadRequest",
"canonical_payload_bytes",
"request_hash",
]
@@ -1,4 +1,4 @@
"""C11 internal DTOs (AZ-317, AZ-318, AZ-319).
"""C11 internal DTOs (AZ-316, AZ-317, AZ-318, AZ-319).
* :class:`FlightStateSignal` — five flight-state signals consumed by the
upload-side flight-state gate (AZ-317).
@@ -9,6 +9,12 @@
upload-side DTOs and enums consumed and produced by the AZ-319
:class:`HttpTileUploader` (contract
``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md`` v1.0.0).
* :class:`DownloadRequest`, :class:`DownloadBatchReport`,
:class:`TileSummary`, :class:`DownloadOutcome`,
:class:`SectorClassification` — download-side DTOs and enums consumed
and produced by the AZ-316 :class:`HttpTileDownloader` (contract
``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md``
v1.0.0).
Internal to the component — composition-root code reaches these via the
``c11_tile_manager`` package re-exports; consumers outside C11 use the
@@ -20,13 +26,19 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from uuid import UUID
__all__ = [
"DownloadBatchReport",
"DownloadOutcome",
"DownloadRequest",
"FlightStateSignal",
"IngestStatus",
"PerTileStatus",
"PublicKeyFingerprint",
"SectorClassification",
"TileSummary",
"UploadBatchReport",
"UploadOutcome",
"UploadRequest",
@@ -161,3 +173,148 @@ class UploadBatchReport:
next_retry_at_s: int | None
outcome: UploadOutcome
public_key_fingerprint: str
# ----------------------------------------------------------------------
# AZ-316 download-side DTOs
# ----------------------------------------------------------------------
class SectorClassification(str, Enum):
"""Operator-classified sector type the request applies to.
Matches c6's :class:`SectorClassification`; declared locally so the
AZ-316 download path keeps its public surface free of cross-component
imports (AZ-507 / AZ-270). The composition-root adapter maps this
to c6's enum at the write boundary.
"""
ACTIVE_CONFLICT = "active_conflict"
STABLE_REAR = "stable_rear"
NEUTRAL = "neutral"
class DownloadOutcome(str, Enum):
"""Aggregate outcome of one :meth:`TileDownloader.download_tiles_for_area` call.
Mirrors contract Shape § ``DownloadBatchReport.outcome``:
* ``SUCCESS`` — every requested tile was either downloaded, gated
by the resolution / freshness rule, or downgraded; no terminal
error occurred.
* ``PARTIAL`` — at least one tile failed terminally with a
transient error that did NOT escalate (e.g. a single 5xx that
retried-then-skipped); the batch journaled what it could.
* ``FAILURE`` — a terminal error aborted the batch (TLS / 401 /
403 / persistent 5xx / rate-limit budget); typed exception
raises and the partial state is journaled for the next
idempotent re-run.
* ``IDEMPOTENT_NO_OP`` — the journal recorded a complete prior
run for the same ``(flight_id, request_hash)`` pair; zero GETs
issued, zero writes attempted.
"""
SUCCESS = "success"
PARTIAL = "partial"
FAILURE = "failure"
IDEMPOTENT_NO_OP = "idempotent_no_op"
@dataclass(frozen=True)
class TileSummary:
"""Per-tile descriptor returned by :meth:`TileDownloader.enumerate_remote_coverage`.
``produced_at`` is the parent-suite's "this tile was rendered at"
timestamp; ``resolution_m_per_px`` is the metres-per-pixel value
the C11 boundary tests against ``RESTRICT-SAT-4`` (≥ 0.5 m/px).
``estimated_bytes`` is the parent-suite's content-length hint
used by the AZ-308 budget pre-check before any GET fires.
"""
tile_id_str: str
zoom_level: int
lat: float
lon: float
produced_at: datetime
resolution_m_per_px: float
estimated_bytes: int
tile_size_meters: float
tile_size_pixels: int
@dataclass(frozen=True)
class DownloadRequest:
"""Inputs to :meth:`TileDownloader.download_tiles_for_area`.
``bbox_min_lat`` / ``bbox_min_lon`` / ``bbox_max_lat`` /
``bbox_max_lon`` are inclusive-exclusive WGS84 bounds (matches
c6's :class:`Bbox`). ``zoom_levels`` is the set of Web-Mercator
zoom levels to download; each zoom is treated as an independent
rectangular grid against the bbox. ``cache_root`` is the on-disk
root for both the c6 store AND the C11 download journal (under
``cache_root/.c11/journal/<flight_id>__<request_hash>.json``).
``flight_id`` identifies the operator's pre-flight build context;
re-running the same ``(flight_id, request_hash)`` is the
idempotence check.
"""
flight_id: UUID
bbox_min_lat: float
bbox_min_lon: float
bbox_max_lat: float
bbox_max_lon: float
zoom_levels: tuple[int, ...]
sector_class: SectorClassification
cache_root: Path
def __post_init__(self) -> None:
if self.bbox_min_lat >= self.bbox_max_lat:
raise ValueError(
"DownloadRequest.bbox_min_lat must be < bbox_max_lat; "
f"got [{self.bbox_min_lat}, {self.bbox_max_lat})"
)
if self.bbox_min_lon >= self.bbox_max_lon:
raise ValueError(
"DownloadRequest.bbox_min_lon must be < bbox_max_lon; "
f"got [{self.bbox_min_lon}, {self.bbox_max_lon})"
)
if not self.zoom_levels:
raise ValueError("DownloadRequest.zoom_levels must be non-empty")
for z in self.zoom_levels:
if not 0 <= int(z) <= 21:
raise ValueError(
f"DownloadRequest.zoom_levels: every zoom must be in "
f"[0, 21]; got {z}"
)
@dataclass(frozen=True)
class DownloadBatchReport:
"""Aggregate report returned by :meth:`TileDownloader.download_tiles_for_area`.
Per-tile counts let the operator-tooling CLI render the post-run
summary without re-reading the journal:
* ``tiles_requested`` — total tiles enumerated by
:meth:`enumerate_remote_coverage` for this bbox / zoom set.
* ``tiles_downloaded`` — bytes successfully written to the c6
store (includes ``DOWNGRADED`` because those ARE persisted).
* ``tiles_rejected_resolution`` — gated by the C11 ≥ 0.5 m/px
resolution check before any GET.
* ``tiles_rejected_freshness`` — c6's freshness gate raised.
* ``tiles_downgraded`` — c6 returned the ``DOWNGRADED`` label;
tile IS in the store, but flagged as stable_rear stale.
* ``retry_count`` — total transient retries across the batch.
* ``request_hash`` — first 16 hex of the journal key (so the
caller can correlate to the on-disk journal file without
re-deriving the hash).
"""
outcome: DownloadOutcome
tiles_requested: int
tiles_downloaded: int
tiles_rejected_resolution: int
tiles_rejected_freshness: int
tiles_downgraded: int
retry_count: int
request_hash: str
@@ -1,18 +1,20 @@
"""C11 TileManager config block (AZ-319).
"""C11 TileManager config block (AZ-316, AZ-319).
Registered into ``config.components['c11_tile_manager']`` by the
package ``__init__.py``. The composition-root factory
:func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader`
reads this block to drive the upload path's HTTP behaviour and to
identify the producing companion against the parent suite's voting
layer.
package ``__init__.py``. Two composition-root factories read this
block:
The four fields below match the AZ-319 task spec § ``Outcome`` —
``config.c11.satellite_provider_ingest_url``,
``config.c11.upload_batch_size``, ``config.c11.upload_http_timeout_s``,
``config.c11.companion_id``. The ``upload_max_retry_after_s`` cap is
the Risk-3 ceiling on cumulative ``Retry-After`` budget for 429
responses (see :class:`RateLimitedError`).
* :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader`
reads the ``upload_*`` fields and ``companion_id`` to drive AZ-319.
* :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_downloader`
reads the ``satellite_provider_url``, ``service_api_key``, and
``download_*`` fields to drive AZ-316.
All defaults are conservative no-op values so unit tests / replay
runs that do not exercise C11 keep working without YAML; the factory
raises :class:`ConfigError` when an empty production-required field
(``service_api_key``, ``companion_id``, etc.) is observed in operator
wiring.
"""
from __future__ import annotations
@@ -28,25 +30,42 @@ _DEFAULT_BATCH_SIZE: int = 25
_DEFAULT_HTTP_TIMEOUT_S: float = 30.0
_DEFAULT_MAX_RETRY_AFTER_S: int = 600
_MAX_BATCH_SIZE: int = 200
_DEFAULT_DOWNLOAD_RESOLUTION_FLOOR: float = 0.5
_DEFAULT_DOWNLOAD_MAX_5XX_RETRIES: int = 4
_MIN_DOWNLOAD_RETRIES: int = 1
_MAX_DOWNLOAD_RETRIES: int = 16
@dataclass(frozen=True)
class C11Config:
"""Per-component config for C11 tile manager (upload path).
"""Per-component config for C11 tile manager (upload + download paths).
``satellite_provider_ingest_url`` is the parent-suite ingest base
URL (e.g. ``https://satellite-provider.example.com``); the
uploader appends ``/api/satellite/tiles/ingest`` to it. Defaulted
to empty so unit tests / replay runs that do not exercise the
upload path stay no-op; production configuration MUST set this
via YAML / env override or :class:`HttpTileUploader` raises
:class:`SatelliteProviderError` on the first attempt.
Upload-side fields (AZ-319):
``companion_id`` is the stable per-companion identifier the
parent suite's voting layer uses to attribute uploads to one
physical airframe. Defaulted to empty so test runs without a
paired companion stay valid; the factory raises ``ConfigError``
when the empty default is used in operator / production wiring.
* ``satellite_provider_ingest_url`` — base URL for the upload
endpoint; ``HttpTileUploader`` appends
``/api/satellite/tiles/ingest``. Empty → upload factory raises
:class:`ConfigError`.
* ``upload_batch_size`` — tiles per multipart POST.
* ``upload_http_timeout_s`` — per-request timeout (seconds).
* ``upload_max_retry_after_s`` — cumulative 429 ``Retry-After``
cap before :class:`RateLimitedError`.
* ``companion_id`` — stable per-companion id for D-PROJ-2 voting.
Download-side fields (AZ-316):
* ``satellite_provider_url`` — base URL for the GET surface;
``HttpTileDownloader`` appends per-tile / list paths.
* ``service_api_key`` — bearer token for authenticated GETs;
logged ONLY redacted (``Bearer ***``). Empty → download factory
raises :class:`ConfigError`.
* ``download_http_timeout_s`` — per-request timeout (seconds).
* ``download_max_5xx_retries`` — exponential-backoff cap before
:class:`SatelliteProviderError`.
* ``download_max_retry_after_s`` — cumulative 429 ``Retry-After``
cap before :class:`RateLimitedError`.
* ``download_resolution_floor_m_per_px`` — RESTRICT-SAT-4 lower
bound for the C11 boundary check; defaults to 0.5 m/px.
"""
satellite_provider_ingest_url: str = ""
@@ -55,6 +74,13 @@ class C11Config:
upload_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S
companion_id: str = ""
satellite_provider_url: str = ""
service_api_key: str = ""
download_http_timeout_s: float = _DEFAULT_HTTP_TIMEOUT_S
download_max_5xx_retries: int = _DEFAULT_DOWNLOAD_MAX_5XX_RETRIES
download_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S
download_resolution_floor_m_per_px: float = _DEFAULT_DOWNLOAD_RESOLUTION_FLOOR
def __post_init__(self) -> None:
if not 1 <= self.upload_batch_size <= _MAX_BATCH_SIZE:
raise ConfigError(
@@ -71,3 +97,24 @@ class C11Config:
"C11Config.upload_max_retry_after_s must be > 0; "
f"got {self.upload_max_retry_after_s}"
)
if self.download_http_timeout_s <= 0:
raise ConfigError(
"C11Config.download_http_timeout_s must be > 0; "
f"got {self.download_http_timeout_s}"
)
if not _MIN_DOWNLOAD_RETRIES <= self.download_max_5xx_retries <= _MAX_DOWNLOAD_RETRIES:
raise ConfigError(
"C11Config.download_max_5xx_retries must be in "
f"[{_MIN_DOWNLOAD_RETRIES}, {_MAX_DOWNLOAD_RETRIES}]; "
f"got {self.download_max_5xx_retries}"
)
if self.download_max_retry_after_s <= 0:
raise ConfigError(
"C11Config.download_max_retry_after_s must be > 0; "
f"got {self.download_max_retry_after_s}"
)
if self.download_resolution_floor_m_per_px <= 0:
raise ConfigError(
"C11Config.download_resolution_floor_m_per_px must be > 0; "
f"got {self.download_resolution_floor_m_per_px}"
)
@@ -1,10 +1,9 @@
"""C11 TileManager error family (AZ-317, AZ-318, AZ-319).
"""C11 TileManager error family (AZ-316, AZ-317, AZ-318, AZ-319).
Rooted at :class:`TileManagerError`. The parent is declared here (rather
than alongside the AZ-316 ``TileDownloader``) so the upload-side tasks
landing first do not need to wait on a downloader-only file. AZ-316
(``HttpTileDownloader``) will add its download-side errors as further
subclasses without re-declaring the parent.
Rooted at :class:`TileManagerError`. Both the upload (AZ-319) and
download (AZ-316) paths share the family parent so cross-path callers
can ``except TileManagerError`` to catch any C11-side terminal failure
without enumerating subclasses.
* :class:`FlightStateNotOnGroundError` (AZ-317) — defence-in-depth
refusal when the flight controller reports anything other than
@@ -15,10 +14,16 @@ subclasses without re-declaring the parent.
by the AZ-319 :class:`HttpTileUploader` after parsing a
``REJECTED`` per-tile response whose ``rejection_reason`` mentions
the signature.
* :class:`SatelliteProviderError` (AZ-319) — TLS / 401 / 403 fail-fast
AND persistent-5xx fail-after-retries surface for the upload path.
* :class:`RateLimitedError` (AZ-319) — 429 with persistent
``Retry-After`` budget exhaustion.
* :class:`SatelliteProviderError` (AZ-316/AZ-319) — TLS / 401 / 403
fail-fast AND persistent-5xx fail-after-retries surface for both
the download and upload paths.
* :class:`RateLimitedError` (AZ-316/AZ-319) — 429 with persistent
``Retry-After`` budget exhaustion (download + upload share the type).
* :class:`ResolutionRejectionError` (AZ-316) — surfaced when the
downloader's RESTRICT-SAT-4 boundary check rejects a tile with
``resolution_m_per_px < 0.5``.
* :class:`CacheBudgetExceededError` (AZ-316) — surfaced when c6's
AZ-308 budget enforcer cannot reserve head-room for the download.
"""
from __future__ import annotations
@@ -32,8 +37,10 @@ if TYPE_CHECKING:
)
__all__ = [
"CacheBudgetExceededError",
"FlightStateNotOnGroundError",
"RateLimitedError",
"ResolutionRejectionError",
"SatelliteProviderError",
"SessionNotActiveError",
"SignatureRejectedError",
@@ -98,13 +105,37 @@ class SatelliteProviderError(TileManagerError):
class RateLimitedError(TileManagerError):
"""``satellite-provider`` ingest endpoint rate-limited the upload.
"""``satellite-provider`` rate-limited the request (upload OR download).
Raised when the parent suite returns 429 and the cumulative
``Retry-After`` budget exceeds
:attr:`C11Config.upload_max_retry_after_s`. The
AZ-319 :class:`HttpTileUploader` honours the FIRST 429's
``Retry-After`` (sleep + retry) but escalates to this error after
:attr:`C11Config.upload_max_retry_after_s` (upload path) or
:attr:`C11Config.download_max_retry_after_s` (download path).
Both AZ-319 :class:`HttpTileUploader` and AZ-316
:class:`HttpTileDownloader` honour the first 429's
``Retry-After`` (sleep + retry) and escalate to this error after
the configured budget so the operator can surface the rate-limit
state explicitly.
"""
class ResolutionRejectionError(TileManagerError):
"""A downloaded tile failed the C11 ``RESTRICT-SAT-4`` resolution gate.
Raised by per-tile validation when ``resolution_m_per_px < 0.5``.
The download path catches this internally and counts the tile in
:attr:`DownloadBatchReport.tiles_rejected_resolution` rather than
aborting the batch — the type stays exported for the operator
tooling to surface in CLI output.
"""
class CacheBudgetExceededError(TileManagerError):
"""The c6 AZ-308 budget enforcer could not reserve head-room.
Raised after the C11 download path's pre-check converts c6's
``CacheBudgetExhaustedError`` into the C11-side type, so callers
only need to ``except TileManagerError`` (or this subclass) to
catch a cache-full failure. The original c6 error is preserved
on ``__cause__``.
"""
@@ -3,7 +3,10 @@
Operator-side ONLY — excluded from airborne via CMake (`BUILD_C11_TILE_MANAGER=OFF`).
See `_docs/02_document/components/12_c11_tilemanager/`.
* :class:`TileDownloader` — pre-flight download path (AZ-316, pending).
* :class:`TileDownloader` — pre-flight download path (AZ-316). The
authoritative shape lives in
``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md``
v1.0.0 and is mirrored 1:1 here.
* :class:`TileUploader` — post-landing upload path (AZ-319) — the
authoritative shape lives in
``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md``
@@ -18,14 +21,15 @@ See `_docs/02_document/components/12_c11_tilemanager/`.
from __future__ import annotations
from collections.abc import Iterable, Sequence
from pathlib import Path
from collections.abc import Sequence
from typing import Any, Protocol, runtime_checkable
from uuid import UUID
from gps_denied_onboard._types.tile import TileRecord
from gps_denied_onboard.components.c11_tile_manager._types import (
DownloadBatchReport,
DownloadRequest,
FlightStateSignal,
TileSummary,
UploadBatchReport,
UploadRequest,
)
@@ -37,12 +41,27 @@ __all__ = [
]
@runtime_checkable
class TileDownloader(Protocol):
"""Pre-flight tile download from `satellite-provider`."""
"""Pre-flight tile download from ``satellite-provider`` (operator-side).
def download(
self, lat_lon_box: tuple[float, float, float, float], zoom: int, output_root: Path
) -> Iterable[TileRecord]: ...
See ``_docs/02_document/contracts/c11_tilemanager/tile_downloader.md``
v1.0.0 for invariants I-1 .. I-5 and the per-method error matrix.
The :meth:`enumerate_remote_coverage` return type is
:class:`TileSummary` (DTO declared in C11's ``_types`` so the C12
consumer never imports c6 to size a download).
"""
def download_tiles_for_area(self, request: DownloadRequest) -> DownloadBatchReport: ...
def enumerate_remote_coverage(
self,
bbox_min_lat: float,
bbox_min_lon: float,
bbox_max_lat: float,
bbox_max_lon: float,
zoom_levels: Sequence[int],
) -> Sequence[TileSummary]: ...
@runtime_checkable
@@ -0,0 +1,907 @@
"""C11 ``HttpTileDownloader`` (AZ-316) — concrete :class:`TileDownloader`.
Operator-side pre-flight download path. Authenticated GETs against
``satellite-provider``, RESTRICT-SAT-4 enforcement at the C11 boundary,
c6 writes via the AZ-303 store + metadata Protocols (which run AZ-307's
freshness gate at insert), AZ-308 cache-headroom pre-check before any
GET fires, and a per-``(flight_id, request_hash)`` journal for
idempotent re-runs.
Architecture
------------
The c6 storage surfaces are reached through structural :class:`Protocol`
cuts (:class:`_TileWriterLike`, :class:`_BudgetEnforcerLike`,
:class:`_FreshnessRejectionLike`) defined in this module — never via a
direct ``from gps_denied_onboard.components.c6_tile_cache import …``.
The composition root
(``runtime_root.c11_factory.build_tile_downloader``) is the single
layer that may bind concrete c6 implementations into the constructor.
That adapter handles c6's :class:`TileMetadata` / :class:`TileSource` /
:class:`FreshnessLabel` / :class:`SectorClassification` enums so the
downloader stays free of cross-component imports (AZ-507 / AZ-270).
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import tempfile
from dataclasses import dataclass, field
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
from uuid import UUID
import httpx
from gps_denied_onboard.components.c11_tile_manager._types import (
DownloadBatchReport,
DownloadOutcome,
DownloadRequest,
SectorClassification,
TileSummary,
)
from gps_denied_onboard.components.c11_tile_manager.config import C11Config
from gps_denied_onboard.components.c11_tile_manager.errors import (
CacheBudgetExceededError,
RateLimitedError,
SatelliteProviderError,
)
__all__ = [
"DOWNLOAD_JOURNAL_DIRNAME",
"HttpTileDownloader",
"request_hash",
]
_LIST_PATH = "/api/satellite/tiles"
_GET_PATH = "/api/satellite/tiles"
_LIST_QUERY_LIST_ONLY = "list-only"
DOWNLOAD_JOURNAL_DIRNAME = ".c11/journal"
_LOCKFILE_PATH = ".c11/lock"
_DEFAULT_BACKOFF_SCHEDULE_S: tuple[float, ...] = (1.0, 2.0, 4.0, 8.0)
_COMPONENT = "c11_tile_manager.tile_downloader"
_LOG_KIND_SESSION_START = "c11.download.session.start"
_LOG_KIND_SESSION_END = "c11.download.session.end"
_LOG_KIND_RESOLUTION_REJECT = "c11.download.resolution_rejected"
_LOG_KIND_FRESHNESS_REJECT = "c11.download.freshness_rejected_summary"
_LOG_KIND_FRESHNESS_DOWNGRADED = "c11.download.freshness_downgraded"
_LOG_KIND_RETRY = "c11.download.batch.retry"
_LOG_KIND_PROVIDER_FAIL = "c11.download.provider.failed"
_LOG_KIND_BUDGET_FAIL = "c11.download.budget.exceeded"
_LOG_KIND_IDEMPOTENT = "c11.download.idempotent_no_op"
_AUTH_HEADER_REDACTED = "Bearer ***"
# ----------------------------------------------------------------------
# Consumer-side cuts over c6 (AZ-507): never imported across components.
# ----------------------------------------------------------------------
@runtime_checkable
class _TileWriterLike(Protocol):
"""Composition-root adapter that hides c6's ``TileMetadata`` assembly.
The downloader hands the adapter the primitives the parent suite
returned (zoom, lat, lon, tile_size, capture_ts, content sha256,
freshness label string, source string) plus the JPEG bytes. The
adapter assembles c6's :class:`TileMetadata` and calls
``TileStore.write_tile`` + ``TileMetadataStore.insert_metadata``
inside its own boundary, returning the c6 freshness label as a
plain string (``"fresh"`` / ``"downgraded"`` / etc.) so the
downloader can map it without importing c6's enum.
Raises :class:`_FreshnessRejectionLike`-compatible exception on
c6 freshness gate refusal; the downloader catches by structural
type to keep the import boundary clean.
"""
def write_tile_for_download(
self,
*,
tile_blob: bytes,
zoom_level: int,
lat: float,
lon: float,
tile_size_meters: float,
tile_size_pixels: int,
capture_timestamp: datetime,
content_sha256_hex: str,
sector_class: str,
) -> str: ...
def tile_already_present(
self, *, zoom_level: int, lat: float, lon: float
) -> bool: ...
@runtime_checkable
class _BudgetEnforcerLike(Protocol):
"""Structural cut of c6's :meth:`CacheBudgetEnforcer.reserve_headroom`.
Returns any object on success; raises any exception on failure
(the downloader maps either path into its own
:class:`CacheBudgetExceededError` envelope so callers do not need
to catch a c6 type).
"""
def reserve_headroom(self, needed_bytes: int) -> Any: ...
# ----------------------------------------------------------------------
# Request hash + journal helpers
# ----------------------------------------------------------------------
def request_hash(
flight_id: UUID,
bbox_min_lat: float,
bbox_min_lon: float,
bbox_max_lat: float,
bbox_max_lon: float,
zoom_levels: tuple[int, ...],
sector_class: SectorClassification,
service_api_key: str,
) -> str:
"""Stable 16-hex digest used as the journal filename suffix.
Hashes a deterministic concatenation of the request fields plus a
SHA-256 of the service API key (so two operators with different
keys never share a journal). The digest is short enough for human
inspection and long enough for collision resistance within one
cache root.
"""
api_key_digest = hashlib.sha256(service_api_key.encode("utf-8")).hexdigest()
payload = "|".join(
[
str(flight_id),
f"{bbox_min_lat:.10f}",
f"{bbox_min_lon:.10f}",
f"{bbox_max_lat:.10f}",
f"{bbox_max_lon:.10f}",
",".join(str(z) for z in sorted(zoom_levels)),
sector_class.value,
api_key_digest,
]
)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
def _journal_path(cache_root: Path, flight_id: UUID, req_hash: str) -> Path:
return cache_root / DOWNLOAD_JOURNAL_DIRNAME / f"{flight_id}__{req_hash}.json"
@dataclass
class _JournalState:
"""Per-batch journal payload. Persisted as compact JSON via atomicwrite."""
flight_id: str
request_hash: str
started_at_iso: str
completed_at_iso: str | None = None
tile_ids_completed: list[str] = field(default_factory=list)
tile_counts: dict[str, int] = field(default_factory=dict)
def to_json_bytes(self) -> bytes:
return json.dumps(
{
"flight_id": self.flight_id,
"request_hash": self.request_hash,
"started_at_iso": self.started_at_iso,
"completed_at_iso": self.completed_at_iso,
"tile_ids_completed": self.tile_ids_completed,
"tile_counts": self.tile_counts,
},
sort_keys=True,
separators=(",", ":"),
).encode("utf-8")
@classmethod
def from_json_bytes(cls, raw: bytes) -> _JournalState:
decoded = json.loads(raw.decode("utf-8"))
return cls(
flight_id=str(decoded["flight_id"]),
request_hash=str(decoded["request_hash"]),
started_at_iso=str(decoded["started_at_iso"]),
completed_at_iso=decoded.get("completed_at_iso"),
tile_ids_completed=list(decoded.get("tile_ids_completed") or []),
tile_counts=dict(decoded.get("tile_counts") or {}),
)
def _atomic_write_json(path: Path, payload: bytes) -> None:
"""Write-then-rename + fsync per the description.md atomicwrites pattern."""
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_str = tempfile.mkstemp(
prefix=path.name + ".",
suffix=".tmp",
dir=str(path.parent),
)
tmp = Path(tmp_str)
try:
with os.fdopen(fd, "wb") as fp:
fp.write(payload)
fp.flush()
os.fsync(fp.fileno())
os.replace(tmp, path)
# Best-effort directory fsync so the rename is durable on
# power-loss; not all filesystems implement directory fsync,
# so failure here is logged-and-ignored at the caller.
dir_fd = os.open(str(path.parent), os.O_RDONLY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
except Exception:
if tmp.exists():
tmp.unlink(missing_ok=True)
raise
def _read_journal(path: Path) -> _JournalState | None:
if not path.exists():
return None
try:
return _JournalState.from_json_bytes(path.read_bytes())
except (json.JSONDecodeError, KeyError, ValueError):
# Risk-3 mitigation: a torn / corrupt journal record is treated
# as "no prior journal" so the batch re-runs from scratch.
return None
# ----------------------------------------------------------------------
# Retry-After parsing (mirrors AZ-319 helper)
# ----------------------------------------------------------------------
def _parse_retry_after(header_value: str | None, max_s: int) -> int:
if header_value is None:
return 0
raw = header_value.strip()
if not raw:
return 0
if raw.isdigit():
return min(int(raw), max_s)
try:
retry_at = parsedate_to_datetime(raw)
except (TypeError, ValueError):
return 0
now = datetime.now(timezone.utc)
if retry_at.tzinfo is None:
retry_at = retry_at.replace(tzinfo=timezone.utc)
delta = (retry_at - now).total_seconds()
return max(0, min(int(delta), max_s))
def _default_sleep(seconds: float) -> None:
"""Production sleep hook routes through :class:`WallClock.sleep_until_ns`.
Tests inject a no-op or a recorder. Routing through ``WallClock``
keeps the AZ-398 AC-4 AST scan over ``components/`` from seeing a
bare ``time.sleep``.
"""
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
clock.sleep_until_ns(clock.monotonic_ns() + int(seconds * 1_000_000_000))
# ----------------------------------------------------------------------
# Internal session-state container
# ----------------------------------------------------------------------
@dataclass
class _DownloadSession:
"""Mutable bookkeeping for one ``download_tiles_for_area`` call."""
request: DownloadRequest
journal: _JournalState
journal_path: Path
completed_set: set[str]
tiles_requested: int = 0
tiles_downloaded: int = 0
tiles_rejected_resolution: int = 0
tiles_rejected_freshness: int = 0
tiles_downgraded: int = 0
retry_count: int = 0
rate_limit_budget_used_s: int = 0
# ----------------------------------------------------------------------
# Concrete downloader
# ----------------------------------------------------------------------
class HttpTileDownloader:
"""Concrete :class:`TileDownloader` against ``satellite-provider``'s GET surface.
All cross-component dependencies (``tile_writer``,
``budget_enforcer``) are constructor-injected via Protocol cuts;
the composition root binds them to the c6 implementations. The
``http_client`` is caller-owned: production wiring uses one
long-lived :class:`httpx.Client` per process; tests inject
``httpx.Client(transport=httpx.MockTransport(...))`` for
deterministic responses.
"""
def __init__(
self,
*,
http_client: httpx.Client,
tile_writer: _TileWriterLike,
budget_enforcer: _BudgetEnforcerLike,
logger: logging.Logger,
config: C11Config,
sleep: Any = None,
backoff_schedule_s: tuple[float, ...] | None = None,
) -> None:
self._http_client = http_client
self._tile_writer = tile_writer
self._budget_enforcer = budget_enforcer
self._logger = logger
self._config = config
self._sleep = sleep if sleep is not None else _default_sleep
self._backoff_schedule_s = backoff_schedule_s or _DEFAULT_BACKOFF_SCHEDULE_S
# ------------------------------------------------------------------
# Public Protocol surface
# ------------------------------------------------------------------
def download_tiles_for_area(self, request: DownloadRequest) -> DownloadBatchReport:
"""Idempotence check → enumerate → budget → per-tile GET loop."""
req_hash = request_hash(
request.flight_id,
request.bbox_min_lat,
request.bbox_min_lon,
request.bbox_max_lat,
request.bbox_max_lon,
tuple(request.zoom_levels),
request.sector_class,
self._config.service_api_key,
)
journal_path = _journal_path(request.cache_root, request.flight_id, req_hash)
existing = _read_journal(journal_path)
if existing is not None and existing.completed_at_iso is not None:
self._logger.info(
"Idempotent re-run detected; zero work scheduled",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_IDEMPOTENT,
"kv": {
"flight_id": str(request.flight_id),
"request_hash": req_hash,
"tile_ids_completed": len(existing.tile_ids_completed),
},
},
)
counts = existing.tile_counts
return DownloadBatchReport(
outcome=DownloadOutcome.IDEMPOTENT_NO_OP,
tiles_requested=int(counts.get("tiles_requested", len(existing.tile_ids_completed))),
tiles_downloaded=int(counts.get("tiles_downloaded", len(existing.tile_ids_completed))),
tiles_rejected_resolution=int(counts.get("tiles_rejected_resolution", 0)),
tiles_rejected_freshness=int(counts.get("tiles_rejected_freshness", 0)),
tiles_downgraded=int(counts.get("tiles_downgraded", 0)),
retry_count=0,
request_hash=req_hash,
)
journal = existing or _JournalState(
flight_id=str(request.flight_id),
request_hash=req_hash,
started_at_iso=_iso_now(),
)
completed_set = set(journal.tile_ids_completed)
session = _DownloadSession(
request=request,
journal=journal,
journal_path=journal_path,
completed_set=completed_set,
)
self._logger.info(
"Pre-flight tile download session started",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_SESSION_START,
"kv": {
"flight_id": str(request.flight_id),
"request_hash": req_hash,
"bbox": [
request.bbox_min_lat,
request.bbox_min_lon,
request.bbox_max_lat,
request.bbox_max_lon,
],
"zoom_levels": list(request.zoom_levels),
"sector_class": request.sector_class.value,
"resume_from_journal": completed_set != set(),
"tiles_already_completed": len(completed_set),
},
},
)
outcome: DownloadOutcome = DownloadOutcome.SUCCESS
try:
summaries = self._enumerate_remote(request)
session.tiles_requested = len(summaries)
self._reserve_budget(summaries, completed_set, session)
for summary in summaries:
if summary.tile_id_str in completed_set:
continue
self._download_one_tile(summary, request, session)
journal.completed_at_iso = _iso_now()
journal.tile_counts = self._counts_dict(session)
_atomic_write_json(journal_path, journal.to_json_bytes())
return DownloadBatchReport(
outcome=outcome,
tiles_requested=session.tiles_requested,
tiles_downloaded=session.tiles_downloaded,
tiles_rejected_resolution=session.tiles_rejected_resolution,
tiles_rejected_freshness=session.tiles_rejected_freshness,
tiles_downgraded=session.tiles_downgraded,
retry_count=session.retry_count,
request_hash=req_hash,
)
except (
SatelliteProviderError,
RateLimitedError,
CacheBudgetExceededError,
):
outcome = DownloadOutcome.FAILURE
journal.tile_counts = self._counts_dict(session)
_atomic_write_json(journal_path, journal.to_json_bytes())
raise
except Exception:
outcome = DownloadOutcome.FAILURE
journal.tile_counts = self._counts_dict(session)
_atomic_write_json(journal_path, journal.to_json_bytes())
raise
finally:
self._logger.info(
"Pre-flight tile download session ended",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_SESSION_END,
"kv": {
"flight_id": str(request.flight_id),
"request_hash": req_hash,
"outcome": outcome.value,
"tiles_requested": session.tiles_requested,
"tiles_downloaded": session.tiles_downloaded,
"tiles_rejected_resolution": session.tiles_rejected_resolution,
"tiles_rejected_freshness": session.tiles_rejected_freshness,
"tiles_downgraded": session.tiles_downgraded,
"retry_count": session.retry_count,
},
},
)
if session.tiles_rejected_freshness:
self._logger.warning(
"Freshness gate rejected tiles in this batch",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FRESHNESS_REJECT,
"kv": {
"flight_id": str(request.flight_id),
"request_hash": req_hash,
"tiles_rejected_freshness": session.tiles_rejected_freshness,
},
},
)
def enumerate_remote_coverage(
self,
bbox_min_lat: float,
bbox_min_lon: float,
bbox_max_lat: float,
bbox_max_lon: float,
zoom_levels: Any,
) -> list[TileSummary]:
"""Read-only enumeration; issues one ``list-only=true`` GET."""
return list(
self._do_enumerate(
bbox_min_lat,
bbox_min_lon,
bbox_max_lat,
bbox_max_lon,
tuple(int(z) for z in zoom_levels),
)
)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _enumerate_remote(self, request: DownloadRequest) -> list[TileSummary]:
return list(
self._do_enumerate(
request.bbox_min_lat,
request.bbox_min_lon,
request.bbox_max_lat,
request.bbox_max_lon,
tuple(request.zoom_levels),
)
)
def _do_enumerate(
self,
bbox_min_lat: float,
bbox_min_lon: float,
bbox_max_lat: float,
bbox_max_lon: float,
zoom_levels: tuple[int, ...],
) -> list[TileSummary]:
params = {
"bbox": f"{bbox_min_lat},{bbox_min_lon},{bbox_max_lat},{bbox_max_lon}",
"zoom": ",".join(str(z) for z in zoom_levels),
_LIST_QUERY_LIST_ONLY: "true",
}
response = self._send_get(
self._config.satellite_provider_url.rstrip("/") + _LIST_PATH,
params=params,
session=None,
)
try:
body = response.json()
except ValueError as exc:
self._log_provider_failure(
"list_not_json", response.status_code, str(exc)
)
raise SatelliteProviderError(
"satellite-provider returned non-JSON list-only body"
) from exc
try:
entries = body["tiles"]
except (KeyError, TypeError) as exc:
self._log_provider_failure(
"list_schema", response.status_code, str(exc)
)
raise SatelliteProviderError(
"satellite-provider list-only response missing 'tiles'"
) from exc
summaries: list[TileSummary] = []
for entry in entries:
try:
summaries.append(
TileSummary(
tile_id_str=str(entry["tile_id"]),
zoom_level=int(entry["zoom_level"]),
lat=float(entry["lat"]),
lon=float(entry["lon"]),
produced_at=_parse_iso(str(entry["produced_at"])),
resolution_m_per_px=float(entry["resolution_m_per_px"]),
estimated_bytes=int(entry["estimated_bytes"]),
tile_size_meters=float(entry.get("tile_size_meters", 100.0)),
tile_size_pixels=int(entry.get("tile_size_pixels", 256)),
)
)
except (KeyError, TypeError, ValueError) as exc:
self._log_provider_failure(
"list_tile_schema", response.status_code, str(exc)
)
raise SatelliteProviderError(
"satellite-provider list-only entry missing required fields"
) from exc
return summaries
def _reserve_budget(
self,
summaries: list[TileSummary],
completed_set: set[str],
session: _DownloadSession,
) -> None:
remaining_bytes = sum(
int(s.estimated_bytes)
for s in summaries
if s.tile_id_str not in completed_set
)
if remaining_bytes <= 0:
return
try:
self._budget_enforcer.reserve_headroom(remaining_bytes)
except CacheBudgetExceededError:
self._log_budget_failure(remaining_bytes)
raise
except Exception as exc:
self._log_budget_failure(remaining_bytes, detail=str(exc))
raise CacheBudgetExceededError(
f"c6 budget enforcer refused {remaining_bytes} bytes "
f"of head-room: {exc}"
) from exc
def _download_one_tile(
self,
summary: TileSummary,
request: DownloadRequest,
session: _DownloadSession,
) -> None:
if summary.resolution_m_per_px < self._config.download_resolution_floor_m_per_px:
session.tiles_rejected_resolution += 1
self._logger.warning(
"Resolution gate rejected tile",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_RESOLUTION_REJECT,
"kv": {
"flight_id": str(request.flight_id),
"tile_id": summary.tile_id_str,
"resolution_m_per_px": summary.resolution_m_per_px,
"floor_m_per_px": self._config.download_resolution_floor_m_per_px,
},
},
)
return
ingest_url = (
self._config.satellite_provider_url.rstrip("/")
+ _GET_PATH
+ f"/{summary.tile_id_str}"
)
response = self._send_get(ingest_url, params=None, session=session)
if not response.content:
self._log_provider_failure(
"empty_body", response.status_code, summary.tile_id_str
)
raise SatelliteProviderError(
f"satellite-provider returned empty body for tile_id="
f"{summary.tile_id_str}"
)
tile_blob = response.content
content_sha256_hex = hashlib.sha256(tile_blob).hexdigest()
produced_at = summary.produced_at
if produced_at.tzinfo is None:
produced_at = produced_at.replace(tzinfo=timezone.utc)
try:
label = self._tile_writer.write_tile_for_download(
tile_blob=tile_blob,
zoom_level=summary.zoom_level,
lat=summary.lat,
lon=summary.lon,
tile_size_meters=summary.tile_size_meters,
tile_size_pixels=summary.tile_size_pixels,
capture_timestamp=produced_at,
content_sha256_hex=content_sha256_hex,
sector_class=request.sector_class.value,
)
except Exception as exc:
if _is_freshness_rejection(exc):
session.tiles_rejected_freshness += 1
return
raise
if label == "downgraded":
session.tiles_downgraded += 1
session.tiles_downloaded += 1
self._logger.warning(
"Freshness label downgraded by c6",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_FRESHNESS_DOWNGRADED,
"kv": {
"flight_id": str(request.flight_id),
"tile_id": summary.tile_id_str,
},
},
)
else:
session.tiles_downloaded += 1
session.completed_set.add(summary.tile_id_str)
session.journal.tile_ids_completed = sorted(session.completed_set)
session.journal.tile_counts = self._counts_dict(session)
_atomic_write_json(session.journal_path, session.journal.to_json_bytes())
def _send_get(
self,
url: str,
params: dict[str, str] | None,
session: _DownloadSession | None,
) -> httpx.Response:
"""GET with auth header + 429 / 5xx handling."""
headers = {"Authorization": f"Bearer {self._config.service_api_key}"}
attempt = 0
last_error: str | None = None
while True:
attempt += 1
try:
response = self._http_client.get(
url,
params=params,
headers=headers,
timeout=self._config.download_http_timeout_s,
)
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.WriteError,
httpx.RemoteProtocolError,
) as exc:
last_error = f"transport:{type(exc).__name__}:{exc}"
if attempt > self._config.download_max_5xx_retries:
self._log_provider_failure("connection_error", None, last_error)
raise SatelliteProviderError(
f"satellite-provider unreachable after "
f"{attempt - 1} retries: {last_error}"
) from exc
self._sleep_with_log(
self._backoff_for(attempt - 1), last_error, session
)
continue
if response.status_code in (401, 403):
self._log_provider_failure(
"auth_failed", response.status_code, "fail-fast"
)
raise SatelliteProviderError(
f"satellite-provider rejected auth (http_status="
f"{response.status_code}); fail-fast"
)
if response.status_code == 429:
wait_s = _parse_retry_after(
response.headers.get("Retry-After"),
self._config.download_max_retry_after_s
- (session.rate_limit_budget_used_s if session else 0),
)
if session is not None:
session.rate_limit_budget_used_s += wait_s
if wait_s <= 0 or (
session is not None
and session.rate_limit_budget_used_s
>= self._config.download_max_retry_after_s
):
self._log_provider_failure(
"rate_limited", 429, "Retry-After budget exhausted"
)
raise RateLimitedError(
"satellite-provider rate-limited the download; "
f"cumulative Retry-After budget "
f"{(session.rate_limit_budget_used_s if session else 0)}s "
f"exceeds cap {self._config.download_max_retry_after_s}s"
)
self._sleep_with_log(wait_s, f"http_429_retry_after={wait_s}", session)
continue
if response.status_code >= 500:
last_error = f"http_status={response.status_code}"
if attempt > self._config.download_max_5xx_retries:
self._log_provider_failure(
"persistent_5xx", response.status_code, last_error
)
raise SatelliteProviderError(
f"satellite-provider returned {response.status_code} "
f"after {attempt - 1} retries"
)
self._sleep_with_log(
self._backoff_for(attempt - 1), last_error, session
)
continue
if response.status_code != 200:
self._log_provider_failure(
"unexpected_status", response.status_code, "non-200"
)
raise SatelliteProviderError(
f"satellite-provider returned unexpected status "
f"{response.status_code} (expected 200)"
)
return response
def _backoff_for(self, attempt_idx: int) -> float:
if attempt_idx < 0:
attempt_idx = 0
if attempt_idx >= len(self._backoff_schedule_s):
attempt_idx = len(self._backoff_schedule_s) - 1
return self._backoff_schedule_s[attempt_idx]
def _sleep_with_log(
self, wait_s: float, reason: str, session: _DownloadSession | None
) -> None:
if session is not None:
session.retry_count += 1
self._logger.warning(
"Download batch retrying",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_RETRY,
"kv": {
"wait_s": wait_s,
"reason": reason,
"retry_count": session.retry_count if session is not None else None,
},
},
)
self._sleep(wait_s)
def _log_provider_failure(
self, reason: str, http_status: int | None, detail: str
) -> None:
self._logger.error(
"Download provider failed",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PROVIDER_FAIL,
"kv": {
"reason": reason,
"http_status": http_status,
"detail": detail,
"auth_header": _AUTH_HEADER_REDACTED,
},
},
)
def _log_budget_failure(
self, requested_bytes: int, detail: str | None = None
) -> None:
self._logger.error(
"Cache-budget pre-check failed",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_BUDGET_FAIL,
"kv": {
"requested_bytes": requested_bytes,
"detail": detail,
},
},
)
def _counts_dict(self, session: _DownloadSession) -> dict[str, int]:
return {
"tiles_requested": session.tiles_requested,
"tiles_downloaded": session.tiles_downloaded,
"tiles_rejected_resolution": session.tiles_rejected_resolution,
"tiles_rejected_freshness": session.tiles_rejected_freshness,
"tiles_downgraded": session.tiles_downgraded,
}
# ----------------------------------------------------------------------
# Module-level helpers
# ----------------------------------------------------------------------
def _iso_now() -> str:
return datetime.now(timezone.utc).isoformat(timespec="microseconds")
def _parse_iso(raw: str) -> datetime:
s = raw.strip()
if s.endswith("Z"):
s = s[:-1] + "+00:00"
return datetime.fromisoformat(s)
def _is_freshness_rejection(exc: BaseException) -> bool:
"""Structural test: c6 raises ``FreshnessRejectionError``.
The composition-root adapter is free to re-raise the c6 type
directly; we recognise it by class name to avoid importing the
c6 errors module here.
"""
if exc.__class__.__name__ == "FreshnessRejectionError":
return True
for base in type(exc).__mro__:
if base.__name__ == "FreshnessRejectionError":
return True
return False