mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 20:51:13 +00:00
[AZ-302] C7 ThermalStatePublisher — jtop/NVML 1 Hz background poller
Implements AZ-297 InferenceRuntime's thermal_state() side: a singleton background-thread publisher that polls jtop (preferred) or pynvml (fallback) at config.thermal_poll_hz, stores an atomic ThermalState snapshot, and emits c7.thermal_transition FDR records on every throttle flip with a WARN log on entry and an INFO log on exit. Default-safe on TelemetryUnavailableError per Invariant I-6 with a 1-Hz rate-limited WARN. Sources return a raw ThermalReading; the publisher stamps measured_at_ns via its injected Clock so _JtopSource / _PynvmlSource stay clean of direct time.* calls (Invariant 2). _poll_once is the deterministic test seam — start() spawns the production thread. - c7.thermal_transition registered in fdr_client.records KNOWN_PAYLOAD_KEYS - [telemetry] optional dep group (jetson-stats, pynvml) added to pyproject - 14 unit tests (AC-1..AC-6, AC-8, NFR-default-safe, structural) green; AC-7 / AC-1 microbench / NFR-perf-poll Tier-2 deferred - full unit suite: 1140 passed, 11 expected Tier-2/CUDA skips Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+23
@@ -175,3 +175,26 @@ Then the record matches `_docs/02_document/contracts/shared_fdr_client/fdr_recor
|
|||||||
- **Production code that must exist**: real `ThermalStatePublisher` class with real background thread, real `jtop` / `pynvml` poll, real lock-free `_atomic_snapshot` reference swap, real FDR record emission via the injected `FdrClient`.
|
- **Production code that must exist**: real `ThermalStatePublisher` class with real background thread, real `jtop` / `pynvml` poll, real lock-free `_atomic_snapshot` reference swap, real FDR record emission via the injected `FdrClient`.
|
||||||
- **Allowed external stubs**: tests MAY substitute a `FakeJtopSource` and a `FakeFdrClient` (AC-2..AC-8); production wiring uses real `jtop` + real AZ-273 `FdrClient`.
|
- **Allowed external stubs**: tests MAY substitute a `FakeJtopSource` and a `FakeFdrClient` (AC-2..AC-8); production wiring uses real `jtop` + real AZ-273 `FdrClient`.
|
||||||
- **Unacceptable substitutes**: a polling loop that uses `time.sleep` without a real `Clock` injection (would break test determinism); a snapshot field guarded by a `threading.Lock` on the read path (would violate the wait-free read requirement under F3 hot path); a "warn but always default-safe" mode that never raises `TelemetryUnavailableError` from `start()` (would hide misconfigured deployments — exactly the failure mode AC-5 prevents).
|
- **Unacceptable substitutes**: a polling loop that uses `time.sleep` without a real `Clock` injection (would break test determinism); a snapshot field guarded by a `threading.Lock` on the read path (would violate the wait-free read requirement under F3 hot path); a "warn but always default-safe" mode that never raises `TelemetryUnavailableError` from `start()` (would hide misconfigured deployments — exactly the failure mode AC-5 prevents).
|
||||||
|
|
||||||
|
## Implementation Notes (2026-05-12, batch 26)
|
||||||
|
|
||||||
|
Four task-spec → as-built deltas:
|
||||||
|
|
||||||
|
1. **`ThermalReading` intermediate DTO** — spec said source `read()` returns a `ThermalState`. As-built, sources return a `ThermalReading` (no `measured_at_ns`, no `is_telemetry_available`) and the publisher stamps both fields from its injected `Clock`. This keeps `_JtopSource` / `_PynvmlSource` from calling `time.monotonic_ns()` directly, which `tests/_meta/test_no_direct_time_in_components.py` (Invariant 2) forbids in any `src/gps_denied_onboard/components/**` file. Spec already required a `Clock`-injectable publisher; this delta extends the contract one node deeper into the source classes.
|
||||||
|
|
||||||
|
2. **Telemetry backends are an optional pyproject group** — spec said "this task introduces no new third-party dependencies — `jtop` and `pynvml` are both already pinned by the description.md key dependencies table" but neither was actually in `pyproject.toml`. Added a `[telemetry]` optional extra (`jetson-stats>=4.2`, `pynvml>=11.5`) so Tier-1 installs (`.[dev]`) stay slim and Tier-2 Jetson installs (`.[dev,inference,telemetry]`) pull them in. Both backends remain runtime-optional: the publisher discovers them at `start()` time and raises `TelemetryUnavailableError` only when both are absent.
|
||||||
|
|
||||||
|
3. **`_poll_once` is the deterministic test seam — NOT `start()`** — the AC-1..AC-4 / AC-8 tests script the source + drive the clock under a `_ManualClock`; `start()` spawns the background polling thread which would race those deterministic calls (observed in first test iteration). Tests bypass `start()` via a `_prime_without_thread()` helper that runs the source-selection + initial-poll path without spawning the thread. `start()` + `stop()` are still exercised by AC-5 (cold-start fallback), AC-6 (idempotency), and one integration test that proves the background thread actually polls.
|
||||||
|
|
||||||
|
4. **`c7.thermal_transition` registered in the FDR schema** — added the kind to `KNOWN_PAYLOAD_KEYS` in `src/gps_denied_onboard/fdr_client/records.py` with the six payload fields the spec names (`previous_state`, `new_state`, `gpu_temp_c`, `cpu_temp_c`, `measured_clock_mhz`, `measured_at_ns`). The AZ-272 fixture (`tests/unit/test_az272_fdr_record_schema.py::_kind_payload`) was extended with a sample payload so the every-known-kind roundtrip test passes for the new kind.
|
||||||
|
|
||||||
|
### As-built file map
|
||||||
|
|
||||||
|
- `src/gps_denied_onboard/components/c7_inference/thermal_publisher.py` — `ThermalStatePublisher`, `ThermalSource` Protocol, `ThermalReading` intermediate DTO, `_JtopSource` (Tier-2 jtop binding), `_PynvmlSource` (Tier-2 NVML fallback), `_default_safe_snapshot` (Invariant I-6 enforcement).
|
||||||
|
- `src/gps_denied_onboard/components/c7_inference/__init__.py` — re-exports `ThermalStatePublisher`, `ThermalSource`, `ThermalReading`.
|
||||||
|
- `src/gps_denied_onboard/fdr_client/records.py` — `c7.thermal_transition` added to `KNOWN_PAYLOAD_KEYS`.
|
||||||
|
- `pyproject.toml` — new `[telemetry]` optional dep group (`jetson-stats`, `pynvml`).
|
||||||
|
- `tests/unit/c7_inference/test_thermal_publisher.py` — 14 tests (AC-1 x2, AC-2, AC-3, AC-4, AC-5 x2, AC-6 x2, background-thread integration, AC-8, NFR-default-safe, Protocol structural, Invariant I-6).
|
||||||
|
- `tests/unit/test_az272_fdr_record_schema.py` — `_kind_payload` extended with the new kind.
|
||||||
|
|
||||||
|
AC-7 (F3-hot-path non-interference) and the wait-free p99 microbench in AC-1 are Tier-2 perf concerns; the Tier-2 validation task will cover them on real Jetson hardware.
|
||||||
@@ -0,0 +1,141 @@
|
|||||||
|
# Batch 26 / Cycle 1 — Implementation Report
|
||||||
|
|
||||||
|
**Date**: 2026-05-12
|
||||||
|
**Tasks**: AZ-302 (C7 ThermalStatePublisher — jtop/NVML 1 Hz background)
|
||||||
|
**Story points landed**: 3
|
||||||
|
**Status**: complete (AZ-302 → In Testing)
|
||||||
|
|
||||||
|
## Scope summary
|
||||||
|
|
||||||
|
Single-task batch, continuing the 1-task cadence the user reaffirmed
|
||||||
|
when picking option A after batches 24 and 25. AZ-302 was the deferred
|
||||||
|
3-pointer surfaced by the batch-25 narrow-down (background thread,
|
||||||
|
two telemetry backends, FDR record kind, log rate-limiting). AZ-304
|
||||||
|
(C6 Postgres schema, 2pt) remains queued for batch 27.
|
||||||
|
|
||||||
|
## Files added / modified
|
||||||
|
|
||||||
|
### New
|
||||||
|
|
||||||
|
- `src/gps_denied_onboard/components/c7_inference/thermal_publisher.py` —
|
||||||
|
`ThermalStatePublisher` (start/stop/read/is_running, `_poll_once`
|
||||||
|
test seam, atomic snapshot, source selection), `ThermalSource`
|
||||||
|
runtime-checkable Protocol, `ThermalReading` intermediate DTO,
|
||||||
|
`_JtopSource` (jtop binding, Tier-2 production path), `_PynvmlSource`
|
||||||
|
(NVML fallback), `_default_safe_snapshot` (Invariant I-6 enforcement).
|
||||||
|
- `tests/unit/c7_inference/test_thermal_publisher.py` — 14 deterministic
|
||||||
|
tests (AC-1..AC-6, AC-8, NFR-default-safe, Protocol structural,
|
||||||
|
Invariant I-6) plus one real-thread integration test that proves the
|
||||||
|
background poller actually polls and emits transitions.
|
||||||
|
|
||||||
|
### Modified
|
||||||
|
|
||||||
|
- `src/gps_denied_onboard/components/c7_inference/__init__.py` —
|
||||||
|
re-exports `ThermalStatePublisher`, `ThermalSource`, `ThermalReading`.
|
||||||
|
- `src/gps_denied_onboard/fdr_client/records.py` —
|
||||||
|
`c7.thermal_transition` registered in `KNOWN_PAYLOAD_KEYS` with six
|
||||||
|
documented payload keys (`previous_state`, `new_state`, `gpu_temp_c`,
|
||||||
|
`cpu_temp_c`, `measured_clock_mhz`, `measured_at_ns`).
|
||||||
|
- `pyproject.toml` — new `[telemetry]` optional dep group
|
||||||
|
(`jetson-stats>=4.2`, `pynvml>=11.5`). Tier-1 (`.[dev]`) install stays
|
||||||
|
slim; Tier-2 Jetson install adds `.[telemetry]`.
|
||||||
|
- `tests/unit/test_az272_fdr_record_schema.py` — `_kind_payload`
|
||||||
|
fixture extended with a sample `c7.thermal_transition` payload so
|
||||||
|
the every-known-kind roundtrip test passes.
|
||||||
|
- `_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md` → moved to
|
||||||
|
`_docs/02_tasks/done/`; appended `## Implementation Notes (2026-05-12,
|
||||||
|
batch 26)` documenting the four task-spec → as-built deltas.
|
||||||
|
|
||||||
|
## Design decisions
|
||||||
|
|
||||||
|
1. **`ThermalReading` intermediate DTO instead of source-stamped
|
||||||
|
`ThermalState`**. The spec implied source `read()` returns a fully
|
||||||
|
stamped `ThermalState`. As-built, sources return a `ThermalReading`
|
||||||
|
(raw measurement only) and the publisher stamps `measured_at_ns` +
|
||||||
|
`is_telemetry_available` from its injected `Clock`. This keeps the
|
||||||
|
`_JtopSource` / `_PynvmlSource` modules from calling
|
||||||
|
`time.monotonic_ns()` directly, which `tests/_meta/
|
||||||
|
test_no_direct_time_in_components.py` (Invariant 2) forbids in any
|
||||||
|
`src/gps_denied_onboard/components/**` file. Documented in the spec's
|
||||||
|
as-built notes section.
|
||||||
|
|
||||||
|
2. **`_poll_once` is the deterministic test seam — not `start()`**. The
|
||||||
|
AC-1..AC-4 / AC-8 tests script the source and drive a `_ManualClock`
|
||||||
|
forward; the production `start()` would spawn a background thread
|
||||||
|
that races those deterministic calls (observed in the first test
|
||||||
|
iteration). Tests bypass `start()` via a `_prime_without_thread()`
|
||||||
|
helper that exercises the source-selection + initial-poll path
|
||||||
|
without the thread. `start()` / `stop()` remain covered by AC-5
|
||||||
|
(cold-start fallback), AC-6 (idempotency), and a real-thread
|
||||||
|
integration test at 200 Hz that proves the background loop polls
|
||||||
|
the source and emits transition records.
|
||||||
|
|
||||||
|
3. **Telemetry backends moved to a `[telemetry]` optional dep group**.
|
||||||
|
The spec claimed both `jtop` and `pynvml` were already pinned by
|
||||||
|
description.md, but neither was in `pyproject.toml`. They are now a
|
||||||
|
`[telemetry]` optional extra so Tier-1 installs (`.[dev]`) stay slim
|
||||||
|
and Tier-2 Jetson installs (`.[dev,inference,telemetry]`) pull them
|
||||||
|
in. Both backends remain runtime-optional: the publisher discovers
|
||||||
|
them at `start()` time and only raises `TelemetryUnavailableError`
|
||||||
|
when both are absent.
|
||||||
|
|
||||||
|
4. **WARN-on-unavailable rate-limiting uses the injected `Clock`**, not
|
||||||
|
`time.monotonic_ns()`. The 1-second rate-limit window is enforced by
|
||||||
|
`now_ns - last_warn_ns >= 1_000_000_000` against `self._clock.
|
||||||
|
monotonic_ns()`, so replay-deterministic tests can simulate rate
|
||||||
|
limit windows by advancing the manual clock.
|
||||||
|
|
||||||
|
## AC coverage
|
||||||
|
|
||||||
|
| AC | Test name | Status |
|
||||||
|
|----|-----------|--------|
|
||||||
|
| AC-1 | `test_ac1_read_returns_latest_snapshot` + `test_ac1_read_is_wait_free_object_identity` | passing |
|
||||||
|
| AC-2 | `test_ac2_throttle_entry_emits_fdr_and_warn` | passing |
|
||||||
|
| AC-3 | `test_ac3_throttle_exit_emits_fdr_and_info` | passing |
|
||||||
|
| AC-4 | `test_ac4_telemetry_unavailable_defaults_safe_and_rate_limits` | passing |
|
||||||
|
| AC-5 | `test_ac5_start_raises_when_no_source_available` + `test_ac5_jtop_falls_back_to_pynvml` | passing |
|
||||||
|
| AC-6 | `test_ac6_double_start_is_noop` + `test_ac6_double_stop_is_noop` | passing |
|
||||||
|
| AC-7 | F3 hot-path non-interference perf | Tier-2 deferred |
|
||||||
|
| AC-8 | `test_ac8_fdr_record_round_trips_through_wire_format` | passing |
|
||||||
|
| NFR-perf-poll | microbench poll p99 ≤ 100 ms | Tier-2 deferred |
|
||||||
|
| NFR-perf-read p99 ≤ 1 µs | wait-free read microbench | Tier-2 deferred |
|
||||||
|
| NFR-default-safe | `test_nfr_default_safe_on_first_poll_failure` | passing |
|
||||||
|
| Structural | `test_scripted_source_satisfies_protocol` | passing |
|
||||||
|
| Invariant I-6 | `test_default_safe_snapshot_invariant_i6` | passing |
|
||||||
|
|
||||||
|
AC-7 and the two Tier-2 microbenches need real Jetson hardware to be
|
||||||
|
meaningful; they roll into the Tier-2 validation task with AZ-301's
|
||||||
|
AC-8 (`read_host_tuple` on real NVML/L4T).
|
||||||
|
|
||||||
|
## Test run
|
||||||
|
|
||||||
|
`./.venv/bin/python -m pytest tests/unit tests/_meta -q` → **1140
|
||||||
|
passed, 11 skipped (Tier-2 / CUDA / cmake / actionlint)**, no failures.
|
||||||
|
Mypy strict on the three production-source files we touched: clean.
|
||||||
|
Ruff on the same set: clean.
|
||||||
|
|
||||||
|
## Self-review
|
||||||
|
|
||||||
|
- Production code (`thermal_publisher.py`, `__init__.py`,
|
||||||
|
`records.py`, `pyproject.toml`): no use of `time.*` in component
|
||||||
|
modules (Invariant 2 meta-test passes); FDR kind documented in the
|
||||||
|
schema-keys table; telemetry backends are optional; the publisher
|
||||||
|
raises `TelemetryUnavailableError` on cold-start with no source per
|
||||||
|
AC-5; idempotent start/stop per AC-6; rate-limited WARN per AC-4.
|
||||||
|
- Tests: every AC has at least one named assertion; the AC-1 wait-free
|
||||||
|
microbench and AC-7 hot-path bench are Tier-2 deferred (documented in
|
||||||
|
the report + the spec as-built notes).
|
||||||
|
- Lint / type: ruff + mypy strict clean on the modified set.
|
||||||
|
- Docs: spec moved to `done/` with the as-built delta notes.
|
||||||
|
|
||||||
|
## Known gaps
|
||||||
|
|
||||||
|
- AZ-302 AC-7 + AC-1 microbench + NFR-perf-poll → require real Jetson
|
||||||
|
thermal source under load; rolled into the Tier-2 validation task.
|
||||||
|
- AZ-304 (C6 Postgres schema) deferred to batch 27 — testcontainers
|
||||||
|
setup + Alembic baseline are independently meaningful.
|
||||||
|
- `_JtopSource.read()` catches a bare `Exception` because jtop's
|
||||||
|
internal exception types are not stable across jtop versions; the
|
||||||
|
exception is always rewrapped as `TelemetryUnavailableError` (never
|
||||||
|
swallowed silently) so this does not violate the "no silent error
|
||||||
|
suppression" coding rule.
|
||||||
@@ -6,9 +6,9 @@ step: 7
|
|||||||
name: Implement
|
name: Implement
|
||||||
status: in_progress
|
status: in_progress
|
||||||
sub_step:
|
sub_step:
|
||||||
phase: 13
|
phase: 7
|
||||||
name: archive-and-loop
|
name: archive-and-loop
|
||||||
detail: "batch 25/cycle1 complete: AZ-301 → In Testing, archived to done/. AZ-302 + AZ-304 deferred to batches 26 / 27 to keep the 1-task cadence (AZ-302 = 3pt with background threading + jtop/pynvml; AZ-304 = 2pt with testcontainers Postgres + Alembic). 14 unconditional AC tests + 1 Tier-2 AC-8 skip. Suite: 1134 passed / 11 skipped. 17 tasks total ready overall (AZ-300 + AZ-301 removed)."
|
detail: "batch 26/cycle1 complete: AZ-302 (ThermalStatePublisher, 3pt) implemented and tests green (1140 passed, 11 Tier-2/CUDA skipped). Added ThermalStatePublisher + ThermalSource Protocol + ThermalReading DTO + _JtopSource/_PynvmlSource backends; registered c7.thermal_transition in FDR schema; new [telemetry] optional dep group in pyproject.toml. Spec moved to done/ with as-built notes; cycle report at _docs/03_implementation/batch_26_cycle1_report.md. Ready to push commit + compute batch 27 (AZ-304 C6 Postgres schema next, 2pt) plus the 15 other ready tasks."
|
||||||
retry_count: 0
|
retry_count: 0
|
||||||
cycle: 1
|
cycle: 1
|
||||||
tracker: jira
|
tracker: jira
|
||||||
|
|||||||
@@ -64,6 +64,14 @@ inference = [
|
|||||||
"onnxruntime>=1.17",
|
"onnxruntime>=1.17",
|
||||||
# tensorrt is installed out-of-band on Jetson — not a pip dep
|
# tensorrt is installed out-of-band on Jetson — not a pip dep
|
||||||
]
|
]
|
||||||
|
# AZ-302: thermal telemetry backends used by C7's ThermalStatePublisher.
|
||||||
|
# Both are Jetson / NVIDIA-host-only and not import-required for Tier-1;
|
||||||
|
# the publisher selects whichever is importable at start() time and
|
||||||
|
# raises TelemetryUnavailableError if neither is present.
|
||||||
|
telemetry = [
|
||||||
|
"jetson-stats>=4.2",
|
||||||
|
"pynvml>=11.5",
|
||||||
|
]
|
||||||
indexing = [
|
indexing = [
|
||||||
"faiss-cpu>=1.7",
|
"faiss-cpu>=1.7",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -56,6 +56,11 @@ from gps_denied_onboard.components.c7_inference.manifest import (
|
|||||||
ManifestReader,
|
ManifestReader,
|
||||||
ManifestReaderProtocol,
|
ManifestReaderProtocol,
|
||||||
)
|
)
|
||||||
|
from gps_denied_onboard.components.c7_inference.thermal_publisher import (
|
||||||
|
ThermalReading,
|
||||||
|
ThermalSource,
|
||||||
|
ThermalStatePublisher,
|
||||||
|
)
|
||||||
from gps_denied_onboard.config.schema import register_component_block
|
from gps_denied_onboard.config.schema import register_component_block
|
||||||
|
|
||||||
register_component_block("c7_inference", C7InferenceConfig)
|
register_component_block("c7_inference", C7InferenceConfig)
|
||||||
@@ -84,7 +89,10 @@ __all__ = [
|
|||||||
"PrecisionMode",
|
"PrecisionMode",
|
||||||
"RuntimeError",
|
"RuntimeError",
|
||||||
"TelemetryUnavailableError",
|
"TelemetryUnavailableError",
|
||||||
|
"ThermalReading",
|
||||||
|
"ThermalSource",
|
||||||
"ThermalState",
|
"ThermalState",
|
||||||
|
"ThermalStatePublisher",
|
||||||
"default_registry",
|
"default_registry",
|
||||||
"register_architecture",
|
"register_architecture",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -0,0 +1,464 @@
|
|||||||
|
"""``ThermalStatePublisher`` — 1 Hz background thermal poller (AZ-302).
|
||||||
|
|
||||||
|
Owns the single C7 thermal-telemetry source for the companion
|
||||||
|
process. Reads ``jtop`` (preferred) or ``pynvml`` (fallback) once per
|
||||||
|
poll period, builds a fresh :class:`ThermalState`, and stores it
|
||||||
|
atomically in ``_atomic_snapshot``. :meth:`read` is wait-free —
|
||||||
|
returns the current snapshot via a single object-reference load
|
||||||
|
(atomic under the Python GIL on CPython 3.10 / 3.11; documented in
|
||||||
|
the impl notes for the AZ-302 task spec).
|
||||||
|
|
||||||
|
Throttle-state transitions emit:
|
||||||
|
|
||||||
|
- one ``kind="c7.thermal_transition"`` FDR record per flip,
|
||||||
|
- one WARN log on throttle entry (False → True),
|
||||||
|
- one INFO log on throttle exit (True → False).
|
||||||
|
|
||||||
|
Telemetry unavailability is the default-safe path (Invariant I-6):
|
||||||
|
on :class:`TelemetryUnavailableError` from the source the publisher
|
||||||
|
emits a rate-limited (≤ 1/sec) WARN log and stores a snapshot with
|
||||||
|
``is_telemetry_available=False, thermal_throttle_active=False``;
|
||||||
|
consumers see truthful "telemetry off" instead of a lie about
|
||||||
|
throttle.
|
||||||
|
|
||||||
|
Composition contract:
|
||||||
|
|
||||||
|
publisher = ThermalStatePublisher(config, fdr_client)
|
||||||
|
publisher.start()
|
||||||
|
...
|
||||||
|
# consumers
|
||||||
|
snapshot = publisher.read()
|
||||||
|
...
|
||||||
|
publisher.stop()
|
||||||
|
|
||||||
|
``start()`` is what selects the source (``jtop`` → ``pynvml``);
|
||||||
|
if both backends are unavailable on the host, ``start()`` raises
|
||||||
|
:class:`TelemetryUnavailableError` (AC-5).
|
||||||
|
|
||||||
|
AC mapping (see ``_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md``):
|
||||||
|
|
||||||
|
- AC-1 : :meth:`read` returns the current ``_atomic_snapshot``;
|
||||||
|
wait-free perf is Tier-2 microbench.
|
||||||
|
- AC-2 / AC-3 : :meth:`_poll_once` detects throttle transitions and
|
||||||
|
emits FDR + log.
|
||||||
|
- AC-4 : repeated :class:`TelemetryUnavailableError` keeps the
|
||||||
|
default-safe snapshot + rate-limits WARN logs.
|
||||||
|
- AC-5 : :meth:`start` raises when no source is available.
|
||||||
|
- AC-6 : :meth:`start` / :meth:`stop` are idempotent.
|
||||||
|
- AC-7 / NFR-perf-poll : Tier-2 hot-path benchmark.
|
||||||
|
- AC-8 : FDR ``c7.thermal_transition`` payload matches the schema.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import TYPE_CHECKING, Protocol, runtime_checkable
|
||||||
|
|
||||||
|
from gps_denied_onboard._types.thermal import ThermalState
|
||||||
|
from gps_denied_onboard.clock.wall_clock import WallClock
|
||||||
|
from gps_denied_onboard.components.c7_inference.errors import (
|
||||||
|
TelemetryUnavailableError,
|
||||||
|
)
|
||||||
|
from gps_denied_onboard.fdr_client.records import (
|
||||||
|
CURRENT_SCHEMA_VERSION,
|
||||||
|
FdrRecord,
|
||||||
|
)
|
||||||
|
from gps_denied_onboard.logging import get_logger
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from gps_denied_onboard.clock import Clock
|
||||||
|
from gps_denied_onboard.config.schema import Config
|
||||||
|
from gps_denied_onboard.fdr_client.client import FdrClient
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ThermalReading",
|
||||||
|
"ThermalSource",
|
||||||
|
"ThermalStatePublisher",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class ThermalReading:
|
||||||
|
"""Raw thermal reading produced by a :class:`ThermalSource`.
|
||||||
|
|
||||||
|
The publisher stamps :attr:`ThermalState.measured_at_ns` from its
|
||||||
|
injected :class:`Clock` so :class:`ThermalSource` impls never reach
|
||||||
|
into ``time.monotonic_ns`` directly (Invariant 2 of the replay
|
||||||
|
contract — see ``tests/_meta/test_no_direct_time_in_components.py``).
|
||||||
|
"""
|
||||||
|
|
||||||
|
cpu_temp_c: float | None
|
||||||
|
gpu_temp_c: float | None
|
||||||
|
thermal_throttle_active: bool
|
||||||
|
measured_clock_mhz: int | None
|
||||||
|
|
||||||
|
_PRODUCER_ID = "c7_inference.thermal"
|
||||||
|
_TRANSITION_KIND = "c7.thermal_transition"
|
||||||
|
_UNAVAILABLE_KIND = "c7.thermal.unavailable"
|
||||||
|
_WARN_RATE_LIMIT_NS = 1_000_000_000 # 1 second between WARN logs on unavailable
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class ThermalSource(Protocol):
|
||||||
|
"""Single-poll thermal source — :class:`ThermalStatePublisher`'s only collaborator.
|
||||||
|
|
||||||
|
``read()`` must produce a fresh :class:`ThermalReading` or raise
|
||||||
|
:class:`TelemetryUnavailableError`. The publisher catches the latter
|
||||||
|
and falls back to the default-safe snapshot. The source owns its
|
||||||
|
backend lifecycle (jtop context manager, pynvml init/shutdown).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def read(self) -> ThermalReading: # pragma: no cover - structural
|
||||||
|
...
|
||||||
|
|
||||||
|
def close(self) -> None: # pragma: no cover - structural
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class ThermalStatePublisher:
|
||||||
|
"""Singleton-by-convention 1 Hz thermal poller."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: Config,
|
||||||
|
fdr_client: FdrClient,
|
||||||
|
*,
|
||||||
|
source: ThermalSource | None = None,
|
||||||
|
clock: Clock | None = None,
|
||||||
|
) -> None:
|
||||||
|
self._config = config
|
||||||
|
self._fdr = fdr_client
|
||||||
|
self._clock = clock if clock is not None else WallClock()
|
||||||
|
self._injected_source = source
|
||||||
|
self._source: ThermalSource | None = None
|
||||||
|
self._period_ns = int(
|
||||||
|
1_000_000_000 / config.components["c7_inference"].thermal_poll_hz
|
||||||
|
)
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._stop_event = threading.Event()
|
||||||
|
self._thread: threading.Thread | None = None
|
||||||
|
self._logger = get_logger("c7_inference.thermal")
|
||||||
|
self._atomic_snapshot: ThermalState = _default_safe_snapshot(
|
||||||
|
self._clock.monotonic_ns()
|
||||||
|
)
|
||||||
|
self._last_throttle: bool = False
|
||||||
|
self._last_unavailable_warn_ns: int = 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def producer_id(self) -> str:
|
||||||
|
return _PRODUCER_ID
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
"""Select the source, prime the snapshot, spawn the polling thread."""
|
||||||
|
with self._lock:
|
||||||
|
if self._thread is not None and self._thread.is_alive():
|
||||||
|
return
|
||||||
|
if self._source is None:
|
||||||
|
self._source = self._select_source()
|
||||||
|
self._stop_event.clear()
|
||||||
|
self._poll_once(initial=True)
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=self._loop, name=f"{_PRODUCER_ID}.poll", daemon=True
|
||||||
|
)
|
||||||
|
self._thread = thread
|
||||||
|
thread.start()
|
||||||
|
self._logger.info(
|
||||||
|
"thermal publisher started",
|
||||||
|
extra={
|
||||||
|
"kind": "c7.thermal.lifecycle",
|
||||||
|
"kv": {
|
||||||
|
"event": "start",
|
||||||
|
"period_ns": self._period_ns,
|
||||||
|
"source": type(self._source).__name__
|
||||||
|
if self._source is not None
|
||||||
|
else "None",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
"""Signal the polling thread, join it, close the source. Idempotent."""
|
||||||
|
with self._lock:
|
||||||
|
if self._thread is None:
|
||||||
|
return
|
||||||
|
self._stop_event.set()
|
||||||
|
thread = self._thread
|
||||||
|
self._thread = None
|
||||||
|
thread.join(timeout=2.0 * self._period_ns / 1_000_000_000)
|
||||||
|
source = self._source
|
||||||
|
if source is not None:
|
||||||
|
try:
|
||||||
|
source.close()
|
||||||
|
except Exception as exc:
|
||||||
|
self._logger.warning(
|
||||||
|
"thermal source close raised",
|
||||||
|
extra={
|
||||||
|
"kind": "c7.thermal.lifecycle",
|
||||||
|
"kv": {"event": "stop_close_error", "error": str(exc)},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
self._source = None
|
||||||
|
self._logger.info(
|
||||||
|
"thermal publisher stopped",
|
||||||
|
extra={
|
||||||
|
"kind": "c7.thermal.lifecycle",
|
||||||
|
"kv": {"event": "stop"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def is_running(self) -> bool:
|
||||||
|
thread = self._thread
|
||||||
|
return thread is not None and thread.is_alive()
|
||||||
|
|
||||||
|
def read(self) -> ThermalState:
|
||||||
|
"""Wait-free reader. Returns the current ``_atomic_snapshot``."""
|
||||||
|
return self._atomic_snapshot
|
||||||
|
|
||||||
|
def _select_source(self) -> ThermalSource:
|
||||||
|
if self._injected_source is not None:
|
||||||
|
return self._injected_source
|
||||||
|
try:
|
||||||
|
return _JtopSource()
|
||||||
|
except TelemetryUnavailableError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
return _PynvmlSource()
|
||||||
|
except TelemetryUnavailableError as exc:
|
||||||
|
raise TelemetryUnavailableError(
|
||||||
|
"thermal publisher: neither jtop nor pynvml available; cannot "
|
||||||
|
"bind a telemetry source"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
def _loop(self) -> None:
|
||||||
|
while not self._stop_event.is_set():
|
||||||
|
self._poll_once()
|
||||||
|
if self._stop_event.wait(timeout=self._period_ns / 1_000_000_000):
|
||||||
|
break
|
||||||
|
|
||||||
|
def _poll_once(self, *, initial: bool = False) -> None:
|
||||||
|
"""One poll cycle: read source, detect transition, update snapshot.
|
||||||
|
|
||||||
|
Public-by-name-with-underscore so the AC-2..AC-4 tests can call
|
||||||
|
it deterministically without spinning up the background thread.
|
||||||
|
"""
|
||||||
|
source = self._source
|
||||||
|
if source is None:
|
||||||
|
return
|
||||||
|
now_ns = self._clock.monotonic_ns()
|
||||||
|
try:
|
||||||
|
reading = source.read()
|
||||||
|
except TelemetryUnavailableError as exc:
|
||||||
|
self._atomic_snapshot = _default_safe_snapshot(now_ns)
|
||||||
|
self._maybe_warn_unavailable(str(exc))
|
||||||
|
self._last_throttle = False
|
||||||
|
return
|
||||||
|
fresh = ThermalState(
|
||||||
|
cpu_temp_c=reading.cpu_temp_c,
|
||||||
|
gpu_temp_c=reading.gpu_temp_c,
|
||||||
|
thermal_throttle_active=reading.thermal_throttle_active,
|
||||||
|
measured_clock_mhz=reading.measured_clock_mhz,
|
||||||
|
measured_at_ns=now_ns,
|
||||||
|
is_telemetry_available=True,
|
||||||
|
)
|
||||||
|
previous = self._last_throttle
|
||||||
|
self._atomic_snapshot = fresh
|
||||||
|
self._last_throttle = fresh.thermal_throttle_active
|
||||||
|
if initial:
|
||||||
|
return
|
||||||
|
if previous != fresh.thermal_throttle_active:
|
||||||
|
self._emit_transition(previous=previous, fresh=fresh)
|
||||||
|
|
||||||
|
def _maybe_warn_unavailable(self, reason: str) -> None:
|
||||||
|
now_ns = self._clock.monotonic_ns()
|
||||||
|
if now_ns - self._last_unavailable_warn_ns < _WARN_RATE_LIMIT_NS:
|
||||||
|
return
|
||||||
|
self._last_unavailable_warn_ns = now_ns
|
||||||
|
self._logger.warning(
|
||||||
|
"thermal telemetry unavailable",
|
||||||
|
extra={
|
||||||
|
"kind": _UNAVAILABLE_KIND,
|
||||||
|
"kv": {"reason": reason},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def _emit_transition(
|
||||||
|
self, *, previous: bool, fresh: ThermalState
|
||||||
|
) -> None:
|
||||||
|
record = FdrRecord(
|
||||||
|
schema_version=CURRENT_SCHEMA_VERSION,
|
||||||
|
ts=_iso_ts_now(),
|
||||||
|
producer_id=_PRODUCER_ID,
|
||||||
|
kind=_TRANSITION_KIND,
|
||||||
|
payload={
|
||||||
|
"previous_state": previous,
|
||||||
|
"new_state": fresh.thermal_throttle_active,
|
||||||
|
"gpu_temp_c": fresh.gpu_temp_c,
|
||||||
|
"cpu_temp_c": fresh.cpu_temp_c,
|
||||||
|
"measured_clock_mhz": fresh.measured_clock_mhz,
|
||||||
|
"measured_at_ns": fresh.measured_at_ns,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
self._fdr.enqueue(record)
|
||||||
|
if fresh.thermal_throttle_active:
|
||||||
|
self._logger.warning(
|
||||||
|
"thermal throttle ENTRY",
|
||||||
|
extra={
|
||||||
|
"kind": _TRANSITION_KIND,
|
||||||
|
"kv": {
|
||||||
|
"previous_state": previous,
|
||||||
|
"new_state": True,
|
||||||
|
"gpu_temp_c": fresh.gpu_temp_c,
|
||||||
|
"cpu_temp_c": fresh.cpu_temp_c,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._logger.info(
|
||||||
|
"thermal throttle exit",
|
||||||
|
extra={
|
||||||
|
"kind": _TRANSITION_KIND,
|
||||||
|
"kv": {
|
||||||
|
"previous_state": previous,
|
||||||
|
"new_state": False,
|
||||||
|
"gpu_temp_c": fresh.gpu_temp_c,
|
||||||
|
"cpu_temp_c": fresh.cpu_temp_c,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _default_safe_snapshot(measured_at_ns: int) -> ThermalState:
|
||||||
|
return ThermalState(
|
||||||
|
cpu_temp_c=None,
|
||||||
|
gpu_temp_c=None,
|
||||||
|
thermal_throttle_active=False,
|
||||||
|
measured_clock_mhz=None,
|
||||||
|
measured_at_ns=measured_at_ns,
|
||||||
|
is_telemetry_available=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _iso_ts_now() -> str:
|
||||||
|
"""RFC 3339 UTC timestamp with microsecond precision and ``Z`` suffix."""
|
||||||
|
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
|
||||||
|
|
||||||
|
|
||||||
|
class _JtopSource:
|
||||||
|
"""``jtop`` (jetson-stats) thermal source — Tier-2 production path.
|
||||||
|
|
||||||
|
Constructor raises :class:`TelemetryUnavailableError` when ``jtop``
|
||||||
|
is not importable so the publisher can fall back to pynvml.
|
||||||
|
"""
|
||||||
|
|
||||||
|
__slots__ = ("_jtop",)
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
try:
|
||||||
|
from jtop import jtop
|
||||||
|
except ImportError as exc:
|
||||||
|
raise TelemetryUnavailableError(
|
||||||
|
"jtop not installed"
|
||||||
|
) from exc
|
||||||
|
self._jtop = jtop()
|
||||||
|
try:
|
||||||
|
self._jtop.start()
|
||||||
|
except Exception as exc:
|
||||||
|
raise TelemetryUnavailableError(
|
||||||
|
f"jtop start failed: {exc}"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
def read(self) -> ThermalReading:
|
||||||
|
try:
|
||||||
|
stats = self._jtop.stats
|
||||||
|
temps = stats.get("Temp", {}) or {}
|
||||||
|
gpu_t = temps.get("GPU")
|
||||||
|
cpu_t = temps.get("CPU")
|
||||||
|
throttle = bool(stats.get("THROTTLE", 0))
|
||||||
|
clock_mhz = stats.get("GPU0_FREQ")
|
||||||
|
return ThermalReading(
|
||||||
|
cpu_temp_c=float(cpu_t) if cpu_t is not None else None,
|
||||||
|
gpu_temp_c=float(gpu_t) if gpu_t is not None else None,
|
||||||
|
thermal_throttle_active=throttle,
|
||||||
|
measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
raise TelemetryUnavailableError(
|
||||||
|
f"jtop read failed: {exc}"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
try:
|
||||||
|
self._jtop.close()
|
||||||
|
except Exception as exc:
|
||||||
|
get_logger("c7_inference.thermal").warning(
|
||||||
|
"jtop close raised",
|
||||||
|
extra={
|
||||||
|
"kind": "c7.thermal.lifecycle",
|
||||||
|
"kv": {"error": str(exc)},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class _PynvmlSource:
|
||||||
|
"""``pynvml`` NVML thermal source — Tier-2 production fallback."""
|
||||||
|
|
||||||
|
__slots__ = ("_handle", "_pynvml")
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
try:
|
||||||
|
import pynvml
|
||||||
|
except ImportError as exc:
|
||||||
|
raise TelemetryUnavailableError(
|
||||||
|
"pynvml not installed"
|
||||||
|
) from exc
|
||||||
|
try:
|
||||||
|
pynvml.nvmlInit()
|
||||||
|
self._handle = pynvml.nvmlDeviceGetHandleByIndex(0)
|
||||||
|
except pynvml.NVMLError as exc:
|
||||||
|
raise TelemetryUnavailableError(
|
||||||
|
f"pynvml init failed: {exc}"
|
||||||
|
) from exc
|
||||||
|
self._pynvml = pynvml
|
||||||
|
|
||||||
|
def read(self) -> ThermalReading:
|
||||||
|
try:
|
||||||
|
gpu_t = self._pynvml.nvmlDeviceGetTemperature(
|
||||||
|
self._handle, self._pynvml.NVML_TEMPERATURE_GPU
|
||||||
|
)
|
||||||
|
throttle_mask = self._pynvml.nvmlDeviceGetCurrentClocksThrottleReasons(
|
||||||
|
self._handle
|
||||||
|
)
|
||||||
|
throttle_active = (
|
||||||
|
throttle_mask != 0
|
||||||
|
and throttle_mask != self._pynvml.nvmlClocksThrottleReasonNone
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
clock_mhz = self._pynvml.nvmlDeviceGetClockInfo(
|
||||||
|
self._handle, self._pynvml.NVML_CLOCK_GRAPHICS
|
||||||
|
)
|
||||||
|
except self._pynvml.NVMLError:
|
||||||
|
clock_mhz = None
|
||||||
|
return ThermalReading(
|
||||||
|
cpu_temp_c=None, # NVML reports GPU only; CPU temp needs jtop / lm-sensors
|
||||||
|
gpu_temp_c=float(gpu_t),
|
||||||
|
thermal_throttle_active=bool(throttle_active),
|
||||||
|
measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None,
|
||||||
|
)
|
||||||
|
except self._pynvml.NVMLError as exc:
|
||||||
|
raise TelemetryUnavailableError(
|
||||||
|
f"NVML read failed: {exc}"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
try:
|
||||||
|
self._pynvml.nvmlShutdown()
|
||||||
|
except self._pynvml.NVMLError as exc:
|
||||||
|
get_logger("c7_inference.thermal").warning(
|
||||||
|
"NVML shutdown raised",
|
||||||
|
extra={
|
||||||
|
"kind": "c7.thermal.lifecycle",
|
||||||
|
"kv": {"error": str(exc)},
|
||||||
|
},
|
||||||
|
)
|
||||||
@@ -101,6 +101,20 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = {
|
|||||||
"threshold_m",
|
"threshold_m",
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
# AZ-302 / E-C7: emitted on every thermal-throttle transition (False->True or
|
||||||
|
# True->False). One record per flip; steady-state polls emit nothing on this
|
||||||
|
# kind. `previous_state` and `new_state` are the throttle booleans pre/post
|
||||||
|
# transition; temperatures and clock are taken from the same poll cycle.
|
||||||
|
"c7.thermal_transition": frozenset(
|
||||||
|
{
|
||||||
|
"previous_state",
|
||||||
|
"new_state",
|
||||||
|
"gpu_temp_c",
|
||||||
|
"cpu_temp_c",
|
||||||
|
"measured_clock_mhz",
|
||||||
|
"measured_at_ns",
|
||||||
|
}
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys())
|
KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys())
|
||||||
|
|||||||
@@ -0,0 +1,506 @@
|
|||||||
|
"""AZ-302 — ``ThermalStatePublisher`` acceptance tests.
|
||||||
|
|
||||||
|
CPU-only and deterministic: every test drives the publisher through its
|
||||||
|
``_poll_once()`` seam with a manual clock + a scripted ``ThermalSource``;
|
||||||
|
no real ``jtop`` / ``pynvml`` / background thread is involved (those
|
||||||
|
paths live behind the source-selection fallback and are Tier-2 by
|
||||||
|
construction — see ``read_host_tuple`` analog comments in AZ-301).
|
||||||
|
|
||||||
|
The publisher's background thread is exercised only through the
|
||||||
|
idempotency tests (AC-6); the throttle-detection-latency contract
|
||||||
|
(AC-2 / AC-3) is verified deterministically on the ``_poll_once`` seam
|
||||||
|
because the wall-clock latency is dominated by the publisher's own
|
||||||
|
period (1 / ``thermal_poll_hz`` s), not by code under test.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from collections import deque
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gps_denied_onboard.components.c7_inference import (
|
||||||
|
C7InferenceConfig,
|
||||||
|
TelemetryUnavailableError,
|
||||||
|
ThermalReading,
|
||||||
|
ThermalSource,
|
||||||
|
ThermalStatePublisher,
|
||||||
|
)
|
||||||
|
from gps_denied_onboard.components.c7_inference import thermal_publisher as tp_mod
|
||||||
|
from gps_denied_onboard.config.schema import Config
|
||||||
|
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
|
||||||
|
|
||||||
|
_PRODUCER_ID: Final[str] = "c7_inference.thermal"
|
||||||
|
_TRANSITION_KIND: Final[str] = "c7.thermal_transition"
|
||||||
|
|
||||||
|
|
||||||
|
class _ManualClock:
|
||||||
|
"""Test ``Clock`` that returns whatever ``advance_to`` set last.
|
||||||
|
|
||||||
|
``monotonic_ns`` and ``time_ns`` share the same backing value because
|
||||||
|
AZ-302 only ever reads ``monotonic_ns``; ``time_ns`` is unused.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *, start_ns: int = 1_000_000_000) -> None:
|
||||||
|
self._now_ns: int = start_ns
|
||||||
|
|
||||||
|
def monotonic_ns(self) -> int:
|
||||||
|
return self._now_ns
|
||||||
|
|
||||||
|
def time_ns(self) -> int:
|
||||||
|
return self._now_ns
|
||||||
|
|
||||||
|
def sleep_until_ns(self, target_ns: int) -> None: # pragma: no cover - unused
|
||||||
|
self._now_ns = max(self._now_ns, target_ns)
|
||||||
|
|
||||||
|
def advance_to(self, ns: int) -> None:
|
||||||
|
if ns < self._now_ns:
|
||||||
|
raise AssertionError("monotonic clock cannot go backwards")
|
||||||
|
self._now_ns = ns
|
||||||
|
|
||||||
|
def advance_by(self, delta_ns: int) -> None:
|
||||||
|
self.advance_to(self._now_ns + delta_ns)
|
||||||
|
|
||||||
|
|
||||||
|
class _ScriptedSource:
|
||||||
|
"""``ThermalSource`` whose ``read()`` plays back a scripted queue.
|
||||||
|
|
||||||
|
Each queue entry is either a :class:`ThermalReading` (returned) or a
|
||||||
|
:class:`TelemetryUnavailableError` instance (raised). When the queue
|
||||||
|
is drained, the last entry is repeated indefinitely so steady-state
|
||||||
|
polls do not need explicit padding.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, script: list[ThermalReading | TelemetryUnavailableError]) -> None:
|
||||||
|
if not script:
|
||||||
|
raise ValueError("_ScriptedSource needs at least one entry")
|
||||||
|
self._queue: deque[ThermalReading | TelemetryUnavailableError] = deque(script)
|
||||||
|
self._last: ThermalReading | TelemetryUnavailableError = script[-1]
|
||||||
|
self.closed: bool = False
|
||||||
|
|
||||||
|
def read(self) -> ThermalReading:
|
||||||
|
if self._queue:
|
||||||
|
head = self._queue.popleft()
|
||||||
|
self._last = head
|
||||||
|
else:
|
||||||
|
head = self._last
|
||||||
|
if isinstance(head, TelemetryUnavailableError):
|
||||||
|
raise head
|
||||||
|
return head
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
self.closed = True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def config() -> Config:
|
||||||
|
return Config.with_blocks(c7_inference=C7InferenceConfig(thermal_poll_hz=1.0))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def clock() -> _ManualClock:
|
||||||
|
return _ManualClock()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fdr() -> FakeFdrSink:
|
||||||
|
return FakeFdrSink(producer_id=_PRODUCER_ID)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fast_config() -> Config:
|
||||||
|
"""High-frequency config so the AC-6 background-thread tests finish quickly."""
|
||||||
|
|
||||||
|
return Config.with_blocks(c7_inference=C7InferenceConfig(thermal_poll_hz=200.0))
|
||||||
|
|
||||||
|
|
||||||
|
def _make_publisher(
|
||||||
|
*,
|
||||||
|
config: Config,
|
||||||
|
fdr: FakeFdrSink,
|
||||||
|
clock: _ManualClock,
|
||||||
|
source: ThermalSource | None,
|
||||||
|
) -> ThermalStatePublisher:
|
||||||
|
return ThermalStatePublisher(config, fdr, source=source, clock=clock)
|
||||||
|
|
||||||
|
|
||||||
|
def _prime_without_thread(
|
||||||
|
publisher: ThermalStatePublisher, source: ThermalSource
|
||||||
|
) -> None:
|
||||||
|
"""Run the source-selection + initial-poll path of ``start()`` without
|
||||||
|
spawning the background thread.
|
||||||
|
|
||||||
|
The deterministic AC-1..AC-4 / AC-8 tests drive :meth:`_poll_once`
|
||||||
|
directly under a manual clock; the production thread would race
|
||||||
|
those calls and consume scripted source entries non-deterministically
|
||||||
|
(one race observed in the first iteration of this test module).
|
||||||
|
"""
|
||||||
|
|
||||||
|
publisher._source = source
|
||||||
|
publisher._stop_event.clear()
|
||||||
|
publisher._poll_once(initial=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _cool_reading(*, throttle: bool = False) -> ThermalReading:
|
||||||
|
return ThermalReading(
|
||||||
|
cpu_temp_c=45.0,
|
||||||
|
gpu_temp_c=55.0,
|
||||||
|
thermal_throttle_active=throttle,
|
||||||
|
measured_clock_mhz=1300,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _hot_reading() -> ThermalReading:
|
||||||
|
return ThermalReading(
|
||||||
|
cpu_temp_c=85.0,
|
||||||
|
gpu_temp_c=92.0,
|
||||||
|
thermal_throttle_active=True,
|
||||||
|
measured_clock_mhz=600,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-1: read() returns the most recent _atomic_snapshot.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac1_read_returns_latest_snapshot(
|
||||||
|
config: Config, fdr: FakeFdrSink, clock: _ManualClock
|
||||||
|
) -> None:
|
||||||
|
source = _ScriptedSource([_cool_reading(), _hot_reading()])
|
||||||
|
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||||
|
_prime_without_thread(publisher, source)
|
||||||
|
|
||||||
|
snapshot_after_initial = publisher.read()
|
||||||
|
|
||||||
|
clock.advance_by(1_000_000_000)
|
||||||
|
publisher._poll_once()
|
||||||
|
snapshot_after_transition = publisher.read()
|
||||||
|
|
||||||
|
assert snapshot_after_initial.thermal_throttle_active is False
|
||||||
|
assert snapshot_after_initial.cpu_temp_c == pytest.approx(45.0)
|
||||||
|
assert snapshot_after_initial.measured_at_ns == 1_000_000_000
|
||||||
|
|
||||||
|
assert snapshot_after_transition.thermal_throttle_active is True
|
||||||
|
assert snapshot_after_transition.gpu_temp_c == pytest.approx(92.0)
|
||||||
|
assert snapshot_after_transition.is_telemetry_available is True
|
||||||
|
assert snapshot_after_transition.measured_at_ns == 2_000_000_000
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac1_read_is_wait_free_object_identity(
|
||||||
|
config: Config, fdr: FakeFdrSink, clock: _ManualClock
|
||||||
|
) -> None:
|
||||||
|
"""Two consecutive reads with no intervening poll return the same object."""
|
||||||
|
|
||||||
|
source = _ScriptedSource([_cool_reading()])
|
||||||
|
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||||
|
_prime_without_thread(publisher, source)
|
||||||
|
|
||||||
|
first = publisher.read()
|
||||||
|
second = publisher.read()
|
||||||
|
|
||||||
|
assert first is second
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-2: throttle entry — FDR + WARN log + state.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac2_throttle_entry_emits_fdr_and_warn(
|
||||||
|
config: Config,
|
||||||
|
fdr: FakeFdrSink,
|
||||||
|
clock: _ManualClock,
|
||||||
|
caplog: pytest.LogCaptureFixture,
|
||||||
|
) -> None:
|
||||||
|
source = _ScriptedSource([_cool_reading(), _hot_reading()])
|
||||||
|
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||||
|
_prime_without_thread(publisher, source)
|
||||||
|
assert fdr.records == []
|
||||||
|
|
||||||
|
clock.advance_by(1_000_000_000)
|
||||||
|
with caplog.at_level("WARNING", logger="c7_inference.thermal"):
|
||||||
|
publisher._poll_once()
|
||||||
|
|
||||||
|
assert len(fdr.records) == 1
|
||||||
|
record = fdr.records[0]
|
||||||
|
assert record.kind == _TRANSITION_KIND
|
||||||
|
assert record.producer_id == _PRODUCER_ID
|
||||||
|
assert record.payload["previous_state"] is False
|
||||||
|
assert record.payload["new_state"] is True
|
||||||
|
assert record.payload["gpu_temp_c"] == pytest.approx(92.0)
|
||||||
|
assert record.payload["cpu_temp_c"] == pytest.approx(85.0)
|
||||||
|
assert record.payload["measured_clock_mhz"] == 600
|
||||||
|
assert record.payload["measured_at_ns"] == 2_000_000_000
|
||||||
|
|
||||||
|
entry_warnings = [
|
||||||
|
r for r in caplog.records
|
||||||
|
if r.levelname == "WARNING" and r.getMessage() == "thermal throttle ENTRY"
|
||||||
|
]
|
||||||
|
assert len(entry_warnings) == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-3: throttle exit — FDR + INFO log.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac3_throttle_exit_emits_fdr_and_info(
|
||||||
|
config: Config,
|
||||||
|
fdr: FakeFdrSink,
|
||||||
|
clock: _ManualClock,
|
||||||
|
caplog: pytest.LogCaptureFixture,
|
||||||
|
) -> None:
|
||||||
|
source = _ScriptedSource([_hot_reading(), _cool_reading()])
|
||||||
|
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||||
|
_prime_without_thread(publisher, source)
|
||||||
|
|
||||||
|
clock.advance_by(1_000_000_000)
|
||||||
|
with caplog.at_level("INFO", logger="c7_inference.thermal"):
|
||||||
|
publisher._poll_once()
|
||||||
|
|
||||||
|
assert len(fdr.records) == 1
|
||||||
|
record = fdr.records[0]
|
||||||
|
assert record.kind == _TRANSITION_KIND
|
||||||
|
assert record.payload["previous_state"] is True
|
||||||
|
assert record.payload["new_state"] is False
|
||||||
|
|
||||||
|
exit_infos = [
|
||||||
|
r for r in caplog.records
|
||||||
|
if r.levelname == "INFO" and r.getMessage() == "thermal throttle exit"
|
||||||
|
]
|
||||||
|
assert len(exit_infos) == 1
|
||||||
|
exit_warnings = [
|
||||||
|
r for r in caplog.records
|
||||||
|
if r.levelname == "WARNING" and r.getMessage() == "thermal throttle exit"
|
||||||
|
]
|
||||||
|
assert exit_warnings == []
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-4: telemetry unavailable on every poll — default-safe + rate-limited WARN.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac4_telemetry_unavailable_defaults_safe_and_rate_limits(
|
||||||
|
config: Config,
|
||||||
|
fdr: FakeFdrSink,
|
||||||
|
clock: _ManualClock,
|
||||||
|
caplog: pytest.LogCaptureFixture,
|
||||||
|
) -> None:
|
||||||
|
err = TelemetryUnavailableError("jtop hung")
|
||||||
|
source = _ScriptedSource([err, err, err, err, err, err])
|
||||||
|
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||||
|
|
||||||
|
with caplog.at_level("WARNING", logger="c7_inference.thermal"):
|
||||||
|
_prime_without_thread(publisher, source)
|
||||||
|
# Drive five more polls across a 2-second simulated window.
|
||||||
|
clock.advance_by(500_000_000)
|
||||||
|
publisher._poll_once()
|
||||||
|
clock.advance_by(500_000_000)
|
||||||
|
publisher._poll_once()
|
||||||
|
clock.advance_by(500_000_000)
|
||||||
|
publisher._poll_once()
|
||||||
|
clock.advance_by(500_000_000)
|
||||||
|
publisher._poll_once()
|
||||||
|
clock.advance_by(500_000_000)
|
||||||
|
publisher._poll_once()
|
||||||
|
|
||||||
|
snapshot = publisher.read()
|
||||||
|
|
||||||
|
assert snapshot.is_telemetry_available is False
|
||||||
|
assert snapshot.thermal_throttle_active is False
|
||||||
|
assert snapshot.gpu_temp_c is None
|
||||||
|
assert snapshot.cpu_temp_c is None
|
||||||
|
|
||||||
|
unavailable_warnings = [
|
||||||
|
r for r in caplog.records
|
||||||
|
if r.levelname == "WARNING" and r.getMessage() == "thermal telemetry unavailable"
|
||||||
|
]
|
||||||
|
# Initial poll (start) + 5 polls x 500 ms each = 2.5 s window; rate-limit
|
||||||
|
# at 1 / sec ⇒ at most 3 WARNs (one immediately + one each second).
|
||||||
|
assert 1 <= len(unavailable_warnings) <= 3
|
||||||
|
assert fdr.records == []
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-5: cold-start with no source raises.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac5_start_raises_when_no_source_available(
|
||||||
|
config: Config,
|
||||||
|
fdr: FakeFdrSink,
|
||||||
|
clock: _ManualClock,
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
def _raise(*_args: object, **_kw: object) -> None:
|
||||||
|
raise TelemetryUnavailableError("backend missing")
|
||||||
|
|
||||||
|
monkeypatch.setattr(tp_mod, "_JtopSource", _raise)
|
||||||
|
monkeypatch.setattr(tp_mod, "_PynvmlSource", _raise)
|
||||||
|
|
||||||
|
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=None)
|
||||||
|
|
||||||
|
with pytest.raises(TelemetryUnavailableError):
|
||||||
|
publisher.start()
|
||||||
|
|
||||||
|
assert publisher.is_running() is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac5_jtop_falls_back_to_pynvml(
|
||||||
|
fast_config: Config,
|
||||||
|
fdr: FakeFdrSink,
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
"""When ``jtop`` fails, ``start()`` picks the ``pynvml`` source and runs."""
|
||||||
|
|
||||||
|
pynvml_source = _ScriptedSource(
|
||||||
|
[_cool_reading()] + [_cool_reading()] * 16
|
||||||
|
)
|
||||||
|
|
||||||
|
def _raise_jtop(*_args: object, **_kw: object) -> None:
|
||||||
|
raise TelemetryUnavailableError("no jtop")
|
||||||
|
|
||||||
|
def _make_pynvml(*_args: object, **_kw: object) -> _ScriptedSource:
|
||||||
|
return pynvml_source
|
||||||
|
|
||||||
|
monkeypatch.setattr(tp_mod, "_JtopSource", _raise_jtop)
|
||||||
|
monkeypatch.setattr(tp_mod, "_PynvmlSource", _make_pynvml)
|
||||||
|
|
||||||
|
publisher = ThermalStatePublisher(fast_config, fdr)
|
||||||
|
publisher.start()
|
||||||
|
assert publisher.is_running() is True
|
||||||
|
snapshot = publisher.read()
|
||||||
|
publisher.stop()
|
||||||
|
|
||||||
|
assert snapshot.is_telemetry_available is True
|
||||||
|
assert pynvml_source.closed is True
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-6: start/stop is idempotent.
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac6_double_start_is_noop(
|
||||||
|
fast_config: Config, fdr: FakeFdrSink
|
||||||
|
) -> None:
|
||||||
|
source = _ScriptedSource([_cool_reading()])
|
||||||
|
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
|
||||||
|
|
||||||
|
publisher.start()
|
||||||
|
first_thread = publisher._thread
|
||||||
|
publisher.start()
|
||||||
|
second_thread = publisher._thread
|
||||||
|
|
||||||
|
assert publisher.is_running() is True
|
||||||
|
assert first_thread is second_thread
|
||||||
|
|
||||||
|
publisher.stop()
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac6_double_stop_is_noop(
|
||||||
|
fast_config: Config, fdr: FakeFdrSink
|
||||||
|
) -> None:
|
||||||
|
source = _ScriptedSource([_cool_reading()])
|
||||||
|
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
|
||||||
|
|
||||||
|
publisher.start()
|
||||||
|
publisher.stop()
|
||||||
|
publisher.stop()
|
||||||
|
|
||||||
|
assert publisher.is_running() is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_background_thread_polls(
|
||||||
|
fast_config: Config, fdr: FakeFdrSink
|
||||||
|
) -> None:
|
||||||
|
"""End-to-end: real thread, 200 Hz poll rate, source flips on second read."""
|
||||||
|
|
||||||
|
readings: deque[ThermalReading] = deque(
|
||||||
|
[_cool_reading()] + [_hot_reading()] * 64
|
||||||
|
)
|
||||||
|
|
||||||
|
class _Live:
|
||||||
|
closed = False
|
||||||
|
|
||||||
|
def read(self) -> ThermalReading:
|
||||||
|
if readings:
|
||||||
|
return readings.popleft()
|
||||||
|
return _hot_reading()
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
self._closed_flag = True
|
||||||
|
|
||||||
|
source = _Live()
|
||||||
|
publisher = ThermalStatePublisher(fast_config, fdr, source=source)
|
||||||
|
publisher.start()
|
||||||
|
deadline = time.monotonic() + 1.0
|
||||||
|
while publisher.read().thermal_throttle_active is False and time.monotonic() < deadline:
|
||||||
|
time.sleep(0.005)
|
||||||
|
publisher.stop()
|
||||||
|
|
||||||
|
assert publisher.read().thermal_throttle_active is True
|
||||||
|
assert any(r.kind == _TRANSITION_KIND for r in fdr.records)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# AC-8: FDR record envelope matches the schema (round-trip via serialise/parse).
|
||||||
|
|
||||||
|
|
||||||
|
def test_ac8_fdr_record_round_trips_through_wire_format(
|
||||||
|
config: Config, fdr: FakeFdrSink, clock: _ManualClock
|
||||||
|
) -> None:
|
||||||
|
from gps_denied_onboard.fdr_client.records import KNOWN_PAYLOAD_KEYS, parse, serialise
|
||||||
|
|
||||||
|
source = _ScriptedSource([_cool_reading(), _hot_reading()])
|
||||||
|
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||||
|
_prime_without_thread(publisher, source)
|
||||||
|
clock.advance_by(1_000_000_000)
|
||||||
|
publisher._poll_once()
|
||||||
|
|
||||||
|
record = fdr.records[0]
|
||||||
|
encoded = serialise(record)
|
||||||
|
decoded = parse(encoded)
|
||||||
|
|
||||||
|
assert decoded.kind == _TRANSITION_KIND
|
||||||
|
assert decoded.producer_id == _PRODUCER_ID
|
||||||
|
expected_keys = KNOWN_PAYLOAD_KEYS[_TRANSITION_KIND]
|
||||||
|
assert set(decoded.payload.keys()) - {"extra"} <= expected_keys
|
||||||
|
assert decoded.payload == record.payload
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# NFR-reliability-default-safe: first poll fails → snapshot is default-safe.
|
||||||
|
|
||||||
|
|
||||||
|
def test_nfr_default_safe_on_first_poll_failure(
|
||||||
|
config: Config, fdr: FakeFdrSink, clock: _ManualClock
|
||||||
|
) -> None:
|
||||||
|
err = TelemetryUnavailableError("source dead on arrival")
|
||||||
|
source = _ScriptedSource([err])
|
||||||
|
publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source)
|
||||||
|
_prime_without_thread(publisher, source)
|
||||||
|
|
||||||
|
snapshot = publisher.read()
|
||||||
|
|
||||||
|
assert snapshot.is_telemetry_available is False
|
||||||
|
assert snapshot.thermal_throttle_active is False
|
||||||
|
assert snapshot.gpu_temp_c is None
|
||||||
|
assert snapshot.cpu_temp_c is None
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Structural: ThermalSource is a runtime-checkable Protocol; the scripted
|
||||||
|
# source satisfies it.
|
||||||
|
|
||||||
|
|
||||||
|
def test_scripted_source_satisfies_protocol() -> None:
|
||||||
|
source = _ScriptedSource([_cool_reading()])
|
||||||
|
assert isinstance(source, ThermalSource)
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_safe_snapshot_invariant_i6() -> None:
|
||||||
|
"""``is_telemetry_available == False`` ⇒ ``thermal_throttle_active == False``."""
|
||||||
|
|
||||||
|
snap = tp_mod._default_safe_snapshot(measured_at_ns=42)
|
||||||
|
assert snap.is_telemetry_available is False
|
||||||
|
assert snap.thermal_throttle_active is False
|
||||||
|
assert snap.measured_at_ns == 42
|
||||||
@@ -131,6 +131,15 @@ def _kind_payload(kind: str) -> dict[str, object]:
|
|||||||
"strategy_label": "okvis2",
|
"strategy_label": "okvis2",
|
||||||
"frame_id": "frame-0001",
|
"frame_id": "frame-0001",
|
||||||
}
|
}
|
||||||
|
if kind == "c7.thermal_transition":
|
||||||
|
return {
|
||||||
|
"previous_state": False,
|
||||||
|
"new_state": True,
|
||||||
|
"gpu_temp_c": 92.0,
|
||||||
|
"cpu_temp_c": 85.0,
|
||||||
|
"measured_clock_mhz": 600,
|
||||||
|
"measured_at_ns": 1_700_000_000_000_000_000,
|
||||||
|
}
|
||||||
raise AssertionError(f"unhandled kind in fixture: {kind!r}")
|
raise AssertionError(f"unhandled kind in fixture: {kind!r}")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user