Files
Oleksandr Bezdieniezhnykh e2bebefdfc [AZ-507] [AZ-323] [AZ-324] C10 Manifest build + verify + AZ-270 hygiene
AZ-507: codify cross-component import rule. Added
_types/inference_errors.py shim re-exporting EngineBuildError +
CalibrationCacheError from c7_inference; narrowed C10
EngineCompiler's except Exception to the two typed errors so unknown
exceptions propagate (AC-3). Rewrote module-layout.md "Imports from"
sections for 9 components + added Rule 9; appended an
architecture.md ADR-009 note explaining why components must go
through _types/*.

AZ-323: ManifestBuilder + Ed25519ManifestSigner. Canonical JSON via
orjson OPT_SORT_KEYS+OPT_INDENT_2, atomic-write Manifest.json + sha
sidecar + .sig via AZ-280, operator-key fingerprint allowlist gate
(C10-ST-01), ADR-010 takeoff_origin + flight_id baked into Manifest
AND manifest_hash so re-planned routes change the cache identity
(AC-15/AC-16). 20 unit tests cover all 16 ACs.

AZ-324: ManifestVerifierImpl. Fail-closed Steps A-D: Manifest.json
sidecar self-hash, Ed25519 trust-key set, schema parse with
absolute/.. path rejection + takeoff_origin in-bbox check, stream
SHA-256 per artifact with multi-failure accumulation. Operator mode
re-derives tiles_coverage_sha256 from C6; airborne mode trusts the
signed aggregate. 19 unit tests cover all 17 ACs.

Composition root: c10_factory.build_manifest_builder +
build_manifest_verifier + c6_tile_metadata_store_to_tiles_query
adapter (the one place that legitimately imports both C6 and C10
without violating the AZ-270 lint).

Dependency: pinned cryptography>=43.0,<46.0 in pyproject.toml.

Tests: 1300 passed, 80 skipped (env-only), ruff clean for all
AZ-323/324 files.

AZ-306 (FAISS) intentionally deferred to batch 35 — needs C++
pybind11 toolchain not present in this environment.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-13 02:37:14 +03:00

686 lines
23 KiB
Python

"""Unit tests for AZ-323 :class:`ManifestBuilder`.
Covers all 16 ACs in the AZ-323 task spec plus a Protocol-conformance
check and two extra invariants (descriptor-index sidecar drift, key
load propagating the chained cause). Uses the real
:class:`Sha256Sidecar` + a real :class:`Ed25519ManifestSigner` so the
sign / verify round trip exercises production code paths.
"""
from __future__ import annotations
import hashlib
import logging
from dataclasses import replace
from pathlib import Path
from uuid import UUID, uuid4
import orjson
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard._types.inference import (
EngineCacheEntry,
PrecisionMode,
)
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c10_provisioning import (
C10ManifestConfig,
Ed25519ManifestSigner,
ManifestArtifact,
ManifestBuilder,
ManifestBuildInput,
ManifestSigner,
ManifestWriteError,
SigningMode,
TileHashRecord,
)
from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar
# ----------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------
# Bounding box handed to every build request created by ``_build_input``.
# The default 100 records from ``_make_tiles`` (lat 50.0..50.099,
# lon 36.0..36.099) all fall inside it.
_BBOX = BoundingBox(
    min_lat_deg=50.0,
    min_lon_deg=36.0,
    max_lat_deg=50.5,
    max_lon_deg=36.5,
)
# Zoom levels handed to every build request; ``_make_tiles`` emits records at
# exactly these three zooms (``16 + (i % 3)``).
_ZOOM_LEVELS = (16, 17, 18)
def _write_pkcs8_key(tmp_path: Path, name: str = "operator.key") -> tuple[Path, str]:
    """Generate an Ed25519 key pair and persist the private half to disk.

    The private key is serialised as unencrypted PEM / PKCS8 and written to
    ``tmp_path / name``.

    Returns:
        ``(key_path, fingerprint_hex)`` where ``fingerprint_hex`` is the
        SHA-256 hex digest of the raw public-key bytes, so tests can compare
        it against the fingerprint reported by the code under test.
    """
    private_key = Ed25519PrivateKey.generate()
    destination = tmp_path / name
    destination.write_bytes(
        private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
    )
    public_raw = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    fingerprint = hashlib.sha256(public_raw).hexdigest()
    return destination, fingerprint
def _make_engine_entries(tmp_path: Path) -> tuple[EngineCacheEntry, ...]:
    """Create three fake engine files in ``tmp_path/engines/`` and describe them.

    One file is written per model name (``dinov2_vpr``, ``lightglue``,
    ``aliked``); each returned entry carries the SHA-256 of the bytes that
    were written for it.
    """
    target_dir = tmp_path / "engines"
    target_dir.mkdir(parents=True, exist_ok=True)

    def _materialise(model: str) -> EngineCacheEntry:
        # Write the payload first so the entry always points at a real file.
        engine_file = target_dir / f"{model}_sm87_jp62_trt103_fp16.engine"
        content = f"engine-bytes-{model}".encode()
        engine_file.write_bytes(content)
        return EngineCacheEntry(
            engine_path=engine_file,
            sha256_hex=hashlib.sha256(content).hexdigest(),
            sm=87,
            jp="6.2",
            trt="10.3",
            precision=PrecisionMode.FP16,
            extras={},
        )

    return tuple(
        _materialise(model) for model in ("dinov2_vpr", "lightglue", "aliked")
    )
def _make_descriptor_index(tmp_path: Path) -> Path:
    """Create ``tmp_path/descriptors/corpus.index`` with placeholder bytes.

    The file is written through ``Sha256Sidecar.write_atomic_and_sidecar`` so
    that its hash sidecar is produced alongside it. Returns the index path.
    """
    index_path = tmp_path / "descriptors" / "corpus.index"
    index_path.parent.mkdir(parents=True, exist_ok=True)
    Sha256Sidecar.write_atomic_and_sidecar(index_path, b"faiss-binary-payload")
    return index_path
def _make_calibration(tmp_path: Path) -> Path:
    """Write a small JSON calibration stub and return its path.

    The file lands at ``tmp_path/calibration/int8_calibration.json``; the
    directory is created when it does not exist yet.
    """
    target = tmp_path / "calibration" / "int8_calibration.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(b'{"calibration": "data"}')
    return target
def _make_tiles(count: int = 100) -> tuple[TileHashRecord, ...]:
    """Build ``count`` reproducible tile records.

    Record ``i`` cycles through zooms 16/17/18, steps latitude and longitude
    by 0.001 per index, alternates the source label between ``googlemaps``
    (even ``i``) and ``onboard_ingest`` (odd ``i``), and carries the SHA-256
    of the string ``tile-<i>``.
    """
    records: list[TileHashRecord] = []
    for index in range(count):
        provider = "onboard_ingest" if index % 2 else "googlemaps"
        digest = hashlib.sha256(f"tile-{index}".encode()).hexdigest()
        records.append(
            TileHashRecord(
                zoom=16 + (index % 3),
                lat=50.0 + 0.001 * index,
                lon=36.0 + 0.001 * index,
                source=provider,
                sha256_hex=digest,
            )
        )
    return tuple(records)
class _StaticTiles:
    """Test double for the tile query: always answers with a canned tuple.

    Stands in for a :class:`TilesByBboxQuery`; every query argument is
    accepted and ignored.
    """

    def __init__(self, records: tuple[TileHashRecord, ...]) -> None:
        self._canned = records

    def query_by_bbox(self, *, bbox, zoom_levels, sector_class):  # type: ignore[no-untyped-def]
        """Return the tuple supplied at construction, whatever the filters."""
        return self._canned
def _build_input(
    tmp_path: Path,
    *,
    key_path: Path | None = None,
    takeoff_origin: LatLonAlt | None = None,
    flight_id: UUID | None = None,
    sector_class: str = "stable_rear",
) -> ManifestBuildInput:
    """Assemble a fully populated build request backed by real files.

    Engines, descriptor index and calibration file are created under
    ``tmp_path/cache_root``. When ``key_path`` is omitted a fresh signing key
    is generated directly under ``tmp_path``; otherwise the supplied path is
    used as-is (it does not have to exist).
    """
    root = tmp_path / "cache_root"
    root.mkdir(parents=True, exist_ok=True)
    engines = _make_engine_entries(root)
    index_path = _make_descriptor_index(root)
    calibration_path = _make_calibration(root)
    signing_key = key_path if key_path is not None else _write_pkcs8_key(tmp_path)[0]
    return ManifestBuildInput(
        cache_root=root,
        bbox=_BBOX,
        zoom_levels=_ZOOM_LEVELS,
        sector_class=sector_class,
        engine_entries=engines,
        descriptor_index_path=index_path,
        calibration_path=calibration_path,
        key_path=signing_key,
        takeoff_origin=takeoff_origin,
        flight_id=flight_id,
    )
def _build_builder(
    *,
    config: C10ManifestConfig | None = None,
    tiles: tuple[TileHashRecord, ...] | None = None,
    signer: ManifestSigner | None = None,
) -> tuple[ManifestBuilder, logging.Logger, list[logging.LogRecord]]:
    """Wire up a ``ManifestBuilder`` with a log-capturing logger.

    Defaults: a real ``Ed25519ManifestSigner``, 100 generated tiles and a
    default ``C10ManifestConfig``.

    Returns:
        ``(builder, logger, records)``; ``records`` is the live list that
        receives every ``LogRecord`` emitted through ``logger``.
    """
    captured: list[logging.LogRecord] = []

    class _CollectingHandler(logging.Handler):
        def emit(self, record: logging.LogRecord) -> None:
            captured.append(record)

    # A dedicated, non-propagating logger keeps each test's records isolated.
    log = logging.getLogger(f"test_az323_{id(captured)}")
    log.setLevel(logging.DEBUG)
    log.handlers.clear()
    log.addHandler(_CollectingHandler())
    log.propagate = False

    sidecar = Sha256Sidecar()
    effective_signer = Ed25519ManifestSigner() if signer is None else signer
    tile_store = _StaticTiles(_make_tiles(100) if tiles is None else tiles)
    clock = WallClock()
    effective_config = C10ManifestConfig() if config is None else config
    builder = ManifestBuilder(
        sidecar=sidecar,
        signer=effective_signer,
        tile_metadata_store=tile_store,
        logger=log,
        clock=clock,
        config=effective_config,
    )
    return builder, log, captured
# ----------------------------------------------------------------------
# AC-1
# ----------------------------------------------------------------------
def test_ac1_happy_path_produces_manifest_sidecar_and_signature(tmp_path: Path) -> None:
    """AC-1: a default build writes Manifest.json, its sha256 sidecar and .sig.

    Also checks the shape of the returned artifact (paths, 64-char lowercase
    hex ``manifest_hash``, artifact count) and the top-level layout of the
    ``artifacts`` section in the written JSON.
    """
    # Arrange
    builder, _, _ = _build_builder()
    request = _build_input(tmp_path)
    # Act
    artifact = builder.build_manifest(request)
    # Assert
    assert isinstance(artifact, ManifestArtifact)
    assert artifact.manifest_path == request.cache_root / "Manifest.json"
    assert artifact.signature_path == request.cache_root / "Manifest.json.sig"
    assert artifact.manifest_path.exists()
    assert (request.cache_root / "Manifest.json.sha256").exists()
    assert artifact.signature_path.exists()
    assert len(artifact.manifest_hash) == 64
    # Strict alphabet check: ``int(value, 16)`` would also accept a "0x"
    # prefix, underscores, a sign or surrounding whitespace, so it cannot prove
    # the value is a plain hex digest. This also covers the lowercase rule.
    assert set(artifact.manifest_hash) <= set("0123456789abcdef")
    body = orjson.loads(artifact.manifest_path.read_bytes())
    assert len(body["artifacts"]["engines"]) == 3
    assert "descriptor_index" in body["artifacts"]
    assert "calibration" in body["artifacts"]
    assert body["artifacts"]["tiles_coverage"]["tile_count"] == 100
    assert artifact.total_artifacts_listed == 6  # 3 engines + index + calibration + tiles_coverage
# ----------------------------------------------------------------------
# AC-2
# ----------------------------------------------------------------------
def test_ac2_determinism_same_input_same_manifest_hash(tmp_path: Path) -> None:
    """AC-2: building twice from one request yields the same manifest_hash.

    The written JSON must also match once ``build.built_at`` is removed from
    both documents.
    """

    def _without_build_time(raw: bytes) -> bytes:
        # Drop the build time, then re-serialise with sorted keys.
        document = orjson.loads(raw)
        document["build"].pop("built_at")
        return orjson.dumps(document, option=orjson.OPT_SORT_KEYS)

    # Arrange
    builder, _, _ = _build_builder()
    request = _build_input(tmp_path)
    # Act: snapshot the first file before the second build overwrites it.
    run_one = builder.build_manifest(request)
    run_one_raw = run_one.manifest_path.read_bytes()
    run_two = builder.build_manifest(request)
    run_two_raw = run_two.manifest_path.read_bytes()
    # Assert
    assert run_one.manifest_hash == run_two.manifest_hash
    assert _without_build_time(run_one_raw) == _without_build_time(run_two_raw)
# ----------------------------------------------------------------------
# AC-3
# ----------------------------------------------------------------------
def test_ac3_signature_verifies_against_public_key(tmp_path: Path) -> None:
    """AC-3: the .sig file is a valid Ed25519 signature over Manifest.json.

    The public key is re-derived independently from the PEM file on disk, so
    verification does not depend on the signer implementation under test.
    """
    # Arrange
    builder, _, _ = _build_builder()
    key_path, _ = _write_pkcs8_key(tmp_path)
    request = _build_input(tmp_path, key_path=key_path)
    # Act
    artifact = builder.build_manifest(request)
    signed_payload = artifact.manifest_path.read_bytes()
    signature = artifact.signature_path.read_bytes()
    # PEM -> raw private bytes -> Ed25519 private key -> public key.
    loaded_private = serialization.load_pem_private_key(
        key_path.read_bytes(), password=None
    )
    raw_private = loaded_private.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_key: Ed25519PublicKey = Ed25519PrivateKey.from_private_bytes(
        raw_private
    ).public_key()
    # Assert: verify() raises on mismatch, so returning normally is the pass.
    public_key.verify(signature, signed_payload)
# ----------------------------------------------------------------------
# AC-4
# ----------------------------------------------------------------------
def test_ac4_operator_mode_rejects_unknown_fingerprint(tmp_path: Path) -> None:
    """AC-4: OPERATOR mode refuses a key whose fingerprint is not allowlisted.

    The error message must mention the allowed fingerprint, no manifest or
    signature may be left on disk, and exactly one ERROR record of kind
    ``c10.manifest.build.error`` must be logged.
    """
    # Arrange: the allowlist holds a fingerprint the generated key cannot have.
    allowed_fp = hashlib.sha256(b"some-other-key").hexdigest()
    operator_config = C10ManifestConfig(
        signing_mode=SigningMode.OPERATOR,
        allowed_operator_fingerprints=(allowed_fp,),
    )
    builder, _, records = _build_builder(config=operator_config)
    request = _build_input(tmp_path)
    # Act / Assert
    with pytest.raises(ManifestWriteError) as exc_info:
        builder.build_manifest(request)
    assert allowed_fp in str(exc_info.value)
    for forbidden_name in ("Manifest.json", "Manifest.json.sig"):
        assert not (request.cache_root / forbidden_name).exists()
    error_records = [rec for rec in records if rec.levelno == logging.ERROR]
    assert len(error_records) == 1
    assert vars(error_records[0]).get("kind") == "c10.manifest.build.error"
# ----------------------------------------------------------------------
# AC-5
# ----------------------------------------------------------------------
def test_ac5_operator_mode_accepts_known_fingerprint(tmp_path: Path) -> None:
    """AC-5: OPERATOR mode builds when the key's fingerprint is allowlisted.

    The artifact must report that fingerprint and no WARNING may be logged.
    """
    # Arrange
    key_path, fingerprint = _write_pkcs8_key(tmp_path)
    operator_config = C10ManifestConfig(
        signing_mode=SigningMode.OPERATOR,
        allowed_operator_fingerprints=(fingerprint,),
    )
    builder, _, records = _build_builder(config=operator_config)
    request = _build_input(tmp_path, key_path=key_path)
    # Act
    artifact = builder.build_manifest(request)
    # Assert
    assert artifact.signing_public_key_fingerprint == fingerprint
    warning_records = [rec for rec in records if rec.levelno == logging.WARNING]
    assert warning_records == []
# ----------------------------------------------------------------------
# AC-6
# ----------------------------------------------------------------------
def test_ac6_dev_mode_with_dev_key_no_warning(tmp_path: Path) -> None:
    """AC-6: DEV mode with a freshly generated key logs no WARNING records."""
    # Arrange
    dev_config = C10ManifestConfig(signing_mode=SigningMode.DEV)
    builder, _, records = _build_builder(config=dev_config)
    request = _build_input(tmp_path)
    # Act
    builder.build_manifest(request)
    # Assert
    warning_records = [rec for rec in records if rec.levelno == logging.WARNING]
    assert warning_records == []
# ----------------------------------------------------------------------
# AC-7
# ----------------------------------------------------------------------
def test_ac7_dev_mode_with_operator_key_emits_warning(tmp_path: Path) -> None:
    """AC-7: DEV mode signing with an allowlisted operator key warns once.

    The single WARNING must be of kind
    ``c10.manifest.dev_mode_with_operator_key`` and carry the offered
    fingerprint in its ``kv`` payload.
    """
    # Arrange
    key_path, fingerprint = _write_pkcs8_key(tmp_path)
    dev_config = C10ManifestConfig(
        signing_mode=SigningMode.DEV,
        allowed_operator_fingerprints=(fingerprint,),
    )
    builder, _, records = _build_builder(config=dev_config)
    request = _build_input(tmp_path, key_path=key_path)
    # Act
    builder.build_manifest(request)
    # Assert
    warning_records = [rec for rec in records if rec.levelno == logging.WARNING]
    assert len(warning_records) == 1
    warning_fields = vars(warning_records[0])
    assert warning_fields.get("kind") == "c10.manifest.dev_mode_with_operator_key"
    assert warning_fields.get("kv", {}).get("offered_fingerprint") == fingerprint
# ----------------------------------------------------------------------
# AC-8
# ----------------------------------------------------------------------
def test_ac8_tile_coverage_hash_is_sort_order_deterministic(tmp_path: Path) -> None:
    """AC-8: the tiles_coverage hash ignores the order the store returns tiles in.

    Two builders receive the same 100 tiles, one in reversed order; both
    manifests must record the same ``tiles_coverage.sha256``.
    """

    def _coverage_digest(builder: ManifestBuilder, request: ManifestBuildInput) -> str:
        # Build, then read the aggregate hash back from the written manifest.
        written = builder.build_manifest(request)
        document = orjson.loads(written.manifest_path.read_bytes())
        return document["artifacts"]["tiles_coverage"]["sha256"]

    # Arrange
    forward_tiles = _make_tiles(100)
    backward_tiles = tuple(reversed(forward_tiles))
    forward_builder, _, _ = _build_builder(tiles=forward_tiles)
    backward_builder, _, _ = _build_builder(tiles=backward_tiles)
    forward_request = _build_input(tmp_path / "a")
    backward_request = _build_input(tmp_path / "b")
    # Act
    forward_digest = _coverage_digest(forward_builder, forward_request)
    backward_digest = _coverage_digest(backward_builder, backward_request)
    # Assert
    assert forward_digest == backward_digest
# ----------------------------------------------------------------------
# AC-9
# ----------------------------------------------------------------------
def test_ac9_missing_key_path_raises_manifest_write_error(tmp_path: Path) -> None:
    """AC-9: a non-existent key file surfaces as a chained ManifestWriteError.

    The message must name the key-load failure, the underlying exception must
    be attached as ``__cause__``, and no Manifest.json may be written.
    """
    # Arrange
    builder, _, _ = _build_builder()
    absent_key = tmp_path / "missing.key"
    request = _build_input(tmp_path, key_path=absent_key)
    # Act / Assert
    with pytest.raises(ManifestWriteError) as exc_info:
        builder.build_manifest(request)
    raised = exc_info.value
    assert "operator signing key load failed" in str(raised)
    assert raised.__cause__ is not None
    assert not (request.cache_root / "Manifest.json").exists()
def test_ac9_malformed_pem_chains_underlying_cause(tmp_path: Path) -> None:
    """AC-9 (extra): an unparsable PEM file yields a chained ManifestWriteError."""
    # Arrange: PEM armour around a body that is not a valid key.
    builder, _, _ = _build_builder()
    broken_key = tmp_path / "bogus.key"
    broken_key.write_bytes(
        b"-----BEGIN PRIVATE KEY-----\ngarbage\n-----END PRIVATE KEY-----\n"
    )
    request = _build_input(tmp_path, key_path=broken_key)
    # Act / Assert
    with pytest.raises(ManifestWriteError) as exc_info:
        builder.build_manifest(request)
    assert exc_info.value.__cause__ is not None
# ----------------------------------------------------------------------
# AC-10
# ----------------------------------------------------------------------
def test_ac10_atomic_write_no_half_manifest(tmp_path: Path) -> None:
    """AC-10: a build that dies while signing leaves a consistent disk state.

    A first build produces a good Manifest.json and signature. A second build
    runs with a signer whose ``sign()`` raises, simulating an interruption.
    Afterwards the test requires that:

    * the earlier signature file is byte-for-byte unchanged;
    * Manifest.json still parses as JSON (old or new content is accepted);
    * the ``.sha256`` sidecar matches whatever Manifest.json is on disk.
    """
    # Arrange
    request = _build_input(tmp_path)
    manifest_file = request.cache_root / "Manifest.json"
    signature_file = request.cache_root / "Manifest.json.sig"
    sidecar_file = request.cache_root / "Manifest.json.sha256"
    baseline_builder, _, _ = _build_builder()
    baseline_builder.build_manifest(request)  # produce v1 on disk
    baseline_manifest = manifest_file.read_bytes()
    baseline_signature = signature_file.read_bytes()

    class _ExplodingSigner:
        """Delegates to the real signer except for ``sign()``, which raises."""

        def __init__(self) -> None:
            self._inner = Ed25519ManifestSigner()

        def load_signing_key(self, key_path):  # type: ignore[no-untyped-def]
            return self._inner.load_signing_key(key_path)

        def sign(self, key, payload_bytes):  # type: ignore[no-untyped-def]
            raise RuntimeError("simulated kill mid-build")

        def public_key_fingerprint(self, key):  # type: ignore[no-untyped-def]
            return self._inner.public_key_fingerprint(key)

    failing_builder, _, _ = _build_builder(signer=_ExplodingSigner())
    # Act
    with pytest.raises(RuntimeError, match="simulated kill"):
        failing_builder.build_manifest(request)
    # Assert: the previous-good signature was never replaced.
    assert signature_file.read_bytes() == baseline_signature
    # Manifest.json may be the old or the new document, but never a torn one.
    on_disk = manifest_file.read_bytes()
    assert orjson.loads(on_disk) is not None
    # The sidecar has to describe the bytes that are actually on disk.
    on_disk_digest = hashlib.sha256(on_disk).hexdigest()
    assert on_disk_digest == sidecar_file.read_text().strip()
    # Defensive: when the digest says "old manifest", the bytes must agree.
    if on_disk_digest == hashlib.sha256(baseline_manifest).hexdigest():
        assert on_disk == baseline_manifest
# ----------------------------------------------------------------------
# AC-11
# ----------------------------------------------------------------------
def test_ac11_manifest_own_sidecar_matches_disk_bytes(tmp_path: Path) -> None:
    """AC-11: Manifest.json.sha256 holds the SHA-256 of Manifest.json on disk."""
    # Arrange
    builder, _, _ = _build_builder()
    request = _build_input(tmp_path)
    # Act
    artifact = builder.build_manifest(request)
    digest_of_file = hashlib.sha256(artifact.manifest_path.read_bytes()).hexdigest()
    sidecar_file = request.cache_root / "Manifest.json.sha256"
    digest_in_sidecar = sidecar_file.read_text().strip()
    # Assert
    assert digest_of_file == digest_in_sidecar
# ----------------------------------------------------------------------
# AC-12
# ----------------------------------------------------------------------
def test_ac12_total_artifacts_listed_counts_dict_entries(tmp_path: Path) -> None:
    """AC-12: ``total_artifacts_listed`` is 6 for the default test input."""
    # Arrange
    builder, _, _ = _build_builder()
    request = _build_input(tmp_path)
    # Act
    artifact = builder.build_manifest(request)
    # Assert: 3 engines + 1 index + 1 calibration + 1 tiles_coverage
    expected_total = 3 + 1 + 1 + 1
    assert artifact.total_artifacts_listed == expected_total
# ----------------------------------------------------------------------
# AC-13
# ----------------------------------------------------------------------
def test_ac13_takeoff_origin_baked_into_manifest_body(tmp_path: Path) -> None:
    """AC-13: supplied flight_id and takeoff_origin appear in the ``flight`` block.

    ``takeoff_origin`` must contain exactly ``lat_deg``, ``lon_deg`` and
    ``alt_m`` with the supplied values, and nothing else.
    """
    # Arrange
    builder, _, _ = _build_builder()
    origin = LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0)
    flight = uuid4()
    request = _build_input(tmp_path, takeoff_origin=origin, flight_id=flight)
    # Act
    artifact = builder.build_manifest(request)
    flight_block = orjson.loads(artifact.manifest_path.read_bytes())["flight"]
    # Assert
    assert flight_block["flight_id"] == str(flight)
    recorded_origin = flight_block["takeoff_origin"]
    expected_origin = {"lat_deg": 50.0, "lon_deg": 36.2, "alt_m": 200.0}
    for field_name, expected_value in expected_origin.items():
        assert recorded_origin[field_name] == expected_value
    # No extra keys (for example a timestamp) inside takeoff_origin.
    assert set(recorded_origin.keys()) == set(expected_origin)
# ----------------------------------------------------------------------
# AC-14
# ----------------------------------------------------------------------
def test_ac14_takeoff_origin_absent_when_not_supplied(tmp_path: Path) -> None:
    """AC-14: with no origin or flight id, flight_id is null and the
    ``takeoff_origin`` key is omitted from the ``flight`` block."""
    # Arrange
    builder, _, _ = _build_builder()
    request = _build_input(tmp_path, takeoff_origin=None, flight_id=None)
    # Act
    artifact = builder.build_manifest(request)
    document = orjson.loads(artifact.manifest_path.read_bytes())
    # Assert: the id is present but null; the origin key is missing entirely.
    flight_block = document["flight"]
    assert flight_block["flight_id"] is None
    assert "takeoff_origin" not in flight_block
# ----------------------------------------------------------------------
# AC-15
# ----------------------------------------------------------------------
def test_ac15_manifest_hash_changes_when_takeoff_origin_differs(tmp_path: Path) -> None:
    """AC-15: a different takeoff origin must change ``manifest_hash``.

    Both requests share the flight id and the signing key; the origins differ
    only by 0.001 in their last component.
    """
    # Arrange
    builder, _, _ = _build_builder()
    shared_flight = uuid4()
    request_a = _build_input(
        tmp_path / "a",
        takeoff_origin=LatLonAlt(50.0, 36.2, 200.0),
        flight_id=shared_flight,
    )
    request_b = _build_input(
        tmp_path / "b",
        takeoff_origin=LatLonAlt(50.0, 36.2, 200.001),  # +0.001 on the last value
        flight_id=shared_flight,
    )
    # Sign both with the first request's key so the key is not a variable.
    request_b = replace(request_b, key_path=request_a.key_path)
    # Act
    hash_a = builder.build_manifest(request_a).manifest_hash
    hash_b = builder.build_manifest(request_b).manifest_hash
    # Assert
    assert hash_a != hash_b
# ----------------------------------------------------------------------
# AC-16
# ----------------------------------------------------------------------
def test_ac16_manifest_hash_changes_when_only_flight_id_differs(tmp_path: Path) -> None:
    """AC-16: a different flight id alone must change ``manifest_hash``.

    Both requests use the same takeoff origin and signing key; each gets its
    own random flight id.
    """
    # Arrange
    builder, _, _ = _build_builder()
    shared_origin = LatLonAlt(50.0, 36.2, 200.0)
    request_a = _build_input(
        tmp_path / "a", takeoff_origin=shared_origin, flight_id=uuid4()
    )
    request_b = _build_input(
        tmp_path / "b", takeoff_origin=shared_origin, flight_id=uuid4()
    )
    # Sign both with the first request's key so the key is not a variable.
    request_b = replace(request_b, key_path=request_a.key_path)
    # Act
    hash_a = builder.build_manifest(request_a).manifest_hash
    hash_b = builder.build_manifest(request_b).manifest_hash
    # Assert
    assert hash_a != hash_b
# ----------------------------------------------------------------------
# Protocol conformance
# ----------------------------------------------------------------------
def test_ed25519_manifest_signer_satisfies_protocol() -> None:
    """The concrete Ed25519 signer passes an isinstance check against ManifestSigner."""
    # Arrange
    signer = Ed25519ManifestSigner()
    # Assert
    assert isinstance(signer, ManifestSigner)
# ----------------------------------------------------------------------
# Descriptor-index sidecar drift
# ----------------------------------------------------------------------
def test_descriptor_index_sidecar_missing_raises_manifest_write_error(
    tmp_path: Path,
) -> None:
    """Deleting the descriptor index's .sha256 sidecar makes the build fail."""
    # Arrange
    builder, _, _ = _build_builder()
    request = _build_input(tmp_path)
    index_sidecar = request.cache_root / "descriptors" / "corpus.index.sha256"
    index_sidecar.unlink()
    # Act / Assert
    with pytest.raises(ManifestWriteError, match="descriptor_index sidecar missing"):
        builder.build_manifest(request)
def test_descriptor_index_sidecar_malformed_hex_raises(tmp_path: Path) -> None:
    """A descriptor-index sidecar that is not a 64-char hex digest is rejected."""
    # Arrange: overwrite the valid sidecar with junk text.
    builder, _, _ = _build_builder()
    request = _build_input(tmp_path)
    index_sidecar = request.cache_root / "descriptors" / "corpus.index.sha256"
    index_sidecar.write_text("not-hex!")
    # Act / Assert
    with pytest.raises(ManifestWriteError, match="not 64 hex chars"):
        builder.build_manifest(request)