diff --git a/_docs/03_implementation/batch_86_report.md b/_docs/03_implementation/batch_86_report.md new file mode 100644 index 0000000..d36ce83 --- /dev/null +++ b/_docs/03_implementation/batch_86_report.md @@ -0,0 +1,151 @@ +# Batch 86 — AZ-432 + AZ-433 + AZ-434 + AZ-435 (Resilience NFTs) + +**Tracker**: AZ-432, AZ-433, AZ-434, AZ-435 +**Tasks**: 4 tasks / 14 complexity points (3 + 3 + 5 + 3) +**Date**: 2026-05-17 +**Verdict**: PASS_WITH_WARNINGS +**Review**: `_docs/03_implementation/reviews/batch_86_review.md` + +## Scope + +- **AZ-432 / NFT-RES-01 (AC-3.5 + AC-NEW-7)** — 30 s pure-vision-blackout drift bound; two sub-cases (`no_imu` ≤100 m, `good_imu_combined_factor` ≤50 m); Tier-1 OR Tier-2. +- **AZ-433 / NFT-RES-02 (AC-5.2 + AC-5.3)** — Mid-flight Docker/systemd restart; resume ≤30 s + first-emission accuracy ≤100 m; Tier-1 OR Tier-2. +- **AZ-434 / NFT-RES-03 (AC-NEW-4)** — 100-iteration Monte Carlo statistical envelope; AC-1 (N≥100) + AC-2 (master-seed determinism) + AC-3 (`count(error_m ≤ 1.96 × cov_semi_major_m) / total ≥ 0.95`); canonical-param by default, `E2E_NFT_RES_03_FULL_MATRIX=1` unlocks full matrix. +- **AZ-435 / NFT-RES-04 (AC-NEW-8 escalation)** — 35 s blackout+spoof full ladder; AC-1 (cov-2d → fix-degrade ≤500 ms) + AC-2 (failsafe trigger → 999+STATUSTEXT ≤500 ms) + AC-ORDER (cov-2d strictly precedes failsafe trigger). + +## Files + +### Created (12 files) + +- `e2e/runner/helpers/imu_fallback_drift_evaluator.py` — sub-case drift evaluator (no_imu / good_imu_combined_factor) with window-in-spec guard. +- `e2e/runner/helpers/companion_reboot_evaluator.py` — restart-trigger + resume-time + first-emission-accuracy verdicts from one captured `RestartEvidence`. +- `e2e/runner/helpers/monte_carlo_envelope_evaluator.py` — iteration-count + envelope-ratio + SHA-256 determinism fingerprint. +- `e2e/runner/helpers/escalation_ladder_evaluator.py` — cov-2d/cov-500/duration triggers, latency budgets, strict ordering. +- `e2e/tests/resilience/test_nft_res_01_imu_only_fallback.py` — NFT-RES-01 scenario (Tier-1/2; fixture-consumer). +- `e2e/tests/resilience/test_nft_res_02_companion_reboot.py` — NFT-RES-02 scenario (Tier-1/2; fixture-consumer). +- `e2e/tests/resilience/test_nft_res_03_monte_carlo.py` — NFT-RES-03 scenario (Tier-1/2; canonical-only by default; full-matrix gated). +- `e2e/tests/resilience/test_nft_res_04_blackout_escalation.py` — NFT-RES-04 scenario (Tier-1/2; fixture-consumer; sibling of FT-N-04). +- `e2e/_unit_tests/helpers/test_imu_fallback_drift_evaluator.py` — 16 unit tests. +- `e2e/_unit_tests/helpers/test_companion_reboot_evaluator.py` — 19 unit tests. +- `e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py` — 15 unit tests. +- `e2e/_unit_tests/helpers/test_escalation_ladder_evaluator.py` — 24 unit tests. + +### Modified + +- `e2e/_unit_tests/test_directory_layout.py` — registered 8 new paths. + +## Test Results + +``` +$ pytest e2e/_unit_tests/helpers/test_imu_fallback_drift_evaluator.py \ + e2e/_unit_tests/helpers/test_companion_reboot_evaluator.py \ + e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py \ + e2e/_unit_tests/helpers/test_escalation_ladder_evaluator.py \ + e2e/_unit_tests/test_directory_layout.py +================ 199 passed in 0.69s ================ +``` + +Scenario collection (24 cases, all parameterised): + +``` +$ pytest e2e/tests/resilience/ --collect-only -p no:csv --evidence-out=/tmp/e2e-test-evidence +collected 24 items + test_nft_res_01_imu_only_fallback: 6 cases + test_nft_res_02_companion_reboot: 6 cases + test_nft_res_03_monte_carlo: 6 cases + test_nft_res_04_blackout_escalation: 6 cases +``` + +Scenario smoke (all 24 skip cleanly with rich diagnostic messages): + +``` +24 skipped in 0.17s +``` + +Skip breakdown: +- 13 skip-on-`sitl_replay_ready=False` (no `E2E_SITL_REPLAY_DIR` locally — expected pattern). +- 8 skip-on-`vins_mono` (research-build-only per D-C1-1-SUB-A — conftest applies on production builds). +- 3 skip-on-non-canonical-param for NFT-RES-03 (AC-4 default canonical-only; unlock with `E2E_NFT_RES_03_FULL_MATRIX=1`). + +## AC Verification + +### AZ-432 / NFT-RES-01 + +| AC | Coverage | +|----|----------| +| AC-1 30 s window injected | `BlackoutWindow.window_in_spec` (±2 s tolerance) + 3 unit tests + scenario gate | +| AC-2 no-IMU drift ≤ 100 m | `evaluate_subcase(... no_imu).passes` + 2 unit tests + scenario AC-2 assert | +| AC-3 good-IMU drift ≤ 50 m | `evaluate_subcase(... good_imu_combined_factor).passes` + 2 unit tests + scenario AC-3 assert | +| AC-4 parameterization | 6 collected variants (fc_adapter × vio_strategy) | + +### AZ-433 / NFT-RES-02 + +| AC | Coverage | +|----|----------| +| AC-1 restart trigger ≤ 5 s | `passes_restart_trigger` + 4 unit tests + scenario AC-1 assert | +| AC-2 resume time ≤ 30 s | `passes_resume_time` + 4 unit tests + scenario AC-2 assert | +| AC-3 first-emission accuracy ≤ 100 m | `passes_first_emission_accuracy` + 5 unit tests + scenario AC-3 assert | +| AC-4 parameterization | 6 collected variants | + +### AZ-434 / NFT-RES-03 + +| AC | Coverage | +|----|----------| +| AC-1 N ≥ 100 iterations | `passes_iteration_count` + 3 unit tests + scenario AC-1 assert | +| AC-2 master-seed determinism | `determinism_fingerprint` + 4 unit tests + scenario dual-evaluate AC-2 assert | +| AC-3 envelope ratio ≥ 0.95 | `passes_envelope` + 6 unit tests + scenario AC-3 assert | +| AC-4 parameterization | Canonical (ardupilot, okvis2) by default; full matrix via `E2E_NFT_RES_03_FULL_MATRIX=1` | + +### AZ-435 / NFT-RES-04 + +| AC | Coverage | +|----|----------| +| AC-1 100 m → fix-degrade ≤ 500 ms | `fix_degrade.passes` + 4 unit tests + scenario AC-1 assert | +| AC-2 500 m / 30 s → 999+STATUSTEXT ≤ 500 ms | `failsafe.passes` + 6 unit tests + scenario AC-2 assert | +| AC-ORDER cov-2d strictly precedes failsafe | `ordering.passes` + 3 unit tests + scenario AC-ORDER assert | +| AC-3 parameterization | 6 collected variants | + +`traces_to` markers: +- NFT-RES-01: `AC-3.5,AC-NEW-7,AC-1,AC-2,AC-3,AC-4` +- NFT-RES-02: `AC-5.2,AC-5.3,AC-1,AC-2,AC-3,AC-4` +- NFT-RES-03: `AC-NEW-4,AC-1,AC-2,AC-3,AC-4` +- NFT-RES-04: `AC-NEW-8,AC-1,AC-2,AC-3` + +## Code Review + +**Verdict**: PASS_WITH_WARNINGS — 0 Critical, 0 High, 0 Medium, 5 Low. + +- **F1 (Low / Maintainability — carry-over of batch-85 F4)**: `write_csv_evidence` boilerplate now in 8 evaluators. Future hygiene PBI. +- **F2 (Low / Spec-Gap surfacing)**: AZ-432 sub-case (a) needs SUT-side disable-IMU path OR empty IMU stream from FC inbound proxy — production dep on AZ-595. +- **F3 (Low / Spec-Gap surfacing)**: AZ-433 process-restart observation needs runner-side health-probe; AZ-444 owns Tier-2, Tier-1 needs docker-compose healthcheck wiring. +- **F4 (Low / Maintainability)**: `_resolve_fixture_path` duplicated across 4 new scenarios (matches NFT-PERF pattern from batch 85). Future hygiene PBI. +- **F5 (Low / Maintainability — intentional)**: `escalation_ladder_evaluator` thresholds intentionally re-defined locally rather than imported from `blackout_spoof_evaluator`. Documented; will be cited if a future review proposes to "DRY" them. + +Full review: `_docs/03_implementation/reviews/batch_86_review.md`. + +## Production Dependencies + +Surfaced for the cumulative review window (85-87) + traceability matrix: + +1. **AZ-595 (fixture builder)**: emit `nft_res_01_imu_fallback.json` (both sub-cases × 30 s blackout × estimate+GT samples), `nft_res_02_companion_reboot.json` (restart-command + process-restarted + first-post-restart-emission + GT-at-emission timestamps), `nft_res_03_monte_carlo.json` (master_seed + 100 iterations × per-frame (error_m, cov_semi_major_m)), `nft_res_04_blackout_escalation.json` (35 s window + estimate stream with cov/horiz/fix_type + STATUSTEXT stream). +2. **AZ-444 (Tier-2 runner)**: per-iteration clean-state lifecycle for NFT-RES-03 (fdr-output volume wipe + SUT cold restart × 100); systemd watchdog observation for NFT-RES-02 process-restart timing. +3. **AZ-595 + SUT**: SUT-side `no_imu` config path OR FC-proxy empty IMU stream injection for AZ-432 sub-case (a). +4. **SUT-side**: outbound stream MUST carry `cov_semi_major_m`, `horiz_accuracy`, and `fix_type` per-frame for NFT-RES-04 to detect the ladder. Existing FT-N-04 already requires the first two; `fix_type` is new for NFT-RES-04 AC-1 (MAVLink `GPS_INPUT.fix_type ≤ 2` for AP, equivalent for iNav). +5. **Already exists**: `sitl_replay_ready` fixture, `sitl_observer.replay_dir()`, `evidence_dir`, `nfr_recorder` (AZ-406/AZ-445), `geo.distance_m` (AZ-407), conftest `fc_adapter` / `vio_strategy` / `vins_mono` skip rules. + +## Architecture Compliance + +- All new files under `e2e/`, owned by the Blackbox Tests cross-cutting component per `_docs/02_document/module-layout.md`. +- No imports from `src/gps_denied_onboard` (verified — only `runner.helpers.geo`, `runner.helpers.sitl_observer`, `pyproj`, stdlib). +- No new cyclic dependencies. New evaluators are leaves of the import DAG. +- No new infrastructure libraries. + +## Sub-step Trace + +Phases executed per `implement/SKILL.md`: +- phase 5 (load-spec) → 4 task specs read +- phase 6 (implement-tasks-sequentially) → helpers + scenarios + unit tests for all 4 tasks +- phase 7 (verify-ac-coverage) → ACs traced above +- phase 8 (code-review) → batch_86_review.md (PASS_WITH_WARNINGS) +- phase 8.5 (cumulative-review) → defer to batch 87 (K=3 window 85-87) +- phase 11 (commit-batch) → next. diff --git a/_docs/03_implementation/reviews/batch_86_review.md b/_docs/03_implementation/reviews/batch_86_review.md new file mode 100644 index 0000000..e764337 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_86_review.md @@ -0,0 +1,118 @@ +# Code Review Report — Batch 86 + +**Batch**: 86 (AZ-432 + AZ-433 + AZ-434 + AZ-435 — Resilience NFTs) +**Date**: 2026-05-17 +**Verdict**: PASS_WITH_WARNINGS + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| 1 | Low | Maintainability | `e2e/runner/helpers/{imu_fallback_drift,companion_reboot,monte_carlo_envelope,escalation_ladder}_evaluator.py` | `write_csv_evidence` duplication continues — now 8 evaluators (carry-over of batch-85 F4) | +| 2 | Low | Spec-Gap (surfacing) | `e2e/tests/resilience/test_nft_res_01_imu_only_fallback.py` | Sub-case (a) "no IMU" needs SUT-side disable-IMU config OR empty IMU stream; production dep on AZ-595 | +| 3 | Low | Spec-Gap (surfacing) | `e2e/tests/resilience/test_nft_res_02_companion_reboot.py` | Process-restart observation requires runner-side health probe; AZ-444 owns Tier-2, Tier-1 needs docker-compose healthcheck wiring | +| 4 | Low | Maintainability | `e2e/tests/resilience/test_nft_res_0{1,2,3,4}_*.py` | `_resolve_fixture_path` duplicated across 4 new scenarios (matches NFT-PERF pattern) | +| 5 | Low | Maintainability (intentional) | `e2e/runner/helpers/escalation_ladder_evaluator.py` | Thresholds (`COV_2D_THRESHOLD_M`, `COV_FAILSAFE_THRESHOLD_M`, etc.) deliberately re-defined locally rather than imported from `blackout_spoof_evaluator` | + +### Finding Details + +**F1: `write_csv_evidence` duplication continues** (Low / Maintainability — carry-over) +- Location: `imu_fallback_drift_evaluator.py`, `companion_reboot_evaluator.py`, `monte_carlo_envelope_evaluator.py`, `escalation_ladder_evaluator.py` +- Description: Batch-85 surfaced this on 4 evaluators (F4); batch 86 adds 4 more. The pattern is: open file, write header row, write data row(s), close. All 8 evaluators implement it independently. +- Suggestion: Continued hygiene PBI — extract a `_emit_csv(path, header, rows)` helper. Not blocking; will be surfaced again in the cumulative review of batches 85-87. +- Task: AZ-432 / AZ-433 / AZ-434 / AZ-435 + +**F2: Sub-case (a) "no IMU" needs SUT-side disable-IMU path** (Low / Spec-Gap — surfacing) +- Location: `e2e/tests/resilience/test_nft_res_01_imu_only_fallback.py` +- Description: AZ-432 sub-case (a) requires the SUT to run with IMU input disabled. The task spec calls out two options: (i) SUT config flag `E2E_NFT_RES_01_DISABLE_IMU=1` (or equivalent) propagated to the SUT, or (ii) empty IMU stream from the FC inbound proxy. Neither path is wired in production code today; the fixture builder (AZ-595) is responsible for setting up the variant that produces the captured no-IMU estimates. +- Suggestion: Track as production dependency for AZ-595 — the SITL replay builder must emit a `no_imu` sub-case capture using one of those two strategies and label it accordingly in the fixture JSON. +- Task: AZ-432 + +**F3: Process-restart observation needs runner-side wiring** (Low / Spec-Gap — surfacing) +- Location: `e2e/tests/resilience/test_nft_res_02_companion_reboot.py` +- Description: AZ-433 AC-1 (process restarts within ≤5 s) requires the runner to observe the moment the SUT process is back up. On Tier-2, AZ-444 owns the systemd watchdog observation. On Tier-1, the equivalent docker-compose `healthcheck` transition (`unhealthy → healthy`) is not yet captured into the fixture JSON. The scenario tolerates `process_restarted_monotonic_ms = null` (AC-1 fails loud), but the *positive* path requires the harness. +- Suggestion: Track as production dependency for AZ-444 (Tier-2) + AZ-595 (Tier-1 docker-compose healthcheck capture). +- Task: AZ-433 + +**F4: `_resolve_fixture_path` duplicated across scenarios** (Low / Maintainability) +- Location: `test_nft_res_0{1,2,3,4}_*.py` +- Description: Each scenario re-implements the env-var-or-default fixture path resolver (`E2E_NFT_RES_NN_FIXTURE` → absolute or relative to `E2E_SITL_REPLAY_DIR`). The NFT-PERF batch (85) introduced the same pattern across 4 scenarios. Now 8 scenarios duplicate it. +- Suggestion: Future hygiene PBI — extract a `runner.helpers.fixture_resolver.resolve(env_var, default_name)` utility. Not blocking; isolation per scenario keeps each test self-contained today. +- Task: AZ-432 / AZ-433 / AZ-434 / AZ-435 + +**F5: Thresholds intentionally re-defined locally** (Low / Maintainability — intentional) +- Location: `e2e/runner/helpers/escalation_ladder_evaluator.py:25-40` +- Description: `COV_2D_THRESHOLD_M = 100.0`, `COV_FAILSAFE_THRESHOLD_M = 500.0`, `DURATION_FAILSAFE_S = 30.0`, `FIX_TYPE_2D = 2`, `HORIZ_ACCURACY_FAILSAFE = 999.0`, `STATUSTEXT_FAILSAFE = "VISUAL_BLACKOUT_FAILSAFE"`, `ESCALATION_LATENCY_MS = 500` are also defined in `blackout_spoof_evaluator.py` under the same numeric values. The duplication is **intentional** — NFT-RES-04 and FT-N-04 must continue to fail loud if either evaluator's contract drifts away from the other. A shared constants module would silently propagate one team's edit into both AC suites. +- Suggestion: NONE. Documented in `escalation_ladder_evaluator.py` module docstring. If a future cumulative review proposes to "DRY" these, reject and re-cite this finding. +- Task: AZ-435 + +## Phase Notes + +### Phase 1 — Context + +All 4 task specs read; ACs walked through against helpers + scenarios. Architecture-compliance file (`module-layout.md` § blackbox_tests) re-read to confirm `e2e/**` ownership envelope. + +### Phase 2 — Spec Compliance + +| Task | AC | Evidence | +|------|----|----------| +| AZ-432 | AC-1 30 s window | `imu_fallback_drift_evaluator.BlackoutWindow.window_in_spec` + scenario AC-1 assert | +| AZ-432 | AC-2 no-IMU drift ≤ 100 m | `evaluate_subcase(... SUBCASE_NO_IMU).passes` + 2 unit tests + scenario AC-2 assert | +| AZ-432 | AC-3 good-IMU drift ≤ 50 m | `evaluate_subcase(... SUBCASE_GOOD_IMU).passes` + 2 unit tests + scenario AC-3 assert | +| AZ-432 | AC-4 parameterization | 6 collected variants per scenario | +| AZ-433 | AC-1 restart trigger ≤ 5 s | `companion_reboot_evaluator.passes_restart_trigger` + 4 unit tests + scenario AC-1 assert | +| AZ-433 | AC-2 resume time ≤ 30 s | `passes_resume_time` + 4 unit tests + scenario AC-2 assert | +| AZ-433 | AC-3 accuracy ≤ 100 m | `passes_first_emission_accuracy` + 5 unit tests + scenario AC-3 assert | +| AZ-433 | AC-4 parameterization | 6 collected variants | +| AZ-434 | AC-1 N ≥ 100 | `monte_carlo_envelope_evaluator.passes_iteration_count` + 3 unit tests + scenario AC-1 assert | +| AZ-434 | AC-2 determinism | `determinism_fingerprint` + 4 unit tests + scenario dual-evaluate AC-2 assert | +| AZ-434 | AC-3 envelope ≥ 95 % | `passes_envelope` + 6 unit tests + scenario AC-3 assert | +| AZ-434 | AC-4 parameterization | Canonical-only by default; `E2E_NFT_RES_03_FULL_MATRIX=1` unlocks full 6 variants | +| AZ-435 | AC-1 100 m → fix-degrade ≤ 500 ms | `escalation_ladder_evaluator.fix_degrade.passes` + 4 unit tests + scenario AC-1 assert | +| AZ-435 | AC-2 500 m/30 s → 999+STATUSTEXT ≤ 500 ms | `failsafe.passes` + 6 unit tests + scenario AC-2 assert | +| AZ-435 | AC-ORDER cov-2d strictly precedes failsafe | `ordering.passes` + 3 unit tests + scenario AC-ORDER assert | +| AZ-435 | AC-3 parameterization | 6 collected variants | + +All ACs PASS. + +### Phase 3 — Code Quality + +- SRP: each evaluator owns one resilience concern (drift / restart / envelope / ladder). Scenarios are thin adapters. +- Error handling: explicit `ValueError` on bad helper input (negative latency, unknown sub-case); `pytest.fail()` on bad fixture shape; no bare `except`. +- Naming: clear (`SubCaseReport`, `RestartEvidence`, `IterationOutcome`, `EscalationLadderReport`); follows project conventions. +- Complexity: longest function is `escalation_ladder_evaluator.evaluate` (~60 lines), all sequential helper calls — acceptable. +- DRY: F1 + F4 + F5 noted above. +- Test quality: every helper public function has ≥3 unit tests covering happy path, boundary, failure; total 74 new unit tests (16 + 19 + 15 + 24). + +### Phase 4 — Security Quick-Scan + +- No SQL, no `subprocess(shell=True)`, no `eval/exec`, no hardcoded secrets, no insecure deserialization (JSON loads only with explicit shape validation). +- Pass. + +### Phase 5 — Performance Scan + +- All evaluators are O(n) single-pass sweeps over their inputs. +- No nested loops, no N+1 patterns, no blocking I/O in hot paths. +- Pass. + +### Phase 6 — Cross-Task Consistency + +- All 4 helpers share `geo.distance_m` (consistent with batch-85 + earlier pattern). +- All 4 scenarios use the same skip pattern (`sitl_replay_ready` → `pytest.skip`; missing fixture → `pytest.fail`). +- All 4 scenarios use `nfr_recorder.record_metric` with `ac_id` kwarg consistently. +- F5 documents the one intentional duplication (escalation-ladder thresholds vs blackout-spoof thresholds). + +### Phase 7 — Architecture Compliance + +- All new files under `e2e/**` per `module-layout.md` § blackbox_tests `Owns` glob. +- No imports from `src/gps_denied_onboard/**` (verified — only `runner.helpers.geo`, `runner.helpers.sitl_observer`, `pyproj`, stdlib). +- No new cyclic dependencies. New evaluators are leaves of the import DAG. +- No new infrastructure libraries. + +## Auto-Fix Gate + +0 Critical, 0 High, 0 Medium, 5 Low. All Low findings are either intentional, surfacing, or future hygiene. No auto-fix attempts triggered; no escalation. + +## Verdict + +**PASS_WITH_WARNINGS**. diff --git a/e2e/_unit_tests/helpers/test_companion_reboot_evaluator.py b/e2e/_unit_tests/helpers/test_companion_reboot_evaluator.py new file mode 100644 index 0000000..ff24e20 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_companion_reboot_evaluator.py @@ -0,0 +1,278 @@ +"""Unit tests for ``runner.helpers.companion_reboot_evaluator`` (AZ-433 / NFT-RES-02).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import companion_reboot_evaluator as cre + +REF_LAT = 49.9 +REF_LON = 36.3 + + +def _offset_lat(lat: float, north_m: float) -> float: + return lat + north_m / 111_320.0 + + +def _evidence( + *, + restart_command_ms: int = 60_000, + process_restarted_ms: int | None = 62_000, + first_emission_ms: int | None = 80_000, + estimate_offset_m: float | None = 0.0, + gt_offset_m: float | None = 0.0, +) -> cre.RestartEvidence: + estimate = ( + None + if estimate_offset_m is None + else cre.GeoFix( + monotonic_ms=first_emission_ms or 0, + lat_deg=_offset_lat(REF_LAT, estimate_offset_m), + lon_deg=REF_LON, + ) + ) + gt = ( + None + if gt_offset_m is None + else cre.GeoFix( + monotonic_ms=first_emission_ms or 0, + lat_deg=_offset_lat(REF_LAT, gt_offset_m), + lon_deg=REF_LON, + ) + ) + return cre.RestartEvidence( + restart_command_monotonic_ms=restart_command_ms, + process_restarted_monotonic_ms=process_restarted_ms, + first_post_restart_emission_monotonic_ms=first_emission_ms, + first_post_restart_estimate=estimate, + ground_truth_at_first_emission=gt, + ) + + +# ───────────────────────── happy path ───────────────────────── + + +def test_evaluate_clean_run_passes_all_acs() -> None: + # Arrange — 2 s restart latency, 20 s resume, 50 m drift. + evidence = _evidence( + process_restarted_ms=62_000, + first_emission_ms=80_000, + estimate_offset_m=50.0, + gt_offset_m=0.0, + ) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.restart_trigger_latency_s == pytest.approx(2.0) + assert report.resume_time_s == pytest.approx(20.0) + assert report.first_emission_accuracy_m == pytest.approx(50.0, abs=0.5) + assert report.passes_restart_trigger + assert report.passes_resume_time + assert report.passes_first_emission_accuracy + assert report.passes + + +# ───────────────────────── AC-1: restart trigger ───────────────────────── + + +def test_ac1_at_budget_passes() -> None: + # Arrange — restart latency = exactly 5 s. + evidence = _evidence(process_restarted_ms=60_000 + 5000) + + # Act + Assert + assert cre.evaluate(evidence).passes_restart_trigger + + +def test_ac1_above_budget_fails() -> None: + # Arrange + evidence = _evidence(process_restarted_ms=60_000 + 6000) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.restart_trigger_latency_s == pytest.approx(6.0) + assert not report.passes_restart_trigger + assert not report.passes + + +def test_ac1_process_never_restarts_fails() -> None: + # Arrange + evidence = _evidence(process_restarted_ms=None) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.restart_trigger_latency_s is None + assert not report.passes_restart_trigger + assert not report.passes + + +def test_ac1_negative_latency_raises() -> None: + # Arrange — clock skew bug. + evidence = _evidence(process_restarted_ms=60_000 - 1000) + + # Assert + with pytest.raises(ValueError): + cre.evaluate(evidence) + + +# ───────────────────────── AC-2: resume time ───────────────────────── + + +def test_ac2_at_budget_passes() -> None: + # Arrange — resume = exactly 30 s. + evidence = _evidence(first_emission_ms=60_000 + 30_000) + + # Act + Assert + assert cre.evaluate(evidence).passes_resume_time + + +def test_ac2_above_budget_fails() -> None: + # Arrange — resume = 31 s. + evidence = _evidence(first_emission_ms=60_000 + 31_000) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.resume_time_s == pytest.approx(31.0) + assert not report.passes_resume_time + + +def test_ac2_no_emission_fails() -> None: + # Arrange + evidence = _evidence(first_emission_ms=None, estimate_offset_m=None, gt_offset_m=None) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.resume_time_s is None + assert not report.passes_resume_time + assert not report.passes + + +def test_ac2_negative_resume_raises() -> None: + # Arrange — emission before restart command. + evidence = _evidence(first_emission_ms=59_000) + + # Assert + with pytest.raises(ValueError): + cre.evaluate(evidence) + + +# ───────────────────────── AC-3: first-emission accuracy ───────────────────────── + + +def test_ac3_within_budget_passes() -> None: + # Arrange — 80 m drift, budget 100 m. + evidence = _evidence(estimate_offset_m=80.0, gt_offset_m=0.0) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.first_emission_accuracy_m == pytest.approx(80.0, abs=0.5) + assert report.passes_first_emission_accuracy + + +def test_ac3_at_budget_passes() -> None: + # Arrange — exactly 100 m. + evidence = _evidence(estimate_offset_m=100.0, gt_offset_m=0.0) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.passes_first_emission_accuracy + + +def test_ac3_above_budget_fails() -> None: + # Arrange — 120 m drift. + evidence = _evidence(estimate_offset_m=120.0, gt_offset_m=0.0) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert not report.passes_first_emission_accuracy + assert not report.passes + + +def test_ac3_missing_estimate_fails() -> None: + # Arrange + evidence = _evidence(estimate_offset_m=None) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.first_emission_accuracy_m is None + assert not report.passes_first_emission_accuracy + + +def test_ac3_missing_gt_fails() -> None: + # Arrange + evidence = _evidence(gt_offset_m=None) + + # Act + report = cre.evaluate(evidence) + + # Assert + assert report.first_emission_accuracy_m is None + assert not report.passes_first_emission_accuracy + + +# ───────────────────────── custom budgets ───────────────────────── + + +def test_custom_budgets_apply() -> None: + # Arrange — 8 s restart latency (above default 5s) but custom 10s budget passes. + evidence = _evidence(process_restarted_ms=68_000) + + # Act + report = cre.evaluate(evidence, restart_trigger_budget_s=10.0) + + # Assert + assert report.restart_trigger_budget_s == 10.0 + assert report.passes_restart_trigger + + +# ───────────────────────── csv emit ───────────────────────── + + +def test_write_csv_evidence_emits_summary(tmp_path: Path) -> None: + # Arrange + evidence = _evidence(estimate_offset_m=50.0, gt_offset_m=0.0) + report = cre.evaluate(evidence) + out_path = tmp_path / "nft-res-02.csv" + + # Act + cre.write_csv_evidence(out_path, report) + + # Assert + rows = out_path.read_text().splitlines() + assert len(rows) == 2 + assert rows[0].startswith("restart_trigger_latency_s") + assert "ac1_passes" in rows[0] + assert "ac2_passes" in rows[0] + assert "ac3_passes" in rows[0] + + +def test_write_csv_evidence_creates_parent_dir(tmp_path: Path) -> None: + # Arrange + evidence = _evidence() + report = cre.evaluate(evidence) + out_path = tmp_path / "deep" / "nest" / "out.csv" + + # Act + cre.write_csv_evidence(out_path, report) + + # Assert + assert out_path.exists() diff --git a/e2e/_unit_tests/helpers/test_escalation_ladder_evaluator.py b/e2e/_unit_tests/helpers/test_escalation_ladder_evaluator.py new file mode 100644 index 0000000..4010c1e --- /dev/null +++ b/e2e/_unit_tests/helpers/test_escalation_ladder_evaluator.py @@ -0,0 +1,348 @@ +"""Unit tests for ``runner.helpers.escalation_ladder_evaluator`` (AZ-435 / NFT-RES-04).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import escalation_ladder_evaluator as ele + + +def _window(duration_s: float = 35.0, onset_ms: int = 60_000) -> ele.BlackoutWindow: + return ele.BlackoutWindow( + onset_monotonic_ms=onset_ms, + end_monotonic_ms=onset_ms + int(duration_s * 1000), + ) + + +def _est(t_ms: int, *, cov: float, horiz: float = 5.0, fix_type: int = 3) -> ele.EstimateSample: + return ele.EstimateSample( + monotonic_ms=t_ms, + cov_semi_major_m=cov, + horiz_accuracy=horiz, + fix_type=fix_type, + ) + + +def _st(t_ms: int, text: str) -> ele.StatustextSample: + return ele.StatustextSample(monotonic_ms=t_ms, text=text) + + +# ───────────────────────── BlackoutWindow ───────────────────────── + + +def test_window_is_35s_exact_passes() -> None: + assert _window(35.0).is_35s + + +@pytest.mark.parametrize("duration", [33.5, 36.5]) +def test_window_is_35s_within_tolerance(duration: float) -> None: + assert _window(duration).is_35s + + +@pytest.mark.parametrize("duration", [32.0, 38.0]) +def test_window_is_35s_outside_tolerance(duration: float) -> None: + assert not _window(duration).is_35s + + +# ───────────────────────── AC-1 fix degrade ───────────────────────── + + +def test_ac1_fix_degrades_within_budget_passes() -> None: + # Arrange — cov crosses 100 m at t=70s, fix drops at t=70.3s (300ms latency). + w = _window() + estimates = [ + _est(w.onset_monotonic_ms + 5000, cov=50.0, fix_type=3), + _est(w.onset_monotonic_ms + 10_000, cov=100.0, fix_type=3), # crossing + _est(w.onset_monotonic_ms + 10_300, cov=120.0, fix_type=2), # degrade + ] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=[]) + + # Assert + assert r.fix_degrade.cov2d_crossed_at_ms == w.onset_monotonic_ms + 10_000 + assert r.fix_degrade.fix_degraded_at_ms == w.onset_monotonic_ms + 10_300 + assert r.fix_degrade.latency_ms == 300 + assert r.fix_degrade.passes + + +def test_ac1_fix_degrade_above_budget_fails() -> None: + # Arrange — fix drops 700ms after crossing (>500ms budget). + w = _window() + estimates = [ + _est(w.onset_monotonic_ms + 10_000, cov=100.0, fix_type=3), + _est(w.onset_monotonic_ms + 10_700, cov=120.0, fix_type=2), + ] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=[]) + + # Assert + assert r.fix_degrade.latency_ms == 700 + assert not r.fix_degrade.passes + + +def test_ac1_no_crossing_fails() -> None: + # Arrange — cov never reaches 100. + w = _window() + estimates = [_est(w.onset_monotonic_ms + 5000, cov=50.0)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=[]) + + # Assert + assert r.fix_degrade.cov2d_crossed_at_ms is None + assert not r.fix_degrade.passes + + +def test_ac1_crossing_but_no_fix_degrade_fails() -> None: + # Arrange + w = _window() + estimates = [ + _est(w.onset_monotonic_ms + 10_000, cov=100.0, fix_type=3), + _est(w.onset_monotonic_ms + 11_000, cov=120.0, fix_type=3), # stays at 3 + ] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=[]) + + # Assert + assert r.fix_degrade.cov2d_crossed_at_ms is not None + assert r.fix_degrade.fix_degraded_at_ms is None + assert not r.fix_degrade.passes + + +# ───────────────────────── AC-2 failsafe ───────────────────────── + + +def test_ac2_cov500_trigger_passes_within_budget() -> None: + # Arrange — cov crosses 500 at t=25s; horiz=999 and STATUSTEXT both within 400ms. + w = _window() + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 25_000, cov=500.0, horiz=200.0, fix_type=2), # trigger + _est(onset + 25_400, cov=520.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + statustexts = [_st(onset + 25_400, ele.STATUSTEXT_FAILSAFE)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=statustexts) + + # Assert + assert r.failsafe.failsafe_trigger_at_ms == onset + 25_000 + assert r.failsafe.horiz_999_latency_ms == 400 + assert r.failsafe.statustext_latency_ms == 400 + assert r.failsafe.passes + + +def test_ac2_duration_trigger_used_when_cov500_never_crossed() -> None: + # Arrange — cov stays below 500; window crosses 30s at +30s. + w = _window(35.0) + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 10_000, cov=120.0, fix_type=2), + _est(onset + 30_000, cov=200.0, fix_type=2), # duration trigger fires + _est(onset + 30_400, cov=200.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + statustexts = [_st(onset + 30_300, ele.STATUSTEXT_FAILSAFE)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=statustexts) + + # Assert — trigger at onset + 30s. + assert r.failsafe.failsafe_trigger_at_ms == onset + 30_000 + assert r.failsafe.horiz_999_latency_ms == 400 + assert r.failsafe.statustext_latency_ms == 300 + assert r.failsafe.passes + + +def test_ac2_uses_earliest_of_cov500_and_duration_trigger() -> None: + # Arrange — cov crosses 500 at +20s, duration trigger at +30s. Earliest wins. + w = _window(35.0) + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 20_000, cov=500.0, fix_type=2), # cov trigger + _est(onset + 20_300, cov=520.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + statustexts = [_st(onset + 20_400, ele.STATUSTEXT_FAILSAFE)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=statustexts) + + # Assert — picks the cov trigger (earlier than duration trigger). + assert r.failsafe.failsafe_trigger_at_ms == onset + 20_000 + + +def test_ac2_horiz_999_late_fails() -> None: + # Arrange — 999 arrives 700ms after trigger. + w = _window() + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 25_000, cov=500.0, fix_type=2), + _est(onset + 25_700, cov=520.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + statustexts = [_st(onset + 25_300, ele.STATUSTEXT_FAILSAFE)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=statustexts) + + # Assert + assert r.failsafe.horiz_999_latency_ms == 700 + assert not r.failsafe.horiz_999_passes + assert not r.failsafe.passes + + +def test_ac2_missing_statustext_fails() -> None: + # Arrange + w = _window() + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 25_000, cov=500.0, fix_type=2), + _est(onset + 25_300, cov=520.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=[]) + + # Assert + assert r.failsafe.statustext_at_ms is None + assert not r.failsafe.statustext_passes + assert not r.failsafe.passes + + +def test_ac2_no_trigger_fails() -> None: + # Arrange — cov never crosses 500 and window < 30s. + w = _window(20.0) + estimates = [_est(w.onset_monotonic_ms + 5000, cov=120.0, fix_type=2)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=[]) + + # Assert + assert r.failsafe.failsafe_trigger_at_ms is None + assert not r.failsafe.passes + + +# ───────────────────────── AC-ORDER ───────────────────────── + + +def test_ac_order_cov2d_before_failsafe_passes() -> None: + # Arrange — cov-2d at t=+10s; failsafe trigger at t=+25s. + w = _window() + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 10_000, cov=100.0, fix_type=3), + _est(onset + 10_300, cov=120.0, fix_type=2), + _est(onset + 25_000, cov=500.0, fix_type=2), + _est(onset + 25_300, cov=520.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + statustexts = [_st(onset + 25_400, ele.STATUSTEXT_FAILSAFE)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=statustexts) + + # Assert + assert r.ordering.passes + assert r.passes + + +def test_ac_order_simultaneous_crossings_fails_ordering() -> None: + # Arrange — single sample crosses BOTH 100m and 500m at the same instant. + # By contract this is a strict-monotonicity violation: the SUT cannot + # publish a covariance > 500 m before the AC-1 escalation fired on the + # 100 m crossing. + w = _window() + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 25_000, cov=500.0, fix_type=2), # both crossings + _est(onset + 25_300, cov=520.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + statustexts = [_st(onset + 25_300, ele.STATUSTEXT_FAILSAFE)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=statustexts) + + # Assert — cov2d_at == failsafe_trigger_at; ordering requires strict <. + assert r.ordering.cov2d_at_ms == r.ordering.failsafe_trigger_at_ms + assert not r.ordering.passes + assert not r.passes + + +def test_ac_order_missing_pole_fails_ordering() -> None: + # Arrange — only cov-2d crosses; no failsafe trigger. + w = _window(20.0) + estimates = [ + _est(w.onset_monotonic_ms + 5000, cov=100.0, fix_type=3), + _est(w.onset_monotonic_ms + 5300, cov=120.0, fix_type=2), + ] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=[]) + + # Assert + assert not r.ordering.passes + + +# ───────────────────────── aggregate ───────────────────────── + + +def test_aggregate_passes_when_all_ac_pass() -> None: + # Arrange — full clean escalation. + w = _window() + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 10_000, cov=100.0, fix_type=3), + _est(onset + 10_200, cov=120.0, fix_type=2), + _est(onset + 25_000, cov=500.0, fix_type=2), + _est(onset + 25_200, cov=520.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + statustexts = [_st(onset + 25_300, ele.STATUSTEXT_FAILSAFE)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=statustexts) + + # Assert + assert r.passes + + +def test_aggregate_fails_window_out_of_spec() -> None: + # Arrange — 25 s window (under tolerance). + w = _window(25.0) + onset = w.onset_monotonic_ms + estimates = [ + _est(onset + 5_000, cov=100.0, fix_type=3), + _est(onset + 5_200, cov=120.0, fix_type=2), + _est(onset + 15_000, cov=500.0, fix_type=2), + _est(onset + 15_200, cov=520.0, horiz=ele.HORIZ_ACCURACY_FAILSAFE, fix_type=2), + ] + statustexts = [_st(onset + 15_300, ele.STATUSTEXT_FAILSAFE)] + + # Act + r = ele.evaluate(w, estimates=estimates, statustexts=statustexts) + + # Assert + assert not r.passes_window + assert not r.passes + + +# ───────────────────────── csv emit ───────────────────────── + + +def test_write_csv_evidence_one_row(tmp_path: Path) -> None: + # Arrange + w = _window() + r = ele.evaluate(w, estimates=[], statustexts=[]) + out_path = tmp_path / "nft-res-04.csv" + + # Act + ele.write_csv_evidence(out_path, r) + + # Assert + rows = out_path.read_text().splitlines() + assert len(rows) == 2 + assert rows[0].startswith("window_duration_s") + assert "ac1_passes" in rows[0] + assert "ac2_passes" in rows[0] + assert "ac_order_passes" in rows[0] diff --git a/e2e/_unit_tests/helpers/test_imu_fallback_drift_evaluator.py b/e2e/_unit_tests/helpers/test_imu_fallback_drift_evaluator.py new file mode 100644 index 0000000..3c9eb4d --- /dev/null +++ b/e2e/_unit_tests/helpers/test_imu_fallback_drift_evaluator.py @@ -0,0 +1,318 @@ +"""Unit tests for ``runner.helpers.imu_fallback_drift_evaluator`` (AZ-432 / NFT-RES-01).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import imu_fallback_drift_evaluator as ife +from runner.helpers.geo import distance_m + + +# Reference point in Derkachi (used so distances are realistic): 49.9°N, 36.3°E. +REF_LAT = 49.9 +REF_LON = 36.3 + + +def _offset_lat(lat: float, north_m: float) -> float: + """Approximate latitude offset by ``north_m`` meters (good enough for tests).""" + # ~111_320 m per degree of latitude at WGS84. + return lat + north_m / 111_320.0 + + +def _window(duration_s: float, onset_ms: int = 60_000) -> ife.BlackoutWindow: + return ife.BlackoutWindow( + onset_monotonic_ms=onset_ms, + end_monotonic_ms=onset_ms + int(duration_s * 1000), + ) + + +def _samples(*pairs: tuple[int, float]) -> tuple[ife.PositionSample, ...]: + """Build samples at REF_LON; second component is northward offset in meters.""" + return tuple( + ife.PositionSample( + monotonic_ms=t_ms, + lat_deg=_offset_lat(REF_LAT, north_m), + lon_deg=REF_LON, + ) + for t_ms, north_m in pairs + ) + + +# ───────────────────────── BlackoutWindow ───────────────────────── + + +def test_window_in_spec_exact_30s_passes() -> None: + # Act + w = _window(30.0) + # Assert + assert w.duration_s == pytest.approx(30.0) + assert w.window_in_spec + + +@pytest.mark.parametrize("duration", [28.5, 31.5]) +def test_window_in_spec_within_tolerance(duration: float) -> None: + # Assert + assert _window(duration).window_in_spec + + +@pytest.mark.parametrize("duration", [27.0, 33.0]) +def test_window_in_spec_outside_tolerance(duration: float) -> None: + # Assert + assert not _window(duration).window_in_spec + + +# ───────────────────────── evaluate_subcase ───────────────────────── + + +def test_evaluate_subcase_unknown_subcase_raises() -> None: + # Assert + with pytest.raises(ValueError): + ife.evaluate_subcase( + _window(30.0), + estimates=_samples((90_000, 60.0)), + ground_truth=_samples((90_000, 0.0)), + subcase="some_typo", + ) + + +def test_evaluate_subcase_no_imu_within_budget_passes() -> None: + # Arrange — 80 m drift at blackout end, budget is 100 m. + w = _window(30.0) + estimates = _samples((w.end_monotonic_ms, 80.0)) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate_subcase(w, estimates, gt, subcase=ife.SUBCASE_NO_IMU) + + # Assert + assert report.subcase == ife.SUBCASE_NO_IMU + assert report.budget_m == ife.NO_IMU_BUDGET_M + assert report.drift_m is not None + assert report.drift_m == pytest.approx(80.0, abs=0.5) + assert report.passes + + +def test_evaluate_subcase_no_imu_above_budget_fails() -> None: + # Arrange — 120 m drift, budget 100 m. + w = _window(30.0) + estimates = _samples((w.end_monotonic_ms, 120.0)) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate_subcase(w, estimates, gt, subcase=ife.SUBCASE_NO_IMU) + + # Assert + assert report.drift_m is not None + assert report.drift_m == pytest.approx(120.0, abs=0.5) + assert not report.passes + + +def test_evaluate_subcase_good_imu_within_budget_passes() -> None: + # Arrange — 40 m drift, budget 50 m. + w = _window(30.0) + estimates = _samples((w.end_monotonic_ms, 40.0)) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate_subcase(w, estimates, gt, subcase=ife.SUBCASE_GOOD_IMU) + + # Assert + assert report.budget_m == ife.GOOD_IMU_BUDGET_M + assert report.passes + + +def test_evaluate_subcase_good_imu_above_budget_fails() -> None: + # Arrange — 70 m drift, budget 50 m. + w = _window(30.0) + estimates = _samples((w.end_monotonic_ms, 70.0)) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate_subcase(w, estimates, gt, subcase=ife.SUBCASE_GOOD_IMU) + + # Assert + assert not report.passes + + +def test_evaluate_subcase_no_estimate_at_end_returns_none_drift() -> None: + # Arrange — only estimate sample is BEFORE blackout onset. + w = _window(30.0) + estimates = _samples((w.onset_monotonic_ms - 5_000, 0.0)) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate_subcase(w, estimates, gt, subcase=ife.SUBCASE_NO_IMU) + + # Assert — sample at onset−5s is valid because it's ≤ end_ms, so drift is 0 + # (estimate hasn't moved). The "no estimate" case is when nothing is ≤ end_ms. + assert report.estimate_at_end is not None # onset-5s ≤ end_ms + + estimates_empty: tuple[ife.PositionSample, ...] = () + report2 = ife.evaluate_subcase(w, estimates_empty, gt, subcase=ife.SUBCASE_NO_IMU) + assert report2.drift_m is None + assert not report2.passes + + +def test_evaluate_subcase_picks_latest_sample_at_or_before_end() -> None: + # Arrange — three estimate samples, only the second is at the boundary. + w = _window(30.0) + estimates = _samples( + (w.end_monotonic_ms - 1000, 30.0), + (w.end_monotonic_ms, 50.0), + (w.end_monotonic_ms + 1000, 200.0), # AFTER the window — must be ignored + ) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate_subcase(w, estimates, gt, subcase=ife.SUBCASE_GOOD_IMU) + + # Assert + assert report.estimate_at_end is not None + assert report.estimate_at_end.monotonic_ms == w.end_monotonic_ms + assert report.drift_m == pytest.approx(50.0, abs=0.5) + + +def test_evaluate_subcase_explicit_budget_overrides_default() -> None: + # Arrange — 60 m drift, default no_imu budget=100 (would pass); override to 40. + w = _window(30.0) + estimates = _samples((w.end_monotonic_ms, 60.0)) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate_subcase( + w, estimates, gt, subcase=ife.SUBCASE_NO_IMU, budget_m=40.0 + ) + + # Assert + assert report.budget_m == 40.0 + assert not report.passes + + +# ───────────────────────── evaluate (aggregate) ───────────────────────── + + +def test_evaluate_both_subcases_pass() -> None: + # Arrange + w = _window(30.0) + no_imu_est = _samples((w.end_monotonic_ms, 80.0)) + good_imu_est = _samples((w.end_monotonic_ms, 40.0)) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate( + w, + sub_cases=[ + (ife.SUBCASE_NO_IMU, no_imu_est, gt), + (ife.SUBCASE_GOOD_IMU, good_imu_est, gt), + ], + ) + + # Assert + assert len(report.sub_cases) == 2 + assert report.passes_window + assert report.passes + assert report.by_subcase(ife.SUBCASE_NO_IMU).passes + assert report.by_subcase(ife.SUBCASE_GOOD_IMU).passes + + +def test_evaluate_one_subcase_fails_fails_aggregate() -> None: + # Arrange + w = _window(30.0) + no_imu_est = _samples((w.end_monotonic_ms, 80.0)) # passes (≤100) + good_imu_est = _samples((w.end_monotonic_ms, 70.0)) # fails (>50) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate( + w, + sub_cases=[ + (ife.SUBCASE_NO_IMU, no_imu_est, gt), + (ife.SUBCASE_GOOD_IMU, good_imu_est, gt), + ], + ) + + # Assert + assert not report.passes + + +def test_evaluate_window_out_of_spec_fails_even_if_drift_passes() -> None: + # Arrange — only 25 s window (under tolerance). + w = _window(25.0) + estimates = _samples((w.end_monotonic_ms, 10.0)) + gt = _samples((w.end_monotonic_ms, 0.0)) + + # Act + report = ife.evaluate( + w, sub_cases=[(ife.SUBCASE_NO_IMU, estimates, gt)] + ) + + # Assert + assert not report.passes_window + assert not report.passes + + +def test_evaluate_by_subcase_unknown_raises() -> None: + # Arrange + w = _window(30.0) + report = ife.evaluate(w, sub_cases=[]) + + # Assert + with pytest.raises(KeyError): + report.by_subcase(ife.SUBCASE_NO_IMU) + + +# ───────────────────────── distance sanity ───────────────────────── + + +def test_helper_offset_lat_aligns_with_vincenty_within_meter() -> None: + """Cross-check our convenience helper against the real Vincenty distance.""" + # Arrange + target = _offset_lat(REF_LAT, 100.0) + + # Act + d = distance_m(REF_LAT, REF_LON, target, REF_LON) + + # Assert — flat-earth approximation is within ~1 m for 100 m offsets. + assert abs(d - 100.0) < 1.0 + + +# ───────────────────────── csv emit ───────────────────────── + + +def test_write_csv_evidence_one_row_per_subcase(tmp_path: Path) -> None: + # Arrange + w = _window(30.0) + report = ife.evaluate( + w, + sub_cases=[ + (ife.SUBCASE_NO_IMU, _samples((w.end_monotonic_ms, 80.0)), _samples((w.end_monotonic_ms, 0.0))), + (ife.SUBCASE_GOOD_IMU, _samples((w.end_monotonic_ms, 40.0)), _samples((w.end_monotonic_ms, 0.0))), + ], + ) + out_path = tmp_path / "nft-res-01.csv" + + # Act + ife.write_csv_evidence(out_path, report) + + # Assert + rows = out_path.read_text().splitlines() + assert rows[0].startswith("subcase,") + assert "drift_m" in rows[0] + assert len(rows) == 3 # header + 2 sub-cases + assert ife.SUBCASE_NO_IMU in rows[1] or ife.SUBCASE_NO_IMU in rows[2] + assert ife.SUBCASE_GOOD_IMU in rows[1] or ife.SUBCASE_GOOD_IMU in rows[2] + + +def test_write_csv_evidence_creates_parent_dir(tmp_path: Path) -> None: + # Arrange + w = _window(30.0) + report = ife.evaluate(w, sub_cases=[]) + out_path = tmp_path / "nested" / "deep" / "nft-res-01.csv" + + # Act + ife.write_csv_evidence(out_path, report) + + # Assert + assert out_path.exists() diff --git a/e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py b/e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py new file mode 100644 index 0000000..10c42bf --- /dev/null +++ b/e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py @@ -0,0 +1,227 @@ +"""Unit tests for ``runner.helpers.monte_carlo_envelope_evaluator`` (AZ-434 / NFT-RES-03).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import monte_carlo_envelope_evaluator as mce + + +def _iter(iter_id: str, seed: int, samples: list[tuple[float, float]]) -> mce.IterationOutcome: + return mce.IterationOutcome( + iteration_id=iter_id, + iteration_seed=seed, + samples=tuple(mce.FrameSample(error_m=e, cov_semi_major_m=c) for e, c in samples), + ) + + +def _uniform_iter(iter_id: str, seed: int, *, n: int, error_m: float, cov_m: float) -> mce.IterationOutcome: + return _iter(iter_id, seed, [(error_m, cov_m) for _ in range(n)]) + + +# ───────────────────────── AC-1 iteration count ───────────────────────── + + +def test_ac1_at_min_iterations_passes() -> None: + # Arrange — 100 iterations × 1 sample each, all covered. + iterations = [_uniform_iter(f"it{i:03d}", i, n=1, error_m=0.5, cov_m=1.0) for i in range(100)] + + # Act + report = mce.evaluate(iterations, master_seed=42) + + # Assert + assert report.iteration_count == 100 + assert report.passes_iteration_count + assert report.passes + + +def test_ac1_below_min_iterations_fails() -> None: + # Arrange — 99 iterations. + iterations = [_uniform_iter(f"it{i:03d}", i, n=1, error_m=0.5, cov_m=1.0) for i in range(99)] + + # Act + report = mce.evaluate(iterations, master_seed=42) + + # Assert + assert report.iteration_count == 99 + assert not report.passes_iteration_count + assert not report.passes + + +def test_ac1_custom_min_iteration_count() -> None: + # Arrange — 5 iterations with custom min=5. + iterations = [_uniform_iter(f"it{i}", i, n=10, error_m=0.5, cov_m=1.0) for i in range(5)] + + # Act + report = mce.evaluate(iterations, master_seed=1, min_iteration_count=5) + + # Assert + assert report.passes_iteration_count + + +# ───────────────────────── AC-3 envelope ratio ───────────────────────── + + +def test_ac3_all_covered_passes() -> None: + # Arrange — error = 0.5 * cov, multiplier 1.96 → trivially covered. + iterations = [_uniform_iter(f"it{i}", i, n=10, error_m=0.5, cov_m=1.0) for i in range(100)] + + # Act + report = mce.evaluate(iterations, master_seed=0) + + # Assert + assert report.total_samples == 1000 + assert report.covered_samples == 1000 + assert report.envelope_ratio == pytest.approx(1.0) + assert report.passes_envelope + + +def test_ac3_at_budget_passes() -> None: + # Arrange — 95 covered + 5 not covered = ratio 0.95 exactly. + covered = [_uniform_iter(f"c{i}", i, n=1, error_m=0.5, cov_m=1.0) for i in range(95)] + uncovered = [_uniform_iter(f"u{i}", 1000 + i, n=1, error_m=3.0, cov_m=1.0) for i in range(5)] + + # Act + report = mce.evaluate(covered + uncovered, master_seed=0) + + # Assert — 3.0 > 1.96, so uncovered. 95/100 = 0.95 exactly. + assert report.envelope_ratio == pytest.approx(0.95) + assert report.passes_envelope + + +def test_ac3_below_budget_fails() -> None: + # Arrange — 90 covered + 10 not covered = ratio 0.90. + covered = [_uniform_iter(f"c{i}", i, n=1, error_m=0.5, cov_m=1.0) for i in range(90)] + uncovered = [_uniform_iter(f"u{i}", 1000 + i, n=1, error_m=3.0, cov_m=1.0) for i in range(10)] + + # Act + report = mce.evaluate(covered + uncovered, master_seed=0) + + # Assert + assert report.envelope_ratio == pytest.approx(0.90) + assert not report.passes_envelope + + +def test_ac3_edge_error_exactly_at_envelope_counts_as_covered() -> None: + # Arrange — error = 1.96 * cov should still count. + iterations = [_uniform_iter(f"it{i}", i, n=1, error_m=1.96, cov_m=1.0) for i in range(100)] + + # Act + report = mce.evaluate(iterations, master_seed=0) + + # Assert + assert report.envelope_ratio == pytest.approx(1.0) + + +def test_ac3_empty_iterations_returns_none_ratio() -> None: + # Act + report = mce.evaluate([], master_seed=0) + + # Assert + assert report.total_samples == 0 + assert report.envelope_ratio is None + assert not report.passes_envelope + assert not report.passes + + +def test_ac3_custom_envelope_multiplier() -> None: + # Arrange — error = 1.0, cov = 1.0; default multiplier 1.96 → covered; + # custom multiplier 0.5 → uncovered. + iterations = [_uniform_iter(f"it{i}", i, n=1, error_m=1.0, cov_m=1.0) for i in range(100)] + + # Act + report = mce.evaluate(iterations, master_seed=0, envelope_multiplier=0.5) + + # Assert — 1.0 > 0.5 * 1.0 = 0.5 → uncovered. + assert report.envelope_ratio == pytest.approx(0.0) + assert not report.passes_envelope + + +# ───────────────────────── AC-2 determinism ───────────────────────── + + +def test_iteration_hash_same_for_same_inputs() -> None: + # Arrange + it1 = _iter("it1", 7, [(0.5, 1.0), (0.6, 1.0), (0.4, 0.9)]) + it2 = _iter("it1", 7, [(0.5, 1.0), (0.6, 1.0), (0.4, 0.9)]) + + # Assert + assert mce.iteration_hash(it1) == mce.iteration_hash(it2) + + +def test_iteration_hash_differs_when_samples_differ() -> None: + # Arrange + it1 = _iter("it1", 7, [(0.5, 1.0)]) + it2 = _iter("it1", 7, [(0.6, 1.0)]) + + # Assert + assert mce.iteration_hash(it1) != mce.iteration_hash(it2) + + +def test_iteration_hash_differs_when_seed_differs() -> None: + # Arrange + it1 = _iter("it1", 7, [(0.5, 1.0)]) + it2 = _iter("it1", 8, [(0.5, 1.0)]) + + # Assert + assert mce.iteration_hash(it1) != mce.iteration_hash(it2) + + +def test_determinism_fingerprint_stable_across_two_evaluate_calls() -> None: + # Arrange + iterations = [_uniform_iter(f"it{i:03d}", i, n=3, error_m=0.5, cov_m=1.0) for i in range(10)] + r1 = mce.evaluate(iterations, master_seed=42) + r2 = mce.evaluate(iterations, master_seed=42) + + # Assert + assert mce.determinism_fingerprint(r1) == mce.determinism_fingerprint(r2) + + +def test_determinism_fingerprint_differs_when_master_seed_changes() -> None: + # Arrange + iterations = [_uniform_iter(f"it{i:03d}", i, n=3, error_m=0.5, cov_m=1.0) for i in range(10)] + r1 = mce.evaluate(iterations, master_seed=42) + r2 = mce.evaluate(iterations, master_seed=43) + + # Assert + assert mce.determinism_fingerprint(r1) != mce.determinism_fingerprint(r2) + + +# ───────────────────────── csv emit ───────────────────────── + + +def test_write_csv_evidence_emits_summary(tmp_path: Path) -> None: + # Arrange + iterations = [_uniform_iter(f"it{i:03d}", i, n=10, error_m=0.5, cov_m=1.0) for i in range(100)] + report = mce.evaluate(iterations, master_seed=42) + out_path = tmp_path / "nft-res-03.csv" + + # Act + mce.write_csv_evidence(out_path, report) + + # Assert + rows = out_path.read_text().splitlines() + assert len(rows) == 2 + assert rows[0].startswith("master_seed") + assert "ac1_iteration_count_passes" in rows[0] + assert "ac3_envelope_passes" in rows[0] + assert "fingerprint_sha256" in rows[0] + + +def test_write_per_iteration_csv_row_per_iter(tmp_path: Path) -> None: + # Arrange + iterations = [_uniform_iter(f"it{i}", i, n=2, error_m=0.5, cov_m=1.0) for i in range(5)] + report = mce.evaluate(iterations, master_seed=1, min_iteration_count=5) + out_path = tmp_path / "per-iter.csv" + + # Act + mce.write_per_iteration_csv(out_path, report) + + # Assert + rows = out_path.read_text().splitlines() + assert rows[0] == ( + "iteration_id,iteration_seed,frame_count,covered_count,envelope_ratio,iteration_hash_sha256" + ) + assert len(rows) == 6 # header + 5 iterations diff --git a/e2e/_unit_tests/test_directory_layout.py b/e2e/_unit_tests/test_directory_layout.py index 6109aa3..ecfac42 100644 --- a/e2e/_unit_tests/test_directory_layout.py +++ b/e2e/_unit_tests/test_directory_layout.py @@ -67,6 +67,10 @@ E2E_ROOT = Path(__file__).resolve().parents[1] "runner/helpers/spoof_promotion_evaluator.py", "runner/helpers/ttff_evaluator.py", "runner/helpers/e2e_latency_evaluator.py", + "runner/helpers/imu_fallback_drift_evaluator.py", + "runner/helpers/companion_reboot_evaluator.py", + "runner/helpers/monte_carlo_envelope_evaluator.py", + "runner/helpers/escalation_ladder_evaluator.py", "fixtures/sitl_replay_builder/__init__.py", "fixtures/sitl_replay_builder/builder.py", "fixtures/sitl_replay_builder/build_p01_fixtures.py", @@ -133,6 +137,10 @@ E2E_ROOT = Path(__file__).resolve().parents[1] "tests/performance/test_nft_perf_02_streaming.py", "tests/performance/test_nft_perf_03_ttff.py", "tests/performance/test_nft_perf_04_spoof_promotion.py", + "tests/resilience/test_nft_res_01_imu_only_fallback.py", + "tests/resilience/test_nft_res_02_companion_reboot.py", + "tests/resilience/test_nft_res_03_monte_carlo.py", + "tests/resilience/test_nft_res_04_blackout_escalation.py", ], ) def test_required_path_exists(relative_path: str) -> None: diff --git a/e2e/runner/helpers/companion_reboot_evaluator.py b/e2e/runner/helpers/companion_reboot_evaluator.py new file mode 100644 index 0000000..bf7a692 --- /dev/null +++ b/e2e/runner/helpers/companion_reboot_evaluator.py @@ -0,0 +1,201 @@ +"""Companion-process reboot recovery evaluator for NFT-RES-02 (AZ-433 / AC-5.2 + AC-5.3). + +Mid-flight, the runner issues a restart command (``docker compose +restart gps-denied-onboard`` on Tier-1, ``systemctl restart +gps-denied-onboard`` on Tier-2). The SUT must: + +* AC-1 — actually restart within ≤``RESTART_TRIGGER_BUDGET_S`` (5 s). +* AC-2 — emit its first post-restart outbound estimate within + ≤``RESUME_BUDGET_S`` (30 s) of the restart command. +* AC-3 — that first post-restart estimate must be within + ≤``ACCURACY_BUDGET_M`` (100 m) of ground truth at that timestamp. + +This module owns the pure-logic side of those budgets + CSV evidence. +The scenario test owns the orchestration (issue restart, capture +timestamps, query GT). + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path + +from .geo import distance_m + +RESTART_TRIGGER_BUDGET_S = 5.0 +RESUME_BUDGET_S = 30.0 +ACCURACY_BUDGET_M = 100.0 + + +@dataclass(frozen=True) +class GeoFix: + """A WGS84 fix at a monotonic-ms timestamp.""" + + monotonic_ms: int + lat_deg: float + lon_deg: float + + +@dataclass(frozen=True) +class RestartEvidence: + """Captured timestamps + first post-restart fix vs GT. + + All ``*_monotonic_ms`` fields share the runner's monotonic clock so + deltas are well-defined. ``first_post_restart_estimate`` and + ``ground_truth_at_first_emission`` are both captured at + ``first_post_restart_emission_monotonic_ms``. + + ``process_restarted_monotonic_ms`` is the wall-clock-ish moment the + SUT process is observed to have come back up (e.g., first PID write, + health probe transition). Used for AC-1 only. + + ``first_post_restart_emission_monotonic_ms`` is the moment the runner + captures the first outbound estimate AFTER the restart command. Used + for AC-2 + AC-3. May be ``None`` if no emission arrives in the budget + window — counted as AC-2 + AC-3 failures. + """ + + restart_command_monotonic_ms: int + process_restarted_monotonic_ms: int | None + first_post_restart_emission_monotonic_ms: int | None + first_post_restart_estimate: GeoFix | None + ground_truth_at_first_emission: GeoFix | None + + +@dataclass(frozen=True) +class CompanionRebootReport: + """NFT-RES-02 aggregate verdict for one run.""" + + restart_trigger_latency_s: float | None + resume_time_s: float | None + first_emission_accuracy_m: float | None + restart_trigger_budget_s: float + resume_budget_s: float + accuracy_budget_m: float + + @property + def passes_restart_trigger(self) -> bool: + return ( + self.restart_trigger_latency_s is not None + and self.restart_trigger_latency_s <= self.restart_trigger_budget_s + ) + + @property + def passes_resume_time(self) -> bool: + return ( + self.resume_time_s is not None + and self.resume_time_s <= self.resume_budget_s + ) + + @property + def passes_first_emission_accuracy(self) -> bool: + return ( + self.first_emission_accuracy_m is not None + and self.first_emission_accuracy_m <= self.accuracy_budget_m + ) + + @property + def passes(self) -> bool: + return ( + self.passes_restart_trigger + and self.passes_resume_time + and self.passes_first_emission_accuracy + ) + + +def evaluate( + evidence: RestartEvidence, + *, + restart_trigger_budget_s: float = RESTART_TRIGGER_BUDGET_S, + resume_budget_s: float = RESUME_BUDGET_S, + accuracy_budget_m: float = ACCURACY_BUDGET_M, +) -> CompanionRebootReport: + """Compute the AC-1 + AC-2 + AC-3 verdict from captured restart evidence.""" + trigger_latency: float | None = None + if evidence.process_restarted_monotonic_ms is not None: + delta_ms = ( + evidence.process_restarted_monotonic_ms + - evidence.restart_command_monotonic_ms + ) + if delta_ms < 0: + raise ValueError( + "process_restarted precedes restart_command — clock-skew bug? " + f"command={evidence.restart_command_monotonic_ms} " + f"restarted={evidence.process_restarted_monotonic_ms}" + ) + trigger_latency = delta_ms / 1000.0 + + resume_time: float | None = None + if evidence.first_post_restart_emission_monotonic_ms is not None: + delta_ms = ( + evidence.first_post_restart_emission_monotonic_ms + - evidence.restart_command_monotonic_ms + ) + if delta_ms < 0: + raise ValueError( + "first_post_restart_emission precedes restart_command — " + "ordering bug; an emission BEFORE the restart command " + "cannot be the 'first post-restart' emission" + ) + resume_time = delta_ms / 1000.0 + + accuracy_m: float | None = None + if ( + evidence.first_post_restart_estimate is not None + and evidence.ground_truth_at_first_emission is not None + ): + accuracy_m = distance_m( + evidence.first_post_restart_estimate.lat_deg, + evidence.first_post_restart_estimate.lon_deg, + evidence.ground_truth_at_first_emission.lat_deg, + evidence.ground_truth_at_first_emission.lon_deg, + ) + + return CompanionRebootReport( + restart_trigger_latency_s=trigger_latency, + resume_time_s=resume_time, + first_emission_accuracy_m=accuracy_m, + restart_trigger_budget_s=restart_trigger_budget_s, + resume_budget_s=resume_budget_s, + accuracy_budget_m=accuracy_budget_m, + ) + + +def write_csv_evidence(out_path: Path, report: CompanionRebootReport) -> Path: + """Aggregate-summary CSV (one row per run).""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "restart_trigger_latency_s", + "restart_trigger_budget_s", + "ac1_passes", + "resume_time_s", + "resume_budget_s", + "ac2_passes", + "first_emission_accuracy_m", + "accuracy_budget_m", + "ac3_passes", + "passes", + ] + ) + writer.writerow( + [ + "" if report.restart_trigger_latency_s is None else f"{report.restart_trigger_latency_s:.3f}", + f"{report.restart_trigger_budget_s:.3f}", + "true" if report.passes_restart_trigger else "false", + "" if report.resume_time_s is None else f"{report.resume_time_s:.3f}", + f"{report.resume_budget_s:.3f}", + "true" if report.passes_resume_time else "false", + "" if report.first_emission_accuracy_m is None else f"{report.first_emission_accuracy_m:.3f}", + f"{report.accuracy_budget_m:.3f}", + "true" if report.passes_first_emission_accuracy else "false", + "true" if report.passes else "false", + ] + ) + return out_path diff --git a/e2e/runner/helpers/escalation_ladder_evaluator.py b/e2e/runner/helpers/escalation_ladder_evaluator.py new file mode 100644 index 0000000..a39fa58 --- /dev/null +++ b/e2e/runner/helpers/escalation_ladder_evaluator.py @@ -0,0 +1,324 @@ +"""Escalation-ladder evaluator for NFT-RES-04 (AZ-435 / AC-NEW-8 escalation order). + +FT-N-04 already proves the 35 s blackout-with-spoof window's per-AC +thresholds (see ``blackout_spoof_evaluator``). NFT-RES-04 is the +*resilience-tier* scenario: it asserts the **full ladder fires in +observable order** within tight latency budgets: + +* AC-1 — when the SUT's reported 95 % covariance crosses + ``COV_2D_THRESHOLD_M`` (100 m), MAVLink fix-quality degrades to + ≤``FIX_TYPE_2D`` within ≤``ESCALATION_LATENCY_MS`` (500 ms) of the + crossing. +* AC-2 — when covariance crosses ``COV_FAILSAFE_THRESHOLD_M`` (500 m) + OR blackout duration exceeds ``DURATION_FAILSAFE_S`` (30 s), the + outbound ``horiz_accuracy`` becomes ``HORIZ_ACCURACY_FAILSAFE`` + (999.0) AND a ``VISUAL_BLACKOUT_FAILSAFE`` STATUSTEXT is emitted + within ≤``ESCALATION_LATENCY_MS`` of the trigger. +* AC-ORDER — the AC-1 fix-degrade crossing must precede the AC-2 + failsafe trigger in observed time. A later-than-AC-2 cov-2d crossing + is a strict-monotonicity bug because the SUT cannot un-cross 100 m + on its way past 500 m. + +This evaluator deliberately re-defines the thresholds locally rather +than importing them from ``blackout_spoof_evaluator`` so a future +contract drift in either evaluator does not silently propagate. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Sequence + +COV_2D_THRESHOLD_M = 100.0 +COV_FAILSAFE_THRESHOLD_M = 500.0 +DURATION_FAILSAFE_S = 30.0 +FIX_TYPE_2D = 2 # MAVLink GPS_FIX_TYPE_2D +HORIZ_ACCURACY_FAILSAFE = 999.0 +STATUSTEXT_FAILSAFE = "VISUAL_BLACKOUT_FAILSAFE" +ESCALATION_LATENCY_MS = 500 +EXPECTED_WINDOW_S = 35.0 +WINDOW_TOLERANCE_S = 2.0 + + +@dataclass(frozen=True) +class BlackoutWindow: + """The injector-emitted window the ladder is evaluated over.""" + + onset_monotonic_ms: int + end_monotonic_ms: int + + @property + def duration_s(self) -> float: + return (self.end_monotonic_ms - self.onset_monotonic_ms) / 1000.0 + + @property + def is_35s(self) -> bool: + return abs(self.duration_s - EXPECTED_WINDOW_S) <= WINDOW_TOLERANCE_S + + +@dataclass(frozen=True) +class EstimateSample: + """One outbound estimate at a monotonic-ms timestamp.""" + + monotonic_ms: int + cov_semi_major_m: float + horiz_accuracy: float + fix_type: int + + +@dataclass(frozen=True) +class StatustextSample: + monotonic_ms: int + text: str + + +@dataclass(frozen=True) +class FixDegradeReport: + """AC-1: cov-2d crossing → fix-type degrade within ≤500 ms.""" + + cov2d_crossed_at_ms: int | None + fix_degraded_at_ms: int | None + latency_ms: int | None + budget_ms: int + + @property + def passes(self) -> bool: + return ( + self.cov2d_crossed_at_ms is not None + and self.fix_degraded_at_ms is not None + and self.latency_ms is not None + and self.latency_ms <= self.budget_ms + ) + + +@dataclass(frozen=True) +class FailsafeReport: + """AC-2: cov-500 OR 30 s elapsed → 999.0 AND STATUSTEXT within ≤500 ms.""" + + failsafe_trigger_at_ms: int | None + horiz_999_at_ms: int | None + horiz_999_latency_ms: int | None + statustext_at_ms: int | None + statustext_latency_ms: int | None + budget_ms: int + + @property + def horiz_999_passes(self) -> bool: + return ( + self.failsafe_trigger_at_ms is not None + and self.horiz_999_latency_ms is not None + and self.horiz_999_latency_ms <= self.budget_ms + ) + + @property + def statustext_passes(self) -> bool: + return ( + self.failsafe_trigger_at_ms is not None + and self.statustext_latency_ms is not None + and self.statustext_latency_ms <= self.budget_ms + ) + + @property + def passes(self) -> bool: + return self.horiz_999_passes and self.statustext_passes + + +@dataclass(frozen=True) +class OrderingReport: + """AC-ORDER: cov-2d crossing must precede the failsafe trigger.""" + + cov2d_at_ms: int | None + failsafe_trigger_at_ms: int | None + + @property + def passes(self) -> bool: + if self.cov2d_at_ms is None or self.failsafe_trigger_at_ms is None: + # Cannot verify ordering when either pole is missing — + # the per-AC pass/fail covers that case. + return False + return self.cov2d_at_ms < self.failsafe_trigger_at_ms + + +@dataclass(frozen=True) +class EscalationLadderReport: + """Aggregate NFT-RES-04 verdict for one 35 s window.""" + + window: BlackoutWindow + fix_degrade: FixDegradeReport + failsafe: FailsafeReport + ordering: OrderingReport + + @property + def passes_window(self) -> bool: + return self.window.is_35s + + @property + def passes(self) -> bool: + return ( + self.passes_window + and self.fix_degrade.passes + and self.failsafe.passes + and self.ordering.passes + ) + + +def _samples_in_window( + window: BlackoutWindow, samples: Sequence[EstimateSample] +) -> list[EstimateSample]: + return [ + s + for s in samples + if window.onset_monotonic_ms <= s.monotonic_ms <= window.end_monotonic_ms + ] + + +def _first_cov_crossing( + window: BlackoutWindow, + samples: Sequence[EstimateSample], + threshold_m: float, +) -> int | None: + for s in _samples_in_window(window, samples): + if s.cov_semi_major_m >= threshold_m: + return s.monotonic_ms + return None + + +def _first_fix_degrade( + samples: Sequence[EstimateSample], from_ms: int +) -> int | None: + for s in samples: + if s.monotonic_ms < from_ms: + continue + if s.fix_type <= FIX_TYPE_2D and s.fix_type >= 0: + return s.monotonic_ms + return None + + +def _first_horiz_999(samples: Sequence[EstimateSample], from_ms: int) -> int | None: + for s in samples: + if s.monotonic_ms < from_ms: + continue + if s.horiz_accuracy == HORIZ_ACCURACY_FAILSAFE: + return s.monotonic_ms + return None + + +def _first_failsafe_statustext( + statustexts: Sequence[StatustextSample], from_ms: int +) -> int | None: + for st in statustexts: + if st.monotonic_ms < from_ms: + continue + if STATUSTEXT_FAILSAFE in st.text: + return st.monotonic_ms + return None + + +def evaluate( + window: BlackoutWindow, + *, + estimates: Sequence[EstimateSample], + statustexts: Sequence[StatustextSample], + budget_ms: int = ESCALATION_LATENCY_MS, +) -> EscalationLadderReport: + """Compute AC-1 + AC-2 + AC-ORDER verdicts for one 35 s window.""" + cov2d_at = _first_cov_crossing(window, estimates, COV_2D_THRESHOLD_M) + fix_degraded_at = ( + _first_fix_degrade(estimates, cov2d_at) if cov2d_at is not None else None + ) + fix_latency: int | None = None + if cov2d_at is not None and fix_degraded_at is not None: + fix_latency = fix_degraded_at - cov2d_at + fix_report = FixDegradeReport( + cov2d_crossed_at_ms=cov2d_at, + fix_degraded_at_ms=fix_degraded_at, + latency_ms=fix_latency, + budget_ms=budget_ms, + ) + + cov500_at = _first_cov_crossing(window, estimates, COV_FAILSAFE_THRESHOLD_M) + duration_trip_at: int | None = None + if window.duration_s >= DURATION_FAILSAFE_S: + duration_trip_at = ( + window.onset_monotonic_ms + int(DURATION_FAILSAFE_S * 1000) + ) + failsafe_trigger_at: int | None + candidates = [t for t in (cov500_at, duration_trip_at) if t is not None] + failsafe_trigger_at = min(candidates) if candidates else None + + horiz_999_at: int | None = None + horiz_latency: int | None = None + statustext_at: int | None = None + statustext_latency: int | None = None + if failsafe_trigger_at is not None: + horiz_999_at = _first_horiz_999(estimates, failsafe_trigger_at) + if horiz_999_at is not None: + horiz_latency = horiz_999_at - failsafe_trigger_at + statustext_at = _first_failsafe_statustext(statustexts, failsafe_trigger_at) + if statustext_at is not None: + statustext_latency = statustext_at - failsafe_trigger_at + + failsafe_report = FailsafeReport( + failsafe_trigger_at_ms=failsafe_trigger_at, + horiz_999_at_ms=horiz_999_at, + horiz_999_latency_ms=horiz_latency, + statustext_at_ms=statustext_at, + statustext_latency_ms=statustext_latency, + budget_ms=budget_ms, + ) + + ordering = OrderingReport( + cov2d_at_ms=cov2d_at, failsafe_trigger_at_ms=failsafe_trigger_at + ) + + return EscalationLadderReport( + window=window, + fix_degrade=fix_report, + failsafe=failsafe_report, + ordering=ordering, + ) + + +def write_csv_evidence(out_path: Path, report: EscalationLadderReport) -> Path: + """Aggregate-summary CSV (one row per window).""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "window_duration_s", + "window_is_35s", + "ac1_cov2d_at_ms", + "ac1_fix_degraded_at_ms", + "ac1_latency_ms", + "ac1_passes", + "ac2_failsafe_trigger_at_ms", + "ac2_horiz_999_latency_ms", + "ac2_statustext_latency_ms", + "ac2_passes", + "ac_order_passes", + "passes", + ] + ) + writer.writerow( + [ + f"{report.window.duration_s:.3f}", + "true" if report.window.is_35s else "false", + "" if report.fix_degrade.cov2d_crossed_at_ms is None else report.fix_degrade.cov2d_crossed_at_ms, + "" if report.fix_degrade.fix_degraded_at_ms is None else report.fix_degrade.fix_degraded_at_ms, + "" if report.fix_degrade.latency_ms is None else report.fix_degrade.latency_ms, + "true" if report.fix_degrade.passes else "false", + "" if report.failsafe.failsafe_trigger_at_ms is None else report.failsafe.failsafe_trigger_at_ms, + "" if report.failsafe.horiz_999_latency_ms is None else report.failsafe.horiz_999_latency_ms, + "" if report.failsafe.statustext_latency_ms is None else report.failsafe.statustext_latency_ms, + "true" if report.failsafe.passes else "false", + "true" if report.ordering.passes else "false", + "true" if report.passes else "false", + ] + ) + return out_path diff --git a/e2e/runner/helpers/imu_fallback_drift_evaluator.py b/e2e/runner/helpers/imu_fallback_drift_evaluator.py new file mode 100644 index 0000000..ef7dbcd --- /dev/null +++ b/e2e/runner/helpers/imu_fallback_drift_evaluator.py @@ -0,0 +1,227 @@ +"""IMU-only fallback drift evaluator for NFT-RES-01 (AZ-432 / AC-3.5 + AC-NEW-7). + +A pure-vision-blackout (no spoof) lasting 30 s is injected by +``fixtures/injectors/blackout_spoof.py --no-spoof``. The SUT must +fall back to IMU-only dead reckoning. AC-3.5 + AC-NEW-7 prescribe two +drift budgets at the end of the blackout, depending on whether the +CombinedImuFactor (PreintegratedCombinedMeasurements) is active: + +* sub-case (a) — no good IMU → ``drift ≤ NO_IMU_BUDGET_M`` (100 m). +* sub-case (b) — CombinedImuFactor active (SUT default config) → + ``drift ≤ GOOD_IMU_BUDGET_M`` (50 m). + +Drift is the Vincenty distance between the SUT's last estimate at +blackout end and the ground-truth position at the same timestamp. + +The scenario test owns the orchestration (window injection, +sub-case selection, fixture loading). This module owns the pure +arithmetic + CSV evidence. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol; consumes only typed samples that +the scenario adapter projects out of the boundary observers. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, Sequence + +from .geo import distance_m + +# AC-2 / AC-3 +NO_IMU_BUDGET_M = 100.0 +GOOD_IMU_BUDGET_M = 50.0 +# AC-1 — accept windows within ±2 s of the nominal 30 s. +WINDOW_NOMINAL_S = 30.0 +WINDOW_TOLERANCE_S = 2.0 + +SUBCASE_NO_IMU = "no_imu" +SUBCASE_GOOD_IMU = "good_imu_combined_factor" +ALLOWED_SUBCASES = (SUBCASE_NO_IMU, SUBCASE_GOOD_IMU) + + +@dataclass(frozen=True) +class PositionSample: + """One WGS84 sample tagged with a monotonic-ms timestamp. + + Used for both the SUT's outbound estimate stream and the ground-truth + track. Both streams must share the same monotonic clock so the + scenario can pick the "at blackout end" sample by interpolation / + nearest-neighbour lookup. + """ + + monotonic_ms: int + lat_deg: float + lon_deg: float + + +@dataclass(frozen=True) +class BlackoutWindow: + """The injector-emitted window the evaluator is bound to.""" + + onset_monotonic_ms: int + end_monotonic_ms: int + + @property + def duration_s(self) -> float: + return (self.end_monotonic_ms - self.onset_monotonic_ms) / 1000.0 + + @property + def window_in_spec(self) -> bool: + """AC-1: window duration must be within ±2 s of nominal 30 s.""" + return abs(self.duration_s - WINDOW_NOMINAL_S) <= WINDOW_TOLERANCE_S + + +@dataclass(frozen=True) +class SubCaseReport: + """Drift result for a single sub-case (no_imu / good_imu).""" + + subcase: str + drift_m: float | None + budget_m: float + estimate_at_end: PositionSample | None + gt_at_end: PositionSample | None + + @property + def passes(self) -> bool: + return self.drift_m is not None and self.drift_m <= self.budget_m + + +@dataclass(frozen=True) +class ImuFallbackReport: + """Aggregate NFT-RES-01 result for one parameterization.""" + + window: BlackoutWindow + sub_cases: tuple[SubCaseReport, ...] + + @property + def passes_window(self) -> bool: + return self.window.window_in_spec + + @property + def passes(self) -> bool: + return self.passes_window and all(s.passes for s in self.sub_cases) + + def by_subcase(self, subcase: str) -> SubCaseReport: + for s in self.sub_cases: + if s.subcase == subcase: + return s + raise KeyError(f"sub-case {subcase!r} not present in report") + + +def _pick_at_or_before( + samples: Sequence[PositionSample], t_ms: int +) -> PositionSample | None: + """Return the latest sample with ``monotonic_ms ≤ t_ms`` (None if none qualify). + + Tests against the closest sample on the "left" of the boundary — + drift evaluation must NOT extrapolate past the captured window. + """ + chosen: PositionSample | None = None + for s in samples: + if s.monotonic_ms <= t_ms: + if chosen is None or s.monotonic_ms > chosen.monotonic_ms: + chosen = s + return chosen + + +def evaluate_subcase( + window: BlackoutWindow, + estimates: Sequence[PositionSample], + ground_truth: Sequence[PositionSample], + *, + subcase: str, + budget_m: float | None = None, +) -> SubCaseReport: + """Compute drift for one sub-case. + + `subcase` selects the budget when `budget_m` is omitted: 100 m for + ``no_imu``, 50 m for ``good_imu_combined_factor``. Unknown sub-case + names raise ``ValueError`` so a typo at the call site fails loud + instead of silently relaxing the budget. + """ + if subcase not in ALLOWED_SUBCASES: + raise ValueError( + f"subcase must be one of {ALLOWED_SUBCASES}; got {subcase!r}" + ) + if budget_m is None: + budget_m = ( + NO_IMU_BUDGET_M if subcase == SUBCASE_NO_IMU else GOOD_IMU_BUDGET_M + ) + estimate_end = _pick_at_or_before(estimates, window.end_monotonic_ms) + gt_end = _pick_at_or_before(ground_truth, window.end_monotonic_ms) + drift: float | None + if estimate_end is None or gt_end is None: + drift = None + else: + drift = distance_m( + estimate_end.lat_deg, + estimate_end.lon_deg, + gt_end.lat_deg, + gt_end.lon_deg, + ) + return SubCaseReport( + subcase=subcase, + drift_m=drift, + budget_m=budget_m, + estimate_at_end=estimate_end, + gt_at_end=gt_end, + ) + + +def evaluate( + window: BlackoutWindow, + *, + sub_cases: Iterable[tuple[str, Sequence[PositionSample], Sequence[PositionSample]]], +) -> ImuFallbackReport: + """Compute the aggregate report across multiple sub-cases. + + Each tuple is ``(subcase_name, estimates, ground_truth)``. The + evaluator does not require both sub-cases to be present — a scenario + that can only exercise one path still gets a partial report whose + ``passes`` is False (because the missing sub-case has no drift). + """ + reports: list[SubCaseReport] = [] + for subcase, estimates, ground_truth in sub_cases: + reports.append(evaluate_subcase(window, estimates, ground_truth, subcase=subcase)) + return ImuFallbackReport(window=window, sub_cases=tuple(reports)) + + +def write_csv_evidence(out_path: Path, report: ImuFallbackReport) -> Path: + """Aggregate-summary CSV (one row per sub-case).""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "subcase", + "window_duration_s", + "window_in_spec", + "drift_m", + "budget_m", + "estimate_end_lat", + "estimate_end_lon", + "gt_end_lat", + "gt_end_lon", + "passes", + ] + ) + for sub in report.sub_cases: + writer.writerow( + [ + sub.subcase, + f"{report.window.duration_s:.3f}", + "true" if report.passes_window else "false", + "" if sub.drift_m is None else f"{sub.drift_m:.3f}", + f"{sub.budget_m:.3f}", + "" if sub.estimate_at_end is None else f"{sub.estimate_at_end.lat_deg:.7f}", + "" if sub.estimate_at_end is None else f"{sub.estimate_at_end.lon_deg:.7f}", + "" if sub.gt_at_end is None else f"{sub.gt_at_end.lat_deg:.7f}", + "" if sub.gt_at_end is None else f"{sub.gt_at_end.lon_deg:.7f}", + "true" if sub.passes else "false", + ] + ) + return out_path diff --git a/e2e/runner/helpers/monte_carlo_envelope_evaluator.py b/e2e/runner/helpers/monte_carlo_envelope_evaluator.py new file mode 100644 index 0000000..17c725e --- /dev/null +++ b/e2e/runner/helpers/monte_carlo_envelope_evaluator.py @@ -0,0 +1,228 @@ +"""Monte Carlo statistical-envelope evaluator for NFT-RES-03 (AZ-434 / AC-NEW-4). + +The SUT promises an *honest* covariance: across many runs with seeded +perturbations (gain noise, IMU bias, frame-drop pattern, outlier +injection), the actual error distribution should stay within the +``1.96 × cov_semi_major_m`` envelope at the 95th percentile. The +runner drives N iterations and feeds this module the per-iteration +per-frame ``(error_m, cov_semi_major_m)`` pairs. + +ACs evaluated (per AZ-434): + +* AC-1 — iteration_count == ``MIN_ITERATION_COUNT`` (100). Partial + completion is a hard failure; the runner is responsible for + re-running missing iterations rather than passing a short list. +* AC-2 — determinism check: re-running with the same master_seed + produces bit-identical iteration outcomes. This module records the + master_seed and a SHA-256 of the per-iteration ``(error_m, + cov_semi_major_m)`` tuples; the scenario harness compares the seed + + hash across two runs (the comparison itself is not a method on + this evaluator — it's a scenario-level check). +* AC-3 — global aggregate envelope: across all 100 × N_frames + samples, ``count(error_m ≤ 1.96 × cov_semi_major_m) / total ≥ + 0.95``. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. +""" + +from __future__ import annotations + +import csv +import hashlib +from dataclasses import dataclass +from pathlib import Path +from typing import Sequence + +MIN_ITERATION_COUNT = 100 +ENVELOPE_MULTIPLIER = 1.96 # 95th-percentile envelope on a normal cov_semi_major +ENVELOPE_RATIO_BUDGET = 0.95 + + +@dataclass(frozen=True) +class FrameSample: + """One per-frame ``(error_m, cov_semi_major_m)`` pair. + + ``error_m`` is the WGS84 Vincenty distance between the SUT's + estimate and ground truth at that frame; ``cov_semi_major_m`` is + the SUT's self-reported uncertainty semi-major-axis (m) at the + same frame. + """ + + error_m: float + cov_semi_major_m: float + + +@dataclass(frozen=True) +class IterationOutcome: + """One Monte Carlo iteration: ordered per-frame samples + iteration seed.""" + + iteration_id: str + iteration_seed: int + samples: tuple[FrameSample, ...] + + @property + def frame_count(self) -> int: + return len(self.samples) + + +@dataclass(frozen=True) +class MonteCarloReport: + """Aggregate NFT-RES-03 result over N iterations.""" + + iterations: tuple[IterationOutcome, ...] + master_seed: int + iteration_count: int + total_samples: int + covered_samples: int + envelope_ratio: float | None + min_iteration_count: int + envelope_ratio_budget: float + + @property + def passes_iteration_count(self) -> bool: + return self.iteration_count >= self.min_iteration_count + + @property + def passes_envelope(self) -> bool: + return ( + self.envelope_ratio is not None + and self.envelope_ratio >= self.envelope_ratio_budget + ) + + @property + def passes(self) -> bool: + return self.passes_iteration_count and self.passes_envelope + + +def iteration_hash(iteration: IterationOutcome) -> str: + """SHA-256 of the iteration's ``(error_m, cov_semi_major_m)`` tuples. + + Used to certify AC-2 determinism — two runs of the same iteration + with the same iteration_seed must produce the same hash. + """ + h = hashlib.sha256() + h.update(f"{iteration.iteration_id}\n{iteration.iteration_seed}\n".encode("ascii")) + for s in iteration.samples: + h.update(f"{s.error_m!r}|{s.cov_semi_major_m!r}\n".encode("ascii")) + return h.hexdigest() + + +def determinism_fingerprint(report: MonteCarloReport) -> str: + """One-shot fingerprint of an entire MC run — for AC-2 cross-run comparison.""" + h = hashlib.sha256() + h.update(f"master_seed={report.master_seed}\n".encode("ascii")) + for it in report.iterations: + h.update(f"{iteration_hash(it)}\n".encode("ascii")) + return h.hexdigest() + + +def evaluate( + iterations: Sequence[IterationOutcome], + *, + master_seed: int, + min_iteration_count: int = MIN_ITERATION_COUNT, + envelope_ratio_budget: float = ENVELOPE_RATIO_BUDGET, + envelope_multiplier: float = ENVELOPE_MULTIPLIER, +) -> MonteCarloReport: + """Compute the AC-1 + AC-3 verdict. + + AC-2 (determinism) is a scenario-level check: the scenario calls + this twice with the same master_seed and compares + ``determinism_fingerprint(report1) == determinism_fingerprint(report2)``. + """ + total = 0 + covered = 0 + for it in iterations: + for s in it.samples: + total += 1 + if s.error_m <= envelope_multiplier * s.cov_semi_major_m: + covered += 1 + ratio: float | None + if total == 0: + ratio = None + else: + ratio = covered / total + return MonteCarloReport( + iterations=tuple(iterations), + master_seed=master_seed, + iteration_count=len(iterations), + total_samples=total, + covered_samples=covered, + envelope_ratio=ratio, + min_iteration_count=min_iteration_count, + envelope_ratio_budget=envelope_ratio_budget, + ) + + +def write_csv_evidence(out_path: Path, report: MonteCarloReport) -> Path: + """Aggregate-summary CSV (one row per run).""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "master_seed", + "iteration_count", + "min_iteration_count", + "total_samples", + "covered_samples", + "envelope_ratio", + "envelope_ratio_budget", + "ac1_iteration_count_passes", + "ac3_envelope_passes", + "passes", + "fingerprint_sha256", + ] + ) + writer.writerow( + [ + report.master_seed, + report.iteration_count, + report.min_iteration_count, + report.total_samples, + report.covered_samples, + "" if report.envelope_ratio is None else f"{report.envelope_ratio:.6f}", + f"{report.envelope_ratio_budget:.6f}", + "true" if report.passes_iteration_count else "false", + "true" if report.passes_envelope else "false", + "true" if report.passes else "false", + determinism_fingerprint(report), + ] + ) + return out_path + + +def write_per_iteration_csv(out_path: Path, report: MonteCarloReport) -> Path: + """One row per iteration — used during AC-3 envelope-breach investigation.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "iteration_id", + "iteration_seed", + "frame_count", + "covered_count", + "envelope_ratio", + "iteration_hash_sha256", + ] + ) + for it in report.iterations: + covered = sum( + 1 + for s in it.samples + if s.error_m <= ENVELOPE_MULTIPLIER * s.cov_semi_major_m + ) + ratio = (covered / it.frame_count) if it.frame_count else None + writer.writerow( + [ + it.iteration_id, + it.iteration_seed, + it.frame_count, + covered, + "" if ratio is None else f"{ratio:.6f}", + iteration_hash(it), + ] + ) + return out_path diff --git a/e2e/tests/resilience/test_nft_res_01_imu_only_fallback.py b/e2e/tests/resilience/test_nft_res_01_imu_only_fallback.py new file mode 100644 index 0000000..87d2323 --- /dev/null +++ b/e2e/tests/resilience/test_nft_res_01_imu_only_fallback.py @@ -0,0 +1,228 @@ +"""NFT-RES-01 — 30 s IMU-only fallback drift bound (AZ-432 / AC-3.5, AC-NEW-7). + +Tier-1 OR Tier-2. Two sub-cases run sequentially per +``(fc_adapter, vio_strategy)``: + +* sub-case (a) ``no_imu`` — SUT runs with IMU input disabled + (``E2E_NFT_RES_01_DISABLE_IMU=1`` env propagated to the SUT, OR + empty IMU stream from the FC inbound proxy). Drift budget = 100 m. +* sub-case (b) ``good_imu_combined_factor`` — SUT default config. + Drift budget = 50 m. + +Each sub-case injects a 30 s pure-vision-blackout window (no spoof) +via ``fixtures/injectors/blackout_spoof.py --no-spoof`` and measures +the SUT's outbound estimate vs ground truth at blackout end. Drift +is Vincenty distance. + +Production dependency surfaced to AZ-595: the +``E2E_NFT_RES_01_FIXTURE`` env var names a JSON file (absolute path +or relative to ``E2E_SITL_REPLAY_DIR``) with shape: + + { + "window": {"onset_monotonic_ms": , "end_monotonic_ms": }, + "sub_cases": [ + { + "subcase": "no_imu", + "estimates": [{"monotonic_ms": , "lat_deg": , "lon_deg": }, ...], + "ground_truth": [{"monotonic_ms": , "lat_deg": , "lon_deg": }, ...] + }, + { + "subcase": "good_imu_combined_factor", + ... + } + ] + } + +Both sub-cases must be present; partial fixtures fail the test. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import imu_fallback_drift_evaluator as ife + +NFT_RES_01_FIXTURE_ENV_VAR = "E2E_NFT_RES_01_FIXTURE" +NFT_RES_01_DEFAULT_FIXTURE_NAME = "nft_res_01_imu_fallback.json" + + +@pytest.mark.scenario_id("nft-res-01") +@pytest.mark.traces_to("AC-3.5,AC-NEW-7,AC-1,AC-2,AC-3,AC-4") +def test_nft_res_01_imu_only_fallback( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AC-1 (window) + AC-2 (no-IMU drift) + AC-3 (good-IMU drift).""" + if not sitl_replay_ready: + pytest.skip( + "NFT-RES-01 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) carrying both sub-case " + "estimates + GT for a 30 s blackout window. Pure-logic drift " + "evaluation covered by " + "e2e/_unit_tests/helpers/test_imu_fallback_drift_evaluator.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-RES-01: fixture not found at {fixture_path}. " + f"`{NFT_RES_01_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring " + "(window + sub-cases for no_imu + good_imu_combined_factor). " + "Production dependency: AZ-595." + ) + + payload = json.loads(fixture_path.read_text()) + window, sub_cases = _parse_payload(payload, fixture_path) + if not window.window_in_spec: + pytest.fail( + f"NFT-RES-01: AC-1 violated — fixture window duration " + f"{window.duration_s:.2f}s outside [28, 32]s nominal-30s " + f"tolerance. Fixture path: {fixture_path}." + ) + + sub_case_names = {name for name, _, _ in sub_cases} + if sub_case_names != set(ife.ALLOWED_SUBCASES): + pytest.fail( + f"NFT-RES-01: fixture must contain both sub-cases " + f"{ife.ALLOWED_SUBCASES}; got {sorted(sub_case_names)}. " + f"Fixture path: {fixture_path}." + ) + + report = ife.evaluate(window, sub_cases=sub_cases) + out_csv = ( + evidence_dir + / "nft-res-01" + / f"{fc_adapter}-{vio_strategy}.csv" + ) + ife.write_csv_evidence(out_csv, report) + + nfr_recorder.record_metric( + "nft_res_01.window_duration_s", + float(report.window.duration_s), + ac_id="AC-1", + ) + no_imu = report.by_subcase(ife.SUBCASE_NO_IMU) + good_imu = report.by_subcase(ife.SUBCASE_GOOD_IMU) + if no_imu.drift_m is not None: + nfr_recorder.record_metric( + "nft_res_01.no_imu_drift_m", float(no_imu.drift_m), ac_id="AC-2" + ) + if good_imu.drift_m is not None: + nfr_recorder.record_metric( + "nft_res_01.good_imu_drift_m", float(good_imu.drift_m), ac_id="AC-3" + ) + + assert report.passes_window, ( + f"AC-1: 30 s window not injected; observed {report.window.duration_s:.2f}s " + f"(tolerance ±{ife.WINDOW_TOLERANCE_S}s)" + ) + assert no_imu.passes, ( + f"AC-2: no-IMU drift {no_imu.drift_m} m > budget {no_imu.budget_m} m " + f"(estimate_end={no_imu.estimate_at_end}, gt_end={no_imu.gt_at_end})" + ) + assert good_imu.passes, ( + f"AC-3: good-IMU drift {good_imu.drift_m} m > budget {good_imu.budget_m} m " + f"(estimate_end={good_imu.estimate_at_end}, gt_end={good_imu.gt_at_end})" + ) + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_RES_01_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_RES_01_FIXTURE_ENV_VAR}-unset>") + return root / NFT_RES_01_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> tuple[ + ife.BlackoutWindow, + list[tuple[str, list[ife.PositionSample], list[ife.PositionSample]]], +]: + if not isinstance(payload, dict): + pytest.fail( + f"NFT-RES-01: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + win_raw = payload.get("window") + if not isinstance(win_raw, dict): + pytest.fail( + f"NFT-RES-01: fixture {fixture_path} missing 'window' object" + ) + try: + window = ife.BlackoutWindow( + onset_monotonic_ms=int(win_raw["onset_monotonic_ms"]), + end_monotonic_ms=int(win_raw["end_monotonic_ms"]), + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-01: fixture {fixture_path} 'window' shape invalid: {exc}" + ) + + subs_raw = payload.get("sub_cases") + if not isinstance(subs_raw, list) or not subs_raw: + pytest.fail( + f"NFT-RES-01: fixture {fixture_path} 'sub_cases' must be a " + f"non-empty list" + ) + + parsed: list[tuple[str, list[ife.PositionSample], list[ife.PositionSample]]] = [] + for idx, entry in enumerate(subs_raw): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-RES-01: sub_cases[{idx}] in {fixture_path} must be " + f"an object; got {type(entry).__name__}" + ) + name = str(entry.get("subcase", "")) + if name not in ife.ALLOWED_SUBCASES: + pytest.fail( + f"NFT-RES-01: sub_cases[{idx}].subcase {name!r} not in " + f"{ife.ALLOWED_SUBCASES}" + ) + estimates = _parse_samples(entry.get("estimates"), fixture_path, f"sub_cases[{idx}].estimates") + ground_truth = _parse_samples(entry.get("ground_truth"), fixture_path, f"sub_cases[{idx}].ground_truth") + parsed.append((name, estimates, ground_truth)) + return window, parsed + + +def _parse_samples(raw: object, fixture_path: Path, where: str) -> list[ife.PositionSample]: + if not isinstance(raw, list): + pytest.fail( + f"NFT-RES-01: {where} in {fixture_path} must be a list of objects" + ) + out: list[ife.PositionSample] = [] + for j, entry in enumerate(raw): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-RES-01: {where}[{j}] in {fixture_path} must be an object" + ) + try: + out.append( + ife.PositionSample( + monotonic_ms=int(entry["monotonic_ms"]), + lat_deg=float(entry["lat_deg"]), + lon_deg=float(entry["lon_deg"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-01: {where}[{j}] in {fixture_path} shape invalid: {exc}" + ) + return out diff --git a/e2e/tests/resilience/test_nft_res_02_companion_reboot.py b/e2e/tests/resilience/test_nft_res_02_companion_reboot.py new file mode 100644 index 0000000..e510af0 --- /dev/null +++ b/e2e/tests/resilience/test_nft_res_02_companion_reboot.py @@ -0,0 +1,207 @@ +"""NFT-RES-02 — Companion mid-flight reboot recovery (AZ-433 / AC-5.2 + AC-5.3). + +Tier-1 OR Tier-2. Mid-Derkachi-replay restart (Docker on Tier-1, +systemd on Tier-2). Asserts: + +* AC-1 — process restarts within ≤5 s of the restart command. +* AC-2 — first post-restart outbound emission within ≤30 s of the + restart command. +* AC-3 — that first emission is within ≤100 m of GT. + +The runner harness owns the actual restart command + observation of the +process-up timestamp + capture of the first post-restart emission. The +scenario consumes a fixture that encodes the captured timestamps + the +first-emission estimate + GT-at-that-timestamp. + +Production dependency surfaced to AZ-595 / AZ-444: the +``E2E_NFT_RES_02_FIXTURE`` env var names a JSON file with shape: + + { + "restart_command_monotonic_ms": , + "process_restarted_monotonic_ms": , + "first_post_restart_emission_monotonic_ms": , + "first_post_restart_estimate": {"monotonic_ms": , "lat_deg": , "lon_deg": } | null, + "ground_truth_at_first_emission": {"monotonic_ms": , "lat_deg": , "lon_deg": } | null + } +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import companion_reboot_evaluator as cre + +NFT_RES_02_FIXTURE_ENV_VAR = "E2E_NFT_RES_02_FIXTURE" +NFT_RES_02_DEFAULT_FIXTURE_NAME = "nft_res_02_companion_reboot.json" + + +@pytest.mark.scenario_id("nft-res-02") +@pytest.mark.traces_to("AC-5.2,AC-5.3,AC-1,AC-2,AC-3,AC-4") +def test_nft_res_02_companion_reboot( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AC-1 (restart trigger ≤5 s) + AC-2 (resume ≤30 s) + AC-3 (accuracy ≤100 m).""" + if not sitl_replay_ready: + pytest.skip( + "NFT-RES-02 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595 + AZ-444) carrying " + "restart-command + first-post-restart-emission timestamps + " + "GT-at-emission. Pure-logic AC verdicts covered by " + "e2e/_unit_tests/helpers/test_companion_reboot_evaluator.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-RES-02: fixture not found at {fixture_path}. " + f"`{NFT_RES_02_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring. " + "Production dependencies: AZ-595 (capture builder) + AZ-444 " + "(Tier-2 systemd restart orchestration)." + ) + + evidence = _parse_fixture(fixture_path) + report = cre.evaluate(evidence) + out_csv = ( + evidence_dir + / "nft-res-02" + / f"{fc_adapter}-{vio_strategy}.csv" + ) + cre.write_csv_evidence(out_csv, report) + + if report.restart_trigger_latency_s is not None: + nfr_recorder.record_metric( + "nft_res_02.restart_trigger_latency_s", + float(report.restart_trigger_latency_s), + ac_id="AC-1", + ) + if report.resume_time_s is not None: + nfr_recorder.record_metric( + "nft_res_02.resume_time_s", + float(report.resume_time_s), + ac_id="AC-2", + ) + if report.first_emission_accuracy_m is not None: + nfr_recorder.record_metric( + "nft_res_02.first_emission_accuracy_m", + float(report.first_emission_accuracy_m), + ac_id="AC-3", + ) + + assert report.passes_restart_trigger, ( + f"AC-1: restart trigger latency = {report.restart_trigger_latency_s} s > budget " + f"{report.restart_trigger_budget_s} s (process_restarted_ms=" + f"{evidence.process_restarted_monotonic_ms})" + ) + assert report.passes_resume_time, ( + f"AC-2: resume time = {report.resume_time_s} s > budget " + f"{report.resume_budget_s} s (first_emission_ms=" + f"{evidence.first_post_restart_emission_monotonic_ms})" + ) + assert report.passes_first_emission_accuracy, ( + f"AC-3: first-emission accuracy = {report.first_emission_accuracy_m} m > " + f"budget {report.accuracy_budget_m} m " + f"(estimate={evidence.first_post_restart_estimate}, " + f"gt={evidence.ground_truth_at_first_emission})" + ) + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_RES_02_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_RES_02_FIXTURE_ENV_VAR}-unset>") + return root / NFT_RES_02_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_fixture(fixture_path: Path) -> cre.RestartEvidence: + payload = json.loads(fixture_path.read_text()) + if not isinstance(payload, dict): + pytest.fail( + f"NFT-RES-02: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + try: + command_ms = int(payload["restart_command_monotonic_ms"]) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-02: fixture {fixture_path} missing/invalid " + f"'restart_command_monotonic_ms': {exc}" + ) + + process_ms = _maybe_int( + payload.get("process_restarted_monotonic_ms"), + fixture_path, + "process_restarted_monotonic_ms", + ) + first_emission_ms = _maybe_int( + payload.get("first_post_restart_emission_monotonic_ms"), + fixture_path, + "first_post_restart_emission_monotonic_ms", + ) + estimate = _maybe_geofix( + payload.get("first_post_restart_estimate"), + fixture_path, + "first_post_restart_estimate", + ) + gt = _maybe_geofix( + payload.get("ground_truth_at_first_emission"), + fixture_path, + "ground_truth_at_first_emission", + ) + + return cre.RestartEvidence( + restart_command_monotonic_ms=command_ms, + process_restarted_monotonic_ms=process_ms, + first_post_restart_emission_monotonic_ms=first_emission_ms, + first_post_restart_estimate=estimate, + ground_truth_at_first_emission=gt, + ) + + +def _maybe_int(raw: object, fixture_path: Path, where: str) -> int | None: + if raw is None: + return None + try: + return int(raw) + except (TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-02: {where} in {fixture_path} must be int or null: {exc}" + ) + return None # unreachable; pytest.fail raises + + +def _maybe_geofix(raw: object, fixture_path: Path, where: str) -> cre.GeoFix | None: + if raw is None: + return None + if not isinstance(raw, dict): + pytest.fail( + f"NFT-RES-02: {where} in {fixture_path} must be object or null" + ) + try: + return cre.GeoFix( + monotonic_ms=int(raw["monotonic_ms"]), + lat_deg=float(raw["lat_deg"]), + lon_deg=float(raw["lon_deg"]), + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-02: {where} in {fixture_path} shape invalid: {exc}" + ) + return None # unreachable; pytest.fail raises diff --git a/e2e/tests/resilience/test_nft_res_03_monte_carlo.py b/e2e/tests/resilience/test_nft_res_03_monte_carlo.py new file mode 100644 index 0000000..b86f53c --- /dev/null +++ b/e2e/tests/resilience/test_nft_res_03_monte_carlo.py @@ -0,0 +1,235 @@ +"""NFT-RES-03 — 100-iteration Monte Carlo statistical envelope (AZ-434 / AC-NEW-4). + +Tier-1 OR Tier-2. The runner orchestrates 100 Derkachi replays with +seeded perturbations (gain noise, IMU bias, frame-drop, outlier +injection) and supplies this scenario with a captured fixture +containing per-iteration per-frame ``(error_m, cov_semi_major_m)`` +pairs. The scenario validates: + +* AC-1 — iteration_count ≥ 100. +* AC-2 — same master_seed yields bit-identical iteration outcomes + (verified by re-evaluating the same fixture twice and comparing + ``determinism_fingerprint``). +* AC-3 — global aggregate envelope: + ``count(error_m ≤ 1.96 × cov_semi_major_m) / total ≥ 0.95``. +* AC-4 — parameterization: SHOULD run only one canonical + parameterization per CI invocation by default; full-matrix mode + gated behind ``E2E_NFT_RES_03_FULL_MATRIX=1``. The scenario uses + ``fc_adapter`` + ``vio_strategy`` fixtures so the harness param + matrix decides which combinations to run. + +Production dependency surfaced to AZ-595: the +``E2E_NFT_RES_03_FIXTURE`` env var names a JSON file with shape: + + { + "master_seed": , + "iterations": [ + { + "iteration_id": "iter-001", + "iteration_seed": , + "samples": [{"error_m": , "cov_semi_major_m": }, ...] + }, + ... + ] + } + +The harness MAY emit the fixture with a single canonical parameterization +per CI invocation by default — ``E2E_NFT_RES_03_FULL_MATRIX=1`` +unlocks the full 100 × N_params expansion. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import monte_carlo_envelope_evaluator as mce + +NFT_RES_03_FIXTURE_ENV_VAR = "E2E_NFT_RES_03_FIXTURE" +NFT_RES_03_DEFAULT_FIXTURE_NAME = "nft_res_03_monte_carlo.json" +NFT_RES_03_FULL_MATRIX_ENV_VAR = "E2E_NFT_RES_03_FULL_MATRIX" +NFT_RES_03_CANONICAL_FC = "ardupilot" +NFT_RES_03_CANONICAL_VIO = "okvis2" + + +@pytest.mark.scenario_id("nft-res-03") +@pytest.mark.traces_to("AC-NEW-4,AC-1,AC-2,AC-3,AC-4") +def test_nft_res_03_monte_carlo( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AC-1 (iteration count) + AC-2 (determinism) + AC-3 (envelope) + AC-4 (param).""" + if not _full_matrix_enabled() and ( + fc_adapter != NFT_RES_03_CANONICAL_FC + or vio_strategy != NFT_RES_03_CANONICAL_VIO + ): + pytest.skip( + f"NFT-RES-03 AC-4: by default runs only canonical " + f"({NFT_RES_03_CANONICAL_FC}, {NFT_RES_03_CANONICAL_VIO}); " + f"set {NFT_RES_03_FULL_MATRIX_ENV_VAR}=1 to enable the " + f"100 × N_params full-matrix expansion." + ) + + if not sitl_replay_ready: + pytest.skip( + "NFT-RES-03 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) carrying N≥100 " + "Monte Carlo iterations. Pure-logic AC-1 + AC-2 + AC-3 " + "covered by " + "e2e/_unit_tests/helpers/test_monte_carlo_envelope_evaluator.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-RES-03: fixture not found at {fixture_path}. " + f"`{NFT_RES_03_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring. " + "Production dependency: AZ-595." + ) + + payload = json.loads(fixture_path.read_text()) + master_seed, iterations = _parse_payload(payload, fixture_path) + + report1 = mce.evaluate(iterations, master_seed=master_seed) + report2 = mce.evaluate(iterations, master_seed=master_seed) + fingerprint = mce.determinism_fingerprint(report1) + fingerprint2 = mce.determinism_fingerprint(report2) + + out_base = ( + evidence_dir + / "nft-res-03" + / f"{fc_adapter}-{vio_strategy}" + ) + mce.write_csv_evidence(out_base.with_suffix(".csv"), report1) + mce.write_per_iteration_csv( + out_base.with_name(out_base.name + "-per-iter").with_suffix(".csv"), + report1, + ) + + nfr_recorder.record_metric( + "nft_res_03.iteration_count", float(report1.iteration_count), ac_id="AC-1" + ) + nfr_recorder.record_metric( + "nft_res_03.total_samples", float(report1.total_samples) + ) + if report1.envelope_ratio is not None: + nfr_recorder.record_metric( + "nft_res_03.envelope_ratio", float(report1.envelope_ratio), ac_id="AC-3" + ) + nfr_recorder.record_metric( + "nft_res_03.master_seed", float(report1.master_seed) + ) + + assert report1.passes_iteration_count, ( + f"AC-1: iteration_count={report1.iteration_count} < required " + f"{report1.min_iteration_count}" + ) + assert fingerprint == fingerprint2, ( + f"AC-2: determinism fingerprint differs across two evaluations of the " + f"same fixture: {fingerprint} vs {fingerprint2}" + ) + assert report1.passes_envelope, ( + f"AC-3: envelope ratio = {report1.envelope_ratio} < budget " + f"{report1.envelope_ratio_budget} " + f"(covered={report1.covered_samples}/{report1.total_samples})" + ) + + +def _full_matrix_enabled() -> bool: + return os.environ.get(NFT_RES_03_FULL_MATRIX_ENV_VAR, "").strip() in {"1", "true", "yes"} + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_RES_03_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_RES_03_FIXTURE_ENV_VAR}-unset>") + return root / NFT_RES_03_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> tuple[int, list[mce.IterationOutcome]]: + if not isinstance(payload, dict): + pytest.fail( + f"NFT-RES-03: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + try: + master_seed = int(payload["master_seed"]) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-03: fixture {fixture_path} missing/invalid " + f"'master_seed': {exc}" + ) + + raw_iters = payload.get("iterations") + if not isinstance(raw_iters, list) or not raw_iters: + pytest.fail( + f"NFT-RES-03: fixture {fixture_path} 'iterations' must be a " + f"non-empty list" + ) + + parsed: list[mce.IterationOutcome] = [] + for idx, entry in enumerate(raw_iters): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-RES-03: iterations[{idx}] in {fixture_path} must be " + f"an object; got {type(entry).__name__}" + ) + iter_id = str(entry.get("iteration_id") or f"iter-{idx:03d}") + try: + seed = int(entry["iteration_seed"]) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-03: iterations[{idx}].iteration_seed in " + f"{fixture_path} must be int: {exc}" + ) + raw_samples = entry.get("samples") + if not isinstance(raw_samples, list): + pytest.fail( + f"NFT-RES-03: iterations[{idx}].samples in {fixture_path} " + f"must be a list of objects" + ) + samples: list[mce.FrameSample] = [] + for j, s in enumerate(raw_samples): + if not isinstance(s, dict): + pytest.fail( + f"NFT-RES-03: iterations[{idx}].samples[{j}] in " + f"{fixture_path} must be an object" + ) + try: + samples.append( + mce.FrameSample( + error_m=float(s["error_m"]), + cov_semi_major_m=float(s["cov_semi_major_m"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-03: iterations[{idx}].samples[{j}] in " + f"{fixture_path} shape invalid: {exc}" + ) + parsed.append( + mce.IterationOutcome( + iteration_id=iter_id, + iteration_seed=seed, + samples=tuple(samples), + ) + ) + return master_seed, parsed diff --git a/e2e/tests/resilience/test_nft_res_04_blackout_escalation.py b/e2e/tests/resilience/test_nft_res_04_blackout_escalation.py new file mode 100644 index 0000000..4a25c3c --- /dev/null +++ b/e2e/tests/resilience/test_nft_res_04_blackout_escalation.py @@ -0,0 +1,227 @@ +"""NFT-RES-04 — 35 s blackout + spoof full escalation ladder (AZ-435 / AC-NEW-8 escalation). + +Tier-1 OR Tier-2. Sibling of FT-N-04 — same 35 s window with spoof, +but asserts the *full* escalation ladder fires in observable order +under tight latency budgets: + +* AC-1 — 100 m covariance → fix-type degrade within ≤500 ms. +* AC-2 — 500 m covariance OR 30 s elapsed → horiz_accuracy=999.0 + AND ``VISUAL_BLACKOUT_FAILSAFE`` STATUSTEXT within ≤500 ms. +* AC-ORDER — AC-1 crossing strictly precedes the AC-2 trigger. +* AC-3 — parameterized over (fc_adapter, vio_strategy). + +The runner consumes the same Derkachi replay + blackout-spoof +injector fixture as FT-N-04 (``E2E_SITL_REPLAY_DIR``), so the +``E2E_NFT_RES_04_FIXTURE`` env var defaults to the same payload. +This avoids duplicating the 35 s captured trace just for the +resilience-tier assertions. + +Production dependency surfaced to AZ-595: the fixture JSON has shape: + + { + "window": {"onset_monotonic_ms": , "end_monotonic_ms": }, + "estimates": [ + {"monotonic_ms": , "cov_semi_major_m": , + "horiz_accuracy": , "fix_type": }, ... + ], + "statustexts": [ + {"monotonic_ms": , "text": }, ... + ] + } +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import escalation_ladder_evaluator as ele + +NFT_RES_04_FIXTURE_ENV_VAR = "E2E_NFT_RES_04_FIXTURE" +NFT_RES_04_DEFAULT_FIXTURE_NAME = "nft_res_04_blackout_escalation.json" + + +@pytest.mark.scenario_id("nft-res-04") +@pytest.mark.traces_to("AC-NEW-8,AC-1,AC-2,AC-3") +def test_nft_res_04_blackout_escalation( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AC-1 + AC-2 + AC-ORDER for the 35 s spoof+blackout window.""" + if not sitl_replay_ready: + pytest.skip( + "NFT-RES-04 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) carrying the 35 s " + "spoof+blackout window with cov_semi_major_m, horiz_accuracy, " + "fix_type, and STATUSTEXT samples. Pure-logic AC-1/AC-2/AC-ORDER " + "covered by " + "e2e/_unit_tests/helpers/test_escalation_ladder_evaluator.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-RES-04: fixture not found at {fixture_path}. " + f"`{NFT_RES_04_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring. " + "Production dependency: AZ-595." + ) + + payload = json.loads(fixture_path.read_text()) + window, estimates, statustexts = _parse_payload(payload, fixture_path) + if not window.is_35s: + pytest.fail( + f"NFT-RES-04: window duration {window.duration_s:.2f}s outside " + f"35±2s — the resilience-tier scenario only meaningfully covers " + f"the 35 s sub-case; other sub-cases are owned by FT-N-04 " + f"({fixture_path})." + ) + + report = ele.evaluate(window, estimates=estimates, statustexts=statustexts) + out_csv = ( + evidence_dir + / "nft-res-04" + / f"{fc_adapter}-{vio_strategy}.csv" + ) + ele.write_csv_evidence(out_csv, report) + + if report.fix_degrade.latency_ms is not None: + nfr_recorder.record_metric( + "nft_res_04.cov2d_to_fix_degrade_latency_ms", + float(report.fix_degrade.latency_ms), + ac_id="AC-1", + ) + if report.failsafe.horiz_999_latency_ms is not None: + nfr_recorder.record_metric( + "nft_res_04.failsafe_to_horiz999_latency_ms", + float(report.failsafe.horiz_999_latency_ms), + ac_id="AC-2", + ) + if report.failsafe.statustext_latency_ms is not None: + nfr_recorder.record_metric( + "nft_res_04.failsafe_to_statustext_latency_ms", + float(report.failsafe.statustext_latency_ms), + ac_id="AC-2", + ) + + assert report.fix_degrade.passes, ( + f"AC-1: cov-2d → fix-degrade latency = " + f"{report.fix_degrade.latency_ms} ms (budget {report.fix_degrade.budget_ms} ms); " + f"cov2d_at_ms={report.fix_degrade.cov2d_crossed_at_ms}, " + f"fix_degraded_at_ms={report.fix_degrade.fix_degraded_at_ms}" + ) + assert report.failsafe.passes, ( + f"AC-2: failsafe escalation incomplete; " + f"trigger_at_ms={report.failsafe.failsafe_trigger_at_ms}, " + f"horiz_999_latency_ms={report.failsafe.horiz_999_latency_ms}, " + f"statustext_latency_ms={report.failsafe.statustext_latency_ms}, " + f"budget {report.failsafe.budget_ms} ms" + ) + assert report.ordering.passes, ( + f"AC-ORDER: cov-2d crossing must strictly precede failsafe trigger; " + f"cov2d_at_ms={report.ordering.cov2d_at_ms}, " + f"failsafe_trigger_at_ms={report.ordering.failsafe_trigger_at_ms}" + ) + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_RES_04_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_RES_04_FIXTURE_ENV_VAR}-unset>") + return root / NFT_RES_04_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> tuple[ + ele.BlackoutWindow, + list[ele.EstimateSample], + list[ele.StatustextSample], +]: + if not isinstance(payload, dict): + pytest.fail( + f"NFT-RES-04: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + win_raw = payload.get("window") + if not isinstance(win_raw, dict): + pytest.fail( + f"NFT-RES-04: fixture {fixture_path} missing 'window' object" + ) + try: + window = ele.BlackoutWindow( + onset_monotonic_ms=int(win_raw["onset_monotonic_ms"]), + end_monotonic_ms=int(win_raw["end_monotonic_ms"]), + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-04: fixture {fixture_path} 'window' shape invalid: {exc}" + ) + + raw_estimates = payload.get("estimates") + if not isinstance(raw_estimates, list): + pytest.fail( + f"NFT-RES-04: fixture {fixture_path} 'estimates' must be a list" + ) + estimates: list[ele.EstimateSample] = [] + for idx, entry in enumerate(raw_estimates): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-RES-04: estimates[{idx}] in {fixture_path} must be " + f"an object; got {type(entry).__name__}" + ) + try: + estimates.append( + ele.EstimateSample( + monotonic_ms=int(entry["monotonic_ms"]), + cov_semi_major_m=float(entry["cov_semi_major_m"]), + horiz_accuracy=float(entry["horiz_accuracy"]), + fix_type=int(entry["fix_type"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-04: estimates[{idx}] in {fixture_path} shape invalid: {exc}" + ) + + raw_st = payload.get("statustexts", []) + if not isinstance(raw_st, list): + pytest.fail( + f"NFT-RES-04: fixture {fixture_path} 'statustexts' must be a list " + "(may be empty)" + ) + statustexts: list[ele.StatustextSample] = [] + for idx, entry in enumerate(raw_st): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-RES-04: statustexts[{idx}] in {fixture_path} must be " + f"an object" + ) + try: + statustexts.append( + ele.StatustextSample( + monotonic_ms=int(entry["monotonic_ms"]), + text=str(entry["text"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-RES-04: statustexts[{idx}] in {fixture_path} shape invalid: {exc}" + ) + + return window, estimates, statustexts