Files
gps-denied-onboard/e2e/runner/helpers/monte_carlo_envelope_evaluator.py
T
Oleksandr Bezdieniezhnykh 330893be5c [AZ-432] [AZ-433] [AZ-434] [AZ-435] Add NFT-RES-01..04 resilience scenarios
Batch 86: 4 NFT-RES blackbox scenarios + 4 helper evaluators + 74 unit
tests + directory-layout registration.

* AZ-432 NFT-RES-01: 30 s IMU-only fallback drift bound (AC-3.5 + AC-NEW-7);
  two sub-cases (no_imu ≤100m, good_imu_combined_factor ≤50m).
* AZ-433 NFT-RES-02: companion mid-flight reboot (AC-5.2 + AC-5.3); resume
  ≤30s + first-emission accuracy ≤100m.
* AZ-434 NFT-RES-03: 100-iteration Monte Carlo envelope (AC-NEW-4);
  iteration-count + master-seed determinism + envelope ratio ≥0.95.
  Canonical-param by default; E2E_NFT_RES_03_FULL_MATRIX=1 unlocks matrix.
* AZ-435 NFT-RES-04: 35s blackout+spoof escalation ladder (AC-NEW-8);
  AC-1 (cov-2d→fix-degrade ≤500ms) + AC-2 (failsafe→999+STATUSTEXT
  ≤500ms) + AC-ORDER (strict ordering).

Verdict: PASS_WITH_WARNINGS (0 Critical, 0 High, 0 Medium, 5 Low).
F5 documents intentional threshold duplication with blackout_spoof
evaluator (prevents contract drift between FT-N-04 and NFT-RES-04).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:09:04 +03:00

