mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 19:41:13 +00:00
[AZ-302] C7 ThermalStatePublisher — jtop/NVML 1 Hz background poller
Implements AZ-297 InferenceRuntime's thermal_state() side: a singleton background-thread publisher that polls jtop (preferred) or pynvml (fallback) at config.thermal_poll_hz, stores an atomic ThermalState snapshot, and emits c7.thermal_transition FDR records on every throttle flip with a WARN log on entry and an INFO log on exit. Default-safe on TelemetryUnavailableError per Invariant I-6 with a 1-Hz rate-limited WARN. Sources return a raw ThermalReading; the publisher stamps measured_at_ns via its injected Clock so _JtopSource / _PynvmlSource stay clean of direct time.* calls (Invariant 2). _poll_once is the deterministic test seam — start() spawns the production thread. - c7.thermal_transition registered in fdr_client.records KNOWN_PAYLOAD_KEYS - [telemetry] optional dep group (jetson-stats, pynvml) added to pyproject - 14 unit tests (AC-1..AC-6, AC-8, NFR-default-safe, structural) green; AC-7 / AC-1 microbench / NFR-perf-poll Tier-2 deferred - full unit suite: 1140 passed, 11 expected Tier-2/CUDA skips Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+23
@@ -175,3 +175,26 @@ Then the record matches `_docs/02_document/contracts/shared_fdr_client/fdr_recor
|
||||
- **Production code that must exist**: real `ThermalStatePublisher` class with real background thread, real `jtop` / `pynvml` poll, real lock-free `_atomic_snapshot` reference swap, real FDR record emission via the injected `FdrClient`.
|
||||
- **Allowed external stubs**: tests MAY substitute a `FakeJtopSource` and a `FakeFdrClient` (AC-2..AC-8); production wiring uses real `jtop` + real AZ-273 `FdrClient`.
|
||||
- **Unacceptable substitutes**: a polling loop that uses `time.sleep` without a real `Clock` injection (would break test determinism); a snapshot field guarded by a `threading.Lock` on the read path (would violate the wait-free read requirement under F3 hot path); a "warn but always default-safe" mode that never raises `TelemetryUnavailableError` from `start()` (would hide misconfigured deployments — exactly the failure mode AC-5 prevents).
|
||||
|
||||
## Implementation Notes (2026-05-12, batch 26)
|
||||
|
||||
Four task-spec → as-built deltas:
|
||||
|
||||
1. **`ThermalReading` intermediate DTO** — spec said source `read()` returns a `ThermalState`. As-built, sources return a `ThermalReading` (no `measured_at_ns`, no `is_telemetry_available`) and the publisher stamps both fields from its injected `Clock`. This keeps `_JtopSource` / `_PynvmlSource` from calling `time.monotonic_ns()` directly, which `tests/_meta/test_no_direct_time_in_components.py` (Invariant 2) forbids in any `src/gps_denied_onboard/components/**` file. Spec already required a `Clock`-injectable publisher; this delta extends the contract one node deeper into the source classes.
|
||||
|
||||
2. **Telemetry backends are an optional pyproject group** — spec said "this task introduces no new third-party dependencies — `jtop` and `pynvml` are both already pinned by the description.md key dependencies table" but neither was actually in `pyproject.toml`. Added a `[telemetry]` optional extra (`jetson-stats>=4.2`, `pynvml>=11.5`) so Tier-1 installs (`.[dev]`) stay slim and Tier-2 Jetson installs (`.[dev,inference,telemetry]`) pull them in. Both backends remain runtime-optional: the publisher discovers them at `start()` time and raises `TelemetryUnavailableError` only when both are absent.
|
||||
|
||||
3. **`_poll_once` is the deterministic test seam — NOT `start()`** — the AC-1..AC-4 / AC-8 tests script the source + drive the clock under a `_ManualClock`; `start()` spawns the background polling thread which would race those deterministic calls (observed in first test iteration). Tests bypass `start()` via a `_prime_without_thread()` helper that runs the source-selection + initial-poll path without spawning the thread. `start()` + `stop()` are still exercised by AC-5 (cold-start fallback), AC-6 (idempotency), and one integration test that proves the background thread actually polls.
|
||||
|
||||
4. **`c7.thermal_transition` registered in the FDR schema** — added the kind to `KNOWN_PAYLOAD_KEYS` in `src/gps_denied_onboard/fdr_client/records.py` with the six payload fields the spec names (`previous_state`, `new_state`, `gpu_temp_c`, `cpu_temp_c`, `measured_clock_mhz`, `measured_at_ns`). The AZ-272 fixture (`tests/unit/test_az272_fdr_record_schema.py::_kind_payload`) was extended with a sample payload so the every-known-kind roundtrip test passes for the new kind.
|
||||
|
||||
### As-built file map
|
||||
|
||||
- `src/gps_denied_onboard/components/c7_inference/thermal_publisher.py` — `ThermalStatePublisher`, `ThermalSource` Protocol, `ThermalReading` intermediate DTO, `_JtopSource` (Tier-2 jtop binding), `_PynvmlSource` (Tier-2 NVML fallback), `_default_safe_snapshot` (Invariant I-6 enforcement).
|
||||
- `src/gps_denied_onboard/components/c7_inference/__init__.py` — re-exports `ThermalStatePublisher`, `ThermalSource`, `ThermalReading`.
|
||||
- `src/gps_denied_onboard/fdr_client/records.py` — `c7.thermal_transition` added to `KNOWN_PAYLOAD_KEYS`.
|
||||
- `pyproject.toml` — new `[telemetry]` optional dep group (`jetson-stats`, `pynvml`).
|
||||
- `tests/unit/c7_inference/test_thermal_publisher.py` — 14 tests (AC-1 x2, AC-2, AC-3, AC-4, AC-5 x2, AC-6 x2, background-thread integration, AC-8, NFR-default-safe, Protocol structural, Invariant I-6).
|
||||
- `tests/unit/test_az272_fdr_record_schema.py` — `_kind_payload` extended with the new kind.
|
||||
|
||||
AC-7 (F3-hot-path non-interference) and the wait-free p99 microbench in AC-1 are Tier-2 perf concerns; the Tier-2 validation task will cover them on real Jetson hardware.
|
||||
@@ -0,0 +1,141 @@
|
||||
# Batch 26 / Cycle 1 — Implementation Report
|
||||
|
||||
**Date**: 2026-05-12
|
||||
**Tasks**: AZ-302 (C7 ThermalStatePublisher — jtop/NVML 1 Hz background)
|
||||
**Story points landed**: 3
|
||||
**Status**: complete (AZ-302 → In Testing)
|
||||
|
||||
## Scope summary
|
||||
|
||||
Single-task batch, continuing the 1-task cadence the user reaffirmed
|
||||
when picking option A after batches 24 and 25. AZ-302 was the deferred
|
||||
3-pointer surfaced by the batch-25 narrow-down (background thread,
|
||||
two telemetry backends, FDR record kind, log rate-limiting). AZ-304
|
||||
(C6 Postgres schema, 2pt) remains queued for batch 27.
|
||||
|
||||
## Files added / modified
|
||||
|
||||
### New
|
||||
|
||||
- `src/gps_denied_onboard/components/c7_inference/thermal_publisher.py` —
|
||||
`ThermalStatePublisher` (start/stop/read/is_running, `_poll_once`
|
||||
test seam, atomic snapshot, source selection), `ThermalSource`
|
||||
runtime-checkable Protocol, `ThermalReading` intermediate DTO,
|
||||
`_JtopSource` (jtop binding, Tier-2 production path), `_PynvmlSource`
|
||||
(NVML fallback), `_default_safe_snapshot` (Invariant I-6 enforcement).
|
||||
- `tests/unit/c7_inference/test_thermal_publisher.py` — 14 deterministic
|
||||
tests (AC-1..AC-6, AC-8, NFR-default-safe, Protocol structural,
|
||||
Invariant I-6) plus one real-thread integration test that proves the
|
||||
background poller actually polls and emits transitions.
|
||||
|
||||
### Modified
|
||||
|
||||
- `src/gps_denied_onboard/components/c7_inference/__init__.py` —
|
||||
re-exports `ThermalStatePublisher`, `ThermalSource`, `ThermalReading`.
|
||||
- `src/gps_denied_onboard/fdr_client/records.py` —
|
||||
`c7.thermal_transition` registered in `KNOWN_PAYLOAD_KEYS` with six
|
||||
documented payload keys (`previous_state`, `new_state`, `gpu_temp_c`,
|
||||
`cpu_temp_c`, `measured_clock_mhz`, `measured_at_ns`).
|
||||
- `pyproject.toml` — new `[telemetry]` optional dep group
|
||||
(`jetson-stats>=4.2`, `pynvml>=11.5`). Tier-1 (`.[dev]`) install stays
|
||||
slim; Tier-2 Jetson install adds `.[telemetry]`.
|
||||
- `tests/unit/test_az272_fdr_record_schema.py` — `_kind_payload`
|
||||
fixture extended with a sample `c7.thermal_transition` payload so
|
||||
the every-known-kind roundtrip test passes.
|
||||
- `_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md` → moved to
|
||||
`_docs/02_tasks/done/`; appended `## Implementation Notes (2026-05-12,
|
||||
batch 26)` documenting the four task-spec → as-built deltas.
|
||||
|
||||
## Design decisions
|
||||
|
||||
1. **`ThermalReading` intermediate DTO instead of source-stamped
|
||||
`ThermalState`**. The spec implied source `read()` returns a fully
|
||||
stamped `ThermalState`. As-built, sources return a `ThermalReading`
|
||||
(raw measurement only) and the publisher stamps `measured_at_ns` +
|
||||
`is_telemetry_available` from its injected `Clock`. This keeps the
|
||||
`_JtopSource` / `_PynvmlSource` modules from calling
|
||||
`time.monotonic_ns()` directly, which `tests/_meta/
|
||||
test_no_direct_time_in_components.py` (Invariant 2) forbids in any
|
||||
`src/gps_denied_onboard/components/**` file. Documented in the spec's
|
||||
as-built notes section.
|
||||
|
||||
2. **`_poll_once` is the deterministic test seam — not `start()`**. The
|
||||
AC-1..AC-4 / AC-8 tests script the source and drive a `_ManualClock`
|
||||
forward; the production `start()` would spawn a background thread
|
||||
that races those deterministic calls (observed in the first test
|
||||
iteration). Tests bypass `start()` via a `_prime_without_thread()`
|
||||
helper that exercises the source-selection + initial-poll path
|
||||
without the thread. `start()` / `stop()` remain covered by AC-5
|
||||
(cold-start fallback), AC-6 (idempotency), and a real-thread
|
||||
integration test at 200 Hz that proves the background loop polls
|
||||
the source and emits transition records.
|
||||
|
||||
3. **Telemetry backends moved to a `[telemetry]` optional dep group**.
|
||||
The spec claimed both `jtop` and `pynvml` were already pinned by
|
||||
description.md, but neither was in `pyproject.toml`. They are now a
|
||||
`[telemetry]` optional extra so Tier-1 installs (`.[dev]`) stay slim
|
||||
and Tier-2 Jetson installs (`.[dev,inference,telemetry]`) pull them
|
||||
in. Both backends remain runtime-optional: the publisher discovers
|
||||
them at `start()` time and only raises `TelemetryUnavailableError`
|
||||
when both are absent.
|
||||
|
||||
4. **WARN-on-unavailable rate-limiting uses the injected `Clock`**, not
|
||||
`time.monotonic_ns()`. The 1-second rate-limit window is enforced by
|
||||
`now_ns - last_warn_ns >= 1_000_000_000` against `self._clock.
|
||||
monotonic_ns()`, so replay-deterministic tests can simulate rate
|
||||
limit windows by advancing the manual clock.
|
||||
|
||||
## AC coverage
|
||||
|
||||
| AC | Test name | Status |
|
||||
|----|-----------|--------|
|
||||
| AC-1 | `test_ac1_read_returns_latest_snapshot` + `test_ac1_read_is_wait_free_object_identity` | passing |
|
||||
| AC-2 | `test_ac2_throttle_entry_emits_fdr_and_warn` | passing |
|
||||
| AC-3 | `test_ac3_throttle_exit_emits_fdr_and_info` | passing |
|
||||
| AC-4 | `test_ac4_telemetry_unavailable_defaults_safe_and_rate_limits` | passing |
|
||||
| AC-5 | `test_ac5_start_raises_when_no_source_available` + `test_ac5_jtop_falls_back_to_pynvml` | passing |
|
||||
| AC-6 | `test_ac6_double_start_is_noop` + `test_ac6_double_stop_is_noop` | passing |
|
||||
| AC-7 | F3 hot-path non-interference perf | Tier-2 deferred |
|
||||
| AC-8 | `test_ac8_fdr_record_round_trips_through_wire_format` | passing |
|
||||
| NFR-perf-poll | microbench poll p99 ≤ 100 ms | Tier-2 deferred |
|
||||
| NFR-perf-read p99 ≤ 1 µs | wait-free read microbench | Tier-2 deferred |
|
||||
| NFR-default-safe | `test_nfr_default_safe_on_first_poll_failure` | passing |
|
||||
| Structural | `test_scripted_source_satisfies_protocol` | passing |
|
||||
| Invariant I-6 | `test_default_safe_snapshot_invariant_i6` | passing |
|
||||
|
||||
AC-7 and the two Tier-2 microbenches need real Jetson hardware to be
|
||||
meaningful; they roll into the Tier-2 validation task with AZ-301's
|
||||
AC-8 (`read_host_tuple` on real NVML/L4T).
|
||||
|
||||
## Test run
|
||||
|
||||
`./.venv/bin/python -m pytest tests/unit tests/_meta -q` → **1140
|
||||
passed, 11 skipped (Tier-2 / CUDA / cmake / actionlint)**, no failures.
|
||||
Mypy strict on the three production-source files we touched: clean.
|
||||
Ruff on the same set: clean.
|
||||
|
||||
## Self-review
|
||||
|
||||
- Production code (`thermal_publisher.py`, `__init__.py`,
|
||||
`records.py`, `pyproject.toml`): no use of `time.*` in component
|
||||
modules (Invariant 2 meta-test passes); FDR kind documented in the
|
||||
schema-keys table; telemetry backends are optional; the publisher
|
||||
raises `TelemetryUnavailableError` on cold-start with no source per
|
||||
AC-5; idempotent start/stop per AC-6; rate-limited WARN per AC-4.
|
||||
- Tests: every AC has at least one named assertion; the AC-1 wait-free
|
||||
microbench and AC-7 hot-path bench are Tier-2 deferred (documented in
|
||||
the report + the spec as-built notes).
|
||||
- Lint / type: ruff + mypy strict clean on the modified set.
|
||||
- Docs: spec moved to `done/` with the as-built delta notes.
|
||||
|
||||
## Known gaps
|
||||
|
||||
- AZ-302 AC-7 + AC-1 microbench + NFR-perf-poll → require real Jetson
|
||||
thermal source under load; rolled into the Tier-2 validation task.
|
||||
- AZ-304 (C6 Postgres schema) deferred to batch 27 — testcontainers
|
||||
setup + Alembic baseline are independently meaningful.
|
||||
- `_JtopSource.read()` catches a bare `Exception` because jtop's
|
||||
internal exception types are not stable across jtop versions; the
|
||||
exception is always rewrapped as `TelemetryUnavailableError` (never
|
||||
swallowed silently) so this does not violate the "no silent error
|
||||
suppression" coding rule.
|
||||
@@ -6,9 +6,9 @@ step: 7
|
||||
name: Implement
|
||||
status: in_progress
|
||||
sub_step:
|
||||
phase: 13
|
||||
phase: 7
|
||||
name: archive-and-loop
|
||||
detail: "batch 25/cycle1 complete: AZ-301 → In Testing, archived to done/. AZ-302 + AZ-304 deferred to batches 26 / 27 to keep the 1-task cadence (AZ-302 = 3pt with background threading + jtop/pynvml; AZ-304 = 2pt with testcontainers Postgres + Alembic). 14 unconditional AC tests + 1 Tier-2 AC-8 skip. Suite: 1134 passed / 11 skipped. 17 tasks total ready overall (AZ-300 + AZ-301 removed)."
|
||||
detail: "batch 26/cycle1 complete: AZ-302 (ThermalStatePublisher, 3pt) implemented and tests green (1140 passed, 11 Tier-2/CUDA skipped). Added ThermalStatePublisher + ThermalSource Protocol + ThermalReading DTO + _JtopSource/_PynvmlSource backends; registered c7.thermal_transition in FDR schema; new [telemetry] optional dep group in pyproject.toml. Spec moved to done/ with as-built notes; cycle report at _docs/03_implementation/batch_26_cycle1_report.md. Ready to push commit + compute batch 27 (AZ-304 C6 Postgres schema next, 2pt) plus the 15 other ready tasks."
|
||||
retry_count: 0
|
||||
cycle: 1
|
||||
tracker: jira
|
||||
|
||||
@@ -64,6 +64,14 @@ inference = [
|
||||
"onnxruntime>=1.17",
|
||||
# tensorrt is installed out-of-band on Jetson — not a pip dep
|
||||
]
|
||||
# AZ-302: thermal telemetry backends used by C7's ThermalStatePublisher.
|
||||
# Both are Jetson / NVIDIA-host-only and not import-required for Tier-1;
|
||||
# the publisher selects whichever is importable at start() time and
|
||||
# raises TelemetryUnavailableError if neither is present.
|
||||
telemetry = [
|
||||
"jetson-stats>=4.2",
|
||||
"pynvml>=11.5",
|
||||
]
|
||||
indexing = [
|
||||
"faiss-cpu>=1.7",
|
||||
]
|
||||
|
||||
@@ -56,6 +56,11 @@ from gps_denied_onboard.components.c7_inference.manifest import (
|
||||
ManifestReader,
|
||||
ManifestReaderProtocol,
|
||||
)
|
||||
from gps_denied_onboard.components.c7_inference.thermal_publisher import (
|
||||
ThermalReading,
|
||||
ThermalSource,
|
||||
ThermalStatePublisher,
|
||||
)
|
||||
from gps_denied_onboard.config.schema import register_component_block
|
||||
|
||||
register_component_block("c7_inference", C7InferenceConfig)
|
||||
@@ -84,7 +89,10 @@ __all__ = [
|
||||
"PrecisionMode",
|
||||
"RuntimeError",
|
||||
"TelemetryUnavailableError",
|
||||
"ThermalReading",
|
||||
"ThermalSource",
|
||||
"ThermalState",
|
||||
"ThermalStatePublisher",
|
||||
"default_registry",
|
||||
"register_architecture",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,464 @@
|
||||
"""``ThermalStatePublisher`` — 1 Hz background thermal poller (AZ-302).
|
||||
|
||||
Owns the single C7 thermal-telemetry source for the companion
|
||||
process. Reads ``jtop`` (preferred) or ``pynvml`` (fallback) once per
|
||||
poll period, builds a fresh :class:`ThermalState`, and stores it
|
||||
atomically in ``_atomic_snapshot``. :meth:`read` is wait-free —
|
||||
returns the current snapshot via a single object-reference load
|
||||
(atomic under the Python GIL on CPython 3.10 / 3.11; documented in
|
||||
the impl notes for the AZ-302 task spec).
|
||||
|
||||
Throttle-state transitions emit:
|
||||
|
||||
- one ``kind="c7.thermal_transition"`` FDR record per flip,
|
||||
- one WARN log on throttle entry (False → True),
|
||||
- one INFO log on throttle exit (True → False).
|
||||
|
||||
Telemetry unavailability is the default-safe path (Invariant I-6):
|
||||
on :class:`TelemetryUnavailableError` from the source the publisher
|
||||
emits a rate-limited (≤ 1/sec) WARN log and stores a snapshot with
|
||||
``is_telemetry_available=False, thermal_throttle_active=False``;
|
||||
consumers see truthful "telemetry off" instead of a lie about
|
||||
throttle.
|
||||
|
||||
Composition contract:
|
||||
|
||||
publisher = ThermalStatePublisher(config, fdr_client)
|
||||
publisher.start()
|
||||
...
|
||||
# consumers
|
||||
snapshot = publisher.read()
|
||||
...
|
||||
publisher.stop()
|
||||
|
||||
``start()`` is what selects the source (``jtop`` → ``pynvml``);
|
||||
if both backends are unavailable on the host, ``start()`` raises
|
||||
:class:`TelemetryUnavailableError` (AC-5).
|
||||
|
||||
AC mapping (see ``_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md``):
|
||||
|
||||
- AC-1 : :meth:`read` returns the current ``_atomic_snapshot``;
|
||||
wait-free perf is Tier-2 microbench.
|
||||
- AC-2 / AC-3 : :meth:`_poll_once` detects throttle transitions and
|
||||
emits FDR + log.
|
||||
- AC-4 : repeated :class:`TelemetryUnavailableError` keeps the
|
||||
default-safe snapshot + rate-limits WARN logs.
|
||||
- AC-5 : :meth:`start` raises when no source is available.
|
||||
- AC-6 : :meth:`start` / :meth:`stop` are idempotent.
|
||||
- AC-7 / NFR-perf-poll : Tier-2 hot-path benchmark.
|
||||
- AC-8 : FDR ``c7.thermal_transition`` payload matches the schema.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import TYPE_CHECKING, Protocol, runtime_checkable
|
||||
|
||||
from gps_denied_onboard._types.thermal import ThermalState
|
||||
from gps_denied_onboard.clock.wall_clock import WallClock
|
||||
from gps_denied_onboard.components.c7_inference.errors import (
|
||||
TelemetryUnavailableError,
|
||||
)
|
||||
from gps_denied_onboard.fdr_client.records import (
|
||||
CURRENT_SCHEMA_VERSION,
|
||||
FdrRecord,
|
||||
)
|
||||
from gps_denied_onboard.logging import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from gps_denied_onboard.clock import Clock
|
||||
from gps_denied_onboard.config.schema import Config
|
||||
from gps_denied_onboard.fdr_client.client import FdrClient
|
||||
|
||||
__all__ = [
|
||||
"ThermalReading",
|
||||
"ThermalSource",
|
||||
"ThermalStatePublisher",
|
||||
]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ThermalReading:
|
||||
"""Raw thermal reading produced by a :class:`ThermalSource`.
|
||||
|
||||
The publisher stamps :attr:`ThermalState.measured_at_ns` from its
|
||||
injected :class:`Clock` so :class:`ThermalSource` impls never reach
|
||||
into ``time.monotonic_ns`` directly (Invariant 2 of the replay
|
||||
contract — see ``tests/_meta/test_no_direct_time_in_components.py``).
|
||||
"""
|
||||
|
||||
cpu_temp_c: float | None
|
||||
gpu_temp_c: float | None
|
||||
thermal_throttle_active: bool
|
||||
measured_clock_mhz: int | None
|
||||
|
||||
_PRODUCER_ID = "c7_inference.thermal"
|
||||
_TRANSITION_KIND = "c7.thermal_transition"
|
||||
_UNAVAILABLE_KIND = "c7.thermal.unavailable"
|
||||
_WARN_RATE_LIMIT_NS = 1_000_000_000 # 1 second between WARN logs on unavailable
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class ThermalSource(Protocol):
|
||||
"""Single-poll thermal source — :class:`ThermalStatePublisher`'s only collaborator.
|
||||
|
||||
``read()`` must produce a fresh :class:`ThermalReading` or raise
|
||||
:class:`TelemetryUnavailableError`. The publisher catches the latter
|
||||
and falls back to the default-safe snapshot. The source owns its
|
||||
backend lifecycle (jtop context manager, pynvml init/shutdown).
|
||||
"""
|
||||
|
||||
def read(self) -> ThermalReading: # pragma: no cover - structural
|
||||
...
|
||||
|
||||
def close(self) -> None: # pragma: no cover - structural
|
||||
...
|
||||
|
||||
|
||||
class ThermalStatePublisher:
|
||||
"""Singleton-by-convention 1 Hz thermal poller."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
fdr_client: FdrClient,
|
||||
*,
|
||||
source: ThermalSource | None = None,
|
||||
clock: Clock | None = None,
|
||||
) -> None:
|
||||
self._config = config
|
||||
self._fdr = fdr_client
|
||||
self._clock = clock if clock is not None else WallClock()
|
||||
self._injected_source = source
|
||||
self._source: ThermalSource | None = None
|
||||
self._period_ns = int(
|
||||
1_000_000_000 / config.components["c7_inference"].thermal_poll_hz
|
||||
)
|
||||
self._lock = threading.Lock()
|
||||
self._stop_event = threading.Event()
|
||||
self._thread: threading.Thread | None = None
|
||||
self._logger = get_logger("c7_inference.thermal")
|
||||
self._atomic_snapshot: ThermalState = _default_safe_snapshot(
|
||||
self._clock.monotonic_ns()
|
||||
)
|
||||
self._last_throttle: bool = False
|
||||
self._last_unavailable_warn_ns: int = 0
|
||||
|
||||
@property
|
||||
def producer_id(self) -> str:
|
||||
return _PRODUCER_ID
|
||||
|
||||
def start(self) -> None:
|
||||
"""Select the source, prime the snapshot, spawn the polling thread."""
|
||||
with self._lock:
|
||||
if self._thread is not None and self._thread.is_alive():
|
||||
return
|
||||
if self._source is None:
|
||||
self._source = self._select_source()
|
||||
self._stop_event.clear()
|
||||
self._poll_once(initial=True)
|
||||
thread = threading.Thread(
|
||||
target=self._loop, name=f"{_PRODUCER_ID}.poll", daemon=True
|
||||
)
|
||||
self._thread = thread
|
||||
thread.start()
|
||||
self._logger.info(
|
||||
"thermal publisher started",
|
||||
extra={
|
||||
"kind": "c7.thermal.lifecycle",
|
||||
"kv": {
|
||||
"event": "start",
|
||||
"period_ns": self._period_ns,
|
||||
"source": type(self._source).__name__
|
||||
if self._source is not None
|
||||
else "None",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Signal the polling thread, join it, close the source. Idempotent."""
|
||||
with self._lock:
|
||||
if self._thread is None:
|
||||
return
|
||||
self._stop_event.set()
|
||||
thread = self._thread
|
||||
self._thread = None
|
||||
thread.join(timeout=2.0 * self._period_ns / 1_000_000_000)
|
||||
source = self._source
|
||||
if source is not None:
|
||||
try:
|
||||
source.close()
|
||||
except Exception as exc:
|
||||
self._logger.warning(
|
||||
"thermal source close raised",
|
||||
extra={
|
||||
"kind": "c7.thermal.lifecycle",
|
||||
"kv": {"event": "stop_close_error", "error": str(exc)},
|
||||
},
|
||||
)
|
||||
self._source = None
|
||||
self._logger.info(
|
||||
"thermal publisher stopped",
|
||||
extra={
|
||||
"kind": "c7.thermal.lifecycle",
|
||||
"kv": {"event": "stop"},
|
||||
},
|
||||
)
|
||||
|
||||
def is_running(self) -> bool:
|
||||
thread = self._thread
|
||||
return thread is not None and thread.is_alive()
|
||||
|
||||
def read(self) -> ThermalState:
|
||||
"""Wait-free reader. Returns the current ``_atomic_snapshot``."""
|
||||
return self._atomic_snapshot
|
||||
|
||||
def _select_source(self) -> ThermalSource:
|
||||
if self._injected_source is not None:
|
||||
return self._injected_source
|
||||
try:
|
||||
return _JtopSource()
|
||||
except TelemetryUnavailableError:
|
||||
pass
|
||||
try:
|
||||
return _PynvmlSource()
|
||||
except TelemetryUnavailableError as exc:
|
||||
raise TelemetryUnavailableError(
|
||||
"thermal publisher: neither jtop nor pynvml available; cannot "
|
||||
"bind a telemetry source"
|
||||
) from exc
|
||||
|
||||
def _loop(self) -> None:
|
||||
while not self._stop_event.is_set():
|
||||
self._poll_once()
|
||||
if self._stop_event.wait(timeout=self._period_ns / 1_000_000_000):
|
||||
break
|
||||
|
||||
def _poll_once(self, *, initial: bool = False) -> None:
|
||||
"""One poll cycle: read source, detect transition, update snapshot.
|
||||
|
||||
Public-by-name-with-underscore so the AC-2..AC-4 tests can call
|
||||
it deterministically without spinning up the background thread.
|
||||
"""
|
||||
source = self._source
|
||||
if source is None:
|
||||
return
|
||||
now_ns = self._clock.monotonic_ns()
|
||||
try:
|
||||
reading = source.read()
|
||||
except TelemetryUnavailableError as exc:
|
||||
self._atomic_snapshot = _default_safe_snapshot(now_ns)
|
||||
self._maybe_warn_unavailable(str(exc))
|
||||
self._last_throttle = False
|
||||
return
|
||||
fresh = ThermalState(
|
||||
cpu_temp_c=reading.cpu_temp_c,
|
||||
gpu_temp_c=reading.gpu_temp_c,
|
||||
thermal_throttle_active=reading.thermal_throttle_active,
|
||||
measured_clock_mhz=reading.measured_clock_mhz,
|
||||
measured_at_ns=now_ns,
|
||||
is_telemetry_available=True,
|
||||
)
|
||||
previous = self._last_throttle
|
||||
self._atomic_snapshot = fresh
|
||||
self._last_throttle = fresh.thermal_throttle_active
|
||||
if initial:
|
||||
return
|
||||
if previous != fresh.thermal_throttle_active:
|
||||
self._emit_transition(previous=previous, fresh=fresh)
|
||||
|
||||
def _maybe_warn_unavailable(self, reason: str) -> None:
|
||||
now_ns = self._clock.monotonic_ns()
|
||||
if now_ns - self._last_unavailable_warn_ns < _WARN_RATE_LIMIT_NS:
|
||||
return
|
||||
self._last_unavailable_warn_ns = now_ns
|
||||
self._logger.warning(
|
||||
"thermal telemetry unavailable",
|
||||
extra={
|
||||
"kind": _UNAVAILABLE_KIND,
|
||||
"kv": {"reason": reason},
|
||||
},
|
||||
)
|
||||
|
||||
def _emit_transition(
|
||||
self, *, previous: bool, fresh: ThermalState
|
||||
) -> None:
|
||||
record = FdrRecord(
|
||||
schema_version=CURRENT_SCHEMA_VERSION,
|
||||
ts=_iso_ts_now(),
|
||||
producer_id=_PRODUCER_ID,
|
||||
kind=_TRANSITION_KIND,
|
||||
payload={
|
||||
"previous_state": previous,
|
||||
"new_state": fresh.thermal_throttle_active,
|
||||
"gpu_temp_c": fresh.gpu_temp_c,
|
||||
"cpu_temp_c": fresh.cpu_temp_c,
|
||||
"measured_clock_mhz": fresh.measured_clock_mhz,
|
||||
"measured_at_ns": fresh.measured_at_ns,
|
||||
},
|
||||
)
|
||||
self._fdr.enqueue(record)
|
||||
if fresh.thermal_throttle_active:
|
||||
self._logger.warning(
|
||||
"thermal throttle ENTRY",
|
||||
extra={
|
||||
"kind": _TRANSITION_KIND,
|
||||
"kv": {
|
||||
"previous_state": previous,
|
||||
"new_state": True,
|
||||
"gpu_temp_c": fresh.gpu_temp_c,
|
||||
"cpu_temp_c": fresh.cpu_temp_c,
|
||||
},
|
||||
},
|
||||
)
|
||||
else:
|
||||
self._logger.info(
|
||||
"thermal throttle exit",
|
||||
extra={
|
||||
"kind": _TRANSITION_KIND,
|
||||
"kv": {
|
||||
"previous_state": previous,
|
||||
"new_state": False,
|
||||
"gpu_temp_c": fresh.gpu_temp_c,
|
||||
"cpu_temp_c": fresh.cpu_temp_c,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def _default_safe_snapshot(measured_at_ns: int) -> ThermalState:
|
||||
return ThermalState(
|
||||
cpu_temp_c=None,
|
||||
gpu_temp_c=None,
|
||||
thermal_throttle_active=False,
|
||||
measured_clock_mhz=None,
|
||||
measured_at_ns=measured_at_ns,
|
||||
is_telemetry_available=False,
|
||||
)
|
||||
|
||||
|
||||
def _iso_ts_now() -> str:
|
||||
"""RFC 3339 UTC timestamp with microsecond precision and ``Z`` suffix."""
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
|
||||
|
||||
|
||||
class _JtopSource:
|
||||
"""``jtop`` (jetson-stats) thermal source — Tier-2 production path.
|
||||
|
||||
Constructor raises :class:`TelemetryUnavailableError` when ``jtop``
|
||||
is not importable so the publisher can fall back to pynvml.
|
||||
"""
|
||||
|
||||
__slots__ = ("_jtop",)
|
||||
|
||||
def __init__(self) -> None:
|
||||
try:
|
||||
from jtop import jtop
|
||||
except ImportError as exc:
|
||||
raise TelemetryUnavailableError(
|
||||
"jtop not installed"
|
||||
) from exc
|
||||
self._jtop = jtop()
|
||||
try:
|
||||
self._jtop.start()
|
||||
except Exception as exc:
|
||||
raise TelemetryUnavailableError(
|
||||
f"jtop start failed: {exc}"
|
||||
) from exc
|
||||
|
||||
def read(self) -> ThermalReading:
|
||||
try:
|
||||
stats = self._jtop.stats
|
||||
temps = stats.get("Temp", {}) or {}
|
||||
gpu_t = temps.get("GPU")
|
||||
cpu_t = temps.get("CPU")
|
||||
throttle = bool(stats.get("THROTTLE", 0))
|
||||
clock_mhz = stats.get("GPU0_FREQ")
|
||||
return ThermalReading(
|
||||
cpu_temp_c=float(cpu_t) if cpu_t is not None else None,
|
||||
gpu_temp_c=float(gpu_t) if gpu_t is not None else None,
|
||||
thermal_throttle_active=throttle,
|
||||
measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None,
|
||||
)
|
||||
except Exception as exc:
|
||||
raise TelemetryUnavailableError(
|
||||
f"jtop read failed: {exc}"
|
||||
) from exc
|
||||
|
||||
def close(self) -> None:
|
||||
try:
|
||||
self._jtop.close()
|
||||
except Exception as exc:
|
||||
get_logger("c7_inference.thermal").warning(
|
||||
"jtop close raised",
|
||||
extra={
|
||||
"kind": "c7.thermal.lifecycle",
|
||||
"kv": {"error": str(exc)},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class _PynvmlSource:
|
||||
"""``pynvml`` NVML thermal source — Tier-2 production fallback."""
|
||||
|
||||
__slots__ = ("_handle", "_pynvml")
|
||||
|
||||
def __init__(self) -> None:
|
||||
try:
|
||||
import pynvml
|
||||
except ImportError as exc:
|
||||
raise TelemetryUnavailableError(
|
||||
"pynvml not installed"
|
||||
) from exc
|
||||
try:
|
||||
pynvml.nvmlInit()
|
||||
self._handle = pynvml.nvmlDeviceGetHandleByIndex(0)
|
||||
except pynvml.NVMLError as exc:
|
||||
raise TelemetryUnavailableError(
|
||||
f"pynvml init failed: {exc}"
|
||||
) from exc
|
||||
self._pynvml = pynvml
|
||||
|
||||
def read(self) -> ThermalReading:
|
||||
try:
|
||||
gpu_t = self._pynvml.nvmlDeviceGetTemperature(
|
||||
self._handle, self._pynvml.NVML_TEMPERATURE_GPU
|
||||
)
|
||||
throttle_mask = self._pynvml.nvmlDeviceGetCurrentClocksThrottleReasons(
|
||||
self._handle
|
||||
)
|
||||
throttle_active = (
|
||||
throttle_mask != 0
|
||||
and throttle_mask != self._pynvml.nvmlClocksThrottleReasonNone
|
||||
)
|
||||
try:
|
||||
clock_mhz = self._pynvml.nvmlDeviceGetClockInfo(
|
||||
self._handle, self._pynvml.NVML_CLOCK_GRAPHICS
|
||||
)
|
||||
except self._pynvml.NVMLError:
|
||||
clock_mhz = None
|
||||
return ThermalReading(
|
||||
cpu_temp_c=None, # NVML reports GPU only; CPU temp needs jtop / lm-sensors
|
||||
gpu_temp_c=float(gpu_t),
|
||||
thermal_throttle_active=bool(throttle_active),
|
||||
measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None,
|
||||
)
|
||||
except self._pynvml.NVMLError as exc:
|
||||
raise TelemetryUnavailableError(
|
||||
f"NVML read failed: {exc}"
|
||||
) from exc
|
||||
|
||||
def close(self) -> None:
|
||||
try:
|
||||
self._pynvml.nvmlShutdown()
|
||||
except self._pynvml.NVMLError as exc:
|
||||
get_logger("c7_inference.thermal").warning(
|
||||
"NVML shutdown raised",
|
||||
extra={
|
||||
"kind": "c7.thermal.lifecycle",
|
||||
"kv": {"error": str(exc)},
|
||||
},
|
||||
)
|
||||
@@ -101,6 +101,20 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = {
|
||||
"threshold_m",
|
||||
}
|
||||
),
|
||||
# AZ-302 / E-C7: emitted on every thermal-throttle transition (False->True or
|
||||
# True->False). One record per flip; steady-state polls emit nothing on this
|
||||
# kind. `previous_state` and `new_state` are the throttle booleans pre/post
|
||||
# transition; temperatures and clock are taken from the same poll cycle.
|
||||
"c7.thermal_transition": frozenset(
|
||||
{
|
||||
"previous_state",
|
||||
"new_state",
|
||||
"gpu_temp_c",
|
||||
"cpu_temp_c",
|
||||
"measured_clock_mhz",
|
||||
"measured_at_ns",
|
||||
}
|
||||
),
|
||||
}
|
||||
|
||||
KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys())
|
||||
|
||||
@@ -0,0 +1,506 @@
|
||||
"""AZ-302 — ``ThermalStatePublisher`` acceptance tests.
|
||||
|
||||
CPU-only and deterministic: every test drives the publisher through its
|
||||
``_poll_once()`` seam with a manual clock + a scripted ``ThermalSource``;
|
||||
no real ``jtop`` / ``pynvml`` / background thread is involved (those
|
||||
paths live behind the source-selection fallback and are Tier-2 by
|
||||
construction — see ``read_host_tuple`` analog comments in AZ-301).
|
||||
|
||||
The publisher's background thread is exercised only through the
|
||||
idempotency tests (AC-6); the throttle-detection-latency contract
|
||||
(AC-2 / AC-3) is verified deterministically on the ``_poll_once`` seam
|
||||
because the wall-clock latency is dominated by the publisher's own
|
||||
period (1 / ``thermal_poll_hz`` s), not by code under test.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections import deque
|
||||
from typing import Final
|
||||
|
||||
import pytest
|
||||
|
||||
from gps_denied_onboard.components.c7_inference import (
|
||||
C7InferenceConfig,
|
||||
TelemetryUnavailableError,
|
||||
ThermalReading,
|
||||
ThermalSource,
|
||||
ThermalStatePublisher,
|
||||
)
|
||||
from gps_denied_onboard.components.c7_inference import thermal_publisher as tp_mod
|
||||
from gps_denied_onboard.config.schema import Config
|
||||
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
|
||||
|
||||
_PRODUCER_ID: Final[str] = "c7_inference.thermal"
|
||||
_TRANSITION_KIND: Final[str] = "c7.thermal_transition"
|
||||
|
||||
|
||||
class _ManualClock:
|
||||
"""Test ``Clock`` that returns whatever ``advance_to`` set last.
|
||||
|
||||
``monotonic_ns`` and ``time_ns`` share the same backing value because
|
||||
AZ-302 only ever reads ``monotonic_ns``; ``time_ns`` is unused.
|
||||
"""
|
||||
|
||||
def __init__(self, *, start_ns: int = 1_000_000_000) -> None:
|
||||
self._now_ns: int = start_ns
|
||||
|
||||
def monotonic_ns(self) -> int:
|
||||
return self._now_ns
|
||||
|
||||
def time_ns(self) -> int:
|
||||
return self._now_ns
|
||||
|
||||
def sleep_until_ns(self, target_ns: int) -> None: # pragma: no cover - unused
|
||||
self._now_ns = max(self._now_ns, target_ns)
|
||||
|
||||
def advance_to(self, ns: int) -> None:
|
||||
if ns < self._now_ns:
|
||||
raise AssertionError("monotonic clock cannot go backwards")
|
||||
self._now_ns = ns
|
||||
|
||||
def advance_by(self, delta_ns: int) -> None:
|
||||
self.advance_to(self._now_ns + delta_ns)
|
||||
|
||||
|
||||
class _ScriptedSource:
|
||||
"""``ThermalSource`` whose ``read()`` plays back a scripted queue.
|
||||
|
||||
Each queue entry is either a :class:`ThermalReading` (returned) or a
|
||||
:class:`TelemetryUnavailableError` instance (raised). When the queue
|
||||
is drained, the last entry is repeated indefinitely so steady-state
|
||||
polls do not need explicit padding.
|
||||
"""
|
||||
|
||||
def __init__(self, script: list[ThermalReading | TelemetryUnavailableError]) -> None:
|
||||
if not script:
|
||||
raise ValueError("_ScriptedSource needs at least one entry")
|
||||
self._queue: deque[ThermalReading | TelemetryUnavailableError] = deque(script)
|
||||
self._last: ThermalReading | TelemetryUnavailableError = script[-1]
|
||||
self.closed: bool = False
|
||||
|
||||
def read(self) -> ThermalReading:
|
||||
if self._queue:
|
||||
head = self._queue.popleft()
|
||||
self._last = head
|
||||
else:
|
||||
head = self._last
|
||||
if isinstance(head, TelemetryUnavailableError):
|
||||
raise head
|
||||
return head
|
||||
|
||||
def close(self) -> None:
|
||||
self.closed = True
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config() -> Config:
|
||||
return Config.with_blocks(c7_inference=C7InferenceConfig(thermal_poll_hz=1.0))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def clock() -> _ManualClock:
|
||||
return _ManualClock()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fdr() -> FakeFdrSink:
|
||||
return FakeFdrSink(producer_id=_PRODUCER_ID)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fast_config() -> Config:
|
||||
"""High-frequency config so the AC-6 background-thread tests finish quickly."""
|
||||
|
||||
return Config.with_blocks(c7_inference=C7InferenceConfig(thermal_poll_hz=200.0))
|
||||
|
||||
|
||||
def _make_publisher(
|
||||
*,
|
||||
config: Config,
|
||||
fdr: FakeFdrSink,
|
||||
clock: _ManualClock,
|
||||
source: ThermalSource | None,
|
||||
) -> ThermalStatePublisher:
|
||||
return ThermalStatePublisher(config, fdr, source=source, clock=clock)
|
||||
|
||||
|
||||
def _prime_without_thread(
|
||||
publisher: ThermalStatePublisher, source: ThermalSource
|
||||
) -> None:
|
||||
"""Run the source-selection + initial-poll path of ``start()`` without
|
||||
spawning the background thread.
|
||||
|
||||
The deterministic AC-1..AC-4 / AC-8 tests drive :meth:`_poll_once`
|
||||
directly under a manual clock; the production thread would race
|
||||
those calls and consume scripted source entries non-deterministically
|
||||
(one race observed in the first iteration of this test module).
|
||||
"""
|
||||
|
||||
publisher._source = source
|
||||
publisher._stop_event.clear()
|
||||
publisher._poll_once(initial=True)
|
||||
|
||||
|
||||
def _cool_reading(*, throttle: bool = False) -> ThermalReading:
|
||||
return ThermalReading(
|
||||
cpu_temp_c=45.0,
|
||||
gpu_temp_c=55.0,
|
||||
thermal_throttle_active=throttle,
|
||||
measured_clock_mhz=1300,
|
||||
)
|
||||
|
||||
|
||||
def _hot_reading() -> ThermalReading:
|
||||
return ThermalReading(
|
||||
cpu_temp_c=85.0,
|
||||
gpu_temp_c=92.0,
|
||||
thermal_throttle_active=True,
|
||||
measured_clock_mhz=600,
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-1: read() returns the most recent _atomic_snapshot.
|
||||
|
||||
|
||||
def test_ac1_read_returns_latest_snapshot(
|
||||
config: Config, fdr: FakeFdrSink, clock: _ManualClock
|
||||
) -> None:
|
||||
source = _ScriptedSource([_cool_reading(), _hot_reading()])
|
||||
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||
_prime_without_thread(publisher, source)
|
||||
|
||||
snapshot_after_initial = publisher.read()
|
||||
|
||||
clock.advance_by(1_000_000_000)
|
||||
publisher._poll_once()
|
||||
snapshot_after_transition = publisher.read()
|
||||
|
||||
assert snapshot_after_initial.thermal_throttle_active is False
|
||||
assert snapshot_after_initial.cpu_temp_c == pytest.approx(45.0)
|
||||
assert snapshot_after_initial.measured_at_ns == 1_000_000_000
|
||||
|
||||
assert snapshot_after_transition.thermal_throttle_active is True
|
||||
assert snapshot_after_transition.gpu_temp_c == pytest.approx(92.0)
|
||||
assert snapshot_after_transition.is_telemetry_available is True
|
||||
assert snapshot_after_transition.measured_at_ns == 2_000_000_000
|
||||
|
||||
|
||||
def test_ac1_read_is_wait_free_object_identity(
|
||||
config: Config, fdr: FakeFdrSink, clock: _ManualClock
|
||||
) -> None:
|
||||
"""Two consecutive reads with no intervening poll return the same object."""
|
||||
|
||||
source = _ScriptedSource([_cool_reading()])
|
||||
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||
_prime_without_thread(publisher, source)
|
||||
|
||||
first = publisher.read()
|
||||
second = publisher.read()
|
||||
|
||||
assert first is second
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-2: throttle entry — FDR + WARN log + state.
|
||||
|
||||
|
||||
def test_ac2_throttle_entry_emits_fdr_and_warn(
|
||||
config: Config,
|
||||
fdr: FakeFdrSink,
|
||||
clock: _ManualClock,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
source = _ScriptedSource([_cool_reading(), _hot_reading()])
|
||||
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||
_prime_without_thread(publisher, source)
|
||||
assert fdr.records == []
|
||||
|
||||
clock.advance_by(1_000_000_000)
|
||||
with caplog.at_level("WARNING", logger="c7_inference.thermal"):
|
||||
publisher._poll_once()
|
||||
|
||||
assert len(fdr.records) == 1
|
||||
record = fdr.records[0]
|
||||
assert record.kind == _TRANSITION_KIND
|
||||
assert record.producer_id == _PRODUCER_ID
|
||||
assert record.payload["previous_state"] is False
|
||||
assert record.payload["new_state"] is True
|
||||
assert record.payload["gpu_temp_c"] == pytest.approx(92.0)
|
||||
assert record.payload["cpu_temp_c"] == pytest.approx(85.0)
|
||||
assert record.payload["measured_clock_mhz"] == 600
|
||||
assert record.payload["measured_at_ns"] == 2_000_000_000
|
||||
|
||||
entry_warnings = [
|
||||
r for r in caplog.records
|
||||
if r.levelname == "WARNING" and r.getMessage() == "thermal throttle ENTRY"
|
||||
]
|
||||
assert len(entry_warnings) == 1
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-3: throttle exit — FDR + INFO log.
|
||||
|
||||
|
||||
def test_ac3_throttle_exit_emits_fdr_and_info(
|
||||
config: Config,
|
||||
fdr: FakeFdrSink,
|
||||
clock: _ManualClock,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
source = _ScriptedSource([_hot_reading(), _cool_reading()])
|
||||
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||
_prime_without_thread(publisher, source)
|
||||
|
||||
clock.advance_by(1_000_000_000)
|
||||
with caplog.at_level("INFO", logger="c7_inference.thermal"):
|
||||
publisher._poll_once()
|
||||
|
||||
assert len(fdr.records) == 1
|
||||
record = fdr.records[0]
|
||||
assert record.kind == _TRANSITION_KIND
|
||||
assert record.payload["previous_state"] is True
|
||||
assert record.payload["new_state"] is False
|
||||
|
||||
exit_infos = [
|
||||
r for r in caplog.records
|
||||
if r.levelname == "INFO" and r.getMessage() == "thermal throttle exit"
|
||||
]
|
||||
assert len(exit_infos) == 1
|
||||
exit_warnings = [
|
||||
r for r in caplog.records
|
||||
if r.levelname == "WARNING" and r.getMessage() == "thermal throttle exit"
|
||||
]
|
||||
assert exit_warnings == []
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-4: telemetry unavailable on every poll — default-safe + rate-limited WARN.
|
||||
|
||||
|
||||
def test_ac4_telemetry_unavailable_defaults_safe_and_rate_limits(
|
||||
config: Config,
|
||||
fdr: FakeFdrSink,
|
||||
clock: _ManualClock,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
err = TelemetryUnavailableError("jtop hung")
|
||||
source = _ScriptedSource([err, err, err, err, err, err])
|
||||
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||
|
||||
with caplog.at_level("WARNING", logger="c7_inference.thermal"):
|
||||
_prime_without_thread(publisher, source)
|
||||
# Drive five more polls across a 2-second simulated window.
|
||||
clock.advance_by(500_000_000)
|
||||
publisher._poll_once()
|
||||
clock.advance_by(500_000_000)
|
||||
publisher._poll_once()
|
||||
clock.advance_by(500_000_000)
|
||||
publisher._poll_once()
|
||||
clock.advance_by(500_000_000)
|
||||
publisher._poll_once()
|
||||
clock.advance_by(500_000_000)
|
||||
publisher._poll_once()
|
||||
|
||||
snapshot = publisher.read()
|
||||
|
||||
assert snapshot.is_telemetry_available is False
|
||||
assert snapshot.thermal_throttle_active is False
|
||||
assert snapshot.gpu_temp_c is None
|
||||
assert snapshot.cpu_temp_c is None
|
||||
|
||||
unavailable_warnings = [
|
||||
r for r in caplog.records
|
||||
if r.levelname == "WARNING" and r.getMessage() == "thermal telemetry unavailable"
|
||||
]
|
||||
# Initial poll (start) + 5 polls x 500 ms each = 2.5 s window; rate-limit
|
||||
# at 1 / sec ⇒ at most 3 WARNs (one immediately + one each second).
|
||||
assert 1 <= len(unavailable_warnings) <= 3
|
||||
assert fdr.records == []
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-5: cold-start with no source raises.
|
||||
|
||||
|
||||
def test_ac5_start_raises_when_no_source_available(
|
||||
config: Config,
|
||||
fdr: FakeFdrSink,
|
||||
clock: _ManualClock,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
def _raise(*_args: object, **_kw: object) -> None:
|
||||
raise TelemetryUnavailableError("backend missing")
|
||||
|
||||
monkeypatch.setattr(tp_mod, "_JtopSource", _raise)
|
||||
monkeypatch.setattr(tp_mod, "_PynvmlSource", _raise)
|
||||
|
||||
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=None)
|
||||
|
||||
with pytest.raises(TelemetryUnavailableError):
|
||||
publisher.start()
|
||||
|
||||
assert publisher.is_running() is False
|
||||
|
||||
|
||||
def test_ac5_jtop_falls_back_to_pynvml(
|
||||
fast_config: Config,
|
||||
fdr: FakeFdrSink,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""When ``jtop`` fails, ``start()`` picks the ``pynvml`` source and runs."""
|
||||
|
||||
pynvml_source = _ScriptedSource(
|
||||
[_cool_reading()] + [_cool_reading()] * 16
|
||||
)
|
||||
|
||||
def _raise_jtop(*_args: object, **_kw: object) -> None:
|
||||
raise TelemetryUnavailableError("no jtop")
|
||||
|
||||
def _make_pynvml(*_args: object, **_kw: object) -> _ScriptedSource:
|
||||
return pynvml_source
|
||||
|
||||
monkeypatch.setattr(tp_mod, "_JtopSource", _raise_jtop)
|
||||
monkeypatch.setattr(tp_mod, "_PynvmlSource", _make_pynvml)
|
||||
|
||||
publisher = ThermalStatePublisher(fast_config, fdr)
|
||||
publisher.start()
|
||||
assert publisher.is_running() is True
|
||||
snapshot = publisher.read()
|
||||
publisher.stop()
|
||||
|
||||
assert snapshot.is_telemetry_available is True
|
||||
assert pynvml_source.closed is True
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-6: start/stop is idempotent.
|
||||
|
||||
|
||||
def test_ac6_double_start_is_noop(
|
||||
fast_config: Config, fdr: FakeFdrSink
|
||||
) -> None:
|
||||
source = _ScriptedSource([_cool_reading()])
|
||||
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
|
||||
|
||||
publisher.start()
|
||||
first_thread = publisher._thread
|
||||
publisher.start()
|
||||
second_thread = publisher._thread
|
||||
|
||||
assert publisher.is_running() is True
|
||||
assert first_thread is second_thread
|
||||
|
||||
publisher.stop()
|
||||
|
||||
|
||||
def test_ac6_double_stop_is_noop(
|
||||
fast_config: Config, fdr: FakeFdrSink
|
||||
) -> None:
|
||||
source = _ScriptedSource([_cool_reading()])
|
||||
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
|
||||
|
||||
publisher.start()
|
||||
publisher.stop()
|
||||
publisher.stop()
|
||||
|
||||
assert publisher.is_running() is False
|
||||
|
||||
|
||||
def test_background_thread_polls(
|
||||
fast_config: Config, fdr: FakeFdrSink
|
||||
) -> None:
|
||||
"""End-to-end: real thread, 200 Hz poll rate, source flips on second read."""
|
||||
|
||||
readings: deque[ThermalReading] = deque(
|
||||
[_cool_reading()] + [_hot_reading()] * 64
|
||||
)
|
||||
|
||||
class _Live:
|
||||
closed = False
|
||||
|
||||
def read(self) -> ThermalReading:
|
||||
if readings:
|
||||
return readings.popleft()
|
||||
return _hot_reading()
|
||||
|
||||
def close(self) -> None:
|
||||
self._closed_flag = True
|
||||
|
||||
source = _Live()
|
||||
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
|
||||
publisher.start()
|
||||
deadline = time.monotonic() + 1.0
|
||||
while publisher.read().thermal_throttle_active is False and time.monotonic() < deadline:
|
||||
time.sleep(0.005)
|
||||
publisher.stop()
|
||||
|
||||
assert publisher.read().thermal_throttle_active is True
|
||||
assert any(r.kind == _TRANSITION_KIND for r in fdr.records)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-8: FDR record envelope matches the schema (round-trip via serialise/parse).
|
||||
|
||||
|
||||
def test_ac8_fdr_record_round_trips_through_wire_format(
|
||||
config: Config, fdr: FakeFdrSink, clock: _ManualClock
|
||||
) -> None:
|
||||
from gps_denied_onboard.fdr_client.records import KNOWN_PAYLOAD_KEYS, parse, serialise
|
||||
|
||||
source = _ScriptedSource([_cool_reading(), _hot_reading()])
|
||||
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||
_prime_without_thread(publisher, source)
|
||||
clock.advance_by(1_000_000_000)
|
||||
publisher._poll_once()
|
||||
|
||||
record = fdr.records[0]
|
||||
encoded = serialise(record)
|
||||
decoded = parse(encoded)
|
||||
|
||||
assert decoded.kind == _TRANSITION_KIND
|
||||
assert decoded.producer_id == _PRODUCER_ID
|
||||
expected_keys = KNOWN_PAYLOAD_KEYS[_TRANSITION_KIND]
|
||||
assert set(decoded.payload.keys()) - {"extra"} <= expected_keys
|
||||
assert decoded.payload == record.payload
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# NFR-reliability-default-safe: first poll fails → snapshot is default-safe.
|
||||
|
||||
|
||||
def test_nfr_default_safe_on_first_poll_failure(
|
||||
config: Config, fdr: FakeFdrSink, clock: _ManualClock
|
||||
) -> None:
|
||||
err = TelemetryUnavailableError("source dead on arrival")
|
||||
source = _ScriptedSource([err])
|
||||
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||
_prime_without_thread(publisher, source)
|
||||
|
||||
snapshot = publisher.read()
|
||||
|
||||
assert snapshot.is_telemetry_available is False
|
||||
assert snapshot.thermal_throttle_active is False
|
||||
assert snapshot.gpu_temp_c is None
|
||||
assert snapshot.cpu_temp_c is None
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Structural: ThermalSource is a runtime-checkable Protocol; the scripted
|
||||
# source satisfies it.
|
||||
|
||||
|
||||
def test_scripted_source_satisfies_protocol() -> None:
|
||||
source = _ScriptedSource([_cool_reading()])
|
||||
assert isinstance(source, ThermalSource)
|
||||
|
||||
|
||||
def test_default_safe_snapshot_invariant_i6() -> None:
|
||||
"""``is_telemetry_available == False`` ⇒ ``thermal_throttle_active == False``."""
|
||||
|
||||
snap = tp_mod._default_safe_snapshot(measured_at_ns=42)
|
||||
assert snap.is_telemetry_available is False
|
||||
assert snap.thermal_throttle_active is False
|
||||
assert snap.measured_at_ns == 42
|
||||
@@ -131,6 +131,15 @@ def _kind_payload(kind: str) -> dict[str, object]:
|
||||
"strategy_label": "okvis2",
|
||||
"frame_id": "frame-0001",
|
||||
}
|
||||
if kind == "c7.thermal_transition":
|
||||
return {
|
||||
"previous_state": False,
|
||||
"new_state": True,
|
||||
"gpu_temp_c": 92.0,
|
||||
"cpu_temp_c": 85.0,
|
||||
"measured_clock_mhz": 600,
|
||||
"measured_at_ns": 1_700_000_000_000_000_000,
|
||||
}
|
||||
raise AssertionError(f"unhandled kind in fixture: {kind!r}")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user