mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 08:41:12 +00:00
21a7784682
- gtsam_isam2_estimator: shim for gtsam>=4.3a0 aarch64 pre-release where IncrementalFixedLagSmoother/FixedLagSmootherKeyTimestampMap moved from gtsam_unstable to gtsam - inference_factory: eager import of c7_inference package so register_component_block runs before config.components is read - docker-compose.test.jetson.yml: remove companion and operator-orchestrator (not needed by replay CLI tests and crash in test env due to AZ-618 live-mode deps); add db-migrate and tile-init setup-profile services for Alembic migrations and FAISS fixture provisioning; update e2e-runner depends_on to db only - scripts/mk_test_faiss_fixture.py: generate minimal HNSW32 FAISS descriptor index into the tile-data volume for the test harness Co-authored-by: Cursor <cursoragent@cursor.com>
126 lines
4.6 KiB
Python
126 lines
4.6 KiB
Python
"""C7 inference-runtime composition-root factory (AZ-297).

:func:`build_inference_runtime` selects exactly one strategy by
``config.components['c7_inference'].runtime`` and respects compile-time
``BUILD_*`` gating: requesting a strategy whose flag is OFF raises
:class:`RuntimeNotAvailableError` at composition time (NOT at first
inference call).

The concrete strategy modules (``tensorrt_runtime``, ``onnx_trt_ep_runtime``,
``pytorch_fp16_runtime``) are imported lazily — a Tier-0 workstation
build with ``BUILD_TENSORRT_RUNTIME=OFF`` MUST NOT load
``c7_inference.tensorrt_runtime`` (Invariant I-5; verifiable via
``sys.modules``).
"""
|
|
|
|
from __future__ import annotations

import importlib
import os
from typing import TYPE_CHECKING

from gps_denied_onboard.runtime_root.errors import RuntimeNotAvailableError

# Eager package import so c7_inference.__init__.py runs
# `register_component_block("c7_inference", C7InferenceConfig)` before
# `_c7_config(config)` reads `config.components["c7_inference"]` below.
# The package __init__.py is import-safe (no concrete strategy modules)
# per the Risk-2 mitigation documented in c7_inference/__init__.py.
import gps_denied_onboard.components.c7_inference  # noqa: F401

if TYPE_CHECKING:
    from gps_denied_onboard.components.c7_inference import (
        C7InferenceConfig,
        InferenceRuntime,
    )
    from gps_denied_onboard.config.schema import Config
|
|
|
|
__all__ = ["build_inference_runtime"]
|
|
|
|
|
|
# Maps each selectable runtime name to the environment variable that gates it.
# ``build_inference_runtime`` refuses a runtime whose flag is not switched on.
_RUNTIME_TO_BUILD_FLAG: dict[str, str] = {
    "tensorrt": "BUILD_TENSORRT_RUNTIME",
    "onnx_trt_ep": "BUILD_ONNX_TRT_EP_RUNTIME",
    "pytorch_fp16": "BUILD_PYTORCH_FP16_RUNTIME",
}
|
|
|
|
# Maps each selectable runtime name to ``(dotted module path, class name)``.
# The modules are referenced by string only, so nothing here triggers an
# import of a concrete strategy; ``build_inference_runtime`` imports on demand.
_RUNTIME_TO_MODULE: dict[str, tuple[str, str]] = {
    "tensorrt": (
        "gps_denied_onboard.components.c7_inference.tensorrt_runtime",
        "TensorrtRuntime",
    ),
    "onnx_trt_ep": (
        "gps_denied_onboard.components.c7_inference.onnx_trt_ep_runtime",
        "OnnxTrtEpRuntime",
    ),
    "pytorch_fp16": (
        "gps_denied_onboard.components.c7_inference.pytorch_fp16_runtime",
        "PytorchFp16Runtime",
    ),
}
|
|
|
|
|
|
def _is_build_flag_on(flag_name: str) -> bool:
    """Read a compile-time ``BUILD_*`` flag from the environment.

    ``ON`` / ``1`` / ``true`` / ``yes`` (case-insensitive) → ``True``;
    anything else (including unset) → ``False``.  Defaults to OFF so
    test environments must opt-in explicitly per strategy.

    Args:
        flag_name: Name of the environment variable to inspect.

    Returns:
        ``True`` only when the variable holds one of the accepted "on"
        spellings after surrounding whitespace is stripped.
    """
    # An unset variable reads as "", which is not in the accepted set.
    raw = os.environ.get(flag_name, "")
    return raw.strip().lower() in {"on", "1", "true", "yes"}
