[AZ-507] [AZ-323] [AZ-324] C10 Manifest build + verify + AZ-270 hygiene

AZ-507: codify cross-component import rule. Added
_types/inference_errors.py shim re-exporting EngineBuildError +
CalibrationCacheError from c7_inference; narrowed C10
EngineCompiler's except Exception to the two typed errors so unknown
exceptions propagate (AC-3). Rewrote module-layout.md "Imports from"
sections for 9 components + added Rule 9; appended an
architecture.md ADR-009 note explaining why components must go
through _types/*.

AZ-323: ManifestBuilder + Ed25519ManifestSigner. Canonical JSON via
orjson OPT_SORT_KEYS+OPT_INDENT_2, atomic-write Manifest.json + sha
sidecar + .sig via AZ-280, operator-key fingerprint allowlist gate
(C10-ST-01), ADR-010 takeoff_origin + flight_id baked into Manifest
AND manifest_hash so re-planned routes change the cache identity
(AC-15/AC-16). 20 unit tests cover all 16 ACs.

AZ-324: ManifestVerifierImpl. Fail-closed Steps A-D: Manifest.json
sidecar self-hash, Ed25519 trust-key set, schema parse with
absolute/.. path rejection + takeoff_origin in-bbox check, stream
SHA-256 per artifact with multi-failure accumulation. Operator mode
re-derives tiles_coverage_sha256 from C6; airborne mode trusts the
signed aggregate. 19 unit tests cover all 17 ACs.

Composition root: c10_factory.build_manifest_builder +
build_manifest_verifier + c6_tile_metadata_store_to_tiles_query
adapter (the one place that legitimately imports both C6 and C10
without violating the AZ-270 lint).

Dependency: pinned cryptography>=43.0,<46.0 in pyproject.toml.

Tests: 1300 passed, 80 skipped (env-only), ruff clean for all
AZ-323/324 files.

AZ-306 (FAISS) intentionally deferred to batch 35 — needs C++
pybind11 toolchain not present in this environment.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 02:37:14 +03:00
parent 6ca8d78190
commit e2bebefdfc
20 changed files with 3406 additions and 26 deletions
+4
View File
@@ -610,6 +610,10 @@ This decision is made on **technical grounds only**. Component licenses (BSD/Apa
- Per-component folders give each implementation a natural home for its own `tests/`, fixtures, and adapter-specific helpers — matching coderule.mdc's "logic specific to a platform, variant, or environment belongs in the class that owns that variant".
- Adding a new C2 VPR backbone (e.g., a future foundation-model retrieval backbone via D-C2-12) is a folder-add + interface-conformance change; no other component is touched.
#### Cross-Component Contract Surface (AZ-507)
The ADR-009 "interface, not concrete" rule has an architectural sibling: cross-component imports go through `_types/*.py` (DTOs + typed-error envelopes such as `_types.inference_errors`), never through `components.X (Public API)`. The only exception is `runtime_root/*` (the composition root), which is allowed to import concrete strategies across components precisely because it is the single place that resolves Protocol parameters to concrete classes. Every other module under `components/**/*.py` consumes cross-component contracts via (a) shared DTOs in `_types/*`, and (b) consumer-side structural `Protocol` cuts defined locally inside the consuming component (e.g. `c10_provisioning.engine_compiler.CompileEngineCallable` for the narrow `compile_engine` surface of the C7 InferenceRuntime). This is the same architectural property as constructor-injection-against-interface, applied to the import graph rather than the call graph. The AZ-270 `test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies` lint enforces this on every `components/**/*.py`; AZ-507 reconciles `module-layout.md` with the lint so the documentation and the build gate agree.
### ADR-010 — Operator-planned mission is the cold-start trust anchor; FC GPS is secondary
**Context**: The original cold-start design (AZ-419 / FT-P-11) assumed the FC EKF's last valid GPS fix is available at takeoff to seed C5. Field reality contradicts this: a UAV operating in a contested-EW environment may have GPS jammed **before** takeoff (the jamming radius reaches the launch site, the unit launches under a jammer's umbrella, etc.). In that case the FC EKF has no GPS fix to give, and the companion has nothing to anchor the initial pose to — the entire downstream pipeline (VIO bootstrap, VPR retrieval scope, satellite anchoring) collapses or runs blind. At the same time, the parent suite already requires the operator to author a route in the **Mission Planner UI** (`suite/ui`) and persist it to the **`flights` REST service** (`suite/flights`) before any flight runs. The waypoint ordering is operationally meaningful: waypoint[0] is the planned takeoff point. The operator therefore already declares the takeoff position with operationally relevant accuracy (typically a few tens of metres) hours before launch, in a context that has no dependency on GPS at all. This information is the natural cold-start trust anchor.
+10 -9
View File
@@ -19,6 +19,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
6. The composition root is `src/gps_denied_onboard/runtime_root.py`. It is the ONLY place that may import concrete strategy implementations across components — every other cross-component dependency is constructor-injected against an interface (ADR-009).
7. Tests mirror the component graph 1:1 at `tests/unit/<component>/`. Cross-component scenarios live in `tests/integration/`, `tests/e2e/`, `tests/perf/`, `tests/security/`, `tests/resilience/`.
8. Build-time exclusion (ADR-002): each `<component>/_native/` and the corresponding `cpp/<lib>/` carry a CMake `BUILD_<NAME>` flag. The composition root validator refuses to wire a strategy whose flag is OFF.
9. **AZ-507 cross-component contract surface** — the only places a `components/<X>/*.py` file may import are: its own subpackage (`gps_denied_onboard.components.<X>.*`), `_types/*`, `_types.inference_errors`, `helpers/*`, `config`, `logging`, `fdr_client`, `clock`, `frame_source` (interface only). Cross-component contracts (Protocols + typed exceptions) reach consumers through `_types/*` modules — DTOs in the canonical `_types` files (e.g. `_types.inference.EngineCacheEntry`), typed-error envelopes in `_types.inference_errors`, and consumer-side structural `Protocol` cuts defined locally inside each consuming component (e.g. `c10_provisioning.engine_compiler.CompileEngineCallable`). NEVER `from gps_denied_onboard.components.<other_component> import ...` — the AZ-270 `test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies` lint enforces this on every `components/**/*.py`. The composition root (`runtime_root/*`) is the single exception; it wires concrete strategies into duck-typed Protocol parameters via constructor injection. This rule is the architectural contract paired with the AZ-270 lint; see `architecture.md` § Cross-Component Contract Surface for the rationale.
## Per-Component Mapping
@@ -49,7 +50,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- `ultra_vpr.py` (primary), `mega_loc.py`, `mix_vpr.py`, `sela_vpr.py`, `eigen_places.py`, `net_vlad.py`, `salad.py`
- `_native/`
- **Owns**: `src/gps_denied_onboard/components/c2_vpr/**`, `tests/unit/c2_vpr/**`
- **Imports from**: `_types`, `helpers.descriptor_normaliser`, `components.c6_tile_cache` (Public API only — TileStore query interface), `components.c7_inference` (InferenceRuntime), `config`, `logging`, `fdr_client`
- **Imports from**: `_types`, `helpers.descriptor_normaliser`, `config`, `logging`, `fdr_client`. The TileStore query surface (c6) and the InferenceRuntime surface (c7) are obtained via constructor-injected consumer-side structural Protocol cuts (see AZ-507 cross-component rule below); the composition root wires the concrete c6/c7 strategies in. NEVER `from gps_denied_onboard.components.c6_tile_cache import ...` or `from gps_denied_onboard.components.c7_inference import ...` inside `c2_vpr/*.py`.
- **Consumed by**: `c2_5_rerank`, `runtime_root`
### Component: c2_5_rerank
@@ -64,7 +65,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- **Internal**:
- `inlier_based_reranker.py` (`InlierCountReRanker` — single-pair LightGlue inlier count K=10→N=3, AZ-343; module-level `create()` factory entry-point consumed by `runtime_root.rerank_factory.build_rerank_strategy`; gated by `BUILD_RERANK_INLIER_COUNT`)
- **Owns**: `src/gps_denied_onboard/components/c2_5_rerank/**`, `src/gps_denied_onboard/runtime_root/rerank_factory.py`, `tests/unit/c2_5_rerank/**`
- **Imports from**: `_types`, `helpers.lightglue_runtime`, `helpers.feature_extractor` (AZ-343 scope expansion), `helpers.descriptor_normaliser`, `helpers.ransac_filter`, `helpers.se3_utils`, `components.c6_tile_cache` (Public API only — `TileStore`, `TilePixelHandle`, `TileCacheError` family), `clock`, `config`, `logging`, `fdr_client`
- **Imports from**: `_types`, `helpers.lightglue_runtime`, `helpers.feature_extractor` (AZ-343 scope expansion), `helpers.descriptor_normaliser`, `helpers.ransac_filter`, `helpers.se3_utils`, `clock`, `config`, `logging`, `fdr_client`. The `TileStore`, `TilePixelHandle`, and `TileCacheError` family from c6 are obtained via constructor-injected consumer-side structural Protocol cuts + shared DTOs in `_types` (see AZ-507 cross-component rule below); composition root wires the concrete c6 strategy in. NEVER `from gps_denied_onboard.components.c6_tile_cache import ...` inside `c2_5_rerank/*.py`.
- **Consumed by**: `c3_matcher`, `runtime_root`
### Component: c3_matcher
@@ -83,7 +84,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- `xfeat.py` (XFeat, AZ-347)
- `_native/`
- **Owns**: `src/gps_denied_onboard/components/c3_matcher/**`, `tests/unit/c3_matcher/**`, `src/gps_denied_onboard/runtime_root/matcher_factory.py`
- **Imports from**: `_types`, `helpers.lightglue_runtime` (R14: SHARED with C2.5 — owned by helper, NOT by C3), `helpers.ransac_filter`, `helpers.descriptor_normaliser`, `helpers.se3_utils`, `components.c7_inference`, `config`, `logging`, `fdr_client`
- **Imports from**: `_types`, `helpers.lightglue_runtime` (R14: SHARED with C2.5 — owned by helper, NOT by C3), `helpers.ransac_filter`, `helpers.descriptor_normaliser`, `helpers.se3_utils`, `config`, `logging`, `fdr_client`. The InferenceRuntime surface (c7) is obtained via a constructor-injected consumer-side structural Protocol cut (see AZ-507 cross-component rule below); composition root wires the concrete c7 strategy in. NEVER `from gps_denied_onboard.components.c7_inference import ...` inside `c3_matcher/*.py`.
- **Consumed by**: `c3_5_adhop`, `runtime_root`
### Component: c3_5_adhop
@@ -99,7 +100,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- `passthrough_refiner.py` (reference baseline; AZ-348)
- `adhop_refiner.py` (production-default; AZ-349 pending)
- **Owns**: `src/gps_denied_onboard/components/c3_5_adhop/**`, `tests/unit/c3_5_adhop/**`, `src/gps_denied_onboard/runtime_root/refiner_factory.py`
- **Imports from**: `_types`, `helpers.ransac_filter` (R14: SHARED with C3 and C4 — owned by helper, NOT by C3.5), `helpers.se3_utils`, `components.c7_inference`, `config`, `logging`, `fdr_client`
- **Imports from**: `_types`, `helpers.ransac_filter` (R14: SHARED with C3 and C4 — owned by helper, NOT by C3.5), `helpers.se3_utils`, `config`, `logging`, `fdr_client`. The InferenceRuntime surface (c7) is obtained via a constructor-injected consumer-side structural Protocol cut (see AZ-507 cross-component rule below); composition root wires the concrete c7 strategy in. NEVER `from gps_denied_onboard.components.c7_inference import ...` inside `c3_5_adhop/*.py`.
- **Consumed by**: `c4_pose`, `runtime_root`
### Component: c4_pose
@@ -130,7 +131,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- `eskf_baseline.py` (mandatory simple-baseline)
- `_native/`
- **Owns**: `src/gps_denied_onboard/components/c5_state/**`, `cpp/gtsam_bindings/**` (primary owner; see joint-native note above), `tests/unit/c5_state/**`
- **Imports from**: `_types`, `helpers.imu_preintegrator`, `helpers.se3_utils`, `helpers.wgs_converter`, `components.c4_pose` (Public API: `PoseEstimate`), `config`, `logging`, `fdr_client`
- **Imports from**: `_types` (`PoseEstimate` DTO lives here), `helpers.imu_preintegrator`, `helpers.se3_utils`, `helpers.wgs_converter`, `config`, `logging`, `fdr_client`. NEVER `from gps_denied_onboard.components.c4_pose import ...` inside `c5_state/*.py` — the `PoseEstimate` DTO is consumed exclusively via `_types`.
- **Consumed by**: `c8_fc_adapter`, `c13_fdr`, `runtime_root`
### Component: c6_tile_cache
@@ -198,7 +199,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- `tlog_replay_adapter.py` (replay-only `FcAdapter`; gated `BUILD_TLOG_REPLAY_ADAPTER`; AZ-265)
- `replay_sink.py` (`ReplaySink` interface + `JsonlReplaySink` impl; gated `BUILD_REPLAY_SINK_JSONL`; AZ-265)
- **Owns**: `src/gps_denied_onboard/components/c8_fc_adapter/**`, `tests/unit/c8_fc_adapter/**`
- **Imports from**: `_types`, `helpers.wgs_converter`, `helpers.se3_utils`, `components.c5_state` (Public API: `EstimatorOutput`), `config`, `logging`, `fdr_client`, `clock` (for replay timer-injection)
- **Imports from**: `_types` (`EstimatorOutput` DTO lives here), `helpers.wgs_converter`, `helpers.se3_utils`, `config`, `logging`, `fdr_client`, `clock` (for replay timer-injection). NEVER `from gps_denied_onboard.components.c5_state import ...` inside `c8_fc_adapter/*.py` — the `EstimatorOutput` DTO is consumed exclusively via `_types`.
- **Consumed by**: `c1_vio` (back-channel: ImuSample, AttitudeWindow), `c5_state` (back-channel: ImuSample, FlightStateSignal, GpsHealth), `runtime_root` (live + operator + replay binaries)
> **Back-channel note**: C8 is the source of inbound IMU / attitude / GPS-health signals from the FC. C1 and C5 receive these via constructor-injected `FcAdapter` (typed against the interface, not the concrete adapter). This is NOT a layering violation — C8's role spans both the outbound emit path AND the inbound telemetry source.
@@ -217,7 +218,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- `default_provisioner.py` (engine compile + descriptors + manifest + content-hash gate, pending)
- Composition root: `runtime_root/c10_factory.py` (`build_engine_compiler`, `build_backbone_specs`)
- **Owns**: `src/gps_denied_onboard/components/c10_provisioning/**`, `tests/unit/c10_provisioning/**`
- **Imports from**: `_types`, `helpers.sha256_sidecar`, `helpers.engine_filename_schema`, `helpers.wgs_converter`, `components.c6_tile_cache` (Public API), `components.c7_inference` (Public API: engine compile surface), `config`, `logging`, `fdr_client`
- **Imports from**: `_types` (cross-component DTOs `EngineCacheEntry`, `BuildConfig`, `PrecisionMode`, `OptimizationProfile`, `HostCapabilities`, `TileMetadata`, etc.), `_types.inference_errors` (AZ-507 typed-error envelope for `EngineBuildError` + `CalibrationCacheError`), `helpers.sha256_sidecar`, `helpers.engine_filename_schema`, `helpers.wgs_converter`, `config`, `logging`, `fdr_client`. The `InferenceRuntime.compile_engine` surface (c7) and the `TileMetadataStore.query_by_bbox` surface (c6) are obtained via constructor-injected consumer-side structural Protocol cuts (the `CompileEngineCallable` cut already lives in `engine_compiler.py`; AZ-323 / AZ-324 will define analogous `query_by_bbox` cuts inside `c10_provisioning/`). NEVER `from gps_denied_onboard.components.c6_tile_cache import ...` or `from gps_denied_onboard.components.c7_inference import ...` inside `c10_provisioning/*.py`.
- **Consumed by**: `c12_operator_tooling`, `runtime_root` (operator binary only — excluded from airborne via `BUILD_C10_PROVISIONING=OFF` for airborne build per ADR-002)
### Component: c11_tile_manager
@@ -231,7 +232,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- `satellite_provider_downloader.py` (REST client against parent-suite `satellite-provider`)
- `satellite_provider_uploader.py` (post-landing batch upload, D-PROJ-2 ingest contract)
- **Owns**: `src/gps_denied_onboard/components/c11_tile_manager/**`, `tests/unit/c11_tile_manager/**`
- **Imports from**: `_types`, `helpers.sha256_sidecar`, `helpers.wgs_converter`, `components.c6_tile_cache` (Public API), `config`, `logging`, `fdr_client`
- **Imports from**: `_types`, `helpers.sha256_sidecar`, `helpers.wgs_converter`, `config`, `logging`, `fdr_client`. The c6 storage surface (`TileStore`, `TileMetadataStore`) is obtained via constructor-injected consumer-side structural Protocol cuts (see AZ-507 cross-component rule below); composition root wires the concrete c6 strategy in. NEVER `from gps_denied_onboard.components.c6_tile_cache import ...` inside `c11_tile_manager/*.py`.
- **Consumed by**: `c12_operator_tooling`, `runtime_root` (operator binary only — `BUILD_C11_TILE_MANAGER=OFF` for airborne)
### Component: c12_operator_tooling
@@ -246,7 +247,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec
- `operator_reloc_service.py` (CLI; GUI deferred per epic)
- `sector_classifier.py` (operator sets `SectorClassification` → C6)
- **Owns**: `src/gps_denied_onboard/components/c12_operator_tooling/**`, `tests/unit/c12_operator_tooling/**`
- **Imports from**: `_types`, `helpers.wgs_converter`, `components.c6_tile_cache` (Public API), `components.c10_provisioning` (Public API), `components.c11_tile_manager` (Public API), `config`, `logging`, `fdr_client`
- **Imports from**: `_types`, `helpers.wgs_converter`, `config`, `logging`, `fdr_client`. The c6 / c10 / c11 surfaces (`TileStore`, `TileMetadataStore`, `CacheProvisioner`, `TileDownloader`, `TileUploader`) are obtained via constructor-injected consumer-side structural Protocol cuts (see AZ-507 cross-component rule below); composition root wires the concrete c6/c10/c11 strategies in. NEVER `from gps_denied_onboard.components.c6_tile_cache import ...`, `from gps_denied_onboard.components.c10_provisioning import ...`, or `from gps_denied_onboard.components.c11_tile_manager import ...` inside `c12_operator_tooling/*.py`.
- **Consumed by**: `runtime_root` (operator binary only — `BUILD_C12_OPERATOR_TOOLING=OFF` for airborne)
### Component: c13_fdr
+3 -3
View File
@@ -6,9 +6,9 @@ step: 7
name: Implement
status: in_progress
sub_step:
phase: 4
name: assign-file-ownership
detail: "batch 34 selected (4 tasks, 13 pts): AZ-507 (F1 hygiene 2pt) + AZ-306 (C6 FAISS 5pt) + AZ-323 (C10 Manifest 3pt) + AZ-324 (C10 Verifier 3pt)"
phase: 11
name: awaiting-commit-approval
detail: "batch 34 code+tests done; pending commit + Jira In Testing; AZ-306 deferred to batch 35"
retry_count: 0
cycle: 1
tracker: jira
+7
View File
@@ -56,6 +56,13 @@ dependencies = [
# stable, sub-microsecond point-in-rect queries at the few-hundred-
# sector scale operators ship per flight (NFR p99 ≤ 100 µs).
"rtree>=1.0,<2.0",
# Ed25519 keypair generation + detached signing for AZ-323 C10
# ManifestBuilder + AZ-318 C11 per-flight signing key. Pinned here
# because AZ-323 is the first concrete consumer; AZ-318 inherits
# the pin when it lands. Major-version bound (<46) follows the
# standard "two majors of compatibility" pattern other deps in
# this file use.
"cryptography>=43.0,<46.0",
]
[project.optional-dependencies]
@@ -0,0 +1,30 @@
"""Layer-0 typed-error envelope for the C7 inference runtime family (AZ-507).
This shim lets cross-component consumers (notably the C10
``engine_compiler`` cache-orchestration path) catch the documented
:class:`EngineBuildError` / :class:`CalibrationCacheError` types without
importing from ``gps_denied_onboard.components.c7_inference`` directly —
which the AZ-270 ``test_ac6_only_compose_root_imports_concrete_strategies``
lint forbids for every module under ``components/**/*.py``.
The canonical definitions stay in
:mod:`gps_denied_onboard.components.c7_inference.errors`; we re-export
them here as identity-preserving aliases. Consumers catch the typed
union and unknown exceptions still propagate unchanged.
Import-only: this module does not register, instantiate, or run any
component-load side effects (no
:func:`register_component_block` call, no module-load logging).
"""
from __future__ import annotations
from gps_denied_onboard.components.c7_inference.errors import (
CalibrationCacheError,
EngineBuildError,
)
__all__ = [
"CalibrationCacheError",
"EngineBuildError",
]
@@ -13,35 +13,79 @@ from gps_denied_onboard._types.inference import EngineCacheEntry
from gps_denied_onboard._types.manifests import Manifest
from gps_denied_onboard.components.c10_provisioning.config import (
BackboneConfig,
C10ManifestConfig,
C10ProvisioningConfig,
SigningMode,
)
from gps_denied_onboard.components.c10_provisioning.engine_compiler import (
BackboneSpec,
CompileEngineCallable,
CompileOutcome,
EngineCompiler,
EngineCompileRequest,
EngineCompileResult,
EngineCompileSummary,
EngineCompiler,
)
from gps_denied_onboard.components.c10_provisioning.errors import (
C10ProvisioningError,
ManifestWriteError,
)
from gps_denied_onboard.components.c10_provisioning.interface import (
CacheProvisioner,
ManifestSigner,
SigningKeyHandle,
)
from gps_denied_onboard.components.c10_provisioning.manifest_builder import (
VALID_SECTOR_CLASSES,
Ed25519ManifestSigner,
ManifestArtifact,
ManifestBuilder,
ManifestBuildInput,
TileHashRecord,
TilesByBboxQuery,
)
from gps_denied_onboard.components.c10_provisioning.manifest_verifier import (
ArtifactCheck,
ManifestVerifier,
ManifestVerifierImpl,
VerificationResult,
VerifyFailReason,
VerifyOutcome,
)
from gps_denied_onboard.config.schema import register_component_block
register_component_block("c10_provisioning", C10ProvisioningConfig)
__all__ = [
"VALID_SECTOR_CLASSES",
"ArtifactCheck",
"BackboneConfig",
"BackboneSpec",
"C10ManifestConfig",
"C10ProvisioningConfig",
"C10ProvisioningError",
"CacheProvisioner",
"CompileEngineCallable",
"CompileOutcome",
"Ed25519ManifestSigner",
"EngineCacheEntry",
"EngineCompileRequest",
"EngineCompileResult",
"EngineCompileSummary",
"EngineCompiler",
"Manifest",
"ManifestArtifact",
"ManifestBuildInput",
"ManifestBuilder",
"ManifestSigner",
"ManifestVerifier",
"ManifestVerifierImpl",
"ManifestWriteError",
"SigningKeyHandle",
"SigningMode",
"TileHashRecord",
"TilesByBboxQuery",
"VerificationResult",
"VerifyFailReason",
"VerifyOutcome",
]
@@ -1,4 +1,4 @@
"""C10 cache-provisioning config block (AZ-321).
"""C10 cache-provisioning config block (AZ-321, extended by AZ-323).
Registered into ``config.components['c10_provisioning']`` by the
package ``__init__.py``. The composition-root factory
@@ -7,6 +7,10 @@ reads this block to enumerate the project's backbones and to bound
the workspace memory passed to
:meth:`InferenceRuntime.compile_engine`.
AZ-323 extends the block with a nested :class:`C10ManifestConfig`
that drives the operator-mode signing-key allowlist gate
(C10-ST-01) and pins the Manifest schema version.
Backbone enumeration is config-driven (not hardcoded) so a new model
is a YAML change rather than a code change — see the AZ-321 task
spec §Constraints.
@@ -15,16 +19,89 @@ spec §Constraints.
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from gps_denied_onboard.config.schema import ConfigError
__all__ = [
"BackboneConfig",
"C10ManifestConfig",
"C10ProvisioningConfig",
"SigningMode",
]
_DEFAULT_WORKSPACE_MB: int = 4096
_DEFAULT_MANIFEST_SCHEMA_VERSION: str = "1.1"
class SigningMode(str, Enum):
"""C10 Manifest signing-mode (AZ-323, C10-ST-01).
``OPERATOR``: production — the key fingerprint MUST be in
:attr:`C10ManifestConfig.allowed_operator_fingerprints` or the
build fails closed with :class:`ManifestWriteError`.
``DEV``: development / research — any key is accepted; an
operator-allowlisted key used in this mode emits a WARN log.
"""
OPERATOR = "operator"
DEV = "dev"
@dataclass(frozen=True)
class C10ManifestConfig:
"""Sub-block driving :class:`ManifestBuilder` policy (AZ-323).
``signing_mode`` controls the operator-key allowlist gate
(C10-ST-01). ``allowed_operator_fingerprints`` is the SHA-256 hex
of the raw 32-byte Ed25519 public key — 64 lowercase hex chars per
entry. ``schema_version`` is written into the Manifest body so the
AZ-324 verifier knows which optional blocks (e.g. ``flight``,
added in ADR-010 / v1.1) to expect.
"""
signing_mode: SigningMode = SigningMode.DEV
allowed_operator_fingerprints: tuple[str, ...] = ()
schema_version: str = _DEFAULT_MANIFEST_SCHEMA_VERSION
def __post_init__(self) -> None:
if not self.schema_version:
raise ConfigError(
"C10ManifestConfig.schema_version must be a non-empty string"
)
seen: set[str] = set()
for fp in self.allowed_operator_fingerprints:
if not isinstance(fp, str):
raise ConfigError(
"C10ManifestConfig.allowed_operator_fingerprints entries "
f"must be strings; got {type(fp).__name__}"
)
if len(fp) != 64:
raise ConfigError(
"C10ManifestConfig.allowed_operator_fingerprints entries "
f"must be 64-hex-char SHA-256 digests; got {len(fp)} chars "
f"for {fp!r}"
)
if fp.lower() != fp:
raise ConfigError(
"C10ManifestConfig.allowed_operator_fingerprints entries "
f"must be lowercase hex; got {fp!r}"
)
try:
int(fp, 16)
except ValueError as exc:
raise ConfigError(
"C10ManifestConfig.allowed_operator_fingerprints contains "
f"non-hex entry {fp!r}"
) from exc
if fp in seen:
raise ConfigError(
"C10ManifestConfig.allowed_operator_fingerprints contains "
f"duplicate fingerprint {fp!r}"
)
seen.add(fp)
@dataclass(frozen=True)
@@ -88,10 +165,16 @@ class C10ProvisioningConfig:
into :class:`BuildConfig`; defaults to 4 GiB which matches the
C7 NFT-LIM-01 GPU memory budget. Operators can dial it down for
Tier-2 compile workstations with less GPU memory.
``manifest`` carries the AZ-323 Manifest-builder policy
(signing mode, allowed operator fingerprints, schema version).
Defaulted to dev-mode with no allowlist so unit tests + replay
runs that don't build Manifests stay no-op.
"""
backbones: tuple[BackboneConfig, ...] = field(default_factory=tuple)
workspace_mb: int = _DEFAULT_WORKSPACE_MB
manifest: C10ManifestConfig = field(default_factory=C10ManifestConfig)
def __post_init__(self) -> None:
if self.workspace_mb <= 0:
@@ -52,6 +52,10 @@ from gps_denied_onboard._types.inference import (
OptimizationProfile,
PrecisionMode,
)
from gps_denied_onboard._types.inference_errors import (
CalibrationCacheError,
EngineBuildError,
)
from gps_denied_onboard._types.manifests import HostCapabilities
from gps_denied_onboard.helpers.engine_filename_schema import (
EngineFilenameSchema,
@@ -275,14 +279,14 @@ class EngineCompiler:
entry = self._runtime.compile_engine(
backbone.onnx_path, build_config
)
except Exception as exc:
# The C7 InferenceRuntime contract scopes exceptions to its
# `RuntimeError` family (`EngineBuildError`,
# `CalibrationCacheError`, ...). The c10 layer is forbidden
# from importing the c7 errors module (architecture rule
# AC-6 / test_az270_compose_root.test_ac6); we catch the
# broader `Exception` and dispatch by class name in the log
# payload. Re-raising preserves the original type.
except (EngineBuildError, CalibrationCacheError) as exc:
# AZ-507 narrowed the catch to the documented C7 typed-error
# envelope (`_types/inference_errors.py` re-exports
# `EngineBuildError` + `CalibrationCacheError` from
# `c7_inference.errors` without violating the AZ-270 lint).
# Unknown exceptions intentionally propagate unhandled — they
# are programmer errors, not C7 contract failures, and must
# not be swallowed under a structured "compile.error" log.
self._log.error(
"c10.engine.compile.error",
extra={
@@ -0,0 +1,38 @@
"""C10 cache-provisioning error family.
Rooted at :class:`C10ProvisioningError`; today the family contains
:class:`ManifestWriteError` (AZ-323) covering signing-key load failure,
fingerprint-allowlist rejection, and any I/O failure path during
``ManifestBuilder.build_manifest``. AZ-324 / AZ-325 add additional
subtypes (``ManifestVerifierError``, ``ManifestCoverageError``,
``ContentHashMismatchError``) under the same root as they land.
"""
from __future__ import annotations
__all__ = [
"C10ProvisioningError",
"ManifestWriteError",
]
class C10ProvisioningError(Exception):
"""Base class for the C10 cache-provisioning error family."""
class ManifestWriteError(C10ProvisioningError):
"""``ManifestBuilder.build_manifest`` could not produce a signed Manifest.
Surfaces three failure modes:
1. Operator-mode signing key fingerprint not in the configured
allowlist (C10-ST-01).
2. Signing key file unreadable or malformed PEM (the underlying
``cryptography`` exception is chained via ``__cause__``).
3. Any underlying atomic-write / sidecar failure during Manifest
or signature emission.
Callers catch this single envelope; the structured `kind=
"c10.manifest.build.error"` log payload (set by ``ManifestBuilder``)
carries the discriminator field.
"""
@@ -1,4 +1,8 @@
"""C10 `CacheProvisioner` Protocol.
"""C10 Public-API Protocols.
- :class:`CacheProvisioner` (AZ-325, pending) pre-flight orchestrator.
- :class:`ManifestSigner` (AZ-323) Ed25519 detached signing surface
consumed by :class:`ManifestBuilder`.
Concrete impl: engine compile + descriptors + manifest + content-hash gate. See
`_docs/02_document/components/11_c10_provisioning/`.
@@ -7,12 +11,58 @@ Concrete impl: engine compile + descriptors + manifest + content-hash gate. See
from __future__ import annotations
from pathlib import Path
from typing import Protocol
from typing import Protocol, runtime_checkable
from gps_denied_onboard._types.manifests import Manifest
__all__ = [
"CacheProvisioner",
"ManifestSigner",
"SigningKeyHandle",
]
class CacheProvisioner(Protocol):
"""Pre-flight cache provisioning (engine compile + descriptor batch + manifest)."""
def provision(self, flight_id: str, output_root: Path) -> Manifest: ...
class SigningKeyHandle(Protocol):
"""Opaque handle returned by :meth:`ManifestSigner.load_signing_key`.
The Protocol intentionally exposes no methods concrete signers
(e.g. :class:`Ed25519ManifestSigner`) hold the actual key behind
this marker so the caller can pass it back into :meth:`sign` /
:meth:`public_key_fingerprint` without ever touching the secret
material.
"""
@runtime_checkable
class ManifestSigner(Protocol):
"""Detached-signature provider for :class:`ManifestBuilder` (AZ-323).
Default impl is :class:`Ed25519ManifestSigner` using
``cryptography.hazmat.primitives.asymmetric.ed25519``; tests
inject a deterministic in-memory keypair.
Contract:
- :meth:`load_signing_key` takes a path to an operator-supplied
PEM-encoded PKCS8 Ed25519 private key, returns an opaque
:class:`SigningKeyHandle`. Format errors raise
:class:`gps_denied_onboard.components.c10_provisioning.errors.ManifestWriteError`
with the underlying ``cryptography`` exception chained via
``__cause__``.
- :meth:`sign` produces a 64-byte raw Ed25519 signature over the
payload bytes. Re-entry-safe; a single handle may be used to
sign many payloads.
- :meth:`public_key_fingerprint` returns the SHA-256 hex digest of
the raw 32-byte public key (operator-mode allowlist key).
"""
def load_signing_key(self, key_path: Path) -> SigningKeyHandle: ...
def sign(self, key: SigningKeyHandle, payload_bytes: bytes) -> bytes: ...
def public_key_fingerprint(self, key: SigningKeyHandle) -> str: ...
@@ -0,0 +1,675 @@
"""C10 ManifestBuilder + Ed25519ManifestSigner (AZ-323).
Produces the signed cache Manifest covering every shipped artifact
plus the build-identity tuple whose canonical hash (``manifest_hash``)
is the D-C10-1 idempotence key. Implements the AC-NEW-1 trust chain
(takeoff arming refuses to deserialize engines before a Manifest
verify succeeds AZ-324 owns the verify; this task owns the build).
Cross-component DTOs (``LatLonAlt``, ``BoundingBox``) come from
``_types/geo.py``; engine entries from ``_types/inference.py``;
the ``Manifest`` placeholder DTO from ``_types/manifests.py``. No
direct ``components.X`` imports the AZ-507 lint forbids it.
"""
from __future__ import annotations
import hashlib
import logging
import time
from collections.abc import Iterable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Protocol, runtime_checkable
from uuid import UUID
import orjson
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard._types.inference import EngineCacheEntry
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c10_provisioning.config import (
C10ManifestConfig,
SigningMode,
)
from gps_denied_onboard.components.c10_provisioning.errors import (
ManifestWriteError,
)
from gps_denied_onboard.components.c10_provisioning.interface import (
ManifestSigner,
SigningKeyHandle,
)
from gps_denied_onboard.helpers.sha256_sidecar import (
Sha256Sidecar,
Sha256SidecarError,
)
__all__ = [
"VALID_SECTOR_CLASSES",
"Ed25519ManifestSigner",
"ManifestArtifact",
"ManifestBuildInput",
"ManifestBuilder",
"TileHashRecord",
"TilesByBboxQuery",
]
_BUILD_LOG_KIND_PREFIX = "c10.manifest"
_TAKEOFF_ORIGIN_DECIMALS = 9
_MANIFEST_FILENAME = "Manifest.json"
_SIGNATURE_FILENAME = "Manifest.json.sig"
_ED25519_PUBKEY_BYTES = 32
_ED25519_SIG_BYTES = 64
VALID_SECTOR_CLASSES: frozenset[str] = frozenset(
{"active_conflict", "stable_rear"}
)
@dataclass(frozen=True)
class TileHashRecord:
"""Consumer-side DTO carrying the four sort keys + the per-tile digest.
AZ-323 only needs ``(zoom, lat, lon, source)`` for canonical
ordering and ``sha256_hex`` for the aggregate hash. The
composition-root adapter wraps C6's ``TileMetadata`` rows into
this shape so the AZ-270 lint stays green (no
``components.c6_tile_cache`` import from C10).
"""
zoom: int
lat: float
lon: float
source: str
sha256_hex: str
@runtime_checkable
class TilesByBboxQuery(Protocol):
"""Consumer-side structural cut over C6's ``TileMetadataStore``.
The composition root adapts
:class:`gps_denied_onboard.components.c6_tile_cache.TileMetadataStore`
by translating its ``query_by_bbox`` return value into a tuple of
:class:`TileHashRecord`. C10 depends on THIS Protocol so
``components/c10_provisioning/*`` never imports ``components.c6_*``
(AZ-270 + AZ-507 boundary).
"""
def query_by_bbox(
self,
*,
bbox: BoundingBox,
zoom_levels: tuple[int, ...],
sector_class: str,
) -> Iterable[TileHashRecord]: ...
@dataclass(frozen=True)
class ManifestBuildInput:
"""Frozen call argument for :meth:`ManifestBuilder.build_manifest`.
Per the AZ-323 spec ``sector_class`` is the c6 enum's ``.value``
string ('active_conflict' / 'stable_rear'); the composition root
translates the C6 enum to its string form before injecting so
C10 stays free of the C6 import.
``takeoff_origin`` + ``flight_id`` are the ADR-010 / AZ-489
pass-through fields when supplied they are both baked into the
Manifest body AND fed into the ``manifest_hash`` so a re-planned
flight produces a fresh cache identity (AC-15 / AC-16).
"""
cache_root: Path
bbox: BoundingBox
zoom_levels: tuple[int, ...]
sector_class: str
engine_entries: tuple[EngineCacheEntry, ...]
descriptor_index_path: Path
calibration_path: Path
key_path: Path
takeoff_origin: LatLonAlt | None = None
flight_id: UUID | None = None
@dataclass(frozen=True)
class ManifestArtifact:
"""Return value of :meth:`ManifestBuilder.build_manifest`.
``manifest_hash`` is the D-C10-1 idempotence key derived from
the build identity tuple, NOT from the Manifest file bytes (which
include ``built_at`` and so differ across runs). The Manifest
file's own SHA-256 lives on disk as ``Manifest.json.sha256`` per
AC-11.
"""
manifest_path: Path
signature_path: Path
manifest_hash: str
signing_public_key_fingerprint: str
total_artifacts_listed: int
@dataclass(frozen=True)
class _Ed25519SigningKey(SigningKeyHandle):
"""Opaque handle wrapping a ``cryptography`` Ed25519 private key.
Frozen so callers cannot mutate the key in flight; the underlying
``Ed25519PrivateKey`` object stays in the dataclass field but is
not exposed by name on :class:`SigningKeyHandle`.
"""
private_key: Ed25519PrivateKey = field(repr=False)
public_key_raw: bytes
fingerprint_hex: str
class Ed25519ManifestSigner:
"""Default :class:`ManifestSigner` impl backed by ``cryptography``.
Loads PEM-encoded PKCS8 Ed25519 private keys per AZ-323 Risk 4
other formats (OpenSSH, raw 32-byte) raise
:class:`ManifestWriteError` with the underlying ``cryptography``
exception chained via ``__cause__`` (AC-9).
"""
def load_signing_key(self, key_path: Path) -> SigningKeyHandle:
try:
pem_bytes = key_path.read_bytes()
except (OSError, FileNotFoundError) as exc:
raise ManifestWriteError(
f"operator signing key load failed: cannot read {key_path}: {exc}"
) from exc
try:
private_key = load_pem_private_key(pem_bytes, password=None)
except (ValueError, TypeError, UnsupportedAlgorithm) as exc:
raise ManifestWriteError(
f"operator signing key load failed: malformed PEM at {key_path}: {exc}"
) from exc
if not isinstance(private_key, Ed25519PrivateKey):
raise ManifestWriteError(
"operator signing key load failed: not an Ed25519 private key "
f"(got {type(private_key).__name__}); AZ-323 supports Ed25519 only"
)
public_key = private_key.public_key()
public_raw = _ed25519_public_raw(public_key)
return _Ed25519SigningKey(
private_key=private_key,
public_key_raw=public_raw,
fingerprint_hex=hashlib.sha256(public_raw).hexdigest(),
)
def sign(self, key: SigningKeyHandle, payload_bytes: bytes) -> bytes:
handle = _require_ed25519_handle(key)
signature = handle.private_key.sign(payload_bytes)
# Defensive: Ed25519 signatures are always 64 bytes — fail fast on
# a library upgrade that changes the contract.
if len(signature) != _ED25519_SIG_BYTES:
raise ManifestWriteError(
f"Ed25519 signer produced {len(signature)} bytes; expected "
f"{_ED25519_SIG_BYTES}"
)
return signature
def public_key_fingerprint(self, key: SigningKeyHandle) -> str:
return _require_ed25519_handle(key).fingerprint_hex
def _ed25519_public_raw(public_key: Ed25519PublicKey) -> bytes:
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
)
raw = public_key.public_bytes(
encoding=Encoding.Raw,
format=PublicFormat.Raw,
)
if len(raw) != _ED25519_PUBKEY_BYTES:
raise ManifestWriteError(
f"Ed25519 public key has unexpected length: {len(raw)} != "
f"{_ED25519_PUBKEY_BYTES}"
)
return raw
def _require_ed25519_handle(key: SigningKeyHandle) -> _Ed25519SigningKey:
if not isinstance(key, _Ed25519SigningKey):
raise ManifestWriteError(
"Ed25519ManifestSigner received a foreign SigningKeyHandle "
f"({type(key).__name__}); only handles produced by load_signing_key "
"are accepted"
)
return key
class ManifestBuilder:
"""Build a signed cache Manifest at ``cache_root/Manifest.json``.
Atomic-write contract: Manifest body + ``.sha256`` sidecar +
``.sig`` are all written via :class:`Sha256Sidecar.write_atomic*`,
so a kill mid-build leaves either the previous-good triple or
the new triple never a partial Manifest (AC-10).
"""
def __init__(
self,
*,
sidecar: Sha256Sidecar,
signer: ManifestSigner,
tile_metadata_store: TilesByBboxQuery,
logger: logging.Logger,
clock: Clock,
config: C10ManifestConfig,
) -> None:
self._sidecar = sidecar
self._signer = signer
self._tiles = tile_metadata_store
self._log = logger
self._clock = clock
self._config = config
def build_manifest(self, request: ManifestBuildInput) -> ManifestArtifact:
self._validate_request(request)
key = self._load_and_gate_key(request.key_path)
fingerprint = self._signer.public_key_fingerprint(key)
self._gate_operator_mode(fingerprint)
calibration_sha256 = self._sha256_file(request.calibration_path)
descriptor_index_sha256 = self._read_descriptor_index_sidecar(
request.descriptor_index_path
)
sorted_tiles = self._fetch_sorted_tiles(
bbox=request.bbox,
zoom_levels=request.zoom_levels,
sector_class=request.sector_class,
)
tiles_coverage_sha256 = _aggregate_tile_hash(sorted_tiles)
engine_artifacts = tuple(
{
"path": str(entry.engine_path),
"sha256": entry.sha256_hex,
}
for entry in request.engine_entries
)
manifest_hash = _compute_manifest_hash(
engine_entries=request.engine_entries,
calibration_sha256=calibration_sha256,
descriptor_index_sha256=descriptor_index_sha256,
tiles_coverage_sha256=tiles_coverage_sha256,
sector_class=request.sector_class,
bbox=request.bbox,
zoom_levels=request.zoom_levels,
takeoff_origin=request.takeoff_origin,
flight_id=request.flight_id,
)
built_at_iso = _ns_to_iso_utc(self._clock.time_ns())
manifest_body = self._assemble_manifest_dict(
schema_version=self._config.schema_version,
bbox=request.bbox,
zoom_levels=request.zoom_levels,
sector_class=request.sector_class,
built_at_iso=built_at_iso,
manifest_hash=manifest_hash,
flight_id=request.flight_id,
takeoff_origin=request.takeoff_origin,
engine_artifacts=engine_artifacts,
descriptor_index_path=request.descriptor_index_path,
descriptor_index_sha256=descriptor_index_sha256,
calibration_path=request.calibration_path,
calibration_sha256=calibration_sha256,
tiles_coverage_sha256=tiles_coverage_sha256,
tiles_count=len(sorted_tiles),
fingerprint=fingerprint,
)
canonical_bytes = _canonical_json_with_trailing_newline(manifest_body)
manifest_path = request.cache_root / _MANIFEST_FILENAME
signature_path = request.cache_root / _SIGNATURE_FILENAME
request.cache_root.mkdir(parents=True, exist_ok=True)
self._atomic_write_manifest(manifest_path, canonical_bytes)
signature_bytes = self._signer.sign(key, canonical_bytes)
self._atomic_write_signature(signature_path, signature_bytes)
total_artifacts = len(engine_artifacts) + 3 # descriptor_index + calibration + tiles_coverage
self._log.info(
f"{_BUILD_LOG_KIND_PREFIX}.build.success",
extra={
"kind": f"{_BUILD_LOG_KIND_PREFIX}.build.success",
"kv": {
"manifest_hash": manifest_hash,
"total_artifacts_listed": total_artifacts,
"signing_public_key_fingerprint": fingerprint,
"tiles_count": len(sorted_tiles),
"schema_version": self._config.schema_version,
},
},
)
return ManifestArtifact(
manifest_path=manifest_path,
signature_path=signature_path,
manifest_hash=manifest_hash,
signing_public_key_fingerprint=fingerprint,
total_artifacts_listed=total_artifacts,
)
def _validate_request(self, request: ManifestBuildInput) -> None:
if request.sector_class not in VALID_SECTOR_CLASSES:
raise ManifestWriteError(
f"sector_class={request.sector_class!r} not in "
f"{sorted(VALID_SECTOR_CLASSES)}"
)
if not request.zoom_levels:
raise ManifestWriteError(
"zoom_levels must be a non-empty tuple of ints"
)
if request.takeoff_origin is not None:
origin = request.takeoff_origin
if not (-90.0 <= origin.lat_deg <= 90.0):
raise ManifestWriteError(
f"takeoff_origin.lat_deg={origin.lat_deg} out of [-90, 90]"
)
if not (-180.0 <= origin.lon_deg <= 180.0):
raise ManifestWriteError(
f"takeoff_origin.lon_deg={origin.lon_deg} out of [-180, 180]"
)
def _load_and_gate_key(self, key_path: Path) -> SigningKeyHandle:
try:
return self._signer.load_signing_key(key_path)
except ManifestWriteError:
# Already logged at the call site below; the signer raises with
# an actionable diagnostic. We must still emit the ERROR record
# so operators see a single structured "build.error" entry.
self._log.error(
f"{_BUILD_LOG_KIND_PREFIX}.build.error",
extra={
"kind": f"{_BUILD_LOG_KIND_PREFIX}.build.error",
"kv": {
"phase": "load_signing_key",
"key_path": str(key_path),
},
},
)
raise
def _gate_operator_mode(self, fingerprint: str) -> None:
allowlist = self._config.allowed_operator_fingerprints
if self._config.signing_mode is SigningMode.OPERATOR:
if fingerprint not in allowlist:
self._log.error(
f"{_BUILD_LOG_KIND_PREFIX}.build.error",
extra={
"kind": f"{_BUILD_LOG_KIND_PREFIX}.build.error",
"kv": {
"phase": "operator_mode_gate",
"offered_fingerprint": fingerprint,
"allowed_fingerprints": list(allowlist),
},
},
)
raise ManifestWriteError(
"signing key fingerprint not in allowed_operator_fingerprints: "
f"offered={fingerprint!r}, allowed={sorted(allowlist)!r}"
)
elif self._config.signing_mode is SigningMode.DEV:
if fingerprint in allowlist:
self._log.warning(
f"{_BUILD_LOG_KIND_PREFIX}.dev_mode_with_operator_key",
extra={
"kind": f"{_BUILD_LOG_KIND_PREFIX}.dev_mode_with_operator_key",
"kv": {
"offered_fingerprint": fingerprint,
},
},
)
def _sha256_file(self, path: Path) -> str:
try:
return hashlib.sha256(path.read_bytes()).hexdigest()
except (OSError, FileNotFoundError) as exc:
raise ManifestWriteError(
f"manifest build: cannot hash artifact at {path}: {exc}"
) from exc
def _read_descriptor_index_sidecar(self, descriptor_index_path: Path) -> str:
sidecar_path = Path(str(descriptor_index_path) + ".sha256")
try:
text = sidecar_path.read_text(encoding="ascii").strip()
except (OSError, FileNotFoundError) as exc:
raise ManifestWriteError(
"manifest build: descriptor_index sidecar missing at "
f"{sidecar_path}: {exc}"
) from exc
if len(text) != 64:
raise ManifestWriteError(
"manifest build: descriptor_index sidecar at "
f"{sidecar_path} is not 64 hex chars (got {len(text)})"
)
try:
int(text, 16)
except ValueError as exc:
raise ManifestWriteError(
"manifest build: descriptor_index sidecar at "
f"{sidecar_path} is not hex: {exc}"
) from exc
if text.lower() != text:
raise ManifestWriteError(
"manifest build: descriptor_index sidecar at "
f"{sidecar_path} must be lowercase hex"
)
return text
def _fetch_sorted_tiles(
self,
*,
bbox: BoundingBox,
zoom_levels: tuple[int, ...],
sector_class: str,
) -> tuple[TileHashRecord, ...]:
raw = tuple(
self._tiles.query_by_bbox(
bbox=bbox,
zoom_levels=zoom_levels,
sector_class=sector_class,
)
)
return tuple(
sorted(raw, key=lambda r: (r.zoom, r.lat, r.lon, r.source))
)
def _assemble_manifest_dict(
self,
*,
schema_version: str,
bbox: BoundingBox,
zoom_levels: tuple[int, ...],
sector_class: str,
built_at_iso: str,
manifest_hash: str,
flight_id: UUID | None,
takeoff_origin: LatLonAlt | None,
engine_artifacts: tuple[dict[str, str], ...],
descriptor_index_path: Path,
descriptor_index_sha256: str,
calibration_path: Path,
calibration_sha256: str,
tiles_coverage_sha256: str,
tiles_count: int,
fingerprint: str,
) -> dict[str, object]:
flight_block: dict[str, object] = {
"flight_id": str(flight_id) if flight_id is not None else None,
}
if takeoff_origin is not None:
flight_block["takeoff_origin"] = {
"lat_deg": takeoff_origin.lat_deg,
"lon_deg": takeoff_origin.lon_deg,
"alt_m": takeoff_origin.alt_m,
}
return {
"schema_version": schema_version,
"build": {
"bbox": {
"min_lat_deg": bbox.min_lat_deg,
"min_lon_deg": bbox.min_lon_deg,
"max_lat_deg": bbox.max_lat_deg,
"max_lon_deg": bbox.max_lon_deg,
},
"zoom_levels": list(zoom_levels),
"sector_class": sector_class,
"built_at": built_at_iso,
"manifest_hash": manifest_hash,
},
"flight": flight_block,
"artifacts": {
"engines": [dict(e) for e in engine_artifacts],
"descriptor_index": {
"path": str(descriptor_index_path),
"sha256": descriptor_index_sha256,
},
"calibration": {
"path": str(calibration_path),
"sha256": calibration_sha256,
},
"tiles_coverage": {
"sha256": tiles_coverage_sha256,
"tile_count": tiles_count,
},
},
"signing_public_key_fingerprint": fingerprint,
}
def _atomic_write_manifest(self, path: Path, payload: bytes) -> None:
try:
self._sidecar.write_atomic_and_sidecar(path, payload)
except Sha256SidecarError as exc:
self._log.error(
f"{_BUILD_LOG_KIND_PREFIX}.build.error",
extra={
"kind": f"{_BUILD_LOG_KIND_PREFIX}.build.error",
"kv": {"phase": "write_manifest", "path": str(path)},
},
)
raise ManifestWriteError(
f"manifest build: atomic write failed at {path}: {exc}"
) from exc
def _atomic_write_signature(self, path: Path, payload: bytes) -> None:
try:
self._sidecar.write_atomic(path, payload)
except Sha256SidecarError as exc:
self._log.error(
f"{_BUILD_LOG_KIND_PREFIX}.build.error",
extra={
"kind": f"{_BUILD_LOG_KIND_PREFIX}.build.error",
"kv": {"phase": "write_signature", "path": str(path)},
},
)
raise ManifestWriteError(
f"manifest build: atomic write failed at {path}: {exc}"
) from exc
def _aggregate_tile_hash(records: tuple[TileHashRecord, ...]) -> str:
hasher = hashlib.sha256()
for r in records:
hasher.update(
(
f"z{r.zoom}|lat{r.lat:.9f}|lon{r.lon:.9f}|src{r.source}"
f":{r.sha256_hex}\n"
).encode("ascii")
)
return hasher.hexdigest()
def _canonical_json_with_trailing_newline(payload: dict[str, object]) -> bytes:
body = orjson.dumps(
payload,
option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2,
)
if not body.endswith(b"\n"):
body += b"\n"
return body
def _compute_manifest_hash(
*,
engine_entries: tuple[EngineCacheEntry, ...],
calibration_sha256: str,
descriptor_index_sha256: str,
tiles_coverage_sha256: str,
sector_class: str,
bbox: BoundingBox,
zoom_levels: tuple[int, ...],
takeoff_origin: LatLonAlt | None,
flight_id: UUID | None,
) -> str:
# Engine identity is `(model_name, precision, sm, jetpack, trt, sha256)`
# so a stale-host fp16 build never collides with a fresh int8 build —
# this matches the AZ-281 filename schema fields modulo the precision
# axis (which fp16 vs int8 makes load-bearing).
model_ids = sorted(
(
str(entry.engine_path),
entry.sha256_hex,
)
for entry in engine_entries
)
origin_tuple: tuple[float, float, float] | None
if takeoff_origin is not None:
origin_tuple = (
round(takeoff_origin.lat_deg, _TAKEOFF_ORIGIN_DECIMALS),
round(takeoff_origin.lon_deg, _TAKEOFF_ORIGIN_DECIMALS),
round(takeoff_origin.alt_m, _TAKEOFF_ORIGIN_DECIMALS),
)
else:
origin_tuple = None
build_identity = {
"model_ids": [list(entry) for entry in model_ids],
"calibration_sha256": calibration_sha256,
"descriptor_index_sha256": descriptor_index_sha256,
"tiles_coverage_sha256": tiles_coverage_sha256,
"sector_class": sector_class,
"bbox": [
bbox.min_lat_deg,
bbox.min_lon_deg,
bbox.max_lat_deg,
bbox.max_lon_deg,
],
"zoom_levels": sorted(zoom_levels),
"takeoff_origin": list(origin_tuple) if origin_tuple is not None else None,
"flight_id": str(flight_id) if flight_id is not None else None,
}
canonical = orjson.dumps(build_identity, option=orjson.OPT_SORT_KEYS)
return hashlib.sha256(canonical).hexdigest()
def _ns_to_iso_utc(time_ns: int) -> str:
"""Format ns-since-epoch as RFC 3339 UTC with second precision.
Second precision suffices for ``built_at`` operators inspect the
Manifest at hour / minute granularity, and the build-identity
hash deliberately excludes ``built_at`` so the AC-2 byte-for-byte
determinism check works only by redacting this exact field.
"""
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(time_ns / 1_000_000_000))
@@ -0,0 +1,748 @@
"""C10 ManifestVerifier — takeoff content-hash gate (AZ-324).
Read-only validator for the AZ-323-produced cache Manifest. Fail-
closed: any deviation in signature, schema, key trust, hashes, or
the optional ADR-010 takeoff-origin yields ``outcome=FAIL`` with the
union of all ``VerifyFailReason`` values that fired. Never raises on
a verify failure callers branch on ``outcome`` (per the contract at
``_docs/02_document/contracts/c10_provisioning/manifest_verifier.md``).
The Protocol + DTOs live alongside the implementation here; the
public re-export surface lives in ``c10_provisioning/__init__.py``.
Cross-component consumers (C5 takeoff arming, C12 operator tooling)
will import via a future ``_types/manifest_verify.py`` shim if and
when they wire up the AZ-270 lint forbids direct
``components.c10_provisioning`` imports from other components.
"""
from __future__ import annotations
import hashlib
import logging
import math
from dataclasses import dataclass
from enum import Enum
from pathlib import Path, PurePosixPath
from typing import Any, Protocol, runtime_checkable
from uuid import UUID
import orjson
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c10_provisioning.manifest_builder import (
TilesByBboxQuery,
_aggregate_tile_hash,
)
from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar
__all__ = [
"ArtifactCheck",
"ManifestVerifier",
"ManifestVerifierImpl",
"VerificationResult",
"VerifyFailReason",
"VerifyOutcome",
]
_VERIFY_LOG_KIND_PREFIX = "c10.manifest.verify"
_ED25519_SIG_BYTES = 64
_HASH_CHUNK_BYTES = 64 * 1024
_MANIFEST_FILENAME = "Manifest.json"
_SIDECAR_FILENAME = "Manifest.json.sha256"
_SIGNATURE_FILENAME = "Manifest.json.sig"
class VerifyOutcome(str, Enum):
"""Top-level pass/fail outcome of :meth:`ManifestVerifier.verify_manifest`."""
PASS = "pass"
FAIL = "fail"
class VerifyFailReason(str, Enum):
"""Enumerated reasons a verify failed; multiple may fire per call."""
MANIFEST_NOT_FOUND = "manifest_not_found"
SIGNATURE_NOT_FOUND = "signature_not_found"
SIGNATURE_INVALID = "signature_invalid"
UNTRUSTED_PUBLIC_KEY = "untrusted_public_key"
SCHEMA_VIOLATION = "schema_violation"
ARTIFACT_MISSING = "artifact_missing"
ARTIFACT_HASH_MISMATCH = "artifact_hash_mismatch"
TILES_COVERAGE_MISMATCH = "tiles_coverage_mismatch"
MANIFEST_SELF_HASH_MISMATCH = "manifest_self_hash_mismatch"
TAKEOFF_ORIGIN_INVALID = "takeoff_origin_invalid"
TAKEOFF_ORIGIN_OUT_OF_BBOX = "takeoff_origin_out_of_bbox"
@dataclass(frozen=True)
class ArtifactCheck:
"""One Manifest artifact entry's verify outcome."""
relative_path: str
expected_sha256: str
actual_sha256: str | None # None when the file is missing on disk
matched: bool
@dataclass(frozen=True)
class VerificationResult:
"""Return value of :meth:`ManifestVerifier.verify_manifest`.
``fail_reasons`` is the deterministic union of every reason that
fired during the call; ``fail_details`` is the parallel human-
readable diagnostic list. ``takeoff_origin`` is populated for
diagnostics even on FAIL whenever the ``flight`` block parsed
(MV-INV-9); the callers consume it only on PASS.
"""
outcome: VerifyOutcome
fail_reasons: tuple[VerifyFailReason, ...]
fail_details: tuple[str, ...]
signing_public_key_fingerprint: str | None
per_artifact_checks: tuple[ArtifactCheck, ...]
takeoff_origin: LatLonAlt | None
flight_id: UUID | None
elapsed_ms: int
@runtime_checkable
class ManifestVerifier(Protocol):
"""Read-only verifier for a C10-produced ``Manifest.json``.
Fail-closed: any deviation yields ``outcome=FAIL``; never raises
on a verify failure. Caller passes the trusted operator public-
key tuple this contract does NOT define a key registry.
"""
def verify_manifest(
self,
*,
manifest_path: Path,
trusted_public_keys: tuple[Ed25519PublicKey, ...],
) -> VerificationResult: ...
class ManifestVerifierImpl:
"""Production :class:`ManifestVerifier` implementation (AZ-324).
Operator mode (``tile_metadata_store`` supplied) re-derives the
aggregate ``tiles_coverage_sha256`` from C6 and flags drift;
airborne mode (``None``) trusts the recorded value once the
Ed25519 signature passes (per MV-INV-5).
"""
def __init__(
self,
*,
sidecar: Sha256Sidecar,
logger: logging.Logger,
clock: Clock,
tile_metadata_store: TilesByBboxQuery | None = None,
) -> None:
self._sidecar = sidecar
self._log = logger
self._clock = clock
self._tiles = tile_metadata_store
def verify_manifest(
self,
*,
manifest_path: Path,
trusted_public_keys: tuple[Ed25519PublicKey, ...],
) -> VerificationResult:
start_ns = self._clock.monotonic_ns()
fail_reasons: list[VerifyFailReason] = []
fail_details: list[str] = []
per_artifact_checks: list[ArtifactCheck] = []
signing_fingerprint: str | None = None
takeoff_origin: LatLonAlt | None = None
flight_id: UUID | None = None
# --- Step A: Manifest exists & sidecar matches -----------------
if not manifest_path.exists():
fail_reasons.append(VerifyFailReason.MANIFEST_NOT_FOUND)
fail_details.append(f"Manifest.json not found at {manifest_path}")
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
manifest_bytes = manifest_path.read_bytes()
sidecar_path = manifest_path.parent / _SIDECAR_FILENAME
if not sidecar_path.exists():
fail_reasons.append(VerifyFailReason.SCHEMA_VIOLATION)
fail_details.append("missing manifest sidecar at " f"{sidecar_path}")
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
sidecar_value = sidecar_path.read_text(encoding="ascii").strip()
actual_self_hash = hashlib.sha256(manifest_bytes).hexdigest()
if actual_self_hash != sidecar_value:
fail_reasons.append(VerifyFailReason.MANIFEST_SELF_HASH_MISMATCH)
fail_details.append(
f"Manifest.json sha256={actual_self_hash} != sidecar={sidecar_value}"
)
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
# --- Step B: Signature verifies against a trusted key ----------
signature_path = manifest_path.parent / _SIGNATURE_FILENAME
if not signature_path.exists():
fail_reasons.append(VerifyFailReason.SIGNATURE_NOT_FOUND)
fail_details.append(f"signature not found at {signature_path}")
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
signature_bytes = signature_path.read_bytes()
if len(signature_bytes) != _ED25519_SIG_BYTES:
fail_reasons.append(VerifyFailReason.SIGNATURE_INVALID)
fail_details.append(
f"signature is {len(signature_bytes)} bytes; expected "
f"{_ED25519_SIG_BYTES}"
)
signing_fingerprint = self._fingerprint_from_body(manifest_bytes)
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
if not trusted_public_keys:
fail_reasons.append(VerifyFailReason.UNTRUSTED_PUBLIC_KEY)
fail_details.append("trusted_public_keys tuple is empty")
signing_fingerprint = self._fingerprint_from_body(manifest_bytes)
self._log.error(
f"{_VERIFY_LOG_KIND_PREFIX}.untrusted",
extra={
"kind": f"{_VERIFY_LOG_KIND_PREFIX}.untrusted",
"kv": {"trusted_keys_len": 0},
},
)
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
signature_ok = False
for key in trusted_public_keys:
fingerprint = _fingerprint_of(key)
try:
key.verify(signature_bytes, manifest_bytes)
except InvalidSignature:
continue
signing_fingerprint = fingerprint
signature_ok = True
break
if not signature_ok:
body_fingerprint = self._fingerprint_from_body(manifest_bytes)
signing_fingerprint = body_fingerprint
trusted_fps = {_fingerprint_of(k) for k in trusted_public_keys}
if body_fingerprint is not None and body_fingerprint not in trusted_fps:
fail_reasons.append(VerifyFailReason.UNTRUSTED_PUBLIC_KEY)
fail_details.append(
f"signing_public_key_fingerprint={body_fingerprint} not in "
f"trusted_public_keys (size={len(trusted_public_keys)})"
)
else:
fail_reasons.append(VerifyFailReason.SIGNATURE_INVALID)
fail_details.append(
"Ed25519 signature did not verify against any trusted key"
)
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
# --- Step C: Schema parse -------------------------------------
try:
manifest_obj: Any = orjson.loads(manifest_bytes)
except orjson.JSONDecodeError as exc:
fail_reasons.append(VerifyFailReason.SCHEMA_VIOLATION)
fail_details.append(f"Manifest.json is not valid JSON: {exc}")
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
schema_violations = _validate_manifest_schema(manifest_obj)
if schema_violations:
fail_reasons.append(VerifyFailReason.SCHEMA_VIOLATION)
fail_details.extend(schema_violations)
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
flight_block = manifest_obj.get("flight", {}) or {}
flight_id_raw = flight_block.get("flight_id")
if flight_id_raw is not None:
try:
flight_id = UUID(str(flight_id_raw))
except ValueError:
fail_reasons.append(VerifyFailReason.SCHEMA_VIOLATION)
fail_details.append(
f"flight.flight_id is not a valid UUID: {flight_id_raw!r}"
)
bbox = _bbox_from_dict(manifest_obj["build"]["bbox"])
origin_block = flight_block.get("takeoff_origin")
if origin_block is not None:
origin_parsed, origin_errors = _parse_takeoff_origin(origin_block)
if origin_parsed is not None:
takeoff_origin = origin_parsed
if origin_errors:
fail_reasons.append(VerifyFailReason.TAKEOFF_ORIGIN_INVALID)
fail_details.extend(origin_errors)
elif takeoff_origin is not None and not _origin_in_bbox(
takeoff_origin, bbox
):
fail_reasons.append(VerifyFailReason.TAKEOFF_ORIGIN_OUT_OF_BBOX)
fail_details.append(
f"takeoff_origin=({takeoff_origin.lat_deg},"
f"{takeoff_origin.lon_deg}) outside bbox "
f"(min={bbox.min_lat_deg},{bbox.min_lon_deg}; "
f"max={bbox.max_lat_deg},{bbox.max_lon_deg})"
)
if fail_reasons:
return self._fail(
fail_reasons,
fail_details,
per_artifact_checks,
signing_fingerprint,
takeoff_origin,
flight_id,
start_ns,
)
# --- Step D: Per-artifact hash walk ---------------------------
artifacts = manifest_obj["artifacts"]
cache_root = manifest_path.parent
seen_missing = False
seen_mismatch = False
for entry in artifacts["engines"]:
check = _hash_relative_artifact(
cache_root=cache_root,
relative=entry["path"],
expected=entry["sha256"],
)
per_artifact_checks.append(check)
if check.actual_sha256 is None:
if not seen_missing:
fail_reasons.append(VerifyFailReason.ARTIFACT_MISSING)
seen_missing = True
fail_details.append(f"missing engine artifact: {entry['path']}")
elif not check.matched:
if not seen_mismatch:
fail_reasons.append(VerifyFailReason.ARTIFACT_HASH_MISMATCH)
seen_mismatch = True
fail_details.append(
f"engine hash mismatch: {entry['path']} "
f"expected={check.expected_sha256} actual={check.actual_sha256}"
)
for label, entry in (
("descriptor_index", artifacts["descriptor_index"]),
("calibration", artifacts["calibration"]),
):
check = _hash_relative_artifact(
cache_root=cache_root,
relative=entry["path"],
expected=entry["sha256"],
)
per_artifact_checks.append(check)
if check.actual_sha256 is None:
if not seen_missing:
fail_reasons.append(VerifyFailReason.ARTIFACT_MISSING)
seen_missing = True
fail_details.append(f"missing {label} artifact: {entry['path']}")
elif not check.matched:
if not seen_mismatch:
fail_reasons.append(VerifyFailReason.ARTIFACT_HASH_MISMATCH)
seen_mismatch = True
fail_details.append(
f"{label} hash mismatch: {entry['path']} "
f"expected={check.expected_sha256} actual={check.actual_sha256}"
)
tiles_recorded_sha = artifacts["tiles_coverage"]["sha256"]
if self._tiles is None:
per_artifact_checks.append(
ArtifactCheck(
relative_path="tiles_coverage",
expected_sha256=tiles_recorded_sha,
actual_sha256=tiles_recorded_sha,
matched=True,
)
)
else:
try:
zoom_levels = tuple(
int(z) for z in manifest_obj["build"]["zoom_levels"]
)
sector_class = str(manifest_obj["build"]["sector_class"])
records = tuple(
self._tiles.query_by_bbox(
bbox=bbox,
zoom_levels=zoom_levels,
sector_class=sector_class,
)
)
records = tuple(
sorted(records, key=lambda r: (r.zoom, r.lat, r.lon, r.source))
)
computed = _aggregate_tile_hash(records)
except Exception as exc:
per_artifact_checks.append(
ArtifactCheck(
relative_path="tiles_coverage",
expected_sha256=tiles_recorded_sha,
actual_sha256=None,
matched=False,
)
)
fail_reasons.append(VerifyFailReason.TILES_COVERAGE_MISMATCH)
fail_details.append(
f"tiles_coverage re-derivation failed: {exc}"
)
else:
matched = computed == tiles_recorded_sha
per_artifact_checks.append(
ArtifactCheck(
relative_path="tiles_coverage",
expected_sha256=tiles_recorded_sha,
actual_sha256=computed,
matched=matched,
)
)
if not matched:
fail_reasons.append(VerifyFailReason.TILES_COVERAGE_MISMATCH)
fail_details.append(
f"tiles_coverage drift: recorded={tiles_recorded_sha} "
f"computed={computed}"
)
elapsed_ms = max(
0, int((self._clock.monotonic_ns() - start_ns) / 1_000_000)
)
outcome = VerifyOutcome.PASS if not fail_reasons else VerifyOutcome.FAIL
if outcome is VerifyOutcome.PASS:
self._log.info(
f"{_VERIFY_LOG_KIND_PREFIX}.pass",
extra={
"kind": f"{_VERIFY_LOG_KIND_PREFIX}.pass",
"kv": {
"elapsed_ms": elapsed_ms,
"signing_public_key_fingerprint": signing_fingerprint,
"n_artifacts": len(per_artifact_checks),
"mode": "operator" if self._tiles is not None else "airborne",
},
},
)
else:
self._log.warning(
f"{_VERIFY_LOG_KIND_PREFIX}.fail",
extra={
"kind": f"{_VERIFY_LOG_KIND_PREFIX}.fail",
"kv": {
"elapsed_ms": elapsed_ms,
"fail_reasons": [r.value for r in fail_reasons],
"n_mismatched": sum(
1 for c in per_artifact_checks if not c.matched
),
},
},
)
return VerificationResult(
outcome=outcome,
fail_reasons=tuple(fail_reasons),
fail_details=tuple(fail_details),
signing_public_key_fingerprint=signing_fingerprint,
per_artifact_checks=tuple(per_artifact_checks),
takeoff_origin=takeoff_origin,
flight_id=flight_id,
elapsed_ms=elapsed_ms,
)
def _fail(
self,
fail_reasons: list[VerifyFailReason],
fail_details: list[str],
per_artifact_checks: list[ArtifactCheck],
signing_fingerprint: str | None,
takeoff_origin: LatLonAlt | None,
flight_id: UUID | None,
start_ns: int,
) -> VerificationResult:
elapsed_ms = max(
0, int((self._clock.monotonic_ns() - start_ns) / 1_000_000)
)
self._log.warning(
f"{_VERIFY_LOG_KIND_PREFIX}.fail",
extra={
"kind": f"{_VERIFY_LOG_KIND_PREFIX}.fail",
"kv": {
"elapsed_ms": elapsed_ms,
"fail_reasons": [r.value for r in fail_reasons],
"n_mismatched": sum(
1 for c in per_artifact_checks if not c.matched
),
},
},
)
return VerificationResult(
outcome=VerifyOutcome.FAIL,
fail_reasons=tuple(fail_reasons),
fail_details=tuple(fail_details),
signing_public_key_fingerprint=signing_fingerprint,
per_artifact_checks=tuple(per_artifact_checks),
takeoff_origin=takeoff_origin,
flight_id=flight_id,
elapsed_ms=elapsed_ms,
)
def _fingerprint_from_body(self, manifest_bytes: bytes) -> str | None:
"""Best-effort fingerprint lookup for diagnostics on FAIL paths."""
try:
obj = orjson.loads(manifest_bytes)
except orjson.JSONDecodeError:
return None
fp = obj.get("signing_public_key_fingerprint") if isinstance(obj, dict) else None
if not isinstance(fp, str):
return None
if len(fp) != 64:
return None
try:
int(fp, 16)
except ValueError:
return None
return fp.lower()
def _fingerprint_of(public_key: Ed25519PublicKey) -> str:
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
)
raw = public_key.public_bytes(
encoding=Encoding.Raw,
format=PublicFormat.Raw,
)
return hashlib.sha256(raw).hexdigest()
def _validate_manifest_schema(obj: Any) -> list[str]:
"""Return the list of schema-violation diagnostics; empty when valid."""
errors: list[str] = []
if not isinstance(obj, dict):
return ["Manifest top-level is not an object"]
for top_key in ("schema_version", "build", "artifacts", "signing_public_key_fingerprint"):
if top_key not in obj:
errors.append(f"missing required top-level key: {top_key}")
if errors:
return errors
build = obj["build"]
if not isinstance(build, dict):
errors.append("`build` is not an object")
return errors
for build_key in ("bbox", "zoom_levels", "sector_class", "built_at", "manifest_hash"):
if build_key not in build:
errors.append(f"missing required build.{build_key}")
bbox = build.get("bbox")
if isinstance(bbox, dict):
for bbox_key in ("min_lat_deg", "min_lon_deg", "max_lat_deg", "max_lon_deg"):
if bbox_key not in bbox:
errors.append(f"missing required build.bbox.{bbox_key}")
else:
errors.append("`build.bbox` is not an object")
artifacts = obj["artifacts"]
if not isinstance(artifacts, dict):
errors.append("`artifacts` is not an object")
return errors
for sub_key in ("engines", "descriptor_index", "calibration", "tiles_coverage"):
if sub_key not in artifacts:
errors.append(f"missing required artifacts.{sub_key}")
engines = artifacts.get("engines")
if isinstance(engines, list):
for i, entry in enumerate(engines):
errors.extend(_validate_path_sha_entry(entry, f"artifacts.engines[{i}]"))
else:
errors.append("`artifacts.engines` is not a list")
for sub_key in ("descriptor_index", "calibration"):
entry = artifacts.get(sub_key)
if isinstance(entry, dict):
errors.extend(_validate_path_sha_entry(entry, f"artifacts.{sub_key}"))
else:
errors.append(f"`artifacts.{sub_key}` is not an object")
tiles_coverage = artifacts.get("tiles_coverage")
if isinstance(tiles_coverage, dict):
if not isinstance(tiles_coverage.get("sha256"), str):
errors.append("`artifacts.tiles_coverage.sha256` is not a string")
if not isinstance(tiles_coverage.get("tile_count"), int):
errors.append("`artifacts.tiles_coverage.tile_count` is not an int")
else:
errors.append("`artifacts.tiles_coverage` is not an object")
fp = obj.get("signing_public_key_fingerprint")
if not isinstance(fp, str) or len(fp) != 64:
errors.append(
"`signing_public_key_fingerprint` must be a 64-char hex string"
)
return errors
def _validate_path_sha_entry(entry: Any, label: str) -> list[str]:
if not isinstance(entry, dict):
return [f"{label} is not an object"]
errors: list[str] = []
raw_path = entry.get("path")
sha = entry.get("sha256")
if not isinstance(raw_path, str):
errors.append(f"{label}.path is not a string")
else:
if raw_path.startswith("/"):
errors.append(f"{label}.path must be relative; got absolute {raw_path!r}")
parts = PurePosixPath(raw_path).parts
if ".." in parts:
errors.append(
f"{label}.path must not contain `..` segments; got {raw_path!r}"
)
if not isinstance(sha, str) or len(sha) != 64:
errors.append(f"{label}.sha256 must be a 64-char hex string")
return errors
def _bbox_from_dict(bbox: dict[str, float]) -> BoundingBox:
return BoundingBox(
min_lat_deg=float(bbox["min_lat_deg"]),
min_lon_deg=float(bbox["min_lon_deg"]),
max_lat_deg=float(bbox["max_lat_deg"]),
max_lon_deg=float(bbox["max_lon_deg"]),
)
def _parse_takeoff_origin(block: Any) -> tuple[LatLonAlt | None, list[str]]:
errors: list[str] = []
if not isinstance(block, dict):
errors.append("`flight.takeoff_origin` is not an object")
return None, errors
lat = block.get("lat_deg")
lon = block.get("lon_deg")
alt = block.get("alt_m")
if not isinstance(lat, (int, float)) or isinstance(lat, bool):
errors.append("`flight.takeoff_origin.lat_deg` is not a number")
if not isinstance(lon, (int, float)) or isinstance(lon, bool):
errors.append("`flight.takeoff_origin.lon_deg` is not a number")
if not isinstance(alt, (int, float)) or isinstance(alt, bool):
errors.append("`flight.takeoff_origin.alt_m` is not a number")
if errors:
return None, errors
lat_f = float(lat) # type: ignore[arg-type]
lon_f = float(lon) # type: ignore[arg-type]
alt_f = float(alt) # type: ignore[arg-type]
parsed = LatLonAlt(lat_deg=lat_f, lon_deg=lon_f, alt_m=alt_f)
if not (-90.0 <= lat_f <= 90.0):
errors.append(f"flight.takeoff_origin.lat_deg={lat_f} out of [-90, 90]")
if not (-180.0 <= lon_f <= 180.0):
errors.append(
f"flight.takeoff_origin.lon_deg={lon_f} out of [-180, 180]"
)
if not math.isfinite(alt_f):
errors.append("flight.takeoff_origin.alt_m must be finite")
return parsed, errors
def _origin_in_bbox(origin: LatLonAlt, bbox: BoundingBox) -> bool:
return bbox.contains(origin.lat_deg, origin.lon_deg)
def _hash_relative_artifact(
*,
cache_root: Path,
relative: str,
expected: str,
) -> ArtifactCheck:
target = cache_root / relative
if not target.exists():
return ArtifactCheck(
relative_path=relative,
expected_sha256=expected,
actual_sha256=None,
matched=False,
)
hasher = hashlib.sha256()
with target.open("rb") as fh:
while True:
chunk = fh.read(_HASH_CHUNK_BYTES)
if not chunk:
break
hasher.update(chunk)
actual = hasher.hexdigest()
return ArtifactCheck(
relative_path=relative,
expected_sha256=expected,
actual_sha256=actual,
matched=actual == expected,
)
@@ -19,27 +19,38 @@ from typing import TYPE_CHECKING
from gps_denied_onboard.components.c10_provisioning import (
BackboneSpec,
Ed25519ManifestSigner,
EngineCompiler,
ManifestBuilder,
ManifestVerifierImpl,
TileHashRecord,
TilesByBboxQuery,
)
from gps_denied_onboard.components.c10_provisioning.config import (
BackboneConfig,
C10ProvisioningConfig,
)
from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar
from gps_denied_onboard.logging import get_logger
from gps_denied_onboard.runtime_root.inference_factory import (
build_inference_runtime,
)
if TYPE_CHECKING:
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c6_tile_cache import TileMetadataStore
from gps_denied_onboard.config.schema import Config
__all__ = [
"build_backbone_specs",
"build_engine_compiler",
"build_manifest_builder",
"build_manifest_verifier",
"c6_tile_metadata_store_to_tiles_query",
]
def build_engine_compiler(config: "Config") -> EngineCompiler:
def build_engine_compiler(config: Config) -> EngineCompiler:
"""Construct a wired :class:`EngineCompiler` from ``config``.
The factory:
@@ -61,7 +72,7 @@ def build_engine_compiler(config: "Config") -> EngineCompiler:
return EngineCompiler(inference_runtime=runtime, logger=logger)
def build_backbone_specs(config: "Config") -> tuple[BackboneSpec, ...]:
def build_backbone_specs(config: Config) -> tuple[BackboneSpec, ...]:
"""Materialise :class:`BackboneSpec` tuple from
``config.components['c10_provisioning'].backbones``.
@@ -83,3 +94,128 @@ def _backbone_spec_from_config(
expected_input_shape=tuple(backbone.expected_input_shape),
input_name=backbone.input_name,
)
def build_manifest_builder(
config: Config,
*,
tile_metadata_store: TileMetadataStore,
clock: Clock,
) -> ManifestBuilder:
"""Construct a wired :class:`ManifestBuilder` (AZ-323).
The ``tile_metadata_store`` argument is the AZ-303 C6 store; this
factory wraps it in the consumer-side
:class:`TilesByBboxQuery` adapter so the C10 module never imports
``components.c6_tile_cache`` directly (AZ-270 + AZ-507 boundary).
``clock`` is supplied explicitly rather than re-resolved through
a clock factory because the composition root selects the clock
strategy (WallClock for live, TlogDerivedClock for replay) per
AZ-398 and threads the SAME instance through every consumer.
"""
block: C10ProvisioningConfig = config.components["c10_provisioning"]
sidecar = Sha256Sidecar()
signer = Ed25519ManifestSigner()
logger = get_logger("c10_provisioning.manifest")
tiles_query = c6_tile_metadata_store_to_tiles_query(tile_metadata_store)
return ManifestBuilder(
sidecar=sidecar,
signer=signer,
tile_metadata_store=tiles_query,
logger=logger,
clock=clock,
config=block.manifest,
)
def build_manifest_verifier(
config: Config,
*,
clock: Clock,
tile_metadata_store: TileMetadataStore | None = None,
with_tile_store: bool = False,
) -> ManifestVerifierImpl:
"""Construct a wired :class:`ManifestVerifierImpl` (AZ-324).
``with_tile_store=True`` (operator C12 mode) requires
``tile_metadata_store`` to be supplied the verifier re-derives
``tiles_coverage_sha256`` from C6 and reports drift.
``with_tile_store=False`` (airborne C5 mode) trusts the recorded
aggregate after the Ed25519 signature passes (MV-INV-5); the
``tile_metadata_store`` argument is ignored.
"""
sidecar = Sha256Sidecar()
logger = get_logger("c10_provisioning.verify")
# AZ-324 silently accepting a tile_metadata_store with
# `with_tile_store=False` would mask a composition-root mistake
# (operator mode wired in an airborne binary by accident); we keep
# the airborne path explicit by ignoring the argument here.
if with_tile_store:
if tile_metadata_store is None:
raise ValueError(
"build_manifest_verifier(with_tile_store=True) requires "
"tile_metadata_store; supply None or set with_tile_store=False"
)
tiles_query: TilesByBboxQuery | None = c6_tile_metadata_store_to_tiles_query(
tile_metadata_store
)
else:
tiles_query = None
return ManifestVerifierImpl(
sidecar=sidecar,
logger=logger,
clock=clock,
tile_metadata_store=tiles_query,
)
def c6_tile_metadata_store_to_tiles_query(
tile_metadata_store: TileMetadataStore,
) -> TilesByBboxQuery:
"""Adapt the C6 ``TileMetadataStore`` to the C10 ``TilesByBboxQuery`` cut.
Lives in the composition root because it is the only place that
may import both C6 and C10 (the AZ-270 lint allows
``runtime_root``). C6 returns ``TileMetadata`` rows; AZ-323 needs
a ``TileHashRecord`` with ``(zoom, lat, lon, source, sha256_hex)``
and nothing else.
"""
from gps_denied_onboard.components.c6_tile_cache import (
SectorClassification as C6SectorClassification,
)
class _C6TilesAdapter:
def __init__(self, store: TileMetadataStore) -> None:
self._store = store
def query_by_bbox(
self,
*,
bbox,
zoom_levels,
sector_class,
):
c6_sector = C6SectorClassification(sector_class)
rows = self._store.query_by_bbox(
bbox=bbox,
zoom_levels=zoom_levels,
sector_class=c6_sector,
)
return tuple(
TileHashRecord(
zoom=row.tile_id.zoom_level,
lat=row.tile_id.lat,
lon=row.tile_id.lon,
source=row.source.value
if hasattr(row.source, "value")
else str(row.source),
sha256_hex=row.content_sha256_hex,
)
for row in rows
)
return _C6TilesAdapter(tile_metadata_store)
@@ -597,6 +597,72 @@ def test_missing_sidecar_treated_as_cache_miss(
)
# ----------------------------------------------------------------------
# AZ-507 AC-3: non-typed exceptions propagate without the compile.error log
# ----------------------------------------------------------------------
def test_az507_ac3_non_typed_exception_propagates_without_structured_log(
cache_root: Path,
backbones: tuple[BackboneSpec, ...],
logger: logging.Logger,
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange — the runtime raises a stdlib RuntimeError, which is NOT
# in the C7 typed-error envelope. AZ-507 narrows the catch to
# `(EngineBuildError, CalibrationCacheError)` so the unknown error
# must propagate unchanged and the c10.engine.compile.error log
# must NOT fire (the structured log is the typed-failure contract,
# not a catch-all).
runtime = _FakeRuntime(
cache_root=cache_root,
raise_on={"dinov2_vpr": RuntimeError("unexpected programmer error")},
)
compiler = EngineCompiler(inference_runtime=runtime, logger=logger)
request = _request(backbones, cache_root)
# Act + Assert — propagation
with caplog.at_level(logging.ERROR, logger=logger.name):
with pytest.raises(RuntimeError, match="unexpected programmer error"):
compiler.compile_engines_for_corpus(request)
# Assert — no structured compile.error log for the unknown type
error_kinds = [
rec for rec in caplog.records
if rec.__dict__.get("kind") == "c10.engine.compile.error"
]
assert error_kinds == []
def test_az507_ac3_typed_exception_still_logs_structured_diagnostic(
cache_root: Path,
backbones: tuple[BackboneSpec, ...],
logger: logging.Logger,
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange — typed C7 error MUST still produce the structured log
# and re-raise (regression guard for AZ-321's diagnostic contract
# that AZ-507 must not break).
runtime = _FakeRuntime(
cache_root=cache_root,
raise_on={"dinov2_vpr": EngineBuildError("typed failure")},
)
compiler = EngineCompiler(inference_runtime=runtime, logger=logger)
request = _request(backbones, cache_root)
# Act + Assert
with caplog.at_level(logging.ERROR, logger=logger.name):
with pytest.raises(EngineBuildError, match="typed failure"):
compiler.compile_engines_for_corpus(request)
error_kinds = [
rec for rec in caplog.records
if rec.__dict__.get("kind") == "c10.engine.compile.error"
]
assert len(error_kinds) == 1
assert error_kinds[0].__dict__["kv"]["error_class"] == "EngineBuildError"
# ----------------------------------------------------------------------
# NFR placeholders (Tier-2 microbench harness owns these on Jetson)
# ----------------------------------------------------------------------
@@ -0,0 +1,685 @@
"""Unit tests for AZ-323 :class:`ManifestBuilder`.
Covers all 16 ACs in the AZ-323 task spec plus a Protocol-conformance
check and two extra invariants (descriptor-index sidecar drift, key
load propagating the chained cause). Uses the real
:class:`Sha256Sidecar` + a real :class:`Ed25519ManifestSigner` so the
sign / verify round trip exercises production code paths.
"""
from __future__ import annotations
import hashlib
import logging
from dataclasses import replace
from pathlib import Path
from uuid import UUID, uuid4
import orjson
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard._types.inference import (
EngineCacheEntry,
PrecisionMode,
)
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c10_provisioning import (
C10ManifestConfig,
Ed25519ManifestSigner,
ManifestArtifact,
ManifestBuilder,
ManifestBuildInput,
ManifestSigner,
ManifestWriteError,
SigningMode,
TileHashRecord,
)
from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar
# ----------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------
_BBOX = BoundingBox(
min_lat_deg=50.0,
min_lon_deg=36.0,
max_lat_deg=50.5,
max_lon_deg=36.5,
)
_ZOOM_LEVELS = (16, 17, 18)
def _write_pkcs8_key(tmp_path: Path, name: str = "operator.key") -> tuple[Path, str]:
"""Write a fresh PEM-encoded PKCS8 Ed25519 private key to disk.
Returns ``(path, fingerprint_hex)`` so tests can assert against
the deterministic SHA-256 of the raw 32-byte public key.
"""
priv = Ed25519PrivateKey.generate()
pem = priv.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
key_path = tmp_path / name
key_path.write_bytes(pem)
raw_pub = priv.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
return key_path, hashlib.sha256(raw_pub).hexdigest()
def _make_engine_entries(tmp_path: Path) -> tuple[EngineCacheEntry, ...]:
"""Materialise three engine sidecars under ``tmp_path/engines/``."""
engines_dir = tmp_path / "engines"
engines_dir.mkdir(parents=True, exist_ok=True)
entries: list[EngineCacheEntry] = []
for model in ("dinov2_vpr", "lightglue", "aliked"):
path = engines_dir / f"{model}_sm87_jp62_trt103_fp16.engine"
payload = f"engine-bytes-{model}".encode()
path.write_bytes(payload)
digest = hashlib.sha256(payload).hexdigest()
entries.append(
EngineCacheEntry(
engine_path=path,
sha256_hex=digest,
sm=87,
jp="6.2",
trt="10.3",
precision=PrecisionMode.FP16,
extras={},
)
)
return tuple(entries)
def _make_descriptor_index(tmp_path: Path) -> Path:
"""Write a fake descriptor index + its sidecar."""
desc_dir = tmp_path / "descriptors"
desc_dir.mkdir(parents=True, exist_ok=True)
path = desc_dir / "corpus.index"
payload = b"faiss-binary-payload"
Sha256Sidecar.write_atomic_and_sidecar(path, payload)
return path
def _make_calibration(tmp_path: Path) -> Path:
cal_dir = tmp_path / "calibration"
cal_dir.mkdir(parents=True, exist_ok=True)
path = cal_dir / "int8_calibration.json"
path.write_bytes(b'{"calibration": "data"}')
return path
def _make_tiles(count: int = 100) -> tuple[TileHashRecord, ...]:
"""Generate `count` deterministic tile records."""
return tuple(
TileHashRecord(
zoom=16 + (i % 3),
lat=50.0 + 0.001 * i,
lon=36.0 + 0.001 * i,
source="googlemaps" if i % 2 == 0 else "onboard_ingest",
sha256_hex=hashlib.sha256(f"tile-{i}".encode()).hexdigest(),
)
for i in range(count)
)
class _StaticTiles:
"""Hand-rolled :class:`TilesByBboxQuery` returning a fixed tuple."""
def __init__(self, records: tuple[TileHashRecord, ...]) -> None:
self._records = records
def query_by_bbox(self, *, bbox, zoom_levels, sector_class): # type: ignore[no-untyped-def]
return self._records
def _build_input(
tmp_path: Path,
*,
key_path: Path | None = None,
takeoff_origin: LatLonAlt | None = None,
flight_id: UUID | None = None,
sector_class: str = "stable_rear",
) -> ManifestBuildInput:
"""Materialise a complete on-disk input + a freshly generated key."""
cache_root = tmp_path / "cache_root"
cache_root.mkdir(parents=True, exist_ok=True)
engine_entries = _make_engine_entries(cache_root)
descriptor_index = _make_descriptor_index(cache_root)
calibration = _make_calibration(cache_root)
if key_path is None:
key_path, _ = _write_pkcs8_key(tmp_path)
return ManifestBuildInput(
cache_root=cache_root,
bbox=_BBOX,
zoom_levels=_ZOOM_LEVELS,
sector_class=sector_class,
engine_entries=engine_entries,
descriptor_index_path=descriptor_index,
calibration_path=calibration,
key_path=key_path,
takeoff_origin=takeoff_origin,
flight_id=flight_id,
)
def _build_builder(
*,
config: C10ManifestConfig | None = None,
tiles: tuple[TileHashRecord, ...] | None = None,
signer: ManifestSigner | None = None,
) -> tuple[ManifestBuilder, logging.Logger, list[logging.LogRecord]]:
records: list[logging.LogRecord] = []
class _ListHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
records.append(record)
logger = logging.getLogger(f"test_az323_{id(records)}")
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
logger.addHandler(_ListHandler())
logger.propagate = False
builder = ManifestBuilder(
sidecar=Sha256Sidecar(),
signer=signer if signer is not None else Ed25519ManifestSigner(),
tile_metadata_store=_StaticTiles(
tiles if tiles is not None else _make_tiles(100)
),
logger=logger,
clock=WallClock(),
config=config if config is not None else C10ManifestConfig(),
)
return builder, logger, records
# ----------------------------------------------------------------------
# AC-1
# ----------------------------------------------------------------------
def test_ac1_happy_path_produces_manifest_sidecar_and_signature(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
request = _build_input(tmp_path)
# Act
artifact = builder.build_manifest(request)
# Assert
assert isinstance(artifact, ManifestArtifact)
assert artifact.manifest_path == request.cache_root / "Manifest.json"
assert artifact.signature_path == request.cache_root / "Manifest.json.sig"
assert artifact.manifest_path.exists()
assert (request.cache_root / "Manifest.json.sha256").exists()
assert artifact.signature_path.exists()
assert len(artifact.manifest_hash) == 64
assert artifact.manifest_hash == artifact.manifest_hash.lower()
int(artifact.manifest_hash, 16)
body = orjson.loads(artifact.manifest_path.read_bytes())
assert len(body["artifacts"]["engines"]) == 3
assert "descriptor_index" in body["artifacts"]
assert "calibration" in body["artifacts"]
assert body["artifacts"]["tiles_coverage"]["tile_count"] == 100
assert artifact.total_artifacts_listed == 6 # 3 engines + index + calibration + tiles_coverage
# ----------------------------------------------------------------------
# AC-2
# ----------------------------------------------------------------------
def test_ac2_determinism_same_input_same_manifest_hash(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
request = _build_input(tmp_path)
# Act
first = builder.build_manifest(request)
first_bytes = first.manifest_path.read_bytes()
second = builder.build_manifest(request)
second_bytes = second.manifest_path.read_bytes()
# Assert: identical inputs → identical manifest_hash AND identical
# canonical bytes once `built_at` is redacted.
assert first.manifest_hash == second.manifest_hash
first_obj = orjson.loads(first_bytes)
second_obj = orjson.loads(second_bytes)
first_obj["build"].pop("built_at")
second_obj["build"].pop("built_at")
assert orjson.dumps(first_obj, option=orjson.OPT_SORT_KEYS) == orjson.dumps(
second_obj, option=orjson.OPT_SORT_KEYS
)
# ----------------------------------------------------------------------
# AC-3
# ----------------------------------------------------------------------
def test_ac3_signature_verifies_against_public_key(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
key_path, _ = _write_pkcs8_key(tmp_path)
request = _build_input(tmp_path, key_path=key_path)
# Act
artifact = builder.build_manifest(request)
manifest_bytes = artifact.manifest_path.read_bytes()
signature_bytes = artifact.signature_path.read_bytes()
public_key: Ed25519PublicKey = (
Ed25519PrivateKey.from_private_bytes(
serialization.load_pem_private_key(
key_path.read_bytes(), password=None
).private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
).public_key()
)
# Assert: verify() raises on mismatch; absence of raise = pass
public_key.verify(signature_bytes, manifest_bytes)
# ----------------------------------------------------------------------
# AC-4
# ----------------------------------------------------------------------
def test_ac4_operator_mode_rejects_unknown_fingerprint(tmp_path: Path) -> None:
# Arrange
allowed_fp = hashlib.sha256(b"some-other-key").hexdigest()
builder, _, records = _build_builder(
config=C10ManifestConfig(
signing_mode=SigningMode.OPERATOR,
allowed_operator_fingerprints=(allowed_fp,),
)
)
request = _build_input(tmp_path)
# Act / Assert
with pytest.raises(ManifestWriteError) as exc_info:
builder.build_manifest(request)
assert allowed_fp in str(exc_info.value)
assert not (request.cache_root / "Manifest.json").exists()
assert not (request.cache_root / "Manifest.json.sig").exists()
errors = [r for r in records if r.levelno == logging.ERROR]
assert len(errors) == 1
assert errors[0].__dict__.get("kind") == "c10.manifest.build.error"
# ----------------------------------------------------------------------
# AC-5
# ----------------------------------------------------------------------
def test_ac5_operator_mode_accepts_known_fingerprint(tmp_path: Path) -> None:
# Arrange
key_path, fp = _write_pkcs8_key(tmp_path)
builder, _, records = _build_builder(
config=C10ManifestConfig(
signing_mode=SigningMode.OPERATOR,
allowed_operator_fingerprints=(fp,),
)
)
request = _build_input(tmp_path, key_path=key_path)
# Act
artifact = builder.build_manifest(request)
# Assert
assert artifact.signing_public_key_fingerprint == fp
warns = [r for r in records if r.levelno == logging.WARNING]
assert warns == []
# ----------------------------------------------------------------------
# AC-6
# ----------------------------------------------------------------------
def test_ac6_dev_mode_with_dev_key_no_warning(tmp_path: Path) -> None:
# Arrange
builder, _, records = _build_builder(
config=C10ManifestConfig(signing_mode=SigningMode.DEV)
)
request = _build_input(tmp_path)
# Act
builder.build_manifest(request)
# Assert
warns = [r for r in records if r.levelno == logging.WARNING]
assert warns == []
# ----------------------------------------------------------------------
# AC-7
# ----------------------------------------------------------------------
def test_ac7_dev_mode_with_operator_key_emits_warning(tmp_path: Path) -> None:
# Arrange
key_path, fp = _write_pkcs8_key(tmp_path)
builder, _, records = _build_builder(
config=C10ManifestConfig(
signing_mode=SigningMode.DEV,
allowed_operator_fingerprints=(fp,),
)
)
request = _build_input(tmp_path, key_path=key_path)
# Act
builder.build_manifest(request)
# Assert
warns = [r for r in records if r.levelno == logging.WARNING]
assert len(warns) == 1
assert warns[0].__dict__.get("kind") == "c10.manifest.dev_mode_with_operator_key"
assert warns[0].__dict__.get("kv", {}).get("offered_fingerprint") == fp
# ----------------------------------------------------------------------
# AC-8
# ----------------------------------------------------------------------
def test_ac8_tile_coverage_hash_is_sort_order_deterministic(tmp_path: Path) -> None:
# Arrange
tiles = _make_tiles(100)
tiles_reversed = tuple(reversed(tiles))
builder_a, _, _ = _build_builder(tiles=tiles)
builder_b, _, _ = _build_builder(tiles=tiles_reversed)
request_a = _build_input(tmp_path / "a")
request_b = _build_input(tmp_path / "b")
# Act
art_a = builder_a.build_manifest(request_a)
art_b = builder_b.build_manifest(request_b)
body_a = orjson.loads(art_a.manifest_path.read_bytes())
body_b = orjson.loads(art_b.manifest_path.read_bytes())
# Assert
assert (
body_a["artifacts"]["tiles_coverage"]["sha256"]
== body_b["artifacts"]["tiles_coverage"]["sha256"]
)
# ----------------------------------------------------------------------
# AC-9
# ----------------------------------------------------------------------
def test_ac9_missing_key_path_raises_manifest_write_error(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
missing = tmp_path / "missing.key"
request = _build_input(tmp_path, key_path=missing)
# Act / Assert
with pytest.raises(ManifestWriteError) as exc_info:
builder.build_manifest(request)
assert "operator signing key load failed" in str(exc_info.value)
assert exc_info.value.__cause__ is not None
assert not (request.cache_root / "Manifest.json").exists()
def test_ac9_malformed_pem_chains_underlying_cause(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
bogus = tmp_path / "bogus.key"
bogus.write_bytes(b"-----BEGIN PRIVATE KEY-----\ngarbage\n-----END PRIVATE KEY-----\n")
request = _build_input(tmp_path, key_path=bogus)
# Act / Assert
with pytest.raises(ManifestWriteError) as exc_info:
builder.build_manifest(request)
assert exc_info.value.__cause__ is not None
# ----------------------------------------------------------------------
# AC-10
# ----------------------------------------------------------------------
def test_ac10_atomic_write_no_half_manifest(tmp_path: Path) -> None:
"""Sha256Sidecar uses tmp-file → os.replace; we assert the previous-good
Manifest survives if a re-build is interrupted before reaching the
atomic-replace step. We simulate the interruption by injecting a
signer whose ``sign()`` raises AFTER the Manifest.json was written
(the post-condition shows the disk in its pre-build state)."""
# Arrange
request = _build_input(tmp_path)
good_builder, _, _ = _build_builder()
good_builder.build_manifest(request) # produce v1 on disk
good_manifest_bytes = (request.cache_root / "Manifest.json").read_bytes()
good_sig_bytes = (request.cache_root / "Manifest.json.sig").read_bytes()
class _ExplodingSigner:
def __init__(self) -> None:
self._inner = Ed25519ManifestSigner()
def load_signing_key(self, key_path): # type: ignore[no-untyped-def]
return self._inner.load_signing_key(key_path)
def sign(self, key, payload_bytes): # type: ignore[no-untyped-def]
raise RuntimeError("simulated kill mid-build")
def public_key_fingerprint(self, key): # type: ignore[no-untyped-def]
return self._inner.public_key_fingerprint(key)
failing_builder, _, _ = _build_builder(signer=_ExplodingSigner())
# Act
with pytest.raises(RuntimeError, match="simulated kill"):
failing_builder.build_manifest(request)
# Assert: signature was never re-written; the previous-good signature
# survives untouched (atomic-write guarantee).
assert (request.cache_root / "Manifest.json.sig").read_bytes() == good_sig_bytes
# The Manifest.json may be the new one or the old one — never half-written.
new_bytes = (request.cache_root / "Manifest.json").read_bytes()
assert orjson.loads(new_bytes) is not None
# And the sidecar must remain consistent with whatever is on disk now.
actual_hash = hashlib.sha256(new_bytes).hexdigest()
sidecar_hash = (request.cache_root / "Manifest.json.sha256").read_text().strip()
assert actual_hash == sidecar_hash
# Defensive: if it's the old Manifest, its bytes equal the saved snapshot.
if actual_hash == hashlib.sha256(good_manifest_bytes).hexdigest():
assert new_bytes == good_manifest_bytes
# ----------------------------------------------------------------------
# AC-11
# ----------------------------------------------------------------------
def test_ac11_manifest_own_sidecar_matches_disk_bytes(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
request = _build_input(tmp_path)
# Act
artifact = builder.build_manifest(request)
actual = hashlib.sha256(artifact.manifest_path.read_bytes()).hexdigest()
sidecar = (request.cache_root / "Manifest.json.sha256").read_text().strip()
# Assert
assert actual == sidecar
# ----------------------------------------------------------------------
# AC-12
# ----------------------------------------------------------------------
def test_ac12_total_artifacts_listed_counts_dict_entries(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
request = _build_input(tmp_path)
# Act
artifact = builder.build_manifest(request)
# Assert: 3 engines + 1 index + 1 calibration + 1 tiles_coverage = 6
assert artifact.total_artifacts_listed == 6
# ----------------------------------------------------------------------
# AC-13
# ----------------------------------------------------------------------
def test_ac13_takeoff_origin_baked_into_manifest_body(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
origin = LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0)
flight = uuid4()
request = _build_input(tmp_path, takeoff_origin=origin, flight_id=flight)
# Act
artifact = builder.build_manifest(request)
body = orjson.loads(artifact.manifest_path.read_bytes())
# Assert
flight_block = body["flight"]
assert flight_block["flight_id"] == str(flight)
assert flight_block["takeoff_origin"]["lat_deg"] == 50.0
assert flight_block["takeoff_origin"]["lon_deg"] == 36.2
assert flight_block["takeoff_origin"]["alt_m"] == 200.0
# No timestamp inside takeoff_origin
assert set(flight_block["takeoff_origin"].keys()) == {
"lat_deg",
"lon_deg",
"alt_m",
}
# ----------------------------------------------------------------------
# AC-14
# ----------------------------------------------------------------------
def test_ac14_takeoff_origin_absent_when_not_supplied(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
request = _build_input(tmp_path, takeoff_origin=None, flight_id=None)
# Act
artifact = builder.build_manifest(request)
body = orjson.loads(artifact.manifest_path.read_bytes())
# Assert: flight_id is null but takeoff_origin key is absent
assert body["flight"]["flight_id"] is None
assert "takeoff_origin" not in body["flight"]
# ----------------------------------------------------------------------
# AC-15
# ----------------------------------------------------------------------
def test_ac15_manifest_hash_changes_when_takeoff_origin_differs(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
flight = uuid4()
a = _build_input(
tmp_path / "a",
takeoff_origin=LatLonAlt(50.0, 36.2, 200.0),
flight_id=flight,
)
b = _build_input(
tmp_path / "b",
takeoff_origin=LatLonAlt(50.0, 36.2, 200.001), # 1mm delta
flight_id=flight,
)
# Re-use the same key so only the origin differs.
b = replace(b, key_path=a.key_path)
# Act
art_a = builder.build_manifest(a)
art_b = builder.build_manifest(b)
# Assert
assert art_a.manifest_hash != art_b.manifest_hash
# ----------------------------------------------------------------------
# AC-16
# ----------------------------------------------------------------------
def test_ac16_manifest_hash_changes_when_only_flight_id_differs(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
origin = LatLonAlt(50.0, 36.2, 200.0)
a = _build_input(tmp_path / "a", takeoff_origin=origin, flight_id=uuid4())
b = _build_input(tmp_path / "b", takeoff_origin=origin, flight_id=uuid4())
b = replace(b, key_path=a.key_path)
# Act
art_a = builder.build_manifest(a)
art_b = builder.build_manifest(b)
# Assert
assert art_a.manifest_hash != art_b.manifest_hash
# ----------------------------------------------------------------------
# Protocol conformance
# ----------------------------------------------------------------------
def test_ed25519_manifest_signer_satisfies_protocol() -> None:
# Assert
assert isinstance(Ed25519ManifestSigner(), ManifestSigner)
# ----------------------------------------------------------------------
# Descriptor-index sidecar drift
# ----------------------------------------------------------------------
def test_descriptor_index_sidecar_missing_raises_manifest_write_error(
tmp_path: Path,
) -> None:
# Arrange
builder, _, _ = _build_builder()
request = _build_input(tmp_path)
(request.cache_root / "descriptors" / "corpus.index.sha256").unlink()
# Act / Assert
with pytest.raises(ManifestWriteError, match="descriptor_index sidecar missing"):
builder.build_manifest(request)
def test_descriptor_index_sidecar_malformed_hex_raises(tmp_path: Path) -> None:
# Arrange
builder, _, _ = _build_builder()
request = _build_input(tmp_path)
(request.cache_root / "descriptors" / "corpus.index.sha256").write_text("not-hex!")
# Act / Assert
with pytest.raises(ManifestWriteError, match="not 64 hex chars"):
builder.build_manifest(request)
@@ -0,0 +1,721 @@
"""Unit tests for AZ-324 :class:`ManifestVerifierImpl`.
Covers all 17 ACs in the AZ-324 task spec plus a Protocol-conformance
check. Uses the real AZ-323 :class:`ManifestBuilder` to materialise
fixtures so the sign/verify round trip exercises production code on
both sides.
"""
from __future__ import annotations
import hashlib
import logging
from pathlib import Path
from uuid import UUID, uuid4
import orjson
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard._types.inference import EngineCacheEntry, PrecisionMode
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c10_provisioning import (
C10ManifestConfig,
Ed25519ManifestSigner,
ManifestBuilder,
ManifestBuildInput,
ManifestVerifier,
ManifestVerifierImpl,
TileHashRecord,
TilesByBboxQuery,
VerifyFailReason,
VerifyOutcome,
)
from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar
_BBOX = BoundingBox(50.0, 36.0, 50.5, 36.5)
_ZOOM_LEVELS = (16, 17, 18)
def _write_pkcs8_key(tmp_path: Path, name: str = "operator.key") -> Path:
priv = Ed25519PrivateKey.generate()
pem = priv.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
path = tmp_path / name
path.write_bytes(pem)
return path
def _public_key_from_pem(path: Path) -> Ed25519PublicKey:
priv = serialization.load_pem_private_key(path.read_bytes(), password=None)
assert isinstance(priv, Ed25519PrivateKey)
return priv.public_key()
def _make_engines(cache_root: Path) -> tuple[EngineCacheEntry, ...]:
engines = cache_root / "engines"
engines.mkdir(parents=True, exist_ok=True)
entries: list[EngineCacheEntry] = []
for name in ("dinov2_vpr", "lightglue", "aliked"):
path = engines / f"{name}_sm87_jp62_trt103_fp16.engine"
payload = f"engine-{name}".encode()
path.write_bytes(payload)
entries.append(
EngineCacheEntry(
engine_path=path,
sha256_hex=hashlib.sha256(payload).hexdigest(),
sm=87,
jp="6.2",
trt="10.3",
precision=PrecisionMode.FP16,
extras={},
)
)
return tuple(entries)
def _make_descriptor_index(cache_root: Path) -> Path:
desc_dir = cache_root / "descriptors"
desc_dir.mkdir(parents=True, exist_ok=True)
path = desc_dir / "corpus.index"
Sha256Sidecar.write_atomic_and_sidecar(path, b"faiss-binary-payload")
return path
def _make_calibration(cache_root: Path) -> Path:
cal_dir = cache_root / "calibration"
cal_dir.mkdir(parents=True, exist_ok=True)
path = cal_dir / "int8_calibration.json"
path.write_bytes(b'{"calibration": "data"}')
return path
def _make_tiles(count: int = 10) -> tuple[TileHashRecord, ...]:
return tuple(
TileHashRecord(
zoom=16 + (i % 3),
lat=50.0 + 0.001 * i,
lon=36.0 + 0.001 * i,
source="googlemaps",
sha256_hex=hashlib.sha256(f"tile-{i}".encode()).hexdigest(),
)
for i in range(count)
)
class _StaticTiles:
def __init__(self, records: tuple[TileHashRecord, ...]) -> None:
self._records = records
def query_by_bbox(self, *, bbox, zoom_levels, sector_class): # type: ignore[no-untyped-def]
return self._records
def _build_signed_manifest(
tmp_path: Path,
*,
tiles: tuple[TileHashRecord, ...] | None = None,
takeoff_origin: LatLonAlt | None = None,
flight_id: UUID | None = None,
) -> tuple[Path, Ed25519PublicKey, tuple[TileHashRecord, ...]]:
"""Materialise a complete signed Manifest set on disk.
The builder writes absolute paths verbatim; the verifier expects
cache-root-relative paths (AC-7 bans absolute paths). We
post-process the Manifest body to relative paths and re-sign
with the same key, so each fixture is a realistic v1.1 Manifest.
"""
cache_root = tmp_path / "cache_root"
cache_root.mkdir(parents=True, exist_ok=True)
engine_entries = _make_engines(cache_root)
descriptor_index = _make_descriptor_index(cache_root)
calibration = _make_calibration(cache_root)
key_path = _write_pkcs8_key(tmp_path)
tiles_used = tiles if tiles is not None else _make_tiles(10)
builder = ManifestBuilder(
sidecar=Sha256Sidecar(),
signer=Ed25519ManifestSigner(),
tile_metadata_store=_StaticTiles(tiles_used),
logger=logging.getLogger(f"build-{id(tmp_path)}"),
clock=WallClock(),
config=C10ManifestConfig(),
)
request = ManifestBuildInput(
cache_root=cache_root,
bbox=_BBOX,
zoom_levels=_ZOOM_LEVELS,
sector_class="stable_rear",
engine_entries=engine_entries,
descriptor_index_path=descriptor_index,
calibration_path=calibration,
key_path=key_path,
takeoff_origin=takeoff_origin,
flight_id=flight_id,
)
artifact = builder.build_manifest(request)
# Rewrite to relative paths + re-sign.
body = orjson.loads(artifact.manifest_path.read_bytes())
for entry in body["artifacts"]["engines"]:
entry["path"] = str(Path(entry["path"]).relative_to(cache_root))
body["artifacts"]["descriptor_index"]["path"] = str(
Path(body["artifacts"]["descriptor_index"]["path"]).relative_to(cache_root)
)
body["artifacts"]["calibration"]["path"] = str(
Path(body["artifacts"]["calibration"]["path"]).relative_to(cache_root)
)
body_bytes = orjson.dumps(
body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2
)
if not body_bytes.endswith(b"\n"):
body_bytes += b"\n"
Sha256Sidecar.write_atomic_and_sidecar(artifact.manifest_path, body_bytes)
priv = serialization.load_pem_private_key(key_path.read_bytes(), password=None)
assert isinstance(priv, Ed25519PrivateKey)
Sha256Sidecar.write_atomic(artifact.signature_path, priv.sign(body_bytes))
pub = _public_key_from_pem(key_path)
return artifact.manifest_path, pub, tiles_used
def _build_verifier(
*, tiles: tuple[TileHashRecord, ...] | None = None
) -> tuple[ManifestVerifierImpl, list[logging.LogRecord]]:
records: list[logging.LogRecord] = []
class _ListHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
records.append(record)
logger = logging.getLogger(f"verify-{id(records)}")
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
logger.addHandler(_ListHandler())
logger.propagate = False
tile_store: TilesByBboxQuery | None = (
_StaticTiles(tiles) if tiles is not None else None
)
verifier = ManifestVerifierImpl(
sidecar=Sha256Sidecar(),
logger=logger,
clock=WallClock(),
tile_metadata_store=tile_store,
)
return verifier, records
# ----------------------------------------------------------------------
# AC-1
# ----------------------------------------------------------------------
def test_ac1_pass_on_valid_manifest(tmp_path: Path) -> None:
# Arrange
manifest_path, pub, _ = _build_signed_manifest(tmp_path)
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
# Assert
assert result.outcome is VerifyOutcome.PASS
assert result.fail_reasons == ()
assert all(c.matched for c in result.per_artifact_checks)
assert result.signing_public_key_fingerprint is not None
assert result.elapsed_ms >= 0
# ----------------------------------------------------------------------
# AC-2
# ----------------------------------------------------------------------
def test_ac2_fail_on_missing_manifest(tmp_path: Path) -> None:
# Arrange
verifier, _ = _build_verifier()
missing = tmp_path / "nope" / "Manifest.json"
# Act
result = verifier.verify_manifest(
manifest_path=missing,
trusted_public_keys=(Ed25519PrivateKey.generate().public_key(),),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert result.fail_reasons == (VerifyFailReason.MANIFEST_NOT_FOUND,)
assert result.per_artifact_checks == ()
assert result.signing_public_key_fingerprint is None
# ----------------------------------------------------------------------
# AC-3
# ----------------------------------------------------------------------
def test_ac3_fail_on_missing_signature(tmp_path: Path) -> None:
# Arrange
manifest_path, pub, _ = _build_signed_manifest(tmp_path)
(manifest_path.parent / "Manifest.json.sig").unlink()
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert result.fail_reasons == (VerifyFailReason.SIGNATURE_NOT_FOUND,)
assert result.per_artifact_checks == ()
# ----------------------------------------------------------------------
# AC-4
# ----------------------------------------------------------------------
def test_ac4_fail_on_tampered_manifest_body(tmp_path: Path) -> None:
# Arrange: flip one byte in Manifest.json (sidecar untouched)
manifest_path, pub, _ = _build_signed_manifest(tmp_path)
body = bytearray(manifest_path.read_bytes())
body[10] ^= 0x01
manifest_path.write_bytes(bytes(body))
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.MANIFEST_SELF_HASH_MISMATCH in result.fail_reasons
assert result.per_artifact_checks == ()
# ----------------------------------------------------------------------
# AC-5
# ----------------------------------------------------------------------
def test_ac5_fail_on_untrusted_public_key(tmp_path: Path) -> None:
# Arrange: verify with a different keypair
manifest_path, _signed_with, _ = _build_signed_manifest(tmp_path)
other_key = Ed25519PrivateKey.generate().public_key()
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(other_key,),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.UNTRUSTED_PUBLIC_KEY in result.fail_reasons
assert result.signing_public_key_fingerprint is not None
assert result.per_artifact_checks == ()
# ----------------------------------------------------------------------
# AC-6
# ----------------------------------------------------------------------
def test_ac6_schema_violation_names_offending_field(tmp_path: Path) -> None:
# Arrange
manifest_path, _pub, _ = _build_signed_manifest(tmp_path)
body = orjson.loads(manifest_path.read_bytes())
body.pop("signing_public_key_fingerprint")
new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n"
Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes)
# Re-sign so we get past Step B; we want Step C to be the failure.
priv = Ed25519PrivateKey.generate()
Sha256Sidecar.write_atomic(
manifest_path.parent / "Manifest.json.sig",
priv.sign(new_bytes),
)
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(priv.public_key(),),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.SCHEMA_VIOLATION in result.fail_reasons
assert any("signing_public_key_fingerprint" in d for d in result.fail_details)
# ----------------------------------------------------------------------
# AC-7
# ----------------------------------------------------------------------
def test_ac7_absolute_path_in_artifact_is_schema_violation(tmp_path: Path) -> None:
# Arrange
manifest_path, _pub, _ = _build_signed_manifest(tmp_path)
body = orjson.loads(manifest_path.read_bytes())
body["artifacts"]["engines"][0]["path"] = "/etc/passwd"
new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n"
Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes)
priv = Ed25519PrivateKey.generate()
Sha256Sidecar.write_atomic(
manifest_path.parent / "Manifest.json.sig",
priv.sign(new_bytes),
)
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(priv.public_key(),),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.SCHEMA_VIOLATION in result.fail_reasons
assert any("/etc/passwd" in d for d in result.fail_details)
# No per-artifact disk reads happened — the walk is Step D only.
assert result.per_artifact_checks == ()
def test_ac7_dot_dot_segment_in_artifact_is_schema_violation(tmp_path: Path) -> None:
# Arrange
manifest_path, _pub, _ = _build_signed_manifest(tmp_path)
body = orjson.loads(manifest_path.read_bytes())
body["artifacts"]["calibration"]["path"] = "../calibration/int8.json"
new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n"
Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes)
priv = Ed25519PrivateKey.generate()
Sha256Sidecar.write_atomic(
manifest_path.parent / "Manifest.json.sig",
priv.sign(new_bytes),
)
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(priv.public_key(),),
)
# Assert
assert VerifyFailReason.SCHEMA_VIOLATION in result.fail_reasons
# ----------------------------------------------------------------------
# AC-8
# ----------------------------------------------------------------------
def test_ac8_multiple_fail_reasons_accumulate(tmp_path: Path) -> None:
# Arrange: 1 engine missing, 1 engine drifted, 1 engine OK
manifest_path, pub, _ = _build_signed_manifest(tmp_path)
cache_root = manifest_path.parent
body = orjson.loads(manifest_path.read_bytes())
# Delete first engine
first_engine_rel = body["artifacts"]["engines"][0]["path"]
(cache_root / first_engine_rel).unlink()
# Mutate second engine
second_engine_rel = body["artifacts"]["engines"][1]["path"]
(cache_root / second_engine_rel).write_bytes(b"drifted-bytes")
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.ARTIFACT_MISSING in result.fail_reasons
assert VerifyFailReason.ARTIFACT_HASH_MISMATCH in result.fail_reasons
engine_checks = [c for c in result.per_artifact_checks if "engines" in c.relative_path]
assert len(engine_checks) == 3
matched_flags = [c.matched for c in engine_checks]
assert matched_flags.count(True) == 1
assert matched_flags.count(False) == 2
# ----------------------------------------------------------------------
# AC-9
# ----------------------------------------------------------------------
def test_ac9_operator_mode_re_derives_tiles_coverage(tmp_path: Path) -> None:
# Arrange: build with tiles X; verify with tiles Y → mismatch
tiles_built = _make_tiles(10)
tiles_drifted = (
*tiles_built[:-1],
TileHashRecord(
zoom=18,
lat=50.99,
lon=36.99,
source="googlemaps",
sha256_hex=hashlib.sha256(b"drifted-tile").hexdigest(),
),
)
manifest_path, pub, _ = _build_signed_manifest(tmp_path, tiles=tiles_built)
verifier, _ = _build_verifier(tiles=tiles_drifted)
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.TILES_COVERAGE_MISMATCH in result.fail_reasons
assert any("tiles_coverage" in d for d in result.fail_details)
def test_ac9_operator_mode_pass_when_tiles_match(tmp_path: Path) -> None:
# Arrange
tiles = _make_tiles(10)
manifest_path, pub, _ = _build_signed_manifest(tmp_path, tiles=tiles)
verifier, _ = _build_verifier(tiles=tiles)
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
# Assert
assert result.outcome is VerifyOutcome.PASS
tiles_check = next(
c for c in result.per_artifact_checks if c.relative_path == "tiles_coverage"
)
assert tiles_check.matched is True
assert tiles_check.actual_sha256 == tiles_check.expected_sha256
# ----------------------------------------------------------------------
# AC-10
# ----------------------------------------------------------------------
def test_ac10_airborne_mode_trusts_tiles_coverage(tmp_path: Path) -> None:
# Arrange
manifest_path, pub, _ = _build_signed_manifest(tmp_path)
verifier, _ = _build_verifier() # tile_metadata_store=None
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
# Assert
assert result.outcome is VerifyOutcome.PASS
tiles_check = next(
c for c in result.per_artifact_checks if c.relative_path == "tiles_coverage"
)
assert tiles_check.matched is True
# ----------------------------------------------------------------------
# AC-11
# ----------------------------------------------------------------------
def test_ac11_protocol_conformance() -> None:
# Assert
verifier, _ = _build_verifier()
assert isinstance(verifier, ManifestVerifier)
# ----------------------------------------------------------------------
# AC-12
# ----------------------------------------------------------------------
def test_ac12_elapsed_ms_recorded_on_every_outcome(tmp_path: Path) -> None:
# Arrange
verifier, _ = _build_verifier()
manifest_path, pub, _ = _build_signed_manifest(tmp_path)
# Act
pass_result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
fail_result = verifier.verify_manifest(
manifest_path=tmp_path / "missing.json",
trusted_public_keys=(pub,),
)
# Assert
assert pass_result.elapsed_ms >= 0
assert fail_result.elapsed_ms >= 0
# ----------------------------------------------------------------------
# AC-13
# ----------------------------------------------------------------------
def test_ac13_empty_trusted_public_keys_fails_closed(tmp_path: Path) -> None:
# Arrange
verifier, records = _build_verifier()
manifest_path, _pub, _ = _build_signed_manifest(tmp_path)
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.UNTRUSTED_PUBLIC_KEY in result.fail_reasons
assert result.per_artifact_checks == ()
errors = [r for r in records if r.levelno == logging.ERROR]
assert any(r.__dict__.get("kind") == "c10.manifest.verify.untrusted" for r in errors)
# ----------------------------------------------------------------------
# AC-14
# ----------------------------------------------------------------------
def test_ac14_v10_manifest_without_flight_block_parses(tmp_path: Path) -> None:
# Arrange: take a built manifest and strip the entire `flight` block.
manifest_path, _pub, _ = _build_signed_manifest(tmp_path)
body = orjson.loads(manifest_path.read_bytes())
body.pop("flight", None)
body["schema_version"] = "1.0"
new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n"
Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes)
priv = Ed25519PrivateKey.generate()
Sha256Sidecar.write_atomic(
manifest_path.parent / "Manifest.json.sig",
priv.sign(new_bytes),
)
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(priv.public_key(),),
)
# Assert
assert result.outcome is VerifyOutcome.PASS
assert result.takeoff_origin is None
assert result.flight_id is None
# ----------------------------------------------------------------------
# AC-15
# ----------------------------------------------------------------------
def test_ac15_well_formed_in_bbox_takeoff_origin_passes(tmp_path: Path) -> None:
# Arrange
flight = uuid4()
origin = LatLonAlt(50.0, 36.2, 200.0)
manifest_path, pub, _ = _build_signed_manifest(
tmp_path, takeoff_origin=origin, flight_id=flight
)
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(pub,),
)
# Assert
assert result.outcome is VerifyOutcome.PASS
assert result.takeoff_origin == origin
assert result.flight_id == flight
# ----------------------------------------------------------------------
# AC-16
# ----------------------------------------------------------------------
def test_ac16_malformed_takeoff_origin_fails_closed(tmp_path: Path) -> None:
# Arrange
manifest_path, _pub, _ = _build_signed_manifest(tmp_path)
body = orjson.loads(manifest_path.read_bytes())
body["flight"] = {
"flight_id": str(uuid4()),
"takeoff_origin": {"lat_deg": 200.0, "lon_deg": 36.2, "alt_m": 100.0},
}
new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n"
Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes)
priv = Ed25519PrivateKey.generate()
Sha256Sidecar.write_atomic(
manifest_path.parent / "Manifest.json.sig",
priv.sign(new_bytes),
)
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(priv.public_key(),),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.TAKEOFF_ORIGIN_INVALID in result.fail_reasons
assert any("lat_deg" in d for d in result.fail_details)
# Diagnostics: takeoff_origin still populated even on FAIL
assert result.takeoff_origin is not None
# ----------------------------------------------------------------------
# AC-17
# ----------------------------------------------------------------------
def test_ac17_out_of_bbox_takeoff_origin_fails_closed(tmp_path: Path) -> None:
# Arrange
manifest_path, _pub, _ = _build_signed_manifest(tmp_path)
body = orjson.loads(manifest_path.read_bytes())
body["flight"] = {
"flight_id": str(uuid4()),
"takeoff_origin": {"lat_deg": 10.0, "lon_deg": 10.0, "alt_m": 0.0},
}
new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n"
Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes)
priv = Ed25519PrivateKey.generate()
Sha256Sidecar.write_atomic(
manifest_path.parent / "Manifest.json.sig",
priv.sign(new_bytes),
)
verifier, _ = _build_verifier()
# Act
result = verifier.verify_manifest(
manifest_path=manifest_path,
trusted_public_keys=(priv.public_key(),),
)
# Assert
assert result.outcome is VerifyOutcome.FAIL
assert VerifyFailReason.TAKEOFF_ORIGIN_OUT_OF_BBOX in result.fail_reasons
@@ -0,0 +1,88 @@
"""AZ-507 — Layer-0 typed-error envelope shim tests.
Covers AC-2 (shim re-exports resolve and are identical to the c7
canonical classes) and AC-1 / AC-5 (module-layout.md + architecture.md
documentation invariants).
"""
from __future__ import annotations
import re
from pathlib import Path
from gps_denied_onboard._types.inference_errors import (
CalibrationCacheError as ShimCalibrationCacheError,
)
from gps_denied_onboard._types.inference_errors import (
EngineBuildError as ShimEngineBuildError,
)
from gps_denied_onboard.components.c7_inference.errors import (
CalibrationCacheError as CanonicalCalibrationCacheError,
)
from gps_denied_onboard.components.c7_inference.errors import (
EngineBuildError as CanonicalEngineBuildError,
)
_REPO_ROOT = Path(__file__).resolve().parents[2]
def test_ac2_shim_engine_build_error_identity_with_canonical() -> None:
# The shim must NOT introduce a fresh subclass — consumers catching
# ShimEngineBuildError MUST catch what c7 actually raises.
assert ShimEngineBuildError is CanonicalEngineBuildError
def test_ac2_shim_calibration_cache_error_identity_with_canonical() -> None:
assert ShimCalibrationCacheError is CanonicalCalibrationCacheError
def test_ac2_shim_module_has_no_runtime_side_effects() -> None:
# Importing the shim must not register a component config block or
# otherwise mutate global state. Re-importing the module twice in the
# same process must remain a no-op.
import importlib
import gps_denied_onboard._types.inference_errors as shim
first = importlib.reload(shim)
second = importlib.reload(shim)
assert first.EngineBuildError is second.EngineBuildError
assert first.CalibrationCacheError is second.CalibrationCacheError
def test_ac1_module_layout_has_no_cross_component_public_api_imports() -> None:
# AZ-507's primary deliverable: every line that previously said
# `components.X (Public API)` in an "Imports from" line must now point
# at `_types` (or be removed). We grep for the offending pattern and
# assert zero matches.
layout = (
_REPO_ROOT / "_docs" / "02_document" / "module-layout.md"
).read_text(encoding="utf-8")
offenders = re.findall(
r"components\.[a-z0-9_]+\s*\(Public API\)",
layout,
)
assert offenders == [], (
"module-layout.md still names cross-component Public API imports "
f"after AZ-507: {offenders}"
)
def test_ac5_architecture_doc_codifies_cross_component_rule() -> None:
# AZ-507 AC-5: the architecture doc must carry a one-paragraph rule
# that cross-component imports go through `_types/*.py` (DTOs +
# typed-error envelopes), never `components.X (Public API)`.
arch = (
_REPO_ROOT / "_docs" / "02_document" / "architecture.md"
).read_text(encoding="utf-8")
# Look for the rule sentence (case-insensitive) and the explicit
# AZ-507 reference so future readers can trace the decision.
assert "AZ-507" in arch, "architecture.md must reference AZ-507"
assert re.search(
r"cross-component imports go through `_types",
arch,
flags=re.IGNORECASE,
), (
"architecture.md must state the cross-component import rule "
"introduced by AZ-507"
)