Files
gps-denied-onboard/tests/unit/clock/test_protocol_conformance.py
Oleksandr Bezdieniezhnykh 823c0f1b2e [AZ-398] Replay: FrameSource + Clock Protocols + Clock injection
Ship the two Layer-1 cross-cutting Protocols replay mode needs to leave
production C1-C5 components mode-agnostic (Invariant 1) and replay-
deterministic (Invariant 2). Live + replay binaries see the same
interfaces; only the strategy differs.

* Clock Protocol (monotonic_ns / time_ns / sleep_until_ns) +
  WallClock (live + REALTIME replay) + TlogDerivedClock (ASAP replay;
  advance-on-call; non-monotonic source → ClockOrderingError).
* FrameSource Protocol (next_frame -> NavCameraFrame | None / close)
  + LiveCameraFrameSource (cv2.VideoCapture device index) +
  VideoFileFrameSource (cv2.VideoCapture file).
* Build-flag gating: BUILD_VIDEO_FILE_FRAME_SOURCE,
  BUILD_LIVE_CAMERA_FRAME_SOURCE (constructor-time check; Tier-0 OFF
  refuses construction with FrameSourceConfigError).
* Composition-root factories: build_clock + build_frame_source.
* Injected Clock across every component that previously called
  time.monotonic_ns() / time.sleep() directly: c5_state (estimator,
  ESKF, fallback watcher, source-label SM, isam2 handle), c8_fc_adapter
  (inbound MAVLink + MSP2, AP outbound, iNav outbound, QGC GCS),
  c13_fdr writer, c12_operator_tooling httpx flights client. All
  constructors default to WallClock() so existing call sites keep
  live-binary behaviour without a wiring change.
* AC-4 CI guard (tests/_meta/test_no_direct_time_in_components.py)
  AST-scans components/**/*.py for direct time.monotonic_ns /
  time.time_ns / time.sleep references and fails loudly with file:line.
* Conformance + factory tests: tests/unit/clock + tests/unit/frame_source.
* Test fixture updates: FallbackWatcher / SourceLabelStateMachine
  clock_ns is now required (removed time.monotonic_ns default);
  test_az388 patches estimator._clock instead of a module-level time;
  test_az393 ardupilot adapter uses a _FixedClock test double.

Excluded per the task spec: TlogReplayFcAdapter (AZ-399), ReplaySink
(AZ-400), compose_replay (AZ-401), CLI (AZ-402), Docker/CI (AZ-403),
E2E fixture (AZ-404), IMU auto-sync (AZ-405).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 05:10:01 +03:00

181 lines
5.0 KiB
Python

