"""Unit tests for ``runner.helpers.egress_observer`` (NFT-SEC-02 + NFT-SEC-05 / AZ-437).""" from __future__ import annotations import csv from pathlib import Path import pytest from runner.helpers import egress_observer as eo def _snap(other: int = 0, internal: int = 0, udp53: int = 0) -> eo.EgressCounterSnapshot: return eo.EgressCounterSnapshot( egress_packets_to_internal_net=internal, egress_packets_to_other_destinations=other, udp53_egress_packets=udp53, ) def test_egress_counter_rejects_negative_values() -> None: with pytest.raises(ValueError, match="cannot be negative"): eo.EgressCounterSnapshot( egress_packets_to_internal_net=-1, egress_packets_to_other_destinations=0, udp53_egress_packets=0, ) def test_no_egress_zero_delta_passes() -> None: before = _snap(other=10, internal=5) after = _snap(other=10, internal=42) # internal traffic grew; that's fine report = eo.evaluate_no_egress(before, after, window_label="5min") assert report.delta_other_destinations == 0 assert report.passes def test_no_egress_nonzero_delta_fails() -> None: before = _snap(other=10) after = _snap(other=11) report = eo.evaluate_no_egress(before, after, window_label="5min") assert report.delta_other_destinations == 1 assert not report.passes def test_no_egress_records_internal_delta_for_evidence() -> None: before = _snap(internal=100) after = _snap(internal=200) report = eo.evaluate_no_egress(before, after, window_label="5min-derkachi") assert report.delta_internal == 100 # informational; does not affect verdict assert report.passes def test_dns_blackhole_passes_on_full_silence_and_failed_lookup() -> None: before = _snap(udp53=7) after = _snap(udp53=7) report = eo.evaluate_dns_blackhole( before, after, lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN, sidecar_healthy=True, ) assert report.passes def test_dns_blackhole_fails_on_successful_lookup() -> None: before = _snap(udp53=7) after = _snap(udp53=7) report = eo.evaluate_dns_blackhole( before, after, lookup_outcome=eo.DnsLookupOutcome.SUCCESS, sidecar_healthy=True, ) assert not report.passes_lookup assert not report.passes def test_dns_blackhole_fails_when_udp53_packets_escaped() -> None: before = _snap(udp53=7) after = _snap(udp53=8) report = eo.evaluate_dns_blackhole( before, after, lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN, sidecar_healthy=True, ) assert not report.passes_udp_silence assert not report.passes def test_dns_blackhole_fails_when_sidecar_unhealthy() -> None: before = _snap() after = _snap() report = eo.evaluate_dns_blackhole( before, after, lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN, sidecar_healthy=False, ) assert not report.passes @pytest.mark.parametrize( "outcome", [ eo.DnsLookupOutcome.NXDOMAIN, eo.DnsLookupOutcome.TIMEOUT, eo.DnsLookupOutcome.NO_SERVERS, eo.DnsLookupOutcome.OTHER_FAILURE, ], ) def test_all_failure_outcomes_pass_lookup_check(outcome: eo.DnsLookupOutcome) -> None: report = eo.evaluate_dns_blackhole( _snap(), _snap(), lookup_outcome=outcome, sidecar_healthy=True, ) assert report.passes_lookup def test_no_egress_csv_evidence_round_trip(tmp_path: Path) -> None: before = _snap(other=0, internal=5) after = _snap(other=0, internal=42) report = eo.evaluate_no_egress(before, after, window_label="5min") out = tmp_path / "out.csv" eo.write_no_egress_csv_evidence(out, report) with out.open() as fh: rows = list(csv.reader(fh)) assert rows[0][0] == "window_label" assert rows[1][0] == "5min" assert rows[1][-1] == "true" def test_dns_blackhole_csv_evidence_round_trip(tmp_path: Path) -> None: report = eo.evaluate_dns_blackhole( _snap(udp53=7), _snap(udp53=7), lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN, sidecar_healthy=True, ) out = tmp_path / "out.csv" eo.write_dns_blackhole_csv_evidence(out, report) with out.open() as fh: rows = list(csv.reader(fh)) assert rows[0][0] == "sidecar_healthy" assert rows[1][1] == "nxdomain" assert rows[1][-1] == "true"