gps-denied-onboard/src/gps_denied_onboard/replay_input/auto_sync.py
Oleksandr Bezdieniezhnykh 8149083cac [AZ-405] Replay — replay_input/ coordinator + IMU take-off auto-sync
Adds the Layer-4 cross-cutting `replay_input/` module per ADR-011:
ReplayInputAdapter converges (video, tlog) into the standard
FrameSource + FcAdapter + Clock surfaces the airborne composition
root consumes. Owns time-alignment between video frames and tlog
IMU/attitude ticks (manual via --time-offset-ms or auto via the
AZ-405 IMU-take-off detector + Farneback motion-onset detector).

Auto-sync algorithm (auto_sync.py):
- Tlog take-off detector: sustained vertical-accel excess > 0.5 g for
  >= 0.5 s + sustained attitude-rate magnitude > 1 rad/s.
- Video motion-onset detector: dense Farneback flow magnitude > 1.5 px
  sustained >= 0.5 s (deterministic per AC-10).
- compute_offset combines the two; confidence = min(tlog, video).
- validate_offset_or_fail implements the AC-9 95 % frame-window match
  validator with configurable threshold + window.
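The detector, offset and validator steps above can be sketched on synthetic data. This is a simplified illustration, not the module's code: `first_sustained_onset` and `match_pct` are hypothetical helpers, the onset rule here is a strict unbroken run above the threshold (auto_sync.py uses a sliding-window ratio instead), and only a single signal per side is modelled.

```python
from bisect import bisect_left

MS = 1_000_000  # nanoseconds per millisecond


def first_sustained_onset(samples, threshold, sustained_ns):
    """Start of the first unbroken run above `threshold` lasting >= sustained_ns.

    `samples` is a time-sorted list of (ts_ns, value); returns None if no run qualifies.
    """
    run_start = None
    for ts_ns, value in samples:
        if value > threshold:
            if run_start is None:
                run_start = ts_ns
            if ts_ns - run_start >= sustained_ns:
                return run_start
        else:
            run_start = None
    return None


def match_pct(offset_ms, imu_ts_ns, frame_ts_ns, window_ms=100):
    """Percentage of frames with an IMU sample within +/- window_ms after the shift."""
    imu = sorted(imu_ts_ns)
    frames = list(frame_ts_ns)
    if not imu or not frames:
        return 0.0
    matched = 0
    for ts in frames:
        target = ts + offset_ms * MS
        i = bisect_left(imu, target)
        # Only the two neighbours of the insertion point can be nearest.
        near = [imu[j] for j in (i - 1, i) if 0 <= j < len(imu)]
        if min(abs(c - target) for c in near) <= window_ms * MS:
            matched += 1
    return 100.0 * matched / len(frames)


# Synthetic 10 Hz series: accel excess jumps at t = 2.0 s, flow at t = 0.5 s.
accel_excess = [(i * 100 * MS, 0.8 if i >= 20 else 0.0) for i in range(40)]
flow_px = [(i * 100 * MS, 2.0 if i >= 5 else 0.0) for i in range(20)]

tlog_onset = first_sustained_onset(accel_excess, 0.5, 500 * MS)
video_onset = first_sustained_onset(flow_px, 1.5, 500 * MS)
offset_ms = (tlog_onset - video_onset) // MS  # tlog_ts = video_ts + offset

imu_ts = [ts for ts, _ in accel_excess]
frame_ts = [ts for ts, _ in flow_px]
good = match_pct(offset_ms, imu_ts, frame_ts)
bad = match_pct(10_000, imu_ts, frame_ts)
```

With the correct offset every shifted frame lands on an IMU sample; with a 10 s offset none do, which is the case the validator is meant to reject.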

ReplayInputAdapter.open() ordering (AC-13):
1. Load tlog samples + fail-fast on missing RAW_IMU/SCALED_IMU2 or
   ATTITUDE BEFORE any video read.
2. Resolve offset (auto-sync OR manual override; manual bypasses the
   detectors entirely per AC-8).
3. Run AC-9 validator on resolved offset; raise auto-sync hard-fail
   for AC-7 (CLI exit 2 mapping).
4. Build single Clock instance per pace (TlogDerived/ASAP, Wall/REAL).
5. Construct VideoFileFrameSource and TlogReplayFcAdapter with the
   resolved offset baked in (replay protocol Invariant 8).
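
The first three steps of this ordering can be sketched as follows. Every name here is a hypothetical stand-in (`ReplaySyncError`, `resolve_offset_ms`, the `auto_sync` and `validate` callables); the real coordinator lives in ReplayInputAdapter and is not shown in this commit view. Steps 4 and 5 (clock and source construction) are omitted.

```python
class ReplaySyncError(RuntimeError):
    """Hypothetical stand-in for the adapter's hard-fail error."""


def resolve_offset_ms(tlog_types, manual_offset_ms, auto_sync, validate):
    """Return the offset to bake into the sources, or raise.

    tlog_types: set of message-type names found in the tlog head.
    manual_offset_ms: operator override, or None to use auto-sync.
    auto_sync: zero-argument callable returning an offset in ms.
    validate: callable(offset_ms) -> 0 on pass, 2 on hard-fail.
    """
    # Step 1: fail fast on the tlog before any video work is attempted.
    has_imu = bool({"RAW_IMU", "SCALED_IMU2"} & set(tlog_types))
    if not has_imu or "ATTITUDE" not in tlog_types:
        raise ReplaySyncError("tlog missing required message types")

    # Step 2: a manual override skips the detectors entirely.
    if manual_offset_ms is not None:
        offset_ms = manual_offset_ms
    else:
        offset_ms = auto_sync()

    # Step 3: the validator decides whether the offset is usable.
    if validate(offset_ms) != 0:
        raise ReplaySyncError(f"offset {offset_ms} ms failed the frame-window check")
    return offset_ms


calls = []


def fake_auto_sync():
    calls.append("auto")
    return 1500


manual = resolve_offset_ms({"RAW_IMU", "ATTITUDE"}, 250, fake_auto_sync, lambda ms: 0)
calls_after_manual = list(calls)
auto = resolve_offset_ms({"SCALED_IMU2", "ATTITUDE"}, None, fake_auto_sync, lambda ms: 0)
```

In this sketch the manual path never invokes `fake_auto_sync`, mirroring the "manual bypasses the detectors" rule, while a tlog without `ATTITUDE` raises before either callable runs.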

Structured log + FDR records on auto-sync detected / low-confidence /
AC-8 hard-fail kinds. Idempotent close (AC-12).

Tests: 25 unit tests across tests/unit/replay_input/ covering all 13
ACs (kernel-level synthetic fixtures for AC-1..AC-10; coordinator-
level OpenCV synthetic videos + faked pymavlink for AC-6..AC-13).

Contract update: replay_protocol.md v2.0.0 added fdr_client to the
ReplayInputAdapter __init__ signature (was missing in the prose; the
task spec already listed it in the allowed-imports section).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-14 09:50:51 +03:00

"""Auto-sync detectors + offset compute + AC-9 validator (AZ-405).

Three concerns:

1. **Tlog take-off detector**: walks the head of the tlog, looks for
   a sustained vertical-acceleration excess + sustained attitude-rate
   excess, returns ``(takeoff_ns, confidence)``.
2. **Video motion-onset detector**: runs OpenCV dense Farneback
   optical flow over the leading seconds of the video, returns
   ``(motion_onset_ns, confidence)``.
3. **AC-9 frame-window match validator**: given a candidate offset
   and the tlog/video timestamp series, returns 0 if ≥ 95 % of
   video frames have an IMU sample within ± 100 ms after the offset
   is applied; 2 otherwise.

The detector functions are split into a thin path-reading wrapper
(``detect_tlog_takeoff`` / ``detect_video_motion_onset``) and a pure
sample-driven core (``_compute_tlog_takeoff_from_samples`` /
``_compute_video_onset_from_samples``). Tests exercise the pure cores
directly with synthetic fixtures; production calls the wrappers,
which read the tlog via ``pymavlink`` and the video via ``cv2``.

Both wrappers accept an optional ``source_factory`` (tlog) /
``frames_factory`` (video) injection point so unit tests can swap in
fakes without touching the filesystem (mirrors AZ-399's pattern).
"""

from __future__ import annotations

import bisect
import math
import os
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

from gps_denied_onboard._types.fc import FcKind
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
from gps_denied_onboard.replay_input.interface import AutoSyncConfig, AutoSyncDecision

if TYPE_CHECKING:
    import numpy as np

__all__ = [
    "TlogSamples",
    "compute_offset",
    "detect_tlog_takeoff",
    "detect_video_motion_onset",
    "validate_offset_or_fail",
]

# Conversion: MAVLink RAW_IMU / SCALED_IMU2 publish accelerometer
# components in mG (milli-G); 1 g ≡ 9.80665 m/s² by ISO 80000-3.
_MG_PER_G: float = 1000.0

# Per the AZ-405 spec, the vertical-accel signal of interest is the
# magnitude excess above gravity (i.e., body acceleration regardless
# of frame orientation). At rest |a| ≈ 1 g; during upward thrust |a|
# > 1 g; during free-fall |a| ≈ 0 g. The take-off pattern is a
# sustained positive excess (upward thrust). The criterion used is
# ``|total_g - 1.0|``, which is unsigned: a sustained deficit of the
# same size (e.g. free-fall) also crosses the threshold.
_REST_TOTAL_G: float = 1.0

# ---------------------------------------------------------------------
# DTOs (internal; public API surfaces results via AutoSyncDecision)


@dataclass(frozen=True, slots=True)
class _DetectorResult:
    """Outcome of a single detector pass.

    ``onset_ns`` is the best-guess event start (ns); ``confidence``
    is in [0, 1] and reflects how sustained the signal was relative
    to the configured threshold + sustained-time requirement.
    """

    onset_ns: int
    confidence: float


@dataclass(frozen=True, slots=True)
class TlogSamples:
    """Pre-loaded tlog samples extracted by the take-off detector.

    Used as the input shape for :func:`_compute_tlog_takeoff_from_samples`
    so unit tests can build a deterministic fixture without parsing a
    real ``.tlog`` file.

    Attributes:
        accel: Sequence of ``(ts_ns, total_accel_g)`` pairs sourced
            from ``RAW_IMU`` / ``SCALED_IMU2`` messages.
        attitude: Sequence of ``(ts_ns, roll_rad, pitch_rad, yaw_rad)``
            tuples sourced from ``ATTITUDE`` messages.
        imu_count_by_type: Map of message-type-name → count, used for
            the ``"tlog missing required message types: [...]"``
            error path (R-DEMO-3).
    """

    accel: tuple[tuple[int, float], ...]
    attitude: tuple[tuple[int, float, float, float], ...]
    imu_count_by_type: dict[str, int]

# ---------------------------------------------------------------------
# Public entrypoints


def detect_tlog_takeoff(
    tlog_path: Path,
    target_fc_dialect: FcKind,
    config: AutoSyncConfig,
    *,
    source_factory: Callable[[str], Any] | None = None,
) -> _DetectorResult:
    """Walk the tlog head, detect the take-off pattern, return result.

    Args:
        tlog_path: Path to the tlog file. Existence is checked at
            entry.
        target_fc_dialect: ``ARDUPILOT_PLANE`` or ``INAV``. Both speak
            ``ardupilotmega`` MAVLink on the GCS telemetry channel
            (the iNav-side native MSP traffic is irrelevant here);
            this parameter is accepted for parity with the rest of
            the replay surface and is only validated at entry.
        config: Operator-tunable thresholds (see
            :class:`AutoSyncConfig`).
        source_factory: Test-only injection. When provided, replaces
            the pymavlink open call with the factory's return value.
            The factory must yield an object with ``recv_match`` /
            ``close`` semantics matching pymavlink's
            ``mavutil.mavlink_connection``.

    Raises:
        ReplayInputAdapterError: When ``target_fc_dialect`` is not a
            supported kind, when the tlog file does not exist, or
            when the tlog is missing ``RAW_IMU`` / ``SCALED_IMU2``
            (no IMU samples) or ``ATTITUDE`` (no attitude samples).
            The missing-messages case is the R-DEMO-3 fail-fast
            path: it surfaces BEFORE any video read in the
            coordinator's ``open()`` flow.
    """
    if target_fc_dialect not in (FcKind.ARDUPILOT_PLANE, FcKind.INAV):
        raise ReplayInputAdapterError(
            f"target_fc_dialect must be ARDUPILOT_PLANE or INAV; got {target_fc_dialect!r}"
        )
    if not tlog_path.is_file():
        raise ReplayInputAdapterError(f"tlog file not found: {tlog_path}")
    samples = _load_tlog_samples(
        tlog_path,
        config.prescan_max_messages,
        source_factory=source_factory,
    )
    return _compute_tlog_takeoff_from_samples(samples, config)


def detect_video_motion_onset(
    video_path: Path,
    config: AutoSyncConfig,
    *,
    frames_factory: Callable[[Path, float], Iterable[tuple[int, "np.ndarray"]]]
    | None = None,
) -> _DetectorResult:
    """Scan the leading video segment, detect motion onset, return result.

    Args:
        video_path: Path to an MP4 / MKV / AVI file.
        config: Operator-tunable thresholds (see
            :class:`AutoSyncConfig`).
        frames_factory: Test-only injection. When provided, returns
            a synthetic iterable of ``(monotonic_ns, frame_bgr)``
            tuples. Must yield at least 2 frames for the pairwise
            optical-flow magnitudes to compute.

    Raises:
        ReplayInputAdapterError: When the video file is missing or
            unreadable, or fewer than 2 frames are decoded.
    """
    if not video_path.is_file():
        raise ReplayInputAdapterError(f"video file not found: {video_path}")
    if frames_factory is None:
        frames = list(_read_video_frames(video_path, config.video_motion_scan_seconds))
    else:
        frames = list(frames_factory(video_path, config.video_motion_scan_seconds))
    if len(frames) < 2:
        raise ReplayInputAdapterError(
            f"video file unreadable or too short: {video_path} "
            f"(decoded {len(frames)} frame(s); need ≥ 2)"
        )
    flow_samples = _compute_flow_magnitudes(frames)
    return _compute_video_onset_from_samples(flow_samples, config)


def compute_offset(
    tlog_result: _DetectorResult,
    video_result: _DetectorResult,
) -> AutoSyncDecision:
    """Combine tlog + video detector outputs into an :class:`AutoSyncDecision`.

    Offset semantics: ``offset_ns = tlog_takeoff_ns - video_motion_onset_ns``,
    so a video timestamp maps onto the tlog timeline as
    ``tlog_ts = video_ts + offset``. A positive offset means the
    take-off sits later on the tlog timeline than the motion onset
    does on the video timeline.

    Combined confidence = ``min(tlog_confidence, video_confidence)``;
    the weakest signal dominates so downstream WARN-and-proceed (AC-6)
    fires whenever either side is unreliable.
    """
    offset_ns = tlog_result.onset_ns - video_result.onset_ns
    combined = min(tlog_result.confidence, video_result.confidence)
    return AutoSyncDecision(
        offset_ms=offset_ns // 1_000_000,
        tlog_takeoff_ns=tlog_result.onset_ns,
        video_motion_onset_ns=video_result.onset_ns,
        tlog_confidence=tlog_result.confidence,
        video_confidence=video_result.confidence,
        combined_confidence=combined,
    )


def validate_offset_or_fail(
    offset_ms: int,
    tlog_imu_timestamps_ns: Iterable[int],
    video_frame_timestamps_ns: Iterable[int],
    threshold_pct: float,
    *,
    window_ms: int = 100,
) -> int:
    """AC-9 frame-window match validator.

    Returns ``0`` when ≥ ``threshold_pct`` % of video frames have an
    IMU sample within ± ``window_ms`` after the offset is applied;
    returns ``2`` otherwise (CLI exit code for AC-8 hard-fail).

    The check is symmetric in offset sign: the offset is added to
    each video timestamp and the nearest tlog IMU timestamp is then
    looked up by binary search.
    """
    video_list = list(video_frame_timestamps_ns)
    if not video_list:
        # Degenerate input: no frames to match. The replay binary
        # rejects empty videos earlier, so reaching this branch
        # would be a bug; return 2 so the operator sees the hard-fail
        # rather than a false PASS.
        return 2
    tlog_sorted = sorted(tlog_imu_timestamps_ns)
    if not tlog_sorted:
        return 2
    offset_ns = int(offset_ms) * 1_000_000
    window_ns = int(window_ms) * 1_000_000
    matched = 0
    for vts in video_list:
        target_ns = vts + offset_ns
        idx = bisect.bisect_left(tlog_sorted, target_ns)
        # The nearest IMU sample is whichever of the immediate
        # neighbours of `target_ns` is closer. Either may be out of
        # range at the ends of the array.
        nearest: int | None = None
        for j in (idx - 1, idx):
            if 0 <= j < len(tlog_sorted):
                cand = tlog_sorted[j]
                if nearest is None or abs(cand - target_ns) < abs(nearest - target_ns):
                    nearest = cand
        if nearest is not None and abs(nearest - target_ns) <= window_ns:
            matched += 1
    match_pct = (matched / len(video_list)) * 100.0
    return 0 if match_pct >= threshold_pct else 2


# ---------------------------------------------------------------------
# Pure compute kernels (testable without disk IO)


def _compute_tlog_takeoff_from_samples(
    samples: TlogSamples,
    config: AutoSyncConfig,
) -> _DetectorResult:
    """Pure detector: turn pre-loaded tlog samples into a result.

    Algorithm: find the first sustained-window where (a) accel
    magnitude excess above 1 g exceeds the threshold for at least
    ``sustained_seconds``, and (b) attitude-rate magnitude exceeds
    its threshold sustained over the same duration. Combined
    confidence = ``min(accel_ratio, attitude_ratio)``; both
    signals must agree for a high-confidence take-off.

    Raises:
        ReplayInputAdapterError: When the tlog had no IMU samples or
            no ATTITUDE samples (R-DEMO-3 fail-fast).
    """
    if not samples.accel:
        missing = ["RAW_IMU", "SCALED_IMU2"]
        raise ReplayInputAdapterError(
            f"tlog missing required message types: {missing}"
        )
    if not samples.attitude:
        raise ReplayInputAdapterError(
            "tlog missing required message types: ['ATTITUDE']"
        )
    sustained_ns = int(config.sustained_seconds * 1_000_000_000)

    # Pair-wise attitude rates (rad/s magnitude vector), emitted at
    # the timestamp of the LATER sample so the rate aligns with when
    # it is observable downstream.
    attitude_rates: list[tuple[int, float]] = []
    for i in range(1, len(samples.attitude)):
        ts_prev, roll_prev, pitch_prev, yaw_prev = samples.attitude[i - 1]
        ts_curr, roll_curr, pitch_curr, yaw_curr = samples.attitude[i]
        dt_s = (ts_curr - ts_prev) / 1_000_000_000.0
        if dt_s <= 0.0:
            continue
        dr = roll_curr - roll_prev
        dp = pitch_curr - pitch_prev
        dy = _wrap_pi(yaw_curr - yaw_prev)
        rate_mag = math.sqrt((dr / dt_s) ** 2 + (dp / dt_s) ** 2 + (dy / dt_s) ** 2)
        attitude_rates.append((ts_curr, rate_mag))

    accel_excess = tuple(
        (ts, abs(total_g - _REST_TOTAL_G)) for ts, total_g in samples.accel
    )
    accel_event = _find_sustained_event(
        accel_excess,
        threshold=config.takeoff_accel_threshold_g,
        sustained_ns=sustained_ns,
    )
    attitude_event = _find_sustained_event(
        tuple(attitude_rates),
        threshold=config.takeoff_attitude_rate_threshold_rad_s,
        sustained_ns=sustained_ns,
    )

    if accel_event is None and attitude_event is None:
        # Neither signal crossed; best we can do is flag "no clear
        # take-off" so the coordinator can WARN and continue with the
        # tlog start as a fallback origin.
        first_ns = samples.accel[0][0]
        return _DetectorResult(onset_ns=first_ns, confidence=0.0)

    if accel_event is not None and attitude_event is not None:
        # Both signals fired; they should both point at the same
        # event. We adopt the EARLIER of the two onsets so the offset
        # is referenced against the moment thrust began (the attitude
        # body-rate spike usually trails the thrust by a few hundred
        # ms during a vertical climb).
        onset_ns = min(accel_event[0], attitude_event[0])
        # Confidence is the weakest of the two signals, scaled by
        # how cleanly they agree. We keep it simple: min().
        confidence = min(accel_event[1], attitude_event[1])
    elif accel_event is not None:
        # Only the accel signal; discount confidence so the
        # combined offset eventually trips the WARN-and-proceed
        # threshold (combined_confidence < 0.80 → AC-6).
        onset_ns, raw_conf = accel_event
        confidence = raw_conf * 0.6
    else:
        # Only attitude rate; same rationale as above. The
        # mypy-narrowing else covers attitude_event is not None.
        assert attitude_event is not None
        onset_ns, raw_conf = attitude_event
        confidence = raw_conf * 0.6
    return _DetectorResult(onset_ns=onset_ns, confidence=confidence)


def _compute_video_onset_from_samples(
    flow_samples: tuple[tuple[int, float], ...],
    config: AutoSyncConfig,
) -> _DetectorResult:
    """Pure detector: turn pre-computed optical-flow magnitudes into a result.

    Algorithm: find the first sustained window where the flow
    magnitude exceeds the configured threshold for at least
    ``sustained_seconds``. Confidence = sustained ratio.
    """
    if not flow_samples:
        return _DetectorResult(onset_ns=0, confidence=0.0)
    sustained_ns = int(config.sustained_seconds * 1_000_000_000)
    event = _find_sustained_event(
        flow_samples,
        threshold=config.video_motion_threshold,
        sustained_ns=sustained_ns,
    )
    if event is None:
        return _DetectorResult(onset_ns=flow_samples[0][0], confidence=0.0)
    onset_ns, confidence = event
    return _DetectorResult(onset_ns=onset_ns, confidence=confidence)


def _find_sustained_event(
    samples: tuple[tuple[int, float], ...] | list[tuple[int, float]],
    *,
    threshold: float,
    sustained_ns: int,
) -> tuple[int, float] | None:
    """Sliding-window scan: return ``(start_ns, ratio)`` of the
    earliest window where the fraction of samples above
    ``threshold`` is maximised, provided that fraction is ≥ 0.5
    (signal-vs-noise floor) and the window covers at least 80 % of
    ``sustained_ns`` (guards against truncated windows at the tail).

    Returns ``None`` when no qualifying window exists.
    """
    seq = list(samples)
    n = len(seq)
    if n < 2:
        return None
    best_start_ns: int | None = None
    best_ratio = 0.0
    min_window_ns = int(sustained_ns * 0.8)
    for i in range(n):
        start_ns = seq[i][0]
        end_ns = start_ns + sustained_ns
        # Walk j forward while still inside the window.
        j = i
        above = 0
        while j < n and seq[j][0] <= end_ns:
            if seq[j][1] > threshold:
                above += 1
            j += 1
        window_size = j - i
        if window_size < 2:
            continue
        window_dur_ns = seq[j - 1][0] - start_ns
        if window_dur_ns < min_window_ns:
            continue
        ratio = above / window_size
        if ratio > best_ratio:
            best_ratio = ratio
            best_start_ns = start_ns
    if best_start_ns is None or best_ratio < 0.5:
        return None
    return (best_start_ns, best_ratio)


def _wrap_pi(angle_rad: float) -> float:
    """Wrap an angle delta into ``(-π, π]`` to handle yaw wrap-around."""
    twopi = 2.0 * math.pi
    a = angle_rad % twopi
    if a > math.pi:
        a -= twopi
    return a


# ---------------------------------------------------------------------
# Disk-reading wrappers (production paths)

_REQUIRED_TLOG_TYPES: tuple[str, ...] = (
    "RAW_IMU",
    "SCALED_IMU2",
    "ATTITUDE",
)


def _load_tlog_samples(
    tlog_path: Path,
    max_messages: int,
    *,
    source_factory: Callable[[str], Any] | None,
) -> TlogSamples:
    """Stream the tlog head, capture IMU + ATTITUDE samples.

    Mirrors the AZ-399 source-factory test pattern: production builds
    use ``pymavlink`` lazily; tests pass an in-memory fake.
    """
    source = _open_tlog(tlog_path, source_factory=source_factory)
    accel: list[tuple[int, float]] = []
    attitude: list[tuple[int, float, float, float]] = []
    counts: dict[str, int] = {}
    try:
        for _ in range(max_messages):
            try:
                msg = source.recv_match(
                    type=list(_REQUIRED_TLOG_TYPES),
                    blocking=False,
                )
            except Exception as exc:  # pragma: no cover - defensive.
                raise ReplayInputAdapterError(
                    f"tlog scan failed on {tlog_path}: {exc!r}"
                ) from exc
            if msg is None:
                break
            msg_type = _safe_msg_type(msg)
            if not msg_type:
                continue
            counts[msg_type] = counts.get(msg_type, 0) + 1
            ts_ns = _msg_timestamp_ns(msg)
            if msg_type in ("RAW_IMU", "SCALED_IMU2"):
                xa = float(getattr(msg, "xacc", 0.0)) / _MG_PER_G
                ya = float(getattr(msg, "yacc", 0.0)) / _MG_PER_G
                za = float(getattr(msg, "zacc", 0.0)) / _MG_PER_G
                total_g = math.sqrt(xa * xa + ya * ya + za * za)
                accel.append((ts_ns, total_g))
            elif msg_type == "ATTITUDE":
                roll = float(getattr(msg, "roll", 0.0))
                pitch = float(getattr(msg, "pitch", 0.0))
                yaw = float(getattr(msg, "yaw", 0.0))
                attitude.append((ts_ns, roll, pitch, yaw))
    finally:
        if hasattr(source, "close"):
            try:
                source.close()
            except Exception:  # pragma: no cover - defensive.
                pass
    return TlogSamples(
        accel=tuple(accel),
        attitude=tuple(attitude),
        imu_count_by_type=counts,
    )


def _open_tlog(
    tlog_path: Path,
    *,
    source_factory: Callable[[str], Any] | None,
) -> Any:
    if source_factory is not None:
        return source_factory(str(tlog_path))
    try:
        from pymavlink import mavutil  # type: ignore[import-not-found]
    except ImportError as exc:
        raise ReplayInputAdapterError(
            "pymavlink is required for replay auto-sync but is not "
            "importable in this binary"
        ) from exc
    return mavutil.mavlink_connection(
        str(tlog_path),
        dialect="ardupilotmega",
        mavlink_version="2.0",
    )


def _safe_msg_type(msg: Any) -> str:
    try:
        if hasattr(msg, "get_type"):
            return str(msg.get_type())
    except Exception:
        return ""
    return type(msg).__name__


def _msg_timestamp_ns(msg: Any) -> int:
    raw = getattr(msg, "_timestamp", None)
    if raw is None:
        raise ReplayInputAdapterError(
            "tlog message missing _timestamp attribute; pymavlink "
            "mavlogfile should populate it on every recv_match() return"
        )
    return int(float(raw) * 1_000_000_000)


def _read_video_frames(
    video_path: Path,
    scan_seconds: float,
) -> Iterable[tuple[int, "np.ndarray"]]:
    """Decode the leading ``scan_seconds`` of the video.

    Yields ``(monotonic_ns, frame_bgr)`` tuples where ``monotonic_ns``
    is the file's per-frame ``CAP_PROP_POS_MSEC × 1e6`` so the
    returned timestamps align with what
    :class:`VideoFileFrameSource` will report later. The Python
    ``time.monotonic_ns()`` is NOT used: the auto-sync result has to
    be deterministic across runs (AC-10) and tied to the video
    timeline.
    """
    try:
        import cv2 as _cv2  # type: ignore[import-not-found]
    except ImportError as exc:
        raise ReplayInputAdapterError(
            "opencv-python is required for replay auto-sync but is "
            "not importable in this binary"
        ) from exc
    capture = _cv2.VideoCapture(str(video_path))
    if not capture.isOpened():
        capture.release()
        raise ReplayInputAdapterError(
            f"video file unreadable / unsupported codec: {video_path}"
        )
    try:
        max_pos_ms = scan_seconds * 1000.0
        while True:
            ok, frame = capture.read()
            if not ok or frame is None:
                break
            pos_ms = float(capture.get(_cv2.CAP_PROP_POS_MSEC))
            if pos_ms > max_pos_ms:
                break
            ts_ns = int(pos_ms * 1_000_000)
            yield ts_ns, frame
    finally:
        capture.release()


def _compute_flow_magnitudes(
    frames: list[tuple[int, "np.ndarray"]],
) -> tuple[tuple[int, float], ...]:
    """Pairwise mean optical-flow magnitude between consecutive frames.

    Uses Farneback dense flow (``cv2.calcOpticalFlowFarneback``)
    rather than pyramidal LK because Farneback returns a flow field
    over the whole image with no per-frame feature-tracking state, so
    the result is deterministic given the same input frames (AC-10).

    Returns ``((ts_ns_of_second_frame, mean_magnitude_px), ...)``.
    """
    try:
        import cv2 as _cv2  # type: ignore[import-not-found]
        import numpy as _np  # type: ignore[import-not-found]
    except ImportError as exc:  # pragma: no cover - guarded at call sites.
        raise ReplayInputAdapterError(
            "opencv-python + numpy are required for replay auto-sync"
        ) from exc
    if len(frames) < 2:
        return ()
    # Convert all frames to grayscale once up-front so the per-pair
    # cost is dominated by the optical-flow computation itself.
    gray_frames = []
    for ts_ns, frame in frames:
        gray = _cv2.cvtColor(frame, _cv2.COLOR_BGR2GRAY)
        gray_frames.append((ts_ns, gray))
    out: list[tuple[int, float]] = []
    for i in range(1, len(gray_frames)):
        prev_ts, prev = gray_frames[i - 1]
        curr_ts, curr = gray_frames[i]
        flow = _cv2.calcOpticalFlowFarneback(
            prev,
            curr,
            None,
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0,
        )
        # ``flow`` shape: (H, W, 2), holding dx + dy per pixel.
        magnitudes = _np.sqrt(flow[..., 0] ** 2 + flow[..., 1] ** 2)
        mean_mag = float(magnitudes.mean())
        out.append((curr_ts, mean_mag))
    return tuple(out)


# Re-export the BUILD-flag check for symmetry with other replay modules.
def _build_flag_on(name: str) -> bool:
    raw = os.environ.get(name, "")
    return raw.strip().lower() in {"on", "1", "true", "yes"}