Files
gps-denied-onboard/e2e/tests/performance/test_nft_perf_03_ttff.py
T
Oleksandr Bezdieniezhnykh 73cd632e95 [AZ-428] [AZ-429] [AZ-430] [AZ-431] Add NFT-PERF-01..04 perf scenarios
Batch 85 — 4 Performance NFT scenarios + pure-logic evaluators.

- NFT-PERF-01 (AZ-428, Tier-2): two-config e2e latency p95 ≤ 400 ms
  (K=3@25°C, K=2 hybrid@50°C) + frame-drop ≤10% + informational per-stage
  partition recording (D-CROSS-LATENCY-1).
- NFT-PERF-02 (AZ-429): inter-emit p95 ≤ 350 ms + no ≥3 missed-emit
  windows. fc-adapter-aware SITL timestamp extraction (tlog vs MSP).
- NFT-PERF-03 (AZ-430, Tier-2): cold-start TTFF p95 ≤ 30 s AND max ≤ 45 s
  over N≥10 iterations.
- NFT-PERF-04 (AZ-431): spoof-promotion latency p95 ≤ 600 ms over N≥20
  randomized-start blackout+spoof events.

All scenarios consume external fixtures (AZ-595 dependency surfaced) and
fail loudly when fixtures are missing or empty. Public-boundary
discipline preserved — evaluators do NOT import src/gps_denied_onboard.

Tests: 60 new unit tests pass; 24 scenarios collect (4 tests × 2 fc × 3
vio). Code review: PASS_WITH_WARNINGS — 1 Medium (fixed in batch),
3 Low (production-dependency surfacings + future hygiene).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 16:46:49 +03:00

190 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""NFT-PERF-03 — Cold-start Time-To-First-Fix (AZ-430 / AC-NEW-1).
Tier-2 ONLY. N≥10 cold-start iterations; each measures
``t_first_emission - t_first_frame_arrival``; asserts:
* AC-3: ``p95(TTFF) ≤ 30 s``.
* AC-4: ``max(TTFF) ≤ 45 s``.
Per-iteration cleanup (fdr-output volume wipe + SITL cold-boot reload
+ SUT lifecycle restart) is owned by the Tier-2 Jetson harness
(AZ-444). The runner-side scenario here only consumes a fixture that
encodes the N captured ``(first_frame_arrival_ms, first_emission_ms)``
pairs.
Production dependency surfaced to AZ-595 / AZ-444: the
``E2E_NFT_PERF_03_TTFF_FIXTURE`` env var names a JSON file (absolute
path or relative to ``E2E_SITL_REPLAY_DIR``) with shape:
    {
      "iterations": [
        {
          "iteration_id": "iter-01",
          "first_frame_arrival_ms": 1234,
          "first_emission_ms": 16789
        },
        ...
      ]
    }
