Files
Oleksandr Bezdieniezhnykh c56d4584e6 [AZ-436] [AZ-437] [AZ-438] [AZ-439] Add NFT-SEC-01..05 security scenarios
Batch 87: 6 NFT-SEC blackbox scenarios + 5 helper evaluators + 75 unit
tests + cumulative review batches 85-87.

* AZ-436 NFT-SEC-01: cache-poisoning safety budget (AC-NEW-9); aggregate
  false_trust_count ≤ N×1e-6; zero-tolerance default. Canonical-only by
  default; E2E_NFT_SEC_01_RELEASE_GATE=1 unlocks full matrix.
* AZ-437 NFT-SEC-02 + NFT-SEC-05: shared egress-observation evaluator
  (AC-NEW-10); SEC-02 = 0 packets to non-e2e-net over 5min replay;
  SEC-05 = DNS-blackhole sidecar healthy + lookup fails + UDP-53 silent.
* AZ-438 NFT-SEC-03: AP-only signing rejection (AC-NEW-11); 3 sub-cases
  (unsigned/wrong-key/replayed) each reject ≤500ms + no position drift.
* AZ-439 NFT-SEC-04: probe (always-run) = no-crash + deterministic
  decode outcome; ASan-fuzz (release-gate) = 0 findings ≥4h; AC-3
  corpus floor informational only per spec.

Verdict per-batch: PASS_WITH_WARNINGS (5 Low). Cumulative review for
batches 85-87 (K=3 window) also PASS_WITH_WARNINGS with 5 cross-batch
findings — recommends hygiene PBIs for write_csv_evidence duplication
(13 helpers) and _resolve_fixture_path duplication (13 scenarios), plus
new tickets for AZ-595 fixture builder + DNS-blackhole sidecar service.

Also adds _docs/LESSONS.md documenting the Jira transition-ID lesson
(always call getTransitionsForJiraIssue first, never memorize numeric
IDs across sessions).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:33:22 +03:00

147 lines
5.1 KiB
Python

"""NFT-SEC-02 — No-egress contract (AZ-437 / AC-NEW-10).
Tier-1 OR Tier-2. Over a 5-min Derkachi replay against
``e2e-net.internal: true``, ``docker network inspect e2e-net`` MUST show
zero packets from the SUT container to any non-``e2e-net`` destination.
The egress-counter snapshot pair is sourced from the SITL replay
fixture (AZ-595) since the live ``docker network inspect`` call requires
a running e2e-runner container with Docker-API access — which only
exists inside the harness, not on the developer workstation. The
scenario test therefore behaves identically to the other fixture-
consumer NFTs: skip cleanly without fixtures; parse + verdict + record
when fixtures are present.
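Production dependency surfaced to AZ-595: the fixture JSON must have the
following shape. ``window_label`` is optional (default
``"5min-derkachi-replay"``) and ``udp53_egress_packets`` is optional
(default ``0``); the other two counters are required in both snapshots.

    {
      "window_label": "<str>",
      "before": {"egress_packets_to_internal_net": <int>,
                 "egress_packets_to_other_destinations": <int>,
                 "udp53_egress_packets": <int>},
      "after": {... same shape as "before" ...}
    }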
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from runner.helpers import egress_observer as eo
# Name of the environment variable that overrides the fixture location.
# A relative value is resolved against the directory returned by
# ``sitl_observer.replay_dir()`` when that directory is available.
NFT_SEC_02_FIXTURE_ENV_VAR = "E2E_NFT_SEC_02_FIXTURE"
# File name looked up inside the replay directory when the environment
# variable above is unset or blank.
NFT_SEC_02_DEFAULT_FIXTURE_NAME = "nft_sec_02_no_egress.json"
@pytest.mark.scenario_id("nft-sec-02")
@pytest.mark.traces_to("AC-NEW-10,AC-1,AC-4")
def test_nft_sec_02_no_egress(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """AC-1: 0 packets to non-e2e-net during the 5-min replay window.

    Flow:
      1. Skip when ``sitl_replay_ready`` is false.
      2. Resolve the fixture path; fail when it is not a regular file.
      3. Read the fixture as UTF-8 JSON; fail with a scenario-prefixed
         message when it cannot be read or decoded.
      4. Parse the before/after snapshots and evaluate them with
         ``eo.evaluate_no_egress``.
      5. Write CSV evidence under
         ``<evidence_dir>/nft-sec-02/<fc_adapter>-<vio_strategy>.csv`` and
         record both packet deltas as AC-1 metrics.
      6. Assert on ``report.passes``.

    ``run_id`` is requested as a fixture but not read by this function.
    """
    if not sitl_replay_ready:
        pytest.skip(
            "NFT-SEC-02 requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595) carrying the Docker "
            "network-stats before/after snapshots. Pure delta-verdict "
            "logic covered by "
            "e2e/_unit_tests/helpers/test_egress_observer.py."
        )

    fixture_path = _resolve_fixture_path()
    if not fixture_path.is_file():
        pytest.fail(
            f"NFT-SEC-02: fixture not found at {fixture_path}. "
            f"`{NFT_SEC_02_FIXTURE_ENV_VAR}` env var must point at a JSON "
            "file with the schema documented in the scenario docstring. "
            "Production dependency: AZ-595."
        )

    # Decode explicitly as UTF-8 so the result does not depend on the host
    # locale. JSONDecodeError and UnicodeDecodeError are both ValueError
    # subclasses; OSError covers a file that became unreadable after the
    # is_file() check. All of them become a regular test failure, matching
    # how the other fixture problems in this module are reported.
    try:
        payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        pytest.fail(
            f"NFT-SEC-02: fixture {fixture_path} could not be read as "
            f"UTF-8 JSON: {exc}"
        )

    before, after, window_label = _parse_payload(payload, fixture_path)
    report = eo.evaluate_no_egress(before, after, window_label=window_label)

    # One evidence file per (flight-controller adapter, VIO strategy) pair.
    out_csv = (
        evidence_dir
        / "nft-sec-02"
        / f"{fc_adapter}-{vio_strategy}.csv"
    )
    eo.write_no_egress_csv_evidence(out_csv, report)

    # Metrics are recorded before the assertion so they are captured even
    # when the verdict is a failure.
    nfr_recorder.record_metric(
        "nft_sec_02.egress_packets_to_other_destinations_delta",
        float(report.delta_other_destinations),
        ac_id="AC-1",
    )
    nfr_recorder.record_metric(
        "nft_sec_02.egress_packets_to_internal_net_delta",
        float(report.delta_internal),
        ac_id="AC-1",
    )

    assert report.passes, (
        f"AC-1: SUT container egressed {report.delta_other_destinations} "
        f"packets to non-e2e-net destinations during window "
        f"'{report.window_label}' (budget = 0). "
        f"before={report.before.egress_packets_to_other_destinations}, "
        f"after={report.after.egress_packets_to_other_destinations}"
    )
def _resolve_fixture_path() -> Path:
    """Return the path at which the NFT-SEC-02 fixture is expected.

    Resolution rules:
      * Environment variable set (non-blank after stripping): use it as a
        path. A relative value is joined onto the replay directory when one
        is available; otherwise it is returned unchanged.
      * Environment variable unset or blank: use the default fixture name
        inside the replay directory, or a placeholder path of the form
        ``<ENV_VAR-unset>`` when there is no replay directory either.

    The returned path is not checked for existence here.
    """
    override = os.environ.get(NFT_SEC_02_FIXTURE_ENV_VAR, "").strip()

    # NOTE(review): function-scope import; presumably avoids an import cycle
    # or import-time cost at collection time — confirm before hoisting.
    from runner.helpers import sitl_observer

    # Assumes replay_dir() returns a Path-like object or None — TODO confirm.
    replay_root = sitl_observer.replay_dir()

    if override:
        candidate = Path(override)
        if candidate.is_absolute() or replay_root is None:
            return candidate
        return replay_root / candidate

    if replay_root is not None:
        return replay_root / NFT_SEC_02_DEFAULT_FIXTURE_NAME

    # Placeholder that names the missing variable; it is returned as a Path
    # so the caller's is_file() check handles it like any other candidate.
    return Path(f"<{NFT_SEC_02_FIXTURE_ENV_VAR}-unset>")
def _parse_payload(
    payload: object, fixture_path: Path
) -> tuple[eo.EgressCounterSnapshot, eo.EgressCounterSnapshot, str]:
    """Split a decoded fixture into ``(before, after, window_label)``.

    Args:
        payload: The decoded JSON document; must be a ``dict``.
        fixture_path: Used only to build failure messages.

    Returns:
        The "before" snapshot, the "after" snapshot, and the window label
        (``"5min-derkachi-replay"`` when the fixture does not supply one).

    Fails the test via ``pytest.fail`` when the top level is not a dict,
    or when parsing either snapshot raises ``KeyError``, ``TypeError`` or
    ``ValueError``.
    """
    if not isinstance(payload, dict):
        type_name = type(payload).__name__
        pytest.fail(
            f"NFT-SEC-02: fixture {fixture_path} must be a JSON object; "
            f"got top-level type={type_name}"
        )

    label = str(payload.get("window_label", "5min-derkachi-replay"))

    try:
        # "before" is looked up and parsed first, then "after".
        snapshots = [
            _parse_snapshot(payload[key], fixture_path, key)
            for key in ("before", "after")
        ]
    except (KeyError, TypeError, ValueError) as exc:
        pytest.fail(
            f"NFT-SEC-02: fixture {fixture_path} snapshot shape invalid: {exc}"
        )

    first, second = snapshots
    return first, second, label
def _parse_snapshot(
    raw: object, fixture_path: Path, label: str
) -> eo.EgressCounterSnapshot:
    """Build an ``EgressCounterSnapshot`` from one fixture sub-object.

    Args:
        raw: The value stored under ``label`` in the fixture; must be a dict.
        fixture_path: Used only to build the failure message.
        label: Name of the snapshot as quoted in the failure message.

    Returns:
        A snapshot whose three counters have been converted with ``int()``.
        ``udp53_egress_packets`` falls back to ``0`` when absent.

    Raises:
        KeyError: A required counter key is missing.
        TypeError, ValueError: A counter value is rejected by ``int()``.

    Fails the test via ``pytest.fail`` when ``raw`` is not a dict.
    """
    if not isinstance(raw, dict):
        pytest.fail(
            f"NFT-SEC-02: fixture {fixture_path} '{label}' must be an object"
        )

    # The two required counters are read with indexing so that a missing key
    # raises; the UDP-53 counter is optional.
    internal_count = int(raw["egress_packets_to_internal_net"])
    other_count = int(raw["egress_packets_to_other_destinations"])
    udp53_count = int(raw.get("udp53_egress_packets", 0))

    return eo.EgressCounterSnapshot(
        egress_packets_to_internal_net=internal_count,
        egress_packets_to_other_destinations=other_count,
        udp53_egress_packets=udp53_count,
    )