Files
gps-denied-onboard/e2e/tests/resilience/test_nft_res_02_companion_reboot.py
T
Oleksandr Bezdieniezhnykh 330893be5c [AZ-432] [AZ-433] [AZ-434] [AZ-435] Add NFT-RES-01..04 resilience scenarios
Batch 86: 4 NFT-RES blackbox scenarios + 4 helper evaluators + 74 unit
tests + directory-layout registration.

* AZ-432 NFT-RES-01: 30 s IMU-only fallback drift bound (AC-3.5 + AC-NEW-7);
  two sub-cases (no_imu ≤100m, good_imu_combined_factor ≤50m).
* AZ-433 NFT-RES-02: companion mid-flight reboot (AC-5.2 + AC-5.3); resume
  ≤30s + first-emission accuracy ≤100m.
* AZ-434 NFT-RES-03: 100-iteration Monte Carlo envelope (AC-NEW-4);
  iteration-count + master-seed determinism + envelope ratio ≥0.95.
  Canonical-param by default; E2E_NFT_RES_03_FULL_MATRIX=1 unlocks matrix.
* AZ-435 NFT-RES-04: 35s blackout+spoof escalation ladder (AC-NEW-8);
  AC-1 (cov-2d→fix-degrade ≤500ms) + AC-2 (failsafe→999+STATUSTEXT
  ≤500ms) + AC-ORDER (strict ordering).

Verdict: PASS_WITH_WARNINGS (0 Critical, 0 High, 0 Medium, 5 Low).
F5 documents intentional threshold duplication with blackout_spoof
evaluator (prevents contract drift between FT-N-04 and NFT-RES-04).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:09:04 +03:00

208 lines
7.3 KiB
Python

