Files
gps-denied-onboard/tests/fixtures/derkachi_c6/seed_region.py
T
Oleksandr Bezdieniezhnykh b15454b9a9 [AZ-777] Phase 1 hotfix (z/x/y) + Phase 2 Derkachi seed + ops
Phase 1 hotfix:
- C11 HttpTileDownloader adapted to satellite-provider v2.0.0
  z/x/y inventory contract (bulk POST keyed by slippy-map coords).
- Unit tests rewritten to exercise the new inventory schema.
- E2E smoke test updated to match the v2.0.0 wire.

Phase 2 (Derkachi seed + smoke-validated on Jetson):
- tests/fixtures/derkachi_c6/{README,bbox.yaml,seed_region.py}
  drives POST /api/satellite/region against satellite-provider
  with Google Maps as the imagery source. Smoke run produced
  4 regions, 175 tiles, inventory 32/32.
- scripts/mint_dev_jwt.py + run-tests-jetson.sh auto-mint and
  export SATELLITE_PROVIDER_API_KEY using JWT_SECRET / JWT_ISSUER
  / JWT_AUDIENCE env vars (no host port mappings; e2e-runner
  reaches SP via internal docker network only).

Spec amendment: AZ-777 todo spec updated to record the
Google Maps imagery source decision and STOP-gate state.

AZ-777 Phase 3+ work is superseded by Epic AZ-835 (see next
commit).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-22 17:39:21 +03:00

523 lines
19 KiB
Python

#!/usr/bin/env python3
"""Seed the Derkachi reference tile catalog via satellite-provider's Region API.
AZ-777 Phase 2 deliverable. Reads ``bbox.yaml`` next to this script and
submits one or more ``POST /api/satellite/request`` calls per zoom level
to register the Derkachi bbox with the parent-suite satellite-provider.
Polls each region's status until terminal, then verifies the expected
tile count via ``POST /api/satellite/tiles/inventory``.
This script is intended to run from the gps-denied-onboard repo root
against a running satellite-provider (typically the Jetson e2e harness's
``satellite-provider`` service). It does NOT spin up the service itself
and does NOT modify any satellite-provider code or configuration.
Required environment (loaded from ``.env.test`` if not exported)::
SATELLITE_PROVIDER_URL e.g. https://satellite-provider:8080
SATELLITE_PROVIDER_API_KEY a valid HS256 JWT (mint with scripts/mint_dev_jwt.py)
SATELLITE_PROVIDER_TLS_INSECURE optional, "1" to accept self-signed dev certs
JWT_SECRET / JWT_ISSUER / JWT_AUDIENCE not read by this script; only needed when minting the key (scripts/mint_dev_jwt.py / run-tests-jetson.sh)
Usage::
# mint a JWT then seed using defaults from bbox.yaml
export SATELLITE_PROVIDER_API_KEY="$(python scripts/mint_dev_jwt.py)"
python tests/fixtures/derkachi_c6/seed_region.py
# dry-run (print the plan after checking config + required env vars; no requests are sent)
python tests/fixtures/derkachi_c6/seed_region.py --dry-run
# right-sized to actual flight extent (faster, fewer tiles)
python tests/fixtures/derkachi_c6/seed_region.py --right-sized-flight
# write a JSON summary for downstream consumers (fixture / CI)
python tests/fixtures/derkachi_c6/seed_region.py --output-summary /tmp/seed.json
Exit codes::
0 all regions reached terminal status and inventory verification passed
71 config file missing / malformed
72 required env var missing
73 reserved: satellite-provider unreachable (TCP / TLS error); not currently returned, network errors exit with 74
74 region submission failed (network error, non-200 response, or non-JSON body)
75 one or more regions failed during background processing
76 inventory verification mismatch (fewer than 95% of the queried tiles present)
"""
from __future__ import annotations
import argparse
import json
import math
import os
import sys
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
try:
import httpx
except ImportError as exc:
sys.stderr.write(
f"ERROR: httpx not installed: {exc}\nRun `pip install -e .[dev]` from the repo root.\n"
)
sys.exit(72)
try:
import yaml
except ImportError as exc:
sys.stderr.write(
f"ERROR: PyYAML not installed: {exc}\nRun `pip install -e .[dev]` from the repo root.\n"
)
sys.exit(72)
# Timeout in seconds passed as ``timeout=`` to every HTTP call in this script.
_REQUEST_TIMEOUT_S = 30.0
# Seconds slept between two consecutive status polls of one region.
_POLL_INTERVAL_S = 5.0
# Maximum number of status polls per region before giving up.
_POLL_MAX_ATTEMPTS = 60 # 60 * 5s = 5 min per region
# Lower-cased status values that stop the polling loop for a region.
_TERMINAL_STATUSES = frozenset({"completed", "failed", "error", "done", "succeeded"})
# Terminal statuses that main() counts as a failed region.
_FAILURE_STATUSES = frozenset({"failed", "error"})
@dataclass
class RegionChunk:
    """A single square Region API request at one zoom level, plus its outcome.

    The first five fields describe the request and must be supplied by the
    caller. The remaining fields start at their defaults and are filled in
    while the request is submitted and polled.
    """

    # --- request description -------------------------------------------
    zoom: int
    center_lat: float
    center_lon: float
    size_meters: int
    # Human-readable tag (e.g. "z18-north"); only used in log output.
    chunk_label: str
    # Fresh random UUID per instance; sent as the request "id" and used
    # again in the status-poll URL.
    region_id: uuid.UUID = field(default_factory=uuid.uuid4)

    # --- outcome, updated in place -------------------------------------
    # "status" value from the submission response.
    submitted_status: str | None = None
    # Most recent lower-cased "status" value seen while polling.
    terminal_status: str | None = None
    # Counters and artefact paths copied from the poll response.
    tiles_downloaded: int = 0
    tiles_reused: int = 0
    csv_path: str | None = None
    summary_path: str | None = None
