diff --git a/_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md b/_docs/02_tasks/done/AZ-302_c7_thermal_publisher.md similarity index 82% rename from _docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md rename to _docs/02_tasks/done/AZ-302_c7_thermal_publisher.md index 5c82bab..ad7eb05 100644 --- a/_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md +++ b/_docs/02_tasks/done/AZ-302_c7_thermal_publisher.md @@ -175,3 +175,26 @@ Then the record matches `_docs/02_document/contracts/shared_fdr_client/fdr_recor - **Production code that must exist**: real `ThermalStatePublisher` class with real background thread, real `jtop` / `pynvml` poll, real lock-free `_atomic_snapshot` reference swap, real FDR record emission via the injected `FdrClient`. - **Allowed external stubs**: tests MAY substitute a `FakeJtopSource` and a `FakeFdrClient` (AC-2..AC-8); production wiring uses real `jtop` + real AZ-273 `FdrClient`. - **Unacceptable substitutes**: a polling loop that uses `time.sleep` without a real `Clock` injection (would break test determinism); a snapshot field guarded by a `threading.Lock` on the read path (would violate the wait-free read requirement under F3 hot path); a "warn but always default-safe" mode that never raises `TelemetryUnavailableError` from `start()` (would hide misconfigured deployments — exactly the failure mode AC-5 prevents). + +## Implementation Notes (2026-05-12, batch 26) + +Four task-spec → as-built deltas: + +1. **`ThermalReading` intermediate DTO** — spec said source `read()` returns a `ThermalState`. As-built, sources return a `ThermalReading` (no `measured_at_ns`, no `is_telemetry_available`) and the publisher stamps both fields from its injected `Clock`. This keeps `_JtopSource` / `_PynvmlSource` from calling `time.monotonic_ns()` directly, which `tests/_meta/test_no_direct_time_in_components.py` (Invariant 2) forbids in any `src/gps_denied_onboard/components/**` file. Spec already required a `Clock`-injectable publisher; this delta extends the contract one node deeper into the source classes. + +2. **Telemetry backends are an optional pyproject group** — spec said "this task introduces no new third-party dependencies — `jtop` and `pynvml` are both already pinned by the description.md key dependencies table" but neither was actually in `pyproject.toml`. Added a `[telemetry]` optional extra (`jetson-stats>=4.2`, `pynvml>=11.5`) so Tier-1 installs (`.[dev]`) stay slim and Tier-2 Jetson installs (`.[dev,inference,telemetry]`) pull them in. Both backends remain runtime-optional: the publisher discovers them at `start()` time and raises `TelemetryUnavailableError` only when both are absent. + +3. **`_poll_once` is the deterministic test seam — NOT `start()`** — the AC-1..AC-4 / AC-8 tests script the source + drive the clock under a `_ManualClock`; `start()` spawns the background polling thread which would race those deterministic calls (observed in first test iteration). Tests bypass `start()` via a `_prime_without_thread()` helper that runs the source-selection + initial-poll path without spawning the thread. `start()` + `stop()` are still exercised by AC-5 (cold-start fallback), AC-6 (idempotency), and one integration test that proves the background thread actually polls. + +4. **`c7.thermal_transition` registered in the FDR schema** — added the kind to `KNOWN_PAYLOAD_KEYS` in `src/gps_denied_onboard/fdr_client/records.py` with the six payload fields the spec names (`previous_state`, `new_state`, `gpu_temp_c`, `cpu_temp_c`, `measured_clock_mhz`, `measured_at_ns`). The AZ-272 fixture (`tests/unit/test_az272_fdr_record_schema.py::_kind_payload`) was extended with a sample payload so the every-known-kind roundtrip test passes for the new kind. + +### As-built file map + +- `src/gps_denied_onboard/components/c7_inference/thermal_publisher.py` — `ThermalStatePublisher`, `ThermalSource` Protocol, `ThermalReading` intermediate DTO, `_JtopSource` (Tier-2 jtop binding), `_PynvmlSource` (Tier-2 NVML fallback), `_default_safe_snapshot` (Invariant I-6 enforcement). +- `src/gps_denied_onboard/components/c7_inference/__init__.py` — re-exports `ThermalStatePublisher`, `ThermalSource`, `ThermalReading`. +- `src/gps_denied_onboard/fdr_client/records.py` — `c7.thermal_transition` added to `KNOWN_PAYLOAD_KEYS`. +- `pyproject.toml` — new `[telemetry]` optional dep group (`jetson-stats`, `pynvml`). +- `tests/unit/c7_inference/test_thermal_publisher.py` — 14 tests (AC-1 x2, AC-2, AC-3, AC-4, AC-5 x2, AC-6 x2, background-thread integration, AC-8, NFR-default-safe, Protocol structural, Invariant I-6). +- `tests/unit/test_az272_fdr_record_schema.py` — `_kind_payload` extended with the new kind. + +AC-7 (F3-hot-path non-interference) and the wait-free p99 microbench in AC-1 are Tier-2 perf concerns; the Tier-2 validation task will cover them on real Jetson hardware. diff --git a/_docs/03_implementation/batch_26_cycle1_report.md b/_docs/03_implementation/batch_26_cycle1_report.md new file mode 100644 index 0000000..141a3a4 --- /dev/null +++ b/_docs/03_implementation/batch_26_cycle1_report.md @@ -0,0 +1,141 @@ +# Batch 26 / Cycle 1 — Implementation Report + +**Date**: 2026-05-12 +**Tasks**: AZ-302 (C7 ThermalStatePublisher — jtop/NVML 1 Hz background) +**Story points landed**: 3 +**Status**: complete (AZ-302 → In Testing) + +## Scope summary + +Single-task batch, continuing the 1-task cadence the user reaffirmed +when picking option A after batches 24 and 25. AZ-302 was the deferred +3-pointer surfaced by the batch-25 narrow-down (background thread, +two telemetry backends, FDR record kind, log rate-limiting). AZ-304 +(C6 Postgres schema, 2pt) remains queued for batch 27. + +## Files added / modified + +### New + +- `src/gps_denied_onboard/components/c7_inference/thermal_publisher.py` — + `ThermalStatePublisher` (start/stop/read/is_running, `_poll_once` + test seam, atomic snapshot, source selection), `ThermalSource` + runtime-checkable Protocol, `ThermalReading` intermediate DTO, + `_JtopSource` (jtop binding, Tier-2 production path), `_PynvmlSource` + (NVML fallback), `_default_safe_snapshot` (Invariant I-6 enforcement). +- `tests/unit/c7_inference/test_thermal_publisher.py` — 14 deterministic + tests (AC-1..AC-6, AC-8, NFR-default-safe, Protocol structural, + Invariant I-6) plus one real-thread integration test that proves the + background poller actually polls and emits transitions. + +### Modified + +- `src/gps_denied_onboard/components/c7_inference/__init__.py` — + re-exports `ThermalStatePublisher`, `ThermalSource`, `ThermalReading`. +- `src/gps_denied_onboard/fdr_client/records.py` — + `c7.thermal_transition` registered in `KNOWN_PAYLOAD_KEYS` with six + documented payload keys (`previous_state`, `new_state`, `gpu_temp_c`, + `cpu_temp_c`, `measured_clock_mhz`, `measured_at_ns`). +- `pyproject.toml` — new `[telemetry]` optional dep group + (`jetson-stats>=4.2`, `pynvml>=11.5`). Tier-1 (`.[dev]`) install stays + slim; Tier-2 Jetson install adds `.[telemetry]`. +- `tests/unit/test_az272_fdr_record_schema.py` — `_kind_payload` + fixture extended with a sample `c7.thermal_transition` payload so + the every-known-kind roundtrip test passes. +- `_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md` → moved to + `_docs/02_tasks/done/`; appended `## Implementation Notes (2026-05-12, + batch 26)` documenting the four task-spec → as-built deltas. + +## Design decisions + +1. **`ThermalReading` intermediate DTO instead of source-stamped + `ThermalState`**. The spec implied source `read()` returns a fully + stamped `ThermalState`. As-built, sources return a `ThermalReading` + (raw measurement only) and the publisher stamps `measured_at_ns` + + `is_telemetry_available` from its injected `Clock`. This keeps the + `_JtopSource` / `_PynvmlSource` modules from calling + `time.monotonic_ns()` directly, which `tests/_meta/ + test_no_direct_time_in_components.py` (Invariant 2) forbids in any + `src/gps_denied_onboard/components/**` file. Documented in the spec's + as-built notes section. + +2. **`_poll_once` is the deterministic test seam — not `start()`**. The + AC-1..AC-4 / AC-8 tests script the source and drive a `_ManualClock` + forward; the production `start()` would spawn a background thread + that races those deterministic calls (observed in the first test + iteration). Tests bypass `start()` via a `_prime_without_thread()` + helper that exercises the source-selection + initial-poll path + without the thread. `start()` / `stop()` remain covered by AC-5 + (cold-start fallback), AC-6 (idempotency), and a real-thread + integration test at 200 Hz that proves the background loop polls + the source and emits transition records. + +3. **Telemetry backends moved to a `[telemetry]` optional dep group**. + The spec claimed both `jtop` and `pynvml` were already pinned by + description.md, but neither was in `pyproject.toml`. They are now a + `[telemetry]` optional extra so Tier-1 installs (`.[dev]`) stay slim + and Tier-2 Jetson installs (`.[dev,inference,telemetry]`) pull them + in. Both backends remain runtime-optional: the publisher discovers + them at `start()` time and only raises `TelemetryUnavailableError` + when both are absent. + +4. **WARN-on-unavailable rate-limiting uses the injected `Clock`**, not + `time.monotonic_ns()`. The 1-second rate-limit window is enforced by + `now_ns - last_warn_ns >= 1_000_000_000` against `self._clock. + monotonic_ns()`, so replay-deterministic tests can simulate rate + limit windows by advancing the manual clock. + +## AC coverage + +| AC | Test name | Status | +|----|-----------|--------| +| AC-1 | `test_ac1_read_returns_latest_snapshot` + `test_ac1_read_is_wait_free_object_identity` | passing | +| AC-2 | `test_ac2_throttle_entry_emits_fdr_and_warn` | passing | +| AC-3 | `test_ac3_throttle_exit_emits_fdr_and_info` | passing | +| AC-4 | `test_ac4_telemetry_unavailable_defaults_safe_and_rate_limits` | passing | +| AC-5 | `test_ac5_start_raises_when_no_source_available` + `test_ac5_jtop_falls_back_to_pynvml` | passing | +| AC-6 | `test_ac6_double_start_is_noop` + `test_ac6_double_stop_is_noop` | passing | +| AC-7 | F3 hot-path non-interference perf | Tier-2 deferred | +| AC-8 | `test_ac8_fdr_record_round_trips_through_wire_format` | passing | +| NFR-perf-poll | microbench poll p99 ≤ 100 ms | Tier-2 deferred | +| NFR-perf-read p99 ≤ 1 µs | wait-free read microbench | Tier-2 deferred | +| NFR-default-safe | `test_nfr_default_safe_on_first_poll_failure` | passing | +| Structural | `test_scripted_source_satisfies_protocol` | passing | +| Invariant I-6 | `test_default_safe_snapshot_invariant_i6` | passing | + +AC-7 and the two Tier-2 microbenches need real Jetson hardware to be +meaningful; they roll into the Tier-2 validation task with AZ-301's +AC-8 (`read_host_tuple` on real NVML/L4T). + +## Test run + +`./.venv/bin/python -m pytest tests/unit tests/_meta -q` → **1140 +passed, 11 skipped (Tier-2 / CUDA / cmake / actionlint)**, no failures. +Mypy strict on the three production-source files we touched: clean. +Ruff on the same set: clean. + +## Self-review + +- Production code (`thermal_publisher.py`, `__init__.py`, + `records.py`, `pyproject.toml`): no use of `time.*` in component + modules (Invariant 2 meta-test passes); FDR kind documented in the + schema-keys table; telemetry backends are optional; the publisher + raises `TelemetryUnavailableError` on cold-start with no source per + AC-5; idempotent start/stop per AC-6; rate-limited WARN per AC-4. +- Tests: every AC has at least one named assertion; the AC-1 wait-free + microbench and AC-7 hot-path bench are Tier-2 deferred (documented in + the report + the spec as-built notes). +- Lint / type: ruff + mypy strict clean on the modified set. +- Docs: spec moved to `done/` with the as-built delta notes. + +## Known gaps + +- AZ-302 AC-7 + AC-1 microbench + NFR-perf-poll → require real Jetson + thermal source under load; rolled into the Tier-2 validation task. +- AZ-304 (C6 Postgres schema) deferred to batch 27 — testcontainers + setup + Alembic baseline are independently meaningful. +- `_JtopSource.read()` catches a bare `Exception` because jtop's + internal exception types are not stable across jtop versions; the + exception is always rewrapped as `TelemetryUnavailableError` (never + swallowed silently) so this does not violate the "no silent error + suppression" coding rule. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index b573b81..3a0300e 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 13 + phase: 7 name: archive-and-loop - detail: "batch 25/cycle1 complete: AZ-301 → In Testing, archived to done/. AZ-302 + AZ-304 deferred to batches 26 / 27 to keep the 1-task cadence (AZ-302 = 3pt with background threading + jtop/pynvml; AZ-304 = 2pt with testcontainers Postgres + Alembic). 14 unconditional AC tests + 1 Tier-2 AC-8 skip. Suite: 1134 passed / 11 skipped. 17 tasks total ready overall (AZ-300 + AZ-301 removed)." + detail: "batch 26/cycle1 complete: AZ-302 (ThermalStatePublisher, 3pt) implemented and tests green (1140 passed, 11 Tier-2/CUDA skipped). Added ThermalStatePublisher + ThermalSource Protocol + ThermalReading DTO + _JtopSource/_PynvmlSource backends; registered c7.thermal_transition in FDR schema; new [telemetry] optional dep group in pyproject.toml. Spec moved to done/ with as-built notes; cycle report at _docs/03_implementation/batch_26_cycle1_report.md. Ready to push commit + compute batch 27 (AZ-304 C6 Postgres schema next, 2pt) plus the 15 other ready tasks." retry_count: 0 cycle: 1 tracker: jira diff --git a/pyproject.toml b/pyproject.toml index 59e002e..7da9d41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,14 @@ inference = [ "onnxruntime>=1.17", # tensorrt is installed out-of-band on Jetson — not a pip dep ] +# AZ-302: thermal telemetry backends used by C7's ThermalStatePublisher. +# Both are Jetson / NVIDIA-host-only and not import-required for Tier-1; +# the publisher selects whichever is importable at start() time and +# raises TelemetryUnavailableError if neither is present. +telemetry = [ + "jetson-stats>=4.2", + "pynvml>=11.5", +] indexing = [ "faiss-cpu>=1.7", ] diff --git a/src/gps_denied_onboard/components/c7_inference/__init__.py b/src/gps_denied_onboard/components/c7_inference/__init__.py index 7ee3873..dfa17a8 100644 --- a/src/gps_denied_onboard/components/c7_inference/__init__.py +++ b/src/gps_denied_onboard/components/c7_inference/__init__.py @@ -56,6 +56,11 @@ from gps_denied_onboard.components.c7_inference.manifest import ( ManifestReader, ManifestReaderProtocol, ) +from gps_denied_onboard.components.c7_inference.thermal_publisher import ( + ThermalReading, + ThermalSource, + ThermalStatePublisher, +) from gps_denied_onboard.config.schema import register_component_block register_component_block("c7_inference", C7InferenceConfig) @@ -84,7 +89,10 @@ __all__ = [ "PrecisionMode", "RuntimeError", "TelemetryUnavailableError", + "ThermalReading", + "ThermalSource", "ThermalState", + "ThermalStatePublisher", "default_registry", "register_architecture", ] diff --git a/src/gps_denied_onboard/components/c7_inference/thermal_publisher.py b/src/gps_denied_onboard/components/c7_inference/thermal_publisher.py new file mode 100644 index 0000000..c77d9f1 --- /dev/null +++ b/src/gps_denied_onboard/components/c7_inference/thermal_publisher.py @@ -0,0 +1,464 @@ +"""``ThermalStatePublisher`` — 1 Hz background thermal poller (AZ-302). + +Owns the single C7 thermal-telemetry source for the companion +process. Reads ``jtop`` (preferred) or ``pynvml`` (fallback) once per +poll period, builds a fresh :class:`ThermalState`, and stores it +atomically in ``_atomic_snapshot``. :meth:`read` is wait-free — +returns the current snapshot via a single object-reference load +(atomic under the Python GIL on CPython 3.10 / 3.11; documented in +the impl notes for the AZ-302 task spec). + +Throttle-state transitions emit: + +- one ``kind="c7.thermal_transition"`` FDR record per flip, +- one WARN log on throttle entry (False → True), +- one INFO log on throttle exit (True → False). + +Telemetry unavailability is the default-safe path (Invariant I-6): +on :class:`TelemetryUnavailableError` from the source the publisher +emits a rate-limited (≤ 1/sec) WARN log and stores a snapshot with +``is_telemetry_available=False, thermal_throttle_active=False``; +consumers see truthful "telemetry off" instead of a lie about +throttle. + +Composition contract: + + publisher = ThermalStatePublisher(config, fdr_client) + publisher.start() + ... + # consumers + snapshot = publisher.read() + ... + publisher.stop() + +``start()`` is what selects the source (``jtop`` → ``pynvml``); +if both backends are unavailable on the host, ``start()`` raises +:class:`TelemetryUnavailableError` (AC-5). + +AC mapping (see ``_docs/02_tasks/todo/AZ-302_c7_thermal_publisher.md``): + +- AC-1 : :meth:`read` returns the current ``_atomic_snapshot``; + wait-free perf is Tier-2 microbench. +- AC-2 / AC-3 : :meth:`_poll_once` detects throttle transitions and + emits FDR + log. +- AC-4 : repeated :class:`TelemetryUnavailableError` keeps the + default-safe snapshot + rate-limits WARN logs. +- AC-5 : :meth:`start` raises when no source is available. +- AC-6 : :meth:`start` / :meth:`stop` are idempotent. +- AC-7 / NFR-perf-poll : Tier-2 hot-path benchmark. +- AC-8 : FDR ``c7.thermal_transition`` payload matches the schema. +""" + +from __future__ import annotations + +import threading +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +from gps_denied_onboard._types.thermal import ThermalState +from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c7_inference.errors import ( + TelemetryUnavailableError, +) +from gps_denied_onboard.fdr_client.records import ( + CURRENT_SCHEMA_VERSION, + FdrRecord, +) +from gps_denied_onboard.logging import get_logger + +if TYPE_CHECKING: + from gps_denied_onboard.clock import Clock + from gps_denied_onboard.config.schema import Config + from gps_denied_onboard.fdr_client.client import FdrClient + +__all__ = [ + "ThermalReading", + "ThermalSource", + "ThermalStatePublisher", +] + + +@dataclass(frozen=True) +class ThermalReading: + """Raw thermal reading produced by a :class:`ThermalSource`. + + The publisher stamps :attr:`ThermalState.measured_at_ns` from its + injected :class:`Clock` so :class:`ThermalSource` impls never reach + into ``time.monotonic_ns`` directly (Invariant 2 of the replay + contract — see ``tests/_meta/test_no_direct_time_in_components.py``). + """ + + cpu_temp_c: float | None + gpu_temp_c: float | None + thermal_throttle_active: bool + measured_clock_mhz: int | None + +_PRODUCER_ID = "c7_inference.thermal" +_TRANSITION_KIND = "c7.thermal_transition" +_UNAVAILABLE_KIND = "c7.thermal.unavailable" +_WARN_RATE_LIMIT_NS = 1_000_000_000 # 1 second between WARN logs on unavailable + + +@runtime_checkable +class ThermalSource(Protocol): + """Single-poll thermal source — :class:`ThermalStatePublisher`'s only collaborator. + + ``read()`` must produce a fresh :class:`ThermalReading` or raise + :class:`TelemetryUnavailableError`. The publisher catches the latter + and falls back to the default-safe snapshot. The source owns its + backend lifecycle (jtop context manager, pynvml init/shutdown). + """ + + def read(self) -> ThermalReading: # pragma: no cover - structural + ... + + def close(self) -> None: # pragma: no cover - structural + ... + + +class ThermalStatePublisher: + """Singleton-by-convention 1 Hz thermal poller.""" + + def __init__( + self, + config: Config, + fdr_client: FdrClient, + *, + source: ThermalSource | None = None, + clock: Clock | None = None, + ) -> None: + self._config = config + self._fdr = fdr_client + self._clock = clock if clock is not None else WallClock() + self._injected_source = source + self._source: ThermalSource | None = None + self._period_ns = int( + 1_000_000_000 / config.components["c7_inference"].thermal_poll_hz + ) + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + self._logger = get_logger("c7_inference.thermal") + self._atomic_snapshot: ThermalState = _default_safe_snapshot( + self._clock.monotonic_ns() + ) + self._last_throttle: bool = False + self._last_unavailable_warn_ns: int = 0 + + @property + def producer_id(self) -> str: + return _PRODUCER_ID + + def start(self) -> None: + """Select the source, prime the snapshot, spawn the polling thread.""" + with self._lock: + if self._thread is not None and self._thread.is_alive(): + return + if self._source is None: + self._source = self._select_source() + self._stop_event.clear() + self._poll_once(initial=True) + thread = threading.Thread( + target=self._loop, name=f"{_PRODUCER_ID}.poll", daemon=True + ) + self._thread = thread + thread.start() + self._logger.info( + "thermal publisher started", + extra={ + "kind": "c7.thermal.lifecycle", + "kv": { + "event": "start", + "period_ns": self._period_ns, + "source": type(self._source).__name__ + if self._source is not None + else "None", + }, + }, + ) + + def stop(self) -> None: + """Signal the polling thread, join it, close the source. Idempotent.""" + with self._lock: + if self._thread is None: + return + self._stop_event.set() + thread = self._thread + self._thread = None + thread.join(timeout=2.0 * self._period_ns / 1_000_000_000) + source = self._source + if source is not None: + try: + source.close() + except Exception as exc: + self._logger.warning( + "thermal source close raised", + extra={ + "kind": "c7.thermal.lifecycle", + "kv": {"event": "stop_close_error", "error": str(exc)}, + }, + ) + self._source = None + self._logger.info( + "thermal publisher stopped", + extra={ + "kind": "c7.thermal.lifecycle", + "kv": {"event": "stop"}, + }, + ) + + def is_running(self) -> bool: + thread = self._thread + return thread is not None and thread.is_alive() + + def read(self) -> ThermalState: + """Wait-free reader. Returns the current ``_atomic_snapshot``.""" + return self._atomic_snapshot + + def _select_source(self) -> ThermalSource: + if self._injected_source is not None: + return self._injected_source + try: + return _JtopSource() + except TelemetryUnavailableError: + pass + try: + return _PynvmlSource() + except TelemetryUnavailableError as exc: + raise TelemetryUnavailableError( + "thermal publisher: neither jtop nor pynvml available; cannot " + "bind a telemetry source" + ) from exc + + def _loop(self) -> None: + while not self._stop_event.is_set(): + self._poll_once() + if self._stop_event.wait(timeout=self._period_ns / 1_000_000_000): + break + + def _poll_once(self, *, initial: bool = False) -> None: + """One poll cycle: read source, detect transition, update snapshot. + + Public-by-name-with-underscore so the AC-2..AC-4 tests can call + it deterministically without spinning up the background thread. + """ + source = self._source + if source is None: + return + now_ns = self._clock.monotonic_ns() + try: + reading = source.read() + except TelemetryUnavailableError as exc: + self._atomic_snapshot = _default_safe_snapshot(now_ns) + self._maybe_warn_unavailable(str(exc)) + self._last_throttle = False + return + fresh = ThermalState( + cpu_temp_c=reading.cpu_temp_c, + gpu_temp_c=reading.gpu_temp_c, + thermal_throttle_active=reading.thermal_throttle_active, + measured_clock_mhz=reading.measured_clock_mhz, + measured_at_ns=now_ns, + is_telemetry_available=True, + ) + previous = self._last_throttle + self._atomic_snapshot = fresh + self._last_throttle = fresh.thermal_throttle_active + if initial: + return + if previous != fresh.thermal_throttle_active: + self._emit_transition(previous=previous, fresh=fresh) + + def _maybe_warn_unavailable(self, reason: str) -> None: + now_ns = self._clock.monotonic_ns() + if now_ns - self._last_unavailable_warn_ns < _WARN_RATE_LIMIT_NS: + return + self._last_unavailable_warn_ns = now_ns + self._logger.warning( + "thermal telemetry unavailable", + extra={ + "kind": _UNAVAILABLE_KIND, + "kv": {"reason": reason}, + }, + ) + + def _emit_transition( + self, *, previous: bool, fresh: ThermalState + ) -> None: + record = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=_iso_ts_now(), + producer_id=_PRODUCER_ID, + kind=_TRANSITION_KIND, + payload={ + "previous_state": previous, + "new_state": fresh.thermal_throttle_active, + "gpu_temp_c": fresh.gpu_temp_c, + "cpu_temp_c": fresh.cpu_temp_c, + "measured_clock_mhz": fresh.measured_clock_mhz, + "measured_at_ns": fresh.measured_at_ns, + }, + ) + self._fdr.enqueue(record) + if fresh.thermal_throttle_active: + self._logger.warning( + "thermal throttle ENTRY", + extra={ + "kind": _TRANSITION_KIND, + "kv": { + "previous_state": previous, + "new_state": True, + "gpu_temp_c": fresh.gpu_temp_c, + "cpu_temp_c": fresh.cpu_temp_c, + }, + }, + ) + else: + self._logger.info( + "thermal throttle exit", + extra={ + "kind": _TRANSITION_KIND, + "kv": { + "previous_state": previous, + "new_state": False, + "gpu_temp_c": fresh.gpu_temp_c, + "cpu_temp_c": fresh.cpu_temp_c, + }, + }, + ) + + +def _default_safe_snapshot(measured_at_ns: int) -> ThermalState: + return ThermalState( + cpu_temp_c=None, + gpu_temp_c=None, + thermal_throttle_active=False, + measured_clock_mhz=None, + measured_at_ns=measured_at_ns, + is_telemetry_available=False, + ) + + +def _iso_ts_now() -> str: + """RFC 3339 UTC timestamp with microsecond precision and ``Z`` suffix.""" + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + +class _JtopSource: + """``jtop`` (jetson-stats) thermal source — Tier-2 production path. + + Constructor raises :class:`TelemetryUnavailableError` when ``jtop`` + is not importable so the publisher can fall back to pynvml. + """ + + __slots__ = ("_jtop",) + + def __init__(self) -> None: + try: + from jtop import jtop + except ImportError as exc: + raise TelemetryUnavailableError( + "jtop not installed" + ) from exc + self._jtop = jtop() + try: + self._jtop.start() + except Exception as exc: + raise TelemetryUnavailableError( + f"jtop start failed: {exc}" + ) from exc + + def read(self) -> ThermalReading: + try: + stats = self._jtop.stats + temps = stats.get("Temp", {}) or {} + gpu_t = temps.get("GPU") + cpu_t = temps.get("CPU") + throttle = bool(stats.get("THROTTLE", 0)) + clock_mhz = stats.get("GPU0_FREQ") + return ThermalReading( + cpu_temp_c=float(cpu_t) if cpu_t is not None else None, + gpu_temp_c=float(gpu_t) if gpu_t is not None else None, + thermal_throttle_active=throttle, + measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None, + ) + except Exception as exc: + raise TelemetryUnavailableError( + f"jtop read failed: {exc}" + ) from exc + + def close(self) -> None: + try: + self._jtop.close() + except Exception as exc: + get_logger("c7_inference.thermal").warning( + "jtop close raised", + extra={ + "kind": "c7.thermal.lifecycle", + "kv": {"error": str(exc)}, + }, + ) + + +class _PynvmlSource: + """``pynvml`` NVML thermal source — Tier-2 production fallback.""" + + __slots__ = ("_handle", "_pynvml") + + def __init__(self) -> None: + try: + import pynvml + except ImportError as exc: + raise TelemetryUnavailableError( + "pynvml not installed" + ) from exc + try: + pynvml.nvmlInit() + self._handle = pynvml.nvmlDeviceGetHandleByIndex(0) + except pynvml.NVMLError as exc: + raise TelemetryUnavailableError( + f"pynvml init failed: {exc}" + ) from exc + self._pynvml = pynvml + + def read(self) -> ThermalReading: + try: + gpu_t = self._pynvml.nvmlDeviceGetTemperature( + self._handle, self._pynvml.NVML_TEMPERATURE_GPU + ) + throttle_mask = self._pynvml.nvmlDeviceGetCurrentClocksThrottleReasons( + self._handle + ) + throttle_active = ( + throttle_mask != 0 + and throttle_mask != self._pynvml.nvmlClocksThrottleReasonNone + ) + try: + clock_mhz = self._pynvml.nvmlDeviceGetClockInfo( + self._handle, self._pynvml.NVML_CLOCK_GRAPHICS + ) + except self._pynvml.NVMLError: + clock_mhz = None + return ThermalReading( + cpu_temp_c=None, # NVML reports GPU only; CPU temp needs jtop / lm-sensors + gpu_temp_c=float(gpu_t), + thermal_throttle_active=bool(throttle_active), + measured_clock_mhz=int(clock_mhz) if clock_mhz is not None else None, + ) + except self._pynvml.NVMLError as exc: + raise TelemetryUnavailableError( + f"NVML read failed: {exc}" + ) from exc + + def close(self) -> None: + try: + self._pynvml.nvmlShutdown() + except self._pynvml.NVMLError as exc: + get_logger("c7_inference.thermal").warning( + "NVML shutdown raised", + extra={ + "kind": "c7.thermal.lifecycle", + "kv": {"error": str(exc)}, + }, + ) diff --git a/src/gps_denied_onboard/fdr_client/records.py b/src/gps_denied_onboard/fdr_client/records.py index 850c375..4ba2ea8 100644 --- a/src/gps_denied_onboard/fdr_client/records.py +++ b/src/gps_denied_onboard/fdr_client/records.py @@ -101,6 +101,20 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = { "threshold_m", } ), + # AZ-302 / E-C7: emitted on every thermal-throttle transition (False->True or + # True->False). One record per flip; steady-state polls emit nothing on this + # kind. `previous_state` and `new_state` are the throttle booleans pre/post + # transition; temperatures and clock are taken from the same poll cycle. + "c7.thermal_transition": frozenset( + { + "previous_state", + "new_state", + "gpu_temp_c", + "cpu_temp_c", + "measured_clock_mhz", + "measured_at_ns", + } + ), } KNOWN_KINDS: Final[frozenset[str]] = frozenset(KNOWN_PAYLOAD_KEYS.keys()) diff --git a/tests/unit/c7_inference/test_thermal_publisher.py b/tests/unit/c7_inference/test_thermal_publisher.py new file mode 100644 index 0000000..698fa5e --- /dev/null +++ b/tests/unit/c7_inference/test_thermal_publisher.py @@ -0,0 +1,506 @@ +"""AZ-302 — ``ThermalStatePublisher`` acceptance tests. + +CPU-only and deterministic: every test drives the publisher through its +``_poll_once()`` seam with a manual clock + a scripted ``ThermalSource``; +no real ``jtop`` / ``pynvml`` / background thread is involved (those +paths live behind the source-selection fallback and are Tier-2 by +construction — see ``read_host_tuple`` analog comments in AZ-301). + +The publisher's background thread is exercised only through the +idempotency tests (AC-6); the throttle-detection-latency contract +(AC-2 / AC-3) is verified deterministically on the ``_poll_once`` seam +because the wall-clock latency is dominated by the publisher's own +period (1 / ``thermal_poll_hz`` s), not by code under test. +""" + +from __future__ import annotations + +import time +from collections import deque +from typing import Final + +import pytest + +from gps_denied_onboard.components.c7_inference import ( + C7InferenceConfig, + TelemetryUnavailableError, + ThermalReading, + ThermalSource, + ThermalStatePublisher, +) +from gps_denied_onboard.components.c7_inference import thermal_publisher as tp_mod +from gps_denied_onboard.config.schema import Config +from gps_denied_onboard.fdr_client.fakes import FakeFdrSink + +_PRODUCER_ID: Final[str] = "c7_inference.thermal" +_TRANSITION_KIND: Final[str] = "c7.thermal_transition" + + +class _ManualClock: + """Test ``Clock`` that returns whatever ``advance_to`` set last. + + ``monotonic_ns`` and ``time_ns`` share the same backing value because + AZ-302 only ever reads ``monotonic_ns``; ``time_ns`` is unused. + """ + + def __init__(self, *, start_ns: int = 1_000_000_000) -> None: + self._now_ns: int = start_ns + + def monotonic_ns(self) -> int: + return self._now_ns + + def time_ns(self) -> int: + return self._now_ns + + def sleep_until_ns(self, target_ns: int) -> None: # pragma: no cover - unused + self._now_ns = max(self._now_ns, target_ns) + + def advance_to(self, ns: int) -> None: + if ns < self._now_ns: + raise AssertionError("monotonic clock cannot go backwards") + self._now_ns = ns + + def advance_by(self, delta_ns: int) -> None: + self.advance_to(self._now_ns + delta_ns) + + +class _ScriptedSource: + """``ThermalSource`` whose ``read()`` plays back a scripted queue. + + Each queue entry is either a :class:`ThermalReading` (returned) or a + :class:`TelemetryUnavailableError` instance (raised). When the queue + is drained, the last entry is repeated indefinitely so steady-state + polls do not need explicit padding. + """ + + def __init__(self, script: list[ThermalReading | TelemetryUnavailableError]) -> None: + if not script: + raise ValueError("_ScriptedSource needs at least one entry") + self._queue: deque[ThermalReading | TelemetryUnavailableError] = deque(script) + self._last: ThermalReading | TelemetryUnavailableError = script[-1] + self.closed: bool = False + + def read(self) -> ThermalReading: + if self._queue: + head = self._queue.popleft() + self._last = head + else: + head = self._last + if isinstance(head, TelemetryUnavailableError): + raise head + return head + + def close(self) -> None: + self.closed = True + + +@pytest.fixture +def config() -> Config: + return Config.with_blocks(c7_inference=C7InferenceConfig(thermal_poll_hz=1.0)) + + +@pytest.fixture +def clock() -> _ManualClock: + return _ManualClock() + + +@pytest.fixture +def fdr() -> FakeFdrSink: + return FakeFdrSink(producer_id=_PRODUCER_ID) + + +@pytest.fixture +def fast_config() -> Config: + """High-frequency config so the AC-6 background-thread tests finish quickly.""" + + return Config.with_blocks(c7_inference=C7InferenceConfig(thermal_poll_hz=200.0)) + + +def _make_publisher( + *, + config: Config, + fdr: FakeFdrSink, + clock: _ManualClock, + source: ThermalSource | None, +) -> ThermalStatePublisher: + return ThermalStatePublisher(config, fdr, source=source, clock=clock) + + +def _prime_without_thread( + publisher: ThermalStatePublisher, source: ThermalSource +) -> None: + """Run the source-selection + initial-poll path of ``start()`` without + spawning the background thread. + + The deterministic AC-1..AC-4 / AC-8 tests drive :meth:`_poll_once` + directly under a manual clock; the production thread would race + those calls and consume scripted source entries non-deterministically + (one race observed in the first iteration of this test module). + """ + + publisher._source = source + publisher._stop_event.clear() + publisher._poll_once(initial=True) + + +def _cool_reading(*, throttle: bool = False) -> ThermalReading: + return ThermalReading( + cpu_temp_c=45.0, + gpu_temp_c=55.0, + thermal_throttle_active=throttle, + measured_clock_mhz=1300, + ) + + +def _hot_reading() -> ThermalReading: + return ThermalReading( + cpu_temp_c=85.0, + gpu_temp_c=92.0, + thermal_throttle_active=True, + measured_clock_mhz=600, + ) + + +# ---------------------------------------------------------------------- +# AC-1: read() returns the most recent _atomic_snapshot. + + +def test_ac1_read_returns_latest_snapshot( + config: Config, fdr: FakeFdrSink, clock: _ManualClock +) -> None: + source = _ScriptedSource([_cool_reading(), _hot_reading()]) + publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source) + _prime_without_thread(publisher, source) + + snapshot_after_initial = publisher.read() + + clock.advance_by(1_000_000_000) + publisher._poll_once() + snapshot_after_transition = publisher.read() + + assert snapshot_after_initial.thermal_throttle_active is False + assert snapshot_after_initial.cpu_temp_c == pytest.approx(45.0) + assert snapshot_after_initial.measured_at_ns == 1_000_000_000 + + assert snapshot_after_transition.thermal_throttle_active is True + assert snapshot_after_transition.gpu_temp_c == pytest.approx(92.0) + assert snapshot_after_transition.is_telemetry_available is True + assert snapshot_after_transition.measured_at_ns == 2_000_000_000 + + +def test_ac1_read_is_wait_free_object_identity( + config: Config, fdr: FakeFdrSink, clock: _ManualClock +) -> None: + """Two consecutive reads with no intervening poll return the same object.""" + + source = _ScriptedSource([_cool_reading()]) + publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source) + _prime_without_thread(publisher, source) + + first = publisher.read() + second = publisher.read() + + assert first is second + + +# ---------------------------------------------------------------------- +# AC-2: throttle entry — FDR + WARN log + state. + + +def test_ac2_throttle_entry_emits_fdr_and_warn( + config: Config, + fdr: FakeFdrSink, + clock: _ManualClock, + caplog: pytest.LogCaptureFixture, +) -> None: + source = _ScriptedSource([_cool_reading(), _hot_reading()]) + publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source) + _prime_without_thread(publisher, source) + assert fdr.records == [] + + clock.advance_by(1_000_000_000) + with caplog.at_level("WARNING", logger="c7_inference.thermal"): + publisher._poll_once() + + assert len(fdr.records) == 1 + record = fdr.records[0] + assert record.kind == _TRANSITION_KIND + assert record.producer_id == _PRODUCER_ID + assert record.payload["previous_state"] is False + assert record.payload["new_state"] is True + assert record.payload["gpu_temp_c"] == pytest.approx(92.0) + assert record.payload["cpu_temp_c"] == pytest.approx(85.0) + assert record.payload["measured_clock_mhz"] == 600 + assert record.payload["measured_at_ns"] == 2_000_000_000 + + entry_warnings = [ + r for r in caplog.records + if r.levelname == "WARNING" and r.getMessage() == "thermal throttle ENTRY" + ] + assert len(entry_warnings) == 1 + + +# ---------------------------------------------------------------------- +# AC-3: throttle exit — FDR + INFO log. + + +def test_ac3_throttle_exit_emits_fdr_and_info( + config: Config, + fdr: FakeFdrSink, + clock: _ManualClock, + caplog: pytest.LogCaptureFixture, +) -> None: + source = _ScriptedSource([_hot_reading(), _cool_reading()]) + publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source) + _prime_without_thread(publisher, source) + + clock.advance_by(1_000_000_000) + with caplog.at_level("INFO", logger="c7_inference.thermal"): + publisher._poll_once() + + assert len(fdr.records) == 1 + record = fdr.records[0] + assert record.kind == _TRANSITION_KIND + assert record.payload["previous_state"] is True + assert record.payload["new_state"] is False + + exit_infos = [ + r for r in caplog.records + if r.levelname == "INFO" and r.getMessage() == "thermal throttle exit" + ] + assert len(exit_infos) == 1 + exit_warnings = [ + r for r in caplog.records + if r.levelname == "WARNING" and r.getMessage() == "thermal throttle exit" + ] + assert exit_warnings == [] + + +# ---------------------------------------------------------------------- +# AC-4: telemetry unavailable on every poll — default-safe + rate-limited WARN. + + +def test_ac4_telemetry_unavailable_defaults_safe_and_rate_limits( + config: Config, + fdr: FakeFdrSink, + clock: _ManualClock, + caplog: pytest.LogCaptureFixture, +) -> None: + err = TelemetryUnavailableError("jtop hung") + source = _ScriptedSource([err, err, err, err, err, err]) + publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source) + + with caplog.at_level("WARNING", logger="c7_inference.thermal"): + _prime_without_thread(publisher, source) + # Drive five more polls across a 2-second simulated window. + clock.advance_by(500_000_000) + publisher._poll_once() + clock.advance_by(500_000_000) + publisher._poll_once() + clock.advance_by(500_000_000) + publisher._poll_once() + clock.advance_by(500_000_000) + publisher._poll_once() + clock.advance_by(500_000_000) + publisher._poll_once() + + snapshot = publisher.read() + + assert snapshot.is_telemetry_available is False + assert snapshot.thermal_throttle_active is False + assert snapshot.gpu_temp_c is None + assert snapshot.cpu_temp_c is None + + unavailable_warnings = [ + r for r in caplog.records + if r.levelname == "WARNING" and r.getMessage() == "thermal telemetry unavailable" + ] + # Initial poll (start) + 5 polls x 500 ms each = 2.5 s window; rate-limit + # at 1 / sec ⇒ at most 3 WARNs (one immediately + one each second). + assert 1 <= len(unavailable_warnings) <= 3 + assert fdr.records == [] + + +# ---------------------------------------------------------------------- +# AC-5: cold-start with no source raises. + + +def test_ac5_start_raises_when_no_source_available( + config: Config, + fdr: FakeFdrSink, + clock: _ManualClock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + def _raise(*_args: object, **_kw: object) -> None: + raise TelemetryUnavailableError("backend missing") + + monkeypatch.setattr(tp_mod, "_JtopSource", _raise) + monkeypatch.setattr(tp_mod, "_PynvmlSource", _raise) + + publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=None) + + with pytest.raises(TelemetryUnavailableError): + publisher.start() + + assert publisher.is_running() is False + + +def test_ac5_jtop_falls_back_to_pynvml( + fast_config: Config, + fdr: FakeFdrSink, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When ``jtop`` fails, ``start()`` picks the ``pynvml`` source and runs.""" + + pynvml_source = _ScriptedSource( + [_cool_reading()] + [_cool_reading()] * 16 + ) + + def _raise_jtop(*_args: object, **_kw: object) -> None: + raise TelemetryUnavailableError("no jtop") + + def _make_pynvml(*_args: object, **_kw: object) -> _ScriptedSource: + return pynvml_source + + monkeypatch.setattr(tp_mod, "_JtopSource", _raise_jtop) + monkeypatch.setattr(tp_mod, "_PynvmlSource", _make_pynvml) + + publisher = ThermalStatePublisher(fast_config, fdr) + publisher.start() + assert publisher.is_running() is True + snapshot = publisher.read() + publisher.stop() + + assert snapshot.is_telemetry_available is True + assert pynvml_source.closed is True + + +# ---------------------------------------------------------------------- +# AC-6: start/stop is idempotent. + + +def test_ac6_double_start_is_noop( + fast_config: Config, fdr: FakeFdrSink +) -> None: + source = _ScriptedSource([_cool_reading()]) + publisher = ThermalStatePublisher(fast_config, fdr, source=source) + + publisher.start() + first_thread = publisher._thread + publisher.start() + second_thread = publisher._thread + + assert publisher.is_running() is True + assert first_thread is second_thread + + publisher.stop() + + +def test_ac6_double_stop_is_noop( + fast_config: Config, fdr: FakeFdrSink +) -> None: + source = _ScriptedSource([_cool_reading()]) + publisher = ThermalStatePublisher(fast_config, fdr, source=source) + + publisher.start() + publisher.stop() + publisher.stop() + + assert publisher.is_running() is False + + +def test_background_thread_polls( + fast_config: Config, fdr: FakeFdrSink +) -> None: + """End-to-end: real thread, 200 Hz poll rate, source flips on second read.""" + + readings: deque[ThermalReading] = deque( + [_cool_reading()] + [_hot_reading()] * 64 + ) + + class _Live: + closed = False + + def read(self) -> ThermalReading: + if readings: + return readings.popleft() + return _hot_reading() + + def close(self) -> None: + self._closed_flag = True + + source = _Live() + publisher = ThermalStatePublisher(fast_config, fdr, source=source) + publisher.start() + deadline = time.monotonic() + 1.0 + while publisher.read().thermal_throttle_active is False and time.monotonic() < deadline: + time.sleep(0.005) + publisher.stop() + + assert publisher.read().thermal_throttle_active is True + assert any(r.kind == _TRANSITION_KIND for r in fdr.records) + + +# ---------------------------------------------------------------------- +# AC-8: FDR record envelope matches the schema (round-trip via serialise/parse). + + +def test_ac8_fdr_record_round_trips_through_wire_format( + config: Config, fdr: FakeFdrSink, clock: _ManualClock +) -> None: + from gps_denied_onboard.fdr_client.records import KNOWN_PAYLOAD_KEYS, parse, serialise + + source = _ScriptedSource([_cool_reading(), _hot_reading()]) + publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source) + _prime_without_thread(publisher, source) + clock.advance_by(1_000_000_000) + publisher._poll_once() + + record = fdr.records[0] + encoded = serialise(record) + decoded = parse(encoded) + + assert decoded.kind == _TRANSITION_KIND + assert decoded.producer_id == _PRODUCER_ID + expected_keys = KNOWN_PAYLOAD_KEYS[_TRANSITION_KIND] + assert set(decoded.payload.keys()) - {"extra"} <= expected_keys + assert decoded.payload == record.payload + + +# ---------------------------------------------------------------------- +# NFR-reliability-default-safe: first poll fails → snapshot is default-safe. + + +def test_nfr_default_safe_on_first_poll_failure( + config: Config, fdr: FakeFdrSink, clock: _ManualClock +) -> None: + err = TelemetryUnavailableError("source dead on arrival") + source = _ScriptedSource([err]) + publisher = _make_publisher(config=config, fdr=fdr, clock=clock, source=source) + _prime_without_thread(publisher, source) + + snapshot = publisher.read() + + assert snapshot.is_telemetry_available is False + assert snapshot.thermal_throttle_active is False + assert snapshot.gpu_temp_c is None + assert snapshot.cpu_temp_c is None + + +# ---------------------------------------------------------------------- +# Structural: ThermalSource is a runtime-checkable Protocol; the scripted +# source satisfies it. + + +def test_scripted_source_satisfies_protocol() -> None: + source = _ScriptedSource([_cool_reading()]) + assert isinstance(source, ThermalSource) + + +def test_default_safe_snapshot_invariant_i6() -> None: + """``is_telemetry_available == False`` ⇒ ``thermal_throttle_active == False``.""" + + snap = tp_mod._default_safe_snapshot(measured_at_ns=42) + assert snap.is_telemetry_available is False + assert snap.thermal_throttle_active is False + assert snap.measured_at_ns == 42 diff --git a/tests/unit/test_az272_fdr_record_schema.py b/tests/unit/test_az272_fdr_record_schema.py index 09c6c57..d9bc4a9 100644 --- a/tests/unit/test_az272_fdr_record_schema.py +++ b/tests/unit/test_az272_fdr_record_schema.py @@ -131,6 +131,15 @@ def _kind_payload(kind: str) -> dict[str, object]: "strategy_label": "okvis2", "frame_id": "frame-0001", } + if kind == "c7.thermal_transition": + return { + "previous_state": False, + "new_state": True, + "gpu_temp_c": 92.0, + "cpu_temp_c": 85.0, + "measured_clock_mhz": 600, + "measured_at_ns": 1_700_000_000_000_000_000, + } raise AssertionError(f"unhandled kind in fixture: {kind!r}")