Files
Oleksandr Bezdieniezhnykh 823c0f1b2e [AZ-398] Replay: FrameSource + Clock Protocols + Clock injection
Ship the two Layer-1 cross-cutting Protocols replay mode needs to leave
production C1-C5 components mode-agnostic (Invariant 1) and replay-
deterministic (Invariant 2). Live + replay binaries see the same
interfaces; only the strategy differs.

* Clock Protocol (monotonic_ns / time_ns / sleep_until_ns) +
  WallClock (live + REALTIME replay) + TlogDerivedClock (ASAP replay;
  advance-on-call; non-monotonic source → ClockOrderingError).
* FrameSource Protocol (next_frame -> NavCameraFrame | None / close)
  + LiveCameraFrameSource (cv2.VideoCapture device index) +
  VideoFileFrameSource (cv2.VideoCapture file).
* Build-flag gating: BUILD_VIDEO_FILE_FRAME_SOURCE,
  BUILD_LIVE_CAMERA_FRAME_SOURCE (constructor-time check; Tier-0 OFF
  refuses construction with FrameSourceConfigError).
* Composition-root factories: build_clock + build_frame_source.
* Injected Clock across every component that previously called
  time.monotonic_ns() / time.sleep() directly: c5_state (estimator,
  ESKF, fallback watcher, source-label SM, isam2 handle), c8_fc_adapter
  (inbound MAVLink + MSP2, AP outbound, iNav outbound, QGC GCS),
  c13_fdr writer, c12_operator_tooling httpx flights client. All
  constructors default to WallClock() so existing call sites keep
  live-binary behaviour without a wiring change.
* AC-4 CI guard (tests/_meta/test_no_direct_time_in_components.py)
  AST-scans components/**/*.py for direct time.monotonic_ns /
  time.time_ns / time.sleep references and fails loudly with file:line.
* Conformance + factory tests: tests/unit/clock + tests/unit/frame_source.
* Test fixture updates: FallbackWatcher / SourceLabelStateMachine
  clock_ns is now required (removed time.monotonic_ns default);
  test_az388 patches estimator._clock instead of a module-level time;
  test_az393 ardupilot adapter uses a _FixedClock test double.

Excluded per the task spec: TlogReplayFcAdapter (AZ-399), ReplaySink
(AZ-400), compose_replay (AZ-401), CLI (AZ-402), Docker/CI (AZ-403),
E2E fixture (AZ-404), IMU auto-sync (AZ-405).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 05:10:01 +03:00

