[AZ-405] Replay — replay_input/ coordinator + IMU take-off auto-sync

Adds the Layer-4 cross-cutting `replay_input/` module per ADR-011:
ReplayInputAdapter converges (video, tlog) into the standard
FrameSource + FcAdapter + Clock surfaces the airborne composition
root consumes. Owns time-alignment between video frames and tlog
IMU/attitude ticks (manual via --time-offset-ms or auto via the
AZ-405 IMU-take-off detector + Farneback motion-onset detector).

Auto-sync algorithm (auto_sync.py):
- Tlog take-off detector: sustained vertical-accel excess > 0.5 g for
  >= 0.5 s + sustained attitude-rate magnitude > 1 rad/s.
- Video motion-onset detector: dense Farneback flow magnitude > 1.5 px
  sustained >= 0.5 s (deterministic per AC-10).
- compute_offset combines the two; confidence = min(tlog, video).
- validate_offset_or_fail implements the AC-9 95 % frame-window match
  validator with configurable threshold + window.

ReplayInputAdapter.open() ordering (AC-13):
1. Load tlog samples + fail-fast on missing RAW_IMU/SCALED_IMU2 or
   ATTITUDE BEFORE any video read.
2. Resolve offset (auto-sync OR manual override; manual bypasses the
   detectors entirely per AC-8).
3. Run AC-9 validator on resolved offset; raise auto-sync hard-fail
   for AC-7 (CLI exit 2 mapping).
4. Build single Clock instance per pace (TlogDerived/ASAP, Wall/REAL).
5. Construct VideoFileFrameSource and TlogReplayFcAdapter with the
   resolved offset baked in (replay protocol Invariant 8).

Structured log + FDR records on auto-sync detected / low-confidence /
AC-8 hard-fail kinds. Idempotent close (AC-12).

Tests: 25 unit tests across tests/unit/replay_input/ covering all 13
ACs (kernel-level synthetic fixtures for AC-1..AC-10; coordinator-
level OpenCV synthetic videos + faked pymavlink for AC-6..AC-13).

