[AZ-302] C7 ThermalStatePublisher — jtop/NVML 1 Hz background poller

Implements AZ-297 InferenceRuntime's thermal_state() side: a singleton
background-thread publisher that polls jtop (preferred) or pynvml
(fallback) at config.thermal_poll_hz, stores an atomic ThermalState
snapshot, and emits c7.thermal_transition FDR records on every throttle
flip with a WARN log on entry and an INFO log on exit. Default-safe on
TelemetryUnavailableError per Invariant I-6 with a 1-Hz rate-limited
WARN.

Sources return a raw ThermalReading; the publisher stamps measured_at_ns
via its injected Clock so _JtopSource / _PynvmlSource stay clean of
direct time.* calls (Invariant 2). _poll_once is the deterministic test
seam — start() spawns the production thread.

- c7.thermal_transition registered in fdr_client.records KNOWN_PAYLOAD_KEYS
- [telemetry] optional dep group (jetson-stats, pynvml) added to pyproject
- 14 unit tests (AC-1..AC-6, AC-8, NFR-default-safe, structural)
  green; AC-7 / AC-1 microbench / NFR-perf-poll Tier-2 deferred
- full unit suite: 1140 passed, 11 expected Tier-2/CUDA skips

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 10:33:37 +03:00
parent 59f56c032f
commit 49a67f770d
9 changed files with 1175 additions and 2 deletions
@@ -175,3 +175,26 @@ Then the record matches `_docs/02_document/contracts/shared_fdr_client/fdr_recor
- **Production code that must exist**: real `ThermalStatePublisher` class with real background thread, real `jtop` / `pynvml` poll, real lock-free `_atomic_snapshot` reference swap, real FDR record emission via the injected `FdrClient`.
- **Allowed external stubs**: tests MAY substitute a `FakeJtopSource` and a `FakeFdrClient` (AC-2..AC-8); production wiring uses real `jtop` + real AZ-273 `FdrClient`.
- **Unacceptable substitutes**: a polling loop that uses `time.sleep` without a real `Clock` injection (would break test determinism); a snapshot field guarded by a `threading.Lock` on the read path (would violate the wait-free read requirement under F3 hot path); a "warn but always default-safe" mode that never raises `TelemetryUnavailableError` from `start()` (would hide misconfigured deployments — exactly the failure mode AC-5 prevents).
## Implementation Notes (2026-05-12, batch 26)
Four task-spec → as-built deltas:
1. **`ThermalReading` intermediate DTO** — spec said source `read()` returns a `ThermalState`. As-built, sources return a `ThermalReading` (no `measured_at_ns`, no `is_telemetry_available`) and the publisher stamps both fields from its injected `Clock`. This keeps `_JtopSource` / `_PynvmlSource` from calling `time.monotonic_ns()` directly, which `tests/_meta/test_no_direct_time_in_components.py` (Invariant 2) forbids in any `src/gps_denied_onboard/components/**` file. Spec already required a `Clock`-injectable publisher; this delta extends the contract one node deeper into the source classes.
2. **Telemetry backends are an optional pyproject group** — spec said "this task introduces no new third-party dependencies — `jtop` and `pynvml` are both already pinned by the description.md key dependencies table" but neither was actually in `pyproject.toml`. Added a `[telemetry]` optional extra (`jetson-stats>=4.2`, `pynvml>=11.5`) so Tier-1 installs (`.[dev]`) stay slim and Tier-2 Jetson installs (`.[dev,inference,telemetry]`) pull them in. Both backends remain runtime-optional: the publisher discovers them at `start()` time and raises `TelemetryUnavailableError` only when both are absent.
3. **`_poll_once` is the deterministic test seam — NOT `start()`** — the AC-1..AC-4 / AC-8 tests script the source + drive the clock under a `_ManualClock`; `start()` spawns the background polling thread which would race those deterministic calls (observed in first test iteration). Tests bypass `start()` via a `_prime_without_thread()` helper that runs the source-selection + initial-poll path without spawning the thread. `start()` + `stop()` are still exercised by AC-5 (cold-start fallback), AC-6 (idempotency), and one integration test that proves the background thread actually polls.
4. **`c7.thermal_transition` registered in the FDR schema** — added the kind to `KNOWN_PAYLOAD_KEYS` in `src/gps_denied_onboard/fdr_client/records.py` with the six payload fields the spec names (`previous_state`, `new_state`, `gpu_temp_c`, `cpu_temp_c`, `measured_clock_mhz`, `measured_at_ns`). The AZ-272 fixture (`tests/unit/test_az272_fdr_record_schema.py::_kind_payload`) was extended with a sample payload so the every-known-kind roundtrip test passes for the new kind.
### As-built file map
- `src/gps_denied_onboard/components/c7_inference/thermal_publisher.py``ThermalStatePublisher`, `ThermalSource` Protocol, `ThermalReading` intermediate DTO, `_JtopSource` (Tier-2 jtop binding), `_PynvmlSource` (Tier-2 NVML fallback), `_default_safe_snapshot` (Invariant I-6 enforcement).
- `src/gps_denied_onboard/components/c7_inference/__init__.py` — re-exports `ThermalStatePublisher`, `ThermalSource`, `ThermalReading`.
- `src/gps_denied_onboard/fdr_client/records.py``c7.thermal_transition` added to `KNOWN_PAYLOAD_KEYS`.
- `pyproject.toml` — new `[telemetry]` optional dep group (`jetson-stats`, `pynvml`).
- `tests/unit/c7_inference/test_thermal_publisher.py` — 14 tests (AC-1 x2, AC-2, AC-3, AC-4, AC-5 x2, AC-6 x2, background-thread integration, AC-8, NFR-default-safe, Protocol structural, Invariant I-6).
- `tests/unit/test_az272_fdr_record_schema.py``_kind_payload` extended with the new kind.
AC-7 (F3-hot-path non-interference) and the wait-free p99 microbench in AC-1 are Tier-2 perf concerns; the Tier-2 validation task will cover them on real Jetson hardware.
@@ -0,0 +1,141 @@
# Batch 26 / Cycle 1 — Implementation Report
**Date**: 2026-05-12
**Tasks**: AZ-302 (C7 ThermalStatePublisher — jtop/NVML 1 Hz background)
**Story points landed**: 3
**Status**: complete (AZ-302 → In Testing)
## Scope summary
Single-task batch, continuing the 1-task cadence the user reaffirmed
when picking option A after batches 24 and 25. AZ-302 was the deferred
3-pointer surfaced by the batch-25 narrow-down (background thread,
two telemetry backends, FDR record kind, log rate-limiting). AZ-304
(C6 Postgres schema, 2pt) remains queued for batch 27.
## Files added / modified
### New
- `src/gps_denied_onboard/components/c7_inference/thermal_publisher.py`
`ThermalStatePublisher` (start/stop/read/is_running, `_poll_once`
test seam, atomic snapshot, source selection), `ThermalSource`
runtime-checkable Protocol, `ThermalReading` intermediate DTO,
`_JtopSource` (jtop binding, Tier-2 production path), `_PynvmlSource`
(NVML fallback), `_default_safe_snapshot` (Invariant I-6 enforcement).
- `tests/unit/c7_inference/test_thermal_publisher.py` — 14 deterministic
tests (AC-1..AC-6, AC-8, NFR-default-safe, Protocol structural,
Invariant I-6) plus one real-thread integration test that proves the
background poller actually polls and emits transitions.
### Modified
- `src/gps_denied_onboard/components/c7_inference/__init__.py`
re-exports `ThermalStatePublisher`, `ThermalSource`, `ThermalReading`.
- `src/gps_denied_onboard/fdr_client/records.py`
`c7.thermal_transition` registered in `KNOWN_PAYLOAD_KEYS` with six
documented payload keys (`previous_state`, `new_state`, `gpu_temp_c`,
`cpu_temp_c`, `measured_clock_mhz`, `measured_at_ns`).
- `pyproject.toml` — new `[telemetry]` optional dep group
(`jetson-stats>=4.2`, `pynvml>=11.5`). Tier-1 (`.[dev]`) install stays
slim; Tier-2 Jetson install adds `.[telemetry]`.
- `tests/unit/test_az272_fdr_record_schema.py``_kind_payload`
fixture extended with a sample `c7.thermal_transition` payload so
the every-known-kind roundtrip test passes.
- `_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md` → moved to
`_docs/02_tasks/done/`; appended `## Implementation Notes (2026-05-12,
batch 26)` documenting the four task-spec → as-built deltas.
## Design decisions
1. **`ThermalReading` intermediate DTO instead of source-stamped
`ThermalState`**. The spec implied source `read()` returns a fully
stamped `ThermalState`. As-built, sources return a `ThermalReading`
(raw measurement only) and the publisher stamps `measured_at_ns` +
`is_telemetry_available` from its injected `Clock`. This keeps the
`_JtopSource` / `_PynvmlSource` modules from calling
`time.monotonic_ns()` directly, which `tests/_meta/
test_no_direct_time_in_components.py` (Invariant 2) forbids in any
`src/gps_denied_onboard/components/**` file. Documented in the spec's
as-built notes section.
2. **`_poll_once` is the deterministic test seam — not `start()`**. The
AC-1..AC-4 / AC-8 tests script the source and drive a `_ManualClock`
forward; the production `start()` would spawn a background thread
that races those deterministic calls (observed in the first test
iteration). Tests bypass `start()` via a `_prime_without_thread()`
helper that exercises the source-selection + initial-poll path
without the thread. `start()` / `stop()` remain covered by AC-5
(cold-start fallback), AC-6 (idempotency), and a real-thread
integration test at 200 Hz that proves the background loop polls
the source and emits transition records.
3. **Telemetry backends moved to a `[telemetry]` optional dep group**.
The spec claimed both `jtop` and `pynvml` were already pinned by
description.md, but neither was in `pyproject.toml`. They are now a
`[telemetry]` optional extra so Tier-1 installs (`.[dev]`) stay slim
and Tier-2 Jetson installs (`.[dev,inference,telemetry]`) pull them
in. Both backends remain runtime-optional: the publisher discovers
them at `start()` time and only raises `TelemetryUnavailableError`
when both are absent.
4. **WARN-on-unavailable rate-limiting uses the injected `Clock`**, not
`time.monotonic_ns()`. The 1-second rate-limit window is enforced by
`now_ns - last_warn_ns >= 1_000_000_000` against `self._clock.
monotonic_ns()`, so replay-deterministic tests can simulate rate
limit windows by advancing the manual clock.
## AC coverage
| AC | Test name | Status |
|----|-----------|--------|
| AC-1 | `test_ac1_read_returns_latest_snapshot` + `test_ac1_read_is_wait_free_object_identity` | passing |
| AC-2 | `test_ac2_throttle_entry_emits_fdr_and_warn` | passing |
| AC-3 | `test_ac3_throttle_exit_emits_fdr_and_info` | passing |
| AC-4 | `test_ac4_telemetry_unavailable_defaults_safe_and_rate_limits` | passing |
| AC-5 | `test_ac5_start_raises_when_no_source_available` + `test_ac5_jtop_falls_back_to_pynvml` | passing |
| AC-6 | `test_ac6_double_start_is_noop` + `test_ac6_double_stop_is_noop` | passing |
| AC-7 | F3 hot-path non-interference perf | Tier-2 deferred |
| AC-8 | `test_ac8_fdr_record_round_trips_through_wire_format` | passing |
| NFR-perf-poll | microbench poll p99 ≤ 100 ms | Tier-2 deferred |
| NFR-perf-read p99 ≤ 1 µs | wait-free read microbench | Tier-2 deferred |
| NFR-default-safe | `test_nfr_default_safe_on_first_poll_failure` | passing |
| Structural | `test_scripted_source_satisfies_protocol` | passing |
| Invariant I-6 | `test_default_safe_snapshot_invariant_i6` | passing |
AC-7 and the two Tier-2 microbenches need real Jetson hardware to be
meaningful; they roll into the Tier-2 validation task with AZ-301's
AC-8 (`read_host_tuple` on real NVML/L4T).
## Test run
`./.venv/bin/python -m pytest tests/unit tests/_meta -q` → **1140
passed, 11 skipped (Tier-2 / CUDA / cmake / actionlint)**, no failures.
Mypy strict on the three production-source files we touched: clean.
Ruff on the same set: clean.
## Self-review
- Production code (`thermal_publisher.py`, `__init__.py`,
`records.py`, `pyproject.toml`): no use of `time.*` in component
modules (Invariant 2 meta-test passes); FDR kind documented in the
schema-keys table; telemetry backends are optional; the publisher
raises `TelemetryUnavailableError` on cold-start with no source per
AC-5; idempotent start/stop per AC-6; rate-limited WARN per AC-4.
- Tests: every AC has at least one named assertion; the AC-1 wait-free
microbench and AC-7 hot-path bench are Tier-2 deferred (documented in
the report + the spec as-built notes).
- Lint / type: ruff + mypy strict clean on the modified set.
- Docs: spec moved to `done/` with the as-built delta notes.
## Known gaps
- AZ-302 AC-7 + AC-1 microbench + NFR-perf-poll → require real Jetson
thermal source under load; rolled into the Tier-2 validation task.
- AZ-304 (C6 Postgres schema) deferred to batch 27 — testcontainers
setup + Alembic baseline are independently meaningful.
- `_JtopSource.read()` catches a bare `Exception` because jtop's
internal exception types are not stable across jtop versions; the
exception is always rewrapped as `TelemetryUnavailableError` (never
swallowed silently) so this does not violate the "no silent error
suppression" coding rule.
+2 -2
View File
@@ -6,9 +6,9 @@ step: 7
name: Implement
status: in_progress
sub_step:
phase: 13
phase: 7
name: archive-and-loop
detail: "batch 25/cycle1 complete: AZ-301 → In Testing, archived to done/. AZ-302 + AZ-304 deferred to batches 26 / 27 to keep the 1-task cadence (AZ-302 = 3pt with background threading + jtop/pynvml; AZ-304 = 2pt with testcontainers Postgres + Alembic). 14 unconditional AC tests + 1 Tier-2 AC-8 skip. Suite: 1134 passed / 11 skipped. 17 tasks total ready overall (AZ-300 + AZ-301 removed)."
detail: "batch 26/cycle1 complete: AZ-302 (ThermalStatePublisher, 3pt) implemented and tests green (1140 passed, 11 Tier-2/CUDA skipped). Added ThermalStatePublisher + ThermalSource Protocol + ThermalReading DTO + _JtopSource/_PynvmlSource backends; registered c7.thermal_transition in FDR schema; new [telemetry] optional dep group in pyproject.toml. Spec moved to done/ with as-built notes; cycle report at _docs/03_implementation/batch_26_cycle1_report.md. Ready to push commit + compute batch 27 (AZ-304 C6 Postgres schema next, 2pt) plus the 15 other ready tasks."
retry_count: 0
cycle: 1
tracker: jira
+8
View File
@@ -64,6 +64,14 @@ inference = [
"onnxruntime>=1.17",
# tensorrt is installed out-of-band on Jetson — not a pip dep
]
# AZ-302: thermal telemetry backends used by C7's ThermalStatePublisher.
# Both are Jetson / NVIDIA-host-only and not import-required for Tier-1;
# the publisher selects whichever is importable at start() time and
# raises TelemetryUnavailableError if neither is present.
telemetry = [
"jetson-stats>=4.2",
"pynvml>=11.5",
]
indexing = [
"faiss-cpu>=1.7",
]
@@ -56,6 +56,11 @@ from gps_denied_onboard.components.c7_inference.manifest import (
ManifestReader,
ManifestReaderProtocol,
)
from gps_denied_onboard.components.c7_inference.thermal_publisher import (
ThermalReading,
ThermalSource,
ThermalStatePublisher,
)
from gps_denied_onboard.config.schema import register_component_block
register_component_block("c7_inference", C7InferenceConfig)
@@ -84,7 +89,10 @@ __all__ = [
"PrecisionMode",
"RuntimeError",
"TelemetryUnavailableError",
"ThermalReading",
"ThermalSource",
"ThermalState",
"ThermalStatePublisher",
"default_registry",
"register_architecture",
]
@@ -0,0 +1,464 @@
"""``ThermalStatePublisher`` — 1 Hz background thermal poller (AZ-302).
Owns the single C7 thermal-telemetry source for the companion
process. Reads ``jtop`` (preferred) or ``pynvml`` (fallback) once per
poll period, builds a fresh :class:`ThermalState`, and stores it
atomically in ``_atomic_snapshot``. :meth:`read` is wait-free —
returns the current snapshot via a single object-reference load
(atomic under the Python GIL on CPython 3.10 / 3.11; documented in
the impl notes for the AZ-302 task spec).
Throttle-state transitions emit:
- one ``kind="c7.thermal_transition"`` FDR record per flip,
- one WARN log on throttle entry (False → True),
- one INFO log on throttle exit (True → False).
Telemetry unavailability is the default-safe path (Invariant I-6):
on :class:`TelemetryUnavailableError` from the source the publisher
emits a rate-limited (≤ 1/sec) WARN log and stores a snapshot with
``is_telemetry_available=False, thermal_throttle_active=False``;
consumers see truthful "telemetry off" instead of a lie about
throttle.
Composition contract:
publisher = ThermalStatePublisher(config, fdr_client)
publisher.start()
...
# consumers
snapshot = publisher.read()
...
publisher.stop()
``start()`` is what selects the source (``jtop`` → ``pynvml``);
if both backends are unavailable on the host, ``start()`` raises
:class:`TelemetryUnavailableError` (AC-5).
AC mapping (see ``_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md``):
- AC-1 : :meth:`read` returns the current ``_atomic_snapshot``;
wait-free perf is Tier-2 microbench.
- AC-2 / AC-3 : :meth:`_poll_once` detects throttle transitions and
emits FDR + log.
- AC-4 : repeated :class:`TelemetryUnavailableError` keeps the
default-safe snapshot + rate-limits WARN logs.
- AC-5 : :meth:`start` raises when no source is available.
- AC-6 : :meth:`start` / :meth:`stop` are idempotent.
- AC-7 / NFR-perf-poll : Tier-2 hot-path benchmark.
- AC-8 : FDR ``c7.thermal_transition`` payload matches the schema.
"""
from __future__ import annotations
import threading
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Protocol, runtime_checkable
from gps_denied_onboard._types.thermal import ThermalState
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c7_inference.errors import (
TelemetryUnavailableError,
)
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.fdr_client.client import FdrClient
__all__ = [
"ThermalReading",
"ThermalSource",
"ThermalStatePublisher",
]
@dataclass(frozen=True)
class ThermalReading:
"""Raw thermal reading produced by a :class:`ThermalSource`.
The publisher stamps :attr:`ThermalState.measured_at_ns` from its
injected :class:`Clock` so :class:`ThermalSource` impls never reach
into ``time.monotonic_ns`` directly (Invariant 2 of the replay
contract — see ``tests/_meta/test_no_direct_time_in_components.py``).
"""
cpu_temp_c: float | None
gpu_temp_c: float | None
thermal_throttle_active: bool
measured_clock_mhz: int | None
_PRODUCER_ID = "c7_inference.thermal"
_TRANSITION_KIND = "c7.thermal_transition"
_UNAVAILABLE_KIND = "c7.thermal.unavailable"
_WARN_RATE_LIMIT_NS = 1_000_000_000 # 1 second between WARN logs on unavailable
@runtime_checkable
class ThermalSource(Protocol):
"""Single-poll thermal source — :class:`ThermalStatePublisher`'s only collaborator.
``read()`` must produce a fresh :class:`ThermalReading` or raise
:class:`TelemetryUnavailableError`. The publisher catches the latter
and falls back to the default-safe snapshot. The source owns its
backend lifecycle (jtop context manager, pynvml init/shutdown).
"""
def read(self) -> ThermalReading: # pragma: no cover - structural
...
def close(self) -> None: # pragma: no cover - structural
...
class ThermalStatePublisher:
"""Singleton-by-convention 1 Hz thermal poller."""
def __init__(
self,
config: Config,
fdr_client: FdrClient,
*,
source: ThermalSource | None = None,
clock: Clock | None = None,
) -> None:
self._config = config
self._fdr = fdr_client
self._clock = clock if clock is not None else WallClock()
self._injected_source = source
self._source: ThermalSource | None = None
self._period_ns = int(
1_000_000_000 / config.components["c7_inference"].thermal_poll_hz
)
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._logger = get_logger("c7_inference.thermal")
self._atomic_snapshot: ThermalState = _default_safe_snapshot(
self._clock.monotonic_ns()
)
self._last_throttle: bool = False
self._last_unavailable_warn_ns: int = 0
@property
def producer_id(self) -> str:
return _PRODUCER_ID
def start(self) -> None:
"""Select the source, prime the snapshot, spawn the polling thread."""
with self._lock:
if self._thread is not None and self._thread.is_alive():
return
if self._source is None:
self._source = self._select_source()
self._stop_event.clear()
self._poll_once(initial=True)
thread = threading.Thread(
target=self._loop, name=f"{_PRODUCER_ID}.poll", daemon=True
)
self._thread = thread
thread.start()
self._logger.info(
"thermal publisher started",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {
"event": "start",
"period_ns": self._period_ns,
"source": type(self._source).__name__
if self._source is not None
else "None",
},
},
)
def stop(self) -> None:
"""Signal the polling thread, join it, close the source. Idempotent."""
with self._lock:
if self._thread is None:
return
self._stop_event.set()
thread = self._thread
self._thread = None
thread.join(timeout=2.0 * self._period_ns / 1_000_000_000)
source = self._source
if source is not None:
try:
source.close()
except Exception as exc:
self._logger.warning(
"thermal source close raised",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {"event": "stop_close_error", "error": str(exc)},
},
)
self._source = None
self._logger.info(
"thermal publisher stopped",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {"event": "stop"},
},
)
def is_running(self) -> bool:
thread = self._thread
return thread is not None and thread.is_alive()
def read(self) -> ThermalState:
"""Wait-free reader. Returns the current ``_atomic_snapshot``."""
return self._atomic_snapshot
def _select_source(self) -> ThermalSource:
if self._injected_source is not None:
return self._injected_source
try:
return _JtopSource()
except TelemetryUnavailableError:
pass
try:
return _PynvmlSource()
except TelemetryUnavailableError as exc:
raise TelemetryUnavailableError(
"thermal publisher: neither jtop nor pynvml available; cannot "
"bind a telemetry source"
) from exc
def _loop(self) -> None:
while not self._stop_event.is_set():
self._poll_once()
if self._stop_event.wait(timeout=self._period_ns / 1_000_000_000):
break
def _poll_once(self, *, initial: bool = False) -> None:
"""One poll cycle: read source, detect transition, update snapshot.
Public-by-name-with-underscore so the AC-2..AC-4 tests can call
it deterministically without spinning up the background thread.
"""
source = self._source
if source is None:
return
now_ns = self._clock.monotonic_ns()
try:
reading = source.read()
except TelemetryUnavailableError as exc:
self._atomic_snapshot = _default_safe_snapshot(now_ns)
self._maybe_warn_unavailable(str(exc))
self._last_throttle = False
return
fresh = ThermalState(
cpu_temp_c=reading.cpu_temp_c,
gpu_temp_c=reading.gpu_temp_c,
thermal_throttle_active=reading.thermal_throttle_active,
measured_clock_mhz=reading.measured_clock_mhz,
measured_at_ns=now_ns,
is_telemetry_available=True,
)
previous = self._last_throttle
self._atomic_snapshot = fresh
self._last_throttle = fresh.thermal_throttle_active
if initial:
return
if previous != fresh.thermal_throttle_active:
self._emit_transition(previous=previous, fresh=fresh)
def _maybe_warn_unavailable(self, reason: str) -> None:
now_ns = self._clock.monotonic_ns()
if now_ns - self._last_unavailable_warn_ns < _WARN_RATE_LIMIT_NS:
return
self._last_unavailable_warn_ns = now_ns
self._logger.warning(
"thermal telemetry unavailable",
extra={
"kind": _UNAVAILABLE_KIND,
"kv": {"reason": reason},
},
)
def _emit_transition(
self, *, previous: bool, fresh: ThermalState
) -> None:
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_ts_now(),
producer_id=_PRODUCER_ID,
kind=_TRANSITION_KIND,
payload={
"previous_state": previous,
"new_state": fresh.thermal_throttle_active,
"gpu_temp_c": fresh.gpu_temp_c,
"cpu_temp_c": fresh.cpu_temp_c,
"measured_clock_mhz": fresh.measured_clock_mhz,
"measured_at_ns": fresh.measured_at_ns,
},
)
self._fdr.enqueue(record)
if fresh.thermal_throttle_active:
self._logger.warning(
"thermal throttle ENTRY",
extra={
"kind": _TRANSITION_KIND,
"kv": {
"previous_state": previous,
"new_state": True,
"gpu_temp_c": fresh.gpu_temp_c,
"cpu_temp_c": fresh.cpu_temp_c,
},
},
)
else:
self._logger.info(
"thermal throttle exit",
extra={
"kind": _TRANSITION_KIND,
"kv": {
"previous_state": previous,
"new_state": False,
"gpu_temp_c": fresh.gpu_temp_c,
"cpu_temp_c": fresh.cpu_temp_c,
},
},
)
def _default_safe_snapshot(measured_at_ns: int) -> ThermalState:
return ThermalState(
cpu_temp_c=None,
gpu_temp_c=None,
thermal_throttle_active=False,
measured_clock_mhz=None,
measured_at_ns=measured_at_ns,
is_telemetry_available=False,
)
def _iso_ts_now() -> str:
"""RFC 3339 UTC timestamp with microsecond precision and ``Z`` suffix."""
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
class _JtopSource:
"""``jtop`` (jetson-stats) thermal source — Tier-2 production path.
Constructor raises :class:`TelemetryUnavailableError` when ``jtop``
is not importable so the publisher can fall back to pynvml.
"""
__slots__ = ("_jtop",)
def __init__(self) -> None:
try:
from jtop import jtop
except ImportError as exc:
raise TelemetryUnavailableError(
"jtop not installed"
) from exc
self._jtop = jtop()
try:
self._jtop.start()
except Exception as exc:
raise TelemetryUnavailableError(
f"jtop start failed: {exc}"
) from exc
def read(self) -> ThermalReading:
try:
stats = self._jtop.stats
temps = stats.get("Temp", {}) or {}
gpu_t = temps.get("GPU")
cpu_t = temps.get("CPU")
throttle = bool(stats.get("THROTTLE", 0))
clock_mhz = stats.get("GPU0_FREQ")
return ThermalReading(
cpu_temp_c=float(cpu_t) if cpu_t is not None else None,
gpu_temp_c=float(gpu_t) if gpu_t is not None else None,
thermal_throttle_active=throttle,
measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None,
)
except Exception as exc:
raise TelemetryUnavailableError(
f"jtop read failed: {exc}"
) from exc
def close(self) -> None:
try:
self._jtop.close()
except Exception as exc:
get_logger("c7_inference.thermal").warning(
"jtop close raised",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {"error": str(exc)},
},
)
class _PynvmlSource:
"""``pynvml`` NVML thermal source — Tier-2 production fallback."""
__slots__ = ("_handle", "_pynvml")
def __init__(self) -> None:
try:
import pynvml
except ImportError as exc:
raise TelemetryUnavailableError(
"pynvml not installed"
) from exc
try:
pynvml.nvmlInit()
self._handle = pynvml.nvmlDeviceGetHandleByIndex(0)
except pynvml.NVMLError as exc:
raise TelemetryUnavailableError(
f"pynvml init failed: {exc}"
) from exc
self._pynvml = pynvml
def read(self) -> ThermalReading:
try:
gpu_t = self._pynvml.nvmlDeviceGetTemperature(
self._handle, self._pynvml.NVML_TEMPERATURE_GPU
)
throttle_mask = self._pynvml.nvmlDeviceGetCurrentClocksThrottleReasons(
self._handle
)
throttle_active = (
throttle_mask != 0
and throttle_mask != self._pynvml.nvmlClocksThrottleReasonNone
)
try:
clock_mhz = self._pynvml.nvmlDeviceGetClockInfo(
self._handle, self._pynvml.NVML_CLOCK_GRAPHICS
)
except self._pynvml.NVMLError:
clock_mhz = None
return ThermalReading(
cpu_temp_c=None, # NVML reports GPU only; CPU temp needs jtop / lm-sensors
gpu_temp_c=float(gpu_t),
thermal_throttle_active=bool(throttle_active),
measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None,
)
except self._pynvml.NVMLError as exc:
raise TelemetryUnavailableError(
f"NVML read failed: {exc}"
) from exc
def close(self) -> None:
try:
self._pynvml.nvmlShutdown()
except self._pynvml.NVMLError as exc:
get_logger("c7_inference.thermal").warning(
"NVML shutdown raised",
extra={
"kind": "c7.thermal.lifecycle",
"kv": {"error": str(exc)},
},
)
@@ -101,6 +101,20 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = {
"threshold_m",
}
),
# AZ-302 / E-C7: emitted on every thermal-throttle transition (False->True or
# True->False). One record per flip; steady-state polls emit nothing on this
# kind. `previous_state` and `new_state` are the throttle booleans pre/post
# transition; temperatures and clock are taken from the same poll cycle.
"c7.thermal_transition": frozenset(
{
"previous_state",
"new_state",
"gpu_temp_c",
"cpu_temp_c",
"measured_clock_mhz",
"measured_at_ns",
}
),
}
KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys())
@@ -0,0 +1,506 @@
"""AZ-302 — ``ThermalStatePublisher`` acceptance tests.
CPU-only and deterministic: every test drives the publisher through its
``_poll_once()`` seam with a manual clock + a scripted ``ThermalSource``;
no real ``jtop`` / ``pynvml`` / background thread is involved (those
paths live behind the source-selection fallback and are Tier-2 by
construction — see ``read_host_tuple`` analog comments in AZ-301).
The publisher's background thread is exercised only through the
idempotency tests (AC-6); the throttle-detection-latency contract
(AC-2 / AC-3) is verified deterministically on the ``_poll_once`` seam
because the wall-clock latency is dominated by the publisher's own
period (1 / ``thermal_poll_hz`` s), not by code under test.
"""
from __future__ import annotations
import time
from collections import deque
from typing import Final
import pytest
from gps_denied_onboard.components.c7_inference import (
C7InferenceConfig,
TelemetryUnavailableError,
ThermalReading,
ThermalSource,
ThermalStatePublisher,
)
from gps_denied_onboard.components.c7_inference import thermal_publisher as tp_mod
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
_PRODUCER_ID: Final[str] = "c7_inference.thermal"
_TRANSITION_KIND: Final[str] = "c7.thermal_transition"
class _ManualClock:
"""Test ``Clock`` that returns whatever ``advance_to`` set last.
``monotonic_ns`` and ``time_ns`` share the same backing value because
AZ-302 only ever reads ``monotonic_ns``; ``time_ns`` is unused.
"""
def __init__(self, *, start_ns: int = 1_000_000_000) -> None:
self._now_ns: int = start_ns
def monotonic_ns(self) -> int:
return self._now_ns
def time_ns(self) -> int:
return self._now_ns
def sleep_until_ns(self, target_ns: int) -> None: # pragma: no cover - unused
self._now_ns = max(self._now_ns, target_ns)
def advance_to(self, ns: int) -> None:
if ns < self._now_ns:
raise AssertionError("monotonic clock cannot go backwards")
self._now_ns = ns
def advance_by(self, delta_ns: int) -> None:
self.advance_to(self._now_ns + delta_ns)
class _ScriptedSource:
"""``ThermalSource`` whose ``read()`` plays back a scripted queue.
Each queue entry is either a :class:`ThermalReading` (returned) or a
:class:`TelemetryUnavailableError` instance (raised). When the queue
is drained, the last entry is repeated indefinitely so steady-state
polls do not need explicit padding.
"""
def __init__(self, script: list[ThermalReading | TelemetryUnavailableError]) -> None:
if not script:
raise ValueError("_ScriptedSource needs at least one entry")
self._queue: deque[ThermalReading | TelemetryUnavailableError] = deque(script)
self._last: ThermalReading | TelemetryUnavailableError = script[-1]
self.closed: bool = False
def read(self) -> ThermalReading:
if self._queue:
head = self._queue.popleft()
self._last = head
else:
head = self._last
if isinstance(head, TelemetryUnavailableError):
raise head
return head
def close(self) -> None:
self.closed = True
@pytest.fixture
def config() -> Config:
return Config.with_blocks(c7_inference=C7InferenceConfig(thermal_poll_hz=1.0))
@pytest.fixture
def clock() -> _ManualClock:
return _ManualClock()
@pytest.fixture
def fdr() -> FakeFdrSink:
return FakeFdrSink(producer_id=_PRODUCER_ID)
@pytest.fixture
def fast_config() -> Config:
"""High-frequency config so the AC-6 background-thread tests finish quickly."""
return Config.with_blocks(c7_inference=C7InferenceConfig(thermal_poll_hz=200.0))
def _make_publisher(
*,
config: Config,
fdr: FakeFdrSink,
clock: _ManualClock,
source: ThermalSource | None,
) -> ThermalStatePublisher:
return ThermalStatePublisher(config, fdr, source=source, clock=clock)
def _prime_without_thread(
publisher: ThermalStatePublisher, source: ThermalSource
) -> None:
"""Run the source-selection + initial-poll path of ``start()`` without
spawning the background thread.
The deterministic AC-1..AC-4 / AC-8 tests drive :meth:`_poll_once`
directly under a manual clock; the production thread would race
those calls and consume scripted source entries non-deterministically
(one race observed in the first iteration of this test module).
"""
publisher._source = source
publisher._stop_event.clear()
publisher._poll_once(initial=True)
def _cool_reading(*, throttle: bool = False) -> ThermalReading:
return ThermalReading(
cpu_temp_c=45.0,
gpu_temp_c=55.0,
thermal_throttle_active=throttle,
measured_clock_mhz=1300,
)
def _hot_reading() -> ThermalReading:
return ThermalReading(
cpu_temp_c=85.0,
gpu_temp_c=92.0,
thermal_throttle_active=True,
measured_clock_mhz=600,
)
# ----------------------------------------------------------------------
# AC-1: read() returns the most recent _atomic_snapshot.
def test_ac1_read_returns_latest_snapshot(
config: Config, fdr: FakeFdrSink, clock: _ManualClock
) -> None:
source = _ScriptedSource([_cool_reading(), _hot_reading()])
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
_prime_without_thread(publisher, source)
snapshot_after_initial = publisher.read()
clock.advance_by(1_000_000_000)
publisher._poll_once()
snapshot_after_transition = publisher.read()
assert snapshot_after_initial.thermal_throttle_active is False
assert snapshot_after_initial.cpu_temp_c == pytest.approx(45.0)
assert snapshot_after_initial.measured_at_ns == 1_000_000_000
assert snapshot_after_transition.thermal_throttle_active is True
assert snapshot_after_transition.gpu_temp_c == pytest.approx(92.0)
assert snapshot_after_transition.is_telemetry_available is True
assert snapshot_after_transition.measured_at_ns == 2_000_000_000
def test_ac1_read_is_wait_free_object_identity(
config: Config, fdr: FakeFdrSink, clock: _ManualClock
) -> None:
"""Two consecutive reads with no intervening poll return the same object."""
source = _ScriptedSource([_cool_reading()])
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
_prime_without_thread(publisher, source)
first = publisher.read()
second = publisher.read()
assert first is second
# ----------------------------------------------------------------------
# AC-2: throttle entry — FDR + WARN log + state.
def test_ac2_throttle_entry_emits_fdr_and_warn(
config: Config,
fdr: FakeFdrSink,
clock: _ManualClock,
caplog: pytest.LogCaptureFixture,
) -> None:
source = _ScriptedSource([_cool_reading(), _hot_reading()])
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
_prime_without_thread(publisher, source)
assert fdr.records == []
clock.advance_by(1_000_000_000)
with caplog.at_level("WARNING", logger="c7_inference.thermal"):
publisher._poll_once()
assert len(fdr.records) == 1
record = fdr.records[0]
assert record.kind == _TRANSITION_KIND
assert record.producer_id == _PRODUCER_ID
assert record.payload["previous_state"] is False
assert record.payload["new_state"] is True
assert record.payload["gpu_temp_c"] == pytest.approx(92.0)
assert record.payload["cpu_temp_c"] == pytest.approx(85.0)
assert record.payload["measured_clock_mhz"] == 600
assert record.payload["measured_at_ns"] == 2_000_000_000
entry_warnings = [
r for r in caplog.records
if r.levelname == "WARNING" and r.getMessage() == "thermal throttle ENTRY"
]
assert len(entry_warnings) == 1
# ----------------------------------------------------------------------
# AC-3: throttle exit — FDR + INFO log.
def test_ac3_throttle_exit_emits_fdr_and_info(
config: Config,
fdr: FakeFdrSink,
clock: _ManualClock,
caplog: pytest.LogCaptureFixture,
) -> None:
source = _ScriptedSource([_hot_reading(), _cool_reading()])
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
_prime_without_thread(publisher, source)
clock.advance_by(1_000_000_000)
with caplog.at_level("INFO", logger="c7_inference.thermal"):
publisher._poll_once()
assert len(fdr.records) == 1
record = fdr.records[0]
assert record.kind == _TRANSITION_KIND
assert record.payload["previous_state"] is True
assert record.payload["new_state"] is False
exit_infos = [
r for r in caplog.records
if r.levelname == "INFO" and r.getMessage() == "thermal throttle exit"
]
assert len(exit_infos) == 1
exit_warnings = [
r for r in caplog.records
if r.levelname == "WARNING" and r.getMessage() == "thermal throttle exit"
]
assert exit_warnings == []
# ----------------------------------------------------------------------
# AC-4: telemetry unavailable on every poll — default-safe + rate-limited WARN.
def test_ac4_telemetry_unavailable_defaults_safe_and_rate_limits(
config: Config,
fdr: FakeFdrSink,
clock: _ManualClock,
caplog: pytest.LogCaptureFixture,
) -> None:
err = TelemetryUnavailableError("jtop hung")
source = _ScriptedSource([err, err, err, err, err, err])
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
with caplog.at_level("WARNING", logger="c7_inference.thermal"):
_prime_without_thread(publisher, source)
# Drive five more polls across a 2-second simulated window.
clock.advance_by(500_000_000)
publisher._poll_once()
clock.advance_by(500_000_000)
publisher._poll_once()
clock.advance_by(500_000_000)
publisher._poll_once()
clock.advance_by(500_000_000)
publisher._poll_once()
clock.advance_by(500_000_000)
publisher._poll_once()
snapshot = publisher.read()
assert snapshot.is_telemetry_available is False
assert snapshot.thermal_throttle_active is False
assert snapshot.gpu_temp_c is None
assert snapshot.cpu_temp_c is None
unavailable_warnings = [
r for r in caplog.records
if r.levelname == "WARNING" and r.getMessage() == "thermal telemetry unavailable"
]
# Initial poll (start) + 5 polls x 500 ms each = 2.5 s window; rate-limit
# at 1 / sec ⇒ at most 3 WARNs (one immediately + one each second).
assert 1 <= len(unavailable_warnings) <= 3
assert fdr.records == []
# ----------------------------------------------------------------------
# AC-5: cold-start with no source raises.
def test_ac5_start_raises_when_no_source_available(
config: Config,
fdr: FakeFdrSink,
clock: _ManualClock,
monkeypatch: pytest.MonkeyPatch,
) -> None:
def _raise(*_args: object, **_kw: object) -> None:
raise TelemetryUnavailableError("backend missing")
monkeypatch.setattr(tp_mod, "_JtopSource", _raise)
monkeypatch.setattr(tp_mod, "_PynvmlSource", _raise)
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=None)
with pytest.raises(TelemetryUnavailableError):
publisher.start()
assert publisher.is_running() is False
def test_ac5_jtop_falls_back_to_pynvml(
fast_config: Config,
fdr: FakeFdrSink,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""When ``jtop`` fails, ``start()`` picks the ``pynvml`` source and runs."""
pynvml_source = _ScriptedSource(
[_cool_reading()] + [_cool_reading()] * 16
)
def _raise_jtop(*_args: object, **_kw: object) -> None:
raise TelemetryUnavailableError("no jtop")
def _make_pynvml(*_args: object, **_kw: object) -> _ScriptedSource:
return pynvml_source
monkeypatch.setattr(tp_mod, "_JtopSource", _raise_jtop)
monkeypatch.setattr(tp_mod, "_PynvmlSource", _make_pynvml)
publisher = ThermalStatePublisher(fast_config, fdr)
publisher.start()
assert publisher.is_running() is True
snapshot = publisher.read()
publisher.stop()
assert snapshot.is_telemetry_available is True
assert pynvml_source.closed is True
# ----------------------------------------------------------------------
# AC-6: start/stop is idempotent.
def test_ac6_double_start_is_noop(
fast_config: Config, fdr: FakeFdrSink
) -> None:
source = _ScriptedSource([_cool_reading()])
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
publisher.start()
first_thread = publisher._thread
publisher.start()
second_thread = publisher._thread
assert publisher.is_running() is True
assert first_thread is second_thread
publisher.stop()
def test_ac6_double_stop_is_noop(
fast_config: Config, fdr: FakeFdrSink
) -> None:
source = _ScriptedSource([_cool_reading()])
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
publisher.start()
publisher.stop()
publisher.stop()
assert publisher.is_running() is False
def test_background_thread_polls(
fast_config: Config, fdr: FakeFdrSink
) -> None:
"""End-to-end: real thread, 200 Hz poll rate, source flips on second read."""
readings: deque[ThermalReading] = deque(
[_cool_reading()] + [_hot_reading()] * 64
)
class _Live:
closed = False
def read(self) -> ThermalReading:
if readings:
return readings.popleft()
return _hot_reading()
def close(self) -> None:
self._closed_flag = True
source = _Live()
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
publisher.start()
deadline = time.monotonic() + 1.0
while publisher.read().thermal_throttle_active is False and time.monotonic() < deadline:
time.sleep(0.005)
publisher.stop()
assert publisher.read().thermal_throttle_active is True
assert any(r.kind == _TRANSITION_KIND for r in fdr.records)
# ----------------------------------------------------------------------
# AC-8: FDR record envelope matches the schema (round-trip via serialise/parse).
def test_ac8_fdr_record_round_trips_through_wire_format(
config: Config, fdr: FakeFdrSink, clock: _ManualClock
) -> None:
from gps_denied_onboard.fdr_client.records import KNOWN_PAYLOAD_KEYS, parse, serialise
source = _ScriptedSource([_cool_reading(), _hot_reading()])
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
_prime_without_thread(publisher, source)
clock.advance_by(1_000_000_000)
publisher._poll_once()
record = fdr.records[0]
encoded = serialise(record)
decoded = parse(encoded)
assert decoded.kind == _TRANSITION_KIND
assert decoded.producer_id == _PRODUCER_ID
expected_keys = KNOWN_PAYLOAD_KEYS[_TRANSITION_KIND]
assert set(decoded.payload.keys()) - {"extra"} <= expected_keys
assert decoded.payload == record.payload
# ----------------------------------------------------------------------
# NFR-reliability-default-safe: first poll fails → snapshot is default-safe.
def test_nfr_default_safe_on_first_poll_failure(
config: Config, fdr: FakeFdrSink, clock: _ManualClock
) -> None:
err = TelemetryUnavailableError("source dead on arrival")
source = _ScriptedSource([err])
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
_prime_without_thread(publisher, source)
snapshot = publisher.read()
assert snapshot.is_telemetry_available is False
assert snapshot.thermal_throttle_active is False
assert snapshot.gpu_temp_c is None
assert snapshot.cpu_temp_c is None
# ----------------------------------------------------------------------
# Structural: ThermalSource is a runtime-checkable Protocol; the scripted
# source satisfies it.
def test_scripted_source_satisfies_protocol() -> None:
source = _ScriptedSource([_cool_reading()])
assert isinstance(source, ThermalSource)
def test_default_safe_snapshot_invariant_i6() -> None:
"""``is_telemetry_available == False`` ⇒ ``thermal_throttle_active == False``."""
snap = tp_mod._default_safe_snapshot(measured_at_ns=42)
assert snap.is_telemetry_available is False
assert snap.thermal_throttle_active is False
assert snap.measured_at_ns == 42
@@ -131,6 +131,15 @@ def _kind_payload(kind: str) -> dict[str, object]:
"strategy_label": "okvis2",
"frame_id": "frame-0001",
}
if kind == "c7.thermal_transition":
return {
"previous_state": False,
"new_state": True,
"gpu_temp_c": 92.0,
"cpu_temp_c": 85.0,
"measured_clock_mhz": 600,
"measured_at_ns": 1_700_000_000_000_000_000,
}
raise AssertionError(f"unhandled kind in fixture: {kind!r}")