229 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Monte Carlo statistical-envelope evaluator for NFT-RES-03 (AZ-434 / AC-NEW-4).
The SUT promises an *honest* covariance: across many runs with seeded
perturbations (gain noise, IMU bias, frame-drop pattern, outlier
injection), the actual error distribution should stay within the
``1.96 × cov_semi_major_m`` envelope at the 95th percentile. The
runner drives N iterations and feeds this module the per-iteration
per-frame ``(error_m, cov_semi_major_m)`` pairs.
ACs evaluated (per AZ-434):
* AC-1 — iteration_count ≥ ``MIN_ITERATION_COUNT`` (100). Partial
completion is a hard failure; the runner is responsible for
re-running missing iterations rather than passing a short list.
* AC-2 — determinism check: re-running with the same master_seed
produces bit-identical iteration outcomes. This module records the
master_seed and a SHA-256 of the per-iteration ``(error_m,
cov_semi_major_m)`` tuples; the scenario harness compares the seed
+ hash across two runs (the comparison itself is not a method on
this evaluator — it's a scenario-level check).
* AC-3 — global aggregate envelope: across all 100 × N_frames
samples, ``count(error_m ≤ 1.96 × cov_semi_major_m) / total ≥
0.95``.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
import hashlib
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence
# Default minimum number of iterations a run must contain to satisfy AC-1;
# used as the default for ``evaluate(min_iteration_count=...)``.
MIN_ITERATION_COUNT = 100
# Default factor applied to ``cov_semi_major_m`` to form the per-frame
# envelope (1.96 is the two-sided 95 % z-score of a normal distribution).
ENVELOPE_MULTIPLIER = 1.96
# Default minimum fraction of samples that must fall inside the envelope
# for AC-3; used as the default for ``evaluate(envelope_ratio_budget=...)``.
ENVELOPE_RATIO_BUDGET = 0.95
@dataclass(frozen=True)
class FrameSample:
    """Immutable error/uncertainty measurement for a single frame.

    Attributes:
        error_m: WGS84 Vincenty distance (m) between the SUT's estimate
            and ground truth at this frame.
        cov_semi_major_m: semi-major axis (m) of the uncertainty the SUT
            reported for the same frame.
    """

    error_m: float
    cov_semi_major_m: float
@dataclass(frozen=True)
class IterationOutcome:
    """Result of a single Monte Carlo iteration.

    Attributes:
        iteration_id: identifier of the iteration.
        iteration_seed: seed associated with this iteration.
        samples: the iteration's per-frame samples, in order.
    """

    iteration_id: str
    iteration_seed: int
    samples: tuple[FrameSample, ...]

    @property
    def frame_count(self) -> int:
        """Number of per-frame samples held by this iteration."""
        frames = self.samples
        return len(frames)
@dataclass(frozen=True)
class MonteCarloReport:
    """Immutable NFT-RES-03 summary computed over all supplied iterations.

    Attributes:
        iterations: the evaluated iterations.
        master_seed: master seed recorded for the run.
        iteration_count: number of iterations in the run.
        total_samples: number of per-frame samples across all iterations.
        covered_samples: number of samples that fell inside the envelope.
        envelope_ratio: ``covered_samples / total_samples``, or ``None``
            when there were no samples.
        min_iteration_count: AC-1 threshold the run is checked against.
        envelope_ratio_budget: AC-3 threshold the ratio is checked against.
    """

    iterations: tuple[IterationOutcome, ...]
    master_seed: int
    iteration_count: int
    total_samples: int
    covered_samples: int
    envelope_ratio: float | None
    min_iteration_count: int
    envelope_ratio_budget: float

    @property
    def passes_iteration_count(self) -> bool:
        """AC-1: the run holds at least ``min_iteration_count`` iterations."""
        return self.min_iteration_count <= self.iteration_count

    @property
    def passes_envelope(self) -> bool:
        """AC-3: a ratio exists and reaches ``envelope_ratio_budget``."""
        ratio = self.envelope_ratio
        if ratio is None:
            # No samples at all can never satisfy the envelope criterion.
            return False
        return ratio >= self.envelope_ratio_budget

    @property
    def passes(self) -> bool:
        """Overall verdict: both AC-1 and AC-3 hold."""
        if not self.passes_iteration_count:
            return False
        return self.passes_envelope
def iteration_hash(iteration: IterationOutcome) -> str:
    """Return the SHA-256 hex digest identifying one iteration's outcome.

    The digest covers the iteration id, the iteration seed and the
    ``repr`` of every ``(error_m, cov_semi_major_m)`` pair, in order.
    It is used to certify AC-2 determinism: two runs of the same
    iteration with the same ``iteration_seed`` must produce the same hash.

    Text is encoded as UTF-8. For ASCII-only ids this yields exactly the
    same bytes (and therefore the same digest) as an ASCII encoding, while
    ids containing non-ASCII characters no longer raise
    ``UnicodeEncodeError``.

    Args:
        iteration: the iteration to fingerprint.

    Returns:
        Lower-case hexadecimal SHA-256 digest.
    """
    digest = hashlib.sha256()
    header = f"{iteration.iteration_id}\n{iteration.iteration_seed}\n"
    digest.update(header.encode("utf-8"))
    for sample in iteration.samples:
        # ``!r`` keeps the shortest round-trippable float representation so
        # bit-identical floats always hash identically.
        line = f"{sample.error_m!r}|{sample.cov_semi_major_m!r}\n"
        digest.update(line.encode("utf-8"))
    return digest.hexdigest()
def determinism_fingerprint(report: MonteCarloReport) -> str:
    """Return a single SHA-256 hex digest summarising a whole MC run.

    The digest covers the master seed followed by each iteration's hash
    in report order; it is the value compared across runs for AC-2.
    """
    seed_line = f"master_seed={report.master_seed}\n"
    digest = hashlib.sha256(seed_line.encode("ascii"))
    for outcome in report.iterations:
        hash_line = f"{iteration_hash(outcome)}\n"
        digest.update(hash_line.encode("ascii"))
    return digest.hexdigest()
def evaluate(
    iterations: Sequence[IterationOutcome],
    *,
    master_seed: int,
    min_iteration_count: int = MIN_ITERATION_COUNT,
    envelope_ratio_budget: float = ENVELOPE_RATIO_BUDGET,
    envelope_multiplier: float = ENVELOPE_MULTIPLIER,
) -> MonteCarloReport:
    """Compute the AC-1 + AC-3 verdict for a Monte Carlo run.

    AC-2 (determinism) is a scenario-level check: the scenario calls
    this twice with the same master_seed and compares
    ``determinism_fingerprint(report1) == determinism_fingerprint(report2)``.

    Args:
        iterations: the iteration outcomes to aggregate.
        master_seed: master seed of the run; recorded in the report.
        min_iteration_count: AC-1 threshold stored in the report.
        envelope_ratio_budget: AC-3 threshold stored in the report.
        envelope_multiplier: factor applied to ``cov_semi_major_m`` to
            form the per-frame envelope.

    Returns:
        A ``MonteCarloReport`` whose ``envelope_ratio`` is ``None`` when
        no samples were supplied.
    """
    # Snapshot the input once so the sample counts, ``iteration_count`` and
    # the stored iterations always describe the same data (a one-shot
    # iterable would otherwise be exhausted by the counting loop).
    outcomes = tuple(iterations)

    total = 0
    covered = 0
    for outcome in outcomes:
        for sample in outcome.samples:
            total += 1
            if sample.error_m <= envelope_multiplier * sample.cov_semi_major_m:
                covered += 1

    # With zero samples the ratio is undefined rather than 0.0 or 1.0.
    ratio: float | None = covered / total if total else None

    return MonteCarloReport(
        iterations=outcomes,
        master_seed=master_seed,
        iteration_count=len(outcomes),
        total_samples=total,
        covered_samples=covered,
        envelope_ratio=ratio,
        min_iteration_count=min_iteration_count,
        envelope_ratio_budget=envelope_ratio_budget,
    )
def write_csv_evidence(out_path: Path, report: MonteCarloReport) -> Path:
    """Write the aggregate-summary CSV: a header row plus one data row.

    Parent directories of ``out_path`` are created if missing and any
    existing file is overwritten. Returns ``out_path``.
    """

    def _flag(ok: bool) -> str:
        # Booleans are serialised as lower-case literals.
        return "true" if ok else "false"

    header = (
        "master_seed",
        "iteration_count",
        "min_iteration_count",
        "total_samples",
        "covered_samples",
        "envelope_ratio",
        "envelope_ratio_budget",
        "ac1_iteration_count_passes",
        "ac3_envelope_passes",
        "passes",
        "fingerprint_sha256",
    )

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)

        ratio = report.envelope_ratio
        # An undefined ratio (no samples) is written as an empty cell.
        ratio_cell = f"{ratio:.6f}" if ratio is not None else ""
        writer.writerow(
            (
                report.master_seed,
                report.iteration_count,
                report.min_iteration_count,
                report.total_samples,
                report.covered_samples,
                ratio_cell,
                f"{report.envelope_ratio_budget:.6f}",
                _flag(report.passes_iteration_count),
                _flag(report.passes_envelope),
                _flag(report.passes),
                determinism_fingerprint(report),
            )
        )
    return out_path
