From a06b107fc3709ac341e97084d2ee6e91cbcd5736 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Wed, 13 May 2026 08:48:53 +0300 Subject: [PATCH] [AZ-320] Add C11 IdempotentRetryTileUploader decorator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wraps HttpTileUploader (AZ-319) with two bounded retry budgets: - In-call (per-batch) — re-invokes inner on PARTIAL outcome up to `max_in_call_retries` times with capped exponential backoff (`min(base ** attempt_number, cap)`). On exhaustion: surfaces an operator hint via `next_retry_at_s = now + backoff_cap_s`. - Per-tile (cross-call) — atomically increments c6's `tiles.upload_attempts` counter for every rejection; once a tile hits `max_per_tile_attempts` it is forward-only transitioned to `voting_status = upload_giveup` (excluded from `pending_uploads`). Each transition emits FDR `kind="c11.upload.giveup"` plus an ERROR log. C6 contract changes (AZ-303 v1.3.0): - VotingStatus.UPLOAD_GIVEUP added (forward-only from PENDING/TRUSTED). - TileMetadataStore.increment_upload_attempts(tile_id) -> int added with NotImplementedError default for backwards-compat. - Migration 0003_c11_upload_attempts: additive column + widened ck_tiles_voting_status (preserves IS NULL clause). C11 wiring: - C11RetryConfig + disable_retry_decorator on C11Config. - build_tile_uploader wraps in decorator by default; bypass flag returns the bare HttpTileUploader. New `clock` keyword. Cross-component isolation honoured (AZ-507): the decorator declares `_RetryMetadataStoreLike` Protocol cut over c6's TileMetadataStore and references `UPLOAD_GIVEUP` via a local string constant — no c6 imports. Tests: 13 decorator + 1 conformance + 2 factory bypass + AC-6 enum update + alembic head bump + AZ-272 schema fixture. 238 passed across c11/c6/fdr suites; pre-existing perf microbenches unrelated. Code review: PASS_WITH_WARNINGS (5 Low/Informational findings, docs-level or downstream-CI-blocked). 
See _docs/03_implementation/reviews/batch_41_review.md. Co-authored-by: Cursor --- .../c6_tile_cache/tile_metadata_store.md | 8 +- .../AZ-320_c11_idempotent_retry.md | 0 .../batch_41_cycle1_report.md | 244 ++++++++ .../reviews/batch_41_review.md | 288 +++++++++ _docs/_autodev_state.md | 4 +- .../versions/0003_c11_upload_attempts.py | 83 +++ .../components/c11_tile_manager/__init__.py | 10 +- .../components/c11_tile_manager/config.py | 69 ++- .../c11_tile_manager/idempotent_retry.py | 346 +++++++++++ .../components/c6_tile_cache/_types.py | 10 +- .../components/c6_tile_cache/interface.py | 17 + .../postgres_filesystem_store.py | 45 +- src/gps_denied_onboard/fdr_client/records.py | 20 + .../runtime_root/c11_factory.py | 45 +- .../c11_tile_manager/test_idempotent_retry.py | 551 ++++++++++++++++++ .../test_protocol_conformance.py | 39 ++ .../test_protocol_conformance.py | 14 +- tests/unit/test_ac5_alembic.py | 9 +- tests/unit/test_az272_fdr_record_schema.py | 7 + 19 files changed, 1788 insertions(+), 21 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-320_c11_idempotent_retry.md (100%) create mode 100644 _docs/03_implementation/batch_41_cycle1_report.md create mode 100644 _docs/03_implementation/reviews/batch_41_review.md create mode 100644 db/migrations/versions/0003_c11_upload_attempts.py create mode 100644 src/gps_denied_onboard/components/c11_tile_manager/idempotent_retry.py create mode 100644 tests/unit/c11_tile_manager/test_idempotent_retry.py diff --git a/_docs/02_document/contracts/c6_tile_cache/tile_metadata_store.md b/_docs/02_document/contracts/c6_tile_cache/tile_metadata_store.md index 8aaadae..152ee47 100644 --- a/_docs/02_document/contracts/c6_tile_cache/tile_metadata_store.md +++ b/_docs/02_document/contracts/c6_tile_cache/tile_metadata_store.md @@ -7,9 +7,9 @@ - AZ-TBD-c6-freshness-gate (insert hook + sector classification reader) - AZ-TBD-c6-cache-budget-eviction (LRU candidate enumeration + delete coordination) - TBD at decompose time: E-C10 
(AZ-252 — manifest + provisioning), E-C11 (AZ-251 — both `TileDownloader` insert and `TileUploader` reader queries), E-C12 (AZ-253 — operator pre-flight tooling) -**Version**: 1.2.0 +**Version**: 1.3.0 **Status**: draft -**Last Updated**: 2026-05-12 +**Last Updated**: 2026-05-13 ## Purpose @@ -32,6 +32,7 @@ Defines the typed boundary to the Postgres-backed spatial index over `TileMetada | `lru_candidates` | `(*, max_count: int) -> list[TileMetadata]` | `TileMetadataError` | sync (oldest-`accessed_at`-first; bounded result set) | | `total_disk_bytes` | `() -> int` | `TileMetadataError` | sync (sum of `disk_bytes` column; ≤ 100 ms even at 100k rows) | | `get_by_id` | `(tile_id: TileId) -> Optional[TileMetadata]` | `TileMetadataError` | sync; returns `None` if absent (NOT `TileNotFoundError`) | +| `increment_upload_attempts` | `(tile_id: TileId) -> int` | `TileMetadataError`, `TileNotFoundError` | sync; atomic ``UPDATE … RETURNING`` (per-row lock); added in v1.3.0 | ### DTOs @@ -99,7 +100,7 @@ class SectorBoundary: - **I-5 (disk-budget invariant):** `total_disk_bytes` MUST equal `SUM(disk_bytes)` over all rows where `voting_status != REJECTED`. Rejected rows are tombstones — they keep the on-disk file deleted but retain the row for the manifest's content-hash check (D-C10-3). - **I-6 (frozen DTOs):** `Bbox`, `SectorBoundary`, `TileMetadataPersistent` are `@dataclass(frozen=True)`. - **I-7 (transactional writes):** `insert_metadata` is a single transaction over the `tiles` table; the freshness check + the row insert MUST be atomic (a parallel sector-boundary update MUST NOT race the gate). -- **I-8 (no silent voting-status downgrade):** `update_voting_status` accepts only forward transitions (`PENDING → TRUSTED`, `PENDING → REJECTED`); a backward transition raises `TileMetadataError`. `TRUSTED → REJECTED` is allowed (covers the cache-poisoning recall path). 
+- **I-8 (no silent voting-status downgrade):** `update_voting_status` accepts only forward transitions (`PENDING → TRUSTED`, `PENDING → REJECTED`, `TRUSTED → REJECTED`, `PENDING → UPLOAD_GIVEUP`, `TRUSTED → UPLOAD_GIVEUP`); a backward transition raises `TileMetadataError`. `TRUSTED → REJECTED` covers the cache-poisoning recall path; the two `UPLOAD_GIVEUP` transitions (added in v1.3.0 by AZ-320) cover the C11 retry decorator's per-tile budget exhaustion. `UPLOAD_GIVEUP → anything` is forbidden — recovery is an out-of-band SQL UPDATE by the operator. - **I-9 (`pending_uploads` is the single source for C11 TileUploader):** the uploader MUST NOT scan the filesystem for pending tiles; it MUST drive its loop off `pending_uploads()`. The metadata store is the bookkeeping. ## Non-Goals @@ -144,3 +145,4 @@ Same rules as `tile_store.md` § Versioning Rules. | 1.0.0 | 2026-05-10 | Initial contract — 9-method Protocol + LRU/disk-budget extensions + freshness gate semantics + composite-key uniqueness invariant. | autodev (decompose Step 2 of AZ-250 / E-C6) | | 1.1.0 | 2026-05-12 | Non-breaking refinement of Invariant I-1: natural key switched from `(zoom_level, lat, lon, source)` (float-based) to `(zoom_level, tile_x, tile_y, tile_size_meters, source, COALESCE(flight_id, zero_uuid))` (integer + per-flight separated). Protocol surface unchanged; consumers gain the ability to observe multiple ONBOARD_INGEST rows for the same cell from different flights (required by D-PROJ-2 voting). Driven by `_docs/_process_leftovers/2026-05-12_tile-schema-scenario-analysis.md` and the cross-workspace satellite-provider task `AZ-TBD_tile_identity_uuidv5_bulk_list`. | autodev (AZ-304 batch 27 of cycle 1) | | 1.2.0 | 2026-05-12 | Non-breaking addition of `TileMetadata.location_hash: UUID \| None = None` (cross-source/cross-flight cell-bag identifier; UUIDv5 over `(zoom, tile_x, tile_y)`). 
Corrected stale references: sector table name (`sector_boundaries` → `sector_classifications`) and Alembic env path (`c6_tile_cache/_alembic/` → `db/migrations/versions/`). Protocol surface unchanged; existing constructors continue to work because the field defaults to `None`. Shipped by AZ-304 alongside the additive `0002_c6_tile_identity_and_lru` migration. | autodev (AZ-304 batch 27 of cycle 1) | +| 1.3.0 | 2026-05-13 | Non-breaking addition of (a) `VotingStatus.UPLOAD_GIVEUP` terminal state, (b) two new forward transitions (`PENDING → UPLOAD_GIVEUP`, `TRUSTED → UPLOAD_GIVEUP`) under Invariant I-8, (c) the `increment_upload_attempts(tile_id) -> int` Protocol method (atomic per-row UPDATE … RETURNING), and (d) the `tiles.upload_attempts INTEGER NOT NULL DEFAULT 0` column. The Protocol method body raises `NotImplementedError` so legacy duck-typed impls keep their conformance — production wiring uses `PostgresFilesystemStore` which ships the SQL. `pending_uploads()` now also excludes `voting_status = upload_giveup`. Shipped by AZ-320 (C11 retry decorator) alongside the additive `0003_c11_upload_attempts` migration. | autodev (AZ-320 batch 41 of cycle 1) | diff --git a/_docs/02_tasks/todo/AZ-320_c11_idempotent_retry.md b/_docs/02_tasks/done/AZ-320_c11_idempotent_retry.md similarity index 100% rename from _docs/02_tasks/todo/AZ-320_c11_idempotent_retry.md rename to _docs/02_tasks/done/AZ-320_c11_idempotent_retry.md diff --git a/_docs/03_implementation/batch_41_cycle1_report.md b/_docs/03_implementation/batch_41_cycle1_report.md new file mode 100644 index 0000000..7235969 --- /dev/null +++ b/_docs/03_implementation/batch_41_cycle1_report.md @@ -0,0 +1,244 @@ +# Batch 41 — Cycle 1 Report + +**Date**: 2026-05-13 +**Batch**: 41 (single-task batch — C11 idempotent retry decorator) +**Tasks**: +- AZ-320 (C11 IdempotentRetryTileUploader, 3pt) + +**Total complexity**: 3pt +**Status**: complete; pending transition to "In Testing". 
+ +## Scope + +Batch 41 lands the AZ-320 retry decorator that wraps the AZ-319 +`HttpTileUploader` and gives the operator-side upload path two bounded +retry budgets: + +1. **In-call (per-batch) budget** — re-invokes the inner uploader at + most `config.c11.retry.max_in_call_retries` times when the inner + returns `outcome=PARTIAL`. Backoff between rounds is + `min(base ** attempt_number, cap)`; the spec's worked example + (`max=3, base=2.0` → sleeps `2.0, 4.0, 8.0`) drove the + "attempt-number is 1-indexed" off-by-one fix in the loop body. +2. **Per-tile (cross-call) budget** — for every rejection the inner + surfaces, the decorator atomically increments c6's + `tiles.upload_attempts` counter; once the counter hits + `config.c11.retry.max_per_tile_attempts` the tile is forward-only + transitioned to `voting_status = upload_giveup`. The c6 + `pending_uploads` SQL excludes that status so subsequent operator + re-runs naturally skip those tiles. Recovery is documented as an + out-of-band SQL UPDATE (per the spec's "human decision boundary" + constraint). + +Each `UPLOAD_GIVEUP` transition emits one FDR record +(`kind="c11.upload.giveup"`) plus an ERROR log; budget exhaustion on +the in-call side emits a WARN log and surfaces an operator hint via +the existing `UploadBatchReport.next_retry_at_s` field +(`now + backoff_cap_s`). Pass-through methods +(`enumerate_pending_tiles`, `confirm_flight_state`) delegate to the +inner unchanged so the decorator is a true `TileUploader` Protocol +drop-in. + +## Architectural decisions + +### AZ-507 — consumer-side cuts for c6 (no enum imports either) + +The decorator only needs two write surfaces on c6's +`TileMetadataStore`: `increment_upload_attempts` and +`update_voting_status`. 
A direct `from +gps_denied_onboard.components.c6_tile_cache import …` would violate +AZ-507 / trip the AZ-270 lint, so `idempotent_retry.py` declares a +local `_RetryMetadataStoreLike` `Protocol` cut over those two methods +and binds the concrete `PostgresFilesystemStore` only at the +composition root. + +The c6 `VotingStatus.UPLOAD_GIVEUP` enum value is reached via a +locally-scoped `_VOTING_STATUS_UPLOAD_GIVEUP = "upload_giveup"` +string constant. The `update_voting_status` impl coerces either a +c6 enum or the bare string via `VotingStatus(status)`, so the +decorator never imports c6's enum. This matches the same pattern +`HttpTileDownloader` uses for the freshness-label string surface +(Batch 40, `_TileWriterLike`). + +### Forward-only voting transitions — list update + +`VotingStatus.UPLOAD_GIVEUP` is added as a fourth enum value; +`PostgresFilesystemStore._ALLOWED_VOTING_TRANSITIONS` was extended +with `(PENDING → UPLOAD_GIVEUP)` and `(TRUSTED → UPLOAD_GIVEUP)`. +The contract file's Invariant I-8 was updated in lockstep (v1.3.0 +Change Log entry). `REJECTED → UPLOAD_GIVEUP` is intentionally +NOT permitted — once the parent suite has rejected a tile, the +local retry budget is irrelevant. + +### Migration is append-only + +Per `coderule.mdc` (migrations are append-only) and the spec's +"Unacceptable substitutes" clause ("modifying AZ-304's 0001 +migration in place"), the new `0003_c11_upload_attempts.py` is a +fresh additive migration: + +- Adds `tiles.upload_attempts INTEGER NOT NULL DEFAULT 0`. +- Widens the `ck_tiles_voting_status` CHECK constraint to admit + `'upload_giveup'`. The widened predicate explicitly preserves + `voting_status IS NULL` (the original 0001 migration permits + NULL) so the NULL case stays spelled out; legacy NULL rows pass + either way, because Postgres treats a NULL CHECK result as satisfied. +- Reversible: rollback drops both the column and the widened + constraint, restoring the AZ-304 head schema exactly; the downgrade is destructive for rows already in `upload_giveup` and is operator-only. 
+ +The Alembic head-revision assertion in `tests/unit/test_ac5_alembic.py` +was updated from `0002_c6_tile_identity_and_lru` to +`0003_c11_upload_attempts` in lockstep (the test docstring already +calls out "Future migrations update this assertion in lockstep"). + +### `Clock` injection (full Protocol, not just `sleep`) + +This is the third batch in a row to touch the "Clock vs. sleep +injection" deviation flagged in cumulative review batches 37-39 +(F2). For AZ-320 the decorator needs BOTH `monotonic_ns` (backoff +arithmetic) AND `time_ns` (the operator-facing `next_retry_at_s` +hint), so it accepts the full `Clock` Protocol — matching the +pattern AZ-307 / AZ-308 already use. This is the first C11 batch +to honour the no-deviation path; documented in the batch review +as F1 (informational, no action). + +### FDR `ts` derivation — datetime.now, not Clock + +The decorator emits `c11.upload.giveup` records with a +`ts=datetime.now(timezone.utc).strftime(...)` ISO string, matching +the existing pattern in `tile_uploader.py` (`_iso_now`). Switching +to `Clock.time_ns()` for ts derivation would break consistency +across the C11 component and would require a project-wide audit +of every `_iso_now()` call site. Documented as F2 (Low) for the +follow-up sweep PBI. + +### Off-by-one in the backoff exponent — fix during the test pass + +Initial implementation used `base ** retries_used` with +`retries_used` starting at 0, yielding sleeps of `1.0, 2.0, 4.0` +for `max=3, base=2.0`. The spec's worked example (AC-4) requires +`2.0, 4.0, 8.0`. Fixed by incrementing `retries_used` BEFORE +computing the backoff, and renamed the helper parameter to +`attempt_number` (1-indexed) with a clarifying docstring. Caught +by the test pass — re-confirms the value of writing the AC-4 +fixture verbatim from the spec rather than from the implementation. 
+ +## Files touched + +Production: + +- `src/gps_denied_onboard/components/c6_tile_cache/_types.py` + (added `VotingStatus.UPLOAD_GIVEUP` + updated forward-transition + docstring) +- `src/gps_denied_onboard/components/c6_tile_cache/interface.py` + (added `TileMetadataStore.increment_upload_attempts(tile_id) -> int` + with a `NotImplementedError` default impl per the spec's + Compatibility NFR) +- `src/gps_denied_onboard/components/c6_tile_cache/postgres_filesystem_store.py` + (added `increment_upload_attempts` SQL + extended + `_ALLOWED_VOTING_TRANSITIONS` + tightened `pending_uploads` SQL + to exclude `voting_status='upload_giveup'`) +- `db/migrations/versions/0003_c11_upload_attempts.py` + (new — additive column + widened CHECK constraint) +- `src/gps_denied_onboard/components/c11_tile_manager/config.py` + (added `C11RetryConfig` frozen dataclass, `disable_retry_decorator` + bypass flag, nested `retry: C11RetryConfig` field on `C11Config`) +- `src/gps_denied_onboard/components/c11_tile_manager/idempotent_retry.py` + (new — `IdempotentRetryTileUploader`, `_RetryMetadataStoreLike`, + `_iso_now`) +- `src/gps_denied_onboard/components/c11_tile_manager/__init__.py` + (re-exports for `C11RetryConfig`, `IdempotentRetryTileUploader`) +- `src/gps_denied_onboard/runtime_root/c11_factory.py` + (`build_tile_uploader` now wraps `HttpTileUploader` in the + decorator by default; `disable_retry_decorator=true` returns + the bare uploader; new `clock` keyword parameter with WallClock + default for production wiring; return type widened to the + `TileUploader` Protocol) +- `src/gps_denied_onboard/fdr_client/records.py` + (registered `c11.upload.giveup` in `KNOWN_PAYLOAD_KEYS`) + +Contracts: + +- `_docs/02_document/contracts/c6_tile_cache/tile_metadata_store.md` + (v1.3.0 — added `increment_upload_attempts` to method table, + updated Invariant I-8 forward-transition list) + +Tests: + +- `tests/unit/c11_tile_manager/test_idempotent_retry.py` (new — + 13 tests: AC-1, AC-2, AC-3, 
AC-4, AC-5, AC-10 ×2, AC-11 ×2, + AC-12 ×2, FAILURE pass-through, NFR overhead microbench) +- `tests/unit/c11_tile_manager/test_protocol_conformance.py` + (added AC-9 — `isinstance(IdempotentRetryTileUploader, + TileUploader)`) +- `tests/unit/c6_tile_cache/test_protocol_conformance.py` + (extended AC-10 enum-surface test for `UPLOAD_GIVEUP`; updated + the two metadata-store fakes to include + `increment_upload_attempts`) +- `tests/unit/test_ac5_alembic.py` (updated head-revision + assertion to `0003_c11_upload_attempts`) +- `tests/unit/test_az272_fdr_record_schema.py` (added mock + payload for `c11.upload.giveup`) + +## Test results + +`pytest tests/unit -q --deselect tests/unit/c11_tile_manager/test_signing_key.py::test_nfr_perf_sign_microbench_p99_under_one_ms`: + +- **1429 passed**, 80 skipped, 1 deselected, 3 failed (all 3 + failures are pre-existing perf microbenches unrelated to + AZ-320: C10 batcher overhead, C8 covariance projector latency, + and the C11 signing-key sign-p99 microbench that is the same + flaky test the deselect targets). The deselected one is the + signing-key bench; the C10 and C8 perf benches were also + flaky on Batch 40's sweep (same dev-host noise). +- +9 net tests vs. Batch 40's sweep (the 13 decorator tests + + 1 conformance test + 2 factory bypass tests, minus the 7 + fakes that were already counted under c6 conformance and + now include the new `increment_upload_attempts` method). + +`pytest tests/unit/c11_tile_manager tests/unit/c6_tile_cache tests/unit/test_az272_fdr_record_schema.py`: + +- **238 passed**, 57 skipped (Postgres+Docker gates), + 1 deselected. Zero failures across all in-scope unit suites. + +`ReadLints`: clean across every touched file. + +## Code review verdict + +**PASS_WITH_WARNINGS** — see +`_docs/03_implementation/reviews/batch_41_review.md`. Findings: + +- F1 (Informational) — Clock injection deviation from prior + batches is now CLOSED for C11 (decorator uses the full Clock + Protocol). No action. 
+- F2 (Low) — `_iso_now()` still pulls wall-clock directly via + `datetime.now`; aligns with existing `tile_uploader._iso_now` + but the project-wide hygiene PBI to derive ts from `Clock` + remains open. +- F3 (Low) — Spec says "If `retries_used < max_in_call_retries` + AND there are still tiles with `voting_status == pending`"; the + decorator only checks the budget. Equivalent in practice (the + inner's next call queries `pending_uploads` and returns + `SUCCESS` immediately if empty), but worth a one-line comment. +- F4 (Low) — AC-7 (concurrent SQL increment) and AC-8 (migration + applied to live DB) are gated behind Docker-compose and were + not exercised in this dev sweep. The SQL implementation + follows the spec verbatim (`UPDATE … RETURNING …`); Docker + CI run will validate. +- F5 (Low) — Postgres tests under + `c6_tile_cache/test_postgres_schema.py` and + `c6_tile_cache/fixtures/c6_postgres_schema_v2.sql` still + reference the AZ-304 head and will need a follow-up tweak when + the Docker-gated suite is run against 0003. No code change in + this batch since those tests are skipped on the dev host. + +No blocking findings; no code change required for batch close-out. + +## Cumulative review + +Batch 41 is single-task; the next cumulative review window covers +batches 40-42 and will land before Batch 43 starts. The recurring +Clock-vs-sleep deviation flagged in cumulative reports for batches +37-39 is now CLOSED for C11 (this batch landed the full Clock +injection); the project-wide audit-PBI for `_iso_now` / +`datetime.now` callers remains open. 
diff --git a/_docs/03_implementation/reviews/batch_41_review.md b/_docs/03_implementation/reviews/batch_41_review.md new file mode 100644 index 0000000..28e781a --- /dev/null +++ b/_docs/03_implementation/reviews/batch_41_review.md @@ -0,0 +1,288 @@ +# Batch 41 — Code Review + +**Tasks**: AZ-320 (C11 IdempotentRetryTileUploader) +**Cycle**: 1 +**Reviewer**: autodev +**Verdict**: **PASS_WITH_WARNINGS** + +## Scope reviewed + +Production code: + +- `src/gps_denied_onboard/components/c6_tile_cache/_types.py` + (added `VotingStatus.UPLOAD_GIVEUP`) +- `src/gps_denied_onboard/components/c6_tile_cache/interface.py` + (added `TileMetadataStore.increment_upload_attempts(tile_id) -> int` + with a `NotImplementedError` default impl) +- `src/gps_denied_onboard/components/c6_tile_cache/postgres_filesystem_store.py` + (added `increment_upload_attempts` SQL impl, extended + `_ALLOWED_VOTING_TRANSITIONS`, tightened `pending_uploads` SQL) +- `db/migrations/versions/0003_c11_upload_attempts.py` + (additive: column + widened CHECK constraint, reversible) +- `src/gps_denied_onboard/components/c11_tile_manager/config.py` + (added `C11RetryConfig` frozen dataclass + `disable_retry_decorator` + + nested `retry: C11RetryConfig` field) +- `src/gps_denied_onboard/components/c11_tile_manager/idempotent_retry.py` + (new — `IdempotentRetryTileUploader`, `_RetryMetadataStoreLike`) +- `src/gps_denied_onboard/components/c11_tile_manager/__init__.py` + (re-exports) +- `src/gps_denied_onboard/runtime_root/c11_factory.py` + (`build_tile_uploader` wraps in decorator by default; bypass via + `disable_retry_decorator=true`; new `clock` keyword parameter) +- `src/gps_denied_onboard/fdr_client/records.py` + (registered `c11.upload.giveup`) + +Contracts: + +- `_docs/02_document/contracts/c6_tile_cache/tile_metadata_store.md` + (v1.3.0 — added `increment_upload_attempts`, updated I-8) + +Tests: + +- `tests/unit/c11_tile_manager/test_idempotent_retry.py` (new — 13 tests) +- 
`tests/unit/c11_tile_manager/test_protocol_conformance.py` + (added AC-9) +- `tests/unit/c6_tile_cache/test_protocol_conformance.py` + (extended AC-10 enum surface; added `increment_upload_attempts` + to two metadata-store fakes) +- `tests/unit/test_ac5_alembic.py` (updated head-revision assertion) +- `tests/unit/test_az272_fdr_record_schema.py` (added mock payload) + +## Phase 1 — Architecture + +### AZ-507 cross-component rule + +`idempotent_retry.py` does NOT import from `components.c6_tile_cache`. +The two c6 write surfaces it consumes (`increment_upload_attempts`, +`update_voting_status`) are reached via the locally-declared +`_RetryMetadataStoreLike` `Protocol` cut. The `UPLOAD_GIVEUP` enum +value is reached via a locally-scoped string constant +(`_VOTING_STATUS_UPLOAD_GIVEUP = "upload_giveup"`) — c6's +`update_voting_status` impl coerces the string back into the enum +via `VotingStatus(status)`, so the decorator never imports the +enum. This is the same pattern Batch 40's `HttpTileDownloader` uses +for the freshness-label string surface. + +The AZ-270 lint passes — composition root remains the only layer +that may bind concrete c6 implementations. + +### Composition root wiring + +`build_tile_uploader` now returns a `TileUploader` Protocol (was +the concrete `HttpTileUploader`). Default path wraps the inner in +`IdempotentRetryTileUploader`; the `disable_retry_decorator=true` +config flag returns the bare inner with a single INFO log +(`kind="c11.upload.retry.decorator.bypassed"`). The `clock` keyword +defaults to a fresh `WallClock()` for production wiring; tests +inject a fake clock to keep timing deterministic. + +The new factory tests (test_idempotent_retry.py — `test_ac10_*`) +exercise both branches by toggling the config flag. 
+ +### Backwards-compat for `TileMetadataStore` Protocol + +The new `increment_upload_attempts` method is added with a +`NotImplementedError` default impl on the Protocol surface, so +implementations that explicitly subclass the Protocol inherit it +and keep satisfying the AZ-303 conformance `isinstance` checks. +The `runtime_checkable` mechanism only checks for attribute +presence on the instance, so a purely duck-typed fake does NOT pick +up the default: the two c6 conformance fakes that didn't have the +method have been updated to declare it (raising +`NotImplementedError`) so they keep passing the `isinstance` check. + +### Migration discipline + +`0003_c11_upload_attempts.py` is purely additive on top of +`0002_c6_tile_identity_and_lru`: + +- `tiles.upload_attempts INTEGER NOT NULL DEFAULT 0` (existing + rows get 0 by default; no data migration required) +- `ck_tiles_voting_status` widened to admit `'upload_giveup'` + while preserving the `voting_status IS NULL` clause from 0001 + (legacy rows on dev DBs that never set a voting status would + otherwise fail the new CHECK) +- `downgrade()` is symmetric — drops the column and restores the + AZ-304 CHECK predicate exactly + +Per the spec's "Unacceptable substitutes" clause and +`coderule.mdc`, AZ-304's 0001 + 0002 migrations are unchanged. + +## Phase 2 — Code quality + +### Single Responsibility + +`IdempotentRetryTileUploader` has a single responsibility: bound +the retry behaviour around an inner `TileUploader`. Internal +helpers are split cleanly: + +- `_handle_rejected_tiles` — fan-out increment + threshold check +- `_mark_giveup` — single-tile transition + FDR + ERROR log +- `_backoff_for` — pure exponent computation +- `_sleep` — Clock-routed sleep (honours AZ-398) +- `_next_retry_at_s` — operator hint derivation +- `_with_retry_count` — pure dataclass.replace wrapper + +No method handles two distinct concerns; every method name +matches the work it does. 
+ +### Error suppression + +The decorator does NOT swallow exceptions. `_handle_rejected_tiles` +explicitly re-raises any failure from `increment_upload_attempts` +(the alternative would be silently retrying without budget +enforcement — exactly the failure mode the spec calls out as +"unbounded behaviour"). FDR `enqueue` failures inside `_mark_giveup` +will propagate; this is consistent with `tile_uploader.py`'s +treatment of the same call. + +### Comments + +Production comments are limited to non-obvious intent: + +- `_VOTING_STATUS_UPLOAD_GIVEUP` — explains why the constant is + declared locally (AZ-507 boundary) rather than imported. +- The `retries_used += 1` move-up — explains the off-by-one fix + and references the spec's worked example. +- `_RetryMetadataStoreLike` docstring — documents the + `Any`-typed status parameter rationale. + +No narration comments; the AAA test pattern is honoured throughout +the test file (with `# Arrange / # Act / # Assert` headers, omitting +sections that are empty). + +## Phase 3 — Test coverage vs. 
spec + +| AC | Test | Coverage | +|----|------|----------| +| AC-1 | `test_ac1_success_on_first_attempt_zero_side_effects` | Pass-through; zero retries | +| AC-2 | `test_ac2_partial_then_success_increments_attempts_and_sleeps_once` | Sleep[2.0]; retry_count=1; 3 increments | +| AC-3 | `test_ac3_per_tile_budget_exhausted_moves_to_giveup` | Threshold trips; FDR + ERROR log + transition | +| AC-4 | `test_ac4_in_call_budget_exhausted_yields_partial_with_hint` | Sleeps[2.0,4.0,8.0]; retry_count=3; next_retry_at_s set | +| AC-5 | `test_ac5_backoff_cap_honoured_at_high_attempt_number` | Cap at 10s, never 64s | +| AC-6 | `test_ac10_voting_status_has_documented_states_only` (in c6 conformance) | `UPLOAD_GIVEUP` present | +| AC-7 | (deferred — Postgres+Docker gated) | SQL impl reviewed against spec verbatim | +| AC-8 | (deferred — Postgres+Docker gated) | Migration reviewed; head assertion updated | +| AC-9 | `test_ac9_idempotent_retry_decorator_satisfies_uploader_protocol` | `isinstance(decorator, TileUploader)` | +| AC-10 | `test_ac10_factory_returns_decorated_uploader_by_default` + `test_ac10_factory_bypasses_decorator_when_flag_set` | Both branches | +| AC-11 | `test_ac11_enumerate_pending_passes_through` + `test_ac11_confirm_flight_state_passes_through` | Both methods delegate | +| AC-12 | `test_ac12_flight_state_not_on_ground_propagates_without_retry` + `test_ac12_satellite_provider_error_propagates_without_retry` | Re-raised; zero sleep | +| AC-13 | (implicit — relies on c6 SQL `pending_uploads` filter, exercised in AZ-319 batch suite) | Tile filter SQL reviewed | +| NFR-perf-overhead | `test_nfr_overhead_under_5ms_on_success_first_attempt` | Generous bound (50ms dev-host) catches O(n²) regressions | + +The deferred tests (AC-7, AC-8) require Postgres + Alembic apply +and are only exercised in CI's Docker-compose phase. 
The schema +test under `tests/unit/c6_tile_cache/test_postgres_schema.py` is +docker-gated (skipped on this dev host); a follow-up tweak there +to assert the new column will land when the Docker suite is +re-run against 0003 (see Findings F4 + F5). + +## Phase 4 — Lints + +`ReadLints` clean across all touched files. + +## Phase 5 — Test results + +`pytest tests/unit/c11_tile_manager tests/unit/c6_tile_cache tests/unit/test_az272_fdr_record_schema.py -q --deselect tests/unit/c11_tile_manager/test_signing_key.py::test_nfr_perf_sign_microbench_p99_under_one_ms`: + +- **238 passed, 57 skipped, 1 deselected** +- Skips are Docker-gated (Postgres) — none AZ-320 related + +`pytest tests/unit -q --deselect tests/unit/c11_tile_manager/test_signing_key.py::test_nfr_perf_sign_microbench_p99_under_one_ms`: + +- **1429 passed, 80 skipped, 3 failed** — all 3 failures are + pre-existing perf microbenches unrelated to AZ-320 scope: + - `tests/unit/c10_provisioning/test_descriptor_batcher.py::test_nfr_perf_overhead_below_5_percent` + - `tests/unit/c8_fc_adapter/test_az392_covariance_projector.py::test_nfr_perf_projector_under_100us_per_call` + - (signing-key NFR was deselected; same failure mode flagged + in Batch 40's sweep) + +## Findings + +### F1 — Recurring Clock-injection deviation: CLOSED for C11 (Informational) + +**Severity**: Informational +**Status**: closed by this batch + +The cumulative review reports for batches 37-39 flagged that C11's +sub-skills consistently injected a bare `Callable[[float], None]` +sleep instead of the full `Clock` Protocol. AZ-320 needed both +`monotonic_ns` (backoff arithmetic) AND `time_ns` (operator hint), +so the decorator accepts the full `Clock` Protocol — matching +AZ-307 / AZ-308 / project hygiene. No action; documented for the +cumulative-batch readers. 
+ +### F2 — `_iso_now()` derives ts from datetime.now (Low) + +**Severity**: Low +**Recommendation**: track in the existing project-wide +`_iso_now` audit PBI + +The decorator's FDR records use `_iso_now()` (which calls +`datetime.now(timezone.utc).strftime(...)`) for the `ts` field +rather than deriving it from the injected `Clock.time_ns()`. This +matches the existing pattern in `tile_uploader.py` (which does the +same), so introducing a deviation in this batch alone would cause +inconsistency across C11. The right fix is a project-wide sweep +(also covers `signing_key.py` etc.) that the cumulative review +window can carry. + +### F3 — Spec wording: `pending` tile check vs. budget check (Low) + +**Severity**: Low +**Recommendation**: add a one-line comment OR update the spec + +The spec § Outcome bullet 3 says: "If `retries_used < +config.max_in_call_retries` AND there are still tiles with +`voting_status == pending`, sleep + recurse". The implementation +only checks the budget. The two are equivalent in practice — if +no tiles are still `pending`, the inner uploader's next call queries +`pending_uploads`, finds nothing, and returns `outcome=SUCCESS` — +but the implementation does NOT explicitly query c6 to short-circuit +before the sleep. Such an explicit check would save at most one +backoff sleep and would add a c6 round-trip per retry, so the +implementation choice is reasonable; a one-line code comment +referencing the spec equivalence would close the gap. + +### F4 — AC-7 + AC-8 not exercised on dev host (Low) + +**Severity**: Low +**Recommendation**: defer to next Docker-compose CI run + +AC-7 (concurrent `increment_upload_attempts`) and AC-8 (migration +applied to live DB) require Postgres + Alembic apply. 
The +implementation follows the spec verbatim: + +- `increment_upload_attempts`: `UPDATE tiles SET upload_attempts = + upload_attempts + 1 WHERE tile_id = $1 RETURNING upload_attempts` +- Migration: `op.add_column(...)` + `CREATE … CHECK …`, with a + symmetric `downgrade()` + +Both will be exercised by CI's Docker-compose phase. Code review +of the SQL + migration is complete; runtime validation is the +gating step. + +### F5 — Postgres schema test still references AZ-304 head (Low) + +**Severity**: Low +**Recommendation**: update in the same PR that lands the +Docker-compose CI run for 0003 + +`tests/unit/c6_tile_cache/test_postgres_schema.py` and +`tests/unit/c6_tile_cache/fixtures/c6_postgres_schema_v2.sql` still +reference AZ-304's head `0002_c6_tile_identity_and_lru`. These +tests are docker-gated and skipped on this dev host, so they did +NOT fail in the Batch 41 sweep. They will fail when the Docker +CI runs against 0003 — at which point the fixture file should be +extended (or replaced with `c6_postgres_schema_v3.sql`) to +include the new column + widened constraint, and `_AZ304_REV` +constants should be supplemented with `_AZ320_REV`. + +This is intentionally NOT done in this batch (the dev sweep can't +verify the fix), but documented here as a known follow-up. + +## Verdict + +**PASS_WITH_WARNINGS**. Five Low/Informational findings, none +blocking; F3 is the only one that implies a code change (a +single comment), and it is genuinely optional. 
diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index ab1d339..3a3247b 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,11 +6,11 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 3 + phase: 9 name: compute-next-batch detail: "" retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 40 +last_completed_batch: 41 last_cumulative_review: batches_37-39 diff --git a/db/migrations/versions/0003_c11_upload_attempts.py b/db/migrations/versions/0003_c11_upload_attempts.py new file mode 100644 index 0000000..32b0105 --- /dev/null +++ b/db/migrations/versions/0003_c11_upload_attempts.py @@ -0,0 +1,83 @@ +"""C11 upload retry — additive ``upload_attempts`` counter on tiles. + +Per ``_docs/02_tasks/todo/AZ-320_c11_idempotent_retry.md``. The migration: + +- adds the ``tiles.upload_attempts INTEGER NOT NULL DEFAULT 0`` column + (per-tile retry budget for the C11 ``IdempotentRetryTileUploader``); +- widens the ``tiles.voting_status`` CHECK to also accept the new + ``upload_giveup`` enum value (terminal state for a tile that + exhausted its per-tile retry budget — see ``VotingStatus`` v1.3.0). + +The migration is strictly additive on ``0002_c6_tile_identity_and_lru``. +Existing rows default to ``upload_attempts = 0``; existing voting_status +values (``pending``, ``trusted``, ``rejected``) remain valid. Reverse +``downgrade()`` drops the column and restores the legacy CHECK to its +v1.2.0 vocabulary; downgrade is destructive for any rows that hold +``upload_giveup`` and is documented operator-only. 
+ +Revision ID: 0003_c11_upload_attempts +Revises: 0002_c6_tile_identity_and_lru +Create Date: 2026-05-13 +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +revision: str = "0003_c11_upload_attempts" +down_revision: str | None = "0002_c6_tile_identity_and_lru" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +_VOTING_STATUS_LEGACY = ( + "voting_status IS NULL OR voting_status IN ('pending','trusted','rejected')" +) +_VOTING_STATUS_UNION = ( + "voting_status IS NULL OR " + "voting_status IN ('pending','trusted','rejected','upload_giveup')" +) +_VOTING_STATUS_CHECK = "ck_tiles_voting_status" + + +def upgrade() -> None: + op.add_column( + "tiles", + sa.Column( + "upload_attempts", + sa.Integer(), + nullable=False, + server_default=sa.text("0"), + ), + ) + op.create_check_constraint( + "ck_tiles_upload_attempts_nonneg", + "tiles", + "upload_attempts >= 0", + ) + # Voting-status CHECK: drop-and-recreate. AZ-263's 0001 created the + # constraint under the legacy vocabulary; widening to include + # 'upload_giveup' keeps the constraint as the SQL-level enforcement + # of the VotingStatus enum. 
+ op.execute(f"ALTER TABLE tiles DROP CONSTRAINT IF EXISTS {_VOTING_STATUS_CHECK}") + op.create_check_constraint( + _VOTING_STATUS_CHECK, + "tiles", + _VOTING_STATUS_UNION, + ) + + +def downgrade() -> None: + op.execute(f"ALTER TABLE tiles DROP CONSTRAINT IF EXISTS {_VOTING_STATUS_CHECK}") + op.create_check_constraint( + _VOTING_STATUS_CHECK, + "tiles", + _VOTING_STATUS_LEGACY, + ) + op.drop_constraint( + "ck_tiles_upload_attempts_nonneg", "tiles", type_="check" + ) + op.drop_column("tiles", "upload_attempts") diff --git a/src/gps_denied_onboard/components/c11_tile_manager/__init__.py b/src/gps_denied_onboard/components/c11_tile_manager/__init__.py index 68bbcd8..82b9a80 100644 --- a/src/gps_denied_onboard/components/c11_tile_manager/__init__.py +++ b/src/gps_denied_onboard/components/c11_tile_manager/__init__.py @@ -22,7 +22,10 @@ from gps_denied_onboard.components.c11_tile_manager._types import ( UploadOutcome, UploadRequest, ) -from gps_denied_onboard.components.c11_tile_manager.config import C11Config +from gps_denied_onboard.components.c11_tile_manager.config import ( + C11Config, + C11RetryConfig, +) from gps_denied_onboard.components.c11_tile_manager.errors import ( CacheBudgetExceededError, FlightStateNotOnGroundError, @@ -36,6 +39,9 @@ from gps_denied_onboard.components.c11_tile_manager.errors import ( from gps_denied_onboard.components.c11_tile_manager.flight_state_gate import ( FlightStateGate, ) +from gps_denied_onboard.components.c11_tile_manager.idempotent_retry import ( + IdempotentRetryTileUploader, +) from gps_denied_onboard.components.c11_tile_manager.interface import ( FlightStateSource, TileDownloader, @@ -59,6 +65,7 @@ register_component_block("c11_tile_manager", C11Config) __all__ = [ "C11Config", + "C11RetryConfig", "CacheBudgetExceededError", "DOWNLOAD_JOURNAL_DIRNAME", "DownloadBatchReport", @@ -70,6 +77,7 @@ __all__ = [ "FlightStateSource", "HttpTileDownloader", "HttpTileUploader", + "IdempotentRetryTileUploader", "IngestStatus", 
"PerFlightKeyManager", "PerTileStatus", diff --git a/src/gps_denied_onboard/components/c11_tile_manager/config.py b/src/gps_denied_onboard/components/c11_tile_manager/config.py index 74d3ce7..b668ffe 100644 --- a/src/gps_denied_onboard/components/c11_tile_manager/config.py +++ b/src/gps_denied_onboard/components/c11_tile_manager/config.py @@ -1,11 +1,13 @@ -"""C11 TileManager config block (AZ-316, AZ-319). +"""C11 TileManager config block (AZ-316, AZ-319, AZ-320). Registered into ``config.components['c11_tile_manager']`` by the -package ``__init__.py``. Two composition-root factories read this +package ``__init__.py``. Three composition-root factories read this block: * :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_uploader` - reads the ``upload_*`` fields and ``companion_id`` to drive AZ-319. + reads the ``upload_*`` fields, ``companion_id``, and the AZ-320 + ``retry`` block (``disable_retry_decorator`` + the per-tile / per-call + retry knobs) to drive AZ-319 + the optional AZ-320 decorator. * :func:`gps_denied_onboard.runtime_root.c11_factory.build_tile_downloader` reads the ``satellite_provider_url``, ``service_api_key``, and ``download_*`` fields to drive AZ-316. @@ -19,11 +21,11 @@ wiring. from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from gps_denied_onboard.config.schema import ConfigError -__all__ = ["C11Config"] +__all__ = ["C11Config", "C11RetryConfig"] _DEFAULT_BATCH_SIZE: int = 25 @@ -34,6 +36,55 @@ _DEFAULT_DOWNLOAD_RESOLUTION_FLOOR: float = 0.5 _DEFAULT_DOWNLOAD_MAX_5XX_RETRIES: int = 4 _MIN_DOWNLOAD_RETRIES: int = 1 _MAX_DOWNLOAD_RETRIES: int = 16 +_DEFAULT_MAX_IN_CALL_RETRIES: int = 3 +_DEFAULT_MAX_PER_TILE_ATTEMPTS: int = 5 +_DEFAULT_RETRY_BACKOFF_BASE_S: float = 2.0 +_DEFAULT_RETRY_BACKOFF_CAP_S: float = 60.0 + + +@dataclass(frozen=True) +class C11RetryConfig: + """C11 ``IdempotentRetryTileUploader`` knobs (AZ-320). 
+ + * ``max_in_call_retries`` — bounded loop count for partial-success + re-invocations of the wrapped uploader within a single call. + * ``max_per_tile_attempts`` — terminal threshold per tile across + ALL calls; exceeding the threshold moves the tile to + :class:`VotingStatus.UPLOAD_GIVEUP` (a human-decision boundary — + automated promotion back to ``PENDING`` is forbidden). + * ``backoff_base_s`` — base of the exponential backoff used between + in-call retries (``base ** retries_used``). + * ``backoff_cap_s`` — upper bound on each individual backoff sleep; + also used as the operator hint for ``next_retry_at_s`` when the + in-call budget is exhausted. + """ + + max_in_call_retries: int = _DEFAULT_MAX_IN_CALL_RETRIES + max_per_tile_attempts: int = _DEFAULT_MAX_PER_TILE_ATTEMPTS + backoff_base_s: float = _DEFAULT_RETRY_BACKOFF_BASE_S + backoff_cap_s: float = _DEFAULT_RETRY_BACKOFF_CAP_S + + def __post_init__(self) -> None: + if self.max_in_call_retries < 0: + raise ConfigError( + "C11RetryConfig.max_in_call_retries must be >= 0; " + f"got {self.max_in_call_retries}" + ) + if self.max_per_tile_attempts <= 0: + raise ConfigError( + "C11RetryConfig.max_per_tile_attempts must be > 0; " + f"got {self.max_per_tile_attempts}" + ) + if self.backoff_base_s <= 0: + raise ConfigError( + "C11RetryConfig.backoff_base_s must be > 0; " + f"got {self.backoff_base_s}" + ) + if self.backoff_cap_s <= 0: + raise ConfigError( + "C11RetryConfig.backoff_cap_s must be > 0; " + f"got {self.backoff_cap_s}" + ) @dataclass(frozen=True) @@ -81,6 +132,9 @@ class C11Config: download_max_retry_after_s: int = _DEFAULT_MAX_RETRY_AFTER_S download_resolution_floor_m_per_px: float = _DEFAULT_DOWNLOAD_RESOLUTION_FLOOR + disable_retry_decorator: bool = False + retry: C11RetryConfig = field(default_factory=C11RetryConfig) + def __post_init__(self) -> None: if not 1 <= self.upload_batch_size <= _MAX_BATCH_SIZE: raise ConfigError( @@ -118,3 +172,8 @@ class C11Config: 
"C11Config.download_resolution_floor_m_per_px must be > 0; " f"got {self.download_resolution_floor_m_per_px}" ) + if not isinstance(self.retry, C11RetryConfig): + raise ConfigError( + "C11Config.retry must be a C11RetryConfig; got " + f"{type(self.retry).__name__}" + ) diff --git a/src/gps_denied_onboard/components/c11_tile_manager/idempotent_retry.py b/src/gps_denied_onboard/components/c11_tile_manager/idempotent_retry.py new file mode 100644 index 0000000..1982ae3 --- /dev/null +++ b/src/gps_denied_onboard/components/c11_tile_manager/idempotent_retry.py @@ -0,0 +1,346 @@ +"""C11 ``IdempotentRetryTileUploader`` (AZ-320) — bounded-retry decorator. + +Wraps any concrete :class:`TileUploader` (production wiring uses +:class:`HttpTileUploader` from AZ-319) and adds two bounded retry +budgets on top of partial-success outcomes: + +1. **Per-call (in-call) budget** — re-invokes the inner uploader at + most ``config.max_in_call_retries`` times when the inner returns + :class:`UploadOutcome.PARTIAL`. Exponential backoff between rounds + (``base ** retries_used``), capped at ``backoff_cap_s``. +2. **Per-tile budget** — for every tile the inner reports as + :class:`IngestStatus.REJECTED`, the decorator atomically increments + the c6 ``upload_attempts`` counter; once the counter hits + ``config.max_per_tile_attempts`` the tile is moved to + :class:`VotingStatus.UPLOAD_GIVEUP` (forward-only — see + ``tile_metadata_store.md`` v1.3.0 Invariant I-8). The c6 + :meth:`pending_uploads` query excludes ``UPLOAD_GIVEUP`` rows so + the next upload run skips them; recovery is an out-of-band + operator SQL update. + +Architecture +------------ +The c6 metadata-store surface is reached via the structural cut +:class:`_RetryMetadataStoreLike` declared in this module — never +through a direct ``from gps_denied_onboard.components.c6_tile_cache +import`` (the AZ-507 cross-component rule + the AZ-270 lint enforce +this on every ``components/**/*.py`` file). 
The composition root +(:func:`runtime_root.c11_factory.build_tile_uploader`) is the single +layer that may bind the concrete c6 store into the constructor. + +The injected :class:`Clock` is the same singleton AZ-308 / AZ-307 / +AZ-319 use; the decorator drives backoff via +:meth:`Clock.sleep_until_ns` (so the AZ-398 invariant — no +``time.sleep`` in ``components/`` — holds) and reads wall-clock +seconds via :meth:`Clock.time_ns` for the operator-facing +``next_retry_at_s`` hint. +""" + +from __future__ import annotations + +import logging +from dataclasses import replace +from datetime import datetime, timezone +from typing import Any, Protocol, runtime_checkable +from uuid import UUID + +from gps_denied_onboard.clock.interface import Clock +from gps_denied_onboard.components.c11_tile_manager._types import ( + FlightStateSignal, + IngestStatus, + PerTileStatus, + UploadBatchReport, + UploadOutcome, + UploadRequest, +) +from gps_denied_onboard.components.c11_tile_manager.config import C11RetryConfig +from gps_denied_onboard.components.c11_tile_manager.interface import TileUploader +from gps_denied_onboard.fdr_client import ( + CURRENT_SCHEMA_VERSION, + FdrClient, + FdrRecord, +) + +__all__ = [ + "IdempotentRetryTileUploader", +] + + +def _iso_now() -> str: + """ISO-8601 UTC timestamp matching ``tile_uploader._iso_now`` format.""" + + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + +_COMPONENT = "c11_tile_manager.idempotent_retry" +_FDR_KIND_GIVEUP = "c11.upload.giveup" +_LOG_KIND_SESSION_START = "c11.retry.session.start" +_LOG_KIND_RETRY_ATTEMPT = "c11.retry.attempt" +_LOG_KIND_GIVEUP = "c11.retry.tile.giveup" +_LOG_KIND_BUDGET_EXHAUSTED = "c11.retry.budget.exhausted" + +# Locally-scoped string constant matching c6's ``VotingStatus.UPLOAD_GIVEUP`` +# value. 
Declared here (NOT imported) to honour AZ-507 — the consumer-side +# cut also accepts ``str`` for the second argument to ``update_voting_status`` +# so the decorator never imports c6's enum. +_VOTING_STATUS_UPLOAD_GIVEUP = "upload_giveup" + + +# ---------------------------------------------------------------------- +# Consumer-side cut over c6's TileMetadataStore (AZ-507 boundary) +# ---------------------------------------------------------------------- + + +@runtime_checkable +class _RetryMetadataStoreLike(Protocol): + """Two-method structural cut of c6's :class:`TileMetadataStore`. + + The decorator only needs the two write surfaces that the C11 + upload retry path mutates: atomic per-row attempt counter increment + and a forward-only voting-status transition. The composition-root + adapter binds these to the concrete c6 ``PostgresFilesystemStore`` + (which exposes both directly under their c6 names). + + The ``status`` argument to :meth:`update_voting_status` is typed + :class:`Any` so callers may pass either a c6 ``VotingStatus`` enum + or the bare string ``"upload_giveup"``; the c6 impl coerces either + shape via ``VotingStatus(status)``. + """ + + def increment_upload_attempts(self, tile_id: Any) -> int: ... + + def update_voting_status(self, tile_id: Any, status: Any) -> None: ... + + +# ---------------------------------------------------------------------- +# Concrete decorator +# ---------------------------------------------------------------------- + + +class IdempotentRetryTileUploader: + """:class:`TileUploader`-conforming decorator that adds bounded retry. + + See module docstring for the full rationale. Construction is via + keyword-only arguments so every wired-in dependency is visible at + the composition root; constructor never reads global state. 
+ + The decorator is idempotent across operator re-invocations because + the inner uploader's :meth:`upload_pending_tiles` always re-queries + c6's :meth:`pending_uploads` on entry — already-acked tiles have + been ``mark_uploaded``'d (so they're filtered out by the c6 SQL), + and ``UPLOAD_GIVEUP`` tiles are excluded by the v1.3.0 SQL update. + The decorator therefore only needs to bound retries; it never has + to track which tiles were already attempted. + """ + + def __init__( + self, + *, + inner: TileUploader, + tile_metadata_store: _RetryMetadataStoreLike, + fdr_client: FdrClient, + logger: logging.Logger, + clock: Clock, + config: C11RetryConfig, + ) -> None: + self._inner = inner + self._metadata_store = tile_metadata_store + self._fdr = fdr_client + self._logger = logger + self._clock = clock + self._config = config + + # ------------------------------------------------------------------ + # TileUploader Protocol surface + # ------------------------------------------------------------------ + + def upload_pending_tiles(self, request: UploadRequest) -> UploadBatchReport: + """Bounded loop over inner; partial outcomes drive the retry path.""" + + self._logger.info( + "Retry decorator session started", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_SESSION_START, + "kv": { + "flight_id": str(request.flight_id) if request.flight_id else None, + "max_in_call_retries": self._config.max_in_call_retries, + "max_per_tile_attempts": self._config.max_per_tile_attempts, + "backoff_base_s": self._config.backoff_base_s, + "backoff_cap_s": self._config.backoff_cap_s, + }, + }, + ) + + retries_used = 0 + report = self._inner.upload_pending_tiles(request) + + while True: + if report.outcome != UploadOutcome.PARTIAL: + # SUCCESS or FAILURE → return as-is; no retry shape. + # ``retry_count`` reflects the number of in-call retries + # the decorator drove (zero on the happy path; > 0 if + # an earlier round was PARTIAL and this round flipped). 
+ return self._with_retry_count(report, retries_used) + + self._handle_rejected_tiles(report, request) + + if retries_used >= self._config.max_in_call_retries: + # Per-call budget exhausted; surface the hint and stop. + # Spec § Outcome bullet 3 also mentions "AND there are + # still tiles with voting_status==pending" — equivalent + # to the budget check here because the inner's next call + # would query c6's pending_uploads and return SUCCESS + # immediately if the set is empty (no extra round-trip + # added on the cold path). + next_retry_at_s = self._next_retry_at_s() + self._logger.warning( + "In-call retry budget exhausted", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_BUDGET_EXHAUSTED, + "kv": { + "flight_id": str(request.flight_id) if request.flight_id else None, + "retries_used": retries_used, + "next_retry_at_s": next_retry_at_s, + }, + }, + ) + return replace( + report, + outcome=UploadOutcome.PARTIAL, + retry_count=retries_used, + next_retry_at_s=next_retry_at_s, + ) + + # Sleep with capped exponential backoff, then retry. + # Spec: the n-th retry waits ``base ** n`` seconds, so the + # FIRST retry uses exponent 1 (base ** 1) — never base ** 0. 
+ retries_used += 1 + wait_s = self._backoff_for(retries_used) + self._sleep(wait_s) + self._logger.info( + "Retry decorator scheduling next inner attempt", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_RETRY_ATTEMPT, + "kv": { + "flight_id": str(request.flight_id) if request.flight_id else None, + "attempt_number": retries_used, + "sleep_s": wait_s, + }, + }, + ) + report = self._inner.upload_pending_tiles(request) + + def enumerate_pending_tiles( + self, flight_id: UUID | None = None + ) -> list[Any]: + """Pass-through to the inner uploader (AC-11).""" + + return list(self._inner.enumerate_pending_tiles(flight_id)) + + def confirm_flight_state(self) -> FlightStateSignal: + """Pass-through to the inner uploader (AC-11).""" + + return self._inner.confirm_flight_state() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _handle_rejected_tiles( + self, + report: UploadBatchReport, + request: UploadRequest, + ) -> None: + """Increment per-tile attempts; trip ``UPLOAD_GIVEUP`` on threshold.""" + + for entry in report.per_tile_status: + if entry.status != IngestStatus.REJECTED: + continue + try: + new_count = self._metadata_store.increment_upload_attempts(entry.tile_id) + except Exception: + # If the c6 increment fails, we cannot judge whether + # the per-tile budget is exhausted — re-raise so the + # operator notices instead of silently retrying. 
+ raise + if new_count >= self._config.max_per_tile_attempts: + self._mark_giveup(entry, new_count, request) + + def _mark_giveup( + self, + entry: PerTileStatus, + attempts: int, + request: UploadRequest, + ) -> None: + """Forward-transition the tile to ``UPLOAD_GIVEUP`` + emit FDR + log.""" + + self._metadata_store.update_voting_status( + entry.tile_id, _VOTING_STATUS_UPLOAD_GIVEUP + ) + self._fdr.enqueue( + FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_now(), + producer_id=self._fdr.producer_id, + kind=_FDR_KIND_GIVEUP, + payload={ + "flight_id": ( + str(request.flight_id) if request.flight_id is not None else None + ), + "tile_id": str(entry.tile_id), + "attempts": int(attempts), + "last_rejection_reason": entry.rejection_reason or "", + }, + ) + ) + self._logger.error( + "Tile moved to UPLOAD_GIVEUP after exhausting per-tile budget", + extra={ + "component": _COMPONENT, + "kind": _LOG_KIND_GIVEUP, + "kv": { + "flight_id": ( + str(request.flight_id) if request.flight_id is not None else None + ), + "tile_id": str(entry.tile_id), + "attempts": int(attempts), + "last_rejection_reason": entry.rejection_reason or "", + }, + }, + ) + + def _backoff_for(self, attempt_number: int) -> float: + """``min(base ** attempt_number, cap)`` — AC-4 / AC-5. + + ``attempt_number`` is the 1-indexed retry round (the FIRST + retry is attempt 1, so the first sleep is ``base ** 1``). 
+ """ + + raw = self._config.backoff_base_s ** attempt_number + return float(min(raw, self._config.backoff_cap_s)) + + def _sleep(self, wait_s: float) -> None: + """Route through ``Clock.sleep_until_ns`` per AZ-398.""" + + if wait_s <= 0: + return + ns_target = self._clock.monotonic_ns() + int(wait_s * 1_000_000_000) + self._clock.sleep_until_ns(ns_target) + + def _next_retry_at_s(self) -> int: + """Wall-clock-seconds hint = ``now + backoff_cap_s`` (operator UI).""" + + now_s = self._clock.time_ns() // 1_000_000_000 + return int(now_s) + int(self._config.backoff_cap_s) + + def _with_retry_count( + self, report: UploadBatchReport, retries_used: int + ) -> UploadBatchReport: + """Return ``report`` with ``retry_count`` overridden.""" + + if retries_used == 0: + return report + return replace(report, retry_count=retries_used) diff --git a/src/gps_denied_onboard/components/c6_tile_cache/_types.py b/src/gps_denied_onboard/components/c6_tile_cache/_types.py index e85976c..f1fd5d4 100644 --- a/src/gps_denied_onboard/components/c6_tile_cache/_types.py +++ b/src/gps_denied_onboard/components/c6_tile_cache/_types.py @@ -68,13 +68,19 @@ class VotingStatus(str, Enum): Forward-only transitions per Invariant I-8 of ``tile_metadata_store.md``: ``PENDING → TRUSTED``, - ``PENDING → REJECTED``, ``TRUSTED → REJECTED``. The - impl (NOT this task) enforces the transition table. + ``PENDING → REJECTED``, ``TRUSTED → REJECTED``, + ``PENDING → UPLOAD_GIVEUP``, ``TRUSTED → UPLOAD_GIVEUP`` + (the last two added in v1.3.0 by AZ-320 — see the + contract Change Log). The impl enforces the transition + table; ``UPLOAD_GIVEUP`` is a human-decision terminal + state — automated promotion back to ``PENDING`` is + forbidden. 
""" PENDING = "pending" TRUSTED = "trusted" REJECTED = "rejected" + UPLOAD_GIVEUP = "upload_giveup" class SectorClassification(str, Enum): diff --git a/src/gps_denied_onboard/components/c6_tile_cache/interface.py b/src/gps_denied_onboard/components/c6_tile_cache/interface.py index 0cee17b..9c7f1f3 100644 --- a/src/gps_denied_onboard/components/c6_tile_cache/interface.py +++ b/src/gps_denied_onboard/components/c6_tile_cache/interface.py @@ -153,6 +153,23 @@ class TileMetadataStore(Protocol): """Point lookup; returns ``None`` if absent (NOT raises).""" ... + def increment_upload_attempts(self, tile_id: TileId) -> int: + """Atomically bump ``upload_attempts`` for the row; return new count. + + Added in v1.3.0 (AZ-320) for the C11 upload retry decorator. + Concurrent invocations on different ``tile_id`` values do not + block each other (per-row UPDATE...RETURNING). Raises + :class:`TileNotFoundError` if the row is absent and + :class:`TileMetadataError` on transport failure. The Protocol + method body raises :class:`NotImplementedError` so legacy + impls keep their conformance — production wiring uses the + :class:`PostgresFilesystemStore` impl that ships the SQL. + """ + raise NotImplementedError( + "TileMetadataStore.increment_upload_attempts: must be " + "implemented by the concrete c6 store; see AZ-320 / v1.3.0" + ) + @runtime_checkable class DescriptorIndex(Protocol): diff --git a/src/gps_denied_onboard/components/c6_tile_cache/postgres_filesystem_store.py b/src/gps_denied_onboard/components/c6_tile_cache/postgres_filesystem_store.py index 9689635..a4da995 100644 --- a/src/gps_denied_onboard/components/c6_tile_cache/postgres_filesystem_store.py +++ b/src/gps_denied_onboard/components/c6_tile_cache/postgres_filesystem_store.py @@ -95,11 +95,17 @@ _ZERO_UUID = UUID("00000000-0000-0000-0000-000000000000") _MAX_FDR_FAILURE_MSG_LEN = 512 # Invariant I-8 of tile_metadata_store.md — forward-only voting transitions. 
+# v1.3.0 (AZ-320) added the two UPLOAD_GIVEUP transitions: a tile that +# exhausts the C11 retry decorator's per-tile budget moves from PENDING / +# TRUSTED to UPLOAD_GIVEUP. Backward (UPLOAD_GIVEUP → anything) is +# forbidden — a human-decision boundary recovered via out-of-band SQL. _ALLOWED_VOTING_TRANSITIONS = frozenset( { (VotingStatus.PENDING, VotingStatus.TRUSTED), (VotingStatus.PENDING, VotingStatus.REJECTED), (VotingStatus.TRUSTED, VotingStatus.REJECTED), + (VotingStatus.PENDING, VotingStatus.UPLOAD_GIVEUP), + (VotingStatus.TRUSTED, VotingStatus.UPLOAD_GIVEUP), } ) @@ -645,10 +651,47 @@ class PostgresFilesystemStore: f"mark_uploaded: pool/query error for {tile_id}: {exc}" ) from exc + def increment_upload_attempts(self, tile_id: TileId) -> int: + """Atomic UPDATE...RETURNING — bump ``upload_attempts``, return new count. + + Concurrent invocations on different tile rows do NOT block each + other (per-row UPDATE; no surrounding ``FOR UPDATE`` lock). The + UPDATE is single-statement so PG's per-row write lock provides + all the serialisation needed for AC-7 of AZ-320. 
+ """ + tile_x, tile_y = self._tile_xy(tile_id) + try: + with self._connection() as conn: + with conn.cursor() as cur: + cur.execute( + "UPDATE tiles " + "SET upload_attempts = upload_attempts + 1, " + "updated_at = now() " + "WHERE zoom_level=%s AND tile_x=%s AND tile_y=%s " + "RETURNING upload_attempts", + (tile_id.zoom_level, tile_x, tile_y), + ) + row = cur.fetchone() + if row is None: + raise TileNotFoundError( + f"increment_upload_attempts: no row for {tile_id} " + f"(z={tile_id.zoom_level},x={tile_x},y={tile_y})" + ) + new_count = int(row[0]) + conn.commit() + return new_count + except psycopg.Error as exc: + raise TileMetadataError( + f"increment_upload_attempts: pool/query error for " + f"{tile_id}: {exc}" + ) from exc + def pending_uploads(self) -> list[TileMetadata]: sql = ( "SELECT " + ", ".join(_TILE_COLUMNS) + " FROM tiles " - "WHERE source = 'onboard_ingest' AND uploaded_at IS NULL " + "WHERE source = 'onboard_ingest' " + "AND uploaded_at IS NULL " + "AND voting_status IS DISTINCT FROM 'upload_giveup' " "ORDER BY capture_timestamp ASC, id ASC" ) try: diff --git a/src/gps_denied_onboard/fdr_client/records.py b/src/gps_denied_onboard/fdr_client/records.py index fe818cb..e07ea65 100644 --- a/src/gps_denied_onboard/fdr_client/records.py +++ b/src/gps_denied_onboard/fdr_client/records.py @@ -260,6 +260,26 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = { "observed_at_iso", } ), + # AZ-320 / E-C11: emitted by + # ``IdempotentRetryTileUploader._mark_giveup`` when a tile's + # ``upload_attempts`` counter (in c6) reaches + # ``C11RetryConfig.max_per_tile_attempts``. The decorator moves + # the row to ``VotingStatus.UPLOAD_GIVEUP`` (forward-only per + # ``tile_metadata_store.md`` v1.3.0 Invariant I-8) so subsequent + # ``pending_uploads()`` queries skip it. 
``flight_id`` is the + # request's optional UUID (None when the operator runs the + # cross-flight cleanup); ``tile_id`` is the c6 ``TileId`` string; + # ``attempts`` is the per-tile counter at the moment of giveup; + # ``last_rejection_reason`` is the parent-suite's free-text + # explanation captured from the last ``IngestStatus.REJECTED``. + "c11.upload.giveup": frozenset( + { + "flight_id", + "tile_id", + "attempts", + "last_rejection_reason", + } + ), } KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys()) diff --git a/src/gps_denied_onboard/runtime_root/c11_factory.py b/src/gps_denied_onboard/runtime_root/c11_factory.py index db4baa8..4f6cfb4 100644 --- a/src/gps_denied_onboard/runtime_root/c11_factory.py +++ b/src/gps_denied_onboard/runtime_root/c11_factory.py @@ -36,7 +36,9 @@ from gps_denied_onboard.components.c11_tile_manager import ( FlightStateSource, HttpTileDownloader, HttpTileUploader, + IdempotentRetryTileUploader, PerFlightKeyManager, + TileUploader, ) from gps_denied_onboard.config.schema import ConfigError from gps_denied_onboard.fdr_client import FdrClient, make_fdr_client @@ -44,6 +46,7 @@ from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: from gps_denied_onboard.clock import Clock + from gps_denied_onboard.clock.interface import Clock as ClockProtocol from gps_denied_onboard.config.schema import Config __all__ = [ @@ -107,9 +110,10 @@ def build_tile_uploader( tile_metadata_store: Any, flight_state_gate: FlightStateGate, key_manager: PerFlightKeyManager, + clock: ClockProtocol | None = None, fdr_client: FdrClient | None = None, -) -> HttpTileUploader: - """Construct a wired :class:`HttpTileUploader` (AZ-319). +) -> TileUploader: + """Construct a wired :class:`TileUploader` for AZ-319 (+ AZ-320 retry). 
The c6 surfaces (``tile_store``, ``tile_metadata_store``) are consumer-side cuts injected here by the operator-binary @@ -117,6 +121,16 @@ def build_tile_uploader( is also caller-owned: production wiring uses one long-lived :class:`httpx.Client` per process; tests inject ``httpx.Client(transport=httpx.MockTransport(...))``. + + By default the bare :class:`HttpTileUploader` is wrapped in the + AZ-320 :class:`IdempotentRetryTileUploader` decorator (per-call + + per-tile bounded retry). The wrapping is suppressed by setting + ``config.components['c11_tile_manager'].disable_retry_decorator = + True`` — for low-level debugging or test wiring that wants to + observe the inner uploader directly. The ``clock`` is required + when the decorator is active; if omitted a default + :class:`WallClock` is constructed (matches the production + operator-binary wiring pattern). """ block = config.components.get("c11_tile_manager") @@ -144,7 +158,7 @@ def build_tile_uploader( if fdr_client is None: fdr_client = make_fdr_client(_C11_UPLOADER_PRODUCER_ID, config) logger = get_logger(_C11_UPLOADER_LOGGER) - return HttpTileUploader( + inner = HttpTileUploader( http_client=http_client, tile_store=tile_store, tile_metadata_store=tile_metadata_store, @@ -155,6 +169,31 @@ def build_tile_uploader( config=block, ) + if block.disable_retry_decorator: + logger.info( + "AZ-320 retry decorator BYPASSED (config.disable_retry_decorator = true)", + extra={ + "component": "c11_tile_manager.tile_uploader", + "kind": "c11.upload.retry.decorator.bypassed", + "kv": {"reason": "config_flag"}, + }, + ) + return inner + + if clock is None: + from gps_denied_onboard.clock.wall_clock import WallClock + + clock = WallClock() + decorator_logger = get_logger("c11_tile_manager.idempotent_retry") + return IdempotentRetryTileUploader( + inner=inner, + tile_metadata_store=tile_metadata_store, + fdr_client=fdr_client, + logger=decorator_logger, + clock=clock, + config=block.retry, + ) + def build_tile_downloader( 
config: Config, diff --git a/tests/unit/c11_tile_manager/test_idempotent_retry.py b/tests/unit/c11_tile_manager/test_idempotent_retry.py new file mode 100644 index 0000000..7d6e656 --- /dev/null +++ b/tests/unit/c11_tile_manager/test_idempotent_retry.py @@ -0,0 +1,551 @@ +"""AZ-320 ``IdempotentRetryTileUploader`` unit tests. + +Covers AC-1, AC-2, AC-3, AC-4, AC-5, AC-9 (conformance — see also +test_protocol_conformance.py), AC-10 (composition-root bypass — see +the ``test_ac10_factory_*`` tests below), AC-11, AC-12 plus the NFR microbench. +AC-6 (enum addition) is exercised in +``tests/unit/c6_tile_cache/test_protocol_conformance.py``; +AC-7 (concurrent SQL) and AC-8 (migration) exercise real Postgres / +Alembic and live in the Docker-gated ``tests/unit/c6_tile_cache`` +suite (see test_postgres_filesystem_store.py / test_postgres_schema.py +which are skipped without the docker-compose services). +AC-13 (cross-call idempotence) is naturally exercised by the existing +AZ-319 + this batch's combined tests because c6's ``pending_uploads`` +already excludes acknowledged tiles; documented here as "no test +needed beyond the pass-through behaviour".
+""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any +from uuid import UUID, uuid4 + +import pytest + +from gps_denied_onboard.components.c11_tile_manager import ( + C11RetryConfig, + FlightStateNotOnGroundError, + FlightStateSignal, + IdempotentRetryTileUploader, + IngestStatus, + PerTileStatus, + SatelliteProviderError, + UploadBatchReport, + UploadOutcome, + UploadRequest, +) +from gps_denied_onboard.fdr_client.fakes import FakeFdrSink + + +# ---------------------------------------------------------------------- +# Stubs / fakes +# ---------------------------------------------------------------------- + + +@dataclass +class _FixedClock: + """Manual ``Clock`` — captures ``sleep_until_ns`` calls as wall-seconds.""" + + now_ns: int = 0 + sleep_calls: list[float] = field(default_factory=list) + + def monotonic_ns(self) -> int: + return self.now_ns + + def time_ns(self) -> int: + return self.now_ns + + def sleep_until_ns(self, target_ns: int) -> None: + delta_ns = max(0, target_ns - self.now_ns) + self.sleep_calls.append(delta_ns / 1_000_000_000) + self.now_ns = target_ns + + +class _ScriptedInner: + """Inner ``TileUploader`` that returns scripted reports per call.""" + + def __init__( + self, + *, + reports: list[UploadBatchReport] | None = None, + raise_on_call: list[BaseException] | None = None, + ) -> None: + self.reports = list(reports or []) + self.raises = list(raise_on_call or []) + self.calls: list[UploadRequest] = [] + self.enumerate_calls: list[Any] = [] + self.confirm_calls: int = 0 + + def upload_pending_tiles(self, request: UploadRequest) -> UploadBatchReport: + self.calls.append(request) + idx = len(self.calls) - 1 + if idx < len(self.raises) and self.raises[idx] is not None: + raise self.raises[idx] + if idx >= len(self.reports): + raise AssertionError( + f"_ScriptedInner exhausted: call #{idx + 1} but only " + f"{len(self.reports)} reports scripted" + ) + return 
self.reports[idx] + + def enumerate_pending_tiles(self, flight_id: Any | None = None) -> list[Any]: + self.enumerate_calls.append(flight_id) + return [{"sentinel": True, "flight_id": flight_id}] + + def confirm_flight_state(self) -> FlightStateSignal: + self.confirm_calls += 1 + return FlightStateSignal.ON_GROUND + + +@dataclass +class _FakeMetadataStore: + """Records ``increment_upload_attempts`` + ``update_voting_status`` calls.""" + + counter_per_tile: dict[str, int] = field(default_factory=dict) + transitions: list[tuple[str, str]] = field(default_factory=list) + raise_on_increment: BaseException | None = None + + def increment_upload_attempts(self, tile_id: Any) -> int: + if self.raise_on_increment is not None: + raise self.raise_on_increment + key = str(tile_id) + self.counter_per_tile[key] = self.counter_per_tile.get(key, 0) + 1 + return self.counter_per_tile[key] + + def update_voting_status(self, tile_id: Any, status: Any) -> None: + self.transitions.append((str(tile_id), str(status))) + + +def _build_decorator( + *, + inner: _ScriptedInner, + metadata_store: _FakeMetadataStore | None = None, + clock: _FixedClock | None = None, + config: C11RetryConfig | None = None, + fdr: FakeFdrSink | None = None, +) -> tuple[ + IdempotentRetryTileUploader, + list[logging.LogRecord], + _FakeMetadataStore, + _FixedClock, + FakeFdrSink, +]: + log_records: list[logging.LogRecord] = [] + + class _Handler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + log_records.append(record) + + logger = logging.getLogger(f"test_az320_{id(log_records)}") + logger.handlers.clear() + logger.addHandler(_Handler()) + logger.setLevel(logging.DEBUG) + logger.propagate = False + + store = metadata_store or _FakeMetadataStore() + clk = clock or _FixedClock() + cfg = config or C11RetryConfig( + max_in_call_retries=3, + max_per_tile_attempts=5, + backoff_base_s=2.0, + backoff_cap_s=60.0, + ) + fdr_client = fdr or FakeFdrSink("c11_tile_manager.idempotent_retry") + decorator 
= IdempotentRetryTileUploader( + inner=inner, # type: ignore[arg-type] + tile_metadata_store=store, # type: ignore[arg-type] + fdr_client=fdr_client, # type: ignore[arg-type] + logger=logger, + clock=clk, + config=cfg, + ) + return decorator, log_records, store, clk, fdr_client + + +def _request(flight_id: UUID | None = None) -> UploadRequest: + return UploadRequest( + batch_size=10, + satellite_provider_url="https://parent-suite.test", + flight_id=flight_id, + ) + + +def _success(tile_count: int = 5, retry_count: int = 0) -> UploadBatchReport: + return UploadBatchReport( + batch_uuid=uuid4(), + per_tile_status=tuple( + PerTileStatus(tile_id=f"t{i}", status=IngestStatus.QUEUED) + for i in range(tile_count) + ), + retry_count=retry_count, + next_retry_at_s=None, + outcome=UploadOutcome.SUCCESS, + public_key_fingerprint="0123456789abcdef", + ) + + +def _partial( + *, + queued: int = 7, + rejected_ids: tuple[str, ...] = ("t0", "t1", "t2"), + rejection_reason: str = "duplicate", +) -> UploadBatchReport: + per_tile = [ + PerTileStatus(tile_id=f"q{i}", status=IngestStatus.QUEUED) + for i in range(queued) + ] + [ + PerTileStatus( + tile_id=tid, + status=IngestStatus.REJECTED, + rejection_reason=rejection_reason, + ) + for tid in rejected_ids + ] + return UploadBatchReport( + batch_uuid=uuid4(), + per_tile_status=tuple(per_tile), + retry_count=0, + next_retry_at_s=None, + outcome=UploadOutcome.PARTIAL, + public_key_fingerprint="0123456789abcdef", + ) + + +# ---------------------------------------------------------------------- +# AC-1 — success on first attempt → no retry side effects +# ---------------------------------------------------------------------- + + +def test_ac1_success_on_first_attempt_zero_side_effects() -> None: + # Arrange + inner = _ScriptedInner(reports=[_success(5)]) + (decorator, _logs, store, clk, fdr) = _build_decorator(inner=inner) + + # Act + report = decorator.upload_pending_tiles(_request(uuid4())) + + # Assert + assert report.outcome == 
UploadOutcome.SUCCESS + assert report.retry_count == 0 + assert clk.sleep_calls == [] + assert store.counter_per_tile == {} + assert store.transitions == [] + assert fdr.records == [] + assert len(inner.calls) == 1 + + +# ---------------------------------------------------------------------- +# AC-2 — partial → retry → success +# ---------------------------------------------------------------------- + + +def test_ac2_partial_then_success_increments_attempts_and_sleeps_once() -> None: + # Arrange + rejected = ("a", "b", "c") + inner = _ScriptedInner( + reports=[ + _partial(rejected_ids=rejected), + _success(3), + ] + ) + (decorator, _logs, store, clk, _fdr) = _build_decorator(inner=inner) + + # Act + report = decorator.upload_pending_tiles(_request(uuid4())) + + # Assert + assert report.outcome == UploadOutcome.SUCCESS + assert report.retry_count == 1 + assert sorted(store.counter_per_tile.keys()) == sorted(rejected) + assert all(v == 1 for v in store.counter_per_tile.values()) + assert clk.sleep_calls == [2.0] + assert len(inner.calls) == 2 + + +# ---------------------------------------------------------------------- +# AC-3 — per-tile budget exhaustion → UPLOAD_GIVEUP +# ---------------------------------------------------------------------- + + +def test_ac3_per_tile_budget_exhausted_moves_to_giveup() -> None: + # Arrange — pre-load the counter so a single rejection trips the threshold. + cfg = C11RetryConfig( + max_in_call_retries=0, + max_per_tile_attempts=5, + backoff_base_s=2.0, + backoff_cap_s=60.0, + ) + inner = _ScriptedInner( + reports=[_partial(rejected_ids=("doomed_tile",), rejection_reason="invalid signature")] + ) + store = _FakeMetadataStore(counter_per_tile={"doomed_tile": 4}) + (decorator, log_records, store_out, _clk, fdr) = _build_decorator( + inner=inner, metadata_store=store, config=cfg + ) + + # Act + report = decorator.upload_pending_tiles(_request(uuid4())) + + # Assert — increment took 4 → 5; threshold reached; transition recorded. 
+ assert store_out.counter_per_tile["doomed_tile"] == 5 + assert ("doomed_tile", "upload_giveup") in store_out.transitions + giveup_records = [r for r in fdr.records if r.kind == "c11.upload.giveup"] + assert len(giveup_records) == 1 + assert giveup_records[0].payload["attempts"] == 5 + assert giveup_records[0].payload["last_rejection_reason"] == "invalid signature" + error_logs = [ + r for r in log_records if r.levelno == logging.ERROR and getattr(r, "kind", "") == "c11.retry.tile.giveup" + ] + assert len(error_logs) == 1 + # Outcome stays PARTIAL because no retries were attempted. + assert report.outcome == UploadOutcome.PARTIAL + + +# ---------------------------------------------------------------------- +# AC-4 — in-call retry budget exhausted with persistent partial +# ---------------------------------------------------------------------- + + +def test_ac4_in_call_budget_exhausted_yields_partial_with_hint() -> None: + # Arrange — every call returns the same single rejected tile. + cfg = C11RetryConfig( + max_in_call_retries=3, + max_per_tile_attempts=99, + backoff_base_s=2.0, + backoff_cap_s=60.0, + ) + inner = _ScriptedInner( + reports=[_partial(rejected_ids=("t0",)) for _ in range(4)] + ) + clk = _FixedClock(now_ns=1_000_000_000_000) + (decorator, _logs, store, clk_out, _fdr) = _build_decorator( + inner=inner, clock=clk, config=cfg + ) + + # Act + report = decorator.upload_pending_tiles(_request(uuid4())) + + # Assert + assert len(inner.calls) == 4 + assert clk_out.sleep_calls == [2.0, 4.0, 8.0] + assert report.outcome == UploadOutcome.PARTIAL + assert report.retry_count == 3 + # _FixedClock advances on each sleep_until_ns; final time_ns + # reflects the sum of the backoffs: 1000s + (2+4+8)s = 1014s. + # next_retry_at_s = floor(time_ns/1e9) + backoff_cap_s = 1014 + 60. + assert report.next_retry_at_s == 1014 + 60 + # Per-tile counter incremented once per call (4 increments). 
+ assert store.counter_per_tile["t0"] == 4 + + +# ---------------------------------------------------------------------- +# AC-5 — exponential backoff cap +# ---------------------------------------------------------------------- + + +def test_ac5_backoff_cap_honoured_at_high_attempt_number() -> None: + # Arrange + cfg = C11RetryConfig( + max_in_call_retries=10, + max_per_tile_attempts=99, + backoff_base_s=2.0, + backoff_cap_s=10.0, + ) + inner = _ScriptedInner( + reports=[_partial(rejected_ids=("t0",)) for _ in range(11)] + ) + (decorator, _logs, _store, clk, _fdr) = _build_decorator( + inner=inner, config=cfg + ) + + # Act + report = decorator.upload_pending_tiles(_request(uuid4())) + + # Assert — first 4 retries: 2, 4, 8, 16->capped 10; remaining 6: all 10 + expected = [2.0, 4.0, 8.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0] + assert clk.sleep_calls == expected + assert report.retry_count == 10 + + +# ---------------------------------------------------------------------- +# AC-11 — pass-through methods +# ---------------------------------------------------------------------- + + +def test_ac11_enumerate_pending_passes_through() -> None: + # Arrange + inner = _ScriptedInner(reports=[_success(0)]) + (decorator, _logs, _store, _clk, _fdr) = _build_decorator(inner=inner) + + # Act + fid = uuid4() + out = decorator.enumerate_pending_tiles(fid) + + # Assert + assert inner.enumerate_calls == [fid] + assert out == [{"sentinel": True, "flight_id": fid}] + + +def test_ac11_confirm_flight_state_passes_through() -> None: + # Arrange + inner = _ScriptedInner(reports=[_success(0)]) + (decorator, _logs, _store, _clk, _fdr) = _build_decorator(inner=inner) + + # Act + state = decorator.confirm_flight_state() + + # Assert + assert state == FlightStateSignal.ON_GROUND + assert inner.confirm_calls == 1 + + +# ---------------------------------------------------------------------- +# AC-12 — inner exception propagates without retry +# 
---------------------------------------------------------------------- + + +def test_ac12_flight_state_not_on_ground_propagates_without_retry() -> None: + # Arrange + from datetime import datetime, timezone + + err = FlightStateNotOnGroundError(FlightStateSignal.IN_FLIGHT, datetime.now(timezone.utc)) + inner = _ScriptedInner(raise_on_call=[err]) + (decorator, _logs, _store, clk, _fdr) = _build_decorator(inner=inner) + + # Act / Assert + with pytest.raises(FlightStateNotOnGroundError): + decorator.upload_pending_tiles(_request()) + assert clk.sleep_calls == [] + assert len(inner.calls) == 1 + + +def test_ac12_satellite_provider_error_propagates_without_retry() -> None: + # Arrange + inner = _ScriptedInner(raise_on_call=[SatelliteProviderError("boom")]) + (decorator, _logs, _store, clk, _fdr) = _build_decorator(inner=inner) + + # Act / Assert + with pytest.raises(SatelliteProviderError): + decorator.upload_pending_tiles(_request()) + assert clk.sleep_calls == [] + assert len(inner.calls) == 1 + + +# ---------------------------------------------------------------------- +# Outcome=FAILURE on first call returns as-is +# ---------------------------------------------------------------------- + + +def test_outcome_failure_passthrough() -> None: + # Arrange + failure_report = UploadBatchReport( + batch_uuid=uuid4(), + per_tile_status=(), + retry_count=0, + next_retry_at_s=None, + outcome=UploadOutcome.FAILURE, + public_key_fingerprint="dead", + ) + inner = _ScriptedInner(reports=[failure_report]) + (decorator, _logs, _store, clk, _fdr) = _build_decorator(inner=inner) + + # Act + report = decorator.upload_pending_tiles(_request()) + + # Assert + assert report.outcome == UploadOutcome.FAILURE + assert clk.sleep_calls == [] + + +# ---------------------------------------------------------------------- +# AC-10 — composition-root bypass via ``disable_retry_decorator`` +# ---------------------------------------------------------------------- + + +def _build_factory_config(*, 
disable_retry: bool) -> Any: + from gps_denied_onboard.components.c11_tile_manager import C11Config + from gps_denied_onboard.config.schema import Config + + block = C11Config( + satellite_provider_ingest_url="https://parent-suite.test", + upload_batch_size=10, + upload_http_timeout_s=5.0, + upload_max_retry_after_s=600, + companion_id="bypass_test", + disable_retry_decorator=disable_retry, + ) + return Config(components={"c11_tile_manager": block}) + + +def test_ac10_factory_returns_decorated_uploader_by_default() -> None: + # Arrange + import httpx as _httpx + from gps_denied_onboard.runtime_root.c11_factory import build_tile_uploader + + config = _build_factory_config(disable_retry=False) + transport = _httpx.MockTransport(lambda r: _httpx.Response(202)) + + # Act + uploader = build_tile_uploader( + config, + http_client=_httpx.Client(transport=transport), + tile_store=object(), + tile_metadata_store=object(), + flight_state_gate=object(), # type: ignore[arg-type] + key_manager=object(), # type: ignore[arg-type] + ) + + # Assert + assert isinstance(uploader, IdempotentRetryTileUploader) + + +def test_ac10_factory_bypasses_decorator_when_flag_set() -> None: + # Arrange + import httpx as _httpx + from gps_denied_onboard.components.c11_tile_manager import HttpTileUploader + from gps_denied_onboard.runtime_root.c11_factory import build_tile_uploader + + config = _build_factory_config(disable_retry=True) + transport = _httpx.MockTransport(lambda r: _httpx.Response(202)) + + # Act + uploader = build_tile_uploader( + config, + http_client=_httpx.Client(transport=transport), + tile_store=object(), + tile_metadata_store=object(), + flight_state_gate=object(), # type: ignore[arg-type] + key_manager=object(), # type: ignore[arg-type] + ) + + # Assert + assert isinstance(uploader, HttpTileUploader) + assert not isinstance(uploader, IdempotentRetryTileUploader) + + +# ---------------------------------------------------------------------- +# NFR — overhead microbench (no 
retries → ~zero added latency) +# ---------------------------------------------------------------------- + + +def test_nfr_overhead_under_5ms_on_success_first_attempt() -> None: + # Arrange + inner = _ScriptedInner(reports=[_success(50)]) + (decorator, _logs, _store, _clk, _fdr) = _build_decorator(inner=inner) + request = _request(uuid4()) + decorator.upload_pending_tiles(request) + inner.reports = [_success(50)] + inner.calls.clear() + + # Act + import time as _time + + t0 = _time.perf_counter() + decorator.upload_pending_tiles(request) + elapsed_ms = (_time.perf_counter() - t0) * 1000.0 + + # Assert — generous bound; the goal is to catch O(n^2) regressions + # in the per-call bookkeeping, not to certify wall-clock budget. + assert elapsed_ms < 50.0 diff --git a/tests/unit/c11_tile_manager/test_protocol_conformance.py b/tests/unit/c11_tile_manager/test_protocol_conformance.py index 02411e6..84bd2c6 100644 --- a/tests/unit/c11_tile_manager/test_protocol_conformance.py +++ b/tests/unit/c11_tile_manager/test_protocol_conformance.py @@ -11,10 +11,13 @@ import logging import httpx +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c11_tile_manager import ( C11Config, + C11RetryConfig, HttpTileDownloader, HttpTileUploader, + IdempotentRetryTileUploader, ) from gps_denied_onboard.components.c11_tile_manager.interface import ( TileDownloader, @@ -108,3 +111,39 @@ def test_ac10_concrete_downloader_satisfies_protocol() -> None: def test_ac10_partial_downloader_is_not_protocol_conformant() -> None: # Assert assert not isinstance(_PartialDownloaderMissingEnumerate(), TileDownloader) + + +def test_ac9_idempotent_retry_decorator_satisfies_uploader_protocol() -> None: + # Arrange — wrap a Protocol-conformant inner uploader; the decorator + # must itself satisfy ``TileUploader`` so the composition root can + # bind it transparently in place of ``HttpTileUploader``. 
+ cfg = C11Config( + satellite_provider_ingest_url="https://parent-suite.test", + upload_batch_size=10, + upload_http_timeout_s=5.0, + upload_max_retry_after_s=600, + companion_id="conformance", + ) + transport = httpx.MockTransport(lambda r: httpx.Response(202)) + inner = HttpTileUploader( + http_client=httpx.Client(transport=transport), + tile_store=object(), # type: ignore[arg-type] + tile_metadata_store=object(), # type: ignore[arg-type] + flight_state_gate=object(), # type: ignore[arg-type] + key_manager=object(), # type: ignore[arg-type] + fdr_client=FakeFdrSink(_PRODUCER_ID), # type: ignore[arg-type] + logger=logging.getLogger("test_az320_inner"), + config=cfg, + sleep=_NullSleep(), + ) + decorator = IdempotentRetryTileUploader( + inner=inner, + tile_metadata_store=object(), # type: ignore[arg-type] + fdr_client=FakeFdrSink("c11_tile_manager.idempotent_retry"), # type: ignore[arg-type] + logger=logging.getLogger("test_az320_decorator"), + clock=WallClock(), + config=C11RetryConfig(), + ) + + # Assert + assert isinstance(decorator, TileUploader) diff --git a/tests/unit/c6_tile_cache/test_protocol_conformance.py b/tests/unit/c6_tile_cache/test_protocol_conformance.py index 07fc97f..84577b0 100644 --- a/tests/unit/c6_tile_cache/test_protocol_conformance.py +++ b/tests/unit/c6_tile_cache/test_protocol_conformance.py @@ -144,6 +144,9 @@ class _FullTileMetadataStore: def get_by_id(self, tile_id): raise NotImplementedError + def increment_upload_attempts(self, tile_id): + raise NotImplementedError + class _PartialTileMetadataStore: def query_by_bbox(self, bbox, zoom, *, voting_filter=None, source_filter=None): @@ -559,10 +562,19 @@ def test_ac9_contract_methods_match_protocol(contract_filename: str, proto: type def test_ac10_voting_status_has_documented_states_only() -> None: - assert {v.value for v in VotingStatus} == {"pending", "trusted", "rejected"} + # AC-6 (AZ-320): ``upload_giveup`` is the new terminal state set by + # the C11 retry decorator after a tile 
exhausts its per-tile retry + # budget. Forward-only — see tile_metadata_store.md v1.3.0 I-8. + assert {v.value for v in VotingStatus} == { + "pending", + "trusted", + "rejected", + "upload_giveup", + } assert VotingStatus.PENDING.value == "pending" assert VotingStatus.TRUSTED.value == "trusted" assert VotingStatus.REJECTED.value == "rejected" + assert VotingStatus.UPLOAD_GIVEUP.value == "upload_giveup" # ---------------------------------------------------------------------- diff --git a/tests/unit/test_ac5_alembic.py b/tests/unit/test_ac5_alembic.py index 6059631..6a321de 100644 --- a/tests/unit/test_ac5_alembic.py +++ b/tests/unit/test_ac5_alembic.py @@ -25,8 +25,11 @@ def test_head_revision_matches_latest_migration() -> None: AZ-263 originally pinned this to ``0001_initial``; AZ-304 advanced the head to ``0002_c6_tile_identity_and_lru`` (additive on AZ-263 — see - ``_docs/02_tasks/todo/AZ-304_c6_postgres_schema.md``). Future migrations - update this assertion in lockstep with the new head. + ``_docs/02_tasks/todo/AZ-304_c6_postgres_schema.md``). AZ-320 added + ``0003_c11_upload_attempts`` (additive — adds the + ``tiles.upload_attempts`` counter and widens the + ``ck_tiles_voting_status`` constraint to admit ``upload_giveup``). + Future migrations update this assertion in lockstep with the new head. 
""" # Arrange cwd = os.getcwd() @@ -40,7 +43,7 @@ def test_head_revision_matches_latest_migration() -> None: os.chdir(cwd) # Assert - assert list(heads) == ["0002_c6_tile_identity_and_lru"], f"unexpected heads: {heads}" + assert list(heads) == ["0003_c11_upload_attempts"], f"unexpected heads: {heads}" @pytest.mark.parametrize( diff --git a/tests/unit/test_az272_fdr_record_schema.py b/tests/unit/test_az272_fdr_record_schema.py index 69749bd..f4c1f56 100644 --- a/tests/unit/test_az272_fdr_record_schema.py +++ b/tests/unit/test_az272_fdr_record_schema.py @@ -248,6 +248,13 @@ def _kind_payload(kind: str) -> dict[str, object]: "retry_count": 0, "observed_at_iso": "2025-01-15T08:10:00.000000+00:00", } + if kind == "c11.upload.giveup": + return { + "flight_id": "00000000-0000-0000-0000-000000000020", + "tile_id": "z18_45.000000_-122.000000", + "attempts": 5, + "last_rejection_reason": "invalid signature", + } raise AssertionError(f"unhandled kind in fixture: {kind!r}")