Files
Oleksandr Bezdieniezhnykh 73cd632e95 [AZ-428] [AZ-429] [AZ-430] [AZ-431] Add NFT-PERF-01..04 perf scenarios
Batch 85 — 4 Performance NFT scenarios + pure-logic evaluators.

- NFT-PERF-01 (AZ-428, Tier-2): two-config e2e latency p95 ≤ 400 ms
  (K=3@25°C, K=2 hybrid@50°C) + frame-drop ≤10% + informational per-stage
  partition recording (D-CROSS-LATENCY-1).
- NFT-PERF-02 (AZ-429): inter-emit p95 ≤ 350 ms + no ≥3 missed-emit
  windows. fc-adapter-aware SITL timestamp extraction (tlog vs MSP).
- NFT-PERF-03 (AZ-430, Tier-2): cold-start TTFF p95 ≤ 30 s AND max ≤ 45 s
  over N≥10 iterations.
- NFT-PERF-04 (AZ-431): spoof-promotion latency p95 ≤ 600 ms over N≥20
  randomized-start blackout+spoof events.

All scenarios consume external fixtures (AZ-595 dependency surfaced) and
fail loudly when fixtures are missing or empty. Public-boundary
discipline preserved — evaluators do NOT import src/gps_denied_onboard.

Tests: 60 new unit tests pass; 24 scenarios collect (4 tests × 2 fc × 3
vio). Code review: PASS_WITH_WARNINGS — 1 Medium (fixed in batch),
3 Low (production-dependency surfacings + future hygiene).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 16:46:49 +03:00

252 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""End-to-end latency evaluator for NFT-PERF-01 (AZ-428 / AC-4.1).
D-CROSS-LATENCY-1 fixes a hard p95 budget of 400 ms across two
configurations:
* (a) K=3 baseline at +25 °C ambient.
* (b) K=2 + Jacobian-cov hybrid auto-degrade at +50 °C ambient.
This module owns the pure-logic side: distribution stats, frame-drop
accounting (AC-4), and informational per-stage partition recording
(AC-5). It does NOT import anything from ``src/gps_denied_onboard``.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence
from .streaming_evaluator import _percentile
# Hard end-to-end p95 latency budget in milliseconds (D-CROSS-LATENCY-1);
# used as the default ``p95_budget_ms`` of ``evaluate``.
LATENCY_P95_BUDGET_MS = 400.0
# Largest tolerated frame-drop ratio (AC-4 frame-drop accounting); used as
# the default ``frame_drop_budget`` of ``evaluate``.
FRAME_DROP_RATIO_BUDGET = 0.10
# Default ``expected_frame_count`` of ``evaluate``.
DEFAULT_EXPECTED_FRAMES = 900 # 3 Hz × 300 s
@dataclass(frozen=True)
class FrameLatencySample:
    """Immutable capture/emit timestamp pair for a single frame.

    The end-to-end latency is not stored; it is derived on demand from the
    two timestamps via :attr:`latency_ms`.
    """

    # Identifier of the frame this sample describes.
    frame_id: str
    # Capture timestamp, in milliseconds.
    t_capture_ms: int
    # Emit-at-SITL timestamp, in milliseconds.
    t_emit_at_sitl_ms: int

    @property
    def latency_ms(self) -> float:
        """Return emit time minus capture time (milliseconds) as a float."""
        elapsed = self.t_emit_at_sitl_ms - self.t_capture_ms
        return float(elapsed)
@dataclass(frozen=True)
class StagePartition:
    """Informational latency summary for one stage.

    Recorded as AC-5 evidence only: no pass/fail threshold is attached to
    any of these values.
    """

    # Name of the stage the figures below describe.
    stage_name: str
    # Stage-latency percentiles in milliseconds; each may be ``None``.
    p50_ms: float | None
    p95_ms: float | None
    p99_ms: float | None
    # Number of readings the percentiles were derived from.
    sample_count: int
