Files
gps-denied-onboard/e2e/tests/security/test_nft_sec_04_opencv_cve.py
T
Oleksandr Bezdieniezhnykh c56d4584e6 [AZ-436] [AZ-437] [AZ-438] [AZ-439] Add NFT-SEC-01..05 security scenarios
Batch 87: 6 NFT-SEC blackbox scenarios + 5 helper evaluators + 75 unit
tests + cumulative review batches 85-87.

* AZ-436 NFT-SEC-01: cache-poisoning safety budget (AC-NEW-9); aggregate
  false_trust_count ≤ N×1e-6; zero-tolerance default. Canonical-only by
  default; E2E_NFT_SEC_01_RELEASE_GATE=1 unlocks full matrix.
* AZ-437 NFT-SEC-02 + NFT-SEC-05: shared egress-observation evaluator
  (AC-NEW-10); SEC-02 = 0 packets to non-e2e-net over 5min replay;
  SEC-05 = DNS-blackhole sidecar healthy + lookup fails + UDP-53 silent.
* AZ-438 NFT-SEC-03: AP-only signing rejection (AC-NEW-11); 3 sub-cases
  (unsigned/wrong-key/replayed) each reject ≤500ms + no position drift.
* AZ-439 NFT-SEC-04: probe (always-run) = no-crash + deterministic
  decode outcome; ASan-fuzz (release-gate) = 0 findings ≥4h; AC-3
  corpus floor informational only per spec.

Verdict per-batch: PASS_WITH_WARNINGS (5 Low). Cumulative review for
batches 85-87 (K=3 window) also PASS_WITH_WARNINGS with 5 cross-batch
findings — recommends hygiene PBIs for write_csv_evidence duplication
(13 helpers) and _resolve_fixture_path duplication (13 scenarios), plus
new tickets for AZ-595 fixture builder + DNS-blackhole sidecar service.

Also adds _docs/LESSONS.md documenting the Jira transition-ID lesson
(always call getTransitionsForJiraIssue first, never memorize numeric
IDs across sessions).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:33:22 +03:00

174 lines
6.0 KiB
Python

