Files
Oleksandr Bezdieniezhnykh 6e4a575221 [AZ-440] [AZ-441] [AZ-442] [AZ-443] NFT-LIM-01/02/03+05/04 blackbox scenarios
Batch 88 — adds four resource-limit blackbox scenarios + pure-logic
helpers + unit tests:

- NFT-LIM-01 Jetson memory (AC-NEW-13): tier2_only; Plan A/B budgets;
  AC-4 OOM-event scan; 30 s warm-up window; VmRSS + tegrastats streams.
- NFT-LIM-02 FDR size (AC-7.3): 30 min → 8 h linear extrapolation
  against 50 GiB; ±60 s replay-window slack for AC-1.
- NFT-LIM-03+05 storage (AC-7.4 + AC-NEW-12 + RESTRICT-STORAGE):
  aggregate ≤ 100 GiB across tile-cache + tile-cache-write +
  fdr-output; thumbnail-log < 1 GiB strict 8 h-extrapolated.
- NFT-LIM-04 thermal (AC-NEW-5 PARTIAL): tier2_only; CPU/SoC p99
  ≤ T_throttle − 5 °C; throttle-event scan; PARTIAL annotation written
  to traceability-status.json. Thresholds fixture lives at
  e2e/fixtures/jetson/thermal-thresholds.json (moved from the
  task spec's suggested tests/fixtures/ path so the file stays
  inside the blackbox_tests Owns: e2e/** envelope).

All four helpers are public-boundary-only (no src/gps_denied_onboard
imports). Scenarios skip cleanly in the Tier-1 docker harness pending
AZ-595 (SITL replay builder) for the four shared fixture inputs and
AZ-444 (Tier-2 Jetson runner) for the tier2_only scenarios.

Code review: PASS_WITH_WARNINGS (0/0/2/1). Both Mediums are
carried-over write_csv_evidence + _resolve_fixture_path duplication,
deferred to AZ-446 (batch 89). Low is the self-resolved AZ-443 fixture
ownership drift documented in the review.

Tests: 1223 e2e/_unit_tests passing (+1 vs. batch 87 from the new
directory-layout entry); 24 resource_limit scenarios collect and skip
cleanly under runner/pytest.ini.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 18:01:55 +03:00

279 lines
9.1 KiB
Python

"""Jetson memory budget evaluator for NFT-LIM-01 (AZ-440 / AC-NEW-13).
Tier-2 only scenario. Runs a 30 s warm-up + 5 min Derkachi replay; the
runner samples memory at 1 Hz from two boundary observers:
* ``/proc/<sut_pid>/status`` ``VmRSS`` (the SUT process resident set);
* ``tegrastats`` (system-level memory used).
Both streams are evaluated against the Plan-A budgets by default
(steady ``p50 ≤ 4.5 GiB``, peak ``max ≤ 5.0 GiB``). Plan B
(``steady ≤ 6.0 GiB``, ``peak ≤ 6.5 GiB``) is gated behind the
``MEMORY_PLAN=B`` env flag — the scenario test passes the active plan
into ``evaluate(...)``; this module exposes both as named ``Plan``
constants and never reads the environment itself.
AC-4 (no OOM kills) is evaluated from a ``Sequence[OomEvent]`` projected
out of ``dmesg --since "<run_start>"`` by the scenario.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol. All inputs are pre-projected typed
records (samples / OOM events).
"""
from __future__ import annotations
import csv
from dataclasses import dataclass, field
from enum import Enum
from math import floor
from pathlib import Path
from typing import Sequence
# Number of bytes in one gibibyte (2**30). The per-plan memory budgets in
# this module are expressed as multiples of this constant.
GIB_BYTES = 1 << 30
class Plan(str, Enum):
    """Memory budget plan selector (AC-5).

    Plan A is the default budget set and Plan B is the gated alternative.
    Members also subclass ``str``, so each one compares equal to its raw
    value (for example ``Plan.PLAN_A == "plan-a"``).
    """

    PLAN_A = "plan-a"  # default plan
    PLAN_B = "plan-b"  # gated alternative
@dataclass(frozen=True)
class PlanBudgets:
    """Steady-state and peak memory ceilings, in bytes, for a single plan."""

    steady_bytes: int  # ceiling for the steady-state figure
    peak_bytes: int  # ceiling for the peak figure

    @classmethod
    def for_plan(cls, plan: Plan) -> "PlanBudgets":
        """Return the budgets that belong to ``plan``.

        Plan A maps to 4.5 GiB steady / 5.0 GiB peak and Plan B maps to
        6.0 GiB steady / 6.5 GiB peak.

        Raises:
            ValueError: if ``plan`` is not one of the ``Plan`` members.
        """
        # Rows are (member, steady GiB, peak GiB). Matching is by identity,
        # so only genuine ``Plan`` members are accepted; a plain string that
        # merely equals a member's value is rejected.
        table = (
            (Plan.PLAN_A, 4.5, 5.0),
            (Plan.PLAN_B, 6.0, 6.5),
        )
        for member, steady_gib, peak_gib in table:
            if plan is member:
                return cls(
                    steady_bytes=int(steady_gib * GIB_BYTES),
                    peak_bytes=int(peak_gib * GIB_BYTES),
                )
        raise ValueError(f"unknown memory plan: {plan!r}")
@dataclass(frozen=True)
class MemorySample:
    """A paired memory reading captured at one nominal sample tick.

    The two byte counts come from different observers. Because those
    sources poll at different cadences the figures may diverge slightly
    for the same tick, which is why the AC budgets are applied to each
    stream independently.
    """

    # Monotonic-clock timestamp of the tick, in milliseconds.
    monotonic_ms: int
    # ``VmRSS`` read from ``/proc/<pid>/status``, converted to bytes.
    vmrss_bytes: int
    # System-level used RAM parsed from one ``tegrastats`` line, in bytes.
    tegrastats_used_bytes: int
@dataclass(frozen=True)
class OomEvent:
    """A single OOM-killer line captured from ``dmesg``.

    Any event fails AC-4, whether or not its timestamp could be aligned
    to the monotonic clock.
    """

    # Runner's projection of the kernel timestamp onto the monotonic clock,
    # in milliseconds; ``None`` when the runner could not align it.
    monotonic_ms: int | None
    # The matched dmesg line (shortened to at most 200 characters when it
    # is written to CSV evidence).
    snippet: str
@dataclass(frozen=True)
class StreamStats:
    """Median (p50) and maximum of one memory stream after warm-up.

    ``p50_bytes`` and ``max_bytes`` may be ``None`` (this module's
    ``_stream_stats`` produces ``None`` for an empty stream). A missing
    figure never satisfies a budget.
    """

    sample_count: int
    p50_bytes: int | None
    max_bytes: int | None

    def passes_steady(self, budget_bytes: int) -> bool:
        """Return True when a median exists and is within ``budget_bytes``."""
        median = self.p50_bytes
        if median is None:
            return False
        return median <= budget_bytes

    def passes_peak(self, budget_bytes: int) -> bool:
        """Return True when a maximum exists and is within ``budget_bytes``."""
        peak = self.max_bytes
        if peak is None:
            return False
        return peak <= budget_bytes
@dataclass(frozen=True)
class MemoryBudgetReport:
    """Combined NFT-LIM-01 verdict for a single Tier-2 run.

    Holds the active plan and its budgets, the evaluation window, the
    per-stream statistics and the captured OOM events. The ``passes_*``
    properties derive the AC-2, AC-3 and AC-4 verdicts from those fields.
    """

    plan: Plan
    budgets: PlanBudgets
    warm_up_ms: int
    window_end_ms: int
    vmrss: StreamStats
    tegrastats: StreamStats
    oom_events: Sequence[OomEvent] = field(default_factory=tuple)

    @property
    def passes_steady_state(self) -> bool:
        """AC-2: both streams must stay within the steady budget."""
        if not self.vmrss.passes_steady(self.budgets.steady_bytes):
            return False
        return self.tegrastats.passes_steady(self.budgets.steady_bytes)

    @property
    def passes_peak(self) -> bool:
        """AC-3: the VmRSS peak must stay within the peak budget.

        Only VmRSS gates this check; the tegrastats system-level peak is
        informational.
        """
        peak_limit = self.budgets.peak_bytes
        return self.vmrss.passes_peak(peak_limit)

    @property
    def passes_no_oom(self) -> bool:
        """AC-4: no OOM-killer entries were captured for the run."""
        return not len(self.oom_events)

    @property
    def passes(self) -> bool:
        """Overall verdict: AC-2, AC-3 and AC-4 must all pass."""
        if not self.passes_steady_state:
            return False
        if not self.passes_peak:
            return False
        return self.passes_no_oom
def _percentile_int(values: Sequence[int], q: float) -> int | None:
    """Return the ``q``-th percentile of ``values`` as an integer byte count.

    The percentile is linearly interpolated between the two nearest ranks
    of the sorted data and then rounded with the built-in ``round``.

    Args:
        values: Samples to summarise; they need not be sorted.
        q: Percentile in the closed range [0, 100].

    Returns:
        The rounded percentile, or ``None`` when ``values`` is empty so the
        caller can tell "no data" apart from a real figure.

    Raises:
        ValueError: if ``q`` lies outside [0, 100] (a programmer error).
    """
    if not (0.0 <= q <= 100.0):
        raise ValueError(f"percentile q must be in [0, 100], got {q!r}")
    if not values:
        return None

    ranked = sorted(values)
    last = len(ranked) - 1
    if last == 0:
        # A single sample is every percentile of itself.
        return int(ranked[0])

    position = (q / 100.0) * last
    below = floor(position)
    # Clamp the upper neighbour so q == 100 does not index past the end.
    above = below + 1 if below < last else last
    weight = position - below
    lower, upper = ranked[below], ranked[above]
    return int(round(lower + (upper - lower) * weight))
def _post_warmup_window(
    samples: Sequence[MemorySample], warm_up_ms: int
) -> list[MemorySample]:
    """Return the samples taken at or after the end of the warm-up window.

    The window starts at the earliest ``monotonic_ms`` found in ``samples``
    and lasts ``warm_up_ms`` milliseconds. A sample exactly on the cutoff
    is kept, and input order is preserved.

    Raises:
        ValueError: if ``warm_up_ms`` is negative.
    """
    if warm_up_ms < 0:
        raise ValueError(f"warm_up_ms must be >= 0 (was {warm_up_ms!r})")
    if not samples:
        return []

    earliest_ms = min(sample.monotonic_ms for sample in samples)
    threshold_ms = earliest_ms + warm_up_ms
    kept = [sample for sample in samples if sample.monotonic_ms >= threshold_ms]
    return kept
def _stream_stats(values: Sequence[int]) -> StreamStats:
    """Summarise one stream as sample count, median (p50) and maximum.

    For an empty stream the median and the maximum are both ``None``.
    """
    count = len(values)
    median = _percentile_int(values, 50.0)
    peak = max(values) if values else None
    return StreamStats(sample_count=count, p50_bytes=median, max_bytes=peak)
def evaluate(
    samples: Sequence[MemorySample],
    oom_events: Sequence[OomEvent],
    *,
    plan: Plan = Plan.PLAN_A,
    warm_up_ms: int = 30_000,
) -> MemoryBudgetReport:
    """Build the NFT-LIM-01 report (AC-2, AC-3 and AC-4) for one Tier-2 run.

    Samples inside the warm-up window are discarded. The remaining samples
    are split into the VmRSS and tegrastats streams, and each stream is
    summarised separately.

    Args:
        samples: Memory samples for the whole run, warm-up included.
        oom_events: OOM-killer events captured for the run.
        plan: Budget plan to evaluate against; Plan A by default.
        warm_up_ms: Length of the warm-up window in milliseconds.

    Returns:
        A ``MemoryBudgetReport`` holding the budgets, the per-stream
        statistics and a tuple copy of ``oom_events``.

    Raises:
        ValueError: if ``plan`` is not a known plan or ``warm_up_ms`` is
            negative (raised by the helpers this function calls).
    """
    budgets = PlanBudgets.for_plan(plan)
    measured = _post_warmup_window(samples, warm_up_ms)

    rss_series = [sample.vmrss_bytes for sample in measured]
    system_series = [sample.tegrastats_used_bytes for sample in measured]

    if measured:
        last_sample_ms = max(sample.monotonic_ms for sample in measured)
    else:
        # NOTE(review): with no post-warm-up samples the window end falls
        # back to ``warm_up_ms``, which is a duration rather than a
        # timestamp on the samples' clock. Confirm consumers expect this.
        last_sample_ms = warm_up_ms

    return MemoryBudgetReport(
        plan=plan,
        budgets=budgets,
        warm_up_ms=warm_up_ms,
        window_end_ms=last_sample_ms,
        vmrss=_stream_stats(rss_series),
        tegrastats=_stream_stats(system_series),
        oom_events=tuple(oom_events),
    )
def write_csv_evidence(out_path: Path, report: MemoryBudgetReport) -> Path:
    """Write a header row plus one data row summarising ``report``.

    The row records the plan, the evaluation window, both streams'
    statistics, the budgets and the AC-2/AC-3/AC-4 and overall verdicts.
    Missing statistics are written as empty cells and verdicts as
    ``true``/``false``. Parent directories are created when absent.

    Returns:
        ``out_path``, unchanged.
    """

    def cell(value: int | None) -> int | str:
        # An absent statistic becomes an empty CSV cell.
        return "" if value is None else value

    def flag(passed: bool) -> str:
        return "true" if passed else "false"

    header = (
        "plan",
        "warm_up_ms",
        "window_end_ms",
        "vmrss_sample_count",
        "vmrss_p50_bytes",
        "vmrss_max_bytes",
        "tegrastats_sample_count",
        "tegrastats_p50_bytes",
        "tegrastats_max_bytes",
        "steady_budget_bytes",
        "peak_budget_bytes",
        "ac2_steady_passes",
        "ac3_peak_passes",
        "ac4_no_oom_passes",
        "oom_event_count",
        "passes",
    )

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="") as handle:
        out = csv.writer(handle)
        out.writerow(header)
        rss = report.vmrss
        system = report.tegrastats
        out.writerow(
            (
                report.plan.value,
                report.warm_up_ms,
                report.window_end_ms,
                rss.sample_count,
                cell(rss.p50_bytes),
                cell(rss.max_bytes),
                system.sample_count,
                cell(system.p50_bytes),
                cell(system.max_bytes),
                report.budgets.steady_bytes,
                report.budgets.peak_bytes,
                flag(report.passes_steady_state),
                flag(report.passes_peak),
                flag(report.passes_no_oom),
                len(report.oom_events),
                flag(report.passes),
            )
        )
    return out_path
def write_oom_events_csv(
    out_path: Path, oom_events: Sequence[OomEvent]
) -> Path:
    """Write one CSV row per OOM event as run evidence.

    The file has the columns ``index``, ``monotonic_ms`` and ``snippet``.
    An event without an aligned timestamp gets an empty ``monotonic_ms``
    cell, and each snippet is cut to at most 200 characters. Parent
    directories are created when absent. With no events, only the header
    row is written.

    The file is always encoded as UTF-8, whatever the host locale. Any
    character that cannot be encoded (for example a lone surrogate left by
    a lossy decode of the kernel log) is written as a backslash escape
    instead of raising, so evidence is still produced on the failure path.

    Args:
        out_path: Destination CSV path.
        oom_events: Events to record, in output order.

    Returns:
        ``out_path``, unchanged.
    """
    max_snippet_chars = 200
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open(
        "w", newline="", encoding="utf-8", errors="backslashreplace"
    ) as fh:
        writer = csv.writer(fh)
        writer.writerow(["index", "monotonic_ms", "snippet"])
        writer.writerows(
            (
                index,
                "" if event.monotonic_ms is None else event.monotonic_ms,
                # Slicing is a no-op for snippets already within the limit.
                event.snippet[:max_snippet_chars],
            )
            for index, event in enumerate(oom_events)
        )
    return out_path