mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 22:31:13 +00:00
823c0f1b2e
Ship the two Layer-1 cross-cutting Protocols replay mode needs to leave production C1-C5 components mode-agnostic (Invariant 1) and replay-deterministic (Invariant 2). Live + replay binaries see the same interfaces; only the strategy differs. * Clock Protocol (monotonic_ns / time_ns / sleep_until_ns) + WallClock (live + REALTIME replay) + TlogDerivedClock (ASAP replay; advance-on-call; non-monotonic source → ClockOrderingError). * FrameSource Protocol (next_frame -> NavCameraFrame | None / close) + LiveCameraFrameSource (cv2.VideoCapture device index) + VideoFileFrameSource (cv2.VideoCapture file). * Build-flag gating: BUILD_VIDEO_FILE_FRAME_SOURCE, BUILD_LIVE_CAMERA_FRAME_SOURCE (constructor-time check; Tier-0 OFF refuses construction with FrameSourceConfigError). * Composition-root factories: build_clock + build_frame_source. * Injected Clock across every component that previously called time.monotonic_ns() / time.sleep() directly: c5_state (estimator, ESKF, fallback watcher, source-label SM, isam2 handle), c8_fc_adapter (inbound MAVLink + MSP2, AP outbound, iNav outbound, QGC GCS), c13_fdr writer, c12_operator_tooling httpx flights client. All constructors default to WallClock() so existing call sites keep live-binary behaviour without a wiring change. * AC-4 CI guard (tests/_meta/test_no_direct_time_in_components.py) AST-scans components/**/*.py for direct time.monotonic_ns / time.time_ns / time.sleep references and fails loudly with file:line. * Conformance + factory tests: tests/unit/clock + tests/unit/frame_source. * Test fixture updates: FallbackWatcher / SourceLabelStateMachine clock_ns is now required (removed time.monotonic_ns default); test_az388 patches estimator._clock instead of a module-level time; test_az393 ardupilot adapter uses a _FixedClock test double. Excluded per the task spec: TlogReplayFcAdapter (AZ-399), ReplaySink (AZ-400), compose_replay (AZ-401), CLI (AZ-402), Docker/CI (AZ-403), E2E fixture (AZ-404), IMU auto-sync (AZ-405).
Co-authored-by: Cursor <cursoragent@cursor.com>
106 lines
3.7 KiB
Python
106 lines
3.7 KiB
Python
"""AC-4 — components MUST NOT call ``time.monotonic_ns`` / ``time.time_ns`` / ``time.sleep``.
|
|
|
|
Enforces Invariant 2 of the replay contract
|
|
(``_docs/02_document/contracts/replay/replay_protocol.md``): every
|
|
time-driven code path in a C* component consumes an injected
|
|
:class:`Clock` instead. Replay determinism (R-DEMO-4) collapses the
|
|
moment a component reaches into the stdlib ``time`` module directly,
|
|
so this guard runs on every PR touching ``src/gps_denied_onboard/components/``.
|
|
|
|
The scan is AST-based — docstrings and comments mentioning the forbidden
|
|
APIs do NOT trip it; only call sites and attribute references do.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
# Attribute names of the stdlib ``time`` module that the scan reports.
_FORBIDDEN_ATTRS: frozenset[str] = frozenset(("monotonic_ns", "time_ns", "sleep"))

# Directory tree scanned by the guard, derived from this file's location:
# three ``.parent`` hops up, then ``src/gps_denied_onboard/components``.
_COMPONENTS_ROOT: Path = Path(__file__).parent.parent.parent.joinpath(
    "src", "gps_denied_onboard", "components"
)
|
|
|
|
|
|
def _python_files_under(root: Path) -> list[Path]:
    """Collect every regular ``*.py`` file below *root*, recursively, in sorted order."""
    matches = [candidate for candidate in root.rglob("*.py") if candidate.is_file()]
    matches.sort()
    return matches
|
|
|
|
|
|
def _find_direct_time_references(source: str) -> list[tuple[int, str]]:
    """Return ``(lineno, "time.<attr>")`` for every direct stdlib-``time`` use.

    Two syntactic shapes are reported, in ``ast.walk`` order:

    * ``ast.Attribute(value=ast.Name(id='time'), attr=<x>)`` — e.g.
      ``time.sleep(...)`` or a bare ``time.monotonic_ns`` reference.
    * ``from time import <x>`` (absolute import only) — reported once per
      forbidden name, at the line of the import statement, using the
      imported name even when it is renamed with ``as``.

    In both cases ``<x>`` must be one of ``_FORBIDDEN_ATTRS``.

    Aliased module imports (``import time as t`` → ``t.monotonic_ns()``)
    and ``from time import *`` are NOT caught: only the literal name
    ``time`` is matched, with no tracking of what a name is bound to.

    Args:
        source: Python source text to parse.

    Returns:
        A list of ``(line number, "time.<attr>")`` tuples; empty when the
        source contains no forbidden reference.

    Raises:
        SyntaxError: if ``source`` is not parseable (from ``ast.parse``).
    """
    hits: list[tuple[int, str]] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Attribute):
            target = node.value
            if (
                isinstance(target, ast.Name)
                and target.id == "time"
                and node.attr in _FORBIDDEN_ATTRS
            ):
                hits.append((node.lineno, f"time.{node.attr}"))
        elif isinstance(node, ast.ImportFrom):
            # ``level == 0`` keeps relative imports (``from .time import ...``)
            # out of the report: those name a project module, not the stdlib.
            if node.module == "time" and node.level == 0:
                hits.extend(
                    (node.lineno, f"time.{alias.name}")
                    for alias in node.names
                    if alias.name in _FORBIDDEN_ATTRS
                )
    return hits
|
|
|
|
|
|
def test_components_have_no_direct_time_references() -> None:
    """Fail if any ``.py`` file under ``_COMPONENTS_ROOT`` references a forbidden ``time`` API.

    Every file returned by ``_python_files_under`` is read as UTF-8 and
    scanned with ``_find_direct_time_references``; each hit is reported as
    ``<repo-relative path>:<line> — time.<attr>`` in the assertion message.
    """
    # Arrange
    files = _python_files_under(_COMPONENTS_ROOT)
    assert files, f"AST scan found no .py files under {_COMPONENTS_ROOT}"
    # components -> gps_denied_onboard -> src -> repository root, so the
    # reported paths start at ``src/`` like the message below.
    repo_root = _COMPONENTS_ROOT.parents[2]

    # Act
    offences: list[str] = [
        f"{file.relative_to(repo_root)}:{lineno} — {attr}"
        for file in files
        for lineno, attr in _find_direct_time_references(
            file.read_text(encoding="utf-8")
        )
    ]

    # Assert
    assert not offences, (
        "Invariant 2 violation: direct stdlib-`time` references found in "
        "`src/gps_denied_onboard/components/**/*.py`. Consume an injected "
        "`Clock` (`gps_denied_onboard.clock`) instead.\n"
        + "\n".join(offences)
    )
|
|
|
|
|
|
def test_scan_helper_detects_known_forbidden_pattern() -> None:
    """Self-check: the scan helper flags a plain ``time.monotonic_ns()`` call."""
    # Arrange — guards against a stale scan that would silently pass.
    snippet = "import time\ndef f() -> int:\n return time.monotonic_ns()\n"
    expected = [(3, "time.monotonic_ns")]

    # Act
    found = _find_direct_time_references(snippet)

    # Assert
    assert found == expected
|
|
|
|
|
|
def test_scan_helper_ignores_docstring_mentions() -> None:
    """A forbidden API named only inside a docstring yields no hits."""
    # Arrange — the module consists of nothing but a docstring.
    prose_only = '"""This module talks about time.monotonic_ns in prose only."""\n'

    # Act
    found = _find_direct_time_references(prose_only)

    # Assert
    assert found == []
|
|
|
|
|
|
@pytest.mark.parametrize("forbidden", sorted(_FORBIDDEN_ATTRS))
def test_scan_helper_detects_each_forbidden_attr(forbidden: str) -> None:
    """Each name in ``_FORBIDDEN_ATTRS`` is flagged when called as ``time.<name>()``."""
    # Arrange
    qualified = f"time.{forbidden}"
    snippet = f"import time\n{qualified}()\n"

    # Act
    found = _find_direct_time_references(snippet)

    # Assert — the call sits on line 2 of the snippet.
    assert found == [(2, qualified)]
|