"""NFT-SEC-02 — No-egress contract (AZ-437 / AC-NEW-10). Tier-1 OR Tier-2. Over a 5-min Derkachi replay against ``e2e-net.internal: true``, ``docker network inspect e2e-net`` MUST show zero packets from the SUT container to any non-``e2e-net`` destination. The egress-counter snapshot pair is sourced from the SITL replay fixture (AZ-595) since the live ``docker network inspect`` call requires a running e2e-runner container with Docker-API access — which only exists inside the harness, not on the developer workstation. The scenario test therefore behaves identically to the other fixture- consumer NFTs: skip cleanly without fixtures; parse + verdict + record when fixtures are present. Production dependency surfaced to AZ-595: fixture JSON shape { "window_label": "", "before": {"egress_packets_to_internal_net": , "egress_packets_to_other_destinations": , "udp53_egress_packets": }, "after": {... same shape ...} } """ from __future__ import annotations import json import os from pathlib import Path import pytest from runner.helpers import egress_observer as eo NFT_SEC_02_FIXTURE_ENV_VAR = "E2E_NFT_SEC_02_FIXTURE" NFT_SEC_02_DEFAULT_FIXTURE_NAME = "nft_sec_02_no_egress.json" @pytest.mark.scenario_id("nft-sec-02") @pytest.mark.traces_to("AC-NEW-10,AC-1,AC-4") def test_nft_sec_02_no_egress( fc_adapter: str, vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: """AC-1: 0 packets to non-e2e-net during the 5-min replay window.""" if not sitl_replay_ready: pytest.skip( "NFT-SEC-02 requires `E2E_SITL_REPLAY_DIR` to point at a " "prepared SITL replay fixture (AZ-595) carrying the Docker " "network-stats before/after snapshots. Pure delta-verdict " "logic covered by " "e2e/_unit_tests/helpers/test_egress_observer.py." ) fixture_path = _resolve_fixture_path() if not fixture_path.is_file(): pytest.fail( f"NFT-SEC-02: fixture not found at {fixture_path}. " f"`{NFT_SEC_02_FIXTURE_ENV_VAR}` env var must point at a JSON " "file with the schema documented in the scenario docstring. " "Production dependency: AZ-595." ) payload = json.loads(fixture_path.read_text()) before, after, window_label = _parse_payload(payload, fixture_path) report = eo.evaluate_no_egress(before, after, window_label=window_label) out_csv = ( evidence_dir / "nft-sec-02" / f"{fc_adapter}-{vio_strategy}.csv" ) eo.write_no_egress_csv_evidence(out_csv, report) nfr_recorder.record_metric( "nft_sec_02.egress_packets_to_other_destinations_delta", float(report.delta_other_destinations), ac_id="AC-1", ) nfr_recorder.record_metric( "nft_sec_02.egress_packets_to_internal_net_delta", float(report.delta_internal), ac_id="AC-1", ) assert report.passes, ( f"AC-1: SUT container egressed {report.delta_other_destinations} " f"packets to non-e2e-net destinations during window " f"'{report.window_label}' (budget = 0). " f"before={report.before.egress_packets_to_other_destinations}, " f"after={report.after.egress_packets_to_other_destinations}" ) def _resolve_fixture_path() -> Path: raw = os.environ.get(NFT_SEC_02_FIXTURE_ENV_VAR, "").strip() from runner.helpers import sitl_observer root = sitl_observer.replay_dir() if not raw: if root is None: return Path(f"<{NFT_SEC_02_FIXTURE_ENV_VAR}-unset>") return root / NFT_SEC_02_DEFAULT_FIXTURE_NAME path = Path(raw) if not path.is_absolute() and root is not None: path = root / path return path def _parse_payload( payload: object, fixture_path: Path ) -> tuple[eo.EgressCounterSnapshot, eo.EgressCounterSnapshot, str]: if not isinstance(payload, dict): pytest.fail( f"NFT-SEC-02: fixture {fixture_path} must be a JSON object; " f"got top-level type={type(payload).__name__}" ) window_label = str(payload.get("window_label", "5min-derkachi-replay")) try: before = _parse_snapshot(payload["before"], fixture_path, "before") after = _parse_snapshot(payload["after"], fixture_path, "after") except (KeyError, TypeError, ValueError) as exc: pytest.fail( f"NFT-SEC-02: fixture {fixture_path} snapshot shape invalid: {exc}" ) return before, after, window_label def _parse_snapshot( raw: object, fixture_path: Path, label: str ) -> eo.EgressCounterSnapshot: if not isinstance(raw, dict): pytest.fail( f"NFT-SEC-02: fixture {fixture_path} '{label}' must be an object" ) return eo.EgressCounterSnapshot( egress_packets_to_internal_net=int(raw["egress_packets_to_internal_net"]), egress_packets_to_other_destinations=int( raw["egress_packets_to_other_destinations"] ), udp53_egress_packets=int(raw.get("udp53_egress_packets", 0)), )