[AZ-331] C1 VioStrategy: Protocol + DTOs + factory + C5 migration

Freezes the c1_vio Public API per
_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md v1.0.0:

- VioStrategy Protocol (4 methods: process_frame, reset_to_warm_start,
  health_snapshot, current_strategy_label) in
  components/c1_vio/interface.py.
- DTOs (VioOutput, VioHealth, FeatureQuality, WarmStartPose) + VioState
  enum in _types/nav.py — L1 placement so C5 + C13 consume them without
  crossing the components.* boundary (AZ-270 AC-6). The new VioOutput
  shape (frame_id: str, relative_pose_T: gtsam.Pose3,
  pose_covariance_6x6, imu_bias, feature_quality, emitted_at_ns)
  replaces the AZ-263 scaffolding in _types/vio.py, which is now
  deleted.
- VioError family (VioInitializingError / VioDegradedError /
  VioFatalError) in components/c1_vio/errors.py. Documented
  rationale: the degraded-operation path returns a VioOutput with
  inflated covariance + VioHealth.state=DEGRADED rather than raising
  VioDegradedError — the error type exists only for the rare
  degraded->fatal transition.
- C1VioConfig per-component config block (strategy enum,
  lost_frame_threshold default 9, warm_start_max_frames default 5)
  with constructor-time validation rejecting unknown strategy labels.
- StrategyNotAvailableError added to runtime_root/errors.py;
  composition-time error distinct from the VioError family.
- Composition-root factory build_vio_strategy in
  runtime_root/vio_factory.py with three BUILD_* gates (BUILD_OKVIS2,
  BUILD_VINS_MONO, BUILD_KLT_RANSAC). Concrete strategy modules are
  imported lazily via __import__ AFTER the flag check — Tier-0
  workstation builds with the flag OFF MUST NOT load the strategy
  module (Risk-2 / I-5; verifiable via sys.modules).
- 36 conformance tests cover all 9 ACs + NFR-perf-factory
  (p99 build under 200 ms x 1000 calls) + NFR-reliability-error-family.
  AC-8 introspects the contract file's Shape table and asserts method
  parity against the runtime Protocol; AC-9 asserts the frame_id
  annotation is 'str' (PEP-563 stringified).

C5 migration (consumers of the new VioOutput shape):
- gtsam_isam2_estimator.py + eskf_baseline.py: replaced
  vio.timestamp -> vio.emitted_at_ns (drops _datetime_to_ns on the
  VIO path), vio.pose_se3 -> vio.relative_pose_T (gtsam.Pose3 direct;
  drops _pose_se3_to_gtsam / _pose_se3_to_array), vio.covariance_6x6
  -> vio.pose_covariance_6x6 (rename).
- key_for_frame signature widened to UUID | int | str to accept the
  new str frame_id.
- 4 C5 test files migrated to the new VioOutput shape with helper
  fixtures producing ImuBias + FeatureQuality + str frame_id.
- c5_state/interface.py TYPE_CHECKING import path updated.

Bootstrap healthcheck + test_types_importable updated to drop the
deleted _types/vio module and pick up _types/inference (AZ-297) in
the same sweep.

