"""AZ-320 ``IdempotentRetryTileUploader`` unit tests. Covers AC-1, AC-2, AC-3, AC-4, AC-5, AC-9 (conformance — see also test_protocol_conformance.py), AC-10 (composition-root bypass — see test_factory_bypass below), AC-11, AC-12 plus the NFR microbench. AC-6 (enum addition) is exercised in ``tests/unit/c6_tile_cache/test_protocol_conformance.py``; AC-7 (concurrent SQL) and AC-8 (migration) exercise real Postgres / Alembic and live in the Docker-gated ``tests/unit/c6_tile_cache`` suite (see test_postgres_filesystem_store.py / test_postgres_schema.py which are skipped without the docker-compose services). AC-13 (cross-call idempotence) is naturally exercised by the existing AZ-319 + this batch's combined tests because c6's ``pending_uploads`` already excludes acknowledged tiles; documented here as "no test needed beyond the pass-through behaviour". """ from __future__ import annotations import logging from dataclasses import dataclass, field from typing import Any from uuid import UUID, uuid4 import pytest from gps_denied_onboard.components.c11_tile_manager import ( C11RetryConfig, IdempotentRetryTileUploader, IngestStatus, PerTileStatus, SatelliteProviderError, UploadBatchReport, UploadOutcome, UploadRequest, ) from gps_denied_onboard.fdr_client.fakes import FakeFdrSink # ---------------------------------------------------------------------- # Stubs / fakes # ---------------------------------------------------------------------- @dataclass class _FixedClock: """Manual ``Clock`` — captures ``sleep_until_ns`` calls as wall-seconds.""" now_ns: int = 0 sleep_calls: list[float] = field(default_factory=list) def monotonic_ns(self) -> int: return self.now_ns def time_ns(self) -> int: return self.now_ns def sleep_until_ns(self, target_ns: int) -> None: delta_ns = max(0, target_ns - self.now_ns) self.sleep_calls.append(delta_ns / 1_000_000_000) self.now_ns = target_ns class _ScriptedInner: """Inner ``TileUploader`` that returns scripted reports per call.""" def __init__( self, *, reports: list[UploadBatchReport] | None = None, raise_on_call: list[BaseException] | None = None, ) -> None: self.reports = list(reports or []) self.raises = list(raise_on_call or []) self.calls: list[UploadRequest] = [] self.enumerate_calls: list[Any] = [] def upload_pending_tiles(self, request: UploadRequest) -> UploadBatchReport: self.calls.append(request) idx = len(self.calls) - 1 if idx < len(self.raises) and self.raises[idx] is not None: raise self.raises[idx] if idx >= len(self.reports): raise AssertionError( f"_ScriptedInner exhausted: call #{idx + 1} but only " f"{len(self.reports)} reports scripted" ) return self.reports[idx] def enumerate_pending_tiles(self, flight_id: Any | None = None) -> list[Any]: self.enumerate_calls.append(flight_id) return [{"sentinel": True, "flight_id": flight_id}] @dataclass class _FakeMetadataStore: """Records ``increment_upload_attempts`` + ``update_voting_status`` calls.""" counter_per_tile: dict[str, int] = field(default_factory=dict) transitions: list[tuple[str, str]] = field(default_factory=list) raise_on_increment: BaseException | None = None def increment_upload_attempts(self, tile_id: Any) -> int: if self.raise_on_increment is not None: raise self.raise_on_increment key = str(tile_id) self.counter_per_tile[key] = self.counter_per_tile.get(key, 0) + 1 return self.counter_per_tile[key] def update_voting_status(self, tile_id: Any, status: Any) -> None: self.transitions.append((str(tile_id), str(status))) def _build_decorator( *, inner: _ScriptedInner, metadata_store: _FakeMetadataStore | None = None, clock: _FixedClock | None = None, config: C11RetryConfig | None = None, fdr: FakeFdrSink | None = None, ) -> tuple[ IdempotentRetryTileUploader, list[logging.LogRecord], _FakeMetadataStore, _FixedClock, FakeFdrSink, ]: log_records: list[logging.LogRecord] = [] class _Handler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: log_records.append(record) logger = logging.getLogger(f"test_az320_{id(log_records)}") logger.handlers.clear() logger.addHandler(_Handler()) logger.setLevel(logging.DEBUG) logger.propagate = False store = metadata_store or _FakeMetadataStore() clk = clock or _FixedClock() cfg = config or C11RetryConfig( max_in_call_retries=3, max_per_tile_attempts=5, backoff_base_s=2.0, backoff_cap_s=60.0, ) fdr_client = fdr or FakeFdrSink("c11_tile_manager.idempotent_retry") decorator = IdempotentRetryTileUploader( inner=inner, # type: ignore[arg-type] tile_metadata_store=store, # type: ignore[arg-type] fdr_client=fdr_client, # type: ignore[arg-type] logger=logger, clock=clk, config=cfg, ) return decorator, log_records, store, clk, fdr_client def _request(flight_id: UUID | None = None) -> UploadRequest: return UploadRequest( batch_size=10, satellite_provider_url="https://parent-suite.test", flight_id=flight_id, ) def _success(tile_count: int = 5, retry_count: int = 0) -> UploadBatchReport: return UploadBatchReport( batch_uuid=uuid4(), per_tile_status=tuple( PerTileStatus(tile_id=f"t{i}", status=IngestStatus.QUEUED) for i in range(tile_count) ), retry_count=retry_count, next_retry_at_s=None, outcome=UploadOutcome.SUCCESS, public_key_fingerprint="0123456789abcdef", ) def _partial( *, queued: int = 7, rejected_ids: tuple[str, ...] = ("t0", "t1", "t2"), rejection_reason: str = "duplicate", ) -> UploadBatchReport: per_tile = [ PerTileStatus(tile_id=f"q{i}", status=IngestStatus.QUEUED) for i in range(queued) ] + [ PerTileStatus( tile_id=tid, status=IngestStatus.REJECTED, rejection_reason=rejection_reason, ) for tid in rejected_ids ] return UploadBatchReport( batch_uuid=uuid4(), per_tile_status=tuple(per_tile), retry_count=0, next_retry_at_s=None, outcome=UploadOutcome.PARTIAL, public_key_fingerprint="0123456789abcdef", ) # ---------------------------------------------------------------------- # AC-1 — success on first attempt → no retry side effects # ---------------------------------------------------------------------- def test_ac1_success_on_first_attempt_zero_side_effects() -> None: # Arrange inner = _ScriptedInner(reports=[_success(5)]) (decorator, _logs, store, clk, fdr) = _build_decorator(inner=inner) # Act report = decorator.upload_pending_tiles(_request(uuid4())) # Assert assert report.outcome == UploadOutcome.SUCCESS assert report.retry_count == 0 assert clk.sleep_calls == [] assert store.counter_per_tile == {} assert store.transitions == [] assert fdr.records == [] assert len(inner.calls) == 1 # ---------------------------------------------------------------------- # AC-2 — partial → retry → success # ---------------------------------------------------------------------- def test_ac2_partial_then_success_increments_attempts_and_sleeps_once() -> None: # Arrange rejected = ("a", "b", "c") inner = _ScriptedInner( reports=[ _partial(rejected_ids=rejected), _success(3), ] ) (decorator, _logs, store, clk, _fdr) = _build_decorator(inner=inner) # Act report = decorator.upload_pending_tiles(_request(uuid4())) # Assert assert report.outcome == UploadOutcome.SUCCESS assert report.retry_count == 1 assert sorted(store.counter_per_tile.keys()) == sorted(rejected) assert all(v == 1 for v in store.counter_per_tile.values()) assert clk.sleep_calls == [2.0] assert len(inner.calls) == 2 # ---------------------------------------------------------------------- # AC-3 — per-tile budget exhaustion → UPLOAD_GIVEUP # ---------------------------------------------------------------------- def test_ac3_per_tile_budget_exhausted_moves_to_giveup() -> None: # Arrange — pre-load the counter so a single rejection trips the threshold. cfg = C11RetryConfig( max_in_call_retries=0, max_per_tile_attempts=5, backoff_base_s=2.0, backoff_cap_s=60.0, ) inner = _ScriptedInner( reports=[_partial(rejected_ids=("doomed_tile",), rejection_reason="invalid signature")] ) store = _FakeMetadataStore(counter_per_tile={"doomed_tile": 4}) (decorator, log_records, store_out, _clk, fdr) = _build_decorator( inner=inner, metadata_store=store, config=cfg ) # Act report = decorator.upload_pending_tiles(_request(uuid4())) # Assert — increment took 4 → 5; threshold reached; transition recorded. assert store_out.counter_per_tile["doomed_tile"] == 5 assert ("doomed_tile", "upload_giveup") in store_out.transitions giveup_records = [r for r in fdr.records if r.kind == "c11.upload.giveup"] assert len(giveup_records) == 1 assert giveup_records[0].payload["attempts"] == 5 assert giveup_records[0].payload["last_rejection_reason"] == "invalid signature" error_logs = [ r for r in log_records if r.levelno == logging.ERROR and getattr(r, "kind", "") == "c11.retry.tile.giveup" ] assert len(error_logs) == 1 # Outcome stays PARTIAL because no retries were attempted. assert report.outcome == UploadOutcome.PARTIAL # ---------------------------------------------------------------------- # AC-4 — in-call retry budget exhausted with persistent partial # ---------------------------------------------------------------------- def test_ac4_in_call_budget_exhausted_yields_partial_with_hint() -> None: # Arrange — every call returns the same single rejected tile. cfg = C11RetryConfig( max_in_call_retries=3, max_per_tile_attempts=99, backoff_base_s=2.0, backoff_cap_s=60.0, ) inner = _ScriptedInner( reports=[_partial(rejected_ids=("t0",)) for _ in range(4)] ) clk = _FixedClock(now_ns=1_000_000_000_000) (decorator, _logs, store, clk_out, _fdr) = _build_decorator( inner=inner, clock=clk, config=cfg ) # Act report = decorator.upload_pending_tiles(_request(uuid4())) # Assert assert len(inner.calls) == 4 assert clk_out.sleep_calls == [2.0, 4.0, 8.0] assert report.outcome == UploadOutcome.PARTIAL assert report.retry_count == 3 # _FixedClock advances on each sleep_until_ns; final time_ns # reflects the sum of the backoffs: 1000s + (2+4+8)s = 1014s. # next_retry_at_s = floor(time_ns/1e9) + backoff_cap_s = 1014 + 60. assert report.next_retry_at_s == 1014 + 60 # Per-tile counter incremented once per call (4 increments). assert store.counter_per_tile["t0"] == 4 # ---------------------------------------------------------------------- # AC-5 — exponential backoff cap # ---------------------------------------------------------------------- def test_ac5_backoff_cap_honoured_at_high_attempt_number() -> None: # Arrange cfg = C11RetryConfig( max_in_call_retries=10, max_per_tile_attempts=99, backoff_base_s=2.0, backoff_cap_s=10.0, ) inner = _ScriptedInner( reports=[_partial(rejected_ids=("t0",)) for _ in range(11)] ) (decorator, _logs, _store, clk, _fdr) = _build_decorator( inner=inner, config=cfg ) # Act report = decorator.upload_pending_tiles(_request(uuid4())) # Assert — first 4 retries: 2, 4, 8, 16->capped 10; remaining 6: all 10 expected = [2.0, 4.0, 8.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0] assert clk.sleep_calls == expected assert report.retry_count == 10 # ---------------------------------------------------------------------- # AC-11 — pass-through methods # ---------------------------------------------------------------------- def test_ac11_enumerate_pending_passes_through() -> None: # Arrange inner = _ScriptedInner(reports=[_success(0)]) (decorator, _logs, _store, _clk, _fdr) = _build_decorator(inner=inner) # Act fid = uuid4() out = decorator.enumerate_pending_tiles(fid) # Assert assert inner.enumerate_calls == [fid] assert out == [{"sentinel": True, "flight_id": fid}] # ---------------------------------------------------------------------- # AC-12 — inner exception propagates without retry # ---------------------------------------------------------------------- def test_ac12_satellite_provider_error_propagates_without_retry() -> None: # Arrange inner = _ScriptedInner(raise_on_call=[SatelliteProviderError("boom")]) (decorator, _logs, _store, clk, _fdr) = _build_decorator(inner=inner) # Act / Assert with pytest.raises(SatelliteProviderError): decorator.upload_pending_tiles(_request()) assert clk.sleep_calls == [] assert len(inner.calls) == 1 # ---------------------------------------------------------------------- # Outcome=FAILURE on first call returns as-is # ---------------------------------------------------------------------- def test_outcome_failure_passthrough() -> None: # Arrange failure_report = UploadBatchReport( batch_uuid=uuid4(), per_tile_status=(), retry_count=0, next_retry_at_s=None, outcome=UploadOutcome.FAILURE, public_key_fingerprint="dead", ) inner = _ScriptedInner(reports=[failure_report]) (decorator, _logs, _store, clk, _fdr) = _build_decorator(inner=inner) # Act report = decorator.upload_pending_tiles(_request()) # Assert assert report.outcome == UploadOutcome.FAILURE assert clk.sleep_calls == [] # ---------------------------------------------------------------------- # AC-10 — composition-root bypass via ``disable_retry_decorator`` # ---------------------------------------------------------------------- def _build_factory_config(*, disable_retry: bool) -> Any: from gps_denied_onboard.components.c11_tile_manager import C11Config from gps_denied_onboard.config.schema import Config block = C11Config( satellite_provider_ingest_url="https://parent-suite.test", upload_batch_size=10, upload_http_timeout_s=5.0, upload_max_retry_after_s=600, companion_id="bypass_test", disable_retry_decorator=disable_retry, ) return Config(components={"c11_tile_manager": block}) def test_ac10_factory_returns_decorated_uploader_by_default() -> None: # Arrange import httpx as _httpx from gps_denied_onboard.runtime_root.c11_factory import build_tile_uploader config = _build_factory_config(disable_retry=False) transport = _httpx.MockTransport(lambda r: _httpx.Response(202)) # Act uploader = build_tile_uploader( config, http_client=_httpx.Client(transport=transport), tile_store=object(), tile_metadata_store=object(), key_manager=object(), # type: ignore[arg-type] ) # Assert assert isinstance(uploader, IdempotentRetryTileUploader) def test_ac10_factory_bypasses_decorator_when_flag_set() -> None: # Arrange import httpx as _httpx from gps_denied_onboard.components.c11_tile_manager import HttpTileUploader from gps_denied_onboard.runtime_root.c11_factory import build_tile_uploader config = _build_factory_config(disable_retry=True) transport = _httpx.MockTransport(lambda r: _httpx.Response(202)) # Act uploader = build_tile_uploader( config, http_client=_httpx.Client(transport=transport), tile_store=object(), tile_metadata_store=object(), key_manager=object(), # type: ignore[arg-type] ) # Assert assert isinstance(uploader, HttpTileUploader) assert not isinstance(uploader, IdempotentRetryTileUploader) # ---------------------------------------------------------------------- # NFR — overhead microbench (no retries → ~zero added latency) # ---------------------------------------------------------------------- def test_nfr_overhead_under_5ms_on_success_first_attempt() -> None: # Arrange inner = _ScriptedInner(reports=[_success(50)]) (decorator, _logs, _store, _clk, _fdr) = _build_decorator(inner=inner) request = _request(uuid4()) decorator.upload_pending_tiles(request) inner.reports = [_success(50)] inner.calls.clear() # Act import time as _time t0 = _time.perf_counter() decorator.upload_pending_tiles(request) elapsed_ms = (_time.perf_counter() - t0) * 1000.0 # Assert — generous bound; the goal is to catch O(n^2) regressions # in the per-call bookkeeping, not to certify wall-clock budget. assert elapsed_ms < 50.0