|
|
|
|
|
|
def _c7_config(config: "Config") -> "C7InferenceConfig":
    """Pull the registered C7 config block.

    ``c7_inference.__init__`` registers it on import; a missing
    registration is a developer error and surfaces as ``KeyError``
    rather than a silent fallback.

    Args:
        config: Object exposing a ``components`` mapping.

    Returns:
        The value stored under the ``"c7_inference"`` key.

    Raises:
        KeyError: ``config.components`` has no ``"c7_inference"`` entry.
    """
    # Plain subscript (not ``.get``) so an absent block fails loudly.
    return config.components["c7_inference"]
|
|
|
|
|
|
def build_inference_runtime(config: "Config") -> "InferenceRuntime":
    """Construct the :class:`InferenceRuntime` impl selected by config.

    The factory:

    1. Reads ``config.components['c7_inference'].runtime``.
    2. Checks the matching ``BUILD_*`` flag — if OFF, raises
       :class:`RuntimeNotAvailableError` BEFORE any import.
    3. Lazily imports the concrete strategy module.
    4. Constructs and returns the strategy instance, passing ``config``.

    Raises :class:`RuntimeNotAvailableError` when:

    - The configured runtime name has no entry in the flag / module tables.
    - The compile-time flag is OFF (the canonical Tier-0 path).
    - The concrete strategy module has not been built yet (AZ-298 /
      AZ-299 / AZ-300 are still pending) — the import fails and the
      factory wraps :class:`ModuleNotFoundError`.
    - The strategy module exists but a module it imports is missing — the
      message then names the missing module instead of claiming the
      strategy itself is unbuilt.
    """
    block = _c7_config(config)
    runtime = block.runtime
    flag_name = _RUNTIME_TO_BUILD_FLAG.get(runtime)
    module_info = _RUNTIME_TO_MODULE.get(runtime)
    if flag_name is None or module_info is None:
        raise RuntimeNotAvailableError(
            f"InferenceRuntime runtime {runtime!r} is not buildable in "
            "this binary."
        )
    # Gate on the flag first so a disabled strategy is never imported.
    if not _is_build_flag_on(flag_name):
        raise RuntimeNotAvailableError(
            f"InferenceRuntime runtime {runtime!r} requires "
            f"{flag_name}=ON in this binary; the flag is OFF."
        )
    module_name, class_name = module_info
    try:
        module = importlib.import_module(module_name)
    except ModuleNotFoundError as exc:
        # ``exc.name`` is the module that could not be found.  It equals the
        # strategy module (or one of its parent packages) when the strategy
        # is absent; any other value means the strategy exists but one of
        # its own imports is missing.
        missing = exc.name
        strategy_absent = (
            missing is None
            or missing == module_name
            or module_name.startswith(f"{missing}.")
        )
        if not strategy_absent:
            raise RuntimeNotAvailableError(
                f"InferenceRuntime runtime {runtime!r} is configured but its "
                f"concrete impl module {module_name!r} could not be imported: "
                f"required module {missing!r} is not available in this binary."
            ) from exc
        raise RuntimeNotAvailableError(
            f"InferenceRuntime runtime {runtime!r} is configured but its "
            f"concrete impl module {module_name!r} has not been built into "
            "this binary yet (AZ-298 / AZ-299 / AZ-300 pending)."
        ) from exc
    strategy_cls = getattr(module, class_name)
    return strategy_cls(config)
|