Files
Oleksandr Bezdieniezhnykh 73cd632e95 [AZ-428] [AZ-429] [AZ-430] [AZ-431] Add NFT-PERF-01..04 perf scenarios
Batch 85 — 4 Performance NFT scenarios + pure-logic evaluators.

- NFT-PERF-01 (AZ-428, Tier-2): two-config e2e latency p95 ≤ 400 ms
  (K=3@25°C, K=2 hybrid@50°C) + frame-drop ≤10% + informational per-stage
  partition recording (D-CROSS-LATENCY-1).
- NFT-PERF-02 (AZ-429): inter-emit p95 ≤ 350 ms + no ≥3 missed-emit
  windows. fc-adapter-aware SITL timestamp extraction (tlog vs MSP).
- NFT-PERF-03 (AZ-430, Tier-2): cold-start TTFF p95 ≤ 30 s AND max ≤ 45 s
  over N≥10 iterations.
- NFT-PERF-04 (AZ-431): spoof-promotion latency p95 ≤ 600 ms over N≥20
  randomized-start blackout+spoof events.

All scenarios consume external fixtures (AZ-595 dependency surfaced) and
fail loudly when fixtures are missing or empty. Public-boundary
discipline preserved — evaluators do NOT import src/gps_denied_onboard.

Tests: 60 new unit tests pass; 24 scenarios collect (4 tests × 2 fc × 3
vio). Code review: PASS_WITH_WARNINGS — 1 Medium (fixed in batch),
3 Low (production-dependency surfacings + future hygiene).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 16:46:49 +03:00

315 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Inter-emit interval evaluator for NFT-PERF-02 (AZ-429 / AC-4.4).
The SUT promises that estimates are streamed frame-by-frame, NOT batched.
The contract is observable at the SITL boundary: the receipt timestamps of
consecutive accepted ``GPS_INPUT`` (ArduPilot) / ``MSP2_SENSOR_GPS``
(iNav) messages should track the configured target cadence with little
jitter and never miss ≥3 consecutive emits.
This module owns the pure-logic side. The scenario test
(``e2e/tests/performance/test_nft_perf_02_streaming.py``) is a thin
adapter that reads timestamps from ``sitl_observer`` and asks the
helpers below for the per-AC verdict.
ACs evaluated (per AZ-429):
* AC-1: ``p95(inter_emit_interval) ≤ STREAMING_P95_BUDGET_MS`` (=350 ms
at the 3 Hz target = inter-frame × 1.05).
* AC-2: no window contains ≥``MISSED_EMIT_WINDOW_LIMIT`` (=3) consecutive
missed emits, where a "missed emit" is an interval >
``MISSED_EMIT_RATIO`` (=2.0) × target inter-frame.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol; reads only float lists of SITL-side
ms timestamps that the scenario adapter projects out of the boundary
observers.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from math import floor
from pathlib import Path
from typing import Iterable, Sequence
# AC-1 — the p95 budget corresponds to the inter-frame period × 1.05 at the
# 3 Hz target (333.333 ms × 1.05 = 350 ms). The budget is stored as a literal,
# so it does NOT follow automatically if TARGET_FRAME_RATE_HZ is changed.
TARGET_FRAME_RATE_HZ = 3.0
TARGET_INTER_FRAME_MS = 1000.0 / TARGET_FRAME_RATE_HZ # 333.333... ms
STREAMING_P95_BUDGET_MS = 350.0
# AC-2 — an interval counts as a "missed emit" when it is strictly greater
# than MISSED_EMIT_RATIO × the target inter-frame period (> 666.667 ms at
# 3 Hz). AC-2 fails once a run of consecutive missed-emit intervals reaches
# MISSED_EMIT_WINDOW_LIMIT.
MISSED_EMIT_RATIO = 2.0
MISSED_EMIT_WINDOW_LIMIT = 3
@dataclass(frozen=True)
class InterEmitReport:
    """AC-1 summary of the inter-emit intervals observed in one run.

    The percentile and max fields may be ``None``; that value marks a run
    with no measurable interval so it is never mistaken for a run whose
    latency happened to be zero.
    """

    sample_count: int
    # Number of consecutive-sample gaps; as populated by
    # ``evaluate_inter_emit`` this is sample_count - 1, or 0 when fewer than
    # two samples were supplied.
    interval_count: int
    p50_ms: float | None
    p95_ms: float | None
    p99_ms: float | None
    max_ms: float | None
    # Recorded for the evidence file only; it does not take part in the
    # verdict computed below.
    target_inter_frame_ms: float
    # The p95 ceiling that ``passes_p95`` compares against.
    budget_ms: float

    @property
    def passes_p95(self) -> bool:
        """True when a p95 value exists and is at or below ``budget_ms``."""
        if self.p95_ms is None:
            # No intervals were measured: a no-data run never passes.
            return False
        return self.p95_ms <= self.budget_ms