def _load_env_file(path: Path) -> dict[str, str]:
"""Parse a KEY=VALUE env file. Honours quoting; ignores comments."""
if not path.is_file():
return {}
out: dict[str, str] = {}
for raw in path.read_text("utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
out[key.strip()] = value.strip().strip('"').strip("'")
return out
def _resolve_env(name: str, env_file_values: dict[str, str]) -> str | None:
return os.environ.get(name) or env_file_values.get(name)
def _compute_chunks(config: dict[str, Any], right_sized: bool) -> list[RegionChunk]:
"""Plan all Region API submissions for one seeding pass.
Splits each zoom level into N chunks across the lat axis so each
chunk fits within the Region API's sizeMeters cap (10000).
"""
if right_sized:
bbox = config["actual_flight_extent"]
else:
bbox = config["bbox"]
chunks_per_zoom = config["chunking"]["chunks_per_zoom"]
size_meters = int(config["chunking"]["size_meters_per_chunk"])
zoom_levels = config["zoom_levels"]
if right_sized:
# The flight extent is < 1 km, one chunk per zoom is sufficient.
chunks_per_zoom = 1
size_meters = 1000
lat_centers: list[float]
if chunks_per_zoom == 1:
lat_centers = [(bbox["lat_min"] + bbox["lat_max"]) / 2.0]
else:
span = bbox["lat_max"] - bbox["lat_min"]
step = span / chunks_per_zoom
lat_centers = [bbox["lat_min"] + step * (i + 0.5) for i in range(chunks_per_zoom)]
center_lon = (bbox["lon_min"] + bbox["lon_max"]) / 2.0
chunks: list[RegionChunk] = []
for zoom in zoom_levels:
for idx, lat in enumerate(lat_centers):
label_suffix = f"chunk{idx}" if chunks_per_zoom > 1 else "single"
chunks.append(
RegionChunk(
zoom=zoom,
center_lat=lat,
center_lon=center_lon,
size_meters=size_meters,
chunk_label=f"z{zoom}-{label_suffix}",
)
)
return chunks
def _expected_tile_coords(config: dict[str, Any], right_sized: bool) -> list[tuple[int, int, int]]:
"""Compute the slippy-map (z, x, y) tile coords covering the bbox.
Used by the inventory verification step.
"""
if right_sized:
bbox = config["actual_flight_extent"]
else:
bbox = config["bbox"]
coords: list[tuple[int, int, int]] = []
for z in config["zoom_levels"]:
n = 2**z
x_min = int((bbox["lon_min"] + 180) / 360 * n)
x_max = int((bbox["lon_max"] + 180) / 360 * n)
y_min = int((1 - math.asinh(math.tan(math.radians(bbox["lat_max"]))) / math.pi) / 2 * n)
y_max = int((1 - math.asinh(math.tan(math.radians(bbox["lat_min"]))) / math.pi) / 2 * n)
for x in range(x_min, x_max + 1):
for y in range(y_min, y_max + 1):
coords.append((z, x, y))
return coords
def _submit_region(
    client: httpx.Client, sp_url: str, headers: dict[str, str], chunk: RegionChunk
) -> tuple[bool, str]:
    """POST one region request and report the outcome as ``(ok, message)``.

    On success the ``status`` field of the JSON response is stored in
    ``chunk.submitted_status``. Transport errors, any response code other
    than 200, and a body that is not JSON each produce ``(False, reason)``
    instead of raising.
    """
    endpoint = f"{sp_url}/api/satellite/request"
    request_body = {
        "id": str(chunk.region_id),
        "latitude": chunk.center_lat,
        "longitude": chunk.center_lon,
        "sizeMeters": chunk.size_meters,
        "zoomLevel": chunk.zoom,
        "stitchTiles": False,
    }

    try:
        response = client.post(
            endpoint, headers=headers, json=request_body, timeout=_REQUEST_TIMEOUT_S
        )
    except httpx.HTTPError as exc:
        return False, f"network error: {exc}"

    if response.status_code != 200:
        return False, f"HTTP {response.status_code}: {response.text[:200]}"

    try:
        decoded = response.json()
        chunk.submitted_status = decoded.get("status")
    except json.JSONDecodeError as exc:
        return False, f"unexpected response body (not JSON): {exc}; raw={response.text[:200]}"

    return True, f"submitted; initial status={chunk.submitted_status}"
def _poll_region(
    client: httpx.Client, sp_url: str, headers: dict[str, str], chunk: RegionChunk
) -> str:
    """Poll one region until it reports a terminal status.

    Updates ``chunk`` in place on every successful poll: ``terminal_status``
    holds the most recently observed lower-cased status (even while it is
    still non-terminal), and the tile counters / file paths are copied from
    the response. A missing or JSON ``null`` counter is stored as ``0`` so
    callers can always add the counters.

    A failed poll (transport error, HTTP error status, body that is not a
    JSON object) is logged to stderr and retried; it does not abort polling.

    Returns:
        The terminal status string (a member of ``_TERMINAL_STATUSES``).

    Raises:
        RuntimeError: No terminal status was seen within
            ``_POLL_MAX_ATTEMPTS`` polls.
    """
    url = f"{sp_url}/api/satellite/region/{chunk.region_id}"
    for attempt in range(1, _POLL_MAX_ATTEMPTS + 1):
        # No point sleeping after the final attempt; we are about to give up.
        is_last_attempt = attempt == _POLL_MAX_ATTEMPTS
        try:
            resp = client.get(url, headers=headers, timeout=_REQUEST_TIMEOUT_S)
            resp.raise_for_status()
            payload = resp.json()
            if not isinstance(payload, dict):
                raise ValueError(
                    f"expected a JSON object, got {type(payload).__name__}"
                )
        # json.JSONDecodeError is a ValueError subclass, so it is covered here.
        except (httpx.HTTPError, ValueError) as exc:
            sys.stderr.write(f" [{chunk.chunk_label}] poll attempt {attempt} failed: {exc}\n")
            if not is_last_attempt:
                time.sleep(_POLL_INTERVAL_S)
            continue

        status = str(payload.get("status") or "").lower()
        chunk.terminal_status = status
        # `or 0` also maps an explicit JSON null to 0.
        chunk.tiles_downloaded = payload.get("tilesDownloaded") or 0
        chunk.tiles_reused = payload.get("tilesReused") or 0
        chunk.csv_path = payload.get("csvFilePath")
        chunk.summary_path = payload.get("summaryFilePath")
        if status in _TERMINAL_STATUSES:
            return status

        if attempt % 6 == 0:  # every ~30s
            sys.stderr.write(
                f" [{chunk.chunk_label}] still {status} (attempt {attempt}/{_POLL_MAX_ATTEMPTS})\n"
            )
        if not is_last_attempt:
            time.sleep(_POLL_INTERVAL_S)

    raise RuntimeError(
        f"region {chunk.region_id} ({chunk.chunk_label}) did not reach terminal "
        f"status within {_POLL_MAX_ATTEMPTS * _POLL_INTERVAL_S:.0f}s"
    )
def _verify_inventory(
    client: httpx.Client,
    sp_url: str,
    headers: dict[str, str],
    expected_coords: list[tuple[int, int, int]],
) -> tuple[int, int]:
    """Query the tile inventory for the expected coords.

    The coords are sent to ``POST /api/satellite/tiles/inventory`` in batches.
    The denominator is the number of coords *requested*: a batch whose
    request fails, or a response that lists fewer results than requested,
    counts the unconfirmed tiles as missing. This keeps a failing inventory
    endpoint from being reported as full (or vacuous) coverage.

    Returns:
        ``(present, total)`` where ``total == len(expected_coords)`` and
        ``present`` is the number of result entries with a truthy
        ``"present"`` field, capped per batch at the batch size.
    """
    BATCH_SIZE = 5000
    url = f"{sp_url}/api/satellite/tiles/inventory"
    total_present = 0
    total = 0
    for batch_start in range(0, len(expected_coords), BATCH_SIZE):
        batch = expected_coords[batch_start : batch_start + BATCH_SIZE]
        # Count the batch up front so a failure below still lowers coverage.
        total += len(batch)
        body = {"tiles": [{"z": z, "x": x, "y": y} for z, x, y in batch]}
        try:
            resp = client.post(url, headers=headers, json=body, timeout=_REQUEST_TIMEOUT_S)
            resp.raise_for_status()
            payload = resp.json()
        except (httpx.HTTPError, json.JSONDecodeError) as exc:
            sys.stderr.write(
                f"inventory batch starting at {batch_start} failed: {exc}; "
                f"counting its {len(batch)} tile(s) as missing\n"
            )
            continue
        # Tolerate a non-object body, `"results": null` and non-dict entries.
        results = (payload.get("results") or []) if isinstance(payload, dict) else []
        present_in_batch = sum(
            1 for entry in results if isinstance(entry, dict) and entry.get("present")
        )
        total_present += min(present_in_batch, len(batch))
    return total_present, total
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the seeding script."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring is pre-formatted (usage examples, exit-code
        # list); the default formatter would re-wrap it into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--bbox-config",
        type=Path,
        default=Path(__file__).parent / "bbox.yaml",
        help="Path to bbox.yaml (default: alongside this script).",
    )
    parser.add_argument(
        "--env-file",
        type=Path,
        default=Path(".env.test"),
        help="Fallback env file (default: .env.test in CWD).",
    )
    parser.add_argument(
        "--output-summary",
        type=Path,
        default=None,
        help="Optional path to write a JSON summary of the seeding run.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Plan + validate auth, but do not submit Region requests.",
    )
    parser.add_argument(
        "--right-sized-flight",
        action="store_true",
        help=(
            "Use the actual_flight_extent bbox (~1 km^2) instead of the full "
            "AZ-777 spec bbox (~80 km^2). ~98%% fewer tiles, useful when only "
            "the specific Derkachi clip needs coverage."
        ),
    )
    parser.add_argument(
        "--skip-poll",
        action="store_true",
        help="Submit all regions but do not poll; exit immediately after submission.",
    )
    parser.add_argument(
        "--skip-inventory-verification",
        action="store_true",
        help="Skip the final inventory verification step.",
    )
    return parser


