[AZ-302] C7 ThermalStatePublisher — jtop/NVML 1 Hz background poller

Implements AZ-297 InferenceRuntime's thermal_state() side: a singleton
background-thread publisher that polls jtop (preferred) or pynvml
(fallback) at config.thermal_poll_hz, stores an atomic ThermalState
snapshot, and emits c7.thermal_transition FDR records on every throttle
flip with a WARN log on entry and an INFO log on exit. Default-safe on
TelemetryUnavailableError per Invariant I-6 with a 1-Hz rate-limited
WARN.

Sources return a raw ThermalReading; the publisher stamps measured_at_ns
via its injected Clock so _JtopSource / _PynvmlSource stay clean of
direct time.* calls (Invariant 2). _poll_once is the deterministic test
seam — start() spawns the production thread.

- c7.thermal_transition registered in fdr_client.records KNOWN_PAYLOAD_KEYS
- [telemetry] optional dep group (jetson-stats, pynvml) added to pyproject
- 14 unit tests (AC-1..AC-6, AC-8, NFR-default-safe, structural)
  green; AC-7 / AC-1 microbench / NFR-perf-poll Tier-2 deferred
- full unit suite: 1140 passed, 11 expected Tier-2/CUDA skips

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 10:33:37 +03:00
parent 59f56c032f
commit 49a67f770d
9 changed files with 1175 additions and 2 deletions
@@ -56,6 +56,11 @@ from gps_denied_onboard.components.c7_inference.manifest import (
ManifestReader,
ManifestReaderProtocol,
)
from gps_denied_onboard.components.c7_inference.thermal_publisher import (
ThermalReading,
ThermalSource,
ThermalStatePublisher,
)
from gps_denied_onboard.config.schema import register_component_block
register_component_block("c7_inference", C7InferenceConfig)
@@ -84,7 +89,10 @@ __all__ = [
"PrecisionMode",
"RuntimeError",
"TelemetryUnavailableError",
"ThermalReading",
"ThermalSource",
"ThermalState",
"ThermalStatePublisher",
"default_registry",
"register_architecture",
]
@@ -0,0 +1,464 @@
"""``ThermalStatePublisher`` — 1 Hz background thermal poller (AZ-302).
Owns the single C7 thermal-telemetry source for the companion
process. Reads ``jtop`` (preferred) or ``pynvml`` (fallback) once per
poll period, builds a fresh :class:`ThermalState`, and stores it
atomically in ``_atomic_snapshot``. :meth:`read` is wait-free —
returns the current snapshot via a single object-reference load
(atomic under the Python GIL on CPython 3.10 / 3.11; documented in
the impl notes for the AZ-302 task spec).
Throttle-state transitions emit:
- one ``kind="c7.thermal_transition"`` FDR record per flip,
- one WARN log on throttle entry (False → True),
- one INFO log on throttle exit (True → False).
Telemetry unavailability is the default-safe path (Invariant I-6):
on :class:`TelemetryUnavailableError` from the source the publisher
emits a rate-limited (≤ 1/sec) WARN log and stores a snapshot with
``is_telemetry_available=False, thermal_throttle_active=False``;
consumers see truthful "telemetry off" instead of a lie about
throttle.
Composition contract:
publisher = ThermalStatePublisher(config, fdr_client)
publisher.start()
...
# consumers
snapshot = publisher.read()
...
publisher.stop()
``start()`` is what selects the source (``jtop`` → ``pynvml``);
if both backends are unavailable on the host, ``start()`` raises
:class:`TelemetryUnavailableError` (AC-5).
AC mapping (see ``_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md``):
- AC-1 : :meth:`read` returns the current ``_atomic_snapshot``;
wait-free perf is Tier-2 microbench.
- AC-2 / AC-3 : :meth:`_poll_once` detects throttle transitions and
emits FDR + log.
- AC-4 : repeated :class:`TelemetryUnavailableError` keeps the
default-safe snapshot + rate-limits WARN logs.
- AC-5 : :meth:`start` raises when no source is available.
- AC-6 : :meth:`start` / :meth:`stop` are idempotent.
- AC-7 / NFR-perf-poll : Tier-2 hot-path benchmark.
- AC-8 : FDR ``c7.thermal_transition`` payload matches the schema.
"""
from __future__ import annotations
import threading
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Protocol, runtime_checkable
from gps_denied_onboard._types.thermal import ThermalState
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c7_inference.errors import (
TelemetryUnavailableError,
)
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.fdr_client.client import FdrClient
__all__ = [
"ThermalReading",
"ThermalSource",
"ThermalStatePublisher",
]
@dataclass(frozen=True)
class ThermalReading:
"""Raw thermal reading produced by a :class:`ThermalSource`.
The publisher stamps :attr:`ThermalState.measured_at_ns` from its
injected :class:`Clock` so :class:`ThermalSource` impls never reach
into ``time.monotonic_ns`` directly (Invariant 2 of the replay
contract — see ``tests/_meta/test_no_direct_time_in_components.py``).
"""
cpu_temp_c: float | None
gpu_temp_c: float | None
thermal_throttle_active: bool
measured_clock_mhz: int | None
_PRODUCER_ID = "c7_inference.thermal"
_TRANSITION_KIND = "c7.thermal_transition"
_UNAVAILABLE_KIND = "c7.thermal.unavailable"
_WARN_RATE_LIMIT_NS = 1_000_000_000 # 1 second between WARN logs on unavailable
@runtime_checkable
class ThermalSource(Protocol):
"""Single-poll thermal source — :class:`ThermalStatePublisher`'s only collaborator.
``read()`` must produce a fresh :class:`ThermalReading` or raise
:class:`TelemetryUnavailableError`. The publisher catches the latter
and falls back to the default-safe snapshot. The source owns its
backend lifecycle (jtop context manager, pynvml init/shutdown).
"""
def read(self) -> ThermalReading: # pragma: no cover - structural
...
def close(self) -> None: # pragma: no cover - structural
...
class ThermalStatePublisher:
"""Singleton-by-convention 1 Hz thermal poller."""
def __init__(
self,
config: Config,
fdr_client: FdrClient,
*,
source: ThermalSource | None = None,
clock: Clock | None = None,
) -> None:
self._config = config
self._fdr = fdr_client
self._clock = clock if clock is not None else WallClock()
self._injected_source = source
self._source: ThermalSource | None = None
self._period_ns = int(
1_000_000_000 / config.components["c7_inference"].thermal_poll_hz
)
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._logger = get_logger("c7_inference.thermal")
self._atomic_snapshot: ThermalState = _default_safe_snapshot(
self._clock.monotonic_ns()
)
self._last_throttle: bool = False
self._last_unavailable_warn_ns: int = 0
@property
def producer_id(self) -> str:
return _PRODUCER_ID
def start(self) -> None:
"""Select the source, prime the snapshot, spawn the polling thread."""
with self._lock:
if self._thread is not None and self._thread.is_alive():
return
if self._source is None:
self._source = self._select_source()
self._stop_event.clear()
self._poll_once(initial=True)
thread = threading.Thread(
target=self._loop, name=f"{_PRODUCER_ID}.poll", daemon=True
)
self._thread = thread
thread.start()
self._logger.info(
"thermal publisher started",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {
"event": "start",
"period_ns": self._period_ns,
"source": type(self._source).__name__
if self._source is not None
else "None",
},
},
)
def stop(self) -> None:
"""Signal the polling thread, join it, close the source. Idempotent."""
with self._lock:
if self._thread is None:
return
self._stop_event.set()
thread = self._thread
self._thread = None
thread.join(timeout=2.0 * self._period_ns / 1_000_000_000)
source = self._source
if source is not None:
try:
source.close()
except Exception as exc:
self._logger.warning(
"thermal source close raised",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {"event": "stop_close_error", "error": str(exc)},
},
)
self._source = None
self._logger.info(
"thermal publisher stopped",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {"event": "stop"},
},
)
def is_running(self) -> bool:
thread = self._thread
return thread is not None and thread.is_alive()
def read(self) -> ThermalState:
"""Wait-free reader. Returns the current ``_atomic_snapshot``."""
return self._atomic_snapshot
def _select_source(self) -> ThermalSource:
if self._injected_source is not None:
return self._injected_source
try:
return _JtopSource()
except TelemetryUnavailableError:
pass
try:
return _PynvmlSource()
except TelemetryUnavailableError as exc:
raise TelemetryUnavailableError(
"thermal publisher: neither jtop nor pynvml available; cannot "
"bind a telemetry source"
) from exc
def _loop(self) -> None:
while not self._stop_event.is_set():
self._poll_once()
if self._stop_event.wait(timeout=self._period_ns / 1_000_000_000):
break
def _poll_once(self, *, initial: bool = False) -> None:
"""One poll cycle: read source, detect transition, update snapshot.
Public-by-name-with-underscore so the AC-2..AC-4 tests can call
it deterministically without spinning up the background thread.
"""
source = self._source
if source is None:
return
now_ns = self._clock.monotonic_ns()
try:
reading = source.read()
except TelemetryUnavailableError as exc:
self._atomic_snapshot = _default_safe_snapshot(now_ns)
self._maybe_warn_unavailable(str(exc))
self._last_throttle = False
return
fresh = ThermalState(
cpu_temp_c=reading.cpu_temp_c,
gpu_temp_c=reading.gpu_temp_c,
thermal_throttle_active=reading.thermal_throttle_active,
measured_clock_mhz=reading.measured_clock_mhz,
measured_at_ns=now_ns,
is_telemetry_available=True,
)
previous = self._last_throttle
self._atomic_snapshot = fresh
self._last_throttle = fresh.thermal_throttle_active
if initial:
return
if previous != fresh.thermal_throttle_active:
self._emit_transition(previous=previous, fresh=fresh)
def _maybe_warn_unavailable(self, reason: str) -> None:
now_ns = self._clock.monotonic_ns()
if now_ns - self._last_unavailable_warn_ns < _WARN_RATE_LIMIT_NS:
return
self._last_unavailable_warn_ns = now_ns
self._logger.warning(
"thermal telemetry unavailable",
extra={
"kind": _UNAVAILABLE_KIND,
"kv": {"reason": reason},
},
)
def _emit_transition(
self, *, previous: bool, fresh: ThermalState
) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_now(),
producer_id=_PRODUCER_ID,
kind=_TRANSITION_KIND,
payload={
"previous_state": previous,
"new_state": fresh.thermal_throttle_active,
"gpu_temp_c": fresh.gpu_temp_c,
"cpu_temp_c": fresh.cpu_temp_c,
"measured_clock_mhz": fresh.measured_clock_mhz,
"measured_at_ns": fresh.measured_at_ns,
},
)
self._fdr.enqueue(record)
if fresh.thermal_throttle_active:
self._logger.warning(
"thermal throttle ENTRY",
extra={
"kind": _TRANSITION_KIND,
"kv": {
"previous_state": previous,
"new_state": True,
"gpu_temp_c": fresh.gpu_temp_c,
"cpu_temp_c": fresh.cpu_temp_c,
},
},
)
else:
self._logger.info(
"thermal throttle exit",
extra={
"kind": _TRANSITION_KIND,
"kv": {
"previous_state": previous,
"new_state": False,
"gpu_temp_c": fresh.gpu_temp_c,
"cpu_temp_c": fresh.cpu_temp_c,
},
},
)
def _default_safe_snapshot(measured_at_ns: int) -> ThermalState:
return ThermalState(
cpu_temp_c=None,
gpu_temp_c=None,
thermal_throttle_active=False,
measured_clock_mhz=None,
measured_at_ns=measured_at_ns,
is_telemetry_available=False,
)
def _iso_ts_now() -> str:
"""RFC 3339 UTC timestamp with microsecond precision and ``Z`` suffix."""
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
class _JtopSource:
"""``jtop`` (jetson-stats) thermal source — Tier-2 production path.
Constructor raises :class:`TelemetryUnavailableError` when ``jtop``
is not importable so the publisher can fall back to pynvml.
"""
__slots__ = ("_jtop",)
def __init__(self) -> None:
try:
from jtop import jtop
except ImportError as exc:
raise TelemetryUnavailableError(
"jtop not installed"
) from exc
self._jtop = jtop()
try:
self._jtop.start()
except Exception as exc:
raise TelemetryUnavailableError(
f"jtop start failed: {exc}"
) from exc
def read(self) -> ThermalReading:
try:
stats = self._jtop.stats
temps = stats.get("Temp", {}) or {}
gpu_t = temps.get("GPU")
cpu_t = temps.get("CPU")
throttle = bool(stats.get("THROTTLE", 0))
clock_mhz = stats.get("GPU0_FREQ")
return ThermalReading(
cpu_temp_c=float(cpu_t) if cpu_t is not None else None,
gpu_temp_c=float(gpu_t) if gpu_t is not None else None,
thermal_throttle_active=throttle,
measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None,
)
except Exception as exc:
raise TelemetryUnavailableError(
f"jtop read failed: {exc}"
) from exc
def close(self) -> None:
try:
self._jtop.close()
except Exception as exc:
get_logger("c7_inference.thermal").warning(
"jtop close raised",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {"error": str(exc)},
},
)
class _PynvmlSource:
"""``pynvml`` NVML thermal source — Tier-2 production fallback."""
__slots__ = ("_handle", "_pynvml")
def __init__(self) -> None:
try:
import pynvml
except ImportError as exc:
raise TelemetryUnavailableError(
"pynvml not installed"
) from exc
try:
pynvml.nvmlInit()
self._handle = pynvml.nvmlDeviceGetHandleByIndex(0)
except pynvml.NVMLError as exc:
raise TelemetryUnavailableError(
f"pynvml init failed: {exc}"
) from exc
self._pynvml = pynvml
def read(self) -> ThermalReading:
try:
gpu_t = self._pynvml.nvmlDeviceGetTemperature(
self._handle, self._pynvml.NVML_TEMPERATURE_GPU
)
throttle_mask = self._pynvml.nvmlDeviceGetCurrentClocksThrottleReasons(
self._handle
)
throttle_active = (
throttle_mask != 0
and throttle_mask != self._pynvml.nvmlClocksThrottleReasonNone
)
try:
clock_mhz = self._pynvml.nvmlDeviceGetClockInfo(
self._handle, self._pynvml.NVML_CLOCK_GRAPHICS
)
except self._pynvml.NVMLError:
clock_mhz = None
return ThermalReading(
cpu_temp_c=None, # NVML reports GPU only; CPU temp needs jtop / lm-sensors
gpu_temp_c=float(gpu_t),
thermal_throttle_active=bool(throttle_active),
measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None,
)
except self._pynvml.NVMLError as exc:
raise TelemetryUnavailableError(
f"NVML read failed: {exc}"
) from exc
def close(self) -> None:
try:
self._pynvml.nvmlShutdown()
except self._pynvml.NVMLError as exc:
get_logger("c7_inference.thermal").warning(
"NVML shutdown raised",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {"error": str(exc)},
},
)
@@ -101,6 +101,20 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = {
"threshold_m",
}
),
# AZ-302 / E-C7: emitted on every thermal-throttle transition (False->True or
# True->False). One record per flip; steady-state polls emit nothing on this
# kind. `previous_state` and `new_state` are the throttle booleans pre/post
# transition; temperatures and clock are taken from the same poll cycle.
"c7.thermal_transition": frozenset(
{
"previous_state",
"new_state",
"gpu_temp_c",
"cpu_temp_c",
"measured_clock_mhz",
"measured_at_ns",
}
),
}
KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys())