mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 21:01:12 +00:00
b15454b9a9
Phase 1 hotfix:
- C11 HttpTileDownloader adapted to satellite-provider v2.0.0
z/x/y inventory contract (bulk POST keyed by slippy-map coords).
- Unit tests rewritten to exercise the new inventory schema.
- E2E smoke test updated to match the v2.0.0 wire.
Phase 2 (Derkachi seed + smoke-validated on Jetson):
- tests/fixtures/derkachi_c6/{README,bbox.yaml,seed_region.py}
drives POST /api/satellite/region against satellite-provider
with Google Maps as the imagery source. Smoke run produced
4 regions, 175 tiles, inventory 32/32.
- scripts/mint_dev_jwt.py + run-tests-jetson.sh auto-mint and
export SATELLITE_PROVIDER_API_KEY using JWT_SECRET / JWT_ISSUER
/ JWT_AUDIENCE env vars (no host port mappings; e2e-runner
reaches SP via internal docker network only).
Spec amendment: AZ-777 todo spec updated to record the
Google Maps imagery source decision and STOP-gate state.
AZ-777 Phase 3+ work is superseded by Epic AZ-835 (see next
commit).
Co-authored-by: Cursor <cursoragent@cursor.com>
152 lines
4.4 KiB
Python
152 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Mint a dev JWT for the parent-suite satellite-provider (AZ-777).
|
|
|
|
Reads JWT_SECRET / JWT_ISSUER / JWT_AUDIENCE from environment or from
|
|
`.env.test` (when run from the repo root). Prints the JWT to stdout so
|
|
the caller can pipe / paste / `export` it.
|
|
|
|
DEV-ONLY: the same secret signs the JWT and validates it on the
|
|
provider side; production deploys retrieve operator JWTs from the admin
|
|
API (AZ-690) instead. Mirrors `SatelliteProvider.TestSupport.JwtTokenFactory.Create`
|
|
on the .NET side so dev tokens behave identically to integration-test ones.
|
|
|
|
Usage::
|
|
|
|
python scripts/mint_dev_jwt.py
|
|
python scripts/mint_dev_jwt.py --lifetime-hours 12 --subject e2e-runner
|
|
python scripts/mint_dev_jwt.py --permission GPS # unlocks /api/satellite/upload
|
|
export SATELLITE_PROVIDER_API_KEY="$(python scripts/mint_dev_jwt.py)"
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
# PyJWT is the script's only third-party dependency. When it is missing,
# print an actionable hint on stderr and exit with status 72 instead of
# surfacing a raw ImportError traceback.
try:
    import jwt
except ImportError as exc:
    sys.stderr.write(
        # The requirement specifier and the extras path are quoted so the
        # hint can be pasted into a shell verbatim: unquoted, `>` / `<`
        # are parsed as redirections and `.[dev]` as a glob (zsh).
        "ERROR: pyjwt not installed. Run `pip install 'pyjwt>=2.8,<3.0'`\n"
        "(or `pip install -e '.[dev]'` from the repo root) and retry.\n"
        f"Underlying ImportError: {exc}\n"
    )
    sys.exit(72)
|
|
|
|
|
|
# Minimum accepted length of JWT_SECRET, measured in UTF-8 encoded bytes.
# main() refuses to sign with a shorter secret and returns exit status 74.
_MIN_SECRET_BYTES = 32
|
|
|
|
|
|
def _load_env_file(path: Path) -> dict[str, str]:
    """Parse a minimal ``KEY=VALUE`` env file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped, as are lines whose key is empty. Keys and values have their
    surrounding whitespace removed. Exactly one *matching* pair of outer
    quotes (``"..."`` or ``'...'``) is removed from a value; unbalanced or
    interior quote characters are preserved, so a secret that merely starts
    or ends with a quote character is not silently truncated.

    No variable expansion, escape handling or inline-comment stripping is
    performed.

    Args:
        path: Location of the env file.

    Returns:
        Mapping of keys to values. Empty when ``path`` is not a regular
        file. When a key appears more than once the last occurrence wins.
    """
    if not path.is_file():
        return {}

    out: dict[str, str] = {}
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue

        # Split on the first "=" only, so values may themselves contain "=".
        key, sep, value = line.partition("=")
        key = key.strip()
        if not sep or not key:
            continue

        value = value.strip()
        # Unwrap a single balanced pair of quotes; leave anything else alone.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        out[key] = value
    return out