"""NFT-SEC-04 probe — OpenCV CVE-2025-53644 no-crash (AZ-439 / RESTRICT-CVE-1).
Always-runs (Tier-1 OR Tier-2). The crafted ``cve-2025-53644.jpg`` is
fed to the SUT's nav-camera as a single frame and the FDR archive is
inspected:
* AC-1a: at least one FDR record exists strictly after the probe
injection (proves the SUT process did not crash);
* AC-1b: the FDR record matched within ``±tolerance_ms`` of the probe
is one of ``decode-success`` or ``frame-decode-error`` (proves the
SUT either decoded the crafted JPEG or gracefully rejected it).
The companion ≥4 h ASan fuzz lives in
``test_nft_sec_04_asan_fuzz.py`` and is release-gated.
Production dependencies surfaced:
* **AZ-595**: emit ``nft_sec_04_cve_probe.json`` carrying
``probe_injected_at_ms`` + the per-frame FDR record sequence the
runner captured;
* **SUT**: the SUT MUST honor its FDR per-frame outcome contract — a
silent drop is treated as a defense-bypass failure even when the
process does not crash.
Fixture JSON shape::
{
"probe_injected_at_ms": <int>,
"tolerance_ms": <int, optional, default 50>,
"fdr_records": [
{"monotonic_ms": <int>, "kind": <str>}, ...
]
}
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from runner.helpers import cve_probe_evaluator as cpe
# Name of the environment variable that may override the fixture location;
# read by ``_resolve_fixture_path``.
NFT_SEC_04_FIXTURE_ENV_VAR = "E2E_NFT_SEC_04_FIXTURE"
# File name looked up under the SITL replay directory when the override
# environment variable is unset or blank.
NFT_SEC_04_DEFAULT_FIXTURE_NAME = "nft_sec_04_cve_probe.json"
@pytest.mark.scenario_id("nft-sec-04")
@pytest.mark.traces_to("RESTRICT-CVE-1,AC-1,AC-4")
def test_nft_sec_04_opencv_cve_probe(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """SUT survives the crafted JPEG and records a deterministic outcome.

    Flow:

    1. Skip when ``sitl_replay_ready`` is false.
    2. Resolve the fixture path and fail if it is not a regular file.
    3. Load the fixture as UTF-8 JSON and validate it via ``_parse_payload``.
    4. Run ``cpe.evaluate`` and write the report as CSV evidence under
       ``evidence_dir / "nft-sec-04"``.
    5. Record two outcome metrics against AC-1, then assert AC-1a
       (no crash) and AC-1b (graceful outcome).

    ``run_id`` is requested as a fixture but not read by this body.
    """
    if not sitl_replay_ready:
        pytest.skip(
            "NFT-SEC-04 probe requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595) carrying the post-probe "
            "FDR record sequence. Pure no-crash / outcome-classification "
            "logic covered by "
            "e2e/_unit_tests/helpers/test_cve_probe_evaluator.py."
        )

    fixture_path = _resolve_fixture_path()
    if not fixture_path.is_file():
        pytest.fail(
            f"NFT-SEC-04 probe: fixture not found at {fixture_path}. "
            f"`{NFT_SEC_04_FIXTURE_ENV_VAR}` env var must point at a JSON "
            "file with the schema documented in the scenario docstring. "
            "Production dependency: AZ-595."
        )

    # Decode explicitly as UTF-8 so the result does not depend on the host
    # locale, and report malformed fixtures the same way every other fixture
    # shape problem is reported (pytest.fail with the path) instead of
    # letting a raw decode exception escape.
    try:
        payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        pytest.fail(
            f"NFT-SEC-04 probe: fixture {fixture_path} is not valid UTF-8 "
            f"JSON: {exc}"
        )

    probe_at_ms, tolerance_ms, fdr_records = _parse_payload(payload, fixture_path)
    report = cpe.evaluate(
        fdr_records,
        probe_injected_at_ms=probe_at_ms,
        tolerance_ms=tolerance_ms,
    )

    # Evidence and metrics are emitted before the assertions so they are
    # available even when the scenario fails.
    out_csv = (
        evidence_dir
        / "nft-sec-04"
        / f"{fc_adapter}-{vio_strategy}-probe.csv"
    )
    cpe.write_csv_evidence(out_csv, report)
    nfr_recorder.record_metric(
        "nft_sec_04.probe_outcome_is_decode_success",
        1.0 if report.probe_outcome is cpe.ProbeFrameOutcome.DECODE_SUCCESS else 0.0,
        ac_id="AC-1",
    )
    nfr_recorder.record_metric(
        "nft_sec_04.probe_outcome_is_graceful_error",
        1.0 if report.probe_outcome is cpe.ProbeFrameOutcome.FRAME_DECODE_ERROR else 0.0,
        ac_id="AC-1",
    )

    assert report.passes_no_crash, (
        f"AC-1a: SUT did not produce any FDR record after probe injection "
        f"at {report.probe_injected_at_ms} ms — process likely crashed. "
        f"last_fdr_record_at_ms={report.last_fdr_record_at_ms}."
    )
    assert report.passes_graceful_outcome, (
        f"AC-1b: SUT silently dropped the probe frame (no decode-success "
        f"or frame-decode-error in FDR within ±{tolerance_ms} ms of "
        f"probe injection at {report.probe_injected_at_ms} ms). Silent "
        f"drops are a defense-bypass failure even if the process did not "
        f"crash."
    )
def _resolve_fixture_path() -> Path:
    """Work out where the probe fixture JSON is expected to live.

    Resolution order:

    * If the override environment variable holds a non-blank value, use it;
      a relative value is anchored at the SITL replay directory when one is
      available, otherwise it is returned unchanged.
    * Otherwise use the default fixture name under the replay directory.
    * With neither an override nor a replay directory, return a placeholder
      path naming the unset variable; the caller's ``is_file()`` check is
      expected to reject it.
    """
    override = os.environ.get(NFT_SEC_04_FIXTURE_ENV_VAR, "").strip()

    # Imported at call time, as in the original layout of this module.
    from runner.helpers import sitl_observer

    replay_root = sitl_observer.replay_dir()

    if override:
        candidate = Path(override)
        if candidate.is_absolute() or replay_root is None:
            return candidate
        return replay_root / candidate

    if replay_root is not None:
        return replay_root / NFT_SEC_04_DEFAULT_FIXTURE_NAME
    return Path(f"<{NFT_SEC_04_FIXTURE_ENV_VAR}-unset>")
def _parse_payload(
payload: object, fixture_path: Path
) -> tuple[int, int, list[cpe.FdrSurvivalRecord]]:
if not isinstance(payload, dict):
pytest.fail(
f"NFT-SEC-04 probe: fixture {fixture_path} must be a JSON object; "
f"got top-level type={type(payload).__name__}"
)
try:
probe_at = int(payload["probe_injected_at_ms"])
tolerance = int(payload.get("tolerance_ms", 50))
except (KeyError, TypeError, ValueError) as exc:
pytest.fail(
f"NFT-SEC-04 probe: fixture {fixture_path} missing/invalid "
f"probe_injected_at_ms or tolerance_ms: {exc}"
)
raw_records = payload.get("fdr_records")
if not isinstance(raw_records, list):
pytest.fail(
f"NFT-SEC-04 probe: fixture {fixture_path} 'fdr_records' must be a list"
)
records: list[cpe.FdrSurvivalRecord] = []
for idx, entry in enumerate(raw_records):
if not isinstance(entry, dict):
pytest.fail(
f"NFT-SEC-04 probe: fdr_records[{idx}] in {fixture_path} "
f"must be an object"
)
try:
records.append(
cpe.FdrSurvivalRecord(
monotonic_ms=int(entry["monotonic_ms"]),
kind=str(entry["kind"]),
)
)
except (KeyError, TypeError, ValueError) as exc:
pytest.fail(
f"NFT-SEC-04 probe: fdr_records[{idx}] in {fixture_path} "
f"shape invalid: {exc}"
)
return probe_at, tolerance, records