"""NFT-RES-02 — Companion mid-flight reboot recovery (AZ-433 / AC-5.2 + AC-5.3).
Tier-1 OR Tier-2. Mid-Derkachi-replay restart (Docker on Tier-1,
systemd on Tier-2). Asserts:
* AC-1 — process restarts within ≤5 s of the restart command.
* AC-2 — first post-restart outbound emission within ≤30 s of the
restart command.
* AC-3 — that first emission is within ≤100 m of GT.
The runner harness owns the actual restart command + observation of the
process-up timestamp + capture of the first post-restart emission. The
scenario consumes a fixture that encodes the captured timestamps + the
first-emission estimate + GT-at-that-timestamp.
Production dependency surfaced to AZ-595 / AZ-444: the
``E2E_NFT_RES_02_FIXTURE`` env var names a JSON file with shape:
{
"restart_command_monotonic_ms": <int>,
"process_restarted_monotonic_ms": <int | null>,
"first_post_restart_emission_monotonic_ms": <int | null>,
"first_post_restart_estimate": {"monotonic_ms": <int>, "lat_deg": <f>, "lon_deg": <f>} | null,
"ground_truth_at_first_emission": {"monotonic_ms": <int>, "lat_deg": <f>, "lon_deg": <f>} | null
}
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from runner.helpers import companion_reboot_evaluator as cre
NFT_RES_02_FIXTURE_ENV_VAR = "E2E_NFT_RES_02_FIXTURE"
NFT_RES_02_DEFAULT_FIXTURE_NAME = "nft_res_02_companion_reboot.json"
@pytest.mark.scenario_id("nft-res-02")
@pytest.mark.traces_to("AC-5.2,AC-5.3,AC-1,AC-2,AC-3,AC-4")
def test_nft_res_02_companion_reboot(
fc_adapter: str,
vio_strategy: str,
evidence_dir, # type: ignore[no-untyped-def]
run_id: str,
nfr_recorder, # type: ignore[no-untyped-def]
sitl_replay_ready: bool,
) -> None:
"""AC-1 (restart trigger ≤5 s) + AC-2 (resume ≤30 s) + AC-3 (accuracy ≤100 m)."""
if not sitl_replay_ready:
pytest.skip(
"NFT-RES-02 requires `E2E_SITL_REPLAY_DIR` to point at a "
"prepared SITL replay fixture (AZ-595 + AZ-444) carrying "
"restart-command + first-post-restart-emission timestamps + "
"GT-at-emission. Pure-logic AC verdicts covered by "
"e2e/_unit_tests/helpers/test_companion_reboot_evaluator.py."
)
fixture_path = _resolve_fixture_path()
if not fixture_path.is_file():
pytest.fail(
f"NFT-RES-02: fixture not found at {fixture_path}. "
f"`{NFT_RES_02_FIXTURE_ENV_VAR}` env var must point at a JSON "
"file with the schema documented in the scenario docstring. "
"Production dependencies: AZ-595 (capture builder) + AZ-444 "
"(Tier-2 systemd restart orchestration)."
)
evidence = _parse_fixture(fixture_path)
report = cre.evaluate(evidence)
out_csv = (
evidence_dir
/ "nft-res-02"
/ f"{fc_adapter}-{vio_strategy}.csv"
)
cre.write_csv_evidence(out_csv, report)
if report.restart_trigger_latency_s is not None:
nfr_recorder.record_metric(
"nft_res_02.restart_trigger_latency_s",
float(report.restart_trigger_latency_s),
ac_id="AC-1",
)
if report.resume_time_s is not None:
nfr_recorder.record_metric(
"nft_res_02.resume_time_s",
float(report.resume_time_s),
ac_id="AC-2",
)
if report.first_emission_accuracy_m is not None:
nfr_recorder.record_metric(
"nft_res_02.first_emission_accuracy_m",
float(report.first_emission_accuracy_m),
ac_id="AC-3",
)
assert report.passes_restart_trigger, (
f"AC-1: restart trigger latency = {report.restart_trigger_latency_s} s > budget "
f"{report.restart_trigger_budget_s} s (process_restarted_ms="
f"{evidence.process_restarted_monotonic_ms})"
)
assert report.passes_resume_time, (
f"AC-2: resume time = {report.resume_time_s} s > budget "
f"{report.resume_budget_s} s (first_emission_ms="
f"{evidence.first_post_restart_emission_monotonic_ms})"
)
assert report.passes_first_emission_accuracy, (
f"AC-3: first-emission accuracy = {report.first_emission_accuracy_m} m > "
f"budget {report.accuracy_budget_m} m "
f"(estimate={evidence.first_post_restart_estimate}, "
f"gt={evidence.ground_truth_at_first_emission})"
)
def _resolve_fixture_path() -> Path:
raw = os.environ.get(NFT_RES_02_FIXTURE_ENV_VAR, "").strip()
from runner.helpers import sitl_observer
root = sitl_observer.replay_dir()
if not raw:
if root is None:
return Path(f"<{NFT_RES_02_FIXTURE_ENV_VAR}-unset>")
return root / NFT_RES_02_DEFAULT_FIXTURE_NAME
path = Path(raw)
if not path.is_absolute() and root is not None:
path = root / path
return path
def _parse_fixture(fixture_path: Path) -> cre.RestartEvidence:
payload = json.loads(fixture_path.read_text())
if not isinstance(payload, dict):
pytest.fail(
f"NFT-RES-02: fixture {fixture_path} must be a JSON object; "
f"got top-level type={type(payload).__name__}"
)
try:
command_ms = int(payload["restart_command_monotonic_ms"])
except (KeyError, TypeError, ValueError) as exc:
pytest.fail(
f"NFT-RES-02: fixture {fixture_path} missing/invalid "
f"'restart_command_monotonic_ms': {exc}"
)
process_ms = _maybe_int(
payload.get("process_restarted_monotonic_ms"),
fixture_path,
"process_restarted_monotonic_ms",
)
first_emission_ms = _maybe_int(
payload.get("first_post_restart_emission_monotonic_ms"),
fixture_path,
"first_post_restart_emission_monotonic_ms",
)
estimate = _maybe_geofix(
payload.get("first_post_restart_estimate"),
fixture_path,
"first_post_restart_estimate",
)
gt = _maybe_geofix(
payload.get("ground_truth_at_first_emission"),
fixture_path,
"ground_truth_at_first_emission",
)
return cre.RestartEvidence(
restart_command_monotonic_ms=command_ms,
process_restarted_monotonic_ms=process_ms,
first_post_restart_emission_monotonic_ms=first_emission_ms,
first_post_restart_estimate=estimate,
ground_truth_at_first_emission=gt,
)
def _maybe_int(raw: object, fixture_path: Path, where: str) -> int | None:
if raw is None:
return None
try:
return int(raw)
except (TypeError, ValueError) as exc:
pytest.fail(
f"NFT-RES-02: {where} in {fixture_path} must be int or null: {exc}"
)
return None # unreachable; pytest.fail raises
def _maybe_geofix(raw: object, fixture_path: Path, where: str) -> cre.GeoFix | None:
if raw is None:
return None
if not isinstance(raw, dict):
pytest.fail(
f"NFT-RES-02: {where} in {fixture_path} must be object or null"
)
try:
return cre.GeoFix(
monotonic_ms=int(raw["monotonic_ms"]),
lat_deg=float(raw["lat_deg"]),
lon_deg=float(raw["lon_deg"]),
)
except (KeyError, TypeError, ValueError) as exc:
pytest.fail(
f"NFT-RES-02: {where} in {fixture_path} shape invalid: {exc}"
)
return None # unreachable; pytest.fail raises