|
|
|
|
|
|
def _resolve(name: str, env_file_values: dict[str, str]) -> str | None:
    """Look up ``name``, preferring the process environment.

    A non-empty value in ``os.environ`` takes precedence. An unset *or
    empty* environment variable falls back to ``env_file_values``.

    Returns:
        The resolved value, or ``None`` when the environment has no
        non-empty value and ``name`` is absent from ``env_file_values``.
    """
    return os.environ.get(name) or env_file_values.get(name)
|
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the dev-JWT minting script."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Preserve the module docstring's line breaks (notably the usage
        # examples) instead of letting argparse re-wrap them.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--lifetime-hours",
        type=float,
        default=8.0,
        help="Token lifetime in hours (default: 8).",
    )
    parser.add_argument(
        "--subject",
        default="gps-denied-onboard-e2e",
        help="`sub` claim (default: gps-denied-onboard-e2e).",
    )
    parser.add_argument(
        "--env-file",
        default=".env.test",
        help="Fallback env file (default: .env.test in CWD).",
    )
    parser.add_argument(
        "--permission",
        action="append",
        default=None,
        metavar="NAME",
        help=(
            "Add a value to the `permissions` JWT claim. Repeatable "
            "(e.g. --permission GPS --permission FL). Use `GPS` to unlock "
            "/api/satellite/upload on the satellite-provider."
        ),
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """Mint an HS256-signed dev JWT and print it to stdout.

    JWT_SECRET, JWT_ISSUER and JWT_AUDIENCE are taken from the process
    environment, falling back to the file named by ``--env-file``.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Returns:
        Process exit status: ``0`` on success, ``73`` when a required
        setting is missing or empty, ``74`` when JWT_SECRET is shorter
        than ``_MIN_SECRET_BYTES`` bytes.

    Raises:
        SystemExit: Raised by argparse (status 2) for invalid arguments,
            including a ``--lifetime-hours`` value that cannot be turned
            into an expiry timestamp.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    env_file_values = _load_env_file(Path(args.env_file))
    secret = _resolve("JWT_SECRET", env_file_values)
    issuer = _resolve("JWT_ISSUER", env_file_values)
    audience = _resolve("JWT_AUDIENCE", env_file_values)

    missing = [
        name
        for name, value in (
            ("JWT_SECRET", secret),
            ("JWT_ISSUER", issuer),
            ("JWT_AUDIENCE", audience),
        )
        if not value
    ]
    if missing:
        sys.stderr.write(
            "ERROR: required env var(s) not set: "
            + ", ".join(missing)
            + f"\n(looked at environment + {args.env_file})\n"
        )
        return 73

    # Type narrowing only: the `missing` check above already guarantees a
    # non-empty secret at this point.
    assert secret is not None
    # Measure the encoded length once so the check and the message agree;
    # len(secret) counts characters, which differs for non-ASCII secrets.
    secret_bytes = len(secret.encode("utf-8"))
    if secret_bytes < _MIN_SECRET_BYTES:
        sys.stderr.write(
            f"ERROR: JWT_SECRET is {secret_bytes} bytes; HMAC-SHA256 requires "
            f">= {_MIN_SECRET_BYTES} bytes per the provider's contract.\n"
        )
        return 74

    now = datetime.now(timezone.utc)
    issued_at = int(now.timestamp())
    try:
        # timedelta() rejects NaN (ValueError) and infinite / huge values
        # (OverflowError); adding it to `now` can also overflow.
        expires_at = int((now + timedelta(hours=args.lifetime_hours)).timestamp())
    except (OverflowError, ValueError):
        # parser.error() prints usage plus the message and exits with 2.
        parser.error(
            f"--lifetime-hours: {args.lifetime_hours!r} is not a usable duration"
        )

    payload: dict[str, object] = {
        "sub": args.subject,
        "iss": issuer,
        "aud": audience,
        "jti": uuid.uuid4().hex,
        "iat": issued_at,
        "nbf": issued_at,
        "exp": expires_at,
    }
    # The `permissions` claim is emitted only when at least one
    # --permission flag was given.
    if args.permission:
        payload["permissions"] = list(args.permission)

    token = jwt.encode(payload, secret, algorithm="HS256")
    sys.stdout.write(token + "\n")
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the process
# exit status. Nothing runs when the module is merely imported.
if __name__ == "__main__":
    sys.exit(main())
|