[AZ-440] [AZ-441] [AZ-442] [AZ-443] NFT-LIM-01/02/03+05/04 blackbox scenarios

Batch 88 — adds four resource-limit blackbox scenarios + pure-logic
helpers + unit tests:

- NFT-LIM-01 Jetson memory (AC-NEW-13): tier2_only; Plan A/B budgets;
  AC-4 OOM-event scan; 30 s warm-up window; VmRSS + tegrastats streams.
- NFT-LIM-02 FDR size (AC-7.3): 30 min → 8 h linear extrapolation
  against 50 GiB; ±60 s replay-window slack for AC-1.
- NFT-LIM-03+05 storage (AC-7.4 + AC-NEW-12 + RESTRICT-STORAGE):
  aggregate ≤ 100 GiB across tile-cache + tile-cache-write +
  fdr-output; thumbnail-log < 1 GiB strict 8 h-extrapolated.
- NFT-LIM-04 thermal (AC-NEW-5 PARTIAL): tier2_only; CPU/SoC p99
  ≤ T_throttle − 5 °C; throttle-event scan; PARTIAL annotation written
  to traceability-status.json. Thresholds fixture lives at
  e2e/fixtures/jetson/thermal-thresholds.json (moved from the
  task spec's suggested tests/fixtures/ path so the file stays
  inside the blackbox_tests Owns: e2e/** envelope).

All four helpers are public-boundary-only (no src/gps_denied_onboard
imports). Scenarios skip cleanly in the Tier-1 docker harness pending
AZ-595 (SITL replay builder) for the four shared fixture inputs and
AZ-444 (Tier-2 Jetson runner) for the tier2_only scenarios.

Code review: PASS_WITH_WARNINGS (0/0/2/1). Both Mediums are
carried-over write_csv_evidence + _resolve_fixture_path duplication,
deferred to AZ-446 (batch 89). Low is the self-resolved AZ-443 fixture
ownership drift documented in the review.

Tests: 1223 e2e/_unit_tests passing (+1 vs. batch 87 from the new
directory-layout entry); 24 resource_limit scenarios collect and skip
cleanly under runner/pytest.ini.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-17 18:01:55 +03:00
parent d1e30f818f
commit 6e4a575221
22 changed files with 2785 additions and 4 deletions
+162
View File
@@ -0,0 +1,162 @@
"""FDR size budget evaluator for NFT-LIM-02 (AZ-441 / AC-7.3).
A 30 min Derkachi replay (4× the 8 min flight) is sampled per-minute
via ``du -sh fdr-output``. The per-minute samples are projected into a
typed ``(monotonic_ms, size_bytes)`` stream by the scenario; this
module extrapolates the 30 min size linearly to 8 h:
extrapolated_bytes = size_at_30min_bytes / 30 × 480
and asserts ``extrapolated_bytes ≤ 50 GiB`` (AC-2).
AC-1 (the runner actually looped Derkachi for 30 min wall-clock) is
verdict-checked here from the sample timestamps; the scenario test
provides the canonical replay duration as input.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol — inputs are pre-projected typed
samples.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence
GIB_BYTES = 1024**3
REPLAY_WINDOW_MINUTES = 30
EXTRAPOLATION_WINDOW_MINUTES = 8 * 60 # AC-2 — 8 hours
DEFAULT_BUDGET_BYTES = 50 * GIB_BYTES # AC-2 — ≤ 50 GiB
# AC-1 tolerance: the scenario claims a 30 min replay; in practice the
# wall-clock window may drift by a few seconds due to loop overhead.
# Accept ±60 s slack — anything beyond that is a real replay deviation.
REPLAY_WINDOW_SLACK_MS = 60_000
@dataclass(frozen=True)
class FdrSizeSample:
"""One ``du -sh fdr-output`` sample at a monotonic timestamp."""
monotonic_ms: int
size_bytes: int
@dataclass(frozen=True)
class FdrSizeReport:
"""Aggregate NFT-LIM-02 verdict for one run."""
sample_count: int
replay_window_ms: int
size_at_30min_bytes: int | None
extrapolated_8h_bytes: int | None
budget_bytes: int
replay_window_slack_ms: int
@property
def passes_replay_window(self) -> bool:
# AC-1 — actual sampled window is within ±slack of 30 min.
target_ms = REPLAY_WINDOW_MINUTES * 60_000
return abs(self.replay_window_ms - target_ms) <= self.replay_window_slack_ms
@property
def passes_extrapolation(self) -> bool:
# AC-2 — extrapolated 8 h size ≤ budget.
return (
self.extrapolated_8h_bytes is not None
and self.extrapolated_8h_bytes <= self.budget_bytes
)
@property
def passes(self) -> bool:
return self.passes_replay_window and self.passes_extrapolation
def evaluate(
samples: Sequence[FdrSizeSample],
*,
budget_bytes: int = DEFAULT_BUDGET_BYTES,
replay_window_slack_ms: int = REPLAY_WINDOW_SLACK_MS,
) -> FdrSizeReport:
"""Compute AC-1 + AC-2 verdict from a sorted-or-unsorted sample list."""
if budget_bytes <= 0:
raise ValueError(f"budget_bytes must be > 0 (was {budget_bytes!r})")
if replay_window_slack_ms < 0:
raise ValueError(
f"replay_window_slack_ms must be >= 0 (was {replay_window_slack_ms!r})"
)
if not samples:
return FdrSizeReport(
sample_count=0,
replay_window_ms=0,
size_at_30min_bytes=None,
extrapolated_8h_bytes=None,
budget_bytes=budget_bytes,
replay_window_slack_ms=replay_window_slack_ms,
)
ordered = sorted(samples, key=lambda s: s.monotonic_ms)
window_ms = ordered[-1].monotonic_ms - ordered[0].monotonic_ms
size_at_end = ordered[-1].size_bytes
extrapolated = int(
round((size_at_end / REPLAY_WINDOW_MINUTES) * EXTRAPOLATION_WINDOW_MINUTES)
)
return FdrSizeReport(
sample_count=len(ordered),
replay_window_ms=window_ms,
size_at_30min_bytes=size_at_end,
extrapolated_8h_bytes=extrapolated,
budget_bytes=budget_bytes,
replay_window_slack_ms=replay_window_slack_ms,
)
def write_csv_evidence(out_path: Path, report: FdrSizeReport) -> Path:
"""One-row evidence file naming AC-1/AC-2 verdict + sizes."""
out_path.parent.mkdir(parents=True, exist_ok=True)
r = report
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"sample_count",
"replay_window_ms",
"size_at_30min_bytes",
"extrapolated_8h_bytes",
"budget_bytes",
"replay_window_slack_ms",
"ac1_replay_window_passes",
"ac2_extrapolation_passes",
"passes",
]
)
writer.writerow(
[
r.sample_count,
r.replay_window_ms,
"" if r.size_at_30min_bytes is None else r.size_at_30min_bytes,
"" if r.extrapolated_8h_bytes is None else r.extrapolated_8h_bytes,
r.budget_bytes,
r.replay_window_slack_ms,
"true" if r.passes_replay_window else "false",
"true" if r.passes_extrapolation else "false",
"true" if r.passes else "false",
]
)
return out_path
def write_per_minute_csv(
out_path: Path, samples: Sequence[FdrSizeSample]
) -> Path:
"""Per-sample CSV (one row per minute) for evidence trend lines."""
out_path.parent.mkdir(parents=True, exist_ok=True)
ordered = sorted(samples, key=lambda s: s.monotonic_ms)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(["index", "monotonic_ms", "size_bytes"])
for i, s in enumerate(ordered):
writer.writerow([i, s.monotonic_ms, s.size_bytes])
return out_path
@@ -0,0 +1,278 @@
"""Jetson memory budget evaluator for NFT-LIM-01 (AZ-440 / AC-NEW-13).
Tier-2 only scenario. Runs a 30 s warm-up + 5 min Derkachi replay; the
runner samples memory at 1 Hz from two boundary observers:
* ``/proc/<sut_pid>/status`` ``VmRSS`` (the SUT process resident set);
* ``tegrastats`` (system-level memory used).
Both streams are evaluated against the Plan-A budgets by default
(steady ``p50 ≤ 4.5 GiB``, peak ``max ≤ 5.0 GiB``). Plan B
(``steady ≤ 6.0 GiB``, ``peak ≤ 6.5 GiB``) is gated behind the
``MEMORY_PLAN=B`` env flag — the scenario test passes the active plan
into ``evaluate(...)``; this module exposes both as named ``Plan``
constants and never reads the environment itself.
AC-4 (no OOM kills) is evaluated from a ``Sequence[OomEvent]`` projected
out of ``dmesg --since "<run_start>"`` by the scenario.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol. All inputs are pre-projected typed
records (samples / OOM events).
"""
from __future__ import annotations
import csv
from dataclasses import dataclass, field
from enum import Enum
from math import floor
from pathlib import Path
from typing import Sequence
GIB_BYTES = 1024**3
class Plan(str, Enum):
"""Active memory budget plan per AC-5 (Plan A default, Plan B gated)."""
PLAN_A = "plan-a"
PLAN_B = "plan-b"
@dataclass(frozen=True)
class PlanBudgets:
"""A pair (steady, peak) budget in bytes for one Plan."""
steady_bytes: int
peak_bytes: int
@classmethod
def for_plan(cls, plan: Plan) -> "PlanBudgets":
if plan is Plan.PLAN_A:
return cls(
steady_bytes=int(4.5 * GIB_BYTES),
peak_bytes=int(5.0 * GIB_BYTES),
)
if plan is Plan.PLAN_B:
return cls(
steady_bytes=int(6.0 * GIB_BYTES),
peak_bytes=int(6.5 * GIB_BYTES),
)
raise ValueError(f"unknown memory plan: {plan!r}")
@dataclass(frozen=True)
class MemorySample:
"""One memory sample at a monotonic timestamp.
``vmrss_bytes`` is the ``/proc/<pid>/status`` ``VmRSS`` value
converted to bytes; ``tegrastats_used_bytes`` is the system-level
used-RAM figure parsed from one ``tegrastats`` line. Both are
captured at the same nominal sample tick — they MAY diverge
slightly because the two sources poll at different cadences, which
is why the AC budgets apply to each stream independently.
"""
monotonic_ms: int
vmrss_bytes: int
tegrastats_used_bytes: int
@dataclass(frozen=True)
class OomEvent:
"""One OOM-killer line captured from ``dmesg``.
``snippet`` is the matched dmesg line (truncated to ≤200 chars in
CSV evidence). ``monotonic_ms`` is the runner's projection of the
kernel timestamp onto the monotonic clock — may be ``None`` if the
runner could not align it (the verdict still fails AC-4).
"""
monotonic_ms: int | None
snippet: str
@dataclass(frozen=True)
class StreamStats:
"""p50 + max for one memory stream over the post-warm-up window."""
sample_count: int
p50_bytes: int | None
max_bytes: int | None
def passes_steady(self, budget_bytes: int) -> bool:
return self.p50_bytes is not None and self.p50_bytes <= budget_bytes
def passes_peak(self, budget_bytes: int) -> bool:
return self.max_bytes is not None and self.max_bytes <= budget_bytes
@dataclass(frozen=True)
class MemoryBudgetReport:
"""Aggregate NFT-LIM-01 verdict for one Tier-2 run."""
plan: Plan
budgets: PlanBudgets
warm_up_ms: int
window_end_ms: int
vmrss: StreamStats
tegrastats: StreamStats
oom_events: Sequence[OomEvent] = field(default_factory=tuple)
@property
def passes_steady_state(self) -> bool:
# AC-2 — BOTH streams must satisfy steady budget.
return self.vmrss.passes_steady(self.budgets.steady_bytes) and (
self.tegrastats.passes_steady(self.budgets.steady_bytes)
)
@property
def passes_peak(self) -> bool:
# AC-3 — VmRSS peak ≤ peak budget. tegrastats system-level peak is
# informational only; AC-3 specifies VmRSS as the gating stream.
return self.vmrss.passes_peak(self.budgets.peak_bytes)
@property
def passes_no_oom(self) -> bool:
# AC-4 — zero OOM-killer entries since run_start.
return len(self.oom_events) == 0
@property
def passes(self) -> bool:
return self.passes_steady_state and self.passes_peak and self.passes_no_oom
def _percentile_int(values: Sequence[int], q: float) -> int | None:
"""Linear-interpolation percentile rounded to int bytes.
Returns ``None`` for empty input so the caller distinguishes the
no-data case. Accepts any real ``q`` in [0, 100]; outside that range
is a programmer error.
"""
if not 0.0 <= q <= 100.0:
raise ValueError(f"percentile q must be in [0, 100], got {q!r}")
if not values:
return None
ordered = sorted(values)
if len(ordered) == 1:
return int(ordered[0])
rank = (q / 100.0) * (len(ordered) - 1)
lo = floor(rank)
hi = min(lo + 1, len(ordered) - 1)
frac = rank - lo
return int(round(ordered[lo] + (ordered[hi] - ordered[lo]) * frac))
def _post_warmup_window(
samples: Sequence[MemorySample], warm_up_ms: int
) -> list[MemorySample]:
"""Drop samples whose timestamp is inside the warm-up window."""
if warm_up_ms < 0:
raise ValueError(f"warm_up_ms must be >= 0 (was {warm_up_ms!r})")
if not samples:
return []
first = min(s.monotonic_ms for s in samples)
cutoff = first + warm_up_ms
return [s for s in samples if s.monotonic_ms >= cutoff]
def _stream_stats(values: Sequence[int]) -> StreamStats:
return StreamStats(
sample_count=len(values),
p50_bytes=_percentile_int(values, 50.0),
max_bytes=max(values) if values else None,
)
def evaluate(
samples: Sequence[MemorySample],
oom_events: Sequence[OomEvent],
*,
plan: Plan = Plan.PLAN_A,
warm_up_ms: int = 30_000,
) -> MemoryBudgetReport:
"""Compute NFT-LIM-01 AC-2 + AC-3 + AC-4 verdict for one Tier-2 run."""
budgets = PlanBudgets.for_plan(plan)
post_warmup = _post_warmup_window(samples, warm_up_ms)
vmrss_values = [s.vmrss_bytes for s in post_warmup]
tegrastats_values = [s.tegrastats_used_bytes for s in post_warmup]
window_end_ms = max((s.monotonic_ms for s in post_warmup), default=warm_up_ms)
return MemoryBudgetReport(
plan=plan,
budgets=budgets,
warm_up_ms=warm_up_ms,
window_end_ms=window_end_ms,
vmrss=_stream_stats(vmrss_values),
tegrastats=_stream_stats(tegrastats_values),
oom_events=tuple(oom_events),
)
def write_csv_evidence(out_path: Path, report: MemoryBudgetReport) -> Path:
"""One-row evidence file naming the AC-2/3/4 verdict + percentiles."""
out_path.parent.mkdir(parents=True, exist_ok=True)
r = report
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"plan",
"warm_up_ms",
"window_end_ms",
"vmrss_sample_count",
"vmrss_p50_bytes",
"vmrss_max_bytes",
"tegrastats_sample_count",
"tegrastats_p50_bytes",
"tegrastats_max_bytes",
"steady_budget_bytes",
"peak_budget_bytes",
"ac2_steady_passes",
"ac3_peak_passes",
"ac4_no_oom_passes",
"oom_event_count",
"passes",
]
)
writer.writerow(
[
r.plan.value,
r.warm_up_ms,
r.window_end_ms,
r.vmrss.sample_count,
"" if r.vmrss.p50_bytes is None else r.vmrss.p50_bytes,
"" if r.vmrss.max_bytes is None else r.vmrss.max_bytes,
r.tegrastats.sample_count,
"" if r.tegrastats.p50_bytes is None else r.tegrastats.p50_bytes,
"" if r.tegrastats.max_bytes is None else r.tegrastats.max_bytes,
r.budgets.steady_bytes,
r.budgets.peak_bytes,
"true" if r.passes_steady_state else "false",
"true" if r.passes_peak else "false",
"true" if r.passes_no_oom else "false",
len(r.oom_events),
"true" if r.passes else "false",
]
)
return out_path
def write_oom_events_csv(
out_path: Path, oom_events: Sequence[OomEvent]
) -> Path:
"""Per-OOM-event CSV (one row per event) for evidence."""
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(["index", "monotonic_ms", "snippet"])
for i, ev in enumerate(oom_events):
snippet = ev.snippet if len(ev.snippet) <= 200 else ev.snippet[:200]
writer.writerow(
[
i,
"" if ev.monotonic_ms is None else ev.monotonic_ms,
snippet,
]
)
return out_path
@@ -0,0 +1,202 @@
"""Aggregate storage + thumbnail-log budget evaluator for NFT-LIM-03/05
(AZ-442 / AC-7.4 + AC-NEW-12 + RESTRICT-STORAGE).
The two scenarios share the same 30 min Derkachi replay and the same
per-minute ``du -sh`` sampling. NFT-LIM-03 caps the *aggregate* of
three volumes at ``100 GiB`` (end-of-run snapshot); NFT-LIM-05
extrapolates the thumbnail-log subdirectory linearly to 8 h and caps
it at ``1 GiB``.
The runner projects each per-minute sample into a
``VolumeSnapshot`` carrying the four monitored sizes at one timestamp.
This module evaluates the AC-1 (aggregate) + AC-2 (8 h thumbnail-log
extrapolation) verdicts from a ``Sequence[VolumeSnapshot]``.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol — inputs are pre-projected typed
samples.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence
GIB_BYTES = 1024**3
REPLAY_WINDOW_MINUTES = 30
EXTRAPOLATION_WINDOW_MINUTES = 8 * 60
AGGREGATE_BUDGET_BYTES = 100 * GIB_BYTES # AC-1 — ≤ 100 GiB
THUMBNAIL_LOG_BUDGET_BYTES = 1 * GIB_BYTES # AC-2 — < 1 GiB 8 h-extrapolated
@dataclass(frozen=True)
class VolumeSnapshot:
"""One per-minute ``du -sh`` snapshot for the four monitored volumes."""
monotonic_ms: int
tile_cache_bytes: int
tile_cache_write_bytes: int
fdr_output_bytes: int
thumbnail_log_bytes: int
@property
def aggregate_bytes(self) -> int:
return (
self.tile_cache_bytes
+ self.tile_cache_write_bytes
+ self.fdr_output_bytes
)
@dataclass(frozen=True)
class StorageBudgetReport:
"""Aggregate AC-1 + AC-2 verdict for one NFT-LIM-03+05 run."""
sample_count: int
aggregate_at_end_bytes: int | None
thumbnail_log_at_end_bytes: int | None
thumbnail_log_extrapolated_8h_bytes: int | None
aggregate_budget_bytes: int
thumbnail_log_budget_bytes: int
@property
def passes_aggregate(self) -> bool:
# AC-1 — end-of-run aggregate snapshot ≤ budget.
return (
self.aggregate_at_end_bytes is not None
and self.aggregate_at_end_bytes <= self.aggregate_budget_bytes
)
@property
def passes_thumbnail_log(self) -> bool:
# AC-2 — extrapolated 8 h thumbnail-log < budget. Strict ``<``
# because AC-2 says ``< 1 GB`` (not ``≤``).
return (
self.thumbnail_log_extrapolated_8h_bytes is not None
and self.thumbnail_log_extrapolated_8h_bytes
< self.thumbnail_log_budget_bytes
)
@property
def passes(self) -> bool:
return self.passes_aggregate and self.passes_thumbnail_log
def evaluate(
samples: Sequence[VolumeSnapshot],
*,
aggregate_budget_bytes: int = AGGREGATE_BUDGET_BYTES,
thumbnail_log_budget_bytes: int = THUMBNAIL_LOG_BUDGET_BYTES,
) -> StorageBudgetReport:
"""Compute AC-1 + AC-2 verdict from a snapshot stream."""
if aggregate_budget_bytes <= 0:
raise ValueError(
f"aggregate_budget_bytes must be > 0 (was {aggregate_budget_bytes!r})"
)
if thumbnail_log_budget_bytes <= 0:
raise ValueError(
f"thumbnail_log_budget_bytes must be > 0 "
f"(was {thumbnail_log_budget_bytes!r})"
)
if not samples:
return StorageBudgetReport(
sample_count=0,
aggregate_at_end_bytes=None,
thumbnail_log_at_end_bytes=None,
thumbnail_log_extrapolated_8h_bytes=None,
aggregate_budget_bytes=aggregate_budget_bytes,
thumbnail_log_budget_bytes=thumbnail_log_budget_bytes,
)
ordered = sorted(samples, key=lambda s: s.monotonic_ms)
last = ordered[-1]
extrapolated_thumb = int(
round(
(last.thumbnail_log_bytes / REPLAY_WINDOW_MINUTES)
* EXTRAPOLATION_WINDOW_MINUTES
)
)
return StorageBudgetReport(
sample_count=len(ordered),
aggregate_at_end_bytes=last.aggregate_bytes,
thumbnail_log_at_end_bytes=last.thumbnail_log_bytes,
thumbnail_log_extrapolated_8h_bytes=extrapolated_thumb,
aggregate_budget_bytes=aggregate_budget_bytes,
thumbnail_log_budget_bytes=thumbnail_log_budget_bytes,
)
def write_csv_evidence(out_path: Path, report: StorageBudgetReport) -> Path:
"""One-row evidence file naming the AC-1/AC-2 verdict + sizes."""
out_path.parent.mkdir(parents=True, exist_ok=True)
r = report
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"sample_count",
"aggregate_at_end_bytes",
"thumbnail_log_at_end_bytes",
"thumbnail_log_extrapolated_8h_bytes",
"aggregate_budget_bytes",
"thumbnail_log_budget_bytes",
"ac1_aggregate_passes",
"ac2_thumbnail_log_passes",
"passes",
]
)
writer.writerow(
[
r.sample_count,
"" if r.aggregate_at_end_bytes is None else r.aggregate_at_end_bytes,
""
if r.thumbnail_log_at_end_bytes is None
else r.thumbnail_log_at_end_bytes,
""
if r.thumbnail_log_extrapolated_8h_bytes is None
else r.thumbnail_log_extrapolated_8h_bytes,
r.aggregate_budget_bytes,
r.thumbnail_log_budget_bytes,
"true" if r.passes_aggregate else "false",
"true" if r.passes_thumbnail_log else "false",
"true" if r.passes else "false",
]
)
return out_path
def write_per_minute_csv(
out_path: Path, samples: Sequence[VolumeSnapshot]
) -> Path:
"""Per-sample CSV (one row per minute) for evidence trend lines."""
out_path.parent.mkdir(parents=True, exist_ok=True)
ordered = sorted(samples, key=lambda s: s.monotonic_ms)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"index",
"monotonic_ms",
"tile_cache_bytes",
"tile_cache_write_bytes",
"fdr_output_bytes",
"thumbnail_log_bytes",
"aggregate_bytes",
]
)
for i, s in enumerate(ordered):
writer.writerow(
[
i,
s.monotonic_ms,
s.tile_cache_bytes,
s.tile_cache_write_bytes,
s.fdr_output_bytes,
s.thumbnail_log_bytes,
s.aggregate_bytes,
]
)
return out_path
@@ -0,0 +1,256 @@
"""Jetson thermal envelope evaluator for NFT-LIM-04 (AZ-443 / AC-NEW-5 PARTIAL).
Tier-2 only scenario. Runs a 30 min Derkachi loop at workstation
ambient; the runner samples ``tegrastats`` at 1 Hz (cpu_temp, soc_temp)
and parses ``dmesg --since "<run_start>"`` for thermal-throttle entries.
AC-2 — zero throttling events in dmesg.
AC-3 — ``p99(cpu_temp) ≤ T_throttle_cpu 5 °C`` AND ``p99(soc_temp)
≤ T_throttle_soc 5 °C``. The throttle thresholds are read at
runtime from a fixture file (``e2e/fixtures/jetson/thermal-thresholds.json``)
so future Jetson hardware updates only require a fixture bump.
AC-4 — emit a ``traceability-status.json`` entry recording AC-NEW-5 as
PARTIAL (chamber portion required for full).
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol — inputs are pre-projected typed
records (samples + throttle events + the loaded thresholds).
"""
from __future__ import annotations
import csv
import json
from dataclasses import dataclass
from math import floor
from pathlib import Path
from typing import Sequence
HEADROOM_C = 5.0 # AC-3 — 5 °C headroom below documented T_throttle.
@dataclass(frozen=True)
class ThermalThresholds:
"""Hardware-documented T_throttle values, loaded from a fixture file.
Defaults match the Jetson Orin Nano Super values quoted in the
AZ-443 task spec (CPU = 97 °C, SoC = 95 °C per nVidia documentation).
Callers SHOULD ``load_from_fixture`` to keep this aligned with the
actual deployed hardware revision.
"""
cpu_t_throttle_c: float = 97.0
soc_t_throttle_c: float = 95.0
@property
def cpu_budget_c(self) -> float:
return self.cpu_t_throttle_c - HEADROOM_C
@property
def soc_budget_c(self) -> float:
return self.soc_t_throttle_c - HEADROOM_C
@classmethod
def load_from_fixture(cls, fixture_path: Path) -> "ThermalThresholds":
"""Parse a `thermal-thresholds.json` file. Required keys:
``cpu_t_throttle_c`` (float) and ``soc_t_throttle_c`` (float).
"""
payload = json.loads(Path(fixture_path).read_text())
if not isinstance(payload, dict):
raise ValueError(
f"thermal threshold fixture {fixture_path} must be a JSON object; "
f"got top-level type={type(payload).__name__}"
)
try:
cpu = float(payload["cpu_t_throttle_c"])
soc = float(payload["soc_t_throttle_c"])
except KeyError as exc:
raise ValueError(
f"thermal threshold fixture {fixture_path} missing required key {exc}"
) from exc
except (TypeError, ValueError) as exc:
raise ValueError(
f"thermal threshold fixture {fixture_path} has non-numeric value: {exc}"
) from exc
return cls(cpu_t_throttle_c=cpu, soc_t_throttle_c=soc)
@dataclass(frozen=True)
class ThermalSample:
"""One ``tegrastats`` sample at a monotonic timestamp."""
monotonic_ms: int
cpu_temp_c: float
soc_temp_c: float
@dataclass(frozen=True)
class ThrottleEvent:
"""One throttling line captured from ``dmesg`` since run_start."""
monotonic_ms: int | None
snippet: str
@dataclass(frozen=True)
class TempStreamStats:
"""p99 + max for one temperature stream."""
sample_count: int
p99_c: float | None
max_c: float | None
def passes_budget(self, budget_c: float) -> bool:
return self.p99_c is not None and self.p99_c <= budget_c
@dataclass(frozen=True)
class ThermalEnvelopeReport:
"""Aggregate AC-2 + AC-3 verdict for one NFT-LIM-04 run."""
thresholds: ThermalThresholds
cpu: TempStreamStats
soc: TempStreamStats
throttle_events: Sequence[ThrottleEvent]
@property
def passes_no_throttle(self) -> bool:
return len(self.throttle_events) == 0
@property
def passes_headroom(self) -> bool:
return self.cpu.passes_budget(self.thresholds.cpu_budget_c) and (
self.soc.passes_budget(self.thresholds.soc_budget_c)
)
@property
def passes(self) -> bool:
return self.passes_no_throttle and self.passes_headroom
def _percentile_float(values: Sequence[float], q: float) -> float | None:
if not 0.0 <= q <= 100.0:
raise ValueError(f"percentile q must be in [0, 100], got {q!r}")
if not values:
return None
ordered = sorted(values)
if len(ordered) == 1:
return float(ordered[0])
rank = (q / 100.0) * (len(ordered) - 1)
lo = floor(rank)
hi = min(lo + 1, len(ordered) - 1)
frac = rank - lo
return float(ordered[lo] + (ordered[hi] - ordered[lo]) * frac)
def _temp_stream_stats(values: Sequence[float]) -> TempStreamStats:
return TempStreamStats(
sample_count=len(values),
p99_c=_percentile_float(values, 99.0),
max_c=max(values) if values else None,
)
def evaluate(
samples: Sequence[ThermalSample],
throttle_events: Sequence[ThrottleEvent],
thresholds: ThermalThresholds,
) -> ThermalEnvelopeReport:
"""Compute AC-2 + AC-3 verdict from sampled thermal data + dmesg events."""
cpu_vals = [s.cpu_temp_c for s in samples]
soc_vals = [s.soc_temp_c for s in samples]
return ThermalEnvelopeReport(
thresholds=thresholds,
cpu=_temp_stream_stats(cpu_vals),
soc=_temp_stream_stats(soc_vals),
throttle_events=tuple(throttle_events),
)
def write_traceability_partial_annotation(out_path: Path) -> Path:
"""AC-4 — emit the AC-NEW-5 PARTIAL entry.
Writes (or merges into) a ``traceability-status.json`` file in the
evidence bundle. If the file exists, the AC-NEW-5 entry is added /
overwritten without touching other entries.
"""
out_path.parent.mkdir(parents=True, exist_ok=True)
payload: dict[str, str]
if out_path.is_file():
existing = json.loads(out_path.read_text())
if not isinstance(existing, dict):
raise ValueError(
f"existing traceability-status.json at {out_path} is not a JSON "
f"object; cannot merge"
)
payload = {str(k): str(v) for k, v in existing.items()}
else:
payload = {}
payload["AC-NEW-5"] = "PARTIAL — chamber required for full"
out_path.write_text(json.dumps(payload, indent=2, sort_keys=True))
return out_path
def write_csv_evidence(out_path: Path, report: ThermalEnvelopeReport) -> Path:
"""One-row evidence file naming AC-2/AC-3 verdict + percentiles."""
out_path.parent.mkdir(parents=True, exist_ok=True)
r = report
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"cpu_t_throttle_c",
"soc_t_throttle_c",
"cpu_budget_c",
"soc_budget_c",
"cpu_sample_count",
"cpu_p99_c",
"cpu_max_c",
"soc_sample_count",
"soc_p99_c",
"soc_max_c",
"throttle_event_count",
"ac2_no_throttle_passes",
"ac3_headroom_passes",
"passes",
]
)
writer.writerow(
[
r.thresholds.cpu_t_throttle_c,
r.thresholds.soc_t_throttle_c,
r.thresholds.cpu_budget_c,
r.thresholds.soc_budget_c,
r.cpu.sample_count,
"" if r.cpu.p99_c is None else f"{r.cpu.p99_c:.3f}",
"" if r.cpu.max_c is None else f"{r.cpu.max_c:.3f}",
r.soc.sample_count,
"" if r.soc.p99_c is None else f"{r.soc.p99_c:.3f}",
"" if r.soc.max_c is None else f"{r.soc.max_c:.3f}",
len(r.throttle_events),
"true" if r.passes_no_throttle else "false",
"true" if r.passes_headroom else "false",
"true" if r.passes else "false",
]
)
return out_path
def write_throttle_events_csv(
out_path: Path, events: Sequence[ThrottleEvent]
) -> Path:
"""Per-event CSV for evidence triage."""
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(["index", "monotonic_ms", "snippet"])
for i, ev in enumerate(events):
snippet = ev.snippet if len(ev.snippet) <= 200 else ev.snippet[:200]
writer.writerow(
[
i,
"" if ev.monotonic_ms is None else ev.monotonic_ms,
snippet,
]
)
return out_path