"""Jetson memory budget evaluator for NFT-LIM-01 (AZ-440 / AC-NEW-13).

Tier-2 only scenario. Runs a 30 s warm-up + 5 min Derkachi replay; the
runner samples memory at 1 Hz from two boundary observers:

* ``/proc/<pid>/status`` ``VmRSS`` (the SUT process resident set);
* ``tegrastats`` (system-level memory used).

Both streams are evaluated against the Plan-A budgets by default
(steady ``p50 ≤ 4.5 GiB``, peak ``max ≤ 5.0 GiB``). The steady budget
gates BOTH streams; the peak budget gates ``VmRSS`` only (the
``tegrastats`` peak is still computed and reported as informational).
Plan B (``steady ≤ 6.0 GiB``, ``peak ≤ 6.5 GiB``) is gated behind the
``MEMORY_PLAN=B`` env flag — the scenario test passes the active plan
into ``evaluate(...)``; this module exposes both as named ``Plan``
constants and never reads the environment itself.

AC-4 (no OOM kills) is evaluated from a ``Sequence[OomEvent]`` projected
out of ``dmesg --since "<run_start>"`` by the scenario.

Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol. All inputs are pre-projected typed
records (samples / OOM events).
"""

from __future__ import annotations

import csv
from dataclasses import dataclass, field
from enum import Enum
from math import floor
from pathlib import Path
from typing import Sequence

GIB_BYTES = 1024**3

# Maximum number of characters of a dmesg line kept in the OOM evidence CSV.
_SNIPPET_MAX_CHARS = 200

# Evidence files are written with a fixed encoding so that non-ASCII dmesg
# content never depends on (or crashes under) the host locale.
_CSV_ENCODING = "utf-8"


class Plan(str, Enum):
    """Active memory budget plan per AC-5 (Plan A default, Plan B gated)."""

    PLAN_A = "plan-a"
    PLAN_B = "plan-b"


@dataclass(frozen=True)
class PlanBudgets:
    """A pair (steady, peak) budget in bytes for one Plan."""

    steady_bytes: int
    peak_bytes: int

    @classmethod
    def for_plan(cls, plan: Plan) -> "PlanBudgets":
        """Return the budgets for ``plan``.

        Raises:
            ValueError: if ``plan`` is not one of the ``Plan`` members.
        """
        if plan is Plan.PLAN_A:
            return cls(
                steady_bytes=int(4.5 * GIB_BYTES),
                peak_bytes=int(5.0 * GIB_BYTES),
            )
        if plan is Plan.PLAN_B:
            return cls(
                steady_bytes=int(6.0 * GIB_BYTES),
                peak_bytes=int(6.5 * GIB_BYTES),
            )
        raise ValueError(f"unknown memory plan: {plan!r}")


@dataclass(frozen=True)
class MemorySample:
    """One memory sample at a monotonic timestamp.

    ``vmrss_bytes`` is the ``/proc/<pid>/status`` ``VmRSS`` value converted
    to bytes; ``tegrastats_used_bytes`` is the system-level used-RAM figure
    parsed from one ``tegrastats`` line. Both are captured at the same
    nominal sample tick — they MAY diverge slightly because the two sources
    poll at different cadences, which is why the AC budgets apply to each
    stream independently.
    """

    monotonic_ms: int
    vmrss_bytes: int
    tegrastats_used_bytes: int


@dataclass(frozen=True)
class OomEvent:
    """One OOM-killer line captured from ``dmesg``.

    ``snippet`` is the matched dmesg line (truncated to ≤200 chars in CSV
    evidence). ``monotonic_ms`` is the runner's projection of the kernel
    timestamp onto the monotonic clock — may be ``None`` if the runner could
    not align it (the verdict still fails AC-4).
    """

    monotonic_ms: int | None
    snippet: str


@dataclass(frozen=True)
class StreamStats:
    """p50 + max for one memory stream over the post-warm-up window.

    ``p50_bytes`` / ``max_bytes`` are ``None`` when the window held no
    samples; a stream without data never passes a budget check.
    """

    sample_count: int
    p50_bytes: int | None
    max_bytes: int | None

    def passes_steady(self, budget_bytes: int) -> bool:
        """Return True when a p50 exists and is within ``budget_bytes``."""
        return self.p50_bytes is not None and self.p50_bytes <= budget_bytes

    def passes_peak(self, budget_bytes: int) -> bool:
        """Return True when a max exists and is within ``budget_bytes``."""
        return self.max_bytes is not None and self.max_bytes <= budget_bytes