@dataclass(frozen=True)
class MissedEmitWindow:
    """A maximal run of back-to-back missed-emit intervals.

    The run is bounded by two samples of the SORTED timestamp list: the
    sample that opens its first missed interval and the sample that closes
    its last one.
    """

    # 0-based index, into the sorted timestamp list, of the opening sample.
    start_index: int
    # Number of consecutive missed-emit intervals in the run.
    length: int
    # Timestamp (ms) of the opening sample.
    start_ms: float
    # Timestamp (ms) of the sample that closes the last missed interval.
    end_ms: float
@dataclass(frozen=True)
class MissedEmitReport:
    """AC-2 outcome: every missed-emit window found, plus the verdict inputs."""

    # Intervals strictly above this value (ms) were classified as missed.
    missed_emit_threshold_ms: float
    # Length of the longest run of consecutive missed-emit intervals.
    longest_run: int
    windows: tuple[MissedEmitWindow, ...]
    # Run length at which the AC is considered breached.
    limit: int

    @property
    def passes(self) -> bool:
        """True while the longest run stays strictly below ``limit``."""
        longest_allowed = self.limit - 1
        return self.longest_run <= longest_allowed
@dataclass(frozen=True)
class StreamingReport:
    """Combined NFT-PERF-02 result (AC-1 + AC-2) for one parameterized run."""

    inter_emit: InterEmitReport
    missed_emits: MissedEmitReport

    @property
    def passes(self) -> bool:
        """True only when both the AC-1 p95 check and the AC-2 check pass."""
        if not self.inter_emit.passes_p95:
            return False
        return self.missed_emits.passes
def _sorted_intervals_ms(emit_times_ms: Sequence[float]) -> list[float]:
"""Return positive inter-emit intervals from a sorted timestamp list.
Sorting is defensive — sitl_observer emits in monotonic order but the
helper must not silently produce negative intervals if a caller hands
in an unsorted list.
"""
if len(emit_times_ms) < 2:
return []
ordered = sorted(float(t) for t in emit_times_ms)
return [ordered[i] - ordered[i - 1] for i in range(1, len(ordered))]
def _percentile(values: Sequence[float], q: float) -> float | None:
"""Linear-interpolation percentile (``numpy.percentile``-equivalent).
Returns ``None`` when ``values`` is empty so callers can distinguish
a no-data run from a zero-latency run. Accepts any real ``q`` in
[0, 100]; outside that range is a programmer error.
"""
if not 0.0 <= q <= 100.0:
raise ValueError(f"percentile q must be in [0, 100], got {q!r}")
if not values:
return None
ordered = sorted(values)
if len(ordered) == 1:
return ordered[0]
rank = (q / 100.0) * (len(ordered) - 1)
lo = floor(rank)
hi = min(lo + 1, len(ordered) - 1)
frac = rank - lo
return ordered[lo] + (ordered[hi] - ordered[lo]) * frac
def evaluate_inter_emit(
    emit_times_ms: Sequence[float],
    *,
    target_inter_frame_ms: float = TARGET_INTER_FRAME_MS,
    budget_ms: float = STREAMING_P95_BUDGET_MS,
) -> InterEmitReport:
    """AC-1: summarise inter-emit intervals against the p95 ``budget_ms``.

    ``emit_times_ms`` holds the SITL-side receipt timestamps in
    milliseconds; the epoch is irrelevant because only differences are
    used. ``budget_ms`` is what gates the verdict; ``target_inter_frame_ms``
    is merely copied into the report for the evidence file.
    """
    gaps = _sorted_intervals_ms(emit_times_ms)
    worst = max(gaps) if gaps else None
    # With no gaps, each percentile comes back as None.
    p50, p95, p99 = (_percentile(gaps, q) for q in (50.0, 95.0, 99.0))
    return InterEmitReport(
        sample_count=len(emit_times_ms),
        interval_count=len(gaps),
        p50_ms=p50,
        p95_ms=p95,
        p99_ms=p99,
        max_ms=worst,
        target_inter_frame_ms=target_inter_frame_ms,
        budget_ms=budget_ms,
    )
