Files
gps-denied-onboard/e2e/tests/resilience/test_nft_res_03_monte_carlo.py
T
Oleksandr Bezdieniezhnykh 330893be5c [AZ-432] [AZ-433] [AZ-434] [AZ-435] Add NFT-RES-01..04 resilience scenarios
Batch 86: 4 NFT-RES blackbox scenarios + 4 helper evaluators + 74 unit
tests + directory-layout registration.

* AZ-432 NFT-RES-01: 30 s IMU-only fallback drift bound (AC-3.5 + AC-NEW-7);
  two sub-cases (no_imu ≤100m, good_imu_combined_factor ≤50m).
* AZ-433 NFT-RES-02: companion mid-flight reboot (AC-5.2 + AC-5.3); resume
  ≤30s + first-emission accuracy ≤100m.
* AZ-434 NFT-RES-03: 100-iteration Monte Carlo envelope (AC-NEW-4);
  iteration-count + master-seed determinism + envelope ratio ≥0.95.
  Canonical-param by default; E2E_NFT_RES_03_FULL_MATRIX=1 unlocks matrix.
* AZ-435 NFT-RES-04: 35s blackout+spoof escalation ladder (AC-NEW-8);
  AC-1 (cov-2d→fix-degrade ≤500ms) + AC-2 (failsafe→999+STATUSTEXT
  ≤500ms) + AC-ORDER (strict ordering).

Verdict: PASS_WITH_WARNINGS (0 Critical, 0 High, 0 Medium, 5 Low).
F5 documents intentional threshold duplication with blackout_spoof
evaluator (prevents contract drift between FT-N-04 and NFT-RES-04).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:09:04 +03:00

236 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""NFT-RES-03 — 100-iteration Monte Carlo statistical envelope (AZ-434 / AC-NEW-4).
Tier-1 OR Tier-2. The runner orchestrates 100 Derkachi replays with
seeded perturbations (gain noise, IMU bias, frame-drop, outlier
injection) and supplies this scenario with a captured fixture
containing per-iteration per-frame ``(error_m, cov_semi_major_m)``
pairs. The scenario validates:
* AC-1 — iteration_count ≥ 100.
* AC-2 — same master_seed yields bit-identical iteration outcomes
(verified by re-evaluating the same fixture twice and comparing
``determinism_fingerprint``).
* AC-3 — global aggregate envelope:
``count(error_m ≤ 1.96 × cov_semi_major_m) / total ≥ 0.95``.
* AC-4 — parameterization: SHOULD run only one canonical
parameterization per CI invocation by default; full-matrix mode
gated behind ``E2E_NFT_RES_03_FULL_MATRIX=1``. The scenario uses
``fc_adapter`` + ``vio_strategy`` fixtures so the harness param
matrix decides which combinations to run.
Production dependency surfaced to AZ-595: the
``E2E_NFT_RES_03_FIXTURE`` env var names a JSON file with shape:
    {
      "master_seed": <int>,
      "iterations": [
        {
          "iteration_id": "iter-001",
          "iteration_seed": <int>,
          "samples": [{"error_m": <f>, "cov_semi_major_m": <f>}, ...]
        },
        ...
      ]
    }
