"""Inter-emit interval evaluator for NFT-PERF-02 (AZ-429 / AC-4.4). The SUT promises that estimates are streamed frame-by-frame, NOT batched. The contract is observable at the SITL boundary: the receipt timestamps of consecutive accepted ``GPS_INPUT`` (ArduPilot) / ``MSP2_SENSOR_GPS`` (iNav) messages should track the configured target cadence with little jitter and never miss ≥3 consecutive emits. This module owns the pure-logic side. The scenario test (``e2e/tests/performance/test_nft_perf_02_streaming.py``) is a thin adapter that reads timestamps from ``sitl_observer`` and asks the helpers below for the per-AC verdict. ACs evaluated (per AZ-429): * AC-1: ``p95(inter_emit_interval) ≤ STREAMING_P95_BUDGET_MS`` (=350 ms at the 3 Hz target = inter-frame × 1.05). * AC-2: no window contains ≥``MISSED_EMIT_WINDOW_LIMIT`` (=3) consecutive missed emits, where a "missed emit" is an interval > ``MISSED_EMIT_RATIO`` (=2.0) × target inter-frame. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol; reads only float lists of SITL-side ms timestamps that the scenario adapter projects out of the boundary observers. """ from __future__ import annotations import csv from dataclasses import dataclass from math import floor from pathlib import Path from typing import Iterable, Sequence # AC-1 — inter-frame × 1.05 at 3 Hz target (333.333 ms × 1.05 = 350 ms). TARGET_FRAME_RATE_HZ = 3.0 TARGET_INTER_FRAME_MS = 1000.0 / TARGET_FRAME_RATE_HZ # 333.333... ms STREAMING_P95_BUDGET_MS = 350.0 # AC-2 — a "missed emit" interval is > 2× target = >666 ms at 3 Hz. MISSED_EMIT_RATIO = 2.0 MISSED_EMIT_WINDOW_LIMIT = 3 @dataclass(frozen=True) class InterEmitReport: """Aggregate AC-1 result for one run.""" sample_count: int interval_count: int # = sample_count - 1 p50_ms: float | None p95_ms: float | None p99_ms: float | None max_ms: float | None target_inter_frame_ms: float budget_ms: float @property def passes_p95(self) -> bool: return self.p95_ms is not None and self.p95_ms <= self.budget_ms @dataclass(frozen=True) class MissedEmitWindow: """One run of consecutive missed-emit intervals starting at a sample index.""" start_index: int # index into the SORTED timestamp list (0-based) length: int start_ms: float end_ms: float @dataclass(frozen=True) class MissedEmitReport: """AC-2 result: list of consecutive-missed-emit windows + verdict.""" missed_emit_threshold_ms: float longest_run: int windows: tuple[MissedEmitWindow, ...] limit: int @property def passes(self) -> bool: return self.longest_run < self.limit @dataclass(frozen=True) class StreamingReport: """Aggregate FT-PERF-02 result for one parameterized run.""" inter_emit: InterEmitReport missed_emits: MissedEmitReport @property def passes(self) -> bool: return self.inter_emit.passes_p95 and self.missed_emits.passes def _sorted_intervals_ms(emit_times_ms: Sequence[float]) -> list[float]: """Return positive inter-emit intervals from a sorted timestamp list. Sorting is defensive — sitl_observer emits in monotonic order but the helper must not silently produce negative intervals if a caller hands in an unsorted list. """ if len(emit_times_ms) < 2: return [] ordered = sorted(float(t) for t in emit_times_ms) return [ordered[i] - ordered[i - 1] for i in range(1, len(ordered))] def _percentile(values: Sequence[float], q: float) -> float | None: """Linear-interpolation percentile (``numpy.percentile``-equivalent). Returns ``None`` when ``values`` is empty so callers can distinguish a no-data run from a zero-latency run. Accepts any real ``q`` in [0, 100]; outside that range is a programmer error. """ if not 0.0 <= q <= 100.0: raise ValueError(f"percentile q must be in [0, 100], got {q!r}") if not values: return None ordered = sorted(values) if len(ordered) == 1: return ordered[0] rank = (q / 100.0) * (len(ordered) - 1) lo = floor(rank) hi = min(lo + 1, len(ordered) - 1) frac = rank - lo return ordered[lo] + (ordered[hi] - ordered[lo]) * frac def evaluate_inter_emit( emit_times_ms: Sequence[float], *, target_inter_frame_ms: float = TARGET_INTER_FRAME_MS, budget_ms: float = STREAMING_P95_BUDGET_MS, ) -> InterEmitReport: """AC-1: p95 inter-emit interval ≤ ``budget_ms``. Caller passes the SITL-side receipt timestamps (ms, any epoch — only deltas matter). ``target_inter_frame_ms`` is recorded for the evidence file but does not gate the verdict; ``budget_ms`` does. """ intervals = _sorted_intervals_ms(emit_times_ms) return InterEmitReport( sample_count=len(emit_times_ms), interval_count=len(intervals), p50_ms=_percentile(intervals, 50.0), p95_ms=_percentile(intervals, 95.0), p99_ms=_percentile(intervals, 99.0), max_ms=max(intervals) if intervals else None, target_inter_frame_ms=target_inter_frame_ms, budget_ms=budget_ms, ) def evaluate_missed_emits( emit_times_ms: Sequence[float], *, target_inter_frame_ms: float = TARGET_INTER_FRAME_MS, missed_ratio: float = MISSED_EMIT_RATIO, limit: int = MISSED_EMIT_WINDOW_LIMIT, ) -> MissedEmitReport: """AC-2: longest run of consecutive missed-emit intervals < ``limit``. A "missed emit" is an inter-emit interval that exceeds ``missed_ratio × target_inter_frame_ms``. We collect every maximal run of consecutive missed-emit intervals and the longest length. """ if missed_ratio <= 1.0: raise ValueError( f"missed_ratio must be > 1.0 (was {missed_ratio!r}) — equal or " "below the target stride would flag every interval as missed" ) if limit < 1: raise ValueError(f"limit must be >= 1 (was {limit!r})") threshold = missed_ratio * target_inter_frame_ms ordered = sorted(float(t) for t in emit_times_ms) windows: list[MissedEmitWindow] = [] # `run_start` is the sample index of the FIRST sample of an # in-progress missed-interval run. Number of missed intervals in # the open run after processing iteration `i` is `i - run_start`. run_start: int | None = None run_start_ms: float | None = None longest = 0 for i in range(1, len(ordered)): delta = ordered[i] - ordered[i - 1] if delta > threshold: if run_start is None: run_start = i - 1 run_start_ms = ordered[i - 1] longest = max(longest, i - run_start) elif run_start is not None and run_start_ms is not None: length = (i - 1) - run_start windows.append( MissedEmitWindow( start_index=run_start, length=length, start_ms=run_start_ms, end_ms=ordered[i - 1], ) ) run_start = None run_start_ms = None if run_start is not None and run_start_ms is not None: length = (len(ordered) - 1) - run_start windows.append( MissedEmitWindow( start_index=run_start, length=length, start_ms=run_start_ms, end_ms=ordered[-1], ) ) longest = max(longest, length) return MissedEmitReport( missed_emit_threshold_ms=threshold, longest_run=longest, windows=tuple(windows), limit=limit, ) def evaluate( emit_times_ms: Sequence[float], *, target_inter_frame_ms: float = TARGET_INTER_FRAME_MS, budget_ms: float = STREAMING_P95_BUDGET_MS, missed_ratio: float = MISSED_EMIT_RATIO, limit: int = MISSED_EMIT_WINDOW_LIMIT, ) -> StreamingReport: """Run AC-1 + AC-2 over one boundary-observed emit-time list.""" return StreamingReport( inter_emit=evaluate_inter_emit( emit_times_ms, target_inter_frame_ms=target_inter_frame_ms, budget_ms=budget_ms, ), missed_emits=evaluate_missed_emits( emit_times_ms, target_inter_frame_ms=target_inter_frame_ms, missed_ratio=missed_ratio, limit=limit, ), ) def write_csv_evidence(out_path: Path, report: StreamingReport) -> Path: """One-row evidence file naming the AC-1/AC-2 verdict + percentiles.""" out_path.parent.mkdir(parents=True, exist_ok=True) r = report with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "sample_count", "interval_count", "p50_ms", "p95_ms", "p99_ms", "max_ms", "target_inter_frame_ms", "p95_budget_ms", "ac1_passes", "missed_emit_threshold_ms", "longest_missed_run", "ac2_passes", "passes", ] ) ie = r.inter_emit me = r.missed_emits writer.writerow( [ ie.sample_count, ie.interval_count, "" if ie.p50_ms is None else f"{ie.p50_ms:.3f}", "" if ie.p95_ms is None else f"{ie.p95_ms:.3f}", "" if ie.p99_ms is None else f"{ie.p99_ms:.3f}", "" if ie.max_ms is None else f"{ie.max_ms:.3f}", f"{ie.target_inter_frame_ms:.3f}", f"{ie.budget_ms:.3f}", "true" if ie.passes_p95 else "false", f"{me.missed_emit_threshold_ms:.3f}", me.longest_run, "true" if me.passes else "false", "true" if r.passes else "false", ] ) return out_path def write_intervals_csv(out_path: Path, emit_times_ms: Iterable[float]) -> Path: """Per-interval CSV for evidence (one row per consecutive pair). The aggregate ``write_csv_evidence`` row is the AC verdict; this detail CSV is what a reviewer reads when the budget is breached. """ out_path.parent.mkdir(parents=True, exist_ok=True) ordered = sorted(float(t) for t in emit_times_ms) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow(["index", "t_emit_ms", "inter_emit_ms"]) for i, t in enumerate(ordered): interval = (t - ordered[i - 1]) if i > 0 else "" writer.writerow( [ i, f"{t:.3f}", "" if interval == "" else f"{interval:.3f}", ] ) return out_path