"""AddressSanitizer fuzz evaluator for NFT-SEC-04 (AZ-439 / RESTRICT-CVE-1 release-gate). Companion to ``cve_probe_evaluator``: while the probe asserts a single crafted JPEG does not crash the SUT, the fuzz scenario runs the ``build_kind=ASan`` SUT image under random JPEG inputs for ≥4 h and asserts: * AC-2: 0 ASan findings (``heap-buffer-overflow``, ``use-after-free``, ``stack-buffer-overflow``, ``heap-use-after-free``, etc.) in the captured stderr / ASan log; * AC-3 (informational only — no hard threshold): the harness reached ≥``MIN_CORPUS_COVERAGE`` unique JPEG inputs. ASan-finding categories follow the canonical sanitizer wording. The classifier matches a curated, non-exhaustive set; an *unknown* match is bucketed into ``OTHER_FINDING`` and still fails AC-2. Unknown findings are surfaced in the CSV evidence so a regression triage knows to extend the canonical set. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import csv import re from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Sequence MIN_FUZZ_DURATION_SECONDS = 4 * 3600 # AC-2 — release-gate minimum MIN_CORPUS_COVERAGE = 1000 # AC-3 — informational only class AsanFindingCategory(str, Enum): HEAP_BUFFER_OVERFLOW = "heap-buffer-overflow" HEAP_USE_AFTER_FREE = "heap-use-after-free" STACK_BUFFER_OVERFLOW = "stack-buffer-overflow" STACK_USE_AFTER_RETURN = "stack-use-after-return" GLOBAL_BUFFER_OVERFLOW = "global-buffer-overflow" USE_AFTER_FREE = "use-after-free" DOUBLE_FREE = "double-free" OTHER_FINDING = "other-finding" # canonical unknown ASan match # Each entry is (regex, category). Matched in order — first hit wins. _KNOWN_PATTERNS: tuple[tuple[str, AsanFindingCategory], ...] = ( (r"ERROR: AddressSanitizer:\s*heap-buffer-overflow", AsanFindingCategory.HEAP_BUFFER_OVERFLOW), (r"ERROR: AddressSanitizer:\s*heap-use-after-free", AsanFindingCategory.HEAP_USE_AFTER_FREE), (r"ERROR: AddressSanitizer:\s*stack-buffer-overflow", AsanFindingCategory.STACK_BUFFER_OVERFLOW), (r"ERROR: AddressSanitizer:\s*stack-use-after-return", AsanFindingCategory.STACK_USE_AFTER_RETURN), (r"ERROR: AddressSanitizer:\s*global-buffer-overflow", AsanFindingCategory.GLOBAL_BUFFER_OVERFLOW), (r"ERROR: AddressSanitizer:\s*use-after-free", AsanFindingCategory.USE_AFTER_FREE), (r"ERROR: AddressSanitizer:\s*double-free", AsanFindingCategory.DOUBLE_FREE), ) _KNOWN_COMPILED = tuple((re.compile(pat), cat) for pat, cat in _KNOWN_PATTERNS) _ANY_ASAN_RE = re.compile(r"ERROR: AddressSanitizer:") def classify_asan_line(line: str) -> AsanFindingCategory | None: """Classify one stderr line. Returns ``None`` if it's not an ASan finding.""" for regex, category in _KNOWN_COMPILED: if regex.search(line): return category if _ANY_ASAN_RE.search(line): return AsanFindingCategory.OTHER_FINDING return None @dataclass(frozen=True) class AsanFinding: """One classified finding (one line OR one synthesized event).""" category: AsanFindingCategory snippet: str # the matched line; truncated to ≤200 chars in evidence @dataclass(frozen=True) class AsanFuzzReport: """Aggregate verdict for one ≥4 h fuzz run.""" duration_seconds: float corpus_size: int findings: Sequence[AsanFinding] @property def passes_duration(self) -> bool: return self.duration_seconds >= MIN_FUZZ_DURATION_SECONDS @property def passes_findings(self) -> bool: return len(self.findings) == 0 @property def reached_corpus_floor(self) -> bool: # Informational only — does NOT contribute to ``passes``. return self.corpus_size >= MIN_CORPUS_COVERAGE @property def passes(self) -> bool: return self.passes_duration and self.passes_findings def evaluate( asan_log_lines: Sequence[str], *, duration_seconds: float, corpus_size: int, ) -> AsanFuzzReport: """Scan the ASan log, classify findings, and assemble the report.""" findings: list[AsanFinding] = [] for line in asan_log_lines: category = classify_asan_line(line) if category is not None: findings.append( AsanFinding( category=category, snippet=line.strip()[:200], ) ) return AsanFuzzReport( duration_seconds=duration_seconds, corpus_size=corpus_size, findings=tuple(findings), ) def write_csv_evidence(out_path: Path, report: AsanFuzzReport) -> Path: out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "duration_seconds", "passes_duration", "corpus_size", "reached_corpus_floor", "finding_count", "passes_findings", "passes", "finding_breakdown", ] ) breakdown: dict[str, int] = {} for f in report.findings: breakdown[f.category.value] = breakdown.get(f.category.value, 0) + 1 breakdown_str = ";".join( f"{cat}={count}" for cat, count in sorted(breakdown.items()) ) writer.writerow( [ f"{report.duration_seconds:.0f}", "true" if report.passes_duration else "false", report.corpus_size, "true" if report.reached_corpus_floor else "false", len(report.findings), "true" if report.passes_findings else "false", "true" if report.passes else "false", breakdown_str, ] ) if report.findings: writer.writerow([]) writer.writerow(["finding_index", "category", "snippet"]) for idx, f in enumerate(report.findings): writer.writerow([idx, f.category.value, f.snippet]) return out_path