Files
Oleksandr Bezdieniezhnykh 5fe67023b2 [AZ-329] [AZ-330] [AZ-523] [AZ-524] Batch 44 atomic refactor
Implements two new C12 services and rebalances the C11/C12 boundary
in one atomic commit:

* AZ-329 PostLandingUploadOrchestrator — gates C11 upload on the
  `flight_footer` FDR record's `clean_shutdown` field; 4 refusal
  modes; new FdrFooterReader Protocol + LocalFdrFooterReader.
* AZ-330 OperatorReLocService — AC-3.4 visual-loss re-localization
  hint; reuses shared LatLonAlt; OperatorCommandTransport Protocol
  cut (E-C8 owns the future pymavlink concrete); new FDR record
  kind `c12.reloc.requested`; log redaction (lat/lon 5 decimals,
  reason 200 chars).
* AZ-523 C11 internal flight-state gate removed (SRP refactor):
  `confirm_flight_state` / `FlightStateSignal` use /
  `FlightStateNotOnGroundError` deleted from C11; TileUploader
  contract bumped to v2.0.0 (frozen) with migration note; AZ-317
  superseded.
* AZ-524 Package rename `c12_operator_tooling` →
  `c12_operator_orchestrator` across source, tests, pyproject,
  CMake, Dockerfile, compose, CI, runtime-root services class
  (`OperatorOrchestratorServices`) + factory function
  (`build_operator_orchestrator`), logger namespaces, config slug,
  docs, and the E-C12 epic title.

Tests: 1543 passed, 80 skipped (all environment gates). Targeted
AC suite (AZ-329 + AZ-330 + FdrFooterReader): 37 passed. Cold-start
NFR-perf still ≤ 500 ms p99.

Tracker: AZ-317 → Done (superseded); AZ-319 v2.0.0 contract bump
comment; AZ-329/AZ-330 → In Testing; AZ-253 epic renamed; AZ-523
+ AZ-524 created and closed as audit-trail tickets.

See `_docs/03_implementation/batch_44_cycle1_report.md`.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-13 19:42:46 +03:00

