"""Monte Carlo statistical-envelope evaluator for NFT-RES-03 (AZ-434 / AC-NEW-4). The SUT promises an *honest* covariance: across many runs with seeded perturbations (gain noise, IMU bias, frame-drop pattern, outlier injection), the actual error distribution should stay within the ``1.96 × cov_semi_major_m`` envelope at the 95th percentile. The runner drives N iterations and feeds this module the per-iteration per-frame ``(error_m, cov_semi_major_m)`` pairs. ACs evaluated (per AZ-434): * AC-1 — iteration_count == ``MIN_ITERATION_COUNT`` (100). Partial completion is a hard failure; the runner is responsible for re-running missing iterations rather than passing a short list. * AC-2 — determinism check: re-running with the same master_seed produces bit-identical iteration outcomes. This module records the master_seed and a SHA-256 of the per-iteration ``(error_m, cov_semi_major_m)`` tuples; the scenario harness compares the seed + hash across two runs (the comparison itself is not a method on this evaluator — it's a scenario-level check). * AC-3 — global aggregate envelope: across all 100 × N_frames samples, ``count(error_m ≤ 1.96 × cov_semi_major_m) / total ≥ 0.95``. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import csv import hashlib from dataclasses import dataclass from pathlib import Path from typing import Sequence MIN_ITERATION_COUNT = 100 ENVELOPE_MULTIPLIER = 1.96 # 95th-percentile envelope on a normal cov_semi_major ENVELOPE_RATIO_BUDGET = 0.95 @dataclass(frozen=True) class FrameSample: """One per-frame ``(error_m, cov_semi_major_m)`` pair. ``error_m`` is the WGS84 Vincenty distance between the SUT's estimate and ground truth at that frame; ``cov_semi_major_m`` is the SUT's self-reported uncertainty semi-major-axis (m) at the same frame. """ error_m: float cov_semi_major_m: float @dataclass(frozen=True) class IterationOutcome: """One Monte Carlo iteration: ordered per-frame samples + iteration seed.""" iteration_id: str iteration_seed: int samples: tuple[FrameSample, ...] @property def frame_count(self) -> int: return len(self.samples) @dataclass(frozen=True) class MonteCarloReport: """Aggregate NFT-RES-03 result over N iterations.""" iterations: tuple[IterationOutcome, ...] master_seed: int iteration_count: int total_samples: int covered_samples: int envelope_ratio: float | None min_iteration_count: int envelope_ratio_budget: float @property def passes_iteration_count(self) -> bool: return self.iteration_count >= self.min_iteration_count @property def passes_envelope(self) -> bool: return ( self.envelope_ratio is not None and self.envelope_ratio >= self.envelope_ratio_budget ) @property def passes(self) -> bool: return self.passes_iteration_count and self.passes_envelope def iteration_hash(iteration: IterationOutcome) -> str: """SHA-256 of the iteration's ``(error_m, cov_semi_major_m)`` tuples. Used to certify AC-2 determinism — two runs of the same iteration with the same iteration_seed must produce the same hash. """ h = hashlib.sha256() h.update(f"{iteration.iteration_id}\n{iteration.iteration_seed}\n".encode("ascii")) for s in iteration.samples: h.update(f"{s.error_m!r}|{s.cov_semi_major_m!r}\n".encode("ascii")) return h.hexdigest() def determinism_fingerprint(report: MonteCarloReport) -> str: """One-shot fingerprint of an entire MC run — for AC-2 cross-run comparison.""" h = hashlib.sha256() h.update(f"master_seed={report.master_seed}\n".encode("ascii")) for it in report.iterations: h.update(f"{iteration_hash(it)}\n".encode("ascii")) return h.hexdigest() def evaluate( iterations: Sequence[IterationOutcome], *, master_seed: int, min_iteration_count: int = MIN_ITERATION_COUNT, envelope_ratio_budget: float = ENVELOPE_RATIO_BUDGET, envelope_multiplier: float = ENVELOPE_MULTIPLIER, ) -> MonteCarloReport: """Compute the AC-1 + AC-3 verdict. AC-2 (determinism) is a scenario-level check: the scenario calls this twice with the same master_seed and compares ``determinism_fingerprint(report1) == determinism_fingerprint(report2)``. """ total = 0 covered = 0 for it in iterations: for s in it.samples: total += 1 if s.error_m <= envelope_multiplier * s.cov_semi_major_m: covered += 1 ratio: float | None if total == 0: ratio = None else: ratio = covered / total return MonteCarloReport( iterations=tuple(iterations), master_seed=master_seed, iteration_count=len(iterations), total_samples=total, covered_samples=covered, envelope_ratio=ratio, min_iteration_count=min_iteration_count, envelope_ratio_budget=envelope_ratio_budget, ) def write_csv_evidence(out_path: Path, report: MonteCarloReport) -> Path: """Aggregate-summary CSV (one row per run).""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "master_seed", "iteration_count", "min_iteration_count", "total_samples", "covered_samples", "envelope_ratio", "envelope_ratio_budget", "ac1_iteration_count_passes", "ac3_envelope_passes", "passes", "fingerprint_sha256", ] ) writer.writerow( [ report.master_seed, report.iteration_count, report.min_iteration_count, report.total_samples, report.covered_samples, "" if report.envelope_ratio is None else f"{report.envelope_ratio:.6f}", f"{report.envelope_ratio_budget:.6f}", "true" if report.passes_iteration_count else "false", "true" if report.passes_envelope else "false", "true" if report.passes else "false", determinism_fingerprint(report), ] ) return out_path def write_per_iteration_csv(out_path: Path, report: MonteCarloReport) -> Path: """One row per iteration — used during AC-3 envelope-breach investigation.""" out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "iteration_id", "iteration_seed", "frame_count", "covered_count", "envelope_ratio", "iteration_hash_sha256", ] ) for it in report.iterations: covered = sum( 1 for s in it.samples if s.error_m <= ENVELOPE_MULTIPLIER * s.cov_semi_major_m ) ratio = (covered / it.frame_count) if it.frame_count else None writer.writerow( [ it.iteration_id, it.iteration_seed, it.frame_count, covered, "" if ratio is None else f"{ratio:.6f}", iteration_hash(it), ] ) return out_path