Files
gps-denied-onboard/e2e/tests/security/test_nft_sec_01_cache_poisoning.py
Oleksandr Bezdieniezhnykh c56d4584e6 [AZ-436] [AZ-437] [AZ-438] [AZ-439] Add NFT-SEC-01..05 security scenarios
Batch 87: 6 NFT-SEC blackbox scenarios + 5 helper evaluators + 75 unit
tests + cumulative review batches 85-87.

* AZ-436 NFT-SEC-01: cache-poisoning safety budget (AC-NEW-9); aggregate
  false_trust_count ≤ N×1e-6; zero-tolerance default. Canonical-only by
  default; E2E_NFT_SEC_01_RELEASE_GATE=1 unlocks full matrix.
* AZ-437 NFT-SEC-02 + NFT-SEC-05: shared egress-observation evaluator
  (AC-NEW-10); SEC-02 = 0 packets to non-e2e-net over 5min replay;
  SEC-05 = DNS-blackhole sidecar healthy + lookup fails + UDP-53 silent.
* AZ-438 NFT-SEC-03: AP-only signing rejection (AC-NEW-11); 3 sub-cases
  (unsigned/wrong-key/replayed) each reject ≤500ms + no position drift.
* AZ-439 NFT-SEC-04: probe (always-run) = no-crash + deterministic
  decode outcome; ASan-fuzz (release-gate) = 0 findings ≥4h; AC-3
  corpus floor informational only per spec.

Verdict per-batch: PASS_WITH_WARNINGS (5 Low). Cumulative review for
batches 85-87 (K=3 window) also PASS_WITH_WARNINGS with 5 cross-batch
findings — recommends hygiene PBIs for write_csv_evidence duplication
(13 helpers) and _resolve_fixture_path duplication (13 scenarios), plus
new tickets for AZ-595 fixture builder + DNS-blackhole sidecar service.

Also adds _docs/LESSONS.md documenting the Jira transition-ID lesson
(always call getTransitionsForJiraIssue first, never memorize numeric
IDs across sessions).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:33:22 +03:00

238 lines
8.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""NFT-SEC-01 — Cache-poisoning safety probability ≤ 1e-6 / flight (AZ-436 / AC-NEW-9).

Tier-1 OR Tier-2. ``N`` synthetic micro-flights (~60 s each) carry
``[1 %, 5 %]`` of crafted poisoned tiles (signing-cert mismatch, freshness
violation, voting disagreement). The SUT MUST reject or downgrade them
via three independent defense layers and emit zero false-trust events.

Default CI runs N=1000 with a single canonical parameterization to keep
total runtime bounded; the full release-gate run is N=10000 across
``(fc_adapter × vio_strategy)`` and is gated behind
``E2E_NFT_SEC_01_RELEASE_GATE=1``.

Production dependencies surfaced to the cumulative review window:

* **AZ-595**: emit ``nft_sec_01_cache_poisoning.json`` containing
  per-flight tile-cache slates + runner-collected false-trust events
  + per-flight ``rejection_reasons`` counter — see fixture JSON shape
  in the docstring of ``_parse_payload``.
* **SUT**: outbound ``source_label`` MUST carry the ``tile_id`` so the
  runner can match a ``satellite_anchored`` frame back to a poisoned
  tile; otherwise false-trust events cannot be detected reliably.