def write_per_iteration_csv(
    out_path: Path,
    report: MonteCarloReport,
    *,
    envelope_multiplier: float = ENVELOPE_MULTIPLIER,
) -> Path:
    """Write one CSV row per iteration for AC-3 envelope-breach investigation.

    Parent directories of ``out_path`` are created if missing and any
    existing file is overwritten.

    Args:
        out_path: destination CSV file.
        report: the run whose iterations are listed.
        envelope_multiplier: factor applied to ``cov_semi_major_m`` when
            counting covered frames. Pass the same value that was given
            to ``evaluate()`` so the per-iteration ratios agree with the
            aggregate ratio; defaults to ``ENVELOPE_MULTIPLIER``.

    Returns:
        ``out_path``.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            [
                "iteration_id",
                "iteration_seed",
                "frame_count",
                "covered_count",
                "envelope_ratio",
                "iteration_hash_sha256",
            ]
        )
        for it in report.iterations:
            covered = sum(
                1
                for s in it.samples
                if s.error_m <= envelope_multiplier * s.cov_semi_major_m
            )
            # An iteration without frames has no defined ratio; it is
            # written as an empty cell.
            ratio = (covered / it.frame_count) if it.frame_count else None
            writer.writerow(
                [
                    it.iteration_id,
                    it.iteration_seed,
                    it.frame_count,
                    covered,
                    "" if ratio is None else f"{ratio:.6f}",
                    iteration_hash(it),
                ]
            )
    return out_path