"""End-to-end latency evaluator for NFT-PERF-01 (AZ-428 / AC-4.1).

D-CROSS-LATENCY-1 fixes a hard p95 budget of 400 ms across two
configurations:

* (a) K=3 baseline at +25 °C ambient.
* (b) K=2 + Jacobian-cov hybrid auto-degrade at +50 °C ambient.

This module owns the pure-logic side: distribution stats, frame-drop
accounting (AC-4), and informational per-stage partition recording
(AC-5). It does NOT import anything from ``src/gps_denied_onboard``.
"""

from __future__ import annotations

import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence

from .streaming_evaluator import _percentile

LATENCY_P95_BUDGET_MS = 400.0
FRAME_DROP_RATIO_BUDGET = 0.10
DEFAULT_EXPECTED_FRAMES = 900  # 3 Hz × 300 s


@dataclass(frozen=True)
class FrameLatencySample:
    """One frame: ``(t_capture_ms, t_emit_at_sitl_ms)`` → latency_ms."""

    frame_id: str
    t_capture_ms: int
    t_emit_at_sitl_ms: int

    @property
    def latency_ms(self) -> float:
        """Emit timestamp minus capture timestamp, as a float."""
        return float(self.t_emit_at_sitl_ms - self.t_capture_ms)


@dataclass(frozen=True)
class StagePartition:
    """Per-stage informational latency record (AC-5 — no hard threshold)."""

    stage_name: str
    p50_ms: float | None
    p95_ms: float | None
    p99_ms: float | None
    sample_count: int


@dataclass(frozen=True)
class LatencyReport:
    """Aggregate verdict for ONE configuration."""

    config_id: str  # "k3-25c" / "k2-hybrid-50c"
    samples: tuple[FrameLatencySample, ...]
    expected_frame_count: int
    p50_ms: float | None
    p95_ms: float | None
    p99_ms: float | None
    max_ms: float | None
    frame_drop_ratio: float
    stage_partitions: tuple[StagePartition, ...]
    p95_budget_ms: float
    frame_drop_budget: float
    chamber_unavailable: bool

    @property
    def sample_count(self) -> int:
        """Number of frame samples carried by this report."""
        return len(self.samples)

    @property
    def passes_p95(self) -> bool:
        """True when a p95 exists and is within ``p95_budget_ms``.

        A missing p95 (``None``) counts as a failure.
        """
        return self.p95_ms is not None and self.p95_ms <= self.p95_budget_ms

    @property
    def passes_frame_drop(self) -> bool:
        """True when ``frame_drop_ratio`` is within ``frame_drop_budget``."""
        return self.frame_drop_ratio <= self.frame_drop_budget

    @property
    def passes(self) -> bool:
        """True only when both the p95 and the frame-drop checks pass."""
        return self.passes_p95 and self.passes_frame_drop


def measure_frame(
    frame_id: str, *, t_capture_ms: int, t_emit_at_sitl_ms: int
) -> FrameLatencySample:
    """Project a captured frame into a typed sample.

    Negative latency is fixture-shape error → fail-loud.

    Args:
        frame_id: Identifier stored on the sample and quoted in the error.
        t_capture_ms: Capture timestamp in milliseconds.
        t_emit_at_sitl_ms: Emit-at-SITL timestamp in milliseconds.

    Returns:
        A ``FrameLatencySample`` with both timestamps coerced via ``int()``.

    Raises:
        ValueError: If ``t_emit_at_sitl_ms`` precedes ``t_capture_ms``.
    """
    if t_emit_at_sitl_ms < t_capture_ms:
        raise ValueError(
            f"latency frame {frame_id}: t_emit_at_sitl_ms "
            f"({t_emit_at_sitl_ms}) precedes t_capture_ms "
            f"({t_capture_ms}); fixture shape invalid"
        )
    return FrameLatencySample(
        frame_id=frame_id,
        t_capture_ms=int(t_capture_ms),
        t_emit_at_sitl_ms=int(t_emit_at_sitl_ms),
    )


def evaluate(
    config_id: str,
    samples: Sequence[FrameLatencySample],
    stage_samples: dict[str, Sequence[float]] | None = None,
    *,
    expected_frame_count: int = DEFAULT_EXPECTED_FRAMES,
    p95_budget_ms: float = LATENCY_P95_BUDGET_MS,
    frame_drop_budget: float = FRAME_DROP_RATIO_BUDGET,
    chamber_unavailable: bool = False,
) -> LatencyReport:
    """Aggregate ``samples`` (and optional stage partitions) into a verdict.

    ``stage_samples`` keys = stage names from D-CROSS-LATENCY-1; values =
    lists of per-frame stage-latency_ms readings. The per-stage p95 is
    recorded only — AC-5 is informational.

    Args:
        config_id: Label of the configuration being evaluated.
        samples: Per-frame latency samples; materialized once into a tuple.
        stage_samples: Optional mapping of stage name to readings; ``None``
            is treated as an empty mapping.
        expected_frame_count: Denominator of the frame-drop ratio.
        p95_budget_ms: Budget recorded on the report for the p95 check.
        frame_drop_budget: Budget recorded on the report for the drop check.
        chamber_unavailable: Flag copied verbatim onto the report.

    Returns:
        A ``LatencyReport`` holding the percentiles, maximum, frame-drop
        ratio and per-stage partitions.

    Raises:
        ValueError: If ``expected_frame_count`` is not positive.
    """
    # Validate first: the count is the divisor of the drop ratio below.
    if expected_frame_count <= 0:
        raise ValueError(
            f"expected_frame_count must be >0, got {expected_frame_count}"
        )

    # Materialize once so the input is walked a single time and the report
    # stores an immutable snapshot.
    frames = tuple(samples)
    latencies = [frame.latency_ms for frame in frames]

    # Clamp so surplus frames cannot produce a negative drop ratio.
    received = min(len(frames), expected_frame_count)
    drop_ratio = (expected_frame_count - received) / expected_frame_count

    # NOTE(review): the percentile fields are typed ``float | None``, so
    # ``_percentile`` presumably returns None for empty input — confirm
    # against ``streaming_evaluator``.
    return LatencyReport(
        config_id=config_id,
        samples=frames,
        expected_frame_count=expected_frame_count,
        p50_ms=_percentile(latencies, 50.0),
        p95_ms=_percentile(latencies, 95.0),
        p99_ms=_percentile(latencies, 99.0),
        max_ms=max(latencies) if latencies else None,
        frame_drop_ratio=drop_ratio,
        stage_partitions=tuple(_partition_stage_samples(stage_samples or {})),
        p95_budget_ms=p95_budget_ms,
        frame_drop_budget=frame_drop_budget,
        chamber_unavailable=chamber_unavailable,
    )


