"""Cache-poisoning safety-budget evaluator for NFT-SEC-01 (AZ-436 / AC-NEW-9). The contract: across ``N`` synthetic flights — each carrying ``1-5 %`` of *poisoned* tiles (signing-cert mismatch, freshness violation, or voting disagreement) — the SUT MUST NOT silently emit a ``satellite_anchored`` estimate that traces back to a poisoned tile (a *false-trust event*). Aggregate budget (Mode B Fact #103): total_false_trust_events <= N * 1e-6 At default CI N=1000 the budget is 0.001 expected events; the test therefore enforces the strict ``count == 0`` zero-tolerance default. The ``release-gate`` N=10000 run keeps the same zero-tolerance default (the budget allows 0.01 events; one event is already a regression). Per-flight invariants additionally guarded by this evaluator: * poison ratio in ``[POISON_RATIO_MIN, POISON_RATIO_MAX]`` (AC-2); * at least one poisoned tile per **defense layer** in every flight (AC-2 — each layer must be exercised so a per-layer regression cannot hide behind layer-confusion); * per-event ``rejection_reason`` is one of the documented categories (so an unlabeled false-positive cannot pass as a "rejection" in CSV evidence). Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. The evaluator only consumes runner-collected counter objects. """ from __future__ import annotations import csv from dataclasses import dataclass, field from pathlib import Path from typing import Sequence FALSE_TRUST_BUDGET_PER_FLIGHT = 1e-6 POISON_RATIO_MIN = 0.01 POISON_RATIO_MAX = 0.05 DEFENSE_LAYER_SIGNING = "signing_cert_mismatch" DEFENSE_LAYER_FRESHNESS = "freshness_violation" DEFENSE_LAYER_VOTING = "voting_disagreement" DEFENSE_LAYERS: tuple[str, ...] = ( DEFENSE_LAYER_SIGNING, DEFENSE_LAYER_FRESHNESS, DEFENSE_LAYER_VOTING, ) REJECTION_REASONS: frozenset[str] = frozenset( { DEFENSE_LAYER_SIGNING, DEFENSE_LAYER_FRESHNESS, DEFENSE_LAYER_VOTING, "freshness_gate_downgrade", "service_voting_downgrade", } ) @dataclass(frozen=True) class PoisonedTileSpec: """One poisoned tile slotted into a flight's tile cache.""" tile_id: str defense_layer: str # MUST be one of ``DEFENSE_LAYERS`` @dataclass(frozen=True) class FalseTrustEvent: """A frame where the SUT emitted ``satellite_anchored`` traced to a poisoned tile.""" flight_id: str tile_id: str monotonic_ms: int defense_layer: str @dataclass(frozen=True) class FlightOutcome: """One synthetic-flight result. ``total_tile_count`` is the **cache size for that flight** (used to compute the poison ratio). ``poisoned_tiles`` is the slate of crafted tiles injected. ``false_trust_events`` are the runner-observed frames where the SUT trusted a poisoned tile. ``rejection_reasons`` is a counter of how often each documented rejection-reason fired, taken from the runner's outbound ``source_label`` capture + FDR signing-rejection events. Used in evidence only (no AC assertion); a flight with zero rejection events is suspicious but not necessarily a failure (the SUT may have downgraded the candidate without naming the cause). """ flight_id: str total_tile_count: int poisoned_tiles: Sequence[PoisonedTileSpec] false_trust_events: Sequence[FalseTrustEvent] rejection_reasons: dict[str, int] = field(default_factory=dict) @property def poison_ratio(self) -> float: if self.total_tile_count <= 0: return 0.0 return len(self.poisoned_tiles) / self.total_tile_count @property def defense_layers_present(self) -> set[str]: return {p.defense_layer for p in self.poisoned_tiles} @property def false_trust_count(self) -> int: return len(self.false_trust_events) def has_unknown_rejection_reasons(self) -> bool: return any(r not in REJECTION_REASONS for r in self.rejection_reasons) @dataclass(frozen=True) class CachePoisoningReport: """Aggregate verdict over N flights.""" flights: Sequence[FlightOutcome] flight_count: int total_false_trust: int flights_with_bad_poison_ratio: Sequence[str] flights_missing_defense_layers: Sequence[tuple[str, list[str]]] flights_with_unknown_rejection_reasons: Sequence[str] @property def budget(self) -> float: return self.flight_count * FALSE_TRUST_BUDGET_PER_FLIGHT @property def passes_budget(self) -> bool: # Zero-tolerance default: the budget at N=1000 is 0.001 expected # events; one observed event is already a regression even on a # purely floating-point reading of the budget. return self.total_false_trust == 0 @property def passes_ratio(self) -> bool: return len(self.flights_with_bad_poison_ratio) == 0 @property def passes_layer_coverage(self) -> bool: return len(self.flights_missing_defense_layers) == 0 @property def passes_rejection_reason_vocabulary(self) -> bool: return len(self.flights_with_unknown_rejection_reasons) == 0 @property def passes(self) -> bool: return ( self.passes_budget and self.passes_ratio and self.passes_layer_coverage and self.passes_rejection_reason_vocabulary ) def evaluate(flights: Sequence[FlightOutcome]) -> CachePoisoningReport: """Compute the aggregate AC-1..AC-3 verdict for one Monte Carlo run.""" bad_ratio: list[str] = [] missing_layers: list[tuple[str, list[str]]] = [] unknown_reasons: list[str] = [] total_false_trust = 0 for flight in flights: total_false_trust += flight.false_trust_count ratio = flight.poison_ratio if not (POISON_RATIO_MIN <= ratio <= POISON_RATIO_MAX): bad_ratio.append(flight.flight_id) missing = sorted(set(DEFENSE_LAYERS) - flight.defense_layers_present) if missing: missing_layers.append((flight.flight_id, missing)) if flight.has_unknown_rejection_reasons(): unknown_reasons.append(flight.flight_id) return CachePoisoningReport( flights=tuple(flights), flight_count=len(flights), total_false_trust=total_false_trust, flights_with_bad_poison_ratio=tuple(bad_ratio), flights_missing_defense_layers=tuple(missing_layers), flights_with_unknown_rejection_reasons=tuple(unknown_reasons), ) def write_csv_evidence(out_path: Path, report: CachePoisoningReport) -> Path: """Per-flight CSV — one row per flight + an aggregate footer row.""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "flight_id", "total_tile_count", "poisoned_tile_count", "poison_ratio", "defense_layers_present", "false_trust_count", "rejection_reason_breakdown", ] ) for flight in report.flights: layers_present = ",".join(sorted(flight.defense_layers_present)) or "" breakdown = ";".join( f"{reason}={count}" for reason, count in sorted(flight.rejection_reasons.items()) ) writer.writerow( [ flight.flight_id, flight.total_tile_count, len(flight.poisoned_tiles), f"{flight.poison_ratio:.4f}", layers_present, flight.false_trust_count, breakdown, ] ) writer.writerow([]) writer.writerow( [ "AGGREGATE", f"flight_count={report.flight_count}", f"total_false_trust={report.total_false_trust}", f"budget={report.budget:g}", f"passes_budget={'true' if report.passes_budget else 'false'}", f"passes_ratio={'true' if report.passes_ratio else 'false'}", f"passes_layer_coverage={'true' if report.passes_layer_coverage else 'false'}", ] ) return out_path