mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 18:31:13 +00:00
[AZ-611] Add --skip-auto-sync flag to bypass AC-9 validator
Mid-flight fixtures (Derkachi) and stationary-still scenarios (FT-P-01) have no take-off spike for the IMU detector and produce false-positive video motion onsets, so the AC-9 frame-window validator rejects every plausible offset. Add an operator-acknowledged opt-out: a new ReplayConfig.skip_auto_sync_validation flag that suppresses validation, paired with a hard requirement that time_offset_ms also be set (silent-zero guard at both schema and adapter layers). Wired through schema -> CLI (--skip-auto-sync) -> composition root -> ReplayInputAdapter; Derkachi e2e fixture now passes time_offset_ms=0 + skip_auto_sync=True by default since the synth tlog and the video share the same t=0 anchor by construction. 5 new unit tests: * schema gate rejects skip=True without manual offset * schema gate accepts the legal pair * default field value is False (default-construction safety) * adapter constructor mirrors the schema gate * adapter open() bypasses validate_offset_or_fail when flag is set All 38 unit tests in test_az401 + test_az405 pass on Mac. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -35,6 +35,7 @@ from typing import Any, Final
|
|||||||
|
|
||||||
from gps_denied_onboard.config import (
|
from gps_denied_onboard.config import (
|
||||||
Config,
|
Config,
|
||||||
|
ConfigError,
|
||||||
ReplayConfig,
|
ReplayConfig,
|
||||||
load_config,
|
load_config,
|
||||||
)
|
)
|
||||||
@@ -125,6 +126,21 @@ def _build_argparser() -> argparse.ArgumentParser:
|
|||||||
"ReplayInputAdapter (AZ-405) auto-detects via IMU take-off."
|
"ReplayInputAdapter (AZ-405) auto-detects via IMU take-off."
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--skip-auto-sync",
|
||||||
|
dest="skip_auto_sync_validation",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"AZ-611: Also skip the AC-9 frame-window validator that "
|
||||||
|
"runs on the resolved offset. Only legal in combination "
|
||||||
|
"with --time-offset-ms (a manual offset is mandatory so "
|
||||||
|
"the bypass cannot mask a silent-zero auto-sync result). "
|
||||||
|
"Intended for fixtures where neither the IMU take-off "
|
||||||
|
"detector nor the video motion-onset detector can "
|
||||||
|
"produce a reliable signal (mid-flight clips, stationary "
|
||||||
|
"still-image scenarios)."
|
||||||
|
),
|
||||||
|
)
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
|
||||||
@@ -200,6 +216,7 @@ def _build_replay_config(
|
|||||||
output_path=str(args.output),
|
output_path=str(args.output),
|
||||||
pace=args.pace,
|
pace=args.pace,
|
||||||
time_offset_ms=args.time_offset_ms,
|
time_offset_ms=args.time_offset_ms,
|
||||||
|
skip_auto_sync_validation=bool(args.skip_auto_sync_validation),
|
||||||
target_fc_dialect=base_config.replay.target_fc_dialect,
|
target_fc_dialect=base_config.replay.target_fc_dialect,
|
||||||
auto_sync=base_config.replay.auto_sync,
|
auto_sync=base_config.replay.auto_sync,
|
||||||
)
|
)
|
||||||
@@ -292,6 +309,13 @@ def main(
|
|||||||
except ReplayCliError as exc:
|
except ReplayCliError as exc:
|
||||||
print(f"gps-denied-replay: {exc}", file=sys.stderr, flush=True)
|
print(f"gps-denied-replay: {exc}", file=sys.stderr, flush=True)
|
||||||
return EXIT_GENERIC_FAILURE
|
return EXIT_GENERIC_FAILURE
|
||||||
|
except ConfigError as exc:
|
||||||
|
# ReplayConfig.__post_init__ rejects illegal combinations
|
||||||
|
# (e.g. --skip-auto-sync without --time-offset-ms; AZ-611).
|
||||||
|
# Surface as a clean operator-facing error rather than the
|
||||||
|
# generic-failure stack trace.
|
||||||
|
print(f"gps-denied-replay: {exc}", file=sys.stderr, flush=True)
|
||||||
|
return EXIT_GENERIC_FAILURE
|
||||||
except ReplayInputAdapterError as exc:
|
except ReplayInputAdapterError as exc:
|
||||||
# AC-8 hard-fail: auto-sync detected an offset that violates the
|
# AC-8 hard-fail: auto-sync detected an offset that violates the
|
||||||
# match-window threshold, or the tlog is missing required fields.
|
# match-window threshold, or the tlog is missing required fields.
|
||||||
|
|||||||
@@ -351,7 +351,17 @@ class ReplayConfig:
|
|||||||
:class:`WallClock`.
|
:class:`WallClock`.
|
||||||
time_offset_ms: Manual override for the video-vs-tlog offset.
|
time_offset_ms: Manual override for the video-vs-tlog offset.
|
||||||
``None`` means "run AZ-405 auto-sync"; an integer value
|
``None`` means "run AZ-405 auto-sync"; an integer value
|
||||||
bypasses auto-sync entirely.
|
bypasses auto-sync DETECTION but the AC-9 frame-window
|
||||||
|
validator still runs on the resolved offset.
|
||||||
|
skip_auto_sync_validation: When ``True``, the AC-9 frame-window
|
||||||
|
validator is ALSO bypassed. AZ-611 — intended for fixtures
|
||||||
|
where neither the IMU take-off detector nor the video
|
||||||
|
motion-onset detector can produce a reliable signal
|
||||||
|
(mid-flight clips with no take-off spike, stationary
|
||||||
|
still-image scenarios). Refusable design: only legal in
|
||||||
|
combination with a non-``None`` ``time_offset_ms`` so the
|
||||||
|
offset value is operator-acknowledged rather than the
|
||||||
|
silent-zero default.
|
||||||
target_fc_dialect: One of :data:`KNOWN_FC_DIALECTS`; controls
|
target_fc_dialect: One of :data:`KNOWN_FC_DIALECTS`; controls
|
||||||
which pymavlink dialect the :class:`TlogReplayFcAdapter`
|
which pymavlink dialect the :class:`TlogReplayFcAdapter`
|
||||||
decodes.
|
decodes.
|
||||||
@@ -364,6 +374,7 @@ class ReplayConfig:
|
|||||||
output_path: str = "/tmp/replay.jsonl"
|
output_path: str = "/tmp/replay.jsonl"
|
||||||
pace: str = "asap"
|
pace: str = "asap"
|
||||||
time_offset_ms: int | None = None
|
time_offset_ms: int | None = None
|
||||||
|
skip_auto_sync_validation: bool = False
|
||||||
target_fc_dialect: str = "ardupilot_plane"
|
target_fc_dialect: str = "ardupilot_plane"
|
||||||
auto_sync: ReplayAutoSyncConfig = field(default_factory=ReplayAutoSyncConfig)
|
auto_sync: ReplayAutoSyncConfig = field(default_factory=ReplayAutoSyncConfig)
|
||||||
|
|
||||||
@@ -385,6 +396,23 @@ class ReplayConfig:
|
|||||||
"ReplayConfig.time_offset_ms must be int or None; "
|
"ReplayConfig.time_offset_ms must be int or None; "
|
||||||
f"got {type(self.time_offset_ms).__name__}"
|
f"got {type(self.time_offset_ms).__name__}"
|
||||||
)
|
)
|
||||||
|
if not isinstance(self.skip_auto_sync_validation, bool):
|
||||||
|
raise ConfigError(
|
||||||
|
"ReplayConfig.skip_auto_sync_validation must be a bool; "
|
||||||
|
f"got {type(self.skip_auto_sync_validation).__name__}"
|
||||||
|
)
|
||||||
|
if self.skip_auto_sync_validation and self.time_offset_ms is None:
|
||||||
|
# Skipping validation without a manual offset would mean
|
||||||
|
# the offset comes from auto-sync detection AND the
|
||||||
|
# validator that would catch a bad detection is disabled
|
||||||
|
# — a silent-zero bug magnet. Force the operator to
|
||||||
|
# commit to a value.
|
||||||
|
raise ConfigError(
|
||||||
|
"ReplayConfig.skip_auto_sync_validation=True requires "
|
||||||
|
"ReplayConfig.time_offset_ms to be set (manual offset "
|
||||||
|
"required so the bypass cannot mask a silent-zero "
|
||||||
|
"auto-sync result)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# Documented defaults for cross-cutting blocks ONLY. Per-component defaults
|
# Documented defaults for cross-cutting blocks ONLY. Per-component defaults
|
||||||
|
|||||||
@@ -105,7 +105,16 @@ class ReplayInputAdapter:
|
|||||||
the coordinator's own structured-event mirror.
|
the coordinator's own structured-event mirror.
|
||||||
- ``pace`` — :class:`ReplayPace` (``ASAP`` or ``REALTIME``).
|
- ``pace`` — :class:`ReplayPace` (``ASAP`` or ``REALTIME``).
|
||||||
- ``manual_time_offset_ms`` — ``None`` triggers auto-sync; an
|
- ``manual_time_offset_ms`` — ``None`` triggers auto-sync; an
|
||||||
integer bypasses auto-sync entirely (AC-8).
|
integer bypasses auto-sync DETECTION but the AC-9 frame-window
|
||||||
|
validator still runs on the resolved offset (AC-8).
|
||||||
|
- ``skip_auto_sync_validation`` — when ``True``, ALSO skip the
|
||||||
|
AC-9 validator. Only legal in combination with a non-``None``
|
||||||
|
``manual_time_offset_ms`` (the coordinator refuses both-None
|
||||||
|
to avoid silent-zero offset bugs). Intended for fixtures where
|
||||||
|
neither the IMU take-off detector nor the video motion-onset
|
||||||
|
detector can produce a reliable signal (mid-flight clips,
|
||||||
|
stationary still-image scenarios — see AZ-611). Default
|
||||||
|
``False``.
|
||||||
- ``auto_sync_config`` — :class:`AutoSyncConfig` thresholds.
|
- ``auto_sync_config`` — :class:`AutoSyncConfig` thresholds.
|
||||||
|
|
||||||
Behaviour:
|
Behaviour:
|
||||||
@@ -127,6 +136,7 @@ class ReplayInputAdapter:
|
|||||||
"_fdr_client",
|
"_fdr_client",
|
||||||
"_pace",
|
"_pace",
|
||||||
"_manual_time_offset_ms",
|
"_manual_time_offset_ms",
|
||||||
|
"_skip_auto_sync_validation",
|
||||||
"_auto_sync_config",
|
"_auto_sync_config",
|
||||||
"_tlog_source_factory",
|
"_tlog_source_factory",
|
||||||
"_video_frames_factory",
|
"_video_frames_factory",
|
||||||
@@ -150,6 +160,7 @@ class ReplayInputAdapter:
|
|||||||
pace: ReplayPace,
|
pace: ReplayPace,
|
||||||
manual_time_offset_ms: int | None,
|
manual_time_offset_ms: int | None,
|
||||||
auto_sync_config: AutoSyncConfig,
|
auto_sync_config: AutoSyncConfig,
|
||||||
|
skip_auto_sync_validation: bool = False,
|
||||||
tlog_source_factory: Any | None = None,
|
tlog_source_factory: Any | None = None,
|
||||||
video_frames_factory: Any | None = None,
|
video_frames_factory: Any | None = None,
|
||||||
video_timestamps_factory: Any | None = None,
|
video_timestamps_factory: Any | None = None,
|
||||||
@@ -172,6 +183,22 @@ class ReplayInputAdapter:
|
|||||||
raise ReplayInputAdapterError(
|
raise ReplayInputAdapterError(
|
||||||
f"pace must be a ReplayPace enum; got {type(pace).__name__}"
|
f"pace must be a ReplayPace enum; got {type(pace).__name__}"
|
||||||
)
|
)
|
||||||
|
if not isinstance(skip_auto_sync_validation, bool):
|
||||||
|
raise ReplayInputAdapterError(
|
||||||
|
"skip_auto_sync_validation must be a bool; got "
|
||||||
|
f"{type(skip_auto_sync_validation).__name__}"
|
||||||
|
)
|
||||||
|
if skip_auto_sync_validation and manual_time_offset_ms is None:
|
||||||
|
# Mirror the ReplayConfig.__post_init__ gate. Without a
|
||||||
|
# manual offset there is no operator-acknowledged value
|
||||||
|
# to skip validation against — auto-sync would compute
|
||||||
|
# an offset of unknown quality and the validator that
|
||||||
|
# would catch a bad detection is disabled. Refuse so
|
||||||
|
# this can't silently mask a wrong offset.
|
||||||
|
raise ReplayInputAdapterError(
|
||||||
|
"skip_auto_sync_validation=True requires "
|
||||||
|
"manual_time_offset_ms to be set"
|
||||||
|
)
|
||||||
self._video_path = video_path
|
self._video_path = video_path
|
||||||
self._tlog_path = tlog_path
|
self._tlog_path = tlog_path
|
||||||
self._camera_calibration = camera_calibration
|
self._camera_calibration = camera_calibration
|
||||||
@@ -180,6 +207,7 @@ class ReplayInputAdapter:
|
|||||||
self._fdr_client = fdr_client
|
self._fdr_client = fdr_client
|
||||||
self._pace = pace
|
self._pace = pace
|
||||||
self._manual_time_offset_ms = manual_time_offset_ms
|
self._manual_time_offset_ms = manual_time_offset_ms
|
||||||
|
self._skip_auto_sync_validation = skip_auto_sync_validation
|
||||||
self._auto_sync_config = auto_sync_config
|
self._auto_sync_config = auto_sync_config
|
||||||
self._tlog_source_factory = tlog_source_factory
|
self._tlog_source_factory = tlog_source_factory
|
||||||
self._video_frames_factory = video_frames_factory
|
self._video_frames_factory = video_frames_factory
|
||||||
@@ -221,8 +249,26 @@ class ReplayInputAdapter:
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 3 — load video frame timestamps and run AC-9 validator.
|
# Step 3 — load video frame timestamps and run AC-9 validator
|
||||||
|
# unless the operator explicitly opted out via
|
||||||
|
# skip_auto_sync_validation (AZ-611). The opt-out is meant for
|
||||||
|
# mid-flight + stationary fixtures where neither detector can
|
||||||
|
# produce a reliable signal; the constructor already enforced
|
||||||
|
# that the opt-out requires a manual offset.
|
||||||
video_frame_timestamps_ns = self._load_video_timestamps()
|
video_frame_timestamps_ns = self._load_video_timestamps()
|
||||||
|
if self._skip_auto_sync_validation:
|
||||||
|
self._log.info(
|
||||||
|
f"{_LOG_KIND_OPEN_MANUAL}: ac9_validator_skipped "
|
||||||
|
f"(resolved_offset_ms={resolved_offset_ms})",
|
||||||
|
extra={
|
||||||
|
"kind": _LOG_KIND_OPEN_MANUAL,
|
||||||
|
"kv": {
|
||||||
|
"resolved_offset_ms": resolved_offset_ms,
|
||||||
|
"ac9_validator_skipped": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
else:
|
||||||
result_code = validate_offset_or_fail(
|
result_code = validate_offset_or_fail(
|
||||||
resolved_offset_ms,
|
resolved_offset_ms,
|
||||||
tlog_imu_timestamps_ns,
|
tlog_imu_timestamps_ns,
|
||||||
|
|||||||
@@ -225,6 +225,7 @@ def _build_replay_input_bundle(
|
|||||||
fdr_client=fdr_client,
|
fdr_client=fdr_client,
|
||||||
pace=pace,
|
pace=pace,
|
||||||
manual_time_offset_ms=config.replay.time_offset_ms,
|
manual_time_offset_ms=config.replay.time_offset_ms,
|
||||||
|
skip_auto_sync_validation=config.replay.skip_auto_sync_validation,
|
||||||
auto_sync_config=auto_sync,
|
auto_sync_config=auto_sync,
|
||||||
mavlink_transport=mavlink_transport,
|
mavlink_transport=mavlink_transport,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -149,11 +149,22 @@ class ReplayRunResult:
|
|||||||
def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any:
|
def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any:
|
||||||
"""Return a callable that invokes the ``gps-denied-replay`` console-script.
|
"""Return a callable that invokes the ``gps-denied-replay`` console-script.
|
||||||
|
|
||||||
The callable accepts keyword overrides for ``pace`` and
|
The callable accepts keyword overrides for ``pace``,
|
||||||
``time_offset_ms``; everything else is taken from
|
``time_offset_ms``, and ``skip_auto_sync`` (AZ-611); everything
|
||||||
``derkachi_replay_inputs``. Output is written to a fresh path per
|
else is taken from ``derkachi_replay_inputs``. Output is written
|
||||||
invocation so determinism comparisons (AC-5) get two independent
|
to a fresh path per invocation so determinism comparisons (AC-5)
|
||||||
files.
|
get two independent files.
|
||||||
|
|
||||||
|
Derkachi is a mid-flight fixture (no take-off spike) and the only
|
||||||
|
motion the video detector sees in the first 60 s is camera shake
|
||||||
|
and scenery change — neither tlog nor video can produce a
|
||||||
|
reliable auto-sync signal. The synth tlog and the video share
|
||||||
|
the same ``t=0`` anchor by construction (see
|
||||||
|
``_tlog_synth.py``), so the correct offset is exactly ``0``. The
|
||||||
|
fixture defaults reflect that — heavy ACs pass
|
||||||
|
``time_offset_ms=0`` + ``skip_auto_sync=True`` so the run never
|
||||||
|
touches the AC-9 validator that would otherwise reject the
|
||||||
|
fixture's false-positive video motion onset.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
binary = shutil.which("gps-denied-replay")
|
binary = shutil.which("gps-denied-replay")
|
||||||
@@ -169,7 +180,12 @@ def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any:
|
|||||||
|
|
||||||
invocation_count = {"n": 0}
|
invocation_count = {"n": 0}
|
||||||
|
|
||||||
def _run(*, pace: str = "asap", time_offset_ms: int | None = None) -> ReplayRunResult:
|
def _run(
|
||||||
|
*,
|
||||||
|
pace: str = "asap",
|
||||||
|
time_offset_ms: int | None = 0,
|
||||||
|
skip_auto_sync: bool = True,
|
||||||
|
) -> ReplayRunResult:
|
||||||
import time
|
import time
|
||||||
|
|
||||||
invocation_count["n"] += 1
|
invocation_count["n"] += 1
|
||||||
@@ -195,6 +211,8 @@ def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any:
|
|||||||
]
|
]
|
||||||
if time_offset_ms is not None:
|
if time_offset_ms is not None:
|
||||||
argv.extend(["--time-offset-ms", str(time_offset_ms)])
|
argv.extend(["--time-offset-ms", str(time_offset_ms)])
|
||||||
|
if skip_auto_sync:
|
||||||
|
argv.append("--skip-auto-sync")
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
completed = subprocess.run(
|
completed = subprocess.run(
|
||||||
argv,
|
argv,
|
||||||
|
|||||||
@@ -610,6 +610,81 @@ def test_ac7_ac8_validator_hard_fail_raises_on_open(
|
|||||||
adapter.open()
|
adapter.open()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------
|
||||||
|
# AZ-611 — skip_auto_sync_validation bypasses the AC-9 validator
|
||||||
|
|
||||||
|
|
||||||
|
def test_az611_skip_auto_sync_validation_bypasses_ac9(
|
||||||
|
synthetic_video: Path,
|
||||||
|
synthetic_tlog_path: Path,
|
||||||
|
camera_calibration: CameraCalibration,
|
||||||
|
fake_wgs_converter: mock.MagicMock,
|
||||||
|
fake_fdr_client: mock.MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""A manual offset that WOULD hard-fail AC-9 succeeds when the
|
||||||
|
operator explicitly opts out via ``skip_auto_sync_validation=True``.
|
||||||
|
Mirrors the AC-7 hard-fail scenario above so the bypass is the
|
||||||
|
only variable.
|
||||||
|
"""
|
||||||
|
# Arrange — same manual offset (60 s) that AC-7 above proves
|
||||||
|
# pushes every frame outside the IMU window.
|
||||||
|
messages = _build_takeoff_messages()
|
||||||
|
adapter = ReplayInputAdapter(
|
||||||
|
video_path=synthetic_video,
|
||||||
|
tlog_path=synthetic_tlog_path,
|
||||||
|
camera_calibration=camera_calibration,
|
||||||
|
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||||||
|
wgs_converter=fake_wgs_converter,
|
||||||
|
fdr_client=fake_fdr_client,
|
||||||
|
pace=ReplayPace.ASAP,
|
||||||
|
manual_time_offset_ms=60_000,
|
||||||
|
skip_auto_sync_validation=True,
|
||||||
|
auto_sync_config=AutoSyncConfig(),
|
||||||
|
tlog_source_factory=_factory_for(messages),
|
||||||
|
video_timestamps_factory=_video_timestamps_factory(),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Act
|
||||||
|
try:
|
||||||
|
bundle = adapter.open()
|
||||||
|
|
||||||
|
# Assert — the bypass let the open() complete with the manual
|
||||||
|
# offset intact, even though the validator would have rejected it.
|
||||||
|
assert bundle.resolved_time_offset_ms == 60_000
|
||||||
|
assert bundle.auto_sync_result is None
|
||||||
|
finally:
|
||||||
|
adapter.close()
|
||||||
|
|
||||||
|
|
||||||
|
def test_az611_skip_auto_sync_validation_requires_manual_offset(
|
||||||
|
synthetic_video: Path,
|
||||||
|
synthetic_tlog_path: Path,
|
||||||
|
camera_calibration: CameraCalibration,
|
||||||
|
fake_wgs_converter: mock.MagicMock,
|
||||||
|
fake_fdr_client: mock.MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Constructor refuses ``skip_auto_sync_validation=True`` paired
|
||||||
|
with ``manual_time_offset_ms=None`` (silent-zero guard).
|
||||||
|
"""
|
||||||
|
# Act / Assert
|
||||||
|
with pytest.raises(
|
||||||
|
ReplayInputAdapterError,
|
||||||
|
match=r"skip_auto_sync_validation=True requires.*manual_time_offset_ms",
|
||||||
|
):
|
||||||
|
ReplayInputAdapter(
|
||||||
|
video_path=synthetic_video,
|
||||||
|
tlog_path=synthetic_tlog_path,
|
||||||
|
camera_calibration=camera_calibration,
|
||||||
|
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||||||
|
wgs_converter=fake_wgs_converter,
|
||||||
|
fdr_client=fake_fdr_client,
|
||||||
|
pace=ReplayPace.ASAP,
|
||||||
|
manual_time_offset_ms=None,
|
||||||
|
skip_auto_sync_validation=True,
|
||||||
|
auto_sync_config=AutoSyncConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------
|
# ---------------------------------------------------------------------
|
||||||
# AC-6 — low combined confidence WARN-and-proceed
|
# AC-6 — low combined confidence WARN-and-proceed
|
||||||
|
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import (
|
|||||||
)
|
)
|
||||||
from gps_denied_onboard.config import (
|
from gps_denied_onboard.config import (
|
||||||
Config,
|
Config,
|
||||||
|
ConfigError,
|
||||||
ReplayAutoSyncConfig,
|
ReplayAutoSyncConfig,
|
||||||
ReplayConfig,
|
ReplayConfig,
|
||||||
RuntimeConfig,
|
RuntimeConfig,
|
||||||
@@ -733,3 +734,58 @@ def test_compose_root_replay_with_no_calib_path_raises(
|
|||||||
match=r"(camera_calibration_path|CAMERA_CALIBRATION_PATH)",
|
match=r"(camera_calibration_path|CAMERA_CALIBRATION_PATH)",
|
||||||
):
|
):
|
||||||
compose_root(config)
|
compose_root(config)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AZ-611 — ReplayConfig.skip_auto_sync_validation schema gate
|
||||||
|
|
||||||
|
|
||||||
|
def test_az611_skip_auto_sync_without_manual_offset_rejected_at_init() -> None:
|
||||||
|
"""``__post_init__`` refuses ``skip_auto_sync_validation=True`` paired
|
||||||
|
with ``time_offset_ms=None`` — the bypass is only legal once the
|
||||||
|
operator has committed to an explicit manual offset.
|
||||||
|
"""
|
||||||
|
# Act / Assert
|
||||||
|
with pytest.raises(
|
||||||
|
ConfigError,
|
||||||
|
match=r"skip_auto_sync_validation=True requires.*time_offset_ms",
|
||||||
|
):
|
||||||
|
ReplayConfig(
|
||||||
|
video_path="/dev/null/fake.mp4",
|
||||||
|
tlog_path="/dev/null/fake.tlog",
|
||||||
|
output_path="/tmp/replay.jsonl",
|
||||||
|
pace="asap",
|
||||||
|
time_offset_ms=None,
|
||||||
|
skip_auto_sync_validation=True,
|
||||||
|
target_fc_dialect="ardupilot_plane",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_az611_skip_auto_sync_with_manual_offset_accepted() -> None:
|
||||||
|
"""The legal combination — ``skip_auto_sync_validation=True`` with an
|
||||||
|
explicit ``time_offset_ms`` — constructs cleanly and round-trips
|
||||||
|
both flags onto the frozen dataclass.
|
||||||
|
"""
|
||||||
|
# Act
|
||||||
|
cfg = ReplayConfig(
|
||||||
|
video_path="/dev/null/fake.mp4",
|
||||||
|
tlog_path="/dev/null/fake.tlog",
|
||||||
|
output_path="/tmp/replay.jsonl",
|
||||||
|
pace="asap",
|
||||||
|
time_offset_ms=0,
|
||||||
|
skip_auto_sync_validation=True,
|
||||||
|
target_fc_dialect="ardupilot_plane",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert cfg.skip_auto_sync_validation is True
|
||||||
|
assert cfg.time_offset_ms == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_az611_skip_auto_sync_defaults_to_false() -> None:
|
||||||
|
"""Default-constructed ReplayConfig must not opt out of validation."""
|
||||||
|
# Act
|
||||||
|
cfg = ReplayConfig()
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert cfg.skip_auto_sync_validation is False
|
||||||
|
|||||||
Reference in New Issue
Block a user