[AZ-591] Add airborne_bootstrap to populate _STRATEGY_REGISTRY

Batch 66 — fixes the production gap surfaced during the cycle-1
completeness-gate post-mortem: the central _STRATEGY_REGISTRY was
empty in production source, so compose_root() raised
StrategyNotLinkedError on the first component lookup and the
airborne binary couldn't reach takeoff.

Changes:

- New module `src/.../runtime_root/airborne_bootstrap.py` exposes
  `register_airborne_strategies()` and a documented
  `AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS` table. The function
  registers 14 entries into the central registry across 7
  strategy-selecting slots (c1_vio + c2_vpr + c2_5_rerank +
  c3_matcher + c3_5_adhop + c4_pose + c5_state). Per-slot wrappers
  adapt the registry-factory signature (config, constructed) to each
  per-component factory's kwarg surface and surface a
  AirborneBootstrapError when a required infrastructure dep is
  missing from constructed.

- `compose_root` gains a `pre_constructed` kwarg in live mode,
  symmetric with the replay-mode seam. Replay entries still take
  precedence on key collision (ADR-011). Existing callers unaffected
  (kwarg defaults to None).

- `runtime_root/__init__.py::main()` now calls
  `register_airborne_strategies()` before `compose_root(config)` so
  production binaries no longer crash at the registry-lookup step.

- Lazy-loading preserved: state_factory's private _STATE_REGISTRY is
  populated lazily inside the c5_state wrapper, gated by
  BUILD_STATE_GTSAM_ISAM2 / BUILD_STATE_ESKF env flags. pose_factory's
  own lazy-import fallback handles c4_pose without an explicit
  register() call.

- 7 new unit tests in `tests/unit/runtime_root/test_az591_airborne_\
  bootstrap.py` cover AC-1..AC-5 plus the negative-path
  AirborneBootstrapError contract. Full unit suite 2105 passed / 88
  environment-gated skips / 0 failures.

