"""NFT-SEC-04 probe — OpenCV CVE-2025-53644 no-crash (AZ-439 / RESTRICT-CVE-1). Always-runs (Tier-1 OR Tier-2). The crafted ``cve-2025-53644.jpg`` is fed to the SUT's nav-camera as a single frame and the FDR archive is inspected: * AC-1a: at least one FDR record exists strictly after the probe injection (proves the SUT process did not crash); * AC-1b: the FDR record matched within ``±tolerance_ms`` of the probe is one of ``decode-success`` or ``frame-decode-error`` (proves the SUT either decoded the patched JPEG or gracefully rejected it). The companion ≥4 h ASan fuzz lives in ``test_nft_sec_04_asan_fuzz.py`` and is release-gated. Production dependencies surfaced: * **AZ-595**: emit ``nft_sec_04_cve_probe.json`` carrying ``probe_injected_at_ms`` + the per-frame FDR record sequence the runner captured; * **SUT**: the SUT MUST honor its FDR per-frame outcome contract — a silent drop is treated as a defense-bypass failure even when the process does not crash. Fixture JSON shape:: { "probe_injected_at_ms": , "tolerance_ms": , "fdr_records": [ {"monotonic_ms": , "kind": }, ... ] } """ from __future__ import annotations import json import os from pathlib import Path import pytest from runner.helpers import cve_probe_evaluator as cpe NFT_SEC_04_FIXTURE_ENV_VAR = "E2E_NFT_SEC_04_FIXTURE" NFT_SEC_04_DEFAULT_FIXTURE_NAME = "nft_sec_04_cve_probe.json" @pytest.mark.scenario_id("nft-sec-04") @pytest.mark.traces_to("RESTRICT-CVE-1,AC-1,AC-4") def test_nft_sec_04_opencv_cve_probe( fc_adapter: str, vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: """SUT survives the crafted JPEG and records a deterministic outcome.""" if not sitl_replay_ready: pytest.skip( "NFT-SEC-04 probe requires `E2E_SITL_REPLAY_DIR` to point at a " "prepared SITL replay fixture (AZ-595) carrying the post-probe " "FDR record sequence. Pure no-crash / outcome-classification " "logic covered by " "e2e/_unit_tests/helpers/test_cve_probe_evaluator.py." ) fixture_path = _resolve_fixture_path() if not fixture_path.is_file(): pytest.fail( f"NFT-SEC-04 probe: fixture not found at {fixture_path}. " f"`{NFT_SEC_04_FIXTURE_ENV_VAR}` env var must point at a JSON " "file with the schema documented in the scenario docstring. " "Production dependency: AZ-595." ) payload = json.loads(fixture_path.read_text()) probe_at_ms, tolerance_ms, fdr_records = _parse_payload(payload, fixture_path) report = cpe.evaluate( fdr_records, probe_injected_at_ms=probe_at_ms, tolerance_ms=tolerance_ms, ) out_csv = ( evidence_dir / "nft-sec-04" / f"{fc_adapter}-{vio_strategy}-probe.csv" ) cpe.write_csv_evidence(out_csv, report) nfr_recorder.record_metric( "nft_sec_04.probe_outcome_is_decode_success", 1.0 if report.probe_outcome is cpe.ProbeFrameOutcome.DECODE_SUCCESS else 0.0, ac_id="AC-1", ) nfr_recorder.record_metric( "nft_sec_04.probe_outcome_is_graceful_error", 1.0 if report.probe_outcome is cpe.ProbeFrameOutcome.FRAME_DECODE_ERROR else 0.0, ac_id="AC-1", ) assert report.passes_no_crash, ( f"AC-1a: SUT did not produce any FDR record after probe injection " f"at {report.probe_injected_at_ms} ms — process likely crashed. " f"last_fdr_record_at_ms={report.last_fdr_record_at_ms}." ) assert report.passes_graceful_outcome, ( f"AC-1b: SUT silently dropped the probe frame (no decode-success " f"or frame-decode-error in FDR within ±{tolerance_ms} ms of " f"probe injection at {report.probe_injected_at_ms} ms). Silent " f"drops are a defense-bypass failure even if the process did not " f"crash." ) def _resolve_fixture_path() -> Path: raw = os.environ.get(NFT_SEC_04_FIXTURE_ENV_VAR, "").strip() from runner.helpers import sitl_observer root = sitl_observer.replay_dir() if not raw: if root is None: return Path(f"<{NFT_SEC_04_FIXTURE_ENV_VAR}-unset>") return root / NFT_SEC_04_DEFAULT_FIXTURE_NAME path = Path(raw) if not path.is_absolute() and root is not None: path = root / path return path def _parse_payload( payload: object, fixture_path: Path ) -> tuple[int, int, list[cpe.FdrSurvivalRecord]]: if not isinstance(payload, dict): pytest.fail( f"NFT-SEC-04 probe: fixture {fixture_path} must be a JSON object; " f"got top-level type={type(payload).__name__}" ) try: probe_at = int(payload["probe_injected_at_ms"]) tolerance = int(payload.get("tolerance_ms", 50)) except (KeyError, TypeError, ValueError) as exc: pytest.fail( f"NFT-SEC-04 probe: fixture {fixture_path} missing/invalid " f"probe_injected_at_ms or tolerance_ms: {exc}" ) raw_records = payload.get("fdr_records") if not isinstance(raw_records, list): pytest.fail( f"NFT-SEC-04 probe: fixture {fixture_path} 'fdr_records' must be a list" ) records: list[cpe.FdrSurvivalRecord] = [] for idx, entry in enumerate(raw_records): if not isinstance(entry, dict): pytest.fail( f"NFT-SEC-04 probe: fdr_records[{idx}] in {fixture_path} " f"must be an object" ) try: records.append( cpe.FdrSurvivalRecord( monotonic_ms=int(entry["monotonic_ms"]), kind=str(entry["kind"]), ) ) except (KeyError, TypeError, ValueError) as exc: pytest.fail( f"NFT-SEC-04 probe: fdr_records[{idx}] in {fixture_path} " f"shape invalid: {exc}" ) return probe_at, tolerance, records