"""Spoofing-promotion latency evaluator for NFT-PERF-04 (AZ-431 / AC-NEW-2). Per AC-NEW-2 the time from a blackout+spoof event to the SUT correctly labeling its emission ``dead_reckoned`` must satisfy ``p95(latency) ≤ SPOOF_PROMOTION_BUDGET_MS`` (=600 ms). The scenario test gathers N≥``MIN_EVENT_COUNT`` events at randomized window starts (the random sampling is owned by the fixture builder — AZ-431 is statistical, FT-N-04 / AZ-426 is functional), measures the per-event ``t_label_switch_to_dead_reckoned − t_blackout_onset``, and runs the aggregate p95 check via ``evaluate``. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. Reuses ``runner.helpers.streaming_evaluator._percentile`` for the linear- interpolation p95 — both NFT-PERF tests measure latencies as the same shape of distribution. """ from __future__ import annotations import csv from dataclasses import dataclass from pathlib import Path from typing import Sequence from .streaming_evaluator import _percentile # AC-NEW-2 budget — 600 ms on Tier-1 or Tier-2. SPOOF_PROMOTION_BUDGET_MS = 600.0 # Statistical confidence floor — AZ-431 spec sets N=20 as default. MIN_EVENT_COUNT = 20 DEAD_RECKONED_LABEL = "dead_reckoned" @dataclass(frozen=True) class OutboundLabelSample: """One SUT outbound emission projected for AC-NEW-2.""" monotonic_ms: int source_label: str @dataclass(frozen=True) class SpoofEvent: """One blackout+spoof event and the labels observed afterwards. ``samples`` should cover at least the window starting at ``blackout_onset_ms`` and extending past the expected first ``dead_reckoned`` emission. The evaluator scans them in order. """ event_id: str blackout_onset_ms: int samples: Sequence[OutboundLabelSample] @dataclass(frozen=True) class EventLatencyReport: """Per-event latency outcome. ``latency_ms`` is ``None`` when no ``dead_reckoned`` emission was observed after ``blackout_onset_ms`` — that's a categorical miss (treated as a budget breach for the aggregate verdict). """ event_id: str blackout_onset_ms: int first_dead_reckoned_ms: int | None latency_ms: int | None @property def has_promotion(self) -> bool: return self.first_dead_reckoned_ms is not None @dataclass(frozen=True) class SpoofPromotionReport: """Aggregate NFT-PERF-04 result over N events.""" events: tuple[EventLatencyReport, ...] p50_ms: float | None p95_ms: float | None p99_ms: float | None max_ms: float | None missing_promotions: int min_event_count: int budget_ms: float @property def event_count(self) -> int: return len(self.events) @property def passes_event_count(self) -> bool: return self.event_count >= self.min_event_count @property def passes_p95(self) -> bool: return ( self.missing_promotions == 0 and self.p95_ms is not None and self.p95_ms <= self.budget_ms ) @property def passes(self) -> bool: return self.passes_event_count and self.passes_p95 def measure_event_latency(event: SpoofEvent) -> EventLatencyReport: """Compute promotion latency for one event. Walks ``event.samples`` in ascending ``monotonic_ms``, finds the first sample with ``source_label == "dead_reckoned"`` AND ``monotonic_ms >= blackout_onset_ms``, and returns ``first_dead_reckoned_ms − blackout_onset_ms``. Returns ``None`` for both ``first_dead_reckoned_ms`` and ``latency_ms`` if no such sample exists. """ ordered = sorted(event.samples, key=lambda s: s.monotonic_ms) for s in ordered: if s.monotonic_ms < event.blackout_onset_ms: continue if s.source_label == DEAD_RECKONED_LABEL: return EventLatencyReport( event_id=event.event_id, blackout_onset_ms=event.blackout_onset_ms, first_dead_reckoned_ms=int(s.monotonic_ms), latency_ms=int(s.monotonic_ms - event.blackout_onset_ms), ) return EventLatencyReport( event_id=event.event_id, blackout_onset_ms=event.blackout_onset_ms, first_dead_reckoned_ms=None, latency_ms=None, ) def evaluate( events: Sequence[SpoofEvent], *, budget_ms: float = SPOOF_PROMOTION_BUDGET_MS, min_event_count: int = MIN_EVENT_COUNT, ) -> SpoofPromotionReport: """AC-1 (N events sampled) + AC-2 (p95 latency ≤ budget).""" per_event = tuple(measure_event_latency(e) for e in events) valid = [r.latency_ms for r in per_event if r.latency_ms is not None] missing = sum(1 for r in per_event if not r.has_promotion) return SpoofPromotionReport( events=per_event, p50_ms=_percentile(valid, 50.0), p95_ms=_percentile(valid, 95.0), p99_ms=_percentile(valid, 99.0), max_ms=max(valid) if valid else None, missing_promotions=missing, min_event_count=min_event_count, budget_ms=budget_ms, ) def write_csv_evidence(out_path: Path, report: SpoofPromotionReport) -> Path: """Aggregate-summary CSV (one row per run).""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "event_count", "min_event_count", "missing_promotions", "p50_ms", "p95_ms", "p99_ms", "max_ms", "budget_ms", "ac1_passes", "ac2_passes", "passes", ] ) writer.writerow( [ report.event_count, report.min_event_count, report.missing_promotions, "" if report.p50_ms is None else f"{report.p50_ms:.3f}", "" if report.p95_ms is None else f"{report.p95_ms:.3f}", "" if report.p99_ms is None else f"{report.p99_ms:.3f}", "" if report.max_ms is None else f"{report.max_ms:.3f}", f"{report.budget_ms:.3f}", "true" if report.passes_event_count else "false", "true" if report.passes_p95 else "false", "true" if report.passes else "false", ] ) return out_path def write_per_event_csv(out_path: Path, report: SpoofPromotionReport) -> Path: """Detail CSV: one row per event with onset / first-dead-reckoned / latency.""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "event_id", "blackout_onset_ms", "first_dead_reckoned_ms", "latency_ms", ] ) for r in report.events: writer.writerow( [ r.event_id, r.blackout_onset_ms, "" if r.first_dead_reckoned_ms is None else r.first_dead_reckoned_ms, "" if r.latency_ms is None else r.latency_ms, ] ) return out_path