diff --git a/_docs/02_tasks/todo/AZ-428_nft_perf_01_e2e_latency.md b/_docs/02_tasks/done/AZ-428_nft_perf_01_e2e_latency.md similarity index 100% rename from _docs/02_tasks/todo/AZ-428_nft_perf_01_e2e_latency.md rename to _docs/02_tasks/done/AZ-428_nft_perf_01_e2e_latency.md diff --git a/_docs/02_tasks/todo/AZ-429_nft_perf_02_streaming.md b/_docs/02_tasks/done/AZ-429_nft_perf_02_streaming.md similarity index 100% rename from _docs/02_tasks/todo/AZ-429_nft_perf_02_streaming.md rename to _docs/02_tasks/done/AZ-429_nft_perf_02_streaming.md diff --git a/_docs/02_tasks/todo/AZ-430_nft_perf_03_ttff.md b/_docs/02_tasks/done/AZ-430_nft_perf_03_ttff.md similarity index 100% rename from _docs/02_tasks/todo/AZ-430_nft_perf_03_ttff.md rename to _docs/02_tasks/done/AZ-430_nft_perf_03_ttff.md diff --git a/_docs/02_tasks/todo/AZ-431_nft_perf_04_spoof_promotion.md b/_docs/02_tasks/done/AZ-431_nft_perf_04_spoof_promotion.md similarity index 100% rename from _docs/02_tasks/todo/AZ-431_nft_perf_04_spoof_promotion.md rename to _docs/02_tasks/done/AZ-431_nft_perf_04_spoof_promotion.md diff --git a/_docs/03_implementation/batch_85_report.md b/_docs/03_implementation/batch_85_report.md new file mode 100644 index 0000000..a0757b4 --- /dev/null +++ b/_docs/03_implementation/batch_85_report.md @@ -0,0 +1,145 @@ +# Batch 85 — AZ-428 + AZ-429 + AZ-430 + AZ-431 (Performance NFTs) + +**Tracker**: AZ-428, AZ-429, AZ-430, AZ-431 +**Tasks**: 4 tasks / 13 complexity points (5 + 2 + 5 + 3)* +**Date**: 2026-05-17 +**Verdict**: PASS_WITH_WARNINGS +**Review**: `_docs/03_implementation/reviews/batch_85_review.md` + +*Note on points: the 4-task batch totals 13 points — driven by AC coverage cohesion (all four are Performance NFTs sharing the `_percentile` helper). Per the user batch rule of "create PBIs of 2-3 points (≤5)", individual tasks remain within bounds; the batch grouping is intentional for shared-evaluator coherence. 
+ +## Scope + +- **AZ-428 / NFT-PERF-01 (AC-4.1)** — Tier-2-only end-to-end latency p95 ≤ 400 ms across two configs (K=3@25 °C + K=2@50 °C hybrid); 5 min Derkachi replay; per-stage partition (D-CROSS-LATENCY-1) recorded for trend (informational). +- **AZ-429 / NFT-PERF-02 (AC-4.4)** — Frame-by-frame streaming: p95(inter-emit) ≤ 350 ms; no ≥3 consecutive missed-emit window. +- **AZ-430 / NFT-PERF-03 (AC-NEW-1)** — Tier-2-only cold-start TTFF p95 ≤ 30 s AND max ≤ 45 s over N≥10 iterations. +- **AZ-431 / NFT-PERF-04 (AC-NEW-2)** — Spoofing-promotion latency p95 ≤ 600 ms over N≥20 randomized-start blackout+spoof events. + +## Files + +### Created + +- `e2e/runner/helpers/streaming_evaluator.py` — inter-emit + missed-emit-window evaluators; shared `_percentile` helper used by the other 3 evaluators. +- `e2e/runner/helpers/spoof_promotion_evaluator.py` — per-event latency from `t_blackout_onset` → first `dead_reckoned` label switch + aggregate p50/p95/p99. +- `e2e/runner/helpers/ttff_evaluator.py` — per-iteration TTFF samples + AC-3/AC-4 aggregate. +- `e2e/runner/helpers/e2e_latency_evaluator.py` — per-frame latency + frame-drop accounting + per-stage partition recording. +- `e2e/tests/performance/test_nft_perf_01_e2e_latency.py` — NFT-PERF-01 scenario (Tier-2; two configs). +- `e2e/tests/performance/test_nft_perf_02_streaming.py` — NFT-PERF-02 scenario (Tier-1/2; fc-adapter-aware timestamp extraction). +- `e2e/tests/performance/test_nft_perf_03_ttff.py` — NFT-PERF-03 scenario (Tier-2-only; fixture-consumer). +- `e2e/tests/performance/test_nft_perf_04_spoof_promotion.py` — NFT-PERF-04 scenario (Tier-1/2; fixture-consumer). +- `e2e/_unit_tests/helpers/test_streaming_evaluator.py` — 16 unit tests. +- `e2e/_unit_tests/helpers/test_spoof_promotion_evaluator.py` — 15 unit tests. +- `e2e/_unit_tests/helpers/test_ttff_evaluator.py` — 14 unit tests. +- `e2e/_unit_tests/helpers/test_e2e_latency_evaluator.py` — 15 unit tests. 
+ +### Modified + +- `e2e/_unit_tests/test_directory_layout.py` — registered 8 new paths. + +## Test Results + +``` +$ pytest e2e/_unit_tests/helpers/test_streaming_evaluator.py \ + e2e/_unit_tests/helpers/test_spoof_promotion_evaluator.py \ + e2e/_unit_tests/helpers/test_ttff_evaluator.py \ + e2e/_unit_tests/helpers/test_e2e_latency_evaluator.py \ + e2e/_unit_tests/test_directory_layout.py +================ 177 passed in 0.34s ================ +``` + +Scenario collection (24 cases, all parameterised): + +``` +$ pytest e2e/tests/performance/ --collect-only -p no:csv +collected 24 items + test_nft_perf_01_e2e_latency: 6 cases + test_nft_perf_02_streaming_inter_emit: 6 cases + test_nft_perf_03_cold_start_ttff: 6 cases + test_nft_perf_04_spoof_promotion_latency: 6 cases +``` + +Full unit suite: `977 passed, 2 failed` — both failures are pre-existing (`pytest-csv` vs `csv_reporter` plugin conflict on subprocess pytest invocations); confirmed by `git stash` baseline. Not introduced by batch 85. 
+ +## AC Verification + +### AZ-428 / NFT-PERF-01 + +| AC | Coverage | +|----|----------| +| AC-1 tier guard | `@pytest.mark.tier2_only` | +| AC-2 K=3@25 °C p95 ≤ 400 ms | per-config assertion in scenario + 4 unit tests | +| AC-3 K=2 hybrid@50 °C p95 ≤ 400 ms | per-config assertion in scenario | +| AC-4 frame-drop ≤ 10 % | `LatencyReport.passes_frame_drop` + 3 unit tests | +| AC-5 partition recorded | `write_partition_csv` (informational; no threshold) + 1 unit test | +| AC-6 parameterization | 6 collected variants per config | + +### AZ-429 / NFT-PERF-02 + +| AC | Coverage | +|----|----------| +| AC-1 p95 inter-emit ≤ 350 ms | `evaluate_inter_emit.passes_p95` + 6 unit tests | +| AC-2 no ≥3 consecutive missed emits | `evaluate_missed_emits.longest_run` + 4 unit tests | +| AC-3 parameterization | 6 collected variants (fc_adapter × vio_strategy) | + +### AZ-430 / NFT-PERF-03 + +| AC | Coverage | +|----|----------| +| AC-1 tier guard | `@pytest.mark.tier2_only` | +| AC-2 clean state per iteration | delegated to Tier-2 harness (AZ-444) — surfaced as F3 | +| AC-3 p95(TTFF) ≤ 30 s | `te.evaluate.passes_p95` + 4 unit tests | +| AC-4 max(TTFF) ≤ 45 s | `te.evaluate.passes_max` + 2 unit tests | +| AC-5 parameterization | 6 collected variants | + +### AZ-431 / NFT-PERF-04 + +| AC | Coverage | +|----|----------| +| AC-1 N≥20 events | `evaluate.passes_event_count` + scenario fixture validation | +| AC-2 p95 ≤ 600 ms | `evaluate.passes_p95` + 4 unit tests | +| AC-3 parameterization | 6 collected variants | + +`traces_to` markers: +- NFT-PERF-01: `AC-4.1,AC-1,AC-2,AC-3,AC-4,AC-5,AC-6` +- NFT-PERF-02: `AC-4.4,AC-1,AC-2,AC-3` +- NFT-PERF-03: `AC-NEW-1,AC-1,AC-2,AC-3,AC-4,AC-5` +- NFT-PERF-04: `AC-NEW-2,AC-1,AC-2,AC-3` + +## Code Review + +**Verdict**: PASS_WITH_WARNINGS — 0 Critical, 0 High, 1 Medium, 3 Low. + +- **F1 (Medium / Maintainability — fixed in batch)**: NFT-PERF-04's `_resolve_events_fixture_path` duplicated the `sitl_observer` import across two branches. 
Hoisted to function-top during the review pass. +- **F2 (Low / Spec-Gap surfacing)**: Production dep — `blackout_spoof.py` injector cannot emit N=20 randomized-start events; scenario consumes external fixture from AZ-595 fixture builder. Surfaced + tracked. +- **F3 (Low / Spec-Gap surfacing)**: AZ-430 AC-2 (per-iteration clean state) delegated to Tier-2 harness (AZ-444). Scenario only consumes the captured fixture. +- **F4 (Low / Maintainability)**: CSV-emit boilerplate duplicated across 4 evaluators. Future hygiene PBI. + +Full review: `_docs/03_implementation/reviews/batch_85_review.md`. + +## Production Dependencies + +Surfaced for the cumulative review window (85-87) + traceability matrix: + +1. **AZ-444 (Tier-2 runner)**: per-iteration `fdr-output` volume wipe + SUT cold lifecycle restart for NFT-PERF-03; tier2-on-jetson.sh orchestration of N=10 iterations. +2. **AZ-595 (fixture builder)**: emit `nft_perf_01_latency.json` (N=900 frames × 2 configs + per-stage partition samples), `nft_perf_02_streaming` capture, `nft_perf_03_ttff.json` (N≥10 iteration records), `nft_perf_04_events.json` (N≥20 randomized-start blackout+spoof events with per-event outbound-label samples). +3. **SUT-side**: outbound stream MUST carry `source_label` ∈ {`satellite_anchored`, `visual_propagated`, `dead_reckoned`} for NFT-PERF-04 to detect promotion; FDR (or equivalent) MUST expose per-stage timings (C1, C2, C2.5, C3, C3.5, C4, C4 cov, C5, serialization, OS jitter) for NFT-PERF-01 AC-5 partition recording. +4. **AZ-595 + Derkachi flight**: K=2 + Jacobian-cov hybrid auto-degrade configuration must be activatable from fixture-builder side so the K=2@50 °C config captures the right SUT mode. +5. **Already exists**: `sitl_replay_ready` fixture, `mavproxy_tlog_reader`, `msp_frame_observer`, `sitl_observer.replay_dir()`, `evidence_dir`, `nfr_recorder` (AZ-406). +6. 
**Already exists**: `fc_adapter` / `vio_strategy` parameterization, `tier2_only` marker, `scenario_id` marker, `traces_to` marker. + +## Architecture Compliance + +- All new files under `e2e/`, owned by the Blackbox Tests component per `_docs/02_document/module-layout.md`. +- No imports from `src/gps_denied_onboard` (verified — explicit "does NOT import" notes in evaluator docstrings). +- No new cyclic dependencies. New evaluators share `streaming_evaluator._percentile` only. +- No new infrastructure libraries. + +## Sub-step Trace + +Phases executed per `implement/SKILL.md`: +- phase 5 (load-spec) → 4 task specs read +- phase 6 (implement-tasks-sequentially) → helpers + scenarios + unit tests for all 4 tasks +- phase 7 (verify-ac-coverage) → ACs traced above +- phase 8 (code-review) → batch_85_review.md (PASS_WITH_WARNINGS) +- phase 8.5 (cumulative-review) → defer to batch 87 (K=3 window starts at batch 85) +- phase 11 (commit-batch) → next. diff --git a/_docs/03_implementation/reviews/batch_85_review.md b/_docs/03_implementation/reviews/batch_85_review.md new file mode 100644 index 0000000..3214f2f --- /dev/null +++ b/_docs/03_implementation/reviews/batch_85_review.md @@ -0,0 +1,105 @@ +# Code Review Report — Batch 85 + +**Batch**: 85 (AZ-428 + AZ-429 + AZ-430 + AZ-431 — Performance NFTs) +**Date**: 2026-05-17 +**Verdict**: PASS_WITH_WARNINGS + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| 1 | Medium | Maintainability | `e2e/tests/performance/test_nft_perf_04_spoof_promotion.py:127-145` | Duplicate `sitl_observer` import across branches — **fixed in batch** | +| 2 | Low | Spec-Gap (surfacing) | `e2e/tests/performance/test_nft_perf_04_spoof_promotion.py` | Production dependency: injector cannot emit N=20 randomized-start events | +| 3 | Low | Spec-Gap (surfacing) | `e2e/tests/performance/test_nft_perf_03_ttff.py` | AC-2 (clean-state per iteration) delegated to Tier-2 harness (AZ-444) | +| 4 | 
Low | Maintainability | `e2e/runner/helpers/{ttff,spoof_promotion,e2e_latency,streaming}_evaluator.py` | CSV-emit boilerplate duplicated across 4 evaluators | + +### Finding Details + +**F1: Duplicate `sitl_observer` import across branches** (Medium / Maintainability — **fixed in batch**) +- Location: `e2e/tests/performance/test_nft_perf_04_spoof_promotion.py:132,140` +- Description: `_resolve_events_fixture_path` imported `sitl_observer` inside two separate branches. NFT-PERF-01 and NFT-PERF-03 already hoist the import once at the top of the resolver. +- Resolution: Hoisted the import to the top of the function during this batch. +- Task: AZ-431 + +**F2: Production dependency — injector cannot emit N=20 randomized-start events** (Low / Spec-Gap — surfacing) +- Location: `e2e/tests/performance/test_nft_perf_04_spoof_promotion.py` +- Description: AZ-431 AC-1 says "N≥20 events via `blackout_spoof.py` with randomized window starts". Current `blackout_spoof.py` only randomizes spoofed GPS values via `seed`; the blackout-window start is hardcoded. The scenario therefore consumes an external `E2E_NFT_PERF_04_EVENTS_FIXTURE` produced by the fixture builder (AZ-595). Scenario fails loudly when the fixture is missing or empty. +- Suggestion: Track as production dependency for AZ-595 (fixture builder) — extend the SITL replay builder to emit `nft_perf_04_events.json` with N≥20 randomized-start records. +- Task: AZ-431 + +**F3: AC-2 (clean-state per iteration) delegated to Tier-2 harness** (Low / Spec-Gap — surfacing) +- Location: `e2e/tests/performance/test_nft_perf_03_ttff.py` +- Description: AZ-430 AC-2 requires per-iteration `fdr-output` volume wipe + cold SUT restart. Per scope-discipline these lifecycle concerns belong to the Tier-2 harness (AZ-444 / AZ-595 fixture builder), not to the in-pytest scenario. The scenario only consumes a pre-captured `nft_perf_03_ttff.json` with N≥10 iteration records. 
+- Suggestion: Track as production dependency for AZ-444 (Tier-2 runner) — wire the per-iteration lifecycle reset and fixture builder. +- Task: AZ-430 + +**F4: CSV-emit boilerplate duplicated across 4 evaluators** (Low / Maintainability) +- Location: `e2e/runner/helpers/streaming_evaluator.py`, `spoof_promotion_evaluator.py`, `ttff_evaluator.py`, `e2e_latency_evaluator.py` +- Description: Each evaluator implements `write_csv_evidence` + `write_per_*` with the same shape (open file, write header, write rows, return path). Aggregate CSV row formatting is also boilerplate-heavy. +- Suggestion: Future hygiene PBI — extract a `_emit_csv(path, header, rows)` helper. Not blocking; current code is readable and isolated per scenario. +- Task: AZ-428 / AZ-429 / AZ-430 / AZ-431 + +## Phase Notes + +### Phase 1 — Context +All 4 task specs read; ACs walked through against helpers + scenarios. + +### Phase 2 — Spec Compliance + +| Task | AC | Evidence | +|------|----|----------| +| AZ-429 | AC-1 p95 ≤ 350 ms | `streaming_evaluator.evaluate_inter_emit.passes_p95` + scenario assertion | +| AZ-429 | AC-2 no ≥3-emit gap | `evaluate_missed_emits.longest_run < MISSED_EMIT_WINDOW_LIMIT` | +| AZ-429 | AC-3 parameterization | 6 collected variants (ardupilot/inav × {okvis2, klt_ransac, vins_mono}) | +| AZ-431 | AC-1 N≥20 events | `evaluate.passes_event_count` + fixture validation | +| AZ-431 | AC-2 p95 ≤ 600 ms | `evaluate.passes_p95` + scenario assertion | +| AZ-431 | AC-3 parameterization | 6 collected variants | +| AZ-430 | AC-1 tier guard | `@pytest.mark.tier2_only` | +| AZ-430 | AC-2 clean state | delegated to Tier-2 harness (AZ-444) — F3 surfaced | +| AZ-430 | AC-3 p95 ≤ 30 s | `te.evaluate.passes_p95` | +| AZ-430 | AC-4 max ≤ 45 s | `te.evaluate.passes_max` | +| AZ-430 | AC-5 parameterization | 6 collected variants | +| AZ-428 | AC-1 tier guard | `@pytest.mark.tier2_only` | +| AZ-428 | AC-2 K=3@25 °C p95 ≤ 400 ms | per-config assertion (`config_id == "k3-25c"`) | +| AZ-428 | AC-3 
K=2@50 °C p95 ≤ 400 ms | per-config assertion (`config_id == "k2-hybrid-50c"`) | +| AZ-428 | AC-4 frame drop ≤ 10 % | `LatencyReport.passes_frame_drop` per config | +| AZ-428 | AC-5 partition recorded | `write_partition_csv` (informational, no threshold) | +| AZ-428 | AC-6 parameterization | 6 collected variants per config; both configs run per param | + +### Phase 3 — Code Quality +- SOLID: each evaluator owns one responsibility; fc-adapter-specific timestamp extraction lives in the AZ-429 scenario (`_read_emit_times_ms`) rather than leaking into the evaluator. +- Error handling: `ValueError` on negative latency/TTFF (fail-loud at evaluator boundary); `pytest.fail` on malformed fixture (fail-loud at scenario boundary). No bare `except`. +- DRY: `streaming_evaluator._percentile` re-used by `spoof_promotion_evaluator`, `ttff_evaluator`, and `e2e_latency_evaluator` — correct shared-helper pattern. +- Tests: all use the Arrange/Act/Assert pattern with `# Arrange / # Act / # Assert` markers per `.cursor/rules/coderule.mdc`. +- Naming: scenario function names mirror task IDs (`test_nft_perf_0N_*`); helper symbols use full domain words (`ColdStartIteration`, `FrameLatencySample`, `SpoofEvent`). + +### Phase 4 — Security +- No subprocess / shell=True / eval / exec usage in new code. +- No hardcoded secrets. +- Input from fixtures parsed via `json.loads` (safe); shape validated with explicit `pytest.fail` on malformed records — no insecure deserialisation. + +### Phase 5 — Performance +- One sort per percentile call (`sorted(values)`); fixtures are ≤ N=900 per config — negligible. +- No N+1 patterns; no blocking I/O in async contexts. + +### Phase 6 — Cross-Task Consistency +- All 4 evaluators share the `_percentile` helper from `streaming_evaluator`. +- All 4 scenarios follow the identical fixture-consumer pattern (resolve fixture path → load → evaluate → write CSV evidence → record NFR metrics → assert). +- All 4 scenarios use `@pytest.mark.scenario_id` + `@pytest.mark.traces_to` consistently. 
+ +### Phase 7 — Architecture Compliance +- All new files under `e2e/` (Blackbox Tests component per `_docs/02_document/module-layout.md`). +- No imports from `src/gps_denied_onboard` (verified — explicit "does NOT import" notes in evaluator docstrings). +- No new cyclic dependencies. +- No duplicate symbols across components. + +## Verdict Logic + +- 0 Critical, 0 High → not FAIL. +- 1 Medium, 3 Low → **PASS_WITH_WARNINGS**. + +F1 (duplicate import) is the only actionable finding without a downstream dependency; it was fixed in this batch by hoisting the import to the top of `_resolve_events_fixture_path`, so no follow-up is required. + +## Cumulative Trigger + +Batch 85 advances the K-counter to 1 of K=3 from cumulative baseline (batches 82-84). Cumulative review trigger reached at batch 87. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index d2a9934..cbfcb4a 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,13 +6,13 @@ step: 10 name: Implement Tests status: in_progress sub_step: - phase: 0 - name: awaiting-invocation + phase: 11 + name: commit-batch detail: "" retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 84 +last_completed_batch: 85 last_cumulative_review: batches_82-84 current_batch: 85 diff --git a/_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md b/_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md index 26dc40d..3594f2b 100644 --- a/_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md +++ b/_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md @@ -1,9 +1,10 @@ # D-CROSS-CVE-1 opencv-python pin deferred — gtsam/numpy ABI block **Recorded**: 2026-05-11T02:55+03:00 (Europe/Kyiv) -**Last replay attempt**: 2026-05-16T05:44+03:00 (Europe/Kyiv) — PyPI shows -`gtsam==4.2.1` as the latest release; `requires_dist: numpy<2.0.0,>=1.11.0`. -Replay condition (numpy>=2 wheels) still NOT met. Leftover remains open. 
+**Last replay attempt**: 2026-05-17T16:23+03:00 (Europe/Kyiv) — PyPI still shows +`gtsam==4.2.1` as the latest stable (`requires_dist: numpy<2.0.0,>=1.11.0`); +`gtsam==4.3a0` alpha exists but is not a stable wheel target. Replay condition +(numpy>=2 stable wheels) still NOT met. Leftover remains open. **Status**: deferred-non-user (replay when upstream gtsam wheels target numpy>=2) ## What is blocked diff --git a/e2e/_unit_tests/helpers/test_e2e_latency_evaluator.py b/e2e/_unit_tests/helpers/test_e2e_latency_evaluator.py new file mode 100644 index 0000000..9b92917 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_e2e_latency_evaluator.py @@ -0,0 +1,214 @@ +"""Unit tests for ``runner.helpers.e2e_latency_evaluator`` (AZ-428 / NFT-PERF-01).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import e2e_latency_evaluator as ee + + +def _frame(idx: int, latency_ms: float) -> ee.FrameLatencySample: + t_capture = idx * 333 + return ee.measure_frame( + f"f{idx:04d}", + t_capture_ms=t_capture, + t_emit_at_sitl_ms=t_capture + int(round(latency_ms)), + ) + + +# ───────────────────────── measure_frame ───────────────────────── + + +def test_measure_frame_negative_latency_raises() -> None: + # Assert + with pytest.raises(ValueError): + ee.measure_frame("bad", t_capture_ms=2_000, t_emit_at_sitl_ms=1_000) + + +def test_measure_frame_zero_latency_ok() -> None: + # Act + s = ee.measure_frame("z", t_capture_ms=2_000, t_emit_at_sitl_ms=2_000) + + # Assert + assert s.latency_ms == 0.0 + + +# ───────────────────────── evaluate ───────────────────────── + + +def test_evaluate_clean_run_passes_all_acs() -> None: + # Arrange — 900 frames all at 200 ms latency, no drops + samples = [_frame(i, 200.0) for i in range(900)] + + # Act + report = ee.evaluate("k3-25c", samples) + + # Assert + assert report.sample_count == 900 + assert report.frame_drop_ratio == 0.0 + assert report.p95_ms == pytest.approx(200.0) + assert report.passes_p95 + 
assert report.passes_frame_drop + assert report.passes + + +def test_evaluate_p95_at_budget_passes() -> None: + # Arrange — 900 frames all at 400 ms + samples = [_frame(i, 400.0) for i in range(900)] + + # Act + report = ee.evaluate("k3-25c", samples) + + # Assert + assert report.p95_ms == pytest.approx(400.0) + assert report.passes_p95 + + +def test_evaluate_p95_above_budget_fails() -> None: + # Arrange — last 100 spike to 500 ms; p95 lands well above 400 + samples = [_frame(i, 200.0) for i in range(800)] + [ + _frame(800 + j, 500.0) for j in range(100) + ] + + # Act + report = ee.evaluate("k3-25c", samples) + + # Assert + assert report.p95_ms is not None and report.p95_ms > 400.0 + assert not report.passes_p95 + assert not report.passes + + +def test_evaluate_frame_drops_within_budget() -> None: + # Arrange — 810 frames received (90 dropped → exactly 10 %) + samples = [_frame(i, 200.0) for i in range(810)] + + # Act + report = ee.evaluate("k3-25c", samples) + + # Assert + assert report.frame_drop_ratio == pytest.approx(0.1) + assert report.passes_frame_drop + assert report.passes + + +def test_evaluate_frame_drops_above_budget_fails() -> None: + # Arrange — 809 received → 10.11 % > 10 % + samples = [_frame(i, 200.0) for i in range(809)] + + # Act + report = ee.evaluate("k3-25c", samples) + + # Assert + assert not report.passes_frame_drop + assert not report.passes + + +def test_evaluate_zero_samples_full_drop_fails() -> None: + # Act + report = ee.evaluate("k3-25c", []) + + # Assert + assert report.frame_drop_ratio == pytest.approx(1.0) + assert report.p95_ms is None + assert not report.passes + + +def test_evaluate_zero_expected_frame_count_rejected() -> None: + # Assert + with pytest.raises(ValueError): + ee.evaluate("k3-25c", [], expected_frame_count=0) + + +def test_evaluate_custom_expected_frame_count_applies() -> None: + # Arrange — short window: 30 frames expected, 27 received + samples = [_frame(i, 200.0) for i in range(27)] + + # Act + report = 
ee.evaluate("k3-25c", samples, expected_frame_count=30) + + # Assert + assert report.frame_drop_ratio == pytest.approx(0.1) + assert report.passes + + +def test_evaluate_partitions_recorded_but_no_threshold() -> None: + # Arrange + samples = [_frame(i, 200.0) for i in range(900)] + stages = { + "c1_okvis2": [150.0] * 900, + "c2_ultravpr": [50.0] * 900, + } + + # Act + report = ee.evaluate("k3-25c", samples, stages) + + # Assert + names = [p.stage_name for p in report.stage_partitions] + assert names == ["c1_okvis2", "c2_ultravpr"] + assert report.stage_partitions[0].p95_ms == pytest.approx(150.0) + assert report.passes + + +def test_evaluate_chamber_unavailable_flag_propagates() -> None: + # Arrange + samples = [_frame(i, 200.0) for i in range(900)] + + # Act + report = ee.evaluate("k2-hybrid-50c", samples, chamber_unavailable=True) + + # Assert + assert report.chamber_unavailable + assert report.passes + + +# ───────────────────────── csv emit ───────────────────────── + + +def test_write_csv_evidence_one_row_per_config(tmp_path: Path) -> None: + # Arrange + s_a = [_frame(i, 200.0) for i in range(900)] + s_b = [_frame(i, 350.0) for i in range(900)] + reports = [ee.evaluate("k3-25c", s_a), ee.evaluate("k2-hybrid-50c", s_b)] + out_path = tmp_path / "nft-perf-01.csv" + + # Act + ee.write_csv_evidence(out_path, reports) + + # Assert + rows = out_path.read_text().splitlines() + assert len(rows) == 3 + assert rows[0].startswith("config_id,sample_count") + + +def test_write_per_frame_csv_flat_table(tmp_path: Path) -> None: + # Arrange + samples = [_frame(i, 200.0) for i in range(3)] + reports = [ee.evaluate("k3-25c", samples, expected_frame_count=3)] + out_path = tmp_path / "per-frame.csv" + + # Act + ee.write_per_frame_csv(out_path, reports) + + # Assert + rows = out_path.read_text().splitlines() + assert rows[0] == "config_id,frame_id,t_capture_ms,t_emit_at_sitl_ms,latency_ms" + assert len(rows) == 4 + + +def test_write_partition_csv_per_stage_per_config(tmp_path: 
Path) -> None: + # Arrange + samples = [_frame(i, 200.0) for i in range(10)] + stages = {"c1_okvis2": [150.0] * 10, "c2_ultravpr": [50.0] * 10} + reports = [ee.evaluate("k3-25c", samples, stages, expected_frame_count=10)] + out_path = tmp_path / "partition.csv" + + # Act + ee.write_partition_csv(out_path, reports) + + # Assert + rows = out_path.read_text().splitlines() + assert rows[0] == "config_id,stage_name,sample_count,p50_ms,p95_ms,p99_ms" + assert len(rows) == 3 diff --git a/e2e/_unit_tests/helpers/test_spoof_promotion_evaluator.py b/e2e/_unit_tests/helpers/test_spoof_promotion_evaluator.py new file mode 100644 index 0000000..d90a780 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_spoof_promotion_evaluator.py @@ -0,0 +1,275 @@ +"""Unit tests for ``runner.helpers.spoof_promotion_evaluator`` (AZ-431 / NFT-PERF-04).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import spoof_promotion_evaluator as spe + + +def _evt( + event_id: str, + onset_ms: int, + samples: list[tuple[int, str]], +) -> spe.SpoofEvent: + return spe.SpoofEvent( + event_id=event_id, + blackout_onset_ms=onset_ms, + samples=tuple( + spe.OutboundLabelSample(monotonic_ms=t, source_label=lbl) + for t, lbl in samples + ), + ) + + +def _clean_event(event_id: str, onset_ms: int, latency_ms: int) -> spe.SpoofEvent: + """One event where dead_reckoned appears exactly ``latency_ms`` after onset.""" + return _evt( + event_id, + onset_ms, + [ + (onset_ms - 100, "satellite_anchored"), + (onset_ms, "satellite_anchored"), + (onset_ms + latency_ms, "dead_reckoned"), + (onset_ms + latency_ms + 100, "dead_reckoned"), + ], + ) + + +# ───────────────────────── measure_event_latency ───────────────────────── + + +def test_measure_event_latency_first_dr_after_onset() -> None: + # Arrange + event = _clean_event("e1", 10_000, 250) + + # Act + report = spe.measure_event_latency(event) + + # Assert + assert report.first_dead_reckoned_ms == 10_250 + assert 
report.latency_ms == 250 + assert report.has_promotion + + +def test_measure_event_latency_pre_onset_dr_is_ignored() -> None: + # Arrange — a dead_reckoned BEFORE onset must not be counted + event = _evt( + "e1", + 10_000, + [ + (9_500, "dead_reckoned"), + (10_300, "dead_reckoned"), + ], + ) + + # Act + report = spe.measure_event_latency(event) + + # Assert + assert report.first_dead_reckoned_ms == 10_300 + assert report.latency_ms == 300 + + +def test_measure_event_latency_no_dr_returns_none() -> None: + # Arrange + event = _evt( + "e1", + 10_000, + [(10_100, "satellite_anchored"), (10_500, "satellite_anchored")], + ) + + # Act + report = spe.measure_event_latency(event) + + # Assert + assert report.first_dead_reckoned_ms is None + assert report.latency_ms is None + assert not report.has_promotion + + +def test_measure_event_latency_unsorted_samples_sorted() -> None: + # Arrange + event = _evt( + "e1", + 10_000, + [ + (10_500, "dead_reckoned"), + (10_200, "dead_reckoned"), + (10_100, "satellite_anchored"), + ], + ) + + # Act + report = spe.measure_event_latency(event) + + # Assert — earliest dead_reckoned after onset wins + assert report.latency_ms == 200 + + +def test_measure_event_latency_dr_at_onset_is_zero() -> None: + # Arrange + event = _evt("e1", 10_000, [(10_000, "dead_reckoned")]) + + # Act + report = spe.measure_event_latency(event) + + # Assert + assert report.latency_ms == 0 + + +# ───────────────────────── evaluate (aggregate) ───────────────────────── + + +def _budget_passing_events(n: int) -> list[spe.SpoofEvent]: + """N events with latencies 100..(100+10*(n-1)) — all < 600 ms budget.""" + return [ + _clean_event(f"e{i}", onset_ms=10_000 + 1_000 * i, latency_ms=100 + i * 10) + for i in range(n) + ] + + +def test_evaluate_min_event_count_default_passes_with_20() -> None: + # Arrange + events = _budget_passing_events(20) + + # Act + report = spe.evaluate(events) + + # Assert + assert report.event_count == 20 + assert report.passes_event_count + assert 
report.missing_promotions == 0 + assert report.passes_p95 + + +def test_evaluate_min_event_count_fails_with_19() -> None: + # Arrange + events = _budget_passing_events(19) + + # Act + report = spe.evaluate(events) + + # Assert + assert not report.passes_event_count + assert not report.passes + + +def test_evaluate_custom_min_event_count() -> None: + # Arrange + events = _budget_passing_events(5) + + # Act + report = spe.evaluate(events, min_event_count=5) + + # Assert + assert report.passes_event_count + + +def test_evaluate_p95_at_budget_passes() -> None: + # Arrange — all events at exactly 600 ms (budget edge) + events = [_clean_event(f"e{i}", 10_000 + i * 1_000, 600) for i in range(20)] + + # Act + report = spe.evaluate(events) + + # Assert + assert report.p95_ms == pytest.approx(600.0) + assert report.passes_p95 + + +def test_evaluate_p95_above_budget_fails() -> None: + # Arrange — last 2 events spike to 800 ms; 20 events → p95 sits in tail + events = _budget_passing_events(18) + [ + _clean_event("e18", 30_000, 800), + _clean_event("e19", 31_000, 800), + ] + + # Act + report = spe.evaluate(events) + + # Assert + assert report.p95_ms is not None and report.p95_ms > 600.0 + assert not report.passes_p95 + assert not report.passes + + +def test_evaluate_one_missing_promotion_fails_p95_even_if_others_pass() -> None: + # Arrange — 19 good events + 1 with no dead_reckoned + events = _budget_passing_events(19) + [ + _evt( + "e19", + 30_000, + [(30_500, "satellite_anchored"), (31_000, "satellite_anchored")], + ) + ] + + # Act + report = spe.evaluate(events) + + # Assert + assert report.missing_promotions == 1 + assert not report.passes_p95 + assert not report.passes + + +def test_evaluate_empty_input_fails() -> None: + # Act + report = spe.evaluate([]) + + # Assert + assert report.event_count == 0 + assert not report.passes + assert report.p95_ms is None + + +def test_evaluate_percentiles_are_set_when_events_present() -> None: + # Arrange — 10 events with latencies 
100..1000 step 100 + events = [ + _clean_event(f"e{i}", 10_000 + 1_000 * i, latency_ms=100 + 100 * i) + for i in range(10) + ] + + # Act + report = spe.evaluate(events, min_event_count=10) + + # Assert + assert report.p50_ms == pytest.approx(550.0) + assert report.p95_ms == pytest.approx(955.0) + assert report.max_ms == 1000 + + +# ───────────────────────── csv emit ───────────────────────── + + +def test_write_csv_evidence_emits_summary(tmp_path: Path) -> None: + # Arrange + events = _budget_passing_events(20) + report = spe.evaluate(events) + out_path = tmp_path / "nft-perf-04.csv" + + # Act + spe.write_csv_evidence(out_path, report) + + # Assert + rows = out_path.read_text().splitlines() + assert len(rows) == 2 + assert rows[0].startswith("event_count") + assert "ac2_passes" in rows[0] + + +def test_write_per_event_csv_one_row_per_event(tmp_path: Path) -> None: + # Arrange + events = _budget_passing_events(3) + report = spe.evaluate(events, min_event_count=3) + out_path = tmp_path / "per-event.csv" + + # Act + spe.write_per_event_csv(out_path, report) + + # Assert + rows = out_path.read_text().splitlines() + assert rows[0] == "event_id,blackout_onset_ms,first_dead_reckoned_ms,latency_ms" + assert len(rows) == 4 # 1 header + 3 events diff --git a/e2e/_unit_tests/helpers/test_streaming_evaluator.py b/e2e/_unit_tests/helpers/test_streaming_evaluator.py new file mode 100644 index 0000000..6221b63 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_streaming_evaluator.py @@ -0,0 +1,330 @@ +"""Unit tests for ``runner.helpers.streaming_evaluator`` (AZ-429 / NFT-PERF-02).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import streaming_evaluator as se + + +# ───────────────────────── percentile ───────────────────────── + + +def test_percentile_q_must_be_in_range() -> None: + # Arrange / Act / Assert + with pytest.raises(ValueError): + se._percentile([100.0], -1.0) + with pytest.raises(ValueError): + 
se._percentile([100.0], 101.0) + + +def test_percentile_empty_returns_none() -> None: + # Assert + assert se._percentile([], 50.0) is None + + +def test_percentile_single_value_returns_that_value() -> None: + # Assert + assert se._percentile([42.0], 0.0) == 42.0 + assert se._percentile([42.0], 50.0) == 42.0 + assert se._percentile([42.0], 100.0) == 42.0 + + +def test_percentile_known_distribution_linear_interpolation() -> None: + # Arrange — 100..1000 step 100 + values = [float(x) for x in range(100, 1001, 100)] + + # Assert + assert se._percentile(values, 0.0) == 100.0 + assert se._percentile(values, 100.0) == 1000.0 + # p50 of even-length sorted list = mean of middle two + assert se._percentile(values, 50.0) == pytest.approx(550.0) + + +def test_percentile_unsorted_input_is_sorted() -> None: + # Assert + assert se._percentile([1000.0, 100.0, 500.0], 50.0) == 500.0 + + +# ─────────────────── evaluate_inter_emit (AC-1) ─────────────────── + + +def test_inter_emit_perfect_cadence_passes() -> None: + # Arrange — exact 333.33 ms cadence (3 Hz target) + samples = [i * se.TARGET_INTER_FRAME_MS for i in range(20)] + + # Act + report = se.evaluate_inter_emit(samples) + + # Assert + assert report.sample_count == 20 + assert report.interval_count == 19 + assert report.p50_ms == pytest.approx(se.TARGET_INTER_FRAME_MS) + assert report.p95_ms == pytest.approx(se.TARGET_INTER_FRAME_MS) + assert report.passes_p95 + + +def test_inter_emit_p95_at_budget_passes() -> None: + # Arrange — every interval exactly 350 ms + samples = [i * 350.0 for i in range(10)] + + # Act + report = se.evaluate_inter_emit(samples) + + # Assert + assert report.p95_ms == pytest.approx(350.0) + assert report.passes_p95 + + +def test_inter_emit_p95_above_budget_fails() -> None: + # Arrange — last interval = 500 ms; with 10 intervals, p95 sits on tail + samples = [0.0] + [333.0 * (i + 1) for i in range(9)] + [333.0 * 9 + 500.0] + + # Act + report = se.evaluate_inter_emit(samples) + + # Assert + assert 
report.p95_ms is not None and report.p95_ms > 350.0 + assert not report.passes_p95 + + +def test_inter_emit_empty_returns_none_percentiles_and_fails() -> None: + # Act + report = se.evaluate_inter_emit([]) + + # Assert + assert report.sample_count == 0 + assert report.interval_count == 0 + assert report.p50_ms is None + assert report.p95_ms is None + assert not report.passes_p95 + + +def test_inter_emit_single_sample_no_intervals() -> None: + # Act + report = se.evaluate_inter_emit([1000.0]) + + # Assert + assert report.interval_count == 0 + assert not report.passes_p95 + + +def test_inter_emit_custom_budget_overrides_default() -> None: + # Arrange — 600 ms cadence vs custom 700 ms budget + samples = [i * 600.0 for i in range(5)] + + # Act + report = se.evaluate_inter_emit(samples, budget_ms=700.0) + + # Assert + assert report.budget_ms == 700.0 + assert report.passes_p95 + + +def test_inter_emit_unsorted_input_is_sorted() -> None: + # Arrange — sorted: [0, 333, 666, 1000] → intervals [333, 333, 334] + samples = [0.0, 1000.0, 333.0, 666.0] + + # Act + report = se.evaluate_inter_emit(samples) + + # Assert — p95 of [333, 333, 334] = 333 + 0.9 = 333.9 + assert report.p95_ms == pytest.approx(333.9, abs=0.5) + + +# ─────────────────── evaluate_missed_emits (AC-2) ─────────────────── + + +def test_missed_emits_no_misses_returns_zero() -> None: + # Arrange + samples = [i * 333.0 for i in range(20)] + + # Act + report = se.evaluate_missed_emits(samples) + + # Assert + assert report.longest_run == 0 + assert report.windows == () + assert report.passes + + +def test_missed_emits_single_missed_interval_does_not_trip() -> None: + # Arrange — one isolated > 666.67 ms gap + samples = [0.0, 333.0, 666.0, 1700.0, 2033.0, 2366.0] + + # Act + report = se.evaluate_missed_emits(samples) + + # Assert — one run of length 1, limit is 3 + assert report.longest_run == 1 + assert len(report.windows) == 1 + assert report.windows[0].length == 1 + assert report.passes + + +def 
test_missed_emits_two_consecutive_misses_does_not_trip_default_limit() -> None: + # Arrange — two consecutive >666 ms intervals + samples = [0.0, 333.0, 1700.0, 3100.0, 3433.0] + + # Act + report = se.evaluate_missed_emits(samples) + + # Assert + assert report.longest_run == 2 + assert report.passes # limit is 3, so 2 is allowed + + +def test_missed_emits_three_consecutive_misses_fails_default_limit() -> None: + # Arrange — three consecutive >666 ms intervals (the failure mode AC-2 forbids) + samples = [0.0, 333.0, 1700.0, 3100.0, 4500.0, 4833.0] + + # Act + report = se.evaluate_missed_emits(samples) + + # Assert + assert report.longest_run == 3 + assert len(report.windows) == 1 + assert report.windows[0].length == 3 + assert not report.passes + + +def test_missed_emits_multiple_disjoint_runs_tracked_independently() -> None: + # Arrange — two separate runs, each length 2 + samples = [ + 0.0, 333.0, # OK + 1700.0, 3100.0, # two missed + 3433.0, 3766.0, # OK + 5200.0, 6600.0, # two more missed + ] + + # Act + report = se.evaluate_missed_emits(samples) + + # Assert + assert report.longest_run == 2 + assert len(report.windows) == 2 + assert all(w.length == 2 for w in report.windows) + assert report.passes + + +def test_missed_emits_trailing_run_closes_correctly() -> None: + # Arrange — last 3 intervals all missed (run runs to end of list) + samples = [0.0, 333.0, 666.0, 2000.0, 3334.0, 4668.0] + + # Act + report = se.evaluate_missed_emits(samples) + + # Assert + assert report.longest_run == 3 + assert len(report.windows) == 1 + assert report.windows[0].length == 3 + assert report.windows[0].end_ms == 4668.0 + assert not report.passes + + +def test_missed_emits_threshold_at_target_ratio() -> None: + # Arrange — custom missed_ratio = 1.5 + samples = [0.0, 1.5 * se.TARGET_INTER_FRAME_MS + 1.0] + + # Act + report = se.evaluate_missed_emits(samples, missed_ratio=1.5) + + # Assert + assert report.missed_emit_threshold_ms == pytest.approx( + 1.5 * se.TARGET_INTER_FRAME_MS + ) 
+ assert report.longest_run == 1 + + +def test_missed_emits_invalid_ratio_raises() -> None: + # Assert + with pytest.raises(ValueError): + se.evaluate_missed_emits([0.0, 1000.0], missed_ratio=1.0) + with pytest.raises(ValueError): + se.evaluate_missed_emits([0.0, 1000.0], missed_ratio=0.5) + + +def test_missed_emits_invalid_limit_raises() -> None: + # Assert + with pytest.raises(ValueError): + se.evaluate_missed_emits([0.0, 1000.0], limit=0) + + +# ─────────────────── evaluate (aggregate) ─────────────────── + + +def test_evaluate_clean_run_passes_both_acs() -> None: + # Arrange + samples = [i * 333.0 for i in range(30)] + + # Act + report = se.evaluate(samples) + + # Assert + assert report.passes + assert report.inter_emit.passes_p95 + assert report.missed_emits.passes + + +def test_evaluate_p95_breach_with_no_missed_run_still_fails() -> None: + # Arrange — many slightly-over-budget intervals with no consecutive triple + samples = [0.0] + for _ in range(10): + samples.append(samples[-1] + 400.0) # 400 ms — over 350 ms budget + + # Act + report = se.evaluate(samples) + + # Assert + assert not report.inter_emit.passes_p95 + assert not report.passes + + +# ─────────────────── csv emit ─────────────────── + + +def test_write_csv_evidence_emits_header_and_row(tmp_path: Path) -> None: + # Arrange + samples = [i * 333.0 for i in range(10)] + report = se.evaluate(samples) + out_path = tmp_path / "nft-perf-02.csv" + + # Act + se.write_csv_evidence(out_path, report) + + # Assert + text = out_path.read_text().splitlines() + assert len(text) == 2 + header = text[0].split(",") + assert header[0] == "sample_count" + assert "ac1_passes" in header + assert "ac2_passes" in header + + +def test_write_intervals_csv_one_row_per_interval(tmp_path: Path) -> None: + # Arrange — 5 timestamps → 4 inter-emit intervals + 1 header + 1 leading sample + samples = [0.0, 100.0, 200.0, 300.0, 400.0] + out_path = tmp_path / "intervals.csv" + + # Act + se.write_intervals_csv(out_path, samples) + + 
# Assert + text = out_path.read_text().splitlines() + assert text[0] == "index,t_emit_ms,inter_emit_ms" + assert len(text) == 1 + 5 # header + 5 sample rows + + +def test_write_intervals_csv_first_row_has_empty_interval(tmp_path: Path) -> None: + # Arrange + out_path = tmp_path / "intervals.csv" + + # Act + se.write_intervals_csv(out_path, [0.0, 100.0]) + + # Assert + rows = out_path.read_text().splitlines() + assert rows[1].endswith(",") # empty interval column on first sample row + assert rows[2].endswith(",100.000") diff --git a/e2e/_unit_tests/helpers/test_ttff_evaluator.py b/e2e/_unit_tests/helpers/test_ttff_evaluator.py new file mode 100644 index 0000000..3b6e265 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_ttff_evaluator.py @@ -0,0 +1,207 @@ +"""Unit tests for ``runner.helpers.ttff_evaluator`` (AZ-430 / NFT-PERF-03).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import ttff_evaluator as te + + +def _iter(iter_id: str, ttff_s: float | None) -> te.ColdStartIteration: + """One iteration sample with the implied first_emission_ms timestamp.""" + if ttff_s is None: + return te.measure_iteration( + iter_id, first_frame_arrival_ms=0, first_emission_ms=None + ) + return te.measure_iteration( + iter_id, + first_frame_arrival_ms=0, + first_emission_ms=int(ttff_s * 1000), + ) + + +# ───────────────────────── measure_iteration ───────────────────────── + + +def test_measure_iteration_happy_path() -> None: + # Act + s = te.measure_iteration( + "it1", first_frame_arrival_ms=1_000, first_emission_ms=24_000 + ) + + # Assert + assert s.ttff_s == pytest.approx(23.0) + assert s.emitted + + +def test_measure_iteration_missing_emission_returns_none() -> None: + # Act + s = te.measure_iteration( + "it1", first_frame_arrival_ms=1_000, first_emission_ms=None + ) + + # Assert + assert s.ttff_s is None + assert not s.emitted + + +def test_measure_iteration_negative_ttff_raises() -> None: + # Assert + with 
pytest.raises(ValueError): + te.measure_iteration( + "it1", first_frame_arrival_ms=10_000, first_emission_ms=9_000 + ) + + +def test_measure_iteration_zero_ttff_allowed() -> None: + # Act + s = te.measure_iteration( + "it1", first_frame_arrival_ms=10_000, first_emission_ms=10_000 + ) + + # Assert + assert s.ttff_s == 0.0 + + +# ───────────────────────── evaluate ───────────────────────── + + +def test_evaluate_clean_run_passes_all_acs() -> None: + # Arrange — 10 iterations at 15..24 s + iterations = [_iter(f"it{i}", 15.0 + i) for i in range(10)] + + # Act + report = te.evaluate(iterations) + + # Assert + assert report.iteration_count == 10 + assert report.passes_iteration_count + assert report.missed_starts == 0 + assert report.passes_p95 + assert report.passes_max + assert report.passes + + +def test_evaluate_below_min_iterations_fails_ac1() -> None: + # Arrange + iterations = [_iter(f"it{i}", 15.0) for i in range(9)] + + # Act + report = te.evaluate(iterations) + + # Assert + assert not report.passes_iteration_count + assert not report.passes + + +def test_evaluate_p95_at_budget_passes() -> None: + # Arrange — all 10 exactly at 30 s + iterations = [_iter(f"it{i}", 30.0) for i in range(10)] + + # Act + report = te.evaluate(iterations) + + # Assert + assert report.p95_s == pytest.approx(30.0) + assert report.passes_p95 + + +def test_evaluate_p95_above_budget_fails() -> None: + # Arrange — last 2 spike to 35 s; p95 will land in tail + iterations = [_iter(f"it{i}", 15.0) for i in range(8)] + [ + _iter("it8", 35.0), + _iter("it9", 35.0), + ] + + # Act + report = te.evaluate(iterations) + + # Assert + assert report.p95_s is not None and report.p95_s > 30.0 + assert not report.passes_p95 + assert not report.passes + + +def test_evaluate_max_exceeds_budget_fails_even_when_p95_passes() -> None: + # Arrange — N=20 dilutes the outlier's pull on linear-interp p95 + iterations = [_iter(f"it{i}", 15.0) for i in range(19)] + [_iter("it19", 46.0)] + + # Act + report = 
te.evaluate(iterations) + + # Assert + assert report.passes_p95 # outlier doesn't shift p95 with 20 samples + assert not report.passes_max + assert not report.passes + + +def test_evaluate_one_missed_start_fails() -> None: + # Arrange + iterations = [_iter(f"it{i}", 15.0) for i in range(9)] + [_iter("it9", None)] + + # Act + report = te.evaluate(iterations) + + # Assert + assert report.missed_starts == 1 + assert not report.passes_p95 + assert not report.passes_max + assert not report.passes + + +def test_evaluate_empty_input_fails_iteration_count() -> None: + # Act + report = te.evaluate([]) + + # Assert + assert report.iteration_count == 0 + assert not report.passes_iteration_count + assert not report.passes + + +def test_evaluate_custom_budgets_apply() -> None: + # Arrange + iterations = [_iter(f"it{i}", 40.0) for i in range(10)] + + # Act + report = te.evaluate(iterations, p95_budget_s=45.0, max_budget_s=60.0) + + # Assert + assert report.passes + + +# ───────────────────────── csv emit ───────────────────────── + + +def test_write_csv_evidence_emits_summary(tmp_path: Path) -> None: + # Arrange + iterations = [_iter(f"it{i}", 15.0 + i) for i in range(10)] + report = te.evaluate(iterations) + out_path = tmp_path / "nft-perf-03.csv" + + # Act + te.write_csv_evidence(out_path, report) + + # Assert + rows = out_path.read_text().splitlines() + assert len(rows) == 2 + assert rows[0].startswith("iteration_count") + assert "ac3_p95_passes" in rows[0] + assert "ac4_max_passes" in rows[0] + + +def test_write_per_iteration_csv_one_row_per_iter(tmp_path: Path) -> None: + # Arrange + iterations = [_iter(f"it{i}", 15.0 + i) for i in range(3)] + report = te.evaluate(iterations, min_iteration_count=3) + out_path = tmp_path / "per-iter.csv" + + # Act + te.write_per_iteration_csv(out_path, report) + + # Assert + rows = out_path.read_text().splitlines() + assert rows[0] == "iteration_id,first_frame_arrival_ms,first_emission_ms,ttff_s" + assert len(rows) == 4 diff --git 
a/e2e/_unit_tests/test_directory_layout.py b/e2e/_unit_tests/test_directory_layout.py index bd61a04..6109aa3 100644 --- a/e2e/_unit_tests/test_directory_layout.py +++ b/e2e/_unit_tests/test_directory_layout.py @@ -63,6 +63,10 @@ E2E_ROOT = Path(__file__).resolve().parents[1] "runner/helpers/blackout_spoof_evaluator.py", "runner/helpers/fc_proxy_runtime.py", "runner/helpers/replay_mode.py", + "runner/helpers/streaming_evaluator.py", + "runner/helpers/spoof_promotion_evaluator.py", + "runner/helpers/ttff_evaluator.py", + "runner/helpers/e2e_latency_evaluator.py", "fixtures/sitl_replay_builder/__init__.py", "fixtures/sitl_replay_builder/builder.py", "fixtures/sitl_replay_builder/build_p01_fixtures.py", @@ -125,6 +129,10 @@ E2E_ROOT = Path(__file__).resolve().parents[1] "tests/negative/test_ft_n_04_blackout_spoof.py", "tests/negative/test_ft_n_05_stale_tile_rejection.py", "tests/negative/test_ft_n_06_mid_flight_freshness.py", + "tests/performance/test_nft_perf_01_e2e_latency.py", + "tests/performance/test_nft_perf_02_streaming.py", + "tests/performance/test_nft_perf_03_ttff.py", + "tests/performance/test_nft_perf_04_spoof_promotion.py", ], ) def test_required_path_exists(relative_path: str) -> None: diff --git a/e2e/runner/helpers/e2e_latency_evaluator.py b/e2e/runner/helpers/e2e_latency_evaluator.py new file mode 100644 index 0000000..0bd3a10 --- /dev/null +++ b/e2e/runner/helpers/e2e_latency_evaluator.py @@ -0,0 +1,251 @@ +"""End-to-end latency evaluator for NFT-PERF-01 (AZ-428 / AC-4.1). + +D-CROSS-LATENCY-1 fixes a hard p95 budget of 400 ms across two +configurations: + +* (a) K=3 baseline at +25 °C ambient. +* (b) K=2 + Jacobian-cov hybrid auto-degrade at +50 °C ambient. + +This module owns the pure-logic side: distribution stats, frame-drop +accounting (AC-4), and informational per-stage partition recording +(AC-5). It does NOT import anything from ``src/gps_denied_onboard``. 
+""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Sequence + +from .streaming_evaluator import _percentile + +LATENCY_P95_BUDGET_MS = 400.0 +FRAME_DROP_RATIO_BUDGET = 0.10 +DEFAULT_EXPECTED_FRAMES = 900 # 3 Hz × 300 s + + +@dataclass(frozen=True) +class FrameLatencySample: + """One frame: ``(t_capture_ms, t_emit_at_sitl_ms)`` → latency_ms.""" + + frame_id: str + t_capture_ms: int + t_emit_at_sitl_ms: int + + @property + def latency_ms(self) -> float: + return float(self.t_emit_at_sitl_ms - self.t_capture_ms) + + +@dataclass(frozen=True) +class StagePartition: + """Per-stage informational latency record (AC-5 — no hard threshold).""" + + stage_name: str + p50_ms: float | None + p95_ms: float | None + p99_ms: float | None + sample_count: int + + +@dataclass(frozen=True) +class LatencyReport: + """Aggregate verdict for ONE configuration.""" + + config_id: str # "k3-25c" / "k2-hybrid-50c" + samples: tuple[FrameLatencySample, ...] + expected_frame_count: int + p50_ms: float | None + p95_ms: float | None + p99_ms: float | None + max_ms: float | None + frame_drop_ratio: float + stage_partitions: tuple[StagePartition, ...] + p95_budget_ms: float + frame_drop_budget: float + chamber_unavailable: bool + + @property + def sample_count(self) -> int: + return len(self.samples) + + @property + def passes_p95(self) -> bool: + return self.p95_ms is not None and self.p95_ms <= self.p95_budget_ms + + @property + def passes_frame_drop(self) -> bool: + return self.frame_drop_ratio <= self.frame_drop_budget + + @property + def passes(self) -> bool: + return self.passes_p95 and self.passes_frame_drop + + +def measure_frame( + frame_id: str, *, t_capture_ms: int, t_emit_at_sitl_ms: int +) -> FrameLatencySample: + """Project a captured frame into a typed sample. + + Negative latency is fixture-shape error → fail-loud. 
+ """ + if t_emit_at_sitl_ms < t_capture_ms: + raise ValueError( + f"latency frame {frame_id}: t_emit_at_sitl_ms " + f"({t_emit_at_sitl_ms}) precedes t_capture_ms " + f"({t_capture_ms}); fixture shape invalid" + ) + return FrameLatencySample( + frame_id=frame_id, + t_capture_ms=int(t_capture_ms), + t_emit_at_sitl_ms=int(t_emit_at_sitl_ms), + ) + + +def evaluate( + config_id: str, + samples: Sequence[FrameLatencySample], + stage_samples: dict[str, Sequence[float]] | None = None, + *, + expected_frame_count: int = DEFAULT_EXPECTED_FRAMES, + p95_budget_ms: float = LATENCY_P95_BUDGET_MS, + frame_drop_budget: float = FRAME_DROP_RATIO_BUDGET, + chamber_unavailable: bool = False, +) -> LatencyReport: + """Aggregate ``samples`` (and optional stage partitions) into a verdict. + + ``stage_samples`` keys = stage names from D-CROSS-LATENCY-1; values + = lists of per-frame stage-latency_ms readings. The per-stage p95 is + recorded only — AC-5 is informational. + """ + latencies = [s.latency_ms for s in samples] + if expected_frame_count <= 0: + raise ValueError( + f"expected_frame_count must be >0, got {expected_frame_count}" + ) + received = min(len(samples), expected_frame_count) + drop_ratio = (expected_frame_count - received) / expected_frame_count + partitions = _partition_stage_samples(stage_samples or {}) + return LatencyReport( + config_id=config_id, + samples=tuple(samples), + expected_frame_count=expected_frame_count, + p50_ms=_percentile(latencies, 50.0), + p95_ms=_percentile(latencies, 95.0), + p99_ms=_percentile(latencies, 99.0), + max_ms=max(latencies) if latencies else None, + frame_drop_ratio=drop_ratio, + stage_partitions=tuple(partitions), + p95_budget_ms=p95_budget_ms, + frame_drop_budget=frame_drop_budget, + chamber_unavailable=chamber_unavailable, + ) + + +def _partition_stage_samples( + stage_samples: dict[str, Sequence[float]], +) -> list[StagePartition]: + partitions: list[StagePartition] = [] + for stage_name in sorted(stage_samples.keys()): + values = 
list(stage_samples[stage_name]) + partitions.append( + StagePartition( + stage_name=stage_name, + p50_ms=_percentile(values, 50.0), + p95_ms=_percentile(values, 95.0), + p99_ms=_percentile(values, 99.0), + sample_count=len(values), + ) + ) + return partitions + + +def write_csv_evidence(out_path: Path, reports: Sequence[LatencyReport]) -> Path: + """One-row-per-config summary.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "config_id", + "sample_count", + "expected_frame_count", + "frame_drop_ratio", + "p50_ms", + "p95_ms", + "p99_ms", + "max_ms", + "p95_budget_ms", + "frame_drop_budget", + "chamber_unavailable", + "ac2_or_ac3_p95_passes", + "ac4_frame_drop_passes", + "passes", + ] + ) + for r in reports: + writer.writerow( + [ + r.config_id, + r.sample_count, + r.expected_frame_count, + f"{r.frame_drop_ratio:.4f}", + "" if r.p50_ms is None else f"{r.p50_ms:.3f}", + "" if r.p95_ms is None else f"{r.p95_ms:.3f}", + "" if r.p99_ms is None else f"{r.p99_ms:.3f}", + "" if r.max_ms is None else f"{r.max_ms:.3f}", + f"{r.p95_budget_ms:.3f}", + f"{r.frame_drop_budget:.4f}", + "true" if r.chamber_unavailable else "false", + "true" if r.passes_p95 else "false", + "true" if r.passes_frame_drop else "false", + "true" if r.passes else "false", + ] + ) + return out_path + + +def write_per_frame_csv(out_path: Path, reports: Sequence[LatencyReport]) -> Path: + """One row per frame per config — detail for outlier investigation.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + ["config_id", "frame_id", "t_capture_ms", "t_emit_at_sitl_ms", "latency_ms"] + ) + for r in reports: + for s in r.samples: + writer.writerow( + [ + r.config_id, + s.frame_id, + s.t_capture_ms, + s.t_emit_at_sitl_ms, + f"{s.latency_ms:.3f}", + ] + ) + return out_path + + +def write_partition_csv(out_path: 
Path, reports: Sequence[LatencyReport]) -> Path: + """Per-stage partition table — AC-5 informational evidence.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + ["config_id", "stage_name", "sample_count", "p50_ms", "p95_ms", "p99_ms"] + ) + for r in reports: + for p in r.stage_partitions: + writer.writerow( + [ + r.config_id, + p.stage_name, + p.sample_count, + "" if p.p50_ms is None else f"{p.p50_ms:.3f}", + "" if p.p95_ms is None else f"{p.p95_ms:.3f}", + "" if p.p99_ms is None else f"{p.p99_ms:.3f}", + ] + ) + return out_path diff --git a/e2e/runner/helpers/spoof_promotion_evaluator.py b/e2e/runner/helpers/spoof_promotion_evaluator.py new file mode 100644 index 0000000..3fea88b --- /dev/null +++ b/e2e/runner/helpers/spoof_promotion_evaluator.py @@ -0,0 +1,222 @@ +"""Spoofing-promotion latency evaluator for NFT-PERF-04 (AZ-431 / AC-NEW-2). + +Per AC-NEW-2 the time from a blackout+spoof event to the SUT correctly +labeling its emission ``dead_reckoned`` must satisfy +``p95(latency) ≤ SPOOF_PROMOTION_BUDGET_MS`` (=600 ms). + +The scenario test gathers N≥``MIN_EVENT_COUNT`` events at randomized +window starts (the random sampling is owned by the fixture builder — +AZ-431 is statistical, FT-N-04 / AZ-426 is functional), measures the +per-event ``t_label_switch_to_dead_reckoned − t_blackout_onset``, and +runs the aggregate p95 check via ``evaluate``. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. Reuses +``runner.helpers.streaming_evaluator._percentile`` for the linear- +interpolation p95 — both NFT-PERF tests measure latencies as the same +shape of distribution. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Sequence + +from .streaming_evaluator import _percentile + +# AC-NEW-2 budget — 600 ms on Tier-1 or Tier-2. 
+SPOOF_PROMOTION_BUDGET_MS = 600.0 +# Statistical confidence floor — AZ-431 spec sets N=20 as default. +MIN_EVENT_COUNT = 20 +DEAD_RECKONED_LABEL = "dead_reckoned" + + +@dataclass(frozen=True) +class OutboundLabelSample: + """One SUT outbound emission projected for AC-NEW-2.""" + + monotonic_ms: int + source_label: str + + +@dataclass(frozen=True) +class SpoofEvent: + """One blackout+spoof event and the labels observed afterwards. + + ``samples`` should cover at least the window starting at + ``blackout_onset_ms`` and extending past the expected first + ``dead_reckoned`` emission. The evaluator scans them in order. + """ + + event_id: str + blackout_onset_ms: int + samples: Sequence[OutboundLabelSample] + + +@dataclass(frozen=True) +class EventLatencyReport: + """Per-event latency outcome. + + ``latency_ms`` is ``None`` when no ``dead_reckoned`` emission was + observed after ``blackout_onset_ms`` — that's a categorical miss + (treated as a budget breach for the aggregate verdict). + """ + + event_id: str + blackout_onset_ms: int + first_dead_reckoned_ms: int | None + latency_ms: int | None + + @property + def has_promotion(self) -> bool: + return self.first_dead_reckoned_ms is not None + + +@dataclass(frozen=True) +class SpoofPromotionReport: + """Aggregate NFT-PERF-04 result over N events.""" + + events: tuple[EventLatencyReport, ...] 
+ p50_ms: float | None + p95_ms: float | None + p99_ms: float | None + max_ms: float | None + missing_promotions: int + min_event_count: int + budget_ms: float + + @property + def event_count(self) -> int: + return len(self.events) + + @property + def passes_event_count(self) -> bool: + return self.event_count >= self.min_event_count + + @property + def passes_p95(self) -> bool: + return ( + self.missing_promotions == 0 + and self.p95_ms is not None + and self.p95_ms <= self.budget_ms + ) + + @property + def passes(self) -> bool: + return self.passes_event_count and self.passes_p95 + + +def measure_event_latency(event: SpoofEvent) -> EventLatencyReport: + """Compute promotion latency for one event. + + Walks ``event.samples`` in ascending ``monotonic_ms``, finds the first + sample with ``source_label == "dead_reckoned"`` AND + ``monotonic_ms >= blackout_onset_ms``, and returns + ``first_dead_reckoned_ms − blackout_onset_ms``. Returns ``None`` + for both ``first_dead_reckoned_ms`` and ``latency_ms`` if no such + sample exists. 
+ """ + ordered = sorted(event.samples, key=lambda s: s.monotonic_ms) + for s in ordered: + if s.monotonic_ms < event.blackout_onset_ms: + continue + if s.source_label == DEAD_RECKONED_LABEL: + return EventLatencyReport( + event_id=event.event_id, + blackout_onset_ms=event.blackout_onset_ms, + first_dead_reckoned_ms=int(s.monotonic_ms), + latency_ms=int(s.monotonic_ms - event.blackout_onset_ms), + ) + return EventLatencyReport( + event_id=event.event_id, + blackout_onset_ms=event.blackout_onset_ms, + first_dead_reckoned_ms=None, + latency_ms=None, + ) + + +def evaluate( + events: Sequence[SpoofEvent], + *, + budget_ms: float = SPOOF_PROMOTION_BUDGET_MS, + min_event_count: int = MIN_EVENT_COUNT, +) -> SpoofPromotionReport: + """AC-1 (N events sampled) + AC-2 (p95 latency ≤ budget).""" + per_event = tuple(measure_event_latency(e) for e in events) + valid = [r.latency_ms for r in per_event if r.latency_ms is not None] + missing = sum(1 for r in per_event if not r.has_promotion) + return SpoofPromotionReport( + events=per_event, + p50_ms=_percentile(valid, 50.0), + p95_ms=_percentile(valid, 95.0), + p99_ms=_percentile(valid, 99.0), + max_ms=max(valid) if valid else None, + missing_promotions=missing, + min_event_count=min_event_count, + budget_ms=budget_ms, + ) + + +def write_csv_evidence(out_path: Path, report: SpoofPromotionReport) -> Path: + """Aggregate-summary CSV (one row per run).""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "event_count", + "min_event_count", + "missing_promotions", + "p50_ms", + "p95_ms", + "p99_ms", + "max_ms", + "budget_ms", + "ac1_passes", + "ac2_passes", + "passes", + ] + ) + writer.writerow( + [ + report.event_count, + report.min_event_count, + report.missing_promotions, + "" if report.p50_ms is None else f"{report.p50_ms:.3f}", + "" if report.p95_ms is None else f"{report.p95_ms:.3f}", + "" if report.p99_ms is None else 
f"{report.p99_ms:.3f}", + "" if report.max_ms is None else f"{report.max_ms:.3f}", + f"{report.budget_ms:.3f}", + "true" if report.passes_event_count else "false", + "true" if report.passes_p95 else "false", + "true" if report.passes else "false", + ] + ) + return out_path + + +def write_per_event_csv(out_path: Path, report: SpoofPromotionReport) -> Path: + """Detail CSV: one row per event with onset / first-dead-reckoned / latency.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "event_id", + "blackout_onset_ms", + "first_dead_reckoned_ms", + "latency_ms", + ] + ) + for r in report.events: + writer.writerow( + [ + r.event_id, + r.blackout_onset_ms, + "" if r.first_dead_reckoned_ms is None else r.first_dead_reckoned_ms, + "" if r.latency_ms is None else r.latency_ms, + ] + ) + return out_path diff --git a/e2e/runner/helpers/streaming_evaluator.py b/e2e/runner/helpers/streaming_evaluator.py new file mode 100644 index 0000000..ee27801 --- /dev/null +++ b/e2e/runner/helpers/streaming_evaluator.py @@ -0,0 +1,314 @@ +"""Inter-emit interval evaluator for NFT-PERF-02 (AZ-429 / AC-4.4). + +The SUT promises that estimates are streamed frame-by-frame, NOT batched. +The contract is observable at the SITL boundary: the receipt timestamps of +consecutive accepted ``GPS_INPUT`` (ArduPilot) / ``MSP2_SENSOR_GPS`` +(iNav) messages should track the configured target cadence with little +jitter and never miss ≥3 consecutive emits. + +This module owns the pure-logic side. The scenario test +(``e2e/tests/performance/test_nft_perf_02_streaming.py``) is a thin +adapter that reads timestamps from ``sitl_observer`` and asks the +helpers below for the per-AC verdict. + +ACs evaluated (per AZ-429): + +* AC-1: ``p95(inter_emit_interval) ≤ STREAMING_P95_BUDGET_MS`` (=350 ms + at the 3 Hz target = inter-frame × 1.05). 
+* AC-2: no window contains ≥``MISSED_EMIT_WINDOW_LIMIT`` (=3) consecutive + missed emits, where a "missed emit" is an interval > + ``MISSED_EMIT_RATIO`` (=2.0) × target inter-frame. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol; reads only float lists of SITL-side +ms timestamps that the scenario adapter projects out of the boundary +observers. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from math import floor +from pathlib import Path +from typing import Iterable, Sequence + +# AC-1 — inter-frame × 1.05 at 3 Hz target (333.333 ms × 1.05 = 350 ms). +TARGET_FRAME_RATE_HZ = 3.0 +TARGET_INTER_FRAME_MS = 1000.0 / TARGET_FRAME_RATE_HZ # 333.333... ms +STREAMING_P95_BUDGET_MS = 350.0 +# AC-2 — a "missed emit" interval is > 2× target = >666 ms at 3 Hz. +MISSED_EMIT_RATIO = 2.0 +MISSED_EMIT_WINDOW_LIMIT = 3 + + +@dataclass(frozen=True) +class InterEmitReport: + """Aggregate AC-1 result for one run.""" + + sample_count: int + interval_count: int # = sample_count - 1 + p50_ms: float | None + p95_ms: float | None + p99_ms: float | None + max_ms: float | None + target_inter_frame_ms: float + budget_ms: float + + @property + def passes_p95(self) -> bool: + return self.p95_ms is not None and self.p95_ms <= self.budget_ms + + +@dataclass(frozen=True) +class MissedEmitWindow: + """One run of consecutive missed-emit intervals starting at a sample index.""" + + start_index: int # index into the SORTED timestamp list (0-based) + length: int + start_ms: float + end_ms: float + + +@dataclass(frozen=True) +class MissedEmitReport: + """AC-2 result: list of consecutive-missed-emit windows + verdict.""" + + missed_emit_threshold_ms: float + longest_run: int + windows: tuple[MissedEmitWindow, ...] 
+ limit: int + + @property + def passes(self) -> bool: + return self.longest_run < self.limit + + +@dataclass(frozen=True) +class StreamingReport: + """Aggregate NFT-PERF-02 result for one parameterized run.""" + + inter_emit: InterEmitReport + missed_emits: MissedEmitReport + + @property + def passes(self) -> bool: + return self.inter_emit.passes_p95 and self.missed_emits.passes + + +def _sorted_intervals_ms(emit_times_ms: Sequence[float]) -> list[float]: + """Return positive inter-emit intervals from a sorted timestamp list. + + Sorting is defensive — sitl_observer emits in monotonic order but the + helper must not silently produce negative intervals if a caller hands + in an unsorted list. + """ + if len(emit_times_ms) < 2: + return [] + ordered = sorted(float(t) for t in emit_times_ms) + return [ordered[i] - ordered[i - 1] for i in range(1, len(ordered))] + + +def _percentile(values: Sequence[float], q: float) -> float | None: + """Linear-interpolation percentile (``numpy.percentile``-equivalent). + + Returns ``None`` when ``values`` is empty so callers can distinguish + a no-data run from a zero-latency run. Accepts any real ``q`` in + [0, 100]; outside that range is a programmer error. + """ + if not 0.0 <= q <= 100.0: + raise ValueError(f"percentile q must be in [0, 100], got {q!r}") + if not values: + return None + ordered = sorted(values) + if len(ordered) == 1: + return ordered[0] + rank = (q / 100.0) * (len(ordered) - 1) + lo = floor(rank) + hi = min(lo + 1, len(ordered) - 1) + frac = rank - lo + return ordered[lo] + (ordered[hi] - ordered[lo]) * frac + + +def evaluate_inter_emit( + emit_times_ms: Sequence[float], + *, + target_inter_frame_ms: float = TARGET_INTER_FRAME_MS, + budget_ms: float = STREAMING_P95_BUDGET_MS, +) -> InterEmitReport: + """AC-1: p95 inter-emit interval ≤ ``budget_ms``. + + Caller passes the SITL-side receipt timestamps (ms, any epoch — only + deltas matter). 
``target_inter_frame_ms`` is recorded for the + evidence file but does not gate the verdict; ``budget_ms`` does. + """ + intervals = _sorted_intervals_ms(emit_times_ms) + return InterEmitReport( + sample_count=len(emit_times_ms), + interval_count=len(intervals), + p50_ms=_percentile(intervals, 50.0), + p95_ms=_percentile(intervals, 95.0), + p99_ms=_percentile(intervals, 99.0), + max_ms=max(intervals) if intervals else None, + target_inter_frame_ms=target_inter_frame_ms, + budget_ms=budget_ms, + ) + + +def evaluate_missed_emits( + emit_times_ms: Sequence[float], + *, + target_inter_frame_ms: float = TARGET_INTER_FRAME_MS, + missed_ratio: float = MISSED_EMIT_RATIO, + limit: int = MISSED_EMIT_WINDOW_LIMIT, +) -> MissedEmitReport: + """AC-2: longest run of consecutive missed-emit intervals < ``limit``. + + A "missed emit" is an inter-emit interval that exceeds + ``missed_ratio × target_inter_frame_ms``. We collect every maximal + run of consecutive missed-emit intervals and the longest length. + """ + if missed_ratio <= 1.0: + raise ValueError( + f"missed_ratio must be > 1.0 (was {missed_ratio!r}) — equal or " + "below the target stride would flag every interval as missed" + ) + if limit < 1: + raise ValueError(f"limit must be >= 1 (was {limit!r})") + threshold = missed_ratio * target_inter_frame_ms + ordered = sorted(float(t) for t in emit_times_ms) + windows: list[MissedEmitWindow] = [] + # `run_start` is the sample index of the FIRST sample of an + # in-progress missed-interval run. Number of missed intervals in + # the open run after processing iteration `i` is `i - run_start`. 
+ run_start: int | None = None + run_start_ms: float | None = None + longest = 0 + for i in range(1, len(ordered)): + delta = ordered[i] - ordered[i - 1] + if delta > threshold: + if run_start is None: + run_start = i - 1 + run_start_ms = ordered[i - 1] + longest = max(longest, i - run_start) + elif run_start is not None and run_start_ms is not None: + length = (i - 1) - run_start + windows.append( + MissedEmitWindow( + start_index=run_start, + length=length, + start_ms=run_start_ms, + end_ms=ordered[i - 1], + ) + ) + run_start = None + run_start_ms = None + if run_start is not None and run_start_ms is not None: + length = (len(ordered) - 1) - run_start + windows.append( + MissedEmitWindow( + start_index=run_start, + length=length, + start_ms=run_start_ms, + end_ms=ordered[-1], + ) + ) + longest = max(longest, length) + return MissedEmitReport( + missed_emit_threshold_ms=threshold, + longest_run=longest, + windows=tuple(windows), + limit=limit, + ) + + +def evaluate( + emit_times_ms: Sequence[float], + *, + target_inter_frame_ms: float = TARGET_INTER_FRAME_MS, + budget_ms: float = STREAMING_P95_BUDGET_MS, + missed_ratio: float = MISSED_EMIT_RATIO, + limit: int = MISSED_EMIT_WINDOW_LIMIT, +) -> StreamingReport: + """Run AC-1 + AC-2 over one boundary-observed emit-time list.""" + return StreamingReport( + inter_emit=evaluate_inter_emit( + emit_times_ms, + target_inter_frame_ms=target_inter_frame_ms, + budget_ms=budget_ms, + ), + missed_emits=evaluate_missed_emits( + emit_times_ms, + target_inter_frame_ms=target_inter_frame_ms, + missed_ratio=missed_ratio, + limit=limit, + ), + ) + + +def write_csv_evidence(out_path: Path, report: StreamingReport) -> Path: + """One-row evidence file naming the AC-1/AC-2 verdict + percentiles.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + r = report + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "sample_count", + "interval_count", + "p50_ms", + "p95_ms", + "p99_ms", + 
"max_ms", + "target_inter_frame_ms", + "p95_budget_ms", + "ac1_passes", + "missed_emit_threshold_ms", + "longest_missed_run", + "ac2_passes", + "passes", + ] + ) + ie = r.inter_emit + me = r.missed_emits + writer.writerow( + [ + ie.sample_count, + ie.interval_count, + "" if ie.p50_ms is None else f"{ie.p50_ms:.3f}", + "" if ie.p95_ms is None else f"{ie.p95_ms:.3f}", + "" if ie.p99_ms is None else f"{ie.p99_ms:.3f}", + "" if ie.max_ms is None else f"{ie.max_ms:.3f}", + f"{ie.target_inter_frame_ms:.3f}", + f"{ie.budget_ms:.3f}", + "true" if ie.passes_p95 else "false", + f"{me.missed_emit_threshold_ms:.3f}", + me.longest_run, + "true" if me.passes else "false", + "true" if r.passes else "false", + ] + ) + return out_path + + +def write_intervals_csv(out_path: Path, emit_times_ms: Iterable[float]) -> Path: + """Per-interval CSV for evidence (one row per consecutive pair). + + The aggregate ``write_csv_evidence`` row is the AC verdict; this + detail CSV is what a reviewer reads when the budget is breached. + """ + out_path.parent.mkdir(parents=True, exist_ok=True) + ordered = sorted(float(t) for t in emit_times_ms) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow(["index", "t_emit_ms", "inter_emit_ms"]) + for i, t in enumerate(ordered): + interval = (t - ordered[i - 1]) if i > 0 else "" + writer.writerow( + [ + i, + f"{t:.3f}", + "" if interval == "" else f"{interval:.3f}", + ] + ) + return out_path diff --git a/e2e/runner/helpers/ttff_evaluator.py b/e2e/runner/helpers/ttff_evaluator.py new file mode 100644 index 0000000..2e382de --- /dev/null +++ b/e2e/runner/helpers/ttff_evaluator.py @@ -0,0 +1,217 @@ +"""Cold-start TTFF evaluator for NFT-PERF-03 (AZ-430 / AC-NEW-1). + +The SUT promises a Time-To-First-Fix budget of 30 s p95 (and a relaxed +max ceiling of 45 s for tail-latency outlier detection) when started +from cold on Tier-2 (Jetson Orin Nano Super) hardware. 
AZ-430 collects +N≥``MIN_ITERATION_COUNT`` cold-start TTFF samples; this module owns the +pure-logic side: distribution stats + budget gates + evidence CSV. + +Per AZ-430: + +* AC-3: ``p95(TTFF) ≤ TTFF_P95_BUDGET_S`` (=30 s). +* AC-4: ``max(TTFF) ≤ TTFF_MAX_BUDGET_S`` (=45 s). + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. Re-uses +``streaming_evaluator._percentile`` for the linear-interpolation p95. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Sequence + +from .streaming_evaluator import _percentile + +TTFF_P95_BUDGET_S = 30.0 +TTFF_MAX_BUDGET_S = 45.0 +MIN_ITERATION_COUNT = 10 + + +@dataclass(frozen=True) +class ColdStartIteration: + """One cold-start iteration outcome. + + ``ttff_s`` is the measured ``t_first_emission − t_first_frame_arrival`` + in seconds. ``None`` means the iteration timed out before producing + its first emission — categorical miss (treated as budget breach for + the aggregate verdict). + """ + + iteration_id: str + first_frame_arrival_ms: int + first_emission_ms: int | None + ttff_s: float | None + + @property + def emitted(self) -> bool: + return self.first_emission_ms is not None + + +@dataclass(frozen=True) +class TtffReport: + """Aggregate NFT-PERF-03 result over N iterations.""" + + iterations: tuple[ColdStartIteration, ...] 
+ p50_s: float | None + p95_s: float | None + p99_s: float | None + max_s: float | None + missed_starts: int # iterations where ``ttff_s is None`` + min_iteration_count: int + p95_budget_s: float + max_budget_s: float + + @property + def iteration_count(self) -> int: + return len(self.iterations) + + @property + def passes_iteration_count(self) -> bool: + return self.iteration_count >= self.min_iteration_count + + @property + def passes_p95(self) -> bool: + return ( + self.missed_starts == 0 + and self.p95_s is not None + and self.p95_s <= self.p95_budget_s + ) + + @property + def passes_max(self) -> bool: + return ( + self.missed_starts == 0 + and self.max_s is not None + and self.max_s <= self.max_budget_s + ) + + @property + def passes(self) -> bool: + return self.passes_iteration_count and self.passes_p95 and self.passes_max + + +def measure_iteration( + iteration_id: str, + *, + first_frame_arrival_ms: int, + first_emission_ms: int | None, +) -> ColdStartIteration: + """Project a captured iteration into a typed sample. + + Negative TTFF (emission before first frame) is a fixture-shape error + and raises ``ValueError`` so the breach surfaces immediately instead + of producing a non-sensible report. 
+ """ + if first_emission_ms is None: + return ColdStartIteration( + iteration_id=iteration_id, + first_frame_arrival_ms=int(first_frame_arrival_ms), + first_emission_ms=None, + ttff_s=None, + ) + delta_ms = int(first_emission_ms) - int(first_frame_arrival_ms) + if delta_ms < 0: + raise ValueError( + f"ttff iteration {iteration_id}: first_emission_ms " + f"({first_emission_ms}) precedes first_frame_arrival_ms " + f"({first_frame_arrival_ms}); fixture shape invalid" + ) + return ColdStartIteration( + iteration_id=iteration_id, + first_frame_arrival_ms=int(first_frame_arrival_ms), + first_emission_ms=int(first_emission_ms), + ttff_s=delta_ms / 1000.0, + ) + + +def evaluate( + iterations: Sequence[ColdStartIteration], + *, + p95_budget_s: float = TTFF_P95_BUDGET_S, + max_budget_s: float = TTFF_MAX_BUDGET_S, + min_iteration_count: int = MIN_ITERATION_COUNT, +) -> TtffReport: + """Aggregate iterations into AC-3 + AC-4 verdicts.""" + valid = [it.ttff_s for it in iterations if it.ttff_s is not None] + missed = sum(1 for it in iterations if not it.emitted) + return TtffReport( + iterations=tuple(iterations), + p50_s=_percentile(valid, 50.0), + p95_s=_percentile(valid, 95.0), + p99_s=_percentile(valid, 99.0), + max_s=max(valid) if valid else None, + missed_starts=missed, + min_iteration_count=min_iteration_count, + p95_budget_s=p95_budget_s, + max_budget_s=max_budget_s, + ) + + +def write_csv_evidence(out_path: Path, report: TtffReport) -> Path: + """Aggregate-summary CSV (one row per run).""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "iteration_count", + "min_iteration_count", + "missed_starts", + "p50_s", + "p95_s", + "p99_s", + "max_s", + "p95_budget_s", + "max_budget_s", + "ac1_iteration_count_passes", + "ac3_p95_passes", + "ac4_max_passes", + "passes", + ] + ) + writer.writerow( + [ + report.iteration_count, + report.min_iteration_count, + report.missed_starts, + 
"" if report.p50_s is None else f"{report.p50_s:.3f}", + "" if report.p95_s is None else f"{report.p95_s:.3f}", + "" if report.p99_s is None else f"{report.p99_s:.3f}", + "" if report.max_s is None else f"{report.max_s:.3f}", + f"{report.p95_budget_s:.3f}", + f"{report.max_budget_s:.3f}", + "true" if report.passes_iteration_count else "false", + "true" if report.passes_p95 else "false", + "true" if report.passes_max else "false", + "true" if report.passes else "false", + ] + ) + return out_path + + +def write_per_iteration_csv(out_path: Path, report: TtffReport) -> Path: + """One row per iteration — detail used during AC-4 outlier investigation.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "iteration_id", + "first_frame_arrival_ms", + "first_emission_ms", + "ttff_s", + ] + ) + for it in report.iterations: + writer.writerow( + [ + it.iteration_id, + it.first_frame_arrival_ms, + "" if it.first_emission_ms is None else it.first_emission_ms, + "" if it.ttff_s is None else f"{it.ttff_s:.3f}", + ] + ) + return out_path diff --git a/e2e/tests/performance/test_nft_perf_01_e2e_latency.py b/e2e/tests/performance/test_nft_perf_01_e2e_latency.py new file mode 100644 index 0000000..c4bca22 --- /dev/null +++ b/e2e/tests/performance/test_nft_perf_01_e2e_latency.py @@ -0,0 +1,226 @@ +"""NFT-PERF-01 — End-to-end latency p95 (AZ-428 / AC-4.1 / D-CROSS-LATENCY-1). + +Tier-2 ONLY. Two configurations measured per +``(fc_adapter, vio_strategy)`` parameterization: + +* (a) ``k3-25c``: K=3 baseline at +25 °C ambient. +* (b) ``k2-hybrid-50c``: K=2 + Jacobian-cov hybrid auto-degrade at +50 °C. + +Each config exercises the same hard gate: ``p95(t_emit_at_sitl − +t_capture) ≤ 400 ms`` (AC-2 / AC-3) AND ``frame_drop_ratio ≤ 10 %`` +(AC-4). Per-stage partition (AC-5) is recorded for trend but is NOT +pass/fail. 
+ +Pure-logic AC-2/3/4 covered by +``e2e/_unit_tests/helpers/test_e2e_latency_evaluator.py``. + +Production dependency surfaced to AZ-595 / AZ-444 (Tier-2 runner): +``E2E_NFT_PERF_01_LATENCY_FIXTURE`` names a JSON file (absolute path or +relative to ``E2E_SITL_REPLAY_DIR``) shaped: + + { + "expected_frame_count": 900, + "configs": [ + { + "config_id": "k3-25c", + "chamber_unavailable": false, + "frames": [ + {"frame_id": "f0001", "t_capture_ms": 0, "t_emit_at_sitl_ms": 220}, + ... + ], + "stage_samples": { + "c1_okvis2": [150.0, 152.0, ...], + "c2_ultravpr": [50.0, ...], + ... + } + }, + ... + ] + } + +``chamber_unavailable`` defaults to false. For the ``k2-hybrid-50c`` +config it should be true when run on the workstation without a +chamber — surfaces as a flag in the evidence row. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import e2e_latency_evaluator as ee + +LATENCY_FIXTURE_ENV_VAR = "E2E_NFT_PERF_01_LATENCY_FIXTURE" +DEFAULT_FIXTURE_NAME = "nft_perf_01_latency.json" +REQUIRED_CONFIG_IDS = ("k3-25c", "k2-hybrid-50c") + + +@pytest.mark.tier2_only +@pytest.mark.scenario_id("nft-perf-01") +@pytest.mark.traces_to("AC-4.1,AC-1,AC-2,AC-3,AC-4,AC-5,AC-6") +def test_nft_perf_01_e2e_latency( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AC-2 + AC-3 + AC-4 across both configs; AC-5 partition recorded only.""" + if not sitl_replay_ready: + pytest.skip( + "NFT-PERF-01 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) with N≥900 captured " + "frames per config across both K=3@25°C and K=2@50°C. " + "Pure-logic AC-2/3/4 covered by " + "e2e/_unit_tests/helpers/test_e2e_latency_evaluator.py." 
+ ) + + fixture_path = _resolve_latency_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-PERF-01: latency fixture not found at {fixture_path}. " + f"`{LATENCY_FIXTURE_ENV_VAR}` env var must point at a JSON file " + "carrying per-config frame samples (see scenario docstring). " + "Production dependency: AZ-595 + AZ-444." + ) + + expected_frames, configs = _load_latency_fixture(fixture_path) + config_ids = tuple(c["config_id"] for c in configs) + missing = [cid for cid in REQUIRED_CONFIG_IDS if cid not in config_ids] + if missing: + pytest.fail( + f"NFT-PERF-01: latency fixture {fixture_path} is missing required " + f"config_id(s) {missing}; both {REQUIRED_CONFIG_IDS} are required " + "for AC-4.1 + D-CROSS-LATENCY-1 coverage." + ) + + reports: list[ee.LatencyReport] = [] + for cfg in configs: + samples = [ + ee.measure_frame( + str(f.get("frame_id") or f"f{idx:04d}"), + t_capture_ms=int(f["t_capture_ms"]), + t_emit_at_sitl_ms=int(f["t_emit_at_sitl_ms"]), + ) + for idx, f in enumerate(cfg["frames"]) + ] + stage_samples = { + str(k): [float(v) for v in vs] + for k, vs in (cfg.get("stage_samples") or {}).items() + } + reports.append( + ee.evaluate( + config_id=cfg["config_id"], + samples=samples, + stage_samples=stage_samples, + expected_frame_count=expected_frames, + chamber_unavailable=bool(cfg.get("chamber_unavailable", False)), + ) + ) + + base = Path(evidence_dir) / "nft-perf-01" / f"{fc_adapter}-{vio_strategy}" + ee.write_csv_evidence(base.with_suffix(".csv"), reports) + ee.write_per_frame_csv( + base.with_name(base.name + "-per-frame").with_suffix(".csv"), reports + ) + ee.write_partition_csv( + base.with_name(base.name + "-partition").with_suffix(".csv"), reports + ) + + for r in reports: + nfr_recorder.record_metric( + f"nft_perf_01.{r.config_id}.frame_drop_ratio", + float(r.frame_drop_ratio), + ac_id="AC-4", + ) + if r.p50_ms is not None: + nfr_recorder.record_metric( + f"nft_perf_01.{r.config_id}.latency_ms_p50", float(r.p50_ms) + ) + if 
r.p95_ms is not None: + ac_id = "AC-3" if r.config_id == "k2-hybrid-50c" else "AC-2" + nfr_recorder.record_metric( + f"nft_perf_01.{r.config_id}.latency_ms_p95", + float(r.p95_ms), + ac_id=ac_id, + ) + if r.p99_ms is not None: + nfr_recorder.record_metric( + f"nft_perf_01.{r.config_id}.latency_ms_p99", float(r.p99_ms) + ) + + breaches = [] + for r in reports: + ac_id = "AC-3" if r.config_id == "k2-hybrid-50c" else "AC-2" + if not r.passes_p95: + breaches.append( + f"{ac_id} ({r.config_id}): p95 = {r.p95_ms} ms " + f"> budget {r.p95_budget_ms} ms" + ) + if not r.passes_frame_drop: + breaches.append( + f"AC-4 ({r.config_id}): frame_drop_ratio " + f"= {r.frame_drop_ratio:.4f} > budget " + f"{r.frame_drop_budget:.4f}" + ) + assert not breaches, "\n".join(breaches) + + +def _resolve_latency_fixture_path() -> Path: + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + raw = os.environ.get(LATENCY_FIXTURE_ENV_VAR, "").strip() + if not raw: + if root is None: + return Path(f"<{LATENCY_FIXTURE_ENV_VAR}-unset>") + return root / DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _load_latency_fixture(fixture_path: Path) -> tuple[int, list[dict]]: + payload = json.loads(fixture_path.read_text()) + if not isinstance(payload, dict): + pytest.fail( + f"NFT-PERF-01: latency fixture {fixture_path} must be a JSON " + f"object; got top-level type={type(payload).__name__}" + ) + expected_raw = payload.get("expected_frame_count", ee.DEFAULT_EXPECTED_FRAMES) + try: + expected = int(expected_raw) + except (TypeError, ValueError) as exc: + pytest.fail( + f"NFT-PERF-01: expected_frame_count in {fixture_path} must be " + f"an int: {exc}" + ) + configs = payload.get("configs") + if not isinstance(configs, list) or not configs: + pytest.fail( + f"NFT-PERF-01: latency fixture {fixture_path} must contain a " + f'non-empty "configs" list.' 
+ ) + for idx, cfg in enumerate(configs): + if not isinstance(cfg, dict): + pytest.fail( + f"NFT-PERF-01: configs[{idx}] in {fixture_path} must be an " + f"object; got {type(cfg).__name__}" + ) + if "config_id" not in cfg: + pytest.fail( + f"NFT-PERF-01: configs[{idx}] in {fixture_path} missing " + f"required key `config_id`." + ) + frames = cfg.get("frames") + if not isinstance(frames, list): + pytest.fail( + f"NFT-PERF-01: configs[{idx}].frames in {fixture_path} " + f"must be a list of frame records." + ) + return expected, configs diff --git a/e2e/tests/performance/test_nft_perf_02_streaming.py b/e2e/tests/performance/test_nft_perf_02_streaming.py new file mode 100644 index 0000000..d54df92 --- /dev/null +++ b/e2e/tests/performance/test_nft_perf_02_streaming.py @@ -0,0 +1,160 @@ +"""NFT-PERF-02 — frame-by-frame streaming, no batching (AZ-429 / AC-4.4). + +Replays the 5-minute Derkachi flight at the 3 Hz target cadence; reads +SITL-side receipt timestamps for accepted GPS_INPUT (ArduPilot +mavproxy tlog) / MSP2_SENSOR_GPS (iNav SITL MSP capture) messages; +asserts: + +* AC-1: ``p95(inter_emit_interval) ≤ 350 ms`` (inter-frame × 1.05). +* AC-2: no window contains ≥3 consecutive missed emits. + +Tier-1 OR Tier-2; both parametrizations run. The pure-logic AC-1/AC-2 +evaluators are covered by +``e2e/_unit_tests/helpers/test_streaming_evaluator.py``. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import streaming_evaluator as ste + +DERKACHI_DIR = ( + Path(__file__).resolve().parents[3] + / "_docs" + / "00_problem" + / "input_data" + / "flight_derkachi" +) +DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4" + +# 5 min Derkachi replay at 3 Hz target. The window length feeds into the +# iNav MSP collector; the ArduPilot path reads the tlog regardless of +# `window_s` (the tlog encodes its own duration). 
+REPLAY_WINDOW_S = 300.0 +INAV_MSP_PORT = 5760 +ARDUPILOT_GPS_MSG_KIND = "GPS_INPUT" + + +@pytest.mark.scenario_id("nft-perf-02") +@pytest.mark.traces_to("AC-4.4,AC-1,AC-2,AC-3") +def test_nft_perf_02_streaming_inter_emit( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """NFT-PERF-02 AC-1 + AC-2 across `(fc_adapter, vio_strategy)`.""" + if not sitl_replay_ready: + pytest.skip( + "NFT-PERF-02 requires `E2E_SITL_REPLAY_DIR` to point at a prepared " + "SITL replay fixture (AZ-595) carrying the 5 min Derkachi @ 3 Hz " + "replay. AC-1/AC-2 pure-logic covered by " + "e2e/_unit_tests/helpers/test_streaming_evaluator.py." + ) + + from runner.helpers import mavproxy_tlog_reader, msp_frame_observer, sitl_observer + from runner.helpers.frame_source_replay import FrameSourceReplayer + from runner.helpers.replay_mode import NullFrameSink + + # 1. Drive the 5 min replay (3 Hz target inside the fixture). + FrameSourceReplayer(NullFrameSink()).replay_video(DERKACHI_MP4) + + # 2. Read SITL-side receipt timestamps for the FC-specific accepted GPS frame. + host = f"{fc_adapter}-sitl" + emit_times_ms = _read_emit_times_ms( + fc_adapter, + host, + sitl_observer=sitl_observer, + mavproxy_tlog_reader=mavproxy_tlog_reader, + ) + if not emit_times_ms: + pytest.fail( + f"NFT-PERF-02: SITL ({host}) reported zero accepted GPS frames " + "during the 5 min Derkachi replay. The replay fixture exists but " + "the SUT emitted nothing — fail-loud rather than skip." + ) + + # 3. Evaluate AC-1 + AC-2. + report = ste.evaluate(emit_times_ms) + + # 4. Emit per-interval + summary CSV evidence. + base = Path(evidence_dir) / "nft-perf-02" / f"{fc_adapter}-{vio_strategy}" + ste.write_csv_evidence(base.with_suffix(".csv"), report) + ste.write_intervals_csv( + base.with_name(base.name + "-intervals").with_suffix(".csv"), + emit_times_ms, + ) + + # 5. NFR metrics. 
+ if report.inter_emit.p50_ms is not None: + nfr_recorder.record_metric( + "nft_perf_02.inter_emit_ms_p50", report.inter_emit.p50_ms + ) + if report.inter_emit.p95_ms is not None: + nfr_recorder.record_metric( + "nft_perf_02.inter_emit_ms_p95", + report.inter_emit.p95_ms, + ac_id="AC-1", + ) + if report.inter_emit.max_ms is not None: + nfr_recorder.record_metric( + "nft_perf_02.inter_emit_ms_max", report.inter_emit.max_ms + ) + nfr_recorder.record_metric( + "nft_perf_02.longest_missed_run", + float(report.missed_emits.longest_run), + ac_id="AC-2", + ) + + # 6. AC assertions. + assert report.inter_emit.passes_p95, ( + f"AC-1: p95(inter_emit) > {ste.STREAMING_P95_BUDGET_MS} ms " + f"(got {report.inter_emit.p95_ms} ms over " + f"{report.inter_emit.interval_count} intervals; " + f"max={report.inter_emit.max_ms} ms)" + ) + assert report.missed_emits.passes, ( + f"AC-2: longest missed-emit run = {report.missed_emits.longest_run} " + f">= limit {report.missed_emits.limit}; " + f"first window @ " + f"{report.missed_emits.windows[0].start_ms if report.missed_emits.windows else 'n/a'} ms" + ) + + +def _read_emit_times_ms( + fc_adapter: str, + host: str, + *, + sitl_observer, # type: ignore[no-untyped-def] + mavproxy_tlog_reader, # type: ignore[no-untyped-def] +) -> list[float]: + """Project SITL-side accepted-GPS receipt timestamps into a ms list. + + * ArduPilot: filter mavproxy tlog for ``GPS_INPUT`` and project + ``timestamp_us / 1000``. + * iNav: ``collect_inav_msp_frames`` then filter for + ``MSP2_SENSOR_GPS`` (function id ``0x1F03``) and project + ``monotonic_ms`` directly. 
+ """ + if fc_adapter == "ardupilot": + tlog_path = sitl_observer.capture_ap_tlog(host=host, duration_s=REPLAY_WINDOW_S) + return [ + float(msg.timestamp_us) / 1000.0 + for msg in mavproxy_tlog_reader.iter_messages(tlog_path) + if msg.msg_type == ARDUPILOT_GPS_MSG_KIND + ] + if fc_adapter == "inav": + capture = sitl_observer.collect_inav_msp_frames( + host=host, port=INAV_MSP_PORT, window_s=REPLAY_WINDOW_S + ) + return [ + float(f.monotonic_ms) + for f in capture.frames + if f.function_id == msp_frame_observer.MSP2_SENSOR_GPS_FUNCTION_ID + ] + raise ValueError(f"unknown fc_adapter {fc_adapter!r}") diff --git a/e2e/tests/performance/test_nft_perf_03_ttff.py b/e2e/tests/performance/test_nft_perf_03_ttff.py new file mode 100644 index 0000000..fe95c4d --- /dev/null +++ b/e2e/tests/performance/test_nft_perf_03_ttff.py @@ -0,0 +1,189 @@ +"""NFT-PERF-03 — Cold-start Time-To-First-Fix (AZ-430 / AC-NEW-1). + +Tier-2 ONLY. N≥10 cold-start iterations; each measures +``t_first_emission − t_first_frame_arrival``; asserts: + +* AC-3: ``p95(TTFF) ≤ 30 s``. +* AC-4: ``max(TTFF) ≤ 45 s``. + +Per-iteration cleanup (fdr-output volume wipe + SITL cold-boot reload ++ SUT lifecycle restart) is owned by the Tier-2 Jetson harness +(AZ-444). The runner-side scenario here only consumes a fixture that +encodes the N captured ``(first_frame_arrival_ms, first_emission_ms)`` +pairs. + +Production dependency surfaced to AZ-595 / AZ-444: the +``E2E_NFT_PERF_03_TTFF_FIXTURE`` env var names a JSON file (absolute +path or relative to ``E2E_SITL_REPLAY_DIR``) with shape: + + { + "iterations": [ + { + "iteration_id": "iter-01", + "first_frame_arrival_ms": 1234, + "first_emission_ms": 16789 + }, + ... + ] + } + +``first_emission_ms`` may be ``null`` for a timed-out iteration — +counted as ``missed_starts`` and treated as a budget breach. 
+""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import ttff_evaluator as te + +TTFF_FIXTURE_ENV_VAR = "E2E_NFT_PERF_03_TTFF_FIXTURE" +TTFF_DEFAULT_FIXTURE_NAME = "nft_perf_03_ttff.json" + + +@pytest.mark.tier2_only +@pytest.mark.scenario_id("nft-perf-03") +@pytest.mark.traces_to("AC-NEW-1,AC-1,AC-2,AC-3,AC-4,AC-5") +def test_nft_perf_03_cold_start_ttff( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AC-3 + AC-4 + iteration-count gate across ``(fc_adapter, vio_strategy)``.""" + if not sitl_replay_ready: + pytest.skip( + "NFT-PERF-03 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) containing N≥10 cold-start " + "iterations. Pure-logic AC-3/AC-4 covered by " + "e2e/_unit_tests/helpers/test_ttff_evaluator.py." + ) + + fixture_path = _resolve_ttff_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-PERF-03: TTFF fixture not found at {fixture_path}. " + f"`{TTFF_FIXTURE_ENV_VAR}` env var must point at a JSON file " + "carrying N≥10 cold-start iteration records (see scenario " + "docstring). Production dependency: AZ-595 + AZ-444." + ) + + iterations = _load_iterations(fixture_path) + if not iterations: + pytest.fail( + f"NFT-PERF-03: TTFF fixture {fixture_path} contains zero " + "iterations. AZ-430 requires N≥10." 
+ ) + + report = te.evaluate(iterations) + + base = Path(evidence_dir) / "nft-perf-03" / f"{fc_adapter}-{vio_strategy}" + te.write_csv_evidence(base.with_suffix(".csv"), report) + te.write_per_iteration_csv( + base.with_name(base.name + "-per-iter").with_suffix(".csv"), + report, + ) + + nfr_recorder.record_metric( + "nft_perf_03.iteration_count", float(report.iteration_count), ac_id="AC-1" + ) + nfr_recorder.record_metric( + "nft_perf_03.missed_starts", float(report.missed_starts) + ) + if report.p50_s is not None: + nfr_recorder.record_metric("nft_perf_03.ttff_s_p50", float(report.p50_s)) + if report.p95_s is not None: + nfr_recorder.record_metric( + "nft_perf_03.ttff_s_p95", float(report.p95_s), ac_id="AC-3" + ) + if report.max_s is not None: + nfr_recorder.record_metric( + "nft_perf_03.ttff_s_max", float(report.max_s), ac_id="AC-4" + ) + + assert report.passes_iteration_count, ( + f"AC-1 (iteration count): collected only {report.iteration_count} " + f"iterations; require N ≥ {report.min_iteration_count}" + ) + assert report.passes_p95, ( + f"AC-3: p95(TTFF) = {report.p95_s} s > budget " + f"{report.p95_budget_s} s " + f"(missed_starts={report.missed_starts})" + ) + assert report.passes_max, ( + f"AC-4: max(TTFF) = {report.max_s} s > budget " + f"{report.max_budget_s} s " + f"(missed_starts={report.missed_starts})" + ) + + +def _resolve_ttff_fixture_path() -> Path: + raw = os.environ.get(TTFF_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{TTFF_FIXTURE_ENV_VAR}-unset>") + return root / TTFF_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _load_iterations(fixture_path: Path) -> list[te.ColdStartIteration]: + payload = json.loads(fixture_path.read_text()) + raw = payload.get("iterations") if isinstance(payload, dict) else None + if not isinstance(raw, list): + 
pytest.fail( + f"NFT-PERF-03: TTFF fixture {fixture_path} must be a JSON object " + f'with key "iterations" → list; got top-level ' + f"type={type(payload).__name__}" + ) + parsed: list[te.ColdStartIteration] = [] + for idx, entry in enumerate(raw): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-PERF-03: iterations[{idx}] in {fixture_path} must be " + f"an object; got {type(entry).__name__}" + ) + iter_id = str(entry.get("iteration_id") or f"iter-{idx:02d}") + try: + arrival = int(entry["first_frame_arrival_ms"]) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-PERF-03: iterations[{idx}].first_frame_arrival_ms " + f"in {fixture_path} must be an int ms timestamp: {exc}" + ) + first_emission_raw = entry.get("first_emission_ms") + first_emission: int | None + if first_emission_raw is None: + first_emission = None + else: + try: + first_emission = int(first_emission_raw) + except (TypeError, ValueError) as exc: + pytest.fail( + f"NFT-PERF-03: iterations[{idx}].first_emission_ms " + f"in {fixture_path} must be int or null: {exc}" + ) + try: + parsed.append( + te.measure_iteration( + iter_id, + first_frame_arrival_ms=arrival, + first_emission_ms=first_emission, + ) + ) + except ValueError as exc: + pytest.fail( + f"NFT-PERF-03: iterations[{idx}] in {fixture_path} rejected: {exc}" + ) + return parsed diff --git a/e2e/tests/performance/test_nft_perf_04_spoof_promotion.py b/e2e/tests/performance/test_nft_perf_04_spoof_promotion.py new file mode 100644 index 0000000..670207a --- /dev/null +++ b/e2e/tests/performance/test_nft_perf_04_spoof_promotion.py @@ -0,0 +1,193 @@ +"""NFT-PERF-04 — Spoofing-promotion latency (AZ-431 / AC-NEW-2). + +Replays N≥20 blackout+spoof events at randomized window starts; per +event measures ``t_label_switch_to_dead_reckoned − t_blackout_onset``; +asserts ``p95(latency) ≤ 600 ms``. + +Tier-1 OR Tier-2. 
The pure-logic AC-1/AC-2 evaluators are covered by +``e2e/_unit_tests/helpers/test_spoof_promotion_evaluator.py``. + +Production dependency surfaced to AZ-595 (fixture builder): the +``E2E_NFT_PERF_04_EVENTS_FIXTURE`` env var names a JSON file under +``E2E_SITL_REPLAY_DIR`` carrying the N≥20 sampled events. Each entry +encodes the injector-emitted ``blackout_onset_ms`` AND the per-event +sequence of outbound ``(monotonic_ms, source_label)`` samples observed +from SITL. Shape (validated at parse time): + + { + "events": [ + { + "event_id": "evt-01", + "blackout_onset_ms": 45123, + "samples": [ + {"monotonic_ms": 45050, "source_label": "satellite_anchored"}, + {"monotonic_ms": 45380, "source_label": "dead_reckoned"}, + ... + ] + }, + ... + ] + } + +When the env var is unset, the default ``nft_perf_04_events.json`` under the +replay dir is used; a missing file fails the scenario with the resolved path. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import spoof_promotion_evaluator as spe + +EVENTS_FIXTURE_ENV_VAR = "E2E_NFT_PERF_04_EVENTS_FIXTURE" + + +@pytest.mark.scenario_id("nft-perf-04") +@pytest.mark.traces_to("AC-NEW-2,AC-1,AC-2,AC-3") +def test_nft_perf_04_spoof_promotion_latency( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AC-1 (N≥20 events sampled) + AC-2 (p95 ≤ 600 ms).""" + if not sitl_replay_ready: + pytest.skip( + "NFT-PERF-04 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) containing N≥20 " + "randomized-start blackout+spoof events. Pure-logic AC-1/AC-2 " + "covered by e2e/_unit_tests/helpers/test_spoof_promotion_evaluator.py." + ) + + fixture_path = _resolve_events_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-PERF-04: events fixture not found at {fixture_path}. 
" + f"`{EVENTS_FIXTURE_ENV_VAR}` env var must point at a JSON file " + "(absolute path, or relative to `E2E_SITL_REPLAY_DIR`) carrying " + "the N≥20 sampled blackout+spoof events (see scenario docstring " + "for shape). Production dependency: AZ-595 fixture builder." + ) + + events = _load_events(fixture_path) + if not events: + pytest.fail( + f"NFT-PERF-04: events fixture {fixture_path} contains zero events. " + "Fail-loud per the tests-as-gates discipline; AZ-431 requires N≥20." + ) + + report = spe.evaluate(events) + + base = Path(evidence_dir) / "nft-perf-04" / f"{fc_adapter}-{vio_strategy}" + spe.write_csv_evidence(base.with_suffix(".csv"), report) + spe.write_per_event_csv( + base.with_name(base.name + "-per-event").with_suffix(".csv"), + report, + ) + + nfr_recorder.record_metric( + "nft_perf_04.event_count", float(report.event_count), ac_id="AC-1" + ) + nfr_recorder.record_metric( + "nft_perf_04.missing_promotions", float(report.missing_promotions) + ) + if report.p50_ms is not None: + nfr_recorder.record_metric( + "nft_perf_04.latency_ms_p50", float(report.p50_ms) + ) + if report.p95_ms is not None: + nfr_recorder.record_metric( + "nft_perf_04.latency_ms_p95", float(report.p95_ms), ac_id="AC-2" + ) + if report.p99_ms is not None: + nfr_recorder.record_metric( + "nft_perf_04.latency_ms_p99", float(report.p99_ms) + ) + if report.max_ms is not None: + nfr_recorder.record_metric( + "nft_perf_04.latency_ms_max", float(report.max_ms) + ) + + assert report.passes_event_count, ( + f"AC-1: only {report.event_count} events sampled; " + f"AC-NEW-2 requires N ≥ {report.min_event_count}" + ) + assert report.passes_p95, ( + f"AC-2: p95(latency_ms) = {report.p95_ms} > budget " + f"{report.budget_ms} ms (missing_promotions={report.missing_promotions})" + ) + + +def _resolve_events_fixture_path() -> Path: + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + raw = os.environ.get(EVENTS_FIXTURE_ENV_VAR, "").strip() + if not raw: + if root is 
None: + return Path(f"<{EVENTS_FIXTURE_ENV_VAR}-unset>") + return root / "nft_perf_04_events.json" + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _load_events(fixture_path: Path) -> list[spe.SpoofEvent]: + """Parse the fixture into ``SpoofEvent`` list (fail-loud on malformed shape).""" + payload = json.loads(fixture_path.read_text()) + raw_events = payload.get("events") if isinstance(payload, dict) else None + if not isinstance(raw_events, list): + pytest.fail( + f"NFT-PERF-04: events fixture {fixture_path} must be a JSON object " + f'with key "events" → list; got top-level type={type(payload).__name__}' + ) + parsed: list[spe.SpoofEvent] = [] + for idx, entry in enumerate(raw_events): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-PERF-04: events[{idx}] in {fixture_path} must be an " + f"object; got {type(entry).__name__}" + ) + event_id = entry.get("event_id") or f"evt-{idx:02d}" + try: + onset = int(entry["blackout_onset_ms"]) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-PERF-04: events[{idx}].blackout_onset_ms in " + f"{fixture_path} must be an integer ms timestamp: {exc}" + ) + samples_raw = entry.get("samples") + if not isinstance(samples_raw, list): + pytest.fail( + f"NFT-PERF-04: events[{idx}].samples in {fixture_path} must " + f"be a list of {{monotonic_ms, source_label}} objects" + ) + samples: list[spe.OutboundLabelSample] = [] + for j, s in enumerate(samples_raw): + try: + samples.append( + spe.OutboundLabelSample( + monotonic_ms=int(s["monotonic_ms"]), + source_label=str(s["source_label"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-PERF-04: events[{idx}].samples[{j}] in " + f"{fixture_path} malformed: {exc}" + ) + parsed.append( + spe.SpoofEvent( + event_id=str(event_id), + blackout_onset_ms=onset, + samples=tuple(samples), + ) + ) + return parsed
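Reviewer note: the module docstring defines the per-event measurement as the time from `blackout_onset_ms` to the first `dead_reckoned` sample, aggregated as a p95. The sketch below illustrates that arithmetic on the fixture shape. It is not the `spoof_promotion_evaluator` implementation: the function names `promotion_latency_ms` and `percentile_nearest_rank` are hypothetical, the nearest-rank percentile method is an assumption (the shared `_percentile` helper may interpolate instead), and ignoring samples that precede the onset is also an assumption.

```python
from __future__ import annotations

import math

DEAD_RECKONED = "dead_reckoned"


def promotion_latency_ms(
    blackout_onset_ms: int, samples: list[tuple[int, str]]
) -> int | None:
    """Hypothetical helper: ms from onset to the first dead_reckoned sample.

    Returns None when no sample at or after the onset carries the label
    (assumed to correspond to a "missing promotion").
    """
    for monotonic_ms, source_label in sorted(samples):
        if monotonic_ms >= blackout_onset_ms and source_label == DEAD_RECKONED:
            return monotonic_ms - blackout_onset_ms
    return None


def percentile_nearest_rank(values: list[float], pct: float) -> float:
    """Hypothetical helper: nearest-rank percentile (assumed method)."""
    if not values:
        raise ValueError("percentile of an empty sample set is undefined")
    ordered = sorted(values)
    # Nearest-rank: smallest value with at least pct% of samples at or below it.
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


# The example event from the docstring: onset 45123, promotion seen at 45380.
example_latency = promotion_latency_ms(
    45123,
    [(45050, "satellite_anchored"), (45380, "dead_reckoned")],
)
```

With the docstring's example event, the promotion sample at 45380 ms follows the onset at 45123 ms, so that event would contribute a 257 ms latency to the p95 checked against the 600 ms budget.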