Files
gps-denied-onboard/e2e/tests/resilience/test_nft_res_04_blackout_escalation.py
Oleksandr Bezdieniezhnykh 330893be5c [AZ-432] [AZ-433] [AZ-434] [AZ-435] Add NFT-RES-01..04 resilience scenarios
Batch 86: 4 NFT-RES blackbox scenarios + 4 helper evaluators + 74 unit
tests + directory-layout registration.

* AZ-432 NFT-RES-01: 30 s IMU-only fallback drift bound (AC-3.5 + AC-NEW-7);
  two sub-cases (no_imu ≤100m, good_imu_combined_factor ≤50m).
* AZ-433 NFT-RES-02: companion mid-flight reboot (AC-5.2 + AC-5.3); resume
  ≤30s + first-emission accuracy ≤100m.
* AZ-434 NFT-RES-03: 100-iteration Monte Carlo envelope (AC-NEW-4);
  iteration-count + master-seed determinism + envelope ratio ≥0.95.
  Canonical-param by default; E2E_NFT_RES_03_FULL_MATRIX=1 unlocks matrix.
* AZ-435 NFT-RES-04: 35s blackout+spoof escalation ladder (AC-NEW-8);
  AC-1 (cov-2d→fix-degrade ≤500ms) + AC-2 (failsafe→999+STATUSTEXT
  ≤500ms) + AC-ORDER (strict ordering).

Verdict: PASS_WITH_WARNINGS (0 Critical, 0 High, 0 Medium, 5 Low).
F5 documents intentional threshold duplication with blackout_spoof
evaluator (prevents contract drift between FT-N-04 and NFT-RES-04).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:09:04 +03:00

228 lines
8.3 KiB
Python

