"""NFT-RES-03 — 100-iteration Monte Carlo statistical envelope (AZ-434 / AC-NEW-4). Tier-1 OR Tier-2. The runner orchestrates 100 Derkachi replays with seeded perturbations (gain noise, IMU bias, frame-drop, outlier injection) and supplies this scenario with a captured fixture containing per-iteration per-frame ``(error_m, cov_semi_major_m)`` pairs. The scenario validates: * AC-1 — iteration_count ≥ 100. * AC-2 — same master_seed yields bit-identical iteration outcomes (verified by re-evaluating the same fixture twice and comparing ``determinism_fingerprint``). * AC-3 — global aggregate envelope: ``count(error_m ≤ 1.96 × cov_semi_major_m) / total ≥ 0.95``. * AC-4 — parameterization: SHOULD run only one canonical parameterization per CI invocation by default; full-matrix mode gated behind ``E2E_NFT_RES_03_FULL_MATRIX=1``. The scenario uses ``fc_adapter`` + ``vio_strategy`` fixtures so the harness param matrix decides which combinations to run. Production dependency surfaced to AZ-595: the ``E2E_NFT_RES_03_FIXTURE`` env var names a JSON file with shape: { "master_seed": , "iterations": [ { "iteration_id": "iter-001", "iteration_seed": , "samples": [{"error_m": , "cov_semi_major_m": }, ...] }, ... ] } The harness MAY emit the fixture with a single canonical parameterization per CI invocation by default — ``E2E_NFT_RES_03_FULL_MATRIX=1`` unlocks the full 100 × N_params expansion. """ from __future__ import annotations import json import os from pathlib import Path import pytest from runner.helpers import monte_carlo_envelope_evaluator as mce NFT_RES_03_FIXTURE_ENV_VAR = "E2E_NFT_RES_03_FIXTURE" NFT_RES_03_DEFAULT_FIXTURE_NAME = "nft_res_03_monte_carlo.json" NFT_RES_03_FULL_MATRIX_ENV_VAR = "E2E_NFT_RES_03_FULL_MATRIX" NFT_RES_03_CANONICAL_FC = "ardupilot" NFT_RES_03_CANONICAL_VIO = "okvis2" @pytest.mark.scenario_id("nft-res-03") @pytest.mark.traces_to("AC-NEW-4,AC-1,AC-2,AC-3,AC-4") def test_nft_res_03_monte_carlo( fc_adapter: str, vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: """AC-1 (iteration count) + AC-2 (determinism) + AC-3 (envelope) + AC-4 (param).""" if not _full_matrix_enabled() and ( fc_adapter != NFT_RES_03_CANONICAL_FC or vio_strategy != NFT_RES_03_CANONICAL_VIO ): pytest.skip( f"NFT-RES-03 AC-4: by default runs only canonical " f"({NFT_RES_03_CANONICAL_FC}, {NFT_RES_03_CANONICAL_VIO}); " f"set {NFT_RES_03_FULL_MATRIX_ENV_VAR}=1 to enable the " f"100 × N_params full-matrix expansion." ) if not sitl_replay_ready: pytest.skip( "NFT-RES-03 requires `E2E_SITL_REPLAY_DIR` to point at a " "prepared SITL replay fixture (AZ-595) carrying N≥100 " "Monte Carlo iterations. Pure-logic AC-1 + AC-2 + AC-3 " "covered by " "e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py." ) fixture_path = _resolve_fixture_path() if not fixture_path.is_file(): pytest.fail( f"NFT-RES-03: fixture not found at {fixture_path}. " f"`{NFT_RES_03_FIXTURE_ENV_VAR}` env var must point at a JSON " "file with the schema documented in the scenario docstring. " "Production dependency: AZ-595." ) payload = json.loads(fixture_path.read_text()) master_seed, iterations = _parse_payload(payload, fixture_path) report1 = mce.evaluate(iterations, master_seed=master_seed) report2 = mce.evaluate(iterations, master_seed=master_seed) fingerprint = mce.determinism_fingerprint(report1) fingerprint2 = mce.determinism_fingerprint(report2) out_base = ( evidence_dir / "nft-res-03" / f"{fc_adapter}-{vio_strategy}" ) mce.write_csv_evidence(out_base.with_suffix(".csv"), report1) mce.write_per_iteration_csv( out_base.with_name(out_base.name + "-per-iter").with_suffix(".csv"), report1, ) nfr_recorder.record_metric( "nft_res_03.iteration_count", float(report1.iteration_count), ac_id="AC-1", band=f"≥{report1.min_iteration_count} iterations", ) nfr_recorder.record_metric( "nft_res_03.total_samples", float(report1.total_samples) ) if report1.envelope_ratio is not None: # AZ-446 AC-2 — per-iteration envelope ratios provide the empirical # 95% interval (2.5th / 97.5th percentile across 100 iterations). per_iter_ratios = _per_iteration_envelope_ratios(report1) ci_low, ci_high = _percentile_pair(per_iter_ratios, 2.5, 97.5) nfr_recorder.record_metric( "nft_res_03.envelope_ratio", float(report1.envelope_ratio), ac_id="AC-3", band=f"≥{report1.envelope_ratio_budget:.2f}", ci95_low=ci_low, ci95_high=ci_high, ) nfr_recorder.record_metric( "nft_res_03.master_seed", float(report1.master_seed) ) assert report1.passes_iteration_count, ( f"AC-1: iteration_count={report1.iteration_count} < required " f"{report1.min_iteration_count}" ) assert fingerprint == fingerprint2, ( f"AC-2: determinism fingerprint differs across two evaluations of the " f"same fixture: {fingerprint} vs {fingerprint2}" ) assert report1.passes_envelope, ( f"AC-3: envelope ratio = {report1.envelope_ratio} < budget " f"{report1.envelope_ratio_budget} " f"(covered={report1.covered_samples}/{report1.total_samples})" ) def _full_matrix_enabled() -> bool: return os.environ.get(NFT_RES_03_FULL_MATRIX_ENV_VAR, "").strip() in {"1", "true", "yes"} def _per_iteration_envelope_ratios(report: mce.MonteCarloReport) -> list[float]: """Per-iteration ``covered/frames`` ratios (AZ-446 CI95 input).""" ratios: list[float] = [] for it in report.iterations: if not it.samples: continue covered = sum( 1 for s in it.samples if s.error_m <= mce.ENVELOPE_MULTIPLIER * s.cov_semi_major_m ) ratios.append(covered / len(it.samples)) return ratios def _percentile_pair( values: list[float], q_low: float, q_high: float ) -> tuple[float | None, float | None]: """Linear-interpolation percentiles. Returns ``(None, None)`` if empty.""" if not values: return None, None ordered = sorted(values) if len(ordered) == 1: return float(ordered[0]), float(ordered[0]) def _at(q: float) -> float: rank = (q / 100.0) * (len(ordered) - 1) lo = int(rank) hi = min(lo + 1, len(ordered) - 1) frac = rank - lo return float(ordered[lo] + (ordered[hi] - ordered[lo]) * frac) return _at(q_low), _at(q_high) def _resolve_fixture_path() -> Path: raw = os.environ.get(NFT_RES_03_FIXTURE_ENV_VAR, "").strip() from runner.helpers import sitl_observer root = sitl_observer.replay_dir() if not raw: if root is None: return Path(f"<{NFT_RES_03_FIXTURE_ENV_VAR}-unset>") return root / NFT_RES_03_DEFAULT_FIXTURE_NAME path = Path(raw) if not path.is_absolute() and root is not None: path = root / path return path def _parse_payload( payload: object, fixture_path: Path ) -> tuple[int, list[mce.IterationOutcome]]: if not isinstance(payload, dict): pytest.fail( f"NFT-RES-03: fixture {fixture_path} must be a JSON object; " f"got top-level type={type(payload).__name__}" ) try: master_seed = int(payload["master_seed"]) except (KeyError, TypeError, ValueError) as exc: pytest.fail( f"NFT-RES-03: fixture {fixture_path} missing/invalid " f"'master_seed': {exc}" ) raw_iters = payload.get("iterations") if not isinstance(raw_iters, list) or not raw_iters: pytest.fail( f"NFT-RES-03: fixture {fixture_path} 'iterations' must be a " f"non-empty list" ) parsed: list[mce.IterationOutcome] = [] for idx, entry in enumerate(raw_iters): if not isinstance(entry, dict): pytest.fail( f"NFT-RES-03: iterations[{idx}] in {fixture_path} must be " f"an object; got {type(entry).__name__}" ) iter_id = str(entry.get("iteration_id") or f"iter-{idx:03d}") try: seed = int(entry["iteration_seed"]) except (KeyError, TypeError, ValueError) as exc: pytest.fail( f"NFT-RES-03: iterations[{idx}].iteration_seed in " f"{fixture_path} must be int: {exc}" ) raw_samples = entry.get("samples") if not isinstance(raw_samples, list): pytest.fail( f"NFT-RES-03: iterations[{idx}].samples in {fixture_path} " f"must be a list of objects" ) samples: list[mce.FrameSample] = [] for j, s in enumerate(raw_samples): if not isinstance(s, dict): pytest.fail( f"NFT-RES-03: iterations[{idx}].samples[{j}] in " f"{fixture_path} must be an object" ) try: samples.append( mce.FrameSample( error_m=float(s["error_m"]), cov_semi_major_m=float(s["cov_semi_major_m"]), ) ) except (KeyError, TypeError, ValueError) as exc: pytest.fail( f"NFT-RES-03: iterations[{idx}].samples[{j}] in " f"{fixture_path} shape invalid: {exc}" ) parsed.append( mce.IterationOutcome( iteration_id=iter_id, iteration_seed=seed, samples=tuple(samples), ) ) return master_seed, parsed