def evaluate_missed_emits(
    emit_times_ms: Sequence[float],
    *,
    target_inter_frame_ms: float = TARGET_INTER_FRAME_MS,
    missed_ratio: float = MISSED_EMIT_RATIO,
    limit: int = MISSED_EMIT_WINDOW_LIMIT,
) -> MissedEmitReport:
    """AC-2: longest run of consecutive missed-emit intervals < ``limit``.

    A "missed emit" is an inter-emit interval strictly greater than
    ``missed_ratio × target_inter_frame_ms``. Timestamps are sorted first,
    then every maximal run of consecutive missed-emit intervals is recorded
    as a ``MissedEmitWindow``; the report carries all windows plus the
    length of the longest one (0 when there is none).

    Raises ``ValueError`` when ``missed_ratio`` is not above 1.0, when
    ``limit`` is below 1, or when ``target_inter_frame_ms`` is not a
    positive number — each of those would make the verdict meaningless.

    NOTE(review): one very long gap is still a single interval, so it
    contributes a run of length 1 no matter how many nominal emit slots it
    spans — confirm that this matches the intent of AC-2.
    """
    if missed_ratio <= 1.0:
        raise ValueError(
            f"missed_ratio must be > 1.0 (was {missed_ratio!r}) — equal or "
            "below the target stride would flag every interval as missed"
        )
    if limit < 1:
        raise ValueError(f"limit must be >= 1 (was {limit!r})")
    # Negated comparison so NaN is rejected as well. A non-positive target
    # gives a threshold <= 0, which flags every positive interval as missed.
    if not target_inter_frame_ms > 0.0:
        raise ValueError(
            f"target_inter_frame_ms must be > 0 (was {target_inter_frame_ms!r})"
        )

    threshold = missed_ratio * target_inter_frame_ms
    ordered = sorted(float(t) for t in emit_times_ms)
    sample_total = len(ordered)

    windows: list[MissedEmitWindow] = []
    # Index of the sample that opens the in-progress run, or None when no
    # run is open.
    run_start: int | None = None
    # Iterate one step past the last real interval: the extra step is a
    # sentinel "not missed" interval that closes a run still open at the end
    # of the data, so the closing logic exists exactly once.
    for i in range(1, sample_total + 1):
        missed = i < sample_total and ordered[i] - ordered[i - 1] > threshold
        if missed:
            if run_start is None:
                run_start = i - 1
            continue
        if run_start is None:
            continue
        closing = i - 1  # sample that ends the last missed interval
        windows.append(
            MissedEmitWindow(
                start_index=run_start,
                length=closing - run_start,
                start_ms=ordered[run_start],
                end_ms=ordered[closing],
            )
        )
        run_start = None

    return MissedEmitReport(
        missed_emit_threshold_ms=threshold,
        longest_run=max((w.length for w in windows), default=0),
        windows=tuple(windows),
        limit=limit,
    )