The harness MAY emit the fixture with a single canonical parameterization
per CI invocation by default — ``E2E_NFT_RES_03_FULL_MATRIX=1``
unlocks the full 100 × N_params expansion.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from runner.helpers import monte_carlo_envelope_evaluator as mce
# Env var naming the captured Monte Carlo fixture (a JSON file); a relative
# value is resolved against the SITL replay directory when one is available.
NFT_RES_03_FIXTURE_ENV_VAR = "E2E_NFT_RES_03_FIXTURE"
# File name looked up inside the SITL replay directory when the env var above
# is unset or blank.
NFT_RES_03_DEFAULT_FIXTURE_NAME = "nft_res_03_monte_carlo.json"
# Opt-in switch: when truthy, the scenario runs for every (fc_adapter,
# vio_strategy) combination instead of only the canonical pair below.
NFT_RES_03_FULL_MATRIX_ENV_VAR = "E2E_NFT_RES_03_FULL_MATRIX"
# Canonical parameterization: the only combination that is not skipped when
# full-matrix mode is off.
NFT_RES_03_CANONICAL_FC = "ardupilot"
NFT_RES_03_CANONICAL_VIO = "okvis2"
@pytest.mark.scenario_id("nft-res-03")
@pytest.mark.traces_to("AC-NEW-4,AC-1,AC-2,AC-3,AC-4")
def test_nft_res_03_monte_carlo(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """AC-1 (iteration count) + AC-2 (determinism) + AC-3 (envelope) + AC-4 (param).

    Steps, in order:

    1. AC-4 gate: skip every non-canonical ``(fc_adapter, vio_strategy)``
       pair unless full-matrix mode is enabled via the env var.
    2. Skip when ``sitl_replay_ready`` is false.
    3. Resolve and load the fixture; a missing, unreadable or malformed
       fixture fails the test with an explanatory message.
    4. Evaluate the same iterations twice and fingerprint both reports.
    5. Write the aggregate and per-iteration CSV evidence and record the
       metrics *before* asserting, so they are produced even on failure.
    6. Assert AC-1, AC-2 and AC-3.
    """
    # AC-4: only the canonical parameterization runs by default.
    if not _full_matrix_enabled() and (
        fc_adapter != NFT_RES_03_CANONICAL_FC
        or vio_strategy != NFT_RES_03_CANONICAL_VIO
    ):
        pytest.skip(
            f"NFT-RES-03 AC-4: by default runs only canonical "
            f"({NFT_RES_03_CANONICAL_FC}, {NFT_RES_03_CANONICAL_VIO}); "
            f"set {NFT_RES_03_FULL_MATRIX_ENV_VAR}=1 to enable the "
            f"100 × N_params full-matrix expansion."
        )
    if not sitl_replay_ready:
        pytest.skip(
            "NFT-RES-03 requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595) carrying N≥100 "
            "Monte Carlo iterations. Pure-logic AC-1 + AC-2 + AC-3 "
            "covered by "
            "e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py."
        )

    fixture_path = _resolve_fixture_path()
    if not fixture_path.is_file():
        pytest.fail(
            f"NFT-RES-03: fixture not found at {fixture_path}. "
            f"`{NFT_RES_03_FIXTURE_ENV_VAR}` env var must point at a JSON "
            "file with the schema documented in the scenario docstring. "
            "Production dependency: AZ-595."
        )
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's locale encoding.
        payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError
        # subclasses; report them like every other malformed-fixture case.
        pytest.fail(
            f"NFT-RES-03: fixture {fixture_path} could not be read as "
            f"JSON: {exc}"
        )
    master_seed, iterations = _parse_payload(payload, fixture_path)

    # AC-2: evaluate the identical input twice; the fingerprints must match.
    report1 = mce.evaluate(iterations, master_seed=master_seed)
    report2 = mce.evaluate(iterations, master_seed=master_seed)
    fingerprint = mce.determinism_fingerprint(report1)
    fingerprint2 = mce.determinism_fingerprint(report2)

    out_base = evidence_dir / "nft-res-03" / f"{fc_adapter}-{vio_strategy}"
    # Append ".csv" to the name instead of calling with_suffix():
    # with_suffix() replaces everything after the last dot, so a parameter
    # value containing a dot would be truncated and distinct
    # parameterizations could end up writing to the same file.
    mce.write_csv_evidence(out_base.with_name(out_base.name + ".csv"), report1)
    mce.write_per_iteration_csv(
        out_base.with_name(out_base.name + "-per-iter.csv"),
        report1,
    )

    nfr_recorder.record_metric(
        "nft_res_03.iteration_count", float(report1.iteration_count), ac_id="AC-1"
    )
    nfr_recorder.record_metric(
        "nft_res_03.total_samples", float(report1.total_samples)
    )
    # The ratio may be None; only record it when the report provides one.
    if report1.envelope_ratio is not None:
        nfr_recorder.record_metric(
            "nft_res_03.envelope_ratio", float(report1.envelope_ratio), ac_id="AC-3"
        )
    nfr_recorder.record_metric(
        "nft_res_03.master_seed", float(report1.master_seed)
    )

    assert report1.passes_iteration_count, (
        f"AC-1: iteration_count={report1.iteration_count} < required "
        f"{report1.min_iteration_count}"
    )
    assert fingerprint == fingerprint2, (
        f"AC-2: determinism fingerprint differs across two evaluations of the "
        f"same fixture: {fingerprint} vs {fingerprint2}"
    )
    assert report1.passes_envelope, (
        f"AC-3: envelope ratio = {report1.envelope_ratio} < budget "
        f"{report1.envelope_ratio_budget} "
        f"(covered={report1.covered_samples}/{report1.total_samples})"
    )
def _full_matrix_enabled() -> bool:
    """Return True when full-matrix mode is switched on via the environment.

    Reads the env var named by ``NFT_RES_03_FULL_MATRIX_ENV_VAR``. The value
    is stripped of surrounding whitespace and compared case-insensitively
    against ``1``, ``true`` and ``yes``, so ``True`` or ``YES`` enable the
    matrix as well. An unset variable or any other value yields False.
    """
    flag = os.environ.get(NFT_RES_03_FULL_MATRIX_ENV_VAR, "").strip().lower()
    return flag in {"1", "true", "yes"}
def _resolve_fixture_path() -> Path:
    """Work out which file the scenario should load as its fixture.

    Resolution rules:

    * Env var set (non-blank): use its value; a relative value is joined
      onto the replay directory when ``sitl_observer.replay_dir()`` returns
      one, otherwise it is returned as given.
    * Env var unset or blank: use the default fixture name inside the
      replay directory, or, when no replay directory is available, a
      placeholder path (the caller's ``is_file()`` check then reports it).
    """
    override = os.environ.get(NFT_RES_03_FIXTURE_ENV_VAR, "").strip()
    # Imported inside the function, as in the original code.
    from runner.helpers import sitl_observer

    replay_root = sitl_observer.replay_dir()

    if override:
        candidate = Path(override)
        if candidate.is_absolute() or replay_root is None:
            return candidate
        return replay_root / candidate

    if replay_root is not None:
        return replay_root / NFT_RES_03_DEFAULT_FIXTURE_NAME
    return Path(f"<{NFT_RES_03_FIXTURE_ENV_VAR}-unset>")
def _parse_payload(
    payload: object, fixture_path: Path
) -> tuple[int, list[mce.IterationOutcome]]:
    """Validate the decoded fixture and convert it into evaluator inputs.

    Args:
        payload: The object produced by decoding the fixture JSON.
        fixture_path: Fixture location; used only in failure messages.

    Returns:
        ``(master_seed, iterations)`` where ``iterations`` holds one
        ``mce.IterationOutcome`` per fixture entry, in fixture order.

    Any schema violation ends the test through ``pytest.fail`` with a
    message that pinpoints the offending element.
    """
    if not isinstance(payload, dict):
        pytest.fail(
            f"NFT-RES-03: fixture {fixture_path} must be a JSON object; "
            f"got top-level type={type(payload).__name__}"
        )

    try:
        master_seed = int(payload["master_seed"])
    except (KeyError, TypeError, ValueError) as exc:
        pytest.fail(
            f"NFT-RES-03: fixture {fixture_path} missing/invalid "
            f"'master_seed': {exc}"
        )

    raw_iters = payload.get("iterations")
    if not (isinstance(raw_iters, list) and raw_iters):
        pytest.fail(
            f"NFT-RES-03: fixture {fixture_path} 'iterations' must be a "
            f"non-empty list"
        )

    outcomes: list[mce.IterationOutcome] = []
    for idx, entry in enumerate(raw_iters):
        outcomes.append(_parse_iteration(entry, idx, fixture_path))
    return master_seed, outcomes


def _parse_iteration(
    entry: object, idx: int, fixture_path: Path
) -> mce.IterationOutcome:
    """Convert ``iterations[idx]`` into an ``mce.IterationOutcome``."""
    if not isinstance(entry, dict):
        pytest.fail(
            f"NFT-RES-03: iterations[{idx}] in {fixture_path} must be "
            f"an object; got {type(entry).__name__}"
        )

    # A missing or falsy id falls back to a zero-padded positional label.
    iteration_id = str(entry.get("iteration_id") or f"iter-{idx:03d}")

    try:
        iteration_seed = int(entry["iteration_seed"])
    except (KeyError, TypeError, ValueError) as exc:
        pytest.fail(
            f"NFT-RES-03: iterations[{idx}].iteration_seed in "
            f"{fixture_path} must be int: {exc}"
        )

    raw_samples = entry.get("samples")
    if not isinstance(raw_samples, list):
        pytest.fail(
            f"NFT-RES-03: iterations[{idx}].samples in {fixture_path} "
            f"must be a list of objects"
        )

    frames = [
        _parse_sample(sample, idx, j, fixture_path)
        for j, sample in enumerate(raw_samples)
    ]
    return mce.IterationOutcome(
        iteration_id=iteration_id,
        iteration_seed=iteration_seed,
        samples=tuple(frames),
    )


def _parse_sample(
    sample: object, idx: int, j: int, fixture_path: Path
) -> mce.FrameSample:
    """Convert ``iterations[idx].samples[j]`` into an ``mce.FrameSample``."""
    if not isinstance(sample, dict):
        pytest.fail(
            f"NFT-RES-03: iterations[{idx}].samples[{j}] in "
            f"{fixture_path} must be an object"
        )
    try:
        # Construction stays inside the try so an error raised while building
        # the sample is reported the same way as a missing/bad field.
        return mce.FrameSample(
            error_m=float(sample["error_m"]),
            cov_semi_major_m=float(sample["cov_semi_major_m"]),
        )
    except (KeyError, TypeError, ValueError) as exc:
        pytest.fail(
            f"NFT-RES-03: iterations[{idx}].samples[{j}] in "
            f"{fixture_path} shape invalid: {exc}"
        )