Files
Oleksandr Bezdieniezhnykh c56d4584e6 [AZ-436] [AZ-437] [AZ-438] [AZ-439] Add NFT-SEC-01..05 security scenarios
Batch 87: 6 NFT-SEC blackbox scenarios + 5 helper evaluators + 75 unit
tests + cumulative review batches 85-87.

* AZ-436 NFT-SEC-01: cache-poisoning safety budget (AC-NEW-9); aggregate
  false_trust_count ≤ N×1e-6; zero-tolerance default. Canonical-only by
  default; E2E_NFT_SEC_01_RELEASE_GATE=1 unlocks full matrix.
* AZ-437 NFT-SEC-02 + NFT-SEC-05: shared egress-observation evaluator
  (AC-NEW-10); SEC-02 = 0 packets to non-e2e-net over 5min replay;
  SEC-05 = DNS-blackhole sidecar healthy + lookup fails + UDP-53 silent.
* AZ-438 NFT-SEC-03: AP-only signing rejection (AC-NEW-11); 3 sub-cases
  (unsigned/wrong-key/replayed) each reject ≤500ms + no position drift.
* AZ-439 NFT-SEC-04: probe (always-run) = no-crash + deterministic
  decode outcome; ASan-fuzz (release-gate) = 0 findings ≥4h; AC-3
  corpus floor informational only per spec.

Verdict per-batch: PASS_WITH_WARNINGS (5 Low). Cumulative review for
batches 85-87 (K=3 window) also PASS_WITH_WARNINGS with 5 cross-batch
findings — recommends hygiene PBIs for write_csv_evidence duplication
(13 helpers) and _resolve_fixture_path duplication (13 scenarios), plus
new tickets for AZ-595 fixture builder + DNS-blackhole sidecar service.

Also adds _docs/LESSONS.md documenting the Jira transition-ID lesson
(always call getTransitionsForJiraIssue first, never memorize numeric
IDs across sessions).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:33:22 +03:00

169 lines
4.4 KiB
Python

"""Unit tests for ``runner.helpers.egress_observer`` (NFT-SEC-02 + NFT-SEC-05 / AZ-437)."""
from __future__ import annotations
import csv
from pathlib import Path
import pytest
from runner.helpers import egress_observer as eo
def _snap(other: int = 0, internal: int = 0, udp53: int = 0) -> eo.EgressCounterSnapshot:
return eo.EgressCounterSnapshot(
egress_packets_to_internal_net=internal,
egress_packets_to_other_destinations=other,
udp53_egress_packets=udp53,
)
def test_egress_counter_rejects_negative_values() -> None:
with pytest.raises(ValueError, match="cannot be negative"):
eo.EgressCounterSnapshot(
egress_packets_to_internal_net=-1,
egress_packets_to_other_destinations=0,
udp53_egress_packets=0,
)
def test_no_egress_zero_delta_passes() -> None:
before = _snap(other=10, internal=5)
after = _snap(other=10, internal=42) # internal traffic grew; that's fine
report = eo.evaluate_no_egress(before, after, window_label="5min")
assert report.delta_other_destinations == 0
assert report.passes
def test_no_egress_nonzero_delta_fails() -> None:
before = _snap(other=10)
after = _snap(other=11)
report = eo.evaluate_no_egress(before, after, window_label="5min")
assert report.delta_other_destinations == 1
assert not report.passes
def test_no_egress_records_internal_delta_for_evidence() -> None:
before = _snap(internal=100)
after = _snap(internal=200)
report = eo.evaluate_no_egress(before, after, window_label="5min-derkachi")
assert report.delta_internal == 100 # informational; does not affect verdict
assert report.passes
def test_dns_blackhole_passes_on_full_silence_and_failed_lookup() -> None:
before = _snap(udp53=7)
after = _snap(udp53=7)
report = eo.evaluate_dns_blackhole(
before,
after,
lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN,
sidecar_healthy=True,
)
assert report.passes
def test_dns_blackhole_fails_on_successful_lookup() -> None:
before = _snap(udp53=7)
after = _snap(udp53=7)
report = eo.evaluate_dns_blackhole(
before,
after,
lookup_outcome=eo.DnsLookupOutcome.SUCCESS,
sidecar_healthy=True,
)
assert not report.passes_lookup
assert not report.passes
def test_dns_blackhole_fails_when_udp53_packets_escaped() -> None:
before = _snap(udp53=7)
after = _snap(udp53=8)
report = eo.evaluate_dns_blackhole(
before,
after,
lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN,
sidecar_healthy=True,
)
assert not report.passes_udp_silence
assert not report.passes
def test_dns_blackhole_fails_when_sidecar_unhealthy() -> None:
before = _snap()
after = _snap()
report = eo.evaluate_dns_blackhole(
before,
after,
lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN,
sidecar_healthy=False,
)
assert not report.passes
@pytest.mark.parametrize(
"outcome",
[
eo.DnsLookupOutcome.NXDOMAIN,
eo.DnsLookupOutcome.TIMEOUT,
eo.DnsLookupOutcome.NO_SERVERS,
eo.DnsLookupOutcome.OTHER_FAILURE,
],
)
def test_all_failure_outcomes_pass_lookup_check(outcome: eo.DnsLookupOutcome) -> None:
report = eo.evaluate_dns_blackhole(
_snap(),
_snap(),
lookup_outcome=outcome,
sidecar_healthy=True,
)
assert report.passes_lookup
def test_no_egress_csv_evidence_round_trip(tmp_path: Path) -> None:
before = _snap(other=0, internal=5)
after = _snap(other=0, internal=42)
report = eo.evaluate_no_egress(before, after, window_label="5min")
out = tmp_path / "out.csv"
eo.write_no_egress_csv_evidence(out, report)
with out.open() as fh:
rows = list(csv.reader(fh))
assert rows[0][0] == "window_label"
assert rows[1][0] == "5min"
assert rows[1][-1] == "true"
def test_dns_blackhole_csv_evidence_round_trip(tmp_path: Path) -> None:
report = eo.evaluate_dns_blackhole(
_snap(udp53=7),
_snap(udp53=7),
lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN,
sidecar_healthy=True,
)
out = tmp_path / "out.csv"
eo.write_dns_blackhole_csv_evidence(out, report)
with out.open() as fh:
rows = list(csv.reader(fh))
assert rows[0][0] == "sidecar_healthy"
assert rows[1][1] == "nxdomain"
assert rows[1][-1] == "true"