def _partition_stage_samples(
    stage_samples: dict[str, Sequence[float]],
) -> list[StagePartition]:
    """Build one ``StagePartition`` per stage, ordered by stage name."""
    partitions: list[StagePartition] = []
    for stage_name in sorted(stage_samples):
        values = list(stage_samples[stage_name])
        partitions.append(
            StagePartition(
                stage_name=stage_name,
                p50_ms=_percentile(values, 50.0),
                p95_ms=_percentile(values, 95.0),
                p99_ms=_percentile(values, 99.0),
                sample_count=len(values),
            )
        )
    return partitions


def _fmt_optional_ms(value: float | None) -> str:
    """Render a millisecond value with 3 decimals; ``None`` becomes ``""``."""
    return "" if value is None else f"{value:.3f}"


def _fmt_flag(flag: bool) -> str:
    """Render a boolean as the lowercase literal ``true`` / ``false``."""
    return "true" if flag else "false"


def _write_csv(
    out_path: Path,
    header: Sequence[str],
    rows: Iterable[Sequence[object]],
) -> Path:
    """Write ``header`` then ``rows`` to ``out_path``, creating parent dirs.

    The file is always UTF-8 so the evidence does not depend on the locale
    of the machine that produced it.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        writer.writerows(rows)
    return out_path


def write_csv_evidence(out_path: Path, reports: Sequence[LatencyReport]) -> Path:
    """One-row-per-config summary.

    Args:
        out_path: Destination file; parent directories are created.
        reports: Reports to summarize, one CSV row each.

    Returns:
        ``out_path``.
    """
    header = [
        "config_id",
        "sample_count",
        "expected_frame_count",
        "frame_drop_ratio",
        "p50_ms",
        "p95_ms",
        "p99_ms",
        "max_ms",
        "p95_budget_ms",
        "frame_drop_budget",
        "chamber_unavailable",
        "ac2_or_ac3_p95_passes",
        "ac4_frame_drop_passes",
        "passes",
    ]
    rows = (
        [
            r.config_id,
            r.sample_count,
            r.expected_frame_count,
            f"{r.frame_drop_ratio:.4f}",
            _fmt_optional_ms(r.p50_ms),
            _fmt_optional_ms(r.p95_ms),
            _fmt_optional_ms(r.p99_ms),
            _fmt_optional_ms(r.max_ms),
            f"{r.p95_budget_ms:.3f}",
            f"{r.frame_drop_budget:.4f}",
            _fmt_flag(r.chamber_unavailable),
            _fmt_flag(r.passes_p95),
            _fmt_flag(r.passes_frame_drop),
            _fmt_flag(r.passes),
        ]
        for r in reports
    )
    return _write_csv(out_path, header, rows)


def write_per_frame_csv(out_path: Path, reports: Sequence[LatencyReport]) -> Path:
    """One row per frame per config — detail for outlier investigation.

    Args:
        out_path: Destination file; parent directories are created.
        reports: Reports whose samples are written in order.

    Returns:
        ``out_path``.
    """
    header = [
        "config_id",
        "frame_id",
        "t_capture_ms",
        "t_emit_at_sitl_ms",
        "latency_ms",
    ]
    rows = (
        [
            r.config_id,
            s.frame_id,
            s.t_capture_ms,
            s.t_emit_at_sitl_ms,
            f"{s.latency_ms:.3f}",
        ]
        for r in reports
        for s in r.samples
    )
    return _write_csv(out_path, header, rows)


def write_partition_csv(out_path: Path, reports: Sequence[LatencyReport]) -> Path:
    """Per-stage partition table — AC-5 informational evidence.

    Args:
        out_path: Destination file; parent directories are created.
        reports: Reports whose stage partitions are written in order.

    Returns:
        ``out_path``.
    """
    header = [
        "config_id",
        "stage_name",
        "sample_count",
        "p50_ms",
        "p95_ms",
        "p99_ms",
    ]
    rows = (
        [
            r.config_id,
            p.stage_name,
            p.sample_count,
            _fmt_optional_ms(p.p50_ms),
            _fmt_optional_ms(p.p95_ms),
            _fmt_optional_ms(p.p99_ms),
        ]
        for r in reports
        for p in r.stage_partitions
    )
    return _write_csv(out_path, header, rows)