"""NFT-PERF-02 — frame-by-frame streaming, no batching (AZ-429 / AC-4.4). Replays the 5-minute Derkachi flight at the 3 Hz target cadence; reads SITL-side receipt timestamps for accepted GPS_INPUT (ArduPilot mavproxy tlog) / MSP2_SENSOR_GPS (iNav SITL MSP capture) messages; asserts: * AC-1: ``p95(inter_emit_interval) ≤ 350 ms`` (inter-frame × 1.05). * AC-2: no window contains ≥3 consecutive missed emits. Tier-1 OR Tier-2; both parametrizations run. The pure-logic AC-1/AC-2 evaluators are covered by ``e2e/_unit_tests/helpers/test_streaming_evaluator.py``. """ from __future__ import annotations from pathlib import Path import pytest from runner.helpers import streaming_evaluator as ste DERKACHI_DIR = ( Path(__file__).resolve().parents[3] / "_docs" / "00_problem" / "input_data" / "flight_derkachi" ) DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4" # 5 min Derkachi replay at 3 Hz target. The window length feeds into the # iNav MSP collector; the ArduPilot path reads the tlog regardless of # `window_s` (the tlog encodes its own duration). REPLAY_WINDOW_S = 300.0 INAV_MSP_PORT = 5760 ARDUPILOT_GPS_MSG_KIND = "GPS_INPUT" @pytest.mark.scenario_id("nft-perf-02") @pytest.mark.traces_to("AC-4.4,AC-1,AC-2,AC-3") def test_nft_perf_02_streaming_inter_emit( fc_adapter: str, vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: """NFT-PERF-02 AC-1 + AC-2 across `(fc_adapter, vio_strategy)`.""" if not sitl_replay_ready: pytest.skip( "NFT-PERF-02 requires `E2E_SITL_REPLAY_DIR` to point at a prepared " "SITL replay fixture (AZ-595) carrying the 5 min Derkachi @ 3 Hz " "replay. AC-1/AC-2 pure-logic covered by " "e2e/_unit_tests/helpers/test_streaming_evaluator.py." ) from runner.helpers import mavproxy_tlog_reader, msp_frame_observer, sitl_observer from runner.helpers.frame_source_replay import FrameSourceReplayer from runner.helpers.replay_mode import NullFrameSink # 1. Drive the 5 min replay (3 Hz target inside the fixture). FrameSourceReplayer(NullFrameSink()).replay_video(DERKACHI_MP4) # 2. Read SITL-side receipt timestamps for the FC-specific accepted GPS frame. host = f"{fc_adapter}-sitl" emit_times_ms = _read_emit_times_ms( fc_adapter, host, sitl_observer=sitl_observer, mavproxy_tlog_reader=mavproxy_tlog_reader, ) if not emit_times_ms: pytest.fail( f"NFT-PERF-02: SITL ({host}) reported zero accepted GPS frames " "during the 5 min Derkachi replay. The replay fixture exists but " "the SUT emitted nothing — fail-loud rather than skip." ) # 3. Evaluate AC-1 + AC-2. report = ste.evaluate(emit_times_ms) # 4. Emit per-interval + summary CSV evidence. base = Path(evidence_dir) / "nft-perf-02" / f"{fc_adapter}-{vio_strategy}" ste.write_csv_evidence(base.with_suffix(".csv"), report) ste.write_intervals_csv( base.with_name(base.name + "-intervals").with_suffix(".csv"), emit_times_ms, ) # 5. NFR metrics. if report.inter_emit.p50_ms is not None: nfr_recorder.record_metric( "nft_perf_02.inter_emit_ms_p50", report.inter_emit.p50_ms ) if report.inter_emit.p95_ms is not None: nfr_recorder.record_metric( "nft_perf_02.inter_emit_ms_p95", report.inter_emit.p95_ms, ac_id="AC-1", ) if report.inter_emit.max_ms is not None: nfr_recorder.record_metric( "nft_perf_02.inter_emit_ms_max", report.inter_emit.max_ms ) nfr_recorder.record_metric( "nft_perf_02.longest_missed_run", float(report.missed_emits.longest_run), ac_id="AC-2", ) # 6. AC assertions. assert report.inter_emit.passes_p95, ( f"AC-1: p95(inter_emit) > {ste.STREAMING_P95_BUDGET_MS} ms " f"(got {report.inter_emit.p95_ms} ms over " f"{report.inter_emit.interval_count} intervals; " f"max={report.inter_emit.max_ms} ms)" ) assert report.missed_emits.passes, ( f"AC-2: longest missed-emit run = {report.missed_emits.longest_run} " f">= limit {report.missed_emits.limit}; " f"first window @ " f"{report.missed_emits.windows[0].start_ms if report.missed_emits.windows else 'n/a'} ms" ) def _read_emit_times_ms( fc_adapter: str, host: str, *, sitl_observer, # type: ignore[no-untyped-def] mavproxy_tlog_reader, # type: ignore[no-untyped-def] ) -> list[float]: """Project SITL-side accepted-GPS receipt timestamps into a ms list. * ArduPilot: filter mavproxy tlog for ``GPS_INPUT`` and project ``timestamp_us / 1000``. * iNav: ``collect_inav_msp_frames`` then filter for ``MSP2_SENSOR_GPS`` (function id ``0x1F03``) and project ``monotonic_ms`` directly. """ if fc_adapter == "ardupilot": tlog_path = sitl_observer.capture_ap_tlog(host=host, duration_s=REPLAY_WINDOW_S) return [ float(msg.timestamp_us) / 1000.0 for msg in mavproxy_tlog_reader.iter_messages(tlog_path) if msg.msg_type == ARDUPILOT_GPS_MSG_KIND ] if fc_adapter == "inav": capture = sitl_observer.collect_inav_msp_frames( host=host, port=INAV_MSP_PORT, window_s=REPLAY_WINDOW_S ) return [ float(f.monotonic_ms) for f in capture.frames if f.function_id == msp_frame_observer.MSP2_SENSOR_GPS_FUNCTION_ID ] raise ValueError(f"unknown fc_adapter {fc_adapter!r}")