"""Egress-observation evaluator shared by NFT-SEC-02 and NFT-SEC-05 (AZ-437). Both scenarios verify the same invariant: **no packets leave the e2e Docker network** from the SUT container. NFT-SEC-02 reads the Docker network-stats counter over a 5-min Derkachi replay. NFT-SEC-05 runs a ``nslookup`` probe inside the SUT container's network namespace and checks that (a) the lookup fails and (b) no UDP-53 packets escape the host's outbound interface during the probe. The observation pattern is identical in both cases: take a *before* counter snapshot, run the workload, take an *after* snapshot, assert ``after - before == 0`` for the relevant counter family. The runner is responsible for the actual ``docker network inspect`` / ``ip -s link`` collection; this helper only performs the delta + verdict logic so the scenario code stays tight and the verdict logic is unit-testable in isolation. DNS-resolution outcome categories follow the spec's wording (NXDOMAIN, timeout, "no servers can be reached") + a generic "other failure" bucket for resolver implementations that emit a different string but still fail. A *success* outcome — i.e. an actual A record returned — is the only failing case. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import csv from dataclasses import dataclass from enum import Enum from pathlib import Path class DnsLookupOutcome(str, Enum): """The runner-classified outcome of a ``nslookup`` probe.""" NXDOMAIN = "nxdomain" TIMEOUT = "timeout" NO_SERVERS = "no_servers_can_be_reached" OTHER_FAILURE = "other_failure" SUCCESS = "success" # the only outcome that fails the AC FAILING_DNS_OUTCOMES: frozenset[DnsLookupOutcome] = frozenset( { DnsLookupOutcome.NXDOMAIN, DnsLookupOutcome.TIMEOUT, DnsLookupOutcome.NO_SERVERS, DnsLookupOutcome.OTHER_FAILURE, } ) @dataclass(frozen=True) class EgressCounterSnapshot: """One snapshot of egress-byte / packet counters on the SUT-facing interface.""" egress_packets_to_internal_net: int egress_packets_to_other_destinations: int udp53_egress_packets: int def __post_init__(self) -> None: for field_name in ( "egress_packets_to_internal_net", "egress_packets_to_other_destinations", "udp53_egress_packets", ): value = getattr(self, field_name) if value < 0: raise ValueError( f"egress counter {field_name!r} cannot be negative; got {value}" ) @dataclass(frozen=True) class NoEgressReport: """NFT-SEC-02 verdict — zero packets to non-internal destinations during the window.""" before: EgressCounterSnapshot after: EgressCounterSnapshot window_label: str # e.g. "5min-derkachi-replay" @property def delta_other_destinations(self) -> int: return ( self.after.egress_packets_to_other_destinations - self.before.egress_packets_to_other_destinations ) @property def delta_internal(self) -> int: return ( self.after.egress_packets_to_internal_net - self.before.egress_packets_to_internal_net ) @property def passes(self) -> bool: return self.delta_other_destinations == 0 @dataclass(frozen=True) class DnsBlackholeReport: """NFT-SEC-05 verdict — lookup fails AND no UDP-53 packets escape.""" before: EgressCounterSnapshot after: EgressCounterSnapshot lookup_outcome: DnsLookupOutcome sidecar_healthy: bool @property def delta_udp53(self) -> int: return self.after.udp53_egress_packets - self.before.udp53_egress_packets @property def passes_lookup(self) -> bool: return self.lookup_outcome in FAILING_DNS_OUTCOMES @property def passes_udp_silence(self) -> bool: return self.delta_udp53 == 0 @property def passes(self) -> bool: return ( self.sidecar_healthy and self.passes_lookup and self.passes_udp_silence ) def evaluate_no_egress( before: EgressCounterSnapshot, after: EgressCounterSnapshot, *, window_label: str, ) -> NoEgressReport: """AC-1 verdict for NFT-SEC-02.""" return NoEgressReport(before=before, after=after, window_label=window_label) def evaluate_dns_blackhole( before: EgressCounterSnapshot, after: EgressCounterSnapshot, *, lookup_outcome: DnsLookupOutcome, sidecar_healthy: bool, ) -> DnsBlackholeReport: """AC-2 + AC-3 verdict for NFT-SEC-05.""" return DnsBlackholeReport( before=before, after=after, lookup_outcome=lookup_outcome, sidecar_healthy=sidecar_healthy, ) def write_no_egress_csv_evidence(out_path: Path, report: NoEgressReport) -> Path: out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "window_label", "before_other", "after_other", "delta_other", "before_internal", "after_internal", "delta_internal", "passes", ] ) writer.writerow( [ report.window_label, report.before.egress_packets_to_other_destinations, report.after.egress_packets_to_other_destinations, report.delta_other_destinations, report.before.egress_packets_to_internal_net, report.after.egress_packets_to_internal_net, report.delta_internal, "true" if report.passes else "false", ] ) return out_path def write_dns_blackhole_csv_evidence( out_path: Path, report: DnsBlackholeReport ) -> Path: out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "sidecar_healthy", "lookup_outcome", "passes_lookup", "before_udp53", "after_udp53", "delta_udp53", "passes_udp_silence", "passes", ] ) writer.writerow( [ "true" if report.sidecar_healthy else "false", report.lookup_outcome.value, "true" if report.passes_lookup else "false", report.before.udp53_egress_packets, report.after.udp53_egress_packets, report.delta_udp53, "true" if report.passes_udp_silence else "false", "true" if report.passes else "false", ] ) return out_path