mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 21:51:12 +00:00
[AZ-320] Add C11 IdempotentRetryTileUploader decorator
Wraps HttpTileUploader (AZ-319) with two bounded retry budgets: - In-call (per-batch) — re-invokes inner on PARTIAL outcome up to `max_in_call_retries` times with capped exponential backoff (`min(base ** attempt_number, cap)`). On exhaustion: surfaces an operator hint via `next_retry_at_s = now + backoff_cap_s`. - Per-tile (cross-call) — atomically increments c6's `tiles.upload_attempts` counter for every rejection; once a tile hits `max_per_tile_attempts` it is forward-only transitioned to `voting_status = upload_giveup` (excluded from `pending_uploads`). Each transition emits FDR `kind="c11.upload.giveup"` plus an ERROR log. C6 contract changes (AZ-303 v1.3.0): - VotingStatus.UPLOAD_GIVEUP added (forward-only from PENDING/TRUSTED). - TileMetadataStore.increment_upload_attempts(tile_id) -> int added with NotImplementedError default for backwards-compat. - Migration 0003_c11_upload_attempts: additive column + widened ck_tiles_voting_status (preserves IS NULL clause). C11 wiring: - C11RetryConfig + disable_retry_decorator on C11Config. - build_tile_uploader wraps in decorator by default; bypass flag returns the bare HttpTileUploader. New `clock` keyword. Cross-component isolation honoured (AZ-507): the decorator declares `_RetryMetadataStoreLike` Protocol cut over c6's TileMetadataStore and references `UPLOAD_GIVEUP` via a local string constant — no c6 imports. Tests: 13 decorator + 1 conformance + 2 factory bypass + AC-6 enum update + alembic head bump + AZ-272 schema fixture. 238 passed across c11/c6/fdr suites; pre-existing perf microbenches unrelated. Code review: PASS_WITH_WARNINGS (5 Low/Informational findings, docs-level or downstream-CI-blocked). See _docs/03_implementation/reviews/batch_41_review.md. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -22,7 +22,10 @@ from gps_denied_onboard.components.c11_tile_manager._types import (
|
||||
UploadOutcome,
|
||||
UploadRequest,
|
||||
)
|
||||
from gps_denied_onboard.components.c11_tile_manager.config import C11Config
|
||||
from gps_denied_onboard.components.c11_tile_manager.config import (
|
||||
C11Config,
|
||||
C11RetryConfig,
|
||||
)
|
||||
from gps_denied_onboard.components.c11_tile_manager.errors import (
|
||||
CacheBudgetExceededError,
|
||||
FlightStateNotOnGroundError,
|
||||
@@ -36,6 +39,9 @@ from gps_denied_onboard.components.c11_tile_manager.errors import (
|
||||
from gps_denied_onboard.components.c11_tile_manager.flight_state_gate import (
|
||||
FlightStateGate,
|
||||
)
|
||||
from gps_denied_onboard.components.c11_tile_manager.idempotent_retry import (
|
||||
IdempotentRetryTileUploader,
|
||||
)
|
||||
from gps_denied_onboard.components.c11_tile_manager.interface import (
|
||||
FlightStateSource,
|
||||
TileDownloader,
|
||||
@@ -59,6 +65,7 @@ register_component_block("c11_tile_manager", C11Config)
|
||||
|
||||
__all__ = [
|
||||
"C11Config",
|
||||
"C11RetryConfig",
|
||||
"CacheBudgetExceededError",
|
||||
"DOWNLOAD_JOURNAL_DIRNAME",
|
||||
"DownloadBatchReport",
|
||||
@@ -70,6 +77,7 @@ __all__ = [
|
||||
"FlightStateSource",
|
||||
"HttpTileDownloader",
|
||||
"HttpTileUploader",
|
||||
"IdempotentRetryTileUploader",
|
||||
"IngestStatus",
|
||||
"PerFlightKeyManager",
|
||||
"PerTileStatus",
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
"""C11 TileManager config block (AZ-316, AZ-319).
|
||||
"""C11 TileManager config block (AZ-316, AZ-319, AZ-320).
|
||||
|
||||
Registered into ``config.components['c11_tile_manager']`` by the
|
||||
package ``__init__.py``. Two composition-root factories read this
|
||||
package ``__init__.py``. Three composition-root factories read this
|
||||
block:
|
||||
|
||||
* :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader`
|
||||
reads the ``upload_*`` fields and ``companion_id`` to drive AZ-319.
|
||||
reads the ``upload_*`` fields, ``companion_id``, and the AZ-320
|
||||
``retry`` block (``disable_retry_decorator`` + the per-tile / per-call
|
||||
retry knobs) to drive AZ-319 + the optional AZ-320 decorator.
|
||||
* :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_downloader`
|
||||
reads the ``satellite_provider_url``, ``service_api_key``, and
|
||||
``download_*`` fields to drive AZ-316.
|
||||
@@ -19,11 +21,11 @@ wiring.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from gps_denied_onboard.config.schema import ConfigError
|
||||
|
||||
__all__ = ["C11Config"]
|
||||
__all__ = ["C11Config", "C11RetryConfig"]
|
||||
|
||||
|
||||
_DEFAULT_BATCH_SIZE: int = 25
|
||||
@@ -34,6 +36,55 @@ _DEFAULT_DOWNLOAD_RESOLUTION_FLOOR: float = 0.5
|
||||
_DEFAULT_DOWNLOAD_MAX_5XX_RETRIES: int = 4
|
||||
_MIN_DOWNLOAD_RETRIES: int = 1
|
||||
_MAX_DOWNLOAD_RETRIES: int = 16
|
||||
_DEFAULT_MAX_IN_CALL_RETRIES: int = 3
|
||||
_DEFAULT_MAX_PER_TILE_ATTEMPTS: int = 5
|
||||
_DEFAULT_RETRY_BACKOFF_BASE_S: float = 2.0
|
||||
_DEFAULT_RETRY_BACKOFF_CAP_S: float = 60.0
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class C11RetryConfig:
|
||||
"""C11 ``IdempotentRetryTileUploader`` knobs (AZ-320).
|
||||
|
||||
* ``max_in_call_retries`` — bounded loop count for partial-success
|
||||
re-invocations of the wrapped uploader within a single call.
|
||||
* ``max_per_tile_attempts`` — terminal threshold per tile across
|
||||
ALL calls; exceeding the threshold moves the tile to
|
||||
:class:`VotingStatus.UPLOAD_GIVEUP` (a human-decision boundary —
|
||||
automated promotion back to ``PENDING`` is forbidden).
|
||||
* ``backoff_base_s`` — base of the exponential backoff used between
|
||||
in-call retries (``base ** retries_used``).
|
||||
* ``backoff_cap_s`` — upper bound on each individual backoff sleep;
|
||||
also used as the operator hint for ``next_retry_at_s`` when the
|
||||
in-call budget is exhausted.
|
||||
"""
|
||||
|
||||
max_in_call_retries: int = _DEFAULT_MAX_IN_CALL_RETRIES
|
||||
max_per_tile_attempts: int = _DEFAULT_MAX_PER_TILE_ATTEMPTS
|
||||
backoff_base_s: float = _DEFAULT_RETRY_BACKOFF_BASE_S
|
||||
backoff_cap_s: float = _DEFAULT_RETRY_BACKOFF_CAP_S
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.max_in_call_retries < 0:
|
||||
raise ConfigError(
|
||||
"C11RetryConfig.max_in_call_retries must be >= 0; "
|
||||
f"got {self.max_in_call_retries}"
|
||||
)
|
||||
if self.max_per_tile_attempts <= 0:
|
||||
raise ConfigError(
|
||||
"C11RetryConfig.max_per_tile_attempts must be > 0; "
|
||||
f"got {self.max_per_tile_attempts}"
|
||||
)
|
||||
if self.backoff_base_s <= 0:
|
||||
raise ConfigError(
|
||||
"C11RetryConfig.backoff_base_s must be > 0; "
|
||||
f"got {self.backoff_base_s}"
|
||||
)
|
||||
if self.backoff_cap_s <= 0:
|
||||
raise ConfigError(
|
||||
"C11RetryConfig.backoff_cap_s must be > 0; "
|
||||
f"got {self.backoff_cap_s}"
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -81,6 +132,9 @@ class C11Config:
|
||||
download_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S
|
||||
download_resolution_floor_m_per_px: float = _DEFAULT_DOWNLOAD_RESOLUTION_FLOOR
|
||||
|
||||
disable_retry_decorator: bool = False
|
||||
retry: C11RetryConfig = field(default_factory=C11RetryConfig)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not 1 <= self.upload_batch_size <= _MAX_BATCH_SIZE:
|
||||
raise ConfigError(
|
||||
@@ -118,3 +172,8 @@ class C11Config:
|
||||
"C11Config.download_resolution_floor_m_per_px must be > 0; "
|
||||
f"got {self.download_resolution_floor_m_per_px}"
|
||||
)
|
||||
if not isinstance(self.retry, C11RetryConfig):
|
||||
raise ConfigError(
|
||||
"C11Config.retry must be a C11RetryConfig; got "
|
||||
f"{type(self.retry).__name__}"
|
||||
)
|
||||
|
||||
@@ -0,0 +1,346 @@
|
||||
"""C11 ``IdempotentRetryTileUploader`` (AZ-320) — bounded-retry decorator.
|
||||
|
||||
Wraps any concrete :class:`TileUploader` (production wiring uses
|
||||
:class:`HttpTileUploader` from AZ-319) and adds two bounded retry
|
||||
budgets on top of partial-success outcomes:
|
||||
|
||||
1. **Per-call (in-call) budget** — re-invokes the inner uploader at
|
||||
most ``config.max_in_call_retries`` times when the inner returns
|
||||
:class:`UploadOutcome.PARTIAL`. Exponential backoff between rounds
|
||||
(``base ** retries_used``), capped at ``backoff_cap_s``.
|
||||
2. **Per-tile budget** — for every tile the inner reports as
|
||||
:class:`IngestStatus.REJECTED`, the decorator atomically increments
|
||||
the c6 ``upload_attempts`` counter; once the counter hits
|
||||
``config.max_per_tile_attempts`` the tile is moved to
|
||||
:class:`VotingStatus.UPLOAD_GIVEUP` (forward-only — see
|
||||
``tile_metadata_store.md`` v1.3.0 Invariant I-8). The c6
|
||||
:meth:`pending_uploads` query excludes ``UPLOAD_GIVEUP`` rows so
|
||||
the next upload run skips them; recovery is an out-of-band
|
||||
operator SQL update.
|
||||
|
||||
Architecture
|
||||
------------
|
||||
The c6 metadata-store surface is reached via the structural cut
|
||||
:class:`_RetryMetadataStoreLike` declared in this module — never
|
||||
through a direct ``from gps_denied_onboard.components.c6_tile_cache
|
||||
import`` (the AZ-507 cross-component rule + the AZ-270 lint enforce
|
||||
this on every ``components/**/*.py`` file). The composition root
|
||||
(:func:`runtime_root.c11_factory.build_tile_uploader`) is the single
|
||||
layer that may bind the concrete c6 store into the constructor.
|
||||
|
||||
The injected :class:`Clock` is the same singleton AZ-308 / AZ-307 /
|
||||
AZ-319 use; the decorator drives backoff via
|
||||
:meth:`Clock.sleep_until_ns` (so the AZ-398 invariant — no
|
||||
``time.sleep`` in ``components/`` — holds) and reads wall-clock
|
||||
seconds via :meth:`Clock.time_ns` for the operator-facing
|
||||
``next_retry_at_s`` hint.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import replace
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
from uuid import UUID
|
||||
|
||||
from gps_denied_onboard.clock.interface import Clock
|
||||
from gps_denied_onboard.components.c11_tile_manager._types import (
|
||||
FlightStateSignal,
|
||||
IngestStatus,
|
||||
PerTileStatus,
|
||||
UploadBatchReport,
|
||||
UploadOutcome,
|
||||
UploadRequest,
|
||||
)
|
||||
from gps_denied_onboard.components.c11_tile_manager.config import C11RetryConfig
|
||||
from gps_denied_onboard.components.c11_tile_manager.interface import TileUploader
|
||||
from gps_denied_onboard.fdr_client import (
|
||||
CURRENT_SCHEMA_VERSION,
|
||||
FdrClient,
|
||||
FdrRecord,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"IdempotentRetryTileUploader",
|
||||
]
|
||||
|
||||
|
||||
def _iso_now() -> str:
|
||||
"""ISO-8601 UTC timestamp matching ``tile_uploader._iso_now`` format."""
|
||||
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
|
||||
|
||||
|
||||
_COMPONENT = "c11_tile_manager.idempotent_retry"
|
||||
_FDR_KIND_GIVEUP = "c11.upload.giveup"
|
||||
_LOG_KIND_SESSION_START = "c11.retry.session.start"
|
||||
_LOG_KIND_RETRY_ATTEMPT = "c11.retry.attempt"
|
||||
_LOG_KIND_GIVEUP = "c11.retry.tile.giveup"
|
||||
_LOG_KIND_BUDGET_EXHAUSTED = "c11.retry.budget.exhausted"
|
||||
|
||||
# Locally-scoped string constant matching c6's ``VotingStatus.UPLOAD_GIVEUP``
|
||||
# value. Declared here (NOT imported) to honour AZ-507 — the consumer-side
|
||||
# cut also accepts ``str`` for the second argument to ``update_voting_status``
|
||||
# so the decorator never imports c6's enum.
|
||||
_VOTING_STATUS_UPLOAD_GIVEUP = "upload_giveup"
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Consumer-side cut over c6's TileMetadataStore (AZ-507 boundary)
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class _RetryMetadataStoreLike(Protocol):
|
||||
"""Two-method structural cut of c6's :class:`TileMetadataStore`.
|
||||
|
||||
The decorator only needs the two write surfaces that the C11
|
||||
upload retry path mutates: atomic per-row attempt counter increment
|
||||
and a forward-only voting-status transition. The composition-root
|
||||
adapter binds these to the concrete c6 ``PostgresFilesystemStore``
|
||||
(which exposes both directly under their c6 names).
|
||||
|
||||
The ``status`` argument to :meth:`update_voting_status` is typed
|
||||
:class:`Any` so callers may pass either a c6 ``VotingStatus`` enum
|
||||
or the bare string ``"upload_giveup"``; the c6 impl coerces either
|
||||
shape via ``VotingStatus(status)``.
|
||||
"""
|
||||
|
||||
def increment_upload_attempts(self, tile_id: Any) -> int: ...
|
||||
|
||||
def update_voting_status(self, tile_id: Any, status: Any) -> None: ...
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Concrete decorator
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
|
||||
class IdempotentRetryTileUploader:
|
||||
""":class:`TileUploader`-conforming decorator that adds bounded retry.
|
||||
|
||||
See module docstring for the full rationale. Construction is via
|
||||
keyword-only arguments so every wired-in dependency is visible at
|
||||
the composition root; constructor never reads global state.
|
||||
|
||||
The decorator is idempotent across operator re-invocations because
|
||||
the inner uploader's :meth:`upload_pending_tiles` always re-queries
|
||||
c6's :meth:`pending_uploads` on entry — already-acked tiles have
|
||||
been ``mark_uploaded``'d (so they're filtered out by the c6 SQL),
|
||||
and ``UPLOAD_GIVEUP`` tiles are excluded by the v1.3.0 SQL update.
|
||||
The decorator therefore only needs to bound retries; it never has
|
||||
to track which tiles were already attempted.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
inner: TileUploader,
|
||||
tile_metadata_store: _RetryMetadataStoreLike,
|
||||
fdr_client: FdrClient,
|
||||
logger: logging.Logger,
|
||||
clock: Clock,
|
||||
config: C11RetryConfig,
|
||||
) -> None:
|
||||
self._inner = inner
|
||||
self._metadata_store = tile_metadata_store
|
||||
self._fdr = fdr_client
|
||||
self._logger = logger
|
||||
self._clock = clock
|
||||
self._config = config
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# TileUploader Protocol surface
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def upload_pending_tiles(self, request: UploadRequest) -> UploadBatchReport:
|
||||
"""Bounded loop over inner; partial outcomes drive the retry path."""
|
||||
|
||||
self._logger.info(
|
||||
"Retry decorator session started",
|
||||
extra={
|
||||
"component": _COMPONENT,
|
||||
"kind": _LOG_KIND_SESSION_START,
|
||||
"kv": {
|
||||
"flight_id": str(request.flight_id) if request.flight_id else None,
|
||||
"max_in_call_retries": self._config.max_in_call_retries,
|
||||
"max_per_tile_attempts": self._config.max_per_tile_attempts,
|
||||
"backoff_base_s": self._config.backoff_base_s,
|
||||
"backoff_cap_s": self._config.backoff_cap_s,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
retries_used = 0
|
||||
report = self._inner.upload_pending_tiles(request)
|
||||
|
||||
while True:
|
||||
if report.outcome != UploadOutcome.PARTIAL:
|
||||
# SUCCESS or FAILURE → return as-is; no retry shape.
|
||||
# ``retry_count`` reflects the number of in-call retries
|
||||
# the decorator drove (zero on the happy path; > 0 if
|
||||
# an earlier round was PARTIAL and this round flipped).
|
||||
return self._with_retry_count(report, retries_used)
|
||||
|
||||
self._handle_rejected_tiles(report, request)
|
||||
|
||||
if retries_used >= self._config.max_in_call_retries:
|
||||
# Per-call budget exhausted; surface the hint and stop.
|
||||
# Spec § Outcome bullet 3 also mentions "AND there are
|
||||
# still tiles with voting_status==pending" — equivalent
|
||||
# to the budget check here because the inner's next call
|
||||
# would query c6's pending_uploads and return SUCCESS
|
||||
# immediately if the set is empty (no extra round-trip
|
||||
# added on the cold path).
|
||||
next_retry_at_s = self._next_retry_at_s()
|
||||
self._logger.warning(
|
||||
"In-call retry budget exhausted",
|
||||
extra={
|
||||
"component": _COMPONENT,
|
||||
"kind": _LOG_KIND_BUDGET_EXHAUSTED,
|
||||
"kv": {
|
||||
"flight_id": str(request.flight_id) if request.flight_id else None,
|
||||
"retries_used": retries_used,
|
||||
"next_retry_at_s": next_retry_at_s,
|
||||
},
|
||||
},
|
||||
)
|
||||
return replace(
|
||||
report,
|
||||
outcome=UploadOutcome.PARTIAL,
|
||||
retry_count=retries_used,
|
||||
next_retry_at_s=next_retry_at_s,
|
||||
)
|
||||
|
||||
# Sleep with capped exponential backoff, then retry.
|
||||
# Spec: the n-th retry waits ``base ** n`` seconds, so the
|
||||
# FIRST retry uses exponent 1 (base ** 1) — never base ** 0.
|
||||
retries_used += 1
|
||||
wait_s = self._backoff_for(retries_used)
|
||||
self._sleep(wait_s)
|
||||
self._logger.info(
|
||||
"Retry decorator scheduling next inner attempt",
|
||||
extra={
|
||||
"component": _COMPONENT,
|
||||
"kind": _LOG_KIND_RETRY_ATTEMPT,
|
||||
"kv": {
|
||||
"flight_id": str(request.flight_id) if request.flight_id else None,
|
||||
"attempt_number": retries_used,
|
||||
"sleep_s": wait_s,
|
||||
},
|
||||
},
|
||||
)
|
||||
report = self._inner.upload_pending_tiles(request)
|
||||
|
||||
def enumerate_pending_tiles(
|
||||
self, flight_id: UUID | None = None
|
||||
) -> list[Any]:
|
||||
"""Pass-through to the inner uploader (AC-11)."""
|
||||
|
||||
return list(self._inner.enumerate_pending_tiles(flight_id))
|
||||
|
||||
def confirm_flight_state(self) -> FlightStateSignal:
|
||||
"""Pass-through to the inner uploader (AC-11)."""
|
||||
|
||||
return self._inner.confirm_flight_state()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _handle_rejected_tiles(
|
||||
self,
|
||||
report: UploadBatchReport,
|
||||
request: UploadRequest,
|
||||
) -> None:
|
||||
"""Increment per-tile attempts; trip ``UPLOAD_GIVEUP`` on threshold."""
|
||||
|
||||
for entry in report.per_tile_status:
|
||||
if entry.status != IngestStatus.REJECTED:
|
||||
continue
|
||||
try:
|
||||
new_count = self._metadata_store.increment_upload_attempts(entry.tile_id)
|
||||
except Exception:
|
||||
# If the c6 increment fails, we cannot judge whether
|
||||
# the per-tile budget is exhausted — re-raise so the
|
||||
# operator notices instead of silently retrying.
|
||||
raise
|
||||
if new_count >= self._config.max_per_tile_attempts:
|
||||
self._mark_giveup(entry, new_count, request)
|
||||
|
||||
def _mark_giveup(
|
||||
self,
|
||||
entry: PerTileStatus,
|
||||
attempts: int,
|
||||
request: UploadRequest,
|
||||
) -> None:
|
||||
"""Forward-transition the tile to ``UPLOAD_GIVEUP`` + emit FDR + log."""
|
||||
|
||||
self._metadata_store.update_voting_status(
|
||||
entry.tile_id, _VOTING_STATUS_UPLOAD_GIVEUP
|
||||
)
|
||||
self._fdr.enqueue(
|
||||
FdrRecord(
|
||||
schema_version=CURRENT_SCHEMA_VERSION,
|
||||
ts=_iso_now(),
|
||||
producer_id=self._fdr.producer_id,
|
||||
kind=_FDR_KIND_GIVEUP,
|
||||
payload={
|
||||
"flight_id": (
|
||||
str(request.flight_id) if request.flight_id is not None else None
|
||||
),
|
||||
"tile_id": str(entry.tile_id),
|
||||
"attempts": int(attempts),
|
||||
"last_rejection_reason": entry.rejection_reason or "",
|
||||
},
|
||||
)
|
||||
)
|
||||
self._logger.error(
|
||||
"Tile moved to UPLOAD_GIVEUP after exhausting per-tile budget",
|
||||
extra={
|
||||
"component": _COMPONENT,
|
||||
"kind": _LOG_KIND_GIVEUP,
|
||||
"kv": {
|
||||
"flight_id": (
|
||||
str(request.flight_id) if request.flight_id is not None else None
|
||||
),
|
||||
"tile_id": str(entry.tile_id),
|
||||
"attempts": int(attempts),
|
||||
"last_rejection_reason": entry.rejection_reason or "",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
def _backoff_for(self, attempt_number: int) -> float:
|
||||
"""``min(base ** attempt_number, cap)`` — AC-4 / AC-5.
|
||||
|
||||
``attempt_number`` is the 1-indexed retry round (the FIRST
|
||||
retry is attempt 1, so the first sleep is ``base ** 1``).
|
||||
"""
|
||||
|
||||
raw = self._config.backoff_base_s ** attempt_number
|
||||
return float(min(raw, self._config.backoff_cap_s))
|
||||
|
||||
def _sleep(self, wait_s: float) -> None:
|
||||
"""Route through ``Clock.sleep_until_ns`` per AZ-398."""
|
||||
|
||||
if wait_s <= 0:
|
||||
return
|
||||
ns_target = self._clock.monotonic_ns() + int(wait_s * 1_000_000_000)
|
||||
self._clock.sleep_until_ns(ns_target)
|
||||
|
||||
def _next_retry_at_s(self) -> int:
|
||||
"""Wall-clock-seconds hint = ``now + backoff_cap_s`` (operator UI)."""
|
||||
|
||||
now_s = self._clock.time_ns() // 1_000_000_000
|
||||
return int(now_s) + int(self._config.backoff_cap_s)
|
||||
|
||||
def _with_retry_count(
|
||||
self, report: UploadBatchReport, retries_used: int
|
||||
) -> UploadBatchReport:
|
||||
"""Return ``report`` with ``retry_count`` overridden."""
|
||||
|
||||
if retries_used == 0:
|
||||
return report
|
||||
return replace(report, retry_count=retries_used)
|
||||
@@ -68,13 +68,19 @@ class VotingStatus(str, Enum):
|
||||
|
||||
Forward-only transitions per Invariant I-8 of
|
||||
``tile_metadata_store.md``: ``PENDING → TRUSTED``,
|
||||
``PENDING → REJECTED``, ``TRUSTED → REJECTED``. The
|
||||
impl (NOT this task) enforces the transition table.
|
||||
``PENDING → REJECTED``, ``TRUSTED → REJECTED``,
|
||||
``PENDING → UPLOAD_GIVEUP``, ``TRUSTED → UPLOAD_GIVEUP``
|
||||
(the last two added in v1.3.0 by AZ-320 — see the
|
||||
contract Change Log). The impl enforces the transition
|
||||
table; ``UPLOAD_GIVEUP`` is a human-decision terminal
|
||||
state — automated promotion back to ``PENDING`` is
|
||||
forbidden.
|
||||
"""
|
||||
|
||||
PENDING = "pending"
|
||||
TRUSTED = "trusted"
|
||||
REJECTED = "rejected"
|
||||
UPLOAD_GIVEUP = "upload_giveup"
|
||||
|
||||
|
||||
class SectorClassification(str, Enum):
|
||||
|
||||
@@ -153,6 +153,23 @@ class TileMetadataStore(Protocol):
|
||||
"""Point lookup; returns ``None`` if absent (NOT raises)."""
|
||||
...
|
||||
|
||||
def increment_upload_attempts(self, tile_id: TileId) -> int:
|
||||
"""Atomically bump ``upload_attempts`` for the row; return new count.
|
||||
|
||||
Added in v1.3.0 (AZ-320) for the C11 upload retry decorator.
|
||||
Concurrent invocations on different ``tile_id`` values do not
|
||||
block each other (per-row UPDATE...RETURNING). Raises
|
||||
:class:`TileNotFoundError` if the row is absent and
|
||||
:class:`TileMetadataError` on transport failure. The Protocol
|
||||
method body raises :class:`NotImplementedError` so legacy
|
||||
impls keep their conformance — production wiring uses the
|
||||
:class:`PostgresFilesystemStore` impl that ships the SQL.
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"TileMetadataStore.increment_upload_attempts: must be "
|
||||
"implemented by the concrete c6 store; see AZ-320 / v1.3.0"
|
||||
)
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class DescriptorIndex(Protocol):
|
||||
|
||||
@@ -95,11 +95,17 @@ _ZERO_UUID = UUID("00000000-0000-0000-0000-000000000000")
|
||||
_MAX_FDR_FAILURE_MSG_LEN = 512
|
||||
|
||||
# Invariant I-8 of tile_metadata_store.md — forward-only voting transitions.
|
||||
# v1.3.0 (AZ-320) added the two UPLOAD_GIVEUP transitions: a tile that
|
||||
# exhausts the C11 retry decorator's per-tile budget moves from PENDING /
|
||||
# TRUSTED to UPLOAD_GIVEUP. Backward (UPLOAD_GIVEUP → anything) is
|
||||
# forbidden — a human-decision boundary recovered via out-of-band SQL.
|
||||
_ALLOWED_VOTING_TRANSITIONS = frozenset(
|
||||
{
|
||||
(VotingStatus.PENDING, VotingStatus.TRUSTED),
|
||||
(VotingStatus.PENDING, VotingStatus.REJECTED),
|
||||
(VotingStatus.TRUSTED, VotingStatus.REJECTED),
|
||||
(VotingStatus.PENDING, VotingStatus.UPLOAD_GIVEUP),
|
||||
(VotingStatus.TRUSTED, VotingStatus.UPLOAD_GIVEUP),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -645,10 +651,47 @@ class PostgresFilesystemStore:
|
||||
f"mark_uploaded: pool/query error for {tile_id}: {exc}"
|
||||
) from exc
|
||||
|
||||
def increment_upload_attempts(self, tile_id: TileId) -> int:
|
||||
"""Atomic UPDATE...RETURNING — bump ``upload_attempts``, return new count.
|
||||
|
||||
Concurrent invocations on different tile rows do NOT block each
|
||||
other (per-row UPDATE; no surrounding ``FOR UPDATE`` lock). The
|
||||
UPDATE is single-statement so PG's per-row write lock provides
|
||||
all the serialisation needed for AC-7 of AZ-320.
|
||||
"""
|
||||
tile_x, tile_y = self._tile_xy(tile_id)
|
||||
try:
|
||||
with self._connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"UPDATE tiles "
|
||||
"SET upload_attempts = upload_attempts + 1, "
|
||||
"updated_at = now() "
|
||||
"WHERE zoom_level=%s AND tile_x=%s AND tile_y=%s "
|
||||
"RETURNING upload_attempts",
|
||||
(tile_id.zoom_level, tile_x, tile_y),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if row is None:
|
||||
raise TileNotFoundError(
|
||||
f"increment_upload_attempts: no row for {tile_id} "
|
||||
f"(z={tile_id.zoom_level},x={tile_x},y={tile_y})"
|
||||
)
|
||||
new_count = int(row[0])
|
||||
conn.commit()
|
||||
return new_count
|
||||
except psycopg.Error as exc:
|
||||
raise TileMetadataError(
|
||||
f"increment_upload_attempts: pool/query error for "
|
||||
f"{tile_id}: {exc}"
|
||||
) from exc
|
||||
|
||||
def pending_uploads(self) -> list[TileMetadata]:
|
||||
sql = (
|
||||
"SELECT " + ", ".join(_TILE_COLUMNS) + " FROM tiles "
|
||||
"WHERE source = 'onboard_ingest' AND uploaded_at IS NULL "
|
||||
"WHERE source = 'onboard_ingest' "
|
||||
"AND uploaded_at IS NULL "
|
||||
"AND voting_status IS DISTINCT FROM 'upload_giveup' "
|
||||
"ORDER BY capture_timestamp ASC, id ASC"
|
||||
)
|
||||
try:
|
||||
|
||||
@@ -260,6 +260,26 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = {
|
||||
"observed_at_iso",
|
||||
}
|
||||
),
|
||||
# AZ-320 / E-C11: emitted by
|
||||
# ``IdempotentRetryTileUploader._mark_giveup`` when a tile's
|
||||
# ``upload_attempts`` counter (in c6) reaches
|
||||
# ``C11RetryConfig.max_per_tile_attempts``. The decorator moves
|
||||
# the row to ``VotingStatus.UPLOAD_GIVEUP`` (forward-only per
|
||||
# ``tile_metadata_store.md`` v1.3.0 Invariant I-8) so subsequent
|
||||
# ``pending_uploads()`` queries skip it. ``flight_id`` is the
|
||||
# request's optional UUID (None when the operator runs the
|
||||
# cross-flight cleanup); ``tile_id`` is the c6 ``TileId`` string;
|
||||
# ``attempts`` is the per-tile counter at the moment of giveup;
|
||||
# ``last_rejection_reason`` is the parent-suite's free-text
|
||||
# explanation captured from the last ``IngestStatus.REJECTED``.
|
||||
"c11.upload.giveup": frozenset(
|
||||
{
|
||||
"flight_id",
|
||||
"tile_id",
|
||||
"attempts",
|
||||
"last_rejection_reason",
|
||||
}
|
||||
),
|
||||
}
|
||||
|
||||
KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys())
|
||||
|
||||
@@ -36,7 +36,9 @@ from gps_denied_onboard.components.c11_tile_manager import (
|
||||
FlightStateSource,
|
||||
HttpTileDownloader,
|
||||
HttpTileUploader,
|
||||
IdempotentRetryTileUploader,
|
||||
PerFlightKeyManager,
|
||||
TileUploader,
|
||||
)
|
||||
from gps_denied_onboard.config.schema import ConfigError
|
||||
from gps_denied_onboard.fdr_client import FdrClient, make_fdr_client
|
||||
@@ -44,6 +46,7 @@ from gps_denied_onboard.logging import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from gps_denied_onboard.clock import Clock
|
||||
from gps_denied_onboard.clock.interface import Clock as ClockProtocol
|
||||
from gps_denied_onboard.config.schema import Config
|
||||
|
||||
__all__ = [
|
||||
@@ -107,9 +110,10 @@ def build_tile_uploader(
|
||||
tile_metadata_store: Any,
|
||||
flight_state_gate: FlightStateGate,
|
||||
key_manager: PerFlightKeyManager,
|
||||
clock: ClockProtocol | None = None,
|
||||
fdr_client: FdrClient | None = None,
|
||||
) -> HttpTileUploader:
|
||||
"""Construct a wired :class:`HttpTileUploader` (AZ-319).
|
||||
) -> TileUploader:
|
||||
"""Construct a wired :class:`TileUploader` for AZ-319 (+ AZ-320 retry).
|
||||
|
||||
The c6 surfaces (``tile_store``, ``tile_metadata_store``) are
|
||||
consumer-side cuts injected here by the operator-binary
|
||||
@@ -117,6 +121,16 @@ def build_tile_uploader(
|
||||
is also caller-owned: production wiring uses one long-lived
|
||||
:class:`httpx.Client` per process; tests inject
|
||||
``httpx.Client(transport=httpx.MockTransport(...))``.
|
||||
|
||||
By default the bare :class:`HttpTileUploader` is wrapped in the
|
||||
AZ-320 :class:`IdempotentRetryTileUploader` decorator (per-call +
|
||||
per-tile bounded retry). The wrapping is suppressed by setting
|
||||
``config.components['c11_tile_manager'].disable_retry_decorator =
|
||||
True`` — for low-level debugging or test wiring that wants to
|
||||
observe the inner uploader directly. The ``clock`` is required
|
||||
when the decorator is active; if omitted a default
|
||||
:class:`WallClock` is constructed (matches the production
|
||||
operator-binary wiring pattern).
|
||||
"""
|
||||
|
||||
block = config.components.get("c11_tile_manager")
|
||||
@@ -144,7 +158,7 @@ def build_tile_uploader(
|
||||
if fdr_client is None:
|
||||
fdr_client = make_fdr_client(_C11_UPLOADER_PRODUCER_ID, config)
|
||||
logger = get_logger(_C11_UPLOADER_LOGGER)
|
||||
return HttpTileUploader(
|
||||
inner = HttpTileUploader(
|
||||
http_client=http_client,
|
||||
tile_store=tile_store,
|
||||
tile_metadata_store=tile_metadata_store,
|
||||
@@ -155,6 +169,31 @@ def build_tile_uploader(
|
||||
config=block,
|
||||
)
|
||||
|
||||
if block.disable_retry_decorator:
|
||||
logger.info(
|
||||
"AZ-320 retry decorator BYPASSED (config.disable_retry_decorator = true)",
|
||||
extra={
|
||||
"component": "c11_tile_manager.tile_uploader",
|
||||
"kind": "c11.upload.retry.decorator.bypassed",
|
||||
"kv": {"reason": "config_flag"},
|
||||
},
|
||||
)
|
||||
return inner
|
||||
|
||||
if clock is None:
|
||||
from gps_denied_onboard.clock.wall_clock import WallClock
|
||||
|
||||
clock = WallClock()
|
||||
decorator_logger = get_logger("c11_tile_manager.idempotent_retry")
|
||||
return IdempotentRetryTileUploader(
|
||||
inner=inner,
|
||||
tile_metadata_store=tile_metadata_store,
|
||||
fdr_client=fdr_client,
|
||||
logger=decorator_logger,
|
||||
clock=clock,
|
||||
config=block.retry,
|
||||
)
|
||||
|
||||
|
||||
def build_tile_downloader(
|
||||
config: Config,
|
||||
|
||||
Reference in New Issue
Block a user