diff --git a/_docs/02_tasks/todo/AZ-398_replay_frame_source_clock.md b/_docs/02_tasks/done/AZ-398_replay_frame_source_clock.md similarity index 100% rename from _docs/02_tasks/todo/AZ-398_replay_frame_source_clock.md rename to _docs/02_tasks/done/AZ-398_replay_frame_source_clock.md diff --git a/src/gps_denied_onboard/clock/__init__.py b/src/gps_denied_onboard/clock/__init__.py index a1feeba..f124108 100644 --- a/src/gps_denied_onboard/clock/__init__.py +++ b/src/gps_denied_onboard/clock/__init__.py @@ -1,7 +1,12 @@ -"""Clock interface + concrete implementations. +"""``Clock`` Protocol — public surface (AZ-398 v1.0.0). -The interface is bootstrap-stubbed here. `WallClock` (live) and `TlogDerivedClock` -(replay) are owned by AZ-401 (E-DEMO-REPLAY). +Re-exports the Protocol only; concrete strategies (``WallClock``, +``TlogDerivedClock``) are NOT exported via ``__all__`` per AC-9 — +composition-root code imports them from their concrete module paths +so the lazy-import boundary stays explicit. + +Components import :class:`Clock` and accept it via constructor +injection (Invariant 2). """ from gps_denied_onboard.clock.interface import Clock diff --git a/src/gps_denied_onboard/clock/interface.py b/src/gps_denied_onboard/clock/interface.py index 5f0cfac..1ce7567 100644 --- a/src/gps_denied_onboard/clock/interface.py +++ b/src/gps_denied_onboard/clock/interface.py @@ -1,20 +1,70 @@ -"""`Clock` Protocol. +"""``Clock`` Protocol — replay/live-agnostic monotonic + wall-clock time. -R-DEMO-4: production C1-C5 paths bake real-time-cadence assumptions; injected -Clock lets replay mode trip those timers consistently against tlog timestamps. +Frozen at AZ-398 v1.0.0 per the replay contract: +``_docs/02_document/contracts/replay/replay_protocol.md``. -Owned by AZ-401. Bootstrap ships the interface stub. +The Protocol is Layer 1 cross-cutting per ``module-layout.md`` — every +component that previously called :func:`time.monotonic_ns`, +:func:`time.time_ns`, or :func:`time.sleep` MUST consume an injected +:class:`Clock` instead (Invariant 2). The strategy is selected exactly +once at composition time (Invariant — Single Clock per process): + +- **Live / research / operator** binaries inject :class:`WallClock`. +- **Replay** binary injects :class:`TlogDerivedClock` (ASAP) or + :class:`WallClock` (REALTIME pace). + +Mode-specific behaviour lives in the strategy; consumers see only the +``Clock`` interface (R-DEMO-4 mitigation). """ from __future__ import annotations -from datetime import datetime -from typing import Protocol +from typing import Protocol, runtime_checkable +@runtime_checkable class Clock(Protocol): - """A monotonic clock abstraction.""" + """Monotonic + wall-clock + sleep-until abstraction (AZ-398 v1.0.0). - def now(self) -> datetime: ... + All three methods are non-blocking except :meth:`sleep_until_ns`, + which honours the configured replay pace: - def monotonic(self) -> float: ... + - ``WallClock.sleep_until_ns(t)`` blocks until ``time.monotonic_ns()`` + catches up to ``t`` (live + REALTIME replay). + - ``TlogDerivedClock.sleep_until_ns(t)`` is a no-op (ASAP replay). + + Strategies MUST guarantee :meth:`monotonic_ns` is non-decreasing + across calls within the same process (Invariant 3 spirit). + """ + + def monotonic_ns(self) -> int: + """Return the strategy's monotonic time in nanoseconds. + + For :class:`WallClock` this delegates to + :func:`time.monotonic_ns`. For :class:`TlogDerivedClock` this + returns the most recently advanced tlog timestamp (advance-on- + call semantics — see AC-6). + """ + ... + + def time_ns(self) -> int: + """Return the strategy's UTC wall-clock time in nanoseconds. + + Used for log timestamps that need calendar alignment (FDR + records, STATUSTEXT). For :class:`WallClock` this is + :func:`time.time_ns`; for :class:`TlogDerivedClock` this is the + tlog message's wall-clock timestamp (the ``time_unix_usec`` / + ``time_boot_ms`` field, normalised to ns). + """ + ... + + def sleep_until_ns(self, target_ns: int) -> None: + """Block until :meth:`monotonic_ns` would return ``target_ns``. + + Honours ``pace=REALTIME`` by sleeping the wall-clock delta; honours + ``pace=ASAP`` by no-op'ing. ``target_ns`` already in the past is a + no-op (no exception, no negative sleep). The Protocol does not + prescribe spurious-wakeup behaviour; strategies SHOULD use + :func:`time.sleep` (which retries internally on POSIX). + """ + ... diff --git a/src/gps_denied_onboard/clock/tlog_derived.py b/src/gps_denied_onboard/clock/tlog_derived.py new file mode 100644 index 0000000..63d0e4c --- /dev/null +++ b/src/gps_denied_onboard/clock/tlog_derived.py @@ -0,0 +1,92 @@ +"""``TlogDerivedClock`` strategy (AZ-398) — replay-only. + +Advances ``monotonic_ns`` on each call to the next timestamp emitted by +the wrapped tlog-timestamp source. Out-of-order timestamps raise +:class:`ClockOrderingError` (AC-6) — replay determinism is hard-failed, +never silently smoothed. + +The strategy is constructed by the replay composition root (AZ-401) +with a callable that yields tlog timestamps as the parser advances. +For unit tests, an iterator of pre-known timestamps suffices. +""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable, Iterator + + +class ClockOrderingError(RuntimeError): + """Raised when the tlog-timestamp source emits a non-monotonic value. + + Replay must be deterministic; a strategy that silently smooths + backward jumps would mask a genuine recording corruption. The error + names the offending pair so the operator can correlate against the + tlog message stream. + """ + + +class TlogDerivedClock: + """Replay :class:`Clock` strategy driven by the tlog timestamp stream. + + The source can be either a callable returning ``int`` ns (typical + when wired against the live tlog parser, AZ-399) or an iterable of + pre-known ``int`` ns values (typical in unit tests). Both are normalised + to an internal :class:`Iterator` lazily. + + Semantics: + + - :meth:`monotonic_ns` pulls the next value from the source on every + call and returns it (advance-on-call). The most-recently-returned + value is cached for :meth:`time_ns` so the two methods stay aligned. + - :meth:`time_ns` returns the latest cached value; if no call to + :meth:`monotonic_ns` has happened yet, it returns 0 (the replay + composition root must pump at least one frame before any consumer + asks for wall-clock time). + - :meth:`sleep_until_ns` is a no-op (``pace=ASAP``). + """ + + __slots__ = ("_source", "_last_ns") + + def __init__( + self, + source: Callable[[], int] | Iterable[int], + ) -> None: + if callable(source): + self._source: Iterator[int] = _iter_from_callable(source) + else: + self._source = iter(source) + self._last_ns = 0 + + def monotonic_ns(self) -> int: + try: + next_ns = next(self._source) + except StopIteration: + return self._last_ns + if next_ns < self._last_ns: + raise ClockOrderingError( + f"TlogDerivedClock: non-monotonic timestamp " + f"{next_ns} ns followed {self._last_ns} ns" + ) + self._last_ns = next_ns + return next_ns + + def time_ns(self) -> int: + return self._last_ns + + def sleep_until_ns(self, target_ns: int) -> None: + """No-op in ASAP pace (Invariant 6).""" + return None + + +def _iter_from_callable(source: Callable[[], int]) -> Iterator[int]: + """Wrap a callable as an iterator that calls it on each ``next()``. + + Used when the tlog parser exposes a ``next_timestamp_ns()``-style + hook; consumers should NOT pass a side-effectful callable that + blocks — the source is expected to be cheap (microsecond-class). + """ + while True: + yield source() + + +__all__ = ["ClockOrderingError", "TlogDerivedClock"] diff --git a/src/gps_denied_onboard/clock/wall_clock.py b/src/gps_denied_onboard/clock/wall_clock.py new file mode 100644 index 0000000..c49bea5 --- /dev/null +++ b/src/gps_denied_onboard/clock/wall_clock.py @@ -0,0 +1,42 @@ +"""``WallClock`` strategy (AZ-398) — live + REALTIME replay. + +Thin :class:`Clock` adapter over the standard-library :mod:`time` +module. Owned by ``clock/`` so the AC-4 AST scan over ``components/`` +remains clean: components MUST NOT call :func:`time.monotonic_ns` +directly; they call :meth:`WallClock.monotonic_ns` via injection. +""" + +from __future__ import annotations + +import time + + +class WallClock: + """Default :class:`Clock` strategy backed by :mod:`time`. + + Stateless; constructable without arguments. All three methods are + trivially Liskov-clean over the Protocol. + """ + + __slots__ = () + + def monotonic_ns(self) -> int: + return time.monotonic_ns() + + def time_ns(self) -> int: + return time.time_ns() + + def sleep_until_ns(self, target_ns: int) -> None: + """Block until ``time.monotonic_ns() >= target_ns``. + + A target already in the past is a no-op. Sub-millisecond + oversleep is accepted (AC-5: ≤ 5 ms drift on a 100 ms sleep). + """ + now = time.monotonic_ns() + delta_ns = target_ns - now + if delta_ns <= 0: + return + time.sleep(delta_ns / 1_000_000_000.0) + + +__all__ = ["WallClock"] diff --git a/src/gps_denied_onboard/components/c12_operator_tooling/flights_api/httpx_client.py b/src/gps_denied_onboard/components/c12_operator_tooling/flights_api/httpx_client.py index 71a2843..87b8edc 100644 --- a/src/gps_denied_onboard/components/c12_operator_tooling/flights_api/httpx_client.py +++ b/src/gps_denied_onboard/components/c12_operator_tooling/flights_api/httpx_client.py @@ -12,13 +12,15 @@ Retry policy (FAC-INV-5): from __future__ import annotations -import time +from collections.abc import Callable from pathlib import Path from typing import Final from uuid import UUID import httpx +from gps_denied_onboard.clock.wall_clock import WallClock + from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt from gps_denied_onboard.components.c12_operator_tooling.flights_api._parser import ( parse_flight_payload, @@ -49,6 +51,18 @@ _REDACTED: Final[str] = "" _RETRY_BACKOFF_S: Final[float] = 1.0 +def _wall_clock_sleep(seconds: float) -> None: + """Default sleep hook — routes through :class:`WallClock`. + + Kept as a module-level function (not a lambda or closure) so the + AC-4 AST scan over ``components/`` never sees a bare stdlib + ``time``-module sleep reference. Tests inject their own ``sleep`` + callable to skip the backoff. + """ + clock = WallClock() + clock.sleep_until_ns(clock.monotonic_ns() + int(seconds * 1_000_000_000)) + + class HttpxFlightsApiClient: """Concrete :class:`FlightsApiClient` against the parent-suite ``flights`` REST API. @@ -64,10 +78,12 @@ class HttpxFlightsApiClient: self, *, transport: httpx.BaseTransport | None = None, - sleep: object = time.sleep, + sleep: Callable[[float], None] | None = None, ) -> None: self._transport = transport - self._sleep = sleep + self._sleep: Callable[[float], None] = ( + sleep if sleep is not None else _wall_clock_sleep + ) self._log = get_logger("c12.flights_api") def fetch_flight( @@ -162,7 +178,7 @@ class HttpxFlightsApiClient: }, }, ) - self._sleep(_RETRY_BACKOFF_S) # type: ignore[operator] + self._sleep(_RETRY_BACKOFF_S) try: response = client.get(url, headers=headers) except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout) as exc: diff --git a/src/gps_denied_onboard/components/c13_fdr/writer.py b/src/gps_denied_onboard/components/c13_fdr/writer.py index bdd0eb9..f095da4 100644 --- a/src/gps_denied_onboard/components/c13_fdr/writer.py +++ b/src/gps_denied_onboard/components/c13_fdr/writer.py @@ -29,8 +29,10 @@ from collections.abc import Callable, Sequence from dataclasses import asdict from datetime import datetime, timezone from pathlib import Path +from typing import TYPE_CHECKING from uuid import UUID +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c13_fdr.errors import ( FdrAlreadyClosedError, FdrCloseWithoutOpenError, @@ -53,6 +55,9 @@ from gps_denied_onboard.fdr_client.records import ( ) from gps_denied_onboard.logging import get_logger +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock + __all__ = ["FileFdrWriter"] _FLIGHT_HEADER_KIND = "flight_header" @@ -97,6 +102,7 @@ class FileFdrWriter: on_rotation: Callable[[FileFdrWriter, int], None] | None = None, record_kind_policy: RecordKindPolicy | None = None, drain_sleep_s: float = _DEFAULT_DRAIN_SLEEP_S, + clock: Clock | None = None, ) -> None: self._flight_root = Path(flight_root) self._flight_id = flight_id @@ -106,6 +112,7 @@ class FileFdrWriter: self._on_rotation = on_rotation self._record_kind_policy = record_kind_policy self._drain_sleep_s = drain_sleep_s + self._clock: Clock = clock if clock is not None else WallClock() # Filesystem state. self._flight_dir: Path = self._flight_root / str(flight_id) @@ -312,7 +319,7 @@ class FileFdrWriter: # iterate until the value is stable. Practically this converges # in one or two passes. ts = _iso_now() - mono_ns = time.monotonic_ns() + mono_ns = self._clock.monotonic_ns() records_written_now = self._records_written + 1 # +1 for the footer itself bytes_estimate = self._bytes_written footer: FlightFooter | None = None diff --git a/src/gps_denied_onboard/components/c5_state/_fallback_watcher.py b/src/gps_denied_onboard/components/c5_state/_fallback_watcher.py index 7619a83..4dca2c0 100644 --- a/src/gps_denied_onboard/components/c5_state/_fallback_watcher.py +++ b/src/gps_denied_onboard/components/c5_state/_fallback_watcher.py @@ -37,7 +37,6 @@ transitions. from __future__ import annotations import threading -import time from collections.abc import Callable from datetime import datetime, timezone from typing import TYPE_CHECKING, Final, Protocol, runtime_checkable @@ -107,8 +106,8 @@ class FallbackWatcher: *, threshold_s: float, fdr_client: FdrClient | None, + clock_ns: Callable[[], int], producer_id: str = "c5_state", - clock_ns: Callable[[], int] = time.monotonic_ns, ) -> None: if threshold_s <= 0.0: raise ValueError(f"FallbackWatcher.threshold_s must be > 0; got {threshold_s}") diff --git a/src/gps_denied_onboard/components/c5_state/_isam2_handle.py b/src/gps_denied_onboard/components/c5_state/_isam2_handle.py index f29547d..82d28f1 100644 --- a/src/gps_denied_onboard/components/c5_state/_isam2_handle.py +++ b/src/gps_denied_onboard/components/c5_state/_isam2_handle.py @@ -18,7 +18,6 @@ defensive trace. from __future__ import annotations -import time from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable import gtsam @@ -205,5 +204,10 @@ class ISam2GraphHandleImpl(ISam2GraphHandle): anchor (``_last_anchor_ns`` is initialised to 0 in the estimator constructor). This matches the C5 contract's documented "no anchor yet" sentinel. + + Reads the estimator's injected :class:`Clock` so replay / + unit-test runs see deterministic age values. """ - return (time.monotonic_ns() - self._estimator._last_anchor_ns) // 1_000_000 + return ( + self._estimator._clock.monotonic_ns() - self._estimator._last_anchor_ns + ) // 1_000_000 diff --git a/src/gps_denied_onboard/components/c5_state/_source_label_sm.py b/src/gps_denied_onboard/components/c5_state/_source_label_sm.py index fd0f5c2..43b58d7 100644 --- a/src/gps_denied_onboard/components/c5_state/_source_label_sm.py +++ b/src/gps_denied_onboard/components/c5_state/_source_label_sm.py @@ -35,7 +35,6 @@ matrix simpler. from __future__ import annotations import threading -import time from collections.abc import Callable from datetime import datetime, timezone from typing import TYPE_CHECKING, Final, Protocol, runtime_checkable @@ -154,8 +153,8 @@ class SourceLabelStateMachine: spoof_promotion_visual_consistency_tol_m: float, spoof_promotion_bounded_delta_m: float, fdr_client: FdrClient | None, + clock_ns: Callable[[], int], producer_id: str = "c5_state", - clock_ns: Callable[[], int] = time.monotonic_ns, ) -> None: if spoof_promotion_min_stable_s <= 0.0: raise ValueError( diff --git a/src/gps_denied_onboard/components/c5_state/eskf_baseline.py b/src/gps_denied_onboard/components/c5_state/eskf_baseline.py index f36b65d..d6b51cd 100644 --- a/src/gps_denied_onboard/components/c5_state/eskf_baseline.py +++ b/src/gps_denied_onboard/components/c5_state/eskf_baseline.py @@ -47,7 +47,6 @@ filter; this module documents the deviation in the from __future__ import annotations import math -import time from collections import deque from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Final, Literal @@ -57,6 +56,7 @@ import numpy as np from numpy.linalg import LinAlgError from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard._types.state import ( EstimatorHealth, EstimatorOutput, @@ -89,9 +89,9 @@ from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: from gps_denied_onboard._types.fc import GpsHealth, GpsSample - from gps_denied_onboard._types.nav import ImuWindow + from gps_denied_onboard._types.nav import ImuWindow, VioOutput from gps_denied_onboard._types.pose import PoseEstimate - from gps_denied_onboard._types.vio import VioOutput + from gps_denied_onboard.clock import Clock from gps_denied_onboard.config import Config __all__ = [ @@ -162,6 +162,7 @@ class EskfStateEstimator(StateEstimator): se3_utils: Any, wgs_converter: Any, fdr_client: Any, + clock: Clock | None = None, ) -> None: block = self._extract_block(config) self._config: Config = config @@ -170,6 +171,7 @@ class EskfStateEstimator(StateEstimator): self._se3_utils: Any = se3_utils self._wgs_converter: Any = wgs_converter self._fdr_client: Any = fdr_client + self._clock: Clock = clock if clock is not None else WallClock() self._log = get_logger("c5_state.eskf_baseline") self._nominal_pos: np.ndarray = np.zeros(3, dtype=np.float64) @@ -215,6 +217,7 @@ class EskfStateEstimator(StateEstimator): spoof_promotion_visual_consistency_tol_m=block.spoof_promotion_visual_consistency_tol_m, spoof_promotion_bounded_delta_m=block.spoof_promotion_bounded_delta_m, fdr_client=fdr_client, + clock_ns=self._clock.monotonic_ns, producer_id="c5_state", ) @@ -222,6 +225,7 @@ class EskfStateEstimator(StateEstimator): self._fallback = FallbackWatcher( threshold_s=block.no_estimate_fallback_s, fdr_client=fdr_client, + clock_ns=self._clock.monotonic_ns, producer_id="c5_state", ) @@ -538,7 +542,7 @@ class EskfStateEstimator(StateEstimator): # Both modes are treated identically by the ESKF — the # JACOBIAN exclusion is iSAM2-graph-specific. AC-4. - self._last_anchor_ns = time.monotonic_ns() + self._last_anchor_ns = self._clock.monotonic_ns() residual_pos = meas_pose[:3, 3] - self._nominal_pos meas_R = meas_pose[:3, :3] @@ -612,7 +616,7 @@ class EskfStateEstimator(StateEstimator): def current_estimate(self) -> EstimatorOutput: """Forward-time estimate. ``smoothed=False`` (Invariant 7).""" - now_ns = time.monotonic_ns() + now_ns = self._clock.monotonic_ns() self._fallback.check_and_engage(now_ns) cov6 = self._pose_covariance_6x6() @@ -629,7 +633,7 @@ class EskfStateEstimator(StateEstimator): ) raise - emitted_at = time.monotonic_ns() + emitted_at = self._clock.monotonic_ns() position_wgs84 = self._enu_pose_to_wgs84() orientation = _quat_to_quat_dto(self._nominal_q) velocity_world = ( @@ -864,7 +868,7 @@ class EskfStateEstimator(StateEstimator): return try: machine.notify_satellite_anchor( - now_ns=time.monotonic_ns(), + now_ns=self._clock.monotonic_ns(), gps_consistency_delta_m=None, ) except Exception as exc: diff --git a/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py b/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py index 684e60a..6458514 100644 --- a/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py +++ b/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py @@ -31,7 +31,6 @@ there. from __future__ import annotations import math -import time from collections import deque from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Final, Literal @@ -43,6 +42,7 @@ import numpy as np from numpy.linalg import LinAlgError from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard._types.state import ( EstimatorHealth, EstimatorOutput, @@ -79,9 +79,9 @@ from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: from gps_denied_onboard._types.fc import GpsHealth, GpsSample - from gps_denied_onboard._types.nav import ImuWindow + from gps_denied_onboard._types.nav import ImuWindow, VioOutput from gps_denied_onboard._types.pose import PoseEstimate - from gps_denied_onboard._types.vio import VioOutput + from gps_denied_onboard.clock import Clock from gps_denied_onboard.config import Config __all__ = [ @@ -148,6 +148,7 @@ class GtsamIsam2StateEstimator(StateEstimator): se3_utils: Any, wgs_converter: Any, fdr_client: Any, + clock: Clock | None = None, ) -> None: block = self._extract_block(config) @@ -157,6 +158,7 @@ class GtsamIsam2StateEstimator(StateEstimator): self._se3_utils: Any = se3_utils self._wgs_converter: Any = wgs_converter self._fdr_client: Any = fdr_client + self._clock: Clock = clock if clock is not None else WallClock() self._isam2 = gtsam.ISAM2(gtsam.ISAM2Params()) window_seconds: float = block.keyframe_window_size * _FRAME_PERIOD_S @@ -224,6 +226,7 @@ class GtsamIsam2StateEstimator(StateEstimator): spoof_promotion_visual_consistency_tol_m=block.spoof_promotion_visual_consistency_tol_m, spoof_promotion_bounded_delta_m=block.spoof_promotion_bounded_delta_m, fdr_client=fdr_client, + clock_ns=self._clock.monotonic_ns, producer_id="c5_state", ) # AC-NEW-8 rolling window of ``(ts_monotonic_ns, cov_norm)`` @@ -255,6 +258,7 @@ class GtsamIsam2StateEstimator(StateEstimator): self._fallback = FallbackWatcher( threshold_s=block.no_estimate_fallback_s, fdr_client=fdr_client, + clock_ns=self._clock.monotonic_ns, producer_id="c5_state", ) @@ -481,7 +485,7 @@ class GtsamIsam2StateEstimator(StateEstimator): # AC-6 / Invariant 11a: do NOT advance ``_last_added_ts_ns`` — # this is a pre-takeoff seed, not a measurement; the first # subsequent ``add_*`` call still sees the unguarded baseline. - ts_ns = time.monotonic_ns() + ts_ns = self._clock.monotonic_ns() try: handle.add_factor(factor) self._values.insert(prior_key, prior_pose) @@ -734,7 +738,7 @@ class GtsamIsam2StateEstimator(StateEstimator): # Both paths update the anchor freshness sentinel. The C5 # contract documents this — even the throttled JACOBIAN path # counts as a recent anchor for AC-1.3 binning. - self._last_anchor_ns = time.monotonic_ns() + self._last_anchor_ns = self._clock.monotonic_ns() if mode == "marginals": gtsam_pose = _pose_se3_to_gtsam(self._pose_estimate_to_matrix(pose)) @@ -923,7 +927,7 @@ class GtsamIsam2StateEstimator(StateEstimator): # AZ-388: AC-5.2 entry hook. Engages fallback if the # threshold has elapsed since the last successful estimate. # Idempotent / rate-limited. - self._fallback.check_and_engage(time.monotonic_ns()) + self._fallback.check_and_engage(self._clock.monotonic_ns()) if self._last_committed_pose_key is None: raise EstimatorFatalError( "current_estimate: no committed pose key yet (graph empty); " @@ -975,7 +979,7 @@ class GtsamIsam2StateEstimator(StateEstimator): velocity_world = self._latest_velocity_or_zero() last_anchor_age_ms = int(handle.last_anchor_age_ms()) source_label = self._derive_source_label() - emitted_at = time.monotonic_ns() + emitted_at = self._clock.monotonic_ns() self._record_cov_norm_sample(emitted_at, covariance) if self._isam2_state == IsamState.INIT: @@ -1063,7 +1067,7 @@ class GtsamIsam2StateEstimator(StateEstimator): last_anchor_age_ms = int(handle.last_anchor_age_ms()) source_label = self._derive_source_label() - emitted_at = time.monotonic_ns() + emitted_at = self._clock.monotonic_ns() out: list[EstimatorOutput] = [] for key, _ts in selected: @@ -1366,7 +1370,7 @@ class GtsamIsam2StateEstimator(StateEstimator): return try: machine.notify_satellite_anchor( - now_ns=time.monotonic_ns(), + now_ns=self._clock.monotonic_ns(), gps_consistency_delta_m=None, ) except Exception as exc: diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py index 606ad38..4ef396c 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py @@ -20,8 +20,7 @@ synchronously without a real serial port. from __future__ import annotations import threading -import time -from typing import Any, Final, Protocol +from typing import TYPE_CHECKING, Any, Final, Protocol from gps_denied_onboard._types.fc import ( AttitudeSample, @@ -34,10 +33,14 @@ from gps_denied_onboard._types.fc import ( TelemetryKind, ) from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import TelemetryRing from gps_denied_onboard.logging import get_logger +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock + __all__ = [ "AP_MESSAGE_TYPES", "MAVLinkSource", @@ -108,9 +111,11 @@ class PymavlinkInboundDecoder: attitude_ring_capacity: int = 100, gps_ring_capacity: int = 20, state_ring_capacity: int = 10, + clock: Clock | None = None, ) -> None: self._source = source self._bus = bus + self._clock: Clock = clock if clock is not None else WallClock() self._log = get_logger("c8_fc_adapter.inbound_mavlink") self.imu_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( imu_ring_capacity, kind_name="imu" @@ -218,7 +223,7 @@ class PymavlinkInboundDecoder: status = self._map_fix_type(fix_type) if status is GpsStatus.STABLE: status = self._maybe_promote_to_spoofed_or_non_spoofed() - captured_at = time.monotonic_ns() + captured_at = self._clock.monotonic_ns() payload = GpsHealth(status=status, fix_age_ms=0, captured_at=captured_at) # AC-5.1: cache warm-start hint on first 3D+ fix. if fix_type >= 3: @@ -232,7 +237,7 @@ class PymavlinkInboundDecoder: return self._dispatch(TelemetryKind.GPS_HEALTH, payload, ring=self.gps_ring) def _handle_heartbeat(self, msg: Any) -> bool: - captured_at = time.monotonic_ns() + captured_at = self._clock.monotonic_ns() state = self._map_mav_state( system_status=int(msg.system_status), base_mode=int(msg.base_mode), @@ -257,7 +262,7 @@ class PymavlinkInboundDecoder: text = text.decode("utf-8", errors="replace") if not any(sentinel.lower() in text.lower() for sentinel in _SPOOFING_SENTINELS): return - captured_at = time.monotonic_ns() + captured_at = self._clock.monotonic_ns() with self._lock: self._spoof_sentinel_seen_at = captured_at self._log.warning( @@ -278,7 +283,7 @@ class PymavlinkInboundDecoder: *, ring: TelemetryRing[FcTelemetryFrame], ) -> bool: - received_at = time.monotonic_ns() + received_at = self._clock.monotonic_ns() last = self._last_ts_ns.get(kind) if last is not None and received_at <= last: self._log.warning( @@ -329,7 +334,7 @@ class PymavlinkInboundDecoder: sentinel_at = self._spoof_sentinel_seen_at if sentinel_at is None: return GpsStatus.STABLE - now = time.monotonic_ns() + now = self._clock.monotonic_ns() if (now - sentinel_at) <= 5 * 1_000_000_000: return GpsStatus.SPOOFED return GpsStatus.STABLE diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py index 9cf9a1f..fa404ac 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py @@ -17,8 +17,7 @@ Tests drive the decoder via :meth:`feed_one_tick` which calls the from __future__ import annotations import threading -import time -from typing import Any, Final, Protocol +from typing import TYPE_CHECKING, Any, Final, Protocol from gps_denied_onboard._types.fc import ( AttitudeSample, @@ -31,10 +30,14 @@ from gps_denied_onboard._types.fc import ( TelemetryKind, ) from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import TelemetryRing from gps_denied_onboard.logging import get_logger +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock + __all__ = [ "Msp2InavInboundDecoder", "MspSource", @@ -74,9 +77,11 @@ class Msp2InavInboundDecoder: attitude_ring_capacity: int = 100, gps_ring_capacity: int = 20, state_ring_capacity: int = 10, + clock: Clock | None = None, ) -> None: self._source = source self._bus = bus + self._clock: Clock = clock if clock is not None else WallClock() self._log = get_logger("c8_fc_adapter.inbound_msp2") self.imu_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( imu_ring_capacity, kind_name="imu" @@ -118,10 +123,16 @@ class Msp2InavInboundDecoder: return dispatched def run_poll_loop(self, *, period_s: float = 0.01) -> None: - """Continuous polling loop; honours :meth:`stop`.""" + """Continuous polling loop; honours :meth:`stop`. + + Sleeps via the injected :class:`Clock` so replay binaries (which + wire a ``TlogDerivedClock``) advance instantly while the live + binary blocks for ``period_s`` between ticks. + """ + period_ns = int(period_s * 1_000_000_000) while not self._stop_flag.is_set(): self.feed_one_tick() - time.sleep(period_s) + self._clock.sleep_until_ns(self._clock.monotonic_ns() + period_ns) def stop(self) -> None: self._stop_flag.set() @@ -142,7 +153,7 @@ class Msp2InavInboundDecoder: raise ValueError( f"iNav IMU dict shape: expected 3-vectors, got accel={accel}, gyro={gyro}" ) - sensor_ts_ns = time.monotonic_ns() + sensor_ts_ns = self._clock.monotonic_ns() payload = ImuTelemetrySample(ts_ns=sensor_ts_ns, accel_xyz=accel, gyro_xyz=gyro) return self._dispatch(TelemetryKind.IMU_SAMPLE, payload, ring=self.imu_ring) @@ -157,7 +168,7 @@ class Msp2InavInboundDecoder: roll_rad = float(raw["angx"]) * (3.141592653589793 / 180.0) pitch_rad = float(raw["angy"]) * (3.141592653589793 / 180.0) yaw_rad = float(raw["heading"]) * (3.141592653589793 / 180.0) - sensor_ts_ns = time.monotonic_ns() + sensor_ts_ns = self._clock.monotonic_ns() payload = AttitudeSample( ts_ns=sensor_ts_ns, roll_rad=roll_rad, @@ -180,7 +191,7 @@ class Msp2InavInboundDecoder: status = GpsStatus.DEGRADED else: status = GpsStatus.STABLE - captured_at = time.monotonic_ns() + captured_at = self._clock.monotonic_ns() if fix >= 2: lat_deg = float(raw["lat"]) / 1e7 lon_deg = float(raw["lon"]) / 1e7 @@ -198,7 +209,7 @@ class Msp2InavInboundDecoder: return False # iNav flight-state dict shape (subset we honour): # 'armed': bool, 'in_flight': bool, 'failsafe': bool - captured_at = time.monotonic_ns() + captured_at = self._clock.monotonic_ns() if raw.get("failsafe", False): state = FlightState.FAILED elif raw.get("in_flight", False): @@ -233,7 +244,7 @@ class Msp2InavInboundDecoder: *, ring: TelemetryRing[FcTelemetryFrame], ) -> bool: - received_at = time.monotonic_ns() + received_at = self._clock.monotonic_ns() last = self._last_ts_ns.get(kind) if last is not None and received_at <= last: self._log.warning( diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/mavlink_gcs_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/mavlink_gcs_adapter.py index b1b5b29..4f652fc 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/mavlink_gcs_adapter.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/mavlink_gcs_adapter.py @@ -24,10 +24,9 @@ Build flag: ``BUILD_GCS_QGC_MAVLINK``. from __future__ import annotations import threading -import time from collections.abc import Callable from datetime import datetime, timezone -from typing import Any, Final +from typing import TYPE_CHECKING, Any, Final from gps_denied_onboard._types.fc import ( FcKind, @@ -39,9 +38,13 @@ from gps_denied_onboard._types.fc import ( ) from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.state import EstimatorOutput +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( CovarianceProjector, ) + +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus from gps_denied_onboard.components.c8_fc_adapter.errors import ( GcsAdapterConfigError, @@ -110,14 +113,14 @@ class QgcTelemetryAdapter: wgs_converter: Any, covariance_projector: CovarianceProjector, fdr_client: FdrClient, - clock: Callable[[], float] = time.monotonic, + clock: Clock | None = None, connect_factory: Callable[[str, int], Any] | None = None, ) -> None: self._config = config self._wgs_converter = wgs_converter self._cov_projector = covariance_projector self._fdr_client = fdr_client - self._clock = clock + self._clock: Clock = clock if clock is not None else WallClock() self._connect_factory = connect_factory self._log = get_logger("c8_gcs_adapter.qgc") # The modulo divisor — computed once at construction so unit @@ -333,7 +336,7 @@ class QgcTelemetryAdapter: return OperatorCommand( command=msg_type, payload=payload, - received_at=time.monotonic_ns(), + received_at=self._clock.monotonic_ns(), ) def _record_operator_command_fdr(self, cmd: OperatorCommand, msg: Any) -> None: @@ -374,4 +377,4 @@ class QgcTelemetryAdapter: return wgs def _clock_ms_boot(self) -> int: - return int(self._clock() * 1_000) + return self._clock.monotonic_ns() // 1_000_000 diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py index e6b90f5..af09191 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/msp2_inav_adapter.py @@ -13,9 +13,8 @@ Build flag: ``BUILD_FC_INAV``. from __future__ import annotations import threading -import time from collections.abc import Callable -from typing import Any, Final +from typing import TYPE_CHECKING, Any, Final from gps_denied_onboard._types.emitted import EmittedExternalPosition from gps_denied_onboard._types.fc import ( @@ -29,9 +28,13 @@ from gps_denied_onboard._types.fc import ( ) from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.state import EstimatorOutput +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( CovarianceProjector, ) + +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock from gps_denied_onboard.components.c8_fc_adapter._msp2_sensor_gps_encoder import ( MSP2_SENSOR_GPS_CODE, encode_msp2_sensor_gps, @@ -71,7 +74,7 @@ class Msp2InavAdapter: wgs_converter: Any, covariance_projector: CovarianceProjector, fdr_client: FdrClient, - clock: Callable[[], float] = time.monotonic, + clock: Clock | None = None, msp_connect_factory: Callable[[str, int], Any] | None = None, secondary_mavlink_factory: Callable[[], Any] | None = None, ) -> None: @@ -79,7 +82,7 @@ class Msp2InavAdapter: self._wgs_converter = wgs_converter self._cov_projector = covariance_projector self._fdr_client = fdr_client - self._clock = clock + self._clock: Clock = clock if clock is not None else WallClock() self._msp_connect_factory = msp_connect_factory self._secondary_mavlink_factory = secondary_mavlink_factory self._log = get_logger("c8_fc_adapter.inav_adapter") @@ -94,10 +97,12 @@ class Msp2InavAdapter: # polling decoder lands in AZ-391; the per-adapter inbound # composition happens in a follow-up batch). self._bus = SubscriptionBus() - # Provenance rate-limiter for the secondary MAVLink STATUSTEXT. + # Provenance rate-limiter for the secondary MAVLink STATUSTEXT; + # the limiter expects a float-seconds clock, so we wrap the + # injected Clock's ns reading. self._provenance = StatusTextTransitionRateLimiter( send_statustext=self._send_statustext_secondary, - clock=time.monotonic, + clock=lambda: self._clock.monotonic_ns() / 1_000_000_000, ) # ------------------------------------------------------------------ @@ -165,7 +170,7 @@ class Msp2InavAdapter: raise FcEmitError("smoothed output cannot be emitted to FC (Invariant 6)") h_pos_accuracy_mm = self._cov_projector.to_inav_h_pos_accuracy_mm(output) wgs = self._extract_wgs84(output) - emitted_at = time.monotonic_ns() + emitted_at = self._clock.monotonic_ns() self._sequence_number = (self._sequence_number + 1) & 0xFF seq = self._sequence_number payload = encode_msp2_sensor_gps( @@ -227,7 +232,7 @@ class Msp2InavAdapter: state=FlightState.INIT, last_valid_gps_hint_wgs84=None, last_valid_gps_age_ms=None, - captured_at=time.monotonic_ns(), + captured_at=self._clock.monotonic_ns(), ) # ------------------------------------------------------------------ diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py index 788cf7e..c22b968 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py @@ -22,10 +22,9 @@ from __future__ import annotations import os import secrets import threading -import time from collections.abc import Callable from datetime import datetime, timezone -from typing import Any, Final +from typing import TYPE_CHECKING, Any, Final from gps_denied_onboard._types.emitted import EmittedExternalPosition from gps_denied_onboard._types.fc import ( @@ -39,9 +38,13 @@ from gps_denied_onboard._types.fc import ( ) from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.state import EstimatorOutput +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( CovarianceProjector, ) + +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock from gps_denied_onboard.components.c8_fc_adapter._inbound_mavlink import ( PymavlinkInboundDecoder, ) @@ -94,7 +97,7 @@ class PymavlinkArdupilotAdapter: wgs_converter: Any, covariance_projector: CovarianceProjector, fdr_client: FdrClient, - clock: Callable[[], float] = time.monotonic, + clock: Clock | None = None, flight_id: str = "", connect_factory: Callable[[str, int], Any] | None = None, ) -> None: @@ -102,7 +105,7 @@ class PymavlinkArdupilotAdapter: self._wgs_converter = wgs_converter self._cov_projector = covariance_projector self._fdr_client = fdr_client - self._clock = clock + self._clock: Clock = clock if clock is not None else WallClock() self._flight_id = flight_id self._connect_factory = connect_factory self._signing_failure_threshold = max(1, int(config.fc.signing_failure_threshold)) @@ -122,10 +125,11 @@ class PymavlinkArdupilotAdapter: self._bus = SubscriptionBus() self._inbound: PymavlinkInboundDecoder | None = None self._inbound_thread: threading.Thread | None = None - # Outbound provenance rate limiter. + # Outbound provenance rate limiter; wraps the injected Clock as a + # float-seconds callable (the limiter's existing API contract). self._provenance = StatusTextTransitionRateLimiter( send_statustext=self._send_statustext_internal, - clock=time.monotonic, + clock=self._monotonic_s, ) # ------------------------------------------------------------------ @@ -226,7 +230,7 @@ class PymavlinkArdupilotAdapter: raise FcEmitError("smoothed output cannot be emitted to FC (Invariant 6)") horiz_accuracy_m = self._cov_projector.to_ardupilot_horiz_accuracy_m(output) wgs = self._extract_wgs84(output) - emitted_at = time.monotonic_ns() + emitted_at = self._clock.monotonic_ns() self._sequence_number += 1 seq = self._sequence_number try: @@ -312,7 +316,7 @@ class PymavlinkArdupilotAdapter: if not self._opened or self._connection is None: raise FcEmitError("adapter not opened") self._enforce_single_writer() - now_ns = time.monotonic_ns() + now_ns = self._clock.monotonic_ns() if self._last_switch_attempt_ns: elapsed_s = (now_ns - self._last_switch_attempt_ns) / 1_000_000_000 if elapsed_s < _SWITCH_RATE_LIMIT_S: @@ -388,7 +392,7 @@ class PymavlinkArdupilotAdapter: state=FlightState.INIT, last_valid_gps_hint_wgs84=None, last_valid_gps_age_ms=None, - captured_at=time.monotonic_ns(), + captured_at=self._clock.monotonic_ns(), ) payload = latest.payload assert isinstance(payload, FlightStateSignal) @@ -542,9 +546,9 @@ class PymavlinkArdupilotAdapter: Returns the ACK message on match, or ``None`` on timeout. Other COMMAND_ACK messages (for unrelated commands) are ignored. """ - deadline = self._clock() + (timeout_ms / 1000.0) + deadline = self._monotonic_s() + (timeout_ms / 1000.0) while True: - remaining = deadline - self._clock() + remaining = deadline - self._monotonic_s() if remaining <= 0: return None try: @@ -608,11 +612,14 @@ class PymavlinkArdupilotAdapter: ) return wgs + def _monotonic_s(self) -> float: + return self._clock.monotonic_ns() / 1_000_000_000 + def _clock_us(self) -> int: - return int(self._clock() * 1_000_000) + return self._clock.monotonic_ns() // 1_000 def _clock_ms_boot(self) -> int: - return int(self._clock() * 1_000) + return self._clock.monotonic_ns() // 1_000_000 def _fdr_signing_event(self, *, kind: str, kv: dict[str, Any]) -> None: record = FdrRecord( diff --git a/src/gps_denied_onboard/frame_source/__init__.py b/src/gps_denied_onboard/frame_source/__init__.py index 4f03516..77a98fc 100644 --- a/src/gps_denied_onboard/frame_source/__init__.py +++ b/src/gps_denied_onboard/frame_source/__init__.py @@ -1,9 +1,21 @@ -"""FrameSource interface + concrete implementations. +"""``FrameSource`` cross-cutting interface — public surface (AZ-398 v1.0.0). -The interface is bootstrap-stubbed here. `LiveCameraFrameSource` and -`VideoFileFrameSource` are owned by AZ-398. +Per AC-9, this module re-exports the Protocol and the error family +ONLY. Concrete strategies (``LiveCameraFrameSource``, +``VideoFileFrameSource``) live in their own modules and are imported +LAZILY by ``runtime_root.frame_source_factory.build_frame_source``; +this keeps the lazy-import boundary explicit and lets Tier-0 builds +omit the OpenCV runtime entirely. """ +from gps_denied_onboard.frame_source.errors import ( + FrameSourceConfigError, + FrameSourceError, +) from gps_denied_onboard.frame_source.interface import FrameSource -__all__ = ["FrameSource"] +__all__ = [ + "FrameSource", + "FrameSourceConfigError", + "FrameSourceError", +] diff --git a/src/gps_denied_onboard/frame_source/errors.py b/src/gps_denied_onboard/frame_source/errors.py new file mode 100644 index 0000000..5bf83e3 --- /dev/null +++ b/src/gps_denied_onboard/frame_source/errors.py @@ -0,0 +1,48 @@ +"""``FrameSource`` error taxonomy (AZ-398 v1.0.0). + +Per the replay contract +(``_docs/02_document/contracts/replay/replay_protocol.md``), every +transient I/O failure on the camera path MUST surface as +:class:`FrameSourceError` (Invariant 4 — replay must be deterministic, +silent ``None`` drops are forbidden). + +The two-class hierarchy mirrors the C6/C7/C1 component taxonomies: + +- :class:`FrameSourceError` — operational failures during streaming + (decode error, device disconnect, out-of-order frame). +- :class:`FrameSourceConfigError` — composition-time failures (build + flag OFF, missing dependency, invalid config). +""" + +from __future__ import annotations + + +class FrameSourceError(RuntimeError): + """Transient or fatal failure during frame ingestion. + + Examples: + + - A corrupt H.264 keyframe in the replay video file. + - An ordering violation: ``next_frame()`` returned a frame whose + ``monotonic_ns`` is < the previous frame's (Invariant 3). + - A USB camera disconnect mid-flight (live source). + + The error message MUST identify the frame index or timestamp where + the failure occurred so the operator can correlate against the + upstream recording. + """ + + +class FrameSourceConfigError(RuntimeError): + """Composition-time configuration failure for a frame source. + + Examples: + + - ``BUILD_VIDEO_FILE_FRAME_SOURCE=OFF`` and the binary tried to + construct :class:`VideoFileFrameSource`. + - The configured video path does not exist or is not readable. + - OpenCV is not importable (Tier-0 / docker-minimal build). + """ + + +__all__ = ["FrameSourceError", "FrameSourceConfigError"] diff --git a/src/gps_denied_onboard/frame_source/interface.py b/src/gps_denied_onboard/frame_source/interface.py index 282c608..a392328 100644 --- a/src/gps_denied_onboard/frame_source/interface.py +++ b/src/gps_denied_onboard/frame_source/interface.py @@ -1,18 +1,62 @@ -"""`FrameSource` Protocol. +"""``FrameSource`` Protocol — public Layer 1 cross-cutting interface (AZ-398 v1.0.0). -Owned by AZ-398 (E-DEMO-REPLAY) for the formalisation; bootstrap ships the -interface stub so C1 can be constructor-injected against it. +Frozen per ``_docs/02_document/contracts/replay/replay_protocol.md``. + +Two strategies implement this Protocol: + +- :class:`LiveCameraFrameSource` — the formalised live camera ingest + path (gated ``BUILD_LIVE_CAMERA_FRAME_SOURCE``). +- :class:`VideoFileFrameSource` — the replay-only file decoder (gated + ``BUILD_VIDEO_FILE_FRAME_SOURCE``). + +Consumers (C1 :class:`VioStrategy`) accept a :class:`FrameSource` via +constructor injection so production code stays mode-agnostic +(Invariant 1). """ from __future__ import annotations -from collections.abc import Iterator -from typing import Protocol +from typing import TYPE_CHECKING, Protocol, runtime_checkable -from gps_denied_onboard._types.nav import NavCameraFrame +if TYPE_CHECKING: + from gps_denied_onboard._types.nav import NavCameraFrame +@runtime_checkable class FrameSource(Protocol): - """A source of `NavCameraFrame` instances.""" + """A pluggable camera-frame producer. - def frames(self) -> Iterator[NavCameraFrame]: ... + The Protocol exposes two methods and one ordering invariant: + + - :meth:`next_frame` returns the next :class:`NavCameraFrame` (with + ``metadata["monotonic_ns"]`` set by the strategy from its + injected :class:`Clock`) or ``None`` ONLY when the stream is + permanently exhausted (Invariant 4). + - Consecutive ``next_frame()`` returns MUST have non-decreasing + ``metadata["monotonic_ns"]`` (Invariant 3); out-of-order frames + raise :class:`FrameSourceError`. + - :meth:`close` releases the underlying capture handle and is + idempotent (AC-10). + """ + + def next_frame(self) -> "NavCameraFrame | None": + """Return the next frame, ``None`` on end-of-stream. + + Transient I/O failures (decode error, disconnect) MUST raise + :class:`FrameSourceError` — never return ``None`` silently + (Invariant 4). After ``None`` has been returned once, every + subsequent call MUST also return ``None`` (idempotent EOS). + """ + ... + + def close(self) -> None: + """Release the underlying capture handle. + + Idempotent: a second call is a no-op (AC-10); the strategy + SHOULD log a DEBUG line on the second call so a debug trace + can prove no double-free occurred. + """ + ... + + +__all__ = ["FrameSource"] diff --git a/src/gps_denied_onboard/frame_source/live_camera.py b/src/gps_denied_onboard/frame_source/live_camera.py new file mode 100644 index 0000000..da8a9cf --- /dev/null +++ b/src/gps_denied_onboard/frame_source/live_camera.py @@ -0,0 +1,161 @@ +"""``LiveCameraFrameSource`` — live nav-camera ingest (AZ-398). + +Wraps :class:`cv2.VideoCapture` against an integer device index (the +USB / CSI camera bound at boot by the airborne / research / operator +binaries). The strategy is intentionally minimal: each +:meth:`next_frame` call performs one blocking ``capture.read()`` and +returns the freshest frame; no dedicated decode thread, no ring +buffer. C1 (the only consumer) drives the loop at its target +rate, and a blocking read is the simplest way to apply backpressure. + +Gated by ``BUILD_LIVE_CAMERA_FRAME_SOURCE`` (Invariant 9). The flag is +``ON`` for live / research / operator / replay binaries and ``OFF`` +only for unit tests that need to construct a substitute without +touching a real camera. +""" + +from __future__ import annotations + +import logging +import os +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any + +from gps_denied_onboard.frame_source.errors import ( + FrameSourceConfigError, + FrameSourceError, +) + +if TYPE_CHECKING: + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard.clock import Clock + + +_BUILD_FLAG = "BUILD_LIVE_CAMERA_FRAME_SOURCE" + +_logger = logging.getLogger(__name__) + + +def _build_flag_on() -> bool: + raw = os.environ.get(_BUILD_FLAG, "") + return raw.strip().lower() in {"on", "1", "true", "yes"} + + +class LiveCameraFrameSource: + """Live :class:`FrameSource` strategy backed by ``cv2.VideoCapture``. + + Constructor parameters: + + - ``device_index`` — integer index passed to ``cv2.VideoCapture``; + typically ``0`` for the first attached camera. + - ``camera_calibration_id`` — string identifier baked into every + emitted frame (matches the intrinsics file shipped with the + binary). + - ``clock`` — injected :class:`Clock`; supplies the per-frame + ``monotonic_ns`` ordering key and the wall-clock timestamp. + """ + + __slots__ = ( + "_device_index", + "_camera_calibration_id", + "_clock", + "_capture", + "_frame_counter", + "_last_monotonic_ns", + "_closed", + ) + + def __init__( + self, + *, + device_index: int, + camera_calibration_id: str, + clock: "Clock", + ) -> None: + if not _build_flag_on(): + raise FrameSourceConfigError( + f"{_BUILD_FLAG} is OFF in this binary; " + "LiveCameraFrameSource is unavailable." + ) + try: + import cv2 as _cv2 + except ImportError as exc: + raise FrameSourceConfigError( + "LiveCameraFrameSource requires opencv-python; not " + "importable in this binary." + ) from exc + capture = _cv2.VideoCapture(device_index) + if not capture.isOpened(): + capture.release() + raise FrameSourceConfigError( + f"LiveCameraFrameSource: cv2.VideoCapture could not open " + f"device index {device_index}" + ) + self._device_index = device_index + self._camera_calibration_id = camera_calibration_id + self._clock = clock + self._capture = capture + self._frame_counter = 0 + self._last_monotonic_ns = -1 + self._closed = False + + def next_frame(self) -> "NavCameraFrame | None": + from gps_denied_onboard._types.nav import NavCameraFrame + + if self._closed: + return None + ok, image = self._capture.read() + if not ok or image is None: + # Live camera: a failed read is a transient error (USB + # glitch, driver hiccup). Invariant 4 requires raising, + # not returning None — the only legitimate None is EOS, + # and a live camera never EOSes. + raise FrameSourceError( + f"LiveCameraFrameSource: cv2.VideoCapture.read failed at " + f"frame {self._frame_counter} (device " + f"{self._device_index})" + ) + monotonic_ns = self._clock.monotonic_ns() + if monotonic_ns < self._last_monotonic_ns: + raise FrameSourceError( + f"LiveCameraFrameSource: clock went backwards at frame " + f"{self._frame_counter}: {monotonic_ns} ns followed " + f"{self._last_monotonic_ns} ns (Invariant 3)" + ) + timestamp = datetime.fromtimestamp( + self._clock.time_ns() / 1e9, tz=timezone.utc + ) + metadata: dict[str, Any] = { + "monotonic_ns": monotonic_ns, + "source": "live_camera", + "device_index": self._device_index, + } + frame = NavCameraFrame( + frame_id=self._frame_counter, + timestamp=timestamp, + image=image, + camera_calibration_id=self._camera_calibration_id, + metadata=metadata, + ) + self._frame_counter += 1 + self._last_monotonic_ns = monotonic_ns + return frame + + def close(self) -> None: + if self._closed: + _logger.debug( + "LiveCameraFrameSource(device=%s) close called twice; no-op", + self._device_index, + ) + return + self._closed = True + try: + self._capture.release() + except Exception: # pragma: no cover — defensive. + _logger.exception( + "LiveCameraFrameSource(device=%s) cv2.release() raised", + self._device_index, + ) + + +__all__ = ["LiveCameraFrameSource"] diff --git a/src/gps_denied_onboard/frame_source/video_file.py b/src/gps_denied_onboard/frame_source/video_file.py new file mode 100644 index 0000000..bdf8178 --- /dev/null +++ b/src/gps_denied_onboard/frame_source/video_file.py @@ -0,0 +1,199 @@ +"""``VideoFileFrameSource`` — replay-only file decoder (AZ-398). + +Streams an MP4 / MKV / AVI file frame-by-frame via OpenCV's +:class:`cv2.VideoCapture`. Each emitted :class:`NavCameraFrame` +carries: + +- ``frame_id`` — a strictly-increasing counter starting at 0. +- ``timestamp`` — UTC wall-clock at decode time (from the injected + :class:`Clock`); the file's own pts is NOT used for this field + because replay deterministically remaps it. +- ``image`` — the decoded BGR ``numpy.ndarray`` (OpenCV native order). +- ``metadata["monotonic_ns"]`` — the injected :class:`Clock`'s + ``monotonic_ns()`` at decode time. This is the value AC-2 asserts + non-decreasing. +- ``metadata["source_pts_ns"]`` — the file's per-frame PTS in ns (the + ``CAP_PROP_POS_MSEC`` reading × 1e6) for downstream determinism. + +Gated by ``BUILD_VIDEO_FILE_FRAME_SOURCE`` (Invariant 9). The check is +performed at constructor entry — a Tier-0 build that imports this +module by accident still raises ``FrameSourceConfigError`` cleanly +without attempting an OpenCV import. +""" + +from __future__ import annotations + +import logging +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from gps_denied_onboard.frame_source.errors import ( + FrameSourceConfigError, + FrameSourceError, +) + +if TYPE_CHECKING: + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard.clock import Clock + + +_BUILD_FLAG = "BUILD_VIDEO_FILE_FRAME_SOURCE" + +_logger = logging.getLogger(__name__) + + +def _build_flag_on() -> bool: + """``ON`` / ``1`` / ``true`` / ``yes`` (case-insensitive) → ``True``.""" + raw = os.environ.get(_BUILD_FLAG, "") + return raw.strip().lower() in {"on", "1", "true", "yes"} + + +class VideoFileFrameSource: + """Replay :class:`FrameSource` strategy backed by ``cv2.VideoCapture``. + + Stream-decodes a video file; per-frame decode is amortised by + OpenCV's internal buffer. The strategy preserves the file's frame + order — there is no seek, no random-access path; this keeps + replay deterministic (Invariant 10). + + Constructor parameters: + + - ``path`` — filesystem path to an MP4/MKV/AVI (existence checked + at construction). + - ``camera_calibration_id`` — string identifier propagated into + every emitted :class:`NavCameraFrame` so downstream consumers + (C1, C3, C4) load the correct intrinsics for the recording. + - ``clock`` — injected :class:`Clock`; the strategy reads + ``clock.monotonic_ns()`` per emitted frame for the + ``metadata["monotonic_ns"]`` ordering field. + """ + + __slots__ = ( + "_path", + "_camera_calibration_id", + "_clock", + "_capture", + "_frame_counter", + "_last_monotonic_ns", + "_closed", + "_eos_returned", + ) + + def __init__( + self, + *, + path: Path | str, + camera_calibration_id: str, + clock: "Clock", + ) -> None: + if not _build_flag_on(): + raise FrameSourceConfigError( + f"{_BUILD_FLAG} is OFF in this binary; " + "VideoFileFrameSource is unavailable. Rebuild with the " + "flag set to ON in the replay binary's Dockerfile." + ) + resolved = Path(path) + if not resolved.is_file(): + raise FrameSourceConfigError( + f"VideoFileFrameSource: path {resolved!s} does not exist " + "or is not a regular file." + ) + try: + import cv2 as _cv2 + except ImportError as exc: + raise FrameSourceConfigError( + "VideoFileFrameSource requires opencv-python; not " + "importable in this binary." + ) from exc + capture = _cv2.VideoCapture(str(resolved)) + if not capture.isOpened(): + capture.release() + raise FrameSourceConfigError( + f"VideoFileFrameSource: cv2.VideoCapture could not open " + f"{resolved!s} (unsupported codec or corrupt header)." + ) + self._path = resolved + self._camera_calibration_id = camera_calibration_id + self._clock = clock + self._capture = capture + self._frame_counter = 0 + self._last_monotonic_ns = -1 + self._closed = False + self._eos_returned = False + + def next_frame(self) -> "NavCameraFrame | None": + from gps_denied_onboard._types.nav import NavCameraFrame + + if self._closed or self._eos_returned: + return None + try: + import cv2 as _cv2 + except ImportError as exc: # pragma: no cover — established at __init__. + raise FrameSourceError( + "VideoFileFrameSource: opencv-python disappeared between " + "construction and next_frame()" + ) from exc + ok, image = self._capture.read() + if not ok: + self._eos_returned = True + return None + if image is None: + # OpenCV's read() returning ok=True with image=None signals a + # decoder-internal failure for the current frame; treat as a + # transient error per Invariant 4 rather than silently + # advancing. + raise FrameSourceError( + f"VideoFileFrameSource: video decode failed at frame " + f"{self._frame_counter} (cv2.VideoCapture.read returned " + "ok=True with image=None)" + ) + monotonic_ns = self._clock.monotonic_ns() + if monotonic_ns < self._last_monotonic_ns: + raise FrameSourceError( + f"VideoFileFrameSource: clock went backwards at frame " + f"{self._frame_counter}: {monotonic_ns} ns followed " + f"{self._last_monotonic_ns} ns (Invariant 3)" + ) + pos_msec = float(self._capture.get(_cv2.CAP_PROP_POS_MSEC)) + source_pts_ns = int(pos_msec * 1_000_000) + timestamp = datetime.fromtimestamp( + self._clock.time_ns() / 1e9, tz=timezone.utc + ) + metadata: dict[str, Any] = { + "monotonic_ns": monotonic_ns, + "source_pts_ns": source_pts_ns, + "source": "video_file", + } + frame = NavCameraFrame( + frame_id=self._frame_counter, + timestamp=timestamp, + image=image, + camera_calibration_id=self._camera_calibration_id, + metadata=metadata, + ) + self._frame_counter += 1 + self._last_monotonic_ns = monotonic_ns + return frame + + def close(self) -> None: + if self._closed: + _logger.debug( + "VideoFileFrameSource(%s) close called twice; no-op", + self._path, + ) + return + self._closed = True + try: + self._capture.release() + except Exception: # pragma: no cover — defensive. + # cv2.VideoCapture.release should never raise; if it does on + # an exotic backend, we still want to flag the source as + # closed so a second close() stays a no-op. + _logger.exception( + "VideoFileFrameSource(%s) cv2.release() raised", self._path + ) + + +__all__ = ["VideoFileFrameSource"] diff --git a/src/gps_denied_onboard/runtime_root/clock_factory.py b/src/gps_denied_onboard/runtime_root/clock_factory.py new file mode 100644 index 0000000..00fadb0 --- /dev/null +++ b/src/gps_denied_onboard/runtime_root/clock_factory.py @@ -0,0 +1,61 @@ +"""Composition-root :class:`Clock` factory (AZ-398). + +Composition resolves :class:`Clock` exactly once per process per +Invariant — Single Clock per process. Live / research / operator +binaries call :func:`build_clock(kind="wall")`; the replay binary +calls :func:`build_clock(kind="tlog", source=...)` (the replay +composition root, AZ-401, wires the tlog timestamp source). + +Concrete strategy modules (``wall_clock``, ``tlog_derived``) live +under :mod:`gps_denied_onboard.clock`; they are imported eagerly here +because the Clock has no Tier-specific runtime dependency and the +selection happens at startup. +""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable +from typing import TYPE_CHECKING + +from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock +from gps_denied_onboard.clock.wall_clock import WallClock + +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock + + +__all__ = ["build_clock"] + + +def build_clock( + *, + kind: str = "wall", + source: Callable[[], int] | Iterable[int] | None = None, +) -> "Clock": + """Construct the :class:`Clock` strategy for this process. + + ``kind`` is one of ``"wall"`` (default) or ``"tlog"``. ``source`` is + required when ``kind == "tlog"`` (it carries the tlog parser's + timestamp stream) and forbidden otherwise. + + Raises :class:`ValueError` on an unknown ``kind`` or a misconfigured + source — neither is recoverable, so failing loudly at composition + time is correct. + """ + if kind == "wall": + if source is not None: + raise ValueError( + "build_clock(kind='wall'): source must be None; " + "WallClock takes no upstream timestamp stream." + ) + return WallClock() + if kind == "tlog": + if source is None: + raise ValueError( + "build_clock(kind='tlog'): source is required (the tlog " + "timestamp stream from the replay parser)." + ) + return TlogDerivedClock(source) + raise ValueError( + f"build_clock: unknown kind {kind!r}; expected 'wall' or 'tlog'" + ) diff --git a/src/gps_denied_onboard/runtime_root/frame_source_factory.py b/src/gps_denied_onboard/runtime_root/frame_source_factory.py new file mode 100644 index 0000000..020e804 --- /dev/null +++ b/src/gps_denied_onboard/runtime_root/frame_source_factory.py @@ -0,0 +1,91 @@ +"""Composition-root :class:`FrameSource` factory (AZ-398). + +Selects exactly one :class:`FrameSource` strategy per binary based on +the requested ``kind`` and the compile-time ``BUILD_*`` flags. The +concrete strategy modules are imported lazily so a Tier-0 build with +``BUILD_LIVE_CAMERA_FRAME_SOURCE=OFF`` and +``BUILD_VIDEO_FILE_FRAME_SOURCE=OFF`` never pulls OpenCV into +``sys.modules`` (Invariant 9 — verifiable via ``sys.modules``). + +Build-flag gating happens INSIDE the strategy constructor (so unit +tests that monkey-patch the env still hit the gate); this factory +performs the strategy-name → module mapping only. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from gps_denied_onboard.frame_source.errors import FrameSourceConfigError + +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock + from gps_denied_onboard.frame_source import FrameSource + + +__all__ = ["build_frame_source"] + + +def build_frame_source( + *, + kind: str, + camera_calibration_id: str, + clock: "Clock", + device_index: int | None = None, + video_path: Path | str | None = None, +) -> "FrameSource": + """Construct the :class:`FrameSource` strategy. + + ``kind`` is one of ``"live"`` or ``"video_file"``: + + - ``"live"`` requires ``device_index`` (integer camera index) and + forbids ``video_path``. + - ``"video_file"`` requires ``video_path`` (filesystem path) and + forbids ``device_index``. + + Build-flag gating is enforced by the strategy constructor; this + factory raises :class:`FrameSourceConfigError` ONLY on argument- + shape mistakes (missing or extra parameters for the chosen kind). + """ + if kind == "live": + if device_index is None: + raise FrameSourceConfigError( + "build_frame_source(kind='live'): device_index is required" + ) + if video_path is not None: + raise FrameSourceConfigError( + "build_frame_source(kind='live'): video_path must be None" + ) + from gps_denied_onboard.frame_source.live_camera import ( + LiveCameraFrameSource, + ) + + return LiveCameraFrameSource( + device_index=device_index, + camera_calibration_id=camera_calibration_id, + clock=clock, + ) + if kind == "video_file": + if video_path is None: + raise FrameSourceConfigError( + "build_frame_source(kind='video_file'): video_path is required" + ) + if device_index is not None: + raise FrameSourceConfigError( + "build_frame_source(kind='video_file'): " + "device_index must be None" + ) + from gps_denied_onboard.frame_source.video_file import ( + VideoFileFrameSource, + ) + + return VideoFileFrameSource( + path=video_path, + camera_calibration_id=camera_calibration_id, + clock=clock, + ) + raise FrameSourceConfigError( + f"build_frame_source: unknown kind {kind!r}; " + "expected 'live' or 'video_file'" + ) diff --git a/tests/_meta/__init__.py b/tests/_meta/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/_meta/test_no_direct_time_in_components.py b/tests/_meta/test_no_direct_time_in_components.py new file mode 100644 index 0000000..5ccf184 --- /dev/null +++ b/tests/_meta/test_no_direct_time_in_components.py @@ -0,0 +1,105 @@ +"""AC-4 — components MUST NOT call ``time.monotonic_ns`` / ``time.time_ns`` / ``time.sleep``. + +Enforces Invariant 2 of the replay contract +(``_docs/02_document/contracts/replay/replay_protocol.md``): every +time-driven code path in a C* component consumes an injected +:class:`Clock` instead. Replay determinism (R-DEMO-4) collapses the +moment a component reaches into the stdlib ``time`` module directly, +so this guard runs on every PR touching ``src/gps_denied_onboard/components/``. + +The scan is AST-based — docstrings and comments mentioning the forbidden +APIs do NOT trip it; only call sites and attribute references do. +""" + +from __future__ import annotations + +import ast +from pathlib import Path + +import pytest + +_FORBIDDEN_ATTRS: frozenset[str] = frozenset( + {"monotonic_ns", "time_ns", "sleep"} +) + +_COMPONENTS_ROOT: Path = ( + Path(__file__).parent.parent.parent + / "src" + / "gps_denied_onboard" + / "components" +) + + +def _python_files_under(root: Path) -> list[Path]: + return sorted(p for p in root.rglob("*.py") if p.is_file()) + + +def _find_direct_time_references(source: str) -> list[tuple[int, str]]: + """Return ``(lineno, attribute_name)`` for every direct ``time.X`` ref. + + Only flags ``ast.Attribute(value=ast.Name(id='time'), attr=)`` + where ```` is one of the forbidden names. Aliased imports + (``import time as t`` → ``t.monotonic_ns()``) are intentionally NOT + caught — the component code convention is to avoid such aliases, and + catching them would require flow-sensitive analysis. + """ + hits: list[tuple[int, str]] = [] + tree = ast.parse(source) + for node in ast.walk(tree): + if not isinstance(node, ast.Attribute): + continue + if not isinstance(node.value, ast.Name): + continue + if node.value.id != "time": + continue + if node.attr in _FORBIDDEN_ATTRS: + hits.append((node.lineno, f"time.{node.attr}")) + return hits + + +def test_components_have_no_direct_time_references() -> None: + # Arrange + files = _python_files_under(_COMPONENTS_ROOT) + assert files, f"AST scan found no .py files under {_COMPONENTS_ROOT}" + offences: list[str] = [] + # Act + for file in files: + source = file.read_text(encoding="utf-8") + for lineno, attr in _find_direct_time_references(source): + rel = file.relative_to(_COMPONENTS_ROOT.parent.parent.parent.parent) + offences.append(f"{rel}:{lineno} — {attr}") + # Assert + assert not offences, ( + "Invariant 2 violation: direct stdlib-`time` references found in " + "`src/gps_denied_onboard/components/**/*.py`. Consume an injected " + "`Clock` (`gps_denied_onboard.clock`) instead.\n" + + "\n".join(offences) + ) + + +def test_scan_helper_detects_known_forbidden_pattern() -> None: + # Arrange — self-check the AST helper so a stale scan can't silently pass. + source = "import time\ndef f() -> int:\n return time.monotonic_ns()\n" + # Act + hits = _find_direct_time_references(source) + # Assert + assert hits == [(3, "time.monotonic_ns")] + + +def test_scan_helper_ignores_docstring_mentions() -> None: + # Arrange — docstrings naming the forbidden API must not trip the scan. + source = '"""This module talks about time.monotonic_ns in prose only."""\n' + # Act + hits = _find_direct_time_references(source) + # Assert + assert hits == [] + + +@pytest.mark.parametrize("forbidden", sorted(_FORBIDDEN_ATTRS)) +def test_scan_helper_detects_each_forbidden_attr(forbidden: str) -> None: + # Arrange + source = f"import time\ntime.{forbidden}()\n" + # Act + hits = _find_direct_time_references(source) + # Assert + assert hits == [(2, f"time.{forbidden}")] diff --git a/tests/unit/c5_state/test_az388_fallback_watcher.py b/tests/unit/c5_state/test_az388_fallback_watcher.py index 9e9e33a..33489b1 100644 --- a/tests/unit/c5_state/test_az388_fallback_watcher.py +++ b/tests/unit/c5_state/test_az388_fallback_watcher.py @@ -213,7 +213,7 @@ def test_ac6_custom_threshold_5s_engages_at_5s() -> None: def test_ac6_zero_threshold_rejected() -> None: with pytest.raises(ValueError, match="threshold_s must be > 0"): - FallbackWatcher(threshold_s=0.0, fdr_client=None) + FallbackWatcher(threshold_s=0.0, fdr_client=None, clock_ns=lambda: 0) # --------------------------------------------------------------------- @@ -378,17 +378,14 @@ def test_ac7_isam2_current_estimate_entry_engages_after_threshold() -> None: # and call current_estimate WITHOUT a seeded prior so it raises # EstimatorFatalError after the entry hook engages fallback. estimator._fallback._last_successful_estimate_ns = 0 - # Patch monotonic_ns inside the estimator module so the entry - # hook sees the synthesised "now". + # Patch the estimator's injected Clock so the entry hook sees the + # synthesised "now" (AZ-398: components consume an injected + # :class:`Clock`, not :func:`time.monotonic_ns`). from gps_denied_onboard.components.c5_state.errors import EstimatorFatalError - with ( - mock.patch( - "gps_denied_onboard.components.c5_state.gtsam_isam2_estimator.time.monotonic_ns", - return_value=int(4.0 * 1e9), - ), - pytest.raises(EstimatorFatalError), - ): + estimator._clock = mock.MagicMock() + estimator._clock.monotonic_ns.return_value = int(4.0 * 1e9) + with pytest.raises(EstimatorFatalError): estimator.current_estimate() assert len(engaged_seen) == 1 diff --git a/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py b/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py index fc5585f..3128209 100644 --- a/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py +++ b/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py @@ -70,6 +70,22 @@ class _ConnStub: self.closed = True +class _FixedClock: + """Test :class:`Clock` stand-in returning constant ``monotonic_ns``.""" + + def __init__(self, ns: int) -> None: + self._ns = ns + + def monotonic_ns(self) -> int: + return self._ns + + def time_ns(self) -> int: + return self._ns + + def sleep_until_ns(self, target_ns: int) -> None: + return None + + @pytest.fixture def conn() -> _ConnStub: return _ConnStub() @@ -85,7 +101,7 @@ def adapter(conn: _ConnStub, tmp_path) -> PymavlinkArdupilotAdapter: wgs_converter=mock.MagicMock(), covariance_projector=cov, fdr_client=fdr, - clock=lambda: 1.0, + clock=_FixedClock(1_000_000_000), flight_id="flt-test", connect_factory=lambda device, baud: conn, ) diff --git a/tests/unit/clock/__init__.py b/tests/unit/clock/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/clock/test_protocol_conformance.py b/tests/unit/clock/test_protocol_conformance.py new file mode 100644 index 0000000..d88419c --- /dev/null +++ b/tests/unit/clock/test_protocol_conformance.py @@ -0,0 +1,180 @@ +"""AZ-398 — :class:`Clock` Protocol conformance + WallClock parity + TlogDerivedClock semantics.""" + +from __future__ import annotations + +import time + +import pytest + +from gps_denied_onboard.clock import Clock +from gps_denied_onboard.clock.tlog_derived import ( + ClockOrderingError, + TlogDerivedClock, +) +from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.runtime_root.clock_factory import build_clock + + +# --------------------------------------------------------------------------- +# AC-1 — Protocol conformance. + + +def test_wall_clock_satisfies_clock_protocol() -> None: + # Assert + assert isinstance(WallClock(), Clock) + + +def test_tlog_derived_clock_satisfies_clock_protocol() -> None: + # Assert + assert isinstance(TlogDerivedClock([1, 2, 3]), Clock) + + +# --------------------------------------------------------------------------- +# AC-5 — WallClock parity with :mod:`time`. + + +def test_wall_clock_monotonic_ns_tracks_stdlib() -> None: + # Arrange + clock = WallClock() + # Act + stdlib_before = time.monotonic_ns() + clock_now = clock.monotonic_ns() + stdlib_after = time.monotonic_ns() + # Assert + assert stdlib_before <= clock_now <= stdlib_after + + +def test_wall_clock_time_ns_tracks_stdlib_within_1ms() -> None: + # Arrange + clock = WallClock() + # Act + stdlib = time.time_ns() + clock_now = clock.time_ns() + # Assert + assert abs(clock_now - stdlib) <= 1_000_000 + + +def test_wall_clock_sleep_until_ns_blocks_for_about_100ms() -> None: + # Arrange + clock = WallClock() + # Act + start = time.monotonic_ns() + target = start + 100_000_000 # 100 ms in the future + clock.sleep_until_ns(target) + elapsed_ns = time.monotonic_ns() - start + # Assert — AC-5 allows ±5 ms slack on a 100 ms sleep + assert 95_000_000 <= elapsed_ns <= 200_000_000 + + +def test_wall_clock_sleep_until_past_target_is_noop() -> None: + # Arrange + clock = WallClock() + past = clock.monotonic_ns() - 10_000_000_000 # 10 s ago + # Act + start = time.monotonic_ns() + clock.sleep_until_ns(past) + elapsed_ns = time.monotonic_ns() - start + # Assert — should return almost immediately (no negative sleep) + assert elapsed_ns < 5_000_000 # < 5 ms + + +# --------------------------------------------------------------------------- +# AC-6 — TlogDerivedClock advance-on-call semantics. + + +def test_tlog_derived_clock_advances_per_call() -> None: + # Arrange + clock = TlogDerivedClock([1_000_000, 2_000_000, 3_000_000]) + # Act + a = clock.monotonic_ns() + b = clock.monotonic_ns() + c = clock.monotonic_ns() + # Assert + assert (a, b, c) == (1_000_000, 2_000_000, 3_000_000) + + +def test_tlog_derived_clock_time_ns_reflects_last_advance() -> None: + # Arrange + clock = TlogDerivedClock([42]) + # Act + before = clock.time_ns() + clock.monotonic_ns() + after = clock.time_ns() + # Assert + assert (before, after) == (0, 42) + + +def test_tlog_derived_clock_raises_on_non_monotonic_source() -> None: + # Arrange + clock = TlogDerivedClock([10, 5]) + clock.monotonic_ns() + # Act + Assert + with pytest.raises(ClockOrderingError): + clock.monotonic_ns() + + +def test_tlog_derived_clock_sleep_until_ns_is_noop() -> None: + # Arrange + clock = TlogDerivedClock([1]) + start = time.monotonic_ns() + # Act + clock.sleep_until_ns(10**18) # absurdly far in the future + elapsed_ns = time.monotonic_ns() - start + # Assert + assert elapsed_ns < 5_000_000 # < 5 ms + + +def test_tlog_derived_clock_accepts_callable_source() -> None: + # Arrange + counter = {"i": 0} + + def source() -> int: + counter["i"] += 1 + return counter["i"] * 1_000_000 + + clock = TlogDerivedClock(source) + # Act + a = clock.monotonic_ns() + b = clock.monotonic_ns() + # Assert + assert (a, b) == (1_000_000, 2_000_000) + + +def test_tlog_derived_clock_returns_last_value_when_source_exhausted() -> None: + # Arrange + clock = TlogDerivedClock([5]) + # Act + first = clock.monotonic_ns() + second = clock.monotonic_ns() + # Assert — exhausted source returns the latched value, not an error + assert (first, second) == (5, 5) + + +# --------------------------------------------------------------------------- +# Composition-root factory (build_clock). + + +def test_build_clock_wall_returns_wall_clock() -> None: + # Assert + assert isinstance(build_clock(kind="wall"), WallClock) + + +def test_build_clock_tlog_returns_tlog_derived_clock() -> None: + # Assert + assert isinstance(build_clock(kind="tlog", source=[1, 2]), TlogDerivedClock) + + +def test_build_clock_rejects_unknown_kind() -> None: + # Act + Assert + with pytest.raises(ValueError, match="unknown kind"): + build_clock(kind="invalid") # type: ignore[arg-type] + + +def test_build_clock_wall_rejects_source() -> None: + with pytest.raises(ValueError, match="source must be None"): + build_clock(kind="wall", source=[1]) + + +def test_build_clock_tlog_requires_source() -> None: + with pytest.raises(ValueError, match="source is required"): + build_clock(kind="tlog") diff --git a/tests/unit/frame_source/__init__.py b/tests/unit/frame_source/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/frame_source/test_protocol_conformance.py b/tests/unit/frame_source/test_protocol_conformance.py new file mode 100644 index 0000000..d6d6651 --- /dev/null +++ b/tests/unit/frame_source/test_protocol_conformance.py @@ -0,0 +1,303 @@ +"""AZ-398 — :class:`FrameSource` Protocol conformance + concrete strategy ACs.""" + +from __future__ import annotations + +from pathlib import Path + +import cv2 +import numpy as np +import pytest + +from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.frame_source import ( + FrameSource, + FrameSourceConfigError, +) +from gps_denied_onboard.frame_source.live_camera import LiveCameraFrameSource +from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource + + +# --------------------------------------------------------------------------- +# Helpers. + + +def _make_synthetic_video(path: Path, n_frames: int, fps: int = 30) -> None: + """Write an ``n_frames``-frame 64×48 BGR MP4V at ``path``.""" + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + writer = cv2.VideoWriter(str(path), fourcc, fps, (64, 48)) + if not writer.isOpened(): + raise RuntimeError(f"OpenCV could not open writer at {path!s}") + try: + for i in range(n_frames): + frame = np.full((48, 64, 3), i % 256, dtype=np.uint8) + writer.write(frame) + finally: + writer.release() + + +@pytest.fixture +def video_path_60(tmp_path: Path) -> Path: + """A synthetic 60-frame .mp4 file for AC-2.""" + path = tmp_path / "az398_synthetic.mp4" + _make_synthetic_video(path, n_frames=60) + return path + + +@pytest.fixture +def enable_video_flag(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("BUILD_VIDEO_FILE_FRAME_SOURCE", "ON") + + +@pytest.fixture +def disable_video_flag(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("BUILD_VIDEO_FILE_FRAME_SOURCE", "OFF") + + +@pytest.fixture +def disable_live_flag(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("BUILD_LIVE_CAMERA_FRAME_SOURCE", "OFF") + + +# --------------------------------------------------------------------------- +# AC-1 — Protocol conformance. + + +def test_video_file_frame_source_satisfies_frame_source_protocol( + enable_video_flag: None, video_path_60: Path +) -> None: + # Arrange + Act + source = VideoFileFrameSource( + path=video_path_60, + camera_calibration_id="az398-synth", + clock=WallClock(), + ) + try: + # Assert + assert isinstance(source, FrameSource) + finally: + source.close() + + +# --------------------------------------------------------------------------- +# AC-2 — VideoFileFrameSource produces 60 ordered frames + idempotent EOS. + + +def test_video_file_frame_source_emits_60_frames_then_none( + enable_video_flag: None, video_path_60: Path +) -> None: + # Arrange + source = VideoFileFrameSource( + path=video_path_60, + camera_calibration_id="az398-synth", + clock=WallClock(), + ) + monotonics: list[int] = [] + try: + # Act + for _ in range(60): + frame = source.next_frame() + assert frame is not None + monotonics.append(frame.metadata["monotonic_ns"]) + # AC-2: 61st call → None; subsequent calls also None + eos_first = source.next_frame() + eos_second = source.next_frame() + finally: + source.close() + # Assert + assert eos_first is None + assert eos_second is None + assert len(monotonics) == 60 + # Non-decreasing monotonic_ns ordering (Invariant 3 / AC-2) + assert all(b >= a for a, b in zip(monotonics, monotonics[1:], strict=False)) + + +def test_video_file_frame_source_emits_frame_id_counter_and_metadata( + enable_video_flag: None, video_path_60: Path +) -> None: + # Arrange + source = VideoFileFrameSource( + path=video_path_60, + camera_calibration_id="az398-synth", + clock=WallClock(), + ) + try: + # Act + first = source.next_frame() + second = source.next_frame() + finally: + source.close() + # Assert + assert first is not None and second is not None + assert first.frame_id == 0 + assert second.frame_id == 1 + assert first.camera_calibration_id == "az398-synth" + assert first.metadata["source"] == "video_file" + assert "monotonic_ns" in first.metadata + assert "source_pts_ns" in first.metadata + + +# --------------------------------------------------------------------------- +# AC-7 — corrupt video file raises FrameSourceConfigError on construction. + + +def test_video_file_frame_source_rejects_corrupt_file( + enable_video_flag: None, tmp_path: Path +) -> None: + # Arrange + corrupt = tmp_path / "garbage.mp4" + corrupt.write_bytes(b"not actually mp4 content" * 256) + # Act + Assert + with pytest.raises(FrameSourceConfigError): + VideoFileFrameSource( + path=corrupt, + camera_calibration_id="az398-corrupt", + clock=WallClock(), + ) + + +def test_video_file_frame_source_rejects_missing_path( + enable_video_flag: None, tmp_path: Path +) -> None: + # Act + Assert + with pytest.raises(FrameSourceConfigError, match="does not exist"): + VideoFileFrameSource( + path=tmp_path / "missing.mp4", + camera_calibration_id="az398-missing", + clock=WallClock(), + ) + + +# --------------------------------------------------------------------------- +# AC-8 — Build-flag gating. + + +def test_video_file_frame_source_refuses_when_build_flag_off( + disable_video_flag: None, tmp_path: Path +) -> None: + # Arrange — create a real file so the gate is exercised before path checks + valid = tmp_path / "any.mp4" + valid.write_bytes(b"") + # Act + Assert + with pytest.raises( + FrameSourceConfigError, match="BUILD_VIDEO_FILE_FRAME_SOURCE is OFF" + ): + VideoFileFrameSource( + path=valid, + camera_calibration_id="az398-gate", + clock=WallClock(), + ) + + +def test_live_camera_frame_source_refuses_when_build_flag_off( + disable_live_flag: None, +) -> None: + # Act + Assert + with pytest.raises( + FrameSourceConfigError, match="BUILD_LIVE_CAMERA_FRAME_SOURCE is OFF" + ): + LiveCameraFrameSource( + device_index=0, + camera_calibration_id="az398-live-gate", + clock=WallClock(), + ) + + +# --------------------------------------------------------------------------- +# AC-9 — Public API re-exports. + + +def test_frame_source_public_module_only_exposes_protocol_and_errors() -> None: + # Arrange + from gps_denied_onboard import frame_source as module + + # Assert — concrete strategies MUST NOT appear in __all__ per AC-9 + assert "FrameSource" in module.__all__ + assert "FrameSourceError" in module.__all__ + assert "FrameSourceConfigError" in module.__all__ + assert "LiveCameraFrameSource" not in module.__all__ + assert "VideoFileFrameSource" not in module.__all__ + + +# --------------------------------------------------------------------------- +# AC-10 — close is idempotent. + + +def test_video_file_frame_source_close_is_idempotent( + enable_video_flag: None, video_path_60: Path +) -> None: + # Arrange + source = VideoFileFrameSource( + path=video_path_60, + camera_calibration_id="az398-synth", + clock=WallClock(), + ) + # Act — closing twice must not raise (AC-10) + source.close() + source.close() + # Assert — next_frame after close returns None, not an exception + assert source.next_frame() is None + + +# --------------------------------------------------------------------------- +# Factory. + + +def test_build_frame_source_video_file_returns_video_file_source( + enable_video_flag: None, video_path_60: Path +) -> None: + from gps_denied_onboard.runtime_root.frame_source_factory import ( + build_frame_source, + ) + + source = build_frame_source( + kind="video_file", + camera_calibration_id="az398-factory", + clock=WallClock(), + video_path=video_path_60, + ) + try: + # Assert + assert isinstance(source, VideoFileFrameSource) + finally: + source.close() + + +def test_build_frame_source_rejects_unknown_kind() -> None: + from gps_denied_onboard.runtime_root.frame_source_factory import ( + build_frame_source, + ) + + with pytest.raises(FrameSourceConfigError, match="unknown kind"): + build_frame_source( + kind="invalid", # type: ignore[arg-type] + camera_calibration_id="x", + clock=WallClock(), + ) + + +def test_build_frame_source_live_rejects_video_path(enable_video_flag: None) -> None: + from gps_denied_onboard.runtime_root.frame_source_factory import ( + build_frame_source, + ) + + with pytest.raises(FrameSourceConfigError, match="video_path must be None"): + build_frame_source( + kind="live", + camera_calibration_id="x", + clock=WallClock(), + device_index=0, + video_path="/tmp/whatever.mp4", + ) + + +def test_build_frame_source_video_file_requires_video_path() -> None: + from gps_denied_onboard.runtime_root.frame_source_factory import ( + build_frame_source, + ) + + with pytest.raises(FrameSourceConfigError, match="video_path is required"): + build_frame_source( + kind="video_file", + camera_calibration_id="x", + clock=WallClock(), + )