Full unit-test sweep: 884 passed, 2 pre-existing environment skips
(cmake, actionlint).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 04:44:31 +03:00
parent daff5d4d1c
commit 6c7d24f7e0
20 changed files with 1027 additions and 81 deletions
+111 -6
View File
@@ -1,16 +1,25 @@
"""Navigation-side DTOs: camera frames, IMU samples, attitude, FC flight state, GPS health.
"""Navigation-side DTOs: camera frames, IMU samples, attitude, VIO output.
These are type-only stubs created by AZ-263 (Bootstrap). Concrete field semantics are
defined in `_docs/02_document/architecture.md § 4` and the C1 / C5 / C8 component
specs. Concrete subclasses are owned by the components that emit them; downstream
consumers depend on the DTOs declared here.
Type-only stubs created by AZ-263 (Bootstrap) and extended by AZ-331
(C1 VioStrategy contract freeze, v1.0.0). Concrete field semantics
are defined in ``_docs/02_document/architecture.md § 4`` and the C1
/ C5 / C8 component specs. Concrete subclasses are owned by the
components that emit them; downstream consumers depend on the DTOs
declared here. Cross-component DTOs (``VioOutput``, ``VioHealth``)
live at this L1 layer per module-layout.md — components MUST NOT
import them from other components.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
import numpy.typing as npt
from gps_denied_onboard.helpers.se3_utils import SE3
@dataclass(frozen=True)
@@ -75,3 +84,99 @@ class AttitudeWindow:
# canonical shape uses enums + monotonic_ns timestamps; the old stubs
# from AZ-263 used `str` + `datetime` and were never wired by any
# production producer or consumer).
# ----------------------------------------------------------------------
# AZ-331 — C1 VioStrategy contract DTOs (v1.0.0).
from enum import Enum
class VioState(str, Enum):
"""C1 VIO health state reported on every ``VioHealth`` snapshot."""
INIT = "init"
TRACKING = "tracking"
DEGRADED = "degraded"
LOST = "lost"
@dataclass(frozen=True)
class FeatureQuality:
"""Per-frame feature-tracking diagnostics (C1 contract v1.0.0).
Surfaced inside :class:`VioOutput`; consumed by C13 FDR + C5 fusion
for adaptive-gating decisions.
"""
tracked: int
new: int
lost: int
mean_parallax: float
mre_px: float
@dataclass(frozen=True)
class WarmStartPose:
"""Hint passed to ``VioStrategy.reset_to_warm_start`` after F8 reboot.
``body_T_world`` is a :class:`gtsam.Pose3` (= ``SE3``); the warm-start
+ F8 reboot recovery wiring task in E-C1 owns the on-disk persistence
pattern. ``captured_at_ns`` is :func:`time.monotonic_ns` at hint
production time.
"""
body_T_world: "SE3"
velocity_b: tuple[float, float, float]
bias: "ImuBias"
captured_at_ns: int
@dataclass(frozen=True)
class VioOutput:
"""C1 strategy per-frame output (C1 contract v1.0.0).
``frame_id`` is a stable string identifier echoed from the input
``NavCameraFrame.frame_id`` (stringified if the source is an int
or UUID); C5 uses it as a hashable key into its frame→GTSAM-key
map (Invariant — alignment).
``relative_pose_T`` is the strategy's current pose as a
:class:`gtsam.Pose3`; expressed in the strategy's own internal
frame (VIO has no absolute world reference — the internal frame
drifts from world over time). C5 computes the between-factor
delta via ``prev.between(curr)`` and inserts ``curr`` as the
iSAM2 Values entry.
``pose_covariance_6x6`` is the symmetric positive-definite 6×6
covariance in the rotation-then-translation tangent-space order;
strategies MUST NOT tighten this during a degradation event
(Invariant — honest covariance is the AC-NEW-4 / AC-NEW-7 floor).
``imu_bias`` is the strategy's latest bias estimate;
``feature_quality`` is per-frame tracker diagnostics;
``emitted_at_ns`` is :func:`time.monotonic_ns` at output time.
"""
frame_id: str
relative_pose_T: "SE3"
pose_covariance_6x6: "npt.NDArray[Any]"
imu_bias: "ImuBias"
feature_quality: FeatureQuality
emitted_at_ns: int
@dataclass(frozen=True)
class VioHealth:
"""C1 strategy health snapshot (C1 contract v1.0.0).
Returned from :meth:`VioStrategy.health_snapshot`; consumed by C13
FDR and the composition-root watchdog. ``consecutive_lost`` ticks
every time ``state == LOST``; the strategy raises
:class:`VioFatalError` once this exceeds
``config.vio.lost_frame_threshold`` (default 9).
"""
state: VioState
consecutive_lost: int
bias_norm: float
-21
View File
@@ -1,21 +0,0 @@
"""C1 VIO output DTO."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class VioOutput:
"""VIO pose + uncertainty + health bundle.
Concrete semantics in `_docs/02_document/components/01_c1_vio/description.md § 2`.
"""
frame_id: int
timestamp: datetime
pose_se3: Any
covariance_6x6: Any | None = None
health_flags: dict[str, Any] = field(default_factory=dict)
@@ -1,6 +1,52 @@
"""C1 VIO component — Public API."""
"""C1 VIO — Public API (AZ-331).
from gps_denied_onboard._types.vio import VioOutput
Per ``vio_strategy_protocol.md`` v1.0.0 the public surface consists
of:
- :class:`VioStrategy` Protocol (4 methods).
- DTOs / enum re-exported from :mod:`gps_denied_onboard._types.nav`
(the L1 home for cross-component DTOs): :class:`VioOutput`,
:class:`VioHealth`, :class:`FeatureQuality`, :class:`WarmStartPose`,
:class:`VioState`.
- Error family rooted at :class:`VioError`; three documented subtypes.
- Config block :class:`C1VioConfig` (registered on import).
Concrete strategies (``Okvis2Strategy``, ``VinsMonoStrategy``,
``KltRansacStrategy``) live in sibling modules and are imported
lazily by :mod:`gps_denied_onboard.runtime_root.vio_factory` —
Risk-2 mitigation: this ``__init__.py`` MUST NOT import any concrete
strategy module.
"""
from gps_denied_onboard._types.nav import (
FeatureQuality,
VioHealth,
VioOutput,
VioState,
WarmStartPose,
)
from gps_denied_onboard.components.c1_vio.config import C1VioConfig
from gps_denied_onboard.components.c1_vio.errors import (
VioDegradedError,
VioError,
VioFatalError,
VioInitializingError,
)
from gps_denied_onboard.components.c1_vio.interface import VioStrategy
from gps_denied_onboard.config.schema import register_component_block
__all__ = ["VioOutput", "VioStrategy"]
register_component_block("c1_vio", C1VioConfig)
__all__ = [
"C1VioConfig",
"FeatureQuality",
"VioDegradedError",
"VioError",
"VioFatalError",
"VioHealth",
"VioInitializingError",
"VioOutput",
"VioState",
"VioStrategy",
"WarmStartPose",
]
@@ -0,0 +1,63 @@
"""C1 VIO strategy config block (AZ-331).
Registered into ``config.components['c1_vio']`` by the package
``__init__.py``. The composition-root factory
:func:`gps_denied_onboard.runtime_root.vio_factory.build_vio_strategy`
reads this block to select the strategy and configure the LOST→FATAL
transition + warm-start convergence budget.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Final
from gps_denied_onboard.config.schema import ConfigError
__all__ = [
"C1VioConfig",
"KNOWN_STRATEGIES",
]
KNOWN_STRATEGIES: Final[frozenset[str]] = frozenset(
{"okvis2", "vins_mono", "klt_ransac"}
)
@dataclass(frozen=True)
class C1VioConfig:
"""Per-component config for C1 VIO.
``strategy`` selects exactly one of the three backends
(``okvis2`` / ``vins_mono`` / ``klt_ransac``); the
composition-root factory respects compile-time ``BUILD_*`` gating
on top of this label.
``lost_frame_threshold`` is the number of consecutive ``LOST``
frames before the strategy raises :class:`VioFatalError`;
default 9 per ``vio_strategy_protocol.md`` v1.0.0.
``warm_start_max_frames`` is the convergence budget after
:meth:`VioStrategy.reset_to_warm_start`; default 5.
"""
strategy: str = "klt_ransac"
lost_frame_threshold: int = 9
warm_start_max_frames: int = 5
def __post_init__(self) -> None:
if self.strategy not in KNOWN_STRATEGIES:
raise ConfigError(
f"C1VioConfig.strategy={self.strategy!r} not in "
f"{sorted(KNOWN_STRATEGIES)}"
)
if self.lost_frame_threshold < 1:
raise ConfigError(
f"C1VioConfig.lost_frame_threshold must be >= 1; "
f"got {self.lost_frame_threshold}"
)
if self.warm_start_max_frames < 1:
raise ConfigError(
f"C1VioConfig.warm_start_max_frames must be >= 1; "
f"got {self.warm_start_max_frames}"
)
@@ -0,0 +1,64 @@
"""C1 VioStrategy error taxonomy (AZ-331).
Every ``VioStrategy`` method raises only members of :class:`VioError`.
Lower-level exceptions from OpenCV / OKVIS2 / VINS-Mono / GTSAM MUST
be caught and rewrapped — the contract closes the error envelope so
consumers can ``except VioError`` once and handle the family.
:class:`VioDegradedError` is documented but is **not raised** during
the normal degraded-operation path: degraded operation returns a
``VioOutput`` with inflated covariance and ``VioHealth.state == DEGRADED``.
The error type exists for the rare degraded→fatal transition.
A separate composition-time error
(:class:`gps_denied_onboard.runtime_root.errors.StrategyNotAvailableError`)
lives outside this family — it is raised by the factory, not by a
``VioStrategy`` method.
"""
from __future__ import annotations
__all__ = [
"VioDegradedError",
"VioError",
"VioFatalError",
"VioInitializingError",
]
class VioError(Exception):
"""Base class for the C1 VIO error family.
All three documented subtypes are children. Consumers catch the
family with ``except c1_vio.errors.VioError as e``.
"""
class VioInitializingError(VioError):
"""Raised while ``VioHealth.state == INIT`` and the strategy has no
pose to emit.
C5 fusion catches this and falls back to the FC IMU prior until
the strategy reports ``TRACKING``.
"""
class VioDegradedError(VioError):
"""Raised on the rare degraded→fatal transition.
The normal degraded-operation path is NOT this exception — it is
a ``VioOutput`` with inflated covariance + ``VioHealth.state ==
DEGRADED``. This type exists only so consumer ``except VioError``
wrappers can name the family member explicitly.
"""
class VioFatalError(VioError):
"""Raised once ``consecutive_lost`` exceeds the configured threshold
(default 9) or on irrecoverable backend init failure during
:meth:`VioStrategy.reset_to_warm_start`.
The AC-5.2 fallback path engages once this fires: the camera
ingest loop stops feeding the strategy and the watchdog flips
the composition root into FC-IMU-only mode.
"""
@@ -1,20 +1,94 @@
"""C1 `VioStrategy` Protocol.
"""C1 ``VioStrategy`` Protocol (AZ-331).
Concrete strategies: OKVIS2 (default), VINS-Mono (research-only), KLT/RANSAC
(mandatory simple baseline). See `_docs/02_document/components/01_c1_vio/`.
PEP 544 ``typing.Protocol`` with ``runtime_checkable=True``; four
methods that span the camera-ingest hot path
(``process_frame``), F8 reboot recovery (``reset_to_warm_start``),
diagnostics (``health_snapshot``), and self-identification
(``current_strategy_label``).
Concrete impls — :class:`Okvis2Strategy` (AZ-332),
:class:`VinsMonoStrategy` (AZ-333), :class:`KltRansacStrategy`
(AZ-334) — live in sibling modules and are imported lazily by
:mod:`gps_denied_onboard.runtime_root.vio_factory`.
The contract at
``_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md``
v1.0.0 is the authoritative shape; this module mirrors it 1:1.
"""
from __future__ import annotations
from typing import Protocol
from typing import TYPE_CHECKING, Literal, Protocol, runtime_checkable
from gps_denied_onboard._types.nav import ImuWindow, NavCameraFrame
from gps_denied_onboard._types.vio import VioOutput
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import (
ImuWindow,
NavCameraFrame,
VioHealth,
VioOutput,
WarmStartPose,
)
__all__ = ["VioStrategy"]
@runtime_checkable
class VioStrategy(Protocol):
"""Visual-Inertial-Odometry strategy."""
"""On-Jetson visual / visual-inertial odometry runtime.
def step(self, frame: NavCameraFrame, imu: ImuWindow) -> VioOutput:
"""Process a single nav-camera frame + IMU window and return a VIO update."""
Implementations:
:class:`Okvis2Strategy` (production-default, OKVIS2 SLAM),
:class:`VinsMonoStrategy` (research-only),
:class:`KltRansacStrategy` (mandatory simple-baseline per ADR-002).
Selection is owned by the composition root.
Invariants (see ``vio_strategy_protocol.md`` v1.0.0):
- Single-threaded per instance (one camera-ingest writer thread).
- ``current_strategy_label()`` is constant per instance and
equals ``config.vio.strategy`` exactly.
- Error envelope is closed — only members of
:class:`VioError` escape ``process_frame``.
"""
def process_frame(
self,
frame: "NavCameraFrame",
imu: "ImuWindow",
calibration: "CameraCalibration",
) -> "VioOutput":
"""Camera-ingest hot-path call (one per nav-camera frame).
``VioOutput.frame_id`` MUST equal ``frame.frame_id`` (C5
alignment invariant). Raises :class:`VioInitializingError`
while booting (state == INIT; no output emitted) and
:class:`VioFatalError` once ``consecutive_lost`` exceeds the
configured threshold. During DEGRADED operation the method
returns normally with an inflated covariance — NOT raises
:class:`VioDegradedError`.
"""
...
def reset_to_warm_start(self, hint: "WarmStartPose") -> None:
"""Destructive re-init from an F8-reboot warm-start hint.
Clears keyframe window, IMU integration state, feature
tracks. Subsequent ``process_frame`` calls re-initialise from
the hint. Raises :class:`VioFatalError` only on irrecoverable
backend init failure.
"""
...
def health_snapshot(self) -> "VioHealth":
"""Most-recent health state — FDR-stamped per the AC-NEW-3 audit."""
...
def current_strategy_label(
self,
) -> Literal["okvis2", "vins_mono", "klt_ransac"]:
"""Identify which concrete strategy is wired here.
Returned string equals ``config.vio.strategy`` exactly.
AC-NEW-3 FDR audit relies on this property.
"""
...
@@ -449,9 +449,9 @@ class EskfStateEstimator(StateEstimator):
residual in the previous body frame.
"""
self._close_cold_start_window()
ts_ns = _datetime_to_ns(vio.timestamp)
ts_ns = vio.emitted_at_ns
self._guard_timestamp(ts_ns, source="vio")
curr_pose = _pose_se3_to_array(vio.pose_se3)
curr_pose = vio.relative_pose_T.matrix()
if self._prev_vio_pose is None:
self._prev_vio_pose = curr_pose
@@ -498,7 +498,7 @@ class EskfStateEstimator(StateEstimator):
H = np.zeros((6, _N_STATE), dtype=np.float64)
H[0:3, _IDX_POS] = np.eye(3)
H[3:6, _IDX_ROT] = prev_R # rotate body-frame perturbation back to world
R_meas = _measurement_noise(vio.covariance_6x6)
R_meas = _measurement_noise(vio.pose_covariance_6x6)
try:
self._kalman_update(H, residual, R_meas, source="vio")
@@ -6,7 +6,7 @@ real bodies. AZ-383 owns the three Protocol factor-add methods:
* ``add_vio(vio: VioOutput)`` — ``BetweenFactorPose3`` between
consecutive pose keys with a noise model derived from
``vio.covariance_6x6``.
``vio.pose_covariance_6x6``.
* ``add_pose_anchor(pose: PoseEstimate)`` — mode-dispatched per
``pose.covariance_mode``: ``"marginals"`` → ``PriorFactorPose3`` +
``update``; ``"jacobian"`` → skip iSAM2 add (per the AZ-361 cross-task
@@ -403,7 +403,7 @@ class GtsamIsam2StateEstimator(StateEstimator):
"""Register a callback fired exactly once per fallback recovery."""
return self._fallback.subscribe_recovered(callback)
def key_for_frame(self, frame_id: UUID | int) -> int:
def key_for_frame(self, frame_id: UUID | int | str) -> int:
"""Return the GTSAM ``Key`` for ``frame_id``, assigning on first use.
AZ-383 calls this from ``add_vio`` and ``add_pose_anchor`` to
@@ -653,10 +653,10 @@ class GtsamIsam2StateEstimator(StateEstimator):
"""
handle = self._require_handle()
self._close_cold_start_window()
ts_ns = _datetime_to_ns(vio.timestamp)
ts_ns = vio.emitted_at_ns
self._guard_timestamp(ts_ns, source="vio")
curr_pose = _pose_se3_to_gtsam(vio.pose_se3)
curr_pose = vio.relative_pose_T
curr_key = self.key_for_frame(vio.frame_id)
if self._prev_vio is None:
@@ -674,10 +674,10 @@ class GtsamIsam2StateEstimator(StateEstimator):
)
return
prev_pose = _pose_se3_to_gtsam(self._prev_vio.pose_se3)
prev_pose = self._prev_vio.relative_pose_T
prev_key = self.key_for_frame(self._prev_vio.frame_id)
relative_pose = prev_pose.between(curr_pose)
noise = _build_pose_noise(vio.covariance_6x6)
noise = _build_pose_noise(vio.pose_covariance_6x6)
factor = gtsam.BetweenFactorPose3(prev_key, curr_key, relative_pose, noise)
try:
@@ -26,7 +26,7 @@ if TYPE_CHECKING:
EstimatorHealth,
EstimatorOutput,
)
from gps_denied_onboard._types.vio import VioOutput
from gps_denied_onboard._types.nav import VioOutput
__all__ = ["StateEstimator"]
+1 -1
View File
@@ -23,12 +23,12 @@ def check() -> int:
from gps_denied_onboard._types import ( # noqa: F401
calibration,
emitted,
inference,
manifests,
matching,
nav,
pose,
tile,
vio,
vpr,
)
from gps_denied_onboard.logging import get_logger # noqa: F401
+13 -1
View File
@@ -2,7 +2,7 @@
These are raised at composition time (``build_*`` factory entry) and
NOT during the running flight. Components own their per-runtime error
families; this module owns the cross-component selection error.
families; this module owns the cross-component selection errors.
"""
from __future__ import annotations
@@ -22,3 +22,15 @@ class RuntimeNotAvailableError(RuntimeError):
The message MUST name the requested runtime label so the operator can
correlate against ``.env``'s ``BUILD_*`` matrix without guessing.
"""
class StrategyNotAvailableError(RuntimeError):
"""Raised when ``build_vio_strategy`` is asked for a VIO strategy whose
compile-time ``BUILD_*`` flag is OFF (AZ-331).
Distinct from :class:`RuntimeNotAvailableError` because the C1
contract names this error type explicitly (AC-5). The message
MUST name both the requested strategy label and the missing
``BUILD_*`` flag so the operator can correlate against the
binary's compile matrix.
"""
@@ -0,0 +1,114 @@
"""C1 VIO strategy composition-root factory (AZ-331).
:func:`build_vio_strategy` selects exactly one strategy by
``config.components['c1_vio'].strategy`` and respects compile-time
``BUILD_*`` gating: requesting a strategy whose flag is OFF raises
:class:`StrategyNotAvailableError` at composition time (NOT at first
frame).
Concrete strategy modules (``okvis2``, ``vins_mono``, ``klt_ransac``)
are imported lazily — a Tier-0 workstation build with
``BUILD_OKVIS2=OFF`` MUST NOT load ``c1_vio.okvis2`` (Risk-2 / I-5;
verifiable via ``sys.modules``).
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
if TYPE_CHECKING:
from gps_denied_onboard.components.c1_vio import (
C1VioConfig,
VioStrategy,
)
from gps_denied_onboard.components.c13_fdr import FdrClient
from gps_denied_onboard.config.schema import Config
__all__ = ["build_vio_strategy"]
_STRATEGY_TO_BUILD_FLAG: dict[str, str] = {
"okvis2": "BUILD_OKVIS2",
"vins_mono": "BUILD_VINS_MONO",
"klt_ransac": "BUILD_KLT_RANSAC",
}
_STRATEGY_TO_MODULE: dict[str, tuple[str, str]] = {
"okvis2": ("gps_denied_onboard.components.c1_vio.okvis2", "Okvis2Strategy"),
"vins_mono": (
"gps_denied_onboard.components.c1_vio.vins_mono",
"VinsMonoStrategy",
),
"klt_ransac": (
"gps_denied_onboard.components.c1_vio.klt_ransac",
"KltRansacStrategy",
),
}
def _is_build_flag_on(flag_name: str) -> bool:
"""Read a compile-time ``BUILD_*`` flag from the environment.
``ON`` / ``1`` / ``true`` / ``yes`` (case-insensitive) → ``True``;
anything else (including unset) → ``False``. Defaults to OFF so
test environments must opt-in explicitly per strategy.
"""
raw = os.environ.get(flag_name, "")
return raw.strip().lower() in {"on", "1", "true", "yes"}
def _c1_config(config: "Config") -> "C1VioConfig":
"""Pull the registered C1 config block.
``c1_vio.__init__`` registers it on import; a missing
registration is a developer error and surfaces as ``KeyError``
rather than a silent fallback.
"""
return config.components["c1_vio"]
def build_vio_strategy(
config: "Config",
*,
fdr_client: "FdrClient",
) -> "VioStrategy":
"""Construct the :class:`VioStrategy` impl selected by config.
1. Reads ``config.components['c1_vio'].strategy``.
2. Checks the matching ``BUILD_*`` flag — if OFF, raises
:class:`StrategyNotAvailableError` BEFORE any import.
3. Lazily imports the concrete strategy module.
4. Constructs and returns the strategy instance, passing
``config`` and ``fdr_client``.
Raises :class:`StrategyNotAvailableError` when the compile-time
flag is OFF (canonical Tier-0 path) or when the concrete strategy
module has not been built yet (AZ-332 / AZ-333 / AZ-334 pending).
"""
block = _c1_config(config)
strategy = block.strategy
flag_name = _STRATEGY_TO_BUILD_FLAG.get(strategy)
module_info = _STRATEGY_TO_MODULE.get(strategy)
if flag_name is None or module_info is None:
raise StrategyNotAvailableError(
f"VioStrategy {strategy!r} is not buildable in this binary."
)
if not _is_build_flag_on(flag_name):
raise StrategyNotAvailableError(
f"VioStrategy {strategy!r} requires {flag_name}=ON in this "
"binary; the flag is OFF."
)
module_name, class_name = module_info
try:
module = __import__(module_name, fromlist=[class_name])
except ModuleNotFoundError as exc:
raise StrategyNotAvailableError(
f"VioStrategy {strategy!r} is configured but its concrete impl "
f"module {module_name!r} has not been built into this binary "
"yet (AZ-332 / AZ-333 / AZ-334 pending)."
) from exc
strategy_cls = getattr(module, class_name)
return strategy_cls(config, fdr_client=fdr_client)