Files
Oleksandr Bezdieniezhnykh 73cd632e95 [AZ-428] [AZ-429] [AZ-430] [AZ-431] Add NFT-PERF-01..04 perf scenarios
Batch 85 — 4 Performance NFT scenarios + pure-logic evaluators.

- NFT-PERF-01 (AZ-428, Tier-2): two-config e2e latency p95 ≤ 400 ms
  (K=3@25°C, K=2 hybrid@50°C) + frame-drop ≤10% + informational per-stage
  partition recording (D-CROSS-LATENCY-1).
- NFT-PERF-02 (AZ-429): inter-emit p95 ≤ 350 ms + no ≥3 missed-emit
  windows. fc-adapter-aware SITL timestamp extraction (tlog vs MSP).
- NFT-PERF-03 (AZ-430, Tier-2): cold-start TTFF p95 ≤ 30 s AND max ≤ 45 s
  over N≥10 iterations.
- NFT-PERF-04 (AZ-431): spoof-promotion latency p95 ≤ 600 ms over N≥20
  randomized-start blackout+spoof events.

All scenarios consume external fixtures (AZ-595 dependency surfaced) and
fail loudly when fixtures are missing or empty. Public-boundary
discipline preserved — evaluators do NOT import src/gps_denied_onboard.

Tests: 60 new unit tests pass; 24 scenarios collect (4 tests × 2 fc × 3
vio). Code review: PASS_WITH_WARNINGS — 1 Medium (fixed in batch),
3 Low (production-dependency surfacings + future hygiene).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 16:46:49 +03:00

