"""Jetson thermal envelope evaluator for NFT-LIM-04 (AZ-443 / AC-NEW-5 PARTIAL). Tier-2 only scenario. Runs a 30 min Derkachi loop at workstation ambient; the runner samples ``tegrastats`` at 1 Hz (cpu_temp, soc_temp) and parses ``dmesg --since ""`` for thermal-throttle entries. AC-2 — zero throttling events in dmesg. AC-3 — ``p99(cpu_temp) ≤ T_throttle_cpu − 5 °C`` AND ``p99(soc_temp) ≤ T_throttle_soc − 5 °C``. The throttle thresholds are read at runtime from a fixture file (``e2e/fixtures/jetson/thermal-thresholds.json``) so future Jetson hardware updates only require a fixture bump. AC-4 — emit a ``traceability-status.json`` entry recording AC-NEW-5 as PARTIAL (chamber portion required for full). Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol — inputs are pre-projected typed records (samples + throttle events + the loaded thresholds). """ from __future__ import annotations import csv import json from dataclasses import dataclass from math import floor from pathlib import Path from typing import Sequence HEADROOM_C = 5.0 # AC-3 — 5 °C headroom below documented T_throttle. @dataclass(frozen=True) class ThermalThresholds: """Hardware-documented T_throttle values, loaded from a fixture file. Defaults match the Jetson Orin Nano Super values quoted in the AZ-443 task spec (CPU = 97 °C, SoC = 95 °C per nVidia documentation). Callers SHOULD ``load_from_fixture`` to keep this aligned with the actual deployed hardware revision. """ cpu_t_throttle_c: float = 97.0 soc_t_throttle_c: float = 95.0 @property def cpu_budget_c(self) -> float: return self.cpu_t_throttle_c - HEADROOM_C @property def soc_budget_c(self) -> float: return self.soc_t_throttle_c - HEADROOM_C @classmethod def load_from_fixture(cls, fixture_path: Path) -> "ThermalThresholds": """Parse a `thermal-thresholds.json` file. Required keys: ``cpu_t_throttle_c`` (float) and ``soc_t_throttle_c`` (float). """ payload = json.loads(Path(fixture_path).read_text()) if not isinstance(payload, dict): raise ValueError( f"thermal threshold fixture {fixture_path} must be a JSON object; " f"got top-level type={type(payload).__name__}" ) try: cpu = float(payload["cpu_t_throttle_c"]) soc = float(payload["soc_t_throttle_c"]) except KeyError as exc: raise ValueError( f"thermal threshold fixture {fixture_path} missing required key {exc}" ) from exc except (TypeError, ValueError) as exc: raise ValueError( f"thermal threshold fixture {fixture_path} has non-numeric value: {exc}" ) from exc return cls(cpu_t_throttle_c=cpu, soc_t_throttle_c=soc) @dataclass(frozen=True) class ThermalSample: """One ``tegrastats`` sample at a monotonic timestamp.""" monotonic_ms: int cpu_temp_c: float soc_temp_c: float @dataclass(frozen=True) class ThrottleEvent: """One throttling line captured from ``dmesg`` since run_start.""" monotonic_ms: int | None snippet: str @dataclass(frozen=True) class TempStreamStats: """p99 + max for one temperature stream.""" sample_count: int p99_c: float | None max_c: float | None def passes_budget(self, budget_c: float) -> bool: return self.p99_c is not None and self.p99_c <= budget_c @dataclass(frozen=True) class ThermalEnvelopeReport: """Aggregate AC-2 + AC-3 verdict for one NFT-LIM-04 run.""" thresholds: ThermalThresholds cpu: TempStreamStats soc: TempStreamStats throttle_events: Sequence[ThrottleEvent] @property def passes_no_throttle(self) -> bool: return len(self.throttle_events) == 0 @property def passes_headroom(self) -> bool: return self.cpu.passes_budget(self.thresholds.cpu_budget_c) and ( self.soc.passes_budget(self.thresholds.soc_budget_c) ) @property def passes(self) -> bool: return self.passes_no_throttle and self.passes_headroom def _percentile_float(values: Sequence[float], q: float) -> float | None: if not 0.0 <= q <= 100.0: raise ValueError(f"percentile q must be in [0, 100], got {q!r}") if not values: return None ordered = sorted(values) if len(ordered) == 1: return float(ordered[0]) rank = (q / 100.0) * (len(ordered) - 1) lo = floor(rank) hi = min(lo + 1, len(ordered) - 1) frac = rank - lo return float(ordered[lo] + (ordered[hi] - ordered[lo]) * frac) def _temp_stream_stats(values: Sequence[float]) -> TempStreamStats: return TempStreamStats( sample_count=len(values), p99_c=_percentile_float(values, 99.0), max_c=max(values) if values else None, ) def evaluate( samples: Sequence[ThermalSample], throttle_events: Sequence[ThrottleEvent], thresholds: ThermalThresholds, ) -> ThermalEnvelopeReport: """Compute AC-2 + AC-3 verdict from sampled thermal data + dmesg events.""" cpu_vals = [s.cpu_temp_c for s in samples] soc_vals = [s.soc_temp_c for s in samples] return ThermalEnvelopeReport( thresholds=thresholds, cpu=_temp_stream_stats(cpu_vals), soc=_temp_stream_stats(soc_vals), throttle_events=tuple(throttle_events), ) def write_traceability_partial_annotation(out_path: Path) -> Path: """AC-4 — emit the AC-NEW-5 PARTIAL entry. Writes (or merges into) a ``traceability-status.json`` file in the evidence bundle. If the file exists, the AC-NEW-5 entry is added / overwritten without touching other entries. """ out_path.parent.mkdir(parents=True, exist_ok=True) payload: dict[str, str] if out_path.is_file(): existing = json.loads(out_path.read_text()) if not isinstance(existing, dict): raise ValueError( f"existing traceability-status.json at {out_path} is not a JSON " f"object; cannot merge" ) payload = {str(k): str(v) for k, v in existing.items()} else: payload = {} payload["AC-NEW-5"] = "PARTIAL — chamber required for full" out_path.write_text(json.dumps(payload, indent=2, sort_keys=True)) return out_path def write_csv_evidence(out_path: Path, report: ThermalEnvelopeReport) -> Path: """One-row evidence file naming AC-2/AC-3 verdict + percentiles.""" out_path.parent.mkdir(parents=True, exist_ok=True) r = report with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "cpu_t_throttle_c", "soc_t_throttle_c", "cpu_budget_c", "soc_budget_c", "cpu_sample_count", "cpu_p99_c", "cpu_max_c", "soc_sample_count", "soc_p99_c", "soc_max_c", "throttle_event_count", "ac2_no_throttle_passes", "ac3_headroom_passes", "passes", ] ) writer.writerow( [ r.thresholds.cpu_t_throttle_c, r.thresholds.soc_t_throttle_c, r.thresholds.cpu_budget_c, r.thresholds.soc_budget_c, r.cpu.sample_count, "" if r.cpu.p99_c is None else f"{r.cpu.p99_c:.3f}", "" if r.cpu.max_c is None else f"{r.cpu.max_c:.3f}", r.soc.sample_count, "" if r.soc.p99_c is None else f"{r.soc.p99_c:.3f}", "" if r.soc.max_c is None else f"{r.soc.max_c:.3f}", len(r.throttle_events), "true" if r.passes_no_throttle else "false", "true" if r.passes_headroom else "false", "true" if r.passes else "false", ] ) return out_path def write_throttle_events_csv( out_path: Path, events: Sequence[ThrottleEvent] ) -> Path: """Per-event CSV for evidence triage.""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow(["index", "monotonic_ms", "snippet"]) for i, ev in enumerate(events): snippet = ev.snippet if len(ev.snippet) <= 200 else ev.snippet[:200] writer.writerow( [ i, "" if ev.monotonic_ms is None else ev.monotonic_ms, snippet, ] ) return out_path