Pure aggregate-budget logic is fully covered by
``e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py``; the
scenario test only validates the fixture parser, the AC assertions, and
the conftest skip-rules.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from runner.helpers import cache_poisoning_evaluator as cpe
# Environment variable naming the fixture JSON file; read by
# `_resolve_fixture_path`. A relative value is joined onto the SITL replay
# directory when that directory is known.
NFT_SEC_01_FIXTURE_ENV_VAR = "E2E_NFT_SEC_01_FIXTURE"
# File name looked up inside the SITL replay directory when the fixture
# environment variable is unset or blank.
NFT_SEC_01_DEFAULT_FIXTURE_NAME = "nft_sec_01_cache_poisoning.json"
# Environment variable that switches on release-gate mode; the values
# "1", "true" and "yes" (case-insensitive, surrounding whitespace ignored)
# enable it — see `_release_gate_enabled`.
NFT_SEC_01_RELEASE_GATE_ENV_VAR = "E2E_NFT_SEC_01_RELEASE_GATE"
# Minimum number of flights the fixture must provide when release-gate mode
# is NOT enabled; the scenario fails below this count.
NFT_SEC_01_CI_MIN_FLIGHTS = 1000
@pytest.mark.scenario_id("nft-sec-01")
@pytest.mark.traces_to("AC-NEW-9,AC-1,AC-2,AC-3,AC-4")
def test_nft_sec_01_cache_poisoning(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Aggregate false-trust count ≤ N × 1e-6 (zero-tolerance default).

    Flow:

    1. Skip unless the parameterization is the canonical one or the
       release gate is enabled.
    2. Skip when the SITL replay fixture is not prepared.
    3. Fail when the fixture file is missing, unreadable, not valid UTF-8
       JSON, or (outside release-gate mode) carries fewer than
       ``NFT_SEC_01_CI_MIN_FLIGHTS`` flights.
    4. Evaluate the parsed flights with ``cpe.evaluate``, write the CSV
       evidence file, record three metrics and assert the report flags.

    Args:
        fc_adapter: Flight-controller adapter name of this parameterization.
        vio_strategy: VIO strategy name of this parameterization.
        evidence_dir: Directory under which the CSV evidence is written
            (``nft-sec-01/<fc_adapter>-<vio_strategy>.csv``).
        run_id: Requested fixture; not referenced in the test body.
        nfr_recorder: Recorder exposing ``record_metric(name, value, ac_id=...)``.
        sitl_replay_ready: Whether the SITL replay fixture is available.
    """
    release_gate = _release_gate_enabled()
    if not release_gate and not _is_canonical_param(fc_adapter, vio_strategy):
        pytest.skip(
            "NFT-SEC-01 default CI run uses a single canonical "
            "parameterization (ardupilot, okvis2) to keep N=1000 × 4 "
            "Monte Carlo cost bounded. Set "
            f"`{NFT_SEC_01_RELEASE_GATE_ENV_VAR}=1` for the full matrix."
        )
    if not sitl_replay_ready:
        pytest.skip(
            "NFT-SEC-01 requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595) carrying the N "
            "synthetic flights with crafted poisoned tiles. Pure "
            "aggregate-budget logic covered by "
            "e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py."
        )

    fixture_path = _resolve_fixture_path()
    if not fixture_path.is_file():
        # The JSON schema lives in the `_parse_payload` docstring, so point
        # the reader there rather than at this scenario's docstring.
        pytest.fail(
            f"NFT-SEC-01: fixture not found at {fixture_path}. "
            f"`{NFT_SEC_01_FIXTURE_ENV_VAR}` env var must point at a JSON "
            "file with the schema documented in the `_parse_payload` "
            "docstring. Production dependency: AZ-595."
        )

    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # host locale's preferred encoding.
        payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Report an unreadable/malformed fixture with the file path attached
        # instead of letting a bare traceback escape.
        pytest.fail(
            f"NFT-SEC-01: fixture {fixture_path} could not be read as "
            f"UTF-8 JSON: {exc}"
        )

    flights = _parse_payload(payload, fixture_path)
    if len(flights) < NFT_SEC_01_CI_MIN_FLIGHTS and not release_gate:
        pytest.fail(
            f"NFT-SEC-01 AC-1: fixture provides only {len(flights)} flights "
            f"but the CI default requires ≥{NFT_SEC_01_CI_MIN_FLIGHTS}. "
            f"Set `{NFT_SEC_01_RELEASE_GATE_ENV_VAR}=1` to allow shorter runs "
            "for debugging."
        )

    report = cpe.evaluate(flights)

    # Evidence is written before the assertions so it exists even when the
    # scenario fails.
    out_csv = (
        evidence_dir
        / "nft-sec-01"
        / f"{fc_adapter}-{vio_strategy}.csv"
    )
    cpe.write_csv_evidence(out_csv, report)

    nfr_recorder.record_metric(
        "nft_sec_01.flight_count",
        float(report.flight_count),
        ac_id="AC-1",
    )
    nfr_recorder.record_metric(
        "nft_sec_01.total_false_trust",
        float(report.total_false_trust),
        ac_id="AC-3",
    )
    nfr_recorder.record_metric(
        "nft_sec_01.budget",
        report.budget,
        ac_id="AC-3",
    )

    # Only the first 10 offending flights are listed to keep messages short.
    assert report.passes_ratio, (
        "AC-2: poison ratio outside [1%, 5%] in flights: "
        f"{list(report.flights_with_bad_poison_ratio)[:10]}"
    )
    assert report.passes_layer_coverage, (
        "AC-2: at least one defense layer absent from flight: "
        f"{list(report.flights_missing_defense_layers)[:10]}"
    )
    assert report.passes_rejection_reason_vocabulary, (
        "AC-2 evidence: unknown rejection_reason vocabulary in flights: "
        f"{list(report.flights_with_unknown_rejection_reasons)[:10]}"
    )
    assert report.passes_budget, (
        f"AC-3: total_false_trust = {report.total_false_trust} "
        f"(budget {report.budget:g} expected events at N={report.flight_count}; "
        "zero-tolerance default — see Mode B Fact #103)."
    )
def _release_gate_enabled() -> bool:
    """Tell whether the release-gate environment switch is turned on.

    The variable named by ``NFT_SEC_01_RELEASE_GATE_ENV_VAR`` counts as
    enabled when its value, after trimming whitespace and lower-casing, is
    ``1``, ``true`` or ``yes``. An unset variable counts as disabled.
    """
    raw_flag = os.environ.get(NFT_SEC_01_RELEASE_GATE_ENV_VAR, "")
    normalized = raw_flag.strip().lower()
    return normalized in ("1", "true", "yes")
def _is_canonical_param(fc_adapter: str, vio_strategy: str) -> bool:
return fc_adapter == "ardupilot" and vio_strategy == "okvis2"
def _resolve_fixture_path() -> Path:
    """Work out where the NFT-SEC-01 fixture JSON is expected to live.

    Resolution rules:

    * Env var set to an absolute path: that path is returned unchanged.
    * Env var set to a relative path: joined onto the SITL replay directory
      when one is known, otherwise returned as-is.
    * Env var unset or blank: the default fixture name inside the SITL
      replay directory, or a ``<...-unset>`` placeholder path when no
      replay directory is known.

    The returned path is not checked for existence here.
    """
    configured = os.environ.get(NFT_SEC_01_FIXTURE_ENV_VAR, "").strip()

    from runner.helpers import sitl_observer

    replay_root = sitl_observer.replay_dir()

    if configured:
        candidate = Path(configured)
        if candidate.is_absolute() or replay_root is None:
            return candidate
        return replay_root / candidate

    if replay_root is None:
        # Placeholder path naming the missing variable; it is not expected
        # to exist on disk.
        return Path(f"<{NFT_SEC_01_FIXTURE_ENV_VAR}-unset>")
    return replay_root / NFT_SEC_01_DEFAULT_FIXTURE_NAME
def _parse_payload(
    payload: object, fixture_path: Path
) -> list[cpe.FlightOutcome]:
    """Parse the fixture into typed ``FlightOutcome`` records.

    Expected shape:

        {
          "flights": [
            {
              "flight_id": "<str>",
              "total_tile_count": <int>,
              "poisoned_tiles": [
                {"tile_id": "<str>", "defense_layer": "<str>"}, ...
              ],
              "false_trust_events": [
                {"flight_id": "<str>", "tile_id": "<str>",
                 "monotonic_ms": <int>, "defense_layer": "<str>"}, ...
              ],
              "rejection_reasons": {"<reason>": <int>, ...}
            }, ...
          ]
        }

    ``poisoned_tiles`` and ``false_trust_events`` default to empty when
    absent; a missing or null ``rejection_reasons`` becomes an empty dict.
    A false-trust event without its own ``flight_id`` inherits the
    enclosing flight's ``flight_id``.

    Args:
        payload: Decoded JSON document.
        fixture_path: Source file of ``payload``; used in failure messages
            only.

    Returns:
        One ``FlightOutcome`` per entry of ``payload["flights"]``, in order.

    Any structural problem (wrong container type, missing key, value that
    cannot be converted) ends the test through ``pytest.fail`` with a
    message naming the fixture and the offending flight index.
    """
    if not isinstance(payload, dict):
        pytest.fail(
            f"NFT-SEC-01: fixture {fixture_path} must be a JSON object; "
            f"got top-level type={type(payload).__name__}"
        )
    raw_flights = payload.get("flights")
    if not isinstance(raw_flights, list):
        pytest.fail(
            f"NFT-SEC-01: fixture {fixture_path} 'flights' must be a list"
        )

    flights: list[cpe.FlightOutcome] = []
    for idx, entry in enumerate(raw_flights):
        if not isinstance(entry, dict):
            pytest.fail(
                f"NFT-SEC-01: flights[{idx}] in {fixture_path} must be "
                f"an object; got {type(entry).__name__}"
            )
        try:
            poisoned = tuple(
                cpe.PoisonedTileSpec(
                    tile_id=str(p["tile_id"]),
                    defense_layer=str(p["defense_layer"]),
                )
                for p in entry.get("poisoned_tiles", [])
            )
            false_trust = tuple(
                cpe.FalseTrustEvent(
                    flight_id=str(e.get("flight_id", entry["flight_id"])),
                    tile_id=str(e["tile_id"]),
                    monotonic_ms=int(e["monotonic_ms"]),
                    defense_layer=str(e["defense_layer"]),
                )
                for e in entry.get("false_trust_events", [])
            )
            rejection_reasons = {
                str(k): int(v)
                for k, v in (entry.get("rejection_reasons") or {}).items()
            }
            flights.append(
                cpe.FlightOutcome(
                    flight_id=str(entry["flight_id"]),
                    total_tile_count=int(entry["total_tile_count"]),
                    poisoned_tiles=poisoned,
                    false_trust_events=false_trust,
                    rejection_reasons=rejection_reasons,
                )
            )
        except (AttributeError, KeyError, TypeError, ValueError) as exc:
            # AttributeError covers non-mapping values where `.get()` or
            # `.items()` is called (e.g. a non-object event, or a list given
            # as `rejection_reasons`), so those shape errors are reported
            # with the same contextual message.
            pytest.fail(
                f"NFT-SEC-01: flights[{idx}] in {fixture_path} shape invalid: {exc}"
            )
    return flights