@dataclass(frozen=True)
class LatencyReport:
    """Immutable latency verdict for a single configuration.

    Holds the raw per-frame samples, their distribution statistics, the
    frame-drop ratio and the budgets they are judged against. The
    ``passes*`` properties derive the verdict from those stored values.
    """

    config_id: str  # "k3-25c" / "k2-hybrid-50c"
    # Per-frame samples the statistics were computed from.
    samples: tuple[FrameLatencySample, ...]
    # Number of frames the run was expected to deliver.
    expected_frame_count: int
    # Latency distribution in milliseconds; each may be ``None``.
    p50_ms: float | None
    p95_ms: float | None
    p99_ms: float | None
    max_ms: float | None
    # Fraction of expected frames that were not received.
    frame_drop_ratio: float
    # Informational per-stage summaries (not part of the verdict).
    stage_partitions: tuple[StagePartition, ...]
    # Thresholds the p95 latency and the drop ratio are compared against.
    p95_budget_ms: float
    frame_drop_budget: float
    # Carried through as metadata; not consulted by the ``passes*`` checks.
    chamber_unavailable: bool

    @property
    def sample_count(self) -> int:
        """Number of per-frame samples held by this report."""
        return len(self.samples)

    @property
    def passes_p95(self) -> bool:
        """True when a p95 exists and does not exceed ``p95_budget_ms``."""
        if self.p95_ms is None:
            # No p95 available: treated as a failure, never as a pass.
            return False
        return self.p95_ms <= self.p95_budget_ms

    @property
    def passes_frame_drop(self) -> bool:
        """True when the drop ratio does not exceed ``frame_drop_budget``."""
        return self.frame_drop_ratio <= self.frame_drop_budget

    @property
    def passes(self) -> bool:
        """True only when both the p95 and the frame-drop checks pass."""
        verdict = self.passes_p95
        if verdict:
            verdict = self.passes_frame_drop
        return verdict
def measure_frame(
    frame_id: str, *, t_capture_ms: int, t_emit_at_sitl_ms: int
) -> FrameLatencySample:
    """Build a typed latency sample from one frame's two timestamps.

    Args:
        frame_id: Identifier of the frame.
        t_capture_ms: Capture timestamp in milliseconds.
        t_emit_at_sitl_ms: Emit-at-SITL timestamp in milliseconds.

    Returns:
        A ``FrameLatencySample`` whose timestamps have been passed
        through ``int()``.

    Raises:
        ValueError: If the emit timestamp is earlier than the capture
            timestamp. A negative latency is treated as a malformed
            fixture and reported loudly rather than clamped.
    """
    # The ordering check runs on the values as given, before int() coercion.
    if t_emit_at_sitl_ms < t_capture_ms:
        problem = (
            f"latency frame {frame_id}: t_emit_at_sitl_ms "
            f"({t_emit_at_sitl_ms}) precedes t_capture_ms "
            f"({t_capture_ms}); fixture shape invalid"
        )
        raise ValueError(problem)

    capture = int(t_capture_ms)
    emit = int(t_emit_at_sitl_ms)
    return FrameLatencySample(
        frame_id=frame_id, t_capture_ms=capture, t_emit_at_sitl_ms=emit
    )
def evaluate(
    config_id: str,
    samples: Sequence[FrameLatencySample],
    stage_samples: dict[str, Sequence[float]] | None = None,
    *,
    expected_frame_count: int = DEFAULT_EXPECTED_FRAMES,
    p95_budget_ms: float = LATENCY_P95_BUDGET_MS,
    frame_drop_budget: float = FRAME_DROP_RATIO_BUDGET,
    chamber_unavailable: bool = False,
) -> LatencyReport:
    """Fold per-frame samples into a ``LatencyReport`` for one configuration.

    Args:
        config_id: Label stored on the report.
        samples: Per-frame latency samples.
        stage_samples: Optional mapping of stage name (from
            D-CROSS-LATENCY-1) to per-frame stage-latency readings in
            milliseconds. Summarised for the record only; AC-5 is
            informational and does not influence the verdict.
        expected_frame_count: Number of frames the run should have
            delivered; the denominator of the frame-drop ratio.
        p95_budget_ms: Threshold stored on the report for the p95 check.
        frame_drop_budget: Threshold stored on the report for the
            frame-drop check.
        chamber_unavailable: Flag copied onto the report unchanged.

    Returns:
        The populated ``LatencyReport``.

    Raises:
        ValueError: If ``expected_frame_count`` is not positive.
    """
    latencies = [sample.latency_ms for sample in samples]

    if expected_frame_count <= 0:
        raise ValueError(
            f"expected_frame_count must be >0, got {expected_frame_count}"
        )

    # Receiving more frames than expected counts as zero drops, so the
    # ratio never goes negative.
    missing = max(expected_frame_count - len(samples), 0)
    drop_ratio = missing / expected_frame_count

    stage_input = {} if not stage_samples else stage_samples
    partitions = _partition_stage_samples(stage_input)

    frozen_samples = tuple(samples)
    p50 = _percentile(latencies, 50.0)
    p95 = _percentile(latencies, 95.0)
    p99 = _percentile(latencies, 99.0)

    # max() rejects an empty sequence, so an empty run reports no maximum.
    if latencies:
        peak = max(latencies)
    else:
        peak = None

    return LatencyReport(
        config_id=config_id,
        samples=frozen_samples,
        expected_frame_count=expected_frame_count,
        p50_ms=p50,
        p95_ms=p95,
        p99_ms=p99,
        max_ms=peak,
        frame_drop_ratio=drop_ratio,
        stage_partitions=tuple(partitions),
        p95_budget_ms=p95_budget_ms,
        frame_drop_budget=frame_drop_budget,
        chamber_unavailable=chamber_unavailable,
    )