``first_emission_ms`` may be ``null`` for a timed-out iteration —
counted as ``missed_starts`` and treated as a budget breach.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from runner.helpers import ttff_evaluator as te
# Name of the environment variable that points at the TTFF JSON fixture
# (absolute path, or relative to the SITL replay directory).
TTFF_FIXTURE_ENV_VAR = "E2E_NFT_PERF_03_TTFF_FIXTURE"
# File name looked up under the SITL replay directory when the env var
# above is unset or blank.
TTFF_DEFAULT_FIXTURE_NAME = "nft_perf_03_ttff.json"
@pytest.mark.tier2_only
@pytest.mark.scenario_id("nft-perf-03")
@pytest.mark.traces_to("AC-NEW-1,AC-1,AC-2,AC-3,AC-4,AC-5")
def test_nft_perf_03_cold_start_ttff(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Gate cold-start TTFF for the given ``(fc_adapter, vio_strategy)`` pair.

    Skips when no SITL replay fixture is prepared; fails when the TTFF
    fixture file is missing or holds zero iterations. Otherwise the
    evaluator report is written out as CSV evidence, its figures are
    recorded as metrics, and the iteration-count, p95 and max gates are
    asserted in that order.
    """
    if not sitl_replay_ready:
        pytest.skip(
            "NFT-PERF-03 requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595) containing N≥10 cold-start "
            "iterations. Pure-logic AC-3/AC-4 covered by "
            "e2e/_unit_tests/helpers/test_ttff_evaluator.py."
        )

    ttff_fixture = _resolve_ttff_fixture_path()
    if not ttff_fixture.is_file():
        pytest.fail(
            f"NFT-PERF-03: TTFF fixture not found at {ttff_fixture}. "
            f"`{TTFF_FIXTURE_ENV_VAR}` env var must point at a JSON file "
            "carrying N≥10 cold-start iteration records (see scenario "
            "docstring). Production dependency: AZ-595 + AZ-444."
        )

    cold_starts = _load_iterations(ttff_fixture)
    if not cold_starts:
        pytest.fail(
            f"NFT-PERF-03: TTFF fixture {ttff_fixture} contains zero "
            "iterations. AZ-430 requires N≥10."
        )

    ttff_report = te.evaluate(cold_starts)

    # Evidence files are named after the adapter/strategy combination.
    evidence_stem = Path(evidence_dir).joinpath(
        "nft-perf-03", f"{fc_adapter}-{vio_strategy}"
    )
    summary_csv = evidence_stem.with_suffix(".csv")
    per_iteration_csv = evidence_stem.with_name(
        evidence_stem.name + "-per-iter"
    ).with_suffix(".csv")
    te.write_csv_evidence(summary_csv, ttff_report)
    te.write_per_iteration_csv(per_iteration_csv, ttff_report)

    # Counters are always recorded.
    nfr_recorder.record_metric(
        "nft_perf_03.iteration_count",
        float(ttff_report.iteration_count),
        ac_id="AC-3",
    )
    nfr_recorder.record_metric(
        "nft_perf_03.missed_starts", float(ttff_report.missed_starts)
    )

    # Percentile figures may be None on the report; record only those present.
    optional_metrics = (
        ("nft_perf_03.ttff_s_p50", ttff_report.p50_s, {}),
        ("nft_perf_03.ttff_s_p95", ttff_report.p95_s, {"ac_id": "AC-3"}),
        ("nft_perf_03.ttff_s_max", ttff_report.max_s, {"ac_id": "AC-4"}),
    )
    for metric_name, metric_value, extra_kwargs in optional_metrics:
        if metric_value is not None:
            nfr_recorder.record_metric(
                metric_name, float(metric_value), **extra_kwargs
            )

    assert ttff_report.passes_iteration_count, (
        f"AC-1 (iteration count): collected only {ttff_report.iteration_count} "
        f"iterations; require N ≥ {ttff_report.min_iteration_count}"
    )
    assert ttff_report.passes_p95, (
        f"AC-3: p95(TTFF) = {ttff_report.p95_s} s > budget "
        f"{ttff_report.p95_budget_s} s "
        f"(missed_starts={ttff_report.missed_starts})"
    )
    assert ttff_report.passes_max, (
        f"AC-4: max(TTFF) = {ttff_report.max_s} s > budget "
        f"{ttff_report.max_budget_s} s "
        f"(missed_starts={ttff_report.missed_starts})"
    )
def _resolve_ttff_fixture_path() -> Path:
    """Work out where the TTFF fixture file should live.

    The ``TTFF_FIXTURE_ENV_VAR`` value (whitespace-stripped) wins when set:
    absolute paths are used as-is, relative ones are anchored at the SITL
    replay directory when one is available. With the variable unset or
    blank, the default fixture name under the replay directory is used;
    if there is no replay directory either, a placeholder path naming the
    unset variable is returned so the caller's existence check fails with
    a readable path.
    """
    # Imported lazily, as in the rest of this helper's history.
    from runner.helpers import sitl_observer

    configured = os.environ.get(TTFF_FIXTURE_ENV_VAR, "").strip()
    replay_root = sitl_observer.replay_dir()

    if configured:
        candidate = Path(configured)
        if candidate.is_absolute() or replay_root is None:
            return candidate
        return replay_root / candidate

    if replay_root is not None:
        return replay_root / TTFF_DEFAULT_FIXTURE_NAME
    return Path(f"<{TTFF_FIXTURE_ENV_VAR}-unset>")
def _load_iterations(fixture_path: Path) -> list[te.ColdStartIteration]:
    """Parse the TTFF fixture file into evaluator iteration records.

    The fixture must be a UTF-8 JSON object whose ``"iterations"`` key
    holds a list of objects. Each object needs ``first_frame_arrival_ms``
    (integer-convertible) and may carry ``first_emission_ms`` (integer-
    convertible, or ``null``/absent for a timed-out start) and
    ``iteration_id`` (falls back to ``iter-NN`` based on list position).

    Args:
        fixture_path: Path of the JSON fixture to read.

    Returns:
        One record per fixture entry, in fixture order, as produced by
        ``te.measure_iteration``.

    Raises:
        Every malformed-fixture condition ends the test through
        ``pytest.fail`` with a message naming the offending entry:
        undecodable or invalid JSON, wrong container types, missing or
        non-integer timestamps (JSON booleans included), and any
        ``ValueError`` raised by ``te.measure_iteration``.
    """
    try:
        # JSON interchange is UTF-8; do not depend on the host locale.
        payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        pytest.fail(
            f"NFT-PERF-03: TTFF fixture {fixture_path} is not valid "
            f"UTF-8 JSON: {exc}"
        )

    if not isinstance(payload, dict):
        pytest.fail(
            f"NFT-PERF-03: TTFF fixture {fixture_path} must be a JSON object "
            f'with key "iterations" → list; got top-level '
            f"type={type(payload).__name__}"
        )
    raw = payload.get("iterations")
    if not isinstance(raw, list):
        # Report the type of the offending value, not of the enclosing dict.
        pytest.fail(
            f"NFT-PERF-03: TTFF fixture {fixture_path} must be a JSON object "
            f'with key "iterations" → list; got "iterations" '
            f"type={type(raw).__name__}"
        )

    parsed: list[te.ColdStartIteration] = []
    for idx, entry in enumerate(raw):
        if not isinstance(entry, dict):
            pytest.fail(
                f"NFT-PERF-03: iterations[{idx}] in {fixture_path} must be "
                f"an object; got {type(entry).__name__}"
            )
        iter_id = str(entry.get("iteration_id") or f"iter-{idx:02d}")

        try:
            arrival_raw = entry["first_frame_arrival_ms"]
            # bool is an int subclass; int(True) == 1 would pass silently.
            if isinstance(arrival_raw, bool):
                raise TypeError("JSON booleans are not timestamps")
            arrival = int(arrival_raw)
        except (KeyError, TypeError, ValueError) as exc:
            pytest.fail(
                f"NFT-PERF-03: iterations[{idx}].first_frame_arrival_ms "
                f"in {fixture_path} must be an int ms timestamp: {exc}"
            )

        first_emission_raw = entry.get("first_emission_ms")
        first_emission: int | None
        if first_emission_raw is None:
            # null / absent marks a timed-out cold start.
            first_emission = None
        else:
            try:
                if isinstance(first_emission_raw, bool):
                    raise TypeError("JSON booleans are not timestamps")
                first_emission = int(first_emission_raw)
            except (TypeError, ValueError) as exc:
                pytest.fail(
                    f"NFT-PERF-03: iterations[{idx}].first_emission_ms "
                    f"in {fixture_path} must be int or null: {exc}"
                )

        try:
            parsed.append(
                te.measure_iteration(
                    iter_id,
                    first_frame_arrival_ms=arrival,
                    first_emission_ms=first_emission,
                )
            )
        except ValueError as exc:
            # The evaluator's own validation errors are surfaced per entry.
            pytest.fail(
                f"NFT-PERF-03: iterations[{idx}] in {fixture_path} rejected: {exc}"
            )
    return parsed