def evaluate(
    emit_times_ms: Sequence[float],
    *,
    target_inter_frame_ms: float = TARGET_INTER_FRAME_MS,
    budget_ms: float = STREAMING_P95_BUDGET_MS,
    missed_ratio: float = MISSED_EMIT_RATIO,
    limit: int = MISSED_EMIT_WINDOW_LIMIT,
) -> StreamingReport:
    """Evaluate AC-1 and AC-2 on one boundary-observed list of emit times.

    The same timestamps feed both evaluators. AC-1 runs first; argument
    validation performed by ``evaluate_missed_emits`` propagates to the
    caller unchanged.
    """
    ac1 = evaluate_inter_emit(
        emit_times_ms,
        target_inter_frame_ms=target_inter_frame_ms,
        budget_ms=budget_ms,
    )
    ac2 = evaluate_missed_emits(
        emit_times_ms,
        target_inter_frame_ms=target_inter_frame_ms,
        missed_ratio=missed_ratio,
        limit=limit,
    )
    return StreamingReport(inter_emit=ac1, missed_emits=ac2)
def write_csv_evidence(out_path: Path, report: StreamingReport) -> Path:
    """Write a header row plus one data row with the AC-1/AC-2 verdict.

    Parent directories are created when missing and an existing file is
    overwritten. Millisecond values are written with three decimals, a
    missing (``None``) percentile or max as an empty cell, and verdicts as
    ``true`` / ``false``. The file is always UTF-8 so the evidence does not
    depend on the locale of the machine that produced it. Returns
    ``out_path``.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    def fmt_optional_ms(value: float | None) -> str:
        # An empty cell keeps "no data" distinct from a real 0.000 reading.
        return "" if value is None else f"{value:.3f}"

    def fmt_verdict(ok: bool) -> str:
        return "true" if ok else "false"

    header = [
        "sample_count",
        "interval_count",
        "p50_ms",
        "p95_ms",
        "p99_ms",
        "max_ms",
        "target_inter_frame_ms",
        "p95_budget_ms",
        "ac1_passes",
        "missed_emit_threshold_ms",
        "longest_missed_run",
        "ac2_passes",
        "passes",
    ]
    # newline="" leaves line-ending control to the csv module.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        ie = report.inter_emit
        me = report.missed_emits
        writer.writerow(
            [
                ie.sample_count,
                ie.interval_count,
                fmt_optional_ms(ie.p50_ms),
                fmt_optional_ms(ie.p95_ms),
                fmt_optional_ms(ie.p99_ms),
                fmt_optional_ms(ie.max_ms),
                f"{ie.target_inter_frame_ms:.3f}",
                f"{ie.budget_ms:.3f}",
                fmt_verdict(ie.passes_p95),
                f"{me.missed_emit_threshold_ms:.3f}",
                me.longest_run,
                fmt_verdict(me.passes),
                fmt_verdict(report.passes),
            ]
        )
    return out_path
def write_intervals_csv(out_path: Path, emit_times_ms: Iterable[float]) -> Path:
    """Write one row per emit timestamp with the gap to its predecessor.

    ``write_csv_evidence`` produces the aggregate AC verdict; this detail
    file is what a reviewer reads when the budget is breached. Timestamps
    are sorted before writing, the first row has an empty ``inter_emit_ms``
    cell because it has no predecessor, and all values use three decimals.
    Parent directories are created when missing and the file is always
    UTF-8. Returns ``out_path``.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    ordered = sorted(float(t) for t in emit_times_ms)
    # newline="" leaves line-ending control to the csv module.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(["index", "t_emit_ms", "inter_emit_ms"])
        # Track the predecessor explicitly instead of comparing a float
        # against an empty-string sentinel.
        previous: float | None = None
        for index, stamp in enumerate(ordered):
            gap_text = "" if previous is None else f"{stamp - previous:.3f}"
            writer.writerow([index, f"{stamp:.3f}", gap_text])
            previous = stamp
    return out_path