"""Per-binary bootstrap for the airborne runtime (AZ-591). Populates the central ``_STRATEGY_REGISTRY`` (see :mod:`gps_denied_onboard.runtime_root`) with the (component, strategy) pairs the airborne binary supports, so that :func:`compose_root` can resolve ``config.components[slug].strategy`` for every strategy-selecting component (c1_vio, c2_vpr, c2_5_rerank, c3_matcher, c3_5_adhop, c4_pose, c5_state). Without this bootstrap, ``compose_root`` raises :class:`StrategyNotLinkedError` on the first component lookup because no module under :mod:`gps_denied_onboard.\ runtime_root` calls :func:`register_strategy` at import time — by design, per ADR-002, the build-flag gate must be the single place that decides which strategies are linked into a given binary. Call order at process start: 1. ``register_airborne_strategies()`` — once, before any ``compose_root`` call. 2. (Optional, test/production both) populate a ``pre_constructed`` dict with the infrastructure objects the wrappers below expect (``c13_fdr``, ``c6_descriptor_index``, ``c7_inference``, etc.). 3. ``compose_root(config, pre_constructed=pre_constructed)``. The wrapper factories below adapt the registry-factory signature ``(config, constructed)`` to each per-component factory's keyword-argument surface (e.g. ``build_vio_strategy(config, *, fdr_client=...)``). Every dep is looked up by a documented key in ``constructed``; a missing key surfaces as a :class:`AirborneBootstrapError` naming the missing dep + the consuming component slug. Lazy-loading is preserved at two levels: * **Central registry**: identity wrappers are registered for every strategy the binary supports. The wrappers themselves only import the concrete strategy module via the per-component factory (which already gates by ``BUILD_*`` env flags) — they do NOT eagerly import strategy modules. * **Per-component private registries** (``state_factory._STATE_REGISTRY`` needs explicit ``register_state_estimator`` calls; ``pose_factory`` has its own lazy-import fallback so no explicit registration is needed): the wrapper calls each strategy module's ``register()`` only when the config actually selects that strategy AND the matching ``BUILD_*`` flag is ON. ADR refs: ADR-001 (composition root is single registration site), ADR-002 (build-flag gate is the lazy-loading boundary), AZ-507 (cross-component import rule — bootstrap may import any component's Public API, but not its internals). """ from __future__ import annotations import json import logging import os from collections.abc import Mapping from pathlib import Path from typing import TYPE_CHECKING, Any, Final from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.fdr_client.client import make_fdr_client from gps_denied_onboard.helpers.feature_extractor import OpenCvOrbExtractor from gps_denied_onboard.helpers.imu_preintegrator import ( ImuPreintegrator, make_imu_preintegrator, ) from gps_denied_onboard.helpers.lightglue_runtime import LightGlueRuntime from gps_denied_onboard.helpers.ransac_filter import RansacFilter from gps_denied_onboard.helpers.wgs_converter import WgsConverter from gps_denied_onboard.runtime_root import register_strategy from gps_denied_onboard.runtime_root.errors import RuntimeNotAvailableError from gps_denied_onboard.runtime_root.inference_factory import build_inference_runtime from gps_denied_onboard.runtime_root.matcher_factory import build_matcher_strategy from gps_denied_onboard.runtime_root.pose_factory import build_pose_estimator from gps_denied_onboard.runtime_root.refiner_factory import build_refiner_strategy from gps_denied_onboard.runtime_root.rerank_factory import build_rerank_strategy from gps_denied_onboard.runtime_root.state_factory import build_state_estimator from gps_denied_onboard.runtime_root.storage_factory import ( build_descriptor_index, build_tile_store, ) from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy from gps_denied_onboard.runtime_root.vpr_factory import build_vpr_strategy if TYPE_CHECKING: from gps_denied_onboard._types.manifests import EngineHandle from gps_denied_onboard.components.c7_inference import InferenceRuntime from gps_denied_onboard.config import Config from gps_denied_onboard.helpers.feature_extractor import FeatureExtractor __all__ = [ "AIRBORNE_MAIN_PRODUCER_ID", "AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS", "C3_MATCHER_BUILD_FLAGS", "C5_STATE_BUILD_FLAGS", "C7_AIRBORNE_BUILD_FLAGS", "FAISS_BUILD_FLAG", "AirborneBootstrapError", "build_pre_constructed", "clear_imu_preintegrator_cache", "register_airborne_strategies", ] _IMU_PREINTEGRATOR_CACHE: dict[str, ImuPreintegrator] = {} """Per-process cache mapping ``camera_calibration_path`` to the :class:`ImuPreintegrator` built for that path. Backs AC-623.2: invoking :func:`build_pre_constructed` twice in the same process MUST return the SAME ``c5_imu_preintegrator`` instance when the calibration path is unchanged. The preintegrator is the only stateful c5 helper this phase wires; caching protects its bias / sample accumulator from being silently rebuilt on a re-invocation. Tests call :func:`clear_imu_preintegrator_cache` to isolate state. """ def clear_imu_preintegrator_cache() -> None: """Drop every cached :class:`ImuPreintegrator` (test-isolation only). Mirrors :func:`gps_denied_onboard.fdr_client.client.clear_fdr_client_cache` / :func:`gps_denied_onboard.runtime_root.state_factory.clear_state_registry`'s test-only contract: production code never calls this, but unit tests that exercise the per-path cache need a way to reset between cases. """ _IMU_PREINTEGRATOR_CACHE.clear() FAISS_BUILD_FLAG: Final[str] = "BUILD_FAISS_INDEX" """Env flag gating the FAISS-backed ``DescriptorIndex`` impl. Mirrors :func:`gps_denied_onboard.runtime_root.storage_factory.build_descriptor_index` which reads the same flag at composition time. Surfaced here so the airborne bootstrap can name the flag in an :class:`AirborneBootstrapError` when the flag is OFF but a consuming component still requires the index. """ C7_AIRBORNE_BUILD_FLAGS: Final[tuple[tuple[str, str], ...]] = ( ("tensorrt", "BUILD_TENSORRT_RUNTIME"), ("pytorch_fp16", "BUILD_PYTORCH_FP16_RUNTIME"), ) """Airborne-buildable C7 inference runtimes paired with their gating env flags. Production-default for the airborne binary is ``tensorrt`` (TensorRT FP16); ``pytorch_fp16`` is the Tier-0 / workstation fallback (per ``module-layout.md`` build-time exclusion table and the AZ-621 task spec). ``onnx_trt_ep`` is deliberately omitted — it is research-only and not built into the airborne binary, even though :mod:`gps_denied_onboard.runtime_root.inference_factory` supports the label. Surfaced here so :func:`_build_c7_inference` can name BOTH airborne flags in an :class:`AirborneBootstrapError` (AC-621.2) — the operator sees which flag must be flipped ON to enable the configured runtime AND which fallback flag would unblock the bootstrap with a different runtime selection. """ C5_STATE_BUILD_FLAGS: Final[Mapping[str, str]] = { "gtsam_isam2": "BUILD_STATE_GTSAM_ISAM2", "eskf": "BUILD_STATE_ESKF", } """Per-strategy ``BUILD_STATE_*`` flag matrix consumed by the airborne c5_state estimator pair builder (AZ-625 / Phase E.5). Mirrors :data:`gps_denied_onboard.runtime_root.state_factory._STATE_BUILD_FLAGS` verbatim — both this constant and the state factory's table read the same compile-time flags. ANY mutation of this matrix MUST be mirrored in ``state_factory._STATE_BUILD_FLAGS`` (and vice versa). Surfaced here so :func:`_build_c5_state_estimator_pair` can name the gating flag in an :class:`AirborneBootstrapError` (AC-625.2) when the configured C5 state strategy's flag is OFF in this binary, *before* :func:`build_state_estimator` has a chance to raise the lower-level :class:`StateEstimatorConfigError` (which is the state-factory-internal error type, not the operator-facing bootstrap-error contract this module owns). """ C3_MATCHER_BUILD_FLAGS: Final[Mapping[str, str]] = { "disk_lightglue": "BUILD_MATCHER_DISK_LIGHTGLUE", "aliked_lightglue": "BUILD_MATCHER_ALIKED_LIGHTGLUE", "xfeat": "BUILD_MATCHER_XFEAT", } """Per-strategy ``BUILD_MATCHER_*`` flag matrix consumed by the airborne LightGlue-runtime builder (AZ-622 / Phase D). Mirrors :data:`gps_denied_onboard.runtime_root.matcher_factory.\ _STRATEGY_TO_BUILD_FLAG` verbatim — both this constant and the matcher factory's table read the same compile-time flags. ANY mutation of this matrix MUST be mirrored in ``matcher_factory._STRATEGY_TO_BUILD_FLAG`` (and vice versa). Surfaced here so :func:`_build_c3_lightglue_runtime` can name the gating flag in an :class:`AirborneBootstrapError` (AC-622.2) when the configured C3 matcher strategy's flag is OFF in this binary. Note on flag naming: AZ-622's task spec uses the name ``BUILD_C3_MATCHER_DISK_LIGHTGLUE`` informally, but the actual production flag matrix is the older ``BUILD_MATCHER_*`` family (``matcher_factory._STRATEGY_TO_BUILD_FLAG``). Reusing those flags is the spirit of the AZ-622 ``MUST reuse the existing per-strategy BUILD_C3_MATCHER_* matrix`` constraint. """ AIRBORNE_MAIN_PRODUCER_ID: Final[str] = "airborne_main" """Producer ID for the per-binary shared FdrClient placed under ``pre_constructed['c13_fdr']``. Per-component callers can still obtain their own FdrClient via ``make_fdr_client(, config)`` — the cache in :mod:`gps_denied_onboard.fdr_client.client` ensures one instance per ``producer_id``. The ``"airborne_main"`` instance is the one passed via ``pre_constructed`` for the wrappers that accept ``fdr_client=`` as a kwarg. """ _LOG = logging.getLogger("gps_denied_onboard.runtime_root.airborne_bootstrap") class AirborneBootstrapError(RuntimeError): """Raised when an airborne wrapper factory cannot find a required dep. The wrapper factories registered by :func:`register_airborne_strategies` extract infrastructure objects (fdr_client, descriptor_index, etc.) from the ``constructed`` dict passed to them by :func:`compose_root`. The caller (production ``main()`` or a unit test) is responsible for seeding ``pre_constructed`` with these dependencies before invoking ``compose_root``. A missing dep surfaces this error naming both the consuming component slug and the missing key, so the operator can fix the bootstrap wiring without guessing. """ AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS: Mapping[str, tuple[str, ...]] = { "c1_vio": ("c13_fdr",), "c2_vpr": ("c6_descriptor_index", "c7_inference"), "c2_5_rerank": ( "c6_tile_store", "c3_lightglue_runtime", "c3_feature_extractor", "clock", ), "c3_matcher": ( "c3_lightglue_runtime", "c282_ransac_filter", "c7_inference", ), "c3_5_adhop": ("c282_ransac_filter", "c7_inference"), "c4_pose": ( "c282_ransac_filter", "c5_wgs_converter", "c5_se3_utils", "c5_isam2_graph_handle", ), "c5_state": ( "c5_imu_preintegrator", "c5_se3_utils", "c5_wgs_converter", "c13_fdr", ), } """Per-component infrastructure-dep keys the airborne wrappers expect to find in ``constructed`` (i.e. in the ``pre_constructed`` dict callers pass to ``compose_root``). Tests use this to seed mock objects; production wiring populates them from the takeoff orchestrator (separate task — AZ-591 follow-up infrastructure-prep).""" _C1_VIO_STRATEGIES: tuple[str, ...] = ("klt_ransac", "okvis2", "vins_mono") _C2_VPR_STRATEGIES: tuple[str, ...] = ( "ultra_vpr", "net_vlad", "mega_loc", "mix_vpr", "sela_vpr", "eigen_places", "salad", ) _C2_5_RERANK_STRATEGIES: tuple[str, ...] = ("inlier_count",) _C3_MATCHER_STRATEGIES: tuple[str, ...] = ("disk_lightglue", "aliked_lightglue") _C3_5_ADHOP_STRATEGIES: tuple[str, ...] = ("adhop",) _C4_POSE_STRATEGIES: tuple[str, ...] = ("opencv_gtsam",) _C5_STATE_STRATEGIES: tuple[str, ...] = ("gtsam_isam2", "eskf") def _require(constructed: Mapping[str, Any], key: str, component_slug: str) -> Any: """Extract ``constructed[key]`` or raise AirborneBootstrapError.""" if key not in constructed: available = sorted(constructed.keys()) raise AirborneBootstrapError( f"airborne_bootstrap: component {component_slug!r} requires " f"pre_constructed[{key!r}] to be populated before compose_root() runs; " f"available keys in constructed: {available}. " "Production main() must build infrastructure (c13_fdr, c6_*, " "c7_inference, etc.) into pre_constructed and pass it to " "compose_root(config, pre_constructed=...). Tests stub it via the " "same kwarg." ) return constructed[key] def _c1_vio_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: fdr_client = _require(constructed, "c13_fdr", "c1_vio") return build_vio_strategy(config, fdr_client=fdr_client) def _c2_vpr_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: descriptor_index = _require(constructed, "c6_descriptor_index", "c2_vpr") inference_runtime = _require(constructed, "c7_inference", "c2_vpr") return build_vpr_strategy( config, descriptor_index=descriptor_index, inference_runtime=inference_runtime, ) def _c2_5_rerank_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: tile_store = _require(constructed, "c6_tile_store", "c2_5_rerank") lightglue_runtime = _require(constructed, "c3_lightglue_runtime", "c2_5_rerank") feature_extractor = _require(constructed, "c3_feature_extractor", "c2_5_rerank") clock = _require(constructed, "clock", "c2_5_rerank") fdr_client = constructed.get("c13_fdr") return build_rerank_strategy( config, tile_store=tile_store, lightglue_runtime=lightglue_runtime, feature_extractor=feature_extractor, clock=clock, fdr_client=fdr_client, ) def _c3_matcher_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: lightglue_runtime = _require(constructed, "c3_lightglue_runtime", "c3_matcher") ransac_filter = _require(constructed, "c282_ransac_filter", "c3_matcher") inference_runtime = _require(constructed, "c7_inference", "c3_matcher") clock = constructed.get("clock") fdr_client = constructed.get("c13_fdr") return build_matcher_strategy( config, lightglue_runtime=lightglue_runtime, ransac_filter=ransac_filter, inference_runtime=inference_runtime, clock=clock, fdr_client=fdr_client, ) def _c3_5_adhop_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: ransac_filter = _require(constructed, "c282_ransac_filter", "c3_5_adhop") inference_runtime = _require(constructed, "c7_inference", "c3_5_adhop") clock = constructed.get("clock") fdr_client = constructed.get("c13_fdr") return build_refiner_strategy( config, ransac_filter=ransac_filter, inference_runtime=inference_runtime, clock=clock, fdr_client=fdr_client, ) def _c4_pose_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: ransac_filter = _require(constructed, "c282_ransac_filter", "c4_pose") wgs_converter = _require(constructed, "c5_wgs_converter", "c4_pose") se3_utils = _require(constructed, "c5_se3_utils", "c4_pose") isam2_graph_handle = _require(constructed, "c5_isam2_graph_handle", "c4_pose") fdr_client = constructed.get("c13_fdr") clock = constructed.get("clock") return build_pose_estimator( config, ransac_filter=ransac_filter, wgs_converter=wgs_converter, se3_utils=se3_utils, isam2_graph_handle=isam2_graph_handle, fdr_client=fdr_client, clock=clock, ) def _c5_state_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: # AZ-625 fast path: when build_pre_constructed has eagerly built the # (estimator, handle) pair, the estimator lives under the private # _c5_prebuilt_estimator key. Returning the prebuilt instance keeps # c4_pose's c5_isam2_graph_handle pointing at the SAME estimator's # _isam2_handle — the AC-625.3 cross-seam identity invariant. prebuilt = constructed.get(_C5_PREBUILT_ESTIMATOR_KEY) if prebuilt is not None: return prebuilt # Fallback path: tests / fixtures that bypass build_pre_constructed # (for example, the existing test_az401_compose_root_replay.py suite # which seeds pre_constructed manually) still drive the wrapper # through build_state_estimator directly. imu_preintegrator = _require(constructed, "c5_imu_preintegrator", "c5_state") se3_utils = _require(constructed, "c5_se3_utils", "c5_state") wgs_converter = _require(constructed, "c5_wgs_converter", "c5_state") fdr_client = _require(constructed, "c13_fdr", "c5_state") tile_store = constructed.get("c6_tile_store") camera_calibration = constructed.get("camera_calibration") flight_id = constructed.get("flight_id") companion_id = constructed.get("companion_id") _ensure_state_strategy_registered(config) estimator, _handle = build_state_estimator( config, imu_preintegrator=imu_preintegrator, se3_utils=se3_utils, wgs_converter=wgs_converter, fdr_client=fdr_client, tile_store=tile_store, camera_calibration=camera_calibration, flight_id=flight_id, companion_id=companion_id, ) return estimator def _ensure_state_strategy_registered(config: Config) -> None: """Register the c5_state strategy module if its build flag is on. state_factory does NOT have a lazy-import fallback (unlike pose_factory). The configured strategy module's ``register()`` must be called before ``build_state_estimator`` is invoked. We do this here, lazily, so the gtsam-bound code only loads when the airborne binary is actually configured for it AND the matching BUILD_* flag is ON. """ block = getattr(config, "components", None) or {} c5_block = block.get("c5_state") if isinstance(block, dict) else None strategy = ( getattr(c5_block, "strategy", "gtsam_isam2") if c5_block is not None else "gtsam_isam2" ) # state_factory._STATE_BUILD_FLAGS: gtsam_isam2 defaults ON-when-unset; # eskf defaults OFF-when-unset (mirror state_factory's own logic). if strategy == "gtsam_isam2": if os.environ.get("BUILD_STATE_GTSAM_ISAM2", "ON").upper() == "OFF": return from gps_denied_onboard.components.c5_state import gtsam_isam2_estimator gtsam_isam2_estimator.register() elif strategy == "eskf": if os.environ.get("BUILD_STATE_ESKF", "OFF").upper() != "ON": return from gps_denied_onboard.components.c5_state import eskf_baseline eskf_baseline.register() _C1_VIO_DEPENDS_ON: tuple[str, ...] = () _C2_VPR_DEPENDS_ON: tuple[str, ...] = () _C2_5_RERANK_DEPENDS_ON: tuple[str, ...] = ("c2_vpr",) _C3_MATCHER_DEPENDS_ON: tuple[str, ...] = () _C3_5_ADHOP_DEPENDS_ON: tuple[str, ...] = ("c3_matcher",) _C4_POSE_DEPENDS_ON: tuple[str, ...] = ("c1_vio", "c3_matcher") _C5_STATE_DEPENDS_ON: tuple[str, ...] = ("c1_vio", "c4_pose") """Inter-component dependency edges for ``_topo_order``. These are the runtime data-flow dependencies (NOT infrastructure deps in ``pre_constructed``): * c2_5_rerank reranks c2_vpr candidates → depends on c2_vpr. * c3_5_adhop refines c3_matcher correspondences → depends on c3_matcher. * c4_pose is anchored against c1_vio's pose estimate and consumes c3_matcher correspondences → depends on both. * c5_state fuses c1_vio + c4_pose updates → depends on both. The leaf slugs (c1_vio, c2_vpr, c3_matcher) have no inter-component deps inside the registry-driven path; their infrastructure deps (fdr_client, descriptor_index, etc.) come from ``pre_constructed``. """ _AIRBORNE_REGISTRATIONS: tuple[tuple[str, tuple[str, ...], Any, tuple[str, ...]], ...] = ( ("c1_vio", _C1_VIO_STRATEGIES, _c1_vio_wrapper, _C1_VIO_DEPENDS_ON), ("c2_vpr", _C2_VPR_STRATEGIES, _c2_vpr_wrapper, _C2_VPR_DEPENDS_ON), ( "c2_5_rerank", _C2_5_RERANK_STRATEGIES, _c2_5_rerank_wrapper, _C2_5_RERANK_DEPENDS_ON, ), ( "c3_matcher", _C3_MATCHER_STRATEGIES, _c3_matcher_wrapper, _C3_MATCHER_DEPENDS_ON, ), ( "c3_5_adhop", _C3_5_ADHOP_STRATEGIES, _c3_5_adhop_wrapper, _C3_5_ADHOP_DEPENDS_ON, ), ("c4_pose", _C4_POSE_STRATEGIES, _c4_pose_wrapper, _C4_POSE_DEPENDS_ON), ( "c5_state", _C5_STATE_STRATEGIES, _c5_state_wrapper, _C5_STATE_DEPENDS_ON, ), ) def _consumers_of_pre_constructed_key(key: str) -> tuple[str, ...]: """Return component slugs that require ``key`` in ``pre_constructed``. Reads :data:`AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS` — the single source of truth for which slot consumes which infrastructure dep. """ return tuple( sorted( slug for slug, required in AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS.items() if key in required ) ) def _configured_consumers_of_pre_constructed_key(config: Config, key: str) -> tuple[str, ...]: """Return consumers of ``key`` that are present in ``config.components``. Used to narrow an error message from "every theoretical consumer of ``c6_descriptor_index``" to "the consumer(s) you actually configured". Falls back to the full theoretical set when config carries no component blocks (e.g., bare ``Config()`` in early-bootstrap tests). """ consumers = _consumers_of_pre_constructed_key(key) components = getattr(config, "components", None) or {} if not isinstance(components, Mapping): return consumers configured = tuple(slug for slug in consumers if slug in components) return configured if configured else consumers def _build_c6_descriptor_index(config: Config) -> Any: """Build ``pre_constructed['c6_descriptor_index']`` via the C6 factory. Wraps :func:`storage_factory.build_descriptor_index` so a :class:`RuntimeNotAvailableError` (typically raised when ``BUILD_FAISS_INDEX`` is OFF) surfaces as an :class:`AirborneBootstrapError` naming the missing key, the gating build flag, and the component(s) that need the index. The original factory error is preserved via ``raise ... from``. AC-620.2: this is the path the test exercises when ``BUILD_FAISS_INDEX=OFF`` and a C2 strategy needing the index is configured. """ try: return build_descriptor_index(config) except RuntimeNotAvailableError as exc: consumers = _configured_consumers_of_pre_constructed_key(config, "c6_descriptor_index") raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c6_descriptor_index'] because " f"{FAISS_BUILD_FLAG} is OFF (or the FAISS impl module is " f"unavailable). Consuming components: {list(consumers)}. " f"Set {FAISS_BUILD_FLAG}=ON to enable the FAISS DescriptorIndex, " f"or reconfigure the consuming components to use a strategy " f"that does not require the index." ) from exc def _build_c6_tile_store(config: Config) -> Any: """Build ``pre_constructed['c6_tile_store']`` via the C6 factory. Thin pass-through to :func:`storage_factory.build_tile_store`. There is no ``BUILD_*`` flag for the tile store (the Postgres + filesystem backend is always built when c6 is configured); failures here surface as :class:`RuntimeNotAvailableError` with the operator-actionable message provided by the factory itself. """ return build_tile_store(config) def _build_c7_inference(config: Config) -> Any: """Build ``pre_constructed['c7_inference']`` via the C7 factory. Wraps :func:`inference_factory.build_inference_runtime` so a :class:`RuntimeNotAvailableError` (raised when the configured runtime's ``BUILD_*`` flag is OFF) surfaces as an :class:`AirborneBootstrapError` naming: * the missing key (``c7_inference``); * BOTH airborne-buildable runtimes and their gating flags (:data:`C7_AIRBORNE_BUILD_FLAGS`), so the operator sees the production-default (``tensorrt`` / ``BUILD_TENSORRT_RUNTIME``) AND the Tier-0 fallback (``pytorch_fp16`` / ``BUILD_PYTORCH_FP16_RUNTIME``); * the consuming component slug(s) — narrowed to configured consumers when available, else the full theoretical set. The original factory error is preserved via ``raise ... from`` so operators still see the upstream cause (e.g., "runtime 'tensorrt' requires BUILD_TENSORRT_RUNTIME=ON in this binary; the flag is OFF."). AC-621.2: this is the path the test exercises when both airborne C7 flags are OFF and a configured consumer (c2_vpr / c3_matcher / c3_5_adhop) still needs ``c7_inference``. """ try: return build_inference_runtime(config) except RuntimeNotAvailableError as exc: consumers = _configured_consumers_of_pre_constructed_key(config, "c7_inference") flag_options = ", ".join( f"{flag}=ON for runtime {runtime!r}" for runtime, flag in C7_AIRBORNE_BUILD_FLAGS ) raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c7_inference'] because no airborne C7 " f"inference runtime is buildable. Consuming components: " f"{list(consumers)}. Set one of: {flag_options}, and ensure " f"config.components['c7_inference'].runtime matches the " f"enabled flag. Upstream error: {exc}" ) from exc def _resolve_c3_matcher_strategy(config: Config) -> str: """Return the configured C3 matcher strategy, defaulting to disk_lightglue. Reuses :class:`gps_denied_onboard.components.c3_matcher.C3MatcherConfig`'s own default ``"disk_lightglue"`` when ``config.components`` carries no ``c3_matcher`` block (early-bootstrap tests with bare ``Config()``). """ block = config.components.get("c3_matcher") if block is None: return "disk_lightglue" return getattr(block, "strategy", "disk_lightglue") def _resolve_c5_state_strategy(config: Config) -> str: """Return the configured C5 state strategy, defaulting to gtsam_isam2. Mirrors :func:`_resolve_c3_matcher_strategy` for the C5 slot. Reuses :class:`gps_denied_onboard.components.c5_state.config.C5StateConfig`'s own default ``"gtsam_isam2"`` when ``config.components`` carries no ``c5_state`` block (early-bootstrap tests with bare ``Config()``). """ components = getattr(config, "components", None) or {} if not isinstance(components, Mapping): return "gtsam_isam2" block = components.get("c5_state") if block is None: return "gtsam_isam2" return getattr(block, "strategy", "gtsam_isam2") _C5_PREBUILT_ESTIMATOR_KEY: Final[str] = "_c5_prebuilt_estimator" """Internal coordination key under which :func:`build_pre_constructed` stores the pre-built :class:`StateEstimator` instance. The C5 state estimator and its :class:`ISam2GraphHandle` are constructed as a single tuple by :func:`build_state_estimator`; the handle is the iSAM2 graph wrapper held INSIDE the estimator. C4 (``c4_pose``) reaches into ``pre_constructed['c5_isam2_graph_handle']`` at compose time — but the C4 wrapper runs BEFORE the C5 wrapper in topological order (``_C5_STATE_DEPENDS_ON: ('c1_vio', 'c4_pose')``). The handle therefore MUST exist in ``pre_constructed`` before either wrapper runs, which means the bootstrap MUST build the (estimator, handle) pair eagerly (AZ-625). Storing the prebuilt estimator under this internal key lets the C5 wrapper short-circuit on it and return the SAME instance the handle was extracted from, so ``c4_pose._isam2_handle`` and ``c5_state._isam2_handle`` reference ONE object across the C4 / C5 seam (AC-625.3 identity contract). Deliberately NOT in :data:`AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS` — it is an internal coordination key, not a Public API surface that any component queries directly (only the C5 wrapper consults it, and only as a fast path). """ def _build_c5_state_estimator_pair( config: Config, *, imu_preintegrator: Any, se3_utils: Any, wgs_converter: Any, fdr_client: Any, tile_store: Any | None = None, camera_calibration: CameraCalibration | None = None, flight_id: str | None = None, companion_id: str | None = None, ) -> tuple[Any, Any]: """Build the ``(StateEstimator, ISam2GraphHandle)`` tuple eagerly. The C5 estimator and its iSAM2 graph handle are produced together by :func:`gps_denied_onboard.runtime_root.state_factory.build_state_estimator` — the handle is the wrapper around the estimator's internal ``_isam2`` + ``_smoother`` substrate (see :class:`gps_denied_onboard.components.c5_state._isam2_handle.\ ISam2GraphHandleImpl`). The handle's constructor takes the estimator as input, so the two cannot be separately constructed without a Protocol-seam change in C5 — explicitly forbidden by the AZ-618 umbrella's "MUST NOT touch any per-component factory signature" constraint. Building the pair eagerly at bootstrap time is the AZ-625 fix: the handle reaches ``pre_constructed['c5_isam2_graph_handle']`` so :func:`compose_root` can satisfy C4's lookup in topological order (``c4_pose`` runs before ``c5_state``); the estimator reaches a private coordination slot (``_c5_prebuilt_estimator``) so :func:`_c5_state_wrapper` can short-circuit and return the SAME instance the handle is bound to. The cross-seam identity invariant is verified by AC-625.3. Validation order matches the rest of the airborne bootstrap: 1. Resolve the configured C5 state strategy (default ``"gtsam_isam2"``). 2. Look it up in :data:`C5_STATE_BUILD_FLAGS`. An unknown strategy is an :class:`AirborneBootstrapError` naming the supported set (AC-625.2). 3. Read the gating ``BUILD_STATE_*`` flag with the SAME default ladder the state factory uses (:func:`os.environ.get(flag, "ON").upper() == "OFF"`); an explicit OFF raises :class:`AirborneBootstrapError` naming the flag and the consuming component slug ``c5_state`` (AC-625.2). 4. Lazily register the strategy via :func:`_ensure_state_strategy_registered` — same hook the C5 wrapper uses, so a binary configured for the ESKF baseline does not import gtsam at bootstrap time. 5. Delegate to :func:`build_state_estimator` with the infrastructure kwargs the wrapper would have passed; surface any :class:`StateEstimatorConfigError` as an :class:`AirborneBootstrapError` so the operator-facing error contract is uniform. The optional kwargs ``tile_store`` / ``camera_calibration`` / ``flight_id`` / ``companion_id`` exist for AZ-389 orthorectifier wiring; they are forwarded to :func:`build_state_estimator` which only consumes them when ``c5_state.orthorectifier.enabled`` is True. Until AZ-624 wires the operator-supplied flight metadata into ``pre_constructed``, callers pass the available defaults (today: ``tile_store=constructed['c6_tile_store']``, the rest ``None``). Raises: AirborneBootstrapError: when the configured strategy is not in :data:`C5_STATE_BUILD_FLAGS`, when the strategy's ``BUILD_STATE_*`` flag is OFF, or when :func:`build_state_estimator` itself rejects the configuration (the original :class:`StateEstimatorConfigError` is preserved as ``__cause__``). """ from gps_denied_onboard.components.c5_state.errors import StateEstimatorConfigError strategy = _resolve_c5_state_strategy(config) flag = C5_STATE_BUILD_FLAGS.get(strategy) if flag is None: raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c5_isam2_graph_handle'] because " f"config.components['c5_state'].strategy={strategy!r} is " f"not in the airborne BUILD-flag matrix " f"{sorted(C5_STATE_BUILD_FLAGS.keys())!r}. Consuming " f"component: c5_state. Reconfigure the C5 state strategy " f"to one of the supported strategies." ) # Mirror state_factory._STATE_BUILD_FLAGS gate: default "ON" when # unset; only explicit "OFF" blocks. Keeping the default identical # to state_factory means AZ-625's pre-check fires before # build_state_estimator's own gate, so the operator sees the # bootstrap-error contract instead of the lower-level config error. if os.environ.get(flag, "ON").upper() == "OFF": raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c5_isam2_graph_handle'] because the " f"gating flag {flag}=ON is required for the configured " f"strategy={strategy!r}, but {flag} is OFF in this binary. " f"Consuming component: c5_state. Set {flag}=ON, or " f"reconfigure config.components['c5_state'].strategy to a " f"strategy whose BUILD_STATE_* flag is ON." ) _ensure_state_strategy_registered(config) try: estimator, handle = build_state_estimator( config, imu_preintegrator=imu_preintegrator, se3_utils=se3_utils, wgs_converter=wgs_converter, fdr_client=fdr_client, tile_store=tile_store, camera_calibration=camera_calibration, flight_id=flight_id, companion_id=companion_id, ) except StateEstimatorConfigError as exc: raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c5_isam2_graph_handle'] for " f"strategy={strategy!r} (gating flag {flag} is ON). " f"Consuming component: c5_state. Upstream error: {exc}" ) from exc return estimator, handle def _is_build_flag_on(flag_name: str) -> bool: """Read a compile-time ``BUILD_*`` flag from the environment. Mirrors the same predicate used by :func:`gps_denied_onboard.runtime_root.matcher_factory.\ _is_build_flag_on` — ``ON`` / ``1`` / ``true`` / ``yes`` (case-insensitive) is ON; everything else (including unset) is OFF. Defined locally so the bootstrap does not depend on the matcher-factory's private helper. """ raw = os.environ.get(flag_name, "") return raw.strip().lower() in {"on", "1", "true", "yes"} def _load_lightglue_engine_handle( config: Config, inference_runtime: InferenceRuntime ) -> EngineHandle: """Production loader for the shared LightGlue matcher engine. Reads ``config.components['c3_matcher'].lightglue_weights_path``, compiles the engine via the C7 :class:`InferenceRuntime.compile_engine` (TensorRT or PyTorch-FP16 per AZ-621), then deserialises it into an opaque :class:`EngineHandle`. The handle's lifecycle is owned by the :class:`LightGlueRuntime` instance returned by :func:`_build_c3_lightglue_runtime`. AZ-622 unit tests monkey-patch this function with a sentinel :class:`EngineHandle`-shaped mock so they can exercise the LightGlueRuntime wiring without standing up a real GPU + TensorRT toolchain (per AZ-622 ``Tier-2 Note``: real LightGlue inference correctness is verified by AZ-624's Jetson AC-5 run). Raises: AirborneBootstrapError: if ``c3_matcher.lightglue_weights_path`` is ``None`` (the operator-actionable message points at the production main() wiring task — AZ-624). RuntimeNotAvailableError: if the underlying :func:`InferenceRuntime.compile_engine` / :func:`deserialize_engine` paths fail (caller wraps this into an :class:`AirborneBootstrapError`). """ block = config.components.get("c3_matcher") weights_path = getattr(block, "lightglue_weights_path", None) if block is not None else None if weights_path is None: raise AirborneBootstrapError( "airborne_bootstrap: cannot construct " "pre_constructed['c3_lightglue_runtime'] because " "config.components['c3_matcher'].lightglue_weights_path " "is None. Production main() (AZ-624) must populate the " "path to the compiled LightGlue engine before calling " "build_pre_constructed; tests stub _load_lightglue_engine_handle " "via monkeypatch." ) from gps_denied_onboard._types.inference import BuildConfig, PrecisionMode build_config: BuildConfig = BuildConfig( precision=PrecisionMode.FP16, workspace_mb=512, calibration_dataset=None, optimization_profiles=(), ) cache_entry = inference_runtime.compile_engine(weights_path, build_config) return inference_runtime.deserialize_engine(cache_entry) def _build_c3_lightglue_runtime( config: Config, *, inference_runtime: InferenceRuntime ) -> LightGlueRuntime: """Build ``pre_constructed['c3_lightglue_runtime']`` for the airborne binary. 1. Resolve the configured C3 matcher strategy (default ``disk_lightglue``). 2. Look up the gating flag in :data:`C3_MATCHER_BUILD_FLAGS`. An unknown strategy or an OFF flag is an :class:`AirborneBootstrapError` naming the missing flag and the consuming component slug ``c3_matcher`` (AC-622.2). 3. Load the LightGlue engine handle via :func:`_load_lightglue_engine_handle` (the heavy seam unit tests monkeypatch — see AZ-622 ``Tier-2 Note``). 4. Wrap the handle in :class:`LightGlueRuntime` and return. The returned runtime is the SAME instance that the wrappers for ``c3_matcher`` and ``c2_5_rerank`` extract from ``pre_constructed['c3_lightglue_runtime']`` — identity-share is the structural R14 fix (avoids double GPU memory; AZ-344 AC-10). The cross-component identity-share assertion is verified at AZ-624's integration AC, not here. Raises: AirborneBootstrapError: when the configured strategy's ``BUILD_MATCHER_*`` flag is OFF, when the strategy is unknown to :data:`C3_MATCHER_BUILD_FLAGS`, or when the heavy seam fails (the upstream :class:`RuntimeNotAvailableError` is preserved as ``__cause__``). """ strategy = _resolve_c3_matcher_strategy(config) flag = C3_MATCHER_BUILD_FLAGS.get(strategy) if flag is None: raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c3_lightglue_runtime'] because " f"config.components['c3_matcher'].strategy={strategy!r} is " f"not in the airborne BUILD-flag matrix " f"{sorted(C3_MATCHER_BUILD_FLAGS.keys())!r}. Consuming " f"component: c3_matcher. Reconfigure the C3 matcher to " f"select one of the supported strategies." ) if not _is_build_flag_on(flag): raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c3_lightglue_runtime'] because the " f"gating flag {flag}=ON is required for the configured " f"strategy={strategy!r}, but {flag} is OFF in this binary. " f"Consuming component: c3_matcher. Set {flag}=ON, or " f"reconfigure config.components['c3_matcher'].strategy to a " f"strategy whose BUILD_MATCHER_* flag is ON." ) try: engine_handle = _load_lightglue_engine_handle(config, inference_runtime) except RuntimeNotAvailableError as exc: raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c3_lightglue_runtime'] because the " f"LightGlue engine load failed for strategy={strategy!r} " f"(gating flag {flag} is ON). Consuming component: " f"c3_matcher. Upstream error: {exc}" ) from exc return LightGlueRuntime(engine_handle) def _load_camera_calibration(config: Config) -> CameraCalibration: """Read the camera calibration JSON into a :class:`CameraCalibration` DTO. Mirrors the on-disk JSON shape that :func:`gps_denied_onboard.runtime_root._replay_branch._load_camera_calibration` already accepts (same calibration file the live and replay binaries share). Replicated here \u2014 not imported from ``_replay_branch`` \u2014 because the replay-branch helper raises ``CompositionError`` (replay-flow contract) where the airborne bootstrap MUST raise :class:`AirborneBootstrapError` per the AZ-618 umbrella's operator-error contract. Both helpers consume the same on-disk format; any future change to that format MUST land in lockstep here and in ``_replay_branch.py``. AZ-623 unit tests monkey-patch this function with a sentinel :class:`CameraCalibration` so they exercise the :func:`_build_c5_imu_preintegrator` wiring without an on-disk JSON file (per the same Tier-2 monkeypatch pattern the AZ-622 builders use for the heavy LightGlue seam). """ import numpy as np path = config.runtime.camera_calibration_path if not path: raise AirborneBootstrapError( "airborne_bootstrap: cannot construct " "pre_constructed['c5_imu_preintegrator'] because " "config.runtime.camera_calibration_path is empty. " "Consuming component: c5_state. Production main() (AZ-624) " "must populate the path to the camera calibration JSON; " "tests stub _load_camera_calibration via monkeypatch." ) calib_path = Path(path) try: blob = json.loads(calib_path.read_text(encoding="utf-8")) except OSError as exc: raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c5_imu_preintegrator'] because the camera " f"calibration file at {path!r} could not be read: {exc!r}. " f"Consuming component: c5_state. Ensure " f"config.runtime.camera_calibration_path points at a readable " f"JSON file." ) from exc except json.JSONDecodeError as exc: raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c5_imu_preintegrator'] because the camera " f"calibration file at {path!r} is not valid JSON: {exc!r}. " f"Consuming component: c5_state. Validate the calibration JSON " f"shape against the on-disk format documented in " f"runtime_root._replay_branch._load_camera_calibration." ) from exc if not isinstance(blob, Mapping): raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c5_imu_preintegrator'] because the camera " f"calibration at {path!r} must decode to a JSON object; got " f"{type(blob).__name__}. Consuming component: c5_state." ) intrinsics = np.asarray(blob.get("intrinsics_3x3"), dtype=np.float64) if intrinsics.shape != (3, 3): raise AirborneBootstrapError( f"airborne_bootstrap: cannot construct " f"pre_constructed['c5_imu_preintegrator'] because the camera " f"calibration at {path!r} 'intrinsics_3x3' must be 3x3; got " f"shape {intrinsics.shape}. Consuming component: c5_state." ) distortion = np.asarray(blob.get("distortion", []), dtype=np.float64) body_to_camera = np.asarray( blob.get("body_to_camera_se3", np.eye(4).tolist()), dtype=np.float64, ) return CameraCalibration( camera_id=str(blob.get("camera_id", "airborne-camera")), intrinsics_3x3=intrinsics, distortion=distortion, body_to_camera_se3=body_to_camera, acquisition_method=str(blob.get("acquisition_method", "operator")), metadata=dict(blob.get("metadata", {})), ) def _build_c282_ransac_filter(config: Config) -> RansacFilter: """Build ``pre_constructed['c282_ransac_filter']`` for the airborne binary. :class:`RansacFilter` is a static-only OpenCV wrapper (per AZ-282 / E-CC-HELPERS); a fresh instance carries no state. Consumers (``c3_matcher``, ``c3_5_adhop``, ``c4_pose``) dispatch to its static methods, so identity-share is irrelevant \u2014 AC-623.2 explicitly permits a fresh instance per :func:`build_pre_constructed` call. No ``BUILD_*`` flag check applies: the helper is a CPU-only OpenCV wrapper with no compile-time gate. """ del config # placeholder for future config-driven RANSAC variant selection return RansacFilter() def _build_c5_imu_preintegrator(config: Config) -> ImuPreintegrator: """Build (or retrieve cached) ``pre_constructed['c5_imu_preintegrator']``. Reads ``config.runtime.camera_calibration_path``, loads the :class:`CameraCalibration` DTO via :func:`_load_camera_calibration`, and constructs an :class:`ImuPreintegrator` via :func:`make_imu_preintegrator`. The preintegrator is cached at module level keyed by the calibration path \u2014 AC-623.2 requires that two invocations of :func:`build_pre_constructed` return the SAME instance for the same path, so the bias / sample accumulator is not silently reset across re-invocations. Raises: AirborneBootstrapError: when ``camera_calibration_path`` is empty / unreadable / malformed (AC-623.3); message names both the missing input AND the consuming component slug ``c5_state`` per the AZ-618 umbrella's operator-error contract. """ path = config.runtime.camera_calibration_path if not path: raise AirborneBootstrapError( "airborne_bootstrap: cannot construct " "pre_constructed['c5_imu_preintegrator'] because " "config.runtime.camera_calibration_path is empty. " "Consuming component: c5_state. Production main() (AZ-624) " "must populate the path before calling build_pre_constructed; " "tests stub _load_camera_calibration via monkeypatch." ) cached = _IMU_PREINTEGRATOR_CACHE.get(path) if cached is not None: return cached calibration = _load_camera_calibration(config) preintegrator = make_imu_preintegrator(calibration) _IMU_PREINTEGRATOR_CACHE[path] = preintegrator return preintegrator def _build_c5_se3_utils(config: Config) -> Any: """Build ``pre_constructed['c5_se3_utils']`` for the airborne binary. Returns the :mod:`gps_denied_onboard.helpers.se3_utils` module itself as the namespace handle. Python modules support attribute access for their public names (``exp_map``, ``log_map``, ``matrix_to_se3``, ``se3_to_matrix``, ``adjoint``, ``is_valid_rotation``, ``SE3``); both :class:`OpenCVGtsamPoseEstimator` and :class:`GtsamIsam2StateEstimator` store the injected handle as ``self._se3_utils: Any`` and dispatch via attribute access, so the module satisfies the contract without an extra wrapper class. The existing C5 unit-test fixtures (e.g. ``tests/unit/c5_state/test_az386_eskf_baseline.py``) inject ``mock.MagicMock()`` for the same slot \u2014 attribute-access shape matches. Returning the module also satisfies AC-623.2's caching note incidentally: Python's import machinery returns the same module object across calls, so two invocations of :func:`build_pre_constructed` see the SAME ``c5_se3_utils`` value. """ del config # the module-as-namespace selection is config-independent from gps_denied_onboard.helpers import se3_utils as se3_utils_module return se3_utils_module def _build_c5_wgs_converter(config: Config) -> WgsConverter: """Build ``pre_constructed['c5_wgs_converter']`` for the airborne binary. :class:`WgsConverter` is a stateless static-only class (per AZ-279); a fresh instance carries no module-level state beyond pyproj's cached transformer pair. Returns ``WgsConverter()`` to match the same construction pattern :mod:`runtime_root._replay_branch` already uses (``wgs_converter = WgsConverter()`` at line 205). No ``BUILD_*`` flag check applies. """ del config # placeholder for future config-driven coord-system selection return WgsConverter() def _build_c3_feature_extractor(config: Config) -> FeatureExtractor: """Build ``pre_constructed['c3_feature_extractor']`` for the airborne binary. Returns the shared :class:`FeatureExtractor` that C2.5's :class:`InlierCountReRanker` consumes for both per-frame nav-camera images and per-candidate tile pixels (per :class:`AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS`'s ``c2_5_rerank`` row). The L1 helper module :mod:`gps_denied_onboard.helpers.feature_extractor` documents :class:`OpenCvOrbExtractor` as the production-ready airborne placeholder until the C7 ``InferenceRuntime``-backed DISK / ALIKED feature extractor lands; AZ-622 wires that placeholder in. No ``BUILD_*`` flag check applies here: the C3 matcher's per-strategy flag matrix gates the *matcher* engine (handled by :func:`_build_c3_lightglue_runtime`); the *feature extractor* is consumed by C2.5 (not C3) and is a pure-CPU OpenCV path that has no compile-time gate of its own. The ``config`` argument is accepted for symmetry with the other bootstrap builders and to keep the door open for a future config-driven feature-extractor selection (DISK / ALIKED swap-in) without changing the call site in :func:`build_pre_constructed`. """ del config # currently no config knobs; placeholder for future selection return OpenCvOrbExtractor() def _c4_pose_disabled(config: Config) -> bool: """True iff the airborne open-loop ESKF composition profile is active. AZ-776 / ADR-012: when ``config.components["c4_pose"].enabled`` is explicitly ``False`` the composition root strips ``c4_pose`` from the component graph and the C4 wrapper never runs. The eager ``(estimator, handle)`` build inside :func:`build_pre_constructed` therefore has no consumer for the iSAM2 graph handle slot (``c5_isam2_graph_handle``); leaving the slot absent makes the "no C4" property visible at the pre-bootstrap layer too. Returns ``False`` when the block is absent or when ``enabled`` is missing / true — the full GTSAM profile (ADR-003 steady state) remains the default airborne pipeline. Replay-mode component-block omission is handled separately by :func:`_replay_omits_component_block`. """ components = getattr(config, "components", None) or {} if not isinstance(components, Mapping): return False block = components.get("c4_pose") if block is None: return False enabled = getattr(block, "enabled", True) return not bool(enabled) def _replay_omits_component_block(config: Config, block_name: str) -> bool: """True iff replay-mode :class:`Config` has no ``components[block_name]`` entry. AZ-687: the replay CLI (``gps-denied-replay``) synthesizes a minimal :class:`Config` carrying only ``mode == "replay"`` plus the ``replay`` sub-block. The strategy packages that register their own component config blocks via :func:`register_component_block` (e.g. :mod:`gps_denied_onboard.components.c6_tile_cache`, :mod:`gps_denied_onboard.components.c7_inference`, :mod:`gps_denied_onboard.components.c5_state`) are not imported on that path, so :func:`gps_denied_onboard.config.loader.load_config` produces an empty ``components`` mapping. The :func:`build_pre_constructed` seeds that depend on those blocks (``c6_descriptor_index``, ``c6_tile_store``, ``c7_inference``, ``c3_lightglue_runtime``, the C5 ``(estimator, handle)`` pair) are only consumed by :func:`compose_root`'s per-component wrappers, which only execute for slugs actually present in ``config.components``. With an empty components mapping, no wrapper asks for those slots and skipping their bootstrap is safe. The fix lives at this layer (BUILD-PRE-CONSTRUCTED), NOT at :func:`storage_factory._c6_config` / :func:`inference_factory._c7_config`: those helpers deliberately raise :class:`KeyError` on missing blocks (per their docstrings, silent fallback would mask a missing import). Returns ``False`` outside replay mode regardless of block presence — live-mode contracts (AC-687-2) MUST keep seeding every key. """ if config.mode != "replay": return False components = getattr(config, "components", None) or {} if not isinstance(components, Mapping): return False return block_name not in components def build_pre_constructed(config: Config) -> dict[str, Any]: """Build the airborne ``pre_constructed`` dict for :func:`compose_root`. AZ-619 (Phase A) seeded ``c13_fdr`` and ``clock``. AZ-620 (Phase B) added the two C6 storage entries (``c6_descriptor_index`` + ``c6_tile_store``). AZ-621 (Phase C) added ``c7_inference`` (PyTorch FP16 vs. TensorRT, gated by :data:`C7_AIRBORNE_BUILD_FLAGS`). AZ-622 (Phase D) added ``c3_lightglue_runtime`` (single shared :class:`gps_denied_onboard.helpers.lightglue_runtime.LightGlueRuntime` instance, gated by :data:`C3_MATCHER_BUILD_FLAGS` per the configured strategy) + ``c3_feature_extractor`` (the shared :class:`gps_denied_onboard.helpers.feature_extractor.FeatureExtractor` used by C2.5). AZ-623 (Phase E) added the four stateless / cached c5 helpers: ``c282_ransac_filter`` (shared :class:`gps_denied_onboard.helpers.ransac_filter.RansacFilter`), ``c5_imu_preintegrator`` (per-calibration-path-cached :class:`gps_denied_onboard.helpers.imu_preintegrator.ImuPreintegrator`), ``c5_se3_utils`` (the :mod:`gps_denied_onboard.helpers.se3_utils` module as a namespace handle), and ``c5_wgs_converter`` (shared :class:`gps_denied_onboard.helpers.wgs_converter.WgsConverter`). AZ-625 (Phase E.5) adds ``c5_isam2_graph_handle`` and seeds an internal coordination key (``_c5_prebuilt_estimator``) by eagerly invoking :func:`build_state_estimator` once at bootstrap time and capturing the ``(StateEstimator, ISam2GraphHandle)`` tuple — the handle reaches ``c4_pose`` via ``pre_constructed`` (C4 runs before C5 in topo order), and the prebuilt estimator lets the C5 wrapper short-circuit without re-invoking the factory. Phase F (AZ-624) will wire ``runtime_root.main()`` and verify AC-1..AC-5 end-to-end. Returns a fresh dict on each call. The ``c13_fdr`` instance is cached inside :func:`make_fdr_client` (per-producer cache) so two calls within the same process return dicts where ``pre_constructed['c13_fdr']`` is the SAME object — AC-619.2. ``clock`` is a fresh :class:`WallClock` each call (stateless; the cache would be a no-op). The C6, C7, and C3 entries are constructed via the existing :mod:`storage_factory`, :mod:`inference_factory`, and helper modules without additional caching at this layer; the C7 :class:`InferenceRuntime` built for the ``c7_inference`` slot is reused as the engine source for the LightGlue matcher load (AZ-622) so the bootstrap does not double-build the inference runtime. AZ-623's ``c5_imu_preintegrator`` is cached at module level (:data:`_IMU_PREINTEGRATOR_CACHE`) keyed by ``config.runtime.camera_calibration_path`` so its bias / sample accumulator survives a re-invocation. The remaining AZ-623 c5 helpers are stateless: ``c282_ransac_filter`` and ``c5_wgs_converter`` are fresh static-only instances; ``c5_se3_utils`` is the :mod:`gps_denied_onboard.helpers.se3_utils` module. Replay-mode override: :func:`compose_root` merges ``replay_components`` over ``pre_constructed`` so the :class:`WallClock` here is replaced by the replay branch's :class:`TlogDerivedClock`. That's intentional and matches the contract in :func:`compose_root`'s docstring. Replay-mode guard (AZ-687): when ``config.mode == "replay"`` and the minimal replay Config omits a strategy-component block (``c6_tile_cache``, ``c7_inference``, ``c5_state``), the bootstrap skips the seeds that would otherwise call the corresponding ``_cN_config`` helper and raise :class:`KeyError`. The skipped slots are absent from the returned dict; ``compose_root``'s per-component wrappers only run for slugs in ``config.components`` and never read the skipped slots. Live mode is unaffected — every documented key in :data:`AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS` is still seeded. See :func:`_replay_omits_component_block` for the precise predicate. Raises: AirborneBootstrapError: if ``BUILD_FAISS_INDEX`` is OFF and any configured consumer (per :data:`AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS`) requires ``c6_descriptor_index``; OR if no airborne C7 inference runtime is buildable (both ``BUILD_TENSORRT_RUNTIME`` and ``BUILD_PYTORCH_FP16_RUNTIME`` OFF, or the configured runtime's matching flag is OFF) and any configured consumer requires ``c7_inference``; OR if the configured C3 matcher strategy's :data:`C3_MATCHER_BUILD_FLAGS` flag is OFF (or the strategy is unknown), or if the LightGlue engine load fails; OR (AZ-623) if ``config.runtime.camera_calibration_path`` is empty / unreadable / malformed JSON, blocking the ``c5_imu_preintegrator`` build; OR (AZ-625) if the configured C5 state strategy's :data:`C5_STATE_BUILD_FLAGS` flag is OFF (or the strategy is unknown), or if :func:`build_state_estimator` itself rejects the configuration when the ``(StateEstimator, ISam2GraphHandle)`` pair is built eagerly. The message names the consuming component slug(s) and the relevant gating flag(s) or missing inputs. """ constructed: dict[str, Any] = {} constructed["c13_fdr"] = make_fdr_client(AIRBORNE_MAIN_PRODUCER_ID, config) constructed["clock"] = WallClock() # AZ-687: a minimal replay Config carries no `c6_tile_cache` block, # so `_build_c6_*` would KeyError inside `_c6_config`. The skip is # safe because the only wrappers that read these slots # (c2_vpr / c2_5_rerank / c5_state) require their own component # entries in `config.components`, which the minimal replay Config # also omits. if not _replay_omits_component_block(config, "c6_tile_cache"): constructed["c6_descriptor_index"] = _build_c6_descriptor_index(config) constructed["c6_tile_store"] = _build_c6_tile_store(config) # AZ-687: c3_lightglue_runtime cascades on `constructed["c7_inference"]`, # so it's gated under the same `c7_inference` block guard. Replay # mode without the c7_inference block also drops the c3_matcher / # c2_5_rerank wrappers that would have consumed the runtime. if not _replay_omits_component_block(config, "c7_inference"): constructed["c7_inference"] = _build_c7_inference(config) constructed["c3_lightglue_runtime"] = _build_c3_lightglue_runtime( config, inference_runtime=constructed["c7_inference"] ) constructed["c3_feature_extractor"] = _build_c3_feature_extractor(config) constructed["c282_ransac_filter"] = _build_c282_ransac_filter(config) constructed["c5_imu_preintegrator"] = _build_c5_imu_preintegrator(config) constructed["c5_se3_utils"] = _build_c5_se3_utils(config) constructed["c5_wgs_converter"] = _build_c5_wgs_converter(config) # AZ-687: the eager (estimator, handle) build registers the gtsam-bound # state factory and reads `config.components["c5_state"]`. When the # block is absent in replay mode the c5_state wrapper itself will not # run, so the handle / prebuilt-estimator slots are unread; skip the # build to avoid forcing the gtsam import on the replay binary. if not _replay_omits_component_block(config, "c5_state"): estimator, handle = _build_c5_state_estimator_pair( config, imu_preintegrator=constructed["c5_imu_preintegrator"], se3_utils=constructed["c5_se3_utils"], wgs_converter=constructed["c5_wgs_converter"], fdr_client=constructed["c13_fdr"], tile_store=constructed.get("c6_tile_store"), ) # AZ-776 / ADR-012 open-loop ESKF profile: c4_pose.enabled=False # strips c4_pose from the composition graph, so the # c5_isam2_graph_handle slot has no consumer. ESKF returns # handle=None by design; omitting the slot makes the open-loop # property visible at the pre_constructed layer and refuses the # invariant "handle is None means C4 will crash on read". if not _c4_pose_disabled(config): constructed["c5_isam2_graph_handle"] = handle constructed[_C5_PREBUILT_ESTIMATOR_KEY] = estimator return constructed def register_airborne_strategies() -> None: """Register every airborne (component, strategy) pair into ``_STRATEGY_REGISTRY``. Idempotent: a second call within the same process is a no-op because the underlying :func:`register_strategy` short-circuits identical re-registrations (the dedup check in :mod:`gps_denied_onboard.runtime_root` compares :class:`_Registration` records by value). Side effects: * Mutates the central :data:`_STRATEGY_REGISTRY` for the 7 strategy- selecting airborne component slots. Each slot gets one entry per buildable strategy (the wrapper is the same callable across all strategies for a slot, because the underlying per-component factory handles the strategy switch internally). * Does NOT touch state_factory's ``_STATE_REGISTRY`` or pose_factory's ``_POSE_REGISTRY`` here — those are populated lazily by the c5_state wrapper at compose time, behind ``BUILD_STATE_*`` flag gates, so a binary configured for klt_ransac + ESKF never imports gtsam. Call ONCE at process start, before any :func:`compose_root` invocation. Tests call ``clear_strategy_registry()`` first to isolate state across test cases. """ for slug, strategies, wrapper, depends_on in _AIRBORNE_REGISTRATIONS: for strategy in strategies: register_strategy( slug, strategy, wrapper, tier="airborne", depends_on=depends_on, ) _LOG.info( "airborne_bootstrap.strategies_registered", extra={ "kind": "airborne_bootstrap.strategies_registered", "kv": { "slots": [slug for slug, *_ in _AIRBORNE_REGISTRATIONS], "total_registrations": sum( len(strategies) for _, strategies, *_ in _AIRBORNE_REGISTRATIONS ), }, }, )