"""NFT-RES-04 — 35 s blackout + spoof full escalation ladder (AZ-435 / AC-NEW-8 escalation).
Tier-1 OR Tier-2. Sibling of FT-N-04 — same 35 s window with spoof,
but asserts the *full* escalation ladder fires in observable order
under tight latency budgets:
* AC-1 — 100 m covariance → fix-type degrade within ≤500 ms.
* AC-2 — 500 m covariance OR 30 s elapsed → horiz_accuracy=999.0
AND ``VISUAL_BLACKOUT_FAILSAFE`` STATUSTEXT within ≤500 ms.
* AC-ORDER — AC-1 crossing strictly precedes the AC-2 trigger.
* AC-3 — parameterized over (fc_adapter, vio_strategy).
The runner consumes the same Derkachi replay + blackout-spoof
injector fixture as FT-N-04 (``E2E_SITL_REPLAY_DIR``), so the
``E2E_NFT_RES_04_FIXTURE`` env var defaults to the same payload.
This avoids duplicating the 35 s captured trace just for the
resilience-tier assertions.
Production dependency surfaced to AZ-595. The fixture JSON has this shape::

    {
      "window": {"onset_monotonic_ms": <int>, "end_monotonic_ms": <int>},
      "estimates": [
        {"monotonic_ms": <int>, "cov_semi_major_m": <f>,
         "horiz_accuracy": <f>, "fix_type": <int>}, ...
      ],
      "statustexts": [
        {"monotonic_ms": <int>, "text": <str>}, ...
      ]
    }
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from runner.helpers import escalation_ladder_evaluator as ele
# Name of the environment variable that overrides the fixture JSON location;
# read (and whitespace-stripped) by `_resolve_fixture_path`.
NFT_RES_04_FIXTURE_ENV_VAR = "E2E_NFT_RES_04_FIXTURE"
# File name looked up under the SITL replay directory when the env var above
# is unset or blank.
NFT_RES_04_DEFAULT_FIXTURE_NAME = "nft_res_04_blackout_escalation.json"
@pytest.mark.scenario_id("nft-res-04")
@pytest.mark.traces_to("AC-NEW-8,AC-1,AC-2,AC-3")
def test_nft_res_04_blackout_escalation(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """AC-1 + AC-2 + AC-ORDER for the 35 s spoof+blackout window.

    Flow:
      1. Skip when ``sitl_replay_ready`` is false.
      2. Resolve, read and parse the fixture JSON; any problem with the file
         (missing, unreadable, not UTF-8, malformed JSON, wrong shape, window
         not ~35 s) fails the test with a message naming the fixture path.
      3. Evaluate the escalation ladder, write the CSV evidence and record the
         latency metrics that the report provides.
      4. Assert AC-1, AC-2 and AC-ORDER, in that order.

    Evidence and metrics are written before the assertions run, so they are
    produced even when an assertion fails.

    Args:
        fc_adapter: Parameterization value; used here only in the evidence
            file name.
        vio_strategy: Parameterization value; used here only in the evidence
            file name.
        evidence_dir: Base path under which ``nft-res-04/<fc>-<vio>.csv`` is
            written.
        run_id: Not referenced in the body.
            NOTE(review): presumably requested for its fixture side effects —
            confirm before removing.
        nfr_recorder: Object exposing ``record_metric(name, value, ac_id=...)``.
        sitl_replay_ready: Whether the SITL replay fixture is available.
    """
    if not sitl_replay_ready:
        pytest.skip(
            "NFT-RES-04 requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595) carrying the 35 s "
            "spoof+blackout window with cov_semi_major_m, horiz_accuracy, "
            "fix_type, and STATUSTEXT samples. Pure-logic AC-1/AC-2/AC-ORDER "
            "covered by "
            "e2e/_unit_tests/helpers/test_escalation_ladder_evaluator.py."
        )

    fixture_path = _resolve_fixture_path()
    if not fixture_path.is_file():
        pytest.fail(
            f"NFT-RES-04: fixture not found at {fixture_path}. "
            f"`{NFT_RES_04_FIXTURE_ENV_VAR}` env var must point at a JSON "
            "file with the schema documented in the scenario docstring. "
            "Production dependency: AZ-595."
        )

    # JSON text is UTF-8; decode explicitly instead of relying on the locale.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError
    # subclasses, so (OSError, ValueError) covers unreadable, mis-encoded and
    # malformed files alike and reports them like every other fixture problem.
    try:
        payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        pytest.fail(
            f"NFT-RES-04: fixture {fixture_path} could not be read as "
            f"UTF-8 JSON: {exc}"
        )

    window, estimates, statustexts = _parse_payload(payload, fixture_path)
    if not window.is_35s:
        pytest.fail(
            f"NFT-RES-04: window duration {window.duration_s:.2f}s outside "
            f"35±2s — the resilience-tier scenario only meaningfully covers "
            f"the 35 s sub-case; other sub-cases are owned by FT-N-04 "
            f"({fixture_path})."
        )

    report = ele.evaluate(window, estimates=estimates, statustexts=statustexts)

    # One evidence file per (fc_adapter, vio_strategy) combination.
    out_csv = (
        evidence_dir
        / "nft-res-04"
        / f"{fc_adapter}-{vio_strategy}.csv"
    )
    ele.write_csv_evidence(out_csv, report)

    # A latency of None means the report has no value for that step; such
    # metrics are simply not recorded (the assertions below decide pass/fail).
    if report.fix_degrade.latency_ms is not None:
        nfr_recorder.record_metric(
            "nft_res_04.cov2d_to_fix_degrade_latency_ms",
            float(report.fix_degrade.latency_ms),
            ac_id="AC-1",
        )
    if report.failsafe.horiz_999_latency_ms is not None:
        nfr_recorder.record_metric(
            "nft_res_04.failsafe_to_horiz999_latency_ms",
            float(report.failsafe.horiz_999_latency_ms),
            ac_id="AC-2",
        )
    if report.failsafe.statustext_latency_ms is not None:
        nfr_recorder.record_metric(
            "nft_res_04.failsafe_to_statustext_latency_ms",
            float(report.failsafe.statustext_latency_ms),
            ac_id="AC-2",
        )

    assert report.fix_degrade.passes, (
        f"AC-1: cov-2d → fix-degrade latency = "
        f"{report.fix_degrade.latency_ms} ms (budget {report.fix_degrade.budget_ms} ms); "
        f"cov2d_at_ms={report.fix_degrade.cov2d_crossed_at_ms}, "
        f"fix_degraded_at_ms={report.fix_degrade.fix_degraded_at_ms}"
    )
    assert report.failsafe.passes, (
        f"AC-2: failsafe escalation incomplete; "
        f"trigger_at_ms={report.failsafe.failsafe_trigger_at_ms}, "
        f"horiz_999_latency_ms={report.failsafe.horiz_999_latency_ms}, "
        f"statustext_latency_ms={report.failsafe.statustext_latency_ms}, "
        f"budget {report.failsafe.budget_ms} ms"
    )
    assert report.ordering.passes, (
        f"AC-ORDER: cov-2d crossing must strictly precede failsafe trigger; "
        f"cov2d_at_ms={report.ordering.cov2d_at_ms}, "
        f"failsafe_trigger_at_ms={report.ordering.failsafe_trigger_at_ms}"
    )
def _resolve_fixture_path() -> Path:
    """Work out which fixture JSON file the scenario should load.

    Resolution rules, given the stripped value of the
    ``NFT_RES_04_FIXTURE_ENV_VAR`` environment variable and the directory
    returned by ``sitl_observer.replay_dir()``:

    * env var set, absolute path -> that path as-is;
    * env var set, relative path -> joined onto the replay directory when
      one is available, otherwise returned unchanged;
    * env var unset/blank -> ``NFT_RES_04_DEFAULT_FIXTURE_NAME`` inside the
      replay directory, or a ``<...-unset>`` placeholder path when there is
      no replay directory either.

    Returns:
        The resolved path; it is not checked for existence here.
    """
    override = os.environ.get(NFT_RES_04_FIXTURE_ENV_VAR, "").strip()

    # Imported at call time, as in the original layout of this module.
    from runner.helpers import sitl_observer

    replay_root = sitl_observer.replay_dir()

    if override:
        candidate = Path(override)
        if candidate.is_absolute() or replay_root is None:
            return candidate
        return replay_root / candidate

    if replay_root is not None:
        return replay_root / NFT_RES_04_DEFAULT_FIXTURE_NAME

    # Placeholder that cannot exist on disk; the caller's is_file() check
    # then reports it in the "fixture not found" message.
    return Path(f"<{NFT_RES_04_FIXTURE_ENV_VAR}-unset>")
def _parse_payload(
    payload: object, fixture_path: Path
) -> tuple[
    ele.BlackoutWindow,
    list[ele.EstimateSample],
    list[ele.StatustextSample],
]:
    """Convert the decoded fixture JSON into the evaluator's input objects.

    Validation runs top-down — top-level object, ``window``, ``estimates``,
    then ``statustexts`` — and the first problem found aborts the test via
    ``pytest.fail`` with a message that names ``fixture_path``.

    Args:
        payload: The decoded JSON document.
        fixture_path: Where ``payload`` came from; used only in failure
            messages.

    Returns:
        ``(window, estimates, statustexts)``. ``window`` and ``estimates``
        are mandatory in the payload; a missing ``statustexts`` key yields
        an empty list.
    """
    if not isinstance(payload, dict):
        pytest.fail(
            f"NFT-RES-04: fixture {fixture_path} must be a JSON object; "
            f"got top-level type={type(payload).__name__}"
        )

    # --- window -----------------------------------------------------------
    window_obj = payload.get("window")
    if not isinstance(window_obj, dict):
        pytest.fail(
            f"NFT-RES-04: fixture {fixture_path} missing 'window' object"
        )
    try:
        window = ele.BlackoutWindow(
            onset_monotonic_ms=int(window_obj["onset_monotonic_ms"]),
            end_monotonic_ms=int(window_obj["end_monotonic_ms"]),
        )
    except (KeyError, TypeError, ValueError) as err:
        pytest.fail(
            f"NFT-RES-04: fixture {fixture_path} 'window' shape invalid: {err}"
        )

    # --- estimates (required list) ----------------------------------------
    estimate_rows = payload.get("estimates")
    if not isinstance(estimate_rows, list):
        pytest.fail(
            f"NFT-RES-04: fixture {fixture_path} 'estimates' must be a list"
        )
    estimates: list[ele.EstimateSample] = []
    for position, row in enumerate(estimate_rows):
        if not isinstance(row, dict):
            pytest.fail(
                f"NFT-RES-04: estimates[{position}] in {fixture_path} must be "
                f"an object; got {type(row).__name__}"
            )
        try:
            estimate = ele.EstimateSample(
                monotonic_ms=int(row["monotonic_ms"]),
                cov_semi_major_m=float(row["cov_semi_major_m"]),
                horiz_accuracy=float(row["horiz_accuracy"]),
                fix_type=int(row["fix_type"]),
            )
        except (KeyError, TypeError, ValueError) as err:
            pytest.fail(
                f"NFT-RES-04: estimates[{position}] in {fixture_path} shape invalid: {err}"
            )
        estimates.append(estimate)

    # --- statustexts (optional key, but must be a list when present) -------
    statustext_rows = payload.get("statustexts", [])
    if not isinstance(statustext_rows, list):
        pytest.fail(
            f"NFT-RES-04: fixture {fixture_path} 'statustexts' must be a list "
            "(may be empty)"
        )
    statustexts: list[ele.StatustextSample] = []
    for position, row in enumerate(statustext_rows):
        if not isinstance(row, dict):
            pytest.fail(
                f"NFT-RES-04: statustexts[{position}] in {fixture_path} must be "
                f"an object"
            )
        try:
            message = ele.StatustextSample(
                monotonic_ms=int(row["monotonic_ms"]),
                text=str(row["text"]),
            )
        except (KeyError, TypeError, ValueError) as err:
            pytest.fail(
                f"NFT-RES-04: statustexts[{position}] in {fixture_path} shape invalid: {err}"
            )
        statustexts.append(message)

    return window, estimates, statustexts