@dataclass(frozen=True)
class MemoryBudgetReport:
    """Aggregate NFT-LIM-01 verdict for one Tier-2 run."""

    plan: Plan
    budgets: PlanBudgets
    warm_up_ms: int
    window_end_ms: int
    vmrss: StreamStats
    tegrastats: StreamStats
    oom_events: Sequence[OomEvent] = field(default_factory=tuple)

    @property
    def passes_steady_state(self) -> bool:
        # AC-2 — BOTH streams must satisfy steady budget.
        return self.vmrss.passes_steady(self.budgets.steady_bytes) and (
            self.tegrastats.passes_steady(self.budgets.steady_bytes)
        )

    @property
    def passes_peak(self) -> bool:
        # AC-3 — VmRSS peak ≤ peak budget. tegrastats system-level peak is
        # informational only; AC-3 specifies VmRSS as the gating stream.
        return self.vmrss.passes_peak(self.budgets.peak_bytes)

    @property
    def passes_no_oom(self) -> bool:
        # AC-4 — zero OOM-killer entries since run_start.
        return len(self.oom_events) == 0

    @property
    def passes(self) -> bool:
        """Overall verdict: AC-2 and AC-3 and AC-4 must all pass."""
        return self.passes_steady_state and self.passes_peak and self.passes_no_oom


def _percentile_int(values: Sequence[int], q: float) -> int | None:
    """Linear-interpolation percentile rounded to int bytes.

    Returns ``None`` for empty input so the caller distinguishes the no-data
    case. Accepts any real ``q`` in [0, 100]; outside that range is a
    programmer error.

    Raises:
        ValueError: if ``q`` is outside [0, 100] (this also rejects NaN).
    """
    if not 0.0 <= q <= 100.0:
        raise ValueError(f"percentile q must be in [0, 100], got {q!r}")
    if not values:
        return None
    ordered = sorted(values)
    if len(ordered) == 1:
        return int(ordered[0])
    # Fractional index into the sorted data; interpolate between neighbours.
    rank = (q / 100.0) * (len(ordered) - 1)
    lo = floor(rank)
    hi = min(lo + 1, len(ordered) - 1)  # clamp so q == 100 stays in range
    frac = rank - lo
    return int(round(ordered[lo] + (ordered[hi] - ordered[lo]) * frac))


def _post_warmup_window(
    samples: Sequence[MemorySample], warm_up_ms: int
) -> list[MemorySample]:
    """Drop samples whose timestamp is inside the warm-up window.

    The window starts at the earliest sample timestamp; samples at or after
    ``earliest + warm_up_ms`` are kept, in their original order.

    Raises:
        ValueError: if ``warm_up_ms`` is negative.
    """
    if warm_up_ms < 0:
        raise ValueError(f"warm_up_ms must be >= 0 (was {warm_up_ms!r})")
    if not samples:
        return []
    first = min(s.monotonic_ms for s in samples)
    cutoff = first + warm_up_ms
    return [s for s in samples if s.monotonic_ms >= cutoff]


def _stream_stats(values: Sequence[int]) -> StreamStats:
    """Summarise one stream as sample count, p50 and max (``None`` if empty)."""
    return StreamStats(
        sample_count=len(values),
        p50_bytes=_percentile_int(values, 50.0),
        max_bytes=max(values) if values else None,
    )