"""AZ-398 — :class:`Clock` Protocol conformance + WallClock parity + TlogDerivedClock semantics."""
from __future__ import annotations
import time
import pytest
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.clock.tlog_derived import (
ClockOrderingError,
TlogDerivedClock,
)
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.runtime_root.clock_factory import build_clock
# ---------------------------------------------------------------------------
# AC-1 — Protocol conformance.
def test_wall_clock_satisfies_clock_protocol() -> None:
# Assert
assert isinstance(WallClock(), Clock)
def test_tlog_derived_clock_satisfies_clock_protocol() -> None:
# Assert
assert isinstance(TlogDerivedClock([1, 2, 3]), Clock)
# ---------------------------------------------------------------------------
# AC-5 — WallClock parity with :mod:`time`.
def test_wall_clock_monotonic_ns_tracks_stdlib() -> None:
# Arrange
clock = WallClock()
# Act
stdlib_before = time.monotonic_ns()
clock_now = clock.monotonic_ns()
stdlib_after = time.monotonic_ns()
# Assert
assert stdlib_before <= clock_now <= stdlib_after
def test_wall_clock_time_ns_tracks_stdlib_within_1ms() -> None:
# Arrange
clock = WallClock()
# Act
stdlib = time.time_ns()
clock_now = clock.time_ns()
# Assert
assert abs(clock_now - stdlib) <= 1_000_000
def test_wall_clock_sleep_until_ns_blocks_for_about_100ms() -> None:
# Arrange
clock = WallClock()
# Act
start = time.monotonic_ns()
target = start + 100_000_000 # 100 ms in the future
clock.sleep_until_ns(target)
elapsed_ns = time.monotonic_ns() - start
# Assert — AC-5 allows ±5 ms slack on a 100 ms sleep
assert 95_000_000 <= elapsed_ns <= 200_000_000
def test_wall_clock_sleep_until_past_target_is_noop() -> None:
# Arrange
clock = WallClock()
past = clock.monotonic_ns() - 10_000_000_000 # 10 s ago
# Act
start = time.monotonic_ns()
clock.sleep_until_ns(past)
elapsed_ns = time.monotonic_ns() - start
# Assert — should return almost immediately (no negative sleep)
assert elapsed_ns < 5_000_000 # < 5 ms
# ---------------------------------------------------------------------------
# AC-6 — TlogDerivedClock advance-on-call semantics.
def test_tlog_derived_clock_advances_per_call() -> None:
# Arrange
clock = TlogDerivedClock([1_000_000, 2_000_000, 3_000_000])
# Act
a = clock.monotonic_ns()
b = clock.monotonic_ns()
c = clock.monotonic_ns()
# Assert
assert (a, b, c) == (1_000_000, 2_000_000, 3_000_000)
def test_tlog_derived_clock_time_ns_reflects_last_advance() -> None:
# Arrange
clock = TlogDerivedClock([42])
# Act
before = clock.time_ns()
clock.monotonic_ns()
after = clock.time_ns()
# Assert
assert (before, after) == (0, 42)
def test_tlog_derived_clock_raises_on_non_monotonic_source() -> None:
# Arrange
clock = TlogDerivedClock([10, 5])
clock.monotonic_ns()
# Act + Assert
with pytest.raises(ClockOrderingError):
clock.monotonic_ns()
def test_tlog_derived_clock_sleep_until_ns_is_noop() -> None:
# Arrange
clock = TlogDerivedClock([1])
start = time.monotonic_ns()
# Act
clock.sleep_until_ns(10**18) # absurdly far in the future
elapsed_ns = time.monotonic_ns() - start
# Assert
assert elapsed_ns < 5_000_000 # < 5 ms
def test_tlog_derived_clock_accepts_callable_source() -> None:
# Arrange
counter = {"i": 0}
def source() -> int:
counter["i"] += 1
return counter["i"] * 1_000_000
clock = TlogDerivedClock(source)
# Act
a = clock.monotonic_ns()
b = clock.monotonic_ns()
# Assert
assert (a, b) == (1_000_000, 2_000_000)
def test_tlog_derived_clock_returns_last_value_when_source_exhausted() -> None:
# Arrange
clock = TlogDerivedClock([5])
# Act
first = clock.monotonic_ns()
second = clock.monotonic_ns()
# Assert — exhausted source returns the latched value, not an error
assert (first, second) == (5, 5)
# ---------------------------------------------------------------------------
# Composition-root factory (build_clock).
def test_build_clock_wall_returns_wall_clock() -> None:
# Assert
assert isinstance(build_clock(kind="wall"), WallClock)
def test_build_clock_tlog_returns_tlog_derived_clock() -> None:
# Assert
assert isinstance(build_clock(kind="tlog", source=[1, 2]), TlogDerivedClock)
def test_build_clock_rejects_unknown_kind() -> None:
# Act + Assert
with pytest.raises(ValueError, match="unknown kind"):
build_clock(kind="invalid") # type: ignore[arg-type]
def test_build_clock_wall_rejects_source() -> None:
with pytest.raises(ValueError, match="source must be None"):
build_clock(kind="wall", source=[1])
def test_build_clock_tlog_requires_source() -> None:
with pytest.raises(ValueError, match="source is required"):
build_clock(kind="tlog")