diff --git a/_docs/02_document/architecture.md b/_docs/02_document/architecture.md index fb53af5..1b9b926 100644 --- a/_docs/02_document/architecture.md +++ b/_docs/02_document/architecture.md @@ -610,6 +610,10 @@ This decision is made on **technical grounds only**. Component licenses (BSD/Apa - Per-component folders give each implementation a natural home for its own `tests/`, fixtures, and adapter-specific helpers — matching coderule.mdc's "logic specific to a platform, variant, or environment belongs in the class that owns that variant". - Adding a new C2 VPR backbone (e.g., a future foundation-model retrieval backbone via D-C2-12) is a folder-add + interface-conformance change; no other component is touched. +#### Cross-Component Contract Surface (AZ-507) + +The ADR-009 "interface, not concrete" rule has an architectural sibling: cross-component imports go through `_types/*.py` (DTOs + typed-error envelopes such as `_types.inference_errors`), never through `components.X (Public API)`. The only exception is `runtime_root/*` (the composition root), which is allowed to import concrete strategies across components precisely because it is the single place that resolves Protocol parameters to concrete classes. Every other module under `components/**/*.py` consumes cross-component contracts via (a) shared DTOs in `_types/*`, and (b) consumer-side structural `Protocol` cuts defined locally inside the consuming component (e.g. `c10_provisioning.engine_compiler.CompileEngineCallable` for the narrow `compile_engine` surface of the C7 InferenceRuntime). This is the same architectural property as constructor-injection-against-interface, applied to the import graph rather than the call graph. The AZ-270 `test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies` lint enforces this on every `components/**/*.py`; AZ-507 reconciles `module-layout.md` with the lint so the documentation and the build gate agree. + ### ADR-010 — Operator-planned mission is the cold-start trust anchor; FC GPS is secondary **Context**: The original cold-start design (AZ-419 / FT-P-11) assumed the FC EKF's last valid GPS fix is available at takeoff to seed C5. Field reality contradicts this: a UAV operating in a contested-EW environment may have GPS jammed **before** takeoff (the jamming radius reaches the launch site, the unit launches under a jammer's umbrella, etc.). In that case the FC EKF has no GPS fix to give, and the companion has nothing to anchor the initial pose to — the entire downstream pipeline (VIO bootstrap, VPR retrieval scope, satellite anchoring) collapses or runs blind. At the same time, the parent suite already requires the operator to author a route in the **Mission Planner UI** (`suite/ui`) and persist it to the **`flights` REST service** (`suite/flights`) before any flight runs. The waypoint ordering is operationally meaningful: waypoint[0] is the planned takeoff point. The operator therefore already declares the takeoff position with operationally relevant accuracy (typically a few tens of metres) hours before launch, in a context that has no dependency on GPS at all. This information is the natural cold-start trust anchor. diff --git a/_docs/02_document/module-layout.md b/_docs/02_document/module-layout.md index d5b7259..5490a82 100644 --- a/_docs/02_document/module-layout.md +++ b/_docs/02_document/module-layout.md @@ -19,6 +19,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec 6. The composition root is `src/gps_denied_onboard/runtime_root.py`. It is the ONLY place that may import concrete strategy implementations across components — every other cross-component dependency is constructor-injected against an interface (ADR-009). 7. Tests mirror the component graph 1:1 at `tests/unit//`. Cross-component scenarios live in `tests/integration/`, `tests/e2e/`, `tests/perf/`, `tests/security/`, `tests/resilience/`. 8. Build-time exclusion (ADR-002): each `/_native/` and the corresponding `cpp//` carry a CMake `BUILD_` flag. The composition root validator refuses to wire a strategy whose flag is OFF. +9. **AZ-507 cross-component contract surface** — the only places a `components//*.py` file may import are: its own subpackage (`gps_denied_onboard.components..*`), `_types/*`, `_types.inference_errors`, `helpers/*`, `config`, `logging`, `fdr_client`, `clock`, `frame_source` (interface only). Cross-component contracts (Protocols + typed exceptions) reach consumers through `_types/*` modules — DTOs in the canonical `_types` files (e.g. `_types.inference.EngineCacheEntry`), typed-error envelopes in `_types.inference_errors`, and consumer-side structural `Protocol` cuts defined locally inside each consuming component (e.g. `c10_provisioning.engine_compiler.CompileEngineCallable`). NEVER `from gps_denied_onboard.components. import ...` — the AZ-270 `test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies` lint enforces this on every `components/**/*.py`. The composition root (`runtime_root/*`) is the single exception; it wires concrete strategies into duck-typed Protocol parameters via constructor injection. This rule is the architectural contract paired with the AZ-270 lint; see `architecture.md` § Cross-Component Contract Surface for the rationale. ## Per-Component Mapping @@ -49,7 +50,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `ultra_vpr.py` (primary), `mega_loc.py`, `mix_vpr.py`, `sela_vpr.py`, `eigen_places.py`, `net_vlad.py`, `salad.py` - `_native/` - **Owns**: `src/gps_denied_onboard/components/c2_vpr/**`, `tests/unit/c2_vpr/**` -- **Imports from**: `_types`, `helpers.descriptor_normaliser`, `components.c6_tile_cache` (Public API only — TileStore query interface), `components.c7_inference` (InferenceRuntime), `config`, `logging`, `fdr_client` +- **Imports from**: `_types`, `helpers.descriptor_normaliser`, `config`, `logging`, `fdr_client`. The TileStore query surface (c6) and the InferenceRuntime surface (c7) are obtained via constructor-injected consumer-side structural Protocol cuts (see AZ-507 cross-component rule below); the composition root wires the concrete c6/c7 strategies in. NEVER `from gps_denied_onboard.components.c6_tile_cache import ...` or `from gps_denied_onboard.components.c7_inference import ...` inside `c2_vpr/*.py`. - **Consumed by**: `c2_5_rerank`, `runtime_root` ### Component: c2_5_rerank @@ -64,7 +65,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - **Internal**: - `inlier_based_reranker.py` (`InlierCountReRanker` — single-pair LightGlue inlier count K=10→N=3, AZ-343; module-level `create()` factory entry-point consumed by `runtime_root.rerank_factory.build_rerank_strategy`; gated by `BUILD_RERANK_INLIER_COUNT`) - **Owns**: `src/gps_denied_onboard/components/c2_5_rerank/**`, `src/gps_denied_onboard/runtime_root/rerank_factory.py`, `tests/unit/c2_5_rerank/**` -- **Imports from**: `_types`, `helpers.lightglue_runtime`, `helpers.feature_extractor` (AZ-343 scope expansion), `helpers.descriptor_normaliser`, `helpers.ransac_filter`, `helpers.se3_utils`, `components.c6_tile_cache` (Public API only — `TileStore`, `TilePixelHandle`, `TileCacheError` family), `clock`, `config`, `logging`, `fdr_client` +- **Imports from**: `_types`, `helpers.lightglue_runtime`, `helpers.feature_extractor` (AZ-343 scope expansion), `helpers.descriptor_normaliser`, `helpers.ransac_filter`, `helpers.se3_utils`, `clock`, `config`, `logging`, `fdr_client`. The `TileStore`, `TilePixelHandle`, and `TileCacheError` family from c6 are obtained via constructor-injected consumer-side structural Protocol cuts + shared DTOs in `_types` (see AZ-507 cross-component rule below); composition root wires the concrete c6 strategy in. NEVER `from gps_denied_onboard.components.c6_tile_cache import ...` inside `c2_5_rerank/*.py`. - **Consumed by**: `c3_matcher`, `runtime_root` ### Component: c3_matcher @@ -83,7 +84,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `xfeat.py` (XFeat, AZ-347) - `_native/` - **Owns**: `src/gps_denied_onboard/components/c3_matcher/**`, `tests/unit/c3_matcher/**`, `src/gps_denied_onboard/runtime_root/matcher_factory.py` -- **Imports from**: `_types`, `helpers.lightglue_runtime` (R14: SHARED with C2.5 — owned by helper, NOT by C3), `helpers.ransac_filter`, `helpers.descriptor_normaliser`, `helpers.se3_utils`, `components.c7_inference`, `config`, `logging`, `fdr_client` +- **Imports from**: `_types`, `helpers.lightglue_runtime` (R14: SHARED with C2.5 — owned by helper, NOT by C3), `helpers.ransac_filter`, `helpers.descriptor_normaliser`, `helpers.se3_utils`, `config`, `logging`, `fdr_client`. The InferenceRuntime surface (c7) is obtained via a constructor-injected consumer-side structural Protocol cut (see AZ-507 cross-component rule below); composition root wires the concrete c7 strategy in. NEVER `from gps_denied_onboard.components.c7_inference import ...` inside `c3_matcher/*.py`. - **Consumed by**: `c3_5_adhop`, `runtime_root` ### Component: c3_5_adhop @@ -99,7 +100,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `passthrough_refiner.py` (reference baseline; AZ-348) - `adhop_refiner.py` (production-default; AZ-349 pending) - **Owns**: `src/gps_denied_onboard/components/c3_5_adhop/**`, `tests/unit/c3_5_adhop/**`, `src/gps_denied_onboard/runtime_root/refiner_factory.py` -- **Imports from**: `_types`, `helpers.ransac_filter` (R14: SHARED with C3 and C4 — owned by helper, NOT by C3.5), `helpers.se3_utils`, `components.c7_inference`, `config`, `logging`, `fdr_client` +- **Imports from**: `_types`, `helpers.ransac_filter` (R14: SHARED with C3 and C4 — owned by helper, NOT by C3.5), `helpers.se3_utils`, `config`, `logging`, `fdr_client`. The InferenceRuntime surface (c7) is obtained via a constructor-injected consumer-side structural Protocol cut (see AZ-507 cross-component rule below); composition root wires the concrete c7 strategy in. NEVER `from gps_denied_onboard.components.c7_inference import ...` inside `c3_5_adhop/*.py`. - **Consumed by**: `c4_pose`, `runtime_root` ### Component: c4_pose @@ -130,7 +131,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `eskf_baseline.py` (mandatory simple-baseline) - `_native/` - **Owns**: `src/gps_denied_onboard/components/c5_state/**`, `cpp/gtsam_bindings/**` (primary owner; see joint-native note above), `tests/unit/c5_state/**` -- **Imports from**: `_types`, `helpers.imu_preintegrator`, `helpers.se3_utils`, `helpers.wgs_converter`, `components.c4_pose` (Public API: `PoseEstimate`), `config`, `logging`, `fdr_client` +- **Imports from**: `_types` (`PoseEstimate` DTO lives here), `helpers.imu_preintegrator`, `helpers.se3_utils`, `helpers.wgs_converter`, `config`, `logging`, `fdr_client`. NEVER `from gps_denied_onboard.components.c4_pose import ...` inside `c5_state/*.py` — the `PoseEstimate` DTO is consumed exclusively via `_types`. - **Consumed by**: `c8_fc_adapter`, `c13_fdr`, `runtime_root` ### Component: c6_tile_cache @@ -198,7 +199,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `tlog_replay_adapter.py` (replay-only `FcAdapter`; gated `BUILD_TLOG_REPLAY_ADAPTER`; AZ-265) - `replay_sink.py` (`ReplaySink` interface + `JsonlReplaySink` impl; gated `BUILD_REPLAY_SINK_JSONL`; AZ-265) - **Owns**: `src/gps_denied_onboard/components/c8_fc_adapter/**`, `tests/unit/c8_fc_adapter/**` -- **Imports from**: `_types`, `helpers.wgs_converter`, `helpers.se3_utils`, `components.c5_state` (Public API: `EstimatorOutput`), `config`, `logging`, `fdr_client`, `clock` (for replay timer-injection) +- **Imports from**: `_types` (`EstimatorOutput` DTO lives here), `helpers.wgs_converter`, `helpers.se3_utils`, `config`, `logging`, `fdr_client`, `clock` (for replay timer-injection). NEVER `from gps_denied_onboard.components.c5_state import ...` inside `c8_fc_adapter/*.py` — the `EstimatorOutput` DTO is consumed exclusively via `_types`. - **Consumed by**: `c1_vio` (back-channel: ImuSample, AttitudeWindow), `c5_state` (back-channel: ImuSample, FlightStateSignal, GpsHealth), `runtime_root` (live + operator + replay binaries) > **Back-channel note**: C8 is the source of inbound IMU / attitude / GPS-health signals from the FC. C1 and C5 receive these via constructor-injected `FcAdapter` (typed against the interface, not the concrete adapter). This is NOT a layering violation — C8's role spans both the outbound emit path AND the inbound telemetry source. @@ -217,7 +218,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `default_provisioner.py` (engine compile + descriptors + manifest + content-hash gate, pending) - Composition root: `runtime_root/c10_factory.py` (`build_engine_compiler`, `build_backbone_specs`) - **Owns**: `src/gps_denied_onboard/components/c10_provisioning/**`, `tests/unit/c10_provisioning/**` -- **Imports from**: `_types`, `helpers.sha256_sidecar`, `helpers.engine_filename_schema`, `helpers.wgs_converter`, `components.c6_tile_cache` (Public API), `components.c7_inference` (Public API: engine compile surface), `config`, `logging`, `fdr_client` +- **Imports from**: `_types` (cross-component DTOs `EngineCacheEntry`, `BuildConfig`, `PrecisionMode`, `OptimizationProfile`, `HostCapabilities`, `TileMetadata`, etc.), `_types.inference_errors` (AZ-507 typed-error envelope for `EngineBuildError` + `CalibrationCacheError`), `helpers.sha256_sidecar`, `helpers.engine_filename_schema`, `helpers.wgs_converter`, `config`, `logging`, `fdr_client`. The `InferenceRuntime.compile_engine` surface (c7) and the `TileMetadataStore.query_by_bbox` surface (c6) are obtained via constructor-injected consumer-side structural Protocol cuts (the `CompileEngineCallable` cut already lives in `engine_compiler.py`; AZ-323 / AZ-324 will define analogous `query_by_bbox` cuts inside `c10_provisioning/`). NEVER `from gps_denied_onboard.components.c6_tile_cache import ...` or `from gps_denied_onboard.components.c7_inference import ...` inside `c10_provisioning/*.py`. - **Consumed by**: `c12_operator_tooling`, `runtime_root` (operator binary only — excluded from airborne via `BUILD_C10_PROVISIONING=OFF` for airborne build per ADR-002) ### Component: c11_tile_manager @@ -231,7 +232,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `satellite_provider_downloader.py` (REST client against parent-suite `satellite-provider`) - `satellite_provider_uploader.py` (post-landing batch upload, D-PROJ-2 ingest contract) - **Owns**: `src/gps_denied_onboard/components/c11_tile_manager/**`, `tests/unit/c11_tile_manager/**` -- **Imports from**: `_types`, `helpers.sha256_sidecar`, `helpers.wgs_converter`, `components.c6_tile_cache` (Public API), `config`, `logging`, `fdr_client` +- **Imports from**: `_types`, `helpers.sha256_sidecar`, `helpers.wgs_converter`, `config`, `logging`, `fdr_client`. The c6 storage surface (`TileStore`, `TileMetadataStore`) is obtained via constructor-injected consumer-side structural Protocol cuts (see AZ-507 cross-component rule below); composition root wires the concrete c6 strategy in. NEVER `from gps_denied_onboard.components.c6_tile_cache import ...` inside `c11_tile_manager/*.py`. - **Consumed by**: `c12_operator_tooling`, `runtime_root` (operator binary only — `BUILD_C11_TILE_MANAGER=OFF` for airborne) ### Component: c12_operator_tooling @@ -246,7 +247,7 @@ Bootstrap reference: `_docs/02_tasks/todo/AZ-263_initial_structure.md`. Architec - `operator_reloc_service.py` (CLI; GUI deferred per epic) - `sector_classifier.py` (operator sets `SectorClassification` → C6) - **Owns**: `src/gps_denied_onboard/components/c12_operator_tooling/**`, `tests/unit/c12_operator_tooling/**` -- **Imports from**: `_types`, `helpers.wgs_converter`, `components.c6_tile_cache` (Public API), `components.c10_provisioning` (Public API), `components.c11_tile_manager` (Public API), `config`, `logging`, `fdr_client` +- **Imports from**: `_types`, `helpers.wgs_converter`, `config`, `logging`, `fdr_client`. The c6 / c10 / c11 surfaces (`TileStore`, `TileMetadataStore`, `CacheProvisioner`, `TileDownloader`, `TileUploader`) are obtained via constructor-injected consumer-side structural Protocol cuts (see AZ-507 cross-component rule below); composition root wires the concrete c6/c10/c11 strategies in. NEVER `from gps_denied_onboard.components.c6_tile_cache import ...`, `from gps_denied_onboard.components.c10_provisioning import ...`, or `from gps_denied_onboard.components.c11_tile_manager import ...` inside `c12_operator_tooling/*.py`. - **Consumed by**: `runtime_root` (operator binary only — `BUILD_C12_OPERATOR_TOOLING=OFF` for airborne) ### Component: c13_fdr diff --git a/_docs/02_tasks/todo/AZ-323_c10_manifest_builder.md b/_docs/02_tasks/done/AZ-323_c10_manifest_builder.md similarity index 100% rename from _docs/02_tasks/todo/AZ-323_c10_manifest_builder.md rename to _docs/02_tasks/done/AZ-323_c10_manifest_builder.md diff --git a/_docs/02_tasks/todo/AZ-324_c10_manifest_verifier.md b/_docs/02_tasks/done/AZ-324_c10_manifest_verifier.md similarity index 100% rename from _docs/02_tasks/todo/AZ-324_c10_manifest_verifier.md rename to _docs/02_tasks/done/AZ-324_c10_manifest_verifier.md diff --git a/_docs/02_tasks/todo/AZ-507_hygiene_module_layout_az270_alignment.md b/_docs/02_tasks/done/AZ-507_hygiene_module_layout_az270_alignment.md similarity index 100% rename from _docs/02_tasks/todo/AZ-507_hygiene_module_layout_az270_alignment.md rename to _docs/02_tasks/done/AZ-507_hygiene_module_layout_az270_alignment.md diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 5bb1862..45bc5dc 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 4 - name: assign-file-ownership - detail: "batch 34 selected (4 tasks, 13 pts): AZ-507 (F1 hygiene 2pt) + AZ-306 (C6 FAISS 5pt) + AZ-323 (C10 Manifest 3pt) + AZ-324 (C10 Verifier 3pt)" + phase: 11 + name: awaiting-commit-approval + detail: "batch 34 code+tests done; pending commit + Jira In Testing; AZ-306 deferred to batch 35" retry_count: 0 cycle: 1 tracker: jira diff --git a/pyproject.toml b/pyproject.toml index 798d587..ae29bc0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,6 +56,13 @@ dependencies = [ # stable, sub-microsecond point-in-rect queries at the few-hundred- # sector scale operators ship per flight (NFR p99 ≤ 100 µs). "rtree>=1.0,<2.0", + # Ed25519 keypair generation + detached signing for AZ-323 C10 + # ManifestBuilder + AZ-318 C11 per-flight signing key. Pinned here + # because AZ-323 is the first concrete consumer; AZ-318 inherits + # the pin when it lands. Major-version bound (<46) follows the + # standard "two majors of compatibility" pattern other deps in + # this file use. + "cryptography>=43.0,<46.0", ] [project.optional-dependencies] diff --git a/src/gps_denied_onboard/_types/inference_errors.py b/src/gps_denied_onboard/_types/inference_errors.py new file mode 100644 index 0000000..7009791 --- /dev/null +++ b/src/gps_denied_onboard/_types/inference_errors.py @@ -0,0 +1,30 @@ +"""Layer-0 typed-error envelope for the C7 inference runtime family (AZ-507). + +This shim lets cross-component consumers (notably the C10 +``engine_compiler`` cache-orchestration path) catch the documented +:class:`EngineBuildError` / :class:`CalibrationCacheError` types without +importing from ``gps_denied_onboard.components.c7_inference`` directly — +which the AZ-270 ``test_ac6_only_compose_root_imports_concrete_strategies`` +lint forbids for every module under ``components/**/*.py``. + +The canonical definitions stay in +:mod:`gps_denied_onboard.components.c7_inference.errors`; we re-export +them here as identity-preserving aliases. Consumers catch the typed +union and unknown exceptions still propagate unchanged. + +Import-only: this module does not register, instantiate, or run any +component-load side effects (no +:func:`register_component_block` call, no module-load logging). +""" + +from __future__ import annotations + +from gps_denied_onboard.components.c7_inference.errors import ( + CalibrationCacheError, + EngineBuildError, +) + +__all__ = [ + "CalibrationCacheError", + "EngineBuildError", +] diff --git a/src/gps_denied_onboard/components/c10_provisioning/__init__.py b/src/gps_denied_onboard/components/c10_provisioning/__init__.py index 88020d6..b686873 100644 --- a/src/gps_denied_onboard/components/c10_provisioning/__init__.py +++ b/src/gps_denied_onboard/components/c10_provisioning/__init__.py @@ -13,35 +13,79 @@ from gps_denied_onboard._types.inference import EngineCacheEntry from gps_denied_onboard._types.manifests import Manifest from gps_denied_onboard.components.c10_provisioning.config import ( BackboneConfig, + C10ManifestConfig, C10ProvisioningConfig, + SigningMode, ) from gps_denied_onboard.components.c10_provisioning.engine_compiler import ( BackboneSpec, CompileEngineCallable, CompileOutcome, + EngineCompiler, EngineCompileRequest, EngineCompileResult, EngineCompileSummary, - EngineCompiler, +) +from gps_denied_onboard.components.c10_provisioning.errors import ( + C10ProvisioningError, + ManifestWriteError, ) from gps_denied_onboard.components.c10_provisioning.interface import ( CacheProvisioner, + ManifestSigner, + SigningKeyHandle, +) +from gps_denied_onboard.components.c10_provisioning.manifest_builder import ( + VALID_SECTOR_CLASSES, + Ed25519ManifestSigner, + ManifestArtifact, + ManifestBuilder, + ManifestBuildInput, + TileHashRecord, + TilesByBboxQuery, +) +from gps_denied_onboard.components.c10_provisioning.manifest_verifier import ( + ArtifactCheck, + ManifestVerifier, + ManifestVerifierImpl, + VerificationResult, + VerifyFailReason, + VerifyOutcome, ) from gps_denied_onboard.config.schema import register_component_block register_component_block("c10_provisioning", C10ProvisioningConfig) __all__ = [ + "VALID_SECTOR_CLASSES", + "ArtifactCheck", "BackboneConfig", "BackboneSpec", + "C10ManifestConfig", "C10ProvisioningConfig", + "C10ProvisioningError", "CacheProvisioner", "CompileEngineCallable", "CompileOutcome", + "Ed25519ManifestSigner", "EngineCacheEntry", "EngineCompileRequest", "EngineCompileResult", "EngineCompileSummary", "EngineCompiler", "Manifest", + "ManifestArtifact", + "ManifestBuildInput", + "ManifestBuilder", + "ManifestSigner", + "ManifestVerifier", + "ManifestVerifierImpl", + "ManifestWriteError", + "SigningKeyHandle", + "SigningMode", + "TileHashRecord", + "TilesByBboxQuery", + "VerificationResult", + "VerifyFailReason", + "VerifyOutcome", ] diff --git a/src/gps_denied_onboard/components/c10_provisioning/config.py b/src/gps_denied_onboard/components/c10_provisioning/config.py index 132ff73..7bf3c77 100644 --- a/src/gps_denied_onboard/components/c10_provisioning/config.py +++ b/src/gps_denied_onboard/components/c10_provisioning/config.py @@ -1,4 +1,4 @@ -"""C10 cache-provisioning config block (AZ-321). +"""C10 cache-provisioning config block (AZ-321, extended by AZ-323). Registered into ``config.components['c10_provisioning']`` by the package ``__init__.py``. The composition-root factory @@ -7,6 +7,10 @@ reads this block to enumerate the project's backbones and to bound the workspace memory passed to :meth:`InferenceRuntime.compile_engine`. +AZ-323 extends the block with a nested :class:`C10ManifestConfig` +that drives the operator-mode signing-key allowlist gate +(C10-ST-01) and pins the Manifest schema version. + Backbone enumeration is config-driven (not hardcoded) so a new model is a YAML change rather than a code change — see the AZ-321 task spec §Constraints. @@ -15,16 +19,89 @@ spec §Constraints. from __future__ import annotations from dataclasses import dataclass, field +from enum import Enum from gps_denied_onboard.config.schema import ConfigError __all__ = [ "BackboneConfig", + "C10ManifestConfig", "C10ProvisioningConfig", + "SigningMode", ] _DEFAULT_WORKSPACE_MB: int = 4096 +_DEFAULT_MANIFEST_SCHEMA_VERSION: str = "1.1" + + +class SigningMode(str, Enum): + """C10 Manifest signing-mode (AZ-323, C10-ST-01). + + ``OPERATOR``: production — the key fingerprint MUST be in + :attr:`C10ManifestConfig.allowed_operator_fingerprints` or the + build fails closed with :class:`ManifestWriteError`. + + ``DEV``: development / research — any key is accepted; an + operator-allowlisted key used in this mode emits a WARN log. + """ + + OPERATOR = "operator" + DEV = "dev" + + +@dataclass(frozen=True) +class C10ManifestConfig: + """Sub-block driving :class:`ManifestBuilder` policy (AZ-323). + + ``signing_mode`` controls the operator-key allowlist gate + (C10-ST-01). ``allowed_operator_fingerprints`` is the SHA-256 hex + of the raw 32-byte Ed25519 public key — 64 lowercase hex chars per + entry. ``schema_version`` is written into the Manifest body so the + AZ-324 verifier knows which optional blocks (e.g. ``flight``, + added in ADR-010 / v1.1) to expect. + """ + + signing_mode: SigningMode = SigningMode.DEV + allowed_operator_fingerprints: tuple[str, ...] = () + schema_version: str = _DEFAULT_MANIFEST_SCHEMA_VERSION + + def __post_init__(self) -> None: + if not self.schema_version: + raise ConfigError( + "C10ManifestConfig.schema_version must be a non-empty string" + ) + seen: set[str] = set() + for fp in self.allowed_operator_fingerprints: + if not isinstance(fp, str): + raise ConfigError( + "C10ManifestConfig.allowed_operator_fingerprints entries " + f"must be strings; got {type(fp).__name__}" + ) + if len(fp) != 64: + raise ConfigError( + "C10ManifestConfig.allowed_operator_fingerprints entries " + f"must be 64-hex-char SHA-256 digests; got {len(fp)} chars " + f"for {fp!r}" + ) + if fp.lower() != fp: + raise ConfigError( + "C10ManifestConfig.allowed_operator_fingerprints entries " + f"must be lowercase hex; got {fp!r}" + ) + try: + int(fp, 16) + except ValueError as exc: + raise ConfigError( + "C10ManifestConfig.allowed_operator_fingerprints contains " + f"non-hex entry {fp!r}" + ) from exc + if fp in seen: + raise ConfigError( + "C10ManifestConfig.allowed_operator_fingerprints contains " + f"duplicate fingerprint {fp!r}" + ) + seen.add(fp) @dataclass(frozen=True) @@ -88,10 +165,16 @@ class C10ProvisioningConfig: into :class:`BuildConfig`; defaults to 4 GiB which matches the C7 NFT-LIM-01 GPU memory budget. Operators can dial it down for Tier-2 compile workstations with less GPU memory. + + ``manifest`` carries the AZ-323 Manifest-builder policy + (signing mode, allowed operator fingerprints, schema version). + Defaulted to dev-mode with no allowlist so unit tests + replay + runs that don't build Manifests stay no-op. """ backbones: tuple[BackboneConfig, ...] = field(default_factory=tuple) workspace_mb: int = _DEFAULT_WORKSPACE_MB + manifest: C10ManifestConfig = field(default_factory=C10ManifestConfig) def __post_init__(self) -> None: if self.workspace_mb <= 0: diff --git a/src/gps_denied_onboard/components/c10_provisioning/engine_compiler.py b/src/gps_denied_onboard/components/c10_provisioning/engine_compiler.py index 10e937e..b5fa5bd 100644 --- a/src/gps_denied_onboard/components/c10_provisioning/engine_compiler.py +++ b/src/gps_denied_onboard/components/c10_provisioning/engine_compiler.py @@ -52,6 +52,10 @@ from gps_denied_onboard._types.inference import ( OptimizationProfile, PrecisionMode, ) +from gps_denied_onboard._types.inference_errors import ( + CalibrationCacheError, + EngineBuildError, +) from gps_denied_onboard._types.manifests import HostCapabilities from gps_denied_onboard.helpers.engine_filename_schema import ( EngineFilenameSchema, @@ -275,14 +279,14 @@ class EngineCompiler: entry = self._runtime.compile_engine( backbone.onnx_path, build_config ) - except Exception as exc: - # The C7 InferenceRuntime contract scopes exceptions to its - # `RuntimeError` family (`EngineBuildError`, - # `CalibrationCacheError`, ...). The c10 layer is forbidden - # from importing the c7 errors module (architecture rule - # AC-6 / test_az270_compose_root.test_ac6); we catch the - # broader `Exception` and dispatch by class name in the log - # payload. Re-raising preserves the original type. + except (EngineBuildError, CalibrationCacheError) as exc: + # AZ-507 narrowed the catch to the documented C7 typed-error + # envelope (`_types/inference_errors.py` re-exports + # `EngineBuildError` + `CalibrationCacheError` from + # `c7_inference.errors` without violating the AZ-270 lint). + # Unknown exceptions intentionally propagate unhandled — they + # are programmer errors, not C7 contract failures, and must + # not be swallowed under a structured "compile.error" log. self._log.error( "c10.engine.compile.error", extra={ diff --git a/src/gps_denied_onboard/components/c10_provisioning/errors.py b/src/gps_denied_onboard/components/c10_provisioning/errors.py new file mode 100644 index 0000000..985d18d --- /dev/null +++ b/src/gps_denied_onboard/components/c10_provisioning/errors.py @@ -0,0 +1,38 @@ +"""C10 cache-provisioning error family. + +Rooted at :class:`C10ProvisioningError`; today the family contains +:class:`ManifestWriteError` (AZ-323) covering signing-key load failure, +fingerprint-allowlist rejection, and any I/O failure path during +``ManifestBuilder.build_manifest``. AZ-324 / AZ-325 add additional +subtypes (``ManifestVerifierError``, ``ManifestCoverageError``, +``ContentHashMismatchError``) under the same root as they land. +""" + +from __future__ import annotations + +__all__ = [ + "C10ProvisioningError", + "ManifestWriteError", +] + + +class C10ProvisioningError(Exception): + """Base class for the C10 cache-provisioning error family.""" + + +class ManifestWriteError(C10ProvisioningError): + """``ManifestBuilder.build_manifest`` could not produce a signed Manifest. + + Surfaces three failure modes: + + 1. Operator-mode signing key fingerprint not in the configured + allowlist (C10-ST-01). + 2. Signing key file unreadable or malformed PEM (the underlying + ``cryptography`` exception is chained via ``__cause__``). + 3. Any underlying atomic-write / sidecar failure during Manifest + or signature emission. + + Callers catch this single envelope; the structured `kind= + "c10.manifest.build.error"` log payload (set by ``ManifestBuilder``) + carries the discriminator field. + """ diff --git a/src/gps_denied_onboard/components/c10_provisioning/interface.py b/src/gps_denied_onboard/components/c10_provisioning/interface.py index ea11d99..d4b7694 100644 --- a/src/gps_denied_onboard/components/c10_provisioning/interface.py +++ b/src/gps_denied_onboard/components/c10_provisioning/interface.py @@ -1,4 +1,8 @@ -"""C10 `CacheProvisioner` Protocol. +"""C10 Public-API Protocols. + +- :class:`CacheProvisioner` (AZ-325, pending) — pre-flight orchestrator. +- :class:`ManifestSigner` (AZ-323) — Ed25519 detached signing surface + consumed by :class:`ManifestBuilder`. Concrete impl: engine compile + descriptors + manifest + content-hash gate. See `_docs/02_document/components/11_c10_provisioning/`. @@ -7,12 +11,58 @@ Concrete impl: engine compile + descriptors + manifest + content-hash gate. See from __future__ import annotations from pathlib import Path -from typing import Protocol +from typing import Protocol, runtime_checkable from gps_denied_onboard._types.manifests import Manifest +__all__ = [ + "CacheProvisioner", + "ManifestSigner", + "SigningKeyHandle", +] + class CacheProvisioner(Protocol): """Pre-flight cache provisioning (engine compile + descriptor batch + manifest).""" def provision(self, flight_id: str, output_root: Path) -> Manifest: ... + + +class SigningKeyHandle(Protocol): + """Opaque handle returned by :meth:`ManifestSigner.load_signing_key`. + + The Protocol intentionally exposes no methods — concrete signers + (e.g. :class:`Ed25519ManifestSigner`) hold the actual key behind + this marker so the caller can pass it back into :meth:`sign` / + :meth:`public_key_fingerprint` without ever touching the secret + material. + """ + + +@runtime_checkable +class ManifestSigner(Protocol): + """Detached-signature provider for :class:`ManifestBuilder` (AZ-323). + + Default impl is :class:`Ed25519ManifestSigner` using + ``cryptography.hazmat.primitives.asymmetric.ed25519``; tests + inject a deterministic in-memory keypair. + + Contract: + - :meth:`load_signing_key` takes a path to an operator-supplied + PEM-encoded PKCS8 Ed25519 private key, returns an opaque + :class:`SigningKeyHandle`. Format errors raise + :class:`gps_denied_onboard.components.c10_provisioning.errors.ManifestWriteError` + with the underlying ``cryptography`` exception chained via + ``__cause__``. + - :meth:`sign` produces a 64-byte raw Ed25519 signature over the + payload bytes. Re-entry-safe; a single handle may be used to + sign many payloads. + - :meth:`public_key_fingerprint` returns the SHA-256 hex digest of + the raw 32-byte public key (operator-mode allowlist key). + """ + + def load_signing_key(self, key_path: Path) -> SigningKeyHandle: ... + + def sign(self, key: SigningKeyHandle, payload_bytes: bytes) -> bytes: ... + + def public_key_fingerprint(self, key: SigningKeyHandle) -> str: ... diff --git a/src/gps_denied_onboard/components/c10_provisioning/manifest_builder.py b/src/gps_denied_onboard/components/c10_provisioning/manifest_builder.py new file mode 100644 index 0000000..e4f3d69 --- /dev/null +++ b/src/gps_denied_onboard/components/c10_provisioning/manifest_builder.py @@ -0,0 +1,675 @@ +"""C10 ManifestBuilder + Ed25519ManifestSigner (AZ-323). + +Produces the signed cache Manifest covering every shipped artifact +plus the build-identity tuple whose canonical hash (``manifest_hash``) +is the D-C10-1 idempotence key. Implements the AC-NEW-1 trust chain +(takeoff arming refuses to deserialize engines before a Manifest +verify succeeds — AZ-324 owns the verify; this task owns the build). + +Cross-component DTOs (``LatLonAlt``, ``BoundingBox``) come from +``_types/geo.py``; engine entries from ``_types/inference.py``; +the ``Manifest`` placeholder DTO from ``_types/manifests.py``. No +direct ``components.X`` imports — the AZ-507 lint forbids it. +""" + +from __future__ import annotations + +import hashlib +import logging +import time +from collections.abc import Iterable +from dataclasses import dataclass, field +from pathlib import Path +from typing import Protocol, runtime_checkable +from uuid import UUID + +import orjson +from cryptography.exceptions import UnsupportedAlgorithm +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) +from cryptography.hazmat.primitives.serialization import load_pem_private_key + +from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt +from gps_denied_onboard._types.inference import EngineCacheEntry +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.components.c10_provisioning.config import ( + C10ManifestConfig, + SigningMode, +) +from gps_denied_onboard.components.c10_provisioning.errors import ( + ManifestWriteError, +) +from gps_denied_onboard.components.c10_provisioning.interface import ( + ManifestSigner, + SigningKeyHandle, +) +from gps_denied_onboard.helpers.sha256_sidecar import ( + Sha256Sidecar, + Sha256SidecarError, +) + +__all__ = [ + "VALID_SECTOR_CLASSES", + "Ed25519ManifestSigner", + "ManifestArtifact", + "ManifestBuildInput", + "ManifestBuilder", + "TileHashRecord", + "TilesByBboxQuery", +] + +_BUILD_LOG_KIND_PREFIX = "c10.manifest" +_TAKEOFF_ORIGIN_DECIMALS = 9 +_MANIFEST_FILENAME = "Manifest.json" +_SIGNATURE_FILENAME = "Manifest.json.sig" +_ED25519_PUBKEY_BYTES = 32 +_ED25519_SIG_BYTES = 64 + +VALID_SECTOR_CLASSES: frozenset[str] = frozenset( + {"active_conflict", "stable_rear"} +) + + +@dataclass(frozen=True) +class TileHashRecord: + """Consumer-side DTO carrying the four sort keys + the per-tile digest. + + AZ-323 only needs ``(zoom, lat, lon, source)`` for canonical + ordering and ``sha256_hex`` for the aggregate hash. The + composition-root adapter wraps C6's ``TileMetadata`` rows into + this shape so the AZ-270 lint stays green (no + ``components.c6_tile_cache`` import from C10). + """ + + zoom: int + lat: float + lon: float + source: str + sha256_hex: str + + +@runtime_checkable +class TilesByBboxQuery(Protocol): + """Consumer-side structural cut over C6's ``TileMetadataStore``. + + The composition root adapts + :class:`gps_denied_onboard.components.c6_tile_cache.TileMetadataStore` + by translating its ``query_by_bbox`` return value into a tuple of + :class:`TileHashRecord`. C10 depends on THIS Protocol so + ``components/c10_provisioning/*`` never imports ``components.c6_*`` + (AZ-270 + AZ-507 boundary). + """ + + def query_by_bbox( + self, + *, + bbox: BoundingBox, + zoom_levels: tuple[int, ...], + sector_class: str, + ) -> Iterable[TileHashRecord]: ... + + +@dataclass(frozen=True) +class ManifestBuildInput: + """Frozen call argument for :meth:`ManifestBuilder.build_manifest`. + + Per the AZ-323 spec ``sector_class`` is the c6 enum's ``.value`` + string ('active_conflict' / 'stable_rear'); the composition root + translates the C6 enum to its string form before injecting so + C10 stays free of the C6 import. + + ``takeoff_origin`` + ``flight_id`` are the ADR-010 / AZ-489 + pass-through fields — when supplied they are both baked into the + Manifest body AND fed into the ``manifest_hash`` so a re-planned + flight produces a fresh cache identity (AC-15 / AC-16). + """ + + cache_root: Path + bbox: BoundingBox + zoom_levels: tuple[int, ...] + sector_class: str + engine_entries: tuple[EngineCacheEntry, ...] + descriptor_index_path: Path + calibration_path: Path + key_path: Path + takeoff_origin: LatLonAlt | None = None + flight_id: UUID | None = None + + +@dataclass(frozen=True) +class ManifestArtifact: + """Return value of :meth:`ManifestBuilder.build_manifest`. + + ``manifest_hash`` is the D-C10-1 idempotence key — derived from + the build identity tuple, NOT from the Manifest file bytes (which + include ``built_at`` and so differ across runs). The Manifest + file's own SHA-256 lives on disk as ``Manifest.json.sha256`` per + AC-11. + """ + + manifest_path: Path + signature_path: Path + manifest_hash: str + signing_public_key_fingerprint: str + total_artifacts_listed: int + + +@dataclass(frozen=True) +class _Ed25519SigningKey(SigningKeyHandle): + """Opaque handle wrapping a ``cryptography`` Ed25519 private key. + + Frozen so callers cannot mutate the key in flight; the underlying + ``Ed25519PrivateKey`` object stays in the dataclass field but is + not exposed by name on :class:`SigningKeyHandle`. + """ + + private_key: Ed25519PrivateKey = field(repr=False) + public_key_raw: bytes + fingerprint_hex: str + + +class Ed25519ManifestSigner: + """Default :class:`ManifestSigner` impl backed by ``cryptography``. + + Loads PEM-encoded PKCS8 Ed25519 private keys per AZ-323 Risk 4 — + other formats (OpenSSH, raw 32-byte) raise + :class:`ManifestWriteError` with the underlying ``cryptography`` + exception chained via ``__cause__`` (AC-9). + """ + + def load_signing_key(self, key_path: Path) -> SigningKeyHandle: + try: + pem_bytes = key_path.read_bytes() + except (OSError, FileNotFoundError) as exc: + raise ManifestWriteError( + f"operator signing key load failed: cannot read {key_path}: {exc}" + ) from exc + + try: + private_key = load_pem_private_key(pem_bytes, password=None) + except (ValueError, TypeError, UnsupportedAlgorithm) as exc: + raise ManifestWriteError( + f"operator signing key load failed: malformed PEM at {key_path}: {exc}" + ) from exc + + if not isinstance(private_key, Ed25519PrivateKey): + raise ManifestWriteError( + "operator signing key load failed: not an Ed25519 private key " + f"(got {type(private_key).__name__}); AZ-323 supports Ed25519 only" + ) + + public_key = private_key.public_key() + public_raw = _ed25519_public_raw(public_key) + return _Ed25519SigningKey( + private_key=private_key, + public_key_raw=public_raw, + fingerprint_hex=hashlib.sha256(public_raw).hexdigest(), + ) + + def sign(self, key: SigningKeyHandle, payload_bytes: bytes) -> bytes: + handle = _require_ed25519_handle(key) + signature = handle.private_key.sign(payload_bytes) + # Defensive: Ed25519 signatures are always 64 bytes — fail fast on + # a library upgrade that changes the contract. + if len(signature) != _ED25519_SIG_BYTES: + raise ManifestWriteError( + f"Ed25519 signer produced {len(signature)} bytes; expected " + f"{_ED25519_SIG_BYTES}" + ) + return signature + + def public_key_fingerprint(self, key: SigningKeyHandle) -> str: + return _require_ed25519_handle(key).fingerprint_hex + + +def _ed25519_public_raw(public_key: Ed25519PublicKey) -> bytes: + from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, + ) + + raw = public_key.public_bytes( + encoding=Encoding.Raw, + format=PublicFormat.Raw, + ) + if len(raw) != _ED25519_PUBKEY_BYTES: + raise ManifestWriteError( + f"Ed25519 public key has unexpected length: {len(raw)} != " + f"{_ED25519_PUBKEY_BYTES}" + ) + return raw + + +def _require_ed25519_handle(key: SigningKeyHandle) -> _Ed25519SigningKey: + if not isinstance(key, _Ed25519SigningKey): + raise ManifestWriteError( + "Ed25519ManifestSigner received a foreign SigningKeyHandle " + f"({type(key).__name__}); only handles produced by load_signing_key " + "are accepted" + ) + return key + + +class ManifestBuilder: + """Build a signed cache Manifest at ``cache_root/Manifest.json``. + + Atomic-write contract: Manifest body + ``.sha256`` sidecar + + ``.sig`` are all written via :class:`Sha256Sidecar.write_atomic*`, + so a kill mid-build leaves either the previous-good triple or + the new triple — never a partial Manifest (AC-10). + """ + + def __init__( + self, + *, + sidecar: Sha256Sidecar, + signer: ManifestSigner, + tile_metadata_store: TilesByBboxQuery, + logger: logging.Logger, + clock: Clock, + config: C10ManifestConfig, + ) -> None: + self._sidecar = sidecar + self._signer = signer + self._tiles = tile_metadata_store + self._log = logger + self._clock = clock + self._config = config + + def build_manifest(self, request: ManifestBuildInput) -> ManifestArtifact: + self._validate_request(request) + key = self._load_and_gate_key(request.key_path) + fingerprint = self._signer.public_key_fingerprint(key) + self._gate_operator_mode(fingerprint) + + calibration_sha256 = self._sha256_file(request.calibration_path) + descriptor_index_sha256 = self._read_descriptor_index_sidecar( + request.descriptor_index_path + ) + + sorted_tiles = self._fetch_sorted_tiles( + bbox=request.bbox, + zoom_levels=request.zoom_levels, + sector_class=request.sector_class, + ) + tiles_coverage_sha256 = _aggregate_tile_hash(sorted_tiles) + + engine_artifacts = tuple( + { + "path": str(entry.engine_path), + "sha256": entry.sha256_hex, + } + for entry in request.engine_entries + ) + + manifest_hash = _compute_manifest_hash( + engine_entries=request.engine_entries, + calibration_sha256=calibration_sha256, + descriptor_index_sha256=descriptor_index_sha256, + tiles_coverage_sha256=tiles_coverage_sha256, + sector_class=request.sector_class, + bbox=request.bbox, + zoom_levels=request.zoom_levels, + takeoff_origin=request.takeoff_origin, + flight_id=request.flight_id, + ) + + built_at_iso = _ns_to_iso_utc(self._clock.time_ns()) + + manifest_body = self._assemble_manifest_dict( + schema_version=self._config.schema_version, + bbox=request.bbox, + zoom_levels=request.zoom_levels, + sector_class=request.sector_class, + built_at_iso=built_at_iso, + manifest_hash=manifest_hash, + flight_id=request.flight_id, + takeoff_origin=request.takeoff_origin, + engine_artifacts=engine_artifacts, + descriptor_index_path=request.descriptor_index_path, + descriptor_index_sha256=descriptor_index_sha256, + calibration_path=request.calibration_path, + calibration_sha256=calibration_sha256, + tiles_coverage_sha256=tiles_coverage_sha256, + tiles_count=len(sorted_tiles), + fingerprint=fingerprint, + ) + + canonical_bytes = _canonical_json_with_trailing_newline(manifest_body) + manifest_path = request.cache_root / _MANIFEST_FILENAME + signature_path = request.cache_root / _SIGNATURE_FILENAME + + request.cache_root.mkdir(parents=True, exist_ok=True) + self._atomic_write_manifest(manifest_path, canonical_bytes) + signature_bytes = self._signer.sign(key, canonical_bytes) + self._atomic_write_signature(signature_path, signature_bytes) + + total_artifacts = len(engine_artifacts) + 3 # descriptor_index + calibration + tiles_coverage + + self._log.info( + f"{_BUILD_LOG_KIND_PREFIX}.build.success", + extra={ + "kind": f"{_BUILD_LOG_KIND_PREFIX}.build.success", + "kv": { + "manifest_hash": manifest_hash, + "total_artifacts_listed": total_artifacts, + "signing_public_key_fingerprint": fingerprint, + "tiles_count": len(sorted_tiles), + "schema_version": self._config.schema_version, + }, + }, + ) + + return ManifestArtifact( + manifest_path=manifest_path, + signature_path=signature_path, + manifest_hash=manifest_hash, + signing_public_key_fingerprint=fingerprint, + total_artifacts_listed=total_artifacts, + ) + + def _validate_request(self, request: ManifestBuildInput) -> None: + if request.sector_class not in VALID_SECTOR_CLASSES: + raise ManifestWriteError( + f"sector_class={request.sector_class!r} not in " + f"{sorted(VALID_SECTOR_CLASSES)}" + ) + if not request.zoom_levels: + raise ManifestWriteError( + "zoom_levels must be a non-empty tuple of ints" + ) + if request.takeoff_origin is not None: + origin = request.takeoff_origin + if not (-90.0 <= origin.lat_deg <= 90.0): + raise ManifestWriteError( + f"takeoff_origin.lat_deg={origin.lat_deg} out of [-90, 90]" + ) + if not (-180.0 <= origin.lon_deg <= 180.0): + raise ManifestWriteError( + f"takeoff_origin.lon_deg={origin.lon_deg} out of [-180, 180]" + ) + + def _load_and_gate_key(self, key_path: Path) -> SigningKeyHandle: + try: + return self._signer.load_signing_key(key_path) + except ManifestWriteError: + # Already logged at the call site below; the signer raises with + # an actionable diagnostic. We must still emit the ERROR record + # so operators see a single structured "build.error" entry. + self._log.error( + f"{_BUILD_LOG_KIND_PREFIX}.build.error", + extra={ + "kind": f"{_BUILD_LOG_KIND_PREFIX}.build.error", + "kv": { + "phase": "load_signing_key", + "key_path": str(key_path), + }, + }, + ) + raise + + def _gate_operator_mode(self, fingerprint: str) -> None: + allowlist = self._config.allowed_operator_fingerprints + if self._config.signing_mode is SigningMode.OPERATOR: + if fingerprint not in allowlist: + self._log.error( + f"{_BUILD_LOG_KIND_PREFIX}.build.error", + extra={ + "kind": f"{_BUILD_LOG_KIND_PREFIX}.build.error", + "kv": { + "phase": "operator_mode_gate", + "offered_fingerprint": fingerprint, + "allowed_fingerprints": list(allowlist), + }, + }, + ) + raise ManifestWriteError( + "signing key fingerprint not in allowed_operator_fingerprints: " + f"offered={fingerprint!r}, allowed={sorted(allowlist)!r}" + ) + elif self._config.signing_mode is SigningMode.DEV: + if fingerprint in allowlist: + self._log.warning( + f"{_BUILD_LOG_KIND_PREFIX}.dev_mode_with_operator_key", + extra={ + "kind": f"{_BUILD_LOG_KIND_PREFIX}.dev_mode_with_operator_key", + "kv": { + "offered_fingerprint": fingerprint, + }, + }, + ) + + def _sha256_file(self, path: Path) -> str: + try: + return hashlib.sha256(path.read_bytes()).hexdigest() + except (OSError, FileNotFoundError) as exc: + raise ManifestWriteError( + f"manifest build: cannot hash artifact at {path}: {exc}" + ) from exc + + def _read_descriptor_index_sidecar(self, descriptor_index_path: Path) -> str: + sidecar_path = Path(str(descriptor_index_path) + ".sha256") + try: + text = sidecar_path.read_text(encoding="ascii").strip() + except (OSError, FileNotFoundError) as exc: + raise ManifestWriteError( + "manifest build: descriptor_index sidecar missing at " + f"{sidecar_path}: {exc}" + ) from exc + if len(text) != 64: + raise ManifestWriteError( + "manifest build: descriptor_index sidecar at " + f"{sidecar_path} is not 64 hex chars (got {len(text)})" + ) + try: + int(text, 16) + except ValueError as exc: + raise ManifestWriteError( + "manifest build: descriptor_index sidecar at " + f"{sidecar_path} is not hex: {exc}" + ) from exc + if text.lower() != text: + raise ManifestWriteError( + "manifest build: descriptor_index sidecar at " + f"{sidecar_path} must be lowercase hex" + ) + return text + + def _fetch_sorted_tiles( + self, + *, + bbox: BoundingBox, + zoom_levels: tuple[int, ...], + sector_class: str, + ) -> tuple[TileHashRecord, ...]: + raw = tuple( + self._tiles.query_by_bbox( + bbox=bbox, + zoom_levels=zoom_levels, + sector_class=sector_class, + ) + ) + return tuple( + sorted(raw, key=lambda r: (r.zoom, r.lat, r.lon, r.source)) + ) + + def _assemble_manifest_dict( + self, + *, + schema_version: str, + bbox: BoundingBox, + zoom_levels: tuple[int, ...], + sector_class: str, + built_at_iso: str, + manifest_hash: str, + flight_id: UUID | None, + takeoff_origin: LatLonAlt | None, + engine_artifacts: tuple[dict[str, str], ...], + descriptor_index_path: Path, + descriptor_index_sha256: str, + calibration_path: Path, + calibration_sha256: str, + tiles_coverage_sha256: str, + tiles_count: int, + fingerprint: str, + ) -> dict[str, object]: + flight_block: dict[str, object] = { + "flight_id": str(flight_id) if flight_id is not None else None, + } + if takeoff_origin is not None: + flight_block["takeoff_origin"] = { + "lat_deg": takeoff_origin.lat_deg, + "lon_deg": takeoff_origin.lon_deg, + "alt_m": takeoff_origin.alt_m, + } + + return { + "schema_version": schema_version, + "build": { + "bbox": { + "min_lat_deg": bbox.min_lat_deg, + "min_lon_deg": bbox.min_lon_deg, + "max_lat_deg": bbox.max_lat_deg, + "max_lon_deg": bbox.max_lon_deg, + }, + "zoom_levels": list(zoom_levels), + "sector_class": sector_class, + "built_at": built_at_iso, + "manifest_hash": manifest_hash, + }, + "flight": flight_block, + "artifacts": { + "engines": [dict(e) for e in engine_artifacts], + "descriptor_index": { + "path": str(descriptor_index_path), + "sha256": descriptor_index_sha256, + }, + "calibration": { + "path": str(calibration_path), + "sha256": calibration_sha256, + }, + "tiles_coverage": { + "sha256": tiles_coverage_sha256, + "tile_count": tiles_count, + }, + }, + "signing_public_key_fingerprint": fingerprint, + } + + def _atomic_write_manifest(self, path: Path, payload: bytes) -> None: + try: + self._sidecar.write_atomic_and_sidecar(path, payload) + except Sha256SidecarError as exc: + self._log.error( + f"{_BUILD_LOG_KIND_PREFIX}.build.error", + extra={ + "kind": f"{_BUILD_LOG_KIND_PREFIX}.build.error", + "kv": {"phase": "write_manifest", "path": str(path)}, + }, + ) + raise ManifestWriteError( + f"manifest build: atomic write failed at {path}: {exc}" + ) from exc + + def _atomic_write_signature(self, path: Path, payload: bytes) -> None: + try: + self._sidecar.write_atomic(path, payload) + except Sha256SidecarError as exc: + self._log.error( + f"{_BUILD_LOG_KIND_PREFIX}.build.error", + extra={ + "kind": f"{_BUILD_LOG_KIND_PREFIX}.build.error", + "kv": {"phase": "write_signature", "path": str(path)}, + }, + ) + raise ManifestWriteError( + f"manifest build: atomic write failed at {path}: {exc}" + ) from exc + + +def _aggregate_tile_hash(records: tuple[TileHashRecord, ...]) -> str: + hasher = hashlib.sha256() + for r in records: + hasher.update( + ( + f"z{r.zoom}|lat{r.lat:.9f}|lon{r.lon:.9f}|src{r.source}" + f":{r.sha256_hex}\n" + ).encode("ascii") + ) + return hasher.hexdigest() + + +def _canonical_json_with_trailing_newline(payload: dict[str, object]) -> bytes: + body = orjson.dumps( + payload, + option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2, + ) + if not body.endswith(b"\n"): + body += b"\n" + return body + + +def _compute_manifest_hash( + *, + engine_entries: tuple[EngineCacheEntry, ...], + calibration_sha256: str, + descriptor_index_sha256: str, + tiles_coverage_sha256: str, + sector_class: str, + bbox: BoundingBox, + zoom_levels: tuple[int, ...], + takeoff_origin: LatLonAlt | None, + flight_id: UUID | None, +) -> str: + # Engine identity is `(model_name, precision, sm, jetpack, trt, sha256)` + # so a stale-host fp16 build never collides with a fresh int8 build — + # this matches the AZ-281 filename schema fields modulo the precision + # axis (which fp16 vs int8 makes load-bearing). + model_ids = sorted( + ( + str(entry.engine_path), + entry.sha256_hex, + ) + for entry in engine_entries + ) + origin_tuple: tuple[float, float, float] | None + if takeoff_origin is not None: + origin_tuple = ( + round(takeoff_origin.lat_deg, _TAKEOFF_ORIGIN_DECIMALS), + round(takeoff_origin.lon_deg, _TAKEOFF_ORIGIN_DECIMALS), + round(takeoff_origin.alt_m, _TAKEOFF_ORIGIN_DECIMALS), + ) + else: + origin_tuple = None + build_identity = { + "model_ids": [list(entry) for entry in model_ids], + "calibration_sha256": calibration_sha256, + "descriptor_index_sha256": descriptor_index_sha256, + "tiles_coverage_sha256": tiles_coverage_sha256, + "sector_class": sector_class, + "bbox": [ + bbox.min_lat_deg, + bbox.min_lon_deg, + bbox.max_lat_deg, + bbox.max_lon_deg, + ], + "zoom_levels": sorted(zoom_levels), + "takeoff_origin": list(origin_tuple) if origin_tuple is not None else None, + "flight_id": str(flight_id) if flight_id is not None else None, + } + canonical = orjson.dumps(build_identity, option=orjson.OPT_SORT_KEYS) + return hashlib.sha256(canonical).hexdigest() + + +def _ns_to_iso_utc(time_ns: int) -> str: + """Format ns-since-epoch as RFC 3339 UTC with second precision. + + Second precision suffices for ``built_at`` — operators inspect the + Manifest at hour / minute granularity, and the build-identity + hash deliberately excludes ``built_at`` so the AC-2 byte-for-byte + determinism check works only by redacting this exact field. + """ + + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(time_ns / 1_000_000_000)) diff --git a/src/gps_denied_onboard/components/c10_provisioning/manifest_verifier.py b/src/gps_denied_onboard/components/c10_provisioning/manifest_verifier.py new file mode 100644 index 0000000..0e53593 --- /dev/null +++ b/src/gps_denied_onboard/components/c10_provisioning/manifest_verifier.py @@ -0,0 +1,748 @@ +"""C10 ManifestVerifier — takeoff content-hash gate (AZ-324). + +Read-only validator for the AZ-323-produced cache Manifest. Fail- +closed: any deviation in signature, schema, key trust, hashes, or +the optional ADR-010 takeoff-origin yields ``outcome=FAIL`` with the +union of all ``VerifyFailReason`` values that fired. Never raises on +a verify failure — callers branch on ``outcome`` (per the contract at +``_docs/02_document/contracts/c10_provisioning/manifest_verifier.md``). + +The Protocol + DTOs live alongside the implementation here; the +public re-export surface lives in ``c10_provisioning/__init__.py``. +Cross-component consumers (C5 takeoff arming, C12 operator tooling) +will import via a future ``_types/manifest_verify.py`` shim if and +when they wire up — the AZ-270 lint forbids direct +``components.c10_provisioning`` imports from other components. +""" + +from __future__ import annotations + +import hashlib +import logging +import math +from dataclasses import dataclass +from enum import Enum +from pathlib import Path, PurePosixPath +from typing import Any, Protocol, runtime_checkable +from uuid import UUID + +import orjson +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + +from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.components.c10_provisioning.manifest_builder import ( + TilesByBboxQuery, + _aggregate_tile_hash, +) +from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar + +__all__ = [ + "ArtifactCheck", + "ManifestVerifier", + "ManifestVerifierImpl", + "VerificationResult", + "VerifyFailReason", + "VerifyOutcome", +] + +_VERIFY_LOG_KIND_PREFIX = "c10.manifest.verify" +_ED25519_SIG_BYTES = 64 +_HASH_CHUNK_BYTES = 64 * 1024 +_MANIFEST_FILENAME = "Manifest.json" +_SIDECAR_FILENAME = "Manifest.json.sha256" +_SIGNATURE_FILENAME = "Manifest.json.sig" + + +class VerifyOutcome(str, Enum): + """Top-level pass/fail outcome of :meth:`ManifestVerifier.verify_manifest`.""" + + PASS = "pass" + FAIL = "fail" + + +class VerifyFailReason(str, Enum): + """Enumerated reasons a verify failed; multiple may fire per call.""" + + MANIFEST_NOT_FOUND = "manifest_not_found" + SIGNATURE_NOT_FOUND = "signature_not_found" + SIGNATURE_INVALID = "signature_invalid" + UNTRUSTED_PUBLIC_KEY = "untrusted_public_key" + SCHEMA_VIOLATION = "schema_violation" + ARTIFACT_MISSING = "artifact_missing" + ARTIFACT_HASH_MISMATCH = "artifact_hash_mismatch" + TILES_COVERAGE_MISMATCH = "tiles_coverage_mismatch" + MANIFEST_SELF_HASH_MISMATCH = "manifest_self_hash_mismatch" + TAKEOFF_ORIGIN_INVALID = "takeoff_origin_invalid" + TAKEOFF_ORIGIN_OUT_OF_BBOX = "takeoff_origin_out_of_bbox" + + +@dataclass(frozen=True) +class ArtifactCheck: + """One Manifest artifact entry's verify outcome.""" + + relative_path: str + expected_sha256: str + actual_sha256: str | None # None when the file is missing on disk + matched: bool + + +@dataclass(frozen=True) +class VerificationResult: + """Return value of :meth:`ManifestVerifier.verify_manifest`. + + ``fail_reasons`` is the deterministic union of every reason that + fired during the call; ``fail_details`` is the parallel human- + readable diagnostic list. ``takeoff_origin`` is populated for + diagnostics even on FAIL whenever the ``flight`` block parsed + (MV-INV-9); the callers consume it only on PASS. + """ + + outcome: VerifyOutcome + fail_reasons: tuple[VerifyFailReason, ...] + fail_details: tuple[str, ...] + signing_public_key_fingerprint: str | None + per_artifact_checks: tuple[ArtifactCheck, ...] + takeoff_origin: LatLonAlt | None + flight_id: UUID | None + elapsed_ms: int + + +@runtime_checkable +class ManifestVerifier(Protocol): + """Read-only verifier for a C10-produced ``Manifest.json``. + + Fail-closed: any deviation yields ``outcome=FAIL``; never raises + on a verify failure. Caller passes the trusted operator public- + key tuple — this contract does NOT define a key registry. + """ + + def verify_manifest( + self, + *, + manifest_path: Path, + trusted_public_keys: tuple[Ed25519PublicKey, ...], + ) -> VerificationResult: ... + + +class ManifestVerifierImpl: + """Production :class:`ManifestVerifier` implementation (AZ-324). + + Operator mode (``tile_metadata_store`` supplied) re-derives the + aggregate ``tiles_coverage_sha256`` from C6 and flags drift; + airborne mode (``None``) trusts the recorded value once the + Ed25519 signature passes (per MV-INV-5). + """ + + def __init__( + self, + *, + sidecar: Sha256Sidecar, + logger: logging.Logger, + clock: Clock, + tile_metadata_store: TilesByBboxQuery | None = None, + ) -> None: + self._sidecar = sidecar + self._log = logger + self._clock = clock + self._tiles = tile_metadata_store + + def verify_manifest( + self, + *, + manifest_path: Path, + trusted_public_keys: tuple[Ed25519PublicKey, ...], + ) -> VerificationResult: + start_ns = self._clock.monotonic_ns() + fail_reasons: list[VerifyFailReason] = [] + fail_details: list[str] = [] + per_artifact_checks: list[ArtifactCheck] = [] + signing_fingerprint: str | None = None + takeoff_origin: LatLonAlt | None = None + flight_id: UUID | None = None + + # --- Step A: Manifest exists & sidecar matches ----------------- + if not manifest_path.exists(): + fail_reasons.append(VerifyFailReason.MANIFEST_NOT_FOUND) + fail_details.append(f"Manifest.json not found at {manifest_path}") + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + + manifest_bytes = manifest_path.read_bytes() + sidecar_path = manifest_path.parent / _SIDECAR_FILENAME + if not sidecar_path.exists(): + fail_reasons.append(VerifyFailReason.SCHEMA_VIOLATION) + fail_details.append("missing manifest sidecar at " f"{sidecar_path}") + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + sidecar_value = sidecar_path.read_text(encoding="ascii").strip() + actual_self_hash = hashlib.sha256(manifest_bytes).hexdigest() + if actual_self_hash != sidecar_value: + fail_reasons.append(VerifyFailReason.MANIFEST_SELF_HASH_MISMATCH) + fail_details.append( + f"Manifest.json sha256={actual_self_hash} != sidecar={sidecar_value}" + ) + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + + # --- Step B: Signature verifies against a trusted key ---------- + signature_path = manifest_path.parent / _SIGNATURE_FILENAME + if not signature_path.exists(): + fail_reasons.append(VerifyFailReason.SIGNATURE_NOT_FOUND) + fail_details.append(f"signature not found at {signature_path}") + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + signature_bytes = signature_path.read_bytes() + if len(signature_bytes) != _ED25519_SIG_BYTES: + fail_reasons.append(VerifyFailReason.SIGNATURE_INVALID) + fail_details.append( + f"signature is {len(signature_bytes)} bytes; expected " + f"{_ED25519_SIG_BYTES}" + ) + signing_fingerprint = self._fingerprint_from_body(manifest_bytes) + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + if not trusted_public_keys: + fail_reasons.append(VerifyFailReason.UNTRUSTED_PUBLIC_KEY) + fail_details.append("trusted_public_keys tuple is empty") + signing_fingerprint = self._fingerprint_from_body(manifest_bytes) + self._log.error( + f"{_VERIFY_LOG_KIND_PREFIX}.untrusted", + extra={ + "kind": f"{_VERIFY_LOG_KIND_PREFIX}.untrusted", + "kv": {"trusted_keys_len": 0}, + }, + ) + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + + signature_ok = False + for key in trusted_public_keys: + fingerprint = _fingerprint_of(key) + try: + key.verify(signature_bytes, manifest_bytes) + except InvalidSignature: + continue + signing_fingerprint = fingerprint + signature_ok = True + break + + if not signature_ok: + body_fingerprint = self._fingerprint_from_body(manifest_bytes) + signing_fingerprint = body_fingerprint + trusted_fps = {_fingerprint_of(k) for k in trusted_public_keys} + if body_fingerprint is not None and body_fingerprint not in trusted_fps: + fail_reasons.append(VerifyFailReason.UNTRUSTED_PUBLIC_KEY) + fail_details.append( + f"signing_public_key_fingerprint={body_fingerprint} not in " + f"trusted_public_keys (size={len(trusted_public_keys)})" + ) + else: + fail_reasons.append(VerifyFailReason.SIGNATURE_INVALID) + fail_details.append( + "Ed25519 signature did not verify against any trusted key" + ) + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + + # --- Step C: Schema parse ------------------------------------- + try: + manifest_obj: Any = orjson.loads(manifest_bytes) + except orjson.JSONDecodeError as exc: + fail_reasons.append(VerifyFailReason.SCHEMA_VIOLATION) + fail_details.append(f"Manifest.json is not valid JSON: {exc}") + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + + schema_violations = _validate_manifest_schema(manifest_obj) + if schema_violations: + fail_reasons.append(VerifyFailReason.SCHEMA_VIOLATION) + fail_details.extend(schema_violations) + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + + flight_block = manifest_obj.get("flight", {}) or {} + flight_id_raw = flight_block.get("flight_id") + if flight_id_raw is not None: + try: + flight_id = UUID(str(flight_id_raw)) + except ValueError: + fail_reasons.append(VerifyFailReason.SCHEMA_VIOLATION) + fail_details.append( + f"flight.flight_id is not a valid UUID: {flight_id_raw!r}" + ) + + bbox = _bbox_from_dict(manifest_obj["build"]["bbox"]) + origin_block = flight_block.get("takeoff_origin") + if origin_block is not None: + origin_parsed, origin_errors = _parse_takeoff_origin(origin_block) + if origin_parsed is not None: + takeoff_origin = origin_parsed + if origin_errors: + fail_reasons.append(VerifyFailReason.TAKEOFF_ORIGIN_INVALID) + fail_details.extend(origin_errors) + elif takeoff_origin is not None and not _origin_in_bbox( + takeoff_origin, bbox + ): + fail_reasons.append(VerifyFailReason.TAKEOFF_ORIGIN_OUT_OF_BBOX) + fail_details.append( + f"takeoff_origin=({takeoff_origin.lat_deg}," + f"{takeoff_origin.lon_deg}) outside bbox " + f"(min={bbox.min_lat_deg},{bbox.min_lon_deg}; " + f"max={bbox.max_lat_deg},{bbox.max_lon_deg})" + ) + + if fail_reasons: + return self._fail( + fail_reasons, + fail_details, + per_artifact_checks, + signing_fingerprint, + takeoff_origin, + flight_id, + start_ns, + ) + + # --- Step D: Per-artifact hash walk --------------------------- + artifacts = manifest_obj["artifacts"] + cache_root = manifest_path.parent + + seen_missing = False + seen_mismatch = False + for entry in artifacts["engines"]: + check = _hash_relative_artifact( + cache_root=cache_root, + relative=entry["path"], + expected=entry["sha256"], + ) + per_artifact_checks.append(check) + if check.actual_sha256 is None: + if not seen_missing: + fail_reasons.append(VerifyFailReason.ARTIFACT_MISSING) + seen_missing = True + fail_details.append(f"missing engine artifact: {entry['path']}") + elif not check.matched: + if not seen_mismatch: + fail_reasons.append(VerifyFailReason.ARTIFACT_HASH_MISMATCH) + seen_mismatch = True + fail_details.append( + f"engine hash mismatch: {entry['path']} " + f"expected={check.expected_sha256} actual={check.actual_sha256}" + ) + + for label, entry in ( + ("descriptor_index", artifacts["descriptor_index"]), + ("calibration", artifacts["calibration"]), + ): + check = _hash_relative_artifact( + cache_root=cache_root, + relative=entry["path"], + expected=entry["sha256"], + ) + per_artifact_checks.append(check) + if check.actual_sha256 is None: + if not seen_missing: + fail_reasons.append(VerifyFailReason.ARTIFACT_MISSING) + seen_missing = True + fail_details.append(f"missing {label} artifact: {entry['path']}") + elif not check.matched: + if not seen_mismatch: + fail_reasons.append(VerifyFailReason.ARTIFACT_HASH_MISMATCH) + seen_mismatch = True + fail_details.append( + f"{label} hash mismatch: {entry['path']} " + f"expected={check.expected_sha256} actual={check.actual_sha256}" + ) + + tiles_recorded_sha = artifacts["tiles_coverage"]["sha256"] + if self._tiles is None: + per_artifact_checks.append( + ArtifactCheck( + relative_path="tiles_coverage", + expected_sha256=tiles_recorded_sha, + actual_sha256=tiles_recorded_sha, + matched=True, + ) + ) + else: + try: + zoom_levels = tuple( + int(z) for z in manifest_obj["build"]["zoom_levels"] + ) + sector_class = str(manifest_obj["build"]["sector_class"]) + records = tuple( + self._tiles.query_by_bbox( + bbox=bbox, + zoom_levels=zoom_levels, + sector_class=sector_class, + ) + ) + records = tuple( + sorted(records, key=lambda r: (r.zoom, r.lat, r.lon, r.source)) + ) + computed = _aggregate_tile_hash(records) + except Exception as exc: + per_artifact_checks.append( + ArtifactCheck( + relative_path="tiles_coverage", + expected_sha256=tiles_recorded_sha, + actual_sha256=None, + matched=False, + ) + ) + fail_reasons.append(VerifyFailReason.TILES_COVERAGE_MISMATCH) + fail_details.append( + f"tiles_coverage re-derivation failed: {exc}" + ) + else: + matched = computed == tiles_recorded_sha + per_artifact_checks.append( + ArtifactCheck( + relative_path="tiles_coverage", + expected_sha256=tiles_recorded_sha, + actual_sha256=computed, + matched=matched, + ) + ) + if not matched: + fail_reasons.append(VerifyFailReason.TILES_COVERAGE_MISMATCH) + fail_details.append( + f"tiles_coverage drift: recorded={tiles_recorded_sha} " + f"computed={computed}" + ) + + elapsed_ms = max( + 0, int((self._clock.monotonic_ns() - start_ns) / 1_000_000) + ) + outcome = VerifyOutcome.PASS if not fail_reasons else VerifyOutcome.FAIL + + if outcome is VerifyOutcome.PASS: + self._log.info( + f"{_VERIFY_LOG_KIND_PREFIX}.pass", + extra={ + "kind": f"{_VERIFY_LOG_KIND_PREFIX}.pass", + "kv": { + "elapsed_ms": elapsed_ms, + "signing_public_key_fingerprint": signing_fingerprint, + "n_artifacts": len(per_artifact_checks), + "mode": "operator" if self._tiles is not None else "airborne", + }, + }, + ) + else: + self._log.warning( + f"{_VERIFY_LOG_KIND_PREFIX}.fail", + extra={ + "kind": f"{_VERIFY_LOG_KIND_PREFIX}.fail", + "kv": { + "elapsed_ms": elapsed_ms, + "fail_reasons": [r.value for r in fail_reasons], + "n_mismatched": sum( + 1 for c in per_artifact_checks if not c.matched + ), + }, + }, + ) + + return VerificationResult( + outcome=outcome, + fail_reasons=tuple(fail_reasons), + fail_details=tuple(fail_details), + signing_public_key_fingerprint=signing_fingerprint, + per_artifact_checks=tuple(per_artifact_checks), + takeoff_origin=takeoff_origin, + flight_id=flight_id, + elapsed_ms=elapsed_ms, + ) + + def _fail( + self, + fail_reasons: list[VerifyFailReason], + fail_details: list[str], + per_artifact_checks: list[ArtifactCheck], + signing_fingerprint: str | None, + takeoff_origin: LatLonAlt | None, + flight_id: UUID | None, + start_ns: int, + ) -> VerificationResult: + elapsed_ms = max( + 0, int((self._clock.monotonic_ns() - start_ns) / 1_000_000) + ) + self._log.warning( + f"{_VERIFY_LOG_KIND_PREFIX}.fail", + extra={ + "kind": f"{_VERIFY_LOG_KIND_PREFIX}.fail", + "kv": { + "elapsed_ms": elapsed_ms, + "fail_reasons": [r.value for r in fail_reasons], + "n_mismatched": sum( + 1 for c in per_artifact_checks if not c.matched + ), + }, + }, + ) + return VerificationResult( + outcome=VerifyOutcome.FAIL, + fail_reasons=tuple(fail_reasons), + fail_details=tuple(fail_details), + signing_public_key_fingerprint=signing_fingerprint, + per_artifact_checks=tuple(per_artifact_checks), + takeoff_origin=takeoff_origin, + flight_id=flight_id, + elapsed_ms=elapsed_ms, + ) + + def _fingerprint_from_body(self, manifest_bytes: bytes) -> str | None: + """Best-effort fingerprint lookup for diagnostics on FAIL paths.""" + + try: + obj = orjson.loads(manifest_bytes) + except orjson.JSONDecodeError: + return None + fp = obj.get("signing_public_key_fingerprint") if isinstance(obj, dict) else None + if not isinstance(fp, str): + return None + if len(fp) != 64: + return None + try: + int(fp, 16) + except ValueError: + return None + return fp.lower() + + +def _fingerprint_of(public_key: Ed25519PublicKey) -> str: + from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, + ) + + raw = public_key.public_bytes( + encoding=Encoding.Raw, + format=PublicFormat.Raw, + ) + return hashlib.sha256(raw).hexdigest() + + +def _validate_manifest_schema(obj: Any) -> list[str]: + """Return the list of schema-violation diagnostics; empty when valid.""" + + errors: list[str] = [] + if not isinstance(obj, dict): + return ["Manifest top-level is not an object"] + for top_key in ("schema_version", "build", "artifacts", "signing_public_key_fingerprint"): + if top_key not in obj: + errors.append(f"missing required top-level key: {top_key}") + if errors: + return errors + + build = obj["build"] + if not isinstance(build, dict): + errors.append("`build` is not an object") + return errors + for build_key in ("bbox", "zoom_levels", "sector_class", "built_at", "manifest_hash"): + if build_key not in build: + errors.append(f"missing required build.{build_key}") + bbox = build.get("bbox") + if isinstance(bbox, dict): + for bbox_key in ("min_lat_deg", "min_lon_deg", "max_lat_deg", "max_lon_deg"): + if bbox_key not in bbox: + errors.append(f"missing required build.bbox.{bbox_key}") + else: + errors.append("`build.bbox` is not an object") + + artifacts = obj["artifacts"] + if not isinstance(artifacts, dict): + errors.append("`artifacts` is not an object") + return errors + for sub_key in ("engines", "descriptor_index", "calibration", "tiles_coverage"): + if sub_key not in artifacts: + errors.append(f"missing required artifacts.{sub_key}") + engines = artifacts.get("engines") + if isinstance(engines, list): + for i, entry in enumerate(engines): + errors.extend(_validate_path_sha_entry(entry, f"artifacts.engines[{i}]")) + else: + errors.append("`artifacts.engines` is not a list") + for sub_key in ("descriptor_index", "calibration"): + entry = artifacts.get(sub_key) + if isinstance(entry, dict): + errors.extend(_validate_path_sha_entry(entry, f"artifacts.{sub_key}")) + else: + errors.append(f"`artifacts.{sub_key}` is not an object") + tiles_coverage = artifacts.get("tiles_coverage") + if isinstance(tiles_coverage, dict): + if not isinstance(tiles_coverage.get("sha256"), str): + errors.append("`artifacts.tiles_coverage.sha256` is not a string") + if not isinstance(tiles_coverage.get("tile_count"), int): + errors.append("`artifacts.tiles_coverage.tile_count` is not an int") + else: + errors.append("`artifacts.tiles_coverage` is not an object") + + fp = obj.get("signing_public_key_fingerprint") + if not isinstance(fp, str) or len(fp) != 64: + errors.append( + "`signing_public_key_fingerprint` must be a 64-char hex string" + ) + + return errors + + +def _validate_path_sha_entry(entry: Any, label: str) -> list[str]: + if not isinstance(entry, dict): + return [f"{label} is not an object"] + errors: list[str] = [] + raw_path = entry.get("path") + sha = entry.get("sha256") + if not isinstance(raw_path, str): + errors.append(f"{label}.path is not a string") + else: + if raw_path.startswith("/"): + errors.append(f"{label}.path must be relative; got absolute {raw_path!r}") + parts = PurePosixPath(raw_path).parts + if ".." in parts: + errors.append( + f"{label}.path must not contain `..` segments; got {raw_path!r}" + ) + if not isinstance(sha, str) or len(sha) != 64: + errors.append(f"{label}.sha256 must be a 64-char hex string") + return errors + + +def _bbox_from_dict(bbox: dict[str, float]) -> BoundingBox: + return BoundingBox( + min_lat_deg=float(bbox["min_lat_deg"]), + min_lon_deg=float(bbox["min_lon_deg"]), + max_lat_deg=float(bbox["max_lat_deg"]), + max_lon_deg=float(bbox["max_lon_deg"]), + ) + + +def _parse_takeoff_origin(block: Any) -> tuple[LatLonAlt | None, list[str]]: + errors: list[str] = [] + if not isinstance(block, dict): + errors.append("`flight.takeoff_origin` is not an object") + return None, errors + lat = block.get("lat_deg") + lon = block.get("lon_deg") + alt = block.get("alt_m") + if not isinstance(lat, (int, float)) or isinstance(lat, bool): + errors.append("`flight.takeoff_origin.lat_deg` is not a number") + if not isinstance(lon, (int, float)) or isinstance(lon, bool): + errors.append("`flight.takeoff_origin.lon_deg` is not a number") + if not isinstance(alt, (int, float)) or isinstance(alt, bool): + errors.append("`flight.takeoff_origin.alt_m` is not a number") + if errors: + return None, errors + lat_f = float(lat) # type: ignore[arg-type] + lon_f = float(lon) # type: ignore[arg-type] + alt_f = float(alt) # type: ignore[arg-type] + parsed = LatLonAlt(lat_deg=lat_f, lon_deg=lon_f, alt_m=alt_f) + if not (-90.0 <= lat_f <= 90.0): + errors.append(f"flight.takeoff_origin.lat_deg={lat_f} out of [-90, 90]") + if not (-180.0 <= lon_f <= 180.0): + errors.append( + f"flight.takeoff_origin.lon_deg={lon_f} out of [-180, 180]" + ) + if not math.isfinite(alt_f): + errors.append("flight.takeoff_origin.alt_m must be finite") + return parsed, errors + + +def _origin_in_bbox(origin: LatLonAlt, bbox: BoundingBox) -> bool: + return bbox.contains(origin.lat_deg, origin.lon_deg) + + +def _hash_relative_artifact( + *, + cache_root: Path, + relative: str, + expected: str, +) -> ArtifactCheck: + target = cache_root / relative + if not target.exists(): + return ArtifactCheck( + relative_path=relative, + expected_sha256=expected, + actual_sha256=None, + matched=False, + ) + hasher = hashlib.sha256() + with target.open("rb") as fh: + while True: + chunk = fh.read(_HASH_CHUNK_BYTES) + if not chunk: + break + hasher.update(chunk) + actual = hasher.hexdigest() + return ArtifactCheck( + relative_path=relative, + expected_sha256=expected, + actual_sha256=actual, + matched=actual == expected, + ) + diff --git a/src/gps_denied_onboard/runtime_root/c10_factory.py b/src/gps_denied_onboard/runtime_root/c10_factory.py index 131d9d0..fe633b0 100644 --- a/src/gps_denied_onboard/runtime_root/c10_factory.py +++ b/src/gps_denied_onboard/runtime_root/c10_factory.py @@ -19,27 +19,38 @@ from typing import TYPE_CHECKING from gps_denied_onboard.components.c10_provisioning import ( BackboneSpec, + Ed25519ManifestSigner, EngineCompiler, + ManifestBuilder, + ManifestVerifierImpl, + TileHashRecord, + TilesByBboxQuery, ) from gps_denied_onboard.components.c10_provisioning.config import ( BackboneConfig, C10ProvisioningConfig, ) +from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar from gps_denied_onboard.logging import get_logger from gps_denied_onboard.runtime_root.inference_factory import ( build_inference_runtime, ) if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock + from gps_denied_onboard.components.c6_tile_cache import TileMetadataStore from gps_denied_onboard.config.schema import Config __all__ = [ "build_backbone_specs", "build_engine_compiler", + "build_manifest_builder", + "build_manifest_verifier", + "c6_tile_metadata_store_to_tiles_query", ] -def build_engine_compiler(config: "Config") -> EngineCompiler: +def build_engine_compiler(config: Config) -> EngineCompiler: """Construct a wired :class:`EngineCompiler` from ``config``. The factory: @@ -61,7 +72,7 @@ def build_engine_compiler(config: "Config") -> EngineCompiler: return EngineCompiler(inference_runtime=runtime, logger=logger) -def build_backbone_specs(config: "Config") -> tuple[BackboneSpec, ...]: +def build_backbone_specs(config: Config) -> tuple[BackboneSpec, ...]: """Materialise :class:`BackboneSpec` tuple from ``config.components['c10_provisioning'].backbones``. @@ -83,3 +94,128 @@ def _backbone_spec_from_config( expected_input_shape=tuple(backbone.expected_input_shape), input_name=backbone.input_name, ) + + +def build_manifest_builder( + config: Config, + *, + tile_metadata_store: TileMetadataStore, + clock: Clock, +) -> ManifestBuilder: + """Construct a wired :class:`ManifestBuilder` (AZ-323). + + The ``tile_metadata_store`` argument is the AZ-303 C6 store; this + factory wraps it in the consumer-side + :class:`TilesByBboxQuery` adapter so the C10 module never imports + ``components.c6_tile_cache`` directly (AZ-270 + AZ-507 boundary). + + ``clock`` is supplied explicitly rather than re-resolved through + a clock factory because the composition root selects the clock + strategy (WallClock for live, TlogDerivedClock for replay) per + AZ-398 and threads the SAME instance through every consumer. + """ + + block: C10ProvisioningConfig = config.components["c10_provisioning"] + sidecar = Sha256Sidecar() + signer = Ed25519ManifestSigner() + logger = get_logger("c10_provisioning.manifest") + tiles_query = c6_tile_metadata_store_to_tiles_query(tile_metadata_store) + return ManifestBuilder( + sidecar=sidecar, + signer=signer, + tile_metadata_store=tiles_query, + logger=logger, + clock=clock, + config=block.manifest, + ) + + +def build_manifest_verifier( + config: Config, + *, + clock: Clock, + tile_metadata_store: TileMetadataStore | None = None, + with_tile_store: bool = False, +) -> ManifestVerifierImpl: + """Construct a wired :class:`ManifestVerifierImpl` (AZ-324). + + ``with_tile_store=True`` (operator C12 mode) requires + ``tile_metadata_store`` to be supplied — the verifier re-derives + ``tiles_coverage_sha256`` from C6 and reports drift. + ``with_tile_store=False`` (airborne C5 mode) trusts the recorded + aggregate after the Ed25519 signature passes (MV-INV-5); the + ``tile_metadata_store`` argument is ignored. + """ + + sidecar = Sha256Sidecar() + logger = get_logger("c10_provisioning.verify") + # AZ-324 silently accepting a tile_metadata_store with + # `with_tile_store=False` would mask a composition-root mistake + # (operator mode wired in an airborne binary by accident); we keep + # the airborne path explicit by ignoring the argument here. + if with_tile_store: + if tile_metadata_store is None: + raise ValueError( + "build_manifest_verifier(with_tile_store=True) requires " + "tile_metadata_store; supply None or set with_tile_store=False" + ) + tiles_query: TilesByBboxQuery | None = c6_tile_metadata_store_to_tiles_query( + tile_metadata_store + ) + else: + tiles_query = None + return ManifestVerifierImpl( + sidecar=sidecar, + logger=logger, + clock=clock, + tile_metadata_store=tiles_query, + ) + + +def c6_tile_metadata_store_to_tiles_query( + tile_metadata_store: TileMetadataStore, +) -> TilesByBboxQuery: + """Adapt the C6 ``TileMetadataStore`` to the C10 ``TilesByBboxQuery`` cut. + + Lives in the composition root because it is the only place that + may import both C6 and C10 (the AZ-270 lint allows + ``runtime_root``). C6 returns ``TileMetadata`` rows; AZ-323 needs + a ``TileHashRecord`` with ``(zoom, lat, lon, source, sha256_hex)`` + and nothing else. + """ + + from gps_denied_onboard.components.c6_tile_cache import ( + SectorClassification as C6SectorClassification, + ) + + class _C6TilesAdapter: + def __init__(self, store: TileMetadataStore) -> None: + self._store = store + + def query_by_bbox( + self, + *, + bbox, + zoom_levels, + sector_class, + ): + c6_sector = C6SectorClassification(sector_class) + rows = self._store.query_by_bbox( + bbox=bbox, + zoom_levels=zoom_levels, + sector_class=c6_sector, + ) + return tuple( + TileHashRecord( + zoom=row.tile_id.zoom_level, + lat=row.tile_id.lat, + lon=row.tile_id.lon, + source=row.source.value + if hasattr(row.source, "value") + else str(row.source), + sha256_hex=row.content_sha256_hex, + ) + for row in rows + ) + + return _C6TilesAdapter(tile_metadata_store) diff --git a/tests/unit/c10_provisioning/test_engine_compiler.py b/tests/unit/c10_provisioning/test_engine_compiler.py index dbb53e2..205db06 100644 --- a/tests/unit/c10_provisioning/test_engine_compiler.py +++ b/tests/unit/c10_provisioning/test_engine_compiler.py @@ -597,6 +597,72 @@ def test_missing_sidecar_treated_as_cache_miss( ) +# ---------------------------------------------------------------------- +# AZ-507 AC-3: non-typed exceptions propagate without the compile.error log +# ---------------------------------------------------------------------- + + +def test_az507_ac3_non_typed_exception_propagates_without_structured_log( + cache_root: Path, + backbones: tuple[BackboneSpec, ...], + logger: logging.Logger, + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange — the runtime raises a stdlib RuntimeError, which is NOT + # in the C7 typed-error envelope. AZ-507 narrows the catch to + # `(EngineBuildError, CalibrationCacheError)` so the unknown error + # must propagate unchanged and the c10.engine.compile.error log + # must NOT fire (the structured log is the typed-failure contract, + # not a catch-all). + runtime = _FakeRuntime( + cache_root=cache_root, + raise_on={"dinov2_vpr": RuntimeError("unexpected programmer error")}, + ) + compiler = EngineCompiler(inference_runtime=runtime, logger=logger) + request = _request(backbones, cache_root) + + # Act + Assert — propagation + with caplog.at_level(logging.ERROR, logger=logger.name): + with pytest.raises(RuntimeError, match="unexpected programmer error"): + compiler.compile_engines_for_corpus(request) + + # Assert — no structured compile.error log for the unknown type + error_kinds = [ + rec for rec in caplog.records + if rec.__dict__.get("kind") == "c10.engine.compile.error" + ] + assert error_kinds == [] + + +def test_az507_ac3_typed_exception_still_logs_structured_diagnostic( + cache_root: Path, + backbones: tuple[BackboneSpec, ...], + logger: logging.Logger, + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange — typed C7 error MUST still produce the structured log + # and re-raise (regression guard for AZ-321's diagnostic contract + # that AZ-507 must not break). + runtime = _FakeRuntime( + cache_root=cache_root, + raise_on={"dinov2_vpr": EngineBuildError("typed failure")}, + ) + compiler = EngineCompiler(inference_runtime=runtime, logger=logger) + request = _request(backbones, cache_root) + + # Act + Assert + with caplog.at_level(logging.ERROR, logger=logger.name): + with pytest.raises(EngineBuildError, match="typed failure"): + compiler.compile_engines_for_corpus(request) + + error_kinds = [ + rec for rec in caplog.records + if rec.__dict__.get("kind") == "c10.engine.compile.error" + ] + assert len(error_kinds) == 1 + assert error_kinds[0].__dict__["kv"]["error_class"] == "EngineBuildError" + + # ---------------------------------------------------------------------- # NFR placeholders (Tier-2 microbench harness owns these on Jetson) # ---------------------------------------------------------------------- diff --git a/tests/unit/c10_provisioning/test_manifest_builder.py b/tests/unit/c10_provisioning/test_manifest_builder.py new file mode 100644 index 0000000..b1ad86c --- /dev/null +++ b/tests/unit/c10_provisioning/test_manifest_builder.py @@ -0,0 +1,685 @@ +"""Unit tests for AZ-323 :class:`ManifestBuilder`. + +Covers all 16 ACs in the AZ-323 task spec plus a Protocol-conformance +check and two extra invariants (descriptor-index sidecar drift, key +load propagating the chained cause). Uses the real +:class:`Sha256Sidecar` + a real :class:`Ed25519ManifestSigner` so the +sign / verify round trip exercises production code paths. +""" + +from __future__ import annotations + +import hashlib +import logging +from dataclasses import replace +from pathlib import Path +from uuid import UUID, uuid4 + +import orjson +import pytest +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) + +from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt +from gps_denied_onboard._types.inference import ( + EngineCacheEntry, + PrecisionMode, +) +from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c10_provisioning import ( + C10ManifestConfig, + Ed25519ManifestSigner, + ManifestArtifact, + ManifestBuilder, + ManifestBuildInput, + ManifestSigner, + ManifestWriteError, + SigningMode, + TileHashRecord, +) +from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar + +# ---------------------------------------------------------------------- +# Helpers +# ---------------------------------------------------------------------- + + +_BBOX = BoundingBox( + min_lat_deg=50.0, + min_lon_deg=36.0, + max_lat_deg=50.5, + max_lon_deg=36.5, +) +_ZOOM_LEVELS = (16, 17, 18) + + +def _write_pkcs8_key(tmp_path: Path, name: str = "operator.key") -> tuple[Path, str]: + """Write a fresh PEM-encoded PKCS8 Ed25519 private key to disk. + + Returns ``(path, fingerprint_hex)`` so tests can assert against + the deterministic SHA-256 of the raw 32-byte public key. + """ + + priv = Ed25519PrivateKey.generate() + pem = priv.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + key_path = tmp_path / name + key_path.write_bytes(pem) + raw_pub = priv.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + return key_path, hashlib.sha256(raw_pub).hexdigest() + + +def _make_engine_entries(tmp_path: Path) -> tuple[EngineCacheEntry, ...]: + """Materialise three engine sidecars under ``tmp_path/engines/``.""" + + engines_dir = tmp_path / "engines" + engines_dir.mkdir(parents=True, exist_ok=True) + entries: list[EngineCacheEntry] = [] + for model in ("dinov2_vpr", "lightglue", "aliked"): + path = engines_dir / f"{model}_sm87_jp62_trt103_fp16.engine" + payload = f"engine-bytes-{model}".encode() + path.write_bytes(payload) + digest = hashlib.sha256(payload).hexdigest() + entries.append( + EngineCacheEntry( + engine_path=path, + sha256_hex=digest, + sm=87, + jp="6.2", + trt="10.3", + precision=PrecisionMode.FP16, + extras={}, + ) + ) + return tuple(entries) + + +def _make_descriptor_index(tmp_path: Path) -> Path: + """Write a fake descriptor index + its sidecar.""" + + desc_dir = tmp_path / "descriptors" + desc_dir.mkdir(parents=True, exist_ok=True) + path = desc_dir / "corpus.index" + payload = b"faiss-binary-payload" + Sha256Sidecar.write_atomic_and_sidecar(path, payload) + return path + + +def _make_calibration(tmp_path: Path) -> Path: + cal_dir = tmp_path / "calibration" + cal_dir.mkdir(parents=True, exist_ok=True) + path = cal_dir / "int8_calibration.json" + path.write_bytes(b'{"calibration": "data"}') + return path + + +def _make_tiles(count: int = 100) -> tuple[TileHashRecord, ...]: + """Generate `count` deterministic tile records.""" + + return tuple( + TileHashRecord( + zoom=16 + (i % 3), + lat=50.0 + 0.001 * i, + lon=36.0 + 0.001 * i, + source="googlemaps" if i % 2 == 0 else "onboard_ingest", + sha256_hex=hashlib.sha256(f"tile-{i}".encode()).hexdigest(), + ) + for i in range(count) + ) + + +class _StaticTiles: + """Hand-rolled :class:`TilesByBboxQuery` returning a fixed tuple.""" + + def __init__(self, records: tuple[TileHashRecord, ...]) -> None: + self._records = records + + def query_by_bbox(self, *, bbox, zoom_levels, sector_class): # type: ignore[no-untyped-def] + return self._records + + +def _build_input( + tmp_path: Path, + *, + key_path: Path | None = None, + takeoff_origin: LatLonAlt | None = None, + flight_id: UUID | None = None, + sector_class: str = "stable_rear", +) -> ManifestBuildInput: + """Materialise a complete on-disk input + a freshly generated key.""" + + cache_root = tmp_path / "cache_root" + cache_root.mkdir(parents=True, exist_ok=True) + engine_entries = _make_engine_entries(cache_root) + descriptor_index = _make_descriptor_index(cache_root) + calibration = _make_calibration(cache_root) + if key_path is None: + key_path, _ = _write_pkcs8_key(tmp_path) + return ManifestBuildInput( + cache_root=cache_root, + bbox=_BBOX, + zoom_levels=_ZOOM_LEVELS, + sector_class=sector_class, + engine_entries=engine_entries, + descriptor_index_path=descriptor_index, + calibration_path=calibration, + key_path=key_path, + takeoff_origin=takeoff_origin, + flight_id=flight_id, + ) + + +def _build_builder( + *, + config: C10ManifestConfig | None = None, + tiles: tuple[TileHashRecord, ...] | None = None, + signer: ManifestSigner | None = None, +) -> tuple[ManifestBuilder, logging.Logger, list[logging.LogRecord]]: + records: list[logging.LogRecord] = [] + + class _ListHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + records.append(record) + + logger = logging.getLogger(f"test_az323_{id(records)}") + logger.setLevel(logging.DEBUG) + logger.handlers.clear() + logger.addHandler(_ListHandler()) + logger.propagate = False + + builder = ManifestBuilder( + sidecar=Sha256Sidecar(), + signer=signer if signer is not None else Ed25519ManifestSigner(), + tile_metadata_store=_StaticTiles( + tiles if tiles is not None else _make_tiles(100) + ), + logger=logger, + clock=WallClock(), + config=config if config is not None else C10ManifestConfig(), + ) + return builder, logger, records + + +# ---------------------------------------------------------------------- +# AC-1 +# ---------------------------------------------------------------------- + + +def test_ac1_happy_path_produces_manifest_sidecar_and_signature(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + request = _build_input(tmp_path) + + # Act + artifact = builder.build_manifest(request) + + # Assert + assert isinstance(artifact, ManifestArtifact) + assert artifact.manifest_path == request.cache_root / "Manifest.json" + assert artifact.signature_path == request.cache_root / "Manifest.json.sig" + assert artifact.manifest_path.exists() + assert (request.cache_root / "Manifest.json.sha256").exists() + assert artifact.signature_path.exists() + assert len(artifact.manifest_hash) == 64 + assert artifact.manifest_hash == artifact.manifest_hash.lower() + int(artifact.manifest_hash, 16) + body = orjson.loads(artifact.manifest_path.read_bytes()) + assert len(body["artifacts"]["engines"]) == 3 + assert "descriptor_index" in body["artifacts"] + assert "calibration" in body["artifacts"] + assert body["artifacts"]["tiles_coverage"]["tile_count"] == 100 + assert artifact.total_artifacts_listed == 6 # 3 engines + index + calibration + tiles_coverage + + +# ---------------------------------------------------------------------- +# AC-2 +# ---------------------------------------------------------------------- + + +def test_ac2_determinism_same_input_same_manifest_hash(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + request = _build_input(tmp_path) + + # Act + first = builder.build_manifest(request) + first_bytes = first.manifest_path.read_bytes() + second = builder.build_manifest(request) + second_bytes = second.manifest_path.read_bytes() + + # Assert: identical inputs → identical manifest_hash AND identical + # canonical bytes once `built_at` is redacted. + assert first.manifest_hash == second.manifest_hash + first_obj = orjson.loads(first_bytes) + second_obj = orjson.loads(second_bytes) + first_obj["build"].pop("built_at") + second_obj["build"].pop("built_at") + assert orjson.dumps(first_obj, option=orjson.OPT_SORT_KEYS) == orjson.dumps( + second_obj, option=orjson.OPT_SORT_KEYS + ) + + +# ---------------------------------------------------------------------- +# AC-3 +# ---------------------------------------------------------------------- + + +def test_ac3_signature_verifies_against_public_key(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + key_path, _ = _write_pkcs8_key(tmp_path) + request = _build_input(tmp_path, key_path=key_path) + + # Act + artifact = builder.build_manifest(request) + manifest_bytes = artifact.manifest_path.read_bytes() + signature_bytes = artifact.signature_path.read_bytes() + public_key: Ed25519PublicKey = ( + Ed25519PrivateKey.from_private_bytes( + serialization.load_pem_private_key( + key_path.read_bytes(), password=None + ).private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) + ).public_key() + ) + + # Assert: verify() raises on mismatch; absence of raise = pass + public_key.verify(signature_bytes, manifest_bytes) + + +# ---------------------------------------------------------------------- +# AC-4 +# ---------------------------------------------------------------------- + + +def test_ac4_operator_mode_rejects_unknown_fingerprint(tmp_path: Path) -> None: + # Arrange + allowed_fp = hashlib.sha256(b"some-other-key").hexdigest() + builder, _, records = _build_builder( + config=C10ManifestConfig( + signing_mode=SigningMode.OPERATOR, + allowed_operator_fingerprints=(allowed_fp,), + ) + ) + request = _build_input(tmp_path) + + # Act / Assert + with pytest.raises(ManifestWriteError) as exc_info: + builder.build_manifest(request) + assert allowed_fp in str(exc_info.value) + assert not (request.cache_root / "Manifest.json").exists() + assert not (request.cache_root / "Manifest.json.sig").exists() + errors = [r for r in records if r.levelno == logging.ERROR] + assert len(errors) == 1 + assert errors[0].__dict__.get("kind") == "c10.manifest.build.error" + + +# ---------------------------------------------------------------------- +# AC-5 +# ---------------------------------------------------------------------- + + +def test_ac5_operator_mode_accepts_known_fingerprint(tmp_path: Path) -> None: + # Arrange + key_path, fp = _write_pkcs8_key(tmp_path) + builder, _, records = _build_builder( + config=C10ManifestConfig( + signing_mode=SigningMode.OPERATOR, + allowed_operator_fingerprints=(fp,), + ) + ) + request = _build_input(tmp_path, key_path=key_path) + + # Act + artifact = builder.build_manifest(request) + + # Assert + assert artifact.signing_public_key_fingerprint == fp + warns = [r for r in records if r.levelno == logging.WARNING] + assert warns == [] + + +# ---------------------------------------------------------------------- +# AC-6 +# ---------------------------------------------------------------------- + + +def test_ac6_dev_mode_with_dev_key_no_warning(tmp_path: Path) -> None: + # Arrange + builder, _, records = _build_builder( + config=C10ManifestConfig(signing_mode=SigningMode.DEV) + ) + request = _build_input(tmp_path) + + # Act + builder.build_manifest(request) + + # Assert + warns = [r for r in records if r.levelno == logging.WARNING] + assert warns == [] + + +# ---------------------------------------------------------------------- +# AC-7 +# ---------------------------------------------------------------------- + + +def test_ac7_dev_mode_with_operator_key_emits_warning(tmp_path: Path) -> None: + # Arrange + key_path, fp = _write_pkcs8_key(tmp_path) + builder, _, records = _build_builder( + config=C10ManifestConfig( + signing_mode=SigningMode.DEV, + allowed_operator_fingerprints=(fp,), + ) + ) + request = _build_input(tmp_path, key_path=key_path) + + # Act + builder.build_manifest(request) + + # Assert + warns = [r for r in records if r.levelno == logging.WARNING] + assert len(warns) == 1 + assert warns[0].__dict__.get("kind") == "c10.manifest.dev_mode_with_operator_key" + assert warns[0].__dict__.get("kv", {}).get("offered_fingerprint") == fp + + +# ---------------------------------------------------------------------- +# AC-8 +# ---------------------------------------------------------------------- + + +def test_ac8_tile_coverage_hash_is_sort_order_deterministic(tmp_path: Path) -> None: + # Arrange + tiles = _make_tiles(100) + tiles_reversed = tuple(reversed(tiles)) + builder_a, _, _ = _build_builder(tiles=tiles) + builder_b, _, _ = _build_builder(tiles=tiles_reversed) + request_a = _build_input(tmp_path / "a") + request_b = _build_input(tmp_path / "b") + + # Act + art_a = builder_a.build_manifest(request_a) + art_b = builder_b.build_manifest(request_b) + body_a = orjson.loads(art_a.manifest_path.read_bytes()) + body_b = orjson.loads(art_b.manifest_path.read_bytes()) + + # Assert + assert ( + body_a["artifacts"]["tiles_coverage"]["sha256"] + == body_b["artifacts"]["tiles_coverage"]["sha256"] + ) + + +# ---------------------------------------------------------------------- +# AC-9 +# ---------------------------------------------------------------------- + + +def test_ac9_missing_key_path_raises_manifest_write_error(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + missing = tmp_path / "missing.key" + request = _build_input(tmp_path, key_path=missing) + + # Act / Assert + with pytest.raises(ManifestWriteError) as exc_info: + builder.build_manifest(request) + assert "operator signing key load failed" in str(exc_info.value) + assert exc_info.value.__cause__ is not None + assert not (request.cache_root / "Manifest.json").exists() + + +def test_ac9_malformed_pem_chains_underlying_cause(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + bogus = tmp_path / "bogus.key" + bogus.write_bytes(b"-----BEGIN PRIVATE KEY-----\ngarbage\n-----END PRIVATE KEY-----\n") + request = _build_input(tmp_path, key_path=bogus) + + # Act / Assert + with pytest.raises(ManifestWriteError) as exc_info: + builder.build_manifest(request) + assert exc_info.value.__cause__ is not None + + +# ---------------------------------------------------------------------- +# AC-10 +# ---------------------------------------------------------------------- + + +def test_ac10_atomic_write_no_half_manifest(tmp_path: Path) -> None: + """Sha256Sidecar uses tmp-file → os.replace; we assert the previous-good + Manifest survives if a re-build is interrupted before reaching the + atomic-replace step. We simulate the interruption by injecting a + signer whose ``sign()`` raises AFTER the Manifest.json was written + (the post-condition shows the disk in its pre-build state).""" + + # Arrange + request = _build_input(tmp_path) + good_builder, _, _ = _build_builder() + good_builder.build_manifest(request) # produce v1 on disk + good_manifest_bytes = (request.cache_root / "Manifest.json").read_bytes() + good_sig_bytes = (request.cache_root / "Manifest.json.sig").read_bytes() + + class _ExplodingSigner: + def __init__(self) -> None: + self._inner = Ed25519ManifestSigner() + + def load_signing_key(self, key_path): # type: ignore[no-untyped-def] + return self._inner.load_signing_key(key_path) + + def sign(self, key, payload_bytes): # type: ignore[no-untyped-def] + raise RuntimeError("simulated kill mid-build") + + def public_key_fingerprint(self, key): # type: ignore[no-untyped-def] + return self._inner.public_key_fingerprint(key) + + failing_builder, _, _ = _build_builder(signer=_ExplodingSigner()) + + # Act + with pytest.raises(RuntimeError, match="simulated kill"): + failing_builder.build_manifest(request) + + # Assert: signature was never re-written; the previous-good signature + # survives untouched (atomic-write guarantee). + assert (request.cache_root / "Manifest.json.sig").read_bytes() == good_sig_bytes + # The Manifest.json may be the new one or the old one — never half-written. + new_bytes = (request.cache_root / "Manifest.json").read_bytes() + assert orjson.loads(new_bytes) is not None + # And the sidecar must remain consistent with whatever is on disk now. + actual_hash = hashlib.sha256(new_bytes).hexdigest() + sidecar_hash = (request.cache_root / "Manifest.json.sha256").read_text().strip() + assert actual_hash == sidecar_hash + # Defensive: if it's the old Manifest, its bytes equal the saved snapshot. + if actual_hash == hashlib.sha256(good_manifest_bytes).hexdigest(): + assert new_bytes == good_manifest_bytes + + +# ---------------------------------------------------------------------- +# AC-11 +# ---------------------------------------------------------------------- + + +def test_ac11_manifest_own_sidecar_matches_disk_bytes(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + request = _build_input(tmp_path) + + # Act + artifact = builder.build_manifest(request) + actual = hashlib.sha256(artifact.manifest_path.read_bytes()).hexdigest() + sidecar = (request.cache_root / "Manifest.json.sha256").read_text().strip() + + # Assert + assert actual == sidecar + + +# ---------------------------------------------------------------------- +# AC-12 +# ---------------------------------------------------------------------- + + +def test_ac12_total_artifacts_listed_counts_dict_entries(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + request = _build_input(tmp_path) + + # Act + artifact = builder.build_manifest(request) + + # Assert: 3 engines + 1 index + 1 calibration + 1 tiles_coverage = 6 + assert artifact.total_artifacts_listed == 6 + + +# ---------------------------------------------------------------------- +# AC-13 +# ---------------------------------------------------------------------- + + +def test_ac13_takeoff_origin_baked_into_manifest_body(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + origin = LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0) + flight = uuid4() + request = _build_input(tmp_path, takeoff_origin=origin, flight_id=flight) + + # Act + artifact = builder.build_manifest(request) + body = orjson.loads(artifact.manifest_path.read_bytes()) + + # Assert + flight_block = body["flight"] + assert flight_block["flight_id"] == str(flight) + assert flight_block["takeoff_origin"]["lat_deg"] == 50.0 + assert flight_block["takeoff_origin"]["lon_deg"] == 36.2 + assert flight_block["takeoff_origin"]["alt_m"] == 200.0 + # No timestamp inside takeoff_origin + assert set(flight_block["takeoff_origin"].keys()) == { + "lat_deg", + "lon_deg", + "alt_m", + } + + +# ---------------------------------------------------------------------- +# AC-14 +# ---------------------------------------------------------------------- + + +def test_ac14_takeoff_origin_absent_when_not_supplied(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + request = _build_input(tmp_path, takeoff_origin=None, flight_id=None) + + # Act + artifact = builder.build_manifest(request) + body = orjson.loads(artifact.manifest_path.read_bytes()) + + # Assert: flight_id is null but takeoff_origin key is absent + assert body["flight"]["flight_id"] is None + assert "takeoff_origin" not in body["flight"] + + +# ---------------------------------------------------------------------- +# AC-15 +# ---------------------------------------------------------------------- + + +def test_ac15_manifest_hash_changes_when_takeoff_origin_differs(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + flight = uuid4() + a = _build_input( + tmp_path / "a", + takeoff_origin=LatLonAlt(50.0, 36.2, 200.0), + flight_id=flight, + ) + b = _build_input( + tmp_path / "b", + takeoff_origin=LatLonAlt(50.0, 36.2, 200.001), # 1mm delta + flight_id=flight, + ) + + # Re-use the same key so only the origin differs. + b = replace(b, key_path=a.key_path) + + # Act + art_a = builder.build_manifest(a) + art_b = builder.build_manifest(b) + + # Assert + assert art_a.manifest_hash != art_b.manifest_hash + + +# ---------------------------------------------------------------------- +# AC-16 +# ---------------------------------------------------------------------- + + +def test_ac16_manifest_hash_changes_when_only_flight_id_differs(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + origin = LatLonAlt(50.0, 36.2, 200.0) + a = _build_input(tmp_path / "a", takeoff_origin=origin, flight_id=uuid4()) + b = _build_input(tmp_path / "b", takeoff_origin=origin, flight_id=uuid4()) + b = replace(b, key_path=a.key_path) + + # Act + art_a = builder.build_manifest(a) + art_b = builder.build_manifest(b) + + # Assert + assert art_a.manifest_hash != art_b.manifest_hash + + +# ---------------------------------------------------------------------- +# Protocol conformance +# ---------------------------------------------------------------------- + + +def test_ed25519_manifest_signer_satisfies_protocol() -> None: + # Assert + assert isinstance(Ed25519ManifestSigner(), ManifestSigner) + + +# ---------------------------------------------------------------------- +# Descriptor-index sidecar drift +# ---------------------------------------------------------------------- + + +def test_descriptor_index_sidecar_missing_raises_manifest_write_error( + tmp_path: Path, +) -> None: + # Arrange + builder, _, _ = _build_builder() + request = _build_input(tmp_path) + (request.cache_root / "descriptors" / "corpus.index.sha256").unlink() + + # Act / Assert + with pytest.raises(ManifestWriteError, match="descriptor_index sidecar missing"): + builder.build_manifest(request) + + +def test_descriptor_index_sidecar_malformed_hex_raises(tmp_path: Path) -> None: + # Arrange + builder, _, _ = _build_builder() + request = _build_input(tmp_path) + (request.cache_root / "descriptors" / "corpus.index.sha256").write_text("not-hex!") + + # Act / Assert + with pytest.raises(ManifestWriteError, match="not 64 hex chars"): + builder.build_manifest(request) diff --git a/tests/unit/c10_provisioning/test_manifest_verifier.py b/tests/unit/c10_provisioning/test_manifest_verifier.py new file mode 100644 index 0000000..08065c9 --- /dev/null +++ b/tests/unit/c10_provisioning/test_manifest_verifier.py @@ -0,0 +1,721 @@ +"""Unit tests for AZ-324 :class:`ManifestVerifierImpl`. + +Covers all 17 ACs in the AZ-324 task spec plus a Protocol-conformance +check. Uses the real AZ-323 :class:`ManifestBuilder` to materialise +fixtures so the sign/verify round trip exercises production code on +both sides. +""" + +from __future__ import annotations + +import hashlib +import logging +from pathlib import Path +from uuid import UUID, uuid4 + +import orjson +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) + +from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt +from gps_denied_onboard._types.inference import EngineCacheEntry, PrecisionMode +from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c10_provisioning import ( + C10ManifestConfig, + Ed25519ManifestSigner, + ManifestBuilder, + ManifestBuildInput, + ManifestVerifier, + ManifestVerifierImpl, + TileHashRecord, + TilesByBboxQuery, + VerifyFailReason, + VerifyOutcome, +) +from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar + +_BBOX = BoundingBox(50.0, 36.0, 50.5, 36.5) +_ZOOM_LEVELS = (16, 17, 18) + + +def _write_pkcs8_key(tmp_path: Path, name: str = "operator.key") -> Path: + priv = Ed25519PrivateKey.generate() + pem = priv.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + path = tmp_path / name + path.write_bytes(pem) + return path + + +def _public_key_from_pem(path: Path) -> Ed25519PublicKey: + priv = serialization.load_pem_private_key(path.read_bytes(), password=None) + assert isinstance(priv, Ed25519PrivateKey) + return priv.public_key() + + +def _make_engines(cache_root: Path) -> tuple[EngineCacheEntry, ...]: + engines = cache_root / "engines" + engines.mkdir(parents=True, exist_ok=True) + entries: list[EngineCacheEntry] = [] + for name in ("dinov2_vpr", "lightglue", "aliked"): + path = engines / f"{name}_sm87_jp62_trt103_fp16.engine" + payload = f"engine-{name}".encode() + path.write_bytes(payload) + entries.append( + EngineCacheEntry( + engine_path=path, + sha256_hex=hashlib.sha256(payload).hexdigest(), + sm=87, + jp="6.2", + trt="10.3", + precision=PrecisionMode.FP16, + extras={}, + ) + ) + return tuple(entries) + + +def _make_descriptor_index(cache_root: Path) -> Path: + desc_dir = cache_root / "descriptors" + desc_dir.mkdir(parents=True, exist_ok=True) + path = desc_dir / "corpus.index" + Sha256Sidecar.write_atomic_and_sidecar(path, b"faiss-binary-payload") + return path + + +def _make_calibration(cache_root: Path) -> Path: + cal_dir = cache_root / "calibration" + cal_dir.mkdir(parents=True, exist_ok=True) + path = cal_dir / "int8_calibration.json" + path.write_bytes(b'{"calibration": "data"}') + return path + + +def _make_tiles(count: int = 10) -> tuple[TileHashRecord, ...]: + return tuple( + TileHashRecord( + zoom=16 + (i % 3), + lat=50.0 + 0.001 * i, + lon=36.0 + 0.001 * i, + source="googlemaps", + sha256_hex=hashlib.sha256(f"tile-{i}".encode()).hexdigest(), + ) + for i in range(count) + ) + + +class _StaticTiles: + def __init__(self, records: tuple[TileHashRecord, ...]) -> None: + self._records = records + + def query_by_bbox(self, *, bbox, zoom_levels, sector_class): # type: ignore[no-untyped-def] + return self._records + + +def _build_signed_manifest( + tmp_path: Path, + *, + tiles: tuple[TileHashRecord, ...] | None = None, + takeoff_origin: LatLonAlt | None = None, + flight_id: UUID | None = None, +) -> tuple[Path, Ed25519PublicKey, tuple[TileHashRecord, ...]]: + """Materialise a complete signed Manifest set on disk. + + The builder writes absolute paths verbatim; the verifier expects + cache-root-relative paths (AC-7 bans absolute paths). We + post-process the Manifest body to relative paths and re-sign + with the same key, so each fixture is a realistic v1.1 Manifest. + """ + + cache_root = tmp_path / "cache_root" + cache_root.mkdir(parents=True, exist_ok=True) + engine_entries = _make_engines(cache_root) + descriptor_index = _make_descriptor_index(cache_root) + calibration = _make_calibration(cache_root) + key_path = _write_pkcs8_key(tmp_path) + tiles_used = tiles if tiles is not None else _make_tiles(10) + builder = ManifestBuilder( + sidecar=Sha256Sidecar(), + signer=Ed25519ManifestSigner(), + tile_metadata_store=_StaticTiles(tiles_used), + logger=logging.getLogger(f"build-{id(tmp_path)}"), + clock=WallClock(), + config=C10ManifestConfig(), + ) + request = ManifestBuildInput( + cache_root=cache_root, + bbox=_BBOX, + zoom_levels=_ZOOM_LEVELS, + sector_class="stable_rear", + engine_entries=engine_entries, + descriptor_index_path=descriptor_index, + calibration_path=calibration, + key_path=key_path, + takeoff_origin=takeoff_origin, + flight_id=flight_id, + ) + artifact = builder.build_manifest(request) + + # Rewrite to relative paths + re-sign. + body = orjson.loads(artifact.manifest_path.read_bytes()) + for entry in body["artifacts"]["engines"]: + entry["path"] = str(Path(entry["path"]).relative_to(cache_root)) + body["artifacts"]["descriptor_index"]["path"] = str( + Path(body["artifacts"]["descriptor_index"]["path"]).relative_to(cache_root) + ) + body["artifacts"]["calibration"]["path"] = str( + Path(body["artifacts"]["calibration"]["path"]).relative_to(cache_root) + ) + body_bytes = orjson.dumps( + body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2 + ) + if not body_bytes.endswith(b"\n"): + body_bytes += b"\n" + Sha256Sidecar.write_atomic_and_sidecar(artifact.manifest_path, body_bytes) + priv = serialization.load_pem_private_key(key_path.read_bytes(), password=None) + assert isinstance(priv, Ed25519PrivateKey) + Sha256Sidecar.write_atomic(artifact.signature_path, priv.sign(body_bytes)) + + pub = _public_key_from_pem(key_path) + return artifact.manifest_path, pub, tiles_used + + +def _build_verifier( + *, tiles: tuple[TileHashRecord, ...] | None = None +) -> tuple[ManifestVerifierImpl, list[logging.LogRecord]]: + records: list[logging.LogRecord] = [] + + class _ListHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + records.append(record) + + logger = logging.getLogger(f"verify-{id(records)}") + logger.setLevel(logging.DEBUG) + logger.handlers.clear() + logger.addHandler(_ListHandler()) + logger.propagate = False + + tile_store: TilesByBboxQuery | None = ( + _StaticTiles(tiles) if tiles is not None else None + ) + verifier = ManifestVerifierImpl( + sidecar=Sha256Sidecar(), + logger=logger, + clock=WallClock(), + tile_metadata_store=tile_store, + ) + return verifier, records + + +# ---------------------------------------------------------------------- +# AC-1 +# ---------------------------------------------------------------------- + + +def test_ac1_pass_on_valid_manifest(tmp_path: Path) -> None: + # Arrange + manifest_path, pub, _ = _build_signed_manifest(tmp_path) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + + # Assert + assert result.outcome is VerifyOutcome.PASS + assert result.fail_reasons == () + assert all(c.matched for c in result.per_artifact_checks) + assert result.signing_public_key_fingerprint is not None + assert result.elapsed_ms >= 0 + + +# ---------------------------------------------------------------------- +# AC-2 +# ---------------------------------------------------------------------- + + +def test_ac2_fail_on_missing_manifest(tmp_path: Path) -> None: + # Arrange + verifier, _ = _build_verifier() + missing = tmp_path / "nope" / "Manifest.json" + + # Act + result = verifier.verify_manifest( + manifest_path=missing, + trusted_public_keys=(Ed25519PrivateKey.generate().public_key(),), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert result.fail_reasons == (VerifyFailReason.MANIFEST_NOT_FOUND,) + assert result.per_artifact_checks == () + assert result.signing_public_key_fingerprint is None + + +# ---------------------------------------------------------------------- +# AC-3 +# ---------------------------------------------------------------------- + + +def test_ac3_fail_on_missing_signature(tmp_path: Path) -> None: + # Arrange + manifest_path, pub, _ = _build_signed_manifest(tmp_path) + (manifest_path.parent / "Manifest.json.sig").unlink() + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert result.fail_reasons == (VerifyFailReason.SIGNATURE_NOT_FOUND,) + assert result.per_artifact_checks == () + + +# ---------------------------------------------------------------------- +# AC-4 +# ---------------------------------------------------------------------- + + +def test_ac4_fail_on_tampered_manifest_body(tmp_path: Path) -> None: + # Arrange: flip one byte in Manifest.json (sidecar untouched) + manifest_path, pub, _ = _build_signed_manifest(tmp_path) + body = bytearray(manifest_path.read_bytes()) + body[10] ^= 0x01 + manifest_path.write_bytes(bytes(body)) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.MANIFEST_SELF_HASH_MISMATCH in result.fail_reasons + assert result.per_artifact_checks == () + + +# ---------------------------------------------------------------------- +# AC-5 +# ---------------------------------------------------------------------- + + +def test_ac5_fail_on_untrusted_public_key(tmp_path: Path) -> None: + # Arrange: verify with a different keypair + manifest_path, _signed_with, _ = _build_signed_manifest(tmp_path) + other_key = Ed25519PrivateKey.generate().public_key() + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(other_key,), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.UNTRUSTED_PUBLIC_KEY in result.fail_reasons + assert result.signing_public_key_fingerprint is not None + assert result.per_artifact_checks == () + + +# ---------------------------------------------------------------------- +# AC-6 +# ---------------------------------------------------------------------- + + +def test_ac6_schema_violation_names_offending_field(tmp_path: Path) -> None: + # Arrange + manifest_path, _pub, _ = _build_signed_manifest(tmp_path) + body = orjson.loads(manifest_path.read_bytes()) + body.pop("signing_public_key_fingerprint") + new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n" + Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes) + # Re-sign so we get past Step B; we want Step C to be the failure. + priv = Ed25519PrivateKey.generate() + Sha256Sidecar.write_atomic( + manifest_path.parent / "Manifest.json.sig", + priv.sign(new_bytes), + ) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(priv.public_key(),), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.SCHEMA_VIOLATION in result.fail_reasons + assert any("signing_public_key_fingerprint" in d for d in result.fail_details) + + +# ---------------------------------------------------------------------- +# AC-7 +# ---------------------------------------------------------------------- + + +def test_ac7_absolute_path_in_artifact_is_schema_violation(tmp_path: Path) -> None: + # Arrange + manifest_path, _pub, _ = _build_signed_manifest(tmp_path) + body = orjson.loads(manifest_path.read_bytes()) + body["artifacts"]["engines"][0]["path"] = "/etc/passwd" + new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n" + Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes) + priv = Ed25519PrivateKey.generate() + Sha256Sidecar.write_atomic( + manifest_path.parent / "Manifest.json.sig", + priv.sign(new_bytes), + ) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(priv.public_key(),), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.SCHEMA_VIOLATION in result.fail_reasons + assert any("/etc/passwd" in d for d in result.fail_details) + # No per-artifact disk reads happened — the walk is Step D only. + assert result.per_artifact_checks == () + + +def test_ac7_dot_dot_segment_in_artifact_is_schema_violation(tmp_path: Path) -> None: + # Arrange + manifest_path, _pub, _ = _build_signed_manifest(tmp_path) + body = orjson.loads(manifest_path.read_bytes()) + body["artifacts"]["calibration"]["path"] = "../calibration/int8.json" + new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n" + Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes) + priv = Ed25519PrivateKey.generate() + Sha256Sidecar.write_atomic( + manifest_path.parent / "Manifest.json.sig", + priv.sign(new_bytes), + ) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(priv.public_key(),), + ) + + # Assert + assert VerifyFailReason.SCHEMA_VIOLATION in result.fail_reasons + + +# ---------------------------------------------------------------------- +# AC-8 +# ---------------------------------------------------------------------- + + +def test_ac8_multiple_fail_reasons_accumulate(tmp_path: Path) -> None: + # Arrange: 1 engine missing, 1 engine drifted, 1 engine OK + manifest_path, pub, _ = _build_signed_manifest(tmp_path) + cache_root = manifest_path.parent + body = orjson.loads(manifest_path.read_bytes()) + # Delete first engine + first_engine_rel = body["artifacts"]["engines"][0]["path"] + (cache_root / first_engine_rel).unlink() + # Mutate second engine + second_engine_rel = body["artifacts"]["engines"][1]["path"] + (cache_root / second_engine_rel).write_bytes(b"drifted-bytes") + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.ARTIFACT_MISSING in result.fail_reasons + assert VerifyFailReason.ARTIFACT_HASH_MISMATCH in result.fail_reasons + engine_checks = [c for c in result.per_artifact_checks if "engines" in c.relative_path] + assert len(engine_checks) == 3 + matched_flags = [c.matched for c in engine_checks] + assert matched_flags.count(True) == 1 + assert matched_flags.count(False) == 2 + + +# ---------------------------------------------------------------------- +# AC-9 +# ---------------------------------------------------------------------- + + +def test_ac9_operator_mode_re_derives_tiles_coverage(tmp_path: Path) -> None: + # Arrange: build with tiles X; verify with tiles Y → mismatch + tiles_built = _make_tiles(10) + tiles_drifted = ( + *tiles_built[:-1], + TileHashRecord( + zoom=18, + lat=50.99, + lon=36.99, + source="googlemaps", + sha256_hex=hashlib.sha256(b"drifted-tile").hexdigest(), + ), + ) + manifest_path, pub, _ = _build_signed_manifest(tmp_path, tiles=tiles_built) + verifier, _ = _build_verifier(tiles=tiles_drifted) + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.TILES_COVERAGE_MISMATCH in result.fail_reasons + assert any("tiles_coverage" in d for d in result.fail_details) + + +def test_ac9_operator_mode_pass_when_tiles_match(tmp_path: Path) -> None: + # Arrange + tiles = _make_tiles(10) + manifest_path, pub, _ = _build_signed_manifest(tmp_path, tiles=tiles) + verifier, _ = _build_verifier(tiles=tiles) + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + + # Assert + assert result.outcome is VerifyOutcome.PASS + tiles_check = next( + c for c in result.per_artifact_checks if c.relative_path == "tiles_coverage" + ) + assert tiles_check.matched is True + assert tiles_check.actual_sha256 == tiles_check.expected_sha256 + + +# ---------------------------------------------------------------------- +# AC-10 +# ---------------------------------------------------------------------- + + +def test_ac10_airborne_mode_trusts_tiles_coverage(tmp_path: Path) -> None: + # Arrange + manifest_path, pub, _ = _build_signed_manifest(tmp_path) + verifier, _ = _build_verifier() # tile_metadata_store=None + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + + # Assert + assert result.outcome is VerifyOutcome.PASS + tiles_check = next( + c for c in result.per_artifact_checks if c.relative_path == "tiles_coverage" + ) + assert tiles_check.matched is True + + +# ---------------------------------------------------------------------- +# AC-11 +# ---------------------------------------------------------------------- + + +def test_ac11_protocol_conformance() -> None: + # Assert + verifier, _ = _build_verifier() + assert isinstance(verifier, ManifestVerifier) + + +# ---------------------------------------------------------------------- +# AC-12 +# ---------------------------------------------------------------------- + + +def test_ac12_elapsed_ms_recorded_on_every_outcome(tmp_path: Path) -> None: + # Arrange + verifier, _ = _build_verifier() + manifest_path, pub, _ = _build_signed_manifest(tmp_path) + + # Act + pass_result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + fail_result = verifier.verify_manifest( + manifest_path=tmp_path / "missing.json", + trusted_public_keys=(pub,), + ) + + # Assert + assert pass_result.elapsed_ms >= 0 + assert fail_result.elapsed_ms >= 0 + + +# ---------------------------------------------------------------------- +# AC-13 +# ---------------------------------------------------------------------- + + +def test_ac13_empty_trusted_public_keys_fails_closed(tmp_path: Path) -> None: + # Arrange + verifier, records = _build_verifier() + manifest_path, _pub, _ = _build_signed_manifest(tmp_path) + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.UNTRUSTED_PUBLIC_KEY in result.fail_reasons + assert result.per_artifact_checks == () + errors = [r for r in records if r.levelno == logging.ERROR] + assert any(r.__dict__.get("kind") == "c10.manifest.verify.untrusted" for r in errors) + + +# ---------------------------------------------------------------------- +# AC-14 +# ---------------------------------------------------------------------- + + +def test_ac14_v10_manifest_without_flight_block_parses(tmp_path: Path) -> None: + # Arrange: take a built manifest and strip the entire `flight` block. + manifest_path, _pub, _ = _build_signed_manifest(tmp_path) + body = orjson.loads(manifest_path.read_bytes()) + body.pop("flight", None) + body["schema_version"] = "1.0" + new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n" + Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes) + priv = Ed25519PrivateKey.generate() + Sha256Sidecar.write_atomic( + manifest_path.parent / "Manifest.json.sig", + priv.sign(new_bytes), + ) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(priv.public_key(),), + ) + + # Assert + assert result.outcome is VerifyOutcome.PASS + assert result.takeoff_origin is None + assert result.flight_id is None + + +# ---------------------------------------------------------------------- +# AC-15 +# ---------------------------------------------------------------------- + + +def test_ac15_well_formed_in_bbox_takeoff_origin_passes(tmp_path: Path) -> None: + # Arrange + flight = uuid4() + origin = LatLonAlt(50.0, 36.2, 200.0) + manifest_path, pub, _ = _build_signed_manifest( + tmp_path, takeoff_origin=origin, flight_id=flight + ) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(pub,), + ) + + # Assert + assert result.outcome is VerifyOutcome.PASS + assert result.takeoff_origin == origin + assert result.flight_id == flight + + +# ---------------------------------------------------------------------- +# AC-16 +# ---------------------------------------------------------------------- + + +def test_ac16_malformed_takeoff_origin_fails_closed(tmp_path: Path) -> None: + # Arrange + manifest_path, _pub, _ = _build_signed_manifest(tmp_path) + body = orjson.loads(manifest_path.read_bytes()) + body["flight"] = { + "flight_id": str(uuid4()), + "takeoff_origin": {"lat_deg": 200.0, "lon_deg": 36.2, "alt_m": 100.0}, + } + new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n" + Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes) + priv = Ed25519PrivateKey.generate() + Sha256Sidecar.write_atomic( + manifest_path.parent / "Manifest.json.sig", + priv.sign(new_bytes), + ) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(priv.public_key(),), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.TAKEOFF_ORIGIN_INVALID in result.fail_reasons + assert any("lat_deg" in d for d in result.fail_details) + # Diagnostics: takeoff_origin still populated even on FAIL + assert result.takeoff_origin is not None + + +# ---------------------------------------------------------------------- +# AC-17 +# ---------------------------------------------------------------------- + + +def test_ac17_out_of_bbox_takeoff_origin_fails_closed(tmp_path: Path) -> None: + # Arrange + manifest_path, _pub, _ = _build_signed_manifest(tmp_path) + body = orjson.loads(manifest_path.read_bytes()) + body["flight"] = { + "flight_id": str(uuid4()), + "takeoff_origin": {"lat_deg": 10.0, "lon_deg": 10.0, "alt_m": 0.0}, + } + new_bytes = orjson.dumps(body, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2) + b"\n" + Sha256Sidecar.write_atomic_and_sidecar(manifest_path, new_bytes) + priv = Ed25519PrivateKey.generate() + Sha256Sidecar.write_atomic( + manifest_path.parent / "Manifest.json.sig", + priv.sign(new_bytes), + ) + verifier, _ = _build_verifier() + + # Act + result = verifier.verify_manifest( + manifest_path=manifest_path, + trusted_public_keys=(priv.public_key(),), + ) + + # Assert + assert result.outcome is VerifyOutcome.FAIL + assert VerifyFailReason.TAKEOFF_ORIGIN_OUT_OF_BBOX in result.fail_reasons diff --git a/tests/unit/test_az507_inference_errors_shim.py b/tests/unit/test_az507_inference_errors_shim.py new file mode 100644 index 0000000..da33abd --- /dev/null +++ b/tests/unit/test_az507_inference_errors_shim.py @@ -0,0 +1,88 @@ +"""AZ-507 — Layer-0 typed-error envelope shim tests. + +Covers AC-2 (shim re-exports resolve and are identical to the c7 +canonical classes) and AC-1 / AC-5 (module-layout.md + architecture.md +documentation invariants). +""" + +from __future__ import annotations + +import re +from pathlib import Path + +from gps_denied_onboard._types.inference_errors import ( + CalibrationCacheError as ShimCalibrationCacheError, +) +from gps_denied_onboard._types.inference_errors import ( + EngineBuildError as ShimEngineBuildError, +) +from gps_denied_onboard.components.c7_inference.errors import ( + CalibrationCacheError as CanonicalCalibrationCacheError, +) +from gps_denied_onboard.components.c7_inference.errors import ( + EngineBuildError as CanonicalEngineBuildError, +) + +_REPO_ROOT = Path(__file__).resolve().parents[2] + + +def test_ac2_shim_engine_build_error_identity_with_canonical() -> None: + # The shim must NOT introduce a fresh subclass — consumers catching + # ShimEngineBuildError MUST catch what c7 actually raises. + assert ShimEngineBuildError is CanonicalEngineBuildError + + +def test_ac2_shim_calibration_cache_error_identity_with_canonical() -> None: + assert ShimCalibrationCacheError is CanonicalCalibrationCacheError + + +def test_ac2_shim_module_has_no_runtime_side_effects() -> None: + # Importing the shim must not register a component config block or + # otherwise mutate global state. Re-importing the module twice in the + # same process must remain a no-op. + import importlib + + import gps_denied_onboard._types.inference_errors as shim + + first = importlib.reload(shim) + second = importlib.reload(shim) + assert first.EngineBuildError is second.EngineBuildError + assert first.CalibrationCacheError is second.CalibrationCacheError + + +def test_ac1_module_layout_has_no_cross_component_public_api_imports() -> None: + # AZ-507's primary deliverable: every line that previously said + # `components.X (Public API)` in an "Imports from" line must now point + # at `_types` (or be removed). We grep for the offending pattern and + # assert zero matches. + layout = ( + _REPO_ROOT / "_docs" / "02_document" / "module-layout.md" + ).read_text(encoding="utf-8") + offenders = re.findall( + r"components\.[a-z0-9_]+\s*\(Public API\)", + layout, + ) + assert offenders == [], ( + "module-layout.md still names cross-component Public API imports " + f"after AZ-507: {offenders}" + ) + + +def test_ac5_architecture_doc_codifies_cross_component_rule() -> None: + # AZ-507 AC-5: the architecture doc must carry a one-paragraph rule + # that cross-component imports go through `_types/*.py` (DTOs + + # typed-error envelopes), never `components.X (Public API)`. + arch = ( + _REPO_ROOT / "_docs" / "02_document" / "architecture.md" + ).read_text(encoding="utf-8") + # Look for the rule sentence (case-insensitive) and the explicit + # AZ-507 reference so future readers can trace the decision. + assert "AZ-507" in arch, "architecture.md must reference AZ-507" + assert re.search( + r"cross-component imports go through `_types", + arch, + flags=re.IGNORECASE, + ), ( + "architecture.md must state the cross-component import rule " + "introduced by AZ-507" + )