Files
gps-denied-onboard/e2e/tests/resilience/test_nft_res_03_monte_carlo.py
Oleksandr Bezdieniezhnykh 33e683dc0f [AZ-446] CSV reporter: band + ci95 annotations + report.csv emitter
Batch 89 — adds optional `band`, `ci95_low`, `ci95_high` kw-only
parameters to `_NfrRecorder.record_metric` and emits a new per-metric
report.csv artifact (one row per scenario × metric, columns:
scenario_id, metric_name, value, value_band, ci95_low, ci95_high,
ac_id, outcome). Backwards compatible — existing 4-arg callers
unchanged; unbalanced ci95 pair raises ValueError. report.csv is
written once per pytest session from `pytest_sessionfinish` so the
annotation pass runs once per CI invocation regardless of
(fc_adapter, vio_strategy) (AC-3). `regression-baseline.json`
intentionally kept flat to preserve the diff contract used by
regression-detection tooling.

NFT-RES-03 + NFT-PERF-01 scenarios updated to pass real bands and
compute empirical 2.5/97.5-percentile ci95 from their own sample
streams (per-iteration envelope ratios for Monte Carlo,
per-frame latency samples for N-sample latency).

Tests: 1229 e2e/_unit_tests pass (+6 vs. batch 88 for AZ-446
band/CI behavior, value-error on unbalanced ci95, report.csv columns,
explicit-path override, and end-to-end emission via the pytest
plugin). Code review: PASS_WITH_WARNINGS — 1 Low (empirical-CI
semantics, documented inline), 1 Medium carried over from batch 88's
cumulative-review backlog (write_csv_evidence + _resolve_fixture_path
duplication is outside AZ-446 reporting scope).

This commit closes Step 10 Implement Tests for cycle 1 (41 of 41
blackbox-test tasks done, AZ-406..AZ-446). Greenfield auto-chains to
Step 11 Run Tests next.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 18:14:00 +03:00