Contract update: replay_protocol.md v2.0.0 added fdr_client to the
ReplayInputAdapter __init__ signature (was missing in the prose; the
task spec already listed it in the allowed-imports section).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-14 09:50:51 +03:00
parent f9b4241d3a
commit 8149083cac
14 changed files with 2979 additions and 4 deletions
@@ -0,0 +1,646 @@
"""Auto-sync detectors + offset compute + AC-9 validator (AZ-405).
Three concerns:
1. **Tlog take-off detector** — walks the head of the tlog, looks for
a sustained vertical-acceleration excess + sustained attitude-rate
excess, returns ``(takeoff_ns, confidence)``.
2. **Video motion-onset detector** — runs OpenCV pyramidal optical
flow over the leading seconds of the video, returns
``(motion_onset_ns, confidence)``.
3. **AC-9 frame-window match validator** — given a candidate offset
and the tlog/video timestamp series, returns 0 if ≥ 95 % of
video frames have an IMU sample within ± 100 ms after the offset
is applied; 2 otherwise.
The detector functions are split into a thin path-reading wrapper
(``detect_tlog_takeoff`` / ``detect_video_motion_onset``) and a pure
sample-driven core (``_compute_tlog_takeoff_from_samples`` /
``_compute_video_onset_from_samples``). Tests exercise the pure cores
directly with synthetic fixtures; production calls the wrappers,
which read the tlog via ``pymavlink`` and the video via ``cv2``.
Both wrappers accept an optional ``source_factory`` (tlog) /
``frames_factory`` (video) injection point so unit tests can swap in
fakes without touching the filesystem (mirrors AZ-399's pattern).
"""
from __future__ import annotations
import bisect
import math
import os
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any
from gps_denied_onboard._types.fc import FcKind
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
from gps_denied_onboard.replay_input.interface import AutoSyncConfig, AutoSyncDecision
if TYPE_CHECKING:
import numpy as np
__all__ = [
"TlogSamples",
"compute_offset",
"detect_tlog_takeoff",
"detect_video_motion_onset",
"validate_offset_or_fail",
]
# Conversion: MAVLink RAW_IMU / SCALED_IMU2 publish accelerometer
# components in mG (milli-G); 1 g ≡ 9.80665 m/s² by ISO 80000-3.
_MG_PER_G: float = 1000.0
# Per the AZ-405 spec, the vertical-accel signal of interest is the
# magnitude excess above gravity (i.e., body acceleration regardless
# of frame orientation). At rest |a| ≈ 1 g; during upward thrust |a|
# > 1 g; during free-fall |a| ≈ 0 g. The take-off pattern is a
# sustained excess with positive sign (upward thrust), so we use
# ``|total_g - 1.0|`` as the criterion.
_REST_TOTAL_G: float = 1.0
# ---------------------------------------------------------------------
# DTOs (internal — public API surfaces results via AutoSyncDecision)
@dataclass(frozen=True, slots=True)
class _DetectorResult:
"""Outcome of a single detector pass.
``onset_ns`` is the best-guess event start (ns); ``confidence``
is in [0, 1] and reflects how sustained the signal was relative
to the configured threshold + sustained-time requirement.
"""
onset_ns: int
confidence: float
@dataclass(frozen=True, slots=True)
class TlogSamples:
"""Pre-loaded tlog samples extracted by the take-off detector.
Used as the input shape for :func:`_compute_tlog_takeoff_from_samples`
so unit tests can build a deterministic fixture without parsing a
real ``.tlog`` file.
Attributes:
accel: Sequence of ``(ts_ns, total_accel_g)`` pairs sourced
from ``RAW_IMU`` / ``SCALED_IMU2`` messages.
attitude: Sequence of ``(ts_ns, roll_rad, pitch_rad, yaw_rad)``
tuples sourced from ``ATTITUDE`` messages.
imu_count_by_type: Map of message-type-name → count, used for
the ``"tlog missing required message types: [...]"``
error path (R-DEMO-3).
"""
accel: tuple[tuple[int, float], ...]
attitude: tuple[tuple[int, float, float, float], ...]
imu_count_by_type: dict[str, int]
# ---------------------------------------------------------------------
# Public entrypoints
def detect_tlog_takeoff(
tlog_path: Path,
target_fc_dialect: FcKind,
config: AutoSyncConfig,
*,
source_factory: Callable[[str], Any] | None = None,
) -> _DetectorResult:
"""Walk the tlog head, detect the take-off pattern, return result.
Args:
tlog_path: Path to the tlog file. Existence is checked at
entry.
target_fc_dialect: ``ARDUPILOT_PLANE`` or ``INAV``. Both speak
``ardupilotmega`` MAVLink on the GCS telemetry channel
(the iNav-side native MSP traffic is irrelevant here);
this parameter is accepted for parity with the rest of
the replay surface and is also used in the missing-
messages error to name the dialect explicitly.
config: Operator-tunable thresholds (see
:class:`AutoSyncConfig`).
source_factory: Test-only injection — when provided, replaces
the pymavlink open call with the factory's return value.
The factory must yield an object with ``recv_match`` /
``close`` semantics matching pymavlink's
``mavutil.mavlink_connection``.
Raises:
ReplayInputAdapterError: When the tlog is missing
``RAW_IMU`` / ``SCALED_IMU2`` (no IMU samples) or
``ATTITUDE`` (no attitude samples). This is the R-DEMO-3
fail-fast path — it surfaces BEFORE any video read in the
coordinator's ``open()`` flow.
"""
if target_fc_dialect not in (FcKind.ARDUPILOT_PLANE, FcKind.INAV):
raise ReplayInputAdapterError(
f"target_fc_dialect must be ARDUPILOT_PLANE or INAV; got {target_fc_dialect!r}"
)
if not tlog_path.is_file():
raise ReplayInputAdapterError(f"tlog file not found: {tlog_path}")
samples = _load_tlog_samples(
tlog_path,
config.prescan_max_messages,
source_factory=source_factory,
)
return _compute_tlog_takeoff_from_samples(samples, config)
def detect_video_motion_onset(
video_path: Path,
config: AutoSyncConfig,
*,
frames_factory: Callable[[Path, float], Iterable[tuple[int, "np.ndarray"]]]
| None = None,
) -> _DetectorResult:
"""Scan the leading video segment, detect motion onset, return result.
Args:
video_path: Path to an MP4 / MKV / AVI file.
config: Operator-tunable thresholds (see
:class:`AutoSyncConfig`).
frames_factory: Test-only injection — when provided, returns
a synthetic iterable of ``(monotonic_ns, frame_bgr)``
tuples. Must yield at least 2 frames for the pairwise
optical-flow magnitudes to compute.
Raises:
ReplayInputAdapterError: When the video file is missing or
unreadable, or fewer than 2 frames are decoded.
"""
if not video_path.is_file():
raise ReplayInputAdapterError(f"video file not found: {video_path}")
if frames_factory is None:
frames = list(_read_video_frames(video_path, config.video_motion_scan_seconds))
else:
frames = list(frames_factory(video_path, config.video_motion_scan_seconds))
if len(frames) < 2:
raise ReplayInputAdapterError(
f"video file unreadable or too short: {video_path} "
f"(decoded {len(frames)} frame(s); need ≥ 2)"
)
flow_samples = _compute_flow_magnitudes(frames)
return _compute_video_onset_from_samples(flow_samples, config)
def compute_offset(
tlog_result: _DetectorResult,
video_result: _DetectorResult,
) -> AutoSyncDecision:
"""Combine tlog + video detector outputs into an :class:`AutoSyncDecision`.
Offset semantics (positive = video starts before take-off recorded
in tlog): ``offset_ns = tlog_takeoff_ns - video_motion_onset_ns``.
Combined confidence = ``min(tlog_confidence, video_confidence)`` —
the weakest signal dominates so downstream WARN-and-proceed (AC-6)
fires whenever either side is unreliable.
"""
offset_ns = tlog_result.onset_ns - video_result.onset_ns
combined = min(tlog_result.confidence, video_result.confidence)
return AutoSyncDecision(
offset_ms=offset_ns // 1_000_000,
tlog_takeoff_ns=tlog_result.onset_ns,
video_motion_onset_ns=video_result.onset_ns,
tlog_confidence=tlog_result.confidence,
video_confidence=video_result.confidence,
combined_confidence=combined,
)
def validate_offset_or_fail(
offset_ms: int,
tlog_imu_timestamps_ns: Iterable[int],
video_frame_timestamps_ns: Iterable[int],
threshold_pct: float,
*,
window_ms: int = 100,
) -> int:
"""AC-9 frame-window match validator.
Returns ``0`` when ≥ ``threshold_pct`` % of video frames have an
IMU sample within ± ``window_ms`` after the offset is applied;
returns ``2`` otherwise (CLI exit code for AC-8 hard-fail).
The check is symmetric in offset sign — the offset is added to
each video timestamp and the nearest tlog IMU timestamp is then
looked up by binary search.
"""
video_list = list(video_frame_timestamps_ns)
if not video_list:
# Degenerate input — no frames to match. The replay binary
# rejects empty videos earlier, so reaching this branch
# would be a bug; return 2 so the operator sees the hard-fail
# rather than a false PASS.
return 2
tlog_sorted = sorted(tlog_imu_timestamps_ns)
if not tlog_sorted:
return 2
offset_ns = int(offset_ms) * 1_000_000
window_ns = int(window_ms) * 1_000_000
matched = 0
for vts in video_list:
target_ns = vts + offset_ns
idx = bisect.bisect_left(tlog_sorted, target_ns)
# The nearest IMU sample is whichever of the immediate
# neighbours of `target_ns` is closer. Either may be out of
# range at the ends of the array.
nearest: int | None = None
for j in (idx - 1, idx):
if 0 <= j < len(tlog_sorted):
cand = tlog_sorted[j]
if nearest is None or abs(cand - target_ns) < abs(nearest - target_ns):
nearest = cand
if nearest is not None and abs(nearest - target_ns) <= window_ns:
matched += 1
match_pct = (matched / len(video_list)) * 100.0
return 0 if match_pct >= threshold_pct else 2
# ---------------------------------------------------------------------
# Pure compute kernels (testable without disk IO)
def _compute_tlog_takeoff_from_samples(
samples: TlogSamples,
config: AutoSyncConfig,
) -> _DetectorResult:
"""Pure detector: turn pre-loaded tlog samples into a result.
Algorithm: find the first sustained-window where (a) accel
magnitude excess above 1 g exceeds the threshold for at least
``sustained_seconds``, and (b) attitude-rate magnitude exceeds
its threshold sustained over the same duration. Combined
confidence = ``min(accel_ratio, attitude_ratio)`` — both
signals must agree for a high-confidence take-off.
Raises:
ReplayInputAdapterError: When the tlog had no IMU samples or
no ATTITUDE samples (R-DEMO-3 fail-fast).
"""
if not samples.accel:
missing = ["RAW_IMU", "SCALED_IMU2"]
raise ReplayInputAdapterError(
f"tlog missing required message types: {missing}"
)
if not samples.attitude:
raise ReplayInputAdapterError(
"tlog missing required message types: ['ATTITUDE']"
)
sustained_ns = int(config.sustained_seconds * 1_000_000_000)
# Pair-wise attitude rates (rad/s magnitude vector) — emitted at
# the timestamp of the LATER sample so the rate aligns with when
# it is observable downstream.
attitude_rates: list[tuple[int, float]] = []
for i in range(1, len(samples.attitude)):
ts_prev, roll_prev, pitch_prev, yaw_prev = samples.attitude[i - 1]
ts_curr, roll_curr, pitch_curr, yaw_curr = samples.attitude[i]
dt_s = (ts_curr - ts_prev) / 1_000_000_000.0
if dt_s <= 0.0:
continue
dr = roll_curr - roll_prev
dp = pitch_curr - pitch_prev
dy = _wrap_pi(yaw_curr - yaw_prev)
rate_mag = math.sqrt((dr / dt_s) ** 2 + (dp / dt_s) ** 2 + (dy / dt_s) ** 2)
attitude_rates.append((ts_curr, rate_mag))
accel_excess = tuple(
(ts, abs(total_g - _REST_TOTAL_G)) for ts, total_g in samples.accel
)
accel_event = _find_sustained_event(
accel_excess,
threshold=config.takeoff_accel_threshold_g,
sustained_ns=sustained_ns,
)
attitude_event = _find_sustained_event(
tuple(attitude_rates),
threshold=config.takeoff_attitude_rate_threshold_rad_s,
sustained_ns=sustained_ns,
)
if accel_event is None and attitude_event is None:
# Neither signal crossed; best we can do is flag "no clear
# take-off" so the coordinator can WARN and continue with the
# tlog start as a fallback origin.
first_ns = samples.accel[0][0]
return _DetectorResult(onset_ns=first_ns, confidence=0.0)
if accel_event is not None and attitude_event is not None:
# Both signals fired — they should both point at the same
# event. We adopt the EARLIER of the two onsets so the offset
# is referenced against the moment thrust began (the attitude
# body-rate spike usually trails the thrust by a few hundred
# ms during a vertical climb).
onset_ns = min(accel_event[0], attitude_event[0])
# Confidence is the weakest of the two signals, scaled by
# how cleanly they agree. We keep it simple: min().
confidence = min(accel_event[1], attitude_event[1])
elif accel_event is not None:
# Only the accel signal — discount confidence so the
# combined offset eventually trips the WARN-and-proceed
# threshold (combined_confidence < 0.80 → AC-6).
onset_ns, raw_conf = accel_event
confidence = raw_conf * 0.6
else:
# Only attitude rate — same rationale as above. The
# mypy-narrowing else covers attitude_event is not None.
assert attitude_event is not None
onset_ns, raw_conf = attitude_event
confidence = raw_conf * 0.6
return _DetectorResult(onset_ns=onset_ns, confidence=confidence)
def _compute_video_onset_from_samples(
flow_samples: tuple[tuple[int, float], ...],
config: AutoSyncConfig,
) -> _DetectorResult:
"""Pure detector: turn pre-computed optical-flow magnitudes into a result.
Algorithm: find the first sustained window where the flow
magnitude exceeds the configured threshold for at least
``sustained_seconds``. Confidence = sustained ratio.
"""
if not flow_samples:
return _DetectorResult(onset_ns=0, confidence=0.0)
sustained_ns = int(config.sustained_seconds * 1_000_000_000)
event = _find_sustained_event(
flow_samples,
threshold=config.video_motion_threshold,
sustained_ns=sustained_ns,
)
if event is None:
return _DetectorResult(onset_ns=flow_samples[0][0], confidence=0.0)
onset_ns, confidence = event
return _DetectorResult(onset_ns=onset_ns, confidence=confidence)
def _find_sustained_event(
samples: tuple[tuple[int, float], ...] | list[tuple[int, float]],
*,
threshold: float,
sustained_ns: int,
) -> tuple[int, float] | None:
"""Sliding-window scan: return ``(start_ns, ratio)`` of the
earliest window where the fraction of samples above
``threshold`` is maximised, provided that fraction is ≥ 0.5
(signal-vs-noise floor) and the window covers at least 80 % of
``sustained_ns`` (guards against truncated windows at the tail).
Returns ``None`` when no qualifying window exists.
"""
seq = list(samples)
n = len(seq)
if n < 2:
return None
best_start_ns: int | None = None
best_ratio = 0.0
min_window_ns = int(sustained_ns * 0.8)
for i in range(n):
start_ns = seq[i][0]
end_ns = start_ns + sustained_ns
# Walk j forward while still inside the window.
j = i
above = 0
while j < n and seq[j][0] <= end_ns:
if seq[j][1] > threshold:
above += 1
j += 1
window_size = j - i
if window_size < 2:
continue
window_dur_ns = seq[j - 1][0] - start_ns
if window_dur_ns < min_window_ns:
continue
ratio = above / window_size
if ratio > best_ratio:
best_ratio = ratio
best_start_ns = start_ns
if best_start_ns is None or best_ratio < 0.5:
return None
return (best_start_ns, best_ratio)
def _wrap_pi(angle_rad: float) -> float:
"""Wrap an angle delta into ``(-π, π]`` to handle yaw wrap-around."""
twopi = 2.0 * math.pi
a = angle_rad % twopi
if a > math.pi:
a -= twopi
return a
# ---------------------------------------------------------------------
# Disk-reading wrappers (production paths)
_REQUIRED_TLOG_TYPES: tuple[str, ...] = (
"RAW_IMU",
"SCALED_IMU2",
"ATTITUDE",
)
def _load_tlog_samples(
tlog_path: Path,
max_messages: int,
*,
source_factory: Callable[[str], Any] | None,
) -> TlogSamples:
"""Stream the tlog head, capture IMU + ATTITUDE samples.
Mirrors the AZ-399 source-factory test pattern: production builds
use ``pymavlink`` lazily; tests pass an in-memory fake.
"""
source = _open_tlog(tlog_path, source_factory=source_factory)
accel: list[tuple[int, float]] = []
attitude: list[tuple[int, float, float, float]] = []
counts: dict[str, int] = {}
try:
for _ in range(max_messages):
try:
msg = source.recv_match(
type=list(_REQUIRED_TLOG_TYPES),
blocking=False,
)
except Exception as exc: # pragma: no cover — defensive.
raise ReplayInputAdapterError(
f"tlog scan failed on {tlog_path}: {exc!r}"
) from exc
if msg is None:
break
msg_type = _safe_msg_type(msg)
if not msg_type:
continue
counts[msg_type] = counts.get(msg_type, 0) + 1
ts_ns = _msg_timestamp_ns(msg)
if msg_type in ("RAW_IMU", "SCALED_IMU2"):
xa = float(getattr(msg, "xacc", 0.0)) / _MG_PER_G
ya = float(getattr(msg, "yacc", 0.0)) / _MG_PER_G
za = float(getattr(msg, "zacc", 0.0)) / _MG_PER_G
total_g = math.sqrt(xa * xa + ya * ya + za * za)
accel.append((ts_ns, total_g))
elif msg_type == "ATTITUDE":
roll = float(getattr(msg, "roll", 0.0))
pitch = float(getattr(msg, "pitch", 0.0))
yaw = float(getattr(msg, "yaw", 0.0))
attitude.append((ts_ns, roll, pitch, yaw))
finally:
if hasattr(source, "close"):
try:
source.close()
except Exception: # pragma: no cover — defensive.
pass
return TlogSamples(
accel=tuple(accel),
attitude=tuple(attitude),
imu_count_by_type=counts,
)
def _open_tlog(
tlog_path: Path,
*,
source_factory: Callable[[str], Any] | None,
) -> Any:
if source_factory is not None:
return source_factory(str(tlog_path))
try:
from pymavlink import mavutil # type: ignore[import-not-found]
except ImportError as exc:
raise ReplayInputAdapterError(
"pymavlink is required for replay auto-sync but is not "
"importable in this binary"
) from exc
return mavutil.mavlink_connection(
str(tlog_path),
dialect="ardupilotmega",
mavlink_version="2.0",
)
def _safe_msg_type(msg: Any) -> str:
try:
if hasattr(msg, "get_type"):
return str(msg.get_type())
except Exception:
return ""
return type(msg).__name__
def _msg_timestamp_ns(msg: Any) -> int:
raw = getattr(msg, "_timestamp", None)
if raw is None:
raise ReplayInputAdapterError(
"tlog message missing _timestamp attribute; pymavlink "
"mavlogfile should populate it on every recv_match() return"
)
return int(float(raw) * 1_000_000_000)
def _read_video_frames(
video_path: Path,
scan_seconds: float,
) -> Iterable[tuple[int, "np.ndarray"]]:
"""Decode the leading ``scan_seconds`` of the video.
Yields ``(monotonic_ns, frame_bgr)`` tuples where ``monotonic_ns``
is the file's per-frame ``CAP_PROP_POS_MSEC × 1e6`` so the
returned timestamps align with what
:class:`VideoFileFrameSource` will report later. The Python
``time.monotonic_ns()`` is NOT used — the auto-sync result has to
be deterministic across runs (AC-10) and tied to the video
timeline.
"""
try:
import cv2 as _cv2 # type: ignore[import-not-found]
except ImportError as exc:
raise ReplayInputAdapterError(
"opencv-python is required for replay auto-sync but is "
"not importable in this binary"
) from exc
capture = _cv2.VideoCapture(str(video_path))
if not capture.isOpened():
capture.release()
raise ReplayInputAdapterError(
f"video file unreadable / unsupported codec: {video_path}"
)
try:
max_pos_ms = scan_seconds * 1000.0
while True:
ok, frame = capture.read()
if not ok or frame is None:
break
pos_ms = float(capture.get(_cv2.CAP_PROP_POS_MSEC))
if pos_ms > max_pos_ms:
break
ts_ns = int(pos_ms * 1_000_000)
yield ts_ns, frame
finally:
capture.release()
def _compute_flow_magnitudes(
frames: list[tuple[int, "np.ndarray"]],
) -> tuple[tuple[int, float], ...]:
"""Pairwise mean optical-flow magnitude between consecutive frames.
Uses Farneback dense flow (``cv2.calcOpticalFlowFarneback``)
rather than pyramidal LK because Farneback returns a flow field
over the whole image with no per-frame feature-tracking state, so
the result is deterministic given the same input frames (AC-10).
Returns ``((ts_ns_of_second_frame, mean_magnitude_px), ...)``.
"""
try:
import cv2 as _cv2 # type: ignore[import-not-found]
import numpy as _np # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover — guarded at call sites.
raise ReplayInputAdapterError(
"opencv-python + numpy are required for replay auto-sync"
) from exc
if len(frames) < 2:
return ()
# Convert all frames to grayscale once up-front so the per-pair
# cost is dominated by the optical-flow computation itself.
gray_frames = []
for ts_ns, frame in frames:
gray = _cv2.cvtColor(frame, _cv2.COLOR_BGR2GRAY)
gray_frames.append((ts_ns, gray))
out: list[tuple[int, float]] = []
for i in range(1, len(gray_frames)):
prev_ts, prev = gray_frames[i - 1]
curr_ts, curr = gray_frames[i]
flow = _cv2.calcOpticalFlowFarneback(
prev,
curr,
None,
pyr_scale=0.5,
levels=3,
winsize=15,
iterations=3,
poly_n=5,
poly_sigma=1.2,
flags=0,
)
# ``flow`` shape: (H, W, 2) — dx + dy per pixel.
magnitudes = _np.sqrt(flow[..., 0] ** 2 + flow[..., 1] ** 2)
mean_mag = float(magnitudes.mean())
out.append((curr_ts, mean_mag))
return tuple(out)
# Re-export the BUILD-flag check for symmetry with other replay modules.
def _build_flag_on(name: str) -> bool:
raw = os.environ.get(name, "")
return raw.strip().lower() in {"on", "1", "true", "yes"}