161 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""NFT-PERF-02 — frame-by-frame streaming, no batching (AZ-429 / AC-4.4).
Replays the 5-minute Derkachi flight at the 3 Hz target cadence; reads
SITL-side receipt timestamps for accepted GPS_INPUT (ArduPilot
mavproxy tlog) / MSP2_SENSOR_GPS (iNav SITL MSP capture) messages;
asserts:
* AC-1: ``p95(inter_emit_interval) ≤ 350 ms`` (3 Hz inter-frame period ≈ 333.3 ms × 1.05).
* AC-2: no window contains ≥3 consecutive missed emits.
Tier-1 OR Tier-2; both parametrizations run. The pure-logic AC-1/AC-2
evaluators are covered by
``e2e/_unit_tests/helpers/test_streaming_evaluator.py``.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from runner.helpers import streaming_evaluator as ste
# Derkachi flight input data. `parents[3]` is the directory three levels
# above the one containing this file; the fixture tree hangs off it under
# `_docs/00_problem/input_data/flight_derkachi`.
DERKACHI_DIR = Path(__file__).resolve().parents[3].joinpath(
    "_docs",
    "00_problem",
    "input_data",
    "flight_derkachi",
)
# Video handed to the frame-source replayer by the scenario below.
DERKACHI_MP4 = DERKACHI_DIR.joinpath("flight_derkachi.mp4")
# 5 min Derkachi replay at 3 Hz target. The window length feeds into the
# iNav MSP collector (as `window_s`); it is also passed to the ArduPilot
# tlog capture as `duration_s`, although the ArduPilot path reads the tlog
# regardless of the window (the tlog encodes its own duration).
REPLAY_WINDOW_S = 300.0
# Port passed to `collect_inav_msp_frames` when capturing iNav SITL MSP frames.
INAV_MSP_PORT = 5760
# `msg_type` value that selects accepted-GPS messages out of the mavproxy tlog.
ARDUPILOT_GPS_MSG_KIND = "GPS_INPUT"
@pytest.mark.scenario_id("nft-perf-02")
@pytest.mark.traces_to("AC-4.4,AC-1,AC-2,AC-3")
def test_nft_perf_02_streaming_inter_emit(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Check the streaming cadence (AC-1, AC-2) for one `(fc_adapter, vio_strategy)` pair.

    Flow: skip when the SITL replay fixture is not ready; otherwise replay
    the Derkachi video, collect the SITL-side accepted-GPS timestamps, fail
    if there are none, evaluate them, write two CSV evidence files, record
    the NFR metrics and finally assert both acceptance criteria.

    `run_id` is requested as a fixture but not referenced in the body.
    """
    if not sitl_replay_ready:
        pytest.skip(
            "NFT-PERF-02 requires `E2E_SITL_REPLAY_DIR` to point at a prepared "
            "SITL replay fixture (AZ-595) carrying the 5 min Derkachi @ 3 Hz "
            "replay. AC-1/AC-2 pure-logic covered by "
            "e2e/_unit_tests/helpers/test_streaming_evaluator.py."
        )

    # Runner helpers are imported only once the skip gate has been passed.
    from runner.helpers import mavproxy_tlog_reader, msp_frame_observer, sitl_observer
    from runner.helpers.frame_source_replay import FrameSourceReplayer
    from runner.helpers.replay_mode import NullFrameSink

    # Step 1 - push the Derkachi video through the replayer into a null sink.
    replayer = FrameSourceReplayer(NullFrameSink())
    replayer.replay_video(DERKACHI_MP4)

    # Step 2 - collect the receipt timestamps of the FC-specific GPS frames.
    sitl_host = f"{fc_adapter}-sitl"
    emit_times_ms = _read_emit_times_ms(
        fc_adapter,
        sitl_host,
        sitl_observer=sitl_observer,
        mavproxy_tlog_reader=mavproxy_tlog_reader,
    )
    if not emit_times_ms:
        pytest.fail(
            f"NFT-PERF-02: SITL ({sitl_host}) reported zero accepted GPS frames "
            "during the 5 min Derkachi replay. The replay fixture exists but "
            "the SUT emitted nothing — fail-loud rather than skip."
        )

    # Step 3 - run the AC-1 / AC-2 evaluator over the timestamps.
    report = ste.evaluate(emit_times_ms)

    # Step 4 - persist the summary CSV and the per-interval CSV side by side.
    evidence_stem = Path(evidence_dir) / "nft-perf-02" / f"{fc_adapter}-{vio_strategy}"
    summary_csv = evidence_stem.with_suffix(".csv")
    intervals_stem = evidence_stem.with_name(evidence_stem.name + "-intervals")
    intervals_csv = intervals_stem.with_suffix(".csv")
    ste.write_csv_evidence(summary_csv, report)
    ste.write_intervals_csv(intervals_csv, emit_times_ms)

    # Step 5 - record NFR metrics; percentile values that are None are omitted.
    inter_emit = report.inter_emit
    missed = report.missed_emits
    percentile_metrics = (
        ("nft_perf_02.inter_emit_ms_p50", inter_emit.p50_ms, {}),
        ("nft_perf_02.inter_emit_ms_p95", inter_emit.p95_ms, {"ac_id": "AC-1"}),
        ("nft_perf_02.inter_emit_ms_max", inter_emit.max_ms, {}),
    )
    for metric_name, metric_value, extra_kwargs in percentile_metrics:
        if metric_value is None:
            continue
        nfr_recorder.record_metric(metric_name, metric_value, **extra_kwargs)
    nfr_recorder.record_metric(
        "nft_perf_02.longest_missed_run",
        float(missed.longest_run),
        ac_id="AC-2",
    )

    # Step 6 - acceptance criteria; messages are only built when a check fails.
    assert inter_emit.passes_p95, (
        f"AC-1: p95(inter_emit) > {ste.STREAMING_P95_BUDGET_MS} ms "
        f"(got {inter_emit.p95_ms} ms over "
        f"{inter_emit.interval_count} intervals; "
        f"max={inter_emit.max_ms} ms)"
    )
    assert missed.passes, (
        f"AC-2: longest missed-emit run = {missed.longest_run} "
        f">= limit {missed.limit}; "
        "first window @ "
        f"{missed.windows[0].start_ms if missed.windows else 'n/a'} ms"
    )
def _read_emit_times_ms(
    fc_adapter: str,
    host: str,
    *,
    sitl_observer,  # type: ignore[no-untyped-def]
    mavproxy_tlog_reader,  # type: ignore[no-untyped-def]
) -> list[float]:
    """Project SITL-side accepted-GPS receipt timestamps into a ms list.

    * ArduPilot: capture the mavproxy tlog, keep messages whose
      ``msg_type`` equals ``ARDUPILOT_GPS_MSG_KIND`` and project
      ``timestamp_us / 1000``.
    * iNav: ``collect_inav_msp_frames`` then filter for
      ``MSP2_SENSOR_GPS`` (function id ``0x1F03``) and project
      ``monotonic_ms`` directly.

    Args:
        fc_adapter: Flight-controller adapter name; ``"ardupilot"`` and
            ``"inav"`` are the only supported values.
        host: SITL host name forwarded to the observer calls.
        sitl_observer: Helper module providing ``capture_ap_tlog`` and
            ``collect_inav_msp_frames``.
        mavproxy_tlog_reader: Helper module providing ``iter_messages``.

    Returns:
        The timestamps in milliseconds, in the order the source yields them.

    Raises:
        ValueError: If ``fc_adapter`` is not one of the supported values.
    """
    if fc_adapter == "ardupilot":
        tlog_path = sitl_observer.capture_ap_tlog(host=host, duration_s=REPLAY_WINDOW_S)
        return [
            float(msg.timestamp_us) / 1000.0
            for msg in mavproxy_tlog_reader.iter_messages(tlog_path)
            if msg.msg_type == ARDUPILOT_GPS_MSG_KIND
        ]
    if fc_adapter == "inav":
        # `msp_frame_observer` is neither a parameter of this function nor a
        # module-level name (the file only imports it inside the test body),
        # so it has to be imported here; otherwise this branch dies with a
        # NameError instead of returning timestamps.
        from runner.helpers import msp_frame_observer

        gps_function_id = msp_frame_observer.MSP2_SENSOR_GPS_FUNCTION_ID
        capture = sitl_observer.collect_inav_msp_frames(
            host=host, port=INAV_MSP_PORT, window_s=REPLAY_WINDOW_S
        )
        return [
            float(frame.monotonic_ms)
            for frame in capture.frames
            if frame.function_id == gps_function_id
        ]
    raise ValueError(f"unknown fc_adapter {fc_adapter!r}")