283 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""NFT-RES-03 — 100-iteration Monte Carlo statistical envelope (AZ-434 / AC-NEW-4).
Tier-1 OR Tier-2. The runner orchestrates 100 Derkachi replays with
seeded perturbations (gain noise, IMU bias, frame-drop, outlier
injection) and supplies this scenario with a captured fixture
containing per-iteration per-frame ``(error_m, cov_semi_major_m)``
pairs. The scenario validates:
* AC-1 — iteration_count ≥ 100.
* AC-2 — same master_seed yields bit-identical iteration outcomes
(verified by re-evaluating the same fixture twice and comparing
``determinism_fingerprint``).
* AC-3 — global aggregate envelope:
``count(error_m ≤ 1.96 × cov_semi_major_m) / total ≥ 0.95``.
* AC-4 — parameterization: SHOULD run only one canonical
parameterization per CI invocation by default; full-matrix mode
gated behind ``E2E_NFT_RES_03_FULL_MATRIX=1``. The scenario uses
``fc_adapter`` + ``vio_strategy`` fixtures so the harness param
matrix decides which combinations to run.
Production dependency surfaced to AZ-595: the
``E2E_NFT_RES_03_FIXTURE`` env var names a JSON file with shape:
{
"master_seed": <int>,
"iterations": [
{
"iteration_id": "iter-001",
"iteration_seed": <int>,
"samples": [{"error_m": <f>, "cov_semi_major_m": <f>}, ...]
},
...
]
}
The harness MAY emit the fixture with a single canonical parameterization
per CI invocation by default — ``E2E_NFT_RES_03_FULL_MATRIX=1``
unlocks the full 100 × N_params expansion.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from runner.helpers import monte_carlo_envelope_evaluator as mce
NFT_RES_03_FIXTURE_ENV_VAR = "E2E_NFT_RES_03_FIXTURE"
NFT_RES_03_DEFAULT_FIXTURE_NAME = "nft_res_03_monte_carlo.json"
NFT_RES_03_FULL_MATRIX_ENV_VAR = "E2E_NFT_RES_03_FULL_MATRIX"
NFT_RES_03_CANONICAL_FC = "ardupilot"
NFT_RES_03_CANONICAL_VIO = "okvis2"
@pytest.mark.scenario_id("nft-res-03")
@pytest.mark.traces_to("AC-NEW-4,AC-1,AC-2,AC-3,AC-4")
def test_nft_res_03_monte_carlo(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """AC-1 (iteration count) + AC-2 (determinism) + AC-3 (envelope) + AC-4 (param).

    Flow:

    1. Skip unless ``(fc_adapter, vio_strategy)`` is the canonical pair or
       the full-matrix env flag is enabled (AC-4).
    2. Skip when ``sitl_replay_ready`` is false.
    3. Load and parse the fixture, then evaluate it twice so the two
       determinism fingerprints can be compared (AC-2).
    4. Write the CSV evidence and record the metrics. This happens before
       the assertions, so the artifacts are produced whether or not an
       acceptance criterion fails.
    5. Assert AC-1 (iteration count), AC-2 (fingerprints equal) and
       AC-3 (envelope ratio within budget).

    Args:
        fc_adapter: Flight-controller adapter id from the harness matrix.
        vio_strategy: VIO strategy id from the harness matrix.
        evidence_dir: Base directory for evidence files; only joined with
            ``/`` here (presumably a ``pathlib.Path`` — confirm in conftest).
        run_id: Requested from the harness but not read in this body.
        nfr_recorder: Recorder exposing ``record_metric``.
        sitl_replay_ready: Whether a prepared SITL replay fixture exists.
    """
    is_canonical = (
        fc_adapter == NFT_RES_03_CANONICAL_FC
        and vio_strategy == NFT_RES_03_CANONICAL_VIO
    )
    if not _full_matrix_enabled() and not is_canonical:
        pytest.skip(
            f"NFT-RES-03 AC-4: by default runs only canonical "
            f"({NFT_RES_03_CANONICAL_FC}, {NFT_RES_03_CANONICAL_VIO}); "
            f"set {NFT_RES_03_FULL_MATRIX_ENV_VAR}=1 to enable the "
            f"100 × N_params full-matrix expansion."
        )
    if not sitl_replay_ready:
        pytest.skip(
            "NFT-RES-03 requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595) carrying N≥100 "
            "Monte Carlo iterations. Pure-logic AC-1 + AC-2 + AC-3 "
            "covered by "
            "e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py."
        )

    fixture_path = _resolve_fixture_path()
    if not fixture_path.is_file():
        pytest.fail(
            f"NFT-RES-03: fixture not found at {fixture_path}. "
            f"`{NFT_RES_03_FIXTURE_ENV_VAR}` env var must point at a JSON "
            "file with the schema documented in the scenario docstring. "
            "Production dependency: AZ-595."
        )

    # JSON is UTF-8 on the wire; decode explicitly rather than relying on
    # the platform's locale encoding. A malformed file is reported through
    # pytest.fail with the fixture path, matching how _parse_payload reports
    # schema problems.
    try:
        payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        pytest.fail(
            f"NFT-RES-03: fixture {fixture_path} is not valid UTF-8 JSON: {exc}"
        )
    master_seed, iterations = _parse_payload(payload, fixture_path)

    # Two independent evaluations of the same parsed input feed the AC-2
    # fingerprint comparison below.
    report1 = mce.evaluate(iterations, master_seed=master_seed)
    report2 = mce.evaluate(iterations, master_seed=master_seed)
    fingerprint = mce.determinism_fingerprint(report1)
    fingerprint2 = mce.determinism_fingerprint(report2)

    # Build file names by appending ".csv" instead of Path.with_suffix():
    # with_suffix() replaces everything after the last dot, so an adapter or
    # strategy id containing "." would be truncated and the per-iteration
    # file would land on the same path as the summary file.
    out_dir = evidence_dir / "nft-res-03"
    out_stem = f"{fc_adapter}-{vio_strategy}"
    mce.write_csv_evidence(out_dir / f"{out_stem}.csv", report1)
    mce.write_per_iteration_csv(out_dir / f"{out_stem}-per-iter.csv", report1)

    nfr_recorder.record_metric(
        "nft_res_03.iteration_count",
        float(report1.iteration_count),
        ac_id="AC-1",
        band=f"{report1.min_iteration_count} iterations",
    )
    nfr_recorder.record_metric(
        "nft_res_03.total_samples", float(report1.total_samples)
    )
    if report1.envelope_ratio is not None:
        # AZ-446 AC-2 — per-iteration envelope ratios provide the empirical
        # 95% interval (2.5th / 97.5th percentile across the iterations).
        per_iter_ratios = _per_iteration_envelope_ratios(report1)
        ci_low, ci_high = _percentile_pair(per_iter_ratios, 2.5, 97.5)
        nfr_recorder.record_metric(
            "nft_res_03.envelope_ratio",
            float(report1.envelope_ratio),
            ac_id="AC-3",
            band=f"{report1.envelope_ratio_budget:.2f}",
            ci95_low=ci_low,
            ci95_high=ci_high,
        )
    nfr_recorder.record_metric(
        "nft_res_03.master_seed", float(report1.master_seed)
    )

    assert report1.passes_iteration_count, (
        f"AC-1: iteration_count={report1.iteration_count} < required "
        f"{report1.min_iteration_count}"
    )
    assert fingerprint == fingerprint2, (
        f"AC-2: determinism fingerprint differs across two evaluations of the "
        f"same fixture: {fingerprint} vs {fingerprint2}"
    )
    assert report1.passes_envelope, (
        f"AC-3: envelope ratio = {report1.envelope_ratio} < budget "
        f"{report1.envelope_ratio_budget} "
        f"(covered={report1.covered_samples}/{report1.total_samples})"
    )
def _full_matrix_enabled() -> bool:
    """Return whether the full-matrix env flag is set to a truthy value.

    Reads the variable named by ``NFT_RES_03_FULL_MATRIX_ENV_VAR``.
    Surrounding whitespace and letter case are ignored, so ``1``, ``true``,
    ``TRUE``, ``Yes`` etc. all enable the full matrix. Any other value,
    including an unset variable, leaves it disabled.
    """
    flag = os.environ.get(NFT_RES_03_FULL_MATRIX_ENV_VAR, "")
    return flag.strip().lower() in {"1", "true", "yes"}
def _per_iteration_envelope_ratios(report: mce.MonteCarloReport) -> list[float]:
"""Per-iteration ``covered/frames`` ratios (AZ-446 CI95 input)."""
ratios: list[float] = []
for it in report.iterations:
if not it.samples:
continue
covered = sum(
1
for s in it.samples
if s.error_m <= mce.ENVELOPE_MULTIPLIER * s.cov_semi_major_m
)
ratios.append(covered / len(it.samples))
return ratios
def _percentile_pair(
values: list[float], q_low: float, q_high: float
) -> tuple[float | None, float | None]:
"""Linear-interpolation percentiles. Returns ``(None, None)`` if empty."""
if not values:
return None, None
ordered = sorted(values)
if len(ordered) == 1:
return float(ordered[0]), float(ordered[0])
def _at(q: float) -> float:
rank = (q / 100.0) * (len(ordered) - 1)
lo = int(rank)
hi = min(lo + 1, len(ordered) - 1)
frac = rank - lo
return float(ordered[lo] + (ordered[hi] - ordered[lo]) * frac)
return _at(q_low), _at(q_high)
def _resolve_fixture_path() -> Path:
    """Work out where the NFT-RES-03 fixture JSON should be read from.

    Resolution order:

    * Env var (``NFT_RES_03_FIXTURE_ENV_VAR``) set to an absolute path:
      that path is returned as-is.
    * Env var set to a relative path: it is joined onto
      ``sitl_observer.replay_dir()`` when that is not ``None``, otherwise
      returned unchanged.
    * Env var unset or blank: the default fixture name under the replay
      directory, or — when there is no replay directory either — a
      placeholder path of the form ``<ENV_VAR-unset>``.

    The function never checks that the file exists.
    """
    configured = os.environ.get(NFT_RES_03_FIXTURE_ENV_VAR, "").strip()

    # Imported at call time, as in the rest of this module's helpers.
    from runner.helpers import sitl_observer

    replay_root = sitl_observer.replay_dir()

    if configured:
        candidate = Path(configured)
        if candidate.is_absolute() or replay_root is None:
            return candidate
        return replay_root / candidate

    if replay_root is not None:
        return replay_root / NFT_RES_03_DEFAULT_FIXTURE_NAME
    return Path(f"<{NFT_RES_03_FIXTURE_ENV_VAR}-unset>")
def _parse_payload(
    payload: object, fixture_path: Path
) -> tuple[int, list[mce.IterationOutcome]]:
    """Validate the decoded fixture JSON and convert it to evaluator inputs.

    Every schema violation is reported through ``pytest.fail`` with a message
    naming the offending element and ``fixture_path``.

    Args:
        payload: The object produced by decoding the fixture JSON.
        fixture_path: Path of the fixture; used only in failure messages.

    Returns:
        ``(master_seed, iterations)`` where each iteration is an
        ``mce.IterationOutcome`` holding a tuple of ``mce.FrameSample``.
        An entry with a missing or falsy ``iteration_id`` is given the id
        ``iter-NNN`` built from its zero-based index.
    """
    if not isinstance(payload, dict):
        pytest.fail(
            f"NFT-RES-03: fixture {fixture_path} must be a JSON object; "
            f"got top-level type={type(payload).__name__}"
        )

    try:
        master_seed = int(payload["master_seed"])
    except (KeyError, TypeError, ValueError) as exc:
        pytest.fail(
            f"NFT-RES-03: fixture {fixture_path} missing/invalid "
            f"'master_seed': {exc}"
        )

    iteration_entries = payload.get("iterations")
    if not (isinstance(iteration_entries, list) and iteration_entries):
        pytest.fail(
            f"NFT-RES-03: fixture {fixture_path} 'iterations' must be a "
            f"non-empty list"
        )

    outcomes: list[mce.IterationOutcome] = []
    for iter_index, iteration in enumerate(iteration_entries):
        # Shared prefix for every message about this iteration entry.
        where = f"iterations[{iter_index}]"

        if not isinstance(iteration, dict):
            pytest.fail(
                f"NFT-RES-03: {where} in {fixture_path} must be "
                f"an object; got {type(iteration).__name__}"
            )

        iteration_id = str(
            iteration.get("iteration_id") or f"iter-{iter_index:03d}"
        )

        try:
            iteration_seed = int(iteration["iteration_seed"])
        except (KeyError, TypeError, ValueError) as exc:
            pytest.fail(
                f"NFT-RES-03: {where}.iteration_seed in "
                f"{fixture_path} must be int: {exc}"
            )

        sample_entries = iteration.get("samples")
        if not isinstance(sample_entries, list):
            pytest.fail(
                f"NFT-RES-03: {where}.samples in {fixture_path} "
                f"must be a list of objects"
            )

        frames: list[mce.FrameSample] = []
        for sample_index, raw_frame in enumerate(sample_entries):
            if not isinstance(raw_frame, dict):
                pytest.fail(
                    f"NFT-RES-03: {where}.samples[{sample_index}] in "
                    f"{fixture_path} must be an object"
                )
            # Key lookup, float conversion and FrameSample construction are
            # all guarded together, as any of them may reject the entry.
            try:
                frame = mce.FrameSample(
                    error_m=float(raw_frame["error_m"]),
                    cov_semi_major_m=float(raw_frame["cov_semi_major_m"]),
                )
            except (KeyError, TypeError, ValueError) as exc:
                pytest.fail(
                    f"NFT-RES-03: {where}.samples[{sample_index}] in "
                    f"{fixture_path} shape invalid: {exc}"
                )
            frames.append(frame)

        outcomes.append(
            mce.IterationOutcome(
                iteration_id=iteration_id,
                iteration_seed=iteration_seed,
                samples=tuple(frames),
            )
        )

    return master_seed, outcomes