[AZ-515] Extract C10 canonical hash helpers to shared module

Cumulative-review F1 (batches 34-36, carried into batch 37): both
manifest_verifier.py (AZ-324) and provisioner.py (AZ-325) imported
leading-underscore privates _aggregate_tile_hash + _compute_manifest_hash
from manifest_builder.py (AZ-323). The helpers encode the trust-chain
formula shared across all three components; the import shape gave
readers no static signal that a refactor would silently break two
modules.

Move the formula into c10_provisioning/_canonical_hash.py:

- TileHashRecord (moved from manifest_builder)
- aggregate_tile_hash (renamed, public)
- compute_manifest_hash (renamed, public)
- TAKEOFF_ORIGIN_DECIMALS constant (moved)

Callers updated to import directly from _canonical_hash. Bodies
unchanged; manifest hashes are byte-for-byte identical.

Tests: c10_provisioning suite 86/86 pass; full project 1370/1370 pass.
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 05:24:06 +03:00
parent a9c8d60087
commit ca0430a44d
5 changed files with 183 additions and 102 deletions
@@ -34,6 +34,11 @@ from cryptography.hazmat.primitives.serialization import load_pem_private_key
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard._types.inference import EngineCacheEntry
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c10_provisioning._canonical_hash import (
TileHashRecord,
aggregate_tile_hash,
compute_manifest_hash,
)
from gps_denied_onboard.components.c10_provisioning.config import (
C10ManifestConfig,
SigningMode,
@@ -56,12 +61,10 @@ __all__ = [
"ManifestArtifact",
"ManifestBuildInput",
"ManifestBuilder",
"TileHashRecord",
"TilesByBboxQuery",
]
_BUILD_LOG_KIND_PREFIX = "c10.manifest"
_TAKEOFF_ORIGIN_DECIMALS = 9
_MANIFEST_FILENAME = "Manifest.json"
_SIGNATURE_FILENAME = "Manifest.json.sig"
_ED25519_PUBKEY_BYTES = 32
@@ -72,24 +75,6 @@ VALID_SECTOR_CLASSES: frozenset[str] = frozenset(
)
@dataclass(frozen=True)
class TileHashRecord:
"""Consumer-side DTO carrying the four sort keys + the per-tile digest.
AZ-323 only needs ``(zoom, lat, lon, source)`` for canonical
ordering and ``sha256_hex`` for the aggregate hash. The
composition-root adapter wraps C6's ``TileMetadata`` rows into
this shape so the AZ-270 lint stays green (no
``components.c6_tile_cache`` import from C10).
"""
zoom: int
lat: float
lon: float
source: str
sha256_hex: str
@runtime_checkable
class TilesByBboxQuery(Protocol):
"""Consumer-side structural cut over C6's ``TileMetadataStore``.
@@ -294,7 +279,7 @@ class ManifestBuilder:
zoom_levels=request.zoom_levels,
sector_class=request.sector_class,
)
tiles_coverage_sha256 = _aggregate_tile_hash(sorted_tiles)
tiles_coverage_sha256 = aggregate_tile_hash(sorted_tiles)
engine_artifacts = tuple(
{
@@ -304,7 +289,7 @@ class ManifestBuilder:
for entry in request.engine_entries
)
manifest_hash = _compute_manifest_hash(
manifest_hash = compute_manifest_hash(
engine_entries=request.engine_entries,
calibration_sha256=calibration_sha256,
descriptor_index_sha256=descriptor_index_sha256,
@@ -589,18 +574,6 @@ class ManifestBuilder:
) from exc
def _aggregate_tile_hash(records: tuple[TileHashRecord, ...]) -> str:
hasher = hashlib.sha256()
for r in records:
hasher.update(
(
f"z{r.zoom}|lat{r.lat:.9f}|lon{r.lon:.9f}|src{r.source}"
f":{r.sha256_hex}\n"
).encode("ascii")
)
return hasher.hexdigest()
def _canonical_json_with_trailing_newline(payload: dict[str, object]) -> bytes:
body = orjson.dumps(
payload,
@@ -611,58 +584,6 @@ def _canonical_json_with_trailing_newline(payload: dict[str, object]) -> bytes:
return body
def _compute_manifest_hash(
*,
engine_entries: tuple[EngineCacheEntry, ...],
calibration_sha256: str,
descriptor_index_sha256: str,
tiles_coverage_sha256: str,
sector_class: str,
bbox: BoundingBox,
zoom_levels: tuple[int, ...],
takeoff_origin: LatLonAlt | None,
flight_id: UUID | None,
) -> str:
# Engine identity is `(model_name, precision, sm, jetpack, trt, sha256)`
# so a stale-host fp16 build never collides with a fresh int8 build —
# this matches the AZ-281 filename schema fields modulo the precision
# axis (which fp16 vs int8 makes load-bearing).
model_ids = sorted(
(
str(entry.engine_path),
entry.sha256_hex,
)
for entry in engine_entries
)
origin_tuple: tuple[float, float, float] | None
if takeoff_origin is not None:
origin_tuple = (
round(takeoff_origin.lat_deg, _TAKEOFF_ORIGIN_DECIMALS),
round(takeoff_origin.lon_deg, _TAKEOFF_ORIGIN_DECIMALS),
round(takeoff_origin.alt_m, _TAKEOFF_ORIGIN_DECIMALS),
)
else:
origin_tuple = None
build_identity = {
"model_ids": [list(entry) for entry in model_ids],
"calibration_sha256": calibration_sha256,
"descriptor_index_sha256": descriptor_index_sha256,
"tiles_coverage_sha256": tiles_coverage_sha256,
"sector_class": sector_class,
"bbox": [
bbox.min_lat_deg,
bbox.min_lon_deg,
bbox.max_lat_deg,
bbox.max_lon_deg,
],
"zoom_levels": sorted(zoom_levels),
"takeoff_origin": list(origin_tuple) if origin_tuple is not None else None,
"flight_id": str(flight_id) if flight_id is not None else None,
}
canonical = orjson.dumps(build_identity, option=orjson.OPT_SORT_KEYS)
return hashlib.sha256(canonical).hexdigest()
def _ns_to_iso_utc(time_ns: int) -> str:
"""Format ns-since-epoch as RFC 3339 UTC with second precision.