"""NFT-SEC-04 ≥4 h ASan fuzz — release-gated (AZ-439 / RESTRICT-CVE-1 AC-2 + AC-3). Companion to ``test_nft_sec_04_opencv_cve.py`` (the always-run probe). This scenario consumes the captured fuzz-run summary (ASan stderr log + duration + corpus size) and asserts: * AC-2: 0 ASan findings of any category; * AC-3: ≥1000 unique JPEG corpus inputs (informational only — does NOT contribute to ``passes`` so a fuzz with high finding count + low corpus fails for the finding count, not the coverage proxy). Release-gated by ``E2E_NFT_SEC_04_RELEASE_GATE=1`` because the fuzz run takes ≥4 h. fc_adapter parameterization is irrelevant for image decode (AC-4): only the ``ardupilot`` parameterization actually executes; the rest skip cleanly to avoid duplicating a 4 h run. Production dependencies surfaced: * **AZ-444 (Tier-2 harness)**: optional. The Tier-1 path can run a shorter fuzz against the ASan SUT image on x86; Tier-2 runs the same fuzz on Jetson with the same SUT image. * **AZ-595**: emit ``nft_sec_04_asan_fuzz.json`` carrying the captured ASan stderr log lines + duration + corpus size. Fixture JSON shape:: { "duration_seconds": , "corpus_size": , "asan_log_lines": [, , ...] } """ from __future__ import annotations import json import os from pathlib import Path import pytest from runner.helpers import asan_fuzz_evaluator as afe NFT_SEC_04_ASAN_FIXTURE_ENV_VAR = "E2E_NFT_SEC_04_ASAN_FIXTURE" NFT_SEC_04_ASAN_DEFAULT_FIXTURE_NAME = "nft_sec_04_asan_fuzz.json" NFT_SEC_04_RELEASE_GATE_ENV_VAR = "E2E_NFT_SEC_04_RELEASE_GATE" @pytest.mark.scenario_id("nft-sec-04-asan-fuzz") @pytest.mark.traces_to("RESTRICT-CVE-1,AC-2,AC-3,AC-4") def test_nft_sec_04_asan_fuzz( fc_adapter: str, vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: """0 ASan findings across ≥4 h JPEG-fuzz; corpus ≥1000 (informational).""" if not _release_gate_enabled(): pytest.skip( "NFT-SEC-04 ASan-fuzz is release-gated (≥4 h run). Set " f"`{NFT_SEC_04_RELEASE_GATE_ENV_VAR}=1` to execute. The " "probe scenario (test_nft_sec_04_opencv_cve.py) covers " "RESTRICT-CVE-1 AC-1 on every CI run." ) if fc_adapter != "ardupilot": pytest.skip( "AC-4: NFT-SEC-04 ASan-fuzz is fc_adapter-agnostic (image " "decode is upstream of FC); only run once per vio_strategy " "under fc_adapter=ardupilot to avoid duplicating a 4 h run." ) if not sitl_replay_ready: pytest.skip( "NFT-SEC-04 ASan-fuzz requires `E2E_SITL_REPLAY_DIR` to point " "at a prepared SITL replay fixture (AZ-595) carrying the " "captured fuzz-run summary. Pure ASan log classification + " "verdict logic covered by " "e2e/_unit_tests/helpers/test_asan_fuzz_evaluator.py." ) fixture_path = _resolve_fixture_path() if not fixture_path.is_file(): pytest.fail( f"NFT-SEC-04 ASan-fuzz: fixture not found at {fixture_path}. " f"`{NFT_SEC_04_ASAN_FIXTURE_ENV_VAR}` env var must point at a " "JSON file with the schema documented in the scenario " "docstring. Production dependency: AZ-595 + (optional) AZ-444." ) payload = json.loads(fixture_path.read_text()) duration_s, corpus_size, log_lines = _parse_payload(payload, fixture_path) report = afe.evaluate( log_lines, duration_seconds=duration_s, corpus_size=corpus_size, ) out_csv = ( evidence_dir / "nft-sec-04" / f"{fc_adapter}-{vio_strategy}-asan-fuzz.csv" ) afe.write_csv_evidence(out_csv, report) nfr_recorder.record_metric( "nft_sec_04.asan_finding_count", float(len(report.findings)), ac_id="AC-2", ) nfr_recorder.record_metric( "nft_sec_04.fuzz_duration_seconds", report.duration_seconds, ac_id="AC-2", ) nfr_recorder.record_metric( "nft_sec_04.fuzz_corpus_size", float(report.corpus_size), ac_id="AC-3", ) assert report.passes_duration, ( f"AC-2 pre-condition: fuzz duration {report.duration_seconds:.0f} s " f"is below the required ≥{afe.MIN_FUZZ_DURATION_SECONDS} s — " "the 0-finding result is not statistically meaningful for the " "RESTRICT-CVE-1 budget without the full window." ) assert report.passes_findings, ( f"AC-2: {len(report.findings)} ASan finding(s) recorded — " f"see `nft-sec-04/{fc_adapter}-{vio_strategy}-asan-fuzz.csv` " f"for per-finding categories. Any finding is a release-blocker." ) # AC-3 is informational: emit a warning-style fail-fast message via # the evidence CSV (already written above) but do NOT fail the test. # The user is expected to inspect the corpus floor manually. def _release_gate_enabled() -> bool: return os.environ.get(NFT_SEC_04_RELEASE_GATE_ENV_VAR, "").strip().lower() in ( "1", "true", "yes", ) def _resolve_fixture_path() -> Path: raw = os.environ.get(NFT_SEC_04_ASAN_FIXTURE_ENV_VAR, "").strip() from runner.helpers import sitl_observer root = sitl_observer.replay_dir() if not raw: if root is None: return Path(f"<{NFT_SEC_04_ASAN_FIXTURE_ENV_VAR}-unset>") return root / NFT_SEC_04_ASAN_DEFAULT_FIXTURE_NAME path = Path(raw) if not path.is_absolute() and root is not None: path = root / path return path def _parse_payload( payload: object, fixture_path: Path ) -> tuple[float, int, list[str]]: if not isinstance(payload, dict): pytest.fail( f"NFT-SEC-04 ASan-fuzz: fixture {fixture_path} must be a JSON " f"object; got top-level type={type(payload).__name__}" ) try: duration_s = float(payload["duration_seconds"]) corpus_size = int(payload["corpus_size"]) except (KeyError, TypeError, ValueError) as exc: pytest.fail( f"NFT-SEC-04 ASan-fuzz: fixture {fixture_path} missing/invalid " f"duration_seconds or corpus_size: {exc}" ) raw_lines = payload.get("asan_log_lines", []) if not isinstance(raw_lines, list): pytest.fail( f"NFT-SEC-04 ASan-fuzz: fixture {fixture_path} " f"'asan_log_lines' must be a list (may be empty)" ) log_lines = [str(line) for line in raw_lines] return duration_s, corpus_size, log_lines