890 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AZ-319 ``HttpTileUploader`` unit tests.
Covers AC-1, AC-3 .. AC-14 and the upload-throughput NFR from
``_docs/02_tasks/done/AZ-319_c11_tile_uploader.md``. AC-2 (the legacy
ON_GROUND gate) was removed in batch 44 — gating is now C12's
``PostLandingUploadOrchestrator`` responsibility.
Uses :class:`httpx.MockTransport` for deterministic HTTP responses,
:class:`FakeFdrSink` for FDR capture, a list-backed ``logging.Handler``
for log capture, and stub C6 stores / key manager so this suite never
drags in AZ-303 / AZ-305 / AZ-318 internals.
"""
from __future__ import annotations
import json
import logging
import random
from collections.abc import Iterable
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from uuid import UUID, uuid4
import httpx
import pytest
from gps_denied_onboard.components.c11_tile_manager import (
C11Config,
HttpTileUploader,
IngestStatus,
PerFlightKeyManager,
PublicKeyFingerprint,
RateLimitedError,
SatelliteProviderError,
UploadOutcome,
UploadRequest,
canonical_payload_bytes,
)
from gps_denied_onboard.fdr_client import FdrRecord
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
# ----------------------------------------------------------------------
# Fakes / fixtures
# ----------------------------------------------------------------------
_PRODUCER_ID = "c11_tile_manager.tile_uploader"
_INGEST_PATH = "/api/satellite/tiles/ingest"
_BASE_URL = "https://parent-suite.test"
_INGEST_URL = _BASE_URL + _INGEST_PATH
_COMPANION_ID = "test-companion-001"
@dataclass(frozen=True)
class _FakeTileId:
zoom_level: int
lat: float
lon: float
@dataclass(frozen=True)
class _FakeQuality:
estimator_label: str = "okvis2"
covariance_2x2: tuple[tuple[float, float], tuple[float, float]] = (
(0.5, 0.0),
(0.0, 0.5),
)
last_anchor_age_ms: int = 100
mre_px: float = 0.5
imu_bias_norm: float = 0.01
@dataclass
class _FakeTile:
tile_id: _FakeTileId
flight_id: str | None
capture_timestamp: datetime
tile_size_meters: float = 100.0
tile_size_pixels: int = 256
quality_metadata: _FakeQuality | None = None
@dataclass
class _FakePixelHandle:
payload: bytes
def __enter__(self) -> memoryview:
return memoryview(self.payload)
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
return None
class _FakeTileStore:
"""Test double for the structural ``_TileBytesReader`` cut."""
def __init__(self, blobs: dict[str, bytes] | None = None) -> None:
self._blobs = blobs or {}
self.read_calls: list[_FakeTileId] = []
def read_tile_pixels(self, tile_id: _FakeTileId) -> _FakePixelHandle:
self.read_calls.append(tile_id)
key = _tile_key(tile_id)
return _FakePixelHandle(self._blobs.get(key, b"\xff\xd8\xff\xe0fake-jpeg"))
class _FakeMetadataStore:
"""Test double for the structural ``_PendingMetadataReader`` cut."""
def __init__(self, pending: list[_FakeTile] | None = None) -> None:
self._pending = pending or []
self.pending_calls: int = 0
self.mark_calls: list[tuple[_FakeTileId, datetime]] = []
def pending_uploads(self) -> list[_FakeTile]:
self.pending_calls += 1
return list(self._pending)
def mark_uploaded(self, tile_id: _FakeTileId, uploaded_at: datetime) -> None:
self.mark_calls.append((tile_id, uploaded_at))
class _StubKeyManager:
"""Stand-in for AZ-318 ``PerFlightKeyManager``.
Mirrors the public surface ``HttpTileUploader`` actually uses:
``start_session`` / ``end_session`` / ``sign`` /
``record_signature_rejection`` plus ``is_active``. The ``signs``
counter lets tests assert the canonical bytes were signed once
per tile per attempt.
"""
def __init__(self, fingerprint_hex: str = "0123456789abcdef") -> None:
self._fingerprint_hex = fingerprint_hex
self._private_key: object | None = None
self._active_flight: UUID | None = None
self.start_calls: list[UUID] = []
self.end_calls: int = 0
self.signs: list[bytes] = []
self.signature_rejections: list[tuple[UUID, str]] = []
def start_session(self, flight_id: UUID) -> PublicKeyFingerprint:
self._private_key = object()
self._active_flight = flight_id
self.start_calls.append(flight_id)
return PublicKeyFingerprint(
flight_id=flight_id,
public_key_pem=b"-----BEGIN PUBLIC KEY-----\nFAKE\n-----END PUBLIC KEY-----\n",
fingerprint=self._fingerprint_hex,
generated_at=datetime(2025, 1, 15, 8, 0, tzinfo=timezone.utc),
)
def end_session(self) -> None:
if self._private_key is None:
return
self._private_key = None
self._active_flight = None
self.end_calls += 1
def sign(self, payload: bytes) -> bytes:
if self._private_key is None:
raise RuntimeError("sign called outside session")
self.signs.append(payload)
return b"sig-" + payload[:8]
def record_signature_rejection(self, flight_id: UUID, tile_id: str) -> None:
self.signature_rejections.append((flight_id, tile_id))
@property
def is_active(self) -> bool:
return self._private_key is not None
def _tile_key(tile_id: _FakeTileId) -> str:
return f"z{int(tile_id.zoom_level)}_{float(tile_id.lat):.6f}_{float(tile_id.lon):.6f}"
def _make_tile(
*,
zoom: int = 14,
lat: float = 45.0,
lon: float = -122.0,
flight_id: str | None = "00000000-0000-0000-0000-000000000020",
capture: datetime | None = None,
quality: _FakeQuality | None = None,
) -> _FakeTile:
return _FakeTile(
tile_id=_FakeTileId(zoom_level=zoom, lat=lat, lon=lon),
flight_id=flight_id,
capture_timestamp=capture
or datetime(2025, 1, 15, 8, 5, 0, tzinfo=timezone.utc),
quality_metadata=quality or _FakeQuality(),
)
def _build_uploader(
*,
transport: httpx.MockTransport,
pending: list[_FakeTile] | None = None,
blobs: dict[str, bytes] | None = None,
fingerprint_hex: str = "0123456789abcdef",
config: C11Config | None = None,
sleep_recorder: list[float] | None = None,
) -> tuple[
HttpTileUploader,
FakeFdrSink,
list[logging.LogRecord],
_StubKeyManager,
_FakeTileStore,
_FakeMetadataStore,
list[float],
]:
fdr = FakeFdrSink(_PRODUCER_ID)
log_records: list[logging.LogRecord] = []
class _Handler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
log_records.append(record)
logger = logging.getLogger(f"test_az319_{id(log_records)}")
logger.handlers.clear()
logger.addHandler(_Handler())
logger.setLevel(logging.DEBUG)
logger.propagate = False
key_manager = _StubKeyManager(fingerprint_hex=fingerprint_hex)
tile_store = _FakeTileStore(blobs=blobs)
metadata_store = _FakeMetadataStore(pending=pending)
sleeps = sleep_recorder if sleep_recorder is not None else []
def _sleep(seconds: float) -> None:
sleeps.append(seconds)
cfg = config or C11Config(
satellite_provider_ingest_url=_BASE_URL,
upload_batch_size=10,
upload_http_timeout_s=5.0,
upload_max_retry_after_s=600,
companion_id=_COMPANION_ID,
)
client = httpx.Client(transport=transport, base_url=_BASE_URL)
uploader = HttpTileUploader(
http_client=client,
tile_store=tile_store,
tile_metadata_store=metadata_store,
key_manager=key_manager, # type: ignore[arg-type]
fdr_client=fdr, # type: ignore[arg-type]
logger=logger,
config=cfg,
sleep=_sleep,
)
return uploader, fdr, log_records, key_manager, tile_store, metadata_store, sleeps
def _make_request(*, batch_size: int = 10, flight_id: UUID | None = None) -> UploadRequest:
return UploadRequest(
batch_size=batch_size,
satellite_provider_url=_BASE_URL,
flight_id=flight_id,
)
def _success_response(batch: list[_FakeTile], status: str = "queued") -> dict[str, Any]:
return {
"batch_uuid": str(uuid4()),
"per_tile_status": [
{"tile_id": _tile_key(t.tile_id), "status": status} for t in batch
],
}
def _kinds(records: Iterable[FdrRecord]) -> list[str]:
return [r.kind for r in records]
def _extract_posted_tile_ids(request: httpx.Request) -> list[str]:
"""Pull ``tile_id`` values out of the multipart ``tiles_metadata`` JSON.
``HttpTileUploader`` packs the per-tile metadata table as a single
JSON form field. Tests use this to echo back exactly the tile_ids
the uploader sent — without having to parse the full multipart
envelope.
"""
body = request.read()
boundary_value = request.headers["content-type"].split("boundary=")[-1].strip(' "')
boundary = b"--" + boundary_value.encode()
parts = body.split(boundary)
for part in parts:
if b'name="tiles_metadata"' not in part:
continue
sep = part.find(b"\r\n\r\n")
if sep < 0:
continue
payload = part[sep + 4 :].rstrip(b"-\r\n")
try:
decoded = json.loads(payload.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError):
return []
return [str(entry["tile_id"]) for entry in decoded]
return []
# ----------------------------------------------------------------------
# AC-1: 50-tile happy path
# ----------------------------------------------------------------------
def test_ac1_50_tile_happy_path_marks_all_uploaded() -> None:
# Arrange
pending = [
_make_tile(zoom=14, lat=45.0 + i * 0.001, lon=-122.0 + i * 0.001)
for i in range(50)
]
posted_batches: list[list[str]] = []
def _handler(request: httpx.Request) -> httpx.Response:
tile_ids = _extract_posted_tile_ids(request)
posted_batches.append(tile_ids)
body = {
"batch_uuid": str(uuid4()),
"per_tile_status": [
{"tile_id": tid, "status": "queued"} for tid in tile_ids
],
}
return httpx.Response(202, json=body)
transport = httpx.MockTransport(_handler)
(
uploader,
fdr,
_logs,
key_manager,
_tile_store,
metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
# Act
report = uploader.upload_pending_tiles(_make_request(batch_size=10))
# Assert
assert report.outcome == UploadOutcome.SUCCESS
assert len(report.per_tile_status) == 50
assert len(metadata_store.mark_calls) == 50
assert "c11.upload.batch.complete" in _kinds(fdr.records)
batch_complete = [r for r in fdr.records if r.kind == "c11.upload.batch.complete"]
assert len(batch_complete) == 1
assert batch_complete[0].payload["total_attempted"] == 50
assert batch_complete[0].payload["total_queued"] == 50
assert batch_complete[0].payload["total_rejected"] == 0
assert batch_complete[0].payload["outcome"] == "success"
assert key_manager.end_calls == 1
# ----------------------------------------------------------------------
# AC-3: signature rejection — record + skip mark_uploaded; outcome=partial
# ----------------------------------------------------------------------
def test_ac3_signature_rejection_records_and_keeps_pending() -> None:
# Arrange
pending = [_make_tile(lon=-122.0 - i * 0.001) for i in range(5)]
def _handler(request: httpx.Request) -> httpx.Response:
per_tile = []
for i, tile in enumerate(pending):
tid = _tile_key(tile.tile_id)
if i == 0:
per_tile.append(
{
"tile_id": tid,
"status": "rejected",
"rejection_reason": "invalid signature",
}
)
else:
per_tile.append({"tile_id": tid, "status": "queued"})
return httpx.Response(202, json={"batch_uuid": str(uuid4()), "per_tile_status": per_tile})
transport = httpx.MockTransport(_handler)
(
uploader,
fdr,
_logs,
key_manager,
_tile_store,
metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
# Act
report = uploader.upload_pending_tiles(_make_request(batch_size=10))
# Assert
assert report.outcome == UploadOutcome.PARTIAL
assert len(key_manager.signature_rejections) == 1
assert key_manager.signature_rejections[0][1] == _tile_key(pending[0].tile_id)
rejected_marked = [
m for m in metadata_store.mark_calls if m[0] == pending[0].tile_id
]
assert rejected_marked == []
assert "c11.upload.tile.rejected" in _kinds(fdr.records)
rejected_fdr = [r for r in fdr.records if r.kind == "c11.upload.tile.rejected"]
assert rejected_fdr[0].payload["rejection_reason"] == "invalid signature"
# ----------------------------------------------------------------------
# AC-4: duplicate / superseded treated as success
# ----------------------------------------------------------------------
def test_ac4_duplicate_and_superseded_are_success() -> None:
# Arrange
pending = [_make_tile(lon=-122.0 - i * 0.001) for i in range(8)]
def _handler(request: httpx.Request) -> httpx.Response:
per_tile = []
for i, tile in enumerate(pending):
tid = _tile_key(tile.tile_id)
status = "duplicate" if i < 5 else "superseded"
per_tile.append({"tile_id": tid, "status": status})
return httpx.Response(
202, json={"batch_uuid": str(uuid4()), "per_tile_status": per_tile}
)
transport = httpx.MockTransport(_handler)
(
uploader,
_fdr,
_logs,
_key_manager,
_tile_store,
metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
# Act
report = uploader.upload_pending_tiles(_make_request(batch_size=10))
# Assert
assert report.outcome == UploadOutcome.SUCCESS
assert len(metadata_store.mark_calls) == 8
statuses = {s.status for s in report.per_tile_status}
assert statuses == {IngestStatus.DUPLICATE, IngestStatus.SUPERSEDED}
# ----------------------------------------------------------------------
# AC-5: signing key zeroised on success
# ----------------------------------------------------------------------
def test_ac5_signing_key_zeroised_on_success() -> None:
# Arrange
pending = [_make_tile()]
transport = httpx.MockTransport(
lambda r: httpx.Response(202, json=_success_response(pending))
)
(
uploader,
_fdr,
_logs,
key_manager,
_tile_store,
_metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
# Act
uploader.upload_pending_tiles(_make_request())
# Assert
assert key_manager.end_calls == 1
assert key_manager.is_active is False
# ----------------------------------------------------------------------
# AC-6: zeroisation on failure (transport error after exhausted retries)
# ----------------------------------------------------------------------
def test_ac6_signing_key_zeroised_on_failure() -> None:
# Arrange
pending = [_make_tile()]
attempts = [0]
def _handler(request: httpx.Request) -> httpx.Response:
attempts[0] += 1
raise httpx.ConnectError("simulated network down")
transport = httpx.MockTransport(_handler)
(
uploader,
_fdr,
_logs,
key_manager,
_tile_store,
metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
# Act / Assert
with pytest.raises(SatelliteProviderError):
uploader.upload_pending_tiles(_make_request())
assert key_manager.end_calls == 1
assert key_manager.is_active is False
assert metadata_store.mark_calls == []
# ----------------------------------------------------------------------
# AC-7: public-key FDR record precedes any tile FDR
# ----------------------------------------------------------------------
def test_ac7_public_key_fdr_precedes_tile_fdr() -> None:
# Arrange — FakeFdrSink only captures records the uploader enqueues
# itself; the AZ-318 manager's start_session FDR is emitted via the
# SAME ``_fdr`` sink in production wiring. Here we wire a single
# FakeFdrSink as both producers and pre-seed the start_session
# event so AC-7 ordering is exercised end-to-end.
pending = [_make_tile()]
transport = httpx.MockTransport(
lambda r: httpx.Response(202, json=_success_response(pending))
)
(
uploader,
fdr,
_logs,
_key_manager,
_tile_store,
_metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
fdr.enqueue(
FdrRecord(
schema_version=1,
ts="2025-01-15T08:00:00.000000Z",
producer_id=_PRODUCER_ID,
kind="c11.upload.session.key.public",
payload={
"flight_id": "00000000-0000-0000-0000-000000000020",
"public_key_pem": "FAKE",
"fingerprint": "0123456789abcdef",
"generated_at_iso": "2025-01-15T08:00:00.000000+00:00",
},
)
)
# Act
uploader.upload_pending_tiles(_make_request())
# Assert
kinds = _kinds(fdr.records)
key_idx = kinds.index("c11.upload.session.key.public")
tile_kinds = [k for k in kinds if k.startswith("c11.upload.tile.")]
assert tile_kinds, "expected at least one tile FDR record"
first_tile_idx = next(i for i, k in enumerate(kinds) if k.startswith("c11.upload.tile."))
assert key_idx < first_tile_idx
# ----------------------------------------------------------------------
# AC-8: 429 honours Retry-After
# ----------------------------------------------------------------------
def test_ac8_429_honours_retry_after_seconds() -> None:
# Arrange
pending = [_make_tile()]
state = {"attempt": 0}
def _handler(request: httpx.Request) -> httpx.Response:
state["attempt"] += 1
if state["attempt"] == 1:
return httpx.Response(429, headers={"Retry-After": "60"})
return httpx.Response(202, json=_success_response(pending))
transport = httpx.MockTransport(_handler)
sleeps: list[float] = []
(
uploader,
_fdr,
_logs,
_key_manager,
_tile_store,
_metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending, sleep_recorder=sleeps)
# Act
report = uploader.upload_pending_tiles(_make_request())
# Assert
assert state["attempt"] == 2
assert sleeps and sleeps[0] >= 60
assert report.retry_count >= 1
assert report.outcome == UploadOutcome.SUCCESS
# ----------------------------------------------------------------------
# AC-9: persistent 5xx aborts with structured error
# ----------------------------------------------------------------------
def test_ac9_persistent_5xx_raises_satellite_provider_error() -> None:
# Arrange
pending = [_make_tile()]
attempts = [0]
def _handler(request: httpx.Request) -> httpx.Response:
attempts[0] += 1
return httpx.Response(503)
transport = httpx.MockTransport(_handler)
(
uploader,
_fdr,
_logs,
key_manager,
_tile_store,
_metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
# Act / Assert
with pytest.raises(SatelliteProviderError):
uploader.upload_pending_tiles(_make_request())
assert attempts[0] >= 4
assert key_manager.end_calls == 1
# ----------------------------------------------------------------------
# AC-10: TLS / 401 / 403 fail fast
# ----------------------------------------------------------------------
def test_ac10_401_fails_fast_no_retry() -> None:
# Arrange
pending = [_make_tile()]
attempts = [0]
def _handler(request: httpx.Request) -> httpx.Response:
attempts[0] += 1
return httpx.Response(401)
transport = httpx.MockTransport(_handler)
(
uploader,
_fdr,
log_records,
_key_manager,
_tile_store,
_metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
# Act / Assert
with pytest.raises(SatelliteProviderError):
uploader.upload_pending_tiles(_make_request())
assert attempts[0] == 1
full_log = " ".join(r.getMessage() + json.dumps(getattr(r, "kv", {})) for r in log_records)
assert "BEGIN PUBLIC KEY" not in full_log
assert "Authorization" not in full_log
# ----------------------------------------------------------------------
# AC-11: empty pending → success, zero POSTs, session still cycled
# ----------------------------------------------------------------------
def test_ac11_empty_pending_set_is_success_no_posts() -> None:
# Arrange
posted: list[httpx.Request] = []
def _handler(request: httpx.Request) -> httpx.Response:
posted.append(request)
return httpx.Response(202, json={"batch_uuid": str(uuid4()), "per_tile_status": []})
transport = httpx.MockTransport(_handler)
(
uploader,
fdr,
_logs,
key_manager,
_tile_store,
_metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=[])
# Act
report = uploader.upload_pending_tiles(_make_request())
# Assert
assert report.outcome == UploadOutcome.SUCCESS
assert report.per_tile_status == ()
assert posted == []
assert key_manager.start_calls and key_manager.end_calls == 1
batch_complete = [r for r in fdr.records if r.kind == "c11.upload.batch.complete"]
assert len(batch_complete) == 1
assert batch_complete[0].payload["total_attempted"] == 0
# ----------------------------------------------------------------------
# AC-13: deterministic canonical signing bytes
# ----------------------------------------------------------------------
def test_ac13_canonical_payload_bytes_deterministic_for_same_input() -> None:
# Arrange
rnd = random.Random(0xC11)
request = _make_request()
samples = []
for _ in range(20):
tile = _make_tile(
zoom=rnd.randint(10, 18),
lat=rnd.uniform(-89, 89),
lon=rnd.uniform(-179, 179),
capture=datetime(
rnd.randint(2024, 2026),
rnd.randint(1, 12),
rnd.randint(1, 28),
tzinfo=timezone.utc,
),
quality=_FakeQuality(
covariance_2x2=(
(rnd.uniform(0, 1), rnd.uniform(0, 0.1)),
(rnd.uniform(0, 0.1), rnd.uniform(0, 1)),
),
last_anchor_age_ms=rnd.randint(0, 5000),
mre_px=rnd.uniform(0, 5),
imu_bias_norm=rnd.uniform(0, 1),
),
)
blob = bytes(rnd.randrange(256) for _ in range(64))
samples.append((blob, tile))
# Act
digests_first = [
canonical_payload_bytes(blob, tile, request, _COMPANION_ID)
for blob, tile in samples
]
digests_second = [
canonical_payload_bytes(blob, tile, request, _COMPANION_ID)
for blob, tile in samples
]
# Assert
assert digests_first == digests_second
assert all(len(d) == 32 for d in digests_first)
assert len(set(digests_first)) == len(digests_first)
# ----------------------------------------------------------------------
# AC-14: partial-success batch returns without raising
# ----------------------------------------------------------------------
def test_ac14_partial_success_batch_does_not_raise() -> None:
# Arrange
pending = [_make_tile(lon=-122.0 - i * 0.001) for i in range(10)]
def _handler(request: httpx.Request) -> httpx.Response:
per_tile = []
for i, tile in enumerate(pending):
tid = _tile_key(tile.tile_id)
if i < 7:
per_tile.append({"tile_id": tid, "status": "queued"})
else:
per_tile.append(
{
"tile_id": tid,
"status": "rejected",
"rejection_reason": "low quality",
}
)
return httpx.Response(
202, json={"batch_uuid": str(uuid4()), "per_tile_status": per_tile}
)
transport = httpx.MockTransport(_handler)
(
uploader,
_fdr,
_logs,
_key_manager,
_tile_store,
metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending)
# Act
report = uploader.upload_pending_tiles(_make_request(batch_size=10))
# Assert
assert report.outcome == UploadOutcome.PARTIAL
assert len(report.per_tile_status) == 10
assert sum(1 for s in report.per_tile_status if s.status == IngestStatus.QUEUED) == 7
assert sum(1 for s in report.per_tile_status if s.status == IngestStatus.REJECTED) == 3
assert len(metadata_store.mark_calls) == 7
# ----------------------------------------------------------------------
# Rate-limit budget exhaustion (Risk 3 / RateLimitedError)
# ----------------------------------------------------------------------
def test_429_budget_exhaustion_raises_rate_limited_error() -> None:
# Arrange
pending = [_make_tile()]
def _handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(429, headers={"Retry-After": "300"})
transport = httpx.MockTransport(_handler)
cfg = C11Config(
satellite_provider_ingest_url=_BASE_URL,
upload_batch_size=10,
upload_http_timeout_s=5.0,
upload_max_retry_after_s=400,
companion_id=_COMPANION_ID,
)
(
uploader,
_fdr,
_logs,
key_manager,
_tile_store,
_metadata_store,
_sleeps,
) = _build_uploader(transport=transport, pending=pending, config=cfg)
# Act / Assert
with pytest.raises(RateLimitedError):
uploader.upload_pending_tiles(_make_request())
assert key_manager.end_calls == 1
# ----------------------------------------------------------------------
# NFR — throughput on a 1000-tile happy path
# ----------------------------------------------------------------------
def test_nfr_throughput_1000_tiles_under_budget() -> None:
# Arrange — 50 tiles × 20 batches = 1000; an in-process MockTransport
# so this measures uploader bookkeeping, NOT real network. Budget is
# generous because the goal is to catch O(n^2) regressions, not to
# certify wall-clock throughput on the dev host.
pending = [
_make_tile(zoom=14, lat=45.0 + i * 0.0001, lon=-122.0 + i * 0.0001)
for i in range(1000)
]
def _handler(request: httpx.Request) -> httpx.Response:
tile_ids = _extract_posted_tile_ids(request)
return httpx.Response(
202,
json={
"batch_uuid": str(uuid4()),
"per_tile_status": [
{"tile_id": tid, "status": "queued"} for tid in tile_ids
],
},
)
transport = httpx.MockTransport(_handler)
(uploader, _fdr, _logs, _km, _ts, _ms, _sleeps) = _build_uploader(
transport=transport, pending=pending
)
import time as _time
t0 = _time.perf_counter()
report = uploader.upload_pending_tiles(_make_request(batch_size=50))
elapsed = _time.perf_counter() - t0
# Assert — 1000 tiles / 5s budget gives 200 tile/s of in-process
# uploader work; comfortably above the 20 tile/s NFR floor and
# generous enough to absorb dev-host noise.
assert report.outcome == UploadOutcome.SUCCESS
assert len(report.per_tile_status) == 1000
assert elapsed < 5.0, f"1000 tiles took {elapsed:.2f}s; > 5.0s budget"