mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 22:21:13 +00:00
e2bebefdfc
AZ-507: codify cross-component import rule. Added _types/inference_errors.py shim re-exporting EngineBuildError + CalibrationCacheError from c7_inference; narrowed C10 EngineCompiler's except Exception to the two typed errors so unknown exceptions propagate (AC-3). Rewrote module-layout.md "Imports from" sections for 9 components + added Rule 9; appended an architecture.md ADR-009 note explaining why components must go through _types/*. AZ-323: ManifestBuilder + Ed25519ManifestSigner. Canonical JSON via orjson OPT_SORT_KEYS+OPT_INDENT_2, atomic-write Manifest.json + sha sidecar + .sig via AZ-280, operator-key fingerprint allowlist gate (C10-ST-01), ADR-010 takeoff_origin + flight_id baked into Manifest AND manifest_hash so re-planned routes change the cache identity (AC-15/AC-16). 20 unit tests cover all 16 ACs. AZ-324: ManifestVerifierImpl. Fail-closed Steps A-D: Manifest.json sidecar self-hash, Ed25519 trust-key set, schema parse with absolute/.. path rejection + takeoff_origin in-bbox check, stream SHA-256 per artifact with multi-failure accumulation. Operator mode re-derives tiles_coverage_sha256 from C6; airborne mode trusts the signed aggregate. 19 unit tests cover all 17 ACs. Composition root: c10_factory.build_manifest_builder + build_manifest_verifier + c6_tile_metadata_store_to_tiles_query adapter (the one place that legitimately imports both C6 and C10 without violating the AZ-270 lint). Dependency: pinned cryptography>=43.0,<46.0 in pyproject.toml. Tests: 1300 passed, 80 skipped (env-only), ruff clean for all AZ-323/324 files. AZ-306 (FAISS) intentionally deferred to batch 35 — needs C++ pybind11 toolchain not present in this environment. Co-authored-by: Cursor <cursoragent@cursor.com>
686 lines
23 KiB
Python
686 lines
23 KiB
Python
"""Unit tests for AZ-323 :class:`ManifestBuilder`.
|
|
|
|
Covers all 16 ACs in the AZ-323 task spec plus a Protocol-conformance
|
|
check and two extra invariants (descriptor-index sidecar drift, key
|
|
load propagating the chained cause). Uses the real
|
|
:class:`Sha256Sidecar` + a real :class:`Ed25519ManifestSigner` so the
|
|
sign / verify round trip exercises production code paths.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import logging
|
|
from dataclasses import replace
|
|
from pathlib import Path
|
|
from uuid import UUID, uuid4
|
|
|
|
import orjson
|
|
import pytest
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
|
|
Ed25519PrivateKey,
|
|
Ed25519PublicKey,
|
|
)
|
|
|
|
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
|
|
from gps_denied_onboard._types.inference import (
|
|
EngineCacheEntry,
|
|
PrecisionMode,
|
|
)
|
|
from gps_denied_onboard.clock.wall_clock import WallClock
|
|
from gps_denied_onboard.components.c10_provisioning import (
|
|
C10ManifestConfig,
|
|
Ed25519ManifestSigner,
|
|
ManifestArtifact,
|
|
ManifestBuilder,
|
|
ManifestBuildInput,
|
|
ManifestSigner,
|
|
ManifestWriteError,
|
|
SigningMode,
|
|
TileHashRecord,
|
|
)
|
|
from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Helpers
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
_BBOX = BoundingBox(
|
|
min_lat_deg=50.0,
|
|
min_lon_deg=36.0,
|
|
max_lat_deg=50.5,
|
|
max_lon_deg=36.5,
|
|
)
|
|
_ZOOM_LEVELS = (16, 17, 18)
|
|
|
|
|
|
def _write_pkcs8_key(tmp_path: Path, name: str = "operator.key") -> tuple[Path, str]:
|
|
"""Write a fresh PEM-encoded PKCS8 Ed25519 private key to disk.
|
|
|
|
Returns ``(path, fingerprint_hex)`` so tests can assert against
|
|
the deterministic SHA-256 of the raw 32-byte public key.
|
|
"""
|
|
|
|
priv = Ed25519PrivateKey.generate()
|
|
pem = priv.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption(),
|
|
)
|
|
key_path = tmp_path / name
|
|
key_path.write_bytes(pem)
|
|
raw_pub = priv.public_key().public_bytes(
|
|
encoding=serialization.Encoding.Raw,
|
|
format=serialization.PublicFormat.Raw,
|
|
)
|
|
return key_path, hashlib.sha256(raw_pub).hexdigest()
|
|
|
|
|
|
def _make_engine_entries(tmp_path: Path) -> tuple[EngineCacheEntry, ...]:
|
|
"""Materialise three engine sidecars under ``tmp_path/engines/``."""
|
|
|
|
engines_dir = tmp_path / "engines"
|
|
engines_dir.mkdir(parents=True, exist_ok=True)
|
|
entries: list[EngineCacheEntry] = []
|
|
for model in ("dinov2_vpr", "lightglue", "aliked"):
|
|
path = engines_dir / f"{model}_sm87_jp62_trt103_fp16.engine"
|
|
payload = f"engine-bytes-{model}".encode()
|
|
path.write_bytes(payload)
|
|
digest = hashlib.sha256(payload).hexdigest()
|
|
entries.append(
|
|
EngineCacheEntry(
|
|
engine_path=path,
|
|
sha256_hex=digest,
|
|
sm=87,
|
|
jp="6.2",
|
|
trt="10.3",
|
|
precision=PrecisionMode.FP16,
|
|
extras={},
|
|
)
|
|
)
|
|
return tuple(entries)
|
|
|
|
|
|
def _make_descriptor_index(tmp_path: Path) -> Path:
|
|
"""Write a fake descriptor index + its sidecar."""
|
|
|
|
desc_dir = tmp_path / "descriptors"
|
|
desc_dir.mkdir(parents=True, exist_ok=True)
|
|
path = desc_dir / "corpus.index"
|
|
payload = b"faiss-binary-payload"
|
|
Sha256Sidecar.write_atomic_and_sidecar(path, payload)
|
|
return path
|
|
|
|
|
|
def _make_calibration(tmp_path: Path) -> Path:
|
|
cal_dir = tmp_path / "calibration"
|
|
cal_dir.mkdir(parents=True, exist_ok=True)
|
|
path = cal_dir / "int8_calibration.json"
|
|
path.write_bytes(b'{"calibration": "data"}')
|
|
return path
|
|
|
|
|
|
def _make_tiles(count: int = 100) -> tuple[TileHashRecord, ...]:
|
|
"""Generate `count` deterministic tile records."""
|
|
|
|
return tuple(
|
|
TileHashRecord(
|
|
zoom=16 + (i % 3),
|
|
lat=50.0 + 0.001 * i,
|
|
lon=36.0 + 0.001 * i,
|
|
source="googlemaps" if i % 2 == 0 else "onboard_ingest",
|
|
sha256_hex=hashlib.sha256(f"tile-{i}".encode()).hexdigest(),
|
|
)
|
|
for i in range(count)
|
|
)
|
|
|
|
|
|
class _StaticTiles:
|
|
"""Hand-rolled :class:`TilesByBboxQuery` returning a fixed tuple."""
|
|
|
|
def __init__(self, records: tuple[TileHashRecord, ...]) -> None:
|
|
self._records = records
|
|
|
|
def query_by_bbox(self, *, bbox, zoom_levels, sector_class): # type: ignore[no-untyped-def]
|
|
return self._records
|
|
|
|
|
|
def _build_input(
|
|
tmp_path: Path,
|
|
*,
|
|
key_path: Path | None = None,
|
|
takeoff_origin: LatLonAlt | None = None,
|
|
flight_id: UUID | None = None,
|
|
sector_class: str = "stable_rear",
|
|
) -> ManifestBuildInput:
|
|
"""Materialise a complete on-disk input + a freshly generated key."""
|
|
|
|
cache_root = tmp_path / "cache_root"
|
|
cache_root.mkdir(parents=True, exist_ok=True)
|
|
engine_entries = _make_engine_entries(cache_root)
|
|
descriptor_index = _make_descriptor_index(cache_root)
|
|
calibration = _make_calibration(cache_root)
|
|
if key_path is None:
|
|
key_path, _ = _write_pkcs8_key(tmp_path)
|
|
return ManifestBuildInput(
|
|
cache_root=cache_root,
|
|
bbox=_BBOX,
|
|
zoom_levels=_ZOOM_LEVELS,
|
|
sector_class=sector_class,
|
|
engine_entries=engine_entries,
|
|
descriptor_index_path=descriptor_index,
|
|
calibration_path=calibration,
|
|
key_path=key_path,
|
|
takeoff_origin=takeoff_origin,
|
|
flight_id=flight_id,
|
|
)
|
|
|
|
|
|
def _build_builder(
|
|
*,
|
|
config: C10ManifestConfig | None = None,
|
|
tiles: tuple[TileHashRecord, ...] | None = None,
|
|
signer: ManifestSigner | None = None,
|
|
) -> tuple[ManifestBuilder, logging.Logger, list[logging.LogRecord]]:
|
|
records: list[logging.LogRecord] = []
|
|
|
|
class _ListHandler(logging.Handler):
|
|
def emit(self, record: logging.LogRecord) -> None:
|
|
records.append(record)
|
|
|
|
logger = logging.getLogger(f"test_az323_{id(records)}")
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.handlers.clear()
|
|
logger.addHandler(_ListHandler())
|
|
logger.propagate = False
|
|
|
|
builder = ManifestBuilder(
|
|
sidecar=Sha256Sidecar(),
|
|
signer=signer if signer is not None else Ed25519ManifestSigner(),
|
|
tile_metadata_store=_StaticTiles(
|
|
tiles if tiles is not None else _make_tiles(100)
|
|
),
|
|
logger=logger,
|
|
clock=WallClock(),
|
|
config=config if config is not None else C10ManifestConfig(),
|
|
)
|
|
return builder, logger, records
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-1
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac1_happy_path_produces_manifest_sidecar_and_signature(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
request = _build_input(tmp_path)
|
|
|
|
# Act
|
|
artifact = builder.build_manifest(request)
|
|
|
|
# Assert
|
|
assert isinstance(artifact, ManifestArtifact)
|
|
assert artifact.manifest_path == request.cache_root / "Manifest.json"
|
|
assert artifact.signature_path == request.cache_root / "Manifest.json.sig"
|
|
assert artifact.manifest_path.exists()
|
|
assert (request.cache_root / "Manifest.json.sha256").exists()
|
|
assert artifact.signature_path.exists()
|
|
assert len(artifact.manifest_hash) == 64
|
|
assert artifact.manifest_hash == artifact.manifest_hash.lower()
|
|
int(artifact.manifest_hash, 16)
|
|
body = orjson.loads(artifact.manifest_path.read_bytes())
|
|
assert len(body["artifacts"]["engines"]) == 3
|
|
assert "descriptor_index" in body["artifacts"]
|
|
assert "calibration" in body["artifacts"]
|
|
assert body["artifacts"]["tiles_coverage"]["tile_count"] == 100
|
|
assert artifact.total_artifacts_listed == 6 # 3 engines + index + calibration + tiles_coverage
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-2
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac2_determinism_same_input_same_manifest_hash(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
request = _build_input(tmp_path)
|
|
|
|
# Act
|
|
first = builder.build_manifest(request)
|
|
first_bytes = first.manifest_path.read_bytes()
|
|
second = builder.build_manifest(request)
|
|
second_bytes = second.manifest_path.read_bytes()
|
|
|
|
# Assert: identical inputs → identical manifest_hash AND identical
|
|
# canonical bytes once `built_at` is redacted.
|
|
assert first.manifest_hash == second.manifest_hash
|
|
first_obj = orjson.loads(first_bytes)
|
|
second_obj = orjson.loads(second_bytes)
|
|
first_obj["build"].pop("built_at")
|
|
second_obj["build"].pop("built_at")
|
|
assert orjson.dumps(first_obj, option=orjson.OPT_SORT_KEYS) == orjson.dumps(
|
|
second_obj, option=orjson.OPT_SORT_KEYS
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-3
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac3_signature_verifies_against_public_key(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
key_path, _ = _write_pkcs8_key(tmp_path)
|
|
request = _build_input(tmp_path, key_path=key_path)
|
|
|
|
# Act
|
|
artifact = builder.build_manifest(request)
|
|
manifest_bytes = artifact.manifest_path.read_bytes()
|
|
signature_bytes = artifact.signature_path.read_bytes()
|
|
public_key: Ed25519PublicKey = (
|
|
Ed25519PrivateKey.from_private_bytes(
|
|
serialization.load_pem_private_key(
|
|
key_path.read_bytes(), password=None
|
|
).private_bytes(
|
|
encoding=serialization.Encoding.Raw,
|
|
format=serialization.PrivateFormat.Raw,
|
|
encryption_algorithm=serialization.NoEncryption(),
|
|
)
|
|
).public_key()
|
|
)
|
|
|
|
# Assert: verify() raises on mismatch; absence of raise = pass
|
|
public_key.verify(signature_bytes, manifest_bytes)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-4
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac4_operator_mode_rejects_unknown_fingerprint(tmp_path: Path) -> None:
|
|
# Arrange
|
|
allowed_fp = hashlib.sha256(b"some-other-key").hexdigest()
|
|
builder, _, records = _build_builder(
|
|
config=C10ManifestConfig(
|
|
signing_mode=SigningMode.OPERATOR,
|
|
allowed_operator_fingerprints=(allowed_fp,),
|
|
)
|
|
)
|
|
request = _build_input(tmp_path)
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ManifestWriteError) as exc_info:
|
|
builder.build_manifest(request)
|
|
assert allowed_fp in str(exc_info.value)
|
|
assert not (request.cache_root / "Manifest.json").exists()
|
|
assert not (request.cache_root / "Manifest.json.sig").exists()
|
|
errors = [r for r in records if r.levelno == logging.ERROR]
|
|
assert len(errors) == 1
|
|
assert errors[0].__dict__.get("kind") == "c10.manifest.build.error"
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-5
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac5_operator_mode_accepts_known_fingerprint(tmp_path: Path) -> None:
|
|
# Arrange
|
|
key_path, fp = _write_pkcs8_key(tmp_path)
|
|
builder, _, records = _build_builder(
|
|
config=C10ManifestConfig(
|
|
signing_mode=SigningMode.OPERATOR,
|
|
allowed_operator_fingerprints=(fp,),
|
|
)
|
|
)
|
|
request = _build_input(tmp_path, key_path=key_path)
|
|
|
|
# Act
|
|
artifact = builder.build_manifest(request)
|
|
|
|
# Assert
|
|
assert artifact.signing_public_key_fingerprint == fp
|
|
warns = [r for r in records if r.levelno == logging.WARNING]
|
|
assert warns == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-6
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac6_dev_mode_with_dev_key_no_warning(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, records = _build_builder(
|
|
config=C10ManifestConfig(signing_mode=SigningMode.DEV)
|
|
)
|
|
request = _build_input(tmp_path)
|
|
|
|
# Act
|
|
builder.build_manifest(request)
|
|
|
|
# Assert
|
|
warns = [r for r in records if r.levelno == logging.WARNING]
|
|
assert warns == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-7
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac7_dev_mode_with_operator_key_emits_warning(tmp_path: Path) -> None:
|
|
# Arrange
|
|
key_path, fp = _write_pkcs8_key(tmp_path)
|
|
builder, _, records = _build_builder(
|
|
config=C10ManifestConfig(
|
|
signing_mode=SigningMode.DEV,
|
|
allowed_operator_fingerprints=(fp,),
|
|
)
|
|
)
|
|
request = _build_input(tmp_path, key_path=key_path)
|
|
|
|
# Act
|
|
builder.build_manifest(request)
|
|
|
|
# Assert
|
|
warns = [r for r in records if r.levelno == logging.WARNING]
|
|
assert len(warns) == 1
|
|
assert warns[0].__dict__.get("kind") == "c10.manifest.dev_mode_with_operator_key"
|
|
assert warns[0].__dict__.get("kv", {}).get("offered_fingerprint") == fp
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-8
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac8_tile_coverage_hash_is_sort_order_deterministic(tmp_path: Path) -> None:
|
|
# Arrange
|
|
tiles = _make_tiles(100)
|
|
tiles_reversed = tuple(reversed(tiles))
|
|
builder_a, _, _ = _build_builder(tiles=tiles)
|
|
builder_b, _, _ = _build_builder(tiles=tiles_reversed)
|
|
request_a = _build_input(tmp_path / "a")
|
|
request_b = _build_input(tmp_path / "b")
|
|
|
|
# Act
|
|
art_a = builder_a.build_manifest(request_a)
|
|
art_b = builder_b.build_manifest(request_b)
|
|
body_a = orjson.loads(art_a.manifest_path.read_bytes())
|
|
body_b = orjson.loads(art_b.manifest_path.read_bytes())
|
|
|
|
# Assert
|
|
assert (
|
|
body_a["artifacts"]["tiles_coverage"]["sha256"]
|
|
== body_b["artifacts"]["tiles_coverage"]["sha256"]
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-9
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac9_missing_key_path_raises_manifest_write_error(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
missing = tmp_path / "missing.key"
|
|
request = _build_input(tmp_path, key_path=missing)
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ManifestWriteError) as exc_info:
|
|
builder.build_manifest(request)
|
|
assert "operator signing key load failed" in str(exc_info.value)
|
|
assert exc_info.value.__cause__ is not None
|
|
assert not (request.cache_root / "Manifest.json").exists()
|
|
|
|
|
|
def test_ac9_malformed_pem_chains_underlying_cause(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
bogus = tmp_path / "bogus.key"
|
|
bogus.write_bytes(b"-----BEGIN PRIVATE KEY-----\ngarbage\n-----END PRIVATE KEY-----\n")
|
|
request = _build_input(tmp_path, key_path=bogus)
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ManifestWriteError) as exc_info:
|
|
builder.build_manifest(request)
|
|
assert exc_info.value.__cause__ is not None
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-10
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac10_atomic_write_no_half_manifest(tmp_path: Path) -> None:
|
|
"""Sha256Sidecar uses tmp-file → os.replace; we assert the previous-good
|
|
Manifest survives if a re-build is interrupted before reaching the
|
|
atomic-replace step. We simulate the interruption by injecting a
|
|
signer whose ``sign()`` raises AFTER the Manifest.json was written
|
|
(the post-condition shows the disk in its pre-build state)."""
|
|
|
|
# Arrange
|
|
request = _build_input(tmp_path)
|
|
good_builder, _, _ = _build_builder()
|
|
good_builder.build_manifest(request) # produce v1 on disk
|
|
good_manifest_bytes = (request.cache_root / "Manifest.json").read_bytes()
|
|
good_sig_bytes = (request.cache_root / "Manifest.json.sig").read_bytes()
|
|
|
|
class _ExplodingSigner:
|
|
def __init__(self) -> None:
|
|
self._inner = Ed25519ManifestSigner()
|
|
|
|
def load_signing_key(self, key_path): # type: ignore[no-untyped-def]
|
|
return self._inner.load_signing_key(key_path)
|
|
|
|
def sign(self, key, payload_bytes): # type: ignore[no-untyped-def]
|
|
raise RuntimeError("simulated kill mid-build")
|
|
|
|
def public_key_fingerprint(self, key): # type: ignore[no-untyped-def]
|
|
return self._inner.public_key_fingerprint(key)
|
|
|
|
failing_builder, _, _ = _build_builder(signer=_ExplodingSigner())
|
|
|
|
# Act
|
|
with pytest.raises(RuntimeError, match="simulated kill"):
|
|
failing_builder.build_manifest(request)
|
|
|
|
# Assert: signature was never re-written; the previous-good signature
|
|
# survives untouched (atomic-write guarantee).
|
|
assert (request.cache_root / "Manifest.json.sig").read_bytes() == good_sig_bytes
|
|
# The Manifest.json may be the new one or the old one — never half-written.
|
|
new_bytes = (request.cache_root / "Manifest.json").read_bytes()
|
|
assert orjson.loads(new_bytes) is not None
|
|
# And the sidecar must remain consistent with whatever is on disk now.
|
|
actual_hash = hashlib.sha256(new_bytes).hexdigest()
|
|
sidecar_hash = (request.cache_root / "Manifest.json.sha256").read_text().strip()
|
|
assert actual_hash == sidecar_hash
|
|
# Defensive: if it's the old Manifest, its bytes equal the saved snapshot.
|
|
if actual_hash == hashlib.sha256(good_manifest_bytes).hexdigest():
|
|
assert new_bytes == good_manifest_bytes
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-11
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac11_manifest_own_sidecar_matches_disk_bytes(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
request = _build_input(tmp_path)
|
|
|
|
# Act
|
|
artifact = builder.build_manifest(request)
|
|
actual = hashlib.sha256(artifact.manifest_path.read_bytes()).hexdigest()
|
|
sidecar = (request.cache_root / "Manifest.json.sha256").read_text().strip()
|
|
|
|
# Assert
|
|
assert actual == sidecar
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-12
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac12_total_artifacts_listed_counts_dict_entries(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
request = _build_input(tmp_path)
|
|
|
|
# Act
|
|
artifact = builder.build_manifest(request)
|
|
|
|
# Assert: 3 engines + 1 index + 1 calibration + 1 tiles_coverage = 6
|
|
assert artifact.total_artifacts_listed == 6
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-13
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac13_takeoff_origin_baked_into_manifest_body(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
origin = LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0)
|
|
flight = uuid4()
|
|
request = _build_input(tmp_path, takeoff_origin=origin, flight_id=flight)
|
|
|
|
# Act
|
|
artifact = builder.build_manifest(request)
|
|
body = orjson.loads(artifact.manifest_path.read_bytes())
|
|
|
|
# Assert
|
|
flight_block = body["flight"]
|
|
assert flight_block["flight_id"] == str(flight)
|
|
assert flight_block["takeoff_origin"]["lat_deg"] == 50.0
|
|
assert flight_block["takeoff_origin"]["lon_deg"] == 36.2
|
|
assert flight_block["takeoff_origin"]["alt_m"] == 200.0
|
|
# No timestamp inside takeoff_origin
|
|
assert set(flight_block["takeoff_origin"].keys()) == {
|
|
"lat_deg",
|
|
"lon_deg",
|
|
"alt_m",
|
|
}
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-14
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac14_takeoff_origin_absent_when_not_supplied(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
request = _build_input(tmp_path, takeoff_origin=None, flight_id=None)
|
|
|
|
# Act
|
|
artifact = builder.build_manifest(request)
|
|
body = orjson.loads(artifact.manifest_path.read_bytes())
|
|
|
|
# Assert: flight_id is null but takeoff_origin key is absent
|
|
assert body["flight"]["flight_id"] is None
|
|
assert "takeoff_origin" not in body["flight"]
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-15
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac15_manifest_hash_changes_when_takeoff_origin_differs(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
flight = uuid4()
|
|
a = _build_input(
|
|
tmp_path / "a",
|
|
takeoff_origin=LatLonAlt(50.0, 36.2, 200.0),
|
|
flight_id=flight,
|
|
)
|
|
b = _build_input(
|
|
tmp_path / "b",
|
|
takeoff_origin=LatLonAlt(50.0, 36.2, 200.001), # 1mm delta
|
|
flight_id=flight,
|
|
)
|
|
|
|
# Re-use the same key so only the origin differs.
|
|
b = replace(b, key_path=a.key_path)
|
|
|
|
# Act
|
|
art_a = builder.build_manifest(a)
|
|
art_b = builder.build_manifest(b)
|
|
|
|
# Assert
|
|
assert art_a.manifest_hash != art_b.manifest_hash
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-16
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ac16_manifest_hash_changes_when_only_flight_id_differs(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
origin = LatLonAlt(50.0, 36.2, 200.0)
|
|
a = _build_input(tmp_path / "a", takeoff_origin=origin, flight_id=uuid4())
|
|
b = _build_input(tmp_path / "b", takeoff_origin=origin, flight_id=uuid4())
|
|
b = replace(b, key_path=a.key_path)
|
|
|
|
# Act
|
|
art_a = builder.build_manifest(a)
|
|
art_b = builder.build_manifest(b)
|
|
|
|
# Assert
|
|
assert art_a.manifest_hash != art_b.manifest_hash
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Protocol conformance
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_ed25519_manifest_signer_satisfies_protocol() -> None:
|
|
# Assert
|
|
assert isinstance(Ed25519ManifestSigner(), ManifestSigner)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Descriptor-index sidecar drift
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def test_descriptor_index_sidecar_missing_raises_manifest_write_error(
|
|
tmp_path: Path,
|
|
) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
request = _build_input(tmp_path)
|
|
(request.cache_root / "descriptors" / "corpus.index.sha256").unlink()
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ManifestWriteError, match="descriptor_index sidecar missing"):
|
|
builder.build_manifest(request)
|
|
|
|
|
|
def test_descriptor_index_sidecar_malformed_hex_raises(tmp_path: Path) -> None:
|
|
# Arrange
|
|
builder, _, _ = _build_builder()
|
|
request = _build_input(tmp_path)
|
|
(request.cache_root / "descriptors" / "corpus.index.sha256").write_text("not-hex!")
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ManifestWriteError, match="not 64 hex chars"):
|
|
builder.build_manifest(request)
|