"""NFT-SEC-05 — DNS-blackhole defense-in-depth (AZ-437 / AC-NEW-10, residual-risk #1). Tier-1 OR Tier-2. Even if ``e2e-net.internal: true`` is misconfigured, the DNS-blackhole sidecar MUST prevent DNS-based exfiltration. The runner executes a ``nslookup`` inside the SUT container's network namespace and asserts: * AC-2: the sidecar's health endpoint returns healthy; * AC-3a: the lookup *fails* (NXDOMAIN, timeout, "no servers can be reached", or any other documented failure outcome); * AC-3b: no UDP-53 packets cross the host's outbound interface during the probe. The combined verdict object is sourced from the SITL replay fixture (AZ-595) for the same reason NFT-SEC-02 is fixture-sourced: the live ``docker exec`` + host-interface-counter pipeline only exists inside the harness. Production dependency surfaced to AZ-595: fixture JSON shape { "sidecar_healthy": , "lookup_outcome": "nxdomain" | "timeout" | "no_servers_can_be_reached" | "other_failure" | "success", "before": {... egress snapshot ...}, "after": {... egress snapshot ...} } """ from __future__ import annotations import json import os from pathlib import Path import pytest from runner.helpers import egress_observer as eo NFT_SEC_05_FIXTURE_ENV_VAR = "E2E_NFT_SEC_05_FIXTURE" NFT_SEC_05_DEFAULT_FIXTURE_NAME = "nft_sec_05_dns_blackhole.json" @pytest.mark.scenario_id("nft-sec-05") @pytest.mark.traces_to("AC-NEW-10,AC-2,AC-3,AC-4") def test_nft_sec_05_dns_blackhole( fc_adapter: str, vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: """Sidecar healthy + lookup fails + UDP-53 silent.""" if not sitl_replay_ready: pytest.skip( "NFT-SEC-05 requires `E2E_SITL_REPLAY_DIR` to point at a " "prepared SITL replay fixture (AZ-595) carrying the DNS-probe " "outcome + UDP-53 counter snapshots. Pure verdict logic " "covered by e2e/_unit_tests/helpers/test_egress_observer.py." ) fixture_path = _resolve_fixture_path() if not fixture_path.is_file(): pytest.fail( f"NFT-SEC-05: fixture not found at {fixture_path}. " f"`{NFT_SEC_05_FIXTURE_ENV_VAR}` env var must point at a JSON " "file with the schema documented in the scenario docstring. " "Production dependency: AZ-595." ) payload = json.loads(fixture_path.read_text()) before, after, lookup_outcome, sidecar_healthy = _parse_payload( payload, fixture_path ) report = eo.evaluate_dns_blackhole( before, after, lookup_outcome=lookup_outcome, sidecar_healthy=sidecar_healthy, ) out_csv = ( evidence_dir / "nft-sec-05" / f"{fc_adapter}-{vio_strategy}.csv" ) eo.write_dns_blackhole_csv_evidence(out_csv, report) nfr_recorder.record_metric( "nft_sec_05.udp53_egress_delta", float(report.delta_udp53), ac_id="AC-3", ) assert report.sidecar_healthy, ( "AC-2: DNS blackhole sidecar reported unhealthy — defense-in-depth " "is unavailable; SUT egress isolation is the only layer protecting " "data residency." ) assert report.passes_lookup, ( f"AC-3a: nslookup outcome = {report.lookup_outcome.value} — DNS " "resolution must FAIL inside the SUT container (NXDOMAIN, timeout, " "no-servers, or other-failure). SUCCESS means an exfiltration " "path exists." ) assert report.passes_udp_silence, ( f"AC-3b: UDP-53 egress delta = {report.delta_udp53} packets " "during probe (budget = 0). Even a single packet leaving the " "host means the DNS-blackhole sidecar failed to absorb the probe." ) def _resolve_fixture_path() -> Path: raw = os.environ.get(NFT_SEC_05_FIXTURE_ENV_VAR, "").strip() from runner.helpers import sitl_observer root = sitl_observer.replay_dir() if not raw: if root is None: return Path(f"<{NFT_SEC_05_FIXTURE_ENV_VAR}-unset>") return root / NFT_SEC_05_DEFAULT_FIXTURE_NAME path = Path(raw) if not path.is_absolute() and root is not None: path = root / path return path def _parse_payload( payload: object, fixture_path: Path ) -> tuple[eo.EgressCounterSnapshot, eo.EgressCounterSnapshot, eo.DnsLookupOutcome, bool]: if not isinstance(payload, dict): pytest.fail( f"NFT-SEC-05: fixture {fixture_path} must be a JSON object; " f"got top-level type={type(payload).__name__}" ) try: sidecar_healthy = bool(payload["sidecar_healthy"]) outcome_raw = str(payload["lookup_outcome"]) try: lookup_outcome = eo.DnsLookupOutcome(outcome_raw) except ValueError as exc: pytest.fail( f"NFT-SEC-05: fixture {fixture_path} 'lookup_outcome' must " f"be one of " f"{sorted(o.value for o in eo.DnsLookupOutcome)}; got " f"{outcome_raw!r} ({exc})" ) before = _parse_snapshot(payload["before"], fixture_path, "before") after = _parse_snapshot(payload["after"], fixture_path, "after") except (KeyError, TypeError, ValueError) as exc: pytest.fail( f"NFT-SEC-05: fixture {fixture_path} shape invalid: {exc}" ) return before, after, lookup_outcome, sidecar_healthy def _parse_snapshot( raw: object, fixture_path: Path, label: str ) -> eo.EgressCounterSnapshot: if not isinstance(raw, dict): pytest.fail( f"NFT-SEC-05: fixture {fixture_path} '{label}' must be an object" ) return eo.EgressCounterSnapshot( egress_packets_to_internal_net=int(raw["egress_packets_to_internal_net"]), egress_packets_to_other_destinations=int( raw["egress_packets_to_other_destinations"] ), udp53_egress_packets=int(raw["udp53_egress_packets"]), )