304 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AZ-398 — :class:`FrameSource` Protocol conformance + concrete strategy ACs."""
from __future__ import annotations
from pathlib import Path
import cv2
import numpy as np
import pytest
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.frame_source import (
FrameSource,
FrameSourceConfigError,
)
from gps_denied_onboard.frame_source.live_camera import LiveCameraFrameSource
from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource
# ---------------------------------------------------------------------------
# Helpers.
def _make_synthetic_video(path: Path, n_frames: int, fps: int = 30) -> None:
    """Encode a small solid-fill test clip with OpenCV.

    Every frame is a 48-row by 64-column, 3-channel ``uint8`` array filled
    with ``index % 256``. The clip is written to ``path`` using the ``mp4v``
    FOURCC at ``fps`` frames per second.

    Args:
        path: Destination file for the encoded clip.
        n_frames: Number of frames to write.
        fps: Frame rate handed to the OpenCV writer.

    Raises:
        RuntimeError: If OpenCV reports that the writer did not open.
    """
    width, height = 64, 48
    codec = cv2.VideoWriter_fourcc("m", "p", "4", "v")
    sink = cv2.VideoWriter(str(path), codec, fps, (width, height))
    if not sink.isOpened():
        raise RuntimeError(f"OpenCV could not open writer at {path!s}")
    try:
        for index in range(n_frames):
            fill_value = index % 256
            sink.write(np.full((height, width, 3), fill_value, dtype=np.uint8))
    finally:
        # Release the writer even when a write raised part-way through.
        sink.release()
@pytest.fixture
def video_path_60(tmp_path: Path) -> Path:
    """Return the path of a freshly written 60-frame synthetic clip (AC-2).

    The file is created under pytest's ``tmp_path`` directory.
    """
    clip = tmp_path.joinpath("az398_synthetic.mp4")
    _make_synthetic_video(clip, n_frames=60)
    return clip
@pytest.fixture
def enable_video_flag(monkeypatch: pytest.MonkeyPatch) -> None:
    """Set ``BUILD_VIDEO_FILE_FRAME_SOURCE`` to ``ON`` through ``monkeypatch``."""
    monkeypatch.setenv(name="BUILD_VIDEO_FILE_FRAME_SOURCE", value="ON")
@pytest.fixture
def disable_video_flag(monkeypatch: pytest.MonkeyPatch) -> None:
    """Set ``BUILD_VIDEO_FILE_FRAME_SOURCE`` to ``OFF`` through ``monkeypatch``."""
    monkeypatch.setenv(name="BUILD_VIDEO_FILE_FRAME_SOURCE", value="OFF")
@pytest.fixture
def disable_live_flag(monkeypatch: pytest.MonkeyPatch) -> None:
    """Set ``BUILD_LIVE_CAMERA_FRAME_SOURCE`` to ``OFF`` through ``monkeypatch``."""
    monkeypatch.setenv(name="BUILD_LIVE_CAMERA_FRAME_SOURCE", value="OFF")
# ---------------------------------------------------------------------------
# AC-1 — Protocol conformance.
def test_video_file_frame_source_satisfies_frame_source_protocol(
    enable_video_flag: None, video_path_60: Path
) -> None:
    """AC-1: a constructed ``VideoFileFrameSource`` is an instance of ``FrameSource``."""
    # Arrange + Act
    ctor_kwargs = {
        "path": video_path_60,
        "camera_calibration_id": "az398-synth",
        "clock": WallClock(),
    }
    candidate = VideoFileFrameSource(**ctor_kwargs)
    try:
        # Assert
        assert isinstance(candidate, FrameSource)
    finally:
        # Close the source whether or not the assertion held.
        candidate.close()
# ---------------------------------------------------------------------------
# AC-2 — VideoFileFrameSource produces 60 ordered frames + idempotent EOS.
def test_video_file_frame_source_emits_60_frames_then_none(
    enable_video_flag: None, video_path_60: Path
) -> None:
    """AC-2: 60 frames come out, then ``next_frame`` keeps returning ``None``.

    Also checks that the ``monotonic_ns`` metadata values never decrease
    from one frame to the next.
    """
    # Arrange
    expected_count = 60
    reader = VideoFileFrameSource(
        path=video_path_60,
        camera_calibration_id="az398-synth",
        clock=WallClock(),
    )
    stamps: list[int] = []
    try:
        # Act — each pass appends exactly one stamp, so this reads 60 frames.
        while len(stamps) < expected_count:
            emitted = reader.next_frame()
            assert emitted is not None
            stamps.append(emitted.metadata["monotonic_ns"])
        # AC-2: the 61st and 62nd reads are both past end-of-stream.
        after_end = [reader.next_frame(), reader.next_frame()]
    finally:
        reader.close()
    # Assert
    assert after_end[0] is None
    assert after_end[1] is None
    assert len(stamps) == expected_count
    # Non-decreasing monotonic_ns ordering (Invariant 3 / AC-2)
    assert all(
        earlier <= later
        for earlier, later in zip(stamps, stamps[1:], strict=False)
    )
def test_video_file_frame_source_emits_frame_id_counter_and_metadata(
    enable_video_flag: None, video_path_60: Path
) -> None:
    """The first two frames carry ids 0 and 1; the first has the expected metadata.

    Checked on the first frame: ``camera_calibration_id``, the ``source``
    metadata value, and the presence of the ``monotonic_ns`` and
    ``source_pts_ns`` metadata keys.
    """
    # Arrange
    reader = VideoFileFrameSource(
        path=video_path_60,
        camera_calibration_id="az398-synth",
        clock=WallClock(),
    )
    try:
        # Act
        leading_pair = (reader.next_frame(), reader.next_frame())
    finally:
        reader.close()
    frame_0, frame_1 = leading_pair
    # Assert
    assert frame_0 is not None
    assert frame_1 is not None
    assert (frame_0.frame_id, frame_1.frame_id) == (0, 1)
    assert frame_0.camera_calibration_id == "az398-synth"
    assert frame_0.metadata["source"] == "video_file"
    for required_key in ("monotonic_ns", "source_pts_ns"):
        assert required_key in frame_0.metadata
# ---------------------------------------------------------------------------
# AC-7 — corrupt video file raises FrameSourceConfigError on construction.
def test_video_file_frame_source_rejects_corrupt_file(
    enable_video_flag: None, tmp_path: Path
) -> None:
    """AC-7: constructing from a non-video ``.mp4`` raises ``FrameSourceConfigError``."""
    # Arrange — an existing file whose bytes are not a real MP4 stream.
    junk_payload = b"not actually mp4 content" * 256
    junk_file = tmp_path.joinpath("garbage.mp4")
    junk_file.write_bytes(junk_payload)
    # Act + Assert
    with pytest.raises(FrameSourceConfigError):
        VideoFileFrameSource(
            path=junk_file,
            camera_calibration_id="az398-corrupt",
            clock=WallClock(),
        )
def test_video_file_frame_source_rejects_missing_path(
    enable_video_flag: None, tmp_path: Path
) -> None:
    """A path that was never created is rejected with a "does not exist" error."""
    absent = tmp_path.joinpath("missing.mp4")
    # Act + Assert
    with pytest.raises(FrameSourceConfigError, match="does not exist"):
        VideoFileFrameSource(
            path=absent,
            camera_calibration_id="az398-missing",
            clock=WallClock(),
        )
# ---------------------------------------------------------------------------
# AC-8 — Build-flag gating.
def test_video_file_frame_source_refuses_when_build_flag_off(
    disable_video_flag: None, tmp_path: Path
) -> None:
    """AC-8: with the video-file flag OFF, construction raises the gate error."""
    # Arrange — the file exists (empty) so the gate is exercised before path checks.
    placeholder = tmp_path.joinpath("any.mp4")
    placeholder.write_bytes(b"")
    gate_message = "BUILD_VIDEO_FILE_FRAME_SOURCE is OFF"
    # Act + Assert
    with pytest.raises(FrameSourceConfigError, match=gate_message):
        VideoFileFrameSource(
            path=placeholder,
            camera_calibration_id="az398-gate",
            clock=WallClock(),
        )
def test_live_camera_frame_source_refuses_when_build_flag_off(
    disable_live_flag: None,
) -> None:
    """AC-8: with the live-camera flag OFF, construction raises the gate error."""
    gate_message = "BUILD_LIVE_CAMERA_FRAME_SOURCE is OFF"
    # Act + Assert
    with pytest.raises(FrameSourceConfigError, match=gate_message):
        LiveCameraFrameSource(
            device_index=0,
            camera_calibration_id="az398-live-gate",
            clock=WallClock(),
        )
# ---------------------------------------------------------------------------
# AC-9 — Public API re-exports.
def test_frame_source_public_module_only_exposes_protocol_and_errors() -> None:
    """AC-9: ``frame_source.__all__`` lists the Protocol and errors, not the strategies."""
    # Arrange
    from gps_denied_onboard import frame_source as public_api

    exported = public_api.__all__
    # Assert — the Protocol and both error types are re-exported.
    for required in ("FrameSource", "FrameSourceError", "FrameSourceConfigError"):
        assert required in exported
    # Assert — concrete strategies MUST NOT appear in __all__ per AC-9.
    for hidden in ("LiveCameraFrameSource", "VideoFileFrameSource"):
        assert hidden not in exported
# ---------------------------------------------------------------------------
# AC-10 — close is idempotent.
def test_video_file_frame_source_close_is_idempotent(
    enable_video_flag: None, video_path_60: Path
) -> None:
    """AC-10: ``close`` may be called twice, and ``next_frame`` then returns ``None``."""
    # Arrange
    reader = VideoFileFrameSource(
        path=video_path_60,
        camera_calibration_id="az398-synth",
        clock=WallClock(),
    )
    # Act — the second close must not raise (AC-10).
    for _ in range(2):
        reader.close()
    # Assert — reading after close yields None rather than an exception.
    frame_after_close = reader.next_frame()
    assert frame_after_close is None
# ---------------------------------------------------------------------------
# Factory.
def test_build_frame_source_video_file_returns_video_file_source(
    enable_video_flag: None, video_path_60: Path
) -> None:
    """Factory: ``kind="video_file"`` returns a ``VideoFileFrameSource`` instance."""
    from gps_denied_onboard.runtime_root.frame_source_factory import (
        build_frame_source,
    )

    factory_args = {
        "kind": "video_file",
        "camera_calibration_id": "az398-factory",
        "clock": WallClock(),
        "video_path": video_path_60,
    }
    built = build_frame_source(**factory_args)
    try:
        # Assert
        assert isinstance(built, VideoFileFrameSource)
    finally:
        built.close()
def test_build_frame_source_rejects_unknown_kind() -> None:
    """Factory: an unrecognised ``kind`` raises an "unknown kind" config error."""
    from gps_denied_onboard.runtime_root.frame_source_factory import (
        build_frame_source,
    )

    wall_clock = WallClock()
    with pytest.raises(FrameSourceConfigError, match="unknown kind"):
        build_frame_source(
            kind="invalid",  # type: ignore[arg-type]
            camera_calibration_id="x",
            clock=wall_clock,
        )
def test_build_frame_source_live_rejects_video_path(enable_video_flag: None) -> None:
    """Factory: ``kind="live"`` combined with a ``video_path`` is a config error."""
    from gps_denied_onboard.runtime_root.frame_source_factory import (
        build_frame_source,
    )

    wall_clock = WallClock()
    expected_message = "video_path must be None"
    with pytest.raises(FrameSourceConfigError, match=expected_message):
        build_frame_source(
            kind="live",
            camera_calibration_id="x",
            clock=wall_clock,
            device_index=0,
            video_path="/tmp/whatever.mp4",
        )
def test_build_frame_source_video_file_requires_video_path() -> None:
    """Factory: ``kind="video_file"`` without a ``video_path`` is a config error."""
    from gps_denied_onboard.runtime_root.frame_source_factory import (
        build_frame_source,
    )

    wall_clock = WallClock()
    expected_message = "video_path is required"
    with pytest.raises(FrameSourceConfigError, match=expected_message):
        build_frame_source(
            kind="video_file",
            camera_calibration_id="x",
            clock=wall_clock,
        )