def _summarize_run(
    sp_url: str,
    right_sized: bool,
    imagery_source: Any,
    license_info: Any,
    chunks: list[RegionChunk],
) -> dict[str, Any]:
    """Assemble the JSON-serialisable summary written by ``--output-summary``."""
    return {
        "sp_url": sp_url,
        "bbox_mode": "right-sized" if right_sized else "spec",
        "imagery_source": imagery_source,
        "license": license_info,
        "chunks": [
            {
                "label": c.chunk_label,
                "region_id": str(c.region_id),
                "zoom": c.zoom,
                "center_lat": c.center_lat,
                "center_lon": c.center_lon,
                "size_meters": c.size_meters,
                "terminal_status": c.terminal_status,
                "tiles_downloaded": c.tiles_downloaded,
                "tiles_reused": c.tiles_reused,
                "csv_path": c.csv_path,
                "summary_path": c.summary_path,
            }
            for c in chunks
        ],
        "totals": {
            "regions": len(chunks),
            "tiles_downloaded": sum(c.tiles_downloaded for c in chunks),
            "tiles_reused": sum(c.tiles_reused for c in chunks),
        },
    }


def main() -> int:
    """Run one seeding pass and return the process exit code.

    Steps: load ``bbox.yaml`` and the env values, print the plan, then (unless
    ``--dry-run``) submit every region, poll each one to a terminal status and
    verify the tile inventory. Optionally writes a JSON summary.

    Returns:
        0 on success (also for ``--dry-run`` and ``--skip-poll``);
        71 when the bbox config is missing, unreadable or malformed;
        72 when a required env value is missing;
        74 when any region submission fails;
        75 when any region fails or times out while polling;
        76 when fewer than 95% of the queried tiles are present.
    """
    args = _build_arg_parser().parse_args()

    # ----- Config -----
    if not args.bbox_config.is_file():
        sys.stderr.write(f"ERROR: bbox config not found: {args.bbox_config}\n")
        return 71
    try:
        config = yaml.safe_load(args.bbox_config.read_text("utf-8"))
    except (OSError, UnicodeError, yaml.YAMLError) as exc:
        sys.stderr.write(f"ERROR: failed to parse {args.bbox_config}: {exc}\n")
        return 71
    # An empty file parses to None and a bare list/scalar is valid YAML too;
    # neither is a usable config.
    if not isinstance(config, dict):
        sys.stderr.write(
            f"ERROR: {args.bbox_config} must contain a YAML mapping at top level.\n"
        )
        return 71

    # ----- Environment -----
    env_file_values = _load_env_file(args.env_file)
    sp_url = _resolve_env("SATELLITE_PROVIDER_URL", env_file_values)
    jwt_token = _resolve_env("SATELLITE_PROVIDER_API_KEY", env_file_values)
    tls_insecure = _resolve_env("SATELLITE_PROVIDER_TLS_INSECURE", env_file_values) == "1"
    if not sp_url:
        sys.stderr.write("ERROR: SATELLITE_PROVIDER_URL not set (env or .env.test).\n")
        return 72
    if not jwt_token:
        sys.stderr.write(
            "ERROR: SATELLITE_PROVIDER_API_KEY not set. Mint with:\n"
            " python scripts/mint_dev_jwt.py\n"
        )
        return 72

    # ----- Plan -----
    # Every config lookup happens inside this try so that a missing key or a
    # wrongly-typed value exits with 71 instead of a traceback.
    try:
        chunks = _compute_chunks(config, args.right_sized_flight)
        expected_coords = _expected_tile_coords(config, args.right_sized_flight)
        # Budget check — loud warning if over-budget per AZ-777 spec line 178.
        size_budget = config["catalog_size_budget"]
        avg_bytes = int(size_budget["estimated_avg_bytes_per_tile"])
        budget_bytes = int(size_budget["max_bytes_db_side"])
        estimated_total = len(expected_coords) * avg_bytes
        over_budget = estimated_total > budget_bytes
        overage_pct = (
            (estimated_total - budget_bytes) / budget_bytes * 100 if over_budget else 0.0
        )
        zoom_levels = config["zoom_levels"]
        imagery_source = config["imagery_source"]
        license_info = config["license"]
        imagery_label = f"{imagery_source['provider']}/{imagery_source['layer']}"
        license_source = license_info["source"]
        attribution_text = license_info["attribution_text"]
    except (KeyError, TypeError, ValueError, ZeroDivisionError) as exc:
        sys.stderr.write(f"ERROR: malformed bbox config {args.bbox_config}: {exc!r}\n")
        return 71

    print(
        f"[plan] satellite-provider: {sp_url} (tls_insecure={tls_insecure})\n"
        f"[plan] bbox mode: {'right-sized flight' if args.right_sized_flight else 'spec bbox (~80 km^2)'}\n"
        f"[plan] zoom levels: {zoom_levels}\n"
        f"[plan] region chunks to submit: {len(chunks)}\n"
        f"[plan] expected tile coverage: {len(expected_coords)} tiles\n"
        f"[plan] estimated DB size: {estimated_total / 1_048_576:.1f} MB "
        f"(budget: {budget_bytes / 1_048_576:.0f} MB)\n"
        f"[plan] imagery source: {imagery_label}\n"
        f"[plan] license: {license_source}\n"
        f"[plan] attribution: {attribution_text}\n"
    )
    if over_budget:
        sys.stderr.write(
            "WARNING: estimated DB size exceeds spec budget by "
            f"~{overage_pct:.0f}%. Per AZ-777 line 178 you can:\n"
            " - drop a zoom level (edit bbox.yaml::zoom_levels)\n"
            " - reduce bbox (edit bbox.yaml::bbox)\n"
            " - use --right-sized-flight (tight to actual flight extent)\n"
            "Continuing anyway. Use --dry-run to inspect without seeding.\n"
        )

    if args.dry_run:
        print("[dry-run] would submit:")
        for c in chunks:
            print(
                f" {c.chunk_label}: id={c.region_id} "
                f"lat={c.center_lat:.5f} lon={c.center_lon:.5f} "
                f"size={c.size_meters} zoom={c.zoom}"
            )
        return 0

    headers = {
        "Authorization": f"Bearer {jwt_token}",
        "Content-Type": "application/json",
    }
    # The context manager closes the connection pool on every exit path,
    # including the early returns below.
    with httpx.Client(verify=not tls_insecure) as client:
        # ----- Phase A: submit all regions upfront -----
        print(f"\n[submit] sending {len(chunks)} region requests...")
        submission_failures: list[tuple[RegionChunk, str]] = []
        for c in chunks:
            ok, msg = _submit_region(client, sp_url, headers, c)
            print(f" [{c.chunk_label}] {msg}")
            if not ok:
                submission_failures.append((c, msg))
        if submission_failures:
            sys.stderr.write(f"ERROR: {len(submission_failures)} submission(s) failed:\n")
            for c, msg in submission_failures:
                sys.stderr.write(f" [{c.chunk_label}] {msg}\n")
            return 74

        if args.skip_poll:
            print(
                "\n[skip-poll] all submissions sent; "
                "background processing continues asynchronously. "
                f"Region IDs: {[str(c.region_id) for c in chunks]}"
            )
            return 0

        # ----- Phase B: poll each region until terminal -----
        print(f"\n[poll] waiting for {len(chunks)} regions to reach terminal status...")
        poll_failures: list[RegionChunk] = []
        for c in chunks:
            try:
                status = _poll_region(client, sp_url, headers, c)
            except RuntimeError as exc:
                sys.stderr.write(f" [{c.chunk_label}] {exc}\n")
                poll_failures.append(c)
                continue
            tiles = c.tiles_downloaded + c.tiles_reused
            print(
                f" [{c.chunk_label}] terminal={status} tiles={tiles} "
                f"(downloaded={c.tiles_downloaded} reused={c.tiles_reused})"
            )
            if status in _FAILURE_STATUSES:
                poll_failures.append(c)
        if poll_failures:
            sys.stderr.write(f"ERROR: {len(poll_failures)} region(s) did not complete cleanly\n")
            return 75

        # ----- Phase C: verify inventory -----
        if not args.skip_inventory_verification:
            print(f"\n[inventory] verifying {len(expected_coords)} expected tile coords...")
            present, queried = _verify_inventory(client, sp_url, headers, expected_coords)
            if queried:
                print(
                    f"[inventory] present: {present}/{queried} "
                    f"({present / queried * 100:.1f}% coverage)"
                )
            else:
                print("[inventory] no tiles queried")
            if queried and present < queried:
                missing = queried - present
                sys.stderr.write(
                    f"WARNING: {missing} expected tile(s) not present in inventory. "
                    "This may indicate partial region failures, edge-tile gaps, or "
                    "Google Maps API timeouts. Re-run seed_region.py to fill gaps "
                    "(producer dedups via UPSERT-on-coord, so retries are safe).\n"
                )
                # Small gaps only warn; below 95% coverage the run fails.
                if present / queried < 0.95:
                    return 76

    # ----- Summary output -----
    total_downloaded = sum(c.tiles_downloaded for c in chunks)
    total_reused = sum(c.tiles_reused for c in chunks)
    print(
        f"\n[done] seeded {len(chunks)} regions: "
        f"downloaded={total_downloaded} reused={total_reused}"
    )
    if args.output_summary:
        summary = _summarize_run(
            sp_url, args.right_sized_flight, imagery_source, license_info, chunks
        )
        args.output_summary.parent.mkdir(parents=True, exist_ok=True)
        args.output_summary.write_text(json.dumps(summary, indent=2), encoding="utf-8")
        print(f"[done] summary written to {args.output_summary}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())