"""Aggregate storage + thumbnail-log budget evaluator for NFT-LIM-03/05 (AZ-442 / AC-7.4 + AC-NEW-12 + RESTRICT-STORAGE). The two scenarios share the same 30 min Derkachi replay and the same per-minute ``du -sh`` sampling. NFT-LIM-03 caps the *aggregate* of three volumes at ``100 GiB`` (end-of-run snapshot); NFT-LIM-05 extrapolates the thumbnail-log subdirectory linearly to 8 h and caps it at ``1 GiB``. The runner projects each per-minute sample into a ``VolumeSnapshot`` carrying the four monitored sizes at one timestamp. This module evaluates the AC-1 (aggregate) + AC-2 (8 h thumbnail-log extrapolation) verdicts from a ``Sequence[VolumeSnapshot]``. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol — inputs are pre-projected typed samples. """ from __future__ import annotations import csv from dataclasses import dataclass from pathlib import Path from typing import Sequence GIB_BYTES = 1024**3 REPLAY_WINDOW_MINUTES = 30 EXTRAPOLATION_WINDOW_MINUTES = 8 * 60 AGGREGATE_BUDGET_BYTES = 100 * GIB_BYTES # AC-1 — ≤ 100 GiB THUMBNAIL_LOG_BUDGET_BYTES = 1 * GIB_BYTES # AC-2 — < 1 GiB 8 h-extrapolated @dataclass(frozen=True) class VolumeSnapshot: """One per-minute ``du -sh`` snapshot for the four monitored volumes.""" monotonic_ms: int tile_cache_bytes: int tile_cache_write_bytes: int fdr_output_bytes: int thumbnail_log_bytes: int @property def aggregate_bytes(self) -> int: return ( self.tile_cache_bytes + self.tile_cache_write_bytes + self.fdr_output_bytes ) @dataclass(frozen=True) class StorageBudgetReport: """Aggregate AC-1 + AC-2 verdict for one NFT-LIM-03+05 run.""" sample_count: int aggregate_at_end_bytes: int | None thumbnail_log_at_end_bytes: int | None thumbnail_log_extrapolated_8h_bytes: int | None aggregate_budget_bytes: int thumbnail_log_budget_bytes: int @property def passes_aggregate(self) -> bool: # AC-1 — end-of-run aggregate snapshot ≤ budget. return ( self.aggregate_at_end_bytes is not None and self.aggregate_at_end_bytes <= self.aggregate_budget_bytes ) @property def passes_thumbnail_log(self) -> bool: # AC-2 — extrapolated 8 h thumbnail-log < budget. Strict ``<`` # because AC-2 says ``< 1 GB`` (not ``≤``). return ( self.thumbnail_log_extrapolated_8h_bytes is not None and self.thumbnail_log_extrapolated_8h_bytes < self.thumbnail_log_budget_bytes ) @property def passes(self) -> bool: return self.passes_aggregate and self.passes_thumbnail_log def evaluate( samples: Sequence[VolumeSnapshot], *, aggregate_budget_bytes: int = AGGREGATE_BUDGET_BYTES, thumbnail_log_budget_bytes: int = THUMBNAIL_LOG_BUDGET_BYTES, ) -> StorageBudgetReport: """Compute AC-1 + AC-2 verdict from a snapshot stream.""" if aggregate_budget_bytes <= 0: raise ValueError( f"aggregate_budget_bytes must be > 0 (was {aggregate_budget_bytes!r})" ) if thumbnail_log_budget_bytes <= 0: raise ValueError( f"thumbnail_log_budget_bytes must be > 0 " f"(was {thumbnail_log_budget_bytes!r})" ) if not samples: return StorageBudgetReport( sample_count=0, aggregate_at_end_bytes=None, thumbnail_log_at_end_bytes=None, thumbnail_log_extrapolated_8h_bytes=None, aggregate_budget_bytes=aggregate_budget_bytes, thumbnail_log_budget_bytes=thumbnail_log_budget_bytes, ) ordered = sorted(samples, key=lambda s: s.monotonic_ms) last = ordered[-1] extrapolated_thumb = int( round( (last.thumbnail_log_bytes / REPLAY_WINDOW_MINUTES) * EXTRAPOLATION_WINDOW_MINUTES ) ) return StorageBudgetReport( sample_count=len(ordered), aggregate_at_end_bytes=last.aggregate_bytes, thumbnail_log_at_end_bytes=last.thumbnail_log_bytes, thumbnail_log_extrapolated_8h_bytes=extrapolated_thumb, aggregate_budget_bytes=aggregate_budget_bytes, thumbnail_log_budget_bytes=thumbnail_log_budget_bytes, ) def write_csv_evidence(out_path: Path, report: StorageBudgetReport) -> Path: """One-row evidence file naming the AC-1/AC-2 verdict + sizes.""" out_path.parent.mkdir(parents=True, exist_ok=True) r = report with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "sample_count", "aggregate_at_end_bytes", "thumbnail_log_at_end_bytes", "thumbnail_log_extrapolated_8h_bytes", "aggregate_budget_bytes", "thumbnail_log_budget_bytes", "ac1_aggregate_passes", "ac2_thumbnail_log_passes", "passes", ] ) writer.writerow( [ r.sample_count, "" if r.aggregate_at_end_bytes is None else r.aggregate_at_end_bytes, "" if r.thumbnail_log_at_end_bytes is None else r.thumbnail_log_at_end_bytes, "" if r.thumbnail_log_extrapolated_8h_bytes is None else r.thumbnail_log_extrapolated_8h_bytes, r.aggregate_budget_bytes, r.thumbnail_log_budget_bytes, "true" if r.passes_aggregate else "false", "true" if r.passes_thumbnail_log else "false", "true" if r.passes else "false", ] ) return out_path def write_per_minute_csv( out_path: Path, samples: Sequence[VolumeSnapshot] ) -> Path: """Per-sample CSV (one row per minute) for evidence trend lines.""" out_path.parent.mkdir(parents=True, exist_ok=True) ordered = sorted(samples, key=lambda s: s.monotonic_ms) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "index", "monotonic_ms", "tile_cache_bytes", "tile_cache_write_bytes", "fdr_output_bytes", "thumbnail_log_bytes", "aggregate_bytes", ] ) for i, s in enumerate(ordered): writer.writerow( [ i, s.monotonic_ms, s.tile_cache_bytes, s.tile_cache_write_bytes, s.fdr_output_bytes, s.thumbnail_log_bytes, s.aggregate_bytes, ] ) return out_path