End-to-end takeoff still needs a follow-up task to wire infrastructure
pre-construction (c13_fdr / c6_* / c7_inference / etc.) into the
pre_constructed dict passed to compose_root. That follow-up is gated
by AZ-591 landing first; recommended split into per-component
infrastructure-prep tasks (3pt each).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-16 12:58:38 +03:00
parent 6d51e06886
commit f7a99282fb
6 changed files with 796 additions and 4 deletions
@@ -0,0 +1,414 @@
"""Per-binary bootstrap for the airborne runtime (AZ-591).
Populates the central ``_STRATEGY_REGISTRY`` (see
:mod:`gps_denied_onboard.runtime_root`) with the (component, strategy)
pairs the airborne binary supports, so that :func:`compose_root` can
resolve ``config.components[slug].strategy`` for every strategy-selecting
component (c1_vio, c2_vpr, c2_5_rerank, c3_matcher, c3_5_adhop, c4_pose,
c5_state).
Without this bootstrap, ``compose_root`` raises :class:`StrategyNotLinkedError`
on the first component lookup because no module under :mod:`gps_denied_onboard.\
runtime_root` calls :func:`register_strategy` at import time — by design, per
ADR-002, the build-flag gate must be the single place that decides which
strategies are linked into a given binary.
Call order at process start:
1. ``register_airborne_strategies()`` — once, before any ``compose_root`` call.
2. (Optional, test/production both) populate a ``pre_constructed`` dict with
the infrastructure objects the wrappers below expect (``c13_fdr``,
``c6_descriptor_index``, ``c7_inference``, etc.).
3. ``compose_root(config, pre_constructed=pre_constructed)``.
The wrapper factories below adapt the registry-factory signature
``(config, constructed)`` to each per-component factory's keyword-argument
surface (e.g. ``build_vio_strategy(config, *, fdr_client=...)``). Every dep is
looked up by a documented key in ``constructed``; a missing key surfaces as a
:class:`AirborneBootstrapError` naming the missing dep + the consuming
component slug.
Lazy-loading is preserved at two levels:
* **Central registry**: identity wrappers are registered for every strategy
the binary supports. The wrappers themselves only import the concrete
strategy module via the per-component factory (which already gates by
``BUILD_*`` env flags) — they do NOT eagerly import strategy modules.
* **Per-component private registries** (``state_factory._STATE_REGISTRY``
needs explicit ``register_state_estimator`` calls; ``pose_factory`` has its
own lazy-import fallback so no explicit registration is needed): the
wrapper calls each strategy module's ``register()`` only when the config
actually selects that strategy AND the matching ``BUILD_*`` flag is ON.
ADR refs: ADR-001 (composition root is single registration site), ADR-002
(build-flag gate is the lazy-loading boundary), AZ-507 (cross-component
import rule — bootstrap may import any component's Public API, but not its
internals).
"""
from __future__ import annotations
import logging
import os
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any
from gps_denied_onboard.runtime_root import register_strategy
from gps_denied_onboard.runtime_root.matcher_factory import build_matcher_strategy
from gps_denied_onboard.runtime_root.pose_factory import build_pose_estimator
from gps_denied_onboard.runtime_root.refiner_factory import build_refiner_strategy
from gps_denied_onboard.runtime_root.rerank_factory import build_rerank_strategy
from gps_denied_onboard.runtime_root.state_factory import build_state_estimator
from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy
from gps_denied_onboard.runtime_root.vpr_factory import build_vpr_strategy
if TYPE_CHECKING:
from gps_denied_onboard.config import Config
__all__ = [
"AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS",
"AirborneBootstrapError",
"register_airborne_strategies",
]
_LOG = logging.getLogger("gps_denied_onboard.runtime_root.airborne_bootstrap")
class AirborneBootstrapError(RuntimeError):
"""Raised when an airborne wrapper factory cannot find a required dep.
The wrapper factories registered by :func:`register_airborne_strategies`
extract infrastructure objects (fdr_client, descriptor_index, etc.) from
the ``constructed`` dict passed to them by :func:`compose_root`. The
caller (production ``main()`` or a unit test) is responsible for seeding
``pre_constructed`` with these dependencies before invoking
``compose_root``. A missing dep surfaces this error naming both the
consuming component slug and the missing key, so the operator can fix
the bootstrap wiring without guessing.
"""
AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS: Mapping[str, tuple[str, ...]] = {
"c1_vio": ("c13_fdr",),
"c2_vpr": ("c6_descriptor_index", "c7_inference"),
"c2_5_rerank": (
"c6_tile_store",
"c3_lightglue_runtime",
"c3_feature_extractor",
"clock",
),
"c3_matcher": (
"c3_lightglue_runtime",
"c282_ransac_filter",
"c7_inference",
),
"c3_5_adhop": ("c282_ransac_filter", "c7_inference"),
"c4_pose": (
"c282_ransac_filter",
"c5_wgs_converter",
"c5_se3_utils",
"c5_isam2_graph_handle",
),
"c5_state": (
"c5_imu_preintegrator",
"c5_se3_utils",
"c5_wgs_converter",
"c13_fdr",
),
}
"""Per-component infrastructure-dep keys the airborne wrappers expect to find
in ``constructed`` (i.e. in the ``pre_constructed`` dict callers pass to
``compose_root``). Tests use this to seed mock objects; production wiring
populates them from the takeoff orchestrator (separate task — AZ-591 follow-up
infrastructure-prep)."""
_C1_VIO_STRATEGIES: tuple[str, ...] = ("klt_ransac", "okvis2", "vins_mono")
_C2_VPR_STRATEGIES: tuple[str, ...] = (
"ultra_vpr",
"net_vlad",
"mega_loc",
"mix_vpr",
"sela_vpr",
"eigen_places",
"salad",
)
_C2_5_RERANK_STRATEGIES: tuple[str, ...] = ("inlier_count",)
_C3_MATCHER_STRATEGIES: tuple[str, ...] = ("disk_lightglue", "aliked_lightglue")
_C3_5_ADHOP_STRATEGIES: tuple[str, ...] = ("adhop",)
_C4_POSE_STRATEGIES: tuple[str, ...] = ("opencv_gtsam",)
_C5_STATE_STRATEGIES: tuple[str, ...] = ("gtsam_isam2", "eskf")
def _require(
constructed: Mapping[str, Any], key: str, component_slug: str
) -> Any:
"""Extract ``constructed[key]`` or raise AirborneBootstrapError."""
if key not in constructed:
available = sorted(constructed.keys())
raise AirborneBootstrapError(
f"airborne_bootstrap: component {component_slug!r} requires "
f"pre_constructed[{key!r}] to be populated before compose_root() runs; "
f"available keys in constructed: {available}. "
"Production main() must build infrastructure (c13_fdr, c6_*, "
"c7_inference, etc.) into pre_constructed and pass it to "
"compose_root(config, pre_constructed=...). Tests stub it via the "
"same kwarg."
)
return constructed[key]
def _c1_vio_wrapper(config: "Config", constructed: Mapping[str, Any]) -> Any:
fdr_client = _require(constructed, "c13_fdr", "c1_vio")
return build_vio_strategy(config, fdr_client=fdr_client)
def _c2_vpr_wrapper(config: "Config", constructed: Mapping[str, Any]) -> Any:
descriptor_index = _require(constructed, "c6_descriptor_index", "c2_vpr")
inference_runtime = _require(constructed, "c7_inference", "c2_vpr")
return build_vpr_strategy(
config,
descriptor_index=descriptor_index,
inference_runtime=inference_runtime,
)
def _c2_5_rerank_wrapper(
config: "Config", constructed: Mapping[str, Any]
) -> Any:
tile_store = _require(constructed, "c6_tile_store", "c2_5_rerank")
lightglue_runtime = _require(
constructed, "c3_lightglue_runtime", "c2_5_rerank"
)
feature_extractor = _require(
constructed, "c3_feature_extractor", "c2_5_rerank"
)
clock = _require(constructed, "clock", "c2_5_rerank")
fdr_client = constructed.get("c13_fdr")
return build_rerank_strategy(
config,
tile_store=tile_store,
lightglue_runtime=lightglue_runtime,
feature_extractor=feature_extractor,
clock=clock,
fdr_client=fdr_client,
)
def _c3_matcher_wrapper(
config: "Config", constructed: Mapping[str, Any]
) -> Any:
lightglue_runtime = _require(
constructed, "c3_lightglue_runtime", "c3_matcher"
)
ransac_filter = _require(constructed, "c282_ransac_filter", "c3_matcher")
inference_runtime = _require(constructed, "c7_inference", "c3_matcher")
clock = constructed.get("clock")
fdr_client = constructed.get("c13_fdr")
return build_matcher_strategy(
config,
lightglue_runtime=lightglue_runtime,
ransac_filter=ransac_filter,
inference_runtime=inference_runtime,
clock=clock,
fdr_client=fdr_client,
)
def _c3_5_adhop_wrapper(
config: "Config", constructed: Mapping[str, Any]
) -> Any:
ransac_filter = _require(constructed, "c282_ransac_filter", "c3_5_adhop")
inference_runtime = _require(constructed, "c7_inference", "c3_5_adhop")
clock = constructed.get("clock")
fdr_client = constructed.get("c13_fdr")
return build_refiner_strategy(
config,
ransac_filter=ransac_filter,
inference_runtime=inference_runtime,
clock=clock,
fdr_client=fdr_client,
)
def _c4_pose_wrapper(config: "Config", constructed: Mapping[str, Any]) -> Any:
ransac_filter = _require(constructed, "c282_ransac_filter", "c4_pose")
wgs_converter = _require(constructed, "c5_wgs_converter", "c4_pose")
se3_utils = _require(constructed, "c5_se3_utils", "c4_pose")
isam2_graph_handle = _require(
constructed, "c5_isam2_graph_handle", "c4_pose"
)
fdr_client = constructed.get("c13_fdr")
clock = constructed.get("clock")
return build_pose_estimator(
config,
ransac_filter=ransac_filter,
wgs_converter=wgs_converter,
se3_utils=se3_utils,
isam2_graph_handle=isam2_graph_handle,
fdr_client=fdr_client,
clock=clock,
)
def _c5_state_wrapper(config: "Config", constructed: Mapping[str, Any]) -> Any:
imu_preintegrator = _require(
constructed, "c5_imu_preintegrator", "c5_state"
)
se3_utils = _require(constructed, "c5_se3_utils", "c5_state")
wgs_converter = _require(constructed, "c5_wgs_converter", "c5_state")
fdr_client = _require(constructed, "c13_fdr", "c5_state")
tile_store = constructed.get("c6_tile_store")
camera_calibration = constructed.get("camera_calibration")
flight_id = constructed.get("flight_id")
companion_id = constructed.get("companion_id")
_ensure_state_strategy_registered(config)
estimator, _handle = build_state_estimator(
config,
imu_preintegrator=imu_preintegrator,
se3_utils=se3_utils,
wgs_converter=wgs_converter,
fdr_client=fdr_client,
tile_store=tile_store,
camera_calibration=camera_calibration,
flight_id=flight_id,
companion_id=companion_id,
)
return estimator
def _ensure_state_strategy_registered(config: "Config") -> None:
"""Register the c5_state strategy module if its build flag is on.
state_factory does NOT have a lazy-import fallback (unlike pose_factory).
The configured strategy module's ``register()`` must be called before
``build_state_estimator`` is invoked. We do this here, lazily, so the
gtsam-bound code only loads when the airborne binary is actually
configured for it AND the matching BUILD_* flag is ON.
"""
block = getattr(config, "components", None) or {}
c5_block = block.get("c5_state") if isinstance(block, dict) else None
strategy = (
getattr(c5_block, "strategy", "gtsam_isam2")
if c5_block is not None
else "gtsam_isam2"
)
# state_factory._STATE_BUILD_FLAGS: gtsam_isam2 defaults ON-when-unset;
# eskf defaults OFF-when-unset (mirror state_factory's own logic).
if strategy == "gtsam_isam2":
if os.environ.get("BUILD_STATE_GTSAM_ISAM2", "ON").upper() == "OFF":
return
from gps_denied_onboard.components.c5_state import gtsam_isam2_estimator
gtsam_isam2_estimator.register()
elif strategy == "eskf":
if os.environ.get("BUILD_STATE_ESKF", "OFF").upper() != "ON":
return
from gps_denied_onboard.components.c5_state import eskf_baseline
eskf_baseline.register()
_C1_VIO_DEPENDS_ON: tuple[str, ...] = ()
_C2_VPR_DEPENDS_ON: tuple[str, ...] = ()
_C2_5_RERANK_DEPENDS_ON: tuple[str, ...] = ("c2_vpr",)
_C3_MATCHER_DEPENDS_ON: tuple[str, ...] = ()
_C3_5_ADHOP_DEPENDS_ON: tuple[str, ...] = ("c3_matcher",)
_C4_POSE_DEPENDS_ON: tuple[str, ...] = ("c1_vio", "c3_matcher")
_C5_STATE_DEPENDS_ON: tuple[str, ...] = ("c1_vio", "c4_pose")
"""Inter-component dependency edges for ``_topo_order``. These are the runtime
data-flow dependencies (NOT infrastructure deps in ``pre_constructed``):
* c2_5_rerank reranks c2_vpr candidates → depends on c2_vpr.
* c3_5_adhop refines c3_matcher correspondences → depends on c3_matcher.
* c4_pose is anchored against c1_vio's pose estimate and consumes c3_matcher
correspondences → depends on both.
* c5_state fuses c1_vio + c4_pose updates → depends on both.
The leaf slugs (c1_vio, c2_vpr, c3_matcher) have no inter-component deps
inside the registry-driven path; their infrastructure deps (fdr_client,
descriptor_index, etc.) come from ``pre_constructed``.
"""
_AIRBORNE_REGISTRATIONS: tuple[
tuple[str, tuple[str, ...], Any, tuple[str, ...]], ...
] = (
("c1_vio", _C1_VIO_STRATEGIES, _c1_vio_wrapper, _C1_VIO_DEPENDS_ON),
("c2_vpr", _C2_VPR_STRATEGIES, _c2_vpr_wrapper, _C2_VPR_DEPENDS_ON),
(
"c2_5_rerank",
_C2_5_RERANK_STRATEGIES,
_c2_5_rerank_wrapper,
_C2_5_RERANK_DEPENDS_ON,
),
(
"c3_matcher",
_C3_MATCHER_STRATEGIES,
_c3_matcher_wrapper,
_C3_MATCHER_DEPENDS_ON,
),
(
"c3_5_adhop",
_C3_5_ADHOP_STRATEGIES,
_c3_5_adhop_wrapper,
_C3_5_ADHOP_DEPENDS_ON,
),
("c4_pose", _C4_POSE_STRATEGIES, _c4_pose_wrapper, _C4_POSE_DEPENDS_ON),
(
"c5_state",
_C5_STATE_STRATEGIES,
_c5_state_wrapper,
_C5_STATE_DEPENDS_ON,
),
)
def register_airborne_strategies() -> None:
"""Register every airborne (component, strategy) pair into ``_STRATEGY_REGISTRY``.
Idempotent: a second call within the same process is a no-op because the
underlying :func:`register_strategy` short-circuits identical
re-registrations (the dedup check in
:mod:`gps_denied_onboard.runtime_root` compares
:class:`_Registration` records by value).
Side effects:
* Mutates the central :data:`_STRATEGY_REGISTRY` for the 7 strategy-
selecting airborne component slots. Each slot gets one entry per
buildable strategy (the wrapper is the same callable across all
strategies for a slot, because the underlying per-component factory
handles the strategy switch internally).
* Does NOT touch state_factory's ``_STATE_REGISTRY`` or pose_factory's
``_POSE_REGISTRY`` here — those are populated lazily by the c5_state
wrapper at compose time, behind ``BUILD_STATE_*`` flag gates, so a
binary configured for klt_ransac + ESKF never imports gtsam.
Call ONCE at process start, before any :func:`compose_root` invocation.
Tests call ``clear_strategy_registry()`` first to isolate state across
test cases.
"""
for slug, strategies, wrapper, depends_on in _AIRBORNE_REGISTRATIONS:
for strategy in strategies:
register_strategy(
slug,
strategy,
wrapper,
tier="airborne",
depends_on=depends_on,
)
_LOG.info(
"airborne_bootstrap.strategies_registered",
extra={
"kind": "airborne_bootstrap.strategies_registered",
"kv": {
"slots": [slug for slug, *_ in _AIRBORNE_REGISTRATIONS],
"total_registrations": sum(
len(strategies)
for _, strategies, *_ in _AIRBORNE_REGISTRATIONS
),
},
},
)