mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 16:01:14 +00:00
[AZ-398] Replay: FrameSource + Clock Protocols + Clock injection
Ship the two Layer-1 cross-cutting Protocols replay mode needs to leave production C1-C5 components mode-agnostic (Invariant 1) and replay- deterministic (Invariant 2). Live + replay binaries see the same interfaces; only the strategy differs. * Clock Protocol (monotonic_ns / time_ns / sleep_until_ns) + WallClock (live + REALTIME replay) + TlogDerivedClock (ASAP replay; advance-on-call; non-monotonic source → ClockOrderingError). * FrameSource Protocol (next_frame -> NavCameraFrame | None / close) + LiveCameraFrameSource (cv2.VideoCapture device index) + VideoFileFrameSource (cv2.VideoCapture file). * Build-flag gating: BUILD_VIDEO_FILE_FRAME_SOURCE, BUILD_LIVE_CAMERA_FRAME_SOURCE (constructor-time check; Tier-0 OFF refuses construction with FrameSourceConfigError). * Composition-root factories: build_clock + build_frame_source. * Injected Clock across every component that previously called time.monotonic_ns() / time.sleep() directly: c5_state (estimator, ESKF, fallback watcher, source-label SM, isam2 handle), c8_fc_adapter (inbound MAVLink + MSP2, AP outbound, iNav outbound, QGC GCS), c13_fdr writer, c12_operator_tooling httpx flights client. All constructors default to WallClock() so existing call sites keep live-binary behaviour without a wiring change. * AC-4 CI guard (tests/_meta/test_no_direct_time_in_components.py) AST-scans components/**/*.py for direct time.monotonic_ns / time.time_ns / time.sleep references and fails loudly with file:line. * Conformance + factory tests: tests/unit/clock + tests/unit/frame_source. * Test fixture updates: FallbackWatcher / SourceLabelStateMachine clock_ns is now required (removed time.monotonic_ns default); test_az388 patches estimator._clock instead of a module-level time; test_az393 ardupilot adapter uses a _FixedClock test double. Excluded per the task spec: TlogReplayFcAdapter (AZ-399), ReplaySink (AZ-400), compose_replay (AZ-401), CLI (AZ-402), Docker/CI (AZ-403), E2E fixture (AZ-404), IMU auto-sync (AZ-405). Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,105 @@
|
||||
"""AC-4 — components MUST NOT call ``time.monotonic_ns`` / ``time.time_ns`` / ``time.sleep``.
|
||||
|
||||
Enforces Invariant 2 of the replay contract
|
||||
(``_docs/02_document/contracts/replay/replay_protocol.md``): every
|
||||
time-driven code path in a C* component consumes an injected
|
||||
:class:`Clock` instead. Replay determinism (R-DEMO-4) collapses the
|
||||
moment a component reaches into the stdlib ``time`` module directly,
|
||||
so this guard runs on every PR touching ``src/gps_denied_onboard/components/``.
|
||||
|
||||
The scan is AST-based — docstrings and comments mentioning the forbidden
|
||||
APIs do NOT trip it; only call sites and attribute references do.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
_FORBIDDEN_ATTRS: frozenset[str] = frozenset(
|
||||
{"monotonic_ns", "time_ns", "sleep"}
|
||||
)
|
||||
|
||||
_COMPONENTS_ROOT: Path = (
|
||||
Path(__file__).parent.parent.parent
|
||||
/ "src"
|
||||
/ "gps_denied_onboard"
|
||||
/ "components"
|
||||
)
|
||||
|
||||
|
||||
def _python_files_under(root: Path) -> list[Path]:
|
||||
return sorted(p for p in root.rglob("*.py") if p.is_file())
|
||||
|
||||
|
||||
def _find_direct_time_references(source: str) -> list[tuple[int, str]]:
|
||||
"""Return ``(lineno, attribute_name)`` for every direct ``time.X`` ref.
|
||||
|
||||
Only flags ``ast.Attribute(value=ast.Name(id='time'), attr=<x>)``
|
||||
where ``<x>`` is one of the forbidden names. Aliased imports
|
||||
(``import time as t`` → ``t.monotonic_ns()``) are intentionally NOT
|
||||
caught — the component code convention is to avoid such aliases, and
|
||||
catching them would require flow-sensitive analysis.
|
||||
"""
|
||||
hits: list[tuple[int, str]] = []
|
||||
tree = ast.parse(source)
|
||||
for node in ast.walk(tree):
|
||||
if not isinstance(node, ast.Attribute):
|
||||
continue
|
||||
if not isinstance(node.value, ast.Name):
|
||||
continue
|
||||
if node.value.id != "time":
|
||||
continue
|
||||
if node.attr in _FORBIDDEN_ATTRS:
|
||||
hits.append((node.lineno, f"time.{node.attr}"))
|
||||
return hits
|
||||
|
||||
|
||||
def test_components_have_no_direct_time_references() -> None:
|
||||
# Arrange
|
||||
files = _python_files_under(_COMPONENTS_ROOT)
|
||||
assert files, f"AST scan found no .py files under {_COMPONENTS_ROOT}"
|
||||
offences: list[str] = []
|
||||
# Act
|
||||
for file in files:
|
||||
source = file.read_text(encoding="utf-8")
|
||||
for lineno, attr in _find_direct_time_references(source):
|
||||
rel = file.relative_to(_COMPONENTS_ROOT.parent.parent.parent.parent)
|
||||
offences.append(f"{rel}:{lineno} — {attr}")
|
||||
# Assert
|
||||
assert not offences, (
|
||||
"Invariant 2 violation: direct stdlib-`time` references found in "
|
||||
"`src/gps_denied_onboard/components/**/*.py`. Consume an injected "
|
||||
"`Clock` (`gps_denied_onboard.clock`) instead.\n"
|
||||
+ "\n".join(offences)
|
||||
)
|
||||
|
||||
|
||||
def test_scan_helper_detects_known_forbidden_pattern() -> None:
|
||||
# Arrange — self-check the AST helper so a stale scan can't silently pass.
|
||||
source = "import time\ndef f() -> int:\n return time.monotonic_ns()\n"
|
||||
# Act
|
||||
hits = _find_direct_time_references(source)
|
||||
# Assert
|
||||
assert hits == [(3, "time.monotonic_ns")]
|
||||
|
||||
|
||||
def test_scan_helper_ignores_docstring_mentions() -> None:
|
||||
# Arrange — docstrings naming the forbidden API must not trip the scan.
|
||||
source = '"""This module talks about time.monotonic_ns in prose only."""\n'
|
||||
# Act
|
||||
hits = _find_direct_time_references(source)
|
||||
# Assert
|
||||
assert hits == []
|
||||
|
||||
|
||||
@pytest.mark.parametrize("forbidden", sorted(_FORBIDDEN_ATTRS))
|
||||
def test_scan_helper_detects_each_forbidden_attr(forbidden: str) -> None:
|
||||
# Arrange
|
||||
source = f"import time\ntime.{forbidden}()\n"
|
||||
# Act
|
||||
hits = _find_direct_time_references(source)
|
||||
# Assert
|
||||
assert hits == [(2, f"time.{forbidden}")]
|
||||
@@ -213,7 +213,7 @@ def test_ac6_custom_threshold_5s_engages_at_5s() -> None:
|
||||
|
||||
def test_ac6_zero_threshold_rejected() -> None:
|
||||
with pytest.raises(ValueError, match="threshold_s must be > 0"):
|
||||
FallbackWatcher(threshold_s=0.0, fdr_client=None)
|
||||
FallbackWatcher(threshold_s=0.0, fdr_client=None, clock_ns=lambda: 0)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
@@ -378,17 +378,14 @@ def test_ac7_isam2_current_estimate_entry_engages_after_threshold() -> None:
|
||||
# and call current_estimate WITHOUT a seeded prior so it raises
|
||||
# EstimatorFatalError after the entry hook engages fallback.
|
||||
estimator._fallback._last_successful_estimate_ns = 0
|
||||
# Patch monotonic_ns inside the estimator module so the entry
|
||||
# hook sees the synthesised "now".
|
||||
# Patch the estimator's injected Clock so the entry hook sees the
|
||||
# synthesised "now" (AZ-398: components consume an injected
|
||||
# :class:`Clock`, not :func:`time.monotonic_ns`).
|
||||
from gps_denied_onboard.components.c5_state.errors import EstimatorFatalError
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"gps_denied_onboard.components.c5_state.gtsam_isam2_estimator.time.monotonic_ns",
|
||||
return_value=int(4.0 * 1e9),
|
||||
),
|
||||
pytest.raises(EstimatorFatalError),
|
||||
):
|
||||
estimator._clock = mock.MagicMock()
|
||||
estimator._clock.monotonic_ns.return_value = int(4.0 * 1e9)
|
||||
with pytest.raises(EstimatorFatalError):
|
||||
estimator.current_estimate()
|
||||
|
||||
assert len(engaged_seen) == 1
|
||||
|
||||
@@ -70,6 +70,22 @@ class _ConnStub:
|
||||
self.closed = True
|
||||
|
||||
|
||||
class _FixedClock:
|
||||
"""Test :class:`Clock` stand-in returning constant ``monotonic_ns``."""
|
||||
|
||||
def __init__(self, ns: int) -> None:
|
||||
self._ns = ns
|
||||
|
||||
def monotonic_ns(self) -> int:
|
||||
return self._ns
|
||||
|
||||
def time_ns(self) -> int:
|
||||
return self._ns
|
||||
|
||||
def sleep_until_ns(self, target_ns: int) -> None:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def conn() -> _ConnStub:
|
||||
return _ConnStub()
|
||||
@@ -85,7 +101,7 @@ def adapter(conn: _ConnStub, tmp_path) -> PymavlinkArdupilotAdapter:
|
||||
wgs_converter=mock.MagicMock(),
|
||||
covariance_projector=cov,
|
||||
fdr_client=fdr,
|
||||
clock=lambda: 1.0,
|
||||
clock=_FixedClock(1_000_000_000),
|
||||
flight_id="flt-test",
|
||||
connect_factory=lambda device, baud: conn,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,180 @@
|
||||
"""AZ-398 — :class:`Clock` Protocol conformance + WallClock parity + TlogDerivedClock semantics."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from gps_denied_onboard.clock import Clock
|
||||
from gps_denied_onboard.clock.tlog_derived import (
|
||||
ClockOrderingError,
|
||||
TlogDerivedClock,
|
||||
)
|
||||
from gps_denied_onboard.clock.wall_clock import WallClock
|
||||
from gps_denied_onboard.runtime_root.clock_factory import build_clock
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-1 — Protocol conformance.
|
||||
|
||||
|
||||
def test_wall_clock_satisfies_clock_protocol() -> None:
|
||||
# Assert
|
||||
assert isinstance(WallClock(), Clock)
|
||||
|
||||
|
||||
def test_tlog_derived_clock_satisfies_clock_protocol() -> None:
|
||||
# Assert
|
||||
assert isinstance(TlogDerivedClock([1, 2, 3]), Clock)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-5 — WallClock parity with :mod:`time`.
|
||||
|
||||
|
||||
def test_wall_clock_monotonic_ns_tracks_stdlib() -> None:
|
||||
# Arrange
|
||||
clock = WallClock()
|
||||
# Act
|
||||
stdlib_before = time.monotonic_ns()
|
||||
clock_now = clock.monotonic_ns()
|
||||
stdlib_after = time.monotonic_ns()
|
||||
# Assert
|
||||
assert stdlib_before <= clock_now <= stdlib_after
|
||||
|
||||
|
||||
def test_wall_clock_time_ns_tracks_stdlib_within_1ms() -> None:
|
||||
# Arrange
|
||||
clock = WallClock()
|
||||
# Act
|
||||
stdlib = time.time_ns()
|
||||
clock_now = clock.time_ns()
|
||||
# Assert
|
||||
assert abs(clock_now - stdlib) <= 1_000_000
|
||||
|
||||
|
||||
def test_wall_clock_sleep_until_ns_blocks_for_about_100ms() -> None:
|
||||
# Arrange
|
||||
clock = WallClock()
|
||||
# Act
|
||||
start = time.monotonic_ns()
|
||||
target = start + 100_000_000 # 100 ms in the future
|
||||
clock.sleep_until_ns(target)
|
||||
elapsed_ns = time.monotonic_ns() - start
|
||||
# Assert — AC-5 allows ±5 ms slack on a 100 ms sleep
|
||||
assert 95_000_000 <= elapsed_ns <= 200_000_000
|
||||
|
||||
|
||||
def test_wall_clock_sleep_until_past_target_is_noop() -> None:
|
||||
# Arrange
|
||||
clock = WallClock()
|
||||
past = clock.monotonic_ns() - 10_000_000_000 # 10 s ago
|
||||
# Act
|
||||
start = time.monotonic_ns()
|
||||
clock.sleep_until_ns(past)
|
||||
elapsed_ns = time.monotonic_ns() - start
|
||||
# Assert — should return almost immediately (no negative sleep)
|
||||
assert elapsed_ns < 5_000_000 # < 5 ms
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-6 — TlogDerivedClock advance-on-call semantics.
|
||||
|
||||
|
||||
def test_tlog_derived_clock_advances_per_call() -> None:
|
||||
# Arrange
|
||||
clock = TlogDerivedClock([1_000_000, 2_000_000, 3_000_000])
|
||||
# Act
|
||||
a = clock.monotonic_ns()
|
||||
b = clock.monotonic_ns()
|
||||
c = clock.monotonic_ns()
|
||||
# Assert
|
||||
assert (a, b, c) == (1_000_000, 2_000_000, 3_000_000)
|
||||
|
||||
|
||||
def test_tlog_derived_clock_time_ns_reflects_last_advance() -> None:
|
||||
# Arrange
|
||||
clock = TlogDerivedClock([42])
|
||||
# Act
|
||||
before = clock.time_ns()
|
||||
clock.monotonic_ns()
|
||||
after = clock.time_ns()
|
||||
# Assert
|
||||
assert (before, after) == (0, 42)
|
||||
|
||||
|
||||
def test_tlog_derived_clock_raises_on_non_monotonic_source() -> None:
|
||||
# Arrange
|
||||
clock = TlogDerivedClock([10, 5])
|
||||
clock.monotonic_ns()
|
||||
# Act + Assert
|
||||
with pytest.raises(ClockOrderingError):
|
||||
clock.monotonic_ns()
|
||||
|
||||
|
||||
def test_tlog_derived_clock_sleep_until_ns_is_noop() -> None:
|
||||
# Arrange
|
||||
clock = TlogDerivedClock([1])
|
||||
start = time.monotonic_ns()
|
||||
# Act
|
||||
clock.sleep_until_ns(10**18) # absurdly far in the future
|
||||
elapsed_ns = time.monotonic_ns() - start
|
||||
# Assert
|
||||
assert elapsed_ns < 5_000_000 # < 5 ms
|
||||
|
||||
|
||||
def test_tlog_derived_clock_accepts_callable_source() -> None:
|
||||
# Arrange
|
||||
counter = {"i": 0}
|
||||
|
||||
def source() -> int:
|
||||
counter["i"] += 1
|
||||
return counter["i"] * 1_000_000
|
||||
|
||||
clock = TlogDerivedClock(source)
|
||||
# Act
|
||||
a = clock.monotonic_ns()
|
||||
b = clock.monotonic_ns()
|
||||
# Assert
|
||||
assert (a, b) == (1_000_000, 2_000_000)
|
||||
|
||||
|
||||
def test_tlog_derived_clock_returns_last_value_when_source_exhausted() -> None:
|
||||
# Arrange
|
||||
clock = TlogDerivedClock([5])
|
||||
# Act
|
||||
first = clock.monotonic_ns()
|
||||
second = clock.monotonic_ns()
|
||||
# Assert — exhausted source returns the latched value, not an error
|
||||
assert (first, second) == (5, 5)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Composition-root factory (build_clock).
|
||||
|
||||
|
||||
def test_build_clock_wall_returns_wall_clock() -> None:
|
||||
# Assert
|
||||
assert isinstance(build_clock(kind="wall"), WallClock)
|
||||
|
||||
|
||||
def test_build_clock_tlog_returns_tlog_derived_clock() -> None:
|
||||
# Assert
|
||||
assert isinstance(build_clock(kind="tlog", source=[1, 2]), TlogDerivedClock)
|
||||
|
||||
|
||||
def test_build_clock_rejects_unknown_kind() -> None:
|
||||
# Act + Assert
|
||||
with pytest.raises(ValueError, match="unknown kind"):
|
||||
build_clock(kind="invalid") # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_build_clock_wall_rejects_source() -> None:
|
||||
with pytest.raises(ValueError, match="source must be None"):
|
||||
build_clock(kind="wall", source=[1])
|
||||
|
||||
|
||||
def test_build_clock_tlog_requires_source() -> None:
|
||||
with pytest.raises(ValueError, match="source is required"):
|
||||
build_clock(kind="tlog")
|
||||
@@ -0,0 +1,303 @@
|
||||
"""AZ-398 — :class:`FrameSource` Protocol conformance + concrete strategy ACs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from gps_denied_onboard.clock.wall_clock import WallClock
|
||||
from gps_denied_onboard.frame_source import (
|
||||
FrameSource,
|
||||
FrameSourceConfigError,
|
||||
)
|
||||
from gps_denied_onboard.frame_source.live_camera import LiveCameraFrameSource
|
||||
from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers.
|
||||
|
||||
|
||||
def _make_synthetic_video(path: Path, n_frames: int, fps: int = 30) -> None:
|
||||
"""Write an ``n_frames``-frame 64×48 BGR MP4V at ``path``."""
|
||||
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
||||
writer = cv2.VideoWriter(str(path), fourcc, fps, (64, 48))
|
||||
if not writer.isOpened():
|
||||
raise RuntimeError(f"OpenCV could not open writer at {path!s}")
|
||||
try:
|
||||
for i in range(n_frames):
|
||||
frame = np.full((48, 64, 3), i % 256, dtype=np.uint8)
|
||||
writer.write(frame)
|
||||
finally:
|
||||
writer.release()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def video_path_60(tmp_path: Path) -> Path:
|
||||
"""A synthetic 60-frame .mp4 file for AC-2."""
|
||||
path = tmp_path / "az398_synthetic.mp4"
|
||||
_make_synthetic_video(path, n_frames=60)
|
||||
return path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def enable_video_flag(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.setenv("BUILD_VIDEO_FILE_FRAME_SOURCE", "ON")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def disable_video_flag(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.setenv("BUILD_VIDEO_FILE_FRAME_SOURCE", "OFF")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def disable_live_flag(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.setenv("BUILD_LIVE_CAMERA_FRAME_SOURCE", "OFF")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-1 — Protocol conformance.
|
||||
|
||||
|
||||
def test_video_file_frame_source_satisfies_frame_source_protocol(
|
||||
enable_video_flag: None, video_path_60: Path
|
||||
) -> None:
|
||||
# Arrange + Act
|
||||
source = VideoFileFrameSource(
|
||||
path=video_path_60,
|
||||
camera_calibration_id="az398-synth",
|
||||
clock=WallClock(),
|
||||
)
|
||||
try:
|
||||
# Assert
|
||||
assert isinstance(source, FrameSource)
|
||||
finally:
|
||||
source.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-2 — VideoFileFrameSource produces 60 ordered frames + idempotent EOS.
|
||||
|
||||
|
||||
def test_video_file_frame_source_emits_60_frames_then_none(
|
||||
enable_video_flag: None, video_path_60: Path
|
||||
) -> None:
|
||||
# Arrange
|
||||
source = VideoFileFrameSource(
|
||||
path=video_path_60,
|
||||
camera_calibration_id="az398-synth",
|
||||
clock=WallClock(),
|
||||
)
|
||||
monotonics: list[int] = []
|
||||
try:
|
||||
# Act
|
||||
for _ in range(60):
|
||||
frame = source.next_frame()
|
||||
assert frame is not None
|
||||
monotonics.append(frame.metadata["monotonic_ns"])
|
||||
# AC-2: 61st call → None; subsequent calls also None
|
||||
eos_first = source.next_frame()
|
||||
eos_second = source.next_frame()
|
||||
finally:
|
||||
source.close()
|
||||
# Assert
|
||||
assert eos_first is None
|
||||
assert eos_second is None
|
||||
assert len(monotonics) == 60
|
||||
# Non-decreasing monotonic_ns ordering (Invariant 3 / AC-2)
|
||||
assert all(b >= a for a, b in zip(monotonics, monotonics[1:], strict=False))
|
||||
|
||||
|
||||
def test_video_file_frame_source_emits_frame_id_counter_and_metadata(
|
||||
enable_video_flag: None, video_path_60: Path
|
||||
) -> None:
|
||||
# Arrange
|
||||
source = VideoFileFrameSource(
|
||||
path=video_path_60,
|
||||
camera_calibration_id="az398-synth",
|
||||
clock=WallClock(),
|
||||
)
|
||||
try:
|
||||
# Act
|
||||
first = source.next_frame()
|
||||
second = source.next_frame()
|
||||
finally:
|
||||
source.close()
|
||||
# Assert
|
||||
assert first is not None and second is not None
|
||||
assert first.frame_id == 0
|
||||
assert second.frame_id == 1
|
||||
assert first.camera_calibration_id == "az398-synth"
|
||||
assert first.metadata["source"] == "video_file"
|
||||
assert "monotonic_ns" in first.metadata
|
||||
assert "source_pts_ns" in first.metadata
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-7 — corrupt video file raises FrameSourceConfigError on construction.
|
||||
|
||||
|
||||
def test_video_file_frame_source_rejects_corrupt_file(
|
||||
enable_video_flag: None, tmp_path: Path
|
||||
) -> None:
|
||||
# Arrange
|
||||
corrupt = tmp_path / "garbage.mp4"
|
||||
corrupt.write_bytes(b"not actually mp4 content" * 256)
|
||||
# Act + Assert
|
||||
with pytest.raises(FrameSourceConfigError):
|
||||
VideoFileFrameSource(
|
||||
path=corrupt,
|
||||
camera_calibration_id="az398-corrupt",
|
||||
clock=WallClock(),
|
||||
)
|
||||
|
||||
|
||||
def test_video_file_frame_source_rejects_missing_path(
|
||||
enable_video_flag: None, tmp_path: Path
|
||||
) -> None:
|
||||
# Act + Assert
|
||||
with pytest.raises(FrameSourceConfigError, match="does not exist"):
|
||||
VideoFileFrameSource(
|
||||
path=tmp_path / "missing.mp4",
|
||||
camera_calibration_id="az398-missing",
|
||||
clock=WallClock(),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-8 — Build-flag gating.
|
||||
|
||||
|
||||
def test_video_file_frame_source_refuses_when_build_flag_off(
|
||||
disable_video_flag: None, tmp_path: Path
|
||||
) -> None:
|
||||
# Arrange — create a real file so the gate is exercised before path checks
|
||||
valid = tmp_path / "any.mp4"
|
||||
valid.write_bytes(b"")
|
||||
# Act + Assert
|
||||
with pytest.raises(
|
||||
FrameSourceConfigError, match="BUILD_VIDEO_FILE_FRAME_SOURCE is OFF"
|
||||
):
|
||||
VideoFileFrameSource(
|
||||
path=valid,
|
||||
camera_calibration_id="az398-gate",
|
||||
clock=WallClock(),
|
||||
)
|
||||
|
||||
|
||||
def test_live_camera_frame_source_refuses_when_build_flag_off(
|
||||
disable_live_flag: None,
|
||||
) -> None:
|
||||
# Act + Assert
|
||||
with pytest.raises(
|
||||
FrameSourceConfigError, match="BUILD_LIVE_CAMERA_FRAME_SOURCE is OFF"
|
||||
):
|
||||
LiveCameraFrameSource(
|
||||
device_index=0,
|
||||
camera_calibration_id="az398-live-gate",
|
||||
clock=WallClock(),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-9 — Public API re-exports.
|
||||
|
||||
|
||||
def test_frame_source_public_module_only_exposes_protocol_and_errors() -> None:
|
||||
# Arrange
|
||||
from gps_denied_onboard import frame_source as module
|
||||
|
||||
# Assert — concrete strategies MUST NOT appear in __all__ per AC-9
|
||||
assert "FrameSource" in module.__all__
|
||||
assert "FrameSourceError" in module.__all__
|
||||
assert "FrameSourceConfigError" in module.__all__
|
||||
assert "LiveCameraFrameSource" not in module.__all__
|
||||
assert "VideoFileFrameSource" not in module.__all__
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC-10 — close is idempotent.
|
||||
|
||||
|
||||
def test_video_file_frame_source_close_is_idempotent(
|
||||
enable_video_flag: None, video_path_60: Path
|
||||
) -> None:
|
||||
# Arrange
|
||||
source = VideoFileFrameSource(
|
||||
path=video_path_60,
|
||||
camera_calibration_id="az398-synth",
|
||||
clock=WallClock(),
|
||||
)
|
||||
# Act — closing twice must not raise (AC-10)
|
||||
source.close()
|
||||
source.close()
|
||||
# Assert — next_frame after close returns None, not an exception
|
||||
assert source.next_frame() is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Factory.
|
||||
|
||||
|
||||
def test_build_frame_source_video_file_returns_video_file_source(
|
||||
enable_video_flag: None, video_path_60: Path
|
||||
) -> None:
|
||||
from gps_denied_onboard.runtime_root.frame_source_factory import (
|
||||
build_frame_source,
|
||||
)
|
||||
|
||||
source = build_frame_source(
|
||||
kind="video_file",
|
||||
camera_calibration_id="az398-factory",
|
||||
clock=WallClock(),
|
||||
video_path=video_path_60,
|
||||
)
|
||||
try:
|
||||
# Assert
|
||||
assert isinstance(source, VideoFileFrameSource)
|
||||
finally:
|
||||
source.close()
|
||||
|
||||
|
||||
def test_build_frame_source_rejects_unknown_kind() -> None:
|
||||
from gps_denied_onboard.runtime_root.frame_source_factory import (
|
||||
build_frame_source,
|
||||
)
|
||||
|
||||
with pytest.raises(FrameSourceConfigError, match="unknown kind"):
|
||||
build_frame_source(
|
||||
kind="invalid", # type: ignore[arg-type]
|
||||
camera_calibration_id="x",
|
||||
clock=WallClock(),
|
||||
)
|
||||
|
||||
|
||||
def test_build_frame_source_live_rejects_video_path(enable_video_flag: None) -> None:
|
||||
from gps_denied_onboard.runtime_root.frame_source_factory import (
|
||||
build_frame_source,
|
||||
)
|
||||
|
||||
with pytest.raises(FrameSourceConfigError, match="video_path must be None"):
|
||||
build_frame_source(
|
||||
kind="live",
|
||||
camera_calibration_id="x",
|
||||
clock=WallClock(),
|
||||
device_index=0,
|
||||
video_path="/tmp/whatever.mp4",
|
||||
)
|
||||
|
||||
|
||||
def test_build_frame_source_video_file_requires_video_path() -> None:
|
||||
from gps_denied_onboard.runtime_root.frame_source_factory import (
|
||||
build_frame_source,
|
||||
)
|
||||
|
||||
with pytest.raises(FrameSourceConfigError, match="video_path is required"):
|
||||
build_frame_source(
|
||||
kind="video_file",
|
||||
camera_calibration_id="x",
|
||||
clock=WallClock(),
|
||||
)
|
||||
Reference in New Issue
Block a user