def _partition_stage_samples(
stage_samples: dict[str, Sequence[float]],
) -> list[StagePartition]:
partitions: list[StagePartition] = []
for stage_name in sorted(stage_samples.keys()):
values = list(stage_samples[stage_name])
partitions.append(
StagePartition(
stage_name=stage_name,
p50_ms=_percentile(values, 50.0),
p95_ms=_percentile(values, 95.0),
p99_ms=_percentile(values, 99.0),
sample_count=len(values),
)
)
return partitions
def write_csv_evidence(out_path: Path, reports: Sequence[LatencyReport]) -> Path:
    """Write the one-row-per-config summary CSV and return its path.

    Missing parent directories of ``out_path`` are created and an existing
    file is overwritten. The file is always encoded as UTF-8 so the
    evidence is byte-identical across hosts and a non-ASCII ``config_id``
    cannot fail on a platform whose locale encoding is narrower.

    Args:
        out_path: Destination CSV file.
        reports: One report per configuration; each yields one data row.

    Returns:
        ``out_path``, unchanged.
    """

    def _ms(value: float | None) -> str:
        # A missing statistic is written as an empty cell, not "None".
        return "" if value is None else f"{value:.3f}"

    def _flag(value: bool) -> str:
        # Lower-case literals keep the booleans language-neutral.
        return "true" if value else "false"

    header = [
        "config_id",
        "sample_count",
        "expected_frame_count",
        "frame_drop_ratio",
        "p50_ms",
        "p95_ms",
        "p99_ms",
        "max_ms",
        "p95_budget_ms",
        "frame_drop_budget",
        "chamber_unavailable",
        "ac2_or_ac3_p95_passes",
        "ac4_frame_drop_passes",
        "passes",
    ]

    out_path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" hands line-ending control to the csv module.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        for r in reports:
            writer.writerow(
                [
                    r.config_id,
                    r.sample_count,
                    r.expected_frame_count,
                    f"{r.frame_drop_ratio:.4f}",
                    _ms(r.p50_ms),
                    _ms(r.p95_ms),
                    _ms(r.p99_ms),
                    _ms(r.max_ms),
                    f"{r.p95_budget_ms:.3f}",
                    f"{r.frame_drop_budget:.4f}",
                    _flag(r.chamber_unavailable),
                    _flag(r.passes_p95),
                    _flag(r.passes_frame_drop),
                    _flag(r.passes),
                ]
            )
    return out_path
def write_per_frame_csv(out_path: Path, reports: Sequence[LatencyReport]) -> Path:
    """Write one CSV row per frame per config and return the path.

    This is the detail table used to investigate latency outliers.
    Missing parent directories of ``out_path`` are created and an existing
    file is overwritten. The file is always encoded as UTF-8 so the output
    does not depend on the host locale and non-ASCII ids cannot fail to
    encode.

    Args:
        out_path: Destination CSV file.
        reports: Reports whose ``samples`` are flattened into rows, in
            report order and then sample order.

    Returns:
        ``out_path``, unchanged.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" hands line-ending control to the csv module.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            ["config_id", "frame_id", "t_capture_ms", "t_emit_at_sitl_ms", "latency_ms"]
        )
        for r in reports:
            for s in r.samples:
                writer.writerow(
                    [
                        r.config_id,
                        s.frame_id,
                        s.t_capture_ms,
                        s.t_emit_at_sitl_ms,
                        f"{s.latency_ms:.3f}",
                    ]
                )
    return out_path
def write_partition_csv(out_path: Path, reports: Sequence[LatencyReport]) -> Path:
    """Write the per-stage partition table (AC-5 informational evidence).

    Missing parent directories of ``out_path`` are created and an existing
    file is overwritten. The file is always encoded as UTF-8 so the output
    does not depend on the host locale and non-ASCII stage names cannot
    fail to encode.

    Args:
        out_path: Destination CSV file.
        reports: Reports whose ``stage_partitions`` are flattened into
            rows, in report order and then partition order.

    Returns:
        ``out_path``, unchanged.
    """

    def _ms(value: float | None) -> str:
        # A missing percentile is written as an empty cell, not "None".
        return "" if value is None else f"{value:.3f}"

    out_path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" hands line-ending control to the csv module.
    with out_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            ["config_id", "stage_name", "sample_count", "p50_ms", "p95_ms", "p99_ms"]
        )
        for r in reports:
            for p in r.stage_partitions:
                writer.writerow(
                    [
                        r.config_id,
                        p.stage_name,
                        p.sample_count,
                        _ms(p.p50_ms),
                        _ms(p.p95_ms),
                        _ms(p.p99_ms),
                    ]
                )
    return out_path