def evaluate(
    samples: Sequence[MemorySample],
    oom_events: Sequence[OomEvent],
    *,
    plan: Plan = Plan.PLAN_A,
    warm_up_ms: int = 30_000,
) -> MemoryBudgetReport:
    """Compute NFT-LIM-01 AC-2 + AC-3 + AC-4 verdict for one Tier-2 run.

    Args:
        samples: memory samples for the whole run, warm-up included.
        oom_events: OOM-killer events observed for the run.
        plan: budget plan to evaluate against (Plan A by default).
        warm_up_ms: length of the warm-up window, measured from the earliest
            sample timestamp; samples inside it are excluded from the stats.

    Returns:
        A ``MemoryBudgetReport``. When no sample survives the warm-up cut the
        stream stats carry ``None`` values (so the steady/peak checks fail)
        and ``window_end_ms`` falls back to ``warm_up_ms``.

    Raises:
        ValueError: if ``plan`` is unknown or ``warm_up_ms`` is negative.
    """
    budgets = PlanBudgets.for_plan(plan)
    post_warmup = _post_warmup_window(samples, warm_up_ms)
    vmrss_values = [s.vmrss_bytes for s in post_warmup]
    tegrastats_values = [s.tegrastats_used_bytes for s in post_warmup]
    window_end_ms = max((s.monotonic_ms for s in post_warmup), default=warm_up_ms)
    return MemoryBudgetReport(
        plan=plan,
        budgets=budgets,
        warm_up_ms=warm_up_ms,
        window_end_ms=window_end_ms,
        vmrss=_stream_stats(vmrss_values),
        tegrastats=_stream_stats(tegrastats_values),
        # Snapshot into a tuple so the frozen report cannot be changed by
        # later mutation of the caller's sequence.
        oom_events=tuple(oom_events),
    )


def _csv_optional(value: int | None) -> int | str:
    """Render a possibly-missing number as a CSV cell (empty when ``None``)."""
    return "" if value is None else value


def _csv_bool(flag: bool) -> str:
    """Render a verdict flag as the lowercase ``true`` / ``false`` CSV cell."""
    return "true" if flag else "false"


def write_csv_evidence(out_path: Path, report: MemoryBudgetReport) -> Path:
    """One-row evidence file naming the AC-2/3/4 verdict + percentiles.

    Creates missing parent directories, overwrites any existing file, and
    writes UTF-8 regardless of the host locale. Returns ``out_path``.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    r = report
    with out_path.open("w", newline="", encoding=_CSV_ENCODING) as fh:
        writer = csv.writer(fh)
        writer.writerow(
            [
                "plan",
                "warm_up_ms",
                "window_end_ms",
                "vmrss_sample_count",
                "vmrss_p50_bytes",
                "vmrss_max_bytes",
                "tegrastats_sample_count",
                "tegrastats_p50_bytes",
                "tegrastats_max_bytes",
                "steady_budget_bytes",
                "peak_budget_bytes",
                "ac2_steady_passes",
                "ac3_peak_passes",
                "ac4_no_oom_passes",
                "oom_event_count",
                "passes",
            ]
        )
        writer.writerow(
            [
                r.plan.value,
                r.warm_up_ms,
                r.window_end_ms,
                r.vmrss.sample_count,
                _csv_optional(r.vmrss.p50_bytes),
                _csv_optional(r.vmrss.max_bytes),
                r.tegrastats.sample_count,
                _csv_optional(r.tegrastats.p50_bytes),
                _csv_optional(r.tegrastats.max_bytes),
                r.budgets.steady_bytes,
                r.budgets.peak_bytes,
                _csv_bool(r.passes_steady_state),
                _csv_bool(r.passes_peak),
                _csv_bool(r.passes_no_oom),
                len(r.oom_events),
                _csv_bool(r.passes),
            ]
        )
    return out_path


def write_oom_events_csv(
    out_path: Path, oom_events: Sequence[OomEvent]
) -> Path:
    """Per-OOM-event CSV (one row per event) for evidence.

    Each snippet is truncated to at most 200 characters; an unaligned
    timestamp (``None``) is written as an empty cell. Creates missing parent
    directories, overwrites any existing file, and writes UTF-8 regardless of
    the host locale. Returns ``out_path``.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="", encoding=_CSV_ENCODING) as fh:
        writer = csv.writer(fh)
        writer.writerow(["index", "monotonic_ms", "snippet"])
        for i, ev in enumerate(oom_events):
            writer.writerow(
                [
                    i,
                    _csv_optional(ev.monotonic_ms),
                    # Slicing is a no-op for snippets already within the cap.
                    ev.snippet[:_SNIPPET_MAX_CHARS],
                ]
            )
    return out_path