[AZ-319] C11 HttpTileUploader (post-landing upload path)

Lands the production HttpTileUploader composing AZ-317's gate, AZ-318's
per-flight signing, and consumer-side cuts over c6 storage. Implements
the full upload flow: gate ON_GROUND -> start_session -> enumerate
pending -> per-batch multipart POST with Ed25519 signing -> mark_uploaded
on ack -> end_session in finally. Honours Retry-After (RFC 7231 int +
HTTP-date), exponential backoff on 5xx, fail-fast on TLS/401/403.

Adds C11Config block, three FDR kinds (tile.queued, tile.rejected,
batch.complete), and the build_tile_uploader composition-root factory.
Cross-component access to c6 stays Protocol-cut (AZ-507 / AZ-270).

Tests: 17 new unit tests covering AC-1..AC-14 plus throughput NFR; AZ-272
schema fixtures for the three new FDR kinds. Full unit suite: 1404 passed.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 06:13:36 +03:00
parent cde237e236
commit 610e8a743c
15 changed files with 2461 additions and 24 deletions
@@ -2,19 +2,27 @@
Re-exports the Protocol surface (``TileDownloader``, ``TileUploader``,
``FlightStateSource``), the upload-side services that have landed
(``FlightStateGate`` from AZ-317, ``PerFlightKeyManager`` from
AZ-318), the C11 internal DTOs / enums, and the C11 error family.
The download-side concrete impl (``HttpTileDownloader``) ships in
AZ-316; the upload-side concrete impl (``TileUploader``) ships in
AZ-319 — both will be added to ``__all__`` then.
(``FlightStateGate`` from AZ-317, ``PerFlightKeyManager`` from AZ-318,
``HttpTileUploader`` from AZ-319), the C11 internal DTOs / enums, the
C11 error family, and the per-component config block. The download-side
concrete impl (``HttpTileDownloader``) ships in AZ-316; it will be added
to ``__all__`` then.
"""
from gps_denied_onboard.components.c11_tile_manager._types import (
FlightStateSignal,
IngestStatus,
PerTileStatus,
PublicKeyFingerprint,
UploadBatchReport,
UploadOutcome,
UploadRequest,
)
from gps_denied_onboard.components.c11_tile_manager.config import C11Config
from gps_denied_onboard.components.c11_tile_manager.errors import (
FlightStateNotOnGroundError,
RateLimitedError,
SatelliteProviderError,
SessionNotActiveError,
SignatureRejectedError,
TileManagerError,
@@ -30,17 +38,34 @@ from gps_denied_onboard.components.c11_tile_manager.interface import (
from gps_denied_onboard.components.c11_tile_manager.signing_key import (
PerFlightKeyManager,
)
from gps_denied_onboard.components.c11_tile_manager.tile_uploader import (
HttpTileUploader,
canonical_payload_bytes,
)
from gps_denied_onboard.config.schema import register_component_block
register_component_block("c11_tile_manager", C11Config)
__all__ = [
"C11Config",
"FlightStateGate",
"FlightStateNotOnGroundError",
"FlightStateSignal",
"FlightStateSource",
"HttpTileUploader",
"IngestStatus",
"PerFlightKeyManager",
"PerTileStatus",
"PublicKeyFingerprint",
"RateLimitedError",
"SatelliteProviderError",
"SessionNotActiveError",
"SignatureRejectedError",
"TileDownloader",
"TileManagerError",
"TileUploader",
"UploadBatchReport",
"UploadOutcome",
"UploadRequest",
"canonical_payload_bytes",
]
@@ -1,10 +1,14 @@
"""C11 internal DTOs (AZ-317, AZ-318).
"""C11 internal DTOs (AZ-317, AZ-318, AZ-319).
* :class:`FlightStateSignal` — the five flight-state signals consumed by
the upload-side flight-state gate (AZ-317).
* :class:`PublicKeyFingerprint` — the per-flight Ed25519 keypair
fingerprint envelope returned by :meth:`PerFlightKeyManager.start_session`
(AZ-318).
* :class:`FlightStateSignal` — five flight-state signals consumed by the
upload-side flight-state gate (AZ-317).
* :class:`PublicKeyFingerprint` — per-flight Ed25519 keypair fingerprint
envelope returned by :meth:`PerFlightKeyManager.start_session` (AZ-318).
* :class:`UploadRequest`, :class:`UploadBatchReport`,
:class:`PerTileStatus`, :class:`IngestStatus`, :class:`UploadOutcome` —
upload-side DTOs and enums consumed and produced by the AZ-319
:class:`HttpTileUploader` (contract
``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md`` v1.0.0).
Internal to the component — composition-root code reaches these via the
``c11_tile_manager`` package re-exports; consumers outside C11 use the
@@ -20,7 +24,12 @@ from uuid import UUID
__all__ = [
"FlightStateSignal",
"IngestStatus",
"PerTileStatus",
"PublicKeyFingerprint",
"UploadBatchReport",
"UploadOutcome",
"UploadRequest",
]
@@ -52,3 +61,103 @@ class PublicKeyFingerprint:
public_key_pem: bytes
fingerprint: str
generated_at: datetime
class IngestStatus(str, Enum):
"""Per-tile status returned by the parent-suite ingest endpoint.
``QUEUED`` / ``DUPLICATE`` / ``SUPERSEDED`` are all treated as
successful acknowledgement (AC-4); ``REJECTED`` keeps the tile in
C6's ``pending_uploads`` for human review (AC-3). The upload-side
flow only writes ``mark_uploaded`` for the three successful states.
"""
QUEUED = "queued"
REJECTED = "rejected"
DUPLICATE = "duplicate"
SUPERSEDED = "superseded"
class UploadOutcome(str, Enum):
"""Aggregate outcome of one :meth:`TileUploader.upload_pending_tiles` call.
Mirrors contract Shape § ``UploadBatchReport.outcome``:
* ``SUCCESS`` — every per-tile status is ``QUEUED`` /
``DUPLICATE`` / ``SUPERSEDED``.
* ``PARTIAL`` — some tiles were ``REJECTED`` while others were
acknowledged; the caller may re-invoke for the rejected set.
* ``FAILURE`` — the flight-state gate blocked or zero tiles could
be POSTed (TLS / 401 / 403 / persistent 5xx surface as raised
:class:`SatelliteProviderError`, NOT as ``FAILURE`` in a returned
report).
"""
SUCCESS = "success"
PARTIAL = "partial"
FAILURE = "failure"
@dataclass(frozen=True)
class UploadRequest:
"""Inputs to :meth:`TileUploader.upload_pending_tiles`.
``flight_id`` is optional: ``None`` means "every pending tile across
every flight" (typical post-landing operator workflow);
``UUID(...)`` restricts the batch to one flight (used by C12 when
re-driving a single flight's pending journal).
"""
batch_size: int
satellite_provider_url: str
flight_id: UUID | None = None
def __post_init__(self) -> None:
if not 1 <= self.batch_size <= 200:
raise ValueError(
"UploadRequest.batch_size must be in [1, 200]; "
f"got {self.batch_size}"
)
if not self.satellite_provider_url:
raise ValueError(
"UploadRequest.satellite_provider_url must be non-empty"
)
@dataclass(frozen=True)
class PerTileStatus:
"""Per-tile result attached to :class:`UploadBatchReport.per_tile_status`.
``rejection_reason`` is only populated when ``status == REJECTED``;
it carries the parent-suite's free-text explanation (e.g.
``"invalid signature"``, ``"capture_timestamp out of range"``).
The C11 uploader matches the ``"signature"`` substring to decide
whether to also call
:meth:`PerFlightKeyManager.record_signature_rejection`.
"""
tile_id: str
status: IngestStatus
rejection_reason: str | None = None
@dataclass(frozen=True)
class UploadBatchReport:
"""Aggregate report returned by :meth:`TileUploader.upload_pending_tiles`.
``batch_uuid`` is the LAST successful batch's identifier from the
parent-suite ingest endpoint (or a fresh ``uuid4()`` when no batch
POSTed; documented). The full per-batch ID stream is captured in
the FDR ``c11.upload.batch.complete`` records.
``public_key_fingerprint`` carries the 16-hex AZ-318 fingerprint so
the operator can cross-reference against the safety-officer's
pre-enrolment table without re-reading the FDR.
"""
batch_uuid: UUID
per_tile_status: tuple[PerTileStatus, ...]
retry_count: int
next_retry_at_s: int | None
outcome: UploadOutcome
public_key_fingerprint: str
@@ -0,0 +1,73 @@
"""C11 TileManager config block (AZ-319).
Registered into ``config.components['c11_tile_manager']`` by the
package ``__init__.py``. The composition-root factory
:func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader`
reads this block to drive the upload path's HTTP behaviour and to
identify the producing companion against the parent suite's voting
layer.
The four fields below match the AZ-319 task spec § ``Outcome`` —
``config.c11.satellite_provider_ingest_url``,
``config.c11.upload_batch_size``, ``config.c11.upload_http_timeout_s``,
``config.c11.companion_id``. The ``upload_max_retry_after_s`` cap is
the Risk-3 ceiling on cumulative ``Retry-After`` budget for 429
responses (see :class:`RateLimitedError`).
"""
from __future__ import annotations
from dataclasses import dataclass
from gps_denied_onboard.config.schema import ConfigError
__all__ = ["C11Config"]
_DEFAULT_BATCH_SIZE: int = 25
_DEFAULT_HTTP_TIMEOUT_S: float = 30.0
_DEFAULT_MAX_RETRY_AFTER_S: int = 600
_MAX_BATCH_SIZE: int = 200
@dataclass(frozen=True)
class C11Config:
"""Per-component config for C11 tile manager (upload path).
``satellite_provider_ingest_url`` is the parent-suite ingest base
URL (e.g. ``https://satellite-provider.example.com``); the
uploader appends ``/api/satellite/tiles/ingest`` to it. Defaulted
to empty so unit tests / replay runs that do not exercise the
upload path stay no-op; production configuration MUST set this
via YAML / env override or :class:`HttpTileUploader` raises
:class:`SatelliteProviderError` on the first attempt.
``companion_id`` is the stable per-companion identifier the
parent suite's voting layer uses to attribute uploads to one
physical airframe. Defaulted to empty so test runs without a
paired companion stay valid; the factory raises ``ConfigError``
when the empty default is used in operator / production wiring.
"""
satellite_provider_ingest_url: str = ""
upload_batch_size: int = _DEFAULT_BATCH_SIZE
upload_http_timeout_s: float = _DEFAULT_HTTP_TIMEOUT_S
upload_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S
companion_id: str = ""
def __post_init__(self) -> None:
if not 1 <= self.upload_batch_size <= _MAX_BATCH_SIZE:
raise ConfigError(
"C11Config.upload_batch_size must be in "
f"[1, {_MAX_BATCH_SIZE}]; got {self.upload_batch_size}"
)
if self.upload_http_timeout_s <= 0:
raise ConfigError(
"C11Config.upload_http_timeout_s must be > 0; "
f"got {self.upload_http_timeout_s}"
)
if self.upload_max_retry_after_s <= 0:
raise ConfigError(
"C11Config.upload_max_retry_after_s must be > 0; "
f"got {self.upload_max_retry_after_s}"
)
@@ -1,4 +1,4 @@
"""C11 TileManager error family (AZ-317, AZ-318, plus reserved AZ-319 envelope).
"""C11 TileManager error family (AZ-317, AZ-318, AZ-319).
Rooted at :class:`TileManagerError`. The parent is declared here (rather
than alongside the AZ-316 ``TileDownloader``) so the upload-side tasks
@@ -11,9 +11,14 @@ subclasses without re-declaring the parent.
``ON_GROUND`` at upload entry.
* :class:`SessionNotActiveError` (AZ-318) — :meth:`PerFlightKeyManager.sign`
/ :meth:`record_signature_rejection` called outside an active session.
* :class:`SignatureRejectedError` (AZ-318 envelope) — defined here for
the upload-side error family; raised by ``TileUploader`` (separate
task) after parsing the ``satellite-provider`` ingest response.
* :class:`SignatureRejectedError` (AZ-318/AZ-319 envelope) — surfaced
by the AZ-319 :class:`HttpTileUploader` after parsing a
``REJECTED`` per-tile response whose ``rejection_reason`` mentions
the signature.
* :class:`SatelliteProviderError` (AZ-319) — TLS / 401 / 403 fail-fast
AND persistent-5xx fail-after-retries surface for the upload path.
* :class:`RateLimitedError` (AZ-319) — 429 with persistent
``Retry-After`` budget exhaustion.
"""
from __future__ import annotations
@@ -28,6 +33,8 @@ if TYPE_CHECKING:
__all__ = [
"FlightStateNotOnGroundError",
"RateLimitedError",
"SatelliteProviderError",
"SessionNotActiveError",
"SignatureRejectedError",
"TileManagerError",
@@ -77,3 +84,27 @@ class SignatureRejectedError(TileManagerError):
to surface the FDR + ERROR log envelope per AZ-318 AC-8 before
re-raising this exception to the operator-tooling layer.
"""
class SatelliteProviderError(TileManagerError):
"""``satellite-provider`` ingest endpoint failed terminally.
Surface for the upload path's fail-fast (TLS handshake failure,
401 / 403 auth rejection on the FIRST attempt) and
fail-after-retries (5 consecutive 5xx) classes per AZ-319 AC-9 /
AC-10. The original :class:`httpx.HTTPError` is preserved on
``__cause__``; the message names the HTTP status when known.
"""
class RateLimitedError(TileManagerError):
"""``satellite-provider`` ingest endpoint rate-limited the upload.
Raised when the parent suite returns 429 and the cumulative
``Retry-After`` budget exceeds
:attr:`C11Config.upload_max_retry_after_s`. The
AZ-319 :class:`HttpTileUploader` honours the FIRST 429's
``Retry-After`` (sleep + retry) but escalates to this error after
the configured budget so the operator can surface the rate-limit
state explicitly.
"""
@@ -4,7 +4,10 @@ Operator-side ONLY — excluded from airborne via CMake (`BUILD_C11_TILE_MANAGER
See `_docs/02_document/components/12_c11_tilemanager/`.
* :class:`TileDownloader` — pre-flight download path (AZ-316, pending).
* :class:`TileUploader` — post-landing upload path (AZ-319, pending).
* :class:`TileUploader` — post-landing upload path (AZ-319) — the
authoritative shape lives in
``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md``
v1.0.0 and is mirrored 1:1 here.
* :class:`FlightStateSource` — thin C11-facing adapter the upload-side
flight-state gate (AZ-317) calls to read "what is the FC saying right
now?". A concrete impl ships with E-C8 (subscribes to the FC adapter's
@@ -15,13 +18,16 @@ See `_docs/02_document/components/12_c11_tilemanager/`.
from __future__ import annotations
from collections.abc import Iterable
from collections.abc import Iterable, Sequence
from pathlib import Path
from typing import Protocol, runtime_checkable
from typing import Any, Protocol, runtime_checkable
from uuid import UUID
from gps_denied_onboard._types.tile import TileRecord
from gps_denied_onboard.components.c11_tile_manager._types import (
FlightStateSignal,
UploadBatchReport,
UploadRequest,
)
__all__ = [
@@ -39,10 +45,25 @@ class TileDownloader(Protocol):
) -> Iterable[TileRecord]: ...
@runtime_checkable
class TileUploader(Protocol):
"""Post-landing batch upload to the `satellite-provider` ingest endpoint (D-PROJ-2)."""
"""Post-landing batch upload to ``satellite-provider`` ingest (D-PROJ-2).
def upload(self, tiles: Iterable[TileRecord], flight_id: str) -> None: ...
See ``_docs/02_document/contracts/c11_tilemanager/tile_uploader.md``
v1.0.0 for invariants I-1 .. I-8 and the per-method error matrix.
The :meth:`enumerate_pending_tiles` return type is the consumer-
side structural metadata shape (mirrors c6's ``TileMetadata``;
declared as ``Sequence[Any]`` here to keep C11 free of cross-
component imports per AZ-507).
"""
def upload_pending_tiles(self, request: UploadRequest) -> UploadBatchReport: ...
def enumerate_pending_tiles(
self, flight_id: UUID | None = None
) -> Sequence[Any]: ...
def confirm_flight_state(self) -> FlightStateSignal: ...
@runtime_checkable
@@ -0,0 +1,754 @@
"""C11 ``HttpTileUploader`` (AZ-319) — concrete :class:`TileUploader`.
Operator-side post-landing upload path. Reads pending mid-flight tiles
from C6 (``source = onboard_ingest``, ``uploaded_at IS NULL``), packages
each per the D-PROJ-2 multipart contract sketch, signs with the per-flight
ephemeral key (AZ-318), POSTs to ``satellite-provider``'s ingest
endpoint, and marks acknowledged tiles uploaded. Gates on ``ON_GROUND``
(AZ-317) before any C6 read or network egress; zeroes the signing key
in a try/finally regardless of outcome.
Architecture
------------
The c6 storage surface is reached through structural :class:`Protocol`
cuts (:class:`_PendingMetadataReader`, :class:`_TileBytesReader`,
:class:`_TilePixelHandleLike`) defined in this module — never via a
direct ``from gps_denied_onboard.components.c6_tile_cache import`` (the
AZ-507 cross-component rule + the AZ-270 lint
``test_az270_compose_root.test_ac6`` enforce this on every
``components/**/*.py`` file). The composition root
(``runtime_root.c11_factory.build_tile_uploader``) is the single layer
that may bind concrete c6 implementations into the constructor.
"""
from __future__ import annotations
import hashlib
import json
import logging
import struct
from dataclasses import dataclass
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from types import TracebackType
from typing import Any, Protocol, runtime_checkable
from uuid import UUID, uuid4
import httpx
from gps_denied_onboard.components.c11_tile_manager._types import (
IngestStatus,
PerTileStatus,
UploadBatchReport,
UploadOutcome,
UploadRequest,
)
from gps_denied_onboard.components.c11_tile_manager.config import C11Config
from gps_denied_onboard.components.c11_tile_manager.errors import (
RateLimitedError,
SatelliteProviderError,
SignatureRejectedError,
)
from gps_denied_onboard.components.c11_tile_manager.flight_state_gate import (
FlightStateGate,
)
from gps_denied_onboard.components.c11_tile_manager.signing_key import (
PerFlightKeyManager,
)
from gps_denied_onboard.fdr_client import (
CURRENT_SCHEMA_VERSION,
FdrClient,
FdrRecord,
)
__all__ = [
"HttpTileUploader",
"canonical_payload_bytes",
]
_INGEST_PATH = "/api/satellite/tiles/ingest"
_FDR_KIND_TILE_QUEUED = "c11.upload.tile.queued"
_FDR_KIND_TILE_REJECTED = "c11.upload.tile.rejected"
_FDR_KIND_BATCH_COMPLETE = "c11.upload.batch.complete"
_LOG_KIND_SESSION_START = "c11.upload.session.started"
_LOG_KIND_SESSION_END = "c11.upload.session.completed"
_LOG_KIND_RETRY = "c11.upload.batch.retry"
_LOG_KIND_PROVIDER_FAIL = "c11.upload.provider.failed"
_COMPONENT = "c11_tile_manager.tile_uploader"
_BACKOFF_SCHEDULE_S: tuple[float, ...] = (1.0, 2.0, 4.0, 8.0)
_REJECTION_SIGNATURE_MARKER = "signature"
# ----------------------------------------------------------------------
# Consumer-side cuts over c6 (AZ-507): never imported across components.
# ----------------------------------------------------------------------
class _TilePixelHandleLike(Protocol):
"""Structural cut of c6's ``TilePixelHandle`` context manager.
The uploader only enters the context to read JPEG bytes for the
multipart payload; it never inspects the underlying mmap path or
holds the handle past the ``with`` block.
"""
def __enter__(self) -> memoryview: ...
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None: ...
@runtime_checkable
class _TileBytesReader(Protocol):
"""Structural cut of c6's ``TileStore.read_tile_pixels``.
The uploader takes ``Any`` for ``tile_id`` to avoid a c6 import; the
composition root binds the concrete c6 ``TileStore`` whose
``read_tile_pixels`` accepts the c6 ``TileId`` shape.
"""
def read_tile_pixels(self, tile_id: Any) -> _TilePixelHandleLike: ...
@runtime_checkable
class _PendingMetadataReader(Protocol):
"""Structural cut of c6's ``TileMetadataStore`` upload-side surface.
Three methods, narrow on purpose: enumerate pending,
forward-stamp ``uploaded_at`` per tile, and look up metadata by
id (used when ``request.flight_id`` filters the pending set).
"""
def pending_uploads(self) -> list[Any]: ...
def mark_uploaded(self, tile_id: Any, uploaded_at: datetime) -> None: ...
# ----------------------------------------------------------------------
# Canonical payload bytes (AC-13)
# ----------------------------------------------------------------------
def canonical_payload_bytes(
tile_blob: bytes, tile: Any, request: UploadRequest, companion_id: str
) -> bytes:
"""Deterministic SHA-256 over the multipart input set (AC-13).
Concatenation order is FROZEN: ``tile_blob`` || zoom_level (uint8) ||
latitude (float64 LE) || longitude (float64 LE) ||
capture_timestamp_ns (int64 LE) || flight_id (utf-8) ||
companion_id (utf-8) || quality_metadata_json (utf-8 with sorted
keys, no whitespace). Re-ordering OR re-formatting any field is a
breaking change to the parent suite's signature verification — bump
the contract major version per the contract's Versioning Rules.
"""
flight_id_bytes = (tile.flight_id or "").encode("utf-8")
companion_bytes = companion_id.encode("utf-8")
quality_json = _serialise_quality_metadata(getattr(tile, "quality_metadata", None))
capture_ts_ns = _datetime_to_unix_ns(tile.capture_timestamp)
parts = [
bytes(tile_blob),
struct.pack("<B", int(tile.tile_id.zoom_level)),
struct.pack("<d", float(tile.tile_id.lat)),
struct.pack("<d", float(tile.tile_id.lon)),
struct.pack("<q", capture_ts_ns),
flight_id_bytes,
companion_bytes,
quality_json.encode("utf-8"),
]
return hashlib.sha256(b"".join(parts)).digest()
def _serialise_quality_metadata(quality: Any) -> str:
if quality is None:
return "null"
payload = {
"estimator_label": quality.estimator_label,
"covariance_2x2": [list(row) for row in quality.covariance_2x2],
"last_anchor_age_ms": int(quality.last_anchor_age_ms),
"mre_px": float(quality.mre_px),
"imu_bias_norm": float(quality.imu_bias_norm),
}
return json.dumps(payload, sort_keys=True, separators=(",", ":"))
def _datetime_to_unix_ns(ts: datetime) -> int:
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
seconds = int(ts.timestamp())
micros = ts.microsecond
return seconds * 1_000_000_000 + micros * 1_000
# ----------------------------------------------------------------------
# Retry-After parsing (AC-8)
# ----------------------------------------------------------------------
def _parse_retry_after(header_value: str | None, max_s: int) -> int:
"""Return non-negative seconds to sleep, capped at ``max_s``.
Honours BOTH RFC 7231 forms: integer seconds and HTTP-date. Returns
0 when the header is missing or unparseable (the uploader still
backs off via its own schedule when the value is unusable).
"""
if header_value is None:
return 0
raw = header_value.strip()
if not raw:
return 0
if raw.isdigit():
return min(int(raw), max_s)
try:
retry_at = parsedate_to_datetime(raw)
except (TypeError, ValueError):
return 0
now = datetime.now(timezone.utc)
if retry_at.tzinfo is None:
retry_at = retry_at.replace(tzinfo=timezone.utc)
delta = (retry_at - now).total_seconds()
return max(0, min(int(delta), max_s))
# ----------------------------------------------------------------------
# Internal session-state container
# ----------------------------------------------------------------------
@dataclass
class _SessionState:
"""Mutable bookkeeping for one ``upload_pending_tiles`` call."""
flight_id_for_session: UUID
public_key_fingerprint: str
per_tile_results: list[PerTileStatus]
last_batch_uuid: UUID | None
retry_count: int
next_retry_at_s: int | None
rate_limit_budget_used_s: int
# ----------------------------------------------------------------------
# Concrete uploader
# ----------------------------------------------------------------------
class HttpTileUploader:
"""Concrete :class:`TileUploader` against ``satellite-provider``'s ingest endpoint.
All cross-component dependencies (``flight_state_gate``,
``key_manager``, ``tile_store``, ``tile_metadata_store``) are
constructor-injected via Protocol cuts. The ``http_client`` is an
:class:`httpx.Client` the caller owns; ``HttpTileUploader`` does
NOT close it — production wiring uses a long-lived client per
process; tests inject ``httpx.Client(transport=httpx.MockTransport)``
for deterministic responses.
"""
def __init__(
self,
*,
http_client: httpx.Client,
tile_store: _TileBytesReader,
tile_metadata_store: _PendingMetadataReader,
flight_state_gate: FlightStateGate,
key_manager: PerFlightKeyManager,
fdr_client: FdrClient,
logger: logging.Logger,
config: C11Config,
sleep: Any = None,
) -> None:
self._http_client = http_client
self._tile_store = tile_store
self._metadata_store = tile_metadata_store
self._gate = flight_state_gate
self._key_manager = key_manager
self._fdr = fdr_client
self._logger = logger
self._config = config
self._sleep = sleep if sleep is not None else _default_sleep
# ------------------------------------------------------------------
# Public Protocol surface
# ------------------------------------------------------------------
def upload_pending_tiles(self, request: UploadRequest) -> UploadBatchReport:
"""Gate → start_session → enumerate → batch loop → finally end_session.
Order is FROZEN per Reliability constraint in the task spec —
re-ordering is a High Reliability finding at code-review time
because it breaks I-1 (gate before any read / network) or I-4
(zeroisation guarantee on every exit path).
"""
self._gate.confirm_on_ground()
flight_id_for_session = request.flight_id or uuid4()
fingerprint = self._key_manager.start_session(flight_id_for_session)
state = _SessionState(
flight_id_for_session=flight_id_for_session,
public_key_fingerprint=fingerprint.fingerprint,
per_tile_results=[],
last_batch_uuid=None,
retry_count=0,
next_retry_at_s=None,
rate_limit_budget_used_s=0,
)
self._logger.info(
"Per-flight upload session started",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_SESSION_START,
"kv": {
"flight_id": str(flight_id_for_session),
"fingerprint": fingerprint.fingerprint,
"request_flight_filter": (
None if request.flight_id is None else str(request.flight_id)
),
},
},
)
outcome: UploadOutcome
try:
pending = self._enumerate(request)
if not pending:
outcome = UploadOutcome.SUCCESS
state.last_batch_uuid = uuid4()
else:
outcome = self._upload_batches(request, pending, state)
except (SatelliteProviderError, RateLimitedError, SignatureRejectedError):
outcome = UploadOutcome.FAILURE
raise
except Exception:
outcome = UploadOutcome.FAILURE
raise
finally:
self._key_manager.end_session()
self._emit_batch_complete(state, outcome)
self._logger.info(
"Per-flight upload session completed",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_SESSION_END,
"kv": {
"flight_id": str(state.flight_id_for_session),
"fingerprint": state.public_key_fingerprint,
"outcome": outcome.value,
"total_attempted": len(state.per_tile_results),
},
},
)
assert state.last_batch_uuid is not None
return UploadBatchReport(
batch_uuid=state.last_batch_uuid,
per_tile_status=tuple(state.per_tile_results),
retry_count=state.retry_count,
next_retry_at_s=state.next_retry_at_s,
outcome=outcome,
public_key_fingerprint=state.public_key_fingerprint,
)
def enumerate_pending_tiles(
self, flight_id: UUID | None = None
) -> list[Any]:
"""Read-only enumeration; does NOT call the gate (per contract)."""
return self._filter_by_flight(self._metadata_store.pending_uploads(), flight_id)
def confirm_flight_state(self) -> Any:
"""Pass-through to :meth:`FlightStateGate.confirm_on_ground`."""
return self._gate.confirm_on_ground()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _enumerate(self, request: UploadRequest) -> list[Any]:
all_pending = self._metadata_store.pending_uploads()
return self._filter_by_flight(all_pending, request.flight_id)
@staticmethod
def _filter_by_flight(tiles: list[Any], flight_id: UUID | None) -> list[Any]:
if flight_id is None:
return list(tiles)
target = str(flight_id)
return [t for t in tiles if (t.flight_id or "") == target]
def _upload_batches(
self,
request: UploadRequest,
pending: list[Any],
state: _SessionState,
) -> UploadOutcome:
any_rejected = False
for batch_start in range(0, len(pending), request.batch_size):
batch = pending[batch_start : batch_start + request.batch_size]
response = self._post_batch_with_retry(request, batch, state)
batch_uuid, parsed = self._process_response(response, batch, state)
state.last_batch_uuid = batch_uuid
for tile_id, status in parsed:
if status.status == IngestStatus.REJECTED:
any_rejected = True
else:
self._metadata_store.mark_uploaded(
tile_id, datetime.now(timezone.utc)
)
state.per_tile_results.append(status)
if any_rejected:
return UploadOutcome.PARTIAL
return UploadOutcome.SUCCESS
def _post_batch_with_retry(
self,
request: UploadRequest,
batch: list[Any],
state: _SessionState,
) -> httpx.Response:
"""POST one batch, honouring 429 Retry-After and exponential 5xx backoff."""
ingest_url = request.satellite_provider_url.rstrip("/") + _INGEST_PATH
attempt = 0
last_error: str | None = None
while True:
attempt += 1
files, data = self._build_multipart(batch, request)
try:
response = self._http_client.post(
ingest_url,
files=files,
data=data,
timeout=self._config.upload_http_timeout_s,
)
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.WriteError,
httpx.RemoteProtocolError,
) as exc:
last_error = f"transport:{type(exc).__name__}:{exc}"
if attempt >= len(_BACKOFF_SCHEDULE_S):
self._log_provider_failure("connection_error", None, last_error)
raise SatelliteProviderError(
"satellite-provider unreachable after "
f"{attempt} attempts: {last_error}"
) from exc
self._sleep_with_log(_BACKOFF_SCHEDULE_S[attempt - 1], last_error, state)
continue
if response.status_code in (401, 403):
self._log_provider_failure(
"auth_failed", response.status_code, "fail-fast"
)
raise SatelliteProviderError(
f"satellite-provider rejected auth (http_status="
f"{response.status_code}); fail-fast"
)
if response.status_code == 429:
wait_s = _parse_retry_after(
response.headers.get("Retry-After"),
self._config.upload_max_retry_after_s
- state.rate_limit_budget_used_s,
)
state.rate_limit_budget_used_s += wait_s
state.next_retry_at_s = wait_s
if wait_s <= 0 or state.rate_limit_budget_used_s >= self._config.upload_max_retry_after_s:
self._log_provider_failure(
"rate_limited", 429, "Retry-After budget exhausted"
)
raise RateLimitedError(
"satellite-provider rate-limited the upload; cumulative "
f"Retry-After budget {state.rate_limit_budget_used_s}s "
f"exceeds cap {self._config.upload_max_retry_after_s}s"
)
self._sleep_with_log(wait_s, f"http_429_retry_after={wait_s}", state)
continue
if response.status_code >= 500:
last_error = f"http_status={response.status_code}"
if attempt >= len(_BACKOFF_SCHEDULE_S):
self._log_provider_failure(
"persistent_5xx", response.status_code, last_error
)
raise SatelliteProviderError(
f"satellite-provider returned {response.status_code} "
f"after {attempt} attempts"
)
self._sleep_with_log(_BACKOFF_SCHEDULE_S[attempt - 1], last_error, state)
continue
if response.status_code != 202:
self._log_provider_failure(
"unexpected_status", response.status_code, "non-202"
)
raise SatelliteProviderError(
f"satellite-provider returned unexpected status "
f"{response.status_code} (expected 202)"
)
return response
def _build_multipart(
self, batch: list[Any], request: UploadRequest
) -> tuple[list[tuple[str, tuple[str, bytes, str]]], dict[str, str]]:
files: list[tuple[str, tuple[str, bytes, str]]] = []
per_tile_meta: list[dict[str, Any]] = []
for tile in batch:
with self._tile_store.read_tile_pixels(tile.tile_id) as view:
tile_bytes = bytes(view)
canonical = canonical_payload_bytes(
tile_bytes, tile, request, self._config.companion_id
)
signature = self._key_manager.sign(canonical)
tile_id_str = _tile_id_to_str(tile.tile_id)
files.append(
(
"tile_blob",
(f"{tile_id_str}.jpg", tile_bytes, "image/jpeg"),
)
)
per_tile_meta.append(
{
"tile_id": tile_id_str,
"zoomLevel": int(tile.tile_id.zoom_level),
"latitude": float(tile.tile_id.lat),
"longitude": float(tile.tile_id.lon),
"tile_size_meters": float(tile.tile_size_meters),
"tile_size_pixels": int(tile.tile_size_pixels),
"capture_timestamp": tile.capture_timestamp.isoformat(),
"flight_id": tile.flight_id or "",
"companion_id": self._config.companion_id,
"quality_metadata": _serialise_quality_metadata(
getattr(tile, "quality_metadata", None)
),
"signature": signature.hex(),
}
)
data = {"tiles_metadata": json.dumps(per_tile_meta, sort_keys=True)}
return files, data
def _process_response(
self,
response: httpx.Response,
batch: list[Any],
state: _SessionState,
) -> tuple[UUID, list[tuple[Any, PerTileStatus]]]:
"""Parse the parent suite's 202 body and emit per-tile FDR records."""
try:
body = response.json()
except ValueError as exc:
self._log_provider_failure(
"schema_not_json", response.status_code, str(exc)
)
raise SatelliteProviderError(
"satellite-provider returned non-JSON body for batch POST"
) from exc
try:
batch_uuid = UUID(str(body["batch_uuid"]))
per_tile_raw = body["per_tile_status"]
except (KeyError, ValueError) as exc:
self._log_provider_failure(
"schema_missing_fields", response.status_code, str(exc)
)
raise SatelliteProviderError(
"satellite-provider response missing batch_uuid or per_tile_status"
) from exc
batch_index = {_tile_id_to_str(t.tile_id): t for t in batch}
results: list[tuple[Any, PerTileStatus]] = []
for entry in per_tile_raw:
tile_id_str = str(entry["tile_id"])
try:
ingest_status = IngestStatus(entry["status"])
except (KeyError, ValueError) as exc:
self._log_provider_failure(
"schema_unknown_status",
response.status_code,
f"tile_id={tile_id_str}:{exc}",
)
raise SatelliteProviderError(
f"satellite-provider returned unknown status for "
f"tile_id={tile_id_str}: {entry.get('status')!r}"
) from exc
reason = entry.get("rejection_reason")
tile = batch_index.get(tile_id_str)
if tile is None:
self._log_provider_failure(
"schema_unknown_tile",
response.status_code,
f"tile_id={tile_id_str} not in batch",
)
raise SatelliteProviderError(
f"satellite-provider acknowledged tile_id={tile_id_str} "
"that was not in the POSTed batch"
)
status_dto = PerTileStatus(
tile_id=tile_id_str,
status=ingest_status,
rejection_reason=reason,
)
if ingest_status == IngestStatus.REJECTED:
self._handle_rejection(tile, tile_id_str, reason, state, batch_uuid)
else:
self._fdr.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_now(),
producer_id=self._fdr.producer_id,
kind=_FDR_KIND_TILE_QUEUED,
payload={
"flight_id": str(state.flight_id_for_session),
"tile_id": tile_id_str,
"fingerprint": state.public_key_fingerprint,
"batch_uuid": str(batch_uuid),
"status": ingest_status.value,
"observed_at_iso": _iso_now(),
},
)
)
results.append((tile.tile_id, status_dto))
return batch_uuid, results
def _handle_rejection(
self,
tile: Any,
tile_id_str: str,
reason: str | None,
state: _SessionState,
batch_uuid: UUID,
) -> None:
reason_text = reason or ""
if _REJECTION_SIGNATURE_MARKER in reason_text.lower():
self._key_manager.record_signature_rejection(
state.flight_id_for_session, tile_id_str
)
self._fdr.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_now(),
producer_id=self._fdr.producer_id,
kind=_FDR_KIND_TILE_REJECTED,
payload={
"flight_id": str(state.flight_id_for_session),
"tile_id": tile_id_str,
"fingerprint": state.public_key_fingerprint,
"batch_uuid": str(batch_uuid),
"rejection_reason": reason_text,
"observed_at_iso": _iso_now(),
},
)
)
def _emit_batch_complete(
self, state: _SessionState, outcome: UploadOutcome
) -> None:
total_attempted = len(state.per_tile_results)
queued = sum(
1
for s in state.per_tile_results
if s.status != IngestStatus.REJECTED
)
rejected = sum(
1 for s in state.per_tile_results if s.status == IngestStatus.REJECTED
)
batch_uuid_str = (
str(state.last_batch_uuid)
if state.last_batch_uuid is not None
else ""
)
self._fdr.enqueue(
FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_now(),
producer_id=self._fdr.producer_id,
kind=_FDR_KIND_BATCH_COMPLETE,
payload={
"flight_id": str(state.flight_id_for_session),
"fingerprint": state.public_key_fingerprint,
"batch_uuid": batch_uuid_str,
"outcome": outcome.value,
"total_attempted": total_attempted,
"total_queued": queued,
"total_rejected": rejected,
"retry_count": state.retry_count,
"observed_at_iso": _iso_now(),
},
)
)
def _sleep_with_log(
self, wait_s: float, reason: str, state: _SessionState
) -> None:
state.retry_count += 1
self._logger.warning(
"Upload batch retrying",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_RETRY,
"kv": {
"flight_id": str(state.flight_id_for_session),
"wait_s": wait_s,
"reason": reason,
"retry_count": state.retry_count,
},
},
)
self._sleep(wait_s)
def _log_provider_failure(
self, reason: str, http_status: int | None, detail: str
) -> None:
self._logger.error(
"Upload provider failed",
extra={
"component": _COMPONENT,
"kind": _LOG_KIND_PROVIDER_FAIL,
"kv": {
"reason": reason,
"http_status": http_status,
"detail": detail,
},
},
)
# ----------------------------------------------------------------------
# Module-level helpers
# ----------------------------------------------------------------------
def _tile_id_to_str(tile_id: Any) -> str:
return f"z{int(tile_id.zoom_level)}_{float(tile_id.lat):.6f}_{float(tile_id.lon):.6f}"
def _iso_now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def _default_sleep(seconds: float) -> None:
"""Production sleep hook — routes through ``WallClock.sleep_until_ns``.
Tests inject a no-op or a stub so they don't wait on the retry path.
Routing through :class:`WallClock` keeps the AC-4 AST scan over
``components/`` from seeing a bare ``time.sleep``.
"""
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
clock.sleep_until_ns(clock.monotonic_ns() + int(seconds * 1_000_000_000))